Using lakeFS with Apache Iceberg
To enrich your Iceberg tables with lakeFS capabilities, you can use the lakeFS implementation of the Iceberg catalog. You will then be able to query your Iceberg tables using lakeFS references, such as branches, tags, and commit hashes:
SELECT * FROM catalog.ref.db.table
Setup
Use the following Maven dependency to install the lakeFS custom catalog:
<dependency>
  <groupId>io.lakefs</groupId>
  <artifactId>lakefs-iceberg</artifactId>
  <version>0.1.4</version>
</dependency>
Include the lakefs-iceberg jar in your package list along with Iceberg. For example:
.config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,io.lakefs:lakefs-iceberg:0.1.4")
Configure
Set up the Spark SQL catalog:
.config("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog") \
.config("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog") \
.config("spark.sql.catalog.lakefs.warehouse", f"lakefs://{repo_name}") \
.config("spark.sql.catalog.lakefs.cache-enabled", "false")
Configure the S3A Hadoop FileSystem with your lakeFS connection details. Note that these are your lakeFS endpoint and credentials, not your S3 ones.
.config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
.config("spark.hadoop.fs.s3a.endpoint", "https://example-org.us-east-1.lakefscloud.io") \
.config("spark.hadoop.fs.s3a.access.key", "AKIAIO5FODNN7EXAMPLE") \
.config("spark.hadoop.fs.s3a.secret.key", "wJalrXUtnFEMI/K3MDENG/bPxRfiCYEXAMPLEKEY") \
.config("spark.hadoop.fs.s3a.path.style.access", "true")
Alternatively, pass the same configuration when launching spark-shell, using --packages to pull in the same jars:
spark-shell --packages "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,io.lakefs:lakefs-iceberg:0.1.4" \
--conf spark.sql.catalog.lakefs="org.apache.iceberg.spark.SparkCatalog" \
--conf spark.sql.catalog.lakefs.catalog-impl="io.lakefs.iceberg.LakeFSCatalog" \
--conf spark.sql.catalog.lakefs.warehouse="lakefs://example-repo" \
--conf spark.sql.catalog.lakefs.cache-enabled="false" \
--conf spark.hadoop.fs.s3.impl="org.apache.hadoop.fs.s3a.S3AFileSystem" \
--conf spark.hadoop.fs.s3a.endpoint="https://example-org.us-east-1.lakefscloud.io" \
--conf spark.hadoop.fs.s3a.access.key="AKIAIO5FODNN7EXAMPLE" \
--conf spark.hadoop.fs.s3a.secret.key="wJalrXUtnFEMI/K3MDENG/bPxRfiCYEXAMPLEKEY" \
--conf spark.hadoop.fs.s3a.path.style.access="true"
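Putting it together, a minimal PySpark session builder might look like the following. This is a sketch assembled from the settings above; repo_name, the endpoint, and the keys are placeholders to replace with your own values.
from pyspark.sql import SparkSession

repo_name = "example-repo"  # placeholder: your lakeFS repository name

spark = SparkSession.builder.appName("lakefs-iceberg-example") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.3.0,io.lakefs:lakefs-iceberg:0.1.4") \
    .config("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog") \
    .config("spark.sql.catalog.lakefs.warehouse", f"lakefs://{repo_name}") \
    .config("spark.sql.catalog.lakefs.cache-enabled", "false") \
    .config("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.hadoop.fs.s3a.endpoint", "https://example-org.us-east-1.lakefscloud.io") \
    .config("spark.hadoop.fs.s3a.access.key", "AKIAIO5FODNN7EXAMPLE") \
    .config("spark.hadoop.fs.s3a.secret.key", "wJalrXUtnFEMI/K3MDENG/bPxRfiCYEXAMPLEKEY") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .getOrCreate()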
Using Iceberg tables with lakeFS
Create a table
To create a table on your main branch, use the following syntax:
CREATE TABLE lakefs.main.db1.table1 (id int, data string);
Insert data into the table
INSERT INTO lakefs.main.db1.table1 VALUES (1, 'data1');
INSERT INTO lakefs.main.db1.table1 VALUES (2, 'data2');
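If you're working from PySpark rather than a SQL shell, the same statements can be issued through spark.sql, assuming the spark session sketched in the Configure section:
# create the table on the main branch, then insert the sample rows
spark.sql("CREATE TABLE lakefs.main.db1.table1 (id int, data string)")
spark.sql("INSERT INTO lakefs.main.db1.table1 VALUES (1, 'data1')")
spark.sql("INSERT INTO lakefs.main.db1.table1 VALUES (2, 'data2')")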
Create a branch
We can now commit the creation of the table to the main branch:
lakectl commit lakefs://example-repo/main -m "my first iceberg commit"
Then, create a branch:
lakectl branch create lakefs://example-repo/dev -s lakefs://example-repo/main
Make changes on the branch
We can now make changes on the branch:
INSERT INTO lakefs.dev.db1.table1 VALUES (3, 'data3');
Query the table
If we query the table on the branch, we will see the data we inserted:
SELECT * FROM lakefs.dev.db1.table1;
Results in:
+----+-------+
| id | data  |
+----+-------+
| 1  | data1 |
| 2  | data2 |
| 3  | data3 |
+----+-------+
However, if we query the table on the main branch, we will not see the new changes:
SELECT * FROM lakefs.main.db1.table1;
Results in:
+----+-------+
| id | data  |
+----+-------+
| 1  | data1 |
| 2  | data2 |
+----+-------+
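The same branch isolation is visible from PySpark, again assuming the spark session from the Configure section:
# the dev branch sees all three rows
spark.sql("SELECT * FROM lakefs.dev.db1.table1").show()

# main still sees only the two rows that existed when dev was created
spark.sql("SELECT * FROM lakefs.main.db1.table1").show()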
Migrating an existing Iceberg Table to lakeFS Catalog
This is done by copying the data from the original table into a lakeFS repository:
- Create a new lakeFS repository:
lakectl repo create lakefs://example-repo <base storage path>
- Initiate a Spark session that can interact with the source Iceberg table and the target lakeFS catalog. Here's an example of a session with a Hadoop S3 source catalog and a lakeFS target catalog, using per-bucket configuration:
SparkConf conf = new SparkConf();
conf.set("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions");
conf.set("spark.hadoop.fs.s3a.path.style.access", "true");

// Hadoop-on-S3 config for the source tables we want to copy
conf.set("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog");
conf.set("spark.sql.catalog.hadoop_prod.type", "hadoop");
conf.set("spark.sql.catalog.hadoop_prod.warehouse", "s3a://my-bucket/warehouse/hadoop/");
conf.set("spark.hadoop.fs.s3a.bucket.my-bucket.access.key", "<AWS_ACCESS_KEY>");
conf.set("spark.hadoop.fs.s3a.bucket.my-bucket.secret.key", "<AWS_SECRET_KEY>");

// lakeFS config (target catalog and repository)
conf.set("spark.sql.catalog.lakefs", "org.apache.iceberg.spark.SparkCatalog");
conf.set("spark.sql.catalog.lakefs.catalog-impl", "io.lakefs.iceberg.LakeFSCatalog");
conf.set("spark.sql.catalog.lakefs.warehouse", "lakefs://example-repo");
conf.set("spark.hadoop.fs.s3a.bucket.example-repo.access.key", "<LAKEFS_ACCESS_KEY>");
conf.set("spark.hadoop.fs.s3a.bucket.example-repo.secret.key", "<LAKEFS_SECRET_KEY>");
conf.set("spark.hadoop.fs.s3a.bucket.example-repo.endpoint", "<LAKEFS_ENDPOINT>");
- Create the schema in lakeFS and copy the data. Example of the copy with spark-sql (a PySpark sketch of the same copy follows this list):
-- Create the Iceberg schema in lakeFS
CREATE SCHEMA IF NOT EXISTS <lakefs-catalog>.<branch>.<db>;

-- Create a new Iceberg table in lakeFS from the source table (pre-lakeFS)
CREATE TABLE IF NOT EXISTS <lakefs-catalog>.<branch>.<db>.<table>
USING iceberg
AS SELECT * FROM <iceberg-original-table>;
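For reference, here is a minimal PySpark sketch of the same copy. It assumes a session configured with the same hadoop_prod and lakefs catalogs as the Java example above, and uses hypothetical db1.table1 names in place of the placeholders:
# create the target schema on the main branch of the lakeFS repository
spark.sql("CREATE SCHEMA IF NOT EXISTS lakefs.main.db1")

# one-time copy: create the lakeFS table from the source (pre-lakeFS) table
spark.sql("""
    CREATE TABLE IF NOT EXISTS lakefs.main.db1.table1
    USING iceberg
    AS SELECT * FROM hadoop_prod.db1.table1
""")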