#!/usr/bin/env python3
"""Single-FNA BtToxin pipeline: Digger (docker) -> Shotter scoring -> plots -> bundle.

The module exposes four classes:

* ``BtToxinRunner`` stages one FNA file and runs BtToxin_Digger through
  ``DockerContainerManager``.
* ``ShotterAPI`` scores the Digger ``All_Toxins.txt`` table with ``bttoxin_shoter``.
* ``PlotAPI`` renders heatmaps and a Markdown report with ``plot_shotter``.
* ``BtSingleFnaPipeline`` chains the three steps and packs the results into a tarball.
"""
from __future__ import annotations

import logging
import os
import shutil
import tarfile
from pathlib import Path
from types import SimpleNamespace
from typing import Dict, Any, Optional

import sys as _sys

# Add backend and scripts to sys.path for API imports
_BACKEND_DIR = Path(__file__).resolve().parents[1] / "backend"
_SCRIPTS_DIR = Path(__file__).resolve().parents[0]
if str(_BACKEND_DIR) not in _sys.path:
    _sys.path.append(str(_BACKEND_DIR))
if str(_SCRIPTS_DIR) not in _sys.path:
    _sys.path.append(str(_SCRIPTS_DIR))

from app.utils.docker_client import DockerContainerManager  # type: ignore
import bttoxin_shoter as shoter  # type: ignore
import plot_shotter as plotter  # type: ignore

logger = logging.getLogger(__name__)


class BtToxinRunner:
    """Wrap the BtToxin_Digger docker invocation for a single FNA file."""

    def __init__(
        self,
        image: str = "quay.io/biocontainers/bttoxin_digger:1.0.10--hdfd78af_0",
        platform: str = "linux/amd64",
        base_workdir: Optional[Path] = None,
    ) -> None:
        """Create the working directory and the docker manager.

        Args:
            image: Docker image handed to ``DockerContainerManager``.
            platform: Docker platform string handed to ``DockerContainerManager``.
            base_workdir: Directory under which one sub-directory per sample is
                created. Defaults to ``<repo>/runs/bttoxin`` (one level above
                this file's directory).
        """
        self.image = image
        self.platform = platform
        if base_workdir is None:
            base_workdir = Path(__file__).resolve().parents[1] / "runs" / "bttoxin"
        # Coerce so that a plain string path works as well as a Path.
        self.base_workdir = Path(base_workdir)
        self.base_workdir.mkdir(parents=True, exist_ok=True)
        self.mgr = DockerContainerManager(image=self.image, platform=self.platform)

    def _prepare_layout(self, fna_path: Path) -> tuple[Path, Path, Path, Path, str]:
        """Create the per-sample directory tree and stage the FNA file into it.

        Layout (``<sample>`` is the file stem)::

            <base_workdir>/<sample>/input/<fna file name>
            <base_workdir>/<sample>/output/digger/
            <base_workdir>/<sample>/logs/

        Args:
            fna_path: Existing FNA file to stage.

        Returns:
            ``(input_dir, digger_out, log_dir, run_root, sample_name)``.

        Raises:
            FileNotFoundError: If ``fna_path`` does not exist.
        """
        if not fna_path.exists():
            raise FileNotFoundError(f"FNA 文件不存在: {fna_path}")

        sample_name = fna_path.stem
        run_root = self.base_workdir / sample_name
        input_dir = run_root / "input"
        output_root = run_root / "output"
        digger_out = output_root / "digger"
        log_dir = run_root / "logs"
        for d in (input_dir, digger_out, log_dir):
            d.mkdir(parents=True, exist_ok=True)

        # stage FNA (hardlink or copy)
        target = input_dir / fna_path.name
        if target.exists():
            if target.samefile(fna_path):
                # The source already is the staged file (same path, or a hard
                # link left by a previous run). Unlinking it could destroy the
                # only copy of the data, so leave it in place.
                logger.info("FNA already staged, skipping copy: %s", target)
                return input_dir, digger_out, log_dir, run_root, sample_name
            target.unlink()
        try:
            os.link(fna_path, target)
            logger.info("使用硬链接复制 FNA: %s → %s", fna_path, target)
        except OSError:
            # Hard links fail across filesystems or where unsupported; fall
            # back to a regular copy.
            shutil.copy2(fna_path, target)
            logger.info("复制 FNA: %s → %s", fna_path, target)
        return input_dir, digger_out, log_dir, run_root, sample_name

    def run_single_fna(
        self,
        fna_path: Path | str,
        sequence_type: str = "nucl",
        threads: int = 4,
    ) -> Dict[str, Any]:
        """Run BtToxin_Digger on one FNA file.

        Args:
            fna_path: Path to the FNA file.
            sequence_type: Forwarded to ``run_bttoxin_digger``.
            threads: Forwarded to ``run_bttoxin_digger``.

        Returns:
            A dict with ``success`` (the manager reported success *and*
            ``All_Toxins.txt`` exists), ``sample``, the directories used
            (``run_root``, ``input_dir``, ``digger_out``, ``log_dir``,
            ``toxins_dir``), ``files`` (expected output paths) and
            ``raw_result`` (the manager's return value).

        Raises:
            FileNotFoundError: If ``fna_path`` does not exist.
        """
        fna_path = Path(fna_path)
        input_dir, digger_out, log_dir, run_root, sample_name = self._prepare_layout(fna_path)
        logger.info("开始 BtToxin_Digger 分析: %s (sample=%s)", fna_path, sample_name)
        result = self.mgr.run_bttoxin_digger(
            input_dir=input_dir,
            output_dir=digger_out,
            log_dir=log_dir,
            sequence_type=sequence_type,
            scaf_suffix=fna_path.suffix or ".fna",
            threads=threads,
        )

        toxins_dir = digger_out / "Results" / "Toxins"
        files = {
            "list": toxins_dir / f"{sample_name}.list",
            "gbk": toxins_dir / f"{sample_name}.gbk",
            "all_genes": toxins_dir / "Bt_all_genes.table",
            "all_toxins": toxins_dir / "All_Toxins.txt",
        }
        # A run only counts as successful when the table the next stage needs
        # was actually produced.
        ok = bool(result.get("success")) and files["all_toxins"].exists()
        return {
            "success": ok,
            "sample": sample_name,
            "run_root": run_root,
            "input_dir": input_dir,
            "digger_out": digger_out,
            "log_dir": log_dir,
            "toxins_dir": toxins_dir,
            "files": files,
            "raw_result": result,
        }


