Spaces:
Sleeping
Sleeping
| """ run_magma.py | |
| Accept input processed spectra and make subformula peak assignments | |
| accordingly. | |
| """ | |
| import logging | |
| from pathlib import Path | |
| import numpy as np | |
| import pandas as pd | |
| import argparse | |
| import sys | |
| from multiprocessing import Pool | |
| from tqdm import tqdm | |
| from collections import defaultdict | |
| import json | |
| # add parent path | |
| import os | |
| sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '..'))) | |
| # Custom import | |
| from magma.fragmentation import FragmentEngine, ionmasses | |
| from magma import magma_utils | |
| from magma.fragmentation import ionmasses | |
# Configure the root logger once at import time: INFO and above, written to
# stdout with a "<timestamp> <level>: <message>" layout.
logging.basicConfig(
    handlers=[logging.StreamHandler(sys.stdout)],
    format="%(asctime)s %(levelname)s: %(message)s",
    level=logging.INFO,
)
# Settings handed to ``FragmentEngine`` when fragmenting a molecule.
# Keep the key order stable in case a consumer unpacks ``.values()``
# positionally.
FRAGMENT_ENGINE_PARAMS = dict(
    max_broken_bonds=3,
    max_water_losses=1,
    ionisation_mode=1,
    skip_fragmentation=0,
    molcharge=0,
)

# Settings for matching peaks to fragments:
#   lowest_penalty_filter - keep only the lowest-"score" fragments per peak
#   tolerance             - amount subtracted from / added to a fragment's
#                           mass bounds before comparing with a peak
PEAK_ASSIGNMENT_PARAMS = dict(
    lowest_penalty_filter=True,
    tolerance=1,
)
def get_args():
    """Parse the command-line options of this script.

    Returns:
        argparse.Namespace with ``data_pth`` (required), ``output_dir``
        (required) and ``workers`` (int, defaults to 30).
    """
    cli = argparse.ArgumentParser()
    cli.add_argument("--data_pth", required=True)
    cli.add_argument(
        "--output_dir",
        help="Output directory to save MAGMA files",
        required=True,
    )
    cli.add_argument(
        "--workers",
        type=int,
        default=30,
        action="store",
        help="Num workers",
    )
    parsed = cli.parse_args()
    return parsed
def get_matching_fragment(
    fragment_df, mass_comparison_vector, lowest_penalty_filter: bool
):
    """Return the index labels of the fragments that match one peak.

    Args:
        fragment_df: DataFrame of candidate fragments; a "score" column is
            read when ``lowest_penalty_filter`` is set.
        mass_comparison_vector: Boolean mask over the rows of
            ``fragment_df`` marking fragments whose mass range covers the
            peak mass.
        lowest_penalty_filter: When True and more than one fragment matches,
            keep only the fragments tied for the minimum "score".

    Returns:
        List of ``fragment_df`` index labels; empty if nothing matches.
    """
    # Restrict to the fragments flagged by the mask.
    candidates = fragment_df[mass_comparison_vector]
    n_candidates = len(candidates)
    if n_candidates == 0:
        return []

    # Among several candidates, optionally keep the lowest-penalty ones.
    if lowest_penalty_filter and n_candidates > 1:
        best_score = candidates["score"].min()
        is_best = candidates["score"] == best_score
        candidates = candidates[is_best]

    return list(candidates.index)
