---
id: cs-mapreduce-patterns
title: MapReduce / Distributed Compute - Spark / DuckDB / Beam
category: Coding
status: draft
source_trust_level: B
verification_status: conceptual
created_at: 2026-05-09
updated_at: 2026-05-09
tags: [cs, mapreduce, distributed, vibe-coding]
tech_stack: { language: "Python / SQL", applicable_to: ["Backend", "Data"] }
applied_in: []
aliases: [MapReduce, Spark, Beam, dataflow, shuffle, partitioning, distributed compute]
---

# MapReduce / Distributed Compute

> Distribute a large data set across many workers: **Map (transform) → Shuffle (redistribute) → Reduce (aggregate)**. Modern engines: Spark / Beam / DuckDB / DataFusion. Plain SQL is often the simpler option.

## 📖 Core concepts
- Map: input → key-value pairs.
- Shuffle: group records by key.
- Reduce: aggregate per key.
- Skew: when a hot key lands on a single worker, the whole job waits for that worker.

## 💻 Code patterns

### Conceptual MapReduce
```
Input: ['the cat sat', 'the dog ran']

Map → [('the', 1), ('cat', 1), ('sat', 1), ('the', 1), ('dog', 1), ('ran', 1)]

Shuffle → {'the': [1,1], 'cat': [1], 'sat': [1], 'dog': [1], 'ran': [1]}

Reduce → {'the': 2, 'cat': 1, 'sat': 1, 'dog': 1, 'ran': 1}
```

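The three phases above can be sketched in plain Python on a single machine. This is only an illustration of the data flow; the function names `map_phase`, `shuffle_phase`, and `reduce_phase` are made up for this sketch, and a real engine would run each phase across many workers.

```python
from collections import defaultdict


def map_phase(lines):
    """Map: emit a (word, 1) pair for every word in every line."""
    return [(word, 1) for line in lines for word in line.split()]


def shuffle_phase(pairs):
    """Shuffle: group all values that share a key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)


def reduce_phase(groups):
    """Reduce: aggregate the grouped values per key."""
    return {key: sum(values) for key, values in groups.items()}


lines = ['the cat sat', 'the dog ran']
counts = reduce_phase(shuffle_phase(map_phase(lines)))
print(counts)  # {'the': 2, 'cat': 1, 'sat': 1, 'dog': 1, 'ran': 1}
```

In a distributed run, the shuffle is the step that moves data between machines, which is why the later sections focus on making it cheaper.
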
### Spark (PySpark)
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('wc').getOrCreate()

# RDD style (low-level)
rdd = spark.sparkContext.textFile('s3://bucket/logs/*.txt')
counts = (
    rdd
    .flatMap(lambda line: line.split())
    .map(lambda w: (w, 1))
    .reduceByKey(lambda a, b: a + b)
)
counts.saveAsTextFile('s3://bucket/output/')

# DataFrame (high-level, recommended)
df = spark.read.text('s3://bucket/logs/*.txt')
counts = (
    df.selectExpr('explode(split(value, " ")) as word')
    .groupBy('word').count()
)
counts.write.parquet('s3://bucket/output/')
```

### Spark SQL (simplest)
```python
df.createOrReplaceTempView('logs')
spark.sql("""
    SELECT word, COUNT(*) AS cnt
    FROM logs LATERAL VIEW explode(split(value, ' ')) t AS word
    GROUP BY word
    ORDER BY cnt DESC
    LIMIT 100
""").show()
```

### DuckDB (single-node, large)
```python
import duckdb
con = duckdb.connect()

# Query Parquet files directly (S3 paths rely on DuckDB's httpfs extension)
result = con.execute("""
    SELECT date, region, SUM(amount) AS total
    FROM 's3://bucket/sales/*.parquet'
    GROUP BY date, region
    ORDER BY total DESC
""").fetchdf()
```

→ A single node is fine up to roughly the TB range. Simpler to operate than Spark, and often faster at that scale.

### Apache Beam (portable, runners)
```python
import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | 'Read' >> beam.io.ReadFromText('gs://bucket/*.txt')
     | 'Split' >> beam.FlatMap(lambda line: line.split())
     | 'Pair' >> beam.Map(lambda w: (w, 1))
     | 'Count' >> beam.CombinePerKey(sum)  # per-key sum, pre-aggregated before the shuffle
     | 'Write' >> beam.io.WriteToText('gs://bucket/out')
    )
```

→ Beam separates the pipeline code from the runner that executes it (Dataflow, Flink, Spark).

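### Partitioning (parallelism)
```python
# Spark: both calls return a new DataFrame, so keep the result
df = df.repartition(200, 'date')  # 200 partitions, hash-partitioned by date (full shuffle)
df = df.coalesce(10)              # reduce the partition count without a full shuffle

# Few large partitions or many small ones?
# Aim for ~128 MB per partition (Spark's default spark.sql.files.maxPartitionBytes)
```
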
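The ~128 MB target translates into a partition count with simple arithmetic. A minimal sketch, assuming the dataset size is known up front; `suggested_partitions` is a hypothetical helper, not a Spark API.

```python
import math

TARGET_PARTITION_BYTES = 128 * 1024**2  # ~128 MB per partition, the target named above


def suggested_partitions(total_bytes, target_bytes=TARGET_PARTITION_BYTES):
    """Return how many partitions keep each one near the target size."""
    return max(1, math.ceil(total_bytes / target_bytes))


print(suggested_partitions(25 * 1024**3))  # 25 GiB of data → 200
```

The result is only a starting point: compressed Parquet expands in memory, so the in-memory size per partition can be noticeably larger than the on-disk size used here.
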
### Shuffle (the most expensive operation)
```
GroupBy / Join / Distinct = shuffle.

Tips:
- Pre-aggregate before the shuffle (combiner)
- Broadcast join (copy the small table to every worker)
- Bucket-aligned tables (sort-merge join, no shuffle)
```

```python
# Spark broadcast join
from pyspark.sql.functions import broadcast

big.join(broadcast(small), 'key')  # small is replicated to every worker
```

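The combiner tip can be made concrete by counting how many records would cross the network with and without local pre-aggregation. This is a toy model in plain Python with made-up data; the two function names are illustrative only.

```python
from collections import Counter


def shuffled_without_combiner(partitions):
    """Every (word, 1) pair crosses the network."""
    return sum(len(words) for words in partitions)


def shuffled_with_combiner(partitions):
    """Each worker pre-aggregates locally and sends one record per distinct word."""
    return sum(len(Counter(words)) for words in partitions)


# Two workers, each holding its own slice of the input (illustrative data).
partitions = [
    ['the', 'the', 'the', 'cat'],
    ['the', 'dog', 'dog', 'the'],
]

print(shuffled_without_combiner(partitions))  # 8
print(shuffled_with_combiner(partitions))     # 4

# The final counts are the same either way; only the shuffle volume changes.
total = sum((Counter(words) for words in partitions), Counter())
print(dict(total))  # {'the': 5, 'cat': 1, 'dog': 2}
```

This only works for aggregations that can be merged from partial results (sum, count, min, max). Spark's `reduceByKey` and Beam's `CombinePerKey` apply the same idea.
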
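### Skew (imbalance)
```
Key 'X' holds 90% of the rows = a single worker does almost all the work.

Fixes:
1. Salting: key + random suffix → spreads the hot key
2. Skew join handling (AQE in Spark 3, or a skew hint where the platform supports one)
3. Process the hot key separately from the small keys

# Spark 3: adaptive query execution splits skewed join partitions
spark.conf.set('spark.sql.adaptive.enabled', 'true')
spark.conf.set('spark.sql.adaptive.skewJoin.enabled', 'true')

# Databricks Runtime also accepts a skew hint
df.hint('skew', 'user_id')
```

```python
# Salting example
from pyspark.sql.functions import rand

salted = df.withColumn('salt', (rand() * 10).cast('int'))  # 10 salt buckets
result = (
    salted.groupBy('key', 'salt').agg(...)  # stage 1: partial aggregate per (key, salt)
    .groupBy('key').agg(...)                # stage 2: merge the partials per key
)
```
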
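To see why salting helps, the sketch below simulates hash partitioning in plain Python. The worker count, the salt count, and the data are assumptions chosen for illustration, and a round-robin suffix stands in for `rand()` so the run is reproducible.

```python
import zlib
from collections import Counter

NUM_WORKERS = 4
NUM_SALTS = 10


def worker_for(key):
    """Hash partitioning: a stable hash of the key picks the worker."""
    return zlib.crc32(key.encode()) % NUM_WORKERS


# 90 rows for the hot key 'X', 10 rows spread over other keys (illustrative data).
rows = ['X'] * 90 + [f'k{i}' for i in range(10)]

# Without salting, every 'X' row hashes to the same worker.
plain_load = Counter(worker_for(key) for key in rows)

# With salting, 'X' becomes NUM_SALTS distinct keys that hash independently.
salted_keys = [f'{key}#{i % NUM_SALTS}' for i, key in enumerate(rows)]
salted_load = Counter(worker_for(key) for key in salted_keys)

print(max(plain_load.values()))   # at least 90: one worker owns every 'X' row
print(max(salted_load.values()))  # busiest worker after salting

# Stage 2: strip the salt and merge the partial counts back per original key.
merged = Counter()
for salted_key, count in Counter(salted_keys).items():
    merged[salted_key.split('#')[0]] += count
print(merged['X'])  # 90
```

The price of salting is the second aggregation stage, so it pays off only when a few keys dominate the data.
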
### File format
```
- Parquet: columnar, compressed, predicate push-down (default choice)
- ORC: similar to Parquet
- Avro: row-based + schema (common with Kafka)
- CSV: plain text, inefficient for large data
- JSON: inefficient at scale

→ For analytics, Parquet almost always.
```

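The difference between row-based and columnar layouts can be shown with a toy model. This sketch ignores encoding, compression, and file metadata, and only counts how many values a single-column aggregate has to touch; the records are made up.

```python
rows = [
    {'date': '2026-05-09', 'region': 'US', 'amount': 10},
    {'date': '2026-05-09', 'region': 'EU', 'amount': 20},
    {'date': '2026-05-10', 'region': 'US', 'amount': 30},
]

# Row layout (CSV / JSON / Avro style): records are stored whole,
# so summing one field still reads every field of every record.
row_values_touched = sum(len(record) for record in rows)
row_total = sum(record['amount'] for record in rows)

# Columnar layout (Parquet / ORC style): each column is stored contiguously,
# so the same sum reads only the 'amount' column.
columns = {name: [record[name] for record in rows] for name in rows[0]}
col_values_touched = len(columns['amount'])
col_total = sum(columns['amount'])

print(row_values_touched, col_values_touched)  # 9 3
print(row_total == col_total)                  # True
```

The gap widens with table width: a query that needs 3 of 100 columns skips most of the file in a columnar format.
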
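### Predicate pushdown
```sql
-- DuckDB syntax; Spark SQL reads files with parquet.`path` instead
SELECT * FROM 's3://b/*.parquet'
WHERE date = '2026-05-09'  -- files / row groups skipped via metadata (min/max stats)
  AND region = 'US'        -- evaluated while scanning the column
```

→ Data that the metadata rules out is never read, which is why the query is fast.
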
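The skipping step can be sketched as a range check against per-file statistics. This is a simplified model: the `FileStats` class and the file names are hypothetical, and real Parquet readers apply the same check per row group using footer metadata.

```python
from dataclasses import dataclass


@dataclass
class FileStats:
    path: str       # hypothetical file name
    min_date: str   # min / max of the date column, as recorded in file metadata
    max_date: str


def files_to_scan(files, wanted_date):
    """Keep only files whose [min_date, max_date] range can contain wanted_date."""
    return [f.path for f in files if f.min_date <= wanted_date <= f.max_date]


files = [
    FileStats('part-0.parquet', '2026-05-01', '2026-05-07'),
    FileStats('part-1.parquet', '2026-05-08', '2026-05-14'),
    FileStats('part-2.parquet', '2026-05-15', '2026-05-21'),
]

print(files_to_scan(files, '2026-05-09'))  # ['part-1.parquet']
```

Pruning is only effective when the data is sorted or partitioned by the filtered column. If every file spans the full date range, nothing can be skipped.
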
### Iceberg / Delta / Hudi (table format)
```python
# Apache Iceberg
spark.sql("""
    CREATE TABLE catalog.db.events (
        id bigint, ts timestamp, payload string
    ) USING iceberg
    PARTITIONED BY (days(ts))
""")

# Time travel
spark.read.option('snapshot-id', '12345').table('catalog.db.events')

# Schema evolution
spark.sql('ALTER TABLE catalog.db.events ADD COLUMN region string')
```

→ Adds ACID transactions, versioned snapshots, and schema evolution on top of Parquet files.

### Ray (modern alternative)
```python
import ray

@ray.remote
def process(chunk):
    return [x * 2 for x in chunk]

ray.init()
data = list(range(10_000))
chunks = [data[i:i+1000] for i in range(0, len(data), 1000)]

futures = [process.remote(c) for c in chunks]  # schedule one task per chunk
results = ray.get(futures)                     # block until all tasks finish
```

→ Friendlier to general-purpose Python than Spark. Strong for ML pipelines.

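For comparison, the same scatter/gather shape can be written with only the standard library. Note that this swaps Ray for `concurrent.futures` and runs in threads on one machine; it is a sketch of the pattern, not a replacement for Ray's cluster scheduling.

```python
from concurrent.futures import ThreadPoolExecutor


def process(chunk):
    """Same per-chunk work as the Ray task above."""
    return [x * 2 for x in chunk]


data = list(range(10_000))
chunks = [data[i:i + 1000] for i in range(0, len(data), 1000)]

# Scatter the chunks to a pool, then gather results in input order.
with ThreadPoolExecutor(max_workers=4) as pool:
    results = list(pool.map(process, chunks))

flat = [x for chunk in results for x in chunk]
print(len(results), flat[:3])  # 10 [0, 2, 4]
```

If the data fits on one machine and the work is this simple, the local pool is usually enough; Ray becomes useful once tasks need to spread over several machines.
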
### Polars (single-node, modern)
```python
import polars as pl

df = pl.scan_parquet('s3://bucket/*.parquet')  # lazy: nothing is read yet
result = (
    df
    .filter(pl.col('date') == '2026-05-09')
    .group_by('user_id')
    .agg(pl.col('amount').sum())
    .collect()  # lazy → eager
)
```

→ Often around 10x faster than Pandas on analytical queries, depending on the workload (Rust + Arrow).

### Dataflow patterns
```
- Batch: large data, processed in one go (nightly job)
- Streaming: real time (click events, IoT)
- Windowing: makes a stream batch-like (e.g. 1-minute windows)
- Watermark: the point up to which late events are still accepted
```

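Windowing and watermarks can be sketched in plain Python. This is a simplified model: the watermark is a fixed threshold passed in by the caller, whereas a real streaming engine advances it as event time progresses. The function names and events are made up for illustration.

```python
from collections import defaultdict

WINDOW_SECONDS = 60


def window_start(event_time):
    """Fixed (tumbling) windows: each event belongs to exactly one window."""
    return event_time - (event_time % WINDOW_SECONDS)


def count_per_window(events, watermark):
    """Count events per (window, key), dropping events older than the watermark.

    events: (event_time_seconds, key) pairs in arrival order.
    watermark: event times below this value are treated as too late.
    """
    counts = defaultdict(int)
    dropped = []
    for event_time, key in events:
        if event_time < watermark:
            dropped.append((event_time, key))
            continue
        counts[(window_start(event_time), key)] += 1
    return dict(counts), dropped


# The last event arrives late: its event time (5) is behind the watermark (60).
events = [(61, 'click'), (75, 'click'), (130, 'click'), (5, 'click')]
counts, dropped = count_per_window(events, watermark=60)

print(counts)   # {(60, 'click'): 2, (120, 'click'): 1}
print(dropped)  # [(5, 'click')]
```

Dropping is only one policy for late data. Engines such as Beam can also keep windows open for an allowed lateness and emit corrected results.
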
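### Beam streaming
```python
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as p:  # a streaming run also needs streaming pipeline options
    (p
     | ReadFromKafka(...)
     | beam.WindowInto(beam.window.FixedWindows(60))  # 1-minute fixed windows
     | beam.GroupByKey()
     | beam.io.WriteToBigQuery(...)
    )
```
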
### dbt (SQL-based ETL)
```sql
-- models/daily_revenue.sql
{{ config(materialized='incremental') }}

SELECT date, SUM(amount) AS revenue
FROM {{ ref('orders') }}
{% if is_incremental() %}
WHERE date > (SELECT MAX(date) FROM {{ this }})
{% endif %}
GROUP BY date
```

→ No Spark or Python needed. dbt turns the SQL models into a DAG.

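The incremental filter in the model above can be mimicked in plain Python to show what gets processed on each run. This is a sketch of the logic only, not of how dbt executes it; `incremental_revenue` and the sample orders are hypothetical.

```python
def incremental_revenue(orders, existing):
    """Aggregate only dates newer than the newest date already materialized.

    orders: iterable of (date, amount) tuples with ISO-formatted dates.
    existing: dict of date -> revenue from earlier runs (empty on the first run).
    """
    cutoff = max(existing) if existing else None
    updated = dict(existing)
    for date, amount in orders:
        if cutoff is None or date > cutoff:
            updated[date] = updated.get(date, 0) + amount
    return updated


orders = [('2026-05-08', 10), ('2026-05-09', 5), ('2026-05-09', 7)]
first_run = incremental_revenue(orders, {})
print(first_run)   # {'2026-05-08': 10, '2026-05-09': 12}

orders.append(('2026-05-10', 3))
second_run = incremental_revenue(orders, first_run)
print(second_run)  # {'2026-05-08': 10, '2026-05-09': 12, '2026-05-10': 3}
```

One consequence of the strict `date >` filter is visible here: an order that arrives late for an already-materialized date is silently skipped. Handling that case needs a wider lookback window or a merge strategy.
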
### ETL vs ELT
```
ETL (older): Extract → Transform → Load into the warehouse.
ELT (current): Extract → Load (raw) → Transform inside the warehouse.

ELT = the warehouse is strong at SQL, so transform there. dbt + Snowflake / BigQuery / DuckDB is the default stack.
```

### Job orchestration
```
- Airflow: most popular, heavyweight
- Dagster: modern, asset-aware
- Prefect: modern, simple
- Argo Workflows: K8s-native
- Temporal: workflow + business logic
- Cron: small jobs
```

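What all of these orchestrators share is running jobs in dependency order. A minimal sketch of that core idea using the standard library's `graphlib` (Python 3.9+); the job names are hypothetical, and real orchestrators add scheduling, retries, and monitoring on top.

```python
from graphlib import TopologicalSorter

# Hypothetical job graph: each job maps to the set of jobs it depends on.
dag = {
    'extract_orders': set(),
    'load_raw': {'extract_orders'},
    'daily_revenue': {'load_raw'},
    'report': {'daily_revenue'},
}

# static_order() yields each job only after all of its dependencies.
order = list(TopologicalSorter(dag).static_order())
print(order)  # ['extract_orders', 'load_raw', 'daily_revenue', 'report']
```

`TopologicalSorter` raises `graphlib.CycleError` when the graph contains a cycle, which mirrors how orchestrators reject circular dependencies.
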
### Cost
```
Spark on EMR: large cluster $$$, overkill below the TB range.
DuckDB on a single VM: fine up to the TB range $.
BigQuery: pay per GB scanned $$$.
Snowflake: pay per second of compute $$$.
```

### When NOT to distribute
```
< 100 GB: Pandas / Polars / DuckDB (single node).
100 GB - 10 TB: DuckDB / Spark on 1 node.
10 TB+: Spark / BigQuery / Snowflake cluster.

→ "Big data is dead": a single node is enough for most workloads.
```

## 🤔 Decision criteria
| Size | Recommendation |
|---|---|
| < 1 GB | Pandas / Polars |
| 1-100 GB | Polars / DuckDB |
| 100 GB - 10 TB | DuckDB on big VM |
| > 10 TB | Spark / BigQuery |
| Streaming | Beam / Flink / Materialize |
| ML pipeline | Ray |
| SQL preferable | dbt + warehouse |

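The size rows of the table can be encoded as a small lookup. This is a sketch of the thresholds as written, assuming 1 TB = 1000 GB; `recommend_engine` is a hypothetical helper, and the non-size rows (streaming, ML, SQL) are left out.

```python
def recommend_engine(size_gb):
    """Map a dataset size in GB to the recommendation from the table above."""
    if size_gb < 1:
        return 'Pandas / Polars'
    if size_gb <= 100:
        return 'Polars / DuckDB'
    if size_gb <= 10_000:  # 10 TB, taking 1 TB = 1000 GB
        return 'DuckDB on big VM'
    return 'Spark / BigQuery'


print(recommend_engine(0.5))     # Pandas / Polars
print(recommend_engine(50))      # Polars / DuckDB
print(recommend_engine(2_000))   # DuckDB on big VM
print(recommend_engine(50_000))  # Spark / BigQuery
```

Size is only the first filter. Concurrency, latency requirements, and existing infrastructure can still justify a different choice.
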
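## ❌ Anti-patterns
- **Spark for everything**: running Spark on a small dataset is slow and expensive.
- **CSV in production**: Parquet is often around 10x faster for analytical reads.
- **Too much repartitioning**: every repartition is a costly shuffle.
- **Ignoring skew**: one worker ends up doing all the work.
- **Broadcasting a big table**: OOM.
- **Local files**: use HDFS / S3 / GCS so every worker can read the data.
- **SQL scattered around without dbt**: dependencies between queries stay invisible.
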
## 🤖 LLM usage hints
- Be aware of the cost of each phase: Map / Shuffle / Reduce.
- DuckDB / Polars are the modern choice (a single node is often enough).
- Parquet + S3 is the standard.
- dbt is the answer for SQL workflows.

## 🔗 Related documents
- [[Data_Eng_Lakehouse]]
- [[Data_Eng_dbt]]
- [[DB_DuckDB_Embedded]]