Using lakeFS with Spark
Apache Spark is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.
Note: In all of the following examples, we set AWS and lakeFS credentials at runtime for clarity. In production, properties defining AWS credentials should be set using one of Hadoop’s standard ways of authenticating with S3. Similarly, properties defining lakeFS credentials should be configured in secure site files, not on the command line or inlined in code where they might be exposed.
Two-tiered Spark support
There are two ways you can use lakeFS with Spark:
- Using the S3 gateway: get started quickly!
- Using the lakeFS Hadoop FileSystem: fully unlock the performance of lakeFS.
Use the S3 gateway
lakeFS has an S3-compatible endpoint. Simply point Spark to this endpoint to get started quickly.
You will access your data using S3-style URIs, e.g. s3a://example-repo/example-branch/example-table.
Configuration
To configure Spark to work with lakeFS, we set S3A Hadoop configuration to the lakeFS endpoint and credentials:
Hadoop Configuration | Value |
---|---|
fs.s3a.access.key | Set to the lakeFS access key |
fs.s3a.secret.key | Set to the lakeFS secret key |
fs.s3a.endpoint | Set to the lakeFS S3-compatible API endpoint |
fs.s3a.path.style.access | Set to true |
Here is how to do it:
spark-shell --conf spark.hadoop.fs.s3a.access.key='AKIAlakefs12345EXAMPLE' \
--conf spark.hadoop.fs.s3a.secret.key='abc/lakefs/1234567bPxRfiCYEXAMPLEKEY' \
--conf spark.hadoop.fs.s3a.path.style.access=true \
--conf spark.hadoop.fs.s3a.endpoint='https://lakefs.example.com' ...
Alternatively, set the same properties at runtime in Scala:
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "AKIAlakefs12345EXAMPLE")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "abc/lakefs/1234567bPxRfiCYEXAMPLEKEY")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "https://lakefs.example.com")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")
Add these into a configuration file, e.g. $SPARK_HOME/conf/hdfs-site.xml:
<?xml version="1.0"?>
<configuration>
<property>
<name>fs.s3a.access.key</name>
<value>AKIAlakefs12345EXAMPLE</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>abc/lakefs/1234567bPxRfiCYEXAMPLEKEY</value>
</property>
<property>
<name>fs.s3a.endpoint</name>
<value>https://lakefs.example.com</value>
</property>
<property>
<name>fs.s3a.path.style.access</name>
<value>true</value>
</property>
</configuration>
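To keep credentials out of the code, as the note at the top of this page recommends, the four properties can be collected in one place and read from the environment. The following is only a sketch: the object name `LakeFSGateway` and the environment variable names `LAKEFS_ENDPOINT`, `LAKEFS_ACCESS_KEY`, and `LAKEFS_SECRET_KEY` are assumptions made for illustration, not lakeFS conventions.

```scala
object LakeFSGateway {
  // Hypothetical helper: builds the S3A properties from the table above.
  // The environment variable names are assumptions; a missing variable
  // raises NoSuchElementException instead of silently using an empty value.
  def conf(env: Map[String, String] = sys.env): Map[String, String] = Map(
    "fs.s3a.endpoint"          -> env("LAKEFS_ENDPOINT"),
    "fs.s3a.access.key"        -> env("LAKEFS_ACCESS_KEY"),
    "fs.s3a.secret.key"        -> env("LAKEFS_SECRET_KEY"),
    "fs.s3a.path.style.access" -> "true"
  )
}

// In spark-shell, where `spark` is predefined, the properties could be applied with:
//   LakeFSGateway.conf().foreach { case (k, v) =>
//     spark.sparkContext.hadoopConfiguration.set(k, v)
//   }
```

Passing the environment as a parameter keeps the helper easy to exercise without touching real credentials.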
Per-bucket configuration
The above configuration will use lakeFS as the sole S3 endpoint. To use lakeFS in parallel with S3, you can configure Spark to use lakeFS only for specific bucket names.
For example, to configure only example-repo to use lakeFS, set the following configurations:
spark-shell --conf spark.hadoop.fs.s3a.bucket.example-repo.access.key='AKIAlakefs12345EXAMPLE' \
--conf spark.hadoop.fs.s3a.bucket.example-repo.secret.key='abc/lakefs/1234567bPxRfiCYEXAMPLEKEY' \
--conf spark.hadoop.fs.s3a.bucket.example-repo.endpoint='https://lakefs.example.com' \
--conf spark.hadoop.fs.s3a.path.style.access=true
Alternatively, set the same properties at runtime in Scala:
spark.sparkContext.hadoopConfiguration.set("fs.s3a.bucket.example-repo.access.key", "AKIAlakefs12345EXAMPLE")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.bucket.example-repo.secret.key", "abc/lakefs/1234567bPxRfiCYEXAMPLEKEY")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.bucket.example-repo.endpoint", "https://lakefs.example.com")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.path.style.access", "true")
Add these into a configuration file, e.g. $SPARK_HOME/conf/hdfs-site.xml:
<?xml version="1.0"?>
<configuration>
<property>
<name>fs.s3a.bucket.example-repo.access.key</name>
<value>AKIAlakefs12345EXAMPLE</value>
</property>
<property>
<name>fs.s3a.bucket.example-repo.secret.key</name>
<value>abc/lakefs/1234567bPxRfiCYEXAMPLEKEY</value>
</property>
<property>
<name>fs.s3a.bucket.example-repo.endpoint</name>
<value>https://lakefs.example.com</value>
</property>
<property>
<name>fs.s3a.path.style.access</name>
<value>true</value>
</property>
</configuration>
With this configuration set, reading S3A paths with example-repo as the bucket will use lakeFS, while all other buckets will use AWS S3.
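If more than one repository should go through lakeFS, the per-bucket properties are simply repeated for each repository name. The sketch below generates them; `PerBucketConf` and `forRepos` are hypothetical names, and `other-repo` in the usage comment stands in for a second repository that is assumed to exist.

```scala
object PerBucketConf {
  // Hypothetical helper: emits fs.s3a.bucket.<repo>.* properties for each
  // repository, plus the path-style flag used in the examples above.
  def forRepos(
      repos: Seq[String],
      endpoint: String,
      accessKey: String,
      secretKey: String
  ): Map[String, String] = {
    val perRepo = repos.flatMap { repo =>
      val prefix = s"fs.s3a.bucket.$repo"
      Seq(
        s"$prefix.endpoint"   -> endpoint,
        s"$prefix.access.key" -> accessKey,
        s"$prefix.secret.key" -> secretKey
      )
    }
    perRepo.toMap + ("fs.s3a.path.style.access" -> "true")
  }
}

// In spark-shell, the result could be applied with:
//   PerBucketConf.forRepos(Seq("example-repo", "other-repo"), endpoint, key, secret)
//     .foreach { case (k, v) => spark.sparkContext.hadoopConfiguration.set(k, v) }
```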
Reading Data
To access objects in lakeFS, you need to use the lakeFS S3 gateway path conventions:
s3a://[REPOSITORY]/[BRANCH]/PATH/TO/OBJECT
Here is an example of reading a Parquet file from lakeFS into a Spark DataFrame:
val repo = "example-repo"
val branch = "main"
val dataPath = s"s3a://${repo}/${branch}/example-path/example-file.parquet"
val df = spark.read.parquet(dataPath)
You can now use this DataFrame as you normally would.
Writing Data
Now simply write your results back to a lakeFS path:
df.write.partitionBy("example-column").parquet(s"s3a://${repo}/${branch}/output-path/")
The data is now created in lakeFS as new changes in your branch. You can now commit these changes or revert them.
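Because reads and writes follow the same s3a://[REPOSITORY]/[BRANCH]/PATH/TO/OBJECT convention, a small path builder can keep the two consistent. This is a sketch only: `GatewayPaths` is a hypothetical name, and the commented usage assumes a spark-shell session and a branch called `experiment` that already exists in the repository.

```scala
object GatewayPaths {
  // Builds an S3 gateway URI for a repository, branch, and object path.
  // Leading slashes on the path are dropped to avoid a double slash.
  def uri(repo: String, branch: String, path: String): String =
    s"s3a://$repo/$branch/${path.dropWhile(_ == '/')}"
}

// Possible usage in spark-shell: read from one branch, write to another.
//   val df = spark.read.parquet(
//     GatewayPaths.uri("example-repo", "main", "example-path/example-file.parquet"))
//   df.write.parquet(GatewayPaths.uri("example-repo", "experiment", "output-path/"))
```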
Use the lakeFS Hadoop FileSystem
If you’re using lakeFS on top of S3, this mode will enhance your application’s performance. In this mode, Spark will read and write objects directly from S3, reducing the load on the lakeFS server. It will still access the lakeFS server for metadata operations.
After configuring the lakeFS Hadoop FileSystem below, use URIs of the form lakefs://example-repo/ref/path/to/data to interact with your data on lakeFS.
Configuration
- Install the lakeFS Hadoop FileSystem:
Add the package to your spark-submit command:
--packages io.lakefs:hadoop-lakefs-assembly:0.1.8
The jar is also available on a public S3 location:
s3://treeverse-clients-us-east/hadoop/hadoop-lakefs-assembly-0.1.8.jar
- Configure the S3A filesystem with your S3 credentials (not the lakeFS credentials). Additionally, supply the fs.lakefs.* configurations to allow Spark to access metadata on lakeFS:
Hadoop Configuration | Value |
---|---|
fs.s3a.access.key | Set to the AWS S3 access key |
fs.s3a.secret.key | Set to the AWS S3 secret key |
fs.lakefs.impl | io.lakefs.LakeFSFileSystem |
fs.lakefs.access.key | Set to the lakeFS access key |
fs.lakefs.secret.key | Set to the lakeFS secret key |
fs.lakefs.endpoint | Set to the lakeFS API URL |
The lakeFS Hadoop FileSystem uses the fs.s3a.* properties to directly access S3. If your cluster already has access to your buckets (for example, if you’re using an AWS instance profile), then you don’t need to configure these properties.
Here are some configuration examples:
spark-shell --conf spark.hadoop.fs.s3a.access.key='AKIAIOSFODNN7EXAMPLE' \
--conf spark.hadoop.fs.s3a.secret.key='wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY' \
--conf spark.hadoop.fs.s3a.endpoint='https://s3.eu-central-1.amazonaws.com' \
--conf spark.hadoop.fs.lakefs.impl=io.lakefs.LakeFSFileSystem \
--conf spark.hadoop.fs.lakefs.access.key=AKIAlakefs12345EXAMPLE \
--conf spark.hadoop.fs.lakefs.secret.key=abc/lakefs/1234567bPxRfiCYEXAMPLEKEY \
--conf spark.hadoop.fs.lakefs.endpoint=https://lakefs.example.com/api/v1 \
--packages io.lakefs:hadoop-lakefs-assembly:0.1.8 ...
Ensure you load the lakeFS FileSystem into Spark by running it with --packages or --jars, and then run:
spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", "AKIAIOSFODNN7EXAMPLE")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY")
spark.sparkContext.hadoopConfiguration.set("fs.s3a.endpoint", "https://s3.eu-central-1.amazonaws.com")
spark.sparkContext.hadoopConfiguration.set("fs.lakefs.impl", "io.lakefs.LakeFSFileSystem")
spark.sparkContext.hadoopConfiguration.set("fs.lakefs.access.key", "AKIAlakefs12345EXAMPLE")
spark.sparkContext.hadoopConfiguration.set("fs.lakefs.secret.key", "abc/lakefs/1234567bPxRfiCYEXAMPLEKEY")
spark.sparkContext.hadoopConfiguration.set("fs.lakefs.endpoint", "https://lakefs.example.com/api/v1")
Make sure that you load the lakeFS FileSystem into Spark by running it with --packages or --jars, and then add these into a configuration file, e.g., $SPARK_HOME/conf/hdfs-site.xml:
<?xml version="1.0"?>
<configuration>
<property>
<name>fs.s3a.access.key</name>
<value>AKIAIOSFODNN7EXAMPLE</value>
</property>
<property>
<name>fs.s3a.secret.key</name>
<value>wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY</value>
</property>
<property>
<name>fs.s3a.endpoint</name>
<value>https://s3.eu-central-1.amazonaws.com</value>
</property>
<property>
<name>fs.lakefs.impl</name>
<value>io.lakefs.LakeFSFileSystem</value>
</property>
<property>
<name>fs.lakefs.access.key</name>
<value>AKIAlakefs12345EXAMPLE</value>
</property>
<property>
<name>fs.lakefs.secret.key</name>
<value>abc/lakefs/1234567bPxRfiCYEXAMPLEKEY</value>
</property>
<property>
<name>fs.lakefs.endpoint</name>
<value>https://lakefs.example.com/api/v1</value>
</property>
</configuration>
If your bucket is in a region other than us-east-1, you may also need to configure fs.s3a.endpoint with the correct region. Amazon provides S3 endpoints you can use.
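The endpoint used in the examples above, https://s3.eu-central-1.amazonaws.com, follows the usual regional pattern for the standard AWS partition. As a rough sketch, assuming that pattern applies to your region (it does not cover, for example, the China regions), the value for fs.s3a.endpoint could be derived as follows; `S3Endpoints` is a hypothetical name.

```scala
object S3Endpoints {
  // Assumption: the bucket lives in the standard AWS partition, where
  // regional S3 endpoints have the form s3.<region>.amazonaws.com.
  def regional(region: String): String =
    s"https://s3.$region.amazonaws.com"
}

// In spark-shell, this could be applied with:
//   spark.sparkContext.hadoopConfiguration.set(
//     "fs.s3a.endpoint", S3Endpoints.regional("eu-central-1"))
```

When in doubt, take the endpoint from Amazon's published list rather than constructing it.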
Reading Data
To access objects in lakeFS, you need to use the lakeFS path conventions:
lakefs://[REPOSITORY]/[BRANCH]/PATH/TO/OBJECT
Here’s an example of reading a Parquet file from lakeFS into a Spark DataFrame:
val repo = "example-repo"
val branch = "main"
val dataPath = s"lakefs://${repo}/${branch}/example-path/example-file.parquet"
val df = spark.read.parquet(dataPath)
You can now use this DataFrame as you normally would.
Writing Data
Now simply write your results back to a lakeFS path:
df.write.partitionBy("example-column").parquet(s"lakefs://${repo}/${branch}/output-path/")
The data is now created in lakeFS as new changes in your branch. You can now commit these changes or revert them.
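The S3 gateway and the lakeFS Hadoop FileSystem share the same [REPOSITORY]/[BRANCH]/PATH/TO/OBJECT layout, so moving existing code from one to the other mostly means changing the URI scheme. The following is a minimal sketch of that rewrite; `LakeFSUris` is a hypothetical name, and it should only be applied to s3a:// paths whose bucket is a lakeFS repository, not to paths that point at plain S3 buckets.

```scala
object LakeFSUris {
  // Rewrites an S3 gateway URI (s3a://repo/branch/path) into the
  // equivalent lakeFS Hadoop FileSystem URI (lakefs://repo/branch/path).
  def toLakeFS(gatewayUri: String): String = {
    require(gatewayUri.startsWith("s3a://"), s"not an s3a URI: $gatewayUri")
    "lakefs://" + gatewayUri.stripPrefix("s3a://")
  }
}

// Possible usage in spark-shell:
//   val df = spark.read.parquet(
//     LakeFSUris.toLakeFS("s3a://example-repo/main/example-path/example-file.parquet"))
```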
Notes for the lakeFS Hadoop FileSystem
- Since data will not be sent to the lakeFS server, using this mode maximizes data security.
- The FileSystem implementation is tested with the latest Spark 2.X (Hadoop 2) and Spark 3.X (Hadoop 3) Bitnami images.
Case Study: SimilarWeb
See how SimilarWeb is using lakeFS with Spark to manage algorithm changes in data pipelines.