Writing SQL Server Data to Delta Lake: The End-to-End Pattern

Getting data from SQL Server into Delta Lake is one of the most common first tasks when a team starts moving to Databricks. The JDBC path works, but there are enough gotchas in the full end-to-end flow that it's worth walking through a complete, production-realistic pattern rather than the minimal code sample you'll find in most documentation.

The Full Pattern

# spark and dbutils are provided by the Databricks notebook runtime.
from pyspark.sql.functions import current_timestamp, lit

# --- Configuration ---
jdbc_url = "jdbc:sqlserver://prod-sql.mycompany.com:1433;databaseName=Operations"
source_table = "dbo.CustomerOrders"
target_path = "/mnt/myproject/bronze/customer_orders"
checkpoint_table = "pipeline_control.watermarks"
pipeline_name = "customer_orders_extract"

svc_password = dbutils.secrets.get(scope="myproject-prod", key="sql-svc-password")

# --- Get watermark ---
last_processed = spark.sql(f"""
    SELECT COALESCE(MAX(last_extracted_at), '2019-01-01 00:00:00') as wm
    FROM {checkpoint_table}
    WHERE pipeline = '{pipeline_name}'
""").collect()[0]["wm"]

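print(f"Extracting records updated after: {last_processed}")

# --- Extract from SQL Server ---
# Single-connection read. Spark does not allow "query" together with
# "partitionColumn"; for a parallel read of a large table, pass the SELECT as a
# "dbtable" subquery and add the partitioning options.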
source_df = (spark.read
  .format("jdbc")
  .option("url", jdbc_url)
  .option("user", "svc_databricks")
  .option("password", svc_password)
  .option("query", f"""
      SELECT order_id, customer_id, order_date, total_amount, status, region,
             updated_at
      FROM {source_table}
      WHERE updated_at > '{last_processed}'
  """)
  .option("fetchsize", "10000")  # tune for network throughput
  .load()
  .withColumn("ingested_at", current_timestamp())
  .withColumn("source_system", lit("operations_db"))
)

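# count() and the write are separate Spark actions, so without a cache the
# JDBC query would run once for each. Caching keeps the extract to one pass in
# the common case (evicted partitions would still be re-read from the source).
source_df = source_df.cache()
row_count = source_df.count()
print(f"Extracted {row_count} rows")

# --- Write to Delta bronze (append) ---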
(source_df.write
  .format("delta")
  .mode("append")
  .option("mergeSchema", "false")  # fail loudly if schema changes
  .partitionBy("region")
  .save(target_path))

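# --- Update watermark ---
# Advance the watermark to the newest updated_at that actually landed in bronze
# rather than to current_timestamp(): rows modified in SQL Server while the
# extract was running would otherwise fall behind the watermark and be skipped.
spark.sql(f"""
    MERGE INTO {checkpoint_table} AS t
    USING (
        SELECT '{pipeline_name}' AS pipeline, MAX(updated_at) AS ts
        FROM delta.`{target_path}`
    ) AS s
    ON t.pipeline = s.pipeline
    WHEN MATCHED THEN UPDATE SET last_extracted_at = s.ts
    WHEN NOT MATCHED THEN INSERT (pipeline, last_extracted_at) VALUES (s.pipeline, s.ts)
""")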
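
For a table too large to pull through one connection, Spark's JDBC source can split the read across a numeric column. The following is a minimal sketch, assuming order_id is a numeric key spread fairly evenly across its range; the helper name partitioned_jdbc_options, the bounds, and the partition count are illustrative and not part of the pipeline above. Because Spark rejects "query" combined with "partitionColumn", the incremental filter moves into a "dbtable" subquery.

```python
def partitioned_jdbc_options(source_table, last_processed, lower, upper, num_partitions):
    """Build Spark JDBC options for a parallel incremental read.

    Assumption: order_id is numeric and roughly uniform between lower and upper.
    """
    # A subquery passed as dbtable must be parenthesized and aliased.
    subquery = (
        "(SELECT order_id, customer_id, order_date, total_amount, status, region, updated_at "
        f"FROM {source_table} WHERE updated_at > '{last_processed}') AS src"
    )
    return {
        "dbtable": subquery,
        "partitionColumn": "order_id",
        "lowerBound": str(lower),    # bounds set the partition stride only;
        "upperBound": str(upper),    # rows outside them are still read
        "numPartitions": str(num_partitions),
        "fetchsize": "10000",
    }

opts = partitioned_jdbc_options("dbo.CustomerOrders", "2019-01-01 00:00:00", 1, 5_000_000, 8)

# In a Databricks notebook, with jdbc_url and svc_password defined as above:
# source_df = (spark.read.format("jdbc")
#     .option("url", jdbc_url)
#     .option("user", "svc_databricks")
#     .option("password", svc_password)
#     .options(**opts)
#     .load())
```

Each partition opens its own connection to SQL Server, so numPartitions is best kept modest on a production source.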

The Decisions Embedded in This Pattern

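fetchsize: The default fetch size is driver-specific, and some drivers default to very small batches (Oracle's, for example, fetches 10 rows at a time), which means a very large number of round trips on a big extract. Setting it explicitly to 10,000 or higher cuts that overhead without relying on the driver default. Watch for executor memory pressure if individual rows are very wide, since each fetch buffers up to fetchsize rows.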
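
To make the round-trip arithmetic concrete, here is a small sketch. The 5,000,000-row extract is a made-up figure for illustration, and round_trips is a hypothetical helper, not a Spark or JDBC API.

```python
import math

def round_trips(total_rows: int, fetch_size: int) -> int:
    """Number of fetch calls needed to pull total_rows at a given fetch size."""
    if fetch_size <= 0:
        raise ValueError("fetch_size must be positive")
    return math.ceil(total_rows / fetch_size)

# Compare a few fetch sizes for the same hypothetical extract.
for size in (10, 1_000, 10_000):
    print(f"fetchsize={size}: {round_trips(5_000_000, size)} round trips")
```

At 10 rows per fetch the extract needs 500,000 round trips; at 10,000 it needs 500. Actual throughput also depends on network latency and row width, so treat the setting as a starting point to tune.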

mergeSchema false on bronze: Bronze is a raw copy of the source. If the source schema changes, you want to know about it immediately — not silently accept an unexpected column. Let the pipeline fail, investigate, and decide whether the schema change is expected before re-running with mergeSchema true.
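
When the write does fail, it helps to see exactly what changed before deciding how to proceed. The following is a minimal sketch of a pre-write comparison, assuming the expected and incoming schemas are available as lists of (column, type) pairs such as those returned by DataFrame.dtypes. The helper schema_drift and the sample columns and types are illustrative.

```python
def schema_drift(expected, actual):
    """Compare two lists of (column, type) pairs and report the differences."""
    exp, act = dict(expected), dict(actual)
    return {
        "added": sorted(set(act) - set(exp)),
        "removed": sorted(set(exp) - set(act)),
        "retyped": sorted(c for c in set(exp) & set(act) if exp[c] != act[c]),
    }

# Illustrative schemas: a new column appears and order_id changes type.
expected = [("order_id", "int"), ("status", "string"), ("total_amount", "decimal(18,2)")]
actual = [("order_id", "bigint"), ("status", "string"),
          ("total_amount", "decimal(18,2)"), ("channel", "string")]

drift = schema_drift(expected, actual)
if any(drift.values()):
    print(f"Schema drift detected: {drift}")
```

In a notebook, the two inputs might come from the existing bronze table's dtypes and source_df.dtypes.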

Watermark in a control table: Better than storing state in a notebook variable, which doesn't survive restarts. The control table is the persistent record of what's been processed. MERGE ensures the row is created on first run and updated on subsequent runs.
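
What value gets stored matters as much as where it is stored. One conservative choice is to advance the watermark only to the largest updated_at actually seen in a batch, and to leave it unchanged when a batch is empty. A minimal sketch of that rule follows; next_watermark is a hypothetical helper and the timestamps are made up.

```python
from datetime import datetime

def next_watermark(previous, batch_max):
    """Return the watermark to persist after a run.

    batch_max is the largest updated_at in the extracted batch, or None
    when the batch was empty. The watermark never moves backwards.
    """
    if batch_max is None:
        return previous
    return max(previous, batch_max)

previous = datetime(2024, 6, 1, 0, 0, 0)
advanced = next_watermark(previous, datetime(2024, 6, 1, 8, 30, 0))
unchanged = next_watermark(previous, None)
```

Tying the watermark to source timestamps rather than to the cluster clock also avoids depending on the two systems' clocks agreeing.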

Source timestamp dependency: This pattern only works if the source table has a reliable updated_at that's always set on insert and update. If it's nullable, not maintained consistently, or set by application code that might miss updates, add a fallback or switch to CDC. Trust but verify your source timestamps before depending on them for incremental loads.
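
A quick audit of a sample of updated_at values can show whether the column is trustworthy before the pipeline depends on it. This is a minimal sketch, assuming the sample has already been pulled into Python as datetime values or None. The helper audit_updated_at, the five-minute skew tolerance, and the sample values are illustrative.

```python
from datetime import datetime, timedelta

def audit_updated_at(values, now, max_skew=timedelta(minutes=5)):
    """Summarize obvious problems in a sample of updated_at values."""
    nulls = sum(1 for v in values if v is None)
    # Timestamps well ahead of "now" suggest clock or application bugs.
    future = sum(1 for v in values if v is not None and v > now + max_skew)
    return {"total": len(values), "nulls": nulls, "future": future}

now = datetime(2024, 6, 1, 12, 0, 0)
sample = [
    datetime(2024, 5, 31, 9, 0, 0),
    None,                             # never set
    datetime(2024, 6, 2, 0, 0, 0),    # in the future
    datetime(2024, 6, 1, 11, 59, 0),
]
report = audit_updated_at(sample, now)
print(report)
```

A non-zero null or future count is a signal to investigate before relying on updated_at. This check cannot detect updates that changed a row without touching the column, which is the case that calls for CDC.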
