
Using Vertex AI with lakeFS

Vertex AI lets Google Cloud users build, deploy, and scale machine learning (ML) models faster, with fully managed ML tools for any use case.

lakeFS works with Vertex AI by letting users create repositories on top of GCS buckets, then either use the Dataset API to create managed Datasets on top of a lakeFS version, or automatically export lakeFS object versions in a form readable by Cloud Storage mounts.

Using lakeFS with Vertex Managed Datasets

Vertex’s ImageDataset and VideoDataset allow creating a dataset by importing a CSV file from GCS (see the gcs_source parameter).

This CSV file contains GCS addresses of image files and their corresponding labels.
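
For example, a generated import file might look like the following, where each row pairs an object's underlying GCS address with a label derived from its parent directory in lakeFS (the bucket name and object keys here are purely illustrative):

gs://underlying-gcs-bucket/data/gcp0a1b2c3d4e5f6/object-1,cats
gs://underlying-gcs-bucket/data/gcp0a1b2c3d4e5f7/object-2,dogs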

Since the lakeFS API supports exporting the underlying GCS address of versioned objects, we can generate such a CSV file when creating the dataset:

#!/usr/bin/env python

# Requirements:
# google-cloud-aiplatform>=1.31.0
# lakefs-client>=0.107.0

import csv
from pathlib import PosixPath
from io import StringIO

import lakefs_client
from lakefs_client.client import LakeFSClient
from google.cloud import storage
from google.cloud import aiplatform

# lakeFS connection details
configuration = lakefs_client.Configuration()
configuration.username = 'AKIAIOSFODNN7EXAMPLE'
configuration.password = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
configuration.host = 'https://lakefs.example.com/'
client = LakeFSClient(configuration)

# Dataset configuration
lakefs_repo = 'my-repository'
lakefs_ref = 'main'
img_dataset = 'datasets/my-images/'

# Vertex configuration
import_bucket = 'underlying-gcs-bucket'

# produce import file for Vertex's SDK
buf = StringIO()
csv_writer = csv.writer(buf)
has_more = True
next_offset = ""
while has_more:
    files = client.objects_api.list_objects(
        lakefs_repo, lakefs_ref, prefix=img_dataset, after=next_offset)
    for r in files.get('results'):
        p = PosixPath(r.path)
        csv_writer.writerow((r.physical_address, p.parent.name))
    has_more = files.get('pagination').get('has_more')
    next_offset = files.get('pagination').get('next_offset')

print('generated path and labels CSV')
buf.seek(0)  # rewind the buffer so its contents can be read back for upload

# Write it to storage
storage_client = storage.Client()
bucket = storage_client.bucket(import_bucket)
blob = bucket.blob(f'vertex/imports/{lakefs_repo}/{lakefs_ref}/labels.csv')
with blob.open('w') as out:
    out.write(buf.read())

print(f'wrote CSV to gs://{import_bucket}/vertex/imports/{lakefs_repo}/{lakefs_ref}/labels.csv')

# import into Vertex as a managed dataset
print('Importing dataset...')
ds = aiplatform.ImageDataset.create(
    display_name=f'{lakefs_repo}_{lakefs_ref}_imgs',
    gcs_source=f'gs://{import_bucket}/vertex/imports/{lakefs_repo}/{lakefs_ref}/labels.csv',
    import_schema_uri=aiplatform.schema.dataset.ioformat.image.single_label_classification,
    sync=True
)
ds.wait()
print(f'Done! {ds.display_name} ({ds.resource_name})')

Using lakeFS with Cloud Storage Fuse

Vertex allows using Google Cloud Storage, mounted as a FUSE filesystem, as custom input for training jobs.

Instead of having to copy lakeFS files for each version we want to consume, we can create symlinks by using gcsfuse’s native symlink inodes.
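
Under the hood, gcsfuse represents a symlink as a zero-byte object whose gcsfuse_symlink_target metadata key holds the link target. The hook described below automates this, but a minimal hand-rolled sketch (bucket, object path, and target are illustrative) looks roughly like:

from google.cloud import storage

# Sketch: create a gcsfuse-style symlink object by hand.
# gcsfuse stores the link target in the object's metadata under the key
# "gcsfuse_symlink_target"; the object body itself stays empty.
storage_client = storage.Client()
bucket = storage_client.bucket('my-bucket')  # illustrative bucket
blob = bucket.blob('exports/my-repo/branches/main/datasets/images/001.jpg')
blob.metadata = {'gcsfuse_symlink_target': '/gcs/my-bucket/repos/my-repo/data/abc123'}  # illustrative target
blob.upload_from_string('')  # zero-byte object; only the metadata matters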

This process can be fully automated by using the example gcsfuse_symlink_exporter.lua Lua hook.

Here’s what we need to do:

  1. Upload the example .lua file into our lakeFS repository. For this example, we’ll put it under scripts/gcsfuse_symlink_exporter.lua (a sketch of uploading and committing both files follows this list).
  2. Create a new hook definition file and upload it to _lakefs_actions/export_images.yaml:
---
# Example hook declaration: (_lakefs_actions/export_images.yaml):
name: export_images

on:
  post-commit:
    branches: ["main"]
  post-merge:
    branches: ["main"]
  post-create-tag:

hooks:
- id: gcsfuse_export_images
  type: lua
  properties:
    script_path: scripts/gcsfuse_symlink_exporter.lua  # Path to the script we uploaded in the previous step
    args:
      prefix: "datasets/images/"  # Path we want to export every commit
      destination: "gs://my-bucket/exports/my-repo/"  # Where should we create the symlinks?
      mount:
        from: "gs://my-bucket/repos/my-repo/"  # The GCS prefix where the repository's objects are stored...
        to: "/gcs/my-bucket/repos/my-repo/"    # ...rewritten to the path where that bucket is mounted, so symlink targets resolve to existing files
      
      # Should be the contents of a valid credentials.json file
      # See: https://developers.google.com/workspace/guides/create-credentials
      # Will be used to write the symlink files
      gcs_credentials_json_string: |
        {
          "client_id": "...",
          "client_secret": "...",
          "refresh_token": "...",
          "type": "..."
        }
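
Both files can be uploaded and committed with the same lakefs-client configuration used earlier. The sketch below assumes the Lua script and the hook definition sit in the current working directory under the (hypothetical) local names gcsfuse_symlink_exporter.lua and export_images.yaml:

import lakefs_client
from lakefs_client.client import LakeFSClient
from lakefs_client.models import CommitCreation

# Reuse the same lakeFS connection details as in the dataset example above
configuration = lakefs_client.Configuration()
configuration.username = 'AKIAIOSFODNN7EXAMPLE'
configuration.password = 'wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY'
configuration.host = 'https://lakefs.example.com/'
client = LakeFSClient(configuration)

repo, branch = 'my-repository', 'main'

# Upload the Lua script and the hook declaration, then commit so the hook becomes active
uploads = [
    ('gcsfuse_symlink_exporter.lua', 'scripts/gcsfuse_symlink_exporter.lua'),
    ('export_images.yaml', '_lakefs_actions/export_images.yaml'),
]
for local_name, lakefs_path in uploads:
    with open(local_name, 'rb') as f:
        client.objects_api.upload_object(repo, branch, lakefs_path, content=f)

client.commits_api.commit(
    repo, branch, CommitCreation(message='Add gcsfuse symlink export hook'))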

Done! On the next tag creation or update to the main branch, we’ll automatically export the lakeFS version of datasets/images/ to a mountable location.

To consume the symlinked files, we can read them normally from the mount:

with open('/gcs/my-bucket/exports/my-repo/branches/main/datasets/images/001.jpg', 'rb') as f:
    image_data = f.read()

Commits that were exported in the past remain readable as well:

commit_id = 'abcdef123deadbeef567'
with open(f'/gcs/my-bucket/exports/my-repo/commits/{commit_id}/datasets/images/001.jpg', 'rb') as f:
    image_data = f.read()
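
Putting it together, a Vertex custom training job can be pointed directly at the mounted export path. Here is a rough sketch; the project, region, staging bucket, container image, and data path are illustrative assumptions:

from google.cloud import aiplatform

# Illustrative values; replace with your own project, region, staging bucket and image
aiplatform.init(project='my-project', location='us-central1',
                staging_bucket='gs://my-bucket/staging')

job = aiplatform.CustomContainerTrainingJob(
    display_name='train-on-lakefs-export',
    container_uri='gcr.io/my-project/trainer:latest',
)

# The training container sees the bucket mounted under /gcs/, so it can read
# the exported symlinks like regular files.
job.run(
    args=['--data-dir', '/gcs/my-bucket/exports/my-repo/branches/main/datasets/images/'],
    replica_count=1,
    machine_type='n1-standard-4',
)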

Considerations when using lakeFS with Cloud Storage Fuse

For lakeFS paths to be readable by gcsfuse, the mount option --implicit-dirs must be specified.
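
If you are mounting the bucket yourself (for example, on a notebook instance), a command along these lines passes the option; the bucket name and mount point are illustrative:

gcsfuse --implicit-dirs my-bucket /gcs/my-bucket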