Files
connectai/src/agent/handlePrompt/largeInputMapReduce.ts
T
koriweb 76d5fedfb5 v2.2.256: 코어 채팅 큰 입력 청킹·통합 + 실제 컨텍스트 창 정렬 + 모델 핸들 race 수정
큰 입력 시 "Failed to acquire LM Studio model handle … Operation canceled"
로 턴 전체가 죽던 문제를 3계층으로 해결. 일반 채팅(코어 경로)은 그동안
단일 예산 호출이라 약한 모델·큰 입력에서 무너졌다 — 그 갭을 메움.

- 핸들 race 수정: getModelHandle 을 재시도 루프 안으로 이동. 취소/죽은-핸들
  류 에러는 SDK 재생성 후 1회 자동 재시도(실제 사용자 취소는 존중). 라이프
  사이클의 동시 로드가 abort 되며 SDK 가 coalesce 한 JIT 조회까지 죽던 것.
- Phase 1 실제 창 정렬: llm.getContextLength()(캐시)로 실측 창에 예산 클램프.
  설정값보다 작은 창으로 로드된 경우 서버 truncation/빈 답변 차단. 배지에 표시.
- Phase 2 코어 Map-Reduce: 단일 입력이 (유효 창 × ratio) 초과 시 청크→질의
  인지형 추출→통합. 부분/전체 폴백, 무관 시 정직 신호. 동시성 기본 2.
- Phase 3 메타 노출: 진행/결과 배지 표시, [조각 k] 출처 옵트인.

신규 설정 5종. /meet·/review 전용 경로는 불변. 테스트 +25건, 전체 684 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-19 18:05:44 +09:00

