Skip to content

Working with Objects & Data Operations

This guide covers object operations in lakeFS, including uploading, downloading, batch operations, and metadata management.

Basic Object Operations

Uploading Objects

Upload data to lakeFS:

import lakefs

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

# Plain bytes payload, no explicit content type
text_obj = branch.object("data/simple.txt")
text_obj.upload(data=b"Hello, lakeFS!")

# JSON payload tagged with its content type
json_obj = branch.object("data/data.json")
json_obj.upload(data=b'{"key": "value"}', content_type="application/json")

# Multi-row CSV payload, assembled from its individual lines
csv_lines = [b"id,name,value", b"1,Alice,100", b"2,Bob,200", b"3,Carol,300"]
csv_data = b"\n".join(csv_lines)
branch.object("data/records.csv").upload(data=csv_data)

print("Objects uploaded successfully")

Downloading Objects

Read object data from lakeFS:

import lakefs

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

# Text mode read
text_obj = branch.object("data/simple.txt")
with text_obj.reader(mode='r') as text_stream:
    content = text_stream.read()
    print(f"Content: {content}")

# Binary mode read
json_obj = branch.object("data/data.json")
with json_obj.reader(mode='rb') as binary_stream:
    binary_content = binary_stream.read()
    print(f"Binary size: {len(binary_content)} bytes")

# Parse the CSV object row by row, keyed by its header line
import csv
import io

with branch.object("data/records.csv").reader(mode='r') as csv_stream:
    for record in csv.DictReader(csv_stream):
        print(f"  {record['name']}: {record['value']}")

Object Information & Metadata

Get object details and metadata:

import lakefs

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

obj = branch.object("data/records.csv")

# Check if object exists.
# exists() answers with a boolean, so branch on its result. A bare `except:`
# here would print nothing when the object is missing and would mislabel
# unrelated failures (authentication, network) as "Object not found".
if obj.exists():
    print("Object exists")
else:
    print("Object not found")

# Get object statistics
stat = obj.stat()
print(f"Size: {stat.size_bytes} bytes")
print(f"Modified: {stat.mtime}")
print(f"Checksum: {stat.checksum}")
print(f"Content Type: {stat.content_type}")
print(f"Path: {stat.path}")

Deleting Objects

Remove objects from lakeFS:

import lakefs

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

# Remove one object from the branch
temp_obj = branch.object("data/temp_file.txt")
temp_obj.delete()
print("Object deleted")

# Deleting again targets a path that was just removed; report the failure
# instead of letting the exception propagate
try:
    temp_obj.delete()
except Exception as err:
    print(f"Delete failed: {err}")

Batch Operations

Batch Delete Multiple Objects

Delete many objects efficiently:

import lakefs

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

# Paths to remove with a single delete_objects call
paths_to_delete = ["data/file1.csv", "data/file2.csv", "data/file3.csv", "logs/temp.log"]

try:
    branch.delete_objects(paths_to_delete)
except Exception as err:
    print(f"Batch delete failed: {err}")
else:
    # Reached only when the batch call did not raise
    print(f"Deleted {len(paths_to_delete)} objects")

Listing and Filtering Objects

List Objects by Prefix

List all objects under a path:

import lakefs

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

data_prefix = "data/"

# Print every object under the prefix together with its size
print("Objects in data/:")
for entry in branch.objects(prefix=data_prefix):
    print(f"  {entry.path} ({entry.size_bytes} bytes)")

# Count the objects with a second pass over the listing
total_objects = sum(1 for _ in branch.objects(prefix=data_prefix))
print(f"Total objects: {total_objects}")

List with Delimiter (Folder View)

Use a delimiter to view the folder structure:

import lakefs

from lakefs.models import CommonPrefix

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

