Building an AI/ML Data Lake With Apache Iceberg
Learn how Apache Iceberg powers AI/ML data lakes with time travel and schema flexibility, shown via a full Python-based feature store implementation.
As companies collect massive amounts of data to fuel their artificial intelligence and machine learning initiatives, finding the right data architecture for storing, managing, and accessing that data is crucial. Traditional data storage practices often fall short of meeting the scale, variety, and velocity demanded by modern AI/ML workflows. Apache Iceberg steps in as a strong open-source table format for building solid and efficient data lakes for AI and ML.
What Is Apache Iceberg?
Apache Iceberg is an open table format for huge analytical datasets, originally built at Netflix. It addresses many of the limitations of traditional data lakes, especially for the needs of AI/ML workloads. Iceberg provides a table layer over file systems or object stores, bringing database-like functionality to data lakes. The most important capabilities that make Iceberg valuable for AI and machine learning workloads are:
- ACID transactions: Ensure data consistency even with concurrent operations
- Optimized metadata handling: Efficiently manage tables with billions of files
- Schema evolution: Add, rename, or change column types without breaking existing ML pipelines
- Partition evolution: Change how data is partitioned without rewriting existing datasets
- Time travel: Access older versions of data for reproducible ML experiments
- Hidden partitioning: Simplify queries by resolving partition filters transparently, so readers don't need to know the physical layout (several of these capabilities are shown in the sketch below)
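As a quick illustration, here is a minimal sketch of schema evolution, partition evolution, and time travel. It assumes a hypothetical Iceberg table local.db.events and an active SparkSession named spark with Iceberg's SQL extensions enabled (the setup used in this article appears below):

# Schema evolution: add a new column without rewriting existing data files
spark.sql("ALTER TABLE local.db.events ADD COLUMN session_score DOUBLE")

# Partition evolution: new writes use the new layout; old files are untouched
spark.sql("ALTER TABLE local.db.events ADD PARTITION FIELD days(event_ts)")

# Time travel: read the table exactly as it existed at an earlier point
spark.sql("SELECT * FROM local.db.events TIMESTAMP AS OF '2024-01-01 00:00:00'").show()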
Apache Iceberg Architecture for AI/ML
A modern AI/ML data lake architecture built on Apache Iceberg consists of several layers:
- Data Sources Layer: Various data inputs including streaming, batch, and CDC sources
- Ingestion Layer: Tools like Spark, Flink, and Kafka to process and load data
- Storage Layer: Apache Iceberg provides the table format, sitting on top of cloud storage
- Processing & Query Layer: Query engines that interact with Iceberg tables
- ML/AI Applications: Consume data from the data lake for training and inference
Iceberg's architecture is particularly well suited to machine learning workloads because of its metadata design. Unlike traditional Hive tables, which suffer performance problems when dealing with millions of files, Iceberg uses a hierarchical metadata approach, with a root metadata file (metadata.json) tracking table snapshots, schemas, and partition layouts.
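This hierarchy is directly queryable through Iceberg's built-in metadata tables. As a minimal sketch (run here against the feature table created later in this article):

# Manifests tracked by the current snapshot...
spark.sql("""
    SELECT path, added_data_files_count
    FROM feature_store.customer_features.manifests
""").show(truncate=False)

# ...and the individual data files they index
spark.sql("""
    SELECT file_path, record_count, file_size_in_bytes
    FROM feature_store.customer_features.files
""").show(truncate=False)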
Implementing a Feature Store With Apache Iceberg
Let's look at a practical implementation of a feature store for ML using Apache Iceberg. A feature store is a specialized data system that helps manage, store, and serve features for machine learning models.
Setting up the Environment
First, we need to set up our Spark environment with Iceberg support:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import datetime

def initialize_spark():
    return SparkSession.builder \
        .appName("Iceberg ML Feature Store") \
        .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
        .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
        .config("spark.sql.catalog.spark_catalog.type", "hive") \
        .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog") \
        .config("spark.sql.catalog.local.type", "hadoop") \
        .config("spark.sql.catalog.local.warehouse", "s3://your-data-lake/warehouse") \
        .getOrCreate()
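This assumes the Iceberg Spark runtime jar is already available to the cluster. If it isn't, one common approach is to pull it in at session startup via spark.jars.packages; the Maven coordinates below are illustrative and must match your Spark and Scala versions:

# Illustrative only: the same builder with the Iceberg runtime fetched
# automatically at session startup via spark.jars.packages
spark = (
    SparkSession.builder
    .appName("Iceberg ML Feature Store")
    .config("spark.jars.packages",
            "org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2")
    .config("spark.sql.extensions",
            "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .getOrCreate()
)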
Creating Feature Store Tables
Next, we'll create the tables for our feature store:
# Create feature store tables
def create_feature_store_tables(spark):
    # Create database if it doesn't exist
    spark.sql("CREATE DATABASE IF NOT EXISTS feature_store")

    # Create customer features table
    spark.sql("""
        CREATE TABLE IF NOT EXISTS feature_store.customer_features (
            customer_id STRING,
            feature_name STRING,
            feature_value DOUBLE,
            feature_timestamp TIMESTAMP,
            valid_from TIMESTAMP,
            valid_to TIMESTAMP
        ) USING iceberg
        PARTITIONED BY (days(feature_timestamp))
    """)

    # Create customer labels table for ML training
    spark.sql("""
        CREATE TABLE IF NOT EXISTS feature_store.customer_labels (
            customer_id STRING,
            label_name STRING,
            label_value INT,
            label_timestamp TIMESTAMP,
            valid_from TIMESTAMP,
            valid_to TIMESTAMP
        ) USING iceberg
        PARTITIONED BY (days(label_timestamp))
    """)

    # Create feature metadata table
    spark.sql("""
        CREATE TABLE IF NOT EXISTS feature_store.feature_metadata (
            feature_name STRING,
            feature_type STRING,
            description STRING,
            owner STRING,
            created_timestamp TIMESTAMP,
            updated_timestamp TIMESTAMP
        ) USING iceberg
    """)

    print("Feature store tables created successfully")
Registering Features and Metadata
Maintaining metadata about your features is critical for ML governance:
# Register features in the metadata table
def register_features(spark):
    features = [
        ("monthly_spend", "numerical", "Customer monthly spending", "data_science_team",
         datetime.datetime.now(), datetime.datetime.now()),
        ("tenure_days", "numerical", "Customer tenure in days", "data_science_team",
         datetime.datetime.now(), datetime.datetime.now()),
        ("num_support_calls", "numerical", "Number of support calls in last 30 days", "data_science_team",
         datetime.datetime.now(), datetime.datetime.now()),
        ("is_premium", "categorical", "Whether customer has premium subscription", "data_science_team",
         datetime.datetime.now(), datetime.datetime.now()),
        ("days_since_last_purchase", "numerical", "Days since last purchase", "data_science_team",
         datetime.datetime.now(), datetime.datetime.now())
    ]

    schema = StructType([
        StructField("feature_name", StringType(), True),
        StructField("feature_type", StringType(), True),
        StructField("description", StringType(), True),
        StructField("owner", StringType(), True),
        StructField("created_timestamp", TimestampType(), True),
        StructField("updated_timestamp", TimestampType(), True)
    ])

    features_df = spark.createDataFrame(features, schema)

    # Merge into feature metadata table using Iceberg's MERGE capability
    features_df.createOrReplaceTempView("features_temp")
    spark.sql("""
        MERGE INTO feature_store.feature_metadata AS target
        USING features_temp AS source
        ON target.feature_name = source.feature_name
        WHEN MATCHED THEN
            UPDATE SET
                feature_type = source.feature_type,
                description = source.description,
                owner = source.owner,
                updated_timestamp = source.updated_timestamp
        WHEN NOT MATCHED THEN
            INSERT (feature_name, feature_type, description, owner, created_timestamp, updated_timestamp)
            VALUES (source.feature_name, source.feature_type, source.description,
                    source.owner, source.created_timestamp, source.updated_timestamp)
    """)

    print("Features registered in metadata table")
Creating a Point-In-Time Training Dataset
One of the most powerful capabilities of Iceberg for ML is creating point-in-time correct training datasets using time travel:
# Create point-in-time training dataset using time travel
def create_training_dataset(spark, as_of_timestamp=None):
    # If no timestamp is provided, use the latest data.
    # Spark's time travel syntax is TIMESTAMP AS OF '<timestamp>'.
    timestamp_clause = ""
    if as_of_timestamp:
        timestamp_clause = f"TIMESTAMP AS OF '{as_of_timestamp}'"

    # Create point-in-time correct training dataset
    training_df = spark.sql(f"""
        WITH features AS (
            SELECT
                customer_id,
                feature_name,
                feature_value
            FROM feature_store.customer_features {timestamp_clause}
            WHERE feature_timestamp <= CURRENT_TIMESTAMP()
        ),
        labels AS (
            SELECT
                customer_id,
                label_value as churn
            FROM feature_store.customer_labels {timestamp_clause}
            WHERE label_name = 'will_churn'
              AND label_timestamp <= CURRENT_TIMESTAMP()
        )
        SELECT
            f.customer_id,
            MAX(CASE WHEN f.feature_name = 'monthly_spend' THEN f.feature_value ELSE NULL END) as monthly_spend,
            MAX(CASE WHEN f.feature_name = 'tenure_days' THEN f.feature_value ELSE NULL END) as tenure_days,
            MAX(CASE WHEN f.feature_name = 'num_support_calls' THEN f.feature_value ELSE NULL END) as num_support_calls,
            MAX(CASE WHEN f.feature_name = 'is_premium' THEN f.feature_value ELSE NULL END) as is_premium,
            MAX(CASE WHEN f.feature_name = 'days_since_last_purchase' THEN f.feature_value ELSE NULL END) as days_since_last_purchase,
            l.churn
        FROM features f
        JOIN labels l ON f.customer_id = l.customer_id
        GROUP BY f.customer_id, l.churn
    """)

    return training_df
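Usage is straightforward; the timestamp below is illustrative:

# Latest view of the features vs. a reproducible point-in-time view
current_training_df = create_training_dataset(spark)
historical_training_df = create_training_dataset(spark, as_of_timestamp="2024-01-15 00:00:00")
historical_training_df.show(5)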
Comparing Table Snapshots for ML Analysis
Iceberg's snapshot functionality allows us to compare feature distributions between different points in time, which is extremely valuable for understanding data drift:
# Compare feature distributions between snapshots
def compare_snapshots(spark, snapshot_id_1, snapshot_id_2):
    # Query the first snapshot
    spark.sql(f"""
        CREATE OR REPLACE TEMPORARY VIEW snapshot_1 AS
        SELECT * FROM feature_store.customer_features FOR VERSION AS OF {snapshot_id_1}
    """)

    # Query the second snapshot
    spark.sql(f"""
        CREATE OR REPLACE TEMPORARY VIEW snapshot_2 AS
        SELECT * FROM feature_store.customer_features FOR VERSION AS OF {snapshot_id_2}
    """)

    # Compare stats between snapshots
    print("Comparing feature value distributions between snapshots:")
    spark.sql("""
        SELECT
            feature_name,
            'Snapshot 1' as snapshot,
            COUNT(*) as count,
            MIN(feature_value) as min_value,
            MAX(feature_value) as max_value,
            AVG(feature_value) as avg_value,
            STDDEV(feature_value) as stddev_value
        FROM snapshot_1
        GROUP BY feature_name

        UNION ALL

        SELECT
            feature_name,
            'Snapshot 2' as snapshot,
            COUNT(*) as count,
            MIN(feature_value) as min_value,
            MAX(feature_value) as max_value,
            AVG(feature_value) as avg_value,
            STDDEV(feature_value) as stddev_value
        FROM snapshot_2
        GROUP BY feature_name
        ORDER BY feature_name, snapshot
    """).show(truncate=False)
Main Pipeline Execution
Finally, here's how we put it all together in a complete feature store pipeline:
# Main function to run the feature store pipeline
def run_feature_store_pipeline():
    spark = initialize_spark()

    # Create the feature store tables
    create_feature_store_tables(spark)

    # Register feature definitions in metadata
    register_features(spark)

    # NOTE: the history metadata table is empty until the features table has
    # at least one snapshot, so feature data must be loaded before this point
    # (see the sample loader sketch earlier in this article).
    # Get initial snapshot ID
    initial_snapshot = spark.sql("""
        SELECT snapshot_id
        FROM feature_store.customer_features.history
        ORDER BY made_current_at DESC
        LIMIT 1
    """).collect()[0][0]

    # Create training dataset
    print("Creating initial training dataset...")
    training_df = create_training_dataset(spark)

    # Update features with new data
    # ... (code for updating data)

    # Get new snapshot ID
    updated_snapshot = spark.sql("""
        SELECT snapshot_id
        FROM feature_store.customer_features.history
        ORDER BY made_current_at DESC
        LIMIT 1
    """).collect()[0][0]

    # Compare snapshots to analyze feature drift
    compare_snapshots(spark, initial_snapshot, updated_snapshot)

    # Create reproducible training dataset using time travel
    print("Recreating training dataset using time travel...")
    as_of_time = spark.sql("""
        SELECT made_current_at
        FROM feature_store.customer_features.history
        ORDER BY made_current_at ASC
        LIMIT 1
    """).collect()[0][0]
    reproduced_training_df = create_training_dataset(spark, as_of_timestamp=as_of_time)

    print("Feature store pipeline completed successfully!")
    return spark


if __name__ == "__main__":
    # Run the feature store pipeline
    spark = run_feature_store_pipeline()
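From here, the training DataFrame plugs into any ML framework. A minimal sketch with scikit-learn (the framework choice is an assumption; anything that accepts a pandas DataFrame works the same way):

from sklearn.linear_model import LogisticRegression

# Pull the point-in-time correct training set into pandas and fit a simple
# churn classifier on the feature columns produced by create_training_dataset()
pdf = create_training_dataset(spark).toPandas().dropna()
feature_cols = ["monthly_spend", "tenure_days", "num_support_calls",
                "is_premium", "days_since_last_purchase"]
model = LogisticRegression(max_iter=1000).fit(pdf[feature_cols], pdf["churn"])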
Benefits of Apache Iceberg for AI/ML Workloads
There are several key benefits to using Apache Iceberg for AI/ML data lakes:
- Data Quality and Consistency: ACID transactions ensure that ML models train on consistent data even when updates happen concurrently.
- Schema Flexibility: ML workloads often involve adding or modifying features; Iceberg's schema evolution makes this possible without disrupting existing pipelines.
- Efficient Queries: Iceberg's metadata management and partitioning strategies deliver highly efficient query performance, which is critical for feature extraction.
- Scalability: Iceberg effectively supports tables with billions of files and petabytes of data, as required by large-scale ML applications.
Conclusion
Apache Iceberg is changing how companies build data lakes for AI/ML workloads. Its robust support for schema evolution, time travel, and metadata management addresses many of the pain points data scientists encounter when working with large-scale data.
By implementing a machine learning feature store on Iceberg, enterprises can ensure data consistency, make experimentation reproducible, and improve query performance. These benefits translate directly into faster model development cycles, more reliable models, and ultimately better AI/ML outcomes.
As ML workloads grow in scale and complexity, table formats like Apache Iceberg are becoming essential building blocks of modern data architecture. Whether you're building a new ML platform or restructuring an existing data lake, Iceberg offers a solid foundation for your AI/ML data needs.