Files
connectai/src/lmstudio/streamer.ts
T
koriweb 76d5fedfb5 v2.2.256: 코어 채팅 큰 입력 청킹·통합 + 실제 컨텍스트 창 정렬 + 모델 핸들 race 수정
큰 입력 시 "Failed to acquire LM Studio model handle … Operation canceled"
로 턴 전체가 죽던 문제를 3계층으로 해결. 일반 채팅(코어 경로)은 그동안
단일 예산 호출이라 약한 모델·큰 입력에서 무너졌다 — 그 갭을 메움.

- 핸들 race 수정: getModelHandle 을 재시도 루프 안으로 이동. 취소/죽은-핸들
  류 에러는 SDK 재생성 후 1회 자동 재시도(실제 사용자 취소는 존중). 라이프
  사이클의 동시 로드가 abort 되며 SDK 가 coalesce 한 JIT 조회까지 죽던 것.
- Phase 1 실제 창 정렬: llm.getContextLength()(캐시)로 실측 창에 예산 클램프.
  설정값보다 작은 창으로 로드된 경우 서버 truncation/빈 답변 차단. 배지에 표시.
- Phase 2 코어 Map-Reduce: 단일 입력이 (유효 창 × ratio) 초과 시 청크→질의
  인지형 추출→통합. 부분/전체 폴백, 무관 시 정직 신호. 동시성 기본 2.
- Phase 3 메타 노출: 진행/결과 배지 표시, [조각 k] 출처 옵트인.

신규 설정 5종. /meet·/review 전용 경로는 불변. 테스트 +25건, 전체 684 통과.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-19 18:05:44 +09:00

302 lines
16 KiB
TypeScript

import type { ILMStudioClient } from './client';
import { LMStudioLifecycleError } from './client';
import { logError, logInfo } from '../utils';
/** One chat turn sent to the model. */
export interface ChatStreamMessage {
  role: 'user' | 'assistant' | 'system';
  content: string;
}
/** Shared sampling block. SDK and REST paths both read this — keep them in sync. */
export interface LmStudioSampling {
  topP?: number;
  topK?: number;
  minP?: number;
  repeatPenalty?: number;
}
/**
 * Translate the sampling block into the OpenAI-compatible REST body extension that LM Studio
 * understands. Ollama uses the same field names inside `options`. Returns an object you can
 * spread into either body.
 *
 * Only values inside the accepted range are emitted; everything else is dropped so the engine
 * falls back to its own default instead of effectively disabling sampling:
 * - `top_p`, `min_p`: in (0, 1]
 * - `top_k`: > 0
 * - `repeat_penalty`: > 1
 * Non-finite values (NaN, ±Infinity) are always dropped: `JSON.stringify` would serialize them
 * as `null`, sending an explicit invalid value instead of omitting the field.
 *
 * @param s Sampling settings; `undefined` yields an empty object.
 * @returns Snake-case sampling fields ready to spread into a request body.
 */
