Unify macrolactone detection, numbering, fragmentation, and splicing under the installable macro_lactone_toolkit package.

- Replace legacy src.* modules with the new package layout.
- Add analyze/number/fragment CLI entrypoints and pixi tasks.
- Migrate tests, README, and scripts to the new package API.
161 lines
4.9 KiB
Python
161 lines
4.9 KiB
Python
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
from dataclasses import asdict
|
|
from pathlib import Path
|
|
|
|
import pandas as pd
|
|
|
|
from .analyzer import MacroLactoneAnalyzer
|
|
from .errors import MacrolactoneError
|
|
from .fragmenter import MacrolactoneFragmenter
|
|
|
|
|
|
def main() -> None:
    """Entry point: parse CLI arguments and dispatch to the chosen subcommand.

    Prints usage and exits with status 1 when no subcommand was selected.
    """
    cli = build_parser()
    namespace = cli.parse_args()
    handler = getattr(namespace, "func", None)
    if handler is None:
        # No subcommand given: argparse leaves "func" unset.
        cli.print_help()
        raise SystemExit(1)
    handler(namespace)
|
|
|
|
|
|
def build_parser() -> argparse.ArgumentParser:
    """Build the top-level parser with analyze/number/fragment subcommands.

    Each subparser stores its handler in the "func" default so that main()
    can dispatch without inspecting the command name.
    """
    root = argparse.ArgumentParser(prog="macro-lactone-toolkit")
    commands = root.add_subparsers(dest="command")

    analyze_cmd = commands.add_parser("analyze")
    _add_common_input_arguments(analyze_cmd)
    analyze_cmd.add_argument("--ring-size", type=int, default=None)
    analyze_cmd.set_defaults(func=run_analyze)

    number_cmd = commands.add_parser("number")
    number_cmd.add_argument("--smiles", required=True)
    number_cmd.add_argument("--ring-size", type=int, default=None)
    number_cmd.set_defaults(func=run_number)

    fragment_cmd = commands.add_parser("fragment")
    _add_common_input_arguments(fragment_cmd)
    fragment_cmd.add_argument("--ring-size", type=int, default=None)
    fragment_cmd.add_argument("--parent-id", default=None)
    fragment_cmd.add_argument("--errors-output", default=None)
    fragment_cmd.set_defaults(func=run_fragment)

    return root
|
|
|
|
|
|
def run_analyze(args: argparse.Namespace) -> None:
    """Handle the ``analyze`` subcommand for a single SMILES or a CSV batch.

    Single-molecule mode (--smiles) writes one analysis payload; batch mode
    (--input) writes a list of payloads, each tagged with its parent_id.
    """
    analyzer = MacroLactoneAnalyzer()

    if args.smiles:
        # Single-molecule mode: analyze and emit immediately.
        _write_output(analyzer.analyze_molecule(args.smiles), args.output)
        return

    results = []
    for record in _read_csv_rows(args.input, args.smiles_column, args.id_column):
        report = analyzer.analyze_molecule(record["smiles"])
        report["parent_id"] = record["parent_id"]
        results.append(report)
    _write_output(results, args.output)
|
|
|
|
|
|
def run_number(args: argparse.Namespace) -> None:
    """Handle the ``number`` subcommand: number one molecule and emit the result.

    Args:
        args: Parsed CLI namespace with ``smiles`` and ``ring_size``.
    """
    fragmenter = MacrolactoneFragmenter(ring_size=args.ring_size)
    payload = fragmenter.number_molecule(args.smiles).to_dict()
    # Route through _write_output for consistency with the other subcommands.
    # The "number" subparser defines no --output today, so getattr falls back
    # to None and this still prints JSON to stdout — but the command gains
    # file output for free if the flag is ever added.
    _write_output(payload, getattr(args, "output", None))
