Using lakeFS with Apache Airflow
Apache Airflow is a platform that allows users to programmatically author, schedule, and monitor workflows.
To use lakeFS from Airflow, follow the steps below.
Create a lakeFS connection on Airflow
To access the lakeFS server and authenticate with it, create a new Airflow Connection of type HTTP and reference it from your DAGs. You can do that using the Airflow UI or the CLI. Here's an example Airflow command that does just that:
airflow connections add conn_lakefs --conn-type=HTTP --conn-host=http://<LAKEFS_ENDPOINT> \
--conn-extra='{"access_key_id":"<LAKEFS_ACCESS_KEY_ID>","secret_access_key":"<LAKEFS_SECRET_ACCESS_KEY>"}'
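The value passed to `--conn-extra` must be valid JSON. A quick standard-library sanity check (the credential values below are placeholders for illustration only):

```python
import json

# Placeholder credentials -- substitute your real lakeFS keys.
extra = {
    "access_key_id": "AKIAEXAMPLE",
    "secret_access_key": "EXAMPLESECRET",
}

# This is the string you would pass to --conn-extra.
conn_extra = json.dumps(extra)
print(conn_extra)
```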
Install the lakeFS Airflow package
You can use pip to install the package:
pip install airflow-provider-lakefs
Use the package
Operators
The package exposes several operators for interacting with a lakeFS server:
- CreateBranchOperator creates a new lakeFS branch from the source branch (main by default):

      task_create_branch = CreateBranchOperator(
          task_id='create_branch',
          repo='example-repo',
          branch='example-branch',
          source_branch='main'
      )

- CommitOperator commits uncommitted changes to a branch:

      task_commit = CommitOperator(
          task_id='commit',
          repo='example-repo',
          branch='example-branch',
          msg='committing to lakeFS using airflow!',
          metadata={'committed_from': 'airflow-operator'}
      )

- MergeOperator merges two lakeFS branches:

      task_merge = MergeOperator(
          task_id='merge_branches',
          source_ref='example-branch',
          destination_branch='main',
          msg='merging job outputs',
          metadata={'committer': 'airflow-operator'}
      )
Sensors
The package also provides sensors that let a running DAG synchronize with external operations:
- CommitSensor waits until a commit has been applied to the branch:

      task_sense_commit = CommitSensor(
          repo='example-repo',
          branch='example-branch',
          task_id='sense_commit'
      )

- FileSensor waits until a given file is present on a branch:

      task_sense_file = FileSensor(
          task_id='sense_file',
          repo='example-repo',
          branch='example-branch',
          path='file/to/sense'
      )
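As a sketch, the operators above can be wired together into a single DAG that isolates work on a branch, commits it, and merges it back. The import paths and the `lakefs_conn_id` parameter name below are assumptions; verify them against the airflow-provider-lakefs package you installed.

```python
# Sketch of a minimal DAG combining the operators shown above.
# Import paths are assumed -- check them against the provider package.
from datetime import datetime

from airflow import DAG
from lakefs_provider.operators.create_branch_operator import CreateBranchOperator
from lakefs_provider.operators.commit_operator import CommitOperator
from lakefs_provider.operators.merge_operator import MergeOperator

with DAG(
    dag_id='lakefs_example',
    start_date=datetime(2023, 1, 1),
    schedule=None,
) as dag:
    task_create_branch = CreateBranchOperator(
        task_id='create_branch',
        lakefs_conn_id='conn_lakefs',  # the connection created earlier
        repo='example-repo',
        branch='example-branch',
        source_branch='main',
    )
    task_commit = CommitOperator(
        task_id='commit',
        lakefs_conn_id='conn_lakefs',
        repo='example-repo',
        branch='example-branch',
        msg='committing to lakeFS using airflow!',
    )
    task_merge = MergeOperator(
        task_id='merge_branches',
        lakefs_conn_id='conn_lakefs',
        source_ref='example-branch',
        destination_branch='main',
        msg='merging job outputs',
    )

    # Isolate work on a branch, commit it, then merge back to main.
    task_create_branch >> task_commit >> task_merge
```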
Example
This example DAG in the airflow-provider-lakeFS repository shows how to use all of these.
Performing other operations
Sometimes an operation is not yet supported by airflow-provider-lakeFS. In that case, you can access lakeFS directly by using:
- SimpleHttpOperator to send API requests to lakeFS.
- BashOperator with lakectl commands.
For example, deleting a branch using BashOperator:
task_delete_branch = BashOperator(
    task_id='delete_branch',
    bash_command='lakectl branch delete lakefs://example-repo/example-branch',
    dag=dag,
)
Note: lakeFS versions <= v0.33.1 use '@' (instead of '/') as the separator between repository and branch.
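For the SimpleHttpOperator route, it helps to see the shape of the underlying API request. The sketch below builds (but does not send) the equivalent call with the Python standard library: deleting a branch maps to DELETE /api/v1/repositories/{repo}/branches/{branch} in the lakeFS API, authenticated with HTTP Basic auth. The endpoint and credentials are placeholders.

```python
import base64
import urllib.request

# Placeholders -- substitute your lakeFS endpoint and credentials.
LAKEFS_ENDPOINT = "http://localhost:8000"
ACCESS_KEY = "AKIAEXAMPLE"
SECRET_KEY = "EXAMPLESECRET"

def delete_branch_request(repo: str, branch: str) -> urllib.request.Request:
    """Build (but do not send) a DELETE request for a lakeFS branch."""
    url = f"{LAKEFS_ENDPOINT}/api/v1/repositories/{repo}/branches/{branch}"
    # lakeFS accepts the access key pair as HTTP Basic credentials.
    token = base64.b64encode(f"{ACCESS_KEY}:{SECRET_KEY}".encode()).decode()
    req = urllib.request.Request(url, method="DELETE")
    req.add_header("Authorization", f"Basic {token}")
    return req

req = delete_branch_request("example-repo", "example-branch")
print(req.get_method(), req.full_url)
```

With SimpleHttpOperator, the same call would use the `conn_lakefs` connection created earlier to supply the host and credentials.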