export function samplingToRestBody(s: LmStudioSampling | undefined): Record<string, number> {
  const out: Record<string, number> = {};
  if (!s) return out;
  const isFiniteNumber = (v: number | undefined): v is number =>
    typeof v === 'number' && Number.isFinite(v);
  if (isFiniteNumber(s.topP) && s.topP > 0 && s.topP <= 1) out.top_p = s.topP;
  if (isFiniteNumber(s.topK) && s.topK > 0) out.top_k = s.topK;
  if (isFiniteNumber(s.minP) && s.minP > 0 && s.minP <= 1) out.min_p = s.minP;
  if (isFiniteNumber(s.repeatPenalty) && s.repeatPenalty > 1) out.repeat_penalty = s.repeatPenalty;
  return out;
}
/** One chat-completion request handed to {@link IChatStreamer.stream}. */
export interface ChatStreamRequest {
  /** Model identifier to stream from. `LMStudioStreamer.stream` trims it and rejects blank values. */
  modelName: string;
  /** Conversation messages passed to the model in order. */
  messages: ChatStreamMessage[];
  /** Sampling temperature, forwarded as-is. */
  temperature: number;
  /** Upper bound on tokens to generate. Omit to fall back to a conservative default (`LMStudioStreamer` uses 4096). */
  maxTokens?: number;
  /**
   * LM Studio context-overflow safety net used only if the prompt still exceeds the window.
   * `LMStudioStreamer` uses `'stopAtLimit'` when this is omitted.
   */
  contextOverflowPolicy?: 'stopAtLimit' | 'truncateMiddle' | 'rollingWindow';
  /**
   * Sampling overrides. Each is omitted from the SDK call when undefined; `topK`/`minP` are also
   * omitted when <= 0 and `repeatPenalty` when <= 1, so the engine default applies.
   * NOTE(review): the small-model glitch-suppression preset values presumably come from the
   * caller's settings, not from this module — confirm.
   */
  topP?: number;
  topK?: number;
  minP?: number;
  repeatPenalty?: number;
  /** Draft model key for speculative decoding. Empty/undefined disables. */
  draftModel?: string;
  /** Cancels the request; `LMStudioStreamer` also forwards the abort to `prediction.cancel()`. */
  signal?: AbortSignal;
}
/**
 * Subset of LM Studio's `PredictionResult.stats` we expose to callers. Every field is optional
 * because it is only set when the SDK reports a value of the expected type.
 */
export interface ChatStreamStats {
  /** From `stats.tokensPerSecond`. */
  tokensPerSecond?: number;
  /** From `stats.timeToFirstTokenSec`. */
  timeToFirstTokenSec?: number;
  /** From `stats.predictedTokensCount`. */
  predictedTokensCount?: number;
  /** From `stats.promptTokensCount`. */
  promptTokensCount?: number;
  /** From `stats.totalTimeSec`. */
  totalTimeSec?: number;
  /** Speculative decoding (only set when `draftModel` was used). Mapped from `stats.usedDraftModelKey`. */
  draftModelKey?: string;
  /** Mapped from `stats.totalDraftTokensCount`. */
  draftTokensCount?: number;
  /** From `stats.acceptedDraftTokensCount`. */
  acceptedDraftTokensCount?: number;
}
/**
 * One stream event. `token` carries generated text (possibly empty for the final event);
 * `stopReason` is set on the *last* event only and is the SDK's `stats.stopReason`
 * (e.g. `eosFound`, `maxPredictedTokensReached`, `contextLengthReached`, `userStopped`).
 * `stats` is also set on the *last* event when LM Studio reports prediction stats.
 */
export interface ChatStreamEvent {
  token: string;
  stopReason?: string;
  stats?: ChatStreamStats;
}
/** Contract for anything that can stream chat completions from a locally served model. */
export interface IChatStreamer {
  /** Token-level streaming for an LM Studio chat completion via the WebSocket SDK. */
  stream(req: ChatStreamRequest): AsyncIterable<ChatStreamEvent>;
  /**
   * Drop the SDK's cached handle for `modelName`. Callers invoke this when
   * the previous stream returned zero tokens with no error — a symptom of a
   * silently-disposed handle that needs a fresh WebSocket round-trip.
   */
  resetHandle?(modelName: string): Promise<void>;
  /**
   * The model's actually-loaded context window in tokens, or `undefined` if
   * unavailable. Callers use this to budget against the real ceiling instead
   * of the user's `contextLength` setting. Best-effort — never throws.
   */
  getModelContextLength?(modelName: string): Promise<number | undefined>;
}
/**
 * Adapter that streams LM Studio chat completions via @lmstudio/sdk's `model.respond()`,
 * replacing the manual fetch + SSE parser path used for the OpenAI-compatible REST endpoint.
 *
 * Benefits over the REST path:
 * - No SSE parsing (no `data: [DONE]` / partial-chunk fragility).
 * - Reuses the same WebSocket the lifecycle manager already opened — handle lookup is cheap
 *   if the model is already loaded, and load-on-first-use is implicit when it isn't.
 * - First-class `signal` support for user-cancel and abort propagation.
 */
