96 lines
3.4 KiB
Python
96 lines
3.4 KiB
Python
import pandas as pd
|
||
from pathlib import Path
|
||
import shutil
|
||
from rdkit import Chem
|
||
from concurrent.futures import ProcessPoolExecutor, as_completed
|
||
|
||
# Path configuration.
# SOURCE_DIR is where "<identifier>.sdf" files are looked up;
# DEST_DIR is where the copies are written.
SOURCE_DIR = Path(r"D:\inhibitor\data\COCUNUT\COCONUT_sdf")
DEST_DIR = Path(
    r"C:\Users\pylyz\Documents\project\unidock-mcp\scripts\data\molecules"
)

# Create the destination directory (and any missing parents) up front;
# this is a no-op when the directory already exists.
DEST_DIR.mkdir(exist_ok=True, parents=True)
|
||
|
||
def process_row(row):
    """Copy one molecule's SDF file into DEST_DIR and check RDKit can read it.

    Args:
        row: Mapping-like record (for example a pandas Series or a dict)
            providing the keys ``'identifier'`` and ``'canonical_smiles'``.

    Returns:
        A dict with the keys:
            identifier, smiles: the values taken from ``row``.
            src_exists: whether ``SOURCE_DIR / "<identifier>.sdf"`` existed.
            copied: whether the copy into DEST_DIR succeeded.
            rdkit_ok: whether RDKit yielded at least one molecule from the
                copied file.
            error: empty string on success, otherwise a short reason.

    Failures are reported through the returned dict rather than raised, so a
    single bad file does not abort a batch run.
    """
    identifier = row['identifier']
    smiles = row['canonical_smiles']
    sdf_filename = f"{identifier}.sdf"
    src_sdf = SOURCE_DIR / sdf_filename
    dst_sdf = DEST_DIR / sdf_filename

    # Probe the filesystem once so the reported flag and the branch taken
    # below can never disagree.
    src_exists = src_sdf.exists()

    result = {
        "identifier": identifier,
        "smiles": smiles,
        "src_exists": src_exists,
        "copied": False,
        "rdkit_ok": False,
        "error": "",
    }

    if not src_exists:
        result['error'] = "SDF文件不存在"
        return result

    # Deliberately broad: any copy failure is recorded for the final report
    # instead of propagating out of the worker.
    try:
        shutil.copy2(src_sdf, dst_sdf)
    except Exception as e:
        result['error'] = f"拷贝失败: {e}"
        return result
    result['copied'] = True

    # Validate the *copied* file. sanitize=False asks RDKit only to parse the
    # records, not to sanitize the molecules.
    try:
        suppl = Chem.SDMolSupplier(str(dst_sdf), sanitize=False)
        # One readable record is enough; stop at the first instead of
        # materialising every molecule in the file.
        readable = any(mol is not None for mol in suppl)
    except Exception as e:
        result['error'] = f"RDKit异常: {e}"
        return result

    if readable:
        result['rdkit_ok'] = True
    else:
        result['error'] = "RDKit无法读取SDF"
    return result
|
||
|
||
def main(csv_path="coconut_data_info.csv"):
    """Copy and validate the SDF file of every molecule listed in a CSV.

    Reads ``csv_path`` (all columns as strings), runs ``process_row`` on each
    record in a process pool, prints a summary plus per-category listings, and
    writes the SMILES of molecules without an SDF file to
    ``no_sdf_smiles.txt`` in the current working directory.

    Args:
        csv_path: Path of the CSV to read. It must contain the columns
            ``identifier`` and ``canonical_smiles``; a missing column raises
            ``KeyError`` before any work is dispatched.
    """
    df = pd.read_csv(csv_path, dtype=str)

    # Send the workers only the two fields process_row reads, as plain dicts.
    # This avoids building and pickling one pandas Series per row.
    records = df[['identifier', 'canonical_smiles']].to_dict('records')

    # map() with a chunksize batches many rows per inter-process message
    # instead of creating one future per row, and it yields results in CSV
    # order, so the report below is deterministic.
    with ProcessPoolExecutor() as executor:
        results = list(executor.map(process_row, records, chunksize=256))

    # Classify every result exactly once; the four categories are disjoint.
    copied_and_valid = []
    copied_but_invalid = []
    no_sdf = []
    failed_copy = []
    for r in results:
        if not r['src_exists']:
            no_sdf.append(r)
        elif not r['copied']:
            failed_copy.append(r)
        elif r['rdkit_ok']:
            copied_and_valid.append(r)
        else:
            copied_but_invalid.append(r)

    print("=" * 60)
    print(f"总分子数: {len(df)}")
    print(f"有SDF并成功拷贝且可被RDKit读取: {len(copied_and_valid)}")
    print(f"有SDF拷贝后不能被RDKit读取(坏文件): {len(copied_but_invalid)}")
    print(f"没有SDF文件: {len(no_sdf)}")
    print(f"SDF存在但拷贝失败: {len(failed_copy)}")
    print("=" * 60)

    # Detailed listings, one section per non-empty category.
    if copied_but_invalid:
        print("拷贝后坏SDF列表(identifier):")
        for r in copied_but_invalid:
            print(f" {r['identifier']} ({r['error']})")
        print("-" * 30)

    if no_sdf:
        print("没有SDF文件的canonical_smiles:")
        for r in no_sdf:
            print(f" {r['smiles']} ({r['identifier']})")
        # Also save the list as a tab-separated text file.
        with open("no_sdf_smiles.txt", "w", encoding="utf-8") as f:
            f.writelines(
                f"{r['smiles']}\t{r['identifier']}\n" for r in no_sdf
            )
        print("所有无SDF分子的smiles已保存到 no_sdf_smiles.txt")

    if failed_copy:
        print("拷贝失败的分子:")
        for r in failed_copy:
            print(f" {r['identifier']} ({r['error']})")
        print("-" * 30)
|
||
|
||
# Script entry point. Keeping the call behind this guard means main() runs
# only when the file is executed directly, not when the module is imported
# (which process-pool workers may do when they start up).
if __name__ == "__main__":
    main()
|