File size: 7,546 Bytes
eb096cf
 
 
 
 
 
77183b4
 
 
 
 
eb096cf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
import shutil
import json
import os
import time
import itertools
import faiss
import src.datasets as datasets
import src.models as models
import src.indexes as indexes
import src.commons as commons
from src.customlogger import log_time, logger


def build_field_selection_maps(fields: list[str]) -> dict:
    """Map every non-empty combination of ``fields`` to a selector function.

    Each key is the combination's field names joined with "_", in the order
    they appear in ``fields``. Each value is a callable that takes a proverb
    (indexable by field name) and returns a list: the values of the selected
    non-"themes" fields in combination order, followed by the entries of
    ``proverb["themes"]`` when "themes" is part of the combination.
    """
    every_combo = itertools.chain.from_iterable(
        itertools.combinations(fields, size)
        for size in range(1, len(fields) + 1)
    )

    selectors = {}
    for chosen in every_combo:
        # Bind the combination as a default argument so each selector keeps
        # its own fields instead of the last value of the loop variable.
        def select(proverb, combo=chosen):
            plain_values = [proverb[name] for name in combo if name != "themes"]
            # "themes" already holds several values, so they are concatenated
            # onto the result rather than nested inside it.
            theme_values = proverb["themes"] if "themes" in combo else []
            return plain_values + theme_values

        selectors["_".join(chosen)] = select
    return selectors


def setup():
    """Load everything a test run needs.

    Returns a tuple ``(tokenizer, model, proverbs, prompts_test_set)``. The
    tokenizer and model are loaded with their loaders' default arguments, and
    only the second element of the prompt split (the test set) is kept.
    """
    tokenizer = models.load_tokenizer()
    model = models.load_model()

    proverbs = datasets.load_proverbs()
    prompts = datasets.load_prompts()

    # The first element of the split (the train part) is discarded; per the
    # original note the train ratio defaults to zero, but the split is kept
    # in case training is wanted later.
    if not datasets.prompts_dataset_splits_exists():
        # No stored split yet: derive one from the full prompt dataset
        splits = datasets.split_dataset(prompts)
    else:
        # Reuse the split that already exists
        splits = datasets.load_prompt_dataset_splits()
    _, prompts_test_set = splits

    return tokenizer, model, proverbs, prompts_test_set


@log_time
def test_distances(tokenizer: models.Tokenizer, model: models.Tokenizer, model_name: str,
                   proverbs: list[dict], prompts_test_set: list[dict],
                   map: tuple[str, callable], index_type: type, pooling_method: str,
                   remarks: str = "") -> dict:
    """Measure how far retrieved proverbs are from the expected ones.

    The proverbs are embedded using the selector in ``map[1]`` and indexed
    with ``index_type``. Every prompt of ``prompts_test_set`` is then run
    through inference, and the embeddings of the results are compared with the
    embeddings of the proverbs the test set names as expected.

    Returns a JSON-serialisable dict holding the run configuration together
    with the mean and variance of the computed distances.

    NOTE(review): ``model`` is annotated as ``models.Tokenizer``; this looks
    like a copy of the ``tokenizer`` annotation - confirm the intended type.
    """
    map_name = map[0]
    select_fields = map[1]

    # Embed the proverbs with the chosen field selection and index them
    proverb_embeddings = commons.embed_dataset(
        proverbs, tokenizer, model, map=select_fields, pooling_method=pooling_method)
    search_index = indexes.create_index(proverb_embeddings, index_type)

    # Run every test prompt through inference and keep the result embeddings
    retrieved = commons.inference(
        [entry["prompt"] for entry in prompts_test_set],
        search_index, tokenizer, model, proverbs, pooling_method)
    actual_proverbs_embeddings = [hit["embedding"] for hit in retrieved]

    # Proverb text -> position in the dataset; a repeated text keeps its last
    # position. A test proverb missing from the dataset raises KeyError below.
    position_of = {}
    for position, proverb in enumerate(proverbs):
        position_of[proverb["proverb"]] = position

    # Embedding of the proverb each test entry expects
    expected_proverbs_embeddings = [
        proverb_embeddings[position_of[entry["proverb"]]]
        for entry in prompts_test_set
    ]

    # NOTE(review): faiss.pairwise_distances is documented as returning the
    # full matrix between the two sets, so the statistics below presumably
    # cover every actual/expected pairing and not only matching positions -
    # confirm that this is the intended measure.
    distances = faiss.pairwise_distances(
        actual_proverbs_embeddings, expected_proverbs_embeddings,
        metric=search_index.metric_type)
    avg_distance, var_distance = distances.mean(), distances.var()
    logger.info(
        f"Computed average distance between actual and expected proverbs: {avg_distance}")
    logger.info(
        f"Computed variance of distances between actual and expected proverbs: {var_distance}")

    return {
        "model": model_name,
        "index_type": index_type.__name__,
        "prompts_test_set_length": len(prompts_test_set),
        # Plain floats so the results can be dumped to JSON
        "avg_distance": float(avg_distance),
        "var_distance": float(var_distance),
        "map": map_name,
        "map_fields": map_name.split("_"),
        "remarks": remarks,
        "pooling_method": pooling_method,
    }


