#!/usr/bin/env python3
"""Run a single-fna BtToxin_Digger -> Shotter -> Plots pipeline.

- Input: one .fna file (nucleotide scaffold)
- Steps:
  1) Stage this single file, run BtToxin_Digger via PixiRunner (pixi environment)
  2) Run Shotter scoring on Digger's All_Toxins.txt via pixi run -e pipeline
  3) Render heatmaps + paper-style report via pixi run -e pipeline
  4) Organize outputs under one root folder:
     <out_root>/
     ├─ digger/   (pixi digger env outputs)
     ├─ shotter/  (Shotter TSV/JSON + plots + report)
     ├─ logs/     (Digger execution logs)
     ├─ stage/    (staged copy of the input file)
     └─ pipeline_results.tar.gz (bundle of digger/ and shotter/)

Notes
- Digger is executed in the pixi 'digger' environment with bioconda dependencies.
- Shotter and plotting are executed in the pixi 'pipeline' environment with
  Python dependencies.
- This script exposes CLI flags for Shotter filters to allow strict/loose runs.
- external_dbs/bt_toxin is used as the external database by default (if present).

Example
  python scripts/run_single_fna_pipeline.py \\
    --fna tests/test_data/HAN055.fna \\
    --toxicity_csv Data/toxicity-data.csv \\
    --out_root runs/HAN055_run \\
    --min_identity 0.50 --min_coverage 0.60 \\
    --disallow_unknown_families --require_index_hit --lang zh

  # Use a custom database path
  python scripts/run_single_fna_pipeline.py \\
    --fna tests/test_data/HAN055.fna \\
    --bttoxin_db_dir /path/to/custom/bt_toxin

  # Run through the pixi task
  pixi run pipeline --fna tests/test_data/HAN055.fna
"""

from __future__ import annotations

import argparse
import os
import shutil
import subprocess
import sys
import tarfile
from pathlib import Path
from typing import Dict, Any, List

# Import PixiRunner and command builders from scripts
from pixi_runner import PixiRunner, build_shotter_command, build_plot_command

# Placeholder default for --out_root; main() swaps it for "runs/<stem>_run".
_DEFAULT_OUT_ROOT = Path("runs/single_run")


def _shell(cmd: list[str]) -> subprocess.CompletedProcess:
    """Run *cmd* (argument list, no shell) and return the completed process.

    The exit status is not checked here; callers inspect ``returncode``.
    """
    return subprocess.run(cmd, text=True)


def _read_first_strain(strain_scores_tsv: Path) -> str:
    """Return the "Strain" value of the first non-empty data row, or "".

    The first line of the TSV is treated as the header. An unreadable file,
    a header without a "Strain" column, or a file with no usable data row
    all yield the empty string (the strain name is only an optional hint
    for plotting).
    """
    try:
        with strain_scores_tsv.open("r", encoding="utf-8") as f:
            header = f.readline().strip().split("\t")
            idx_strain = header.index("Strain")
            for line in f:
                if not line.strip():
                    continue
                parts = line.rstrip("\n").split("\t")
                if len(parts) > idx_strain:
                    return parts[idx_strain]
    except (OSError, ValueError):
        # OSError: missing/unreadable file. ValueError: no "Strain" column
        # (list.index) or undecodable bytes (UnicodeDecodeError subclass).
        return ""
    return ""


def _stage_input(fna_path: Path, stage_dir: Path) -> Path:
    """Make *stage_dir* hold exactly one input: a copy of *fna_path*.

    Regular files left over from earlier runs that reused the same output
    root are removed first, because the whole directory is handed to Digger
    as its input directory. Sub-directories are left untouched.
    """
    staged_fna = stage_dir / fna_path.name
    for leftover in stage_dir.iterdir():
        if leftover != staged_fna and (leftover.is_file() or leftover.is_symlink()):
            leftover.unlink()
    # Re-running on the already staged file must not fail with SameFileError.
    if staged_fna.exists() and staged_fna.samefile(fna_path):
        return staged_fna
    shutil.copy2(fna_path, staged_fna)
    return staged_fna


