import json
import asyncio
import logging
import uuid
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional

from pymongo.errors import DuplicateKeyError

from database import get_database, redis_manager
from models import (
    SensorReading, LegacySensorReading, SensorMetadata, RoomMetrics,
    SystemEvent, SensorType, SensorStatus, CO2Status, OccupancyLevel
)

logger = logging.getLogger(__name__)


class DataPersistenceService:
    """Service for persisting sensor data to MongoDB and managing the Redis cache.

    Responsibilities per incoming message:
      * parse and normalize the payload (legacy or current format),
      * persist the reading to MongoDB (deduplicated by sensor_id + timestamp),
      * refresh the Redis real-time cache and sensor-status keys,
      * upsert sensor metadata,
      * recompute and store per-room aggregate metrics,
      * raise system events for alert conditions.

    All persistence helpers are best-effort: they log failures rather than
    propagate them, except `_store_sensor_reading`, which re-raises so the
    caller's error path (and error event) is triggered.
    """

    def __init__(self):
        # Database handle is bound lazily in `initialize()`.
        self.db = None
        self.redis = redis_manager

    async def initialize(self):
        """Acquire the database handle and connect Redis. Must run before use."""
        self.db = await get_database()
        await self.redis.connect()
        logger.info("Data persistence service initialized")

    async def process_sensor_message(self, message_data: str) -> bool:
        """Process one raw sensor message end-to-end.

        Args:
            message_data: JSON-encoded sensor payload (legacy or current format).

        Returns:
            True if the message was fully processed, False on any failure
            (the failure is also recorded as a `data_processing_error` event).
        """
        try:
            data = json.loads(message_data)
            logger.debug(f"Processing sensor message: {data}")

            # Normalize: legacy payloads are converted, current ones validated
            # directly by the SensorReading model.
            if self._is_legacy_format(data):
                sensor_reading = await self._convert_legacy_data(data)
            else:
                sensor_reading = SensorReading(**data)

            # Pipeline: durable store first, then caches and derived data.
            await self._store_sensor_reading(sensor_reading)
            await self._update_redis_cache(sensor_reading)
            await self._update_sensor_metadata(sensor_reading)
            await self._update_room_metrics(sensor_reading)
            await self._check_alerts(sensor_reading)

            return True

        except Exception as e:
            # Boundary handler: any failure in the pipeline is logged and
            # recorded as a system event; the raw message is kept for triage.
            logger.error(f"Error processing sensor message: {e}")
            await self._log_system_event(
                event_type="data_processing_error",
                severity="error",
                title="Sensor Data Processing Failed",
                description=f"Failed to process sensor message: {str(e)}",
                data={"raw_message": message_data}
            )
            return False

    def _is_legacy_format(self, data: dict) -> bool:
        """Return True if `data` looks like a legacy reading.

        Legacy payloads carry flat sensorId/timestamp/value/unit keys and no
        nested "energy" object (current-format energy readings have both).
        """
        legacy_keys = {"sensorId", "timestamp", "value", "unit"}
        return legacy_keys.issubset(data.keys()) and "energy" not in data

    async def _convert_legacy_data(self, data: dict) -> SensorReading:
        """Convert a legacy payload into the current SensorReading format.

        Legacy readings carry a single value/unit pair and are assumed to be
        energy measurements (the only sensor type the legacy system emitted).
        """
        legacy_reading = LegacySensorReading(**data)
        return SensorReading(
            sensor_id=legacy_reading.sensor_id,
            sensor_type=SensorType.ENERGY,  # legacy data is always energy
            timestamp=legacy_reading.timestamp,
            created_at=legacy_reading.created_at,
            energy={
                "value": legacy_reading.value,
                "unit": legacy_reading.unit
            }
        )

    async def _store_sensor_reading(self, reading: SensorReading):
        """Insert the reading into MongoDB, deduplicating on (sensor, timestamp).

        Raises:
            Any storage error other than DuplicateKeyError (duplicates are a
            normal consequence of at-least-once delivery and are ignored).
        """
        try:
            reading_dict = reading.dict()
            # Deterministic _id makes redelivered messages a duplicate-key
            # no-op instead of a second document.
            reading_dict["_id"] = f"{reading.sensor_id}_{reading.timestamp}"
            await self.db.sensor_readings.insert_one(reading_dict)
            logger.debug(f"Stored sensor reading for {reading.sensor_id}")
        except DuplicateKeyError:
            logger.debug(f"Duplicate reading ignored for {reading.sensor_id} at {reading.timestamp}")
        except Exception as e:
            logger.error(f"Error storing sensor reading: {e}")
            raise

    async def _update_redis_cache(self, reading: SensorReading):
        """Refresh the Redis cache with the latest reading and sensor status."""
        try:
            # Latest full reading, for real-time dashboards (1 h TTL).
            await self.redis.set_sensor_data(
                reading.sensor_id,
                reading.dict(),
                expire_time=3600
            )

            # Lightweight liveness record (30 min TTL).
            # NOTE(review): `default=str` is required because `timestamp` may
            # be a datetime (it is compared with datetimes elsewhere in this
            # file), which json.dumps cannot serialize natively — confirm the
            # model's timestamp type against `models.SensorReading`.
            status_key = f"sensor:status:{reading.sensor_id}"
            await self.redis.redis_client.setex(
                status_key,
                1800,
                json.dumps(
                    {
                        "status": "online",
                        "last_seen": reading.timestamp,
                        "room": reading.room
                    },
                    default=str
                )
            )
        except Exception as e:
            logger.error(f"Error updating Redis cache: {e}")

    async def _update_sensor_metadata(self, reading: SensorReading):
        """Upsert the sensor's metadata record (liveness, capabilities)."""
        try:
            existing = await self.db.sensor_metadata.find_one({"sensor_id": reading.sensor_id})

            if existing:
                # Known sensor: bump liveness and record the capability
                # ($addToSet keeps the capability list duplicate-free).
                await self.db.sensor_metadata.update_one(
                    {"sensor_id": reading.sensor_id},
                    {
                        "$set": {
                            "last_seen": datetime.utcnow(),
                            "status": SensorStatus.ONLINE.value,
                            "updated_at": datetime.utcnow()
                        },
                        "$addToSet": {
                            "monitoring_capabilities": reading.sensor_type.value
                        }
                    }
                )
            else:
                # First sighting: create a metadata document with defaults.
                metadata = SensorMetadata(
                    sensor_id=reading.sensor_id,
                    name=f"Sensor {reading.sensor_id}",
                    sensor_type=reading.sensor_type,
                    room=reading.room,
                    status=SensorStatus.ONLINE,
                    last_seen=datetime.utcnow(),
                    monitoring_capabilities=[reading.sensor_type.value]
                )
                await self.db.sensor_metadata.insert_one(metadata.dict())
                logger.info(f"Created metadata for new sensor: {reading.sensor_id}")

        except Exception as e:
            logger.error(f"Error updating sensor metadata: {e}")

    async def _update_room_metrics(self, reading: SensorReading):
        """Recompute room aggregates from the last 5 minutes of readings.

        No-op when the reading has no room assignment or when the room has no
        recent readings. Results are stored in MongoDB and cached in Redis.
        """
        if not reading.room:
            return

        try:
            recent_time = datetime.utcnow() - timedelta(minutes=5)
            cursor = self.db.sensor_readings.find({
                "room": reading.room,
                "created_at": {"$gte": recent_time}
            })
            recent_readings = await cursor.to_list(length=None)

            if not recent_readings:
                return

            metrics = await self._calculate_room_metrics(reading.room, recent_readings)

            await self.db.room_metrics.insert_one(metrics.dict())
            await self.redis.set_room_metrics(reading.room, metrics.dict())

            logger.debug(f"Updated room metrics for {reading.room}")

        except Exception as e:
            logger.error(f"Error updating room metrics: {e}")

    async def _calculate_room_metrics(self, room: str, readings: List[Dict]) -> RoomMetrics:
        """Aggregate raw reading documents into a RoomMetrics snapshot.

        Args:
            room: Room identifier the readings belong to.
            readings: Reading documents as stored in MongoDB.

        Returns:
            A RoomMetrics instance with per-measurement aggregates populated
            only for measurement kinds present in `readings`.
        """
        # Group readings by sensor so sensor_count/active_sensors are distinct.
        sensors_data: Dict[str, List[Dict]] = {}
        for doc in readings:
            sensors_data.setdefault(doc["sensor_id"], []).append(doc)

        energy_values: List[float] = []
        co2_values: List[float] = []
        temperature_values: List[float] = []
        humidity_values: List[float] = []
        motion_detected = False

        # Collect per-kind values across all sensors in the window.
        for sensor_readings in sensors_data.values():
            for doc in sensor_readings:
                if doc.get("energy"):
                    energy_values.append(doc["energy"]["value"])
                if doc.get("co2"):
                    co2_values.append(doc["co2"]["value"])
                if doc.get("temperature"):
                    temperature_values.append(doc["temperature"]["value"])
                if doc.get("humidity"):
                    humidity_values.append(doc["humidity"]["value"])
                if doc.get("motion") and doc["motion"].get("value") == "Detected":
                    motion_detected = True

        metrics = RoomMetrics(
            room=room,
            timestamp=int(datetime.utcnow().timestamp()),
            sensor_count=len(sensors_data),
            active_sensors=list(sensors_data.keys()),
            sensor_types=list(set(
                doc.get("sensor_type") for doc in readings if doc.get("sensor_type")
            )),
            motion_detected=motion_detected
        )

        # Energy: "current" and "total" are both the window sum — the window
        # is treated as one consumption period across all sensors.
        if energy_values:
            energy_total = sum(energy_values)
            metrics.energy = {
                "current": energy_total,
                "average": energy_total / len(energy_values),
                "total": energy_total,
                "peak": max(energy_values),
                "unit": "kWh"
            }

        if co2_values:
            avg_co2 = sum(co2_values) / len(co2_values)
            metrics.co2 = {
                "current": avg_co2,
                "average": avg_co2,
                "max": max(co2_values),
                "min": min(co2_values),
                "status": self._get_co2_status(avg_co2).value,
                "unit": "ppm"
            }
            # CO2 concentration doubles as a rough occupancy proxy.
            metrics.occupancy_estimate = self._estimate_occupancy(avg_co2)

        if temperature_values:
            avg_temp = sum(temperature_values) / len(temperature_values)
            metrics.temperature = {
                "current": avg_temp,
                "average": avg_temp,
                "max": max(temperature_values),
                "min": min(temperature_values),
                "unit": "°C"
            }

        if humidity_values:
            avg_humidity = sum(humidity_values) / len(humidity_values)
            metrics.humidity = {
                "current": avg_humidity,
                "average": avg_humidity,
                "max": max(humidity_values),
                "min": min(humidity_values),
                "unit": "%"
            }

        return metrics

    def _get_co2_status(self, co2_level: float) -> CO2Status:
        """Classify a CO2 level (ppm) into GOOD/MODERATE/POOR/CRITICAL bands."""
        if co2_level < 400:
            return CO2Status.GOOD
        elif co2_level < 1000:
            return CO2Status.MODERATE
        elif co2_level < 5000:
            return CO2Status.POOR
        else:
            return CO2Status.CRITICAL

    def _estimate_occupancy(self, co2_level: float) -> OccupancyLevel:
        """Map a CO2 level (ppm) to a coarse LOW/MEDIUM/HIGH occupancy estimate."""
        if co2_level < 600:
            return OccupancyLevel.LOW
        elif co2_level < 1200:
            return OccupancyLevel.MEDIUM
        else:
            return OccupancyLevel.HIGH

    async def _check_alerts(self, reading: SensorReading):
        """Evaluate alert thresholds for a reading and log matching events.

        Thresholds: CO2 > 5000 ppm critical / > 1000 ppm warning; energy
        > 10 kWh warning; temperature outside 15–30 °C warning.
        """
        alerts = []

        if reading.co2:
            co2_level = reading.co2.get("value", 0)
            if co2_level > 5000:
                alerts.append({
                    "event_type": "co2_critical",
                    "severity": "critical",
                    "title": "Critical CO2 Level",
                    "description": f"CO2 level ({co2_level} ppm) exceeds critical threshold in {reading.room or 'unknown room'}"
                })
            elif co2_level > 1000:
                alerts.append({
                    "event_type": "co2_high",
                    "severity": "warning",
                    "title": "High CO2 Level",
                    "description": f"CO2 level ({co2_level} ppm) is above recommended levels in {reading.room or 'unknown room'}"
                })

        if reading.energy:
            energy_value = reading.energy.get("value", 0)
            if energy_value > 10:  # threshold for high energy consumption
                alerts.append({
                    "event_type": "energy_high",
                    "severity": "warning",
                    "title": "High Energy Consumption",
                    "description": f"Energy consumption ({energy_value} kWh) is unusually high for sensor {reading.sensor_id}"
                })

        if reading.temperature:
            temp_value = reading.temperature.get("value", 0)
            if temp_value > 30 or temp_value < 15:
                alerts.append({
                    "event_type": "temperature_extreme",
                    "severity": "warning",
                    "title": "Extreme Temperature",
                    "description": f"Temperature ({temp_value}°C) is outside normal range in {reading.room or 'unknown room'}"
                })

        for alert in alerts:
            await self._log_system_event(
                sensor_id=reading.sensor_id,
                room=reading.room,
                **alert,
                data=reading.dict()
            )

    async def _log_system_event(self, event_type: str, severity: str, title: str,
                                description: str, sensor_id: str = None,
                                room: str = None, source: str = None,
                                data: Dict = None):
        """Persist a SystemEvent document; failures are logged, never raised.

        Args:
            event_type: Machine-readable event category.
            severity: One of "info"/"warning"/"error"/"critical" (free-form).
            title: Short human-readable headline.
            description: Full event description.
            sensor_id: Originating sensor, if any.
            room: Affected room, if any.
            source: Emitting component; defaults to this service's name.
            data: Arbitrary context payload stored with the event.
        """
        try:
            event = SystemEvent(
                event_id=str(uuid.uuid4()),
                event_type=event_type,
                severity=severity,
                timestamp=int(datetime.utcnow().timestamp()),
                title=title,
                description=description,
                sensor_id=sensor_id,
                room=room,
                source=source or "data_persistence_service",
                data=data or {}
            )
            await self.db.system_events.insert_one(event.dict())
            logger.info(f"System event logged: {event_type} - {title}")
        except Exception as e:
            logger.error(f"Error logging system event: {e}")

    async def get_recent_readings(self, sensor_id: str = None, room: str = None,
                                  limit: int = 100, minutes: int = 60) -> List[Dict]:
        """Fetch recent readings, newest first.

        Args:
            sensor_id: Optional filter on a single sensor.
            room: Optional filter on a room.
            limit: Maximum number of documents returned.
            minutes: Look-back window size.

        Returns:
            Matching reading documents, or [] on error.
        """
        try:
            query: Dict[str, Any] = {
                "created_at": {"$gte": datetime.utcnow() - timedelta(minutes=minutes)}
            }
            if sensor_id:
                query["sensor_id"] = sensor_id
            if room:
                query["room"] = room

            cursor = self.db.sensor_readings.find(query).sort("created_at", -1).limit(limit)
            return await cursor.to_list(length=limit)

        except Exception as e:
            logger.error(f"Error getting recent readings: {e}")
            return []

    async def get_sensor_statistics(self) -> Dict[str, Any]:
        """Compute fleet-wide statistics.

        Returns:
            Dict with total_readings, active_sensors (distinct senders in the
            last 24 h), total_sensors, recent_readings (last 24 h count), and
            total_rooms; {} on error.
        """
        try:
            stats: Dict[str, Any] = {}

            stats["total_readings"] = await self.db.sensor_readings.count_documents({})

            # "Active" = sent at least one reading in the last 24 hours.
            recent_time = datetime.utcnow() - timedelta(hours=24)
            active_sensors = await self.db.sensor_readings.distinct("sensor_id", {
                "created_at": {"$gte": recent_time}
            })
            stats["active_sensors"] = len(active_sensors)

            stats["total_sensors"] = await self.db.sensor_metadata.count_documents({})

            stats["recent_readings"] = await self.db.sensor_readings.count_documents({
                "created_at": {"$gte": recent_time}
            })

            stats["total_rooms"] = len(
                await self.db.sensor_readings.distinct("room", {"room": {"$ne": None}})
            )

            return stats

        except Exception as e:
            logger.error(f"Error getting sensor statistics: {e}")
            return {}


# Global persistence service instance
persistence_service = DataPersistenceService()