Files
bttoxin-pipeline/motia-backend/streams/taskQueue.ts
zly fe353fc0bc chore: 初始版本提交 - 简化架构 + 轮询改造
- 移除 Motia Streams 实时通信,改用 3 秒轮询
- 简化前端代码,移除冗余组件
- 简化后端架构,准备 FastAPI 重构
- 更新 pixi.toml 环境配置
- 保留 bttoxin_digger_v5_repro 作为参考文档

Co-Authored-By: Claude <noreply@anthropic.com>
2026-01-13 16:50:09 +08:00

61 lines
1.4 KiB
TypeScript

/**
* TaskQueue Stream Definition
*
* Manages global task queue state for concurrency control.
* Tracks running tasks count and queued task list.
*
* @see Requirements 10.1, 10.4
*/
import { defineStream } from '@motia/core'
import { z } from 'zod'
// Maximum concurrent tasks
export const MAX_CONCURRENT = 4
// Task queue state schema
export const TaskQueueStateSchema = z.object({
runningCount: z.number().min(0).max(MAX_CONCURRENT),
maxConcurrent: z.number().default(MAX_CONCURRENT),
queue: z.array(z.string()), // sessionId list in FIFO order
})
export type TaskQueueState = z.infer<typeof TaskQueueStateSchema>
/**
* Create initial queue state
*/
export function createInitialQueueState(): TaskQueueState {
return {
runningCount: 0,
maxConcurrent: MAX_CONCURRENT,
queue: [],
}
}
/**
* Check if a new task can run immediately
*/
export function canRunImmediately(state: TaskQueueState): boolean {
return state.runningCount < state.maxConcurrent
}
/**
* Get queue position for a session (1-based, null if not in queue)
*/
export function getQueuePosition(
state: TaskQueueState,
sessionId: string
): number | null {
const index = state.queue.indexOf(sessionId)
return index === -1 ? null : index + 1
}
// Define the taskQueue stream (global, single instance)
export const taskQueueStream = defineStream({
name: 'taskQueue',
schema: TaskQueueStateSchema,
persistence: {
enabled: true,
},
})