import json
import os
import typing as T

import matchms
import numpy as np
from massspecgym.data.transforms import SpecTransform, MolTransform

import mvp.data.datasets as jestr_datasets
from mvp.data.transforms import SpecBinner, SpecBinnerLog, SpecFormulaFeaturizer
from mvp.data.transforms import MolToGraph
from mvp.definitions import MSGYM_FORMULA_VECTOR_NORM


class Subformula_Loader:
    """Load per-spectrum subformula annotations from ``<dir_path>/<spec_id>.json``.

    The loading strategy is chosen once, at construction time, from
    ``spectra_view`` and exposed as ``self.load``:

    * ``'SpecFormula'``   -> :meth:`load_subformula_data`
    * ``'SpecFormulaMz'`` -> :meth:`load_subformula_dict`

    JSON layout follows the MIST subformula format:
    https://github.com/samgoldman97/mist/blob/main_v2/src/mist/utils/spectra_utils.py
    """

    def __init__(self, spectra_view, dir_path) -> None:
        """Bind ``self.load`` to the loader matching ``spectra_view``.

        Raises:
            ValueError: if ``spectra_view`` is neither ``'SpecFormula'`` nor
                ``'SpecFormulaMz'``.
        """
        self.dir_path = dir_path

        if spectra_view == 'SpecFormula':
            self.load = self.load_subformula_data
        elif spectra_view == "SpecFormulaMz":
            self.load = self.load_subformula_dict
        else:
            # ValueError is a subclass of Exception, so callers catching
            # Exception keep working.
            raise ValueError("Spectra view is not supported.")

    def __call__(self, ids):
        """Load every id in ``ids`` and return ``{id: data}``.

        Ids whose file could not be loaded (loader returned ``None``) are
        omitted from the result.
        """
        id_to_form_spec = {}
        for spec_id in ids:
            data = self.load(spec_id)
            if data is not None:
                id_to_form_spec[spec_id] = data
        return id_to_form_spec

    def _read_output_tbl(self, spec_id: str):
        """Read ``output_tbl`` of one spectrum file as three unsorted arrays.

        Returns:
            ``(mzs, formulas, intensities)`` built from the ``mz``,
            ``formula`` and ``ms2_inten`` columns, in file order.
        """
        file = os.path.join(self.dir_path, spec_id + ".json")
        with open(file) as f:
            table = json.load(f)['output_tbl']

        mzs = np.array(table['mz'])
        formulas = np.array(table['formula'])
        intensities = np.array(table['ms2_inten'])
        return mzs, formulas, intensities

    def load_subformula_data(self, spec_id: str):
        """Load one spectrum's subformula peaks, sorted by m/z.

        Returns:
            ``{'formulas', 'formula_mzs', 'formula_intensities'}`` with
            parallel arrays sorted by ascending m/z, or ``None`` if the file
            is missing, unreadable or malformed.
        """
        try:
            mzs, formulas, intensities = self._read_output_tbl(spec_id)

            # sort by mzs
            ind = mzs.argsort()
            return {
                'formulas': formulas[ind],
                'formula_mzs': mzs[ind],
                'formula_intensities': intensities[ind],
            }
        except Exception:
            # Deliberate best-effort: spectra without a usable subformula
            # file are skipped by the caller. Unlike a bare ``except`` this
            # lets KeyboardInterrupt / SystemExit propagate.
            return None

    def load_subformula_dict(self, spec_id: str):
        """Load one spectrum's subformula peaks with an m/z -> formula map.

        Returns:
            ``{'formulas', 'formula_mzs', 'formula_intensities'}`` where
            ``'formulas'`` is a dict mapping each m/z to its formula (built in
            file order, so for duplicate m/z values the last entry wins) and
            the other two are arrays sorted by ascending m/z; or ``None`` if
            the file is missing, unreadable or malformed.
        """
        try:
            mzs, formulas, intensities = self._read_output_tbl(spec_id)

            # Built before sorting so duplicate-m/z resolution follows file order.
            mz_to_formulas = dict(zip(mzs, formulas))

            ind = mzs.argsort()
            return {
                'formulas': mz_to_formulas,
                'formula_mzs': mzs[ind],
                'formula_intensities': intensities[ind],
            }
        except Exception:
            # Deliberate best-effort; see load_subformula_data.
            return None


def make_tmp_subformula_spectra(row):
    """Build a single-peak placeholder subformula spectrum from ``row``.

    The one peak uses ``row['formula']`` as its formula,
    ``float(row['precursor_mz'])`` as its m/z and intensity ``1.0``.
    """
    return {
        'formulas': [row['formula']],
        'formula_mzs': [float(row['precursor_mz'])],
        'formula_intensities': [1.0],
    }


