Use Python to interact with your objects on lakeFS

High Level Python SDK
We’ve just released a new High Level Python SDK library, and we’re super excited to tell you about it! Continue reading to get the full story!
Though our previous SDK client is still supported and maintained, we highly recommend using the new High Level SDK.
For previous Python SDKs follow these links:
lakefs-sdk
legacy-sdk (Deprecated)

There are two primary ways to work with lakeFS from Python: the lakeFS SDK and Boto.

Using the lakeFS SDK

Installing

Install the Python client using pip:

pip install lakefs

Initializing

By default, the High Level SDK collects authentication parameters from the environment and attempts to create a default client. In an environment where lakectl is configured, you do not need to instantiate a lakeFS client or pass one when creating lakeFS objects. If no authentication parameters are available, you can create a lakeFS client explicitly.

Here’s how to instantiate a client:

from lakefs.client import Client

clt = Client(
    host="http://localhost:8000",
    username="AKIAIOSFODNN7EXAMPLE",
    password="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
)
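If you want an explicit client but prefer to keep credentials out of the source code, one option is to read them from environment variables yourself. The sketch below assumes the lakectl-style variable names LAKECTL_SERVER_ENDPOINT_URL, LAKECTL_CREDENTIALS_ACCESS_KEY_ID and LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY. The helper client_kwargs_from_env is hypothetical and not part of the SDK.

```python
import os

# Assumed lakectl-style variable names; adjust them to match your environment.
ENV_TO_KWARG = {
    "LAKECTL_SERVER_ENDPOINT_URL": "host",
    "LAKECTL_CREDENTIALS_ACCESS_KEY_ID": "username",
    "LAKECTL_CREDENTIALS_SECRET_ACCESS_KEY": "password",
}


def client_kwargs_from_env(env=None):
    """Build keyword arguments for Client() from environment variables.

    Raises KeyError naming every missing variable, so a misconfiguration
    is reported in one go.
    """
    env = os.environ if env is None else env
    missing = [name for name in ENV_TO_KWARG if not env.get(name)]
    if missing:
        raise KeyError("missing environment variables: " + ", ".join(missing))
    return {kwarg: env[name] for name, kwarg in ENV_TO_KWARG.items()}


# Usage (requires the lakefs package and a reachable server):
#   from lakefs.client import Client
#   clt = Client(**client_kwargs_from_env())
```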

For testing SSL endpoints you may wish to use a self-signed certificate. If you do this and receive an SSL: CERTIFICATE_VERIFY_FAILED error, you can add the following configuration to your client:

clt = Client(
    host="http://localhost:8000",
    username="AKIAIOSFODNN7EXAMPLE",
    password="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    verify_ssl=False,
)

This setting allows well-known “man-in-the-middle”, impersonation, and credential stealing attacks. Never use this in any production setting.
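One way to keep this setting out of production is to prefer a CA bundle and to allow verify_ssl=False only for loopback hosts. The helper below, tls_options, is a hypothetical sketch and not part of the SDK; verify_ssl and ssl_ca_cert are the client parameters used in this section's examples.

```python
from urllib.parse import urlparse

LOOPBACK_HOSTS = {"localhost", "127.0.0.1", "::1"}


def tls_options(host, ca_cert_path=None, allow_insecure=False):
    """Return extra Client() keyword arguments controlling TLS verification."""
    if ca_cert_path:
        # Preferred: keep verification on and trust the self-signed CA.
        return {"ssl_ca_cert": ca_cert_path}
    if allow_insecure:
        hostname = urlparse(host).hostname
        if hostname not in LOOPBACK_HOSTS:
            raise ValueError(f"refusing to disable TLS verification for {hostname!r}")
        return {"verify_ssl": False}
    return {}  # leave verification at the client's defaults


# Usage (requires the lakefs package):
#   clt = Client(host=host, username=..., password=..., **tls_options(host, allow_insecure=True))
```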

Optionally, to enable communication via proxies, simply set the proxy configuration:

clt = Client(
    host="http://localhost:8000",
    username="AKIAIOSFODNN7EXAMPLE",
    password="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
    ssl_ca_cert="<path to a file of concatenated CA certificates in PEM format>",  # Set this to customize the certificate file to verify the peer
    proxy="<proxy server URL>",
)

Usage Examples

Let’s see how we can interact with lakeFS using the High Level SDK.

Creating a repository

import lakefs

repo = lakefs.repository("example-repo").create(storage_namespace="s3://storage-bucket/repos/example-repo")
print(repo)

If using an explicit client, create the Repository object and pass the client to it (note the changed syntax).

import lakefs
from lakefs.client import Client

clt = Client(
    host="http://localhost:8000",
    username="AKIAIOSFODNN7EXAMPLE",
    password="wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
)

repo = lakefs.Repository("example-repo", client=clt).create(storage_namespace="s3://storage-bucket/repos/example-repo")
print(repo)

Output

{id: 'example-repo', creation_date: 1697815536, default_branch: 'main', storage_namespace: 's3://storage-bucket/repos/example-repo'}

List repositories

import lakefs

print("Listing repositories:")
for repo in lakefs.repositories():
    print(repo)

Output

Listing repositories:
{id: 'example-repo', creation_date: 1697815536, default_branch: 'main', storage_namespace: 's3://storage-bucket/repos/example-repo'}

Creating a branch

import lakefs

branch1 = lakefs.repository("example-repo").branch("experiment1").create(source_reference_id="main")
print("experiment1 ref:", branch1.get_commit().id)

branch2 = lakefs.repository("example-repo").branch("experiment2").create(source_reference_id="main")
print("experiment2 ref:", branch2.get_commit().id)

Output

experiment1 ref: 7a300b41a8e1ca666c653171a364c08f640549c24d7e82b401bf077c646f8859
experiment2 ref: 7a300b41a8e1ca666c653171a364c08f640549c24d7e82b401bf077c646f8859

List branches

import lakefs

for branch in lakefs.repository("example-repo").branches():
    print(branch)

Output

experiment1
experiment2
main

IO

Great, now let’s see some IO operations in action!
The new High Level SDK provides IO semantics that let you work with lakeFS objects as if they were files in your filesystem. This is extremely useful when working with data transformation packages that accept file descriptors and streams.

Upload

A simple way to upload data is the upload method, which accepts contents as str or bytes:

obj = branch1.object(path="text/sample_data.txt").upload(content_type="text/plain", data="This is my object data")
print(obj.stats())

Output

{'path': 'text/sample_data.txt', 'physical_address': 's3://storage-bucket/repos/example-repo/data/gke0ignnl531fa6k90p0/ckpfk4fnl531fa6k90pg', 'physical_address_expiry': None, 'checksum': '4a09d10820234a95bb548f14e4435bba', 'size_bytes': 15, 'mtime': 1701865289, 'metadata': {}, 'content_type': 'text/plain'}

Reading the data is just as simple:

print(obj.reader(mode='r').read())

Output

This is my object data

Now let’s generate a “sample_data.csv” file and write it directly to a lakeFS writer object:

import csv

sample_data = [
    [1, "Alice", "alice@example.com"],
    [2, "Bob", "bob@example.com"],
    [3, "Carol", "carol@example.com"],
]

obj = branch1.object(path="csv/sample_data.csv")

with obj.writer(mode='w', pre_sign=True, content_type="text/csv") as fd:
    writer = csv.writer(fd)
    writer.writerow(["ID", "Name", "Email"])
    for row in sample_data:
        writer.writerow(row)

On context exit, the object is uploaded to lakeFS:

print(obj.stats())

Output

{'path': 'csv/sample_data.csv', 'physical_address': 's3://storage-bucket/repos/example-repo/data/gke0ignnl531fa6k90p0/ckpfk4fnl531fa6k90pg', 'physical_address_expiry': None, 'checksum': 'f181262c138901a74d47652d5ea72295', 'size_bytes': 88, 'mtime': 1701865939, 'metadata': {}, 'content_type': 'text/csv'}
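Because the writer behaves like a text file, the CSV logic can live in a function that knows nothing about lakeFS. The sketch below uses a hypothetical helper, write_rows, with an in-memory io.StringIO as a stand-in so that it runs without a server. The same function could be handed the fd returned by obj.writer(mode='w').

```python
import csv
import io


def write_rows(fd, header, rows):
    """Write a header and rows as CSV to any text file-like object."""
    writer = csv.writer(fd)
    writer.writerow(header)
    writer.writerows(rows)
    return len(rows) + 1  # number of records written, header included


buffer = io.StringIO()  # stand-in for the fd returned by obj.writer(mode='w')
lines = write_rows(buffer, ["ID", "Name", "Email"], [[1, "Alice", "alice@example.com"]])
print(lines, repr(buffer.getvalue()))
```

Keeping the serialization separate from the storage call also makes it easy to unit test the transformation without touching a repository.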

We can also upload raw byte contents:

obj = branch1.object(path="raw/file1.data").upload(data=b"Hello Object World", pre_sign=True)
print(obj.stats())

Output

{'path': 'raw/file1.data', 'physical_address': 's3://storage-bucket/repos/example-repo/data/gke0ignnl531fa6k90p0/ckpfltvnl531fa6k90q0', 'physical_address_expiry': None, 'checksum': '0ef432f8eb0305f730b0c57bbd7a6b08', 'size_bytes': 18, 'mtime': 1701866323, 'metadata': {}, 'content_type': 'application/octet-stream'}

Uncommitted changes

Using the branch uncommitted method will show all the uncommitted changes on that branch:

for diff in branch1.uncommitted():
    print(diff)

Output

{'type': 'added', 'path': 'text/sample_data.txt', 'path_type': 'object', 'size_bytes': 15}
{'type': 'added', 'path': 'csv/sample_data.csv', 'path_type': 'object', 'size_bytes': 88}
{'type': 'added', 'path': 'raw/file1.data', 'path_type': 'object', 'size_bytes': 18}
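Before committing, it can help to summarize what is about to be recorded. The sketch below assumes each change exposes type and size_bytes attributes, matching the fields printed above. The Change tuple is a local stand-in rather than the SDK's own class, and summarize_changes is a hypothetical helper.

```python
from collections import Counter, namedtuple

# Local stand-in mirroring the fields printed above; not the SDK's own type.
Change = namedtuple("Change", ["type", "path", "path_type", "size_bytes"])


def summarize_changes(changes):
    """Return (count per change type, total bytes) for an iterable of changes."""
    counts = Counter()
    total_bytes = 0
    for change in changes:
        counts[change.type] += 1
        total_bytes += change.size_bytes or 0  # tolerate entries without a size
    return dict(counts), total_bytes


sample = [
    Change("added", "text/sample_data.txt", "object", 15),
    Change("added", "csv/sample_data.csv", "object", 88),
    Change("added", "raw/file1.data", "object", 18),
]
print(summarize_changes(sample))
```

With a live branch, summarize_changes(branch1.uncommitted()) would apply the same logic, provided the attribute names hold.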

As expected, our changes appear here. Let’s commit them and attach some arbitrary metadata:

ref = branch1.commit(message='Add some data!', metadata={'using': 'python_sdk'})
print(ref.get_commit())

Output

{'id': 'c4666db80d2a984b4eab8ce02b6a60830767eba53995c26350e0ad994e15fedb', 'parents': ['a7a092a5a32a2cd97f22abcc99414f6283d29f6b9dd2725ce89f90188c5901e5'], 'committer': 'admin', 'message': 'Add some data!', 'creation_date': 1701866838, 'meta_range_id': '999bedeab1b740f83d2cf8c52548d55446f9038c69724d399adc4438412cade2', 'metadata': {'using': 'python_sdk'}}

Calling uncommitted again on the same branch should now show no uncommitted changes:

print(len(list(branch1.uncommitted())))

Output

0

Merging changes from a branch into main

Let’s diff between your branch and the main branch:

main = repo.branch("main")
for diff in main.diff(other_ref=branch1):
    print(diff)

Output

{'type': 'added', 'path': 'text/sample_data.txt', 'path_type': 'object', 'size_bytes': 15}
{'type': 'added', 'path': 'csv/sample_data.csv', 'path_type': 'object', 'size_bytes': 88}
{'type': 'added', 'path': 'raw/file1.data', 'path_type': 'object', 'size_bytes': 18}

Looks like we have some changes. Let’s merge them:

res = branch1.merge_into(main)
print(res)
# output:
# cfddb68b7265ae0b17fafa1a2068f8414395e0a8b8bc0f8d741cbcce1e67e394

Let’s diff again - there should be no changes as all changes are on our main branch already:

print(len(list(main.diff(other_ref=branch1))))

Output

0

Read data from main branch

import csv

obj = main.object(path="csv/sample_data.csv")

for row in csv.reader(obj.reader(mode='r')):
    print(row)

Output

['ID', 'Name', 'Email']
['1', 'Alice', 'alice@example.com']
['2', 'Bob', 'bob@example.com']
['3', 'Carol', 'carol@example.com']

Importing data into lakeFS

The new SDK makes it much easier to import existing data from the object store into lakeFS, using the new ImportManager:

import lakefs

branch = lakefs.repository("example-repo").branch("experiment3")

# We can import data from multiple sources in a single import process.
# The following example initializes a new ImportManager and adds two source types: a prefix and an object.
importer = branch.import_data(commit_message="added public S3 data") \
    .prefix("s3://example-bucket1/path1/", destination="datasets/path1/") \
    .object("s3://example-bucket1/path2/imported_obj", destination="datasets/path2/imported_obj")

# run() is a convenience method that blocks until the import is reported as done, raising an exception if it fails.
importer.run()

Alternatively, we can call start() and status() ourselves for an async version of the above:

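import time

# Async version
importer.start()
status = importer.status()

# Poll until the import completes or reports an error
while not status.completed and status.error is None:
    time.sleep(3)  # or whatever interval you choose
    status = importer.status()

if status.error:
    # handle the failure as appropriate for your application
    raise RuntimeError(f"import failed: {status.error}")

print(f"imported a total of {status.ingested_objects} objects!")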

Output

imported a total of 25478 objects!

Transactions

Transactions are a new feature in the High Level SDK. They allow performing a sequence of operations on a branch as an atomic unit, similar to how database transactions work. Under the hood, the transaction creates an ephemeral branch from the source branch, performs all the operations on that branch, and merges it back to the source branch once the transaction is completed. Transactions are currently supported as a context manager only.

import lakefs

branch = lakefs.repository("example-repo").branch("experiment3")

with branch.transact(commit_message="my transaction") as tx:
    for obj in tx.objects(prefix="prefix_to_delete/"):  # Delete some objects
        obj.delete()

    # Create new object
    tx.object("new_object").upload("new object data")

print(len(list(branch.objects(prefix="prefix_to_delete/"))))
print(branch.object("new_object").exists())

Output

0
True
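To make the mechanism concrete, here is a toy in-memory model of the branch-then-merge idea described above. It is not the SDK's implementation: a dict stands in for a branch, and toy_transaction is a hypothetical name. It only illustrates why either all of the operations land on the source branch or none do.

```python
from contextlib import contextmanager


@contextmanager
def toy_transaction(branch):
    """Yield a scratch copy of `branch`; apply it back only on a clean exit."""
    scratch = dict(branch)  # plays the role of the ephemeral branch
    yield scratch  # an exception raised in the with-body propagates from here
    # Reached only if the with-body raised nothing: "merge" the scratch copy back.
    branch.clear()
    branch.update(scratch)


branch = {"prefix_to_delete/a": "old", "keep": "data"}

with toy_transaction(branch) as tx:
    del tx["prefix_to_delete/a"]
    tx["new_object"] = "new object data"

try:
    with toy_transaction(branch) as tx:
        tx["half_done"] = "x"
        raise ValueError("simulated failure")
except ValueError:
    pass  # the scratch copy is dropped; `branch` is unchanged

print(sorted(branch))
```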

Python SDK documentation and API reference

For the documentation of lakeFS’s Python package and the full API reference, see https://pydocs-lakefs.lakefs.io

Using Boto

💡 To use Boto with lakeFS alongside S3, check out Boto S3 Router. It will route requests to either S3 or lakeFS according to the provided bucket name.

lakeFS exposes an S3-compatible API, so you can use Boto to interact with your objects on lakeFS.

Initializing

Create a Boto3 S3 client with your lakeFS endpoint and key-pair:

import boto3
s3 = boto3.client('s3',
    endpoint_url='https://lakefs.example.com',
    aws_access_key_id='AKIAIOSFODNN7EXAMPLE',
    aws_secret_access_key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY')

The client is now configured to operate on your lakeFS installation.

Usage Examples

Put an object into lakeFS

Use a branch name and a path to put an object in lakeFS:

with open('/local/path/to/file_0', 'rb') as f:
    s3.put_object(Body=f, Bucket='example-repo', Key='main/example-file.parquet')

You can now commit this change using the lakeFS UI or CLI.
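In these Boto calls the repository plays the role of the bucket, and the key starts with a branch name or commit ID followed by the object path. The two hypothetical helpers below build and split such keys. They are plain string handling, not part of Boto or lakeFS, and they assume the ref itself contains no slash.

```python
def lakefs_key(ref, path):
    """Join a branch name or commit ID with an object path into an S3 key."""
    if not ref or "/" in ref:
        # Assumption: refs never contain a slash.
        raise ValueError(f"invalid ref: {ref!r}")
    return f"{ref}/{path.lstrip('/')}"


def split_lakefs_key(key):
    """Split an S3 key into (ref, path); the ref is the first path segment."""
    ref, _, path = key.partition("/")
    return ref, path


print(lakefs_key("main", "example-file.parquet"))
print(split_lakefs_key("c7a632d74f/example-prefix/part-0.parquet"))
```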

List objects

List the branch objects starting with a prefix:

list_resp = s3.list_objects_v2(Bucket='example-repo', Prefix='main/example-prefix')
for obj in list_resp['Contents']:
    print(obj['Key'])

Or, use a lakeFS commit ID to list objects for a specific commit:

list_resp = s3.list_objects_v2(Bucket='example-repo', Prefix='c7a632d74f/example-prefix')
for obj in list_resp['Contents']:
    print(obj['Key'])
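When nothing matches the prefix, boto3 list responses omit the Contents key, so indexing it directly raises KeyError. A defensive sketch, using a hypothetical helper keys_from_listing and plain dicts in place of real responses:

```python
def keys_from_listing(list_resp):
    """Return the object keys in a list_objects_v2 response, or [] if none."""
    return [obj["Key"] for obj in list_resp.get("Contents", [])]


print(keys_from_listing({"KeyCount": 0}))
print(keys_from_listing({"Contents": [{"Key": "main/example-prefix/a.parquet"}]}))
```

For prefixes that span more than one page of results, boto3's get_paginator('list_objects_v2') can feed each page through the same helper.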

Get object metadata

Get object metadata using branch and path:

s3.head_object(Bucket='example-repo', Key='main/example-file.parquet')
# output:
# {'ResponseMetadata': {'RequestId': '72A9EBD1210E90FA',
#  'HostId': '',
#  'HTTPStatusCode': 200,
#  'HTTPHeaders': {'accept-ranges': 'bytes',
#   'content-length': '1024',
#   'etag': '"2398bc5880e535c61f7624ad6f138d62"',
#   'last-modified': 'Sun, 24 May 2020 10:42:24 GMT',
#   'x-amz-request-id': '72A9EBD1210E90FA',
#   'date': 'Sun, 24 May 2020 10:45:42 GMT'},
#  'RetryAttempts': 0},
# 'AcceptRanges': 'bytes',
# 'LastModified': datetime.datetime(2020, 5, 24, 10, 42, 24, tzinfo=tzutc()),
# 'ContentLength': 1024,
# 'ETag': '"2398bc5880e535c61f7624ad6f138d62"',
# 'Metadata': {}}