# Change summary:
# - Implement RoomService for room management and metrics
# - Add AnalyticsService for sensor data analytics and trends
# - Extend models with Room, RoomCreate, RoomUpdate, RoomInfo
# - Add room CRUD endpoints to FastAPI app
# - Add database connection logic for MongoDB and Redis
# - Refactor sensor service logic into SensorService class
#
# File listing: 378 lines, 17 KiB, Python
"""
Models for Sensor Management Service - integrating all original dashboard functionality
"""
from datetime import datetime
from enum import Enum
from typing import Optional, List, Dict, Any, Literal

from pydantic import BaseModel, Field
class SensorType(str, Enum):
    """Sensor type identifiers.

    Mixes in ``str`` so each member compares equal to its lowercase
    string value (e.g. ``SensorType.CO2 == "co2"``).
    """

    ENERGY = "energy"
    CO2 = "co2"
    TEMPERATURE = "temperature"
    HUMIDITY = "humidity"
    HVAC = "hvac"
    LIGHTING = "lighting"
    SECURITY = "security"
    MOTION = "motion"
class SensorStatus(str, Enum):
    """Sensor status identifiers.

    Mixes in ``str`` so each member compares equal to its lowercase
    string value.
    """

    ONLINE = "online"
    OFFLINE = "offline"
    ERROR = "error"
    MAINTENANCE = "maintenance"
class CO2Status(str, Enum):
    """CO2 status labels.

    Mixes in ``str`` so each member compares equal to its lowercase
    string value.
    """

    GOOD = "good"
    MODERATE = "moderate"
    POOR = "poor"
    CRITICAL = "critical"
