- Pass service registry to load balancer for dependency injection - Remove dynamic imports of service registry in load balancer - Update service registration and health check logic - Enable token-service in docker-compose and service config - Add room names and rooms proxy endpoints - Improve logging for proxy requests and health checks - Update deploy script project name to sa4cps - Add test script for coroutine fix - Minor code cleanup and formatting
42 lines
1.3 KiB
Python
42 lines
1.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test script to validate that the coroutine fix works
|
|
"""
|
|
|
|
import asyncio
|
|
import sys
|
|
from unittest.mock import MagicMock, AsyncMock
|
|
|
|
# Mock the dependencies
|
|
sys.modules['aiohttp'] = MagicMock()
|
|
sys.modules['models'] = MagicMock()
|
|
sys.modules['service_registry'] = MagicMock()
|
|
sys.modules['load_balancer'] = MagicMock()
|
|
sys.modules['auth_middleware'] = MagicMock()
|
|
|
|
# Import the main module after mocking
|
|
import main
|
|
|
|
async def test_lifespan():
|
|
"""Test that the lifespan function works correctly"""
|
|
|
|
# Mock the service registry
|
|
main.service_registry.initialize = AsyncMock()
|
|
main.service_registry.register_services = AsyncMock()
|
|
main.service_registry.close = AsyncMock()
|
|
|
|
# Test the lifespan context manager
|
|
async with main.lifespan(None):
|
|
print("✅ Lifespan startup completed successfully")
|
|
|
|
# Verify that the methods were called
|
|
main.service_registry.initialize.assert_called_once()
|
|
main.service_registry.register_services.assert_called_once_with(main.SERVICES)
|
|
|
|
# Verify shutdown was called
|
|
main.service_registry.close.assert_called_once()
|
|
print("✅ Lifespan shutdown completed successfully")
|
|
print("✅ All coroutines are properly awaited - RuntimeWarning should be resolved")
|
|
|
|
if __name__ == "__main__":
|
|
asyncio.run(test_lifespan()) |