Spaces:
Sleeping
Sleeping
import os
from multiprocessing import Pool
from pathlib import Path

import numpy as np
import pandas as pd
import tqdm

from MassSpecGym.massspecgym.utils import MyopicMCES
class Compute_Myopic_MCES:
    """Namespace for scoring (target, candidate) pairs with ``MyopicMCES``.

    The functions are meant to be reached through the class itself, e.g.
    ``Compute_Myopic_MCES.compute_mces_parallel(pairs)``. They are declared
    as static methods so they also work when accessed through an instance.
    """

    # One MyopicMCES callable, created when the class body is executed and
    # shared by every call made through this class.
    mces_compute = MyopicMCES()

    @staticmethod
    def compute_mces(tar_cand):
        """Score a single ``(target, candidate)`` pair.

        Args:
            tar_cand: Two-element sequence ``(target, candidate)``.

        Returns:
            ``(tar_cand, dist)`` where ``dist`` is whatever the shared
            ``MyopicMCES`` callable returns for the pair. The input pair is
            echoed back so results can be matched to their inputs.
        """
        target, cand = tar_cand
        dist = Compute_Myopic_MCES.mces_compute(target, cand)
        return (tar_cand, dist)

    @staticmethod
    def compute_mces_parallel(target_cand_list, n_processes=25):
        """Score many pairs in a process pool, showing a tqdm progress bar.

        Args:
            target_cand_list: Sized collection of ``(target, candidate)``
                pairs; ``len()`` is used for the progress-bar total.
            n_processes: Number of worker processes in the pool.

        Returns:
            List of ``(tar_cand, dist)`` tuples in the same order as
            ``target_cand_list`` (``Pool.imap`` preserves input order).
        """
        with Pool(processes=n_processes) as pool:
            results = list(
                tqdm.tqdm(
                    pool.imap(Compute_Myopic_MCES.compute_mces, target_cand_list),
                    total=len(target_cand_list),
                )
            )
        return results
class Compute_Myopic_MCES_timeout:
    """Namespace for scoring (target, candidate) pairs with a wait limit.

    Same idea as a plain pool map over ``MyopicMCES``, except that each
    result is collected with ``AsyncResult.get(timeout=...)`` so one slow or
    failing pair yields a placeholder instead of blocking or aborting the
    whole run. The functions are static methods and use this class's own
    ``mces_compute`` instance, so the class does not depend on any sibling.
    """

    # One MyopicMCES callable, created when the class body is executed.
    mces_compute = MyopicMCES()

    @staticmethod
    def compute_mces(tar_cand):
        """Score a single ``(target, candidate)`` pair.

        Args:
            tar_cand: Two-element sequence ``(target, candidate)``.

        Returns:
            ``(tar_cand, dist)`` where ``dist`` is whatever the shared
            ``MyopicMCES`` callable returns for the pair.
        """
        target, cand = tar_cand
        dist = Compute_Myopic_MCES_timeout.mces_compute(target, cand)
        return (tar_cand, dist)

    @staticmethod
    def compute_mces_parallel(target_cand_list, n_processes=35, timeout=60):
        """Score many pairs in a process pool, waiting a bounded time per result.

        Args:
            target_cand_list: Iterable of ``(target, candidate)`` pairs.
            n_processes: Number of worker processes in the pool.
            timeout: Seconds to wait in ``AsyncResult.get`` for each result.
                The clock starts when the result is requested, not when the
                task starts running, and a task that times out is not
                cancelled; it keeps its worker busy until the pool is torn
                down on leaving the ``with`` block.

        Returns:
            List with one entry per input pair, in input order. A successful
            entry is ``(tar_cand, dist)``; a timed-out or failed entry is
            ``(None, "Timeout or error")``. The failing pair is identified
            by its position in the list.
        """
        results = []
        with Pool(processes=n_processes) as pool:
            # Submit everything up front so the workers stay saturated.
            pending = [
                pool.apply_async(
                    Compute_Myopic_MCES_timeout.compute_mces, args=(tar_cand,)
                )
                for tar_cand in target_cand_list
            ]
            for async_res in tqdm.tqdm(pending, total=len(pending)):
                try:
                    result = async_res.get(timeout=timeout)
                except Exception:
                    # Deliberate best-effort: a timeout or an exception raised
                    # in the worker both collapse to the same placeholder.
                    result = (None, "Timeout or error")
                results.append(result)
        return results
