# Page-header residue from the hosting site (not part of the script):
# Spaces: Sleeping / Sleeping
"""
Automated Session Isolation Tester
===================================
This script simulates multiple users and verifies that their sessions
are properly isolated. Run this before manual testing.

Usage:
    python test_isolation_automated.py
"""
import sys
import traceback

from session_helpers import get_current_username
from user_session_manager import session_manager, SessionKeys
class MockRequest:
    """Minimal stand-in for a Gradio request object, for use in tests.

    Only the ``username`` attribute is provided.

    Args:
        username: Value to expose as ``self.username``.
    """

    def __init__(self, username):
        self.username = username
def test_basic_isolation():
    """Check that data stored for one user is not returned for another.

    Stores a different chat history under ``SessionKeys.SIMPLE_CHAT_HISTORY``
    for ``"user1"`` and ``"user2"``, reads both back and prints them.

    Returns:
        bool: True when each user reads back exactly the history stored for
        that user; False otherwise.
    """
    # NOTE(review): the leading glyphs in these messages look like
    # mis-decoded emoji; they are kept verbatim.
    print("\nπ§ͺ TEST 1: Basic Session Isolation")
    print("-" * 50)

    user1_expected = [["Hi", "Hello User 1!"]]
    user2_expected = [["Hey", "Hello User 2!"]]

    # Each user stores different data under the same key.
    session_manager.set_user_data("user1", SessionKeys.SIMPLE_CHAT_HISTORY, user1_expected)
    session_manager.set_user_data("user2", SessionKeys.SIMPLE_CHAT_HISTORY, user2_expected)

    user1_history = session_manager.get_user_data("user1", SessionKeys.SIMPLE_CHAT_HISTORY)
    user2_history = session_manager.get_user_data("user2", SessionKeys.SIMPLE_CHAT_HISTORY)
    print(f"User 1 history: {user1_history}")
    print(f"User 2 history: {user2_history}")

    if user1_history == user2_history:
        print("β FAIL: Users have same data!")
        return False

    # Differing histories are not enough: swapped or corrupted data would
    # also differ, so compare against what was actually stored.
    if user1_history != user1_expected or user2_history != user2_expected:
        print("β FAIL: A user read back data that was not stored for them")
        return False

    print("β PASS: Users have different data")
    return True
def test_concurrent_updates():
    """Check that interleaved writes for two users do not cross-contaminate.

    Clears ``"user1"`` and ``"user2"``, then alternates writes of ``key_0`` ..
    ``key_9`` between the two users and reads every key back. The writes are
    interleaved sequentially; no threads or processes are used, so this does
    not exercise true parallel access.

    Returns:
        bool: True when every key holds the value written for its own user;
        False when any value is wrong (each mismatch is printed).
    """
    # NOTE(review): the leading glyphs in these messages look like
    # mis-decoded emoji; they are kept verbatim.
    print("\nπ§ͺ TEST 2: Concurrent Updates")
    print("-" * 50)

    # (session username, label used in failure messages)
    users = (("user1", "User 1"), ("user2", "User 2"))
    key_count = 10

    # Start from a clean slate for both users.
    for username, _ in users:
        session_manager.clear_user_data(username)

    # Alternate writes between the users, key by key.
    for i in range(key_count):
        for username, _ in users:
            session_manager.set_user_data(username, f"key_{i}", f"{username}_value_{i}")

    # Verify no cross-contamination.
    passed = True
    for i in range(key_count):
        for username, label in users:
            actual = session_manager.get_user_data(username, f"key_{i}")
            if actual != f"{username}_value_{i}":
                print(f"β FAIL: {label} key_{i} has wrong value: {actual}")
                passed = False

    if passed:
        print("β PASS: All concurrent updates isolated correctly")
    return passed
def test_user_cleanup():
    """Check that clearing one user's data leaves another user's data intact.

    Stores a value under ``"data"`` for ``"user1"`` and ``"user2"``, clears
    ``"user1"``, then reads both values back.

    Returns:
        bool: True when ``"user1"`` no longer has the value and ``"user2"``
        still reads ``"user2_data"``; False otherwise.
    """
    # NOTE(review): the leading glyphs in these messages look like
    # mis-decoded emoji; they are kept verbatim.
    print("\nπ§ͺ TEST 3: User Data Cleanup")
    print("-" * 50)

    # Set up data for both users.
    session_manager.set_user_data("user1", "data", "user1_data")
    session_manager.set_user_data("user2", "data", "user2_data")

    print("Before cleanup:")
    print(f" User 1 data: {session_manager.get_user_data('user1', 'data')}")
    print(f" User 2 data: {session_manager.get_user_data('user2', 'data')}")

    session_manager.clear_user_data("user1")

    # Read once so the values printed are the values checked.
    # The third argument is presumably the default returned for a missing
    # key -- TODO confirm against session_manager.get_user_data.
    user1_after = session_manager.get_user_data("user1", "data", "NONE")
    user2_after = session_manager.get_user_data("user2", "data")

    print("After clearing user1:")
    print(f" User 1 data: {user1_after}")
    print(f" User 2 data: {user2_after}")

    user1_cleared = user1_after == "NONE"
    user2_intact = user2_after == "user2_data"

    if user1_cleared and user2_intact:
        print("β PASS: User 1 cleared, User 2 intact")
        return True

    print("β FAIL: Cleanup affected wrong user")
    return False
