first commit
This commit is contained in:
128
layers/presentation/redis_subscriber.py
Normal file
128
layers/presentation/redis_subscriber.py
Normal file
@@ -0,0 +1,128 @@
|
||||
"""
|
||||
Redis subscriber for real-time data processing
|
||||
Presentation Layer - handles Redis pub/sub and WebSocket broadcasting
|
||||
"""
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from ..infrastructure.redis_connection import redis_connection
|
||||
from ..business.sensor_service import SensorService
|
||||
from ..business.room_service import RoomService
|
||||
from .websocket_handler import websocket_manager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class RedisSubscriber:
|
||||
"""Manages Redis subscription and data broadcasting"""
|
||||
|
||||
def __init__(self):
|
||||
self.sensor_service = SensorService()
|
||||
self.room_service = RoomService()
|
||||
self.is_running = False
|
||||
self.subscription_task = None
|
||||
|
||||
async def start_subscription(self, channel: str = "energy_data") -> None:
|
||||
"""Start Redis subscription in background task"""
|
||||
if self.is_running:
|
||||
logger.warning("Redis subscriber is already running")
|
||||
return
|
||||
|
||||
self.is_running = True
|
||||
self.subscription_task = asyncio.create_task(self._subscribe_loop(channel))
|
||||
logger.info(f"Started Redis subscriber for channel: {channel}")
|
||||
|
||||
async def stop_subscription(self) -> None:
|
||||
"""Stop Redis subscription"""
|
||||
self.is_running = False
|
||||
if self.subscription_task:
|
||||
self.subscription_task.cancel()
|
||||
try:
|
||||
await self.subscription_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
logger.info("Redis subscriber stopped")
|
||||
|
||||
async def _subscribe_loop(self, channel: str) -> None:
|
||||
"""Main subscription loop"""
|
||||
logger.info("Starting Redis subscriber...")
|
||||
|
||||
try:
|
||||
# Get Redis client and create pubsub
|
||||
redis_client = await redis_connection.get_client()
|
||||
pubsub = await redis_connection.create_pubsub()
|
||||
|
||||
# Subscribe to channel
|
||||
await pubsub.subscribe(channel)
|
||||
logger.info(f"Subscribed to Redis channel: '{channel}'")
|
||||
|
||||
while self.is_running:
|
||||
try:
|
||||
# Get message with timeout
|
||||
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
|
||||
|
||||
if message and message.get('data'):
|
||||
await self._process_message(message['data'])
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in Redis subscriber loop: {e}")
|
||||
# Add delay to prevent rapid-fire errors
|
||||
await asyncio.sleep(5)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Could not connect to Redis for subscription: {e}")
|
||||
finally:
|
||||
# Clean up pubsub connection
|
||||
try:
|
||||
await pubsub.unsubscribe(channel)
|
||||
await pubsub.close()
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing pubsub connection: {e}")
|
||||
|
||||
async def _process_message(self, message_data: str) -> None:
|
||||
"""Process incoming Redis message"""
|
||||
try:
|
||||
logger.debug(f"Received from Redis: {message_data}")
|
||||
|
||||
# Process sensor data through business layer
|
||||
processing_success = await self.sensor_service.process_sensor_message(message_data)
|
||||
|
||||
if processing_success:
|
||||
# Extract room from message for room metrics update
|
||||
import json
|
||||
try:
|
||||
data = json.loads(message_data)
|
||||
room = data.get('room')
|
||||
if room:
|
||||
# Update room metrics asynchronously
|
||||
asyncio.create_task(self.room_service.update_room_metrics(room))
|
||||
except json.JSONDecodeError:
|
||||
logger.warning("Could not parse message for room extraction")
|
||||
|
||||
# Broadcast to WebSocket clients
|
||||
await websocket_manager.broadcast(message_data)
|
||||
else:
|
||||
logger.warning("Sensor data processing failed, skipping broadcast")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing Redis message: {e}")
|
||||
|
||||
def is_subscriber_running(self) -> bool:
|
||||
"""Check if subscriber is currently running"""
|
||||
return self.is_running and (
|
||||
self.subscription_task is not None and
|
||||
not self.subscription_task.done()
|
||||
)
|
||||
|
||||
async def get_subscriber_status(self) -> dict:
|
||||
"""Get subscriber status information"""
|
||||
return {
|
||||
"is_running": self.is_running,
|
||||
"task_status": (
|
||||
"running" if self.subscription_task and not self.subscription_task.done()
|
||||
else "stopped"
|
||||
),
|
||||
"active_websocket_connections": websocket_manager.get_connection_count()
|
||||
}
|
||||
|
||||
# Global Redis subscriber instance
|
||||
redis_subscriber = RedisSubscriber()
|
||||
Reference in New Issue
Block a user