"""
WebSocket manager for real-time sensor data broadcasting
"""

import asyncio
import json
import logging
import time
from typing import Any, Dict, Iterable, List, Optional, Set

from fastapi import WebSocket, WebSocketDisconnect

from models import SensorReading

logger = logging.getLogger(__name__)


class WebSocketManager:
    """Manages WebSocket connections for real-time data broadcasting.

    The manager keeps four pieces of state:

    * ``active_connections``   - every accepted socket, in connection order.
    * ``room_subscriptions``   - room name -> sockets subscribed to that room.
    * ``sensor_subscriptions`` - sensor id -> sockets subscribed to that sensor.
    * ``connection_metadata``  - per-socket bookkeeping (connect time,
      subscriptions, number of messages sent).
    """

    def __init__(self):
        self.active_connections: List[WebSocket] = []
        self.room_subscriptions: Dict[str, Set[WebSocket]] = {}
        self.sensor_subscriptions: Dict[str, Set[WebSocket]] = {}
        self.connection_metadata: Dict[WebSocket, Dict[str, Any]] = {}

    # ------------------------------------------------------------------
    # Connection lifecycle
    # ------------------------------------------------------------------

    async def connect(
        self,
        websocket: WebSocket,
        room: Optional[str] = None,
        sensor_id: Optional[str] = None,
    ) -> None:
        """Accept a WebSocket connection and register its subscriptions.

        Args:
            websocket: The socket to accept and track.
            room: Optional room name to subscribe the socket to.
            sensor_id: Optional sensor id to subscribe the socket to.

        After registration a ``connection_established`` message is sent to
        the client. If that send fails the socket is unregistered again by
        ``send_to_connection``.
        """
        await websocket.accept()
        self.active_connections.append(websocket)

        # Store connection metadata
        self.connection_metadata[websocket] = {
            "connected_at": time.time(),
            "room": room,
            "sensor_id": sensor_id,
            "message_count": 0,
        }

        # Empty/None values mean "no subscription of that kind".
        if room:
            self.room_subscriptions.setdefault(room, set()).add(websocket)
        if sensor_id:
            self.sensor_subscriptions.setdefault(sensor_id, set()).add(websocket)

        logger.info(
            "WebSocket client connected. Total connections: %d",
            len(self.active_connections),
        )

        # Send initial connection confirmation
        await self.send_to_connection(websocket, {
            "type": "connection_established",
            "timestamp": time.time(),
            "subscriptions": {
                "room": room,
                "sensor_id": sensor_id,
            },
            "total_connections": len(self.active_connections),
        })

    async def disconnect(self, websocket: WebSocket) -> None:
        """Remove a WebSocket connection and clean up its subscriptions.

        Safe to call more than once for the same socket. Subscription sets
        that become empty are deleted so the subscription dictionaries do not
        grow without bound as clients with distinct room/sensor keys come and
        go.
        """
        if websocket in self.active_connections:
            self.active_connections.remove(websocket)

        for subscriptions in (self.room_subscriptions, self.sensor_subscriptions):
            # Collect keys first: the dict is mutated while pruning.
            subscribed_keys = [
                key for key, members in subscriptions.items() if websocket in members
            ]
            for key in subscribed_keys:
                members = subscriptions[key]
                members.discard(websocket)
                if not members:
                    del subscriptions[key]

        # Clean up metadata
        self.connection_metadata.pop(websocket, None)

        logger.info(
            "WebSocket client disconnected. Total connections: %d",
            len(self.active_connections),
        )

    # ------------------------------------------------------------------
    # Low-level sending
    # ------------------------------------------------------------------

    def _count_message(self, websocket: WebSocket) -> None:
        """Increment the sent-message counter of a still-registered socket."""
        metadata = self.connection_metadata.get(websocket)
        if metadata is not None:
            metadata["message_count"] += 1

    async def _broadcast(
        self,
        connections: Iterable[WebSocket],
        data: Dict[str, Any],
        target: str,
    ) -> None:
        """Serialize ``data`` once and send it to every socket in ``connections``.

        Args:
            connections: Sockets to send to. A snapshot is taken up front so
                that connects/disconnects happening while a send is awaited
                cannot disturb the iteration.
            data: JSON-serializable payload.
            target: Human-readable description of the audience, used only in
                the error log message.

        Sockets whose send raises are logged and unregistered after the loop.
        """
        recipients = list(connections)
        if not recipients:
            return

        message = json.dumps(data)
        failed: List[WebSocket] = []

        for websocket in recipients:
            try:
                await websocket.send_text(message)
            except Exception as e:
                # Any failure is treated as a dead connection; one broken
                # client must not abort the broadcast to the others.
                logger.error("Error broadcasting to %s: %s", target, e)
                failed.append(websocket)
            else:
                self._count_message(websocket)

        # Clean up disconnected connections
        for websocket in failed:
            await self.disconnect(websocket)

    async def send_to_connection(self, websocket: WebSocket, data: Dict[str, Any]) -> None:
        """Send data to a specific WebSocket connection.

        On any failure (including a payload that cannot be JSON-encoded) the
        error is logged and the socket is unregistered.
        """
        try:
            await websocket.send_text(json.dumps(data))
        except Exception as e:
            logger.error("Error sending data to WebSocket: %s", e)
            await self.disconnect(websocket)
        else:
            self._count_message(websocket)

    # ------------------------------------------------------------------
    # Audience-specific broadcasts
    # ------------------------------------------------------------------

    async def broadcast_to_all(self, data: Dict[str, Any]) -> None:
        """Broadcast data to all connected WebSocket clients."""
        await self._broadcast(self.active_connections, data, "WebSocket")

    async def broadcast_to_room(self, room: str, data: Dict[str, Any]) -> None:
        """Broadcast data to all clients subscribed to a specific room."""
        await self._broadcast(
            self.room_subscriptions.get(room, ()), data, f"room {room}"
        )

    async def broadcast_to_sensor(self, sensor_id: str, data: Dict[str, Any]) -> None:
        """Broadcast data to all clients subscribed to a specific sensor."""
        await self._broadcast(
            self.sensor_subscriptions.get(sensor_id, ()), data, f"sensor {sensor_id}"
        )

    def _unique_recipients(
        self,
        room: Optional[str] = None,
        sensor_id: Optional[str] = None,
    ) -> List[WebSocket]:
        """Return all connections plus matching subscribers, each exactly once.

        Order is: connection order first, then any subscriber not already
        listed. ``dict.fromkeys`` de-duplicates while preserving that order.
        """
        recipients: List[WebSocket] = list(self.active_connections)
        if room:
            recipients.extend(self.room_subscriptions.get(room, ()))
        if sensor_id:
            recipients.extend(self.sensor_subscriptions.get(sensor_id, ()))
        return list(dict.fromkeys(recipients))

    # ------------------------------------------------------------------
    # Domain-level broadcasts
    # ------------------------------------------------------------------

    async def broadcast_sensor_data(self, sensor_reading: SensorReading) -> None:
        """Broadcast sensor reading data to appropriate subscribers.

        The message goes to every connected client and to the subscribers of
        the reading's room and sensor. Each socket receives it once, even if
        it belongs to several of those audiences.
        """
        data = {
            "type": "sensor_data",
            "sensor_id": sensor_reading.sensor_id,
            "room": sensor_reading.room,
            "sensor_type": sensor_reading.sensor_type.value,
            # NOTE(review): must be JSON-serializable as-is - confirm against
            # the SensorReading model.
            "timestamp": sensor_reading.timestamp,
            "data": {
                "energy": sensor_reading.energy,
                "co2": sensor_reading.co2,
                "temperature": sensor_reading.temperature,
                "humidity": sensor_reading.humidity,
                "motion": sensor_reading.motion,
                "power": sensor_reading.power,
                "voltage": sensor_reading.voltage,
                "current": sensor_reading.current,
                "generation": sensor_reading.generation,
            },
            "metadata": sensor_reading.metadata,
        }

        recipients = self._unique_recipients(
            room=sensor_reading.room, sensor_id=sensor_reading.sensor_id
        )
        await self._broadcast(recipients, data, "WebSocket")

    async def broadcast_room_metrics(self, room: str, metrics: Dict[str, Any]) -> None:
        """Broadcast room-level metrics to subscribers.

        Sent once per socket to every connected client and every subscriber
        of ``room``.
        """
        data = {
            "type": "room_metrics",
            "room": room,
            "timestamp": time.time(),
            "metrics": metrics,
        }
        await self._broadcast(self._unique_recipients(room=room), data, "WebSocket")

    async def broadcast_system_event(self, event: Dict[str, Any]) -> None:
        """Broadcast system events to all subscribers."""
        data = {
            "type": "system_event",
            "timestamp": time.time(),
            "event": event,
        }
        await self.broadcast_to_all(data)

    async def broadcast_raw_data(self, raw_data: str) -> None:
        """Broadcast raw data from Redis or other sources.

        * A JSON object is broadcast as-is, with ``"type": "raw_data"`` added
          when it has no ``"type"`` key.
        * Anything else (text that is not JSON, or JSON that is not an object
          such as a list or a number) is wrapped in a ``raw_data`` envelope
          under the ``"data"`` key together with a timestamp.
        """
        try:
            payload = json.loads(raw_data)
        except json.JSONDecodeError:
            # Send as raw string if not JSON
            payload = raw_data

        if isinstance(payload, dict):
            payload.setdefault("type", "raw_data")
            data = payload
        else:
            data = {
                "type": "raw_data",
                "data": payload,
                "timestamp": time.time(),
            }

        await self.broadcast_to_all(data)

    # ------------------------------------------------------------------
    # Statistics and keep-alive
    # ------------------------------------------------------------------

    def get_connection_stats(self) -> Dict[str, Any]:
        """Get statistics about current WebSocket connections.

        Returns:
            A dict with the total connection count, per-room and per-sensor
            subscriber counts, the total number of messages sent to the
            currently registered connections, and the number of rooms and
            sensors that have at least one subscriber.
        """
        room_stats = {
            room: len(members) for room, members in self.room_subscriptions.items()
        }
        sensor_stats = {
            sensor: len(members)
            for sensor, members in self.sensor_subscriptions.items()
        }

        # Calculate message statistics
        total_messages = sum(
            metadata.get("message_count", 0)
            for metadata in self.connection_metadata.values()
        )

        return {
            "total_connections": len(self.active_connections),
            "room_subscriptions": room_stats,
            "sensor_subscriptions": sensor_stats,
            "total_messages_sent": total_messages,
            "active_rooms": sum(1 for count in room_stats.values() if count),
            "active_sensors": sum(1 for count in sensor_stats.values() if count),
        }

    async def send_connection_stats(self) -> None:
        """Send connection statistics to all clients."""
        data = {
            "type": "connection_stats",
            "timestamp": time.time(),
            "stats": self.get_connection_stats(),
        }
        await self.broadcast_to_all(data)

    async def ping_all_connections(self) -> None:
        """Send ping to all connections to keep them alive."""
        data = {
            "type": "ping",
            "timestamp": time.time(),
        }
        await self.broadcast_to_all(data)