#!/usr/bin/env python3
"""Run a single-fna BtToxin_Digger -> Shotter -> Plots pipeline.

- Input: one .fna file (nucleotide scaffold)
- Steps:
  1) Stage this single file, run BtToxin_Digger via DockerContainerManager
  2) Run Shotter scoring on Digger's All_Toxins.txt
  3) Render heatmaps + paper-style report
  4) Organize outputs under one root folder:
     <out_root>/
       ├─ digger/   (container outputs)
       ├─ shotter/  (Shotter TSV/JSON + plots + report)
       ├─ logs/     (Digger execution logs)
       ├─ stage/    (the staged input file handed to Digger)
       └─ pipeline_results.tar.gz (bundle of digger/ and shotter/)

Notes
- Digger is executed in a container (root in container); files may be owned by
  root on host. We write everything into <out_root>/digger to keep
  permissions/locality predictable.
- This script exposes CLI flags for Shotter filters to allow strict/loose runs.

Example
  python scripts/run_single_fna_pipeline.py \
    --fna tests/test_data/C15.fna \
    --toxicity_csv Data/toxicity-data.csv \
    --out_root runs/C15_run \
    --min_identity 0.50 --min_coverage 0.60 \
    --disallow_unknown_families --require_index_hit --lang zh
"""

from __future__ import annotations

import argparse
import os
import shutil
import subprocess
import sys
import tarfile
from pathlib import Path
from typing import Dict, Any

# import DockerContainerManager from backend
sys.path.append(str(Path(__file__).resolve().parents[1] / "backend"))
from app.utils.docker_client import DockerContainerManager  # type: ignore  # noqa: E402

_DEFAULT_IMAGE = "quay.io/biocontainers/bttoxin_digger:1.0.10--hdfd78af_0"
# Sentinel CLI default: when --out_root is left at this value, main() derives
# a per-input directory from the .fna file stem instead.
_DEFAULT_OUT_ROOT = Path("runs/single_run")


def _shell(cmd: list[str]) -> subprocess.CompletedProcess:
    """Run *cmd* without a shell, inheriting stdio, and return the result.

    A non-zero exit status does not raise; callers inspect ``returncode``.
    """
    return subprocess.run(cmd, text=True, check=False)


def _read_first_strain(strain_scores_tsv: Path) -> str:
    """Return the ``Strain`` value of the first usable data row, or ``""``.

    The first line of the TSV is treated as the header. Blank rows and rows
    too short to contain the ``Strain`` column are skipped. An unreadable
    file or a header without a ``Strain`` column yields ``""``.
    """
    try:
        with strain_scores_tsv.open("r", encoding="utf-8") as f:
            header = f.readline().strip().split("\t")
            idx_strain = header.index("Strain")
            for line in f:
                if not line.strip():
                    continue
                parts = line.rstrip("\n").split("\t")
                if len(parts) > idx_strain:
                    return parts[idx_strain]
    except (OSError, ValueError):
        # OSError: file missing/unreadable. ValueError: no "Strain" column,
        # or undecodable bytes (UnicodeDecodeError is a ValueError subclass).
        # The strain name is optional, so fall through to "".
        pass
    return ""


def _clear_stale_inputs(stage_dir: Path, suffix: str, keep: Path) -> None:
    """Remove leftover staged files ending in *suffix*, except *keep*.

    The staging directory is reused between runs that share an ``out_root``;
    without this, an earlier run's scaffold would sit next to the current one
    and the run would no longer be a single-file run. Removal is best-effort:
    a file that cannot be deleted is reported on stderr and left in place.
    """
    for entry in stage_dir.iterdir():
        if entry == keep or entry.suffix != suffix or not entry.is_file():
            continue
        try:
            entry.unlink()
        except OSError as exc:
            print(
                f"[pipeline] WARNING: could not remove stale staged input {entry}: {exc}",
                file=sys.stderr,
            )