|
|
|
|
|
|
def run_fragment(args: argparse.Namespace) -> None:
    """Handle the ``fragment`` subcommand for a single SMILES or a CSV batch.

    In batch mode, molecules that raise MacrolactoneError are collected into
    an error table instead of aborting the run; --errors-output writes that
    table to its own file.
    """
    fragmenter = MacrolactoneFragmenter(ring_size=args.ring_size)

    if args.smiles:
        # Single-molecule mode: one result, emitted directly.
        single = fragmenter.fragment_molecule(args.smiles, parent_id=args.parent_id)
        _write_output(single.to_dict(), args.output)
        return

    fragment_rows: list[dict] = []
    error_rows: list[dict] = []

    for record in _read_csv_rows(args.input, args.smiles_column, args.id_column):
        try:
            result = fragmenter.fragment_molecule(
                record["smiles"], parent_id=record["parent_id"]
            )
        except MacrolactoneError as exc:
            # Record the failure and keep processing the remaining rows.
            error_rows.append(
                {
                    "parent_id": record["parent_id"],
                    "smiles": record["smiles"],
                    "error_type": type(exc).__name__,
                    "error_message": str(exc),
                }
            )
        else:
            # Flatten each fragment into a row tagged with its parent metadata.
            fragment_rows.extend(
                {
                    "parent_id": result.parent_id,
                    "ring_size": result.ring_size,
                    **piece.to_dict(),
                }
                for piece in result.fragments
            )

    if args.output:
        _write_output(fragment_rows, args.output)
    else:
        # Without --output, bundle fragments and errors into one JSON document
        # on stdout.
        _write_json({"fragments": fragment_rows, "errors": error_rows}, None)

    if args.errors_output:
        _write_output(error_rows, args.errors_output)
|
|
|
|
|
|
def _add_common_input_arguments(parser: argparse.ArgumentParser) -> None:
    """Attach the input-source and I/O options shared by analyze/fragment.

    Exactly one of --smiles (single molecule) or --input (CSV batch) must be
    supplied; column names and the output path have sensible defaults.
    """
    source = parser.add_mutually_exclusive_group(required=True)
    source.add_argument("--smiles")
    source.add_argument("--input")
    for flag, default in (
        ("--smiles-column", "smiles"),
        ("--id-column", "id"),
        ("--output", None),
    ):
        parser.add_argument(flag, default=default)
|
|
|
|
|
|
def _read_csv_rows(input_path: str, smiles_column: str, id_column: str) -> list[dict]:
    """Load (parent_id, smiles) records from a CSV file.

    Args:
        input_path: Path of the CSV file to read.
        smiles_column: Name of the column holding SMILES strings.
        id_column: Name of the column holding molecule identifiers; when this
            column is absent, synthetic ids of the form ``row_<index>`` are used.

    Returns:
        One dict per CSV row with string ``"parent_id"`` and ``"smiles"`` keys.
    """
    dataframe = pd.read_csv(input_path)
    # Hoisted out of the loop: column presence is invariant across rows.
    has_id_column = id_column in dataframe.columns
    rows = []
    for index, row in dataframe.iterrows():
        parent_id = row[id_column] if has_id_column else f"row_{index}"
        rows.append(
            {
                "parent_id": str(parent_id),
                "smiles": row[smiles_column],
            }
        )
    return rows
|
|
|
|
|
|
def _write_output(payload: list[dict] | dict, output_path: str | None) -> None:
|
|
if output_path is None:
|
|
_write_json(payload, None)
|
|
return
|
|
|
|
path = Path(output_path)
|
|
if path.suffix.lower() == ".csv":
|
|
dataframe = pd.DataFrame(payload)
|
|
dataframe.to_csv(path, index=False)
|
|
return
|
|
|
|
_write_json(payload, path)
|
|
|
|
|
|
def _write_json(payload: list[dict] | dict, output_path: Path | None) -> None:
|
|
text = json.dumps(payload, indent=2, ensure_ascii=False)
|
|
if output_path is None:
|
|
print(text)
|
|
else:
|
|
output_path.write_text(text + "\n", encoding="utf-8")
|
|
|
|
|
|
# Support direct execution (e.g. "python -m <package>.cli") in addition to the
# installed console-script entry point.
if __name__ == "__main__":
    main()
|