#!/usr/bin/env python3
"""
Enhanced Analytics Test Suite
============================
Comprehensive test suite for the enhanced analytics system including:
- Real-time metrics validation
- Trend analysis testing
- Predictive insights verification
- System health monitoring
- Document clustering analysis
- Quality assessment testing
"""
import asyncio
import sys
import os
import json
import time
from datetime import datetime, timedelta
from typing import Dict, List, Any
# Add the project root to the path so the `app` package resolves when this
# file is run directly (e.g. `python tests/test_enhanced_analytics.py`)
sys.path.append(os.path.join(os.path.dirname(__file__), '..'))
from app.services.cache_service import cache_service
from app.services.ai_service import AIScoringEngine
from app.services.database_service import DatabaseManager
from app.services.advanced_analytics_service import AdvancedAnalyticsService
class EnhancedAnalyticsTester:
"""Comprehensive tester for enhanced analytics features"""
def __init__(self):
self.analytics_service = AdvancedAnalyticsService()
self.db_manager = DatabaseManager()
self.ai_engine = AIScoringEngine()
self.test_results = {
"total_tests": 0,
"passed": 0,
"failed": 0,
"errors": []
}
async def run_all_tests(self):
"""Run all enhanced analytics tests"""
print("🚀 Enhanced Analytics Test Suite")
print("=" * 50)
try:
# Test real-time metrics
await self.test_real_time_metrics()
# Test trend analysis
await self.test_trend_analysis()
# Test predictive insights
await self.test_predictive_insights()
# Test document clustering
await self.test_document_clustering()
# Test quality assessment
await self.test_quality_assessment()
# Test system health monitoring
await self.test_system_health()
# Test similarity analysis
await self.test_similarity_analysis()
# Test cache performance
await self.test_cache_performance()
            # Generate and return the final test report
            return self.generate_test_report()
        except Exception as e:
            print(f"❌ Test suite failed: {e}")
            import traceback
            traceback.print_exc()
            return False
async def test_real_time_metrics(self):
"""Test real-time metrics functionality"""
print("\n📊 Testing Real-time Metrics...")
try:
metrics = await self.analytics_service.get_real_time_metrics()
# Validate metrics structure
required_fields = [
'total_documents', 'processed_today', 'avg_processing_time',
'success_rate', 'error_rate', 'cache_hit_rate',
'quality_score', 'system_health'
]
for field in required_fields:
if not hasattr(metrics, field):
raise ValueError(f"Missing required field: {field}")
# Validate metric ranges
assert 0 <= metrics.total_documents, "Total documents should be non-negative"
assert 0 <= metrics.processed_today, "Processed today should be non-negative"
assert 0 <= metrics.avg_processing_time, "Average processing time should be non-negative"
assert 0 <= metrics.success_rate <= 100, "Success rate should be between 0-100"
assert 0 <= metrics.error_rate <= 100, "Error rate should be between 0-100"
assert 0 <= metrics.cache_hit_rate <= 100, "Cache hit rate should be between 0-100"
assert 0 <= metrics.quality_score <= 1, "Quality score should be between 0-1"
assert 0 <= metrics.system_health <= 100, "System health should be between 0-100"
print(f"✅ Real-time metrics test passed")
print(f" - Total documents: {metrics.total_documents}")
print(f" - Processed today: {metrics.processed_today}")
print(f" - System health: {metrics.system_health:.1f}%")
self.test_results["passed"] += 1
except Exception as e:
print(f"❌ Real-time metrics test failed: {e}")
self.test_results["failed"] += 1
self.test_results["errors"].append(f"Real-time metrics: {e}")
self.test_results["total_tests"] += 1
async def test_trend_analysis(self):
"""Test trend analysis functionality"""
print("\n📈 Testing Trend Analysis...")
try:
# Test different metrics and time periods
test_cases = [
{"metric": "processing_time", "time_period": "7d"},
{"metric": "quality_score", "time_period": "30d"},
{"metric": "document_volume", "time_period": "90d"}
]
for test_case in test_cases:
trend_data = await self.analytics_service.analyze_trends(
metric=test_case["metric"],
time_period=test_case["time_period"]
)
                # Validate trend data structure
                for field in ('period', 'metric', 'values', 'timestamps',
                              'trend_direction', 'change_percentage', 'confidence'):
                    assert hasattr(trend_data, field), f"Missing {field} field"
# Validate trend direction
assert trend_data.trend_direction in ['up', 'down', 'stable'], \
f"Invalid trend direction: {trend_data.trend_direction}"
# Validate confidence range
assert 0 <= trend_data.confidence <= 100, \
f"Confidence should be between 0-100, got: {trend_data.confidence}"
print(f" ✅ {test_case['metric']} ({test_case['time_period']}): "
f"{trend_data.trend_direction} ({trend_data.change_percentage:.1f}%)")
print("✅ Trend analysis test passed")
self.test_results["passed"] += 1
except Exception as e:
print(f"❌ Trend analysis test failed: {e}")
self.test_results["failed"] += 1
self.test_results["errors"].append(f"Trend analysis: {e}")
self.test_results["total_tests"] += 1
async def test_predictive_insights(self):
"""Test predictive insights functionality"""
print("\n🔮 Testing Predictive Insights...")
try:
insights = await self.analytics_service.generate_predictive_insights()
# Validate insights structure
required_sections = ['patterns', 'predictions',
'confidence_intervals', 'recommendations']
for section in required_sections:
assert section in insights, f"Missing section: {section}"
# Validate predictions
predictions = insights.get('predictions', {})
if predictions:
                assert isinstance(predictions, dict), "Predictions should be a dictionary"
