Files
bttoxin-pipeline/scripts/bttoxin_api.py
hotwa 5883e13c56 feat(shotter): 实现 Shotter v1 活性评估与单 FNA 流程,新增 API/CLI/绘图与报告
- 新增 scripts/bttoxin_shoter.py:从 BPPRC 正样本 CSV 构建 name/亚家族/家族特异性索引,
  解析 BtToxin_Digger All_Toxins.txt,计算 per-hit 权重并以 noisy-OR 合成菌株×目标目/物种分数,
  输出 TSV/JSON;含 HMM 加成与配对毒素规则(Vip1/Vip2,Vpa/Vpb),other/unknown 桶。
- 新增端到端工具链:
  - scripts/run_single_fna_pipeline.py:Digger → Shotter → Plot → 打包
  - scripts/plot_shotter.py:绘制热图并生成论文式/摘要式报告
  - scripts/bttoxin_api.py 与 bttoxin/api.py:纯 Python API;bttoxin/cli.py 暴露 bttoxin-run
  - pyproject.toml:项目打包与 CLI 入口
- docs(README): 增加输入文件格式与结果解读,补充单目录写入方案
- chore(gitignore): 忽略 runs/ 与 tests/output
- ci: 移除 .woodpecker/test.yml
2025-12-01 10:11:26 +08:00

287 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
from __future__ import annotations
import logging
import os
import shutil
import tarfile
from pathlib import Path
from types import SimpleNamespace
from typing import Dict, Any, Optional
import sys as _sys
# Add backend and scripts to sys.path for API imports
_BACKEND_DIR = Path(__file__).resolve().parents[1] / "backend"
_SCRIPTS_DIR = Path(__file__).resolve().parents[0]
if str(_BACKEND_DIR) not in _sys.path:
_sys.path.append(str(_BACKEND_DIR))
if str(_SCRIPTS_DIR) not in _sys.path:
_sys.path.append(str(_SCRIPTS_DIR))
from app.utils.docker_client import DockerContainerManager # type: ignore
import bttoxin_shoter as shoter # type: ignore
import plot_shotter as plotter # type: ignore
logger = logging.getLogger(__name__)
class BtToxinRunner:
    """Wrap the BtToxin_Digger docker invocation for a single FNA file."""

    def __init__(
        self,
        image: str = "quay.io/biocontainers/bttoxin_digger:1.0.10--hdfd78af_0",
        platform: str = "linux/amd64",
        base_workdir: Optional[Path] = None,
    ) -> None:
        """
        Args:
            image: Docker image to run BtToxin_Digger with.
            platform: Docker platform flag passed to the container manager.
            base_workdir: Root for per-sample run directories; defaults to
                <repo>/runs/bttoxin. Created if missing.
        """
        self.image = image
        self.platform = platform
        if base_workdir is None:
            base_workdir = Path(__file__).resolve().parents[1] / "runs" / "bttoxin"
        self.base_workdir = base_workdir
        self.base_workdir.mkdir(parents=True, exist_ok=True)
        self.mgr = DockerContainerManager(image=self.image, platform=self.platform)

    def _prepare_layout(self, fna_path: Path) -> tuple[Path, Path, Path, Path, str]:
        """Create the per-sample directory layout and stage the FNA file.

        Layout: <base_workdir>/<sample>/{input,output/digger,logs}.

        Returns:
            (input_dir, digger_out, log_dir, run_root, sample_name)

        Raises:
            FileNotFoundError: if *fna_path* does not exist.
        """
        if not fna_path.exists():
            raise FileNotFoundError(f"FNA 文件不存在: {fna_path}")
        sample_name = fna_path.stem
        run_root = self.base_workdir / sample_name
        input_dir = run_root / "input"
        output_root = run_root / "output"
        digger_out = output_root / "digger"
        log_dir = run_root / "logs"
        for d in (input_dir, digger_out, log_dir):
            d.mkdir(parents=True, exist_ok=True)
        # Stage the FNA into input_dir: hardlink when the filesystem allows
        # (cheap, no duplication), fall back to a real copy otherwise.
        target = input_dir / fna_path.name
        if target.exists():
            target.unlink()
        try:
            os.link(fna_path, target)
            # separator restored: the original format string fused both paths
            logger.info("使用硬链接复制 FNA: %s -> %s", fna_path, target)
        except OSError:
            shutil.copy2(fna_path, target)
            logger.info("复制 FNA: %s -> %s", fna_path, target)
        return input_dir, digger_out, log_dir, run_root, sample_name

    def run_single_fna(self, fna_path: Path | str, sequence_type: str = "nucl", threads: int = 4) -> Dict[str, Any]:
        """Run BtToxin_Digger on one FNA and collect the expected output paths.

        Args:
            fna_path: Genome assembly (FNA) to analyse.
            sequence_type: Passed through to the digger ("nucl" by default).
            threads: Worker threads for the container run.

        Returns:
            Dict with "success" (container succeeded AND All_Toxins.txt
            exists), the layout paths, expected result file paths under
            Results/Toxins, and the raw container result.
        """
        fna_path = Path(fna_path)
        input_dir, digger_out, log_dir, run_root, sample_name = self._prepare_layout(fna_path)
        logger.info("开始 BtToxin_Digger 分析: %s (sample=%s)", fna_path, sample_name)
        result = self.mgr.run_bttoxin_digger(
            input_dir=input_dir,
            output_dir=digger_out,
            log_dir=log_dir,
            sequence_type=sequence_type,
            scaf_suffix=fna_path.suffix or ".fna",
            threads=threads,
        )
        toxins_dir = digger_out / "Results" / "Toxins"
        files = {
            "list": toxins_dir / f"{sample_name}.list",
            "gbk": toxins_dir / f"{sample_name}.gbk",
            "all_genes": toxins_dir / "Bt_all_genes.table",
            "all_toxins": toxins_dir / "All_Toxins.txt",
        }
        # Success requires both the container reporting success and the key
        # summary file actually existing on disk.
        ok = bool(result.get("success")) and files["all_toxins"].exists()
        return {
            "success": ok,
            "sample": sample_name,
            "run_root": run_root,
            "input_dir": input_dir,
            "digger_out": digger_out,
            "log_dir": log_dir,
            "toxins_dir": toxins_dir,
            "files": files,
            "raw_result": result,
        }
