Using Boto3 with lakeFS S3 Gateway¶
lakeFS exposes an S3-compatible API through its S3 Gateway, allowing you to use Boto3 (the AWS SDK for Python) directly with lakeFS. This makes it a good fit for existing S3-based workflows and applications.
Info
To use Boto with lakeFS alongside S3, check out Boto S3 Router. It routes each request to either S3 or lakeFS according to the provided bucket name.
When to Use¶
Use Boto with lakeFS when you:
- Have existing S3 workflows you want to use with lakeFS
- Need S3-compatible operations (put, get, list, delete)
- Work with legacy S3 applications
- Want to migrate from S3 with minimal code changes
For versioning-focused workflows, use the High-Level SDK or lakefs-spec.
Installation¶
Install Boto3 using pip:
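pip install boto3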
Or upgrade to the latest version:
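pip install --upgrade boto3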
Basic Setup¶
Initializing Boto3 Client¶
import boto3
# Create S3 client pointing to lakeFS
s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key',
    region_name='us-east-1'
)
print("Client initialized")
Checksum Configuration¶
In newer versions of Boto3, when using HTTPS you might encounter an AccessDenied error, with the lakeFS logs showing encoding/hex: invalid byte: U+0053 'S'. This happens because recent Boto3 releases calculate request checksums by default, which the S3 Gateway does not expect; configuring the checksum behavior resolves it.
Configuring Checksum Settings¶
import boto3
from botocore.config import Config
# Configure checksum settings
config = Config(
    request_checksum_calculation='when_required',
    response_checksum_validation='when_required'
)
s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key',
    config=config
)
print("Client with checksum configuration initialized")
Basic Operations¶
Uploading Objects¶
import boto3
s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)
# Upload from bytes
data = b"Hello, lakeFS!"
s3.put_object(
    Bucket='my-repo',
    Key='main/data/hello.txt',
    Body=data
)
# Upload from file
with open('local_file.csv', 'rb') as f:
    s3.put_object(
        Bucket='my-repo',
        Key='main/data/imported.csv',
        Body=f
    )
# Upload with metadata
s3.put_object(
    Bucket='my-repo',
    Key='main/data/data.csv',
    Body=b'id,name\n1,Alice\n2,Bob',
    Metadata={
        'owner': 'data-team',
        'version': '1.0'
    }
)
print("Upload complete")
Downloading Objects¶
import boto3
s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)
# Download entire object
response = s3.get_object(
    Bucket='my-repo',
    Key='main/data/data.csv'
)
data = response['Body'].read()
print(f"Downloaded {len(data)} bytes")
# Download to file
s3.download_file(
    Bucket='my-repo',
    Key='main/data/large_file.parquet',
    Filename='local_file.parquet'
)
# Stream download (for large files); process_chunk is a placeholder for your own handler
response = s3.get_object(Bucket='my-repo', Key='main/data/large.csv')
for chunk in iter(lambda: response['Body'].read(1024), b''):
    process_chunk(chunk)
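You can also fetch just part of an object with a ranged GET, which the gateway should serve the same way S3 does; a minimal sketch reading the first kilobyte, reusing the client above:
# Ranged read: fetch only the first 1024 bytes of the object
response = s3.get_object(
    Bucket='my-repo',
    Key='main/data/large.csv',
    Range='bytes=0-1023'
)
partial = response['Body'].read()
print(f"Read {len(partial)} bytes")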
Listing Objects¶
import boto3
s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)
# List objects in branch
response = s3.list_objects_v2(
    Bucket='my-repo',
    Prefix='main/data/'
)
for obj in response.get('Contents', []):
    print(f"{obj['Key']} ({obj['Size']} bytes)")
# List objects at commit
response = s3.list_objects_v2(
    Bucket='my-repo',
    Prefix='abc123def456/data/'
)
for obj in response.get('Contents', []):
    print(f"{obj['Key']}")
Getting Object Metadata¶
import boto3
s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)
# Head object
response = s3.head_object(
    Bucket='my-repo',
    Key='main/data/file.csv'
)
print(f"Content Type: {response.get('ContentType')}")
print(f"Content Length: {response.get('ContentLength')}")
print(f"Last Modified: {response.get('LastModified')}")
print(f"Metadata: {response.get('Metadata')}")
Deleting Objects¶
import boto3
s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)
# Delete single object
s3.delete_object(
    Bucket='my-repo',
    Key='main/data/temp.txt'
)
# Delete multiple objects
s3.delete_objects(
    Bucket='my-repo',
    Delete={
        'Objects': [
            {'Key': 'main/data/file1.txt'},
            {'Key': 'main/data/file2.txt'},
            {'Key': 'main/data/file3.txt'}
        ]
    }
)
print("Delete complete")
Real-World Workflows¶
ETL with S3-Like Operations¶
import boto3
import csv
import io
def etl_pipeline():
    s3 = boto3.client(
        's3',
        endpoint_url='https://example.lakefs.io',
        aws_access_key_id='your-access-key',
        aws_secret_access_key='your-secret-key'
    )
    # Extract: Read from source
    response = s3.get_object(Bucket='my-repo', Key='main/raw/input.csv')
    input_data = response['Body'].read().decode()
    # Transform: Process data
    reader = csv.DictReader(io.StringIO(input_data))
    rows = list(reader)
    # Clean: Remove duplicates
    unique_rows = {row['id']: row for row in rows}.values()
    # Load: Write processed data
    output = io.StringIO()
    writer = csv.DictWriter(output, fieldnames=['id', 'name', 'value'])
    writer.writeheader()
    writer.writerows(unique_rows)
    s3.put_object(
        Bucket='my-repo',
        Key='main/processed/output.csv',
        Body=output.getvalue()
    )
    print(f"ETL complete: {len(unique_rows)} unique records")
etl_pipeline()
Backup and Sync¶
import boto3
import os
from pathlib import Path
def backup_to_lakeFS(local_dir, repo, branch, prefix):
    """Backup local directory to lakeFS"""
    s3 = boto3.client(
        's3',
        endpoint_url='https://example.lakefs.io',
        aws_access_key_id='your-access-key',
        aws_secret_access_key='your-secret-key'
    )
    count = 0
    for local_file in Path(local_dir).rglob('*'):
        if local_file.is_file():
            # Calculate remote path
            rel_path = local_file.relative_to(local_dir)
            remote_path = f"{branch}/{prefix}/{rel_path}".replace("\\", "/")
            # Upload
            with open(local_file, 'rb') as f:
                s3.put_object(
                    Bucket=repo,
                    Key=remote_path,
                    Body=f
                )
            count += 1
            if count % 100 == 0:
                print(f"Backed up {count} files...")
    print(f"Backup complete: {count} files uploaded")
# Usage:
# backup_to_lakeFS("/path/to/local/data", "my-repo", "main", "backups/2024-01")
Copy from S3 to lakeFS¶
import boto3
def migrate_s3_to_lakefs(s3_bucket, prefix, repo, branch):
    """Migrate data from S3 to lakeFS"""
    # Connect to S3
    s3_source = boto3.client(
        's3',
        region_name='us-east-1'
    )
    # Connect to lakeFS
    s3_dest = boto3.client(
        's3',
        endpoint_url='https://example.lakefs.io',
        aws_access_key_id='your-access-key',
        aws_secret_access_key='your-secret-key'
    )
    # List objects
    paginator = s3_source.get_paginator('list_objects_v2')
    pages = paginator.paginate(Bucket=s3_bucket, Prefix=prefix)
    count = 0
    for page in pages:
        for obj in page.get('Contents', []):
            # Download from S3
            response = s3_source.get_object(
                Bucket=s3_bucket,
                Key=obj['Key']
            )
            data = response['Body'].read()
            # Upload to lakeFS
            s3_dest.put_object(
                Bucket=repo,
                Key=f"{branch}/{obj['Key']}",
                Body=data
            )
            count += 1
            if count % 100 == 0:
                print(f"Migrated {count} objects...")
    print(f"Migration complete: {count} objects")
# Usage:
# migrate_s3_to_lakefs("my-s3-bucket", "data/", "my-repo", "main")
Version-Specific Access¶
import boto3
def read_from_commit(repo, commit_id, key):
    """Read object from specific commit"""
    s3 = boto3.client(
        's3',
        endpoint_url='https://example.lakefs.io',
        aws_access_key_id='your-access-key',
        aws_secret_access_key='your-secret-key'
    )
    # Use commit ID as prefix
    response = s3.get_object(
        Bucket=repo,
        Key=f"{commit_id}/{key}"
    )
    return response['Body'].read()
# Usage:
# data = read_from_commit("my-repo", "abc123def456", "data/file.csv")
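Because refs are addressed through the key prefix, a standard CopyObject call can copy data between branches in the same repository, assuming your lakeFS version supports CopyObject through the S3 Gateway; a minimal sketch copying a file from main to a dev branch (branch and key names are examples):
import boto3
s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)
# Server-side copy from the main branch to the dev branch
s3.copy_object(
    Bucket='my-repo',
    Key='dev/data/file.csv',
    CopySource={'Bucket': 'my-repo', 'Key': 'main/data/file.csv'}
)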
Error Handling¶
import boto3
from botocore.exceptions import ClientError
s3 = boto3.client(
    's3',
    endpoint_url='https://example.lakefs.io',
    aws_access_key_id='your-access-key',
    aws_secret_access_key='your-secret-key'
)
try:
    s3.put_object(
        Bucket='my-repo',
        Key='main/data/file.txt',
        Body=b'data'
    )
except ClientError as e:
    error_code = e.response['Error']['Code']
    if error_code == 'AccessDenied':
        print("Access denied - check credentials or permissions")
    elif error_code == 'NoSuchBucket':
        print("Bucket not found - check repository name")
    else:
        print(f"Error: {error_code}")
Further Resources¶
- lakeFS S3 Gateway - S3 Gateway API documentation
- Boto3 Documentation - Official Boto3 reference