From 5883e13c56e50d50072a9e979efd21d211239b80 Mon Sep 17 00:00:00 2001 From: hotwa Date: Mon, 1 Dec 2025 10:11:26 +0800 Subject: [PATCH] =?UTF-8?q?feat(shotter):=20=E5=AE=9E=E7=8E=B0=20Shotter?= =?UTF-8?q?=20v1=20=E6=B4=BB=E6=80=A7=E8=AF=84=E4=BC=B0=E4=B8=8E=E5=8D=95?= =?UTF-8?q?=20FNA=20=E6=B5=81=E7=A8=8B=EF=BC=8C=E6=96=B0=E5=A2=9E=20API/CL?= =?UTF-8?q?I/=E7=BB=98=E5=9B=BE=E4=B8=8E=E6=8A=A5=E5=91=8A?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 新增 scripts/bttoxin_shoter.py:从 BPPRC 正样本 CSV 构建 name/亚家族/家族特异性索引, 解析 BtToxin_Digger All_Toxins.txt,计算 per-hit 权重并以 noisy-OR 合成菌株×目标目/物种分数, 输出 TSV/JSON;含 HMM 加成与配对毒素规则(Vip1/Vip2,Vpa/Vpb),other/unknown 桶。 - 新增端到端工具链: - scripts/run_single_fna_pipeline.py:Digger → Shotter → Plot → 打包 - scripts/plot_shotter.py:绘制热图并生成论文式/摘要式报告 - scripts/bttoxin_api.py 与 bttoxin/api.py:纯 Python API;bttoxin/cli.py 暴露 bttoxin-run - pyproject.toml:项目打包与 CLI 入口 - docs(README): 增加输入文件格式与结果解读,补充单目录写入方案 - chore(gitignore): 忽略 runs/ 与 tests/output - ci: 移除 .woodpecker/test.yml --- .gitignore | 5 +- .woodpecker/test.yml | 15 - README.md | 42 ++ bttoxin/__init__.py | 10 + bttoxin/api.py | 299 ++++++++++++ bttoxin/cli.py | 50 +++ pyproject.toml | 26 ++ scripts/bttoxin_api.py | 286 ++++++++++++ scripts/bttoxin_shoter.py | 699 +++++++++++++++++++++++++++++ scripts/plot_shotter.py | 543 ++++++++++++++++++++++ scripts/run_single_fna_pipeline.py | 236 ++++++++++ 11 files changed, 2195 insertions(+), 16 deletions(-) delete mode 100644 .woodpecker/test.yml create mode 100644 bttoxin/__init__.py create mode 100644 bttoxin/api.py create mode 100644 bttoxin/cli.py create mode 100644 pyproject.toml create mode 100644 scripts/bttoxin_api.py create mode 100644 scripts/bttoxin_shoter.py create mode 100644 scripts/plot_shotter.py create mode 100644 scripts/run_single_fna_pipeline.py diff --git a/.gitignore b/.gitignore index f41f2c8..980d141 100644 --- a/.gitignore +++ b/.gitignore @@ -28,4 +28,7 @@ 
tests/test_data/genomes/*.fna .idea/ *.swp -.venv/ \ No newline at end of file +.venv/ + +runs/ +tests/output \ No newline at end of file diff --git a/.woodpecker/test.yml b/.woodpecker/test.yml deleted file mode 100644 index ddfc246..0000000 --- a/.woodpecker/test.yml +++ /dev/null @@ -1,15 +0,0 @@ -when: - event: [push, pull_request] - -steps: - prepare: - image: bttoxin-digger:latest - commands: - - echo "Preparing environment..." - - BtToxin_Digger --version - - test: - image: python:3.10 - commands: - - pip install pytest httpx - - pytest tests/ diff --git a/README.md b/README.md index e0d2934..26e11bc 100644 --- a/README.md +++ b/README.md @@ -76,6 +76,48 @@ uv run python scripts/test_bttoxin_digger.py 要求:`tests/test_data` 下存在 `97-27.fna` 与 `C15.fna`,测试成功后在 `tests/output/Results/Toxins` 看到 6 个关键文件。 +#### 输入文件格式说明 + +.fna 文件是 FASTA 格式的核酸序列文件,包含细菌的完整基因组序列: + +- **97-27.fna**: Bacillus thuringiensis strain 97-27 的完整基因组序列 +- **C15.fna**: Bacillus thuringiensis strain C15 的完整基因组序列 + +文件格式示例: +```>NZ_CP010088.1 Bacillus thuringiensis strain 97-27 chromosome, complete genome +TAATGTAACACCAGTAAATATTTCATTCATATATTCTTTTAACTGTATTTTATATTCTTTCTACTCTACAATTTCTTTTA +ACTGCCAATATGCATCTTCTAGCCAAGGGTGTAAAACTTTCAACGTGTCTTTTCTATCCCACAAATATGAAATATATGCA +... +``` + +#### 挖掘结果解读 + +BtToxin_Digger 分析完成后会生成以下关键结果文件: + +**1. 菌株毒素列表文件 (`.list`)** +- 包含每个菌株中预测到的各类毒素蛋白的详细分类信息 +- 毒素类型包括:Cry、Cyt、Vip、Others、App、Gpp、Mcf、Mpf、Mpp、Mtx、Pra、Prb、Spp、Tpp、Vpa、Vpb、Xpp +- 每个毒素显示:蛋白ID、长度、等级(Rank1-4)、BLAST结果、最佳匹配、覆盖度、相似度、SVM和HMM预测结果 + +**2. 基因银行格式文件 (`.gbk`)** +- 包含预测毒素基因的详细注释信息 +- 记录基因位置、蛋白描述、BLAST比对详情、预测结果等 +- 可用于后续的功能分析和可视化 + +**3. 汇总表格 (`Bt_all_genes.table`)** +- 所有菌株的毒素基因汇总表格 +- 显示每个菌株中不同类型毒素基因的数量和相似度信息 + +**4. 
全部毒素列表 (`All_Toxins.txt`)** +- 包含所有预测到的毒素基因的完整信息 +- 字段包括:菌株、蛋白ID、蛋白长度、链向、基因位置、SVM预测、BLAST结果、HMM结果、命中ID、比对长度、一致性、E值等 + +**测试结果示例**: +- 97-27菌株预测到12个毒素基因,包括InhA1/2、Bmp1、Spp1Aa1、Zwa5A/6等 +- C15菌株预测到多个Cry毒素基因(Cry21Aa2、Cry21Aa3、Cry21Ca2、Cry5Ba1)和其他辅助毒素 +- 毒素等级分为Rank1-4,Rank1为最高置信度,Rank4为最低置信度 +- 相似度范围从27.62%到100%,表明与已知毒素的相似程度 + ### 单目录方案(跨平台稳定写入) - 运行前,程序会将输入文件复制到宿主输出目录下的 `input_files/` 子目录;容器仅挂载该输出目录(读写)为 `/workspace`。 diff --git a/bttoxin/__init__.py b/bttoxin/__init__.py new file mode 100644 index 0000000..307c2ac --- /dev/null +++ b/bttoxin/__init__.py @@ -0,0 +1,10 @@ +from __future__ import annotations + +__all__ = [ + "BtToxinRunner", + "ShotterAPI", + "PlotAPI", + "BtSingleFnaPipeline", +] + +__version__ = "0.1.0" diff --git a/bttoxin/api.py b/bttoxin/api.py new file mode 100644 index 0000000..056733f --- /dev/null +++ b/bttoxin/api.py @@ -0,0 +1,299 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import logging +import os +import shutil +import tarfile +from pathlib import Path +from types import SimpleNamespace +from typing import Dict, Any, Optional +import sys as _sys + +# Ensure repo-relative imports for backend and scripts when running from installed package +_REPO_ROOT = Path(__file__).resolve().parents[1] +_BACKEND_DIR = _REPO_ROOT / "backend" +_SCRIPTS_DIR = _REPO_ROOT / "scripts" +for _p in (str(_BACKEND_DIR), str(_SCRIPTS_DIR)): + if _p not in _sys.path: + _sys.path.append(_p) + +# Import DockerContainerManager from backend +from app.utils.docker_client import DockerContainerManager # type: ignore + +logger = logging.getLogger(__name__) + + +def _lazy_import_shoter(): + try: + import bttoxin_shoter as shoter # type: ignore + return shoter + except Exception as e: + raise ImportError( + f"Failed to import bttoxin_shoter from {_SCRIPTS_DIR}. 
Ensure repo is present in the image.\n{e}" + ) + + +def _lazy_import_plotter(): + try: + import plot_shotter as plotter # type: ignore + return plotter + except Exception as e: + raise ImportError( + f"Failed to import plot_shotter from {_SCRIPTS_DIR}. Ensure repo is present in the image.\n{e}" + ) + + +class BtToxinRunner: + """Wrap BtToxin_Digger docker invocation for a single FNA.""" + + def __init__( + self, + image: str = "quay.io/biocontainers/bttoxin_digger:1.0.10--hdfd78af_0", + platform: str = "linux/amd64", + base_workdir: Optional[Path] = None, + ) -> None: + self.image = image + self.platform = platform + if base_workdir is None: + base_workdir = _REPO_ROOT / "runs" / "bttoxin" + self.base_workdir = base_workdir + self.base_workdir.mkdir(parents=True, exist_ok=True) + self.mgr = DockerContainerManager(image=self.image, platform=self.platform) + + def _prepare_layout(self, fna_path: Path) -> tuple[Path, Path, Path, Path, str]: + if not fna_path.exists(): + raise FileNotFoundError(f"FNA file not found: {fna_path}") + sample_name = fna_path.stem + run_root = self.base_workdir / sample_name + input_dir = run_root / "input" + digger_out = run_root / "output" / "digger" + log_dir = run_root / "logs" + for d in (input_dir, digger_out, log_dir): + d.mkdir(parents=True, exist_ok=True) + target = input_dir / fna_path.name + if target.exists(): + target.unlink() + try: + os.link(fna_path, target) + logger.info("Hard-linked FNA: %s → %s", fna_path, target) + except OSError: + shutil.copy2(fna_path, target) + logger.info("Copied FNA: %s → %s", fna_path, target) + return input_dir, digger_out, log_dir, run_root, sample_name + + def run_single_fna(self, fna_path: Path | str, sequence_type: str = "nucl", threads: int = 4) -> Dict[str, Any]: + fna_path = Path(fna_path) + input_dir, digger_out, log_dir, run_root, sample_name = self._prepare_layout(fna_path) + logger.info("Start BtToxin_Digger: %s (sample=%s)", fna_path, sample_name) + result = 
self.mgr.run_bttoxin_digger( + input_dir=input_dir, + output_dir=digger_out, + log_dir=log_dir, + sequence_type=sequence_type, + scaf_suffix=fna_path.suffix or ".fna", + threads=threads, + ) + toxins_dir = digger_out / "Results" / "Toxins" + files = { + "list": toxins_dir / f"{sample_name}.list", + "gbk": toxins_dir / f"{sample_name}.gbk", + "all_genes": toxins_dir / "Bt_all_genes.table", + "all_toxins": toxins_dir / "All_Toxins.txt", + } + ok = bool(result.get("success")) and files["all_toxins"].exists() + return { + "success": ok, + "sample": sample_name, + "run_root": run_root, + "input_dir": input_dir, + "digger_out": digger_out, + "log_dir": log_dir, + "toxins_dir": toxins_dir, + "files": files, + "raw_result": result, + } + + +class ShotterAPI: + """Pure Python Shotter scoring and saving (no subprocess).""" + + def score( + self, + toxicity_csv: Path, + all_toxins: Path, + out_dir: Path, + min_identity: float = 0.0, + min_coverage: float = 0.0, + allow_unknown_families: bool = True, + require_index_hit: bool = False, + ) -> Dict[str, Any]: + shoter = _lazy_import_shoter() + out_dir.mkdir(parents=True, exist_ok=True) + index = shoter.SpecificityIndex.from_csv(toxicity_csv) + df = shoter.parse_all_toxins(all_toxins) + if min_identity > 0: + df = df[df["identity01"].astype(float) >= float(min_identity)] + if min_coverage > 0: + df = df[df["coverage"].astype(float) >= float(min_coverage)] + if not allow_unknown_families: + df = df[df["family_key"].astype(str) != "unknown"] + if require_index_hit: + def _has_index_orders(row) -> bool: + name_key = str(row.get("Hit_id_norm", "")) + fam = str(row.get("family_key", "")) + d = index.orders_for_name_or_backoff(name_key) + if not d: + d = index.orders_for_name_or_backoff(fam) + return bool(d) + df = df[df.apply(_has_index_orders, axis=1)] + strains = sorted(df["Strain"].astype(str).unique().tolist()) + all_hits: list[shoter.ToxinHit] = [] + all_strain_scores: list[shoter.StrainScores] = [] + all_species_scores: 
list[shoter.StrainSpeciesScores] = [] + for strain in strains: + sdf = df[df["Strain"].astype(str).eq(strain)].copy() + per_hit, sscore, sspecies = shoter.score_strain(strain, sdf, index) + all_hits.extend(per_hit) + all_strain_scores.append(sscore) + if sspecies is not None: + all_species_scores.append(sspecies) + order_columns = sorted({*index.all_orders, "other", "unknown"}) or ["unknown"] + species_columns = sorted(index.all_species) + shoter.ToxinHit.save_list_tsv(out_dir / "toxin_support.tsv", all_hits, order_columns) + shoter.StrainScores.save_list_tsv(out_dir / "strain_target_scores.tsv", all_strain_scores, order_columns) + shoter.StrainScores.save_list_json(out_dir / "strain_scores.json", all_strain_scores) + if species_columns and all_species_scores: + shoter.StrainSpeciesScores.save_list_tsv(out_dir / "strain_target_species_scores.tsv", all_species_scores, species_columns) + shoter.StrainSpeciesScores.save_list_json(out_dir / "strain_species_scores.json", all_species_scores) + return { + "orders": order_columns, + "species": species_columns, + "strain_scores": out_dir / "strain_target_scores.tsv", + "toxin_support": out_dir / "toxin_support.tsv", + "strain_scores_json": out_dir / "strain_scores.json", + "species_scores": out_dir / "strain_target_species_scores.tsv", + "species_scores_json": out_dir / "strain_species_scores.json", + } + + +class PlotAPI: + """Plot heatmaps and write Markdown report (no subprocess).""" + + def render( + self, + shotter_dir: Path, + lang: str = "zh", + merge_unresolved: bool = True, + per_hit_strain: Optional[str] = None, + cmap: str = "viridis", + vmin: float = 0.0, + vmax: float = 1.0, + ) -> Dict[str, Any]: + plotter = _lazy_import_plotter() + strain_scores = shotter_dir / "strain_target_scores.tsv" + toxin_support = shotter_dir / "toxin_support.tsv" + species_scores = shotter_dir / "strain_target_species_scores.tsv" + out1 = shotter_dir / "strain_target_scores.png" + plotter.plot_strain_scores(strain_scores, out1, cmap, 
vmin, vmax, None, merge_unresolved) + out2 = None + if per_hit_strain and toxin_support.exists(): + out2 = shotter_dir / f"per_hit_{per_hit_strain}.png" + plotter.plot_per_hit_for_strain(toxin_support, per_hit_strain, out2, cmap, vmin, vmax, None, merge_unresolved) + species_png = None + if species_scores.exists(): + species_png = shotter_dir / "strain_target_species_scores.png" + plotter.plot_species_scores(species_scores, species_png, cmap, vmin, vmax, None) + args_ns = SimpleNamespace( + allow_unknown_families=None, + require_index_hit=None, + min_identity=None, + min_coverage=None, + lang=lang, + ) + report_path = shotter_dir / "shotter_report_paper.md" + plotter.write_report_md( + out_path=report_path, + mode="paper", + lang=lang, + strain_scores_path=strain_scores, + toxin_support_path=toxin_support if toxin_support.exists() else None, + species_scores_path=species_scores if species_scores.exists() else None, + strain_heatmap_path=out1, + per_hit_heatmap_path=out2, + species_heatmap_path=species_png, + merge_unresolved=merge_unresolved, + args_namespace=args_ns, + ) + return { + "strain_orders_png": out1, + "per_hit_png": out2, + "species_png": species_png, + "report_md": report_path, + } + + +class BtSingleFnaPipeline: + """End-to-end single-FNA pipeline: Digger → Shotter → Plot → Bundle.""" + + def __init__( + self, + image: str = "quay.io/biocontainers/bttoxin_digger:1.0.10--hdfd78af_0", + platform: str = "linux/amd64", + base_workdir: Optional[Path] = None, + ) -> None: + self.digger = BtToxinRunner(image=image, platform=platform, base_workdir=base_workdir) + self.shotter = ShotterAPI() + self.plotter = PlotAPI() + + def run( + self, + fna: Path | str, + toxicity_csv: Path | str = Path("Data/toxicity-data.csv"), + min_identity: float = 0.0, + min_coverage: float = 0.0, + allow_unknown_families: bool = True, + require_index_hit: bool = False, + lang: str = "zh", + threads: int = 4, + ) -> Dict[str, Any]: + dig = self.digger.run_single_fna(fna_path=fna, 
sequence_type="nucl", threads=threads) + if not dig.get("success"): + return {"ok": False, "stage": "digger", "detail": dig} + run_root: Path = dig["run_root"] + shotter_dir = run_root / "output" / "shotter" + shot = self.shotter.score( + toxicity_csv=Path(toxicity_csv), + all_toxins=Path(dig["files"]["all_toxins"]), + out_dir=shotter_dir, + min_identity=min_identity, + min_coverage=min_coverage, + allow_unknown_families=allow_unknown_families, + require_index_hit=require_index_hit, + ) + strain_for_plot = None + try: + import pandas as pd + df = pd.read_csv(shot["strain_scores"], sep="\t") + if len(df): + strain_for_plot = str(df.iloc[0]["Strain"]) + except Exception: + pass + _ = self.plotter.render( + shotter_dir=shotter_dir, + lang=lang, + merge_unresolved=True, + per_hit_strain=strain_for_plot, + ) + bundle = run_root / "pipeline_results.tar.gz" + with tarfile.open(bundle, "w:gz") as tar: + tar.add(run_root / "output" / "digger", arcname="digger") + tar.add(run_root / "output" / "shotter", arcname="shotter") + return { + "ok": True, + "run_root": str(run_root), + "digger_dir": str(run_root / "output" / "digger"), + "shotter_dir": str(shotter_dir), + "bundle": str(bundle), + "strain": strain_for_plot or "", + } diff --git a/bttoxin/cli.py b/bttoxin/cli.py new file mode 100644 index 0000000..e5c7572 --- /dev/null +++ b/bttoxin/cli.py @@ -0,0 +1,50 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +from pathlib import Path +from .api import BtSingleFnaPipeline + + +def main() -> int: + ap = argparse.ArgumentParser(description="Bttoxin single-FNA pipeline (Digger → Shotter → Plot → Bundle)") + ap.add_argument("--fna", type=Path, required=True, help="Path to a single .fna file") + ap.add_argument("--toxicity_csv", type=Path, default=Path("Data/toxicity-data.csv")) + ap.add_argument("--base_workdir", type=Path, default=None, help="Base working dir (default: runs/bttoxin under repo root)") + ap.add_argument("--image", type=str, 
default="quay.io/biocontainers/bttoxin_digger:1.0.10--hdfd78af_0") + ap.add_argument("--platform", type=str, default="linux/amd64") + ap.add_argument("--min_identity", type=float, default=0.0) + ap.add_argument("--min_coverage", type=float, default=0.0) + ap.add_argument("--disallow_unknown_families", action="store_true", default=False) + ap.add_argument("--require_index_hit", action="store_true", default=False) + ap.add_argument("--lang", type=str, choices=["zh", "en"], default="zh") + ap.add_argument("--threads", type=int, default=4) + args = ap.parse_args() + + pipe = BtSingleFnaPipeline(image=args.image, platform=args.platform, base_workdir=args.base_workdir) + res = pipe.run( + fna=args.fna, + toxicity_csv=args.toxicity_csv, + min_identity=args.min_identity, + min_coverage=args.min_coverage, + allow_unknown_families=not args.disallow_unknown_families, + require_index_hit=args.require_index_hit, + lang=args.lang, + threads=args.threads, + ) + + if not res.get("ok"): + print(f"[bttoxin-run] FAILED at stage={res.get('stage')}") + return 1 + + print("[bttoxin-run] ✓ Done") + print(f" run_root: {res['run_root']}") + print(f" digger: {res['digger_dir']}") + print(f" shotter: {res['shotter_dir']}") + print(f" bundle: {res['bundle']}") + print(f" strain: {res.get('strain','')}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/pyproject.toml b/pyproject.toml new file mode 100644 index 0000000..6071388 --- /dev/null +++ b/pyproject.toml @@ -0,0 +1,26 @@ +[build-system] +requires = ["setuptools>=61.0", "wheel"] +build-backend = "setuptools.build_meta" + +[project] +name = "bttoxin-pipeline" +version = "0.1.0" +description = "BtToxin Digger + Shotter single-FNA pipeline with plotting and report" +readme = "README.md" +authors = [{name="Bttoxin Team"}] +requires-python = ">=3.9" +dependencies = [ + "pandas>=2.0.0", + "matplotlib>=3.7.0", + "seaborn>=0.12.2; python_version>='3.9'", + "docker>=6.1.0", +] + +[project.scripts] +bttoxin-run = 
"bttoxin.cli:main" + +[tool.setuptools] +packages = ["bttoxin"] + +[tool.setuptools.package-data] +bttoxin = [] diff --git a/scripts/bttoxin_api.py b/scripts/bttoxin_api.py new file mode 100644 index 0000000..d2e7a9d --- /dev/null +++ b/scripts/bttoxin_api.py @@ -0,0 +1,286 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import logging +import os +import shutil +import tarfile +from pathlib import Path +from types import SimpleNamespace +from typing import Dict, Any, Optional + +import sys as _sys + +# Add backend and scripts to sys.path for API imports +_BACKEND_DIR = Path(__file__).resolve().parents[1] / "backend" +_SCRIPTS_DIR = Path(__file__).resolve().parents[0] +if str(_BACKEND_DIR) not in _sys.path: + _sys.path.append(str(_BACKEND_DIR)) +if str(_SCRIPTS_DIR) not in _sys.path: + _sys.path.append(str(_SCRIPTS_DIR)) + +from app.utils.docker_client import DockerContainerManager # type: ignore +import bttoxin_shoter as shoter # type: ignore +import plot_shotter as plotter # type: ignore + +logger = logging.getLogger(__name__) + + +class BtToxinRunner: + """封装 BtToxin_Digger docker 调用 - 针对单个 FNA.""" + + def __init__( + self, + image: str = "quay.io/biocontainers/bttoxin_digger:1.0.10--hdfd78af_0", + platform: str = "linux/amd64", + base_workdir: Optional[Path] = None, + ) -> None: + self.image = image + self.platform = platform + if base_workdir is None: + base_workdir = Path(__file__).resolve().parents[1] / "runs" / "bttoxin" + self.base_workdir = base_workdir + self.base_workdir.mkdir(parents=True, exist_ok=True) + self.mgr = DockerContainerManager(image=self.image, platform=self.platform) + + def _prepare_layout(self, fna_path: Path) -> tuple[Path, Path, Path, Path, str]: + if not fna_path.exists(): + raise FileNotFoundError(f"FNA 文件不存在: {fna_path}") + sample_name = fna_path.stem + run_root = self.base_workdir / sample_name + input_dir = run_root / "input" + output_root = run_root / "output" + digger_out = output_root / "digger" + log_dir = 
run_root / "logs" + for d in (input_dir, digger_out, log_dir): + d.mkdir(parents=True, exist_ok=True) + # stage FNA (hardlink or copy) + target = input_dir / fna_path.name + if target.exists(): + target.unlink() + try: + os.link(fna_path, target) + logger.info("使用硬链接复制 FNA: %s → %s", fna_path, target) + except OSError: + shutil.copy2(fna_path, target) + logger.info("复制 FNA: %s → %s", fna_path, target) + return input_dir, digger_out, log_dir, run_root, sample_name + + def run_single_fna(self, fna_path: Path | str, sequence_type: str = "nucl", threads: int = 4) -> Dict[str, Any]: + fna_path = Path(fna_path) + input_dir, digger_out, log_dir, run_root, sample_name = self._prepare_layout(fna_path) + logger.info("开始 BtToxin_Digger 分析: %s (sample=%s)", fna_path, sample_name) + result = self.mgr.run_bttoxin_digger( + input_dir=input_dir, + output_dir=digger_out, + log_dir=log_dir, + sequence_type=sequence_type, + scaf_suffix=fna_path.suffix or ".fna", + threads=threads, + ) + toxins_dir = digger_out / "Results" / "Toxins" + files = { + "list": toxins_dir / f"{sample_name}.list", + "gbk": toxins_dir / f"{sample_name}.gbk", + "all_genes": toxins_dir / "Bt_all_genes.table", + "all_toxins": toxins_dir / "All_Toxins.txt", + } + ok = bool(result.get("success")) and files["all_toxins"].exists() + return { + "success": ok, + "sample": sample_name, + "run_root": run_root, + "input_dir": input_dir, + "digger_out": digger_out, + "log_dir": log_dir, + "toxins_dir": toxins_dir, + "files": files, + "raw_result": result, + } + + +class ShotterAPI: + """纯 Python API 调用 Shotter 打分并保存结果.""" + + def score( + self, + toxicity_csv: Path, + all_toxins: Path, + out_dir: Path, + min_identity: float = 0.0, + min_coverage: float = 0.0, + allow_unknown_families: bool = True, + require_index_hit: bool = False, + ) -> Dict[str, Any]: + out_dir.mkdir(parents=True, exist_ok=True) + index = shoter.SpecificityIndex.from_csv(toxicity_csv) + df = shoter.parse_all_toxins(all_toxins) + # thresholds + if 
min_identity > 0: + df = df[df["identity01"].astype(float) >= float(min_identity)] + if min_coverage > 0: + df = df[df["coverage"].astype(float) >= float(min_coverage)] + # unknown families handling + if not allow_unknown_families: + df = df[df["family_key"].astype(str) != "unknown"] + # require index hit mapping + if require_index_hit: + def _has_index_orders(row) -> bool: + name_key = str(row.get("Hit_id_norm", "")) + fam = str(row.get("family_key", "")) + d = index.orders_for_name_or_backoff(name_key) + if not d: + d = index.orders_for_name_or_backoff(fam) + return bool(d) + df = df[df.apply(_has_index_orders, axis=1)] + strains = sorted(df["Strain"].astype(str).unique().tolist()) + all_hits: list[shoter.ToxinHit] = [] + all_strain_scores: list[shoter.StrainScores] = [] + all_species_scores: list[shoter.StrainSpeciesScores] = [] + for strain in strains: + sdf = df[df["Strain"].astype(str).eq(strain)].copy() + per_hit, sscore, sspecies = shoter.score_strain(strain, sdf, index) + all_hits.extend(per_hit) + all_strain_scores.append(sscore) + if sspecies is not None: + all_species_scores.append(sspecies) + order_columns = sorted({*index.all_orders, "other", "unknown"}) or ["unknown"] + species_columns = sorted(index.all_species) + shoter.ToxinHit.save_list_tsv(out_dir / "toxin_support.tsv", all_hits, order_columns) + shoter.StrainScores.save_list_tsv(out_dir / "strain_target_scores.tsv", all_strain_scores, order_columns) + shoter.StrainScores.save_list_json(out_dir / "strain_scores.json", all_strain_scores) + if species_columns and all_species_scores: + shoter.StrainSpeciesScores.save_list_tsv(out_dir / "strain_target_species_scores.tsv", all_species_scores, species_columns) + shoter.StrainSpeciesScores.save_list_json(out_dir / "strain_species_scores.json", all_species_scores) + return { + "orders": order_columns, + "species": species_columns, + "strain_scores": out_dir / "strain_target_scores.tsv", + "toxin_support": out_dir / "toxin_support.tsv", + 
"strain_scores_json": out_dir / "strain_scores.json", + "species_scores": out_dir / "strain_target_species_scores.tsv", + "species_scores_json": out_dir / "strain_species_scores.json", + } + + +class PlotAPI: + """调用绘图与报告 API(非子进程)。""" + + def render( + self, + shotter_dir: Path, + lang: str = "zh", + merge_unresolved: bool = True, + per_hit_strain: Optional[str] = None, + cmap: str = "viridis", + vmin: float = 0.0, + vmax: float = 1.0, + ) -> Dict[str, Any]: + strain_scores = shotter_dir / "strain_target_scores.tsv" + toxin_support = shotter_dir / "toxin_support.tsv" + species_scores = shotter_dir / "strain_target_species_scores.tsv" + out1 = shotter_dir / "strain_target_scores.png" + plotter.plot_strain_scores(strain_scores, out1, cmap, vmin, vmax, None, merge_unresolved) + out2 = None + if per_hit_strain and toxin_support.exists(): + out2 = shotter_dir / f"per_hit_{per_hit_strain}.png" + plotter.plot_per_hit_for_strain(toxin_support, per_hit_strain, out2, cmap, vmin, vmax, None, merge_unresolved) + species_png = None + if species_scores.exists(): + species_png = shotter_dir / "strain_target_species_scores.png" + plotter.plot_species_scores(species_scores, species_png, cmap, vmin, vmax, None) + # Report + args_ns = SimpleNamespace( + allow_unknown_families=None, + require_index_hit=None, + min_identity=None, + min_coverage=None, + lang=lang, + ) + report_path = shotter_dir / "shotter_report_paper.md" + plotter.write_report_md( + out_path=report_path, + mode="paper", + lang=lang, + strain_scores_path=strain_scores, + toxin_support_path=toxin_support if toxin_support.exists() else None, + species_scores_path=species_scores if species_scores.exists() else None, + strain_heatmap_path=out1, + per_hit_heatmap_path=out2, + species_heatmap_path=species_png, + merge_unresolved=merge_unresolved, + args_namespace=args_ns, + ) + return { + "strain_orders_png": out1, + "per_hit_png": out2, + "species_png": species_png, + "report_md": report_path, + } + + +class 
BtSingleFnaPipeline: + """单 FNA 的完整 API 流程:Digger → Shotter → Plot → 打包.""" + + def __init__( + self, + image: str = "quay.io/biocontainers/bttoxin_digger:1.0.10--hdfd78af_0", + platform: str = "linux/amd64", + base_workdir: Optional[Path] = None, + ) -> None: + self.digger = BtToxinRunner(image=image, platform=platform, base_workdir=base_workdir) + self.shotter = ShotterAPI() + self.plotter = PlotAPI() + + def run( + self, + fna: Path | str, + toxicity_csv: Path | str = Path("Data/toxicity-data.csv"), + min_identity: float = 0.0, + min_coverage: float = 0.0, + allow_unknown_families: bool = True, + require_index_hit: bool = False, + lang: str = "zh", + threads: int = 4, + ) -> Dict[str, Any]: + dig = self.digger.run_single_fna(fna_path=fna, sequence_type="nucl", threads=threads) + if not dig.get("success"): + return {"ok": False, "stage": "digger", "detail": dig} + run_root: Path = dig["run_root"] + shotter_dir = run_root / "output" / "shotter" + shot = self.shotter.score( + toxicity_csv=Path(toxicity_csv), + all_toxins=Path(dig["files"]["all_toxins"]), + out_dir=shotter_dir, + min_identity=min_identity, + min_coverage=min_coverage, + allow_unknown_families=allow_unknown_families, + require_index_hit=require_index_hit, + ) + # choose a strain for per-hit figure + strain_for_plot = None + try: + import pandas as pd + df = pd.read_csv(shot["strain_scores"], sep="\t") + if len(df): + strain_for_plot = str(df.iloc[0]["Strain"]) + except Exception: + pass + vis = self.plotter.render( + shotter_dir=shotter_dir, + lang=lang, + merge_unresolved=True, + per_hit_strain=strain_for_plot) # type: ignore[arg-type] + # bundle + bundle = run_root / "pipeline_results.tar.gz" + with tarfile.open(bundle, "w:gz") as tar: + tar.add(run_root / "output" / "digger", arcname="digger") + tar.add(run_root / "output" / "shotter", arcname="shotter") + return { + "ok": True, + "run_root": str(run_root), + "digger_dir": str(run_root / "output" / "digger"), + "shotter_dir": str(shotter_dir), + 
"bundle": str(bundle), + "strain": strain_for_plot or "", + } diff --git a/scripts/bttoxin_shoter.py b/scripts/bttoxin_shoter.py new file mode 100644 index 0000000..deea966 --- /dev/null +++ b/scripts/bttoxin_shoter.py @@ -0,0 +1,699 @@ +#!/usr/bin/env python3 +""" +Bttoxin_Shoter v1 + +- Read BPPRC specificity CSV (positive-only) to build name/family -> target_order distributions +- Read BtToxin_Digger All_Toxins.txt to collect hits per strain +- Compute per-hit similarity weight and order contributions +- Combine contributions to strain-level potential activity scores per insect order +- Save per-hit and per-strain results (TSV + JSON) using dataclasses with save methods + +Assumptions and notes are documented in docs/shotter_workflow.md +""" +from __future__ import annotations + +import argparse +import json +import re +from dataclasses import dataclass, asdict, field +import os +from pathlib import Path +from typing import Dict, List, Optional, Tuple + +import pandas as pd + + +# ----------------------------- +# Helpers for parsing families +# ----------------------------- +FAMILY_PREFIXES = ( + "Cry", "Cyt", "Vip", "Vpa", "Vpb", "Mpp", "Tpp", "Spp", "App", + "Mcf", "Mpf", "Pra", "Prb", "Txp", "Gpp", "Mtx", "Xpp", +) +FAMILY_RE = re.compile(r"(?i)(Cry|Cyt|Vip|Vpa|Vpb|Mpp|Tpp|Spp|App|Mcf|Mpf|Pra|Prb|Txp|Gpp|Mtx|Xpp)(\d+)([A-Za-z]*)") + + +def normalize_hit_id(hit_id: str) -> str: + """Strip trailing markers like '-other', spaces, and keep core token. + Examples: 'Spp1Aa1' -> 'Spp1Aa1'; 'Bmp1-other' -> 'Bmp1' + """ + if not isinstance(hit_id, str): + return "" + x = hit_id.strip() + x = re.sub(r"-other$", "", x) + x = re.sub(r"\s+", "", x) + return x + + +def extract_family_keys(name: str) -> Tuple[Optional[str], Optional[str]]: + """From a protein name like 'Cry1Ac1' or '5618-Cry1Ia' extract: + - family_key: e.g., 'Cry1' + - subfamily_key: e.g., 'Cry1A' (first subclass letter if present) + Returns (None, None) if no family prefix is found. 
+ """ + if not isinstance(name, str) or not name: + return None, None + # Try direct match first + m = FAMILY_RE.search(name) + if not m: + return None, None + fam = m.group(1) + num = m.group(2) + sub = m.group(3) # may be '' + family_key = f"{fam}{num}" + subfamily_key = f"{fam}{num}{sub[:1]}" if sub else None + return family_key, subfamily_key + + +# --------------------------------- +# Specificity index from CSV (BPPRC) +# --------------------------------- +@dataclass +class SpecificityIndex: + # Distributions P(order | name) and P(order | family/subfamily) + name_to_orders: Dict[str, Dict[str, float]] = field(default_factory=dict) + subfam_to_orders: Dict[str, Dict[str, float]] = field(default_factory=dict) + fam_to_orders: Dict[str, Dict[str, float]] = field(default_factory=dict) + # Set of all observed target orders + all_orders: List[str] = field(default_factory=list) + # Distributions P(species | name) and P(species | family/subfamily) + name_to_species: Dict[str, Dict[str, float]] = field(default_factory=dict) + subfam_to_species: Dict[str, Dict[str, float]] = field(default_factory=dict) + fam_to_species: Dict[str, Dict[str, float]] = field(default_factory=dict) + # Set of all observed target species + all_species: List[str] = field(default_factory=list) + + # Partner requirement heuristics at family level + partner_pairs: List[Tuple[str, str]] = field(default_factory=lambda: [ + ("Vip1", "Vip2"), + ("Vpa", "Vpb"), + ("BinA", "BinB"), # included for completeness if ever present + ]) + + @staticmethod + def _potency_from_row(row: pd.Series) -> Optional[float]: + """Map quantitative fields to a potency weight in [0,1]. Positive-only. + Rules: + - lc50: will be normalized later within unit-buckets; here return the numeric. + - percentage_mortality: map ranges to coarse weights. 
+ - if both missing but activity is positive: default 0.55 + """ + perc = str(row.get("percentage_mortality") or "").strip().lower() + lc50 = row.get("lc50") + if pd.notnull(lc50): + try: + return float(lc50) # raw, to be normalized later + except Exception: + pass + if perc: + # coarse mappings + if ">80%" in perc or "80-100%" in perc or "greater than 80%" in perc: + return 0.9 + if "60-100%" in perc or "60-80%" in perc or "50-80%" in perc: + return 0.65 + if "0-60%" in perc or "0-50%" in perc or "25%" in perc or "some" in perc or "stunting" in perc: + return 0.25 + # default positive evidence + return 0.55 + + @classmethod + def from_csv(cls, csv_path: Path) -> "SpecificityIndex": + df = pd.read_csv(csv_path) + # Positive-only evidence + # many rows have activity 'Yes' and non_toxic empty; use only activity==Yes + df = df[df["activity"].astype(str).str.lower().eq("yes")] # keep positives + + # Normalize units bucket for lc50 normalization + # unify ppm -> ug_per_g (diet context). Others stay in own unit bucket. 
+ units = df["units"].astype(str).str.strip().str.lower() + ug_per_g_mask = units.isin(["µg/g", "ug/g", "μg/g", "ppm"]) # ppm ~ ug/g in diet + unit_bucket = units.where(~ug_per_g_mask, other="ug_per_g") + df = df.assign(_unit_bucket=unit_bucket) + + # Compute potency: two cases: lc50 (to be inverted/normalized) vs categorical + df["_potency_raw"] = df.apply(cls._potency_from_row, axis=1) + + # For rows with numeric raw potency (lc50), do within-bucket inverted quantile + for bucket, sub in df.groupby("_unit_bucket"): + # separate numeric vs categorical + is_num = pd.to_numeric(sub["_potency_raw"], errors="coerce").notnull() + num_idx = sub.index[is_num] + cat_idx = sub.index[~is_num] + # smaller LC50 => stronger potency + if len(num_idx) > 0: + vals = sub.loc[num_idx, "_potency_raw"].astype(float) + ranks = vals.rank(method="average", pct=True) + inv = 1.0 - ranks # 0..1 + df.loc[num_idx, "_potency"] = inv.values + # categorical rows keep their 0-1 scores (already in _potency_raw) + if len(cat_idx) > 0: + df.loc[cat_idx, "_potency"] = sub.loc[cat_idx, "_potency_raw"].values + df["_potency"] = pd.to_numeric(df["_potency"], errors="coerce").fillna(0.55) + + # Extract family/subfamily keys from name + # Name fields can be like 'Spp1Aa1', 'Cry1Ac1', or '5618-Cry1Ia', etc. 
+ fam_keys: List[Optional[str]] = [] + subfam_keys: List[Optional[str]] = [] + for name in df["name"].astype(str).tolist(): + fam, subfam = extract_family_keys(name) + fam_keys.append(fam) + subfam_keys.append(subfam) + df = df.assign(_family=fam_keys, _subfamily=subfam_keys) + + # Aggregate per protein name (exact) + def agg_distribution(group: pd.DataFrame) -> Dict[str, float]: + # Sum potencies per target_order, then normalize + s = group.groupby("target_order")["_potency"].sum() + if s.sum() <= 0: + return {} + d = (s / s.sum()).to_dict() + return d + + name_to_orders = ( + df.groupby("name", dropna=False, group_keys=False) + .apply(agg_distribution, include_groups=False) + .to_dict() + ) + + # Aggregate per subfamily and family as back-off + subfam_to_orders = ( + df.dropna(subset=["_subfamily"]).groupby("_subfamily", group_keys=False) + .apply(agg_distribution, include_groups=False) + .to_dict() + ) + fam_to_orders = ( + df.dropna(subset=["_family"]).groupby("_family", group_keys=False) + .apply(agg_distribution, include_groups=False) + .to_dict() + ) + + # Species distributions (optional if target_species present) + def agg_distribution_species(group: pd.DataFrame) -> Dict[str, float]: + s = group.groupby("target_species")["_potency"].sum() + if s.sum() <= 0: + return {} + d = (s / s.sum()).to_dict() + return d + + name_to_species = ( + df.groupby("name", dropna=False, group_keys=False) + .apply(agg_distribution_species, include_groups=False) + .to_dict() + ) + subfam_to_species = ( + df.dropna(subset=["_subfamily"]).groupby("_subfamily", group_keys=False) + .apply(agg_distribution_species, include_groups=False) + .to_dict() + ) + fam_to_species = ( + df.dropna(subset=["_family"]).groupby("_family", group_keys=False) + .apply(agg_distribution_species, include_groups=False) + .to_dict() + ) + + # collect all orders/species observed + all_orders = sorted({str(x) for x in df["target_order"].dropna().unique().tolist()}) + all_species = sorted({str(x) for x in 
df["target_species"].dropna().unique().tolist()}) + + return cls( + name_to_orders=name_to_orders, + subfam_to_orders=subfam_to_orders, + fam_to_orders=fam_to_orders, + all_orders=all_orders, + name_to_species=name_to_species, + subfam_to_species=subfam_to_species, + fam_to_species=fam_to_species, + all_species=all_species, + ) + + def partner_needed_for_family(self, family_or_subfam: str) -> Optional[str]: + # Heuristic partner rule at family level + for a, b in self.partner_pairs: + if family_or_subfam.startswith(a): + return b + if family_or_subfam.startswith(b): + return a + return None + + def orders_for_name_or_backoff(self, name_or_family: str) -> Dict[str, float]: + # Try exact name, then subfamily, then family + if name_or_family in self.name_to_orders: + return self.name_to_orders[name_or_family] + fam, subfam = extract_family_keys(name_or_family) + if subfam and subfam in self.subfam_to_orders: + return self.subfam_to_orders[subfam] + if fam and fam in self.fam_to_orders: + return self.fam_to_orders[fam] + return {} + + def species_for_name_or_backoff(self, name_or_family: str) -> Dict[str, float]: + # Try exact name, then subfamily, then family + if name_or_family in self.name_to_species: + return self.name_to_species[name_or_family] + fam, subfam = extract_family_keys(name_or_family) + if subfam and subfam in self.subfam_to_species: + return self.subfam_to_species[subfam] + if fam and fam in self.fam_to_species: + return self.fam_to_species[fam] + return {} + + +# ----------------------------- +# Dataclasses for outputs +# ----------------------------- +@dataclass +class ToxinHit: + strain: str + protein_id: str + hit_id: str + identity: float # 0..1 + aln_length: int + hit_length: int + coverage: float # 0..1 + hmm: bool + family: str + name_key: str + partner_fulfilled: bool + weight: float + order_contribs: Dict[str, float] + top_order: str + top_score: float + + def to_tsv_row(self, order_columns: List[str]) -> List[str]: + cols = [ + self.strain, 
+ self.protein_id, + self.hit_id, + f"{self.identity:.4f}", + str(self.aln_length), + str(self.hit_length), + f"{self.coverage:.4f}", + "YES" if self.hmm else "NO", + self.family or "unknown", + self.name_key or "", + "YES" if self.partner_fulfilled else "NO", + f"{self.weight:.4f}", + self.top_order or "", + f"{self.top_score:.6f}", + ] + for o in order_columns: + v = self.order_contribs.get(o, 0.0) + cols.append(f"{v:.6f}") + return cols + + @staticmethod + def save_list_tsv(tsv_path: Path, items: List["ToxinHit"], order_columns: List[str]) -> None: + tsv_path.parent.mkdir(parents=True, exist_ok=True) + header = [ + "Strain", "Protein_id", "Hit_id", "Identity", "Aln_length", "Hit_length", "Coverage", "HMM", + "Family", "NameKey", "PartnerFulfilled", "Weight", "TopOrder", "TopScore", + ] + order_columns + with tsv_path.open("w", encoding="utf-8") as f: + f.write("\t".join(header) + "\n") + for h in items: + f.write("\t".join(h.to_tsv_row(order_columns)) + "\n") + + +@dataclass +class StrainScores: + strain: str + order_scores: Dict[str, float] + top_order: str + top_score: float + + def to_tsv_row(self, order_columns: List[str]) -> List[str]: + return [self.strain, self.top_order or "", f"{self.top_score:.6f}"] + [f"{self.order_scores.get(o, 0.0):.6f}" for o in order_columns] + + @staticmethod + def save_list_tsv(tsv_path: Path, items: List["StrainScores"], order_columns: List[str]) -> None: + tsv_path.parent.mkdir(parents=True, exist_ok=True) + header = ["Strain", "TopOrder", "TopScore"] + order_columns + with tsv_path.open("w", encoding="utf-8") as f: + f.write("\t".join(header) + "\n") + for s in items: + f.write("\t".join(s.to_tsv_row(order_columns)) + "\n") + + @staticmethod + def save_list_json(json_path: Path, items: List["StrainScores"]) -> None: + json_path.parent.mkdir(parents=True, exist_ok=True) + data = [asdict(s) for s in items] + with json_path.open("w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + +@dataclass 
class StrainSpeciesScores:
    """Per-strain noisy-OR scores across candidate target species.

    NOTE(review): the ``@dataclass`` decorator for this class sits on the
    immediately preceding file line; the class is a plain dataclass of four
    fields plus TSV/JSON serialization helpers.
    """

    strain: str
    species_scores: Dict[str, float]
    top_species: str
    top_species_score: float

    def to_tsv_row(self, species_columns: List[str]) -> List[str]:
        """Render one TSV row: strain, top species, its score, then one
        6-decimal score per requested species column (0 when absent)."""
        row = [self.strain, self.top_species or "", f"{self.top_species_score:.6f}"]
        row.extend(f"{self.species_scores.get(sp, 0.0):.6f}" for sp in species_columns)
        return row

    @staticmethod
    def save_list_tsv(tsv_path: Path, items: List["StrainSpeciesScores"], species_columns: List[str]) -> None:
        """Write *items* as a TSV table with a fixed header plus species columns."""
        tsv_path.parent.mkdir(parents=True, exist_ok=True)
        header = ["Strain", "TopSpecies", "TopSpeciesScore"] + species_columns
        lines = ["\t".join(header)]
        lines.extend("\t".join(item.to_tsv_row(species_columns)) for item in items)
        with tsv_path.open("w", encoding="utf-8") as fh:
            fh.write("\n".join(lines) + "\n")

    @staticmethod
    def save_list_json(json_path: Path, items: List["StrainSpeciesScores"]) -> None:
        """Serialize *items* to pretty-printed JSON (UTF-8, non-ASCII preserved)."""
        json_path.parent.mkdir(parents=True, exist_ok=True)
        with json_path.open("w", encoding="utf-8") as fh:
            json.dump([asdict(item) for item in items], fh, ensure_ascii=False, indent=2)


# -----------------------------
# Parser and scoring
# -----------------------------

def compute_similarity_weight(identity: float, coverage: float, hmm: bool) -> float:
    """Map (identity, coverage, HMM flag) to a hit weight bounded to [0, 1].

    Rules (matching the documented v1 formula):
    - identity >= 0.78 and coverage >= 0.8  -> base = 1.0
    - 0.45 <= identity < 0.78               -> base = linear ramp over [0.45, 0.78)
    - otherwise                             -> base = 0.0
    Final weight = min(1, base * clip(coverage, 0, 1) + 0.1 if HMM hit).
    """
    if identity >= 0.78 and coverage >= 0.8:
        base = 1.0
    elif 0.45 <= identity < 0.78:
        base = min(1.0, max(0.0, (identity - 0.45) / (0.78 - 0.45)))
    else:
        base = 0.0
    weight = base * min(1.0, max(0.0, coverage))
    if hmm:
        # Fixed additive bonus for an independent HMM detection, capped at 1.
        weight = min(1.0, weight + 0.1)
    return float(min(1.0, max(0.0, weight)))


def parse_all_toxins(tsv_path: Path) -> pd.DataFrame:
    """Load BtToxin_Digger's All_Toxins.txt and derive per-hit numeric fields.

    Adds columns: ``coverage`` (Aln_length / Hit_length, 0 when Hit_length is
    0 or missing), ``identity01`` (percent identity rescaled to [0, 1]),
    boolean ``HMM``, normalized ``Hit_id_norm`` and a parsed ``family_key``
    ('unknown' when the name cannot be parsed).
    """
    frame = pd.read_csv(tsv_path, sep="\t", dtype=str, low_memory=False)

    # Numeric coercion: unparseable values become NaN, then get safe defaults.
    for column in ("Identity", "Aln_length", "Hit_length"):
        frame[column] = pd.to_numeric(frame[column], errors="coerce")
    frame["Identity"] = frame["Identity"].astype(float)
    frame["Aln_length"] = frame["Aln_length"].fillna(0).astype(int)
    frame["Hit_length"] = frame["Hit_length"].fillna(0).astype(int)

    # Zero hit lengths are mapped to NA first so the division cannot blow up;
    # the resulting NA coverages are then treated as 0.
    denom = frame["Hit_length"].replace(0, pd.NA)
    frame["coverage"] = (frame["Aln_length"].clip(lower=0) / denom).fillna(0.0)
    frame["identity01"] = (frame["Identity"].fillna(0.0) / 100.0).clip(lower=0.0, upper=1.0)
    frame["HMM"] = frame["HMM"].astype(str).str.upper().eq("YES")

    # Canonical name, then family backoff key ('unknown' if unparseable).
    # normalize_hit_id / extract_family_keys are defined earlier in this file.
    frame["Hit_id_norm"] = frame["Hit_id"].astype(str).map(normalize_hit_id)
    frame["family_key"] = [
        (extract_family_keys(hit)[0] or "unknown") for hit in frame["Hit_id_norm"].tolist()
    ]
    return frame


def partner_fulfilled_for_hit(hit_family: str, strain_df: pd.DataFrame, index: "SpecificityIndex") -> bool:
    """Check the binary-toxin partner rule for one hit.

    Returns True when *hit_family* needs no partner, or when the same strain
    carries at least one hit from the required partner family whose similarity
    weight is >= 0.3.
    """
    required = index.partner_needed_for_family(hit_family or "")
    if not required:
        return True
    for _, rec in strain_df.iterrows():
        family = rec.get("family_key", "") or ""
        if not family.startswith(required):
            continue
        weight = compute_similarity_weight(
            float(rec.get("identity01", 0.0)),
            float(rec.get("coverage", 0.0)),
            bool(rec.get("HMM", False)),
        )
        if weight >= 0.3:
            return True
    return False
in sdf.iterrows(): + hit_id = str(row.get("Hit_id_norm", "")) + protein_id = str(row.get("Protein_id", "")) + identity01 = float(row.get("identity01", 0.0)) + coverage = float(row.get("coverage", 0.0)) + hmm = bool(row.get("HMM", False)) + family = str(row.get("family_key", "")) or "unknown" + + # Choose name backoff distribution + name_key = hit_id + order_dist = index.orders_for_name_or_backoff(name_key) + if not order_dist: + # backoff to family + order_dist = index.orders_for_name_or_backoff(family) + if not order_dist: + # Route by whether we have a parsable family + # - parsable family but no evidence -> 'other' + # - unparseable family (family == 'unknown') -> 'unknown' + if family != "unknown": + order_dist = {"other": 1.0} + else: + order_dist = {"unknown": 1.0} + + # Compute weight with partner handling + w = compute_similarity_weight(identity01, coverage, hmm) + fulfilled = partner_fulfilled_for_hit(family, sdf, index) + if not fulfilled: + w *= 0.2 # partner penalty if required but not present + + contribs: Dict[str, float] = {} + for o in order_set: + p = float(order_dist.get(o, 0.0)) + c = w * p + if c > 0: + one_minus[o] *= (1.0 - c) + contribs[o] = c + + # species contributions (only for known distributions; no other/unknown buckets) + if species_set: + sp_dist = index.species_for_name_or_backoff(name_key) + if not sp_dist: + sp_dist = index.species_for_name_or_backoff(family) + if sp_dist: + for sp in species_set: + psp = float(sp_dist.get(sp, 0.0)) + csp = w * psp + if csp > 0: + sp_one_minus[sp] *= (1.0 - csp) + + # top for this hit (allow other/unknown if that's all we have) + if contribs: + hit_top_order, hit_top_score = max(contribs.items(), key=lambda kv: kv[1]) + else: + hit_top_order, hit_top_score = "", 0.0 + + per_hit.append( + ToxinHit( + strain=strain, + protein_id=protein_id, + hit_id=hit_id, + identity=identity01, + aln_length=int(row.get("Aln_length", 0)), + hit_length=int(row.get("Hit_length", 0)), + coverage=coverage, + hmm=hmm, + 
family=family, + name_key=name_key, + partner_fulfilled=fulfilled, + weight=w, + order_contribs=contribs, + top_order=hit_top_order, + top_score=float(hit_top_score), + ) + ) + + order_scores = {o: (1.0 - one_minus[o]) for o in order_set} + # choose top order excluding unresolved buckets if possible + preferred = [o for o in index.all_orders if o in order_scores] + if not preferred: + preferred = [o for o in order_set if o not in ("other", "unknown")] + if preferred: + top_o = max(preferred, key=lambda o: order_scores.get(o, 0.0)) + top_s = float(order_scores.get(top_o, 0.0)) + else: + top_o, top_s = "", 0.0 + strain_scores = StrainScores(strain=strain, order_scores=order_scores, top_order=top_o, top_score=top_s) + + species_scores_obj: Optional[StrainSpeciesScores] = None + if species_set: + species_scores = {sp: (1.0 - sp_one_minus[sp]) for sp in species_set} + if species_scores: + top_sp = max(species_set, key=lambda sp: species_scores.get(sp, 0.0)) + top_sp_score = float(species_scores.get(top_sp, 0.0)) + else: + top_sp, top_sp_score = "", 0.0 + species_scores_obj = StrainSpeciesScores( + strain=strain, + species_scores=species_scores, + top_species=top_sp, + top_species_score=top_sp_score, + ) + + return per_hit, strain_scores, species_scores_obj + + +# ----------------------------- +# Saving utilities +# ----------------------------- + +def save_toxin_support(tsv_path: Path, hits: List[ToxinHit], order_columns: List[str]) -> None: + tsv_path.parent.mkdir(parents=True, exist_ok=True) + header = [ + "Strain", "Protein_id", "Hit_id", "Identity", "Aln_length", "Hit_length", "Coverage", "HMM", + "Family", "NameKey", "PartnerFulfilled", "Weight", "TopOrder", "TopScore", + ] + order_columns + with tsv_path.open("w", encoding="utf-8") as f: + f.write("\t".join(header) + "\n") + for h in hits: + f.write("\t".join(h.to_tsv_row(order_columns)) + "\n") + + +def save_strain_targets(tsv_path: Path, strains: List[StrainScores], order_columns: List[str]) -> None: + 
tsv_path.parent.mkdir(parents=True, exist_ok=True) + header = ["Strain", "TopOrder", "TopScore"] + order_columns + with tsv_path.open("w", encoding="utf-8") as f: + f.write("\t".join(header) + "\n") + for s in strains: + f.write("\t".join(s.to_tsv_row(order_columns)) + "\n") + + +def save_strain_json(json_path: Path, strains: List[StrainScores]) -> None: + json_path.parent.mkdir(parents=True, exist_ok=True) + data = [asdict(s) for s in strains] + with json_path.open("w", encoding="utf-8") as f: + json.dump(data, f, ensure_ascii=False, indent=2) + + +# ----------------------------- +# CLI +# ----------------------------- + +def _try_writable_dir(p: Path) -> bool: + try: + p.mkdir(parents=True, exist_ok=True) + except Exception: + return False + try: + probe = p / ".__shotter_write_test__" + with probe.open("w", encoding="utf-8") as f: + f.write("ok") + probe.unlink(missing_ok=True) + return True + except Exception: + return False + + +def resolve_output_dir(preferred: Path) -> Path: + # 1) try preferred + if _try_writable_dir(preferred): + return preferred + # 2) try a subfolder 'Shotter' under preferred + sub = preferred / "Shotter" + if _try_writable_dir(sub): + print(f"[Shotter] Output dir not writable: {preferred}. Falling back to: {sub}") + return sub + # 3) fallback to cwd/shotter_outputs + alt = Path.cwd() / "shotter_outputs" + if _try_writable_dir(alt): + print(f"[Shotter] Output dir not writable: {preferred}. Falling back to: {alt}") + return alt + # 4) last resort: preferred (will likely fail but we return it) + print(f"[Shotter] WARNING: could not find writable output directory. 
Using {preferred} (may fail).") + return preferred + +def main(): + ap = argparse.ArgumentParser(description="Bttoxin_Shoter: infer target orders from BtToxin_Digger outputs") + ap.add_argument("--toxicity_csv", type=Path, default=Path("Data/toxicity-data.csv")) + ap.add_argument("--all_toxins", type=Path, default=Path("tests/output/Results/Toxins/All_Toxins.txt")) + ap.add_argument("--output_dir", type=Path, default=Path("tests/output/Results/Toxins")) + # Filtering and thresholds + ap.add_argument("--allow_unknown_families", dest="allow_unknown_families", action="store_true") + ap.add_argument("--disallow_unknown_families", dest="allow_unknown_families", action="store_false") + ap.set_defaults(allow_unknown_families=True) + ap.add_argument("--require_index_hit", action="store_true", default=False, + help="Keep only hits that map to a known name/subfamily/family in the specificity index") + ap.add_argument("--min_identity", type=float, default=0.0, help="Minimum identity (0-1) to keep a hit") + ap.add_argument("--min_coverage", type=float, default=0.0, help="Minimum coverage (0-1) to keep a hit") + args = ap.parse_args() + + index = SpecificityIndex.from_csv(args.toxicity_csv) + + df = parse_all_toxins(args.all_toxins) + # thresholds + if args.min_identity > 0: + df = df[df["identity01"].astype(float) >= float(args.min_identity)] + if args.min_coverage > 0: + df = df[df["coverage"].astype(float) >= float(args.min_coverage)] + # unknown families handling + if not args.allow_unknown_families: + df = df[df["family_key"].astype(str) != "unknown"] + # require index hit mapping + if args.require_index_hit: + def _has_index_orders(row) -> bool: + name_key = str(row.get("Hit_id_norm", "")) + fam = str(row.get("family_key", "")) + d = index.orders_for_name_or_backoff(name_key) + if not d: + d = index.orders_for_name_or_backoff(fam) + return bool(d) + df = df[df.apply(_has_index_orders, axis=1)] + strains = sorted(df["Strain"].astype(str).unique().tolist()) + + all_hits: 
List[ToxinHit] = [] + all_strain_scores: List[StrainScores] = [] + all_species_scores: List[StrainSpeciesScores] = [] + + for strain in strains: + sdf = df[df["Strain"].astype(str).eq(strain)].copy() + per_hit, sscore, sspecies = score_strain(strain, sdf, index) + all_hits.extend(per_hit) + all_strain_scores.append(sscore) + if sspecies is not None: + all_species_scores.append(sspecies) + + # Always include the special buckets in outputs + order_columns = sorted({*index.all_orders, "other", "unknown"}) or ["unknown"] + species_columns = sorted(index.all_species) + + # Resolve a writable output directory (avoid PermissionError on Docker-owned dirs) + out_dir = resolve_output_dir(args.output_dir) + + # Save via dataclass methods + ToxinHit.save_list_tsv(out_dir / "toxin_support.tsv", all_hits, order_columns) + StrainScores.save_list_tsv(out_dir / "strain_target_scores.tsv", all_strain_scores, order_columns) + StrainScores.save_list_json(out_dir / "strain_scores.json", all_strain_scores) + if species_columns and all_species_scores: + StrainSpeciesScores.save_list_tsv(out_dir / "strain_target_species_scores.tsv", all_species_scores, species_columns) + StrainSpeciesScores.save_list_json(out_dir / "strain_species_scores.json", all_species_scores) + + print(f"Saved: {out_dir / 'toxin_support.tsv'}") + print(f"Saved: {out_dir / 'strain_target_scores.tsv'}") + print(f"Saved: {out_dir / 'strain_scores.json'}") + if species_columns and all_species_scores: + print(f"Saved: {out_dir / 'strain_target_species_scores.tsv'}") + print(f"Saved: {out_dir / 'strain_species_scores.json'}") + + +if __name__ == "__main__": + main() diff --git a/scripts/plot_shotter.py b/scripts/plot_shotter.py new file mode 100644 index 0000000..8ed57f6 --- /dev/null +++ b/scripts/plot_shotter.py @@ -0,0 +1,543 @@ +#!/usr/bin/env python3 +""" +Plotting utilities for Bttoxin_Shoter outputs. 
+ +Generates heatmaps for: +- Strain × Target Orders scores (from strain_target_scores.tsv) +- Optional Per-hit contributions for one strain (from toxin_support.tsv) +- Optional Strain × Target Species scores (from strain_target_species_scores.tsv) + +Can also write a Markdown report with: +- summary mode: brief overview +- paper mode: expanded, paper-like sections (Abstract, Methods, Results, Discussion, Params, Appendix) + +Dependencies: +- pandas, matplotlib; seaborn is optional (for nicer heatmaps). If seaborn is + missing, a pure-matplotlib fallback will be used. +""" +from __future__ import annotations + +import argparse +from pathlib import Path +from typing import List, Optional + +import pandas as pd + +try: + import seaborn as sns # type: ignore + _HAS_SNS = True +except Exception: + _HAS_SNS = False + +import matplotlib.pyplot as plt + + +def _parse_figsize(s: str | None, default=(12, 6)): + if not s: + return default + try: + w, h = s.lower().split("x") + return (float(w), float(h)) + except Exception: + return default + + +def _merge_unresolved_columns(df: pd.DataFrame, merge_unresolved: bool) -> pd.DataFrame: + if not merge_unresolved: + return df + cols = df.columns.tolist() + has_other = "other" in cols + has_unknown = "unknown" in cols + if has_other or has_unknown: + df = df.copy() + df["unresolved"] = df.get("other", 0.0) + df.get("unknown", 0.0) + drop_cols = [c for c in ("other", "unknown") if c in df.columns] + df = df.drop(columns=drop_cols) + return df + + +def plot_strain_scores(strain_scores_path: Path, out_path: Path, cmap: str, vmin: float, vmax: float, figsize: str | None, merge_unresolved: bool): + df = pd.read_csv(strain_scores_path, sep="\t") + base_cols = {"Strain", "TopOrder", "TopScore"} + orders = [c for c in df.columns if c not in base_cols] + mat = df.set_index("Strain")[orders] + mat = _merge_unresolved_columns(mat, merge_unresolved) + orders = list(mat.columns) + + # dynamic figure size if not provided + if figsize: + size = 
_parse_figsize(figsize) + else: + size = (max(8, 0.6 * len(orders) + 3), max(4, 0.35 * len(mat) + 2)) + + plt.figure(figsize=size) + if _HAS_SNS: + ax = sns.heatmap(mat, cmap=cmap, vmin=vmin, vmax=vmax, cbar_kws={"label": "score"}) + else: + ax = plt.gca() + im = ax.imshow(mat.values, aspect="auto", cmap=cmap, vmin=vmin, vmax=vmax) + plt.colorbar(im, ax=ax, label="score") + ax.set_xticks(range(len(orders))) + ax.set_xticklabels(orders, rotation=45, ha="right") + ax.set_yticks(range(len(mat.index))) + ax.set_yticklabels(mat.index) + ax.set_title("Strain vs Target Orders (Shotter)") + plt.tight_layout() + out_path.parent.mkdir(parents=True, exist_ok=True) + plt.savefig(out_path, dpi=200) + plt.close() + + +def plot_per_hit_for_strain(toxin_support_path: Path, strain: str, out_path: Path, cmap: str, vmin: float, vmax: float, figsize: str | None, merge_unresolved: bool): + hits = pd.read_csv(toxin_support_path, sep="\t") + sub = hits[hits["Strain"].astype(str).eq(strain)].copy() + if sub.empty: + print(f"[plot_shotter] No hits for strain: {strain}") + return + + base_cols = { + "Strain","Protein_id","Hit_id","Identity","Aln_length","Hit_length","Coverage", + "HMM","Family","NameKey","PartnerFulfilled","Weight","TopOrder","TopScore" + } + orders = [c for c in sub.columns if c not in base_cols] + mat = sub.set_index("Hit_id")[orders] + mat = _merge_unresolved_columns(mat, merge_unresolved) + orders = list(mat.columns) + + # dynamic figure size if not provided + if figsize: + size = _parse_figsize(figsize) + else: + size = (max(8, 0.6 * len(orders) + 3), max(4, 0.35 * len(mat) + 2)) + + plt.figure(figsize=size) + if _HAS_SNS: + ax = sns.heatmap(mat, cmap=cmap, vmin=vmin, vmax=vmax, cbar_kws={"label": "contrib"}) + else: + ax = plt.gca() + im = ax.imshow(mat.values, aspect="auto", cmap=cmap, vmin=vmin, vmax=vmax) + plt.colorbar(im, ax=ax, label="contrib") + ax.set_xticks(range(len(orders))) + ax.set_xticklabels(orders, rotation=45, ha="right") + 
ax.set_yticks(range(len(mat.index))) + ax.set_yticklabels(mat.index) + ax.set_title(f"Per-hit contributions for {strain}") + plt.tight_layout() + out_path.parent.mkdir(parents=True, exist_ok=True) + plt.savefig(out_path, dpi=200) + plt.close() + + +def plot_species_scores(species_scores_path: Path, out_path: Path, cmap: str, vmin: float, vmax: float, figsize: str | None): + df = pd.read_csv(species_scores_path, sep="\t") + species_cols = [c for c in df.columns if c not in ("Strain", "TopSpecies", "TopSpeciesScore")] + if not species_cols: + print("[plot_shotter] No species columns found; skip species heatmap") + return + mat = df.set_index("Strain")[species_cols] + if figsize: + size = _parse_figsize(figsize) + else: + size = (max(8, 0.6 * len(species_cols) + 3), max(4, 0.35 * len(mat) + 2)) + plt.figure(figsize=size) + if _HAS_SNS: + ax = sns.heatmap(mat, cmap=cmap, vmin=vmin, vmax=vmax, cbar_kws={"label": "score"}) + else: + ax = plt.gca() + im = ax.imshow(mat.values, aspect="auto", cmap=cmap, vmin=vmin, vmax=vmax) + plt.colorbar(im, ax=ax, label="score") + ax.set_xticks(range(len(species_cols))) + ax.set_xticklabels(species_cols, rotation=45, ha="right") + ax.set_yticks(range(len(mat.index))) + ax.set_yticklabels(mat.index) + ax.set_title("Strain vs Target Species (Shotter)") + plt.tight_layout() + out_path.parent.mkdir(parents=True, exist_ok=True) + plt.savefig(out_path, dpi=200) + plt.close() + + +def _percent(x: float) -> str: + try: + return f"{100.0*float(x):.1f}%" + except Exception: + return "NA" + + +def _fmtf(x, nd=3): + try: + return f"{float(x):.{nd}f}" + except Exception: + return "NA" + + +def _labels(lang: str): + zh = { + "title": "Bttoxin Shotter 论文式报告", + "summary": "# Bttoxin Shotter Summary", + "params": "## 参数", + "abstract": "## 摘要", + "methods": "## 数据与方法", + "results": "## 结果", + "discussion": "## 讨论与局限", + "appendix": "## 附录", + "top_targets": "## 各菌株 TopOrder/TopScore", + "heat_orders": "## 热图:菌株 × 目标目", + "heat_species": "## 热图:菌株 × 
目标物种", + "perhit": "## 命中 TopOrder/TopScore(前50)", + "notes": "## 说明", + "unresolved_ratio": "未解析贡献(other+unknown)整体占比", + "order_distribution": "整体目标目分布(按合成分数求和排名前5):", + "hits_stats": "命中统计", + "n_strains": "菌株数", + "n_hits": "命中条数", + "avg_weight": "平均权重", + "table_header_strain": "Strain | TopOrder | TopScore", + "table_header_hit": "Hit_id | Family | TopOrder | TopScore | Weight", + "table_sep": "--- | --- | ---", + "table_sep_hit": "--- | --- | --- | --- | ---", + "methods_text": ( + "- 输入:BtToxin_Digger 的 All_Toxins.txt;BPPRC 特异性 CSV(activity=Yes)。\n" + "- 特异性索引:按 name→亚家族→家族聚合,target_order/target_species 以加权和归一构成分布。\n" + "- 相似性权重 w_hit:若 identity≥0.78 且 coverage≥0.8 则 base=1;若 0.45≤identity<0.78 则线性插值;否则 0。" + "最终 w=min(1, base×coverage + 0.1×I(HMM))). 若需配对而未满足,w×=0.2。\n" + "- 单命中贡献:c(order)=w_hit×P(order|·)。\n" + "- 菌株层合成:noisy-OR,score(order)=1-∏(1-c_i(order))。\n" + "- 物种维度:同上(若 CSV 含 target_species)。\n" + "- 分类含义:other=家族可解析但无证据;unknown=家族不可解析。\n" + ), + "discussion_text": ( + "- unresolved 偏高常见于:命名异常、家族稀缺、索引覆盖不足。\n" + "- 可用更严格阈值或 --require_index_hit 减少伪阳性;也可 --merge_unresolved 合并展示。\n" + "- 名称差异:Digger 与 BPPRC 均含 Cry 之外家族(Tpp/Spp/Vip 等),采用 name→亚家族→家族回退映射。\n" + ), + "notes_items": [ + "TopOrder/TopScore:最高目标目及其分数(命中或菌株)。", + "contrib:单命中对某目标目的贡献 w_hit×P(order|·)。", + "unresolved=other+unknown,仅为可视化合并开关,不改变计算。", + ], + } + en = { + "title": "Bttoxin Shotter Paper-style Report", + "summary": "# Bttoxin Shotter Summary", + "params": "## Parameters", + "abstract": "## Abstract", + "methods": "## Data and Methods", + "results": "## Results", + "discussion": "## Discussion and Limitations", + "appendix": "## Appendix", + "top_targets": "## Top targets per strain", + "heat_orders": "## Heatmap: Strain × Orders", + "heat_species": "## Heatmap: Strain × Species", + "perhit": "## Per-hit TopOrder/TopScore (Top 50)", + "notes": "## Notes", + "unresolved_ratio": "Overall unresolved (other+unknown) fraction", + "order_distribution": "Overall order distribution (top 
def write_summary_md(
    out_path: Path,
    strain_scores_path: Path,
    toxin_support_path: Optional[Path],
    species_scores_path: Optional[Path],
    strain_heatmap_path: Optional[Path],
    per_hit_heatmap_path: Optional[Path],
    species_heatmap_path: Optional[Path],
    merge_unresolved: bool,
    args_namespace,
):
    """Write the compact ("summary") Markdown report for Shotter outputs.

    Sections, in order: run parameters, per-strain top-target table with
    the other/unknown ("unresolved") score fraction, optional heatmap
    embeds (referenced by bare file name, so the report must live next to
    the PNGs), an optional per-hit support table, and explanatory notes.

    Args:
        out_path: Destination Markdown file; parent dirs are created.
        strain_scores_path: TSV with Strain/TopOrder/TopScore plus one
            column per target order.
        toxin_support_path: Optional per-hit TSV; section skipped if absent.
        species_scores_path: Optional species-level TSV; section skipped
            if absent.
        strain_heatmap_path: Optional strain x order heatmap PNG.
        per_hit_heatmap_path: Currently unused here; kept so the signature
            mirrors write_report_md.
        species_heatmap_path: Optional species heatmap PNG.
        merge_unresolved: Echoed into the parameters section (display only;
            it does not change any numbers in this report).
        args_namespace: argparse.Namespace supplying lang and filter options.
    """
    labels = _labels(getattr(args_namespace, "lang", "zh"))
    lines: List[str] = []
    lines.append(labels["summary"])
    lines.append("")

    # Run parameters (best-effort: 'NA' when an option is absent).
    lines.append(labels["params"])
    for opt in ("allow_unknown_families", "require_index_hit", "min_identity", "min_coverage"):
        lines.append(f"- {opt}: {getattr(args_namespace, opt, 'NA')}")
    lines.append(f"- merge_unresolved: {merge_unresolved}")
    lines.append("")

    # Per-strain top targets plus the unresolved fraction of total score mass.
    try:
        df = pd.read_csv(strain_scores_path, sep="\t")
        score_cols = [c for c in df.columns if c not in ("Strain", "TopOrder", "TopScore")]
        unresolved_cols = [c for c in ("other", "unknown") if c in score_cols]
        unresolved_total = df[unresolved_cols].sum(axis=1).sum() if unresolved_cols else 0.0
        total_sum = df[score_cols].sum(axis=1).sum() if score_cols else 0.0
        frac_unresolved = (unresolved_total / total_sum) if total_sum > 0 else 0.0
        lines.append(labels["top_targets"])
        lines.append("")
        lines.append(labels["table_header_strain"])
        lines.append(labels["table_sep"])
        for _, row in df.iterrows():
            lines.append(f"{row['Strain']} | {row.get('TopOrder','')} | {float(row.get('TopScore',0.0)):.3f}")
        lines.append("")
        lines.append(f"{labels['unresolved_ratio']}: {_percent(frac_unresolved)}")
        lines.append("")
    except Exception as e:
        lines.append(f"[warn] Failed to read strain scores: {e}")

    if strain_heatmap_path and strain_heatmap_path.exists():
        lines.append(labels["heat_orders"])
        lines.append(f"![]({strain_heatmap_path.name})")
        lines.append("")

    if species_scores_path and species_scores_path.exists():
        lines.append(labels["heat_species"])
        if species_heatmap_path and species_heatmap_path.exists():
            lines.append(f"![]({species_heatmap_path.name})")
        else:
            lines.append("Species heatmap not generated.")
        lines.append("")

    # Per-hit support table (top 50 hits by TopScore).
    if toxin_support_path and toxin_support_path.exists():
        try:
            hits = pd.read_csv(toxin_support_path, sep="\t")
            lines.append(labels["perhit"])
            lines.append("")
            lines.append(labels["table_header_hit"])
            lines.append(labels["table_sep_hit"])
            for _, row in hits.sort_values("TopScore", ascending=False).head(50).iterrows():
                lines.append(
                    f"{row['Hit_id']} | {row.get('Family','')} | {row.get('TopOrder','')} | "
                    f"{float(row.get('TopScore',0.0)):.3f} | {float(row.get('Weight',0.0)):.3f}"
                )
            lines.append("")
        except Exception as e:
            lines.append(f"[warn] Failed to read toxin support: {e}")

    # Explanatory notes. Reuse `labels` instead of looking the label set up
    # again (the original called _labels() a second time for the same lang).
    lines.append(labels["notes"])
    for item in labels["notes_items"]:
        lines.append(f"- {item}")

    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text("\n".join(lines), encoding="utf-8")


def write_report_md(
    out_path: Path,
    mode: str,
    lang: str,
    strain_scores_path: Path,
    toxin_support_path: Optional[Path],
    species_scores_path: Optional[Path],
    strain_heatmap_path: Optional[Path],
    per_hit_heatmap_path: Optional[Path],
    species_heatmap_path: Optional[Path],
    merge_unresolved: bool,
    args_namespace,
):
    """Write the "paper"-style Markdown report, or delegate to the summary.

    Layout: title, auto-generated abstract (strain/hit counts, global order
    distribution, unresolved fraction), methods, results table + heatmaps,
    discussion, parameters, and a per-hit appendix.

    NOTE(review): the English methods_text in _labels appears to contain an
    extra ')' after "0.1×I(HMM)" — worth fixing at its definition.

    Args mirror write_summary_md, plus:
        mode: "summary" delegates to write_summary_md; anything else
            renders the paper template.
        lang: "zh" or "en"; selects the label set.
    """
    if mode == "summary":
        return write_summary_md(
            out_path,
            strain_scores_path,
            toxin_support_path,
            species_scores_path,
            strain_heatmap_path,
            per_hit_heatmap_path,
            species_heatmap_path,
            merge_unresolved,
            args_namespace,
        )

    L = _labels(lang)
    lines: List[str] = []
    lines.append(L["title"])
    lines.append("")

    # Abstract: brief auto-summary derived from the strain-level table.
    try:
        df = pd.read_csv(strain_scores_path, sep="\t")
        base_cols = {"Strain", "TopOrder", "TopScore"}
        order_cols = [c for c in df.columns if c not in base_cols]
        orders_only = df[order_cols]
        unresolved_cols = [c for c in ("other", "unknown") if c in order_cols]
        frac_unres = 0.0
        if order_cols:
            tot = orders_only.sum(axis=1).sum()
            unres = df[unresolved_cols].sum(axis=1).sum() if unresolved_cols else 0.0
            frac_unres = (unres / tot) if tot > 0 else 0.0
        top_global = []
        if order_cols:
            # Global order distribution: top 5 columns by summed score,
            # hiding other/unknown when they are merged for display.
            sums = orders_only.sum(axis=0).sort_values(ascending=False)
            for k, v in sums.head(5).items():
                if k in ("other", "unknown") and merge_unresolved:
                    continue
                top_global.append(f"{k}:{_fmtf(v)}")
        n_strains = len(df)
    except Exception:
        n_strains, frac_unres, top_global = 0, 0.0, []

    n_hits, avg_w = 0, 0.0
    try:
        if toxin_support_path and toxin_support_path.exists():
            hits = pd.read_csv(toxin_support_path, sep="\t")
            n_hits = len(hits)
            if "Weight" in hits.columns and len(hits) > 0:
                avg_w = float(hits["Weight"].mean())
    except Exception:
        pass

    lines.append(L["abstract"])
    lines.append(
        f"- {L['n_strains']}: {n_strains}; {L['n_hits']}: {n_hits}; {L['avg_weight']}: {_fmtf(avg_w)}."
    )
    if top_global:
        lines.append(f"- {L['order_distribution']} " + ", ".join(top_global))
    lines.append(f"- {L['unresolved_ratio']}: {_percent(frac_unres)}")
    lines.append("")

    # Methods (static text from the label set).
    lines.append(L["methods"])
    lines.append(L["methods_text"])

    # Results: per-strain table (re-read so a failure here warns locally).
    lines.append(L["results"])
    try:
        df = pd.read_csv(strain_scores_path, sep="\t")
        lines.append(L["top_targets"])
        lines.append("")
        lines.append(L["table_header_strain"])
        lines.append(L["table_sep"])
        for _, r in df.iterrows():
            lines.append(f"{r['Strain']} | {r.get('TopOrder','')} | {_fmtf(r.get('TopScore',0.0))}")
        lines.append("")
    except Exception as e:
        lines.append(f"[warn] Failed to read strain scores: {e}")

    if strain_heatmap_path and strain_heatmap_path.exists():
        lines.append(L["heat_orders"])
        lines.append(f"![]({strain_heatmap_path.name})")
        lines.append("")
    if species_heatmap_path and species_heatmap_path.exists():
        lines.append(L["heat_species"])
        lines.append(f"![]({species_heatmap_path.name})")
        lines.append("")

    # Discussion (static text from the label set).
    lines.append(L["discussion"])
    lines.append(L["discussion_text"])

    # Parameters echoed from the argparse namespace.
    lines.append(L["params"])
    lines.append(
        f"- allow_unknown_families: {getattr(args_namespace, 'allow_unknown_families', 'NA')}\n"
        f"- require_index_hit: {getattr(args_namespace, 'require_index_hit', 'NA')}\n"
        f"- min_identity: {getattr(args_namespace, 'min_identity', 'NA')}\n"
        f"- min_coverage: {getattr(args_namespace, 'min_coverage', 'NA')}\n"
        f"- merge_unresolved: {merge_unresolved}"
    )
    lines.append("")

    # Appendix: per-hit support (top 50 by TopScore).
    if toxin_support_path and toxin_support_path.exists():
        try:
            hits = pd.read_csv(toxin_support_path, sep="\t")
            lines.append(L["appendix"])
            lines.append(L["perhit"])
            lines.append("")
            lines.append(L["table_header_hit"])
            lines.append(L["table_sep_hit"])
            for _, r in hits.sort_values("TopScore", ascending=False).head(50).iterrows():
                lines.append(
                    f"{r['Hit_id']} | {r.get('Family','')} | {r.get('TopOrder','')} | {_fmtf(r.get('TopScore',0.0))} | {_fmtf(r.get('Weight',0.0))}"
                )
            lines.append("")
        except Exception as e:
            lines.append(f"[warn] Failed to read toxin support: {e}")

    out_path.parent.mkdir(parents=True, exist_ok=True)
    out_path.write_text("\n".join(lines), encoding="utf-8")


def main():
    """CLI: render heatmaps from Shotter TSV outputs and write a report."""
    ap = argparse.ArgumentParser(description="Plot heatmaps from Bttoxin_Shoter outputs")
    ap.add_argument("--strain_scores", type=Path, default=Path("shotter_outputs/strain_target_scores.tsv"))
    ap.add_argument("--toxin_support", type=Path, default=None)
    ap.add_argument("--species_scores", type=Path, default=None)
    ap.add_argument("--out_dir", type=Path, default=Path("shotter_outputs"))
    ap.add_argument("--cmap", type=str, default="viridis")
    ap.add_argument("--vmin", type=float, default=0.0)
    ap.add_argument("--vmax", type=float, default=1.0)
    ap.add_argument("--figsize", type=str, default=None, help="e.g. 12x6")
    ap.add_argument("--per_hit_strain", type=str, default=None, help="If provided, also draw per-hit heatmap for this strain (requires --toxin_support)")
    ap.add_argument("--merge_unresolved", action="store_true", default=False, help="Merge columns 'other' and 'unknown' into 'unresolved' for plots")
    ap.add_argument("--summary_md", type=Path, default=None, help="Write a Markdown report to this path")
    ap.add_argument("--report_mode", type=str, choices=["summary", "paper"], default="paper", help="Report template style")
    ap.add_argument("--lang", type=str, choices=["zh", "en"], default="zh", help="Report language")
    args = ap.parse_args()

    args.out_dir.mkdir(parents=True, exist_ok=True)

    # Strain × Orders heatmap (always drawn).
    out1 = args.out_dir / "strain_target_scores.png"
    plot_strain_scores(args.strain_scores, out1, args.cmap, args.vmin, args.vmax, args.figsize, args.merge_unresolved)
    print(f"Saved: {out1}")

    # Optional per-hit heatmap for one strain. The path is computed once and
    # reused for the report (the original rebuilt it inline in the call below).
    per_hit_png: Optional[Path] = None
    if args.per_hit_strain and args.toxin_support:
        per_hit_png = args.out_dir / f"per_hit_{args.per_hit_strain}.png"
        plot_per_hit_for_strain(args.toxin_support, args.per_hit_strain, per_hit_png, args.cmap, args.vmin, args.vmax, args.figsize, args.merge_unresolved)
        print(f"Saved: {per_hit_png}")

    # Optional species heatmap.
    species_png: Optional[Path] = None
    if args.species_scores and args.species_scores.exists():
        species_png = args.out_dir / "strain_target_species_scores.png"
        plot_species_scores(args.species_scores, species_png, args.cmap, args.vmin, args.vmax, args.figsize)
        print(f"Saved: {species_png}")

    # Report (paper-style by default).
    default_name = "shotter_report_paper.md" if args.report_mode == "paper" else "shotter_summary.md"
    summary_md = args.summary_md or (args.out_dir / default_name)
    write_report_md(
        out_path=summary_md,
        mode=args.report_mode,
        lang=args.lang,
        strain_scores_path=args.strain_scores,
        toxin_support_path=args.toxin_support,
        species_scores_path=args.species_scores,
        strain_heatmap_path=out1,
        per_hit_heatmap_path=per_hit_png,
        species_heatmap_path=species_png,
        merge_unresolved=args.merge_unresolved,
        args_namespace=args,
    )
    print(f"Saved: {summary_md}")


if __name__ == "__main__":
    main()
+ +Example + python scripts/run_single_fna_pipeline.py \ + --fna tests/test_data/C15.fna \ + --toxicity_csv Data/toxicity-data.csv \ + --out_root runs/C15_run \ + --min_identity 0.50 --min_coverage 0.60 \ + --disallow_unknown_families --require_index_hit --lang zh +""" +from __future__ import annotations + +import argparse +import os +import shutil +import subprocess +import sys +import tarfile +from pathlib import Path +from typing import Dict, Any + +# import DockerContainerManager from backend +sys.path.append(str(Path(__file__).resolve().parents[1] / "backend")) +from app.utils.docker_client import DockerContainerManager # type: ignore + + +def _shell(cmd: list[str]) -> subprocess.CompletedProcess: + return subprocess.run(cmd, text=True) + + +def _read_first_strain(strain_scores_tsv: Path) -> str: + try: + with strain_scores_tsv.open("r", encoding="utf-8") as f: + header = f.readline().strip().split("\t") + idx_strain = header.index("Strain") + # next non-empty line + for line in f: + if not line.strip(): + continue + parts = line.rstrip("\n").split("\t") + if len(parts) > idx_strain: + return parts[idx_strain] + except Exception: + pass + return "" + + +def run_single_fna_pipeline( + fna_path: Path, + out_root: Path, + toxicity_csv: Path = Path("Data/toxicity-data.csv"), + image: str = "quay.io/biocontainers/bttoxin_digger:1.0.10--hdfd78af_0", + platform: str = "linux/amd64", + min_identity: float = 0.0, + min_coverage: float = 0.0, + allow_unknown_families: bool = True, + require_index_hit: bool = False, + lang: str = "zh", +) -> Dict[str, Any]: + fna_path = fna_path.resolve() + out_root = out_root.resolve() + out_root.mkdir(parents=True, exist_ok=True) + + digger_dir = out_root / "digger" + shotter_dir = out_root / "shotter" + logs_dir = out_root / "logs" + stage_dir = out_root / "stage" + for d in (digger_dir, shotter_dir, logs_dir, stage_dir): + d.mkdir(parents=True, exist_ok=True) + + # Stage single input file + staged_fna = stage_dir / fna_path.name + 
shutil.copy2(fna_path, staged_fna) + + # 1) Run BtToxin_Digger via DockerContainerManager + mgr = DockerContainerManager(image=image, platform=platform) + result = mgr.run_bttoxin_digger( + input_dir=stage_dir, + output_dir=digger_dir, + log_dir=logs_dir, + sequence_type="nucl", + scaf_suffix=fna_path.suffix or ".fna", + threads=4, + ) + if not result.get("success"): + return { + "ok": False, + "stage": "digger", + "error": result.get("error") or f"Digger failed (exit={result.get('exit_code')})", + "logs": (logs_dir / "digger_execution.log").read_text(encoding="utf-8") if (logs_dir / "digger_execution.log").exists() else "", + } + + toxins_dir = digger_dir / "Results" / "Toxins" + all_toxins = toxins_dir / "All_Toxins.txt" + if not all_toxins.exists(): + return {"ok": False, "stage": "digger", "error": f"Missing All_Toxins.txt at {all_toxins}"} + + # 2) Run Shotter scoring + shotter_dir.mkdir(parents=True, exist_ok=True) + py = sys.executable + shoter_cmd: list[str] = [ + py, + str(Path(__file__).resolve().parents[0] / "bttoxin_shoter.py"), + "--toxicity_csv", + str(toxicity_csv), + "--all_toxins", + str(all_toxins), + "--output_dir", + str(shotter_dir), + ] + if min_identity and min_identity > 0: + shoter_cmd += ["--min_identity", str(min_identity)] + if min_coverage and min_coverage > 0: + shoter_cmd += ["--min_coverage", str(min_coverage)] + if not allow_unknown_families: + shoter_cmd += ["--disallow_unknown_families"] + if require_index_hit: + shoter_cmd += ["--require_index_hit"] + + r1 = _shell(shoter_cmd) + if r1.returncode != 0: + return {"ok": False, "stage": "shotter", "error": f"Shotter failed: {' '.join(shoter_cmd)}"} + + strain_scores = shotter_dir / "strain_target_scores.tsv" + toxin_support = shotter_dir / "toxin_support.tsv" + species_scores = shotter_dir / "strain_target_species_scores.tsv" + + # 3) Plot & report + strain_for_plot = _read_first_strain(strain_scores) + plot_cmd: list[str] = [ + py, + str(Path(__file__).resolve().parents[0] / 
"plot_shotter.py"), + "--strain_scores", + str(strain_scores), + "--toxin_support", + str(toxin_support), + "--species_scores", + str(species_scores), + "--out_dir", + str(shotter_dir), + "--merge_unresolved", + "--report_mode", + "paper", + "--lang", + lang, + ] + if strain_for_plot: + plot_cmd += ["--per_hit_strain", strain_for_plot] + + r2 = _shell(plot_cmd) + if r2.returncode != 0: + # plotting/report optional; continue + pass + + # 4) Bundle + bundle = out_root / "pipeline_results.tar.gz" + with tarfile.open(bundle, "w:gz") as tar: + tar.add(digger_dir, arcname="digger") + tar.add(shotter_dir, arcname="shotter") + + return { + "ok": True, + "digger_dir": str(digger_dir), + "shotter_dir": str(shotter_dir), + "bundle": str(bundle), + "all_toxins": str(all_toxins), + "strain": strain_for_plot, + } + + +def main() -> int: + ap = argparse.ArgumentParser(description="Run single-fna Digger -> Shotter pipeline") + ap.add_argument("--fna", type=Path, required=True, help="Path to a single .fna file") + ap.add_argument("--toxicity_csv", type=Path, default=Path("Data/toxicity-data.csv")) + ap.add_argument("--out_root", type=Path, default=Path("runs/single_run")) + ap.add_argument("--image", type=str, default="quay.io/biocontainers/bttoxin_digger:1.0.10--hdfd78af_0") + ap.add_argument("--platform", type=str, default="linux/amd64") + ap.add_argument("--min_identity", type=float, default=0.0) + ap.add_argument("--min_coverage", type=float, default=0.0) + ap.add_argument("--disallow_unknown_families", action="store_true", default=False) + ap.add_argument("--require_index_hit", action="store_true", default=False) + ap.add_argument("--lang", type=str, choices=["zh", "en"], default="zh") + args = ap.parse_args() + + # derive per-run default out_root using file stem + if str(args.out_root) == "runs/single_run": + stem = args.fna.stem + args.out_root = Path("runs") / f"{stem}_run" + + res = run_single_fna_pipeline( + fna_path=args.fna, + out_root=args.out_root, + 
toxicity_csv=args.toxicity_csv, + image=args.image, + platform=args.platform, + min_identity=args.min_identity, + min_coverage=args.min_coverage, + allow_unknown_families=not args.disallow_unknown_families, + require_index_hit=args.require_index_hit, + lang=args.lang, + ) + + if not res.get("ok"): + print(f"[pipeline] FAILED at stage={res.get('stage')}: {res.get('error')}") + logs = res.get("logs") + if logs: + print(logs[:2000]) + return 1 + + print("[pipeline] ✓ Done") + print(f" Digger: {res['digger_dir']}") + print(f" Shotter: {res['shotter_dir']}") + print(f" Bundle: {res['bundle']}") + print(f" Strain: {res.get('strain','')}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main())