def get_result_files(exp_dir, spec_type, views_type):
    """Find the two result files of the run matching a spec/views pair.

    Entries of ``exp_dir`` whose names split on ``'_'`` into exactly three
    parts ``<prefix>_<spec>_<views>`` are considered. For each directory
    whose ``<spec>`` and ``<views>`` equal the requested values, its files
    are scanned: a name containing ``'mass_result'`` is taken as the mass
    result, any other name containing ``'result'`` as the second result.

    Args:
        exp_dir: Directory holding the run sub-directories (``str`` or
            ``pathlib.Path``).
        spec_type: Required second name component.
        views_type: Required third name component.

    Returns:
        ``(mass_result, form_result)``. Each is a ``Path`` when found and
        the empty string ``''`` otherwise. If several entries match, the
        last one scanned wins.
    """
    exp_dir = Path(exp_dir)
    mass_result = ''
    form_result = ''
    for run_name in os.listdir(exp_dir):
        try:
            _, s, views = run_name.split('_')
        except ValueError:
            # Name does not have exactly three '_'-separated parts.
            continue
        if s != spec_type or views != views_type:
            continue
        run_dir = exp_dir / run_name
        if not run_dir.is_dir():
            # A plain file that happens to match the naming scheme.
            continue
        print(run_dir)
        for result_name in os.listdir(run_dir):
            # Order matters: 'mass_result' also contains 'result'.
            if 'mass_result' in result_name:
                mass_result = run_dir / result_name
            elif 'result' in result_name:
                form_result = run_dir / result_name
    return mass_result, form_result
def get_target(candidates, labels):
    """Return the first candidate selected by ``labels``.

    ``candidates`` is converted to a NumPy array and indexed with
    ``labels``; the first element of that selection is returned. An empty
    selection raises ``IndexError``.
    """
    # presumably labels is a boolean mask marking the true molecule - confirm with caller
    selected = np.array(candidates)[labels]
    return selected[0]
def get_top_cand(candidates, scores):
    """Return the candidate whose score is highest.

    The position comes from ``np.argmax(scores)``, so the first candidate
    wins when several share the maximum score.
    """
    best_position = np.argmax(scores)
    return candidates[best_position]
def convert_rank_to_hit_rates(row, rank_col, top_k=(1, 5, 20)):
    """Turn one rank value into 0/1 hit indicators for several cut-offs.

    Args:
        row: Mapping-like object (e.g. a DataFrame row) indexable by
            ``rank_col``.
        rank_col: Key of the rank value inside ``row``; also used as the
            prefix of the generated labels.
        top_k: Cut-offs ``k`` to evaluate. The default is an immutable tuple
            so it cannot be altered between calls.

    Returns:
        ``pd.Series`` with one entry per ``k``, labelled
        ``'<rank_col>-hit_rate@<k>'``, holding ``1`` when ``rank <= k`` and
        ``0`` otherwise.
    """
    rank = row[rank_col]
    return pd.Series(
        {f'{rank_col}-hit_rate@{k}': 1 if rank <= k else 0 for k in top_k}
    )
| #################### Rank aggregation ####################### | |
| from collections import defaultdict | |
| import numpy as np | |
| from scipy.stats import rankdata | |
def borda_count(candidates, score_lists, target):
    """Rank of ``target`` after Borda-count fusion of several score lists.

    For every score list the candidates are ordered by descending score;
    the candidate at 1-based position ``r`` earns ``len(candidates) - r + 1``
    points. Points are summed over all lists and the candidates are finally
    ordered by descending total. Sorting is stable, so tied candidates keep
    the order in which they were first seen.

    Returns:
        1-based position of ``target`` in the fused ordering, or ``None``
        when ``target`` is not among the scored candidates.
    """
    n_candidates = len(candidates)
    points = defaultdict(int)
    for per_list_scores in score_lists:
        ordering = sorted(
            zip(candidates, per_list_scores),
            key=lambda pair: pair[1],
            reverse=True,
        )
        for position, (mol, _score) in enumerate(ordering):
            # position is 0-based, so the best candidate earns n_candidates.
            points[mol] += n_candidates - position

    final_order = sorted(points, key=points.get, reverse=True)
    if target not in final_order:
        return None
    return final_order.index(target) + 1
