2026-05-09 22:47:42 +09:00


id: backend-backpressure-server-side
title: Server Backpressure — load shed / queue / rate
category: Coding
status: draft
source_trust_level: B
verification_status: conceptual
created_at: 2026-05-09
updated_at: 2026-05-09
tags: backend, backpressure, vibe-coding
tech_stack: language = TS / Node; applicable_to = Backend
applied_in:
aliases: backpressure, load shedding, queue full, 503, retry-after, adaptive concurrency

Backend Backpressure

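This note covers what to do when a server processes requests more slowly than they arrive. The main tools are bounded queues, 503 responses with Retry-After, and adaptive concurrency. Together they are the key defense against cascading failure.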

📖 Core concepts

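  • Backpressure: the downstream side tells the upstream side to "slow down".
  • Buffering: absorbs temporary spikes, but an unbounded buffer ends in OOM.
  • Drop / shed: reject part of the load.
  • Shedding is the right call when latency matters more than throughput.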

💻 코드 패턴

Bounded queue

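import express from 'express';

class BoundedQueue<T> {
  private q: T[] = [];
  constructor(private max: number) {}

  push(item: T): boolean {
    if (this.q.length >= this.max) return false;  // full: drop instead of growing
    this.q.push(item);
    return true;
  }

  pop(): T | undefined {
    return this.q.shift();
  }
}

const app = express();
app.use(express.json());  // populates req.body

const q = new BoundedQueue<unknown>(1000);  // a separate worker is assumed to drain it via q.pop()
app.post('/job', (req, res) => {
  if (!q.push(req.body)) {
    // Queue full: tell the client to back off instead of buffering more.
    res.set('Retry-After', '5').status(503).json({ error: 'overloaded' });
    return;
  }
  res.status(202).json({ queued: true });  // accepted, processed asynchronously
});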

503 Service Unavailable

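import type { Request, Response, NextFunction } from 'express';

// stats.inflight() and MAX_INFLIGHT are app-specific (not shown).
function checkLoad(req: Request, res: Response, next: NextFunction) {
  const inflight = stats.inflight();
  if (inflight >= MAX_INFLIGHT) {
    res.set('Retry-After', '2');
    return res.status(503).json({ error: 'overloaded' });
  }
  next();
}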
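checkLoad assumes a `stats.inflight()` counter exists. Below is a minimal sketch of the counting side, written as Node/Express-style `(req, res, next)` middleware. The names `inflightGuard`, `maxInflight`, and the `Res` interface are hypothetical. The subtle part is decrementing exactly once, whether the response finishes first or the connection drops first.

```typescript
// Minimal response shape this sketch relies on; Node's ServerResponse
// (and therefore Express's res) provides these members.
interface Res {
  statusCode: number;
  setHeader(name: string, value: string): unknown;
  end(body?: string): unknown;
  once(event: 'finish' | 'close', cb: () => void): unknown;
}

// Hypothetical helper: tracks in-flight requests itself and sheds new ones
// with 503 + Retry-After once `maxInflight` are already being handled.
function inflightGuard(maxInflight: number, retryAfterSec = 2) {
  let inflight = 0;

  function middleware(_req: unknown, res: Res, next: () => void): void {
    if (inflight >= maxInflight) {
      res.statusCode = 503;
      res.setHeader('Retry-After', String(retryAfterSec));
      res.end(JSON.stringify({ error: 'overloaded' }));
      return;
    }
    inflight++;
    let released = false;
    const release = () => {
      // 'finish' (response sent) and 'close' (connection gone) may both fire;
      // only the first one decrements.
      if (!released) {
        released = true;
        inflight--;
      }
    };
    res.once('finish', release);
    res.once('close', release);
    next();
  }

  return { middleware, inflight: () => inflight };
}
```

To wire it up, use something like `const guard = inflightGuard(200); app.use(guard.middleware);`, then export `guard.inflight()` as the in-flight metric.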

Adaptive concurrency (Vegas / Gradient)

class AdaptiveLimit {
  private limit = 100;
  private inflight = 0;
  private rttMin = Infinity;
  
  async run<T>(fn: () => Promise<T>): Promise<T> {
    if (this.inflight >= this.limit) throw new Error('overloaded');
    
    this.inflight++;
    const start = Date.now();
    try {
      const r = await fn();
      const rtt = Date.now() - start;
      this.rttMin = Math.min(this.rttMin, rtt);
      
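      // Latency above 2x the best RTT seen suggests queuing: cut the limit by 10%.
      // Otherwise probe upward by 1 (additive increase, multiplicative decrease).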
      if (rtt > this.rttMin * 2) {
        this.limit = Math.max(10, this.limit * 0.9);
      } else {
        this.limit = Math.min(1000, this.limit + 1);
      }
      
      return r;
    } finally {
      this.inflight--;
    }
  }
}

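→ This is the idea behind Netflix's concurrency-limits library. The sketch above is simplified in two ways. First, rttMin never decays, so one unusually fast sample can keep pushing the limit down. Second, failed calls do not shrink the limit.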
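The AdaptiveLimit above behaves more like AIMD than a true gradient. Below is a sketch of a gradient-style update, loosely modeled on the idea behind Netflix's gradient limiters. It scales the limit by the ratio of baseline RTT to current RTT, then adds a small headroom term so the limit can still grow while latency stays flat. `nextLimit` and all constants are illustrative assumptions; they are not the library's code or defaults.

```typescript
// Illustrative gradient-style limit update (not the library's actual code).
// rttNoLoad: baseline latency without queuing; rtt: recently observed latency.
function nextLimit(
  limit: number,
  rttNoLoad: number,
  rtt: number,
  opts = { queueSize: 4, smoothing: 0.2, min: 10, max: 1000, tolerance: 1.0 },
): number {
  // gradient < 1 once latency exceeds the baseline (requests are queuing);
  // clamped so one bad sample can at most halve the target.
  const gradient = Math.max(0.5, Math.min(1.0, (opts.tolerance * rttNoLoad) / rtt));
  // queueSize is headroom that lets the limit keep probing upward.
  const target = limit * gradient + opts.queueSize;
  // Move part of the way toward the target instead of jumping to it.
  const smoothed = limit * (1 - opts.smoothing) + target * opts.smoothing;
  return Math.max(opts.min, Math.min(opts.max, smoothed));
}
```

Call it once per sample or per window. In practice, rttNoLoad should be a minimum that slowly decays, so the update avoids the stuck-rttMin problem noted above.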

Token bucket (rate limiting)

class TokenBucket {
  private tokens: number;
  private lastRefill = Date.now();
  
  constructor(private capacity: number, private rate: number) {
    this.tokens = capacity;
  }
  
  consume(n: number = 1): boolean {
    const now = Date.now();
    const elapsed = (now - this.lastRefill) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsed * this.rate);
    this.lastRefill = now;
    
    if (this.tokens < n) return false;
    this.tokens -= n;
    return true;
  }
}

const bucket = new TokenBucket(100, 10);  // 100 burst, 10/s
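In an HTTP handler, a token bucket can also tell the client how long to wait. Below is a sketch of that variant, using the same capacity/rate model as TokenBucket. `RetryingTokenBucket`, `tryConsume`, and the injectable clock are hypothetical additions. The sketch assumes `n` never exceeds `capacity`; otherwise the request can never succeed.

```typescript
// Token bucket that reports how long until `n` tokens are available,
// a natural value for a Retry-After header. `now` is injectable for tests.
class RetryingTokenBucket {
  private tokens: number;
  private last: number;

  constructor(
    private capacity: number,
    private ratePerSec: number,
    private now: () => number = Date.now,
  ) {
    this.tokens = capacity;
    this.last = now();
  }

  private refill(): void {
    const t = this.now();
    const elapsedSec = (t - this.last) / 1000;
    this.tokens = Math.min(this.capacity, this.tokens + elapsedSec * this.ratePerSec);
    this.last = t;
  }

  // 0 = allowed (tokens consumed); otherwise whole seconds to wait before retrying.
  tryConsume(n = 1): number {
    this.refill();
    if (this.tokens >= n) {
      this.tokens -= n;
      return 0;
    }
    return Math.ceil((n - this.tokens) / this.ratePerSec);
  }
}
```

In Express, this becomes `const wait = bucket.tryConsume(); if (wait > 0) return res.set('Retry-After', String(wait)).status(503).json({ error: 'overloaded' });`. For per-client quotas, 429 Too Many Requests is the more specific status.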

LIFO vs FIFO queue

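FIFO (queue): old requests still get answered, but by then they may all be stale.
LIFO (stack): newest first; old requests simply time out.

Under overload, LIFO is friendlier to users: the newest request is the one a user is most likely still waiting on.

→ Envoy offers a LIFO option.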
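Below is a minimal sketch of a LIFO admission buffer, for requests parked in memory while they wait for a worker. It adds two rules, which are illustrative choices rather than any proxy's actual behavior. When the buffer is full, the oldest entry is evicted and returned so the caller can reject it. Entries older than `maxAgeMs` can be swept with `expire()`. `LifoQueue` is a hypothetical name.

```typescript
class LifoQueue<T> {
  // Bottom = oldest, top = newest (entries are pushed in arrival order).
  private stack: { item: T; at: number }[] = [];

  constructor(
    private max: number,
    private maxAgeMs: number,
    private now: () => number = Date.now,
  ) {}

  // Pushes on top; on overflow evicts the oldest entry and returns it for rejection.
  push(item: T): T | undefined {
    this.stack.push({ item, at: this.now() });
    return this.stack.length > this.max ? this.stack.shift()!.item : undefined;
  }

  // Removes and returns entries older than maxAgeMs; they sit at the bottom.
  expire(): T[] {
    const cutoff = this.now() - this.maxAgeMs;
    let i = 0;
    while (i < this.stack.length && this.stack[i].at < cutoff) i++;
    return this.stack.splice(0, i).map(e => e.item);
  }

  // Newest first.
  pop(): T | undefined {
    return this.stack.pop()?.item;
  }

  get size(): number {
    return this.stack.length;
  }
}
```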

Enforce timeouts

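class TimeoutError extends Error {}

async function withTimeout<T>(p: Promise<T>, ms: number): Promise<T> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<never>((_, reject) => {
    timer = setTimeout(() => reject(new TimeoutError(`timed out after ${ms} ms`)), ms);
  });
  try {
    return await Promise.race([p, timeout]);
  } finally {
    clearTimeout(timer);  // don't leave the timer pending once p settles
  }
}

app.get('/slow', async (req, res) => {
  try {
    const r = await withTimeout(slowQuery(), 5000);  // slowQuery(): app-specific
    res.json(r);
  } catch (e) {
    // Only a timeout maps to 504; anything else is a real server error.
    if (e instanceof TimeoutError) {
      res.status(504).json({ error: 'timeout' });
    } else {
      res.status(500).json({ error: 'internal' });
    }
  }
});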

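→ Cascades start when hung requests keep holding in-flight slots. Note that Promise.race only stops waiting: slowQuery() keeps running unless it can be cancelled.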
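If the work accepts an AbortSignal (fetch and many Node APIs do), `AbortSignal.timeout()` (Node 17.3+) gives a timeout that actually cancels it. `abortableSleep` below is a hypothetical stand-in for such work. It shows how an operation should react to the signal: stop its own work, not just resolve late. `slowEndpoint` is likewise illustrative.

```typescript
// Hypothetical stand-in for cancellable work: resolves after `ms`, or rejects
// as soon as `signal` aborts, clearing its own timer so nothing keeps running.
function abortableSleep(ms: number, signal: AbortSignal): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    if (signal.aborted) {
      reject(signal.reason);
      return;
    }
    const onAbort = () => {
      clearTimeout(timer);  // the work really stops, not just the waiting
      reject(signal.reason);
    };
    const timer = setTimeout(() => {
      signal.removeEventListener('abort', onAbort);
      resolve();
    }, ms);
    signal.addEventListener('abort', onAbort, { once: true });
  });
}

// Handler-side use: AbortSignal.timeout() both bounds the wait and cancels the work.
async function slowEndpoint(): Promise<{ status: number; body: unknown }> {
  try {
    await abortableSleep(10_000, AbortSignal.timeout(5_000));
    return { status: 200, body: { ok: true } };
  } catch (e) {
    // AbortSignal.timeout() aborts with a DOMException named 'TimeoutError'.
    if ((e as { name?: string } | null)?.name === 'TimeoutError') {
      return { status: 504, body: { error: 'timeout' } };
    }
    throw e;
  }
}
```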

Connection pool limit

import { Pool } from 'pg';

const pool = new Pool({ max: 50, connectionTimeoutMillis: 3000 });

// With 50 queries in flight, the 51st waits for a free client and is rejected after 3 s.
// → An explicit limit is explicit backpressure.

Semaphore

class Semaphore {
  private permits: number;
  private waiters: (() => void)[] = [];
  
  constructor(n: number) { this.permits = n; }
  
  async acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return;
    }
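    // Out of permits: park until release() hands one over.
    return new Promise<void>(resolve => this.waiters.push(() => resolve()));
  }

  release(): void {
    // Hand the permit directly to the oldest waiter, if there is one.
    if (this.waiters.length > 0) this.waiters.shift()!();
    else this.permits++;
  }
}

const sem = new Semaphore(10);
async function processJob(job: unknown) {
  await sem.acquire();
  try { /* ... */ } finally { sem.release(); }  // always release, even on error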
}
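Under sustained overload, the Semaphore above parks waiters in an unbounded array, which recreates the unbounded-buffer problem. Below is a minimal sketch that bounds the wait queue as well. `BoundedSemaphore` and `maxWaiters` are hypothetical names.

```typescript
// Semaphore whose wait queue is bounded too: once `maxWaiters` callers are
// already parked, acquire() rejects immediately instead of queueing more.
class BoundedSemaphore {
  private waiters: (() => void)[] = [];

  constructor(private permits: number, private maxWaiters: number) {}

  acquire(): Promise<void> {
    if (this.permits > 0) {
      this.permits--;
      return Promise.resolve();
    }
    if (this.waiters.length >= this.maxWaiters) {
      return Promise.reject(new Error('overloaded'));  // shed instead of queueing forever
    }
    return new Promise<void>(resolve => {
      this.waiters.push(() => resolve());
    });
  }

  release(): void {
    const next = this.waiters.shift();
    if (next) next();  // hand the permit straight to the oldest waiter
    else this.permits++;
  }
}
```

A rejected `acquire()` is the signal to answer 503 with Retry-After rather than wait.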

Load shedding (priority)

// currentCpuUsage(): app-specific helper (not shown) returning 0..1.
function shedLoad(priority: 'high' | 'normal' | 'low'): boolean {
  const cpu = currentCpuUsage();
  if (cpu > 0.95) return priority !== 'high';  // reject low and normal
  if (cpu > 0.85) return priority === 'low';   // reject low only
  return false;
}

app.post('/job', (req, res) => {
  if (shedLoad(req.body.priority)) {
    res.set('Retry-After', '2');
    return res.status(503).json({ error: 'shed' });
  }
  // ...
});
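In Node, event-loop delay is a commonly used overload signal that can stand in for `currentCpuUsage()`. `monitorEventLoopDelay` from `node:perf_hooks` records it as a histogram in nanoseconds. The sketch below builds on that. The thresholds, the 10 s reset window, and the names `currentLagMs` and `shouldShed` are illustrative assumptions.

```typescript
import { monitorEventLoopDelay } from 'node:perf_hooks';

type Priority = 'high' | 'normal' | 'low';

// Samples event-loop delay every 20 ms into a histogram (values in nanoseconds).
const loopDelay = monitorEventLoopDelay({ resolution: 20 });
loopDelay.enable();
// Reset periodically so the percentile reflects recent load, not all-time history.
setInterval(() => loopDelay.reset(), 10_000).unref();

function currentLagMs(): number {
  return loopDelay.percentile(99) / 1e6;  // ns → ms
}

// Same tiered policy as shedLoad, driven by p99 event-loop lag instead of CPU.
function shouldShed(priority: Priority, lagMs: number): boolean {
  if (lagMs > 200) return priority !== 'high';  // reject low and normal
  if (lagMs > 50) return priority === 'low';    // reject low only
  return false;
}
```

In a handler, call `shouldShed(priorityOf(req), currentLagMs())`, where `priorityOf` is a hypothetical helper. Deriving priority from the route or the authenticated caller is safer than trusting a client-sent `req.body.priority`.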

Stream backpressure (Node)

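import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { Transform } from 'node:stream';

// Example transform (any Transform works): counts bytes, passes data through unchanged.
let bytes = 0;
const myTransform = new Transform({
  transform(chunk: Buffer, _encoding, callback) {
    bytes += chunk.length;
    callback(null, chunk);
  },
});

await pipeline(
  createReadStream('big.txt'),
  myTransform,
  createWriteStream('out.txt'),
);
// → Backpressure is automatic: when writing is slow, reading pauses.
// pipeline also destroys every stream if any one of them fails.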

Manual stream

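import { once } from 'node:events';

const ws = res;  // HTTP response (a Writable)
for (const chunk of bigData) {
  if (!ws.write(chunk)) {
    await once(ws, 'drain');  // wait until the buffer has drained
  }
}
ws.end();

When write() returns false, the internal buffer has reached highWaterMark. Wait for the 'drain' event before writing more. If the client disconnects, 'drain' never fires, so long-running writers should also watch for 'close'.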
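Below is a self-contained version of the manual loop that works with any Node Writable (an HTTP response qualifies). `writeAll` is a hypothetical helper. Unlike the bare loop, it gives up when the stream closes instead of waiting forever for a 'drain' that will never come. An AbortController detaches whichever listener did not fire.

```typescript
import { once } from 'node:events';
import { Writable } from 'node:stream';

// Writes every chunk while honoring backpressure; throws if the stream
// closes (e.g. the client disconnected) before everything is written.
async function writeAll(ws: Writable, chunks: Iterable<string | Buffer>): Promise<void> {
  for (const chunk of chunks) {
    if (ws.destroyed) throw new Error('stream closed before all data was written');
    if (ws.write(chunk)) continue;  // buffer still below highWaterMark
    const ac = new AbortController();
    try {
      const drained = once(ws, 'drain', { signal: ac.signal }).then(() => true);
      const closed = once(ws, 'close', { signal: ac.signal }).then(() => false);
      if (!(await Promise.race([drained, closed]))) {
        throw new Error('stream closed while waiting for drain');
      }
    } finally {
      ac.abort();  // remove the listener that did not fire
    }
  }
  ws.end();
}
```

For a plain iterable source, `pipeline(Readable.from(chunks), ws)` achieves the same result with less code.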

Protect the database

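-- Postgres (server-wide defaults; ALTER SYSTEM writes postgresql.auto.conf)
ALTER SYSTEM SET statement_timeout = '30s';
ALTER SYSTEM SET idle_in_transaction_session_timeout = '60s';
ALTER SYSTEM SET lock_timeout = '5s';
SELECT pg_reload_conf();  -- apply without a restart

-- A hung query is cancelled after 30 s; a session left idle inside an open
-- transaction is terminated after 60 s; a lock wait gives up after 5 s.
-- Server-wide values also hit migrations and admin jobs;
-- ALTER ROLE <app_role> SET ... scopes them to the application.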

Circuit breaker

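class CircuitBreaker {
  private failures = 0;
  private state: 'closed' | 'open' | 'half' = 'closed';
  private nextRetry = 0;

  async run<T>(fn: () => Promise<T>): Promise<T> {
    // Half-open: a single trial call is already in flight, so fail fast.
    if (this.state === 'half') throw new Error('circuit open');
    if (this.state === 'open') {
      if (Date.now() < this.nextRetry) throw new Error('circuit open');  // still cooling down
      this.state = 'half';  // cooldown over: let exactly one trial through
    }

    try {
      const r = await fn();
      this.failures = 0;  // success closes the circuit
      this.state = 'closed';
      return r;
    } catch (e) {
      this.failures++;
      // A failed trial, or more than 5 consecutive failures, (re)opens it for 30 s.
      if (this.state === 'half' || this.failures > 5) {
        this.state = 'open';
        this.nextRetry = Date.now() + 30_000;
      }
      throw e;
    }
  }
}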

Health check (LB out of rotation)

app.get('/health', (req, res) => {
  const cpu = currentCpu();     // app-specific helpers (not shown), returning 0..1
  const memory = currentMem();
  if (cpu > 0.9 || memory > 0.9) {
    return res.status(503).end();
  }
  res.status(200).end();
});

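→ The load balancer keeps an instance in rotation only while /health returns 200; a 503 takes it out until it recovers. If every instance crosses the threshold at once, the whole pool can drop out, so choose thresholds with that in mind.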

Graceful shutdown

process.on('SIGTERM', async () => {
  server.close();              // stop accepting new connections
  await drainQueue(30_000);    // finish queued work within 30 s (app-specific helper)
  process.exit(0);
});

→ Without draining, in-flight requests and queued jobs are lost.
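On its own, `server.close()` stops new connections but still waits for open keep-alive sockets, which can stall shutdown. The sketch below bounds that wait. It requires Node 18.2+ for `closeIdleConnections()` and `closeAllConnections()`. The `shutdown` helper and `graceMs` are hypothetical.

```typescript
import { createServer, type Server } from 'node:http';

// Stop accepting connections, close idle keep-alive sockets right away, and
// give in-flight requests up to `graceMs` before force-closing the rest.
function shutdown(server: Server, graceMs: number): Promise<void> {
  return new Promise<void>(resolve => {
    const force = setTimeout(() => server.closeAllConnections(), graceMs);
    server.close(() => {  // runs once every connection has ended
      clearTimeout(force);
      resolve();
    });
    server.closeIdleConnections();  // idle keep-alive sockets would otherwise hold close() open
  });
}

// Usage sketch:
// const server = createServer(handler).listen(8080);
// process.on('SIGTERM', async () => {
//   await Promise.all([shutdown(server, 30_000), drainQueue(30_000)]);
//   process.exit(0);
// });
```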

Async / await + concurrency

import pLimit from 'p-limit';

const limit = pLimit(10);  // at most 10 concurrent
const results = await Promise.all(
  // handle(i): your async work (naming it `process` would shadow Node's global)
  items.map(i => limit(() => handle(i)))
);
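p-limit is an external dependency, but its core behavior fits in a few lines. The sketch below illustrates the technique; it is not p-limit's source, and `limitConcurrency` is a hypothetical name. At most `n` tasks run at once, and the rest start in FIFO order as slots free up.

```typescript
function limitConcurrency(n: number) {
  let active = 0;
  const queue: (() => void)[] = [];

  // Start queued tasks while there is spare capacity.
  const next = (): void => {
    while (active < n && queue.length > 0) {
      active++;
      queue.shift()!();
    }
  };

  return function run<T>(task: () => Promise<T>): Promise<T> {
    return new Promise<T>((resolve, reject) => {
      queue.push(() => {
        // Promise.resolve().then(task) also turns a synchronous throw into a rejection.
        Promise.resolve()
          .then(task)
          .then(resolve, reject)
          .finally(() => {
            active--;  // free the slot and let the next task in
            next();
          });
      });
      next();
    });
  };
}
```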

gRPC server backpressure

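In a streaming RPC, a slow-reading client triggers backpressure automatically through HTTP/2 flow control. With @grpc/grpc-js the server-side stream is a Node Writable, so the write()/'drain' pattern above applies.

Settings to tune (grpc-go exposes them as the MaxConcurrentStreams and WriteBufferSize server options):
- maxConcurrentStreams
- writeBufferSize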

Kafka consumer

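// KafkaJS: process one batch at a time and resolve offsets manually
await consumer.run({
  eachBatchAutoResolve: false,  // we mark messages done ourselves
  eachBatch: async ({ batch, resolveOffset, heartbeat, isRunning, isStale }) => {
    for (const msg of batch.messages) {
      if (!isRunning() || isStale()) break;  // stop on shutdown or rebalance
      await processSlowly(msg);
      resolveOffset(msg.offset);  // this message is done; committed with the next commit
      await heartbeat();  // without it, a slow batch exceeds the session timeout → rebalance
    }
  },
});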

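→ A Kafka consumer pulls, so a slow consumer simply fetches less often. Backpressure is built in, and the backlog waits in the log (as consumer lag) rather than in your process's memory.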

Monitoring (required)

- Inflight count
- Queue size
- p99 latency
- 503 rate
- CPU / memory
- Concurrency limit (adaptive)

→ Make backpressure activity visible.

🤔 Decision criteria

| Situation | Recommendation |
|---|---|
| Large bursts | Token bucket + 503 |
| Slow downstream | Bounded queue |
| Variable load | Adaptive concurrency |
| Streaming data | Native backpressure (drain) |
| Mixed priorities | Shed low-priority |
| Multi-tenant | Per-tenant limit |
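For the multi-tenant row, below is a sketch of a per-tenant in-flight cap with a global ceiling, so that one noisy tenant cannot take every slot. `PerTenantLimiter`, its limits, and the idea of taking `tenantId` from the authenticated caller are illustrative assumptions.

```typescript
class PerTenantLimiter {
  private inflight = new Map<string, number>();
  private total = 0;

  constructor(private perTenant: number, private globalMax: number) {}

  // Returns a release function, or null if the request should be rejected.
  tryAcquire(tenantId: string): (() => void) | null {
    const used = this.inflight.get(tenantId) ?? 0;
    if (used >= this.perTenant || this.total >= this.globalMax) return null;
    this.inflight.set(tenantId, used + 1);
    this.total++;
    let released = false;
    return () => {
      if (released) return;  // idempotent: a double release must not go negative
      released = true;
      this.total--;
      const left = (this.inflight.get(tenantId) ?? 1) - 1;
      if (left === 0) this.inflight.delete(tenantId);  // forget idle tenants
      else this.inflight.set(tenantId, left);
    };
  }
}
```

In a handler, this looks like `const release = limiter.tryAcquire(tenantOf(req)); if (!release) return res.set('Retry-After', '1').status(503).json({ error: 'overloaded' }); res.on('close', release);`. Here `tenantOf` is a hypothetical helper.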

Anti-patterns

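  • Unbounded buffer: OOM.
  • No timeouts: hung requests cascade.
  • 503 without Retry-After: clients retry immediately and finish the server off.
  • Treating all traffic equally: handle different priorities differently.
  • Ignoring stream backpressure: chunks are lost or pile up in memory.
  • Health check that always returns 200: the LB never takes the instance out.
  • No graceful shutdown: in-flight work is lost.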
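On the client side, a retry should honor Retry-After. The header may carry either delay-seconds or an HTTP-date. Below is a sketch that parses both forms. When the header is absent, it falls back to capped exponential backoff with full jitter, so that clients do not retry in lockstep. `retryDelayMs` and its defaults are hypothetical.

```typescript
// How long to wait before retrying a 503/429.
function retryDelayMs(
  attempt: number,            // 0 for the first retry
  retryAfter: string | null,  // raw Retry-After header value, if any
  now: number = Date.now(),
  baseMs = 100,
  capMs = 30_000,
): number {
  if (retryAfter !== null) {
    const v = retryAfter.trim();
    if (/^\d+$/.test(v)) return Number(v) * 1000;  // delay-seconds form
    const at = Date.parse(v);                       // HTTP-date form
    if (!Number.isNaN(at)) return Math.max(0, at - now);
  }
  // No usable header: full jitter over an exponentially growing, capped window.
  const ceiling = Math.min(capMs, baseMs * 2 ** attempt);
  return Math.random() * ceiling;
}
```

A retry loop would read `res.headers.get('retry-after')`, wait `retryDelayMs(attempt, ...)`, and give up after a fixed number of attempts.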

🤖 Hints for LLM use

  • 503 + Retry-After is the signal clients act on.
  • Adaptive concurrency stays stable as load changes.
  • Streams come with native backpressure: a big win for free.
  • Don't deploy without monitoring.

🔗 Related documents