Mímisbrunnr知恵の泉

← MLOps 一覧

🎓 レベル:標準 | 重要度:A(必須)

📎 前提:再現性とバージョニング | 関連:MLパイプラインのオーケストレーションとCI/CD/CT

要点(BLUF)

1. なぜパイプライン化するのか

ノートブックでセルを上から実行する開発は、PoC では速いですが運用では破綻します。「どのセルをどの順で実行したか」が記録されず、再現できず、自動再学習もできません。パイプライン化とは、この暗黙の手順を**明示的な有向グラフ(DAG)**として定義し、機械が再実行できるようにすることです。

2. 学習パイプラインと推論パイプライン

flowchart TB
  subgraph 学習パイプライン
    A1["データ取り込み"] --> A2["データ検証"]
    A2 --> A3["特徴量生成"]
    A3 --> A4["モデル学習"]
    A4 --> A5["モデル評価"]
    A5 -->|"合格"| A6["モデル登録"]
  end
  subgraph 推論パイプライン
    B1["リクエスト到着"] --> B2["特徴量取得"]
    B2 --> B3["推論"]
    B3 --> B4["後処理と応答"]
  end
  A3 -.->|"同じ変換ロジックを共有"| B2
  A6 -.->|"登録モデルをロード"| B3

3. ステージをアーティファクトで繋ぐ

各ステージは「前段のアーティファクトを読み、次段のアーティファクトを書く」関数です。

ステージ入力出力(アーティファクト)
データ取り込みソース生データスナップショット(+ハッシュ)
データ検証生データ検証レポート(スキーマ・分布)
特徴量生成検証済データ特徴量テーブル+変換器
学習特徴量モデル候補
評価モデル+検証データメトリクス+合否判定
登録合格モデルレジストリのバージョン

アーティファクトを介して疎結合にすると、途中から再実行(特徴量だけ作り直して再学習)でき、各ステージを独立にテストできます。

4. 動く最小例:ステージを関数で繋ぐ最小パイプライン

オーケストレーターの実装(Airflow/Kubeflow)はMLパイプラインのオーケストレーションとCI/CD/CTに譲り、ここでは「ステージ=関数、アーティファクト=戻り値」という骨格を最小化します。評価ゲートで不合格なら登録しない、という分岐も入れます。

import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# --- 各ステージ=純粋な関数 ---
def ingest(seed):
    rng = np.random.default_rng(seed)
    X = rng.normal(0, 1, (600, 3))
    y = (X[:, 0] - X[:, 1] + 0.3*rng.normal(size=600) > 0).astype(int)
    return {"X": X, "y": y}

def validate(data):
    assert not np.isnan(data["X"]).any(), "欠損あり"
    report = {"n_rows": len(data["y"]), "pos_rate": float(data["y"].mean())}
    return report

def featurize(data):
    # 学習・推論で共有すべき変換(ここでは標準化の統計量を返す)
    mu, sd = data["X"].mean(0), data["X"].std(0)
    Xf = (data["X"] - mu) / sd
    return {"X": Xf, "y": data["y"], "transform": (mu, sd)}

def train(feat):
    n = len(feat["y"]); k = int(n*0.8)
    model = LogisticRegression().fit(feat["X"][:k], feat["y"][:k])
    return {"model": model, "holdout": (feat["X"][k:], feat["y"][k:])}

def evaluate(trained, threshold=0.8):
    Xh, yh = trained["holdout"]
    acc = accuracy_score(yh, trained["model"].predict(Xh))
    return {"accuracy": acc, "passed": acc >= threshold}

# --- パイプライン実行(DAGを直列に) ---
data = ingest(seed=0)
report = validate(data)
feat = featurize(data)
trained = train(feat)
metrics = evaluate(trained)

print(f"検証レポート : 行数={report['n_rows']} 正例率={report['pos_rate']:.2f}")
print(f"評価         : 精度={metrics['accuracy']:.3f} 合格={metrics['passed']}")
print(f"登録判定     : {'モデルを登録' if metrics['passed'] else '登録を中止(ゲート不合格)'}")

出力:

検証レポート : 行数=600 正例率=0.46
評価         : 精度=0.950 合格=True
登録判定     : モデルを登録

出力の意味:データ取り込みから評価ゲートまでが関数の連鎖として走り、精度0.950がしきい値0.8を超えたので「登録」に進みました。もし精度が基準を下回れば登録は止まります。この「各ステージが純粋関数で、評価ゲートが自動の番人になる」骨格が、本物のオーケストレータ(MLパイプラインのオーケストレーションとCI/CD/CT)でも変わらない設計の核です。

5. 運用の勘所

なぜそうするのか

パイプライン化は「自動化のため」と思われがちですが、本質は再現性とテスト可能性です。ステージを純粋関数とアーティファクトに分けることで、壊れた箇所を特定でき、部分的に再実行でき、各ステージを単体テストできる——これが手作業ノートブックでは絶対に得られない運用上の安全性です。自動化(成熟度を上げる、MLOps成熟度モデルと自動化レベル)はその上に乗ります。

⚠️ よくある落とし穴

対応 lab

関連ノート