/*
 * Scraper/metadata residue from the hosting web UI (kept verbatim, commented
 * out so the file remains valid JavaScript):
 *
 * Files
 * labweb/utils/job.js
 * 2025-12-16 11:39:15 +08:00
 *
 * 671 lines
 * 28 KiB
 * JavaScript
 * Raw Blame History
 *
 * This file contains ambiguous Unicode characters
 * This file contains Unicode characters that might be confused with other
 * characters. If you think that this is intentional, you can safely ignore
 * this warning. Use the Escape button to reveal them.
 */
const fs = require('fs');
const path = require('path');
const { exec, spawn } = require('child_process');
const util = require('util');
const execPromise = util.promisify(exec);
const nodemailer = require('nodemailer');
// Runtime configuration, read once at module load from environment variables.
// A missing variable yields `undefined`, which downstream code treats as
// "unset" (e.g. falling back to project-relative paths when HTML_BASE is absent).
const config = {
  emailHost: process.env.EMAIL_HOST,  // SMTP host for result notifications
  emailPort: process.env.EMAIL_PORT,  // SMTP port (string as read from env)
  emailUser: process.env.EMAIL_USER,  // SMTP auth user; also used as the From address
  emailPass: process.env.EMAIL_PASS,  // SMTP auth password
  htmlDir: process.env.HTML_BASE,     // host-side base dir for results/scripts Docker mounts
  mediaModel: process.env.MEDIA_MODEL // host-side path mounted as /app/scripts for media prediction
};
// In-memory job registry keyed by job id (state is lost on process restart).
const jobs = new Map();
const stoppedJobs = new Set(); // Ids of jobs a user has asked to stop; checked at pipeline checkpoints
// --- Helpers ---
/**
 * Ensure a directory exists, creating it (and any missing parents) if needed.
 *
 * `mkdir` with `recursive: true` is a no-op when the directory already exists,
 * so the previous `existsSync` pre-check was a redundant TOCTOU race and has
 * been removed. Note: if `dirPath` exists as a regular file this now rejects
 * with EEXIST instead of silently returning, surfacing the problem early.
 *
 * @param {string} dirPath - Absolute or relative directory path.
 * @returns {Promise<void>}
 */
async function ensureDirectoryExists(dirPath) {
  await fs.promises.mkdir(dirPath, { recursive: true });
}
/**
 * Parse one CSV line into an array of trimmed field strings.
 * Handles double-quoted fields containing commas, and `""` as an escaped
 * quote inside a quoted field. Embedded newlines are out of scope — callers
 * split the file on '\n' before invoking this.
 *
 * @param {string} line - A single CSV record.
 * @returns {string[]} The parsed fields, each trimmed of surrounding spaces.
 */
function parseCSVLine(line) {
  const fields = [];
  let buffer = '';
  let quoted = false;
  let i = 0;
  while (i < line.length) {
    const ch = line[i];
    if (ch === '"') {
      if (quoted && line[i + 1] === '"') {
        // A doubled quote inside a quoted field is a literal quote.
        buffer += '"';
        i += 2;
        continue;
      }
      quoted = !quoted; // opening or closing quote
    } else if (ch === ',' && !quoted) {
      // Unquoted comma ends the current field.
      fields.push(buffer.trim());
      buffer = '';
    } else {
      buffer += ch;
    }
    i += 1;
  }
  fields.push(buffer.trim()); // flush the final field
  return fields;
}
/**
 * Outer-join several CSV files on their (case-insensitive) 'genome' column
 * and write the combined table to outputPath.
 *
 * Missing files, empty files, and files without a genome column are skipped
 * with a warning. When the same column appears in several files, the value
 * from the later file wins; empty cells never overwrite existing values.
 * Output rows are sorted by genome for deterministic results.
 *
 * @param {string[]} csvFiles - Paths of the CSV files to merge.
 * @param {string} outputPath - Destination path for the merged CSV.
 * @returns {Promise<string>} outputPath, once the file is written.
 * @throws Rethrows any read/write error after logging it.
 */
