288 lines
7.6 KiB
Markdown
288 lines
7.6 KiB
Markdown
---
|
|
id: backend-idempotent-consumer
|
|
title: Idempotent Consumer — At-least-once → Exactly-once
|
|
category: Coding
|
|
status: draft
|
|
source_trust_level: B
|
|
verification_status: conceptual
|
|
created_at: 2026-05-09
|
|
updated_at: 2026-05-09
|
|
tags: [backend, idempotent, consumer, vibe-coding]
|
|
tech_stack: { language: "TS / SQL", applicable_to: ["Backend"] }
|
|
applied_in: []
|
|
aliases: [idempotent consumer, dedupe table, processed events, exactly-once delivery]
|
|
---
|
|
|
|
# Idempotent Consumer
|
|
|
|
> Message broker = at-least-once. **같은 메시지 두 번 처리 = 중복 사고**. Idempotent consumer = effectively-once. Dedupe table + transactional handling.
|
|
|
|
## 📖 핵심 개념
|
|
- Event ID: producer 가 생성, 고유.
|
|
- Processed table: 이미 처리한 ID list.
|
|
- Atomic: handle + insert ID 한 트랜잭션.
|
|
- TTL: 옛 ID 정리.
|
|
|
|
## 💻 코드 패턴
|
|
|
|
### Dedupe table
|
|
```sql
|
|
CREATE TABLE processed_events (
|
|
event_id TEXT PRIMARY KEY,
|
|
processed_at TIMESTAMPTZ DEFAULT NOW(),
|
|
consumer TEXT NOT NULL
|
|
);
|
|
|
|
CREATE INDEX processed_events_processed_at ON processed_events(processed_at);
|
|
```
|
|
|
|
### Consume + atomic
|
|
```ts
|
|
async function consume(msg: Message) {
|
|
const eventId = msg.headers['x-event-id'];
|
|
if (!eventId) throw new Error('event-id required');
|
|
|
|
return await db.transaction(async (tx) => {
|
|
// 1. Try insert event id (unique constraint = 중복 검출)
|
|
try {
|
|
await tx.processedEvents.insert({
|
|
eventId,
|
|
consumer: 'order-projector',
|
|
});
|
|
} catch (e) {
|
|
if (isUniqueViolation(e)) {
|
|
// 이미 처리 — skip
|
|
return { skipped: true };
|
|
}
|
|
throw e;
|
|
}
|
|
|
|
// 2. 실제 작업
|
|
await handleInTx(tx, msg);
|
|
|
|
return { processed: true };
|
|
});
|
|
}
|
|
```
|
|
|
|
→ DB 트랜잭션 = atomic. Crash 시 둘 다 X.
|
|
|
|
### Per-consumer dedupe (다른 consumer 가 같은 event 처리)
|
|
```sql
|
|
ALTER TABLE processed_events ADD CONSTRAINT processed_events_pk PRIMARY KEY (event_id, consumer);
|
|
```
|
|
|
|
→ Order-projector + Email-sender 가 둘 다 같은 OrderPlaced 처리.
|
|
|
|
### TTL cleanup
|
|
```sql
|
|
-- 30일 이상 된 events 삭제
|
|
DELETE FROM processed_events WHERE processed_at < NOW() - INTERVAL '30 days';
|
|
```
|
|
|
|
```ts
|
|
// Cron
|
|
async function cleanupProcessedEvents() {
|
|
await db.processedEvents.delete({
|
|
where: { processedAt: { lt: subDays(new Date(), 30) } },
|
|
});
|
|
}
|
|
```
|
|
|
|
→ Producer 재시도 윈도우보다 길게.
|
|
|
|
### Redis-based (빠른)
|
|
```ts
|
|
async function consume(msg: Message) {
|
|
const eventId = msg.headers['x-event-id'];
|
|
|
|
// SETNX — 중복 시 false
|
|
const ok = await redis.set(`processed:${eventId}`, '1', 'EX', 30 * 24 * 3600, 'NX');
|
|
if (!ok) return { skipped: true };
|
|
|
|
try {
|
|
await handle(msg);
|
|
return { processed: true };
|
|
} catch (e) {
|
|
// 처리 실패 — Redis 에서 제거 (재시도 가능)
|
|
await redis.del(`processed:${eventId}`);
|
|
throw e;
|
|
}
|
|
}
|
|
```
|
|
|
|
⚠️ Redis 와 DB 의 atomicity X — DB 트랜잭션 패턴 권장.
|
|
|
|
### Outbox + idempotent consumer = effectively exactly-once
|
|
```
|
|
Producer: DB write + outbox → broker (at-least-once)
|
|
Consumer: dedupe + handle (idempotent)
|
|
→ 결과: exactly-once
|
|
```
|
|
|
|
→ [[Backend_Outbox_Pattern]] + 이 문서.
|
|
|
|
### Kafka exactly-once semantics (EOS)
|
|
```ts
|
|
// 같은 transaction 안 consume + produce + commit
|
|
const producer = kafka.producer({ transactionalId: 'projector', idempotent: true });
|
|
|
|
await producer.connect();
|
|
const tx = await producer.transaction();
|
|
try {
|
|
await tx.send({ topic: 'output', messages: [...] });
|
|
await tx.sendOffsets({
|
|
consumerGroupId: 'projector',
|
|
topics: [{ topic: 'input', partitions: [{ partition: 0, offset: '42' }] }],
|
|
});
|
|
await tx.commit();
|
|
} catch {
|
|
await tx.abort();
|
|
}
|
|
```
|
|
|
|
→ Kafka 안 EOS. 외부 system 까지는 X.
|
|
|
|
### App-level idempotency key (외부 API)
|
|
```ts
|
|
// Stripe / Square / 외부 API 호출
|
|
async function chargeCustomer(eventId: string, amount: number) {
|
|
return await stripe.charges.create(
|
|
{ amount, currency: 'usd', source: '...' },
|
|
{ idempotencyKey: eventId }
|
|
);
|
|
}
|
|
```
|
|
|
|
→ Stripe 가 같은 key = 한 번만 charge.
|
|
|
|
### Retry-safe DB write
|
|
```sql
|
|
-- ON CONFLICT (UPSERT) = idempotent
|
|
INSERT INTO orders (id, status, amount) VALUES ($1, $2, $3)
|
|
ON CONFLICT (id) DO UPDATE SET status = EXCLUDED.status;
|
|
```
|
|
|
|
→ 같은 order id 재시도 OK.
|
|
|
|
### Increment vs SET (counter)
|
|
```sql
|
|
-- ❌ Increment 가 두 번 실행 = 중복
|
|
UPDATE products SET likes = likes + 1 WHERE id = $1;
|
|
|
|
-- ✅ Idempotent — UNIQUE 한 like 별 row
|
|
INSERT INTO product_likes (product_id, user_id, event_id) VALUES (...)
|
|
ON CONFLICT (event_id) DO NOTHING;
|
|
|
|
-- 그 후 count
|
|
SELECT count(*) FROM product_likes WHERE product_id = $1;
|
|
```
|
|
|
|
### State machine (transition idempotent)
|
|
```ts
|
|
async function handleOrderShipped(orderId: string) {
|
|
const order = await db.orders.find(orderId);
|
|
if (!order) throw new Error('not found');
|
|
|
|
if (order.status === 'shipped' || order.status === 'delivered') {
|
|
return; // 이미 처리 — skip
|
|
}
|
|
|
|
if (order.status !== 'paid') {
|
|
throw new Error(`cannot ship from ${order.status}`);
|
|
}
|
|
|
|
await db.orders.update(orderId, { status: 'shipped' });
|
|
}
|
|
```
|
|
|
|
→ State 만 변경 — 두 번 호출 OK.
|
|
|
|
### 외부 부수효과 추적
|
|
```ts
|
|
// 이메일 보냄 — 두 번 보내면 안 됨
|
|
async function sendOrderEmail(orderId: string, eventId: string) {
|
|
return await db.transaction(async (tx) => {
|
|
// 이미 보냄?
|
|
const sent = await tx.emails.find({ eventId });
|
|
if (sent) return { skipped: true };
|
|
|
|
// 보내기 (transactional 외부 API 가 best)
|
|
await emailService.send({ ..., idempotencyKey: eventId });
|
|
|
|
// 기록
|
|
await tx.emails.insert({ eventId, sentAt: new Date() });
|
|
return { sent: true };
|
|
});
|
|
}
|
|
```
|
|
|
|
⚠️ Email service 가 idempotencyKey 지원 안 하면 — 보내기 후 commit 사이 crash 가능. 보상 / monitoring.
|
|
|
|
### 시간 윈도우 dedupe (가벼운)
|
|
```ts
|
|
// 5분 안 같은 event 처리 X
|
|
const recent = new LRU<string, true>({ max: 100_000, ttl: 5 * 60_000 });
|
|
|
|
async function consume(msg: Message) {
|
|
const id = msg.headers['x-event-id'];
|
|
if (recent.has(id)) return;
|
|
recent.set(id, true);
|
|
await handle(msg);
|
|
}
|
|
```
|
|
|
|
⚠️ Process restart = recent 사라짐. 분산 환경 = 다른 consumer 안 dedupe.
|
|
|
|
### Test
|
|
```ts
|
|
test('consume same event twice = handled once', async () => {
|
|
const msg = makeMessage({ orderId: 'o1' });
|
|
|
|
await consumer.consume(msg);
|
|
await consumer.consume(msg); // duplicate
|
|
|
|
const count = await db.orders.count({ where: { id: 'o1' } });
|
|
expect(count).toBe(1);
|
|
});
|
|
```
|
|
|
|
### Monitoring
|
|
```ts
|
|
metrics.counter('events.processed', { consumer, type });
|
|
metrics.counter('events.duplicates', { consumer });
|
|
metrics.histogram('events.processing_ms', ms);
|
|
|
|
// Alarm: duplicates 률 너무 높음 = producer issue
|
|
```
|
|
|
|
## 🤔 의사결정 기준
|
|
| 상황 | 추천 |
|
|
|---|---|
|
|
| DB 가 truth | Processed table + transaction |
|
|
| 가벼운 / 작은 throughput | Redis SETNX |
|
|
| Kafka 안 transform | EOS transactional |
|
|
| 외부 API 호출 | App idempotency key |
|
|
| Counter 증가 | UNIQUE row + COUNT |
|
|
| State machine | State guard |
|
|
|
|
## ❌ 안티패턴
|
|
- **Dedupe 안 함 + at-least-once**: 중복 처리.
|
|
- **Dedupe + 외부 부수효과 (atomic X)**: 중복 가능.
|
|
- **TTL 없음**: 영원 자라남.
|
|
- **Process memory dedupe + 분산**: 다른 process 안 dedupe.
|
|
- **`COUNT + 1` 같은 non-idempotent**: 중복 시 잘못된 count.
|
|
- **Retry 무한**: 영원 stuck. DLQ.
|
|
- **Producer 가 다른 ID 매번 (UUID 매 retry)**: dedupe 의미 없음.
|
|
|
|
## 🤖 LLM 활용 힌트
|
|
- Processed table + transaction = 표준.
|
|
- 외부 API = idempotencyKey.
|
|
- TTL 30일 + cleanup.
|
|
- Test 가 중복 처리 검증.
|
|
|
|
## 🔗 관련 문서
|
|
- [[Backend_Idempotency_Keys]]
|
|
- [[Backend_Outbox_Pattern]]
|
|
- [[Messaging_Exactly_Once]]
|