import json
import math
from collections import defaultdict
from datetime import datetime
from statistics import mean

import pandas as pd

# Date format used by every date string in the leaderboard JSON files.
_DATE_FORMAT = '%Y-%m-%d'


### UTILITY

def _safe_numeric(value):
    """Convert value to float, returning NaN for invalid values.

    Args:
        value: Any object; ``None``, non-numeric strings and other
            unconvertible objects are treated as invalid.

    Returns:
        ``float(value)`` when the conversion succeeds, otherwise NaN.
    """
    try:
        return float(value)
    except (TypeError, ValueError, OverflowError):
        return float("nan")


def _parse_date(date_str):
    """Parse a 'YYYY-MM-DD' string; return None when it is not a valid date."""
    try:
        return datetime.strptime(date_str, _DATE_FORMAT)
    except (TypeError, ValueError):
        # TypeError: not a string (e.g. None); ValueError: wrong format.
        return None


def _month_key(date_obj):
    """Format a datetime as the 'YYYY_MM' key used for monthly columns."""
    return f"{date_obj.year}_{date_obj.month:02d}"


def aggregate_weekly_to_monthly(dates, values):
    """
    Aggregate raw data points (weekly granularity) to monthly averages.

    Pairs whose date cannot be parsed are skipped. Pairs whose value is
    missing or not numeric (None, NaN, unparseable strings) are skipped as
    well, so that a single bad observation neither raises inside ``mean``
    nor turns the whole month into NaN.

    Args:
        dates: List of date strings in 'YYYY-MM-DD' format
        values: List of corresponding values (e.g., faithfulness scores)

    Returns:
        Dictionary mapping 'YYYY_MM' to average value for that month,
        rounded to two decimals. Months without any usable observation
        are absent from the dictionary.
    """
    monthly_data = defaultdict(list)
    for date_str, value in zip(dates, values):
        date_obj = _parse_date(date_str)
        if date_obj is None:
            continue  # Skip invalid dates
        numeric_value = _safe_numeric(value)
        if math.isnan(numeric_value):
            continue  # Skip missing / non-numeric observations
        monthly_data[_month_key(date_obj)].append(numeric_value)

    # Every list in monthly_data has at least one element by construction.
    return {
        month_key: round(mean(month_values), 2)
        for month_key, month_values in monthly_data.items()
    }


def extract_provider_from_model_name(model_name):
    """
    Extract provider from model name path.

    Args:
        model_name: String like 'CohereForAI/c4ai-command-a-03-2025'

    Returns:
        Provider string (e.g., 'CohereForAI') or empty string if the name
        contains no '/' or is not a string at all.
    """
    if not isinstance(model_name, str):
        return ""
    provider, separator, _ = model_name.partition('/')
    return provider if separator else ""


def convert_changepoints_to_monthly(changepoint_dates):
    """
    Convert changepoint dates to YYYY_MM format.

    Args:
        changepoint_dates: List of date strings in 'YYYY-MM-DD' format

    Returns:
        List of strings in 'YYYY_MM' format, in input order; entries that
        cannot be parsed are skipped.
    """
    parsed_dates = (_parse_date(date_str) for date_str in changepoint_dates)
    return [_month_key(d) for d in parsed_dates if d is not None]


def calc_year_avg():
    """TODO: Calculate yearly average.

    Placeholder: currently always returns 1.
    """
    return 1


def avg_smoothing():
    """TODO: Apply smoothing to averages.

    Placeholder: currently does nothing and returns None.
    """
    pass


def calculate_cumulative_average(values):
    """
    Calculate cumulative average for a list of values.

    Args:
        values: List of numeric values

    Returns:
        List of cumulative averages where cumulative_avg[i] = mean(values[0:i+1])
    """
    cumulative_avg = []
    running_sum = 0.0
    for count, value in enumerate(values, start=1):
        running_sum += value
        cumulative_avg.append(running_sum / count)
    return cumulative_avg