class ShotterAPI:
    """Score toxin hits with Shotter through its Python API and save the results."""

    def score(
        self,
        toxicity_csv: Path,
        all_toxins: Path,
        out_dir: Path,
        min_identity: float = 0.0,
        min_coverage: float = 0.0,
        allow_unknown_families: bool = True,
        require_index_hit: bool = False,
    ) -> Dict[str, Any]:
        """Filter the hits, score every strain and write the result tables.

        Args:
            toxicity_csv: CSV used to build ``shoter.SpecificityIndex``.
            all_toxins: Digger ``All_Toxins.txt`` parsed by
                ``shoter.parse_all_toxins``.
            out_dir: Output directory (created if missing).
            min_identity: When > 0, keep rows whose ``identity01`` is at least
                this value.
            min_coverage: When > 0, keep rows whose ``coverage`` is at least
                this value.
            allow_unknown_families: When false, drop rows whose ``family_key``
                is ``"unknown"``.
            require_index_hit: When true, keep only rows for which the index
                returns orders for ``Hit_id_norm`` or, failing that, for
                ``family_key``.

        Returns:
            A dict with the column lists (``orders``, ``species``) and the
            output paths. The species paths are returned even when no species
            table was written; callers should check that the files exist.
        """
        out_dir.mkdir(parents=True, exist_ok=True)
        index = shoter.SpecificityIndex.from_csv(toxicity_csv)
        df = shoter.parse_all_toxins(all_toxins)

        # thresholds
        if min_identity > 0:
            df = df[df["identity01"].astype(float) >= float(min_identity)]
        if min_coverage > 0:
            df = df[df["coverage"].astype(float) >= float(min_coverage)]

        # unknown families handling
        if not allow_unknown_families:
            df = df[df["family_key"].astype(str) != "unknown"]

        # require index hit mapping
        # Skipped for an empty frame: there `apply(axis=1)` may not yield a
        # boolean mask, and indexing with it can drop the columns read below.
        if require_index_hit and not df.empty:

            def _has_index_orders(row) -> bool:
                name_key = str(row.get("Hit_id_norm", ""))
                fam = str(row.get("family_key", ""))
                d = index.orders_for_name_or_backoff(name_key)
                if not d:
                    d = index.orders_for_name_or_backoff(fam)
                return bool(d)

            df = df[df.apply(_has_index_orders, axis=1)]

        strains = sorted(df["Strain"].astype(str).unique().tolist())
        all_hits: list[shoter.ToxinHit] = []
        all_strain_scores: list[shoter.StrainScores] = []
        all_species_scores: list[shoter.StrainSpeciesScores] = []
        for strain in strains:
            sdf = df[df["Strain"].astype(str).eq(strain)].copy()
            per_hit, sscore, sspecies = shoter.score_strain(strain, sdf, index)
            all_hits.extend(per_hit)
            all_strain_scores.append(sscore)
            if sspecies is not None:
                all_species_scores.append(sspecies)

        # The set always contains "other" and "unknown", so it is never empty.
        order_columns = sorted({*index.all_orders, "other", "unknown"})
        species_columns = sorted(index.all_species)

        toxin_support_path = out_dir / "toxin_support.tsv"
        strain_scores_path = out_dir / "strain_target_scores.tsv"
        strain_json_path = out_dir / "strain_scores.json"
        species_scores_path = out_dir / "strain_target_species_scores.tsv"
        species_json_path = out_dir / "strain_species_scores.json"

        shoter.ToxinHit.save_list_tsv(toxin_support_path, all_hits, order_columns)
        shoter.StrainScores.save_list_tsv(strain_scores_path, all_strain_scores, order_columns)
        shoter.StrainScores.save_list_json(strain_json_path, all_strain_scores)

        if species_columns and all_species_scores:
            shoter.StrainSpeciesScores.save_list_tsv(
                species_scores_path, all_species_scores, species_columns
            )
            shoter.StrainSpeciesScores.save_list_json(species_json_path, all_species_scores)
        else:
            # Remove species tables left behind by an earlier run in the same
            # directory; otherwise downstream existence checks would pick up
            # results that do not belong to this run.
            for stale in (species_scores_path, species_json_path):
                try:
                    stale.unlink()
                except FileNotFoundError:
                    continue
                logger.info("Removed stale species output: %s", stale)

        return {
            "orders": order_columns,
            "species": species_columns,
            "strain_scores": strain_scores_path,
            "toxin_support": toxin_support_path,
            "strain_scores_json": strain_json_path,
            "species_scores": species_scores_path,
            "species_scores_json": species_json_path,
        }


