Using lakeFS with Airflow
Apache Airflow is a platform to programmatically author, schedule and monitor workflows.
To run Airflow with lakeFS, there are a few things to set up:
Creating the lakeFS connection
For authenticating to the lakeFS server, you need to create a new Airflow Connection of type HTTP and pass it to your DAG. You can do that using the Airflow UI or the CLI. Here's an example Airflow CLI command that does just that:
airflow connections add conn_lakefs --conn-type=HTTP --conn-host=http://<LAKEFS_ENDPOINT> \
--conn-extra='{"access_key_id":"<LAKEFS_ACCESS_KEY_ID>","secret_access_key":"<LAKEFS_SECRET_ACCESS_KEY>"}'
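Once the connection exists, reference it from your DAG by its connection ID (conn_lakefs above). A minimal sketch, assuming the provider's operators look the connection up through a lakefs_conn_id argument (check the provider documentation for the exact parameter name):

# Shared arguments for every lakeFS task in the DAG. 'lakefs_conn_id' is
# assumed to be the argument the provider's operators use to locate the
# Airflow connection created above.
default_args = {
    'owner': 'airflow',
    'lakefs_conn_id': 'conn_lakefs',
}

Passing default_args to the DAG constructor lets every task that accepts lakefs_conn_id pick up the connection automatically.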
Installing the lakeFS Airflow package
Install the package using pip:
pip install airflow-provider-lakefs
Using the package
The package exposes several operators for interacting with a lakeFS server (a sketch of wiring them into a DAG follows the list):
- CreateBranchOperator creates a new lakeFS branch from the source branch (defaults to main).

    task_create_branch = CreateBranchOperator(
        task_id='create_branch',
        repo='example-repo',
        branch='example-branch',
        source_branch='main'
    )

- CommitOperator commits uncommitted changes to a branch.

    task_commit = CommitOperator(
        task_id='commit',
        repo='example-repo',
        branch='example-branch',
        msg='committing to lakeFS using airflow!',
        metadata={'committed_from': 'airflow-operator'}
    )

- MergeOperator merges 2 lakeFS branches.

    task_merge = MergeOperator(
        task_id='merge_branches',
        source_ref='example-branch',
        destination_branch='main',
        msg='merging job outputs',
        metadata={'committer': 'airflow-operator'}
    )

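Inside a DAG, these tasks can be chained so that the branch is created first, the changes are committed, and the branch is then merged back into main. A minimal sketch, assuming the three tasks above are instantiated within the same DAG context:

# Run the steps in order: create the branch, commit to it, then merge it back.
task_create_branch >> task_commit >> task_merge
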
Sensors are also available if you want to synchronize a running DAG with external operations (a usage sketch follows the list):
- CommitSensor waits until a commit has been applied to the branch.

    task_sense_commit = CommitSensor(
        repo='example-repo',
        branch='example-branch',
        task_id='sense_commit'
    )

- FileSensor waits until a given file is present in a branch.

    task_sense_file = FileSensor(
        task_id='sense_file',
        repo='example-repo',
        branch='example-branch',
        path='file/to/sense'
    )

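For example, a DAG can block on these sensors before running its own processing step. A minimal sketch, where task_process is a hypothetical downstream task of your own:

# Wait for the commit and for the expected file before processing.
task_sense_commit >> task_process
task_sense_file >> task_process
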
For a DAG example that uses all the above, check out the example DAG in the airflow-provider-lakeFS repository.
Performing other operations
To perform other operations that are not yet supported by the package, you can use:
- SimpleHttpOperator to send API requests to lakeFS (see the sketch at the end of this section).
- BashOperator with lakectl commands.
For example, deleting a branch using BashOperator:
task_delete_branch = BashOperator(
    task_id='delete_branch',
    bash_command='lakectl branch delete lakefs://example-repo/example-branch',
    dag=dag,
)
Note: lakeFS versions <= v0.33.1 use '@' (instead of '/') as the separator between repository and branch.
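The same branch deletion can also be performed through the lakeFS API with SimpleHttpOperator. A minimal sketch; the endpoint path follows the lakeFS OpenAPI spec and authentication is not shown here, so verify both against your lakeFS and Airflow versions:

from airflow.providers.http.operators.http import SimpleHttpOperator

# Delete a branch by calling the lakeFS API directly. The 'conn_lakefs'
# connection must carry credentials the HTTP hook can use (for example,
# basic auth with your lakeFS access key and secret).
task_delete_branch_api = SimpleHttpOperator(
    task_id='delete_branch_api',
    http_conn_id='conn_lakefs',
    method='DELETE',
    endpoint='/api/v1/repositories/example-repo/branches/example-branch',
    dag=dag,
)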