2nd/10_Wiki/Topics/Coding/DB_Materialize_Streaming_SQL.md
2026-05-09 21:08:02 +09:00

id: db-materialize-streaming-sql
title: Materialize / RisingWave: Streaming Materialized View
category: Coding
status: draft
source_trust_level: B
verification_status: conceptual
created_at: 2026-05-09
updated_at: 2026-05-09
tags: database, materialize, streaming, vibe-coding
tech_stack: language: SQL; applicable_to: Backend
applied_in:
aliases: Materialize, RisingWave, streaming materialized view, incremental view, dataflow

Materialize / RisingWave

A Postgres-compatible streaming database: define a view with CREATE MATERIALIZED VIEW and it is updated incrementally in real time. Kafka / CDC → SQL view → always fresh. An alternative to a Redis cache plus hand-written aggregation.

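📖 Core concepts

  • Streaming MV: when new data arrives, the view is updated incrementally and automatically.
  • Postgres wire protocol: connect with psql or any ordinary Postgres client.
  • Source: Kafka / Postgres CDC / S3.
  • Sink: Kafka / Postgres (supported targets differ between the two engines).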
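
To make "incremental update" concrete, here is a minimal TypeScript sketch of the idea behind a streaming MV. It is a conceptual model only, not how Materialize or RisingWave are implemented; the `HourlyRevenueView` class, its methods, and the `Order` shape are hypothetical names chosen for illustration. The point is that each inserted or retracted row touches a single group instead of recomputing the whole aggregate.

```typescript
// Conceptual sketch only: NOT the engine's real implementation.
// Each change adjusts exactly one group, which is why updates stay cheap.

type Order = { userId: string; amount: number; createdAt: Date };
type Agg = { revenue: number; orderCount: number };

class HourlyRevenueView {
  private groups = new Map<string, Agg>();

  // Mirrors GROUP BY user_id, date_trunc('hour', created_at), using UTC.
  private key(o: Order): string {
    const hour = new Date(o.createdAt);
    hour.setUTCMinutes(0, 0, 0);
    return `${o.userId}|${hour.toISOString()}`;
  }

  // diff = +1 for an inserted row, -1 for a retracted (deleted) row.
  apply(o: Order, diff: 1 | -1): void {
    const k = this.key(o);
    const agg = this.groups.get(k) ?? { revenue: 0, orderCount: 0 };
    agg.revenue += diff * o.amount;
    agg.orderCount += diff;
    if (agg.orderCount === 0) this.groups.delete(k); // empty group disappears
    else this.groups.set(k, agg);
  }

  // hourIso is the ISO string of the truncated hour, e.g. '2026-05-09T10:00:00.000Z'.
  get(userId: string, hourIso: string): Agg | undefined {
    return this.groups.get(`${userId}|${hourIso}`);
  }
}
```

A real engine also has to handle ordering, persistence, and joins, which this sketch deliberately leaves out.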

💻 Code patterns

Installing Materialize

docker run -d -p 6875:6875 -p 6876:6876 materialize/materialized

# Connect (psql-compatible)
psql -U materialize -h localhost -p 6875 materialize

Source (Kafka)

CREATE CONNECTION kafka_conn TO KAFKA (BROKER 'kafka:9092');
CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (URL 'http://schema-registry:8081');

CREATE SOURCE orders
FROM KAFKA CONNECTION kafka_conn (TOPIC 'orders')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE NONE;

Materialized view

CREATE MATERIALIZED VIEW hourly_revenue AS
SELECT
    user_id,
    date_trunc('hour', created_at) AS hour,
    SUM(amount) AS revenue,
    COUNT(*) AS order_count
FROM orders
GROUP BY user_id, hour;

-- Plain SELECT: the result is always fresh
SELECT * FROM hourly_revenue WHERE user_id = 'u1' ORDER BY hour DESC;

→ When the stream changes, the view updates automatically, typically within milliseconds.

Postgres CDC source

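-- Assumes pg_conn was created beforehand with CREATE CONNECTION ... TO POSTGRES
-- and that the publication app_pub exists on the upstream database.
CREATE SOURCE orders_pg
FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'app_pub')
FOR TABLES (orders, users);

→ Changes in Postgres are streamed into Materialize.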

Join across sources

CREATE MATERIALIZED VIEW user_orders AS
SELECT
    u.id AS user_id,
    u.email,
    o.id AS order_id,
    o.amount,
    o.created_at
FROM users u
JOIN orders o ON o.user_id = u.id;

→ Every new order or user change automatically updates the join result.

TUMBLE / window

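-- RisingWave syntax: TUMBLE adds window_start / window_end columns.
-- Materialize does not document a TUMBLE function; bucket with date_bin or date_trunc there.
CREATE MATERIALIZED VIEW orders_5min AS
SELECT
    window_start,
    window_end,
    COUNT(*) AS order_count
FROM TUMBLE(orders, created_at, INTERVAL '5 MINUTES')
GROUP BY window_start, window_end;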
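
What a tumbling window computes can be sketched in a few lines of TypeScript. This is an illustrative model, not engine code: the `tumble` and `countPerWindow` helpers are hypothetical, and windows are assumed to be aligned to the Unix epoch. Every timestamp falls into exactly one fixed-size, non-overlapping bucket.

```typescript
// Illustrative model of a tumbling window; helper names are hypothetical.
// Assumption: windows are aligned to the Unix epoch (1970-01-01T00:00:00Z).

type TumbleWindow = { windowStart: Date; windowEnd: Date };

// Maps a timestamp to the single window [windowStart, windowEnd) that contains it.
function tumble(ts: Date, sizeMs: number): TumbleWindow {
  const start = Math.floor(ts.getTime() / sizeMs) * sizeMs;
  return { windowStart: new Date(start), windowEnd: new Date(start + sizeMs) };
}

// Equivalent in spirit to COUNT(*) ... GROUP BY window_start.
function countPerWindow(timestamps: Date[], sizeMs: number): Map<string, number> {
  const counts = new Map<string, number>();
  for (const ts of timestamps) {
    const k = tumble(ts, sizeMs).windowStart.toISOString();
    counts.set(k, (counts.get(k) ?? 0) + 1);
  }
  return counts;
}
```

For a 5-minute window, pass `5 * 60 * 1000` as `sizeMs`; a timestamp exactly on a boundary belongs to the window that starts there.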

Sink (back to Kafka)

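-- A sink reads from a named object, and ENVELOPE UPSERT needs a KEY (id is assumed unique).
CREATE MATERIALIZED VIEW high_value_orders AS
SELECT * FROM orders WHERE amount > 10000;

CREATE SINK orders_alerts
FROM high_value_orders
INTO KAFKA CONNECTION kafka_conn (TOPIC 'high_value_orders')
KEY (id) NOT ENFORCED
FORMAT JSON
ENVELOPE UPSERT;

→ A large order is published to Kafka automatically as soon as it arrives.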

Subscribe (real-time push to client)

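// SUBSCRIBE streams view changes to the client, similar to a WebSocket feed.
import postgres from 'postgres';
const sql = postgres('postgresql://materialize@localhost:6875/materialize');

// SUBSCRIBE is read through a cursor inside a transaction; each FETCH returns a batch.
await sql.begin(async (tx) => {
  await tx`DECLARE c CURSOR FOR SUBSCRIBE TO hourly_revenue`;
  while (true) {
    const rows = await tx`FETCH 100 c`;
    for (const row of rows) {
      // Columns: mz_timestamp, mz_diff, then the view's own columns.
      // mz_diff > 0: the row was added; mz_diff < 0: the row was retracted.
      if (Number(row.mz_diff) > 0) onAdd(row);
      else onRemove(row);
    }
  }
});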

→ Server-side push without polling.
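
On the client, the `mz_diff` values from SUBSCRIBE can be folded into a local copy of the view. The sketch below is one possible way to do that, under stated assumptions: `applyUpdates` and the `Update` shape are hypothetical, rows are keyed by their full JSON content, and columns are assumed to arrive in a consistent order. An updated row shows up as a retraction of the old row (`-1`) plus an insertion of the new one (`+1`).

```typescript
// Hypothetical helper: maintains a local multiset of rows from SUBSCRIBE diffs.
// Assumption: rows are keyed by JSON content, with a stable column order.

type Update<T> = { mz_diff: number | string; row: T };
type Entry<T> = { row: T; count: number };

function applyUpdates<T>(state: Map<string, Entry<T>>, updates: Update<T>[]): void {
  for (const u of updates) {
    const key = JSON.stringify(u.row);
    const entry = state.get(key) ?? { row: u.row, count: 0 };
    // mz_diff may arrive as a string from some drivers, so normalize it.
    entry.count += Number(u.mz_diff);
    if (entry.count === 0) state.delete(key); // fully retracted
    else state.set(key, entry);
  }
}

// Current rows of the replicated view (each row listed once).
function currentRows<T>(state: Map<string, Entry<T>>): T[] {
  const rows: T[] = [];
  state.forEach((e) => {
    if (e.count > 0) rows.push(e.row);
  });
  return rows;
}
```

Applying a whole fetched batch at once, rather than reacting to each row separately, keeps the UI from rendering intermediate states where only the retraction has been seen.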

RisingWave (alternative, FOSS)

-- Postgres-compatible, and the SQL is nearly identical.
-- JSON sources need explicit columns; the types below are assumptions.
CREATE SOURCE orders (
    user_id VARCHAR,
    amount NUMERIC,
    created_at TIMESTAMP
) WITH (
    connector = 'kafka',
    topic = 'orders',
    properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

CREATE MATERIALIZED VIEW hourly_revenue AS
SELECT user_id, hour, SUM(amount) AS revenue
FROM (
    SELECT user_id, amount, date_trunc('hour', created_at) AS hour FROM orders
) AS t GROUP BY user_id, hour;

// Ordinary PG client
const r = await sql`SELECT * FROM hourly_revenue WHERE user_id = 'u1'`;

Use case 1: Real-time dashboard

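-- today() is not a Postgres or Materialize function. A temporal filter on mz_now()
-- keeps a rolling 24-hour window; a calendar-day boundary needs a different approach.
CREATE MATERIALIZED VIEW dashboard AS
SELECT
    COUNT(*) AS orders_last_24h,
    SUM(amount) AS revenue_last_24h,
    AVG(amount) AS avg_order_value
FROM orders
WHERE mz_now() <= created_at + INTERVAL '24 hours';

// Frontend polls every 5s, or uses SUBSCRIBE instead
setInterval(async () => {
  const stats = await api.getDashboard();
  update(stats);
}, 5000);

→ Postgres recomputes the aggregate on every query, whereas Materialize keeps it precomputed.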

Use case 2: Cache invalidation

-- Per-user totals
CREATE MATERIALIZED VIEW user_totals AS
SELECT user_id, SUM(amount) AS total FROM orders GROUP BY user_id;

// App
const total = await sql`SELECT total FROM user_totals WHERE user_id = ${id}`;

→ Replaces a Redis cache with manual invalidation: reads are always fresh.

Use case 3: Fraud detection

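-- Temporal filter: rows leave the view once they are older than 5 minutes.
-- Materialize uses mz_now(); RisingWave accepts created_at >= NOW() - INTERVAL '5 minutes'.
CREATE MATERIALIZED VIEW suspicious_users AS
SELECT user_id, COUNT(*) AS attempts, COUNT(DISTINCT card_last4) AS cards
FROM payments
WHERE mz_now() <= created_at + INTERVAL '5 minutes'
GROUP BY user_id
HAVING COUNT(DISTINCT card_last4) > 5;

-- Sink → alert system (a sink reads from a named object, not a subquery)
CREATE SINK fraud_alerts FROM suspicious_users INTO KAFKA ...;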
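
The sliding-window rule in the fraud view can be checked offline with a small TypeScript model. This is a batch recomputation for illustration only (a streaming engine maintains the same result incrementally); `suspiciousUsers`, the `Payment` shape, and the default thresholds are hypothetical stand-ins for the SQL above.

```typescript
// Batch model of the fraud rule; names and defaults are illustrative assumptions.
// A streaming engine would maintain this result incrementally instead.

type Payment = { userId: string; cardLast4: string; createdAt: number }; // epoch ms
type Suspicious = { attempts: number; cards: number };

function suspiciousUsers(
  payments: Payment[],
  now: number,
  windowMs: number = 5 * 60 * 1000,
  maxCards: number = 5,
): Map<string, Suspicious> {
  const perUser = new Map<string, { attempts: number; cards: Set<string> }>();
  for (const p of payments) {
    // Same condition as: created_at >= now - interval '5 minutes'
    if (p.createdAt < now - windowMs) continue;
    const s = perUser.get(p.userId) ?? { attempts: 0, cards: new Set<string>() };
    s.attempts += 1;
    s.cards.add(p.cardLast4);
    perUser.set(p.userId, s);
  }
  const out = new Map<string, Suspicious>();
  perUser.forEach((s, userId) => {
    // Same condition as: HAVING COUNT(DISTINCT card_last4) > 5
    if (s.cards.size > maxCards) out.set(userId, { attempts: s.attempts, cards: s.cards.size });
  });
  return out;
}
```

Calling the function again with a later `now` shows the key behavior of a temporal filter: a user drops out of the result purely because time has passed, without any new input row.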

Performance

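A view with a small footprint is fast because it is served from memory.
Large joins need a lot of memory / state.
Indexing: RisingWave keys a view's storage on the GROUP BY columns automatically; in Materialize, serving lookups from memory takes an explicit CREATE INDEX on the view.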

Limitations

Materialize:
- Expensive (managed service)
- Some PG functions are not supported
- Large state can cause OOM

RisingWave:
+ FOSS, free
+ Scales to large clusters
- Newer, less mature

Hot vs cold query

-- "Hot": frequently run query → MV
CREATE MATERIALIZED VIEW user_recent AS ...;

-- "Cold": ad-hoc query → plain SELECT
SELECT * FROM orders WHERE ...;

→ Too many MVs consume too much memory.

Operational

- Monitor source connections
- Lag (source → view)
- Memory usage
- Index build time on schema change

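Replication (streaming DB → Postgres)

-- RisingWave JDBC sink. Materialize's documented sinks target Kafka, so verify
-- against the current Materialize docs before relying on a direct Postgres sink.
-- The host postgres:5432 is an assumption; credentials are left elided.
CREATE SINK pg_sink FROM hourly_revenue WITH (
    connector = 'jdbc',
    jdbc.url = 'jdbc:postgresql://postgres:5432/analytics?user=...&password=...',
    table.name = 'hourly_revenue',
    type = 'upsert',
    primary_key = 'user_id,hour'
);

→ The app keeps using plain Postgres while the streaming database syncs fresh data into it.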

dbt + Materialize

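# profiles.yml (dbt-materialize adapter); the top-level key is the profile name
default:
  target: dev
  outputs:
    dev:
      type: materialize
      host: localhost
      port: 6875
      user: materialize
      database: materialize
      schema: public

-- model
-- Recent dbt-materialize releases name this materialization 'materialized_view';
-- older releases used 'materializedview'.
{{ config(materialized='materialized_view') }}
SELECT user_id, SUM(amount) AS total FROM {{ source('public', 'orders') }} GROUP BY user_id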

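🤔 Decision criteria

| Situation | Recommendation |
|---|---|
| Real-time dashboard | Materialize / RisingWave |
| Cache with automatic invalidation | Materialize |
| Large streaming aggregations | Flink / Spark |
| Simple polling is enough | Postgres + cron-refreshed MV |
| Familiar to Postgres users | Materialize / RisingWave |
| Self-hosting / cost | RisingWave |
| Managed / production | Materialize Cloud |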

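Anti-patterns

  • Turning every query into an MV: memory use explodes.
  • Views with large state: risk of OOM.
  • Treating an MV as the single source of truth: it is not the durable system of record, so export via a sink.
  • Changing the source schema: the MV must be rebuilt, which is slow.
  • Handling SUBSCRIBE output one row at a time: adds server load; process in batches.
  • Using Materialize as an OLTP database: it is meant for reads only.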

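🤖 LLM usage hints

  • Replaces a cache with manual invalidation.
  • Any ordinary PG client works, so the developer experience stays familiar.
  • Use sinks to connect external systems.
  • Create MVs only for hot queries.

🔗 Related documents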