Delta Lake + Structured Streaming: ACID for Your Kafka Consumer

Writing a Spark Structured Streaming job that reads from Kafka and writes to Parquet files sounds straightforward until you watch it crash mid-write and leave your output directory in an ambiguous state. Did it write all the records for that micro-batch? Half of them? None? The checkpoint committed, but the files might be incomplete. Now you either re-process and risk duplicates, or you accept a gap and hope nobody notices.

Delta Lake, which Databricks open-sourced in 2019, solves this at the storage layer. It adds a transaction log on top of Parquet files, giving you atomicity and consistency guarantees for streaming writes. Either a micro-batch's worth of data is fully written and committed to the transaction log, or it is not — there is no partial state.

How Delta Streaming Writes Work

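When you write a Structured Streaming query to a Delta table, Spark uses the Delta transaction log as part of its exactly-once delivery mechanism. Each micro-batch is committed to the log as a single transaction, tagged with the streaming query's ID and the batch number. If the job crashes mid-write, the uncommitted transaction never appears in the log, and the next run resumes from the last offsets recorded in the checkpoint. If the crash happens after the commit but before the checkpoint is updated, the replayed batch is recognized by its ID and skipped. Either way there are no duplicates and no gaps.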
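To make the replay case concrete, here is a toy model in plain Python. It is only a sketch of the idea, not Delta's implementation: ToyDeltaLog, commit_batch, and the query ID "q1" are hypothetical names invented for illustration. The point it tries to show is that the data and the batch marker are committed together, so a retried batch can be detected and skipped.

```python
class ToyDeltaLog:
    """Toy stand-in for a Delta transaction log (hypothetical, not the Delta API)."""

    def __init__(self):
        self.commits = []      # committed transactions, in commit order
        self.last_batch = {}   # query_id -> highest committed batch_id

    def commit_batch(self, query_id, batch_id, rows):
        """Commit a micro-batch as one unit; skip it if it was already committed."""
        if self.last_batch.get(query_id, -1) >= batch_id:
            return False  # retry of an already-committed batch: nothing is written
        # The rows and the (query_id, batch_id) marker land in the same log entry.
        self.commits.append(
            {"query_id": query_id, "batch_id": batch_id, "rows": list(rows)}
        )
        self.last_batch[query_id] = batch_id
        return True

    def rows(self):
        """All rows visible to a reader: only rows from committed transactions."""
        return [r for c in self.commits for r in c["rows"]]


log = ToyDeltaLog()
log.commit_batch("q1", 0, ["a", "b"])
log.commit_batch("q1", 1, ["c"])

# Simulated crash: batch 1 reached the log, but the job died before the
# checkpoint recorded it, so the restarted query replays batch 1.
replayed = log.commit_batch("q1", 1, ["c"])

print(replayed)    # False
print(log.rows())  # ['a', 'b', 'c']
```

The replayed batch returns False and the row "c" appears once. A different query ID would not be recognized as a retry, which is one reason the checkpoint (which carries the query's identity) matters.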

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, current_timestamp

spark = (
    SparkSession.builder
    .appName("SensorRawLandingDelta")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:0.6.1")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .getOrCreate()
)

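# The Kafka source needs the spark-sql-kafka-0-10 connector (matching your
# Spark and Scala versions) on the classpath.
raw_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka-broker-01:9092")
    .option("subscribe", "sensor_readings_raw")
    .option("startingOffsets", "latest")  # used only on first start; after that the checkpoint decides
    .load()
)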

raw_landed = raw_stream.select(
    col("topic"),
    col("partition"),
    col("offset"),
    col("timestamp").alias("kafka_ts"),
    col("value").cast("string").alias("raw_payload"),
    current_timestamp().alias("arrived_at")
)

# Write to Delta: ACID guarantees on every micro-batch
raw_query = (
    raw_landed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/sensor-raw-delta")
    .trigger(processingTime="5 minutes")
    .start("/mnt/datalake/raw/sensor_readings")
)

Schema Enforcement and Evolution

Delta enforces the schema of the target table by default. If a new batch contains a column that the table does not have, the write fails rather than silently adding it. This catches schema drift at write time — before the bad column reaches your gold layer.

If you want to intentionally evolve the schema (add a new column as producers add it), you enable schema evolution explicitly:

# Allow Delta to add new columns as the Kafka payload evolves
raw_query = (
    raw_landed.writeStream
    .format("delta")
    .option("mergeSchema", "true")  # new columns are added to the table schema
    .option("checkpointLocation", "/mnt/checkpoints/sensor-raw-delta")
    .start("/mnt/datalake/raw/sensor_readings")
)

The difference: without mergeSchema, a new column causes a write error (good for strict pipelines). With it, new columns are automatically added (good for flexible raw zones where you want to capture everything). For a raw landing zone, mergeSchema=true is usually the right call — you want to store whatever arrives. Schema enforcement belongs at Stage 2 (deserialize).
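The two behaviors can be sketched as a small decision function. This is a deliberately simplified model in plain Python, not Delta's actual check: it looks only at column names (real Delta also compares data types), and resolve_schema and the firmware_version column are hypothetical names used for illustration.

```python
def resolve_schema(table_cols, batch_cols, merge_schema=False):
    """Return the table's columns after a write, or raise if the batch adds columns."""
    new_cols = [c for c in batch_cols if c not in table_cols]
    if new_cols and not merge_schema:
        # Enforcement: the whole write is rejected, nothing is added silently.
        raise ValueError(f"schema mismatch, unexpected columns: {new_cols}")
    # Evolution: new columns are appended to the existing table schema.
    return list(table_cols) + new_cols


table = ["topic", "offset", "raw_payload"]
batch = ["topic", "offset", "raw_payload", "firmware_version"]

try:
    resolve_schema(table, batch)
    strict_result = "written"
except ValueError:
    strict_result = "rejected"

merged = resolve_schema(table, batch, merge_schema=True)

print(strict_result)  # rejected
print(merged)         # ['topic', 'offset', 'raw_payload', 'firmware_version']
```

Under this model the strict path fails loudly on the first batch that carries the extra column, while the merge path widens the table and keeps going.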

Time Travel: The Streaming Replay Power-Up

Delta's transaction log also enables time travel — reading the table as it was at a specific version or timestamp. For streaming pipelines, this is the "oh no, we had a bug in Stage 2" recovery mechanism:

# Read the raw zone as it was before the bad Stage 2 run
df_before_bad_run = (
    spark.read
    .format("delta")
    .option("timestampAsOf", "2019-05-14 09:00:00")
    .load("/mnt/datalake/raw/sensor_readings")
)

# Reprocess through the fixed deserializer
# (deserialize_and_validate is the pipeline's own Stage 2 function, defined elsewhere)
fixed_structured = deserialize_and_validate(df_before_bad_run)
fixed_structured.write.format("delta").mode("overwrite").save("/mnt/datalake/bronze/sensor_readings")

You do not need to replay from Kafka. You do not need to know the exact offset range of the affected records. You query the raw zone as of a timestamp, reprocess through the fixed logic, and overwrite the bad output. Delta's ACID guarantees mean the overwrite is atomic: readers of the overwritten table see either the old data or the new data, never a mix.
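Why time travel works at all can be shown with a toy replay of the log. In Delta's log, each commit lists data files that were added and files that were removed, and the table at a given version is whatever remains after replaying the commits up to that version. The sketch below models that in plain Python; files_at_version, the tuple format, and the file names are assumptions made for illustration, not Delta's on-disk format.

```python
def files_at_version(commits, version):
    """Replay add/remove actions from commit 0 through `version` (inclusive)."""
    live = set()
    for actions in commits[: version + 1]:
        for action, path in actions:
            if action == "add":
                live.add(path)
            elif action == "remove":
                live.discard(path)
    return sorted(live)


commits = [
    [("add", "part-0000.parquet")],     # version 0: first micro-batch
    [("add", "part-0001.parquet")],     # version 1: second micro-batch
    [                                   # version 2: an overwrite
        ("remove", "part-0000.parquet"),
        ("remove", "part-0001.parquet"),
        ("add", "part-0002.parquet"),
    ],
]

print(files_at_version(commits, 1))  # ['part-0000.parquet', 'part-0001.parquet']
print(files_at_version(commits, 2))  # ['part-0002.parquet']
```

An overwrite only marks the old files as removed in the log, so an earlier version can still be read as long as those files exist. Running VACUUM deletes old data files, which limits how far back time travel can reach.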

The Checkpoint/Transaction Log Relationship

Two things track progress: the Structured Streaming checkpoint (which Kafka offsets have been read) and the Delta transaction log (which writes have been committed). These are separate but coordinated. If you delete the streaming checkpoint, Spark no longer knows where it left off and falls back to the startingOffsets setting: with "earliest" it re-reads the topic and writes duplicates to Delta, and with "latest" (as in the job above) it skips whatever arrived while the job was down. If you delete Delta's transaction log, you break the table. Neither is easy to recover from. Do not delete either unless you are intentionally rebuilding from scratch.
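One way to see the coordination is to look at what the streaming writer leaves in the table's log. Commit files under _delta_log are newline-delimited JSON, and streaming commits include a txn action carrying an appId (the streaming query's ID) and a version (the batch number). The helper below is a rough sketch under some assumptions: last_committed_batches is a hypothetical name, it expects a path on the local filesystem, and it reads only the JSON commit files, so it will miss entries that have been compacted into Parquet checkpoints or cleaned up.

```python
import json
from pathlib import Path


def last_committed_batches(table_path):
    """Map each streaming query ID (appId) to its highest committed batch number."""
    latest = {}
    log_dir = Path(table_path) / "_delta_log"
    # Commit files are zero-padded version numbers, so sorting gives log order.
    for commit_file in sorted(log_dir.glob("*.json")):
        for line in commit_file.read_text().splitlines():
            if not line.strip():
                continue
            action = json.loads(line)
            if "txn" in action:
                txn = action["txn"]
                app_id = txn["appId"]
                latest[app_id] = max(latest.get(app_id, -1), txn["version"])
    return latest
```

Calling last_committed_batches on a local copy of the table directory returns something like a dictionary of query ID to batch number. Comparing that against the batch directories in the streaming checkpoint shows whether the two are in step before you touch either one.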