def run_single_fna_pipeline(
    fna_path: Path,
    out_root: Path,
    toxicity_csv: Path = Path("Data/toxicity-data.csv"),
    image: str = _DEFAULT_IMAGE,
    platform: str = "linux/amd64",
    min_identity: float = 0.0,
    min_coverage: float = 0.0,
    allow_unknown_families: bool = True,
    require_index_hit: bool = False,
    lang: str = "zh",
    threads: int = 4,
) -> Dict[str, Any]:
    """Run Digger, Shotter scoring, plotting and bundling for one .fna file.

    Args:
        fna_path: Nucleotide scaffold file to analyse.
        out_root: Root directory for all outputs; created if missing.
        toxicity_csv: Passed to Shotter as ``--toxicity_csv``.
        image: Container image handed to ``DockerContainerManager``.
        platform: Container platform handed to ``DockerContainerManager``.
        min_identity: Passed to Shotter as ``--min_identity`` when > 0.
        min_coverage: Passed to Shotter as ``--min_coverage`` when > 0.
        allow_unknown_families: When false, Shotter gets
            ``--disallow_unknown_families``.
        require_index_hit: When true, Shotter gets ``--require_index_hit``.
        lang: Report language passed to the plot script as ``--lang``.
        threads: Thread count passed to the Digger run.

    Returns:
        On success, a dict with ``ok=True`` plus ``digger_dir``,
        ``shotter_dir``, ``bundle``, ``all_toxins``, ``strain`` and
        ``plots_ok`` (whether the optional plot/report step exited with 0).
        On failure, a dict with ``ok=False``, the failing ``stage``
        (``"digger"`` or ``"shotter"``), an ``error`` message and, for a
        failed Digger run, the ``logs`` text.

    Raises:
        OSError: If the input file cannot be copied into the staging directory.
    """
    fna_path = fna_path.resolve()
    out_root = out_root.resolve()
    out_root.mkdir(parents=True, exist_ok=True)

    digger_dir = out_root / "digger"
    shotter_dir = out_root / "shotter"
    logs_dir = out_root / "logs"
    stage_dir = out_root / "stage"
    for d in (digger_dir, shotter_dir, logs_dir, stage_dir):
        d.mkdir(parents=True, exist_ok=True)

    # Stage single input file. Drop scaffolds left by earlier runs first so
    # the input directory really contains only this file.
    scaf_suffix = fna_path.suffix or ".fna"
    staged_fna = stage_dir / fna_path.name
    _clear_stale_inputs(stage_dir, scaf_suffix, keep=staged_fna)
    if staged_fna != fna_path:
        # Copying a file onto itself raises shutil.SameFileError.
        shutil.copy2(fna_path, staged_fna)

    # 1) Run BtToxin_Digger via DockerContainerManager
    mgr = DockerContainerManager(image=image, platform=platform)
    result = mgr.run_bttoxin_digger(
        input_dir=stage_dir,
        output_dir=digger_dir,
        log_dir=logs_dir,
        sequence_type="nucl",
        scaf_suffix=scaf_suffix,
        threads=threads,
    )
    if not result.get("success"):
        log_file = logs_dir / "digger_execution.log"
        # errors="replace": a stray non-UTF-8 byte in the log must not mask
        # the Digger failure with a decoding error.
        logs = (
            log_file.read_text(encoding="utf-8", errors="replace")
            if log_file.exists()
            else ""
        )
        return {
            "ok": False,
            "stage": "digger",
            "error": result.get("error") or f"Digger failed (exit={result.get('exit_code')})",
            "logs": logs,
        }

    toxins_dir = digger_dir / "Results" / "Toxins"
    all_toxins = toxins_dir / "All_Toxins.txt"
    if not all_toxins.exists():
        return {"ok": False, "stage": "digger", "error": f"Missing All_Toxins.txt at {all_toxins}"}

    # 2) Run Shotter scoring
    shotter_dir.mkdir(parents=True, exist_ok=True)
    py = sys.executable
    scripts_dir = Path(__file__).resolve().parents[0]
    shoter_cmd: list[str] = [
        py,
        str(scripts_dir / "bttoxin_shoter.py"),
        "--toxicity_csv",
        str(toxicity_csv),
        "--all_toxins",
        str(all_toxins),
        "--output_dir",
        str(shotter_dir),
    ]
    # The truthiness guard keeps a None threshold from reaching the comparison.
    if min_identity and min_identity > 0:
        shoter_cmd += ["--min_identity", str(min_identity)]
    if min_coverage and min_coverage > 0:
        shoter_cmd += ["--min_coverage", str(min_coverage)]
    if not allow_unknown_families:
        shoter_cmd += ["--disallow_unknown_families"]
    if require_index_hit:
        shoter_cmd += ["--require_index_hit"]

    r1 = _shell(shoter_cmd)
    if r1.returncode != 0:
        return {"ok": False, "stage": "shotter", "error": f"Shotter failed: {' '.join(shoter_cmd)}"}

    strain_scores = shotter_dir / "strain_target_scores.tsv"
    toxin_support = shotter_dir / "toxin_support.tsv"
    species_scores = shotter_dir / "strain_target_species_scores.tsv"

    # 3) Plot & report
    strain_for_plot = _read_first_strain(strain_scores)
    plot_cmd: list[str] = [
        py,
        str(scripts_dir / "plot_shotter.py"),
        "--strain_scores",
        str(strain_scores),
        "--toxin_support",
        str(toxin_support),
        "--species_scores",
        str(species_scores),
        "--out_dir",
        str(shotter_dir),
        "--merge_unresolved",
        "--report_mode",
        "paper",
        "--lang",
        lang,
    ]
    if strain_for_plot:
        plot_cmd += ["--per_hit_strain", strain_for_plot]
    r2 = _shell(plot_cmd)
    plots_ok = r2.returncode == 0
    if not plots_ok:
        # plotting/report optional; continue, but do not hide the failure
        print(
            f"[pipeline] WARNING: plot/report step exited with code {r2.returncode}; continuing",
            file=sys.stderr,
        )

    # 4) Bundle
    bundle = out_root / "pipeline_results.tar.gz"
    with tarfile.open(bundle, "w:gz") as tar:
        tar.add(digger_dir, arcname="digger")
        tar.add(shotter_dir, arcname="shotter")

    return {
        "ok": True,
        "digger_dir": str(digger_dir),
        "shotter_dir": str(shotter_dir),
        "bundle": str(bundle),
        "all_toxins": str(all_toxins),
        "strain": strain_for_plot,
        "plots_ok": plots_ok,
    }