def get_fragment_mass_range(fragment_engine, fragment_df, tolerance):
    """Attach the matchable mass window of every fragment to ``fragment_df``.

    For each row of ``fragment_engine.fragment_masses_np`` the window is
    ``[first non-zero mass - tolerance, largest mass + tolerance]``; rows
    that sum to zero get the window ``[0, 0]``.

    Args:
        fragment_engine: Object exposing ``fragment_masses_np``, a 2-D array
            with one row per fragment (presumably one column per hydrogen
            count variant -- TODO confirm against FragmentEngine).
        fragment_df: DataFrame with one row per fragment, in the same order
            as ``fragment_masses_np``. It is modified in place.
        tolerance: Value subtracted from the lower and added to the upper
            bound of each window.

    Returns:
        ``fragment_df`` with the added "min_mass" and "max_mass" columns.
    """
    fragment_masses_np = fragment_engine.fragment_masses_np

    # Preallocating an (n, 2) array keeps the result two-dimensional even
    # when there are no fragments; building it from an empty list would give
    # a 1-D array and the column slicing below would raise IndexError.
    bounds = np.zeros((fragment_masses_np.shape[0], 2), dtype=float)
    for row_idx, fragment_masses in enumerate(fragment_masses_np):
        if np.sum(fragment_masses) == 0:
            # No usable mass for this fragment: leave the window at [0, 0].
            continue
        first_nonzero = np.nonzero(fragment_masses)[0][0]
        bounds[row_idx, 0] = fragment_masses[first_nonzero] - tolerance
        bounds[row_idx, 1] = np.max(fragment_masses) + tolerance

    fragment_df["min_mass"] = bounds[:, 0]
    fragment_df["max_mass"] = bounds[:, 1]
    return fragment_df
def run_magma_wrapper(args):
    """Single-argument adapter around ``run_magma`` for pool mapping.

    Args:
        args: Tuple of positional arguments for ``run_magma``; its last
            element is used as the output path.

    Returns:
        None when the output path already exists (the task is treated as
        already processed), otherwise whatever ``run_magma(*args)`` returns.
    """
    output_path = args[-1]
    already_processed = os.path.exists(output_path)
    return None if already_processed else run_magma(*args)
def run_magma(identifier, mzs, intensities, smiles, adduct, save_filename=''):
    """Run fragmentation and peak assignment for one spectrum, then save.

    Args:
        identifier: Spectrum identifier; only used in the error log message.
        mzs: Peak m/z values, forwarded to ``peak_fragment_assignment``.
        intensities: Peak intensities, forwarded likewise.
        smiles: SMILES string of the molecule to fragment.
        adduct: Adduct string, forwarded to ``peak_fragment_assignment``.
        save_filename: If non-empty, the assignments are written there as
            JSON and nothing is returned.

    Returns:
        The assignment dict when ``save_filename`` is empty; otherwise None.
        None is also returned when fragment generation fails.
    """
    # Step 1 - Load fragmentation engine and generate fragments.
    # The settings are passed by keyword so the call does not depend on the
    # insertion order of FRAGMENT_ENGINE_PARAMS.
    try:
        engine = FragmentEngine(smiles=smiles, **FRAGMENT_ENGINE_PARAMS)
        engine.generate_fragments()
    except Exception:
        # Best-effort batch processing: record the failure (with traceback)
        # through the logger and move on to the next spectrum.
        logging.exception("Error for spec %s", identifier)
        return None

    # Step 2 - Assign fragments to peaks
    assignment_dict = peak_fragment_assignment(
        engine,
        mzs,
        intensities,
        adduct,
    )

    # Step 3 - Save assignments
    if not save_filename:
        return assignment_dict

    # Write to a temporary sibling file and rename it into place, so an
    # interrupted run never leaves a truncated JSON file behind that a later
    # "output already exists" check would mistake for a finished result.
    tmp_filename = f"{save_filename}.tmp"
    with open(tmp_filename, 'w') as f:
        json.dump(assignment_dict, f)
    os.replace(tmp_filename, save_filename)
    return None