# List with folder delimiter
print("Folder structure (with /):")
for item in branch.objects(prefix="", delimiter="/"):
    # With a delimiter the listing yields object entries mixed with
    # CommonPrefix entries (one per "folder"). Both kinds expose `.path`,
    # so an attribute check cannot tell them apart; test the entry type.
    if isinstance(item, CommonPrefix):
        # It's a folder
        print(f"  FOLDER: {item.path}")
    else:
        # It's a file
        print(f"  FILE: {item.path}")

Working with Object Metadata

Set Custom Object Metadata

Attach custom metadata to objects:

import lakefs
import json

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

# Custom key/value pairs to attach to the object
custom_metadata = {"owner": "data-team", "sensitivity": "public", "version": "1.0"}

# Upload the object together with its metadata
obj = branch.object("data/important.csv")
obj.upload(data=b"id,value\n1,100", metadata=custom_metadata)

print("Object uploaded with metadata")

Read Object Metadata

Retrieve object metadata:

import lakefs

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

# Fetch the object's stat record; it carries path, size and custom metadata
obj = branch.object("data/important.csv")
stat = obj.stat()

# Report the fields of interest
print(f"Object: {stat.path}")
print(f"Size: {stat.size_bytes}")
print(f"Metadata: {stat.metadata}")

Real-World Workflows

Data Cleanup

Remove old and temporary files:

import lakefs
from datetime import datetime, timedelta