export class LMStudioStreamer implements IChatStreamer {
  /**
   * Error-message patterns meaning the SDK handle / WebSocket binding is dead, or its in-flight
   * (coalesced) load was canceled out from under us. Hoisted so they are built once.
   * None use the `g` flag, so `RegExp.test` stays stateless across calls.
   */
  private static readonly TRANSIENT_HANDLE_ERROR_PATTERNS: readonly RegExp[] = [
    /\bdisposed\b/i,
    /lock\(\) request could not be registered/i,
    /channel\s+closed/i,
    /WebSocket\s+(?:is\s+not\s+open|closed|disconnected)/i,
    /Connection\s+(?:lost|reset|closed)/i,
    /\bECONNRESET\b/i,
    /socket\s+hang\s*up/i,
    // The lifecycle manager's load got superseded/aborted and the SDK
    // coalesced our JIT model() lookup into that canceled load.
    /\boperation\s+cancell?ed\b/i,
  ];

  constructor(private readonly client: ILMStudioClient) {}

  /**
   * Stream one chat completion from LM Studio.
   *
   * Yields one event per non-empty text fragment, then a final event with `token: ''` that
   * carries `stopReason` / `stats` when the SDK reports them. When `req.signal` aborts, the
   * generator returns quietly without a final event.
   *
   * @throws LMStudioLifecycleError if `req.modelName` is blank.
   * @throws The SDK error from handle acquisition or the prediction when it is not a transient
   *   handle error, when tokens were already streamed, or when the single retry fails as well.
   */
  async *stream(req: ChatStreamRequest): AsyncIterable<ChatStreamEvent> {
    const trimmedModel = (req.modelName || '').trim();
    if (!trimmedModel) {
      throw new LMStudioLifecycleError('LMStudioStreamer.stream called without a model name.');
    }
    // One automatic retry path: when the first attempt blows up with a
    // "Model is disposed!" / "lock() request could not be registered"
    // error before any tokens have been yielded, we drop the cached SDK
    // handle and try once more. These errors are caused by a previous
    // aborted prediction leaving the SDK's internal handle map pointing
    // at a dead WebSocket binding — a fresh client.model() lookup minted
    // from a recreated SDK fixes it. We only retry when zero tokens have
    // streamed: if the consumer already saw partial output, restarting
    // would duplicate tokens.
    for (let attempt = 1; attempt <= 2; attempt++) {
      // A user cancel that landed before/between attempts (e.g. while awaiting the previous
      // prediction's result) must never trigger a retry — that would recreate the SDK client
      // (and possibly JIT-load the model) for a request nobody wants anymore.
      if (req.signal?.aborted) return;
      const refresh = attempt > 1;
      // Handle acquisition is guarded on its own: without this an "Operation canceled"
      // (the lifecycle manager's concurrent load for this same model was
      // superseded/aborted and the SDK coalesced our JIT lookup into that
      // dead load), a disposed handle, or a dropped WebSocket would crash
      // the whole turn with no retry. Large inputs make this far more
      // likely: loading a big model to hold a large prompt is slow, which
      // widens the window for a concurrent switch/abort to land mid-load.
      let model: Awaited<ReturnType<ILMStudioClient['getModelHandle']>>;
      try {
        model = await this.client.getModelHandle(trimmedModel, refresh ? { refresh: true } : undefined);
      } catch (acqErr: any) {
        // Genuine user cancel — don't retry, just stop quietly.
        if (req.signal?.aborted || acqErr?.name === 'AbortError') return;
        const acqMsg = String(acqErr?.message ?? acqErr);
        if (this.isTransientHandleError(acqMsg) && attempt === 1) {
          logInfo('LM Studio model handle acquisition hit a transient error — retrying with a fresh SDK.', { model: trimmedModel, error: acqMsg });
          continue; // attempt 2 passes { refresh: true } → recreates the SDK client
        }
        logError('LM Studio model handle acquisition failed.', { model: trimmedModel, error: acqMsg, attempt });
        throw acqErr;
      }
      logInfo('LM Studio SDK chat stream started.', { model: trimmedModel, messageCount: req.messages.length, attempt });
      const respondOpts = this.buildRespondOptions(req);

      // Bridge AbortSignal → prediction.cancel(): without this, an
      // aborted request keeps generating on the LM Studio server. The
      // orphaned prediction holds locks on the model handle, which is
      // a known cause of "lock() request could not be registered" on
      // the very next request — the reused handle is still bound to a
      // dead prediction.
      let prediction: any;
      const onAbort = () => {
        try { prediction?.cancel?.(); } catch { /* swallow — best effort */ }
      };
      let yielded = 0;
      let drained = false;
      // Wrapper object rather than a nullable sentinel so a falsy thrown value still counts as a failure.
      let failure: { error: any } | undefined = undefined;
      try {
        // Inside the try so a synchronous throw from respond() (e.g. a disposed handle) goes
        // through the same classification/retry logic as an error raised mid-stream.
        prediction = (model as any).respond(req.messages, respondOpts);
        if (req.signal) {
          if (req.signal.aborted) onAbort();
          else req.signal.addEventListener('abort', onAbort, { once: true });
        }
        for await (const fragment of prediction as AsyncIterable<{ content: string }>) {
          if (req.signal?.aborted) return;
          const token = fragment?.content ?? '';
          if (token) {
            yielded++;
            yield { token };
          }
        }
        drained = true;
      } catch (err: any) {
        if (req.signal?.aborted) return;
        if (err?.name === 'AbortError') return;
        failure = { error: err };
      } finally {
        req.signal?.removeEventListener?.('abort', onAbort);
        // Consumer stopped iterating early (break/return on this generator) or we returned on
        // abort: make sure the server-side prediction doesn't keep running and holding locks.
        if (!drained && !failure) onAbort();
      }

      if (!failure) {
        if (req.signal?.aborted) return;
        // The prediction object is also a Promise<PredictionResult>; awaiting it after
        // the stream drains gives us stats.stopReason so callers can tell a truncated
        // answer (maxPredictedTokensReached / contextLengthReached) from a normal one,
        // plus throughput numbers (tok/s, TTFT) we surface to the UI.
        let stopReason: string | undefined;
        let stats: ChatStreamEvent['stats'];
        try {
          const result: any = await prediction;
          stopReason = result?.stats?.stopReason;
          if (result?.stats) stats = this.extractStats(result.stats);
          if (stopReason || stats) {
            logInfo('LM Studio SDK chat stream finished.', {
              model: trimmedModel, stopReason, tokensYielded: yielded,
              tokensPerSecond: stats?.tokensPerSecond, ttftSec: stats?.timeToFirstTokenSec,
            });
          }
        } catch { /* result unavailable on some SDK versions — non-fatal */ }
        // The await above may have been interrupted by a user cancel; don't retry or emit then.
        if (req.signal?.aborted) return;
        // Empty-but-clean stream is treated like a dead handle on attempt 1:
        // recreate the SDK and try once more. Same root cause (handle bound to
        // a stale prediction) but no exception is thrown — just an empty stream.
        if (yielded === 0 && attempt === 1) {
          logInfo('Empty SDK stream with no error — retrying with a fresh SDK.', { model: trimmedModel });
          continue;
        }
        // Don't claim `eosFound` if we couldn't actually read the stop reason — leave it
        // undefined so the caller treats it as 'unknown' (and its mid-sentence heuristics kick in).
        yield { token: '', stopReason, stats };
        return;
      }

      const errMsg = String(failure.error?.message ?? failure.error);
      if (this.isTransientHandleError(errMsg) && yielded === 0 && attempt === 1) {
        logInfo('Dead LM Studio handle detected — retrying with a fresh SDK.', { model: trimmedModel, error: errMsg });
        continue;
      }
      logError('LM Studio SDK chat stream failed.', { model: trimmedModel, error: errMsg, attempt });
      throw failure.error;
    }
  }