def get_spec_featurizer(spectra_view: T.Union[str, list[str]],
                        params) -> T.Union[SpecTransform, T.Dict[str, SpecTransform]]:
    """Instantiate one spectrum featurizer per requested view.

    Args:
        spectra_view: a view name or list of view names. Supported names are
            ``"BinnedSpectra"``, ``"SpecBinnerLog"`` and ``"SpecFormula"``.
        params: mapping providing ``max_mz`` plus, depending on the view,
            ``bin_width`` (binned views) or ``element_list`` and
            ``add_intensities`` (``"SpecFormula"``).

    Returns:
        Dict mapping each view name to its featurizer instance (always a
        dict, even for a single view).

    Raises:
        KeyError: for a view with no registered featurizer, or a missing
            entry in ``params``.
    """
    featurizers = {
        "BinnedSpectra": SpecBinner,
        "SpecBinnerLog": SpecBinnerLog,
        "SpecFormula": SpecFormulaFeaturizer,
    }

    if isinstance(spectra_view, str):
        spectra_view = [spectra_view]

    spectra_featurizer = {}
    for view in spectra_view:
        featurizer_params = {'max_mz': params['max_mz']}
        if view in ("BinnedSpectra", "SpecBinnerLog"):
            featurizer_params['bin_width'] = params['bin_width']
        elif view == "SpecFormula":
            featurizer_params.update({
                'element_list': params['element_list'],
                'add_intensities': params['add_intensities'],
                'formula_normalize_vector': MSGYM_FORMULA_VECTOR_NORM,
            })

        spectra_featurizer[view] = featurizers[view](**featurizer_params)
    return spectra_featurizer


def get_mol_featurizer(molecule_view: T.Union[str, T.List[str]], params) -> MolTransform:
    """Instantiate the molecule featurizer(s) for the requested view(s).

    Args:
        molecule_view: a view name or list of view names; only ``'MolGraph'``
            is registered.
        params: mapping providing ``atom_feature``, ``bond_feature`` and
            ``element_list`` for ``'MolGraph'``.

    Returns:
        The featurizer instance itself when exactly one view is requested;
        otherwise a dict mapping view name to featurizer (empty for an empty
        list).

    Raises:
        KeyError: for a view with no registered featurizer, or a missing
            entry in ``params``.
    """
    featurizes = {'MolGraph': MolToGraph}

    if isinstance(molecule_view, str):
        molecule_view = [molecule_view]

    mol_featurizer = {}
    for view in molecule_view:
        featurizer_params = {}
        # Exact comparison: the former ``view in ('MolGraph')`` was a
        # substring test against a plain string, not tuple membership.
        if view == 'MolGraph':
            featurizer_params.update({
                'atom_feature': params['atom_feature'],
                'bond_feature': params['bond_feature'],
                'element_list': params['element_list'],
            })

        # A single requested view yields the bare featurizer, not a dict.
        if len(molecule_view) == 1:
            return featurizes[view](**featurizer_params)

        mol_featurizer[view] = featurizes[view](**featurizer_params)
    return mol_featurizer


def get_test_ms_dataset(spectra_view: T.Union[str, T.List[str]],
                        mol_view: T.Union[str, T.List[str]],
                        spectra_featurizer: SpecTransform,
                        mol_featurizer: MolTransform,
                        params,
                        external_test: bool = False,):
    """Build the ``ExpandedRetrievalDataset`` used for testing.

    Reads ``dataset_pth``, ``candidates_pth``, ``use_cons_spec``, ``pred_fp``
    and ``use_fp`` from ``params``, plus ``subformula_dir_pth``,
    ``cons_spec_dir_pth``, ``fp_size`` and ``fp_radius`` when the
    corresponding option is active. Subformula loading is enabled when any
    spectra or molecule view is ``"SpecFormula"`` or ``"SpecFormulaMz"``.
    """
    # Flatten both view arguments (each a str or a list of str) into one set.
    views = []
    for v in (spectra_view, mol_view):
        if isinstance(v, str):
            views.append(v)
        else:
            views.extend(v)
    views = frozenset(views)

    dataset_params = {
        'spectra_view': spectra_view,
        'pth': params['dataset_pth'],
        'spec_transform': spectra_featurizer,
        'mol_transform': mol_featurizer,
        "candidates_pth": params['candidates_pth'],
    }

    use_formulas = False
    if "SpecFormula" in views or "SpecFormulaMz" in views:
        dataset_params['subformula_dir_pth'] = params['subformula_dir_pth']
        use_formulas = True

    if params['use_cons_spec']:
        dataset_params['cons_spec_dir_pth'] = params['cons_spec_dir_pth']

    if params['pred_fp'] or params['use_fp']:
        # NOTE(review): fp_dir_pth is passed as an empty string here rather
        # than params['fp_dir_pth'] (as get_ms_dataset does) - confirm intent.
        dataset_params.update({
            'fp_dir_pth': '',
            'fp_size': params['fp_size'],
            'fp_radius': params['fp_radius'],
        })

    return jestr_datasets.ExpandedRetrievalDataset(
        use_formulas=use_formulas, external_test=external_test, **dataset_params)