class OccupancyLevel(str, Enum):
    """Occupancy level labels.

    Mixes in ``str`` so each member compares equal to its lowercase
    string value.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
# Base Models from original dashboard
class SensorReading(BaseModel):
    """Individual sensor reading model - enhanced from original"""

    # sensor_id, sensor_type and timestamp are required; every other field
    # has a default.
    sensor_id: str = Field(..., description="Unique sensor identifier")
    room: Optional[str] = Field(None, description="Room where sensor is located")
    sensor_type: SensorType = Field(..., description="Type of sensor")
    timestamp: int = Field(..., description="Unix timestamp of reading")
    # NOTE(review): datetime.utcnow() returns a naive datetime and is
    # deprecated since Python 3.12; kept as-is so stored/serialized values
    # do not change shape.
    created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp")

    # Sensor values with enhanced structure
    energy: Optional[Dict[str, Any]] = Field(None, description="Energy reading with value and unit")
    co2: Optional[Dict[str, Any]] = Field(None, description="CO2 reading with value and unit")
    temperature: Optional[Dict[str, Any]] = Field(None, description="Temperature reading with value and unit")
    humidity: Optional[Dict[str, Any]] = Field(None, description="Humidity reading with value and unit")
    motion: Optional[Dict[str, Any]] = Field(None, description="Motion detection reading")

    # Additional sensor types from tiocps
    power: Optional[Dict[str, Any]] = Field(None, description="Power consumption reading")
    voltage: Optional[Dict[str, Any]] = Field(None, description="Voltage reading")
    current: Optional[Dict[str, Any]] = Field(None, description="Current reading")
    generation: Optional[Dict[str, Any]] = Field(None, description="Energy generation reading")

    # Metadata (defaults to a fresh empty dict per instance)
    metadata: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional sensor metadata")

    class Config:
        # Serialize datetime values as ISO 8601 strings in JSON output.
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }
class LegacySensorReading(BaseModel):
    """Legacy sensor reading format for backward compatibility"""

    # Exposed under the camelCase alias "sensorId".
    sensor_id: str = Field(..., alias="sensorId")
    timestamp: int
    value: float
    unit: str
    # NOTE(review): datetime.utcnow() returns a naive datetime and is
    # deprecated since Python 3.12; kept as-is to preserve current values.
    created_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        # Allow input keyed by the field name (sensor_id) as well as by the
        # alias (sensorId).
        allow_population_by_field_name = True
class SensorMetadata(BaseModel):
    """Enhanced sensor metadata from original dashboard"""

    # sensor_id, name and sensor_type are required; status defaults to
    # OFFLINE until set otherwise.
    sensor_id: str = Field(..., description="Unique sensor identifier")
    name: str = Field(..., description="Human-readable sensor name")
    sensor_type: SensorType = Field(..., description="Type of sensor")
    room: Optional[str] = Field(None, description="Room assignment")
    status: SensorStatus = Field(default=SensorStatus.OFFLINE, description="Current sensor status")

    # Physical location and installation details
    location: Optional[str] = Field(None, description="Physical location description")
    floor: Optional[str] = Field(None, description="Floor level")
    building: Optional[str] = Field(None, description="Building identifier")

    # Technical specifications
    model: Optional[str] = Field(None, description="Sensor model")
    manufacturer: Optional[str] = Field(None, description="Sensor manufacturer")
    firmware_version: Optional[str] = Field(None, description="Firmware version")
    hardware_version: Optional[str] = Field(None, description="Hardware version")

    # Network and connectivity
    ip_address: Optional[str] = Field(None, description="IP address if network connected")
    mac_address: Optional[str] = Field(None, description="MAC address")
    connection_type: Optional[str] = Field(None, description="Connection type (wifi, ethernet, zigbee, etc.)")

    # Power and maintenance
    battery_level: Optional[float] = Field(None, description="Battery level percentage")
    last_maintenance: Optional[datetime] = Field(None, description="Last maintenance date")
    next_maintenance: Optional[datetime] = Field(None, description="Next scheduled maintenance")

    # Operational settings
    sampling_rate: Optional[int] = Field(None, description="Data sampling rate in seconds")
    calibration_date: Optional[datetime] = Field(None, description="Last calibration date")

    # Capabilities from tiocps integration
    monitoring_capabilities: List[str] = Field(default_factory=list, description="List of monitoring capabilities")
    control_capabilities: List[str] = Field(default_factory=list, description="List of control capabilities")
    demand_response_enabled: bool = Field(default=False, description="Demand response participation")

    # Timestamps
    installed_at: Optional[datetime] = Field(None, description="Installation timestamp")
    last_seen: Optional[datetime] = Field(None, description="Last communication timestamp")
    # NOTE(review): datetime.utcnow() returns a naive datetime and is
    # deprecated since Python 3.12; kept as-is to preserve current values.
    # updated_at only receives a default at construction time here; nothing
    # in this model refreshes it on modification.
    created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp")
    updated_at: datetime = Field(default_factory=datetime.utcnow, description="Record update timestamp")

    class Config:
        # Serialize datetime values as ISO 8601 strings in JSON output.
        json_encoders = {
            datetime: lambda v: v.isoformat() if v else None
        }
class RoomMetrics(BaseModel):
    """Enhanced room metrics from original dashboard"""

    # room and timestamp are required; every other field has a default.
    room: str = Field(..., description="Room identifier")
    timestamp: int = Field(..., description="Metrics calculation timestamp")
    # NOTE(review): datetime.utcnow() returns a naive datetime and is
    # deprecated since Python 3.12; kept as-is to preserve current values.
    created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp")

    # Sensor inventory
    sensor_count: int = Field(0, description="Total number of sensors in room")
    active_sensors: List[str] = Field(default_factory=list, description="List of active sensor IDs")
    sensor_types: List[SensorType] = Field(default_factory=list, description="Types of sensors present")

    # Energy metrics (enhanced from tiocps)
    energy: Optional[Dict[str, Any]] = Field(None, description="Energy consumption metrics")
    power: Optional[Dict[str, Any]] = Field(None, description="Power consumption metrics")
    generation: Optional[Dict[str, Any]] = Field(None, description="Energy generation metrics")
    flexibility: Optional[Dict[str, Any]] = Field(None, description="Energy flexibility metrics")

    # Environmental metrics
    co2: Optional[Dict[str, Any]] = Field(None, description="CO2 level metrics")
    temperature: Optional[Dict[str, Any]] = Field(None, description="Temperature metrics")
    humidity: Optional[Dict[str, Any]] = Field(None, description="Humidity metrics")

    # Occupancy and usage
    occupancy_estimate: OccupancyLevel = Field(default=OccupancyLevel.LOW, description="Estimated occupancy level")
    motion_detected: bool = Field(default=False, description="Recent motion detection status")

    # Time-based metrics
    last_activity: Optional[datetime] = Field(None, description="Last detected activity timestamp")
    daily_usage_hours: Optional[float] = Field(None, description="Estimated daily usage in hours")

    # Economic metrics from tiocps
    energy_cost: Optional[float] = Field(None, description="Estimated energy cost")
    savings_potential: Optional[float] = Field(None, description="Potential savings from optimization")

    class Config:
        # Serialize datetime values as ISO 8601 strings in JSON output.
        json_encoders = {
            datetime: lambda v: v.isoformat() if v else None
        }
class Room(BaseModel):
    """Room definition model"""

    # NOTE(review): this module defines `class Room` a second time further
    # down (under "Room Management Models"). That later definition rebinds
    # the name when the module is imported, so this schema is not what
    # `Room` resolves to for importers. Confirm which one is intended and
    # rename or remove the other.
    name: str = Field(..., description="Room name/identifier")
    display_name: Optional[str] = Field(None, description="Human-readable room name")
    floor: Optional[str] = Field(None, description="Floor level")
    building: Optional[str] = Field(None, description="Building identifier")
    area_m2: Optional[float] = Field(None, description="Room area in square meters")
    capacity: Optional[int] = Field(None, description="Room capacity (people)")
    room_type: Optional[str] = Field(None, description="Room type (office, meeting, etc.)")

    # Configuration
    target_temperature: Optional[float] = Field(None, description="Target temperature")
    target_co2: Optional[float] = Field(None, description="Target CO2 level")
    operating_hours: Optional[Dict[str, Any]] = Field(None, description="Operating hours schedule")

    # Status
    active: bool = Field(default=True, description="Whether room is active")
    # NOTE(review): datetime.utcnow() returns a naive datetime and is
    # deprecated since Python 3.12; kept as-is to preserve current values.
    created_at: datetime = Field(default_factory=datetime.utcnow)
    updated_at: datetime = Field(default_factory=datetime.utcnow)

    class Config:
        # Serialize datetime values as ISO 8601 strings in JSON output.
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }
class SystemEvent(BaseModel):
    """Enhanced system events from original dashboard"""

    # event_id, event_type, severity, timestamp, title and description are
    # required; severity is restricted to the four literal values below.
    event_id: str = Field(..., description="Unique event identifier")
    event_type: str = Field(..., description="Type of event")
    severity: Literal["info", "warning", "error", "critical"] = Field(..., description="Event severity")
    timestamp: int = Field(..., description="Event timestamp")
    # NOTE(review): datetime.utcnow() returns a naive datetime and is
    # deprecated since Python 3.12; kept as-is to preserve current values.
    created_at: datetime = Field(default_factory=datetime.utcnow, description="Record creation timestamp")

    # Event details
    title: str = Field(..., description="Event title")
    description: str = Field(..., description="Event description")
    source: Optional[str] = Field(None, description="Event source (sensor_id, system component, etc.)")

    # Context
    sensor_id: Optional[str] = Field(None, description="Related sensor ID")
    room: Optional[str] = Field(None, description="Related room")

    # Event data (defaults to a fresh empty dict per instance)
    data: Optional[Dict[str, Any]] = Field(default_factory=dict, description="Additional event data")

    # Status tracking
    acknowledged: bool = Field(default=False, description="Whether event has been acknowledged")
    resolved: bool = Field(default=False, description="Whether event has been resolved")
    acknowledged_by: Optional[str] = Field(None, description="Who acknowledged the event")
    resolved_by: Optional[str] = Field(None, description="Who resolved the event")
    acknowledged_at: Optional[datetime] = Field(None, description="Acknowledgment timestamp")
    resolved_at: Optional[datetime] = Field(None, description="Resolution timestamp")

    class Config:
        # Serialize datetime values as ISO 8601 strings in JSON output.
        json_encoders = {
            datetime: lambda v: v.isoformat() if v else None
        }
class DataQuery(BaseModel):
    """Enhanced data query parameters from original dashboard"""

    # Every field has a default, so an empty query is valid.
    sensor_ids: Optional[List[str]] = Field(None, description="Filter by sensor IDs")
    rooms: Optional[List[str]] = Field(None, description="Filter by rooms")
    sensor_types: Optional[List[SensorType]] = Field(None, description="Filter by sensor types")

    # Time range
    start_time: Optional[int] = Field(None, description="Start timestamp (Unix)")
    end_time: Optional[int] = Field(None, description="End timestamp (Unix)")

    # Aggregation
    # NOTE(review): both are free-form strings; the values listed in the
    # descriptions are not enforced by this model.
    aggregate: Optional[str] = Field(None, description="Aggregation method (avg, sum, min, max)")
    interval: Optional[str] = Field(None, description="Aggregation interval (1m, 5m, 1h, 1d)")

    # Pagination
    # NOTE(review): no bounds are declared, so negative or very large
    # values pass validation; consumers must guard against them.
    limit: int = Field(default=100, description="Maximum number of records to return")
    offset: int = Field(default=0, description="Number of records to skip")

    # Sorting
    sort_by: str = Field(default="timestamp", description="Field to sort by")
    sort_order: Literal["asc", "desc"] = Field(default="desc", description="Sort order")

    # Additional filters from tiocps
    energy_threshold: Optional[float] = Field(None, description="Filter by energy threshold")
    co2_threshold: Optional[float] = Field(None, description="Filter by CO2 threshold")
    include_metadata: bool = Field(default=False, description="Include sensor metadata in response")
class DataResponse(BaseModel):
    """Enhanced response model for data queries"""

    # query and execution_time_ms are required; the rest have defaults.
    data: List[Dict[str, Any]] = Field(default_factory=list, description="Query results")
    total_count: int = Field(0, description="Total number of matching records")
    query: DataQuery = Field(..., description="Original query parameters")
    execution_time_ms: float = Field(..., description="Query execution time in milliseconds")

    # Additional metadata
    aggregation_applied: bool = Field(default=False, description="Whether data was aggregated")
    cache_hit: bool = Field(default=False, description="Whether result was served from cache")
class AnalyticsSummary(BaseModel):
    """Comprehensive analytics summary"""

    # No field declares a default: every value must be supplied.
    period_hours: int
    start_time: datetime
    end_time: datetime

    # Sensor analytics
    total_sensors: int
    active_sensors: int
    sensor_types_summary: Dict[str, int]

    # Room analytics
    total_rooms: int
    active_rooms: int
    room_occupancy_summary: Dict[str, int]

    # Energy analytics
    total_energy_consumption: float
    total_energy_generation: float
    net_energy_consumption: float
    energy_efficiency: float

    # Environmental analytics
    average_co2: float
    average_temperature: float
    average_humidity: float

    # System health
    system_events_count: int
    critical_events_count: int
    sensor_errors_count: int

    class Config:
        # Serialize datetime values as ISO 8601 strings in JSON output.
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }
# Room Management Models
class Room(BaseModel):
    """Room model for database storage and API responses"""

    # NOTE(review): an earlier `class Room` in this module (with
    # display_name, area_m2 and target_* fields) is shadowed by this
    # definition; this is the one importers receive. Confirm the earlier
    # one is obsolete.
    name: str = Field(..., description="Unique room name")
    description: Optional[str] = Field(None, description="Room description")
    floor: Optional[str] = Field(None, description="Floor designation")
    building: Optional[str] = Field(None, description="Building name")
    area: Optional[float] = Field(None, description="Room area in square meters")
    capacity: Optional[int] = Field(None, description="Maximum occupancy")
    room_type: Optional[str] = Field(None, description="Room type (office, meeting, storage, etc.)")

    # Metadata
    # NOTE(review): datetime.utcnow() returns a naive datetime and is
    # deprecated since Python 3.12; kept as-is to preserve current values.
    created_at: datetime = Field(default_factory=datetime.utcnow, description="Room creation timestamp")
    updated_at: datetime = Field(default_factory=datetime.utcnow, description="Room update timestamp")

    class Config:
        # Serialize datetime values as ISO 8601 strings in JSON output.
        json_encoders = {
            datetime: lambda v: v.isoformat() if v else None
        }
class RoomCreate(BaseModel):
    """Model for creating new rooms"""

    # Only `name` is required (1-100 characters). Optional strings carry
    # length caps and the numeric fields must be strictly positive.
    name: str = Field(..., description="Unique room name", min_length=1, max_length=100)
    description: Optional[str] = Field(None, description="Room description", max_length=500)
    floor: Optional[str] = Field(None, description="Floor designation", max_length=50)
    building: Optional[str] = Field(None, description="Building name", max_length=100)
    area: Optional[float] = Field(None, description="Room area in square meters", gt=0)
    capacity: Optional[int] = Field(None, description="Maximum occupancy", gt=0)
    room_type: Optional[str] = Field(None, description="Room type", max_length=50)
class RoomUpdate(BaseModel):
    """Model for updating existing rooms"""

    # Every field is optional and defaults to None; `name` is not part of
    # this model. Constraints mirror those declared on the same fields of
    # the creation model.
    description: Optional[str] = Field(None, description="Room description", max_length=500)
    floor: Optional[str] = Field(None, description="Floor designation", max_length=50)
    building: Optional[str] = Field(None, description="Building name", max_length=100)
    area: Optional[float] = Field(None, description="Room area in square meters", gt=0)
    capacity: Optional[int] = Field(None, description="Maximum occupancy", gt=0)
    room_type: Optional[str] = Field(None, description="Room type", max_length=50)
class RoomInfo(BaseModel):
    """Comprehensive room information for API responses"""

    name: str = Field(..., description="Room name")
    description: Optional[str] = Field(None, description="Room description")
    floor: Optional[str] = Field(None, description="Floor designation")
    building: Optional[str] = Field(None, description="Building name")
    area: Optional[float] = Field(None, description="Room area in square meters")
    capacity: Optional[int] = Field(None, description="Maximum occupancy")
    room_type: Optional[str] = Field(None, description="Room type")

    # Runtime information
    # Here active_sensors is a count (int), not a list of sensor IDs.
    sensor_count: int = Field(0, description="Number of sensors in room")
    active_sensors: int = Field(0, description="Number of active sensors")
    last_updated: Optional[datetime] = Field(None, description="Last metrics update")

    # Timestamps (required: no default is generated by this model)
    created_at: datetime = Field(..., description="Room creation timestamp")
    updated_at: datetime = Field(..., description="Room update timestamp")

    class Config:
        # Serialize datetime values as ISO 8601 strings in JSON output.
        json_encoders = {
            datetime: lambda v: v.isoformat() if v else None
        }
class HealthResponse(BaseModel):
    """Health check response"""

    # service, status, timestamp and version are required.
    service: str
    status: str
    timestamp: datetime
    version: str

    # Additional service-specific health metrics
    total_sensors: Optional[int] = None
    active_sensors: Optional[int] = None
    total_rooms: Optional[int] = None
    websocket_connections: Optional[int] = None

    class Config:
        # Serialize datetime values as ISO 8601 strings in JSON output.
        json_encoders = {
            datetime: lambda v: v.isoformat()
        }