- Store energy data in separate MongoDB collections for each SLGs/Community/Building directory - Update FTP monitor and database manager to track directory paths and select appropriate collections - Add collection stats to database statistics API - Update sensor and token services for improved API consistency - Add 'rb' (rebuild and restart) option to deploy.sh script
355 lines
14 KiB
Python
355 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Enhanced Data Simulator for Bootstrap Sensors
|
|
Generates realistic real-time sensor data for the bootstrap sensors created by bootstrap_sensors.py
|
|
"""
|
|
|
|
import redis
|
|
import time
|
|
import random
|
|
import json
|
|
import logging
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Any
|
|
import math
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Redis configuration
|
|
REDIS_HOST = 'localhost'
|
|
REDIS_PORT = 6379
|
|
REDIS_CHANNEL = "energy_data"
|
|
|
|
# Bootstrap sensor IDs (must match bootstrap_sensors.py)
|
|
BOOTSTRAP_SENSORS = {
|
|
# Living Room Sensors
|
|
"lr_energy_001": {"type": "energy", "room": "living_room", "base_value": 2.5, "variance": 1.2},
|
|
"lr_co2_001": {"type": "co2", "room": "living_room", "base_value": 420, "variance": 80},
|
|
"lr_temp_001": {"type": "temperature", "room": "living_room", "base_value": 22.0, "variance": 2.0},
|
|
|
|
# Kitchen Sensors
|
|
"kt_energy_001": {"type": "energy", "room": "kitchen", "base_value": 3.8, "variance": 2.1},
|
|
"kt_humidity_001": {"type": "humidity", "room": "kitchen", "base_value": 45.0, "variance": 15.0},
|
|
"kt_temp_001": {"type": "temperature", "room": "kitchen", "base_value": 24.0, "variance": 3.0},
|
|
|
|
# Bedroom Sensors
|
|
"br_energy_001": {"type": "energy", "room": "bedroom", "base_value": 1.2, "variance": 0.8},
|
|
"br_co2_001": {"type": "co2", "room": "bedroom", "base_value": 480, "variance": 120},
|
|
"br_temp_001": {"type": "temperature", "room": "bedroom", "base_value": 20.5, "variance": 1.5},
|
|
|
|
# Office Sensors
|
|
"of_energy_001": {"type": "energy", "room": "office", "base_value": 2.1, "variance": 1.5},
|
|
"of_co2_001": {"type": "co2", "room": "office", "base_value": 450, "variance": 100},
|
|
"of_motion_001": {"type": "motion", "room": "office", "base_value": 0, "variance": 1},
|
|
|
|
# Bathroom Sensors
|
|
"bt_humidity_001": {"type": "humidity", "room": "bathroom", "base_value": 65.0, "variance": 20.0},
|
|
"bt_temp_001": {"type": "temperature", "room": "bathroom", "base_value": 23.0, "variance": 2.5},
|
|
|
|
# Garage Sensors
|
|
"gr_energy_001": {"type": "energy", "room": "garage", "base_value": 0.8, "variance": 0.5},
|
|
"gr_motion_001": {"type": "motion", "room": "garage", "base_value": 0, "variance": 1}
|
|
}
|
|
|
|
class SensorDataGenerator:
|
|
"""Generates realistic sensor data with time-based patterns"""
|
|
|
|
def __init__(self):
|
|
self.start_time = time.time()
|
|
self.motion_states = {} # Track motion sensor states
|
|
|
|
# Initialize motion states
|
|
for sensor_id, config in BOOTSTRAP_SENSORS.items():
|
|
if config["type"] == "motion":
|
|
self.motion_states[sensor_id] = {"active": False, "last_change": time.time()}
|
|
|
|
def get_time_factor(self) -> float:
|
|
"""Get time-based multiplier for realistic daily patterns"""
|
|
current_hour = datetime.now().hour
|
|
|
|
# Energy usage patterns (higher during day, lower at night)
|
|
if 6 <= current_hour <= 22: # Daytime
|
|
return 1.0 + 0.3 * math.sin((current_hour - 6) * math.pi / 16)
|
|
else: # Nighttime
|
|
return 0.3 + 0.2 * random.random()
|
|
|
|
def get_occupancy_factor(self, room: str) -> float:
|
|
"""Get occupancy-based multiplier for different rooms"""
|
|
current_hour = datetime.now().hour
|
|
|
|
occupancy_patterns = {
|
|
"living_room": 1.2 if 18 <= current_hour <= 23 else 0.8,
|
|
"kitchen": 1.5 if 7 <= current_hour <= 9 or 17 <= current_hour <= 20 else 0.6,
|
|
"bedroom": 1.3 if 22 <= current_hour or current_hour <= 7 else 0.4,
|
|
"office": 1.4 if 9 <= current_hour <= 17 else 0.3,
|
|
"bathroom": 1.0, # Consistent usage
|
|
"garage": 0.8 if 7 <= current_hour <= 9 or 17 <= current_hour <= 19 else 0.2
|
|
}
|
|
|
|
return occupancy_patterns.get(room, 1.0)
|
|
|
|
def generate_energy_reading(self, sensor_id: str, config: Dict) -> Dict[str, Any]:
|
|
"""Generate realistic energy consumption reading"""
|
|
base_value = config["base_value"]
|
|
variance = config["variance"]
|
|
room = config["room"]
|
|
|
|
# Apply time and occupancy factors
|
|
time_factor = self.get_time_factor()
|
|
occupancy_factor = self.get_occupancy_factor(room)
|
|
|
|
# Add some randomness
|
|
random_factor = 1.0 + (random.random() - 0.5) * 0.4
|
|
|
|
# Calculate final value
|
|
value = base_value * time_factor * occupancy_factor * random_factor
|
|
value = max(0.1, value) # Ensure minimum consumption
|
|
|
|
return {
|
|
"sensor_id": sensor_id,
|
|
"room": room,
|
|
"sensor_type": "energy",
|
|
"timestamp": int(time.time()),
|
|
"energy": {
|
|
"value": round(value, 3),
|
|
"unit": "kWh"
|
|
},
|
|
"metadata": {
|
|
"time_factor": round(time_factor, 2),
|
|
"occupancy_factor": round(occupancy_factor, 2)
|
|
}
|
|
}
|
|
|
|
def generate_co2_reading(self, sensor_id: str, config: Dict) -> Dict[str, Any]:
|
|
"""Generate realistic CO2 level reading"""
|
|
base_value = config["base_value"]
|
|
variance = config["variance"]
|
|
room = config["room"]
|
|
|
|
# CO2 increases with occupancy
|
|
occupancy_factor = self.get_occupancy_factor(room)
|
|
co2_increase = (occupancy_factor - 0.5) * 150
|
|
|
|
# Add random fluctuation
|
|
random_variation = (random.random() - 0.5) * variance
|
|
|
|
value = base_value + co2_increase + random_variation
|
|
value = max(350, min(2000, value)) # Realistic CO2 range
|
|
|
|
return {
|
|
"sensor_id": sensor_id,
|
|
"room": room,
|
|
"sensor_type": "co2",
|
|
"timestamp": int(time.time()),
|
|
"co2": {
|
|
"value": round(value, 1),
|
|
"unit": "ppm"
|
|
},
|
|
"metadata": {
|
|
"quality_level": "good" if value < 600 else "moderate" if value < 1000 else "poor"
|
|
}
|
|
}
|
|
|
|
def generate_temperature_reading(self, sensor_id: str, config: Dict) -> Dict[str, Any]:
|
|
"""Generate realistic temperature reading"""
|
|
base_value = config["base_value"]
|
|
variance = config["variance"]
|
|
room = config["room"]
|
|
|
|
# Temperature varies with time of day and occupancy
|
|
current_hour = datetime.now().hour
|
|
daily_variation = 2 * math.sin((current_hour - 6) * math.pi / 12)
|
|
|
|
occupancy_factor = self.get_occupancy_factor(room)
|
|
occupancy_heat = (occupancy_factor - 0.5) * 1.5
|
|
|
|
random_variation = (random.random() - 0.5) * variance
|
|
|
|
value = base_value + daily_variation + occupancy_heat + random_variation
|
|
|
|
return {
|
|
"sensor_id": sensor_id,
|
|
"room": room,
|
|
"sensor_type": "temperature",
|
|
"timestamp": int(time.time()),
|
|
"temperature": {
|
|
"value": round(value, 1),
|
|
"unit": "°C"
|
|
}
|
|
}
|
|
|
|
def generate_humidity_reading(self, sensor_id: str, config: Dict) -> Dict[str, Any]:
|
|
"""Generate realistic humidity reading"""
|
|
base_value = config["base_value"]
|
|
variance = config["variance"]
|
|
room = config["room"]
|
|
|
|
# Humidity patterns based on room usage
|
|
if room == "bathroom":
|
|
# Higher spikes during usage times
|
|
current_hour = datetime.now().hour
|
|
if 7 <= current_hour <= 9 or 19 <= current_hour <= 22:
|
|
usage_spike = random.uniform(10, 25)
|
|
else:
|
|
usage_spike = 0
|
|
elif room == "kitchen":
|
|
# Cooking increases humidity
|
|
current_hour = datetime.now().hour
|
|
if 17 <= current_hour <= 20:
|
|
usage_spike = random.uniform(5, 15)
|
|
else:
|
|
usage_spike = 0
|
|
else:
|
|
usage_spike = 0
|
|
|
|
random_variation = (random.random() - 0.5) * variance
|
|
value = base_value + usage_spike + random_variation
|
|
value = max(20, min(95, value)) # Realistic humidity range
|
|
|
|
return {
|
|
"sensor_id": sensor_id,
|
|
"room": room,
|
|
"sensor_type": "humidity",
|
|
"timestamp": int(time.time()),
|
|
"humidity": {
|
|
"value": round(value, 1),
|
|
"unit": "%"
|
|
}
|
|
}
|
|
|
|
def generate_motion_reading(self, sensor_id: str, config: Dict) -> Dict[str, Any]:
|
|
"""Generate realistic motion detection reading"""
|
|
room = config["room"]
|
|
current_time = time.time()
|
|
|
|
# Get current state
|
|
if sensor_id not in self.motion_states:
|
|
self.motion_states[sensor_id] = {"active": False, "last_change": current_time}
|
|
|
|
state = self.motion_states[sensor_id]
|
|
|
|
# Determine if motion should be detected based on occupancy patterns
|
|
occupancy_factor = self.get_occupancy_factor(room)
|
|
motion_probability = occupancy_factor * 0.3 # 30% chance when occupied
|
|
|
|
# Change state based on probability and time since last change
|
|
time_since_change = current_time - state["last_change"]
|
|
|
|
if state["active"]:
|
|
# If motion is active, chance to stop after some time
|
|
if time_since_change > 30: # At least 30 seconds of motion
|
|
if random.random() < 0.4: # 40% chance to stop
|
|
state["active"] = False
|
|
state["last_change"] = current_time
|
|
else:
|
|
# If no motion, chance to start based on occupancy
|
|
if time_since_change > 10: # At least 10 seconds of no motion
|
|
if random.random() < motion_probability:
|
|
state["active"] = True
|
|
state["last_change"] = current_time
|
|
|
|
return {
|
|
"sensor_id": sensor_id,
|
|
"room": room,
|
|
"sensor_type": "motion",
|
|
"timestamp": int(time.time()),
|
|
"motion": {
|
|
"value": 1 if state["active"] else 0,
|
|
"unit": "detected"
|
|
},
|
|
"metadata": {
|
|
"duration_seconds": int(time_since_change) if state["active"] else 0
|
|
}
|
|
}
|
|
|
|
def generate_sensor_reading(self, sensor_id: str) -> Dict[str, Any]:
|
|
"""Generate appropriate reading based on sensor type"""
|
|
if sensor_id not in BOOTSTRAP_SENSORS:
|
|
logger.warning(f"Unknown sensor ID: {sensor_id}")
|
|
return None
|
|
|
|
config = BOOTSTRAP_SENSORS[sensor_id]
|
|
sensor_type = config["type"]
|
|
|
|
if sensor_type == "energy":
|
|
return self.generate_energy_reading(sensor_id, config)
|
|
elif sensor_type == "co2":
|
|
return self.generate_co2_reading(sensor_id, config)
|
|
elif sensor_type == "temperature":
|
|
return self.generate_temperature_reading(sensor_id, config)
|
|
elif sensor_type == "humidity":
|
|
return self.generate_humidity_reading(sensor_id, config)
|
|
elif sensor_type == "motion":
|
|
return self.generate_motion_reading(sensor_id, config)
|
|
else:
|
|
logger.warning(f"Unknown sensor type: {sensor_type}")
|
|
return None
|
|
|
|
def main():
|
|
"""Main simulation loop"""
|
|
logger.info("=== Starting Enhanced Data Simulator ===")
|
|
|
|
# Connect to Redis
|
|
try:
|
|
redis_client = redis.Redis(host=REDIS_HOST, port=REDIS_PORT, db=0, decode_responses=True)
|
|
redis_client.ping()
|
|
logger.info(f"Successfully connected to Redis at {REDIS_HOST}:{REDIS_PORT}")
|
|
except redis.exceptions.ConnectionError as e:
|
|
logger.error(f"Could not connect to Redis: {e}")
|
|
return
|
|
|
|
# Initialize data generator
|
|
generator = SensorDataGenerator()
|
|
|
|
logger.info(f"Loaded {len(BOOTSTRAP_SENSORS)} bootstrap sensors")
|
|
logger.info(f"Publishing to Redis channel: '{REDIS_CHANNEL}'")
|
|
logger.info("Press Ctrl+C to stop simulation")
|
|
|
|
sensor_ids = list(BOOTSTRAP_SENSORS.keys())
|
|
|
|
try:
|
|
while True:
|
|
sensors_produced = []
|
|
for a in range(5):
|
|
|
|
# Generate data for a random sensor
|
|
sensor_id = random.choice(sensor_ids)
|
|
sensors_produced.append(sensor_id)
|
|
reading = generator.generate_sensor_reading(sensor_id)
|
|
|
|
if reading:
|
|
# Publish to Redis
|
|
payload = json.dumps(reading)
|
|
redis_client.publish(REDIS_CHANNEL, payload)
|
|
|
|
# Log the reading
|
|
sensor_type = reading["sensor_type"]
|
|
room = reading["room"]
|
|
value_info = ""
|
|
|
|
if "energy" in reading:
|
|
value_info = f"{reading['energy']['value']} {reading['energy']['unit']}"
|
|
elif "co2" in reading:
|
|
value_info = f"{reading['co2']['value']} {reading['co2']['unit']}"
|
|
elif "temperature" in reading:
|
|
value_info = f"{reading['temperature']['value']} {reading['temperature']['unit']}"
|
|
elif "humidity" in reading:
|
|
value_info = f"{reading['humidity']['value']} {reading['humidity']['unit']}"
|
|
elif "motion" in reading:
|
|
value_info = f"{'DETECTED' if reading['motion']['value'] else 'CLEAR'}"
|
|
|
|
logger.info(f"📊 {sensor_id} ({room}/{sensor_type}): {value_info}")
|
|
|
|
# Random interval between readings (1-5 seconds)
|
|
time.sleep(random.uniform(1, 5))
|
|
|
|
except KeyboardInterrupt:
|
|
logger.info("Stopping data simulation...")
|
|
except Exception as e:
|
|
logger.error(f"Simulation error: {e}")
|
|
|
|
if __name__ == "__main__":
|
|
main()
|