Spaces:
Sleeping
Sleeping
File size: 7,457 Bytes
c65d76d |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 |
""" assign_subformulae.py
Copied from https://github.com/samgoldman97/mist/blob/main_v2/src/mist/subformulae/assign_subformulae.py
Given a set of spectra and candidates from a labels file, assign subformulae and save to JSON files.
"""
from pathlib import Path
import argparse
from functools import partial
import numpy as np
import pandas as pd
import json
import os
from tqdm import tqdm
import utils
def get_args():
"""get args"""
parser = argparse.ArgumentParser()
parser.add_argument(
"--feature-id",
default="ID",
help="ID key in mgf input"
)
parser.add_argument(
"--spec-files",
default="data/paired_spectra/canopus_train/spec_files/",
help="Spec files; either MGF or directory.",
)
parser.add_argument("--output-dir", default=None,
help="Name of output dir.")
parser.add_argument(
"--labels-file",
default="data/paired_spectra/canopus_train/labels.tsv",
help="Labels file",
)
parser.add_argument(
"--debug", action="store_true", default=False, help="Debug flag."
)
parser.add_argument(
"--mass-diff-type",
default="ppm",
type=str,
help="Type of mass difference - absolute differece (abs) or relative difference (ppm).",
)
parser.add_argument(
"--mass-diff-thresh",
action="store",
default=20,
type=float,
help="Threshold of mass difference.",
)
parser.add_argument(
"--inten-thresh",
action="store",
default=0.001,
type=float,
help="Threshold of MS2 subpeak intensity (normalized to 1).",
)
parser.add_argument(
"--max-formulae",
action="store",
default=50,
type=int,
help="Max number of peaks to keep",
)
parser.add_argument(
"--num-workers", action="store", default=32, type=int, help="num workers"
)
return parser.parse_args()
def process_spec_file(spec_name: str, spec_files: str, max_inten=0.001, max_peaks=60):
"""_summary_
Args:
spec_name (str): _description_
spec_files (str): _description_
max_inten (float, optional): _description_. Defaults to 0.001.
max_peaks (int, optional): _description_. Defaults to 60.
Returns:
_type_: _description_
"""
spec_file = Path(spec_files) / f"{spec_name}.ms"
meta, tuples = utils.parse_spectra(spec_file)
spec = utils.process_spec_file(meta, tuples)
return spec_name, spec
def assign_subforms(spec_files, labels_file,
mass_diff_thresh: int = 20,
mass_diff_type: str = "ppm",
inten_thresh: float = 0.001,
output_dir=None,
num_workers: int = 32,
feature_id="ID",
max_formulae: int = 50,
debug=False):
"""_summary_
Args:
spec_files (_type_): _description_
labels_file (_type_): _description_
mass_diff_thresh (int, optional): _description_. Defaults to 20.
mass_diff_type (str, optional): _description_. Defaults to "ppm".
inten_thresh (float, optional): _description_. Defaults to 0.001.
output_dir (_type_, optional): _description_. Defaults to None.
num_workers (int, optional): _description_. Defaults to 32.
feature_id (str, optional): _description_. Defaults to "ID".
max_formulae (int, optional): _description_. Defaults to 50.
debug (bool, optional): _description_. Defaults to False.
Raises:
ValueError: _description_
"""
spec_files = Path(spec_files)
label_path = Path(labels_file)
# Read in labels
labels_df = pd.read_csv(label_path, sep="\t").astype(str)
if spec_files.suffix == ".tsv": # YZC msgym-like data
labels_df.rename(columns={'identifier': 'spec',
'adduct': 'ionization'}, inplace=True)
if debug:
labels_df = labels_df[:50]
# Define output directory name
output_dir = Path(output_dir)
if output_dir is None:
subform_dir = label_path.parent / "subformulae"
output_dir_name = f"subform_{max_formulae}"
output_dir = subform_dir / output_dir_name
output_dir.mkdir(exist_ok=True, parents=True)
if spec_files.suffix == ".mgf":
# Input specs
parsed_specs = utils.parse_spectra_mgf(spec_files)
input_specs = [utils.process_spec_file(*i) for i in parsed_specs]
spec_names = [i[0][feature_id] for i in parsed_specs]
input_specs = list(zip(spec_names, input_specs))
elif spec_files.is_dir():
spec_fn_lst = labels_df["spec"].to_list()
proc_spec_full = partial(
process_spec_file,
spec_files=spec_files,
max_inten=inten_thresh,
max_peaks=max_formulae,
)
# input_specs = [proc_spec_full(i) for i in tqdm(spec_fn_lst)]
input_specs = utils.chunked_parallel(
spec_fn_lst, proc_spec_full, chunks=100, max_cpu=max(num_workers, 1)
)
elif spec_files.suffix == '.tsv':
parsed_specs = utils.parse_spectra_msgym(labels_df)
input_specs = [utils.process_spec_file(*i) for i in parsed_specs]
spec_names = [i[0][feature_id] for i in parsed_specs]
input_specs = list(zip(spec_names, input_specs))
else:
raise ValueError(f"Spec files arg {spec_files} is not a dir or mgf")
# input_specs contains a list of tuples (spec, subpeak tuple array)
input_specs_dict = {tup[0]: tup[1] for tup in input_specs}
export_dicts, spec_names = [], []
for _, row in labels_df.iterrows():
spec = str(row["spec"])
new_entry = {
"spec": input_specs_dict[spec],
"form": row["formula"],
"mass_diff_type": mass_diff_type,
"spec_name": spec,
"mass_diff_thresh": mass_diff_thresh,
"ion_type": row["ionization"],
}
spec_names.append(spec)
export_dicts.append(new_entry)
# Build dicts
print(f"There are {len(export_dicts)} spec-cand pairs this spec files")
def export_wrapper(x): return utils.get_output_dict(**x)
if debug:
output_dict_lst = [export_wrapper(i) for i in export_dicts[:10]]
else:
output_dict_lst = utils.chunked_parallel(
export_dicts, export_wrapper, chunks=100, max_cpu=max(num_workers, 1)
)
assert len(export_dicts) == len(output_dict_lst)
# Write all output jsons to files
os.makedirs(output_dir, exist_ok=True)
print(f"Writing output to {output_dir}")
for output_dict, spec_name in tqdm(zip(output_dict_lst, spec_names)):
with open(output_dir / f"{spec_name}.json", "w") as f:
json.dump(output_dict, f, indent=4)
f.close()
if __name__ == "__main__":
args = get_args()
assign_subforms(spec_files=args.spec_files,
labels_file=args.labels_file,
mass_diff_thresh=args.mass_diff_thresh,
mass_diff_type=args.mass_diff_type,
inten_thresh=args.inten_thresh,
output_dir=args.output_dir,
num_workers=args.num_workers,
feature_id=args.feature_id,
max_formulae=args.max_formulae,
debug=args.debug) |