[G1-Sync] Manual knowledge update
---
id: cs-mapreduce-patterns
title: "MapReduce / Distributed Compute: Spark / DuckDB / Beam"
category: Coding
status: draft
source_trust_level: B
verification_status: conceptual
created_at: 2026-05-09
updated_at: 2026-05-09
tags: [cs, mapreduce, distributed, vibe-coding]
tech_stack: { language: "Python / SQL", applicable_to: ["Backend", "Data"] }
applied_in: []
aliases: [MapReduce, Spark, Beam, dataflow, shuffle, partitioning, distributed compute]
---

# MapReduce / Distributed Compute

> Distribute a large data set across many workers: **Map (transform) → Shuffle (redistribute) → Reduce (aggregate)**. Modern engines: Spark / Beam / DuckDB / DataFusion. Plain SQL is often the simpler choice.

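## 📖 Core Concepts
- Map: input → key-value pairs.
- Shuffle: group records by key, moving them between workers.
- Reduce: aggregate the values of each key.
- Skew: when a hot key lands on a single worker, that worker becomes the bottleneck and the whole job slows down.
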
## 💻 Code Patterns

### Conceptual MapReduce
```
Input: ['the cat sat', 'the dog ran']

Map → [('the', 1), ('cat', 1), ('sat', 1), ('the', 1), ('dog', 1), ('ran', 1)]

Shuffle → {'the': [1,1], 'cat': [1], 'sat': [1], 'dog': [1], 'ran': [1]}

Reduce → {'the': 2, 'cat': 1, 'sat': 1, 'dog': 1, 'ran': 1}
```

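The three phases above can be sketched in plain Python on one machine. This is only an illustration of the data flow, not how any engine implements it; the function names `map_phase`, `shuffle_phase` and `reduce_phase` are made up for this note.

```python
from collections import defaultdict

def map_phase(lines):
    """Map: emit a (word, 1) pair for every word in every line."""
    return [(word, 1) for line in lines for word in line.split()]

def shuffle_phase(pairs):
    """Shuffle: collect all values that share the same key."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

def reduce_phase(groups):
    """Reduce: collapse the values of each key into one result."""
    return {key: sum(values) for key, values in groups.items()}

lines = ['the cat sat', 'the dog ran']
word_counts = reduce_phase(shuffle_phase(map_phase(lines)))
print(word_counts)
```

In a real cluster the shuffle step is the part that moves data over the network, which is why the later sections focus on reducing it.
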
### Spark (PySpark)
```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('wc').getOrCreate()

# RDD style (low-level)
rdd = spark.sparkContext.textFile('s3://bucket/logs/*.txt')
counts = (
    rdd
    .flatMap(lambda line: line.split())   # line → words
    .map(lambda w: (w, 1))                # word → (word, 1)
    .reduceByKey(lambda a, b: a + b)      # sum the counts per word
)
counts.saveAsTextFile('s3://bucket/output/rdd/')

# DataFrame style (high-level, recommended)
df = spark.read.text('s3://bucket/logs/*.txt')
counts = (
    df.selectExpr('explode(split(value, " ")) as word')
    .groupBy('word').count()
)
# Separate output path: both writers fail by default if the target already exists
counts.write.parquet('s3://bucket/output/df/')
```

### Spark SQL (simplest)
```python
# Reuses `spark` and `df` from the PySpark block
df.createOrReplaceTempView('logs')
spark.sql("""
    SELECT word, COUNT(*) AS cnt
    FROM logs LATERAL VIEW explode(split(value, ' ')) t AS word
    GROUP BY word
    ORDER BY cnt DESC
    LIMIT 100
""").show()
```

### DuckDB (single-node, large)
```python
import duckdb
con = duckdb.connect()

# Query Parquet files directly
# (s3:// paths need the httpfs extension and S3 credentials)
result = con.execute("""
    SELECT date, region, SUM(amount) AS total
    FROM 's3://bucket/sales/*.parquet'
    GROUP BY date, region
    ORDER BY total DESC
""").fetchdf()
```

→ A single node handles data up to roughly TB scale. Simpler than Spark, and often faster at that size.

### Apache Beam (portable, runners)
```python
import apache_beam as beam

with beam.Pipeline() as p:
    (p
     | 'Read' >> beam.io.ReadFromText('gs://bucket/*.txt')
     | 'Split' >> beam.FlatMap(lambda line: line.split())
     | 'Pair' >> beam.Map(lambda w: (w, 1))
     | 'Count' >> beam.CombinePerKey(sum)   # per-key sum
     | 'Write' >> beam.io.WriteToText('gs://bucket/out')
    )
```

→ Beam separates the pipeline code from the runner that executes it (Dataflow, Flink, Spark).

### Partitioning (parallelism)
```python
# Spark: both calls return a new DataFrame, so keep the result
df = df.repartition(200, 'date')   # full shuffle into 200 partitions, hashed by date
df = df.coalesce(10)               # shrink to 10 partitions (no shuffle)

# Few large partitions or many small ones?
# Aim: ~128 MB / partition (the default of spark.sql.files.maxPartitionBytes)
```

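As a rough way to turn the ~128 MB target into a partition count, the arithmetic can be sketched as below. `suggest_partitions` is a hypothetical helper for this note, not a Spark API, and it assumes the data size is known up front.

```python
import math

TARGET_PARTITION_BYTES = 128 * 1024**2  # ~128 MB, the target named above

def suggest_partitions(total_bytes, target_bytes=TARGET_PARTITION_BYTES):
    """Return a partition count that keeps each partition near the target size."""
    if total_bytes <= 0:
        return 1
    return max(1, math.ceil(total_bytes / target_bytes))

ten_gib = 10 * 1024**3
print(suggest_partitions(ten_gib))  # 80
```

The result could then be passed to `df.repartition(n)`; treat it as a starting point and check the actual partition sizes in the Spark UI.
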
### Shuffle (the most expensive operation)
```
GroupBy / Join / Distinct all trigger a shuffle.

Tips:
- Pre-aggregate before the shuffle (combiner)
- Broadcast join (copy the small table to every worker)
- Bucket-aligned tables (sort-merge join, no shuffle)
```

```python
# Spark broadcast join
from pyspark.sql.functions import broadcast

big.join(broadcast(small), 'key')   # `small` is replicated to every worker
```

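To see why pre-aggregation helps, here is a small plain-Python sketch that counts how many records would cross the network with and without a combiner. The partitions and the helper names `shuffle_volume` and `word_count` are invented for illustration.

```python
from collections import Counter

def shuffle_volume(partitions, use_combiner):
    """Count the (key, value) records the workers would send to the shuffle."""
    sent = 0
    for words in partitions:
        if use_combiner:
            # Combiner: one pre-aggregated record per distinct key in the partition
            sent += len(Counter(words))
        else:
            # No combiner: one record per input word
            sent += len(words)
    return sent

def word_count(partitions):
    """Merge per-partition partial counts; the result does not depend on the combiner."""
    total = Counter()
    for words in partitions:
        total.update(Counter(words))
    return dict(total)

partitions = [
    ['the', 'the', 'the', 'cat'],
    ['the', 'dog', 'dog', 'the'],
]
print(shuffle_volume(partitions, use_combiner=False))  # 8
print(shuffle_volume(partitions, use_combiner=True))   # 4
print(word_count(partitions))
```

This is the same idea that `reduceByKey` in Spark and `CombinePerKey` in Beam rely on: the aggregation must be associative so partial results can be merged later.
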
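### Skew (imbalance)
```
If key 'X' holds 90% of the rows, a single worker does almost all the work.

Fixes:
1. Salting: append a random suffix to the key so it spreads across workers
2. Skew join hint
3. Split the hot keys out and process them separately

# Spark 3: adaptive query execution (AQE) splits skewed join partitions
spark.conf.set('spark.sql.adaptive.enabled', 'true')
spark.conf.set('spark.sql.adaptive.skewJoin.enabled', 'true')

# Databricks runtimes also accept an explicit skew hint
df.hint('skew', 'user_id')
```

```python
# Salting example
from pyspark.sql.functions import rand

# Add a random salt (0-9) so one hot key spreads over 10 groups
df = df.withColumn('salt', (rand() * 10).cast('int'))
result = (
    df.groupBy('key', 'salt').agg(...)   # stage 1: partial aggregate per (key, salt)
    .groupBy('key').agg(...)             # stage 2: merge the partials per key
)
```
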
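The effect of salting can be checked without a cluster. The sketch below is a plain-Python stand-in, assuming a sum aggregation and using a round-robin salt instead of a random one so the output is deterministic; `max_group_size` and `salted_sum` are hypothetical names.

```python
from collections import defaultdict

def max_group_size(rows, n_salts):
    """Size of the largest (key, salt) group, i.e. the busiest worker's load."""
    sizes = defaultdict(int)
    for i, (key, _amount) in enumerate(rows):
        sizes[(key, i % n_salts)] += 1   # deterministic salt for the demo
    return max(sizes.values())

def salted_sum(rows, n_salts):
    """Two-stage aggregation: first per (key, salt), then per key."""
    partial = defaultdict(int)
    for i, (key, amount) in enumerate(rows):
        partial[(key, i % n_salts)] += amount      # stage 1
    total = defaultdict(int)
    for (key, _salt), subtotal in partial.items():
        total[key] += subtotal                     # stage 2
    return dict(total)

# 90 rows for the hot key 'X', 10 rows for 'Y'
rows = [('X', 1)] * 90 + [('Y', 1)] * 10
print(max_group_size(rows, n_salts=1))    # 90
print(max_group_size(rows, n_salts=10))   # 9
print(salted_sum(rows, n_salts=10))       # {'X': 90, 'Y': 10}
```

The totals are unchanged while the largest group shrinks from 90 rows to 9, which is the whole point of the second `groupBy`.
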
### File format
```
- Parquet: columnar, compressed, supports predicate pushdown (the default choice)
- ORC: similar to Parquet
- Avro: row-based with a schema (common with Kafka)
- CSV: plain text, inefficient for large data
- JSON: inefficient at scale

→ For analytics, almost always Parquet.
```

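The row-based versus columnar distinction in the list above can be illustrated with ordinary Python containers. This is a toy model with made-up records; real formats add encoding, compression and metadata on top.

```python
rows = [
    {'date': '2026-05-08', 'region': 'US', 'amount': 10},
    {'date': '2026-05-09', 'region': 'EU', 'amount': 20},
    {'date': '2026-05-09', 'region': 'US', 'amount': 30},
]

def to_columnar(records):
    """Pivot row-oriented records into one list per column."""
    columns = {}
    for record in records:
        for name, value in record.items():
            columns.setdefault(name, []).append(value)
    return columns

columns = to_columnar(rows)

# Row layout: each full record is visited to read one field
row_total = sum(record['amount'] for record in rows)

# Column layout: only the 'amount' list is read
col_total = sum(columns['amount'])
print(row_total, col_total)
```

An analytical query usually touches a few columns of many rows, so the columnar layout lets the engine skip the rest of the data entirely.
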
### Predicate pushdown
```sql
-- DuckDB syntax; Spark applies the same pushdown when it reads Parquet
SELECT * FROM 's3://b/*.parquet'
WHERE date = '2026-05-09'   -- skipped using file metadata
  AND region = 'US'         -- column scan
```

→ Data that cannot match the filter is never read, so the query is fast.

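The skipping step can be sketched as a min/max check on per-file statistics. The file names, statistics and rows below are invented, and the real Parquet mechanism works per row group rather than per file; this only shows the idea.

```python
# Each "file" carries min/max statistics for its date column.
files = [
    {'name': 'part-0', 'min_date': '2026-05-01', 'max_date': '2026-05-07',
     'rows': [('2026-05-03', 'US', 5)]},
    {'name': 'part-1', 'min_date': '2026-05-08', 'max_date': '2026-05-14',
     'rows': [('2026-05-09', 'US', 7), ('2026-05-09', 'EU', 9)]},
    {'name': 'part-2', 'min_date': '2026-05-15', 'max_date': '2026-05-21',
     'rows': [('2026-05-16', 'US', 4)]},
]

def scan(files, date, region):
    """Return the matching rows plus the names of the files actually read."""
    matched, files_read = [], []
    for f in files:
        # Metadata check: skip the file if the date cannot be inside it
        if not (f['min_date'] <= date <= f['max_date']):
            continue
        files_read.append(f['name'])
        matched += [r for r in f['rows'] if r[0] == date and r[1] == region]
    return matched, files_read

matched, files_read = scan(files, date='2026-05-09', region='US')
print(matched)      # [('2026-05-09', 'US', 7)]
print(files_read)   # ['part-1']
```

Only one of the three files is opened. The check works best when the data is sorted or partitioned by the filtered column, so the min/max ranges do not overlap.
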
### Iceberg / Delta / Hudi (table format)
```python
# Apache Iceberg
spark.sql("""
    CREATE TABLE catalog.db.events (
        id bigint, ts timestamp, payload string
    ) USING iceberg
    PARTITIONED BY (days(ts))
""")

# Time travel
spark.read.option('snapshot-id', '12345').table('catalog.db.events')

# Schema evolution
spark.sql('ALTER TABLE catalog.db.events ADD COLUMN region string')
```

→ ACID transactions, versioning, and schema evolution on top of Parquet files.

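The versioning idea behind time travel can be modelled as an append-only list of snapshots. The class below is a toy sketch with hypothetical names; actual table formats track manifests, schemas and partition specs as well, and this is not how Iceberg is implemented.

```python
class SnapshotTable:
    """Toy table format: every commit appends an immutable snapshot."""

    def __init__(self):
        self.snapshots = []   # each snapshot is a tuple of data-file names

    def commit(self, added_files):
        """Create a new snapshot = previous files + newly added files."""
        previous = self.snapshots[-1] if self.snapshots else ()
        self.snapshots.append(previous + tuple(added_files))
        return len(self.snapshots) - 1   # snapshot id

    def files(self, snapshot_id=None):
        """Files visible at a snapshot; the latest one by default."""
        if not self.snapshots:
            return []
        if snapshot_id is None:
            snapshot_id = len(self.snapshots) - 1
        return list(self.snapshots[snapshot_id])

table = SnapshotTable()
first = table.commit(['a.parquet'])
second = table.commit(['b.parquet'])
print(table.files())        # ['a.parquet', 'b.parquet']
print(table.files(first))   # ['a.parquet']
```

Because old snapshots are never modified, a reader pinned to `first` keeps seeing a consistent view while writers add new snapshots.
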
### Ray (modern alternative)
```python
import ray

@ray.remote
def process(chunk):
    return [x * 2 for x in chunk]

ray.init()
data = list(range(10_000))
chunks = [data[i:i+1000] for i in range(0, len(data), 1000)]

futures = [process.remote(c) for c in chunks]   # schedule the tasks in parallel
results = ray.get(futures)                      # block until every task finishes
```

→ Friendlier to plain Python than Spark. Strong for ML pipelines.

### Polars (single-node, modern)
```python
import polars as pl

df = pl.scan_parquet('s3://bucket/*.parquet')   # LazyFrame: nothing is read yet
result = (
    df
    # Assumes `date` is stored as a string; use pl.date(2026, 5, 9) for a Date column
    .filter(pl.col('date') == '2026-05-09')
    .group_by('user_id')
    .agg(pl.col('amount').sum())
    .collect()  # lazy → eager
)
```

→ Often around 10x faster than Pandas (Rust + Arrow), depending on the workload.

### Dataflow patterns
```
- Batch: large data processed in one run (nightly job)
- Streaming: real-time processing (click events, IoT)
- Windowing: slices a stream into batch-like chunks (e.g. 1-minute windows)
- Watermark: the point at which a window is treated as complete; decides how late events are handled
```

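Windowing and watermarks are easier to follow with numbers. The sketch below assigns events to 60-second fixed windows and drops events that arrive after the watermark has passed their window. It uses a deliberately simple watermark (the largest event time seen so far); real engines estimate it differently, and all names here are illustrative.

```python
from collections import defaultdict

WINDOW_SECONDS = 60

def window_start(event_time):
    """Start of the fixed window that contains event_time (in seconds)."""
    return event_time - (event_time % WINDOW_SECONDS)

def count_per_window(events, allowed_lateness=0):
    """Count events per window; drop events that arrive behind the watermark.

    events: (event_time, arrival_time) pairs, processed in arrival order.
    """
    counts = defaultdict(int)
    dropped = []
    watermark = 0
    for event_time, _arrival in sorted(events, key=lambda e: e[1]):
        window_end = window_start(event_time) + WINDOW_SECONDS
        if window_end + allowed_lateness <= watermark:
            dropped.append(event_time)   # its window is already closed
            continue
        counts[window_start(event_time)] += 1
        watermark = max(watermark, event_time)
    return dict(counts), dropped

# The event with time 50 arrives at time 140, long after its window closed
events = [(5, 6), (30, 31), (70, 71), (130, 131), (50, 140)]
counts, dropped = count_per_window(events)
print(counts)    # {0: 2, 60: 1, 120: 1}
print(dropped)   # [50]
```

Raising `allowed_lateness` keeps windows open longer, trading result latency for completeness.
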
### Beam streaming
```python
import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka

with beam.Pipeline() as p:   # a real job also needs streaming pipeline options
    (p
     | ReadFromKafka(...)
     | beam.WindowInto(beam.window.FixedWindows(60))  # 1 min
     | beam.GroupByKey()                              # group per key within each window
     | beam.io.WriteToBigQuery(...)
    )
```

### dbt (SQL-based ETL)
```sql
-- models/daily_revenue.sql
{{ config(materialized='incremental') }}

SELECT date, SUM(amount) AS revenue
FROM {{ ref('orders') }}
{% if is_incremental() %}
  -- On incremental runs, only process dates newer than what is already built
  WHERE date > (SELECT MAX(date) FROM {{ this }})
{% endif %}
GROUP BY date
```

→ No Spark or Python needed: dbt turns SQL models into a DAG.

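The incremental filter in the model above can be tried locally with the standard-library `sqlite3` module standing in for the warehouse. This is a sketch of the logic only, not of dbt itself; the `run_incremental` helper is hypothetical, and `COALESCE` imitates the first full run that dbt handles through `is_incremental()`.

```python
import sqlite3

con = sqlite3.connect(':memory:')
con.execute('CREATE TABLE orders (date TEXT, amount INTEGER)')
con.execute('CREATE TABLE daily_revenue (date TEXT, revenue INTEGER)')

def run_incremental(con):
    """Append only dates newer than the newest date already in the target."""
    con.execute("""
        INSERT INTO daily_revenue
        SELECT date, SUM(amount) AS revenue
        FROM orders
        WHERE date > (SELECT COALESCE(MAX(date), '') FROM daily_revenue)
        GROUP BY date
    """)

con.executemany('INSERT INTO orders VALUES (?, ?)',
                [('2026-05-08', 10), ('2026-05-08', 5)])
run_incremental(con)   # first run: the target is empty, so everything is built

con.executemany('INSERT INTO orders VALUES (?, ?)', [('2026-05-09', 7)])
run_incremental(con)   # second run: only 2026-05-09 is processed

result = con.execute('SELECT * FROM daily_revenue ORDER BY date').fetchall()
print(result)   # [('2026-05-08', 15), ('2026-05-09', 7)]
```

One caveat of the strict `>` filter: orders that arrive later for a date that was already built are never picked up, so late data needs a lookback window or a periodic full refresh.
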
### ETL vs ELT
```
ETL (older): Extract → Transform → Load into the warehouse.
ELT (current): Extract → Load (raw) → Transform inside the warehouse.

ELT: warehouses are strong at SQL, so the transform happens there. dbt + Snowflake / BigQuery / DuckDB is the default stack.
```

### Job orchestration
```
- Airflow: most popular, heavyweight
- Dagster: modern, asset-aware
- Prefect: modern, simple
- Argo Workflows: K8s-native
- Temporal: workflow + business logic
- Cron: small jobs
```

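What all of these tools share is running tasks in dependency order. A minimal sketch of that core idea, using the standard-library `graphlib` module (Python 3.9+) and hypothetical task names:

```python
from graphlib import TopologicalSorter

# task -> set of tasks it depends on (names are hypothetical)
dag = {
    'extract_orders': set(),
    'extract_users': set(),
    'load_raw': {'extract_orders', 'extract_users'},
    'daily_revenue': {'load_raw'},
    'report': {'daily_revenue'},
}

# static_order() yields every task after all of its dependencies
order = list(TopologicalSorter(dag).static_order())
print(order)
```

Real orchestrators add scheduling, retries, backfills and monitoring on top of this ordering, which is where the tools in the list differ.
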
### Cost
```
Spark on EMR: you pay for a whole cluster; overkill below TB scale.
DuckDB on a single VM: you pay for one machine; fine up to about TB scale.
BigQuery: pay per byte scanned (on-demand pricing).
Snowflake: pay per second of compute.
```

### When NOT to distribute
```
< 100 GB: Pandas / Polars / DuckDB (single node).
100 GB - 10 TB: DuckDB / Spark on 1 node.
10 TB+: Spark / BigQuery / Snowflake cluster.

→ "Big data is dead": a single node is enough for most workloads.
```

## 🤔 Decision Criteria
| Size | Recommendation |
|---|---|
| < 1 GB | Pandas / Polars |
| 1-100 GB | Polars / DuckDB |
| 100 GB - 10 TB | DuckDB on big VM |
| > 10 TB | Spark / BigQuery |
| Streaming | Beam / Flink / Materialize |
| ML pipeline | Ray |
| SQL preferable | dbt + warehouse |

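The table can also be written as a small lookup function, which is handy as a checklist in code reviews. The function name, its parameters, and the choice of 1 TB = 1000 GB are assumptions of this sketch; the thresholds are the rule-of-thumb values from the table, not hard limits.

```python
def recommend_engine(size_gb, streaming=False, ml_pipeline=False):
    """Map a workload to the suggestion in the decision table."""
    if streaming:
        return 'Beam / Flink / Materialize'
    if ml_pipeline:
        return 'Ray'
    if size_gb < 1:
        return 'Pandas / Polars'
    if size_gb < 100:
        return 'Polars / DuckDB'
    if size_gb <= 10_000:   # 10 TB, using 1 TB = 1000 GB
        return 'DuckDB on big VM'
    return 'Spark / BigQuery'

print(recommend_engine(0.5))
print(recommend_engine(500))
print(recommend_engine(50_000))
```
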
## ❌ Anti-patterns
- **Spark for everything**: on small datasets Spark is both slow and expensive.
- **CSV in production**: Parquet is often around 10x faster.
- **Too much repartitioning**: every repartition is an expensive shuffle.
- **Ignoring skew**: one worker ends up doing all the work.
- **Broadcasting a large table**: OOM.
- **Local files**: use HDFS / S3 / GCS so every worker can read the data.
- **SQL scattered around without dbt**: dependencies become invisible.

## 🤖 LLM Usage Hints
- Be aware of the cost of each stage: Map / Shuffle / Reduce.
- DuckDB / Polars are the modern choice (a single node is often enough).
- Parquet + S3 is the standard.
- dbt is the answer for SQL workflows.

## 🔗 Related Documents
- [[Data_Eng_Lakehouse]]
- [[Data_Eng_dbt]]
- [[DB_DuckDB_Embedded]]