Files
bttoxin-pipeline/backend/app/workers/steps/run_digger.py
2025-10-13 21:05:00 +08:00

47 lines
1.1 KiB
Python

from __future__ import annotations
from pathlib import Path
from typing import Dict
from ...services.docker_service import DockerRunner
def run_bttoxin_digger(workspace_root: Path, threads: int, sequence_type: str, scaf_suffix: str, update_db: bool) -> Dict:
runner = DockerRunner()
inputs_dir = workspace_root / "inputs"
results_dir = workspace_root / "results"
results_dir.mkdir(parents=True, exist_ok=True)
# 示例命令:根据实际 BtToxin_Digger CLI 调整
command = [
"BtToxin_Digger",
"--threads", str(threads),
"--seq_type", sequence_type,
"--scaf_suffix", scaf_suffix,
"--input", str(inputs_dir),
"--outdir", str(results_dir),
]
if update_db:
command += ["--update_db"]
logs = runner.run(
image=None,
command=command,
workdir=Path("/work"),
mounts={
workspace_root: "/work",
},
env=None,
platform=None,
detach=False,
remove=True,
)
return {
"command": command,
"logs": logs,
"results_dir": str(results_dir),
}