# IDAgentsFreshTest / test_isolation_automated.py
# IDAgents Developer
# Add comprehensive session isolation testing suite
# 48d481d
# raw
# history blame
# 8.28 kB
"""
Automated Session Isolation Tester
===================================
This script simulates multiple users and verifies that their sessions
are properly isolated. Run this before manual testing.
Usage:
python test_isolation_automated.py
"""
from user_session_manager import session_manager, SessionKeys
from session_helpers import get_current_username
import sys
class MockRequest:
    """Mock Gradio request object for testing.

    Exposes only a ``username`` attribute, set from the constructor argument.

    Args:
        username: Value stored as-is on ``self.username``; it is not
            validated or normalised.
    """

    def __init__(self, username):
        self.username = username

    def __repr__(self):
        # Makes mock requests readable when they show up in test output.
        return f"{type(self).__name__}(username={self.username!r})"
def test_basic_isolation():
    """Check that two users' chat histories are stored and read back separately.

    Writes a different ``SessionKeys.SIMPLE_CHAT_HISTORY`` value for "user1"
    and "user2" through ``session_manager``, reads both back and prints them.

    Returns:
        bool: True only when the two histories differ *and* each user reads
        back exactly what was written for that user; False otherwise.
    """
    print("\n🧪 TEST 1: Basic Session Isolation")
    print("-" * 50)

    # Simulate two users; the usernames used below come from these mocks.
    user1_req = MockRequest("user1")
    user2_req = MockRequest("user2")

    user1_expected = [["Hi", "Hello User 1!"]]
    user2_expected = [["Hey", "Hello User 2!"]]

    # Fresh literals are stored (not the ``*_expected`` lists themselves) so
    # the comparison below cannot succeed merely through object aliasing.
    session_manager.set_user_data(
        user1_req.username,
        SessionKeys.SIMPLE_CHAT_HISTORY,
        [["Hi", "Hello User 1!"]],
    )
    session_manager.set_user_data(
        user2_req.username,
        SessionKeys.SIMPLE_CHAT_HISTORY,
        [["Hey", "Hello User 2!"]],
    )

    user1_history = session_manager.get_user_data(
        user1_req.username, SessionKeys.SIMPLE_CHAT_HISTORY
    )
    user2_history = session_manager.get_user_data(
        user2_req.username, SessionKeys.SIMPLE_CHAT_HISTORY
    )
    print(f"User 1 history: {user1_history}")
    print(f"User 2 history: {user2_history}")

    if user1_history == user2_history:
        print("❌ FAIL: Users have same data!")
        return False

    # Differing values alone are not enough: one side could be None or hold
    # corrupted data. Each user must see exactly what was stored for them.
    passed = True
    if user1_history != user1_expected:
        print(f"❌ FAIL: User 1 history does not match stored data: {user1_history}")
        passed = False
    if user2_history != user2_expected:
        print(f"❌ FAIL: User 2 history does not match stored data: {user2_history}")
        passed = False

    if passed:
        print("✅ PASS: Users have different data")
    return passed
def test_concurrent_updates():
    """Check that interleaved writes for two users do not overwrite each other.

    Clears "user1" and "user2", then writes ``key_0`` .. ``key_9`` for both
    users, alternating between them on every key. Despite the name, the
    writes are issued sequentially from a plain loop; no threads are used.

    Returns:
        bool: True if every key reads back the value written for that same
        user, False if any value is wrong (each mismatch is printed).
    """
    print("\n🧪 TEST 2: Concurrent Updates")
    print("-" * 50)

    # username -> label used in failure messages
    labels = {"user1": "User 1", "user2": "User 2"}

    # Clear previous data so earlier tests cannot influence the result.
    for username in labels:
        session_manager.clear_user_data(username)

    # Alternate between the users on every key (user1 first, then user2).
    for i in range(10):
        for username in labels:
            session_manager.set_user_data(
                username, f"key_{i}", f"{username}_value_{i}"
            )

    # Verify no cross-contamination.
    passed = True
    for i in range(10):
        for username, label in labels.items():
            actual = session_manager.get_user_data(username, f"key_{i}")
            if actual != f"{username}_value_{i}":
                print(f"❌ FAIL: {label} key_{i} has wrong value: {actual}")
                passed = False

    if passed:
        print("✅ PASS: All concurrent updates isolated correctly")
    return passed
