sac4cps-backend/demand-response-architecture.md
rafaeldpsilva 7547e6b229 demand response
2025-12-10 15:26:34 +00:00

Demand Response System - Architecture & Logic Documentation

Table of Contents

  1. System Overview
  2. Component Locations
  3. Architecture & Data Flow
  4. Key Components
  5. Invitation Lifecycle
  6. Integration Points
  7. API Reference
  8. Complete Event Flow Example
  9. Key Metrics & Statistics
  10. Configuration Reference
  11. Summary

System Overview

The IoT Building Monitoring system includes a comprehensive Demand Response (DR) management system that enables buildings to participate in grid flexibility programs by reducing power consumption during peak demand periods.

Key Capabilities:

  • Create and manage DR invitations with target load reductions
  • Auto-accept or manual approval of DR events
  • Track power reduction in real-time during events
  • Calculate financial benefits from DR participation
  • Forecast available flexibility by device and time
  • Configure device-specific DR participation instructions

Component Locations

Core Components

| Component | Path | Purpose |
|---|---|---|
| Service Layer | services/DemandResponseService.py | Business logic for DR operations |
| Database Layer | database/DemandResponseRepository.py | MongoDB data access for DR |
| Execution Engine | core/DemandResponseAtuator.py | Runs DR events, tracks power reduction |
| Main Orchestrator | core/Core.py | Coordinates DR events and accumulates reduction |
| IoT Model | model/IoT.py | Device configuration with DR capabilities |
| API Endpoints | api/main.py:230-329 | REST endpoints for DR operations |
| Configuration | config/f.json | System and device configuration |

Architecture & Data Flow

High-Level Architecture

┌─────────────────────────────────────┐
│   REST API Endpoints                │
│   (Flask: api/main.py)              │
│   - /invitation/*                   │
│   - /event/check                    │
│   - /dr/benefit                     │
└──────────────┬──────────────────────┘
               │
               ▼
┌─────────────────────────────────────┐
│   DemandResponseService             │
│   (Business Logic Layer)            │
│   - Invitation management           │
│   - Auto-answer configuration       │
│   - Response tracking               │
└──────────────┬──────────────────────┘
               │
               ▼
┌─────────────────────────────────────┐
│   DemandResponseRepository          │
│   (Data Access Layer)               │
│   - MongoDB operations              │
│   - Query optimization              │
└──────────────┬──────────────────────┘
               │
               ▼
┌─────────────────────────────────────┐
│   MongoDB Collections               │
│   - demand_response_invitations     │
│   - config (auto_answer)            │
│   - benefit (financial tracking)    │
│   - instructions (hourly rules)     │
└─────────────────────────────────────┘

Execution Architecture

┌──────────────────────────────────────┐
│   Core.py (Main Thread)              │
│   - Manages IoT device fleet         │
│   - Tracks dr_reduced_power          │
│   - Calculates total flexibility     │
└──────────────┬───────────────────────┘
               │
               │ schedule_event(time, iot)
               ▼
┌──────────────────────────────────────┐
│  DemandResponseAtuator (New Thread)  │
│  - Spawned per device per event      │
│  - Runs for 59 minutes per event     │
│  - Updates core.dr_reduced_power     │
│  - Auto-terminates at event end      │
└──────────────────────────────────────┘

Data Models

MongoDB Collection: demand_response_invitations

{
  "_id": "ObjectId",
  "datetime": "2025-12-10 13:45:32",      // Invitation creation time
  "event_time": "2025-12-10 14:00:00",    // When DR event occurs
  "load_kwh": 5.2,                        // Target reduction in kWh
  "load_percentage": 15.0,                // Reduction as % of total load
  "iots": ["AC1", "AC2", "Lighting"],     // Participating devices
  "response": "WAITING|YES|NO"            // Participant decision
}

MongoDB Collection: config

{
  "config": "config",
  "auto_answer": true  // Auto-accept DR invitations
}

MongoDB Collection: benefit

{
  "source": "dr",               // "dr" or "p2p"
  "product": "AC1",             // Device name
  "value": 5.50,                // Financial benefit (€)
  "datetime": "2025-12-10 14:00:00"
}

MongoDB Collection: instructions

{
  "AC1": {
    "0": "participation",   // Hour 0: full DR participation
    "1": "shifting",        // Hour 1: 0-20% participation
    "2": "off",            // Hour 2: no DR participation
    "3": "participation",
    // ... hours 4-23
  },
  "AC2": { /* ... */ }
}

Key Components

1. DemandResponseService

Location: services/DemandResponseService.py

Responsibilities:

  • Manages DR invitation lifecycle
  • Handles participant responses
  • Configures auto-accept behavior
  • Queries invitation status

Key Methods:

def invitation(event_time, load_kwh, load_percentage, iots):
    """Create new DR invitation"""
    # Checks auto_answer config
    # Sets response to YES if auto-enabled, else WAITING
    # Stores in MongoDB via repository

def answer_invitation(event_time, iot, response):
    """Record YES/NO response for specific device"""
    # Updates invitation response field
    # Used for manual acceptance workflow

def get_unanswered_invitations():
    """Get all pending invitations awaiting response"""
    # Returns invitations with response="WAITING"

def get_answered_invitations():
    """Get last 5 completed invitations"""
    # Returns historical invitations (YES/NO)

def get_auto_answer_config():
    """Check if auto-accept is enabled"""
    # Returns boolean from config collection

def set_auto_answer_config(auto_answer):
    """Enable/disable auto-accept"""
    # Updates MongoDB config collection

Auto-Accept Logic:

# Line 35-38 in DemandResponseService.py
if self.get_auto_answer_config():
    response = "YES"  # Auto-accept enabled
else:
    response = "WAITING"  # Require manual approval
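
The auto-accept branch can be exercised without MongoDB. The following is a minimal sketch, not the service's actual code: `InMemoryInvitationStore` and `create_invitation` are hypothetical names standing in for the repository and for `DemandResponseService.invitation()`, and the document fields follow the `demand_response_invitations` model shown earlier.

```python
from datetime import datetime


class InMemoryInvitationStore:
    """Hypothetical stand-in for DemandResponseRepository (no MongoDB needed)."""

    def __init__(self, auto_answer=False):
        self.auto_answer = auto_answer
        self.invitations = []

    def insert_invitation(self, created, event_time, load_kwh,
                          load_percentage, iots, response):
        doc = {
            "datetime": created,
            "event_time": event_time,
            "load_kwh": load_kwh,
            "load_percentage": load_percentage,
            "iots": list(iots),
            "response": response,
        }
        self.invitations.append(doc)
        return doc


def create_invitation(store, event_time, load_kwh, load_percentage, iots, now=None):
    """Create an invitation, pre-answering it when auto-accept is enabled."""
    now = now or datetime.now()
    response = "YES" if store.auto_answer else "WAITING"
    return store.insert_invitation(
        now.strftime("%Y-%m-%d %H:%M:%S"),
        event_time, load_kwh, load_percentage, iots, response,
    )


manual = create_invitation(InMemoryInvitationStore(auto_answer=False),
                           "2025-12-10 14:00:00", 5.2, 15.0, ["AC1", "AC2"])
auto = create_invitation(InMemoryInvitationStore(auto_answer=True),
                         "2025-12-10 14:00:00", 5.2, 15.0, ["AC1", "AC2"])
print(manual["response"], auto["response"])  # WAITING YES
```

Keeping the decision in one place means the manual workflow only has to look for documents whose response is still "WAITING".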

2. DemandResponseRepository

Location: database/DemandResponseRepository.py

Responsibilities:

  • Direct MongoDB operations
  • Query optimization and filtering
  • Data persistence

Key Methods:

def insert_invitation(datetime, event_time, load_kwh, load_percentage, iots, response):
    """Store new DR invitation in MongoDB"""

def answer_invitation(event_time, iot, response):
    """Update invitation response status"""
    # Updates document where event_time matches and iot in iots array

def get_unanswered_invitations():
    """Query: {response: "WAITING"}"""

def get_answered_invitations():
    """Query: {response: {$ne: "WAITING"}}, limit 5, sort by datetime desc"""

def get_accepted_upcoming_invitations():
    """Query: {response: "YES", event_time: {$gte: now}}"""

def get_invitation(event_time):
    """Find specific invitation by event time"""

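The query shapes in the docstrings above can be written as plain filter dictionaries. This sketch assumes timestamps are stored as "YYYY-MM-DD HH:MM:SS" strings, as in the data model, so that string comparison matches chronological order. The helper names and the small `matches` evaluator are hypothetical and exist only to make the example runnable without a database.

```python
from datetime import datetime

TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def unanswered_filter():
    return {"response": "WAITING"}


def answered_filter():
    return {"response": {"$ne": "WAITING"}}


def accepted_upcoming_filter(now):
    # Assumption: event_time is a zero-padded string, so $gte on strings
    # orders the same way as the underlying timestamps.
    return {"response": "YES", "event_time": {"$gte": now.strftime(TIME_FORMAT)}}


def matches(doc, flt):
    """Tiny in-memory evaluator for equality, $ne and $gte only."""
    for field, cond in flt.items():
        value = doc.get(field)
        if isinstance(cond, dict):
            if "$ne" in cond and value == cond["$ne"]:
                return False
            if "$gte" in cond and not value >= cond["$gte"]:
                return False
        elif value != cond:
            return False
    return True


docs = [
    {"event_time": "2025-12-10 14:00:00", "response": "WAITING"},
    {"event_time": "2025-12-10 16:00:00", "response": "YES"},
    {"event_time": "2025-12-09 09:00:00", "response": "YES"},
    {"event_time": "2025-12-10 18:00:00", "response": "NO"},
]
now = datetime(2025, 12, 10, 15, 0, 0)
upcoming = [d for d in docs if matches(d, accepted_upcoming_filter(now))]
print([d["event_time"] for d in upcoming])  # ['2025-12-10 16:00:00']
```
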
3. DemandResponseAtuator

Location: core/DemandResponseAtuator.py

Responsibilities:

  • Executes DR event for a single device
  • Runs as separate thread during event
  • Accumulates power reduction in real-time
  • Auto-terminates after 59 minutes

Architecture:

import time
from datetime import datetime, timedelta
from threading import Thread

import schedule  # third-party "schedule" package


class DemandResponseAtuator(Thread):
    def __init__(self, core, iot):
        super().__init__()        # Initialize Thread state before start()
        self.core = core          # Reference to Core instance
        self.iot = iot            # IoT device participating in DR
        self.event_on = True      # Event active flag

    def run(self):
        # Schedule event end at 59 minutes from now
        end_time = datetime.now() + timedelta(minutes=59)
        end_time_formatted = end_time.strftime('%H:%M:%S')
        schedule.every().day.at(end_time_formatted).do(self.end_event)

        # Main loop: accumulate power reduction every second
        while self.event_on:
            # Add device's current power to reduction accumulator
            self.core.dr_reduced_power += self.iot.get_power()
            schedule.run_pending()
            time.sleep(1)

    def end_event(self):
        """Called by the scheduler at event end; runs once, then cancels itself"""
        self.event_on = False
        return schedule.CancelJob

Key Characteristics:

  • Threading Model: One thread per device per event
  • Update Frequency: Every 1 second
  • Duration: 59 minutes (ended by the scheduled end_event() call)
  • Power Tracking: Cumulative reduction added to core.dr_reduced_power
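
Because several actuator threads add to one shared accumulator, a lock around the update is a common safeguard. The sketch below is an alternative design, not the project's implementation. `ReductionAccumulator` and `TimedActuator` are hypothetical names, and the event end is driven by a monotonic deadline instead of the `schedule` package.

```python
import threading
import time


class ReductionAccumulator:
    """Hypothetical lock-protected replacement for core.dr_reduced_power."""

    def __init__(self):
        self._lock = threading.Lock()
        self._value = 0.0

    def add(self, amount):
        with self._lock:
            self._value += amount

    def read_and_reset(self):
        # Read and clear in one critical section so no update is lost.
        with self._lock:
            value, self._value = self._value, 0.0
            return value


class TimedActuator(threading.Thread):
    """Adds get_power() to the accumulator every `interval` seconds
    until `duration` seconds have elapsed or stop() is called."""

    def __init__(self, accumulator, get_power, duration, interval=1.0):
        super().__init__(daemon=True)
        self.accumulator = accumulator
        self.get_power = get_power
        self.duration = duration
        self.interval = interval
        self.stop_requested = threading.Event()

    def run(self):
        deadline = time.monotonic() + self.duration
        while not self.stop_requested.is_set():
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            self.accumulator.add(self.get_power())
            # Event.wait doubles as an interruptible sleep.
            self.stop_requested.wait(min(self.interval, remaining))

    def stop(self):
        self.stop_requested.set()


# Short durations keep the demo fast; a real event would use duration=59 * 60.
acc = ReductionAccumulator()
workers = [TimedActuator(acc, lambda: 1.0, duration=0.2, interval=0.01)
           for _ in range(4)]
for w in workers:
    w.start()
for w in workers:
    w.join()
total = acc.read_and_reset()
```

Each thread owns its own deadline here, so no global scheduler state is shared between devices.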

4. Core (Main Orchestrator)

Location: core/Core.py

DR-Related Attributes:

class Core(Thread):
    def __init__(self):
        super().__init__()           # Initialize Thread state before start()
        self.dr_reduced_power = 0.0  # Accumulator for power reduction
        self.iots_consumption = []   # List of controllable devices
        self.iots = []               # All IoT devices

Key DR Methods:

def schedule_event(self, event_time, iot_name):
    """Initiate DR event for specified device"""
    # Find device by name
    iot = [i for i in self.iots if i.name == iot_name][0]

    # Create and start DemandResponseAtuator thread
    dr = DemandResponseAtuator(self, iot)
    dr.start()

def get_total_consumption(self):
    """Returns consumption MINUS DR reductions"""
    # Sum all device power
    totalPower = sum(iot.get_power() for iot in self.iots_consumption)

    # Subtract DR reduction
    reduce = self.dr_reduced_power
    self.dr_reduced_power = 0  # Reset accumulator

    return totalPower - reduce

def get_total_flexibility(self):
    """Calculate available flexibility for DR"""
    # Sum power of devices with demandresponse=true
    return sum(iot.get_power() for iot in self.iots_consumption
               if iot.demandresponse)

How Power Reduction Works:

  1. During DR event, DemandResponseAtuator continuously adds to dr_reduced_power
  2. When get_total_consumption() is called, reduction is subtracted from total
  3. dr_reduced_power is reset to 0 after each reading
  4. This creates effective "virtual" power reduction in reported consumption
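
The read-and-reset behaviour in the steps above can be traced with a few numbers. This is an illustrative sketch only: `CoreSketch` is a hypothetical, single-threaded stand-in for `Core`, and the device powers are made-up values chosen to total 50 kW.

```python
class CoreSketch:
    """Hypothetical stand-in for Core with fixed device powers (kW)."""

    def __init__(self, device_powers):
        self.device_powers = device_powers
        self.dr_reduced_power = 0.0

    def tick(self, participating):
        """One actuator iteration per participating device
        (the source runs this once per second per device)."""
        for name in participating:
            self.dr_reduced_power += self.device_powers[name]

    def get_total_consumption(self):
        total = sum(self.device_powers.values())
        reduce = self.dr_reduced_power
        self.dr_reduced_power = 0.0  # reset after each reading
        return total - reduce


core = CoreSketch({"AC1": 4.0, "AC2": 4.0, "Other": 42.0})

core.tick(["AC1", "AC2"])
first = core.get_total_consumption()
print(first)   # 42.0  (50 - 8)

second = core.get_total_consumption()
print(second)  # 50.0  (accumulator was reset by the previous read)

core.tick(["AC1", "AC2"])
core.tick(["AC1", "AC2"])
third = core.get_total_consumption()
print(third)   # 34.0  (two ticks between reads: 50 - 16)
```

The last case shows that the subtracted amount depends on how many actuator iterations occur between two reads, so the reported value is tied to the polling interval.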

5. IoT Model

Location: model/IoT.py

DR-Related Attributes:

class IoT:
    def __init__(self, config):
        self.name = config['name']
        self.demandresponse = config['control'].get('demandresponse', False)
        self.instructions = {}  # Hourly DR instructions

Configuration Example (config/f.json):

{
  "resources": {
    "iots": [
      {
        "name": "AC1",
        "type": "hvac",
        "uri": "http://192.168.2.91:30000/api/realtime/H_01/AC1",
        "control": {
          "demandresponse": true  // Device can participate in DR
        }
      }
    ]
  }
}

DR-Capable Devices:

  • AC1, AC2, AC3, AC4 (HVAC systems)
  • Water Heater
  • Lighting
  • Refrigerator

Instruction Types:

  • "participation" - Full DR participation (100% reduction if needed)
  • "shifting" - Partial participation (0-20% reduction)
  • "off" - No DR participation for that hour
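
The three instruction types can be read as upper bounds on how much of a device's power may be shed in a given hour. The following sketch encodes that reading. The function name, the fraction table, and the choice to treat a missing hour as "off" are assumptions, not behaviour confirmed by the source.

```python
# Upper bound on the share of a device's power that may be reduced.
MAX_REDUCTION_FRACTION = {
    "participation": 1.0,  # up to 100% reduction
    "shifting": 0.2,       # up to 20% reduction
    "off": 0.0,            # no DR participation
}


def max_reduction_kw(instructions, iot_name, hour, current_power_kw):
    """Return the largest reduction allowed for a device at the given hour."""
    # Assumption: a device or hour without an instruction does not participate.
    instruction = instructions.get(iot_name, {}).get(str(hour), "off")
    return current_power_kw * MAX_REDUCTION_FRACTION[instruction]


instructions = {"AC1": {"0": "participation", "1": "shifting", "2": "off"}}
print(max_reduction_kw(instructions, "AC1", 0, 2.0))  # 2.0
print(max_reduction_kw(instructions, "AC1", 2, 2.0))  # 0.0
```

Hours are looked up as strings because the instructions document uses string keys ("0" to "23").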

Invitation Lifecycle

1. Create Invitation

Endpoint: POST /invitation/send

Request:

{
  "event_time": "2025-12-10 14:00:00",
  "kwh": 5.2,
  "percentage": 15,
  "iots": ["AC1", "AC2", "Lighting"]
}

Response:

{
  "event_time": "2025-12-10 14:00:00"
}

Logic Flow:

  1. Validates event_time format
  2. Checks auto_answer configuration
  3. Sets response = "YES" if auto-enabled, else "WAITING"
  4. Stores invitation in MongoDB
  5. Returns event_time as confirmation
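
Step 1 of the flow, validating the event_time format, might look like the sketch below. The format string is inferred from the example payloads ("2025-12-10 14:00:00"), and `parse_event_time` is a hypothetical helper, not a function from the codebase.

```python
from datetime import datetime

# Assumption: inferred from the example payloads in this document.
EVENT_TIME_FORMAT = "%Y-%m-%d %H:%M:%S"


def parse_event_time(raw):
    """Return a datetime for a well-formed event_time, else raise ValueError."""
    try:
        return datetime.strptime(raw, EVENT_TIME_FORMAT)
    except (TypeError, ValueError) as exc:
        raise ValueError(
            f"event_time must look like 'YYYY-MM-DD HH:MM:SS', got {raw!r}"
        ) from exc


parsed = parse_event_time("2025-12-10 14:00:00")
print(parsed.hour)  # 14
```

Rejecting malformed values at the API boundary keeps later lookups by event_time, which compare strings, from silently missing the document.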

2. Check Invitation Status

Endpoint: POST /invitation/get

Request:

{
  "event_time": "2025-12-10 14:00:00"
}

Response:

{
  "datetime": "2025-12-10 13:45:32",
  "event_time": "2025-12-10 14:00:00",
  "load_kwh": 5.2,
  "load_percentage": 15,
  "iots": ["AC1", "AC2", "Lighting"],
  "response": "WAITING"
}

3. Get Pending Invitations

Endpoint: GET /invitation/unanswered

Response:

[
  {
    "datetime": "2025-12-10 13:45:32",
    "event_time": "2025-12-10 14:00:00",
    "load_kwh": 5.2,
    "load_percentage": 15,
    "iots": ["AC1", "AC2"],
    "response": "WAITING"
  },
  {
    "datetime": "2025-12-10 14:20:15",
    "event_time": "2025-12-10 16:00:00",
    "load_kwh": 3.8,
    "load_percentage": 10,
    "iots": ["Water Heater"],
    "response": "WAITING"
  }
]

Use Case: Display pending DR invitations requiring participant decision


4. Answer Invitation

Endpoint: POST /invitation/answer

Request:

{
  "event_time": "2025-12-10 14:00:00",
  "iot": "AC1",
  "response": "YES"
}

Response:

{
  "message": "answered"
}

Logic:

  • Updates invitation document in MongoDB
  • Sets response field to "YES" or "NO"
  • Filters by event_time and iot in iots array
  • Enables manual approval workflow
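
The filter and update described above can be expressed as two dictionaries. This is a sketch under assumptions: `build_answer_update` is a hypothetical helper, and with pymongo the pair would typically be passed to `collection.update_one(query, update)`. In MongoDB, matching an array field against a scalar selects documents whose array contains that value.

```python
VALID_RESPONSES = {"YES", "NO"}


def build_answer_update(event_time, iot, response):
    """Build the (query, update) pair for recording an answer."""
    if response not in VALID_RESPONSES:
        raise ValueError(f"response must be one of {sorted(VALID_RESPONSES)}")
    # "iots": iot matches documents whose iots array contains the device name.
    query = {"event_time": event_time, "iots": iot}
    update = {"$set": {"response": response}}
    return query, update


query, update = build_answer_update("2025-12-10 14:00:00", "AC1", "YES")
print(query)   # {'event_time': '2025-12-10 14:00:00', 'iots': 'AC1'}
print(update)  # {'$set': {'response': 'YES'}}
```

With the document shape shown under Data Models, response is a single field per invitation, so an update built this way applies to the invitation as a whole.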

5. Execute DR Event

Endpoint: POST /event/check

Request:

{
  "event_time": "2025-12-10 14:00:00",
  "iot": "AC1"
}

Logic Flow:

  1. Receives event_time and iot name
  2. Calls core.schedule_event(event_time, iot)
  3. Core finds IoT device by name
  4. Creates new DemandResponseAtuator(core, iot)
  5. Starts thread → begins power reduction tracking
  6. Thread runs for 59 minutes, accumulating reduction every second
  7. Auto-terminates at scheduled end time
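
Step 3 of this flow, finding the device by name, fails with an IndexError in the list-indexing form when the name is unknown. A possible defensive variant is sketched below. `find_iot` is a hypothetical helper, and the devices are simple stand-in objects.

```python
from types import SimpleNamespace


def find_iot(iots, iot_name):
    """Return the first device with the given name, or None if absent."""
    return next((iot for iot in iots if iot.name == iot_name), None)


# Stand-in devices; real ones would be model/IoT.py instances.
iots = [SimpleNamespace(name="AC1"), SimpleNamespace(name="AC2")]

print(find_iot(iots, "AC2").name)  # AC2
print(find_iot(iots, "Heater"))    # None
```

Returning None lets the endpoint answer with a clear error for unknown devices instead of raising inside the request handler.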

6. Configure Auto-Accept

Get Config: GET /invitation/auto

Response:

{
  "auto_answer": true
}

Set Config: POST /invitation/auto

Request:

{
  "auto_answer": true
}

Response:

{
  "auto_answer": true
}

Effect:

  • When enabled: New invitations automatically set to response="YES"
  • When disabled: New invitations set to response="WAITING" (require manual approval)

Integration Points

1. Energy Management

ForecastService (services/ForecastService.py)

  • Calculates forecast_flexibility() based on historical data
  • Predicts available DR capacity for future periods
  • Uses flexibility data stored with hourly consumption/generation

Core.get_total_flexibility()

  • Returns sum of power from DR-capable devices
  • Indicates current available flexibility
  • Accessible via GET /energy/flexibility

def get_total_flexibility(self):
    return sum(iot.get_power() for iot in self.iots_consumption
               if iot.demandresponse)

2. Building Management

StoringManager (model/StoringManager.py)

  • Stores hourly aggregates including flexibility
  • MongoDB collection: TOTALPOWERHOUR
  • Fields: {datetime, consumption, generation, flexibility}

BuildingRepository (database/BuildingRepository.py)

  • insert_hour() stores flexibility alongside consumption/generation
  • Flexibility calculated as: power * random(0-20%)
  • Provides historical baseline for forecasting
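
The stored flexibility value, power * random(0-20%), can be sketched as follows. The use of a uniform distribution and the name `estimate_flexibility` are assumptions; the source only states the 0-20% range.

```python
import random


def estimate_flexibility(power_kw, rng=random):
    """Return a random share (0 to 20%) of the given power."""
    # Assumption: the share is drawn uniformly from the stated range.
    return power_kw * rng.uniform(0.0, 0.2)


rng = random.Random(0)  # seeded generator for a repeatable demo
value = estimate_flexibility(42.0, rng)
print(0.0 <= value <= 8.4)  # True
```

Because the value is random, the stored flexibility is a placeholder estimate and not a measurement of what the devices could actually shed.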

3. Financial Tracking

EnergyService (services/EnergyService.py)

def add_benefit(source, product, value):
    """Record financial benefit from DR or P2P"""
    # source: "dr" or "p2p"
    # product: device name
    # value: financial reward amount

Record DR Benefit: POST /dr/benefit

Request:

{
  "iot": "AC1",
  "value": 5.50
}

Storage:

{
  "source": "dr",
  "product": "AC1",
  "value": 5.50,
  "datetime": "2025-12-10 14:00:00"
}

Monthly Benefits: GET /benefits/monthly

Response:

{
  "dr": 150.00,
  "p2p": 50.00
}
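
A monthly total per source can be derived from benefit documents of the shape shown under Storage. The sketch below is one possible aggregation, done in Python for clarity. `monthly_benefits` is a hypothetical name, the sample records are made up, and the real service may aggregate inside MongoDB instead.

```python
from collections import defaultdict


def monthly_benefits(benefits, year, month):
    """Sum benefit values per source for one calendar month."""
    # Assumption: datetime is a "YYYY-MM-DD HH:MM:SS" string.
    prefix = f"{year:04d}-{month:02d}-"
    totals = defaultdict(float)
    for benefit in benefits:
        if benefit["datetime"].startswith(prefix):
            totals[benefit["source"]] += benefit["value"]
    return dict(totals)


# Made-up sample records for illustration.
benefits = [
    {"source": "dr", "product": "AC1", "value": 5.50, "datetime": "2025-12-10 15:00:00"},
    {"source": "dr", "product": "AC2", "value": 4.75, "datetime": "2025-12-10 15:00:00"},
    {"source": "p2p", "product": "AC3", "value": 2.00, "datetime": "2025-12-11 09:00:00"},
    {"source": "dr", "product": "AC1", "value": 1.00, "datetime": "2025-11-30 18:00:00"},
]
december = monthly_benefits(benefits, 2025, 12)
print(december)  # {'dr': 10.25, 'p2p': 2.0}
```
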

4. IoT Device Control

IotService (services/IotService.py)

def change_dr_enable(iot, enable):
    """Enable or disable DR capability for device"""
    iot.demandresponse = enable

def update_instructions(instructions):
    """Set hourly DR participation instructions"""
    # Format: {iot_name: {hour: "participation|shifting|off"}}

def get_instructions():
    """Retrieve current DR instructions"""
    return {iot.name: iot.instructions for iot in iots}

Update Instructions: POST /iot/instructions

Request:

{
  "AC1": {
    "0": "participation",
    "1": "shifting",
    "2": "off",
    "3": "participation"
    // ... hours 4-23
  }
}

Forecasted Flexibility by Hour: POST /iots/forecast/flexibility

Request:

{
  "hour": 14
}

Response:

{
  "shifting": [["AC1", 50], ["AC2", 75]],  // 0-20% participation
  "reducing": [["Water Heater", 100]]       // Full participation
}

API Reference

Demand Response Endpoints

| Method | Endpoint | Description | Request Body | Response |
|---|---|---|---|---|
| POST | /invitation/send | Create DR invitation | {event_time, kwh, percentage, iots} | {event_time} |
| POST | /invitation/get | Get specific invitation | {event_time} | Invitation object |
| GET | /invitation/unanswered | Get pending invitations | None | Array of invitations |
| GET | /invitation/answered | Get last 5 completed | None | Array of invitations |
| POST | /invitation/answer | Submit response | {event_time, iot, response} | {message: "answered"} |
| GET | /invitation/auto | Get auto-accept config | None | {auto_answer: boolean} |
| POST | /invitation/auto | Set auto-accept config | {auto_answer: boolean} | {auto_answer: boolean} |
| POST | /event/check | Execute DR event | {event_time, iot} | Success status |
| POST | /dr/benefit | Record DR benefit | {iot, value} | {message: "ok"} |

| Method | Endpoint | Description |
|---|---|---|
| GET | /energy/now | Current consumption, generation, flexibility |
| GET | /energy/flexibility | Available flexibility for DR |
| GET | /forecast/flexibility | Forecasted flexibility |
| POST | /iots/forecast/flexibility | Flexibility by hour and device |
| POST | /iot/demandresponse/enable | Enable/disable device DR |
| POST | /iot/instructions | Update DR instructions |
| GET | /iot/instructions | Get current instructions |
| GET | /benefits/monthly | Monthly DR benefits |

Complete Event Flow Example

Scenario: 10 kWh Reduction Event at 2:00 PM

┌─────────────────────────────────────────────────────────────┐
│ STEP 1: Create Invitation (1:45 PM)                         │
└─────────────────────────────────────────────────────────────┘

POST /invitation/send
{
  "event_time": "2025-12-10 14:00:00",
  "kwh": 10,
  "percentage": 20,
  "iots": ["AC1", "AC2", "Water Heater"]
}

Flow:
├─ DemandResponseService.invitation()
├─ Checks auto_answer config → disabled (false)
├─ Sets response = "WAITING"
├─ DemandResponseRepository.insert_invitation()
└─ MongoDB: Creates invitation document

Result: Invitation stored, awaiting participant approval


┌─────────────────────────────────────────────────────────────┐
│ STEP 2: Check Pending Invitations (1:50 PM)                │
└─────────────────────────────────────────────────────────────┘

GET /invitation/unanswered

Response:
[
  {
    "datetime": "2025-12-10 13:45:32",
    "event_time": "2025-12-10 14:00:00",
    "load_kwh": 10,
    "load_percentage": 20,
    "iots": ["AC1", "AC2", "Water Heater"],
    "response": "WAITING"
  }
]


┌─────────────────────────────────────────────────────────────┐
│ STEP 3: Answer Invitation for Each Device (1:55 PM)        │
└─────────────────────────────────────────────────────────────┘

POST /invitation/answer
{"event_time": "2025-12-10 14:00:00", "iot": "AC1", "response": "YES"}

POST /invitation/answer
{"event_time": "2025-12-10 14:00:00", "iot": "AC2", "response": "YES"}

POST /invitation/answer
{"event_time": "2025-12-10 14:00:00", "iot": "Water Heater", "response": "NO"}

Flow per request:
├─ DemandResponseService.answer_invitation()
├─ DemandResponseRepository.answer_invitation()
└─ MongoDB: Updates invitation.response for specified iot

Result: AC1 and AC2 accepted, Water Heater declined


┌─────────────────────────────────────────────────────────────┐
│ STEP 4: Execute DR Event (2:00 PM - Event Start)           │
└─────────────────────────────────────────────────────────────┘

POST /event/check
{"event_time": "2025-12-10 14:00:00", "iot": "AC1"}

POST /event/check
{"event_time": "2025-12-10 14:00:00", "iot": "AC2"}

Flow per request:
├─ Core.schedule_event("2025-12-10 14:00:00", "AC1")
├─ Finds IoT device: iot = [i for i in core.iots if i.name == "AC1"][0]
├─ Creates DemandResponseAtuator(core, iot)
└─ Starts thread

DemandResponseAtuator.run():
├─ Schedules end_event() at 14:59:00
└─ While loop (every 1 second for 59 minutes):
    └─ core.dr_reduced_power += iot.get_power()

Result: Two threads running, accumulating power reduction


┌─────────────────────────────────────────────────────────────┐
│ STEP 5: Monitor Energy (2:30 PM - During Event)            │
└─────────────────────────────────────────────────────────────┘

GET /energy/now

Flow:
├─ Core.get_total_consumption()
├─ totalPower = sum(iot.get_power() for iot in iots_consumption)
├─ totalPower = 50 kW (all devices)
├─ reduce = core.dr_reduced_power = 8 kW (accumulated from AC1+AC2)
├─ core.dr_reduced_power = 0  # Reset
└─ return 50 - 8 = 42 kW

Response:
{
  "consumption": 42.0,  // Reduced by DR
  "generation": 15.0,
  "flexibility": 18.0
}

Result: Consumption appears 8 kW lower due to DR reduction


┌─────────────────────────────────────────────────────────────┐
│ STEP 6: Automatic Event End (2:59 PM)                      │
└─────────────────────────────────────────────────────────────┘

Scheduled Task Triggered:
├─ DemandResponseAtuator.end_event() called
├─ self.event_on = False
├─ Thread exits while loop
└─ Thread terminates

Result: Both AC1 and AC2 threads stopped, DR event complete


┌─────────────────────────────────────────────────────────────┐
│ STEP 7: Record Financial Benefit (3:00 PM)                 │
└─────────────────────────────────────────────────────────────┘

POST /dr/benefit
{"iot": "AC1", "value": 5.50}

POST /dr/benefit
{"iot": "AC2", "value": 4.75}

Flow per request:
├─ EnergyService.add_benefit("dr", iot, value)
├─ FinancialRepository.insert_benefit()
└─ MongoDB.benefit: {source: "dr", product: iot, value: value, datetime: now}

Result: Total DR benefit = €10.25


┌─────────────────────────────────────────────────────────────┐
│ STEP 8: Hourly Storage (3:00 PM - End of Hour)             │
└─────────────────────────────────────────────────────────────┘

StoringManager.save_hour() (automatic):
├─ BuildingService.save_last_hour()
├─ Calculates flexibility = power * random(0-20%)
├─ BuildingRepository.insert_hour()
└─ MongoDB.TOTALPOWERHOUR: {
    datetime: "2025-12-10 14:00:00",
    consumption: 42.0,  // Average during hour (with DR reduction)
    generation: 15.0,
    flexibility: 7.8
  }

Result: Hour data stored with DR-reduced consumption


┌─────────────────────────────────────────────────────────────┐
│ STEP 9: View Monthly Benefits (End of Month)               │
└─────────────────────────────────────────────────────────────┘

GET /benefits/monthly

Response:
{
  "dr": 185.50,   // Total DR benefits for month
  "p2p": 62.30    // Total P2P benefits for month
}

Result: Financial tracking shows €185.50 earned from DR participation

Key Metrics & Statistics

| Metric | Value | Source |
|---|---|---|
| Update Frequency | 1 second | DemandResponseAtuator.run() |
| Event Duration | 59 minutes | Scheduled termination |
| Storage Frequency | Every hour | StoringManager |
| DR-Capable Devices | 8 devices | config/f.json |
| Threading Model | 1 thread per device per event | Core.schedule_event() |
| Database | MongoDB (H01, BuildingRightSide) | Multiple collections |
| API Framework | Flask with CORS | api/main.py |
| Flexibility Calculation | 0-20% of device power | Based on instructions |

Configuration Reference

Device Configuration (config/f.json)

{
  "app": {
    "dr_events_auto_accept": 1,    // 1=enabled, 0=disabled
    "monitoring": 0                 // Debug logging
  },
  "storage": {
    "local": {
      "demand_response": ["H01", "demand_response_invitations"],
      "config": ["H01", "config"],
      "benefit": ["BuildingRightSide", "benefit"],
      "instructions": ["H01", "instructions"]
    }
  },
  "resources": {
    "iots": [
      {
        "name": "AC1",
        "type": "hvac",
        "uri": "http://192.168.2.91:30000/api/realtime/H_01/AC1",
        "control": {
          "demandresponse": true
        }
      }
      // ... more devices
    ]
  }
}
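
Once the configuration is loaded into a dictionary, the DR-capable devices can be listed by checking each device's control.demandresponse flag. The sketch below uses an inline dictionary because the example above contains comments, which strict JSON parsers reject. `dr_capable_devices` and the "Meter" entry are hypothetical.

```python
def dr_capable_devices(config):
    """Return names of devices whose control.demandresponse flag is true."""
    return [
        iot["name"]
        for iot in config["resources"]["iots"]
        if iot.get("control", {}).get("demandresponse", False)
    ]


# Trimmed-down configuration; "Meter" is a made-up non-DR device.
config = {
    "resources": {
        "iots": [
            {"name": "AC1", "type": "hvac", "control": {"demandresponse": True}},
            {"name": "Lighting", "control": {"demandresponse": True}},
            {"name": "Meter", "control": {"demandresponse": False}},
            {"name": "Sensor"},
        ]
    }
}
print(dr_capable_devices(config))  # ['AC1', 'Lighting']
```

Defaulting a missing flag to False mirrors the `config['control'].get('demandresponse', False)` lookup in the IoT model.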

MongoDB Database Structure

Database: H01
├─ demand_response_invitations (DR events)
├─ config (auto_answer setting)
├─ instructions (hourly participation rules)
└─ TOTALPOWERHOUR (hourly aggregates)

Database: BuildingRightSide
└─ benefit (financial tracking)

Summary

The Demand Response system is a comprehensive, multi-threaded solution that enables building participation in grid flexibility programs. It features:

  • Automatic or Manual Approval: Configurable auto-accept or manual review workflow
  • Real-Time Power Tracking: Per-device threads accumulate power reduction every second
  • Financial Benefit Tracking: Source-based tracking (DR vs P2P) with monthly aggregation
  • Flexibility Forecasting: Historical data and hourly instructions for predictive planning
  • Device-Level Control: Per-device, per-hour participation configuration
  • MongoDB Persistence: Scalable data storage with optimized queries
  • REST API: Complete API for external integration and control
  • Per-Device Threads: Each device runs its DR event in its own thread; all threads update the shared core.dr_reduced_power accumulator

Critical Files:

  • services/DemandResponseService.py:35-38 - Auto-accept logic
  • core/DemandResponseAtuator.py:run() - Power reduction accumulation
  • core/Core.py:get_total_consumption() - DR-reduced consumption calculation
  • api/main.py:230-329 - All DR endpoints

Together, these components cover the DR workflow end to end: invitation, approval, per-device execution, consumption reporting, and benefit recording.