MVP / mvp /subformula_assign /assign_subformulae.py
yzhouchen001's picture
main scripts
c65d76d
""" assign_subformulae.py
Copied from https://github.com/samgoldman97/mist/blob/main_v2/src/mist/subformulae/assign_subformulae.py
Given a set of spectra and candidates from a labels file, assign subformulae and save to JSON files.
"""
from pathlib import Path
import argparse
from functools import partial
import numpy as np
import pandas as pd
import json
import os
from tqdm import tqdm
import utils
def get_args():
"""get args"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--feature-id",
default="ID",
help="ID key in mgf input"
)
parser.add_argument(
"--spec-files",
default="data/paired_spectra/canopus_train/spec_files/",
help="Spec files; either MGF or directory.",
)
parser.add_argument("--output-dir", default=None,
help="Name of output dir.")
parser.add_argument(
"--labels-file",
default="data/paired_spectra/canopus_train/labels.tsv",
help="Labels file",
)
parser.add_argument(
"--debug", action="store_true", default=False, help="Debug flag."
)
parser.add_argument(
"--mass-diff-type",
default="ppm",
type=str,
help="Type of mass difference - absolute differece (abs) or relative difference (ppm).",
)
parser.add_argument(
"--mass-diff-thresh",
action="store",
default=20,
type=float,
help="Threshold of mass difference.",
)
parser.add_argument(
"--inten-thresh",
action="store",
default=0.001,
type=float,
help="Threshold of MS2 subpeak intensity (normalized to 1).",
)
parser.add_argument(
"--max-formulae",
action="store",
default=50,
type=int,
help="Max number of peaks to keep",
)
parser.add_argument(
"--num-workers", action="store", default=32, type=int, help="num workers"
)
return parser.parse_args()
def process_spec_file(spec_name: str, spec_files: str, max_inten=0.001, max_peaks=60):
"""_summary_
Args:
spec_name (str): _description_
spec_files (str): _description_
max_inten (float, optional): _description_. Defaults to 0.001.
max_peaks (int, optional): _description_. Defaults to 60.
Returns:
_type_: _description_
"""
spec_file = Path(spec_files) / f"{spec_name}.ms"
meta, tuples = utils.parse_spectra(spec_file)
spec = utils.process_spec_file(meta, tuples)
return spec_name, spec
def assign_subforms(spec_files, labels_file,
mass_diff_thresh: int = 20,
mass_diff_type: str = "ppm",
inten_thresh: float = 0.001,
output_dir=None,
num_workers: int = 32,
feature_id="ID",
max_formulae: int = 50,
debug=False):
"""_summary_
Args:
spec_files (_type_): _description_
labels_file (_type_): _description_
mass_diff_thresh (int, optional): _description_. Defaults to 20.
mass_diff_type (str, optional): _description_. Defaults to "ppm".
inten_thresh (float, optional): _description_. Defaults to 0.001.
output_dir (_type_, optional): _description_. Defaults to None.
num_workers (int, optional): _description_. Defaults to 32.
feature_id (str, optional): _description_. Defaults to "ID".
max_formulae (int, optional): _description_. Defaults to 50.
debug (bool, optional): _description_. Defaults to False.
Raises:
ValueError: _description_
"""
spec_files = Path(spec_files)
label_path = Path(labels_file)
# Read in labels
labels_df = pd.read_csv(label_path, sep="\t").astype(str)
if spec_files.suffix == ".tsv": # YZC msgym-like data
labels_df.rename(columns={'identifier': 'spec',
'adduct': 'ionization'}, inplace=True)
if debug:
labels_df = labels_df[:50]
# Define output directory name
output_dir = Path(output_dir)
if output_dir is None:
subform_dir = label_path.parent / "subformulae"
output_dir_name = f"subform_{max_formulae}"
output_dir = subform_dir / output_dir_name
output_dir.mkdir(exist_ok=True, parents=True)
if spec_files.suffix == ".mgf":
# Input specs
parsed_specs = utils.parse_spectra_mgf(spec_files)
input_specs = [utils.process_spec_file(*i) for i in parsed_specs]
spec_names = [i[0][feature_id] for i in parsed_specs]
input_specs = list(zip(spec_names, input_specs))
elif spec_files.is_dir():
spec_fn_lst = labels_df["spec"].to_list()
proc_spec_full = partial(
process_spec_file,
spec_files=spec_files,
max_inten=inten_thresh,
max_peaks=max_formulae,
)
# input_specs = [proc_spec_full(i) for i in tqdm(spec_fn_lst)]
input_specs = utils.chunked_parallel(
spec_fn_lst, proc_spec_full, chunks=100, max_cpu=max(num_workers, 1)
)
elif spec_files.suffix == '.tsv':
parsed_specs = utils.parse_spectra_msgym(labels_df)
input_specs = [utils.process_spec_file(*i) for i in parsed_specs]
spec_names = [i[0][feature_id] for i in parsed_specs]
input_specs = list(zip(spec_names, input_specs))
else:
raise ValueError(f"Spec files arg {spec_files} is not a dir or mgf")
# input_specs contains a list of tuples (spec, subpeak tuple array)
input_specs_dict = {tup[0]: tup[1] for tup in input_specs}
export_dicts, spec_names = [], []
for _, row in labels_df.iterrows():
spec = str(row["spec"])
new_entry = {
"spec": input_specs_dict[spec],
"form": row["formula"],
"mass_diff_type": mass_diff_type,
"spec_name": spec,
"mass_diff_thresh": mass_diff_thresh,
"ion_type": row["ionization"],
}
spec_names.append(spec)
export_dicts.append(new_entry)
# Build dicts
print(f"There are {len(export_dicts)} spec-cand pairs this spec files")
def export_wrapper(x): return utils.get_output_dict(**x)
if debug:
output_dict_lst = [export_wrapper(i) for i in export_dicts[:10]]
else:
output_dict_lst = utils.chunked_parallel(
export_dicts, export_wrapper, chunks=100, max_cpu=max(num_workers, 1)
)
assert len(export_dicts) == len(output_dict_lst)
# Write all output jsons to files
os.makedirs(output_dir, exist_ok=True)
print(f"Writing output to {output_dir}")
for output_dict, spec_name in tqdm(zip(output_dict_lst, spec_names)):
with open(output_dir / f"{spec_name}.json", "w") as f:
json.dump(output_dict, f, indent=4)
f.close()
if __name__ == "__main__":
args = get_args()
assign_subforms(spec_files=args.spec_files,
labels_file=args.labels_file,
mass_diff_thresh=args.mass_diff_thresh,
mass_diff_type=args.mass_diff_type,
inten_thresh=args.inten_thresh,
output_dir=args.output_dir,
num_workers=args.num_workers,
feature_id=args.feature_id,
max_formulae=args.max_formulae,
debug=args.debug)