def main() -> int:
    """Parse CLI arguments, run the pipeline and return a process exit code.

    Returns 0 on success and 1 when a pipeline stage reports failure. An
    ``--fna`` path that is not an existing file is rejected by the argument
    parser before any work starts.
    """
    ap = argparse.ArgumentParser(description="Run single-fna Digger -> Shotter pipeline")
    ap.add_argument("--fna", type=Path, required=True, help="Path to a single .fna file")
    ap.add_argument("--toxicity_csv", type=Path, default=Path("Data/toxicity-data.csv"))
    ap.add_argument("--out_root", type=Path, default=_DEFAULT_OUT_ROOT)
    ap.add_argument("--image", type=str, default=_DEFAULT_IMAGE)
    ap.add_argument("--platform", type=str, default="linux/amd64")
    ap.add_argument("--min_identity", type=float, default=0.0)
    ap.add_argument("--min_coverage", type=float, default=0.0)
    ap.add_argument("--disallow_unknown_families", action="store_true", default=False)
    ap.add_argument("--require_index_hit", action="store_true", default=False)
    ap.add_argument("--lang", type=str, choices=["zh", "en"], default="zh")
    ap.add_argument("--threads", type=int, default=4, help="Threads for BtToxin_Digger")
    args = ap.parse_args()

    if not args.fna.is_file():
        ap.error(f"--fna: no such file: {args.fna}")

    # derive per-run default out_root using file stem
    # Compare Path objects, not strings: str(Path) uses backslashes on Windows.
    if args.out_root == _DEFAULT_OUT_ROOT:
        stem = args.fna.stem
        args.out_root = Path("runs") / f"{stem}_run"

    res = run_single_fna_pipeline(
        fna_path=args.fna,
        out_root=args.out_root,
        toxicity_csv=args.toxicity_csv,
        image=args.image,
        platform=args.platform,
        min_identity=args.min_identity,
        min_coverage=args.min_coverage,
        allow_unknown_families=not args.disallow_unknown_families,
        require_index_hit=args.require_index_hit,
        lang=args.lang,
        threads=args.threads,
    )

    if not res.get("ok"):
        print(f"[pipeline] FAILED at stage={res.get('stage')}: {res.get('error')}")
        logs = res.get("logs")
        if logs:
            # Cap the echoed log so a long container log does not flood the terminal.
            print(logs[:2000])
        return 1

    print("[pipeline] ✓ Done")
    print(f" Digger: {res['digger_dir']}")
    print(f" Shotter: {res['shotter_dir']}")
    print(f" Bundle: {res['bundle']}")
    print(f" Strain: {res.get('strain','')}")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())