Data Quality for AI Pipelines
Your model is only as good as your data. Here's how to catch bad training data, detect drift, and enforce contracts before your predictions go wrong.
The garbage in, garbage out problem
Most ML teams spend 80% of their time on data preparation but almost none on data validation. The training pipeline ingests whatever it gets. A column that was always positive suddenly has negative values. A feature that was 99% complete drops to 70%. A categorical field gains a new value nobody mapped.
The model trains, deploys, and serves bad predictions for hours before anyone notices. The fix is not better models. The fix is catching bad data before it reaches the model.
Step 1: Validate training data
Create a provero.yaml next to your training data. Define what “good data” looks like:
source:
  type: duckdb
  table: read_parquet('training_data/*.parquet')

checks:
  # No missing values in critical features
  - not_null: [user_id, amount, category, label]

  # Labels are valid
  - accepted_values:
      column: label
      values: [0, 1]

  # Feature ranges are sane
  - range:
      column: amount
      min: 0
      max: 1000000

  # Enough data to train
  - row_count:
      min: 10000

  # Feature completeness above threshold
  - completeness:
      column: category
      min: 95%

  # No duplicate training examples
  - unique: user_id

  # Email format valid (if using as feature)
  - email_validation:
      column: email

Run it before training starts:
$ provero run -c provero.yaml # Exit code 1 if any check fails, blocking the pipeline
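If your pipeline is plain Python rather than an orchestrator, the same gate is a subprocess call plus an exit-code check. A minimal sketch, assuming the config above sits next to your training script; the training step itself is a placeholder:

import subprocess
import sys

# Run the checks first; provero exits non-zero when any check fails.
result = subprocess.run(["provero", "run", "-c", "provero.yaml"])
if result.returncode != 0:
    sys.exit("Data validation failed: refusing to train on bad data.")

# ...safe to kick off training from here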
In Airflow, add it as the first task in your training DAG:
from airflow.operators.python import PythonOperator
from provero.airflow.operators import ProveroCheckOperator

validate_data = ProveroCheckOperator(
    task_id="validate_training_data",
    config_path="dags/provero.yaml",
)
train_model = PythonOperator(...)
validate_data >> train_model  # blocks training if data is bad

Step 2: Detect data drift
Data drift happens when the statistical properties of your input data change over time. A feature that averaged 50 last month now averages 200. Provero's anomaly detection catches this automatically by comparing current metrics against historical baselines.
checks:
  # Flag if row count drops/spikes unexpectedly
  - row_count_change:
      max_decrease: 20%

  # Statistical anomaly detection on key metrics
  - anomaly:
      column: amount
      method: mad  # robust to outliers
      sensitivity: medium

  - anomaly:
      column: prediction_score
      method: zscore
      sensitivity: high  # stricter for model outputs

The anomaly check uses your result history as the baseline. Run provero run daily to build up that history; after that, anomalies are flagged automatically. No separate configuration, no external service, no scipy dependency.
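For intuition, here is what MAD-based scoring does, sketched in plain Python. This is the standard modified z-score construction, not Provero's actual internals:

# Modified z-score using the median absolute deviation (MAD).
# Conceptual sketch of the statistic, not Provero's implementation.

def median(values):
    s = sorted(values)
    mid = len(s) // 2
    return s[mid] if len(s) % 2 else (s[mid - 1] + s[mid]) / 2

def is_anomaly(history, current, threshold=3.5):
    m = median(history)
    mad = median([abs(x - m) for x in history])
    if mad == 0:
        return False  # no spread in the history, nothing to score against
    # 0.6745 rescales MAD to be comparable with a standard deviation
    modified_z = 0.6745 * (current - m) / mad
    return abs(modified_z) > threshold

# A stable history flags the spike described above:
is_anomaly([48, 52, 50, 49, 51], 200)  # True

Because it scores against the median rather than the mean, a single past outlier in the history does not inflate the baseline, which is why mad is the safer default for raw feature columns.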
Step 3: Enforce data contracts
In ML teams, data producers (backend, data engineering) and data consumers (ML engineers, analysts) often have different assumptions about what the data looks like. Data contracts make these assumptions explicit.
contracts:
  - name: user_features_contract
    owner: data-engineering
    version: "2.1"
    table: user_features
    on_violation: block  # fail the pipeline

    schema:
      columns:
        - name: user_id
          type: integer
          checks: [not_null, unique]
        - name: lifetime_value
          type: float
          checks: [not_null]
        - name: segment
          type: varchar

    sla:
      freshness: 24h         # data must be < 24h old
      completeness: "99%"    # 99% non-null across all columns

When a data producer changes the schema (renames a column, changes a type), provero contract diff catches it:
$ provero contract diff v2.0.yaml v2.1.yaml
BREAKING: column 'ltv' renamed to 'lifetime_value'
CHANGE: column 'segment' type changed varchar(10) to varchar
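Conceptually, a contract diff compares two column maps and classifies each difference. A simplified sketch of the idea (a hypothetical helper, not Provero's code; real rename detection, like the 'ltv' case above, needs matching logic this sketch omits):

# Hypothetical sketch of contract diffing, not Provero's implementation.
# old/new are {column_name: type} maps from the two contract versions.
def diff_contracts(old, new):
    report = []
    for name, col_type in old.items():
        if name not in new:
            report.append(f"BREAKING: column '{name}' removed")
        elif new[name] != col_type:
            report.append(f"CHANGE: column '{name}' type changed {col_type} to {new[name]}")
    for name in new.keys() - old.keys():
        report.append(f"CHANGE: column '{name}' added")
    return report

diff_contracts(
    {"ltv": "float", "segment": "varchar(10)"},
    {"lifetime_value": "float", "segment": "varchar"},
)
# ["BREAKING: column 'ltv' removed",
#  "CHANGE: column 'segment' type changed varchar(10) to varchar",
#  "CHANGE: column 'lifetime_value' added"]

The asymmetry is the point: a column the producer drops or retypes breaks consumers and blocks the pipeline, while a newly added column is merely reported.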
Step 4: Monitor in production
Once the model is deployed, the prediction input data needs continuous validation. Use provero watch to run checks on a schedule:
$ provero watch -c production_checks.yaml --interval 5m # Runs checks every 5 minutes, alerts on failure
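Under the hood this is a scheduler around provero run. If you would rather own the loop yourself, say inside a container you already operate, a rough equivalent looks like this (a sketch, not the watch implementation; the alert hook is a placeholder):

import subprocess
import time

# Hand-rolled equivalent of `provero watch` -- a sketch, not its internals.
INTERVAL_SECONDS = 300  # matches --interval 5m

while True:
    result = subprocess.run(["provero", "run", "-c", "production_checks.yaml"])
    if result.returncode != 0:
        print("checks failed; alerting")  # wire your alert hook here
    time.sleep(INTERVAL_SECONDS)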
Add a webhook alert to Slack or PagerDuty:
alerts:
  - type: webhook
    url: ${SLACK_WEBHOOK}
    trigger: on_failure

  - type: webhook
    url: ${PAGERDUTY_WEBHOOK}
    trigger: on_failure
    headers:
      Authorization: "Bearer ${PD_TOKEN}"
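The webhook type is deliberately generic: on failure, Provero POSTs to whatever URL you give it, so anything that accepts an HTTP POST can be an alert channel. For Slack, an incoming-webhook URL is all the receiving side needs; a sketch of an equivalent manual post with requests (the payload shape here is illustrative, not Provero's exact webhook schema):

import os
import requests

# Illustrative manual alert post; the payload shape is an assumption,
# not Provero's exact webhook schema.
def alert_slack(message):
    requests.post(
        os.environ["SLACK_WEBHOOK"],
        json={"text": message},
        timeout=10,
    )

alert_slack("Data quality checks failed on user_features")

The full picture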
Before training
Validate schema, completeness, and ranges. Block pipeline if checks fail.
$ provero run -c training_checks.yaml
After training
Compare validation set metrics against training set. Detect drift.
$ provero run -c validation_checks.yaml
Before deploy
Enforce data contracts between producer and consumer.
$ provero contract validate -c contracts.yaml
In production
Continuously monitor input data quality. Alert on anomalies.
$ provero watch -c prod_checks.yaml -i 5m
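Wired into Airflow, the first three stages chain into a single DAG, with production monitoring running separately via provero watch. A sketch under assumptions: the DAG parameters and the train/deploy tasks are placeholders, and ProveroCheckOperator is used as in Step 1:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.empty import EmptyOperator
from provero.airflow.operators import ProveroCheckOperator

# Sketch: one DAG covering validate -> train -> check -> contracts -> deploy.
with DAG("ml_data_quality", start_date=datetime(2024, 1, 1), schedule=None) as dag:
    validate = ProveroCheckOperator(
        task_id="validate_training_data", config_path="training_checks.yaml"
    )
    train = EmptyOperator(task_id="train_model")    # placeholder
    check = ProveroCheckOperator(
        task_id="check_validation_metrics", config_path="validation_checks.yaml"
    )
    contracts = BashOperator(
        task_id="validate_contracts",
        bash_command="provero contract validate -c contracts.yaml",
    )
    deploy = EmptyOperator(task_id="deploy_model")  # placeholder

    validate >> train >> check >> contracts >> deploy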
All of this runs locally, on your infra, with no cloud dependency. Apache 2.0. Install and try it:
$ pip install provero
$ provero init
$ provero run