/** * TaskState Stream Definition * * Manages the state of each task, including overall status and step statuses. * Uses sessionId as groupId for per-task state isolation. * * @see Requirements 5.2, 2.3 */ import { defineStream } from '@motia/core' import { z } from 'zod' // Step status enum export const StepStatusSchema = z.enum([ 'PENDING', 'RUNNING', 'SUCCESS', 'FAILED', 'SKIPPED', ]) export type StepStatus = z.infer // Overall task status enum export const TaskOverallStatusSchema = z.enum([ 'QUEUED', 'PENDING', 'RUNNING', 'SUCCESS', 'FAILED', 'EXPIRED', ]) export type TaskOverallStatus = z.infer // Step ID enum export const StepIdSchema = z.enum([ 'upload', 'digger', 'shotter', 'plot', 'done', ]) export type StepId = z.infer // Task step schema export const TaskStepSchema = z.object({ stepId: StepIdSchema, name: z.string(), status: StepStatusSchema, startAt: z.string().nullable(), endAt: z.string().nullable(), durationMs: z.number().nullable(), summary: z.string().nullable(), error: z.string().nullable(), }) export type TaskStep = z.infer // Task state schema export const TaskStateSchema = z.object({ sessionId: z.string(), status: TaskOverallStatusSchema, queuePosition: z.number().nullable(), createdAt: z.string(), startedAt: z.string().nullable(), completedAt: z.string().nullable(), expiresAt: z.string(), error: z.string().nullable(), steps: z.array(TaskStepSchema), resultBundle: z.string().nullable(), }) export type TaskState = z.infer // Default step names (English) export const DEFAULT_STEP_NAMES: Record = { upload: 'File Upload', digger: 'BtToxin Digger', shotter: 'Shotter Scoring', plot: 'Generate Report', done: 'Package Complete', } // Step order for validation export const STEP_ORDER: StepId[] = ['upload', 'digger', 'shotter', 'plot', 'done'] /** * Create initial task state with all steps set to PENDING */ export function createInitialTaskState( sessionId: string, queuePosition: number | null = null ): TaskState { const now = new Date() const expiresAt = new Date(now.getTime() + 30 * 24 * 60 * 60 * 1000) // 30 days return { sessionId, status: queuePosition !== null ? 'QUEUED' : 'PENDING', queuePosition, createdAt: now.toISOString(), startedAt: null, completedAt: null, expiresAt: expiresAt.toISOString(), error: null, steps: STEP_ORDER.map((stepId) => ({ stepId, name: DEFAULT_STEP_NAMES[stepId], status: 'PENDING' as StepStatus, startAt: null, endAt: null, durationMs: null, summary: null, error: null, })), resultBundle: null, } } // Define the taskState stream export const taskStateStream = defineStream({ name: 'taskState', schema: TaskStateSchema, persistence: { enabled: true, // State is persisted per sessionId (groupId) }, })