class ShotterAPI:
    """Pure-Python API: run Shotter scoring and persist the result tables."""

    def score(
        self,
        toxicity_csv: Path,
        all_toxins: Path,
        out_dir: Path,
        min_identity: float = 0.0,
        min_coverage: float = 0.0,
        allow_unknown_families: bool = True,
        require_index_hit: bool = False,
    ) -> Dict[str, Any]:
        """Score every strain in an All_Toxins table against the index.

        Args:
            toxicity_csv: BPPRC positive-sample CSV used to build the
                specificity index.
            all_toxins: BtToxin_Digger All_Toxins.txt output.
            out_dir: Directory to write TSV/JSON results into (created).
            min_identity: Drop hits with identity01 below this (0 = keep all).
            min_coverage: Drop hits with coverage below this (0 = keep all).
            allow_unknown_families: When False, drop hits whose family_key
                is "unknown".
            require_index_hit: When True, keep only hits whose name or
                family resolves to at least one target order in the index.

        Returns:
            Dict with the order/species column lists and the paths of all
            files written (species files only when species data exist).
        """
        out_dir.mkdir(parents=True, exist_ok=True)
        index = shoter.SpecificityIndex.from_csv(toxicity_csv)
        df = shoter.parse_all_toxins(all_toxins)

        # Optional quality thresholds.
        if min_identity > 0:
            df = df[df["identity01"].astype(float) >= float(min_identity)]
        if min_coverage > 0:
            df = df[df["coverage"].astype(float) >= float(min_coverage)]

        # Optionally drop hits that could not be assigned to a known family.
        if not allow_unknown_families:
            df = df[df["family_key"].astype(str) != "unknown"]

        # Optionally require that the hit maps to at least one target order,
        # first via its normalized name, then backing off to its family.
        if require_index_hit:
            def _has_index_orders(row) -> bool:
                name_key = str(row.get("Hit_id_norm", ""))
                fam = str(row.get("family_key", ""))
                d = index.orders_for_name_or_backoff(name_key)
                if not d:
                    d = index.orders_for_name_or_backoff(fam)
                return bool(d)
            df = df[df.apply(_has_index_orders, axis=1)]

        strains = sorted(df["Strain"].astype(str).unique().tolist())
        all_hits: list[shoter.ToxinHit] = []
        all_strain_scores: list[shoter.StrainScores] = []
        all_species_scores: list[shoter.StrainSpeciesScores] = []
        for strain in strains:
            sdf = df[df["Strain"].astype(str).eq(strain)].copy()
            per_hit, sscore, sspecies = shoter.score_strain(strain, sdf, index)
            all_hits.extend(per_hit)
            all_strain_scores.append(sscore)
            if sspecies is not None:
                all_species_scores.append(sspecies)

        # "other"/"unknown" buckets are always present as columns; fall back
        # to a lone "unknown" column if the index has no orders at all.
        order_columns = sorted({*index.all_orders, "other", "unknown"}) or ["unknown"]
        species_columns = sorted(index.all_species)
        shoter.ToxinHit.save_list_tsv(out_dir / "toxin_support.tsv", all_hits, order_columns)
        shoter.StrainScores.save_list_tsv(out_dir / "strain_target_scores.tsv", all_strain_scores, order_columns)
        shoter.StrainScores.save_list_json(out_dir / "strain_scores.json", all_strain_scores)
        if species_columns and all_species_scores:
            shoter.StrainSpeciesScores.save_list_tsv(out_dir / "strain_target_species_scores.tsv", all_species_scores, species_columns)
            shoter.StrainSpeciesScores.save_list_json(out_dir / "strain_species_scores.json", all_species_scores)
        return {
            "orders": order_columns,
            "species": species_columns,
            "strain_scores": out_dir / "strain_target_scores.tsv",
            "toxin_support": out_dir / "toxin_support.tsv",
            "strain_scores_json": out_dir / "strain_scores.json",
            "species_scores": out_dir / "strain_target_species_scores.tsv",
            "species_scores_json": out_dir / "strain_species_scores.json",
        }
