2026-05-09 21:08:02 +09:00


| Field | Value |
|---|---|
| id | data-eng-streaming-etl |
| title | Streaming ETL — Flink / Spark Structured / Materialize |
| category | Coding |
| status | draft |
| source_trust_level | B |
| verification_status | conceptual |
| created_at | 2026-05-09 |
| updated_at | 2026-05-09 |
| tags | data-engineering, streaming, flink, kafka, vibe-coding |
| tech_stack | language: Scala / Java / Python / SQL; applicable_to: Data Engineering |
| applied_in | |
| aliases | Apache Flink, Spark Structured Streaming, Materialize, RisingWave, exactly-once, watermark |

Streaming ETL

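Streaming ETL is the stream version of batch ETL, applied to real-time data. Flink is the most powerful option. Spark Structured Streaming is the batch-friendly one. Materialize / RisingWave expose Postgres-like SQL. The building blocks are windows, watermarks, and state.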

📖 Core concepts

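  • Event time vs. processing time: event time is when the event happened, a timestamp carried in the record. Processing time is when the operator handles it.
  • Watermark: the boundary for handling late events. A watermark at time t declares that no more events with timestamp ≤ t are expected, so event-time windows that end at or before t can be closed.
  • Window: tumbling / hopping / session.
  • Exactly-once: requires stateful processing plus checkpoints. End to end, it also needs a replayable source and a transactional or idempotent sink.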
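The watermark idea can be made concrete with a minimal plain-Python sketch of bounded out-of-orderness, the strategy used in the Flink examples below. This is not a Flink or PyFlink API. The watermark trails the largest event time seen so far by a fixed delay, and an event at or below the current watermark counts as late. The class name and millisecond timestamps are illustrative assumptions. Flink's own implementation also subtracts 1 ms, which this sketch omits.

```python
from typing import Optional


class BoundedOutOfOrdernessWatermark:
    """Illustrative watermark tracker: watermark = max event time seen - max_delay_ms."""

    def __init__(self, max_delay_ms: int) -> None:
        self.max_delay_ms = max_delay_ms
        self.max_ts_ms: Optional[int] = None

    def on_event(self, ts_ms: int) -> None:
        # Only a newer event time advances the watermark; out-of-order events never move it back.
        if self.max_ts_ms is None or ts_ms > self.max_ts_ms:
            self.max_ts_ms = ts_ms

    def current(self) -> Optional[int]:
        # There is no watermark until the first event has been seen.
        if self.max_ts_ms is None:
            return None
        return self.max_ts_ms - self.max_delay_ms

    def is_late(self, ts_ms: int) -> bool:
        # Late = event time at or below the watermark ("no more events <= watermark expected").
        wm = self.current()
        return wm is not None and ts_ms <= wm
```

As an example, feed events at 10 000, 12 000 and then 11 000 ms with a 5 s delay. The out-of-order event at 11 000 ms is still on time, because the watermark has only reached 7 000 ms.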

💻 Code patterns

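Flink DataStream API (Scala):

```scala
import java.time.Duration
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.connector.kafka.source.KafkaSource
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer
import org.apache.flink.streaming.api.scala._  // Flink 1.x Scala API (deprecated since 1.17)
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

// Order, OrderSchema (a DeserializationSchema[Order]) and SumAggregator are defined elsewhere.
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(60_000)  // every 60 s

val source = KafkaSource.builder[Order]()  // explicit type parameter needed from Scala
  .setBootstrapServers("kafka:9092")
  .setTopics("orders")
  .setStartingOffsets(OffsetsInitializer.earliest())
  .setValueOnlyDeserializer(new OrderSchema())
  .build()

// No timestamp assigner: the Kafka record timestamp is used as event time.
val orders = env.fromSource(
  source,
  WatermarkStrategy.forBoundedOutOfOrderness[Order](Duration.ofSeconds(5)),
  "orders")

val hourly = orders
  .keyBy(_.userId)
  .window(TumblingEventTimeWindows.of(Time.hours(1)))
  .aggregate(new SumAggregator)

hourly.sinkTo(new IcebergSink(...))
env.execute("hourly aggregation")
```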
The same pipeline in Flink SQL:

```sql
CREATE TABLE orders (
    id STRING,
    user_id STRING,
    amount DECIMAL(10, 2),
    event_time TIMESTAMP(3),
    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
) WITH (
    'connector' = 'kafka',
    'topic' = 'orders',
    'properties.bootstrap.servers' = 'kafka:9092',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

CREATE TABLE hourly_revenue (
    user_id STRING,
    window_start TIMESTAMP(3),
    revenue DECIMAL(20, 2),
    PRIMARY KEY (user_id, window_start) NOT ENFORCED
) WITH (
    'connector' = 'iceberg',
    ...
);

-- Legacy group-window syntax; newer Flink releases recommend window TVFs such as TABLE(TUMBLE(...)).
INSERT INTO hourly_revenue
SELECT user_id,
       TUMBLE_START(event_time, INTERVAL '1' HOUR) AS window_start,
       SUM(amount) AS revenue
FROM orders
GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' HOUR);
```
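Both versions compute per-user hourly sums. Each sum is emitted once the watermark passes the end of its hour. Below is a plain-Python simulation of that behavior, not Flink code. The function name, integer amounts in cents, and updating the watermark after every event are simplifying assumptions. Flink emits watermarks periodically.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

HOUR_MS = 3_600_000

Event = Tuple[str, int, int]   # (user_id, amount_cents, event_time_ms)
Result = Tuple[str, int, int]  # (user_id, window_start_ms, revenue_cents)


def hourly_revenue(events: Iterable[Event], max_delay_ms: int = 5_000) -> Tuple[List[Result], List[Event]]:
    """Tumbling 1-hour event-time windows keyed by user, fired by a bounded-out-of-orderness watermark.

    Returns (results in firing order, events dropped because their window had already fired).
    """
    open_windows: Dict[Tuple[str, int], int] = defaultdict(int)
    max_ts = None
    emitted: List[Result] = []
    dropped: List[Event] = []
    for user_id, amount, ts in events:
        start = ts - ts % HOUR_MS
        window_max_ts = start + HOUR_MS - 1  # last millisecond inside [start, start + 1h)
        if max_ts is not None and window_max_ts <= max_ts - max_delay_ms:
            dropped.append((user_id, amount, ts))  # window already closed by the watermark
            continue
        open_windows[(user_id, start)] += amount
        max_ts = ts if max_ts is None else max(max_ts, ts)
        watermark = max_ts - max_delay_ms
        # Fire, then forget, every window whose end the watermark has passed.
        for key in sorted(k for k in open_windows if k[1] + HOUR_MS - 1 <= watermark):
            emitted.append((key[0], key[1], open_windows.pop(key)))
    # End of bounded input: flush the remaining windows, as a final watermark would.
    for key in sorted(open_windows):
        emitted.append((key[0], key[1], open_windows.pop(key)))
    return emitted, dropped
```

In this model, an event that arrives after its hour has already fired is dropped. The "Late event handling" section covers how to keep such events instead.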

Window types

```sql
-- Tumbling (non-overlapping)
TUMBLE(event_time, INTERVAL '1' HOUR)

-- Hopping (overlapping): HOP(time_attr, slide, size)
HOP(event_time, INTERVAL '5' MINUTES, INTERVAL '1' HOUR)
-- a new 1-hour window starts every 5 minutes

-- Session (gap-based)
SESSION(event_time, INTERVAL '30' MINUTES)
-- the window closes after 30 minutes without events for the key
```
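To make the three shapes concrete, here is a plain-Python sketch of window assignment. The function names are hypothetical. Windows are half-open [start, end) intervals in milliseconds with zero offset. In the session sketch, which covers a single key, each event opens [ts, ts + gap) and overlapping windows merge. Engines may treat the boundary case differently, where the idle time equals the gap exactly.

```python
from typing import List, Tuple

Window = Tuple[int, int]  # half-open [start, end) in ms


def tumbling(ts: int, size: int) -> List[Window]:
    # Exactly one window: the fixed-size bucket containing ts.
    start = ts - ts % size
    return [(start, start + size)]


def hopping(ts: int, slide: int, size: int) -> List[Window]:
    # Every window of length `size`, starting at a multiple of `slide`, that contains ts.
    windows = []
    start = ts - ts % slide        # latest window start <= ts
    while start > ts - size:       # equivalently: start + size > ts
        windows.append((start, start + size))
        start -= slide
    return sorted(windows)


def sessions(timestamps: List[int], gap: int) -> List[Window]:
    # One key: each event opens [ts, ts + gap); overlapping windows are merged.
    merged: List[Window] = []
    for ts in sorted(timestamps):
        if merged and ts < merged[-1][1]:
            merged[-1] = (merged[-1][0], max(merged[-1][1], ts + gap))
        else:
            merged.append((ts, ts + gap))
    return merged
```

With a 5-minute slide and a 1-hour size, every event lands in 60 / 5 = 12 windows. For that reason, hopping windows multiply both state and output volume compared with tumbling ones.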

State + side output

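```scala
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.KeyedProcessFunction
import org.apache.flink.util.{Collector, OutputTag}

// Passes orders through; suspicious ones are also emitted to the "alerts" side output.
class FraudDetector extends KeyedProcessFunction[String, Order, Order] {
  lazy val count = getRuntimeContext.getState(  // per-key counter
    new ValueStateDescriptor[java.lang.Integer]("count", classOf[java.lang.Integer]))
  override def processElement(o: Order,
      ctx: KeyedProcessFunction[String, Order, Order]#Context,
      out: Collector[Order]): Unit = {
    val cur = Option(count.value()).map(_.intValue).getOrElse(0)  // null for a new key
    if (cur > 10 && o.amount > 1000)
      ctx.output(FraudDetector.Alerts, Alert(o.userId, "suspicious"))
    count.update(cur + 1)
    out.collect(o)
  }
}
object FraudDetector {
  val Alerts = new OutputTag[Alert]("alerts", TypeInformation.of(classOf[Alert]))
}
```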
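The same per-key logic can be simulated in plain Python to show what keyed state means. The counter lives per user, and a missing entry plays the role of empty state. Alerts are collected apart from the main output, like a side output. Class and field names are hypothetical. The thresholds (more than 10 prior orders, amount over 1000) match the Scala example.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class Order:
    user_id: str
    amount: int


@dataclass
class FraudDetectorSim:
    """Keyed-state simulation: one counter per user_id, alerts routed to a separate list."""
    max_count: int = 10
    max_amount: int = 1000
    counts: Dict[str, int] = field(default_factory=dict)          # plays the role of keyed state
    main_output: List[Order] = field(default_factory=list)
    alerts: List[Tuple[str, str]] = field(default_factory=list)  # plays the role of a side output

    def process(self, o: Order) -> None:
        cur = self.counts.get(o.user_id, 0)  # no entry yet = empty state for this key
        if cur > self.max_count and o.amount > self.max_amount:
            self.alerts.append((o.user_id, "suspicious"))
        self.counts[o.user_id] = cur + 1
        self.main_output.append(o)
```

Because the count is scoped to the key, a first large order from a different user does not trigger an alert. That holds even after many orders from `u1`.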

Spark Structured Streaming

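```scala
import org.apache.spark.sql.functions.{from_json, sum, window}
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types._
import spark.implicits._  // enables $"column" syntax

// JSON payload of each Kafka message
val schema = new StructType()
  .add("user_id", StringType)
  .add("amount", DecimalType(10, 2))
  .add("event_time", TimestampType)

val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "kafka:9092")
  .option("subscribe", "orders")
  .load()

val parsed = df.selectExpr("CAST(value AS STRING) AS value")
  .select(from_json($"value", schema).as("d"))
  .select("d.*")
  .withWatermark("event_time", "5 seconds")

val agg = parsed
  .groupBy(window($"event_time", "1 hour"), $"user_id")
  .agg(sum("amount").as("revenue"))

// Append mode: each window is written once, after the watermark passes its end.
agg.writeStream
  .outputMode("append")
  .format("iceberg")
  .option("checkpointLocation", "s3://checkpoints/hourly/")  // one location per query
  .trigger(Trigger.ProcessingTime("1 minute"))
  .toTable("warehouse.db.hourly")
  .awaitTermination()
```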

Materialize (Postgres-style streaming SQL)

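```sql
-- Plaintext Kafka listener assumed.
CREATE CONNECTION kafka_conn TO KAFKA (BROKER 'kafka:9092', SECURITY PROTOCOL = 'PLAINTEXT');

-- FORMAT JSON puts each message into a single jsonb column named "data".
CREATE SOURCE orders_raw
  FROM KAFKA CONNECTION kafka_conn (TOPIC 'orders')
  FORMAT JSON;

CREATE VIEW orders AS
SELECT data->>'user_id'                 AS user_id,
       (data->>'amount')::numeric       AS amount,
       (data->>'event_time')::timestamp AS event_time
FROM orders_raw;

CREATE MATERIALIZED VIEW hourly AS
SELECT user_id, date_trunc('hour', event_time) AS hour,
       SUM(amount) AS revenue
FROM orders
GROUP BY user_id, date_trunc('hour', event_time);

-- Plain SELECT; the result is always up to date.
SELECT * FROM hourly WHERE user_id = 'u1';
```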

→ Use it like Postgres. Reads are served from the incrementally maintained view, so query latency is very low.
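Reads stay cheap because the view is maintained incrementally instead of being recomputed per query. The plain-Python sketch below shows an incrementally maintained `SUM ... GROUP BY`, fed with (row, diff) updates where +1 inserts a row and -1 retracts one. It illustrates the idea only and is not Materialize's implementation. The class and method names are hypothetical.

```python
from collections import defaultdict
from typing import Dict, Hashable, Optional


class IncrementalSumView:
    """Illustrative SELECT key, SUM(amount) ... GROUP BY key, maintained from (row, diff) updates."""

    def __init__(self) -> None:
        self._sums: Dict[Hashable, int] = defaultdict(int)
        self._rows: Dict[Hashable, int] = defaultdict(int)  # row count per group

    def apply(self, key: Hashable, amount: int, diff: int) -> None:
        # diff = +1 inserts a row, -1 retracts it; an update is a retraction plus an insert.
        self._sums[key] += amount * diff
        self._rows[key] += diff
        if self._rows[key] == 0:  # group has no rows left, so it disappears from the view
            del self._sums[key]
            del self._rows[key]

    def lookup(self, key: Hashable) -> Optional[int]:
        # Constant-time read of the current result; nothing is recomputed at query time.
        return self._sums.get(key)
```

An update of an order amount from 50 to 70 becomes `apply(k, 50, -1)` followed by `apply(k, 70, +1)`. The work is proportional to the change, not to the size of the input.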

RisingWave (modern, postgres-compat)

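```sql
CREATE SOURCE orders WITH (connector='kafka', topic='orders', ...);
CREATE MATERIALIZED VIEW hourly AS SELECT ...;
```

```typescript
import pg from 'pg';  // node-postgres: any regular Postgres client works
// RisingWave defaults: port 4566, user "root", database "dev"
const client = new pg.Client({ host: 'risingwave', port: 4566, user: 'root', database: 'dev' });
await client.connect();
const { rows } = await client.query('SELECT * FROM hourly');
await client.end();
```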

Late event handling

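```sql
-- Flink SQL: lateness is bounded by the WATERMARK clause in the table DDL, e.g.
--   WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
-- An event whose window has already been closed by the watermark is dropped.
GROUP BY user_id, TUMBLE(event_time, INTERVAL '1' HOUR)
```

```scala
// DataStream: keep window state 10 more minutes after the window fires
val lateTag = OutputTag[Order]("late-orders")
val hourly = orders
  .keyBy(_.userId)
  .window(TumblingEventTimeWindows.of(Time.hours(1)))
  .allowedLateness(Time.minutes(10))  // late events within 10 min update and re-fire the window
  .sideOutputLateData(lateTag)        // anything later is routed here instead of dropped
  .aggregate(new SumAggregator)
val tooLate = hourly.getSideOutput(lateTag)
```

→ Events later than the allowed lateness go to a separate sink through the side output.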
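The following plain-Python simulation shows allowed lateness on tumbling windows. It uses hypothetical names and integer timestamps in arbitrary units, and updates the watermark per event. A window fires when the watermark reaches its last timestamp. A late event within the allowed lateness updates the window and fires it again. An event beyond that goes to a side-output list, and window state is discarded once the lateness horizon passes. This is modeled on how Flink's default event-time trigger treats late data, in simplified form.

```python
from typing import Dict, Iterable, List, Optional, Set, Tuple


def windows_with_lateness(events: Iterable[Tuple[str, int, int]], size: int,
                          max_delay: int, allowed_lateness: int):
    """events: (key, value, ts) in arrival order. Returns (firings, late_side_output).

    firings: (key, window_start, running_sum), one entry each time a window fires.
    """
    sums: Dict[Tuple[str, int], int] = {}
    fired: Set[Tuple[str, int]] = set()
    firings: List[Tuple[str, int, int]] = []
    late: List[Tuple[str, int, int]] = []
    max_ts: Optional[int] = None
    for key, value, ts in events:
        w = (key, ts - ts % size)
        w_max_ts = w[1] + size - 1
        if max_ts is not None and w_max_ts + allowed_lateness <= max_ts - max_delay:
            late.append((key, value, ts))        # beyond allowed lateness: side output
            continue
        sums[w] = sums.get(w, 0) + value
        if w in fired:                           # late but allowed: fire again with the update
            firings.append((w[0], w[1], sums[w]))
        max_ts = ts if max_ts is None else max(max_ts, ts)
        wm = max_ts - max_delay
        for w2 in sorted(sums):                  # first firing once the watermark passes the window
            if w2 not in fired and w2[1] + size - 1 <= wm:
                fired.add(w2)
                firings.append((w2[0], w2[1], sums[w2]))
        for w2 in [x for x in sums if x[1] + size - 1 + allowed_lateness <= wm]:
            del sums[w2]                         # lateness horizon passed: drop window state
            fired.discard(w2)
    return firings, late
```

Downstream consumers therefore see more than one result for the same window. The sink should upsert by (key, window_start) rather than append.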

Checkpoint (exactly-once)

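```scala
import org.apache.flink.streaming.api.CheckpointingMode

env.enableCheckpointing(60_000)  // every 60 s
env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)  // already the default
env.getCheckpointConfig.setCheckpointStorage("s3://checkpoints/")
```

→ On failure, the job resumes from the last completed checkpoint, with operator state and source offsets restored together. Exactly-once output to external systems additionally needs a transactional or idempotent sink.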
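The toy example below shows why a checkpoint yields exactly-once state. The snapshot stores the source position and the operator state together. After a simulated failure, both rewind to the same point, so replayed records are counted once. It is plain Python with hypothetical names and an in-memory list standing in for a Kafka partition.

```python
import copy
from typing import Dict, List, Tuple


class ToyJob:
    """Consumes (key, amount) records from an in-memory log by offset and keeps per-key sums."""

    def __init__(self, log: List[Tuple[str, int]]) -> None:
        self.log = log
        self.offset = 0
        self.state: Dict[str, int] = {}
        self._snapshot: Tuple[int, Dict[str, int]] = (0, {})

    def process(self, n: int) -> None:
        # Read up to n records from the current offset and update state.
        for key, amount in self.log[self.offset:self.offset + n]:
            self.state[key] = self.state.get(key, 0) + amount
        self.offset = min(self.offset + n, len(self.log))

    def checkpoint(self) -> None:
        # Offset and state are captured together as one consistent snapshot.
        self._snapshot = (self.offset, copy.deepcopy(self.state))

    def recover(self) -> None:
        # After a failure, rewind both the source position and the state.
        self.offset, saved = self._snapshot
        self.state = copy.deepcopy(saved)
```

Rewinding only the offset while keeping the in-memory state would double-count every record processed after the checkpoint. That is the at-least-once failure mode.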

Backpressure / scaling

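Flink:
- Parallelism per operator
- Auto-scaling (Reactive Mode, or the autoscaler in the Flink Kubernetes Operator)

Spark:
- Dynamic allocation (adding and removing executors)
- Rate limiting at the source (e.g. maxOffsetsPerTrigger for Kafka)
- Adaptive query execution is disabled for streaming queries, so it does not help here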

Topology

Kafka source ──▶ Filter ──▶ Window aggregate ──▶ Sink (Iceberg / Postgres / Kafka)
                                                  │
                                                  └─▶ Alert (side output)

Operational concerns

- State backend: RocksDB (on disk, state can exceed memory) vs. heap (in memory, faster but bounded by the JVM heap)
- Checkpoint interval (1 min is typical)
- Savepoints (manually triggered snapshots for upgrades)
- Backfilling historical data
- Schema evolution (Avro / Protobuf)

Test

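```scala
// Flink: run the pipeline on a local environment with a bounded, fixed input.
// For a single operator, test harnesses such as
// KeyedOneInputStreamOperatorTestHarness give finer control.
val testEnv = StreamExecutionEnvironment.getExecutionEnvironment
testEnv.setParallelism(1)
val results = testEnv.fromCollection(testData)
  .assignTimestampsAndWatermarks(...)
  .keyBy(...).window(...).aggregate(...)
  .executeAndCollect()  // bounded input: the final watermark fires all windows
```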
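Materialize / RisingWave are plain SQL, so they can be tested like any Postgres database with a regular Postgres client.

Flink SQL can also ingest CDC events (Debezium JSON from Kafka) and write them to Iceberg:

```sql
-- Debezium change events for the Postgres table public.orders, read as a changelog
CREATE TABLE orders_cdc (
    id STRING,
    user_id STRING,
    amount DECIMAL(10, 2),
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'kafka',
    'topic' = 'pg.public.orders',
    'value.format' = 'debezium-json',
    ...
);

INSERT INTO iceberg.warehouse.orders SELECT * FROM orders_cdc;
```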
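Conceptually, the CDC table is an upsert/delete stream keyed by `id`. Below is a plain-Python sketch of applying Debezium-style change events to an in-memory table. It assumes the JSON envelope without the schema wrapper, with fields `before`, `after` and `op`, where `op` is one of c/r/u/d. A JSON `null` message is treated as a delete tombstone. The function name is hypothetical.

```python
import json
from typing import Any, Dict


def apply_change(table: Dict[Any, dict], message: str, key: str = "id") -> None:
    """Apply one Debezium change event to `table` (primary key -> row)."""
    event = json.loads(message)
    if event is None:            # tombstone (null value) that follows a delete: nothing to apply
        return
    op = event["op"]
    if op in ("c", "r", "u"):    # create, snapshot read, update: upsert the new row image
        row = event["after"]
        table[row[key]] = row
    elif op == "d":              # delete: remove by the key of the old row image
        table.pop(event["before"][key], None)
    else:
        raise ValueError(f"unsupported op: {op!r}")
```

Applying the same event twice leaves the table unchanged. This idempotence is what makes replay after a failure safe for an upsert sink.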

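🤔 Decision criteria

| Situation | Recommendation |
|---|---|
| Demanding streaming with large state | Flink |
| Already using Spark, want one integrated stack | Spark Structured Streaming |
| Postgres-like SQL | Materialize / RisingWave |
| Simple pipe (Kafka → DB) | Kafka Connect |
| Small / quick jobs | DuckDB + cron |
| Real-time analytics | Materialize, or ClickHouse + Kafka |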

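Anti-patterns

  • No watermark: event-time windows never close, because the job cannot tell when late events have stopped arriving.
  • Unbounded state growth: set a state TTL.
  • Checkpoint interval too short (e.g. 1 s): high overhead. 30 s to 1 min is typical.
  • Ignoring backpressure: consumer lag grows without bound.
  • Breaking schema changes: enforce a compatibility policy.
  • No tests: streaming logic is complex, so bugs slip through.
  • Ignoring out-of-order data: late events are silently lost.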
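For the unbounded-state item, here is a plain-Python sketch of value state with a time-to-live. Entries expire a fixed time after their last write, and expired values are never returned. Cleanup happens lazily on read and in an explicit sweep. It follows the spirit of Flink's `StateTtlConfig` but is not that API. The class and method names, and the caller-supplied millisecond clock, are assumptions.

```python
from typing import Dict, Hashable, Optional, Tuple


class TtlValueState:
    """Illustrative keyed value state whose entries expire ttl_ms after their last write."""

    def __init__(self, ttl_ms: int) -> None:
        self.ttl_ms = ttl_ms
        self._data: Dict[Hashable, Tuple[int, int]] = {}  # key -> (value, last_write_ms)

    def _expired(self, written_ms: int, now_ms: int) -> bool:
        return now_ms - written_ms >= self.ttl_ms

    def get(self, key: Hashable, now_ms: int) -> Optional[int]:
        entry = self._data.get(key)
        if entry is None:
            return None
        value, written_ms = entry
        if self._expired(written_ms, now_ms):
            del self._data[key]  # lazy cleanup: an expired entry is removed when read
            return None
        return value

    def put(self, key: Hashable, value: int, now_ms: int) -> None:
        self._data[key] = (value, now_ms)  # each write resets the TTL

    def sweep(self, now_ms: int) -> int:
        """Full cleanup pass; returns how many entries were removed."""
        expired = [k for k, (_, w) in self._data.items() if self._expired(w, now_ms)]
        for k in expired:
            del self._data[k]
        return len(expired)
```

Lazy cleanup alone never frees keys that are not read again, such as users who stop sending events. A periodic sweep, or a backend-level cleanup strategy, is what actually bounds the state size.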

🤖 Tips for using LLMs

  • Flink SQL is the simplest place to start.
  • Ask explicitly for the watermark, the window, and the state TTL.
  • Materialize / RisingWave are friendly to Postgres users.

🔗 Related documents