def run_single_fna_pipeline(
    fna_path: Path,
    out_root: Path,
    toxicity_csv: Path = Path("Data/toxicity-data.csv"),
    min_identity: float = 0.0,
    min_coverage: float = 0.0,
    allow_unknown_families: bool = True,
    require_index_hit: bool = False,
    lang: str = "zh",
    bttoxin_db_dir: Path | None = None,
    threads: int = 4,
) -> Dict[str, Any]:
    """Run the complete pipeline for one .fna file (using pixi environments).

    Args:
        fna_path: Path to the input .fna file.
        out_root: Output root directory (created if missing).
        toxicity_csv: Path to the toxicity data CSV passed to Shotter.
        min_identity: Minimum identity threshold passed to Shotter.
        min_coverage: Minimum coverage threshold passed to Shotter.
        allow_unknown_families: Whether unknown families are allowed.
        require_index_hit: Whether an index hit is required.
        lang: Report language ("zh" or "en").
        bttoxin_db_dir: External bt_toxin database directory. When None,
            ``external_dbs/bt_toxin`` under the project root is used if it
            exists and contains a ``db`` sub-directory.
        threads: Number of threads passed to Digger.

    Returns:
        On success: ``{"ok": True, "digger_dir", "shotter_dir", "bundle",
        "all_toxins", "strain", "plots_ok"}``. On failure:
        ``{"ok": False, "stage", "error"}``, plus ``"logs"`` when the
        Digger run itself reported failure.
    """
    fna_path = fna_path.resolve()
    out_root = out_root.resolve()
    out_root.mkdir(parents=True, exist_ok=True)

    # Auto-detect the external database
    if bttoxin_db_dir is None:
        default_db = Path(__file__).resolve().parents[1] / "external_dbs" / "bt_toxin"
        if default_db.exists() and (default_db / "db").exists():
            bttoxin_db_dir = default_db
            print(f"[pipeline] 使用外部数据库: {bttoxin_db_dir}")
        else:
            print("[pipeline] 未找到外部数据库,将使用 pixi 环境内置数据库")

    digger_dir = out_root / "digger"
    shotter_dir = out_root / "shotter"
    logs_dir = out_root / "logs"
    stage_dir = out_root / "stage"
    for d in (digger_dir, shotter_dir, logs_dir, stage_dir):
        d.mkdir(parents=True, exist_ok=True)

    # Stage single input file
    _stage_input(fna_path, stage_dir)

    # 1) Run BtToxin_Digger via PixiRunner (pixi digger environment)
    runner = PixiRunner(env_name="digger")
    result = runner.run_bttoxin_digger(
        input_dir=stage_dir,
        output_dir=digger_dir,
        log_dir=logs_dir,
        sequence_type="nucl",
        scaf_suffix=fna_path.suffix or ".fna",
        threads=threads,
        bttoxin_db_dir=bttoxin_db_dir,
    )
    if not result.get("success"):
        log_file = logs_dir / "digger_execution.log"
        # errors="replace": a stray undecodable byte in the tool's log must
        # not mask the real failure with a UnicodeDecodeError.
        logs = (
            log_file.read_text(encoding="utf-8", errors="replace")
            if log_file.exists()
            else ""
        )
        return {
            "ok": False,
            "stage": "digger",
            "error": result.get("error") or f"Digger failed (exit={result.get('exit_code')})",
            "logs": logs,
        }

    toxins_dir = digger_dir / "Results" / "Toxins"
    all_toxins = toxins_dir / "All_Toxins.txt"
    if not all_toxins.exists():
        return {"ok": False, "stage": "digger", "error": f"Missing All_Toxins.txt at {all_toxins}"}

    # 2) Run Shotter scoring via pixi run -e pipeline
    shotter_dir.mkdir(parents=True, exist_ok=True)
    scripts_dir = Path(__file__).resolve().parents[0]
    pixi_project_dir = Path(__file__).resolve().parents[1]

    shoter_cmd = build_shotter_command(
        pixi_project_dir=pixi_project_dir,
        script_path=scripts_dir / "bttoxin_shoter.py",
        toxicity_csv=toxicity_csv,
        all_toxins=all_toxins,
        output_dir=shotter_dir,
        min_identity=min_identity,
        min_coverage=min_coverage,
        allow_unknown_families=allow_unknown_families,
        require_index_hit=require_index_hit,
    )
    r1 = _shell(shoter_cmd)
    if r1.returncode != 0:
        return {"ok": False, "stage": "shotter", "error": f"Shotter failed: {' '.join(shoter_cmd)}"}

    strain_scores = shotter_dir / "strain_target_scores.tsv"
    toxin_support = shotter_dir / "toxin_support.tsv"
    species_scores = shotter_dir / "strain_target_species_scores.tsv"

    # 3) Plot & report via pixi run -e pipeline
    strain_for_plot = _read_first_strain(strain_scores)
    plot_cmd = build_plot_command(
        pixi_project_dir=pixi_project_dir,
        script_path=scripts_dir / "plot_shotter.py",
        strain_scores=strain_scores,
        toxin_support=toxin_support,
        species_scores=species_scores,
        out_dir=shotter_dir,
        merge_unresolved=True,
        report_mode="paper",
        lang=lang,
        per_hit_strain=strain_for_plot if strain_for_plot else None,
    )
    r2 = _shell(plot_cmd)
    plots_ok = r2.returncode == 0
    if not plots_ok:
        # Plotting/report is optional: keep going, but make the failure visible.
        print(
            f"[pipeline] WARNING: plot/report step failed (exit={r2.returncode}); "
            "continuing without plots",
            file=sys.stderr,
        )

    # 4) Bundle
    bundle = out_root / "pipeline_results.tar.gz"
    with tarfile.open(bundle, "w:gz") as tar:
        tar.add(digger_dir, arcname="digger")
        tar.add(shotter_dir, arcname="shotter")

    return {
        "ok": True,
        "digger_dir": str(digger_dir),
        "shotter_dir": str(shotter_dir),
        "bundle": str(bundle),
        "all_toxins": str(all_toxins),
        "strain": strain_for_plot,
        "plots_ok": plots_ok,
    }