### DATA LOADING

def load_data(filepath):
    """Load JSON data from file.

    The file is read as UTF-8 (the JSON interchange encoding) instead of
    the platform default, so results do not depend on the host locale.
    """
    with open(filepath, "r", encoding="utf-8") as f:
        return json.load(f)


def load_model_metadata(filepath):
    """Load model metadata from JSON file."""
    return load_data(filepath)


def load_raw_model_data(filepath, model_name):
    """
    Load raw data for a specific model from leaderboard_graph_data.json.

    Args:
        filepath: Path to the JSON file
        model_name: Name of the model to load

    Returns:
        Dictionary containing model data with keys:
        - dates: List of date strings
        - faithfulness: List of faithfulness scores
        - cumulative_refusals: List of cumulative refusal counts
        - segments: List of segment dictionaries
        - changepoint_dates: List of changepoint date strings
        - total_observations: Total number of observations
        An empty dictionary is returned when the model is not in the file.
    """
    return load_data(filepath).get(model_name, {})


def prepare_mappings(data):
    """Create lookup dictionaries from loaded data.

    Returns:
        tuple: (models_map, metrics) where models_map maps each model id to
        its name and metrics is the list of metric ids.
    """
    models_map = {m["id"]: m["name"] for m in data["models"]}
    metrics = [m["id"] for m in data["metrics"]]
    return models_map, metrics


def build_year_column_mapping(years, months):
    """Build mapping of year -> list of aggregated month columns."""
    return {year: [f"{year}_{month}" for month in months] for year in years}


### DATA TRANSFORMATION

def validate_equal_measurements(data):
    """
    Validate measurement counts across models and warn about discrepancies.

    Args:
        data: Dictionary with model names as keys

    Returns:
        tuple: (is_valid, measurement_counts_dict, message)
            - is_valid: Always True now (we allow different counts)
            - measurement_counts_dict: Dict mapping model_name -> count
            - message: Info/warning message about the counts
    """
    measurement_counts = {
        model_name: len(model_data.get('dates', []))
        for model_name, model_data in data.items()
    }

    if not measurement_counts:
        return True, {}, "No models found in data"

    max_count = max(measurement_counts.values())
    min_count = min(measurement_counts.values())

    if max_count == min_count:
        # All models have same count
        return True, measurement_counts, f"All models have {max_count} measurements"

    # Models have different counts - create warning message.
    # Models below the maximum are listed first, fewest samples first.
    models_with_fewer = [
        f" {model}: {count} samples (missing {max_count - count})"
        for model, count in sorted(measurement_counts.items(), key=lambda item: item[1])
        if count < max_count
    ]
    models_at_max = [
        f" {model}\n"
        for model, count in measurement_counts.items()
        if count == max_count
    ]

    warning_msg = "".join([
        f"⚠️ Models have different measurement counts (range: {min_count}-{max_count}):\n",
        "\n".join(models_with_fewer),
        f"\n\nModels with maximum samples ({max_count}):\n",
        "".join(models_at_max),
    ])
    return True, measurement_counts, warning_msg


def _find_model_metadata(model_name, model_metadata):
    """Look up the metadata entry for a model, trying several key spellings.

    Lookup order:
        1. exact key match;
        2. for names that contain a provider ('provider/model'): the
           provider-prefixed key tried historically, then the bare model
           name without the provider part;
        3. for names without a provider: any key ending in '/<model_name>'.

    Returns:
        The metadata value, or None when nothing matches.
    """
    if not model_metadata:
        return None
    if model_name in model_metadata:
        return model_metadata[model_name]

    provider = extract_provider_from_model_name(model_name)
    if provider:
        prefixed = model_metadata.get(f"{provider}/{model_name}")
        if prefixed:
            return prefixed
        # The name already carries its provider, so the metadata may be
        # keyed by the bare model name instead.
        bare_name = model_name.split('/', 1)[1]
        return model_metadata.get(bare_name)

    # Try all possible provider prefixes for models without /
    suffix = f"/{model_name}"
    for key, value in model_metadata.items():
        if key.endswith(suffix):
            return value
    return None


