""" Analytics service for processing sensor data and generating insights """ import logging from datetime import datetime, timedelta from typing import List, Dict, Any, Optional import json logger = logging.getLogger(__name__) class AnalyticsService: """Service for analytics and data processing""" def __init__(self, db, redis_client): self.db = db self.redis = redis_client async def query_data(self, query_params) -> Dict[str, Any]: """Execute advanced data query""" try: # Build query query = {} if hasattr(query_params, 'sensor_ids') and query_params.sensor_ids: query["sensor_id"] = {"$in": query_params.sensor_ids} if hasattr(query_params, 'start_time') and query_params.start_time: query.setdefault("timestamp", {})["$gte"] = query_params.start_time if hasattr(query_params, 'end_time') and query_params.end_time: query.setdefault("timestamp", {})["$lte"] = query_params.end_time # Execute query cursor = self.db.sensor_readings.find(query) if hasattr(query_params, 'limit') and query_params.limit: cursor = cursor.limit(query_params.limit) if hasattr(query_params, 'offset') and query_params.offset: cursor = cursor.skip(query_params.offset) cursor = cursor.sort("timestamp", -1) # Get results results = [] async for reading in cursor: reading["_id"] = str(reading["_id"]) results.append(reading) # Get total count total_count = await self.db.sensor_readings.count_documents(query) return { "data": results, "total_count": total_count, "query": query_params.__dict__ if hasattr(query_params, '__dict__') else {}, "execution_time_ms": 0 # Placeholder } except Exception as e: logger.error(f"Error executing data query: {e}") raise async def get_analytics_summary(self, hours: int = 24) -> Dict[str, Any]: """Get comprehensive analytics summary""" try: start_time = datetime.utcnow() - timedelta(hours=hours) # Get basic statistics pipeline = [ { "$match": { "created_at": {"$gte": start_time} } }, { "$group": { "_id": None, "total_readings": {"$sum": 1}, "average_value": {"$avg": "$value"}, "min_value": {"$min": "$value"}, "max_value": {"$max": "$value"}, "unique_sensors": {"$addToSet": "$sensor_id"} } } ] cursor = self.db.sensor_readings.aggregate(pipeline) stats = await cursor.to_list(length=1) base_stats = stats[0] if stats else { "total_readings": 0, "average_value": 0, "min_value": 0, "max_value": 0, "unique_sensors": [] } # Get room-level statistics room_stats = await self._get_room_analytics(hours) # Get energy trends energy_trends = await self._get_energy_trends(hours) return { "period_hours": hours, "timestamp": datetime.utcnow().isoformat(), "total_readings": base_stats["total_readings"], "unique_sensors": len(base_stats["unique_sensors"]), "value_statistics": { "average": round(base_stats["average_value"], 2) if base_stats["average_value"] else 0, "minimum": base_stats["min_value"], "maximum": base_stats["max_value"] }, "room_statistics": room_stats, "energy_trends": energy_trends } except Exception as e: logger.error(f"Error getting analytics summary: {e}") raise async def get_energy_analytics(self, hours: int = 24, room: Optional[str] = None) -> Dict[str, Any]: """Get energy-specific analytics""" try: start_time = datetime.utcnow() - timedelta(hours=hours) # Build query query = {"created_at": {"$gte": start_time}} if room: query["room"] = room # Energy consumption over time pipeline = [ {"$match": query}, { "$group": { "_id": { "hour": {"$hour": "$created_at"}, "date": {"$dateToString": {"format": "%Y-%m-%d", "date": "$created_at"}} }, "total_energy": {"$sum": "$value"}, "reading_count": {"$sum": 1} } }, {"$sort": {"_id.date": 1, "_id.hour": 1}} ] cursor = self.db.sensor_readings.aggregate(pipeline) hourly_data = [] async for data in cursor: hourly_data.append({ "hour": data["_id"]["hour"], "date": data["_id"]["date"], "total_energy": data["total_energy"], "reading_count": data["reading_count"] }) # Peak consumption analysis peak_analysis = await self._get_peak_consumption_analysis(query) # Energy efficiency metrics efficiency_metrics = await self._get_efficiency_metrics(query) return { "period_hours": hours, "room": room, "timestamp": datetime.utcnow().isoformat(), "hourly_consumption": hourly_data, "peak_analysis": peak_analysis, "efficiency_metrics": efficiency_metrics, "total_consumption": sum(item["total_energy"] for item in hourly_data) } except Exception as e: logger.error(f"Error getting energy analytics: {e}") raise async def _get_room_analytics(self, hours: int) -> Dict[str, Any]: """Get room-level analytics""" try: start_time = datetime.utcnow() - timedelta(hours=hours) pipeline = [ { "$match": { "created_at": {"$gte": start_time}, "room": {"$ne": None} } }, { "$group": { "_id": "$room", "total_readings": {"$sum": 1}, "total_energy": {"$sum": "$value"}, "average_energy": {"$avg": "$value"}, "unique_sensors": {"$addToSet": "$sensor_id"} } }, {"$sort": {"total_energy": -1}} ] cursor = self.db.sensor_readings.aggregate(pipeline) room_data = [] async for room in cursor: room_data.append({ "room": room["_id"], "total_readings": room["total_readings"], "total_energy": room["total_energy"], "average_energy": round(room["average_energy"], 2), "sensor_count": len(room["unique_sensors"]) }) return { "by_room": room_data, "total_rooms": len(room_data) } except Exception as e: logger.error(f"Error getting room analytics: {e}") return {"by_room": [], "total_rooms": 0} async def _get_energy_trends(self, hours: int) -> Dict[str, Any]: """Get energy consumption trends""" try: start_time = datetime.utcnow() - timedelta(hours=hours) # Get current period data current_query = {"created_at": {"$gte": start_time}} current_cursor = self.db.sensor_readings.aggregate([ {"$match": current_query}, {"$group": {"_id": None, "total": {"$sum": "$value"}, "count": {"$sum": 1}}} ]) current_data = await current_cursor.to_list(length=1) current_total = current_data[0]["total"] if current_data else 0 current_count = current_data[0]["count"] if current_data else 0 # Get previous period for comparison previous_start = start_time - timedelta(hours=hours) previous_query = { "created_at": {"$gte": previous_start, "$lt": start_time} } previous_cursor = self.db.sensor_readings.aggregate([ {"$match": previous_query}, {"$group": {"_id": None, "total": {"$sum": "$value"}, "count": {"$sum": 1}}} ]) previous_data = await previous_cursor.to_list(length=1) previous_total = previous_data[0]["total"] if previous_data else 0 # Calculate trend if previous_total > 0: trend_percentage = ((current_total - previous_total) / previous_total) * 100 else: trend_percentage = 0 return { "current_period": { "total_energy": current_total, "reading_count": current_count, "average_per_reading": current_total / current_count if current_count > 0 else 0 }, "previous_period": { "total_energy": previous_total }, "trend": { "percentage_change": round(trend_percentage, 2), "direction": "up" if trend_percentage > 0 else "down" if trend_percentage < 0 else "stable" } } except Exception as e: logger.error(f"Error getting energy trends: {e}") return {} async def _get_peak_consumption_analysis(self, base_query: Dict[str, Any]) -> Dict[str, Any]: """Analyze peak consumption patterns""" try: pipeline = [ {"$match": base_query}, { "$group": { "_id": {"$hour": "$created_at"}, "total_consumption": {"$sum": "$value"}, "reading_count": {"$sum": 1} } }, {"$sort": {"total_consumption": -1}} ] cursor = self.db.sensor_readings.aggregate(pipeline) hourly_consumption = await cursor.to_list(length=None) if not hourly_consumption: return {"peak_hour": None, "peak_consumption": 0, "hourly_pattern": []} peak_data = hourly_consumption[0] return { "peak_hour": peak_data["_id"], "peak_consumption": peak_data["total_consumption"], "hourly_pattern": [ { "hour": item["_id"], "consumption": item["total_consumption"], "reading_count": item["reading_count"] } for item in hourly_consumption ] } except Exception as e: logger.error(f"Error analyzing peak consumption: {e}") return {"peak_hour": None, "peak_consumption": 0, "hourly_pattern": []} async def _get_efficiency_metrics(self, base_query: Dict[str, Any]) -> Dict[str, Any]: """Calculate energy efficiency metrics""" try: # Average consumption per sensor pipeline = [ {"$match": base_query}, { "$group": { "_id": "$sensor_id", "total_consumption": {"$sum": "$value"}, "reading_count": {"$sum": 1} } }, { "$group": { "_id": None, "average_per_sensor": {"$avg": "$total_consumption"}, "sensor_count": {"$sum": 1}, "min_consumption": {"$min": "$total_consumption"}, "max_consumption": {"$max": "$total_consumption"} } } ] cursor = self.db.sensor_readings.aggregate(pipeline) efficiency_data = await cursor.to_list(length=1) if not efficiency_data: return { "average_per_sensor": 0, "sensor_count": 0, "efficiency_score": 0, "variation_coefficient": 0 } data = efficiency_data[0] # Calculate efficiency score (lower variation = higher efficiency) if data["average_per_sensor"] > 0: variation_coefficient = (data["max_consumption"] - data["min_consumption"]) / data["average_per_sensor"] efficiency_score = max(0, 100 - (variation_coefficient * 10)) # Scale to 0-100 else: variation_coefficient = 0 efficiency_score = 100 return { "average_per_sensor": round(data["average_per_sensor"], 2), "sensor_count": data["sensor_count"], "efficiency_score": round(efficiency_score, 1), "variation_coefficient": round(variation_coefficient, 2) } except Exception as e: logger.error(f"Error calculating efficiency metrics: {e}") return { "average_per_sensor": 0, "sensor_count": 0, "efficiency_score": 0, "variation_coefficient": 0 }