Integrate your pipeline (DVC · MLflow · Dagster)

✅ Stable — DVC, MLflow and Dagster tested in CI against the same loan scenario; lakehouse pending.

The sei engine never imports any MLOps tool. Pipeline reproducibility is delegated to the Reproducer seam: one adapter per tool, all implementing the same interface. The pipeline.tool field in sei.yaml determines which adapter is activated when the engine calls sei run.
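
The seam described above can be pictured with a short Python sketch. This is not the engine's code (the core is written in Rust): the `Reproducer` protocol, the `Fake*` adapters, the `ADAPTERS` registry and `select_reproducer` are hypothetical names, and the returned metric is a placeholder rather than a measured result.

```python
from dataclasses import dataclass
from typing import Callable, Dict, Protocol


class Reproducer(Protocol):
    """Hypothetical adapter interface: reproduce the pipeline, return its metrics."""

    def reproduce(self) -> Dict[str, float]: ...


@dataclass
class FakeDvcReproducer:
    # A real adapter would shell out to `dvc repro` and then read metrics.json.
    def reproduce(self) -> Dict[str, float]:
        return {"unfair-credit-exclusion": 0.02}  # placeholder value


@dataclass
class FakeMlflowReproducer:
    # A real adapter would run the MLflow entry point and read the run's metrics.
    def reproduce(self) -> Dict[str, float]:
        return {"unfair-credit-exclusion": 0.02}  # placeholder value


# Registry keyed by the value of pipeline.tool in sei.yaml (assumed layout).
ADAPTERS: Dict[str, Callable[[], Reproducer]] = {
    "dvc": FakeDvcReproducer,
    "mlflow": FakeMlflowReproducer,
}


def select_reproducer(config: dict) -> Reproducer:
    """Pick the adapter named by pipeline.tool; reject unknown tools."""
    tool = config["pipeline"]["tool"]
    try:
        return ADAPTERS[tool]()
    except KeyError:
        raise ValueError(f"unsupported pipeline.tool: {tool!r}") from None
```

Because the caller only sees `reproduce()`, adding a backend in this sketch means adding one registry entry, which mirrors the one-adapter-per-tool idea.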

The concrete proof of this agnosticism is the loan scenario: the same evaluation code (compliance_eval.py), the same treatment (train.py), the same risk program (the risk: section of sei.yaml) — and three different backends, verified in CI.


Cat.  Paradigm                        pipeline.tool  Status
1     Git-native / versioned files    dvc            Stable
2     Experiment → Registry           mlflow         Stable
3     Asset graph with lineage        dagster        Stable
4     Lakehouse / tables              (pending)      Future

DVC extends git to version data and models with content-addressable storage, and describes the pipeline as a DAG in dvc.yaml. It is category 1 because the unit of change is a file versioned in git.

sei.yaml (excerpt — DVC)
pipeline: { tool: dvc, metrics: metrics.json }

The loan scenario defines two stages: featurize prepares the dataset via Croissant (Annex IV §2) and produces a cached data/features.parquet; evaluate trains the model and writes metrics.json and model.pkl.

dvc.yaml — loan pipeline
stages:
  featurize:
    cmd: .venv/bin/python featurize.py
    deps:
      - featurize.py
      - compliance_eval.py
      - data/german_credit.csv
      - data/german_credit.croissant.json
    outs:
      - data/features.parquet
  evaluate:
    cmd: .venv/bin/python evaluate.py
    deps:
      - evaluate.py
      - compliance_eval.py
      - train.py  # the TREATMENT: changing it marks evaluate as stale
      - data/features.parquet
      - shared_data/policies/assessment_plan.oscal.yaml
    params:
      - seed
    outs:
      - model.pkl:
          cache: true
    metrics:
      - metrics.json:
          cache: false

dvc repro recomputes only the stages whose dependencies have changed. When the treatment consists of replacing train.py (V1 → V2), only the evaluate stage becomes stale; featurize remains cached because the data has not changed. This behavior is the concrete expression of class B typed drift (model drift) in the engine: the digest of train.py enters the model phase, and featurize (class C, data) is not touched.
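
The selective recomputation can be illustrated with a simplified model of dependency-digest comparison. This is a sketch only: DVC's actual hashing, lock format and downstream propagation through stage outputs are not reproduced, and `file_digest`, `snapshot` and `stale_stages` are hypothetical helpers.

```python
import hashlib
from pathlib import Path
from typing import Dict, List


def file_digest(path: Path) -> str:
    """Content digest of one dependency (SHA-256 is an illustrative choice)."""
    return hashlib.sha256(path.read_bytes()).hexdigest()


def snapshot(stages: Dict[str, List[Path]]) -> Dict[str, Dict[str, str]]:
    """Record the current digest of every dependency, per stage (a toy lock)."""
    return {
        name: {str(p): file_digest(p) for p in deps}
        for name, deps in stages.items()
    }


def stale_stages(
    stages: Dict[str, List[Path]], lock: Dict[str, Dict[str, str]]
) -> List[str]:
    """Return the stages whose dependency digests no longer match the lock."""
    stale = []
    for name, deps in stages.items():
        current = {str(p): file_digest(p) for p in deps}
        if current != lock.get(name):
            stale.append(name)
    return stale
```

In this model, editing only `train.py` changes the digests recorded for `evaluate` and leaves those of `featurize` intact, so only `evaluate` is reported as stale.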

sei run calls dvc repro, reads metrics.json, and anchors the digest of dvc.lock in the signed evidence bundle (pipeline_lock_digest). In this way, the pipeline lock is part of the evidence.
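
A minimal sketch of anchoring the lock file in the evidence follows. Only the field name `pipeline_lock_digest` comes from the text above; the hash algorithm, the `sha256:` prefix, the surrounding dictionary layout and the function names are assumptions, and signing is left out.

```python
import hashlib
import json
from pathlib import Path


def pipeline_lock_digest(lock_path: Path) -> str:
    """Digest of the lock file bytes (algorithm and prefix are assumptions)."""
    return "sha256:" + hashlib.sha256(lock_path.read_bytes()).hexdigest()


def build_evidence(lock_path: Path, metrics_path: Path) -> dict:
    """Assemble an unsigned, illustrative evidence record."""
    return {
        "pipeline_lock_digest": pipeline_lock_digest(lock_path),
        "metrics": json.loads(metrics_path.read_text()),
    }
```

Since the digest covers the whole lock file, any change to a recorded dependency or output hash yields a different `pipeline_lock_digest`, which is what ties the evidence to one exact pipeline state.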


MLflow (cat. 2 — experiment → registry)

MLflow manages the experiment → registration → promotion cycle. The unit of change is a model version in the Model Registry; promotion to @champion is the treatment.

sei.yaml (excerpt — MLflow)
pipeline: { tool: mlflow, metrics: metrics.json }

The adapter expects an eval/mlflow_entry.py that opens an MLflow run, executes the agnostic eval, registers the real model in the Registry, and, if the blocking control passes, promotes it to the @champion alias:

eval/mlflow_entry.py (excerpt)
with mlflow.start_run() as run:
    _, model = compliance_eval.run(train.build_model)
    metrics = json.load(open("metrics.json"))
    info = mlflow.sklearn.log_model(model, name="model",
                                    registered_model_name=REGISTERED_MODEL)
    version = info.registered_model_version
    if _val(metrics.get("unfair-credit-exclusion", 1.0)) < 0.03:
        client.set_registered_model_alias(REGISTERED_MODEL, "champion", version)
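
The promotion condition in the excerpt fails closed: a missing metric falls back to 1.0, which is above the 0.03 threshold. The sketch below isolates that logic. `gate_passes` is a hypothetical helper, and the `_val` unwrapping from the excerpt is omitted on the assumption that metric values are plain numbers.

```python
from typing import Mapping

METRIC = "unfair-credit-exclusion"
THRESHOLD = 0.03  # same threshold as in the excerpt


def gate_passes(metrics: Mapping[str, float], threshold: float = THRESHOLD) -> bool:
    """True only when the metric is present and strictly below the threshold."""
    # A missing metric defaults to 1.0, so the gate blocks promotion.
    return float(metrics.get(METRIC, 1.0)) < threshold
```

The comparison is strict, so a value exactly equal to the threshold does not pass.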

The store is local (file:./mlruns) and requires no server. MlflowReproducer reads metrics from the run via mlflow runs describe and delivers them to the engine.
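
As an illustration of the metric hand-off, the sketch below parses the JSON printed by `mlflow runs describe`, assuming the run's metrics appear under `data.metrics` as a name-to-value mapping. `metrics_from_describe` is a hypothetical helper, not the adapter's code, and `SAMPLE` is a hand-written placeholder payload rather than captured output.

```python
import json
from typing import Dict

# Placeholder payload in the assumed shape; not real CLI output.
SAMPLE = """
{
    "info": {"run_id": "example-run-id", "status": "FINISHED"},
    "data": {
        "metrics": {"unfair-credit-exclusion": 0.02},
        "params": {},
        "tags": {}
    }
}
"""


def metrics_from_describe(describe_output: str) -> Dict[str, float]:
    """Extract the metrics mapping from a described run; empty if absent."""
    run = json.loads(describe_output)
    metrics = run.get("data", {}).get("metrics", {})
    return {name: float(value) for name, value in metrics.items()}
```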


Dagster (cat. 3 — asset graph with lineage)

Dagster materializes assets with explicit lineage (code_version) and allows declaring native quality checks (asset_check). Staleness is detected via code_version, derived from the hash of the treatment file.

sei.yaml (excerpt — Dagster)
pipeline: { tool: dagster, metrics: metrics.json }

The scenario defines two assets in a chain and one asset check:

dagster_defs.py (excerpt)
def _treatment_code_version() -> str:
    return "train-" + hashlib.sha256(Path("train.py").read_bytes()).hexdigest()[:12]


@asset(code_version=FEATURES_CODE_VERSION)
def credit_features(context):
    rows = len(compliance_eval.load_applications())
    context.add_output_metadata({"rows": rows})
    return rows


@asset(deps=[credit_features], code_version=_treatment_code_version())
def compliance_evaluation(context):
    compliance_eval.run(train.build_model)
    metrics = json.load(open("metrics.json"))
    context.add_output_metadata({k: MetadataValue.float(float(_val(v)))
                                 for k, v in metrics.items()})
    return metrics


@asset_check(asset=compliance_evaluation,
             description="Fairness gate as a native Dagster check")
def unfair_credit_exclusion_gate(context):
    dp = _val(json.load(open("metrics.json")).get("unfair-credit-exclusion", 1.0))
    return AssetCheckResult(passed=dp < 0.03,
                            severity=AssetCheckSeverity.WARN,
                            metadata={"demographic_parity_diff": dp, "threshold": 0.03})

code_version is derived from the hash of train.py. When the treatment is applied (V1 → V2), the hash changes and Dagster marks compliance_evaluation as genuinely stale. sei run calls dagster asset materialize --select '*' and the asset check acts as a native parallel expression of the fairness gate (severity WARN; the authoritative verdict is emitted by the sei engine from the OSCAL).
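
The effect of the hash-derived version can be checked in isolation, without Dagster. In this sketch `treatment_code_version` mirrors the helper in the excerpt but takes the path as a parameter, and `is_stale` is a hypothetical stand-in for the comparison Dagster performs against the last materialized code_version.

```python
import hashlib
from pathlib import Path


def treatment_code_version(path: Path) -> str:
    """Same derivation as in the excerpt: 'train-' plus 12 hex chars of SHA-256."""
    return "train-" + hashlib.sha256(path.read_bytes()).hexdigest()[:12]


def is_stale(recorded_version: str, path: Path) -> bool:
    """True when the file's current version differs from the recorded one."""
    return treatment_code_version(path) != recorded_version
```

In the excerpt the helper is called inside the decorator arguments, so the version is computed when the definitions module is loaded. A change to train.py is therefore picked up the next time the definitions are loaded, not within an already running process.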


For each backend, the command flow is identical:

Common flow (backend-independent)
sei compile      # risk program (risk: in sei.yaml) → assessment_plan.oscal.yaml
sei run          # Reproducer → dvc repro | mlflow run | dagster asset materialize
sei status       # detects drift without recomputing
sei verify       # verifies the bundle signature
sei reconstruct  # reconstructs the ISO 23894 cycle by git replay

The only thing that changes between scenarios is the value of pipeline.tool in sei.yaml and the pipeline definition files (dvc.yaml, eval/mlflow_entry.py, dagster_defs.py). The Rust core, the controls, the AssuranceProgram, and the agnostic eval remain unchanged.