feat(toolkit): ship macro_lactone_toolkit package

Unify macrolactone detection, numbering, fragmentation, and
splicing under the installable macro_lactone_toolkit package.

- replace legacy src.* modules with the new package layout
- add analyze/number/fragment CLI entrypoints and pixi tasks
- migrate tests, README, and scripts to the new package API
This commit is contained in:
2026-03-18 22:06:45 +08:00
parent a768d26e47
commit 5e7b236f31
45 changed files with 1302 additions and 6304 deletions

View File

@@ -0,0 +1,160 @@
from __future__ import annotations
import argparse
import json
from dataclasses import asdict
from pathlib import Path
import pandas as pd
from .analyzer import MacroLactoneAnalyzer
from .errors import MacrolactoneError
from .fragmenter import MacrolactoneFragmenter
def main() -> None:
    """Entry point for the ``macro-lactone-toolkit`` console script."""
    parser = build_parser()
    namespace = parser.parse_args()
    if hasattr(namespace, "func"):
        namespace.func(namespace)
    else:
        # No subcommand was selected; show usage and exit non-zero.
        parser.print_help()
        raise SystemExit(1)
def build_parser() -> argparse.ArgumentParser:
    """Build the top-level parser with analyze/number/fragment subcommands.

    Each subcommand stores its handler under ``func`` via ``set_defaults``,
    so ``main`` can dispatch without inspecting the command name.
    """
    root = argparse.ArgumentParser(prog="macro-lactone-toolkit")
    commands = root.add_subparsers(dest="command")

    analyze_cmd = commands.add_parser("analyze")
    _add_common_input_arguments(analyze_cmd)
    analyze_cmd.add_argument("--ring-size", type=int, default=None)
    analyze_cmd.set_defaults(func=run_analyze)

    # "number" takes a single SMILES only; no CSV/batch input.
    number_cmd = commands.add_parser("number")
    number_cmd.add_argument("--smiles", required=True)
    number_cmd.add_argument("--ring-size", type=int, default=None)
    number_cmd.set_defaults(func=run_number)

    fragment_cmd = commands.add_parser("fragment")
    _add_common_input_arguments(fragment_cmd)
    fragment_cmd.add_argument("--ring-size", type=int, default=None)
    fragment_cmd.add_argument("--parent-id", default=None)
    fragment_cmd.add_argument("--errors-output", default=None)
    fragment_cmd.set_defaults(func=run_fragment)
    return root
def run_analyze(args: argparse.Namespace) -> None:
    """Handle ``analyze``: one SMILES directly, or a CSV batch via --input."""
    analyzer = MacroLactoneAnalyzer()
    if args.smiles:
        # Single-molecule mode: emit one analysis payload and stop.
        _write_output(analyzer.analyze_molecule(args.smiles), args.output)
        return
    results = []
    for entry in _read_csv_rows(args.input, args.smiles_column, args.id_column):
        record = analyzer.analyze_molecule(entry["smiles"])
        # Tag each analysis with the originating row id for traceability.
        record["parent_id"] = entry["parent_id"]
        results.append(record)
    _write_output(results, args.output)
def run_number(args: argparse.Namespace) -> None:
    """Handle ``number``: print ring numbering for a single SMILES as JSON."""
    numbering = MacrolactoneFragmenter(ring_size=args.ring_size).number_molecule(args.smiles)
    # The number subcommand defines no --output flag, so always print to stdout.
    _write_json(numbering.to_dict(), None)
def run_fragment(args: argparse.Namespace) -> None:
    """Handle ``fragment``: one SMILES directly, or a CSV batch via --input.

    In batch mode, per-row fragmentation failures are collected instead of
    aborting the run.
    """
    fragmenter = MacrolactoneFragmenter(ring_size=args.ring_size)
    if args.smiles:
        single = fragmenter.fragment_molecule(args.smiles, parent_id=args.parent_id)
        _write_output(single.to_dict(), args.output)
        return

    fragment_rows: list[dict] = []
    error_rows: list[dict] = []
    for entry in _read_csv_rows(args.input, args.smiles_column, args.id_column):
        try:
            outcome = fragmenter.fragment_molecule(entry["smiles"], parent_id=entry["parent_id"])
        except MacrolactoneError as exc:
            # Record the failure and keep processing the remaining rows.
            error_rows.append(
                {
                    "parent_id": entry["parent_id"],
                    "smiles": entry["smiles"],
                    "error_type": type(exc).__name__,
                    "error_message": str(exc),
                }
            )
        else:
            # Flatten every fragment into a row carrying its parent's metadata.
            fragment_rows.extend(
                {
                    "parent_id": outcome.parent_id,
                    "ring_size": outcome.ring_size,
                    **piece.to_dict(),
                }
                for piece in outcome.fragments
            )
    if args.output:
        # NOTE(review): when --output is set but --errors-output is not, the
        # collected error rows are written nowhere — confirm this is intended.
        _write_output(fragment_rows, args.output)
    else:
        _write_json({"fragments": fragment_rows, "errors": error_rows}, None)
    if args.errors_output:
        _write_output(error_rows, args.errors_output)
def _add_common_input_arguments(parser: argparse.ArgumentParser) -> None:
    """Attach the input-selection and output flags shared by analyze/fragment.

    Exactly one of ``--smiles`` or ``--input`` must be supplied; the column
    and output flags apply to the CSV batch path.
    """
    source = parser.add_mutually_exclusive_group(required=True)
    for flag in ("--smiles", "--input"):
        source.add_argument(flag)
    parser.add_argument("--smiles-column", default="smiles")
    parser.add_argument("--id-column", default="id")
    parser.add_argument("--output", default=None)
def _read_csv_rows(input_path: str, smiles_column: str, id_column: str) -> list[dict]:
    """Load a CSV and normalise it to a list of ``{parent_id, smiles}`` dicts.

    When *id_column* is absent from the CSV, synthetic ids of the form
    ``row_<index>`` are generated from the row position instead.
    """
    dataframe = pd.read_csv(input_path)
    # Hoisted out of the loop: column membership does not change per row.
    has_id_column = id_column in dataframe.columns
    return [
        {
            "parent_id": str(row[id_column]) if has_id_column else f"row_{index}",
            "smiles": row[smiles_column],
        }
        for index, row in dataframe.iterrows()
    ]
def _write_output(payload: list[dict] | dict, output_path: str | None) -> None:
if output_path is None:
_write_json(payload, None)
return
path = Path(output_path)
if path.suffix.lower() == ".csv":
dataframe = pd.DataFrame(payload)
dataframe.to_csv(path, index=False)
return
_write_json(payload, path)
def _write_json(payload: list[dict] | dict, output_path: Path | None) -> None:
text = json.dumps(payload, indent=2, ensure_ascii=False)
if output_path is None:
print(text)
else:
output_path.write_text(text + "\n", encoding="utf-8")
# Allow running this module directly (e.g. ``python -m ... .cli``) in
# addition to the installed console-script entry point.
if __name__ == "__main__":
    main()