first commit

This commit is contained in:
gzy
2025-12-16 11:39:15 +08:00
commit a3bdbee7c2
118 changed files with 34631 additions and 0 deletions

670
utils/job.js Normal file
View File

@@ -0,0 +1,670 @@
const fs = require('fs');
const path = require('path');
const { exec, spawn } = require('child_process');
const util = require('util');
const execPromise = util.promisify(exec);
const nodemailer = require('nodemailer');
// Runtime configuration, sourced entirely from environment variables at load time.
const config = {
emailHost: process.env.EMAIL_HOST, // SMTP server hostname
emailPort: process.env.EMAIL_PORT, // SMTP port (string; passed as-is to nodemailer)
emailUser: process.env.EMAIL_USER, // SMTP auth user; also used as the From address
emailPass: process.env.EMAIL_PASS, // SMTP auth password
htmlDir: process.env.HTML_BASE, // optional host-side base dir used as Docker volume source for results/scripts
mediaModel: process.env.MEDIA_MODEL // host path mounted as /app/scripts by the nutrition container
};
// In-memory job registry keyed by job id; state is lost on process restart.
const jobs = new Map();
const stoppedJobs = new Set(); // Track stopped jobs (ids the user asked to cancel)
// --- Helpers ---
// Create dirPath (and any missing parent directories) if it does not exist.
// Calls mkdir({ recursive: true }) unconditionally: it is a no-op when the
// directory already exists, which removes the TOCTOU race of the previous
// existsSync-then-mkdir pattern (another process could create or delete the
// directory between the check and the mkdir).
async function ensureDirectoryExists(dirPath) {
  await fs.promises.mkdir(dirPath, { recursive: true });
}
// Simple CSV parser that handles quoted fields
function parseCSVLine(line) {
const result = [];
let current = '';
let inQuotes = false;
for (let i = 0; i < line.length; i++) {
const char = line[i];
const nextChar = line[i + 1];
if (char === '"') {
if (inQuotes && nextChar === '"') {
// Escaped quote
current += '"';
i++; // Skip next quote
} else {
// Toggle quote state
inQuotes = !inQuotes;
}
} else if (char === ',' && !inQuotes) {
// Field separator
result.push(current.trim());
current = '';
} else {
current += char;
}
}
// Add last field
result.push(current.trim());
return result;
}
// Merge CSV files by genome column
// Merge multiple CSV files into one CSV, outer-joined on the 'genome'
// column (matched case-insensitively in each file's header row).
// - Files that are missing, empty, or lack a genome column are skipped
//   with a warning rather than failing the whole merge.
// - When two files provide the same column for the same genome, the value
//   from the later file in `csvFiles` wins.
// - Output columns: 'genome' first, then remaining headers in first-seen
//   order; genome rows are sorted for deterministic output.
// Returns outputPath; rethrows any unexpected I/O error after logging it.
async function mergeCsvFilesByGenome(csvFiles, outputPath) {
  try {
    const dataFrames = [];
    const allHeaders = new Set(['genome']); // genome is the join key and first output column
    for (const csvFile of csvFiles) {
      if (!fs.existsSync(csvFile)) {
        console.warn(`[Merge] CSV file not found: ${csvFile}`);
        continue;
      }
      const content = await fs.promises.readFile(csvFile, 'utf8');
      // Split on \r?\n so CRLF files do not leave a stray '\r' glued onto
      // the last header/value of every row (previously only '\n' was used).
      const lines = content.trim().split(/\r?\n/).filter(line => line.trim());
      if (lines.length < 2) {
        console.warn(`[Merge] CSV file is empty or has no data: ${csvFile}`);
        continue;
      }
      const headers = parseCSVLine(lines[0]).map(h => h.trim().replace(/^"|"$/g, ''));
      const data = {};
      // Locate the join-key column; skip the file entirely if absent.
      const genomeColIndex = headers.findIndex(h => h.toLowerCase() === 'genome');
      if (genomeColIndex === -1) {
        console.warn(`[Merge] No 'genome' column found in ${csvFile}, skipping`);
        continue;
      }
      // Register every non-key column in first-seen order.
      headers.forEach(h => {
        if (h.toLowerCase() !== 'genome') {
          allHeaders.add(h);
        }
      });
      // Index each data row by its genome value; blank cells are dropped so
      // they cannot overwrite real values contributed by other files.
      for (let i = 1; i < lines.length; i++) {
        const values = parseCSVLine(lines[i]).map(v => v.trim().replace(/^"|"$/g, ''));
        const genome = values[genomeColIndex]?.trim();
        if (!genome) continue;
        if (!data[genome]) {
          data[genome] = {};
        }
        headers.forEach((header, idx) => {
          if (header.toLowerCase() !== 'genome' && values[idx] && values[idx] !== '') {
            data[genome][header] = values[idx];
          }
        });
      }
      dataFrames.push({ headers, data });
    }
    // Outer join: union of genome keys across all parsed files.
    const mergedData = {};
    const allGenomes = new Set();
    dataFrames.forEach(df => {
      Object.keys(df.data).forEach(genome => allGenomes.add(genome));
    });
    allGenomes.forEach(genome => {
      mergedData[genome] = { genome };
      dataFrames.forEach(df => {
        if (df.data[genome]) {
          Object.assign(mergedData[genome], df.data[genome]); // later files win
        }
      });
    });
    // Serialize with minimal CSV quoting (only fields containing , " or newline).
    const headerArray = Array.from(allHeaders);
    const csvLines = [headerArray.join(',')];
    const sortedGenomes = Array.from(allGenomes).sort();
    sortedGenomes.forEach(genome => {
      const row = headerArray.map(header => {
        const value = mergedData[genome][header] || '';
        if (value.includes(',') || value.includes('"') || value.includes('\n')) {
          return `"${value.replace(/"/g, '""')}"`;
        }
        return value;
      });
      csvLines.push(row.join(','));
    });
    await fs.promises.writeFile(outputPath, csvLines.join('\n'), 'utf8');
    console.log(`[Merge] Merged ${csvFiles.length} CSV files into ${outputPath}, ${sortedGenomes.length} genomes`);
    return outputPath;
  } catch (error) {
    console.error('[Merge] Error merging CSV files:', error);
    throw error;
  }
}
// Email the result CSV of a completed analysis to `email` as an attachment.
// `analysisId` appears in the subject/body and attachment filename; `jobId`
// locates the default result file under results/<jobId>/ when
// `customResultFile` is null. Returns the nodemailer sendMail promise;
// rejects on SMTP failure or if the attachment path cannot be read.
async function sendResultsEmail(analysisId, email, jobId, analysisType, customResultFile = null) {
// Default result-file location (relative to results/<jobId>) per analysis type.
// Unknown types silently fall back to the 'nutrition' path.
const resultFileMap = {
'ph': 'ph_predict/pHpredict.csv',
'nutrition': 'media_predict/generateMedia.csv',
'oxygen': 'tempo2predict/tempo2predict.csv',
'temperature': 'tempo2predict/tempo2predict.csv',
'growth': 'growth_predict/growthPrediction.csv',
'all': 'merged_results.csv'
};
// NOTE(review): secure:true means implicit TLS (typically port 465) —
// confirm EMAIL_PORT matches; STARTTLS ports (587) would need secure:false.
const transporter = nodemailer.createTransport({
host: config.emailHost,
port: config.emailPort,
secure: true,
auth: { user: config.emailUser, pass: config.emailPass }
});
// Use custom file path if provided, otherwise use default mapping
let resultFilePath;
if (customResultFile) {
resultFilePath = customResultFile;
} else {
const relativePath = resultFileMap[analysisType] || resultFileMap['nutrition'];
resultFilePath = path.join(__dirname, '..', 'results', jobId, relativePath);
}
const mailOptions = {
from: config.emailUser,
to: email,
subject: `Analysis Results - Job ID: ${analysisId}`,
text: `Your analysis has been completed.`,
html: `<h2>Analysis Results</h2><p>Job ID: ${analysisId}</p><p>Analysis Type: ${analysisType}</p>`,
attachments: [
{
filename: `analysis_result_${analysisId}.csv`,
path: resultFilePath,
contentType: 'text/csv'
}
]
};
return await transporter.sendMail(mailOptions);
}
// --- Docker Scripts ---
// Run the pH-prediction container (media-ph-pred:v1) over this job's Prokka
// annotation output, writing results/<jobId>/ph_predict. Throws when the
// container command fails or the expected CSV is not produced.
async function executePhpredictScripts(jobId) {
  const inputDir = path.join(__dirname, '..', 'results', jobId);
  // Host-side location of the same directory when HTML_BASE is configured
  // (trailing slash stripped to avoid doubled separators).
  const hostInputDir = config.htmlDir
    ? path.join(config.htmlDir.replace(/\/$/, ''), 'results', jobId)
    : path.join(__dirname, '..', 'results', jobId);
  const scriptsDir = config.htmlDir
    ? path.join(config.htmlDir.replace(/\/$/, ''), 'public', 'scripts')
    : path.join(__dirname, '..', 'public', 'scripts');
  await ensureDirectoryExists(inputDir);
  const command = `docker run --rm -v "${hostInputDir}:${inputDir}" -v "${scriptsDir}":/app/scripts media-ph-pred:v1 bash -c "/opt/conda/bin/conda run -n ph_model_jupyter bash /app/scripts/pHPredict.sh ${inputDir}/prokka_annotation ${inputDir}/ph_predict"`;
  const execResult = await execPromise(command);
  const resultFile = `${inputDir}/ph_predict/pHpredict.csv`;
  if (!fs.existsSync(resultFile)) {
    throw new Error(`Result file missing: ${resultFile}`);
  }
  return { resultFile, ...execResult };
}
// Run the temperature/oxygen prediction container (media-bacdiveai:v1) for
// this job; writes results/<jobId>/tempo2predict and verifies the CSV exists.
async function executeTempOxygenScripts(jobId) {
  const inputDir = path.join(__dirname, '..', 'results', jobId);
  // Host-side location of the same directory when HTML_BASE is configured.
  const hostInputDir = config.htmlDir
    ? path.join(config.htmlDir.replace(/\/$/, ''), 'results', jobId)
    : path.join(__dirname, '..', 'results', jobId);
  const scriptsDir = config.htmlDir
    ? path.join(config.htmlDir.replace(/\/$/, ''), 'public', 'scripts')
    : path.join(__dirname, '..', 'public', 'scripts');
  await ensureDirectoryExists(inputDir);
  const command = `docker run --rm -v "${hostInputDir}:${inputDir}" -v "${scriptsDir}":/app/scripts -w /app media-bacdiveai:v1 bash -c "source /etc/profile && source /opt/conda/etc/profile.d/conda.sh && conda activate bacdiveai && pip install tqdm && bash /app/scripts/TempO2Predict.sh ${inputDir}/prokka_annotation ${inputDir}/tempo2predict"`;
  const execResult = await execPromise(command);
  const resultFile = `${inputDir}/tempo2predict/tempo2predict.csv`;
  if (!fs.existsSync(resultFile)) {
    throw new Error(`Result file missing: ${resultFile}`);
  }
  return { resultFile, ...execResult };
}
// Run the growth-rate prediction container (media-grodon2:v1) for this job;
// writes results/<jobId>/growth_predict and verifies the result CSV exists.
async function executeGrowthScripts(jobId) {
  const inputDir = path.join(__dirname, '..', 'results', jobId);
  // Host-side location of the same directory when HTML_BASE is configured.
  const hostInputDir = config.htmlDir
    ? path.join(config.htmlDir.replace(/\/$/, ''), 'results', jobId)
    : path.join(__dirname, '..', 'results', jobId);
  const scriptsDir = config.htmlDir
    ? path.join(config.htmlDir.replace(/\/$/, ''), 'public', 'scripts')
    : path.join(__dirname, '..', 'public', 'scripts');
  await ensureDirectoryExists(inputDir);
  const command = `docker run --rm -e PATH='/usr/local/bin:/usr/bin:/bin:/opt/conda/lib/R/bin' -v "${hostInputDir}:${inputDir}" -v "${scriptsDir}":/app/scripts -w /app media-grodon2:v1 bash -c "bash /app/scripts/GrowthPredict.sh ${inputDir}/prokka_annotation ${inputDir}/growth_predict"`;
  const execResult = await execPromise(command);
  const resultFile = `${inputDir}/growth_predict/growthPrediction.csv`;
  if (!fs.existsSync(resultFile)) {
    throw new Error(`Result file missing: ${resultFile}`);
  }
  return { resultFile, ...execResult };
}
// Run the nutrition/media prediction container (media-transformer:v2) for
// this job. Unlike the other analyses it mounts config.mediaModel (not
// public/scripts) at /app/scripts. Verifies generateMedia.csv was produced.
async function executeNutritionScripts(jobId) {
  const inputDir = path.join(__dirname, '..', 'results', jobId);
  // Host-side location of the same directory when HTML_BASE is configured.
  const hostInputDir = config.htmlDir
    ? path.join(config.htmlDir.replace(/\/$/, ''), 'results', jobId)
    : path.join(__dirname, '..', 'results', jobId);
  await ensureDirectoryExists(inputDir);
  const command = `docker run --rm -v "${hostInputDir}:${inputDir}" -v "${config.mediaModel}":/app/scripts -w /app media-transformer:v2 bash -c "source /opt/conda/etc/profile.d/conda.sh && conda activate uncultured_pytorch && pip install sentencepiece --root-user-action=ignore && bash /app/scripts/mediaPredict.sh ${inputDir}/prokka_annotation ${inputDir}/media_predict"`;
  const execResult = await execPromise(command);
  const resultFile = `${inputDir}/media_predict/generateMedia.csv`;
  if (!fs.existsSync(resultFile)) {
    throw new Error(`Result file missing: ${resultFile}`);
  }
  return { resultFile, ...execResult };
}
// --- Main Orchestrator ---
// Run the full analysis pipeline for one job, mutating `job` in place
// (status/progress/result/error):
//   1. Prokka annotation in Docker (spawned so it can be killed on stop)
//   2. one or all analysis scripts (ph / nutrition / oxygen / temperature / growth)
//   3. CSV merge when analysis_type === 'all'
//   4. parse the first data row for display, mark completed, optionally email results.
// `stoppedJobs` is checked between stages so jobService.stopJob aborts promptly;
// a stop shows up as status 'failed' with error 'Stopped by user'.
async function executeJobProcess(job) {
try {
// Abort early if the job was stopped before execution started.
if (stoppedJobs.has(job.id)) {
console.log(`[Job ${job.id}] Job was stopped before execution`);
job.status = 'failed';
job.error = 'Stopped by user';
return;
}
// Use the results folder under the project root, matching the upload path.
// __dirname is the utils folder, so go one level up to the project root.
const inputDir = path.join(__dirname, '..', 'results', job.id);
// If HTML_BASE is configured use it as the host-side path, otherwise the
// project root. path.join avoids concatenation artifacts (double slashes).
const hostInputDir = config.htmlDir
? path.join(config.htmlDir.replace(/\/$/, ''), 'results', job.id)
: path.join(__dirname, '..', 'results', job.id);
console.log(`[Job ${job.id}] Input directory: ${inputDir}`);
console.log(`[Job ${job.id}] Host input directory: ${hostInputDir}`);
// Make sure the job directories exist before mounting them into Docker.
await ensureDirectoryExists(inputDir);
await ensureDirectoryExists(path.join(inputDir, 'uploads'));
// 1. Prokka Annotation
job.status = 'analyzing';
job.progress = 10;
// Re-check the stop flag before launching the container.
if (stoppedJobs.has(job.id)) {
console.log(`[Job ${job.id}] Job was stopped before Prokka execution`);
job.status = 'failed';
job.error = 'Stopped by user';
return;
}
// Scripts live under public/scripts; same HTML_BASE-vs-project-root
// resolution as the input directory above.
const scriptsDir = config.htmlDir
? path.join(config.htmlDir.replace(/\/$/, ''), 'public', 'scripts')
: path.join(__dirname, '..', 'public', 'scripts');
// Built only for the log line below; the actual execution uses spawn().
const dockerCommand = `docker run --rm -v "${hostInputDir}:${inputDir}" -v "${scriptsDir}":/app/scripts -w /app media-prokka:v1 bash -c "bash /app/scripts/prokka.sh --output ${inputDir}/prokka_annotation --input ${inputDir}/uploads"`;
console.log(`[Job ${job.id}] Executing Docker command: ${dockerCommand}`);
// Use spawn (not exec) so the process handle can be killed by stopJob.
const dockerProcess = spawn('docker', [
'run', '--rm',
'-v', `${hostInputDir}:${inputDir}`,
'-v', `${scriptsDir}:/app/scripts`,
'-w', '/app',
'media-prokka:v1',
'bash', '-c', `bash /app/scripts/prokka.sh --output ${inputDir}/prokka_annotation --input ${inputDir}/uploads`
], { stdio: 'pipe' });
// Keep a reference on the job so stopJob can terminate the process.
job.dockerProcess = dockerProcess;
// Bridge the child process to a promise; rejects on non-zero exit,
// spawn error, or when the job was stopped while the process ran.
const { stdout, stderr } = await new Promise((resolve, reject) => {
let stdoutData = '';
let stderrData = '';
dockerProcess.stdout.on('data', (data) => {
stdoutData += data.toString();
});
dockerProcess.stderr.on('data', (data) => {
stderrData += data.toString();
});
dockerProcess.on('close', (code) => {
if (stoppedJobs.has(job.id)) {
reject(new Error('Stopped by user'));
} else if (code !== 0) {
reject(new Error(`Docker process exited with code ${code}: ${stderrData}`));
} else {
resolve({ stdout: stdoutData, stderr: stderrData });
}
});
dockerProcess.on('error', (error) => {
reject(error);
});
});
job.progress = 60;
// 2. Analysis Execution
const executionMap = {
'ph': executePhpredictScripts,
'nutrition': executeNutritionScripts,
'oxygen': executeTempOxygenScripts,
'temperature': executeTempOxygenScripts,
'growth': executeGrowthScripts
};
// 'all' runs every analysis type in sequence.
const isAnalysisAll = job.analysis_type === 'all';
const analysisTypes = isAnalysisAll
? ['ph', 'nutrition', 'oxygen', 'temperature', 'growth']
: [job.analysis_type];
if (!isAnalysisAll && !executionMap[job.analysis_type]) {
throw new Error('Unsupported analysis');
}
const allResults = [];
const csvFiles = [];
const totalAnalyses = analysisTypes.length;
// Execute all analyses sequentially; individual failures are logged and skipped.
for (let i = 0; i < analysisTypes.length; i++) {
// Check the stop flag before each analysis step.
if (stoppedJobs.has(job.id)) {
console.log(`[Job ${job.id}] Job was stopped during analysis execution`);
job.status = 'failed';
job.error = 'Stopped by user';
return;
}
const analysisType = analysisTypes[i];
const progressBase = 60;
const progressStep = 35 / totalAnalyses; // Use 35% of progress for analyses
job.progress = Math.floor(progressBase + (i * progressStep));
console.log(`[Job ${job.id}] Executing ${analysisType} analysis (${i + 1}/${totalAnalyses})`);
try {
const scriptResult = await executionMap[analysisType](job.id);
allResults.push({ type: analysisType, result: scriptResult });
csvFiles.push(scriptResult.resultFile);
console.log(`[Job ${job.id}] ${analysisType} analysis completed`);
} catch (error) {
// If the failure was caused by a user stop, bail out immediately.
if (stoppedJobs.has(job.id) || error.message.includes('Stopped by user')) {
console.log(`[Job ${job.id}] Analysis stopped by user`);
job.status = 'failed';
job.error = 'Stopped by user';
return;
}
console.error(`[Job ${job.id}] ${analysisType} analysis failed:`, error);
// Continue with other analyses even if one fails
}
}
job.progress = 95;
// 3. Merge CSV files if analysis all mode
let finalResultFile;
let parsedResult = {};
if (isAnalysisAll && csvFiles.length > 0) {
// Merge all CSV files by genome column
const mergedCsvPath = path.join(inputDir, 'merged_results.csv');
await mergeCsvFilesByGenome(csvFiles, mergedCsvPath);
finalResultFile = mergedCsvPath;
// Parse the first data row of the merged result for display.
// NOTE(review): naive split(',') here — quoted fields containing commas
// would mis-parse; parseCSVLine would be safer. Confirm before changing.
try {
const csvContent = await fs.promises.readFile(mergedCsvPath, 'utf8');
const lines = csvContent.trim().split('\n');
if (lines.length > 1) {
const headers = lines[0].split(',').map(h => h.trim());
const values = lines[1]?.split(',') || [];
headers.forEach((h, i) => parsedResult[h] = values[i]?.trim() || '');
}
} catch (e) {
console.error('[Job] Error parsing merged CSV:', e);
}
} else if (allResults.length > 0) {
// Single analysis mode
finalResultFile = allResults[0].result.resultFile;
try {
const csvContent = await fs.promises.readFile(finalResultFile, 'utf8');
const lines = csvContent.trim().split('\n');
const headers = lines[0].split(',').map(h => h.trim());
const values = lines[1]?.split(',') || [];
headers.forEach((h, i) => parsedResult[h] = values[i]?.trim() || '');
} catch (e) { console.error(e); }
} else {
throw new Error('No analysis results available');
}
// Final stop-flag check before marking the job completed.
if (stoppedJobs.has(job.id)) {
console.log(`[Job ${job.id}] Job was stopped before completion`);
job.status = 'failed';
job.error = 'Stopped by user';
stoppedJobs.delete(job.id); // clear the stop flag
return;
}
job.progress = 100;
job.status = 'completed';
job.result = {
analysis_id: job.id,
result_file: finalResultFile,
analysis_types: isAnalysisAll ? analysisTypes : [job.analysis_type],
...parsedResult
};
// Clear the stop flag and drop the process reference.
stoppedJobs.delete(job.id);
delete job.dockerProcess;
console.log(`[Job ${job.id}] successfully completed !`);
if (job.email) {
// For analysis all, send email with merged file
const emailAnalysisType = isAnalysisAll ? 'all' : job.analysis_type;
await sendResultsEmail(job.id, job.email, job.id, emailAnalysisType, finalResultFile);
console.log(`[Job ${job.id}] email sent to ${job.email} successfully !`);
}
} catch (error) {
console.error(`[Job ${job.id}] Error:`, error);
// If the error was caused by a user stop, report the stop message instead.
if (stoppedJobs.has(job.id) || error.message.includes('Stopped by user')) {
job.status = 'failed';
job.error = 'Stopped by user';
stoppedJobs.delete(job.id);
} else {
job.status = 'failed';
job.error = error.message;
}
job.progress = 0;
// Drop the process reference.
delete job.dockerProcess;
}
}
const jobService = {
createJob(id, analysisType, email, files) {
const job = {
id, analysis_type: analysisType, email, files,
created_at: Date.now(), status: 'queued', progress: 0, file_count: files.length,
eta_seconds: 10
};
jobs.set(id, job);
// Async Start
setTimeout(() => executeJobProcess(job), 1000);
return job;
},
getJob(id) { return jobs.get(id); },
async stopJob(id) {
const job = jobs.get(id);
if (!job) throw new Error('Job not found');
// 如果任务已完成或已失败,不需要停止
if (job.status === 'completed' || job.status === 'failed') {
console.log(`[Stop] Job ${id} is already ${job.status}`);
return true;
}
console.log(`[Stop] Stopping job ${id}...`);
// 标记任务为已停止
stoppedJobs.add(id);
// 停止Docker进程如果存在
if (job.dockerProcess) {
try {
console.log(`[Stop] Killing Docker process for job ${id}`);
job.dockerProcess.kill('SIGTERM');
// 如果SIGTERM不起作用使用SIGKILL
setTimeout(() => {
if (!job.dockerProcess.killed) {
job.dockerProcess.kill('SIGKILL');
}
}, 5000);
} catch (error) {
console.error(`[Stop] Error killing Docker process:`, error);
}
}
// 停止所有相关的Docker容器
try {
const hostInputDir = config.htmlDir
? path.join(config.htmlDir.replace(/\/$/, ''), 'results', id)
: path.join(__dirname, '..', 'results', id);
// 方法1: 通过volume路径查找容器使用--filter volume
try {
const getContainersCmd1 = `docker ps --filter "volume=${hostInputDir}" --format "{{.ID}}"`;
const { stdout } = await execPromise(getContainersCmd1);
if (stdout && stdout.trim()) {
const cids = stdout.trim().split('\n').filter(cid => cid);
for (const cid of cids) {
console.log(`[Stop] Stopping container ${cid} for job ${id} (method 1)`);
try {
await execPromise(`docker stop ${cid}`);
console.log(`[Stop] Container ${cid} stopped`);
} catch (e) {
console.warn(`[Stop] Failed to stop container ${cid}:`, e.message);
}
}
}
} catch (e) {
console.warn(`[Stop] Method 1 failed:`, e.message);
}
// 方法2: 查找所有运行中的容器,检查挂载点
try {
const getAllContainersCmd = `docker ps --format "{{.ID}} {{.Mounts}}"`;
const { stdout } = await execPromise(getAllContainersCmd);
if (stdout) {
const lines = stdout.trim().split('\n');
const containerIds = new Set();
for (const line of lines) {
// 检查是否包含job相关的路径
if (line.includes(hostInputDir) || line.includes(`results/${id}`) || line.includes(`results\\\\${id}`)) {
const containerId = line.split(' ')[0];
if (containerId && containerId.length === 12) { // Docker container IDs are 12 chars
containerIds.add(containerId);
}
}
}
for (const containerId of containerIds) {
console.log(`[Stop] Stopping container ${containerId} for job ${id} (method 2)`);
try {
await execPromise(`docker stop ${containerId}`);
console.log(`[Stop] Container ${containerId} stopped`);
} catch (e) {
console.warn(`[Stop] Failed to stop container ${containerId}:`, e.message);
}
}
}
} catch (e) {
console.warn(`[Stop] Method 2 failed:`, e.message);
}
// 方法3: 使用docker ps -a查找所有容器包括已停止的然后强制停止
// 这个方法作为最后的备用方案
try {
const getAllContainersCmd = `docker ps -a --format "{{.ID}} {{.Mounts}}"`;
const { stdout } = await execPromise(getAllContainersCmd);
if (stdout) {
const lines = stdout.trim().split('\n');
for (const line of lines) {
if (line.includes(`results/${id}`) || line.includes(`results\\\\${id}`)) {
const containerId = line.split(' ')[0];
if (containerId && containerId.length === 12) {
console.log(`[Stop] Force stopping container ${containerId} for job ${id} (method 3)`);
try {
// 先尝试正常停止
await execPromise(`docker stop ${containerId}`).catch(() => {});
// 如果还在运行,强制杀死
await execPromise(`docker kill ${containerId}`).catch(() => {});
console.log(`[Stop] Container ${containerId} force stopped`);
} catch (e) {
console.warn(`[Stop] Failed to force stop container ${containerId}:`, e.message);
}
}
}
}
}
} catch (e) {
console.warn(`[Stop] Method 3 failed:`, e.message);
}
} catch (error) {
console.error(`[Stop] Error stopping containers:`, error);
}
// 更新任务状态
job.status = 'failed';
job.error = 'Stopped by user';
job.progress = 0;
// 清理进程引用
delete job.dockerProcess;
console.log(`[Stop] Job ${id} stopped successfully`);
return true;
},
async resendEmail(id, email) {
const job = jobs.get(id);
if(!job) throw new Error('Job not found');
return await sendResultsEmail(id, email, id, job.analysis_type);
}
};
module.exports = jobService;