diff --git a/microservices/mqtt_gecad.md b/microservices/mqtt_gecad.md deleted file mode 100644 index e69de29..0000000 diff --git a/monolith/src/core/__init__.py b/monolith/src/core/__init__.py new file mode 100644 index 0000000..09b0c5a --- /dev/null +++ b/monolith/src/core/__init__.py @@ -0,0 +1 @@ +"""Core infrastructure components for the modular monolith.""" diff --git a/monolith/src/core/config.py b/monolith/src/core/config.py new file mode 100644 index 0000000..96134ab --- /dev/null +++ b/monolith/src/core/config.py @@ -0,0 +1,56 @@ +"""Centralized configuration management.""" +import os +from typing import Dict, Optional +from pydantic_settings import BaseSettings + + +class Settings(BaseSettings): + """Application settings.""" + + # Application + app_name: str = "Energy Dashboard Monolith" + app_version: str = "1.0.0" + debug: bool = False + host: str = "0.0.0.0" + port: int = 8000 + + # MongoDB + mongo_url: str = os.getenv("MONGO_URL", "mongodb://admin:password123@localhost:27017/?authSource=admin") + + # Module-specific databases (preserving isolation) + sensors_db_name: str = "energy_dashboard_sensors" + demand_response_db_name: str = "energy_dashboard_demand_response" + data_ingestion_db_name: str = "digitalmente_ingestion" + main_db_name: str = "energy_dashboard" + + # Redis + redis_url: str = os.getenv("REDIS_URL", "redis://localhost:6379") + redis_enabled: bool = True # Can be disabled for full monolith mode + + # FTP Configuration (for data ingestion) + ftp_sa4cps_host: str = os.getenv("FTP_SA4CPS_HOST", "ftp.sa4cps.pt") + ftp_sa4cps_port: int = int(os.getenv("FTP_SA4CPS_PORT", "21")) + ftp_sa4cps_username: str = os.getenv("FTP_SA4CPS_USERNAME", "curvascarga@sa4cps.pt") + ftp_sa4cps_password: str = os.getenv("FTP_SA4CPS_PASSWORD", "") + ftp_sa4cps_remote_path: str = os.getenv("FTP_SA4CPS_REMOTE_PATH", "/SLGs/") + ftp_check_interval: int = int(os.getenv("FTP_CHECK_INTERVAL", "21600")) # 6 hours + ftp_skip_initial_scan: bool = os.getenv("FTP_SKIP_INITIAL_SCAN", "true").lower() == "true" + + # CORS + cors_origins: list = ["*"] + cors_allow_credentials: bool = True + cors_allow_methods: list = ["*"] + cors_allow_headers: list = ["*"] + + # Background Tasks + health_check_interval: int = 30 + event_scheduler_interval: int = 60 + auto_response_interval: int = 30 + + class Config: + env_file = ".env" + case_sensitive = False + + +# Global settings instance +settings = Settings() diff --git a/monolith/src/core/database.py b/monolith/src/core/database.py new file mode 100644 index 0000000..e1d50b2 --- /dev/null +++ b/monolith/src/core/database.py @@ -0,0 +1,85 @@ +"""Database connection management for all modules.""" +import logging +from typing import Optional, Dict +from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase +from .config import settings + +logger = logging.getLogger(__name__) + + +class DatabaseManager: + """Manages MongoDB connections for all modules.""" + + def __init__(self): + self._client: Optional[AsyncIOMotorClient] = None + self._databases: Dict[str, AsyncIOMotorDatabase] = {} + + async def connect(self): + """Establish connection to MongoDB.""" + try: + logger.info(f"Connecting to MongoDB: {settings.mongo_url}") + self._client = AsyncIOMotorClient(settings.mongo_url) + + # Test connection + await self._client.admin.command('ping') + logger.info("Successfully connected to MongoDB") + + # Initialize database references + self._databases = { + "main": self._client[settings.main_db_name], + "sensors": self._client[settings.sensors_db_name], + "demand_response": self._client[settings.demand_response_db_name], + "data_ingestion": self._client[settings.data_ingestion_db_name], + } + + except Exception as e: + logger.error(f"Failed to connect to MongoDB: {e}") + raise + + async def disconnect(self): + """Close MongoDB connection.""" + if self._client: + self._client.close() + logger.info("Disconnected from MongoDB") + + def get_database(self, name: str) -> AsyncIOMotorDatabase: + """Get database by name.""" + if name not in self._databases: + raise ValueError(f"Database '{name}' not configured") + return self._databases[name] + + @property + def client(self) -> AsyncIOMotorClient: + """Get the MongoDB client.""" + if not self._client: + raise RuntimeError("Database not connected. Call connect() first.") + return self._client + + @property + def main_db(self) -> AsyncIOMotorDatabase: + """Get main database.""" + return self.get_database("main") + + @property + def sensors_db(self) -> AsyncIOMotorDatabase: + """Get sensors database.""" + return self.get_database("sensors") + + @property + def demand_response_db(self) -> AsyncIOMotorDatabase: + """Get demand response database.""" + return self.get_database("demand_response") + + @property + def data_ingestion_db(self) -> AsyncIOMotorDatabase: + """Get data ingestion database.""" + return self.get_database("data_ingestion") + + +# Global database manager instance +db_manager = DatabaseManager() + + +async def get_database(name: str = "main") -> AsyncIOMotorDatabase: + """Dependency injection function for database access.""" + return db_manager.get_database(name) diff --git a/monolith/src/core/dependencies.py b/monolith/src/core/dependencies.py new file mode 100644 index 0000000..ff9caf7 --- /dev/null +++ b/monolith/src/core/dependencies.py @@ -0,0 +1,39 @@ +"""FastAPI dependency injection utilities.""" +from typing import Optional +from fastapi import Depends, HTTPException, status +from motor.motor_asyncio import AsyncIOMotorDatabase +import redis.asyncio as aioredis + +from .database import db_manager +from .redis import redis_manager +from .events import event_bus, EventBus + + +async def get_main_db() -> AsyncIOMotorDatabase: + """Get main database dependency.""" + return db_manager.main_db + + +async def get_sensors_db() -> AsyncIOMotorDatabase: + """Get sensors database dependency.""" + return db_manager.sensors_db + + +async def get_demand_response_db() -> AsyncIOMotorDatabase: + """Get demand response database dependency.""" + return db_manager.demand_response_db + + +async def get_data_ingestion_db() -> AsyncIOMotorDatabase: + """Get data ingestion database dependency.""" + return db_manager.data_ingestion_db + + +async def get_redis() -> Optional[aioredis.Redis]: + """Get Redis client dependency.""" + return redis_manager.client + + +def get_event_bus() -> EventBus: + """Get event bus dependency.""" + return event_bus diff --git a/monolith/src/core/events.py b/monolith/src/core/events.py new file mode 100644 index 0000000..2a43ffc --- /dev/null +++ b/monolith/src/core/events.py @@ -0,0 +1,137 @@ +"""In-process event bus for inter-module communication.""" +import asyncio +import logging +from typing import Dict, List, Callable, Any, Set +from collections import defaultdict +from dataclasses import dataclass +from datetime import datetime +import json + +logger = logging.getLogger(__name__) + + +@dataclass +class Event: + """Event data structure.""" + topic: str + data: Any + timestamp: datetime + source: str = "system" + + def to_dict(self) -> dict: + """Convert to dictionary.""" + return { + "topic": self.topic, + "data": self.data, + "timestamp": self.timestamp.isoformat(), + "source": self.source + } + + +class EventBus: + """ + In-process event bus for replacing Redis pub/sub. + Provides asynchronous event publishing and subscription. + """ + + def __init__(self): + self._subscribers: Dict[str, List[Callable]] = defaultdict(list) + self._event_history: List[Event] = [] + self._max_history: int = 1000 + self._lock = asyncio.Lock() + + async def publish(self, topic: str, data: Any, source: str = "system"): + """ + Publish an event to a topic. + + Args: + topic: Event topic/channel name + data: Event data (will be JSON serialized if dict) + source: Event source identifier + """ + event = Event( + topic=topic, + data=data, + timestamp=datetime.utcnow(), + source=source + ) + + # Store in history + async with self._lock: + self._event_history.append(event) + if len(self._event_history) > self._max_history: + self._event_history.pop(0) + + # Notify subscribers + if topic in self._subscribers: + logger.debug(f"Publishing event to topic '{topic}': {len(self._subscribers[topic])} subscribers") + + # Create tasks for all subscribers + tasks = [] + for callback in self._subscribers[topic]: + tasks.append(self._call_subscriber(callback, event)) + + # Execute all callbacks concurrently + if tasks: + await asyncio.gather(*tasks, return_exceptions=True) + else: + logger.debug(f"No subscribers for topic '{topic}'") + + async def _call_subscriber(self, callback: Callable, event: Event): + """Call a subscriber callback with error handling.""" + try: + if asyncio.iscoroutinefunction(callback): + await callback(event.data) + else: + callback(event.data) + except Exception as e: + logger.error(f"Error in event subscriber: {e}", exc_info=True) + + def subscribe(self, topic: str, callback: Callable): + """ + Subscribe to events on a topic. + + Args: + topic: Event topic/channel name + callback: Async or sync callback function that receives event data + """ + self._subscribers[topic].append(callback) + logger.info(f"Subscribed to topic '{topic}'. Total subscribers: {len(self._subscribers[topic])}") + + def unsubscribe(self, topic: str, callback: Callable): + """Unsubscribe from a topic.""" + if topic in self._subscribers and callback in self._subscribers[topic]: + self._subscribers[topic].remove(callback) + logger.info(f"Unsubscribed from topic '{topic}'") + + def get_topics(self) -> List[str]: + """Get list of all topics with subscribers.""" + return list(self._subscribers.keys()) + + def get_subscriber_count(self, topic: str) -> int: + """Get number of subscribers for a topic.""" + return len(self._subscribers.get(topic, [])) + + async def get_event_history(self, topic: str = None, limit: int = 100) -> List[Event]: + """Get event history, optionally filtered by topic.""" + async with self._lock: + if topic: + events = [e for e in self._event_history if e.topic == topic] + else: + events = self._event_history.copy() + + return events[-limit:] + + +# Global event bus instance +event_bus = EventBus() + + +# Common event topics (replaces Redis channels) +class EventTopics: + """Standard event topic names.""" + ENERGY_DATA = "energy_data" + DR_EVENTS = "dr_events" + SENSOR_EVENTS = "sensor_events" + SYSTEM_EVENTS = "system_events" + DATA_INGESTION = "data_ingestion" diff --git a/monolith/src/core/logging_config.py b/monolith/src/core/logging_config.py new file mode 100644 index 0000000..768ef13 --- /dev/null +++ b/monolith/src/core/logging_config.py @@ -0,0 +1,25 @@ +"""Logging configuration.""" +import logging +import sys +from .config import settings + + +def setup_logging(): + """Configure application logging.""" + log_level = logging.DEBUG if settings.debug else logging.INFO + + logging.basicConfig( + level=log_level, + format="%(asctime)s - %(name)s - %(levelname)s - %(message)s", + handlers=[ + logging.StreamHandler(sys.stdout) + ] + ) + + # Set third-party loggers to WARNING + logging.getLogger("uvicorn").setLevel(logging.WARNING) + logging.getLogger("motor").setLevel(logging.WARNING) + logging.getLogger("redis").setLevel(logging.WARNING) + + logger = logging.getLogger(__name__) + logger.info(f"Logging configured. Level: {log_level}") diff --git a/monolith/src/core/redis.py b/monolith/src/core/redis.py new file mode 100644 index 0000000..751e98f --- /dev/null +++ b/monolith/src/core/redis.py @@ -0,0 +1,61 @@ +"""Redis connection management (optional, for caching).""" +import logging +from typing import Optional +import redis.asyncio as aioredis +from .config import settings + +logger = logging.getLogger(__name__) + + +class RedisManager: + """Manages Redis connection for caching.""" + + def __init__(self): + self._client: Optional[aioredis.Redis] = None + + async def connect(self): + """Establish connection to Redis.""" + if not settings.redis_enabled: + logger.info("Redis is disabled in settings") + return + + try: + logger.info(f"Connecting to Redis: {settings.redis_url}") + self._client = await aioredis.from_url( + settings.redis_url, + encoding="utf-8", + decode_responses=True + ) + + # Test connection + await self._client.ping() + logger.info("Successfully connected to Redis") + + except Exception as e: + logger.warning(f"Failed to connect to Redis: {e}. Continuing without Redis cache.") + self._client = None + + async def disconnect(self): + """Close Redis connection.""" + if self._client: + await self._client.close() + logger.info("Disconnected from Redis") + + @property + def client(self) -> Optional[aioredis.Redis]: + """Get the Redis client.""" + return self._client + + @property + def is_available(self) -> bool: + """Check if Redis is available.""" + return self._client is not None + + +# Global Redis manager instance +redis_manager = RedisManager() + + +async def get_redis() -> Optional[aioredis.Redis]: + """Dependency injection function for Redis access.""" + return redis_manager.client diff --git a/monolith/src/main.py b/monolith/src/main.py new file mode 100644 index 0000000..2d07304 --- /dev/null +++ b/monolith/src/main.py @@ -0,0 +1,306 @@ +""" +Main FastAPI application for the Energy Dashboard Modular Monolith. +Integrates all modules: sensors, demand-response, and data-ingestion. +""" + +import asyncio +import logging +from contextlib import asynccontextmanager +from datetime import datetime, timedelta +from fastapi import FastAPI, HTTPException +from fastapi.middleware.cors import CORSMiddleware + +# Core imports +from core.config import settings +from core.logging_config import setup_logging +from core.database import db_manager +from core.redis import redis_manager +from core.events import event_bus, EventTopics + +# Module imports +from modules.sensors.router import router as sensors_router +from modules.sensors.room_service import RoomService +from modules.sensors import WebSocketManager +from modules.demand_response import DemandResponseService + +# Setup logging +setup_logging() +logger = logging.getLogger(__name__) + + +# Background tasks +async def room_metrics_aggregation_task(): + """Periodically aggregate room-level metrics""" + logger.info("Starting room metrics aggregation task") + + while True: + try: + room_service = RoomService(db_manager.sensors_db, redis_manager.client) + await room_service.aggregate_all_room_metrics() + await asyncio.sleep(300) # 5 minutes + + except Exception as e: + logger.error(f"Error in room metrics aggregation: {e}") + await asyncio.sleep(600) + + +async def data_cleanup_task(): + """Periodic cleanup of old data""" + logger.info("Starting data cleanup task") + + while True: + try: + from modules.sensors import SensorService + + service = SensorService(db_manager.sensors_db, None) + cleanup_date = datetime.utcnow() - timedelta(days=90) + await service.cleanup_old_data(cleanup_date) + await asyncio.sleep(86400) # 24 hours + + except Exception as e: + logger.error(f"Error in data cleanup task: {e}") + await asyncio.sleep(7200) + + +async def event_scheduler_task(): + """Background task for checking and executing scheduled DR events""" + logger.info("Starting event scheduler task") + + while True: + try: + service = DemandResponseService(db_manager.demand_response_db, redis_manager.client) + await service.check_scheduled_events() + await asyncio.sleep(settings.event_scheduler_interval) + + except Exception as e: + logger.error(f"Error in event scheduler task: {e}") + await asyncio.sleep(120) + + +async def auto_response_task(): + """Background task for automatic demand response""" + logger.info("Starting auto-response task") + + while True: + try: + service = DemandResponseService(db_manager.demand_response_db, redis_manager.client) + await service.process_auto_responses() + await asyncio.sleep(settings.auto_response_interval) + + except Exception as e: + logger.error(f"Error in auto-response task: {e}") + await asyncio.sleep(90) + + +async def energy_data_event_subscriber(): + """Subscribe to internal event bus for energy data events""" + logger.info("Starting energy data event subscriber") + + async def handle_energy_data(data): + """Handle energy data events""" + try: + service = DemandResponseService(db_manager.demand_response_db, redis_manager.client) + sensor_id = data.get("sensorId") or data.get("sensor_id") + power_kw = data.get("value", 0.0) + + if sensor_id: + service.update_device_power_cache(sensor_id, power_kw) + + except Exception as e: + logger.error(f"Error processing energy data event: {e}") + + # Subscribe to energy data events + event_bus.subscribe(EventTopics.ENERGY_DATA, handle_energy_data) + + +async def ftp_monitoring_task(): + """Background task for FTP monitoring""" + logger.info("Starting FTP monitoring task") + + while True: + try: + from modules.data_ingestion import FTPMonitor, SLGProcessor + + ftp_monitor = FTPMonitor(db_manager.data_ingestion_db) + slg_processor = SLGProcessor(db_manager.data_ingestion_db) + + await ftp_monitor.check_and_process_files(slg_processor) + await asyncio.sleep(settings.ftp_check_interval) + + except Exception as e: + logger.error(f"Error in FTP monitoring task: {e}") + await asyncio.sleep(600) + + +# Application lifespan +@asynccontextmanager +async def lifespan(app: FastAPI): + """Manage application lifespan""" + logger.info(f"Starting {settings.app_name} v{settings.app_version}...") + + # Connect to databases + await db_manager.connect() + await redis_manager.connect() + + # Initialize default rooms + room_service = RoomService(db_manager.sensors_db, redis_manager.client) + await room_service.initialize_default_rooms() + + # Subscribe to internal events + await energy_data_event_subscriber() + + # Start background tasks + asyncio.create_task(room_metrics_aggregation_task()) + asyncio.create_task(data_cleanup_task()) + asyncio.create_task(event_scheduler_task()) + asyncio.create_task(auto_response_task()) + + # Start FTP monitoring if not skipping initial scan + if not settings.ftp_skip_initial_scan: + asyncio.create_task(ftp_monitoring_task()) + + logger.info("Application startup complete") + + yield + + logger.info("Shutting down application...") + + # Disconnect from databases + await db_manager.disconnect() + await redis_manager.disconnect() + + logger.info("Application shutdown complete") + + +# Create FastAPI application +app = FastAPI( + title=settings.app_name, + description="Modular monolithic architecture for Energy Dashboard", + version=settings.app_version, + lifespan=lifespan +) + +# Add CORS middleware +app.add_middleware( + CORSMiddleware, + allow_origins=settings.cors_origins, + allow_credentials=settings.cors_allow_credentials, + allow_methods=settings.cors_allow_methods, + allow_headers=settings.cors_allow_headers, +) + + +# Root endpoint +@app.get("/") +async def root(): + """Root endpoint""" + return { + "service": settings.app_name, + "version": settings.app_version, + "status": "running", + "timestamp": datetime.utcnow().isoformat() + } + + +# Health check endpoint +@app.get("/health") +async def health_check(): + """Global health check""" + try: + # Check database connection + await db_manager.main_db.command("ping") + + # Check Redis connection (optional) + redis_status = "disabled" + if redis_manager.is_available: + await redis_manager.client.ping() + redis_status = "healthy" + + return { + "service": settings.app_name, + "version": settings.app_version, + "status": "healthy", + "timestamp": datetime.utcnow().isoformat(), + "components": { + "database": "healthy", + "redis": redis_status, + "event_bus": "healthy" + }, + "modules": { + "sensors": "loaded", + "demand_response": "loaded", + "data_ingestion": "loaded" + } + } + + except Exception as e: + logger.error(f"Health check failed: {e}") + raise HTTPException(status_code=503, detail="Service Unavailable") + + +# System overview endpoint +@app.get("/api/v1/overview") +async def system_overview(): + """Get system overview""" + try: + from modules.sensors import SensorService + from modules.demand_response import DemandResponseService + + sensor_service = SensorService(db_manager.sensors_db, redis_manager.client) + dr_service = DemandResponseService(db_manager.demand_response_db, redis_manager.client) + + # Get sensor counts + all_sensors = await sensor_service.get_sensors() + active_sensors = [s for s in all_sensors if s.get("status") == "online"] + + # Get room counts + room_service = RoomService(db_manager.sensors_db, redis_manager.client) + all_rooms = await room_service.get_rooms() + + # Get DR event counts + active_events = len(dr_service.active_events) if hasattr(dr_service, 'active_events') else 0 + + return { + "timestamp": datetime.utcnow().isoformat(), + "sensors": { + "total": len(all_sensors), + "active": len(active_sensors), + "offline": len(all_sensors) - len(active_sensors) + }, + "rooms": { + "total": len(all_rooms) + }, + "demand_response": { + "active_events": active_events + }, + "event_bus": { + "topics": event_bus.get_topics(), + "total_topics": len(event_bus.get_topics()) + } + } + + except Exception as e: + logger.error(f"Error getting system overview: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +# Include module routers with prefixes +app.include_router( + sensors_router, + prefix="/api/v1", + tags=["sensors"] +) + +# Note: Demand Response and Data Ingestion routers would be added here +# app.include_router(demand_response_router, prefix="/api/v1/demand-response", tags=["demand-response"]) +# app.include_router(data_ingestion_router, prefix="/api/v1/ingestion", tags=["data-ingestion"]) + + +if __name__ == "__main__": + import uvicorn + uvicorn.run( + "main:app", + host=settings.host, + port=settings.port, + reload=settings.debug + ) diff --git a/monolith/src/modules/data_ingestion/__init__.py b/monolith/src/modules/data_ingestion/__init__.py new file mode 100644 index 0000000..d47f85a --- /dev/null +++ b/monolith/src/modules/data_ingestion/__init__.py @@ -0,0 +1,11 @@ +"""Data Ingestion module - handles FTP monitoring and SA4CPS data processing.""" + +from .ftp_monitor import FTPMonitor +from .slg_processor import SLGProcessor +from .config import Config + +__all__ = [ + "FTPMonitor", + "SLGProcessor", + "Config", +] diff --git a/monolith/src/modules/data_ingestion/config.py b/monolith/src/modules/data_ingestion/config.py new file mode 100644 index 0000000..dbcb293 --- /dev/null +++ b/monolith/src/modules/data_ingestion/config.py @@ -0,0 +1,45 @@ +#!/usr/bin/env python3 +""" +Configuration for SA4CPS Data Ingestion Service +Simple configuration management for FTP and MongoDB connections +""" + +import os +from typing import Dict, Any + +# FTP Configuration for SA4CPS server +FTP_CONFIG: Dict[str, Any] = { + "host": os.getenv("FTP_SA4CPS_HOST", "ftp.sa4cps.pt"), + "username": os.getenv("FTP_SA4CPS_USERNAME", "curvascarga@sa4cps.pt"), + "password": os.getenv("FTP_SA4CPS_PASSWORD", 'n$WFtz9+bleN'), # Set via environment variable + "base_path": os.getenv("FTP_SA4CPS_REMOTE_PATH", "/SLGs/"), + "check_interval": int(os.getenv("FTP_CHECK_INTERVAL", "21600")), # 6 hours default + "skip_initial_scan": os.getenv("FTP_SKIP_INITIAL_SCAN", "true").lower() == "true", +} + +# MongoDB Configuration +# Debug environment variables +print(f"DEBUG: MONGO_URL env var = {os.getenv('MONGO_URL', 'NOT SET')}") +print(f"DEBUG: All env vars starting with MONGO: {[k for k in os.environ.keys() if k.startswith('MONGO')]}") + +MONGO_CONFIG: Dict[str, Any] = { + "connection_string": os.getenv( + "MONGO_URL", + "mongodb://admin:password123@localhost:27017/digitalmente_ingestion?authSource=admin" + ), + "database_name": os.getenv("MONGODB_DATABASE", "digitalmente_ingestion") +} + +# Logging Configuration +LOGGING_CONFIG: Dict[str, Any] = { + "level": os.getenv("LOG_LEVEL", "INFO"), + "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s" +} + +# Service Configuration +SERVICE_CONFIG: Dict[str, Any] = { + "name": "SA4CPS Data Ingestion Service", + "version": "1.0.0", + "port": int(os.getenv("SERVICE_PORT", "8008")), + "host": os.getenv("SERVICE_HOST", "0.0.0.0") +} diff --git a/monolith/src/modules/data_ingestion/database.py b/monolith/src/modules/data_ingestion/database.py new file mode 100644 index 0000000..485ec50 --- /dev/null +++ b/monolith/src/modules/data_ingestion/database.py @@ -0,0 +1,478 @@ +import logging +from datetime import datetime +from typing import List, Dict, Any, Optional +from pymongo import MongoClient +from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError + +from config import MONGO_CONFIG + +logger = logging.getLogger(__name__) + + +class DatabaseManager: + + def __init__(self): + self.client: Optional[MongoClient] = None + self.db = None + self.collections = {} + self.energy_collections_cache = {} # Cache for dynamically created energy data collections + + self.connection_string = MONGO_CONFIG["connection_string"] + self.database_name = MONGO_CONFIG["database_name"] + + logger.info(f"Database manager initialized for: {self.database_name}") + + async def connect(self): + try: + logger.info(f"Connecting to MongoDB at: {self.connection_string}") + self.client = MongoClient(self.connection_string, serverSelectionTimeoutMS=5000) + + await self.ping() + + self.db = self.client[self.database_name] + self.collections = { + 'files': self.db.sa4cps_files, + 'metadata': self.db.sa4cps_metadata, + 'scanned_directories': self.db.sa4cps_scanned_directories + } + + self._create_base_indexes() + + logger.info(f"Connected to MongoDB database: {self.database_name}") + + except (ConnectionFailure, ServerSelectionTimeoutError) as e: + logger.error(f"Failed to connect to MongoDB: {e}") + raise + + async def close(self): + """Close MongoDB connection""" + if self.client: + self.client.close() + logger.debug("MongoDB connection closed") + + async def ping(self): + """Test database connection""" + if not self.client: + raise ConnectionFailure("No database connection") + + try: + # Use async approach with timeout + import asyncio + import concurrent.futures + + # Run the ping command in a thread pool to avoid blocking + loop = asyncio.get_event_loop() + with concurrent.futures.ThreadPoolExecutor() as pool: + await asyncio.wait_for( + loop.run_in_executor(pool, self.client.admin.command, 'ping'), + timeout=3.0 # 3 second timeout for ping + ) + logger.debug("MongoDB ping successful") + except asyncio.TimeoutError: + logger.error("MongoDB ping timeout after 3 seconds") + raise ConnectionFailure("MongoDB ping timeout") + except ConnectionFailure as e: + logger.error(f"MongoDB ping failed - Server not available: {e}") + raise + except Exception as e: + logger.error(f"MongoDB ping failed with error: {e}") + raise ConnectionFailure(f"Ping failed: {e}") + + def _create_base_indexes(self): + """Create indexes for base collections (not energy data collections)""" + try: + self.collections['files'].create_index("filename", unique=True) + self.collections['files'].create_index("processed_at") + self.collections['files'].create_index("directory_path") + + self.collections['scanned_directories'].create_index("directory_path", unique=True) + self.collections['scanned_directories'].create_index("last_scanned") + self.collections['scanned_directories'].create_index("scan_status") + + logger.info("Database indexes created successfully") + except Exception as e: + logger.warning(f"Failed to create indexes: {e}") + + def _extract_level3_path(self, directory_path: str) -> Optional[str]: + """Extract level 3 directory path (SLGs/Community/Building) from full path""" + # Expected structure: /SLGs/Community/Building/... + parts = directory_path.strip('/').split('/') + + if len(parts) >= 3 and parts[0] == 'SLGs': + # Return SLGs/Community/Building + return '/'.join(parts[:3]) + + return None + + def _sanitize_collection_name(self, level3_path: str) -> str: + """Convert level 3 directory path to valid MongoDB collection name + + Example: SLGs/CommunityA/Building1 -> energy_data__CommunityA_Building1 + """ + parts = level3_path.strip('/').split('/') + + if len(parts) >= 3 and parts[0] == 'SLGs': + # Use Community_Building as the collection suffix + collection_suffix = f"{parts[1]}_{parts[2]}" + collection_name = f"energy_data__{collection_suffix}" + return collection_name + + # Fallback: sanitize the entire path + sanitized = level3_path.replace('/', '_').replace('.', '_').replace(' ', '_') + sanitized = sanitized.strip('_') + return f"energy_data__{sanitized}" + + def _get_energy_collection(self, directory_path: str): + """Get or create energy data collection for a specific level 3 directory path""" + level3_path = self._extract_level3_path(directory_path) + + if not level3_path: + logger.warning(f"Could not extract level 3 path from: {directory_path}, using default collection") + # Fallback to a default collection for non-standard paths + collection_name = "energy_data__other" + else: + collection_name = self._sanitize_collection_name(level3_path) + + # Check cache first + if collection_name in self.energy_collections_cache: + return self.energy_collections_cache[collection_name] + + # Create/get collection + collection = self.db[collection_name] + + # Create indexes for this energy collection + try: + collection.create_index([("filename", 1), ("timestamp", 1)]) + collection.create_index("timestamp") + collection.create_index("meter_id") + logger.debug(f"Created indexes for collection: {collection_name}") + except Exception as e: + logger.warning(f"Failed to create indexes for {collection_name}: {e}") + + # Cache the collection + self.energy_collections_cache[collection_name] = collection + logger.info(f"Initialized energy data collection: {collection_name} for path: {directory_path}") + + return collection + + def _list_energy_collections(self) -> List[str]: + """List all energy data collections in the database""" + try: + all_collections = self.db.list_collection_names() + # Filter collections that start with 'energy_data__' + energy_collections = [c for c in all_collections if c.startswith('energy_data__')] + return energy_collections + except Exception as e: + logger.error(f"Error listing energy collections: {e}") + return [] + + async def store_file_data(self, filename: str, records: List[Dict[str, Any]], directory_path: str = None) -> bool: + try: + current_time = datetime.now() + + # Determine which collection to use based on directory path + if directory_path: + energy_collection = self._get_energy_collection(directory_path) + level3_path = self._extract_level3_path(directory_path) + else: + logger.warning(f"No directory path provided for {filename}, using default collection") + energy_collection = self._get_energy_collection("/SLGs/unknown/unknown") + level3_path = None + + # Store file metadata + file_metadata = { + "filename": filename, + "directory_path": directory_path, + "level3_path": level3_path, + "record_count": len(records), + "processed_at": current_time, + "file_size": sum(len(str(record)) for record in records), + "status": "processed" + } + + # Insert or update file record + self.collections['files'].replace_one( + {"filename": filename}, + file_metadata, + upsert=True + ) + + # Add filename and processed timestamp to each record + for record in records: + record["filename"] = filename + record["processed_at"] = current_time + record["directory_path"] = directory_path + + # Insert energy data records into the appropriate collection + if records: + result = energy_collection.insert_many(records) + inserted_count = len(result.inserted_ids) + logger.debug(f"Stored {inserted_count} records from {filename} to {energy_collection.name}") + return True + + return False + + except Exception as e: + logger.error(f"Error storing data for {filename}: {e}") + + # Store error metadata + error_metadata = { + "filename": filename, + "directory_path": directory_path, + "processed_at": current_time, + "status": "error", + "error_message": str(e) + } + + self.collections['files'].replace_one( + {"filename": filename}, + error_metadata, + upsert=True + ) + + return False + + async def get_processed_files(self) -> List[str]: + """Get list of successfully processed files""" + try: + cursor = self.collections['files'].find( + {"status": "processed"}, + {"filename": 1, "_id": 0} + ) + + files = [] + for doc in cursor: + files.append(doc["filename"]) + + return files + + except Exception as e: + logger.error(f"Error getting processed files: {e}") + return [] + + async def is_file_processed(self, filename: str) -> bool: + """Mock check if file is processed""" + return filename in await self.get_processed_files() + + async def get_file_info(self, filename: str) -> Optional[Dict[str, Any]]: + """Get information about a specific file""" + try: + return self.collections['files'].find_one({"filename": filename}) + except Exception as e: + logger.error(f"Error getting file info for {filename}: {e}") + return None + + # Directory scanning tracking methods + # Note: Only level 4+ directories (/SLGs/Community/Building/SubDir) are tracked + # to avoid unnecessary caching of high-level organizational directories + + async def is_directory_scanned(self, directory_path: str, since_timestamp: datetime = None) -> bool: + """Check if directory has been scanned recently + + Note: Only level 4+ directories are tracked in the database + """ + try: + query = {"directory_path": directory_path, "scan_status": "complete"} + if since_timestamp: + query["last_scanned"] = {"$gte": since_timestamp} + + result = self.collections['scanned_directories'].find_one(query) + return result is not None + except Exception as e: + logger.error(f"Error checking directory scan status for {directory_path}: {e}") + return False + + async def mark_directory_scanned(self, directory_path: str, file_count: int, ftp_last_modified: datetime = None) -> bool: + """Mark directory as scanned with current timestamp""" + try: + scan_record = { + "directory_path": directory_path, + "last_scanned": datetime.now(), + "file_count": file_count, + "scan_status": "complete" + } + + if ftp_last_modified: + scan_record["ftp_last_modified"] = ftp_last_modified + + # Use upsert to update existing or create new record + self.collections['scanned_directories'].replace_one( + {"directory_path": directory_path}, + scan_record, + upsert=True + ) + + logger.debug(f"Marked directory as scanned: {directory_path} ({file_count} files)") + return True + + except Exception as e: + logger.error(f"Error marking directory as scanned {directory_path}: {e}") + return False + + async def get_scanned_directories(self) -> List[Dict[str, Any]]: + """Get all scanned directory records""" + try: + cursor = self.collections['scanned_directories'].find() + return list(cursor) + except Exception as e: + logger.error(f"Error getting scanned directories: {e}") + return [] + + async def should_skip_directory(self, directory_path: str, ftp_last_modified: datetime = None) -> bool: + """Determine if directory should be skipped based on scan history and modification time""" + try: + scan_record = self.collections['scanned_directories'].find_one( + {"directory_path": directory_path, "scan_status": "complete"} + ) + + if not scan_record: + return False # Never scanned, should scan + + # If we have FTP modification time and it's newer than our last scan, don't skip + if ftp_last_modified and scan_record.get("last_scanned"): + return ftp_last_modified <= scan_record["last_scanned"] + + # If directory was scanned successfully, skip it (assuming it's historical data) + return True + + except Exception as e: + logger.error(f"Error determining if directory should be skipped {directory_path}: {e}") + return False + + async def get_stats(self) -> Dict[str, Any]: + """Get database statistics including all energy collections""" + try: + stats = { + "database": self.database_name, + "timestamp": datetime.now().isoformat() + } + + # Count documents in base collections + for name, collection in self.collections.items(): + try: + count = collection.count_documents({}) + stats[f"{name}_count"] = count + except Exception as e: + stats[f"{name}_count"] = f"error: {e}" + + # Get all energy collections and their counts + try: + energy_collections = self._list_energy_collections() + energy_stats = [] + total_energy_records = 0 + + for collection_name in energy_collections: + collection = self.db[collection_name] + count = collection.count_documents({}) + total_energy_records += count + + energy_stats.append({ + "collection": collection_name, + "record_count": count + }) + + stats["energy_collections"] = energy_stats + stats["total_energy_collections"] = len(energy_collections) + stats["total_energy_records"] = total_energy_records + + except Exception as e: + stats["energy_collections"] = f"error: {e}" + + # Get recent files + try: + recent_files = [] + cursor = self.collections['files'].find( + {}, + {"filename": 1, "processed_at": 1, "record_count": 1, "status": 1, "directory_path": 1, "level3_path": 1, "_id": 0} + ).sort("processed_at", -1).limit(5) + + for doc in cursor: + if doc.get("processed_at"): + doc["processed_at"] = doc["processed_at"].isoformat() + recent_files.append(doc) + + stats["recent_files"] = recent_files + + except Exception as e: + stats["recent_files"] = f"error: {e}" + + return stats + + except Exception as e: + logger.error(f"Error getting database stats: {e}") + return {"error": str(e), "timestamp": datetime.now().isoformat()} + + async def get_energy_data(self, + filename: Optional[str] = None, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + directory_path: Optional[str] = None, + limit: int = 100) -> List[Dict[str, Any]]: + """Retrieve energy data with optional filtering + + Args: + filename: Filter by specific filename + start_time: Filter by start timestamp + end_time: Filter by end timestamp + directory_path: Filter by specific directory path (level 3). If None, queries all collections + limit: Maximum number of records to return + """ + try: + query = {} + + if filename: + query["filename"] = filename + + if start_time or end_time: + time_query = {} + if start_time: + time_query["$gte"] = start_time + if end_time: + time_query["$lte"] = end_time + query["timestamp"] = time_query + + data = [] + + # If directory_path is specified, query only that collection + if directory_path: + collection = self._get_energy_collection(directory_path) + cursor = collection.find(query).sort("timestamp", -1).limit(limit) + + for doc in cursor: + data.append(self._format_energy_document(doc)) + + else: + # Query across all energy collections + energy_collection_names = self._list_energy_collections() + + # Collect data from all collections, then sort and limit + all_data = [] + per_collection_limit = max(limit, 1000) # Get more from each to ensure we have enough after sorting + + for collection_name in energy_collection_names: + collection = self.db[collection_name] + cursor = collection.find(query).sort("timestamp", -1).limit(per_collection_limit) + + for doc in cursor: + all_data.append(self._format_energy_document(doc)) + + # Sort all data by timestamp and apply final limit + all_data.sort(key=lambda x: x.get("timestamp", ""), reverse=True) + data = all_data[:limit] + + return data + + except Exception as e: + logger.error(f"Error retrieving energy data: {e}") + return [] + + def _format_energy_document(self, doc: Dict[str, Any]) -> Dict[str, Any]: + """Format energy document for API response""" + # Convert ObjectId to string and datetime to ISO string + if "_id" in doc: + doc["_id"] = str(doc["_id"]) + if "timestamp" in doc and hasattr(doc["timestamp"], "isoformat"): + doc["timestamp"] = doc["timestamp"].isoformat() + if "processed_at" in doc and hasattr(doc["processed_at"], "isoformat"): + doc["processed_at"] = doc["processed_at"].isoformat() + return doc diff --git a/monolith/src/modules/data_ingestion/ftp_monitor.py b/monolith/src/modules/data_ingestion/ftp_monitor.py new file mode 100644 index 0000000..8f2a1de --- /dev/null +++ b/monolith/src/modules/data_ingestion/ftp_monitor.py @@ -0,0 +1,339 @@ +import asyncio +from ftplib import FTP +import logging +import os +from datetime import datetime +from typing import List, Dict, Any, Optional +from dataclasses import dataclass +import tempfile + +from config import FTP_CONFIG +from slg_processor import SLGProcessor + +logger = logging.getLogger(__name__) + +@dataclass +class FTPFileInfo: + path: str + name: str + size: int + directory_path: str # Directory containing the file + modified_time: Optional[datetime] = None + + +class FTPMonitor: + def __init__(self, db_manager): + self.db_manager = db_manager + self.processor = SLGProcessor() + self.last_check: Optional[datetime] = None + self.processed_files: set = set() + self.files_processed_count = 0 + self.status = "initializing" + + self.ftp_host = FTP_CONFIG["host"] + self.ftp_user = FTP_CONFIG["username"] + self.ftp_pass = FTP_CONFIG["password"] + self.base_path = FTP_CONFIG["base_path"] + self.check_interval = FTP_CONFIG["check_interval"] + self.skip_initial_scan = FTP_CONFIG["skip_initial_scan"] + + logger.info(f"FTP Monitor initialized for {self.ftp_host}") + + async def initialize_processed_files_cache(self): + try: + # Add timeout to prevent blocking startup indefinitely + processed_file_names = await asyncio.wait_for( + self.db_manager.get_processed_files(), + timeout=10.0 # 10 second timeout + ) + + for filename in processed_file_names: + self.processed_files.add(filename) + + logger.info(f"Loaded {len(processed_file_names)} already processed files from database") + return len(processed_file_names) + except asyncio.TimeoutError: + logger.warning("Timeout loading processed files cache - continuing with empty cache") + return 0 + except Exception as e: + logger.error(f"Error loading processed files from database: {e}") + return 0 + + async def start_monitoring(self): + self.status = "initializing" + logger.info("Starting FTP monitoring loop") + + try: + await self.initialize_processed_files_cache() + logger.info("FTP monitor initialization completed") + except asyncio.CancelledError: + logger.info("FTP monitor initialization cancelled") + self.status = "stopped" + return + except Exception as e: + logger.error(f"Error during FTP monitor initialization: {e}") + self.status = "error" + try: + await asyncio.sleep(1800) + except asyncio.CancelledError: + logger.info("FTP monitor cancelled during error recovery") + self.status = "stopped" + return + + await asyncio.wait_for(self.check_for_new_files(), timeout=300.0) # 5 minute timeout + self.status = "running" + + # Optionally skip initial scan and wait for first scheduled interval + if self.skip_initial_scan: + logger.info(f"Skipping initial scan - waiting {self.check_interval/3600:.1f} hours for first scheduled check") + try: + await asyncio.sleep(self.check_interval) + except asyncio.CancelledError: + logger.info("FTP monitoring cancelled during initial wait") + self.status = "stopped" + return + + while True: + try: + # Add timeout to prevent indefinite blocking on FTP operations + await asyncio.wait_for(self.check_for_new_files(), timeout=300.0) # 5 minute timeout + self.status = "running" + + logger.info(f"Waiting {self.check_interval/3600:.1f} hours until next check") + await asyncio.sleep(self.check_interval) + + except asyncio.TimeoutError: + logger.warning("FTP check timed out after 5 minutes - will retry") + self.status = "error" + try: + await asyncio.sleep(900) # Wait 15 minutes before retry + except asyncio.CancelledError: + logger.info("FTP monitoring task cancelled during timeout recovery") + self.status = "stopped" + break + except asyncio.CancelledError: + logger.info("FTP monitoring task cancelled - shutting down gracefully") + self.status = "stopped" + break + except Exception as e: + self.status = "error" + logger.error(f"Error in monitoring loop: {e}") + try: + await asyncio.sleep(1800) # Wait 30 minutes before retry + except asyncio.CancelledError: + logger.info("FTP monitoring task cancelled during error recovery") + self.status = "stopped" + break + + async def check_for_new_files(self) -> Dict[str, Any]: + self.last_check = datetime.now() + logger.info(f"Checking FTP server at {self.last_check}") + + try: + with FTP(self.ftp_host) as ftp: + ftp.login(self.ftp_user, self.ftp_pass) + logger.info(f"Connected to FTP server: {self.ftp_host}") + + new_files = await self._find_slg_files(ftp) + + processed_count = 0 + skipped_count = 0 + for file_info in new_files: + # Check for cancellation during file processing loop + if asyncio.current_task().cancelled(): + raise asyncio.CancelledError() + + if file_info.name in self.processed_files: + logger.debug(f"Skipping already processed file (cached): {file_info.name}") + skipped_count += 1 + continue + + if await self.db_manager.is_file_processed(file_info.name): + logger.debug(f"Skipping already processed file (database): {file_info.name}") + self.processed_files.add(file_info.name) + skipped_count += 1 + continue + + logger.debug(f"Processing new file: {file_info.name}") + success = await self._process_file(ftp, file_info) + if success: + self.processed_files.add(file_info.name) + processed_count += 1 + logger.debug(f"Successfully processed file: {file_info.name} ({processed_count} total)") + self.files_processed_count += 1 + + result = { + "files_found": len(new_files), + "files_processed": processed_count, + "files_skipped": skipped_count, + "timestamp": self.last_check.isoformat() + } + + logger.info(f"Check complete: {result}") + return result + + except Exception as e: + logger.error(f"FTP check failed: {e}") + raise + + async def _find_slg_files(self, ftp: FTP) -> List[FTPFileInfo]: + files = [] + + try: + await self._scan_directories_iterative(ftp, self.base_path, files) + logger.info(f"Found {len(files)} .slg_v2 files across all directories") + return files + except Exception as e: + logger.error(f"Error scanning FTP directory: {e}") + return [] + + async def _scan_directories_iterative(self, ftp: FTP, base_path: str, files: List[FTPFileInfo]): + directories_to_scan = [(base_path, 0)] + visited_dirs = set() + skipped_dirs = 0 + scanned_dirs = 0 + + while directories_to_scan: + current_dir, current_depth = directories_to_scan.pop(0) # FIFO queue + + normalized_path = current_dir.rstrip('/') if current_dir != '/' else '/' + + if normalized_path in visited_dirs: + logger.debug(f"Skipping already visited directory: {normalized_path}") + continue + + visited_dirs.add(normalized_path) + + # Determine directory depth (level 4 = /SLGs/Community/Building/SubDir) + path_parts = normalized_path.strip('/').split('/') + directory_level = len(path_parts) + + # Check if directory should be skipped based on previous scans (only for level 4+) + if directory_level >= 4 and await self.db_manager.should_skip_directory(normalized_path): + logger.info(f"Skipping previously scanned level {directory_level} directory: {normalized_path}") + skipped_dirs += 1 + continue + + logger.debug(f"Scanning directory: {normalized_path} (depth: {current_depth}, queue: {len(directories_to_scan)})") + scanned_dirs += 1 + + try: + original_dir = ftp.pwd() + ftp.cwd(current_dir) + + dir_list = [] + ftp.retrlines('LIST', dir_list.append) + logger.debug(f"Found {len(dir_list)} entries in {normalized_path}") + + # Count files found in this directory + files_found_in_dir = 0 + + for line in dir_list: + parts = line.split() + if len(parts) >= 9: + filename = parts[-1] + permissions = parts[0] + + if filename in ['.', '..']: + continue + + if permissions.startswith('d'): + if normalized_path == '/': + subdirectory_path = f"/{filename}" + else: + subdirectory_path = f"{normalized_path}/{filename}" + + subdirectory_normalized = subdirectory_path.rstrip('/') if subdirectory_path != '/' else '/' + + if subdirectory_normalized not in visited_dirs: + directories_to_scan.append((subdirectory_path, current_depth + 1)) + logger.debug(f"Added to queue: {subdirectory_path}") + else: + logger.debug(f"Skipping already visited: {subdirectory_path}") + + elif filename.endswith('.sgl_v2'): + logger.debug(f"Found .slg_v2 file: {filename} in {normalized_path}") + try: + size = int(parts[4]) + if normalized_path == '/': + full_path = f"/{filename}" + else: + full_path = f"{normalized_path}/{filename}" + + files.append(FTPFileInfo( + path=full_path, + name=filename, + size=size, + directory_path=normalized_path + )) + files_found_in_dir += 1 + + except (ValueError, IndexError): + logger.warning(f"Could not parse file info for: {filename}") + + ftp.cwd(original_dir) + + # Mark directory as scanned (only for level 4+ directories) + if directory_level >= 4: + await self.db_manager.mark_directory_scanned(normalized_path, files_found_in_dir) + logger.debug(f"Completed scanning level {directory_level} directory: {normalized_path} ({files_found_in_dir} files found)") + else: + logger.debug(f"Completed scanning level {directory_level} directory (not saved to cache): {normalized_path} ({files_found_in_dir} files found)") + + except Exception as e: + logger.warning(f"Error scanning directory {normalized_path}: {e}") + continue + + logger.info(f"Iterative scan completed. Scanned: {scanned_dirs} directories, Skipped: {skipped_dirs} directories (Total visited: {len(visited_dirs)})") + + async def _process_file(self, ftp: FTP, file_info: FTPFileInfo) -> bool: + logger.debug(f"Processing file: {file_info.path} ({file_info.size} bytes) from directory: {file_info.directory_path}") + + try: + with tempfile.NamedTemporaryFile(mode='wb', suffix='.slg_v2', delete=False) as temp_file: + temp_path = temp_file.name + + with open(temp_path, 'wb') as f: + ftp.retrbinary(f'RETR {file_info.path}', f.write) + + records = await self.processor.process_file(temp_path, file_info.name) + + if records: + # Pass directory path to store_file_data for collection selection + await self.db_manager.store_file_data(file_info.name, records, file_info.directory_path) + logger.debug(f"Stored {len(records)} records from {file_info.name} to collection for {file_info.directory_path}") + return True + else: + logger.warning(f"No valid records found in {file_info.name}") + return False + + except Exception as e: + logger.error(f"Error processing file {file_info.name}: {e}") + return False + + finally: + try: + if 'temp_path' in locals(): + os.unlink(temp_path) + except OSError: + pass + + def get_status(self) -> str: + return self.status + + def get_last_check_time(self) -> Optional[str]: + return self.last_check.isoformat() if self.last_check else None + + def get_processed_count(self) -> int: + return self.files_processed_count + + def get_detailed_status(self) -> Dict[str, Any]: + return { + "status": self.status, + "last_check": self.get_last_check_time(), + "files_processed": self.files_processed_count, + "processed_files_count": len(self.processed_files), + "check_interval_hours": self.check_interval / 3600, + "ftp_host": self.ftp_host, + "base_path": self.base_path, + } diff --git a/monolith/src/modules/data_ingestion/slg_processor.py b/monolith/src/modules/data_ingestion/slg_processor.py new file mode 100644 index 0000000..d23e848 --- /dev/null +++ b/monolith/src/modules/data_ingestion/slg_processor.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 +""" +SA4CPS SLG_V2 File Processor +Simple parser for .slg_v2 energy data files +""" + +import logging +from datetime import datetime +from typing import List, Dict, Any, Optional +import re + +logger = logging.getLogger(__name__) + + +class SLGProcessor: + """Processes SA4CPS .slg_v2 files into structured energy data records""" + + def __init__(self): + self.processed_files = 0 + self.total_records = 0 + + async def process_file(self, file_path: str, filename: str) -> List[Dict[str, Any]]: + """Process a .slg_v2 file and return energy data records""" + logger.info(f"Processing SLG file: {filename}") + + try: + with open(file_path, 'r', encoding='utf-8') as file: + lines = file.readlines() + + records = [] + file_metadata = self._parse_metadata(lines[:5]) # Parse first 5 lines for metadata + + # Process data lines (lines starting with '20' are data records) + for line_num, line in enumerate(lines, 1): + line = line.strip() + + if line.startswith('20'): # Data record lines start with '20' (year) + record = self._parse_data_line(line, file_metadata, filename) + if record: + records.append(record) + + self.processed_files += 1 + self.total_records += len(records) + + logger.info(f"Processed {len(records)} records from {filename}") + return records + + except Exception as e: + logger.error(f"Error processing {filename}: {e}") + return [] + + def _parse_metadata(self, header_lines: List[str]) -> Dict[str, Any]: + """Parse metadata from SLG file header""" + metadata = { + "meter_id": None, + "measurement_type": None, + "unit": None, + "interval": None, + "period_start": None, + "period_end": None + } + + try: + for line in header_lines: + line = line.strip() + + if line.startswith('00'): # Header line with meter info + parts = line.split('\t') + if len(parts) >= 12: + metadata["meter_id"] = parts[3] # Meter ID + metadata["period_start"] = self._parse_date(parts[6]) + metadata["period_end"] = self._parse_date(parts[7]) + + elif line.startswith('01'): # Measurement configuration + parts = line.split('\t') + if len(parts) >= 10: + metadata["measurement_type"] = parts[4] # POTENCIA + metadata["unit"] = parts[5] # K (kW) + metadata["interval"] = parts[6] # 15M + + except Exception as e: + logger.warning(f"Error parsing metadata: {e}") + + return metadata + + def _parse_data_line(self, line: str, metadata: Dict[str, Any], filename: str) -> Optional[Dict[str, Any]]: + """Parse a data line into an energy record""" + try: + parts = line.split('\t') + + if len(parts) < 4: + return None + + # Parse timestamp (format: 20250201 0015) + date_part = parts[1] # 20250201 + time_part = parts[2] # 0015 + + # Convert to datetime + timestamp = self._parse_timestamp(date_part, time_part) + if not timestamp: + return None + + # Parse energy value + value_str = parts[3].replace('.', '') # Remove decimal separator + try: + value = float(value_str) / 1000.0 # Convert from thousandths + except ValueError: + value = 0.0 + + # Create record + record = { + "timestamp": timestamp, + "meter_id": metadata.get("meter_id", "unknown"), + "measurement_type": metadata.get("measurement_type", "energy"), + "value": value, + "unit": metadata.get("unit", "kW"), + "interval": metadata.get("interval", "15M"), + "filename": filename, + "quality": int(parts[4]) if len(parts) > 4 else 0 + } + + return record + + except Exception as e: + logger.warning(f"Error parsing data line '{line}': {e}") + return None + + def _parse_date(self, date_str: str) -> Optional[datetime]: + """Parse date string (YYYYMMDD format)""" + try: + if len(date_str) == 8 and date_str.isdigit(): + year = int(date_str[:4]) + month = int(date_str[4:6]) + day = int(date_str[6:8]) + return datetime(year, month, day) + except ValueError: + pass + return None + + def _parse_timestamp(self, date_str: str, time_str: str) -> Optional[datetime]: + """Parse timestamp from date and time strings""" + try: + # Parse date (YYYYMMDD) + if len(date_str) != 8 or not date_str.isdigit(): + return None + + year = int(date_str[:4]) + month = int(date_str[4:6]) + day = int(date_str[6:8]) + + # Parse time (HHMM) + if len(time_str) != 4 or not time_str.isdigit(): + return None + + hour = int(time_str[:2]) + if hour ==24: + hour = 0 + minute = int(time_str[2:4]) + + return datetime(year, month, day, hour, minute) + + except ValueError as e: + logger.warning(f"Error parsing timestamp '{date_str} {time_str}': {e}") + return None + + def get_stats(self) -> Dict[str, int]: + """Get processing statistics""" + return { + "files_processed": self.processed_files, + "total_records": self.total_records + } diff --git a/monolith/src/modules/demand_response/__init__.py b/monolith/src/modules/demand_response/__init__.py new file mode 100644 index 0000000..4f6bc16 --- /dev/null +++ b/monolith/src/modules/demand_response/__init__.py @@ -0,0 +1,21 @@ +"""Demand Response module - handles grid interaction and load management.""" + +from .models import ( + DemandResponseInvitation, + InvitationResponse, + EventRequest, + EventStatus, + LoadReductionRequest, + FlexibilityResponse +) +from .demand_response_service import DemandResponseService + +__all__ = [ + "DemandResponseInvitation", + "InvitationResponse", + "EventRequest", + "EventStatus", + "LoadReductionRequest", + "FlexibilityResponse", + "DemandResponseService", +] diff --git a/monolith/src/modules/demand_response/demand_response_service.py b/monolith/src/modules/demand_response/demand_response_service.py new file mode 100644 index 0000000..7b7eceb --- /dev/null +++ b/monolith/src/modules/demand_response/demand_response_service.py @@ -0,0 +1,747 @@ +""" +Demand Response Service - Core Business Logic +Handles DR invitations, event execution, auto-response, and flexibility calculation +""" + +import asyncio +import json +import uuid +from datetime import datetime, timedelta +from typing import List, Dict, Optional, Any +import logging + +from motor.motor_asyncio import AsyncIOMotorDatabase +import redis.asyncio as redis + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class DemandResponseService: + """Core Demand Response service business logic""" + + def __init__(self, db: AsyncIOMotorDatabase, redis_client: redis.Redis): + self.db = db + self.redis = redis_client + self.active_events: Dict[str, asyncio.Task] = {} # event_id -> task + self.device_power_cache: Dict[str, float] = {} # device_id -> power_kw (updated by Redis subscriber) + + # ===== INVITATION MANAGEMENT ===== + + async def send_invitation( + self, + event_time: datetime, + load_kwh: float, + load_percentage: float, + iots: List[str], + duration_minutes: int = 59 + ) -> Dict[str, Any]: + """ + Create and send DR invitation + Returns: {"event_id": str, "response": str, "message": str} + """ + logger.info(f"Creating DR invitation for {len(iots)} devices at {event_time}") + + # Generate unique event ID + event_id = str(uuid.uuid4()) + + # Check auto-response configuration + auto_config = await self.get_auto_response_config() + response = "YES" if auto_config.get("enabled", False) else "WAITING" + + # Create invitation document + invitation = { + "event_id": event_id, + "created_at": datetime.utcnow(), + "event_time": event_time, + "load_kwh": load_kwh, + "load_percentage": load_percentage, + "iots": iots, + "duration_minutes": duration_minutes, + "response": response, + "status": "pending" + } + + # Store in MongoDB + await self.db.demand_response_invitations.insert_one(invitation) + + # Cache in Redis for fast access (24 hour TTL) + cache_key = f"dr:invitation:{event_id}" + await self.redis.setex( + cache_key, + 86400, + json.dumps(invitation, default=str) + ) + + # Publish event to Redis pub/sub + await self.redis.publish("dr_events", json.dumps({ + "event": "invitation_created", + "event_id": event_id, + "event_time": event_time.isoformat(), + "load_kwh": load_kwh, + "response": response + })) + + logger.info(f"Invitation {event_id} created with response: {response}") + + return { + "event_id": event_id, + "response": response, + "message": "Invitation created successfully" + } + + async def answer_invitation( + self, + event_id: str, + iot_id: str, + response: str, + committed_reduction_kw: Optional[float] = None + ) -> Dict[str, Any]: + """ + Record device response to invitation + Returns: {"success": bool, "message": str} + """ + logger.info(f"Recording response for invitation {event_id}, device {iot_id}: {response}") + + # Validate invitation exists + invitation = await self.get_invitation(event_id) + if not invitation: + return {"success": False, "message": f"Invitation {event_id} not found"} + + if iot_id not in invitation["iots"]: + return {"success": False, "message": f"Device {iot_id} not in invitation"} + + # Check if already responded + existing = await self.db.demand_response_responses.find_one({ + "event_id": event_id, + "device_id": iot_id + }) + + if existing: + return {"success": False, "message": f"Device {iot_id} has already responded"} + + # Store response + response_doc = { + "event_id": event_id, + "device_id": iot_id, + "response": response, + "committed_reduction_kw": committed_reduction_kw, + "responded_at": datetime.utcnow() + } + + await self.db.demand_response_responses.insert_one(response_doc) + + # Check if all devices have responded + total_devices = len(invitation["iots"]) + total_responses = await self.db.demand_response_responses.count_documents({"event_id": event_id}) + + if total_responses == total_devices: + # All devices responded - update invitation status + yes_count = await self.db.demand_response_responses.count_documents({ + "event_id": event_id, + "response": "YES" + }) + + all_yes = yes_count == total_devices + new_response = "YES" if all_yes else "NO" + new_status = "scheduled" if all_yes else "cancelled" + + await self.db.demand_response_invitations.update_one( + {"event_id": event_id}, + {"$set": {"response": new_response, "status": new_status}} + ) + + logger.info(f"Invitation {event_id} final response: {new_response} (status: {new_status})") + + # Clear cache + await self.redis.delete(f"dr:invitation:{event_id}") + + # Publish event + await self.redis.publish("dr_events", json.dumps({ + "event": "invitation_answered", + "event_id": event_id, + "device_id": iot_id, + "response": response + })) + + return {"success": True, "message": "Response recorded successfully"} + + async def get_invitation(self, event_id: str) -> Optional[Dict[str, Any]]: + """ + Get invitation by event_id (with Redis caching) + """ + # Try cache first + cache_key = f"dr:invitation:{event_id}" + cached = await self.redis.get(cache_key) + if cached: + invitation = json.loads(cached) + return invitation + + # Fallback to MongoDB + invitation = await self.db.demand_response_invitations.find_one({"event_id": event_id}) + if invitation: + invitation["_id"] = str(invitation["_id"]) + + # Cache for 24 hours + await self.redis.setex( + cache_key, + 86400, + json.dumps(invitation, default=str) + ) + + return invitation + + return None + + async def get_unanswered_invitations(self) -> List[Dict[str, Any]]: + """Get all pending invitations awaiting response""" + cursor = self.db.demand_response_invitations.find({ + "response": "WAITING", + "status": "pending" + }).sort("created_at", -1) + + invitations = [] + async for inv in cursor: + inv["_id"] = str(inv["_id"]) + invitations.append(inv) + + return invitations + + async def get_answered_invitations(self, hours: int = 24, limit: int = 50) -> List[Dict[str, Any]]: + """Get recent answered invitations""" + start_time = datetime.utcnow() - timedelta(hours=hours) + + cursor = self.db.demand_response_invitations.find({ + "response": {"$ne": "WAITING"}, + "created_at": {"$gte": start_time} + }).sort("created_at", -1).limit(limit) + + invitations = [] + async for inv in cursor: + inv["_id"] = str(inv["_id"]) + invitations.append(inv) + + return invitations + + # ===== EVENT EXECUTION ===== + + async def schedule_event( + self, + event_time: datetime, + iots: List[str], + load_reduction_kw: float, + duration_minutes: int = 59 + ) -> Dict[str, Any]: + """ + Schedule a DR event for execution + Returns: {"event_id": str, "message": str} + """ + logger.info(f"Scheduling DR event for {len(iots)} devices at {event_time}") + + # Create event document + event_id = str(uuid.uuid4()) + end_time = event_time + timedelta(minutes=duration_minutes) + + event = { + "event_id": event_id, + "start_time": event_time, + "end_time": end_time, + "status": "scheduled", + "participating_devices": iots, + "target_reduction_kw": load_reduction_kw, + "actual_reduction_kw": 0.0, + "power_samples": [] + } + + await self.db.demand_response_events.insert_one(event) + + # Publish scheduled event + await self.redis.publish("dr_events", json.dumps({ + "event": "event_scheduled", + "event_id": event_id, + "start_time": event_time.isoformat(), + "end_time": end_time.isoformat(), + "devices": iots + })) + + logger.info(f"Event {event_id} scheduled successfully") + + return { + "event_id": event_id, + "message": "Event scheduled successfully" + } + + async def execute_event(self, event_id: str): + """ + Execute a DR event (spawns background task) + """ + logger.info(f"Executing DR event {event_id}") + + # Get event details + event = await self.db.demand_response_events.find_one({"event_id": event_id}) + if not event: + logger.error(f"Event {event_id} not found") + return + + # Update status to active + await self.db.demand_response_events.update_one( + {"event_id": event_id}, + {"$set": {"status": "active", "actual_start_time": datetime.utcnow()}} + ) + + # Publish event started + await self.redis.publish("dr_events", json.dumps({ + "event": "event_started", + "event_id": event_id, + "devices": event["participating_devices"] + })) + + # Create and store async task for this event + task = asyncio.create_task(self._run_event_loop(event)) + self.active_events[event_id] = task + + logger.info(f"DR event {event_id} started successfully") + + async def _run_event_loop(self, event: Dict[str, Any]): + """ + CRITICAL: Core event execution loop - runs for duration_minutes + Samples power every 5 seconds, accumulates reduction, handles cancellation + """ + event_id = event["event_id"] + end_time = event["end_time"] + devices = event["participating_devices"] + + total_reduction_kwh = 0.0 + sample_count = 0 + + logger.info(f"Starting event loop for {event_id}, ending at {end_time}") + + try: + while datetime.utcnow() < end_time: + # Get current power for all participating devices from cache + device_powers = { + device_id: self.device_power_cache.get(device_id, 0.0) + for device_id in devices + } + + # Calculate reduction for this 5-second interval + # interval_hours = 5.0 / 3600.0 = 0.00139 hours + interval_reduction_kwh = sum(device_powers.values()) * (5.0 / 3600.0) + total_reduction_kwh += interval_reduction_kwh + + sample_count += 1 + + # Store sample in MongoDB (every sample to maintain accuracy) + sample = { + "timestamp": datetime.utcnow(), + "device_powers": device_powers, + "interval_reduction_kwh": interval_reduction_kwh + } + + await self.db.demand_response_events.update_one( + {"event_id": event_id}, + { + "$push": {"power_samples": sample}, + "$set": {"actual_reduction_kw": total_reduction_kwh} + } + ) + + # Update Redis cache for fast access to current reduction + cache_key = f"dr:event:active:{event_id}" + await self.redis.setex( + cache_key, + 300, # 5 minute TTL + json.dumps({ + "event_id": event_id, + "current_reduction_kwh": total_reduction_kwh, + "devices": device_powers, + "last_update": datetime.utcnow().isoformat() + }, default=str) + ) + + # Publish progress every 10 samples (50 seconds) + if sample_count % 10 == 0: + await self.redis.publish("dr_events", json.dumps({ + "event": "event_progress", + "event_id": event_id, + "total_reduction_kwh": round(total_reduction_kwh, 3), + "device_powers": device_powers, + "timestamp": datetime.utcnow().isoformat() + })) + logger.info(f"Event {event_id} progress: {total_reduction_kwh:.3f} kWh ({sample_count} samples)") + + # Sleep for 5 seconds + await asyncio.sleep(5) + + # Event completed successfully + logger.info(f"Event {event_id} completed with {total_reduction_kwh:.3f} kWh reduction") + await self._complete_event(event_id, total_reduction_kwh) + + except asyncio.CancelledError: + logger.info(f"Event {event_id} cancelled by user") + await self._cancel_event(event_id) + raise + + except Exception as e: + logger.error(f"Error in event {event_id}: {e}", exc_info=True) + await self._cancel_event(event_id) + + async def _complete_event(self, event_id: str, total_reduction_kwh: float): + """Mark event as completed""" + await self.db.demand_response_events.update_one( + {"event_id": event_id}, + { + "$set": { + "status": "completed", + "actual_end_time": datetime.utcnow(), + "actual_reduction_kw": total_reduction_kwh + } + } + ) + + # Remove from active events + self.active_events.pop(event_id, None) + + # Clear cache + await self.redis.delete(f"dr:event:active:{event_id}") + + # Publish completion + await self.redis.publish("dr_events", json.dumps({ + "event": "event_completed", + "event_id": event_id, + "total_reduction_kwh": total_reduction_kwh + })) + + logger.info(f"DR event {event_id} marked as completed") + + async def _cancel_event(self, event_id: str): + """Internal method to cancel an event""" + await self.db.demand_response_events.update_one( + {"event_id": event_id}, + { + "$set": { + "status": "cancelled", + "cancelled_at": datetime.utcnow() + } + } + ) + + self.active_events.pop(event_id, None) + await self.redis.delete(f"dr:event:active:{event_id}") + + # Publish cancellation + await self.redis.publish("dr_events", json.dumps({ + "event": "event_cancelled", + "event_id": event_id, + "timestamp": datetime.utcnow().isoformat() + })) + + async def cancel_event(self, event_id: str): + """ + Public method to cancel a running DR event gracefully + """ + logger.info(f"Cancelling DR event {event_id}") + + # Cancel the async task + task = self.active_events.get(event_id) + if task and not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + # Expected - task cancelled successfully + pass + except Exception as e: + logger.error(f"Error cancelling event task {event_id}: {e}") + + # Update database status (if not already done by _cancel_event) + event = await self.db.demand_response_events.find_one({"event_id": event_id}) + if event and event.get("status") != "cancelled": + await self._cancel_event(event_id) + + logger.info(f"DR event {event_id} cancelled successfully") + + async def get_active_events(self) -> List[Dict[str, Any]]: + """Get currently running events with real-time data""" + cursor = self.db.demand_response_events.find({ + "status": "active" + }).sort("start_time", -1) + + events = [] + async for event in cursor: + event["_id"] = str(event["_id"]) + + # Add real-time data from cache + cache_key = f"dr:event:active:{event['event_id']}" + cached = await self.redis.get(cache_key) + if cached: + realtime_data = json.loads(cached) + event["current_reduction_kwh"] = realtime_data.get("current_reduction_kwh") + event["current_device_powers"] = realtime_data.get("devices") + + events.append(event) + + return events + + # ===== DEVICE POWER INTEGRATION ===== + + def update_device_power_cache(self, device_id: str, power_kw: float): + """ + Update device power cache (called by Redis subscriber) + This is synchronous because it's just updating a dict + """ + self.device_power_cache[device_id] = power_kw + # No logging here to avoid spam (called every few seconds per device) + + async def get_device_power(self, device_id: str) -> float: + """Get current power for a device from cache""" + return self.device_power_cache.get(device_id, 0.0) + + # ===== AUTO-RESPONSE CONFIGURATION ===== + + async def get_auto_response_config(self) -> Dict[str, Any]: + """Get auto-response configuration""" + config = await self.db.auto_response_config.find_one({"config_id": "default"}) + + if not config: + # Create default config + default_config = { + "config_id": "default", + "enabled": False, + "max_reduction_percentage": 20.0, + "response_delay_seconds": 300, + "min_notice_minutes": 60, + "updated_at": datetime.utcnow() + } + await self.db.auto_response_config.insert_one(default_config) + return default_config + + return config + + async def set_auto_response_config( + self, + enabled: bool, + max_reduction_percentage: float = 20.0, + response_delay_seconds: int = 300, + min_notice_minutes: int = 60 + ) -> Dict[str, Any]: + """Update auto-response configuration""" + await self.db.auto_response_config.update_one( + {"config_id": "default"}, + { + "$set": { + "enabled": enabled, + "max_reduction_percentage": max_reduction_percentage, + "response_delay_seconds": response_delay_seconds, + "min_notice_minutes": min_notice_minutes, + "updated_at": datetime.utcnow() + } + }, + upsert=True + ) + + # Clear cache + await self.redis.delete("dr:config:auto_response") + + logger.info(f"Auto-response config updated: enabled={enabled}") + + return await self.get_auto_response_config() + + async def process_auto_responses(self): + """ + Process pending invitations with auto-response (called by background task) + """ + # Get auto-response configuration + auto_config = await self.get_auto_response_config() + + if not auto_config.get("enabled"): + return + + # Find unanswered invitations + invitations = await self.get_unanswered_invitations() + + for invitation in invitations: + event_id = invitation["event_id"] + event_time = invitation["event_time"] + + # Parse event_time (might be string from cache) + if isinstance(event_time, str): + event_time = datetime.fromisoformat(event_time.replace('Z', '+00:00')) + + # Check if event is within auto-response criteria + time_until_event = (event_time - datetime.utcnow()).total_seconds() / 60 # minutes + min_notice = auto_config.get("min_notice_minutes", 60) + + if time_until_event >= min_notice: + logger.info(f"Auto-responding to invitation {event_id}") + + # Auto-accept for all devices + for device_id in invitation["iots"]: + # Check if already responded + existing = await self.db.demand_response_responses.find_one({ + "event_id": event_id, + "device_id": device_id + }) + + if not existing: + # Get device current power + device_power = await self.get_device_power(device_id) + + # Calculate committed reduction based on max_reduction_percentage + max_reduction_pct = auto_config.get("max_reduction_percentage", 20.0) + committed_reduction = device_power * (max_reduction_pct / 100) if device_power > 0 else 0.5 + + # Submit auto-response + try: + await self.answer_invitation(event_id, device_id, "YES", committed_reduction) + logger.info(f"Auto-accepted for device {device_id} with {committed_reduction:.2f} kW commitment") + except Exception as e: + logger.error(f"Error auto-responding for {device_id}: {e}") + else: + logger.debug(f"Invitation {event_id} too soon ({time_until_event:.0f}m < {min_notice}m)") + + # ===== BACKGROUND TASK SUPPORT ===== + + async def check_scheduled_events(self): + """ + Check for events that need to be started (called by scheduler task) + """ + now = datetime.utcnow() + threshold = now + timedelta(minutes=1) # Start events within next minute + + # Find scheduled events that should start + cursor = self.db.demand_response_events.find({ + "status": "scheduled", + "start_time": {"$lte": threshold, "$gte": now} + }) + + async for event in cursor: + event_id = event["event_id"] + + # Check if not already active + if event_id not in self.active_events: + logger.info(f"Starting scheduled DR event {event_id}") + await self.execute_event(event_id) + + # ===== BASIC FLEXIBILITY CALCULATION ===== + + async def get_current_flexibility(self) -> Dict[str, Any]: + """ + Calculate current available flexibility from device power cache + """ + total_flexibility_kw = 0.0 + devices = [] + + # Get all devices with instructions + cursor = self.db.device_instructions.find({}) + current_hour = datetime.utcnow().hour + + async for device_doc in cursor: + device_id = device_doc["device_id"] + instruction = device_doc["instructions"].get(str(current_hour), "off") + + if instruction != "off": + # Get device current power from cache + device_power = self.device_power_cache.get(device_id, 0.0) + + if instruction == "participation": + # Full flexibility (100%) + flexibility = device_power + elif instruction == "shifting": + # Partial flexibility (20%) + flexibility = device_power * 0.20 + else: + flexibility = 0.0 + + if flexibility > 0: + devices.append({ + "device_id": device_id, + "available_kw": round(flexibility, 2), + "instruction": instruction, + "current_power": round(device_power, 2) + }) + total_flexibility_kw += flexibility + + snapshot = { + "timestamp": datetime.utcnow(), + "total_flexibility_kw": round(total_flexibility_kw, 2), + "devices": devices + } + + # Store snapshot + await self.db.flexibility_snapshots.insert_one(dict(snapshot)) + + # Cache for 5 minutes + await self.redis.setex( + "dr:flexibility:current", + 300, + json.dumps(snapshot, default=str) + ) + + return snapshot + + async def get_device_instructions(self, device_id: Optional[str] = None) -> Dict[str, Any]: + """Get DR instructions for device(s)""" + if device_id: + doc = await self.db.device_instructions.find_one({"device_id": device_id}) + return doc if doc else {"device_id": device_id, "instructions": {}} + else: + cursor = self.db.device_instructions.find({}) + instructions = {} + async for doc in cursor: + instructions[doc["device_id"]] = doc["instructions"] + return instructions + + async def update_device_instructions(self, device_id: str, instructions: Dict[str, str]): + """Update hourly instructions for a device""" + await self.db.device_instructions.update_one( + {"device_id": device_id}, + { + "$set": { + "instructions": instructions, + "updated_at": datetime.utcnow() + } + }, + upsert=True + ) + + logger.info(f"Updated instructions for device {device_id}") + + # ===== ANALYTICS ===== + + async def get_performance_analytics(self, days: int = 30) -> Dict[str, Any]: + """Get DR performance analytics""" + start_date = datetime.utcnow() - timedelta(days=days) + + # Query completed events + cursor = self.db.demand_response_events.find({ + "status": "completed", + "start_time": {"$gte": start_date} + }) + + events = await cursor.to_list(length=None) + + if not events: + return { + "period_days": days, + "total_events": 0, + "total_reduction_kwh": 0.0, + "total_target_kwh": 0.0, + "average_reduction_kwh": 0.0, + "achievement_rate": 0.0, + "average_event_duration_minutes": 59 + } + + total_reduction = sum(e.get("actual_reduction_kw", 0) for e in events) + total_target = sum(e.get("target_reduction_kw", 0) for e in events) + + return { + "period_days": days, + "total_events": len(events), + "total_reduction_kwh": round(total_reduction, 2), + "total_target_kwh": round(total_target, 2), + "average_reduction_kwh": round(total_reduction / len(events), 2), + "achievement_rate": round((total_reduction / total_target * 100) if total_target > 0 else 0, 2), + "average_event_duration_minutes": 59 + } diff --git a/monolith/src/modules/demand_response/models.py b/monolith/src/modules/demand_response/models.py new file mode 100644 index 0000000..6f149c0 --- /dev/null +++ b/monolith/src/modules/demand_response/models.py @@ -0,0 +1,338 @@ +""" +Pydantic models for Demand Response Service +""" + +from datetime import datetime +from typing import List, Dict, Optional, Literal +from pydantic import BaseModel, Field +from enum import Enum + + +# Enums +class InvitationStatus(str, Enum): + """Invitation status states""" + PENDING = "pending" + SCHEDULED = "scheduled" + ACTIVE = "active" + COMPLETED = "completed" + CANCELLED = "cancelled" + + +class ResponseType(str, Enum): + """Device response types""" + WAITING = "WAITING" + YES = "YES" + NO = "NO" + + +class EventStatus(str, Enum): + """DR event status states""" + SCHEDULED = "scheduled" + ACTIVE = "active" + COMPLETED = "completed" + CANCELLED = "cancelled" + + +class InstructionType(str, Enum): + """Device participation instruction types""" + PARTICIPATION = "participation" # Full DR participation (100%) + SHIFTING = "shifting" # Partial participation (0-20%) + OFF = "off" # No DR participation + + +# Invitation Models +class EventRequest(BaseModel): + """Request model for creating a DR event (alias for DRInvitationCreate)""" + event_time: datetime = Field(..., description="When the DR event should occur") + load_kwh: float = Field(..., description="Target load reduction in kWh", gt=0) + load_percentage: float = Field(..., description="Target reduction as percentage of total load", ge=0, le=100) + iots: List[str] = Field(..., description="List of device IDs to participate", min_items=1) + duration_minutes: int = Field(59, description="Event duration in minutes", gt=0, le=120) + + class Config: + json_schema_extra = { + "example": { + "event_time": "2025-12-10T14:00:00", + "load_kwh": 5.0, + "load_percentage": 15.0, + "iots": ["sensor_1", "sensor_2"], + "duration_minutes": 59 + } + } + + +class DRInvitationCreate(BaseModel): + """Request model for creating a DR invitation""" + event_time: datetime = Field(..., description="When the DR event should occur") + load_kwh: float = Field(..., description="Target load reduction in kWh", gt=0) + load_percentage: float = Field(..., description="Target reduction as percentage of total load", ge=0, le=100) + iots: List[str] = Field(..., description="List of device IDs to participate", min_items=1) + duration_minutes: int = Field(59, description="Event duration in minutes", gt=0, le=120) + + class Config: + json_schema_extra = { + "example": { + "event_time": "2025-12-10T14:00:00", + "load_kwh": 5.0, + "load_percentage": 15.0, + "iots": ["sensor_1", "sensor_2"], + "duration_minutes": 59 + } + } + + +class DRInvitationResponse(BaseModel): + """Response model for device answering invitation""" + event_id: str = Field(..., description="Event identifier") + iot_id: str = Field(..., description="Device identifier") + response: ResponseType = Field(..., description="Device response (YES/NO)") + committed_reduction_kw: Optional[float] = Field(None, description="Committed power reduction in kW", ge=0) + + class Config: + json_schema_extra = { + "example": { + "event_id": "550e8400-e29b-41d4-a716-446655440000", + "iot_id": "sensor_1", + "response": "YES", + "committed_reduction_kw": 2.5 + } + } + + +class DRInvitation(BaseModel): + """Full DR invitation model""" + event_id: str = Field(..., description="Unique event identifier") + created_at: datetime = Field(..., description="Invitation creation time") + event_time: datetime = Field(..., description="Scheduled event start time") + load_kwh: float = Field(..., description="Target load reduction in kWh") + load_percentage: float = Field(..., description="Target reduction percentage") + iots: List[str] = Field(..., description="Participating device IDs") + duration_minutes: int = Field(..., description="Event duration in minutes") + response: str = Field(..., description="Overall response status") + status: str = Field(..., description="Invitation status") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + json_schema_extra = { + "example": { + "event_id": "550e8400-e29b-41d4-a716-446655440000", + "created_at": "2025-12-10T13:45:00", + "event_time": "2025-12-10T14:00:00", + "load_kwh": 5.0, + "load_percentage": 15.0, + "iots": ["sensor_1", "sensor_2"], + "duration_minutes": 59, + "response": "WAITING", + "status": "pending" + } + } + + +# Event Models +class EventScheduleRequest(BaseModel): + """Request model for scheduling a DR event""" + event_time: datetime = Field(..., description="Event start time") + iots: List[str] = Field(..., description="Participating device IDs", min_items=1) + load_reduction_kw: float = Field(..., description="Target reduction in kW", gt=0) + duration_minutes: int = Field(59, description="Event duration in minutes", gt=0, le=120) + + class Config: + json_schema_extra = { + "example": { + "event_time": "2025-12-10T14:00:00", + "iots": ["sensor_1", "sensor_2"], + "load_reduction_kw": 5.0, + "duration_minutes": 59 + } + } + + +class PowerSample(BaseModel): + """Individual power sample during event""" + timestamp: datetime = Field(..., description="Sample timestamp") + device_powers: Dict[str, float] = Field(..., description="Device power readings (device_id -> kW)") + interval_reduction_kwh: Optional[float] = Field(None, description="Reduction for this interval") + + +class DREvent(BaseModel): + """DR event execution model""" + event_id: str = Field(..., description="Unique event identifier") + invitation_id: Optional[str] = Field(None, description="Source invitation ID if applicable") + start_time: datetime = Field(..., description="Event start time") + end_time: datetime = Field(..., description="Event end time") + status: EventStatus = Field(..., description="Event status") + participating_devices: List[str] = Field(..., description="Device IDs participating") + target_reduction_kw: float = Field(..., description="Target power reduction in kW") + actual_reduction_kw: float = Field(0.0, description="Actual achieved reduction in kWh") + power_samples: List[Dict] = Field(default_factory=list, description="Power samples during event") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + + +class ActiveEventResponse(BaseModel): + """Response model for active event with real-time data""" + event_id: str = Field(..., description="Event identifier") + status: EventStatus = Field(..., description="Current status") + start_time: datetime = Field(..., description="Event start time") + end_time: datetime = Field(..., description="Event end time") + participating_devices: List[str] = Field(..., description="Participating devices") + target_reduction_kw: float = Field(..., description="Target reduction") + actual_reduction_kw: float = Field(..., description="Current achieved reduction") + current_device_powers: Optional[Dict[str, float]] = Field(None, description="Current device power readings") + progress_percentage: Optional[float] = Field(None, description="Event progress (0-100%)") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + + +class LoadReductionRequest(BaseModel): + """Request model for executing load reduction""" + event_time: datetime = Field(..., description="Event start time") + iot: str = Field(..., description="Device ID") + + class Config: + json_schema_extra = { + "example": { + "event_time": "2025-12-10T14:00:00", + "iot": "sensor_1" + } + } + + +# Flexibility Models +class DeviceFlexibility(BaseModel): + """Per-device flexibility information""" + device_id: str = Field(..., description="Device identifier") + available_kw: float = Field(..., description="Available flexibility in kW", ge=0) + instruction: str = Field(..., description="Current DR instruction") + current_power: float = Field(..., description="Current power consumption in kW", ge=0) + + +class FlexibilityResponse(BaseModel): + """Response model for current flexibility""" + timestamp: datetime = Field(..., description="Calculation timestamp") + total_flexibility_kw: float = Field(..., description="Total available flexibility in kW", ge=0) + devices: List[DeviceFlexibility] = Field(..., description="Per-device breakdown") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + json_schema_extra = { + "example": { + "timestamp": "2025-12-10T13:45:00", + "total_flexibility_kw": 15.5, + "devices": [ + { + "device_id": "sensor_1", + "available_kw": 3.5, + "instruction": "participation", + "current_power": 3.5 + }, + { + "device_id": "sensor_2", + "available_kw": 0.8, + "instruction": "shifting", + "current_power": 4.0 + } + ] + } + } + + +class DeviceInstructionUpdate(BaseModel): + """Model for updating device instructions""" + device_id: str = Field(..., description="Device identifier") + instructions: Dict[str, str] = Field(..., description="Hourly instructions (hour -> instruction type)") + + class Config: + json_schema_extra = { + "example": { + "device_id": "sensor_1", + "instructions": { + "0": "participation", + "1": "shifting", + "2": "off", + "3": "participation" + } + } + } + + +# Configuration Models +class AutoResponseConfig(BaseModel): + """Auto-response configuration model""" + enabled: bool = Field(..., description="Whether auto-response is enabled") + max_reduction_percentage: float = Field(20.0, description="Maximum reduction percentage for auto-accept", ge=0, le=100) + response_delay_seconds: int = Field(300, description="Delay before auto-responding (seconds)", ge=0) + min_notice_minutes: int = Field(60, description="Minimum notice required for auto-accept (minutes)", ge=0) + + class Config: + json_schema_extra = { + "example": { + "enabled": True, + "max_reduction_percentage": 20.0, + "response_delay_seconds": 300, + "min_notice_minutes": 60 + } + } + + +# Response Models +class InvitationSendResponse(BaseModel): + """Response for sending invitation""" + event_id: str = Field(..., description="Created event identifier") + response: str = Field(..., description="Initial response status") + message: str = Field(..., description="Status message") + + +class InvitationAnswerResponse(BaseModel): + """Response for answering invitation""" + success: bool = Field(..., description="Whether answer was recorded") + message: str = Field(..., description="Status message") + + +class EventScheduleResponse(BaseModel): + """Response for scheduling event""" + event_id: str = Field(..., description="Scheduled event identifier") + message: str = Field(..., description="Status message") + + +class PerformanceAnalytics(BaseModel): + """Performance analytics response""" + period_days: int = Field(..., description="Analysis period in days") + total_events: int = Field(..., description="Total number of events") + total_reduction_kwh: float = Field(..., description="Total energy reduced") + total_target_kwh: float = Field(..., description="Total target reduction") + average_reduction_kwh: float = Field(..., description="Average reduction per event") + achievement_rate: float = Field(..., description="Achievement rate (%)") + average_event_duration_minutes: int = Field(..., description="Average event duration") + + +# Health Check Model +class HealthResponse(BaseModel): + """Health check response model""" + service: str = Field(..., description="Service name") + status: str = Field(..., description="Service status") + timestamp: datetime = Field(..., description="Check timestamp") + version: str = Field(..., description="Service version") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + json_schema_extra = { + "example": { + "service": "demand-response-service", + "status": "healthy", + "timestamp": "2025-12-10T13:45:00", + "version": "1.0.0" + } + } diff --git a/monolith/src/modules/sensors/__init__.py b/monolith/src/modules/sensors/__init__.py new file mode 100644 index 0000000..ff035db --- /dev/null +++ b/monolith/src/modules/sensors/__init__.py @@ -0,0 +1,39 @@ +"""Sensors module - handles sensor management, rooms, and analytics.""" + +from .models import ( + SensorReading, + SensorMetadata, + RoomMetrics, + SystemEvent, + Room, + RoomCreate, + RoomUpdate, + RoomInfo, + SensorType, + SensorStatus, + CO2Status, + OccupancyLevel +) +from .sensor_service import SensorService +from .room_service import RoomService +from .analytics_service import AnalyticsService +from .websocket_manager import WebSocketManager + +__all__ = [ + "SensorReading", + "SensorMetadata", + "RoomMetrics", + "SystemEvent", + "Room", + "RoomCreate", + "RoomUpdate", + "RoomInfo", + "SensorType", + "SensorStatus", + "CO2Status", + "OccupancyLevel", + "SensorService", + "RoomService", + "AnalyticsService", + "WebSocketManager", +] diff --git a/monolith/src/modules/sensors/analytics_service.py b/monolith/src/modules/sensors/analytics_service.py new file mode 100644 index 0000000..3ae02cc --- /dev/null +++ b/monolith/src/modules/sensors/analytics_service.py @@ -0,0 +1,377 @@ +""" +Analytics service for processing sensor data and generating insights +""" + +import logging +from datetime import datetime, timedelta +from typing import List, Dict, Any, Optional +import json + +logger = logging.getLogger(__name__) + +class AnalyticsService: + """Service for analytics and data processing""" + + def __init__(self, db, redis_client): + self.db = db + self.redis = redis_client + + async def query_data(self, query_params) -> Dict[str, Any]: + """Execute advanced data query""" + try: + # Build query + query = {} + + if hasattr(query_params, 'sensor_ids') and query_params.sensor_ids: + query["sensor_id"] = {"$in": query_params.sensor_ids} + + if hasattr(query_params, 'start_time') and query_params.start_time: + query.setdefault("timestamp", {})["$gte"] = query_params.start_time + + if hasattr(query_params, 'end_time') and query_params.end_time: + query.setdefault("timestamp", {})["$lte"] = query_params.end_time + + # Execute query + cursor = self.db.sensor_readings.find(query) + + if hasattr(query_params, 'limit') and query_params.limit: + cursor = cursor.limit(query_params.limit) + + if hasattr(query_params, 'offset') and query_params.offset: + cursor = cursor.skip(query_params.offset) + + cursor = cursor.sort("timestamp", -1) + + # Get results + results = [] + async for reading in cursor: + reading["_id"] = str(reading["_id"]) + results.append(reading) + + # Get total count + total_count = await self.db.sensor_readings.count_documents(query) + + return { + "data": results, + "total_count": total_count, + "query": query_params.__dict__ if hasattr(query_params, '__dict__') else {}, + "execution_time_ms": 0 # Placeholder + } + + except Exception as e: + logger.error(f"Error executing data query: {e}") + raise + + async def get_analytics_summary(self, hours: int = 24) -> Dict[str, Any]: + """Get comprehensive analytics summary""" + try: + start_time = datetime.utcnow() - timedelta(hours=hours) + + # Get basic statistics + pipeline = [ + { + "$match": { + "created_at": {"$gte": start_time} + } + }, + { + "$group": { + "_id": None, + "total_readings": {"$sum": 1}, + "average_value": {"$avg": "$value"}, + "min_value": {"$min": "$value"}, + "max_value": {"$max": "$value"}, + "unique_sensors": {"$addToSet": "$sensor_id"} + } + } + ] + + cursor = self.db.sensor_readings.aggregate(pipeline) + stats = await cursor.to_list(length=1) + + base_stats = stats[0] if stats else { + "total_readings": 0, + "average_value": 0, + "min_value": 0, + "max_value": 0, + "unique_sensors": [] + } + + # Get room-level statistics + room_stats = await self._get_room_analytics(hours) + + # Get energy trends + energy_trends = await self._get_energy_trends(hours) + + return { + "period_hours": hours, + "timestamp": datetime.utcnow().isoformat(), + "total_readings": base_stats["total_readings"], + "unique_sensors": len(base_stats["unique_sensors"]), + "value_statistics": { + "average": round(base_stats["average_value"], 2) if base_stats["average_value"] else 0, + "minimum": base_stats["min_value"], + "maximum": base_stats["max_value"] + }, + "room_statistics": room_stats, + "energy_trends": energy_trends + } + + except Exception as e: + logger.error(f"Error getting analytics summary: {e}") + raise + + async def get_energy_analytics(self, hours: int = 24, room: Optional[str] = None) -> Dict[str, Any]: + """Get energy-specific analytics""" + try: + start_time = datetime.utcnow() - timedelta(hours=hours) + + # Build query + query = {"created_at": {"$gte": start_time}} + if room: + query["room"] = room + + # Energy consumption over time + pipeline = [ + {"$match": query}, + { + "$group": { + "_id": { + "hour": {"$hour": "$created_at"}, + "date": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}} + }, + "total_energy": {"$sum": "$value"}, + "reading_count": {"$sum": 1} + } + }, + {"$sort": {"_id.date": 1, "_id.hour": 1}} + ] + + cursor = self.db.sensor_readings.aggregate(pipeline) + hourly_data = [] + + async for data in cursor: + hourly_data.append({ + "hour": data["_id"]["hour"], + "date": data["_id"]["date"], + "total_energy": data["total_energy"], + "reading_count": data["reading_count"] + }) + + # Peak consumption analysis + peak_analysis = await self._get_peak_consumption_analysis(query) + + # Energy efficiency metrics + efficiency_metrics = await self._get_efficiency_metrics(query) + + return { + "period_hours": hours, + "room": room, + "timestamp": datetime.utcnow().isoformat(), + "hourly_consumption": hourly_data, + "peak_analysis": peak_analysis, + "efficiency_metrics": efficiency_metrics, + "total_consumption": sum(item["total_energy"] for item in hourly_data) + } + + except Exception as e: + logger.error(f"Error getting energy analytics: {e}") + raise + + async def _get_room_analytics(self, hours: int) -> Dict[str, Any]: + """Get room-level analytics""" + try: + start_time = datetime.utcnow() - timedelta(hours=hours) + + pipeline = [ + { + "$match": { + "created_at": {"$gte": start_time}, + "room": {"$ne": None} + } + }, + { + "$group": { + "_id": "$room", + "total_readings": {"$sum": 1}, + "total_energy": {"$sum": "$value"}, + "average_energy": {"$avg": "$value"}, + "unique_sensors": {"$addToSet": "$sensor_id"} + } + }, + {"$sort": {"total_energy": -1}} + ] + + cursor = self.db.sensor_readings.aggregate(pipeline) + room_data = [] + + async for room in cursor: + room_data.append({ + "room": room["_id"], + "total_readings": room["total_readings"], + "total_energy": room["total_energy"], + "average_energy": round(room["average_energy"], 2), + "sensor_count": len(room["unique_sensors"]) + }) + + return { + "by_room": room_data, + "total_rooms": len(room_data) + } + + except Exception as e: + logger.error(f"Error getting room analytics: {e}") + return {"by_room": [], "total_rooms": 0} + + async def _get_energy_trends(self, hours: int) -> Dict[str, Any]: + """Get energy consumption trends""" + try: + start_time = datetime.utcnow() - timedelta(hours=hours) + + # Get current period data + current_query = {"created_at": {"$gte": start_time}} + current_cursor = self.db.sensor_readings.aggregate([ + {"$match": current_query}, + {"$group": {"_id": None, "total": {"$sum": "$value"}, "count": {"$sum": 1}}} + ]) + current_data = await current_cursor.to_list(length=1) + current_total = current_data[0]["total"] if current_data else 0 + current_count = current_data[0]["count"] if current_data else 0 + + # Get previous period for comparison + previous_start = start_time - timedelta(hours=hours) + previous_query = { + "created_at": {"$gte": previous_start, "$lt": start_time} + } + previous_cursor = self.db.sensor_readings.aggregate([ + {"$match": previous_query}, + {"$group": {"_id": None, "total": {"$sum": "$value"}, "count": {"$sum": 1}}} + ]) + previous_data = await previous_cursor.to_list(length=1) + previous_total = previous_data[0]["total"] if previous_data else 0 + + # Calculate trend + if previous_total > 0: + trend_percentage = ((current_total - previous_total) / previous_total) * 100 + else: + trend_percentage = 0 + + return { + "current_period": { + "total_energy": current_total, + "reading_count": current_count, + "average_per_reading": current_total / current_count if current_count > 0 else 0 + }, + "previous_period": { + "total_energy": previous_total + }, + "trend": { + "percentage_change": round(trend_percentage, 2), + "direction": "up" if trend_percentage > 0 else "down" if trend_percentage < 0 else "stable" + } + } + + except Exception as e: + logger.error(f"Error getting energy trends: {e}") + return {} + + async def _get_peak_consumption_analysis(self, base_query: Dict[str, Any]) -> Dict[str, Any]: + """Analyze peak consumption patterns""" + try: + pipeline = [ + {"$match": base_query}, + { + "$group": { + "_id": {"$hour": "$created_at"}, + "total_consumption": {"$sum": "$value"}, + "reading_count": {"$sum": 1} + } + }, + {"$sort": {"total_consumption": -1}} + ] + + cursor = self.db.sensor_readings.aggregate(pipeline) + hourly_consumption = await cursor.to_list(length=None) + + if not hourly_consumption: + return {"peak_hour": None, "peak_consumption": 0, "hourly_pattern": []} + + peak_data = hourly_consumption[0] + + return { + "peak_hour": peak_data["_id"], + "peak_consumption": peak_data["total_consumption"], + "hourly_pattern": [ + { + "hour": item["_id"], + "consumption": item["total_consumption"], + "reading_count": item["reading_count"] + } + for item in hourly_consumption + ] + } + + except Exception as e: + logger.error(f"Error analyzing peak consumption: {e}") + return {"peak_hour": None, "peak_consumption": 0, "hourly_pattern": []} + + async def _get_efficiency_metrics(self, base_query: Dict[str, Any]) -> Dict[str, Any]: + """Calculate energy efficiency metrics""" + try: + # Average consumption per sensor + pipeline = [ + {"$match": base_query}, + { + "$group": { + "_id": "$sensor_id", + "total_consumption": {"$sum": "$value"}, + "reading_count": {"$sum": 1} + } + }, + { + "$group": { + "_id": None, + "average_per_sensor": {"$avg": "$total_consumption"}, + "sensor_count": {"$sum": 1}, + "min_consumption": {"$min": "$total_consumption"}, + "max_consumption": {"$max": "$total_consumption"} + } + } + ] + + cursor = self.db.sensor_readings.aggregate(pipeline) + efficiency_data = await cursor.to_list(length=1) + + if not efficiency_data: + return { + "average_per_sensor": 0, + "sensor_count": 0, + "efficiency_score": 0, + "variation_coefficient": 0 + } + + data = efficiency_data[0] + + # Calculate efficiency score (lower variation = higher efficiency) + if data["average_per_sensor"] > 0: + variation_coefficient = (data["max_consumption"] - data["min_consumption"]) / data["average_per_sensor"] + efficiency_score = max(0, 100 - (variation_coefficient * 10)) # Scale to 0-100 + else: + variation_coefficient = 0 + efficiency_score = 100 + + return { + "average_per_sensor": round(data["average_per_sensor"], 2), + "sensor_count": data["sensor_count"], + "efficiency_score": round(efficiency_score, 1), + "variation_coefficient": round(variation_coefficient, 2) + } + + except Exception as e: + logger.error(f"Error calculating efficiency metrics: {e}") + return { + "average_per_sensor": 0, + "sensor_count": 0, + "efficiency_score": 0, + "variation_coefficient": 0 + } \ No newline at end of file diff --git a/monolith/src/modules/sensors/models.py b/monolith/src/modules/sensors/models.py new file mode 100644 index 0000000..959f454 --- /dev/null +++ b/monolith/src/modules/sensors/models.py @@ -0,0 +1,378 @@ +""" +Models for Sensor Management Service - integrating all original dashboard functionality +""" + +from pydantic import BaseModel, Field +from typing import Optional, List, Dict, Any, Literal +from datetime import datetime +from enum import Enum + +class SensorType(str, Enum): + ENERGY = "energy" + CO2 = "co2" + TEMPERATURE = "temperature" + HUMIDITY = "humidity" + HVAC = "hvac" + LIGHTING = "lighting" + SECURITY = "security" + MOTION = "motion" + +class SensorStatus(str, Enum): + ONLINE = "online" + OFFLINE = "offline" + ERROR = "error" + MAINTENANCE = "maintenance" + +class CO2Status(str, Enum): + GOOD = "good" + MODERATE = "moderate" + POOR = "poor" + CRITICAL = "critical" + +class OccupancyLevel(str, Enum): + LOW = "low" + MEDIUM = "medium" + HIGH = "high" + +# Base Models from original dashboard +class SensorReading(BaseModel): + """Individual sensor reading model - enhanced from original""" + sensor_id: str = Field(..., description="Unique sensor identifier") + room: Optional[str] = Field(None, description="Room where sensor is located") + sensor_type: SensorType = Field(..., description="Type of sensor") + timestamp: int = Field(..., description="Unix timestamp of reading") + created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp") + + # Sensor values with enhanced structure + energy: Optional[Dict[str, Any]] = Field(None, description="Energy reading with value and unit") + co2: Optional[Dict[str, Any]] = Field(None, description="CO2 reading with value and unit") + temperature: Optional[Dict[str, Any]] = Field(None, description="Temperature reading with value and unit") + humidity: Optional[Dict[str, Any]] = Field(None, description="Humidity reading with value and unit") + motion: Optional[Dict[str, Any]] = Field(None, description="Motion detection reading") + + # Additional sensor types from tiocps + power: Optional[Dict[str, Any]] = Field(None, description="Power consumption reading") + voltage: Optional[Dict[str, Any]] = Field(None, description="Voltage reading") + current: Optional[Dict[str, Any]] = Field(None, description="Current reading") + generation: Optional[Dict[str, Any]] = Field(None, description="Energy generation reading") + + # Metadata + metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional sensor metadata") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } + +class LegacySensorReading(BaseModel): + """Legacy sensor reading format for backward compatibility""" + sensor_id: str = Field(..., alias="sensorId") + timestamp: int + value: float + unit: str + created_at: datetime = Field(default_factory=datetime.utcnow) + + class Config: + allow_population_by_field_name = True + +class SensorMetadata(BaseModel): + """Enhanced sensor metadata from original dashboard""" + sensor_id: str = Field(..., description="Unique sensor identifier") + name: str = Field(..., description="Human-readable sensor name") + sensor_type: SensorType = Field(..., description="Type of sensor") + room: Optional[str] = Field(None, description="Room assignment") + status: SensorStatus = Field(default=SensorStatus.OFFLINE, description="Current sensor status") + + # Physical location and installation details + location: Optional[str] = Field(None, description="Physical location description") + floor: Optional[str] = Field(None, description="Floor level") + building: Optional[str] = Field(None, description="Building identifier") + + # Technical specifications + model: Optional[str] = Field(None, description="Sensor model") + manufacturer: Optional[str] = Field(None, description="Sensor manufacturer") + firmware_version: Optional[str] = Field(None, description="Firmware version") + hardware_version: Optional[str] = Field(None, description="Hardware version") + + # Network and connectivity + ip_address: Optional[str] = Field(None, description="IP address if network connected") + mac_address: Optional[str] = Field(None, description="MAC address") + connection_type: Optional[str] = Field(None, description="Connection type (wifi, ethernet, zigbee, etc.)") + + # Power and maintenance + battery_level: Optional[float] = Field(None, description="Battery level percentage") + last_maintenance: Optional[datetime] = Field(None, description="Last maintenance date") + next_maintenance: Optional[datetime] = Field(None, description="Next scheduled maintenance") + + # Operational settings + sampling_rate: Optional[int] = Field(None, description="Data sampling rate in seconds") + calibration_date: Optional[datetime] = Field(None, description="Last calibration date") + + # Capabilities from tiocps integration + monitoring_capabilities: List[str] = Field(default_factory=list, description="List of monitoring capabilities") + control_capabilities: List[str] = Field(default_factory=list, description="List of control capabilities") + demand_response_enabled: bool = Field(default=False, description="Demand response participation") + + # Timestamps + installed_at: Optional[datetime] = Field(None, description="Installation timestamp") + last_seen: Optional[datetime] = Field(None, description="Last communication timestamp") + created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp") + updated_at: datetime = Field(default_factory=datetime.utcnow, description="Record update timestamp") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + +class RoomMetrics(BaseModel): + """Enhanced room metrics from original dashboard""" + room: str = Field(..., description="Room identifier") + timestamp: int = Field(..., description="Metrics calculation timestamp") + created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp") + + # Sensor inventory + sensor_count: int = Field(0, description="Total number of sensors in room") + active_sensors: List[str] = Field(default_factory=list, description="List of active sensor IDs") + sensor_types: List[SensorType] = Field(default_factory=list, description="Types of sensors present") + + # Energy metrics (enhanced from tiocps) + energy: Optional[Dict[str, Any]] = Field(None, description="Energy consumption metrics") + power: Optional[Dict[str, Any]] = Field(None, description="Power consumption metrics") + generation: Optional[Dict[str, Any]] = Field(None, description="Energy generation metrics") + flexibility: Optional[Dict[str, Any]] = Field(None, description="Energy flexibility metrics") + + # Environmental metrics + co2: Optional[Dict[str, Any]] = Field(None, description="CO2 level metrics") + temperature: Optional[Dict[str, Any]] = Field(None, description="Temperature metrics") + humidity: Optional[Dict[str, Any]] = Field(None, description="Humidity metrics") + + # Occupancy and usage + occupancy_estimate: OccupancyLevel = Field(default=OccupancyLevel.LOW, description="Estimated occupancy level") + motion_detected: bool = Field(default=False, description="Recent motion detection status") + + # Time-based metrics + last_activity: Optional[datetime] = Field(None, description="Last detected activity timestamp") + daily_usage_hours: Optional[float] = Field(None, description="Estimated daily usage in hours") + + # Economic metrics from tiocps + energy_cost: Optional[float] = Field(None, description="Estimated energy cost") + savings_potential: Optional[float] = Field(None, description="Potential savings from optimization") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + +class Room(BaseModel): + """Room definition model""" + name: str = Field(..., description="Room name/identifier") + display_name: Optional[str] = Field(None, description="Human-readable room name") + floor: Optional[str] = Field(None, description="Floor level") + building: Optional[str] = Field(None, description="Building identifier") + area_m2: Optional[float] = Field(None, description="Room area in square meters") + capacity: Optional[int] = Field(None, description="Room capacity (people)") + room_type: Optional[str] = Field(None, description="Room type (office, meeting, etc.)") + + # Configuration + target_temperature: Optional[float] = Field(None, description="Target temperature") + target_co2: Optional[float] = Field(None, description="Target CO2 level") + operating_hours: Optional[Dict[str, Any]] = Field(None, description="Operating hours schedule") + + # Status + active: bool = Field(default=True, description="Whether room is active") + created_at: datetime = Field(default_factory=datetime.utcnow) + updated_at: datetime = Field(default_factory=datetime.utcnow) + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } + +class SystemEvent(BaseModel): + """Enhanced system events from original dashboard""" + event_id: str = Field(..., description="Unique event identifier") + event_type: str = Field(..., description="Type of event") + severity: Literal["info", "warning", "error", "critical"] = Field(..., description="Event severity") + timestamp: int = Field(..., description="Event timestamp") + created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp") + + # Event details + title: str = Field(..., description="Event title") + description: str = Field(..., description="Event description") + source: Optional[str] = Field(None, description="Event source (sensor_id, system component, etc.)") + + # Context + sensor_id: Optional[str] = Field(None, description="Related sensor ID") + room: Optional[str] = Field(None, description="Related room") + + # Event data + data: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional event data") + + # Status tracking + acknowledged: bool = Field(default=False, description="Whether event has been acknowledged") + resolved: bool = Field(default=False, description="Whether event has been resolved") + acknowledged_by: Optional[str] = Field(None, description="Who acknowledged the event") + resolved_by: Optional[str] = Field(None, description="Who resolved the event") + acknowledged_at: Optional[datetime] = Field(None, description="Acknowledgment timestamp") + resolved_at: Optional[datetime] = Field(None, description="Resolution timestamp") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + +class DataQuery(BaseModel): + """Enhanced data query parameters from original dashboard""" + sensor_ids: Optional[List[str]] = Field(None, description="Filter by sensor IDs") + rooms: Optional[List[str]] = Field(None, description="Filter by rooms") + sensor_types: Optional[List[SensorType]] = Field(None, description="Filter by sensor types") + + # Time range + start_time: Optional[int] = Field(None, description="Start timestamp (Unix)") + end_time: Optional[int] = Field(None, description="End timestamp (Unix)") + + # Aggregation + aggregate: Optional[str] = Field(None, description="Aggregation method (avg, sum, min, max)") + interval: Optional[str] = Field(None, description="Aggregation interval (1m, 5m, 1h, 1d)") + + # Pagination + limit: int = Field(default=100, description="Maximum number of records to return") + offset: int = Field(default=0, description="Number of records to skip") + + # Sorting + sort_by: str = Field(default="timestamp", description="Field to sort by") + sort_order: Literal["asc", "desc"] = Field(default="desc", description="Sort order") + + # Additional filters from tiocps + energy_threshold: Optional[float] = Field(None, description="Filter by energy threshold") + co2_threshold: Optional[float] = Field(None, description="Filter by CO2 threshold") + include_metadata: bool = Field(default=False, description="Include sensor metadata in response") + +class DataResponse(BaseModel): + """Enhanced response model for data queries""" + data: List[Dict[str, Any]] = Field(default_factory=list, description="Query results") + total_count: int = Field(0, description="Total number of matching records") + query: DataQuery = Field(..., description="Original query parameters") + execution_time_ms: float = Field(..., description="Query execution time in milliseconds") + + # Additional metadata + aggregation_applied: bool = Field(default=False, description="Whether data was aggregated") + cache_hit: bool = Field(default=False, description="Whether result was served from cache") + +class AnalyticsSummary(BaseModel): + """Comprehensive analytics summary""" + period_hours: int + start_time: datetime + end_time: datetime + + # Sensor analytics + total_sensors: int + active_sensors: int + sensor_types_summary: Dict[str, int] + + # Room analytics + total_rooms: int + active_rooms: int + room_occupancy_summary: Dict[str, int] + + # Energy analytics + total_energy_consumption: float + total_energy_generation: float + net_energy_consumption: float + energy_efficiency: float + + # Environmental analytics + average_co2: float + average_temperature: float + average_humidity: float + + # System health + system_events_count: int + critical_events_count: int + sensor_errors_count: int + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } + +# Room Management Models +class Room(BaseModel): + """Room model for database storage and API responses""" + name: str = Field(..., description="Unique room name") + description: Optional[str] = Field(None, description="Room description") + floor: Optional[str] = Field(None, description="Floor designation") + building: Optional[str] = Field(None, description="Building name") + area: Optional[float] = Field(None, description="Room area in square meters") + capacity: Optional[int] = Field(None, description="Maximum occupancy") + room_type: Optional[str] = Field(None, description="Room type (office, meeting, storage, etc.)") + + # Metadata + created_at: datetime = Field(default_factory=datetime.utcnow, description="Room creation timestamp") + updated_at: datetime = Field(default_factory=datetime.utcnow, description="Room update timestamp") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + +class RoomCreate(BaseModel): + """Model for creating new rooms""" + name: str = Field(..., description="Unique room name", min_length=1, max_length=100) + description: Optional[str] = Field(None, description="Room description", max_length=500) + floor: Optional[str] = Field(None, description="Floor designation", max_length=50) + building: Optional[str] = Field(None, description="Building name", max_length=100) + area: Optional[float] = Field(None, description="Room area in square meters", gt=0) + capacity: Optional[int] = Field(None, description="Maximum occupancy", gt=0) + room_type: Optional[str] = Field(None, description="Room type", max_length=50) + +class RoomUpdate(BaseModel): + """Model for updating existing rooms""" + description: Optional[str] = Field(None, description="Room description", max_length=500) + floor: Optional[str] = Field(None, description="Floor designation", max_length=50) + building: Optional[str] = Field(None, description="Building name", max_length=100) + area: Optional[float] = Field(None, description="Room area in square meters", gt=0) + capacity: Optional[int] = Field(None, description="Maximum occupancy", gt=0) + room_type: Optional[str] = Field(None, description="Room type", max_length=50) + +class RoomInfo(BaseModel): + """Comprehensive room information for API responses""" + name: str = Field(..., description="Room name") + description: Optional[str] = Field(None, description="Room description") + floor: Optional[str] = Field(None, description="Floor designation") + building: Optional[str] = Field(None, description="Building name") + area: Optional[float] = Field(None, description="Room area in square meters") + capacity: Optional[int] = Field(None, description="Maximum occupancy") + room_type: Optional[str] = Field(None, description="Room type") + + # Runtime information + sensor_count: int = Field(0, description="Number of sensors in room") + active_sensors: int = Field(0, description="Number of active sensors") + last_updated: Optional[datetime] = Field(None, description="Last metrics update") + + # Timestamps + created_at: datetime = Field(..., description="Room creation timestamp") + updated_at: datetime = Field(..., description="Room update timestamp") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + +class HealthResponse(BaseModel): + """Health check response""" + service: str + status: str + timestamp: datetime + version: str + + # Additional service-specific health metrics + total_sensors: Optional[int] = None + active_sensors: Optional[int] = None + total_rooms: Optional[int] = None + websocket_connections: Optional[int] = None + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } \ No newline at end of file diff --git a/monolith/src/modules/sensors/room_service.py b/monolith/src/modules/sensors/room_service.py new file mode 100644 index 0000000..d952427 --- /dev/null +++ b/monolith/src/modules/sensors/room_service.py @@ -0,0 +1,467 @@ +""" +Room service for managing rooms and room-level metrics +""" + +import logging +from datetime import datetime, timedelta +from typing import List, Dict, Any, Optional +import json + +logger = logging.getLogger(__name__) + +class RoomService: + """Service for managing rooms and room-level analytics""" + + def __init__(self, db, redis_client): + self.db = db + self.redis = redis_client + + async def get_all_room_names(self) -> List[str]: + """Get a simple list of all room names for dropdowns/selections""" + try: + # Get rooms from the rooms collection + room_cursor = self.db.rooms.find({}, {"name": 1}) + room_names = set() + + async for room in room_cursor: + room_names.add(room["name"]) + + # Also get rooms that exist only in sensor data (legacy support) + sensor_cursor = self.db.sensors.find( + {"room": {"$ne": None, "$exists": True}}, + {"room": 1} + ) + + async for sensor in sensor_cursor: + if sensor.get("room"): + room_names.add(sensor["room"]) + + # Convert to sorted list + return sorted(list(room_names)) + + except Exception as e: + logger.error(f"Error getting room names: {e}") + raise + + async def initialize_default_rooms(self) -> None: + """Initialize default rooms if none exist""" + try: + # Check if any rooms exist + room_count = await self.db.rooms.count_documents({}) + + if room_count == 0: + # Create default rooms + default_rooms = [ + {"name": "Conference Room A", "description": "Main conference room", "room_type": "meeting"}, + {"name": "Conference Room B", "description": "Secondary conference room", "room_type": "meeting"}, + {"name": "Office Floor 1", "description": "First floor office space", "room_type": "office"}, + {"name": "Office Floor 2", "description": "Second floor office space", "room_type": "office"}, + {"name": "Kitchen", "description": "Employee kitchen and break room", "room_type": "common"}, + {"name": "Lobby", "description": "Main entrance and reception", "room_type": "common"}, + {"name": "Server Room", "description": "IT equipment room", "room_type": "technical"}, + {"name": "Storage Room", "description": "General storage", "room_type": "storage"}, + {"name": "Meeting Room 1", "description": "Small meeting room", "room_type": "meeting"}, + {"name": "Meeting Room 2", "description": "Small meeting room", "room_type": "meeting"} + ] + + for room_data in default_rooms: + room_doc = { + **room_data, + "created_at": datetime.utcnow(), + "updated_at": datetime.utcnow() + } + await self.db.rooms.insert_one(room_doc) + + logger.info(f"Initialized {len(default_rooms)} default rooms") + + except Exception as e: + logger.error(f"Error initializing default rooms: {e}") + raise + + async def get_rooms(self) -> List[Dict[str, Any]]: + """Get all rooms with sensor counts and metrics""" + try: + # Get unique rooms from sensors + pipeline = [ + {"$group": {"_id": "$room", "sensor_count": {"$sum": 1}}}, + {"$match": {"_id": {"$ne": None}}} + ] + + cursor = self.db.sensors.aggregate(pipeline) + rooms = [] + + async for room_data in cursor: + room_name = room_data["_id"] + + # Get latest room metrics + latest_metrics = await self._get_latest_room_metrics(room_name) + + room_info = { + "name": room_name, + "sensor_count": room_data["sensor_count"], + "latest_metrics": latest_metrics, + "last_updated": latest_metrics.get("timestamp") if latest_metrics else None + } + + rooms.append(room_info) + + return rooms + + except Exception as e: + logger.error(f"Error getting rooms: {e}") + raise + + async def create_room(self, room_data: Dict[str, Any]) -> Dict[str, Any]: + """Create a new room""" + try: + room_doc = { + "name": room_data.get("name"), + "description": room_data.get("description", ""), + "floor": room_data.get("floor"), + "building": room_data.get("building"), + "area": room_data.get("area"), + "capacity": room_data.get("capacity"), + "room_type": room_data.get("room_type"), + "created_at": datetime.utcnow(), + "updated_at": datetime.utcnow() + } + + # Validate required fields + if not room_doc["name"] or not room_doc["name"].strip(): + raise ValueError("Room name is required") + + # Check if room already exists + existing = await self.db.rooms.find_one({"name": room_doc["name"]}) + if existing: + raise ValueError(f"Room {room_doc['name']} already exists") + + result = await self.db.rooms.insert_one(room_doc) + + return { + "id": str(result.inserted_id), + "name": room_doc["name"], + "created_at": room_doc["created_at"] + } + + except Exception as e: + logger.error(f"Error creating room: {e}") + raise + + async def update_room(self, room_name: str, room_data: Dict[str, Any]) -> Dict[str, Any]: + """Update an existing room""" + try: + # Check if room exists + existing = await self.db.rooms.find_one({"name": room_name}) + if not existing: + raise ValueError(f"Room {room_name} not found") + + # Prepare update document + update_doc = { + "updated_at": datetime.utcnow() + } + + # Update only provided fields + for field in ["description", "floor", "building", "area", "capacity", "room_type"]: + if field in room_data and room_data[field] is not None: + update_doc[field] = room_data[field] + + # Perform update + result = await self.db.rooms.update_one( + {"name": room_name}, + {"$set": update_doc} + ) + + if result.modified_count == 0: + logger.warning(f"No changes made to room {room_name}") + + return { + "name": room_name, + "updated_at": update_doc["updated_at"], + "modified": result.modified_count > 0 + } + + except Exception as e: + logger.error(f"Error updating room: {e}") + raise + + async def delete_room(self, room_name: str) -> Dict[str, Any]: + """Delete a room and optionally reassign sensors""" + try: + # Check if room exists + existing = await self.db.rooms.find_one({"name": room_name}) + + # Check for sensors in this room + sensors_in_room = await self.db.sensors.find({"room": room_name}).to_list(None) + + if sensors_in_room: + # Update sensors to have null room (don't delete sensors) + await self.db.sensors.update_many( + {"room": room_name}, + {"$unset": {"room": ""}} + ) + + # Delete room from rooms collection if it exists + room_deleted = False + if existing: + result = await self.db.rooms.delete_one({"name": room_name}) + room_deleted = result.deleted_count > 0 + + # Delete room metrics + metrics_result = await self.db.room_metrics.delete_many({"room": room_name}) + + return { + "room": room_name, + "room_deleted": room_deleted, + "sensors_updated": len(sensors_in_room), + "metrics_deleted": metrics_result.deleted_count + } + + except Exception as e: + logger.error(f"Error deleting room: {e}") + raise + + async def get_room_details(self, room_name: str) -> Optional[Dict[str, Any]]: + """Get detailed room information""" + try: + # Get room info + room = await self.db.rooms.find_one({"name": room_name}) + + if not room: + # Create basic room info from sensor data + sensors = await self.db.sensors.find({"room": room_name}).to_list(None) + if not sensors: + return None + + room = { + "name": room_name, + "description": f"Room with {len(sensors)} sensors", + "sensor_count": len(sensors) + } + else: + room["_id"] = str(room["_id"]) + + # Get sensor count + sensor_count = await self.db.sensors.count_documents({"room": room_name}) + room["sensor_count"] = sensor_count + + # Get sensors in this room + cursor = self.db.sensors.find({"room": room_name}) + sensors = [] + async for sensor in cursor: + sensor["_id"] = str(sensor["_id"]) + sensors.append(sensor) + + room["sensors"] = sensors + + # Get recent room metrics + room["recent_metrics"] = await self._get_recent_room_metrics(room_name, hours=24) + + return room + + except Exception as e: + logger.error(f"Error getting room details: {e}") + raise + + async def get_room_data(self, room_name: str, start_time: Optional[int] = None, + end_time: Optional[int] = None, limit: int = 100) -> Dict[str, Any]: + """Get historical data for a room""" + try: + # Get room metrics + room_query = {"room": room_name} + if start_time or end_time: + room_query["timestamp"] = {} + if start_time: + room_query["timestamp"]["$gte"] = start_time + if end_time: + room_query["timestamp"]["$lte"] = end_time + + room_metrics_cursor = self.db.room_metrics.find(room_query).sort("timestamp", -1).limit(limit) + room_metrics = [] + async for metric in room_metrics_cursor: + metric["_id"] = str(metric["_id"]) + room_metrics.append(metric) + + # Get sensor readings for this room + sensor_query = {"room": room_name} + if start_time or end_time: + sensor_query["timestamp"] = {} + if start_time: + sensor_query["timestamp"]["$gte"] = start_time + if end_time: + sensor_query["timestamp"]["$lte"] = end_time + + sensor_readings_cursor = self.db.sensor_readings.find(sensor_query).sort("timestamp", -1).limit(limit) + sensor_readings = [] + async for reading in sensor_readings_cursor: + reading["_id"] = str(reading["_id"]) + sensor_readings.append(reading) + + return { + "room_metrics": room_metrics, + "sensor_readings": sensor_readings + } + + except Exception as e: + logger.error(f"Error getting room data: {e}") + raise + + async def update_room_metrics(self, sensor_data): + """Update room-level metrics when sensor data is received""" + try: + if not sensor_data.room: + return + + # Calculate room-level aggregates + room_metrics = await self._calculate_room_metrics(sensor_data.room) + + if room_metrics: + # Store room metrics + metrics_doc = { + "room": sensor_data.room, + "timestamp": sensor_data.timestamp, + "total_energy": room_metrics.get("total_energy", 0), + "average_temperature": room_metrics.get("avg_temperature"), + "co2_level": room_metrics.get("co2_level"), + "occupancy_estimate": room_metrics.get("occupancy_estimate"), + "sensor_count": room_metrics.get("sensor_count", 0), + "created_at": datetime.utcnow() + } + + await self.db.room_metrics.insert_one(metrics_doc) + + # Cache latest metrics + if self.redis: + cache_key = f"room:{sensor_data.room}:latest_metrics" + await self.redis.setex(cache_key, 3600, json.dumps(metrics_doc, default=str)) + + except Exception as e: + logger.error(f"Error updating room metrics: {e}") + + async def aggregate_all_room_metrics(self): + """Aggregate metrics for all rooms""" + try: + # Get all unique rooms + pipeline = [{"$group": {"_id": "$room"}}] + cursor = self.db.sensors.aggregate(pipeline) + + async for room_data in cursor: + room_name = room_data["_id"] + if room_name: + await self._calculate_room_metrics(room_name) + + except Exception as e: + logger.error(f"Error aggregating room metrics: {e}") + + async def _get_latest_room_metrics(self, room_name: str) -> Optional[Dict[str, Any]]: + """Get latest room metrics""" + try: + # Try Redis cache first + if self.redis: + cache_key = f"room:{room_name}:latest_metrics" + cached = await self.redis.get(cache_key) + if cached: + return json.loads(cached) + + # Fall back to database + latest = await self.db.room_metrics.find_one( + {"room": room_name}, + sort=[("timestamp", -1)] + ) + + if latest: + latest["_id"] = str(latest["_id"]) + return latest + + return None + + except Exception as e: + logger.error(f"Error getting latest room metrics: {e}") + return None + + async def _get_recent_room_metrics(self, room_name: str, hours: int = 24) -> List[Dict[str, Any]]: + """Get recent room metrics""" + try: + start_time = datetime.utcnow() - timedelta(hours=hours) + + cursor = self.db.room_metrics.find({ + "room": room_name, + "created_at": {"$gte": start_time} + }).sort("timestamp", -1) + + metrics = [] + async for metric in cursor: + metric["_id"] = str(metric["_id"]) + metrics.append(metric) + + return metrics + + except Exception as e: + logger.error(f"Error getting recent room metrics: {e}") + return [] + + async def _calculate_room_metrics(self, room_name: str) -> Dict[str, Any]: + """Calculate aggregated metrics for a room""" + try: + # Get recent sensor readings (last 5 minutes) + five_minutes_ago = datetime.utcnow() - timedelta(minutes=5) + + pipeline = [ + { + "$match": { + "room": room_name, + "created_at": {"$gte": five_minutes_ago} + } + }, + { + "$group": { + "_id": "$sensor_id", + "latest_value": {"$last": "$value"}, + "sensor_type": {"$last": "$sensor_type"} if "sensor_type" in ["$first", "$last"] else {"$first": "energy"}, + "unit": {"$last": "$unit"} + } + } + ] + + cursor = self.db.sensor_readings.aggregate(pipeline) + + total_energy = 0 + temperatures = [] + co2_levels = [] + sensor_count = 0 + + async for sensor_data in cursor: + sensor_count += 1 + value = sensor_data.get("latest_value", 0) + sensor_type = sensor_data.get("sensor_type", "energy") + + if sensor_type == "energy" or "energy" in str(sensor_data.get("unit", "")).lower(): + total_energy += value + elif sensor_type == "temperature": + temperatures.append(value) + elif sensor_type == "co2": + co2_levels.append(value) + + metrics = { + "total_energy": total_energy, + "sensor_count": sensor_count, + "avg_temperature": sum(temperatures) / len(temperatures) if temperatures else None, + "co2_level": sum(co2_levels) / len(co2_levels) if co2_levels else None, + "occupancy_estimate": self._estimate_occupancy(sensor_count, total_energy) + } + + return metrics + + except Exception as e: + logger.error(f"Error calculating room metrics: {e}") + return {} + + def _estimate_occupancy(self, sensor_count: int, total_energy: float) -> Optional[str]: + """Estimate occupancy level based on energy consumption""" + if total_energy == 0: + return "vacant" + elif total_energy < sensor_count * 50: # Low threshold + return "low" + elif total_energy < sensor_count * 150: # Medium threshold + return "medium" + else: + return "high" \ No newline at end of file diff --git a/monolith/src/modules/sensors/router.py b/monolith/src/modules/sensors/router.py new file mode 100644 index 0000000..019053e --- /dev/null +++ b/monolith/src/modules/sensors/router.py @@ -0,0 +1,475 @@ +"""Sensors module API routes.""" + +import logging +from datetime import datetime +from fastapi import APIRouter, HTTPException, Depends, WebSocket, WebSocketDisconnect, Query, BackgroundTasks +from typing import Optional + +from .models import ( + SensorReading, SensorMetadata, RoomCreate, RoomUpdate, DataQuery, DataResponse, + SensorType, SensorStatus, HealthResponse +) +from .sensor_service import SensorService +from .room_service import RoomService +from .analytics_service import AnalyticsService +from .websocket_manager import WebSocketManager + +from src.core.dependencies import get_sensors_db, get_redis + +logger = logging.getLogger(__name__) + +# Create router +router = APIRouter() + +# WebSocket manager (shared across all route handlers) +websocket_manager = WebSocketManager() + + +# Dependency functions +async def get_sensor_service(db=Depends(get_sensors_db), redis=Depends(get_redis)): + return SensorService(db, redis) + + +async def get_room_service(db=Depends(get_sensors_db), redis=Depends(get_redis)): + return RoomService(db, redis) + + +async def get_analytics_service(db=Depends(get_sensors_db), redis=Depends(get_redis)): + return AnalyticsService(db, redis) + + +# Health check +@router.get("/health", response_model=HealthResponse) +async def health_check(db=Depends(get_sensors_db)): + """Health check endpoint for sensors module""" + try: + await db.command("ping") + return HealthResponse( + service="sensors-module", + status="healthy", + timestamp=datetime.utcnow(), + version="1.0.0" + ) + except Exception as e: + logger.error(f"Health check failed: {e}") + raise HTTPException(status_code=503, detail="Service Unavailable") + + +# WebSocket endpoint for real-time data +@router.websocket("/ws") +async def websocket_endpoint(websocket: WebSocket): + """WebSocket endpoint for real-time sensor data""" + await websocket_manager.connect(websocket) + try: + while True: + await websocket.receive_text() + except WebSocketDisconnect: + await websocket_manager.disconnect(websocket) + + +# Sensor Management Routes +@router.get("/sensors/get") +async def get_sensors( + room: Optional[str] = Query(None, description="Filter by room"), + sensor_type: Optional[SensorType] = Query(None, description="Filter by sensor type"), + status: Optional[SensorStatus] = Query(None, description="Filter by status"), + service: SensorService = Depends(get_sensor_service) +): + """Get all sensors with optional filtering""" + try: + sensors = await service.get_sensors(room=room, sensor_type=sensor_type, status=status) + return { + "sensors": sensors, + "count": len(sensors), + "filters": { + "room": room, + "sensor_type": sensor_type.value if sensor_type else None, + "status": status.value if status else None + } + } + except Exception as e: + logger.error(f"Error getting sensors: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.get("/sensors/{sensor_id}") +async def get_sensor(sensor_id: str, service: SensorService = Depends(get_sensor_service)): + """Get detailed sensor information""" + try: + sensor = await service.get_sensor_details(sensor_id) + if not sensor: + raise HTTPException(status_code=404, detail="Sensor not found") + return sensor + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting sensor {sensor_id}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.get("/sensors/{sensor_id}/data") +async def get_sensor_data( + sensor_id: str, + start_time: Optional[int] = Query(None, description="Start timestamp (Unix)"), + end_time: Optional[int] = Query(None, description="End timestamp (Unix)"), + limit: int = Query(100, description="Maximum records to return"), + offset: int = Query(0, description="Records to skip"), + service: SensorService = Depends(get_sensor_service) +): + """Get historical data for a specific sensor""" + try: + data = await service.get_sensor_data( + sensor_id=sensor_id, + start_time=start_time, + end_time=end_time, + limit=limit, + offset=offset + ) + + return DataResponse( + data=data["readings"], + total_count=data["total_count"], + query=DataQuery( + sensor_ids=[sensor_id], + start_time=start_time, + end_time=end_time, + limit=limit, + offset=offset + ), + execution_time_ms=data.get("execution_time_ms", 0) + ) + except Exception as e: + logger.error(f"Error getting sensor data for {sensor_id}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.post("/sensors") +async def create_sensor( + sensor_data: SensorMetadata, + service: SensorService = Depends(get_sensor_service) +): + """Register a new sensor""" + try: + result = await service.create_sensor(sensor_data) + return { + "message": "Sensor created successfully", + "sensor_id": sensor_data.sensor_id, + "created_at": result.get("created_at") + } + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error creating sensor: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.put("/sensors/{sensor_id}") +async def update_sensor( + sensor_id: str, + update_data: dict, + service: SensorService = Depends(get_sensor_service) +): + """Update sensor metadata""" + try: + result = await service.update_sensor(sensor_id, update_data) + if not result: + raise HTTPException(status_code=404, detail="Sensor not found") + + return { + "message": "Sensor updated successfully", + "sensor_id": sensor_id, + "updated_at": datetime.utcnow().isoformat() + } + except HTTPException: + raise + except Exception as e: + logger.error(f"Error updating sensor {sensor_id}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.delete("/sensors/{sensor_id}") +async def delete_sensor( + sensor_id: str, + service: SensorService = Depends(get_sensor_service) +): + """Delete a sensor and all its data""" + try: + result = await service.delete_sensor(sensor_id) + return { + "message": "Sensor deleted successfully", + "sensor_id": sensor_id, + "readings_deleted": result.get("readings_deleted", 0) + } + except Exception as e: + logger.error(f"Error deleting sensor {sensor_id}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +# Room Management Routes +@router.get("/rooms/names") +async def get_room_names(service: RoomService = Depends(get_room_service)): + """Get simple list of room names for dropdowns""" + try: + room_names = await service.get_all_room_names() + return { + "rooms": room_names, + "count": len(room_names) + } + except Exception as e: + logger.error(f"Error getting room names: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.get("/rooms") +async def get_rooms(service: RoomService = Depends(get_room_service)): + """Get all rooms with sensor counts and metrics""" + try: + rooms = await service.get_rooms() + return { + "rooms": rooms, + "count": len(rooms) + } + except Exception as e: + logger.error(f"Error getting rooms: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.post("/rooms") +async def create_room( + room_data: RoomCreate, + service: RoomService = Depends(get_room_service) +): + """Create a new room""" + try: + result = await service.create_room(room_data.dict()) + return { + "message": "Room created successfully", + "room": result["name"], + "created_at": result["created_at"] + } + except ValueError as e: + raise HTTPException(status_code=400, detail=str(e)) + except Exception as e: + logger.error(f"Error creating room: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.put("/rooms/{room_name}") +async def update_room( + room_name: str, + room_data: RoomUpdate, + service: RoomService = Depends(get_room_service) +): + """Update an existing room""" + try: + result = await service.update_room(room_name, room_data.dict(exclude_unset=True)) + return { + "message": "Room updated successfully", + "room": result["name"], + "updated_at": result["updated_at"], + "modified": result["modified"] + } + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + except Exception as e: + logger.error(f"Error updating room {room_name}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.delete("/rooms/{room_name}") +async def delete_room(room_name: str, service: RoomService = Depends(get_room_service)): + """Delete a room""" + try: + result = await service.delete_room(room_name) + return { + "message": "Room deleted successfully", + **result + } + except Exception as e: + logger.error(f"Error deleting room {room_name}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.get("/rooms/{room_name}") +async def get_room(room_name: str, service: RoomService = Depends(get_room_service)): + """Get detailed room information""" + try: + room = await service.get_room_details(room_name) + if not room: + raise HTTPException(status_code=404, detail="Room not found") + return room + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting room {room_name}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.get("/rooms/{room_name}/data") +async def get_room_data( + room_name: str, + start_time: Optional[int] = Query(None, description="Start timestamp (Unix)"), + end_time: Optional[int] = Query(None, description="End timestamp (Unix)"), + limit: int = Query(100, description="Maximum records to return"), + service: RoomService = Depends(get_room_service) +): + """Get historical data for a specific room""" + try: + data = await service.get_room_data( + room_name=room_name, + start_time=start_time, + end_time=end_time, + limit=limit + ) + + return { + "room": room_name, + "room_metrics": data.get("room_metrics", []), + "sensor_readings": data.get("sensor_readings", []), + "period": { + "start_time": start_time, + "end_time": end_time + } + } + except Exception as e: + logger.error(f"Error getting room data for {room_name}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +# Analytics Routes +@router.post("/data/query") +async def query_data( + query_params: DataQuery, + service: AnalyticsService = Depends(get_analytics_service) +): + """Advanced data querying with multiple filters""" + try: + result = await service.query_data(query_params) + return result + except Exception as e: + logger.error(f"Error executing data query: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.get("/analytics/summary") +async def get_analytics_summary( + hours: int = Query(24, description="Hours of data to analyze"), + service: AnalyticsService = Depends(get_analytics_service) +): + """Get comprehensive analytics summary""" + try: + analytics = await service.get_analytics_summary(hours) + return analytics + except Exception as e: + logger.error(f"Error getting analytics summary: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.get("/analytics/energy") +async def get_energy_analytics( + hours: int = Query(24), + room: Optional[str] = Query(None), + service: AnalyticsService = Depends(get_analytics_service) +): + """Get energy-specific analytics""" + try: + analytics = await service.get_energy_analytics(hours, room) + return analytics + except Exception as e: + logger.error(f"Error getting energy analytics: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +# Data Export +@router.get("/export") +async def export_data( + start_time: int = Query(..., description="Start timestamp (Unix)"), + end_time: int = Query(..., description="End timestamp (Unix)"), + sensor_ids: Optional[str] = Query(None, description="Comma-separated sensor IDs"), + format: str = Query("json", description="Export format (json, csv)"), + service: SensorService = Depends(get_sensor_service) +): + """Export sensor data""" + try: + export_data_result = await service.export_data( + start_time=start_time, + end_time=end_time, + sensor_ids=sensor_ids, + format=format + ) + return export_data_result + except Exception as e: + logger.error(f"Error exporting data: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +# System Events +@router.get("/events") +async def get_events( + severity: Optional[str] = Query(None, description="Filter by severity"), + event_type: Optional[str] = Query(None, description="Filter by event type"), + hours: int = Query(24, description="Hours of events to retrieve"), + limit: int = Query(50, description="Maximum events to return"), + service: SensorService = Depends(get_sensor_service) +): + """Get system events and alerts""" + try: + events = await service.get_events( + severity=severity, + event_type=event_type, + hours=hours, + limit=limit + ) + + return { + "events": events, + "count": len(events), + "period_hours": hours + } + except Exception as e: + logger.error(f"Error getting events: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +# Real-time data ingestion endpoint +@router.post("/data/ingest") +async def ingest_sensor_data( + sensor_data: SensorReading, + background_tasks: BackgroundTasks, + service: SensorService = Depends(get_sensor_service), + room_service: RoomService = Depends(get_room_service) +): + """Ingest real-time sensor data""" + try: + result = await service.ingest_sensor_data(sensor_data) + + # Schedule background tasks + if sensor_data.room: + background_tasks.add_task(_update_room_metrics, room_service, sensor_data) + background_tasks.add_task(_broadcast_sensor_data, sensor_data) + + return { + "message": "Sensor data ingested successfully", + "sensor_id": sensor_data.sensor_id, + "timestamp": sensor_data.timestamp + } + except Exception as e: + logger.error(f"Error ingesting sensor data: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +# Background task helper functions +async def _update_room_metrics(room_service: RoomService, sensor_data: SensorReading): + """Update room-level metrics when sensor data is received""" + try: + await room_service.update_room_metrics(sensor_data) + except Exception as e: + logger.error(f"Error updating room metrics: {e}") + + +async def _broadcast_sensor_data(sensor_data: SensorReading): + """Broadcast sensor data to WebSocket clients""" + try: + await websocket_manager.broadcast_sensor_data(sensor_data) + except Exception as e: + logger.error(f"Error broadcasting sensor data: {e}") diff --git a/monolith/src/modules/sensors/sensor_service.py b/monolith/src/modules/sensors/sensor_service.py new file mode 100644 index 0000000..abc0e55 --- /dev/null +++ b/monolith/src/modules/sensors/sensor_service.py @@ -0,0 +1,251 @@ +""" +Sensor service business logic +""" + +import logging +from datetime import datetime, timedelta +from typing import List, Dict, Any, Optional +import json + +logger = logging.getLogger(__name__) + +class SensorService: + """Service for managing sensors and sensor data""" + + def __init__(self, db, redis_client): + self.db = db + self.redis = redis_client + + async def get_sensors(self, room: Optional[str] = None, sensor_type: Optional[str] = None, status: Optional[str] = None) -> List[Dict[str, Any]]: + """Get sensors with optional filtering""" + try: + query = {} + + if room: + query["room"] = room + if sensor_type: + query["sensor_type"] = sensor_type + if status: + query["status"] = status + + cursor = self.db.sensors.find(query) + sensors = [] + + async for sensor in cursor: + sensor["_id"] = str(sensor["_id"]) + sensors.append(sensor) + + return sensors + + except Exception as e: + logger.error(f"Error getting sensors: {e}") + raise + + async def get_sensor_details(self, sensor_id: str) -> Optional[Dict[str, Any]]: + """Get detailed sensor information""" + try: + sensor = await self.db.sensors.find_one({"sensor_id": sensor_id}) + + if sensor: + sensor["_id"] = str(sensor["_id"]) + + # Get recent readings + recent_readings = await self.get_sensor_data(sensor_id, limit=10) + sensor["recent_readings"] = recent_readings.get("readings", []) + + return sensor + + return None + + except Exception as e: + logger.error(f"Error getting sensor details: {e}") + raise + + async def get_sensor_data(self, sensor_id: str, start_time: Optional[int] = None, + end_time: Optional[int] = None, limit: int = 100, offset: int = 0) -> Dict[str, Any]: + """Get historical sensor data""" + try: + query = {"sensor_id": sensor_id} + + if start_time or end_time: + query["timestamp"] = {} + if start_time: + query["timestamp"]["$gte"] = start_time + if end_time: + query["timestamp"]["$lte"] = end_time + + # Get total count + total_count = await self.db.sensor_readings.count_documents(query) + + # Get readings + cursor = self.db.sensor_readings.find(query).sort("timestamp", -1).skip(offset).limit(limit) + readings = [] + + async for reading in cursor: + reading["_id"] = str(reading["_id"]) + readings.append(reading) + + return { + "readings": readings, + "total_count": total_count, + "execution_time_ms": 0 # Placeholder + } + + except Exception as e: + logger.error(f"Error getting sensor data: {e}") + raise + + async def create_sensor(self, sensor_data) -> Dict[str, Any]: + """Create a new sensor""" + try: + # Check if sensor already exists + existing = await self.db.sensors.find_one({"sensor_id": sensor_data.sensor_id}) + if existing: + raise ValueError(f"Sensor {sensor_data.sensor_id} already exists") + + # Create sensor document + sensor_doc = { + "sensor_id": sensor_data.sensor_id, + "name": sensor_data.name, + "sensor_type": sensor_data.sensor_type.value if hasattr(sensor_data.sensor_type, 'value') else str(sensor_data.sensor_type), + "room": sensor_data.room, + "location": sensor_data.location if hasattr(sensor_data, 'location') else None, + "status": "active", + "created_at": datetime.utcnow(), + "updated_at": datetime.utcnow() + } + + result = await self.db.sensors.insert_one(sensor_doc) + + return {"created_at": datetime.utcnow()} + + except Exception as e: + logger.error(f"Error creating sensor: {e}") + raise + + async def update_sensor(self, sensor_id: str, update_data: Dict[str, Any]) -> bool: + """Update sensor metadata""" + try: + update_data["updated_at"] = datetime.utcnow() + + result = await self.db.sensors.update_one( + {"sensor_id": sensor_id}, + {"$set": update_data} + ) + + return result.modified_count > 0 + + except Exception as e: + logger.error(f"Error updating sensor: {e}") + raise + + async def delete_sensor(self, sensor_id: str) -> Dict[str, Any]: + """Delete a sensor and its data""" + try: + # Delete readings + readings_result = await self.db.sensor_readings.delete_many({"sensor_id": sensor_id}) + + # Delete sensor + await self.db.sensors.delete_one({"sensor_id": sensor_id}) + + return {"readings_deleted": readings_result.deleted_count} + + except Exception as e: + logger.error(f"Error deleting sensor: {e}") + raise + + async def ingest_sensor_data(self, sensor_data) -> Dict[str, Any]: + """Ingest real-time sensor data""" + try: + # Create reading document + reading_doc = { + "sensor_id": sensor_data.sensor_id, + "timestamp": sensor_data.timestamp, + "value": sensor_data.value, + "unit": sensor_data.unit if hasattr(sensor_data, 'unit') else None, + "room": sensor_data.room if hasattr(sensor_data, 'room') else None, + "created_at": datetime.utcnow() + } + + # Store in database + await self.db.sensor_readings.insert_one(reading_doc) + + # Cache recent value in Redis + if self.redis: + cache_key = f"sensor:{sensor_data.sensor_id}:latest" + await self.redis.setex(cache_key, 3600, json.dumps(reading_doc, default=str)) + + return {"success": True} + + except Exception as e: + logger.error(f"Error ingesting sensor data: {e}") + raise + + async def export_data(self, start_time: int, end_time: int, sensor_ids: Optional[str] = None, + format: str = "json") -> Dict[str, Any]: + """Export sensor data""" + try: + query = { + "timestamp": {"$gte": start_time, "$lte": end_time} + } + + if sensor_ids: + sensor_list = [s.strip() for s in sensor_ids.split(",")] + query["sensor_id"] = {"$in": sensor_list} + + cursor = self.db.sensor_readings.find(query).sort("timestamp", 1) + readings = [] + + async for reading in cursor: + reading["_id"] = str(reading["_id"]) + readings.append(reading) + + return { + "format": format, + "data": readings, + "total_records": len(readings), + "period": {"start": start_time, "end": end_time} + } + + except Exception as e: + logger.error(f"Error exporting data: {e}") + raise + + async def get_events(self, severity: Optional[str] = None, event_type: Optional[str] = None, + hours: int = 24, limit: int = 50) -> List[Dict[str, Any]]: + """Get system events""" + try: + start_time = datetime.utcnow() - timedelta(hours=hours) + + query = {"timestamp": {"$gte": start_time}} + + if severity: + query["severity"] = severity + if event_type: + query["event_type"] = event_type + + cursor = self.db.system_events.find(query).sort("timestamp", -1).limit(limit) + events = [] + + async for event in cursor: + event["_id"] = str(event["_id"]) + events.append(event) + + return events + + except Exception as e: + logger.error(f"Error getting events: {e}") + return [] + + async def cleanup_old_data(self, cutoff_date: datetime): + """Clean up old sensor data""" + try: + result = await self.db.sensor_readings.delete_many({ + "created_at": {"$lt": cutoff_date} + }) + + logger.info(f"Cleaned up {result.deleted_count} old sensor readings") + + except Exception as e: + logger.error(f"Error cleaning up old data: {e}") + raise \ No newline at end of file diff --git a/monolith/src/modules/sensors/websocket_manager.py b/monolith/src/modules/sensors/websocket_manager.py new file mode 100644 index 0000000..56b8e88 --- /dev/null +++ b/monolith/src/modules/sensors/websocket_manager.py @@ -0,0 +1,288 @@ +""" +WebSocket manager for real-time sensor data broadcasting +""" + +import asyncio +import json +from typing import List, Set, Dict, Any +from fastapi import WebSocket, WebSocketDisconnect +import logging + +from models import SensorReading + +logger = logging.getLogger(__name__) + +class WebSocketManager: + """Manages WebSocket connections for real-time data broadcasting""" + + def __init__(self): + self.active_connections: List[WebSocket] = [] + self.room_subscriptions: Dict[str, Set[WebSocket]] = {} + self.sensor_subscriptions: Dict[str, Set[WebSocket]] = {} + self.connection_metadata: Dict[WebSocket, Dict[str, Any]] = {} + + async def connect(self, websocket: WebSocket, room: str = None, sensor_id: str = None): + """Accept a WebSocket connection and handle subscriptions""" + await websocket.accept() + self.active_connections.append(websocket) + + # Store connection metadata + self.connection_metadata[websocket] = { + "connected_at": asyncio.get_event_loop().time(), + "room": room, + "sensor_id": sensor_id, + "message_count": 0 + } + + # Handle room subscription + if room: + if room not in self.room_subscriptions: + self.room_subscriptions[room] = set() + self.room_subscriptions[room].add(websocket) + + # Handle sensor subscription + if sensor_id: + if sensor_id not in self.sensor_subscriptions: + self.sensor_subscriptions[sensor_id] = set() + self.sensor_subscriptions[sensor_id].add(websocket) + + logger.info(f"WebSocket client connected. Total connections: {len(self.active_connections)}") + + # Send initial connection confirmation + await self.send_to_connection(websocket, { + "type": "connection_established", + "timestamp": asyncio.get_event_loop().time(), + "subscriptions": { + "room": room, + "sensor_id": sensor_id + }, + "total_connections": len(self.active_connections) + }) + + async def disconnect(self, websocket: WebSocket): + """Remove a WebSocket connection and clean up subscriptions""" + if websocket in self.active_connections: + self.active_connections.remove(websocket) + + # Clean up room subscriptions + for room_connections in self.room_subscriptions.values(): + room_connections.discard(websocket) + + # Clean up sensor subscriptions + for sensor_connections in self.sensor_subscriptions.values(): + sensor_connections.discard(websocket) + + # Clean up metadata + self.connection_metadata.pop(websocket, None) + + logger.info(f"WebSocket client disconnected. Total connections: {len(self.active_connections)}") + + async def send_to_connection(self, websocket: WebSocket, data: Dict[str, Any]): + """Send data to a specific WebSocket connection""" + try: + await websocket.send_text(json.dumps(data)) + + # Update message count + if websocket in self.connection_metadata: + self.connection_metadata[websocket]["message_count"] += 1 + + except Exception as e: + logger.error(f"Error sending data to WebSocket: {e}") + await self.disconnect(websocket) + + async def broadcast_to_all(self, data: Dict[str, Any]): + """Broadcast data to all connected WebSocket clients""" + if not self.active_connections: + return + + message = json.dumps(data) + disconnected = [] + + for websocket in self.active_connections: + try: + await websocket.send_text(message) + + # Update message count + if websocket in self.connection_metadata: + self.connection_metadata[websocket]["message_count"] += 1 + + except Exception as e: + logger.error(f"Error broadcasting to WebSocket: {e}") + disconnected.append(websocket) + + # Clean up disconnected connections + for websocket in disconnected: + await self.disconnect(websocket) + + async def broadcast_to_room(self, room: str, data: Dict[str, Any]): + """Broadcast data to all clients subscribed to a specific room""" + if room not in self.room_subscriptions: + return + + room_connections = self.room_subscriptions[room].copy() + if not room_connections: + return + + message = json.dumps(data) + disconnected = [] + + for websocket in room_connections: + try: + await websocket.send_text(message) + + # Update message count + if websocket in self.connection_metadata: + self.connection_metadata[websocket]["message_count"] += 1 + + except Exception as e: + logger.error(f"Error broadcasting to room {room}: {e}") + disconnected.append(websocket) + + # Clean up disconnected connections + for websocket in disconnected: + await self.disconnect(websocket) + + async def broadcast_to_sensor(self, sensor_id: str, data: Dict[str, Any]): + """Broadcast data to all clients subscribed to a specific sensor""" + if sensor_id not in self.sensor_subscriptions: + return + + sensor_connections = self.sensor_subscriptions[sensor_id].copy() + if not sensor_connections: + return + + message = json.dumps(data) + disconnected = [] + + for websocket in sensor_connections: + try: + await websocket.send_text(message) + + # Update message count + if websocket in self.connection_metadata: + self.connection_metadata[websocket]["message_count"] += 1 + + except Exception as e: + logger.error(f"Error broadcasting to sensor {sensor_id}: {e}") + disconnected.append(websocket) + + # Clean up disconnected connections + for websocket in disconnected: + await self.disconnect(websocket) + + async def broadcast_sensor_data(self, sensor_reading: SensorReading): + """Broadcast sensor reading data to appropriate subscribers""" + data = { + "type": "sensor_data", + "sensor_id": sensor_reading.sensor_id, + "room": sensor_reading.room, + "sensor_type": sensor_reading.sensor_type.value, + "timestamp": sensor_reading.timestamp, + "data": { + "energy": sensor_reading.energy, + "co2": sensor_reading.co2, + "temperature": sensor_reading.temperature, + "humidity": sensor_reading.humidity, + "motion": sensor_reading.motion, + "power": sensor_reading.power, + "voltage": sensor_reading.voltage, + "current": sensor_reading.current, + "generation": sensor_reading.generation + }, + "metadata": sensor_reading.metadata + } + + # Broadcast to all connections + await self.broadcast_to_all(data) + + # Broadcast to room-specific subscribers + if sensor_reading.room: + await self.broadcast_to_room(sensor_reading.room, data) + + # Broadcast to sensor-specific subscribers + await self.broadcast_to_sensor(sensor_reading.sensor_id, data) + + async def broadcast_room_metrics(self, room: str, metrics: Dict[str, Any]): + """Broadcast room-level metrics to subscribers""" + data = { + "type": "room_metrics", + "room": room, + "timestamp": asyncio.get_event_loop().time(), + "metrics": metrics + } + + # Broadcast to all connections + await self.broadcast_to_all(data) + + # Broadcast to room-specific subscribers + await self.broadcast_to_room(room, data) + + async def broadcast_system_event(self, event: Dict[str, Any]): + """Broadcast system events to all subscribers""" + data = { + "type": "system_event", + "timestamp": asyncio.get_event_loop().time(), + "event": event + } + + await self.broadcast_to_all(data) + + async def broadcast_raw_data(self, raw_data: str): + """Broadcast raw data from Redis or other sources""" + try: + # Try to parse as JSON + data = json.loads(raw_data) + + # Add type if not present + if "type" not in data: + data["type"] = "raw_data" + + await self.broadcast_to_all(data) + + except json.JSONDecodeError: + # Send as raw string if not JSON + data = { + "type": "raw_data", + "data": raw_data, + "timestamp": asyncio.get_event_loop().time() + } + await self.broadcast_to_all(data) + + def get_connection_stats(self) -> Dict[str, Any]: + """Get statistics about current WebSocket connections""" + total_connections = len(self.active_connections) + room_stats = {room: len(connections) for room, connections in self.room_subscriptions.items()} + sensor_stats = {sensor: len(connections) for sensor, connections in self.sensor_subscriptions.items()} + + # Calculate message statistics + total_messages = sum( + metadata.get("message_count", 0) + for metadata in self.connection_metadata.values() + ) + + return { + "total_connections": total_connections, + "room_subscriptions": room_stats, + "sensor_subscriptions": sensor_stats, + "total_messages_sent": total_messages, + "active_rooms": len([room for room, connections in self.room_subscriptions.items() if connections]), + "active_sensors": len([sensor for sensor, connections in self.sensor_subscriptions.items() if connections]) + } + + async def send_connection_stats(self): + """Send connection statistics to all clients""" + stats = self.get_connection_stats() + data = { + "type": "connection_stats", + "timestamp": asyncio.get_event_loop().time(), + "stats": stats + } + await self.broadcast_to_all(data) + + async def ping_all_connections(self): + """Send ping to all connections to keep them alive""" + data = { + "type": "ping", + "timestamp": asyncio.get_event_loop().time() + } + await self.broadcast_to_all(data) \ No newline at end of file