""" Room service for managing rooms and room-level metrics """ import logging from datetime import datetime, timedelta from typing import List, Dict, Any, Optional import json logger = logging.getLogger(__name__) class RoomService: """Service for managing rooms and room-level analytics""" def __init__(self, db, redis_client): self.db = db self.redis = redis_client async def get_all_room_names(self) -> List[str]: """Get a simple list of all room names for dropdowns/selections""" try: # Get rooms from the rooms collection room_cursor = self.db.rooms.find({}, {"name": 1}) room_names = set() async for room in room_cursor: room_names.add(room["name"]) # Also get rooms that exist only in sensor data (legacy support) sensor_cursor = self.db.sensors.find( {"room": {"$ne": None, "$exists": True}}, {"room": 1} ) async for sensor in sensor_cursor: if sensor.get("room"): room_names.add(sensor["room"]) # Convert to sorted list return sorted(list(room_names)) except Exception as e: logger.error(f"Error getting room names: {e}") raise async def initialize_default_rooms(self) -> None: """Initialize default rooms if none exist""" try: # Check if any rooms exist room_count = await self.db.rooms.count_documents({}) if room_count == 0: # Create default rooms default_rooms = [ {"name": "Conference Room A", "description": "Main conference room", "room_type": "meeting"}, {"name": "Conference Room B", "description": "Secondary conference room", "room_type": "meeting"}, {"name": "Office Floor 1", "description": "First floor office space", "room_type": "office"}, {"name": "Office Floor 2", "description": "Second floor office space", "room_type": "office"}, {"name": "Kitchen", "description": "Employee kitchen and break room", "room_type": "common"}, {"name": "Lobby", "description": "Main entrance and reception", "room_type": "common"}, {"name": "Server Room", "description": "IT equipment room", "room_type": "technical"}, {"name": "Storage Room", "description": "General storage", "room_type": "storage"}, {"name": "Meeting Room 1", "description": "Small meeting room", "room_type": "meeting"}, {"name": "Meeting Room 2", "description": "Small meeting room", "room_type": "meeting"} ] for room_data in default_rooms: room_doc = { **room_data, "created_at": datetime.utcnow(), "updated_at": datetime.utcnow() } await self.db.rooms.insert_one(room_doc) logger.info(f"Initialized {len(default_rooms)} default rooms") except Exception as e: logger.error(f"Error initializing default rooms: {e}") raise async def get_rooms(self) -> List[Dict[str, Any]]: """Get all rooms with sensor counts and metrics""" try: # Get unique rooms from sensors pipeline = [ {"$group": {"_id": "$room", "sensor_count": {"$sum": 1}}}, {"$match": {"_id": {"$ne": None}}} ] cursor = self.db.sensors.aggregate(pipeline) rooms = [] async for room_data in cursor: room_name = room_data["_id"] # Get latest room metrics latest_metrics = await self._get_latest_room_metrics(room_name) room_info = { "name": room_name, "sensor_count": room_data["sensor_count"], "latest_metrics": latest_metrics, "last_updated": latest_metrics.get("timestamp") if latest_metrics else None } rooms.append(room_info) return rooms except Exception as e: logger.error(f"Error getting rooms: {e}") raise async def create_room(self, room_data: Dict[str, Any]) -> Dict[str, Any]: """Create a new room""" try: room_doc = { "name": room_data.get("name"), "description": room_data.get("description", ""), "floor": room_data.get("floor"), "building": room_data.get("building"), "area": room_data.get("area"), "capacity": room_data.get("capacity"), "room_type": room_data.get("room_type"), "created_at": datetime.utcnow(), "updated_at": datetime.utcnow() } # Validate required fields if not room_doc["name"] or not room_doc["name"].strip(): raise ValueError("Room name is required") # Check if room already exists existing = await self.db.rooms.find_one({"name": room_doc["name"]}) if existing: raise ValueError(f"Room {room_doc['name']} already exists") result = await self.db.rooms.insert_one(room_doc) return { "id": str(result.inserted_id), "name": room_doc["name"], "created_at": room_doc["created_at"] } except Exception as e: logger.error(f"Error creating room: {e}") raise async def update_room(self, room_name: str, room_data: Dict[str, Any]) -> Dict[str, Any]: """Update an existing room""" try: # Check if room exists existing = await self.db.rooms.find_one({"name": room_name}) if not existing: raise ValueError(f"Room {room_name} not found") # Prepare update document update_doc = { "updated_at": datetime.utcnow() } # Update only provided fields for field in ["description", "floor", "building", "area", "capacity", "room_type"]: if field in room_data and room_data[field] is not None: update_doc[field] = room_data[field] # Perform update result = await self.db.rooms.update_one( {"name": room_name}, {"$set": update_doc} ) if result.modified_count == 0: logger.warning(f"No changes made to room {room_name}") return { "name": room_name, "updated_at": update_doc["updated_at"], "modified": result.modified_count > 0 } except Exception as e: logger.error(f"Error updating room: {e}") raise async def delete_room(self, room_name: str) -> Dict[str, Any]: """Delete a room and optionally reassign sensors""" try: # Check if room exists existing = await self.db.rooms.find_one({"name": room_name}) # Check for sensors in this room sensors_in_room = await self.db.sensors.find({"room": room_name}).to_list(None) if sensors_in_room: # Update sensors to have null room (don't delete sensors) await self.db.sensors.update_many( {"room": room_name}, {"$unset": {"room": ""}} ) # Delete room from rooms collection if it exists room_deleted = False if existing: result = await self.db.rooms.delete_one({"name": room_name}) room_deleted = result.deleted_count > 0 # Delete room metrics metrics_result = await self.db.room_metrics.delete_many({"room": room_name}) return { "room": room_name, "room_deleted": room_deleted, "sensors_updated": len(sensors_in_room), "metrics_deleted": metrics_result.deleted_count } except Exception as e: logger.error(f"Error deleting room: {e}") raise async def get_room_details(self, room_name: str) -> Optional[Dict[str, Any]]: """Get detailed room information""" try: # Get room info room = await self.db.rooms.find_one({"name": room_name}) if not room: # Create basic room info from sensor data sensors = await self.db.sensors.find({"room": room_name}).to_list(None) if not sensors: return None room = { "name": room_name, "description": f"Room with {len(sensors)} sensors", "sensor_count": len(sensors) } else: room["_id"] = str(room["_id"]) # Get sensor count sensor_count = await self.db.sensors.count_documents({"room": room_name}) room["sensor_count"] = sensor_count # Get sensors in this room cursor = self.db.sensors.find({"room": room_name}) sensors = [] async for sensor in cursor: sensor["_id"] = str(sensor["_id"]) sensors.append(sensor) room["sensors"] = sensors # Get recent room metrics room["recent_metrics"] = await self._get_recent_room_metrics(room_name, hours=24) return room except Exception as e: logger.error(f"Error getting room details: {e}") raise async def get_room_data(self, room_name: str, start_time: Optional[int] = None, end_time: Optional[int] = None, limit: int = 100) -> Dict[str, Any]: """Get historical data for a room""" try: # Get room metrics room_query = {"room": room_name} if start_time or end_time: room_query["timestamp"] = {} if start_time: room_query["timestamp"]["$gte"] = start_time if end_time: room_query["timestamp"]["$lte"] = end_time room_metrics_cursor = self.db.room_metrics.find(room_query).sort("timestamp", -1).limit(limit) room_metrics = [] async for metric in room_metrics_cursor: metric["_id"] = str(metric["_id"]) room_metrics.append(metric) # Get sensor readings for this room sensor_query = {"room": room_name} if start_time or end_time: sensor_query["timestamp"] = {} if start_time: sensor_query["timestamp"]["$gte"] = start_time if end_time: sensor_query["timestamp"]["$lte"] = end_time sensor_readings_cursor = self.db.sensor_readings.find(sensor_query).sort("timestamp", -1).limit(limit) sensor_readings = [] async for reading in sensor_readings_cursor: reading["_id"] = str(reading["_id"]) sensor_readings.append(reading) return { "room_metrics": room_metrics, "sensor_readings": sensor_readings } except Exception as e: logger.error(f"Error getting room data: {e}") raise async def update_room_metrics(self, sensor_data): """Update room-level metrics when sensor data is received""" try: if not sensor_data.room: return # Calculate room-level aggregates room_metrics = await self._calculate_room_metrics(sensor_data.room) if room_metrics: # Store room metrics metrics_doc = { "room": sensor_data.room, "timestamp": sensor_data.timestamp, "total_energy": room_metrics.get("total_energy", 0), "average_temperature": room_metrics.get("avg_temperature"), "co2_level": room_metrics.get("co2_level"), "occupancy_estimate": room_metrics.get("occupancy_estimate"), "sensor_count": room_metrics.get("sensor_count", 0), "created_at": datetime.utcnow() } await self.db.room_metrics.insert_one(metrics_doc) # Cache latest metrics if self.redis: cache_key = f"room:{sensor_data.room}:latest_metrics" await self.redis.setex(cache_key, 3600, json.dumps(metrics_doc, default=str)) except Exception as e: logger.error(f"Error updating room metrics: {e}") async def aggregate_all_room_metrics(self): """Aggregate metrics for all rooms""" try: # Get all unique rooms pipeline = [{"$group": {"_id": "$room"}}] cursor = self.db.sensors.aggregate(pipeline) async for room_data in cursor: room_name = room_data["_id"] if room_name: await self._calculate_room_metrics(room_name) except Exception as e: logger.error(f"Error aggregating room metrics: {e}") async def _get_latest_room_metrics(self, room_name: str) -> Optional[Dict[str, Any]]: """Get latest room metrics""" try: # Try Redis cache first if self.redis: cache_key = f"room:{room_name}:latest_metrics" cached = await self.redis.get(cache_key) if cached: return json.loads(cached) # Fall back to database latest = await self.db.room_metrics.find_one( {"room": room_name}, sort=[("timestamp", -1)] ) if latest: latest["_id"] = str(latest["_id"]) return latest return None except Exception as e: logger.error(f"Error getting latest room metrics: {e}") return None async def _get_recent_room_metrics(self, room_name: str, hours: int = 24) -> List[Dict[str, Any]]: """Get recent room metrics""" try: start_time = datetime.utcnow() - timedelta(hours=hours) cursor = self.db.room_metrics.find({ "room": room_name, "created_at": {"$gte": start_time} }).sort("timestamp", -1) metrics = [] async for metric in cursor: metric["_id"] = str(metric["_id"]) metrics.append(metric) return metrics except Exception as e: logger.error(f"Error getting recent room metrics: {e}") return [] async def _calculate_room_metrics(self, room_name: str) -> Dict[str, Any]: """Calculate aggregated metrics for a room""" try: # Get recent sensor readings (last 5 minutes) five_minutes_ago = datetime.utcnow() - timedelta(minutes=5) pipeline = [ { "$match": { "room": room_name, "created_at": {"$gte": five_minutes_ago} } }, { "$group": { "_id": "$sensor_id", "latest_value": {"$last": "$value"}, "sensor_type": {"$last": "$sensor_type"} if "sensor_type" in ["$first", "$last"] else {"$first": "energy"}, "unit": {"$last": "$unit"} } } ] cursor = self.db.sensor_readings.aggregate(pipeline) total_energy = 0 temperatures = [] co2_levels = [] sensor_count = 0 async for sensor_data in cursor: sensor_count += 1 value = sensor_data.get("latest_value", 0) sensor_type = sensor_data.get("sensor_type", "energy") if sensor_type == "energy" or "energy" in str(sensor_data.get("unit", "")).lower(): total_energy += value elif sensor_type == "temperature": temperatures.append(value) elif sensor_type == "co2": co2_levels.append(value) metrics = { "total_energy": total_energy, "sensor_count": sensor_count, "avg_temperature": sum(temperatures) / len(temperatures) if temperatures else None, "co2_level": sum(co2_levels) / len(co2_levels) if co2_levels else None, "occupancy_estimate": self._estimate_occupancy(sensor_count, total_energy) } return metrics except Exception as e: logger.error(f"Error calculating room metrics: {e}") return {} def _estimate_occupancy(self, sensor_count: int, total_energy: float) -> Optional[str]: """Estimate occupancy level based on energy consumption""" if total_energy == 0: return "vacant" elif total_energy < sensor_count * 50: # Low threshold return "low" elif total_energy < sensor_count * 150: # Medium threshold return "medium" else: return "high"