""" Demand Response Service - Core Business Logic Handles DR invitations, event execution, auto-response, and flexibility calculation """ import asyncio import json import uuid from datetime import datetime, timedelta from typing import List, Dict, Optional, Any import logging from motor.motor_asyncio import AsyncIOMotorDatabase import redis.asyncio as redis # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class DemandResponseService: """Core Demand Response service business logic""" def __init__(self, db: AsyncIOMotorDatabase, redis_client: redis.Redis): self.db = db self.redis = redis_client self.active_events: Dict[str, asyncio.Task] = {} # event_id -> task self.device_power_cache: Dict[str, float] = {} # device_id -> power_kw (updated by Redis subscriber) # ===== INVITATION MANAGEMENT ===== async def send_invitation( self, event_time: datetime, load_kwh: float, load_percentage: float, iots: List[str], duration_minutes: int = 59 ) -> Dict[str, Any]: """ Create and send DR invitation Returns: {"event_id": str, "response": str, "message": str} """ logger.info(f"Creating DR invitation for {len(iots)} devices at {event_time}") # Generate unique event ID event_id = str(uuid.uuid4()) # Check auto-response configuration auto_config = await self.get_auto_response_config() response = "YES" if auto_config.get("enabled", False) else "WAITING" # Create invitation document invitation = { "event_id": event_id, "created_at": datetime.utcnow(), "event_time": event_time, "load_kwh": load_kwh, "load_percentage": load_percentage, "iots": iots, "duration_minutes": duration_minutes, "response": response, "status": "pending" } # Store in MongoDB await self.db.demand_response_invitations.insert_one(invitation) # Cache in Redis for fast access (24 hour TTL) cache_key = f"dr:invitation:{event_id}" await self.redis.setex( cache_key, 86400, json.dumps(invitation, default=str) ) # Publish event to Redis pub/sub await self.redis.publish("dr_events", json.dumps({ "event": "invitation_created", "event_id": event_id, "event_time": event_time.isoformat(), "load_kwh": load_kwh, "response": response })) logger.info(f"Invitation {event_id} created with response: {response}") return { "event_id": event_id, "response": response, "message": "Invitation created successfully" } async def answer_invitation( self, event_id: str, iot_id: str, response: str, committed_reduction_kw: Optional[float] = None ) -> Dict[str, Any]: """ Record device response to invitation Returns: {"success": bool, "message": str} """ logger.info(f"Recording response for invitation {event_id}, device {iot_id}: {response}") # Validate invitation exists invitation = await self.get_invitation(event_id) if not invitation: return {"success": False, "message": f"Invitation {event_id} not found"} if iot_id not in invitation["iots"]: return {"success": False, "message": f"Device {iot_id} not in invitation"} # Check if already responded existing = await self.db.demand_response_responses.find_one({ "event_id": event_id, "device_id": iot_id }) if existing: return {"success": False, "message": f"Device {iot_id} has already responded"} # Store response response_doc = { "event_id": event_id, "device_id": iot_id, "response": response, "committed_reduction_kw": committed_reduction_kw, "responded_at": datetime.utcnow() } await self.db.demand_response_responses.insert_one(response_doc) # Check if all devices have responded total_devices = len(invitation["iots"]) total_responses = await self.db.demand_response_responses.count_documents({"event_id": event_id}) if total_responses == total_devices: # All devices responded - update invitation status yes_count = await self.db.demand_response_responses.count_documents({ "event_id": event_id, "response": "YES" }) all_yes = yes_count == total_devices new_response = "YES" if all_yes else "NO" new_status = "scheduled" if all_yes else "cancelled" await self.db.demand_response_invitations.update_one( {"event_id": event_id}, {"$set": {"response": new_response, "status": new_status}} ) logger.info(f"Invitation {event_id} final response: {new_response} (status: {new_status})") # Clear cache await self.redis.delete(f"dr:invitation:{event_id}") # Publish event await self.redis.publish("dr_events", json.dumps({ "event": "invitation_answered", "event_id": event_id, "device_id": iot_id, "response": response })) return {"success": True, "message": "Response recorded successfully"} async def get_invitation(self, event_id: str) -> Optional[Dict[str, Any]]: """ Get invitation by event_id (with Redis caching) """ # Try cache first cache_key = f"dr:invitation:{event_id}" cached = await self.redis.get(cache_key) if cached: invitation = json.loads(cached) return invitation # Fallback to MongoDB invitation = await self.db.demand_response_invitations.find_one({"event_id": event_id}) if invitation: invitation["_id"] = str(invitation["_id"]) # Cache for 24 hours await self.redis.setex( cache_key, 86400, json.dumps(invitation, default=str) ) return invitation return None async def get_unanswered_invitations(self) -> List[Dict[str, Any]]: """Get all pending invitations awaiting response""" cursor = self.db.demand_response_invitations.find({ "response": "WAITING", "status": "pending" }).sort("created_at", -1) invitations = [] async for inv in cursor: inv["_id"] = str(inv["_id"]) invitations.append(inv) return invitations async def get_answered_invitations(self, hours: int = 24, limit: int = 50) -> List[Dict[str, Any]]: """Get recent answered invitations""" start_time = datetime.utcnow() - timedelta(hours=hours) cursor = self.db.demand_response_invitations.find({ "response": {"$ne": "WAITING"}, "created_at": {"$gte": start_time} }).sort("created_at", -1).limit(limit) invitations = [] async for inv in cursor: inv["_id"] = str(inv["_id"]) invitations.append(inv) return invitations # ===== EVENT EXECUTION ===== async def schedule_event( self, event_time: datetime, iots: List[str], load_reduction_kw: float, duration_minutes: int = 59 ) -> Dict[str, Any]: """ Schedule a DR event for execution Returns: {"event_id": str, "message": str} """ logger.info(f"Scheduling DR event for {len(iots)} devices at {event_time}") # Create event document event_id = str(uuid.uuid4()) end_time = event_time + timedelta(minutes=duration_minutes) event = { "event_id": event_id, "start_time": event_time, "end_time": end_time, "status": "scheduled", "participating_devices": iots, "target_reduction_kw": load_reduction_kw, "actual_reduction_kw": 0.0, "power_samples": [] } await self.db.demand_response_events.insert_one(event) # Publish scheduled event await self.redis.publish("dr_events", json.dumps({ "event": "event_scheduled", "event_id": event_id, "start_time": event_time.isoformat(), "end_time": end_time.isoformat(), "devices": iots })) logger.info(f"Event {event_id} scheduled successfully") return { "event_id": event_id, "message": "Event scheduled successfully" } async def execute_event(self, event_id: str): """ Execute a DR event (spawns background task) """ logger.info(f"Executing DR event {event_id}") # Get event details event = await self.db.demand_response_events.find_one({"event_id": event_id}) if not event: logger.error(f"Event {event_id} not found") return # Update status to active await self.db.demand_response_events.update_one( {"event_id": event_id}, {"$set": {"status": "active", "actual_start_time": datetime.utcnow()}} ) # Publish event started await self.redis.publish("dr_events", json.dumps({ "event": "event_started", "event_id": event_id, "devices": event["participating_devices"] })) # Create and store async task for this event task = asyncio.create_task(self._run_event_loop(event)) self.active_events[event_id] = task logger.info(f"DR event {event_id} started successfully") async def _run_event_loop(self, event: Dict[str, Any]): """ CRITICAL: Core event execution loop - runs for duration_minutes Samples power every 5 seconds, accumulates reduction, handles cancellation """ event_id = event["event_id"] end_time = event["end_time"] devices = event["participating_devices"] total_reduction_kwh = 0.0 sample_count = 0 logger.info(f"Starting event loop for {event_id}, ending at {end_time}") try: while datetime.utcnow() < end_time: # Get current power for all participating devices from cache device_powers = { device_id: self.device_power_cache.get(device_id, 0.0) for device_id in devices } # Calculate reduction for this 5-second interval # interval_hours = 5.0 / 3600.0 = 0.00139 hours interval_reduction_kwh = sum(device_powers.values()) * (5.0 / 3600.0) total_reduction_kwh += interval_reduction_kwh sample_count += 1 # Store sample in MongoDB (every sample to maintain accuracy) sample = { "timestamp": datetime.utcnow(), "device_powers": device_powers, "interval_reduction_kwh": interval_reduction_kwh } await self.db.demand_response_events.update_one( {"event_id": event_id}, { "$push": {"power_samples": sample}, "$set": {"actual_reduction_kw": total_reduction_kwh} } ) # Update Redis cache for fast access to current reduction cache_key = f"dr:event:active:{event_id}" await self.redis.setex( cache_key, 300, # 5 minute TTL json.dumps({ "event_id": event_id, "current_reduction_kwh": total_reduction_kwh, "devices": device_powers, "last_update": datetime.utcnow().isoformat() }, default=str) ) # Publish progress every 10 samples (50 seconds) if sample_count % 10 == 0: await self.redis.publish("dr_events", json.dumps({ "event": "event_progress", "event_id": event_id, "total_reduction_kwh": round(total_reduction_kwh, 3), "device_powers": device_powers, "timestamp": datetime.utcnow().isoformat() })) logger.info(f"Event {event_id} progress: {total_reduction_kwh:.3f} kWh ({sample_count} samples)") # Sleep for 5 seconds await asyncio.sleep(5) # Event completed successfully logger.info(f"Event {event_id} completed with {total_reduction_kwh:.3f} kWh reduction") await self._complete_event(event_id, total_reduction_kwh) except asyncio.CancelledError: logger.info(f"Event {event_id} cancelled by user") await self._cancel_event(event_id) raise except Exception as e: logger.error(f"Error in event {event_id}: {e}", exc_info=True) await self._cancel_event(event_id) async def _complete_event(self, event_id: str, total_reduction_kwh: float): """Mark event as completed""" await self.db.demand_response_events.update_one( {"event_id": event_id}, { "$set": { "status": "completed", "actual_end_time": datetime.utcnow(), "actual_reduction_kw": total_reduction_kwh } } ) # Remove from active events self.active_events.pop(event_id, None) # Clear cache await self.redis.delete(f"dr:event:active:{event_id}") # Publish completion await self.redis.publish("dr_events", json.dumps({ "event": "event_completed", "event_id": event_id, "total_reduction_kwh": total_reduction_kwh })) logger.info(f"DR event {event_id} marked as completed") async def _cancel_event(self, event_id: str): """Internal method to cancel an event""" await self.db.demand_response_events.update_one( {"event_id": event_id}, { "$set": { "status": "cancelled", "cancelled_at": datetime.utcnow() } } ) self.active_events.pop(event_id, None) await self.redis.delete(f"dr:event:active:{event_id}") # Publish cancellation await self.redis.publish("dr_events", json.dumps({ "event": "event_cancelled", "event_id": event_id, "timestamp": datetime.utcnow().isoformat() })) async def cancel_event(self, event_id: str): """ Public method to cancel a running DR event gracefully """ logger.info(f"Cancelling DR event {event_id}") # Cancel the async task task = self.active_events.get(event_id) if task and not task.done(): task.cancel() try: await task except asyncio.CancelledError: # Expected - task cancelled successfully pass except Exception as e: logger.error(f"Error cancelling event task {event_id}: {e}") # Update database status (if not already done by _cancel_event) event = await self.db.demand_response_events.find_one({"event_id": event_id}) if event and event.get("status") != "cancelled": await self._cancel_event(event_id) logger.info(f"DR event {event_id} cancelled successfully") async def get_active_events(self) -> List[Dict[str, Any]]: """Get currently running events with real-time data""" cursor = self.db.demand_response_events.find({ "status": "active" }).sort("start_time", -1) events = [] async for event in cursor: event["_id"] = str(event["_id"]) # Add real-time data from cache cache_key = f"dr:event:active:{event['event_id']}" cached = await self.redis.get(cache_key) if cached: realtime_data = json.loads(cached) event["current_reduction_kwh"] = realtime_data.get("current_reduction_kwh") event["current_device_powers"] = realtime_data.get("devices") events.append(event) return events # ===== DEVICE POWER INTEGRATION ===== def update_device_power_cache(self, device_id: str, power_kw: float): """ Update device power cache (called by Redis subscriber) This is synchronous because it's just updating a dict """ self.device_power_cache[device_id] = power_kw # No logging here to avoid spam (called every few seconds per device) async def get_device_power(self, device_id: str) -> float: """Get current power for a device from cache""" return self.device_power_cache.get(device_id, 0.0) # ===== AUTO-RESPONSE CONFIGURATION ===== async def get_auto_response_config(self) -> Dict[str, Any]: """Get auto-response configuration""" config = await self.db.auto_response_config.find_one({"config_id": "default"}) if not config: # Create default config default_config = { "config_id": "default", "enabled": False, "max_reduction_percentage": 20.0, "response_delay_seconds": 300, "min_notice_minutes": 60, "updated_at": datetime.utcnow() } await self.db.auto_response_config.insert_one(default_config) return default_config return config async def set_auto_response_config( self, enabled: bool, max_reduction_percentage: float = 20.0, response_delay_seconds: int = 300, min_notice_minutes: int = 60 ) -> Dict[str, Any]: """Update auto-response configuration""" await self.db.auto_response_config.update_one( {"config_id": "default"}, { "$set": { "enabled": enabled, "max_reduction_percentage": max_reduction_percentage, "response_delay_seconds": response_delay_seconds, "min_notice_minutes": min_notice_minutes, "updated_at": datetime.utcnow() } }, upsert=True ) # Clear cache await self.redis.delete("dr:config:auto_response") logger.info(f"Auto-response config updated: enabled={enabled}") return await self.get_auto_response_config() async def process_auto_responses(self): """ Process pending invitations with auto-response (called by background task) """ # Get auto-response configuration auto_config = await self.get_auto_response_config() if not auto_config.get("enabled"): return # Find unanswered invitations invitations = await self.get_unanswered_invitations() for invitation in invitations: event_id = invitation["event_id"] event_time = invitation["event_time"] # Parse event_time (might be string from cache) if isinstance(event_time, str): event_time = datetime.fromisoformat(event_time.replace('Z', '+00:00')) # Check if event is within auto-response criteria time_until_event = (event_time - datetime.utcnow()).total_seconds() / 60 # minutes min_notice = auto_config.get("min_notice_minutes", 60) if time_until_event >= min_notice: logger.info(f"Auto-responding to invitation {event_id}") # Auto-accept for all devices for device_id in invitation["iots"]: # Check if already responded existing = await self.db.demand_response_responses.find_one({ "event_id": event_id, "device_id": device_id }) if not existing: # Get device current power device_power = await self.get_device_power(device_id) # Calculate committed reduction based on max_reduction_percentage max_reduction_pct = auto_config.get("max_reduction_percentage", 20.0) committed_reduction = device_power * (max_reduction_pct / 100) if device_power > 0 else 0.5 # Submit auto-response try: await self.answer_invitation(event_id, device_id, "YES", committed_reduction) logger.info(f"Auto-accepted for device {device_id} with {committed_reduction:.2f} kW commitment") except Exception as e: logger.error(f"Error auto-responding for {device_id}: {e}") else: logger.debug(f"Invitation {event_id} too soon ({time_until_event:.0f}m < {min_notice}m)") # ===== BACKGROUND TASK SUPPORT ===== async def check_scheduled_events(self): """ Check for events that need to be started (called by scheduler task) """ now = datetime.utcnow() threshold = now + timedelta(minutes=1) # Start events within next minute # Find scheduled events that should start cursor = self.db.demand_response_events.find({ "status": "scheduled", "start_time": {"$lte": threshold, "$gte": now} }) async for event in cursor: event_id = event["event_id"] # Check if not already active if event_id not in self.active_events: logger.info(f"Starting scheduled DR event {event_id}") await self.execute_event(event_id) # ===== BASIC FLEXIBILITY CALCULATION ===== async def get_current_flexibility(self) -> Dict[str, Any]: """ Calculate current available flexibility from device power cache """ total_flexibility_kw = 0.0 devices = [] # Get all devices with instructions cursor = self.db.device_instructions.find({}) current_hour = datetime.utcnow().hour async for device_doc in cursor: device_id = device_doc["device_id"] instruction = device_doc["instructions"].get(str(current_hour), "off") if instruction != "off": # Get device current power from cache device_power = self.device_power_cache.get(device_id, 0.0) if instruction == "participation": # Full flexibility (100%) flexibility = device_power elif instruction == "shifting": # Partial flexibility (20%) flexibility = device_power * 0.20 else: flexibility = 0.0 if flexibility > 0: devices.append({ "device_id": device_id, "available_kw": round(flexibility, 2), "instruction": instruction, "current_power": round(device_power, 2) }) total_flexibility_kw += flexibility snapshot = { "timestamp": datetime.utcnow(), "total_flexibility_kw": round(total_flexibility_kw, 2), "devices": devices } # Store snapshot await self.db.flexibility_snapshots.insert_one(dict(snapshot)) # Cache for 5 minutes await self.redis.setex( "dr:flexibility:current", 300, json.dumps(snapshot, default=str) ) return snapshot async def get_device_instructions(self, device_id: Optional[str] = None) -> Dict[str, Any]: """Get DR instructions for device(s)""" if device_id: doc = await self.db.device_instructions.find_one({"device_id": device_id}) return doc if doc else {"device_id": device_id, "instructions": {}} else: cursor = self.db.device_instructions.find({}) instructions = {} async for doc in cursor: instructions[doc["device_id"]] = doc["instructions"] return instructions async def update_device_instructions(self, device_id: str, instructions: Dict[str, str]): """Update hourly instructions for a device""" await self.db.device_instructions.update_one( {"device_id": device_id}, { "$set": { "instructions": instructions, "updated_at": datetime.utcnow() } }, upsert=True ) logger.info(f"Updated instructions for device {device_id}") # ===== ANALYTICS ===== async def get_performance_analytics(self, days: int = 30) -> Dict[str, Any]: """Get DR performance analytics""" start_date = datetime.utcnow() - timedelta(days=days) # Query completed events cursor = self.db.demand_response_events.find({ "status": "completed", "start_time": {"$gte": start_date} }) events = await cursor.to_list(length=None) if not events: return { "period_days": days, "total_events": 0, "total_reduction_kwh": 0.0, "total_target_kwh": 0.0, "average_reduction_kwh": 0.0, "achievement_rate": 0.0, "average_event_duration_minutes": 59 } total_reduction = sum(e.get("actual_reduction_kw", 0) for e in events) total_target = sum(e.get("target_reduction_kw", 0) for e in events) return { "period_days": days, "total_events": len(events), "total_reduction_kwh": round(total_reduction, 2), "total_target_kwh": round(total_target, 2), "average_reduction_kwh": round(total_reduction / len(events), 2), "achievement_rate": round((total_reduction / total_target * 100) if total_target > 0 else 0, 2), "average_event_duration_minutes": 59 }