Spaces:
Sleeping
Sleeping
import pickle
import random
import shutil
import zipfile
from collections import Counter
from pathlib import Path

import numpy
# --- Service endpoint and client-side limits ---
SERVER_URL = "http://localhost:8000/"
INPUT_BROWSER_LIMIT = 550  # NOTE(review): presumably a browser input/upload size cap — confirm unit
# --- On-disk layout: data, deployment artifacts, per-user dirs, FHE key material ---
DATA_DIR = Path("./data")
DEPLOYMENT_DIR = Path("./deployment")
ROOT_DIR = DEPLOYMENT_DIR / "users"
SHARED_BASE_MODULE_DIR = DEPLOYMENT_DIR / "base_modules"
SHARED_SMOOTHER_MODULE_DIR = DEPLOYMENT_DIR / "smoother_module"
KEY_SMOOTHER_MODULE_DIR = "EvaluationKey_Smoother"
KEY_BASE_MODULE_DIR = "EvaluationKey_Base_Modules"
ENCRYPTED_INPUT_DIR = "Encrypt_Input"
ENCRYPTED_OUTPUT_DIR = "Encrypt_Output"
# Where the server appends its FHE computation timing log.
FHE_COMPUTATION_TIMELINE = Path("server_fhe_computation_timeline.txt")
# --- Ancestry label set and the id <-> name mappings used by the models ---
LABELS = ["European", "African", "Americas", "East Asian", "South Asian"]
ID_POPULATION = {0: "European", 3: "African", 2: "Americas", 1: "East Asian", 4: "South Asian"}
POPULATION_ID = {"European": 0, "African": 3, "Americas": 2, "East Asian": 1, "South Asian": 4}
COLORS = ["#FFD208", "#FFE46C", "#FFED9C", "#FFF6CE", "#FFD9A0"]  # plot palette, one per label
# Snapshot of the dataset metadata dict, originally produced by:
# load_pickle("data/meta_dict.pkl")
# NOTE(review): key meanings inferred from their use in process_data_for_base_modules —
# "C" total SNP count, "M" window size, "NW" window count, "CT" context width,
# "A" matches len(LABELS); confirm "CTR"/"WSCM"/"SS" against the data pipeline.
META = {"A": 5, "C": 1059079, "M": 10589, "NW": 100, "CT": 1059, "CTR": 0.1, "WSCM": 0.2, "SS": 75}
BUILD_GENS = [1, 2, 4, 6, 8, 12, 16, 24, 32, 48]  # NOTE(review): presumably generation counts — confirm
def load_pickle_from_zip(zip_path, file_name):
    """Deserialize and return the pickled object stored as *file_name* inside the
    zip archive at *zip_path*.

    NOTE(review): pickle.load executes arbitrary code from the stream — only use
    with trusted archives.
    """
    with zipfile.ZipFile(zip_path, 'r') as archive, archive.open(file_name) as member:
        return pickle.load(member)
def generate_weighted_percentages():
    """Return five percentages summing to 100, where the first (dominant) share
    is drawn from 50-70% and the remainder is split randomly among four others.
    """
    dominant = random.randint(50, 70)
    remainder = 100 - dominant
    # Split the remainder proportionally to four uniform random draws.
    weights = [random.random() for _ in range(4)]
    weight_total = sum(weights)
    others = [round(w / weight_total * remainder, 2) for w in weights]
    result = [dominant] + others
    # Fold any rounding drift back into the dominant share so the total is 100.
    drift = round(100 - sum(result), 2)
    if drift != 0:
        result[0] += drift
    return result
def select_random_ancestors():
    """Return the population ids from ID_POPULATION in a random order."""
    pool = list(ID_POPULATION)
    random.shuffle(pool)
    return pool
def read_pickle(path):
    """Deserialize and return the object pickled at *path*."""
    with open(path, "rb") as handle:
        return pickle.load(handle)
def compute_distribution(y, size=5):
    """Return a length-*size* vector of relative label frequencies.

    Each entry k holds the fraction of elements of *y* equal to k; labels in
    *y* must therefore be integers in [0, size). An empty *y* yields all zeros.
    """
    distribution = numpy.zeros(size)
    total = len(y)
    for label, count in Counter(y).items():
        distribution[label] = count / total
    return distribution
