Files
bttoxin-pipeline/scripts/run_single_fna_pipeline.py
hotwa ae4c6351d9 feat: migrate from Docker to pixi for BtToxin_Digger execution
- Add pixi.toml with digger and pipeline environments
- Implement PixiRunner class replacing DockerContainerManager
- Add run_digger_stage.py for standalone digger execution
- Update run_single_fna_pipeline.py to use PixiRunner
- Remove docker dependency from pyproject.toml
- Delete docker_client.py (no longer needed)

BREAKING CHANGE: Docker is no longer required. Use 'pixi install' instead.
2026-01-08 16:58:45 +08:00

259 lines
9.3 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""Run a single-fna BtToxin_Digger -> Shotter -> Plots pipeline.
- Input: one .fna file (nucleotide scaffold)
- Steps:
  1) Stage this single file, run BtToxin_Digger via PixiRunner (pixi environment)
  2) Run Shotter scoring on Digger's All_Toxins.txt via pixi run -e pipeline
  3) Render heatmaps + paper-style report via pixi run -e pipeline
  4) Organize outputs under one root folder:
     <out_root>/
       ├─ digger/   (pixi digger env outputs)
       ├─ shotter/  (Shotter TSV/JSON + plots + report)
       └─ pipeline_results.tar.gz (bundle)
