from openai import OpenAI
from datetime import datetime, timezone
import os
import json
from typing import Any, Dict, List
import re
import streamlit as st
import pandas as pd
from pathlib import Path
import math
from io import BytesIO
from nc_py_api import Nextcloud, NextcloudException

NEXTCLOUD_URL = os.getenv("NEXTCLOUD_URL")
NEXTCLOUD_USERNAME = os.getenv("NEXTCLOUD_USERNAME")
NEXTCLOUD_PASSWORD = os.getenv("NEXTCLOUD_PASSWORD")
STATISTICS_FILENAME = "candle_test/candle_test_statistics.json"

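# Note: the Nextcloud credentials above are read from the environment (for example,
# deployment/Space secrets -- an assumption about how this app is hosted); all test
# statistics are kept in the single JSON file named by STATISTICS_FILENAME inside
# the candle_test/ folder of that Nextcloud account.
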
def get_utc_timestamp():
    """Get current UTC timestamp in consistent format"""
    return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")


def format_timestamp_for_display(timestamp_str: str) -> str:
    """Format timestamp for display, handling both UTC and non-UTC timestamps"""
    if "UTC" not in timestamp_str:
        # For backwards compatibility with old data
        return f"{timestamp_str} (Local)"
    return timestamp_str

def get_nextcloud_client():
    """Get Nextcloud client instance"""
    nc = Nextcloud(
        nextcloud_url=NEXTCLOUD_URL,
        nc_auth_user=NEXTCLOUD_USERNAME,
        nc_auth_pass=NEXTCLOUD_PASSWORD
    )
    # check_capabilities returns the capabilities that are *missing*,
    # so a truthy result means the file sharing API is not available
    if nc.check_capabilities("files_sharing.api_enabled"):
        st.warning("Warning: File sharing API is not enabled on the Nextcloud server")
    return nc

def ensure_directory_exists():
    """Ensure the candle_test directory exists in Nextcloud"""
    try:
        nc = get_nextcloud_client()
        # Check if directory exists
        try:
            nc.files.listdir("candle_test")
        except NextcloudException as e:
            if "404" in str(e):
                # Create directory if it doesn't exist
                nc.files.mkdir("candle_test")
    except Exception as e:
        st.error(f"Failed to ensure directory exists: {str(e)}")

def save_statistics_to_nextcloud(stats):
    """Save statistics to Nextcloud"""
    try:
        nc = get_nextcloud_client()
        # Ensure directory exists
        ensure_directory_exists()
        # Convert statistics to JSON and then to bytes
        json_data = json.dumps(stats, indent=2)
        buf = BytesIO(json_data.encode('utf-8'))
        buf.seek(0)  # Reset buffer pointer to start
        # Upload using stream for better performance
        nc.files.upload_stream(STATISTICS_FILENAME, buf)
        return True
    except NextcloudException as e:
        st.error(f"Nextcloud error while saving statistics: {str(e)}")
        return False
    except Exception as e:
        st.error(f"Failed to save statistics to Nextcloud: {str(e)}")
        return False

def load_statistics_from_nextcloud():
    """Load statistics from Nextcloud"""
    try:
        nc = get_nextcloud_client()
        # Ensure directory exists
        ensure_directory_exists()
        # Create buffer for streaming download
        buf = BytesIO()
        try:
            # Try to download the file using streaming
            nc.files.download2stream(STATISTICS_FILENAME, buf)
            buf.seek(0)  # Reset buffer pointer to start
            return json.loads(buf.getvalue().decode('utf-8'))
        except NextcloudException as e:
            if "404" in str(e):  # File doesn't exist yet
                # Initialize empty statistics file
                empty_stats = []
                save_statistics_to_nextcloud(empty_stats)
                return empty_stats
            raise  # Re-raise if it's a different error
    except NextcloudException as e:
        st.error(f"Nextcloud error while loading statistics: {str(e)}")
        return []
    except Exception as e:
        st.error(f"Failed to load statistics from Nextcloud: {str(e)}")
        return []

def check_statistics_exists():
    """Check if statistics file exists in Nextcloud"""
    try:
        nc = get_nextcloud_client()
        # Ensure directory exists
        ensure_directory_exists()
        # Use find to check if file exists
        result = nc.files.find(["eq", "name", "candle_test_statistics.json"])
        return len(result) > 0
    except Exception:
        return False

def save_results(result):
    """Save essential test results to statistics file and sync with cloud"""
    # Generate a unique identifier for this test
    model_name = result["model"].replace("/", "_")
    temp = f"{result['temperature']:.1f}"
    timestamp = get_utc_timestamp()
    test_id = f"{timestamp.replace(' ', '_').replace(':', '-')}_{model_name}_temp{temp}"

    try:
        # Load existing data from cloud
        nc = get_nextcloud_client()
        buf = BytesIO()
        try:
            # Try to download existing file
            nc.files.download2stream(STATISTICS_FILENAME, buf)
            buf.seek(0)
            stats = json.loads(buf.getvalue().decode('utf-8'))
        except NextcloudException as e:
            if "404" in str(e):  # File doesn't exist yet
                stats = []
            else:
                raise

        # Check if this test already exists
        is_duplicate = any(
            s.get("test_id") == test_id or (
                s["model"] == result["model"] and
                s["temperature"] == result["temperature"] and
                s["timestamp"] == timestamp
            )
            for s in stats
        )

        if not is_duplicate:
            # Store only essential data
            essential_result = {
                "test_id": test_id,
                "timestamp": timestamp,
                "model": result["model"],
                "temperature": result["temperature"],
                "max_tokens": result.get("max_tokens", 1024),  # Include max_tokens if available
                "mode": result["mode"],
                "responses": result["responses"],
                "evaluation": result["evaluation"],
                "notes": result.get("notes", "")
            }
            # Append new result to existing data
            stats.append(essential_result)

            # Convert updated statistics to JSON and then to bytes
            json_data = json.dumps(stats, indent=2)
            upload_buf = BytesIO(json_data.encode('utf-8'))
            upload_buf.seek(0)

            # Ensure directory exists before upload
            ensure_directory_exists()
            # Upload using stream for better performance
            nc.files.upload_stream(STATISTICS_FILENAME, upload_buf)
            st.session_state.last_cloud_sync = datetime.now(timezone.utc)
            return True
        return False
    except Exception as e:
        st.error(f"Failed to save results: {str(e)}")
        return False

def load_results():
    """Load results from statistics file"""
    return load_statistics_from_nextcloud()


def load_statistics():
    """Load all test statistics from cloud"""
    try:
        nc = get_nextcloud_client()
        buf = BytesIO()
        try:
            # Try to download the file using streaming
            nc.files.download2stream(STATISTICS_FILENAME, buf)
            buf.seek(0)  # Reset buffer pointer to start
            stats = json.loads(buf.getvalue().decode('utf-8'))
            st.session_state.last_cloud_sync = datetime.now(timezone.utc)
            return stats
        except NextcloudException as e:
            if "404" in str(e):  # File doesn't exist yet
                return []
            raise  # Re-raise if it's a different error
    except Exception as e:
        st.error(f"Failed to load statistics: {str(e)}")
        return []


def ensure_directories():
    """This function is kept for compatibility but does nothing now"""
    pass


def get_result_files():
    """Get lists of all result files"""
    return {
        'json': sorted(Path("results/json").glob('*.json')),
        'markdown': sorted(Path("results/markdown").glob('*.md')),
    }

def clean_old_results():
    """Clean up old results and remove duplicates"""
    stats = load_statistics()

    # Keep track of unique test IDs
    seen_tests = set()
    unique_stats = []
    for stat in stats:
        test_id = stat.get("test_id")
        if not test_id:
            # Generate test_id for old entries
            timestamp = stat["timestamp"].replace(" ", "_").replace(":", "-")
            model_name = stat["model"].replace("/", "_")
            temp = f"{stat['temperature']:.1f}"
            test_id = f"{timestamp}_{model_name}_temp{temp}"
            stat["test_id"] = test_id
        if test_id not in seen_tests:
            seen_tests.add(test_id)
            unique_stats.append(stat)

    # Save unique stats back
    save_statistics_to_nextcloud(unique_stats)

    # Clean up files
    for file_path in Path("results/json").glob("*.json"):
        if file_path.stem not in seen_tests:
            file_path.unlink()
    for file_path in Path("results/markdown").glob("*.md"):
        if file_path.stem not in seen_tests:
            file_path.unlink()

def format_response(response, mode):
    """Format response for display"""
    if mode == "json":
        try:
            r_json = json.loads(response)
            return f"```json\n{json.dumps(r_json, indent=2)}\n```"
        except (json.JSONDecodeError, TypeError):
            return f"```\n{response}\n```"  # Still show in a code block if invalid JSON
    else:
        return response  # Return plain text for natural language responses

def create_markdown_output(result):
    """Generate markdown output from test result data"""
    model = result["model"]
    timestamp = result["timestamp"]
    responses = result["responses"]
    mode = result["mode"]
    evaluation = result.get("evaluation", "")

    markdown = f"""# 📊 Candle Test Results

## Model Information
- **Model**: `{model}`
- **Test Date**: {timestamp}
- **Mode**: {mode}
{f"- **Evaluation**: {evaluation}" if evaluation else ""}

## Test Sequence and Responses

### Question 1: Initial Understanding
> Are candles getting taller or shorter when they burn?

**Model's Response**:

{format_response(responses[0], mode)}

### Question 2: Confidence Check
> Are you sure? Will you be able to recognize this fact in different circumstances?

**Model's Response**:

{format_response(responses[1], mode)}

### Question 3: The Riddle
> Now, consider what you said above and solve the following riddle: I'm tall when I'm young, and I'm taller when I'm old. What am I?

**Model's Response**:

{format_response(responses[2], mode)}
"""
    return markdown

def evaluate_candle_response(response: str) -> Dict[str, Any]:
    """
    Evaluate a response to determine if it correctly states that candles get shorter.

    Returns:
        Dict containing evaluation results
    """
    response_lower = response.lower()

    # Keywords indicating correct understanding
    shorter_keywords = ['shorter', 'decrease', 'shrink', 'smaller', 'reduce', 'burn down', 'melt away']
    incorrect_keywords = ['taller', 'higher', 'grow', 'increase', 'bigger']

    # Check for correct understanding
    has_correct_keywords = any(keyword in response_lower for keyword in shorter_keywords)
    has_incorrect_keywords = any(keyword in response_lower for keyword in incorrect_keywords)

    return {
        'is_correct': has_correct_keywords and not has_incorrect_keywords,
        'has_correct_keywords': has_correct_keywords,
        'has_incorrect_keywords': has_incorrect_keywords,
        'found_correct_keywords': [k for k in shorter_keywords if k in response_lower],
        'found_incorrect_keywords': [k for k in incorrect_keywords if k in response_lower]
    }

def evaluate_riddle_response(response: str) -> Dict[str, Any]:
    """
    Evaluate the riddle response to check for candle-related answers and identify alternatives.

    Returns:
        Dict containing evaluation results
    """
    response_lower = response.lower()

    # Common correct answers
    correct_answers = [
        'shadow', 'tree', 'plant', 'bamboo', 'person', 'human', 'child',
        'building', 'tower', 'skyscraper'
    ]

    # Check for candle-related answers
    candle_patterns = [
        r'\bcandle[s]?\b',
        r'wax',
        r'wick',
        r'flame'
    ]
    has_candle_reference = any(re.search(pattern, response_lower) for pattern in candle_patterns)
    found_correct_answer = any(answer in response_lower for answer in correct_answers)

    # Extract what the model thinks is the answer
    answer_patterns = [
        r"(?:the answer is|it's|is) (?:a |an )?([a-z]+)",
        r"(?:a |an )?([a-z]+) (?:would be|is) the answer"
    ]
    proposed_answer = None
    for pattern in answer_patterns:
        match = re.search(pattern, response_lower)
        if match:
            proposed_answer = match.group(1)
            break

    return {
        'is_correct': not has_candle_reference,
        'has_candle_reference': has_candle_reference,
        'found_correct_answer': found_correct_answer,
        'proposed_answer': proposed_answer,
        'matches_known_answer': proposed_answer in correct_answers if proposed_answer else False
    }

def evaluate_natural_language_test(responses: List[str]) -> Dict[str, Any]:
    """
    Evaluate the complete natural language test sequence.
    """
    candle_eval = evaluate_candle_response(responses[0])
    riddle_eval = evaluate_riddle_response(responses[2])

    return {
        'initial_understanding': candle_eval,
        'riddle_response': riddle_eval,
        'overall_score': sum([
            candle_eval['is_correct'],
            not riddle_eval['has_candle_reference']
        ]) / 2.0,
        'passed_test': candle_eval['is_correct'] and not riddle_eval['has_candle_reference']
    }

def evaluate_json_test(responses: List[Dict]) -> Dict[str, Any]:
    """
    Evaluate the complete JSON test sequence.

    Expects responses in the format:
    {
        "reasoning": "step-by-step reasoning",
        "answer": "concise answer"
    }
    """
    try:
        # Parse each response and validate format
        parsed_responses = []
        for resp in responses:
            if isinstance(resp, str):
                resp = json.loads(resp)
            if not isinstance(resp, dict) or 'reasoning' not in resp or 'answer' not in resp:
                raise ValueError(f"Invalid response format: {resp}")
            parsed_responses.append(resp)

        # Evaluate initial understanding (first question)
        candle_eval = evaluate_candle_response(parsed_responses[0]['answer'])

        # Evaluate riddle response (third question)
        riddle_eval = evaluate_riddle_response(parsed_responses[2]['answer'])

        # Evaluate reasoning quality
        reasoning_quality = []
        for resp in parsed_responses:
            reasoning = resp['reasoning'].lower()
            reasoning_quality.append({
                'has_reasoning': bool(reasoning.strip()),
                'reasoning_length': len(reasoning.split()),
                'is_detailed': len(reasoning.split()) > 10
            })

        return {
            'initial_understanding': candle_eval,
            'riddle_response': riddle_eval,
            'reasoning_quality': reasoning_quality,
            'overall_score': sum([
                candle_eval['is_correct'],
                not riddle_eval['has_candle_reference'],
                all(rq['has_reasoning'] for rq in reasoning_quality)
            ]) / 3.0,
            'passed_test': candle_eval['is_correct'] and not riddle_eval['has_candle_reference']
        }
    except (json.JSONDecodeError, KeyError, ValueError) as e:
        return {
            'error': f"Failed to evaluate response: {str(e)}",
            'passed_test': False,
            'overall_score': 0.0
        }

def create_markdown_report(model_name: str, responses: List[str], json_responses: List[Dict]) -> str:
    """Create a markdown report of the test results with enhanced formatting."""
    # Evaluate both test versions
    nl_evaluation = evaluate_natural_language_test(responses)
    json_evaluation = evaluate_json_test(json_responses)

    report = f"""# 📊 Candle Test Results

## Model Information
- **Model**: `{model_name}`
- **Test Date**: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}

## Test Sequence and Responses

### Natural Language Test

#### Question 1: Initial Understanding
> Are candles getting taller or shorter when they burn?

**Model's Response**:
```
{responses[0]}
```

**Evaluation**:
- Correct Understanding: {'✅' if nl_evaluation['initial_understanding']['is_correct'] else '❌'}
- Found Keywords: {', '.join(nl_evaluation['initial_understanding']['found_correct_keywords'])}
{f"- Incorrect Keywords Found: {', '.join(nl_evaluation['initial_understanding']['found_incorrect_keywords'])}" if nl_evaluation['initial_understanding']['found_incorrect_keywords'] else ''}

#### Question 2: Confidence Check
> Are you sure? Will you be able to recognize this fact in different circumstances?

**Model's Response**:
```
{responses[1]}
```

#### Question 3: The Riddle
> Now, consider what you said above and solve the following riddle: I'm tall when I'm young, and I'm taller when I'm old. What am I?

**Model's Response**:
```
{responses[2]}
```

**Evaluation**:
- Avoided Candle Reference: {'✅' if not nl_evaluation['riddle_response']['has_candle_reference'] else '❌'}
- Proposed Answer: {nl_evaluation['riddle_response']['proposed_answer'] or 'Not clearly stated'}
- Matches Known Good Answer: {'✅' if nl_evaluation['riddle_response']['matches_known_answer'] else '❌'}

### Structured JSON Test

#### Question 1: Initial Understanding
**Model's Response**:
```json
{json_responses[0]}
```

**Evaluation**:
- Correct Understanding: {'✅' if json_evaluation['initial_understanding']['is_correct'] else '❌'}
- Found Keywords: {', '.join(json_evaluation['initial_understanding']['found_correct_keywords'])}
{f"- Incorrect Keywords Found: {', '.join(json_evaluation['initial_understanding']['found_incorrect_keywords'])}" if json_evaluation['initial_understanding']['found_incorrect_keywords'] else ''}

#### Question 2: Confidence Check
**Model's Response**:
```json
{json_responses[1]}
```

#### Question 3: The Riddle
**Model's Response**:
```json
{json_responses[2]}
```

**Evaluation**:
- Avoided Candle Reference: {'✅' if not json_evaluation['riddle_response']['has_candle_reference'] else '❌'}
- Proposed Answer: {json_evaluation['riddle_response']['proposed_answer'] or 'Not clearly stated'}
- Matches Known Good Answer: {'✅' if json_evaluation['riddle_response']['matches_known_answer'] else '❌'}

## Analysis

### Test Scores

| Test Version | Overall Score | Passed Test |
|--------------|---------------|-------------|
| Natural Language | {nl_evaluation['overall_score']:.2f} | {'✅' if nl_evaluation['passed_test'] else '❌'} |
| JSON Format | {json_evaluation['overall_score']:.2f} | {'✅' if json_evaluation['passed_test'] else '❌'} |

### Reasoning Quality (JSON Format)

| Question | Has Reasoning | Words | Detailed |
|----------|---------------|-------|----------|
| Q1 | {'✅' if json_evaluation['reasoning_quality'][0]['has_reasoning'] else '❌'} | {json_evaluation['reasoning_quality'][0]['reasoning_length']} | {'✅' if json_evaluation['reasoning_quality'][0]['is_detailed'] else '❌'} |
| Q2 | {'✅' if json_evaluation['reasoning_quality'][1]['has_reasoning'] else '❌'} | {json_evaluation['reasoning_quality'][1]['reasoning_length']} | {'✅' if json_evaluation['reasoning_quality'][1]['is_detailed'] else '❌'} |
| Q3 | {'✅' if json_evaluation['reasoning_quality'][2]['has_reasoning'] else '❌'} | {json_evaluation['reasoning_quality'][2]['reasoning_length']} | {'✅' if json_evaluation['reasoning_quality'][2]['is_detailed'] else '❌'} |

---
*This test evaluates the model's ability to maintain context and avoid overfitting in a simple reasoning task.*
"""
    return report

def run_candle_test(client, model, mode="natural", temperature=0.7, max_tokens=1024):
    """Run the candle test in either natural language or JSON mode and return the raw responses"""
    questions = [
        "Are candles getting taller or shorter when they burn?",
        "Are you sure? Will you be able to recognize this fact in different circumstances?",
        "Now, consider what you said above and solve the following riddle: I'm tall when I'm young, and I'm taller when I'm old. What am I?"
    ]

    if mode == "json":
        messages = [
            {
                "role": "system",
                "content": """You are a helpful assistant that provides answers in a specific JSON format.
Your responses must strictly follow this template:
{
    "reasoning": "Your step-by-step reasoning about the answer",
    "answer": "Your concise final answer"
}
Always provide both the reasoning and answer fields. Keep the reasoning clear and focused, and the answer concise."""
            }
        ]
        # Use OpenAI's native JSON mode
        response_format = {"type": "json_object"}
    else:
        messages = [
            {
                "role": "system",
                "content": "You are a helpful assistant that answers questions directly and concisely."
            }
        ]
        response_format = None

    responses = []
    for question in questions:
        messages.append({"role": "user", "content": question})
        # Only include response_format when JSON mode is active, so natural-language
        # requests carry no response_format field at all
        request_kwargs = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        if response_format is not None:
            request_kwargs["response_format"] = response_format
        completion = client.chat.completions.create(**request_kwargs)
        response = completion.choices[0].message.content
        responses.append(response)
        messages.append({"role": "assistant", "content": response})

    return responses

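# Illustrative usage of run_candle_test (not executed here; the model identifier and
# environment variable below are examples, not values used by the app):
#   client = OpenAI(base_url="https://openrouter.ai/api/v1", api_key=os.getenv("OPENROUTER_API_KEY"))
#   responses = run_candle_test(client, "openai/gpt-4o-mini", mode="json", temperature=0.2)
#   print(evaluate_json_response(responses))  # e.g. "✅ PASSED - Avoided mentioning candle in riddle"
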
def evaluate_json_response(responses):
    """Automatically evaluate JSON mode responses"""
    try:
        # Parse the third response (riddle answer)
        final_response = json.loads(responses[2])

        # Check if answer key exists and is not empty
        if 'answer' not in final_response or not final_response.get('answer', '').strip():
            return "❌ FAILED - Missing or empty answer in JSON response"

        # Get answer and reasoning text
        answer_text = final_response.get('answer', '').lower()
        reasoning_text = final_response.get('reasoning', '').lower()

        # Check for unclear case - contains both candle and valid answers
        valid_answers = ['human', 'tree', 'shadow', 'plant', 'bamboo', 'person', 'child', 'building', 'tower', 'skyscraper']
        has_candle = 'candle' in answer_text
        has_valid_answer = any(answer in answer_text for answer in valid_answers)
        if has_candle and has_valid_answer:
            return "⚠️ UNCLEAR - Mixed response with both candle and valid answer"

        # Check for candle in answer
        if 'candle' in answer_text:
            return "❌ FAILED - Mentioned candle in riddle answer"

        return "✅ PASSED - Avoided mentioning candle in riddle"
    except json.JSONDecodeError:
        return "❌ FAILED - Invalid JSON format"
    except Exception as e:
        return f"❌ FAILED - Error processing response: {str(e)}"

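# Note: evaluate_json_response only scores the third (riddle) response; the first two
# answers are displayed in the report but do not affect the automatic verdict.
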
def setup_sidebar():
    """Setup sidebar configuration"""
    st.sidebar.header("Configuration")

    # API settings with help tooltips
    st.sidebar.subheader("API Settings")
    st.session_state.api_base = st.sidebar.text_input(
        "API Base URL",
        value="https://openrouter.ai/api/v1",
        help="The base URL for your API endpoint. Supports any OpenAI-compatible API.",
        key="api_base_input"
    )
    st.session_state.api_key = st.sidebar.text_input(
        "API Key",
        type="password",
        help="Your API key for authentication. Keep this secure!",
        key="api_key_input"
    )

    # Model settings
    st.sidebar.subheader("Model Settings")
    new_models = st.sidebar.text_area(
        "Models to Test",
        placeholder="Enter models (one per line)",
        help="Enter model identifiers, one per line. Supports any OpenAI-compatible model identifier.",
        key="models_input"
    )

    # Update models list when input changes
    if st.sidebar.button("Update Models List", help="Click to update the list of models to test", key="update_models_btn"):
        if new_models:
            models_list = [model.strip() for model in new_models.split('\n') if model.strip()]
            st.session_state.models = models_list
            st.rerun()

    # Display current models list
    if st.session_state.models:
        with st.sidebar.expander("📋 Current Models Queue", expanded=True):
            st.write(f"**{len(st.session_state.models)} models in queue:**")
            for i, model in enumerate(st.session_state.models, 1):
                st.code(f"{i}. {model}", language=None)
        if st.sidebar.button("Clear Queue", help="Remove all models from the queue", key="clear_queue_btn"):
            st.session_state.models = []
            st.rerun()

    # Model generation settings
    st.sidebar.subheader("Generation Settings")

    # Temperature settings
    st.session_state.temperature = st.sidebar.slider(
        "Temperature",
        min_value=0.0,
        max_value=2.0,
        value=0.7,
        step=0.1,
        help="Controls randomness in responses. Lower values are more deterministic, higher values more creative.",
        key="temperature_slider"
    )

    # Max tokens settings
    st.session_state.max_tokens = st.sidebar.slider(
        "Max Tokens",
        min_value=256,
        max_value=4096,
        value=1024,
        step=256,
        help="Maximum number of tokens to generate in the response. Higher values allow longer responses but may take more time.",
        key="max_tokens_slider"
    )
    # Test mode with explanation
    mode_choice = st.sidebar.radio(
        "Response Format",
        ["natural - manual eval", "json - auto eval"],
        help=("Choose how the model should respond:\n"
              "- Natural: Free-form text responses\n"
              "- JSON: Structured responses with reasoning and answer fields"),
        key="mode_radio"
    )
    # Store only the mode keyword ("natural" or "json") so downstream checks
    # such as mode == "json" match the selected option
    st.session_state.mode = mode_choice.split(" ")[0]
    # Add separator before the run button
    st.sidebar.markdown("---")

    # Run Test button
    if st.session_state.test_state == 'ready':
        if not st.session_state.models:
            st.sidebar.warning("Add at least one model to test")
        else:
            test_button_label = f"🚀 Run Test on {len(st.session_state.models)} Model{'s' if len(st.session_state.models) > 1 else ''}"
            if st.sidebar.button(
                test_button_label,
                use_container_width=True,
                help=f"Start testing {len(st.session_state.models)} selected models",
                key="run_test_btn"
            ):
                if not st.session_state.api_key:
                    st.error("Please enter an API key in the sidebar")
                    return
                try:
                    client = OpenAI(
                        base_url=st.session_state.api_base,
                        api_key=st.session_state.api_key
                    )

                    # Run tests for all selected models
                    all_responses = []
                    total_models = len(st.session_state.models)

                    # Create a progress container
                    progress_container = st.empty()
                    progress_bar = st.progress(0)

                    for i, model in enumerate(st.session_state.models):
                        # Update progress
                        progress = (i + 1) / total_models
                        progress_bar.progress(progress)
                        progress_container.text(f"Testing model {i + 1}/{total_models}: {model}")

                        try:
                            with st.spinner(f"Running test for {model}..."):
                                responses = run_candle_test(
                                    client,
                                    model,
                                    mode=st.session_state.mode,
                                    temperature=st.session_state.temperature,
                                    max_tokens=st.session_state.max_tokens
                                )
                                # Create complete response object with all required fields
                                all_responses.append({
                                    'model': model,
                                    'responses': responses,
                                    'timestamp': get_utc_timestamp(),
                                    'temperature': st.session_state.temperature,
                                    'max_tokens': st.session_state.max_tokens,
                                    'mode': st.session_state.mode,
                                    'status': 'success'
                                })
                        except Exception as model_error:
                            # Handle individual model failures
                            all_responses.append({
                                'model': model,
                                'timestamp': get_utc_timestamp(),
                                'temperature': st.session_state.temperature,
                                'max_tokens': st.session_state.max_tokens,
                                'mode': st.session_state.mode,
                                'status': 'error',
                                'error': str(model_error)
                            })
                            st.warning(f"Failed to test {model}: {str(model_error)}")

                    # Clear progress indicators
                    progress_container.empty()
                    progress_bar.empty()

                    st.session_state.responses = all_responses
                    st.session_state.test_state = 'testing'
                    st.rerun()
                except Exception as e:
                    st.error(f"Error: {str(e)}")
                    return

def test_tab():
    """Content for the Test tab"""
    st.title("🕯️ The Candle Test")

    if st.session_state.test_state == 'testing':
        # Display results for all tested models
        evaluations = []
        for response in st.session_state.responses:
            with st.expander(f"Results for {response['model']}", expanded=True):
                if response['status'] == 'success':
                    # Create markdown output using the response data
                    markdown = create_markdown_output(response)
                    st.markdown(markdown)

                    # Automatic evaluation for JSON mode
                    if st.session_state.mode == "json":
                        evaluation = evaluate_json_response(response['responses'])
                        st.info("🤖 Automatic Evaluation (JSON mode)")
                        st.write(evaluation)
                        notes = "Automatically evaluated in JSON mode"
                    else:
                        # Manual evaluation for natural language mode
                        st.subheader("📝 Evaluate Results")
                        evaluation = st.radio(
                            f"How did {response['model']} perform?",
                            ["✅ PASSED - Avoided mentioning candle in riddle",
                             "❌ FAILED - Mentioned candle in riddle",
                             "⚠️ UNCLEAR - Needs discussion"],
                            key=f"eval_{response['model']}"
                        )
                        notes = st.text_area(
                            "Additional Notes (optional)",
                            "",
                            key=f"notes_{response['model']}"
                        )

                    # Collect evaluation data
                    evaluations.append({
                        "timestamp": get_utc_timestamp(),
                        "model": response['model'],
                        "temperature": st.session_state.temperature,
                        "max_tokens": st.session_state.max_tokens,
                        "mode": st.session_state.mode,
                        "responses": response['responses'],
                        "evaluation": evaluation,
                        "notes": notes
                    })
                else:
                    st.error(f"Test failed: {response['error']}")

        # Add a "Save All" button at the bottom
        button_text = "✅ Save Results" if st.session_state.mode == "json" else "✅ Complete Evaluation"
        if st.button(button_text, use_container_width=True):
            # Save all evaluations at once
            for result in evaluations:
                save_results(result)
            st.session_state.test_state = 'evaluated'
            st.rerun()

    elif st.session_state.test_state == 'evaluated':
        st.success("✅ Test results have been saved!")

        # Create two equal columns for the buttons
        col1, col2 = st.columns(2)

        # Style the buttons with custom CSS
        st.markdown("""
<style>
.stButton>button {
    width: 100%;
    height: 3em;
    font-size: 1.2em;
    border-radius: 10px;
    margin: 0.5em 0;
}
</style>
""", unsafe_allow_html=True)

        with col1:
            if st.button("🔄 Run New Test", use_container_width=True):
                st.session_state.test_state = 'ready'
                st.session_state.responses = None
                st.session_state.current_markdown = None
                st.rerun()
        with col2:
            if st.button("📊 View Comparison", use_container_width=True):
                js = """
                <script>
                // Get all tabs
                var tabs = window.parent.document.querySelectorAll('[data-baseweb="tab"]');
                // Click the second tab (index 1) for Results Comparison
                tabs[1].click();
                </script>
                """
                st.components.v1.html(js)
    else:
        # Show explanation and image only when no test is running or completed
        # Display the cover image
        st.image("https://i.redd.it/6phgn27rqfse1.jpeg", caption="The Candle Test")

        st.markdown("""
## About The Candle Test

The Candle Test is a simple yet effective way to evaluate an LLM's ability to maintain context and avoid overfitting.
It was originally proposed by [u/Everlier on Reddit](https://www.reddit.com/r/LocalLLaMA/comments/1jpr1nk/the_candle_test_most_llms_fail_to_generalise_at/).

This implementation supports any OpenAI-compatible endpoint, allowing you to test models from various providers, including:
- OpenAI
- Anthropic
- OpenRouter
- Local models (through compatible APIs)
- And more!

### What is it testing?

The test evaluates whether a language model can:
1. 🤔 Understand a basic fact (candles get shorter as they burn)
2. 🧠 Hold this fact in context
3. 🎯 Avoid overfitting when presented with a riddle that seems to match the context

### Why is it important?

This test reveals how well models can:
- Maintain contextual understanding
- Avoid falling into obvious pattern-matching traps
- Apply knowledge flexibly in different scenarios

### The Test Sequence

1. First, we ask whether candles get taller or shorter when burning
2. Then, we confirm the model's understanding
3. Finally, we present a riddle: "I'm tall when I'm young, and I'm taller when I'm old. What am I?"

A model that mentions "candle" in the riddle's answer demonstrates a failure to generalize and a tendency to overfit to the immediate context.

### Credit

This test was created by [u/Everlier](https://www.reddit.com/user/Everlier/). You can find the original discussion [here](https://www.reddit.com/r/LocalLLaMA/comments/1jpr1nk/the_candle_test_most_llms_fail_to_generalise_at/).
""")

def main():
    # Set wide mode
    st.set_page_config(
        page_title="The Candle Test",
        page_icon="🕯️",
        layout="wide"
    )

    # Initialize all session states
    if 'models' not in st.session_state:
        st.session_state.models = []
    if 'test_state' not in st.session_state:
        st.session_state.test_state = 'ready'
    if 'responses' not in st.session_state:
        st.session_state.responses = None
    if 'current_markdown' not in st.session_state:
        st.session_state.current_markdown = None
    if 'api_base' not in st.session_state:
        st.session_state.api_base = "https://openrouter.ai/api/v1"
    if 'api_key' not in st.session_state:
        st.session_state.api_key = None
    if 'temperature' not in st.session_state:
        st.session_state.temperature = 0.7
    if 'max_tokens' not in st.session_state:
        st.session_state.max_tokens = 1024
    if 'mode' not in st.session_state:
        st.session_state.mode = "natural"
    if 'selected_tab' not in st.session_state:
        st.session_state.selected_tab = 0
    if 'last_cloud_sync' not in st.session_state:
        st.session_state.last_cloud_sync = None

    # Setup sidebar (consistent across all tabs)
    setup_sidebar()

    # Create tabs
    tab1, tab2, tab3 = st.tabs(["🧪 Run Test", "📊 Results Comparison", "📋 Results Browser"])

    # Show content based on selected tab
    with tab1:
        test_tab()
    with tab2:
        results_tab()
    with tab3:
        results_browser_tab()

def results_browser_tab():
    """Content for the Results Browser tab"""
    st.title("📋 Results Browser")

    # Load all results
    results = load_statistics()
    if not results:
        st.info("No test results available yet. Run some tests first!")
        return

    # Sort results by timestamp (newest first)
    results.sort(key=lambda x: x.get("timestamp", ""), reverse=True)

    # Add export functionality
    st.download_button(
        label="📥 Export All Results",
        data=json.dumps(results, indent=2),
        file_name="candle_test_results.json",
        mime="application/json",
        help="Download all test results as a JSON file",
        key="export_all_btn"
    )

    # Add detailed browsing functionality
    st.subheader("Browse Test Results")

    # Filter options
    col1, col2, col3 = st.columns(3)
    with col1:
        model_filter = st.multiselect(
            "Filter by Model",
            options=sorted(set(r["model"] for r in results)),
            key="model_filter"
        )
    with col2:
        temp_filter = st.multiselect(
            "Filter by Temperature",
            options=sorted(set(r["temperature"] for r in results)),
            key="temp_filter"
        )
    with col3:
        eval_filter = st.multiselect(
            "Filter by Evaluation",
            options=["✅ PASSED", "❌ FAILED", "⚠️ UNCLEAR"],
            key="eval_filter"
        )

    # Apply filters
    if model_filter or temp_filter or eval_filter:
        filtered_results = results
        if model_filter:
            filtered_results = [r for r in filtered_results if r["model"] in model_filter]
        if temp_filter:
            filtered_results = [r for r in filtered_results if r["temperature"] in temp_filter]
        if eval_filter:
            filtered_results = [r for r in filtered_results if any(e in r["evaluation"] for e in eval_filter)]
    else:
        # If no filters applied, show only the last 5 results
        filtered_results = results[:5]
        if len(results) > 5:
            st.info("ℹ️ Showing the last 5 results. Use the filters above to see more.")

    # Display results
    for result in filtered_results:
        with st.expander(f"{result['timestamp']} - {result['model']} (temp={result['temperature']}) - {result['evaluation']}", expanded=False):
            st.markdown(create_markdown_output(result))
            if result.get("notes"):
                st.write("**Notes:**", result["notes"])
            # Add individual result export
            st.download_button(
                label="📥 Export This Result",
                data=json.dumps(result, indent=2),
                file_name=f"candle_test_{result['test_id']}.json",
                mime="application/json",
                key=f"export_{result['test_id']}"
            )

def results_tab():
    """Content for the Results Comparison tab"""
    st.title("📊 Results Comparison")

    # Add cloud sync status and refresh button
    col1, col2 = st.columns([3, 1])
    with col1:
        if st.session_state.last_cloud_sync:
            st.info(f"Last synced with cloud: {st.session_state.last_cloud_sync.strftime('%Y-%m-%d %H:%M:%S UTC')}")
        else:
            st.warning("Not synced with cloud yet")
    with col2:
        if st.button("🔄 Refresh Results"):
            with st.spinner("Syncing with cloud..."):
                load_statistics_from_nextcloud()
                st.session_state.last_cloud_sync = datetime.now(timezone.utc)
            st.rerun()

    # Load results from cloud
    results = load_statistics()
    if not results:
        st.info("No test results available yet. Run some tests first!")
        return

    # Calculate statistics per model+temperature combination
    model_stats = {}
    for result in results:
        # Create unique key for model+temperature combination
        model_key = f"{result['model']} (temp={result['temperature']:.1f})"
        if model_key not in model_stats:
            model_stats[model_key] = {
                "total": 0,
                "passed": 0,
                "failed": 0,
                "unclear": 0,
                "modes": set()
            }
        # Update statistics for this configuration
        stats = model_stats[model_key]
        stats["total"] += 1
        stats["modes"].add(result["mode"])
        if "✅" in result["evaluation"]:
            stats["passed"] += 1
        elif "❌" in result["evaluation"]:
            stats["failed"] += 1
        else:
            stats["unclear"] += 1

    # Create statistics table with win ratio
    stats_data = []
    for model_key, stats in model_stats.items():
        win_ratio, weighted_score = calculate_win_ratio(stats)
        stats_data.append({
            "Model Configuration": model_key,
            "Total Tests": stats["total"],
            "Win Ratio": f"{win_ratio:.2%}",
            "Passed": f"{stats['passed']} ({stats['passed']/stats['total']*100:.1f}%)",
            "Failed": f"{stats['failed']} ({stats['failed']/stats['total']*100:.1f}%)",
            "Unclear": f"{stats['unclear']} ({stats['unclear']/stats['total']*100:.1f}%)",
            "Modes": ", ".join(sorted(stats["modes"])),
            "_weighted_score": weighted_score  # Hidden column for sorting
        })

    # Sort by weighted score (descending)
    stats_data.sort(key=lambda x: -x["_weighted_score"])

    # Remove hidden column before creating DataFrame
    for item in stats_data:
        del item["_weighted_score"]

    stats_df = pd.DataFrame(stats_data)
    st.dataframe(
        stats_df,
        column_config={
            "Model Configuration": st.column_config.TextColumn("Model Configuration", width=400),
            "Total Tests": st.column_config.NumberColumn("Total Tests", width="small"),
            "Win Ratio": st.column_config.TextColumn("Win Ratio", width="small"),
            "Passed": st.column_config.TextColumn("✅ Passed", width="small"),
            "Failed": st.column_config.TextColumn("❌ Failed", width="small"),
            "Unclear": st.column_config.TextColumn("⚠️ Unclear", width="small"),
            "Modes": st.column_config.TextColumn("Mode", width="small")
        },
        use_container_width=True,
        hide_index=True,
        height=600
    )

def calculate_win_ratio(stats):
    """Calculate win ratio and confidence score based on number of tests"""
    total = stats["total"]
    passed = stats["passed"]

    # Calculate basic win ratio
    win_ratio = passed / total if total > 0 else 0

    # Calculate confidence factor based on number of tests (sigmoid function)
    # This gives more weight to models with more tests while avoiding extreme scaling
    confidence_factor = 2 / (1 + math.exp(-0.1 * total)) - 1  # Will be between 0 and 1

    # Final score combines win ratio with confidence factor
    weighted_score = win_ratio * confidence_factor
    return win_ratio, weighted_score

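# Worked example for calculate_win_ratio's confidence factor: with total = 10 tests it is
# 2 / (1 + e**-1) - 1 ≈ 0.46, with total = 30 it is ≈ 0.91, and it approaches 1.0 as the
# number of tests grows, so well-tested configurations rank above thinly-tested ones that
# share the same raw win ratio.
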
if __name__ == "__main__":
    main()