import json
import time

import numpy as np
import torch
import torch.nn as nn
import wandb
from fvcore.nn import FlopCountAnalysis
from ndlinear import NdLinear
from sklearn.metrics import roc_curve
from torchvision import models, transforms

# ImageNet-style training augmentation and normalization.
transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.2, contrast=0.2),
    transforms.RandomRotation(10),
    transforms.RandomResizedCrop((224, 224), scale=(0.8, 1.0)),
    transforms.ToTensor(),
    transforms.Normalize(mean=[0.485, 0.456, 0.406],
                         std=[0.229, 0.224, 0.225]),
])


class ReshapedNdLinear(nn.Module):
    """Wrap an NdLinear layer so it accepts flat (B, F) inputs.

    The input is lifted to (B, F, 1) for NdLinear's factorized transform,
    then the output is flattened back to (B, -1).
    """

    def __init__(self, nd_linear_layer):
        super().__init__()
        self.nd_linear = nd_linear_layer

    def forward(self, x):
        x = x.reshape(*x.shape, 1)    # (B, F) -> (B, F, 1)
        x = self.nd_linear(x)
        return x.view(x.size(0), -1)  # flatten to (B, -1)


def print_cpu_layers(model):
    """Print every layer that still has parameters on the CPU."""
    found_cpu_layer = False
    for name, module in model.named_modules():
        if any(p.device.type == 'cpu' for p in module.parameters(recurse=False)):
            print(f"Layer: {name}, Device: CPU")
            found_cpu_layer = True
    if not found_cpu_layer:
        print("No layers are on the CPU.")


def calculate_flops(model, input_tensor):
    """Count forward-pass FLOPs with fvcore for the given input tensor."""
    model.eval()
    device = next(model.parameters()).device
    input_tensor = input_tensor.to(device)
    flops_analysis = FlopCountAnalysis(model, input_tensor)
    return flops_analysis.total()


def print_model_parameters(model):
    """Return (not print, despite the name) the total parameter count."""
    return sum(p.numel() for p in model.parameters())


def measure_latency_and_flops_cuda(model, input_tensor, warmup=10, runs=100):
    """Benchmark average CUDA latency over `runs` forward passes and report FLOPs."""
    assert torch.cuda.is_available(), "CUDA is not available."
    device = torch.device('cuda')
    model.to(device)
    input_tensor = input_tensor.to(device)
    model.eval()
    torch.backends.cudnn.benchmark = True

    # Warm-up passes so cuDNN autotuning and lazy CUDA init don't skew timings.
    with torch.no_grad():
        for _ in range(warmup):
            _ = model(input_tensor)
    torch.cuda.synchronize()

    timings = []
    with torch.no_grad():
        for _ in range(runs):
            start = time.time()
            _ = model(input_tensor)
            torch.cuda.synchronize()  # wait for kernels to finish before stopping the clock
            timings.append(time.time() - start)

    avg_latency = sum(timings) / len(timings)
    flops = calculate_flops(model, input_tensor[:1, ...])  # per-sample FLOPs
    print(f"Average CUDA Latency over {runs} runs: {avg_latency * 1000:.3f} ms")
    print(f"Approx. FPS: {1.0 / avg_latency:.2f}")
    print(f"Approx. FLOPs: {flops / 10 ** 9:.2f} GFLOPs")
    return avg_latency, flops


def modify_and_evaluate_backbone(model, cfg):
    """Replace the backbone's `fc` head with a dropout + NdLinear embedding head."""
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.train()

    in_features = model.fc.in_features
    fc_nd = NdLinear((in_features, 1), (cfg.embedding_size // 32, 32))
    reshaped_fc = ReshapedNdLinear(fc_nd).to(device)

    # Add dropout in front of the student model's new fully connected head.
    model.fc = nn.Sequential(
        nn.Dropout(p=0.2),
        reshaped_fc,
    )
    for param in model.fc.parameters():
        param.requires_grad = True

    total_params = print_model_parameters(model)
    wandb.log({"total_parameters": total_params})

    model.to(device)
    print_cpu_layers(model)
    print(model)
    return model


def load_config(config_path='config.json'):
    """Load the JSON config, falling back to defaults if the file is missing."""
    try:
        with open(config_path, 'r') as f:
            return json.load(f)
    except FileNotFoundError:
        return {
            "learning_rate": 0.001,
            "epochs": 1000,
            "batch_size": 32,
            "eval_batch_size": 512,
            "eval_every": 1000,
        }


def find_optimal_threshold(embeddings1, embeddings2, labels):
    """Pick the cosine-similarity threshold that maximizes Youden's J (TPR - FPR)."""
    # Row-wise dot products; equals cosine similarity for unit-norm embeddings.
    cosine_sim = np.sum(embeddings1 * embeddings2, axis=1)
    fpr, tpr, thresholds = roc_curve(labels, cosine_sim)
    j_scores = tpr - fpr
    optimal_idx = np.argmax(j_scores)
    return thresholds[optimal_idx]
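

# ---------------------------------------------------------------------------
# Minimal usage sketch, not part of the original pipeline. Assumptions: a
# ResNet-50 backbone (anything exposing a `.fc` head works), an illustrative
# `embedding_size=512`, the `NdLinear(input_dims, hidden_size)` signature from
# the ndlinear package, and a disabled wandb run so `wandb.log` does not need
# a live session. Note that `load_config` returns a dict while
# `modify_and_evaluate_backbone` reads `cfg.embedding_size` as an attribute,
# so the two are bridged with a SimpleNamespace here.
if __name__ == "__main__":
    from types import SimpleNamespace

    wandb.init(mode="disabled")

    # Shape sanity check for ReshapedNdLinear:
    # (B, 2048) -> reshape (B, 2048, 1) -> NdLinear -> flatten (B, 512).
    head = ReshapedNdLinear(NdLinear((2048, 1), (512 // 32, 32)))
    print(tuple(head(torch.randn(4, 2048)).shape))  # expected: (4, 512)

    cfg = SimpleNamespace(embedding_size=512, **load_config())
    backbone = models.resnet50(weights=None)
    backbone = modify_and_evaluate_backbone(backbone, cfg)

    # Latency/FLOPs benchmarking requires a CUDA device.
    if torch.cuda.is_available():
        dummy = torch.randn(cfg.batch_size, 3, 224, 224)
        measure_latency_and_flops_cuda(backbone, dummy)

    # Threshold selection on synthetic unit-norm embeddings (illustrative only).
    rng = np.random.default_rng(0)
    e1 = rng.normal(size=(100, cfg.embedding_size))
    e2 = rng.normal(size=(100, cfg.embedding_size))
    e1 /= np.linalg.norm(e1, axis=1, keepdims=True)
    e2 /= np.linalg.norm(e2, axis=1, keepdims=True)
    labels = rng.integers(0, 2, size=100)
    print(f"Optimal cosine threshold: {find_optimal_threshold(e1, e2, labels):.4f}")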