def test_chat_history_simulation():
    """Simulate two users chatting and check their histories stay separate.

    Clears ``"alice"`` and ``"bob"``, stores a two-message history for each
    under ``SessionKeys.SIMPLE_CHAT_HISTORY``, reads both back and prints
    them.

    Returns:
        bool: True when each user reads back their own history and the two
        histories differ; False otherwise, including when a history is
        missing.
    """
    # NOTE(review): the leading glyphs in these messages look like
    # mis-decoded emoji; they are kept verbatim.
    print("\nπ§ͺ TEST 4: Chat History Simulation")
    print("-" * 50)

    # Clear previous data.
    session_manager.clear_user_data("alice")
    session_manager.clear_user_data("bob")

    alice_history = [
        ["Hi", "Hello Alice!"],
        ["How are you?", "I'm great!"],
    ]
    session_manager.set_user_data("alice", SessionKeys.SIMPLE_CHAT_HISTORY, alice_history)

    bob_history = [
        ["Hey", "Hey Bob!"],
        ["What's new?", "Not much!"],
    ]
    session_manager.set_user_data("bob", SessionKeys.SIMPLE_CHAT_HISTORY, bob_history)

    retrieved_alice = session_manager.get_user_data("alice", SessionKeys.SIMPLE_CHAT_HISTORY)
    retrieved_bob = session_manager.get_user_data("bob", SessionKeys.SIMPLE_CHAT_HISTORY)

    # A missing history (None) is reported as 0 messages and then fails the
    # comparison below, instead of raising TypeError from len().
    alice_count = len(retrieved_alice) if retrieved_alice is not None else 0
    bob_count = len(retrieved_bob) if retrieved_bob is not None else 0

    print(f"Alice's chat: {alice_count} messages")
    print(f"Bob's chat: {bob_count} messages")
    print(f"Alice sees: {retrieved_alice}")
    print(f"Bob sees: {retrieved_bob}")

    alice_correct = retrieved_alice == alice_history
    bob_correct = retrieved_bob == bob_history
    no_overlap = retrieved_alice != retrieved_bob

    if alice_correct and bob_correct and no_overlap:
        print("β PASS: Chat histories properly isolated")
        return True

    print("β FAIL: Chat history isolation broken")
    return False
def test_session_stats():
    """Check that users with stored data show up in the active-user listing.

    Stores one value for each of ``testuser1`` .. ``testuser3``, then asks
    ``session_manager.get_active_users()`` for the active users and prints
    the per-user stats from ``session_manager.get_user_stats()``.

    Returns:
        bool: True when all three test users are reported as active; False
        otherwise.
    """
    # NOTE(review): the leading glyphs in these messages look like
    # mis-decoded emoji; they are kept verbatim.
    print("\nπ§ͺ TEST 5: Session Statistics")
    print("-" * 50)

    expected_users = ("testuser1", "testuser2", "testuser3")
    for username in expected_users:
        session_manager.set_user_data(username, "key1", "value1")

    # Assumes get_active_users() returns a collection of usernames --
    # TODO confirm against user_session_manager.
    active_users = session_manager.get_active_users()
    print(f"Active users: {active_users}")

    # Counting active users would pass even if none of *these* users were
    # tracked, because other tests in this file also create users. Check
    # for the specific users created above instead.
    missing_users = [name for name in expected_users if name not in active_users]
    if missing_users:
        print("β FAIL: Session tracking not working")
        return False

    print("β PASS: Session tracking working")
    for username in expected_users:
        stats = session_manager.get_user_stats(username)
        print(f" {username}: {stats}")
    return True
def run_all_tests():
    """Run every isolation test in order and print a summary.

    A test that raises is reported with its traceback, counted as failed,
    and does not stop the remaining tests.

    Returns:
        int: 0 when every test passed, 1 otherwise (used as the process exit
        code by the script entry point).
    """
    # NOTE(review): the leading glyphs in these messages look like
    # mis-decoded emoji; they are kept verbatim.
    print("\n" + "=" * 60)
    print("π SESSION ISOLATION AUTOMATED TEST SUITE")
    print("=" * 60)

    tests = [
        ("Basic Isolation", test_basic_isolation),
        ("Concurrent Updates", test_concurrent_updates),
        ("User Cleanup", test_user_cleanup),
        ("Chat History Simulation", test_chat_history_simulation),
        ("Session Statistics", test_session_stats),
    ]

    results = []
    for name, test_func in tests:
        try:
            passed = bool(test_func())
        except Exception as e:
            # Top-level boundary: keep running the remaining tests, but show
            # where the failure came from.
            print(f"β EXCEPTION in {name}: {e}")
            print(traceback.format_exc())
            passed = False
        results.append((name, passed))

    # Summary
    print("\n" + "=" * 60)
    print("π TEST SUMMARY")
    print("=" * 60)

    passed_count = sum(passed for _, passed in results)
    total_count = len(results)

    for name, passed in results:
        status = "β PASS" if passed else "β FAIL"
        print(f"{status}: {name}")

    print(f"\nπ Results: {passed_count}/{total_count} tests passed")

    if passed_count == total_count:
        print("\nπ ALL TESTS PASSED! Session isolation is working correctly!")
        print("\nβ You can now proceed to manual testing with real browsers.")
        return 0

    print(f"\nβ οΈ {total_count - passed_count} test(s) failed.")
    print("\nβ Fix these issues before deploying.")
    return 1
# Script entry point: run the suite, print follow-up guidance, and exit with
# the suite's status code (0 = all passed, 1 = at least one failure).
# NOTE(review): the leading glyph in the "NEXT STEPS" message looks like a
# mis-decoded emoji; it is kept verbatim.
if __name__ == "__main__":
    exit_code = run_all_tests()

    print("\n" + "=" * 60)
    print("π NEXT STEPS:")
    print("=" * 60)
    print("1. If all tests passed, proceed to manual testing:")
    print(" - See TEST_SESSION_ISOLATION.md for instructions")
    print("2. If tests failed, check the error messages above")
    print("3. Run this script again after fixes")
    print("=" * 60 + "\n")

    sys.exit(exit_code)