# Demand Response System - Architecture & Logic Documentation ## Table of Contents 1. [System Overview](#system-overview) 2. [Component Locations](#component-locations) 3. [Architecture & Data Flow](#architecture--data-flow) 4. [Key Components](#key-components) 5. [Invitation Lifecycle](#invitation-lifecycle) 6. [Integration Points](#integration-points) 7. [API Reference](#api-reference) 8. [Complete Event Flow Example](#complete-event-flow-example) --- ## System Overview The IoT Building Monitoring system includes a comprehensive Demand Response (DR) management system that enables buildings to participate in grid flexibility programs by reducing power consumption during peak demand periods. **Key Capabilities:** - Create and manage DR invitations with target load reductions - Auto-accept or manual approval of DR events - Track power reduction in real-time during events - Calculate financial benefits from DR participation - Forecast available flexibility by device and time - Configure device-specific DR participation instructions --- ## Component Locations ### Core Components | Component | Path | Purpose | |-----------|------|---------| | **Service Layer** | `services/DemandResponseService.py` | Business logic for DR operations | | **Database Layer** | `database/DemandResponseRepository.py` | MongoDB data access for DR | | **Execution Engine** | `core/DemandResponseAtuator.py` | Runs DR events, tracks power reduction | | **Main Orchestrator** | `core/Core.py` | Coordinates DR events and accumulates reduction | | **IoT Model** | `model/IoT.py` | Device configuration with DR capabilities | | **API Endpoints** | `api/main.py:230-329` | REST endpoints for DR operations | | **Configuration** | `config/f.json` | System and device configuration | --- ## Architecture & Data Flow ### High-Level Architecture ``` ┌─────────────────────────────────────┐ │ REST API Endpoints │ │ (Flask: api/main.py) │ │ - /invitation/* │ │ - /event/check │ │ - /dr/benefit │ └──────────────┬──────────────────────┘ │ ▼ ┌─────────────────────────────────────┐ │ DemandResponseService │ │ (Business Logic Layer) │ │ - Invitation management │ │ - Auto-answer configuration │ │ - Response tracking │ └──────────────┬──────────────────────┘ │ ▼ ┌─────────────────────────────────────┐ │ DemandResponseRepository │ │ (Data Access Layer) │ │ - MongoDB operations │ │ - Query optimization │ └──────────────┬──────────────────────┘ │ ▼ ┌─────────────────────────────────────┐ │ MongoDB Collections │ │ - demand_response_invitations │ │ - config (auto_answer) │ │ - benefit (financial tracking) │ │ - instructions (hourly rules) │ └─────────────────────────────────────┘ ``` ### Execution Architecture ``` ┌──────────────────────────────────────┐ │ Core.py (Main Thread) │ │ - Manages IoT device fleet │ │ - Tracks dr_reduced_power │ │ - Calculates total flexibility │ └──────────────┬───────────────────────┘ │ │ schedule_event(time, iot) ▼ ┌──────────────────────────────────────┐ │ DemandResponseAtuator (New Thread) │ │ - Spawned per device per event │ │ - Runs for 1 hour (59 minutes) │ │ - Updates core.dr_reduced_power │ │ - Auto-terminates at event end │ └──────────────────────────────────────┘ ``` ### Data Models #### MongoDB Collection: `demand_response_invitations` ```json { "_id": "ObjectId", "datetime": "2025-12-10 13:45:32", // Invitation creation time "event_time": "2025-12-10 14:00:00", // When DR event occurs "load_kwh": 5.2, // Target reduction in kWh "load_percentage": 15.0, // Reduction as % of total load "iots": ["AC1", "AC2", "Lighting"], // Participating devices "response": "WAITING|YES|NO" // Participant decision } ``` #### MongoDB Collection: `config` ```json { "config": "config", "auto_answer": true // Auto-accept DR invitations } ``` #### MongoDB Collection: `benefit` ```json { "source": "dr", // "dr" or "p2p" "product": "AC1", // Device name "value": 5.50, // Financial benefit (€) "datetime": "2025-12-10 14:00:00" } ``` #### MongoDB Collection: `instructions` ```json { "AC1": { "0": "participation", // Hour 0: full DR participation "1": "shifting", // Hour 1: 0-20% participation "2": "off", // Hour 2: no DR participation "3": "participation", // ... hours 4-23 }, "AC2": { /* ... */ } } ``` --- ## Key Components ### 1. DemandResponseService **Location:** `services/DemandResponseService.py` **Responsibilities:** - Manages DR invitation lifecycle - Handles participant responses - Configures auto-accept behavior - Queries invitation status **Key Methods:** ```python def invitation(event_time, load_kwh, load_percentage, iots): """Create new DR invitation""" # Checks auto_answer config # Sets response to YES if auto-enabled, else WAITING # Stores in MongoDB via repository def answer_invitation(event_time, iot, response): """Record YES/NO response for specific device""" # Updates invitation response field # Used for manual acceptance workflow def get_unanswered_invitations(): """Get all pending invitations awaiting response""" # Returns invitations with response="WAITING" def get_answered_invitations(): """Get last 5 completed invitations""" # Returns historical invitations (YES/NO) def get_auto_answer_config(): """Check if auto-accept is enabled""" # Returns boolean from config collection def set_auto_answer_config(auto_answer): """Enable/disable auto-accept""" # Updates MongoDB config collection ``` **Auto-Accept Logic:** ```python # Line 35-38 in DemandResponseService.py if self.get_auto_answer_config(): response = "YES" # Auto-accept enabled else: response = "WAITING" # Require manual approval ``` --- ### 2. DemandResponseRepository **Location:** `database/DemandResponseRepository.py` **Responsibilities:** - Direct MongoDB operations - Query optimization and filtering - Data persistence **Key Methods:** ```python def insert_invitation(datetime, event_time, load_kwh, load_percentage, iots, response): """Store new DR invitation in MongoDB""" def answer_invitation(event_time, iot, response): """Update invitation response status""" # Updates document where event_time matches and iot in iots array def get_unanswered_invitations(): """Query: {response: "WAITING"}""" def get_answered_invitations(): """Query: {response: {$ne: "WAITING"}}, limit 5, sort by datetime desc""" def get_accepted_upcoming_invitations(): """Query: {response: "YES", event_time: {$gte: now}}""" def get_invitation(event_time): """Find specific invitation by event time""" ``` --- ### 3. DemandResponseAtuator **Location:** `core/DemandResponseAtuator.py` **Responsibilities:** - Executes DR event for a single device - Runs as separate thread during event - Accumulates power reduction in real-time - Auto-terminates after 1 hour **Architecture:** ```python class DemandResponseAtuator(Thread): def __init__(self, core, iot): self.core = core # Reference to Core instance self.iot = iot # IoT device participating in DR self.event_on = True # Event active flag def run(self): # Schedule event end at 59 minutes from now end_time = (datetime.now() + timedelta(minutes=59)) end_time_formatted = end_time.strftime('%H:%M:%S') schedule.every().day.at(end_time_formatted).do(self.end_event) # Main loop: accumulate power reduction every second while self.event_on: # Add device's current power to reduction accumulator self.core.dr_reduced_power += self.iot.get_power() schedule.run_pending() time.sleep(1) def end_event(self): """Called automatically at event end""" self.event_on = False return schedule.CancelJob ``` **Key Characteristics:** - **Threading Model:** One thread per device per event - **Update Frequency:** Every 1 second - **Duration:** Exactly 59 minutes (scheduled termination) - **Power Tracking:** Cumulative reduction added to `core.dr_reduced_power` --- ### 4. Core (Main Orchestrator) **Location:** `core/Core.py` **DR-Related Attributes:** ```python class Core(Thread): def __init__(self): self.dr_reduced_power = 0.0 # Accumulator for power reduction self.iots_consumption = [] # List of controllable devices self.iots = [] # All IoT devices ``` **Key DR Methods:** ```python def schedule_event(self, event_time, iot_name): """Initiate DR event for specified device""" # Find device by name iot = [i for i in self.iots if i.name == iot_name][0] # Create and start DemandResponseAtuator thread dr = DemandResponseAtuator(self, iot) dr.start() def get_total_consumption(self): """Returns consumption MINUS DR reductions""" # Sum all device power totalPower = sum(iot.get_power() for iot in self.iots_consumption) # Subtract DR reduction reduce = self.dr_reduced_power self.dr_reduced_power = 0 # Reset accumulator return totalPower - reduce def get_total_flexibility(self): """Calculate available flexibility for DR""" # Sum power of devices with demandresponse=true return sum(iot.get_power() for iot in self.iots_consumption if iot.demandresponse) ``` **How Power Reduction Works:** 1. During DR event, `DemandResponseAtuator` continuously adds to `dr_reduced_power` 2. When `get_total_consumption()` is called, reduction is subtracted from total 3. `dr_reduced_power` is reset to 0 after each reading 4. This creates effective "virtual" power reduction in reported consumption --- ### 5. IoT Model **Location:** `model/IoT.py` **DR-Related Attributes:** ```python class IoT: def __init__(self, config): self.name = config['name'] self.demandresponse = config['control'].get('demandresponse', False) self.instructions = {} # Hourly DR instructions ``` **Configuration Example (config/f.json):** ```json { "resources": { "iots": [ { "name": "AC1", "type": "hvac", "uri": "http://192.168.2.91:30000/api/realtime/H_01/AC1", "control": { "demandresponse": true // Device can participate in DR } } ] } } ``` **DR-Capable Devices:** - AC1, AC2, AC3, AC4 (HVAC systems) - Water Heater - Lighting - Refrigerator **Instruction Types:** - `"participation"` - Full DR participation (100% reduction if needed) - `"shifting"` - Partial participation (0-20% reduction) - `"off"` - No DR participation for that hour --- ## Invitation Lifecycle ### 1. Create Invitation **Endpoint:** `POST /invitation/send` **Request:** ```json { "event_time": "2025-12-10 14:00:00", "kwh": 5.2, "percentage": 15, "iots": ["AC1", "AC2", "Lighting"] } ``` **Response:** ```json { "event_time": "2025-12-10 14:00:00" } ``` **Logic Flow:** 1. Validates event_time format 2. Checks auto_answer configuration 3. Sets response = "YES" if auto-enabled, else "WAITING" 4. Stores invitation in MongoDB 5. Returns event_time as confirmation --- ### 2. Check Invitation Status **Endpoint:** `POST /invitation/get` **Request:** ```json { "event_time": "2025-12-10 14:00:00" } ``` **Response:** ```json { "datetime": "2025-12-10 13:45:32", "event_time": "2025-12-10 14:00:00", "load_kwh": 5.2, "load_percentage": 15, "iots": ["AC1", "AC2", "Lighting"], "response": "WAITING" } ``` --- ### 3. Get Pending Invitations **Endpoint:** `GET /invitation/unanswered` **Response:** ```json [ { "datetime": "2025-12-10 13:45:32", "event_time": "2025-12-10 14:00:00", "load_kwh": 5.2, "load_percentage": 15, "iots": ["AC1", "AC2"], "response": "WAITING" }, { "datetime": "2025-12-10 14:20:15", "event_time": "2025-12-10 16:00:00", "load_kwh": 3.8, "load_percentage": 10, "iots": ["Water Heater"], "response": "WAITING" } ] ``` **Use Case:** Display pending DR invitations requiring participant decision --- ### 4. Answer Invitation **Endpoint:** `POST /invitation/answer` **Request:** ```json { "event_time": "2025-12-10 14:00:00", "iot": "AC1", "response": "YES" } ``` **Response:** ```json { "message": "answered" } ``` **Logic:** - Updates invitation document in MongoDB - Sets response field to "YES" or "NO" - Filters by event_time and iot in iots array - Enables manual approval workflow --- ### 5. Execute DR Event **Endpoint:** `POST /event/check` **Request:** ```json { "event_time": "2025-12-10 14:00:00", "iot": "AC1" } ``` **Logic Flow:** ```python 1. Receives event_time and iot name 2. Calls core.schedule_event(event_time, iot) 3. Core finds IoT device by name 4. Creates new DemandResponseAtuator(core, iot) 5. Starts thread → begins power reduction tracking 6. Thread runs for 59 minutes, accumulating reduction every second 7. Auto-terminates at scheduled end time ``` --- ### 6. Configure Auto-Accept **Get Config:** `GET /invitation/auto` **Response:** ```json { "auto_answer": true } ``` **Set Config:** `POST /invitation/auto` **Request:** ```json { "auto_answer": true } ``` **Response:** ```json { "auto_answer": true } ``` **Effect:** - When enabled: New invitations automatically set to response="YES" - When disabled: New invitations set to response="WAITING" (require manual approval) --- ## Integration Points ### 1. Energy Management **ForecastService** (`services/ForecastService.py`) - Calculates `forecast_flexibility()` based on historical data - Predicts available DR capacity for future periods - Uses flexibility data stored with hourly consumption/generation **Core.get_total_flexibility()** - Returns sum of power from DR-capable devices - Indicates current available flexibility - Accessible via `GET /energy/flexibility` ```python def get_total_flexibility(self): return sum(iot.get_power() for iot in self.iots_consumption if iot.demandresponse) ``` --- ### 2. Building Management **StoringManager** (`model/StoringManager.py`) - Stores hourly aggregates including flexibility - MongoDB collection: `TOTALPOWERHOUR` - Fields: `{datetime, consumption, generation, flexibility}` **BuildingRepository** (`database/BuildingRepository.py`) - `insert_hour()` stores flexibility alongside consumption/generation - Flexibility calculated as: `power * random(0-20%)` - Provides historical baseline for forecasting --- ### 3. Financial Tracking **EnergyService** (`services/EnergyService.py`) ```python def add_benefit(source, product, value): """Record financial benefit from DR or P2P""" # source: "dr" or "p2p" # product: device name # value: financial reward amount ``` **Record DR Benefit:** `POST /dr/benefit` **Request:** ```json { "iot": "AC1", "value": 5.50 } ``` **Storage:** ```json { "source": "dr", "product": "AC1", "value": 5.50, "datetime": "2025-12-10 14:00:00" } ``` **Monthly Benefits:** `GET /benefits/monthly` **Response:** ```json { "dr": 150.00, "p2p": 50.00 } ``` --- ### 4. IoT Device Control **IotService** (`services/IotService.py`) ```python def change_dr_enable(iot, enable): """Enable or disable DR capability for device""" iot.demandresponse = enable def update_instructions(instructions): """Set hourly DR participation instructions""" # Format: {iot_name: {hour: "participation|shifting|off"}} def get_instructions(): """Retrieve current DR instructions""" return {iot.name: iot.instructions for iot in iots} ``` **Update Instructions:** `POST /iot/instructions` **Request:** ```json { "AC1": { "0": "participation", "1": "shifting", "2": "off", "3": "participation" // ... hours 4-23 } } ``` **Forecasted Flexibility by Hour:** `POST /iots/forecast/flexibility` **Request:** ```json { "hour": 14 } ``` **Response:** ```json { "shifting": [["AC1", 50], ["AC2", 75]], // 0-20% participation "reducing": [["Water Heater", 100]] // Full participation } ``` --- ## API Reference ### Demand Response Endpoints | Method | Endpoint | Description | Request Body | Response | |--------|----------|-------------|--------------|----------| | POST | `/invitation/send` | Create DR invitation | `{event_time, kwh, percentage, iots}` | `{event_time}` | | POST | `/invitation/get` | Get specific invitation | `{event_time}` | Invitation object | | GET | `/invitation/unanswered` | Get pending invitations | None | Array of invitations | | GET | `/invitation/answered` | Get last 5 completed | None | Array of invitations | | POST | `/invitation/answer` | Submit response | `{event_time, iot, response}` | `{message: "answered"}` | | GET | `/invitation/auto` | Get auto-accept config | None | `{auto_answer: boolean}` | | POST | `/invitation/auto` | Set auto-accept config | `{auto_answer: boolean}` | `{auto_answer: boolean}` | | POST | `/event/check` | Execute DR event | `{event_time, iot}` | Success status | | POST | `/dr/benefit` | Record DR benefit | `{iot, value}` | `{message: "ok"}` | ### Related Flexibility/Energy Endpoints | Method | Endpoint | Description | |--------|----------|-------------| | GET | `/energy/now` | Current consumption, generation, flexibility | | GET | `/energy/flexibility` | Available flexibility for DR | | GET | `/forecast/flexibility` | Forecasted flexibility | | POST | `/iots/forecast/flexibility` | Flexibility by hour and device | | POST | `/iot/demandresponse/enable` | Enable/disable device DR | | POST | `/iot/instructions` | Update DR instructions | | GET | `/iot/instructions` | Get current instructions | | GET | `/benefits/monthly` | Monthly DR benefits | --- ## Complete Event Flow Example ### Scenario: 10 kWh Reduction Event at 2:00 PM ``` ┌─────────────────────────────────────────────────────────────┐ │ STEP 1: Create Invitation (1:45 PM) │ └─────────────────────────────────────────────────────────────┘ POST /invitation/send { "event_time": "2025-12-10 14:00:00", "kwh": 10, "percentage": 20, "iots": ["AC1", "AC2", "Water Heater"] } Flow: ├─ DemandResponseService.invitation() ├─ Checks auto_answer config → disabled (false) ├─ Sets response = "WAITING" ├─ DemandResponseRepository.insert_invitation() └─ MongoDB: Creates invitation document Result: Invitation stored, awaiting participant approval ┌─────────────────────────────────────────────────────────────┐ │ STEP 2: Check Pending Invitations (1:50 PM) │ └─────────────────────────────────────────────────────────────┘ GET /invitation/unanswered Response: [ { "datetime": "2025-12-10 13:45:32", "event_time": "2025-12-10 14:00:00", "load_kwh": 10, "load_percentage": 20, "iots": ["AC1", "AC2", "Water Heater"], "response": "WAITING" } ] ┌─────────────────────────────────────────────────────────────┐ │ STEP 3: Answer Invitation for Each Device (1:55 PM) │ └─────────────────────────────────────────────────────────────┘ POST /invitation/answer {"event_time": "2025-12-10 14:00:00", "iot": "AC1", "response": "YES"} POST /invitation/answer {"event_time": "2025-12-10 14:00:00", "iot": "AC2", "response": "YES"} POST /invitation/answer {"event_time": "2025-12-10 14:00:00", "iot": "Water Heater", "response": "NO"} Flow per request: ├─ DemandResponseService.answer_invitation() ├─ DemandResponseRepository.answer_invitation() └─ MongoDB: Updates invitation.response for specified iot Result: AC1 and AC2 accepted, Water Heater declined ┌─────────────────────────────────────────────────────────────┐ │ STEP 4: Execute DR Event (2:00 PM - Event Start) │ └─────────────────────────────────────────────────────────────┘ POST /event/check {"event_time": "2025-12-10 14:00:00", "iot": "AC1"} POST /event/check {"event_time": "2025-12-10 14:00:00", "iot": "AC2"} Flow per request: ├─ Core.schedule_event("2025-12-10 14:00:00", "AC1") ├─ Finds IoT device: iot = [i for i in core.iots if i.name == "AC1"][0] ├─ Creates DemandResponseAtuator(core, iot) └─ Starts thread DemandResponseAtuator.run(): ├─ Schedules end_event() at 14:59:00 └─ While loop (every 1 second for 59 minutes): └─ core.dr_reduced_power += iot.get_power() Result: Two threads running, accumulating power reduction ┌─────────────────────────────────────────────────────────────┐ │ STEP 5: Monitor Energy (2:30 PM - During Event) │ └─────────────────────────────────────────────────────────────┘ GET /energy/now Flow: ├─ Core.get_total_consumption() ├─ totalPower = sum(iot.get_power() for iot in iots_consumption) ├─ totalPower = 50 kW (all devices) ├─ reduce = core.dr_reduced_power = 8 kW (accumulated from AC1+AC2) ├─ core.dr_reduced_power = 0 # Reset └─ return 50 - 8 = 42 kW Response: { "consumption": 42.0, // Reduced by DR "generation": 15.0, "flexibility": 18.0 } Result: Consumption appears 8 kW lower due to DR reduction ┌─────────────────────────────────────────────────────────────┐ │ STEP 6: Automatic Event End (2:59 PM) │ └─────────────────────────────────────────────────────────────┘ Scheduled Task Triggered: ├─ DemandResponseAtuator.end_event() called ├─ self.event_on = False ├─ Thread exits while loop └─ Thread terminates Result: Both AC1 and AC2 threads stopped, DR event complete ┌─────────────────────────────────────────────────────────────┐ │ STEP 7: Record Financial Benefit (3:00 PM) │ └─────────────────────────────────────────────────────────────┘ POST /dr/benefit {"iot": "AC1", "value": 5.50} POST /dr/benefit {"iot": "AC2", "value": 4.75} Flow per request: ├─ EnergyService.add_benefit("dr", iot, value) ├─ FinancialRepository.insert_benefit() └─ MongoDB.benefit: {source: "dr", product: iot, value: value, datetime: now} Result: Total DR benefit = €10.25 ┌─────────────────────────────────────────────────────────────┐ │ STEP 8: Hourly Storage (3:00 PM - End of Hour) │ └─────────────────────────────────────────────────────────────┘ StoringManager.save_hour() (automatic): ├─ BuildingService.save_last_hour() ├─ Calculates flexibility = power * random(0-20%) ├─ BuildingRepository.insert_hour() └─ MongoDB.TOTALPOWERHOUR: { datetime: "2025-12-10 14:00:00", consumption: 42.0, // Average during hour (with DR reduction) generation: 15.0, flexibility: 7.8 } Result: Hour data stored with DR-reduced consumption ┌─────────────────────────────────────────────────────────────┐ │ STEP 9: View Monthly Benefits (End of Month) │ └─────────────────────────────────────────────────────────────┘ GET /benefits/monthly Response: { "dr": 185.50, // Total DR benefits for month "p2p": 62.30 // Total P2P benefits for month } Result: Financial tracking shows €185.50 earned from DR participation ``` --- ## Key Metrics & Statistics | Metric | Value | Source | |--------|-------|--------| | **Update Frequency** | 1 second | DemandResponseAtuator.run() | | **Event Duration** | 59 minutes | Scheduled termination | | **Storage Frequency** | Every hour | StoringManager | | **DR-Capable Devices** | 8 devices | config/f.json | | **Threading Model** | 1 thread per device per event | Core.schedule_event() | | **Database** | MongoDB (H01, BuildingRightSide) | Multiple collections | | **API Framework** | Flask with CORS | api/main.py | | **Flexibility Calculation** | 0-20% of device power | Based on instructions | --- ## Configuration Reference ### Device Configuration (config/f.json) ```json { "app": { "dr_events_auto_accept": 1, // 1=enabled, 0=disabled "monitoring": 0 // Debug logging }, "storage": { "local": { "demand_response": ["H01", "demand_response_invitations"], "config": ["H01", "config"], "benefit": ["BuildingRightSide", "benefit"], "instructions": ["H01", "instructions"] } }, "resources": { "iots": [ { "name": "AC1", "type": "hvac", "uri": "http://192.168.2.91:30000/api/realtime/H_01/AC1", "control": { "demandresponse": true } } // ... more devices ] } } ``` ### MongoDB Database Structure ``` Database: H01 ├─ demand_response_invitations (DR events) ├─ config (auto_answer setting) ├─ instructions (hourly participation rules) └─ TOTALPOWERHOUR (hourly aggregates) Database: BuildingRightSide └─ benefit (financial tracking) ``` --- ## Summary The Demand Response system is a comprehensive, multi-threaded solution that enables building participation in grid flexibility programs. It features: - **Automatic or Manual Approval:** Configurable auto-accept or manual review workflow - **Real-Time Power Tracking:** Per-device threads accumulate power reduction every second - **Financial Benefit Tracking:** Source-based tracking (DR vs P2P) with monthly aggregation - **Flexibility Forecasting:** Historical data and hourly instructions for predictive planning - **Device-Level Control:** Per-device, per-hour participation configuration - **MongoDB Persistence:** Scalable data storage with optimized queries - **REST API:** Complete API for external integration and control - **Thread Safety:** Separate threads per device prevent interference **Critical Files:** - **services/DemandResponseService.py:35-38** - Auto-accept logic - **core/DemandResponseAtuator.py:run()** - Power reduction accumulation - **core/Core.py:get_total_consumption()** - DR-reduced consumption calculation - **api/main.py:230-329** - All DR endpoints This architecture enables scalable, reliable demand response management with precise power tracking and financial incentive tracking.