# Check for expected prediction fields
expected_fields = ['peak_hours', 'expected_volume',
'processing_time_forecast', 'quality_forecast']
for field in expected_fields:
if field in predictions:
assert isinstance(predictions[field], (int, float, list)), \
f"Prediction field {field} should be numeric or list"
# Validate recommendations
recommendations = insights.get('recommendations', [])
            assert isinstance(recommendations, list), "Recommendations should be a list"
print(f"✅ Predictive insights test passed")
print(
f" - Patterns analyzed: {len(insights.get('patterns', {}))}")
print(f" - Recommendations generated: {len(recommendations)}")
self.test_results["passed"] += 1
except Exception as e:
print(f"❌ Predictive insights test failed: {e}")
self.test_results["failed"] += 1
self.test_results["errors"].append(f"Predictive insights: {e}")
self.test_results["total_tests"] += 1
async def test_document_clustering(self):
"""Test document clustering functionality"""
print("\n🔗 Testing Document Clustering...")
try:
# Test clustering with different parameters
test_cases = [
{"n_clusters": 3, "category": None},
{"n_clusters": 5, "category": "legal"},
{"n_clusters": 2, "category": "contract"}
]
for test_case in test_cases:
clustering_result = await self.analytics_service.cluster_documents(
n_clusters=test_case["n_clusters"],
category=test_case["category"]
)
# Validate clustering result structure
assert 'clusters' in clustering_result, "Missing clusters field"
assert 'centroids' in clustering_result, "Missing centroids field"
assert 'silhouette_score' in clustering_result, "Missing silhouette_score field"
assert 'total_documents' in clustering_result, "Missing total_documents field"
# Validate silhouette score range
silhouette_score = clustering_result['silhouette_score']
assert -1 <= silhouette_score <= 1, \
f"Silhouette score should be between -1 and 1, got: {silhouette_score}"
# Validate clusters
clusters = clustering_result['clusters']
                assert isinstance(clusters, dict), "Clusters should be a dictionary"
print(f" ✅ {test_case['n_clusters']} clusters "
f"(silhouette: {silhouette_score:.3f})")
print("✅ Document clustering test passed")
self.test_results["passed"] += 1
except Exception as e:
print(f"❌ Document clustering test failed: {e}")
self.test_results["failed"] += 1
self.test_results["errors"].append(f"Document clustering: {e}")
self.test_results["total_tests"] += 1
async def test_quality_assessment(self):
"""Test quality assessment functionality"""
print("\n🏆 Testing Quality Assessment...")
try:
quality_report = await self.analytics_service.generate_quality_report()
# Validate quality report structure
required_fields = [
'overall_quality_score', 'quality_distribution', 'common_issues',
'recommendations', 'quality_trends', 'improvement_opportunities'
]
for field in required_fields:
assert field in quality_report, f"Missing field: {field}"
# Validate quality score range
overall_score = quality_report['overall_quality_score']
assert 0 <= overall_score <= 1, \
f"Overall quality score should be between 0-1, got: {overall_score}"
            # Validate quality distribution
            quality_dist = quality_report['quality_distribution']
            assert isinstance(quality_dist, dict), "Quality distribution should be a dictionary"
            # Validate common issues
            common_issues = quality_report['common_issues']
            assert isinstance(common_issues, list), "Common issues should be a list"
            # Validate recommendations
            recommendations = quality_report['recommendations']
            assert isinstance(recommendations, list), "Recommendations should be a list"