class PlotAPI:
    """Render Shotter heatmaps and the markdown report in-process (no subprocess)."""

    def render(
        self,
        shotter_dir: Path,
        lang: str = "zh",
        merge_unresolved: bool = True,
        per_hit_strain: Optional[str] = None,
        cmap: str = "viridis",
        vmin: float = 0.0,
        vmax: float = 1.0,
    ) -> Dict[str, Any]:
        """Plot heatmaps from a Shotter output directory and write the report.

        Args:
            shotter_dir: Directory containing the Shotter TSV outputs.
            lang: Report language code (passed through to the plotter).
            merge_unresolved: Merge unresolved buckets in the heatmaps.
            per_hit_strain: When set (and toxin_support.tsv exists), also
                render a per-hit heatmap for that strain.
            cmap / vmin / vmax: Matplotlib colormap and color scale bounds.

        Returns:
            Dict of produced artifact paths; per-hit and species PNGs are
            None when their inputs were absent.
        """
        strain_scores = shotter_dir / "strain_target_scores.tsv"
        toxin_support = shotter_dir / "toxin_support.tsv"
        species_scores = shotter_dir / "strain_target_species_scores.tsv"

        # Strain x target-order heatmap (always produced).
        out1 = shotter_dir / "strain_target_scores.png"
        plotter.plot_strain_scores(strain_scores, out1, cmap, vmin, vmax, None, merge_unresolved)

        # Optional per-hit heatmap for a single strain.
        out2 = None
        if per_hit_strain and toxin_support.exists():
            out2 = shotter_dir / f"per_hit_{per_hit_strain}.png"
            plotter.plot_per_hit_for_strain(toxin_support, per_hit_strain, out2, cmap, vmin, vmax, None, merge_unresolved)

        # Optional strain x species heatmap.
        species_png = None
        if species_scores.exists():
            species_png = shotter_dir / "strain_target_species_scores.png"
            plotter.plot_species_scores(species_scores, species_png, cmap, vmin, vmax, None)

        # Markdown report ("paper" mode). The namespace mirrors the CLI args
        # the report writer expects; filters are None because scoring already
        # happened elsewhere.
        args_ns = SimpleNamespace(
            allow_unknown_families=None,
            require_index_hit=None,
            min_identity=None,
            min_coverage=None,
            lang=lang,
        )
        report_path = shotter_dir / "shotter_report_paper.md"
        plotter.write_report_md(
            out_path=report_path,
            mode="paper",
            lang=lang,
            strain_scores_path=strain_scores,
            toxin_support_path=toxin_support if toxin_support.exists() else None,
            species_scores_path=species_scores if species_scores.exists() else None,
            strain_heatmap_path=out1,
            per_hit_heatmap_path=out2,
            species_heatmap_path=species_png,
            merge_unresolved=merge_unresolved,
            args_namespace=args_ns,
        )
        return {
            "strain_orders_png": out1,
            "per_hit_png": out2,
            "species_png": species_png,
            "report_md": report_path,
        }