def test_user_cleanup():
    """Check that clearing one user's data leaves another user's data alone.

    Stores a ``"data"`` value for "user1" and "user2", clears "user1" via
    ``session_manager.clear_user_data`` and then reads both values back.

    Returns:
        bool: True if user1's value is gone and user2's value is still
        ``"user2_data"``; False otherwise.
    """
    print("\n🧪 TEST 3: User Data Cleanup")
    print("-" * 50)

    # Set up data for both users.
    session_manager.set_user_data("user1", "data", "user1_data")
    session_manager.set_user_data("user2", "data", "user2_data")
    print("Before cleanup:")
    print(f" User 1 data: {session_manager.get_user_data('user1', 'data')}")
    print(f" User 2 data: {session_manager.get_user_data('user2', 'data')}")

    session_manager.clear_user_data("user1")

    # The third argument is presumably the fallback returned for a missing
    # key (confirm against user_session_manager); "NONE" marks "cleared".
    user1_after = session_manager.get_user_data("user1", "data", "NONE")
    user2_after = session_manager.get_user_data("user2", "data")
    print("After clearing user1:")
    print(f" User 1 data: {user1_after}")
    print(f" User 2 data: {user2_after}")

    user1_cleared = user1_after == "NONE"
    user2_intact = user2_after == "user2_data"

    # Report the two failure modes separately so the message matches what
    # actually went wrong.
    if not user1_cleared:
        print("❌ FAIL: User 1 data was not cleared")
    if not user2_intact:
        print("❌ FAIL: Cleanup affected wrong user")

    if user1_cleared and user2_intact:
        print("✅ PASS: User 1 cleared, User 2 intact")
        return True
    return False
def test_chat_history_simulation():
    """Simulate two users chatting and check their histories stay separate.

    Clears "alice" and "bob", stores a two-message history for each under
    ``SessionKeys.SIMPLE_CHAT_HISTORY`` and reads both back.

    Returns:
        bool: True if each user reads back their own history and the two
        histories differ; False otherwise.
    """
    print("\n🧪 TEST 4: Chat History Simulation")
    print("-" * 50)

    # Clear previous data.
    session_manager.clear_user_data("alice")
    session_manager.clear_user_data("bob")

    alice_history = [
        ["Hi", "Hello Alice!"],
        ["How are you?", "I'm great!"],
    ]
    session_manager.set_user_data(
        "alice", SessionKeys.SIMPLE_CHAT_HISTORY, alice_history
    )

    bob_history = [
        ["Hey", "Hey Bob!"],
        ["What's new?", "Not much!"],
    ]
    session_manager.set_user_data(
        "bob", SessionKeys.SIMPLE_CHAT_HISTORY, bob_history
    )

    retrieved_alice = session_manager.get_user_data(
        "alice", SessionKeys.SIMPLE_CHAT_HISTORY
    )
    retrieved_bob = session_manager.get_user_data(
        "bob", SessionKeys.SIMPLE_CHAT_HISTORY
    )

    # A lost history may come back as None; count it as zero messages so the
    # test reports a normal FAIL instead of raising TypeError from len().
    alice_count = len(retrieved_alice) if retrieved_alice is not None else 0
    bob_count = len(retrieved_bob) if retrieved_bob is not None else 0
    print(f"Alice's chat: {alice_count} messages")
    print(f"Bob's chat: {bob_count} messages")
    print(f"Alice sees: {retrieved_alice}")
    print(f"Bob sees: {retrieved_bob}")

    alice_correct = retrieved_alice == alice_history
    bob_correct = retrieved_bob == bob_history
    no_overlap = retrieved_alice != retrieved_bob

    if alice_correct and bob_correct and no_overlap:
        print("✅ PASS: Chat histories properly isolated")
        return True
    print("❌ FAIL: Chat history isolation broken")
    return False