def main() -> int:
    """Parse CLI arguments, run the pipeline, and return a process exit code.

    Returns 0 on success and 1 when any pipeline stage reports failure.
    """
    ap = argparse.ArgumentParser(description="Run single-fna Digger -> Shotter pipeline (pixi-based)")
    ap.add_argument("--fna", type=Path, required=True, help="Path to a single .fna file")
    ap.add_argument("--toxicity_csv", type=Path, default=Path("Data/toxicity-data.csv"))
    ap.add_argument("--out_root", type=Path, default=_DEFAULT_OUT_ROOT)
    ap.add_argument("--min_identity", type=float, default=0.0)
    ap.add_argument("--min_coverage", type=float, default=0.0)
    ap.add_argument("--disallow_unknown_families", action="store_true", default=False)
    ap.add_argument("--require_index_hit", action="store_true", default=False)
    ap.add_argument("--lang", type=str, choices=["zh", "en"], default="zh")
    ap.add_argument("--bttoxin_db_dir", type=Path, default=None,
                    help="外部 bt_toxin 数据库目录路径(默认自动检测 external_dbs/bt_toxin)")
    ap.add_argument("--threads", type=int, default=4, help="线程数")
    args = ap.parse_args()

    # Derive a per-run default out_root from the file stem. Paths are compared
    # as Path objects: str(Path) uses backslashes on Windows, so a comparison
    # against the literal "runs/single_run" would never match there.
    if args.out_root == _DEFAULT_OUT_ROOT:
        stem = args.fna.stem
        args.out_root = Path("runs") / f"{stem}_run"

    res = run_single_fna_pipeline(
        fna_path=args.fna,
        out_root=args.out_root,
        toxicity_csv=args.toxicity_csv,
        min_identity=args.min_identity,
        min_coverage=args.min_coverage,
        allow_unknown_families=not args.disallow_unknown_families,
        require_index_hit=args.require_index_hit,
        lang=args.lang,
        bttoxin_db_dir=args.bttoxin_db_dir,
        threads=args.threads,
    )

    if not res.get("ok"):
        print(f"[pipeline] FAILED at stage={res.get('stage')}: {res.get('error')}")
        logs = res.get("logs")
        if logs:
            # Cap the echoed log so a huge Digger log does not flood the terminal.
            print(logs[:2000])
        return 1

    print("[pipeline] ✓ Done")
    print(f" Digger: {res['digger_dir']}")
    print(f" Shotter: {res['shotter_dir']}")
    print(f" Bundle: {res['bundle']}")
    print(f" Strain: {res.get('strain','')}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())