A recurring pattern in Databricks pipelines: you have files landing in ADLS or S3 (from Kafka Capture, from Event Hubs Capture, from an SFTP drop) and you need to pick up every new file as it arrives. The naive implementation is a batch job that lists the directory, filters for files newer than the last run, and processes them. It has a race condition: two jobs running close together will list some of the same files twice, and files that arrive between the listing and the processing are missed until the next run, or missed for good if the last-run timestamp is recorded after they landed.
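To make the race concrete, here is a minimal pure-Python simulation. It is only a sketch: the `LandedFile` type, the file names, and the timestamps are hypothetical, and it assumes the job records its last-run timestamp when it finishes rather than when it lists.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class LandedFile:
    path: str
    mtime: int  # modification time, in seconds on a shared clock


def naive_list_new(files, last_run_ts):
    """Naive approach: keep files modified after the previous run's timestamp."""
    return [f.path for f in files if f.mtime > last_run_ts]


def tracked_list_new(files, processed_paths):
    """Alternative: remember every processed path instead of one timestamp."""
    return [f.path for f in files if f.path not in processed_paths]


# Run 1 lists the directory at t=100 and sees only a.json.
landing = [LandedFile("a.json", mtime=90)]
run1 = naive_list_new(landing, last_run_ts=0)

# b.json lands at t=105, while run 1 is still processing.
landing.append(LandedFile("b.json", mtime=105))

# Run 1 finishes at t=110 and records that as the last-run timestamp.
last_run_ts = 110

# Run 2 filters on the recorded timestamp and never sees b.json.
run2_naive = naive_list_new(landing, last_run_ts)

# Tracking processed paths picks b.json up regardless of timing.
run2_tracked = tracked_list_new(landing, processed_paths=set(run1))
```

The second helper is conceptually closer to what a file-tracking source does: it keeps per-file state instead of a single watermark, so arrival timing no longer matters.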
Databricks Auto Loader, which shipped in 2020, is the proper solution. In file notification mode (cloudFiles.useNotifications set to true), it uses cloud provider notification services (Azure Event Grid, AWS SNS/SQS) to detect new files without listing the entire directory; in the default directory listing mode, it discovers new files by listing the input path. In both modes it tracks discovered files in the stream's checkpoint location, so it knows exactly which files have been processed. It integrates with Structured Streaming, giving you the same trigger options and exactly-once guarantees you get with Kafka sources.
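As a rough sketch of how file notification mode is switched on, the helper below builds the reader options as a plain dictionary. The helper name `autoloader_options` is hypothetical; the option keys are Auto Loader's documented `cloudFiles.*` options. It assumes the cluster already has the cloud permissions needed to set up the notification resources.

```python
def autoloader_options(file_format, schema_location, use_notifications=False):
    """Build Auto Loader reader options as a dict (hypothetical helper)."""
    opts = {
        "cloudFiles.format": file_format,
        "cloudFiles.schemaLocation": schema_location,
    }
    if use_notifications:
        # Opt in to file notification mode; without this option
        # Auto Loader uses directory listing mode.
        opts["cloudFiles.useNotifications"] = "true"
    return opts


opts = autoloader_options(
    "json",
    "/mnt/checkpoints/sensor-autoloader/schema",
    use_notifications=True,
)

# On a cluster with a SparkSession named `spark`, the dict would be applied as:
# raw_stream = (
#     spark.readStream.format("cloudFiles")
#     .options(**opts)
#     .load("/mnt/landing/sensor_readings/")
# )
```

Keeping the options in a dictionary makes it easy to flip between listing and notification mode per environment without touching the reader code.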
The Basic Pattern
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, col
spark = SparkSession.builder.appName("AutoLoaderRawLanding").getOrCreate()
# Auto Loader as a Structured Streaming source
raw_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/mnt/checkpoints/sensor-autoloader/schema")
    .load("/mnt/landing/sensor_readings/")
)
raw_landed = raw_stream.select(
    col("_metadata.file_path").alias("source_file"),
    col("_metadata.file_modification_time").alias("file_ts"),
    current_timestamp().alias("arrived_at"),
    col("*"),  # all parsed fields from the JSON file
)
(
    raw_landed.writeStream
    .format("delta")
    .outputMode("append")
    .option("checkpointLocation", "/mnt/checkpoints/sensor-autoloader/checkpoint")
    .option("mergeSchema", "true")
    .trigger(availableNow=True)  # process all new files and stop
    .toTable("raw.sensor_readings_files")
)
Schema Inference and the Schema Location
Auto Loader infers the schema from the first batch of files it sees and stores it at cloudFiles.schemaLocation. On subsequent runs, it uses the stored schema rather than re-inferring. This has two important behaviors:
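First, new columns in incoming files are detected automatically. In the default schema evolution mode (addNewColumns), Auto Loader stops the stream with an UnknownFieldException when it sees a new column, records the updated schema at the schema location, and picks up the new column when the stream restarts. If you have mergeSchema=true on the write side, the new column is then added to the Delta table. If you do not have mergeSchema=true, new columns cause a write failure, which is the right behavior if you want strict schema control.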
Second, the schema location is a checkpoint too. Do not delete it unless you want Auto Loader to re-infer the schema from scratch, which can cause issues if the first batch of files it sees is not representative of the full schema.
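In Auto Loader's default evolution mode, the stream is documented to stop when a new column appears and to use the updated schema once it is restarted. Outside a job scheduler with automatic retries, a small restart wrapper is one way to handle that. The sketch below is an assumption-laden illustration: `start_query` is a hypothetical callable that starts the stream and blocks until it ends, and matching on the text "UnknownFieldException" is an assumption about how the failure surfaces in the exception message.

```python
def run_with_schema_restarts(start_query, max_restarts=3):
    """Run start_query(), restarting when the failure looks like schema evolution.

    start_query is a hypothetical callable that starts the stream and blocks
    until it finishes (for example by calling awaitTermination()).
    Returns the number of restarts that were needed.
    """
    restarts = 0
    while True:
        try:
            start_query()
            return restarts
        except Exception as exc:
            # Assumption: the evolution failure mentions UnknownFieldException
            # somewhere in its message. Anything else is re-raised untouched.
            if "UnknownFieldException" not in str(exc) or restarts >= max_restarts:
                raise
            restarts += 1
```

Capping the number of restarts matters: a producer that keeps adding fields on every batch should surface as a failure rather than loop forever.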
Rescue Column: Handling Schema Drift Gracefully
# Enable the rescue column: catches any fields that don't match the inferred schema
raw_stream = (
    spark.readStream
    .format("cloudFiles")
    .option("cloudFiles.format", "json")
    .option("cloudFiles.schemaLocation", "/mnt/checkpoints/sensor-autoloader/schema")
    .option("cloudFiles.inferColumnTypes", "true")
    .option("rescuedDataColumn", "_rescued_data")  # non-schema fields land here
    .load("/mnt/landing/sensor_readings/")
)
The rescue column stores any JSON fields that do not match the inferred schema, including values whose type does not match the inferred type, as a JSON string within that column. This is Auto Loader's version of "land raw before parsing": fields that drift outside the expected schema are preserved in _rescued_data rather than being silently dropped. Check this column regularly in your raw zone monitoring. A non-empty _rescued_data is a signal that the producer schema has changed.
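For the monitoring step, one possible approach is to pull a sample of non-null _rescued_data values and count which field names show up. The helper below is a sketch in plain Python; the function name, the sample field names (`firmware_version`, `battery_pct`), and the rule that keys starting with an underscore are bookkeeping rather than producer fields are all assumptions for illustration.

```python
import json
from collections import Counter


def summarize_rescued(rescued_values):
    """Count how often each field name appears in rescued JSON strings."""
    counts = Counter()
    for value in rescued_values:
        if not value:  # None or empty string: nothing was rescued for this row
            continue
        payload = json.loads(value)
        for field in payload:
            # Assumption: keys starting with "_" (such as a source file path)
            # are bookkeeping added by the loader, not producer fields.
            if not field.startswith("_"):
                counts[field] += 1
    return dict(counts)


# Hypothetical sample of _rescued_data values collected from the raw table.
sample = [
    None,
    '{"firmware_version": "2.1", "_file_path": "/mnt/landing/sensor_readings/a.json"}',
    '{"firmware_version": "2.2", "battery_pct": 87}',
]
drift = summarize_rescued(sample)
```

A field that appears in a growing share of rows is a good candidate for promotion into the table schema; a one-off is more likely a producer bug.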
Auto Loader vs. Direct Kafka Streaming
Use Auto Loader when your data lands as files (Kafka/Event Hubs Capture, SFTP drops, batch exports from source systems). Use direct Kafka Structured Streaming when you need low latency and can connect to the Kafka broker directly.
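The difference shows up mostly in the reader configuration. The sketch below contrasts the two as option dictionaries. The helper names, broker address, and topic name are hypothetical; the option keys are the standard ones for the Structured Streaming Kafka source and for Auto Loader.

```python
def kafka_reader_options(bootstrap_servers, topic, starting_offsets="latest"):
    """Options for spark.readStream.format("kafka") (hypothetical helper)."""
    return {
        "kafka.bootstrap.servers": bootstrap_servers,
        "subscribe": topic,
        "startingOffsets": starting_offsets,
    }


def file_reader_options(file_format, schema_location):
    """Options for spark.readStream.format("cloudFiles") (hypothetical helper)."""
    return {
        "cloudFiles.format": file_format,
        "cloudFiles.schemaLocation": schema_location,
    }


def stage1_format(delivers_files):
    """Pick the streaming source format from how the upstream system delivers data."""
    return "cloudFiles" if delivers_files else "kafka"


# Hypothetical broker and topic, for illustration only.
kafka_opts = kafka_reader_options("broker-1:9092", "sensor_readings")
file_opts = file_reader_options("json", "/mnt/checkpoints/sensor-autoloader/schema")
```

Everything downstream of the reader (the select, the Delta write, the checkpoint) can stay the same for either source, which keeps a later switch between the two contained.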
Auto Loader is not a Kafka consumer: it reads files, not topics. But in the context of the three-stage architecture, it is a natural fit for Stage 1 when the upstream system delivers files rather than a stream endpoint. The checkpoint behavior and exactly-once guarantees are equivalent.