Demand Response System - Architecture & Logic Documentation
Table of Contents
- System Overview
- Component Locations
- Architecture & Data Flow
- Key Components
- Invitation Lifecycle
- Integration Points
- API Reference
- Complete Event Flow Example
System Overview
The IoT Building Monitoring system includes a comprehensive Demand Response (DR) management system that enables buildings to participate in grid flexibility programs by reducing power consumption during peak demand periods.
Key Capabilities:
- Create and manage DR invitations with target load reductions
- Auto-accept or manual approval of DR events
- Track power reduction in real-time during events
- Calculate financial benefits from DR participation
- Forecast available flexibility by device and time
- Configure device-specific DR participation instructions
Component Locations
Core Components
| Component | Path | Purpose |
|---|---|---|
| Service Layer | services/DemandResponseService.py | Business logic for DR operations |
| Database Layer | database/DemandResponseRepository.py | MongoDB data access for DR |
| Execution Engine | core/DemandResponseAtuator.py | Runs DR events, tracks power reduction |
| Main Orchestrator | core/Core.py | Coordinates DR events and accumulates reduction |
| IoT Model | model/IoT.py | Device configuration with DR capabilities |
| API Endpoints | api/main.py:230-329 | REST endpoints for DR operations |
| Configuration | config/f.json | System and device configuration |
Architecture & Data Flow
High-Level Architecture
┌─────────────────────────────────────┐
│ REST API Endpoints │
│ (Flask: api/main.py) │
│ - /invitation/* │
│ - /event/check │
│ - /dr/benefit │
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ DemandResponseService │
│ (Business Logic Layer) │
│ - Invitation management │
│ - Auto-answer configuration │
│ - Response tracking │
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ DemandResponseRepository │
│ (Data Access Layer) │
│ - MongoDB operations │
│ - Query optimization │
└──────────────┬──────────────────────┘
│
▼
┌─────────────────────────────────────┐
│ MongoDB Collections │
│ - demand_response_invitations │
│ - config (auto_answer) │
│ - benefit (financial tracking) │
│ - instructions (hourly rules) │
└─────────────────────────────────────┘
Execution Architecture
┌──────────────────────────────────────┐
│ Core.py (Main Thread) │
│ - Manages IoT device fleet │
│ - Tracks dr_reduced_power │
│ - Calculates total flexibility │
└──────────────┬───────────────────────┘
│
│ schedule_event(time, iot)
▼
┌──────────────────────────────────────┐
│ DemandResponseAtuator (New Thread) │
│ - Spawned per device per event │
│ - Runs for 59 minutes │
│ - Updates core.dr_reduced_power │
│ - Auto-terminates at event end │
└──────────────────────────────────────┘
Data Models
MongoDB Collection: demand_response_invitations
{
"_id": "ObjectId",
"datetime": "2025-12-10 13:45:32", // Invitation creation time
"event_time": "2025-12-10 14:00:00", // When DR event occurs
"load_kwh": 5.2, // Target reduction in kWh
"load_percentage": 15.0, // Reduction as % of total load
"iots": ["AC1", "AC2", "Lighting"], // Participating devices
"response": "WAITING|YES|NO" // Participant decision
}
MongoDB Collection: config
{
"config": "config",
"auto_answer": true // Auto-accept DR invitations
}
MongoDB Collection: benefit
{
"source": "dr", // "dr" or "p2p"
"product": "AC1", // Device name
"value": 5.50, // Financial benefit (€)
"datetime": "2025-12-10 14:00:00"
}
MongoDB Collection: instructions
{
"AC1": {
"0": "participation", // Hour 0: full DR participation
"1": "shifting", // Hour 1: 0-20% participation
"2": "off", // Hour 2: no DR participation
"3": "participation",
// ... hours 4-23
},
"AC2": { /* ... */ }
}
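The hourly rules can be read as an upper bound on how much of a device's power is offered to DR. The following is a minimal sketch under that reading, not the project's implementation: `MAX_REDUCTION_FRACTION`, `instruction_for` and `max_reduction_kw` are hypothetical names, and the 20% figure is taken as the top of the "shifting" range.

```python
# Upper bound on the share of a device's power offered to DR, per instruction.
# Assumption: "shifting" is capped at the top of its documented 0-20% range.
MAX_REDUCTION_FRACTION = {
    "participation": 1.0,  # full DR participation
    "shifting": 0.2,       # 0-20% participation
    "off": 0.0,            # no DR participation
}


def instruction_for(instructions, device, hour):
    """Look up a device's instruction for an hour (0-23); hour keys are strings."""
    return instructions[device][str(hour)]


def max_reduction_kw(instructions, device, hour, device_power_kw):
    """Largest reduction (kW) the device could offer in that hour."""
    instruction = instruction_for(instructions, device, hour)
    return device_power_kw * MAX_REDUCTION_FRACTION[instruction]
```

A device drawing 3 kW under "shifting" would therefore offer at most 0.6 kW in that hour.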
Key Components
1. DemandResponseService
Location: services/DemandResponseService.py
Responsibilities:
- Manages DR invitation lifecycle
- Handles participant responses
- Configures auto-accept behavior
- Queries invitation status
Key Methods:
def invitation(event_time, load_kwh, load_percentage, iots):
    """Create new DR invitation"""
    # Checks auto_answer config
    # Sets response to YES if auto-enabled, else WAITING
    # Stores in MongoDB via repository

def answer_invitation(event_time, iot, response):
    """Record YES/NO response for specific device"""
    # Updates invitation response field
    # Used for manual acceptance workflow

def get_unanswered_invitations():
    """Get all pending invitations awaiting response"""
    # Returns invitations with response="WAITING"

def get_answered_invitations():
    """Get last 5 completed invitations"""
    # Returns historical invitations (YES/NO)

def get_auto_answer_config():
    """Check if auto-accept is enabled"""
    # Returns boolean from config collection

def set_auto_answer_config(auto_answer):
    """Enable/disable auto-accept"""
    # Updates MongoDB config collection
Auto-Accept Logic:
# Line 35-38 in DemandResponseService.py
if self.get_auto_answer_config():
    response = "YES"      # Auto-accept enabled
else:
    response = "WAITING"  # Require manual approval
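The auto-accept branch can be exercised without MongoDB. This is a minimal sketch, assuming an in-memory stand-in for the repository; `InMemoryRepository` and `create_invitation` are hypothetical names, and the timestamp format is the one shown in the data model.

```python
from datetime import datetime

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"  # format of the timestamps in the data model


class InMemoryRepository:
    """Hypothetical stand-in for DemandResponseRepository (no database needed)."""

    def __init__(self, auto_answer=False):
        self.auto_answer = auto_answer
        self.invitations = []

    def insert_invitation(self, doc):
        self.invitations.append(doc)


def create_invitation(repo, event_time, load_kwh, load_percentage, iots, now=None):
    """Build and store an invitation document, applying the auto-accept rule."""
    now = now or datetime.now()
    doc = {
        "datetime": now.strftime(TIME_FORMAT),
        "event_time": event_time,
        "load_kwh": load_kwh,
        "load_percentage": load_percentage,
        "iots": list(iots),
        # YES when auto-accept is on, otherwise wait for a manual answer
        "response": "YES" if repo.auto_answer else "WAITING",
    }
    repo.insert_invitation(doc)
    return doc
```

Passing `now` explicitly keeps the creation timestamp deterministic, which is convenient when checking the stored document in a test.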
2. DemandResponseRepository
Location: database/DemandResponseRepository.py
Responsibilities:
- Direct MongoDB operations
- Query optimization and filtering
- Data persistence
Key Methods:
def insert_invitation(datetime, event_time, load_kwh, load_percentage, iots, response):
    """Store new DR invitation in MongoDB"""

def answer_invitation(event_time, iot, response):
    """Update invitation response status"""
    # Updates document where event_time matches and iot in iots array

def get_unanswered_invitations():
    """Query: {response: "WAITING"}"""

def get_answered_invitations():
    """Query: {response: {$ne: "WAITING"}}, limit 5, sort by datetime desc"""

def get_accepted_upcoming_invitations():
    """Query: {response: "YES", event_time: {$gte: now}}"""

def get_invitation(event_time):
    """Find specific invitation by event time"""
3. DemandResponseAtuator
Location: core/DemandResponseAtuator.py
Responsibilities:
- Executes DR event for a single device
- Runs as separate thread during event
- Accumulates power reduction in real-time
- Auto-terminates after 59 minutes
Architecture:
import time
from datetime import datetime, timedelta
from threading import Thread

import schedule


class DemandResponseAtuator(Thread):
    def __init__(self, core, iot):
        super().__init__()    # Thread must be initialised before start()
        self.core = core      # Reference to Core instance
        self.iot = iot        # IoT device participating in DR
        self.event_on = True  # Event active flag

    def run(self):
        # Schedule event end at 59 minutes from now
        end_time = datetime.now() + timedelta(minutes=59)
        end_time_formatted = end_time.strftime('%H:%M:%S')
        schedule.every().day.at(end_time_formatted).do(self.end_event)

        # Main loop: accumulate power reduction every second
        while self.event_on:
            # Add device's current power to reduction accumulator
            self.core.dr_reduced_power += self.iot.get_power()
            schedule.run_pending()
            time.sleep(1)

    def end_event(self):
        """Called automatically at event end"""
        self.event_on = False
        return schedule.CancelJob  # Remove the job so it does not repeat daily
Key Characteristics:
- Threading Model: One thread per device per event
- Update Frequency: Every 1 second
- Duration: About 59 minutes (termination is scheduled for start time + 59 minutes and checked once per second)
- Power Tracking: Cumulative reduction added to core.dr_reduced_power
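The same worker pattern (sample once per interval, stop after a fixed duration) can be sketched with the standard library alone. This is an alternative formulation, not the project's implementation: it replaces the `schedule` job with a monotonic deadline and a `threading.Event`, and `TimedAccumulator`, `read_power`, `duration_s` and `interval_s` are hypothetical names.

```python
import threading
import time


class TimedAccumulator(threading.Thread):
    """Adds read_power() to a running total once per interval until a deadline."""

    def __init__(self, read_power, duration_s, interval_s=1.0):
        super().__init__(daemon=True)
        self.read_power = read_power
        self.duration_s = duration_s
        self.interval_s = interval_s
        self.total = 0.0
        self.samples = 0
        self._stop_event = threading.Event()

    def run(self):
        deadline = time.monotonic() + self.duration_s
        while not self._stop_event.is_set() and time.monotonic() < deadline:
            self.total += self.read_power()
            self.samples += 1
            # wait() acts as a sleep that stop() can interrupt immediately
            self._stop_event.wait(self.interval_s)

    def stop(self):
        """Request early termination, e.g. if the event is cancelled."""
        self._stop_event.set()
```

A 59-minute event would use `TimedAccumulator(iot.get_power, duration_s=59 * 60)`. The monotonic clock keeps the duration independent of wall-clock adjustments, and the event allows the worker to be stopped before the deadline.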
4. Core (Main Orchestrator)
Location: core/Core.py
DR-Related Attributes:
class Core(Thread):
    def __init__(self):
        super().__init__()           # Thread must be initialised before start()
        self.dr_reduced_power = 0.0  # Accumulator for power reduction
        self.iots_consumption = []   # List of controllable devices
        self.iots = []               # All IoT devices
Key DR Methods:
def schedule_event(self, event_time, iot_name):
    """Initiate DR event for specified device"""
    # Find device by name
    iot = [i for i in self.iots if i.name == iot_name][0]
    # Create and start DemandResponseAtuator thread
    dr = DemandResponseAtuator(self, iot)
    dr.start()

def get_total_consumption(self):
    """Returns consumption MINUS DR reductions"""
    # Sum all device power
    totalPower = sum(iot.get_power() for iot in self.iots_consumption)
    # Subtract DR reduction
    reduce = self.dr_reduced_power
    self.dr_reduced_power = 0  # Reset accumulator
    return totalPower - reduce

def get_total_flexibility(self):
    """Calculate available flexibility for DR"""
    # Sum power of devices with demandresponse=true
    return sum(iot.get_power() for iot in self.iots_consumption
               if iot.demandresponse)
How Power Reduction Works:
- During a DR event, DemandResponseAtuator continuously adds to dr_reduced_power
- When get_total_consumption() is called, the reduction is subtracted from the total
- dr_reduced_power is reset to 0 after each reading
- This creates an effective "virtual" power reduction in reported consumption
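The accumulate-then-reset cycle can be isolated in a small helper. The sketch below is a hypothetical lock-protected variant (`ReductionAccumulator` and `reported_consumption` are not names from the codebase), and the 4 kW per device is an illustrative value only.

```python
import threading


class ReductionAccumulator:
    """Hypothetical lock-protected version of the dr_reduced_power cycle."""

    def __init__(self):
        self._lock = threading.Lock()
        self._value = 0.0

    def add(self, power_kw):
        """Called by each device worker once per tick."""
        with self._lock:
            self._value += power_kw

    def take(self):
        """Return the accumulated value and reset it to zero in one step."""
        with self._lock:
            value, self._value = self._value, 0.0
            return value


def reported_consumption(total_power_kw, accumulator):
    """Total device power minus whatever has accumulated since the last read."""
    return total_power_kw - accumulator.take()


acc = ReductionAccumulator()
acc.add(4.0)  # one tick from a first device (illustrative 4 kW)
acc.add(4.0)  # one tick from a second device
first_read = reported_consumption(50.0, acc)   # accumulator holds 8.0
second_read = reported_consumption(50.0, acc)  # accumulator was reset
```

Because each worker adds a sample every second, the amount subtracted at a read is the sum of all samples since the previous read. Under this logic, reading once per second subtracts one sample per device, while reading every five seconds would subtract five.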
5. IoT Model
Location: model/IoT.py
DR-Related Attributes:
class IoT:
    def __init__(self, config):
        self.name = config['name']
        self.demandresponse = config['control'].get('demandresponse', False)
        self.instructions = {}  # Hourly DR instructions
Configuration Example (config/f.json):
{
"resources": {
"iots": [
{
"name": "AC1",
"type": "hvac",
"uri": "http://192.168.2.91:30000/api/realtime/H_01/AC1",
"control": {
"demandresponse": true // Device can participate in DR
}
}
]
}
}
DR-Capable Devices:
- AC1, AC2, AC3, AC4 (HVAC systems)
- Water Heater
- Lighting
- Refrigerator
Instruction Types:
- "participation" - Full DR participation (100% reduction if needed)
- "shifting" - Partial participation (0-20% reduction)
- "off" - No DR participation for that hour
Invitation Lifecycle
1. Create Invitation
Endpoint: POST /invitation/send
Request:
{
"event_time": "2025-12-10 14:00:00",
"kwh": 5.2,
"percentage": 15,
"iots": ["AC1", "AC2", "Lighting"]
}
Response:
{
"event_time": "2025-12-10 14:00:00"
}
Logic Flow:
- Validates event_time format
- Checks auto_answer configuration
- Sets response = "YES" if auto-enabled, else "WAITING"
- Stores invitation in MongoDB
- Returns event_time as confirmation
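The first step of this flow, validating the event_time format, can be sketched with `datetime.strptime`. The function name `parse_event_time` is hypothetical, and the format string is inferred from the example payloads rather than taken from the endpoint's code.

```python
from datetime import datetime

EVENT_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"  # inferred from the example payloads


def parse_event_time(value):
    """Return a datetime for a well-formed event_time, else raise ValueError."""
    if not isinstance(value, str):
        raise ValueError("event_time must be a string")
    # strptime raises ValueError when the text does not match the format
    return datetime.strptime(value, EVENT_TIME_FORMAT)
```

An endpoint using this could return a 400 response whenever `ValueError` is raised, before touching the database.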
2. Check Invitation Status
Endpoint: POST /invitation/get
Request:
{
"event_time": "2025-12-10 14:00:00"
}
Response:
{
"datetime": "2025-12-10 13:45:32",
"event_time": "2025-12-10 14:00:00",
"load_kwh": 5.2,
"load_percentage": 15,
"iots": ["AC1", "AC2", "Lighting"],
"response": "WAITING"
}
3. Get Pending Invitations
Endpoint: GET /invitation/unanswered
Response:
[
{
"datetime": "2025-12-10 13:45:32",
"event_time": "2025-12-10 14:00:00",
"load_kwh": 5.2,
"load_percentage": 15,
"iots": ["AC1", "AC2"],
"response": "WAITING"
},
{
"datetime": "2025-12-10 14:20:15",
"event_time": "2025-12-10 16:00:00",
"load_kwh": 3.8,
"load_percentage": 10,
"iots": ["Water Heater"],
"response": "WAITING"
}
]
Use Case: Display pending DR invitations requiring participant decision
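The pending-invitation lookup and the repository's other queries reduce to plain filter documents. A minimal sketch, assuming timestamps are stored as strings in the format shown in the data model (the function names are hypothetical):

```python
from datetime import datetime

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def unanswered_filter():
    """Invitations still awaiting a decision."""
    return {"response": "WAITING"}


def answered_filter():
    """Invitations that have been answered YES or NO."""
    return {"response": {"$ne": "WAITING"}}


def accepted_upcoming_filter(now):
    """Accepted invitations whose event has not started yet."""
    # "YYYY-MM-DD HH:MM:SS" strings sort lexicographically in chronological
    # order, so $gte can compare the stored strings directly.
    return {"response": "YES", "event_time": {"$gte": now.strftime(TIME_FORMAT)}}
```

With pymongo, each filter would be passed to `collection.find(...)`; the "last 5 completed" query would add `.sort("datetime", -1).limit(5)` to the cursor.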
4. Answer Invitation
Endpoint: POST /invitation/answer
Request:
{
"event_time": "2025-12-10 14:00:00",
"iot": "AC1",
"response": "YES"
}
Response:
{
"message": "answered"
}
Logic:
- Updates invitation document in MongoDB
- Sets response field to "YES" or "NO"
- Filters by event_time and iot in iots array
- Enables manual approval workflow
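The update described above can be emulated on a list of invitation documents. This is a sketch of the matching rule only, assuming the behaviour of a MongoDB `update_one` with filter `{"event_time": ..., "iots": iot}`; the in-memory function is hypothetical.

```python
def answer_invitation(invitations, event_time, iot, response):
    """Set response on the first invitation for event_time that lists iot.

    Returns True if a document was updated, False if nothing matched.
    """
    if response not in ("YES", "NO"):
        raise ValueError("response must be 'YES' or 'NO'")
    for doc in invitations:
        # A scalar compared against an array field matches when the array
        # contains it, which is what `iot in doc["iots"]` reproduces here.
        if doc["event_time"] == event_time and iot in doc["iots"]:
            doc["response"] = response
            return True
    return False
```

With the data model shown earlier, `response` is a single field per invitation, so under this sketch a later answer for another device in the same invitation overwrites the earlier one.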
5. Execute DR Event
Endpoint: POST /event/check
Request:
{
"event_time": "2025-12-10 14:00:00",
"iot": "AC1"
}
Logic Flow:
1. Receives event_time and iot name
2. Calls core.schedule_event(event_time, iot)
3. Core finds IoT device by name
4. Creates new DemandResponseAtuator(core, iot)
5. Starts thread → begins power reduction tracking
6. Thread runs for 59 minutes, accumulating reduction every second
7. Auto-terminates at scheduled end time
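Step 3 of this flow looks the device up by name. A list comprehension indexed with `[0]` raises a bare `IndexError` when the name is unknown; the sketch below shows one way to surface a clearer error. `find_iot` is a hypothetical helper, and the devices are stand-in objects.

```python
from types import SimpleNamespace


def find_iot(iots, name):
    """Return the first device whose .name equals name, or raise LookupError."""
    iot = next((i for i in iots if i.name == name), None)
    if iot is None:
        raise LookupError(f"no IoT device named {name!r}")
    return iot


# Stand-in device objects for illustration
devices = [SimpleNamespace(name="AC1"), SimpleNamespace(name="AC2")]
ac2 = find_iot(devices, "AC2")
```

An API handler could catch `LookupError` and respond with 404 instead of letting the request fail with an unhandled exception.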
6. Configure Auto-Accept
Get Config: GET /invitation/auto
Response:
{
"auto_answer": true
}
Set Config: POST /invitation/auto
Request:
{
"auto_answer": true
}
Response:
{
"auto_answer": true
}
Effect:
- When enabled: New invitations automatically set to response="YES"
- When disabled: New invitations set to response="WAITING" (require manual approval)
Integration Points
1. Energy Management
ForecastService (services/ForecastService.py)
- Calculates forecast_flexibility() based on historical data
- Predicts available DR capacity for future periods
- Uses flexibility data stored with hourly consumption/generation
Core.get_total_flexibility()
- Returns sum of power from DR-capable devices
- Indicates current available flexibility
- Accessible via GET /energy/flexibility
def get_total_flexibility(self):
    return sum(iot.get_power() for iot in self.iots_consumption
               if iot.demandresponse)
2. Building Management
StoringManager (model/StoringManager.py)
- Stores hourly aggregates including flexibility
- MongoDB collection: TOTALPOWERHOUR
- Fields: {datetime, consumption, generation, flexibility}
BuildingRepository (database/BuildingRepository.py)
- insert_hour() stores flexibility alongside consumption/generation
- Flexibility calculated as: power * random(0-20%)
- Provides historical baseline for forecasting
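The stored flexibility value follows the formula above. A minimal sketch, assuming the random share is drawn uniformly (the source only states the 0-20% range); `estimate_flexibility` is a hypothetical name.

```python
import random


def estimate_flexibility(power_kw, rng=random):
    """Flexibility as a random 0-20% share of the given power.

    Assumption: the share is uniformly distributed over [0, 0.2].
    """
    return power_kw * rng.uniform(0.0, 0.2)


# A seeded generator makes the value reproducible in tests
sample = estimate_flexibility(50.0, random.Random(0))
```

For 50 kW of power the result always falls between 0 and 10 kW, so the stored figure is a synthetic estimate rather than a measurement.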
3. Financial Tracking
EnergyService (services/EnergyService.py)
def add_benefit(source, product, value):
    """Record financial benefit from DR or P2P"""
    # source: "dr" or "p2p"
    # product: device name
    # value: financial reward amount
Record DR Benefit: POST /dr/benefit
Request:
{
"iot": "AC1",
"value": 5.50
}
Storage:
{
"source": "dr",
"product": "AC1",
"value": 5.50,
"datetime": "2025-12-10 14:00:00"
}
Monthly Benefits: GET /benefits/monthly
Response:
{
"dr": 150.00,
"p2p": 50.00
}
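The monthly totals can be derived from the stored benefit documents by grouping on `source`. A minimal sketch, assuming aggregation by calendar month over the `datetime` string; `monthly_benefits` is a hypothetical name, not the service's method.

```python
from collections import defaultdict
from datetime import datetime

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def monthly_benefits(benefits, year, month):
    """Sum benefit values per source for documents dated in the given month."""
    totals = defaultdict(float)
    for doc in benefits:
        when = datetime.strptime(doc["datetime"], TIME_FORMAT)
        if when.year == year and when.month == month:
            totals[doc["source"]] += doc["value"]
    return dict(totals)
```

In MongoDB the same grouping could be expressed as an aggregation pipeline; the Python version is shown because it is easy to check by hand.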
4. IoT Device Control
IotService (services/IotService.py)
def change_dr_enable(iot, enable):
    """Enable or disable DR capability for device"""
    iot.demandresponse = enable

def update_instructions(instructions):
    """Set hourly DR participation instructions"""
    # Format: {iot_name: {hour: "participation|shifting|off"}}

def get_instructions():
    """Retrieve current DR instructions"""
    return {iot.name: iot.instructions for iot in iots}
Update Instructions: POST /iot/instructions
Request:
{
"AC1": {
"0": "participation",
"1": "shifting",
"2": "off",
"3": "participation"
// ... hours 4-23
}
}
Forecasted Flexibility by Hour: POST /iots/forecast/flexibility
Request:
{
"hour": 14
}
Response:
{
"shifting": [["AC1", 50], ["AC2", 75]], // 0-20% participation
"reducing": [["Water Heater", 100]] // Full participation
}
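The shape of this response suggests that devices are grouped by their instruction for the requested hour. The sketch below illustrates that grouping only: the meaning of the numeric values is not stated in the source, so `forecast` is a hypothetical per-device figure, and treating a missing hour as "off" is an assumption.

```python
def group_by_instruction(instructions, forecast, hour):
    """Split devices into 'shifting' and 'reducing' lists for one hour.

    instructions: {device: {"0".."23": "participation" | "shifting" | "off"}}
    forecast:     {device: value}  (hypothetical per-device forecast figure)
    """
    result = {"shifting": [], "reducing": []}
    for device, hourly in instructions.items():
        instruction = hourly.get(str(hour), "off")  # assumption: missing hour means off
        if instruction == "shifting":
            result["shifting"].append([device, forecast.get(device, 0)])
        elif instruction == "participation":
            result["reducing"].append([device, forecast.get(device, 0)])
    return result
```

Devices whose instruction is "off" for the hour are left out of both lists.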
API Reference
Demand Response Endpoints
| Method | Endpoint | Description | Request Body | Response |
|---|---|---|---|---|
| POST | /invitation/send | Create DR invitation | {event_time, kwh, percentage, iots} | {event_time} |
| POST | /invitation/get | Get specific invitation | {event_time} | Invitation object |
| GET | /invitation/unanswered | Get pending invitations | None | Array of invitations |
| GET | /invitation/answered | Get last 5 completed | None | Array of invitations |
| POST | /invitation/answer | Submit response | {event_time, iot, response} | {message: "answered"} |
| GET | /invitation/auto | Get auto-accept config | None | {auto_answer: boolean} |
| POST | /invitation/auto | Set auto-accept config | {auto_answer: boolean} | {auto_answer: boolean} |
| POST | /event/check | Execute DR event | {event_time, iot} | Success status |
| POST | /dr/benefit | Record DR benefit | {iot, value} | {message: "ok"} |
Related Flexibility/Energy Endpoints
| Method | Endpoint | Description |
|---|---|---|
| GET | /energy/now | Current consumption, generation, flexibility |
| GET | /energy/flexibility | Available flexibility for DR |
| GET | /forecast/flexibility | Forecasted flexibility |
| POST | /iots/forecast/flexibility | Flexibility by hour and device |
| POST | /iot/demandresponse/enable | Enable/disable device DR |
| POST | /iot/instructions | Update DR instructions |
| GET | /iot/instructions | Get current instructions |
| GET | /benefits/monthly | Monthly DR benefits |
Complete Event Flow Example
Scenario: 10 kWh Reduction Event at 2:00 PM
┌─────────────────────────────────────────────────────────────┐
│ STEP 1: Create Invitation (1:45 PM) │
└─────────────────────────────────────────────────────────────┘
POST /invitation/send
{
"event_time": "2025-12-10 14:00:00",
"kwh": 10,
"percentage": 20,
"iots": ["AC1", "AC2", "Water Heater"]
}
Flow:
├─ DemandResponseService.invitation()
├─ Checks auto_answer config → disabled (false)
├─ Sets response = "WAITING"
├─ DemandResponseRepository.insert_invitation()
└─ MongoDB: Creates invitation document
Result: Invitation stored, awaiting participant approval
┌─────────────────────────────────────────────────────────────┐
│ STEP 2: Check Pending Invitations (1:50 PM) │
└─────────────────────────────────────────────────────────────┘
GET /invitation/unanswered
Response:
[
{
"datetime": "2025-12-10 13:45:32",
"event_time": "2025-12-10 14:00:00",
"load_kwh": 10,
"load_percentage": 20,
"iots": ["AC1", "AC2", "Water Heater"],
"response": "WAITING"
}
]
┌─────────────────────────────────────────────────────────────┐
│ STEP 3: Answer Invitation for Each Device (1:55 PM) │
└─────────────────────────────────────────────────────────────┘
POST /invitation/answer
{"event_time": "2025-12-10 14:00:00", "iot": "AC1", "response": "YES"}
POST /invitation/answer
{"event_time": "2025-12-10 14:00:00", "iot": "AC2", "response": "YES"}
POST /invitation/answer
{"event_time": "2025-12-10 14:00:00", "iot": "Water Heater", "response": "NO"}
Flow per request:
├─ DemandResponseService.answer_invitation()
├─ DemandResponseRepository.answer_invitation()
└─ MongoDB: Updates invitation.response for specified iot
Result: AC1 and AC2 accepted, Water Heater declined
┌─────────────────────────────────────────────────────────────┐
│ STEP 4: Execute DR Event (2:00 PM - Event Start) │
└─────────────────────────────────────────────────────────────┘
POST /event/check
{"event_time": "2025-12-10 14:00:00", "iot": "AC1"}
POST /event/check
{"event_time": "2025-12-10 14:00:00", "iot": "AC2"}
Flow per request:
├─ Core.schedule_event("2025-12-10 14:00:00", "AC1")
├─ Finds IoT device: iot = [i for i in core.iots if i.name == "AC1"][0]
├─ Creates DemandResponseAtuator(core, iot)
└─ Starts thread
DemandResponseAtuator.run():
├─ Schedules end_event() at 14:59:00
└─ While loop (every 1 second for 59 minutes):
└─ core.dr_reduced_power += iot.get_power()
Result: Two threads running, accumulating power reduction
┌─────────────────────────────────────────────────────────────┐
│ STEP 5: Monitor Energy (2:30 PM - During Event) │
└─────────────────────────────────────────────────────────────┘
GET /energy/now
Flow:
├─ Core.get_total_consumption()
├─ totalPower = sum(iot.get_power() for iot in iots_consumption)
├─ totalPower = 50 kW (all devices)
├─ reduce = core.dr_reduced_power = 8 kW (accumulated from AC1+AC2)
├─ core.dr_reduced_power = 0 # Reset
└─ return 50 - 8 = 42 kW
Response:
{
"consumption": 42.0, // Reduced by DR
"generation": 15.0,
"flexibility": 18.0
}
Result: Consumption appears 8 kW lower due to DR reduction
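The 8 kW in this step is a power reading, while the invitation target (10 kWh) is an energy amount. Assuming, purely for illustration, that the 8 kW reduction held constant for the full 59 minutes (the walkthrough does not state this), the two can be compared with a short calculation; the helper names are hypothetical.

```python
def energy_kwh(power_kw, minutes):
    """Energy delivered (or avoided) by a constant power over a duration."""
    return power_kw * minutes / 60.0


def required_power_kw(target_kwh, minutes):
    """Constant power needed to reach an energy target within the duration."""
    return target_kwh * 60.0 / minutes


achieved = energy_kwh(8.0, 59)        # roughly 7.87 kWh
needed = required_power_kw(10.0, 59)  # roughly 10.17 kW
```

Under that assumption the event would avoid about 7.87 kWh, and meeting the 10 kWh target within 59 minutes would need an average reduction of about 10.17 kW.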
┌─────────────────────────────────────────────────────────────┐
│ STEP 6: Automatic Event End (2:59 PM) │
└─────────────────────────────────────────────────────────────┘
Scheduled Task Triggered:
├─ DemandResponseAtuator.end_event() called
├─ self.event_on = False
├─ Thread exits while loop
└─ Thread terminates
Result: Both AC1 and AC2 threads stopped, DR event complete
┌─────────────────────────────────────────────────────────────┐
│ STEP 7: Record Financial Benefit (3:00 PM) │
└─────────────────────────────────────────────────────────────┘
POST /dr/benefit
{"iot": "AC1", "value": 5.50}
POST /dr/benefit
{"iot": "AC2", "value": 4.75}
Flow per request:
├─ EnergyService.add_benefit("dr", iot, value)
├─ FinancialRepository.insert_benefit()
└─ MongoDB.benefit: {source: "dr", product: iot, value: value, datetime: now}
Result: Total DR benefit = €10.25
┌─────────────────────────────────────────────────────────────┐
│ STEP 8: Hourly Storage (3:00 PM - End of Hour) │
└─────────────────────────────────────────────────────────────┘
StoringManager.save_hour() (automatic):
├─ BuildingService.save_last_hour()
├─ Calculates flexibility = power * random(0-20%)
├─ BuildingRepository.insert_hour()
└─ MongoDB.TOTALPOWERHOUR: {
datetime: "2025-12-10 14:00:00",
consumption: 42.0, // Average during hour (with DR reduction)
generation: 15.0,
flexibility: 7.8
}
Result: Hour data stored with DR-reduced consumption
┌─────────────────────────────────────────────────────────────┐
│ STEP 9: View Monthly Benefits (End of Month) │
└─────────────────────────────────────────────────────────────┘
GET /benefits/monthly
Response:
{
"dr": 185.50, // Total DR benefits for month
"p2p": 62.30 // Total P2P benefits for month
}
Result: Financial tracking shows €185.50 earned from DR participation
Key Metrics & Statistics
| Metric | Value | Source |
|---|---|---|
| Update Frequency | 1 second | DemandResponseAtuator.run() |
| Event Duration | 59 minutes | Scheduled termination |
| Storage Frequency | Every hour | StoringManager |
| DR-Capable Devices | 8 devices | config/f.json |
| Threading Model | 1 thread per device per event | Core.schedule_event() |
| Database | MongoDB (H01, BuildingRightSide) | Multiple collections |
| API Framework | Flask with CORS | api/main.py |
| Flexibility Calculation | 0-20% of device power | Based on instructions |
Configuration Reference
Device Configuration (config/f.json)
{
"app": {
"dr_events_auto_accept": 1, // 1=enabled, 0=disabled
"monitoring": 0 // Debug logging
},
"storage": {
"local": {
"demand_response": ["H01", "demand_response_invitations"],
"config": ["H01", "config"],
"benefit": ["BuildingRightSide", "benefit"],
"instructions": ["H01", "instructions"]
}
},
"resources": {
"iots": [
{
"name": "AC1",
"type": "hvac",
"uri": "http://192.168.2.91:30000/api/realtime/H_01/AC1",
"control": {
"demandresponse": true
}
}
// ... more devices
]
}
}
MongoDB Database Structure
Database: H01
├─ demand_response_invitations (DR events)
├─ config (auto_answer setting)
├─ instructions (hourly participation rules)
└─ TOTALPOWERHOUR (hourly aggregates)
Database: BuildingRightSide
└─ benefit (financial tracking)
Summary
The Demand Response system is a comprehensive, multi-threaded solution that enables building participation in grid flexibility programs. It features:
- Automatic or Manual Approval: Configurable auto-accept or manual review workflow
- Real-Time Power Tracking: Per-device threads accumulate power reduction every second
- Financial Benefit Tracking: Source-based tracking (DR vs P2P) with monthly aggregation
- Flexibility Forecasting: Historical data and hourly instructions for predictive planning
- Device-Level Control: Per-device, per-hour participation configuration
- MongoDB Persistence: Scalable data storage with optimized queries
- REST API: Complete API for external integration and control
- Thread Isolation: Each participating device runs in its own thread; note that the code shown updates the shared core.dr_reduced_power accumulator without a lock
Critical Files:
- services/DemandResponseService.py:35-38 - Auto-accept logic
- core/DemandResponseAtuator.py:run() - Power reduction accumulation
- core/Core.py:get_total_consumption() - DR-reduced consumption calculation
- api/main.py:230-329 - All DR endpoints
Together, these components cover the DR workflow end to end: invitation, approval, event execution, consumption reporting, and benefit recording.