Your ETL Code Is Software: Treat It Like Software

I've made this argument before on this blog — in the context of Hive scripts in 2012, in the context of MapReduce jobs in 2013. I'm going to make it again in 2016 because the tools are better now, the excuses are thinner, and the data teams I work with are still not doing it.

Your PySpark jobs are software. Your Airflow DAGs are software. Treat them like software: version control, automated tests, and a deployment pipeline that doesn't involve SSH-ing into a scheduler node.

Testing PySpark: It's Tractable Now

spark-testing-base is a library from Holden Karau that makes PySpark unit testing straightforward. It provides a SQLTestCase base class that manages a local SparkContext and SQLContext for your tests and gives you assertion methods for comparing DataFrames. No cluster is required, and tests run in seconds.

from sparktestingbase.sqltestcase import SQLTestCase
from pyspark.sql import Row
from myteam.transforms import calculate_session_duration

class TestSessionDuration(SQLTestCase):

    def test_calculates_duration_correctly(self):
        input_data = self.sqlCtx.createDataFrame([
            Row(user_id="leia_organa", session_id="s001", event_ts=1000),
            Row(user_id="leia_organa", session_id="s001", event_ts=1300),
            Row(user_id="leia_organa", session_id="s001", event_ts=1600),
        ])

        result = calculate_session_duration(input_data)

        expected = self.sqlCtx.createDataFrame([
            Row(user_id="leia_organa", session_id="s001", duration_seconds=600)
        ])

        self.assertDataFrameEqual(expected, result)

    def test_handles_single_event_session(self):
        input_data = self.sqlCtx.createDataFrame([
            Row(user_id="han_solo", session_id="s002", event_ts=5000),
        ])
        result = calculate_session_duration(input_data)
        # Single event: expect exactly one row out, with a duration of 0
        self.assertEqual(result.count(), 1)
        self.assertEqual(result.first()["duration_seconds"], 0)
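
The tests above pin down a contract for calculate_session_duration without showing the function itself. As a rough sketch of that contract, here is the same logic in plain Python over dictionaries: group events by user and session, then subtract the earliest timestamp from the latest. The real function lives in myteam.transforms and operates on DataFrames, so session_durations and its row format are stand-ins made up for illustration. In PySpark the equivalent would presumably be a groupBy on user_id and session_id followed by max(event_ts) minus min(event_ts).

```python
from collections import defaultdict


def session_durations(events):
    """Plain-Python model of the behavior the tests expect (hypothetical).

    `events` is an iterable of dicts with user_id, session_id, and event_ts
    keys, mirroring the Row fields used in the tests.
    """
    # Collect every timestamp seen for each (user, session) pair.
    timestamps = defaultdict(list)
    for event in events:
        key = (event["user_id"], event["session_id"])
        timestamps[key].append(event["event_ts"])

    # Duration is last event minus first event; a single event yields 0.
    return [
        {
            "user_id": user_id,
            "session_id": session_id,
            "duration_seconds": max(ts) - min(ts),
        }
        for (user_id, session_id), ts in sorted(timestamps.items())
    ]


# Same fixtures as the two test cases, combined into one input.
events = [
    {"user_id": "leia_organa", "session_id": "s001", "event_ts": 1000},
    {"user_id": "leia_organa", "session_id": "s001", "event_ts": 1300},
    {"user_id": "leia_organa", "session_id": "s001", "event_ts": 1600},
    {"user_id": "han_solo", "session_id": "s002", "event_ts": 5000},
]
for row in session_durations(events):
    print(row)
```

A model like this is also handy for working out expected values by hand before you encode them in a DataFrame fixture.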

Testing Airflow DAGs

DAG validation tests catch the most common Airflow errors before they hit the scheduler: circular dependencies, import errors in DAG files, and task IDs that break Airflow's naming rules.

import unittest
from airflow.models import DagBag

class TestDagIntegrity(unittest.TestCase):

    def setUp(self):
        self.dagbag = DagBag(dag_folder="dags/", include_examples=False)

    def test_no_import_errors(self):
        self.assertEqual(
            len(self.dagbag.import_errors), 0,
            f"DAG import errors: {self.dagbag.import_errors}"
        )

    def test_all_dags_have_owner(self):
        for dag_id, dag in self.dagbag.dags.items():
            # Default to "airflow" so a DAG with no owner set at all fails too
            self.assertNotEqual(
                dag.default_args.get("owner", "airflow"), "airflow",
                f"DAG {dag_id} uses default owner; set a real owner"
            )

    def test_daily_session_pipeline_task_count(self):
        dag = self.dagbag.get_dag("daily_session_pipeline")
        self.assertIsNotNone(dag)
        self.assertEqual(len(dag.tasks), 4)
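
Of the errors listed above, circular dependencies are the least obvious when you're staring at a DAG file. As far as I know, Airflow runs its own cycle check when it loads a DAG and reports failures through import_errors, so the DagBag tests already cover it. The sketch below only illustrates what such a check amounts to, using a depth-first search over a plain dict of task IDs. find_cycle and the task names are hypothetical and not part of Airflow.

```python
def find_cycle(downstream):
    """Return one dependency cycle as a list of task IDs, or None if acyclic.

    `downstream` maps each task ID to the task IDs that run after it.
    """
    UNSEEN, IN_PROGRESS, DONE = 0, 1, 2
    state = {}
    path = []  # tasks on the current depth-first path, in order

    def visit(task_id):
        state[task_id] = IN_PROGRESS
        path.append(task_id)
        for nxt in downstream.get(task_id, ()):
            status = state.get(nxt, UNSEEN)
            if status == IN_PROGRESS:
                # nxt is already on the current path, so we looped back to it.
                return path[path.index(nxt):] + [nxt]
            if status == UNSEEN:
                cycle = visit(nxt)
                if cycle:
                    return cycle
        path.pop()
        state[task_id] = DONE
        return None

    for task_id in list(downstream):
        if state.get(task_id, UNSEEN) == UNSEEN:
            cycle = visit(task_id)
            if cycle:
                return cycle
    return None


# A healthy linear pipeline, and one where "load" points back at "extract".
healthy = {"extract": ["transform"], "transform": ["load"], "load": []}
broken = {"extract": ["transform"], "transform": ["load"], "load": ["extract"]}
print(find_cycle(healthy))
print(find_cycle(broken))
```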

A CI Pipeline for Data Code

Here's a minimal Jenkinsfile for a Python + Spark + Airflow shop:

pipeline {
    agent any
    stages {
        stage('Install') {
            steps {
                sh 'pip install -r requirements.txt'
            }
        }
        stage('Lint') {
            steps {
                sh 'flake8 src/ dags/ tests/'
            }
        }
        stage('Unit Tests') {
            steps {
                sh 'pytest tests/unit/ -v --tb=short'
            }
        }
        stage('DAG Integrity') {
            steps {
                sh 'pytest tests/dags/ -v'
            }
        }
        stage('Deploy to Dev') {
            when { branch 'develop' }
            steps {
                sh 'aws s3 sync dags/ s3://airflow-dags/dev/'
                sh 'aws s3 sync src/ s3://spark-jobs/dev/'
            }
        }
        stage('Deploy to Prod') {
            when { branch 'main' }
            steps {
                sh 'aws s3 sync dags/ s3://airflow-dags/prod/'
                sh 'aws s3 sync src/ s3://spark-jobs/prod/'
            }
        }
    }
}

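That's the whole pipeline. No magic steps. One caveat: the Airflow scheduler reads DAG files from its local dags_folder and does not poll S3 on its own, so this assumes something on the scheduler host (a cron'd aws s3 sync, say) mirrors the bucket into that folder. With that in place, syncing the DAGs folder is the deployment. The Spark job source lands in S3 and gets referenced by path in your Airflow operators.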
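
To make "referenced by path" concrete, here's a small sketch of how a DAG file might derive the S3 location of a deployed job and the spark-submit command that runs it. The bucket layout comes from the Jenkinsfile above. The helper names, the jobs/sessions.py path, the --date argument, and the assumption that your cluster's spark-submit accepts s3:// URIs (true on EMR, not on every setup) are mine.

```python
import shlex

# Bucket name taken from the Jenkinsfile; everything else here is illustrative.
SPARK_JOBS_BUCKET = "spark-jobs"
ENVIRONMENTS = ("dev", "prod")


def spark_job_uri(env, relative_path):
    """Return where `aws s3 sync src/ ...` places a file from src/ for `env`."""
    if env not in ENVIRONMENTS:
        raise ValueError("unknown environment: {!r}".format(env))
    return "s3://{}/{}/{}".format(
        SPARK_JOBS_BUCKET, env, relative_path.lstrip("/")
    )


def spark_submit_command(env, relative_path, job_args=()):
    """Build a shell-quoted spark-submit command line for a deployed job."""
    argv = [
        "spark-submit",
        "--deploy-mode", "cluster",
        spark_job_uri(env, relative_path),
    ]
    argv.extend(job_args)
    return " ".join(shlex.quote(arg) for arg in argv)


print(spark_submit_command("dev", "jobs/sessions.py", ["--date", "2016-03-01"]))
```

The resulting string could be handed to a BashOperator as its bash_command, which keeps the environment-specific path in one place instead of scattered across DAG files.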

The Argument Against (and Why It's Wrong)

"Our data pipelines change too fast to maintain tests." I've heard this at three different clients in the last two years. It's never true. What's true is that the pipeline code changes fast because nobody is paying down the complexity — each change is a patch on top of a patch. Tests don't slow down that kind of development; they reveal that the rate of change is unsustainable, which is information you need.

The tooling is there. The patterns are clear. If your data pipeline code doesn't have tests, that's a choice, not a constraint. As always, I'm here to help change it.
