Mímisbrunnr知恵の泉

← データエンジニアリング 一覧

🎓 レベル:発展 | 重要度:A(必須)

📎 前提:MapReduceの考え方Sparkの基礎 | 関連:インデックスと実行計画データレイクとオブジェクトストレージ

要点(BLUF)

概念 ── 並列の単位と、その間の移動

データを多数のパーティションに割ると、各パーティションを別タスクが並列処理できます。問題は「キーで集約・結合する」とき。同じキーが別々のパーティションに散っていると、全ノード間でデータを引っ越して同じ所に集める必要があり、これがシャッフルです。

flowchart LR
    subgraph BEFORE["map後(キーが散在)"]
      A1["A,B"] 
      A2["A,C"]
      A3["B,C"]
    end
    subgraph AFTER["shuffle後(キーごとに集約)"]
      K1["A,A"]
      K2["B,B"]
      K3["C,C"]
    end
    BEFORE -->|ネットワーク移動| AFTER

仕組み ── データ偏り(skew)が律速する

キーをパーティションにハッシュ分割しますが、特定キーが極端に多いと、そのキーのパーティションだけ巨大になります。

# data が極端に多いキー集合を3パーティションにハッシュ分割
N_PART = 3
keys = ["data"]*50 + ["spark"]*5 + ["pipeline"]*5
# 各キーを stable_hash(key) % N_PART で割り当て

実行結果(実機・決定的ハッシュ):

キー総数: 60, パーティション数: 3
partition 0:   0 
partition 1:   5 #####
partition 2:  55 #######################################################
最大/最小 = 55/0 -> 偏りがあると最も重いパーティションが律速

3並列にしたのに、partition 2 に55件が集中。全体の処理時間は「最も重いパーティション」で決まるため、他が空いていても遅い。これがデータskewです。3台あっても、実質1台分の重さがボトルネックになります。

設計の勘所 ── shuffleを減らす・skewを散らす

問題対策
shuffleが多い不要なgroupBy/joinを削る・事前集約(map側集約)
大⨯小のjoinブロードキャストjoin(小テーブルを全ノードに配り移動を回避)
skew(偏り)キーにsalt(乱数)を足して分散・偏りキーを別処理
パーティション過多/過少適切な数に再分割(repartition/coalesce
同じキーで何度も集約パーティション分割をキーに合わせて固定(再shuffle回避)

なぜそうするか ── ネットワークが一番遅い

なぜshuffleをそこまで嫌うのか。計算(CPU)より、ノード間のデータ移動(ネットワーク・ディスク)が桁違いに遅いからです。分散処理は「計算を分けて速くする」のが狙いですが、その代償に移動コストが乗る。移動が大きすぎると、分けたことの利得を食い潰します。だから「データの置き方(パーティション)を、処理のキーに合わせて、移動が最小になるよう設計する」のが分散最適化の本質。これは単一マシンで「索引でディスクI/Oを減らす」(→ インデックスと実行計画)のと同じ——一番遅い資源(移動・I/O)を減らすという共通原理です。

⚠️ よくある落とし穴

対応ラボ

data-engineering-study/labs/06_distributed.pyPYTHONIOENCODING=utf-8 で実行・skewによる偏りを決定的ハッシュで確認済み)。

関連

第6章 分散処理 目次