def _evaluation_period(dates):
    """Return 'min - max' over the parseable dates, or '' if there are none."""
    parsed = [d for d in map(_parse_date, dates) if d is not None]
    if not parsed:
        return ""
    min_date = min(parsed).strftime(_DATE_FORMAT)
    max_date = max(parsed).strftime(_DATE_FORMAT)
    return f"{min_date} - {max_date}"


def transform_leaderboard_data_to_dataframe(data, years, months, model_metadata=None):
    """
    Transform new leaderboard_graph_data.json format into DataFrame-compatible structure.

    Each row contains, in this order: the model name, the first two detected
    cutoffs (as 'YYYY.MM'), the full changepoint list, the metadata columns,
    the evaluation period, one column per month with data ('YYYY_MM'), one
    column per requested year, and "Overall Average".

    Args:
        data: Dictionary with model names as keys
        years: List of year strings (e.g., ["2021", "2022", ...])
        months: List of month strings (e.g., ["01", "02", ...])
        model_metadata: Optional dictionary with model metadata
            (parameters, release date, etc.)

    Returns:
        List of row dictionaries ready for DataFrame creation
    """
    rows = []

    for model_name, model_data in data.items():
        # Convert changepoints to monthly format
        changepoints = convert_changepoints_to_monthly(
            model_data.get("changepoint_dates", [])
        )

        row = {
            "Model": model_name,
            "1st Detected cutoff": changepoints[0].replace("_", ".") if len(changepoints) > 0 else "",
            "2nd Detected cutoff": changepoints[1].replace("_", ".") if len(changepoints) > 1 else "",
            "trend_changepoints": changepoints,  # Keep for chart rendering (in YYYY_MM format)
        }

        metadata = _find_model_metadata(model_name, model_metadata)
        if metadata:
            row["Provider"] = metadata.get("Provider", "")
            row["Parameters"] = metadata.get("Parameters", "")
            row["Provider cutoff"] = metadata.get("Provider cutoff", "")
            row["Release date"] = metadata.get("Release date", "")
            row["Self-declared cutoff"] = metadata.get("Model cutoff", "")
        else:
            # No metadata: fall back to the provider embedded in the model
            # name and leave the remaining metadata columns empty.
            row["Provider"] = extract_provider_from_model_name(model_name)
            row["Parameters"] = ""
            row["Provider cutoff"] = ""
            row["Release date"] = ""
            row["Self-declared cutoff"] = ""

        # Aggregate faithfulness data to monthly averages
        dates = model_data.get("dates", [])
        faithfulness = model_data.get("faithfulness", [])
        monthly_averages = aggregate_weekly_to_monthly(dates, faithfulness)

        # Evaluation period (min and max dates). Unparseable dates are
        # ignored here exactly as they are ignored by the aggregation.
        row["Evaluation period"] = _evaluation_period(dates)

        # Add monthly columns (e.g., "2021_01", "2021_02", ...)
        row.update(monthly_averages)

        # Yearly averages are averages of the available monthly averages.
        all_years_values = []
        for year in years:
            year_values = [
                monthly_averages[f"{year}_{month}"]
                for month in months
                if f"{year}_{month}" in monthly_averages
            ]
            row[year] = round(mean(year_values), 2) if year_values else None
            all_years_values.extend(year_values)

        # Overall average across all requested years, based only on the
        # months this particular model actually has data for.
        row["Overall Average"] = (
            round(mean(all_years_values), 2) if all_years_values else None
        )

        rows.append(row)

    return rows


def extract_metric_value(month_data, metric):
    """
    Extract a metric value from month data.

    Only the "avg_<metric>" key (metric lower-cased, e.g. "avg_accuracy")
    is consulted; no fallback to any other key layout is implemented.

    Args:
        month_data: Dictionary of values for one month
        metric: Metric identifier

    Returns:
        The stored value, or None when the key is absent.
    """
    return month_data.get(f"avg_{metric.lower()}")


