Remove layered architecture files and related modules
This commit is contained in:
@@ -1 +0,0 @@
|
||||
# Empty file to make this a Python package
|
||||
Binary file not shown.
Binary file not shown.
@@ -1 +0,0 @@
|
||||
# Empty file to make this a Python package
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,300 +0,0 @@
|
||||
"""
|
||||
Analytics business logic service
|
||||
Business Layer - handles analytics calculations and data aggregations
|
||||
"""
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, Any, List, Optional
|
||||
import logging
|
||||
|
||||
from ..infrastructure.repositories import SensorReadingRepository
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class AnalyticsService:
|
||||
"""Service for analytics and reporting operations"""
|
||||
|
||||
def __init__(self):
|
||||
self.sensor_reading_repo = SensorReadingRepository()
|
||||
|
||||
async def get_analytics_summary(self, hours: int = 24) -> Dict[str, Any]:
|
||||
"""Get comprehensive analytics summary for the specified time period"""
|
||||
try:
|
||||
start_time = datetime.utcnow() - timedelta(hours=hours)
|
||||
|
||||
# Sensor-level analytics pipeline
|
||||
sensor_pipeline = [
|
||||
{"$match": {"created_at": {"$gte": start_time}}},
|
||||
{"$group": {
|
||||
"_id": {
|
||||
"sensor_id": "$sensor_id",
|
||||
"room": "$room",
|
||||
"sensor_type": "$sensor_type"
|
||||
},
|
||||
"reading_count": {"$sum": 1},
|
||||
"avg_energy": {"$avg": "$energy.value"},
|
||||
"total_energy": {"$sum": "$energy.value"},
|
||||
"avg_co2": {"$avg": "$co2.value"},
|
||||
"max_co2": {"$max": "$co2.value"},
|
||||
"avg_temperature": {"$avg": "$temperature.value"},
|
||||
"latest_timestamp": {"$max": "$timestamp"}
|
||||
}},
|
||||
{"$sort": {"total_energy": -1}}
|
||||
]
|
||||
|
||||
sensor_analytics = await self.sensor_reading_repo.aggregate(sensor_pipeline)
|
||||
|
||||
# Room-level analytics pipeline
|
||||
room_pipeline = [
|
||||
{"$match": {"created_at": {"$gte": start_time}, "room": {"$ne": None}}},
|
||||
{"$group": {
|
||||
"_id": "$room",
|
||||
"sensor_count": {"$addToSet": "$sensor_id"},
|
||||
"total_energy": {"$sum": "$energy.value"},
|
||||
"avg_co2": {"$avg": "$co2.value"},
|
||||
"max_co2": {"$max": "$co2.value"},
|
||||
"reading_count": {"$sum": 1}
|
||||
}},
|
||||
{"$project": {
|
||||
"room": "$_id",
|
||||
"sensor_count": {"$size": "$sensor_count"},
|
||||
"total_energy": 1,
|
||||
"avg_co2": 1,
|
||||
"max_co2": 1,
|
||||
"reading_count": 1
|
||||
}},
|
||||
{"$sort": {"total_energy": -1}}
|
||||
]
|
||||
|
||||
room_analytics = await self.sensor_reading_repo.aggregate(room_pipeline)
|
||||
|
||||
# Calculate summary statistics
|
||||
summary_stats = self._calculate_summary_stats(sensor_analytics, room_analytics)
|
||||
|
||||
return {
|
||||
"period_hours": hours,
|
||||
"start_time": start_time.isoformat(),
|
||||
"sensor_analytics": sensor_analytics,
|
||||
"room_analytics": room_analytics,
|
||||
"summary": summary_stats
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting analytics summary: {e}")
|
||||
return {
|
||||
"period_hours": hours,
|
||||
"start_time": None,
|
||||
"sensor_analytics": [],
|
||||
"room_analytics": [],
|
||||
"summary": {}
|
||||
}
|
||||
|
||||
def _calculate_summary_stats(self, sensor_analytics: List[Dict],
|
||||
room_analytics: List[Dict]) -> Dict[str, Any]:
|
||||
"""Calculate summary statistics from analytics data"""
|
||||
total_readings = sum(item["reading_count"] for item in sensor_analytics)
|
||||
total_energy = sum(item.get("total_energy", 0) or 0 for item in sensor_analytics)
|
||||
|
||||
# Energy consumption insights
|
||||
energy_insights = {
|
||||
"total_consumption_kwh": round(total_energy, 2),
|
||||
"average_consumption_per_sensor": (
|
||||
round(total_energy / len(sensor_analytics), 2)
|
||||
if sensor_analytics else 0
|
||||
),
|
||||
"top_energy_consumer": (
|
||||
sensor_analytics[0]["_id"]["sensor_id"]
|
||||
if sensor_analytics else None
|
||||
)
|
||||
}
|
||||
|
||||
# CO2 insights
|
||||
co2_values = [item.get("avg_co2") for item in sensor_analytics if item.get("avg_co2")]
|
||||
co2_insights = {
|
||||
"average_co2_level": (
|
||||
round(sum(co2_values) / len(co2_values), 1)
|
||||
if co2_values else 0
|
||||
),
|
||||
"sensors_with_high_co2": len([
|
||||
co2 for co2 in co2_values if co2 and co2 > 1000
|
||||
]),
|
||||
"sensors_with_critical_co2": len([
|
||||
co2 for co2 in co2_values if co2 and co2 > 5000
|
||||
])
|
||||
}
|
||||
|
||||
return {
|
||||
"total_sensors_analyzed": len(sensor_analytics),
|
||||
"total_rooms_analyzed": len(room_analytics),
|
||||
"total_readings": total_readings,
|
||||
"energy_insights": energy_insights,
|
||||
"co2_insights": co2_insights
|
||||
}
|
||||
|
||||
async def get_energy_trends(self, hours: int = 168) -> Dict[str, Any]:
|
||||
"""Get energy consumption trends (default: last week)"""
|
||||
try:
|
||||
start_time = datetime.utcnow() - timedelta(hours=hours)
|
||||
|
||||
# Hourly energy consumption pipeline
|
||||
pipeline = [
|
||||
{"$match": {
|
||||
"created_at": {"$gte": start_time},
|
||||
"energy.value": {"$exists": True}
|
||||
}},
|
||||
{"$group": {
|
||||
"_id": {
|
||||
"year": {"$year": "$created_at"},
|
||||
"month": {"$month": "$created_at"},
|
||||
"day": {"$dayOfMonth": "$created_at"},
|
||||
"hour": {"$hour": "$created_at"}
|
||||
},
|
||||
"total_energy": {"$sum": "$energy.value"},
|
||||
"sensor_count": {"$addToSet": "$sensor_id"},
|
||||
"reading_count": {"$sum": 1}
|
||||
}},
|
||||
{"$project": {
|
||||
"_id": 0,
|
||||
"timestamp": {
|
||||
"$dateFromParts": {
|
||||
"year": "$_id.year",
|
||||
"month": "$_id.month",
|
||||
"day": "$_id.day",
|
||||
"hour": "$_id.hour"
|
||||
}
|
||||
},
|
||||
"total_energy": {"$round": ["$total_energy", 2]},
|
||||
"sensor_count": {"$size": "$sensor_count"},
|
||||
"reading_count": 1
|
||||
}},
|
||||
{"$sort": {"timestamp": 1}}
|
||||
]
|
||||
|
||||
trends = await self.sensor_reading_repo.aggregate(pipeline)
|
||||
|
||||
# Calculate trend insights
|
||||
insights = self._calculate_trend_insights(trends)
|
||||
|
||||
return {
|
||||
"period_hours": hours,
|
||||
"data_points": len(trends),
|
||||
"trends": trends,
|
||||
"insights": insights
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting energy trends: {e}")
|
||||
return {
|
||||
"period_hours": hours,
|
||||
"data_points": 0,
|
||||
"trends": [],
|
||||
"insights": {}
|
||||
}
|
||||
|
||||
def _calculate_trend_insights(self, trends: List[Dict]) -> Dict[str, Any]:
|
||||
"""Calculate insights from trend data"""
|
||||
if not trends:
|
||||
return {}
|
||||
|
||||
energy_values = [item["total_energy"] for item in trends]
|
||||
|
||||
# Peak and low consumption
|
||||
max_consumption = max(energy_values)
|
||||
min_consumption = min(energy_values)
|
||||
avg_consumption = sum(energy_values) / len(energy_values)
|
||||
|
||||
# Find peak time
|
||||
peak_item = max(trends, key=lambda x: x["total_energy"])
|
||||
peak_time = peak_item["timestamp"]
|
||||
|
||||
return {
|
||||
"peak_consumption_kwh": max_consumption,
|
||||
"lowest_consumption_kwh": min_consumption,
|
||||
"average_consumption_kwh": round(avg_consumption, 2),
|
||||
"peak_time": peak_time.isoformat() if hasattr(peak_time, 'isoformat') else str(peak_time),
|
||||
"consumption_variance": round(max_consumption - min_consumption, 2)
|
||||
}
|
||||
|
||||
async def get_room_comparison(self, hours: int = 24) -> Dict[str, Any]:
|
||||
"""Get room-by-room comparison analytics"""
|
||||
try:
|
||||
start_time = datetime.utcnow() - timedelta(hours=hours)
|
||||
|
||||
pipeline = [
|
||||
{"$match": {
|
||||
"created_at": {"$gte": start_time},
|
||||
"room": {"$ne": None}
|
||||
}},
|
||||
{"$group": {
|
||||
"_id": "$room",
|
||||
"total_energy": {"$sum": "$energy.value"},
|
||||
"avg_energy": {"$avg": "$energy.value"},
|
||||
"avg_co2": {"$avg": "$co2.value"},
|
||||
"max_co2": {"$max": "$co2.value"},
|
||||
"avg_temperature": {"$avg": "$temperature.value"},
|
||||
"sensor_count": {"$addToSet": "$sensor_id"},
|
||||
"reading_count": {"$sum": 1}
|
||||
}},
|
||||
{"$project": {
|
||||
"room": "$_id",
|
||||
"_id": 0,
|
||||
"total_energy": {"$round": [{"$ifNull": ["$total_energy", 0]}, 2]},
|
||||
"avg_energy": {"$round": [{"$ifNull": ["$avg_energy", 0]}, 2]},
|
||||
"avg_co2": {"$round": [{"$ifNull": ["$avg_co2", 0]}, 1]},
|
||||
"max_co2": {"$round": [{"$ifNull": ["$max_co2", 0]}, 1]},
|
||||
"avg_temperature": {"$round": [{"$ifNull": ["$avg_temperature", 0]}, 1]},
|
||||
"sensor_count": {"$size": "$sensor_count"},
|
||||
"reading_count": 1
|
||||
}},
|
||||
{"$sort": {"total_energy": -1}}
|
||||
]
|
||||
|
||||
room_comparison = await self.sensor_reading_repo.aggregate(pipeline)
|
||||
|
||||
# Calculate comparison insights
|
||||
insights = self._calculate_room_insights(room_comparison)
|
||||
|
||||
return {
|
||||
"period_hours": hours,
|
||||
"rooms_analyzed": len(room_comparison),
|
||||
"comparison": room_comparison,
|
||||
"insights": insights
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting room comparison: {e}")
|
||||
return {
|
||||
"period_hours": hours,
|
||||
"rooms_analyzed": 0,
|
||||
"comparison": [],
|
||||
"insights": {}
|
||||
}
|
||||
|
||||
def _calculate_room_insights(self, room_data: List[Dict]) -> Dict[str, Any]:
|
||||
"""Calculate insights from room comparison data"""
|
||||
if not room_data:
|
||||
return {}
|
||||
|
||||
# Energy insights
|
||||
total_energy = sum(room["total_energy"] for room in room_data)
|
||||
highest_consumer = room_data[0] if room_data else None
|
||||
lowest_consumer = min(room_data, key=lambda x: x["total_energy"]) if room_data else None
|
||||
|
||||
# CO2 insights
|
||||
rooms_with_high_co2 = [
|
||||
room for room in room_data
|
||||
if room.get("avg_co2", 0) > 1000
|
||||
]
|
||||
|
||||
# Temperature insights
|
||||
temp_values = [room.get("avg_temperature", 0) for room in room_data if room.get("avg_temperature")]
|
||||
avg_building_temp = sum(temp_values) / len(temp_values) if temp_values else 0
|
||||
|
||||
return {
|
||||
"total_building_energy_kwh": round(total_energy, 2),
|
||||
"highest_energy_consumer": highest_consumer["room"] if highest_consumer else None,
|
||||
"lowest_energy_consumer": lowest_consumer["room"] if lowest_consumer else None,
|
||||
"rooms_with_high_co2": len(rooms_with_high_co2),
|
||||
"high_co2_rooms": [room["room"] for room in rooms_with_high_co2],
|
||||
"average_building_temperature": round(avg_building_temp, 1),
|
||||
"total_active_sensors": sum(room["sensor_count"] for room in room_data)
|
||||
}
|
||||
@@ -1,234 +0,0 @@
|
||||
"""
|
||||
Data cleanup and maintenance service
|
||||
Business Layer - handles data retention policies and system maintenance
|
||||
"""
|
||||
import asyncio
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, Any
|
||||
import logging
|
||||
|
||||
from ..infrastructure.database_connection import database_connection
|
||||
from ..infrastructure.repositories import SensorReadingRepository
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class CleanupService:
|
||||
"""Service for data cleanup and maintenance operations"""
|
||||
|
||||
def __init__(self):
|
||||
self.sensor_reading_repo = SensorReadingRepository()
|
||||
self.is_running = False
|
||||
self.cleanup_task = None
|
||||
|
||||
async def start_scheduled_cleanup(self, interval_hours: int = 24) -> None:
|
||||
"""Start scheduled cleanup process"""
|
||||
if self.is_running:
|
||||
logger.warning("Cleanup service is already running")
|
||||
return
|
||||
|
||||
self.is_running = True
|
||||
self.cleanup_task = asyncio.create_task(self._cleanup_loop(interval_hours))
|
||||
logger.info(f"Started scheduled cleanup service (interval: {interval_hours} hours)")
|
||||
|
||||
async def stop_scheduled_cleanup(self) -> None:
|
||||
"""Stop scheduled cleanup process"""
|
||||
self.is_running = False
|
||||
if self.cleanup_task:
|
||||
self.cleanup_task.cancel()
|
||||
try:
|
||||
await self.cleanup_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
logger.info("Cleanup service stopped")
|
||||
|
||||
async def _cleanup_loop(self, interval_hours: int) -> None:
|
||||
"""Main cleanup loop"""
|
||||
while self.is_running:
|
||||
try:
|
||||
await self.cleanup_old_data()
|
||||
# Wait for next cleanup interval
|
||||
await asyncio.sleep(interval_hours * 3600) # Convert hours to seconds
|
||||
except Exception as e:
|
||||
logger.error(f"Error in scheduled cleanup: {e}")
|
||||
# Wait 1 hour before retrying on error
|
||||
await asyncio.sleep(3600)
|
||||
|
||||
async def cleanup_old_data(self) -> Dict[str, int]:
|
||||
"""Perform data cleanup based on retention policies"""
|
||||
try:
|
||||
cleanup_results = {}
|
||||
db = await database_connection.get_database()
|
||||
|
||||
# Delete sensor readings older than 90 days
|
||||
sensor_retention_date = datetime.utcnow() - timedelta(days=90)
|
||||
sensor_result = await db.sensor_readings.delete_many({
|
||||
"created_at": {"$lt": sensor_retention_date}
|
||||
})
|
||||
cleanup_results["sensor_readings_deleted"] = sensor_result.deleted_count
|
||||
|
||||
if sensor_result.deleted_count > 0:
|
||||
logger.info(f"Deleted {sensor_result.deleted_count} old sensor readings")
|
||||
|
||||
# Delete room metrics older than 30 days
|
||||
room_retention_date = datetime.utcnow() - timedelta(days=30)
|
||||
room_result = await db.room_metrics.delete_many({
|
||||
"created_at": {"$lt": room_retention_date}
|
||||
})
|
||||
cleanup_results["room_metrics_deleted"] = room_result.deleted_count
|
||||
|
||||
if room_result.deleted_count > 0:
|
||||
logger.info(f"Deleted {room_result.deleted_count} old room metrics")
|
||||
|
||||
# Delete system events older than 60 days
|
||||
events_retention_date = datetime.utcnow() - timedelta(days=60)
|
||||
events_result = await db.system_events.delete_many({
|
||||
"created_at": {"$lt": events_retention_date}
|
||||
})
|
||||
cleanup_results["system_events_deleted"] = events_result.deleted_count
|
||||
|
||||
if events_result.deleted_count > 0:
|
||||
logger.info(f"Deleted {events_result.deleted_count} old system events")
|
||||
|
||||
# Clean up orphaned sensor metadata (sensors with no recent readings)
|
||||
orphaned_retention_date = datetime.utcnow() - timedelta(days=30)
|
||||
|
||||
# Find sensors with no recent readings
|
||||
active_sensors = await db.sensor_readings.distinct("sensor_id", {
|
||||
"created_at": {"$gte": orphaned_retention_date}
|
||||
})
|
||||
|
||||
orphaned_result = await db.sensor_metadata.delete_many({
|
||||
"sensor_id": {"$nin": active_sensors},
|
||||
"last_seen": {"$lt": orphaned_retention_date}
|
||||
})
|
||||
cleanup_results["orphaned_metadata_deleted"] = orphaned_result.deleted_count
|
||||
|
||||
if orphaned_result.deleted_count > 0:
|
||||
logger.info(f"Deleted {orphaned_result.deleted_count} orphaned sensor metadata records")
|
||||
|
||||
return cleanup_results
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error during data cleanup: {e}")
|
||||
return {"error": str(e)}
|
||||
|
||||
async def get_storage_statistics(self) -> Dict[str, Any]:
|
||||
"""Get storage statistics for different collections"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
|
||||
stats = {}
|
||||
|
||||
# Sensor readings statistics
|
||||
sensor_stats = await db.command("collStats", "sensor_readings")
|
||||
stats["sensor_readings"] = {
|
||||
"count": sensor_stats.get("count", 0),
|
||||
"size_bytes": sensor_stats.get("size", 0),
|
||||
"avg_obj_size": sensor_stats.get("avgObjSize", 0),
|
||||
"storage_size": sensor_stats.get("storageSize", 0)
|
||||
}
|
||||
|
||||
# Room metrics statistics
|
||||
room_stats = await db.command("collStats", "room_metrics")
|
||||
stats["room_metrics"] = {
|
||||
"count": room_stats.get("count", 0),
|
||||
"size_bytes": room_stats.get("size", 0),
|
||||
"avg_obj_size": room_stats.get("avgObjSize", 0),
|
||||
"storage_size": room_stats.get("storageSize", 0)
|
||||
}
|
||||
|
||||
# System events statistics
|
||||
events_stats = await db.command("collStats", "system_events")
|
||||
stats["system_events"] = {
|
||||
"count": events_stats.get("count", 0),
|
||||
"size_bytes": events_stats.get("size", 0),
|
||||
"avg_obj_size": events_stats.get("avgObjSize", 0),
|
||||
"storage_size": events_stats.get("storageSize", 0)
|
||||
}
|
||||
|
||||
# Sensor metadata statistics
|
||||
metadata_stats = await db.command("collStats", "sensor_metadata")
|
||||
stats["sensor_metadata"] = {
|
||||
"count": metadata_stats.get("count", 0),
|
||||
"size_bytes": metadata_stats.get("size", 0),
|
||||
"avg_obj_size": metadata_stats.get("avgObjSize", 0),
|
||||
"storage_size": metadata_stats.get("storageSize", 0)
|
||||
}
|
||||
|
||||
# Calculate totals
|
||||
total_documents = sum(collection["count"] for collection in stats.values())
|
||||
total_size = sum(collection["size_bytes"] for collection in stats.values())
|
||||
total_storage = sum(collection["storage_size"] for collection in stats.values())
|
||||
|
||||
stats["totals"] = {
|
||||
"total_documents": total_documents,
|
||||
"total_size_bytes": total_size,
|
||||
"total_storage_bytes": total_storage,
|
||||
"total_size_mb": round(total_size / (1024 * 1024), 2),
|
||||
"total_storage_mb": round(total_storage / (1024 * 1024), 2)
|
||||
}
|
||||
|
||||
return stats
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting storage statistics: {e}")
|
||||
return {"error": str(e)}
|
||||
|
||||
async def get_data_retention_info(self) -> Dict[str, Any]:
|
||||
"""Get information about data retention policies and old data"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
|
||||
# Current date references
|
||||
now = datetime.utcnow()
|
||||
sensor_cutoff = now - timedelta(days=90)
|
||||
room_cutoff = now - timedelta(days=30)
|
||||
events_cutoff = now - timedelta(days=60)
|
||||
|
||||
retention_info = {}
|
||||
|
||||
# Sensor readings retention info
|
||||
old_sensor_count = await db.sensor_readings.count_documents({
|
||||
"created_at": {"$lt": sensor_cutoff}
|
||||
})
|
||||
retention_info["sensor_readings"] = {
|
||||
"retention_days": 90,
|
||||
"cutoff_date": sensor_cutoff.isoformat(),
|
||||
"old_records_count": old_sensor_count
|
||||
}
|
||||
|
||||
# Room metrics retention info
|
||||
old_room_count = await db.room_metrics.count_documents({
|
||||
"created_at": {"$lt": room_cutoff}
|
||||
})
|
||||
retention_info["room_metrics"] = {
|
||||
"retention_days": 30,
|
||||
"cutoff_date": room_cutoff.isoformat(),
|
||||
"old_records_count": old_room_count
|
||||
}
|
||||
|
||||
# System events retention info
|
||||
old_events_count = await db.system_events.count_documents({
|
||||
"created_at": {"$lt": events_cutoff}
|
||||
})
|
||||
retention_info["system_events"] = {
|
||||
"retention_days": 60,
|
||||
"cutoff_date": events_cutoff.isoformat(),
|
||||
"old_records_count": old_events_count
|
||||
}
|
||||
|
||||
return retention_info
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting retention info: {e}")
|
||||
return {"error": str(e)}
|
||||
|
||||
def is_cleanup_running(self) -> bool:
|
||||
"""Check if cleanup service is currently running"""
|
||||
return self.is_running and (
|
||||
self.cleanup_task is not None and
|
||||
not self.cleanup_task.done()
|
||||
)
|
||||
|
||||
# Global cleanup service instance
|
||||
cleanup_service = CleanupService()
|
||||
@@ -1,262 +0,0 @@
|
||||
"""
|
||||
Room metrics business logic service
|
||||
Business Layer - handles room-related aggregations and business operations
|
||||
"""
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, Any, List, Optional
|
||||
import logging
|
||||
|
||||
from models import RoomMetrics, CO2Status, OccupancyLevel
|
||||
from ..infrastructure.repositories import (
|
||||
SensorReadingRepository, RoomMetricsRepository, RedisRepository
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class RoomService:
|
||||
"""Service for room-related business operations"""
|
||||
|
||||
def __init__(self):
|
||||
self.sensor_reading_repo = SensorReadingRepository()
|
||||
self.room_metrics_repo = RoomMetricsRepository()
|
||||
self.redis_repo = RedisRepository()
|
||||
|
||||
async def update_room_metrics(self, room: str) -> bool:
|
||||
"""Calculate and store room-level metrics"""
|
||||
if not room:
|
||||
return False
|
||||
|
||||
try:
|
||||
# Get recent readings for this room (last 5 minutes)
|
||||
recent_readings = await self.sensor_reading_repo.get_recent_by_room(
|
||||
room=room,
|
||||
minutes=5
|
||||
)
|
||||
|
||||
if not recent_readings:
|
||||
return False
|
||||
|
||||
# Calculate aggregated metrics
|
||||
metrics = await self._calculate_room_metrics(room, recent_readings)
|
||||
|
||||
# Store in MongoDB
|
||||
stored = await self.room_metrics_repo.create(metrics)
|
||||
|
||||
# Cache in Redis
|
||||
if stored:
|
||||
await self.redis_repo.set_room_metrics(room, metrics.dict())
|
||||
logger.debug(f"Updated room metrics for {room}")
|
||||
|
||||
return stored
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating room metrics for {room}: {e}")
|
||||
return False
|
||||
|
||||
async def _calculate_room_metrics(self, room: str, readings: List[Dict]) -> RoomMetrics:
|
||||
"""Calculate aggregated metrics for a room based on recent readings"""
|
||||
|
||||
# Group readings by sensor
|
||||
sensors_data = {}
|
||||
for reading in readings:
|
||||
sensor_id = reading["sensor_id"]
|
||||
if sensor_id not in sensors_data:
|
||||
sensors_data[sensor_id] = []
|
||||
sensors_data[sensor_id].append(reading)
|
||||
|
||||
# Initialize value arrays
|
||||
energy_values = []
|
||||
co2_values = []
|
||||
temperature_values = []
|
||||
humidity_values = []
|
||||
motion_detected = False
|
||||
|
||||
# Extract values from readings
|
||||
for sensor_readings in sensors_data.values():
|
||||
for reading in sensor_readings:
|
||||
if reading.get("energy"):
|
||||
energy_values.append(reading["energy"]["value"])
|
||||
if reading.get("co2"):
|
||||
co2_values.append(reading["co2"]["value"])
|
||||
if reading.get("temperature"):
|
||||
temperature_values.append(reading["temperature"]["value"])
|
||||
if reading.get("humidity"):
|
||||
humidity_values.append(reading["humidity"]["value"])
|
||||
if reading.get("motion") and reading["motion"].get("value") == "Detected":
|
||||
motion_detected = True
|
||||
|
||||
# Get sensor types present
|
||||
sensor_types = list(set(
|
||||
reading.get("sensor_type")
|
||||
for reading in readings
|
||||
if reading.get("sensor_type")
|
||||
))
|
||||
|
||||
# Initialize metrics object
|
||||
metrics = RoomMetrics(
|
||||
room=room,
|
||||
timestamp=int(datetime.utcnow().timestamp()),
|
||||
sensor_count=len(sensors_data),
|
||||
active_sensors=list(sensors_data.keys()),
|
||||
sensor_types=sensor_types,
|
||||
motion_detected=motion_detected
|
||||
)
|
||||
|
||||
# Calculate energy metrics
|
||||
if energy_values:
|
||||
metrics.energy = self._calculate_energy_metrics(energy_values)
|
||||
|
||||
# Calculate CO2 metrics and occupancy
|
||||
if co2_values:
|
||||
metrics.co2 = self._calculate_co2_metrics(co2_values)
|
||||
metrics.occupancy_estimate = self._estimate_occupancy_from_co2(
|
||||
metrics.co2["average"]
|
||||
)
|
||||
|
||||
# Calculate temperature metrics
|
||||
if temperature_values:
|
||||
metrics.temperature = self._calculate_temperature_metrics(temperature_values)
|
||||
|
||||
# Calculate humidity metrics
|
||||
if humidity_values:
|
||||
metrics.humidity = self._calculate_humidity_metrics(humidity_values)
|
||||
|
||||
# Set last activity time if motion detected
|
||||
if motion_detected:
|
||||
metrics.last_activity = datetime.utcnow()
|
||||
|
||||
return metrics
|
||||
|
||||
def _calculate_energy_metrics(self, values: List[float]) -> Dict[str, Any]:
|
||||
"""Calculate energy consumption metrics"""
|
||||
return {
|
||||
"current": sum(values),
|
||||
"average": sum(values) / len(values),
|
||||
"total": sum(values),
|
||||
"peak": max(values),
|
||||
"unit": "kWh"
|
||||
}
|
||||
|
||||
def _calculate_co2_metrics(self, values: List[float]) -> Dict[str, Any]:
|
||||
"""Calculate CO2 level metrics"""
|
||||
avg_co2 = sum(values) / len(values)
|
||||
return {
|
||||
"current": avg_co2,
|
||||
"average": avg_co2,
|
||||
"max": max(values),
|
||||
"min": min(values),
|
||||
"status": self._get_co2_status(avg_co2).value,
|
||||
"unit": "ppm"
|
||||
}
|
||||
|
||||
def _calculate_temperature_metrics(self, values: List[float]) -> Dict[str, Any]:
|
||||
"""Calculate temperature metrics"""
|
||||
avg_temp = sum(values) / len(values)
|
||||
return {
|
||||
"current": avg_temp,
|
||||
"average": avg_temp,
|
||||
"max": max(values),
|
||||
"min": min(values),
|
||||
"unit": "°C"
|
||||
}
|
||||
|
||||
def _calculate_humidity_metrics(self, values: List[float]) -> Dict[str, Any]:
|
||||
"""Calculate humidity metrics"""
|
||||
avg_humidity = sum(values) / len(values)
|
||||
return {
|
||||
"current": avg_humidity,
|
||||
"average": avg_humidity,
|
||||
"max": max(values),
|
||||
"min": min(values),
|
||||
"unit": "%"
|
||||
}
|
||||
|
||||
def _get_co2_status(self, co2_level: float) -> CO2Status:
|
||||
"""Determine CO2 status based on level"""
|
||||
if co2_level < 400:
|
||||
return CO2Status.GOOD
|
||||
elif co2_level < 1000:
|
||||
return CO2Status.MODERATE
|
||||
elif co2_level < 5000:
|
||||
return CO2Status.POOR
|
||||
else:
|
||||
return CO2Status.CRITICAL
|
||||
|
||||
def _estimate_occupancy_from_co2(self, co2_level: float) -> OccupancyLevel:
|
||||
"""Estimate occupancy level based on CO2 levels"""
|
||||
if co2_level < 600:
|
||||
return OccupancyLevel.LOW
|
||||
elif co2_level < 1200:
|
||||
return OccupancyLevel.MEDIUM
|
||||
else:
|
||||
return OccupancyLevel.HIGH
|
||||
|
||||
async def get_all_rooms(self) -> Dict[str, Any]:
|
||||
"""Get list of all rooms with sensor counts and latest metrics"""
|
||||
try:
|
||||
rooms = await self.sensor_reading_repo.get_distinct_rooms()
|
||||
|
||||
room_data = []
|
||||
for room in rooms:
|
||||
# Get sensor count for each room
|
||||
sensor_ids = await self.sensor_reading_repo.get_distinct_sensor_ids_by_room(room)
|
||||
sensor_count = len(sensor_ids)
|
||||
|
||||
# Get latest room metrics from cache
|
||||
room_metrics = await self.redis_repo.get_room_metrics(room)
|
||||
|
||||
room_data.append({
|
||||
"room": room,
|
||||
"sensor_count": sensor_count,
|
||||
"sensor_ids": sensor_ids,
|
||||
"latest_metrics": room_metrics
|
||||
})
|
||||
|
||||
return {
|
||||
"rooms": room_data,
|
||||
"count": len(room_data)
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting rooms: {e}")
|
||||
return {"rooms": [], "count": 0}
|
||||
|
||||
async def get_room_data(self, room_name: str, start_time: Optional[int] = None,
|
||||
end_time: Optional[int] = None, limit: int = 100) -> Dict[str, Any]:
|
||||
"""Get historical data for a specific room"""
|
||||
try:
|
||||
# Build query for time range
|
||||
query = {"room": room_name}
|
||||
|
||||
if start_time or end_time:
|
||||
time_query = {}
|
||||
if start_time:
|
||||
time_query["$gte"] = datetime.fromtimestamp(start_time)
|
||||
if end_time:
|
||||
time_query["$lte"] = datetime.fromtimestamp(end_time)
|
||||
query["created_at"] = time_query
|
||||
|
||||
# Get room metrics
|
||||
room_metrics = await self.room_metrics_repo.get_by_room(room_name, limit)
|
||||
|
||||
# Get sensor readings for the room
|
||||
sensor_readings = await self.sensor_reading_repo.get_by_query(
|
||||
query=query,
|
||||
sort_by="timestamp",
|
||||
sort_order="desc",
|
||||
limit=limit
|
||||
)
|
||||
|
||||
return {
|
||||
"room": room_name,
|
||||
"room_metrics": room_metrics,
|
||||
"sensor_readings": sensor_readings
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting room data for {room_name}: {e}")
|
||||
return {
|
||||
"room": room_name,
|
||||
"room_metrics": [],
|
||||
"sensor_readings": []
|
||||
}
|
||||
@@ -1,328 +0,0 @@
|
||||
"""
|
||||
Sensor business logic service
|
||||
Business Layer - handles sensor-related business operations and rules
|
||||
"""
|
||||
import json
|
||||
from datetime import datetime, timedelta
|
||||
from typing import Dict, Any, List, Optional
|
||||
import logging
|
||||
import uuid
|
||||
|
||||
from models import (
|
||||
SensorReading, LegacySensorReading, SensorMetadata,
|
||||
SensorType, SensorStatus, CO2Status, OccupancyLevel
|
||||
)
|
||||
from ..infrastructure.repositories import (
|
||||
SensorReadingRepository, SensorMetadataRepository,
|
||||
SystemEventRepository, RedisRepository
|
||||
)
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class SensorService:
|
||||
"""Service for sensor-related business operations"""
|
||||
|
||||
def __init__(self):
|
||||
self.sensor_reading_repo = SensorReadingRepository()
|
||||
self.sensor_metadata_repo = SensorMetadataRepository()
|
||||
self.system_event_repo = SystemEventRepository()
|
||||
self.redis_repo = RedisRepository()
|
||||
|
||||
async def process_sensor_message(self, message_data: str) -> bool:
|
||||
"""Process incoming sensor message and handle business logic"""
|
||||
try:
|
||||
# Parse the message
|
||||
data = json.loads(message_data)
|
||||
logger.debug(f"Processing sensor message: {data}")
|
||||
|
||||
# Convert to standard format
|
||||
sensor_reading = await self._parse_sensor_data(data)
|
||||
|
||||
# Validate business rules
|
||||
validation_result = await self._validate_sensor_reading(sensor_reading)
|
||||
if not validation_result["valid"]:
|
||||
logger.warning(f"Sensor reading validation failed: {validation_result['errors']}")
|
||||
return False
|
||||
|
||||
# Store the reading
|
||||
stored = await self.sensor_reading_repo.create(sensor_reading)
|
||||
if not stored:
|
||||
return False
|
||||
|
||||
# Update caches and metadata
|
||||
await self._update_caches(sensor_reading)
|
||||
await self._update_sensor_metadata(sensor_reading)
|
||||
|
||||
# Check for alerts
|
||||
await self._check_sensor_alerts(sensor_reading)
|
||||
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing sensor message: {e}")
|
||||
await self._log_processing_error(str(e), message_data)
|
||||
return False
|
||||
|
||||
async def _parse_sensor_data(self, data: dict) -> SensorReading:
|
||||
"""Parse and convert sensor data to standard format"""
|
||||
# Check if legacy format
|
||||
if self._is_legacy_format(data):
|
||||
return await self._convert_legacy_data(data)
|
||||
else:
|
||||
return SensorReading(**data)
|
||||
|
||||
def _is_legacy_format(self, data: dict) -> bool:
|
||||
"""Check if data is in legacy format"""
|
||||
legacy_keys = {"sensorId", "timestamp", "value", "unit"}
|
||||
return legacy_keys.issubset(data.keys()) and "energy" not in data
|
||||
|
||||
async def _convert_legacy_data(self, data: dict) -> SensorReading:
|
||||
"""Convert legacy format to new sensor reading format"""
|
||||
legacy_reading = LegacySensorReading(**data)
|
||||
|
||||
return SensorReading(
|
||||
sensor_id=legacy_reading.sensor_id,
|
||||
sensor_type=SensorType.ENERGY,
|
||||
timestamp=legacy_reading.timestamp,
|
||||
created_at=legacy_reading.created_at,
|
||||
energy={
|
||||
"value": legacy_reading.value,
|
||||
"unit": legacy_reading.unit
|
||||
}
|
||||
)
|
||||
|
||||
async def _validate_sensor_reading(self, reading: SensorReading) -> Dict[str, Any]:
|
||||
"""Validate sensor reading against business rules"""
|
||||
errors = []
|
||||
|
||||
# Check timestamp is not too far in the future
|
||||
future_threshold = datetime.utcnow().timestamp() + 3600 # 1 hour
|
||||
if reading.timestamp > future_threshold:
|
||||
errors.append("Timestamp is too far in the future")
|
||||
|
||||
# Check timestamp is not too old
|
||||
past_threshold = datetime.utcnow().timestamp() - 86400 # 24 hours
|
||||
if reading.timestamp < past_threshold:
|
||||
errors.append("Timestamp is too old")
|
||||
|
||||
# Validate sensor values
|
||||
if reading.energy:
|
||||
energy_value = reading.energy.get("value", 0)
|
||||
if energy_value < 0 or energy_value > 1000: # Reasonable energy range
|
||||
errors.append("Energy value is out of acceptable range")
|
||||
|
||||
if reading.co2:
|
||||
co2_value = reading.co2.get("value", 0)
|
||||
if co2_value < 0 or co2_value > 50000: # Reasonable CO2 range
|
||||
errors.append("CO2 value is out of acceptable range")
|
||||
|
||||
if reading.temperature:
|
||||
temp_value = reading.temperature.get("value", 0)
|
||||
if temp_value < -50 or temp_value > 100: # Reasonable temperature range
|
||||
errors.append("Temperature value is out of acceptable range")
|
||||
|
||||
return {
|
||||
"valid": len(errors) == 0,
|
||||
"errors": errors
|
||||
}
|
||||
|
||||
async def _update_caches(self, reading: SensorReading) -> None:
|
||||
"""Update Redis caches with latest sensor data"""
|
||||
# Cache latest sensor reading
|
||||
await self.redis_repo.set_sensor_data(
|
||||
reading.sensor_id,
|
||||
reading.dict(),
|
||||
expire_seconds=3600
|
||||
)
|
||||
|
||||
# Update sensor status
|
||||
status_data = {
|
||||
"status": "online",
|
||||
"last_seen": reading.timestamp,
|
||||
"room": reading.room
|
||||
}
|
||||
await self.redis_repo.set_sensor_status(
|
||||
reading.sensor_id,
|
||||
status_data,
|
||||
expire_seconds=1800
|
||||
)
|
||||
|
||||
async def _update_sensor_metadata(self, reading: SensorReading) -> None:
|
||||
"""Update or create sensor metadata"""
|
||||
existing = await self.sensor_metadata_repo.get_by_sensor_id(reading.sensor_id)
|
||||
|
||||
if existing:
|
||||
# Update existing metadata
|
||||
updates = {
|
||||
"last_seen": datetime.utcnow(),
|
||||
"status": SensorStatus.ONLINE.value
|
||||
}
|
||||
|
||||
# Add sensor type to monitoring capabilities if not present
|
||||
capabilities = existing.get("monitoring_capabilities", [])
|
||||
if reading.sensor_type.value not in capabilities:
|
||||
capabilities.append(reading.sensor_type.value)
|
||||
updates["monitoring_capabilities"] = capabilities
|
||||
|
||||
await self.sensor_metadata_repo.update(reading.sensor_id, updates)
|
||||
else:
|
||||
# Create new sensor metadata
|
||||
metadata = SensorMetadata(
|
||||
sensor_id=reading.sensor_id,
|
||||
name=f"Sensor {reading.sensor_id}",
|
||||
sensor_type=reading.sensor_type,
|
||||
room=reading.room,
|
||||
status=SensorStatus.ONLINE,
|
||||
last_seen=datetime.utcnow(),
|
||||
monitoring_capabilities=[reading.sensor_type.value]
|
||||
)
|
||||
|
||||
await self.sensor_metadata_repo.create(metadata)
|
||||
logger.info(f"Created metadata for new sensor: {reading.sensor_id}")
|
||||
|
||||
async def _check_sensor_alerts(self, reading: SensorReading) -> None:
|
||||
"""Check for alert conditions in sensor data"""
|
||||
alerts = []
|
||||
|
||||
# CO2 level alerts
|
||||
if reading.co2:
|
||||
co2_level = reading.co2.get("value", 0)
|
||||
if co2_level > 5000:
|
||||
alerts.append({
|
||||
"event_type": "co2_critical",
|
||||
"severity": "critical",
|
||||
"title": "Critical CO2 Level",
|
||||
"description": f"CO2 level ({co2_level} ppm) exceeds critical threshold in {reading.room or 'unknown room'}"
|
||||
})
|
||||
elif co2_level > 1000:
|
||||
alerts.append({
|
||||
"event_type": "co2_high",
|
||||
"severity": "warning",
|
||||
"title": "High CO2 Level",
|
||||
"description": f"CO2 level ({co2_level} ppm) is above recommended levels in {reading.room or 'unknown room'}"
|
||||
})
|
||||
|
||||
# Energy consumption alerts
|
||||
if reading.energy:
|
||||
energy_value = reading.energy.get("value", 0)
|
||||
if energy_value > 10:
|
||||
alerts.append({
|
||||
"event_type": "energy_high",
|
||||
"severity": "warning",
|
||||
"title": "High Energy Consumption",
|
||||
"description": f"Energy consumption ({energy_value} kWh) is unusually high for sensor {reading.sensor_id}"
|
||||
})
|
||||
|
||||
# Temperature alerts
|
||||
if reading.temperature:
|
||||
temp_value = reading.temperature.get("value", 0)
|
||||
if temp_value > 30 or temp_value < 15:
|
||||
alerts.append({
|
||||
"event_type": "temperature_extreme",
|
||||
"severity": "warning",
|
||||
"title": "Extreme Temperature",
|
||||
"description": f"Temperature ({temp_value}°C) is outside normal range in {reading.room or 'unknown room'}"
|
||||
})
|
||||
|
||||
# Log alerts as system events
|
||||
for alert in alerts:
|
||||
await self._log_alert_event(reading, **alert)
|
||||
|
||||
async def _log_alert_event(self, reading: SensorReading, event_type: str, severity: str,
|
||||
title: str, description: str) -> None:
|
||||
"""Log an alert as a system event"""
|
||||
from models import SystemEvent
|
||||
|
||||
event = SystemEvent(
|
||||
event_id=str(uuid.uuid4()),
|
||||
event_type=event_type,
|
||||
severity=severity,
|
||||
timestamp=int(datetime.utcnow().timestamp()),
|
||||
title=title,
|
||||
description=description,
|
||||
sensor_id=reading.sensor_id,
|
||||
room=reading.room,
|
||||
source="sensor_service",
|
||||
data=reading.dict()
|
||||
)
|
||||
|
||||
await self.system_event_repo.create(event)
|
||||
|
||||
async def _log_processing_error(self, error_message: str, raw_data: str) -> None:
|
||||
"""Log data processing error"""
|
||||
from models import SystemEvent
|
||||
|
||||
event = SystemEvent(
|
||||
event_id=str(uuid.uuid4()),
|
||||
event_type="data_processing_error",
|
||||
severity="error",
|
||||
timestamp=int(datetime.utcnow().timestamp()),
|
||||
title="Sensor Data Processing Failed",
|
||||
description=f"Failed to process sensor message: {error_message}",
|
||||
source="sensor_service",
|
||||
data={"raw_message": raw_data}
|
||||
)
|
||||
|
||||
await self.system_event_repo.create(event)
|
||||
|
||||
async def get_sensor_details(self, sensor_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get complete sensor details including metadata and recent readings"""
|
||||
# Get metadata
|
||||
metadata = await self.sensor_metadata_repo.get_by_sensor_id(sensor_id)
|
||||
if not metadata:
|
||||
return None
|
||||
|
||||
# Get recent readings
|
||||
recent_readings = await self.sensor_reading_repo.get_recent_by_sensor(
|
||||
sensor_id=sensor_id,
|
||||
limit=100,
|
||||
minutes=1440 # 24 hours
|
||||
)
|
||||
|
||||
# Get latest reading from cache
|
||||
latest_reading = await self.redis_repo.get_sensor_data(sensor_id)
|
||||
|
||||
return {
|
||||
"sensor": metadata,
|
||||
"latest_reading": latest_reading,
|
||||
"recent_readings_count": len(recent_readings),
|
||||
"recent_readings": recent_readings[:10] # Return only 10 most recent
|
||||
}
|
||||
|
||||
async def update_sensor_metadata(self, sensor_id: str, metadata_updates: Dict[str, Any]) -> bool:
|
||||
"""Update sensor metadata with business validation"""
|
||||
# Validate updates
|
||||
if "sensor_id" in metadata_updates:
|
||||
del metadata_updates["sensor_id"] # Cannot change sensor ID
|
||||
|
||||
# Update timestamp
|
||||
metadata_updates["updated_at"] = datetime.utcnow()
|
||||
|
||||
return await self.sensor_metadata_repo.update(sensor_id, metadata_updates)
|
||||
|
||||
async def delete_sensor(self, sensor_id: str) -> Dict[str, Any]:
|
||||
"""Delete a sensor and all its associated data"""
|
||||
# Delete readings
|
||||
readings_deleted = await self.sensor_reading_repo.delete_by_sensor_id(sensor_id)
|
||||
|
||||
# Delete metadata
|
||||
metadata_deleted = await self.sensor_metadata_repo.delete(sensor_id)
|
||||
|
||||
# Clear cache
|
||||
await self.redis_repo.delete_sensor_cache(sensor_id)
|
||||
|
||||
return {
|
||||
"sensor_id": sensor_id,
|
||||
"readings_deleted": readings_deleted,
|
||||
"metadata_deleted": metadata_deleted
|
||||
}
|
||||
|
||||
async def get_all_sensors(self, filters: Dict[str, Any] = None) -> Dict[str, Any]:
|
||||
"""Get all sensors with optional filtering"""
|
||||
sensors = await self.sensor_metadata_repo.get_all(filters)
|
||||
|
||||
return {
|
||||
"sensors": sensors,
|
||||
"count": len(sensors),
|
||||
"filters": filters or {}
|
||||
}
|
||||
@@ -1 +0,0 @@
|
||||
# Empty file to make this a Python package
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,95 +0,0 @@
|
||||
"""
|
||||
Database connection management for MongoDB
|
||||
Infrastructure Layer - handles low-level database connectivity
|
||||
"""
|
||||
import os
|
||||
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
|
||||
from pymongo import IndexModel, ASCENDING, DESCENDING
|
||||
from typing import Optional
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class DatabaseConnection:
|
||||
"""Manages MongoDB connection and database operations"""
|
||||
|
||||
def __init__(self):
|
||||
self.client: Optional[AsyncIOMotorClient] = None
|
||||
self.database: Optional[AsyncIOMotorDatabase] = None
|
||||
self._mongodb_url = os.getenv("MONGODB_URL", "mongodb://localhost:27017")
|
||||
self._database_name = os.getenv("DATABASE_NAME", "energy_monitoring")
|
||||
|
||||
async def connect(self) -> None:
|
||||
"""Establish connection to MongoDB"""
|
||||
try:
|
||||
logger.info(f"Connecting to MongoDB at: {self._mongodb_url}")
|
||||
|
||||
self.client = AsyncIOMotorClient(self._mongodb_url)
|
||||
await self.client.admin.command('ping')
|
||||
|
||||
self.database = self.client[self._database_name]
|
||||
await self._create_indexes()
|
||||
|
||||
logger.info("Successfully connected to MongoDB")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error connecting to MongoDB: {e}")
|
||||
raise
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
"""Close MongoDB connection"""
|
||||
if self.client:
|
||||
self.client.close()
|
||||
logger.info("Disconnected from MongoDB")
|
||||
|
||||
async def get_database(self) -> AsyncIOMotorDatabase:
|
||||
"""Get database instance"""
|
||||
if not self.database:
|
||||
await self.connect()
|
||||
return self.database
|
||||
|
||||
async def _create_indexes(self) -> None:
|
||||
"""Create database indexes for optimal performance"""
|
||||
try:
|
||||
# Sensor readings collection indexes
|
||||
sensor_readings_indexes = [
|
||||
IndexModel([("sensor_id", ASCENDING), ("timestamp", DESCENDING)]),
|
||||
IndexModel([("timestamp", DESCENDING)]),
|
||||
IndexModel([("room", ASCENDING), ("timestamp", DESCENDING)]),
|
||||
IndexModel([("sensor_type", ASCENDING), ("timestamp", DESCENDING)]),
|
||||
IndexModel([("created_at", DESCENDING)]),
|
||||
]
|
||||
await self.database.sensor_readings.create_indexes(sensor_readings_indexes)
|
||||
|
||||
# Room metrics collection indexes
|
||||
room_metrics_indexes = [
|
||||
IndexModel([("room", ASCENDING), ("timestamp", DESCENDING)]),
|
||||
IndexModel([("timestamp", DESCENDING)]),
|
||||
IndexModel([("created_at", DESCENDING)]),
|
||||
]
|
||||
await self.database.room_metrics.create_indexes(room_metrics_indexes)
|
||||
|
||||
# Sensor metadata collection indexes
|
||||
sensor_metadata_indexes = [
|
||||
IndexModel([("sensor_id", ASCENDING)], unique=True),
|
||||
IndexModel([("room", ASCENDING)]),
|
||||
IndexModel([("sensor_type", ASCENDING)]),
|
||||
IndexModel([("status", ASCENDING)]),
|
||||
]
|
||||
await self.database.sensor_metadata.create_indexes(sensor_metadata_indexes)
|
||||
|
||||
# System events collection indexes
|
||||
system_events_indexes = [
|
||||
IndexModel([("timestamp", DESCENDING)]),
|
||||
IndexModel([("event_type", ASCENDING), ("timestamp", DESCENDING)]),
|
||||
IndexModel([("severity", ASCENDING), ("timestamp", DESCENDING)]),
|
||||
]
|
||||
await self.database.system_events.create_indexes(system_events_indexes)
|
||||
|
||||
logger.info("Database indexes created successfully")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating indexes: {e}")
|
||||
|
||||
# Global database connection instance
|
||||
database_connection = DatabaseConnection()
|
||||
@@ -1,80 +0,0 @@
|
||||
"""
|
||||
Redis connection management and operations
|
||||
Infrastructure Layer - handles Redis connectivity and low-level operations
|
||||
"""
|
||||
import os
|
||||
import json
|
||||
from typing import Optional, Dict, Any
|
||||
import logging
|
||||
import redis.asyncio as redis
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class RedisConnection:
|
||||
"""Manages Redis connection and basic operations"""
|
||||
|
||||
def __init__(self):
|
||||
self.redis_client: Optional[redis.Redis] = None
|
||||
self._host = os.getenv("REDIS_HOST", "localhost")
|
||||
self._port = int(os.getenv("REDIS_PORT", "6379"))
|
||||
self._db = int(os.getenv("REDIS_DB", "0"))
|
||||
|
||||
async def connect(self) -> None:
|
||||
"""Connect to Redis"""
|
||||
try:
|
||||
self.redis_client = redis.Redis(
|
||||
host=self._host,
|
||||
port=self._port,
|
||||
db=self._db,
|
||||
decode_responses=True
|
||||
)
|
||||
await self.redis_client.ping()
|
||||
logger.info("Successfully connected to Redis")
|
||||
except Exception as e:
|
||||
logger.error(f"Error connecting to Redis: {e}")
|
||||
raise
|
||||
|
||||
async def disconnect(self) -> None:
|
||||
"""Disconnect from Redis"""
|
||||
if self.redis_client:
|
||||
await self.redis_client.close()
|
||||
logger.info("Disconnected from Redis")
|
||||
|
||||
async def get_client(self) -> redis.Redis:
|
||||
"""Get Redis client instance"""
|
||||
if not self.redis_client:
|
||||
await self.connect()
|
||||
return self.redis_client
|
||||
|
||||
async def set_with_expiry(self, key: str, value: str, expire_seconds: int = 3600) -> None:
|
||||
"""Set a key-value pair with expiration"""
|
||||
client = await self.get_client()
|
||||
await client.setex(key, expire_seconds, value)
|
||||
|
||||
async def get(self, key: str) -> Optional[str]:
|
||||
"""Get value by key"""
|
||||
client = await self.get_client()
|
||||
return await client.get(key)
|
||||
|
||||
async def delete(self, key: str) -> None:
|
||||
"""Delete a key"""
|
||||
client = await self.get_client()
|
||||
await client.delete(key)
|
||||
|
||||
async def get_keys_by_pattern(self, pattern: str) -> list:
|
||||
"""Get keys matching a pattern"""
|
||||
client = await self.get_client()
|
||||
return await client.keys(pattern)
|
||||
|
||||
async def publish(self, channel: str, message: str) -> None:
|
||||
"""Publish message to a channel"""
|
||||
client = await self.get_client()
|
||||
await client.publish(channel, message)
|
||||
|
||||
async def create_pubsub(self) -> redis.client.PubSub:
|
||||
"""Create a pub/sub instance"""
|
||||
client = await self.get_client()
|
||||
return client.pubsub()
|
||||
|
||||
# Global Redis connection instance
|
||||
redis_connection = RedisConnection()
|
||||
@@ -1,362 +0,0 @@
|
||||
"""
|
||||
Repository classes for data access
|
||||
Infrastructure Layer - handles database operations and queries
|
||||
"""
|
||||
import json
|
||||
from datetime import datetime, timedelta
|
||||
from typing import List, Dict, Any, Optional
|
||||
from pymongo import ASCENDING, DESCENDING
|
||||
from pymongo.errors import DuplicateKeyError
|
||||
import logging
|
||||
|
||||
from .database_connection import database_connection
|
||||
from .redis_connection import redis_connection
|
||||
from models import SensorReading, SensorMetadata, RoomMetrics, SystemEvent
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class SensorReadingRepository:
|
||||
"""Repository for sensor reading data operations"""
|
||||
|
||||
async def create(self, reading: SensorReading) -> bool:
|
||||
"""Store sensor reading in MongoDB"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
reading_dict = reading.dict()
|
||||
|
||||
# Add document ID for deduplication
|
||||
reading_dict["_id"] = f"{reading.sensor_id}_{reading.timestamp}"
|
||||
|
||||
await db.sensor_readings.insert_one(reading_dict)
|
||||
logger.debug(f"Stored sensor reading for {reading.sensor_id}")
|
||||
return True
|
||||
|
||||
except DuplicateKeyError:
|
||||
logger.debug(f"Duplicate reading ignored for {reading.sensor_id} at {reading.timestamp}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error storing sensor reading: {e}")
|
||||
return False
|
||||
|
||||
async def get_recent_by_sensor(self, sensor_id: str, limit: int = 100, minutes: int = 60) -> List[Dict]:
|
||||
"""Get recent readings for a specific sensor"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
query = {
|
||||
"sensor_id": sensor_id,
|
||||
"created_at": {"$gte": datetime.utcnow() - timedelta(minutes=minutes)}
|
||||
}
|
||||
|
||||
cursor = db.sensor_readings.find(query).sort("created_at", -1).limit(limit)
|
||||
readings = await cursor.to_list(length=limit)
|
||||
|
||||
# Convert ObjectId to string
|
||||
for reading in readings:
|
||||
reading["_id"] = str(reading["_id"])
|
||||
|
||||
return readings
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting recent readings for {sensor_id}: {e}")
|
||||
return []
|
||||
|
||||
async def get_recent_by_room(self, room: str, minutes: int = 5) -> List[Dict]:
|
||||
"""Get recent readings for a specific room"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
recent_time = datetime.utcnow() - timedelta(minutes=minutes)
|
||||
|
||||
cursor = db.sensor_readings.find({
|
||||
"room": room,
|
||||
"created_at": {"$gte": recent_time}
|
||||
})
|
||||
|
||||
readings = await cursor.to_list(length=None)
|
||||
return readings
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting recent readings for room {room}: {e}")
|
||||
return []
|
||||
|
||||
async def get_by_query(self, query: Dict[str, Any], sort_by: str = "timestamp",
|
||||
sort_order: str = "desc", limit: int = 100, offset: int = 0) -> List[Dict]:
|
||||
"""Get readings by complex query"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
|
||||
sort_direction = DESCENDING if sort_order == "desc" else ASCENDING
|
||||
cursor = db.sensor_readings.find(query).sort(sort_by, sort_direction).skip(offset).limit(limit)
|
||||
|
||||
readings = await cursor.to_list(length=limit)
|
||||
|
||||
# Convert ObjectId to string
|
||||
for reading in readings:
|
||||
reading["_id"] = str(reading["_id"])
|
||||
|
||||
return readings
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error querying sensor readings: {e}")
|
||||
return []
|
||||
|
||||
async def count_by_query(self, query: Dict[str, Any]) -> int:
|
||||
"""Count readings matching query"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
return await db.sensor_readings.count_documents(query)
|
||||
except Exception as e:
|
||||
logger.error(f"Error counting sensor readings: {e}")
|
||||
return 0
|
||||
|
||||
async def get_distinct_rooms(self) -> List[str]:
|
||||
"""Get list of distinct rooms"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
return await db.sensor_readings.distinct("room", {"room": {"$ne": None}})
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting distinct rooms: {e}")
|
||||
return []
|
||||
|
||||
async def get_distinct_sensor_ids_by_room(self, room: str) -> List[str]:
|
||||
"""Get distinct sensor IDs for a room"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
return await db.sensor_readings.distinct("sensor_id", {"room": room})
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting distinct sensor IDs for room {room}: {e}")
|
||||
return []
|
||||
|
||||
async def delete_by_sensor_id(self, sensor_id: str) -> int:
|
||||
"""Delete all readings for a sensor"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
result = await db.sensor_readings.delete_many({"sensor_id": sensor_id})
|
||||
return result.deleted_count
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting readings for sensor {sensor_id}: {e}")
|
||||
return 0
|
||||
|
||||
async def aggregate(self, pipeline: List[Dict]) -> List[Dict]:
|
||||
"""Execute aggregation pipeline"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
cursor = db.sensor_readings.aggregate(pipeline)
|
||||
return await cursor.to_list(length=None)
|
||||
except Exception as e:
|
||||
logger.error(f"Error executing aggregation: {e}")
|
||||
return []
|
||||
|
||||
class SensorMetadataRepository:
|
||||
"""Repository for sensor metadata operations"""
|
||||
|
||||
async def create(self, metadata: SensorMetadata) -> bool:
|
||||
"""Create sensor metadata"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
await db.sensor_metadata.insert_one(metadata.dict())
|
||||
logger.info(f"Created metadata for sensor: {metadata.sensor_id}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error creating sensor metadata: {e}")
|
||||
return False
|
||||
|
||||
async def update(self, sensor_id: str, updates: Dict[str, Any]) -> bool:
|
||||
"""Update sensor metadata"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
updates["updated_at"] = datetime.utcnow()
|
||||
|
||||
result = await db.sensor_metadata.update_one(
|
||||
{"sensor_id": sensor_id},
|
||||
{"$set": updates}
|
||||
)
|
||||
return result.modified_count > 0
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating sensor metadata: {e}")
|
||||
return False
|
||||
|
||||
async def get_by_sensor_id(self, sensor_id: str) -> Optional[Dict]:
|
||||
"""Get sensor metadata by ID"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
metadata = await db.sensor_metadata.find_one({"sensor_id": sensor_id})
|
||||
if metadata:
|
||||
metadata["_id"] = str(metadata["_id"])
|
||||
return metadata
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting sensor metadata: {e}")
|
||||
return None
|
||||
|
||||
async def get_all(self, filters: Dict[str, Any] = None) -> List[Dict]:
|
||||
"""Get all sensor metadata with optional filters"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
query = filters or {}
|
||||
|
||||
cursor = db.sensor_metadata.find(query).sort("created_at", DESCENDING)
|
||||
metadata_list = await cursor.to_list(length=None)
|
||||
|
||||
# Convert ObjectId to string
|
||||
for metadata in metadata_list:
|
||||
metadata["_id"] = str(metadata["_id"])
|
||||
|
||||
return metadata_list
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting sensor metadata: {e}")
|
||||
return []
|
||||
|
||||
async def delete(self, sensor_id: str) -> bool:
|
||||
"""Delete sensor metadata"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
result = await db.sensor_metadata.delete_one({"sensor_id": sensor_id})
|
||||
return result.deleted_count > 0
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting sensor metadata: {e}")
|
||||
return False
|
||||
|
||||
class RoomMetricsRepository:
|
||||
"""Repository for room metrics operations"""
|
||||
|
||||
async def create(self, metrics: RoomMetrics) -> bool:
|
||||
"""Store room metrics"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
await db.room_metrics.insert_one(metrics.dict())
|
||||
logger.debug(f"Stored room metrics for {metrics.room}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error storing room metrics: {e}")
|
||||
return False
|
||||
|
||||
async def get_by_room(self, room: str, limit: int = 100) -> List[Dict]:
|
||||
"""Get room metrics by room name"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
cursor = db.room_metrics.find({"room": room}).sort("timestamp", DESCENDING).limit(limit)
|
||||
metrics = await cursor.to_list(length=limit)
|
||||
|
||||
# Convert ObjectId to string
|
||||
for metric in metrics:
|
||||
metric["_id"] = str(metric["_id"])
|
||||
|
||||
return metrics
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting room metrics for {room}: {e}")
|
||||
return []
|
||||
|
||||
class SystemEventRepository:
|
||||
"""Repository for system events operations"""
|
||||
|
||||
async def create(self, event: SystemEvent) -> bool:
|
||||
"""Create system event"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
await db.system_events.insert_one(event.dict())
|
||||
logger.info(f"System event logged: {event.event_type} - {event.title}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error logging system event: {e}")
|
||||
return False
|
||||
|
||||
async def get_recent(self, hours: int = 24, limit: int = 50,
|
||||
filters: Dict[str, Any] = None) -> List[Dict]:
|
||||
"""Get recent system events"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
start_time = datetime.utcnow() - timedelta(hours=hours)
|
||||
|
||||
query = {"created_at": {"$gte": start_time}}
|
||||
if filters:
|
||||
query.update(filters)
|
||||
|
||||
cursor = db.system_events.find(query).sort("timestamp", DESCENDING).limit(limit)
|
||||
events = await cursor.to_list(length=limit)
|
||||
|
||||
# Convert ObjectId to string
|
||||
for event in events:
|
||||
event["_id"] = str(event["_id"])
|
||||
|
||||
return events
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting recent events: {e}")
|
||||
return []
|
||||
|
||||
class RedisRepository:
|
||||
"""Repository for Redis cache operations"""
|
||||
|
||||
async def set_sensor_data(self, sensor_id: str, data: Dict[str, Any], expire_seconds: int = 3600) -> bool:
|
||||
"""Store latest sensor data in Redis cache"""
|
||||
try:
|
||||
key = f"sensor:latest:{sensor_id}"
|
||||
json_data = json.dumps(data)
|
||||
await redis_connection.set_with_expiry(key, json_data, expire_seconds)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error caching sensor data: {e}")
|
||||
return False
|
||||
|
||||
async def get_sensor_data(self, sensor_id: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get latest sensor data from Redis cache"""
|
||||
try:
|
||||
key = f"sensor:latest:{sensor_id}"
|
||||
data = await redis_connection.get(key)
|
||||
if data:
|
||||
return json.loads(data)
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting cached sensor data: {e}")
|
||||
return None
|
||||
|
||||
async def set_sensor_status(self, sensor_id: str, status_data: Dict[str, Any], expire_seconds: int = 1800) -> bool:
|
||||
"""Set sensor status in Redis"""
|
||||
try:
|
||||
key = f"sensor:status:{sensor_id}"
|
||||
json_data = json.dumps(status_data)
|
||||
await redis_connection.set_with_expiry(key, json_data, expire_seconds)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error setting sensor status: {e}")
|
||||
return False
|
||||
|
||||
async def set_room_metrics(self, room: str, metrics: Dict[str, Any], expire_seconds: int = 1800) -> bool:
|
||||
"""Store room metrics in Redis cache"""
|
||||
try:
|
||||
key = f"room:metrics:{room}"
|
||||
json_data = json.dumps(metrics)
|
||||
await redis_connection.set_with_expiry(key, json_data, expire_seconds)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error caching room metrics: {e}")
|
||||
return False
|
||||
|
||||
async def get_room_metrics(self, room: str) -> Optional[Dict[str, Any]]:
|
||||
"""Get room metrics from Redis cache"""
|
||||
try:
|
||||
key = f"room:metrics:{room}"
|
||||
data = await redis_connection.get(key)
|
||||
if data:
|
||||
return json.loads(data)
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting cached room metrics: {e}")
|
||||
return None
|
||||
|
||||
async def get_active_sensors(self) -> List[str]:
|
||||
"""Get list of active sensors from Redis"""
|
||||
try:
|
||||
keys = await redis_connection.get_keys_by_pattern("sensor:latest:*")
|
||||
return [key.replace("sensor:latest:", "") for key in keys]
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting active sensors: {e}")
|
||||
return []
|
||||
|
||||
async def delete_sensor_cache(self, sensor_id: str) -> bool:
|
||||
"""Delete all cached data for a sensor"""
|
||||
try:
|
||||
await redis_connection.delete(f"sensor:latest:{sensor_id}")
|
||||
await redis_connection.delete(f"sensor:status:{sensor_id}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting sensor cache: {e}")
|
||||
return False
|
||||
@@ -1 +0,0 @@
|
||||
# Empty file to make this a Python package
|
||||
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
@@ -1,404 +0,0 @@
|
||||
"""
|
||||
API routes for the energy monitoring system
|
||||
Presentation Layer - handles HTTP endpoints and request/response formatting
|
||||
"""
|
||||
from fastapi import APIRouter, HTTPException, Query, Depends
|
||||
from typing import List, Optional, Dict, Any
|
||||
from datetime import datetime, timedelta
|
||||
import time
|
||||
import logging
|
||||
|
||||
from models import (
|
||||
DataQuery, DataResponse, SensorType, SensorStatus, HealthCheck
|
||||
)
|
||||
from ..business.sensor_service import SensorService
|
||||
from ..business.room_service import RoomService
|
||||
from ..business.analytics_service import AnalyticsService
|
||||
from ..infrastructure.database_connection import database_connection
|
||||
from ..infrastructure.redis_connection import redis_connection
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
router = APIRouter()
|
||||
|
||||
# Initialize services
|
||||
sensor_service = SensorService()
|
||||
room_service = RoomService()
|
||||
analytics_service = AnalyticsService()
|
||||
|
||||
# Dependency to check database connection
|
||||
async def check_database():
|
||||
"""Dependency to ensure database is connected"""
|
||||
try:
|
||||
db = await database_connection.get_database()
|
||||
return db
|
||||
except Exception as e:
|
||||
logger.error(f"Database connection failed: {e}")
|
||||
raise HTTPException(status_code=503, detail="Database unavailable")
|
||||
|
||||
@router.get("/sensors", summary="Get all sensors")
|
||||
async def get_sensors(
|
||||
room: Optional[str] = Query(None, description="Filter by room"),
|
||||
sensor_type: Optional[SensorType] = Query(None, description="Filter by sensor type"),
|
||||
status: Optional[SensorStatus] = Query(None, description="Filter by status"),
|
||||
db=Depends(check_database)
|
||||
):
|
||||
"""Get list of all registered sensors with optional filtering"""
|
||||
try:
|
||||
# Build filters
|
||||
filters = {}
|
||||
if room:
|
||||
filters["room"] = room
|
||||
if sensor_type:
|
||||
filters["sensor_type"] = sensor_type.value
|
||||
if status:
|
||||
filters["status"] = status.value
|
||||
|
||||
result = await sensor_service.get_all_sensors(filters)
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting sensors: {e}")
|
||||
raise HTTPException(status_code=500, detail="Internal server error")
|
||||
|
||||
@router.get("/sensors/{sensor_id}", summary="Get sensor details")
|
||||
async def get_sensor(sensor_id: str, db=Depends(check_database)):
|
||||
"""Get detailed information about a specific sensor"""
|
||||
try:
|
||||
result = await sensor_service.get_sensor_details(sensor_id)
|
||||
|
||||
if not result:
|
||||
raise HTTPException(status_code=404, detail="Sensor not found")
|
||||
|
||||
return result
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting sensor {sensor_id}: {e}")
|
||||
raise HTTPException(status_code=500, detail="Internal server error")
|
||||
|
||||
@router.get("/sensors/{sensor_id}/data", summary="Get sensor historical data")
|
||||
async def get_sensor_data(
|
||||
sensor_id: str,
|
||||
start_time: Optional[int] = Query(None, description="Start timestamp (Unix)"),
|
||||
end_time: Optional[int] = Query(None, description="End timestamp (Unix)"),
|
||||
limit: int = Query(100, description="Maximum records to return"),
|
||||
offset: int = Query(0, description="Records to skip"),
|
||||
db=Depends(check_database)
|
||||
):
|
||||
"""Get historical data for a specific sensor"""
|
||||
try:
|
||||
start_query_time = time.time()
|
||||
|
||||
# Build query
|
||||
query = {"sensor_id": sensor_id}
|
||||
|
||||
if start_time or end_time:
|
||||
time_query = {}
|
||||
if start_time:
|
||||
time_query["$gte"] = datetime.fromtimestamp(start_time)
|
||||
if end_time:
|
||||
time_query["$lte"] = datetime.fromtimestamp(end_time)
|
||||
query["created_at"] = time_query
|
||||
|
||||
# Get total count and readings through service layer
|
||||
from ..infrastructure.repositories import SensorReadingRepository
|
||||
repo = SensorReadingRepository()
|
||||
|
||||
total_count = await repo.count_by_query(query)
|
||||
readings = await repo.get_by_query(
|
||||
query=query,
|
||||
sort_by="timestamp",
|
||||
sort_order="desc",
|
||||
limit=limit,
|
||||
offset=offset
|
||||
)
|
||||
|
||||
execution_time = (time.time() - start_query_time) * 1000
|
||||
|
||||
return DataResponse(
|
||||
data=readings,
|
||||
total_count=total_count,
|
||||
query=DataQuery(
|
||||
sensor_ids=[sensor_id],
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
limit=limit,
|
||||
offset=offset
|
||||
),
|
||||
execution_time_ms=execution_time
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting sensor data for {sensor_id}: {e}")
|
||||
raise HTTPException(status_code=500, detail="Internal server error")
|
||||
|
||||
@router.get("/rooms", summary="Get all rooms")
|
||||
async def get_rooms(db=Depends(check_database)):
|
||||
"""Get list of all rooms with sensor counts and latest metrics"""
|
||||
try:
|
||||
result = await room_service.get_all_rooms()
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting rooms: {e}")
|
||||
raise HTTPException(status_code=500, detail="Internal server error")
|
||||
|
||||
@router.get("/rooms/{room_name}/data", summary="Get room historical data")
|
||||
async def get_room_data(
|
||||
room_name: str,
|
||||
start_time: Optional[int] = Query(None, description="Start timestamp (Unix)"),
|
||||
end_time: Optional[int] = Query(None, description="End timestamp (Unix)"),
|
||||
limit: int = Query(100, description="Maximum records to return"),
|
||||
db=Depends(check_database)
|
||||
):
|
||||
"""Get historical data for a specific room"""
|
||||
try:
|
||||
start_query_time = time.time()
|
||||
|
||||
result = await room_service.get_room_data(
|
||||
room_name=room_name,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
limit=limit
|
||||
)
|
||||
|
||||
execution_time = (time.time() - start_query_time) * 1000
|
||||
result["execution_time_ms"] = execution_time
|
||||
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting room data for {room_name}: {e}")
|
||||
raise HTTPException(status_code=500, detail="Internal server error")
|
||||
|
||||
@router.post("/data/query", summary="Advanced data query", response_model=DataResponse)
|
||||
async def query_data(query_params: DataQuery, db=Depends(check_database)):
|
||||
"""Advanced data querying with multiple filters and aggregations"""
|
||||
try:
|
||||
start_query_time = time.time()
|
||||
|
||||
# Build MongoDB query
|
||||
mongo_query = {}
|
||||
|
||||
# Sensor filters
|
||||
if query_params.sensor_ids:
|
||||
mongo_query["sensor_id"] = {"$in": query_params.sensor_ids}
|
||||
|
||||
if query_params.rooms:
|
||||
mongo_query["room"] = {"$in": query_params.rooms}
|
||||
|
||||
if query_params.sensor_types:
|
||||
mongo_query["sensor_type"] = {"$in": [st.value for st in query_params.sensor_types]}
|
||||
|
||||
# Time range
|
||||
if query_params.start_time or query_params.end_time:
|
||||
time_query = {}
|
||||
if query_params.start_time:
|
||||
time_query["$gte"] = datetime.fromtimestamp(query_params.start_time)
|
||||
if query_params.end_time:
|
||||
time_query["$lte"] = datetime.fromtimestamp(query_params.end_time)
|
||||
mongo_query["created_at"] = time_query
|
||||
|
||||
# Execute query through repository
|
||||
from ..infrastructure.repositories import SensorReadingRepository
|
||||
repo = SensorReadingRepository()
|
||||
|
||||
total_count = await repo.count_by_query(mongo_query)
|
||||
readings = await repo.get_by_query(
|
||||
query=mongo_query,
|
||||
sort_by=query_params.sort_by,
|
||||
sort_order=query_params.sort_order,
|
||||
limit=query_params.limit,
|
||||
offset=query_params.offset
|
||||
)
|
||||
|
||||
execution_time = (time.time() - start_query_time) * 1000
|
||||
|
||||
return DataResponse(
|
||||
data=readings,
|
||||
total_count=total_count,
|
||||
query=query_params,
|
||||
execution_time_ms=execution_time
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error executing data query: {e}")
|
||||
raise HTTPException(status_code=500, detail="Internal server error")
|
||||
|
||||
@router.get("/analytics/summary", summary="Get analytics summary")
|
||||
async def get_analytics_summary(
|
||||
hours: int = Query(24, description="Hours of data to analyze"),
|
||||
db=Depends(check_database)
|
||||
):
|
||||
"""Get analytics summary for the specified time period"""
|
||||
try:
|
||||
result = await analytics_service.get_analytics_summary(hours)
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting analytics summary: {e}")
|
||||
raise HTTPException(status_code=500, detail="Internal server error")
|
||||
|
||||
@router.get("/analytics/trends", summary="Get energy trends")
|
||||
async def get_energy_trends(
|
||||
hours: int = Query(168, description="Hours of data to analyze (default: 1 week)"),
|
||||
db=Depends(check_database)
|
||||
):
|
||||
"""Get energy consumption trends"""
|
||||
try:
|
||||
result = await analytics_service.get_energy_trends(hours)
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting energy trends: {e}")
|
||||
raise HTTPException(status_code=500, detail="Internal server error")
|
||||
|
||||
@router.get("/analytics/rooms", summary="Get room comparison analytics")
|
||||
async def get_room_comparison(
|
||||
hours: int = Query(24, description="Hours of data to analyze"),
|
||||
db=Depends(check_database)
|
||||
):
|
||||
"""Get room-by-room comparison analytics"""
|
||||
try:
|
||||
result = await analytics_service.get_room_comparison(hours)
|
||||
return result
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting room comparison: {e}")
|
||||
raise HTTPException(status_code=500, detail="Internal server error")
|
||||
|
||||
@router.get("/events", summary="Get system events")
|
||||
async def get_events(
|
||||
severity: Optional[str] = Query(None, description="Filter by severity"),
|
||||
event_type: Optional[str] = Query(None, description="Filter by event type"),
|
||||
hours: int = Query(24, description="Hours of events to retrieve"),
|
||||
limit: int = Query(50, description="Maximum events to return"),
|
||||
db=Depends(check_database)
|
||||
):
|
||||
"""Get recent system events and alerts"""
|
||||
try:
|
||||
# Build filters
|
||||
filters = {}
|
||||
if severity:
|
||||
filters["severity"] = severity
|
||||
if event_type:
|
||||
filters["event_type"] = event_type
|
||||
|
||||
from ..infrastructure.repositories import SystemEventRepository
|
||||
repo = SystemEventRepository()
|
||||
|
||||
events = await repo.get_recent(
|
||||
hours=hours,
|
||||
limit=limit,
|
||||
filters=filters
|
||||
)
|
||||
|
||||
return {
|
||||
"events": events,
|
||||
"count": len(events),
|
||||
"period_hours": hours
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting events: {e}")
|
||||
raise HTTPException(status_code=500, detail="Internal server error")
|
||||
|
||||
@router.put("/sensors/{sensor_id}/metadata", summary="Update sensor metadata")
|
||||
async def update_sensor_metadata(
|
||||
sensor_id: str,
|
||||
metadata: dict,
|
||||
db=Depends(check_database)
|
||||
):
|
||||
"""Update sensor metadata"""
|
||||
try:
|
||||
success = await sensor_service.update_sensor_metadata(sensor_id, metadata)
|
||||
|
||||
if not success:
|
||||
raise HTTPException(status_code=404, detail="Sensor not found")
|
||||
|
||||
return {"message": "Sensor metadata updated successfully"}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Error updating sensor metadata: {e}")
|
||||
raise HTTPException(status_code=500, detail="Internal server error")
|
||||
|
||||
@router.delete("/sensors/{sensor_id}", summary="Delete sensor and all its data")
|
||||
async def delete_sensor(sensor_id: str, db=Depends(check_database)):
|
||||
"""Delete a sensor and all its associated data"""
|
||||
try:
|
||||
result = await sensor_service.delete_sensor(sensor_id)
|
||||
|
||||
if result["readings_deleted"] == 0 and not result.get("metadata_deleted"):
|
||||
raise HTTPException(status_code=404, detail="Sensor not found")
|
||||
|
||||
return {
|
||||
"message": "Sensor deleted successfully",
|
||||
**result
|
||||
}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting sensor {sensor_id}: {e}")
|
||||
raise HTTPException(status_code=500, detail="Internal server error")
|
||||
|
||||
@router.get("/export", summary="Export data")
|
||||
async def export_data(
|
||||
start_time: int = Query(..., description="Start timestamp (Unix)"),
|
||||
end_time: int = Query(..., description="End timestamp (Unix)"),
|
||||
sensor_ids: Optional[str] = Query(None, description="Comma-separated sensor IDs"),
|
||||
format: str = Query("json", description="Export format (json, csv)"),
|
||||
db=Depends(check_database)
|
||||
):
|
||||
"""Export sensor data for the specified time range"""
|
||||
try:
|
||||
# Build query
|
||||
query = {
|
||||
"created_at": {
|
||||
"$gte": datetime.fromtimestamp(start_time),
|
||||
"$lte": datetime.fromtimestamp(end_time)
|
||||
}
|
||||
}
|
||||
|
||||
if sensor_ids:
|
||||
sensor_list = [sid.strip() for sid in sensor_ids.split(",")]
|
||||
query["sensor_id"] = {"$in": sensor_list}
|
||||
|
||||
# Get data through repository
|
||||
from ..infrastructure.repositories import SensorReadingRepository
|
||||
repo = SensorReadingRepository()
|
||||
|
||||
readings = await repo.get_by_query(
|
||||
query=query,
|
||||
sort_by="timestamp",
|
||||
sort_order="asc",
|
||||
limit=10000 # Large limit for export
|
||||
)
|
||||
|
||||
# Convert datetime fields for JSON serialization
|
||||
for reading in readings:
|
||||
if "created_at" in reading and hasattr(reading["created_at"], "isoformat"):
|
||||
reading["created_at"] = reading["created_at"].isoformat()
|
||||
|
||||
if format.lower() == "csv":
|
||||
raise HTTPException(status_code=501, detail="CSV export not yet implemented")
|
||||
|
||||
return {
|
||||
"data": readings,
|
||||
"count": len(readings),
|
||||
"export_params": {
|
||||
"start_time": start_time,
|
||||
"end_time": end_time,
|
||||
"sensor_ids": sensor_ids.split(",") if sensor_ids else None,
|
||||
"format": format
|
||||
}
|
||||
}
|
||||
|
||||
except HTTPException:
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Error exporting data: {e}")
|
||||
raise HTTPException(status_code=500, detail="Internal server error")
|
||||
@@ -1,128 +0,0 @@
|
||||
"""
|
||||
Redis subscriber for real-time data processing
|
||||
Presentation Layer - handles Redis pub/sub and WebSocket broadcasting
|
||||
"""
|
||||
import asyncio
|
||||
import logging
|
||||
|
||||
from ..infrastructure.redis_connection import redis_connection
|
||||
from ..business.sensor_service import SensorService
|
||||
from ..business.room_service import RoomService
|
||||
from .websocket_handler import websocket_manager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class RedisSubscriber:
|
||||
"""Manages Redis subscription and data broadcasting"""
|
||||
|
||||
def __init__(self):
|
||||
self.sensor_service = SensorService()
|
||||
self.room_service = RoomService()
|
||||
self.is_running = False
|
||||
self.subscription_task = None
|
||||
|
||||
async def start_subscription(self, channel: str = "energy_data") -> None:
|
||||
"""Start Redis subscription in background task"""
|
||||
if self.is_running:
|
||||
logger.warning("Redis subscriber is already running")
|
||||
return
|
||||
|
||||
self.is_running = True
|
||||
self.subscription_task = asyncio.create_task(self._subscribe_loop(channel))
|
||||
logger.info(f"Started Redis subscriber for channel: {channel}")
|
||||
|
||||
async def stop_subscription(self) -> None:
|
||||
"""Stop Redis subscription"""
|
||||
self.is_running = False
|
||||
if self.subscription_task:
|
||||
self.subscription_task.cancel()
|
||||
try:
|
||||
await self.subscription_task
|
||||
except asyncio.CancelledError:
|
||||
pass
|
||||
logger.info("Redis subscriber stopped")
|
||||
|
||||
async def _subscribe_loop(self, channel: str) -> None:
|
||||
"""Main subscription loop"""
|
||||
logger.info("Starting Redis subscriber...")
|
||||
|
||||
try:
|
||||
# Get Redis client and create pubsub
|
||||
redis_client = await redis_connection.get_client()
|
||||
pubsub = await redis_connection.create_pubsub()
|
||||
|
||||
# Subscribe to channel
|
||||
await pubsub.subscribe(channel)
|
||||
logger.info(f"Subscribed to Redis channel: '{channel}'")
|
||||
|
||||
while self.is_running:
|
||||
try:
|
||||
# Get message with timeout
|
||||
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
|
||||
|
||||
if message and message.get('data'):
|
||||
await self._process_message(message['data'])
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in Redis subscriber loop: {e}")
|
||||
# Add delay to prevent rapid-fire errors
|
||||
await asyncio.sleep(5)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Could not connect to Redis for subscription: {e}")
|
||||
finally:
|
||||
# Clean up pubsub connection
|
||||
try:
|
||||
await pubsub.unsubscribe(channel)
|
||||
await pubsub.close()
|
||||
except Exception as e:
|
||||
logger.error(f"Error closing pubsub connection: {e}")
|
||||
|
||||
async def _process_message(self, message_data: str) -> None:
|
||||
"""Process incoming Redis message"""
|
||||
try:
|
||||
logger.debug(f"Received from Redis: {message_data}")
|
||||
|
||||
# Process sensor data through business layer
|
||||
processing_success = await self.sensor_service.process_sensor_message(message_data)
|
||||
|
||||
if processing_success:
|
||||
# Extract room from message for room metrics update
|
||||
import json
|
||||
try:
|
||||
data = json.loads(message_data)
|
||||
room = data.get('room')
|
||||
if room:
|
||||
# Update room metrics asynchronously
|
||||
asyncio.create_task(self.room_service.update_room_metrics(room))
|
||||
except json.JSONDecodeError:
|
||||
logger.warning("Could not parse message for room extraction")
|
||||
|
||||
# Broadcast to WebSocket clients
|
||||
await websocket_manager.broadcast(message_data)
|
||||
else:
|
||||
logger.warning("Sensor data processing failed, skipping broadcast")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error processing Redis message: {e}")
|
||||
|
||||
def is_subscriber_running(self) -> bool:
|
||||
"""Check if subscriber is currently running"""
|
||||
return self.is_running and (
|
||||
self.subscription_task is not None and
|
||||
not self.subscription_task.done()
|
||||
)
|
||||
|
||||
async def get_subscriber_status(self) -> dict:
|
||||
"""Get subscriber status information"""
|
||||
return {
|
||||
"is_running": self.is_running,
|
||||
"task_status": (
|
||||
"running" if self.subscription_task and not self.subscription_task.done()
|
||||
else "stopped"
|
||||
),
|
||||
"active_websocket_connections": websocket_manager.get_connection_count()
|
||||
}
|
||||
|
||||
# Global Redis subscriber instance
|
||||
redis_subscriber = RedisSubscriber()
|
||||
@@ -1,97 +0,0 @@
|
||||
"""
|
||||
WebSocket connection handler
|
||||
Presentation Layer - manages WebSocket connections and real-time communication
|
||||
"""
|
||||
import asyncio
|
||||
from typing import List
|
||||
from fastapi import WebSocket, WebSocketDisconnect
|
||||
import logging
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class WebSocketManager:
|
||||
"""Manages WebSocket connections and broadcasting"""
|
||||
|
||||
def __init__(self):
|
||||
self.active_connections: List[WebSocket] = []
|
||||
|
||||
async def connect(self, websocket: WebSocket) -> None:
|
||||
"""Accept and store new WebSocket connection"""
|
||||
await websocket.accept()
|
||||
self.active_connections.append(websocket)
|
||||
logger.info(f"New client connected. Total clients: {len(self.active_connections)}")
|
||||
|
||||
def disconnect(self, websocket: WebSocket) -> None:
|
||||
"""Remove WebSocket connection"""
|
||||
if websocket in self.active_connections:
|
||||
self.active_connections.remove(websocket)
|
||||
logger.info(f"Client disconnected. Total clients: {len(self.active_connections)}")
|
||||
|
||||
async def send_personal_message(self, message: str, websocket: WebSocket) -> None:
|
||||
"""Send message to specific WebSocket connection"""
|
||||
try:
|
||||
await websocket.send_text(message)
|
||||
except Exception as e:
|
||||
logger.error(f"Error sending personal message: {e}")
|
||||
self.disconnect(websocket)
|
||||
|
||||
async def broadcast(self, message: str) -> None:
|
||||
"""Broadcast message to all connected clients"""
|
||||
if not self.active_connections:
|
||||
return
|
||||
|
||||
try:
|
||||
# Send to all connections concurrently
|
||||
tasks = [
|
||||
self._safe_send_message(connection, message)
|
||||
for connection in self.active_connections.copy()
|
||||
]
|
||||
|
||||
# Execute all sends concurrently and handle exceptions
|
||||
results = await asyncio.gather(*tasks, return_exceptions=True)
|
||||
|
||||
# Remove failed connections
|
||||
failed_connections = []
|
||||
for i, result in enumerate(results):
|
||||
if isinstance(result, Exception):
|
||||
failed_connections.append(self.active_connections[i])
|
||||
|
||||
for connection in failed_connections:
|
||||
self.disconnect(connection)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in broadcast: {e}")
|
||||
|
||||
async def _safe_send_message(self, websocket: WebSocket, message: str) -> None:
|
||||
"""Safely send message to WebSocket with error handling"""
|
||||
try:
|
||||
await websocket.send_text(message)
|
||||
except WebSocketDisconnect:
|
||||
# Connection was closed
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.error(f"Error sending message to client: {e}")
|
||||
raise
|
||||
|
||||
def get_connection_count(self) -> int:
|
||||
"""Get number of active connections"""
|
||||
return len(self.active_connections)
|
||||
|
||||
async def ping_all_connections(self) -> int:
|
||||
"""Ping all connections to check health, return number of healthy connections"""
|
||||
if not self.active_connections:
|
||||
return 0
|
||||
|
||||
healthy_connections = []
|
||||
for connection in self.active_connections.copy():
|
||||
try:
|
||||
await connection.ping()
|
||||
healthy_connections.append(connection)
|
||||
except Exception:
|
||||
logger.debug("Removing unhealthy connection")
|
||||
|
||||
self.active_connections = healthy_connections
|
||||
return len(healthy_connections)
|
||||
|
||||
# Global WebSocket manager instance
|
||||
websocket_manager = WebSocketManager()
|
||||
202
main.py
202
main.py
@@ -1,202 +0,0 @@
|
||||
|
||||
import asyncio
|
||||
import json
|
||||
import redis.asyncio as redis
|
||||
import time
|
||||
import os
|
||||
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException, Depends, Query
|
||||
from fastapi.middleware.cors import CORSMiddleware
|
||||
from typing import List, Optional
|
||||
import logging
|
||||
from contextlib import asynccontextmanager
|
||||
|
||||
# Import our custom modules
|
||||
from database import connect_to_mongo, close_mongo_connection, redis_manager, schedule_cleanup
|
||||
from persistence import persistence_service
|
||||
from models import DataQuery, DataResponse, HealthCheck
|
||||
from api import router as api_router
|
||||
|
||||
# Configure logging
|
||||
logging.basicConfig(level=logging.INFO)
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Application startup time for uptime calculation
|
||||
app_start_time = time.time()
|
||||
|
||||
@asynccontextmanager
|
||||
async def lifespan(app: FastAPI):
|
||||
"""Application lifespan manager"""
|
||||
# Startup
|
||||
logger.info("Application starting up...")
|
||||
|
||||
# Connect to databases
|
||||
await connect_to_mongo()
|
||||
await persistence_service.initialize()
|
||||
|
||||
# Start background tasks
|
||||
asyncio.create_task(redis_subscriber())
|
||||
asyncio.create_task(schedule_cleanup())
|
||||
|
||||
logger.info("Application startup complete")
|
||||
|
||||
yield
|
||||
|
||||
# Shutdown
|
||||
logger.info("Application shutting down...")
|
||||
await close_mongo_connection()
|
||||
await redis_manager.disconnect()
|
||||
logger.info("Application shutdown complete")
|
||||
|
||||
app = FastAPI(
|
||||
title="Energy Monitoring Dashboard API",
|
||||
description="Real-time energy monitoring and IoT sensor data management system",
|
||||
version="1.0.0",
|
||||
lifespan=lifespan
|
||||
)
|
||||
|
||||
# Add CORS middleware
|
||||
app.add_middleware(
|
||||
CORSMiddleware,
|
||||
allow_origins=["*"], # Configure appropriately for production
|
||||
allow_credentials=True,
|
||||
allow_methods=["*"],
|
||||
allow_headers=["*"],
|
||||
)
|
||||
|
||||
# Include API router
|
||||
app.include_router(api_router, prefix="/api/v1")
|
||||
|
||||
# In-memory store for active WebSocket connections
|
||||
active_connections: List[WebSocket] = []
|
||||
|
||||
# Redis channel to subscribe to
|
||||
REDIS_CHANNEL = "energy_data"
|
||||
|
||||
|
||||
@app.websocket("/ws")
|
||||
async def websocket_endpoint(websocket: WebSocket):
|
||||
"""
|
||||
WebSocket endpoint that connects a client, adds them to the active pool,
|
||||
and removes them on disconnection.
|
||||
"""
|
||||
await websocket.accept()
|
||||
active_connections.append(websocket)
|
||||
logger.info(f"New client connected. Total clients: {len(active_connections)}")
|
||||
try:
|
||||
while True:
|
||||
# Keep the connection alive
|
||||
await websocket.receive_text()
|
||||
except WebSocketDisconnect:
|
||||
active_connections.remove(websocket)
|
||||
logger.info(f"Client disconnected. Total clients: {len(active_connections)}")
|
||||
|
||||
|
||||
async def redis_subscriber():
|
||||
"""
|
||||
Connects to Redis, subscribes to the specified channel, and broadcasts
|
||||
messages to all active WebSocket clients. Also persists data to MongoDB.
|
||||
"""
|
||||
logger.info("Starting Redis subscriber...")
|
||||
try:
|
||||
r = redis.Redis(host='localhost', port=6379, db=0, decode_responses=True)
|
||||
await r.ping()
|
||||
logger.info("Successfully connected to Redis for subscription.")
|
||||
except Exception as e:
|
||||
logger.error(f"Could not connect to Redis for subscription: {e}")
|
||||
return
|
||||
|
||||
pubsub = r.pubsub()
|
||||
await pubsub.subscribe(REDIS_CHANNEL)
|
||||
|
||||
logger.info(f"Subscribed to Redis channel: '{REDIS_CHANNEL}'")
|
||||
while True:
|
||||
try:
|
||||
message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0)
|
||||
if message:
|
||||
message_data = message['data']
|
||||
logger.debug(f"Received from Redis: {message_data}")
|
||||
|
||||
# Process and persist the data
|
||||
await persistence_service.process_sensor_message(message_data)
|
||||
|
||||
# Broadcast message to all connected WebSocket clients
|
||||
if active_connections:
|
||||
await asyncio.gather(
|
||||
*[connection.send_text(message_data) for connection in active_connections],
|
||||
return_exceptions=True
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error in Redis subscriber loop: {e}")
|
||||
# Add a delay to prevent rapid-fire errors
|
||||
await asyncio.sleep(5)
|
||||
|
||||
|
||||
@app.get("/")
|
||||
async def read_root():
|
||||
"""Root endpoint with basic system information"""
|
||||
return {
|
||||
"message": "Energy Monitoring Dashboard Backend",
|
||||
"version": "1.0.0",
|
||||
"status": "running",
|
||||
"uptime_seconds": time.time() - app_start_time
|
||||
}
|
||||
|
||||
|
||||
@app.get("/health", response_model=HealthCheck)
|
||||
async def health_check():
|
||||
"""Health check endpoint"""
|
||||
try:
|
||||
# Check database connections
|
||||
mongodb_connected = True
|
||||
redis_connected = True
|
||||
|
||||
try:
|
||||
await persistence_service.db.command("ping")
|
||||
except:
|
||||
mongodb_connected = False
|
||||
|
||||
try:
|
||||
await redis_manager.redis_client.ping()
|
||||
except:
|
||||
redis_connected = False
|
||||
|
||||
# Get system statistics
|
||||
stats = await persistence_service.get_sensor_statistics()
|
||||
|
||||
# Determine overall status
|
||||
status = "healthy"
|
||||
if not mongodb_connected or not redis_connected:
|
||||
status = "degraded"
|
||||
|
||||
return HealthCheck(
|
||||
status=status,
|
||||
mongodb_connected=mongodb_connected,
|
||||
redis_connected=redis_connected,
|
||||
total_sensors=stats.get("total_sensors", 0),
|
||||
active_sensors=stats.get("active_sensors", 0),
|
||||
total_readings=stats.get("total_readings", 0),
|
||||
uptime_seconds=time.time() - app_start_time
|
||||
)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Health check failed: {e}")
|
||||
raise HTTPException(status_code=503, detail="Service Unavailable")
|
||||
|
||||
|
||||
@app.get("/status")
|
||||
async def system_status():
|
||||
"""Detailed system status endpoint"""
|
||||
try:
|
||||
stats = await persistence_service.get_sensor_statistics()
|
||||
|
||||
return {
|
||||
"timestamp": time.time(),
|
||||
"uptime_seconds": time.time() - app_start_time,
|
||||
"active_websocket_connections": len(active_connections),
|
||||
"database_stats": stats
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Status check failed: {e}")
|
||||
raise HTTPException(status_code=500, detail="Internal Server Error")
|
||||
@@ -1,84 +0,0 @@
|
||||
# Microservices Architecture Example
|
||||
|
||||
## Service Decomposition
|
||||
|
||||
### 1. Sensor Data Service
|
||||
**Responsibility**: Sensor data ingestion, validation, and storage
|
||||
```
|
||||
Port: 8001
|
||||
Database: sensor_db (MongoDB)
|
||||
Endpoints:
|
||||
- POST /sensors/data # Ingest sensor readings
|
||||
- GET /sensors/{id}/data # Get sensor history
|
||||
- GET /sensors # List sensors
|
||||
```
|
||||
|
||||
### 2. Room Management Service
|
||||
**Responsibility**: Room metrics, aggregations, and space management
|
||||
```
|
||||
Port: 8002
|
||||
Database: room_db (MongoDB)
|
||||
Endpoints:
|
||||
- GET /rooms # List rooms
|
||||
- GET /rooms/{id}/metrics # Current room metrics
|
||||
- GET /rooms/{id}/history # Historical room data
|
||||
```
|
||||
|
||||
### 3. Analytics Service
|
||||
**Responsibility**: Data analysis, reporting, and insights
|
||||
```
|
||||
Port: 8003
|
||||
Database: analytics_db (PostgreSQL/ClickHouse)
|
||||
Endpoints:
|
||||
- GET /analytics/summary # Dashboard summary
|
||||
- GET /analytics/trends # Trend analysis
|
||||
- GET /analytics/reports/{id} # Generated reports
|
||||
```
|
||||
|
||||
### 4. Notification Service
|
||||
**Responsibility**: Alerts, events, and real-time notifications
|
||||
```
|
||||
Port: 8004
|
||||
Database: events_db (MongoDB)
|
||||
Message Queue: RabbitMQ/Kafka
|
||||
Endpoints:
|
||||
- POST /notifications/send # Send notification
|
||||
- GET /events # System events
|
||||
- WebSocket: /ws/notifications # Real-time alerts
|
||||
```
|
||||
|
||||
### 5. API Gateway
|
||||
**Responsibility**: Request routing, authentication, rate limiting
|
||||
```
|
||||
Port: 8000
|
||||
Routes all requests to appropriate services
|
||||
Handles CORS, authentication, logging
|
||||
```
|
||||
|
||||
## Inter-Service Communication
|
||||
|
||||
### Synchronous (HTTP/REST)
|
||||
```python
|
||||
# Analytics Service calling Sensor Service
|
||||
import httpx
|
||||
|
||||
async def get_sensor_data(sensor_id: str):
|
||||
async with httpx.AsyncClient() as client:
|
||||
response = await client.get(f"http://sensor-service:8001/sensors/{sensor_id}/data")
|
||||
return response.json()
|
||||
```
|
||||
|
||||
### Asynchronous (Message Queue)
|
||||
```python
|
||||
# Sensor Service publishes event
|
||||
await message_queue.publish("sensor.data.received", {
|
||||
"sensor_id": "sensor_001",
|
||||
"timestamp": datetime.utcnow(),
|
||||
"data": sensor_reading
|
||||
})
|
||||
|
||||
# Room Service subscribes to event
|
||||
@message_queue.subscribe("sensor.data.received")
|
||||
async def handle_sensor_data(message):
|
||||
await room_service.update_room_metrics(message.data)
|
||||
```
|
||||
@@ -1,221 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Test script to validate the layered architecture structure
|
||||
This script checks the structure without requiring all dependencies to be installed
|
||||
"""
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
def check_file_structure():
|
||||
"""Check if all expected files exist in the layered structure"""
|
||||
expected_structure = {
|
||||
"layers/__init__.py": "Layers package init",
|
||||
"layers/infrastructure/__init__.py": "Infrastructure layer init",
|
||||
"layers/infrastructure/database_connection.py": "Database connection management",
|
||||
"layers/infrastructure/redis_connection.py": "Redis connection management",
|
||||
"layers/infrastructure/repositories.py": "Data access layer",
|
||||
"layers/business/__init__.py": "Business layer init",
|
||||
"layers/business/sensor_service.py": "Sensor business logic",
|
||||
"layers/business/room_service.py": "Room business logic",
|
||||
"layers/business/analytics_service.py": "Analytics business logic",
|
||||
"layers/business/cleanup_service.py": "Cleanup business logic",
|
||||
"layers/presentation/__init__.py": "Presentation layer init",
|
||||
"layers/presentation/websocket_handler.py": "WebSocket management",
|
||||
"layers/presentation/redis_subscriber.py": "Redis pub/sub handling",
|
||||
"layers/presentation/api_routes.py": "API route definitions",
|
||||
"main_layered.py": "Main application with layered architecture",
|
||||
"models.py": "Data models (existing)",
|
||||
}
|
||||
|
||||
print("🔍 Checking layered architecture file structure...")
|
||||
print("=" * 60)
|
||||
|
||||
all_files_exist = True
|
||||
|
||||
for file_path, description in expected_structure.items():
|
||||
full_path = Path(file_path)
|
||||
|
||||
if full_path.exists():
|
||||
size = full_path.stat().st_size
|
||||
print(f"✅ {file_path:<40} ({size:,} bytes) - {description}")
|
||||
else:
|
||||
print(f"❌ {file_path:<40} MISSING - {description}")
|
||||
all_files_exist = False
|
||||
|
||||
print("=" * 60)
|
||||
|
||||
if all_files_exist:
|
||||
print("🎉 All files in layered structure exist!")
|
||||
return True
|
||||
else:
|
||||
print("❌ Some files are missing from the layered structure")
|
||||
return False
|
||||
|
||||
def check_import_structure():
|
||||
"""Check the logical structure of imports (without actually importing)"""
|
||||
print("\n📋 Analyzing import dependencies...")
|
||||
print("=" * 60)
|
||||
|
||||
# Define expected dependencies by layer
|
||||
layer_dependencies = {
|
||||
"Infrastructure Layer": {
|
||||
"files": [
|
||||
"layers/infrastructure/database_connection.py",
|
||||
"layers/infrastructure/redis_connection.py",
|
||||
"layers/infrastructure/repositories.py"
|
||||
],
|
||||
"can_import_from": ["models", "external libraries"],
|
||||
"should_not_import_from": ["business", "presentation"]
|
||||
},
|
||||
"Business Layer": {
|
||||
"files": [
|
||||
"layers/business/sensor_service.py",
|
||||
"layers/business/room_service.py",
|
||||
"layers/business/analytics_service.py",
|
||||
"layers/business/cleanup_service.py"
|
||||
],
|
||||
"can_import_from": ["models", "infrastructure", "external libraries"],
|
||||
"should_not_import_from": ["presentation"]
|
||||
},
|
||||
"Presentation Layer": {
|
||||
"files": [
|
||||
"layers/presentation/websocket_handler.py",
|
||||
"layers/presentation/redis_subscriber.py",
|
||||
"layers/presentation/api_routes.py"
|
||||
],
|
||||
"can_import_from": ["models", "business", "infrastructure", "external libraries"],
|
||||
"should_not_import_from": []
|
||||
}
|
||||
}
|
||||
|
||||
violations = []
|
||||
|
||||
for layer_name, layer_info in layer_dependencies.items():
|
||||
print(f"\n{layer_name}:")
|
||||
|
||||
for file_path in layer_info["files"]:
|
||||
if Path(file_path).exists():
|
||||
try:
|
||||
with open(file_path, 'r') as f:
|
||||
content = f.read()
|
||||
|
||||
# Check for violations
|
||||
for forbidden in layer_info["should_not_import_from"]:
|
||||
if forbidden == "business" and "from ..business" in content:
|
||||
violations.append(f"{file_path} imports from business layer (violation)")
|
||||
elif forbidden == "presentation" and "from ..presentation" in content:
|
||||
violations.append(f"{file_path} imports from presentation layer (violation)")
|
||||
|
||||
print(f" ✅ {Path(file_path).name}")
|
||||
|
||||
except Exception as e:
|
||||
print(f" ⚠️ {Path(file_path).name} - Could not analyze: {e}")
|
||||
|
||||
if violations:
|
||||
print(f"\n❌ Found {len(violations)} layering violations:")
|
||||
for violation in violations:
|
||||
print(f" - {violation}")
|
||||
return False
|
||||
else:
|
||||
print("\n✅ No layering violations detected!")
|
||||
return True
|
||||
|
||||
def analyze_code_separation():
|
||||
"""Analyze how well the code has been separated by responsibility"""
|
||||
print("\n📊 Analyzing code separation...")
|
||||
print("=" * 60)
|
||||
|
||||
analysis = {
|
||||
"Infrastructure Layer": {
|
||||
"responsibilities": ["Database connections", "Redis connections", "Data repositories"],
|
||||
"file_count": 0,
|
||||
"total_lines": 0
|
||||
},
|
||||
"Business Layer": {
|
||||
"responsibilities": ["Business logic", "Data processing", "Analytics", "Cleanup"],
|
||||
"file_count": 0,
|
||||
"total_lines": 0
|
||||
},
|
||||
"Presentation Layer": {
|
||||
"responsibilities": ["HTTP endpoints", "WebSocket handling", "Request/Response"],
|
||||
"file_count": 0,
|
||||
"total_lines": 0
|
||||
}
|
||||
}
|
||||
|
||||
layer_paths = {
|
||||
"Infrastructure Layer": "layers/infrastructure/",
|
||||
"Business Layer": "layers/business/",
|
||||
"Presentation Layer": "layers/presentation/"
|
||||
}
|
||||
|
||||
for layer_name, layer_path in layer_paths.items():
|
||||
layer_dir = Path(layer_path)
|
||||
if layer_dir.exists():
|
||||
py_files = list(layer_dir.glob("*.py"))
|
||||
py_files = [f for f in py_files if f.name != "__init__.py"]
|
||||
|
||||
total_lines = 0
|
||||
for py_file in py_files:
|
||||
try:
|
||||
with open(py_file, 'r') as f:
|
||||
lines = len(f.readlines())
|
||||
total_lines += lines
|
||||
except:
|
||||
pass
|
||||
|
||||
analysis[layer_name]["file_count"] = len(py_files)
|
||||
analysis[layer_name]["total_lines"] = total_lines
|
||||
|
||||
for layer_name, info in analysis.items():
|
||||
print(f"\n{layer_name}:")
|
||||
print(f" Files: {info['file_count']}")
|
||||
print(f" Lines of Code: {info['total_lines']:,}")
|
||||
print(f" Responsibilities: {', '.join(info['responsibilities'])}")
|
||||
|
||||
total_files = sum(info["file_count"] for info in analysis.values())
|
||||
total_lines = sum(info["total_lines"] for info in analysis.values())
|
||||
|
||||
print(f"\n📈 Total Separation Metrics:")
|
||||
print(f" Total Files: {total_files}")
|
||||
print(f" Total Lines: {total_lines:,}")
|
||||
print(f" Layers: 3 (Infrastructure, Business, Presentation)")
|
||||
|
||||
return True
|
||||
|
||||
def main():
|
||||
"""Main test function"""
|
||||
print("🏗️ LAYERED ARCHITECTURE VALIDATION")
|
||||
print("=" * 60)
|
||||
|
||||
success = True
|
||||
|
||||
# Check file structure
|
||||
if not check_file_structure():
|
||||
success = False
|
||||
|
||||
# Check import structure
|
||||
if not check_import_structure():
|
||||
success = False
|
||||
|
||||
# Analyze code separation
|
||||
if not analyze_code_separation():
|
||||
success = False
|
||||
|
||||
print("\n" + "=" * 60)
|
||||
if success:
|
||||
print("🎉 VALIDATION SUCCESSFUL - Layered architecture is properly structured!")
|
||||
print("\n✨ Key Benefits Achieved:")
|
||||
print(" • Clear separation of concerns")
|
||||
print(" • Infrastructure isolated from business logic")
|
||||
print(" • Business logic separated from presentation")
|
||||
print(" • Easy to test individual layers")
|
||||
print(" • Maintainable and scalable structure")
|
||||
else:
|
||||
print("❌ VALIDATION FAILED - Issues found in layered architecture")
|
||||
|
||||
return success
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(0 if main() else 1)
|
||||
Reference in New Issue
Block a user