def get_ms_dataset(spectra_view: str,
                   mol_view: str,
                   spectra_featurizer: SpecTransform,
                   mol_featurizer: MolTransform,
                   params):
    """Build the dataset selected by ``params`` and ``spectra_view``.

    Selection order: ``MassSpecDataset_Candidates`` when
    ``params['aug_cands']`` is truthy, else ``MassSpecDataset_PeakFormulas``
    when ``"SpecFormula"`` is found in ``spectra_view``, else
    ``JESTR1_MassSpecDataset``. ``mol_view`` is accepted but not used here.
    """
    # set up dataset_parameters
    dataset_params = {
        'pth': params['dataset_pth'],
        'spec_transform': spectra_featurizer,
        'mol_transform': mol_featurizer,
        'spectra_view': spectra_view,
    }

    # For a str this is a substring test, so "SpecFormulaMz" also matches;
    # for a list it is plain membership.
    use_formulas = "SpecFormula" in spectra_view
    if use_formulas:
        dataset_params['subformula_dir_pth'] = params['subformula_dir_pth']

    if params['pred_fp'] or params['use_fp']:
        dataset_params['fp_dir_pth'] = params['fp_dir_pth']

    if params['use_cons_spec']:
        dataset_params['cons_spec_dir_pth'] = params['cons_spec_dir_pth']

    # select dataset
    if params['aug_cands']:
        return jestr_datasets.MassSpecDataset_Candidates(**dataset_params)
    if use_formulas:
        return jestr_datasets.MassSpecDataset_PeakFormulas(**dataset_params)
    return jestr_datasets.JESTR1_MassSpecDataset(**dataset_params)


class PrepMatchMS:
    """Convert a row of spectrum data into a ``matchms.Spectrum``.

    The conversion is chosen once from ``spectra_view`` and exposed as
    ``self.prepare``:

    * ``'SpecFormula'``   -> :meth:`specFormula`
    * ``'SpecFormulaMz'`` -> :meth:`specFormulaMz`
    * ``'SpecBinnerLog'``, ``'BinnedSpectra'``, ``'SpecMzIntTokenizer'``
      -> :meth:`specMzInt`
    """

    def __init__(self, spectra_view) -> None:
        """Bind ``self.prepare`` to the converter matching ``spectra_view``.

        Raises:
            ValueError: if ``spectra_view`` is not one of the supported views.
        """
        if spectra_view == 'SpecFormula':
            self.prepare = self.specFormula
        elif spectra_view == "SpecFormulaMz":
            self.prepare = self.specFormulaMz
        elif spectra_view in ('SpecBinnerLog', 'BinnedSpectra', 'SpecMzIntTokenizer'):
            self.prepare = self.specMzInt
        else:
            # ValueError is a subclass of Exception, so callers catching
            # Exception keep working.
            raise ValueError("Spectra view is not supported.")

    def specFormulaMz(self, row):
        """Build a spectrum from comma-separated ``mzs`` / ``intensities`` strings.

        ``row['formulas']`` is stored in the metadata unchanged.
        """
        mzs = np.array([float(m) for m in row["mzs"].split(",")])
        intensities = np.array([float(i) for i in row["intensities"].split(",")])
        return matchms.Spectrum(
            mz=mzs,
            intensities=intensities,
            metadata={'precursor_mz': row['precursor_mz'],
                      'formulas': row['formulas']},
        )

    def specFormula(self, row):
        """Build a spectrum from the ``formula_mzs`` / ``formula_intensities`` columns.

        Formulas are stored in the metadata as a numpy array, together with
        ``precursor_mz`` and ``precursor_formula``.
        """
        return matchms.Spectrum(
            mz=np.array(row['formula_mzs']),
            intensities=np.array(row['formula_intensities']),
            metadata={'precursor_mz': row['precursor_mz'],
                      'formulas': np.array(row['formulas']),
                      'precursor_formula': row['precursor_formula']},
        )

    def specMzInt(self, row):
        """Build a spectrum directly from ``row['mzs']`` and ``row['intensities']``.

        The values are passed through without conversion.
        """
        # NOTE(review): presumably these are already numpy arrays at this
        # point - verify against the caller.
        return matchms.Spectrum(
            mz=row['mzs'],
            intensities=row['intensities'],
            metadata={'precursor_mz': row['precursor_mz']},
        )