Files
connectai/src/lmstudio/streamer.ts
T

163 lines
7.6 KiB
TypeScript

import type { ILMStudioClient } from './client';
import { LMStudioLifecycleError } from './client';
import { logError, logInfo } from '../utils';
/** One chat turn in a streamed chat request. */
export interface ChatStreamMessage {
  /** Text of the turn. */
  content: string;
  /** Who authored the turn. */
  role: 'user' | 'assistant' | 'system';
}
/** Everything needed to start one streamed chat completion. */
export interface ChatStreamRequest {
  /** Conversation turns sent to the model. */
  messages: ChatStreamMessage[];
  /** Identifier of the model to stream from. */
  modelName: string;
  /** Sampling temperature forwarded to the model. */
  temperature: number;
  /** Aborting this signal cancels the stream. */
  signal?: AbortSignal;
  /** Upper bound on tokens to generate. Omit to fall back to a conservative default. */
  maxTokens?: number;
  /** LM Studio context-overflow safety net used only if the prompt still exceeds the window. */
  contextOverflowPolicy?: 'stopAtLimit' | 'truncateMiddle' | 'rollingWindow';
}
/**
 * A single item produced by a chat stream.
 *
 * Intermediate events carry generated text in `token`. The terminating event may have
 * an empty `token` and is the only one that sets `stopReason`, which mirrors the SDK's
 * `stats.stopReason` (for example `eosFound`, `maxPredictedTokensReached`,
 * `contextLengthReached`, `userStopped`).
 */
export interface ChatStreamEvent {
  /** Only present on the final event; SDK stop reason for the prediction. */
  stopReason?: string;
  /** Generated text; may be empty on the final event. */
  token: string;
}
/** Contract for anything that can stream chat completions token by token. */
export interface IChatStreamer {
  /**
   * Optional: discard the cached SDK handle for `modelName`. Callers use this after a
   * stream finished with zero tokens and no error, which indicates a handle that was
   * silently disposed and needs a fresh WebSocket round-trip.
   */
  resetHandle?(modelName: string): Promise<void>;
  /** Streams an LM Studio chat completion via the WebSocket SDK, one token per event. */
  stream(req: ChatStreamRequest): AsyncIterable<ChatStreamEvent>;
}
/**
 * Adapter that streams LM Studio chat completions via @lmstudio/sdk's `model.respond()`,
 * replacing the manual fetch + SSE parser path used for the OpenAI-compatible REST endpoint.
 *
 * Benefits over the REST path:
 * - No SSE parsing (no `data: [DONE]` / partial-chunk fragility).
 * - Reuses the same WebSocket the lifecycle manager already opened — handle lookup is cheap
 *   if the model is already loaded, and load-on-first-use is implicit when it isn't.
 * - First-class `signal` support for user-cancel and abort propagation.
 *
 * Cancellation: the underlying prediction is cancelled both when `req.signal` aborts and
 * when the consumer stops iterating early (`break` / `return` out of its `for await`),
 * so an abandoned stream does not keep generating on the server.
 */
export class LMStudioStreamer implements IChatStreamer {
  /** `maxTokens` sent to the SDK when the request omits it. */
  private static readonly DEFAULT_MAX_TOKENS = 4096;

  /** The first attempt plus one retry after a dead-handle error. */
  private static readonly MAX_ATTEMPTS = 2;

  /**
   * Error-message patterns meaning the SDK's cached model handle is unusable
   * ("Model is disposed!" / "lock() request could not be registered").
   */
  private static readonly DEAD_HANDLE_PATTERNS: readonly RegExp[] = [
    /\bdisposed\b/i,
    /lock\(\) request could not be registered/i,
  ];

  constructor(private readonly client: ILMStudioClient) {}