Notes
- Digger is executed in the pixi 'digger' environment with bioconda dependencies.
- Shotter and plotting are executed in the pixi 'pipeline' environment with Python dependencies.
- This script exposes CLI flags for Shotter filters to allow strict/loose runs.
- By default, external_dbs/bt_toxin is used as the external database (if it exists).
Example
python scripts/run_single_fna_pipeline.py \\
--fna tests/test_data/HAN055.fna \\
--toxicity_csv Data/toxicity-data.csv \\
--out_root runs/HAN055_run \\
--min_identity 0.50 --min_coverage 0.60 \\
--disallow_unknown_families --require_index_hit --lang zh
# Use a custom database path
python scripts/run_single_fna_pipeline.py \\
--fna tests/test_data/HAN055.fna \\
--bttoxin_db_dir /path/to/custom/bt_toxin
# Run through the pixi task
pixi run pipeline --fna tests/test_data/HAN055.fna
"""
from __future__ import annotations
import argparse
import os
import shutil
import subprocess
import sys
import tarfile
from pathlib import Path
from typing import Dict, Any, List
# Import PixiRunner and command builders from scripts
from pixi_runner import PixiRunner, build_shotter_command, build_plot_command
def _shell(cmd: list[str]) -> subprocess.CompletedProcess:
return subprocess.run(cmd, text=True)
def _read_first_strain(strain_scores_tsv: Path) -> str:
try:
with strain_scores_tsv.open("r", encoding="utf-8") as f:
header = f.readline().strip().split("\t")
idx_strain = header.index("Strain")
# next non-empty line
for line in f:
if not line.strip():
continue
parts = line.rstrip("\n").split("\t")
if len(parts) > idx_strain:
return parts[idx_strain]
except Exception:
pass
return ""
def run_single_fna_pipeline(
fna_path: Path,
out_root: Path,
toxicity_csv: Path = Path("Data/toxicity-data.csv"),
min_identity: float = 0.0,
min_coverage: float = 0.0,
allow_unknown_families: bool = True,
require_index_hit: bool = False,
lang: str = "zh",
bttoxin_db_dir: Path | None = None,
threads: int = 4,
) -> Dict[str, Any]:
"""运行单个 fna 文件的完整 pipeline使用 pixi 环境)。
Args:
fna_path: 输入 .fna 文件路径
out_root: 输出根目录
toxicity_csv: 毒性数据 CSV 文件路径
min_identity: 最小 identity 阈值
min_coverage: 最小 coverage 阈值
allow_unknown_families: 是否允许未知家族
require_index_hit: 是否要求索引命中
lang: 报告语言 (zh/en)
bttoxin_db_dir: 外部 bt_toxin 数据库目录。若为 None则自动检测
项目根目录下的 external_dbs/bt_toxin。
threads: 线程数
"""
fna_path = fna_path.resolve()
out_root = out_root.resolve()
out_root.mkdir(parents=True, exist_ok=True)
# 自动检测外部数据库
if bttoxin_db_dir is None:
default_db = Path(__file__).resolve().parents[1] / "external_dbs" / "bt_toxin"
if default_db.exists() and (default_db / "db").exists():
bttoxin_db_dir = default_db
print(f"[pipeline] 使用外部数据库: {bttoxin_db_dir}")
else:
print("[pipeline] 未找到外部数据库,将使用 pixi 环境内置数据库")
digger_dir = out_root / "digger"
shotter_dir = out_root / "shotter"
logs_dir = out_root / "logs"
stage_dir = out_root / "stage"
for d in (digger_dir, shotter_dir, logs_dir, stage_dir):
d.mkdir(parents=True, exist_ok=True)
# Stage single input file
staged_fna = stage_dir / fna_path.name
shutil.copy2(fna_path, staged_fna)
# 1) Run BtToxin_Digger via PixiRunner (pixi digger environment)
runner = PixiRunner(env_name="digger")
result = runner.run_bttoxin_digger(
input_dir=stage_dir,
output_dir=digger_dir,
log_dir=logs_dir,
sequence_type="nucl",
scaf_suffix=fna_path.suffix or ".fna",
threads=threads,
bttoxin_db_dir=bttoxin_db_dir,
)
if not result.get("success"):
return {
"ok": False,
"stage": "digger",
"error": result.get("error") or f"Digger failed (exit={result.get('exit_code')})",
"logs": (logs_dir / "digger_execution.log").read_text(encoding="utf-8") if (logs_dir / "digger_execution.log").exists() else "",
}
toxins_dir = digger_dir / "Results" / "Toxins"
all_toxins = toxins_dir / "All_Toxins.txt"
if not all_toxins.exists():
return {"ok": False, "stage": "digger", "error": f"Missing All_Toxins.txt at {all_toxins}"}
# 2) Run Shotter scoring via pixi run -e pipeline
shotter_dir.mkdir(parents=True, exist_ok=True)
scripts_dir = Path(__file__).resolve().parents[0]
pixi_project_dir = Path(__file__).resolve().parents[1]
shoter_cmd = build_shotter_command(
pixi_project_dir=pixi_project_dir,
script_path=scripts_dir / "bttoxin_shoter.py",
toxicity_csv=toxicity_csv,
all_toxins=all_toxins,
output_dir=shotter_dir,
min_identity=min_identity,
min_coverage=min_coverage,
allow_unknown_families=allow_unknown_families,
require_index_hit=require_index_hit,
)
r1 = _shell(shoter_cmd)
if r1.returncode != 0:
return {"ok": False, "stage": "shotter", "error": f"Shotter failed: {' '.join(shoter_cmd)}"}
strain_scores = shotter_dir / "strain_target_scores.tsv"
toxin_support = shotter_dir / "toxin_support.tsv"
species_scores = shotter_dir / "strain_target_species_scores.tsv"
# 3) Plot & report via pixi run -e pipeline
strain_for_plot = _read_first_strain(strain_scores)
plot_cmd = build_plot_command(
pixi_project_dir=pixi_project_dir,
script_path=scripts_dir / "plot_shotter.py",
strain_scores=strain_scores,
toxin_support=toxin_support,
species_scores=species_scores,
out_dir=shotter_dir,
merge_unresolved=True,
report_mode="paper",
lang=lang,
per_hit_strain=strain_for_plot if strain_for_plot else None,
)
r2 = _shell(plot_cmd)
if r2.returncode != 0:
# plotting/report optional; continue
pass
# 4) Bundle
bundle = out_root / "pipeline_results.tar.gz"
with tarfile.open(bundle, "w:gz") as tar:
tar.add(digger_dir, arcname="digger")
tar.add(shotter_dir, arcname="shotter")
return {
"ok": True,
"digger_dir": str(digger_dir),
"shotter_dir": str(shotter_dir),
"bundle": str(bundle),
"all_toxins": str(all_toxins),
"strain": strain_for_plot,
}
def main() -> int:
ap = argparse.ArgumentParser(description="Run single-fna Digger -> Shotter pipeline (pixi-based)")
ap.add_argument("--fna", type=Path, required=True, help="Path to a single .fna file")
ap.add_argument("--toxicity_csv", type=Path, default=Path("Data/toxicity-data.csv"))
ap.add_argument("--out_root", type=Path, default=Path("runs/single_run"))
ap.add_argument("--min_identity", type=float, default=0.0)
ap.add_argument("--min_coverage", type=float, default=0.0)
ap.add_argument("--disallow_unknown_families", action="store_true", default=False)
ap.add_argument("--require_index_hit", action="store_true", default=False)
ap.add_argument("--lang", type=str, choices=["zh", "en"], default="zh")
ap.add_argument("--bttoxin_db_dir", type=Path, default=None,
help="外部 bt_toxin 数据库目录路径(默认自动检测 external_dbs/bt_toxin")
ap.add_argument("--threads", type=int, default=4, help="线程数")
args = ap.parse_args()
# derive per-run default out_root using file stem
if str(args.out_root) == "runs/single_run":
stem = args.fna.stem
args.out_root = Path("runs") / f"{stem}_run"
res = run_single_fna_pipeline(
fna_path=args.fna,
out_root=args.out_root,
toxicity_csv=args.toxicity_csv,
min_identity=args.min_identity,
min_coverage=args.min_coverage,
allow_unknown_families=not args.disallow_unknown_families,
require_index_hit=args.require_index_hit,
lang=args.lang,
bttoxin_db_dir=args.bttoxin_db_dir,
threads=args.threads,
)
if not res.get("ok"):
print(f"[pipeline] FAILED at stage={res.get('stage')}: {res.get('error')}")
logs = res.get("logs")
if logs:
print(logs[:2000])
return 1
print("[pipeline] ✓ Done")
print(f" Digger: {res['digger_dir']}")
print(f" Shotter: {res['shotter_dir']}")
print(f" Bundle: {res['bundle']}")
print(f" Strain: {res.get('strain','')}")
return 0
if __name__ == "__main__":
    # Executed as a script: use main()'s integer result as the exit status.
    sys.exit(main())