class PlotAPI:
    """Call the plotting and report APIs in-process (no subprocess)."""

    def render(
        self,
        shotter_dir: Path,
        lang: str = "zh",
        merge_unresolved: bool = True,
        per_hit_strain: Optional[str] = None,
        cmap: str = "viridis",
        vmin: float = 0.0,
        vmax: float = 1.0,
    ) -> Dict[str, Any]:
        """Render heatmaps and the Markdown report for one Shotter output directory.

        Args:
            shotter_dir: Directory holding ``strain_target_scores.tsv`` and,
                optionally, ``toxin_support.tsv`` and
                ``strain_target_species_scores.tsv``. Outputs are written here.
            lang: Report language, forwarded to ``plotter.write_report_md``.
            merge_unresolved: Forwarded to the plot and report functions.
            per_hit_strain: Strain for the per-hit heatmap; the figure is only
                drawn when this is set and ``toxin_support.tsv`` exists.
            cmap: Colormap name forwarded to the plot functions.
            vmin: Lower colour-scale bound forwarded to the plot functions.
            vmax: Upper colour-scale bound forwarded to the plot functions.

        Returns:
            A dict with ``strain_orders_png``, ``per_hit_png`` (or ``None``),
            ``species_png`` (or ``None``) and ``report_md``.
        """
        strain_scores = shotter_dir / "strain_target_scores.tsv"
        toxin_support = shotter_dir / "toxin_support.tsv"
        species_scores = shotter_dir / "strain_target_species_scores.tsv"

        # The positional `None` below is passed through unchanged; its meaning
        # is defined by plot_shotter.
        out1 = shotter_dir / "strain_target_scores.png"
        plotter.plot_strain_scores(strain_scores, out1, cmap, vmin, vmax, None, merge_unresolved)

        out2 = None
        if per_hit_strain and toxin_support.exists():
            out2 = shotter_dir / f"per_hit_{per_hit_strain}.png"
            plotter.plot_per_hit_for_strain(
                toxin_support, per_hit_strain, out2, cmap, vmin, vmax, None, merge_unresolved
            )

        species_png = None
        if species_scores.exists():
            species_png = shotter_dir / "strain_target_species_scores.png"
            plotter.plot_species_scores(species_scores, species_png, cmap, vmin, vmax, None)

        # Report
        # The filter settings are passed as None in the namespace handed to
        # the report writer; only the language is set.
        args_ns = SimpleNamespace(
            allow_unknown_families=None,
            require_index_hit=None,
            min_identity=None,
            min_coverage=None,
            lang=lang,
        )
        report_path = shotter_dir / "shotter_report_paper.md"
        plotter.write_report_md(
            out_path=report_path,
            mode="paper",
            lang=lang,
            strain_scores_path=strain_scores,
            toxin_support_path=toxin_support if toxin_support.exists() else None,
            species_scores_path=species_scores if species_scores.exists() else None,
            strain_heatmap_path=out1,
            per_hit_heatmap_path=out2,
            species_heatmap_path=species_png,
            merge_unresolved=merge_unresolved,
            args_namespace=args_ns,
        )
        return {
            "strain_orders_png": out1,
            "per_hit_png": out2,
            "species_png": species_png,
            "report_md": report_path,
        }