  /**
   * Streams one chat completion from LM Studio, token by token.
   *
   * Yields `{ token }` for every non-empty fragment. Only when the prediction drains
   * without error and without abort, it then yields a final `{ token: '', stopReason }`
   * event. `stopReason` is the SDK's `stats.stopReason`, or `undefined` when it cannot
   * be read. Returns without a final event when `req.signal` aborts.
   *
   * @param req Model, messages and sampling options. `maxTokens` defaults to 4096 and
   *   `contextOverflowPolicy` to `'stopAtLimit'`.
   * @throws LMStudioLifecycleError when `req.modelName` is empty or whitespace.
   * @throws Whatever `client.getModelHandle` throws.
   * @throws The SDK's error when the prediction fails and the dead-handle retry does not apply.
   */
  async *stream(req: ChatStreamRequest): AsyncIterable<ChatStreamEvent> {
    const trimmedModel = (req.modelName || '').trim();
    if (!trimmedModel) {
      throw new LMStudioLifecycleError('LMStudioStreamer.stream called without a model name.');
    }
    // One automatic retry path: when the first attempt blows up with a
    // "Model is disposed!" / "lock() request could not be registered"
    // error before any tokens have been yielded, we drop the cached SDK
    // handle and try once more. These errors are caused by a previous
    // aborted prediction leaving the SDK's internal handle map pointing
    // at a dead WebSocket binding — a fresh client.model() lookup minted
    // from a recreated SDK fixes it. We only retry when zero tokens have
    // streamed: if the consumer already saw partial output, restarting
    // would duplicate tokens.
    for (let attempt = 1; attempt <= LMStudioStreamer.MAX_ATTEMPTS; attempt++) {
      const refresh = attempt > 1;
      const model = await this.client.getModelHandle(trimmedModel, refresh ? { refresh: true } : undefined);
      logInfo('LM Studio SDK chat stream started.', { model: trimmedModel, messageCount: req.messages.length, attempt });

      // The SDK prediction is used untyped here: async-iterable of `{ content }` fragments,
      // thenable for the final result, and exposing `cancel()`.
      // eslint-disable-next-line @typescript-eslint/no-explicit-any
      let prediction: any;
      // Bridge AbortSignal → prediction.cancel(): without this, an aborted request keeps
      // generating on the LM Studio server. The orphaned prediction holds locks on the
      // model handle, a known cause of "lock() request could not be registered" on the
      // very next request.
      const onAbort = () => LMStudioStreamer.cancelQuietly(prediction);
      let yielded = 0;
      let drained = false; // fragment stream ran to completion
      let failed = false; // SDK threw; tracked separately because the thrown value may be falsy
      let caught: unknown = undefined;
      try {
        // Created inside `try` so a synchronous throw from respond() takes the same
        // retry/logging path as an asynchronous failure.
        prediction = (model as any).respond(req.messages, {
          temperature: req.temperature,
          maxTokens: req.maxTokens ?? LMStudioStreamer.DEFAULT_MAX_TOKENS,
          // Safety net: if our own token budgeting still underestimated and the prompt
          // exceeds the model's context window, decide whether the SDK should fail
          // loudly (stopAtLimit — default) or silently drop content.
          contextOverflowPolicy: req.contextOverflowPolicy ?? 'stopAtLimit',
          signal: req.signal,
        });
        if (req.signal) {
          if (req.signal.aborted) onAbort();
          else req.signal.addEventListener('abort', onAbort, { once: true });
        }
        for await (const fragment of prediction as AsyncIterable<{ content: string }>) {
          if (req.signal?.aborted) return;
          const token = fragment?.content ?? '';
          if (token) {
            yielded++;
            yield { token };
          }
        }
        drained = true;
      } catch (err: unknown) {
        failed = true;
        if (req.signal?.aborted) return;
        if (LMStudioStreamer.isAbortError(err)) return;
        caught = err;
      } finally {
        req.signal?.removeEventListener?.('abort', onAbort);
        if (!drained) {
          // Leaving before the prediction completed: the consumer broke out of its loop,
          // the signal aborted, or the SDK threw. Cancel anything still running (not
          // needed when the SDK already failed). Nobody will await the final result, so
          // keep a rejection from surfacing as an unhandled promise rejection.
          if (!failed) LMStudioStreamer.cancelQuietly(prediction);
          LMStudioStreamer.ignoreRejection(prediction);
        }
      }

      if (!failed) {
        // Every early exit above has already returned, so the fragment stream drained.
        if (req.signal?.aborted) return;
        // The prediction object is also a Promise<PredictionResult>; awaiting it after
        // the stream drains gives us stats.stopReason so callers can tell a truncated
        // answer (maxPredictedTokensReached / contextLengthReached) from a normal one.
        let stopReason: string | undefined;
        try {
          const result: any = await prediction;
          stopReason = result?.stats?.stopReason;
          if (stopReason) {
            logInfo('LM Studio SDK chat stream finished.', { model: trimmedModel, stopReason, tokensYielded: yielded });
          }
        } catch { /* result unavailable on some SDK versions — non-fatal */ }
        // Don't claim `eosFound` if we couldn't actually read the stop reason — leave it
        // undefined so the caller treats it as 'unknown' (and its mid-sentence heuristics kick in).
        yield { token: '', stopReason };
        return;
      }

      const errMsg = LMStudioStreamer.describeError(caught);
      if (
        LMStudioStreamer.isDeadHandleError(errMsg)
        && yielded === 0
        && attempt < LMStudioStreamer.MAX_ATTEMPTS
      ) {
        logInfo('Dead LM Studio handle detected — retrying with a fresh SDK.', { model: trimmedModel, error: errMsg });
        continue;
      }
      logError('LM Studio SDK chat stream failed.', { model: trimmedModel, error: errMsg, attempt });
      throw caught;
    }
  }

