Infrastructure as Code for Data Pipelines: Managing Airflow Like Software

Airflow DAG files are Python. That sentence should end the conversation about whether they belong in Git, get reviewed in pull requests, and get deployed through a pipeline. They are code. The same discipline that applies to your PySpark jobs applies here.

And yet, the standard operating procedure at most shops is still: engineer writes or modifies a DAG file, SSHs into the scheduler node, copies the file into the $AIRFLOW_HOME/dags/ directory, and watches the scheduler UI to see if it picks it up. No review. No test. No rollback path beyond "SSH back in and copy the old file from wherever you saved it."

Applying infrastructure-as-code discipline to Airflow is not complicated. Here's the full picture.

DAGs in Git: The Non-Negotiable Baseline

Your DAG files live in a Git repository. All of them. One repository for DAGs is fine; co-locating DAGs with the Spark job code they orchestrate is also fine if your team prefers it. The key is that every change to every DAG goes through a commit and, if you have more than one engineer on the team, a pull request.

This gives you the obvious things — history, diff, rollback — and one less-obvious thing: the PR description becomes the changelog for your pipeline changes. When the pipeline behaves differently next week, the commit history tells you when the DAG changed and why. That's worth more than any monitoring dashboard when you're diagnosing a Saturday night incident at 11 p.m.

Testing DAGs Before They Hit the Scheduler

Airflow's DagBag loads all DAG files in a directory and reports import errors. A simple test suite that instantiates the DagBag catches the most common DAG failures before deployment:

import pytest
from airflow.models import DagBag

@pytest.fixture(scope="session")
def dagbag():
    return DagBag(dag_folder="dags/", include_examples=False)

def test_no_import_errors(dagbag):
    assert dagbag.import_errors == {}, \
        f"DAG import errors: {dagbag.import_errors}"

def test_session_pipeline_exists(dagbag):
    assert "daily_session_pipeline" in dagbag.dags

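def test_session_pipeline_has_correct_schedule(dagbag):
    dag = dagbag.get_dag("daily_session_pipeline")
    # schedule_interval is deprecated from Airflow 2.4 in favor of `schedule`;
    # adjust this attribute to match the Airflow version you run.
    assert dag.schedule_interval == "0 2 * * *"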

def test_session_pipeline_task_dependencies(dagbag):
    dag = dagbag.get_dag("daily_session_pipeline")
    task_ids = {t.task_id for t in dag.tasks}
    assert "check_upstream" in task_ids
    assert "run_aggregation" in task_ids
    assert "write_report" in task_ids

    # Verify dependency ordering
    run_task = dag.get_task("run_aggregation")
    assert "check_upstream" in {t.task_id for t in run_task.upstream_list}

These tests run in seconds. They catch circular dependencies, missing task references, schedule syntax errors, and import failures — all before the DAG touches the scheduler.

Deployment via CI

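The Airflow scheduler rescans the dags/ directory for new files on an interval (configurable via dag_dir_list_interval) and re-parses existing files periodically (min_file_process_interval). Deployment is just copying files to that directory. On a well-configured CI pipeline, this is an S3 sync: CI pushes the DAGs to a bucket, and either the platform reads DAGs from S3 directly (as Amazon MWAA does) or a deploy step syncs from S3 to the scheduler's local dags folder. The open-source scheduler does not read from S3 on its own.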

# .travis.yml (adapt to the equivalent Jenkins/CircleCI config)
language: python

install:
  - pip install -r requirements.txt

jobs:
  include:
    - stage: test
      script:
        - flake8 dags/ tests/
        - pytest tests/ -v

    # Deploy only on pushes, so pull requests targeting the branch never deploy.
    - stage: deploy-dev
      if: branch = develop AND type = push
      script:
        - aws s3 sync dags/ s3://my-airflow-dags/dev/ --delete
        - aws s3 sync src/ s3://my-spark-jobs/dev/ --delete

    - stage: deploy-prod
      if: branch = main AND type = push
      script:
        - aws s3 sync dags/ s3://my-airflow-dags/prod/ --delete
        - aws s3 sync src/ s3://my-spark-jobs/prod/ --delete
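
When a deploy step copies DAGs onto the scheduler host, the copy itself deserves some care, because the scheduler may parse the folder mid-deploy. Below is a minimal sketch using only the standard library, assuming the DAGs have already been downloaded to a local staging directory. The function name mirror_dags and both directory arguments are hypothetical. It mirrors the effect of a sync with --delete, writing each file under a temporary name and renaming it into place so a half-written DAG file is never visible under its real name.

```python
import filecmp
import os
import shutil
from pathlib import Path


def mirror_dags(staging: Path, dags_folder: Path) -> dict:
    """Make dags_folder match staging for *.py files and report what changed."""
    changed, removed = [], []
    wanted = {p.relative_to(staging) for p in staging.rglob("*.py")}

    for rel in sorted(wanted):
        src, dst = staging / rel, dags_folder / rel
        if dst.exists() and filecmp.cmp(src, dst, shallow=False):
            continue  # identical content: leave the file and its mtime alone
        dst.parent.mkdir(parents=True, exist_ok=True)
        # Temporary name does not end in .py, so it is not picked up as a DAG file.
        tmp = dst.with_name(dst.name + ".tmp")
        shutil.copyfile(src, tmp)
        os.replace(tmp, dst)  # atomic when tmp and dst share a filesystem
        changed.append(rel.as_posix())

    # Equivalent of --delete: drop DAG files that no longer exist in staging.
    for existing in sorted(dags_folder.rglob("*.py")):
        rel = existing.relative_to(dags_folder)
        if rel not in wanted:
            existing.unlink()
            removed.append(rel.as_posix())

    return {"changed": changed, "removed": removed}
```

Skipping identical files keeps modification times stable, and the returned summary is convenient to print in the CI log as a record of what each deploy actually touched.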

Environment Configuration via Variables

DAGs should not hardcode environment-specific values (S3 bucket names, cluster IDs, database connection strings) in the Python files. Airflow Variables and Connections handle this: store environment-specific values in the Airflow metadata database, and reference them in DAGs via Variable.get("my_s3_bucket") or a connection ID. Prefer reading Variables inside task code or templates such as {{ var.value.my_s3_bucket }} rather than at module top level, because top-level calls query the metadata database every time the scheduler parses the file. The same DAG file works in dev and prod because the environment-specific values come from the Airflow configuration layer, not the code.

This also means your DAG files don't contain secrets. A DAG file in Git should be safe to read without knowing your production bucket names or database passwords.
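
The no-hardcoded-values rule can be enforced in the same test suite. The following is a rough sketch, not a complete linter: it parses each DAG file with the standard ast module and flags string literals that begin with a deny-listed prefix. The prefix list and the function name find_hardcoded_values are assumptions for illustration; tune them to whatever your environments actually differ on.

```python
import ast
from pathlib import Path

# Hypothetical deny-list: prefixes that signal an environment-specific value.
FORBIDDEN_PREFIXES = ("s3://", "jdbc:", "postgresql://")


def find_hardcoded_values(source: str, filename: str = "<dag>"):
    """Return sorted (filename, line, value) tuples for suspicious string literals."""
    hits = []
    for node in ast.walk(ast.parse(source, filename=filename)):
        if not (isinstance(node, ast.Constant) and isinstance(node.value, str)):
            continue
        for prefix in FORBIDDEN_PREFIXES:
            # Require text after the prefix, so f"s3://{bucket}/x" built from a
            # Variable is allowed while "s3://prod-bucket/x" is flagged.
            if node.value.startswith(prefix) and len(node.value) > len(prefix):
                hits.append((filename, node.lineno, node.value))
                break
    return sorted(hits)


def test_dags_have_no_hardcoded_environment_values():
    hits = []
    for path in sorted(Path("dags").rglob("*.py")):
        hits.extend(find_hardcoded_values(path.read_text(), str(path)))
    assert not hits, f"Hardcoded environment values: {hits}"
```

Because the check only parses the source and never imports it, it runs without Airflow installed and fails the build before a hardcoded bucket name reaches review.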

If you've automated Airflow DAG deployment and hit rough edges — especially around managing Connections and Variables across environments — I'd like to compare approaches. As always, I'm here to help.
