The Python Data Interrogator: Systematic Schema Analysis for Every New Data Source

Every time I connect to a new data source, the first hour is the same: exploratory queries to understand what's actually in there. How many rows, what columns, what are the null rates, does that "primary key" actually have duplicates, what does a typical row look like. I got tired of writing the same queries for the fourth time this year, so I built a proper version of the data interrogator I've been using in various forms since 2019.

Version two is more systematic, handles Spark DataFrames natively (in addition to JDBC sources), and produces output that loads directly into the metadata repository.

The Core Profile Function

import json
import re
from typing import Any

from pyspark.sql import DataFrame
from pyspark.sql.functions import col, count, countDistinct, when, isnan, isnull, min, max, approx_count_distinct

def profile_dataframe(
    df: DataFrame, table_name: str, *, verify_keys: bool = True
) -> dict[str, Any]:
    """Profile a Spark DataFrame: row count, per-column statistics, sample rows.

    All per-column statistics are computed in a single aggregation pass
    instead of several Spark jobs per column, so the table is scanned once
    regardless of how many columns it has.

    Args:
        df: DataFrame to profile.
        table_name: Name recorded in the returned profile under "table_name".
        verify_keys: When True (default), columns whose approximate distinct
            count is close to the row count are re-checked with an exact
            ``countDistinct`` before "is_likely_key" is set. This costs one
            extra Spark job when such columns exist. When False,
            "is_likely_key" compares the approximate count to the row count
            directly, which is cheap but can miss real keys.

    Returns:
        A dict with "table_name", "row_count", "column_count", "columns"
        (one dict per column) and "sample_rows" (up to 5 rows as dicts).
        Each column dict has "column_name", "data_type", "null_count",
        "null_pct", "approx_distinct_count" and "is_likely_key"; orderable
        columns also get "min_value" and "max_value" as strings.
    """
    # NOTE: `min` and `max` in this function are the Spark aggregate functions
    # imported at the top of the file, not the Python builtins.

    # Decide by the column's own type name rather than by substring search in
    # str(dataType): the substring test also matched nested types such as
    # ArrayType(DoubleType()) or MapType(StringType(), IntegerType()), where
    # isnan()/min() are not applicable.
    nan_capable = {"float", "double"}
    orderable = {
        "byte", "short", "integer", "long", "float", "double", "decimal",
        "date", "timestamp", "timestamp_ntz",
    }
    # Complex columns are left out of the exact uniqueness check.
    unverifiable = {"array", "map", "struct"}
    # approx_count_distinct defaults to a 5% relative standard deviation; a
    # 15% window around the row count keeps true keys in the candidate set.
    key_tolerance = 0.15

    fields = df.schema.fields

    # Positional aliases avoid clashes with whatever the source columns are called.
    aggregates = [count("*").alias("row_count")]
    for idx, field in enumerate(fields):
        column = col(field.name)
        type_name = field.dataType.typeName()

        is_missing = isnull(column)
        if type_name in nan_capable:
            is_missing = is_missing | isnan(column)
        aggregates.append(count(when(is_missing, 1)).alias(f"nulls_{idx}"))
        aggregates.append(approx_count_distinct(column).alias(f"distinct_{idx}"))

        if type_name in orderable:
            aggregates.append(min(column).alias(f"min_{idx}"))
            aggregates.append(max(column).alias(f"max_{idx}"))

    totals = df.agg(*aggregates).collect()[0]
    row_count = int(totals["row_count"])

    column_profiles = []
    key_candidates = []
    for idx, field in enumerate(fields):
        type_name = field.dataType.typeName()
        null_count = int(totals[f"nulls_{idx}"])
        distinct_count = int(totals[f"distinct_{idx}"])
        # An empty table has no evidence of uniqueness, so never call it a key.
        can_be_key = row_count > 0 and null_count == 0

        profile = {
            "column_name": field.name,
            "data_type": str(field.dataType),
            "null_count": null_count,
            "null_pct": round(null_count / row_count * 100, 2) if row_count > 0 else 0.0,
            "approx_distinct_count": distinct_count,
            "is_likely_key": can_be_key and distinct_count == row_count,
        }

        if type_name in orderable:
            profile["min_value"] = str(totals[f"min_{idx}"])
            profile["max_value"] = str(totals[f"max_{idx}"])

        if (
            verify_keys
            and can_be_key
            and type_name not in unverifiable
            and abs(distinct_count - row_count) <= key_tolerance * row_count
        ):
            key_candidates.append(idx)

        column_profiles.append(profile)

    # The distinct count above is an estimate, so equality with the exact row
    # count is unreliable; confirm near-unique columns with an exact count.
    if key_candidates:
        exact = df.agg(
            *[
                countDistinct(col(fields[idx].name)).alias(f"exact_{idx}")
                for idx in key_candidates
            ]
        ).collect()[0]
        for idx in key_candidates:
            column_profiles[idx]["is_likely_key"] = int(exact[f"exact_{idx}"]) == row_count

    # Sample rows
    sample = [row.asDict() for row in df.limit(5).collect()]

    return {
        "table_name": table_name,
        "row_count": row_count,
        "column_count": len(fields),
        "columns": column_profiles,
        "sample_rows": sample,
    }