  /**
   * Asks the client for a fresh handle for `modelName` via
   * `getModelHandle(name, { refresh: true })`. Blank names are ignored; a failing
   * refresh is logged rather than thrown.
   */
  async resetHandle(modelName: string): Promise<void> {
    const trimmed = (modelName || '').trim();
    if (!trimmed) return;
    try {
      await this.client.getModelHandle(trimmed, { refresh: true });
    } catch (err: unknown) {
      // Best effort — caller will see the next stream() attempt fail
      // with a normal error path if the refresh itself was broken.
      logError('LM Studio handle reset failed.', { model: trimmed, error: LMStudioStreamer.describeError(err) });
    }
  }

  /** True when `message` matches one of the known dead-handle error texts. */
  private static isDeadHandleError(message: string): boolean {
    return LMStudioStreamer.DEAD_HANDLE_PATTERNS.some((pattern) => pattern.test(message));
  }

  /** True when the thrown value carries `name === 'AbortError'`. */
  private static isAbortError(err: unknown): boolean {
    return (err as { name?: unknown } | null | undefined)?.name === 'AbortError';
  }

  /** Human-readable text for a thrown value: its `message` if present, else the value itself. */
  private static describeError(err: unknown): string {
    const message = (err as { message?: unknown } | null | undefined)?.message;
    return String(message ?? err);
  }

  /** Best-effort `prediction.cancel()`; swallows synchronous throws and async rejections. */
  private static cancelQuietly(prediction: unknown): void {
    try {
      const pending = (prediction as { cancel?: () => unknown } | null | undefined)?.cancel?.();
      LMStudioStreamer.ignoreRejection(pending);
    } catch { /* best effort — the prediction may already be finished or torn down */ }
  }

  /** If `value` is thenable, attach a no-op rejection handler so it cannot go unhandled. */
  private static ignoreRejection(value: unknown): void {
    if (
      value !== null
      && (typeof value === 'object' || typeof value === 'function')
      && typeof (value as PromiseLike<unknown>).then === 'function'
    ) {
      Promise.resolve(value as PromiseLike<unknown>).catch(() => { /* intentionally ignored */ });
    }
  }
}