Great Expectations in a CI Pipeline: Failing the Build When Data Goes Bad

I've been writing about getting tSQLt into a CI pipeline since 2013, the principle being that tests which only run when a developer remembers to run them don't actually protect you. The same argument applies to data quality checks. An expectation suite that lives in a Jupyter notebook and gets run manually whenever someone feels like it is not a data quality gate. It's a data quality suggestion.

The fix is the same: put the checks in a CI pipeline, run them automatically, fail the pipeline when they fail.

The Pattern

For data pipelines that run on a schedule or trigger, the CI integration looks like this:

  1. Pipeline runs: fetch data, transform, load to staging
  2. Great Expectations validates the staging output against the expectation suite
  3. If validation fails: pipeline exits non-zero, alert fires, load to production is blocked
  4. If validation passes: promote staging to production, build Data Docs, archive results

The key: validation must happen before data reaches production consumers. Validating after the fact tells you there was a problem; validating before the load gives you the chance to do something about it.
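The ordering above can be sketched as a small gate function. This is a minimal illustration, not part of Great Expectations: `run_gated_load` and its three callables are hypothetical names standing in for whatever your pipeline uses to validate, promote, and alert.

```python
from typing import Callable


def run_gated_load(
    validate: Callable[[], bool],
    promote: Callable[[], None],
    alert: Callable[[str], None],
) -> int:
    """Promote staging to production only if validation passes.

    Returns a process-style exit code: 0 on success, 1 on failure.
    """
    if not validate():
        # Validation failed: alert and return without touching production.
        alert("validation failed; production load blocked")
        return 1
    promote()
    return 0


# Usage with stand-in callables (all hypothetical).
events = []
code = run_gated_load(
    validate=lambda: False,
    promote=lambda: events.append("promoted"),
    alert=events.append,
)
```

Because `promote` is only reachable after `validate` returns true, a failed check cannot be followed by a production load, which is the whole point of validating before rather than after.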

A Command-Line Validation Script

Make validation runnable from the shell so your CI system can invoke it:

#!/usr/bin/env python
# validate_pipeline_output.py

import sys
import great_expectations as ge
from great_expectations.data_context import DataContext

def main(data_path, suite_name):
    context = DataContext(".")

    batch_kwargs = {
        "path": data_path,
        "datasource": "pipeline_output",
        "data_asset_name": "pipeline_output"
    }

    results = context.run_validation_operator(
        "action_list_operator",
        assets_to_validate=[context.get_batch(batch_kwargs, suite_name)],
        run_id={"run_name": f"ci_run_{data_path}"}
    )

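    # Assumes the 0.11+ ValidationOperatorResult API: statistics are read from
    # each validation result, since the operator result has no "statistics" key.
    stats = [r.statistics for r in results.list_validation_results()]

    if not results["success"]:
        failed = sum(s["unsuccessful_expectations"] for s in stats)
        print(f"VALIDATION FAILED: {suite_name} against {data_path}")
        print(f"Expectations failed: {failed}")
        sys.exit(1)

    passed = sum(s["successful_expectations"] for s in stats)
    print(f"VALIDATION PASSED: {passed} expectations met")
    sys.exit(0)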

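if __name__ == "__main__":
    # Exit code 2 for bad usage, so it is not confused with a failed validation.
    if len(sys.argv) != 3:
        print("usage: validate_pipeline_output.py DATA_PATH SUITE_NAME", file=sys.stderr)
        sys.exit(2)
    main(sys.argv[1], sys.argv[2])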

In your CI configuration (whether that's a Jenkins pipeline, a TFS build definition, or a shell script run by a scheduler):

# After pipeline produces output file
python validate_pipeline_output.py output/storm_events_processed.parquet storm_clean_suite

# Exit code 0 = passed, continue to production load
# Exit code 1 = failed, halt and alert
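If the thing invoking the script is itself Python (a scheduler wrapper, say), the same exit-code contract can be consumed with the standard library. A minimal sketch; `validation_passed` is a hypothetical helper name, and the command in the comment simply mirrors the shell line above.

```python
import subprocess
import sys


def validation_passed(cmd):
    """Run a validation command and report whether it exited with code 0."""
    completed = subprocess.run(cmd)
    return completed.returncode == 0


# Example command, mirroring the shell invocation:
# cmd = [sys.executable, "validate_pipeline_output.py",
#        "output/storm_events_processed.parquet", "storm_clean_suite"]
# if not validation_passed(cmd):
#     sys.exit(1)  # halt before the production load
```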

Integrating With an Orchestrator

If you're using an orchestration tool like Apache Airflow, the validation step is a task in the DAG that depends on the transform task and is a prerequisite for the load task:

from airflow import DAG
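# Airflow 1.10 import paths; in Airflow 2 these modules are
# airflow.operators.python and airflow.operators.bash.
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator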

def run_ge_validation(**context):
    import great_expectations as ge
    from great_expectations.data_context import DataContext

    ctx = DataContext("/path/to/ge/project")
    # ... run validation, raise exception on failure ...

with DAG('storm_etl', ...) as dag:
    extract = BashOperator(task_id='extract', ...)
    transform = PythonOperator(task_id='transform', ...)
    validate = PythonOperator(task_id='validate', python_callable=run_ge_validation)
    load = BashOperator(task_id='load_to_prod', ...)

    extract >> transform >> validate >> load

If validate raises an exception, Airflow marks the task failed and, under the default trigger rule, doesn't run load. The data quality gate is enforced by the pipeline structure, not by manual discipline.
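The "raise exception on failure" step can be as small as the sketch below. `DataQualityError` and `raise_if_failed` are hypothetical names, not Airflow or Great Expectations APIs; any uncaught exception raised inside the callable fails the task, so a custom exception type is only a convenience for readable logs.

```python
class DataQualityError(Exception):
    """Raised when a validation run does not succeed."""


def raise_if_failed(success, suite_name):
    """Turn a boolean validation outcome into a task failure."""
    if not success:
        raise DataQualityError(
            f"Expectation suite {suite_name!r} failed; blocking downstream tasks"
        )
```

Inside the validate callable, you would pass in the success flag from the validation run, for example `raise_if_failed(results["success"], "storm_clean_suite")`.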

What Changes When Validation Is Automated

When data quality checks run automatically on every pipeline execution, the conversation about bad data changes. Instead of "we think the data might be wrong, let's investigate," it's "the validation run from 14:32 UTC failed on expect_column_values_to_not_be_null('EventID') with 847 violations — here's the Data Docs link." That's a different starting point for the investigation, and it's the kind of operational data quality feedback that actually gets acted on. As always, I'm here to help.
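A failure message of that shape can be built from a validation result. The sketch below works on a plain dictionary and assumes the layout of a serialized Great Expectations validation result (a `results` list whose entries carry `expectation_config` and `result.unexpected_count`). `summarize_failures` is a hypothetical helper and the sample data is illustrative only.

```python
def summarize_failures(validation_result):
    """Return one human-readable line per failed expectation."""
    lines = []
    for entry in validation_result.get("results", []):
        if entry.get("success"):
            continue  # only report failed expectations
        config = entry.get("expectation_config", {})
        column = config.get("kwargs", {}).get("column")
        line = f"{config.get('expectation_type')}({column!r})"
        count = entry.get("result", {}).get("unexpected_count")
        if count is not None:
            line += f": {count} violations"
        lines.append(line)
    return lines


# Illustrative sample shaped like a serialized validation result.
sample = {
    "success": False,
    "results": [
        {
            "success": False,
            "expectation_config": {
                "expectation_type": "expect_column_values_to_not_be_null",
                "kwargs": {"column": "EventID"},
            },
            "result": {"unexpected_count": 847},
        },
        {
            "success": True,
            "expectation_config": {
                "expectation_type": "expect_column_to_exist",
                "kwargs": {"column": "EventID"},
            },
            "result": {},
        },
    ],
}
summary = summarize_failures(sample)
```

Lines like these are what you would put in the alert body alongside the Data Docs link.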
