Files
connectai/src/core/queue.ts
T

82 lines
2.4 KiB
TypeScript

import * as os from 'os';
import { logInfo, logError } from '../utils';
/**
* Default concurrency = max(2, cpus - 1). Leaves one core for the VS Code UI
* thread and the extension host, scales up on bigger boxes. Static per-process
* (no dynamic adjustment) — kept simple because the heavy work (LLM calls)
* is gated by `missionId` locks elsewhere, not the action queue.
*/
function defaultConcurrencyLimit(): number {
try {
const cpus = os.cpus()?.length ?? 4;
return Math.max(2, cpus - 1);
} catch {
return 3;
}
}
/**
* ActionQueueManager: Manages large-scale tasks by processing them
* with a concurrency limit to prevent resource exhaustion and I/O bottlenecks
* while maintaining high throughput under maximum load.
*/
export class ActionQueueManager {
private queue: (() => Promise<void>)[] = [];
private activeCount: number = 0;
private readonly concurrencyLimit: number;
constructor(concurrencyLimit: number = defaultConcurrencyLimit()) {
this.concurrencyLimit = concurrencyLimit;
}
/**
* Adds a task to the queue.
*/
public async enqueue<T>(task: () => Promise<T>): Promise<T> {
return new Promise<T>((resolve, reject) => {
this.queue.push(async () => {
try {
const result = await task();
resolve(result);
} catch (error) {
reject(error);
}
});
this.processNext();
});
}
private async processNext() {
if (this.activeCount >= this.concurrencyLimit || this.queue.length === 0) return;
this.activeCount++;
const task = this.queue.shift();
if (task) {
try {
// Add a micro-delay to allow system breathing room between heavy I/O
await new Promise(r => setTimeout(r, 10));
await task();
} catch (error) {
logError('Task in queue failed:', error);
} finally {
this.activeCount--;
this.processNext();
}
} else {
this.activeCount--;
}
}
public getPendingCount(): number {
return this.queue.length;
}
public getActiveCount(): number {
return this.activeCount;
}
}
export const actionQueue = new ActionQueueManager();