Structured Streaming Watermarks: Handling Late-Arriving Data in Your Pipeline
Structured Streaming is the main way to build real-time pipelines in Databricks. You've probably already seen basic examples: read from Kafka, write to Delta. What those examples almost always skip is watermarking, and that omission causes problems with event-time aggregations: state that grows without bound, or results that never become final.
Why Watermarks Exist
When you're aggregating a stream over event time (for example: count orders per minute), you need to decide when a window is "done." An order with a timestamp of 2:00 PM could arrive in your stream at 2:01 PM, or at 2:15 PM if there was network lag, or at 3:00 PM if the mobile app was offline for a while. Without a watermark, your streaming query has to keep every window open indefinitely, waiting for the possibility of a late event.
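To make the problem concrete, here is a small pure-Python sketch (not Spark code) that assigns events to 1-minute tumbling windows by event time. The `window_for` helper, the dates, and the arrival times are hypothetical. All three orders land in the 14:00 window even though the last one shows up an hour later. Without a cutoff rule, there is never a point at which that window's count is safe to publish.

```python
from datetime import datetime, timedelta
from typing import Dict, Tuple

WINDOW = timedelta(minutes=1)
Window = Tuple[datetime, datetime]

def window_for(event_ts: datetime) -> Window:
    """Assign an event to its 1-minute tumbling window by truncating to the minute."""
    start = event_ts.replace(second=0, microsecond=0)
    return start, start + WINDOW

# Hypothetical arrivals: (time the record reached the stream, created_at from the payload)
arrivals = [
    (datetime(2024, 1, 1, 14, 1), datetime(2024, 1, 1, 14, 0, 20)),
    (datetime(2024, 1, 1, 14, 15), datetime(2024, 1, 1, 14, 0, 40)),  # network lag
    (datetime(2024, 1, 1, 15, 0), datetime(2024, 1, 1, 14, 0, 55)),   # app was offline
]

counts: Dict[Window, int] = {}
for arrived_at, event_ts in arrivals:
    # The window depends only on event time, so the 14:00 window is still changing at 15:00.
    w = window_for(event_ts)
    counts[w] = counts.get(w, 0) + 1
```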
A watermark tells Spark: "Accept events up to N minutes late, then consider the window closed." Once a window closes, its result is final and Spark can discard the intermediate state.
Adding a Watermark
```python
from pyspark.sql.functions import count, get_json_object, to_timestamp, window

# `spark` is the SparkSession that Databricks notebooks provide
orders_stream = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "orders")
    .load()
    .selectExpr("CAST(value AS STRING) AS json_payload", "timestamp AS kafka_ts"))

# Parse the event timestamp from the payload; this, not kafka_ts, drives the windows
orders_parsed = (orders_stream
    .withColumn("order_ts", to_timestamp(get_json_object("json_payload", "$.created_at")))
    # Assumption: the payload also carries a "region" field; the groupBy below needs it
    .withColumn("region", get_json_object("json_payload", "$.region")))

# Apply a watermark: accept events up to 10 minutes late
windowed_counts = (orders_parsed
    .withWatermark("order_ts", "10 minutes")
    .groupBy(window("order_ts", "1 minute"), "region")
    .agg(count("*").alias("order_count")))
```

What "10 Minutes Late" Actually Means
Spark tracks the maximum event timestamp it has seen so far and, between micro-batches, sets the watermark to that maximum minus your delay threshold. Windows whose end time falls before the watermark are considered closed: their results are final and their state can be discarded.
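With a 10-minute watermark, lateness is measured in event time, not wall-clock time. A window stays open until Spark has seen an event timestamped at least 10 minutes past the window's end. An event whose timestamp is more than 10 minutes behind the latest event time seen (say, 11 minutes) falls behind the watermark. Its window may already be finalized, so Spark can drop it. Spark guarantees only that data within the threshold is never dropped; data later than that may or may not be aggregated.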
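The rule can be modeled in a few lines of plain Python. This is a simplified sketch, not Spark's implementation: `WatermarkedCounter` and the `t` helper are hypothetical, and the watermark is recomputed after every event rather than between micro-batches.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional

WINDOW = 60  # 1-minute tumbling windows; timestamps are seconds since midnight

def t(h: int, m: int, s: int = 0) -> int:
    """Hypothetical helper: seconds since midnight for an event time."""
    return h * 3600 + m * 60 + s

@dataclass
class WatermarkedCounter:
    delay: int  # allowed lateness in seconds (the withWatermark threshold)
    max_event_ts: Optional[int] = None
    open_windows: Dict[int, int] = field(default_factory=dict)  # window start -> count
    finalized: Dict[int, int] = field(default_factory=dict)
    dropped: List[int] = field(default_factory=list)

    @property
    def watermark(self) -> Optional[int]:
        # Watermark = max event time seen so far minus the delay threshold
        if self.max_event_ts is None:
            return None
        return self.max_event_ts - self.delay

    def process(self, ts: int) -> None:
        start = ts - ts % WINDOW
        wm = self.watermark
        if wm is not None and start + WINDOW <= wm:
            self.dropped.append(ts)  # its window is already closed
            return
        self.open_windows[start] = self.open_windows.get(start, 0) + 1
        if self.max_event_ts is None or ts > self.max_event_ts:
            self.max_event_ts = ts
        # Close every window whose end has fallen behind the new watermark
        for s in sorted(self.open_windows):
            if s + WINDOW <= self.watermark:
                self.finalized[s] = self.open_windows.pop(s)

c = WatermarkedCounter(delay=10 * 60)
for ts in [t(14, 0, 30), t(14, 5), t(14, 12), t(14, 0, 45), t(14, 4)]:
    c.process(ts)
```

Once the 14:12 event arrives, the watermark moves to 14:02 and the 14:00 window is finalized. The 14:00:45 order is 11 minutes 15 seconds behind the latest event time, so it is dropped. The 14:04 order is only 8 minutes behind, so it is still counted.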
The tradeoff is latency versus completeness. A 0-minute watermark finalizes each window as soon as Spark sees an event timestamped past the window's end, so results appear quickly, but almost any out-of-order event gets dropped. A 30-minute watermark keeps more state in memory and tolerates laggier producers, but finalized results appear later.
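To see the tradeoff, the same kind of simplified model can be run over one out-of-order sequence with different thresholds. This is plain Python rather than Spark. The `simulate` function and the event sequence are hypothetical, and timestamps are in seconds. The function reports how many events were dropped and how many windows are still buffered in state.

```python
from typing import Dict, List, Optional, Tuple

def simulate(event_times: List[int], delay: int, window: int = 60) -> Tuple[int, int]:
    """Return (events dropped as too late, windows still buffered in state)."""
    max_ts: Optional[int] = None
    open_windows: Dict[int, int] = {}
    dropped = 0
    for ts in event_times:
        start = ts - ts % window
        if max_ts is not None and start + window <= max_ts - delay:
            dropped += 1  # window already finalized
            continue
        open_windows[start] = open_windows.get(start, 0) + 1
        max_ts = ts if max_ts is None else max(max_ts, ts)
        for s in [s for s in open_windows if s + window <= max_ts - delay]:
            del open_windows[s]  # finalized: its state can be discarded
    return dropped, len(open_windows)

# One event per minute, plus an event from minute 0 arriving 4.5 minutes behind the max
events = [0, 60, 120, 180, 240, 300, 30, 360, 420]
for minutes in (0, 5, 30):
    print(minutes, simulate(events, minutes * 60))
# 0 (1, 1)
# 5 (0, 6)
# 30 (0, 8)
```

A 0-minute threshold drops the late event but buffers only one window. Five minutes keeps the event at the cost of six buffered windows. Thirty minutes closes nothing at all within these nine minutes of data.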
Watermarks and Output Modes
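With an event-time aggregation, append mode requires a watermark: Spark rejects the query without one, because it could never decide that a window is final. Update mode runs without a watermark, but then Spark has to keep every window's state forever, which eventually causes memory pressure and slow micro-batches.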
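Here is a minimal sketch of why the watermark bounds state size. It is plain Python with a hypothetical `retained_windows` function. The model counts an in-order stream of one event per minute into 1-minute windows and reports how many per-window state entries remain at the end.

```python
from typing import Dict, Optional

def retained_windows(minutes: int, delay_minutes: Optional[int] = None, window: int = 60) -> int:
    """Count per-window state entries left after an in-order stream of one event per minute."""
    state: Dict[int, int] = {}  # window start (seconds) -> running count
    max_ts: Optional[int] = None
    for m in range(minutes):
        ts = m * 60
        start = ts - ts % window
        state[start] = state.get(start, 0) + 1
        max_ts = ts if max_ts is None else max(max_ts, ts)
        if delay_minutes is not None:
            watermark = max_ts - delay_minutes * 60
            # Finalized windows are evicted; without a watermark this never happens
            for s in [s for s in state if s + window <= watermark]:
                del state[s]
    return len(state)

print(retained_windows(1000))      # 1000: one entry per window ever seen
print(retained_windows(1000, 10))  # 11: only windows the watermark has not passed
```

Without a watermark, the state grows linearly with the stream's lifetime. With a 10-minute threshold, it stays at a fixed size no matter how long the query runs.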
```python
(windowed_counts.writeStream
    .format("delta")
    .outputMode("append")  # append + aggregation requires a watermark
    .option("checkpointLocation", "/mnt/checkpoints/order_counts")
    .trigger(processingTime="30 seconds")
    .start("/mnt/myproject/gold/order_counts"))
```

In append mode, each window's row is written once, when the window is finalized (after the watermark passes its end). Update mode writes, every micro-batch, only the rows whose counts changed in that batch. Complete mode rewrites the entire result table every micro-batch, and the watermark cannot trim its state, so it only suits small aggregations.
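The difference between append and update can be sketched with a simplified micro-batch model in plain Python. The `run` function, the 2-minute threshold, and the batch contents are hypothetical, and complete mode is left out. Late rows are filtered against the watermark from earlier batches. As a shortcut, the model finalizes windows at the end of the same batch that advances the watermark, whereas Spark applies watermark updates in later micro-batches.

```python
from typing import Dict, List, Optional, Set, Tuple

Row = Tuple[int, int]  # (window start in seconds, count)

def run(batches: List[List[int]], mode: str, delay: int = 120, window: int = 60) -> List[List[Row]]:
    """Return the rows each micro-batch emits under a simplified append/update model."""
    if mode not in ("append", "update"):
        raise ValueError(f"unsupported mode: {mode}")
    counts: Dict[int, int] = {}  # aggregation state: window start -> running count
    max_ts: Optional[int] = None
    outputs: List[List[Row]] = []
    for batch in batches:
        # The watermark carried over from earlier batches decides which rows are too late
        watermark = None if max_ts is None else max_ts - delay
        changed: Set[int] = set()
        for ts in batch:
            start = ts - ts % window
            if watermark is not None and start + window <= watermark:
                continue  # window already finalized: drop the late event
            counts[start] = counts.get(start, 0) + 1
            changed.add(start)
        if batch:
            max_ts = max(batch) if max_ts is None else max(max_ts, max(batch))
        # Simplification: the advanced watermark finalizes windows in this same batch
        watermark = None if max_ts is None else max_ts - delay
        closed = sorted(s for s in counts if watermark is not None and s + window <= watermark)
        if mode == "update":
            outputs.append(sorted((s, counts[s]) for s in changed))
        else:
            outputs.append([(s, counts[s]) for s in closed])
        for s in closed:
            del counts[s]  # finalized windows no longer need state
    return outputs

batches = [[0, 30, 70], [100, 130], [250], [20]]
update_rows = run(batches, "update")
append_rows = run(batches, "append")
```

In update mode, the window starting at 60 seconds is written twice, first with a count of 1 and then with 2. In append mode, nothing appears until the third batch moves the watermark to 130 seconds, and then each closed window is written exactly once with its final count. The late event in the last batch is dropped in both modes.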
If you're not doing aggregations over event time, you often don't need a watermark: just stream records straight through and write them to Delta with append. Watermarks become important specifically when you're grouping by time windows and need finalized results.