128 lines
4.9 KiB
Python
128 lines
4.9 KiB
Python
"""
|
|
Redis subscriber for real-time data processing
|
|
Presentation Layer - handles Redis pub/sub and WebSocket broadcasting
|
|
"""
|
|
import asyncio
|
|
import logging
|
|
|
|
from ..infrastructure.redis_connection import redis_connection
|
|
from ..business.sensor_service import SensorService
|
|
from ..business.room_service import RoomService
|
|
from .websocket_handler import websocket_manager
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
class RedisSubscriber:
|
|
"""Manages Redis subscription and data broadcasting"""
|
|
|
|
def __init__(self):
|
|
self.sensor_service = SensorService()
|
|
self.room_service = RoomService()
|
|
self.is_running = False
|
|
self.subscription_task = None
|
|
|
|
async def start_subscription(self, channel: str = "energy_data") -> None:
|
|
"""Start Redis subscription in background task"""
|
|
if self.is_running:
|
|
logger.warning("Redis subscriber is already running")
|
|
return
|
|
|
|
self.is_running = True
|
|
self.subscription_task = asyncio.create_task(self._subscribe_loop(channel))
|
|
logger.info(f"Started Redis subscriber for channel: {channel}")
|
|
|
|
async def stop_subscription(self) -> None:
|
|
"""Stop Redis subscription"""
|
|
self.is_running = False
|
|
if self.subscription_task:
|
|
self.subscription_task.cancel()
|
|
try:
|
|
await self.subscription_task
|
|
except asyncio.CancelledError:
|
|
pass
|
|
logger.info("Redis subscriber stopped")
|
|
|
|
async def _subscribe_loop(self, channel: str) -> None:
|
|
"""Main subscription loop"""
|
|
logger.info("Starting Redis subscriber...")
|
|
|
|
try:
|
|
# Get Redis client and create pubsub
|
|
redis_client = await redis_connection.get_client()
|
|
pubsub = await redis_connection.create_pubsub()
|
|
|
|
# Subscribe to channel
|
|
await pubsub.subscribe(channel)
|
|
logger.info(f"Subscribed to Redis channel: '{channel}'")
|
|
|
|
while self.is_running:
|
|
try:
|
|
# Get message with timeout
|
|
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
|
|
|
|
if message and message.get('data'):
|
|
await self._process_message(message['data'])
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error in Redis subscriber loop: {e}")
|
|
# Add delay to prevent rapid-fire errors
|
|
await asyncio.sleep(5)
|
|
|
|
except Exception as e:
|
|
logger.error(f"Could not connect to Redis for subscription: {e}")
|
|
finally:
|
|
# Clean up pubsub connection
|
|
try:
|
|
await pubsub.unsubscribe(channel)
|
|
await pubsub.close()
|
|
except Exception as e:
|
|
logger.error(f"Error closing pubsub connection: {e}")
|
|
|
|
async def _process_message(self, message_data: str) -> None:
|
|
"""Process incoming Redis message"""
|
|
try:
|
|
logger.debug(f"Received from Redis: {message_data}")
|
|
|
|
# Process sensor data through business layer
|
|
processing_success = await self.sensor_service.process_sensor_message(message_data)
|
|
|
|
if processing_success:
|
|
# Extract room from message for room metrics update
|
|
import json
|
|
try:
|
|
data = json.loads(message_data)
|
|
room = data.get('room')
|
|
if room:
|
|
# Update room metrics asynchronously
|
|
asyncio.create_task(self.room_service.update_room_metrics(room))
|
|
except json.JSONDecodeError:
|
|
logger.warning("Could not parse message for room extraction")
|
|
|
|
# Broadcast to WebSocket clients
|
|
await websocket_manager.broadcast(message_data)
|
|
else:
|
|
logger.warning("Sensor data processing failed, skipping broadcast")
|
|
|
|
except Exception as e:
|
|
logger.error(f"Error processing Redis message: {e}")
|
|
|
|
def is_subscriber_running(self) -> bool:
|
|
"""Check if subscriber is currently running"""
|
|
return self.is_running and (
|
|
self.subscription_task is not None and
|
|
not self.subscription_task.done()
|
|
)
|
|
|
|
async def get_subscriber_status(self) -> dict:
|
|
"""Get subscriber status information"""
|
|
return {
|
|
"is_running": self.is_running,
|
|
"task_status": (
|
|
"running" if self.subscription_task and not self.subscription_task.done()
|
|
else "stopped"
|
|
),
|
|
"active_websocket_connections": websocket_manager.get_connection_count()
|
|
}
|
|
|
|
# Global Redis subscriber instance
|
|
redis_subscriber = RedisSubscriber() |