---
id: db-materialize-streaming-sql
title: Materialize / RisingWave - Streaming Materialized View
category: Coding
status: draft
source_trust_level: B
verification_status: conceptual
created_at: 2026-05-09
updated_at: 2026-05-09
tags: [database, materialize, streaming, vibe-coding]
tech_stack: { language: "SQL", applicable_to: ["Backend"] }
applied_in: []
aliases: [Materialize, RisingWave, streaming materialized view, incremental view, dataflow]
---

# Materialize / RisingWave

> Postgres-compatible streaming DB. **CREATE MATERIALIZED VIEW + real-time incremental updates**. Kafka / CDC → SQL view → always fresh. An alternative to a Redis cache plus manual aggregation.

## 📖 Core concepts

- Streaming MV: when new data arrives, the view is updated incrementally and automatically instead of being recomputed from scratch.
- Postgres wire protocol: connect with psql or any ordinary Postgres client.
- Source: Kafka / Postgres CDC / S3 (connector availability differs between the two engines).
- Sink: Kafka / Postgres (sink targets differ between the two engines).

## 💻 Code patterns

### Installing Materialize

```bash
# 6875 = SQL port, 6876 = HTTP port
docker run -d -p 6875:6875 -p 6876:6876 materialize/materialized
# Connect (psql-compatible)
psql -U materialize -h localhost -p 6875 materialize
```

### Source (Kafka)

```sql
CREATE CONNECTION kafka_conn TO KAFKA (BROKER 'kafka:9092');
CREATE CONNECTION csr_conn TO CONFLUENT SCHEMA REGISTRY (URL 'http://schema-registry:8081');

CREATE SOURCE orders
FROM KAFKA CONNECTION kafka_conn (TOPIC 'orders')
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY CONNECTION csr_conn
ENVELOPE NONE;
```

### Materialized view

```sql
CREATE MATERIALIZED VIEW hourly_revenue AS
SELECT
  user_id,
  date_trunc('hour', created_at) AS hour,
  SUM(amount) AS revenue,
  COUNT(*) AS order_count
FROM orders
GROUP BY user_id, hour;

-- Plain SELECT, always fresh
SELECT * FROM hourly_revenue WHERE user_id = 'u1' ORDER BY hour DESC;
```

→ The view updates automatically as the stream advances, typically within milliseconds.

### Postgres CDC source

```sql
-- Assumes pg_conn was created beforehand with CREATE CONNECTION ... TO POSTGRES
-- and that the upstream database already has the publication app_pub.
CREATE SOURCE orders_pg
FROM POSTGRES CONNECTION pg_conn (PUBLICATION 'app_pub')
FOR TABLES (orders, users);
```

→ Changes made in Postgres stream into Materialize.

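To make the incremental-update idea concrete, here is a minimal TypeScript sketch of how a `SUM` / `COUNT` aggregate such as `hourly_revenue` can be maintained from a change stream without rescanning the table. It illustrates the general technique only and is not how Materialize or RisingWave are actually implemented; the `Change` type, `applyChange`, the field names, and the sample data are all hypothetical.

```ts
// Hypothetical change event: diff = +1 for an inserted row, -1 for a deleted row.
// A CDC UPDATE is modeled as a delete of the old row plus an insert of the new one.
type Change = { userId: string; hour: string; amount: number; diff: 1 | -1 };

type Agg = { revenue: number; orderCount: number };

// Fold one change into the aggregate state, keyed by (userId, hour).
function applyChange(state: Map<string, Agg>, c: Change): void {
  const key = `${c.userId}|${c.hour}`;
  const agg = state.get(key) ?? { revenue: 0, orderCount: 0 };
  agg.revenue += c.diff * c.amount;
  agg.orderCount += c.diff;
  if (agg.orderCount === 0) state.delete(key); // the group vanishes with its last row
  else state.set(key, agg);
}

const state = new Map<string, Agg>();
const changes: Change[] = [
  { userId: "u1", hour: "2026-05-09T10:00", amount: 30, diff: 1 },
  { userId: "u1", hour: "2026-05-09T10:00", amount: 20, diff: 1 },
  // An UPDATE changing amount 20 -> 25 arrives as a retraction plus an insertion.
  { userId: "u1", hour: "2026-05-09T10:00", amount: 20, diff: -1 },
  { userId: "u1", hour: "2026-05-09T10:00", amount: 25, diff: 1 },
];
for (const c of changes) applyChange(state, c);

const result = state.get("u1|2026-05-09T10:00");
console.log(result); // revenue 55, orderCount 2
```

Each change costs a constant amount of work regardless of table size, which is why a streaming view can stay fresh where a periodically refreshed view has to recompute everything.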
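### Join across sources

```sql
CREATE MATERIALIZED VIEW user_orders AS
SELECT
  u.id AS user_id,
  u.email,
  o.id AS order_id,
  o.amount,
  o.created_at
FROM users u
JOIN orders o ON o.user_id = u.id;
```

→ Every new order or user change updates the join result automatically.

### TUMBLE / window

```sql
-- RisingWave syntax. Materialize has no TUMBLE function; there, bucket the
-- timestamp yourself (for example with date_bin) and group by the bucket.
CREATE MATERIALIZED VIEW orders_5min AS
SELECT window_start, window_end, COUNT(*) AS count
FROM TUMBLE(orders, created_at, INTERVAL '5 MINUTES')
GROUP BY window_start, window_end;
```

### Sink (back to Kafka)

```sql
-- A sink reads from a named object, so the filter lives in its own view.
CREATE MATERIALIZED VIEW high_value_orders AS
  SELECT * FROM orders WHERE amount > 10000;

CREATE SINK orders_alerts FROM high_value_orders
  INTO KAFKA CONNECTION kafka_conn (TOPIC 'high_value_orders')
  KEY (id) NOT ENFORCED  -- UPSERT needs a key; assumes id is unique per order
  FORMAT JSON ENVELOPE UPSERT;
```

→ A high-value order is published to Kafka automatically as soon as it arrives.

### Subscribe (real-time push to client)

```ts
// SUBSCRIBE streams view changes to the client, similar to a WebSocket feed.
import postgres from 'postgres';

const sql = postgres('postgresql://materialize@localhost:6875/materialize');

// Cursor pattern: DECLARE once, then FETCH batches inside one transaction.
await sql.begin(async (tx) => {
  await tx`DECLARE c CURSOR FOR SUBSCRIBE (SELECT * FROM hourly_revenue)`.simple();
  while (true) {
    const rows = await tx`FETCH ALL c`.simple();
    for (const row of rows) {
      // Each row carries mz_timestamp, mz_diff, then the view columns.
      // mz_diff is a bigint, which postgres.js returns as a string.
      if (Number(row.mz_diff) > 0) onAdd(row); // onAdd / onRemove: app-defined callbacks
      else onRemove(row);
    }
  }
});
```

→ Server-side push without polling.
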
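On the client, `mz_diff` says whether a row was added to or removed from the view, so a local copy can be kept by folding each fetched batch into a map. The following is a minimal sketch under stated assumptions: rows are shaped like the `hourly_revenue` output, and `SubscribeRow`, `ViewRow`, `applyBatch`, the keying scheme, and the sample rows are hypothetical names and data chosen for illustration.

```ts
// Assumed shape of one SUBSCRIBE row for hourly_revenue. mz_timestamp and mz_diff
// are typed loosely because drivers often return bigint-like columns as strings.
type SubscribeRow = {
  mz_timestamp: string | number;
  mz_diff: string | number;
  user_id: string;
  hour: string;
  revenue: number;
  order_count: number;
};

type ViewRow = { user_id: string; hour: string; revenue: number; order_count: number };

// Fold a batch of changes into a local copy keyed by (user_id, hour).
// Rows are ordered by timestamp, and retractions go before additions within a
// timestamp, so an update (old row -1, new row +1) leaves the new row in place.
// Assumes at most one row per key, which holds for a GROUP BY view.
function applyBatch(view: Map<string, ViewRow>, batch: SubscribeRow[]): void {
  const sorted = [...batch].sort(
    (a, b) =>
      Number(a.mz_timestamp) - Number(b.mz_timestamp) ||
      Number(a.mz_diff) - Number(b.mz_diff),
  );
  for (const r of sorted) {
    const key = `${r.user_id}|${r.hour}`;
    if (Number(r.mz_diff) > 0) {
      view.set(key, { user_id: r.user_id, hour: r.hour, revenue: r.revenue, order_count: r.order_count });
    } else {
      view.delete(key);
    }
  }
}

// Made-up sample data: an initial row, then an update delivered as +1 / -1.
const view = new Map<string, ViewRow>();
applyBatch(view, [
  { mz_timestamp: "1000", mz_diff: "1", user_id: "u1", hour: "10:00", revenue: 30, order_count: 1 },
]);
applyBatch(view, [
  { mz_timestamp: "2000", mz_diff: "1", user_id: "u1", hour: "10:00", revenue: 50, order_count: 2 },
  { mz_timestamp: "2000", mz_diff: "-1", user_id: "u1", hour: "10:00", revenue: 30, order_count: 1 },
]);
console.log(view.get("u1|10:00"));
```

Processing a whole batch at once, rather than reacting to each row, also avoids briefly showing a group as missing while its old value has been retracted and the new one has not yet been applied.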
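### RisingWave (alternative, FOSS)

```sql
-- Postgres-compatible, nearly identical SQL.
-- JSON carries no schema, so columns are declared here (types are assumptions).
CREATE SOURCE orders (
  user_id VARCHAR,
  amount NUMERIC,
  created_at TIMESTAMP
) WITH (
  connector = 'kafka',
  topic = 'orders',
  properties.bootstrap.server = 'kafka:9092'
) FORMAT PLAIN ENCODE JSON;

CREATE MATERIALIZED VIEW hourly_revenue AS
SELECT user_id, hour, SUM(amount) AS revenue
FROM (
  SELECT user_id, amount, date_trunc('hour', created_at) AS hour
  FROM orders
) AS t
GROUP BY user_id, hour;
```

```ts
// Ordinary PG client; RisingWave listens on 4566 by default (user root, database dev).
import postgres from 'postgres';
const sql = postgres('postgresql://root@localhost:4566/dev');
const r = await sql`SELECT * FROM hourly_revenue WHERE user_id = 'u1'`;
```

### Use case 1: Real-time dashboard

```sql
-- Rolling 24-hour window. today() is not available here, and Materialize
-- expects mz_now() (not now()) for time filters inside materialized views.
CREATE MATERIALIZED VIEW dashboard AS
SELECT
  COUNT(*) AS total_orders_24h,
  SUM(amount) AS revenue_24h,
  AVG(amount) AS avg_order_value
FROM orders
WHERE mz_now() <= created_at + INTERVAL '1 day';
```

```ts
// Frontend polls every 5s, or uses SUBSCRIBE instead.
// api.getDashboard and update are app-defined.
setInterval(async () => {
  const stats = await api.getDashboard();
  update(stats);
}, 5000);
```

→ Postgres re-aggregates on every request; Materialize keeps the result precomputed.

### Use case 2: Cache invalidation

```sql
-- Per-user totals
CREATE MATERIALIZED VIEW user_totals AS
SELECT user_id, SUM(amount) AS total
FROM orders
GROUP BY user_id;

-- App side: a plain parameterized lookup from any PG client
SELECT total FROM user_totals WHERE user_id = $1;
```

→ Replaces a Redis cache with manual invalidation; the view stays fresh on its own.

### Use case 3: Fraud detection

```sql
CREATE MATERIALIZED VIEW suspicious_users AS
SELECT user_id, COUNT(*) AS attempts, COUNT(DISTINCT card_last4) AS cards
FROM payments
-- Sliding 5-minute window. Materialize uses mz_now() here;
-- RisingWave accepts created_at >= NOW() - INTERVAL '5 minutes'.
WHERE mz_now() <= created_at + INTERVAL '5 minutes'
GROUP BY user_id
HAVING COUNT(DISTINCT card_last4) > 5;

-- Sink → alert system
CREATE SINK fraud_alerts FROM suspicious_users INTO KAFKA ...;
```

### Performance

```
A view with a small footprint is fast (its state fits in memory).
Large joins need a lot of memory / state.
Grouped views are keyed by their GROUP BY columns; in Materialize, add an explicit CREATE INDEX for fast point lookups.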
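```

### Limitations

```
Materialize:
- Expensive (managed)
- Some PG functions are unsupported
- Large state can cause OOM
RisingWave:
+ FOSS, free
+ Scales out to large clusters
- Newer, less mature
```

### Hot vs cold query

```sql
-- "Hot": frequent query → MV
CREATE MATERIALIZED VIEW user_recent AS ...;

-- "Cold": ad-hoc → plain SELECT
SELECT * FROM orders WHERE ...;
```

→ Too many MVs consume memory.

### Operational

```
- Source connection monitoring
- Lag (source → view)
- Memory usage
- Index build time on schema change
```

### Replication (streaming DB → Postgres)

```sql
-- Sketch using RisingWave's JDBC sink; Materialize's documented sinks target Kafka.
-- Host and credentials are placeholders.
CREATE SINK pg_sink FROM hourly_revenue WITH (
  connector = 'jdbc',
  jdbc.url = 'jdbc:postgresql://<host>:5432/analytics?user=<user>&password=<password>',
  schema.name = 'public',
  table.name = 'hourly_revenue',
  type = 'upsert',
  primary_key = 'user_id,hour'
);
```

→ The app keeps using plain Postgres while the streaming database syncs fresh results into it.

### dbt + Materialize

```yaml
# dbt-materialize adapter, in profiles.yml (the top-level key is the profile name)
default:
  target: dev
  outputs:
    dev:
      type: materialize
      host: localhost
      port: 6875
      user: materialize
      # plus pass / database / schema / cluster as your environment requires
```

```sql
-- model
-- Recent dbt-materialize versions name this materialization 'materialized_view';
-- older versions used 'materializedview'.
{{ config(materialized='materialized_view') }}
SELECT user_id, SUM(amount) AS total
FROM {{ source('public', 'orders') }}
GROUP BY user_id
```

## 🤔 Decision criteria

| Situation | Recommendation |
|---|---|
| Real-time dashboard | Materialize / RisingWave |
| Cache with automatic invalidation | Materialize |
| Large streaming aggregations | Flink / Spark |
| Simple polling is enough | Postgres + cron-refreshed MV |
| Familiar to Postgres users | Materialize / RisingWave |
| Self-host / cost | RisingWave |
| Managed / production | Materialize Cloud |

## ❌ Anti-patterns

- **Every query as an MV**: memory usage explodes.
- **Views with large state**: risk of OOM.
- **MV as the single source of truth**: it is derived state, not the system of record; export results through a sink.
- **Changing the source schema**: dependent MVs must be rebuilt, which is slow.
- **Handling SUBSCRIBE output row by row**: adds server load; process in batches.
- **Materialize as an OLTP store**: use it for reads only.

## 🤖 LLM usage hints

- Replaces a cache with manual invalidation.
- Works with an ordinary PG client, so the dev experience stays familiar.
- Connect external systems through sinks.
- Turn only hot queries into MVs.

## 🔗 Related documents

- [[Data_Eng_Streaming_ETL]]
- [[DB_Change_Data_Capture]]
- [[Backend_Outbox_Pattern]]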