def peak_fragment_assignment(fragment_engine, mzs, intensities, adduct):
    """Match every peak of a spectrum against the engine's fragments.

    Args:
        fragment_engine: FragmentEngine providing ``fragment_info``,
            ``fragment_masses_np`` and ``get_fragment_info``.
        mzs: np array of mz values
        intensities: per-peak intensities, iterated in lockstep with ``mzs``
        adduct: str eg. [M+H]+ [M+Na]+

    Returns:
        defaultdict(list) holding four parallel per-peak lists under the
        keys 'mz', 'intensities', 'subformulas' and 'substructures'. The
        last two hold, per peak, the de-duplicated entries 2 and 3 of
        ``get_fragment_info`` for each matched fragment.
    """
    all_fragment_info = fragment_engine.fragment_info
    fragment_df = pd.DataFrame(
        all_fragment_info, columns=["id", "score", "bond_breaks"]
    )
    tolerance = PEAK_ASSIGNMENT_PARAMS['tolerance']
    fragment_df = get_fragment_mass_range(
        fragment_engine, fragment_df, tolerance=tolerance
    )
    lower_bounds = fragment_df["min_mass"].values
    upper_bounds = fragment_df["max_mass"].values

    # Shift the observed m/z values by the adduct's ion-mass entry.
    adduct = magma_utils.extract_adduct_ion(adduct)
    if adduct.startswith('+'):
        charge = 1
    else:
        charge = -1
    exact_masses = mzs + ionmasses[charge][adduct]

    # in_window[f, p] is True when peak p lies inside fragment f's window.
    at_or_above_min = exact_masses[None, :] >= lower_bounds[:, None]
    at_or_below_max = exact_masses[None, :] <= upper_bounds[:, None]
    in_window = at_or_above_min & at_or_below_max

    keep_lowest_penalty = PEAK_ASSIGNMENT_PARAMS['lowest_penalty_filter']
    assignments = defaultdict(list)  # {mz, intensities, subformulas, substructures}
    for peak_idx, (mz, intensity) in enumerate(zip(mzs, intensities)):
        hit_idxs = get_matching_fragment(
            fragment_df,
            in_window[:, peak_idx],
            lowest_penalty_filter=keep_lowest_penalty,
        )

        # Look each matched fragment up once, then de-duplicate per field.
        hit_infos = [
            fragment_engine.get_fragment_info(all_fragment_info[hit][0], 0)
            for hit in hit_idxs
        ]
        peak_subformulas = list({info[2] for info in hit_infos})
        peak_substructures = list({info[3] for info in hit_infos})

        assignments['mz'].append(mz)
        assignments['intensities'].append(intensity)
        assignments['subformulas'].append(peak_subformulas)
        assignments['substructures'].append(peak_substructures)
    return assignments
if __name__ == "__main__":
    # Script entry point: read a tab-separated table of spectra, run MAGMA on
    # every row in a worker pool and write one JSON file per spectrum.
    import time

    start_time = time.time()
    args = get_args()
    os.makedirs(args.output_dir, exist_ok=True)

    df = pd.read_csv(args.data_pth, sep='\t')
    # Format the identifier instead of concatenating it: pandas parses an
    # all-numeric identifier column as numbers, and ``x + '.json'`` would
    # then raise TypeError.
    df['save_filename'] = df['identifier'].apply(
        lambda x: os.path.join(args.output_dir, f"{x}.json")
    )
    # Peaks are stored as comma-separated strings; turn them into float arrays.
    df['mzs'] = df['mzs'].apply(
        lambda x: np.array([float(m) for m in x.split(',')])
    )
    df['intensities'] = df['intensities'].apply(
        lambda x: np.array([float(i) for i in x.split(',')])
    )

    # Column order must match the positional parameters of run_magma.
    df = df[['identifier', 'mzs', 'intensities', 'smiles', 'adduct', 'save_filename']]
    tasks = list(df.itertuples(index=False, name=None))

    with Pool(processes=args.workers) as pool:
        # Results are written to disk by the workers; the iterator is only
        # drained here to drive the progress bar.
        for _ in tqdm(pool.imap_unordered(run_magma_wrapper, tasks), total=len(tasks)):
            pass

    end_time = time.time()
    print(f"Program finished in: {end_time - start_time} seconds")