from __future__ import annotations import json import sqlite3 import subprocess import sys from pathlib import Path import pandas as pd from macro_lactone_toolkit import MacrolactoneFragmenter from .helpers import build_ambiguous_smiles, build_macrolactone PROJECT_ROOT = Path(__file__).resolve().parents[1] ACTIVE_TEXT_ASSETS = [ PROJECT_ROOT / "scripts" / "README.md", PROJECT_ROOT / "docs" / "SUMMARY.md", PROJECT_ROOT / "docs" / "project-docs" / "QUICK_COMMANDS.md", PROJECT_ROOT / "notebooks" / "README_analyze_ring16.md", ] def run_script(script_name: str, *args: str) -> subprocess.CompletedProcess[str]: return subprocess.run( [sys.executable, str(PROJECT_ROOT / "scripts" / script_name), *args], capture_output=True, text=True, check=False, cwd=PROJECT_ROOT, ) def test_batch_process_script_writes_flat_outputs_and_summary(tmp_path): valid = build_macrolactone(14, {4: "methyl"}) ambiguous = build_ambiguous_smiles() input_path = tmp_path / "molecules.csv" output_path = tmp_path / "fragments.csv" errors_path = tmp_path / "errors.csv" summary_path = tmp_path / "summary.json" pd.DataFrame( [ {"id": "valid_1", "smiles": valid.smiles}, {"id": "ambiguous_1", "smiles": ambiguous}, ] ).to_csv(input_path, index=False) completed = run_script( "batch_process.py", "--input", str(input_path), "--output", str(output_path), "--errors-output", str(errors_path), "--summary-output", str(summary_path), ) assert completed.returncode == 0, completed.stderr assert output_path.exists() assert errors_path.exists() assert summary_path.exists() summary = json.loads(summary_path.read_text(encoding="utf-8")) assert summary["processed"] == 2 assert summary["successful"] == 1 assert summary["failed"] == 1 def test_analyze_fragments_script_generates_reports_and_plot(tmp_path): built = build_macrolactone(16, {5: "methyl", 7: "ethyl"}) result = MacrolactoneFragmenter().fragment_molecule(built.smiles, parent_id="analysis_1") fragments = pd.DataFrame( [ { "parent_id": result.parent_id, "parent_smiles": result.parent_smiles, "ring_size": result.ring_size, **fragment.to_dict(), } for fragment in result.fragments ] ) input_path = tmp_path / "fragments.csv" output_dir = tmp_path / "analysis" fragments.to_csv(input_path, index=False) completed = run_script( "analyze_fragments.py", "--input", str(input_path), "--output-dir", str(output_dir), ) assert completed.returncode == 0, completed.stderr assert (output_dir / "position_statistics.csv").exists() assert (output_dir / "fragment_property_summary.csv").exists() assert (output_dir / "position_frequencies.png").exists() assert (output_dir / "analysis_summary.txt").exists() def test_generate_sdf_and_statistics_script_generates_artifacts(tmp_path): built = build_macrolactone(16, {5: "methyl"}) result = MacrolactoneFragmenter().fragment_molecule(built.smiles, parent_id="sdf_1") fragments = pd.DataFrame( [ { "parent_id": result.parent_id, "parent_smiles": result.parent_smiles, "ring_size": result.ring_size, **fragment.to_dict(), } for fragment in result.fragments ] ) input_path = tmp_path / "fragments.csv" output_dir = tmp_path / "sdf_output" fragments.to_csv(input_path, index=False) completed = run_script( "generate_sdf_and_statistics.py", "--input", str(input_path), "--output-dir", str(output_dir), ) assert completed.returncode == 0, completed.stderr assert (output_dir / "cleavage_position_statistics.json").exists() assert (output_dir / "sdf" / "sdf_1_3d.sdf").exists() def test_analyze_validation_fragment_library_script_generates_reports(tmp_path): input_path = tmp_path / "fragment_library.csv" db_path = tmp_path / "fragments.db" output_dir = tmp_path / "fragment_library_analysis" pd.DataFrame( [ { "id": 1, "source_type": "validation_extract", "source_fragment_id": "ML16A_frag_0", "source_parent_ml_id": "ML16A", "source_parent_chembl_id": None, "cleavage_position": 3, "fragment_smiles_labeled": "[3*]C", "fragment_smiles_plain": "*C", "has_dummy_atom": True, "dummy_atom_count": 1, "splice_ready": True, "original_bond_type": "SINGLE", "created_at": "2026-03-19 00:00:00", }, { "id": 2, "source_type": "validation_extract", "source_fragment_id": "ML16A_frag_1", "source_parent_ml_id": "ML16A", "source_parent_chembl_id": None, "cleavage_position": 3, "fragment_smiles_labeled": "[3*]CC", "fragment_smiles_plain": "*CC", "has_dummy_atom": True, "dummy_atom_count": 1, "splice_ready": True, "original_bond_type": "SINGLE", "created_at": "2026-03-19 00:00:00", }, { "id": 3, "source_type": "validation_extract", "source_fragment_id": "ML16B_frag_0", "source_parent_ml_id": "ML16B", "source_parent_chembl_id": None, "cleavage_position": 4, "fragment_smiles_labeled": "[4*]O", "fragment_smiles_plain": "*O", "has_dummy_atom": True, "dummy_atom_count": 1, "splice_ready": True, "original_bond_type": "SINGLE", "created_at": "2026-03-19 00:00:00", }, { "id": 4, "source_type": "validation_extract", "source_fragment_id": "ML14A_frag_0", "source_parent_ml_id": "ML14A", "source_parent_chembl_id": None, "cleavage_position": 3, "fragment_smiles_labeled": "[3*]CCC", "fragment_smiles_plain": "*CCC", "has_dummy_atom": True, "dummy_atom_count": 1, "splice_ready": True, "original_bond_type": "SINGLE", "created_at": "2026-03-19 00:00:00", }, ] ).to_csv(input_path, index=False) with sqlite3.connect(db_path) as connection: connection.execute( """ CREATE TABLE parent_molecules ( id INTEGER PRIMARY KEY, ml_id TEXT NOT NULL, chembl_id TEXT, molecule_name TEXT, smiles TEXT NOT NULL, classification TEXT NOT NULL, ring_size INTEGER, primary_reason_code TEXT, primary_reason_message TEXT, processing_status TEXT NOT NULL, error_message TEXT, num_sidechains INTEGER, cleavage_positions TEXT, numbered_image_path TEXT, created_at TEXT NOT NULL, processed_at TEXT ) """ ) connection.executemany( """ INSERT INTO parent_molecules ( id, ml_id, chembl_id, molecule_name, smiles, classification, ring_size, primary_reason_code, primary_reason_message, processing_status, error_message, num_sidechains, cleavage_positions, numbered_image_path, created_at, processed_at ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) """, [ (1, "ML16A", None, None, "C1CCCCCCCCCCCCCC(=O)O1", "standard_macrolactone", 16, None, None, "success", None, 2, "[3]", None, "2026-03-19 00:00:00", None), (2, "ML16B", None, None, "C1CCCCCCCCCCCCCC(=O)O1", "standard_macrolactone", 16, None, None, "success", None, 1, "[4]", None, "2026-03-19 00:00:00", None), (3, "ML14A", None, None, "C1CCCCCCCCCCCC(=O)O1", "standard_macrolactone", 14, None, None, "success", None, 1, "[3]", None, "2026-03-19 00:00:00", None), ], ) connection.commit() completed = run_script( "analyze_validation_fragment_library.py", "--input", str(input_path), "--db", str(db_path), "--output-dir", str(output_dir), "--ring-size", "16", ) assert completed.returncode == 0, completed.stderr assert (output_dir / "fragment_atom_count_distribution.png").exists() assert (output_dir / "fragment_atom_count_summary.csv").exists() assert (output_dir / "fragment_atom_count_filter_candidates.csv").exists() assert (output_dir / "ring16_position_count_comparison.csv").exists() assert (output_dir / "ring16_position_count_comparison.png").exists() assert (output_dir / "ring16_position_atom_count_distribution.png").exists() assert (output_dir / "ring16_position_atom_count_boxplot_gt3.png").exists() assert (output_dir / "ring16_position_diversity.csv").exists() assert (output_dir / "ring16_position_diversity_gt3.csv").exists() assert (output_dir / "ring16_position_diversity_gt3.png").exists() assert (output_dir / "ring16_position_ring_sensitivity.csv").exists() assert (output_dir / "ring16_position_ring_sensitivity.png").exists() assert (output_dir / "ring16_medchem_hotspot_comparison.csv").exists() assert (output_dir / "ring16_medchem_hotspot_comparison.png").exists() assert (output_dir / "fragment_library_analysis_report.md").exists() assert (output_dir / "fragment_library_analysis_report_zh.md").exists() assert (output_dir / "analysis_summary.txt").exists() diversity = pd.read_csv(output_dir / "ring16_position_diversity.csv") assert set(diversity["cleavage_position"]) == {3, 4} assert set(diversity["total_fragments"]) == {1, 2} diversity_gt3 = pd.read_csv(output_dir / "ring16_position_diversity_gt3.csv") assert diversity_gt3.empty report_zh = (output_dir / "fragment_library_analysis_report_zh.md").read_text(encoding="utf-8") assert "桥连或双锚点侧链不会进入当前片段库" in report_zh assert "cyclic single-anchor side chains" in report_zh def test_active_text_assets_do_not_reference_legacy_api(): forbidden_patterns = [ "from src.", "import src.", "process_csv(", "batch_to_dataframe(", "visualize_molecule(", "save_to_json(", ] for path in ACTIVE_TEXT_ASSETS: text = path.read_text(encoding="utf-8") for pattern in forbidden_patterns: assert pattern not in text, f"{path} still contains legacy reference: {pattern}"