Incremental Loads in Databricks: The Patterns That Actually Work

Most data engineering pipelines start with full loads. You truncate and reload. Every run processes every row from the source. It works, it's simple, and it ages badly — as your source tables grow, your pipeline run time grows with them, until you're running a 4-hour job every night to load a table that changed 0.1% since yesterday.

Incremental loads are the answer, and Databricks has a few patterns worth knowing.

Pattern 1: Watermark on a Timestamp Column

If your source table has a reliable "last modified" or "created at" column, you can use it as a watermark to identify what's new since the last successful run.

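from pyspark.sql import functions as F

# Read the last processed timestamp from a control table
# (COALESCE supplies a floor value on the very first run, when no watermark exists yet)
last_processed = spark.sql("""
    SELECT COALESCE(MAX(last_processed_at), TIMESTAMP '1900-01-01')
    FROM pipeline_state WHERE table_name = 'orders'
""").collect()[0][0]

# Fetch only new/updated rows since that timestamp
# (millisecond precision so SQL Server can convert the literal to datetime or datetime2)
watermark_literal = last_processed.isoformat(sep=" ", timespec="milliseconds")
new_rows = (spark.read
  .format("jdbc")
  .option("url", jdbc_url)
  .option("query", f"SELECT * FROM dbo.Orders WHERE updated_at > '{watermark_literal}'")
  .option("user", svc_user)
  .option("password", dbutils.secrets.get("myproject", "sql-password"))
  .load()
  .cache())  # read the source once; the same batch is reused for the watermark below

# Process and merge into silver layer
# ... transformation logic ...

# Update the watermark after successful processing. Use the newest updated_at that was
# actually read, not current_timestamp(): rows whose updated_at falls between the read
# and this update would otherwise be skipped forever.
max_seen = new_rows.agg(F.max("updated_at")).collect()[0][0]
if max_seen is not None:
    spark.sql(f"UPDATE pipeline_state SET last_processed_at = TIMESTAMP '{max_seen}' WHERE table_name = 'orders'")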

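The weakness: this only works if the source has a reliable updated_at column. Many production SQL Server tables don't have one: rows get updated without the timestamp being touched, or the column doesn't exist at all. Hard deletes are invisible too, because a deleted row leaves no updated_at behind to compare against.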
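The watermark bookkeeping itself is easy to get subtly wrong, so here it is in isolation. This is a minimal in-memory sketch in plain Python, assuming a list of dicts stands in for dbo.Orders and the returned value stands in for the pipeline_state row (run_incremental and the row shape are hypothetical). It handles the first run, when no watermark exists yet, and advances the watermark to the newest updated_at actually read rather than to the current time:

```python
from datetime import datetime
from typing import List, Optional, Tuple


def run_incremental(
    source: List[dict], watermark: Optional[datetime]
) -> Tuple[List[dict], Optional[datetime]]:
    """Return the rows changed since `watermark` and the watermark to store next."""
    if watermark is None:
        new_rows = list(source)  # first run: no watermark yet, so take everything
    else:
        new_rows = [r for r in source if r["updated_at"] > watermark]
    if not new_rows:
        return [], watermark  # nothing new: keep the stored watermark unchanged
    # Advance to the newest timestamp actually read, not to the current time
    return new_rows, max(r["updated_at"] for r in new_rows)


# Hypothetical rows standing in for dbo.Orders
source = [
    {"order_id": 1, "updated_at": datetime(2019, 5, 9, 8, 0)},
    {"order_id": 2, "updated_at": datetime(2019, 5, 9, 9, 30)},
]
batch, wm = run_incremental(source, None)
print(len(batch), wm)  # 2 2019-05-09 09:30:00

# A later update to order 1 is picked up; the untouched order 2 is not
source[0]["updated_at"] = datetime(2019, 5, 10, 7, 15)
batch, wm = run_incremental(source, wm)
print([r["order_id"] for r in batch], wm)  # [1] 2019-05-10 07:15:00
```

Advancing to the newest value read, rather than to the wall clock, means a row written while the job is running gets picked up by the next run instead of falling into a gap.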

Pattern 2: Date Partitioning at Bronze

For event-based tables (orders, transactions, log entries) that are insert-only or rarely updated, partition bronze by ingestion date and process one day at a time:

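from datetime import date, timedelta
from pyspark.sql import functions as F

# Yesterday's date, computed rather than hard-coded
yesterday = (date.today() - timedelta(days=1)).isoformat()

# Load into a dated partition. Every row must carry the partition value,
# because replaceWhere rejects rows that fall outside its predicate.
(daily_df
  .withColumn("ingestion_date", F.lit(yesterday).cast("date"))
  .write
  .format("delta")
  .mode("overwrite")
  .option("replaceWhere", f"ingestion_date = '{yesterday}'")
  .partitionBy("ingestion_date")
  .save("/mnt/myproject/bronze/orders"))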

The replaceWhere option is the key here — it replaces only the rows matching that condition, not the entire table. This means you can safely re-run yesterday's load without touching any other day's data. Idempotent by design.
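The semantics are easy to model in plain Python. The sketch below is a simplified model of the behaviour, not Delta's implementation, and replace_where is a hypothetical name. It swaps out only the rows for one ingestion_date, refuses incoming rows that fall outside that partition (mirroring Delta's predicate check), and shows that running the same day twice leaves the table unchanged:

```python
from datetime import date, timedelta


def replace_where(table: list, new_rows: list, partition: date) -> list:
    """Return a copy of `table` with the rows for `partition` replaced by `new_rows`."""
    # Like Delta's replaceWhere check, refuse rows that fall outside the predicate
    if any(r["ingestion_date"] != partition for r in new_rows):
        raise ValueError("new_rows contain data outside the replaced partition")
    kept = [r for r in table if r["ingestion_date"] != partition]
    return kept + list(new_rows)


yesterday = date(2019, 5, 10) - timedelta(days=1)
table = [{"order_id": 1, "ingestion_date": date(2019, 5, 8)}]
daily = [
    {"order_id": 2, "ingestion_date": yesterday},
    {"order_id": 3, "ingestion_date": yesterday},
]

once = replace_where(table, daily, yesterday)
twice = replace_where(once, daily, yesterday)  # simulated retry of the same day
print(len(once), len(twice), once == twice)  # 3 3 True
```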

Pattern 3: Change Data Capture with Delta MERGE

For tables that have inserts, updates, and deletes, you want to capture all three operation types. If your source system publishes a CDC stream (Debezium, SQL Server Change Tracking, or similar), you get a feed of change events that you can apply to your Delta table:

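from delta.tables import DeltaTable

target = DeltaTable.forName(spark, "silver.customers")

# cdc_df has columns: operation (I/U/D), key columns, data columns.
# Assumes at most one change per customer_id in this batch: MERGE fails
# when several source rows match the same target row.
data_cols = [c for c in cdc_df.columns if c != "operation"]
col_map = {c: f"s.{c}" for c in data_cols}  # copy the data columns, not the operation flag

# One MERGE applies all three operation types in a single atomic commit,
# so a delete and a re-insert of the same key cannot be applied out of order
(target.alias("t")
  .merge(cdc_df.alias("s"), "t.customer_id = s.customer_id")
  .whenMatchedDelete(condition="s.operation = 'D'")
  .whenMatchedUpdate(condition="s.operation != 'D'", set=col_map)
  .whenNotMatchedInsert(condition="s.operation != 'D'", values=col_map)
  .execute())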
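A MERGE like this fails when one key appears more than once in the source batch, which is common in a CDC feed (an update followed by a delete, say). A usual fix is to keep only the latest change per key before merging; in Spark that is typically a row_number() window partitioned by the key and ordered by a change-sequence column descending. The plain-Python sketch below models that step plus the delete/update/insert branches with dicts, assuming a hypothetical change_seq column (an LSN or event offset, for example) that orders changes within the batch:

```python
from typing import Dict, List


def latest_per_key(events: List[dict], key: str = "customer_id",
                   seq: str = "change_seq") -> List[dict]:
    """Keep only the most recent change per key (`seq` is a hypothetical ordering column)."""
    latest: Dict[object, dict] = {}
    for e in events:
        k = e[key]
        if k not in latest or e[seq] > latest[k][seq]:
            latest[k] = e
    return list(latest.values())


def apply_changes(table: Dict[object, dict], events: List[dict],
                  key: str = "customer_id") -> Dict[object, dict]:
    """Apply one CDC batch to a dict keyed by `key`, mirroring the MERGE branches."""
    result = dict(table)  # leave the input table untouched
    for e in latest_per_key(events, key):
        if e["operation"] == "D":
            result.pop(e[key], None)  # delete if present
        else:
            # insert or update: store the data columns, minus the CDC bookkeeping
            result[e[key]] = {c: v for c, v in e.items()
                              if c not in ("operation", "change_seq")}
    return result


table = {1: {"customer_id": 1, "name": "Ada"}}
batch = [
    {"operation": "U", "change_seq": 10, "customer_id": 1, "name": "Ada L."},
    {"operation": "I", "change_seq": 11, "customer_id": 2, "name": "Grace"},
    {"operation": "D", "change_seq": 12, "customer_id": 2, "name": None},
    {"operation": "I", "change_seq": 13, "customer_id": 2, "name": "Grace H."},
]
result = apply_changes(table, batch)
print(result)
# {1: {'customer_id': 1, 'name': 'Ada L.'}, 2: {'customer_id': 2, 'name': 'Grace H.'}}
```

Customer 2 is inserted, deleted and re-inserted within one batch; collapsing to the latest change first means only the final insert is applied.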

The Idempotency Requirement

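Whichever pattern you choose, make sure a re-run of the same time window produces the same result. Pipelines fail, and when the retry runs you don't want duplicate data. The replaceWhere pattern and the MERGE pattern are both naturally idempotent: replaying a day overwrites the same partition, and replaying the same change batch leaves each key in the same final state. The watermark pattern needs careful bookkeeping in the control table to be safe on retry: advance the watermark only after the silver write has committed, and make that write a keyed MERGE rather than an append, so a batch that is read twice is not loaded twice.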
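One way to check a pipeline against this requirement is to simulate the awkward failure: the silver write succeeds, but the job dies before the control table is updated. The in-memory sketch below assumes dicts stand in for the Delta target and the pipeline_state table (run_once and SimulatedCrash are hypothetical names). Because the watermark moves only after the write, and the write is keyed by order_id, the retry re-reads the same window and converges on the same state:

```python
from datetime import datetime


class SimulatedCrash(Exception):
    """Stands in for a job dying between the silver write and the watermark update."""


def run_once(source, state, target, crash_after_write=False):
    """One watermark run: upsert new rows by order_id, then advance the watermark."""
    wm = state.get("orders")
    batch = [r for r in source if wm is None or r["updated_at"] > wm]
    if not batch:
        return
    for r in batch:
        target[r["order_id"]] = r  # keyed (MERGE-like) write, so a replay overwrites
    if crash_after_write:
        raise SimulatedCrash()
    # Advance the watermark only after the write has succeeded
    state["orders"] = max(r["updated_at"] for r in batch)


source = [
    {"order_id": 1, "updated_at": datetime(2019, 5, 9, 8, 0)},
    {"order_id": 2, "updated_at": datetime(2019, 5, 9, 9, 30)},
]
state, target = {}, {}
try:
    run_once(source, state, target, crash_after_write=True)
except SimulatedCrash:
    pass  # the write landed but the watermark did not move
run_once(source, state, target)  # the retry re-reads the same window
print(len(target), state["orders"])  # 2 2019-05-09 09:30:00
```

Had the write been an append instead of a keyed upsert, the same retry would have loaded both orders twice.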
