Spaces:
Sleeping
Sleeping
| """ assign_subformulae.py | |
| Copied from https://github.com/samgoldman97/mist/blob/main_v2/src/mist/subformulae/assign_subformulae.py | |
| Given a set of spectra and candidates from a labels file, assign subformulae and save to JSON files. | |
| """ | |
| from pathlib import Path | |
| import argparse | |
| from functools import partial | |
| import numpy as np | |
| import pandas as pd | |
| import json | |
| import os | |
| from tqdm import tqdm | |
| import utils | |
| def get_args(): | |
| """get args""" | |
| parser = argparse.ArgumentParser() | |
| parser.add_argument( | |
| "--feature-id", | |
| default="ID", | |
| help="ID key in mgf input" | |
| ) | |
| parser.add_argument( | |
| "--spec-files", | |
| default="data/paired_spectra/canopus_train/spec_files/", | |
| help="Spec files; either MGF or directory.", | |
| ) | |
| parser.add_argument("--output-dir", default=None, | |
| help="Name of output dir.") | |
| parser.add_argument( | |
| "--labels-file", | |
| default="data/paired_spectra/canopus_train/labels.tsv", | |
| help="Labels file", | |
| ) | |
| parser.add_argument( | |
| "--debug", action="store_true", default=False, help="Debug flag." | |
| ) | |
| parser.add_argument( | |
| "--mass-diff-type", | |
| default="ppm", | |
| type=str, | |
| help="Type of mass difference - absolute differece (abs) or relative difference (ppm).", | |
| ) | |
| parser.add_argument( | |
| "--mass-diff-thresh", | |
| action="store", | |
| default=20, | |
| type=float, | |
| help="Threshold of mass difference.", | |
| ) | |
| parser.add_argument( | |
| "--inten-thresh", | |
| action="store", | |
| default=0.001, | |
| type=float, | |
| help="Threshold of MS2 subpeak intensity (normalized to 1).", | |
| ) | |
| parser.add_argument( | |
| "--max-formulae", | |
| action="store", | |
| default=50, | |
| type=int, | |
| help="Max number of peaks to keep", | |
| ) | |
| parser.add_argument( | |
| "--num-workers", action="store", default=32, type=int, help="num workers" | |
| ) | |
| return parser.parse_args() | |
| def process_spec_file(spec_name: str, spec_files: str, max_inten=0.001, max_peaks=60): | |
| """_summary_ | |
| Args: | |
| spec_name (str): _description_ | |
| spec_files (str): _description_ | |
| max_inten (float, optional): _description_. Defaults to 0.001. | |
| max_peaks (int, optional): _description_. Defaults to 60. | |
| Returns: | |
| _type_: _description_ | |
| """ | |
| spec_file = Path(spec_files) / f"{spec_name}.ms" | |
| meta, tuples = utils.parse_spectra(spec_file) | |
| spec = utils.process_spec_file(meta, tuples) | |
| return spec_name, spec | |
| def assign_subforms(spec_files, labels_file, | |
| mass_diff_thresh: int = 20, | |
| mass_diff_type: str = "ppm", | |
| inten_thresh: float = 0.001, | |
| output_dir=None, | |
| num_workers: int = 32, | |
| feature_id="ID", | |
| max_formulae: int = 50, | |
| debug=False): | |
| """_summary_ | |
| Args: | |
| spec_files (_type_): _description_ | |
| labels_file (_type_): _description_ | |
| mass_diff_thresh (int, optional): _description_. Defaults to 20. | |
| mass_diff_type (str, optional): _description_. Defaults to "ppm". | |
| inten_thresh (float, optional): _description_. Defaults to 0.001. | |
| output_dir (_type_, optional): _description_. Defaults to None. | |
| num_workers (int, optional): _description_. Defaults to 32. | |
| feature_id (str, optional): _description_. Defaults to "ID". | |
| max_formulae (int, optional): _description_. Defaults to 50. | |
| debug (bool, optional): _description_. Defaults to False. | |
| Raises: | |
| ValueError: _description_ | |
| """ | |
| spec_files = Path(spec_files) | |
| label_path = Path(labels_file) | |
| # Read in labels | |
| labels_df = pd.read_csv(label_path, sep="\t").astype(str) | |
| if spec_files.suffix == ".tsv": # YZC msgym-like data | |
| labels_df.rename(columns={'identifier': 'spec', | |
| 'adduct': 'ionization'}, inplace=True) | |
| if debug: | |
| labels_df = labels_df[:50] | |
| # Define output directory name | |
| output_dir = Path(output_dir) | |
| if output_dir is None: | |
| subform_dir = label_path.parent / "subformulae" | |
| output_dir_name = f"subform_{max_formulae}" | |
| output_dir = subform_dir / output_dir_name | |
| output_dir.mkdir(exist_ok=True, parents=True) | |
| if spec_files.suffix == ".mgf": | |
| # Input specs | |
| parsed_specs = utils.parse_spectra_mgf(spec_files) | |
| input_specs = [utils.process_spec_file(*i) for i in parsed_specs] | |
| spec_names = [i[0][feature_id] for i in parsed_specs] | |
| input_specs = list(zip(spec_names, input_specs)) | |
| elif spec_files.is_dir(): | |
| spec_fn_lst = labels_df["spec"].to_list() | |
| proc_spec_full = partial( | |
| process_spec_file, | |
| spec_files=spec_files, | |
| max_inten=inten_thresh, | |
| max_peaks=max_formulae, | |
| ) | |
| # input_specs = [proc_spec_full(i) for i in tqdm(spec_fn_lst)] | |
| input_specs = utils.chunked_parallel( | |
| spec_fn_lst, proc_spec_full, chunks=100, max_cpu=max(num_workers, 1) | |
| ) | |
| elif spec_files.suffix == '.tsv': | |
| parsed_specs = utils.parse_spectra_msgym(labels_df) | |
| input_specs = [utils.process_spec_file(*i) for i in parsed_specs] | |
| spec_names = [i[0][feature_id] for i in parsed_specs] | |
| input_specs = list(zip(spec_names, input_specs)) | |
| else: | |
| raise ValueError(f"Spec files arg {spec_files} is not a dir or mgf") | |
| # input_specs contains a list of tuples (spec, subpeak tuple array) | |
| input_specs_dict = {tup[0]: tup[1] for tup in input_specs} | |
| export_dicts, spec_names = [], [] | |
| for _, row in labels_df.iterrows(): | |
| spec = str(row["spec"]) | |
| new_entry = { | |
| "spec": input_specs_dict[spec], | |
| "form": row["formula"], | |
| "mass_diff_type": mass_diff_type, | |
| "spec_name": spec, | |
| "mass_diff_thresh": mass_diff_thresh, | |
| "ion_type": row["ionization"], | |
| } | |
| spec_names.append(spec) | |
| export_dicts.append(new_entry) | |
| # Build dicts | |
| print(f"There are {len(export_dicts)} spec-cand pairs this spec files") | |
| def export_wrapper(x): return utils.get_output_dict(**x) | |
| if debug: | |
| output_dict_lst = [export_wrapper(i) for i in export_dicts[:10]] | |
| else: | |
| output_dict_lst = utils.chunked_parallel( | |
| export_dicts, export_wrapper, chunks=100, max_cpu=max(num_workers, 1) | |
| ) | |
| assert len(export_dicts) == len(output_dict_lst) | |
| # Write all output jsons to files | |
| os.makedirs(output_dir, exist_ok=True) | |
| print(f"Writing output to {output_dir}") | |
| for output_dict, spec_name in tqdm(zip(output_dict_lst, spec_names)): | |
| with open(output_dir / f"{spec_name}.json", "w") as f: | |
| json.dump(output_dict, f, indent=4) | |
| f.close() | |
| if __name__ == "__main__": | |
| args = get_args() | |
| assign_subforms(spec_files=args.spec_files, | |
| labels_file=args.labels_file, | |
| mass_diff_thresh=args.mass_diff_thresh, | |
| mass_diff_type=args.mass_diff_type, | |
| inten_thresh=args.inten_thresh, | |
| output_dir=args.output_dir, | |
| num_workers=args.num_workers, | |
| feature_id=args.feature_id, | |
| max_formulae=args.max_formulae, | |
| debug=args.debug) |