/** * Queue Manager * * Manages task queue for concurrency control. * Implements FIFO queue with configurable max concurrent tasks. * * @see Requirements 10.1, 10.2, 10.4 */ import { TaskQueueState, MAX_CONCURRENT, createInitialQueueState, } from '../streams/taskQueue' /** * Result of checkAndEnqueue operation */ export interface EnqueueResult { status: 'RUNNING' | 'QUEUED' queuePosition: number | null } /** * Queue Manager class for managing task concurrency * * This class provides methods to: * - Check if a task can run immediately or needs to be queued * - Dequeue the next task when a slot becomes available * - Get the current queue position for a task */ export class QueueManager { private state: TaskQueueState constructor(initialState?: TaskQueueState) { this.state = initialState ?? createInitialQueueState() } /** * Get the current queue state */ getState(): TaskQueueState { return { ...this.state } } /** * Check if a new task can run immediately or needs to be queued. * If the task can run, increments runningCount. * If the task needs to be queued, adds it to the queue. * * @param sessionId - The unique session ID for the task * @returns EnqueueResult with status and queue position * * @see Requirements 10.1, 10.4 */ checkAndEnqueue(sessionId: string): EnqueueResult { // Check if task is already in queue or running if (this.state.queue.includes(sessionId)) { const position = this.state.queue.indexOf(sessionId) + 1 return { status: 'QUEUED', queuePosition: position } } // Check if we can run immediately if (this.state.runningCount < this.state.maxConcurrent) { this.state.runningCount++ return { status: 'RUNNING', queuePosition: null } } // Need to queue the task this.state.queue.push(sessionId) const queuePosition = this.state.queue.length return { status: 'QUEUED', queuePosition } } /** * Dequeue the next task when a running task completes. * Decrements runningCount and returns the next queued sessionId if available. * * @returns The sessionId of the next task to run, or null if queue is empty * * @see Requirements 10.2 */ dequeueNext(): string | null { // Decrement running count (a task just completed) if (this.state.runningCount > 0) { this.state.runningCount-- } // Check if there's a queued task to start if (this.state.queue.length === 0) { return null } // Get the next task from the front of the queue (FIFO) const nextSessionId = this.state.queue.shift()! // Increment running count for the new task this.state.runningCount++ return nextSessionId } /** * Get the queue position for a specific session. * Returns 1-based position, or 0 if not in queue. * * @param sessionId - The session ID to look up * @returns Queue position (1-based), or 0 if not in queue * * @see Requirements 10.4 */ getQueuePosition(sessionId: string): number { const index = this.state.queue.indexOf(sessionId) return index === -1 ? 0 : index + 1 } /** * Remove a task from the queue (e.g., if cancelled) * * @param sessionId - The session ID to remove * @returns true if the task was removed, false if not found */ removeFromQueue(sessionId: string): boolean { const index = this.state.queue.indexOf(sessionId) if (index === -1) { return false } this.state.queue.splice(index, 1) return true } /** * Mark a task as no longer running (without dequeuing next) * Used when a task fails or is cancelled while running */ markTaskCompleted(): void { if (this.state.runningCount > 0) { this.state.runningCount-- } } /** * Check if the queue has capacity for immediate execution */ hasCapacity(): boolean { return this.state.runningCount < this.state.maxConcurrent } /** * Get the number of tasks currently running */ getRunningCount(): number { return this.state.runningCount } /** * Get the number of tasks in the queue */ getQueueLength(): number { return this.state.queue.length } } // ============================================================================ // Standalone Functions (for use without class instance) // ============================================================================ /** * Check if a new task can run immediately or needs to be queued. * Returns a new state and the result. * * @param state - Current queue state * @param sessionId - The unique session ID for the task * @returns Tuple of [newState, result] */ export function checkAndEnqueue( state: TaskQueueState, sessionId: string ): [TaskQueueState, EnqueueResult] { const newState = { ...state, queue: [...state.queue] } // Check if task is already in queue if (newState.queue.includes(sessionId)) { const position = newState.queue.indexOf(sessionId) + 1 return [newState, { status: 'QUEUED', queuePosition: position }] } // Check if we can run immediately if (newState.runningCount < newState.maxConcurrent) { newState.runningCount++ return [newState, { status: 'RUNNING', queuePosition: null }] } // Need to queue the task newState.queue.push(sessionId) const queuePosition = newState.queue.length return [newState, { status: 'QUEUED', queuePosition }] } /** * Dequeue the next task when a running task completes. * Returns a new state and the next sessionId. * * @param state - Current queue state * @returns Tuple of [newState, nextSessionId or null] */ export function dequeueNext( state: TaskQueueState ): [TaskQueueState, string | null] { const newState = { ...state, queue: [...state.queue] } // Decrement running count (a task just completed) if (newState.runningCount > 0) { newState.runningCount-- } // Check if there's a queued task to start if (newState.queue.length === 0) { return [newState, null] } // Get the next task from the front of the queue (FIFO) const nextSessionId = newState.queue.shift()! // Increment running count for the new task newState.runningCount++ return [newState, nextSessionId] } /** * Get the queue position for a specific session. * Returns 1-based position, or 0 if not in queue. * * @param state - Current queue state * @param sessionId - The session ID to look up * @returns Queue position (1-based), or 0 if not in queue */ export function getQueuePosition( state: TaskQueueState, sessionId: string ): number { const index = state.queue.indexOf(sessionId) return index === -1 ? 0 : index + 1 }