async function mergeCsvFilesByGenome(csvFiles, outputPath) {
  try {
    const tables = [];
    const columnSet = new Set(['genome']); // join key always leads the header

    for (const filePath of csvFiles) {
      if (!fs.existsSync(filePath)) {
        console.warn(`[Merge] CSV file not found: ${filePath}`);
        continue;
      }
      const raw = await fs.promises.readFile(filePath, 'utf8');
      const rows = raw.trim().split('\n').filter(row => row.trim());
      if (rows.length < 2) {
        console.warn(`[Merge] CSV file is empty or has no data: ${filePath}`);
        continue;
      }
      const columns = parseCSVLine(rows[0]).map(c => c.trim().replace(/^"|"$/g, ''));
      const keyIndex = columns.findIndex(c => c.toLowerCase() === 'genome');
      if (keyIndex === -1) {
        console.warn(`[Merge] No 'genome' column found in ${filePath}, skipping`);
        continue;
      }
      // Register every non-key column so the merged header covers all inputs.
      for (const col of columns) {
        if (col.toLowerCase() !== 'genome') columnSet.add(col);
      }
      // Parse data rows into { genome -> { column -> value } }.
      const byGenome = {};
      for (let r = 1; r < rows.length; r++) {
        const cells = parseCSVLine(rows[r]).map(c => c.trim().replace(/^"|"$/g, ''));
        const key = cells[keyIndex]?.trim();
        if (!key) continue; // a row without a genome key cannot be joined
        if (!byGenome[key]) byGenome[key] = {};
        columns.forEach((col, idx) => {
          // Skip empty cells so they don't clobber values from other files.
          if (col.toLowerCase() !== 'genome' && cells[idx] && cells[idx] !== '') {
            byGenome[key][col] = cells[idx];
          }
        });
      }
      tables.push({ headers: columns, data: byGenome });
    }

    // Union of all genomes seen across the input tables.
    const genomeSet = new Set();
    for (const table of tables) {
      for (const g of Object.keys(table.data)) genomeSet.add(g);
    }
    // Merge per-genome records; later tables overwrite earlier columns.
    const merged = {};
    for (const g of genomeSet) {
      merged[g] = { genome: g };
      for (const table of tables) {
        if (table.data[g]) Object.assign(merged[g], table.data[g]);
      }
    }

    // Serialize: header row first, genomes sorted for stable output.
    const headerArray = Array.from(columnSet);
    const outLines = [headerArray.join(',')];
    const orderedGenomes = Array.from(genomeSet).sort();
    for (const g of orderedGenomes) {
      const cells = headerArray.map(col => {
        const value = merged[g][col] || '';
        // RFC 4180-style quoting when a value contains delimiter/quote/newline.
        return (value.includes(',') || value.includes('"') || value.includes('\n'))
          ? `"${value.replace(/"/g, '""')}"`
          : value;
      });
      outLines.push(cells.join(','));
    }

    await fs.promises.writeFile(outputPath, outLines.join('\n'), 'utf8');
    console.log(`[Merge] Merged ${csvFiles.length} CSV files into ${outputPath}, ${orderedGenomes.length} genomes`);
    return outputPath;
  } catch (error) {
    console.error('[Merge] Error merging CSV files:', error);
    throw error;
  }
}
/**
 * Email the result CSV of a finished analysis as an attachment.
 *
 * @param {string} analysisId - Id shown in the subject/body and attachment name.
 * @param {string} email - Recipient address.
 * @param {string} jobId - Job id used to locate results/<jobId>/ on disk.
 * @param {string} analysisType - Analysis key used to pick the default CSV.
 * @param {string|null} [customResultFile] - Absolute path overriding the mapping.
 * @returns {Promise<object>} nodemailer's sendMail info object.
 */
