- 移除 Motia Streams 实时通信,改用 3 秒轮询 - 简化前端代码,移除冗余组件 - 简化后端架构,准备 FastAPI 重构 - 更新 pixi.toml 环境配置 - 保留 bttoxin_digger_v5_repro 作为参考文档 Co-Authored-By: Claude <noreply@anthropic.com>
246 lines
6.5 KiB
TypeScript
246 lines
6.5 KiB
TypeScript
/**
 * Queue Manager
 *
 * Manages the task queue used for concurrency control.
 * Implements a FIFO queue with a configurable maximum number of concurrent tasks.
 *
 * @see Requirements 10.1, 10.2, 10.4
 */
|
|
|
|
import {
|
|
TaskQueueState,
|
|
MAX_CONCURRENT,
|
|
createInitialQueueState,
|
|
} from '../streams/taskQueue'
|
|
|
|
/**
 * Result of a checkAndEnqueue operation.
 *
 * - `status` is `'RUNNING'` when the task was admitted to run immediately and
 *   `'QUEUED'` when it was placed in (or already found in) the waiting queue.
 * - `queuePosition` is the 1-based position in the waiting queue for
 *   `'QUEUED'` results and `null` for `'RUNNING'` results.
 */
export interface EnqueueResult {
  status: 'RUNNING' | 'QUEUED'
  queuePosition: number | null
}
|
|
|
|
/**
 * Queue Manager class for managing task concurrency
 *
 * Holds a `TaskQueueState` consisting of a count of running tasks
 * (`runningCount`), a limit (`maxConcurrent`) and a FIFO list of waiting
 * session IDs (`queue`). Running tasks are tracked by count only, not by ID.
 *
 * This class provides methods to:
 * - Check if a task can run immediately or needs to be queued
 * - Dequeue the next task when a slot becomes available
 * - Get the current queue position for a task
 */
export class QueueManager {
  private readonly state: TaskQueueState

  /**
   * @param initialState - Optional state to manage. It is stored by reference
   *   and mutated in place by this instance. When omitted,
   *   `createInitialQueueState()` supplies the state.
   */
  constructor(initialState?: TaskQueueState) {
    this.state = initialState ?? createInitialQueueState()
  }

  /**
   * Get a snapshot of the current queue state.
   *
   * The `queue` array is copied as well, so mutating the returned object
   * cannot alter the manager's internal state.
   */
  getState(): TaskQueueState {
    return { ...this.state, queue: [...this.state.queue] }
  }

  /**
   * Check if a new task can run immediately or needs to be queued.
   * If the task can run, increments runningCount.
   * If the task needs to be queued, adds it to the queue.
   * If the task is already waiting, its existing position is reported and
   * nothing is changed.
   *
   * @param sessionId - The unique session ID for the task
   * @returns EnqueueResult with status and queue position
   *
   * @see Requirements 10.1, 10.4
   */
  checkAndEnqueue(sessionId: string): EnqueueResult {
    // Only the waiting queue can be checked for duplicates: running tasks are
    // tracked by count, so an already-running session is not detectable here.
    const existingIndex = this.state.queue.indexOf(sessionId)
    if (existingIndex !== -1) {
      return { status: 'QUEUED', queuePosition: existingIndex + 1 }
    }

    // A free slot is available: admit the task immediately.
    if (this.state.runningCount < this.state.maxConcurrent) {
      this.state.runningCount++
      return { status: 'RUNNING', queuePosition: null }
    }

    // No free slot: append to the back of the FIFO queue.
    this.state.queue.push(sessionId)
    return { status: 'QUEUED', queuePosition: this.state.queue.length }
  }

  /**
   * Dequeue the next task when a running task completes.
   * Decrements runningCount (never below zero) and, if a slot is free,
   * promotes the task at the front of the queue to running.
   *
   * @returns The sessionId of the next task to run, or null if the queue is
   *   empty or the running count is still at or above maxConcurrent
   *
   * @see Requirements 10.2
   */
  dequeueNext(): string | null {
    // A task just completed; release its slot.
    if (this.state.runningCount > 0) {
      this.state.runningCount--
    }

    // Do not promote a task if that would exceed the concurrency limit
    // (possible when the state was created with runningCount above the limit).
    if (this.state.runningCount >= this.state.maxConcurrent) {
      return null
    }

    // Take the next task from the front of the queue (FIFO).
    const nextSessionId = this.state.queue.shift()
    if (nextSessionId === undefined) {
      return null
    }

    // The promoted task now occupies a slot.
    this.state.runningCount++
    return nextSessionId
  }

  /**
   * Get the queue position for a specific session.
   * Returns 1-based position, or 0 if not in queue.
   *
   * @param sessionId - The session ID to look up
   * @returns Queue position (1-based), or 0 if not in queue
   *
   * @see Requirements 10.4
   */
  getQueuePosition(sessionId: string): number {
    const index = this.state.queue.indexOf(sessionId)
    return index === -1 ? 0 : index + 1
  }

