Files
rafaeldpsilva 2008ea0e70 Refactor service registry and load balancer integration
- Pass service registry to load balancer for dependency injection
- Remove dynamic imports of service registry in load balancer
- Update service registration and health check logic
- Enable token-service in docker-compose and service config
- Add room names and rooms proxy endpoints
- Improve logging for proxy requests and health checks
- Update deploy script project name to sa4cps
- Add test script for coroutine fix
- Minor code cleanup and formatting
2025-09-22 15:13:06 +01:00

131 lines
5.7 KiB
Python

"""
Load balancer for distributing requests across service instances
"""
import random
from typing import List, Dict, Optional
import logging
logger = logging.getLogger(__name__)


class LoadBalancer:
    """Simple load balancer for microservice requests.

    Instance URLs are kept per service name in insertion order, together
    with a round-robin cursor per service. When no instance is registered
    locally for a service (or the ``"single"`` strategy is used), lookups
    are delegated to the optional ``service_registry``, which must expose
    an awaitable ``get_service_url(service_name)``.
    """

    def __init__(self, service_registry=None):
        """Create an empty balancer.

        Args:
            service_registry: Optional object with an awaitable
                ``get_service_url(service_name)`` method, used by the
                ``"single"`` strategy and as the fallback when a service
                has no locally registered instances.
        """
        # service name -> registered instance URLs, in registration order
        self.service_instances: Dict[str, List[str]] = {}
        # service name -> position of the next round-robin pick
        self.current_index: Dict[str, int] = {}
        self.service_registry = service_registry

    def register_service_instance(self, service_name: str, instance_url: str):
        """Register ``instance_url`` for ``service_name``.

        Registering a URL that is already present for the service is a
        no-op (nothing is appended and nothing is logged).
        """
        instances = self.service_instances.setdefault(service_name, [])
        # Guarantee a cursor exists without disturbing an existing one.
        self.current_index.setdefault(service_name, 0)
        if instance_url in instances:
            return
        instances.append(instance_url)
        logger.info(
            "Registered instance %s for service %s", instance_url, service_name
        )

    def unregister_service_instance(self, service_name: str, instance_url: str):
        """Remove ``instance_url`` from ``service_name``.

        An unknown service is ignored silently; an unknown URL for a known
        service is logged as a warning. The service key is kept even when
        its last instance is removed.
        """
        instances = self.service_instances.get(service_name)
        if instances is None:
            return
        try:
            removed_at = instances.index(instance_url)
        except ValueError:
            logger.warning(
                "Instance %s not found for service %s", instance_url, service_name
            )
            return

        del instances[removed_at]
        logger.info(
            "Unregistered instance %s for service %s", instance_url, service_name
        )

        cursor = self.current_index.get(service_name, 0)
        if removed_at < cursor:
            # Everything after the removed slot moved left by one; follow
            # it so the next pick is the instance that was already due.
            cursor -= 1
        if cursor >= len(instances):
            # Ran off the end (or the list is now empty): wrap to the start.
            cursor = 0
        self.current_index[service_name] = cursor

    async def get_service_url(self, service_name: str, strategy: str = "single") -> Optional[str]:
        """Get a service URL using the specified load balancing strategy.

        Strategies:
        - single: ask the service registry (default; locally registered
          instances are not consulted)
        - round_robin: rotate across locally registered instances
        - random: pick a locally registered instance at random

        Returns:
            The selected URL, or ``None`` when the strategy is unknown or
            when the registry is needed but was not provided. Whatever the
            registry returns is passed through unchanged.
        """
        if strategy == "single":
            return await self._resolve_from_registry(
                service_name, "No service registry available"
            )
        if strategy == "round_robin":
            return await self._round_robin_select(service_name)
        if strategy == "random":
            return await self._random_select(service_name)
        logger.error("Unknown load balancing strategy: %s", strategy)
        return None

    async def _resolve_from_registry(self, service_name: str, missing_message: str) -> Optional[str]:
        """Delegate the lookup to the registry, or log ``missing_message``."""
        # Identity check: a registry object that happens to be falsy
        # (e.g. defines __len__ and is empty) is still a usable registry.
        if self.service_registry is None:
            logger.error("%s", missing_message)
            return None
        return await self.service_registry.get_service_url(service_name)

    async def _round_robin_select(self, service_name: str) -> Optional[str]:
        """Select the next instance in rotation, falling back to the registry."""
        instances = self.service_instances.get(service_name, [])
        if not instances:
            return await self._resolve_from_registry(
                service_name, "No service registry available for fallback"
            )

        # The modulo keeps the cursor valid even if the instance list was
        # changed without going through register/unregister.
        current_idx = self.current_index.get(service_name, 0) % len(instances)
        selected_instance = instances[current_idx]
        self.current_index[service_name] = (current_idx + 1) % len(instances)

        logger.debug("Round-robin selected %s for %s", selected_instance, service_name)
        return selected_instance

    async def _random_select(self, service_name: str) -> Optional[str]:
        """Select an instance uniformly at random, falling back to the registry."""
        instances = self.service_instances.get(service_name, [])
        if not instances:
            return await self._resolve_from_registry(
                service_name, "No service registry available for fallback"
            )

        selected_instance = random.choice(instances)
        logger.debug("Random selected %s for %s", selected_instance, service_name)
        return selected_instance

    def get_service_instances(self, service_name: str) -> List[str]:
        """Return a snapshot of the instances registered for a service.

        The returned list is a copy, so callers may mutate it or iterate it
        while registering/unregistering without affecting the balancer.
        """
        return list(self.service_instances.get(service_name, []))

    def get_instance_count(self, service_name: str) -> int:
        """Return the number of instances registered for a service."""
        return len(self.service_instances.get(service_name, []))

    def get_all_services(self) -> Dict[str, List[str]]:
        """Return a snapshot of every service and its instances.

        Both the mapping and each instance list are copies.
        """
        return {
            name: list(instances)
            for name, instances in self.service_instances.items()
        }

    def health_check_failed(self, service_name: str, instance_url: str):
        """Record a failed health check for an instance.

        Currently this only logs a warning; the instance stays registered
        and remains eligible for selection.
        """
        logger.warning("Health check failed for %s (%s)", instance_url, service_name)

    def health_check_recovered(self, service_name: str, instance_url: str):
        """Record a recovered health check for an instance.

        Currently this only logs; nothing is re-registered because
        ``health_check_failed`` never removes the instance.
        """
        logger.info("Health check recovered for %s (%s)", instance_url, service_name)