feat(crispr): implement CRISPR-Cas detection and fusion analysis module
This commit is contained in:
166
crispr_cas/scripts/fusion_analysis.py
Normal file
166
crispr_cas/scripts/fusion_analysis.py
Normal file
@@ -0,0 +1,166 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
CRISPR-Toxin Fusion Analysis
|
||||
Analyzes associations between CRISPR spacers and toxin genes.
|
||||
"""
|
||||
|
||||
import argparse
|
||||
import json
|
||||
import logging
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Any
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(
|
||||
level=logging.INFO,
|
||||
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
|
||||
)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
def parse_args():
|
||||
parser = argparse.ArgumentParser(description="Analyze CRISPR-Toxin associations")
|
||||
parser.add_argument("--crispr-results", type=Path, required=True, help="CRISPR detection results (JSON)")
|
||||
parser.add_argument("--toxin-results", type=Path, required=True, help="Toxin detection results (JSON or TXT)")
|
||||
parser.add_argument("--genome", type=Path, required=True, help="Original genome file (.fna)")
|
||||
parser.add_argument("--output", "-o", type=Path, required=True, help="Output analysis JSON")
|
||||
parser.add_argument("--mock", action="store_true", help="Use mock analysis logic")
|
||||
return parser.parse_args()
|
||||
|
||||
def load_json(path: Path) -> Dict:
|
||||
with open(path) as f:
|
||||
return json.load(f)
|
||||
|
||||
def calculate_distance(range1: str, range2: str) -> int:
|
||||
"""
|
||||
Calculate distance between two genomic ranges.
|
||||
Format: 'contig:start-end'
|
||||
"""
|
||||
try:
|
||||
contig1, coords1 = range1.split(':')
|
||||
start1, end1 = map(int, coords1.split('-'))
|
||||
|
||||
contig2, coords2 = range2.split(':')
|
||||
start2, end2 = map(int, coords2.split('-'))
|
||||
|
||||
if contig1 != contig2:
|
||||
return -1 # Different contigs
|
||||
|
||||
# Check for overlap
|
||||
if max(start1, start2) <= min(end1, end2):
|
||||
return 0
|
||||
|
||||
# Calculate distance
|
||||
if start1 > end2:
|
||||
return start1 - end2
|
||||
else:
|
||||
return start2 - end1
|
||||
except Exception as e:
|
||||
logger.warning(f"Error calculating distance: {e}")
|
||||
return -1
|
||||
|
||||
def mock_blast_spacers(spacers: List[str], toxins: List[Dict]) -> List[Dict]:
|
||||
"""Mock BLAST spacers against toxins"""
|
||||
matches = []
|
||||
# Simulate a match if 'Cry' is in the spacer name (just for demo logic) or random
|
||||
# In reality, we'd blast sequences.
|
||||
|
||||
# Let's just create a fake match for the first spacer
|
||||
if spacers and toxins:
|
||||
matches.append({
|
||||
"spacer_seq": spacers[0],
|
||||
"target_toxin": toxins[0].get("name", "Unknown"),
|
||||
"identity": 98.5,
|
||||
"alignment_length": 32,
|
||||
"mismatches": 1
|
||||
})
|
||||
return matches
|
||||
|
||||
def perform_fusion_analysis(crispr_data: Dict, toxin_file: Path, mock: bool = False) -> Dict:
|
||||
"""
|
||||
Main analysis logic.
|
||||
1. Map CRISPR arrays
|
||||
2. Map Toxin genes
|
||||
3. Calculate distances
|
||||
4. Check for spacer matches
|
||||
"""
|
||||
|
||||
analysis_results = {
|
||||
"strain_id": crispr_data.get("strain_id"),
|
||||
"associations": [],
|
||||
"summary": {"proximal_pairs": 0, "spacer_matches": 0}
|
||||
}
|
||||
|
||||
# Extract arrays
|
||||
arrays = crispr_data.get("arrays", [])
|
||||
|
||||
# Mock Toxin Parsing (assuming simple list for now if not JSON)
|
||||
toxins = []
|
||||
if mock:
|
||||
toxins = [
|
||||
{"name": "Cry1Ac1", "position": "contig_1:10000-12000"},
|
||||
{"name": "Vip3Aa1", "position": "contig_2:60000-62000"}
|
||||
]
|
||||
else:
|
||||
# TODO: Implement real toxin file parsing (e.g. from All_Toxins.txt)
|
||||
logger.warning("Real toxin parsing not implemented yet, using empty list")
|
||||
|
||||
# Analyze Proximity
|
||||
for array in arrays:
|
||||
array_pos = f"{array.get('contig')}:{array.get('start')}-{array.get('end')}"
|
||||
|
||||
for toxin in toxins:
|
||||
dist = calculate_distance(array_pos, toxin["position"])
|
||||
|
||||
if dist != -1 and dist < 10000: # 10kb window
|
||||
association = {
|
||||
"type": "proximity",
|
||||
"array_id": array.get("id"),
|
||||
"toxin": toxin["name"],
|
||||
"distance": dist,
|
||||
"array_position": array_pos,
|
||||
"toxin_position": toxin["position"]
|
||||
}
|
||||
analysis_results["associations"].append(association)
|
||||
analysis_results["summary"]["proximal_pairs"] += 1
|
||||
|
||||
# Analyze Spacer Matches (Mock)
|
||||
all_spacers = []
|
||||
for array in arrays:
|
||||
for spacer in array.get("spacers", []):
|
||||
all_spacers.append(spacer.get("sequence"))
|
||||
|
||||
matches = mock_blast_spacers(all_spacers, toxins)
|
||||
for match in matches:
|
||||
analysis_results["associations"].append({
|
||||
"type": "spacer_match",
|
||||
**match
|
||||
})
|
||||
analysis_results["summary"]["spacer_matches"] += 1
|
||||
|
||||
return analysis_results
|
||||
|
||||
def main():
|
||||
args = parse_args()
|
||||
|
||||
if not args.crispr_results.exists():
|
||||
logger.error(f"CRISPR results file not found: {args.crispr_results}")
|
||||
sys.exit(1)
|
||||
|
||||
try:
|
||||
crispr_data = load_json(args.crispr_results)
|
||||
|
||||
results = perform_fusion_analysis(crispr_data, args.toxin_results, args.mock)
|
||||
|
||||
# Write results
|
||||
with open(args.output, 'w') as f:
|
||||
json.dump(results, f, indent=2)
|
||||
|
||||
logger.info(f"Fusion analysis complete. Results: {args.output}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error during fusion analysis: {e}")
|
||||
sys.exit(1)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user