async function sendResultsEmail(analysisId, email, jobId, analysisType, customResultFile = null) {
  // Default result-file location (relative to results/<jobId>) per analysis type.
  const resultFileMap = {
    'ph': 'ph_predict/pHpredict.csv',
    'nutrition': 'media_predict/generateMedia.csv',
    'oxygen': 'tempo2predict/tempo2predict.csv',
    'temperature': 'tempo2predict/tempo2predict.csv',
    'growth': 'growth_predict/growthPrediction.csv',
    'all': 'merged_results.csv'
  };

  // An explicit override wins; otherwise resolve via the type mapping,
  // falling back to the nutrition CSV for unknown types.
  const resultFilePath = customResultFile
    ? customResultFile
    : path.join(__dirname, '..', 'results', jobId, resultFileMap[analysisType] || resultFileMap['nutrition']);

  const transporter = nodemailer.createTransport({
    host: config.emailHost,
    port: config.emailPort,
    secure: true, // implicit TLS (SMTPS)
    auth: { user: config.emailUser, pass: config.emailPass }
  });

  return await transporter.sendMail({
    from: config.emailUser,
    to: email,
    subject: `Analysis Results - Job ID: ${analysisId}`,
    text: `Your analysis has been completed.`,
    html: `<h2>Analysis Results</h2><p>Job ID: ${analysisId}</p><p>Analysis Type: ${analysisType}</p>`,
    attachments: [
      {
        filename: `analysis_result_${analysisId}.csv`,
        path: resultFilePath,
        contentType: 'text/csv'
      }
    ]
  });
}
// --- Docker Scripts ---
/**
 * Run the pH-prediction container for a job.
 * Reads results/<jobId>/prokka_annotation and writes ph_predict/pHpredict.csv.
 *
 * @param {string} jobId - Job id (results directory name).
 * @returns {Promise<{resultFile: string, stdout: string, stderr: string}>}
 * @throws If the container fails or the expected CSV is not produced.
 */
async function executePhpredictScripts(jobId) {
  const jobDir = path.join(__dirname, '..', 'results', jobId);
  // The bind-mount source lives under HTML_BASE on a deployment host.
  const hostJobDir = config.htmlDir
    ? path.join(config.htmlDir.replace(/\/$/, ''), 'results', jobId)
    : path.join(__dirname, '..', 'results', jobId);
  await ensureDirectoryExists(jobDir);
  const hostScriptsDir = config.htmlDir
    ? path.join(config.htmlDir.replace(/\/$/, ''), 'public', 'scripts')
    : path.join(__dirname, '..', 'public', 'scripts');
  const cmd = `docker run --rm -v "${hostJobDir}:${jobDir}" -v "${hostScriptsDir}":/app/scripts media-ph-pred:v1 bash -c "/opt/conda/bin/conda run -n ph_model_jupyter bash /app/scripts/pHPredict.sh ${jobDir}/prokka_annotation ${jobDir}/ph_predict"`;
  const execResult = await execPromise(cmd);
  const expectedCsv = `${jobDir}/ph_predict/pHpredict.csv`;
  if (!fs.existsSync(expectedCsv)) throw new Error(`Result file missing: ${expectedCsv}`);
  return { resultFile: expectedCsv, ...execResult };
}
/**
 * Run the temperature/oxygen prediction container for a job.
 * Serves both 'temperature' and 'oxygen' analysis types (one shared model);
 * writes tempo2predict/tempo2predict.csv under the job's results directory.
 *
 * @param {string} jobId - Job id (results directory name).
 * @returns {Promise<{resultFile: string, stdout: string, stderr: string}>}
 * @throws If the container fails or the expected CSV is not produced.
 */
