
Using lakeFS with Airflow

Apache Airflow is a platform to programmatically author, schedule and monitor workflows.

To use lakeFS from Airflow, create a lakeFS connection, install the lakeFS Airflow package, and then use its operators and sensors in your DAGs.

Create a lakeFS connection on Airflow

To access the lakeFS server and authenticate with it, create a new Airflow Connection of type HTTP and add it to your DAG. You can do that using the Airflow UI or the CLI. Here’s an example Airflow command that does just that:

airflow connections add conn_lakefs --conn-type=HTTP --conn-host=http://<LAKEFS_ENDPOINT> \
    --conn-extra='{"access_key_id":"<LAKEFS_ACCESS_KEY_ID>","secret_access_key":"<LAKEFS_SECRET_ACCESS_KEY>"}'
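
When the endpoint and credentials come from variables rather than being typed by hand, the same command can be assembled programmatically. The following is a minimal sketch using only the Python standard library; the helper name `build_connection_command` and the placeholder endpoint and credentials are illustrative assumptions, not part of Airflow or lakeFS.

```python
import json
import shlex


def build_connection_command(conn_id, endpoint, access_key_id, secret_access_key):
    """Return the `airflow connections add` command line as a single string."""
    # The lakeFS credentials travel in the connection's "extra" JSON field.
    extra = json.dumps(
        {"access_key_id": access_key_id, "secret_access_key": secret_access_key},
        separators=(",", ":"),
    )
    args = [
        "airflow", "connections", "add", conn_id,
        "--conn-type=HTTP",
        f"--conn-host={endpoint}",
        f"--conn-extra={extra}",
    ]
    # shlex.join quotes each argument so the JSON survives the shell intact.
    return shlex.join(args)


command = build_connection_command(
    conn_id="conn_lakefs",
    endpoint="http://lakefs.example.com:8000",  # placeholder endpoint
    access_key_id="EXAMPLE_KEY_ID",             # placeholder credentials
    secret_access_key="EXAMPLE_SECRET",
)
print(command)
```

Generating the JSON with `json.dumps` avoids hand-escaping quotes inside the `--conn-extra` value.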

Install the lakeFS Airflow package

Use pip to install the package:

pip install airflow-provider-lakefs

Use the package

Operators

The package exposes several operators to interact with a lakeFS server:

  1. CreateBranchOperator creates a new lakeFS branch from the source branch (main by default).

    task_create_branch = CreateBranchOperator(
        task_id='create_branch',
        repo='example-repo',
        branch='example-branch',
        source_branch='main'
    )
    
  2. CommitOperator commits uncommitted changes to a branch.

    task_commit = CommitOperator(
        task_id='commit',
        repo='example-repo',
        branch='example-branch',
        msg='committing to lakeFS using airflow!',
        metadata={'committed_from': 'airflow-operator'}
    )
    
  3. MergeOperator merges two lakeFS branches.

    task_merge = MergeOperator(
        task_id='merge_branches',
        repo='example-repo',
        source_ref='example-branch',
        destination_branch='main',
        msg='merging job outputs',
        metadata={'committer': 'airflow-operator'}
    )
    

Sensors

Sensors are also available that allow synchronizing a running DAG with external operations:

  1. CommitSensor waits until a commit has been applied to the branch.

    task_sense_commit = CommitSensor(
        task_id='sense_commit',
        repo='example-repo',
        branch='example-branch'
    )
    
  2. FileSensor waits until a given file is present on a branch.

    task_sense_file = FileSensor(
        task_id='sense_file',
        repo='example-repo',
        branch='example-branch',
        path="file/to/sense"
    )
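
Conceptually, a sensor re-checks a condition at a fixed interval until the condition holds or a timeout expires. The sketch below illustrates that polling pattern with the standard library only; it is not the package's implementation, and `wait_until` and the stand-in condition are hypothetical names chosen for illustration. Unlike this sketch, which returns `False` on timeout, an Airflow sensor fails its task when it times out.

```python
import time


def wait_until(condition, poke_interval=1.0, timeout=10.0):
    """Poll `condition` until it returns True or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        if condition():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(poke_interval)


# Hypothetical stand-in for "is the file present on the branch?":
# it reports False twice, then True.
answers = iter([False, False, True])
found = wait_until(lambda: next(answers), poke_interval=0.01, timeout=1.0)
print(found)
```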
    

Example

The example DAG in the airflow-provider-lakeFS repository shows how to use all of these operators and sensors together.

Performing other operations

Some lakeFS operations might not yet have a dedicated operator in airflow-provider-lakeFS. In that case, you can access lakeFS directly by using:

  • SimpleHttpOperator to send API requests to lakeFS.
  • BashOperator with lakectl commands. For example, deleting a branch using BashOperator:
    task_delete_branch = BashOperator(
        task_id='delete_branch',
        bash_command='lakectl branch delete lakefs://example-repo/example-branch',
        dag=dag,
    )
    

Note: lakeFS versions v0.33.1 and earlier use ‘@’ (instead of ‘/’) as the separator between repository and branch.
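
For the API route, it can help to see what the equivalent HTTP request looks like. The sketch below builds (without sending) a request that deletes the same branch. It assumes the lakeFS REST route `/api/v1/repositories/{repository}/branches/{branch}` and HTTP Basic authentication with the access key pair; check both against the API reference for your lakeFS version. The helper name, endpoint, and credentials are placeholders.

```python
import base64
import urllib.request
from urllib.parse import quote


def build_delete_branch_request(endpoint, repo, branch, access_key_id, secret_access_key):
    """Build (but do not send) a DELETE request for a lakeFS branch."""
    # Assumed route: /api/v1/repositories/{repo}/branches/{branch}
    url = (
        f"{endpoint.rstrip('/')}/api/v1/repositories/"
        f"{quote(repo, safe='')}/branches/{quote(branch, safe='')}"
    )
    # Assumed auth scheme: HTTP Basic with the lakeFS access key pair.
    token = base64.b64encode(f"{access_key_id}:{secret_access_key}".encode()).decode()
    return urllib.request.Request(
        url, method="DELETE", headers={"Authorization": f"Basic {token}"}
    )


request = build_delete_branch_request(
    endpoint="http://lakefs.example.com:8000",  # placeholder endpoint
    repo="example-repo",
    branch="example-branch",
    access_key_id="EXAMPLE_KEY_ID",             # placeholder credentials
    secret_access_key="EXAMPLE_SECRET",
)
print(request.get_method(), request.full_url)
# To actually send it: urllib.request.urlopen(request)
```

The same method, path, and headers are what you would pass to SimpleHttpOperator through its `method`, `endpoint`, and `headers` arguments.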