# Identifiers already handed out by generate_unique_id() in this process
_ISSUED_IDS: set[str] = set()


def generate_unique_id() -> str:
    """Build an identifier from the current local time, unique in this process.

    The identifier is the timestamp formatted as ``YYYYMMDD_HHMMSS``. Because
    that only has one-second resolution, a call that would repeat an
    identifier already returned gets a numeric suffix (``_1``, ``_2``, ...),
    so results keyed or named by the identifier cannot overwrite each other.

    Uniqueness is only tracked within the running process; two separate
    processes started in the same second can still produce the same value.
    """
    timestamp = time.strftime("%Y%m%d_%H%M%S")

    candidate = timestamp
    repeats = 0
    while candidate in _ISSUED_IDS:
        repeats += 1
        candidate = f"{timestamp}_{repeats}"

    _ISSUED_IDS.add(candidate)
    return candidate


if __name__ == "__main__":
    # Hyperparameters to combine and iterate over
    MODELS = models.MODELS
    PROVERB_FIELD_MAPS = {
        "proverb_sentiment_usage": datasets.default_proverb_fields_selection
    }
    INDEX_TYPES = [indexes.DEFAULT_INDEX_TYPE]
    POOLING_METHODS = [models.DEFAULT_POOLING_METHOD]

    remarks = "ALL hyperparameters combinations, this is going to take a while..."

    # Totals and column widths used to align the progress log. They depend
    # only on the hyperparameter grid, so they are computed once instead of on
    # every logged test case; default=0 keeps an empty grid from raising here,
    # before the (then empty) loop below is reached.
    total_number_tests = (
        len(MODELS) * len(PROVERB_FIELD_MAPS) * len(INDEX_TYPES) * len(POOLING_METHODS))
    max_len_test_number = len(str(total_number_tests))
    max_len_models = max((len(name) for name in MODELS), default=0)
    max_len_maps = max((len(name) for name in PROVERB_FIELD_MAPS), default=0)
    max_len_index_types = max(
        (len(kind.__name__) for kind in INDEX_TYPES), default=0)
    max_len_pooling_methods = max(
        (len(method) for method in POOLING_METHODS), default=0)

    def log_test_case(test_number: int, test_case_id: str, model_name: str,
                      map_name: str, index_type: type, pooling_method: str) -> None:
        """Log one aligned progress line for the test case about to run.

        The test case's configuration is passed in explicitly; only the
        precomputed totals and column widths are read from the enclosing
        scope.
        """
        logger.info(
            f"({str(test_number).rjust(max_len_test_number)}/{total_number_tests}) " +
            f"Test case {test_case_id}: " +
            f"model = {model_name.ljust(max_len_models)}, " +
            f"index type = {index_type.__name__.ljust(max_len_index_types)}, " +
            f"map = {map_name.ljust(max_len_maps)}, " +
            f"pooling = {pooling_method.ljust(max_len_pooling_methods)} "
        )

    # setup() also returns a tokenizer and a model, but both are loaded again
    # for each model name in the loop below, so only the datasets are kept.
    _, _, proverbs, prompts_test_set = setup()

    # Set up the test run
    tests_run_id = generate_unique_id()
    run_folder = os.path.join("tests_runs", tests_run_id)
    os.makedirs(run_folder)
    tests_run_file = os.path.join(
        run_folder, f"results_test_run_{tests_run_id}.json")

    # Copy the test set to the run folder for reproducibility
    shutil.copy2(datasets.PROMPTS_TEST_FILE, run_folder)

    tests_run_results = {}
    test_number = 1

    for model_name in MODELS:
        # Model and tokenizer are loaded once per model name and shared by all
        # the test cases of that model
        model = models.load_model(model_name)
        tokenizer = models.load_tokenizer(model_name)

        # Same iteration order as nesting maps > pooling methods > index types
        grid = itertools.product(
            PROVERB_FIELD_MAPS.items(), POOLING_METHODS, INDEX_TYPES)
        for field_map, pooling_method, index_type in grid:
            # Generate unique identifier for the test case
            test_case_id = generate_unique_id()

            log_test_case(test_number, test_case_id, model_name,
                          field_map[0], index_type, pooling_method)
            test_case_results = test_distances(
                tokenizer, model, model_name, proverbs, prompts_test_set,
                field_map, index_type, pooling_method, remarks
            )

            # Store test case results into a JSON
            # (backup intermediate results in case of failure)
            test_case_file = os.path.join(
                run_folder, f"results_test_case_{test_case_id}.json")
            with open(test_case_file, "w", encoding="utf-8") as f:
                json.dump(test_case_results, f, indent=2)

            tests_run_results[test_case_id] = test_case_results
            test_number += 1

    # Store test run results into a JSON
    with open(tests_run_file, "w", encoding="utf-8") as f:
        json.dump(tests_run_results, f, indent=2)