From fa694443e772ff5f0a1a3b2e05dfb25c29042443 Mon Sep 17 00:00:00 2001 From: rafaeldpsilva Date: Wed, 10 Sep 2025 14:43:41 +0100 Subject: [PATCH] Add sensor-service microservice with FastAPI, models, and WebSocket manager --- microservices/init-mongo/init.js | 22 + microservices/nginx/nginx.conf | 29 + microservices/sensor-service/Dockerfile | 26 + microservices/sensor-service/main.py | 531 ++++++++++++++++++ microservices/sensor-service/models.py | 315 +++++++++++ microservices/sensor-service/requirements.txt | 10 + .../sensor-service/websocket_manager.py | 288 ++++++++++ 7 files changed, 1221 insertions(+) create mode 100644 microservices/init-mongo/init.js create mode 100644 microservices/nginx/nginx.conf create mode 100644 microservices/sensor-service/Dockerfile create mode 100644 microservices/sensor-service/main.py create mode 100644 microservices/sensor-service/models.py create mode 100644 microservices/sensor-service/requirements.txt create mode 100644 microservices/sensor-service/websocket_manager.py diff --git a/microservices/init-mongo/init.js b/microservices/init-mongo/init.js new file mode 100644 index 0000000..e99f9f3 --- /dev/null +++ b/microservices/init-mongo/init.js @@ -0,0 +1,22 @@ +// MongoDB initialization script +db = db.getSiblingDB('energy_dashboard'); +db.createUser({ + user: 'dashboard_user', + pwd: 'dashboard_pass', + roles: [ + { role: 'readWrite', db: 'energy_dashboard' }, + { role: 'readWrite', db: 'energy_dashboard_tokens' }, + { role: 'readWrite', db: 'energy_dashboard_batteries' }, + { role: 'readWrite', db: 'energy_dashboard_demand_response' }, + { role: 'readWrite', db: 'energy_dashboard_p2p' }, + { role: 'readWrite', db: 'energy_dashboard_forecasting' }, + { role: 'readWrite', db: 'energy_dashboard_iot' } + ] +}); + +// Create initial collections and indexes +db.sensors.createIndex({ "sensor_id": 1 }, { unique: true }); +db.sensor_readings.createIndex({ "sensor_id": 1, "timestamp": -1 }); +db.room_metrics.createIndex({ "room": 1, "timestamp": -1 }); + +print("MongoDB initialization completed"); diff --git a/microservices/nginx/nginx.conf b/microservices/nginx/nginx.conf new file mode 100644 index 0000000..c0f2ab2 --- /dev/null +++ b/microservices/nginx/nginx.conf @@ -0,0 +1,29 @@ +events { + worker_connections 1024; +} + +http { + upstream api_gateway { + server api-gateway:8000; + } + + server { + listen 80; + + location / { + proxy_pass http://api_gateway; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + } + + location /ws { + proxy_pass http://api_gateway; + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_set_header Host $host; + } + } +} diff --git a/microservices/sensor-service/Dockerfile b/microservices/sensor-service/Dockerfile new file mode 100644 index 0000000..a73f9e4 --- /dev/null +++ b/microservices/sensor-service/Dockerfile @@ -0,0 +1,26 @@ +FROM python:3.9-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + gcc \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements and install Python dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . + +# Expose port +EXPOSE 8007 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ + CMD curl -f http://localhost:8007/health || exit 1 + +# Run the application +CMD ["python", "main.py"] \ No newline at end of file diff --git a/microservices/sensor-service/main.py b/microservices/sensor-service/main.py new file mode 100644 index 0000000..a0b8331 --- /dev/null +++ b/microservices/sensor-service/main.py @@ -0,0 +1,531 @@ +""" +Sensor Management Microservice +Handles sensors, rooms, real-time data, and analytics from the original dashboard. +Integrates all existing functionality with the microservices architecture. +Port: 8007 +""" + +import asyncio +from datetime import datetime, timedelta +from fastapi import FastAPI, HTTPException, Depends, WebSocket, WebSocketDisconnect, Query, BackgroundTasks +from fastapi.middleware.cors import CORSMiddleware +from contextual import asynccontextmanager +import logging +from typing import List, Optional, Dict, Any +import json + +from models import ( + SensorReading, SensorMetadata, RoomMetrics, SystemEvent, DataQuery, DataResponse, + SensorType, SensorStatus, CO2Status, OccupancyLevel, HealthResponse +) +from database import connect_to_mongo, close_mongo_connection, get_database, connect_to_redis, get_redis +from sensor_service import SensorService +from room_service import RoomService +from analytics_service import AnalyticsService +from websocket_manager import WebSocketManager + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# WebSocket manager for real-time updates +websocket_manager = WebSocketManager() + +@asynccontextmanager +async def lifespan(app: FastAPI): + """Application lifespan manager""" + logger.info("Sensor Service starting up...") + await connect_to_mongo() + await connect_to_redis() + + # Start background tasks + asyncio.create_task(redis_subscriber_task()) + asyncio.create_task(room_metrics_aggregation_task()) + asyncio.create_task(data_cleanup_task()) + + logger.info("Sensor Service startup complete") + + yield + + logger.info("Sensor Service shutting down...") + await close_mongo_connection() + logger.info("Sensor Service shutdown complete") + +app = FastAPI( + title="Sensor Management Service", + description="Comprehensive sensor, room, and analytics management microservice", + version="1.0.0", + lifespan=lifespan +) + +app.add_middleware( + CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], +) + +# Dependencies +async def get_db(): + return await get_database() + +async def get_sensor_service(db=Depends(get_db)): + redis = await get_redis() + return SensorService(db, redis) + +async def get_room_service(db=Depends(get_db)): + redis = await get_redis() + return RoomService(db, redis) + +async def get_analytics_service(db=Depends(get_db)): + redis = await get_redis() + return AnalyticsService(db, redis) + +@app.get("/health", response_model=HealthResponse) +async def health_check(): + """Health check endpoint""" + try: + db = await get_database() + await db.command("ping") + + redis = await get_redis() + await redis.ping() + + return HealthResponse( + service="sensor-service", + status="healthy", + timestamp=datetime.utcnow(), + version="1.0.0" + ) + except Exception as e: + logger.error(f"Health check failed: {e}") + raise HTTPException(status_code=503, detail="Service Unavailable") + +# WebSocket endpoint for real-time data +@app.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + """WebSocket endpoint for real-time sensor data""" + await websocket_manager.connect(websocket) + try: + while True: + # Keep connection alive and handle any incoming messages + await websocket.receive_text() + except WebSocketDisconnect: + await websocket_manager.disconnect(websocket) + +# Original Dashboard API Endpoints + +# Sensor Management +@app.get("/sensors") +async def get_sensors( + room: Optional[str] = Query(None, description="Filter by room"), + sensor_type: Optional[SensorType] = Query(None, description="Filter by sensor type"), + status: Optional[SensorStatus] = Query(None, description="Filter by status"), + service: SensorService = Depends(get_sensor_service) +): + """Get all sensors with optional filtering""" + try: + sensors = await service.get_sensors(room=room, sensor_type=sensor_type, status=status) + return { + "sensors": sensors, + "count": len(sensors), + "filters": { + "room": room, + "sensor_type": sensor_type.value if sensor_type else None, + "status": status.value if status else None + } + } + except Exception as e: + logger.error(f"Error getting sensors: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + +@app.get("/sensors/{sensor_id}") +async def get_sensor(sensor_id: str, service: SensorService = Depends(get_sensor_service)): + """Get detailed sensor information""" + try: + sensor = await service.get_sensor_details(sensor_id) + if not sensor: + raise HTTPException(status_code=404, detail="Sensor not found") + + return sensor + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting sensor {sensor_id}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + +@app.get("/sensors/{sensor_id}/data") +async def get_sensor_data( + sensor_id: str, + start_time: Optional[int] = Query(None, description="Start timestamp (Unix)"), + end_time: Optional[int] = Query(None, description="End timestamp (Unix)"), + limit: int = Query(100, description="Maximum records to return"), + offset: int = Query(0, description="Records to skip"), + service: SensorService = Depends(get_sensor_service) +): + """Get historical data for a specific sensor""" + try: + data = await service.get_sensor_data( + sensor_id=sensor_id, + start_time=start_time, + end_time=end_time, + limit=limit, + offset=offset + ) + + return DataResponse( + data=data["readings"], + total_count=data["total_count"], + query=DataQuery( + sensor_ids=[sensor_id], + start_time=start_time, + end_time=end_time, + limit=limit, + offset=offset + ), + execution_time_ms=data.get("execution_time_ms", 0) + ) + except Exception as e: + logger.error(f"Error getting sensor data for {sensor_id}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + +@app.post("/sensors") +async def create_sensor( + sensor_data: SensorMetadata, + service: SensorService = Depends(get_sensor_service) +): + """Register a new sensor""" + try: + result = await service.create_sensor(sensor_data) + return { + "message": "Sensor created successfully", + "sensor_id": sensor_data.sensor_id, + "created_at": result.get("created_at") + } + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error creating sensor: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + +@app.put("/sensors/{sensor_id}") +async def update_sensor( + sensor_id: str, + update_data: dict, + service: SensorService = Depends(get_sensor_service) +): + """Update sensor metadata""" + try: + result = await service.update_sensor(sensor_id, update_data) + if not result: + raise HTTPException(status_code=404, detail="Sensor not found") + + return { + "message": "Sensor updated successfully", + "sensor_id": sensor_id, + "updated_at": datetime.utcnow().isoformat() + } + except HTTPException: + raise + except Exception as e: + logger.error(f"Error updating sensor {sensor_id}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + +@app.delete("/sensors/{sensor_id}") +async def delete_sensor( + sensor_id: str, + service: SensorService = Depends(get_sensor_service) +): + """Delete a sensor and all its data""" + try: + result = await service.delete_sensor(sensor_id) + return { + "message": "Sensor deleted successfully", + "sensor_id": sensor_id, + "readings_deleted": result.get("readings_deleted", 0) + } + except Exception as e: + logger.error(f"Error deleting sensor {sensor_id}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + +# Room Management +@app.get("/rooms") +async def get_rooms(service: RoomService = Depends(get_room_service)): + """Get all rooms with sensor counts and metrics""" + try: + rooms = await service.get_rooms() + return { + "rooms": rooms, + "count": len(rooms) + } + except Exception as e: + logger.error(f"Error getting rooms: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + +@app.post("/rooms") +async def create_room( + room_data: dict, + service: RoomService = Depends(get_room_service) +): + """Create a new room""" + try: + result = await service.create_room(room_data) + return { + "message": "Room created successfully", + "room": room_data.get("name"), + "created_at": result.get("created_at") + } + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error creating room: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + +@app.get("/rooms/{room_name}") +async def get_room(room_name: str, service: RoomService = Depends(get_room_service)): + """Get detailed room information""" + try: + room = await service.get_room_details(room_name) + if not room: + raise HTTPException(status_code=404, detail="Room not found") + + return room + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting room {room_name}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + +@app.get("/rooms/{room_name}/data") +async def get_room_data( + room_name: str, + start_time: Optional[int] = Query(None, description="Start timestamp (Unix)"), + end_time: Optional[int] = Query(None, description="End timestamp (Unix)"), + limit: int = Query(100, description="Maximum records to return"), + service: RoomService = Depends(get_room_service) +): + """Get historical data for a specific room""" + try: + data = await service.get_room_data( + room_name=room_name, + start_time=start_time, + end_time=end_time, + limit=limit + ) + + return { + "room": room_name, + "room_metrics": data.get("room_metrics", []), + "sensor_readings": data.get("sensor_readings", []), + "period": { + "start_time": start_time, + "end_time": end_time + } + } + except Exception as e: + logger.error(f"Error getting room data for {room_name}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + +# Analytics +@app.post("/data/query") +async def query_data( + query_params: DataQuery, + service: AnalyticsService = Depends(get_analytics_service) +): + """Advanced data querying with multiple filters""" + try: + result = await service.query_data(query_params) + return result + except Exception as e: + logger.error(f"Error executing data query: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + +@app.get("/analytics/summary") +async def get_analytics_summary( + hours: int = Query(24, description="Hours of data to analyze"), + service: AnalyticsService = Depends(get_analytics_service) +): + """Get comprehensive analytics summary""" + try: + analytics = await service.get_analytics_summary(hours) + return analytics + except Exception as e: + logger.error(f"Error getting analytics summary: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + +@app.get("/analytics/energy") +async def get_energy_analytics( + hours: int = Query(24), + room: Optional[str] = Query(None), + service: AnalyticsService = Depends(get_analytics_service) +): + """Get energy-specific analytics""" + try: + analytics = await service.get_energy_analytics(hours, room) + return analytics + except Exception as e: + logger.error(f"Error getting energy analytics: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + +# Data Export +@app.get("/export") +async def export_data( + start_time: int = Query(..., description="Start timestamp (Unix)"), + end_time: int = Query(..., description="End timestamp (Unix)"), + sensor_ids: Optional[str] = Query(None, description="Comma-separated sensor IDs"), + format: str = Query("json", description="Export format (json, csv)"), + service: SensorService = Depends(get_sensor_service) +): + """Export sensor data""" + try: + export_data = await service.export_data( + start_time=start_time, + end_time=end_time, + sensor_ids=sensor_ids, + format=format + ) + + return export_data + except Exception as e: + logger.error(f"Error exporting data: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + +# System Events +@app.get("/events") +async def get_events( + severity: Optional[str] = Query(None, description="Filter by severity"), + event_type: Optional[str] = Query(None, description="Filter by event type"), + hours: int = Query(24, description="Hours of events to retrieve"), + limit: int = Query(50, description="Maximum events to return"), + service: SensorService = Depends(get_sensor_service) +): + """Get system events and alerts""" + try: + events = await service.get_events( + severity=severity, + event_type=event_type, + hours=hours, + limit=limit + ) + + return { + "events": events, + "count": len(events), + "period_hours": hours + } + except Exception as e: + logger.error(f"Error getting events: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + +# Real-time data ingestion endpoint (for simulators) +@app.post("/data/ingest") +async def ingest_sensor_data( + sensor_data: SensorReading, + background_tasks: BackgroundTasks, + service: SensorService = Depends(get_sensor_service) +): + """Ingest real-time sensor data""" + try: + # Process and store sensor data + result = await service.ingest_sensor_data(sensor_data) + + # Schedule background tasks for analytics + background_tasks.add_task(update_room_metrics, sensor_data) + background_tasks.add_task(broadcast_sensor_data, sensor_data) + + return { + "message": "Sensor data ingested successfully", + "sensor_id": sensor_data.sensor_id, + "timestamp": sensor_data.timestamp + } + except Exception as e: + logger.error(f"Error ingesting sensor data: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + +# Background task functions +async def update_room_metrics(sensor_data: SensorReading): + """Update room-level metrics when sensor data is received""" + if sensor_data.room: + try: + db = await get_database() + redis = await get_redis() + room_service = RoomService(db, redis) + await room_service.update_room_metrics(sensor_data) + except Exception as e: + logger.error(f"Error updating room metrics: {e}") + +async def broadcast_sensor_data(sensor_data: SensorReading): + """Broadcast sensor data to WebSocket clients""" + try: + await websocket_manager.broadcast_sensor_data(sensor_data) + except Exception as e: + logger.error(f"Error broadcasting sensor data: {e}") + +# Background tasks +async def redis_subscriber_task(): + """Subscribe to Redis channels for real-time data""" + logger.info("Starting Redis subscriber task") + + try: + redis = await get_redis() + pubsub = redis.pubsub() + await pubsub.subscribe("energy_data", "sensor_events") + + while True: + try: + message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0) + if message: + # Process incoming message and broadcast to WebSocket clients + await websocket_manager.broadcast_raw_data(message['data']) + + except Exception as e: + logger.error(f"Error processing Redis message: {e}") + await asyncio.sleep(5) + + except Exception as e: + logger.error(f"Redis subscriber task failed: {e}") + +async def room_metrics_aggregation_task(): + """Periodically aggregate room-level metrics""" + logger.info("Starting room metrics aggregation task") + + while True: + try: + db = await get_database() + redis = await get_redis() + room_service = RoomService(db, redis) + + # Aggregate metrics for all rooms + await room_service.aggregate_all_room_metrics() + + # Sleep for 5 minutes between aggregations + await asyncio.sleep(300) + + except Exception as e: + logger.error(f"Error in room metrics aggregation: {e}") + await asyncio.sleep(600) # Wait longer on error + +async def data_cleanup_task(): + """Periodic cleanup of old data""" + logger.info("Starting data cleanup task") + + while True: + try: + db = await get_database() + service = SensorService(db, None) + + # Clean up data older than 90 days + cleanup_date = datetime.utcnow() - timedelta(days=90) + await service.cleanup_old_data(cleanup_date) + + # Sleep for 24 hours between cleanups + await asyncio.sleep(86400) + + except Exception as e: + logger.error(f"Error in data cleanup task: {e}") + await asyncio.sleep(7200) # Wait 2 hours on error + +if __name__ == "__main__": + import uvicorn + uvicorn.run(app, host="0.0.0.0", port=8007) \ No newline at end of file diff --git a/microservices/sensor-service/models.py b/microservices/sensor-service/models.py new file mode 100644 index 0000000..cf5eb93 --- /dev/null +++ b/microservices/sensor-service/models.py @@ -0,0 +1,315 @@ +""" +Models for Sensor Management Service - integrating all original dashboard functionality +""" + +from pydantic import BaseModel, Field +from typing import Optional, List, Dict, Any, Literal +from datetime import datetime +from enum import Enum + +class SensorType(str, Enum): + ENERGY = "energy" + CO2 = "co2" + TEMPERATURE = "temperature" + HUMIDITY = "humidity" + HVAC = "hvac" + LIGHTING = "lighting" + SECURITY = "security" + MOTION = "motion" + +class SensorStatus(str, Enum): + ONLINE = "online" + OFFLINE = "offline" + ERROR = "error" + MAINTENANCE = "maintenance" + +class CO2Status(str, Enum): + GOOD = "good" + MODERATE = "moderate" + POOR = "poor" + CRITICAL = "critical" + +class OccupancyLevel(str, Enum): + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + +# Base Models from original dashboard +class SensorReading(BaseModel): + """Individual sensor reading model - enhanced from original""" + sensor_id: str = Field(..., description="Unique sensor identifier") + room: Optional[str] = Field(None, description="Room where sensor is located") + sensor_type: SensorType = Field(..., description="Type of sensor") + timestamp: int = Field(..., description="Unix timestamp of reading") + created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp") + + # Sensor values with enhanced structure + energy: Optional[Dict[str, Any]] = Field(None, description="Energy reading with value and unit") + co2: Optional[Dict[str, Any]] = Field(None, description="CO2 reading with value and unit") + temperature: Optional[Dict[str, Any]] = Field(None, description="Temperature reading with value and unit") + humidity: Optional[Dict[str, Any]] = Field(None, description="Humidity reading with value and unit") + motion: Optional[Dict[str, Any]] = Field(None, description="Motion detection reading") + + # Additional sensor types from tiocps + power: Optional[Dict[str, Any]] = Field(None, description="Power consumption reading") + voltage: Optional[Dict[str, Any]] = Field(None, description="Voltage reading") + current: Optional[Dict[str, Any]] = Field(None, description="Current reading") + generation: Optional[Dict[str, Any]] = Field(None, description="Energy generation reading") + + # Metadata + metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional sensor metadata") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } + +class LegacySensorReading(BaseModel): + """Legacy sensor reading format for backward compatibility""" + sensor_id: str = Field(..., alias="sensorId") + timestamp: int + value: float + unit: str + created_at: datetime = Field(default_factory=datetime.utcnow) + + class Config: + allow_population_by_field_name = True + +class SensorMetadata(BaseModel): + """Enhanced sensor metadata from original dashboard""" + sensor_id: str = Field(..., description="Unique sensor identifier") + name: str = Field(..., description="Human-readable sensor name") + sensor_type: SensorType = Field(..., description="Type of sensor") + room: Optional[str] = Field(None, description="Room assignment") + status: SensorStatus = Field(default=SensorStatus.OFFLINE, description="Current sensor status") + + # Physical location and installation details + location: Optional[str] = Field(None, description="Physical location description") + floor: Optional[str] = Field(None, description="Floor level") + building: Optional[str] = Field(None, description="Building identifier") + + # Technical specifications + model: Optional[str] = Field(None, description="Sensor model") + manufacturer: Optional[str] = Field(None, description="Sensor manufacturer") + firmware_version: Optional[str] = Field(None, description="Firmware version") + hardware_version: Optional[str] = Field(None, description="Hardware version") + + # Network and connectivity + ip_address: Optional[str] = Field(None, description="IP address if network connected") + mac_address: Optional[str] = Field(None, description="MAC address") + connection_type: Optional[str] = Field(None, description="Connection type (wifi, ethernet, zigbee, etc.)") + + # Power and maintenance + battery_level: Optional[float] = Field(None, description="Battery level percentage") + last_maintenance: Optional[datetime] = Field(None, description="Last maintenance date") + next_maintenance: Optional[datetime] = Field(None, description="Next scheduled maintenance") + + # Operational settings + sampling_rate: Optional[int] = Field(None, description="Data sampling rate in seconds") + calibration_date: Optional[datetime] = Field(None, description="Last calibration date") + + # Capabilities from tiocps integration + monitoring_capabilities: List[str] = Field(default_factory=list, description="List of monitoring capabilities") + control_capabilities: List[str] = Field(default_factory=list, description="List of control capabilities") + demand_response_enabled: bool = Field(default=False, description="Demand response participation") + + # Timestamps + installed_at: Optional[datetime] = Field(None, description="Installation timestamp") + last_seen: Optional[datetime] = Field(None, description="Last communication timestamp") + created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp") + updated_at: datetime = Field(default_factory=datetime.utcnow, description="Record update timestamp") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + +class RoomMetrics(BaseModel): + """Enhanced room metrics from original dashboard""" + room: str = Field(..., description="Room identifier") + timestamp: int = Field(..., description="Metrics calculation timestamp") + created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp") + + # Sensor inventory + sensor_count: int = Field(0, description="Total number of sensors in room") + active_sensors: List[str] = Field(default_factory=list, description="List of active sensor IDs") + sensor_types: List[SensorType] = Field(default_factory=list, description="Types of sensors present") + + # Energy metrics (enhanced from tiocps) + energy: Optional[Dict[str, Any]] = Field(None, description="Energy consumption metrics") + power: Optional[Dict[str, Any]] = Field(None, description="Power consumption metrics") + generation: Optional[Dict[str, Any]] = Field(None, description="Energy generation metrics") + flexibility: Optional[Dict[str, Any]] = Field(None, description="Energy flexibility metrics") + + # Environmental metrics + co2: Optional[Dict[str, Any]] = Field(None, description="CO2 level metrics") + temperature: Optional[Dict[str, Any]] = Field(None, description="Temperature metrics") + humidity: Optional[Dict[str, Any]] = Field(None, description="Humidity metrics") + + # Occupancy and usage + occupancy_estimate: OccupancyLevel = Field(default=OccupancyLevel.LOW, description="Estimated occupancy level") + motion_detected: bool = Field(default=False, description="Recent motion detection status") + + # Time-based metrics + last_activity: Optional[datetime] = Field(None, description="Last detected activity timestamp") + daily_usage_hours: Optional[float] = Field(None, description="Estimated daily usage in hours") + + # Economic metrics from tiocps + energy_cost: Optional[float] = Field(None, description="Estimated energy cost") + savings_potential: Optional[float] = Field(None, description="Potential savings from optimization") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + +class Room(BaseModel): + """Room definition model""" + name: str = Field(..., description="Room name/identifier") + display_name: Optional[str] = Field(None, description="Human-readable room name") + floor: Optional[str] = Field(None, description="Floor level") + building: Optional[str] = Field(None, description="Building identifier") + area_m2: Optional[float] = Field(None, description="Room area in square meters") + capacity: Optional[int] = Field(None, description="Room capacity (people)") + room_type: Optional[str] = Field(None, description="Room type (office, meeting, etc.)") + + # Configuration + target_temperature: Optional[float] = Field(None, description="Target temperature") + target_co2: Optional[float] = Field(None, description="Target CO2 level") + operating_hours: Optional[Dict[str, Any]] = Field(None, description="Operating hours schedule") + + # Status + active: bool = Field(default=True, description="Whether room is active") + created_at: datetime = Field(default_factory=datetime.utcnow) + updated_at: datetime = Field(default_factory=datetime.utcnow) + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } + +class SystemEvent(BaseModel): + """Enhanced system events from original dashboard""" + event_id: str = Field(..., description="Unique event identifier") + event_type: str = Field(..., description="Type of event") + severity: Literal["info", "warning", "error", "critical"] = Field(..., description="Event severity") + timestamp: int = Field(..., description="Event timestamp") + created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp") + + # Event details + title: str = Field(..., description="Event title") + description: str = Field(..., description="Event description") + source: Optional[str] = Field(None, description="Event source (sensor_id, system component, etc.)") + + # Context + sensor_id: Optional[str] = Field(None, description="Related sensor ID") + room: Optional[str] = Field(None, description="Related room") + + # Event data + data: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional event data") + + # Status tracking + acknowledged: bool = Field(default=False, description="Whether event has been acknowledged") + resolved: bool = Field(default=False, description="Whether event has been resolved") + acknowledged_by: Optional[str] = Field(None, description="Who acknowledged the event") + resolved_by: Optional[str] = Field(None, description="Who resolved the event") + acknowledged_at: Optional[datetime] = Field(None, description="Acknowledgment timestamp") + resolved_at: Optional[datetime] = Field(None, description="Resolution timestamp") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + +class DataQuery(BaseModel): + """Enhanced data query parameters from original dashboard""" + sensor_ids: Optional[List[str]] = Field(None, description="Filter by sensor IDs") + rooms: Optional[List[str]] = Field(None, description="Filter by rooms") + sensor_types: Optional[List[SensorType]] = Field(None, description="Filter by sensor types") + + # Time range + start_time: Optional[int] = Field(None, description="Start timestamp (Unix)") + end_time: Optional[int] = Field(None, description="End timestamp (Unix)") + + # Aggregation + aggregate: Optional[str] = Field(None, description="Aggregation method (avg, sum, min, max)") + interval: Optional[str] = Field(None, description="Aggregation interval (1m, 5m, 1h, 1d)") + + # Pagination + limit: int = Field(default=100, description="Maximum number of records to return") + offset: int = Field(default=0, description="Number of records to skip") + + # Sorting + sort_by: str = Field(default="timestamp", description="Field to sort by") + sort_order: Literal["asc", "desc"] = Field(default="desc", description="Sort order") + + # Additional filters from tiocps + energy_threshold: Optional[float] = Field(None, description="Filter by energy threshold") + co2_threshold: Optional[float] = Field(None, description="Filter by CO2 threshold") + include_metadata: bool = Field(default=False, description="Include sensor metadata in response") + +class DataResponse(BaseModel): + """Enhanced response model for data queries""" + data: List[Dict[str, Any]] = Field(default_factory=list, description="Query results") + total_count: int = Field(0, description="Total number of matching records") + query: DataQuery = Field(..., description="Original query parameters") + execution_time_ms: float = Field(..., description="Query execution time in milliseconds") + + # Additional metadata + aggregation_applied: bool = Field(default=False, description="Whether data was aggregated") + cache_hit: bool = Field(default=False, description="Whether result was served from cache") + +class AnalyticsSummary(BaseModel): + """Comprehensive analytics summary""" + period_hours: int + start_time: datetime + end_time: datetime + + # Sensor analytics + total_sensors: int + active_sensors: int + sensor_types_summary: Dict[str, int] + + # Room analytics + total_rooms: int + active_rooms: int + room_occupancy_summary: Dict[str, int] + + # Energy analytics + total_energy_consumption: float + total_energy_generation: float + net_energy_consumption: float + energy_efficiency: float + + # Environmental analytics + average_co2: float + average_temperature: float + average_humidity: float + + # System health + system_events_count: int + critical_events_count: int + sensor_errors_count: int + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } + +class HealthResponse(BaseModel): + """Health check response""" + service: str + status: str + timestamp: datetime + version: str + + # Additional service-specific health metrics + total_sensors: Optional[int] = None + active_sensors: Optional[int] = None + total_rooms: Optional[int] = None + websocket_connections: Optional[int] = None + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } \ No newline at end of file diff --git a/microservices/sensor-service/requirements.txt b/microservices/sensor-service/requirements.txt new file mode 100644 index 0000000..f08d8eb --- /dev/null +++ b/microservices/sensor-service/requirements.txt @@ -0,0 +1,10 @@ +fastapi +uvicorn[standard] +pymongo +motor +redis +websockets +python-dotenv +pydantic +pandas +numpy \ No newline at end of file diff --git a/microservices/sensor-service/websocket_manager.py b/microservices/sensor-service/websocket_manager.py new file mode 100644 index 0000000..56b8e88 --- /dev/null +++ b/microservices/sensor-service/websocket_manager.py @@ -0,0 +1,288 @@ +""" +WebSocket manager for real-time sensor data broadcasting +""" + +import asyncio +import json +from typing import List, Set, Dict, Any +from fastapi import WebSocket, WebSocketDisconnect +import logging + +from models import SensorReading + +logger = logging.getLogger(__name__) + +class WebSocketManager: + """Manages WebSocket connections for real-time data broadcasting""" + + def __init__(self): + self.active_connections: List[WebSocket] = [] + self.room_subscriptions: Dict[str, Set[WebSocket]] = {} + self.sensor_subscriptions: Dict[str, Set[WebSocket]] = {} + self.connection_metadata: Dict[WebSocket, Dict[str, Any]] = {} + + async def connect(self, websocket: WebSocket, room: str = None, sensor_id: str = None): + """Accept a WebSocket connection and handle subscriptions""" + await websocket.accept() + self.active_connections.append(websocket) + + # Store connection metadata + self.connection_metadata[websocket] = { + "connected_at": asyncio.get_event_loop().time(), + "room": room, + "sensor_id": sensor_id, + "message_count": 0 + } + + # Handle room subscription + if room: + if room not in self.room_subscriptions: + self.room_subscriptions[room] = set() + self.room_subscriptions[room].add(websocket) + + # Handle sensor subscription + if sensor_id: + if sensor_id not in self.sensor_subscriptions: + self.sensor_subscriptions[sensor_id] = set() + self.sensor_subscriptions[sensor_id].add(websocket) + + logger.info(f"WebSocket client connected. Total connections: {len(self.active_connections)}") + + # Send initial connection confirmation + await self.send_to_connection(websocket, { + "type": "connection_established", + "timestamp": asyncio.get_event_loop().time(), + "subscriptions": { + "room": room, + "sensor_id": sensor_id + }, + "total_connections": len(self.active_connections) + }) + + async def disconnect(self, websocket: WebSocket): + """Remove a WebSocket connection and clean up subscriptions""" + if websocket in self.active_connections: + self.active_connections.remove(websocket) + + # Clean up room subscriptions + for room_connections in self.room_subscriptions.values(): + room_connections.discard(websocket) + + # Clean up sensor subscriptions + for sensor_connections in self.sensor_subscriptions.values(): + sensor_connections.discard(websocket) + + # Clean up metadata + self.connection_metadata.pop(websocket, None) + + logger.info(f"WebSocket client disconnected. Total connections: {len(self.active_connections)}") + + async def send_to_connection(self, websocket: WebSocket, data: Dict[str, Any]): + """Send data to a specific WebSocket connection""" + try: + await websocket.send_text(json.dumps(data)) + + # Update message count + if websocket in self.connection_metadata: + self.connection_metadata[websocket]["message_count"] += 1 + + except Exception as e: + logger.error(f"Error sending data to WebSocket: {e}") + await self.disconnect(websocket) + + async def broadcast_to_all(self, data: Dict[str, Any]): + """Broadcast data to all connected WebSocket clients""" + if not self.active_connections: + return + + message = json.dumps(data) + disconnected = [] + + for websocket in self.active_connections: + try: + await websocket.send_text(message) + + # Update message count + if websocket in self.connection_metadata: + self.connection_metadata[websocket]["message_count"] += 1 + + except Exception as e: + logger.error(f"Error broadcasting to WebSocket: {e}") + disconnected.append(websocket) + + # Clean up disconnected connections + for websocket in disconnected: + await self.disconnect(websocket) + + async def broadcast_to_room(self, room: str, data: Dict[str, Any]): + """Broadcast data to all clients subscribed to a specific room""" + if room not in self.room_subscriptions: + return + + room_connections = self.room_subscriptions[room].copy() + if not room_connections: + return + + message = json.dumps(data) + disconnected = [] + + for websocket in room_connections: + try: + await websocket.send_text(message) + + # Update message count + if websocket in self.connection_metadata: + self.connection_metadata[websocket]["message_count"] += 1 + + except Exception as e: + logger.error(f"Error broadcasting to room {room}: {e}") + disconnected.append(websocket) + + # Clean up disconnected connections + for websocket in disconnected: + await self.disconnect(websocket) + + async def broadcast_to_sensor(self, sensor_id: str, data: Dict[str, Any]): + """Broadcast data to all clients subscribed to a specific sensor""" + if sensor_id not in self.sensor_subscriptions: + return + + sensor_connections = self.sensor_subscriptions[sensor_id].copy() + if not sensor_connections: + return + + message = json.dumps(data) + disconnected = [] + + for websocket in sensor_connections: + try: + await websocket.send_text(message) + + # Update message count + if websocket in self.connection_metadata: + self.connection_metadata[websocket]["message_count"] += 1 + + except Exception as e: + logger.error(f"Error broadcasting to sensor {sensor_id}: {e}") + disconnected.append(websocket) + + # Clean up disconnected connections + for websocket in disconnected: + await self.disconnect(websocket) + + async def broadcast_sensor_data(self, sensor_reading: SensorReading): + """Broadcast sensor reading data to appropriate subscribers""" + data = { + "type": "sensor_data", + "sensor_id": sensor_reading.sensor_id, + "room": sensor_reading.room, + "sensor_type": sensor_reading.sensor_type.value, + "timestamp": sensor_reading.timestamp, + "data": { + "energy": sensor_reading.energy, + "co2": sensor_reading.co2, + "temperature": sensor_reading.temperature, + "humidity": sensor_reading.humidity, + "motion": sensor_reading.motion, + "power": sensor_reading.power, + "voltage": sensor_reading.voltage, + "current": sensor_reading.current, + "generation": sensor_reading.generation + }, + "metadata": sensor_reading.metadata + } + + # Broadcast to all connections + await self.broadcast_to_all(data) + + # Broadcast to room-specific subscribers + if sensor_reading.room: + await self.broadcast_to_room(sensor_reading.room, data) + + # Broadcast to sensor-specific subscribers + await self.broadcast_to_sensor(sensor_reading.sensor_id, data) + + async def broadcast_room_metrics(self, room: str, metrics: Dict[str, Any]): + """Broadcast room-level metrics to subscribers""" + data = { + "type": "room_metrics", + "room": room, + "timestamp": asyncio.get_event_loop().time(), + "metrics": metrics + } + + # Broadcast to all connections + await self.broadcast_to_all(data) + + # Broadcast to room-specific subscribers + await self.broadcast_to_room(room, data) + + async def broadcast_system_event(self, event: Dict[str, Any]): + """Broadcast system events to all subscribers""" + data = { + "type": "system_event", + "timestamp": asyncio.get_event_loop().time(), + "event": event + } + + await self.broadcast_to_all(data) + + async def broadcast_raw_data(self, raw_data: str): + """Broadcast raw data from Redis or other sources""" + try: + # Try to parse as JSON + data = json.loads(raw_data) + + # Add type if not present + if "type" not in data: + data["type"] = "raw_data" + + await self.broadcast_to_all(data) + + except json.JSONDecodeError: + # Send as raw string if not JSON + data = { + "type": "raw_data", + "data": raw_data, + "timestamp": asyncio.get_event_loop().time() + } + await self.broadcast_to_all(data) + + def get_connection_stats(self) -> Dict[str, Any]: + """Get statistics about current WebSocket connections""" + total_connections = len(self.active_connections) + room_stats = {room: len(connections) for room, connections in self.room_subscriptions.items()} + sensor_stats = {sensor: len(connections) for sensor, connections in self.sensor_subscriptions.items()} + + # Calculate message statistics + total_messages = sum( + metadata.get("message_count", 0) + for metadata in self.connection_metadata.values() + ) + + return { + "total_connections": total_connections, + "room_subscriptions": room_stats, + "sensor_subscriptions": sensor_stats, + "total_messages_sent": total_messages, + "active_rooms": len([room for room, connections in self.room_subscriptions.items() if connections]), + "active_sensors": len([sensor for sensor, connections in self.sensor_subscriptions.items() if connections]) + } + + async def send_connection_stats(self): + """Send connection statistics to all clients""" + stats = self.get_connection_stats() + data = { + "type": "connection_stats", + "timestamp": asyncio.get_event_loop().time(), + "stats": stats + } + await self.broadcast_to_all(data) + + async def ping_all_connections(self): + """Send ping to all connections to keep them alive""" + data = { + "type": "ping", + "timestamp": asyncio.get_event_loop().time() + } + await self.broadcast_to_all(data) \ No newline at end of file