def average_rank(candidates, score_lists, target):
    """Rank of ``target`` when candidates are ordered by their mean rank.

    Each score list is turned into 1-based ranks (highest score gets rank
    1). The ranks a candidate receives across lists are averaged with
    ``np.mean`` and candidates are ordered by ascending average.

    Returns:
        1-based position of ``target`` in that ordering, or ``None`` when
        ``target`` is not among the scored candidates.
    """
    places_by_mol = defaultdict(list)
    for per_list_scores in score_lists:
        paired = list(zip(candidates, per_list_scores))
        paired.sort(key=lambda pair: pair[1], reverse=True)
        for place, (mol, _score) in enumerate(paired, start=1):
            places_by_mol[mol].append(place)

    mean_place = {}
    for mol, places in places_by_mol.items():
        mean_place[mol] = np.mean(places)

    # Lower average rank is better, hence ascending order.
    final_order = sorted(mean_place, key=mean_place.get)
    if target in final_order:
        return final_order.index(target) + 1
    return None
def reciprocal_rank_aggregation(candidates, score_lists, target):
    """Rank of ``target`` after summing reciprocal ranks over score lists.

    In every score list the candidate at 1-based rank ``r`` (highest score
    first) contributes ``1 / r``. Contributions are added up per candidate
    and candidates are ordered by descending sum.

    Returns:
        1-based position of ``target`` in the fused ordering, or ``None``
        when ``target`` is not among the scored candidates.
    """
    fused = defaultdict(float)
    for per_list_scores in score_lists:
        by_score = sorted(
            zip(candidates, per_list_scores),
            key=lambda item: item[1],
            reverse=True,
        )
        for rank, item in enumerate(by_score, start=1):
            fused[item[0]] += 1 / rank

    ordered_items = sorted(fused.items(), key=lambda item: item[1], reverse=True)
    final_order = [mol for mol, _total in ordered_items]
    if target not in final_order:
        return None
    return final_order.index(target) + 1
def weighted_voting(candidates, score_lists, weights, target):
    """Rank of ``target`` after weighted reciprocal-rank voting.

    Score lists and weights are paired positionally with ``zip``, so any
    surplus lists or weights are ignored. Within a list the candidate at
    1-based rank ``r`` (highest score first) receives ``weight / r``; votes
    are summed per candidate and candidates are ordered by descending sum.

    Returns:
        1-based position of ``target`` in the fused ordering, or ``None``
        when ``target`` is not among the scored candidates.
    """
    tallies = defaultdict(float)
    for list_weight, per_list_scores in zip(weights, score_lists):
        ordering = sorted(
            zip(candidates, per_list_scores),
            key=lambda pair: pair[1],
            reverse=True,
        )
        rank = 0
        for mol, _score in ordering:
            rank += 1
            tallies[mol] += list_weight / rank

    final_order = sorted(tallies, key=tallies.get, reverse=True)
    found = target in final_order
    return final_order.index(target) + 1 if found else None
def median_rank(candidates, score_lists, target):
    """Rank of ``target`` when candidates are ordered by their median rank.

    Each score list is turned into 1-based ranks (highest score gets rank
    1). The ranks a candidate receives across lists are reduced with
    ``np.median`` and candidates are ordered by ascending median.

    Returns:
        1-based position of ``target`` in that ordering, or ``None`` when
        ``target`` is not among the scored candidates.
    """
    observed = defaultdict(list)
    for per_list_scores in score_lists:
        ordering = sorted(
            zip(candidates, per_list_scores),
            key=lambda pair: pair[1],
            reverse=True,
        )
        for idx, (mol, _score) in enumerate(ordering):
            observed[mol].append(idx + 1)  # store 1-based rank

    medians = {mol: np.median(rank_values) for mol, rank_values in observed.items()}
    by_median = sorted(medians.items(), key=lambda entry: entry[1])
    final_order = [mol for mol, _median in by_median]
    if target in final_order:
        return final_order.index(target) + 1
    return None
def score_based_aggregation(candidates, score_lists, target):
    """Rank of ``target`` when candidates are ordered by their mean raw score.

    The scores a candidate receives across all score lists are averaged
    with ``np.mean`` and candidates are ordered by descending average.

    Returns:
        1-based position of ``target`` in that ordering, or ``None`` when
        ``target`` is not among the scored candidates.
    """
    collected = {}
    for per_list_scores in score_lists:
        for mol, value in zip(candidates, per_list_scores):
            collected.setdefault(mol, []).append(value)

    mean_score = {}
    for mol, values in collected.items():
        mean_score[mol] = np.mean(values)

    final_order = sorted(mean_score, key=mean_score.get, reverse=True)
    if target not in final_order:
        return None
    return final_order.index(target) + 1