  /**
   * Build the options object passed to `model.respond()`.
   *
   * Sampling defaults historically follow the glitch-suppression preset for small / quantized
   * models (it prevents garbled Korean/Hangul tokens), and callers can override them per call.
   * Sampling values follow the same range rules as the REST path (`samplingToRestBody`):
   * `topP`/`minP` in (0, 1], `topK` > 0, `repeatPenalty` > 1. Non-finite values are dropped as well.
   * Anything out of range is omitted so the engine default applies, rather than a value that
   * effectively disables sampling.
   */
  private buildRespondOptions(req: ChatStreamRequest): Record<string, unknown> {
    const opts: Record<string, unknown> = {
      temperature: req.temperature,
      maxTokens: req.maxTokens ?? 4096,
      // Safety net: if our own token budgeting still underestimated and the prompt
      // exceeds the model's context window, decide whether the SDK should fail
      // loudly (stopAtLimit — default) or silently drop content.
      contextOverflowPolicy: req.contextOverflowPolicy ?? 'stopAtLimit',
      signal: req.signal,
    };
    const isFiniteNumber = (v: number | undefined): v is number =>
      typeof v === 'number' && Number.isFinite(v);
    if (isFiniteNumber(req.topP) && req.topP > 0 && req.topP <= 1) opts.topPSampling = req.topP;
    if (isFiniteNumber(req.topK) && req.topK > 0) opts.topKSampling = req.topK;
    if (isFiniteNumber(req.minP) && req.minP > 0 && req.minP <= 1) opts.minPSampling = req.minP;
    if (isFiniteNumber(req.repeatPenalty) && req.repeatPenalty > 1) opts.repeatPenalty = req.repeatPenalty;
    // Speculative decoding — LM Studio loads the draft model lazily on first use if needed
    // (we also `preloadDraftModel` after main load to avoid that cold cost).
    const draft = req.draftModel?.trim();
    if (draft) opts.draftModel = draft;
    return opts;
  }

