diff --git a/microservices/sensor-service/analytics_service.py b/microservices/sensor-service/analytics_service.py new file mode 100644 index 0000000..3ae02cc --- /dev/null +++ b/microservices/sensor-service/analytics_service.py @@ -0,0 +1,377 @@ +""" +Analytics service for processing sensor data and generating insights +""" + +import logging +from datetime import datetime, timedelta +from typing import List, Dict, Any, Optional +import json + +logger = logging.getLogger(__name__) + +class AnalyticsService: + """Service for analytics and data processing""" + + def __init__(self, db, redis_client): + self.db = db + self.redis = redis_client + + async def query_data(self, query_params) -> Dict[str, Any]: + """Execute advanced data query""" + try: + # Build query + query = {} + + if hasattr(query_params, 'sensor_ids') and query_params.sensor_ids: + query["sensor_id"] = {"$in": query_params.sensor_ids} + + if hasattr(query_params, 'start_time') and query_params.start_time: + query.setdefault("timestamp", {})["$gte"] = query_params.start_time + + if hasattr(query_params, 'end_time') and query_params.end_time: + query.setdefault("timestamp", {})["$lte"] = query_params.end_time + + # Execute query + cursor = self.db.sensor_readings.find(query) + + if hasattr(query_params, 'limit') and query_params.limit: + cursor = cursor.limit(query_params.limit) + + if hasattr(query_params, 'offset') and query_params.offset: + cursor = cursor.skip(query_params.offset) + + cursor = cursor.sort("timestamp", -1) + + # Get results + results = [] + async for reading in cursor: + reading["_id"] = str(reading["_id"]) + results.append(reading) + + # Get total count + total_count = await self.db.sensor_readings.count_documents(query) + + return { + "data": results, + "total_count": total_count, + "query": query_params.__dict__ if hasattr(query_params, '__dict__') else {}, + "execution_time_ms": 0 # Placeholder + } + + except Exception as e: + logger.error(f"Error executing data query: {e}") + raise + + async def get_analytics_summary(self, hours: int = 24) -> Dict[str, Any]: + """Get comprehensive analytics summary""" + try: + start_time = datetime.utcnow() - timedelta(hours=hours) + + # Get basic statistics + pipeline = [ + { + "$match": { + "created_at": {"$gte": start_time} + } + }, + { + "$group": { + "_id": None, + "total_readings": {"$sum": 1}, + "average_value": {"$avg": "$value"}, + "min_value": {"$min": "$value"}, + "max_value": {"$max": "$value"}, + "unique_sensors": {"$addToSet": "$sensor_id"} + } + } + ] + + cursor = self.db.sensor_readings.aggregate(pipeline) + stats = await cursor.to_list(length=1) + + base_stats = stats[0] if stats else { + "total_readings": 0, + "average_value": 0, + "min_value": 0, + "max_value": 0, + "unique_sensors": [] + } + + # Get room-level statistics + room_stats = await self._get_room_analytics(hours) + + # Get energy trends + energy_trends = await self._get_energy_trends(hours) + + return { + "period_hours": hours, + "timestamp": datetime.utcnow().isoformat(), + "total_readings": base_stats["total_readings"], + "unique_sensors": len(base_stats["unique_sensors"]), + "value_statistics": { + "average": round(base_stats["average_value"], 2) if base_stats["average_value"] else 0, + "minimum": base_stats["min_value"], + "maximum": base_stats["max_value"] + }, + "room_statistics": room_stats, + "energy_trends": energy_trends + } + + except Exception as e: + logger.error(f"Error getting analytics summary: {e}") + raise + + async def get_energy_analytics(self, hours: int = 24, room: Optional[str] = None) -> Dict[str, Any]: + """Get energy-specific analytics""" + try: + start_time = datetime.utcnow() - timedelta(hours=hours) + + # Build query + query = {"created_at": {"$gte": start_time}} + if room: + query["room"] = room + + # Energy consumption over time + pipeline = [ + {"$match": query}, + { + "$group": { + "_id": { + "hour": {"$hour": "$created_at"}, + "date": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}} + }, + "total_energy": {"$sum": "$value"}, + "reading_count": {"$sum": 1} + } + }, + {"$sort": {"_id.date": 1, "_id.hour": 1}} + ] + + cursor = self.db.sensor_readings.aggregate(pipeline) + hourly_data = [] + + async for data in cursor: + hourly_data.append({ + "hour": data["_id"]["hour"], + "date": data["_id"]["date"], + "total_energy": data["total_energy"], + "reading_count": data["reading_count"] + }) + + # Peak consumption analysis + peak_analysis = await self._get_peak_consumption_analysis(query) + + # Energy efficiency metrics + efficiency_metrics = await self._get_efficiency_metrics(query) + + return { + "period_hours": hours, + "room": room, + "timestamp": datetime.utcnow().isoformat(), + "hourly_consumption": hourly_data, + "peak_analysis": peak_analysis, + "efficiency_metrics": efficiency_metrics, + "total_consumption": sum(item["total_energy"] for item in hourly_data) + } + + except Exception as e: + logger.error(f"Error getting energy analytics: {e}") + raise + + async def _get_room_analytics(self, hours: int) -> Dict[str, Any]: + """Get room-level analytics""" + try: + start_time = datetime.utcnow() - timedelta(hours=hours) + + pipeline = [ + { + "$match": { + "created_at": {"$gte": start_time}, + "room": {"$ne": None} + } + }, + { + "$group": { + "_id": "$room", + "total_readings": {"$sum": 1}, + "total_energy": {"$sum": "$value"}, + "average_energy": {"$avg": "$value"}, + "unique_sensors": {"$addToSet": "$sensor_id"} + } + }, + {"$sort": {"total_energy": -1}} + ] + + cursor = self.db.sensor_readings.aggregate(pipeline) + room_data = [] + + async for room in cursor: + room_data.append({ + "room": room["_id"], + "total_readings": room["total_readings"], + "total_energy": room["total_energy"], + "average_energy": round(room["average_energy"], 2), + "sensor_count": len(room["unique_sensors"]) + }) + + return { + "by_room": room_data, + "total_rooms": len(room_data) + } + + except Exception as e: + logger.error(f"Error getting room analytics: {e}") + return {"by_room": [], "total_rooms": 0} + + async def _get_energy_trends(self, hours: int) -> Dict[str, Any]: + """Get energy consumption trends""" + try: + start_time = datetime.utcnow() - timedelta(hours=hours) + + # Get current period data + current_query = {"created_at": {"$gte": start_time}} + current_cursor = self.db.sensor_readings.aggregate([ + {"$match": current_query}, + {"$group": {"_id": None, "total": {"$sum": "$value"}, "count": {"$sum": 1}}} + ]) + current_data = await current_cursor.to_list(length=1) + current_total = current_data[0]["total"] if current_data else 0 + current_count = current_data[0]["count"] if current_data else 0 + + # Get previous period for comparison + previous_start = start_time - timedelta(hours=hours) + previous_query = { + "created_at": {"$gte": previous_start, "$lt": start_time} + } + previous_cursor = self.db.sensor_readings.aggregate([ + {"$match": previous_query}, + {"$group": {"_id": None, "total": {"$sum": "$value"}, "count": {"$sum": 1}}} + ]) + previous_data = await previous_cursor.to_list(length=1) + previous_total = previous_data[0]["total"] if previous_data else 0 + + # Calculate trend + if previous_total > 0: + trend_percentage = ((current_total - previous_total) / previous_total) * 100 + else: + trend_percentage = 0 + + return { + "current_period": { + "total_energy": current_total, + "reading_count": current_count, + "average_per_reading": current_total / current_count if current_count > 0 else 0 + }, + "previous_period": { + "total_energy": previous_total + }, + "trend": { + "percentage_change": round(trend_percentage, 2), + "direction": "up" if trend_percentage > 0 else "down" if trend_percentage < 0 else "stable" + } + } + + except Exception as e: + logger.error(f"Error getting energy trends: {e}") + return {} + + async def _get_peak_consumption_analysis(self, base_query: Dict[str, Any]) -> Dict[str, Any]: + """Analyze peak consumption patterns""" + try: + pipeline = [ + {"$match": base_query}, + { + "$group": { + "_id": {"$hour": "$created_at"}, + "total_consumption": {"$sum": "$value"}, + "reading_count": {"$sum": 1} + } + }, + {"$sort": {"total_consumption": -1}} + ] + + cursor = self.db.sensor_readings.aggregate(pipeline) + hourly_consumption = await cursor.to_list(length=None) + + if not hourly_consumption: + return {"peak_hour": None, "peak_consumption": 0, "hourly_pattern": []} + + peak_data = hourly_consumption[0] + + return { + "peak_hour": peak_data["_id"], + "peak_consumption": peak_data["total_consumption"], + "hourly_pattern": [ + { + "hour": item["_id"], + "consumption": item["total_consumption"], + "reading_count": item["reading_count"] + } + for item in hourly_consumption + ] + } + + except Exception as e: + logger.error(f"Error analyzing peak consumption: {e}") + return {"peak_hour": None, "peak_consumption": 0, "hourly_pattern": []} + + async def _get_efficiency_metrics(self, base_query: Dict[str, Any]) -> Dict[str, Any]: + """Calculate energy efficiency metrics""" + try: + # Average consumption per sensor + pipeline = [ + {"$match": base_query}, + { + "$group": { + "_id": "$sensor_id", + "total_consumption": {"$sum": "$value"}, + "reading_count": {"$sum": 1} + } + }, + { + "$group": { + "_id": None, + "average_per_sensor": {"$avg": "$total_consumption"}, + "sensor_count": {"$sum": 1}, + "min_consumption": {"$min": "$total_consumption"}, + "max_consumption": {"$max": "$total_consumption"} + } + } + ] + + cursor = self.db.sensor_readings.aggregate(pipeline) + efficiency_data = await cursor.to_list(length=1) + + if not efficiency_data: + return { + "average_per_sensor": 0, + "sensor_count": 0, + "efficiency_score": 0, + "variation_coefficient": 0 + } + + data = efficiency_data[0] + + # Calculate efficiency score (lower variation = higher efficiency) + if data["average_per_sensor"] > 0: + variation_coefficient = (data["max_consumption"] - data["min_consumption"]) / data["average_per_sensor"] + efficiency_score = max(0, 100 - (variation_coefficient * 10)) # Scale to 0-100 + else: + variation_coefficient = 0 + efficiency_score = 100 + + return { + "average_per_sensor": round(data["average_per_sensor"], 2), + "sensor_count": data["sensor_count"], + "efficiency_score": round(efficiency_score, 1), + "variation_coefficient": round(variation_coefficient, 2) + } + + except Exception as e: + logger.error(f"Error calculating efficiency metrics: {e}") + return { + "average_per_sensor": 0, + "sensor_count": 0, + "efficiency_score": 0, + "variation_coefficient": 0 + } \ No newline at end of file diff --git a/microservices/sensor-service/database.py b/microservices/sensor-service/database.py new file mode 100644 index 0000000..096b063 --- /dev/null +++ b/microservices/sensor-service/database.py @@ -0,0 +1,66 @@ +""" +Database connection and management for sensor service +""" + +import asyncio +import logging +from motor.motor_asyncio import AsyncIOMotorClient +import redis.asyncio as redis +from typing import Optional +import os + +logger = logging.getLogger(__name__) + +# Global database connections +mongo_client: Optional[AsyncIOMotorClient] = None +redis_client: Optional[redis.Redis] = None +database = None + +async def connect_to_mongo(): + """Connect to MongoDB""" + global mongo_client, database + + try: + mongo_url = os.getenv("MONGO_URL", "mongodb://admin:password123@mongodb:27017/energy_dashboard_sensors?authSource=admin") + + mongo_client = AsyncIOMotorClient(mongo_url) + database = mongo_client.energy_dashboard_sensors + + # Test connection + await mongo_client.admin.command('ping') + logger.info("Connected to MongoDB successfully") + + except Exception as e: + logger.error(f"Failed to connect to MongoDB: {e}") + raise + +async def close_mongo_connection(): + """Close MongoDB connection""" + global mongo_client + if mongo_client: + mongo_client.close() + logger.info("Closed MongoDB connection") + +async def connect_to_redis(): + """Connect to Redis""" + global redis_client + + try: + redis_url = os.getenv("REDIS_URL", "redis://redis:6379") + redis_client = redis.from_url(redis_url, decode_responses=True) + + # Test connection + await redis_client.ping() + logger.info("Connected to Redis successfully") + + except Exception as e: + logger.error(f"Failed to connect to Redis: {e}") + raise + +async def get_database(): + """Get database instance""" + return database + +async def get_redis(): + """Get Redis client instance""" + return redis_client \ No newline at end of file diff --git a/microservices/sensor-service/main.py b/microservices/sensor-service/main.py index 51c9eac..04de04a 100644 --- a/microservices/sensor-service/main.py +++ b/microservices/sensor-service/main.py @@ -16,7 +16,8 @@ import json from models import ( SensorReading, SensorMetadata, RoomMetrics, SystemEvent, DataQuery, DataResponse, - SensorType, SensorStatus, CO2Status, OccupancyLevel, HealthResponse + SensorType, SensorStatus, CO2Status, OccupancyLevel, HealthResponse, + Room, RoomCreate, RoomUpdate, RoomInfo ) from database import connect_to_mongo, close_mongo_connection, get_database, connect_to_redis, get_redis from sensor_service import SensorService @@ -38,6 +39,12 @@ async def lifespan(app: FastAPI): await connect_to_mongo() await connect_to_redis() + # Initialize default rooms if none exist + db = await get_database() + redis_client = await get_redis() + room_service = RoomService(db, redis_client) + await room_service.initialize_default_rooms() + # Start background tasks asyncio.create_task(redis_subscriber_task()) asyncio.create_task(room_metrics_aggregation_task()) @@ -250,6 +257,19 @@ async def delete_sensor( raise HTTPException(status_code=500, detail="Internal server error") # Room Management +@app.get("/rooms/names") +async def get_room_names(service: RoomService = Depends(get_room_service)): + """Get simple list of room names for dropdowns""" + try: + room_names = await service.get_all_room_names() + return { + "rooms": room_names, + "count": len(room_names) + } + except Exception as e: + logger.error(f"Error getting room names: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + @app.get("/rooms") async def get_rooms(service: RoomService = Depends(get_room_service)): """Get all rooms with sensor counts and metrics""" @@ -265,16 +285,16 @@ async def get_rooms(service: RoomService = Depends(get_room_service)): @app.post("/rooms") async def create_room( - room_data: dict, + room_data: RoomCreate, service: RoomService = Depends(get_room_service) ): """Create a new room""" try: - result = await service.create_room(room_data) + result = await service.create_room(room_data.dict()) return { "message": "Room created successfully", - "room": room_data.get("name"), - "created_at": result.get("created_at") + "room": result["name"], + "created_at": result["created_at"] } except ValueError as e: raise HTTPException(status_code=400, detail=str(e)) @@ -282,6 +302,40 @@ async def create_room( logger.error(f"Error creating room: {e}") raise HTTPException(status_code=500, detail="Internal server error") +@app.put("/rooms/{room_name}") +async def update_room( + room_name: str, + room_data: RoomUpdate, + service: RoomService = Depends(get_room_service) +): + """Update an existing room""" + try: + result = await service.update_room(room_name, room_data.dict(exclude_unset=True)) + return { + "message": "Room updated successfully", + "room": result["name"], + "updated_at": result["updated_at"], + "modified": result["modified"] + } + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) + except Exception as e: + logger.error(f"Error updating room {room_name}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + +@app.delete("/rooms/{room_name}") +async def delete_room(room_name: str, service: RoomService = Depends(get_room_service)): + """Delete a room""" + try: + result = await service.delete_room(room_name) + return { + "message": "Room deleted successfully", + **result + } + except Exception as e: + logger.error(f"Error deleting room {room_name}: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + @app.get("/rooms/{room_name}") async def get_room(room_name: str, service: RoomService = Depends(get_room_service)): """Get detailed room information""" diff --git a/microservices/sensor-service/models.py b/microservices/sensor-service/models.py index cf5eb93..959f454 100644 --- a/microservices/sensor-service/models.py +++ b/microservices/sensor-service/models.py @@ -296,19 +296,82 @@ class AnalyticsSummary(BaseModel): datetime: lambda v: v.isoformat() } +# Room Management Models +class Room(BaseModel): + """Room model for database storage and API responses""" + name: str = Field(..., description="Unique room name") + description: Optional[str] = Field(None, description="Room description") + floor: Optional[str] = Field(None, description="Floor designation") + building: Optional[str] = Field(None, description="Building name") + area: Optional[float] = Field(None, description="Room area in square meters") + capacity: Optional[int] = Field(None, description="Maximum occupancy") + room_type: Optional[str] = Field(None, description="Room type (office, meeting, storage, etc.)") + + # Metadata + created_at: datetime = Field(default_factory=datetime.utcnow, description="Room creation timestamp") + updated_at: datetime = Field(default_factory=datetime.utcnow, description="Room update timestamp") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + +class RoomCreate(BaseModel): + """Model for creating new rooms""" + name: str = Field(..., description="Unique room name", min_length=1, max_length=100) + description: Optional[str] = Field(None, description="Room description", max_length=500) + floor: Optional[str] = Field(None, description="Floor designation", max_length=50) + building: Optional[str] = Field(None, description="Building name", max_length=100) + area: Optional[float] = Field(None, description="Room area in square meters", gt=0) + capacity: Optional[int] = Field(None, description="Maximum occupancy", gt=0) + room_type: Optional[str] = Field(None, description="Room type", max_length=50) + +class RoomUpdate(BaseModel): + """Model for updating existing rooms""" + description: Optional[str] = Field(None, description="Room description", max_length=500) + floor: Optional[str] = Field(None, description="Floor designation", max_length=50) + building: Optional[str] = Field(None, description="Building name", max_length=100) + area: Optional[float] = Field(None, description="Room area in square meters", gt=0) + capacity: Optional[int] = Field(None, description="Maximum occupancy", gt=0) + room_type: Optional[str] = Field(None, description="Room type", max_length=50) + +class RoomInfo(BaseModel): + """Comprehensive room information for API responses""" + name: str = Field(..., description="Room name") + description: Optional[str] = Field(None, description="Room description") + floor: Optional[str] = Field(None, description="Floor designation") + building: Optional[str] = Field(None, description="Building name") + area: Optional[float] = Field(None, description="Room area in square meters") + capacity: Optional[int] = Field(None, description="Maximum occupancy") + room_type: Optional[str] = Field(None, description="Room type") + + # Runtime information + sensor_count: int = Field(0, description="Number of sensors in room") + active_sensors: int = Field(0, description="Number of active sensors") + last_updated: Optional[datetime] = Field(None, description="Last metrics update") + + # Timestamps + created_at: datetime = Field(..., description="Room creation timestamp") + updated_at: datetime = Field(..., description="Room update timestamp") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + class HealthResponse(BaseModel): """Health check response""" service: str status: str timestamp: datetime version: str - + # Additional service-specific health metrics total_sensors: Optional[int] = None active_sensors: Optional[int] = None total_rooms: Optional[int] = None websocket_connections: Optional[int] = None - + class Config: json_encoders = { datetime: lambda v: v.isoformat() diff --git a/microservices/sensor-service/room_service.py b/microservices/sensor-service/room_service.py new file mode 100644 index 0000000..d952427 --- /dev/null +++ b/microservices/sensor-service/room_service.py @@ -0,0 +1,467 @@ +""" +Room service for managing rooms and room-level metrics +""" + +import logging +from datetime import datetime, timedelta +from typing import List, Dict, Any, Optional +import json + +logger = logging.getLogger(__name__) + +class RoomService: + """Service for managing rooms and room-level analytics""" + + def __init__(self, db, redis_client): + self.db = db + self.redis = redis_client + + async def get_all_room_names(self) -> List[str]: + """Get a simple list of all room names for dropdowns/selections""" + try: + # Get rooms from the rooms collection + room_cursor = self.db.rooms.find({}, {"name": 1}) + room_names = set() + + async for room in room_cursor: + room_names.add(room["name"]) + + # Also get rooms that exist only in sensor data (legacy support) + sensor_cursor = self.db.sensors.find( + {"room": {"$ne": None, "$exists": True}}, + {"room": 1} + ) + + async for sensor in sensor_cursor: + if sensor.get("room"): + room_names.add(sensor["room"]) + + # Convert to sorted list + return sorted(list(room_names)) + + except Exception as e: + logger.error(f"Error getting room names: {e}") + raise + + async def initialize_default_rooms(self) -> None: + """Initialize default rooms if none exist""" + try: + # Check if any rooms exist + room_count = await self.db.rooms.count_documents({}) + + if room_count == 0: + # Create default rooms + default_rooms = [ + {"name": "Conference Room A", "description": "Main conference room", "room_type": "meeting"}, + {"name": "Conference Room B", "description": "Secondary conference room", "room_type": "meeting"}, + {"name": "Office Floor 1", "description": "First floor office space", "room_type": "office"}, + {"name": "Office Floor 2", "description": "Second floor office space", "room_type": "office"}, + {"name": "Kitchen", "description": "Employee kitchen and break room", "room_type": "common"}, + {"name": "Lobby", "description": "Main entrance and reception", "room_type": "common"}, + {"name": "Server Room", "description": "IT equipment room", "room_type": "technical"}, + {"name": "Storage Room", "description": "General storage", "room_type": "storage"}, + {"name": "Meeting Room 1", "description": "Small meeting room", "room_type": "meeting"}, + {"name": "Meeting Room 2", "description": "Small meeting room", "room_type": "meeting"} + ] + + for room_data in default_rooms: + room_doc = { + **room_data, + "created_at": datetime.utcnow(), + "updated_at": datetime.utcnow() + } + await self.db.rooms.insert_one(room_doc) + + logger.info(f"Initialized {len(default_rooms)} default rooms") + + except Exception as e: + logger.error(f"Error initializing default rooms: {e}") + raise + + async def get_rooms(self) -> List[Dict[str, Any]]: + """Get all rooms with sensor counts and metrics""" + try: + # Get unique rooms from sensors + pipeline = [ + {"$group": {"_id": "$room", "sensor_count": {"$sum": 1}}}, + {"$match": {"_id": {"$ne": None}}} + ] + + cursor = self.db.sensors.aggregate(pipeline) + rooms = [] + + async for room_data in cursor: + room_name = room_data["_id"] + + # Get latest room metrics + latest_metrics = await self._get_latest_room_metrics(room_name) + + room_info = { + "name": room_name, + "sensor_count": room_data["sensor_count"], + "latest_metrics": latest_metrics, + "last_updated": latest_metrics.get("timestamp") if latest_metrics else None + } + + rooms.append(room_info) + + return rooms + + except Exception as e: + logger.error(f"Error getting rooms: {e}") + raise + + async def create_room(self, room_data: Dict[str, Any]) -> Dict[str, Any]: + """Create a new room""" + try: + room_doc = { + "name": room_data.get("name"), + "description": room_data.get("description", ""), + "floor": room_data.get("floor"), + "building": room_data.get("building"), + "area": room_data.get("area"), + "capacity": room_data.get("capacity"), + "room_type": room_data.get("room_type"), + "created_at": datetime.utcnow(), + "updated_at": datetime.utcnow() + } + + # Validate required fields + if not room_doc["name"] or not room_doc["name"].strip(): + raise ValueError("Room name is required") + + # Check if room already exists + existing = await self.db.rooms.find_one({"name": room_doc["name"]}) + if existing: + raise ValueError(f"Room {room_doc['name']} already exists") + + result = await self.db.rooms.insert_one(room_doc) + + return { + "id": str(result.inserted_id), + "name": room_doc["name"], + "created_at": room_doc["created_at"] + } + + except Exception as e: + logger.error(f"Error creating room: {e}") + raise + + async def update_room(self, room_name: str, room_data: Dict[str, Any]) -> Dict[str, Any]: + """Update an existing room""" + try: + # Check if room exists + existing = await self.db.rooms.find_one({"name": room_name}) + if not existing: + raise ValueError(f"Room {room_name} not found") + + # Prepare update document + update_doc = { + "updated_at": datetime.utcnow() + } + + # Update only provided fields + for field in ["description", "floor", "building", "area", "capacity", "room_type"]: + if field in room_data and room_data[field] is not None: + update_doc[field] = room_data[field] + + # Perform update + result = await self.db.rooms.update_one( + {"name": room_name}, + {"$set": update_doc} + ) + + if result.modified_count == 0: + logger.warning(f"No changes made to room {room_name}") + + return { + "name": room_name, + "updated_at": update_doc["updated_at"], + "modified": result.modified_count > 0 + } + + except Exception as e: + logger.error(f"Error updating room: {e}") + raise + + async def delete_room(self, room_name: str) -> Dict[str, Any]: + """Delete a room and optionally reassign sensors""" + try: + # Check if room exists + existing = await self.db.rooms.find_one({"name": room_name}) + + # Check for sensors in this room + sensors_in_room = await self.db.sensors.find({"room": room_name}).to_list(None) + + if sensors_in_room: + # Update sensors to have null room (don't delete sensors) + await self.db.sensors.update_many( + {"room": room_name}, + {"$unset": {"room": ""}} + ) + + # Delete room from rooms collection if it exists + room_deleted = False + if existing: + result = await self.db.rooms.delete_one({"name": room_name}) + room_deleted = result.deleted_count > 0 + + # Delete room metrics + metrics_result = await self.db.room_metrics.delete_many({"room": room_name}) + + return { + "room": room_name, + "room_deleted": room_deleted, + "sensors_updated": len(sensors_in_room), + "metrics_deleted": metrics_result.deleted_count + } + + except Exception as e: + logger.error(f"Error deleting room: {e}") + raise + + async def get_room_details(self, room_name: str) -> Optional[Dict[str, Any]]: + """Get detailed room information""" + try: + # Get room info + room = await self.db.rooms.find_one({"name": room_name}) + + if not room: + # Create basic room info from sensor data + sensors = await self.db.sensors.find({"room": room_name}).to_list(None) + if not sensors: + return None + + room = { + "name": room_name, + "description": f"Room with {len(sensors)} sensors", + "sensor_count": len(sensors) + } + else: + room["_id"] = str(room["_id"]) + + # Get sensor count + sensor_count = await self.db.sensors.count_documents({"room": room_name}) + room["sensor_count"] = sensor_count + + # Get sensors in this room + cursor = self.db.sensors.find({"room": room_name}) + sensors = [] + async for sensor in cursor: + sensor["_id"] = str(sensor["_id"]) + sensors.append(sensor) + + room["sensors"] = sensors + + # Get recent room metrics + room["recent_metrics"] = await self._get_recent_room_metrics(room_name, hours=24) + + return room + + except Exception as e: + logger.error(f"Error getting room details: {e}") + raise + + async def get_room_data(self, room_name: str, start_time: Optional[int] = None, + end_time: Optional[int] = None, limit: int = 100) -> Dict[str, Any]: + """Get historical data for a room""" + try: + # Get room metrics + room_query = {"room": room_name} + if start_time or end_time: + room_query["timestamp"] = {} + if start_time: + room_query["timestamp"]["$gte"] = start_time + if end_time: + room_query["timestamp"]["$lte"] = end_time + + room_metrics_cursor = self.db.room_metrics.find(room_query).sort("timestamp", -1).limit(limit) + room_metrics = [] + async for metric in room_metrics_cursor: + metric["_id"] = str(metric["_id"]) + room_metrics.append(metric) + + # Get sensor readings for this room + sensor_query = {"room": room_name} + if start_time or end_time: + sensor_query["timestamp"] = {} + if start_time: + sensor_query["timestamp"]["$gte"] = start_time + if end_time: + sensor_query["timestamp"]["$lte"] = end_time + + sensor_readings_cursor = self.db.sensor_readings.find(sensor_query).sort("timestamp", -1).limit(limit) + sensor_readings = [] + async for reading in sensor_readings_cursor: + reading["_id"] = str(reading["_id"]) + sensor_readings.append(reading) + + return { + "room_metrics": room_metrics, + "sensor_readings": sensor_readings + } + + except Exception as e: + logger.error(f"Error getting room data: {e}") + raise + + async def update_room_metrics(self, sensor_data): + """Update room-level metrics when sensor data is received""" + try: + if not sensor_data.room: + return + + # Calculate room-level aggregates + room_metrics = await self._calculate_room_metrics(sensor_data.room) + + if room_metrics: + # Store room metrics + metrics_doc = { + "room": sensor_data.room, + "timestamp": sensor_data.timestamp, + "total_energy": room_metrics.get("total_energy", 0), + "average_temperature": room_metrics.get("avg_temperature"), + "co2_level": room_metrics.get("co2_level"), + "occupancy_estimate": room_metrics.get("occupancy_estimate"), + "sensor_count": room_metrics.get("sensor_count", 0), + "created_at": datetime.utcnow() + } + + await self.db.room_metrics.insert_one(metrics_doc) + + # Cache latest metrics + if self.redis: + cache_key = f"room:{sensor_data.room}:latest_metrics" + await self.redis.setex(cache_key, 3600, json.dumps(metrics_doc, default=str)) + + except Exception as e: + logger.error(f"Error updating room metrics: {e}") + + async def aggregate_all_room_metrics(self): + """Aggregate metrics for all rooms""" + try: + # Get all unique rooms + pipeline = [{"$group": {"_id": "$room"}}] + cursor = self.db.sensors.aggregate(pipeline) + + async for room_data in cursor: + room_name = room_data["_id"] + if room_name: + await self._calculate_room_metrics(room_name) + + except Exception as e: + logger.error(f"Error aggregating room metrics: {e}") + + async def _get_latest_room_metrics(self, room_name: str) -> Optional[Dict[str, Any]]: + """Get latest room metrics""" + try: + # Try Redis cache first + if self.redis: + cache_key = f"room:{room_name}:latest_metrics" + cached = await self.redis.get(cache_key) + if cached: + return json.loads(cached) + + # Fall back to database + latest = await self.db.room_metrics.find_one( + {"room": room_name}, + sort=[("timestamp", -1)] + ) + + if latest: + latest["_id"] = str(latest["_id"]) + return latest + + return None + + except Exception as e: + logger.error(f"Error getting latest room metrics: {e}") + return None + + async def _get_recent_room_metrics(self, room_name: str, hours: int = 24) -> List[Dict[str, Any]]: + """Get recent room metrics""" + try: + start_time = datetime.utcnow() - timedelta(hours=hours) + + cursor = self.db.room_metrics.find({ + "room": room_name, + "created_at": {"$gte": start_time} + }).sort("timestamp", -1) + + metrics = [] + async for metric in cursor: + metric["_id"] = str(metric["_id"]) + metrics.append(metric) + + return metrics + + except Exception as e: + logger.error(f"Error getting recent room metrics: {e}") + return [] + + async def _calculate_room_metrics(self, room_name: str) -> Dict[str, Any]: + """Calculate aggregated metrics for a room""" + try: + # Get recent sensor readings (last 5 minutes) + five_minutes_ago = datetime.utcnow() - timedelta(minutes=5) + + pipeline = [ + { + "$match": { + "room": room_name, + "created_at": {"$gte": five_minutes_ago} + } + }, + { + "$group": { + "_id": "$sensor_id", + "latest_value": {"$last": "$value"}, + "sensor_type": {"$last": "$sensor_type"} if "sensor_type" in ["$first", "$last"] else {"$first": "energy"}, + "unit": {"$last": "$unit"} + } + } + ] + + cursor = self.db.sensor_readings.aggregate(pipeline) + + total_energy = 0 + temperatures = [] + co2_levels = [] + sensor_count = 0 + + async for sensor_data in cursor: + sensor_count += 1 + value = sensor_data.get("latest_value", 0) + sensor_type = sensor_data.get("sensor_type", "energy") + + if sensor_type == "energy" or "energy" in str(sensor_data.get("unit", "")).lower(): + total_energy += value + elif sensor_type == "temperature": + temperatures.append(value) + elif sensor_type == "co2": + co2_levels.append(value) + + metrics = { + "total_energy": total_energy, + "sensor_count": sensor_count, + "avg_temperature": sum(temperatures) / len(temperatures) if temperatures else None, + "co2_level": sum(co2_levels) / len(co2_levels) if co2_levels else None, + "occupancy_estimate": self._estimate_occupancy(sensor_count, total_energy) + } + + return metrics + + except Exception as e: + logger.error(f"Error calculating room metrics: {e}") + return {} + + def _estimate_occupancy(self, sensor_count: int, total_energy: float) -> Optional[str]: + """Estimate occupancy level based on energy consumption""" + if total_energy == 0: + return "vacant" + elif total_energy < sensor_count * 50: # Low threshold + return "low" + elif total_energy < sensor_count * 150: # Medium threshold + return "medium" + else: + return "high" \ No newline at end of file diff --git a/microservices/sensor-service/sensor_service.py b/microservices/sensor-service/sensor_service.py new file mode 100644 index 0000000..abc0e55 --- /dev/null +++ b/microservices/sensor-service/sensor_service.py @@ -0,0 +1,251 @@ +""" +Sensor service business logic +""" + +import logging +from datetime import datetime, timedelta +from typing import List, Dict, Any, Optional +import json + +logger = logging.getLogger(__name__) + +class SensorService: + """Service for managing sensors and sensor data""" + + def __init__(self, db, redis_client): + self.db = db + self.redis = redis_client + + async def get_sensors(self, room: Optional[str] = None, sensor_type: Optional[str] = None, status: Optional[str] = None) -> List[Dict[str, Any]]: + """Get sensors with optional filtering""" + try: + query = {} + + if room: + query["room"] = room + if sensor_type: + query["sensor_type"] = sensor_type + if status: + query["status"] = status + + cursor = self.db.sensors.find(query) + sensors = [] + + async for sensor in cursor: + sensor["_id"] = str(sensor["_id"]) + sensors.append(sensor) + + return sensors + + except Exception as e: + logger.error(f"Error getting sensors: {e}") + raise + + async def get_sensor_details(self, sensor_id: str) -> Optional[Dict[str, Any]]: + """Get detailed sensor information""" + try: + sensor = await self.db.sensors.find_one({"sensor_id": sensor_id}) + + if sensor: + sensor["_id"] = str(sensor["_id"]) + + # Get recent readings + recent_readings = await self.get_sensor_data(sensor_id, limit=10) + sensor["recent_readings"] = recent_readings.get("readings", []) + + return sensor + + return None + + except Exception as e: + logger.error(f"Error getting sensor details: {e}") + raise + + async def get_sensor_data(self, sensor_id: str, start_time: Optional[int] = None, + end_time: Optional[int] = None, limit: int = 100, offset: int = 0) -> Dict[str, Any]: + """Get historical sensor data""" + try: + query = {"sensor_id": sensor_id} + + if start_time or end_time: + query["timestamp"] = {} + if start_time: + query["timestamp"]["$gte"] = start_time + if end_time: + query["timestamp"]["$lte"] = end_time + + # Get total count + total_count = await self.db.sensor_readings.count_documents(query) + + # Get readings + cursor = self.db.sensor_readings.find(query).sort("timestamp", -1).skip(offset).limit(limit) + readings = [] + + async for reading in cursor: + reading["_id"] = str(reading["_id"]) + readings.append(reading) + + return { + "readings": readings, + "total_count": total_count, + "execution_time_ms": 0 # Placeholder + } + + except Exception as e: + logger.error(f"Error getting sensor data: {e}") + raise + + async def create_sensor(self, sensor_data) -> Dict[str, Any]: + """Create a new sensor""" + try: + # Check if sensor already exists + existing = await self.db.sensors.find_one({"sensor_id": sensor_data.sensor_id}) + if existing: + raise ValueError(f"Sensor {sensor_data.sensor_id} already exists") + + # Create sensor document + sensor_doc = { + "sensor_id": sensor_data.sensor_id, + "name": sensor_data.name, + "sensor_type": sensor_data.sensor_type.value if hasattr(sensor_data.sensor_type, 'value') else str(sensor_data.sensor_type), + "room": sensor_data.room, + "location": sensor_data.location if hasattr(sensor_data, 'location') else None, + "status": "active", + "created_at": datetime.utcnow(), + "updated_at": datetime.utcnow() + } + + result = await self.db.sensors.insert_one(sensor_doc) + + return {"created_at": datetime.utcnow()} + + except Exception as e: + logger.error(f"Error creating sensor: {e}") + raise + + async def update_sensor(self, sensor_id: str, update_data: Dict[str, Any]) -> bool: + """Update sensor metadata""" + try: + update_data["updated_at"] = datetime.utcnow() + + result = await self.db.sensors.update_one( + {"sensor_id": sensor_id}, + {"$set": update_data} + ) + + return result.modified_count > 0 + + except Exception as e: + logger.error(f"Error updating sensor: {e}") + raise + + async def delete_sensor(self, sensor_id: str) -> Dict[str, Any]: + """Delete a sensor and its data""" + try: + # Delete readings + readings_result = await self.db.sensor_readings.delete_many({"sensor_id": sensor_id}) + + # Delete sensor + await self.db.sensors.delete_one({"sensor_id": sensor_id}) + + return {"readings_deleted": readings_result.deleted_count} + + except Exception as e: + logger.error(f"Error deleting sensor: {e}") + raise + + async def ingest_sensor_data(self, sensor_data) -> Dict[str, Any]: + """Ingest real-time sensor data""" + try: + # Create reading document + reading_doc = { + "sensor_id": sensor_data.sensor_id, + "timestamp": sensor_data.timestamp, + "value": sensor_data.value, + "unit": sensor_data.unit if hasattr(sensor_data, 'unit') else None, + "room": sensor_data.room if hasattr(sensor_data, 'room') else None, + "created_at": datetime.utcnow() + } + + # Store in database + await self.db.sensor_readings.insert_one(reading_doc) + + # Cache recent value in Redis + if self.redis: + cache_key = f"sensor:{sensor_data.sensor_id}:latest" + await self.redis.setex(cache_key, 3600, json.dumps(reading_doc, default=str)) + + return {"success": True} + + except Exception as e: + logger.error(f"Error ingesting sensor data: {e}") + raise + + async def export_data(self, start_time: int, end_time: int, sensor_ids: Optional[str] = None, + format: str = "json") -> Dict[str, Any]: + """Export sensor data""" + try: + query = { + "timestamp": {"$gte": start_time, "$lte": end_time} + } + + if sensor_ids: + sensor_list = [s.strip() for s in sensor_ids.split(",")] + query["sensor_id"] = {"$in": sensor_list} + + cursor = self.db.sensor_readings.find(query).sort("timestamp", 1) + readings = [] + + async for reading in cursor: + reading["_id"] = str(reading["_id"]) + readings.append(reading) + + return { + "format": format, + "data": readings, + "total_records": len(readings), + "period": {"start": start_time, "end": end_time} + } + + except Exception as e: + logger.error(f"Error exporting data: {e}") + raise + + async def get_events(self, severity: Optional[str] = None, event_type: Optional[str] = None, + hours: int = 24, limit: int = 50) -> List[Dict[str, Any]]: + """Get system events""" + try: + start_time = datetime.utcnow() - timedelta(hours=hours) + + query = {"timestamp": {"$gte": start_time}} + + if severity: + query["severity"] = severity + if event_type: + query["event_type"] = event_type + + cursor = self.db.system_events.find(query).sort("timestamp", -1).limit(limit) + events = [] + + async for event in cursor: + event["_id"] = str(event["_id"]) + events.append(event) + + return events + + except Exception as e: + logger.error(f"Error getting events: {e}") + return [] + + async def cleanup_old_data(self, cutoff_date: datetime): + """Clean up old sensor data""" + try: + result = await self.db.sensor_readings.delete_many({ + "created_at": {"$lt": cutoff_date} + }) + + logger.info(f"Cleaned up {result.deleted_count} old sensor readings") + + except Exception as e: + logger.error(f"Error cleaning up old data: {e}") + raise \ No newline at end of file