chore: 初始版本提交 - 简化架构 + 轮询改造
- 移除 Motia Streams 实时通信,改用 3 秒轮询 - 简化前端代码,移除冗余组件 - 简化后端架构,准备 FastAPI 重构 - 更新 pixi.toml 环境配置 - 保留 bttoxin_digger_v5_repro 作为参考文档 Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
245
motia-backend/lib/queue-manager.ts
Normal file
245
motia-backend/lib/queue-manager.ts
Normal file
@@ -0,0 +1,245 @@
|
||||
/**
|
||||
* Queue Manager
|
||||
*
|
||||
* Manages task queue for concurrency control.
|
||||
* Implements FIFO queue with configurable max concurrent tasks.
|
||||
*
|
||||
* @see Requirements 10.1, 10.2, 10.4
|
||||
*/
|
||||
|
||||
import {
|
||||
TaskQueueState,
|
||||
MAX_CONCURRENT,
|
||||
createInitialQueueState,
|
||||
} from '../streams/taskQueue'
|
||||
|
||||
/**
|
||||
* Result of checkAndEnqueue operation
|
||||
*/
|
||||
export interface EnqueueResult {
|
||||
status: 'RUNNING' | 'QUEUED'
|
||||
queuePosition: number | null
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue Manager class for managing task concurrency
|
||||
*
|
||||
* This class provides methods to:
|
||||
* - Check if a task can run immediately or needs to be queued
|
||||
* - Dequeue the next task when a slot becomes available
|
||||
* - Get the current queue position for a task
|
||||
*/
|
||||
export class QueueManager {
|
||||
private state: TaskQueueState
|
||||
|
||||
constructor(initialState?: TaskQueueState) {
|
||||
this.state = initialState ?? createInitialQueueState()
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current queue state
|
||||
*/
|
||||
getState(): TaskQueueState {
|
||||
return { ...this.state }
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a new task can run immediately or needs to be queued.
|
||||
* If the task can run, increments runningCount.
|
||||
* If the task needs to be queued, adds it to the queue.
|
||||
*
|
||||
* @param sessionId - The unique session ID for the task
|
||||
* @returns EnqueueResult with status and queue position
|
||||
*
|
||||
* @see Requirements 10.1, 10.4
|
||||
*/
|
||||
checkAndEnqueue(sessionId: string): EnqueueResult {
|
||||
// Check if task is already in queue or running
|
||||
if (this.state.queue.includes(sessionId)) {
|
||||
const position = this.state.queue.indexOf(sessionId) + 1
|
||||
return { status: 'QUEUED', queuePosition: position }
|
||||
}
|
||||
|
||||
// Check if we can run immediately
|
||||
if (this.state.runningCount < this.state.maxConcurrent) {
|
||||
this.state.runningCount++
|
||||
return { status: 'RUNNING', queuePosition: null }
|
||||
}
|
||||
|
||||
// Need to queue the task
|
||||
this.state.queue.push(sessionId)
|
||||
const queuePosition = this.state.queue.length
|
||||
return { status: 'QUEUED', queuePosition }
|
||||
}
|
||||
|
||||
/**
|
||||
* Dequeue the next task when a running task completes.
|
||||
* Decrements runningCount and returns the next queued sessionId if available.
|
||||
*
|
||||
* @returns The sessionId of the next task to run, or null if queue is empty
|
||||
*
|
||||
* @see Requirements 10.2
|
||||
*/
|
||||
dequeueNext(): string | null {
|
||||
// Decrement running count (a task just completed)
|
||||
if (this.state.runningCount > 0) {
|
||||
this.state.runningCount--
|
||||
}
|
||||
|
||||
// Check if there's a queued task to start
|
||||
if (this.state.queue.length === 0) {
|
||||
return null
|
||||
}
|
||||
|
||||
// Get the next task from the front of the queue (FIFO)
|
||||
const nextSessionId = this.state.queue.shift()!
|
||||
|
||||
// Increment running count for the new task
|
||||
this.state.runningCount++
|
||||
|
||||
return nextSessionId
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the queue position for a specific session.
|
||||
* Returns 1-based position, or 0 if not in queue.
|
||||
*
|
||||
* @param sessionId - The session ID to look up
|
||||
* @returns Queue position (1-based), or 0 if not in queue
|
||||
*
|
||||
* @see Requirements 10.4
|
||||
*/
|
||||
getQueuePosition(sessionId: string): number {
|
||||
const index = this.state.queue.indexOf(sessionId)
|
||||
return index === -1 ? 0 : index + 1
|
||||
}
|
||||
|
||||
/**
|
||||
* Remove a task from the queue (e.g., if cancelled)
|
||||
*
|
||||
* @param sessionId - The session ID to remove
|
||||
* @returns true if the task was removed, false if not found
|
||||
*/
|
||||
removeFromQueue(sessionId: string): boolean {
|
||||
const index = this.state.queue.indexOf(sessionId)
|
||||
if (index === -1) {
|
||||
return false
|
||||
}
|
||||
this.state.queue.splice(index, 1)
|
||||
return true
|
||||
}
|
||||
|
||||
/**
|
||||
* Mark a task as no longer running (without dequeuing next)
|
||||
* Used when a task fails or is cancelled while running
|
||||
*/
|
||||
markTaskCompleted(): void {
|
||||
if (this.state.runningCount > 0) {
|
||||
this.state.runningCount--
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the queue has capacity for immediate execution
|
||||
*/
|
||||
hasCapacity(): boolean {
|
||||
return this.state.runningCount < this.state.maxConcurrent
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of tasks currently running
|
||||
*/
|
||||
getRunningCount(): number {
|
||||
return this.state.runningCount
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the number of tasks in the queue
|
||||
*/
|
||||
getQueueLength(): number {
|
||||
return this.state.queue.length
|
||||
}
|
||||
}
|
||||
|
||||
// ============================================================================
|
||||
// Standalone Functions (for use without class instance)
|
||||
// ============================================================================
|
||||
|
||||
/**
|
||||
* Check if a new task can run immediately or needs to be queued.
|
||||
* Returns a new state and the result.
|
||||
*
|
||||
* @param state - Current queue state
|
||||
* @param sessionId - The unique session ID for the task
|
||||
* @returns Tuple of [newState, result]
|
||||
*/
|
||||
export function checkAndEnqueue(
|
||||
state: TaskQueueState,
|
||||
sessionId: string
|
||||
): [TaskQueueState, EnqueueResult] {
|
||||
const newState = { ...state, queue: [...state.queue] }
|
||||
|
||||
// Check if task is already in queue
|
||||
if (newState.queue.includes(sessionId)) {
|
||||
const position = newState.queue.indexOf(sessionId) + 1
|
||||
return [newState, { status: 'QUEUED', queuePosition: position }]
|
||||
}
|
||||
|
||||
// Check if we can run immediately
|
||||
if (newState.runningCount < newState.maxConcurrent) {
|
||||
newState.runningCount++
|
||||
return [newState, { status: 'RUNNING', queuePosition: null }]
|
||||
}
|
||||
|
||||
// Need to queue the task
|
||||
newState.queue.push(sessionId)
|
||||
const queuePosition = newState.queue.length
|
||||
return [newState, { status: 'QUEUED', queuePosition }]
|
||||
}
|
||||
|
||||
/**
|
||||
* Dequeue the next task when a running task completes.
|
||||
* Returns a new state and the next sessionId.
|
||||
*
|
||||
* @param state - Current queue state
|
||||
* @returns Tuple of [newState, nextSessionId or null]
|
||||
*/
|
||||
export function dequeueNext(
|
||||
state: TaskQueueState
|
||||
): [TaskQueueState, string | null] {
|
||||
const newState = { ...state, queue: [...state.queue] }
|
||||
|
||||
// Decrement running count (a task just completed)
|
||||
if (newState.runningCount > 0) {
|
||||
newState.runningCount--
|
||||
}
|
||||
|
||||
// Check if there's a queued task to start
|
||||
if (newState.queue.length === 0) {
|
||||
return [newState, null]
|
||||
}
|
||||
|
||||
// Get the next task from the front of the queue (FIFO)
|
||||
const nextSessionId = newState.queue.shift()!
|
||||
|
||||
// Increment running count for the new task
|
||||
newState.runningCount++
|
||||
|
||||
return [newState, nextSessionId]
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the queue position for a specific session.
|
||||
* Returns 1-based position, or 0 if not in queue.
|
||||
*
|
||||
* @param state - Current queue state
|
||||
* @param sessionId - The session ID to look up
|
||||
* @returns Queue position (1-based), or 0 if not in queue
|
||||
*/
|
||||
export function getQueuePosition(
|
||||
state: TaskQueueState,
|
||||
sessionId: string
|
||||
): number {
|
||||
const index = state.queue.indexOf(sessionId)
|
||||
return index === -1 ? 0 : index + 1
|
||||
}
|
||||
Reference in New Issue
Block a user