Files
bttoxin-pipeline/motia-backend/lib/queue-manager.ts
zly fe353fc0bc chore: 初始版本提交 - 简化架构 + 轮询改造
- 移除 Motia Streams 实时通信,改用 3 秒轮询
- 简化前端代码,移除冗余组件
- 简化后端架构,准备 FastAPI 重构
- 更新 pixi.toml 环境配置
- 保留 bttoxin_digger_v5_repro 作为参考文档

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-13 16:50:09 +08:00

246 lines
6.5 KiB
TypeScript

/**
* Queue Manager
*
* Manages task queue for concurrency control.
* Implements FIFO queue with configurable max concurrent tasks.
*
* @see Requirements 10.1, 10.2, 10.4
*/
import {
TaskQueueState,
MAX_CONCURRENT,
createInitialQueueState,
} from '../streams/taskQueue'
/**
* Result of checkAndEnqueue operation
*/
export interface EnqueueResult {
status: 'RUNNING' | 'QUEUED'
queuePosition: number | null
}
/**
* Queue Manager class for managing task concurrency
*
* This class provides methods to:
* - Check if a task can run immediately or needs to be queued
* - Dequeue the next task when a slot becomes available
* - Get the current queue position for a task
*/
export class QueueManager {
private state: TaskQueueState
constructor(initialState?: TaskQueueState) {
this.state = initialState ?? createInitialQueueState()
}
/**
* Get the current queue state
*/
getState(): TaskQueueState {
return { ...this.state }
}
/**
* Check if a new task can run immediately or needs to be queued.
* If the task can run, increments runningCount.
* If the task needs to be queued, adds it to the queue.
*
* @param sessionId - The unique session ID for the task
* @returns EnqueueResult with status and queue position
*
* @see Requirements 10.1, 10.4
*/
checkAndEnqueue(sessionId: string): EnqueueResult {
// Check if task is already in queue or running
if (this.state.queue.includes(sessionId)) {
const position = this.state.queue.indexOf(sessionId) + 1
return { status: 'QUEUED', queuePosition: position }
}
// Check if we can run immediately
if (this.state.runningCount < this.state.maxConcurrent) {
this.state.runningCount++
return { status: 'RUNNING', queuePosition: null }
}
// Need to queue the task
this.state.queue.push(sessionId)
const queuePosition = this.state.queue.length
return { status: 'QUEUED', queuePosition }
}
/**
* Dequeue the next task when a running task completes.
* Decrements runningCount and returns the next queued sessionId if available.
*
* @returns The sessionId of the next task to run, or null if queue is empty
*
* @see Requirements 10.2
*/
dequeueNext(): string | null {
// Decrement running count (a task just completed)
if (this.state.runningCount > 0) {
this.state.runningCount--
}
// Check if there's a queued task to start
if (this.state.queue.length === 0) {
return null
}
// Get the next task from the front of the queue (FIFO)
const nextSessionId = this.state.queue.shift()!
// Increment running count for the new task
this.state.runningCount++
return nextSessionId
}
/**
* Get the queue position for a specific session.
* Returns 1-based position, or 0 if not in queue.
*
* @param sessionId - The session ID to look up
* @returns Queue position (1-based), or 0 if not in queue
*
* @see Requirements 10.4
*/
getQueuePosition(sessionId: string): number {
const index = this.state.queue.indexOf(sessionId)
return index === -1 ? 0 : index + 1
}
/**
* Remove a task from the queue (e.g., if cancelled)
*
* @param sessionId - The session ID to remove
* @returns true if the task was removed, false if not found
*/
removeFromQueue(sessionId: string): boolean {
const index = this.state.queue.indexOf(sessionId)
if (index === -1) {
return false
}
this.state.queue.splice(index, 1)
return true
}
/**
* Mark a task as no longer running (without dequeuing next)
* Used when a task fails or is cancelled while running
*/
markTaskCompleted(): void {
if (this.state.runningCount > 0) {
this.state.runningCount--
}
}
/**
* Check if the queue has capacity for immediate execution
*/
hasCapacity(): boolean {
return this.state.runningCount < this.state.maxConcurrent
}
/**
* Get the number of tasks currently running
*/
getRunningCount(): number {
return this.state.runningCount
}
/**
* Get the number of tasks in the queue
*/
getQueueLength(): number {
return this.state.queue.length
}
}
// ============================================================================
// Standalone Functions (for use without class instance)
// ============================================================================
/**
* Check if a new task can run immediately or needs to be queued.
* Returns a new state and the result.
*
* @param state - Current queue state
* @param sessionId - The unique session ID for the task
* @returns Tuple of [newState, result]
*/
export function checkAndEnqueue(
state: TaskQueueState,
sessionId: string
): [TaskQueueState, EnqueueResult] {
const newState = { ...state, queue: [...state.queue] }
// Check if task is already in queue
if (newState.queue.includes(sessionId)) {
const position = newState.queue.indexOf(sessionId) + 1
return [newState, { status: 'QUEUED', queuePosition: position }]
}
// Check if we can run immediately
if (newState.runningCount < newState.maxConcurrent) {
newState.runningCount++
return [newState, { status: 'RUNNING', queuePosition: null }]
}
// Need to queue the task
newState.queue.push(sessionId)
const queuePosition = newState.queue.length
return [newState, { status: 'QUEUED', queuePosition }]
}
/**
* Dequeue the next task when a running task completes.
* Returns a new state and the next sessionId.
*
* @param state - Current queue state
* @returns Tuple of [newState, nextSessionId or null]
*/
export function dequeueNext(
state: TaskQueueState
): [TaskQueueState, string | null] {
const newState = { ...state, queue: [...state.queue] }
// Decrement running count (a task just completed)
if (newState.runningCount > 0) {
newState.runningCount--
}
// Check if there's a queued task to start
if (newState.queue.length === 0) {
return [newState, null]
}
// Get the next task from the front of the queue (FIFO)
const nextSessionId = newState.queue.shift()!
// Increment running count for the new task
newState.runningCount++
return [newState, nextSessionId]
}
/**
* Get the queue position for a specific session.
* Returns 1-based position, or 0 if not in queue.
*
* @param state - Current queue state
* @param sessionId - The session ID to look up
* @returns Queue position (1-based), or 0 if not in queue
*/
export function getQueuePosition(
state: TaskQueueState,
sessionId: string
): number {
const index = state.queue.indexOf(sessionId)
return index === -1 ? 0 : index + 1
}