async function executeTempOxygenScripts(jobId) {
  const jobDir = path.join(__dirname, '..', 'results', jobId);
  // The bind-mount source lives under HTML_BASE on a deployment host.
  const hostJobDir = config.htmlDir
    ? path.join(config.htmlDir.replace(/\/$/, ''), 'results', jobId)
    : path.join(__dirname, '..', 'results', jobId);
  await ensureDirectoryExists(jobDir);
  const hostScriptsDir = config.htmlDir
    ? path.join(config.htmlDir.replace(/\/$/, ''), 'public', 'scripts')
    : path.join(__dirname, '..', 'public', 'scripts');
  const cmd = `docker run --rm -v "${hostJobDir}:${jobDir}" -v "${hostScriptsDir}":/app/scripts -w /app media-bacdiveai:v1 bash -c "source /etc/profile && source /opt/conda/etc/profile.d/conda.sh && conda activate bacdiveai && pip install tqdm && bash /app/scripts/TempO2Predict.sh ${jobDir}/prokka_annotation ${jobDir}/tempo2predict"`;
  const execResult = await execPromise(cmd);
  const expectedCsv = `${jobDir}/tempo2predict/tempo2predict.csv`;
  if (!fs.existsSync(expectedCsv)) throw new Error(`Result file missing: ${expectedCsv}`);
  return { resultFile: expectedCsv, ...execResult };
}
/**
 * Run the growth-rate prediction container for a job.
 * Writes growth_predict/growthPrediction.csv under the job's results directory.
 *
 * @param {string} jobId - Job id (results directory name).
 * @returns {Promise<{resultFile: string, stdout: string, stderr: string}>}
 * @throws If the container fails or the expected CSV is not produced.
 */
async function executeGrowthScripts(jobId) {
  const jobDir = path.join(__dirname, '..', 'results', jobId);
  // The bind-mount source lives under HTML_BASE on a deployment host.
  const hostJobDir = config.htmlDir
    ? path.join(config.htmlDir.replace(/\/$/, ''), 'results', jobId)
    : path.join(__dirname, '..', 'results', jobId);
  await ensureDirectoryExists(jobDir);
  const hostScriptsDir = config.htmlDir
    ? path.join(config.htmlDir.replace(/\/$/, ''), 'public', 'scripts')
    : path.join(__dirname, '..', 'public', 'scripts');
  const cmd = `docker run --rm -e PATH='/usr/local/bin:/usr/bin:/bin:/opt/conda/lib/R/bin' -v "${hostJobDir}:${jobDir}" -v "${hostScriptsDir}":/app/scripts -w /app media-grodon2:v1 bash -c "bash /app/scripts/GrowthPredict.sh ${jobDir}/prokka_annotation ${jobDir}/growth_predict"`;
  const execResult = await execPromise(cmd);
  const expectedCsv = `${jobDir}/growth_predict/growthPrediction.csv`;
  if (!fs.existsSync(expectedCsv)) throw new Error(`Result file missing: ${expectedCsv}`);
  return { resultFile: expectedCsv, ...execResult };
}
/**
 * Run the nutrition/media prediction container for a job.
 * Writes media_predict/generateMedia.csv under the job's results directory.
 *
 * @param {string} jobId - Job id (results directory name).
 * @returns {Promise<{resultFile: string, stdout: string, stderr: string}>}
 * @throws If the container fails or the expected CSV is not produced.
 */
async function executeNutritionScripts(jobId) {
  const jobDir = path.join(__dirname, '..', 'results', jobId);
  // The bind-mount source lives under HTML_BASE on a deployment host.
  const hostJobDir = config.htmlDir
    ? path.join(config.htmlDir.replace(/\/$/, ''), 'results', jobId)
    : path.join(__dirname, '..', 'results', jobId);
  await ensureDirectoryExists(jobDir);
  // Unlike the other predictors, scripts come from the MEDIA_MODEL mount.
  const cmd = `docker run --rm -v "${hostJobDir}:${jobDir}" -v "${config.mediaModel}":/app/scripts -w /app media-transformer:v2 bash -c "source /opt/conda/etc/profile.d/conda.sh && conda activate uncultured_pytorch && pip install sentencepiece --root-user-action=ignore && bash /app/scripts/mediaPredict.sh ${jobDir}/prokka_annotation ${jobDir}/media_predict"`;
  const execResult = await execPromise(cmd);
  const expectedCsv = `${jobDir}/media_predict/generateMedia.csv`;
  if (!fs.existsSync(expectedCsv)) throw new Error(`Result file missing: ${expectedCsv}`);
  return { resultFile: expectedCsv, ...execResult };
}
// --- Main Orchestrator ---
/**
 * End-to-end pipeline for one job: Prokka annotation in Docker, then the
 * requested prediction analyses, then (for 'all') merging of the result CSVs,
 * and finally an optional results email.
 *
 * Mutates `job` in place (status/progress/result/error) and never throws —
 * failures are recorded on the job object. Honors stop requests recorded in
 * `stoppedJobs` at several checkpoints.
 *
 * @param {object} job - Job record created by jobService.createJob.
 * @returns {Promise<void>}
 */
