modular monolythic

This commit is contained in:
rafaeldpsilva
2025-12-20 00:51:04 +00:00
parent 6ed61b06e8
commit 4779eb9ded
24 changed files with 5135 additions and 0 deletions

View File

@@ -0,0 +1 @@
"""Core infrastructure components for the modular monolith."""

View File

@@ -0,0 +1,56 @@
"""Centralized configuration management."""
import os
from typing import Dict, Optional
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
"""Application settings."""
# Application
app_name: str = "Energy Dashboard Monolith"
app_version: str = "1.0.0"
debug: bool = False
host: str = "0.0.0.0"
port: int = 8000
# MongoDB
mongo_url: str = os.getenv("MONGO_URL", "mongodb://admin:password123@localhost:27017/?authSource=admin")
# Module-specific databases (preserving isolation)
sensors_db_name: str = "energy_dashboard_sensors"
demand_response_db_name: str = "energy_dashboard_demand_response"
data_ingestion_db_name: str = "digitalmente_ingestion"
main_db_name: str = "energy_dashboard"
# Redis
redis_url: str = os.getenv("REDIS_URL", "redis://localhost:6379")
redis_enabled: bool = True # Can be disabled for full monolith mode
# FTP Configuration (for data ingestion)
ftp_sa4cps_host: str = os.getenv("FTP_SA4CPS_HOST", "ftp.sa4cps.pt")
ftp_sa4cps_port: int = int(os.getenv("FTP_SA4CPS_PORT", "21"))
ftp_sa4cps_username: str = os.getenv("FTP_SA4CPS_USERNAME", "curvascarga@sa4cps.pt")
ftp_sa4cps_password: str = os.getenv("FTP_SA4CPS_PASSWORD", "")
ftp_sa4cps_remote_path: str = os.getenv("FTP_SA4CPS_REMOTE_PATH", "/SLGs/")
ftp_check_interval: int = int(os.getenv("FTP_CHECK_INTERVAL", "21600")) # 6 hours
ftp_skip_initial_scan: bool = os.getenv("FTP_SKIP_INITIAL_SCAN", "true").lower() == "true"
# CORS
cors_origins: list = ["*"]
cors_allow_credentials: bool = True
cors_allow_methods: list = ["*"]
cors_allow_headers: list = ["*"]
# Background Tasks
health_check_interval: int = 30
event_scheduler_interval: int = 60
auto_response_interval: int = 30
class Config:
env_file = ".env"
case_sensitive = False
# Global settings instance
settings = Settings()

View File

@@ -0,0 +1,85 @@
"""Database connection management for all modules."""
import logging
from typing import Optional, Dict
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from .config import settings
logger = logging.getLogger(__name__)
class DatabaseManager:
"""Manages MongoDB connections for all modules."""
def __init__(self):
self._client: Optional[AsyncIOMotorClient] = None
self._databases: Dict[str, AsyncIOMotorDatabase] = {}
async def connect(self):
"""Establish connection to MongoDB."""
try:
logger.info(f"Connecting to MongoDB: {settings.mongo_url}")
self._client = AsyncIOMotorClient(settings.mongo_url)
# Test connection
await self._client.admin.command('ping')
logger.info("Successfully connected to MongoDB")
# Initialize database references
self._databases = {
"main": self._client[settings.main_db_name],
"sensors": self._client[settings.sensors_db_name],
"demand_response": self._client[settings.demand_response_db_name],
"data_ingestion": self._client[settings.data_ingestion_db_name],
}
except Exception as e:
logger.error(f"Failed to connect to MongoDB: {e}")
raise
async def disconnect(self):
"""Close MongoDB connection."""
if self._client:
self._client.close()
logger.info("Disconnected from MongoDB")
def get_database(self, name: str) -> AsyncIOMotorDatabase:
"""Get database by name."""
if name not in self._databases:
raise ValueError(f"Database '{name}' not configured")
return self._databases[name]
@property
def client(self) -> AsyncIOMotorClient:
"""Get the MongoDB client."""
if not self._client:
raise RuntimeError("Database not connected. Call connect() first.")
return self._client
@property
def main_db(self) -> AsyncIOMotorDatabase:
"""Get main database."""
return self.get_database("main")
@property
def sensors_db(self) -> AsyncIOMotorDatabase:
"""Get sensors database."""
return self.get_database("sensors")
@property
def demand_response_db(self) -> AsyncIOMotorDatabase:
"""Get demand response database."""
return self.get_database("demand_response")
@property
def data_ingestion_db(self) -> AsyncIOMotorDatabase:
"""Get data ingestion database."""
return self.get_database("data_ingestion")
# Global database manager instance
db_manager = DatabaseManager()
async def get_database(name: str = "main") -> AsyncIOMotorDatabase:
"""Dependency injection function for database access."""
return db_manager.get_database(name)

View File

@@ -0,0 +1,39 @@
"""FastAPI dependency injection utilities."""
from typing import Optional
from fastapi import Depends, HTTPException, status
from motor.motor_asyncio import AsyncIOMotorDatabase
import redis.asyncio as aioredis
from .database import db_manager
from .redis import redis_manager
from .events import event_bus, EventBus
async def get_main_db() -> AsyncIOMotorDatabase:
"""Get main database dependency."""
return db_manager.main_db
async def get_sensors_db() -> AsyncIOMotorDatabase:
"""Get sensors database dependency."""
return db_manager.sensors_db
async def get_demand_response_db() -> AsyncIOMotorDatabase:
"""Get demand response database dependency."""
return db_manager.demand_response_db
async def get_data_ingestion_db() -> AsyncIOMotorDatabase:
"""Get data ingestion database dependency."""
return db_manager.data_ingestion_db
async def get_redis() -> Optional[aioredis.Redis]:
"""Get Redis client dependency."""
return redis_manager.client
def get_event_bus() -> EventBus:
"""Get event bus dependency."""
return event_bus

137
monolith/src/core/events.py Normal file
View File

@@ -0,0 +1,137 @@
"""In-process event bus for inter-module communication."""
import asyncio
import logging
from typing import Dict, List, Callable, Any, Set
from collections import defaultdict
from dataclasses import dataclass
from datetime import datetime
import json
logger = logging.getLogger(__name__)
@dataclass
class Event:
"""Event data structure."""
topic: str
data: Any
timestamp: datetime
source: str = "system"
def to_dict(self) -> dict:
"""Convert to dictionary."""
return {
"topic": self.topic,
"data": self.data,
"timestamp": self.timestamp.isoformat(),
"source": self.source
}
class EventBus:
"""
In-process event bus for replacing Redis pub/sub.
Provides asynchronous event publishing and subscription.
"""
def __init__(self):
self._subscribers: Dict[str, List[Callable]] = defaultdict(list)
self._event_history: List[Event] = []
self._max_history: int = 1000
self._lock = asyncio.Lock()
async def publish(self, topic: str, data: Any, source: str = "system"):
"""
Publish an event to a topic.
Args:
topic: Event topic/channel name
data: Event data (will be JSON serialized if dict)
source: Event source identifier
"""
event = Event(
topic=topic,
data=data,
timestamp=datetime.utcnow(),
source=source
)
# Store in history
async with self._lock:
self._event_history.append(event)
if len(self._event_history) > self._max_history:
self._event_history.pop(0)
# Notify subscribers
if topic in self._subscribers:
logger.debug(f"Publishing event to topic '{topic}': {len(self._subscribers[topic])} subscribers")
# Create tasks for all subscribers
tasks = []
for callback in self._subscribers[topic]:
tasks.append(self._call_subscriber(callback, event))
# Execute all callbacks concurrently
if tasks:
await asyncio.gather(*tasks, return_exceptions=True)
else:
logger.debug(f"No subscribers for topic '{topic}'")
async def _call_subscriber(self, callback: Callable, event: Event):
"""Call a subscriber callback with error handling."""
try:
if asyncio.iscoroutinefunction(callback):
await callback(event.data)
else:
callback(event.data)
except Exception as e:
logger.error(f"Error in event subscriber: {e}", exc_info=True)
def subscribe(self, topic: str, callback: Callable):
"""
Subscribe to events on a topic.
Args:
topic: Event topic/channel name
callback: Async or sync callback function that receives event data
"""
self._subscribers[topic].append(callback)
logger.info(f"Subscribed to topic '{topic}'. Total subscribers: {len(self._subscribers[topic])}")
def unsubscribe(self, topic: str, callback: Callable):
"""Unsubscribe from a topic."""
if topic in self._subscribers and callback in self._subscribers[topic]:
self._subscribers[topic].remove(callback)
logger.info(f"Unsubscribed from topic '{topic}'")
def get_topics(self) -> List[str]:
"""Get list of all topics with subscribers."""
return list(self._subscribers.keys())
def get_subscriber_count(self, topic: str) -> int:
"""Get number of subscribers for a topic."""
return len(self._subscribers.get(topic, []))
async def get_event_history(self, topic: str = None, limit: int = 100) -> List[Event]:
"""Get event history, optionally filtered by topic."""
async with self._lock:
if topic:
events = [e for e in self._event_history if e.topic == topic]
else:
events = self._event_history.copy()
return events[-limit:]
# Global event bus instance
event_bus = EventBus()
# Common event topics (replaces Redis channels)
class EventTopics:
"""Standard event topic names."""
ENERGY_DATA = "energy_data"
DR_EVENTS = "dr_events"
SENSOR_EVENTS = "sensor_events"
SYSTEM_EVENTS = "system_events"
DATA_INGESTION = "data_ingestion"

View File

@@ -0,0 +1,25 @@
"""Logging configuration."""
import logging
import sys
from .config import settings
def setup_logging():
"""Configure application logging."""
log_level = logging.DEBUG if settings.debug else logging.INFO
logging.basicConfig(
level=log_level,
format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
handlers=[
logging.StreamHandler(sys.stdout)
]
)
# Set third-party loggers to WARNING
logging.getLogger("uvicorn").setLevel(logging.WARNING)
logging.getLogger("motor").setLevel(logging.WARNING)
logging.getLogger("redis").setLevel(logging.WARNING)
logger = logging.getLogger(__name__)
logger.info(f"Logging configured. Level: {log_level}")

View File

@@ -0,0 +1,61 @@
"""Redis connection management (optional, for caching)."""
import logging
from typing import Optional
import redis.asyncio as aioredis
from .config import settings
logger = logging.getLogger(__name__)
class RedisManager:
"""Manages Redis connection for caching."""
def __init__(self):
self._client: Optional[aioredis.Redis] = None
async def connect(self):
"""Establish connection to Redis."""
if not settings.redis_enabled:
logger.info("Redis is disabled in settings")
return
try:
logger.info(f"Connecting to Redis: {settings.redis_url}")
self._client = await aioredis.from_url(
settings.redis_url,
encoding="utf-8",
decode_responses=True
)
# Test connection
await self._client.ping()
logger.info("Successfully connected to Redis")
except Exception as e:
logger.warning(f"Failed to connect to Redis: {e}. Continuing without Redis cache.")
self._client = None
async def disconnect(self):
"""Close Redis connection."""
if self._client:
await self._client.close()
logger.info("Disconnected from Redis")
@property
def client(self) -> Optional[aioredis.Redis]:
"""Get the Redis client."""
return self._client
@property
def is_available(self) -> bool:
"""Check if Redis is available."""
return self._client is not None
# Global Redis manager instance
redis_manager = RedisManager()
async def get_redis() -> Optional[aioredis.Redis]:
"""Dependency injection function for Redis access."""
return redis_manager.client

306
monolith/src/main.py Normal file
View File

