diff --git a/layers/__init__.py b/layers/__init__.py deleted file mode 100644 index e8a337c..0000000 --- a/layers/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# Empty file to make this a Python package \ No newline at end of file diff --git a/layers/__pycache__/__init__.cpython-312.pyc b/layers/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index 779e97d..0000000 Binary files a/layers/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/layers/__pycache__/__init__.cpython-39.pyc b/layers/__pycache__/__init__.cpython-39.pyc deleted file mode 100644 index 6cc301f..0000000 Binary files a/layers/__pycache__/__init__.cpython-39.pyc and /dev/null differ diff --git a/layers/business/__init__.py b/layers/business/__init__.py deleted file mode 100644 index e8a337c..0000000 --- a/layers/business/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# Empty file to make this a Python package \ No newline at end of file diff --git a/layers/business/__pycache__/__init__.cpython-39.pyc b/layers/business/__pycache__/__init__.cpython-39.pyc deleted file mode 100644 index 2d9d339..0000000 Binary files a/layers/business/__pycache__/__init__.cpython-39.pyc and /dev/null differ diff --git a/layers/business/__pycache__/analytics_service.cpython-39.pyc b/layers/business/__pycache__/analytics_service.cpython-39.pyc deleted file mode 100644 index 2420b29..0000000 Binary files a/layers/business/__pycache__/analytics_service.cpython-39.pyc and /dev/null differ diff --git a/layers/business/__pycache__/cleanup_service.cpython-39.pyc b/layers/business/__pycache__/cleanup_service.cpython-39.pyc deleted file mode 100644 index 566988e..0000000 Binary files a/layers/business/__pycache__/cleanup_service.cpython-39.pyc and /dev/null differ diff --git a/layers/business/__pycache__/room_service.cpython-39.pyc b/layers/business/__pycache__/room_service.cpython-39.pyc deleted file mode 100644 index 6d170cc..0000000 Binary files a/layers/business/__pycache__/room_service.cpython-39.pyc and /dev/null differ diff --git a/layers/business/__pycache__/sensor_service.cpython-39.pyc b/layers/business/__pycache__/sensor_service.cpython-39.pyc deleted file mode 100644 index 6f8e9c9..0000000 Binary files a/layers/business/__pycache__/sensor_service.cpython-39.pyc and /dev/null differ diff --git a/layers/business/analytics_service.py b/layers/business/analytics_service.py deleted file mode 100644 index b23c7bf..0000000 --- a/layers/business/analytics_service.py +++ /dev/null @@ -1,300 +0,0 @@ -""" -Analytics business logic service -Business Layer - handles analytics calculations and data aggregations -""" -from datetime import datetime, timedelta -from typing import Dict, Any, List, Optional -import logging - -from ..infrastructure.repositories import SensorReadingRepository - -logger = logging.getLogger(__name__) - -class AnalyticsService: - """Service for analytics and reporting operations""" - - def __init__(self): - self.sensor_reading_repo = SensorReadingRepository() - - async def get_analytics_summary(self, hours: int = 24) -> Dict[str, Any]: - """Get comprehensive analytics summary for the specified time period""" - try: - start_time = datetime.utcnow() - timedelta(hours=hours) - - # Sensor-level analytics pipeline - sensor_pipeline = [ - {"$match": {"created_at": {"$gte": start_time}}}, - {"$group": { - "_id": { - "sensor_id": "$sensor_id", - "room": "$room", - "sensor_type": "$sensor_type" - }, - "reading_count": {"$sum": 1}, - "avg_energy": {"$avg": "$energy.value"}, - "total_energy": {"$sum": "$energy.value"}, - "avg_co2": {"$avg": "$co2.value"}, - "max_co2": {"$max": "$co2.value"}, - "avg_temperature": {"$avg": "$temperature.value"}, - "latest_timestamp": {"$max": "$timestamp"} - }}, - {"$sort": {"total_energy": -1}} - ] - - sensor_analytics = await self.sensor_reading_repo.aggregate(sensor_pipeline) - - # Room-level analytics pipeline - room_pipeline = [ - {"$match": {"created_at": {"$gte": start_time}, "room": {"$ne": None}}}, - {"$group": { - "_id": "$room", - "sensor_count": {"$addToSet": "$sensor_id"}, - "total_energy": {"$sum": "$energy.value"}, - "avg_co2": {"$avg": "$co2.value"}, - "max_co2": {"$max": "$co2.value"}, - "reading_count": {"$sum": 1} - }}, - {"$project": { - "room": "$_id", - "sensor_count": {"$size": "$sensor_count"}, - "total_energy": 1, - "avg_co2": 1, - "max_co2": 1, - "reading_count": 1 - }}, - {"$sort": {"total_energy": -1}} - ] - - room_analytics = await self.sensor_reading_repo.aggregate(room_pipeline) - - # Calculate summary statistics - summary_stats = self._calculate_summary_stats(sensor_analytics, room_analytics) - - return { - "period_hours": hours, - "start_time": start_time.isoformat(), - "sensor_analytics": sensor_analytics, - "room_analytics": room_analytics, - "summary": summary_stats - } - - except Exception as e: - logger.error(f"Error getting analytics summary: {e}") - return { - "period_hours": hours, - "start_time": None, - "sensor_analytics": [], - "room_analytics": [], - "summary": {} - } - - def _calculate_summary_stats(self, sensor_analytics: List[Dict], - room_analytics: List[Dict]) -> Dict[str, Any]: - """Calculate summary statistics from analytics data""" - total_readings = sum(item["reading_count"] for item in sensor_analytics) - total_energy = sum(item.get("total_energy", 0) or 0 for item in sensor_analytics) - - # Energy consumption insights - energy_insights = { - "total_consumption_kwh": round(total_energy, 2), - "average_consumption_per_sensor": ( - round(total_energy / len(sensor_analytics), 2) - if sensor_analytics else 0 - ), - "top_energy_consumer": ( - sensor_analytics[0]["_id"]["sensor_id"] - if sensor_analytics else None - ) - } - - # CO2 insights - co2_values = [item.get("avg_co2") for item in sensor_analytics if item.get("avg_co2")] - co2_insights = { - "average_co2_level": ( - round(sum(co2_values) / len(co2_values), 1) - if co2_values else 0 - ), - "sensors_with_high_co2": len([ - co2 for co2 in co2_values if co2 and co2 > 1000 - ]), - "sensors_with_critical_co2": len([ - co2 for co2 in co2_values if co2 and co2 > 5000 - ]) - } - - return { - "total_sensors_analyzed": len(sensor_analytics), - "total_rooms_analyzed": len(room_analytics), - "total_readings": total_readings, - "energy_insights": energy_insights, - "co2_insights": co2_insights - } - - async def get_energy_trends(self, hours: int = 168) -> Dict[str, Any]: - """Get energy consumption trends (default: last week)""" - try: - start_time = datetime.utcnow() - timedelta(hours=hours) - - # Hourly energy consumption pipeline - pipeline = [ - {"$match": { - "created_at": {"$gte": start_time}, - "energy.value": {"$exists": True} - }}, - {"$group": { - "_id": { - "year": {"$year": "$created_at"}, - "month": {"$month": "$created_at"}, - "day": {"$dayOfMonth": "$created_at"}, - "hour": {"$hour": "$created_at"} - }, - "total_energy": {"$sum": "$energy.value"}, - "sensor_count": {"$addToSet": "$sensor_id"}, - "reading_count": {"$sum": 1} - }}, - {"$project": { - "_id": 0, - "timestamp": { - "$dateFromParts": { - "year": "$_id.year", - "month": "$_id.month", - "day": "$_id.day", - "hour": "$_id.hour" - } - }, - "total_energy": {"$round": ["$total_energy", 2]}, - "sensor_count": {"$size": "$sensor_count"}, - "reading_count": 1 - }}, - {"$sort": {"timestamp": 1}} - ] - - trends = await self.sensor_reading_repo.aggregate(pipeline) - - # Calculate trend insights - insights = self._calculate_trend_insights(trends) - - return { - "period_hours": hours, - "data_points": len(trends), - "trends": trends, - "insights": insights - } - - except Exception as e: - logger.error(f"Error getting energy trends: {e}") - return { - "period_hours": hours, - "data_points": 0, - "trends": [], - "insights": {} - } - - def _calculate_trend_insights(self, trends: List[Dict]) -> Dict[str, Any]: - """Calculate insights from trend data""" - if not trends: - return {} - - energy_values = [item["total_energy"] for item in trends] - - # Peak and low consumption - max_consumption = max(energy_values) - min_consumption = min(energy_values) - avg_consumption = sum(energy_values) / len(energy_values) - - # Find peak time - peak_item = max(trends, key=lambda x: x["total_energy"]) - peak_time = peak_item["timestamp"] - - return { - "peak_consumption_kwh": max_consumption, - "lowest_consumption_kwh": min_consumption, - "average_consumption_kwh": round(avg_consumption, 2), - "peak_time": peak_time.isoformat() if hasattr(peak_time, 'isoformat') else str(peak_time), - "consumption_variance": round(max_consumption - min_consumption, 2) - } - - async def get_room_comparison(self, hours: int = 24) -> Dict[str, Any]: - """Get room-by-room comparison analytics""" - try: - start_time = datetime.utcnow() - timedelta(hours=hours) - - pipeline = [ - {"$match": { - "created_at": {"$gte": start_time}, - "room": {"$ne": None} - }}, - {"$group": { - "_id": "$room", - "total_energy": {"$sum": "$energy.value"}, - "avg_energy": {"$avg": "$energy.value"}, - "avg_co2": {"$avg": "$co2.value"}, - "max_co2": {"$max": "$co2.value"}, - "avg_temperature": {"$avg": "$temperature.value"}, - "sensor_count": {"$addToSet": "$sensor_id"}, - "reading_count": {"$sum": 1} - }}, - {"$project": { - "room": "$_id", - "_id": 0, - "total_energy": {"$round": [{"$ifNull": ["$total_energy", 0]}, 2]}, - "avg_energy": {"$round": [{"$ifNull": ["$avg_energy", 0]}, 2]}, - "avg_co2": {"$round": [{"$ifNull": ["$avg_co2", 0]}, 1]}, - "max_co2": {"$round": [{"$ifNull": ["$max_co2", 0]}, 1]}, - "avg_temperature": {"$round": [{"$ifNull": ["$avg_temperature", 0]}, 1]}, - "sensor_count": {"$size": "$sensor_count"}, - "reading_count": 1 - }}, - {"$sort": {"total_energy": -1}} - ] - - room_comparison = await self.sensor_reading_repo.aggregate(pipeline) - - # Calculate comparison insights - insights = self._calculate_room_insights(room_comparison) - - return { - "period_hours": hours, - "rooms_analyzed": len(room_comparison), - "comparison": room_comparison, - "insights": insights - } - - except Exception as e: - logger.error(f"Error getting room comparison: {e}") - return { - "period_hours": hours, - "rooms_analyzed": 0, - "comparison": [], - "insights": {} - } - - def _calculate_room_insights(self, room_data: List[Dict]) -> Dict[str, Any]: - """Calculate insights from room comparison data""" - if not room_data: - return {} - - # Energy insights - total_energy = sum(room["total_energy"] for room in room_data) - highest_consumer = room_data[0] if room_data else None - lowest_consumer = min(room_data, key=lambda x: x["total_energy"]) if room_data else None - - # CO2 insights - rooms_with_high_co2 = [ - room for room in room_data - if room.get("avg_co2", 0) > 1000 - ] - - # Temperature insights - temp_values = [room.get("avg_temperature", 0) for room in room_data if room.get("avg_temperature")] - avg_building_temp = sum(temp_values) / len(temp_values) if temp_values else 0 - - return { - "total_building_energy_kwh": round(total_energy, 2), - "highest_energy_consumer": highest_consumer["room"] if highest_consumer else None, - "lowest_energy_consumer": lowest_consumer["room"] if lowest_consumer else None, - "rooms_with_high_co2": len(rooms_with_high_co2), - "high_co2_rooms": [room["room"] for room in rooms_with_high_co2], - "average_building_temperature": round(avg_building_temp, 1), - "total_active_sensors": sum(room["sensor_count"] for room in room_data) - } \ No newline at end of file diff --git a/layers/business/cleanup_service.py b/layers/business/cleanup_service.py deleted file mode 100644 index 76219b4..0000000 --- a/layers/business/cleanup_service.py +++ /dev/null @@ -1,234 +0,0 @@ -""" -Data cleanup and maintenance service -Business Layer - handles data retention policies and system maintenance -""" -import asyncio -from datetime import datetime, timedelta -from typing import Dict, Any -import logging - -from ..infrastructure.database_connection import database_connection -from ..infrastructure.repositories import SensorReadingRepository - -logger = logging.getLogger(__name__) - -class CleanupService: - """Service for data cleanup and maintenance operations""" - - def __init__(self): - self.sensor_reading_repo = SensorReadingRepository() - self.is_running = False - self.cleanup_task = None - - async def start_scheduled_cleanup(self, interval_hours: int = 24) -> None: - """Start scheduled cleanup process""" - if self.is_running: - logger.warning("Cleanup service is already running") - return - - self.is_running = True - self.cleanup_task = asyncio.create_task(self._cleanup_loop(interval_hours)) - logger.info(f"Started scheduled cleanup service (interval: {interval_hours} hours)") - - async def stop_scheduled_cleanup(self) -> None: - """Stop scheduled cleanup process""" - self.is_running = False - if self.cleanup_task: - self.cleanup_task.cancel() - try: - await self.cleanup_task - except asyncio.CancelledError: - pass - logger.info("Cleanup service stopped") - - async def _cleanup_loop(self, interval_hours: int) -> None: - """Main cleanup loop""" - while self.is_running: - try: - await self.cleanup_old_data() - # Wait for next cleanup interval - await asyncio.sleep(interval_hours * 3600) # Convert hours to seconds - except Exception as e: - logger.error(f"Error in scheduled cleanup: {e}") - # Wait 1 hour before retrying on error - await asyncio.sleep(3600) - - async def cleanup_old_data(self) -> Dict[str, int]: - """Perform data cleanup based on retention policies""" - try: - cleanup_results = {} - db = await database_connection.get_database() - - # Delete sensor readings older than 90 days - sensor_retention_date = datetime.utcnow() - timedelta(days=90) - sensor_result = await db.sensor_readings.delete_many({ - "created_at": {"$lt": sensor_retention_date} - }) - cleanup_results["sensor_readings_deleted"] = sensor_result.deleted_count - - if sensor_result.deleted_count > 0: - logger.info(f"Deleted {sensor_result.deleted_count} old sensor readings") - - # Delete room metrics older than 30 days - room_retention_date = datetime.utcnow() - timedelta(days=30) - room_result = await db.room_metrics.delete_many({ - "created_at": {"$lt": room_retention_date} - }) - cleanup_results["room_metrics_deleted"] = room_result.deleted_count - - if room_result.deleted_count > 0: - logger.info(f"Deleted {room_result.deleted_count} old room metrics") - - # Delete system events older than 60 days - events_retention_date = datetime.utcnow() - timedelta(days=60) - events_result = await db.system_events.delete_many({ - "created_at": {"$lt": events_retention_date} - }) - cleanup_results["system_events_deleted"] = events_result.deleted_count - - if events_result.deleted_count > 0: - logger.info(f"Deleted {events_result.deleted_count} old system events") - - # Clean up orphaned sensor metadata (sensors with no recent readings) - orphaned_retention_date = datetime.utcnow() - timedelta(days=30) - - # Find sensors with no recent readings - active_sensors = await db.sensor_readings.distinct("sensor_id", { - "created_at": {"$gte": orphaned_retention_date} - }) - - orphaned_result = await db.sensor_metadata.delete_many({ - "sensor_id": {"$nin": active_sensors}, - "last_seen": {"$lt": orphaned_retention_date} - }) - cleanup_results["orphaned_metadata_deleted"] = orphaned_result.deleted_count - - if orphaned_result.deleted_count > 0: - logger.info(f"Deleted {orphaned_result.deleted_count} orphaned sensor metadata records") - - return cleanup_results - - except Exception as e: - logger.error(f"Error during data cleanup: {e}") - return {"error": str(e)} - - async def get_storage_statistics(self) -> Dict[str, Any]: - """Get storage statistics for different collections""" - try: - db = await database_connection.get_database() - - stats = {} - - # Sensor readings statistics - sensor_stats = await db.command("collStats", "sensor_readings") - stats["sensor_readings"] = { - "count": sensor_stats.get("count", 0), - "size_bytes": sensor_stats.get("size", 0), - "avg_obj_size": sensor_stats.get("avgObjSize", 0), - "storage_size": sensor_stats.get("storageSize", 0) - } - - # Room metrics statistics - room_stats = await db.command("collStats", "room_metrics") - stats["room_metrics"] = { - "count": room_stats.get("count", 0), - "size_bytes": room_stats.get("size", 0), - "avg_obj_size": room_stats.get("avgObjSize", 0), - "storage_size": room_stats.get("storageSize", 0) - } - - # System events statistics - events_stats = await db.command("collStats", "system_events") - stats["system_events"] = { - "count": events_stats.get("count", 0), - "size_bytes": events_stats.get("size", 0), - "avg_obj_size": events_stats.get("avgObjSize", 0), - "storage_size": events_stats.get("storageSize", 0) - } - - # Sensor metadata statistics - metadata_stats = await db.command("collStats", "sensor_metadata") - stats["sensor_metadata"] = { - "count": metadata_stats.get("count", 0), - "size_bytes": metadata_stats.get("size", 0), - "avg_obj_size": metadata_stats.get("avgObjSize", 0), - "storage_size": metadata_stats.get("storageSize", 0) - } - - # Calculate totals - total_documents = sum(collection["count"] for collection in stats.values()) - total_size = sum(collection["size_bytes"] for collection in stats.values()) - total_storage = sum(collection["storage_size"] for collection in stats.values()) - - stats["totals"] = { - "total_documents": total_documents, - "total_size_bytes": total_size, - "total_storage_bytes": total_storage, - "total_size_mb": round(total_size / (1024 * 1024), 2), - "total_storage_mb": round(total_storage / (1024 * 1024), 2) - } - - return stats - - except Exception as e: - logger.error(f"Error getting storage statistics: {e}") - return {"error": str(e)} - - async def get_data_retention_info(self) -> Dict[str, Any]: - """Get information about data retention policies and old data""" - try: - db = await database_connection.get_database() - - # Current date references - now = datetime.utcnow() - sensor_cutoff = now - timedelta(days=90) - room_cutoff = now - timedelta(days=30) - events_cutoff = now - timedelta(days=60) - - retention_info = {} - - # Sensor readings retention info - old_sensor_count = await db.sensor_readings.count_documents({ - "created_at": {"$lt": sensor_cutoff} - }) - retention_info["sensor_readings"] = { - "retention_days": 90, - "cutoff_date": sensor_cutoff.isoformat(), - "old_records_count": old_sensor_count - } - - # Room metrics retention info - old_room_count = await db.room_metrics.count_documents({ - "created_at": {"$lt": room_cutoff} - }) - retention_info["room_metrics"] = { - "retention_days": 30, - "cutoff_date": room_cutoff.isoformat(), - "old_records_count": old_room_count - } - - # System events retention info - old_events_count = await db.system_events.count_documents({ - "created_at": {"$lt": events_cutoff} - }) - retention_info["system_events"] = { - "retention_days": 60, - "cutoff_date": events_cutoff.isoformat(), - "old_records_count": old_events_count - } - - return retention_info - - except Exception as e: - logger.error(f"Error getting retention info: {e}") - return {"error": str(e)} - - def is_cleanup_running(self) -> bool: - """Check if cleanup service is currently running""" - return self.is_running and ( - self.cleanup_task is not None and - not self.cleanup_task.done() - ) - -# Global cleanup service instance -cleanup_service = CleanupService() \ No newline at end of file diff --git a/layers/business/room_service.py b/layers/business/room_service.py deleted file mode 100644 index b2cc062..0000000 --- a/layers/business/room_service.py +++ /dev/null @@ -1,262 +0,0 @@ -""" -Room metrics business logic service -Business Layer - handles room-related aggregations and business operations -""" -from datetime import datetime, timedelta -from typing import Dict, Any, List, Optional -import logging - -from models import RoomMetrics, CO2Status, OccupancyLevel -from ..infrastructure.repositories import ( - SensorReadingRepository, RoomMetricsRepository, RedisRepository -) - -logger = logging.getLogger(__name__) - -class RoomService: - """Service for room-related business operations""" - - def __init__(self): - self.sensor_reading_repo = SensorReadingRepository() - self.room_metrics_repo = RoomMetricsRepository() - self.redis_repo = RedisRepository() - - async def update_room_metrics(self, room: str) -> bool: - """Calculate and store room-level metrics""" - if not room: - return False - - try: - # Get recent readings for this room (last 5 minutes) - recent_readings = await self.sensor_reading_repo.get_recent_by_room( - room=room, - minutes=5 - ) - - if not recent_readings: - return False - - # Calculate aggregated metrics - metrics = await self._calculate_room_metrics(room, recent_readings) - - # Store in MongoDB - stored = await self.room_metrics_repo.create(metrics) - - # Cache in Redis - if stored: - await self.redis_repo.set_room_metrics(room, metrics.dict()) - logger.debug(f"Updated room metrics for {room}") - - return stored - - except Exception as e: - logger.error(f"Error updating room metrics for {room}: {e}") - return False - - async def _calculate_room_metrics(self, room: str, readings: List[Dict]) -> RoomMetrics: - """Calculate aggregated metrics for a room based on recent readings""" - - # Group readings by sensor - sensors_data = {} - for reading in readings: - sensor_id = reading["sensor_id"] - if sensor_id not in sensors_data: - sensors_data[sensor_id] = [] - sensors_data[sensor_id].append(reading) - - # Initialize value arrays - energy_values = [] - co2_values = [] - temperature_values = [] - humidity_values = [] - motion_detected = False - - # Extract values from readings - for sensor_readings in sensors_data.values(): - for reading in sensor_readings: - if reading.get("energy"): - energy_values.append(reading["energy"]["value"]) - if reading.get("co2"): - co2_values.append(reading["co2"]["value"]) - if reading.get("temperature"): - temperature_values.append(reading["temperature"]["value"]) - if reading.get("humidity"): - humidity_values.append(reading["humidity"]["value"]) - if reading.get("motion") and reading["motion"].get("value") == "Detected": - motion_detected = True - - # Get sensor types present - sensor_types = list(set( - reading.get("sensor_type") - for reading in readings - if reading.get("sensor_type") - )) - - # Initialize metrics object - metrics = RoomMetrics( - room=room, - timestamp=int(datetime.utcnow().timestamp()), - sensor_count=len(sensors_data), - active_sensors=list(sensors_data.keys()), - sensor_types=sensor_types, - motion_detected=motion_detected - ) - - # Calculate energy metrics - if energy_values: - metrics.energy = self._calculate_energy_metrics(energy_values) - - # Calculate CO2 metrics and occupancy - if co2_values: - metrics.co2 = self._calculate_co2_metrics(co2_values) - metrics.occupancy_estimate = self._estimate_occupancy_from_co2( - metrics.co2["average"] - ) - - # Calculate temperature metrics - if temperature_values: - metrics.temperature = self._calculate_temperature_metrics(temperature_values) - - # Calculate humidity metrics - if humidity_values: - metrics.humidity = self._calculate_humidity_metrics(humidity_values) - - # Set last activity time if motion detected - if motion_detected: - metrics.last_activity = datetime.utcnow() - - return metrics - - def _calculate_energy_metrics(self, values: List[float]) -> Dict[str, Any]: - """Calculate energy consumption metrics""" - return { - "current": sum(values), - "average": sum(values) / len(values), - "total": sum(values), - "peak": max(values), - "unit": "kWh" - } - - def _calculate_co2_metrics(self, values: List[float]) -> Dict[str, Any]: - """Calculate CO2 level metrics""" - avg_co2 = sum(values) / len(values) - return { - "current": avg_co2, - "average": avg_co2, - "max": max(values), - "min": min(values), - "status": self._get_co2_status(avg_co2).value, - "unit": "ppm" - } - - def _calculate_temperature_metrics(self, values: List[float]) -> Dict[str, Any]: - """Calculate temperature metrics""" - avg_temp = sum(values) / len(values) - return { - "current": avg_temp, - "average": avg_temp, - "max": max(values), - "min": min(values), - "unit": "°C" - } - - def _calculate_humidity_metrics(self, values: List[float]) -> Dict[str, Any]: - """Calculate humidity metrics""" - avg_humidity = sum(values) / len(values) - return { - "current": avg_humidity, - "average": avg_humidity, - "max": max(values), - "min": min(values), - "unit": "%" - } - - def _get_co2_status(self, co2_level: float) -> CO2Status: - """Determine CO2 status based on level""" - if co2_level < 400: - return CO2Status.GOOD - elif co2_level < 1000: - return CO2Status.MODERATE - elif co2_level < 5000: - return CO2Status.POOR - else: - return CO2Status.CRITICAL - - def _estimate_occupancy_from_co2(self, co2_level: float) -> OccupancyLevel: - """Estimate occupancy level based on CO2 levels""" - if co2_level < 600: - return OccupancyLevel.LOW - elif co2_level < 1200: - return OccupancyLevel.MEDIUM - else: - return OccupancyLevel.HIGH - - async def get_all_rooms(self) -> Dict[str, Any]: - """Get list of all rooms with sensor counts and latest metrics""" - try: - rooms = await self.sensor_reading_repo.get_distinct_rooms() - - room_data = [] - for room in rooms: - # Get sensor count for each room - sensor_ids = await self.sensor_reading_repo.get_distinct_sensor_ids_by_room(room) - sensor_count = len(sensor_ids) - - # Get latest room metrics from cache - room_metrics = await self.redis_repo.get_room_metrics(room) - - room_data.append({ - "room": room, - "sensor_count": sensor_count, - "sensor_ids": sensor_ids, - "latest_metrics": room_metrics - }) - - return { - "rooms": room_data, - "count": len(room_data) - } - - except Exception as e: - logger.error(f"Error getting rooms: {e}") - return {"rooms": [], "count": 0} - - async def get_room_data(self, room_name: str, start_time: Optional[int] = None, - end_time: Optional[int] = None, limit: int = 100) -> Dict[str, Any]: - """Get historical data for a specific room""" - try: - # Build query for time range - query = {"room": room_name} - - if start_time or end_time: - time_query = {} - if start_time: - time_query["$gte"] = datetime.fromtimestamp(start_time) - if end_time: - time_query["$lte"] = datetime.fromtimestamp(end_time) - query["created_at"] = time_query - - # Get room metrics - room_metrics = await self.room_metrics_repo.get_by_room(room_name, limit) - - # Get sensor readings for the room - sensor_readings = await self.sensor_reading_repo.get_by_query( - query=query, - sort_by="timestamp", - sort_order="desc", - limit=limit - ) - - return { - "room": room_name, - "room_metrics": room_metrics, - "sensor_readings": sensor_readings - } - - except Exception as e: - logger.error(f"Error getting room data for {room_name}: {e}") - return { - "room": room_name, - "room_metrics": [], - "sensor_readings": [] - } \ No newline at end of file diff --git a/layers/business/sensor_service.py b/layers/business/sensor_service.py deleted file mode 100644 index 12a140a..0000000 --- a/layers/business/sensor_service.py +++ /dev/null @@ -1,328 +0,0 @@ -""" -Sensor business logic service -Business Layer - handles sensor-related business operations and rules -""" -import json -from datetime import datetime, timedelta -from typing import Dict, Any, List, Optional -import logging -import uuid - -from models import ( - SensorReading, LegacySensorReading, SensorMetadata, - SensorType, SensorStatus, CO2Status, OccupancyLevel -) -from ..infrastructure.repositories import ( - SensorReadingRepository, SensorMetadataRepository, - SystemEventRepository, RedisRepository -) - -logger = logging.getLogger(__name__) - -class SensorService: - """Service for sensor-related business operations""" - - def __init__(self): - self.sensor_reading_repo = SensorReadingRepository() - self.sensor_metadata_repo = SensorMetadataRepository() - self.system_event_repo = SystemEventRepository() - self.redis_repo = RedisRepository() - - async def process_sensor_message(self, message_data: str) -> bool: - """Process incoming sensor message and handle business logic""" - try: - # Parse the message - data = json.loads(message_data) - logger.debug(f"Processing sensor message: {data}") - - # Convert to standard format - sensor_reading = await self._parse_sensor_data(data) - - # Validate business rules - validation_result = await self._validate_sensor_reading(sensor_reading) - if not validation_result["valid"]: - logger.warning(f"Sensor reading validation failed: {validation_result['errors']}") - return False - - # Store the reading - stored = await self.sensor_reading_repo.create(sensor_reading) - if not stored: - return False - - # Update caches and metadata - await self._update_caches(sensor_reading) - await self._update_sensor_metadata(sensor_reading) - - # Check for alerts - await self._check_sensor_alerts(sensor_reading) - - return True - - except Exception as e: - logger.error(f"Error processing sensor message: {e}") - await self._log_processing_error(str(e), message_data) - return False - - async def _parse_sensor_data(self, data: dict) -> SensorReading: - """Parse and convert sensor data to standard format""" - # Check if legacy format - if self._is_legacy_format(data): - return await self._convert_legacy_data(data) - else: - return SensorReading(**data) - - def _is_legacy_format(self, data: dict) -> bool: - """Check if data is in legacy format""" - legacy_keys = {"sensorId", "timestamp", "value", "unit"} - return legacy_keys.issubset(data.keys()) and "energy" not in data - - async def _convert_legacy_data(self, data: dict) -> SensorReading: - """Convert legacy format to new sensor reading format""" - legacy_reading = LegacySensorReading(**data) - - return SensorReading( - sensor_id=legacy_reading.sensor_id, - sensor_type=SensorType.ENERGY, - timestamp=legacy_reading.timestamp, - created_at=legacy_reading.created_at, - energy={ - "value": legacy_reading.value, - "unit": legacy_reading.unit - } - ) - - async def _validate_sensor_reading(self, reading: SensorReading) -> Dict[str, Any]: - """Validate sensor reading against business rules""" - errors = [] - - # Check timestamp is not too far in the future - future_threshold = datetime.utcnow().timestamp() + 3600 # 1 hour - if reading.timestamp > future_threshold: - errors.append("Timestamp is too far in the future") - - # Check timestamp is not too old - past_threshold = datetime.utcnow().timestamp() - 86400 # 24 hours - if reading.timestamp < past_threshold: - errors.append("Timestamp is too old") - - # Validate sensor values - if reading.energy: - energy_value = reading.energy.get("value", 0) - if energy_value < 0 or energy_value > 1000: # Reasonable energy range - errors.append("Energy value is out of acceptable range") - - if reading.co2: - co2_value = reading.co2.get("value", 0) - if co2_value < 0 or co2_value > 50000: # Reasonable CO2 range - errors.append("CO2 value is out of acceptable range") - - if reading.temperature: - temp_value = reading.temperature.get("value", 0) - if temp_value < -50 or temp_value > 100: # Reasonable temperature range - errors.append("Temperature value is out of acceptable range") - - return { - "valid": len(errors) == 0, - "errors": errors - } - - async def _update_caches(self, reading: SensorReading) -> None: - """Update Redis caches with latest sensor data""" - # Cache latest sensor reading - await self.redis_repo.set_sensor_data( - reading.sensor_id, - reading.dict(), - expire_seconds=3600 - ) - - # Update sensor status - status_data = { - "status": "online", - "last_seen": reading.timestamp, - "room": reading.room - } - await self.redis_repo.set_sensor_status( - reading.sensor_id, - status_data, - expire_seconds=1800 - ) - - async def _update_sensor_metadata(self, reading: SensorReading) -> None: - """Update or create sensor metadata""" - existing = await self.sensor_metadata_repo.get_by_sensor_id(reading.sensor_id) - - if existing: - # Update existing metadata - updates = { - "last_seen": datetime.utcnow(), - "status": SensorStatus.ONLINE.value - } - - # Add sensor type to monitoring capabilities if not present - capabilities = existing.get("monitoring_capabilities", []) - if reading.sensor_type.value not in capabilities: - capabilities.append(reading.sensor_type.value) - updates["monitoring_capabilities"] = capabilities - - await self.sensor_metadata_repo.update(reading.sensor_id, updates) - else: - # Create new sensor metadata - metadata = SensorMetadata( - sensor_id=reading.sensor_id, - name=f"Sensor {reading.sensor_id}", - sensor_type=reading.sensor_type, - room=reading.room, - status=SensorStatus.ONLINE, - last_seen=datetime.utcnow(), - monitoring_capabilities=[reading.sensor_type.value] - ) - - await self.sensor_metadata_repo.create(metadata) - logger.info(f"Created metadata for new sensor: {reading.sensor_id}") - - async def _check_sensor_alerts(self, reading: SensorReading) -> None: - """Check for alert conditions in sensor data""" - alerts = [] - - # CO2 level alerts - if reading.co2: - co2_level = reading.co2.get("value", 0) - if co2_level > 5000: - alerts.append({ - "event_type": "co2_critical", - "severity": "critical", - "title": "Critical CO2 Level", - "description": f"CO2 level ({co2_level} ppm) exceeds critical threshold in {reading.room or 'unknown room'}" - }) - elif co2_level > 1000: - alerts.append({ - "event_type": "co2_high", - "severity": "warning", - "title": "High CO2 Level", - "description": f"CO2 level ({co2_level} ppm) is above recommended levels in {reading.room or 'unknown room'}" - }) - - # Energy consumption alerts - if reading.energy: - energy_value = reading.energy.get("value", 0) - if energy_value > 10: - alerts.append({ - "event_type": "energy_high", - "severity": "warning", - "title": "High Energy Consumption", - "description": f"Energy consumption ({energy_value} kWh) is unusually high for sensor {reading.sensor_id}" - }) - - # Temperature alerts - if reading.temperature: - temp_value = reading.temperature.get("value", 0) - if temp_value > 30 or temp_value < 15: - alerts.append({ - "event_type": "temperature_extreme", - "severity": "warning", - "title": "Extreme Temperature", - "description": f"Temperature ({temp_value}°C) is outside normal range in {reading.room or 'unknown room'}" - }) - - # Log alerts as system events - for alert in alerts: - await self._log_alert_event(reading, **alert) - - async def _log_alert_event(self, reading: SensorReading, event_type: str, severity: str, - title: str, description: str) -> None: - """Log an alert as a system event""" - from models import SystemEvent - - event = SystemEvent( - event_id=str(uuid.uuid4()), - event_type=event_type, - severity=severity, - timestamp=int(datetime.utcnow().timestamp()), - title=title, - description=description, - sensor_id=reading.sensor_id, - room=reading.room, - source="sensor_service", - data=reading.dict() - ) - - await self.system_event_repo.create(event) - - async def _log_processing_error(self, error_message: str, raw_data: str) -> None: - """Log data processing error""" - from models import SystemEvent - - event = SystemEvent( - event_id=str(uuid.uuid4()), - event_type="data_processing_error", - severity="error", - timestamp=int(datetime.utcnow().timestamp()), - title="Sensor Data Processing Failed", - description=f"Failed to process sensor message: {error_message}", - source="sensor_service", - data={"raw_message": raw_data} - ) - - await self.system_event_repo.create(event) - - async def get_sensor_details(self, sensor_id: str) -> Optional[Dict[str, Any]]: - """Get complete sensor details including metadata and recent readings""" - # Get metadata - metadata = await self.sensor_metadata_repo.get_by_sensor_id(sensor_id) - if not metadata: - return None - - # Get recent readings - recent_readings = await self.sensor_reading_repo.get_recent_by_sensor( - sensor_id=sensor_id, - limit=100, - minutes=1440 # 24 hours - ) - - # Get latest reading from cache - latest_reading = await self.redis_repo.get_sensor_data(sensor_id) - - return { - "sensor": metadata, - "latest_reading": latest_reading, - "recent_readings_count": len(recent_readings), - "recent_readings": recent_readings[:10] # Return only 10 most recent - } - - async def update_sensor_metadata(self, sensor_id: str, metadata_updates: Dict[str, Any]) -> bool: - """Update sensor metadata with business validation""" - # Validate updates - if "sensor_id" in metadata_updates: - del metadata_updates["sensor_id"] # Cannot change sensor ID - - # Update timestamp - metadata_updates["updated_at"] = datetime.utcnow() - - return await self.sensor_metadata_repo.update(sensor_id, metadata_updates) - - async def delete_sensor(self, sensor_id: str) -> Dict[str, Any]: - """Delete a sensor and all its associated data""" - # Delete readings - readings_deleted = await self.sensor_reading_repo.delete_by_sensor_id(sensor_id) - - # Delete metadata - metadata_deleted = await self.sensor_metadata_repo.delete(sensor_id) - - # Clear cache - await self.redis_repo.delete_sensor_cache(sensor_id) - - return { - "sensor_id": sensor_id, - "readings_deleted": readings_deleted, - "metadata_deleted": metadata_deleted - } - - async def get_all_sensors(self, filters: Dict[str, Any] = None) -> Dict[str, Any]: - """Get all sensors with optional filtering""" - sensors = await self.sensor_metadata_repo.get_all(filters) - - return { - "sensors": sensors, - "count": len(sensors), - "filters": filters or {} - } \ No newline at end of file diff --git a/layers/infrastructure/__init__.py b/layers/infrastructure/__init__.py deleted file mode 100644 index e8a337c..0000000 --- a/layers/infrastructure/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# Empty file to make this a Python package \ No newline at end of file diff --git a/layers/infrastructure/__pycache__/__init__.cpython-312.pyc b/layers/infrastructure/__pycache__/__init__.cpython-312.pyc deleted file mode 100644 index 898b81d..0000000 Binary files a/layers/infrastructure/__pycache__/__init__.cpython-312.pyc and /dev/null differ diff --git a/layers/infrastructure/__pycache__/__init__.cpython-39.pyc b/layers/infrastructure/__pycache__/__init__.cpython-39.pyc deleted file mode 100644 index 35add97..0000000 Binary files a/layers/infrastructure/__pycache__/__init__.cpython-39.pyc and /dev/null differ diff --git a/layers/infrastructure/__pycache__/database_connection.cpython-312.pyc b/layers/infrastructure/__pycache__/database_connection.cpython-312.pyc deleted file mode 100644 index 2755451..0000000 Binary files a/layers/infrastructure/__pycache__/database_connection.cpython-312.pyc and /dev/null differ diff --git a/layers/infrastructure/__pycache__/database_connection.cpython-39.pyc b/layers/infrastructure/__pycache__/database_connection.cpython-39.pyc deleted file mode 100644 index 4178cb6..0000000 Binary files a/layers/infrastructure/__pycache__/database_connection.cpython-39.pyc and /dev/null differ diff --git a/layers/infrastructure/__pycache__/redis_connection.cpython-39.pyc b/layers/infrastructure/__pycache__/redis_connection.cpython-39.pyc deleted file mode 100644 index 6a0953f..0000000 Binary files a/layers/infrastructure/__pycache__/redis_connection.cpython-39.pyc and /dev/null differ diff --git a/layers/infrastructure/__pycache__/repositories.cpython-39.pyc b/layers/infrastructure/__pycache__/repositories.cpython-39.pyc deleted file mode 100644 index e1a1d2f..0000000 Binary files a/layers/infrastructure/__pycache__/repositories.cpython-39.pyc and /dev/null differ diff --git a/layers/infrastructure/database_connection.py b/layers/infrastructure/database_connection.py deleted file mode 100644 index cc024b5..0000000 --- a/layers/infrastructure/database_connection.py +++ /dev/null @@ -1,95 +0,0 @@ -""" -Database connection management for MongoDB -Infrastructure Layer - handles low-level database connectivity -""" -import os -from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase -from pymongo import IndexModel, ASCENDING, DESCENDING -from typing import Optional -import logging - -logger = logging.getLogger(__name__) - -class DatabaseConnection: - """Manages MongoDB connection and database operations""" - - def __init__(self): - self.client: Optional[AsyncIOMotorClient] = None - self.database: Optional[AsyncIOMotorDatabase] = None - self._mongodb_url = os.getenv("MONGODB_URL", "mongodb://localhost:27017") - self._database_name = os.getenv("DATABASE_NAME", "energy_monitoring") - - async def connect(self) -> None: - """Establish connection to MongoDB""" - try: - logger.info(f"Connecting to MongoDB at: {self._mongodb_url}") - - self.client = AsyncIOMotorClient(self._mongodb_url) - await self.client.admin.command('ping') - - self.database = self.client[self._database_name] - await self._create_indexes() - - logger.info("Successfully connected to MongoDB") - - except Exception as e: - logger.error(f"Error connecting to MongoDB: {e}") - raise - - async def disconnect(self) -> None: - """Close MongoDB connection""" - if self.client: - self.client.close() - logger.info("Disconnected from MongoDB") - - async def get_database(self) -> AsyncIOMotorDatabase: - """Get database instance""" - if not self.database: - await self.connect() - return self.database - - async def _create_indexes(self) -> None: - """Create database indexes for optimal performance""" - try: - # Sensor readings collection indexes - sensor_readings_indexes = [ - IndexModel([("sensor_id", ASCENDING), ("timestamp", DESCENDING)]), - IndexModel([("timestamp", DESCENDING)]), - IndexModel([("room", ASCENDING), ("timestamp", DESCENDING)]), - IndexModel([("sensor_type", ASCENDING), ("timestamp", DESCENDING)]), - IndexModel([("created_at", DESCENDING)]), - ] - await self.database.sensor_readings.create_indexes(sensor_readings_indexes) - - # Room metrics collection indexes - room_metrics_indexes = [ - IndexModel([("room", ASCENDING), ("timestamp", DESCENDING)]), - IndexModel([("timestamp", DESCENDING)]), - IndexModel([("created_at", DESCENDING)]), - ] - await self.database.room_metrics.create_indexes(room_metrics_indexes) - - # Sensor metadata collection indexes - sensor_metadata_indexes = [ - IndexModel([("sensor_id", ASCENDING)], unique=True), - IndexModel([("room", ASCENDING)]), - IndexModel([("sensor_type", ASCENDING)]), - IndexModel([("status", ASCENDING)]), - ] - await self.database.sensor_metadata.create_indexes(sensor_metadata_indexes) - - # System events collection indexes - system_events_indexes = [ - IndexModel([("timestamp", DESCENDING)]), - IndexModel([("event_type", ASCENDING), ("timestamp", DESCENDING)]), - IndexModel([("severity", ASCENDING), ("timestamp", DESCENDING)]), - ] - await self.database.system_events.create_indexes(system_events_indexes) - - logger.info("Database indexes created successfully") - - except Exception as e: - logger.error(f"Error creating indexes: {e}") - -# Global database connection instance -database_connection = DatabaseConnection() \ No newline at end of file diff --git a/layers/infrastructure/redis_connection.py b/layers/infrastructure/redis_connection.py deleted file mode 100644 index 574414f..0000000 --- a/layers/infrastructure/redis_connection.py +++ /dev/null @@ -1,80 +0,0 @@ -""" -Redis connection management and operations -Infrastructure Layer - handles Redis connectivity and low-level operations -""" -import os -import json -from typing import Optional, Dict, Any -import logging -import redis.asyncio as redis - -logger = logging.getLogger(__name__) - -class RedisConnection: - """Manages Redis connection and basic operations""" - - def __init__(self): - self.redis_client: Optional[redis.Redis] = None - self._host = os.getenv("REDIS_HOST", "localhost") - self._port = int(os.getenv("REDIS_PORT", "6379")) - self._db = int(os.getenv("REDIS_DB", "0")) - - async def connect(self) -> None: - """Connect to Redis""" - try: - self.redis_client = redis.Redis( - host=self._host, - port=self._port, - db=self._db, - decode_responses=True - ) - await self.redis_client.ping() - logger.info("Successfully connected to Redis") - except Exception as e: - logger.error(f"Error connecting to Redis: {e}") - raise - - async def disconnect(self) -> None: - """Disconnect from Redis""" - if self.redis_client: - await self.redis_client.close() - logger.info("Disconnected from Redis") - - async def get_client(self) -> redis.Redis: - """Get Redis client instance""" - if not self.redis_client: - await self.connect() - return self.redis_client - - async def set_with_expiry(self, key: str, value: str, expire_seconds: int = 3600) -> None: - """Set a key-value pair with expiration""" - client = await self.get_client() - await client.setex(key, expire_seconds, value) - - async def get(self, key: str) -> Optional[str]: - """Get value by key""" - client = await self.get_client() - return await client.get(key) - - async def delete(self, key: str) -> None: - """Delete a key""" - client = await self.get_client() - await client.delete(key) - - async def get_keys_by_pattern(self, pattern: str) -> list: - """Get keys matching a pattern""" - client = await self.get_client() - return await client.keys(pattern) - - async def publish(self, channel: str, message: str) -> None: - """Publish message to a channel""" - client = await self.get_client() - await client.publish(channel, message) - - async def create_pubsub(self) -> redis.client.PubSub: - """Create a pub/sub instance""" - client = await self.get_client() - return client.pubsub() - -# Global Redis connection instance -redis_connection = RedisConnection() \ No newline at end of file diff --git a/layers/infrastructure/repositories.py b/layers/infrastructure/repositories.py deleted file mode 100644 index c9c2945..0000000 --- a/layers/infrastructure/repositories.py +++ /dev/null @@ -1,362 +0,0 @@ -""" -Repository classes for data access -Infrastructure Layer - handles database operations and queries -""" -import json -from datetime import datetime, timedelta -from typing import List, Dict, Any, Optional -from pymongo import ASCENDING, DESCENDING -from pymongo.errors import DuplicateKeyError -import logging - -from .database_connection import database_connection -from .redis_connection import redis_connection -from models import SensorReading, SensorMetadata, RoomMetrics, SystemEvent - -logger = logging.getLogger(__name__) - -class SensorReadingRepository: - """Repository for sensor reading data operations""" - - async def create(self, reading: SensorReading) -> bool: - """Store sensor reading in MongoDB""" - try: - db = await database_connection.get_database() - reading_dict = reading.dict() - - # Add document ID for deduplication - reading_dict["_id"] = f"{reading.sensor_id}_{reading.timestamp}" - - await db.sensor_readings.insert_one(reading_dict) - logger.debug(f"Stored sensor reading for {reading.sensor_id}") - return True - - except DuplicateKeyError: - logger.debug(f"Duplicate reading ignored for {reading.sensor_id} at {reading.timestamp}") - return True - except Exception as e: - logger.error(f"Error storing sensor reading: {e}") - return False - - async def get_recent_by_sensor(self, sensor_id: str, limit: int = 100, minutes: int = 60) -> List[Dict]: - """Get recent readings for a specific sensor""" - try: - db = await database_connection.get_database() - query = { - "sensor_id": sensor_id, - "created_at": {"$gte": datetime.utcnow() - timedelta(minutes=minutes)} - } - - cursor = db.sensor_readings.find(query).sort("created_at", -1).limit(limit) - readings = await cursor.to_list(length=limit) - - # Convert ObjectId to string - for reading in readings: - reading["_id"] = str(reading["_id"]) - - return readings - - except Exception as e: - logger.error(f"Error getting recent readings for {sensor_id}: {e}") - return [] - - async def get_recent_by_room(self, room: str, minutes: int = 5) -> List[Dict]: - """Get recent readings for a specific room""" - try: - db = await database_connection.get_database() - recent_time = datetime.utcnow() - timedelta(minutes=minutes) - - cursor = db.sensor_readings.find({ - "room": room, - "created_at": {"$gte": recent_time} - }) - - readings = await cursor.to_list(length=None) - return readings - - except Exception as e: - logger.error(f"Error getting recent readings for room {room}: {e}") - return [] - - async def get_by_query(self, query: Dict[str, Any], sort_by: str = "timestamp", - sort_order: str = "desc", limit: int = 100, offset: int = 0) -> List[Dict]: - """Get readings by complex query""" - try: - db = await database_connection.get_database() - - sort_direction = DESCENDING if sort_order == "desc" else ASCENDING - cursor = db.sensor_readings.find(query).sort(sort_by, sort_direction).skip(offset).limit(limit) - - readings = await cursor.to_list(length=limit) - - # Convert ObjectId to string - for reading in readings: - reading["_id"] = str(reading["_id"]) - - return readings - - except Exception as e: - logger.error(f"Error querying sensor readings: {e}") - return [] - - async def count_by_query(self, query: Dict[str, Any]) -> int: - """Count readings matching query""" - try: - db = await database_connection.get_database() - return await db.sensor_readings.count_documents(query) - except Exception as e: - logger.error(f"Error counting sensor readings: {e}") - return 0 - - async def get_distinct_rooms(self) -> List[str]: - """Get list of distinct rooms""" - try: - db = await database_connection.get_database() - return await db.sensor_readings.distinct("room", {"room": {"$ne": None}}) - except Exception as e: - logger.error(f"Error getting distinct rooms: {e}") - return [] - - async def get_distinct_sensor_ids_by_room(self, room: str) -> List[str]: - """Get distinct sensor IDs for a room""" - try: - db = await database_connection.get_database() - return await db.sensor_readings.distinct("sensor_id", {"room": room}) - except Exception as e: - logger.error(f"Error getting distinct sensor IDs for room {room}: {e}") - return [] - - async def delete_by_sensor_id(self, sensor_id: str) -> int: - """Delete all readings for a sensor""" - try: - db = await database_connection.get_database() - result = await db.sensor_readings.delete_many({"sensor_id": sensor_id}) - return result.deleted_count - except Exception as e: - logger.error(f"Error deleting readings for sensor {sensor_id}: {e}") - return 0 - - async def aggregate(self, pipeline: List[Dict]) -> List[Dict]: - """Execute aggregation pipeline""" - try: - db = await database_connection.get_database() - cursor = db.sensor_readings.aggregate(pipeline) - return await cursor.to_list(length=None) - except Exception as e: - logger.error(f"Error executing aggregation: {e}") - return [] - -class SensorMetadataRepository: - """Repository for sensor metadata operations""" - - async def create(self, metadata: SensorMetadata) -> bool: - """Create sensor metadata""" - try: - db = await database_connection.get_database() - await db.sensor_metadata.insert_one(metadata.dict()) - logger.info(f"Created metadata for sensor: {metadata.sensor_id}") - return True - except Exception as e: - logger.error(f"Error creating sensor metadata: {e}") - return False - - async def update(self, sensor_id: str, updates: Dict[str, Any]) -> bool: - """Update sensor metadata""" - try: - db = await database_connection.get_database() - updates["updated_at"] = datetime.utcnow() - - result = await db.sensor_metadata.update_one( - {"sensor_id": sensor_id}, - {"$set": updates} - ) - return result.modified_count > 0 - except Exception as e: - logger.error(f"Error updating sensor metadata: {e}") - return False - - async def get_by_sensor_id(self, sensor_id: str) -> Optional[Dict]: - """Get sensor metadata by ID""" - try: - db = await database_connection.get_database() - metadata = await db.sensor_metadata.find_one({"sensor_id": sensor_id}) - if metadata: - metadata["_id"] = str(metadata["_id"]) - return metadata - except Exception as e: - logger.error(f"Error getting sensor metadata: {e}") - return None - - async def get_all(self, filters: Dict[str, Any] = None) -> List[Dict]: - """Get all sensor metadata with optional filters""" - try: - db = await database_connection.get_database() - query = filters or {} - - cursor = db.sensor_metadata.find(query).sort("created_at", DESCENDING) - metadata_list = await cursor.to_list(length=None) - - # Convert ObjectId to string - for metadata in metadata_list: - metadata["_id"] = str(metadata["_id"]) - - return metadata_list - except Exception as e: - logger.error(f"Error getting sensor metadata: {e}") - return [] - - async def delete(self, sensor_id: str) -> bool: - """Delete sensor metadata""" - try: - db = await database_connection.get_database() - result = await db.sensor_metadata.delete_one({"sensor_id": sensor_id}) - return result.deleted_count > 0 - except Exception as e: - logger.error(f"Error deleting sensor metadata: {e}") - return False - -class RoomMetricsRepository: - """Repository for room metrics operations""" - - async def create(self, metrics: RoomMetrics) -> bool: - """Store room metrics""" - try: - db = await database_connection.get_database() - await db.room_metrics.insert_one(metrics.dict()) - logger.debug(f"Stored room metrics for {metrics.room}") - return True - except Exception as e: - logger.error(f"Error storing room metrics: {e}") - return False - - async def get_by_room(self, room: str, limit: int = 100) -> List[Dict]: - """Get room metrics by room name""" - try: - db = await database_connection.get_database() - cursor = db.room_metrics.find({"room": room}).sort("timestamp", DESCENDING).limit(limit) - metrics = await cursor.to_list(length=limit) - - # Convert ObjectId to string - for metric in metrics: - metric["_id"] = str(metric["_id"]) - - return metrics - except Exception as e: - logger.error(f"Error getting room metrics for {room}: {e}") - return [] - -class SystemEventRepository: - """Repository for system events operations""" - - async def create(self, event: SystemEvent) -> bool: - """Create system event""" - try: - db = await database_connection.get_database() - await db.system_events.insert_one(event.dict()) - logger.info(f"System event logged: {event.event_type} - {event.title}") - return True - except Exception as e: - logger.error(f"Error logging system event: {e}") - return False - - async def get_recent(self, hours: int = 24, limit: int = 50, - filters: Dict[str, Any] = None) -> List[Dict]: - """Get recent system events""" - try: - db = await database_connection.get_database() - start_time = datetime.utcnow() - timedelta(hours=hours) - - query = {"created_at": {"$gte": start_time}} - if filters: - query.update(filters) - - cursor = db.system_events.find(query).sort("timestamp", DESCENDING).limit(limit) - events = await cursor.to_list(length=limit) - - # Convert ObjectId to string - for event in events: - event["_id"] = str(event["_id"]) - - return events - except Exception as e: - logger.error(f"Error getting recent events: {e}") - return [] - -class RedisRepository: - """Repository for Redis cache operations""" - - async def set_sensor_data(self, sensor_id: str, data: Dict[str, Any], expire_seconds: int = 3600) -> bool: - """Store latest sensor data in Redis cache""" - try: - key = f"sensor:latest:{sensor_id}" - json_data = json.dumps(data) - await redis_connection.set_with_expiry(key, json_data, expire_seconds) - return True - except Exception as e: - logger.error(f"Error caching sensor data: {e}") - return False - - async def get_sensor_data(self, sensor_id: str) -> Optional[Dict[str, Any]]: - """Get latest sensor data from Redis cache""" - try: - key = f"sensor:latest:{sensor_id}" - data = await redis_connection.get(key) - if data: - return json.loads(data) - return None - except Exception as e: - logger.error(f"Error getting cached sensor data: {e}") - return None - - async def set_sensor_status(self, sensor_id: str, status_data: Dict[str, Any], expire_seconds: int = 1800) -> bool: - """Set sensor status in Redis""" - try: - key = f"sensor:status:{sensor_id}" - json_data = json.dumps(status_data) - await redis_connection.set_with_expiry(key, json_data, expire_seconds) - return True - except Exception as e: - logger.error(f"Error setting sensor status: {e}") - return False - - async def set_room_metrics(self, room: str, metrics: Dict[str, Any], expire_seconds: int = 1800) -> bool: - """Store room metrics in Redis cache""" - try: - key = f"room:metrics:{room}" - json_data = json.dumps(metrics) - await redis_connection.set_with_expiry(key, json_data, expire_seconds) - return True - except Exception as e: - logger.error(f"Error caching room metrics: {e}") - return False - - async def get_room_metrics(self, room: str) -> Optional[Dict[str, Any]]: - """Get room metrics from Redis cache""" - try: - key = f"room:metrics:{room}" - data = await redis_connection.get(key) - if data: - return json.loads(data) - return None - except Exception as e: - logger.error(f"Error getting cached room metrics: {e}") - return None - - async def get_active_sensors(self) -> List[str]: - """Get list of active sensors from Redis""" - try: - keys = await redis_connection.get_keys_by_pattern("sensor:latest:*") - return [key.replace("sensor:latest:", "") for key in keys] - except Exception as e: - logger.error(f"Error getting active sensors: {e}") - return [] - - async def delete_sensor_cache(self, sensor_id: str) -> bool: - """Delete all cached data for a sensor""" - try: - await redis_connection.delete(f"sensor:latest:{sensor_id}") - await redis_connection.delete(f"sensor:status:{sensor_id}") - return True - except Exception as e: - logger.error(f"Error deleting sensor cache: {e}") - return False \ No newline at end of file diff --git a/layers/presentation/__init__.py b/layers/presentation/__init__.py deleted file mode 100644 index e8a337c..0000000 --- a/layers/presentation/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# Empty file to make this a Python package \ No newline at end of file diff --git a/layers/presentation/__pycache__/__init__.cpython-39.pyc b/layers/presentation/__pycache__/__init__.cpython-39.pyc deleted file mode 100644 index ce43cbd..0000000 Binary files a/layers/presentation/__pycache__/__init__.cpython-39.pyc and /dev/null differ diff --git a/layers/presentation/__pycache__/api_routes.cpython-39.pyc b/layers/presentation/__pycache__/api_routes.cpython-39.pyc deleted file mode 100644 index 373d25c..0000000 Binary files a/layers/presentation/__pycache__/api_routes.cpython-39.pyc and /dev/null differ diff --git a/layers/presentation/__pycache__/redis_subscriber.cpython-39.pyc b/layers/presentation/__pycache__/redis_subscriber.cpython-39.pyc deleted file mode 100644 index 9a6d5a3..0000000 Binary files a/layers/presentation/__pycache__/redis_subscriber.cpython-39.pyc and /dev/null differ diff --git a/layers/presentation/__pycache__/websocket_handler.cpython-39.pyc b/layers/presentation/__pycache__/websocket_handler.cpython-39.pyc deleted file mode 100644 index 7d6b862..0000000 Binary files a/layers/presentation/__pycache__/websocket_handler.cpython-39.pyc and /dev/null differ diff --git a/layers/presentation/api_routes.py b/layers/presentation/api_routes.py deleted file mode 100644 index b494f36..0000000 --- a/layers/presentation/api_routes.py +++ /dev/null @@ -1,404 +0,0 @@ -""" -API routes for the energy monitoring system -Presentation Layer - handles HTTP endpoints and request/response formatting -""" -from fastapi import APIRouter, HTTPException, Query, Depends -from typing import List, Optional, Dict, Any -from datetime import datetime, timedelta -import time -import logging - -from models import ( - DataQuery, DataResponse, SensorType, SensorStatus, HealthCheck -) -from ..business.sensor_service import SensorService -from ..business.room_service import RoomService -from ..business.analytics_service import AnalyticsService -from ..infrastructure.database_connection import database_connection -from ..infrastructure.redis_connection import redis_connection - -logger = logging.getLogger(__name__) -router = APIRouter() - -# Initialize services -sensor_service = SensorService() -room_service = RoomService() -analytics_service = AnalyticsService() - -# Dependency to check database connection -async def check_database(): - """Dependency to ensure database is connected""" - try: - db = await database_connection.get_database() - return db - except Exception as e: - logger.error(f"Database connection failed: {e}") - raise HTTPException(status_code=503, detail="Database unavailable") - -@router.get("/sensors", summary="Get all sensors") -async def get_sensors( - room: Optional[str] = Query(None, description="Filter by room"), - sensor_type: Optional[SensorType] = Query(None, description="Filter by sensor type"), - status: Optional[SensorStatus] = Query(None, description="Filter by status"), - db=Depends(check_database) -): - """Get list of all registered sensors with optional filtering""" - try: - # Build filters - filters = {} - if room: - filters["room"] = room - if sensor_type: - filters["sensor_type"] = sensor_type.value - if status: - filters["status"] = status.value - - result = await sensor_service.get_all_sensors(filters) - return result - - except Exception as e: - logger.error(f"Error getting sensors: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.get("/sensors/{sensor_id}", summary="Get sensor details") -async def get_sensor(sensor_id: str, db=Depends(check_database)): - """Get detailed information about a specific sensor""" - try: - result = await sensor_service.get_sensor_details(sensor_id) - - if not result: - raise HTTPException(status_code=404, detail="Sensor not found") - - return result - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error getting sensor {sensor_id}: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.get("/sensors/{sensor_id}/data", summary="Get sensor historical data") -async def get_sensor_data( - sensor_id: str, - start_time: Optional[int] = Query(None, description="Start timestamp (Unix)"), - end_time: Optional[int] = Query(None, description="End timestamp (Unix)"), - limit: int = Query(100, description="Maximum records to return"), - offset: int = Query(0, description="Records to skip"), - db=Depends(check_database) -): - """Get historical data for a specific sensor""" - try: - start_query_time = time.time() - - # Build query - query = {"sensor_id": sensor_id} - - if start_time or end_time: - time_query = {} - if start_time: - time_query["$gte"] = datetime.fromtimestamp(start_time) - if end_time: - time_query["$lte"] = datetime.fromtimestamp(end_time) - query["created_at"] = time_query - - # Get total count and readings through service layer - from ..infrastructure.repositories import SensorReadingRepository - repo = SensorReadingRepository() - - total_count = await repo.count_by_query(query) - readings = await repo.get_by_query( - query=query, - sort_by="timestamp", - sort_order="desc", - limit=limit, - offset=offset - ) - - execution_time = (time.time() - start_query_time) * 1000 - - return DataResponse( - data=readings, - total_count=total_count, - query=DataQuery( - sensor_ids=[sensor_id], - start_time=start_time, - end_time=end_time, - limit=limit, - offset=offset - ), - execution_time_ms=execution_time - ) - - except Exception as e: - logger.error(f"Error getting sensor data for {sensor_id}: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.get("/rooms", summary="Get all rooms") -async def get_rooms(db=Depends(check_database)): - """Get list of all rooms with sensor counts and latest metrics""" - try: - result = await room_service.get_all_rooms() - return result - - except Exception as e: - logger.error(f"Error getting rooms: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.get("/rooms/{room_name}/data", summary="Get room historical data") -async def get_room_data( - room_name: str, - start_time: Optional[int] = Query(None, description="Start timestamp (Unix)"), - end_time: Optional[int] = Query(None, description="End timestamp (Unix)"), - limit: int = Query(100, description="Maximum records to return"), - db=Depends(check_database) -): - """Get historical data for a specific room""" - try: - start_query_time = time.time() - - result = await room_service.get_room_data( - room_name=room_name, - start_time=start_time, - end_time=end_time, - limit=limit - ) - - execution_time = (time.time() - start_query_time) * 1000 - result["execution_time_ms"] = execution_time - - return result - - except Exception as e: - logger.error(f"Error getting room data for {room_name}: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.post("/data/query", summary="Advanced data query", response_model=DataResponse) -async def query_data(query_params: DataQuery, db=Depends(check_database)): - """Advanced data querying with multiple filters and aggregations""" - try: - start_query_time = time.time() - - # Build MongoDB query - mongo_query = {} - - # Sensor filters - if query_params.sensor_ids: - mongo_query["sensor_id"] = {"$in": query_params.sensor_ids} - - if query_params.rooms: - mongo_query["room"] = {"$in": query_params.rooms} - - if query_params.sensor_types: - mongo_query["sensor_type"] = {"$in": [st.value for st in query_params.sensor_types]} - - # Time range - if query_params.start_time or query_params.end_time: - time_query = {} - if query_params.start_time: - time_query["$gte"] = datetime.fromtimestamp(query_params.start_time) - if query_params.end_time: - time_query["$lte"] = datetime.fromtimestamp(query_params.end_time) - mongo_query["created_at"] = time_query - - # Execute query through repository - from ..infrastructure.repositories import SensorReadingRepository - repo = SensorReadingRepository() - - total_count = await repo.count_by_query(mongo_query) - readings = await repo.get_by_query( - query=mongo_query, - sort_by=query_params.sort_by, - sort_order=query_params.sort_order, - limit=query_params.limit, - offset=query_params.offset - ) - - execution_time = (time.time() - start_query_time) * 1000 - - return DataResponse( - data=readings, - total_count=total_count, - query=query_params, - execution_time_ms=execution_time - ) - - except Exception as e: - logger.error(f"Error executing data query: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.get("/analytics/summary", summary="Get analytics summary") -async def get_analytics_summary( - hours: int = Query(24, description="Hours of data to analyze"), - db=Depends(check_database) -): - """Get analytics summary for the specified time period""" - try: - result = await analytics_service.get_analytics_summary(hours) - return result - - except Exception as e: - logger.error(f"Error getting analytics summary: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.get("/analytics/trends", summary="Get energy trends") -async def get_energy_trends( - hours: int = Query(168, description="Hours of data to analyze (default: 1 week)"), - db=Depends(check_database) -): - """Get energy consumption trends""" - try: - result = await analytics_service.get_energy_trends(hours) - return result - - except Exception as e: - logger.error(f"Error getting energy trends: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.get("/analytics/rooms", summary="Get room comparison analytics") -async def get_room_comparison( - hours: int = Query(24, description="Hours of data to analyze"), - db=Depends(check_database) -): - """Get room-by-room comparison analytics""" - try: - result = await analytics_service.get_room_comparison(hours) - return result - - except Exception as e: - logger.error(f"Error getting room comparison: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.get("/events", summary="Get system events") -async def get_events( - severity: Optional[str] = Query(None, description="Filter by severity"), - event_type: Optional[str] = Query(None, description="Filter by event type"), - hours: int = Query(24, description="Hours of events to retrieve"), - limit: int = Query(50, description="Maximum events to return"), - db=Depends(check_database) -): - """Get recent system events and alerts""" - try: - # Build filters - filters = {} - if severity: - filters["severity"] = severity - if event_type: - filters["event_type"] = event_type - - from ..infrastructure.repositories import SystemEventRepository - repo = SystemEventRepository() - - events = await repo.get_recent( - hours=hours, - limit=limit, - filters=filters - ) - - return { - "events": events, - "count": len(events), - "period_hours": hours - } - - except Exception as e: - logger.error(f"Error getting events: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.put("/sensors/{sensor_id}/metadata", summary="Update sensor metadata") -async def update_sensor_metadata( - sensor_id: str, - metadata: dict, - db=Depends(check_database) -): - """Update sensor metadata""" - try: - success = await sensor_service.update_sensor_metadata(sensor_id, metadata) - - if not success: - raise HTTPException(status_code=404, detail="Sensor not found") - - return {"message": "Sensor metadata updated successfully"} - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error updating sensor metadata: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.delete("/sensors/{sensor_id}", summary="Delete sensor and all its data") -async def delete_sensor(sensor_id: str, db=Depends(check_database)): - """Delete a sensor and all its associated data""" - try: - result = await sensor_service.delete_sensor(sensor_id) - - if result["readings_deleted"] == 0 and not result.get("metadata_deleted"): - raise HTTPException(status_code=404, detail="Sensor not found") - - return { - "message": "Sensor deleted successfully", - **result - } - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error deleting sensor {sensor_id}: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@router.get("/export", summary="Export data") -async def export_data( - start_time: int = Query(..., description="Start timestamp (Unix)"), - end_time: int = Query(..., description="End timestamp (Unix)"), - sensor_ids: Optional[str] = Query(None, description="Comma-separated sensor IDs"), - format: str = Query("json", description="Export format (json, csv)"), - db=Depends(check_database) -): - """Export sensor data for the specified time range""" - try: - # Build query - query = { - "created_at": { - "$gte": datetime.fromtimestamp(start_time), - "$lte": datetime.fromtimestamp(end_time) - } - } - - if sensor_ids: - sensor_list = [sid.strip() for sid in sensor_ids.split(",")] - query["sensor_id"] = {"$in": sensor_list} - - # Get data through repository - from ..infrastructure.repositories import SensorReadingRepository - repo = SensorReadingRepository() - - readings = await repo.get_by_query( - query=query, - sort_by="timestamp", - sort_order="asc", - limit=10000 # Large limit for export - ) - - # Convert datetime fields for JSON serialization - for reading in readings: - if "created_at" in reading and hasattr(reading["created_at"], "isoformat"): - reading["created_at"] = reading["created_at"].isoformat() - - if format.lower() == "csv": - raise HTTPException(status_code=501, detail="CSV export not yet implemented") - - return { - "data": readings, - "count": len(readings), - "export_params": { - "start_time": start_time, - "end_time": end_time, - "sensor_ids": sensor_ids.split(",") if sensor_ids else None, - "format": format - } - } - - except HTTPException: - raise - except Exception as e: - logger.error(f"Error exporting data: {e}") - raise HTTPException(status_code=500, detail="Internal server error") \ No newline at end of file diff --git a/layers/presentation/redis_subscriber.py b/layers/presentation/redis_subscriber.py deleted file mode 100644 index 0c2f6e1..0000000 --- a/layers/presentation/redis_subscriber.py +++ /dev/null @@ -1,128 +0,0 @@ -""" -Redis subscriber for real-time data processing -Presentation Layer - handles Redis pub/sub and WebSocket broadcasting -""" -import asyncio -import logging - -from ..infrastructure.redis_connection import redis_connection -from ..business.sensor_service import SensorService -from ..business.room_service import RoomService -from .websocket_handler import websocket_manager - -logger = logging.getLogger(__name__) - -class RedisSubscriber: - """Manages Redis subscription and data broadcasting""" - - def __init__(self): - self.sensor_service = SensorService() - self.room_service = RoomService() - self.is_running = False - self.subscription_task = None - - async def start_subscription(self, channel: str = "energy_data") -> None: - """Start Redis subscription in background task""" - if self.is_running: - logger.warning("Redis subscriber is already running") - return - - self.is_running = True - self.subscription_task = asyncio.create_task(self._subscribe_loop(channel)) - logger.info(f"Started Redis subscriber for channel: {channel}") - - async def stop_subscription(self) -> None: - """Stop Redis subscription""" - self.is_running = False - if self.subscription_task: - self.subscription_task.cancel() - try: - await self.subscription_task - except asyncio.CancelledError: - pass - logger.info("Redis subscriber stopped") - - async def _subscribe_loop(self, channel: str) -> None: - """Main subscription loop""" - logger.info("Starting Redis subscriber...") - - try: - # Get Redis client and create pubsub - redis_client = await redis_connection.get_client() - pubsub = await redis_connection.create_pubsub() - - # Subscribe to channel - await pubsub.subscribe(channel) - logger.info(f"Subscribed to Redis channel: '{channel}'") - - while self.is_running: - try: - # Get message with timeout - message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0) - - if message and message.get('data'): - await self._process_message(message['data']) - - except Exception as e: - logger.error(f"Error in Redis subscriber loop: {e}") - # Add delay to prevent rapid-fire errors - await asyncio.sleep(5) - - except Exception as e: - logger.error(f"Could not connect to Redis for subscription: {e}") - finally: - # Clean up pubsub connection - try: - await pubsub.unsubscribe(channel) - await pubsub.close() - except Exception as e: - logger.error(f"Error closing pubsub connection: {e}") - - async def _process_message(self, message_data: str) -> None: - """Process incoming Redis message""" - try: - logger.debug(f"Received from Redis: {message_data}") - - # Process sensor data through business layer - processing_success = await self.sensor_service.process_sensor_message(message_data) - - if processing_success: - # Extract room from message for room metrics update - import json - try: - data = json.loads(message_data) - room = data.get('room') - if room: - # Update room metrics asynchronously - asyncio.create_task(self.room_service.update_room_metrics(room)) - except json.JSONDecodeError: - logger.warning("Could not parse message for room extraction") - - # Broadcast to WebSocket clients - await websocket_manager.broadcast(message_data) - else: - logger.warning("Sensor data processing failed, skipping broadcast") - - except Exception as e: - logger.error(f"Error processing Redis message: {e}") - - def is_subscriber_running(self) -> bool: - """Check if subscriber is currently running""" - return self.is_running and ( - self.subscription_task is not None and - not self.subscription_task.done() - ) - - async def get_subscriber_status(self) -> dict: - """Get subscriber status information""" - return { - "is_running": self.is_running, - "task_status": ( - "running" if self.subscription_task and not self.subscription_task.done() - else "stopped" - ), - "active_websocket_connections": websocket_manager.get_connection_count() - } - -# Global Redis subscriber instance -redis_subscriber = RedisSubscriber() \ No newline at end of file diff --git a/layers/presentation/websocket_handler.py b/layers/presentation/websocket_handler.py deleted file mode 100644 index cb565ad..0000000 --- a/layers/presentation/websocket_handler.py +++ /dev/null @@ -1,97 +0,0 @@ -""" -WebSocket connection handler -Presentation Layer - manages WebSocket connections and real-time communication -""" -import asyncio -from typing import List -from fastapi import WebSocket, WebSocketDisconnect -import logging - -logger = logging.getLogger(__name__) - -class WebSocketManager: - """Manages WebSocket connections and broadcasting""" - - def __init__(self): - self.active_connections: List[WebSocket] = [] - - async def connect(self, websocket: WebSocket) -> None: - """Accept and store new WebSocket connection""" - await websocket.accept() - self.active_connections.append(websocket) - logger.info(f"New client connected. Total clients: {len(self.active_connections)}") - - def disconnect(self, websocket: WebSocket) -> None: - """Remove WebSocket connection""" - if websocket in self.active_connections: - self.active_connections.remove(websocket) - logger.info(f"Client disconnected. Total clients: {len(self.active_connections)}") - - async def send_personal_message(self, message: str, websocket: WebSocket) -> None: - """Send message to specific WebSocket connection""" - try: - await websocket.send_text(message) - except Exception as e: - logger.error(f"Error sending personal message: {e}") - self.disconnect(websocket) - - async def broadcast(self, message: str) -> None: - """Broadcast message to all connected clients""" - if not self.active_connections: - return - - try: - # Send to all connections concurrently - tasks = [ - self._safe_send_message(connection, message) - for connection in self.active_connections.copy() - ] - - # Execute all sends concurrently and handle exceptions - results = await asyncio.gather(*tasks, return_exceptions=True) - - # Remove failed connections - failed_connections = [] - for i, result in enumerate(results): - if isinstance(result, Exception): - failed_connections.append(self.active_connections[i]) - - for connection in failed_connections: - self.disconnect(connection) - - except Exception as e: - logger.error(f"Error in broadcast: {e}") - - async def _safe_send_message(self, websocket: WebSocket, message: str) -> None: - """Safely send message to WebSocket with error handling""" - try: - await websocket.send_text(message) - except WebSocketDisconnect: - # Connection was closed - raise - except Exception as e: - logger.error(f"Error sending message to client: {e}") - raise - - def get_connection_count(self) -> int: - """Get number of active connections""" - return len(self.active_connections) - - async def ping_all_connections(self) -> int: - """Ping all connections to check health, return number of healthy connections""" - if not self.active_connections: - return 0 - - healthy_connections = [] - for connection in self.active_connections.copy(): - try: - await connection.ping() - healthy_connections.append(connection) - except Exception: - logger.debug("Removing unhealthy connection") - - self.active_connections = healthy_connections - return len(healthy_connections) - -# Global WebSocket manager instance -websocket_manager = WebSocketManager() \ No newline at end of file diff --git a/main.py b/main.py deleted file mode 100644 index e0e6580..0000000 --- a/main.py +++ /dev/null @@ -1,202 +0,0 @@ - -import asyncio -import json -import redis.asyncio as redis -import time -import os -from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Depends, Query -from fastapi.middleware.cors import CORSMiddleware -from typing import List, Optional -import logging -from contextlib import asynccontextmanager - -# Import our custom modules -from database import connect_to_mongo, close_mongo_connection, redis_manager, schedule_cleanup -from persistence import persistence_service -from models import DataQuery, DataResponse, HealthCheck -from api import router as api_router - -# Configure logging -logging.basicConfig(level=logging.INFO) -logger = logging.getLogger(__name__) - -# Application startup time for uptime calculation -app_start_time = time.time() - -@asynccontextmanager -async def lifespan(app: FastAPI): - """Application lifespan manager""" - # Startup - logger.info("Application starting up...") - - # Connect to databases - await connect_to_mongo() - await persistence_service.initialize() - - # Start background tasks - asyncio.create_task(redis_subscriber()) - asyncio.create_task(schedule_cleanup()) - - logger.info("Application startup complete") - - yield - - # Shutdown - logger.info("Application shutting down...") - await close_mongo_connection() - await redis_manager.disconnect() - logger.info("Application shutdown complete") - -app = FastAPI( - title="Energy Monitoring Dashboard API", - description="Real-time energy monitoring and IoT sensor data management system", - version="1.0.0", - lifespan=lifespan -) - -# Add CORS middleware -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], # Configure appropriately for production - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) - -# Include API router -app.include_router(api_router, prefix="/api/v1") - -# In-memory store for active WebSocket connections -active_connections: List[WebSocket] = [] - -# Redis channel to subscribe to -REDIS_CHANNEL = "energy_data" - - -@app.websocket("/ws") -async def websocket_endpoint(websocket: WebSocket): - """ - WebSocket endpoint that connects a client, adds them to the active pool, - and removes them on disconnection. - """ - await websocket.accept() - active_connections.append(websocket) - logger.info(f"New client connected. Total clients: {len(active_connections)}") - try: - while True: - # Keep the connection alive - await websocket.receive_text() - except WebSocketDisconnect: - active_connections.remove(websocket) - logger.info(f"Client disconnected. Total clients: {len(active_connections)}") - - -async def redis_subscriber(): - """ - Connects to Redis, subscribes to the specified channel, and broadcasts - messages to all active WebSocket clients. Also persists data to MongoDB. - """ - logger.info("Starting Redis subscriber...") - try: - r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True) - await r.ping() - logger.info("Successfully connected to Redis for subscription.") - except Exception as e: - logger.error(f"Could not connect to Redis for subscription: {e}") - return - - pubsub = r.pubsub() - await pubsub.subscribe(REDIS_CHANNEL) - - logger.info(f"Subscribed to Redis channel: '{REDIS_CHANNEL}'") - while True: - try: - message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0) - if message: - message_data = message['data'] - logger.debug(f"Received from Redis: {message_data}") - - # Process and persist the data - await persistence_service.process_sensor_message(message_data) - - # Broadcast message to all connected WebSocket clients - if active_connections: - await asyncio.gather( - *[connection.send_text(message_data) for connection in active_connections], - return_exceptions=True - ) - - except Exception as e: - logger.error(f"Error in Redis subscriber loop: {e}") - # Add a delay to prevent rapid-fire errors - await asyncio.sleep(5) - - -@app.get("/") -async def read_root(): - """Root endpoint with basic system information""" - return { - "message": "Energy Monitoring Dashboard Backend", - "version": "1.0.0", - "status": "running", - "uptime_seconds": time.time() - app_start_time - } - - -@app.get("/health", response_model=HealthCheck) -async def health_check(): - """Health check endpoint""" - try: - # Check database connections - mongodb_connected = True - redis_connected = True - - try: - await persistence_service.db.command("ping") - except: - mongodb_connected = False - - try: - await redis_manager.redis_client.ping() - except: - redis_connected = False - - # Get system statistics - stats = await persistence_service.get_sensor_statistics() - - # Determine overall status - status = "healthy" - if not mongodb_connected or not redis_connected: - status = "degraded" - - return HealthCheck( - status=status, - mongodb_connected=mongodb_connected, - redis_connected=redis_connected, - total_sensors=stats.get("total_sensors", 0), - active_sensors=stats.get("active_sensors", 0), - total_readings=stats.get("total_readings", 0), - uptime_seconds=time.time() - app_start_time - ) - - except Exception as e: - logger.error(f"Health check failed: {e}") - raise HTTPException(status_code=503, detail="Service Unavailable") - - -@app.get("/status") -async def system_status(): - """Detailed system status endpoint""" - try: - stats = await persistence_service.get_sensor_statistics() - - return { - "timestamp": time.time(), - "uptime_seconds": time.time() - app_start_time, - "active_websocket_connections": len(active_connections), - "database_stats": stats - } - - except Exception as e: - logger.error(f"Status check failed: {e}") - raise HTTPException(status_code=500, detail="Internal Server Error") diff --git a/microservices_example.md b/microservices_example.md deleted file mode 100644 index 6e6c3c4..0000000 --- a/microservices_example.md +++ /dev/null @@ -1,84 +0,0 @@ -# Microservices Architecture Example - -## Service Decomposition - -### 1. Sensor Data Service -**Responsibility**: Sensor data ingestion, validation, and storage -``` -Port: 8001 -Database: sensor_db (MongoDB) -Endpoints: -- POST /sensors/data # Ingest sensor readings -- GET /sensors/{id}/data # Get sensor history -- GET /sensors # List sensors -``` - -### 2. Room Management Service -**Responsibility**: Room metrics, aggregations, and space management -``` -Port: 8002 -Database: room_db (MongoDB) -Endpoints: -- GET /rooms # List rooms -- GET /rooms/{id}/metrics # Current room metrics -- GET /rooms/{id}/history # Historical room data -``` - -### 3. Analytics Service -**Responsibility**: Data analysis, reporting, and insights -``` -Port: 8003 -Database: analytics_db (PostgreSQL/ClickHouse) -Endpoints: -- GET /analytics/summary # Dashboard summary -- GET /analytics/trends # Trend analysis -- GET /analytics/reports/{id} # Generated reports -``` - -### 4. Notification Service -**Responsibility**: Alerts, events, and real-time notifications -``` -Port: 8004 -Database: events_db (MongoDB) -Message Queue: RabbitMQ/Kafka -Endpoints: -- POST /notifications/send # Send notification -- GET /events # System events -- WebSocket: /ws/notifications # Real-time alerts -``` - -### 5. API Gateway -**Responsibility**: Request routing, authentication, rate limiting -``` -Port: 8000 -Routes all requests to appropriate services -Handles CORS, authentication, logging -``` - -## Inter-Service Communication - -### Synchronous (HTTP/REST) -```python -# Analytics Service calling Sensor Service -import httpx - -async def get_sensor_data(sensor_id: str): - async with httpx.AsyncClient() as client: - response = await client.get(f"http://sensor-service:8001/sensors/{sensor_id}/data") - return response.json() -``` - -### Asynchronous (Message Queue) -```python -# Sensor Service publishes event -await message_queue.publish("sensor.data.received", { - "sensor_id": "sensor_001", - "timestamp": datetime.utcnow(), - "data": sensor_reading -}) - -# Room Service subscribes to event -@message_queue.subscribe("sensor.data.received") -async def handle_sensor_data(message): - await room_service.update_room_metrics(message.data) -``` \ No newline at end of file diff --git a/test_structure.py b/test_structure.py deleted file mode 100644 index 27e4f76..0000000 --- a/test_structure.py +++ /dev/null @@ -1,221 +0,0 @@ -#!/usr/bin/env python3 -""" -Test script to validate the layered architecture structure -This script checks the structure without requiring all dependencies to be installed -""" -import os -import sys -from pathlib import Path - -def check_file_structure(): - """Check if all expected files exist in the layered structure""" - expected_structure = { - "layers/__init__.py": "Layers package init", - "layers/infrastructure/__init__.py": "Infrastructure layer init", - "layers/infrastructure/database_connection.py": "Database connection management", - "layers/infrastructure/redis_connection.py": "Redis connection management", - "layers/infrastructure/repositories.py": "Data access layer", - "layers/business/__init__.py": "Business layer init", - "layers/business/sensor_service.py": "Sensor business logic", - "layers/business/room_service.py": "Room business logic", - "layers/business/analytics_service.py": "Analytics business logic", - "layers/business/cleanup_service.py": "Cleanup business logic", - "layers/presentation/__init__.py": "Presentation layer init", - "layers/presentation/websocket_handler.py": "WebSocket management", - "layers/presentation/redis_subscriber.py": "Redis pub/sub handling", - "layers/presentation/api_routes.py": "API route definitions", - "main_layered.py": "Main application with layered architecture", - "models.py": "Data models (existing)", - } - - print("šŸ” Checking layered architecture file structure...") - print("=" * 60) - - all_files_exist = True - - for file_path, description in expected_structure.items(): - full_path = Path(file_path) - - if full_path.exists(): - size = full_path.stat().st_size - print(f"āœ… {file_path:<40} ({size:,} bytes) - {description}") - else: - print(f"āŒ {file_path:<40} MISSING - {description}") - all_files_exist = False - - print("=" * 60) - - if all_files_exist: - print("šŸŽ‰ All files in layered structure exist!") - return True - else: - print("āŒ Some files are missing from the layered structure") - return False - -def check_import_structure(): - """Check the logical structure of imports (without actually importing)""" - print("\nšŸ“‹ Analyzing import dependencies...") - print("=" * 60) - - # Define expected dependencies by layer - layer_dependencies = { - "Infrastructure Layer": { - "files": [ - "layers/infrastructure/database_connection.py", - "layers/infrastructure/redis_connection.py", - "layers/infrastructure/repositories.py" - ], - "can_import_from": ["models", "external libraries"], - "should_not_import_from": ["business", "presentation"] - }, - "Business Layer": { - "files": [ - "layers/business/sensor_service.py", - "layers/business/room_service.py", - "layers/business/analytics_service.py", - "layers/business/cleanup_service.py" - ], - "can_import_from": ["models", "infrastructure", "external libraries"], - "should_not_import_from": ["presentation"] - }, - "Presentation Layer": { - "files": [ - "layers/presentation/websocket_handler.py", - "layers/presentation/redis_subscriber.py", - "layers/presentation/api_routes.py" - ], - "can_import_from": ["models", "business", "infrastructure", "external libraries"], - "should_not_import_from": [] - } - } - - violations = [] - - for layer_name, layer_info in layer_dependencies.items(): - print(f"\n{layer_name}:") - - for file_path in layer_info["files"]: - if Path(file_path).exists(): - try: - with open(file_path, 'r') as f: - content = f.read() - - # Check for violations - for forbidden in layer_info["should_not_import_from"]: - if forbidden == "business" and "from ..business" in content: - violations.append(f"{file_path} imports from business layer (violation)") - elif forbidden == "presentation" and "from ..presentation" in content: - violations.append(f"{file_path} imports from presentation layer (violation)") - - print(f" āœ… {Path(file_path).name}") - - except Exception as e: - print(f" āš ļø {Path(file_path).name} - Could not analyze: {e}") - - if violations: - print(f"\nāŒ Found {len(violations)} layering violations:") - for violation in violations: - print(f" - {violation}") - return False - else: - print("\nāœ… No layering violations detected!") - return True - -def analyze_code_separation(): - """Analyze how well the code has been separated by responsibility""" - print("\nšŸ“Š Analyzing code separation...") - print("=" * 60) - - analysis = { - "Infrastructure Layer": { - "responsibilities": ["Database connections", "Redis connections", "Data repositories"], - "file_count": 0, - "total_lines": 0 - }, - "Business Layer": { - "responsibilities": ["Business logic", "Data processing", "Analytics", "Cleanup"], - "file_count": 0, - "total_lines": 0 - }, - "Presentation Layer": { - "responsibilities": ["HTTP endpoints", "WebSocket handling", "Request/Response"], - "file_count": 0, - "total_lines": 0 - } - } - - layer_paths = { - "Infrastructure Layer": "layers/infrastructure/", - "Business Layer": "layers/business/", - "Presentation Layer": "layers/presentation/" - } - - for layer_name, layer_path in layer_paths.items(): - layer_dir = Path(layer_path) - if layer_dir.exists(): - py_files = list(layer_dir.glob("*.py")) - py_files = [f for f in py_files if f.name != "__init__.py"] - - total_lines = 0 - for py_file in py_files: - try: - with open(py_file, 'r') as f: - lines = len(f.readlines()) - total_lines += lines - except: - pass - - analysis[layer_name]["file_count"] = len(py_files) - analysis[layer_name]["total_lines"] = total_lines - - for layer_name, info in analysis.items(): - print(f"\n{layer_name}:") - print(f" Files: {info['file_count']}") - print(f" Lines of Code: {info['total_lines']:,}") - print(f" Responsibilities: {', '.join(info['responsibilities'])}") - - total_files = sum(info["file_count"] for info in analysis.values()) - total_lines = sum(info["total_lines"] for info in analysis.values()) - - print(f"\nšŸ“ˆ Total Separation Metrics:") - print(f" Total Files: {total_files}") - print(f" Total Lines: {total_lines:,}") - print(f" Layers: 3 (Infrastructure, Business, Presentation)") - - return True - -def main(): - """Main test function""" - print("šŸ—ļø LAYERED ARCHITECTURE VALIDATION") - print("=" * 60) - - success = True - - # Check file structure - if not check_file_structure(): - success = False - - # Check import structure - if not check_import_structure(): - success = False - - # Analyze code separation - if not analyze_code_separation(): - success = False - - print("\n" + "=" * 60) - if success: - print("šŸŽ‰ VALIDATION SUCCESSFUL - Layered architecture is properly structured!") - print("\n✨ Key Benefits Achieved:") - print(" • Clear separation of concerns") - print(" • Infrastructure isolated from business logic") - print(" • Business logic separated from presentation") - print(" • Easy to test individual layers") - print(" • Maintainable and scalable structure") - else: - print("āŒ VALIDATION FAILED - Issues found in layered architecture") - - return success - -if __name__ == "__main__": - sys.exit(0 if main() else 1) \ No newline at end of file