async function executeJobProcess(job) {
  try {
    // Bail out early if the job was stopped before execution started.
    if (stoppedJobs.has(job.id)) {
      console.log(`[Job ${job.id}] Job was stopped before execution`);
      job.status = 'failed';
      job.error = 'Stopped by user';
      return;
    }
    // Use the project-root `results` folder, consistent with the upload path.
    // __dirname is the utils folder, so go up one level to the project root.
    const inputDir = path.join(__dirname, '..', 'results', job.id);
    // If HTML_BASE is configured use it, otherwise the project root.
    // path.join avoids concatenation artifacts (e.g. double slashes).
    const hostInputDir = config.htmlDir
      ? path.join(config.htmlDir.replace(/\/$/, ''), 'results', job.id)
      : path.join(__dirname, '..', 'results', job.id);
    console.log(`[Job ${job.id}] Input directory: ${inputDir}`);
    console.log(`[Job ${job.id}] Host input directory: ${hostInputDir}`);
    // Make sure the working directories exist.
    await ensureDirectoryExists(inputDir);
    await ensureDirectoryExists(path.join(inputDir, 'uploads'));
    // 1. Prokka Annotation
    job.status = 'analyzing';
    job.progress = 10;
    // Re-check the stop flag before launching Prokka.
    if (stoppedJobs.has(job.id)) {
      console.log(`[Job ${job.id}] Job was stopped before Prokka execution`);
      job.status = 'failed';
      job.error = 'Stopped by user';
      return;
    }
    // Scripts live under public/scripts; path.join avoids double slashes.
    const scriptsDir = config.htmlDir
      ? path.join(config.htmlDir.replace(/\/$/, ''), 'public', 'scripts')
      : path.join(__dirname, '..', 'public', 'scripts');
    // Logged for debugging only; the actual process is started via spawn below.
    const dockerCommand = `docker run --rm -v "${hostInputDir}:${inputDir}" -v "${scriptsDir}":/app/scripts -w /app media-prokka:v1 bash -c "bash /app/scripts/prokka.sh --output ${inputDir}/prokka_annotation --input ${inputDir}/uploads"`;
    console.log(`[Job ${job.id}] Executing Docker command: ${dockerCommand}`);
    // Use spawn (not exec) so the Docker process can be terminated on demand.
    const dockerProcess = spawn('docker', [
      'run', '--rm',
      '-v', `${hostInputDir}:${inputDir}`,
      '-v', `${scriptsDir}:/app/scripts`,
      '-w', '/app',
      'media-prokka:v1',
      'bash', '-c', `bash /app/scripts/prokka.sh --output ${inputDir}/prokka_annotation --input ${inputDir}/uploads`
    ], { stdio: 'pipe' });
    // Keep a reference on the job so stopJob can kill the process.
    job.dockerProcess = dockerProcess;
    // Wait for the process to finish, collecting its output.
    const { stdout, stderr } = await new Promise((resolve, reject) => {
      let stdoutData = '';
      let stderrData = '';
      dockerProcess.stdout.on('data', (data) => {
        stdoutData += data.toString();
      });
      dockerProcess.stderr.on('data', (data) => {
        stderrData += data.toString();
      });
      dockerProcess.on('close', (code) => {
        if (stoppedJobs.has(job.id)) {
          // Exit was caused by a user stop, not a Prokka failure.
          reject(new Error('Stopped by user'));
        } else if (code !== 0) {
          reject(new Error(`Docker process exited with code ${code}: ${stderrData}`));
        } else {
          resolve({ stdout: stdoutData, stderr: stderrData });
        }
      });
      dockerProcess.on('error', (error) => {
        reject(error);
      });
    });
    job.progress = 60;
    // 2. Analysis Execution
    const executionMap = {
      'ph': executePhpredictScripts,
      'nutrition': executeNutritionScripts,
      'oxygen': executeTempOxygenScripts,
      'temperature': executeTempOxygenScripts,
      'growth': executeGrowthScripts
    };
    // 'all' expands to every supported analysis type.
    const isAnalysisAll = job.analysis_type === 'all';
    const analysisTypes = isAnalysisAll
      ? ['ph', 'nutrition', 'oxygen', 'temperature', 'growth']
      : [job.analysis_type];
    if (!isAnalysisAll && !executionMap[job.analysis_type]) {
      throw new Error('Unsupported analysis');
    }
    const allResults = [];
    const csvFiles = [];
    const totalAnalyses = analysisTypes.length;
    // Run the analyses sequentially (each launches its own container).
    for (let i = 0; i < analysisTypes.length; i++) {
      // Honor a stop request between analyses.
      if (stoppedJobs.has(job.id)) {
        console.log(`[Job ${job.id}] Job was stopped during analysis execution`);
        job.status = 'failed';
        job.error = 'Stopped by user';
        return;
      }
      const analysisType = analysisTypes[i];
      const progressBase = 60;
      const progressStep = 35 / totalAnalyses; // Use 35% of progress for analyses
      job.progress = Math.floor(progressBase + (i * progressStep));
      console.log(`[Job ${job.id}] Executing ${analysisType} analysis (${i + 1}/${totalAnalyses})`);
      try {
        const scriptResult = await executionMap[analysisType](job.id);
        allResults.push({ type: analysisType, result: scriptResult });
        csvFiles.push(scriptResult.resultFile);
        console.log(`[Job ${job.id}] ${analysisType} analysis completed`);
      } catch (error) {
        // If the failure came from a stop request, abort immediately.
        if (stoppedJobs.has(job.id) || error.message.includes('Stopped by user')) {
          console.log(`[Job ${job.id}] Analysis stopped by user`);
          job.status = 'failed';
          job.error = 'Stopped by user';
          return;
        }
        console.error(`[Job ${job.id}] ${analysisType} analysis failed:`, error);
        // Continue with other analyses even if one fails
      }
    }
    job.progress = 95;
    // 3. Merge CSV files if analysis all mode
    let finalResultFile;
    let parsedResult = {};
    if (isAnalysisAll && csvFiles.length > 0) {
      // Merge all CSV files by genome column
      const mergedCsvPath = path.join(inputDir, 'merged_results.csv');
      await mergeCsvFilesByGenome(csvFiles, mergedCsvPath);
      finalResultFile = mergedCsvPath;
      // Parse the first data row of the merged result for display.
      // NOTE(review): naive split(',') ignores quoted fields; assumes the
      // merged CSV's first data row contains no embedded commas — confirm.
      try {
        const csvContent = await fs.promises.readFile(mergedCsvPath, 'utf8');
        const lines = csvContent.trim().split('\n');
        if (lines.length > 1) {
          const headers = lines[0].split(',').map(h => h.trim());
          const values = lines[1]?.split(',') || [];
          headers.forEach((h, i) => parsedResult[h] = values[i]?.trim() || '');
        }
      } catch (e) {
        console.error('[Job] Error parsing merged CSV:', e);
      }
    } else if (allResults.length > 0) {
      // Single analysis mode
      finalResultFile = allResults[0].result.resultFile;
      try {
        const csvContent = await fs.promises.readFile(finalResultFile, 'utf8');
        const lines = csvContent.trim().split('\n');
        const headers = lines[0].split(',').map(h => h.trim());
        const values = lines[1]?.split(',') || [];
        headers.forEach((h, i) => parsedResult[h] = values[i]?.trim() || '');
      } catch (e) { console.error(e); }
    } else {
      throw new Error('No analysis results available');
    }
    // Final stop-request checkpoint before marking the job complete.
    if (stoppedJobs.has(job.id)) {
      console.log(`[Job ${job.id}] Job was stopped before completion`);
      job.status = 'failed';
      job.error = 'Stopped by user';
      stoppedJobs.delete(job.id); // clear the stop flag
      return;
    }
    job.progress = 100;
    job.status = 'completed';
    job.result = {
      analysis_id: job.id,
      result_file: finalResultFile,
      analysis_types: isAnalysisAll ? analysisTypes : [job.analysis_type],
      ...parsedResult
    };
    // Clear the stop flag and drop the process reference.
    stoppedJobs.delete(job.id);
    delete job.dockerProcess;
    console.log(`[Job ${job.id}] successfully completed !`);
    if (job.email) {
      // For analysis all, send email with merged file
      const emailAnalysisType = isAnalysisAll ? 'all' : job.analysis_type;
      await sendResultsEmail(job.id, job.email, job.id, emailAnalysisType, finalResultFile);
      console.log(`[Job ${job.id}] email sent to ${job.email} successfully !`);
    }
  } catch (error) {
    console.error(`[Job ${job.id}] Error:`, error);
    // Use the stop message when the failure was caused by a stop request.
    if (stoppedJobs.has(job.id) || error.message.includes('Stopped by user')) {
      job.status = 'failed';
      job.error = 'Stopped by user';
      stoppedJobs.delete(job.id);
    } else {
      job.status = 'failed';
      job.error = error.message;
    }
    job.progress = 0;
    // Drop the process reference.
    delete job.dockerProcess;
  }
}
// Public job API: create/query/stop jobs and resend result emails.
const jobService = {
  /**
   * Register a new job and schedule its pipeline to start ~1 second later.
   * @param {string} id - Unique job id (also the results directory name).
   * @param {string} analysisType - 'ph'|'nutrition'|'oxygen'|'temperature'|'growth'|'all'.
   * @param {string} email - Optional address for the results email.
   * @param {Array} files - Uploaded file descriptors (count stored on the job).
   * @returns {object} The live job record (mutated as the job runs).
   */
  createJob(id, analysisType, email, files) {
    const job = {
      id, analysis_type: analysisType, email, files,
      created_at: Date.now(), status: 'queued', progress: 0, file_count: files.length,
      eta_seconds: 10
    };
    jobs.set(id, job);
    // Async Start
    setTimeout(() => executeJobProcess(job), 1000);
    return job;
  },
  // Look up a job by id; returns undefined if unknown.
  getJob(id) { return jobs.get(id); },
  /**
   * Stop a running job: flag it as stopped, kill the tracked Prokka process,
   * and best-effort stop any Docker containers mounted on the job directory.
   * @param {string} id - Job id.
   * @returns {Promise<boolean>} true once stop handling has run.
   * @throws If the job id is unknown.
   */
  async stopJob(id) {
    const job = jobs.get(id);
    if (!job) throw new Error('Job not found');
    // Nothing to do if the job already finished or failed.
    if (job.status === 'completed' || job.status === 'failed') {
      console.log(`[Stop] Job ${id} is already ${job.status}`);
      return true;
    }
    console.log(`[Stop] Stopping job ${id}...`);
    // Flag the job as stopped so the pipeline's checkpoints abort.
    stoppedJobs.add(id);
    // Kill the tracked Docker (Prokka) process, if any.
    if (job.dockerProcess) {
      try {
        console.log(`[Stop] Killing Docker process for job ${id}`);
        job.dockerProcess.kill('SIGTERM');
        // Escalate to SIGKILL if SIGTERM has no effect within 5 seconds.
        setTimeout(() => {
          if (!job.dockerProcess.killed) {
            job.dockerProcess.kill('SIGKILL');
          }
        }, 5000);
      } catch (error) {
        console.error(`[Stop] Error killing Docker process:`, error);
      }
    }
    // Stop all Docker containers related to this job.
    try {
      const hostInputDir = config.htmlDir
        ? path.join(config.htmlDir.replace(/\/$/, ''), 'results', id)
        : path.join(__dirname, '..', 'results', id);
      // Method 1: find containers by volume path (--filter volume).
      try {
        const getContainersCmd1 = `docker ps --filter "volume=${hostInputDir}" --format "{{.ID}}"`;
        const { stdout } = await execPromise(getContainersCmd1);
        if (stdout && stdout.trim()) {
          const cids = stdout.trim().split('\n').filter(cid => cid);
          for (const cid of cids) {
            console.log(`[Stop] Stopping container ${cid} for job ${id} (method 1)`);
            try {
              await execPromise(`docker stop ${cid}`);
              console.log(`[Stop] Container ${cid} stopped`);
            } catch (e) {
              console.warn(`[Stop] Failed to stop container ${cid}:`, e.message);
            }
          }
        }
      } catch (e) {
        console.warn(`[Stop] Method 1 failed:`, e.message);
      }
      // Method 2: scan all running containers and inspect their mounts.
      try {
        const getAllContainersCmd = `docker ps --format "{{.ID}} {{.Mounts}}"`;
        const { stdout } = await execPromise(getAllContainersCmd);
        if (stdout) {
          const lines = stdout.trim().split('\n');
          const containerIds = new Set();
          for (const line of lines) {
            // Does this container mount a path belonging to the job?
            if (line.includes(hostInputDir) || line.includes(`results/${id}`) || line.includes(`results\\\\${id}`)) {
              const containerId = line.split(' ')[0];
              if (containerId && containerId.length === 12) { // Docker container IDs are 12 chars
                containerIds.add(containerId);
              }
            }
          }
          for (const containerId of containerIds) {
            console.log(`[Stop] Stopping container ${containerId} for job ${id} (method 2)`);
            try {
              await execPromise(`docker stop ${containerId}`);
              console.log(`[Stop] Container ${containerId} stopped`);
            } catch (e) {
              console.warn(`[Stop] Failed to stop container ${containerId}:`, e.message);
            }
          }
        }
      } catch (e) {
        console.warn(`[Stop] Method 2 failed:`, e.message);
      }
      // Method 3: last resort — scan all containers (including stopped ones)
      // via `docker ps -a` and force-stop any that match the job.
      try {
        const getAllContainersCmd = `docker ps -a --format "{{.ID}} {{.Mounts}}"`;
        const { stdout } = await execPromise(getAllContainersCmd);
        if (stdout) {
          const lines = stdout.trim().split('\n');
          for (const line of lines) {
            if (line.includes(`results/${id}`) || line.includes(`results\\\\${id}`)) {
              const containerId = line.split(' ')[0];
              if (containerId && containerId.length === 12) {
                console.log(`[Stop] Force stopping container ${containerId} for job ${id} (method 3)`);
                try {
                  // Try a graceful stop first...
                  await execPromise(`docker stop ${containerId}`).catch(() => {});
                  // ...then kill it if it is still running.
                  await execPromise(`docker kill ${containerId}`).catch(() => {});
                  console.log(`[Stop] Container ${containerId} force stopped`);
                } catch (e) {
                  console.warn(`[Stop] Failed to force stop container ${containerId}:`, e.message);
                }
              }
            }
          }
        }
      } catch (e) {
        console.warn(`[Stop] Method 3 failed:`, e.message);
      }
    } catch (error) {
      console.error(`[Stop] Error stopping containers:`, error);
    }
    // Record the stopped state on the job.
    job.status = 'failed';
    job.error = 'Stopped by user';
    job.progress = 0;
    // Drop the process reference.
    delete job.dockerProcess;
    console.log(`[Stop] Job ${id} stopped successfully`);
    return true;
  },
  /**
   * Re-send the results email for an existing job to the given address.
   * @param {string} id - Job id.
   * @param {string} email - Recipient address.
   * @throws If the job id is unknown.
   */
  async resendEmail(id, email) {
    const job = jobs.get(id);
    if(!job) throw new Error('Job not found');
    return await sendResultsEmail(id, email, id, job.analysis_type);
  }
};
module.exports = jobService;