print(f"✅ Quality assessment test passed")
print(f" - Overall quality: {overall_score:.1%}")
print(f" - Common issues: {len(common_issues)}")
print(f" - Recommendations: {len(recommendations)}")
self.test_results["passed"] += 1
except Exception as e:
print(f"❌ Quality assessment test failed: {e}")
self.test_results["failed"] += 1
self.test_results["errors"].append(f"Quality assessment: {e}")
self.test_results["total_tests"] += 1
async def test_system_health(self):
"""Test system health monitoring"""
print("\n💚 Testing System Health Monitoring...")
try:
# Get real-time metrics for health calculation
metrics = await self.analytics_service.get_real_time_metrics()
# Validate system health calculation
system_health = metrics.system_health
assert 0 <= system_health <= 100, \
f"System health should be between 0-100, got: {system_health}"
# Test health calculation logic
calculated_health = self.analytics_service._calculate_system_health({
'success_rate': metrics.success_rate,
'quality_metrics': {'average_quality': metrics.quality_score},
'error_rate': metrics.error_rate
})
assert 0 <= calculated_health <= 100, \
f"Calculated health should be between 0-100, got: {calculated_health}"
print(f"✅ System health test passed")
print(f" - System health: {system_health:.1f}%")
print(f" - Success rate: {metrics.success_rate:.1f}%")
print(f" - Quality score: {metrics.quality_score:.3f}")
self.test_results["passed"] += 1
except Exception as e:
print(f"❌ System health test failed: {e}")
self.test_results["failed"] += 1
self.test_results["errors"].append(f"System health: {e}")
self.test_results["total_tests"] += 1
async def test_similarity_analysis(self):
"""Test document similarity analysis"""
print("\n🔍 Testing Similarity Analysis...")
try:
# Get a sample document ID for testing
documents = self.db_manager.get_all_documents()
if not documents:
print(" ⚠️ No documents available for similarity testing")
self.test_results["passed"] += 1
return
test_document_id = documents[0]['id']
# Test similarity analysis
similar_docs = await self.analytics_service.find_similar_documents(
document_id=test_document_id,
threshold=0.7,
limit=5
)
# Validate similarity results
            assert isinstance(similar_docs, list), "Similar documents should be a list"
for doc in similar_docs:
                for field in ('document_id', 'similarity_score', 'common_entities',
                              'shared_topics', 'relevance_score'):
                    assert hasattr(doc, field), f"Missing {field} field"
# Validate similarity score range
assert 0 <= doc.similarity_score <= 1, \
f"Similarity score should be between 0-1, got: {doc.similarity_score}"
# Validate relevance score range
assert 0 <= doc.relevance_score <= 1, \
f"Relevance score should be between 0-1, got: {doc.relevance_score}"
print(f"✅ Similarity analysis test passed")
print(f" - Similar documents found: {len(similar_docs)}")
if similar_docs:
print(
f" - Average similarity: {sum(d.similarity_score for d in similar_docs) / len(similar_docs):.3f}")
self.test_results["passed"] += 1
except Exception as e:
print(f"❌ Similarity analysis test failed: {e}")
self.test_results["failed"] += 1
self.test_results["errors"].append(f"Similarity analysis: {e}")
self.test_results["total_tests"] += 1
async def test_cache_performance(self):
"""Test cache performance and efficiency"""
print("\n⚡ Testing Cache Performance...")
try:
# Test cache operations
test_key = "test_analytics_key"
            test_data = {"test": "data", "timestamp": datetime.now().isoformat()}
# Set cache data
await cache_service.set(test_key, test_data, expire=60)
# Get cache data
cached_data = await cache_service.get(test_key)
assert cached_data == test_data, "Cached data should match original data"
# Test cache stats
cache_stats = await cache_service.get_stats()
            assert isinstance(cache_stats, dict), "Cache stats should be a dictionary"
# Validate cache hit rate calculation
if 'hit_rate' in cache_stats:
hit_rate = cache_stats['hit_rate']
assert 0 <= hit_rate <= 100, \
f"Cache hit rate should be between 0-100, got: {hit_rate}"
print(f"✅ Cache performance test passed")
print(
f" - Cache hit rate: {cache_stats.get('hit_rate', 0):.1f}%")
self.test_results["passed"] += 1
except Exception as e:
print(f"❌ Cache performance test failed: {e}")
self.test_results["failed"] += 1
self.test_results["errors"].append(f"Cache performance: {e}")
self.test_results["total_tests"] += 1
def generate_test_report(self):
"""Generate comprehensive test report"""
print("\n" + "=" * 50)
print("📋 Enhanced Analytics Test Report")
print("=" * 50)
success_rate = (self.test_results["passed"] / self.test_results["total_tests"] * 100) \
if self.test_results["total_tests"] > 0 else 0
print(f"📊 Test Summary:")
print(f" - Total tests: {self.test_results['total_tests']}")
print(f" - Passed: {self.test_results['passed']}")
print(f" - Failed: {self.test_results['failed']}")
print(f" - Success rate: {success_rate:.1f}%")
if self.test_results["errors"]:
print(f"\n❌ Errors encountered:")
for error in self.test_results["errors"]:
print(f" - {error}")
# Save test results
test_report = {
"timestamp": datetime.now().isoformat(),
"test_results": self.test_results,
"success_rate": success_rate
}
with open("enhanced_analytics_test_report.json", "w", encoding="utf-8") as f:
json.dump(test_report, f, indent=2, ensure_ascii=False)
print(f"\n📄 Test report saved to: enhanced_analytics_test_report.json")
if success_rate >= 90:
print("🎉 All tests passed! Enhanced analytics system is working correctly.")
elif success_rate >= 70:
print("⚠️ Most tests passed. Some issues need attention.")
else:
print("❌ Multiple test failures detected. System needs fixes.")
return success_rate >= 90
async def main():
"""Main test runner"""
tester = EnhancedAnalyticsTester()
success = await tester.run_all_tests()
return 0 if success else 1
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)