| id | title | category | status | source_trust_level | verification_status | created_at | updated_at | tags | tech_stack | applied_in | aliases |
|---|---|---|---|---|---|---|---|---|---|---|---|
| node-streams-patterns | Node Streams — Readable / Writable / Transform / Web Streams | Coding | draft | B | conceptual | 2026-05-09 | 2026-05-09 |
Node Streams
Large files / unbounded data = streams.
pipeline() + async iterators + Web Streams are the modern approach: backpressure is handled automatically, and they are safer than the older .pipe() / event-listener style.
📖 Core concepts
- Readable: a source to read from (file, network response).
- Writable: a destination to write to.
- Transform: converts data in flight (gzip, parser).
- Async iterator: for await (const chunk of stream).
- Backpressure: when the writable is slow, the readable pauses too.
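To make the backpressure bullet concrete, here is a minimal sketch that shows the signal itself: write() returns false once the bytes buffered inside a Writable reach its highWaterMark. The function name firstBackpressureIndex and the deliberately slow sink are illustrative assumptions, not part of any library.

```typescript
import { Writable } from 'node:stream';

// Returns the index of the first write() call that reports backpressure
// (write() returned false), or -1 if none of the writes did.
// Hypothetical helper, written only for this demonstration.
function firstBackpressureIndex(
  highWaterMark: number,
  chunkSize: number,
  maxWrites: number,
): number {
  // A deliberately slow sink: each chunk is acknowledged on a later tick,
  // so bytes pile up in the internal buffer while the loop keeps writing.
  const sink = new Writable({
    highWaterMark,
    write(_chunk, _encoding, callback) {
      setImmediate(callback);
    },
  });

  let index = -1;
  for (let i = 0; i < maxWrites; i++) {
    if (!sink.write(Buffer.alloc(chunkSize))) {
      index = i; // a real producer would now wait for 'drain'
      break;
    }
  }
  sink.end(); // let the buffered chunks flush and the stream finish
  return index;
}
```

With a 10-byte highWaterMark and 4-byte chunks, the third write (index 2) is the first to report backpressure, because 12 buffered bytes exceed the limit. pipeline() and async iteration react to this signal for you.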
💻 Code patterns
pipeline (safest)
import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { createGzip } from 'node:zlib';
await pipeline(
  createReadStream('input.txt'),
  createGzip(),
  createWriteStream('output.txt.gz'),
);
// If any stage fails, every stream in the chain is destroyed and cleaned up
Async iterator (read)
import { createReadStream } from 'node:fs';
import { createInterface } from 'node:readline';
const rl = createInterface({ input: createReadStream('big.txt'), crlfDelay: Infinity });
for await (const line of rl) {
  if (line.includes('error')) console.log(line);
}
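Transform (conversion)
import { Transform } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { stdin, stdout } from 'node:process';
// Uppercases every chunk that passes through
const upper = new Transform({
  transform(chunk, _enc, cb) {
    cb(null, chunk.toString().toUpperCase());
  },
});
await pipeline(stdin, upper, stdout);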
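Creating your own Readable
import { Readable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { createWriteStream } from 'node:fs';
// An async generator is the simplest source: Readable.from() wraps it
async function* generate() {
  for (let i = 0; i < 1_000_000; i++) yield `${i}\n`;
}
const stream = Readable.from(generate());
await pipeline(stream, createWriteStream('out.txt'));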
Web Streams (modern, fetch / browser compatible)
const r = await fetch('https://example.com/big.json');
const reader = r.body!.getReader();
const decoder = new TextDecoder();
while (true) {
  const { value, done } = await reader.read();
  if (done) break;
  // handleText: placeholder for your own per-chunk handler
  handleText(decoder.decode(value, { stream: true }));
}
// Or use an async iterator instead of getReader() (Node 18+)
for await (const chunk of r.body!) { ... }
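Web ↔ Node conversion
import { Readable, Writable } from 'node:stream';
// webReadable: an existing Web ReadableStream, e.g. the body of a fetch response
const nodeReadable = Readable.fromWeb(webReadable); // Web → Node
const backToWeb = Readable.toWeb(nodeReadable); // Node → Web
// Writable.fromWeb() / Writable.toWeb() do the same for the write side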
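As a quick sanity check of the conversion in both directions, the sketch below pushes a few strings through Node → Web → Node and reassembles them. The helper name roundTrip is hypothetical, and a Node version where Readable.toWeb() and Readable.fromWeb() are available (18 or later) is assumed.

```typescript
import { Readable } from 'node:stream';

// Hypothetical helper: sends the parts through Node -> Web -> Node
// and returns the reassembled text.
async function roundTrip(parts: string[]): Promise<string> {
  // Node Readable that yields one Buffer per part
  const nodeSource = Readable.from(parts.map((p) => Buffer.from(p, 'utf8')));

  const web = Readable.toWeb(nodeSource); // Node Readable -> Web ReadableStream
  const nodeAgain = Readable.fromWeb(web); // Web ReadableStream -> Node Readable

  // Collect raw bytes first and decode once at the end, so a multi-byte
  // character can never be cut in half by a chunk boundary.
  const chunks: Buffer[] = [];
  for await (const chunk of nodeAgain) chunks.push(Buffer.from(chunk));
  return Buffer.concat(chunks).toString('utf8');
}
```

Decoding once after Buffer.concat() is a choice made for this small example; for large inputs a streaming decoder would keep memory flat.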
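Controlling backpressure manually
import type { Writable } from 'node:stream';
async function writeLines(writable: Writable, lines: AsyncIterable<string>) {
  for await (const line of lines) {
    // write() returns false once the internal buffer is full: wait for 'drain'
    if (!writable.write(line)) {
      await new Promise(r => writable.once('drain', r));
    }
  }
  writable.end();
}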
CSV streaming parse
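import { pipeline } from 'node:stream/promises';
import { createReadStream, createWriteStream } from 'node:fs';
import { parse } from 'csv-parse';
await pipeline(
  createReadStream('big.csv'),
  parse({ columns: true }), // one object per row, keyed by the header line
  // An async generator works as an inline transform stage
  async function* (rows) {
    for await (const row of rows) {
      yield JSON.stringify(row) + '\n';
    }
  },
  createWriteStream('out.ndjson'),
);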
HTTP response streaming
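import { pipeline } from 'node:stream/promises';
// Assumed to exist: app (an Express-style router) and streamFromDB() (returns a Readable)
app.get('/big', async (req, res) => {
  res.setHeader('content-type', 'application/x-ndjson');
  const data = streamFromDB();
  await pipeline(data, res); // backpressure handled automatically
});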
Error handling
try {
  await pipeline(...);
} catch (e) {
  // An error from any stream in the pipeline lands here
}
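The following sketch illustrates what that catch block receives. It builds a small three-stage pipeline whose middle stage fails on purpose, then reports the caught message and whether the source was destroyed. All names (runFailingPipeline, the 'boom at b' message) are made up for the illustration.

```typescript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';

// Hypothetical demo: the middle stage fails on the second item.
async function runFailingPipeline(): Promise<{ message: string; sourceDestroyed: boolean }> {
  const source = Readable.from(['a', 'b', 'c']);

  const failing = new Transform({
    objectMode: true,
    transform(chunk, _encoding, callback) {
      if (chunk === 'b') callback(new Error('boom at b'));
      else callback(null, chunk);
    },
  });

  // A sink that accepts and discards everything
  const sink = new Writable({
    objectMode: true,
    write(_chunk, _encoding, callback) {
      callback();
    },
  });

  try {
    await pipeline(source, failing, sink);
    return { message: '', sourceDestroyed: source.destroyed };
  } catch (err) {
    // The rejection carries the error raised by the failing stage, and by
    // this point pipeline() has destroyed the other streams as well.
    return { message: (err as Error).message, sourceDestroyed: source.destroyed };
  }
}
```

A single try/catch around pipeline() is therefore enough; no per-stream 'error' listeners are needed for cleanup.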
🤔 Decision criteria
| Situation | Recommendation |
|---|---|
| Large file read/write | createReadStream + pipeline |
| HTTP body | for await (const chunk of req) |
| Transformation | Transform or async generator |
| Browser compatibility | Web Streams |
| Memory-constrained | streams are a must |
| Small data | use string / Buffer directly |
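For the "HTTP body" row, one possible shape of the for await loop is sketched below. readBody and its maxBytes limit are assumptions added for illustration; the parameter is typed as AsyncIterable so that an http.IncomingMessage, any other Readable, or a plain async generator can be passed in.

```typescript
// Hypothetical helper: collects a request body with a size cap.
async function readBody(
  req: AsyncIterable<Buffer | string>,
  maxBytes = 1_000_000, // assumed limit, tune for your service
): Promise<string> {
  const chunks: Buffer[] = [];
  let total = 0;

  for await (const chunk of req) {
    // Chunks are Buffers unless an encoding was set on the stream
    const buf = typeof chunk === 'string' ? Buffer.from(chunk, 'utf8') : chunk;
    total += buf.length;
    if (total > maxBytes) throw new Error('body too large');
    chunks.push(buf);
  }

  // Decode once at the end so multi-byte characters stay intact
  return Buffer.concat(chunks).toString('utf8');
}
```

This buffers the whole body, so it fits the "small data" row as well; for large uploads, pipe the request into its destination with pipeline() instead of collecting it.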
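❌ Anti-patterns
- Calling .pipe(...) directly with no error handler: dangling streams.
- Reading a GB-sized file by string concatenation: OOM.
- Only on('data') in flowing mode: ignores backpressure. Use an async iterator.
- Transform without _flush while the last chunk is still buffered: part of the data is lost.
- Awaiting a paused readable as-is: hangs.
- Assuming chunks are strings: encodings can mismatch. Use setEncoding('utf8').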
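Two of the items above (a missing _flush and assumptions about string chunks) can be seen in one small example. The sketch below is a line splitter that keeps the unfinished last line in a buffer, emits it in flush(), and decodes bytes with StringDecoder so a multi-byte character split across chunks survives. createLineSplitter and splitLines are hypothetical names.

```typescript
import { Readable, Transform, Writable } from 'node:stream';
import { pipeline } from 'node:stream/promises';
import { StringDecoder } from 'node:string_decoder';

// Hypothetical Transform: bytes in, one string per line out.
function createLineSplitter(): Transform {
  const decoder = new StringDecoder('utf8');
  let tail = ''; // text after the last newline seen so far

  return new Transform({
    readableObjectMode: true,
    transform(chunk: Buffer, _encoding, callback) {
      // decoder.write() holds back an incomplete multi-byte sequence
      const parts = (tail + decoder.write(chunk)).split('\n');
      tail = parts.pop() ?? '';
      for (const line of parts) this.push(line);
      callback();
    },
    flush(callback) {
      // Without this step the final line (no trailing newline) would be lost
      tail += decoder.end();
      if (tail.length > 0) this.push(tail);
      callback();
    },
  });
}

// Hypothetical helper that runs the splitter over in-memory chunks.
async function splitLines(chunks: Buffer[]): Promise<string[]> {
  const lines: string[] = [];
  const sink = new Writable({
    objectMode: true,
    write(line, _encoding, callback) {
      lines.push(line);
      callback();
    },
  });
  await pipeline(Readable.from(chunks), createLineSplitter(), sink);
  return lines;
}
```

Removing the flush() method from the sketch makes the last line disappear whenever the input does not end with a newline, which is exactly the data loss the list warns about.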
🤖 Hints for LLM use
- The three tools: pipeline + async iterators + Web Streams.
- Always stream large data.
- Error handling = a catch around pipeline.