Working with Transactions¶
Transactions enable you to perform multiple operations atomically on lakeFS, similar to database transactions. This guide covers creating and managing transactions for reliable data operations.
Understanding Transactions¶
What are Transactions?¶
A transaction in lakeFS:
- Creates an ephemeral (temporary) branch from a source branch
- Performs all operations on that ephemeral branch
- Atomically merges the ephemeral branch back into the source branch upon successful completion
- Automatically cleans up the ephemeral branch
- Rolls back if any error occurs
This ensures either all operations succeed or none of them do.
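Conceptually, the transaction context manager automates a branch lifecycle you could otherwise script yourself. The sketch below is illustrative only (the branch name tx-example and object path are made up, and error handling and rollback are omitted); it shows roughly the manual steps a transaction performs for you:
import lakefs

repo = lakefs.repository("my-data-repo")
main = repo.branch("main")

# 1. Create an ephemeral branch from the source branch
tx_branch = repo.branch("tx-example").create(source_reference="main")

# 2. Perform all operations on the ephemeral branch
tx_branch.object("data/file.csv").upload(data=b"id,value\n1,100")

# 3. Commit the work and merge it atomically back into the source branch
tx_branch.commit(message="Add datasets")
tx_branch.merge_into(main)

# 4. Clean up the ephemeral branch
tx_branch.delete()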
Creating Transactions¶
Basic Transaction¶
Perform multiple operations atomically:
import lakefs

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

try:
    with branch.transact(commit_message="Add datasets") as tx:
        # All operations happen on the ephemeral branch
        tx.object("data/file1.csv").upload(data=b"id,value\n1,100\n2,200")
        tx.object("data/file2.csv").upload(data=b"id,name\n1,Alice\n2,Bob")
        print("Upload successful - changes will be committed atomically")

    # At this point, the transaction is complete and merged
    print("Transaction committed to main")
except Exception as e:
    # If we get here, changes were rolled back
    print(f"Transaction failed and rolled back: {e}")
Transaction with Metadata¶
Include metadata in the commit created by the transaction:
import lakefs
from datetime import datetime

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

try:
    with branch.transact(
        commit_message="Import customer data",
        commit_metadata={
            "import-date": datetime.now().isoformat(),
            "source": "database-export",
            "record-count": "10000"
        }
    ) as tx:
        # Perform operations
        tx.object("data/customers.csv").upload(data=b"id,name,email\n1,Alice,alice@example.com")
        print("Data imported")

    # Transaction complete with metadata
    print("Transaction committed with tracking metadata")
except Exception as e:
    print(f"Import failed: {e}")
Transaction with Tagging¶
Create a tag after successful transaction completion:
import lakefs

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

try:
    with branch.transact(commit_message="Production data release v1.5") as tx:
        # Make changes
        tx.object("VERSION").upload(data=b"1.5.0")
        tx.object("data/prod.csv").upload(data=b"updated data")
        print("Production release in progress")

    # The transaction merged successfully - create the release tag
    repo.tag("v1.5.0").create(branch)
    print("Release tagged and deployed")
except Exception as e:
    print(f"Release failed: {e}")
    print("Changes rolled back, no tag created")
Working with Transaction Context¶
Performing Multiple Operations¶
Execute multiple operations within a transaction:
import lakefs

repo = lakefs.repository("analytics-repo")
branch = repo.branch("develop")

try:
    with branch.transact(commit_message="Data preparation pipeline") as tx:
        # Step 1: Upload raw data
        tx.object("raw/input.csv").upload(data=b"raw input data")

        # Step 2: Upload processing script
        tx.object("scripts/transform.py").upload(
            data=b"#!/usr/bin/env python\n# Transformation logic"
        )

        # Step 3: Upload intermediate results
        tx.object("processed/output.csv").upload(data=b"processed output")

        # Step 4: Upload metadata
        tx.object(".metadata/pipeline_version.txt").upload(data=b"1.0")
        print("All pipeline stages added atomically")

    print("Pipeline committed successfully")
except Exception as e:
    print(f"Pipeline failed: {e}")
Reading and Modifying Data¶
Read objects and modify them within a transaction:
import lakefs
from lakefs.exceptions import NotFoundException

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

try:
    with branch.transact(commit_message="Data validation update") as tx:
        # Read existing data (it may not exist yet)
        try:
            with tx.object("data/config.txt").reader(mode='r') as f:
                config_data = f.read()
            print(f"Current config: {config_data}")
        except NotFoundException:
            config_data = ""

        # Modify data
        updated_config = config_data + "\nvalidation_enabled: true"

        # Write updated data
        tx.object("data/config.txt").upload(data=updated_config.encode())
        print("Config updated")

    print("Configuration changes committed")
except Exception as e:
    print(f"Update failed: {e}")
Iterating and Modifying Objects¶
Process multiple objects within a transaction:
import lakefs

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

try:
    with branch.transact(commit_message="Bulk update data versions") as tx:
        # List objects and modify each one
        processed_count = 0
        for obj in tx.objects(prefix="data/"):
            try:
                # Read the object
                with tx.object(obj.path).reader(mode='r') as f:
                    content = f.read()

                # Add version marker
                versioned_content = f"# version: 2.0\n{content}"

                # Write back
                tx.object(obj.path).upload(data=versioned_content.encode())
                processed_count += 1
            except Exception as e:
                print(f"Error processing {obj.path}: {e}")
                raise  # Transaction will roll back

        print(f"Processed {processed_count} objects")

    print(f"Bulk update committed: {processed_count} objects updated")
except Exception as e:
    print(f"Bulk update failed and rolled back: {e}")
Transaction Error Handling¶
Basic Error Handling in Transactions¶
import lakefs

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

try:
    with branch.transact(commit_message="Data operations") as tx:
        # First operation
        tx.object("data/file1.csv").upload(data=b"data1")

        # Second operation that might fail
        try:
            with tx.object("data/large_file.csv").reader() as f:
                large_content = f.read()

            # Process large content
            if len(large_content) > 1000000:
                raise ValueError("File too large")
        except ValueError as e:
            print(f"Validation failed: {e}")
            raise  # This will roll back the entire transaction

        # Third operation
        tx.object("data/file2.csv").upload(data=b"data2")
except Exception as e:
    print(f"Transaction rolled back: {e}")
Transaction with Cleanup on Error¶
import lakefs

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

cleanup_needed = False
try:
    with branch.transact(
        commit_message="Complex operation",
        delete_branch_on_error=True  # Default is True
    ) as tx:
        # Perform operations
        cleanup_needed = True
        tx.object("step1/file.csv").upload(data=b"step 1")

        # Simulate an error
        if True:  # In real code, some condition
            raise Exception("Something went wrong in step 1")

        tx.object("step2/file.csv").upload(data=b"step 2")
        cleanup_needed = False
except Exception as e:
    if cleanup_needed:
        print("Operation failed, ephemeral branch automatically cleaned up")
    print(f"Error: {e}")
Transaction with Conditional Rollback¶
import lakefs

def validate_data(data):
    """Validate data meets requirements"""
    if len(data) == 0:
        raise ValueError("Empty data not allowed")
    if b"invalid" in data:
        raise ValueError("Data contains invalid markers")
    return True

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

try:
    with branch.transact(commit_message="Data import with validation") as tx:
        # Upload data
        data = b"id,value\n1,100\n2,200"

        # Validate before committing
        validate_data(data)

        # If validation passes, commit
        tx.object("data/validated.csv").upload(data=data)
        print("Data validation passed, changes committed")
except ValueError as e:
    print(f"Validation error - transaction rolled back: {e}")
except Exception as e:
    print(f"Transaction error: {e}")
Real-World Workflows¶
Data Quality Validation Workflow¶
Implement data quality checks before committing changes:
import lakefs
import csv
import io

def check_data_quality(repo_name, data_path, quality_rules):
    """
    Check data quality before committing.
    Returns True if the data passes all checks, False otherwise.
    """
    repo = lakefs.repository(repo_name)
    branch = repo.branch("main")

    try:
        with branch.transact(commit_message="Quality checked data") as tx:
            # Read data as bytes so the checks and custom rules can decode it
            with tx.object(data_path).reader(mode='rb') as f:
                data = f.read()

            # Run quality checks
            errors = []

            # Check 1: Not empty
            if len(data) == 0:
                errors.append("Data is empty")

            # Check 2: Valid CSV format with at least one data row
            try:
                reader = csv.reader(io.StringIO(data.decode()))
                rows = list(reader)
                if len(rows) < 2:
                    errors.append("Data has no rows")
            except Exception as e:
                errors.append(f"Invalid CSV format: {e}")

            # Check 3: Custom rules
            for rule in quality_rules:
                if not rule(data):
                    errors.append(f"Failed custom rule: {rule.__name__}")

            # If any check fails, raise an error (the transaction will roll back)
            if errors:
                error_msg = "; ".join(errors)
                raise ValueError(f"Quality checks failed: {error_msg}")

            # If all pass, add a quality marker
            tx.object(f"{data_path}.quality_passed").upload(data=b"true")
            print(f"Data quality checks passed for {data_path}")

        return True
    except Exception as e:
        print(f"Quality check failed: {e}")
        return False

# Usage:
def rule_has_headers(data):
    """Custom rule: data must have headers"""
    lines = data.decode().strip().split('\n')
    return len(lines) > 0 and ',' in lines[0]

success = check_data_quality(
    "analytics-repo",
    "data/incoming.csv",
    [rule_has_headers]
)

if success:
    print("Data passed quality gates and is now committed")
Database Synchronization Workflow¶
Keep data synchronized with atomicity guarantees:
import lakefs
import json
from datetime import datetime

def sync_database_export(repo_name, table_name, export_data, export_metadata):
    """
    Atomically sync a database table export:
    1. Store the export data
    2. Update metadata
    3. Update sync timestamp
    4. All-or-nothing
    """
    repo = lakefs.repository(repo_name)
    branch = repo.branch("main")

    try:
        with branch.transact(
            commit_message=f"Sync: {table_name}",
            commit_metadata={
                "sync-type": "database-export",
                "table": table_name,
                "sync-time": datetime.now().isoformat()
            }
        ) as tx:
            # Step 1: Store the data
            data_path = f"data/{table_name}.csv"
            tx.object(data_path).upload(data=export_data)

            # Step 2: Store metadata
            metadata = {
                "table": table_name,
                "row_count": export_metadata.get("row_count", 0),
                "columns": export_metadata.get("columns", []),
                "sync_timestamp": datetime.now().isoformat(),
                "source_database": export_metadata.get("source", "unknown")
            }
            metadata_path = f".metadata/{table_name}_metadata.json"
            tx.object(metadata_path).upload(
                data=json.dumps(metadata, indent=2).encode()
            )

            # Step 3: Update sync status
            status = {
                "table": table_name,
                "last_sync": datetime.now().isoformat(),
                "status": "success"
            }
            status_path = f".sync/{table_name}_status.json"
            tx.object(status_path).upload(
                data=json.dumps(status).encode()
            )
            print(f"Synchronized {table_name}")

        print("Sync committed atomically")
        return True
    except Exception as e:
        print(f"Sync failed - rolling back: {e}")
        return False

# Usage:
success = sync_database_export(
    "warehouse-repo",
    "customers",
    b"id,name,email\n1,Alice,alice@example.com\n2,Bob,bob@example.com",
    {
        "row_count": 2,
        "columns": ["id", "name", "email"],
        "source": "production_db"
    }
)
ETL Pipeline with Checkpoints¶
Implement ETL with atomic checkpoints:
import lakefs
import json

def etl_pipeline_step(repo_name, branch_name, step_name, step_logic):
    """
    Run an ETL step with atomic checkpointing
    """
    repo = lakefs.repository(repo_name)
    branch = repo.branch(branch_name)

    try:
        with branch.transact(
            commit_message=f"ETL: {step_name}",
            commit_metadata={"etl-step": step_name}
        ) as tx:
            # Run the step logic
            result = step_logic(tx)

            # Create checkpoint
            checkpoint = {
                "step": step_name,
                "status": "completed",
                "records_processed": result.get("count", 0)
            }
            tx.object(f".checkpoints/{step_name}.json").upload(
                data=json.dumps(checkpoint).encode()
            )

        print(f"Step '{step_name}' completed with checkpoint")
        return True
    except Exception as e:
        print(f"Step '{step_name}' failed - checkpoint rolled back: {e}")
        return False

# Define ETL steps
def extract_step(tx):
    """Extract data"""
    tx.object("etl/01_raw/data.csv").upload(data=b"extracted data")
    return {"count": 1}

def transform_step(tx):
    """Transform data"""
    tx.object("etl/02_transformed/data.csv").upload(data=b"transformed data")
    return {"count": 1}

def load_step(tx):
    """Load data"""
    tx.object("etl/03_loaded/data.csv").upload(data=b"loaded data")
    return {"count": 1}

# Execute pipeline
steps = [
    ("extract", extract_step),
    ("transform", transform_step),
    ("load", load_step)
]

for step_name, step_func in steps:
    success = etl_pipeline_step("data-repo", "main", step_name, step_func)
    if not success:
        print(f"ETL failed at {step_name}")
        break
else:
    # for/else: runs only if the loop completed without a break
    print("ETL pipeline completed successfully")
Schema Evolution with Validation¶
Safely evolve data schema with validation:
import lakefs
import csv
import io
import json
from lakefs.exceptions import NotFoundException

def evolve_schema(repo_name, table_name, old_schema, new_schema, migration_logic):
    """
    Evolve a table schema with validation.
    Either all rows get migrated or none do.
    """
    repo = lakefs.repository(repo_name)
    branch = repo.branch("main")

    try:
        with branch.transact(
            commit_message=f"Schema evolution: {table_name}",
            commit_metadata={
                # Commit metadata values should be strings
                "schema-version": str(new_schema.get("version")),
                "migration-type": "schema-evolution"
            }
        ) as tx:
            # Read current data
            try:
                with tx.object(f"data/{table_name}.csv").reader(mode='r') as f:
                    current_data = f.read()
            except NotFoundException:
                raise ValueError(f"Table {table_name} not found")

            # Parse CSV
            reader = csv.DictReader(io.StringIO(current_data))
            rows = list(reader)

            # Migrate each row
            migrated_rows = []
            for row in rows:
                migrated_row = migration_logic(row, old_schema, new_schema)
                migrated_rows.append(migrated_row)

            # Write migrated data
            output = io.StringIO()
            writer = csv.DictWriter(
                output,
                fieldnames=new_schema.get("columns", [])
            )
            writer.writeheader()
            writer.writerows(migrated_rows)

            tx.object(f"data/{table_name}.csv").upload(
                data=output.getvalue().encode()
            )

            # Store schema version
            tx.object(f".schema/{table_name}.json").upload(
                data=json.dumps(new_schema).encode()
            )
            print(f"Schema evolved for {table_name}")

        return True
    except Exception as e:
        print(f"Schema evolution failed - rolled back: {e}")
        return False

# Usage:
old_schema = {
    "version": 1,
    "columns": ["id", "name", "email"]
}
new_schema = {
    "version": 2,
    "columns": ["id", "name", "email", "phone"]
}

def add_phone_column(row, old, new):
    row["phone"] = ""  # Default empty phone
    return row

success = evolve_schema(
    "crm-repo",
    "contacts",
    old_schema,
    new_schema,
    add_phone_column
)
Advanced Patterns¶
Conditional Transaction Execution¶
import lakefs

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

# Check if we need to make changes
changes_needed = True  # In real code, some condition

if changes_needed:
    try:
        with branch.transact(commit_message="Conditional update") as tx:
            tx.object("data/file.csv").upload(data=b"new data")
        print("Update committed")
    except Exception as e:
        print(f"Update failed: {e}")
else:
    print("No changes needed, skipping transaction")
Transaction Retry Logic¶
import lakefs
import time

def transact_with_retry(repo_name, branch_name, max_retries=3):
    """Retry transaction on failure"""
    repo = lakefs.repository(repo_name)
    branch = repo.branch(branch_name)

    for attempt in range(max_retries):
        try:
            with branch.transact(commit_message=f"Attempt {attempt + 1}") as tx:
                tx.object("data/file.csv").upload(data=b"data")
            print("Transaction succeeded")
            return True
        except Exception as e:
            print(f"Attempt {attempt + 1} failed: {e}")
            if attempt < max_retries - 1:
                wait_time = 2 ** attempt  # Exponential backoff
                print(f"Retrying in {wait_time} seconds...")
                time.sleep(wait_time)
            else:
                print(f"Failed after {max_retries} attempts")
                return False
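As with the other helpers in this guide, a minimal usage sketch (the repository and branch names are placeholders):
# Usage (names are placeholders):
if transact_with_retry("my-data-repo", "main", max_retries=3):
    print("Data written with retry protection")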
Error Handling¶
import lakefs
from lakefs.exceptions import NotFoundException, ForbiddenException, ServerException

repo = lakefs.repository("my-data-repo")
branch = repo.branch("main")

try:
    with branch.transact(commit_message="Safe operation") as tx:
        tx.object("data/file.csv").upload(data=b"data")
except NotFoundException:
    print("Branch or repository not found")
except ForbiddenException:
    print("Permission denied - cannot write to branch")
except ServerException as e:
    print(f"Server error - transaction rolled back: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")