@@ -0,0 +1,306 @@
"""
Main FastAPI application for the Energy Dashboard Modular Monolith.
Integrates all modules: sensors, demand-response, and data-ingestion.
"""
import asyncio
import logging
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from fastapi import FastAPI, HTTPException
from fastapi.middleware.cors import CORSMiddleware
# Core imports
from core.config import settings
from core.logging_config import setup_logging
from core.database import db_manager
from core.redis import redis_manager
from core.events import event_bus, EventTopics
# Module imports
from modules.sensors.router import router as sensors_router
from modules.sensors.room_service import RoomService
from modules.sensors import WebSocketManager
from modules.demand_response import DemandResponseService
# Setup logging
setup_logging()
logger = logging.getLogger(__name__)
# Background tasks
async def room_metrics_aggregation_task():
"""Periodically aggregate room-level metrics"""
logger.info("Starting room metrics aggregation task")
while True:
try:
room_service = RoomService(db_manager.sensors_db, redis_manager.client)
await room_service.aggregate_all_room_metrics()
await asyncio.sleep(300) # 5 minutes
except Exception as e:
logger.error(f"Error in room metrics aggregation: {e}")
await asyncio.sleep(600)
async def data_cleanup_task():
"""Periodic cleanup of old data"""
logger.info("Starting data cleanup task")
while True:
try:
from modules.sensors import SensorService
service = SensorService(db_manager.sensors_db, None)
cleanup_date = datetime.utcnow() - timedelta(days=90)
await service.cleanup_old_data(cleanup_date)
await asyncio.sleep(86400) # 24 hours
except Exception as e:
logger.error(f"Error in data cleanup task: {e}")
await asyncio.sleep(7200)
async def event_scheduler_task():
"""Background task for checking and executing scheduled DR events"""
logger.info("Starting event scheduler task")
while True:
try:
service = DemandResponseService(db_manager.demand_response_db, redis_manager.client)
await service.check_scheduled_events()
await asyncio.sleep(settings.event_scheduler_interval)
except Exception as e:
logger.error(f"Error in event scheduler task: {e}")
await asyncio.sleep(120)
async def auto_response_task():
"""Background task for automatic demand response"""
logger.info("Starting auto-response task")
while True:
try:
service = DemandResponseService(db_manager.demand_response_db, redis_manager.client)
await service.process_auto_responses()
await asyncio.sleep(settings.auto_response_interval)
except Exception as e:
logger.error(f"Error in auto-response task: {e}")
await asyncio.sleep(90)
async def energy_data_event_subscriber():
"""Subscribe to internal event bus for energy data events"""
logger.info("Starting energy data event subscriber")
async def handle_energy_data(data):
"""Handle energy data events"""
try:
service = DemandResponseService(db_manager.demand_response_db, redis_manager.client)
sensor_id = data.get("sensorId") or data.get("sensor_id")
power_kw = data.get("value", 0.0)
if sensor_id:
service.update_device_power_cache(sensor_id, power_kw)
except Exception as e:
logger.error(f"Error processing energy data event: {e}")
# Subscribe to energy data events
event_bus.subscribe(EventTopics.ENERGY_DATA, handle_energy_data)
async def ftp_monitoring_task():
"""Background task for FTP monitoring"""
logger.info("Starting FTP monitoring task")
while True:
try:
from modules.data_ingestion import FTPMonitor, SLGProcessor
ftp_monitor = FTPMonitor(db_manager.data_ingestion_db)
slg_processor = SLGProcessor(db_manager.data_ingestion_db)
await ftp_monitor.check_and_process_files(slg_processor)
await asyncio.sleep(settings.ftp_check_interval)
except Exception as e:
logger.error(f"Error in FTP monitoring task: {e}")
await asyncio.sleep(600)
# Application lifespan
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Manage application lifespan"""
logger.info(f"Starting {settings.app_name} v{settings.app_version}...")
# Connect to databases
await db_manager.connect()
await redis_manager.connect()
# Initialize default rooms
room_service = RoomService(db_manager.sensors_db, redis_manager.client)
await room_service.initialize_default_rooms()
# Subscribe to internal events
await energy_data_event_subscriber()
# Start background tasks
asyncio.create_task(room_metrics_aggregation_task())
asyncio.create_task(data_cleanup_task())
asyncio.create_task(event_scheduler_task())
asyncio.create_task(auto_response_task())
# Start FTP monitoring if not skipping initial scan
if not settings.ftp_skip_initial_scan:
asyncio.create_task(ftp_monitoring_task())
logger.info("Application startup complete")
yield
logger.info("Shutting down application...")
# Disconnect from databases
await db_manager.disconnect()
await redis_manager.disconnect()
logger.info("Application shutdown complete")
# Create FastAPI application
app = FastAPI(
title=settings.app_name,
description="Modular monolithic architecture for Energy Dashboard",
version=settings.app_version,
lifespan=lifespan
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins,
allow_credentials=settings.cors_allow_credentials,
allow_methods=settings.cors_allow_methods,
allow_headers=settings.cors_allow_headers,
)
# Root endpoint
@app.get("/")
async def root():
"""Root endpoint"""
return {
"service": settings.app_name,
"version": settings.app_version,
"status": "running",
"timestamp": datetime.utcnow().isoformat()
}
# Health check endpoint
@app.get("/health")
async def health_check():
"""Global health check"""
try:
# Check database connection
await db_manager.main_db.command("ping")
# Check Redis connection (optional)
redis_status = "disabled"
if redis_manager.is_available:
await redis_manager.client.ping()
redis_status = "healthy"
return {
"service": settings.app_name,
"version": settings.app_version,
"status": "healthy",
"timestamp": datetime.utcnow().isoformat(),
"components": {
"database": "healthy",
"redis": redis_status,
"event_bus": "healthy"
},
"modules": {
"sensors": "loaded",
"demand_response": "loaded",
"data_ingestion": "loaded"
}
}
except Exception as e:
logger.error(f"Health check failed: {e}")
raise HTTPException(status_code=503, detail="Service Unavailable")
# System overview endpoint
@app.get("/api/v1/overview")
async def system_overview():
"""Get system overview"""
try:
from modules.sensors import SensorService
from modules.demand_response import DemandResponseService
sensor_service = SensorService(db_manager.sensors_db, redis_manager.client)
dr_service = DemandResponseService(db_manager.demand_response_db, redis_manager.client)
# Get sensor counts
all_sensors = await sensor_service.get_sensors()
active_sensors = [s for s in all_sensors if s.get("status") == "online"]
# Get room counts
room_service = RoomService(db_manager.sensors_db, redis_manager.client)
all_rooms = await room_service.get_rooms()
# Get DR event counts
active_events = len(dr_service.active_events) if hasattr(dr_service, 'active_events') else 0
return {
"timestamp": datetime.utcnow().isoformat(),
"sensors": {
"total": len(all_sensors),
"active": len(active_sensors),
"offline": len(all_sensors) - len(active_sensors)
},
"rooms": {
"total": len(all_rooms)
},
"demand_response": {
"active_events": active_events
},
"event_bus": {
"topics": event_bus.get_topics(),
"total_topics": len(event_bus.get_topics())
}
}
except Exception as e:
logger.error(f"Error getting system overview: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
# Include module routers with prefixes
app.include_router(
sensors_router,
prefix="/api/v1",
tags=["sensors"]
)
# Note: Demand Response and Data Ingestion routers would be added here
# app.include_router(demand_response_router, prefix="/api/v1/demand-response", tags=["demand-response"])
# app.include_router(data_ingestion_router, prefix="/api/v1/ingestion", tags=["data-ingestion"])
if __name__ == "__main__":
import uvicorn
uvicorn.run(
"main:app",
host=settings.host,
port=settings.port,
reload=settings.debug
)

View File

@@ -0,0 +1,11 @@
"""Data Ingestion module - handles FTP monitoring and SA4CPS data processing."""
from .ftp_monitor import FTPMonitor
from .slg_processor import SLGProcessor
from .config import Config
__all__ = [
"FTPMonitor",
"SLGProcessor",
"Config",
]

View File

@@ -0,0 +1,45 @@
#!/usr/bin/env python3
"""
Configuration for SA4CPS Data Ingestion Service
Simple configuration management for FTP and MongoDB connections
"""
import os
from typing import Dict, Any
# FTP Configuration for SA4CPS server
FTP_CONFIG: Dict[str, Any] = {
"host": os.getenv("FTP_SA4CPS_HOST", "ftp.sa4cps.pt"),
"username": os.getenv("FTP_SA4CPS_USERNAME", "curvascarga@sa4cps.pt"),
"password": os.getenv("FTP_SA4CPS_PASSWORD", 'n$WFtz9+bleN'), # Set via environment variable
"base_path": os.getenv("FTP_SA4CPS_REMOTE_PATH", "/SLGs/"),
"check_interval": int(os.getenv("FTP_CHECK_INTERVAL", "21600")), # 6 hours default
"skip_initial_scan": os.getenv("FTP_SKIP_INITIAL_SCAN", "true").lower() == "true",
}
# MongoDB Configuration
# Debug environment variables
print(f"DEBUG: MONGO_URL env var = {os.getenv('MONGO_URL', 'NOT SET')}")
print(f"DEBUG: All env vars starting with MONGO: {[k for k in os.environ.keys() if k.startswith('MONGO')]}")
MONGO_CONFIG: Dict[str, Any] = {
"connection_string": os.getenv(
"MONGO_URL",
"mongodb://admin:password123@localhost:27017/digitalmente_ingestion?authSource=admin"
),
"database_name": os.getenv("MONGODB_DATABASE", "digitalmente_ingestion")
}
# Logging Configuration
LOGGING_CONFIG: Dict[str, Any] = {
"level": os.getenv("LOG_LEVEL", "INFO"),
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
}
# Service Configuration
SERVICE_CONFIG: Dict[str, Any] = {
"name": "SA4CPS Data Ingestion Service",
"version": "1.0.0",
"port": int(os.getenv("SERVICE_PORT", "8008")),
"host": os.getenv("SERVICE_HOST", "0.0.0.0")
}

View File

@@ -0,0 +1,478 @@
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
from config import MONGO_CONFIG
logger = logging.getLogger(__name__)
class DatabaseManager:
def __init__(self):
self.client: Optional[MongoClient] = None
self.db = None
self.collections = {}
self.energy_collections_cache = {} # Cache for dynamically created energy data collections
self.connection_string = MONGO_CONFIG["connection_string"]
self.database_name = MONGO_CONFIG["database_name"]
logger.info(f"Database manager initialized for: {self.database_name}")
async def connect(self):
try:
logger.info(f"Connecting to MongoDB at: {self.connection_string}")
self.client = MongoClient(self.connection_string, serverSelectionTimeoutMS=5000)
await self.ping()
self.db = self.client[self.database_name]
self.collections = {
'files': self.db.sa4cps_files,
'metadata': self.db.sa4cps_metadata,
'scanned_directories': self.db.sa4cps_scanned_directories
}
self._create_base_indexes()
logger.info(f"Connected to MongoDB database: {self.database_name}")
except (ConnectionFailure, ServerSelectionTimeoutError) as e:
logger.error(f"Failed to connect to MongoDB: {e}")
raise
async def close(self):
"""Close MongoDB connection"""
if self.client:
self.client.close()
logger.debug("MongoDB connection closed")
async def ping(self):
"""Test database connection"""
if not self.client:
raise ConnectionFailure("No database connection")
try:
# Use async approach with timeout
import asyncio
import concurrent.futures
# Run the ping command in a thread pool to avoid blocking
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
await asyncio.wait_for(
loop.run_in_executor(pool, self.client.admin.command, 'ping'),
timeout=3.0 # 3 second timeout for ping
)
logger.debug("MongoDB ping successful")
except asyncio.TimeoutError:
logger.error("MongoDB ping timeout after 3 seconds")
raise ConnectionFailure("MongoDB ping timeout")
except ConnectionFailure as e:
logger.error(f"MongoDB ping failed - Server not available: {e}")
raise
except Exception as e:
logger.error(f"MongoDB ping failed with error: {e}")
raise ConnectionFailure(f"Ping failed: {e}")
def _create_base_indexes(self):
"""Create indexes for base collections (not energy data collections)"""
try:
self.collections['files'].create_index("filename", unique=True)
self.collections['files'].create_index("processed_at")
self.collections['files'].create_index("directory_path")
self.collections['scanned_directories'].create_index("directory_path", unique=True)
self.collections['scanned_directories'].create_index("last_scanned")
self.collections['scanned_directories'].create_index("scan_status")
logger.info("Database indexes created successfully")
except Exception as e:
logger.warning(f"Failed to create indexes: {e}")
def _extract_level3_path(self, directory_path: str) -> Optional[str]:
"""Extract level 3 directory path (SLGs/Community/Building) from full path"""
# Expected structure: /SLGs/Community/Building/...
parts = directory_path.strip('/').split('/')
if len(parts) >= 3 and parts[0] == 'SLGs':
# Return SLGs/Community/Building
return '/'.join(parts[:3])
return None
def _sanitize_collection_name(self, level3_path: str) -> str:
"""Convert level 3 directory path to valid MongoDB collection name
Example: SLGs/CommunityA/Building1 -> energy_data__CommunityA_Building1
"""
parts = level3_path.strip('/').split('/')
if len(parts) >= 3 and parts[0] == 'SLGs':
# Use Community_Building as the collection suffix
collection_suffix = f"{parts[1]}_{parts[2]}"
collection_name = f"energy_data__{collection_suffix}"
return collection_name
# Fallback: sanitize the entire path
sanitized = level3_path.replace('/', '_').replace('.', '_').replace(' ', '_')
sanitized = sanitized.strip('_')
return f"energy_data__{sanitized}"
def _get_energy_collection(self, directory_path: str):
"""Get or create energy data collection for a specific level 3 directory path"""
level3_path = self._extract_level3_path(directory_path)
if not level3_path:
logger.warning(f"Could not extract level 3 path from: {directory_path}, using default collection")
# Fallback to a default collection for non-standard paths
collection_name = "energy_data__other"
else:
collection_name = self._sanitize_collection_name(level3_path)
# Check cache first
if collection_name in self.energy_collections_cache:
return self.energy_collections_cache[collection_name]
# Create/get collection
collection = self.db[collection_name]
# Create indexes for this energy collection
try:
collection.create_index([("filename", 1), ("timestamp", 1)])
collection.create_index("timestamp")
collection.create_index("meter_id")
logger.debug(f"Created indexes for collection: {collection_name}")
except Exception as e:
logger.warning(f"Failed to create indexes for {collection_name}: {e}")
# Cache the collection
self.energy_collections_cache[collection_name] = collection
logger.info(f"Initialized energy data collection: {collection_name} for path: {directory_path}")
return collection
def _list_energy_collections(self) -> List[str]:
"""List all energy data collections in the database"""
try:
all_collections = self.db.list_collection_names()
# Filter collections that start with 'energy_data__'
energy_collections = [c for c in all_collections if c.startswith('energy_data__')]
return energy_collections
except Exception as e:
logger.error(f"Error listing energy collections: {e}")
return []
async def store_file_data(self, filename: str, records: List[Dict[str, Any]], directory_path: str = None) -> bool:
try:
current_time = datetime.now()
# Determine which collection to use based on directory path
if directory_path:
energy_collection = self._get_energy_collection(directory_path)
level3_path = self._extract_level3_path(directory_path)
else:
logger.warning(f"No directory path provided for {filename}, using default collection")
energy_collection = self._get_energy_collection("/SLGs/unknown/unknown")
level3_path = None
# Store file metadata
file_metadata = {
"filename": filename,
"directory_path": directory_path,
"level3_path": level3_path,
"record_count": len(records),
"processed_at": current_time,
"file_size": sum(len(str(record)) for record in records),
"status": "processed"
}
# Insert or update file record
self.collections['files'].replace_one(
{"filename": filename},
file_metadata,
upsert=True
)
# Add filename and processed timestamp to each record
for record in records:
record["filename"] = filename
record["processed_at"] = current_time
record["directory_path"] = directory_path
# Insert energy data records into the appropriate collection
if records:
result = energy_collection.insert_many(records)
inserted_count = len(result.inserted_ids)
logger.debug(f"Stored {inserted_count} records from {filename} to {energy_collection.name}")
return True
return False
except Exception as e:
logger.error(f"Error storing data for {filename}: {e}")
# Store error metadata
error_metadata = {
"filename": filename,
"directory_path": directory_path,
"processed_at": current_time,
"status": "error",
"error_message": str(e)
}
self.collections['files'].replace_one(
{"filename": filename},
error_metadata,
upsert=True
)
return False
async def get_processed_files(self) -> List[str]:
"""Get list of successfully processed files"""
try:
cursor = self.collections['files'].find(
{"status": "processed"},
{"filename": 1, "_id": 0}
)
files = []
for doc in cursor:
files.append(doc["filename"])
return files
except Exception as e:
logger.error(f"Error getting processed files: {e}")
return []
async def is_file_processed(self, filename: str) -> bool:
"""Mock check if file is processed"""
return filename in await self.get_processed_files()
async def get_file_info(self, filename: str) -> Optional[Dict[str, Any]]:
"""Get information about a specific file"""
try:
return self.collections['files'].find_one({"filename": filename})
except Exception as e:
logger.error(f"Error getting file info for {filename}: {e}")
return None
# Directory scanning tracking methods
# Note: Only level 4+ directories (/SLGs/Community/Building/SubDir) are tracked
# to avoid unnecessary caching of high-level organizational directories
async def is_directory_scanned(self, directory_path: str, since_timestamp: datetime = None) -> bool:
"""Check if directory has been scanned recently
Note: Only level 4+ directories are tracked in the database
"""
try:
query = {"directory_path": directory_path, "scan_status": "complete"}
if since_timestamp:
query["last_scanned"] = {"$gte": since_timestamp}
result = self.collections['scanned_directories'].find_one(query)
return result is not None
except Exception as e:
logger.error(f"Error checking directory scan status for {directory_path}: {e}")
return False
async def mark_directory_scanned(self, directory_path: str, file_count: int, ftp_last_modified: datetime = None) -> bool:
"""Mark directory as scanned with current timestamp"""
try:
scan_record = {
"directory_path": directory_path,
"last_scanned": datetime.now(),
"file_count": file_count,
"scan_status": "complete"
}
if ftp_last_modified:
scan_record["ftp_last_modified"] = ftp_last_modified
# Use upsert to update existing or create new record
self.collections['scanned_directories'].replace_one(
{"directory_path": directory_path},
scan_record,
upsert=True
)
logger.debug(f"Marked directory as scanned: {directory_path} ({file_count} files)")
return True
except Exception as e:
logger.error(f"Error marking directory as scanned {directory_path}: {e}")
return False
async def get_scanned_directories(self) -> List[Dict[str, Any]]:
"""Get all scanned directory records"""
try:
cursor = self.collections['scanned_directories'].find()
return list(cursor)
except Exception as e:
logger.error(f"Error getting scanned directories: {e}")
return []
async def should_skip_directory(self, directory_path: str, ftp_last_modified: datetime = None) -> bool:
"""Determine if directory should be skipped based on scan history and modification time"""
try:
scan_record = self.collections['scanned_directories'].find_one(
{"directory_path": directory_path, "scan_status": "complete"}
)
if not scan_record:
return False # Never scanned, should scan
# If we have FTP modification time and it's newer than our last scan, don't skip
if ftp_last_modified and scan_record.get("last_scanned"):
return ftp_last_modified <= scan_record["last_scanned"]
# If directory was scanned successfully, skip it (assuming it's historical data)
return True
except Exception as e:
logger.error(f"Error determining if directory should be skipped {directory_path}: {e}")
return False
async def get_stats(self) -> Dict[str, Any]:
"""Get database statistics including all energy collections"""
try:
stats = {
"database": self.database_name,
"timestamp": datetime.now().isoformat()
}
# Count documents in base collections
for name, collection in self.collections.items():
try:
count = collection.count_documents({})
stats[f"{name}_count"] = count
except Exception as e:
stats[f"{name}_count"] = f"error: {e}"
# Get all energy collections and their counts
try:
energy_collections = self._list_energy_collections()
energy_stats = []
total_energy_records = 0
for collection_name in energy_collections:
collection = self.db[collection_name]
count = collection.count_documents({})
total_energy_records += count
energy_stats.append({
"collection": collection_name,
"record_count": count
})
stats["energy_collections"] = energy_stats
stats["total_energy_collections"] = len(energy_collections)
stats["total_energy_records"] = total_energy_records
except Exception as e:
stats["energy_collections"] = f"error: {e}"
# Get recent files
try:
recent_files = []
cursor = self.collections['files'].find(
{},
{"filename": 1, "processed_at": 1, "record_count": 1, "status": 1, "directory_path": 1, "level3_path": 1, "_id": 0}
).sort("processed_at", -1).limit(5)
for doc in cursor:
if doc.get("processed_at"):
doc["processed_at"] = doc["processed_at"].isoformat()
recent_files.append(doc)
stats["recent_files"] = recent_files
except Exception as e:
stats["recent_files"] = f"error: {e}"
return stats
except Exception as e:
logger.error(f"Error getting database stats: {e}")
return {"error": str(e), "timestamp": datetime.now().isoformat()}
async def get_energy_data(self,
filename: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
directory_path: Optional[str] = None,
limit: int = 100) -> List[Dict[str, Any]]:
"""Retrieve energy data with optional filtering
Args:
filename: Filter by specific filename
start_time: Filter by start timestamp
end_time: Filter by end timestamp
directory_path: Filter by specific directory path (level 3). If None, queries all collections
limit: Maximum number of records to return
"""
try:
query = {}
if filename:
query["filename"] = filename
if start_time or end_time:
time_query = {}
if start_time:
time_query["$gte"] = start_time
if end_time:
time_query["$lte"] = end_time
query["timestamp"] = time_query
data = []
# If directory_path is specified, query only that collection
if directory_path:
collection = self._get_energy_collection(directory_path)
cursor = collection.find(query).sort("timestamp", -1).limit(limit)
for doc in cursor:
data.append(self._format_energy_document(doc))
else:
# Query across all energy collections
energy_collection_names = self._list_energy_collections()
# Collect data from all collections, then sort and limit
all_data = []
per_collection_limit = max(limit, 1000) # Get more from each to ensure we have enough after sorting
for collection_name in energy_collection_names:
collection = self.db[collection_name]
cursor = collection.find(query).sort("timestamp", -1).limit(per_collection_limit)
for doc in cursor:
all_data.append(self._format_energy_document(doc))
# Sort all data by timestamp and apply final limit
all_data.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
data = all_data[:limit]
return data
except Exception as e:
logger.error(f"Error retrieving energy data: {e}")
return []
def _format_energy_document(self, doc: Dict[str, Any]) -> Dict[str, Any]:
"""Format energy document for API response"""
# Convert ObjectId to string and datetime to ISO string
if "_id" in doc:
doc["_id"] = str(doc["_id"])
if "timestamp" in doc and hasattr(doc["timestamp"], "isoformat"):
doc["timestamp"] = doc["timestamp"].isoformat()
if "processed_at" in doc and hasattr(doc["processed_at"], "isoformat"):
doc["processed_at"] = doc["processed_at"].isoformat()
return doc

View File

@@ -0,0 +1,339 @@
import asyncio
from ftplib import FTP
import logging
import os
from datetime import datetime
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import tempfile
from config import FTP_CONFIG
from slg_processor import SLGProcessor
logger = logging.getLogger(__name__)
@dataclass
class FTPFileInfo:
path: str
name: str
size: int
directory_path: str # Directory containing the file
modified_time: Optional[datetime] = None
class FTPMonitor:
def __init__(self, db_manager):
self.db_manager = db_manager
self.processor = SLGProcessor()
self.last_check: Optional[datetime] = None
self.processed_files: set = set()
self.files_processed_count = 0
self.status = "initializing"
self.ftp_host = FTP_CONFIG["host"]
self.ftp_user = FTP_CONFIG["username"]
self.ftp_pass = FTP_CONFIG["password"]
self.base_path = FTP_CONFIG["base_path"]
self.check_interval = FTP_CONFIG["check_interval"]
self.skip_initial_scan = FTP_CONFIG["skip_initial_scan"]
logger.info(f"FTP Monitor initialized for {self.ftp_host}")
async def initialize_processed_files_cache(self):
try:
# Add timeout to prevent blocking startup indefinitely
processed_file_names = await asyncio.wait_for(
self.db_manager.get_processed_files(),
timeout=10.0 # 10 second timeout
)
for filename in processed_file_names:
self.processed_files.add(filename)
logger.info(f"Loaded {len(processed_file_names)} already processed files from database")
return len(processed_file_names)
except asyncio.TimeoutError:
logger.warning("Timeout loading processed files cache - continuing with empty cache")
return 0
except Exception as e:
logger.error(f"Error loading processed files from database: {e}")
return 0
async def start_monitoring(self):
self.status = "initializing"
logger.info("Starting FTP monitoring loop")
try:
await self.initialize_processed_files_cache()
logger.info("FTP monitor initialization completed")
except asyncio.CancelledError:
logger.info("FTP monitor initialization cancelled")
self.status = "stopped"
return
except Exception as e:
logger.error(f"Error during FTP monitor initialization: {e}")
self.status = "error"
try:
await asyncio.sleep(1800)
except asyncio.CancelledError:
logger.info("FTP monitor cancelled during error recovery")
self.status = "stopped"
return
await asyncio.wait_for(self.check_for_new_files(), timeout=300.0) # 5 minute timeout
self.status = "running"
# Optionally skip initial scan and wait for first scheduled interval
if self.skip_initial_scan:
logger.info(f"Skipping initial scan - waiting {self.check_interval/3600:.1f} hours for first scheduled check")
try:
await asyncio.sleep(self.check_interval)
except asyncio.CancelledError:
logger.info("FTP monitoring cancelled during initial wait")
self.status = "stopped"
return
while True:
try:
# Add timeout to prevent indefinite blocking on FTP operations
await asyncio.wait_for(self.check_for_new_files(), timeout=300.0) # 5 minute timeout
self.status = "running"
logger.info(f"Waiting {self.check_interval/3600:.1f} hours until next check")
await asyncio.sleep(self.check_interval)
except asyncio.TimeoutError:
logger.warning("FTP check timed out after 5 minutes - will retry")
self.status = "error"
try:
await asyncio.sleep(900) # Wait 15 minutes before retry
except asyncio.CancelledError:
logger.info("FTP monitoring task cancelled during timeout recovery")
self.status = "stopped"
break
except asyncio.CancelledError:
logger.info("FTP monitoring task cancelled - shutting down gracefully")
self.status = "stopped"
break
except Exception as e:
self.status = "error"
logger.error(f"Error in monitoring loop: {e}")
try:
await asyncio.sleep(1800) # Wait 30 minutes before retry
except asyncio.CancelledError:
logger.info("FTP monitoring task cancelled during error recovery")
self.status = "stopped"
break
async def check_for_new_files(self) -> Dict[str, Any]:
self.last_check = datetime.now()
logger.info(f"Checking FTP server at {self.last_check}")
try:
with FTP(self.ftp_host) as ftp:
ftp.login(self.ftp_user, self.ftp_pass)
logger.info(f"Connected to FTP server: {self.ftp_host}")
new_files = await self._find_slg_files(ftp)
processed_count = 0
skipped_count = 0
for file_info in new_files:
# Check for cancellation during file processing loop
if asyncio.current_task().cancelled():
raise asyncio.CancelledError()
if file_info.name in self.processed_files:
logger.debug(f"Skipping already processed file (cached): {file_info.name}")
skipped_count += 1
continue
if await self.db_manager.is_file_processed(file_info.name):
logger.debug(f"Skipping already processed file (database): {file_info.name}")
self.processed_files.add(file_info.name)
skipped_count += 1
continue
logger.debug(f"Processing new file: {file_info.name}")
success = await self._process_file(ftp, file_info)
if success:
self.processed_files.add(file_info.name)
processed_count += 1
logger.debug(f"Successfully processed file: {file_info.name} ({processed_count} total)")
self.files_processed_count += 1
result = {
"files_found": len(new_files),
"files_processed": processed_count,
"files_skipped": skipped_count,
"timestamp": self.last_check.isoformat()
}
logger.info(f"Check complete: {result}")
return result
except Exception as e:
logger.error(f"FTP check failed: {e}")
raise
async def _find_slg_files(self, ftp: FTP) -> List[FTPFileInfo]:
files = []
try:
await self._scan_directories_iterative(ftp, self.base_path, files)
logger.info(f"Found {len(files)} .slg_v2 files across all directories")
return files
except Exception as e:
logger.error(f"Error scanning FTP directory: {e}")
return []
async def _scan_directories_iterative(self, ftp: FTP, base_path: str, files: List[FTPFileInfo]):
directories_to_scan = [(base_path, 0)]
visited_dirs = set()
skipped_dirs = 0
scanned_dirs = 0
while directories_to_scan:
current_dir, current_depth = directories_to_scan.pop(0) # FIFO queue
normalized_path = current_dir.rstrip('/') if current_dir != '/' else '/'
if normalized_path in visited_dirs:
logger.debug(f"Skipping already visited directory: {normalized_path}")
continue
visited_dirs.add(normalized_path)
# Determine directory depth (level 4 = /SLGs/Community/Building/SubDir)
path_parts = normalized_path.strip('/').split('/')
directory_level = len(path_parts)
# Check if directory should be skipped based on previous scans (only for level 4+)
if directory_level >= 4 and await self.db_manager.should_skip_directory(normalized_path):
logger.info(f"Skipping previously scanned level {directory_level} directory: {normalized_path}")
skipped_dirs += 1
continue
logger.debug(f"Scanning directory: {normalized_path} (depth: {current_depth}, queue: {len(directories_to_scan)})")
scanned_dirs += 1
try:
original_dir = ftp.pwd()
ftp.cwd(current_dir)
dir_list = []
ftp.retrlines('LIST', dir_list.append)
logger.debug(f"Found {len(dir_list)} entries in {normalized_path}")
# Count files found in this directory
files_found_in_dir = 0
for line in dir_list:
parts = line.split()
if len(parts) >= 9:
filename = parts[-1]
permissions = parts[0]
if filename in ['.', '..']:
continue
if permissions.startswith('d'):
if normalized_path == '/':
subdirectory_path = f"/{filename}"
else:
subdirectory_path = f"{normalized_path}/{filename}"
subdirectory_normalized = subdirectory_path.rstrip('/') if subdirectory_path != '/' else '/'
if subdirectory_normalized not in visited_dirs:
directories_to_scan.append((subdirectory_path, current_depth + 1))
logger.debug(f"Added to queue: {subdirectory_path}")
else:
logger.debug(f"Skipping already visited: {subdirectory_path}")
elif filename.endswith('.sgl_v2'):
logger.debug(f"Found .slg_v2 file: {filename} in {normalized_path}")
try:
size = int(parts[4])
if normalized_path == '/':
full_path = f"/{filename}"
else:
full_path = f"{normalized_path}/{filename}"
files.append(FTPFileInfo(
path=full_path,
name=filename,
size=size,
directory_path=normalized_path
))
files_found_in_dir += 1
except (ValueError, IndexError):
logger.warning(f"Could not parse file info for: {filename}")
ftp.cwd(original_dir)
# Mark directory as scanned (only for level 4+ directories)
if directory_level >= 4:
await self.db_manager.mark_directory_scanned(normalized_path, files_found_in_dir)
logger.debug(f"Completed scanning level {directory_level} directory: {normalized_path} ({files_found_in_dir} files found)")
else:
logger.debug(f"Completed scanning level {directory_level} directory (not saved to cache): {normalized_path} ({files_found_in_dir} files found)")
except Exception as e:
logger.warning(f"Error scanning directory {normalized_path}: {e}")
continue
logger.info(f"Iterative scan completed. Scanned: {scanned_dirs} directories, Skipped: {skipped_dirs} directories (Total visited: {len(visited_dirs)})")
async def _process_file(self, ftp: FTP, file_info: FTPFileInfo) -> bool:
logger.debug(f"Processing file: {file_info.path} ({file_info.size} bytes) from directory: {file_info.directory_path}")
try:
with tempfile.NamedTemporaryFile(mode='wb', suffix='.slg_v2', delete=False) as temp_file:
temp_path = temp_file.name
with open(temp_path, 'wb') as f:
ftp.retrbinary(f'RETR {file_info.path}', f.write)
records = await self.processor.process_file(temp_path, file_info.name)
if records:
# Pass directory path to store_file_data for collection selection
await self.db_manager.store_file_data(file_info.name, records, file_info.directory_path)
logger.debug(f"Stored {len(records)} records from {file_info.name} to collection for {file_info.directory_path}")
return True
else:
logger.warning(f"No valid records found in {file_info.name}")
return False
except Exception as e:
logger.error(f"Error processing file {file_info.name}: {e}")
return False
finally:
try:
if 'temp_path' in locals():
os.unlink(temp_path)
except OSError:
pass
def get_status(self) -> str:
return self.status
def get_last_check_time(self) -> Optional[str]:
return self.last_check.isoformat() if self.last_check else None
def get_processed_count(self) -> int:
return self.files_processed_count
def get_detailed_status(self) -> Dict[str, Any]:
return {
"status": self.status,
"last_check": self.get_last_check_time(),
"files_processed": self.files_processed_count,
"processed_files_count": len(self.processed_files),
"check_interval_hours": self.check_interval / 3600,
"ftp_host": self.ftp_host,
"base_path": self.base_path,
}

View File

@@ -0,0 +1,171 @@
#!/usr/bin/env python3
"""
SA4CPS SLG_V2 File Processor
Simple parser for .slg_v2 energy data files
"""
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional
import re
logger = logging.getLogger(__name__)
class SLGProcessor:
"""Processes SA4CPS .slg_v2 files into structured energy data records"""
def __init__(self):
self.processed_files = 0
self.total_records = 0
async def process_file(self, file_path: str, filename: str) -> List[Dict[str, Any]]:
"""Process a .slg_v2 file and return energy data records"""
logger.info(f"Processing SLG file: {filename}")
try:
with open(file_path, 'r', encoding='utf-8') as file:
lines = file.readlines()
records = []
file_metadata = self._parse_metadata(lines[:5]) # Parse first 5 lines for metadata
# Process data lines (lines starting with '20' are data records)
for line_num, line in enumerate(lines, 1):
line = line.strip()
if line.startswith('20'): # Data record lines start with '20' (year)
record = self._parse_data_line(line, file_metadata, filename)
if record:
records.append(record)
self.processed_files += 1
self.total_records += len(records)
logger.info(f"Processed {len(records)} records from {filename}")
return records
except Exception as e:
logger.error(f"Error processing {filename}: {e}")
return []
def _parse_metadata(self, header_lines: List[str]) -> Dict[str, Any]:
"""Parse metadata from SLG file header"""
metadata = {
"meter_id": None,
"measurement_type": None,
"unit": None,
"interval": None,
"period_start": None,
"period_end": None
}
try:
for line in header_lines:
line = line.strip()
if line.startswith('00'): # Header line with meter info
parts = line.split('\t')
if len(parts) >= 12:
metadata["meter_id"] = parts[3] # Meter ID
metadata["period_start"] = self._parse_date(parts[6])
metadata["period_end"] = self._parse_date(parts[7])
elif line.startswith('01'): # Measurement configuration
parts = line.split('\t')
if len(parts) >= 10:
metadata["measurement_type"] = parts[4] # POTENCIA
metadata["unit"] = parts[5] # K (kW)
metadata["interval"] = parts[6] # 15M
except Exception as e:
logger.warning(f"Error parsing metadata: {e}")
return metadata
def _parse_data_line(self, line: str, metadata: Dict[str, Any], filename: str) -> Optional[Dict[str, Any]]:
"""Parse a data line into an energy record"""
try:
parts = line.split('\t')
if len(parts) < 4:
return None
# Parse timestamp (format: 20250201 0015)
date_part = parts[1] # 20250201
time_part = parts[2] # 0015
# Convert to datetime
timestamp = self._parse_timestamp(date_part, time_part)
if not timestamp:
return None
# Parse energy value
value_str = parts[3].replace('.', '') # Remove decimal separator
try:
value = float(value_str) / 1000.0 # Convert from thousandths
except ValueError:
value = 0.0
# Create record
record = {
"timestamp": timestamp,
"meter_id": metadata.get("meter_id", "unknown"),
"measurement_type": metadata.get("measurement_type", "energy"),
"value": value,
"unit": metadata.get("unit", "kW"),
"interval": metadata.get("interval", "15M"),
"filename": filename,
"quality": int(parts[4]) if len(parts) > 4 else 0
}
return record
except Exception as e:
logger.warning(f"Error parsing data line '{line}': {e}")
return None
def _parse_date(self, date_str: str) -> Optional[datetime]:
"""Parse date string (YYYYMMDD format)"""
try:
if len(date_str) == 8 and date_str.isdigit():
year = int(date_str[:4])
month = int(date_str[4:6])
day = int(date_str[6:8])
return datetime(year, month, day)
except ValueError:
pass
return None
def _parse_timestamp(self, date_str: str, time_str: str) -> Optional[datetime]:
"""Parse timestamp from date and time strings"""
try:
# Parse date (YYYYMMDD)
if len(date_str) != 8 or not date_str.isdigit():
return None
year = int(date_str[:4])
month = int(date_str[4:6])
day = int(date_str[6:8])
# Parse time (HHMM)
if len(time_str) != 4 or not time_str.isdigit():
return None
hour = int(time_str[:2])
if hour ==24:
hour = 0
minute = int(time_str[2:4])
return datetime(year, month, day, hour, minute)
except ValueError as e:
logger.warning(f"Error parsing timestamp '{date_str} {time_str}': {e}")
return None
def get_stats(self) -> Dict[str, int]:
"""Get processing statistics"""
return {
"files_processed": self.processed_files,
"total_records": self.total_records
}

View File

@@ -0,0 +1,21 @@
"""Demand Response module - handles grid interaction and load management."""
from .models import (
DemandResponseInvitation,
InvitationResponse,
EventRequest,
EventStatus,
LoadReductionRequest,
FlexibilityResponse
)
from .demand_response_service import DemandResponseService
__all__ = [
"DemandResponseInvitation",
"InvitationResponse",
"EventRequest",
"EventStatus",
"LoadReductionRequest",
"FlexibilityResponse",
"DemandResponseService",
]

View File

@@ -0,0 +1,747 @@
"""
Demand Response Service - Core Business Logic
Handles DR invitations, event execution, auto-response, and flexibility calculation
"""
import asyncio
import json
import uuid
from datetime import datetime, timedelta
from typing import List, Dict, Optional, Any
import logging
from motor.motor_asyncio import AsyncIOMotorDatabase
import redis.asyncio as redis
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class DemandResponseService:
"""Core Demand Response service business logic"""
def __init__(self, db: AsyncIOMotorDatabase, redis_client: redis.Redis):
self.db = db
self.redis = redis_client
self.active_events: Dict[str, asyncio.Task] = {} # event_id -> task
self.device_power_cache: Dict[str, float] = {} # device_id -> power_kw (updated by Redis subscriber)
# ===== INVITATION MANAGEMENT =====
async def send_invitation(
self,
event_time: datetime,
load_kwh: float,
load_percentage: float,
iots: List[str],
duration_minutes: int = 59
) -> Dict[str, Any]:
"""
Create and send DR invitation
Returns: {"event_id": str, "response": str, "message": str}
"""
logger.info(f"Creating DR invitation for {len(iots)} devices at {event_time}")
# Generate unique event ID
event_id = str(uuid.uuid4())
# Check auto-response configuration
auto_config = await self.get_auto_response_config()
response = "YES" if auto_config.get("enabled", False) else "WAITING"
# Create invitation document
invitation = {
"event_id": event_id,
"created_at": datetime.utcnow(),
"event_time": event_time,
"load_kwh": load_kwh,
"load_percentage": load_percentage,
"iots": iots,
"duration_minutes": duration_minutes,
"response": response,
"status": "pending"
}
# Store in MongoDB
await self.db.demand_response_invitations.insert_one(invitation)
# Cache in Redis for fast access (24 hour TTL)
cache_key = f"dr:invitation:{event_id}"
await self.redis.setex(
cache_key,
86400,
json.dumps(invitation, default=str)
)
# Publish event to Redis pub/sub
await self.redis.publish("dr_events", json.dumps({
"event": "invitation_created",
"event_id": event_id,
"event_time": event_time.isoformat(),
"load_kwh": load_kwh,
"response": response
}))
logger.info(f"Invitation {event_id} created with response: {response}")
return {
"event_id": event_id,
"response": response,
"message": "Invitation created successfully"
}
async def answer_invitation(
self,
event_id: str,
iot_id: str,
response: str,
committed_reduction_kw: Optional[float] = None
) -> Dict[str, Any]:
"""
Record device response to invitation
Returns: {"success": bool, "message": str}
"""
logger.info(f"Recording response for invitation {event_id}, device {iot_id}: {response}")
# Validate invitation exists
invitation = await self.get_invitation(event_id)
if not invitation:
return {"success": False, "message": f"Invitation {event_id} not found"}
if iot_id not in invitation["iots"]:
return {"success": False, "message": f"Device {iot_id} not in invitation"}
# Check if already responded
existing = await self.db.demand_response_responses.find_one({
"event_id": event_id,
"device_id": iot_id
})
if existing:
return {"success": False, "message": f"Device {iot_id} has already responded"}
# Store response
response_doc = {
"event_id": event_id,
"device_id": iot_id,
"response": response,
"committed_reduction_kw": committed_reduction_kw,
"responded_at": datetime.utcnow()
}
await self.db.demand_response_responses.insert_one(response_doc)
# Check if all devices have responded
total_devices = len(invitation["iots"])
total_responses = await self.db.demand_response_responses.count_documents({"event_id": event_id})
if total_responses == total_devices:
# All devices responded - update invitation status
yes_count = await self.db.demand_response_responses.count_documents({
"event_id": event_id,
"response": "YES"
})
all_yes = yes_count == total_devices
new_response = "YES" if all_yes else "NO"
new_status = "scheduled" if all_yes else "cancelled"
await self.db.demand_response_invitations.update_one(
{"event_id": event_id},
{"$set": {"response": new_response, "status": new_status}}
)
logger.info(f"Invitation {event_id} final response: {new_response} (status: {new_status})")
# Clear cache
await self.redis.delete(f"dr:invitation:{event_id}")
# Publish event
await self.redis.publish("dr_events", json.dumps({
"event": "invitation_answered",
"event_id": event_id,
"device_id": iot_id,
"response": response
}))
return {"success": True, "message": "Response recorded successfully"}
async def get_invitation(self, event_id: str) -> Optional[Dict[str, Any]]:
"""
Get invitation by event_id (with Redis caching)
"""
# Try cache first
cache_key = f"dr:invitation:{event_id}"
cached = await self.redis.get(cache_key)
if cached:
invitation = json.loads(cached)
return invitation
# Fallback to MongoDB
invitation = await self.db.demand_response_invitations.find_one({"event_id": event_id})
if invitation:
invitation["_id"] = str(invitation["_id"])
# Cache for 24 hours
await self.redis.setex(
cache_key,
86400,
json.dumps(invitation, default=str)
)
return invitation
return None
async def get_unanswered_invitations(self) -> List[Dict[str, Any]]:
"""Get all pending invitations awaiting response"""
cursor = self.db.demand_response_invitations.find({
"response": "WAITING",
"status": "pending"
}).sort("created_at", -1)
invitations = []
async for inv in cursor:
inv["_id"] = str(inv["_id"])
invitations.append(inv)
return invitations
async def get_answered_invitations(self, hours: int = 24, limit: int = 50) -> List[Dict[str, Any]]:
"""Get recent answered invitations"""
start_time = datetime.utcnow() - timedelta(hours=hours)
cursor = self.db.demand_response_invitations.find({
"response": {"$ne": "WAITING"},
"created_at": {"$gte": start_time}
}).sort("created_at", -1).limit(limit)
invitations = []
async for inv in cursor:
inv["_id"] = str(inv["_id"])
invitations.append(inv)
return invitations
# ===== EVENT EXECUTION =====
async def schedule_event(
self,
event_time: datetime,
iots: List[str],
load_reduction_kw: float,
duration_minutes: int = 59
) -> Dict[str, Any]:
"""
Schedule a DR event for execution
Returns: {"event_id": str, "message": str}
"""
logger.info(f"Scheduling DR event for {len(iots)} devices at {event_time}")
# Create event document
event_id = str(uuid.uuid4())
end_time = event_time + timedelta(minutes=duration_minutes)
event = {
"event_id": event_id,
"start_time": event_time,
"end_time": end_time,
"status": "scheduled",
"participating_devices": iots,
"target_reduction_kw": load_reduction_kw,
"actual_reduction_kw": 0.0,
"power_samples": []
}
await self.db.demand_response_events.insert_one(event)
# Publish scheduled event
await self.redis.publish("dr_events", json.dumps({
"event": "event_scheduled",
"event_id": event_id,
"start_time": event_time.isoformat(),
"end_time": end_time.isoformat(),
"devices": iots
}))
logger.info(f"Event {event_id} scheduled successfully")
return {
"event_id": event_id,
"message": "Event scheduled successfully"
}
async def execute_event(self, event_id: str):
"""
Execute a DR event (spawns background task)
"""
logger.info(f"Executing DR event {event_id}")
# Get event details
event = await self.db.demand_response_events.find_one({"event_id": event_id})
if not event:
logger.error(f"Event {event_id} not found")
return
# Update status to active
await self.db.demand_response_events.update_one(
{"event_id": event_id},
{"$set": {"status": "active", "actual_start_time": datetime.utcnow()}}
)
# Publish event started
await self.redis.publish("dr_events", json.dumps({
"event": "event_started",
"event_id": event_id,
"devices": event["participating_devices"]
}))
# Create and store async task for this event
task = asyncio.create_task(self._run_event_loop(event))
self.active_events[event_id] = task
logger.info(f"DR event {event_id} started successfully")
async def _run_event_loop(self, event: Dict[str, Any]):
"""
CRITICAL: Core event execution loop - runs for duration_minutes
Samples power every 5 seconds, accumulates reduction, handles cancellation
"""
event_id = event["event_id"]
end_time = event["end_time"]
devices = event["participating_devices"]
total_reduction_kwh = 0.0
sample_count = 0
logger.info(f"Starting event loop for {event_id}, ending at {end_time}")
try:
while datetime.utcnow() < end_time:
# Get current power for all participating devices from cache
device_powers = {
device_id: self.device_power_cache.get(device_id, 0.0)
for device_id in devices
}
# Calculate reduction for this 5-second interval
# interval_hours = 5.0 / 3600.0 = 0.00139 hours
interval_reduction_kwh = sum(device_powers.values()) * (5.0 / 3600.0)
total_reduction_kwh += interval_reduction_kwh
sample_count += 1
# Store sample in MongoDB (every sample to maintain accuracy)
sample = {
"timestamp": datetime.utcnow(),
"device_powers": device_powers,
"interval_reduction_kwh": interval_reduction_kwh
}
await self.db.demand_response_events.update_one(
{"event_id": event_id},
{
"$push": {"power_samples": sample},
"$set": {"actual_reduction_kw": total_reduction_kwh}
}
)
# Update Redis cache for fast access to current reduction
cache_key = f"dr:event:active:{event_id}"
await self.redis.setex(
cache_key,
300, # 5 minute TTL
json.dumps({
"event_id": event_id,
"current_reduction_kwh": total_reduction_kwh,
"devices": device_powers,
"last_update": datetime.utcnow().isoformat()
}, default=str)
)
# Publish progress every 10 samples (50 seconds)
if sample_count % 10 == 0:
await self.redis.publish("dr_events", json.dumps({
"event": "event_progress",
"event_id": event_id,
"total_reduction_kwh": round(total_reduction_kwh, 3),
"device_powers": device_powers,
"timestamp": datetime.utcnow().isoformat()
}))
logger.info(f"Event {event_id} progress: {total_reduction_kwh:.3f} kWh ({sample_count} samples)")
# Sleep for 5 seconds
await asyncio.sleep(5)
# Event completed successfully
logger.info(f"Event {event_id} completed with {total_reduction_kwh:.3f} kWh reduction")
await self._complete_event(event_id, total_reduction_kwh)
except asyncio.CancelledError:
logger.info(f"Event {event_id} cancelled by user")
await self._cancel_event(event_id)
raise
except Exception as e:
logger.error(f"Error in event {event_id}: {e}", exc_info=True)
await self._cancel_event(event_id)
async def _complete_event(self, event_id: str, total_reduction_kwh: float):
"""Mark event as completed"""
await self.db.demand_response_events.update_one(
{"event_id": event_id},
{
"$set": {
"status": "completed",
"actual_end_time": datetime.utcnow(),
"actual_reduction_kw": total_reduction_kwh
}
}
)
# Remove from active events
self.active_events.pop(event_id, None)
# Clear cache
await self.redis.delete(f"dr:event:active:{event_id}")
# Publish completion
await self.redis.publish("dr_events", json.dumps({
"event": "event_completed",
"event_id": event_id,
"total_reduction_kwh": total_reduction_kwh
}))
logger.info(f"DR event {event_id} marked as completed")
async def _cancel_event(self, event_id: str):
"""Internal method to cancel an event"""
await self.db.demand_response_events.update_one(
{"event_id": event_id},
{
"$set": {
"status": "cancelled",
"cancelled_at": datetime.utcnow()
}
}
)
self.active_events.pop(event_id, None)
await self.redis.delete(f"dr:event:active:{event_id}")
# Publish cancellation
await self.redis.publish("dr_events", json.dumps({
"event": "event_cancelled",
"event_id": event_id,
"timestamp": datetime.utcnow().isoformat()
}))
async def cancel_event(self, event_id: str):
"""
Public method to cancel a running DR event gracefully
"""
logger.info(f"Cancelling DR event {event_id}")
# Cancel the async task
task = self.active_events.get(event_id)
if task and not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
# Expected - task cancelled successfully
pass
except Exception as e:
logger.error(f"Error cancelling event task {event_id}: {e}")
# Update database status (if not already done by _cancel_event)
event = await self.db.demand_response_events.find_one({"event_id": event_id})
if event and event.get("status") != "cancelled":
await self._cancel_event(event_id)
logger.info(f"DR event {event_id} cancelled successfully")
async def get_active_events(self) -> List[Dict[str, Any]]:
"""Get currently running events with real-time data"""
cursor = self.db.demand_response_events.find({
"status": "active"
}).sort("start_time", -1)
events = []
async for event in cursor:
event["_id"] = str(event["_id"])
# Add real-time data from cache
cache_key = f"dr:event:active:{event['event_id']}"
cached = await self.redis.get(cache_key)
if cached:
realtime_data = json.loads(cached)
event["current_reduction_kwh"] = realtime_data.get("current_reduction_kwh")
event["current_device_powers"] = realtime_data.get("devices")
events.append(event)
return events
# ===== DEVICE POWER INTEGRATION =====
def update_device_power_cache(self, device_id: str, power_kw: float):
"""
Update device power cache (called by Redis subscriber)
This is synchronous because it's just updating a dict
"""
self.device_power_cache[device_id] = power_kw
# No logging here to avoid spam (called every few seconds per device)
async def get_device_power(self, device_id: str) -> float:
"""Get current power for a device from cache"""
return self.device_power_cache.get(device_id, 0.0)
# ===== AUTO-RESPONSE CONFIGURATION =====
async def get_auto_response_config(self) -> Dict[str, Any]:
"""Get auto-response configuration"""
config = await self.db.auto_response_config.find_one({"config_id": "default"})
if not config:
# Create default config
default_config = {
"config_id": "default",
"enabled": False,
"max_reduction_percentage": 20.0,
"response_delay_seconds": 300,
"min_notice_minutes": 60,
"updated_at": datetime.utcnow()
}
await self.db.auto_response_config.insert_one(default_config)
return default_config
return config
async def set_auto_response_config(
self,
enabled: bool,
max_reduction_percentage: float = 20.0,
response_delay_seconds: int = 300,
min_notice_minutes: int = 60
) -> Dict[str, Any]:
"""Update auto-response configuration"""
await self.db.auto_response_config.update_one(
{"config_id": "default"},
{
"$set": {
"enabled": enabled,
"max_reduction_percentage": max_reduction_percentage,
"response_delay_seconds": response_delay_seconds,
"min_notice_minutes": min_notice_minutes,
"updated_at": datetime.utcnow()
}
},
upsert=True
)
# Clear cache
await self.redis.delete("dr:config:auto_response")
logger.info(f"Auto-response config updated: enabled={enabled}")
return await self.get_auto_response_config()
async def process_auto_responses(self):
"""
Process pending invitations with auto-response (called by background task)
"""
# Get auto-response configuration
auto_config = await self.get_auto_response_config()
if not auto_config.get("enabled"):
return
# Find unanswered invitations
invitations = await self.get_unanswered_invitations()
for invitation in invitations:
event_id = invitation["event_id"]
event_time = invitation["event_time"]
# Parse event_time (might be string from cache)
if isinstance(event_time, str):
event_time = datetime.fromisoformat(event_time.replace('Z', '+00:00'))
# Check if event is within auto-response criteria
time_until_event = (event_time - datetime.utcnow()).total_seconds() / 60 # minutes
min_notice = auto_config.get("min_notice_minutes", 60)
if time_until_event >= min_notice:
logger.info(f"Auto-responding to invitation {event_id}")
# Auto-accept for all devices
for device_id in invitation["iots"]:
# Check if already responded
existing = await self.db.demand_response_responses.find_one({
"event_id": event_id,
"device_id": device_id
})
if not existing:
# Get device current power
device_power = await self.get_device_power(device_id)
# Calculate committed reduction based on max_reduction_percentage
max_reduction_pct = auto_config.get("max_reduction_percentage", 20.0)
committed_reduction = device_power * (max_reduction_pct / 100) if device_power > 0 else 0.5
# Submit auto-response
try:
await self.answer_invitation(event_id, device_id, "YES", committed_reduction)
logger.info(f"Auto-accepted for device {device_id} with {committed_reduction:.2f} kW commitment")
except Exception as e:
logger.error(f"Error auto-responding for {device_id}: {e}")
else:
logger.debug(f"Invitation {event_id} too soon ({time_until_event:.0f}m < {min_notice}m)")
# ===== BACKGROUND TASK SUPPORT =====
async def check_scheduled_events(self):
"""
Check for events that need to be started (called by scheduler task)
"""
now = datetime.utcnow()
threshold = now + timedelta(minutes=1) # Start events within next minute
# Find scheduled events that should start
cursor = self.db.demand_response_events.find({
"status": "scheduled",
"start_time": {"$lte": threshold, "$gte": now}
})
async for event in cursor:
event_id = event["event_id"]
# Check if not already active
if event_id not in self.active_events:
logger.info(f"Starting scheduled DR event {event_id}")
await self.execute_event(event_id)
# ===== BASIC FLEXIBILITY CALCULATION =====
async def get_current_flexibility(self) -> Dict[str, Any]:
"""
Calculate current available flexibility from device power cache
"""
total_flexibility_kw = 0.0
devices = []
# Get all devices with instructions
cursor = self.db.device_instructions.find({})
current_hour = datetime.utcnow().hour
async for device_doc in cursor:
device_id = device_doc["device_id"]
instruction = device_doc["instructions"].get(str(current_hour), "off")
if instruction != "off":
# Get device current power from cache
device_power = self.device_power_cache.get(device_id, 0.0)
if instruction == "participation":
# Full flexibility (100%)
flexibility = device_power
elif instruction == "shifting":
# Partial flexibility (20%)
flexibility = device_power * 0.20
else:
flexibility = 0.0
if flexibility > 0:
devices.append({
"device_id": device_id,
"available_kw": round(flexibility, 2),
"instruction": instruction,
"current_power": round(device_power, 2)
})
total_flexibility_kw += flexibility
snapshot = {
"timestamp": datetime.utcnow(),
"total_flexibility_kw": round(total_flexibility_kw, 2),
"devices": devices
}
# Store snapshot
await self.db.flexibility_snapshots.insert_one(dict(snapshot))
# Cache for 5 minutes
await self.redis.setex(
"dr:flexibility:current",
300,
json.dumps(snapshot, default=str)
)
return snapshot
async def get_device_instructions(self, device_id: Optional[str] = None) -> Dict[str, Any]:
"""Get DR instructions for device(s)"""
if device_id:
doc = await self.db.device_instructions.find_one({"device_id": device_id})
return doc if doc else {"device_id": device_id, "instructions": {}}
else:
cursor = self.db.device_instructions.find({})
instructions = {}
async for doc in cursor:
instructions[doc["device_id"]] = doc["instructions"]
return instructions
async def update_device_instructions(self, device_id: str, instructions: Dict[str, str]):
"""Update hourly instructions for a device"""
await self.db.device_instructions.update_one(
{"device_id": device_id},
{
"$set": {
"instructions": instructions,
"updated_at": datetime.utcnow()
}
},
upsert=True
)
logger.info(f"Updated instructions for device {device_id}")
# ===== ANALYTICS =====
async def get_performance_analytics(self, days: int = 30) -> Dict[str, Any]:
"""Get DR performance analytics"""
start_date = datetime.utcnow() - timedelta(days=days)
# Query completed events
cursor = self.db.demand_response_events.find({
"status": "completed",
"start_time": {"$gte": start_date}
})
events = await cursor.to_list(length=None)
if not events:
return {
"period_days": days,
"total_events": 0,
"total_reduction_kwh": 0.0,
"total_target_kwh": 0.0,
"average_reduction_kwh": 0.0,
"achievement_rate": 0.0,
"average_event_duration_minutes": 59
}
total_reduction = sum(e.get("actual_reduction_kw", 0) for e in events)
total_target = sum(e.get("target_reduction_kw", 0) for e in events)
return {
"period_days": days,
"total_events": len(events),
"total_reduction_kwh": round(total_reduction, 2),
"total_target_kwh": round(total_target, 2),
"average_reduction_kwh": round(total_reduction / len(events), 2),
"achievement_rate": round((total_reduction / total_target * 100) if total_target > 0 else 0, 2),
"average_event_duration_minutes": 59
}

View File

@@ -0,0 +1,338 @@
"""
Pydantic models for Demand Response Service
"""
from datetime import datetime
from typing import List, Dict, Optional, Literal
from pydantic import BaseModel, Field
from enum import Enum
# Enums
class InvitationStatus(str, Enum):
"""Invitation status states"""
PENDING = "pending"
SCHEDULED = "scheduled"
ACTIVE = "active"
COMPLETED = "completed"
CANCELLED = "cancelled"
class ResponseType(str, Enum):
"""Device response types"""
WAITING = "WAITING"
YES = "YES"
NO = "NO"
class EventStatus(str, Enum):
"""DR event status states"""
SCHEDULED = "scheduled"
ACTIVE = "active"
COMPLETED = "completed"
CANCELLED = "cancelled"
class InstructionType(str, Enum):
"""Device participation instruction types"""
PARTICIPATION = "participation" # Full DR participation (100%)
SHIFTING = "shifting" # Partial participation (0-20%)
OFF = "off" # No DR participation
# Invitation Models
class EventRequest(BaseModel):
"""Request model for creating a DR event (alias for DRInvitationCreate)"""
event_time: datetime = Field(..., description="When the DR event should occur")
load_kwh: float = Field(..., description="Target load reduction in kWh", gt=0)
load_percentage: float = Field(..., description="Target reduction as percentage of total load", ge=0, le=100)
iots: List[str] = Field(..., description="List of device IDs to participate", min_items=1)
duration_minutes: int = Field(59, description="Event duration in minutes", gt=0, le=120)
class Config:
json_schema_extra = {
"example": {
"event_time": "2025-12-10T14:00:00",
"load_kwh": 5.0,
"load_percentage": 15.0,
"iots": ["sensor_1", "sensor_2"],
"duration_minutes": 59
}
}
class DRInvitationCreate(BaseModel):
"""Request model for creating a DR invitation"""
event_time: datetime = Field(..., description="When the DR event should occur")
load_kwh: float = Field(..., description="Target load reduction in kWh", gt=0)
load_percentage: float = Field(..., description="Target reduction as percentage of total load", ge=0, le=100)
iots: List[str] = Field(..., description="List of device IDs to participate", min_items=1)
duration_minutes: int = Field(59, description="Event duration in minutes", gt=0, le=120)
class Config:
json_schema_extra = {
"example": {
"event_time": "2025-12-10T14:00:00",
"load_kwh": 5.0,
"load_percentage": 15.0,
"iots": ["sensor_1", "sensor_2"],
"duration_minutes": 59
}
}
class DRInvitationResponse(BaseModel):
"""Response model for device answering invitation"""
event_id: str = Field(..., description="Event identifier")
iot_id: str = Field(..., description="Device identifier")
response: ResponseType = Field(..., description="Device response (YES/NO)")
committed_reduction_kw: Optional[float] = Field(None, description="Committed power reduction in kW", ge=0)
class Config:
json_schema_extra = {
"example": {
"event_id": "550e8400-e29b-41d4-a716-446655440000",
"iot_id": "sensor_1",
"response": "YES",
"committed_reduction_kw": 2.5
}
}
class DRInvitation(BaseModel):
"""Full DR invitation model"""
event_id: str = Field(..., description="Unique event identifier")
created_at: datetime = Field(..., description="Invitation creation time")
event_time: datetime = Field(..., description="Scheduled event start time")
load_kwh: float = Field(..., description="Target load reduction in kWh")
load_percentage: float = Field(..., description="Target reduction percentage")
iots: List[str] = Field(..., description="Participating device IDs")
duration_minutes: int = Field(..., description="Event duration in minutes")
response: str = Field(..., description="Overall response status")
status: str = Field(..., description="Invitation status")
class Config:
json_encoders = {
datetime: lambda v: v.isoformat() if v else None
}
json_schema_extra = {
"example": {
"event_id": "550e8400-e29b-41d4-a716-446655440000",
"created_at": "2025-12-10T13:45:00",
"event_time": "2025-12-10T14:00:00",
"load_kwh": 5.0,
"load_percentage": 15.0,
"iots": ["sensor_1", "sensor_2"],
"duration_minutes": 59,
"response": "WAITING",
"status": "pending"
}
}
# Event Models
class EventScheduleRequest(BaseModel):
"""Request model for scheduling a DR event"""
event_time: datetime = Field(..., description="Event start time")
iots: List[str] = Field(..., description="Participating device IDs", min_items=1)
load_reduction_kw: float = Field(..., description="Target reduction in kW", gt=0)
duration_minutes: int = Field(59, description="Event duration in minutes", gt=0, le=120)
class Config:
json_schema_extra = {
"example": {
"event_time": "2025-12-10T14:00:00",
"iots": ["sensor_1", "sensor_2"],
"load_reduction_kw": 5.0,
"duration_minutes": 59
}
}
class PowerSample(BaseModel):
"""Individual power sample during event"""
timestamp: datetime = Field(..., description="Sample timestamp")
device_powers: Dict[str, float] = Field(..., description="Device power readings (device_id -> kW)")
interval_reduction_kwh: Optional[float] = Field(None, description="Reduction for this interval")
class DREvent(BaseModel):
"""DR event execution model"""
event_id: str = Field(..., description="Unique event identifier")
invitation_id: Optional[str] = Field(None, description="Source invitation ID if applicable")
start_time: datetime = Field(..., description="Event start time")
end_time: datetime = Field(..., description="Event end time")
status: EventStatus = Field(..., description="Event status")
participating_devices: List[str] = Field(..., description="Device IDs participating")
target_reduction_kw: float = Field(..., description="Target power reduction in kW")
actual_reduction_kw: float = Field(0.0, description="Actual achieved reduction in kWh")
power_samples: List[Dict] = Field(default_factory=list, description="Power samples during event")
class Config:
json_encoders = {
datetime: lambda v: v.isoformat() if v else None
}
class ActiveEventResponse(BaseModel):
"""Response model for active event with real-time data"""
event_id: str = Field(..., description="Event identifier")
status: EventStatus = Field(..., description="Current status")
start_time: datetime = Field(..., description="Event start time")
end_time: datetime = Field(..., description="Event end time")
participating_devices: List[str] = Field(..., description="Participating devices")
target_reduction_kw: float = Field(..., description="Target reduction")
actual_reduction_kw: float = Field(..., description="Current achieved reduction")
current_device_powers: Optional[Dict[str, float]] = Field(None, description="Current device power readings")
progress_percentage: Optional[float] = Field(None, description="Event progress (0-100%)")
class Config:
json_encoders = {
datetime: lambda v: v.isoformat() if v else None
}
class LoadReductionRequest(BaseModel):
"""Request model for executing load reduction"""
event_time: datetime = Field(..., description="Event start time")
iot: str = Field(..., description="Device ID")
class Config:
json_schema_extra = {
"example": {
"event_time": "2025-12-10T14:00:00",
"iot": "sensor_1"
}
}
# Flexibility Models
class DeviceFlexibility(BaseModel):
"""Per-device flexibility information"""
device_id: str = Field(..., description="Device identifier")
available_kw: float = Field(..., description="Available flexibility in kW", ge=0)
instruction: str = Field(..., description="Current DR instruction")
current_power: float = Field(..., description="Current power consumption in kW", ge=0)
class FlexibilityResponse(BaseModel):
"""Response model for current flexibility"""
timestamp: datetime = Field(..., description="Calculation timestamp")
total_flexibility_kw: float = Field(..., description="Total available flexibility in kW", ge=0)
devices: List[DeviceFlexibility] = Field(..., description="Per-device breakdown")
class Config:
json_encoders = {
datetime: lambda v: v.isoformat() if v else None
}
json_schema_extra = {
"example": {
"timestamp": "2025-12-10T13:45:00",
"total_flexibility_kw": 15.5,
"devices": [
{
"device_id": "sensor_1",
"available_kw": 3.5,
"instruction": "participation",
"current_power": 3.5
},
{
"device_id": "sensor_2",
"available_kw": 0.8,
"instruction": "shifting",
"current_power": 4.0
}
]
}
}
class DeviceInstructionUpdate(BaseModel):
"""Model for updating device instructions"""
device_id: str = Field(..., description="Device identifier")
instructions: Dict[str, str] = Field(..., description="Hourly instructions (hour -> instruction type)")
class Config:
json_schema_extra = {
"example": {
"device_id": "sensor_1",
"instructions": {
"0": "participation",
"1": "shifting",
"2": "off",
"3": "participation"
}
}
}
# Configuration Models
class AutoResponseConfig(BaseModel):
"""Auto-response configuration model"""
enabled: bool = Field(..., description="Whether auto-response is enabled")
max_reduction_percentage: float = Field(20.0, description="Maximum reduction percentage for auto-accept", ge=0, le=100)
response_delay_seconds: int = Field(300, description="Delay before auto-responding (seconds)", ge=0)
min_notice_minutes: int = Field(60, description="Minimum notice required for auto-accept (minutes)", ge=0)
class Config:
json_schema_extra = {
"example": {
"enabled": True,
"max_reduction_percentage": 20.0,
"response_delay_seconds": 300,
"min_notice_minutes": 60
}
}
# Response Models
class InvitationSendResponse(BaseModel):
"""Response for sending invitation"""
event_id: str = Field(..., description="Created event identifier")
response: str = Field(..., description="Initial response status")
message: str = Field(..., description="Status message")
class InvitationAnswerResponse(BaseModel):
"""Response for answering invitation"""
success: bool = Field(..., description="Whether answer was recorded")
message: str = Field(..., description="Status message")
class EventScheduleResponse(BaseModel):
"""Response for scheduling event"""
event_id: str = Field(..., description="Scheduled event identifier")
message: str = Field(..., description="Status message")
class PerformanceAnalytics(BaseModel):
"""Performance analytics response"""
period_days: int = Field(..., description="Analysis period in days")
total_events: int = Field(..., description="Total number of events")
total_reduction_kwh: float = Field(..., description="Total energy reduced")
total_target_kwh: float = Field(..., description="Total target reduction")
average_reduction_kwh: float = Field(..., description="Average reduction per event")
achievement_rate: float = Field(..., description="Achievement rate (%)")
average_event_duration_minutes: int = Field(..., description="Average event duration")
# Health Check Model
class HealthResponse(BaseModel):
"""Health check response model"""
service: str = Field(..., description="Service name")
status: str = Field(..., description="Service status")
timestamp: datetime = Field(..., description="Check timestamp")
version: str = Field(..., description="Service version")
class Config:
json_encoders = {
datetime: lambda v: v.isoformat() if v else None
}
json_schema_extra = {
"example": {
"service": "demand-response-service",
"status": "healthy",
"timestamp": "2025-12-10T13:45:00",
"version": "1.0.0"
}
}

View File

@@ -0,0 +1,39 @@
"""Sensors module - handles sensor management, rooms, and analytics."""
from .models import (
SensorReading,
SensorMetadata,
RoomMetrics,
SystemEvent,
Room,
RoomCreate,
RoomUpdate,
RoomInfo,
SensorType,
SensorStatus,
CO2Status,
OccupancyLevel
)
from .sensor_service import SensorService
from .room_service import RoomService
from .analytics_service import AnalyticsService
from .websocket_manager import WebSocketManager
__all__ = [
"SensorReading",
"SensorMetadata",
"RoomMetrics",
"SystemEvent",
"Room",
"RoomCreate",
"RoomUpdate",
"RoomInfo",
"SensorType",
"SensorStatus",
"CO2Status",
"OccupancyLevel",
"SensorService",
"RoomService",
"AnalyticsService",
"WebSocketManager",
]

View File

@@ -0,0 +1,377 @@
"""
Analytics service for processing sensor data and generating insights
"""
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import json
logger = logging.getLogger(__name__)
class AnalyticsService:
"""Service for analytics and data processing"""
def __init__(self, db, redis_client):
self.db = db
self.redis = redis_client
async def query_data(self, query_params) -> Dict[str, Any]:
"""Execute advanced data query"""
try:
# Build query
query = {}
if hasattr(query_params, 'sensor_ids') and query_params.sensor_ids:
query["sensor_id"] = {"$in": query_params.sensor_ids}
if hasattr(query_params, 'start_time') and query_params.start_time:
query.setdefault("timestamp", {})["$gte"] = query_params.start_time
if hasattr(query_params, 'end_time') and query_params.end_time:
query.setdefault("timestamp", {})["$lte"] = query_params.end_time
# Execute query
cursor = self.db.sensor_readings.find(query)
if hasattr(query_params, 'limit') and query_params.limit:
cursor = cursor.limit(query_params.limit)
if hasattr(query_params, 'offset') and query_params.offset:
cursor = cursor.skip(query_params.offset)
cursor = cursor.sort("timestamp", -1)
# Get results
results = []
async for reading in cursor:
reading["_id"] = str(reading["_id"])
results.append(reading)
# Get total count
total_count = await self.db.sensor_readings.count_documents(query)
return {
"data": results,
"total_count": total_count,
"query": query_params.__dict__ if hasattr(query_params, '__dict__') else {},
"execution_time_ms": 0 # Placeholder
}
except Exception as e:
logger.error(f"Error executing data query: {e}")
raise
async def get_analytics_summary(self, hours: int = 24) -> Dict[str, Any]:
"""Get comprehensive analytics summary"""
try:
start_time = datetime.utcnow() - timedelta(hours=hours)
# Get basic statistics
pipeline = [
{
"$match": {
"created_at": {"$gte": start_time}
}
},
{
"$group": {
"_id": None,
"total_readings": {"$sum": 1},
"average_value": {"$avg": "$value"},
"min_value": {"$min": "$value"},
"max_value": {"$max": "$value"},
"unique_sensors": {"$addToSet": "$sensor_id"}
}
}
]
cursor = self.db.sensor_readings.aggregate(pipeline)
stats = await cursor.to_list(length=1)
base_stats = stats[0] if stats else {
"total_readings": 0,
"average_value": 0,
"min_value": 0,
"max_value": 0,
"unique_sensors": []
}
# Get room-level statistics
room_stats = await self._get_room_analytics(hours)
# Get energy trends
energy_trends = await self._get_energy_trends(hours)
return {
"period_hours": hours,
"timestamp": datetime.utcnow().isoformat(),
"total_readings": base_stats["total_readings"],
"unique_sensors": len(base_stats["unique_sensors"]),
"value_statistics": {
"average": round(base_stats["average_value"], 2) if base_stats["average_value"] else 0,
"minimum": base_stats["min_value"],
"maximum": base_stats["max_value"]
},
"room_statistics": room_stats,
"energy_trends": energy_trends
}
except Exception as e:
logger.error(f"Error getting analytics summary: {e}")
raise
async def get_energy_analytics(self, hours: int = 24, room: Optional[str] = None) -> Dict[str, Any]:
"""Get energy-specific analytics"""
try:
start_time = datetime.utcnow() - timedelta(hours=hours)
# Build query
query = {"created_at": {"$gte": start_time}}
if room:
query["room"] = room
# Energy consumption over time
pipeline = [
{"$match": query},
{
"$group": {
"_id": {
"hour": {"$hour": "$created_at"},
"date": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}}
},
"total_energy": {"$sum": "$value"},
"reading_count": {"$sum": 1}
}
},
{"$sort": {"_id.date": 1, "_id.hour": 1}}
]
cursor = self.db.sensor_readings.aggregate(pipeline)
hourly_data = []
async for data in cursor:
hourly_data.append({
"hour": data["_id"]["hour"],
"date": data["_id"]["date"],
"total_energy": data["total_energy"],
"reading_count": data["reading_count"]
})
# Peak consumption analysis
peak_analysis = await self._get_peak_consumption_analysis(query)
# Energy efficiency metrics
efficiency_metrics = await self._get_efficiency_metrics(query)
return {
"period_hours": hours,
"room": room,
"timestamp": datetime.utcnow().isoformat(),
"hourly_consumption": hourly_data,
"peak_analysis": peak_analysis,
"efficiency_metrics": efficiency_metrics,
"total_consumption": sum(item["total_energy"] for item in hourly_data)
}
except Exception as e:
logger.error(f"Error getting energy analytics: {e}")
raise
async def _get_room_analytics(self, hours: int) -> Dict[str, Any]:
"""Get room-level analytics"""
try:
start_time = datetime.utcnow() - timedelta(hours=hours)
pipeline = [
{
"$match": {
"created_at": {"$gte": start_time},
"room": {"$ne": None}
}
},
{
"$group": {
"_id": "$room",
"total_readings": {"$sum": 1},
"total_energy": {"$sum": "$value"},
"average_energy": {"$avg": "$value"},
"unique_sensors": {"$addToSet": "$sensor_id"}
}
},
{"$sort": {"total_energy": -1}}
]
cursor = self.db.sensor_readings.aggregate(pipeline)
room_data = []
async for room in cursor:
room_data.append({
"room": room["_id"],
"total_readings": room["total_readings"],
"total_energy": room["total_energy"],
"average_energy": round(room["average_energy"], 2),
"sensor_count": len(room["unique_sensors"])
})
return {
"by_room": room_data,
"total_rooms": len(room_data)
}
except Exception as e:
logger.error(f"Error getting room analytics: {e}")
return {"by_room": [], "total_rooms": 0}
async def _get_energy_trends(self, hours: int) -> Dict[str, Any]:
"""Get energy consumption trends"""
try:
start_time = datetime.utcnow() - timedelta(hours=hours)
# Get current period data
current_query = {"created_at": {"$gte": start_time}}
current_cursor = self.db.sensor_readings.aggregate([
{"$match": current_query},
{"$group": {"_id": None, "total": {"$sum": "$value"}, "count": {"$sum": 1}}}
])
current_data = await current_cursor.to_list(length=1)
current_total = current_data[0]["total"] if current_data else 0
current_count = current_data[0]["count"] if current_data else 0
# Get previous period for comparison
previous_start = start_time - timedelta(hours=hours)
previous_query = {
"created_at": {"$gte": previous_start, "$lt": start_time}
}
previous_cursor = self.db.sensor_readings.aggregate([
{"$match": previous_query},
{"$group": {"_id": None, "total": {"$sum": "$value"}, "count": {"$sum": 1}}}
])
previous_data = await previous_cursor.to_list(length=1)
previous_total = previous_data[0]["total"] if previous_data else 0
# Calculate trend
if previous_total > 0:
trend_percentage = ((current_total - previous_total) / previous_total) * 100
else:
trend_percentage = 0
return {
"current_period": {
"total_energy": current_total,
"reading_count": current_count,
"average_per_reading": current_total / current_count if current_count > 0 else 0
},
"previous_period": {
"total_energy": previous_total
},
"trend": {
"percentage_change": round(trend_percentage, 2),
"direction": "up" if trend_percentage > 0 else "down" if trend_percentage < 0 else "stable"
}
}
except Exception as e:
logger.error(f"Error getting energy trends: {e}")
return {}
async def _get_peak_consumption_analysis(self, base_query: Dict[str, Any]) -> Dict[str, Any]:
"""Analyze peak consumption patterns"""
try:
pipeline = [
{"$match": base_query},
{
"$group": {
"_id": {"$hour": "$created_at"},
"total_consumption": {"$sum": "$value"},
"reading_count": {"$sum": 1}
}
},
{"$sort": {"total_consumption": -1}}
]
cursor = self.db.sensor_readings.aggregate(pipeline)
hourly_consumption = await cursor.to_list(length=None)
if not hourly_consumption:
return {"peak_hour": None, "peak_consumption": 0, "hourly_pattern": []}
peak_data = hourly_consumption[0]
return {
"peak_hour": peak_data["_id"],
"peak_consumption": peak_data["total_consumption"],
"hourly_pattern": [
{
"hour": item["_id"],
"consumption": item["total_consumption"],
"reading_count": item["reading_count"]
}
for item in hourly_consumption
]
}
except Exception as e:
logger.error(f"Error analyzing peak consumption: {e}")
return {"peak_hour": None, "peak_consumption": 0, "hourly_pattern": []}
async def _get_efficiency_metrics(self, base_query: Dict[str, Any]) -> Dict[str, Any]:
"""Calculate energy efficiency metrics"""
try:
# Average consumption per sensor
pipeline = [
{"$match": base_query},
{
"$group": {
"_id": "$sensor_id",
"total_consumption": {"$sum": "$value"},
"reading_count": {"$sum": 1}
}
},
{
"$group": {
"_id": None,
"average_per_sensor": {"$avg": "$total_consumption"},
"sensor_count": {"$sum": 1},
"min_consumption": {"$min": "$total_consumption"},
"max_consumption": {"$max": "$total_consumption"}
}
}
]
cursor = self.db.sensor_readings.aggregate(pipeline)
efficiency_data = await cursor.to_list(length=1)
if not efficiency_data:
return {
"average_per_sensor": 0,
"sensor_count": 0,
"efficiency_score": 0,
"variation_coefficient": 0
}
data = efficiency_data[0]
# Calculate efficiency score (lower variation = higher efficiency)
if data["average_per_sensor"] > 0:
variation_coefficient = (data["max_consumption"] - data["min_consumption"]) / data["average_per_sensor"]
efficiency_score = max(0, 100 - (variation_coefficient * 10)) # Scale to 0-100
else:
variation_coefficient = 0
efficiency_score = 100
return {
"average_per_sensor": round(data["average_per_sensor"], 2),
"sensor_count": data["sensor_count"],
"efficiency_score": round(efficiency_score, 1),
"variation_coefficient": round(variation_coefficient, 2)
}
except Exception as e:
logger.error(f"Error calculating efficiency metrics: {e}")
return {
"average_per_sensor": 0,
"sensor_count": 0,
"efficiency_score": 0,
"variation_coefficient": 0
}

View File

@@ -0,0 +1,378 @@
"""
Models for Sensor Management Service - integrating all original dashboard functionality
"""
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any, Literal
from datetime import datetime
from enum import Enum
class SensorType(str, Enum):
ENERGY = "energy"
CO2 = "co2"
TEMPERATURE = "temperature"
HUMIDITY = "humidity"
HVAC = "hvac"
LIGHTING = "lighting"
SECURITY = "security"
MOTION = "motion"
class SensorStatus(str, Enum):
ONLINE = "online"
OFFLINE = "offline"
ERROR = "error"
MAINTENANCE = "maintenance"
class CO2Status(str, Enum):
GOOD = "good"
MODERATE = "moderate"
POOR = "poor"
CRITICAL = "critical"
class OccupancyLevel(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
# Base Models from original dashboard
class SensorReading(BaseModel):
"""Individual sensor reading model - enhanced from original"""
sensor_id: str = Field(..., description="Unique sensor identifier")
room: Optional[str] = Field(None, description="Room where sensor is located")
sensor_type: SensorType = Field(..., description="Type of sensor")
timestamp: int = Field(..., description="Unix timestamp of reading")
created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp")
# Sensor values with enhanced structure
energy: Optional[Dict[str, Any]] = Field(None, description="Energy reading with value and unit")
co2: Optional[Dict[str, Any]] = Field(None, description="CO2 reading with value and unit")
temperature: Optional[Dict[str, Any]] = Field(None, description="Temperature reading with value and unit")
humidity: Optional[Dict[str, Any]] = Field(None, description="Humidity reading with value and unit")
motion: Optional[Dict[str, Any]] = Field(None, description="Motion detection reading")
# Additional sensor types from tiocps
power: Optional[Dict[str, Any]] = Field(None, description="Power consumption reading")
voltage: Optional[Dict[str, Any]] = Field(None, description="Voltage reading")
current: Optional[Dict[str, Any]] = Field(None, description="Current reading")
generation: Optional[Dict[str, Any]] = Field(None, description="Energy generation reading")
# Metadata
metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional sensor metadata")
class Config:
json_encoders = {
datetime: lambda v: v.isoformat()
}
class LegacySensorReading(BaseModel):
"""Legacy sensor reading format for backward compatibility"""
sensor_id: str = Field(..., alias="sensorId")
timestamp: int
value: float
unit: str
created_at: datetime = Field(default_factory=datetime.utcnow)
class Config:
allow_population_by_field_name = True
class SensorMetadata(BaseModel):
"""Enhanced sensor metadata from original dashboard"""
sensor_id: str = Field(..., description="Unique sensor identifier")
name: str = Field(..., description="Human-readable sensor name")
sensor_type: SensorType = Field(..., description="Type of sensor")
room: Optional[str] = Field(None, description="Room assignment")
status: SensorStatus = Field(default=SensorStatus.OFFLINE, description="Current sensor status")
# Physical location and installation details
location: Optional[str] = Field(None, description="Physical location description")
floor: Optional[str] = Field(None, description="Floor level")
building: Optional[str] = Field(None, description="Building identifier")
# Technical specifications
model: Optional[str] = Field(None, description="Sensor model")
manufacturer: Optional[str] = Field(None, description="Sensor manufacturer")
firmware_version: Optional[str] = Field(None, description="Firmware version")
hardware_version: Optional[str] = Field(None, description="Hardware version")
# Network and connectivity
ip_address: Optional[str] = Field(None, description="IP address if network connected")
mac_address: Optional[str] = Field(None, description="MAC address")
connection_type: Optional[str] = Field(None, description="Connection type (wifi, ethernet, zigbee, etc.)")
# Power and maintenance
battery_level: Optional[float] = Field(None, description="Battery level percentage")
last_maintenance: Optional[datetime] = Field(None, description="Last maintenance date")
next_maintenance: Optional[datetime] = Field(None, description="Next scheduled maintenance")
# Operational settings
sampling_rate: Optional[int] = Field(None, description="Data sampling rate in seconds")
calibration_date: Optional[datetime] = Field(None, description="Last calibration date")
# Capabilities from tiocps integration
monitoring_capabilities: List[str] = Field(default_factory=list, description="List of monitoring capabilities")
control_capabilities: List[str] = Field(default_factory=list, description="List of control capabilities")
demand_response_enabled: bool = Field(default=False, description="Demand response participation")
# Timestamps
installed_at: Optional[datetime] = Field(None, description="Installation timestamp")
last_seen: Optional[datetime] = Field(None, description="Last communication timestamp")
created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp")
updated_at: datetime = Field(default_factory=datetime.utcnow, description="Record update timestamp")
class Config:
json_encoders = {
datetime: lambda v: v.isoformat() if v else None
}
class RoomMetrics(BaseModel):
"""Enhanced room metrics from original dashboard"""
room: str = Field(..., description="Room identifier")
timestamp: int = Field(..., description="Metrics calculation timestamp")
created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp")
# Sensor inventory
sensor_count: int = Field(0, description="Total number of sensors in room")
active_sensors: List[str] = Field(default_factory=list, description="List of active sensor IDs")
sensor_types: List[SensorType] = Field(default_factory=list, description="Types of sensors present")
# Energy metrics (enhanced from tiocps)
energy: Optional[Dict[str, Any]] = Field(None, description="Energy consumption metrics")
power: Optional[Dict[str, Any]] = Field(None, description="Power consumption metrics")
generation: Optional[Dict[str, Any]] = Field(None, description="Energy generation metrics")
flexibility: Optional[Dict[str, Any]] = Field(None, description="Energy flexibility metrics")
# Environmental metrics
co2: Optional[Dict[str, Any]] = Field(None, description="CO2 level metrics")
temperature: Optional[Dict[str, Any]] = Field(None, description="Temperature metrics")
humidity: Optional[Dict[str, Any]] = Field(None, description="Humidity metrics")
# Occupancy and usage
occupancy_estimate: OccupancyLevel = Field(default=OccupancyLevel.LOW, description="Estimated occupancy level")
motion_detected: bool = Field(default=False, description="Recent motion detection status")
# Time-based metrics
last_activity: Optional[datetime] = Field(None, description="Last detected activity timestamp")
daily_usage_hours: Optional[float] = Field(None, description="Estimated daily usage in hours")
# Economic metrics from tiocps
energy_cost: Optional[float] = Field(None, description="Estimated energy cost")
savings_potential: Optional[float] = Field(None, description="Potential savings from optimization")
class Config:
json_encoders = {
datetime: lambda v: v.isoformat() if v else None
}
class Room(BaseModel):
"""Room definition model"""
name: str = Field(..., description="Room name/identifier")
display_name: Optional[str] = Field(None, description="Human-readable room name")
floor: Optional[str] = Field(None, description="Floor level")
building: Optional[str] = Field(None, description="Building identifier")
area_m2: Optional[float] = Field(None, description="Room area in square meters")
capacity: Optional[int] = Field(None, description="Room capacity (people)")
room_type: Optional[str] = Field(None, description="Room type (office, meeting, etc.)")
# Configuration
target_temperature: Optional[float] = Field(None, description="Target temperature")
target_co2: Optional[float] = Field(None, description="Target CO2 level")
operating_hours: Optional[Dict[str, Any]] = Field(None, description="Operating hours schedule")
# Status
active: bool = Field(default=True, description="Whether room is active")
created_at: datetime = Field(default_factory=datetime.utcnow)
updated_at: datetime = Field(default_factory=datetime.utcnow)
class Config:
json_encoders = {
datetime: lambda v: v.isoformat()
}
class SystemEvent(BaseModel):
"""Enhanced system events from original dashboard"""
event_id: str = Field(..., description="Unique event identifier")
event_type: str = Field(..., description="Type of event")
severity: Literal["info", "warning", "error", "critical"] = Field(..., description="Event severity")
timestamp: int = Field(..., description="Event timestamp")
created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp")
# Event details
title: str = Field(..., description="Event title")
description: str = Field(..., description="Event description")
source: Optional[str] = Field(None, description="Event source (sensor_id, system component, etc.)")
# Context
sensor_id: Optional[str] = Field(None, description="Related sensor ID")
room: Optional[str] = Field(None, description="Related room")
# Event data
data: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional event data")
# Status tracking
acknowledged: bool = Field(default=False, description="Whether event has been acknowledged")
resolved: bool = Field(default=False, description="Whether event has been resolved")
acknowledged_by: Optional[str] = Field(None, description="Who acknowledged the event")
resolved_by: Optional[str] = Field(None, description="Who resolved the event")
acknowledged_at: Optional[datetime] = Field(None, description="Acknowledgment timestamp")
resolved_at: Optional[datetime] = Field(None, description="Resolution timestamp")
class Config:
json_encoders = {
datetime: lambda v: v.isoformat() if v else None
}
class DataQuery(BaseModel):
"""Enhanced data query parameters from original dashboard"""
sensor_ids: Optional[List[str]] = Field(None, description="Filter by sensor IDs")
rooms: Optional[List[str]] = Field(None, description="Filter by rooms")
sensor_types: Optional[List[SensorType]] = Field(None, description="Filter by sensor types")
# Time range
start_time: Optional[int] = Field(None, description="Start timestamp (Unix)")
end_time: Optional[int] = Field(None, description="End timestamp (Unix)")
# Aggregation
aggregate: Optional[str] = Field(None, description="Aggregation method (avg, sum, min, max)")
interval: Optional[str] = Field(None, description="Aggregation interval (1m, 5m, 1h, 1d)")
# Pagination
limit: int = Field(default=100, description="Maximum number of records to return")
offset: int = Field(default=0, description="Number of records to skip")
# Sorting
sort_by: str = Field(default="timestamp", description="Field to sort by")
sort_order: Literal["asc", "desc"] = Field(default="desc", description="Sort order")
# Additional filters from tiocps
energy_threshold: Optional[float] = Field(None, description="Filter by energy threshold")
co2_threshold: Optional[float] = Field(None, description="Filter by CO2 threshold")
include_metadata: bool = Field(default=False, description="Include sensor metadata in response")
class DataResponse(BaseModel):
"""Enhanced response model for data queries"""
data: List[Dict[str, Any]] = Field(default_factory=list, description="Query results")
total_count: int = Field(0, description="Total number of matching records")
query: DataQuery = Field(..., description="Original query parameters")
execution_time_ms: float = Field(..., description="Query execution time in milliseconds")
# Additional metadata
aggregation_applied: bool = Field(default=False, description="Whether data was aggregated")
cache_hit: bool = Field(default=False, description="Whether result was served from cache")
class AnalyticsSummary(BaseModel):
"""Comprehensive analytics summary"""
period_hours: int
start_time: datetime
end_time: datetime
# Sensor analytics
total_sensors: int
active_sensors: int
sensor_types_summary: Dict[str, int]
# Room analytics
total_rooms: int
active_rooms: int
room_occupancy_summary: Dict[str, int]
# Energy analytics
total_energy_consumption: float
total_energy_generation: float
net_energy_consumption: float
energy_efficiency: float
# Environmental analytics
average_co2: float
average_temperature: float
average_humidity: float
# System health
system_events_count: int
critical_events_count: int
sensor_errors_count: int
class Config:
json_encoders = {
datetime: lambda v: v.isoformat()
}
# Room Management Models
class Room(BaseModel):
"""Room model for database storage and API responses"""
name: str = Field(..., description="Unique room name")
description: Optional[str] = Field(None, description="Room description")
floor: Optional[str] = Field(None, description="Floor designation")
building: Optional[str] = Field(None, description="Building name")
area: Optional[float] = Field(None, description="Room area in square meters")
capacity: Optional[int] = Field(None, description="Maximum occupancy")
room_type: Optional[str] = Field(None, description="Room type (office, meeting, storage, etc.)")
# Metadata
created_at: datetime = Field(default_factory=datetime.utcnow, description="Room creation timestamp")
updated_at: datetime = Field(default_factory=datetime.utcnow, description="Room update timestamp")
class Config:
json_encoders = {
datetime: lambda v: v.isoformat() if v else None
}
class RoomCreate(BaseModel):
"""Model for creating new rooms"""
name: str = Field(..., description="Unique room name", min_length=1, max_length=100)
description: Optional[str] = Field(None, description="Room description", max_length=500)
floor: Optional[str] = Field(None, description="Floor designation", max_length=50)
building: Optional[str] = Field(None, description="Building name", max_length=100)
area: Optional[float] = Field(None, description="Room area in square meters", gt=0)
capacity: Optional[int] = Field(None, description="Maximum occupancy", gt=0)
room_type: Optional[str] = Field(None, description="Room type", max_length=50)
class RoomUpdate(BaseModel):
"""Model for updating existing rooms"""
description: Optional[str] = Field(None, description="Room description", max_length=500)
floor: Optional[str] = Field(None, description="Floor designation", max_length=50)
building: Optional[str] = Field(None, description="Building name", max_length=100)
area: Optional[float] = Field(None, description="Room area in square meters", gt=0)
capacity: Optional[int] = Field(None, description="Maximum occupancy", gt=0)
room_type: Optional[str] = Field(None, description="Room type", max_length=50)
class RoomInfo(BaseModel):
"""Comprehensive room information for API responses"""
name: str = Field(..., description="Room name")
description: Optional[str] = Field(None, description="Room description")
floor: Optional[str] = Field(None, description="Floor designation")
building: Optional[str] = Field(None, description="Building name")
area: Optional[float] = Field(None, description="Room area in square meters")
capacity: Optional[int] = Field(None, description="Maximum occupancy")
room_type: Optional[str] = Field(None, description="Room type")
# Runtime information
sensor_count: int = Field(0, description="Number of sensors in room")
active_sensors: int = Field(0, description="Number of active sensors")
last_updated: Optional[datetime] = Field(None, description="Last metrics update")
# Timestamps
created_at: datetime = Field(..., description="Room creation timestamp")
updated_at: datetime = Field(..., description="Room update timestamp")
class Config:
json_encoders = {
datetime: lambda v: v.isoformat() if v else None
}
class HealthResponse(BaseModel):
"""Health check response"""
service: str
status: str
timestamp: datetime
version: str
# Additional service-specific health metrics
total_sensors: Optional[int] = None
active_sensors: Optional[int] = None
total_rooms: Optional[int] = None
websocket_connections: Optional[int] = None
class Config:
json_encoders = {
datetime: lambda v: v.isoformat()
}

View File

@@ -0,0 +1,467 @@
"""
Room service for managing rooms and room-level metrics
"""
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import json
logger = logging.getLogger(__name__)
class RoomService:
"""Service for managing rooms and room-level analytics"""
def __init__(self, db, redis_client):
self.db = db
self.redis = redis_client
async def get_all_room_names(self) -> List[str]:
"""Get a simple list of all room names for dropdowns/selections"""
try:
# Get rooms from the rooms collection
room_cursor = self.db.rooms.find({}, {"name": 1})
room_names = set()
async for room in room_cursor:
room_names.add(room["name"])
# Also get rooms that exist only in sensor data (legacy support)
sensor_cursor = self.db.sensors.find(
{"room": {"$ne": None, "$exists": True}},
{"room": 1}
)
async for sensor in sensor_cursor:
if sensor.get("room"):
room_names.add(sensor["room"])
# Convert to sorted list
return sorted(list(room_names))
except Exception as e:
logger.error(f"Error getting room names: {e}")
raise
async def initialize_default_rooms(self) -> None:
"""Initialize default rooms if none exist"""
try:
# Check if any rooms exist
room_count = await self.db.rooms.count_documents({})
if room_count == 0:
# Create default rooms
default_rooms = [
{"name": "Conference Room A", "description": "Main conference room", "room_type": "meeting"},
{"name": "Conference Room B", "description": "Secondary conference room", "room_type": "meeting"},
{"name": "Office Floor 1", "description": "First floor office space", "room_type": "office"},
{"name": "Office Floor 2", "description": "Second floor office space", "room_type": "office"},
{"name": "Kitchen", "description": "Employee kitchen and break room", "room_type": "common"},
{"name": "Lobby", "description": "Main entrance and reception", "room_type": "common"},
{"name": "Server Room", "description": "IT equipment room", "room_type": "technical"},
{"name": "Storage Room", "description": "General storage", "room_type": "storage"},
{"name": "Meeting Room 1", "description": "Small meeting room", "room_type": "meeting"},
{"name": "Meeting Room 2", "description": "Small meeting room", "room_type": "meeting"}
]
for room_data in default_rooms:
room_doc = {
**room_data,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
await self.db.rooms.insert_one(room_doc)
logger.info(f"Initialized {len(default_rooms)} default rooms")
except Exception as e:
logger.error(f"Error initializing default rooms: {e}")
raise
async def get_rooms(self) -> List[Dict[str, Any]]:
"""Get all rooms with sensor counts and metrics"""
try:
# Get unique rooms from sensors
pipeline = [
{"$group": {"_id": "$room", "sensor_count": {"$sum": 1}}},
{"$match": {"_id": {"$ne": None}}}
]
cursor = self.db.sensors.aggregate(pipeline)
rooms = []
async for room_data in cursor:
room_name = room_data["_id"]
# Get latest room metrics
latest_metrics = await self._get_latest_room_metrics(room_name)
room_info = {
"name": room_name,
"sensor_count": room_data["sensor_count"],
"latest_metrics": latest_metrics,
"last_updated": latest_metrics.get("timestamp") if latest_metrics else None
}
rooms.append(room_info)
return rooms
except Exception as e:
logger.error(f"Error getting rooms: {e}")
raise
async def create_room(self, room_data: Dict[str, Any]) -> Dict[str, Any]:
"""Create a new room"""
try:
room_doc = {
"name": room_data.get("name"),
"description": room_data.get("description", ""),
"floor": room_data.get("floor"),
"building": room_data.get("building"),
"area": room_data.get("area"),
"capacity": room_data.get("capacity"),
"room_type": room_data.get("room_type"),
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
# Validate required fields
if not room_doc["name"] or not room_doc["name"].strip():
raise ValueError("Room name is required")
# Check if room already exists
existing = await self.db.rooms.find_one({"name": room_doc["name"]})
if existing:
raise ValueError(f"Room {room_doc['name']} already exists")
result = await self.db.rooms.insert_one(room_doc)
return {
"id": str(result.inserted_id),
"name": room_doc["name"],
"created_at": room_doc["created_at"]
}
except Exception as e:
logger.error(f"Error creating room: {e}")
raise
async def update_room(self, room_name: str, room_data: Dict[str, Any]) -> Dict[str, Any]:
"""Update an existing room"""
try:
# Check if room exists
existing = await self.db.rooms.find_one({"name": room_name})
if not existing:
raise ValueError(f"Room {room_name} not found")
# Prepare update document
update_doc = {
"updated_at": datetime.utcnow()
}
# Update only provided fields
for field in ["description", "floor", "building", "area", "capacity", "room_type"]:
if field in room_data and room_data[field] is not None:
update_doc[field] = room_data[field]
# Perform update
result = await self.db.rooms.update_one(
{"name": room_name},
{"$set": update_doc}
)
if result.modified_count == 0:
logger.warning(f"No changes made to room {room_name}")
return {
"name": room_name,
"updated_at": update_doc["updated_at"],
"modified": result.modified_count > 0
}
except Exception as e:
logger.error(f"Error updating room: {e}")
raise
async def delete_room(self, room_name: str) -> Dict[str, Any]:
"""Delete a room and optionally reassign sensors"""
try:
# Check if room exists
existing = await self.db.rooms.find_one({"name": room_name})
# Check for sensors in this room
sensors_in_room = await self.db.sensors.find({"room": room_name}).to_list(None)
if sensors_in_room:
# Update sensors to have null room (don't delete sensors)
await self.db.sensors.update_many(
{"room": room_name},
{"$unset": {"room": ""}}
)
# Delete room from rooms collection if it exists
room_deleted = False
if existing:
result = await self.db.rooms.delete_one({"name": room_name})
room_deleted = result.deleted_count > 0
# Delete room metrics
metrics_result = await self.db.room_metrics.delete_many({"room": room_name})
return {
"room": room_name,
"room_deleted": room_deleted,
"sensors_updated": len(sensors_in_room),
"metrics_deleted": metrics_result.deleted_count
}
except Exception as e:
logger.error(f"Error deleting room: {e}")
raise
async def get_room_details(self, room_name: str) -> Optional[Dict[str, Any]]:
"""Get detailed room information"""
try:
# Get room info
room = await self.db.rooms.find_one({"name": room_name})
if not room:
# Create basic room info from sensor data
sensors = await self.db.sensors.find({"room": room_name}).to_list(None)
if not sensors:
return None
room = {
"name": room_name,
"description": f"Room with {len(sensors)} sensors",
"sensor_count": len(sensors)
}
else:
room["_id"] = str(room["_id"])
# Get sensor count
sensor_count = await self.db.sensors.count_documents({"room": room_name})
room["sensor_count"] = sensor_count
# Get sensors in this room
cursor = self.db.sensors.find({"room": room_name})
sensors = []
async for sensor in cursor:
sensor["_id"] = str(sensor["_id"])
sensors.append(sensor)
room["sensors"] = sensors
# Get recent room metrics
room["recent_metrics"] = await self._get_recent_room_metrics(room_name, hours=24)
return room
except Exception as e:
logger.error(f"Error getting room details: {e}")
raise
async def get_room_data(self, room_name: str, start_time: Optional[int] = None,
end_time: Optional[int] = None, limit: int = 100) -> Dict[str, Any]:
"""Get historical data for a room"""
try:
# Get room metrics
room_query = {"room": room_name}
if start_time or end_time:
room_query["timestamp"] = {}
if start_time:
room_query["timestamp"]["$gte"] = start_time
if end_time:
room_query["timestamp"]["$lte"] = end_time
room_metrics_cursor = self.db.room_metrics.find(room_query).sort("timestamp", -1).limit(limit)
room_metrics = []
async for metric in room_metrics_cursor:
metric["_id"] = str(metric["_id"])
room_metrics.append(metric)
# Get sensor readings for this room
sensor_query = {"room": room_name}
if start_time or end_time:
sensor_query["timestamp"] = {}
if start_time:
sensor_query["timestamp"]["$gte"] = start_time
if end_time:
sensor_query["timestamp"]["$lte"] = end_time
sensor_readings_cursor = self.db.sensor_readings.find(sensor_query).sort("timestamp", -1).limit(limit)
sensor_readings = []
async for reading in sensor_readings_cursor:
reading["_id"] = str(reading["_id"])
sensor_readings.append(reading)
return {
"room_metrics": room_metrics,
"sensor_readings": sensor_readings
}
except Exception as e:
logger.error(f"Error getting room data: {e}")
raise
async def update_room_metrics(self, sensor_data):
"""Update room-level metrics when sensor data is received"""
try:
if not sensor_data.room:
return
# Calculate room-level aggregates
room_metrics = await self._calculate_room_metrics(sensor_data.room)
if room_metrics:
# Store room metrics
metrics_doc = {
"room": sensor_data.room,
"timestamp": sensor_data.timestamp,
"total_energy": room_metrics.get("total_energy", 0),
"average_temperature": room_metrics.get("avg_temperature"),
"co2_level": room_metrics.get("co2_level"),
"occupancy_estimate": room_metrics.get("occupancy_estimate"),
"sensor_count": room_metrics.get("sensor_count", 0),
"created_at": datetime.utcnow()
}
await self.db.room_metrics.insert_one(metrics_doc)
# Cache latest metrics
if self.redis:
cache_key = f"room:{sensor_data.room}:latest_metrics"
await self.redis.setex(cache_key, 3600, json.dumps(metrics_doc, default=str))
except Exception as e:
logger.error(f"Error updating room metrics: {e}")
async def aggregate_all_room_metrics(self):
"""Aggregate metrics for all rooms"""
try:
# Get all unique rooms
pipeline = [{"$group": {"_id": "$room"}}]
cursor = self.db.sensors.aggregate(pipeline)
async for room_data in cursor:
room_name = room_data["_id"]
if room_name:
await self._calculate_room_metrics(room_name)
except Exception as e:
logger.error(f"Error aggregating room metrics: {e}")
async def _get_latest_room_metrics(self, room_name: str) -> Optional[Dict[str, Any]]:
"""Get latest room metrics"""
try:
# Try Redis cache first
if self.redis:
cache_key = f"room:{room_name}:latest_metrics"
cached = await self.redis.get(cache_key)
if cached:
return json.loads(cached)
# Fall back to database
latest = await self.db.room_metrics.find_one(
{"room": room_name},
sort=[("timestamp", -1)]
)
if latest:
latest["_id"] = str(latest["_id"])
return latest
return None
except Exception as e:
logger.error(f"Error getting latest room metrics: {e}")
return None
async def _get_recent_room_metrics(self, room_name: str, hours: int = 24) -> List[Dict[str, Any]]:
"""Get recent room metrics"""
try:
start_time = datetime.utcnow() - timedelta(hours=hours)
cursor = self.db.room_metrics.find({
"room": room_name,
"created_at": {"$gte": start_time}
}).sort("timestamp", -1)
metrics = []
async for metric in cursor:
metric["_id"] = str(metric["_id"])
metrics.append(metric)
return metrics
except Exception as e:
logger.error(f"Error getting recent room metrics: {e}")
return []
async def _calculate_room_metrics(self, room_name: str) -> Dict[str, Any]:
"""Calculate aggregated metrics for a room"""
try:
# Get recent sensor readings (last 5 minutes)
five_minutes_ago = datetime.utcnow() - timedelta(minutes=5)
pipeline = [
{
"$match": {
"room": room_name,
"created_at": {"$gte": five_minutes_ago}
}
},
{
"$group": {
"_id": "$sensor_id",
"latest_value": {"$last": "$value"},
"sensor_type": {"$last": "$sensor_type"} if "sensor_type" in ["$first", "$last"] else {"$first": "energy"},
"unit": {"$last": "$unit"}
}
}
]
cursor = self.db.sensor_readings.aggregate(pipeline)
total_energy = 0
temperatures = []
co2_levels = []
sensor_count = 0
async for sensor_data in cursor:
sensor_count += 1
value = sensor_data.get("latest_value", 0)
sensor_type = sensor_data.get("sensor_type", "energy")
if sensor_type == "energy" or "energy" in str(sensor_data.get("unit", "")).lower():
total_energy += value
elif sensor_type == "temperature":
temperatures.append(value)
elif sensor_type == "co2":
co2_levels.append(value)
metrics = {
"total_energy": total_energy,
"sensor_count": sensor_count,
"avg_temperature": sum(temperatures) / len(temperatures) if temperatures else None,
"co2_level": sum(co2_levels) / len(co2_levels) if co2_levels else None,
"occupancy_estimate": self._estimate_occupancy(sensor_count, total_energy)
}
return metrics
except Exception as e:
logger.error(f"Error calculating room metrics: {e}")
return {}
def _estimate_occupancy(self, sensor_count: int, total_energy: float) -> Optional[str]:
"""Estimate occupancy level based on energy consumption"""
if total_energy == 0:
return "vacant"
elif total_energy < sensor_count * 50: # Low threshold
return "low"
elif total_energy < sensor_count * 150: # Medium threshold
return "medium"
else:
return "high"

View File

@@ -0,0 +1,475 @@
"""Sensors module API routes."""
import logging
from datetime import datetime
from fastapi import APIRouter, HTTPException, Depends, WebSocket, WebSocketDisconnect, Query, BackgroundTasks
from typing import Optional
from .models import (
SensorReading, SensorMetadata, RoomCreate, RoomUpdate, DataQuery, DataResponse,
SensorType, SensorStatus, HealthResponse
)
from .sensor_service import SensorService
from .room_service import RoomService
from .analytics_service import AnalyticsService
from .websocket_manager import WebSocketManager
from src.core.dependencies import get_sensors_db, get_redis
logger = logging.getLogger(__name__)
# Create router
router = APIRouter()
# WebSocket manager (shared across all route handlers)
websocket_manager = WebSocketManager()
# Dependency functions
async def get_sensor_service(db=Depends(get_sensors_db), redis=Depends(get_redis)):
return SensorService(db, redis)
async def get_room_service(db=Depends(get_sensors_db), redis=Depends(get_redis)):
return RoomService(db, redis)
async def get_analytics_service(db=Depends(get_sensors_db), redis=Depends(get_redis)):
return AnalyticsService(db, redis)
# Health check
@router.get("/health", response_model=HealthResponse)
async def health_check(db=Depends(get_sensors_db)):
"""Health check endpoint for sensors module"""
try:
await db.command("ping")
return HealthResponse(
service="sensors-module",
status="healthy",
timestamp=datetime.utcnow(),
version="1.0.0"
)
except Exception as e:
logger.error(f"Health check failed: {e}")
raise HTTPException(status_code=503, detail="Service Unavailable")
# WebSocket endpoint for real-time data
@router.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""WebSocket endpoint for real-time sensor data"""
await websocket_manager.connect(websocket)
try:
while True:
await websocket.receive_text()
except WebSocketDisconnect:
await websocket_manager.disconnect(websocket)
# Sensor Management Routes
@router.get("/sensors/get")
async def get_sensors(
room: Optional[str] = Query(None, description="Filter by room"),
sensor_type: Optional[SensorType] = Query(None, description="Filter by sensor type"),
status: Optional[SensorStatus] = Query(None, description="Filter by status"),
service: SensorService = Depends(get_sensor_service)
):
"""Get all sensors with optional filtering"""
try:
sensors = await service.get_sensors(room=room, sensor_type=sensor_type, status=status)
return {
"sensors": sensors,
"count": len(sensors),
"filters": {
"room": room,
"sensor_type": sensor_type.value if sensor_type else None,
"status": status.value if status else None
}
}
except Exception as e:
logger.error(f"Error getting sensors: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/sensors/{sensor_id}")
async def get_sensor(sensor_id: str, service: SensorService = Depends(get_sensor_service)):
"""Get detailed sensor information"""
try:
sensor = await service.get_sensor_details(sensor_id)
if not sensor:
raise HTTPException(status_code=404, detail="Sensor not found")
return sensor
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting sensor {sensor_id}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/sensors/{sensor_id}/data")
async def get_sensor_data(
sensor_id: str,
start_time: Optional[int] = Query(None, description="Start timestamp (Unix)"),
end_time: Optional[int] = Query(None, description="End timestamp (Unix)"),
limit: int = Query(100, description="Maximum records to return"),
offset: int = Query(0, description="Records to skip"),
service: SensorService = Depends(get_sensor_service)
):
"""Get historical data for a specific sensor"""
try:
data = await service.get_sensor_data(
sensor_id=sensor_id,
start_time=start_time,
end_time=end_time,
limit=limit,
offset=offset
)
return DataResponse(
data=data["readings"],
total_count=data["total_count"],
query=DataQuery(
sensor_ids=[sensor_id],
start_time=start_time,
end_time=end_time,
limit=limit,
offset=offset
),
execution_time_ms=data.get("execution_time_ms", 0)
)
except Exception as e:
logger.error(f"Error getting sensor data for {sensor_id}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.post("/sensors")
async def create_sensor(
sensor_data: SensorMetadata,
service: SensorService = Depends(get_sensor_service)
):
"""Register a new sensor"""
try:
result = await service.create_sensor(sensor_data)
return {
"message": "Sensor created successfully",
"sensor_id": sensor_data.sensor_id,
"created_at": result.get("created_at")
}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error creating sensor: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.put("/sensors/{sensor_id}")
async def update_sensor(
sensor_id: str,
update_data: dict,
service: SensorService = Depends(get_sensor_service)
):
"""Update sensor metadata"""
try:
result = await service.update_sensor(sensor_id, update_data)
if not result:
raise HTTPException(status_code=404, detail="Sensor not found")
return {
"message": "Sensor updated successfully",
"sensor_id": sensor_id,
"updated_at": datetime.utcnow().isoformat()
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating sensor {sensor_id}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.delete("/sensors/{sensor_id}")
async def delete_sensor(
sensor_id: str,
service: SensorService = Depends(get_sensor_service)
):
"""Delete a sensor and all its data"""
try:
result = await service.delete_sensor(sensor_id)
return {
"message": "Sensor deleted successfully",
"sensor_id": sensor_id,
"readings_deleted": result.get("readings_deleted", 0)
}
except Exception as e:
logger.error(f"Error deleting sensor {sensor_id}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
# Room Management Routes
@router.get("/rooms/names")
async def get_room_names(service: RoomService = Depends(get_room_service)):
"""Get simple list of room names for dropdowns"""
try:
room_names = await service.get_all_room_names()
return {
"rooms": room_names,
"count": len(room_names)
}
except Exception as e:
logger.error(f"Error getting room names: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/rooms")
async def get_rooms(service: RoomService = Depends(get_room_service)):
"""Get all rooms with sensor counts and metrics"""
try:
rooms = await service.get_rooms()
return {
"rooms": rooms,
"count": len(rooms)
}
except Exception as e:
logger.error(f"Error getting rooms: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.post("/rooms")
async def create_room(
room_data: RoomCreate,
service: RoomService = Depends(get_room_service)
):
"""Create a new room"""
try:
result = await service.create_room(room_data.dict())
return {
"message": "Room created successfully",
"room": result["name"],
"created_at": result["created_at"]
}
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error creating room: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.put("/rooms/{room_name}")
async def update_room(
room_name: str,
room_data: RoomUpdate,
service: RoomService = Depends(get_room_service)
):
"""Update an existing room"""
try:
result = await service.update_room(room_name, room_data.dict(exclude_unset=True))
return {
"message": "Room updated successfully",
"room": result["name"],
"updated_at": result["updated_at"],
"modified": result["modified"]
}
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Error updating room {room_name}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.delete("/rooms/{room_name}")
async def delete_room(room_name: str, service: RoomService = Depends(get_room_service)):
"""Delete a room"""
try:
result = await service.delete_room(room_name)
return {
"message": "Room deleted successfully",
**result
}
except Exception as e:
logger.error(f"Error deleting room {room_name}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/rooms/{room_name}")
async def get_room(room_name: str, service: RoomService = Depends(get_room_service)):
"""Get detailed room information"""
try:
room = await service.get_room_details(room_name)
if not room:
raise HTTPException(status_code=404, detail="Room not found")
return room
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting room {room_name}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/rooms/{room_name}/data")
async def get_room_data(
room_name: str,
start_time: Optional[int] = Query(None, description="Start timestamp (Unix)"),
end_time: Optional[int] = Query(None, description="End timestamp (Unix)"),
limit: int = Query(100, description="Maximum records to return"),
service: RoomService = Depends(get_room_service)
):
"""Get historical data for a specific room"""
try:
data = await service.get_room_data(
room_name=room_name,
start_time=start_time,
end_time=end_time,
limit=limit
)
return {
"room": room_name,
"room_metrics": data.get("room_metrics", []),
"sensor_readings": data.get("sensor_readings", []),
"period": {
"start_time": start_time,
"end_time": end_time
}
}
except Exception as e:
logger.error(f"Error getting room data for {room_name}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
# Analytics Routes
@router.post("/data/query")
async def query_data(
query_params: DataQuery,
service: AnalyticsService = Depends(get_analytics_service)
):
"""Advanced data querying with multiple filters"""
try:
result = await service.query_data(query_params)
return result
except Exception as e:
logger.error(f"Error executing data query: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/analytics/summary")
async def get_analytics_summary(
hours: int = Query(24, description="Hours of data to analyze"),
service: AnalyticsService = Depends(get_analytics_service)
):
"""Get comprehensive analytics summary"""
try:
analytics = await service.get_analytics_summary(hours)
return analytics
except Exception as e:
logger.error(f"Error getting analytics summary: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/analytics/energy")
async def get_energy_analytics(
hours: int = Query(24),
room: Optional[str] = Query(None),
service: AnalyticsService = Depends(get_analytics_service)
):
"""Get energy-specific analytics"""
try:
analytics = await service.get_energy_analytics(hours, room)
return analytics
except Exception as e:
logger.error(f"Error getting energy analytics: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
# Data Export
@router.get("/export")
async def export_data(
start_time: int = Query(..., description="Start timestamp (Unix)"),
end_time: int = Query(..., description="End timestamp (Unix)"),
sensor_ids: Optional[str] = Query(None, description="Comma-separated sensor IDs"),
format: str = Query("json", description="Export format (json, csv)"),
service: SensorService = Depends(get_sensor_service)
):
"""Export sensor data"""
try:
export_data_result = await service.export_data(
start_time=start_time,
end_time=end_time,
sensor_ids=sensor_ids,
format=format
)
return export_data_result
except Exception as e:
logger.error(f"Error exporting data: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
# System Events
@router.get("/events")
async def get_events(
severity: Optional[str] = Query(None, description="Filter by severity"),
event_type: Optional[str] = Query(None, description="Filter by event type"),
hours: int = Query(24, description="Hours of events to retrieve"),
limit: int = Query(50, description="Maximum events to return"),
service: SensorService = Depends(get_sensor_service)
):
"""Get system events and alerts"""
try:
events = await service.get_events(
severity=severity,
event_type=event_type,
hours=hours,
limit=limit
)
return {
"events": events,
"count": len(events),
"period_hours": hours
}
except Exception as e:
logger.error(f"Error getting events: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
# Real-time data ingestion endpoint
@router.post("/data/ingest")
async def ingest_sensor_data(
sensor_data: SensorReading,
background_tasks: BackgroundTasks,
service: SensorService = Depends(get_sensor_service),
room_service: RoomService = Depends(get_room_service)
):
"""Ingest real-time sensor data"""
try:
result = await service.ingest_sensor_data(sensor_data)
# Schedule background tasks
if sensor_data.room:
background_tasks.add_task(_update_room_metrics, room_service, sensor_data)
background_tasks.add_task(_broadcast_sensor_data, sensor_data)
return {
"message": "Sensor data ingested successfully",
"sensor_id": sensor_data.sensor_id,
"timestamp": sensor_data.timestamp
}
except Exception as e:
logger.error(f"Error ingesting sensor data: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
# Background task helper functions
async def _update_room_metrics(room_service: RoomService, sensor_data: SensorReading):
"""Update room-level metrics when sensor data is received"""
try:
await room_service.update_room_metrics(sensor_data)
except Exception as e:
logger.error(f"Error updating room metrics: {e}")
async def _broadcast_sensor_data(sensor_data: SensorReading):
"""Broadcast sensor data to WebSocket clients"""
try:
await websocket_manager.broadcast_sensor_data(sensor_data)
except Exception as e:
logger.error(f"Error broadcasting sensor data: {e}")

View File

@@ -0,0 +1,251 @@
"""
Sensor service business logic
"""
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import json
logger = logging.getLogger(__name__)
class SensorService:
"""Service for managing sensors and sensor data"""
def __init__(self, db, redis_client):
self.db = db
self.redis = redis_client
async def get_sensors(self, room: Optional[str] = None, sensor_type: Optional[str] = None, status: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get sensors with optional filtering"""
try:
query = {}
if room:
query["room"] = room
if sensor_type:
query["sensor_type"] = sensor_type
if status:
query["status"] = status
cursor = self.db.sensors.find(query)
sensors = []
async for sensor in cursor:
sensor["_id"] = str(sensor["_id"])
sensors.append(sensor)
return sensors
except Exception as e:
logger.error(f"Error getting sensors: {e}")
raise
async def get_sensor_details(self, sensor_id: str) -> Optional[Dict[str, Any]]:
"""Get detailed sensor information"""
try:
sensor = await self.db.sensors.find_one({"sensor_id": sensor_id})
if sensor:
sensor["_id"] = str(sensor["_id"])
# Get recent readings
recent_readings = await self.get_sensor_data(sensor_id, limit=10)
sensor["recent_readings"] = recent_readings.get("readings", [])
return sensor
return None
except Exception as e:
logger.error(f"Error getting sensor details: {e}")
raise
async def get_sensor_data(self, sensor_id: str, start_time: Optional[int] = None,
end_time: Optional[int] = None, limit: int = 100, offset: int = 0) -> Dict[str, Any]:
"""Get historical sensor data"""
try:
query = {"sensor_id": sensor_id}
if start_time or end_time:
query["timestamp"] = {}
if start_time:
query["timestamp"]["$gte"] = start_time
if end_time:
query["timestamp"]["$lte"] = end_time
# Get total count
total_count = await self.db.sensor_readings.count_documents(query)
# Get readings
cursor = self.db.sensor_readings.find(query).sort("timestamp", -1).skip(offset).limit(limit)
readings = []
async for reading in cursor:
reading["_id"] = str(reading["_id"])
readings.append(reading)
return {
"readings": readings,
"total_count": total_count,
"execution_time_ms": 0 # Placeholder
}
except Exception as e:
logger.error(f"Error getting sensor data: {e}")
raise
async def create_sensor(self, sensor_data) -> Dict[str, Any]:
"""Create a new sensor"""
try:
# Check if sensor already exists
existing = await self.db.sensors.find_one({"sensor_id": sensor_data.sensor_id})
if existing:
raise ValueError(f"Sensor {sensor_data.sensor_id} already exists")
# Create sensor document
sensor_doc = {
"sensor_id": sensor_data.sensor_id,
"name": sensor_data.name,
"sensor_type": sensor_data.sensor_type.value if hasattr(sensor_data.sensor_type, 'value') else str(sensor_data.sensor_type),
"room": sensor_data.room,
"location": sensor_data.location if hasattr(sensor_data, 'location') else None,
"status": "active",
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow()
}
result = await self.db.sensors.insert_one(sensor_doc)
return {"created_at": datetime.utcnow()}
except Exception as e:
logger.error(f"Error creating sensor: {e}")
raise
async def update_sensor(self, sensor_id: str, update_data: Dict[str, Any]) -> bool:
"""Update sensor metadata"""
try:
update_data["updated_at"] = datetime.utcnow()
result = await self.db.sensors.update_one(
{"sensor_id": sensor_id},
{"$set": update_data}
)
return result.modified_count > 0
except Exception as e:
logger.error(f"Error updating sensor: {e}")
raise
async def delete_sensor(self, sensor_id: str) -> Dict[str, Any]:
"""Delete a sensor and its data"""
try:
# Delete readings
readings_result = await self.db.sensor_readings.delete_many({"sensor_id": sensor_id})
# Delete sensor
await self.db.sensors.delete_one({"sensor_id": sensor_id})
return {"readings_deleted": readings_result.deleted_count}
except Exception as e:
logger.error(f"Error deleting sensor: {e}")
raise
async def ingest_sensor_data(self, sensor_data) -> Dict[str, Any]:
"""Ingest real-time sensor data"""
try:
# Create reading document
reading_doc = {
"sensor_id": sensor_data.sensor_id,
"timestamp": sensor_data.timestamp,
"value": sensor_data.value,
"unit": sensor_data.unit if hasattr(sensor_data, 'unit') else None,
"room": sensor_data.room if hasattr(sensor_data, 'room') else None,
"created_at": datetime.utcnow()
}
# Store in database
await self.db.sensor_readings.insert_one(reading_doc)
# Cache recent value in Redis
if self.redis:
cache_key = f"sensor:{sensor_data.sensor_id}:latest"
await self.redis.setex(cache_key, 3600, json.dumps(reading_doc, default=str))
return {"success": True}
except Exception as e:
logger.error(f"Error ingesting sensor data: {e}")
raise
async def export_data(self, start_time: int, end_time: int, sensor_ids: Optional[str] = None,
format: str = "json") -> Dict[str, Any]:
"""Export sensor data"""
try:
query = {
"timestamp": {"$gte": start_time, "$lte": end_time}
}
if sensor_ids:
sensor_list = [s.strip() for s in sensor_ids.split(",")]
query["sensor_id"] = {"$in": sensor_list}
cursor = self.db.sensor_readings.find(query).sort("timestamp", 1)
readings = []
async for reading in cursor:
reading["_id"] = str(reading["_id"])
readings.append(reading)
return {
"format": format,
"data": readings,
"total_records": len(readings),
"period": {"start": start_time, "end": end_time}
}
except Exception as e:
logger.error(f"Error exporting data: {e}")
raise
async def get_events(self, severity: Optional[str] = None, event_type: Optional[str] = None,
hours: int = 24, limit: int = 50) -> List[Dict[str, Any]]:
"""Get system events"""
try:
start_time = datetime.utcnow() - timedelta(hours=hours)
query = {"timestamp": {"$gte": start_time}}
if severity:
query["severity"] = severity
if event_type:
query["event_type"] = event_type
cursor = self.db.system_events.find(query).sort("timestamp", -1).limit(limit)
events = []
async for event in cursor:
event["_id"] = str(event["_id"])
events.append(event)
return events
except Exception as e:
logger.error(f"Error getting events: {e}")
return []
async def cleanup_old_data(self, cutoff_date: datetime):
"""Clean up old sensor data"""
try:
result = await self.db.sensor_readings.delete_many({
"created_at": {"$lt": cutoff_date}
})
logger.info(f"Cleaned up {result.deleted_count} old sensor readings")
except Exception as e:
logger.error(f"Error cleaning up old data: {e}")
raise

View File

@@ -0,0 +1,288 @@
"""
WebSocket manager for real-time sensor data broadcasting
"""
import asyncio
import json
from typing import List, Set, Dict, Any
from fastapi import WebSocket, WebSocketDisconnect
import logging
from models import SensorReading
logger = logging.getLogger(__name__)
class WebSocketManager:
"""Manages WebSocket connections for real-time data broadcasting"""
def __init__(self):
self.active_connections: List[WebSocket] = []
self.room_subscriptions: Dict[str, Set[WebSocket]] = {}
self.sensor_subscriptions: Dict[str, Set[WebSocket]] = {}
self.connection_metadata: Dict[WebSocket, Dict[str, Any]] = {}
async def connect(self, websocket: WebSocket, room: str = None, sensor_id: str = None):
"""Accept a WebSocket connection and handle subscriptions"""
await websocket.accept()
self.active_connections.append(websocket)
# Store connection metadata
self.connection_metadata[websocket] = {
"connected_at": asyncio.get_event_loop().time(),
"room": room,
"sensor_id": sensor_id,
"message_count": 0
}
# Handle room subscription
if room:
if room not in self.room_subscriptions:
self.room_subscriptions[room] = set()
self.room_subscriptions[room].add(websocket)
# Handle sensor subscription
if sensor_id:
if sensor_id not in self.sensor_subscriptions:
self.sensor_subscriptions[sensor_id] = set()
self.sensor_subscriptions[sensor_id].add(websocket)
logger.info(f"WebSocket client connected. Total connections: {len(self.active_connections)}")
# Send initial connection confirmation
await self.send_to_connection(websocket, {
"type": "connection_established",
"timestamp": asyncio.get_event_loop().time(),
"subscriptions": {
"room": room,
"sensor_id": sensor_id
},
"total_connections": len(self.active_connections)
})
async def disconnect(self, websocket: WebSocket):
"""Remove a WebSocket connection and clean up subscriptions"""
if websocket in self.active_connections:
self.active_connections.remove(websocket)
# Clean up room subscriptions
for room_connections in self.room_subscriptions.values():
room_connections.discard(websocket)
# Clean up sensor subscriptions
for sensor_connections in self.sensor_subscriptions.values():
sensor_connections.discard(websocket)
# Clean up metadata
self.connection_metadata.pop(websocket, None)
logger.info(f"WebSocket client disconnected. Total connections: {len(self.active_connections)}")
async def send_to_connection(self, websocket: WebSocket, data: Dict[str, Any]):
"""Send data to a specific WebSocket connection"""
try:
await websocket.send_text(json.dumps(data))
# Update message count
if websocket in self.connection_metadata:
self.connection_metadata[websocket]["message_count"] += 1
except Exception as e:
logger.error(f"Error sending data to WebSocket: {e}")
await self.disconnect(websocket)
async def broadcast_to_all(self, data: Dict[str, Any]):
"""Broadcast data to all connected WebSocket clients"""
if not self.active_connections:
return
message = json.dumps(data)
disconnected = []
for websocket in self.active_connections:
try:
await websocket.send_text(message)
# Update message count
if websocket in self.connection_metadata:
self.connection_metadata[websocket]["message_count"] += 1
except Exception as e:
logger.error(f"Error broadcasting to WebSocket: {e}")
disconnected.append(websocket)
# Clean up disconnected connections
for websocket in disconnected:
await self.disconnect(websocket)
async def broadcast_to_room(self, room: str, data: Dict[str, Any]):
"""Broadcast data to all clients subscribed to a specific room"""
if room not in self.room_subscriptions:
return
room_connections = self.room_subscriptions[room].copy()
if not room_connections:
return
message = json.dumps(data)
disconnected = []
for websocket in room_connections:
try:
await websocket.send_text(message)
# Update message count
if websocket in self.connection_metadata:
self.connection_metadata[websocket]["message_count"] += 1
except Exception as e:
logger.error(f"Error broadcasting to room {room}: {e}")
disconnected.append(websocket)
# Clean up disconnected connections
for websocket in disconnected:
await self.disconnect(websocket)
async def broadcast_to_sensor(self, sensor_id: str, data: Dict[str, Any]):
"""Broadcast data to all clients subscribed to a specific sensor"""
if sensor_id not in self.sensor_subscriptions:
return
sensor_connections = self.sensor_subscriptions[sensor_id].copy()
if not sensor_connections:
return
message = json.dumps(data)
disconnected = []
for websocket in sensor_connections:
try:
await websocket.send_text(message)
# Update message count
if websocket in self.connection_metadata:
self.connection_metadata[websocket]["message_count"] += 1
except Exception as e:
logger.error(f"Error broadcasting to sensor {sensor_id}: {e}")
disconnected.append(websocket)
# Clean up disconnected connections
for websocket in disconnected:
await self.disconnect(websocket)
async def broadcast_sensor_data(self, sensor_reading: SensorReading):
"""Broadcast sensor reading data to appropriate subscribers"""
data = {
"type": "sensor_data",
"sensor_id": sensor_reading.sensor_id,
"room": sensor_reading.room,
"sensor_type": sensor_reading.sensor_type.value,
"timestamp": sensor_reading.timestamp,
"data": {
"energy": sensor_reading.energy,
"co2": sensor_reading.co2,
"temperature": sensor_reading.temperature,
"humidity": sensor_reading.humidity,
"motion": sensor_reading.motion,
"power": sensor_reading.power,
"voltage": sensor_reading.voltage,
"current": sensor_reading.current,
"generation": sensor_reading.generation
},
"metadata": sensor_reading.metadata
}
# Broadcast to all connections
await self.broadcast_to_all(data)
# Broadcast to room-specific subscribers
if sensor_reading.room:
await self.broadcast_to_room(sensor_reading.room, data)
# Broadcast to sensor-specific subscribers
await self.broadcast_to_sensor(sensor_reading.sensor_id, data)
async def broadcast_room_metrics(self, room: str, metrics: Dict[str, Any]):
"""Broadcast room-level metrics to subscribers"""
data = {
"type": "room_metrics",
"room": room,
"timestamp": asyncio.get_event_loop().time(),
"metrics": metrics
}
# Broadcast to all connections
await self.broadcast_to_all(data)
# Broadcast to room-specific subscribers
await self.broadcast_to_room(room, data)
async def broadcast_system_event(self, event: Dict[str, Any]):
"""Broadcast system events to all subscribers"""
data = {
"type": "system_event",
"timestamp": asyncio.get_event_loop().time(),
"event": event
}
await self.broadcast_to_all(data)
async def broadcast_raw_data(self, raw_data: str):
"""Broadcast raw data from Redis or other sources"""
try:
# Try to parse as JSON
data = json.loads(raw_data)
# Add type if not present
if "type" not in data:
data["type"] = "raw_data"
await self.broadcast_to_all(data)
except json.JSONDecodeError:
# Send as raw string if not JSON
data = {
"type": "raw_data",
"data": raw_data,
"timestamp": asyncio.get_event_loop().time()
}
await self.broadcast_to_all(data)
def get_connection_stats(self) -> Dict[str, Any]:
"""Get statistics about current WebSocket connections"""
total_connections = len(self.active_connections)
room_stats = {room: len(connections) for room, connections in self.room_subscriptions.items()}
sensor_stats = {sensor: len(connections) for sensor, connections in self.sensor_subscriptions.items()}
# Calculate message statistics
total_messages = sum(
metadata.get("message_count", 0)
for metadata in self.connection_metadata.values()
)
return {
"total_connections": total_connections,
"room_subscriptions": room_stats,
"sensor_subscriptions": sensor_stats,
"total_messages_sent": total_messages,
"active_rooms": len([room for room, connections in self.room_subscriptions.items() if connections]),
"active_sensors": len([sensor for sensor, connections in self.sensor_subscriptions.items() if connections])
}
async def send_connection_stats(self):
"""Send connection statistics to all clients"""
stats = self.get_connection_stats()
data = {
"type": "connection_stats",
"timestamp": asyncio.get_event_loop().time(),
"stats": stats
}
await self.broadcast_to_all(data)
async def ping_all_connections(self):
"""Send ping to all connections to keep them alive"""
data = {
"type": "ping",
"timestamp": asyncio.get_event_loop().time()
}
await self.broadcast_to_all(data)