---
id: frontend-streams-api
title: Streams API — ReadableStream / TransformStream / pipeThrough
category: Coding
status: draft
source_trust_level: B
verification_status: conceptual
created_at: 2026-05-09
updated_at: 2026-05-09
tags: [frontend, streams, vibe-coding]
tech_stack: { language: "TS", applicable_to: ["Frontend", "Backend"] }
applied_in: []
aliases: [Streams, ReadableStream, WritableStream, TransformStream, pipeThrough, backpressure]
---

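# Streams API

> A standard API across browsers, Node, Deno, and Bun. Data flows **ReadableStream → TransformStream → WritableStream**. Typical uses: fetch streaming, SSE, AI response streaming, and file processing. Backpressure is handled automatically when streams are connected with `pipeThrough` / `pipeTo`.
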
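## 📖 Core concepts
- ReadableStream: a source you read data from.
- WritableStream: a sink you write data into.
- TransformStream: sits in the middle and transforms chunks (a writable side paired with a readable side).
- Backpressure: when the consumer is slow, the producer is paused automatically until the queue drains.
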
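To make the roles above concrete, here is a minimal sketch that wires one stream of each kind into a pipeline. It assumes a runtime that exposes the Web Streams classes as globals (modern browsers, Node 18+, Deno, Bun); `runPipeline` and the variable names are illustrative only.

```ts
async function runPipeline(): Promise<number[]> {
  const collected: number[] = [];

  // ReadableStream: the source that produces chunks.
  const source = new ReadableStream<number>({
    start(controller) {
      for (const n of [1, 2, 3]) controller.enqueue(n);
      controller.close();
    },
  });

  // TransformStream: sits in the middle and rewrites each chunk.
  const double = new TransformStream<number, number>({
    transform(chunk, controller) {
      controller.enqueue(chunk * 2);
    },
  });

  // WritableStream: the sink that consumes chunks.
  const sink = new WritableStream<number>({
    write(chunk) {
      collected.push(chunk);
    },
  });

  // pipeThrough / pipeTo connect the stages and propagate backpressure.
  await source.pipeThrough(double).pipeTo(sink);
  return collected;
}
```

Calling `await runPipeline()` should resolve to `[2, 4, 6]`.
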
## 💻 Code patterns

### Fetch streaming (browser)
```ts
const res = await fetch('/api/large');
const reader = res.body!.getReader();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // value is a Uint8Array holding the next slice of the response body
  console.log('chunk:', value.byteLength);
}
```

### Text decoder
```ts
const res = await fetch('/api/sse');
const reader = res.body!
  .pipeThrough(new TextDecoderStream()) // bytes → text, safe across chunk boundaries
  .getReader();

while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  console.log(value); // string chunks
}
```

### TransformStream (custom)
```ts
const upper = new TransformStream<string, string>({
  transform(chunk, controller) {
    controller.enqueue(chunk.toUpperCase());
  },
});

await fetch('/text')
  .then(r => r.body!)
  .then(s => s.pipeThrough(new TextDecoderStream()))
  .then(s => s.pipeThrough(upper))
  .then(s => s.pipeTo(new WritableStream({
    write(chunk) { console.log(chunk); }
  })));
```

### LLM SSE streaming (fetch)
```ts
async function* streamLLM(prompt: string) {
  const res = await fetch('/api/chat', {
    method: 'POST',
    body: JSON.stringify({ prompt }),
    headers: { 'content-type': 'application/json' },
  });
  if (!res.ok || !res.body) throw new Error(`request failed: ${res.status}`);

  const reader = res.body
    .pipeThrough(new TextDecoderStream())
    .getReader();

  let buffer = '';
  try {
    while (true) {
      const { done, value } = await reader.read();
      if (done) break;
      buffer += value;

      // An SSE event ends with a blank line; a partial event stays in the buffer.
      let idx: number;
      while ((idx = buffer.indexOf('\n\n')) >= 0) {
        const event = buffer.slice(0, idx);
        buffer = buffer.slice(idx + 2);

        if (event.startsWith('data: ')) {
          const json = event.slice(6);
          if (json === '[DONE]') return;
          yield JSON.parse(json);
        }
      }
    }
  } finally {
    // Runs on normal end, on '[DONE]', and when the consumer stops early.
    await reader.cancel().catch(() => {});
  }
}

// Usage (`chunk.text` assumes the server sends `{ text: string }` payloads)
for await (const chunk of streamLLM('Hello')) {
  console.log(chunk.text);
}
```

### Server-side streaming (Hono / Bun)
```ts
import { Hono } from 'hono';
import { stream } from 'hono/streaming';

const app = new Hono();

app.get('/stream', (c) => {
  return stream(c, async (stream) => {
    for (let i = 0; i < 100; i++) {
      await stream.writeln(`chunk ${i}`);
      await stream.sleep(100);
    }
  });
});

export default app; // Bun serves the default export
```

### Manual ReadableStream
```ts
const rs = new ReadableStream<string>({
  start(controller) {
    controller.enqueue('a');
    controller.enqueue('b');
    controller.close();
  },
});

const reader = rs.getReader();
const { value } = await reader.read(); // value === 'a'
```

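The `start()` example above enqueues everything up front. For data that is large or produced lazily, a `pull()` callback lets the stream ask for more only when its queue has room. The following is a sketch under that assumption; `countTo` and `readAll` are hypothetical helper names, not part of the Streams API.

```ts
// Hypothetical helper: a stream that produces 1..limit lazily.
function countTo(limit: number): ReadableStream<number> {
  let next = 1;
  return new ReadableStream<number>(
    {
      // pull() is called only while the internal queue is below highWaterMark,
      // so a slow reader automatically slows the producer down.
      pull(controller) {
        if (next > limit) {
          controller.close();
          return;
        }
        controller.enqueue(next++);
      },
    },
    new CountQueuingStrategy({ highWaterMark: 2 }),
  );
}

// Hypothetical helper: drain a stream into an array.
async function readAll<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader();
  const out: T[] = [];
  while (true) {
    const { done, value } = await reader.read();
    if (done) return out;
    out.push(value as T);
  }
}
```

`await readAll(countTo(5))` should resolve to `[1, 2, 3, 4, 5]`, with at most two values queued ahead of the reader at any time.
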
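### Async iterator (streams too)
```ts
let timer: ReturnType<typeof setInterval> | undefined;
const rs = new ReadableStream<number>({
  start(controller) {
    timer = setInterval(() => controller.enqueue(Date.now()), 1000);
  },
  cancel() {
    clearInterval(timer); // runs when the loop exits early (break / throw)
  },
});

// for await
for await (const v of rs) {
  console.log(v);
}
```

→ `ReadableStream` is async-iterable in Node, Deno, Bun, and recent Firefox and Chromium releases. Browser support is not universal, so check compatibility data or fall back to a `getReader()` loop.
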
### File API (browser, large file)
```ts
// Assumes a single <input type="file"> on the page; adjust the selector as needed.
const input = document.querySelector<HTMLInputElement>('input[type="file"]')!;
const file = input.files![0];
const stream = file.stream();

const reader = stream.getReader();
let bytesRead = 0;
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  bytesRead += value.byteLength;
  console.log(`progress: ${((bytesRead / file.size) * 100).toFixed(1)}%`);
}
```

→ Large files are streamed chunk by chunk instead of being loaded into memory at once.

### DecompressionStream
```ts
// For payloads stored as gzip (e.g. a .gz file). Responses sent with
// `Content-Encoding: gzip` are already decompressed by fetch.
const res = await fetch('/data.gz');
const decompressed = res.body!.pipeThrough(new DecompressionStream('gzip'));
const text = await new Response(decompressed).text();
```

### CompressionStream
```ts
const text = 'Lorem ipsum...';
const compressed = new Blob([text]).stream()
  .pipeThrough(new CompressionStream('gzip'));

// A stream body needs `duplex: 'half'`; not every browser supports streaming uploads.
await fetch('/upload', { method: 'POST', body: compressed, duplex: 'half' } as RequestInit);
```

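A quick way to convince yourself that the two compression streams are inverses is a round trip. This is a small sketch, assuming a runtime with global `Blob`, `Response`, `CompressionStream`, and `DecompressionStream` (modern browsers, Node 18+); `gzipRoundTrip` is a hypothetical helper name.

```ts
// Hypothetical helper: compress a string with gzip, then decompress it again.
async function gzipRoundTrip(text: string): Promise<string> {
  const restored = new Blob([text])
    .stream()
    .pipeThrough(new CompressionStream('gzip'))    // text bytes → gzip bytes
    .pipeThrough(new DecompressionStream('gzip')); // gzip bytes → text bytes
  // Response is a convenient way to collect a byte stream into a string.
  return new Response(restored).text();
}
```

`await gzipRoundTrip('Lorem ipsum')` should return the original string unchanged.
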
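### Web Worker + Stream (transferable)
```ts
const stream = new ReadableStream({ /* ... */ });
// The second argument is the transfer list: the stream is moved, not copied.
worker.postMessage({ stream }, [stream]);
```

→ Streams are transferable too: the stream is handed over to the worker, and the sender can no longer read from it.
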
### Cancel
```ts
const reader = stream.getReader();
// Stop reading; the reason is passed to the underlying source's cancel()
await reader.cancel('user cancelled');

// AbortController (fetch)
const ac = new AbortController();
const pending = fetch('/stream', { signal: ac.signal });
ac.abort();
await pending.catch((err) => console.log(err.name)); // "AbortError"
```

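Cancelling only helps if the underlying source reacts to it. The sketch below shows a source that releases its timer in the `cancel()` callback; `ticker` and `cancelDemo` are hypothetical names used for illustration.

```ts
// Hypothetical source: emits a timestamp every `intervalMs` until cancelled.
function ticker(intervalMs: number) {
  let timer: ReturnType<typeof setInterval> | undefined;
  let stopped = false;
  const stream = new ReadableStream<number>({
    start(controller) {
      timer = setInterval(() => controller.enqueue(Date.now()), intervalMs);
    },
    // Called when a reader cancels; release whatever the source holds.
    cancel() {
      clearInterval(timer);
      stopped = true;
    },
  });
  return { stream, isStopped: () => stopped };
}

async function cancelDemo(): Promise<boolean> {
  const { stream, isStopped } = ticker(5);
  const reader = stream.getReader();
  await reader.read();                     // wait for the first tick
  await reader.cancel('no longer needed'); // triggers the source's cancel()
  return isStopped();
}
```

`await cancelDemo()` should resolve to `true`: by the time `reader.cancel()` settles, the source has already cleaned up.
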
### Backpressure
```ts
const ws = new WritableStream({
  async write(chunk) {
    // Slow processing; `db` stands in for any slow async sink
    await db.insert(chunk);
  },
}, new CountQueuingStrategy({ highWaterMark: 10 })); // queue up to 10 chunks

await readable.pipeTo(ws);
// → automatic backpressure: when writes are slow, pipeTo stops reading from `readable`
```

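`pipeTo` applies backpressure for you, but a hand-written write loop has to do it explicitly. A minimal sketch, assuming the common pattern of awaiting `writer.ready` before each write; `writeAll` is a hypothetical helper name.

```ts
// Hypothetical helper: write every item, waiting whenever the sink's queue is full.
async function writeAll<T>(items: Iterable<T>, sink: WritableStream<T>): Promise<void> {
  const writer = sink.getWriter();
  try {
    for (const item of items) {
      // `ready` stays pending while the queue is at or above highWaterMark.
      await writer.ready;
      // Not awaiting write() lets the queue fill up to highWaterMark;
      // a failed write surfaces through `ready` or `close()` instead.
      writer.write(item).catch(() => {});
    }
    await writer.close(); // resolves once every queued chunk has been written
  } finally {
    writer.releaseLock();
  }
}
```

Skipping the `await writer.ready` line would still work functionally, but every item would be queued immediately, which is the unbounded-memory case this pattern avoids.
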
### Tee (split stream)
```ts
const [a, b] = readable.tee();

// Consume both branches and wait for both pipes to finish
await Promise.all([a.pipeTo(write1), b.pipeTo(write2)]);
```

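As a self-contained illustration of consuming both branches of a `tee()`, here is a small sketch with an in-memory source. `countChunks` and `teeDemo` are hypothetical names; the point is only that both branches are drained concurrently.

```ts
// Hypothetical helper: drain a stream and count its chunks.
async function countChunks<T>(stream: ReadableStream<T>): Promise<number> {
  const reader = stream.getReader();
  let count = 0;
  while (!(await reader.read()).done) count++;
  return count;
}

async function teeDemo(): Promise<[number, number]> {
  const source = new ReadableStream<string>({
    start(controller) {
      for (const s of ['a', 'b', 'c']) controller.enqueue(s);
      controller.close();
    },
  });

  const [left, right] = source.tee();
  // Drain both branches concurrently; a branch that is never read
  // keeps buffering every chunk the other branch pulls.
  return Promise.all([countChunks(left), countChunks(right)]);
}
```

`await teeDemo()` should resolve to `[3, 3]`, since each branch receives every chunk.
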
### Error handling
```ts
const tx = new TransformStream({
  transform(chunk, controller) {
    try {
      controller.enqueue(JSON.parse(chunk));
    } catch (e) {
      controller.error(e); // errors both sides of the transform, so downstream fails too
    }
  },
});

await readable.pipeThrough(tx).pipeTo(write).catch(err => {
  console.error('stream error:', err);
});
```

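To see the propagation end to end, the following sketch feeds one valid and one invalid JSON string through a parsing transform and reports what the final `pipeTo` promise does. It relies on the fact that throwing inside `transform()` errors the stream; `parseDemo` is a hypothetical name.

```ts
async function parseDemo(): Promise<string> {
  const source = new ReadableStream<string>({
    start(controller) {
      controller.enqueue('{"ok":true}');
      controller.enqueue('not json'); // makes JSON.parse throw
      controller.close();
    },
  });

  // Throwing inside transform() errors the transform stream.
  const parse = new TransformStream<string, unknown>({
    transform(chunk, controller) {
      controller.enqueue(JSON.parse(chunk));
    },
  });

  const sink = new WritableStream<unknown>({ write() {} });

  try {
    await source.pipeThrough(parse).pipeTo(sink);
    return 'completed';
  } catch (err) {
    // The parse failure surfaces here, at the end of the pipe chain.
    return err instanceof SyntaxError ? 'syntax error' : 'other error';
  }
}
```

Under these assumptions `await parseDemo()` should resolve to `'syntax error'`, so a single `catch` at the end of the chain is enough to observe failures from any stage.
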
### Node.js (web stream)
```ts
import { Readable } from 'node:stream';

// Node stream → Web stream
const nodeStream = Readable.from(['a', 'b']); // any Node Readable works here
const webStream = Readable.toWeb(nodeStream);

// Web stream → Node stream
const backToNode = Readable.fromWeb(webStream);
```

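A round trip makes the conversion easier to verify. This sketch assumes Node 18+ and passes `objectMode: true` to `Readable.fromWeb` so that string chunks keep their boundaries instead of being merged into buffers; `interopDemo` is a hypothetical name.

```ts
import { Readable } from 'node:stream';

async function interopDemo(): Promise<string[]> {
  const nodeSource = Readable.from(['a', 'b', 'c']); // object-mode Node stream
  const web = Readable.toWeb(nodeSource);            // Node → Web
  // objectMode keeps each chunk separate on the way back.
  const backToNode = Readable.fromWeb(web, { objectMode: true }); // Web → Node

  const out: string[] = [];
  for await (const chunk of backToNode) out.push(String(chunk));
  return out;
}
```

`await interopDemo()` should resolve to `['a', 'b', 'c']`.
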
### React UI (streaming render)
```tsx
import { useEffect, useState } from 'react';

function ChatMessage({ stream }: { stream: ReadableStream<string> }) {
  const [text, setText] = useState('');

  useEffect(() => {
    const reader = stream.getReader();
    let cancelled = false;
    (async () => {
      while (!cancelled) {
        const { done, value } = await reader.read();
        if (done) break;
        setText(t => t + value);
      }
    })().catch(console.error);
    return () => {
      cancelled = true;
      // Cancel on unmount so the source can stop producing and free resources.
      reader.cancel().catch(() => {});
    };
  }, [stream]);

  return <p>{text}</p>;
}
```

### Bun streams
```ts
const file = Bun.file('large.txt');
for await (const chunk of file.stream()) {
  // ...
}
```

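### Streaming SSR (React 19 / Next)
```tsx
// Next.js; `Spinner` and `SlowComponent` are placeholder components
import { Suspense } from 'react';

export default async function Page() {
  return (
    <Suspense fallback={<Spinner />}>
      <SlowComponent />
    </Suspense>
  );
}
```

→ The server streams the HTML: the fallback is sent first and the slow part follows once it is ready, which gives a fast first byte.
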
## 🤔 Decision guide
| Situation | Recommendation |
|---|---|
| Large fetch response | ReadableStream (`res.body`) |
| LLM streaming | TextDecoderStream + parsing |
| Large file upload | `File.stream()` |
| Transformation chain | TransformStream |
| Compress / decompress | Compression Streams API |
| Server-side streaming | Hono / Bun stream |
| Worker communication | Transferable stream |

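## ❌ Anti-patterns
- **Buffering everything in memory (`await res.text()`)**: a large response can exhaust memory. Stream it instead.
- **Ignoring backpressure in a manual write loop**: the queue grows without bound. Await `writer.ready`, or use `pipeTo`.
- **Not cancelling (for example on component unmount)**: the stream and its underlying resources leak.
- **String concatenation inside a transform**: repeatedly growing one string copies the data over and over.
- **Reading only one branch after `tee()`**: nothing blocks, but chunks pile up without limit in the unread branch's queue.
- **Swallowing errors instead of propagating them**: failures become hard to debug.
- **Mixing up Node `Buffer` / Node streams with Web Streams**: the types do not line up. Convert explicitly with `Readable.toWeb` / `Readable.fromWeb`.
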
## 🤖 LLM usage hints
- Standard across browsers, Node, Deno, and Bun.
- TextDecoderStream / CompressionStream are built in, so they come for free.
- Express complex transformations as a `pipeThrough` chain.
- Backpressure is automatic and tuned through `highWaterMark`.

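As one example of a `pipeThrough` chain built from a free built-in plus a small custom stage, the sketch below turns a byte stream of newline-delimited JSON into parsed records. `lineSplitter` and `parseNdjson` are hypothetical helpers, and the newline-delimited format is an assumption made for the example.

```ts
// Hypothetical helper: re-chunk arbitrary text chunks into complete lines.
function lineSplitter(): TransformStream<string, string> {
  let pending = '';
  return new TransformStream<string, string>({
    transform(chunk, controller) {
      const parts = (pending + chunk).split('\n');
      pending = parts.pop() ?? ''; // the last piece may be an incomplete line
      for (const line of parts) controller.enqueue(line);
    },
    flush(controller) {
      if (pending) controller.enqueue(pending); // emit the final unterminated line
    },
  });
}

// Hypothetical helper: bytes → text → lines → parsed JSON records.
async function parseNdjson(bytes: ReadableStream<Uint8Array>): Promise<unknown[]> {
  const reader = bytes
    .pipeThrough(new TextDecoderStream())
    .pipeThrough(lineSplitter())
    .getReader();

  const records: unknown[] = [];
  while (true) {
    const { done, value } = await reader.read();
    if (done) return records;
    if (value && value.trim()) records.push(JSON.parse(value));
  }
}
```

Because the splitter buffers only the current partial line, records are parsed correctly even when a network chunk ends in the middle of a JSON object.
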
## 🔗 Related documents
- [[Web_SSE_Server_Sent_Events]]
- [[AI_Streaming_LLM_Response]]
- [[Node_Streams_Patterns]]