--- id: wiki-2026-0508-탭과-싱크-taps-and-sinks title: 탭과 싱크(Taps and Sinks) category: 10_Wiki/Topics status: verified canonical_id: self aliases: [Tap, Sink, Side-effect observation, Stream sink] duplicate_of: none source_trust_level: A confidence_score: 0.85 verification_status: applied tags: [streams, reactive, effect, observability, architecture] raw_sources: [] last_reinforced: 2026-05-10 github_commit: pending tech_stack: language: TypeScript framework: Effect / RxJS --- # 탭과 싱크(Taps and Sinks) ## 매 한 줄 > **"매 stream 을 손대지 않고 들여다보는 tap, 매 stream 을 끝내며 결과를 모으는 sink."**. Reactive / functional stream architecture 의 매 두 핵심 primitive. Tap 은 매 transparent observer (logging, metrics) — 매 값 변형 X. Sink 는 매 terminal consumer — 매 stream 을 result 로 collapse. ## 매 핵심 ### 매 Tap (passive) - 매 stream 의 element 를 매 side-effect 로 관찰. - 매 element 자체는 매 unchanged 통과. - 매 logging, debugging, metrics, audit trail 의 사용. - Effect 의 `Effect.tap`, RxJS 의 `tap()`, Java Stream 의 `peek()`. ### 매 Sink (terminal) - 매 stream 을 매 single value (or unit) 로 reduce. - 매 stream 종결 시점 — 매 downstream 더 X. - Reduction (sum, count, list), I/O write, DB insert 의 형태. - Effect 의 `Sink`, Akka Streams 의 `Sink[T, M]`. ### 매 응용 1. Observability — 매 production stream 에 매 tap 으로 metrics 주입. 2. Data pipeline 종착점 — 매 sink 로 file/DB write. 3. Test instrumentation — 매 tap 으로 intermediate state 검증. 4. Backpressure 의 demand source — 매 sink 가 매 pull rate 결정. ## 💻 패턴 ### Pattern 1 — Effect.tap ```typescript import { Effect, Console } from 'effect'; const program = Effect.succeed(42).pipe( Effect.tap((n) => Console.log(`got ${n}`)), Effect.map((n) => n * 2), ); // 매 log 출력 + 매 84 반환. 매 tap 은 value 변형 X. ``` ### Pattern 2 — RxJS tap (debugging) ```typescript import { from } from 'rxjs'; import { tap, map, filter } from 'rxjs/operators'; from([1, 2, 3, 4]).pipe( tap((v) => console.log(`before filter: ${v}`)), filter((v) => v % 2 === 0), tap((v) => console.log(`after filter: ${v}`)), map((v) => v * 10), ).subscribe(console.log); // 매 pipeline 의 각 stage 에 매 invasive 한 logging. ``` ### Pattern 3 — Effect Sink (collect) ```typescript import { Stream, Sink, Effect } from 'effect'; const result = Stream.range(1, 100).pipe( Stream.run(Sink.sum), ); // Effect — 매 5050. ``` ### Pattern 4 — Custom Sink (foldLeft) ```typescript const collectSink = Sink.foldLeft( [], (acc, n) => [...acc, n * 2], ); const collected = Stream.range(1, 5).pipe( Stream.run(collectSink), ); // Effect<[2, 4, 6, 8, 10]>. ``` ### Pattern 5 — Tap for metrics ```typescript import { Metric } from 'effect'; const requestCounter = Metric.counter('http_requests'); const handler = (req: Request) => Effect.succeed(req).pipe( Effect.tap(() => Metric.increment(requestCounter)), Effect.flatMap(processRequest), ); // 매 모든 request 에 매 counter ++. 매 main flow 의 변형 X. ``` ### Pattern 6 — Sink to file (Node) ```typescript import { createWriteStream } from 'node:fs'; import { Stream, Sink } from 'effect'; const fileSink = Sink.fromWritable(() => createWriteStream('out.txt')); await Stream.range(1, 1000).pipe( Stream.map((n) => `line ${n}\n`), Stream.run(fileSink), Effect.runPromise, ); // 매 1000 lines 의 file 에 sink. ``` ### Pattern 7 — tapError (failure observation) ```typescript const fetchUser = (id: string) => Effect.tryPromise(() => fetch(`/users/${id}`)).pipe( Effect.tapError((err) => Console.error(`fetch failed: ${err}`), ), ); // 매 error path 만 관찰 — 매 success 는 매 untouched. ``` ### Pattern 8 — Sink composition ```typescript const dualSink = Sink.zip(Sink.sum, Sink.count); const [total, count] = await Stream.range(1, 10).pipe( Stream.run(dualSink), Effect.runPromise, ); // 매 single pass 의 sum + count. ``` ## 매 결정 기준 | 상황 | Tap or Sink | |---|---| | 값 변형 없이 관찰 | **Tap** | | Stream 종결 + 결과 collect | **Sink** | | 매 logging / metrics | Tap | | 매 file / DB write | Sink | | Error path 만 관찰 | tapError | | 다중 결과 동시 collect | Sink.zip | **기본값**: 매 observation = tap, 매 termination = sink. 매 tap 의 안에서 매 mutation 금지. ## 🔗 Graph - 부모: [[Reactive Streams]] - 응용: [[Backpressure]] ## 🤖 LLM 활용 **언제**: 매 stream pipeline 의 instrumentation, terminal consumer 설계. **언제 X**: 매 tap 안에서 value mutate (매 functional contract 위반). 매 sink 를 매 중간 transformation 으로 오용. ## ❌ 안티패턴 - **Tap 안 mutation**: 매 element field 의 변경 — 매 pipeline 의 referential transparency 손상. - **Sink 누락**: 매 stream 정의만 하고 매 run 안 함 — Effect 의 매 lazy 의 의해 매 nothing happens. - **다중 sink 의 sequential 처리**: 매 single pass 가능한데 매 stream 을 매 두번 traverse. - **Tap 에 heavy I/O**: 매 main flow blocking — 매 fork 또는 async sink 로 분리. ## 🧪 검증 / 중복 - Verified (Effect docs 2026, RxJS 8, Akka Streams). - 신뢰도 A. ## 🕓 Changelog | 날짜 | 변경 | |---|---| | 2026-05-08 | Phase 1 | | 2026-05-10 | Manual cleanup — Tap/Sink 의 distinction + 8 patterns |