198 lines
7.0 KiB
Markdown
198 lines
7.0 KiB
Markdown
---
|
|
id: backpressure-patterns
|
|
title: 백프레셔 패턴 (Backpressure Patterns)
|
|
category: Coding
|
|
status: draft
|
|
canonical_id: backpressure-patterns
|
|
aliases: [backpressure, flow control, rate limiting, throttling, queue overflow, 백프레셔]
|
|
duplicate_of: null
|
|
source_trust_level: B
|
|
confidence_score: 0.85
|
|
verification_status: conceptual
|
|
created_at: 2026-05-09
|
|
updated_at: 2026-05-09
|
|
last_reinforced: 2026-05-09
|
|
review_reason: ""
|
|
merge_history: []
|
|
tags: [coding, async, streaming, queues, throttling, vibe-coding]
|
|
raw_sources: ["P-Reinforce session 2026-05-09 — bulk Coding seed batch 1"]
|
|
tech_stack:
|
|
language: "TypeScript / Node.js / Reactive frameworks"
|
|
applicable_to: ["Backend", "Stream processing", "Worker"]
|
|
applied_in: []
|
|
---
|
|
|
|
# 백프레셔 패턴
|
|
|
|
> Producer 가 Consumer 보다 빠르면 메모리가 무한정 늘어난다. 답은 **producer 를 늦추거나(slow), 큐 한계에서 떨어뜨리거나(drop), 사용자에게 거부하기(reject)**. 절대 "그냥 큐가 알아서 하겠지" 가 아니다.
|
|
|
|
## 📖 핵심 개념
|
|
|
|
비동기 시스템에서 producer 와 consumer 의 처리 속도 비대칭이 발생하면 buffer 가 무한정 자라거나(OOM), 가장 늦은 부분에서 latency 가 폭발한다.
|
|
|
|
해결책 4종:
|
|
1. **Slow producer** (pull 모델): consumer 가 "가져갈 준비됐다" 를 신호. producer 는 그때만 push.
|
|
2. **Bounded queue + block**: 큐 가득 차면 producer 가 enqueue 시 block.
|
|
3. **Bounded queue + drop**: 가득 차면 새 메시지 drop (또는 가장 오래된 것 drop = "head drop").
|
|
4. **Reject at the door**: 입구에서 처리량 한도 초과시 client 에 4xx 즉시 반환. 큐에 넣지 않음.
|
|
|
|
언어/런타임마다 메커니즘이 다름:
|
|
- Node.js streams: `pipe()` + `stream.write()` 의 boolean return + `'drain'` 이벤트
|
|
- Async iterators: `for await` 가 자연스러운 pull
|
|
- RxJS / Reactive: `throttleTime`, `sample`, `audit`, `buffer`, `windowTime`
|
|
- Goroutines: bounded channel `make(chan T, N)` 가 자동 block
|
|
|
|
## 💻 코드 패턴
|
|
|
|
### 1. Bounded queue with reject (TypeScript / 의사코드)
|
|
|
|
```ts
|
|
class BoundedQueue<T> {
|
|
private q: T[] = [];
|
|
constructor(private readonly capacity: number) {}
|
|
|
|
tryEnqueue(item: T): boolean {
|
|
if (this.q.length >= this.capacity) return false; // reject
|
|
this.q.push(item);
|
|
return true;
|
|
}
|
|
dequeue(): T | undefined { return this.q.shift(); }
|
|
}
|
|
|
|
// 사용
|
|
if (!queue.tryEnqueue(req)) {
|
|
res.status(503).json({ error: 'overloaded, retry later' });
|
|
return;
|
|
}
|
|
```
|
|
|
|
### 2. Node.js stream — drain 이벤트 활용
|
|
|
|
```ts
|
|
function writeAll(writable: NodeJS.WritableStream, chunks: Buffer[]) {
|
|
return new Promise<void>((resolve, reject) => {
|
|
let i = 0;
|
|
function next() {
|
|
while (i < chunks.length) {
|
|
const ok = writable.write(chunks[i++]);
|
|
if (!ok) {
|
|
writable.once('drain', next); // backpressure 감지 → drain 까지 대기
|
|
return;
|
|
}
|
|
}
|
|
resolve();
|
|
}
|
|
writable.on('error', reject);
|
|
next();
|
|
});
|
|
}
|
|
```
|
|
|
|
`writable.write()` 가 `false` 를 반환하면 buffer 가 watermark 초과. `'drain'` 이벤트 후 재개. 무시하면 메모리 폭주.
|
|
|
|
### 3. Async iterator — 자연스러운 pull
|
|
|
|
```ts
|
|
async function* batchedFetch(urls: string[], concurrency = 10): AsyncGenerator<Response> {
|
|
const queue = [...urls];
|
|
const inFlight: Promise<Response>[] = [];
|
|
|
|
while (queue.length || inFlight.length) {
|
|
while (inFlight.length < concurrency && queue.length) {
|
|
inFlight.push(fetch(queue.shift()!));
|
|
}
|
|
const result = await Promise.race(inFlight.map((p, idx) => p.then(r => ({ r, idx }))));
|
|
inFlight.splice(result.idx, 1);
|
|
yield result.r;
|
|
}
|
|
}
|
|
|
|
// consumer 가 처리한 만큼만 fetch — 자동 backpressure
|
|
for await (const res of batchedFetch(urls)) {
|
|
await processSlowly(res);
|
|
}
|
|
```
|
|
|
|
### 4. Token bucket — rate limit 입구 차단
|
|
|
|
```ts
|
|
class TokenBucket {
|
|
private tokens: number;
|
|
private lastRefill = Date.now();
|
|
constructor(private readonly capacity: number, private readonly refillPerSec: number) {
|
|
this.tokens = capacity;
|
|
}
|
|
tryTake(): boolean {
|
|
this.refill();
|
|
if (this.tokens < 1) return false;
|
|
this.tokens -= 1;
|
|
return true;
|
|
}
|
|
private refill() {
|
|
const now = Date.now();
|
|
const delta = (now - this.lastRefill) / 1000;
|
|
this.tokens = Math.min(this.capacity, this.tokens + delta * this.refillPerSec);
|
|
this.lastRefill = now;
|
|
}
|
|
}
|
|
|
|
const bucket = new TokenBucket(100, 10); // 초당 10건, 버스트 100
|
|
app.use((req, res, next) => {
|
|
if (!bucket.tryTake()) return res.status(429).json({ error: 'rate limit' });
|
|
next();
|
|
});
|
|
```
|
|
|
|
### 5. Drop 정책 — 최신 우선 (head drop)
|
|
|
|
```ts
|
|
function pushOldestDrop<T>(q: T[], item: T, capacity: number): T | undefined {
|
|
let dropped: T | undefined;
|
|
if (q.length >= capacity) dropped = q.shift(); // 가장 오래된 것 버림
|
|
q.push(item);
|
|
return dropped;
|
|
}
|
|
```
|
|
|
|
실시간 메트릭 / 센서 데이터처럼 "오래된 값은 가치 없음" 일 때.
|
|
|
|
## 🤔 의사결정 기준
|
|
|
|
| 데이터 성격 | 권장 전략 |
|
|
|---|---|
|
|
| 결제·거래 — 절대 손실 불가 | bounded queue + block (또는 외부 큐) |
|
|
| 실시간 메트릭 — 최신만 중요 | head drop |
|
|
| HTTP 요청 — 사용자가 기다림 | reject at the door (429) |
|
|
| 스트림 처리 — 가능한 모두 처리 | pull 기반 async iterator |
|
|
| 분산 시스템 | Kafka / SQS / RabbitMQ — 외부 큐가 backpressure 담당 |
|
|
|
|
## ❌ 안티패턴
|
|
|
|
- **무한 in-memory 큐**: `const q = []`; producer 만 push. 시간 문제로 OOM.
|
|
- **`Promise.all(huge_array.map(...))`**: 동시성 제한 없음. 1만 개 요청 동시 발사 → external API 죽거나 우리 메모리 폭발.
|
|
- **드랍하면서 로그 안 남김**: 사일런트 데이터 손실. 최소 metric 카운터.
|
|
- **block 방식이 web request handler 안에서**: 사용자 응답 lateny 폭발. reject 쪽이 적합.
|
|
- **rate limit 을 사용자 단위가 아닌 글로벌만**: 한 악성 클라이언트가 모든 사용자 차단.
|
|
- **stream 의 `write()` boolean return 무시**: 가장 흔한 Node.js 메모리 누수 원인.
|
|
- **timer 기반 polling 으로 backpressure 흉내**: `setInterval(consume, 100)` — consumer 가 1초당 10번만 처리. queue 폭발 가능.
|
|
- **외부 API 호출에 타임아웃 없음**: 외부가 멈추면 우리 큐 무한정 누적.
|
|
|
|
## 🤖 LLM 활용 힌트
|
|
|
|
- LLM에게 worker 코드 작성: "**bounded queue 사용. capacity 초과 시 reject 또는 head drop. drop 시 metric 카운터 증가**" 명시.
|
|
- HTTP API: "**입구에 rate limit, 큐 가득 차면 429 즉시 반환**" 패턴 요청.
|
|
- 스트림 파이프라인: "**async iterator pull 기반으로 작성**" 명시 → 자연스러운 backpressure.
|
|
- 외부 API 호출: "**concurrency limit + AbortSignal timeout 필수**" 강조.
|
|
|
|
## 🧪 검증 상태
|
|
|
|
- verification_status: `conceptual`
|
|
- Reactive Streams 명세, Node.js streams docs, Kafka consumer group 메커니즘 등 표준 패턴.
|
|
- 적용 사례 발견 시 `applied_in` 추가.
|
|
|
|
## 🔗 관련 문서
|
|
|
|
- [[Idempotent_Operations]]
|
|
- [[Optimistic_Concurrency_Control]]
|
|
- [[Error_Handling_Result_vs_Throw]]
|