Using Boto3 with lakeFS S3 Gateway

lakeFS exposes an S3-compatible API through its S3 Gateway, so you can use Boto3 (the AWS SDK for Python) directly against lakeFS. This makes it a good fit for existing S3 workflows and applications.

Info

To use Boto3 with lakeFS alongside S3, check out Boto S3 Router. It routes each request to either S3 or lakeFS according to the provided bucket name.
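
The snippet below is only a conceptual sketch of that routing idea, not the Boto S3 Router API: a hypothetical helper that picks between two plain Boto3 clients based on the bucket name.

import boto3

# Conceptual sketch only -- not the Boto S3 Router API.
# Two ordinary boto3 clients: one for AWS S3, one for lakeFS.
aws_s3 = boto3.client('s3', region_name='us-east-1')
lakefs_s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)

# Hypothetical routing rule: treat these bucket names as lakeFS repositories
LAKEFS_REPOS = {'my-repo'}

def client_for(bucket):
    """Return the client that should handle requests for this bucket."""
    return lakefs_s3 if bucket in LAKEFS_REPOS else aws_s3

# Example: client_for('my-repo').get_object(Bucket='my-repo', Key='main/data/file.csv')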

When to Use

Use Boto3 with lakeFS when you:

  • Have existing S3 workflows you want to use with lakeFS
  • Need S3-compatible operations (put, get, list, delete)
  • Work with legacy S3 applications
  • Want to migrate from S3 without code changes

For versioning-focused workflows, use the High-Level SDK or lakefs-spec.

Installation

Install Boto3 using pip:

pip install boto3

Or upgrade to the latest version:

pip install --upgrade boto3
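
Because the checksum behavior described below depends on which Boto3 and botocore releases you have, it can help to confirm the installed versions first:

import boto3
import botocore

# Print the installed SDK versions (relevant to the checksum note below)
print(f"boto3: {boto3.__version__}")
print(f"botocore: {botocore.__version__}")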

Basic Setup

Initializing Boto3 Client

import boto3

# Create S3 client pointing to lakeFS
s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key',
    region_name='us-east-1'
)

print("Client initialized")

Checksum Configuration

In newer versions of Boto3, requests over HTTPS may fail with an AccessDenied error while the lakeFS logs show encoding/hex: invalid byte: U+0053 'S'. This is caused by the SDK's default request checksum calculation; configure checksums to be computed only when required, as shown below.

Configuring Checksum Settings

import boto3
from botocore.config import Config

# Configure checksum settings
config = Config(
    request_checksum_calculation='when_required',
    response_checksum_validation='when_required'
)

s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key',
    config=config
)

print("Client with checksum configuration initialized")

Basic Operations

Uploading Objects

import boto3

s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)

# Upload from bytes
data = b"Hello, lakeFS!"
s3.put_object(
    Bucket='my-repo',
    Key='main/data/hello.txt',
    Body=data
)

# Upload from file
with open('local_file.csv', 'rb') as f:
    s3.put_object(
        Bucket='my-repo',
        Key='main/data/imported.csv',
        Body=f
    )

# Upload with metadata
s3.put_object(
    Bucket='my-repo',
    Key='main/data/data.csv',
    Body=b'id,name\n1,Alice\n2,Bob',
    Metadata={
        'owner': 'data-team',
        'version': '1.0'
    }
)

print("Upload complete")

Downloading Objects

import boto3

s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)

# Download entire object
response = s3.get_object(
    Bucket='my-repo',
    Key='main/data/data.csv'
)
data = response['Body'].read()
print(f"Downloaded {len(data)} bytes")

# Download to file
s3.download_file(
    Bucket='my-repo',
    Key='main/data/large_file.parquet',
    Filename='local_file.parquet'
)

# Stream download in 1 MiB chunks (for large files)
response = s3.get_object(Bucket='my-repo', Key='main/data/large.csv')
for chunk in iter(lambda: response['Body'].read(1024 * 1024), b''):
    print(f"Read {len(chunk)} bytes")  # replace with your own chunk handling

Listing Objects

import boto3

s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)

# List objects in branch
response = s3.list_objects_v2(
    Bucket='my-repo',
    Prefix='main/data/'
)

for obj in response.get('Contents', []):
    print(f"{obj['Key']} ({obj['Size']} bytes)")

# List objects at commit
response = s3.list_objects_v2(
    Bucket='my-repo',
    Prefix='abc123def456/data/'
)

for obj in response.get('Contents', []):
    print(f"{obj['Key']}")

Getting Object Metadata

import boto3

s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)

# Head object
response = s3.head_object(
    Bucket='my-repo',
    Key='main/data/file.csv'
)

print(f"Content Type: {response.get('ContentType')}")
print(f"Content Length: {response.get('ContentLength')}")
print(f"Last Modified: {response.get('LastModified')}")
print(f"Metadata: {response.get('Metadata')}")

Deleting Objects

import boto3

s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)

# Delete single object
s3.delete_object(
    Bucket='my-repo',
    Key='main/data/temp.txt'
)

# Delete multiple objects
s3.delete_objects(
    Bucket='my-repo',
    Delete={
        'Objects': [
            {'Key': 'main/data/file1.txt'},
            {'Key': 'main/data/file2.txt'},
            {'Key': 'main/data/file3.txt'}
        ]
    }
)

print("Delete complete")

Real-World Workflows

ETL with S3-Like Operations

import boto3
import csv
import io

def etl_pipeline():
    s3 = boto3.client(
        's3',
        endpoint_url='https://example.lakefs.io',
        aws_access_key_id='your-access-key',
        aws_secret_access_key='your-secret-key'
    )

    # Extract: Read from source
    response = s3.get_object(Bucket='my-repo', Key='main/raw/input.csv')
    input_data = response['Body'].read().decode()

    # Transform: Process data
    reader = csv.DictReader(io.StringIO(input_data))
    rows = list(reader)

    # Clean: Remove duplicates
    unique_rows = {row['id']: row for row in rows}.values()

    # Load: Write processed data
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=['id', 'name', 'value'])
    writer.writeheader()
    writer.writerows(unique_rows)

    s3.put_object(
        Bucket='my-repo',
        Key='main/processed/output.csv',
        Body=output.getvalue().encode('utf-8')
    )

    print(f"ETL complete: {len(unique_rows)} unique records")

etl_pipeline()

Backup and Sync

import boto3
from pathlib import Path

def backup_to_lakeFS(local_dir, repo, branch, prefix):
    """Backup local directory to lakeFS"""
    s3 = boto3.client(
        's3',
        endpoint_url='https://example.lakefs.io',
        aws_access_key_id='your-access-key',
        aws_secret_access_key='your-secret-key'
    )

    count = 0
    for local_file in Path(local_dir).rglob('*'):
        if local_file.is_file():
            # Calculate remote path
            rel_path = local_file.relative_to(local_dir)
            remote_path = f"{branch}/{prefix}/{rel_path}".replace("\\", "/")

            # Upload
            with open(local_file, 'rb') as f:
                s3.put_object(
                    Bucket=repo,
                    Key=remote_path,
                    Body=f
                )
            count += 1

            if count % 100 == 0:
                print(f"Backed up {count} files...")

    print(f"Backup complete: {count} files uploaded")

# Usage:
# backup_to_lakeFS("/path/to/local/data", "my-repo", "main", "backups/2024-01")

Copy from S3 to lakeFS

import boto3

def migrate_s3_to_lakefs(s3_bucket, prefix, repo, branch):
    """Migrate data from S3 to lakeFS"""
    # Connect to S3
    s3_source = boto3.client(
        's3',
        region_name='us-east-1'
    )

    # Connect to lakeFS
    s3_dest = boto3.client(
        's3',
        endpoint_url='https://example.lakefs.io',
        aws_access_key_id='your-access-key',
        aws_secret_access_key='your-secret-key'
    )

    # List objects
    paginator = s3_source.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=s3_bucket, Prefix=prefix)

    count = 0
    for page in pages:
        for obj in page.get('Contents', []):
            # Download from S3
            response = s3_source.get_object(
                Bucket=s3_bucket,
                Key=obj['Key']
            )
            data = response['Body'].read()

            # Upload to lakeFS
            s3_dest.put_object(
                Bucket=repo,
                Key=f"{branch}/{obj['Key']}",
                Body=data
            )

            count += 1
            if count % 100 == 0:
                print(f"Migrated {count} objects...")

    print(f"Migration complete: {count} objects")

# Usage:
# migrate_s3_to_lakefs("my-s3-bucket", "data/", "my-repo", "main")

Version-Specific Access

import boto3

def read_from_commit(repo, commit_id, key):
    """Read object from specific commit"""
    s3 = boto3.client(
        's3',
        endpoint_url='https://example.lakefs.io',
        aws_access_key_id='your-access-key',
        aws_secret_access_key='your-secret-key'
    )

    # Use commit ID as prefix
    response = s3.get_object(
        Bucket=repo,
        Key=f"{commit_id}/{key}"
    )

    return response['Body'].read()

# Usage:
# data = read_from_commit("my-repo", "abc123def456", "data/file.csv")

Error Handling

import boto3
from botocore.exceptions import ClientError

s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)

try:
    s3.put_object(
        Bucket='my-repo',
        Key='main/data/file.txt',
        Body=b'data'
    )
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'AccessDenied':
        print("Access denied - check credentials or permissions")
    elif error_code == 'NoSuchBucket':
        print("Bucket not found - check repository name")
    else:
        print(f"Error: {error_code}")

Further Resources