Mímisbrunnr知恵の泉

← データエンジニアリング 一覧

🎓 レベル:標準 | 重要度:A(必須)

📎 前提:べき等性と再実行SQL変換とdbt | 関連:データガバナンスとカタログ

要点(BLUF)

概念 ── パイプラインをDAGで表す

「抽出が終わってから整形、整形が両方終わってから結合、結合の後にレポート」。この依存を絵にするとDAGになります。

flowchart LR
    EX["extract"] --> SU["stg_users"]
    EX --> SO["stg_orders"]
    SU --> JM["join_mart"]
    SO --> JM
    JM --> RP["report"]

オーケストレータは、この図を読んで「依存を満たす順序」を自動で決め、独立なタスク(stg_usersとstg_orders)は並列に走らせます。dbtのref()によるDAG(→ SQL変換とdbt)や、Sparkの実行DAG(→ Sparkの基礎)と同じ構造です。

仕組み ── トポロジカルソートで実行順を出す

依存を満たす順序は、入次数0(依存なし)のタスクから順に取り出すトポロジカルソートで求まります。

# 各タスク -> 依存(先に終えるべき)タスク
deps = {"extract":[], "stg_users":["extract"], "stg_orders":["extract"],
        "join_mart":["stg_users","stg_orders"], "report":["join_mart"]}
order = topo_sort(deps)   # 入次数0から順に取り出す

実行結果(実機・純Pythonで再現):

実行順(依存を満たす順序):
  extract -> stg_orders -> stg_users -> join_mart -> report

extractが最初、reportが最後。stg_users/stg_ordersは順不同(並列可能)。依存が循環していると順序が定義できないため、検出してエラーにします。

循環依存の検出 -> 循環依存を検出

設計の勘所 ── オーケストレータが担うもの

機能中身
依存解決DAGから実行順を決定・独立タスクは並列
スケジュールcron的な定期実行(毎時・毎日)
再実行(retry)失敗タスクを自動リトライ・部分再実行
バックフィル過去日付分をまとめて再生成
監視・通知失敗・遅延をアラート、実行履歴を可視化

なぜそうするか ── 依存と失敗を一元管理する

なぜcronでスクリプトを並べないのか。依存と失敗の管理が破綻するからです。cronは「9時にA、9時15分にB」と時刻でごまかすしかなく、Aが長引けばBが未完成データを読む。タスクが増えるほど時刻調整は地獄になります。オーケストレータは時刻でなく依存で順序を保証し、失敗を検知してそこから再実行し、全体の状態を可視化する。「いつ動かすか」でなく「何の後に動かすか」で組むことが、壊れにくいパイプラインの条件です。

⚠️ よくある落とし穴

対応ラボ

data-engineering-study/labs/07_streaming_orchestration.pyPYTHONIOENCODING=utf-8 で実行・トポロジカルソートと循環検出を確認済み)。

関連

第7章 ストリーミングとオーケストレーション 目次