def cleanup_old_files(repo_name, branch_name, days_old=7, batch_size=1000):
    """Delete objects whose modification time is more than ``days_old`` days ago.

    The deletions are issued against the branch; this function does not
    commit them.

    Args:
        repo_name: ID of the lakeFS repository.
        branch_name: Branch whose objects are scanned and deleted.
        days_old: Age threshold in days.
        batch_size: Maximum number of paths sent per ``delete_objects`` call.
            NOTE(review): the default assumes the server accepts at most
            1000 paths per delete request - confirm for your lakeFS version.

    Returns:
        The number of paths submitted for deletion (0 if none were old enough).

    Raises:
        ValueError: If ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")

    repo = lakefs.repository(repo_name)
    branch = repo.branch(branch_name)

    # Cutoff expressed in epoch seconds, the unit obj.mtime is compared in.
    cutoff_time = datetime.now().timestamp() - (days_old * 24 * 60 * 60)

    # Entries without a usable mtime are skipped rather than compared.
    old_files = [
        obj.path
        for obj in branch.objects()
        if getattr(obj, 'mtime', None) is not None and obj.mtime < cutoff_time
    ]

    if not old_files:
        print("No old files to delete")
        return 0

    print(f"Found {len(old_files)} files older than {days_old} days")
    # Send the paths in bounded chunks so that a large cleanup does not
    # become one oversized delete request.
    for start in range(0, len(old_files), batch_size):
        branch.delete_objects(old_files[start:start + batch_size])
    print(f"Deleted {len(old_files)} old files")
    return len(old_files)


# Usage:
deleted_count = cleanup_old_files(
    repo_name="archive-repo",
    branch_name="main",
    days_old=30,
)
print(f"Cleanup complete: {deleted_count} files removed")

Bulk Data Import

Import multiple files efficiently:

import lakefs
import os

def bulk_import_files(repo_name, branch_name, local_dir, lakeFS_prefix):
    """Upload every file under ``local_dir`` to a lakeFS branch.

    Each file is stored at ``<lakeFS_prefix>/<path relative to local_dir>``
    using forward slashes. A failure on one file is reported and counted,
    and the import continues with the next file.

    Args:
        repo_name: ID of the lakeFS repository.
        branch_name: Branch that receives the uploads.
        local_dir: Local directory that is walked recursively.
        lakeFS_prefix: Destination prefix inside the branch. Leading and
            trailing slashes are ignored; an empty prefix places the files
            at the root of the branch.

    Returns:
        A tuple ``(imported, errors)`` with the number of files uploaded
        and the number of files that failed.
    """
    repo = lakefs.repository(repo_name)
    branch = repo.branch(branch_name)

    # Normalise the prefix once, so an empty prefix or one ending in "/"
    # does not produce object paths such as "/a.csv" or "data//a.csv".
    prefix = lakeFS_prefix.replace("\\", "/").strip("/")

    imported = 0
    errors = 0

    # Walk local directory
    for root, _dirs, files in os.walk(local_dir):
        for filename in files:
            local_path = os.path.join(root, filename)

            # Translate only the platform's own separator: a blanket
            # backslash replacement would rewrite POSIX file names that
            # legitimately contain a backslash.
            rel_path = os.path.relpath(local_path, local_dir)
            rel_key = rel_path.replace(os.sep, "/")
            lakeFS_path = f"{prefix}/{rel_key}" if prefix else rel_key

            try:
                # Read and upload file
                with open(local_path, 'rb') as f:
                    data = f.read()

                branch.object(lakeFS_path).upload(data=data)
                print(f"  Imported: {lakeFS_path}")
                imported += 1

            except Exception as e:
                # Best effort: report the failure and move on to the next file.
                print(f"  Error importing {lakeFS_path}: {e}")
                errors += 1

    return imported, errors


# Usage (pseudo-code - adjust for your environment):
# imported, errors = bulk_import_files(
#     "my-repo",
#     "main",
#     "/local/data/directory",
#     "data/imports"
# )
# print(f"Imported: {imported}, Errors: {errors}")

Stream Processing

Process large files efficiently:

import lakefs
import io

def process_csv_stream(repo_name, branch_name, file_path, processor_func):
    """Feed each line of a lakeFS object to ``processor_func``.

    The object is opened in text mode and iterated line by line; every line
    is stripped of surrounding whitespace before it is handed to the
    callback. Returns the number of lines processed.
    """
    source = lakefs.repository(repo_name).branch(branch_name).object(file_path)

    # Stays 0 when the object yields no lines at all
    processed = 0
    with source.reader(mode='r') as stream:
        for processed, raw_line in enumerate(stream, start=1):
            processor_func(raw_line.strip())

    return processed


# Usage:
def count_records(line):
    """Placeholder per-line callback; do something with each line here."""
    return None


count = process_csv_stream(
    repo_name="data-repo",
    branch_name="main",
    file_path="data/large_file.csv",
    processor_func=count_records,
)

Creating a Complete Data Pipeline

Implement an end-to-end pipeline with data operations, transactions, and merging:

import lakefs

# Get repository and create experiment branch
repo = lakefs.repository("analytics-repo")
branch = repo.branch("processing-v2").create(source_reference="main")

try:
    # Upload raw data
    branch.object("raw/input.csv").upload(data=raw_data)

    # Perform transformations with transactions
    with branch.transact(commit_message="Process raw data") as tx:
        # Read and transform
        with tx.object("raw/input.csv").reader() as f:
            processed = transform(f.read())

        # Write processed data
        tx.object("processed/output.csv").upload(data=processed)

    # Review changes before merging
    changes = list(branch.uncommitted())
    print(f"Changes: {len(changes)} objects")

    # Merge to main if satisfied
    branch.merge_into(repo.branch("main"))

except Exception as e:
    print(f"Error in pipeline: {e}")
    branch.delete()  # Clean up on failure

This pattern ensures:

  • Raw data is preserved in isolation
  • Transformations are atomic (all-or-nothing)
  • Changes are reviewable before integration
  • Failed pipelines can be safely cleaned up

Error Handling

Handling Object Errors

import lakefs
from lakefs.exceptions import NotFoundException, ObjectNotFoundException

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

# Missing object: attempt to delete a path named "non-existent.csv"
try:
    missing_obj = branch.object("non-existent.csv")
    missing_obj.delete()
except (NotFoundException, ObjectNotFoundException):
    print("Object not found")

# Upload failure (for example, permission denied): report whatever was raised
try:
    target_obj = branch.object("data/file.csv")
    target_obj.upload(data=b"data")
except Exception as err:
    print(f"Upload failed: {err}")