Using lakefs-spec for File System Operations

The lakefs-spec project provides a filesystem-like API for lakeFS, built on top of fsspec. It is a natural fit for data science workflows, pandas integration, and any scenario where you want S3-like file operations.

Note

lakefs-spec is a third-party package maintained by the lakeFS community. For issues and questions, refer to the lakefs-spec repository.

When to Use

Use lakefs-spec when you:

  • Need file system-like operations (open, read, write, delete)
  • Work with data science tools (pandas, dask, polars)
  • Want an S3-compatible interface without managing branches explicitly
  • Need to integrate with fsspec-compatible libraries
  • Prefer familiar file operations over versioning abstractions

For versioning-focused workflows (branches, tags, commits), use the High-Level SDK instead.

Installation

Install lakefs-spec using pip:

pip install lakefs-spec

Or upgrade to the latest version:

pip install --upgrade lakefs-spec
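
To confirm the installation, you can print the installed version with the standard library (a quick sanity check, not required):

from importlib.metadata import version

print(version("lakefs-spec"))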

Basic Setup

Initializing the File System

from lakefs_spec import LakeFSFileSystem

# Auto-discover credentials from ~/.lakectl.yaml
fs = LakeFSFileSystem()

# Or provide explicit credentials
fs = LakeFSFileSystem(
    host="http://localhost:8000",
    username="your-access-key",
    password="your-secret-key"
)
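
Auto-discovery reads the same configuration file that lakectl uses. A minimal ~/.lakectl.yaml looks roughly like this (all values below are placeholders for your own deployment):

credentials:
  access_key_id: AKIAIOSFODNN7EXAMPLE
  secret_access_key: wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
server:
  endpoint_url: http://localhost:8000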

File Operations

Writing Files

from pathlib import Path
from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

# Write raw bytes to an object
fs.pipe("my-repo/main/data/text.txt", b"Hello, lakeFS!")

# Write from local file
local_file = Path("local_data.csv")
local_file.write_text("id,name\n1,Alice\n2,Bob")
fs.put(str(local_file), "my-repo/main/data/imported.csv")

# Write using context manager
with fs.open("my-repo/main/data/output.txt", "w") as f:
    f.write("Data written to lakeFS")

Reading Files

from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

# Read entire file
data = fs.cat("my-repo/main/data/text.txt")
print(data.decode())

# Read using context manager
with fs.open("my-repo/main/data/input.txt", "r") as f:
    content = f.read()
    print(content)

# Read in chunks (for large files)
with fs.open("my-repo/main/data/large_file.csv", "rb") as f:
    chunk = f.read(1024)
    while chunk:
        process(chunk)  # process() stands in for your own chunk handler
        chunk = f.read(1024)

Listing Files

from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

# List files at path
files = fs.ls("my-repo/main/data/")
for file in files:
    print(file)

# Find files with glob pattern
csv_files = fs.glob("my-repo/main/**/*.csv")
for csv in csv_files:
    print(csv)
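
Because LakeFSFileSystem implements the standard fsspec interface, the usual inspection helpers are also available (the path here is illustrative):

from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

# Check for existence before reading
if fs.exists("my-repo/main/data/text.txt"):
    # info() returns an fsspec-style dict with keys such as "size" and "type"
    info = fs.info("my-repo/main/data/text.txt")
    print(info["size"])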

Deleting Files

from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

# Delete single file
fs.rm("my-repo/main/data/temp.txt")

# Delete directory recursively
fs.rm("my-repo/main/data/temp_dir", recursive=True)
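
All of these operations address objects as repository/ref/path. The ref does not have to be a branch: a tag or a commit SHA works as well, which makes it easy to read historical versions. A short sketch (the tag and SHA below are hypothetical):

from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

# Read from the head of a branch
latest = fs.cat("my-repo/main/data/text.txt")

# Read the same object as of a tag or a specific commit
tagged = fs.cat("my-repo/v1.0.0/data/text.txt")
pinned = fs.cat("my-repo/abc123def456/data/text.txt")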

Pandas Integration

Reading Data into Pandas

import pandas as pd
from lakefs_spec import LakeFSFileSystem

# Read CSV directly from lakeFS
df = pd.read_csv("lakefs://my-repo/main/data/dataset.csv")
print(df.head())

# Read Parquet
df = pd.read_parquet("lakefs://my-repo/main/data/data.parquet")

# Read JSON
df = pd.read_json("lakefs://my-repo/main/data/data.json")
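
Because the ref is part of the URL, you can load the same dataset from two refs side by side, for example to compare a feature branch against main (the dev branch here is hypothetical):

import pandas as pd

# Same path, two refs
main_df = pd.read_csv("lakefs://my-repo/main/data/dataset.csv")
dev_df = pd.read_csv("lakefs://my-repo/dev/data/dataset.csv")

print(len(main_df), len(dev_df))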

Writing Data from Pandas

import pandas as pd

# Create sample data
df = pd.DataFrame({
    "id": [1, 2, 3, 4, 5],
    "name": ["Alice", "Bob", "Carol", "David", "Eve"],
    "value": [100, 200, 300, 400, 500]
})

# Write to lakeFS as CSV
df.to_csv("lakefs://my-repo/main/output/data.csv", index=False)

# Write as Parquet
df.to_parquet("lakefs://my-repo/main/output/data.parquet")

# Write as JSON
df.to_json("lakefs://my-repo/main/output/data.json")
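
If you cannot rely on ~/.lakectl.yaml, pandas can forward credentials to the underlying file system via storage_options; the keys mirror the LakeFSFileSystem constructor arguments shown earlier (reusing the df from above):

df.to_parquet(
    "lakefs://my-repo/main/output/data.parquet",
    storage_options={
        "host": "http://localhost:8000",
        "username": "your-access-key",
        "password": "your-secret-key",
    },
)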

Data Science Workflow Example

import pandas as pd
import numpy as np

# Read training data
train_df = pd.read_csv("lakefs://ml-repo/main/datasets/train.csv")
test_df = pd.read_csv("lakefs://ml-repo/main/datasets/test.csv")

# Process data
train_df["normalized_value"] = (train_df["value"] - train_df["value"].mean()) / train_df["value"].std()
test_df["normalized_value"] = (test_df["value"] - test_df["value"].mean()) / test_df["value"].std()

# Save processed data
train_df.to_parquet("lakefs://ml-repo/main/processed/train.parquet")
test_df.to_parquet("lakefs://ml-repo/main/processed/test.parquet")

print("Processing complete!")

Transactions

Atomic Operations with Transactions

from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

# Perform atomic operations
with fs.transaction("my-repo", "main") as tx:
    # All operations happen on an ephemeral transaction branch
    fs.pipe(
        f"my-repo/{tx.branch.id}/data/file1.txt",
        b"Content 1"
    )
    fs.pipe(
        f"my-repo/{tx.branch.id}/data/file2.txt",
        b"Content 2"
    )

    # Commit when done; on a successful exit, the ephemeral
    # branch is merged back into main
    tx.commit(message="Add files atomically")
    print(f"Committed: {tx.branch.id}")

Transaction with Error Handling

from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

try:
    with fs.transaction("my-repo", "main") as tx:
        # Perform operations
        fs.pipe(f"my-repo/{tx.branch.id}/file.txt", b"data")

        # Validate
        stat = fs.stat(f"my-repo/{tx.branch.id}/file.txt")
        if stat["size"] < 100:
            raise ValueError("File too small")

        tx.commit(message="Validated and committed")

except Exception as e:
    print(f"Transaction failed: {e}")
    print("Changes rolled back automatically")

Tagging After Transaction

from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

with fs.transaction("ml-repo", "main") as tx:
    # Train and save model (model_data and metrics_data are
    # assumed to be bytes prepared earlier)
    fs.pipe(f"ml-repo/{tx.branch.id}/models/model.pkl", model_data)

    # Save metrics
    fs.pipe(f"ml-repo/{tx.branch.id}/metrics.json", metrics_data)

    # Commit and keep a reference to the new commit
    commit_sha = tx.commit(message="Model v1.0")

    # Tag the commit as a release
    tx.tag(commit_sha, name="v1.0.0")
    print("Model released as v1.0.0")

Real-World Examples

ETL Pipeline

import pandas as pd
from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

# Extract: Read from multiple sources
raw_files = fs.glob("my-repo/main/raw/*.csv")
dfs = [pd.read_csv(f"lakefs://{f}") for f in raw_files]
combined = pd.concat(dfs)

# Transform: Clean and process
combined = combined.dropna()
combined["timestamp"] = pd.to_datetime(combined["timestamp"])
combined["normalized"] = (combined["value"] - combined["value"].mean()) / combined["value"].std()

# Load: Write processed data
combined.to_parquet("lakefs://my-repo/main/processed/data.parquet")
print(f"ETL complete: {len(combined)} rows processed")

Data Analysis and Reporting

import pandas as pd
import json
from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

# Read data for analysis
df = pd.read_parquet("lakefs://analytics-repo/main/data/raw_data.parquet")

# Perform analysis
summary = {
    "total_records": len(df),
    "mean_value": float(df["value"].mean()),
    "median_value": float(df["value"].median()),
    "std_value": float(df["value"].std())
}

# Save report (fs methods take plain repository/ref/path strings)
report = json.dumps(summary, indent=2)
fs.pipe("analytics-repo/main/reports/summary.json", report.encode())

print("Analysis report saved")

Model Versioning

import json
import pickle
from datetime import datetime
from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

def save_model_version(repo, model, version, metrics):
    """Save model with version and metrics"""
    timestamp = datetime.now().isoformat()

    with fs.transaction(repo, "main") as tx:
        branch_id = tx.branch.id

        # Save model
        model_bytes = pickle.dumps(model)
        fs.pipe(
            f"{repo}/{branch_id}/models/{version}/model.pkl",
            model_bytes
        )

        # Save metrics
        metrics_json = json.dumps({
            "version": version,
            "timestamp": timestamp,
            **metrics
        })
        fs.pipe(
            f"{repo}/{branch_id}/models/{version}/metrics.json",
            metrics_json.encode()
        )

        # Commit and keep a reference to the new commit
        commit_sha = tx.commit(message=f"Model {version}")

        # Tag the commit for later reference
        tx.tag(commit_sha, name=f"model-{version}")
        print(f"Model {version} saved and tagged")


# Usage (train_model and training_data are placeholders for your own code):
model = train_model(training_data)
save_model_version(
    "ml-repo",
    model,
    "v2.1.0",
    {"accuracy": 0.95, "f1": 0.94}
)
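
Because each version is tagged, you can later load an exact model by using the tag as the ref in the path (the tag name follows the scheme above):

import pickle
from lakefs_spec import LakeFSFileSystem

fs = LakeFSFileSystem()

# The tag pins the commit created by save_model_version
with fs.open("ml-repo/model-v2.1.0/models/v2.1.0/model.pkl", "rb") as f:
    restored = pickle.load(f)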

Further Resources