Detecting Common Problems

# Splits snake_case, camelCase and ALLCAPS identifiers into word tokens.
_NAME_TOKEN_RE = re.compile(r"[A-Z]+(?![a-z])|[A-Z]?[a-z]+|[0-9]+")


def _looks_temporal(column_name: str) -> bool:
    """Return True when a column name suggests it holds a date or time."""
    lowered = column_name.lower()
    if "date" in lowered or "time" in lowered:
        return True
    # "dt" and "ts" are too short for substring matching ("width", "amounts"),
    # so they only count when they are a whole word of the name.
    tokens = {token.lower() for token in _NAME_TOKEN_RE.findall(column_name)}
    return bool(tokens & {"dt", "ts"})


def flag_data_quality_issues(profile: dict) -> list[str]:
    """Turn a table profile into a list of human-readable quality warnings.

    Args:
        profile: Dict with "row_count" and "columns"; each column dict must
            have "column_name", "data_type", "null_pct" and
            "approx_distinct_count", and may have "is_likely_key".

    Returns:
        Warning strings. Null, constant-column and potential-key warnings
        come first in column order, followed by the type-mismatch warnings
        in column order.
    """
    row_count = profile["row_count"]
    column_issues = []
    type_issues = []

    for column in profile["columns"]:
        name = column["column_name"]
        distinct = column["approx_distinct_count"]
        # startswith, not "in": a column of ArrayType(StringType()) is not a string column.
        is_string = column["data_type"].startswith("String")

        if column["null_pct"] > 20:
            column_issues.append(f"HIGH NULLS: {name} is {column['null_pct']}% null")
        if distinct == 1:
            column_issues.append(f"CONSTANT COLUMN: {name} has only 1 distinct value")

        # An empty table trivially has distinct == row_count == 0; that says
        # nothing about uniqueness, so it is not reported.
        looks_unique = distinct == row_count or column.get("is_likely_key", False)
        if row_count > 0 and is_string and looks_unique:
            column_issues.append(f"POTENTIAL KEY: {name} has unique values on every row")

        # Columns that look like dates stored as strings
        if is_string and _looks_temporal(name):
            type_issues.append(
                f"TYPE MISMATCH RISK: {name} looks like a date but is String type"
            )

    return column_issues + type_issues

Writing Results to the Metadata Repository

def save_profile_to_catalog(profile: dict, catalog: str = "prod_analytics") -> None:
    """Append one profile record to the ``{catalog}.meta.table_profiles`` Delta table.

    Relies on a ``spark`` session being available in the enclosing scope.

    Args:
        profile: Profile dict with at least "table_name", "row_count" and
            "column_count"; the whole dict is also stored as JSON text in
            the "profile_json" column.
        catalog: Catalog that holds the ``meta.table_profiles`` table.
    """
    profiled_at = spark.sql("SELECT current_timestamp()").collect()[0][0]

    record = {
        "table_name": profile["table_name"],
        "row_count": profile["row_count"],
        "column_count": profile["column_count"],
        "profiled_at": profiled_at,
        # Sample rows carry raw column values (dates, timestamps, decimals)
        # that json cannot encode natively; storing them as their string form
        # is deliberate here, since the JSON is a human-readable baseline.
        "profile_json": json.dumps(profile, default=str),
    }
    profile_df = spark.createDataFrame([record])

    (
        profile_df.write
        .format("delta")
        .mode("append")
        .option("mergeSchema", "true")
        .saveAsTable(f"{catalog}.meta.table_profiles")
    )

# Onboarding run: profile the new source table, flag problems, persist the result.
new_source_df = spark.table("hive_metastore.staging.raw_vendor_invoices")
profile = profile_dataframe(new_source_df, "staging.raw_vendor_invoices")
issues = flag_data_quality_issues(profile)
save_profile_to_catalog(profile)

# Build the report first, then emit it in one go: a header line for the row
# count, one for the number of issues, and one bullet per issue.
summary_lines = [
    f"Rows: {profile['row_count']:,}",
    f"Issues detected: {len(issues)}",
]
summary_lines.extend(f" * {issue}" for issue in issues)
print("\n".join(summary_lines))

The profile runs in minutes on tables up to tens of millions of rows. The distinct counts come from Spark's approx_count_distinct, which uses HyperLogLog++ internally, so they stay fast as tables grow; keep in mind that they are estimates (roughly 5% relative error by default), not exact counts. Run it at source onboarding, store the result in the metadata repository, and you have a baseline to compare against when something downstream starts misbehaving. As always, I'm here to help.

Read more