"""Docker/Podman 容器管理(修正版,支持 arm64/macOS 与 linux/amd64)""" from __future__ import annotations import os import subprocess import logging import time from pathlib import Path from typing import Dict, Any, Optional, List import sys try: import docker # type: ignore except Exception: # 允许在无 docker SDK 环境下使用 podman fallback docker = None # type: ignore from ..core.config import settings logger = logging.getLogger(__name__) def _which(cmd: str) -> Optional[str]: from shutil import which return which(cmd) class DockerContainerManager: """容器管理器 - 兼容 Docker 与 Podman。 优先尝试 docker SDK;若不可用则回落到 podman CLI(或 docker CLI)。 在 arm64 主机上默认以 --platform linux/amd64 运行镜像。 """ def __init__( self, image: str = settings.DOCKER_IMAGE, platform: str = settings.DOCKER_PLATFORM, ) -> None: self.image = image self.platform = platform self._engine: str = "docker" self._client = None # 首选 docker-py 客户端(若可用) if docker is not None: try: self._client = docker.from_env() # 探测 daemon self._client.ping() self._engine = "docker-sdk" except Exception as err: logger.info(f"docker SDK 不可用,将尝试 CLI 回落: {err}") self._client = None # CLI 回落:优先 podman,其次 docker if self._client is None: if _which("podman"): self._engine = "podman-cli" elif _which("docker"): self._engine = "docker-cli" else: raise RuntimeError("未找到可用的容器引擎(需要 podman 或 docker)") self._ensure_image() # ----------------------------- 公共方法 ----------------------------- def run_command_in_container( self, command: List[str], volumes: Dict[str, Dict[str, str]], environment: Optional[Dict[str, str]] = None, working_dir: str = "/workspace", name: Optional[str] = None, detach: bool = False, remove: bool = True, ) -> Dict[str, Any]: """在容器中执行命令,返回执行结果。""" # 确保挂载目录存在且可写 for host_path, spec in volumes.items(): p = Path(host_path) p.mkdir(parents=True, exist_ok=True) try: p.chmod(0o777) except Exception: pass if self._engine == "docker-sdk" and self._client is not None: return self._run_with_docker_sdk( command, volumes, environment, working_dir, name, detach, remove ) else: return self._run_with_cli( command, volumes, environment, working_dir, name, detach, remove ) def update_database(self, log_dir: Path) -> Dict[str, Any]: """在容器中更新 BtToxin_Digger 数据库。""" cmd = [ "/usr/local/env-execute", "BtToxin_Digger", "--update-db", ] vols = {str(log_dir): {"bind": "/logs", "mode": "rw"}} result = self.run_command_in_container( command=cmd, volumes=vols, working_dir="/tmp", name=f"bttoxin_update_db_{int(time.time())}" ) if result.get("logs"): (log_dir / "update_db.log").write_text(result["logs"], encoding="utf-8") return result def validate_reads_filenames( self, input_dir: Path, platform: str, reads1_suffix: str, reads2_suffix: Optional[str] = None, suffix_len: int = 0, ) -> Dict[str, Any]: files = list(input_dir.glob("*")) if platform == "illumina": r1 = [f for f in files if reads1_suffix and reads1_suffix in f.name] r2 = [f for f in files if reads2_suffix and reads2_suffix in f.name] if not r1 or not r2 or len(r1) != len(r2): return {"valid": False, "error": "Illumina R1/R2 配对数量不匹配或缺失"} for f1 in r1: strain = f1.name.replace(reads1_suffix, "") if not (input_dir / f"{strain}{reads2_suffix}").exists(): return {"valid": False, "error": f"未找到配对文件: {strain}{reads2_suffix}"} return { "valid": True, "strain_count": len(r1), "suggested_suffix_len": suffix_len or len(reads1_suffix), } if platform in ("pacbio", "oxford"): r = [f for f in files if reads1_suffix and reads1_suffix in f.name] if not r: return {"valid": False, "error": f"未找到匹配 {reads1_suffix} 的 reads 文件"} return { "valid": True, "strain_count": len(r), "suggested_suffix_len": suffix_len or len(reads1_suffix), } return {"valid": True} def run_bttoxin_digger( self, input_dir: Path, output_dir: Path, log_dir: Path, sequence_type: str = "nucl", scaf_suffix: str = ".fna", threads: int = 4, **kwargs: Any, ) -> Dict[str, Any]: """在容器中运行 BtToxin_Digger 主分析(单目录方案)。""" # 1) 在宿主输出目录下准备 input_files,并复制输入文件 work_input_dir = (output_dir / "input_files").resolve() work_input_dir.mkdir(parents=True, exist_ok=True) import shutil if sequence_type == "nucl": pattern = f"*{scaf_suffix}" elif sequence_type == "orfs": pattern = f"*{kwargs.get('orfs_suffix', '.ffn')}" elif sequence_type == "prot": pattern = f"*{kwargs.get('prot_suffix', '.faa')}" elif sequence_type == "reads": pattern = "*" else: pattern = "*" copied_files = 0 for f in input_dir.glob(pattern): if f.is_file(): shutil.copy2(f, work_input_dir / f.name) copied_files += 1 logger.info(f"已复制 {copied_files} 个输入文件到 {work_input_dir}") base_cmd: List[str] = [ "/usr/local/env-execute", "BtToxin_Digger", "--SeqPath", "/workspace/input_files", "--SequenceType", sequence_type, "--threads", str(threads), ] if sequence_type == "nucl": base_cmd += ["--Scaf_suffix", scaf_suffix] elif sequence_type == "orfs": base_cmd += ["--orfs_suffix", kwargs.get("orfs_suffix", ".ffn")] elif sequence_type == "prot": base_cmd += ["--prot_suffix", kwargs.get("prot_suffix", ".faa")] elif sequence_type == "reads": platform = kwargs.get("platform", "illumina") base_cmd += ["--platform", platform] if platform == "illumina": r1 = kwargs.get("reads1_suffix", "_R1.fastq.gz") r2 = kwargs.get("reads2_suffix", "_R2.fastq.gz") sfx = kwargs.get("suffix_len") or len(r1) v = self.validate_reads_filenames(work_input_dir, platform, r1, r2, sfx) if not v.get("valid"): raise ValueError(f"Reads 文件验证失败: {v.get('error')}") sfx = v.get("suggested_suffix_len", sfx) base_cmd += ["--reads1", r1, "--reads2", r2, "--suffix_len", str(sfx)] elif platform in ("pacbio", "oxford"): r = kwargs.get("reads1_suffix", ".fastq.gz") gsize = kwargs.get("genome_size", "6.07m") sfx = kwargs.get("suffix_len") or len(r) v = self.validate_reads_filenames(work_input_dir, platform, r, None, sfx) if not v.get("valid"): raise ValueError(f"Reads 文件验证失败: {v.get('error')}") sfx = v.get("suggested_suffix_len", sfx) base_cmd += ["--reads1", r, "--genomeSize", gsize, "--suffix_len", str(sfx)] elif platform == "hybrid": short1 = kwargs.get("short1") short2 = kwargs.get("short2") long = kwargs.get("long") if not all([short1, short2, long]): raise ValueError("hybrid 需要 short1/short2/long 三个完整文件名") for fn in (short1, short2, long): if not (work_input_dir / fn).exists(): raise ValueError(f"文件不存在: {fn}") base_cmd += [ "--short1", short1, "--short2", short2, "--long", long, "--hout", "/workspace/Results/Assembles/Hybrid", ] if kwargs.get("assemble_only"): base_cmd.append("--assemble_only") # 2) 只挂载输出目录(含 input_files)与日志目录 volumes = { str(output_dir.resolve()): {"bind": "/workspace", "mode": "rw"}, str(log_dir.resolve()): {"bind": "/data/logs", "mode": "rw"}, } logger.info("开始 BtToxin_Digger 分析...") final_cmd = base_cmd working_dir = "/workspace" result = self.run_command_in_container( command=final_cmd, volumes=volumes, working_dir=working_dir, name=f"bttoxin_digger_{int(time.time())}", ) # 保存容器日志 logs_path = log_dir / "digger_execution.log" if result.get("logs"): logs_path.write_text(result["logs"], encoding="utf-8") logger.info(f"容器日志已保存: {logs_path}") # 验证输出 results_dir = output_dir / "Results" if result.get("success") and results_dir.exists(): files = [f for f in results_dir.rglob("*") if f.is_file()] result["output_files"] = len(files) else: result["output_files"] = 0 return result # ----------------------------- 内部实现 ----------------------------- def _ensure_image(self) -> None: if self._engine == "docker-sdk" and self._client is not None: try: self._client.images.get(self.image) return except Exception: logger.info(f"拉取镜像 {self.image} (platform={self.platform}) ...") self._client.images.pull(self.image, platform=self.platform) else: # CLI 模式:先尝试拉取 cli = "podman" if self._engine == "podman-cli" else "docker" try: subprocess.run( [cli, "pull", "--platform", self.platform, self.image], check=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True, ) except Exception as err: logger.warning(f"{cli} pull 失败: {err}") def _run_with_docker_sdk( self, command: List[str], volumes: Dict[str, Dict[str, str]], environment: Optional[Dict[str, str]], working_dir: str, name: Optional[str], detach: bool, remove: bool, ) -> Dict[str, Any]: assert self._client is not None try: # 注意:docker SDK 在 detach=False 时返回的是日志字节串,而非容器对象。 # 这里统一以 detach=True 运行,然后等待并抓取日志,最后按需删除容器。 container = self._client.containers.run( image=self.image, command=command, volumes=volumes, environment=environment or {}, working_dir=working_dir, platform=self.platform, name=name, user="0:0", # 以 root 运行,避免挂载目录权限问题 detach=True, remove=False, # 获取日志后再删 stdout=True, stderr=True, ) exit_info = container.wait() code = exit_info.get("StatusCode", 1) logs = container.logs().decode("utf-8", errors="ignore") if remove: try: container.remove() except Exception: pass return {"success": code == 0, "exit_code": code, "logs": logs, "status": "completed" if code == 0 else "failed"} except Exception as e: logger.error(f"docker SDK 运行失败: {e}", exc_info=True) return {"success": False, "error": str(e), "exit_code": -1, "status": "error"} def _run_with_cli( self, command: List[str], volumes: Dict[str, Dict[str, str]], environment: Optional[Dict[str, str]], working_dir: str, name: Optional[str], detach: bool, remove: bool, ) -> Dict[str, Any]: cli = "podman" if self._engine == "podman-cli" else "docker" cmd: List[str] = [cli, "run", "--rm" if remove and not detach else ""] cmd = [c for c in cmd if c] cmd += ["--platform", self.platform] # 以 root 运行,避免权限问题 cmd += ["--user", "0:0"] if name: cmd += ["--name", name] for host, spec in volumes.items(): bind = spec.get("bind") mode = spec.get("mode", "rw") # Podman(Linux) 下附加 :Z 处理 SELinux 标注;其他平台保持不变 mount_mode = mode if self._engine == "podman-cli" and os.name == "posix" and sys.platform.startswith("linux"): mount_mode = f"{mode},Z" cmd += ["-v", f"{host}:{bind}:{mount_mode}"] for k, v in (environment or {}).items(): cmd += ["-e", f"{k}={v}"] cmd += ["-w", working_dir, self.image] cmd += command try: if detach: # 后台运行:CLI 简化返回 p = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, text=True) return {"success": True, "status": "running", "pid": p.pid} else: proc = subprocess.run(cmd, capture_output=True, text=True) out = (proc.stdout or "") + (proc.stderr or "") return {"success": proc.returncode == 0, "exit_code": proc.returncode, "logs": out, "status": "completed" if proc.returncode == 0 else "failed"} except Exception as e: logger.error(f"{cli} 运行失败: {e}", exc_info=True) return {"success": False, "error": str(e), "exit_code": -1, "status": "error"}