  /**
   * Copy the fields we expose from the SDK's loosely typed `PredictionResult.stats`, keeping only
   * values of the expected primitive type (everything else becomes `undefined`).
   */
  private extractStats(s: any): ChatStreamEvent['stats'] {
    const num = (v: unknown): number | undefined => (typeof v === 'number' ? v : undefined);
    return {
      tokensPerSecond: num(s.tokensPerSecond),
      timeToFirstTokenSec: num(s.timeToFirstTokenSec),
      predictedTokensCount: num(s.predictedTokensCount),
      promptTokensCount: num(s.promptTokensCount),
      totalTimeSec: num(s.totalTimeSec),
      draftModelKey: typeof s.usedDraftModelKey === 'string' ? s.usedDraftModelKey : undefined,
      draftTokensCount: num(s.totalDraftTokensCount),
      acceptedDraftTokensCount: num(s.acceptedDraftTokensCount),
    };
  }

  /**
   * True when an error message indicates the SDK handle / WebSocket binding is
   * dead, or its in-flight (coalesced) load was canceled out from under us.
   * Recreating the SDK client fixes all of these, because the next `llm.model()` lookup
   * then mints a fresh handle. Genuine user aborts are deliberately excluded. They are
   * caught earlier via `req.signal.aborted` / `AbortError` and never reach this check.
   */
  private isTransientHandleError(errMsg: string): boolean {
    return LMStudioStreamer.TRANSIENT_HANDLE_ERROR_PATTERNS.some((pattern) => pattern.test(errMsg));
  }

  /**
   * The model's actually-loaded context window in tokens, or `undefined` when the name is blank
   * or the client lookup fails. Never throws.
   */
  async getModelContextLength(modelName: string): Promise<number | undefined> {
    const trimmed = (modelName || '').trim();
    if (!trimmed) return undefined;
    try {
      return await this.client.getModelContextLength(trimmed);
    } catch {
      return undefined; // best-effort — caller falls back to the configured window
    }
  }

  /** Force a fresh SDK handle for `modelName` (no-op for a blank name). Failures are logged, not thrown. */
  async resetHandle(modelName: string): Promise<void> {
    const trimmed = (modelName || '').trim();
    if (!trimmed) return;
    try {
      await this.client.getModelHandle(trimmed, { refresh: true });
    } catch (err: any) {
      // Best effort — caller will see the next stream() attempt fail
      // with a normal error path if the refresh itself was broken.
      logError('LM Studio handle reset failed.', { model: trimmed, error: err?.message ?? String(err) });
    }
  }
}