def slide_window(data, smooth_win_size, y=None):
    """Expand (N, W, A) per-window scores into flattened sliding contexts.

    The window axis is reflect-padded, then for each of the W positions a view
    of `smooth_win_size` neighbouring windows is taken and flattened.

    Returns:
        A pair (X_slide, y_slide): X_slide has shape
        (N * W, smooth_win_size * A); y_slide is *y* reshaped to (N * W,),
        or None when *y* is None.
    """
    n_samples, n_windows, n_feats = data.shape
    half = (smooth_win_size + 1) // 2
    padded = numpy.pad(data, ((0, 0), (half, half), (0, 0)), mode="reflect")
    views = numpy.lib.stride_tricks.sliding_window_view(padded, (1, smooth_win_size, n_feats))
    flat = views[:, :n_windows, :].reshape(n_samples * n_windows, -1)
    labels = y.reshape(n_samples * n_windows) if y is not None else None
    return flat, labels
| # def read_vcf(vcf_file): | |
| # return allel.read_vcf(vcf_file, region=None, fields="*") | |
def clean_dir(directory):
    """Recursively delete *directory* (a pathlib.Path) if it exists; no-op otherwise."""
    if not (directory.exists() and directory.is_dir()):
        return
    print(f"Removing existing model directory: {directory}")
    shutil.rmtree(directory)
def process_data_for_base_modules(meta, X_t):
    """Slice the full SNP matrix into context-padded per-window chunks.

    Args:
        meta: Metadata dict; reads "NW" (window count), "CT" (context width),
            "M" (window size) and "C" (total SNP count).
        X_t: 2-D array of shape (n_samples, meta["C"]).

    Returns:
        Tuple (X_b, n_windows, M_, rem): X_b holds one chunk of width M_ per
        selected window start; rem is the number of trailing SNPs not covered
        by the M * NW grid.
    """
    n_windows = meta["NW"]  # meta["C"] // meta["M"]
    context = meta["CT"]  # int(meta["M"] * meta['CTR'])
    # Reflect-pad both ends so the first/last windows receive full context.
    if context != 0.0:
        pad_left = numpy.flip(X_t[:, 0:context], axis=1)
        pad_right = numpy.flip(X_t[:, -context:], axis=1)
        X_t = numpy.concatenate([pad_left, X_t, pad_right], axis=1)
    # Effective chunk width: one window plus context on each side.
    M_ = meta["M"] + 2 * context
    # Window start offsets every M SNPs. NOTE(review): [:-2] drops the last two
    # offsets, which can leave fewer than n_windows chunks — confirm this matches
    # the number of base modules expected downstream.
    idx = numpy.arange(0, meta["C"], meta["M"])[:-2]
    X_b = numpy.lib.stride_tricks.sliding_window_view(X_t, M_, axis=1)[:, idx, :]
    # SNPs left over beyond the regular M * NW grid.
    rem = meta["C"] - meta["M"] * n_windows
    # print(f"{X_t.shape=} -> {X_b.shape=} | {n_windows=}, {context=}, {M_=}, {rem=}")
    return X_b, n_windows, M_, rem
def extract_model_number(path):
    """Extract the trailing integer from a path of the form ``..._<number>``.

    Args:
        path: A string (or os.PathLike — coerced via str()) ending in ``_<int>``.

    Returns:
        int | None: The parsed number, or None (with a printed error) when the
        trailing segment is not an integer.
    """
    try:
        # str() makes pathlib.Path inputs work; previously a Path argument raised
        # AttributeError (no .split), which the except clause did not catch.
        return int(str(path).split("_")[-1])
    except (ValueError, IndexError):
        print(f"Error: Unable to extract model number from path: {path}")
        return None
def is_none(obj) -> bool:
    """
    Check if the object is None or an empty sized container.
    Args:
        obj (any): The input to be checked.
    Returns:
        bool: True if the object is None or has a length of 0, False otherwise.
    """
    # The right operand of `or` only evaluates when obj is not None, so the
    # original redundant `obj is not None` guard is dropped.
    return obj is None or (hasattr(obj, "__len__") and len(obj) == 0)
def load_pickle(path: str) -> object:
    """Load a pickled object from disk.

    Args:
        path (str): Location of the pickle file.

    Returns:
        The deserialized object. (The previous ``-> numpy.array`` annotation was
        wrong: pickle.load can return any type, and ``numpy.array`` is a
        function, not a type.)
    """
    with open(path, "rb") as f:
        return pickle.load(f)
def write_pickle(path: str, data) -> None:
    """Serialize *data* to *path* with pickle, overwriting any existing file.

    Args:
        path (str): Destination file path.
        data: Any picklable object.

    (The previous ``-> numpy.array`` annotation was wrong: nothing is returned.)
    """
    with open(path, "wb") as f:
        pickle.dump(data, f)
def write_bytes(path, data):
    """Save binary *data* to *path* (a pathlib.Path), replacing any existing file."""
    with path.open("wb") as handle:
        handle.write(data)
def read_bytes(path):
    """Return the full binary contents of *path* (a pathlib.Path)."""
    with path.open("rb") as handle:
        return handle.read()