class BtSingleFnaPipeline:
    """End-to-end API pipeline for one FNA: Digger -> Shotter -> Plot -> bundle."""

    def __init__(
        self,
        image: str = "quay.io/biocontainers/bttoxin_digger:1.0.10--hdfd78af_0",
        platform: str = "linux/amd64",
        base_workdir: Optional[Path] = None,
    ) -> None:
        """
        Args:
            image / platform / base_workdir: Forwarded to BtToxinRunner.
        """
        self.digger = BtToxinRunner(image=image, platform=platform, base_workdir=base_workdir)
        self.shotter = ShotterAPI()
        self.plotter = PlotAPI()

    def run(
        self,
        fna: Path | str,
        toxicity_csv: Path | str = Path("Data/toxicity-data.csv"),
        min_identity: float = 0.0,
        min_coverage: float = 0.0,
        allow_unknown_families: bool = True,
        require_index_hit: bool = False,
        lang: str = "zh",
        threads: int = 4,
    ) -> Dict[str, Any]:
        """Run the full pipeline on one FNA.

        Returns:
            On digger failure: {"ok": False, "stage": "digger", "detail": ...}.
            On success: dict with ok=True, the run/output directory paths
            (as str), the tar.gz bundle path, and the strain chosen for the
            per-hit figure ("" if none could be determined).
        """
        dig = self.digger.run_single_fna(fna_path=fna, sequence_type="nucl", threads=threads)
        if not dig.get("success"):
            return {"ok": False, "stage": "digger", "detail": dig}

        run_root: Path = dig["run_root"]
        shotter_dir = run_root / "output" / "shotter"
        shot = self.shotter.score(
            toxicity_csv=Path(toxicity_csv),
            all_toxins=Path(dig["files"]["all_toxins"]),
            out_dir=shotter_dir,
            min_identity=min_identity,
            min_coverage=min_coverage,
            allow_unknown_families=allow_unknown_families,
            require_index_hit=require_index_hit,
        )

        # Best-effort pick of the first strain for the per-hit figure; the
        # pipeline must not fail just because this convenience plot can't be
        # derived, hence the broad swallow.
        strain_for_plot = None
        try:
            import pandas as pd
            df = pd.read_csv(shot["strain_scores"], sep="\t")
            if len(df):
                strain_for_plot = str(df.iloc[0]["Strain"])
        except Exception:
            pass

        vis = self.plotter.render(
            shotter_dir=shotter_dir,
            lang=lang,
            merge_unresolved=True,
            per_hit_strain=strain_for_plot)  # type: ignore[arg-type]

        # Bundle digger + shotter outputs for download/transfer.
        bundle = run_root / "pipeline_results.tar.gz"
        with tarfile.open(bundle, "w:gz") as tar:
            tar.add(run_root / "output" / "digger", arcname="digger")
            tar.add(run_root / "output" / "shotter", arcname="shotter")
        return {
            "ok": True,
            "run_root": str(run_root),
            "digger_dir": str(run_root / "output" / "digger"),
            "shotter_dir": str(shotter_dir),
            "bundle": str(bundle),
            "strain": strain_for_plot or "",
        }