Remove legacy API and database modules for layered refactor

rafaeldpsilva
2025-09-09 13:50:53 +01:00
parent 4bde7a951c
commit 3bcddf9602
7 changed files with 3 additions and 1769 deletions

3
.gitignore

@@ -1,4 +1,7 @@
### Python ###
#Claude file
.claude/
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]

582
api.py

@@ -1,582 +0,0 @@
from fastapi import APIRouter, HTTPException, Query, Depends
from typing import List, Optional, Dict, Any
from datetime import datetime, timedelta
import time
import logging
from pymongo import ASCENDING, DESCENDING
from database import get_database, redis_manager
from models import (
DataQuery, DataResponse, SensorReading, SensorMetadata,
RoomMetrics, SystemEvent, SensorType, SensorStatus
)
from persistence import persistence_service
from services.token_service import TokenService
logger = logging.getLogger(__name__)
router = APIRouter()
# Dependency to get database
async def get_db():
return await get_database()
@router.get("/sensors", summary="Get all sensors")
async def get_sensors(
room: Optional[str] = Query(None, description="Filter by room"),
sensor_type: Optional[SensorType] = Query(None, description="Filter by sensor type"),
status: Optional[SensorStatus] = Query(None, description="Filter by status"),
db=Depends(get_db)
):
"""Get list of all registered sensors with optional filtering"""
try:
# Build query
query = {}
if room:
query["room"] = room
if sensor_type:
query["sensor_type"] = sensor_type.value
if status:
query["status"] = status.value
# Execute query
cursor = db.sensor_metadata.find(query).sort("created_at", DESCENDING)
sensors = await cursor.to_list(length=None)
# Convert ObjectId to string
for sensor in sensors:
sensor["_id"] = str(sensor["_id"])
return {
"sensors": sensors,
"count": len(sensors),
"query": query
}
except Exception as e:
logger.error(f"Error getting sensors: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/sensors/{sensor_id}", summary="Get sensor details")
async def get_sensor(sensor_id: str, db=Depends(get_db)):
"""Get detailed information about a specific sensor"""
try:
# Get sensor metadata
sensor = await db.sensor_metadata.find_one({"sensor_id": sensor_id})
if not sensor:
raise HTTPException(status_code=404, detail="Sensor not found")
sensor["_id"] = str(sensor["_id"])
# Get recent readings (last 24 hours)
recent_readings = await persistence_service.get_recent_readings(
sensor_id=sensor_id,
limit=100,
minutes=1440 # 24 hours
)
# Get latest reading from Redis
latest_reading = await redis_manager.get_sensor_data(sensor_id)
return {
"sensor": sensor,
"latest_reading": latest_reading,
"recent_readings_count": len(recent_readings),
"recent_readings": recent_readings[:10] # Return only 10 most recent
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting sensor {sensor_id}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/sensors/{sensor_id}/data", summary="Get sensor historical data")
async def get_sensor_data(
sensor_id: str,
start_time: Optional[int] = Query(None, description="Start timestamp (Unix)"),
end_time: Optional[int] = Query(None, description="End timestamp (Unix)"),
limit: int = Query(100, description="Maximum records to return"),
offset: int = Query(0, description="Records to skip"),
db=Depends(get_db)
):
"""Get historical data for a specific sensor"""
try:
start_query_time = time.time()
# Build time range query
query = {"sensor_id": sensor_id}
if start_time or end_time:
time_query = {}
if start_time:
time_query["$gte"] = datetime.fromtimestamp(start_time)
if end_time:
time_query["$lte"] = datetime.fromtimestamp(end_time)
query["created_at"] = time_query
# Get total count
total_count = await db.sensor_readings.count_documents(query)
# Execute query with pagination
cursor = db.sensor_readings.find(query).sort("timestamp", DESCENDING).skip(offset).limit(limit)
readings = await cursor.to_list(length=limit)
# Convert ObjectId to string
for reading in readings:
reading["_id"] = str(reading["_id"])
execution_time = (time.time() - start_query_time) * 1000 # Convert to milliseconds
return DataResponse(
data=readings,
total_count=total_count,
query=DataQuery(
sensor_ids=[sensor_id],
start_time=start_time,
end_time=end_time,
limit=limit,
offset=offset
),
execution_time_ms=execution_time
)
except Exception as e:
logger.error(f"Error getting sensor data for {sensor_id}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/rooms", summary="Get all rooms")
async def get_rooms(db=Depends(get_db)):
"""Get list of all rooms with sensor counts"""
try:
# Get distinct rooms from sensor readings
rooms = await db.sensor_readings.distinct("room", {"room": {"$ne": None}})
room_data = []
for room in rooms:
# Get sensor count for each room
sensor_count = len(await db.sensor_readings.distinct("sensor_id", {"room": room}))
# Get latest room metrics from Redis
room_metrics = await redis_manager.get_room_metrics(room)
room_data.append({
"room": room,
"sensor_count": sensor_count,
"latest_metrics": room_metrics
})
return {
"rooms": room_data,
"count": len(room_data)
}
except Exception as e:
logger.error(f"Error getting rooms: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/rooms/{room_name}/data", summary="Get room historical data")
async def get_room_data(
room_name: str,
start_time: Optional[int] = Query(None, description="Start timestamp (Unix)"),
end_time: Optional[int] = Query(None, description="End timestamp (Unix)"),
limit: int = Query(100, description="Maximum records to return"),
db=Depends(get_db)
):
"""Get historical data for a specific room"""
try:
start_query_time = time.time()
# Build query for room metrics
query = {"room": room_name}
if start_time or end_time:
time_query = {}
if start_time:
time_query["$gte"] = datetime.fromtimestamp(start_time)
if end_time:
time_query["$lte"] = datetime.fromtimestamp(end_time)
query["created_at"] = time_query
# Get room metrics
cursor = db.room_metrics.find(query).sort("timestamp", DESCENDING).limit(limit)
room_metrics = await cursor.to_list(length=limit)
# Also get sensor readings for the room
sensor_query = {"room": room_name}
if "created_at" in query:
sensor_query["created_at"] = query["created_at"]
sensor_cursor = db.sensor_readings.find(sensor_query).sort("timestamp", DESCENDING).limit(limit)
sensor_readings = await sensor_cursor.to_list(length=limit)
# Convert ObjectId to string
for item in room_metrics + sensor_readings:
item["_id"] = str(item["_id"])
execution_time = (time.time() - start_query_time) * 1000
return {
"room": room_name,
"room_metrics": room_metrics,
"sensor_readings": sensor_readings,
"execution_time_ms": execution_time
}
except Exception as e:
logger.error(f"Error getting room data for {room_name}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.post("/data/query", summary="Advanced data query", response_model=DataResponse)
async def query_data(query_params: DataQuery, db=Depends(get_db)):
"""Advanced data querying with multiple filters and aggregations"""
try:
start_query_time = time.time()
# Build MongoDB query
mongo_query = {}
# Sensor filters
if query_params.sensor_ids:
mongo_query["sensor_id"] = {"$in": query_params.sensor_ids}
if query_params.rooms:
mongo_query["room"] = {"$in": query_params.rooms}
if query_params.sensor_types:
mongo_query["sensor_type"] = {"$in": [st.value for st in query_params.sensor_types]}
# Time range
if query_params.start_time or query_params.end_time:
time_query = {}
if query_params.start_time:
time_query["$gte"] = datetime.fromtimestamp(query_params.start_time)
if query_params.end_time:
time_query["$lte"] = datetime.fromtimestamp(query_params.end_time)
mongo_query["created_at"] = time_query
# Get total count
total_count = await db.sensor_readings.count_documents(mongo_query)
# Execute query with pagination and sorting
sort_direction = DESCENDING if query_params.sort_order == "desc" else ASCENDING
cursor = db.sensor_readings.find(mongo_query).sort(
query_params.sort_by, sort_direction
).skip(query_params.offset).limit(query_params.limit)
readings = await cursor.to_list(length=query_params.limit)
# Convert ObjectId to string
for reading in readings:
reading["_id"] = str(reading["_id"])
execution_time = (time.time() - start_query_time) * 1000
return DataResponse(
data=readings,
total_count=total_count,
query=query_params,
execution_time_ms=execution_time
)
except Exception as e:
logger.error(f"Error executing data query: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/analytics/summary", summary="Get analytics summary")
async def get_analytics_summary(
hours: int = Query(24, description="Hours of data to analyze"),
db=Depends(get_db)
):
"""Get analytics summary for the specified time period"""
try:
start_time = datetime.utcnow() - timedelta(hours=hours)
# Aggregation pipeline for analytics
pipeline = [
{"$match": {"created_at": {"$gte": start_time}}},
{"$group": {
"_id": {
"sensor_id": "$sensor_id",
"room": "$room",
"sensor_type": "$sensor_type"
},
"reading_count": {"$sum": 1},
"avg_energy": {"$avg": "$energy.value"},
"total_energy": {"$sum": "$energy.value"},
"avg_co2": {"$avg": "$co2.value"},
"max_co2": {"$max": "$co2.value"},
"avg_temperature": {"$avg": "$temperature.value"},
"latest_timestamp": {"$max": "$timestamp"}
}},
{"$sort": {"total_energy": -1}}
]
cursor = db.sensor_readings.aggregate(pipeline)
analytics = await cursor.to_list(length=None)
# Room-level summary
room_pipeline = [
{"$match": {"created_at": {"$gte": start_time}, "room": {"$ne": None}}},
{"$group": {
"_id": "$room",
"sensor_count": {"$addToSet": "$sensor_id"},
"total_energy": {"$sum": "$energy.value"},
"avg_co2": {"$avg": "$co2.value"},
"max_co2": {"$max": "$co2.value"},
"reading_count": {"$sum": 1}
}},
{"$project": {
"room": "$_id",
"sensor_count": {"$size": "$sensor_count"},
"total_energy": 1,
"avg_co2": 1,
"max_co2": 1,
"reading_count": 1
}},
{"$sort": {"total_energy": -1}}
]
room_cursor = db.sensor_readings.aggregate(room_pipeline)
room_analytics = await room_cursor.to_list(length=None)
return {
"period_hours": hours,
"start_time": start_time.isoformat(),
"sensor_analytics": analytics,
"room_analytics": room_analytics,
"summary": {
"total_sensors_analyzed": len(analytics),
"total_rooms_analyzed": len(room_analytics),
"total_readings": sum(item["reading_count"] for item in analytics)
}
}
except Exception as e:
logger.error(f"Error getting analytics summary: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/events", summary="Get system events")
async def get_events(
severity: Optional[str] = Query(None, description="Filter by severity"),
event_type: Optional[str] = Query(None, description="Filter by event type"),
hours: int = Query(24, description="Hours of events to retrieve"),
limit: int = Query(50, description="Maximum events to return"),
db=Depends(get_db)
):
"""Get recent system events and alerts"""
try:
start_time = datetime.utcnow() - timedelta(hours=hours)
# Build query
query = {"created_at": {"$gte": start_time}}
if severity:
query["severity"] = severity
if event_type:
query["event_type"] = event_type
# Execute query
cursor = db.system_events.find(query).sort("timestamp", DESCENDING).limit(limit)
events = await cursor.to_list(length=limit)
# Convert ObjectId to string
for event in events:
event["_id"] = str(event["_id"])
return {
"events": events,
"count": len(events),
"period_hours": hours
}
except Exception as e:
logger.error(f"Error getting events: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.put("/sensors/{sensor_id}/metadata", summary="Update sensor metadata")
async def update_sensor_metadata(
sensor_id: str,
metadata: dict,
db=Depends(get_db)
):
"""Update sensor metadata"""
try:
# Update timestamp
metadata["updated_at"] = datetime.utcnow()
result = await db.sensor_metadata.update_one(
{"sensor_id": sensor_id},
{"$set": metadata}
)
if result.matched_count == 0:
raise HTTPException(status_code=404, detail="Sensor not found")
return {"message": "Sensor metadata updated successfully", "modified": result.modified_count > 0}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating sensor metadata: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.delete("/sensors/{sensor_id}", summary="Delete sensor and all its data")
async def delete_sensor(sensor_id: str, db=Depends(get_db)):
"""Delete a sensor and all its associated data"""
try:
# Delete sensor readings
readings_result = await db.sensor_readings.delete_many({"sensor_id": sensor_id})
# Delete sensor metadata
metadata_result = await db.sensor_metadata.delete_one({"sensor_id": sensor_id})
# Delete from Redis cache
await redis_manager.redis_client.delete(f"sensor:latest:{sensor_id}")
await redis_manager.redis_client.delete(f"sensor:status:{sensor_id}")
if metadata_result.deleted_count == 0:
raise HTTPException(status_code=404, detail="Sensor not found")
return {
"message": "Sensor deleted successfully",
"sensor_id": sensor_id,
"readings_deleted": readings_result.deleted_count,
"metadata_deleted": metadata_result.deleted_count
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting sensor {sensor_id}: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/export", summary="Export data")
async def export_data(
start_time: int = Query(..., description="Start timestamp (Unix)"),
end_time: int = Query(..., description="End timestamp (Unix)"),
sensor_ids: Optional[str] = Query(None, description="Comma-separated sensor IDs"),
format: str = Query("json", description="Export format (json, csv)"),
db=Depends(get_db)
):
"""Export sensor data for the specified time range"""
try:
# Build query
query = {
"created_at": {
"$gte": datetime.fromtimestamp(start_time),
"$lte": datetime.fromtimestamp(end_time)
}
}
if sensor_ids:
sensor_list = [sid.strip() for sid in sensor_ids.split(",")]
query["sensor_id"] = {"$in": sensor_list}
# Get data
cursor = db.sensor_readings.find(query).sort("timestamp", ASCENDING)
readings = await cursor.to_list(length=None)
# Convert ObjectId to string
for reading in readings:
reading["_id"] = str(reading["_id"])
# Convert datetime to ISO string for JSON serialization
if "created_at" in reading:
reading["created_at"] = reading["created_at"].isoformat()
if format.lower() == "csv":
# TODO: Implement CSV export
raise HTTPException(status_code=501, detail="CSV export not yet implemented")
return {
"data": readings,
"count": len(readings),
"export_params": {
"start_time": start_time,
"end_time": end_time,
"sensor_ids": sensor_ids.split(",") if sensor_ids else None,
"format": format
}
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error exporting data: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
# Token Management Endpoints
@router.get("/tokens", summary="Get all tokens")
async def get_tokens(db=Depends(get_db)):
"""Get list of all tokens"""
try:
token_service = TokenService(db)
tokens = await token_service.get_tokens()
return {"tokens": tokens}
except Exception as e:
logger.error(f"Error getting tokens: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.post("/tokens/generate", summary="Generate new token")
async def generate_token(
name: str,
list_of_resources: List[str],
data_aggregation: bool = False,
time_aggregation: bool = False,
embargo: int = 0,
exp_hours: int = 24,
db=Depends(get_db)
):
"""Generate a new JWT token with specified permissions"""
try:
token_service = TokenService(db)
token = token_service.generate_token(
name=name,
list_of_resources=list_of_resources,
data_aggregation=data_aggregation,
time_aggregation=time_aggregation,
embargo=embargo,
exp_hours=exp_hours
)
return {"token": token}
except Exception as e:
logger.error(f"Error generating token: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.post("/tokens/check", summary="Validate token")
async def check_token(token: str, db=Depends(get_db)):
"""Check token validity and decode payload"""
try:
token_service = TokenService(db)
decoded = token_service.decode_token(token)
return decoded
except Exception as e:
logger.error(f"Error checking token: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.post("/tokens/save", summary="Save token to database")
async def save_token(token: str, db=Depends(get_db)):
"""Save a valid token to the database"""
try:
token_service = TokenService(db)
result = await token_service.insert_token(token)
return result
except ValueError as e:
raise HTTPException(status_code=400, detail=str(e))
except Exception as e:
logger.error(f"Error saving token: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.post("/tokens/revoke", summary="Revoke token")
async def revoke_token(token: str, db=Depends(get_db)):
"""Revoke a token by marking it as inactive"""
try:
token_service = TokenService(db)
result = await token_service.revoke_token(token)
return result
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))
except Exception as e:
logger.error(f"Error revoking token: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
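For context on what is being removed, here is a minimal client sketch for the read endpoints above. It assumes the legacy router was mounted under /api/v1 and uses httpx as the HTTP client; both are assumptions, since neither the mount prefix nor httpx appears anywhere in this commit.

import asyncio

import httpx  # assumed client library; not listed in requirements.txt

async def main() -> None:
    # Base URL and /api/v1 prefix are assumptions about the legacy deployment
    async with httpx.AsyncClient(base_url="http://localhost:8000/api/v1") as client:
        sensors = (await client.get("/sensors", params={"room": "lab-1"})).json()
        print("sensors in lab-1:", sensors["count"])

        # Body mirrors the DataQuery model accepted by POST /data/query
        query = {"rooms": ["lab-1"], "limit": 10, "sort_by": "timestamp", "sort_order": "desc"}
        result = (await client.post("/data/query", json=query)).json()
        print("matching readings:", result["total_count"])

asyncio.run(main())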

220
database.py

@@ -1,220 +0,0 @@
import json
import os
from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase
from pymongo import IndexModel, ASCENDING, DESCENDING
from typing import Optional
import asyncio
from datetime import datetime, timedelta
import logging
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class MongoDB:
client: Optional[AsyncIOMotorClient] = None
database: Optional[AsyncIOMotorDatabase] = None
# Global MongoDB instance
mongodb = MongoDB()
async def connect_to_mongo():
"""Create database connection"""
try:
# MongoDB connection string - default to localhost for development
mongodb_url = os.getenv("MONGODB_URL", "mongodb://localhost:27017")
database_name = os.getenv("DATABASE_NAME", "energy_monitoring")
logger.info(f"Connecting to MongoDB at: {mongodb_url}")
# Create async MongoDB client
mongodb.client = AsyncIOMotorClient(mongodb_url)
# Test the connection
await mongodb.client.admin.command('ping')
logger.info("Successfully connected to MongoDB")
# Get database
mongodb.database = mongodb.client[database_name]
# Create indexes for better performance
await create_indexes()
except Exception as e:
logger.error(f"Error connecting to MongoDB: {e}")
raise
async def close_mongo_connection():
"""Close database connection"""
if mongodb.client:
mongodb.client.close()
logger.info("Disconnected from MongoDB")
async def create_indexes():
"""Create database indexes for optimal performance"""
try:
# Sensor readings collection indexes
sensor_readings_indexes = [
IndexModel([("sensor_id", ASCENDING), ("timestamp", DESCENDING)]),
IndexModel([("timestamp", DESCENDING)]),
IndexModel([("room", ASCENDING), ("timestamp", DESCENDING)]),
IndexModel([("sensor_type", ASCENDING), ("timestamp", DESCENDING)]),
IndexModel([("created_at", DESCENDING)]),
]
await mongodb.database.sensor_readings.create_indexes(sensor_readings_indexes)
# Room metrics collection indexes
room_metrics_indexes = [
IndexModel([("room", ASCENDING), ("timestamp", DESCENDING)]),
IndexModel([("timestamp", DESCENDING)]),
IndexModel([("created_at", DESCENDING)]),
]
await mongodb.database.room_metrics.create_indexes(room_metrics_indexes)
# Sensor metadata collection indexes
sensor_metadata_indexes = [
IndexModel([("sensor_id", ASCENDING)], unique=True),
IndexModel([("room", ASCENDING)]),
IndexModel([("sensor_type", ASCENDING)]),
IndexModel([("status", ASCENDING)]),
]
await mongodb.database.sensor_metadata.create_indexes(sensor_metadata_indexes)
# System events collection indexes
system_events_indexes = [
IndexModel([("timestamp", DESCENDING)]),
IndexModel([("event_type", ASCENDING), ("timestamp", DESCENDING)]),
IndexModel([("severity", ASCENDING), ("timestamp", DESCENDING)]),
]
await mongodb.database.system_events.create_indexes(system_events_indexes)
logger.info("Database indexes created successfully")
except Exception as e:
logger.error(f"Error creating indexes: {e}")
async def get_database() -> AsyncIOMotorDatabase:
"""Get database instance"""
if not mongodb.database:
await connect_to_mongo()
return mongodb.database
class RedisManager:
"""Redis connection and operations manager"""
def __init__(self):
self.redis_client = None
self.redis_host = os.getenv("REDIS_HOST", "localhost")
self.redis_port = int(os.getenv("REDIS_PORT", "6379"))
self.redis_db = int(os.getenv("REDIS_DB", "0"))
async def connect(self):
"""Connect to Redis"""
try:
import redis.asyncio as redis
self.redis_client = redis.Redis(
host=self.redis_host,
port=self.redis_port,
db=self.redis_db,
decode_responses=True
)
await self.redis_client.ping()
logger.info("Successfully connected to Redis")
except Exception as e:
logger.error(f"Error connecting to Redis: {e}")
raise
async def disconnect(self):
"""Disconnect from Redis"""
if self.redis_client:
await self.redis_client.close()
logger.info("Disconnected from Redis")
async def set_sensor_data(self, sensor_id: str, data: dict, expire_time: int = 3600):
"""Store latest sensor data in Redis with expiration"""
if not self.redis_client:
await self.connect()
key = f"sensor:latest:{sensor_id}"
# Store JSON so get_sensor_data() can json.loads() it back; str(dict) is not valid JSON
await self.redis_client.setex(key, expire_time, json.dumps(data, default=str))
async def get_sensor_data(self, sensor_id: str) -> Optional[dict]:
"""Get latest sensor data from Redis"""
if not self.redis_client:
await self.connect()
key = f"sensor:latest:{sensor_id}"
data = await self.redis_client.get(key)
if data:
return json.loads(data)
return None
async def set_room_metrics(self, room: str, metrics: dict, expire_time: int = 1800):
"""Store room aggregated metrics in Redis"""
if not self.redis_client:
await self.connect()
key = f"room:metrics:{room}"
await self.redis_client.setex(key, expire_time, json.dumps(metrics, default=str))
async def get_room_metrics(self, room: str) -> Optional[dict]:
"""Get room aggregated metrics from Redis"""
if not self.redis_client:
await self.connect()
key = f"room:metrics:{room}"
data = await self.redis_client.get(key)
if data:
return json.loads(data)
return None
async def get_all_active_sensors(self) -> list:
"""Get list of all sensors with recent data in Redis"""
if not self.redis_client:
await self.connect()
keys = await self.redis_client.keys("sensor:latest:*")
return [key.replace("sensor:latest:", "") for key in keys]
# Global Redis manager instance
redis_manager = RedisManager()
async def cleanup_old_data():
"""Cleanup old data from MongoDB (retention policy)"""
try:
db = await get_database()
# Delete sensor readings older than 90 days
retention_date = datetime.utcnow() - timedelta(days=90)
result = await db.sensor_readings.delete_many({
"created_at": {"$lt": retention_date}
})
if result.deleted_count > 0:
logger.info(f"Deleted {result.deleted_count} old sensor readings")
# Delete room metrics older than 30 days
retention_date = datetime.utcnow() - timedelta(days=30)
result = await db.room_metrics.delete_many({
"created_at": {"$lt": retention_date}
})
if result.deleted_count > 0:
logger.info(f"Deleted {result.deleted_count} old room metrics")
except Exception as e:
logger.error(f"Error cleaning up old data: {e}")
# Scheduled cleanup task
async def schedule_cleanup():
"""Schedule periodic cleanup of old data"""
while True:
try:
await cleanup_old_data()
# Wait 24 hours before next cleanup
await asyncio.sleep(24 * 60 * 60)
except Exception as e:
logger.error(f"Error in scheduled cleanup: {e}")
# Wait 1 hour before retrying
await asyncio.sleep(60 * 60)
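A short usage sketch of the RedisManager cache round-trip defined above, assuming a Redis instance on the default localhost:6379 and a hypothetical sensor ID (the round-trip relies on set_sensor_data storing JSON, per the fix above):

import asyncio

from database import redis_manager  # the module removed by this commit

async def demo() -> None:
    await redis_manager.connect()
    # Cached under sensor:latest:sensor-42 with a 1-hour TTL
    await redis_manager.set_sensor_data("sensor-42", {"energy": {"value": 1.2, "unit": "kWh"}})
    print(await redis_manager.get_sensor_data("sensor-42"))
    print(await redis_manager.get_all_active_sensors())  # e.g. ["sensor-42"]
    await redis_manager.disconnect()

asyncio.run(demo())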

273

@@ -1,273 +0,0 @@
"""
Main application entry point with layered architecture
This is the new structured version of the FastAPI application
"""
import asyncio
import time
from contextlib import asynccontextmanager
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import logging
# Import layered components
from layers.infrastructure.database_connection import database_connection
from layers.infrastructure.redis_connection import redis_connection
from layers.business.sensor_service import SensorService
from layers.business.cleanup_service import cleanup_service
from layers.presentation.websocket_handler import websocket_manager
from layers.presentation.redis_subscriber import redis_subscriber
from layers.presentation.api_routes import router as api_router
from models import HealthCheck
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Application startup time for uptime calculation
app_start_time = time.time()
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager with proper layer initialization"""
# Startup
logger.info("Application starting up...")
try:
# Initialize infrastructure layer
await database_connection.connect()
await redis_connection.connect()
logger.info("Infrastructure layer initialized")
# Initialize business layer
sensor_service = SensorService() # Services are initialized on-demand
logger.info("Business layer initialized")
# Initialize presentation layer
await redis_subscriber.start_subscription("energy_data")
await cleanup_service.start_scheduled_cleanup(24) # Daily cleanup
logger.info("Presentation layer initialized")
logger.info("Application startup complete")
yield
# Shutdown
logger.info("Application shutting down...")
# Stop background tasks
await redis_subscriber.stop_subscription()
await cleanup_service.stop_scheduled_cleanup()
# Close connections
await database_connection.disconnect()
await redis_connection.disconnect()
logger.info("Application shutdown complete")
except Exception as e:
logger.error(f"Error during application lifecycle: {e}")
raise
app = FastAPI(
title="Energy Monitoring Dashboard API",
description="Real-time energy monitoring and IoT sensor data management system (Layered Architecture)",
version="2.0.0",
lifespan=lifespan
)
# Add CORS middleware
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Configure appropriately for production
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Include API router with version prefix
app.include_router(api_router, prefix="/api/v1")
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
"""
WebSocket endpoint for real-time data streaming
Presentation Layer - handles WebSocket connections
"""
await websocket_manager.connect(websocket)
try:
while True:
# Keep the connection alive by waiting for messages
await websocket.receive_text()
except WebSocketDisconnect:
websocket_manager.disconnect(websocket)
@app.get("/")
async def read_root():
"""Root endpoint with basic system information"""
return {
"message": "Energy Monitoring Dashboard Backend (Layered Architecture)",
"version": "2.0.0",
"status": "running",
"uptime_seconds": time.time() - app_start_time,
"architecture": "3-layer (Presentation, Business, Infrastructure)"
}
@app.get("/health", response_model=HealthCheck)
async def health_check():
"""
Comprehensive health check endpoint
Checks all layers and dependencies
"""
try:
# Check infrastructure layer
mongodb_connected = True
redis_connected = True
try:
db = await database_connection.get_database()
await db.command("ping")
except Exception:
mongodb_connected = False
try:
redis_client = await redis_connection.get_client()
await redis_client.ping()
except Exception:
redis_connected = False
# Check business layer through service
sensor_service = SensorService()
from layers.infrastructure.repositories import SensorReadingRepository
stats_repo = SensorReadingRepository()
# Get basic statistics
try:
# Simple count queries to test business layer
total_readings = await stats_repo.count_by_query({})
active_sensors_data = await redis_connection.get_keys_by_pattern("sensor:latest:*")
total_sensors = len(active_sensors_data)
except Exception as e:
logger.error(f"Error getting stats for health check: {e}")
total_readings = 0
total_sensors = 0
# Check presentation layer
websocket_connections = websocket_manager.get_connection_count()
redis_subscription_active = redis_subscriber.is_subscriber_running()
# Determine overall status
status = "healthy"
if not mongodb_connected or not redis_connected:
status = "degraded"
if not mongodb_connected and not redis_connected:
status = "unhealthy"
return HealthCheck(
status=status,
mongodb_connected=mongodb_connected,
redis_connected=redis_connected,
total_sensors=total_sensors,
active_sensors=total_sensors, # Approximation
total_readings=total_readings,
uptime_seconds=time.time() - app_start_time
)
except Exception as e:
logger.error(f"Health check failed: {e}")
raise HTTPException(status_code=503, detail="Service Unavailable")
@app.get("/status")
async def system_status():
"""
Detailed system status endpoint with layer-specific information
"""
try:
# Infrastructure layer status
infrastructure_status = {
"database_connected": True,
"redis_connected": True
}
try:
db = await database_connection.get_database()
await db.command("ping")
except Exception:
infrastructure_status["database_connected"] = False
try:
redis_client = await redis_connection.get_client()
await redis_client.ping()
except Exception:
infrastructure_status["redis_connected"] = False
# Business layer status
business_status = {
"cleanup_service_running": cleanup_service.is_cleanup_running()
}
# Presentation layer status
presentation_status = {
"active_websocket_connections": websocket_manager.get_connection_count(),
"redis_subscriber_running": redis_subscriber.is_subscriber_running()
}
# Get subscriber status details
subscriber_status = await redis_subscriber.get_subscriber_status()
return {
"timestamp": time.time(),
"uptime_seconds": time.time() - app_start_time,
"architecture": "layered",
"layers": {
"infrastructure": infrastructure_status,
"business": business_status,
"presentation": presentation_status
},
"redis_subscriber": subscriber_status
}
except Exception as e:
logger.error(f"Status check failed: {e}")
raise HTTPException(status_code=500, detail="Internal Server Error")
@app.get("/system/cleanup", summary="Get cleanup service status")
async def get_cleanup_status():
"""Get data cleanup service status and statistics"""
try:
# Get cleanup service status
cleanup_running = cleanup_service.is_cleanup_running()
# Get storage statistics
storage_stats = await cleanup_service.get_storage_statistics()
# Get retention policy info
retention_info = await cleanup_service.get_data_retention_info()
return {
"cleanup_service_running": cleanup_running,
"storage_statistics": storage_stats,
"retention_policies": retention_info
}
except Exception as e:
logger.error(f"Error getting cleanup status: {e}")
raise HTTPException(status_code=500, detail="Internal Server Error")
@app.post("/system/cleanup", summary="Run manual cleanup")
async def run_manual_cleanup():
"""Manually trigger data cleanup process"""
try:
cleanup_results = await cleanup_service.cleanup_old_data()
return {
"message": "Manual cleanup completed",
"results": cleanup_results
}
except Exception as e:
logger.error(f"Error running manual cleanup: {e}")
raise HTTPException(status_code=500, detail="Internal Server Error")
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
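The /health endpoint above folds the two backend checks into one status string; restated as a small pure function for clarity (a restatement of the logic above, not code from the removed file):

def overall_status(mongodb_connected: bool, redis_connected: bool) -> str:
    """Mirror health_check(): one backend down degrades the service,
    both down makes it unhealthy."""
    if not mongodb_connected and not redis_connected:
        return "unhealthy"
    if not mongodb_connected or not redis_connected:
        return "degraded"
    return "healthy"

assert overall_status(True, True) == "healthy"
assert overall_status(False, True) == "degraded"
assert overall_status(False, False) == "unhealthy"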

236
models.py

@@ -1,236 +0,0 @@
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any, Literal
from datetime import datetime
from enum import Enum
class SensorType(str, Enum):
ENERGY = "energy"
CO2 = "co2"
TEMPERATURE = "temperature"
HUMIDITY = "humidity"
HVAC = "hvac"
LIGHTING = "lighting"
SECURITY = "security"
class SensorStatus(str, Enum):
ONLINE = "online"
OFFLINE = "offline"
ERROR = "error"
MAINTENANCE = "maintenance"
class CO2Status(str, Enum):
GOOD = "good"
MODERATE = "moderate"
POOR = "poor"
CRITICAL = "critical"
class OccupancyLevel(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
# Base Models
class SensorReading(BaseModel):
"""Individual sensor reading model"""
sensor_id: str = Field(..., description="Unique sensor identifier")
room: Optional[str] = Field(None, description="Room where sensor is located")
sensor_type: SensorType = Field(..., description="Type of sensor")
timestamp: int = Field(..., description="Unix timestamp of reading")
created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp")
# Sensor values
energy: Optional[Dict[str, Any]] = Field(None, description="Energy reading with value and unit")
co2: Optional[Dict[str, Any]] = Field(None, description="CO2 reading with value and unit")
temperature: Optional[Dict[str, Any]] = Field(None, description="Temperature reading with value and unit")
humidity: Optional[Dict[str, Any]] = Field(None, description="Humidity reading with value and unit")
motion: Optional[Dict[str, Any]] = Field(None, description="Motion detection reading")
# Metadata
metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional sensor metadata")
class Config:
json_encoders = {
datetime: lambda v: v.isoformat()
}
class LegacySensorReading(BaseModel):
"""Legacy sensor reading format for backward compatibility"""
sensor_id: str = Field(..., alias="sensorId")
timestamp: int
value: float
unit: str
created_at: datetime = Field(default_factory=datetime.utcnow)
class Config:
allow_population_by_field_name = True
class SensorMetadata(BaseModel):
"""Sensor configuration and metadata"""
sensor_id: str = Field(..., description="Unique sensor identifier")
name: str = Field(..., description="Human-readable sensor name")
sensor_type: SensorType = Field(..., description="Type of sensor")
room: Optional[str] = Field(None, description="Room assignment")
status: SensorStatus = Field(default=SensorStatus.OFFLINE, description="Current sensor status")
# Physical location and installation details
location: Optional[str] = Field(None, description="Physical location description")
floor: Optional[str] = Field(None, description="Floor level")
building: Optional[str] = Field(None, description="Building identifier")
# Technical specifications
model: Optional[str] = Field(None, description="Sensor model")
manufacturer: Optional[str] = Field(None, description="Sensor manufacturer")
firmware_version: Optional[str] = Field(None, description="Firmware version")
hardware_version: Optional[str] = Field(None, description="Hardware version")
# Network and connectivity
ip_address: Optional[str] = Field(None, description="IP address if network connected")
mac_address: Optional[str] = Field(None, description="MAC address")
connection_type: Optional[str] = Field(None, description="Connection type (wifi, ethernet, zigbee, etc.)")
# Power and maintenance
battery_level: Optional[float] = Field(None, description="Battery level percentage")
last_maintenance: Optional[datetime] = Field(None, description="Last maintenance date")
next_maintenance: Optional[datetime] = Field(None, description="Next scheduled maintenance")
# Operational settings
sampling_rate: Optional[int] = Field(None, description="Data sampling rate in seconds")
calibration_date: Optional[datetime] = Field(None, description="Last calibration date")
# Capabilities
monitoring_capabilities: List[str] = Field(default_factory=list, description="List of monitoring capabilities")
control_capabilities: List[str] = Field(default_factory=list, description="List of control capabilities")
# Timestamps
installed_at: Optional[datetime] = Field(None, description="Installation timestamp")
last_seen: Optional[datetime] = Field(None, description="Last communication timestamp")
created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp")
updated_at: datetime = Field(default_factory=datetime.utcnow, description="Record update timestamp")
class Config:
json_encoders = {
datetime: lambda v: v.isoformat() if v else None
}
class RoomMetrics(BaseModel):
"""Aggregated room-level metrics"""
room: str = Field(..., description="Room identifier")
timestamp: int = Field(..., description="Metrics calculation timestamp")
created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp")
# Sensor inventory
sensor_count: int = Field(0, description="Total number of sensors in room")
active_sensors: List[str] = Field(default_factory=list, description="List of active sensor IDs")
sensor_types: List[SensorType] = Field(default_factory=list, description="Types of sensors present")
# Energy metrics
energy: Optional[Dict[str, Any]] = Field(None, description="Energy consumption metrics")
# Format: {"current": float, "total": float, "average": float, "peak": float, "unit": str}
# Environmental metrics
co2: Optional[Dict[str, Any]] = Field(None, description="CO2 level metrics")
# Format: {"current": float, "average": float, "max": float, "min": float, "status": CO2Status, "unit": str}
temperature: Optional[Dict[str, Any]] = Field(None, description="Temperature metrics")
# Format: {"current": float, "average": float, "max": float, "min": float, "unit": str}
humidity: Optional[Dict[str, Any]] = Field(None, description="Humidity metrics")
# Format: {"current": float, "average": float, "max": float, "min": float, "unit": str}
# Occupancy and usage
occupancy_estimate: OccupancyLevel = Field(default=OccupancyLevel.LOW, description="Estimated occupancy level")
motion_detected: bool = Field(default=False, description="Recent motion detection status")
# Time-based metrics
last_activity: Optional[datetime] = Field(None, description="Last detected activity timestamp")
daily_usage_hours: Optional[float] = Field(None, description="Estimated daily usage in hours")
class Config:
json_encoders = {
datetime: lambda v: v.isoformat() if v else None
}
class SystemEvent(BaseModel):
"""System events and alerts"""
event_id: str = Field(..., description="Unique event identifier")
event_type: str = Field(..., description="Type of event")
severity: Literal["info", "warning", "error", "critical"] = Field(..., description="Event severity")
timestamp: int = Field(..., description="Event timestamp")
created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp")
# Event details
title: str = Field(..., description="Event title")
description: str = Field(..., description="Event description")
source: Optional[str] = Field(None, description="Event source (sensor_id, system component, etc.)")
# Context
sensor_id: Optional[str] = Field(None, description="Related sensor ID")
room: Optional[str] = Field(None, description="Related room")
# Event data
data: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional event data")
# Status tracking
acknowledged: bool = Field(default=False, description="Whether event has been acknowledged")
resolved: bool = Field(default=False, description="Whether event has been resolved")
acknowledged_by: Optional[str] = Field(None, description="Who acknowledged the event")
resolved_by: Optional[str] = Field(None, description="Who resolved the event")
acknowledged_at: Optional[datetime] = Field(None, description="Acknowledgment timestamp")
resolved_at: Optional[datetime] = Field(None, description="Resolution timestamp")
class Config:
json_encoders = {
datetime: lambda v: v.isoformat() if v else None
}
class DataQuery(BaseModel):
"""Data query parameters for historical data retrieval"""
sensor_ids: Optional[List[str]] = Field(None, description="Filter by sensor IDs")
rooms: Optional[List[str]] = Field(None, description="Filter by rooms")
sensor_types: Optional[List[SensorType]] = Field(None, description="Filter by sensor types")
# Time range
start_time: Optional[int] = Field(None, description="Start timestamp (Unix)")
end_time: Optional[int] = Field(None, description="End timestamp (Unix)")
# Aggregation
aggregate: Optional[str] = Field(None, description="Aggregation method (avg, sum, min, max)")
interval: Optional[str] = Field(None, description="Aggregation interval (1m, 5m, 1h, 1d)")
# Pagination
limit: int = Field(default=100, description="Maximum number of records to return")
offset: int = Field(default=0, description="Number of records to skip")
# Sorting
sort_by: str = Field(default="timestamp", description="Field to sort by")
sort_order: Literal["asc", "desc"] = Field(default="desc", description="Sort order")
class DataResponse(BaseModel):
"""Response model for data queries"""
data: List[Dict[str, Any]] = Field(default_factory=list, description="Query results")
total_count: int = Field(0, description="Total number of matching records")
query: DataQuery = Field(..., description="Original query parameters")
execution_time_ms: float = Field(..., description="Query execution time in milliseconds")
class HealthCheck(BaseModel):
"""Health check response model"""
status: str = Field(..., description="Overall system status")
timestamp: datetime = Field(default_factory=datetime.utcnow)
# Database status
mongodb_connected: bool = Field(..., description="MongoDB connection status")
redis_connected: bool = Field(..., description="Redis connection status")
# Data statistics
total_sensors: int = Field(0, description="Total number of registered sensors")
active_sensors: int = Field(0, description="Number of active sensors")
total_readings: int = Field(0, description="Total sensor readings in database")
# System metrics
uptime_seconds: float = Field(..., description="System uptime in seconds")
memory_usage_mb: Optional[float] = Field(None, description="Memory usage in MB")
class Config:
json_encoders = {
datetime: lambda v: v.isoformat()
}
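A minimal construction sketch for the models above, using the pydantic v1 API (.dict()) that the rest of this codebase relies on; the sensor ID and reading values are hypothetical:

import time

from models import SensorReading, SensorType  # module removed by this commit

reading = SensorReading(
    sensor_id="sensor-42",  # hypothetical ID
    room="lab-1",
    sensor_type=SensorType.CO2,
    timestamp=int(time.time()),
    co2={"value": 850.0, "unit": "ppm"},
)
print(reading.dict()["co2"])  # {'value': 850.0, 'unit': 'ppm'}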

448
persistence.py

@@ -1,448 +0,0 @@
import json
import asyncio
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional
import logging
from pymongo.errors import DuplicateKeyError
import uuid
from database import get_database, redis_manager
from models import (
SensorReading, LegacySensorReading, SensorMetadata, RoomMetrics,
SystemEvent, SensorType, SensorStatus, CO2Status, OccupancyLevel
)
logger = logging.getLogger(__name__)
class DataPersistenceService:
"""Service for persisting sensor data to MongoDB and managing Redis cache"""
def __init__(self):
self.db = None
self.redis = redis_manager
async def initialize(self):
"""Initialize the persistence service"""
self.db = await get_database()
await self.redis.connect()
logger.info("Data persistence service initialized")
async def process_sensor_message(self, message_data: str) -> bool:
"""Process incoming sensor message and persist data"""
try:
# Parse the message
data = json.loads(message_data)
logger.debug(f"Processing sensor message: {data}")
# Determine message format and convert to standard format
if self._is_legacy_format(data):
sensor_reading = await self._convert_legacy_data(data)
else:
sensor_reading = SensorReading(**data)
# Store in MongoDB
await self._store_sensor_reading(sensor_reading)
# Update Redis cache for real-time access
await self._update_redis_cache(sensor_reading)
# Update sensor metadata
await self._update_sensor_metadata(sensor_reading)
# Calculate and store room metrics
await self._update_room_metrics(sensor_reading)
# Check for alerts and anomalies
await self._check_alerts(sensor_reading)
return True
except Exception as e:
logger.error(f"Error processing sensor message: {e}")
# Log the error event
await self._log_system_event(
event_type="data_processing_error",
severity="error",
title="Sensor Data Processing Failed",
description=f"Failed to process sensor message: {str(e)}",
data={"raw_message": message_data}
)
return False
def _is_legacy_format(self, data: dict) -> bool:
"""Check if data is in legacy format"""
legacy_keys = {"sensorId", "timestamp", "value", "unit"}
return legacy_keys.issubset(data.keys()) and "energy" not in data
async def _convert_legacy_data(self, data: dict) -> SensorReading:
"""Convert legacy format to new sensor reading format"""
legacy_reading = LegacySensorReading(**data)
return SensorReading(
sensor_id=legacy_reading.sensor_id,
sensor_type=SensorType.ENERGY, # Assume legacy data is energy
timestamp=legacy_reading.timestamp,
created_at=legacy_reading.created_at,
energy={
"value": legacy_reading.value,
"unit": legacy_reading.unit
}
)
async def _store_sensor_reading(self, reading: SensorReading):
"""Store sensor reading in MongoDB"""
try:
reading_dict = reading.dict()
# Add document ID for deduplication
reading_dict["_id"] = f"{reading.sensor_id}_{reading.timestamp}"
await self.db.sensor_readings.insert_one(reading_dict)
logger.debug(f"Stored sensor reading for {reading.sensor_id}")
except DuplicateKeyError:
logger.debug(f"Duplicate reading ignored for {reading.sensor_id} at {reading.timestamp}")
except Exception as e:
logger.error(f"Error storing sensor reading: {e}")
raise
async def _update_redis_cache(self, reading: SensorReading):
"""Update Redis cache with latest sensor data"""
try:
# Store latest reading for real-time access
await self.redis.set_sensor_data(
reading.sensor_id,
reading.dict(),
expire_time=3600 # 1 hour expiration
)
# Store sensor status
status_key = f"sensor:status:{reading.sensor_id}"
await self.redis.redis_client.setex(
status_key,
1800, # 30 minutes
json.dumps({
"status": "online",
"last_seen": reading.timestamp,
"room": reading.room
})
)
except Exception as e:
logger.error(f"Error updating Redis cache: {e}")
async def _update_sensor_metadata(self, reading: SensorReading):
"""Update or create sensor metadata"""
try:
# Check if sensor metadata exists
existing = await self.db.sensor_metadata.find_one({"sensor_id": reading.sensor_id})
if existing:
# Update existing metadata
await self.db.sensor_metadata.update_one(
{"sensor_id": reading.sensor_id},
{
"$set": {
"last_seen": datetime.utcnow(),
"status": SensorStatus.ONLINE.value,
"updated_at": datetime.utcnow()
},
"$addToSet": {
"monitoring_capabilities": reading.sensor_type.value
}
}
)
else:
# Create new sensor metadata
metadata = SensorMetadata(
sensor_id=reading.sensor_id,
name=f"Sensor {reading.sensor_id}",
sensor_type=reading.sensor_type,
room=reading.room,
status=SensorStatus.ONLINE,
last_seen=datetime.utcnow(),
monitoring_capabilities=[reading.sensor_type.value]
)
await self.db.sensor_metadata.insert_one(metadata.dict())
logger.info(f"Created metadata for new sensor: {reading.sensor_id}")
except Exception as e:
logger.error(f"Error updating sensor metadata: {e}")
async def _update_room_metrics(self, reading: SensorReading):
"""Calculate and store room-level metrics"""
if not reading.room:
return
try:
# Get recent readings for this room (last 5 minutes)
recent_time = datetime.utcnow() - timedelta(minutes=5)
# Query recent readings for the room
cursor = self.db.sensor_readings.find({
"room": reading.room,
"created_at": {"$gte": recent_time}
})
recent_readings = await cursor.to_list(length=None)
if not recent_readings:
return
# Calculate aggregated metrics
metrics = await self._calculate_room_metrics(reading.room, recent_readings)
# Store in MongoDB
await self.db.room_metrics.insert_one(metrics.dict())
# Cache in Redis
await self.redis.set_room_metrics(reading.room, metrics.dict())
logger.debug(f"Updated room metrics for {reading.room}")
except Exception as e:
logger.error(f"Error updating room metrics: {e}")
async def _calculate_room_metrics(self, room: str, readings: List[Dict]) -> RoomMetrics:
"""Calculate aggregated metrics for a room"""
# Group readings by sensor
sensors_data = {}
for reading in readings:
sensor_id = reading["sensor_id"]
if sensor_id not in sensors_data:
sensors_data[sensor_id] = []
sensors_data[sensor_id].append(reading)
# Initialize metrics
energy_values = []
co2_values = []
temperature_values = []
humidity_values = []
motion_detected = False
# Extract values from readings
for sensor_readings in sensors_data.values():
for reading in sensor_readings:
if reading.get("energy"):
energy_values.append(reading["energy"]["value"])
if reading.get("co2"):
co2_values.append(reading["co2"]["value"])
if reading.get("temperature"):
temperature_values.append(reading["temperature"]["value"])
if reading.get("humidity"):
humidity_values.append(reading["humidity"]["value"])
if reading.get("motion") and reading["motion"].get("value") == "Detected":
motion_detected = True
# Calculate aggregated metrics
metrics = RoomMetrics(
room=room,
timestamp=int(datetime.utcnow().timestamp()),
sensor_count=len(sensors_data),
active_sensors=list(sensors_data.keys()),
sensor_types=list(set(reading.get("sensor_type") for reading in readings if reading.get("sensor_type"))),
motion_detected=motion_detected
)
# Energy metrics
if energy_values:
metrics.energy = {
"current": sum(energy_values),
"average": sum(energy_values) / len(energy_values),
"total": sum(energy_values),
"peak": max(energy_values),
"unit": "kWh"
}
# CO2 metrics
if co2_values:
avg_co2 = sum(co2_values) / len(co2_values)
metrics.co2 = {
"current": avg_co2,
"average": avg_co2,
"max": max(co2_values),
"min": min(co2_values),
"status": self._get_co2_status(avg_co2).value,
"unit": "ppm"
}
# Set occupancy estimate based on CO2
metrics.occupancy_estimate = self._estimate_occupancy(avg_co2)
# Temperature metrics
if temperature_values:
metrics.temperature = {
"current": sum(temperature_values) / len(temperature_values),
"average": sum(temperature_values) / len(temperature_values),
"max": max(temperature_values),
"min": min(temperature_values),
"unit": "°C"
}
# Humidity metrics
if humidity_values:
metrics.humidity = {
"current": sum(humidity_values) / len(humidity_values),
"average": sum(humidity_values) / len(humidity_values),
"max": max(humidity_values),
"min": min(humidity_values),
"unit": "%"
}
return metrics
def _get_co2_status(self, co2_level: float) -> CO2Status:
"""Determine CO2 status based on level"""
if co2_level < 400:
return CO2Status.GOOD
elif co2_level < 1000:
return CO2Status.MODERATE
elif co2_level < 5000:
return CO2Status.POOR
else:
return CO2Status.CRITICAL
def _estimate_occupancy(self, co2_level: float) -> OccupancyLevel:
"""Estimate occupancy level based on CO2"""
if co2_level < 600:
return OccupancyLevel.LOW
elif co2_level < 1200:
return OccupancyLevel.MEDIUM
else:
return OccupancyLevel.HIGH
async def _check_alerts(self, reading: SensorReading):
"""Check for alert conditions and create system events"""
alerts = []
# CO2 level alerts
if reading.co2:
co2_level = reading.co2.get("value", 0)
if co2_level > 5000:
alerts.append({
"event_type": "co2_critical",
"severity": "critical",
"title": "Critical CO2 Level",
"description": f"CO2 level ({co2_level} ppm) exceeds critical threshold in {reading.room or 'unknown room'}"
})
elif co2_level > 1000:
alerts.append({
"event_type": "co2_high",
"severity": "warning",
"title": "High CO2 Level",
"description": f"CO2 level ({co2_level} ppm) is above recommended levels in {reading.room or 'unknown room'}"
})
# Energy consumption alerts
if reading.energy:
energy_value = reading.energy.get("value", 0)
if energy_value > 10: # Threshold for high energy consumption
alerts.append({
"event_type": "energy_high",
"severity": "warning",
"title": "High Energy Consumption",
"description": f"Energy consumption ({energy_value} kWh) is unusually high for sensor {reading.sensor_id}"
})
# Temperature alerts
if reading.temperature:
temp_value = reading.temperature.get("value", 0)
if temp_value > 30 or temp_value < 15:
alerts.append({
"event_type": "temperature_extreme",
"severity": "warning",
"title": "Extreme Temperature",
"description": f"Temperature ({temp_value}°C) is outside normal range in {reading.room or 'unknown room'}"
})
# Create system events for alerts
for alert in alerts:
await self._log_system_event(
sensor_id=reading.sensor_id,
room=reading.room,
**alert,
data=reading.dict()
)
async def _log_system_event(self, event_type: str, severity: str, title: str, description: str,
sensor_id: str = None, room: str = None, source: str = None, data: Dict = None):
"""Log a system event"""
try:
event = SystemEvent(
event_id=str(uuid.uuid4()),
event_type=event_type,
severity=severity,
timestamp=int(datetime.utcnow().timestamp()),
title=title,
description=description,
sensor_id=sensor_id,
room=room,
source=source or "data_persistence_service",
data=data or {}
)
await self.db.system_events.insert_one(event.dict())
logger.info(f"System event logged: {event_type} - {title}")
except Exception as e:
logger.error(f"Error logging system event: {e}")
async def get_recent_readings(self, sensor_id: str = None, room: str = None,
limit: int = 100, minutes: int = 60) -> List[Dict]:
"""Get recent sensor readings"""
try:
# Build query
query = {
"created_at": {"$gte": datetime.utcnow() - timedelta(minutes=minutes)}
}
if sensor_id:
query["sensor_id"] = sensor_id
if room:
query["room"] = room
cursor = self.db.sensor_readings.find(query).sort("created_at", -1).limit(limit)
readings = await cursor.to_list(length=limit)
return readings
except Exception as e:
logger.error(f"Error getting recent readings: {e}")
return []
async def get_sensor_statistics(self) -> Dict[str, Any]:
"""Get overall sensor statistics"""
try:
stats = {}
# Total readings count
stats["total_readings"] = await self.db.sensor_readings.count_documents({})
# Active sensors (sensors that sent data in last 24 hours)
recent_time = datetime.utcnow() - timedelta(hours=24)
active_sensors = await self.db.sensor_readings.distinct("sensor_id", {
"created_at": {"$gte": recent_time}
})
stats["active_sensors"] = len(active_sensors)
# Total registered sensors
stats["total_sensors"] = await self.db.sensor_metadata.count_documents({})
# Readings in last 24 hours
stats["recent_readings"] = await self.db.sensor_readings.count_documents({
"created_at": {"$gte": recent_time}
})
# Room count
stats["total_rooms"] = len(await self.db.sensor_readings.distinct("room", {"room": {"$ne": None}}))
return stats
except Exception as e:
logger.error(f"Error getting sensor statistics: {e}")
return {}
# Global persistence service instance
persistence_service = DataPersistenceService()
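A usage sketch for the service above, feeding one legacy-format message through process_sensor_message; it assumes reachable MongoDB and Redis instances, and the payload values are hypothetical:

import asyncio
import json

from persistence import persistence_service  # module removed by this commit

async def demo() -> None:
    await persistence_service.initialize()
    # Matches _is_legacy_format(): sensorId/timestamp/value/unit, no "energy" key
    legacy = json.dumps(
        {"sensorId": "sensor-42", "timestamp": 1725900000, "value": 1.2, "unit": "kWh"}
    )
    print("persisted:", await persistence_service.process_sensor_message(legacy))

asyncio.run(demo())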

10
requirements.txt

@@ -1,10 +0,0 @@
fastapi
uvicorn[standard]
redis
websockets
pymongo
motor
python-dotenv
pandas
numpy
pydantic
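Note that the removed dependency list pins no versions; a reproducible setup would typically install and then freeze, e.g.:

pip install -r requirements.txt
pip freeze > requirements.lock.txt  # optional: capture the resolved versions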