  /**
   * Remove a task from the queue (e.g., if cancelled).
   * Only the first occurrence of the session ID is removed; runningCount is
   * not affected.
   *
   * @param sessionId - The session ID to remove
   * @returns true if the task was removed, false if not found
   */
  removeFromQueue(sessionId: string): boolean {
    const index = this.state.queue.indexOf(sessionId)
    if (index === -1) {
      return false
    }
    this.state.queue.splice(index, 1)
    return true
  }

  /**
   * Mark a task as no longer running (without dequeuing next).
   * Used when a task fails or is cancelled while running.
   * runningCount is never decremented below zero.
   */
  markTaskCompleted(): void {
    if (this.state.runningCount > 0) {
      this.state.runningCount--
    }
  }

  /**
   * Check if the queue has capacity for immediate execution,
   * i.e. runningCount is below maxConcurrent.
   */
  hasCapacity(): boolean {
    return this.state.runningCount < this.state.maxConcurrent
  }

  /**
   * Get the number of tasks currently running
   */
  getRunningCount(): number {
    return this.state.runningCount
  }

  /**
   * Get the number of tasks in the queue
   */
  getQueueLength(): number {
    return this.state.queue.length
  }
}
|
|
|
|
// ============================================================================
// Standalone Functions (for use without a class instance)
// ============================================================================
|
|
|
|
/**
 * Check if a new task can run immediately or needs to be queued.
 * Returns a new state and the result; the input state is not mutated
 * (the state object and its `queue` array are both copied first).
 *
 * - If the session is already waiting, its existing 1-based position is
 *   reported and the copied state is otherwise unchanged.
 * - If runningCount is below maxConcurrent, runningCount is incremented.
 * - Otherwise the session is appended to the back of the queue.
 *
 * @param state - Current queue state
 * @param sessionId - The unique session ID for the task
 * @returns Tuple of [newState, result]
 */
export function checkAndEnqueue(
  state: TaskQueueState,
  sessionId: string
): [TaskQueueState, EnqueueResult] {
  const newState = { ...state, queue: [...state.queue] }

  // Only the waiting queue can be checked for duplicates: running tasks are
  // represented by a count, not by their session IDs.
  const existingIndex = newState.queue.indexOf(sessionId)
  if (existingIndex !== -1) {
    return [newState, { status: 'QUEUED', queuePosition: existingIndex + 1 }]
  }

  // A free slot is available: admit the task immediately.
  if (newState.runningCount < newState.maxConcurrent) {
    newState.runningCount++
    return [newState, { status: 'RUNNING', queuePosition: null }]
  }

  // No free slot: append to the back of the FIFO queue.
  newState.queue.push(sessionId)
  return [newState, { status: 'QUEUED', queuePosition: newState.queue.length }]
}
|
|
|
|
/**
 * Dequeue the next task when a running task completes.
 * Returns a new state and the next sessionId; the input state is not mutated
 * (the state object and its `queue` array are both copied first).
 *
 * runningCount is decremented (never below zero). If a slot is then free and
 * the queue is non-empty, the task at the front of the queue is removed and
 * runningCount is incremented for it.
 *
 * @param state - Current queue state
 * @returns Tuple of [newState, nextSessionId or null]; the second element is
 *   null when the queue is empty or runningCount is still at or above
 *   maxConcurrent
 */
export function dequeueNext(
  state: TaskQueueState
): [TaskQueueState, string | null] {
  const newState = { ...state, queue: [...state.queue] }

  // A task just completed; release its slot.
  if (newState.runningCount > 0) {
    newState.runningCount--
  }

  // Do not promote a task if that would exceed the concurrency limit
  // (possible when the incoming state has runningCount above the limit).
  if (newState.runningCount >= newState.maxConcurrent) {
    return [newState, null]
  }

  // Take the next task from the front of the queue (FIFO).
  const nextSessionId = newState.queue.shift()
  if (nextSessionId === undefined) {
    return [newState, null]
  }

  // The promoted task now occupies a slot.
  newState.runningCount++
  return [newState, nextSessionId]
}
|
|
|
|
/**
 * Get the queue position for a specific session.
 * Returns the 1-based position of the first matching entry in `state.queue`,
 * or 0 if the session is not in the queue. The state is only read.
 *
 * @param state - Current queue state
 * @param sessionId - The session ID to look up
 * @returns Queue position (1-based), or 0 if not in queue
 */
export function getQueuePosition(
  state: TaskQueueState,
  sessionId: string
): number {
  const index = state.queue.indexOf(sessionId)
  return index === -1 ? 0 : index + 1
}
|