🎓 レベル:標準 | 重要度:A(必須)
📎 前提:再現性とバージョニング | 関連:MLパイプラインのオーケストレーションとCI/CD/CT
要点(BLUF)
- MLパイプラインは学習パイプライン(データ→検証→特徴量→学習→評価→登録)と推論パイプライン(特徴量取得→推論→後処理)の2系統に分かれます。両者を分けて設計するのが基本です。
- 2つの系統は特徴量変換ロジックを共有しなければなりません。共有しないと学習と推論で前処理がずれ、学習/推論スキューが発生します(学習推論スキューの防止)。
- 各ステージは「入力アーティファクトを受け取り、出力アーティファクトを生む純粋な関数」として設計する。これにより再実行・キャッシュ・テスト・部分差し替えが可能になり、ノートブックの手作業から脱却できます。
1. なぜパイプライン化するのか
ノートブックでセルを上から実行する開発は、PoC では速いですが運用では破綻します。「どのセルをどの順で実行したか」が記録されず、再現できず、自動再学習もできません。パイプライン化とは、この暗黙の手順を**明示的な有向グラフ(DAG)**として定義し、機械が再実行できるようにすることです。
2. 学習パイプラインと推論パイプライン
flowchart TB
subgraph 学習パイプライン
A1["データ取り込み"] --> A2["データ検証"]
A2 --> A3["特徴量生成"]
A3 --> A4["モデル学習"]
A4 --> A5["モデル評価"]
A5 -->|"合格"| A6["モデル登録"]
end
subgraph 推論パイプライン
B1["リクエスト到着"] --> B2["特徴量取得"]
B2 --> B3["推論"]
B3 --> B4["後処理と応答"]
end
A3 -.->|"同じ変換ロジックを共有"| B2
A6 -.->|"登録モデルをロード"| B3
- 学習パイプライン:定期的・トリガー駆動でバッチ実行。成果物はモデルとメトリクス。
- 推論パイプライン:リクエストごと(オンライン)またはバッチで実行。成果物は予測。
- 共有点:特徴量変換は両方で同一でなければならない(点線)。これが MLOps 設計の最重要の接点です。
3. ステージをアーティファクトで繋ぐ
各ステージは「前段のアーティファクトを読み、次段のアーティファクトを書く」関数です。
| ステージ | 入力 | 出力(アーティファクト) |
|---|---|---|
| データ取り込み | ソース | 生データスナップショット(+ハッシュ) |
| データ検証 | 生データ | 検証レポート(スキーマ・分布) |
| 特徴量生成 | 検証済データ | 特徴量テーブル+変換器 |
| 学習 | 特徴量 | モデル候補 |
| 評価 | モデル+検証データ | メトリクス+合否判定 |
| 登録 | 合格モデル | レジストリのバージョン |
アーティファクトを介して疎結合にすると、途中から再実行(特徴量だけ作り直して再学習)でき、各ステージを独立にテストできます。
4. 動く最小例:ステージを関数で繋ぐ最小パイプライン
オーケストレーターの実装(Airflow/Kubeflow)はMLパイプラインのオーケストレーションとCI/CD/CTに譲り、ここでは「ステージ=関数、アーティファクト=戻り値」という骨格を最小化します。評価ゲートで不合格なら登録しない、という分岐も入れます。
import numpy as np
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
# --- 各ステージ=純粋な関数 ---
def ingest(seed):
rng = np.random.default_rng(seed)
X = rng.normal(0, 1, (600, 3))
y = (X[:, 0] - X[:, 1] + 0.3*rng.normal(size=600) > 0).astype(int)
return {"X": X, "y": y}
def validate(data):
assert not np.isnan(data["X"]).any(), "欠損あり"
report = {"n_rows": len(data["y"]), "pos_rate": float(data["y"].mean())}
return report
def featurize(data):
# 学習・推論で共有すべき変換(ここでは標準化の統計量を返す)
mu, sd = data["X"].mean(0), data["X"].std(0)
Xf = (data["X"] - mu) / sd
return {"X": Xf, "y": data["y"], "transform": (mu, sd)}
def train(feat):
n = len(feat["y"]); k = int(n*0.8)
model = LogisticRegression().fit(feat["X"][:k], feat["y"][:k])
return {"model": model, "holdout": (feat["X"][k:], feat["y"][k:])}
def evaluate(trained, threshold=0.8):
Xh, yh = trained["holdout"]
acc = accuracy_score(yh, trained["model"].predict(Xh))
return {"accuracy": acc, "passed": acc >= threshold}
# --- パイプライン実行(DAGを直列に) ---
data = ingest(seed=0)
report = validate(data)
feat = featurize(data)
trained = train(feat)
metrics = evaluate(trained)
print(f"検証レポート : 行数={report['n_rows']} 正例率={report['pos_rate']:.2f}")
print(f"評価 : 精度={metrics['accuracy']:.3f} 合格={metrics['passed']}")
print(f"登録判定 : {'モデルを登録' if metrics['passed'] else '登録を中止(ゲート不合格)'}")
出力:
検証レポート : 行数=600 正例率=0.46
評価 : 精度=0.950 合格=True
登録判定 : モデルを登録
出力の意味:データ取り込みから評価ゲートまでが関数の連鎖として走り、精度0.950がしきい値0.8を超えたので「登録」に進みました。もし精度が基準を下回れば登録は止まります。この「各ステージが純粋関数で、評価ゲートが自動の番人になる」骨格が、本物のオーケストレータ(MLパイプラインのオーケストレーションとCI/CD/CT)でも変わらない設計の核です。
5. 運用の勘所
- 特徴量変換を1か所に:学習と推論で同じコードパスを使う。コピペで二重実装するとスキューの温床(学習推論スキューの防止)。
- 評価をゲートにする:人手の判断でなく、しきい値・前バージョンとの比較を自動の合否判定にする。
- アーティファクトに版を刻む:各ステージ出力に出自(再現性とバージョニング)を付け、途中再実行を可能にする。
- 冪等に作る:同じ入力で同じ出力。リトライしても壊れないステージにする。
なぜそうするのか
パイプライン化は「自動化のため」と思われがちですが、本質は再現性とテスト可能性です。ステージを純粋関数とアーティファクトに分けることで、壊れた箇所を特定でき、部分的に再実行でき、各ステージを単体テストできる——これが手作業ノートブックでは絶対に得られない運用上の安全性です。自動化(成熟度を上げる、MLOps成熟度モデルと自動化レベル)はその上に乗ります。
⚠️ よくある落とし穴
- 学習と推論で前処理を別実装する:必ずスキューが出る。変換ロジックは共有する。
- 巨大な1関数にまとめる:再実行・テスト・差し替えができない。ステージに分ける。
- 評価ゲートを人手の目視にする:見落とし・主観でデグレが本番に漏れる。自動の合否判定にする。
- アーティファクトを上書き保存する:過去バージョンを失い、ロールバック不能になる。
対応 lab
- なし(概念ノート)。本物のオーケストレーション最小例は MLパイプラインのオーケストレーションとCI/CD/CT で扱います。
関連ノート
- 再現性とバージョニング(アーティファクトの出自)
- 学習推論スキューの防止(変換共有の重要性)
- MLパイプラインのオーケストレーションとCI/CD/CT(本物のオーケストレータ)
- 実験トラッキング(メトリクスとアーティファクトの記録)
- MLOps成熟度モデルと自動化レベル(パイプラインの自動化段階)
- 第1章 MLOpsの全体像 目次
- MLOps・AI基盤 全体目次