🎓 レベル:発展 | 重要度:A(必須)
📎 前提:MapReduceの考え方・Sparkの基礎 | 関連:インデックスと実行計画・データレイクとオブジェクトストレージ
要点(BLUF)
- パーティションは分散データの分割単位。各パーティションが1つのタスクとして並列処理される。シャッフルは、
groupBy/joinなどで「同じキーを同じノードに集める」ためのノード間データ移動。 - シャッフルはネットワークとディスクを大量に使う、分散処理最大のコスト。最適化の主目標は「いかにshuffleを減らすか」。
- パーティションが偏る(データskew)と、最も重い1パーティションが全体を律速する。並列なのに遅い、の典型原因。
概念 ── 並列の単位と、その間の移動
データを多数のパーティションに割ると、各パーティションを別タスクが並列処理できます。問題は「キーで集約・結合する」とき。同じキーが別々のパーティションに散っていると、全ノード間でデータを引っ越して同じ所に集める必要があり、これがシャッフルです。
flowchart LR
subgraph BEFORE["map後(キーが散在)"]
A1["A,B"]
A2["A,C"]
A3["B,C"]
end
subgraph AFTER["shuffle後(キーごとに集約)"]
K1["A,A"]
K2["B,B"]
K3["C,C"]
end
BEFORE -->|ネットワーク移動| AFTER
仕組み ── データ偏り(skew)が律速する
キーをパーティションにハッシュ分割しますが、特定キーが極端に多いと、そのキーのパーティションだけ巨大になります。
# data が極端に多いキー集合を3パーティションにハッシュ分割
N_PART = 3
keys = ["data"]*50 + ["spark"]*5 + ["pipeline"]*5
# 各キーを stable_hash(key) % N_PART で割り当て
実行結果(実機・決定的ハッシュ):
キー総数: 60, パーティション数: 3
partition 0: 0
partition 1: 5 #####
partition 2: 55 #######################################################
最大/最小 = 55/0 -> 偏りがあると最も重いパーティションが律速
3並列にしたのに、partition 2 に55件が集中。全体の処理時間は「最も重いパーティション」で決まるため、他が空いていても遅い。これがデータskewです。3台あっても、実質1台分の重さがボトルネックになります。
設計の勘所 ── shuffleを減らす・skewを散らす
| 問題 | 対策 |
|---|---|
| shuffleが多い | 不要なgroupBy/joinを削る・事前集約(map側集約) |
| 大⨯小のjoin | ブロードキャストjoin(小テーブルを全ノードに配り移動を回避) |
| skew(偏り) | キーにsalt(乱数)を足して分散・偏りキーを別処理 |
| パーティション過多/過少 | 適切な数に再分割(repartition/coalesce) |
| 同じキーで何度も集約 | パーティション分割をキーに合わせて固定(再shuffle回避) |
- ブロードキャストjoin:片方が小さければ、それを全ノードにコピーしてしまえばshuffle不要。大規模joinの定番最適化。
- partition pruning:読む段階で不要パーティションを飛ばす(→ データレイクとオブジェクトストレージ の日付パーティション)。そもそも読まない=最強の最適化。
なぜそうするか ── ネットワークが一番遅い
なぜshuffleをそこまで嫌うのか。計算(CPU)より、ノード間のデータ移動(ネットワーク・ディスク)が桁違いに遅いからです。分散処理は「計算を分けて速くする」のが狙いですが、その代償に移動コストが乗る。移動が大きすぎると、分けたことの利得を食い潰します。だから「データの置き方(パーティション)を、処理のキーに合わせて、移動が最小になるよう設計する」のが分散最適化の本質。これは単一マシンで「索引でディスクI/Oを減らす」(→ インデックスと実行計画)のと同じ——一番遅い資源(移動・I/O)を減らすという共通原理です。
⚠️ よくある落とし穴
- 並列度を上げれば速いと思う → skewがあると最重パーティションが律速。偏りを先に散らす。
- 巨大⨯巨大joinを素朴に書く → 大量shuffle。可能なら片方を小さくしてブロードキャスト。
- パーティションが小さすぎる(小ファイル問題)→ タスク起動オーバーヘッドで遅い。
coalesceで束ねる。 - 同じキーで繰り返し集約するのに毎回shuffle → キーでパーティション分割して再利用。
対応ラボ
data-engineering-study/labs/06_distributed.py(PYTHONIOENCODING=utf-8 で実行・skewによる偏りを決定的ハッシュで確認済み)。
関連
- 上流の処理モデルは MapReduceの考え方・Sparkの基礎
- 単一マシンでI/Oを減らす索引は インデックスと実行計画、置き方の最適化は データレイクとオブジェクトストレージ
- 分散の一貫性・合意の理論は 分散システム へ