266 lines
12 KiB
TypeScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
/**
* ============================================================
* Large-Input Map-Reduce (큰 입력 청킹 + 통합)
*
* 한 번에 컨텍스트 창에 안 들어가는 단일 사용자 입력(긴 회의록·리서치 덤프 등)을
* 1) 청크로 분할(Map 대상)
* 2) 각 청크에서 "요청과 관련된 사실만" 발췌 (질의 인지형 추출 — 일반 요약 X)
* 3) 발췌들을 통합(Reduce). 합본이 또 창을 넘으면 계층적으로 재통합.
* 한 뒤, 압축된 컨텍스트를 돌려줘 정상 스트리밍 경로가 최종 답변을 생성하게 한다.
*
* 신뢰성 원칙(ASTRA): 추측·창작 금지, 원문 표현 보존, 출처(`[조각 k]`) 태깅,
* 전부 무관하면 정직하게 "관련 내용 없음" 신호.
*
* LLM 호출은 `callLLM` 으로 주입 → 코어 로직은 네트워크 의존 없이 단위 테스트 가능.
* ============================================================
*/
import type { ChatMessage } from '../../agent';
import { splitIntoSections } from '../../retrieval/chunker';
export interface MapReduceConfig {
enabled: boolean;
/** 단일 입력 토큰 > (유효 창 × triggerRatio) 이면 발동. */
triggerRatio: number;
concurrency: number;
maxDepth: number;
showProvenance: boolean;
}
export interface MapReduceDeps {
/** 메시지 배열 → 모델 응답 텍스트. (callNonStreaming 래퍼) */
callLLM: (messages: ChatMessage[], maxTokens: number) => Promise<string>;
estimateTokens: (text: string) => number;
log?: (msg: string, meta?: Record<string, unknown>) => void;
signal?: AbortSignal;
}
export interface MapReduceParams {
/** 사용자 요청 의도 힌트 (보통 원본 입력의 머리/꼬리 발췌 — 지시문이 거기 있음). */
intent: string;
/** 청킹 대상이 되는 큰 본문. */
largeContent: string;
/** 유효 컨텍스트 창(토큰) — Phase 1 의 effectiveContextLength. */
windowTokens: number;
/** 시스템 프롬프트가 이미 차지한 토큰. */
systemTokens: number;
safetyMargin: number;
cfg: MapReduceConfig;
}
export interface MapReduceResult {
/** 통합된 관련 자료. 정상 경로에서 사용자 메시지 본문을 이걸로 대체. */
condensedContext: string;
chunkCount: number;
relevantCount: number;
reduceDepth: number;
/** 모든 청크가 무관 → 호출 측에서 정직한 에스컬레이션. */
allIrrelevant: boolean;
}
const IRRELEVANT_MARKER = '(관련 없음)';
/** 추출/통합 호출이 쓸 출력 토큰 상한 — 발췌는 원문보다 짧으므로 보수적으로. */
const EXTRACT_OUTPUT_TOKENS = 1024;
const REDUCE_OUTPUT_TOKENS = 2048;
/** 토큰→문자 환산(한국어 보수치 ~2자/토큰). 청크 크기 산정용. */
const CHARS_PER_TOKEN = 2;
/** 유효 창에서 입력에 쓸 수 있는 토큰 예산. computeBudgetedRequest 와 같은 공식. */
export function inputBudgetTokens(windowTokens: number, systemTokens: number, safetyMargin: number): number {
const outputReserve = Math.max(2048, Math.floor(windowTokens * 0.1));
return Math.max(256, windowTokens - systemTokens - outputReserve - safetyMargin);
}
/** 단일 입력이 map-reduce 대상인지. (cfg.enabled + 입력이 창의 triggerRatio 초과) */
export function shouldMapReduce(latestUserTokens: number, windowTokens: number, cfg: MapReduceConfig): boolean {
if (!cfg.enabled) return false;
if (windowTokens <= 0) return false;
return latestUserTokens > windowTokens * cfg.triggerRatio;
}
/** 한 청크가 (자기 + 추출 프롬프트 오버헤드 + 출력 예약)으로 창에 들어가도록 문자 상한 산정. */
export function chunkCharBudget(windowTokens: number, systemTokens: number, safetyMargin: number): number {
// 추출 프롬프트 자체 오버헤드(지시문 + intent) ~800 토큰 가정.
const promptOverhead = 800;
const perChunkTokenBudget = Math.max(
512,
windowTokens - systemTokens - safetyMargin - EXTRACT_OUTPUT_TOKENS - promptOverhead
);
// 보수적으로 70% 만 사용 (추정 오차 흡수).
return Math.floor(perChunkTokenBudget * CHARS_PER_TOKEN * 0.7);
}
function buildExtractPrompt(intent: string, chunkText: string, idx: number, total: number): ChatMessage[] {
const system = [
'너는 긴 자료에서 사용자 요청에 필요한 사실만 정확히 발췌하는 추출기다.',
'규칙:',
'1) 사용자 요청과 직접 관련된 사실·수치·발언·결정사항만 원문 표현 그대로 발췌한다.',
'2) 요약·추측·창작·일반화 금지. 자료에 없는 내용은 절대 만들지 않는다.',
`3) 이 조각에 관련 내용이 전혀 없으면 정확히 "${IRRELEVANT_MARKER}" 한 줄만 출력한다.`,
'4) 불릿(-)으로 간결하게. 각 항목은 자료에 근거해야 한다.',
].join('\n');
const user = [
`[사용자 요청 의도]\n${intent}`,
`\n[자료 조각 ${idx}/${total}]\n${chunkText}`,
`\n위 조각에서 요청 수행에 필요한 사실만 발췌하라. 없으면 "${IRRELEVANT_MARKER}".`,
].join('\n');
return [
{ role: 'system', content: system },
{ role: 'user', content: user },
];
}
function buildReducePrompt(intent: string, extractions: string): ChatMessage[] {
const system = [
'너는 여러 발췌를 중복 없이 하나로 통합하는 통합기다.',
'규칙: 발췌에 있는 사실만 유지하고, 중복은 병합한다. 추측·창작 금지.',
'원문 사실과 (있다면) [조각 k] 출처 표기를 보존한다.',
].join('\n');
const user = `[사용자 요청 의도]\n${intent}\n\n[발췌 모음]\n${extractions}\n\n위 발췌들을 요청 관점에서 중복 없이 통합하라.`;
return [
{ role: 'system', content: system },
{ role: 'user', content: user },
];
}
/** 동시성 제한 map. 순서 보존. */
async function mapWithConcurrency<T, R>(
items: T[],
limit: number,
fn: (item: T, index: number) => Promise<R>,
signal?: AbortSignal,
): Promise<R[]> {
const results: R[] = new Array(items.length);
let next = 0;
const n = Math.max(1, Math.min(limit, items.length));
const workers = Array.from({ length: n }, async () => {
while (true) {
if (signal?.aborted) return;
const i = next++;
if (i >= items.length) return;
results[i] = await fn(items[i], i);
}
});
await Promise.all(workers);
return results;
}
function isIrrelevant(text: string): boolean {
const t = (text || '').trim();
return t.length === 0 || t === IRRELEVANT_MARKER || /^\(?\s*관련\s*없음\s*\)?$/.test(t);
}
/**
* 큰 입력을 청크→추출→통합한다. 호출 측은 trigger 를 이미 통과시킨 뒤 호출한다고 가정하지만,
* 방어적으로 단일 청크면 추출만 하고 통합은 건너뛴다.
*/
export async function runMapReduce(deps: MapReduceDeps, params: MapReduceParams): Promise<MapReduceResult> {
const { intent, largeContent, windowTokens, systemTokens, safetyMargin, cfg } = params;
const log = deps.log ?? (() => {});
const targetChars = chunkCharBudget(windowTokens, systemTokens, safetyMargin);
const sections = splitIntoSections(largeContent, {
targetChars,
maxChars: targetChars * 2,
});
const chunks = sections.map((s) => s.text);
log('Map-reduce: split large input into chunks.', { chunkCount: chunks.length, targetChars });
// ── Map: 각 청크 → 질의 인지형 추출 ──────────────────────────────────
const extracted = await mapWithConcurrency(
chunks,
cfg.concurrency,
async (chunk, i) => {
if (deps.signal?.aborted) return '';
try {
const text = await deps.callLLM(
buildExtractPrompt(intent, chunk, i + 1, chunks.length),
EXTRACT_OUTPUT_TOKENS,
);
return text ?? '';
} catch (e: any) {
// 한 청크 실패가 전체를 막지 않게 — 원문 일부로 폴백(빈손보다 낫다).
log('Map-reduce: chunk extraction failed — falling back to truncated raw.', { chunk: i + 1, error: e?.message ?? String(e) });
return chunk.slice(0, targetChars);
}
},
deps.signal,
);
const relevant: string[] = [];
extracted.forEach((text, i) => {
if (isIrrelevant(text)) return;
relevant.push(cfg.showProvenance ? `[조각 ${i + 1}]\n${text.trim()}` : text.trim());
});
if (relevant.length === 0) {
log('Map-reduce: every chunk was irrelevant.', { chunkCount: chunks.length });
return { condensedContext: '', chunkCount: chunks.length, relevantCount: 0, reduceDepth: 0, allIrrelevant: true };
}
// ── Reduce: 합본이 입력 예산에 들어갈 때까지 계층적으로 통합 ──────────
const budget = inputBudgetTokens(windowTokens, systemTokens, safetyMargin);
// intent 분량 + 헤더 여유를 위해 예산의 80% 를 컨텍스트 상한으로.
const contextCeiling = Math.floor(budget * 0.8);
let current = relevant;
let depth = 0;
while (depth < cfg.maxDepth) {
const joined = current.join('\n\n');
if (deps.estimateTokens(joined) <= contextCeiling) break;
// 그룹으로 묶어 각 그룹을 통합 → 개수 감소.
const groups = groupToFit(current, deps.estimateTokens, contextCeiling);
if (groups.length >= current.length) break; // 더 못 줄임 — 마지막에 잘림 처리
log('Map-reduce: hierarchical reduce round.', { depth: depth + 1, from: current.length, to: groups.length });
current = await mapWithConcurrency(
groups,
cfg.concurrency,
async (group) => {
if (deps.signal?.aborted) return group.join('\n\n');
try {
return await deps.callLLM(buildReducePrompt(intent, group.join('\n\n')), REDUCE_OUTPUT_TOKENS);
} catch {
return group.join('\n\n'); // 통합 실패 → 원본 그룹 유지
}
},
deps.signal,
);
depth++;
}
let condensed = current.join('\n\n');
// maxDepth 도달했는데도 넘치면 하드 트렁케이트(서버 overflow 방지) + 경고는 호출 측에서.
if (deps.estimateTokens(condensed) > contextCeiling) {
const charCeiling = contextCeiling * CHARS_PER_TOKEN;
condensed = condensed.slice(0, charCeiling) + '\n\n[…자료가 많아 일부 생략됨]';
log('Map-reduce: reduce hit max depth and was hard-truncated.', { maxDepth: cfg.maxDepth });
}
return {
condensedContext: condensed,
chunkCount: chunks.length,
relevantCount: relevant.length,
reduceDepth: depth,
allIrrelevant: false,
};
}
/** 항목들을 순서대로 누적해 ceiling 을 넘기 직전까지 한 그룹으로 묶는다. */
function groupToFit(items: string[], estimate: (s: string) => number, ceiling: number): string[][] {
const groups: string[][] = [];
let cur: string[] = [];
let curTokens = 0;
for (const item of items) {
const t = estimate(item);
if (cur.length > 0 && curTokens + t > ceiling) {
groups.push(cur);
cur = [];
curTokens = 0;
}
cur.push(item);
curTokens += t;
}
if (cur.length > 0) groups.push(cur);
return groups;
}