#!/usr/bin/env python3 """ Plotting utilities for Bttoxin_Shoter outputs. Generates heatmaps for: - Strain × Target Orders scores (from strain_target_scores.tsv) - Optional Per-hit contributions for one strain (from toxin_support.tsv) - Optional Strain × Target Species scores (from strain_target_species_scores.tsv) Can also write a Markdown report with: - summary mode: brief overview - paper mode: expanded, paper-like sections (Abstract, Methods, Results, Discussion, Params, Appendix) Dependencies: - pandas, matplotlib; seaborn is optional (for nicer heatmaps). If seaborn is missing, a pure-matplotlib fallback will be used. """ from __future__ import annotations import argparse import json from pathlib import Path from typing import List, Optional, Dict, Any import pandas as pd try: import seaborn as sns # type: ignore _HAS_SNS = True except Exception: _HAS_SNS = False import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt def _parse_figsize(s: str | None, default=(12, 6)): if not s: return default try: w, h = s.lower().split("x") return (float(w), float(h)) except Exception: return default def _merge_unresolved_columns(df: pd.DataFrame, merge_unresolved: bool) -> pd.DataFrame: if not merge_unresolved: return df cols = df.columns.tolist() has_other = "other" in cols has_unknown = "unknown" in cols if has_other or has_unknown: df = df.copy() df["unresolved"] = df.get("other", 0.0) + df.get("unknown", 0.0) drop_cols = [c for c in ("other", "unknown") if c in df.columns] df = df.drop(columns=drop_cols) return df def plot_strain_scores(strain_scores_path: Path, out_path: Path, cmap: str, vmin: float, vmax: float, figsize: str | None, merge_unresolved: bool): df = pd.read_csv(strain_scores_path, sep="\t") base_cols = {"Strain", "TopOrder", "TopScore"} orders = [c for c in df.columns if c not in base_cols] mat = df.set_index("Strain")[orders] mat = _merge_unresolved_columns(mat, merge_unresolved) orders = list(mat.columns) # dynamic figure size if not provided if figsize: size = _parse_figsize(figsize) else: size = (max(8, 0.6 * len(orders) + 3), max(4, 0.35 * len(mat) + 2)) plt.figure(figsize=size) if _HAS_SNS: ax = sns.heatmap(mat, cmap=cmap, vmin=vmin, vmax=vmax, cbar_kws={"label": "score"}) else: ax = plt.gca() im = ax.imshow(mat.values, aspect="auto", cmap=cmap, vmin=vmin, vmax=vmax) plt.colorbar(im, ax=ax, label="score") ax.set_xticks(range(len(orders))) ax.set_xticklabels(orders, rotation=45, ha="right") ax.set_yticks(range(len(mat.index))) ax.set_yticklabels(mat.index) ax.set_title("Strain vs Target Orders (Shotter)") plt.tight_layout() out_path.parent.mkdir(parents=True, exist_ok=True) plt.savefig(out_path, dpi=200) plt.close() def plot_per_hit_for_strain(toxin_support_path: Path, strain: str, out_path: Path, cmap: str, vmin: float, vmax: float, figsize: str | None, merge_unresolved: bool): hits = pd.read_csv(toxin_support_path, sep="\t") sub = hits[hits["Strain"].astype(str).eq(strain)].copy() if sub.empty: print(f"[plot_shotter] No hits for strain: {strain}") return base_cols = { "Strain","Protein_id","Hit_id","Identity","Aln_length","Hit_length","Coverage", "HMM","Family","NameKey","PartnerFulfilled","Weight","TopOrder","TopScore" } orders = [c for c in sub.columns if c not in base_cols] mat = sub.set_index("Hit_id")[orders] mat = _merge_unresolved_columns(mat, merge_unresolved) orders = list(mat.columns) # dynamic figure size if not provided if figsize: size = _parse_figsize(figsize) else: size = (max(8, 0.6 * len(orders) + 3), max(4, 0.35 * len(mat) + 2)) plt.figure(figsize=size) if _HAS_SNS: ax = sns.heatmap(mat, cmap=cmap, vmin=vmin, vmax=vmax, cbar_kws={"label": "contrib"}) else: ax = plt.gca() im = ax.imshow(mat.values, aspect="auto", cmap=cmap, vmin=vmin, vmax=vmax) plt.colorbar(im, ax=ax, label="contrib") ax.set_xticks(range(len(orders))) ax.set_xticklabels(orders, rotation=45, ha="right") ax.set_yticks(range(len(mat.index))) ax.set_yticklabels(mat.index) ax.set_title(f"Per-hit contributions for {strain}") plt.tight_layout() out_path.parent.mkdir(parents=True, exist_ok=True) plt.savefig(out_path, dpi=200) plt.close() def plot_species_scores(species_scores_path: Path, out_path: Path, cmap: str, vmin: float, vmax: float, figsize: str | None): df = pd.read_csv(species_scores_path, sep="\t") species_cols = [c for c in df.columns if c not in ("Strain", "TopSpecies", "TopSpeciesScore")] if not species_cols: print("[plot_shotter] No species columns found; skip species heatmap") return mat = df.set_index("Strain")[species_cols] if figsize: size = _parse_figsize(figsize) else: size = (max(8, 0.6 * len(species_cols) + 3), max(4, 0.35 * len(mat) + 2)) plt.figure(figsize=size) if _HAS_SNS: ax = sns.heatmap(mat, cmap=cmap, vmin=vmin, vmax=vmax, cbar_kws={"label": "score"}) else: ax = plt.gca() im = ax.imshow(mat.values, aspect="auto", cmap=cmap, vmin=vmin, vmax=vmax) plt.colorbar(im, ax=ax, label="score") ax.set_xticks(range(len(species_cols))) ax.set_xticklabels(species_cols, rotation=45, ha="right") ax.set_yticks(range(len(mat.index))) ax.set_yticklabels(mat.index) ax.set_title("Strain vs Target Species (Shotter)") plt.tight_layout() out_path.parent.mkdir(parents=True, exist_ok=True) plt.savefig(out_path, dpi=200) plt.close() def _percent(x: float) -> str: try: return f"{100.0*float(x):.1f}%" except Exception: return "NA" def _fmtf(x, nd=3): try: return f"{float(x):.{nd}f}" except Exception: return "NA" def _labels(lang: str): zh = { "title": "Bttoxin Shotter 论文式报告", "summary": "# Bttoxin Shotter Summary", "params": "## 参数", "abstract": "## 摘要", "methods": "## 数据与方法", "results": "## 结果", "discussion": "## 讨论与局限", "appendix": "## 附录", "top_targets": "## 各菌株 TopOrder/TopScore", "heat_orders": "## 热图:菌株 × 目标目", "heat_species": "## 热图:菌株 × 目标物种", "perhit": "## 命中 TopOrder/TopScore(前50)", "notes": "## 说明", "unresolved_ratio": "未解析贡献(other+unknown)整体占比", "order_distribution": "整体目标目分布(按合成分数求和排名前5):", "hits_stats": "命中统计", "n_strains": "菌株数", "n_hits": "命中条数", "avg_weight": "平均权重", "table_header_strain": "Strain | TopOrder | TopScore", "table_header_hit": "Hit_id | Family | TopOrder | TopScore | Weight", "table_sep": "--- | --- | ---", "table_sep_hit": "--- | --- | --- | --- | ---", "methods_text": ( "- 输入:BtToxin_Digger 的 All_Toxins.txt;BPPRC 特异性 CSV(activity=Yes)。\n" "- 特异性索引:按 name→亚家族→家族聚合,target_order/target_species 以加权和归一构成分布。\n" "- 相似性权重 w_hit:若 identity≥0.78 且 coverage≥0.8 则 base=1;若 0.45≤identity<0.78 则线性插值;否则 0。" "最终 w=min(1, base×coverage + 0.1×I(HMM))). 若需配对而未满足,w×=0.2。\n" "- 单命中贡献:c(order)=w_hit×P(order|·)。\n" "- 菌株层合成:noisy-OR,score(order)=1-∏(1-c_i(order))。\n" "- 物种维度:同上(若 CSV 含 target_species)。\n" "- 分类含义:other=家族可解析但无证据;unknown=家族不可解析。\n" ), "discussion_text": ( "- unresolved 偏高常见于:命名异常、家族稀缺、索引覆盖不足。\n" "- 可用更严格阈值或 --require_index_hit 减少伪阳性;也可 --merge_unresolved 合并展示。\n" "- 名称差异:Digger 与 BPPRC 均含 Cry 之外家族(Tpp/Spp/Vip 等),采用 name→亚家族→家族回退映射。\n" ), "notes_items": [ "TopOrder/TopScore:最高目标目及其分数(命中或菌株)。", "contrib:单命中对某目标目的贡献 w_hit×P(order|·)。", "unresolved=other+unknown,仅为可视化合并开关,不改变计算。", ], } en = { "title": "Bttoxin Shotter Paper-style Report", "summary": "# Bttoxin Shotter Summary", "params": "## Parameters", "abstract": "## Abstract", "methods": "## Data and Methods", "results": "## Results", "discussion": "## Discussion and Limitations", "appendix": "## Appendix", "top_targets": "## Top targets per strain", "heat_orders": "## Heatmap: Strain × Orders", "heat_species": "## Heatmap: Strain × Species", "perhit": "## Per-hit TopOrder/TopScore (Top 50)", "notes": "## Notes", "unresolved_ratio": "Overall unresolved (other+unknown) fraction", "order_distribution": "Overall order distribution (top 5 by summed score):", "hits_stats": "Hit statistics", "n_strains": "#Strains", "n_hits": "#Hits", "avg_weight": "Avg weight", "table_header_strain": "Strain | TopOrder | TopScore", "table_header_hit": "Hit_id | Family | TopOrder | TopScore | Weight", "table_sep": "--- | --- | ---", "table_sep_hit": "--- | --- | --- | --- | ---", "methods_text": ( "- Inputs: All_Toxins.txt from BtToxin_Digger; BPPRC specificity CSV (activity=Yes).\n" "- Specificity index: aggregate name→subfamily→family; build P(order/species|·) by weighted sums and normalization.\n" "- Similarity weight w_hit: base=1 if identity≥0.78 & coverage≥0.8; linear if 0.45≤identity<0.78; else 0.\n" " Final w=min(1, base×coverage + 0.1×I(HMM))). Partner missing → w×=0.2.\n" "- Per-hit contribution: c(order)=w_hit×P(order|·).\n" "- Strain level: noisy-OR, score(order)=1-∏(1-c_i(order)).\n" "- Species dimension: same if target_species present.\n" "- Buckets: other=parseable family but no evidence; unknown=unparseable family.\n" ), "discussion_text": ( "- High unresolved is common with unusual names, rare families, or limited index coverage.\n" "- Use stricter thresholds or --require_index_hit to reduce false positives; --merge_unresolved for visualization only.\n" "- Naming: both Digger and BPPRC include non-Cry families (Tpp/Spp/Vip etc.), mapped via name→subfamily→family backoff.\n" ), "notes_items": [ "TopOrder/TopScore: max target order and score (per hit or strain).", "contrib: per-hit contribution w_hit×P(order|·).", "unresolved=other+unknown merge is cosmetic; computation unchanged.", ], } return zh if lang == "zh" else en def _format_crispr_section(crispr_data: Dict[str, Any], lang: str) -> List[str]: """Format CRISPR analysis results as a Markdown section.""" L = { "zh": { "title": "## CRISPR-Cas 融合分析", "summary": "CRISPR-Cas 系统检测摘要", "associations": "CRISPR-毒素关联", "arrays": "检测到的 CRISPR Arrays", "proximal": "邻近关联 (Proximity)", "spacer": "Spacer 匹配", "none": "未检测到显著关联。", "table_header": "类型 | 毒素 | 距离 (bp) | Array ID", "table_sep": "--- | --- | --- | ---" }, "en": { "title": "## CRISPR-Cas Fusion Analysis", "summary": "CRISPR-Cas System Detection Summary", "associations": "CRISPR-Toxin Associations", "arrays": "Detected CRISPR Arrays", "proximal": "Proximity", "spacer": "Spacer Match", "none": "No significant associations detected.", "table_header": "Type | Toxin | Distance (bp) | Array ID", "table_sep": "--- | --- | --- | ---" } } txt = L.get(lang, L["en"]) lines = [] lines.append(txt["title"]) lines.append("") # Summary if "summary" in crispr_data: s = crispr_data["summary"] lines.append(f"- **{txt['proximal']}**: {s.get('proximal_pairs', 0)}") lines.append(f"- **{txt['spacer']}**: {s.get('spacer_matches', 0)}") lines.append("") # Associations Table assocs = crispr_data.get("associations", []) if assocs: lines.append(f"### {txt['associations']}") lines.append("") lines.append(txt["table_header"]) lines.append(txt["table_sep"]) for a in assocs: atype = a.get("type", "unknown") toxin = a.get("toxin", "") dist = a.get("distance", "-") aid = a.get("array_id", "") lines.append(f"{atype} | {toxin} | {dist} | {aid}") lines.append("") else: lines.append(txt["none"]) lines.append("") return lines def write_summary_md( out_path: Path, strain_scores_path: Path, toxin_support_path: Optional[Path], species_scores_path: Optional[Path], strain_heatmap_path: Optional[Path], per_hit_heatmap_path: Optional[Path], species_heatmap_path: Optional[Path], merge_unresolved: bool, args_namespace, crispr_data: Optional[Dict] = None, ): labels = _labels(getattr(args_namespace, "lang", "zh")) lines: List[str] = [] lines.append(labels["summary"]) lines.append("") # CRISPR Section (Top) if crispr_data: lines.extend(_format_crispr_section(crispr_data, getattr(args_namespace, "lang", "zh"))) lines.append("") # Parameters lines.append(labels["params"]) lines.append(f"- allow_unknown_families: {getattr(args_namespace, 'allow_unknown_families', 'NA')}") lines.append(f"- require_index_hit: {getattr(args_namespace, 'require_index_hit', 'NA')}") lines.append(f"- min_identity: {getattr(args_namespace, 'min_identity', 'NA')}") lines.append(f"- min_coverage: {getattr(args_namespace, 'min_coverage', 'NA')}") lines.append(f"- merge_unresolved: {merge_unresolved}") lines.append("") # Top per strain table try: df = pd.read_csv(strain_scores_path, sep="\t") cols = [c for c in df.columns if c not in ("Strain", "TopOrder", "TopScore")] unresolved_cols = [c for c in ("other", "unknown") if c in cols] unresolved_total = df[unresolved_cols].sum(axis=1).sum() if unresolved_cols else 0.0 total_sum = df[cols].sum(axis=1).sum() if cols else 0.0 frac_unresolved = (unresolved_total / total_sum) if total_sum > 0 else 0.0 lines.append(labels["top_targets"]) lines.append("") lines.append(labels["table_header_strain"]) lines.append(labels["table_sep"]) for _, r in df.iterrows(): lines.append(f"{r['Strain']} | {r.get('TopOrder','')} | {float(r.get('TopScore',0.0)):.3f}") lines.append("") lines.append(f"{labels['unresolved_ratio']}: {_percent(frac_unresolved)}") lines.append("") except Exception as e: lines.append(f"[warn] Failed to read strain scores: {e}") if strain_heatmap_path and strain_heatmap_path.exists(): lines.append(labels["heat_orders"]) lines.append(f"![]({strain_heatmap_path.name})") lines.append("") if species_scores_path and species_scores_path.exists(): lines.append(labels["heat_species"]) if species_heatmap_path and species_heatmap_path.exists(): lines.append(f"![]({species_heatmap_path.name})") else: lines.append("Species heatmap not generated.") lines.append("") # Per-hit summary if toxin_support_path and toxin_support_path.exists(): try: hits = pd.read_csv(toxin_support_path, sep="\t") lines.append(labels["perhit"]) lines.append("") lines.append(labels["table_header_hit"]) lines.append(labels["table_sep_hit"]) for _, r in hits.sort_values("TopScore", ascending=False).head(50).iterrows(): lines.append( f"{r['Hit_id']} | {r.get('Family','')} | {r.get('TopOrder','')} | {float(r.get('TopScore',0.0)):.3f} | {float(r.get('Weight',0.0)):.3f}" ) lines.append("") except Exception as e: lines.append(f"[warn] Failed to read toxin support: {e}") # FAQs lines.append(labels["notes"]) for it in _labels(getattr(args_namespace, "lang", "zh"))["notes_items"]: lines.append(f"- {it}") out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text("\n".join(lines), encoding="utf-8") def write_report_md( out_path: Path, mode: str, lang: str, strain_scores_path: Path, toxin_support_path: Optional[Path], species_scores_path: Optional[Path], strain_heatmap_path: Optional[Path], per_hit_heatmap_path: Optional[Path], species_heatmap_path: Optional[Path], merge_unresolved: bool, args_namespace, crispr_data: Optional[Dict] = None, ): if mode == "summary": return write_summary_md( out_path, strain_scores_path, toxin_support_path, species_scores_path, strain_heatmap_path, per_hit_heatmap_path, species_heatmap_path, merge_unresolved, args_namespace, crispr_data, ) L = _labels(lang) lines: List[str] = [] lines.append(L["title"]) # title lines.append("") # Abstract: brief auto-summary try: df = pd.read_csv(strain_scores_path, sep="\t") base_cols = {"Strain", "TopOrder", "TopScore"} order_cols = [c for c in df.columns if c not in base_cols] orders_only = df[order_cols] unresolved_cols = [c for c in ("other", "unknown") if c in order_cols] frac_unres = 0.0 if order_cols: tot = orders_only.sum(axis=1).sum() unres = df[unresolved_cols].sum(axis=1).sum() if unresolved_cols else 0.0 frac_unres = (unres / tot) if tot > 0 else 0.0 top_global = [] if order_cols: sums = orders_only.sum(axis=0).sort_values(ascending=False) for k, v in sums.head(5).items(): if k in ("other", "unknown") and merge_unresolved: continue top_global.append(f"{k}:{_fmtf(v)}") n_strains = len(df) except Exception: n_strains, frac_unres, top_global = 0, 0.0, [] n_hits, avg_w = 0, 0.0 try: if toxin_support_path and toxin_support_path.exists(): hits = pd.read_csv(toxin_support_path, sep="\t") n_hits = len(hits) if "Weight" in hits.columns and len(hits) > 0: avg_w = float(hits["Weight"].mean()) except Exception: pass lines.append(L["abstract"]) lines.append( f"- {L['n_strains']}: {n_strains}; {L['n_hits']}: {n_hits}; {L['avg_weight']}: {_fmtf(avg_w)}." ) if top_global: lines.append(f"- {L['order_distribution']} " + ", ".join(top_global)) lines.append(f"- {L['unresolved_ratio']}: {_percent(frac_unres)}") lines.append("") # Methods lines.append(L["methods"]) lines.append(L["methods_text"]) # Results lines.append(L["results"]) try: df = pd.read_csv(strain_scores_path, sep="\t") lines.append(L["top_targets"]) lines.append("") lines.append(L["table_header_strain"]) lines.append(L["table_sep"]) for _, r in df.iterrows(): lines.append(f"{r['Strain']} | {r.get('TopOrder','')} | {_fmtf(r.get('TopScore',0.0))}") lines.append("") except Exception as e: lines.append(f"[warn] Failed to read strain scores: {e}") if strain_heatmap_path and strain_heatmap_path.exists(): lines.append(L["heat_orders"]) lines.append(f"![]({strain_heatmap_path.name})") lines.append("") if species_heatmap_path and species_heatmap_path.exists(): lines.append(L["heat_species"]) lines.append(f"![]({species_heatmap_path.name})") lines.append("") # Discussion lines.append(L["discussion"]) lines.append(L["discussion_text"]) # Params lines.append(L["params"]) lines.append( f"- allow_unknown_families: {getattr(args_namespace, 'allow_unknown_families', 'NA')}\n" f"- require_index_hit: {getattr(args_namespace, 'require_index_hit', 'NA')}\n" f"- min_identity: {getattr(args_namespace, 'min_identity', 'NA')}\n" f"- min_coverage: {getattr(args_namespace, 'min_coverage', 'NA')}\n" f"- merge_unresolved: {merge_unresolved}" ) lines.append("") # Appendix if toxin_support_path and toxin_support_path.exists(): try: hits = pd.read_csv(toxin_support_path, sep="\t") lines.append(L["appendix"]) lines.append(L["perhit"]) lines.append("") lines.append(L["table_header_hit"]) lines.append(L["table_sep_hit"]) for _, r in hits.sort_values("TopScore", ascending=False).head(50).iterrows(): lines.append( f"{r['Hit_id']} | {r.get('Family','')} | {r.get('TopOrder','')} | {_fmtf(r.get('TopScore',0.0))} | {_fmtf(r.get('Weight',0.0))}" ) lines.append("") except Exception as e: lines.append(f"[warn] Failed to read toxin support: {e}") out_path.parent.mkdir(parents=True, exist_ok=True) out_path.write_text("\n".join(lines), encoding="utf-8") def main(): ap = argparse.ArgumentParser(description="Plot heatmaps from Bttoxin_Shoter outputs") ap.add_argument("--strain_scores", type=Path, default=Path("shotter_outputs/strain_target_scores.tsv")) ap.add_argument("--toxin_support", type=Path, default=None) ap.add_argument("--species_scores", type=Path, default=None) ap.add_argument("--out_dir", type=Path, default=Path("shotter_outputs")) ap.add_argument("--cmap", type=str, default="viridis") ap.add_argument("--vmin", type=float, default=0.0) ap.add_argument("--vmax", type=float, default=1.0) ap.add_argument("--figsize", type=str, default=None, help="e.g. 12x6") ap.add_argument("--per_hit_strain", type=str, default=None, help="If provided, also draw per-hit heatmap for this strain (requires --toxin_support)") ap.add_argument("--merge_unresolved", action="store_true", default=False, help="Merge columns 'other' and 'unknown' into 'unresolved' for plots") ap.add_argument("--summary_md", type=Path, default=None, help="Write a Markdown report to this path") ap.add_argument("--report_mode", type=str, choices=["summary", "paper"], default="paper", help="Report template style") ap.add_argument("--lang", type=str, choices=["zh", "en"], default="zh", help="Report language") # CRISPR Integration ap.add_argument("--crispr_results", type=Path, default=None, help="Path to CRISPR detection results JSON") ap.add_argument("--crispr_fusion", action="store_true", help="Visualize CRISPR-Toxin fusion events") args = ap.parse_args() args.out_dir.mkdir(parents=True, exist_ok=True) # Strain × Orders heatmap out1 = args.out_dir / "strain_target_scores.png" plot_strain_scores(args.strain_scores, out1, args.cmap, args.vmin, args.vmax, args.figsize, args.merge_unresolved) print(f"Saved: {out1}") # Optional per-hit heatmap if args.per_hit_strain and args.toxin_support: out2 = args.out_dir / f"per_hit_{args.per_hit_strain}.png" plot_per_hit_for_strain(args.toxin_support, args.per_hit_strain, out2, args.cmap, args.vmin, args.vmax, args.figsize, args.merge_unresolved) print(f"Saved: {out2}") # Load CRISPR data if available crispr_data = None if args.crispr_results and args.crispr_results.exists(): try: with open(args.crispr_results) as f: crispr_data = json.load(f) # Future: Generate CRISPR specific plots here print(f"[Plot] Loaded CRISPR data: {len(crispr_data.get('arrays', []))} arrays found") except Exception as e: print(f"[Plot] Failed to load CRISPR results: {e}") # Optional species heatmap species_png: Optional[Path] = None if args.species_scores and args.species_scores.exists(): species_png = args.out_dir / "strain_target_species_scores.png" plot_species_scores(args.species_scores, species_png, args.cmap, args.vmin, args.vmax, args.figsize) print(f"Saved: {species_png}") # Report default_name = "shotter_report_paper.md" if args.report_mode == "paper" else "shotter_summary.md" summary_md = args.summary_md or (args.out_dir / default_name) write_report_md( out_path=summary_md, mode=args.report_mode, lang=args.lang, strain_scores_path=args.strain_scores, toxin_support_path=args.toxin_support, species_scores_path=args.species_scores, strain_heatmap_path=out1, per_hit_heatmap_path=(args.out_dir / f"per_hit_{args.per_hit_strain}.png") if (args.per_hit_strain and args.toxin_support) else None, species_heatmap_path=species_png, merge_unresolved=args.merge_unresolved, args_namespace=args, crispr_data=crispr_data, ) print(f"Saved: {summary_md}") if __name__ == "__main__": main()