class BtSingleFnaPipeline:
    """Full API pipeline for a single FNA: Digger -> Shotter -> Plot -> bundle."""

    def __init__(
        self,
        image: str = "quay.io/biocontainers/bttoxin_digger:1.0.10--hdfd78af_0",
        platform: str = "linux/amd64",
        base_workdir: Optional[Path] = None,
    ) -> None:
        """Build the three stage objects; arguments are forwarded to ``BtToxinRunner``."""
        self.digger = BtToxinRunner(image=image, platform=platform, base_workdir=base_workdir)
        self.shotter = ShotterAPI()
        self.plotter = PlotAPI()

    @staticmethod
    def _first_strain(strain_scores: Path) -> Optional[str]:
        """Return the ``Strain`` value of the first row of the scores TSV, if any.

        This is best-effort: any failure (pandas missing, unreadable or empty
        file, missing column) is logged and ``None`` is returned so that the
        pipeline continues without the per-hit figure.
        """
        try:
            import pandas as pd

            df = pd.read_csv(strain_scores, sep="\t")
            if len(df):
                return str(df.iloc[0]["Strain"])
        except Exception:
            logger.warning(
                "Could not pick a strain for the per-hit figure from %s; skipping it",
                strain_scores,
                exc_info=True,
            )
        return None

    def run(
        self,
        fna: Path | str,
        toxicity_csv: Path | str = Path("Data/toxicity-data.csv"),
        min_identity: float = 0.0,
        min_coverage: float = 0.0,
        allow_unknown_families: bool = True,
        require_index_hit: bool = False,
        lang: str = "zh",
        threads: int = 4,
    ) -> Dict[str, Any]:
        """Run Digger, Shotter and the plots for one FNA file and bundle the output.

        Args:
            fna: Path to the FNA file.
            toxicity_csv: Toxicity CSV for the specificity index. The default
                is a relative path, resolved against the current working
                directory.
            min_identity: Forwarded to ``ShotterAPI.score``.
            min_coverage: Forwarded to ``ShotterAPI.score``.
            allow_unknown_families: Forwarded to ``ShotterAPI.score``.
            require_index_hit: Forwarded to ``ShotterAPI.score``.
            lang: Report language forwarded to ``PlotAPI.render``.
            threads: Forwarded to ``BtToxinRunner.run_single_fna``.

        Returns:
            On Digger failure: ``{"ok": False, "stage": "digger", "detail": ...}``.
            Otherwise a dict with ``ok``, ``run_root``, ``digger_dir``,
            ``shotter_dir``, ``bundle`` (all paths as strings) and ``strain``
            (the strain used for the per-hit figure, or ``""``).
        """
        dig = self.digger.run_single_fna(fna_path=fna, sequence_type="nucl", threads=threads)
        if not dig.get("success"):
            return {"ok": False, "stage": "digger", "detail": dig}

        run_root: Path = dig["run_root"]
        digger_dir = run_root / "output" / "digger"
        shotter_dir = run_root / "output" / "shotter"
        shot = self.shotter.score(
            toxicity_csv=Path(toxicity_csv),
            all_toxins=Path(dig["files"]["all_toxins"]),
            out_dir=shotter_dir,
            min_identity=min_identity,
            min_coverage=min_coverage,
            allow_unknown_families=allow_unknown_families,
            require_index_hit=require_index_hit,
        )

        # choose a strain for per-hit figure
        strain_for_plot = self._first_strain(shot["strain_scores"])

        self.plotter.render(
            shotter_dir=shotter_dir,
            lang=lang,
            merge_unresolved=True,
            per_hit_strain=strain_for_plot,
        )

        # bundle
        bundle = run_root / "pipeline_results.tar.gz"
        with tarfile.open(bundle, "w:gz") as tar:
            tar.add(digger_dir, arcname="digger")
            tar.add(shotter_dir, arcname="shotter")

        return {
            "ok": True,
            "run_root": str(run_root),
            "digger_dir": str(digger_dir),
            "shotter_dir": str(shotter_dir),
            "bundle": str(bundle),
            "strain": strain_for_plot or "",
        }