diff --git a/.gitignore b/.gitignore index e4747b8..901218e 100644 --- a/.gitignore +++ b/.gitignore @@ -1,4 +1,7 @@ ### Python ### +#Claude file +.claude/ + # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/api.py b/api.py deleted file mode 100644 index 193dd5a..0000000 --- a/api.py +++ /dev/null @@ -1,582 +0,0 @@ -from fastapi import APIRouter, HTTPException, Query, Depends -from typing import List, Optional, Dict, Any -from datetime import datetime, timedelta -import time -import logging -from pymongo import ASCENDING, DESCENDING - -from database import get_database, redis_manager -from models import ( - DataQuery, DataResponse, SensorReading, SensorMetadata, - RoomMetrics, SystemEvent, SensorType, SensorStatus -) -from persistence import persistence_service -from services.token_service import TokenService - -logger = logging.getLogger(__name__) -router = APIRouter() - -# Dependency to get database -async def get_db(): - return await get_database() - -@router.get("/sensors", summary="Get all sensors") -async def get_sensors( - room: Optional[str] = Query(None, description="Filter by room"), - sensor_type: Optional[SensorType] = Query(None, description="Filter by sensor type"), - status: Optional[SensorStatus] = Query(None, description="Filter by status"), - db=Depends(get_db) -): - """Get list of all registered sensors with optional filtering""" - try: - # Build query - query = {} - if room: - query["room"] = room - if sensor_type: - query["sensor_type"] = sensor_type.value - if status: - query["status"] = status.value - - # Execute query - cursor = db.sensor_metadata.find(query).sort("created_at", DESCENDING) - sensors = await cursor.to_list(length=None) - - # Convert ObjectId to string - for sensor in sensors: - sensor["_id"] = str(sensor["_id"]) - - return { - "sensors": sensors, - "count": len(sensors), - "query": query - } - - except Exception as e: - logger.error(f"Error getting sensors: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.get("/sensors/{sensor_id}", summary="Get sensor details") -async def get_sensor(sensor_id: str, db=Depends(get_db)): - """Get detailed information about a specific sensor""" - try: - # Get sensor metadata - sensor = await db.sensor_metadata.find_one({"sensor_id": sensor_id}) - if not sensor: - raise HTTPException(status_code=404, detail="Sensor not found") - - sensor["_id"] = str(sensor["_id"]) - - # Get recent readings (last 24 hours) - recent_readings = await persistence_service.get_recent_readings( - sensor_id=sensor_id, - limit=100, - minutes=1440 # 24 hours - ) - - # Get latest reading from Redis - latest_reading = await redis_manager.get_sensor_data(sensor_id) - - return { - "sensor": sensor, - "latest_reading": latest_reading, - "recent_readings_count": len(recent_readings), - "recent_readings": recent_readings[:10] # Return only 10 most recent - } - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting sensor {sensor_id}: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.get("/sensors/{sensor_id}/data", summary="Get sensor historical data") -async def get_sensor_data( - sensor_id: str, - start_time: Optional[int] = Query(None, description="Start timestamp (Unix)"), - end_time: Optional[int] = Query(None, description="End timestamp (Unix)"), - limit: int = Query(100, description="Maximum records to return"), - offset: int = Query(0, description="Records to skip"), - db=Depends(get_db) -): - """Get historical data for a specific sensor""" - try: - start_query_time = time.time() - - # Build time range query - query = {"sensor_id": sensor_id} - - if start_time or end_time: - time_query = {} - if start_time: - time_query["$gte"] = datetime.fromtimestamp(start_time) - if end_time: - time_query["$lte"] = datetime.fromtimestamp(end_time) - query["created_at"] = time_query - - # Get total count - total_count = await db.sensor_readings.count_documents(query) - - # Execute query with pagination - cursor = db.sensor_readings.find(query).sort("timestamp", DESCENDING).skip(offset).limit(limit) - readings = await cursor.to_list(length=limit) - - # Convert ObjectId to string - for reading in readings: - reading["_id"] = str(reading["_id"]) - - execution_time = (time.time() - start_query_time) * 1000 # Convert to milliseconds - - return DataResponse( - data=readings, - total_count=total_count, - query=DataQuery( - sensor_ids=[sensor_id], - start_time=start_time, - end_time=end_time, - limit=limit, - offset=offset - ), - execution_time_ms=execution_time - ) - - except Exception as e: - logger.error(f"Error getting sensor data for {sensor_id}: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.get("/rooms", summary="Get all rooms") -async def get_rooms(db=Depends(get_db)): - """Get list of all rooms with sensor counts""" - try: - # Get distinct rooms from sensor readings - rooms = await db.sensor_readings.distinct("room", {"room": {"$ne": None}}) - - room_data = [] - for room in rooms: - # Get sensor count for each room - sensor_count = len(await db.sensor_readings.distinct("sensor_id", {"room": room})) - - # Get latest room metrics from Redis - room_metrics = await redis_manager.get_room_metrics(room) - - room_data.append({ - "room": room, - "sensor_count": sensor_count, - "latest_metrics": room_metrics - }) - - return { - "rooms": room_data, - "count": len(room_data) - } - - except Exception as e: - logger.error(f"Error getting rooms: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.get("/rooms/{room_name}/data", summary="Get room historical data") -async def get_room_data( - room_name: str, - start_time: Optional[int] = Query(None, description="Start timestamp (Unix)"), - end_time: Optional[int] = Query(None, description="End timestamp (Unix)"), - limit: int = Query(100, description="Maximum records to return"), - db=Depends(get_db) -): - """Get historical data for a specific room""" - try: - start_query_time = time.time() - - # Build query for room metrics - query = {"room": room_name} - - if start_time or end_time: - time_query = {} - if start_time: - time_query["$gte"] = datetime.fromtimestamp(start_time) - if end_time: - time_query["$lte"] = datetime.fromtimestamp(end_time) - query["created_at"] = time_query - - # Get room metrics - cursor = db.room_metrics.find(query).sort("timestamp", DESCENDING).limit(limit) - room_metrics = await cursor.to_list(length=limit) - - # Also get sensor readings for the room - sensor_query = {"room": room_name} - if "created_at" in query: - sensor_query["created_at"] = query["created_at"] - - sensor_cursor = db.sensor_readings.find(sensor_query).sort("timestamp", DESCENDING).limit(limit) - sensor_readings = await sensor_cursor.to_list(length=limit) - - # Convert ObjectId to string - for item in room_metrics + sensor_readings: - item["_id"] = str(item["_id"]) - - execution_time = (time.time() - start_query_time) * 1000 - - return { - "room": room_name, - "room_metrics": room_metrics, - "sensor_readings": sensor_readings, - "execution_time_ms": execution_time - } - - except Exception as e: - logger.error(f"Error getting room data for {room_name}: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.post("/data/query", summary="Advanced data query", response_model=DataResponse) -async def query_data(query_params: DataQuery, db=Depends(get_db)): - """Advanced data querying with multiple filters and aggregations""" - try: - start_query_time = time.time() - - # Build MongoDB query - mongo_query = {} - - # Sensor filters - if query_params.sensor_ids: - mongo_query["sensor_id"] = {"$in": query_params.sensor_ids} - - if query_params.rooms: - mongo_query["room"] = {"$in": query_params.rooms} - - if query_params.sensor_types: - mongo_query["sensor_type"] = {"$in": [st.value for st in query_params.sensor_types]} - - # Time range - if query_params.start_time or query_params.end_time: - time_query = {} - if query_params.start_time: - time_query["$gte"] = datetime.fromtimestamp(query_params.start_time) - if query_params.end_time: - time_query["$lte"] = datetime.fromtimestamp(query_params.end_time) - mongo_query["created_at"] = time_query - - # Get total count - total_count = await db.sensor_readings.count_documents(mongo_query) - - # Execute query with pagination and sorting - sort_direction = DESCENDING if query_params.sort_order == "desc" else ASCENDING - - cursor = db.sensor_readings.find(mongo_query).sort( - query_params.sort_by, sort_direction - ).skip(query_params.offset).limit(query_params.limit) - - readings = await cursor.to_list(length=query_params.limit) - - # Convert ObjectId to string - for reading in readings: - reading["_id"] = str(reading["_id"]) - - execution_time = (time.time() - start_query_time) * 1000 - - return DataResponse( - data=readings, - total_count=total_count, - query=query_params, - execution_time_ms=execution_time - ) - - except Exception as e: - logger.error(f"Error executing data query: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.get("/analytics/summary", summary="Get analytics summary") -async def get_analytics_summary( - hours: int = Query(24, description="Hours of data to analyze"), - db=Depends(get_db) -): - """Get analytics summary for the specified time period""" - try: - start_time = datetime.utcnow() - timedelta(hours=hours) - - # Aggregation pipeline for analytics - pipeline = [ - {"$match": {"created_at": {"$gte": start_time}}}, - {"$group": { - "_id": { - "sensor_id": "$sensor_id", - "room": "$room", - "sensor_type": "$sensor_type" - }, - "reading_count": {"$sum": 1}, - "avg_energy": {"$avg": "$energy.value"}, - "total_energy": {"$sum": "$energy.value"}, - "avg_co2": {"$avg": "$co2.value"}, - "max_co2": {"$max": "$co2.value"}, - "avg_temperature": {"$avg": "$temperature.value"}, - "latest_timestamp": {"$max": "$timestamp"} - }}, - {"$sort": {"total_energy": -1}} - ] - - cursor = db.sensor_readings.aggregate(pipeline) - analytics = await cursor.to_list(length=None) - - # Room-level summary - room_pipeline = [ - {"$match": {"created_at": {"$gte": start_time}, "room": {"$ne": None}}}, - {"$group": { - "_id": "$room", - "sensor_count": {"$addToSet": "$sensor_id"}, - "total_energy": {"$sum": "$energy.value"}, - "avg_co2": {"$avg": "$co2.value"}, - "max_co2": {"$max": "$co2.value"}, - "reading_count": {"$sum": 1} - }}, - {"$project": { - "room": "$_id", - "sensor_count": {"$size": "$sensor_count"}, - "total_energy": 1, - "avg_co2": 1, - "max_co2": 1, - "reading_count": 1 - }}, - {"$sort": {"total_energy": -1}} - ] - - room_cursor = db.sensor_readings.aggregate(room_pipeline) - room_analytics = await room_cursor.to_list(length=None) - - return { - "period_hours": hours, - "start_time": start_time.isoformat(), - "sensor_analytics": analytics, - "room_analytics": room_analytics, - "summary": { - "total_sensors_analyzed": len(analytics), - "total_rooms_analyzed": len(room_analytics), - "total_readings": sum(item["reading_count"] for item in analytics) - } - } - - except Exception as e: - logger.error(f"Error getting analytics summary: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.get("/events", summary="Get system events") -async def get_events( - severity: Optional[str] = Query(None, description="Filter by severity"), - event_type: Optional[str] = Query(None, description="Filter by event type"), - hours: int = Query(24, description="Hours of events to retrieve"), - limit: int = Query(50, description="Maximum events to return"), - db=Depends(get_db) -): - """Get recent system events and alerts""" - try: - start_time = datetime.utcnow() - timedelta(hours=hours) - - # Build query - query = {"created_at": {"$gte": start_time}} - - if severity: - query["severity"] = severity - - if event_type: - query["event_type"] = event_type - - # Execute query - cursor = db.system_events.find(query).sort("timestamp", DESCENDING).limit(limit) - events = await cursor.to_list(length=limit) - - # Convert ObjectId to string - for event in events: - event["_id"] = str(event["_id"]) - - return { - "events": events, - "count": len(events), - "period_hours": hours - } - - except Exception as e: - logger.error(f"Error getting events: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.put("/sensors/{sensor_id}/metadata", summary="Update sensor metadata") -async def update_sensor_metadata( - sensor_id: str, - metadata: dict, - db=Depends(get_db) -): - """Update sensor metadata""" - try: - # Update timestamp - metadata["updated_at"] = datetime.utcnow() - - result = await db.sensor_metadata.update_one( - {"sensor_id": sensor_id}, - {"$set": metadata} - ) - - if result.matched_count == 0: - raise HTTPException(status_code=404, detail="Sensor not found") - - return {"message": "Sensor metadata updated successfully", "modified": result.modified_count > 0} - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error updating sensor metadata: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.delete("/sensors/{sensor_id}", summary="Delete sensor and all its data") -async def delete_sensor(sensor_id: str, db=Depends(get_db)): - """Delete a sensor and all its associated data""" - try: - # Delete sensor readings - readings_result = await db.sensor_readings.delete_many({"sensor_id": sensor_id}) - - # Delete sensor metadata - metadata_result = await db.sensor_metadata.delete_one({"sensor_id": sensor_id}) - - # Delete from Redis cache - await redis_manager.redis_client.delete(f"sensor:latest:{sensor_id}") - await redis_manager.redis_client.delete(f"sensor:status:{sensor_id}") - - if metadata_result.deleted_count == 0: - raise HTTPException(status_code=404, detail="Sensor not found") - - return { - "message": "Sensor deleted successfully", - "sensor_id": sensor_id, - "readings_deleted": readings_result.deleted_count, - "metadata_deleted": metadata_result.deleted_count - } - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error deleting sensor {sensor_id}: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.get("/export", summary="Export data") -async def export_data( - start_time: int = Query(..., description="Start timestamp (Unix)"), - end_time: int = Query(..., description="End timestamp (Unix)"), - sensor_ids: Optional[str] = Query(None, description="Comma-separated sensor IDs"), - format: str = Query("json", description="Export format (json, csv)"), - db=Depends(get_db) -): - """Export sensor data for the specified time range""" - try: - # Build query - query = { - "created_at": { - "$gte": datetime.fromtimestamp(start_time), - "$lte": datetime.fromtimestamp(end_time) - } - } - - if sensor_ids: - sensor_list = [sid.strip() for sid in sensor_ids.split(",")] - query["sensor_id"] = {"$in": sensor_list} - - # Get data - cursor = db.sensor_readings.find(query).sort("timestamp", ASCENDING) - readings = await cursor.to_list(length=None) - - # Convert ObjectId to string - for reading in readings: - reading["_id"] = str(reading["_id"]) - # Convert datetime to ISO string for JSON serialization - if "created_at" in reading: - reading["created_at"] = reading["created_at"].isoformat() - - if format.lower() == "csv": - # TODO: Implement CSV export - raise HTTPException(status_code=501, detail="CSV export not yet implemented") - - return { - "data": readings, - "count": len(readings), - "export_params": { - "start_time": start_time, - "end_time": end_time, - "sensor_ids": sensor_ids.split(",") if sensor_ids else None, - "format": format - } - } - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error exporting data: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -# Token Management Endpoints -@router.get("/tokens", summary="Get all tokens") -async def get_tokens(db=Depends(get_db)): - """Get list of all tokens""" - try: - token_service = TokenService(db) - tokens = await token_service.get_tokens() - return {"tokens": tokens} - except Exception as e: - logger.error(f"Error getting tokens: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.post("/tokens/generate", summary="Generate new token") -async def generate_token( - name: str, - list_of_resources: List[str], - data_aggregation: bool = False, - time_aggregation: bool = False, - embargo: int = 0, - exp_hours: int = 24, - db=Depends(get_db) -): - """Generate a new JWT token with specified permissions""" - try: - token_service = TokenService(db) - token = token_service.generate_token( - name=name, - list_of_resources=list_of_resources, - data_aggregation=data_aggregation, - time_aggregation=time_aggregation, - embargo=embargo, - exp_hours=exp_hours - ) - return {"token": token} - except Exception as e: - logger.error(f"Error generating token: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.post("/tokens/check", summary="Validate token") -async def check_token(token: str, db=Depends(get_db)): - """Check token validity and decode payload""" - try: - token_service = TokenService(db) - decoded = token_service.decode_token(token) - return decoded - except Exception as e: - logger.error(f"Error checking token: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.post("/tokens/save", summary="Save token to database") -async def save_token(token: str, db=Depends(get_db)): - """Save a valid token to the database""" - try: - token_service = TokenService(db) - result = await token_service.insert_token(token) - return result - except ValueError as e: - raise HTTPException(status_code=400, detail=str(e)) - except Exception as e: - logger.error(f"Error saving token: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.post("/tokens/revoke", summary="Revoke token") -async def revoke_token(token: str, db=Depends(get_db)): - """Revoke a token by marking it as inactive""" - try: - token_service = TokenService(db) - result = await token_service.revoke_token(token) - return result - except ValueError as e: - raise HTTPException(status_code=404, detail=str(e)) - except Exception as e: - logger.error(f"Error revoking token: {e}") - raise HTTPException(status_code=500, detail="Internal server error") \ No newline at end of file diff --git a/database.py b/database.py deleted file mode 100644 index 0fb25e3..0000000 --- a/database.py +++ /dev/null @@ -1,220 +0,0 @@ -import os -from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase -from pymongo import IndexModel, ASCENDING, DESCENDING -from typing import Optional -import asyncio -from datetime import datetime, timedelta -import logging - -# Configure logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -class MongoDB: - client: Optional[AsyncIOMotorClient] = None - database: Optional[AsyncIOMotorDatabase] = None - -# Global MongoDB instance -mongodb = MongoDB() - -async def connect_to_mongo(): - """Create database connection""" - try: - # MongoDB connection string - default to localhost for development - mongodb_url = os.getenv("MONGODB_URL", "mongodb://localhost:27017") - database_name = os.getenv("DATABASE_NAME", "energy_monitoring") - - logger.info(f"Connecting to MongoDB at: {mongodb_url}") - - # Create async MongoDB client - mongodb.client = AsyncIOMotorClient(mongodb_url) - - # Test the connection - await mongodb.client.admin.command('ping') - logger.info("Successfully connected to MongoDB") - - # Get database - mongodb.database = mongodb.client[database_name] - - # Create indexes for better performance - await create_indexes() - - except Exception as e: - logger.error(f"Error connecting to MongoDB: {e}") - raise - -async def close_mongo_connection(): - """Close database connection""" - if mongodb.client: - mongodb.client.close() - logger.info("Disconnected from MongoDB") - -async def create_indexes(): - """Create database indexes for optimal performance""" - try: - # Sensor readings collection indexes - sensor_readings_indexes = [ - IndexModel([("sensor_id", ASCENDING), ("timestamp", DESCENDING)]), - IndexModel([("timestamp", DESCENDING)]), - IndexModel([("room", ASCENDING), ("timestamp", DESCENDING)]), - IndexModel([("sensor_type", ASCENDING), ("timestamp", DESCENDING)]), - IndexModel([("created_at", DESCENDING)]), - ] - await mongodb.database.sensor_readings.create_indexes(sensor_readings_indexes) - - # Room metrics collection indexes - room_metrics_indexes = [ - IndexModel([("room", ASCENDING), ("timestamp", DESCENDING)]), - IndexModel([("timestamp", DESCENDING)]), - IndexModel([("created_at", DESCENDING)]), - ] - await mongodb.database.room_metrics.create_indexes(room_metrics_indexes) - - # Sensor metadata collection indexes - sensor_metadata_indexes = [ - IndexModel([("sensor_id", ASCENDING)], unique=True), - IndexModel([("room", ASCENDING)]), - IndexModel([("sensor_type", ASCENDING)]), - IndexModel([("status", ASCENDING)]), - ] - await mongodb.database.sensor_metadata.create_indexes(sensor_metadata_indexes) - - # System events collection indexes - system_events_indexes = [ - IndexModel([("timestamp", DESCENDING)]), - IndexModel([("event_type", ASCENDING), ("timestamp", DESCENDING)]), - IndexModel([("severity", ASCENDING), ("timestamp", DESCENDING)]), - ] - await mongodb.database.system_events.create_indexes(system_events_indexes) - - logger.info("Database indexes created successfully") - - except Exception as e: - logger.error(f"Error creating indexes: {e}") - -async def get_database() -> AsyncIOMotorDatabase: - """Get database instance""" - if not mongodb.database: - await connect_to_mongo() - return mongodb.database - -class RedisManager: - """Redis connection and operations manager""" - - def __init__(self): - self.redis_client = None - self.redis_host = os.getenv("REDIS_HOST", "localhost") - self.redis_port = int(os.getenv("REDIS_PORT", "6379")) - self.redis_db = int(os.getenv("REDIS_DB", "0")) - - async def connect(self): - """Connect to Redis""" - try: - import redis.asyncio as redis - self.redis_client = redis.Redis( - host=self.redis_host, - port=self.redis_port, - db=self.redis_db, - decode_responses=True - ) - await self.redis_client.ping() - logger.info("Successfully connected to Redis") - except Exception as e: - logger.error(f"Error connecting to Redis: {e}") - raise - - async def disconnect(self): - """Disconnect from Redis""" - if self.redis_client: - await self.redis_client.close() - logger.info("Disconnected from Redis") - - async def set_sensor_data(self, sensor_id: str, data: dict, expire_time: int = 3600): - """Store latest sensor data in Redis with expiration""" - if not self.redis_client: - await self.connect() - - key = f"sensor:latest:{sensor_id}" - await self.redis_client.setex(key, expire_time, str(data)) - - async def get_sensor_data(self, sensor_id: str) -> Optional[dict]: - """Get latest sensor data from Redis""" - if not self.redis_client: - await self.connect() - - key = f"sensor:latest:{sensor_id}" - data = await self.redis_client.get(key) - if data: - import json - return json.loads(data) - return None - - async def set_room_metrics(self, room: str, metrics: dict, expire_time: int = 1800): - """Store room aggregated metrics in Redis""" - if not self.redis_client: - await self.connect() - - key = f"room:metrics:{room}" - await self.redis_client.setex(key, expire_time, str(metrics)) - - async def get_room_metrics(self, room: str) -> Optional[dict]: - """Get room aggregated metrics from Redis""" - if not self.redis_client: - await self.connect() - - key = f"room:metrics:{room}" - data = await self.redis_client.get(key) - if data: - import json - return json.loads(data) - return None - - async def get_all_active_sensors(self) -> list: - """Get list of all sensors with recent data in Redis""" - if not self.redis_client: - await self.connect() - - keys = await self.redis_client.keys("sensor:latest:*") - return [key.replace("sensor:latest:", "") for key in keys] - -# Global Redis manager instance -redis_manager = RedisManager() - -async def cleanup_old_data(): - """Cleanup old data from MongoDB (retention policy)""" - try: - db = await get_database() - - # Delete sensor readings older than 90 days - retention_date = datetime.utcnow() - timedelta(days=90) - result = await db.sensor_readings.delete_many({ - "created_at": {"$lt": retention_date} - }) - - if result.deleted_count > 0: - logger.info(f"Deleted {result.deleted_count} old sensor readings") - - # Delete room metrics older than 30 days - retention_date = datetime.utcnow() - timedelta(days=30) - result = await db.room_metrics.delete_many({ - "created_at": {"$lt": retention_date} - }) - - if result.deleted_count > 0: - logger.info(f"Deleted {result.deleted_count} old room metrics") - - except Exception as e: - logger.error(f"Error cleaning up old data: {e}") - -# Scheduled cleanup task -async def schedule_cleanup(): - """Schedule periodic cleanup of old data""" - while True: - try: - await cleanup_old_data() - # Wait 24 hours before next cleanup - await asyncio.sleep(24 * 60 * 60) - except Exception as e: - logger.error(f"Error in scheduled cleanup: {e}") - # Wait 1 hour before retrying - await asyncio.sleep(60 * 60) \ No newline at end of file diff --git a/main_layered.py b/main_layered.py deleted file mode 100644 index 0b40940..0000000 --- a/main_layered.py +++ /dev/null @@ -1,273 +0,0 @@ -""" -Main application entry point with layered architecture -This is the new structured version of the FastAPI application -""" -import asyncio -import time -from contextlib import asynccontextmanager -from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException -from fastapi.middleware.cors import CORSMiddleware -import logging - -# Import layered components -from layers.infrastructure.database_connection import database_connection -from layers.infrastructure.redis_connection import redis_connection -from layers.business.sensor_service import SensorService -from layers.business.cleanup_service import cleanup_service -from layers.presentation.websocket_handler import websocket_manager -from layers.presentation.redis_subscriber import redis_subscriber -from layers.presentation.api_routes import router as api_router -from models import HealthCheck - -# Configure logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -# Application startup time for uptime calculation -app_start_time = time.time() - -@asynccontextmanager -async def lifespan(app: FastAPI): - """Application lifespan manager with proper layer initialization""" - # Startup - logger.info("Application starting up...") - - try: - # Initialize infrastructure layer - await database_connection.connect() - await redis_connection.connect() - logger.info("Infrastructure layer initialized") - - # Initialize business layer - sensor_service = SensorService() # Services are initialized on-demand - logger.info("Business layer initialized") - - # Initialize presentation layer - await redis_subscriber.start_subscription("energy_data") - await cleanup_service.start_scheduled_cleanup(24) # Daily cleanup - logger.info("Presentation layer initialized") - - logger.info("Application startup complete") - - yield - - # Shutdown - logger.info("Application shutting down...") - - # Stop background tasks - await redis_subscriber.stop_subscription() - await cleanup_service.stop_scheduled_cleanup() - - # Close connections - await database_connection.disconnect() - await redis_connection.disconnect() - - logger.info("Application shutdown complete") - - except Exception as e: - logger.error(f"Error during application lifecycle: {e}") - raise - -app = FastAPI( - title="Energy Monitoring Dashboard API", - description="Real-time energy monitoring and IoT sensor data management system (Layered Architecture)", - version="2.0.0", - lifespan=lifespan -) - -# Add CORS middleware -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], # Configure appropriately for production - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - -# Include API router with version prefix -app.include_router(api_router, prefix="/api/v1") - -@app.websocket("/ws") -async def websocket_endpoint(websocket: WebSocket): - """ - WebSocket endpoint for real-time data streaming - Presentation Layer - handles WebSocket connections - """ - await websocket_manager.connect(websocket) - try: - while True: - # Keep the connection alive by waiting for messages - await websocket.receive_text() - except WebSocketDisconnect: - websocket_manager.disconnect(websocket) - -@app.get("/") -async def read_root(): - """Root endpoint with basic system information""" - return { - "message": "Energy Monitoring Dashboard Backend (Layered Architecture)", - "version": "2.0.0", - "status": "running", - "uptime_seconds": time.time() - app_start_time, - "architecture": "3-layer (Presentation, Business, Infrastructure)" - } - -@app.get("/health", response_model=HealthCheck) -async def health_check(): - """ - Comprehensive health check endpoint - Checks all layers and dependencies - """ - try: - # Check infrastructure layer - mongodb_connected = True - redis_connected = True - - try: - db = await database_connection.get_database() - await db.command("ping") - except: - mongodb_connected = False - - try: - redis_client = await redis_connection.get_client() - await redis_client.ping() - except: - redis_connected = False - - # Check business layer through service - sensor_service = SensorService() - from layers.infrastructure.repositories import SensorReadingRepository - stats_repo = SensorReadingRepository() - - # Get basic statistics - try: - # Simple count queries to test business layer - total_readings = await stats_repo.count_by_query({}) - active_sensors_data = await redis_connection.get_keys_by_pattern("sensor:latest:*") - total_sensors = len(active_sensors_data) - except Exception as e: - logger.error(f"Error getting stats for health check: {e}") - total_readings = 0 - total_sensors = 0 - - # Check presentation layer - websocket_connections = websocket_manager.get_connection_count() - redis_subscription_active = redis_subscriber.is_subscriber_running() - - # Determine overall status - status = "healthy" - if not mongodb_connected or not redis_connected: - status = "degraded" - if not mongodb_connected and not redis_connected: - status = "unhealthy" - - return HealthCheck( - status=status, - mongodb_connected=mongodb_connected, - redis_connected=redis_connected, - total_sensors=total_sensors, - active_sensors=total_sensors, # Approximation - total_readings=total_readings, - uptime_seconds=time.time() - app_start_time - ) - - except Exception as e: - logger.error(f"Health check failed: {e}") - raise HTTPException(status_code=503, detail="Service Unavailable") - -@app.get("/status") -async def system_status(): - """ - Detailed system status endpoint with layer-specific information - """ - try: - # Infrastructure layer status - infrastructure_status = { - "database_connected": True, - "redis_connected": True - } - - try: - db = await database_connection.get_database() - await db.command("ping") - except: - infrastructure_status["database_connected"] = False - - try: - redis_client = await redis_connection.get_client() - await redis_client.ping() - except: - infrastructure_status["redis_connected"] = False - - # Business layer status - business_status = { - "cleanup_service_running": cleanup_service.is_cleanup_running() - } - - # Presentation layer status - presentation_status = { - "active_websocket_connections": websocket_manager.get_connection_count(), - "redis_subscriber_running": redis_subscriber.is_subscriber_running() - } - - # Get subscriber status details - subscriber_status = await redis_subscriber.get_subscriber_status() - - return { - "timestamp": time.time(), - "uptime_seconds": time.time() - app_start_time, - "architecture": "layered", - "layers": { - "infrastructure": infrastructure_status, - "business": business_status, - "presentation": presentation_status - }, - "redis_subscriber": subscriber_status - } - - except Exception as e: - logger.error(f"Status check failed: {e}") - raise HTTPException(status_code=500, detail="Internal Server Error") - -@app.get("/system/cleanup", summary="Get cleanup service status") -async def get_cleanup_status(): - """Get data cleanup service status and statistics""" - try: - # Get cleanup service status - cleanup_running = cleanup_service.is_cleanup_running() - - # Get storage statistics - storage_stats = await cleanup_service.get_storage_statistics() - - # Get retention policy info - retention_info = await cleanup_service.get_data_retention_info() - - return { - "cleanup_service_running": cleanup_running, - "storage_statistics": storage_stats, - "retention_policies": retention_info - } - - except Exception as e: - logger.error(f"Error getting cleanup status: {e}") - raise HTTPException(status_code=500, detail="Internal Server Error") - -@app.post("/system/cleanup", summary="Run manual cleanup") -async def run_manual_cleanup(): - """Manually trigger data cleanup process""" - try: - cleanup_results = await cleanup_service.cleanup_old_data() - - return { - "message": "Manual cleanup completed", - "results": cleanup_results - } - - except Exception as e: - logger.error(f"Error running manual cleanup: {e}") - raise HTTPException(status_code=500, detail="Internal Server Error") - -if __name__ == "__main__": - import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file diff --git a/models.py b/models.py deleted file mode 100644 index 9516612..0000000 --- a/models.py +++ /dev/null @@ -1,236 +0,0 @@ -from pydantic import BaseModel, Field -from typing import Optional, List, Dict, Any, Literal -from datetime import datetime -from enum import Enum - -class SensorType(str, Enum): - ENERGY = "energy" - CO2 = "co2" - TEMPERATURE = "temperature" - HUMIDITY = "humidity" - HVAC = "hvac" - LIGHTING = "lighting" - SECURITY = "security" - -class SensorStatus(str, Enum): - ONLINE = "online" - OFFLINE = "offline" - ERROR = "error" - MAINTENANCE = "maintenance" - -class CO2Status(str, Enum): - GOOD = "good" - MODERATE = "moderate" - POOR = "poor" - CRITICAL = "critical" - -class OccupancyLevel(str, Enum): - LOW = "low" - MEDIUM = "medium" - HIGH = "high" - -# Base Models -class SensorReading(BaseModel): - """Individual sensor reading model""" - sensor_id: str = Field(..., description="Unique sensor identifier") - room: Optional[str] = Field(None, description="Room where sensor is located") - sensor_type: SensorType = Field(..., description="Type of sensor") - timestamp: int = Field(..., description="Unix timestamp of reading") - created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp") - - # Sensor values - energy: Optional[Dict[str, Any]] = Field(None, description="Energy reading with value and unit") - co2: Optional[Dict[str, Any]] = Field(None, description="CO2 reading with value and unit") - temperature: Optional[Dict[str, Any]] = Field(None, description="Temperature reading with value and unit") - humidity: Optional[Dict[str, Any]] = Field(None, description="Humidity reading with value and unit") - motion: Optional[Dict[str, Any]] = Field(None, description="Motion detection reading") - - # Metadata - metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional sensor metadata") - - class Config: - json_encoders = { - datetime: lambda v: v.isoformat() - } - -class LegacySensorReading(BaseModel): - """Legacy sensor reading format for backward compatibility""" - sensor_id: str = Field(..., alias="sensorId") - timestamp: int - value: float - unit: str - created_at: datetime = Field(default_factory=datetime.utcnow) - - class Config: - allow_population_by_field_name = True - -class SensorMetadata(BaseModel): - """Sensor configuration and metadata""" - sensor_id: str = Field(..., description="Unique sensor identifier") - name: str = Field(..., description="Human-readable sensor name") - sensor_type: SensorType = Field(..., description="Type of sensor") - room: Optional[str] = Field(None, description="Room assignment") - status: SensorStatus = Field(default=SensorStatus.OFFLINE, description="Current sensor status") - - # Physical location and installation details - location: Optional[str] = Field(None, description="Physical location description") - floor: Optional[str] = Field(None, description="Floor level") - building: Optional[str] = Field(None, description="Building identifier") - - # Technical specifications - model: Optional[str] = Field(None, description="Sensor model") - manufacturer: Optional[str] = Field(None, description="Sensor manufacturer") - firmware_version: Optional[str] = Field(None, description="Firmware version") - hardware_version: Optional[str] = Field(None, description="Hardware version") - - # Network and connectivity - ip_address: Optional[str] = Field(None, description="IP address if network connected") - mac_address: Optional[str] = Field(None, description="MAC address") - connection_type: Optional[str] = Field(None, description="Connection type (wifi, ethernet, zigbee, etc.)") - - # Power and maintenance - battery_level: Optional[float] = Field(None, description="Battery level percentage") - last_maintenance: Optional[datetime] = Field(None, description="Last maintenance date") - next_maintenance: Optional[datetime] = Field(None, description="Next scheduled maintenance") - - # Operational settings - sampling_rate: Optional[int] = Field(None, description="Data sampling rate in seconds") - calibration_date: Optional[datetime] = Field(None, description="Last calibration date") - - # Capabilities - monitoring_capabilities: List[str] = Field(default_factory=list, description="List of monitoring capabilities") - control_capabilities: List[str] = Field(default_factory=list, description="List of control capabilities") - - # Timestamps - installed_at: Optional[datetime] = Field(None, description="Installation timestamp") - last_seen: Optional[datetime] = Field(None, description="Last communication timestamp") - created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp") - updated_at: datetime = Field(default_factory=datetime.utcnow, description="Record update timestamp") - - class Config: - json_encoders = { - datetime: lambda v: v.isoformat() if v else None - } - -class RoomMetrics(BaseModel): - """Aggregated room-level metrics""" - room: str = Field(..., description="Room identifier") - timestamp: int = Field(..., description="Metrics calculation timestamp") - created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp") - - # Sensor inventory - sensor_count: int = Field(0, description="Total number of sensors in room") - active_sensors: List[str] = Field(default_factory=list, description="List of active sensor IDs") - sensor_types: List[SensorType] = Field(default_factory=list, description="Types of sensors present") - - # Energy metrics - energy: Optional[Dict[str, Any]] = Field(None, description="Energy consumption metrics") - # Format: {"current": float, "total": float, "average": float, "peak": float, "unit": str} - - # Environmental metrics - co2: Optional[Dict[str, Any]] = Field(None, description="CO2 level metrics") - # Format: {"current": float, "average": float, "max": float, "min": float, "status": CO2Status, "unit": str} - - temperature: Optional[Dict[str, Any]] = Field(None, description="Temperature metrics") - # Format: {"current": float, "average": float, "max": float, "min": float, "unit": str} - - humidity: Optional[Dict[str, Any]] = Field(None, description="Humidity metrics") - # Format: {"current": float, "average": float, "max": float, "min": float, "unit": str} - - # Occupancy and usage - occupancy_estimate: OccupancyLevel = Field(default=OccupancyLevel.LOW, description="Estimated occupancy level") - motion_detected: bool = Field(default=False, description="Recent motion detection status") - - # Time-based metrics - last_activity: Optional[datetime] = Field(None, description="Last detected activity timestamp") - daily_usage_hours: Optional[float] = Field(None, description="Estimated daily usage in hours") - - class Config: - json_encoders = { - datetime: lambda v: v.isoformat() if v else None - } - -class SystemEvent(BaseModel): - """System events and alerts""" - event_id: str = Field(..., description="Unique event identifier") - event_type: str = Field(..., description="Type of event") - severity: Literal["info", "warning", "error", "critical"] = Field(..., description="Event severity") - timestamp: int = Field(..., description="Event timestamp") - created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp") - - # Event details - title: str = Field(..., description="Event title") - description: str = Field(..., description="Event description") - source: Optional[str] = Field(None, description="Event source (sensor_id, system component, etc.)") - - # Context - sensor_id: Optional[str] = Field(None, description="Related sensor ID") - room: Optional[str] = Field(None, description="Related room") - - # Event data - data: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional event data") - - # Status tracking - acknowledged: bool = Field(default=False, description="Whether event has been acknowledged") - resolved: bool = Field(default=False, description="Whether event has been resolved") - acknowledged_by: Optional[str] = Field(None, description="Who acknowledged the event") - resolved_by: Optional[str] = Field(None, description="Who resolved the event") - acknowledged_at: Optional[datetime] = Field(None, description="Acknowledgment timestamp") - resolved_at: Optional[datetime] = Field(None, description="Resolution timestamp") - - class Config: - json_encoders = { - datetime: lambda v: v.isoformat() if v else None - } - -class DataQuery(BaseModel): - """Data query parameters for historical data retrieval""" - sensor_ids: Optional[List[str]] = Field(None, description="Filter by sensor IDs") - rooms: Optional[List[str]] = Field(None, description="Filter by rooms") - sensor_types: Optional[List[SensorType]] = Field(None, description="Filter by sensor types") - - # Time range - start_time: Optional[int] = Field(None, description="Start timestamp (Unix)") - end_time: Optional[int] = Field(None, description="End timestamp (Unix)") - - # Aggregation - aggregate: Optional[str] = Field(None, description="Aggregation method (avg, sum, min, max)") - interval: Optional[str] = Field(None, description="Aggregation interval (1m, 5m, 1h, 1d)") - - # Pagination - limit: int = Field(default=100, description="Maximum number of records to return") - offset: int = Field(default=0, description="Number of records to skip") - - # Sorting - sort_by: str = Field(default="timestamp", description="Field to sort by") - sort_order: Literal["asc", "desc"] = Field(default="desc", description="Sort order") - -class DataResponse(BaseModel): - """Response model for data queries""" - data: List[Dict[str, Any]] = Field(default_factory=list, description="Query results") - total_count: int = Field(0, description="Total number of matching records") - query: DataQuery = Field(..., description="Original query parameters") - execution_time_ms: float = Field(..., description="Query execution time in milliseconds") - -class HealthCheck(BaseModel): - """Health check response model""" - status: str = Field(..., description="Overall system status") - timestamp: datetime = Field(default_factory=datetime.utcnow) - - # Database status - mongodb_connected: bool = Field(..., description="MongoDB connection status") - redis_connected: bool = Field(..., description="Redis connection status") - - # Data statistics - total_sensors: int = Field(0, description="Total number of registered sensors") - active_sensors: int = Field(0, description="Number of active sensors") - total_readings: int = Field(0, description="Total sensor readings in database") - - # System metrics - uptime_seconds: float = Field(..., description="System uptime in seconds") - memory_usage_mb: Optional[float] = Field(None, description="Memory usage in MB") - - class Config: - json_encoders = { - datetime: lambda v: v.isoformat() - } \ No newline at end of file diff --git a/persistence.py b/persistence.py deleted file mode 100644 index 11ec778..0000000 --- a/persistence.py +++ /dev/null @@ -1,448 +0,0 @@ -import json -import asyncio -from datetime import datetime, timedelta -from typing import Dict, Any, List, Optional -import logging -from pymongo.errors import DuplicateKeyError -import uuid - -from database import get_database, redis_manager -from models import ( - SensorReading, LegacySensorReading, SensorMetadata, RoomMetrics, - SystemEvent, SensorType, SensorStatus, CO2Status, OccupancyLevel -) - -logger = logging.getLogger(__name__) - -class DataPersistenceService: - """Service for persisting sensor data to MongoDB and managing Redis cache""" - - def __init__(self): - self.db = None - self.redis = redis_manager - - async def initialize(self): - """Initialize the persistence service""" - self.db = await get_database() - await self.redis.connect() - logger.info("Data persistence service initialized") - - async def process_sensor_message(self, message_data: str) -> bool: - """Process incoming sensor message and persist data""" - try: - # Parse the message - data = json.loads(message_data) - logger.debug(f"Processing sensor message: {data}") - - # Determine message format and convert to standard format - if self._is_legacy_format(data): - sensor_reading = await self._convert_legacy_data(data) - else: - sensor_reading = SensorReading(**data) - - # Store in MongoDB - await self._store_sensor_reading(sensor_reading) - - # Update Redis cache for real-time access - await self._update_redis_cache(sensor_reading) - - # Update sensor metadata - await self._update_sensor_metadata(sensor_reading) - - # Calculate and store room metrics - await self._update_room_metrics(sensor_reading) - - # Check for alerts and anomalies - await self._check_alerts(sensor_reading) - - return True - - except Exception as e: - logger.error(f"Error processing sensor message: {e}") - # Log the error event - await self._log_system_event( - event_type="data_processing_error", - severity="error", - title="Sensor Data Processing Failed", - description=f"Failed to process sensor message: {str(e)}", - data={"raw_message": message_data} - ) - return False - - def _is_legacy_format(self, data: dict) -> bool: - """Check if data is in legacy format""" - legacy_keys = {"sensorId", "timestamp", "value", "unit"} - return legacy_keys.issubset(data.keys()) and "energy" not in data - - async def _convert_legacy_data(self, data: dict) -> SensorReading: - """Convert legacy format to new sensor reading format""" - legacy_reading = LegacySensorReading(**data) - - return SensorReading( - sensor_id=legacy_reading.sensor_id, - sensor_type=SensorType.ENERGY, # Assume legacy data is energy - timestamp=legacy_reading.timestamp, - created_at=legacy_reading.created_at, - energy={ - "value": legacy_reading.value, - "unit": legacy_reading.unit - } - ) - - async def _store_sensor_reading(self, reading: SensorReading): - """Store sensor reading in MongoDB""" - try: - reading_dict = reading.dict() - - # Add document ID for deduplication - reading_dict["_id"] = f"{reading.sensor_id}_{reading.timestamp}" - - await self.db.sensor_readings.insert_one(reading_dict) - logger.debug(f"Stored sensor reading for {reading.sensor_id}") - - except DuplicateKeyError: - logger.debug(f"Duplicate reading ignored for {reading.sensor_id} at {reading.timestamp}") - except Exception as e: - logger.error(f"Error storing sensor reading: {e}") - raise - - async def _update_redis_cache(self, reading: SensorReading): - """Update Redis cache with latest sensor data""" - try: - # Store latest reading for real-time access - await self.redis.set_sensor_data( - reading.sensor_id, - reading.dict(), - expire_time=3600 # 1 hour expiration - ) - - # Store sensor status - status_key = f"sensor:status:{reading.sensor_id}" - await self.redis.redis_client.setex( - status_key, - 1800, # 30 minutes - json.dumps({ - "status": "online", - "last_seen": reading.timestamp, - "room": reading.room - }) - ) - - except Exception as e: - logger.error(f"Error updating Redis cache: {e}") - - async def _update_sensor_metadata(self, reading: SensorReading): - """Update or create sensor metadata""" - try: - # Check if sensor metadata exists - existing = await self.db.sensor_metadata.find_one({"sensor_id": reading.sensor_id}) - - if existing: - # Update existing metadata - await self.db.sensor_metadata.update_one( - {"sensor_id": reading.sensor_id}, - { - "$set": { - "last_seen": datetime.utcnow(), - "status": SensorStatus.ONLINE.value, - "updated_at": datetime.utcnow() - }, - "$addToSet": { - "monitoring_capabilities": reading.sensor_type.value - } - } - ) - else: - # Create new sensor metadata - metadata = SensorMetadata( - sensor_id=reading.sensor_id, - name=f"Sensor {reading.sensor_id}", - sensor_type=reading.sensor_type, - room=reading.room, - status=SensorStatus.ONLINE, - last_seen=datetime.utcnow(), - monitoring_capabilities=[reading.sensor_type.value] - ) - - await self.db.sensor_metadata.insert_one(metadata.dict()) - logger.info(f"Created metadata for new sensor: {reading.sensor_id}") - - except Exception as e: - logger.error(f"Error updating sensor metadata: {e}") - - async def _update_room_metrics(self, reading: SensorReading): - """Calculate and store room-level metrics""" - if not reading.room: - return - - try: - # Get recent readings for this room (last 5 minutes) - recent_time = datetime.utcnow() - timedelta(minutes=5) - - # Query recent readings for the room - cursor = self.db.sensor_readings.find({ - "room": reading.room, - "created_at": {"$gte": recent_time} - }) - - recent_readings = await cursor.to_list(length=None) - - if not recent_readings: - return - - # Calculate aggregated metrics - metrics = await self._calculate_room_metrics(reading.room, recent_readings) - - # Store in MongoDB - await self.db.room_metrics.insert_one(metrics.dict()) - - # Cache in Redis - await self.redis.set_room_metrics(reading.room, metrics.dict()) - - logger.debug(f"Updated room metrics for {reading.room}") - - except Exception as e: - logger.error(f"Error updating room metrics: {e}") - - async def _calculate_room_metrics(self, room: str, readings: List[Dict]) -> RoomMetrics: - """Calculate aggregated metrics for a room""" - - # Group readings by sensor - sensors_data = {} - for reading in readings: - sensor_id = reading["sensor_id"] - if sensor_id not in sensors_data: - sensors_data[sensor_id] = [] - sensors_data[sensor_id].append(reading) - - # Initialize metrics - energy_values = [] - co2_values = [] - temperature_values = [] - humidity_values = [] - motion_detected = False - - # Extract values from readings - for sensor_readings in sensors_data.values(): - for reading in sensor_readings: - if reading.get("energy"): - energy_values.append(reading["energy"]["value"]) - if reading.get("co2"): - co2_values.append(reading["co2"]["value"]) - if reading.get("temperature"): - temperature_values.append(reading["temperature"]["value"]) - if reading.get("humidity"): - humidity_values.append(reading["humidity"]["value"]) - if reading.get("motion") and reading["motion"].get("value") == "Detected": - motion_detected = True - - # Calculate aggregated metrics - metrics = RoomMetrics( - room=room, - timestamp=int(datetime.utcnow().timestamp()), - sensor_count=len(sensors_data), - active_sensors=list(sensors_data.keys()), - sensor_types=list(set(reading.get("sensor_type") for reading in readings if reading.get("sensor_type"))), - motion_detected=motion_detected - ) - - # Energy metrics - if energy_values: - metrics.energy = { - "current": sum(energy_values), - "average": sum(energy_values) / len(energy_values), - "total": sum(energy_values), - "peak": max(energy_values), - "unit": "kWh" - } - - # CO2 metrics - if co2_values: - avg_co2 = sum(co2_values) / len(co2_values) - metrics.co2 = { - "current": avg_co2, - "average": avg_co2, - "max": max(co2_values), - "min": min(co2_values), - "status": self._get_co2_status(avg_co2).value, - "unit": "ppm" - } - - # Set occupancy estimate based on CO2 - metrics.occupancy_estimate = self._estimate_occupancy(avg_co2) - - # Temperature metrics - if temperature_values: - metrics.temperature = { - "current": sum(temperature_values) / len(temperature_values), - "average": sum(temperature_values) / len(temperature_values), - "max": max(temperature_values), - "min": min(temperature_values), - "unit": "°C" - } - - # Humidity metrics - if humidity_values: - metrics.humidity = { - "current": sum(humidity_values) / len(humidity_values), - "average": sum(humidity_values) / len(humidity_values), - "max": max(humidity_values), - "min": min(humidity_values), - "unit": "%" - } - - return metrics - - def _get_co2_status(self, co2_level: float) -> CO2Status: - """Determine CO2 status based on level""" - if co2_level < 400: - return CO2Status.GOOD - elif co2_level < 1000: - return CO2Status.MODERATE - elif co2_level < 5000: - return CO2Status.POOR - else: - return CO2Status.CRITICAL - - def _estimate_occupancy(self, co2_level: float) -> OccupancyLevel: - """Estimate occupancy level based on CO2""" - if co2_level < 600: - return OccupancyLevel.LOW - elif co2_level < 1200: - return OccupancyLevel.MEDIUM - else: - return OccupancyLevel.HIGH - - async def _check_alerts(self, reading: SensorReading): - """Check for alert conditions and create system events""" - alerts = [] - - # CO2 level alerts - if reading.co2: - co2_level = reading.co2.get("value", 0) - if co2_level > 5000: - alerts.append({ - "event_type": "co2_critical", - "severity": "critical", - "title": "Critical CO2 Level", - "description": f"CO2 level ({co2_level} ppm) exceeds critical threshold in {reading.room or 'unknown room'}" - }) - elif co2_level > 1000: - alerts.append({ - "event_type": "co2_high", - "severity": "warning", - "title": "High CO2 Level", - "description": f"CO2 level ({co2_level} ppm) is above recommended levels in {reading.room or 'unknown room'}" - }) - - # Energy consumption alerts - if reading.energy: - energy_value = reading.energy.get("value", 0) - if energy_value > 10: # Threshold for high energy consumption - alerts.append({ - "event_type": "energy_high", - "severity": "warning", - "title": "High Energy Consumption", - "description": f"Energy consumption ({energy_value} kWh) is unusually high for sensor {reading.sensor_id}" - }) - - # Temperature alerts - if reading.temperature: - temp_value = reading.temperature.get("value", 0) - if temp_value > 30 or temp_value < 15: - alerts.append({ - "event_type": "temperature_extreme", - "severity": "warning", - "title": "Extreme Temperature", - "description": f"Temperature ({temp_value}°C) is outside normal range in {reading.room or 'unknown room'}" - }) - - # Create system events for alerts - for alert in alerts: - await self._log_system_event( - sensor_id=reading.sensor_id, - room=reading.room, - **alert, - data=reading.dict() - ) - - async def _log_system_event(self, event_type: str, severity: str, title: str, description: str, - sensor_id: str = None, room: str = None, source: str = None, data: Dict = None): - """Log a system event""" - try: - event = SystemEvent( - event_id=str(uuid.uuid4()), - event_type=event_type, - severity=severity, - timestamp=int(datetime.utcnow().timestamp()), - title=title, - description=description, - sensor_id=sensor_id, - room=room, - source=source or "data_persistence_service", - data=data or {} - ) - - await self.db.system_events.insert_one(event.dict()) - logger.info(f"System event logged: {event_type} - {title}") - - except Exception as e: - logger.error(f"Error logging system event: {e}") - - async def get_recent_readings(self, sensor_id: str = None, room: str = None, - limit: int = 100, minutes: int = 60) -> List[Dict]: - """Get recent sensor readings""" - try: - # Build query - query = { - "created_at": {"$gte": datetime.utcnow() - timedelta(minutes=minutes)} - } - - if sensor_id: - query["sensor_id"] = sensor_id - if room: - query["room"] = room - - cursor = self.db.sensor_readings.find(query).sort("created_at", -1).limit(limit) - readings = await cursor.to_list(length=limit) - - return readings - - except Exception as e: - logger.error(f"Error getting recent readings: {e}") - return [] - - async def get_sensor_statistics(self) -> Dict[str, Any]: - """Get overall sensor statistics""" - try: - stats = {} - - # Total readings count - stats["total_readings"] = await self.db.sensor_readings.count_documents({}) - - # Active sensors (sensors that sent data in last 24 hours) - recent_time = datetime.utcnow() - timedelta(hours=24) - active_sensors = await self.db.sensor_readings.distinct("sensor_id", { - "created_at": {"$gte": recent_time} - }) - stats["active_sensors"] = len(active_sensors) - - # Total registered sensors - stats["total_sensors"] = await self.db.sensor_metadata.count_documents({}) - - # Readings in last 24 hours - stats["recent_readings"] = await self.db.sensor_readings.count_documents({ - "created_at": {"$gte": recent_time} - }) - - # Room count - stats["total_rooms"] = len(await self.db.sensor_readings.distinct("room", {"room": {"$ne": None}})) - - return stats - - except Exception as e: - logger.error(f"Error getting sensor statistics: {e}") - return {} - -# Global persistence service instance -persistence_service = DataPersistenceService() \ No newline at end of file diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index 16857a3..0000000 --- a/requirements.txt +++ /dev/null @@ -1,10 +0,0 @@ -fastapi -uvicorn[standard] -redis -websockets -pymongo -motor -python-dotenv -pandas -numpy -pydantic \ No newline at end of file