from openai import OpenAI
from datetime import datetime, timezone
import os
import json
from typing import Any, Dict, List
import re
import streamlit as st
import pandas as pd
from pathlib import Path
import math
from io import BytesIO
from nc_py_api import Nextcloud, NextcloudException
NEXTCLOUD_URL = os.getenv("NEXTCLOUD_URL")
NEXTCLOUD_USERNAME = os.getenv("NEXTCLOUD_USERNAME")
NEXTCLOUD_PASSWORD = os.getenv("NEXTCLOUD_PASSWORD")
STATISTICS_FILENAME = "candle_test/candle_test_statistics.json"
def get_utc_timestamp():
"""Get current UTC timestamp in consistent format"""
return datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M:%S UTC")
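# Illustrative output (assuming the current UTC time is 2024-05-01 12:34:56):
#   get_utc_timestamp() -> "2024-05-01 12:34:56 UTC"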
def format_timestamp_for_display(timestamp_str: str) -> str:
"""Format timestamp for display, handling both UTC and non-UTC timestamps"""
if "UTC" not in timestamp_str:
# For backwards compatibility with old data
return f"{timestamp_str} (Local)"
return timestamp_str
def get_nextcloud_client():
"""Get Nextcloud client instance"""
nc = Nextcloud(
nextcloud_url=NEXTCLOUD_URL,
nc_auth_user=NEXTCLOUD_USERNAME,
nc_auth_pass=NEXTCLOUD_PASSWORD
)
    # nc_py_api's check_capabilities() returns the list of *missing* capabilities,
    # so a truthy result means the file sharing API is unavailable
    if nc.check_capabilities("files_sharing.api_enabled"):
        st.warning("Warning: File sharing API is not enabled on the Nextcloud server")
return nc
def ensure_directory_exists():
"""Ensure the candle_test directory exists in Nextcloud"""
try:
nc = get_nextcloud_client()
# Check if directory exists
try:
nc.files.listdir("candle_test")
except NextcloudException as e:
if "404" in str(e):
# Create directory if it doesn't exist
nc.files.mkdir("candle_test")
except Exception as e:
st.error(f"Failed to ensure directory exists: {str(e)}")
def save_statistics_to_nextcloud(stats):
"""Save statistics to Nextcloud"""
try:
nc = get_nextcloud_client()
# Ensure directory exists
ensure_directory_exists()
# Convert statistics to JSON and then to bytes
json_data = json.dumps(stats, indent=2)
buf = BytesIO(json_data.encode('utf-8'))
buf.seek(0) # Reset buffer pointer to start
# Upload using stream for better performance
nc.files.upload_stream(STATISTICS_FILENAME, buf)
return True
except NextcloudException as e:
st.error(f"Nextcloud error while saving statistics: {str(e)}")
return False
except Exception as e:
st.error(f"Failed to save statistics to Nextcloud: {str(e)}")
return False
def load_statistics_from_nextcloud():
"""Load statistics from Nextcloud"""
try:
nc = get_nextcloud_client()
# Ensure directory exists
ensure_directory_exists()
# Create buffer for streaming download
buf = BytesIO()
try:
# Try to download the file using streaming
nc.files.download2stream(STATISTICS_FILENAME, buf)
buf.seek(0) # Reset buffer pointer to start
return json.loads(buf.getvalue().decode('utf-8'))
except NextcloudException as e:
if "404" in str(e): # File doesn't exist yet
# Initialize empty statistics file
empty_stats = []
save_statistics_to_nextcloud(empty_stats)
return empty_stats
raise # Re-raise if it's a different error
except NextcloudException as e:
st.error(f"Nextcloud error while loading statistics: {str(e)}")
return []
except Exception as e:
st.error(f"Failed to load statistics from Nextcloud: {str(e)}")
return []
def check_statistics_exists():
"""Check if statistics file exists in Nextcloud"""
try:
nc = get_nextcloud_client()
# Ensure directory exists
ensure_directory_exists()
# Use find to check if file exists
result = nc.files.find(["eq", "name", "candle_test_statistics.json"])
return len(result) > 0
except Exception:
return False
def save_results(result):
"""Save essential test results to statistics file and sync with cloud"""
# Generate a unique identifier for this test
model_name = result["model"].replace("/", "_")
temp = f"{result['temperature']:.1f}"
timestamp = get_utc_timestamp()
test_id = f"{timestamp.replace(' ', '_').replace(':', '-')}_{model_name}_temp{temp}"
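    # Illustrative test_id for a hypothetical run of "openai/gpt-4o-mini" at temperature 0.7:
    #   "2024-05-01_12-34-56_UTC_openai_gpt-4o-mini_temp0.7"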
try:
# Load existing data from cloud
nc = get_nextcloud_client()
buf = BytesIO()
try:
# Try to download existing file
nc.files.download2stream(STATISTICS_FILENAME, buf)
buf.seek(0)
stats = json.loads(buf.getvalue().decode('utf-8'))
except NextcloudException as e:
if "404" in str(e): # File doesn't exist yet
stats = []
else:
raise
# Check if this test already exists
is_duplicate = any(
s.get("test_id") == test_id or (
s["model"] == result["model"] and
s["temperature"] == result["temperature"] and
s["timestamp"] == timestamp
)
for s in stats
)
if not is_duplicate:
# Store only essential data
essential_result = {
"test_id": test_id,
"timestamp": timestamp,
"model": result["model"],
"temperature": result["temperature"],
"max_tokens": result.get("max_tokens", 1024), # Include max_tokens if available
"mode": result["mode"],
"responses": result["responses"],
"evaluation": result["evaluation"],
"notes": result.get("notes", "")
}
# Append new result to existing data
stats.append(essential_result)
# Convert updated statistics to JSON and then to bytes
json_data = json.dumps(stats, indent=2)
upload_buf = BytesIO(json_data.encode('utf-8'))
upload_buf.seek(0)
# Ensure directory exists before upload
ensure_directory_exists()
# Upload using stream for better performance
nc.files.upload_stream(STATISTICS_FILENAME, upload_buf)
st.session_state.last_cloud_sync = datetime.now(timezone.utc)
return True
return False
except Exception as e:
st.error(f"Failed to save results: {str(e)}")
return False
def load_results():
"""Load results from statistics file"""
return load_statistics_from_nextcloud()
def load_statistics():
"""Load all test statistics from cloud"""
try:
nc = get_nextcloud_client()
buf = BytesIO()
try:
# Try to download the file using streaming
nc.files.download2stream(STATISTICS_FILENAME, buf)
buf.seek(0) # Reset buffer pointer to start
stats = json.loads(buf.getvalue().decode('utf-8'))
st.session_state.last_cloud_sync = datetime.now(timezone.utc)
return stats
except NextcloudException as e:
if "404" in str(e): # File doesn't exist yet
return []
raise # Re-raise if it's a different error
except Exception as e:
st.error(f"Failed to load statistics: {str(e)}")
return []
def ensure_directories():
"""This function is kept for compatibility but does nothing now"""
pass
def get_result_files():
"""Get lists of all result files"""
return {
'json': sorted(Path("results/json").glob('*.json')),
'markdown': sorted(Path("results/markdown").glob('*.md')),
}
def clean_old_results():
"""Clean up old results and remove duplicates"""
stats = load_statistics()
# Keep track of unique test IDs
seen_tests = set()
unique_stats = []
for stat in stats:
test_id = stat.get("test_id")
if not test_id:
# Generate test_id for old entries
timestamp = stat["timestamp"].replace(" ", "_").replace(":", "-")
model_name = stat["model"].replace("/", "_")
temp = f"{stat['temperature']:.1f}"
test_id = f"{timestamp}_{model_name}_temp{temp}"
stat["test_id"] = test_id
if test_id not in seen_tests:
seen_tests.add(test_id)
unique_stats.append(stat)
# Save unique stats back
save_statistics_to_nextcloud(unique_stats)
# Clean up files
for file_path in Path("results/json").glob("*.json"):
if file_path.stem not in seen_tests:
file_path.unlink()
for file_path in Path("results/markdown").glob("*.md"):
if file_path.stem not in seen_tests:
file_path.unlink()
def format_response(response, mode):
"""Format response for display"""
if mode == "json":
try:
r_json = json.loads(response)
return f"```json\n{json.dumps(r_json, indent=2)}\n```"
        except (json.JSONDecodeError, TypeError):
            return f"```\n{response}\n```"  # Still show in a code block if invalid JSON
else:
return response # Return plain text for natural language responses
def create_markdown_output(result):
"""Generate markdown output from test result data"""
model = result["model"]
timestamp = result["timestamp"]
responses = result["responses"]
mode = result["mode"]
evaluation = result.get("evaluation", "")
markdown = f"""# πŸ” Candle Test Results
## Model Information
- **Model**: `{model}`
- **Test Date**: {timestamp}
- **Mode**: {mode}
{f"- **Evaluation**: {evaluation}" if evaluation else ""}
## Test Sequence and Responses
### Question 1: Initial Understanding
> πŸ’­ Are candles getting taller or shorter when they burn?
**Model's Response**:
{format_response(responses[0], mode)}
### Question 2: Confidence Check
> πŸ’­ Are you sure? Will you be able to recognize this fact in different circumstances?
**Model's Response**:
{format_response(responses[1], mode)}
### Question 3: The Riddle
> πŸ’­ Now, consider what you said above and solve the following riddle: I'm tall when I'm young, and I'm taller when I'm old. What am I?
**Model's Response**:
{format_response(responses[2], mode)}
"""
return markdown
def evaluate_candle_response(response: str) -> Dict[str, Any]:
"""
Evaluate a response to determine if it correctly states that candles get shorter.
Returns:
Dict containing evaluation results
"""
response_lower = response.lower()
# Keywords indicating correct understanding
shorter_keywords = ['shorter', 'decrease', 'shrink', 'smaller', 'reduce', 'burn down', 'melt away']
incorrect_keywords = ['taller', 'higher', 'grow', 'increase', 'bigger']
# Check for correct understanding
has_correct_keywords = any(keyword in response_lower for keyword in shorter_keywords)
has_incorrect_keywords = any(keyword in response_lower for keyword in incorrect_keywords)
return {
'is_correct': has_correct_keywords and not has_incorrect_keywords,
'has_correct_keywords': has_correct_keywords,
'has_incorrect_keywords': has_incorrect_keywords,
'found_correct_keywords': [k for k in shorter_keywords if k in response_lower],
'found_incorrect_keywords': [k for k in incorrect_keywords if k in response_lower]
}
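# Minimal illustrative check of the keyword heuristic above (hypothetical input):
#   evaluate_candle_response("Candles get shorter as they burn down.")
#   -> is_correct=True, found_correct_keywords=['shorter', 'burn down'], found_incorrect_keywords=[]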
def evaluate_riddle_response(response: str) -> Dict[str, Any]:
"""
Evaluate the riddle response to check for candle-related answers and identify alternatives.
Returns:
Dict containing evaluation results
"""
response_lower = response.lower()
# Common correct answers
correct_answers = [
'shadow', 'tree', 'plant', 'bamboo', 'person', 'human', 'child',
'building', 'tower', 'skyscraper'
]
# Check for candle-related answers
candle_patterns = [
r'\bcandle[s]?\b',
r'wax',
r'wick',
r'flame'
]
has_candle_reference = any(re.search(pattern, response_lower) for pattern in candle_patterns)
found_correct_answer = any(answer in response_lower for answer in correct_answers)
# Extract what the model thinks is the answer
answer_patterns = [
r"(?:the answer is|it's|is) (?:a |an )?([a-z]+)",
r"(?:a |an )?([a-z]+) (?:would be|is) the answer"
]
proposed_answer = None
for pattern in answer_patterns:
match = re.search(pattern, response_lower)
if match:
proposed_answer = match.group(1)
break
return {
'is_correct': not has_candle_reference,
'has_candle_reference': has_candle_reference,
'found_correct_answer': found_correct_answer,
'proposed_answer': proposed_answer,
'matches_known_answer': proposed_answer in correct_answers if proposed_answer else False
}
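# Minimal illustrative check of the riddle heuristics above (hypothetical input):
#   evaluate_riddle_response("The answer is a shadow.")
#   -> is_correct=True, proposed_answer='shadow', matches_known_answer=True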
def evaluate_natural_language_test(responses: List[str]) -> Dict[str, Any]:
"""
Evaluate the complete natural language test sequence.
"""
candle_eval = evaluate_candle_response(responses[0])
riddle_eval = evaluate_riddle_response(responses[2])
return {
'initial_understanding': candle_eval,
'riddle_response': riddle_eval,
'overall_score': sum([
candle_eval['is_correct'],
not riddle_eval['has_candle_reference']
]) / 2.0,
'passed_test': candle_eval['is_correct'] and not riddle_eval['has_candle_reference']
}
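# Scoring note: overall_score averages two boolean checks (correct fact, no candle in the
# riddle answer), so it can only take the values 0.0, 0.5, or 1.0.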
def evaluate_json_test(responses: List[Dict]) -> Dict[str, Any]:
"""
Evaluate the complete JSON test sequence.
Expects responses in the format:
{
"reasoning": "step-by-step reasoning",
"answer": "concise answer"
}
"""
try:
# Parse each response and validate format
parsed_responses = []
for resp in responses:
if isinstance(resp, str):
resp = json.loads(resp)
if not isinstance(resp, dict) or 'reasoning' not in resp or 'answer' not in resp:
raise ValueError(f"Invalid response format: {resp}")
parsed_responses.append(resp)
# Evaluate initial understanding (first question)
candle_eval = evaluate_candle_response(parsed_responses[0]['answer'])
# Evaluate riddle response (third question)
riddle_eval = evaluate_riddle_response(parsed_responses[2]['answer'])
# Evaluate reasoning quality
reasoning_quality = []
for resp in parsed_responses:
reasoning = resp['reasoning'].lower()
reasoning_quality.append({
'has_reasoning': bool(reasoning.strip()),
'reasoning_length': len(reasoning.split()),
'is_detailed': len(reasoning.split()) > 10
})
return {
'initial_understanding': candle_eval,
'riddle_response': riddle_eval,
'reasoning_quality': reasoning_quality,
'overall_score': sum([
candle_eval['is_correct'],
not riddle_eval['has_candle_reference'],
all(rq['has_reasoning'] for rq in reasoning_quality)
]) / 3.0,
'passed_test': candle_eval['is_correct'] and not riddle_eval['has_candle_reference']
}
except (json.JSONDecodeError, KeyError, ValueError) as e:
return {
'error': f"Failed to evaluate response: {str(e)}",
'passed_test': False,
'overall_score': 0.0
}
def create_markdown_report(model_name: str, responses: List[str], json_responses: List[Dict]) -> str:
"""Create a markdown report of the test results with enhanced formatting."""
# Evaluate both test versions
nl_evaluation = evaluate_natural_language_test(responses)
json_evaluation = evaluate_json_test(json_responses)
report = f"""# πŸ” Candle Test Results
## Model Information
- **Model**: `{model_name}`
- **Test Date**: {get_utc_timestamp()}
## Test Sequence and Responses
### Natural Language Test
#### Question 1: Initial Understanding
> πŸ’­ Are candles getting taller or shorter when they burn?
**Model's Response**:
```
{responses[0]}
```
**Evaluation**:
- Correct Understanding: {'βœ…' if nl_evaluation['initial_understanding']['is_correct'] else '❌'}
- Found Keywords: {', '.join(nl_evaluation['initial_understanding']['found_correct_keywords'])}
{f"- Incorrect Keywords Found: {', '.join(nl_evaluation['initial_understanding']['found_incorrect_keywords'])}" if nl_evaluation['initial_understanding']['found_incorrect_keywords'] else ''}
#### Question 2: Confidence Check
> πŸ’­ Are you sure? Will you be able to recognize this fact in different circumstances?
**Model's Response**:
```
{responses[1]}
```
#### Question 3: The Riddle
> πŸ’­ Now, consider what you said above and solve the following riddle: I'm tall when I'm young, and I'm taller when I'm old. What am I?
**Model's Response**:
```
{responses[2]}
```
**Evaluation**:
- Avoided Candle Reference: {'βœ…' if not nl_evaluation['riddle_response']['has_candle_reference'] else '❌'}
- Proposed Answer: {nl_evaluation['riddle_response']['proposed_answer'] or 'Not clearly stated'}
- Matches Known Good Answer: {'βœ…' if nl_evaluation['riddle_response']['matches_known_answer'] else '❌'}
### Structured JSON Test
#### Question 1: Initial Understanding
**Model's Response**:
```json
{json_responses[0]}
```
**Evaluation**:
- Correct Understanding: {'βœ…' if json_evaluation['initial_understanding']['is_correct'] else '❌'}
- Found Keywords: {', '.join(json_evaluation['initial_understanding']['found_correct_keywords'])}
{f"- Incorrect Keywords Found: {', '.join(json_evaluation['initial_understanding']['found_incorrect_keywords'])}" if json_evaluation['initial_understanding']['found_incorrect_keywords'] else ''}
#### Question 2: Confidence Check
**Model's Response**:
```json
{json_responses[1]}
```
#### Question 3: The Riddle
**Model's Response**:
```json
{json_responses[2]}
```
**Evaluation**:
- Avoided Candle Reference: {'βœ…' if not json_evaluation['riddle_response']['has_candle_reference'] else '❌'}
- Proposed Answer: {json_evaluation['riddle_response']['proposed_answer'] or 'Not clearly stated'}
- Matches Known Good Answer: {'βœ…' if json_evaluation['riddle_response']['matches_known_answer'] else '❌'}
## Analysis
### Test Scores
| Test Version | Overall Score | Passed Test |
|--------------|--------------|-------------|
| Natural Language | {nl_evaluation['overall_score']:.2f} | {'βœ…' if nl_evaluation['passed_test'] else '❌'} |
| JSON Format | {json_evaluation['overall_score']:.2f} | {'βœ…' if json_evaluation['passed_test'] else '❌'} |
### Reasoning Quality (JSON Format)
| Question | Has Reasoning | Words | Detailed |
|----------|--------------|-------|----------|
| Q1 | {'βœ…' if json_evaluation['reasoning_quality'][0]['has_reasoning'] else '❌'} | {json_evaluation['reasoning_quality'][0]['reasoning_length']} | {'βœ…' if json_evaluation['reasoning_quality'][0]['is_detailed'] else '❌'} |
| Q2 | {'βœ…' if json_evaluation['reasoning_quality'][1]['has_reasoning'] else '❌'} | {json_evaluation['reasoning_quality'][1]['reasoning_length']} | {'βœ…' if json_evaluation['reasoning_quality'][1]['is_detailed'] else '❌'} |
| Q3 | {'βœ…' if json_evaluation['reasoning_quality'][2]['has_reasoning'] else '❌'} | {json_evaluation['reasoning_quality'][2]['reasoning_length']} | {'βœ…' if json_evaluation['reasoning_quality'][2]['is_detailed'] else '❌'} |
---
*This test evaluates the model's ability to maintain context and avoid overfitting in a simple reasoning task.*
"""
return report
def run_candle_test(client, model, mode="natural", temperature=0.7, max_tokens=1024):
"""Run the candle test in either natural language or JSON mode"""
questions = [
"Are candles getting taller or shorter when they burn?",
"Are you sure? Will you be able to recognize this fact in different circumstances?",
"Now, consider what you said above and solve the following riddle: I'm tall when I'm young, and I'm taller when I'm old. What am I?"
]
if mode == "json":
messages = [
{
"role": "system",
"content": """You are a helpful assistant that provides answers in a specific JSON format.
Your responses must strictly follow this template:
{
"reasoning": "Your step-by-step reasoning about the answer",
"answer": "Your concise final answer"
}
Always provide both the reasoning and answer fields. Keep the reasoning clear and focused, and the answer concise."""
}
]
# Use OpenAI's native JSON mode
response_format = { "type": "json_object" }
else:
messages = [
{
"role": "system",
"content": "You are a helpful assistant that answers questions directly and concisely."
}
]
response_format = None
responses = []
for question in questions:
messages.append({"role": "user", "content": question})
        # Only include response_format when JSON mode is requested (avoids passing an explicit None)
        request_kwargs = {
            "model": model,
            "messages": messages,
            "temperature": temperature,
            "max_tokens": max_tokens,
        }
        if response_format is not None:
            request_kwargs["response_format"] = response_format
        completion = client.chat.completions.create(**request_kwargs)
response = completion.choices[0].message.content
responses.append(response)
messages.append({"role": "assistant", "content": response})
    return responses
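# Illustrative usage sketch (assumes an OpenRouter key in the environment and that the
# model id below is available on the endpoint):
#   client = OpenAI(base_url="https://openrouter.ai/api/v1", api_key=os.getenv("OPENROUTER_API_KEY"))
#   answers = run_candle_test(client, "openai/gpt-4o-mini", mode="json", temperature=0.2)
#   print(evaluate_json_response(answers))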
def evaluate_json_response(responses):
"""Automatically evaluate JSON mode responses"""
try:
# Parse the third response (riddle answer)
final_response = json.loads(responses[2])
# Check if answer key exists and is not empty
if 'answer' not in final_response or not final_response.get('answer', '').strip():
return "❌ FAILED - Missing or empty answer in JSON response"
        # Get the answer text for keyword checks (the reasoning field is not needed here)
        answer_text = final_response.get('answer', '').lower()
        # Check for unclear case - contains both a candle reference and a valid answer
        valid_answers = ['human', 'tree', 'shadow', 'plant', 'bamboo', 'person', 'child', 'building', 'tower', 'skyscraper']
        has_candle = 'candle' in answer_text
        has_valid_answer = any(answer in answer_text for answer in valid_answers)
        if has_candle and has_valid_answer:
            return "⚠️ UNCLEAR - Mixed response with both candle and valid answer"
        # Check for candle in answer
        if has_candle:
            return "❌ FAILED - Mentioned candle in riddle answer"
return "βœ… PASSED - Avoided mentioning candle in riddle"
except json.JSONDecodeError:
return "❌ FAILED - Invalid JSON format"
except Exception as e:
return f"❌ FAILED - Error processing response: {str(e)}"
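# Illustrative auto-evaluation outcome (hypothetical third response):
#   evaluate_json_response([..., ..., '{"reasoning": "...", "answer": "A shadow"}'])
#   -> "βœ… PASSED - Avoided mentioning candle in riddle"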
def setup_sidebar():
"""Setup sidebar configuration"""
st.sidebar.header("Configuration")
# API settings with help tooltips
st.sidebar.subheader("API Settings")
st.session_state.api_base = st.sidebar.text_input(
"API Base URL",
value="https://openrouter.ai/api/v1",
help="The base URL for your API endpoint. Supports any OpenAI-compatible API.",
key="api_base_input"
)
st.session_state.api_key = st.sidebar.text_input(
"API Key",
type="password",
help="Your API key for authentication. Keep this secure!",
key="api_key_input"
)
# Model settings
st.sidebar.subheader("Model Settings")
new_models = st.sidebar.text_area(
"Models to Test",
placeholder="Enter models (one per line)",
help="Enter model identifiers, one per line. Supports any OpenAI-compatible model identifier.",
key="models_input"
)
# Update models list when input changes
if st.sidebar.button("Update Models List", help="Click to update the list of models to test", key="update_models_btn"):
if new_models:
models_list = [model.strip() for model in new_models.split('\n') if model.strip()]
st.session_state.models = models_list
st.rerun()
# Display current models list
if st.session_state.models:
with st.sidebar.expander("πŸ“‹ Current Models Queue", expanded=True):
st.write(f"**{len(st.session_state.models)} models in queue:**")
for i, model in enumerate(st.session_state.models, 1):
st.code(f"{i}. {model}", language=None)
if st.sidebar.button("Clear Queue", help="Remove all models from the queue", key="clear_queue_btn"):
st.session_state.models = []
st.rerun()
# Model generation settings
st.sidebar.subheader("Generation Settings")
# Temperature settings
st.session_state.temperature = st.sidebar.slider(
"Temperature",
min_value=0.0,
max_value=2.0,
value=0.7,
step=0.1,
help="Controls randomness in responses. Lower values are more deterministic, higher values more creative.",
key="temperature_slider"
)
# Max tokens settings
st.session_state.max_tokens = st.sidebar.slider(
"Max Tokens",
min_value=256,
max_value=4096,
value=1024,
step=256,
help="Maximum number of tokens to generate in the response. Higher values allow longer responses but may take more time.",
key="max_tokens_slider"
)
    # Test mode with explanation
    mode_choice = st.sidebar.radio(
        "Response Format",
        ["natural - manual eval", "json - auto eval"],
        help=("Choose how the model should respond:\n"
              "- Natural: Free-form text responses\n"
              "- JSON: Structured responses with reasoning and a concise answer"),
        key="mode_radio"
    )
    # Normalize the label to the internal identifiers ("natural" / "json") that the
    # test runner and evaluators check against
    st.session_state.mode = "json" if mode_choice.startswith("json") else "natural"
# Add separator before the run button
st.sidebar.markdown("---")
# Run Test button
if st.session_state.test_state == 'ready':
if not st.session_state.models:
st.sidebar.warning("Add at least one model to test")
else:
test_button_label = f"πŸš€ Run Test on {len(st.session_state.models)} Model{'s' if len(st.session_state.models) > 1 else ''}"
if st.sidebar.button(
test_button_label,
use_container_width=True,
help=f"Start testing {len(st.session_state.models)} selected models",
key="run_test_btn"
):
if not st.session_state.api_key:
st.error("Please enter an API key in the sidebar")
return
try:
client = OpenAI(
base_url=st.session_state.api_base,
api_key=st.session_state.api_key
)
# Run tests for all selected models
all_responses = []
total_models = len(st.session_state.models)
# Create a progress container
progress_container = st.empty()
progress_bar = st.progress(0)
for i, model in enumerate(st.session_state.models):
# Update progress
progress = (i + 1) / total_models
progress_bar.progress(progress)
progress_container.text(f"Testing model {i + 1}/{total_models}: {model}")
try:
with st.spinner(f"Running test for {model}..."):
responses = run_candle_test(
client,
model,
mode=st.session_state.mode,
temperature=st.session_state.temperature,
max_tokens=st.session_state.max_tokens
)
# Create complete response object with all required fields
all_responses.append({
'model': model,
'responses': responses,
'timestamp': get_utc_timestamp(),
'temperature': st.session_state.temperature,
'max_tokens': st.session_state.max_tokens,
'mode': st.session_state.mode,
'status': 'success'
})
except Exception as model_error:
# Handle individual model failures
all_responses.append({
'model': model,
'timestamp': get_utc_timestamp(),
'temperature': st.session_state.temperature,
'max_tokens': st.session_state.max_tokens,
'mode': st.session_state.mode,
'status': 'error',
'error': str(model_error)
})
st.warning(f"Failed to test {model}: {str(model_error)}")
# Clear progress indicators
progress_container.empty()
progress_bar.empty()
st.session_state.responses = all_responses
st.session_state.test_state = 'testing'
st.rerun()
except Exception as e:
st.error(f"Error: {str(e)}")
return
def test_tab():
"""Content for the Test tab"""
st.title("πŸ•―οΈ The Candle Test")
if st.session_state.test_state == 'testing':
# Display results for all tested models
evaluations = []
for response in st.session_state.responses:
with st.expander(f"Results for {response['model']}", expanded=True):
if response['status'] == 'success':
# Create markdown output using the response data
markdown = create_markdown_output(response)
st.markdown(markdown)
# Automatic evaluation for JSON mode
if st.session_state.mode == "json":
evaluation = evaluate_json_response(response['responses'])
st.info("πŸ€– Automatic Evaluation (JSON mode)")
st.write(evaluation)
notes = "Automatically evaluated in JSON mode"
else:
# Manual evaluation for natural language mode
st.subheader("πŸ“ Evaluate Results")
evaluation = st.radio(
f"How did {response['model']} perform?",
["βœ… PASSED - Avoided mentioning candle in riddle",
"❌ FAILED - Mentioned candle in riddle",
"⚠️ UNCLEAR - Needs discussion"],
key=f"eval_{response['model']}"
)
notes = st.text_area(
"Additional Notes (optional)",
"",
key=f"notes_{response['model']}"
)
# Collect evaluation data
evaluations.append({
"timestamp": get_utc_timestamp(),
"model": response['model'],
"temperature": st.session_state.temperature,
"max_tokens": st.session_state.max_tokens,
"mode": st.session_state.mode,
"responses": response['responses'],
"evaluation": evaluation,
"notes": notes
})
else:
st.error(f"Test failed: {response['error']}")
# Add a "Save All" button at the bottom
button_text = "βœ… Save Results" if st.session_state.mode == "json" else "βœ… Complete Evaluation"
if st.button(button_text, use_container_width=True):
# Save all evaluations at once
for result in evaluations:
save_results(result)
st.session_state.test_state = 'evaluated'
st.rerun()
elif st.session_state.test_state == 'evaluated':
st.success("βœ… Test results have been saved!")
# Create two equal columns for the buttons
col1, col2 = st.columns(2)
# Style the buttons with custom CSS
st.markdown("""
<style>
.stButton>button {
width: 100%;
height: 3em;
font-size: 1.2em;
border-radius: 10px;
margin: 0.5em 0;
}
</style>
""", unsafe_allow_html=True)
with col1:
if st.button("πŸ”„ Run New Test", use_container_width=True):
st.session_state.test_state = 'ready'
st.session_state.responses = None
st.session_state.current_markdown = None
st.rerun()
with col2:
if st.button("πŸ“Š View Comparison", use_container_width=True):
js = f"""
<script>
// Get all tabs
var tabs = window.parent.document.querySelectorAll('[data-baseweb="tab"]');
// Click the second tab (index 1) for Results Comparison
tabs[1].click();
</script>
"""
st.components.v1.html(js)
else:
# Show explanation and image only when no test is running or completed
# Display the cover image
st.image("https://i.redd.it/6phgn27rqfse1.jpeg", caption="The Candle Test")
st.markdown("""
## About The Candle Test
The Candle Test is a simple yet effective way to evaluate an LLM's ability to maintain context and avoid overfitting.
It was originally proposed by [u/Everlier on Reddit](https://www.reddit.com/r/LocalLLaMA/comments/1jpr1nk/the_candle_test_most_llms_fail_to_generalise_at/).
This implementation supports any OpenAI-compatible endpoint, allowing you to test models from various providers including:
- OpenAI
- Anthropic
- OpenRouter
- Local models (through compatible APIs)
- And more!
### What is it testing?
The test evaluates whether a language model can:
1. πŸ€” Understand a basic fact (candles get shorter as they burn)
2. 🧠 Hold this fact in context
3. 🎯 Avoid overfitting when presented with a riddle that seems to match the context
### Why is it important?
This test reveals how well models can:
- Maintain contextual understanding
- Avoid falling into obvious pattern-matching traps
- Apply knowledge flexibly in different scenarios
### The Test Sequence
1. First, we ask if candles get taller or shorter when burning
2. Then, we confirm the model's understanding
3. Finally, we present a riddle: "I'm tall when I'm young, and I'm taller when I'm old. What am I?"
A model that mentions "candle" in the riddle's answer demonstrates a failure to generalize and a tendency to overfit to the immediate context.
### Credit
This test was created by [u/Everlier](https://www.reddit.com/user/Everlier/). You can find the original discussion [here](https://www.reddit.com/r/LocalLLaMA/comments/1jpr1nk/the_candle_test_most_llms_fail_to_generalise_at/).
""")
def main():
# Set wide mode
st.set_page_config(
page_title="The Candle Test",
page_icon="πŸ•―οΈ",
layout="wide"
)
# Initialize all session states
if 'models' not in st.session_state:
st.session_state.models = []
if 'test_state' not in st.session_state:
st.session_state.test_state = 'ready'
if 'responses' not in st.session_state:
st.session_state.responses = None
if 'current_markdown' not in st.session_state:
st.session_state.current_markdown = None
if 'api_base' not in st.session_state:
st.session_state.api_base = "https://openrouter.ai/api/v1"
if 'api_key' not in st.session_state:
st.session_state.api_key = None
if 'temperature' not in st.session_state:
st.session_state.temperature = 0.7
if 'max_tokens' not in st.session_state:
st.session_state.max_tokens = 1024
if 'mode' not in st.session_state:
st.session_state.mode = "natural"
if 'selected_tab' not in st.session_state:
st.session_state.selected_tab = 0
if 'last_cloud_sync' not in st.session_state:
st.session_state.last_cloud_sync = None
# Setup sidebar (consistent across all tabs)
setup_sidebar()
# Create tabs
tab1, tab2, tab3 = st.tabs(["πŸ§ͺ Run Test", "πŸ“Š Results Comparison", "πŸ“š Results Browser"])
# Show content based on selected tab
with tab1:
test_tab()
with tab2:
results_tab()
with tab3:
results_browser_tab()
def results_browser_tab():
"""Content for the Results Browser tab"""
st.title("πŸ“š Results Browser")
# Load all results
results = load_statistics()
if not results:
st.info("No test results available yet. Run some tests first!")
return
# Sort results by timestamp (newest first)
results.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
# Add export functionality
st.download_button(
label="πŸ“₯ Export All Results",
data=json.dumps(results, indent=2),
file_name="candle_test_results.json",
mime="application/json",
help="Download all test results as a JSON file",
key="export_all_btn"
)
# Add detailed browsing functionality
st.subheader("Browse Test Results")
# Filter options
col1, col2, col3 = st.columns(3)
with col1:
model_filter = st.multiselect(
"Filter by Model",
options=sorted(set(r["model"] for r in results)),
key="model_filter"
)
with col2:
temp_filter = st.multiselect(
"Filter by Temperature",
options=sorted(set(r["temperature"] for r in results)),
key="temp_filter"
)
with col3:
eval_filter = st.multiselect(
"Filter by Evaluation",
options=["βœ… PASSED", "❌ FAILED", "⚠️ UNCLEAR"],
key="eval_filter"
)
# Apply filters
if model_filter or temp_filter or eval_filter:
filtered_results = results
if model_filter:
filtered_results = [r for r in filtered_results if r["model"] in model_filter]
if temp_filter:
filtered_results = [r for r in filtered_results if r["temperature"] in temp_filter]
if eval_filter:
filtered_results = [r for r in filtered_results if any(e in r["evaluation"] for e in eval_filter)]
else:
# If no filters applied, show only last 5 results
filtered_results = results[:5]
if len(results) > 5:
st.info("ℹ️ Showing last 5 results. Use filters above to see more results.")
# Display results
for result in filtered_results:
with st.expander(f"{result['timestamp']} - {result['model']} (temp={result['temperature']}) - {result['evaluation']}", expanded=False):
st.markdown(create_markdown_output(result))
if result.get("notes"):
st.write("**Notes:**", result["notes"])
            # Add individual result export (older entries may not have a test_id)
            result_id = result.get("test_id") or f"{result['timestamp']}_{result['model']}".replace("/", "_").replace(" ", "_").replace(":", "-")
            st.download_button(
                label="πŸ“₯ Export This Result",
                data=json.dumps(result, indent=2),
                file_name=f"candle_test_{result_id}.json",
                mime="application/json",
                key=f"export_{result_id}"
            )
def results_tab():
"""Content for the Results Comparison tab"""
st.title("πŸ“Š Results Comparison")
# Add cloud sync status and refresh button
col1, col2 = st.columns([3, 1])
with col1:
if st.session_state.last_cloud_sync:
st.info(f"Last synced with cloud: {st.session_state.last_cloud_sync.strftime('%Y-%m-%d %H:%M:%S UTC')}")
else:
st.warning("Not synced with cloud yet")
with col2:
if st.button("πŸ”„ Refresh Results"):
with st.spinner("Syncing with cloud..."):
load_statistics_from_nextcloud()
st.session_state.last_cloud_sync = datetime.now(timezone.utc)
st.rerun()
# Load results from cloud
results = load_statistics()
if not results:
st.info("No test results available yet. Run some tests first!")
return
# Calculate statistics per model+temperature combination
model_stats = {}
for result in results:
# Create unique key for model+temperature combination
model_key = f"{result['model']} (temp={result['temperature']:.1f})"
if model_key not in model_stats:
model_stats[model_key] = {
"total": 0,
"passed": 0,
"failed": 0,
"unclear": 0,
"modes": set()
}
# Update statistics for this configuration
stats = model_stats[model_key]
stats["total"] += 1
stats["modes"].add(result["mode"])
if "βœ…" in result["evaluation"]:
stats["passed"] += 1
elif "❌" in result["evaluation"]:
stats["failed"] += 1
else:
stats["unclear"] += 1
# Create statistics table with win ratio
stats_data = []
for model_key, stats in model_stats.items():
win_ratio, weighted_score = calculate_win_ratio(stats)
stats_data.append({
"Model Configuration": model_key,
"Total Tests": stats["total"],
"Win Ratio": f"{win_ratio:.2%}",
"Passed": f"{stats['passed']} ({stats['passed']/stats['total']*100:.1f}%)",
"Failed": f"{stats['failed']} ({stats['failed']/stats['total']*100:.1f}%)",
"Unclear": f"{stats['unclear']} ({stats['unclear']/stats['total']*100:.1f}%)",
"Modes": ", ".join(sorted(stats["modes"])),
"_weighted_score": weighted_score # Hidden column for sorting
})
# Sort by weighted score (descending)
stats_data.sort(key=lambda x: -x["_weighted_score"])
# Remove hidden column before creating DataFrame
for item in stats_data:
del item["_weighted_score"]
stats_df = pd.DataFrame(stats_data)
st.dataframe(
stats_df,
column_config={
"Model Configuration": st.column_config.TextColumn("Model Configuration", width=400),
"Total Tests": st.column_config.NumberColumn("Total Tests", width="small"),
"Win Ratio": st.column_config.TextColumn("Win Ratio", width="small"),
"Passed": st.column_config.TextColumn("βœ… Passed", width="small"),
"Failed": st.column_config.TextColumn("❌ Failed", width="small"),
"Unclear": st.column_config.TextColumn("⚠️ Unclear", width="small"),
"Modes": st.column_config.TextColumn("Mode", width="small")
},
use_container_width=True,
hide_index=True,
height=600
)
def calculate_win_ratio(stats):
"""Calculate win ratio and confidence score based on number of tests"""
total = stats["total"]
passed = stats["passed"]
# Calculate basic win ratio
win_ratio = passed / total if total > 0 else 0
# Calculate confidence factor based on number of tests (sigmoid function)
# This gives more weight to models with more tests while avoiding extreme scaling
confidence_factor = 2 / (1 + math.exp(-0.1 * total)) - 1 # Will be between 0 and 1
# Final score combines win ratio with confidence factor
weighted_score = win_ratio * confidence_factor
return win_ratio, weighted_score
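# Approximate values of the confidence factor 2 / (1 + e^(-0.1 * total)) - 1:
#   total =  5  -> ~0.24
#   total = 10  -> ~0.46
#   total = 20  -> ~0.76
#   total = 50  -> ~0.99
# so even a perfect win ratio only approaches a weighted score of 1.0 as tests accumulate.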
if __name__ == "__main__":
main()