def test_session_stats():
    """Check that users who stored data are reported as active.

    Stores one value for each of "testuser1".."testuser3", then asks
    ``session_manager.get_active_users()`` for the active users and prints
    ``session_manager.get_user_stats()`` for each of the three.

    Returns:
        bool: True if all three test users appear among the active users;
        False otherwise.
    """
    print("\n🧪 TEST 5: Session Statistics")
    print("-" * 50)

    expected_users = ("testuser1", "testuser2", "testuser3")
    for username in expected_users:
        session_manager.set_user_data(username, "key1", "value1")

    active_users = session_manager.get_active_users()
    print(f"Active users: {active_users}")

    # Check for these specific users rather than a bare count: users left
    # behind by earlier tests would satisfy "at least 3" on their own.
    # Assumes get_active_users() yields usernames and supports ``in``
    # (confirm against user_session_manager).
    known = active_users if active_users is not None else ()
    missing = [username for username in expected_users if username not in known]

    if missing:
        print("❌ FAIL: Session tracking not working")
        print(f" Missing users: {missing}")
        return False

    print("✅ PASS: Session tracking working")
    for username in expected_users:
        stats = session_manager.get_user_stats(username)
        print(f" {username}: {stats}")
    return True
def run_all_tests():
    """Run every test in this file and print a summary.

    Each test is called in a fixed order. A test that raises is reported and
    counted as failed so the remaining tests still run.

    Returns:
        int: 0 if every test passed, 1 otherwise (used as the exit code).
    """
    print("\n" + "=" * 60)
    print("🚀 SESSION ISOLATION AUTOMATED TEST SUITE")
    print("=" * 60)

    tests = [
        ("Basic Isolation", test_basic_isolation),
        ("Concurrent Updates", test_concurrent_updates),
        ("User Cleanup", test_user_cleanup),
        ("Chat History Simulation", test_chat_history_simulation),
        ("Session Statistics", test_session_stats),
    ]

    results = []
    for name, test_func in tests:
        try:
            outcome = test_func()
        except Exception as e:
            # Broad catch is deliberate: this is the top-level runner and one
            # crashing test must not stop the rest. The type name is printed
            # because some exceptions carry an empty message.
            print(f"❌ EXCEPTION in {name}: {type(e).__name__}: {e}")
            results.append((name, False))
        else:
            # Anything falsy (including a forgotten return) counts as failed.
            results.append((name, bool(outcome)))

    # Summary
    print("\n" + "=" * 60)
    print("📊 TEST SUMMARY")
    print("=" * 60)

    for name, ok in results:
        status = "✅ PASS" if ok else "❌ FAIL"
        print(f"{status}: {name}")

    passed_count = sum(1 for _, ok in results if ok)
    total_count = len(results)
    print(f"\n📈 Results: {passed_count}/{total_count} tests passed")

    if passed_count == total_count:
        print("\n🎉 ALL TESTS PASSED! Session isolation is working correctly!")
        print("\n✅ You can now proceed to manual testing with real browsers.")
        return 0

    print(f"\n⚠️ {total_count - passed_count} test(s) failed.")
    print("\n❌ Fix these issues before deploying.")
    return 1
if __name__ == "__main__":
    # Run the suite first; its return value becomes the process exit code
    # once the follow-up instructions have been printed.
    exit_code = run_all_tests()

    separator = "=" * 60
    print("\n" + separator)
    print("📝 NEXT STEPS:")
    print(separator)
    print("1. If all tests passed, proceed to manual testing:")
    print(" - See TEST_SESSION_ISOLATION.md for instructions")
    print("2. If tests failed, check the error messages above")
    print("3. Run this script again after fixes")
    print(separator + "\n")

    sys.exit(exit_code)