Streaming Data Quality: Catching Schema Changes Before They Corrupt the Gold Layer

In batch ETL, schema changes are annoying but manageable. You run a load, the job fails, you investigate, you fix the mapping, you re-run. The damage is limited: at worst, one load cycle's worth of data is missing or wrong, and you fix it before it reaches the gold layer.

In streaming, schema changes are more dangerous because the pipeline is continuous. A producer pushes a schema change at 2pm. Your consumer does not crash, because its error handling is too loose. Instead, it keeps writing values into the wrong fields for three hours until someone notices a sensor reading of 847,293 degrees (or -999, or null). By then, three hours of bad data has reached your gold layer, and the fix is no longer "re-run the job." It is "identify and delete or correct every bad record that made it through."

The prevention has three layers: message-level validation at ingest time, schema drift detection over rolling windows, and constraint enforcement at the serving layer.

Detection at the Deserialize Stage

Your Stage 2 deserialize job should validate the structure of every message, not just parse it. There is a difference between "this JSON parses successfully" and "this JSON has the fields I expect, with the types I expect, in the ranges I expect."

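from dataclasses import dataclass
import json

# Kept as a tuple (not a set) so that an unhashable unit, such as a JSON list,
# simply fails the membership test instead of raising TypeError.
VALID_UNITS = ('C', 'F', 'K', 'Pa', 'psi', 'rpm', 'V', 'A')

@dataclass
class SensorReading:
    sensor_id: str
    value: float
    unit: str
    ts_ms: int

    def validate(self):
        """Return a list of validation errors; an empty list means the reading is valid."""
        errors = []
        if not isinstance(self.sensor_id, str) or not self.sensor_id:
            errors.append(f"sensor_id invalid: {self.sensor_id!r}")
        # bool is a subclass of int in Python, so reject it explicitly
        if (isinstance(self.value, bool) or not isinstance(self.value, (int, float))
                or not (-100 <= self.value <= 10000)):
            errors.append(f"value invalid or out of range: {self.value!r}")
        if self.unit not in VALID_UNITS:
            errors.append(f"unrecognized unit: {self.unit!r}")
        # 1_000_000_000_000 ms is September 2001; smaller values are probably seconds
        if (isinstance(self.ts_ms, bool) or not isinstance(self.ts_ms, int)
                or self.ts_ms < 1_000_000_000_000):
            errors.append(f"ts_ms implausible: {self.ts_ms!r}")
        return errors

def parse_and_validate(raw_bytes):
    """Decode one message and return a SensorReading, or raise ValueError."""
    try:
        data = json.loads(raw_bytes.decode('utf-8'))
    except (UnicodeDecodeError, json.JSONDecodeError) as e:
        raise ValueError(f"Parse failed: {e}") from e
    # Valid JSON is not necessarily an object (it could be a bare list or number)
    if not isinstance(data, dict):
        raise ValueError(f"Parse failed: expected a JSON object, got {type(data).__name__}")
    # .get() returns None for missing fields; validate() reports them as errors
    reading = SensorReading(
        sensor_id=data.get('sensor_id'),
        value=data.get('value'),
        unit=data.get('unit'),
        ts_ms=data.get('ts_ms'),
    )
    errors = reading.validate()
    if errors:
        raise ValueError(f"Validation failed: {'; '.join(errors)}")
    return reading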

The validation rules here are domain-specific — you need to know what values are plausible for your data. This knowledge lives in your code and needs to be updated when the domain changes. That is the cost of catching problems at ingest rather than discovering them in a BI report six weeks later.
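One way to keep that domain knowledge in a single reviewable place is a rules table keyed by unit, since one -100 to 10000 range cannot be right for degrees Celsius, pascals, and rpm at the same time. The sketch below assumes a hypothetical UNIT_RANGES mapping and check_value_for_unit helper; the bounds are illustrative placeholders, not recommended limits for any real sensor.

```python
import math

# Hypothetical per-unit plausibility bounds (illustrative placeholders only;
# replace them with limits that match your actual sensors).
UNIT_RANGES = {
    'C': (-100.0, 1000.0),
    'F': (-148.0, 1832.0),
    'K': (0.0, 1273.15),
    'Pa': (0.0, 10_000_000.0),
    'psi': (0.0, 1500.0),
    'rpm': (0.0, 30_000.0),
    'V': (-1000.0, 1000.0),
    'A': (-500.0, 500.0),
}

def check_value_for_unit(value, unit):
    """Return an error string if value is implausible for unit, else None."""
    # Guard the dict lookup: an unhashable unit (e.g. a JSON list) would raise TypeError
    bounds = UNIT_RANGES.get(unit) if isinstance(unit, str) else None
    if bounds is None:
        return f"unrecognized unit: {unit!r}"
    # bool is a subclass of int, so reject it explicitly
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return f"value is not numeric: {value!r}"
    # NaN would fail the range check anyway; this just gives a clearer message
    if math.isnan(value):
        return "value is NaN"
    low, high = bounds
    if not (low <= value <= high):
        return f"value {value} outside [{low}, {high}] for unit {unit}"
    return None
```

For example, check_value_for_unit(847293, 'C') returns "value 847293 outside [-100.0, 1000.0] for unit C", while check_value_for_unit(21.5, 'C') returns None. A helper like this could replace the single range check inside validate(), so that adding a new unit means adding one row to the table.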

Schema Drift Detection

Beyond message-level validation, you can detect systematic schema drift by tracking field presence and type distribution over rolling windows. If value was a float in 99.9% of messages last hour and it is suddenly a string in 20% of messages this hour, something changed on the producer side.

-- Schema drift detection query (T-SQL; run every hour against the raw zone)
-- Compares each JSON type's share of the $.value field between the last hour
-- and the hour before, and alerts when a type appears, disappears, or shifts.
-- OPENJSON (compatibility level 130+) type codes: 0 null, 1 string, 2 number,
-- 3 true/false, 4 array, 5 object. -1 marks a missing field or invalid JSON.

DECLARE @now datetime = GETUTCDATE();

WITH typed AS (
    SELECT
        r.arrived_at,
        COALESCE(j.[type], -1) AS value_type
    FROM kafka_raw_landing AS r
    OUTER APPLY (
        SELECT [type]
        FROM OPENJSON(CASE WHEN ISJSON(r.raw_payload) = 1 THEN r.raw_payload END)
        WHERE [key] = 'value'
    ) AS j
    WHERE r.source_topic = 'sensor_readings_raw'
      AND r.arrived_at > DATEADD(hour, -2, @now)
),
windowed AS (
    SELECT
        value_type,
        SUM(CASE WHEN arrived_at > DATEADD(hour, -1, @now) THEN 1 ELSE 0 END) AS current_count,
        SUM(CASE WHEN arrived_at <= DATEADD(hour, -1, @now) THEN 1 ELSE 0 END) AS previous_count
    FROM typed
    GROUP BY value_type
),
shares AS (
    -- Compare shares, not raw counts, so a change in total volume is not flagged as drift
    SELECT
        value_type,
        current_count,
        previous_count,
        current_count * 1.0 / NULLIF(SUM(current_count) OVER (), 0) AS current_share,
        previous_count * 1.0 / NULLIF(SUM(previous_count) OVER (), 0) AS previous_share
    FROM windowed
)
SELECT value_type, current_count, previous_count, current_share, previous_share
FROM shares
WHERE current_count = 0 OR previous_count = 0                -- type appeared or disappeared
   OR ABS(current_share - previous_share) > 0.10;             -- >10-point shift in share
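If the raw zone is not queryable with SQL, or you want the check to run inside the consumer itself, the same idea (compare each type's share of messages between two windows) can be sketched in Python. This version assumes you can collect each window's raw payloads as bytes; the function names, the 'missing' and 'invalid_json' labels, and the 10-percentage-point threshold are illustrative choices, not a standard.

```python
import json
from collections import Counter

def json_type_of(raw_bytes, field):
    """Classify the JSON type of a top-level field in one raw message."""
    try:
        # json.loads accepts bytes directly (UTF-8, UTF-16, or UTF-32)
        data = json.loads(raw_bytes)
    except (UnicodeDecodeError, json.JSONDecodeError):
        return 'invalid_json'
    if not isinstance(data, dict) or field not in data:
        return 'missing'
    value = data[field]
    if value is None:
        return 'null'
    # Check bool before int/float: bool is a subclass of int in Python
    if isinstance(value, bool):
        return 'boolean'
    if isinstance(value, (int, float)):
        return 'number'
    if isinstance(value, str):
        return 'string'
    if isinstance(value, list):
        return 'array'
    return 'object'

def type_shares(payloads, field):
    """Return {type_name: fraction_of_messages} for one window."""
    counts = Counter(json_type_of(p, field) for p in payloads)
    total = sum(counts.values())
    return {t: n / total for t, n in counts.items()} if total else {}

def detect_drift(current_payloads, previous_payloads, field='value', threshold=0.10):
    """Return (type, previous_share, current_share) for every type that appeared,
    disappeared, or whose share moved by more than threshold between windows."""
    cur = type_shares(current_payloads, field)
    prev = type_shares(previous_payloads, field)
    alerts = []
    for t in sorted(set(cur) | set(prev)):
        c, p = cur.get(t, 0.0), prev.get(t, 0.0)
        if c == 0.0 or p == 0.0 or abs(c - p) > threshold:
            alerts.append((t, p, c))
    return alerts
```

For instance, if every value in the previous window is a number and 2 of 10 values in the current window arrive as strings, detect_drift reports both the new string type and the reduced number share.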

Enforcement at the Gold Layer

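Even with detection at Stage 2, enforce constraints at the gold layer as a backstop. Delta Lake CHECK constraints (ALTER TABLE ... ADD CONSTRAINT ... CHECK) and CHECK constraints in SQL Server reject writes that violate the defined rules: the offending write fails as a whole rather than quietly dropping the bad rows. That makes the gold layer the last line of defense. Nothing that violates the encoded rules gets through, and the failed write creates a visible error in your pipeline logs instead of silently accepting garbage. Constraints only catch what they encode, though. A Fahrenheit reading labeled 'C' can still pass a range check, and in SQL Server a CHECK constraint is satisfied when its expression evaluates to NULL, so declare the constrained columns NOT NULL if null readings should also be rejected.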

-- Delta Lake constraint on the gold sensor readings table
ALTER TABLE gold.sensor_readings
ADD CONSTRAINT valid_value CHECK (value BETWEEN -100 AND 10000);

ALTER TABLE gold.sensor_readings
ADD CONSTRAINT known_unit CHECK (unit IN ('C', 'F', 'K', 'Pa', 'psi', 'rpm', 'V', 'A'));
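The same bounds now live in two places, the Python validate() method and the gold-table constraints, and they can drift apart when someone updates only one of them. One option, sketched here under the assumption that your deployment tooling can run generated DDL, is to keep the rules in a single Python definition and render the ALTER TABLE statements from it. VALUE_RANGE, render_check_constraints, and validate_value_and_unit are hypothetical names for illustration.

```python
# Single source of truth for the rules (hypothetical structure). Both the
# ingest-time check and the gold-layer DDL below are derived from these constants.
VALUE_RANGE = (-100, 10000)
VALID_UNITS = ('C', 'F', 'K', 'Pa', 'psi', 'rpm', 'V', 'A')

def sql_string_literal(s):
    """Quote a Python string as a SQL string literal, doubling embedded quotes."""
    return "'" + s.replace("'", "''") + "'"

def render_check_constraints(table):
    """Return ALTER TABLE statements equivalent to the hand-written constraints."""
    low, high = VALUE_RANGE
    units = ", ".join(sql_string_literal(u) for u in VALID_UNITS)
    return [
        f"ALTER TABLE {table}\nADD CONSTRAINT valid_value CHECK (value BETWEEN {low} AND {high});",
        f"ALTER TABLE {table}\nADD CONSTRAINT known_unit CHECK (unit IN ({units}));",
    ]

def validate_value_and_unit(value, unit):
    """Ingest-side check driven by the same constants as the SQL constraints."""
    errors = []
    low, high = VALUE_RANGE
    # bool is a subclass of int, so reject it explicitly
    if isinstance(value, bool) or not isinstance(value, (int, float)) or not (low <= value <= high):
        errors.append(f"value invalid or out of range: {value!r}")
    if unit not in VALID_UNITS:
        errors.append(f"unrecognized unit: {unit!r}")
    return errors

for stmt in render_check_constraints('gold.sensor_readings'):
    print(stmt)
```

Running it prints the same two statements as the hand-written DDL. The design choice is that a rule change becomes a single edit, and a review of that edit shows its effect on both layers at once.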

The goal of all three layers — message-level validation, schema drift detection, serving layer constraints — is to make schema changes visible quickly and loudly rather than silently and expensively.