Ir al contenido

Integrar tu pipeline (DVC · MLflow · Dagster)

✅ Estable — DVC, MLflow y Dagster probados en CI sobre la misma loan; lakehouse pendiente.

El motor sei nunca importa ninguna herramienta MLOps. La reproducibilidad del pipeline se delega al seam Reproducer: un adapter por herramienta que implementa la misma interfaz. El campo pipeline.tool en sei.yaml determina qué adapter se activa cuando el motor invoca sei run.

La prueba concreta de este agnosticismo es el escenario loan: el mismo código de evaluación (compliance_eval.py), el mismo tratamiento (train.py), el mismo programa de riesgos (la sección risk: de sei.yaml) — y tres backends distintos, verificados en CI.


Cat.Paradigmapipeline.toolEstado
1Git-native / ficheros versionadosdvcEstable
2Experimento → RegistrymlflowEstable
3Grafo de assets con linajedagsterEstable
4Lakehouse / tablas(pendiente)Futuro

DVC extiende git para versionar datos y modelos con contenido addressable, y describe el pipeline como un DAG en dvc.yaml. Es la categoría 1 porque la unidad de cambio es un fichero versionado en git.

sei.yaml (extracto — DVC)
pipeline: { tool: dvc, metrics: metrics.json }

El escenario loan define dos stages: featurize prepara el dataset vía Croissant (§2 del Anexo IV) y produce data/features.parquet cacheado; evaluate entrena el modelo y escribe metrics.json y model.pkl.

dvc.yaml — pipeline loan
stages:
featurize:
cmd: .venv/bin/python featurize.py
deps:
- featurize.py
- compliance_eval.py
- data/german_credit.csv
- data/german_credit.croissant.json
outs:
- data/features.parquet
evaluate:
cmd: .venv/bin/python evaluate.py
deps:
- evaluate.py
- compliance_eval.py
- train.py # el TRATAMIENTO — su cambio marca evaluate como stale
- data/features.parquet
- shared_data/policies/assessment_plan.oscal.yaml
params:
- seed
outs:
- model.pkl:
cache: true
metrics:
- metrics.json:
cache: false

dvc repro recomputa únicamente los stages cuyas dependencias han cambiado. Cuando el tratamiento consiste en sustituir train.py (V1 → V2), solo el stage evaluate queda obsoleto; featurize permanece en caché porque el dato no cambió. Este comportamiento es la expresión concreta de la deriva tipada clase B (modelo) del motor: el digest de train.py entra en la fase modelo, y featurize (clase C, datos) no se toca.

sei run invoca dvc repro, lee metrics.json y ancla el digest de dvc.lock en el bundle de evidencia firmado (pipeline_lock_digest). De este modo, el lock del pipeline es parte de la evidencia.


MLflow (cat. 2 — experimento → registry)

Sección titulada «MLflow (cat. 2 — experimento → registry)»

MLflow gestiona el ciclo experimento → registro → promoción. La unidad de cambio es una versión de modelo en el Model Registry; la promoción a @champion es el tratamiento.

sei.yaml (extracto — MLflow)
pipeline: { tool: mlflow, metrics: metrics.json }

El adapter espera un eval/mlflow_entry.py que abra un run de MLflow, ejecute el eval agnóstico, registre el modelo real en el Registry y, si el control bloqueante pasa, lo promueva al alias @champion:

eval/mlflow_entry.py (extracto)
with mlflow.start_run() as run:
_, model = compliance_eval.run(train.build_model)
metrics = json.load(open("metrics.json"))
info = mlflow.sklearn.log_model(model, name="model",
registered_model_name=REGISTERED_MODEL)
version = info.registered_model_version
if _val(metrics.get("unfair-credit-exclusion", 1.0)) < 0.03:
client.set_registered_model_alias(REGISTERED_MODEL, "champion", version)

El store es local (file:./mlruns) y no requiere servidor. MlflowReproducer lee las métricas del run via mlflow runs describe y las entrega al motor.


Dagster (cat. 3 — grafo de assets con linaje)

Sección titulada «Dagster (cat. 3 — grafo de assets con linaje)»

Dagster materializa assets con linaje explícito (code_version) y permite declarar checks de calidad nativos (asset_check). La staleness se detecta por code_version, derivado del hash del fichero de tratamiento.

sei.yaml (extracto — Dagster)
pipeline: { tool: dagster, metrics: metrics.json }

El escenario define dos assets en cadena y un asset check:

dagster_defs.py (extracto)
def _treatment_code_version() -> str:
return "train-" + hashlib.sha256(Path("train.py").read_bytes()).hexdigest()[:12]
@asset(code_version=FEATURES_CODE_VERSION)
def credit_features(context):
rows = len(compliance_eval.load_applications())
context.add_output_metadata({"rows": rows})
return rows
@asset(deps=[credit_features], code_version=_treatment_code_version())
def compliance_evaluation(context):
compliance_eval.run(train.build_model)
metrics = json.load(open("metrics.json"))
context.add_output_metadata({k: MetadataValue.float(float(_val(v)))
for k, v in metrics.items()})
return metrics
@asset_check(asset=compliance_evaluation,
description="Gate de equidad como check nativo de Dagster")
def unfair_credit_exclusion_gate(context):
dp = _val(json.load(open("metrics.json")).get("unfair-credit-exclusion", 1.0))
return AssetCheckResult(passed=dp < 0.03,
severity=AssetCheckSeverity.WARN,
metadata={"demographic_parity_diff": dp, "threshold": 0.03})

code_version se deriva del hash de train.py. Al sustituir el tratamiento (V1 → V2), el hash cambia y Dagster marca compliance_evaluation como genuinamente stale. sei run invoca dagster asset materialize --select '*' y el asset check actúa como expresión nativa paralela del gate de equidad (severidad WARN; el veredicto autoritativo lo emite el motor sei desde el OSCAL).


Para cada backend, el flujo de comandos es idéntico:

Flujo común (independiente del backend)
sei compile # programa de riesgos (risk: en sei.yaml) → assessment_plan.oscal.yaml
sei run # Reproducer → dvc repro | mlflow run | dagster materialize
sei status # detecta deriva sin recomputar
sei verify # verifica la firma del bundle
sei reconstruct # reconstruye el ciclo ISO 23894 por replay de git

Lo único que cambia entre escenarios es el valor de pipeline.tool en sei.yaml y los ficheros de definición del pipeline (dvc.yaml, eval/mlflow_entry.py, dagster_defs.py). El núcleo Rust, los controles, el AssuranceProgram y el eval agnóstico permanecen inalterados.