Files
bttoxin-pipeline/scripts/run_single_fna_pipeline.py
hotwa 1c0e8f90a5 feat: 支持绑定外部 bt_toxin 数据库 (2025-11-04 更新)
- docker_client.py: run_bttoxin_digger() 新增 bttoxin_db_dir 参数,支持挂载外部数据库
- run_single_fna_pipeline.py: 新增 --bttoxin_db_dir 参数,自动检测 external_dbs/bt_toxin
- README.md: 添加 bttoxin_db 更新说明和 Docker 绑定文档
- external_dbs/bt_toxin: 添加 2025-11-04 版本数据库文件

测试验证: HAN055 样本毒素命名版本号变化 (Cry2Aa9→22, Cry2Ab35→41, Cry1Ia40→42, Vip3Aa7→79)
2026-01-04 14:37:49 +08:00

263 lines
9.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Run a single-fna BtToxin_Digger -> Shotter -> Plots pipeline.
- Input: one .fna file (nucleotide scaffold)
- Steps:
1) Stage this single file, run BtToxin_Digger via DockerContainerManager
2) Run Shotter scoring on Digger's All_Toxins.txt
3) Render heatmaps + paper-style report
4) Organize outputs under one root folder:
<out_root>/
├─ digger/ (container outputs)
├─ shotter/ (Shotter TSV/JSON + plots + report)
└─ pipeline_results.tar.gz (bundle)
Notes
- Digger is executed in a container (root in container); files may be owned by root on host.
We write everything into <out_root>/digger to keep permissions/locality predictable.
- This script exposes CLI flags for Shotter filters to allow strict/loose runs.
- By default, external_dbs/bt_toxin is used as the external database (if it exists), overriding the older database bundled in the container.
Example
python scripts/run_single_fna_pipeline.py \\
--fna tests/test_data/HAN055.fna \\
--toxicity_csv Data/toxicity-data.csv \\
--out_root runs/HAN055_run \\
--min_identity 0.50 --min_coverage 0.60 \\
--disallow_unknown_families --require_index_hit --lang zh
# Use a custom database path
python scripts/run_single_fna_pipeline.py \\
--fna tests/test_data/HAN055.fna \\
--bttoxin_db_dir /path/to/custom/bt_toxin
"""
from __future__ import annotations
import argparse
import os
import shutil
import subprocess
import sys
import tarfile
from pathlib import Path
from typing import Dict, Any
# import DockerContainerManager from backend
sys.path.append(str(Path(__file__).resolve().parents[1] / "backend"))
from app.utils.docker_client import DockerContainerManager # type: ignore
def _shell(cmd: list[str]) -> subprocess.CompletedProcess:
    """Run *cmd* as a child process and return its ``CompletedProcess``.

    Output is not captured, and a non-zero exit status does not raise;
    callers are expected to inspect ``returncode`` themselves.
    """
    completed = subprocess.run(cmd, text=True, check=False)
    return completed
def _read_first_strain(strain_scores_tsv: Path) -> str:
    """Return the ``Strain`` value of the first usable data row of a TSV file.

    The first line is read as a tab-separated header that must contain a
    ``Strain`` column. Blank lines, and rows too short to reach that column,
    are skipped.

    Args:
        strain_scores_tsv: Path to the tab-separated scores table.

    Returns:
        The strain name, or ``""`` when the file cannot be read or decoded,
        has no ``Strain`` column, or contains no usable data row.
    """
    try:
        with strain_scores_tsv.open("r", encoding="utf-8") as f:
            # Strip each header cell individually. Stripping the whole line
            # would drop a leading empty cell (e.g. an unnamed index column)
            # and shift every header index relative to the data rows.
            header = [cell.strip() for cell in f.readline().rstrip("\n").split("\t")]
            idx_strain = header.index("Strain")
            for line in f:
                if not line.strip():
                    continue
                parts = line.rstrip("\n").split("\t")
                if len(parts) > idx_strain:
                    return parts[idx_strain]
    except (OSError, ValueError):
        # Best-effort lookup: a missing/unreadable file (OSError), undecodable
        # bytes (UnicodeDecodeError is a ValueError) or an absent "Strain"
        # column (ValueError from list.index) all mean "no strain known".
        return ""
    return ""
def run_single_fna_pipeline(
    fna_path: Path,
    out_root: Path,
    toxicity_csv: Path = Path("Data/toxicity-data.csv"),
    image: str = "quay.io/biocontainers/bttoxin_digger:1.0.10--hdfd78af_0",
    platform: str = "linux/amd64",
    min_identity: float = 0.0,
    min_coverage: float = 0.0,
    allow_unknown_families: bool = True,
    require_index_hit: bool = False,
    lang: str = "zh",
    bttoxin_db_dir: Path | None = None,
) -> Dict[str, Any]:
    """Run the complete pipeline for a single .fna file.

    Steps: stage the input, run BtToxin_Digger through
    ``DockerContainerManager``, score with ``bttoxin_shoter.py``, render
    plots/report with ``plot_shotter.py`` (a failure here is only warned
    about), then bundle ``digger/`` and ``shotter/`` into
    ``pipeline_results.tar.gz``.

    Args:
        fna_path: Input scaffold file; it is copied into ``<out_root>/stage``.
        out_root: Root folder that receives ``digger/``, ``shotter/``,
            ``logs/``, ``stage/`` and the bundle. Created if missing.
        toxicity_csv: Passed to the Shotter script as ``--toxicity_csv``.
        image: Container image handed to ``DockerContainerManager``.
        platform: Container platform handed to ``DockerContainerManager``.
        min_identity: Forwarded as ``--min_identity`` only when > 0.
        min_coverage: Forwarded as ``--min_coverage`` only when > 0.
        allow_unknown_families: When False, ``--disallow_unknown_families``
            is passed to the Shotter script.
        require_index_hit: When True, ``--require_index_hit`` is passed to
            the Shotter script.
        lang: Forwarded to the plot script as ``--lang``.
        bttoxin_db_dir: External bt_toxin database directory. When None, the
            project-level ``external_dbs/bt_toxin`` is used if it contains a
            ``db`` entry; otherwise None is passed on to the Digger runner.

    Returns:
        On success: ``{"ok": True, "digger_dir", "shotter_dir", "bundle",
        "all_toxins", "strain"}``. On failure: ``{"ok": False, "stage",
        "error"}``, plus ``"logs"`` when the Digger run itself failed.
    """
    fna_path = fna_path.resolve()
    out_root = out_root.resolve()
    out_root.mkdir(parents=True, exist_ok=True)

    scripts_dir = Path(__file__).resolve().parent

    if bttoxin_db_dir is None:
        # Auto-detect the external database shipped next to the scripts folder.
        default_db = scripts_dir.parent / "external_dbs" / "bt_toxin"
        if (default_db / "db").exists():
            bttoxin_db_dir = default_db
            print(f"[pipeline] 使用外部数据库: {bttoxin_db_dir}")
        else:
            print("[pipeline] 未找到外部数据库,将使用容器内置数据库(可能较旧)")
    else:
        # Docker bind mounts need an absolute host path; a relative CLI value
        # would otherwise depend on how the Docker layer interprets it.
        bttoxin_db_dir = Path(bttoxin_db_dir).resolve()

    digger_dir = out_root / "digger"
    shotter_dir = out_root / "shotter"
    logs_dir = out_root / "logs"
    stage_dir = out_root / "stage"
    for d in (digger_dir, shotter_dir, logs_dir, stage_dir):
        d.mkdir(parents=True, exist_ok=True)

    # Stage the single input file so the container only sees this one input.
    staged_fna = stage_dir / fna_path.name
    shutil.copy2(fna_path, staged_fna)

    # 1) Run BtToxin_Digger via DockerContainerManager
    mgr = DockerContainerManager(image=image, platform=platform)
    result = mgr.run_bttoxin_digger(
        input_dir=stage_dir,
        output_dir=digger_dir,
        log_dir=logs_dir,
        sequence_type="nucl",
        scaf_suffix=fna_path.suffix or ".fna",
        threads=4,
        bttoxin_db_dir=bttoxin_db_dir,
    )
    if not result.get("success"):
        log_file = logs_dir / "digger_execution.log"
        # errors="replace": the failure path must not itself crash if the
        # container wrote bytes that are not valid UTF-8.
        log_text = (
            log_file.read_text(encoding="utf-8", errors="replace")
            if log_file.exists()
            else ""
        )
        return {
            "ok": False,
            "stage": "digger",
            "error": result.get("error") or f"Digger failed (exit={result.get('exit_code')})",
            "logs": log_text,
        }

    toxins_dir = digger_dir / "Results" / "Toxins"
    all_toxins = toxins_dir / "All_Toxins.txt"
    if not all_toxins.exists():
        return {"ok": False, "stage": "digger", "error": f"Missing All_Toxins.txt at {all_toxins}"}

    # 2) Run Shotter scoring
    shotter_dir.mkdir(parents=True, exist_ok=True)
    py = sys.executable
    shoter_cmd: list[str] = [
        py,
        str(scripts_dir / "bttoxin_shoter.py"),
        "--toxicity_csv",
        str(toxicity_csv),
        "--all_toxins",
        str(all_toxins),
        "--output_dir",
        str(shotter_dir),
    ]
    # Thresholds of 0 (or unset) mean "no filter": the flag is omitted.
    if min_identity and min_identity > 0:
        shoter_cmd += ["--min_identity", str(min_identity)]
    if min_coverage and min_coverage > 0:
        shoter_cmd += ["--min_coverage", str(min_coverage)]
    if not allow_unknown_families:
        shoter_cmd += ["--disallow_unknown_families"]
    if require_index_hit:
        shoter_cmd += ["--require_index_hit"]

    r1 = _shell(shoter_cmd)
    if r1.returncode != 0:
        return {"ok": False, "stage": "shotter", "error": f"Shotter failed: {' '.join(shoter_cmd)}"}

    strain_scores = shotter_dir / "strain_target_scores.tsv"
    toxin_support = shotter_dir / "toxin_support.tsv"
    species_scores = shotter_dir / "strain_target_species_scores.tsv"

    # 3) Plot & report
    strain_for_plot = _read_first_strain(strain_scores)
    plot_cmd: list[str] = [
        py,
        str(scripts_dir / "plot_shotter.py"),
        "--strain_scores",
        str(strain_scores),
        "--toxin_support",
        str(toxin_support),
        "--species_scores",
        str(species_scores),
        "--out_dir",
        str(shotter_dir),
        "--merge_unresolved",
        "--report_mode",
        "paper",
        "--lang",
        lang,
    ]
    if strain_for_plot:
        plot_cmd += ["--per_hit_strain", strain_for_plot]

    r2 = _shell(plot_cmd)
    if r2.returncode != 0:
        # Plotting/report is optional: keep going, but do not hide the failure.
        print(
            f"[pipeline] WARNING: plot/report step failed (exit={r2.returncode}); "
            "continuing without plots"
        )

    # 4) Bundle
    bundle = out_root / "pipeline_results.tar.gz"
    with tarfile.open(bundle, "w:gz") as tar:
        tar.add(digger_dir, arcname="digger")
        tar.add(shotter_dir, arcname="shotter")

    return {
        "ok": True,
        "digger_dir": str(digger_dir),
        "shotter_dir": str(shotter_dir),
        "bundle": str(bundle),
        "all_toxins": str(all_toxins),
        "strain": strain_for_plot,
    }
def main(argv: list[str] | None = None) -> int:
    """Command-line entry point for the single-fna pipeline.

    Args:
        argv: Argument list to parse; ``None`` (the default) makes argparse
            read ``sys.argv[1:]``.

    Returns:
        0 when the pipeline reports success; 1 when the input file is missing
        or the pipeline reports a failure.
    """
    ap = argparse.ArgumentParser(description="Run single-fna Digger -> Shotter pipeline")
    ap.add_argument("--fna", type=Path, required=True, help="Path to a single .fna file")
    ap.add_argument("--toxicity_csv", type=Path, default=Path("Data/toxicity-data.csv"))
    # None is the "not given" sentinel. Comparing against the string
    # "runs/single_run" breaks on platforms with a different path separator
    # and overrides a user who explicitly asks for that folder.
    ap.add_argument("--out_root", type=Path, default=None,
                    help="Output root folder (default: runs/<fna stem>_run)")
    ap.add_argument("--image", type=str, default="quay.io/biocontainers/bttoxin_digger:1.0.10--hdfd78af_0")
    ap.add_argument("--platform", type=str, default="linux/amd64")
    ap.add_argument("--min_identity", type=float, default=0.0)
    ap.add_argument("--min_coverage", type=float, default=0.0)
    ap.add_argument("--disallow_unknown_families", action="store_true", default=False)
    ap.add_argument("--require_index_hit", action="store_true", default=False)
    ap.add_argument("--lang", type=str, choices=["zh", "en"], default="zh")
    ap.add_argument("--bttoxin_db_dir", type=Path, default=None,
                    help="外部 bt_toxin 数据库目录路径(默认自动检测 external_dbs/bt_toxin)")
    args = ap.parse_args(argv)

    # Fail early, before any output directory is created, with a readable
    # message instead of a traceback from the file copy.
    if not args.fna.is_file():
        print(f"[pipeline] FAILED at stage=input: --fna is not a file: {args.fna}")
        return 1

    # Derive a per-run default out_root from the input file stem.
    if args.out_root is None:
        args.out_root = Path("runs") / f"{args.fna.stem}_run"

    res = run_single_fna_pipeline(
        fna_path=args.fna,
        out_root=args.out_root,
        toxicity_csv=args.toxicity_csv,
        image=args.image,
        platform=args.platform,
        min_identity=args.min_identity,
        min_coverage=args.min_coverage,
        allow_unknown_families=not args.disallow_unknown_families,
        require_index_hit=args.require_index_hit,
        lang=args.lang,
        bttoxin_db_dir=args.bttoxin_db_dir,
    )

    if not res.get("ok"):
        print(f"[pipeline] FAILED at stage={res.get('stage')}: {res.get('error')}")
        logs = res.get("logs")
        if logs:
            # Only the first 2000 characters, to keep the console readable.
            print(logs[:2000])
        return 1

    print("[pipeline] ✓ Done")
    print(f" Digger: {res['digger_dir']}")
    print(f" Shotter: {res['shotter_dir']}")
    print(f" Bundle: {res['bundle']}")
    print(f" Strain: {res.get('strain','')}")
    return 0
if __name__ == "__main__":
    # Use main()'s integer status as the process exit code.
    sys.exit(main())