--- id: wiki-2026-0508-message-queues-and-event-streams title: Message Queues and Event Streams category: 10_Wiki/Topics status: verified canonical_id: self aliases: [Queue vs Stream, MQ vs Streaming] duplicate_of: none source_trust_level: A confidence_score: 0.9 verification_status: applied tags: [messaging, queues, streams, kafka, sqs] raw_sources: [] last_reinforced: 2026-05-10 github_commit: pending tech_stack: language: multi framework: kafka-sqs-rabbitmq --- # Message Queues and Event Streams ## 매 한 줄 > **"매 Queue 는 work 를 distribute, 매 Stream 은 history 를 record."**. Queue 와 stream 은 매 둘 다 producer-consumer 를 decouple 하지만 매 mental model 이 다르다 — 매 queue 는 "처리할 일" (소비 후 사라짐), 매 stream 은 "발생한 사건" (immutable log, 매 replay 가능). 매 modern system 은 둘을 매 함께 사용. ## 매 핵심 ### 매 Queue 의 본질 - **Consume = delete** (또는 ack 후 hidden). - 매 1 message → 매 1 worker (work distribution). - 매 short retention (ack 후 사라짐). - 매 Examples: SQS, RabbitMQ classic, Redis list (BRPOP). ### 매 Stream 의 본질 - **Append-only immutable log**. - 매 모든 consumer 가 매 모든 event 읽음 (각자 offset). - 매 long retention (days~weeks~forever). - 매 replay 가능. - 매 Examples: Kafka, Kinesis, Pulsar, Redis Streams. ### 매 비교 | 측면 | Queue | Stream | |---|---|---| | Consume | destructive | non-destructive | | Retention | 짧음 (ack 후) | 긺 (time/size 기반) | | Replay | X | O | | Ordering | 약함 (per-queue) | 강함 (per-partition) | | Throughput | 중 | 매 high (millions/sec) | | Use case | task distribution | event sourcing, analytics, ML | ### 매 함께 쓰기 - 매 Stream → consumer 가 매 work item 추출 → 매 Queue 에 push (workflow orchestration). - 매 Queue 에서 매 ack 후 매 audit event 를 매 stream 에 publish. ## 💻 패턴 ### Queue: SQS worker (Node.js) ```javascript import { SQSClient, ReceiveMessageCommand, DeleteMessageCommand } from '@aws-sdk/client-sqs'; const sqs = new SQSClient({}); const QueueUrl = process.env.QUEUE_URL; while (true) { const { Messages } = await sqs.send(new ReceiveMessageCommand({ QueueUrl, MaxNumberOfMessages: 10, WaitTimeSeconds: 20, // long polling VisibilityTimeout: 60, })); if (!Messages) continue; for (const msg of Messages) { try { await processJob(JSON.parse(msg.Body)); await sqs.send(new DeleteMessageCommand({ QueueUrl, ReceiptHandle: msg.ReceiptHandle })); } catch (e) { // Don't delete → message becomes visible again after VisibilityTimeout → retry → DLQ after maxReceiveCount } } } ``` ### Stream: Kafka consumer with offset (Node.js) ```javascript import { Kafka } from 'kafkajs'; const consumer = new Kafka({ brokers: ['kafka:9092'] }).consumer({ groupId: 'analytics-v2', }); await consumer.subscribe({ topic: 'user-events', fromBeginning: true }); // fromBeginning: true → re-read entire history (replay) await consumer.run({ eachMessage: async ({ topic, partition, message }) => { const event = JSON.parse(message.value.toString()); await projectIntoDB(event); // Offset auto-committed (or manual via heartbeat) }, }); ``` ### Stream: replay from specific offset ```javascript // Reset consumer group to specific offset (admin operation) import { Kafka } from 'kafkajs'; const admin = new Kafka({ brokers: ['kafka:9092'] }).admin(); await admin.connect(); await admin.resetOffsets({ groupId: 'analytics-v2', topic: 'user-events', earliest: true, // or specific offset / timestamp }); // Next consumer.run() will re-read from beginning ``` ### Redis Streams (XADD / XREADGROUP) ```javascript import Redis from 'ioredis'; const redis = new Redis(); // Producer await redis.xadd('events', '*', 'type', 'order.created', 'data', JSON.stringify(order)); // Consumer group await redis.xgroup('CREATE', 'events', 'workers', '$', 'MKSTREAM').catch(() => {}); // Consume while (true) { const res = await redis.xreadgroup( 'GROUP', 'workers', 'worker-1', 'COUNT', 10, 'BLOCK', 5000, 'STREAMS', 'events', '>' ); if (!res) continue; for (const [, entries] of res) { for (const [id, fields] of entries) { await process(fields); await redis.xack('events', 'workers', id); } } } ``` ### Outbox → Stream (CDC pattern) ```yaml # Debezium reads DB WAL → publishes to Kafka topic # → other services subscribe to data changes without polling connector: debezium-postgres config: database.hostname: db database.dbname: orders table.include.list: public.orders, public.outbox topic.prefix: cdc # → cdc.public.orders, cdc.public.outbox topics created ``` ### Choice: 매 task vs event ``` Q1: "결과를 매 누가 처리하면 끝나나?" → Queue (1 worker) Q2: "여러 service 가 매 동일 event 에 react?" → Stream (fan-out) Q3: "역사를 replay 해야?" → Stream Q4: "Order 가 strict (per-key)?" → Stream (partition by key) Q5: "단순 background job?" → Queue ``` ## 매 결정 기준 | 요구 | 선택 | |---|---| | Background job (이메일, 썸네일) | Queue (SQS, BullMQ) | | Event sourcing / CDC | Stream (Kafka) | | Real-time analytics | Stream (Kafka, Kinesis) | | 매 strict ordering per user | Stream + partition key | | Pub/Sub broadcast | Stream 또는 Pub/Sub | | Workflow orchestration | Queue + worker (Temporal, Step Functions) | | 매 audit log | Stream (immutable retention) | **기본값**: 매 task 면 queue, 매 event 면 stream. 매 둘 다 필요하면 매 함께 사용. ## 🔗 Graph - 부모: [[Message Broker]] · [[Distributed Systems]] - 변형: [[Kafka]] · [[SQS]] - 응용: [[Event Sourcing]] · [[CQRS]] · [[CDC]] - Adjacent: [[Dead Letter Queue]] ## 🤖 LLM 활용 **언제**: messaging architecture 결정, queue vs stream 선택, replay 요구사항 평가. **언제 X**: 매 simple sync RPC 로 충분한 매 internal call. ## ❌ 안티패턴 - **Stream 을 queue 로**: 매 short retention 만 쓰면 매 stream 의 강점 (replay) 못 살림. - **Queue 로 broadcast**: 매 fan-out 에 queue → 매 consumer 마다 별도 queue 만들어야 → 매 stream 이 정답. - **Partition key 무시**: 매 stream 에서 매 ordering 필요한데 매 random key → 매 race condition. - **Retention infinity**: 매 cost 폭주 — 매 compaction / time-based 설정. ## 🧪 검증 / 중복 - Verified (Kafka docs, AWS SQS/Kinesis docs, Confluent blog, Martin Kleppmann "DDIA" Ch 11). - 신뢰도 A. ## 🕓 Changelog | 날짜 | 변경 | |---|---| | 2026-05-08 | Phase 1 | | 2026-05-10 | Manual cleanup — queue vs stream 비교 + SQS/Kafka/Redis Streams 패턴 |