def process_month_data(result, year, month, metrics):
    """Process data for a single month and return row updates and values.

    Args:
        result: Result entry; values are read from
            result["results"][year][month]
        year: Year key
        month: Month key
        metrics: Iterable of metric identifiers

    Returns:
        tuple: (row_updates, numeric_values)
            - row_updates: one "<metric>_<year>_<month>" column per metric
              holding the raw value, plus the aggregated "<year>_<month>"
              column (mean of the numeric values rounded to two decimals,
              or None when there are none)
            - numeric_values: the float values that entered the mean;
              None, NaN and non-numeric values are left out
    """
    row_updates = {}
    numeric_values = []

    month_data = result.get("results", {}).get(year, {}).get(month, {})

    for metric in metrics:
        val = extract_metric_value(month_data, metric)

        # Store metric-specific column (e.g., "accuracy_2023_01")
        row_updates[f"{metric}_{year}_{month}"] = val

        # Collect numeric values for aggregation
        numeric_val = _safe_numeric(val)
        if not math.isnan(numeric_val):
            numeric_values.append(numeric_val)

    # Add aggregated month column (average across metrics)
    row_updates[f"{year}_{month}"] = (
        round(mean(numeric_values), 2) if numeric_values else None
    )

    return row_updates, numeric_values


def process_result_row(result, models_map, metrics, years, months):
    """Process a single result entry into a dataframe row.

    The row holds the model name (falling back to the raw id when the id is
    not in models_map), any columns supplied under result["columns"], the
    "trend_breakpoint" value, the per-month columns from
    process_month_data, and one aggregated column per year.
    """
    row = {"Model": models_map.get(result["id"], result["id"])}

    # Keep any provider/metadata columns
    row.update(result.get("columns", {}))

    # Add trend breakpoint
    row["trend_breakpoint"] = result.get("trend_breakpoint")

    # Process each year
    for year in years:
        all_year_vals = []

        for month in months:
            month_updates, month_vals = process_month_data(
                result, year, month, metrics
            )
            row.update(month_updates)
            all_year_vals.extend(month_vals)

        # Add aggregated year column
        row[year] = round(mean(all_year_vals), 2) if all_year_vals else None

    return row


def create_dataframe(cfg, data, models_map=None, metrics=None, model_metadata=None):
    """
    Transform loaded data into a pandas DataFrame.

    Supports both old format (with models_map and metrics) and new format
    (direct model data dictionary). The new format is assumed when both
    models_map and metrics are None.

    Args:
        cfg: Mapping providing the "years" and "months" lists
        data: Loaded data in either format
        models_map: Old format only - mapping of model id -> name
        metrics: Old format only - list of metric ids
        model_metadata: New format only - optional model metadata

    Returns:
        pandas.DataFrame with one row per model / result entry
    """
    years = cfg.get("years")
    months = cfg.get("months")

    if models_map is None and metrics is None:
        # New format: data is already the model dictionary
        rows = transform_leaderboard_data_to_dataframe(
            data, years, months, model_metadata
        )
    else:
        # Old format: data contains "results" key
        rows = [
            process_result_row(result, models_map, metrics, years, months)
            for result in data["results"]
        ]

    return pd.DataFrame(rows)


### COLUMN DEFINITIONS

def get_aggregated_columns(years, year_to_columns):
    """Get lists of aggregated year and month columns.

    Returns:
        tuple: (year_columns, month_columns) where year_columns is the
        ``years`` argument itself and month_columns is the concatenation of
        year_to_columns[year] for each year, in order.
    """
    aggregated_cols_year = years
    aggregated_cols_month = [
        col for year in years for col in year_to_columns[year]
    ]
    return aggregated_cols_year, aggregated_cols_month