diff --git a/demand-response-architecture.md b/demand-response-architecture.md new file mode 100644 index 0000000..9494ac3 --- /dev/null +++ b/demand-response-architecture.md @@ -0,0 +1,976 @@ +# Demand Response System - Architecture & Logic Documentation + +## Table of Contents +1. [System Overview](#system-overview) +2. [Component Locations](#component-locations) +3. [Architecture & Data Flow](#architecture--data-flow) +4. [Key Components](#key-components) +5. [Invitation Lifecycle](#invitation-lifecycle) +6. [Integration Points](#integration-points) +7. [API Reference](#api-reference) +8. [Complete Event Flow Example](#complete-event-flow-example) + +--- + +## System Overview + +The IoT Building Monitoring system includes a comprehensive Demand Response (DR) management system that enables buildings to participate in grid flexibility programs by reducing power consumption during peak demand periods. + +**Key Capabilities:** +- Create and manage DR invitations with target load reductions +- Auto-accept or manual approval of DR events +- Track power reduction in real-time during events +- Calculate financial benefits from DR participation +- Forecast available flexibility by device and time +- Configure device-specific DR participation instructions + +--- + +## Component Locations + +### Core Components + +| Component | Path | Purpose | +|-----------|------|---------| +| **Service Layer** | `services/DemandResponseService.py` | Business logic for DR operations | +| **Database Layer** | `database/DemandResponseRepository.py` | MongoDB data access for DR | +| **Execution Engine** | `core/DemandResponseAtuator.py` | Runs DR events, tracks power reduction | +| **Main Orchestrator** | `core/Core.py` | Coordinates DR events and accumulates reduction | +| **IoT Model** | `model/IoT.py` | Device configuration with DR capabilities | +| **API Endpoints** | `api/main.py:230-329` | REST endpoints for DR operations | +| **Configuration** | `config/f.json` | System and device configuration | + +--- + +## Architecture & Data Flow + +### High-Level Architecture + +``` +┌─────────────────────────────────────┐ +│ REST API Endpoints │ +│ (Flask: api/main.py) │ +│ - /invitation/* │ +│ - /event/check │ +│ - /dr/benefit │ +└──────────────┬──────────────────────┘ + │ + ▼ +┌─────────────────────────────────────┐ +│ DemandResponseService │ +│ (Business Logic Layer) │ +│ - Invitation management │ +│ - Auto-answer configuration │ +│ - Response tracking │ +└──────────────┬──────────────────────┘ + │ + ▼ +┌─────────────────────────────────────┐ +│ DemandResponseRepository │ +│ (Data Access Layer) │ +│ - MongoDB operations │ +│ - Query optimization │ +└──────────────┬──────────────────────┘ + │ + ▼ +┌─────────────────────────────────────┐ +│ MongoDB Collections │ +│ - demand_response_invitations │ +│ - config (auto_answer) │ +│ - benefit (financial tracking) │ +│ - instructions (hourly rules) │ +└─────────────────────────────────────┘ +``` + +### Execution Architecture + +``` +┌──────────────────────────────────────┐ +│ Core.py (Main Thread) │ +│ - Manages IoT device fleet │ +│ - Tracks dr_reduced_power │ +│ - Calculates total flexibility │ +└──────────────┬───────────────────────┘ + │ + │ schedule_event(time, iot) + ▼ +┌──────────────────────────────────────┐ +│ DemandResponseAtuator (New Thread) │ +│ - Spawned per device per event │ +│ - Runs for 1 hour (59 minutes) │ +│ - Updates core.dr_reduced_power │ +│ - Auto-terminates at event end │ +└──────────────────────────────────────┘ +``` + +### Data Models + +#### MongoDB Collection: `demand_response_invitations` + +```json +{ + "_id": "ObjectId", + "datetime": "2025-12-10 13:45:32", // Invitation creation time + "event_time": "2025-12-10 14:00:00", // When DR event occurs + "load_kwh": 5.2, // Target reduction in kWh + "load_percentage": 15.0, // Reduction as % of total load + "iots": ["AC1", "AC2", "Lighting"], // Participating devices + "response": "WAITING|YES|NO" // Participant decision +} +``` + +#### MongoDB Collection: `config` + +```json +{ + "config": "config", + "auto_answer": true // Auto-accept DR invitations +} +``` + +#### MongoDB Collection: `benefit` + +```json +{ + "source": "dr", // "dr" or "p2p" + "product": "AC1", // Device name + "value": 5.50, // Financial benefit (€) + "datetime": "2025-12-10 14:00:00" +} +``` + +#### MongoDB Collection: `instructions` + +```json +{ + "AC1": { + "0": "participation", // Hour 0: full DR participation + "1": "shifting", // Hour 1: 0-20% participation + "2": "off", // Hour 2: no DR participation + "3": "participation", + // ... hours 4-23 + }, + "AC2": { /* ... */ } +} +``` + +--- + +## Key Components + +### 1. DemandResponseService +**Location:** `services/DemandResponseService.py` + +**Responsibilities:** +- Manages DR invitation lifecycle +- Handles participant responses +- Configures auto-accept behavior +- Queries invitation status + +**Key Methods:** + +```python +def invitation(event_time, load_kwh, load_percentage, iots): + """Create new DR invitation""" + # Checks auto_answer config + # Sets response to YES if auto-enabled, else WAITING + # Stores in MongoDB via repository + +def answer_invitation(event_time, iot, response): + """Record YES/NO response for specific device""" + # Updates invitation response field + # Used for manual acceptance workflow + +def get_unanswered_invitations(): + """Get all pending invitations awaiting response""" + # Returns invitations with response="WAITING" + +def get_answered_invitations(): + """Get last 5 completed invitations""" + # Returns historical invitations (YES/NO) + +def get_auto_answer_config(): + """Check if auto-accept is enabled""" + # Returns boolean from config collection + +def set_auto_answer_config(auto_answer): + """Enable/disable auto-accept""" + # Updates MongoDB config collection +``` + +**Auto-Accept Logic:** +```python +# Line 35-38 in DemandResponseService.py +if self.get_auto_answer_config(): + response = "YES" # Auto-accept enabled +else: + response = "WAITING" # Require manual approval +``` + +--- + +### 2. DemandResponseRepository +**Location:** `database/DemandResponseRepository.py` + +**Responsibilities:** +- Direct MongoDB operations +- Query optimization and filtering +- Data persistence + +**Key Methods:** + +```python +def insert_invitation(datetime, event_time, load_kwh, load_percentage, iots, response): + """Store new DR invitation in MongoDB""" + +def answer_invitation(event_time, iot, response): + """Update invitation response status""" + # Updates document where event_time matches and iot in iots array + +def get_unanswered_invitations(): + """Query: {response: "WAITING"}""" + +def get_answered_invitations(): + """Query: {response: {$ne: "WAITING"}}, limit 5, sort by datetime desc""" + +def get_accepted_upcoming_invitations(): + """Query: {response: "YES", event_time: {$gte: now}}""" + +def get_invitation(event_time): + """Find specific invitation by event time""" +``` + +--- + +### 3. DemandResponseAtuator +**Location:** `core/DemandResponseAtuator.py` + +**Responsibilities:** +- Executes DR event for a single device +- Runs as separate thread during event +- Accumulates power reduction in real-time +- Auto-terminates after 1 hour + +**Architecture:** +```python +class DemandResponseAtuator(Thread): + def __init__(self, core, iot): + self.core = core # Reference to Core instance + self.iot = iot # IoT device participating in DR + self.event_on = True # Event active flag + + def run(self): + # Schedule event end at 59 minutes from now + end_time = (datetime.now() + timedelta(minutes=59)) + end_time_formatted = end_time.strftime('%H:%M:%S') + schedule.every().day.at(end_time_formatted).do(self.end_event) + + # Main loop: accumulate power reduction every second + while self.event_on: + # Add device's current power to reduction accumulator + self.core.dr_reduced_power += self.iot.get_power() + schedule.run_pending() + time.sleep(1) + + def end_event(self): + """Called automatically at event end""" + self.event_on = False + return schedule.CancelJob +``` + +**Key Characteristics:** +- **Threading Model:** One thread per device per event +- **Update Frequency:** Every 1 second +- **Duration:** Exactly 59 minutes (scheduled termination) +- **Power Tracking:** Cumulative reduction added to `core.dr_reduced_power` + +--- + +### 4. Core (Main Orchestrator) +**Location:** `core/Core.py` + +**DR-Related Attributes:** +```python +class Core(Thread): + def __init__(self): + self.dr_reduced_power = 0.0 # Accumulator for power reduction + self.iots_consumption = [] # List of controllable devices + self.iots = [] # All IoT devices +``` + +**Key DR Methods:** + +```python +def schedule_event(self, event_time, iot_name): + """Initiate DR event for specified device""" + # Find device by name + iot = [i for i in self.iots if i.name == iot_name][0] + + # Create and start DemandResponseAtuator thread + dr = DemandResponseAtuator(self, iot) + dr.start() + +def get_total_consumption(self): + """Returns consumption MINUS DR reductions""" + # Sum all device power + totalPower = sum(iot.get_power() for iot in self.iots_consumption) + + # Subtract DR reduction + reduce = self.dr_reduced_power + self.dr_reduced_power = 0 # Reset accumulator + + return totalPower - reduce + +def get_total_flexibility(self): + """Calculate available flexibility for DR""" + # Sum power of devices with demandresponse=true + return sum(iot.get_power() for iot in self.iots_consumption + if iot.demandresponse) +``` + +**How Power Reduction Works:** +1. During DR event, `DemandResponseAtuator` continuously adds to `dr_reduced_power` +2. When `get_total_consumption()` is called, reduction is subtracted from total +3. `dr_reduced_power` is reset to 0 after each reading +4. This creates effective "virtual" power reduction in reported consumption + +--- + +### 5. IoT Model +**Location:** `model/IoT.py` + +**DR-Related Attributes:** +```python +class IoT: + def __init__(self, config): + self.name = config['name'] + self.demandresponse = config['control'].get('demandresponse', False) + self.instructions = {} # Hourly DR instructions +``` + +**Configuration Example (config/f.json):** +```json +{ + "resources": { + "iots": [ + { + "name": "AC1", + "type": "hvac", + "uri": "http://192.168.2.91:30000/api/realtime/H_01/AC1", + "control": { + "demandresponse": true // Device can participate in DR + } + } + ] + } +} +``` + +**DR-Capable Devices:** +- AC1, AC2, AC3, AC4 (HVAC systems) +- Water Heater +- Lighting +- Refrigerator + +**Instruction Types:** +- `"participation"` - Full DR participation (100% reduction if needed) +- `"shifting"` - Partial participation (0-20% reduction) +- `"off"` - No DR participation for that hour + +--- + +## Invitation Lifecycle + +### 1. Create Invitation +**Endpoint:** `POST /invitation/send` + +**Request:** +```json +{ + "event_time": "2025-12-10 14:00:00", + "kwh": 5.2, + "percentage": 15, + "iots": ["AC1", "AC2", "Lighting"] +} +``` + +**Response:** +```json +{ + "event_time": "2025-12-10 14:00:00" +} +``` + +**Logic Flow:** +1. Validates event_time format +2. Checks auto_answer configuration +3. Sets response = "YES" if auto-enabled, else "WAITING" +4. Stores invitation in MongoDB +5. Returns event_time as confirmation + +--- + +### 2. Check Invitation Status +**Endpoint:** `POST /invitation/get` + +**Request:** +```json +{ + "event_time": "2025-12-10 14:00:00" +} +``` + +**Response:** +```json +{ + "datetime": "2025-12-10 13:45:32", + "event_time": "2025-12-10 14:00:00", + "load_kwh": 5.2, + "load_percentage": 15, + "iots": ["AC1", "AC2", "Lighting"], + "response": "WAITING" +} +``` + +--- + +### 3. Get Pending Invitations +**Endpoint:** `GET /invitation/unanswered` + +**Response:** +```json +[ + { + "datetime": "2025-12-10 13:45:32", + "event_time": "2025-12-10 14:00:00", + "load_kwh": 5.2, + "load_percentage": 15, + "iots": ["AC1", "AC2"], + "response": "WAITING" + }, + { + "datetime": "2025-12-10 14:20:15", + "event_time": "2025-12-10 16:00:00", + "load_kwh": 3.8, + "load_percentage": 10, + "iots": ["Water Heater"], + "response": "WAITING" + } +] +``` + +**Use Case:** Display pending DR invitations requiring participant decision + +--- + +### 4. Answer Invitation +**Endpoint:** `POST /invitation/answer` + +**Request:** +```json +{ + "event_time": "2025-12-10 14:00:00", + "iot": "AC1", + "response": "YES" +} +``` + +**Response:** +```json +{ + "message": "answered" +} +``` + +**Logic:** +- Updates invitation document in MongoDB +- Sets response field to "YES" or "NO" +- Filters by event_time and iot in iots array +- Enables manual approval workflow + +--- + +### 5. Execute DR Event +**Endpoint:** `POST /event/check` + +**Request:** +```json +{ + "event_time": "2025-12-10 14:00:00", + "iot": "AC1" +} +``` + +**Logic Flow:** +```python +1. Receives event_time and iot name +2. Calls core.schedule_event(event_time, iot) +3. Core finds IoT device by name +4. Creates new DemandResponseAtuator(core, iot) +5. Starts thread → begins power reduction tracking +6. Thread runs for 59 minutes, accumulating reduction every second +7. Auto-terminates at scheduled end time +``` + +--- + +### 6. Configure Auto-Accept + +**Get Config:** `GET /invitation/auto` + +**Response:** +```json +{ + "auto_answer": true +} +``` + +**Set Config:** `POST /invitation/auto` + +**Request:** +```json +{ + "auto_answer": true +} +``` + +**Response:** +```json +{ + "auto_answer": true +} +``` + +**Effect:** +- When enabled: New invitations automatically set to response="YES" +- When disabled: New invitations set to response="WAITING" (require manual approval) + +--- + +## Integration Points + +### 1. Energy Management + +**ForecastService** (`services/ForecastService.py`) +- Calculates `forecast_flexibility()` based on historical data +- Predicts available DR capacity for future periods +- Uses flexibility data stored with hourly consumption/generation + +**Core.get_total_flexibility()** +- Returns sum of power from DR-capable devices +- Indicates current available flexibility +- Accessible via `GET /energy/flexibility` + +```python +def get_total_flexibility(self): + return sum(iot.get_power() for iot in self.iots_consumption + if iot.demandresponse) +``` + +--- + +### 2. Building Management + +**StoringManager** (`model/StoringManager.py`) +- Stores hourly aggregates including flexibility +- MongoDB collection: `TOTALPOWERHOUR` +- Fields: `{datetime, consumption, generation, flexibility}` + +**BuildingRepository** (`database/BuildingRepository.py`) +- `insert_hour()` stores flexibility alongside consumption/generation +- Flexibility calculated as: `power * random(0-20%)` +- Provides historical baseline for forecasting + +--- + +### 3. Financial Tracking + +**EnergyService** (`services/EnergyService.py`) +```python +def add_benefit(source, product, value): + """Record financial benefit from DR or P2P""" + # source: "dr" or "p2p" + # product: device name + # value: financial reward amount +``` + +**Record DR Benefit:** `POST /dr/benefit` + +**Request:** +```json +{ + "iot": "AC1", + "value": 5.50 +} +``` + +**Storage:** +```json +{ + "source": "dr", + "product": "AC1", + "value": 5.50, + "datetime": "2025-12-10 14:00:00" +} +``` + +**Monthly Benefits:** `GET /benefits/monthly` + +**Response:** +```json +{ + "dr": 150.00, + "p2p": 50.00 +} +``` + +--- + +### 4. IoT Device Control + +**IotService** (`services/IotService.py`) + +```python +def change_dr_enable(iot, enable): + """Enable or disable DR capability for device""" + iot.demandresponse = enable + +def update_instructions(instructions): + """Set hourly DR participation instructions""" + # Format: {iot_name: {hour: "participation|shifting|off"}} + +def get_instructions(): + """Retrieve current DR instructions""" + return {iot.name: iot.instructions for iot in iots} +``` + +**Update Instructions:** `POST /iot/instructions` + +**Request:** +```json +{ + "AC1": { + "0": "participation", + "1": "shifting", + "2": "off", + "3": "participation" + // ... hours 4-23 + } +} +``` + +**Forecasted Flexibility by Hour:** `POST /iots/forecast/flexibility` + +**Request:** +```json +{ + "hour": 14 +} +``` + +**Response:** +```json +{ + "shifting": [["AC1", 50], ["AC2", 75]], // 0-20% participation + "reducing": [["Water Heater", 100]] // Full participation +} +``` + +--- + +## API Reference + +### Demand Response Endpoints + +| Method | Endpoint | Description | Request Body | Response | +|--------|----------|-------------|--------------|----------| +| POST | `/invitation/send` | Create DR invitation | `{event_time, kwh, percentage, iots}` | `{event_time}` | +| POST | `/invitation/get` | Get specific invitation | `{event_time}` | Invitation object | +| GET | `/invitation/unanswered` | Get pending invitations | None | Array of invitations | +| GET | `/invitation/answered` | Get last 5 completed | None | Array of invitations | +| POST | `/invitation/answer` | Submit response | `{event_time, iot, response}` | `{message: "answered"}` | +| GET | `/invitation/auto` | Get auto-accept config | None | `{auto_answer: boolean}` | +| POST | `/invitation/auto` | Set auto-accept config | `{auto_answer: boolean}` | `{auto_answer: boolean}` | +| POST | `/event/check` | Execute DR event | `{event_time, iot}` | Success status | +| POST | `/dr/benefit` | Record DR benefit | `{iot, value}` | `{message: "ok"}` | + +### Related Flexibility/Energy Endpoints + +| Method | Endpoint | Description | +|--------|----------|-------------| +| GET | `/energy/now` | Current consumption, generation, flexibility | +| GET | `/energy/flexibility` | Available flexibility for DR | +| GET | `/forecast/flexibility` | Forecasted flexibility | +| POST | `/iots/forecast/flexibility` | Flexibility by hour and device | +| POST | `/iot/demandresponse/enable` | Enable/disable device DR | +| POST | `/iot/instructions` | Update DR instructions | +| GET | `/iot/instructions` | Get current instructions | +| GET | `/benefits/monthly` | Monthly DR benefits | + +--- + +## Complete Event Flow Example + +### Scenario: 10 kWh Reduction Event at 2:00 PM + +``` +┌─────────────────────────────────────────────────────────────┐ +│ STEP 1: Create Invitation (1:45 PM) │ +└─────────────────────────────────────────────────────────────┘ + +POST /invitation/send +{ + "event_time": "2025-12-10 14:00:00", + "kwh": 10, + "percentage": 20, + "iots": ["AC1", "AC2", "Water Heater"] +} + +Flow: +├─ DemandResponseService.invitation() +├─ Checks auto_answer config → disabled (false) +├─ Sets response = "WAITING" +├─ DemandResponseRepository.insert_invitation() +└─ MongoDB: Creates invitation document + +Result: Invitation stored, awaiting participant approval + + +┌─────────────────────────────────────────────────────────────┐ +│ STEP 2: Check Pending Invitations (1:50 PM) │ +└─────────────────────────────────────────────────────────────┘ + +GET /invitation/unanswered + +Response: +[ + { + "datetime": "2025-12-10 13:45:32", + "event_time": "2025-12-10 14:00:00", + "load_kwh": 10, + "load_percentage": 20, + "iots": ["AC1", "AC2", "Water Heater"], + "response": "WAITING" + } +] + + +┌─────────────────────────────────────────────────────────────┐ +│ STEP 3: Answer Invitation for Each Device (1:55 PM) │ +└─────────────────────────────────────────────────────────────┘ + +POST /invitation/answer +{"event_time": "2025-12-10 14:00:00", "iot": "AC1", "response": "YES"} + +POST /invitation/answer +{"event_time": "2025-12-10 14:00:00", "iot": "AC2", "response": "YES"} + +POST /invitation/answer +{"event_time": "2025-12-10 14:00:00", "iot": "Water Heater", "response": "NO"} + +Flow per request: +├─ DemandResponseService.answer_invitation() +├─ DemandResponseRepository.answer_invitation() +└─ MongoDB: Updates invitation.response for specified iot + +Result: AC1 and AC2 accepted, Water Heater declined + + +┌─────────────────────────────────────────────────────────────┐ +│ STEP 4: Execute DR Event (2:00 PM - Event Start) │ +└─────────────────────────────────────────────────────────────┘ + +POST /event/check +{"event_time": "2025-12-10 14:00:00", "iot": "AC1"} + +POST /event/check +{"event_time": "2025-12-10 14:00:00", "iot": "AC2"} + +Flow per request: +├─ Core.schedule_event("2025-12-10 14:00:00", "AC1") +├─ Finds IoT device: iot = [i for i in core.iots if i.name == "AC1"][0] +├─ Creates DemandResponseAtuator(core, iot) +└─ Starts thread + +DemandResponseAtuator.run(): +├─ Schedules end_event() at 14:59:00 +└─ While loop (every 1 second for 59 minutes): + └─ core.dr_reduced_power += iot.get_power() + +Result: Two threads running, accumulating power reduction + + +┌─────────────────────────────────────────────────────────────┐ +│ STEP 5: Monitor Energy (2:30 PM - During Event) │ +└─────────────────────────────────────────────────────────────┘ + +GET /energy/now + +Flow: +├─ Core.get_total_consumption() +├─ totalPower = sum(iot.get_power() for iot in iots_consumption) +├─ totalPower = 50 kW (all devices) +├─ reduce = core.dr_reduced_power = 8 kW (accumulated from AC1+AC2) +├─ core.dr_reduced_power = 0 # Reset +└─ return 50 - 8 = 42 kW + +Response: +{ + "consumption": 42.0, // Reduced by DR + "generation": 15.0, + "flexibility": 18.0 +} + +Result: Consumption appears 8 kW lower due to DR reduction + + +┌─────────────────────────────────────────────────────────────┐ +│ STEP 6: Automatic Event End (2:59 PM) │ +└─────────────────────────────────────────────────────────────┘ + +Scheduled Task Triggered: +├─ DemandResponseAtuator.end_event() called +├─ self.event_on = False +├─ Thread exits while loop +└─ Thread terminates + +Result: Both AC1 and AC2 threads stopped, DR event complete + + +┌─────────────────────────────────────────────────────────────┐ +│ STEP 7: Record Financial Benefit (3:00 PM) │ +└─────────────────────────────────────────────────────────────┘ + +POST /dr/benefit +{"iot": "AC1", "value": 5.50} + +POST /dr/benefit +{"iot": "AC2", "value": 4.75} + +Flow per request: +├─ EnergyService.add_benefit("dr", iot, value) +├─ FinancialRepository.insert_benefit() +└─ MongoDB.benefit: {source: "dr", product: iot, value: value, datetime: now} + +Result: Total DR benefit = €10.25 + + +┌─────────────────────────────────────────────────────────────┐ +│ STEP 8: Hourly Storage (3:00 PM - End of Hour) │ +└─────────────────────────────────────────────────────────────┘ + +StoringManager.save_hour() (automatic): +├─ BuildingService.save_last_hour() +├─ Calculates flexibility = power * random(0-20%) +├─ BuildingRepository.insert_hour() +└─ MongoDB.TOTALPOWERHOUR: { + datetime: "2025-12-10 14:00:00", + consumption: 42.0, // Average during hour (with DR reduction) + generation: 15.0, + flexibility: 7.8 + } + +Result: Hour data stored with DR-reduced consumption + + +┌─────────────────────────────────────────────────────────────┐ +│ STEP 9: View Monthly Benefits (End of Month) │ +└─────────────────────────────────────────────────────────────┘ + +GET /benefits/monthly + +Response: +{ + "dr": 185.50, // Total DR benefits for month + "p2p": 62.30 // Total P2P benefits for month +} + +Result: Financial tracking shows €185.50 earned from DR participation +``` + +--- + +## Key Metrics & Statistics + +| Metric | Value | Source | +|--------|-------|--------| +| **Update Frequency** | 1 second | DemandResponseAtuator.run() | +| **Event Duration** | 59 minutes | Scheduled termination | +| **Storage Frequency** | Every hour | StoringManager | +| **DR-Capable Devices** | 8 devices | config/f.json | +| **Threading Model** | 1 thread per device per event | Core.schedule_event() | +| **Database** | MongoDB (H01, BuildingRightSide) | Multiple collections | +| **API Framework** | Flask with CORS | api/main.py | +| **Flexibility Calculation** | 0-20% of device power | Based on instructions | + +--- + +## Configuration Reference + +### Device Configuration (config/f.json) + +```json +{ + "app": { + "dr_events_auto_accept": 1, // 1=enabled, 0=disabled + "monitoring": 0 // Debug logging + }, + "storage": { + "local": { + "demand_response": ["H01", "demand_response_invitations"], + "config": ["H01", "config"], + "benefit": ["BuildingRightSide", "benefit"], + "instructions": ["H01", "instructions"] + } + }, + "resources": { + "iots": [ + { + "name": "AC1", + "type": "hvac", + "uri": "http://192.168.2.91:30000/api/realtime/H_01/AC1", + "control": { + "demandresponse": true + } + } + // ... more devices + ] + } +} +``` + +### MongoDB Database Structure + +``` +Database: H01 +├─ demand_response_invitations (DR events) +├─ config (auto_answer setting) +├─ instructions (hourly participation rules) +└─ TOTALPOWERHOUR (hourly aggregates) + +Database: BuildingRightSide +└─ benefit (financial tracking) +``` + +--- + +## Summary + +The Demand Response system is a comprehensive, multi-threaded solution that enables building participation in grid flexibility programs. It features: + +- **Automatic or Manual Approval:** Configurable auto-accept or manual review workflow +- **Real-Time Power Tracking:** Per-device threads accumulate power reduction every second +- **Financial Benefit Tracking:** Source-based tracking (DR vs P2P) with monthly aggregation +- **Flexibility Forecasting:** Historical data and hourly instructions for predictive planning +- **Device-Level Control:** Per-device, per-hour participation configuration +- **MongoDB Persistence:** Scalable data storage with optimized queries +- **REST API:** Complete API for external integration and control +- **Thread Safety:** Separate threads per device prevent interference + +**Critical Files:** +- **services/DemandResponseService.py:35-38** - Auto-accept logic +- **core/DemandResponseAtuator.py:run()** - Power reduction accumulation +- **core/Core.py:get_total_consumption()** - DR-reduced consumption calculation +- **api/main.py:230-329** - All DR endpoints + +This architecture enables scalable, reliable demand response management with precise power tracking and financial incentive tracking. diff --git a/microservices/api-gateway/main.py b/microservices/api-gateway/main.py index e4e191d..19c5221 100644 --- a/microservices/api-gateway/main.py +++ b/microservices/api-gateway/main.py @@ -77,6 +77,12 @@ SERVICES = { base_url=os.getenv("DATA_INGESTION_SERVICE_URL", "http://data-ingestion-service:8008"), health_endpoint="/health", auth_required=False + ), + "demand-response-service": ServiceConfig( + name="demand-response-service", + base_url=os.getenv("DEMAND_RESPONSE_SERVICE_URL", "http://demand-response-service:8003"), + health_endpoint="/health", + auth_required=True ) } diff --git a/microservices/demand-response-service/Dockerfile b/microservices/demand-response-service/Dockerfile new file mode 100644 index 0000000..a4de447 --- /dev/null +++ b/microservices/demand-response-service/Dockerfile @@ -0,0 +1,26 @@ +FROM python:3.9-slim + +WORKDIR /app + +# Install system dependencies +RUN apt-get update && apt-get install -y \ + gcc \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Copy requirements and install Python dependencies +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy application code +COPY . . + +# Expose port +EXPOSE 8003 + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=10s --retries=3 \ + CMD curl -f http://localhost:8003/health || exit 1 + +# Run the application +CMD ["python", "main.py"] diff --git a/microservices/demand-response-service/database.py b/microservices/demand-response-service/database.py new file mode 100644 index 0000000..74ca2a5 --- /dev/null +++ b/microservices/demand-response-service/database.py @@ -0,0 +1,208 @@ +""" +Database configuration and connection management for Demand Response Service +""" + +from motor.motor_asyncio import AsyncIOMotorClient, AsyncIOMotorDatabase +import redis.asyncio as redis +import logging +import os + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + +# Configuration from environment variables +MONGO_URL = os.getenv("MONGO_URL", "mongodb://localhost:27017") +DATABASE_NAME = os.getenv("DATABASE_NAME", "energy_dashboard_demand_response") +REDIS_URL = os.getenv("REDIS_URL", "redis://localhost:6379") + +# Global database clients +_mongo_client: AsyncIOMotorClient = None +_database: AsyncIOMotorDatabase = None +_redis_client: redis.Redis = None + + +async def connect_to_mongo(): + """Initialize MongoDB connection and create indexes""" + global _mongo_client, _database + + try: + logger.info(f"Connecting to MongoDB at {MONGO_URL}") + _mongo_client = AsyncIOMotorClient(MONGO_URL) + _database = _mongo_client[DATABASE_NAME] + + # Test connection + await _database.command("ping") + logger.info(f"Successfully connected to MongoDB database: {DATABASE_NAME}") + + # Create indexes + await create_indexes() + + except Exception as e: + logger.error(f"Failed to connect to MongoDB: {e}") + raise + + +async def close_mongo_connection(): + """Close MongoDB connection""" + global _mongo_client + + if _mongo_client: + _mongo_client.close() + logger.info("MongoDB connection closed") + + +async def get_database() -> AsyncIOMotorDatabase: + """Get database instance""" + if _database is None: + await connect_to_mongo() + return _database + + +async def connect_to_redis(): + """Initialize Redis connection""" + global _redis_client + + try: + logger.info(f"Connecting to Redis at {REDIS_URL}") + _redis_client = redis.from_url(REDIS_URL, decode_responses=True) + + # Test connection + await _redis_client.ping() + logger.info("Successfully connected to Redis") + + except Exception as e: + logger.error(f"Failed to connect to Redis: {e}") + raise + + +async def close_redis_connection(): + """Close Redis connection""" + global _redis_client + + if _redis_client: + await _redis_client.close() + logger.info("Redis connection closed") + + +async def get_redis() -> redis.Redis: + """Get Redis client instance""" + if _redis_client is None: + await connect_to_redis() + return _redis_client + + +async def create_indexes(): + """Create MongoDB indexes for optimal query performance""" + db = await get_database() + + logger.info("Creating MongoDB indexes...") + + try: + # Indexes for demand_response_invitations collection + await db.demand_response_invitations.create_index("event_id", unique=True) + await db.demand_response_invitations.create_index([("event_time", 1), ("status", 1)]) + await db.demand_response_invitations.create_index("status") + await db.demand_response_invitations.create_index("created_at") + await db.demand_response_invitations.create_index("response") + logger.info("Created indexes for demand_response_invitations collection") + + # Indexes for demand_response_events collection + await db.demand_response_events.create_index("event_id", unique=True) + await db.demand_response_events.create_index([("start_time", 1), ("status", 1)]) + await db.demand_response_events.create_index([("status", 1), ("start_time", 1)]) + await db.demand_response_events.create_index("status") + await db.demand_response_events.create_index("invitation_id") + logger.info("Created indexes for demand_response_events collection") + + # Indexes for demand_response_responses collection + await db.demand_response_responses.create_index([("event_id", 1), ("device_id", 1)], unique=True) + await db.demand_response_responses.create_index("event_id") + await db.demand_response_responses.create_index("device_id") + await db.demand_response_responses.create_index("responded_at") + logger.info("Created indexes for demand_response_responses collection") + + # Indexes for flexibility_snapshots collection (with TTL for auto-cleanup) + await db.flexibility_snapshots.create_index([("timestamp", -1)]) + await db.flexibility_snapshots.create_index( + "timestamp", + expireAfterSeconds=7776000 # 90 days TTL + ) + logger.info("Created indexes for flexibility_snapshots collection") + + # Indexes for auto_response_config collection (singleton document) + await db.auto_response_config.create_index("config_id", unique=True) + logger.info("Created indexes for auto_response_config collection") + + # Indexes for device_instructions collection + await db.device_instructions.create_index("device_id", unique=True) + await db.device_instructions.create_index("updated_at") + logger.info("Created indexes for device_instructions collection") + + logger.info("All MongoDB indexes created successfully") + + except Exception as e: + logger.error(f"Error creating indexes: {e}") + # Don't raise - indexes may already exist + + +async def initialize_default_config(): + """Initialize default auto-response configuration if it doesn't exist""" + db = await get_database() + + try: + # Check if default config exists + existing_config = await db.auto_response_config.find_one({"config_id": "default"}) + + if not existing_config: + default_config = { + "config_id": "default", + "enabled": False, + "max_reduction_percentage": 20.0, + "response_delay_seconds": 300, + "min_notice_minutes": 60, + "created_at": None, + "updated_at": None + } + + await db.auto_response_config.insert_one(default_config) + logger.info("Created default auto-response configuration") + else: + logger.info("Auto-response configuration already exists") + + except Exception as e: + logger.error(f"Error initializing default config: {e}") + + +# Utility functions for common database operations + +async def get_collection(collection_name: str): + """Get a collection by name""" + db = await get_database() + return db[collection_name] + + +async def health_check() -> dict: + """Check database connections health""" + status = { + "mongodb": False, + "redis": False + } + + try: + # Check MongoDB + db = await get_database() + await db.command("ping") + status["mongodb"] = True + except Exception as e: + logger.error(f"MongoDB health check failed: {e}") + + try: + # Check Redis + redis_client = await get_redis() + await redis_client.ping() + status["redis"] = True + except Exception as e: + logger.error(f"Redis health check failed: {e}") + + return status diff --git a/microservices/demand-response-service/demand_response_service.py b/microservices/demand-response-service/demand_response_service.py new file mode 100644 index 0000000..7b7eceb --- /dev/null +++ b/microservices/demand-response-service/demand_response_service.py @@ -0,0 +1,747 @@ +""" +Demand Response Service - Core Business Logic +Handles DR invitations, event execution, auto-response, and flexibility calculation +""" + +import asyncio +import json +import uuid +from datetime import datetime, timedelta +from typing import List, Dict, Optional, Any +import logging + +from motor.motor_asyncio import AsyncIOMotorDatabase +import redis.asyncio as redis + +# Configure logging +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger(__name__) + + +class DemandResponseService: + """Core Demand Response service business logic""" + + def __init__(self, db: AsyncIOMotorDatabase, redis_client: redis.Redis): + self.db = db + self.redis = redis_client + self.active_events: Dict[str, asyncio.Task] = {} # event_id -> task + self.device_power_cache: Dict[str, float] = {} # device_id -> power_kw (updated by Redis subscriber) + + # ===== INVITATION MANAGEMENT ===== + + async def send_invitation( + self, + event_time: datetime, + load_kwh: float, + load_percentage: float, + iots: List[str], + duration_minutes: int = 59 + ) -> Dict[str, Any]: + """ + Create and send DR invitation + Returns: {"event_id": str, "response": str, "message": str} + """ + logger.info(f"Creating DR invitation for {len(iots)} devices at {event_time}") + + # Generate unique event ID + event_id = str(uuid.uuid4()) + + # Check auto-response configuration + auto_config = await self.get_auto_response_config() + response = "YES" if auto_config.get("enabled", False) else "WAITING" + + # Create invitation document + invitation = { + "event_id": event_id, + "created_at": datetime.utcnow(), + "event_time": event_time, + "load_kwh": load_kwh, + "load_percentage": load_percentage, + "iots": iots, + "duration_minutes": duration_minutes, + "response": response, + "status": "pending" + } + + # Store in MongoDB + await self.db.demand_response_invitations.insert_one(invitation) + + # Cache in Redis for fast access (24 hour TTL) + cache_key = f"dr:invitation:{event_id}" + await self.redis.setex( + cache_key, + 86400, + json.dumps(invitation, default=str) + ) + + # Publish event to Redis pub/sub + await self.redis.publish("dr_events", json.dumps({ + "event": "invitation_created", + "event_id": event_id, + "event_time": event_time.isoformat(), + "load_kwh": load_kwh, + "response": response + })) + + logger.info(f"Invitation {event_id} created with response: {response}") + + return { + "event_id": event_id, + "response": response, + "message": "Invitation created successfully" + } + + async def answer_invitation( + self, + event_id: str, + iot_id: str, + response: str, + committed_reduction_kw: Optional[float] = None + ) -> Dict[str, Any]: + """ + Record device response to invitation + Returns: {"success": bool, "message": str} + """ + logger.info(f"Recording response for invitation {event_id}, device {iot_id}: {response}") + + # Validate invitation exists + invitation = await self.get_invitation(event_id) + if not invitation: + return {"success": False, "message": f"Invitation {event_id} not found"} + + if iot_id not in invitation["iots"]: + return {"success": False, "message": f"Device {iot_id} not in invitation"} + + # Check if already responded + existing = await self.db.demand_response_responses.find_one({ + "event_id": event_id, + "device_id": iot_id + }) + + if existing: + return {"success": False, "message": f"Device {iot_id} has already responded"} + + # Store response + response_doc = { + "event_id": event_id, + "device_id": iot_id, + "response": response, + "committed_reduction_kw": committed_reduction_kw, + "responded_at": datetime.utcnow() + } + + await self.db.demand_response_responses.insert_one(response_doc) + + # Check if all devices have responded + total_devices = len(invitation["iots"]) + total_responses = await self.db.demand_response_responses.count_documents({"event_id": event_id}) + + if total_responses == total_devices: + # All devices responded - update invitation status + yes_count = await self.db.demand_response_responses.count_documents({ + "event_id": event_id, + "response": "YES" + }) + + all_yes = yes_count == total_devices + new_response = "YES" if all_yes else "NO" + new_status = "scheduled" if all_yes else "cancelled" + + await self.db.demand_response_invitations.update_one( + {"event_id": event_id}, + {"$set": {"response": new_response, "status": new_status}} + ) + + logger.info(f"Invitation {event_id} final response: {new_response} (status: {new_status})") + + # Clear cache + await self.redis.delete(f"dr:invitation:{event_id}") + + # Publish event + await self.redis.publish("dr_events", json.dumps({ + "event": "invitation_answered", + "event_id": event_id, + "device_id": iot_id, + "response": response + })) + + return {"success": True, "message": "Response recorded successfully"} + + async def get_invitation(self, event_id: str) -> Optional[Dict[str, Any]]: + """ + Get invitation by event_id (with Redis caching) + """ + # Try cache first + cache_key = f"dr:invitation:{event_id}" + cached = await self.redis.get(cache_key) + if cached: + invitation = json.loads(cached) + return invitation + + # Fallback to MongoDB + invitation = await self.db.demand_response_invitations.find_one({"event_id": event_id}) + if invitation: + invitation["_id"] = str(invitation["_id"]) + + # Cache for 24 hours + await self.redis.setex( + cache_key, + 86400, + json.dumps(invitation, default=str) + ) + + return invitation + + return None + + async def get_unanswered_invitations(self) -> List[Dict[str, Any]]: + """Get all pending invitations awaiting response""" + cursor = self.db.demand_response_invitations.find({ + "response": "WAITING", + "status": "pending" + }).sort("created_at", -1) + + invitations = [] + async for inv in cursor: + inv["_id"] = str(inv["_id"]) + invitations.append(inv) + + return invitations + + async def get_answered_invitations(self, hours: int = 24, limit: int = 50) -> List[Dict[str, Any]]: + """Get recent answered invitations""" + start_time = datetime.utcnow() - timedelta(hours=hours) + + cursor = self.db.demand_response_invitations.find({ + "response": {"$ne": "WAITING"}, + "created_at": {"$gte": start_time} + }).sort("created_at", -1).limit(limit) + + invitations = [] + async for inv in cursor: + inv["_id"] = str(inv["_id"]) + invitations.append(inv) + + return invitations + + # ===== EVENT EXECUTION ===== + + async def schedule_event( + self, + event_time: datetime, + iots: List[str], + load_reduction_kw: float, + duration_minutes: int = 59 + ) -> Dict[str, Any]: + """ + Schedule a DR event for execution + Returns: {"event_id": str, "message": str} + """ + logger.info(f"Scheduling DR event for {len(iots)} devices at {event_time}") + + # Create event document + event_id = str(uuid.uuid4()) + end_time = event_time + timedelta(minutes=duration_minutes) + + event = { + "event_id": event_id, + "start_time": event_time, + "end_time": end_time, + "status": "scheduled", + "participating_devices": iots, + "target_reduction_kw": load_reduction_kw, + "actual_reduction_kw": 0.0, + "power_samples": [] + } + + await self.db.demand_response_events.insert_one(event) + + # Publish scheduled event + await self.redis.publish("dr_events", json.dumps({ + "event": "event_scheduled", + "event_id": event_id, + "start_time": event_time.isoformat(), + "end_time": end_time.isoformat(), + "devices": iots + })) + + logger.info(f"Event {event_id} scheduled successfully") + + return { + "event_id": event_id, + "message": "Event scheduled successfully" + } + + async def execute_event(self, event_id: str): + """ + Execute a DR event (spawns background task) + """ + logger.info(f"Executing DR event {event_id}") + + # Get event details + event = await self.db.demand_response_events.find_one({"event_id": event_id}) + if not event: + logger.error(f"Event {event_id} not found") + return + + # Update status to active + await self.db.demand_response_events.update_one( + {"event_id": event_id}, + {"$set": {"status": "active", "actual_start_time": datetime.utcnow()}} + ) + + # Publish event started + await self.redis.publish("dr_events", json.dumps({ + "event": "event_started", + "event_id": event_id, + "devices": event["participating_devices"] + })) + + # Create and store async task for this event + task = asyncio.create_task(self._run_event_loop(event)) + self.active_events[event_id] = task + + logger.info(f"DR event {event_id} started successfully") + + async def _run_event_loop(self, event: Dict[str, Any]): + """ + CRITICAL: Core event execution loop - runs for duration_minutes + Samples power every 5 seconds, accumulates reduction, handles cancellation + """ + event_id = event["event_id"] + end_time = event["end_time"] + devices = event["participating_devices"] + + total_reduction_kwh = 0.0 + sample_count = 0 + + logger.info(f"Starting event loop for {event_id}, ending at {end_time}") + + try: + while datetime.utcnow() < end_time: + # Get current power for all participating devices from cache + device_powers = { + device_id: self.device_power_cache.get(device_id, 0.0) + for device_id in devices + } + + # Calculate reduction for this 5-second interval + # interval_hours = 5.0 / 3600.0 = 0.00139 hours + interval_reduction_kwh = sum(device_powers.values()) * (5.0 / 3600.0) + total_reduction_kwh += interval_reduction_kwh + + sample_count += 1 + + # Store sample in MongoDB (every sample to maintain accuracy) + sample = { + "timestamp": datetime.utcnow(), + "device_powers": device_powers, + "interval_reduction_kwh": interval_reduction_kwh + } + + await self.db.demand_response_events.update_one( + {"event_id": event_id}, + { + "$push": {"power_samples": sample}, + "$set": {"actual_reduction_kw": total_reduction_kwh} + } + ) + + # Update Redis cache for fast access to current reduction + cache_key = f"dr:event:active:{event_id}" + await self.redis.setex( + cache_key, + 300, # 5 minute TTL + json.dumps({ + "event_id": event_id, + "current_reduction_kwh": total_reduction_kwh, + "devices": device_powers, + "last_update": datetime.utcnow().isoformat() + }, default=str) + ) + + # Publish progress every 10 samples (50 seconds) + if sample_count % 10 == 0: + await self.redis.publish("dr_events", json.dumps({ + "event": "event_progress", + "event_id": event_id, + "total_reduction_kwh": round(total_reduction_kwh, 3), + "device_powers": device_powers, + "timestamp": datetime.utcnow().isoformat() + })) + logger.info(f"Event {event_id} progress: {total_reduction_kwh:.3f} kWh ({sample_count} samples)") + + # Sleep for 5 seconds + await asyncio.sleep(5) + + # Event completed successfully + logger.info(f"Event {event_id} completed with {total_reduction_kwh:.3f} kWh reduction") + await self._complete_event(event_id, total_reduction_kwh) + + except asyncio.CancelledError: + logger.info(f"Event {event_id} cancelled by user") + await self._cancel_event(event_id) + raise + + except Exception as e: + logger.error(f"Error in event {event_id}: {e}", exc_info=True) + await self._cancel_event(event_id) + + async def _complete_event(self, event_id: str, total_reduction_kwh: float): + """Mark event as completed""" + await self.db.demand_response_events.update_one( + {"event_id": event_id}, + { + "$set": { + "status": "completed", + "actual_end_time": datetime.utcnow(), + "actual_reduction_kw": total_reduction_kwh + } + } + ) + + # Remove from active events + self.active_events.pop(event_id, None) + + # Clear cache + await self.redis.delete(f"dr:event:active:{event_id}") + + # Publish completion + await self.redis.publish("dr_events", json.dumps({ + "event": "event_completed", + "event_id": event_id, + "total_reduction_kwh": total_reduction_kwh + })) + + logger.info(f"DR event {event_id} marked as completed") + + async def _cancel_event(self, event_id: str): + """Internal method to cancel an event""" + await self.db.demand_response_events.update_one( + {"event_id": event_id}, + { + "$set": { + "status": "cancelled", + "cancelled_at": datetime.utcnow() + } + } + ) + + self.active_events.pop(event_id, None) + await self.redis.delete(f"dr:event:active:{event_id}") + + # Publish cancellation + await self.redis.publish("dr_events", json.dumps({ + "event": "event_cancelled", + "event_id": event_id, + "timestamp": datetime.utcnow().isoformat() + })) + + async def cancel_event(self, event_id: str): + """ + Public method to cancel a running DR event gracefully + """ + logger.info(f"Cancelling DR event {event_id}") + + # Cancel the async task + task = self.active_events.get(event_id) + if task and not task.done(): + task.cancel() + try: + await task + except asyncio.CancelledError: + # Expected - task cancelled successfully + pass + except Exception as e: + logger.error(f"Error cancelling event task {event_id}: {e}") + + # Update database status (if not already done by _cancel_event) + event = await self.db.demand_response_events.find_one({"event_id": event_id}) + if event and event.get("status") != "cancelled": + await self._cancel_event(event_id) + + logger.info(f"DR event {event_id} cancelled successfully") + + async def get_active_events(self) -> List[Dict[str, Any]]: + """Get currently running events with real-time data""" + cursor = self.db.demand_response_events.find({ + "status": "active" + }).sort("start_time", -1) + + events = [] + async for event in cursor: + event["_id"] = str(event["_id"]) + + # Add real-time data from cache + cache_key = f"dr:event:active:{event['event_id']}" + cached = await self.redis.get(cache_key) + if cached: + realtime_data = json.loads(cached) + event["current_reduction_kwh"] = realtime_data.get("current_reduction_kwh") + event["current_device_powers"] = realtime_data.get("devices") + + events.append(event) + + return events + + # ===== DEVICE POWER INTEGRATION ===== + + def update_device_power_cache(self, device_id: str, power_kw: float): + """ + Update device power cache (called by Redis subscriber) + This is synchronous because it's just updating a dict + """ + self.device_power_cache[device_id] = power_kw + # No logging here to avoid spam (called every few seconds per device) + + async def get_device_power(self, device_id: str) -> float: + """Get current power for a device from cache""" + return self.device_power_cache.get(device_id, 0.0) + + # ===== AUTO-RESPONSE CONFIGURATION ===== + + async def get_auto_response_config(self) -> Dict[str, Any]: + """Get auto-response configuration""" + config = await self.db.auto_response_config.find_one({"config_id": "default"}) + + if not config: + # Create default config + default_config = { + "config_id": "default", + "enabled": False, + "max_reduction_percentage": 20.0, + "response_delay_seconds": 300, + "min_notice_minutes": 60, + "updated_at": datetime.utcnow() + } + await self.db.auto_response_config.insert_one(default_config) + return default_config + + return config + + async def set_auto_response_config( + self, + enabled: bool, + max_reduction_percentage: float = 20.0, + response_delay_seconds: int = 300, + min_notice_minutes: int = 60 + ) -> Dict[str, Any]: + """Update auto-response configuration""" + await self.db.auto_response_config.update_one( + {"config_id": "default"}, + { + "$set": { + "enabled": enabled, + "max_reduction_percentage": max_reduction_percentage, + "response_delay_seconds": response_delay_seconds, + "min_notice_minutes": min_notice_minutes, + "updated_at": datetime.utcnow() + } + }, + upsert=True + ) + + # Clear cache + await self.redis.delete("dr:config:auto_response") + + logger.info(f"Auto-response config updated: enabled={enabled}") + + return await self.get_auto_response_config() + + async def process_auto_responses(self): + """ + Process pending invitations with auto-response (called by background task) + """ + # Get auto-response configuration + auto_config = await self.get_auto_response_config() + + if not auto_config.get("enabled"): + return + + # Find unanswered invitations + invitations = await self.get_unanswered_invitations() + + for invitation in invitations: + event_id = invitation["event_id"] + event_time = invitation["event_time"] + + # Parse event_time (might be string from cache) + if isinstance(event_time, str): + event_time = datetime.fromisoformat(event_time.replace('Z', '+00:00')) + + # Check if event is within auto-response criteria + time_until_event = (event_time - datetime.utcnow()).total_seconds() / 60 # minutes + min_notice = auto_config.get("min_notice_minutes", 60) + + if time_until_event >= min_notice: + logger.info(f"Auto-responding to invitation {event_id}") + + # Auto-accept for all devices + for device_id in invitation["iots"]: + # Check if already responded + existing = await self.db.demand_response_responses.find_one({ + "event_id": event_id, + "device_id": device_id + }) + + if not existing: + # Get device current power + device_power = await self.get_device_power(device_id) + + # Calculate committed reduction based on max_reduction_percentage + max_reduction_pct = auto_config.get("max_reduction_percentage", 20.0) + committed_reduction = device_power * (max_reduction_pct / 100) if device_power > 0 else 0.5 + + # Submit auto-response + try: + await self.answer_invitation(event_id, device_id, "YES", committed_reduction) + logger.info(f"Auto-accepted for device {device_id} with {committed_reduction:.2f} kW commitment") + except Exception as e: + logger.error(f"Error auto-responding for {device_id}: {e}") + else: + logger.debug(f"Invitation {event_id} too soon ({time_until_event:.0f}m < {min_notice}m)") + + # ===== BACKGROUND TASK SUPPORT ===== + + async def check_scheduled_events(self): + """ + Check for events that need to be started (called by scheduler task) + """ + now = datetime.utcnow() + threshold = now + timedelta(minutes=1) # Start events within next minute + + # Find scheduled events that should start + cursor = self.db.demand_response_events.find({ + "status": "scheduled", + "start_time": {"$lte": threshold, "$gte": now} + }) + + async for event in cursor: + event_id = event["event_id"] + + # Check if not already active + if event_id not in self.active_events: + logger.info(f"Starting scheduled DR event {event_id}") + await self.execute_event(event_id) + + # ===== BASIC FLEXIBILITY CALCULATION ===== + + async def get_current_flexibility(self) -> Dict[str, Any]: + """ + Calculate current available flexibility from device power cache + """ + total_flexibility_kw = 0.0 + devices = [] + + # Get all devices with instructions + cursor = self.db.device_instructions.find({}) + current_hour = datetime.utcnow().hour + + async for device_doc in cursor: + device_id = device_doc["device_id"] + instruction = device_doc["instructions"].get(str(current_hour), "off") + + if instruction != "off": + # Get device current power from cache + device_power = self.device_power_cache.get(device_id, 0.0) + + if instruction == "participation": + # Full flexibility (100%) + flexibility = device_power + elif instruction == "shifting": + # Partial flexibility (20%) + flexibility = device_power * 0.20 + else: + flexibility = 0.0 + + if flexibility > 0: + devices.append({ + "device_id": device_id, + "available_kw": round(flexibility, 2), + "instruction": instruction, + "current_power": round(device_power, 2) + }) + total_flexibility_kw += flexibility + + snapshot = { + "timestamp": datetime.utcnow(), + "total_flexibility_kw": round(total_flexibility_kw, 2), + "devices": devices + } + + # Store snapshot + await self.db.flexibility_snapshots.insert_one(dict(snapshot)) + + # Cache for 5 minutes + await self.redis.setex( + "dr:flexibility:current", + 300, + json.dumps(snapshot, default=str) + ) + + return snapshot + + async def get_device_instructions(self, device_id: Optional[str] = None) -> Dict[str, Any]: + """Get DR instructions for device(s)""" + if device_id: + doc = await self.db.device_instructions.find_one({"device_id": device_id}) + return doc if doc else {"device_id": device_id, "instructions": {}} + else: + cursor = self.db.device_instructions.find({}) + instructions = {} + async for doc in cursor: + instructions[doc["device_id"]] = doc["instructions"] + return instructions + + async def update_device_instructions(self, device_id: str, instructions: Dict[str, str]): + """Update hourly instructions for a device""" + await self.db.device_instructions.update_one( + {"device_id": device_id}, + { + "$set": { + "instructions": instructions, + "updated_at": datetime.utcnow() + } + }, + upsert=True + ) + + logger.info(f"Updated instructions for device {device_id}") + + # ===== ANALYTICS ===== + + async def get_performance_analytics(self, days: int = 30) -> Dict[str, Any]: + """Get DR performance analytics""" + start_date = datetime.utcnow() - timedelta(days=days) + + # Query completed events + cursor = self.db.demand_response_events.find({ + "status": "completed", + "start_time": {"$gte": start_date} + }) + + events = await cursor.to_list(length=None) + + if not events: + return { + "period_days": days, + "total_events": 0, + "total_reduction_kwh": 0.0, + "total_target_kwh": 0.0, + "average_reduction_kwh": 0.0, + "achievement_rate": 0.0, + "average_event_duration_minutes": 59 + } + + total_reduction = sum(e.get("actual_reduction_kw", 0) for e in events) + total_target = sum(e.get("target_reduction_kw", 0) for e in events) + + return { + "period_days": days, + "total_events": len(events), + "total_reduction_kwh": round(total_reduction, 2), + "total_target_kwh": round(total_target, 2), + "average_reduction_kwh": round(total_reduction / len(events), 2), + "achievement_rate": round((total_reduction / total_target * 100) if total_target > 0 else 0, 2), + "average_event_duration_minutes": 59 + } diff --git a/microservices/demand-response-service/main.py b/microservices/demand-response-service/main.py index 02afa4d..0e61778 100644 --- a/microservices/demand-response-service/main.py +++ b/microservices/demand-response-service/main.py @@ -23,22 +23,130 @@ from demand_response_service import DemandResponseService logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) +# Background task functions +async def event_scheduler_task(): + """Background task for checking and executing scheduled events""" + logger.info("Starting event scheduler task") + + while True: + try: + db = await get_database() + redis = await get_redis() + service = DemandResponseService(db, redis) + + # Check for events that need to be executed + await service.check_scheduled_events() + + # Sleep for 60 seconds between checks + await asyncio.sleep(60) + + except asyncio.CancelledError: + logger.info("Event scheduler task cancelled") + raise + except Exception as e: + logger.error(f"Error in event scheduler task: {e}") + await asyncio.sleep(120) # Wait longer on error + + +async def auto_response_task(): + """Background task for automatic demand response""" + logger.info("Starting auto-response task") + + while True: + try: + db = await get_database() + redis = await get_redis() + service = DemandResponseService(db, redis) + + # Check for auto-response opportunities + await service.process_auto_responses() + + # Sleep for 30 seconds between checks + await asyncio.sleep(30) + + except asyncio.CancelledError: + logger.info("Auto-response task cancelled") + raise + except Exception as e: + logger.error(f"Error in auto-response task: {e}") + await asyncio.sleep(90) # Wait longer on error + + +async def energy_data_subscriber_task(): + """Subscribe to energy_data Redis channel for device power updates""" + logger.info("Starting energy data subscriber task") + + try: + redis = await get_redis() + db = await get_database() + service = DemandResponseService(db, redis) + + pubsub = redis.pubsub() + await pubsub.subscribe("energy_data") + + logger.info("Subscribed to energy_data channel") + + while True: + try: + message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0) + if message and message.get('type') == 'message': + import json + data = json.loads(message['data']) + + # Format: {"sensorId": "sensor_1", "timestamp": 123, "value": 3.5, "unit": "kWh"} + sensor_id = data.get("sensorId") + power_kw = data.get("value", 0.0) + + # Update service cache + service.update_device_power_cache(sensor_id, power_kw) + + except json.JSONDecodeError as e: + logger.warning(f"Invalid JSON in energy_data message: {e}") + except Exception as e: + logger.error(f"Error processing energy data message: {e}") + await asyncio.sleep(5) + + except asyncio.CancelledError: + logger.info("Energy data subscriber task cancelled") + raise + except Exception as e: + logger.error(f"Energy data subscriber task failed: {e}") + + @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan manager""" logger.info("Demand Response Service starting up...") await connect_to_mongo() await connect_to_redis() - + + # Create global service instance for shutdown cleanup + db = await get_database() + redis = await get_redis() + app.state.dr_service = DemandResponseService(db, redis) + # Start background tasks asyncio.create_task(event_scheduler_task()) asyncio.create_task(auto_response_task()) - + asyncio.create_task(energy_data_subscriber_task()) + logger.info("Demand Response Service startup complete") - + yield - + logger.info("Demand Response Service shutting down...") + + # Cancel all active DR events gracefully + if hasattr(app.state, 'dr_service'): + active_event_ids = list(app.state.dr_service.active_events.keys()) + if active_event_ids: + logger.info(f"Cancelling {len(active_event_ids)} active events...") + for event_id in active_event_ids: + try: + await app.state.dr_service.cancel_event(event_id) + except Exception as e: + logger.error(f"Error cancelling event {event_id}: {e}") + await close_mongo_connection() logger.info("Demand Response Service shutdown complete") diff --git a/microservices/demand-response-service/models.py b/microservices/demand-response-service/models.py new file mode 100644 index 0000000..6f149c0 --- /dev/null +++ b/microservices/demand-response-service/models.py @@ -0,0 +1,338 @@ +""" +Pydantic models for Demand Response Service +""" + +from datetime import datetime +from typing import List, Dict, Optional, Literal +from pydantic import BaseModel, Field +from enum import Enum + + +# Enums +class InvitationStatus(str, Enum): + """Invitation status states""" + PENDING = "pending" + SCHEDULED = "scheduled" + ACTIVE = "active" + COMPLETED = "completed" + CANCELLED = "cancelled" + + +class ResponseType(str, Enum): + """Device response types""" + WAITING = "WAITING" + YES = "YES" + NO = "NO" + + +class EventStatus(str, Enum): + """DR event status states""" + SCHEDULED = "scheduled" + ACTIVE = "active" + COMPLETED = "completed" + CANCELLED = "cancelled" + + +class InstructionType(str, Enum): + """Device participation instruction types""" + PARTICIPATION = "participation" # Full DR participation (100%) + SHIFTING = "shifting" # Partial participation (0-20%) + OFF = "off" # No DR participation + + +# Invitation Models +class EventRequest(BaseModel): + """Request model for creating a DR event (alias for DRInvitationCreate)""" + event_time: datetime = Field(..., description="When the DR event should occur") + load_kwh: float = Field(..., description="Target load reduction in kWh", gt=0) + load_percentage: float = Field(..., description="Target reduction as percentage of total load", ge=0, le=100) + iots: List[str] = Field(..., description="List of device IDs to participate", min_items=1) + duration_minutes: int = Field(59, description="Event duration in minutes", gt=0, le=120) + + class Config: + json_schema_extra = { + "example": { + "event_time": "2025-12-10T14:00:00", + "load_kwh": 5.0, + "load_percentage": 15.0, + "iots": ["sensor_1", "sensor_2"], + "duration_minutes": 59 + } + } + + +class DRInvitationCreate(BaseModel): + """Request model for creating a DR invitation""" + event_time: datetime = Field(..., description="When the DR event should occur") + load_kwh: float = Field(..., description="Target load reduction in kWh", gt=0) + load_percentage: float = Field(..., description="Target reduction as percentage of total load", ge=0, le=100) + iots: List[str] = Field(..., description="List of device IDs to participate", min_items=1) + duration_minutes: int = Field(59, description="Event duration in minutes", gt=0, le=120) + + class Config: + json_schema_extra = { + "example": { + "event_time": "2025-12-10T14:00:00", + "load_kwh": 5.0, + "load_percentage": 15.0, + "iots": ["sensor_1", "sensor_2"], + "duration_minutes": 59 + } + } + + +class DRInvitationResponse(BaseModel): + """Response model for device answering invitation""" + event_id: str = Field(..., description="Event identifier") + iot_id: str = Field(..., description="Device identifier") + response: ResponseType = Field(..., description="Device response (YES/NO)") + committed_reduction_kw: Optional[float] = Field(None, description="Committed power reduction in kW", ge=0) + + class Config: + json_schema_extra = { + "example": { + "event_id": "550e8400-e29b-41d4-a716-446655440000", + "iot_id": "sensor_1", + "response": "YES", + "committed_reduction_kw": 2.5 + } + } + + +class DRInvitation(BaseModel): + """Full DR invitation model""" + event_id: str = Field(..., description="Unique event identifier") + created_at: datetime = Field(..., description="Invitation creation time") + event_time: datetime = Field(..., description="Scheduled event start time") + load_kwh: float = Field(..., description="Target load reduction in kWh") + load_percentage: float = Field(..., description="Target reduction percentage") + iots: List[str] = Field(..., description="Participating device IDs") + duration_minutes: int = Field(..., description="Event duration in minutes") + response: str = Field(..., description="Overall response status") + status: str = Field(..., description="Invitation status") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + json_schema_extra = { + "example": { + "event_id": "550e8400-e29b-41d4-a716-446655440000", + "created_at": "2025-12-10T13:45:00", + "event_time": "2025-12-10T14:00:00", + "load_kwh": 5.0, + "load_percentage": 15.0, + "iots": ["sensor_1", "sensor_2"], + "duration_minutes": 59, + "response": "WAITING", + "status": "pending" + } + } + + +# Event Models +class EventScheduleRequest(BaseModel): + """Request model for scheduling a DR event""" + event_time: datetime = Field(..., description="Event start time") + iots: List[str] = Field(..., description="Participating device IDs", min_items=1) + load_reduction_kw: float = Field(..., description="Target reduction in kW", gt=0) + duration_minutes: int = Field(59, description="Event duration in minutes", gt=0, le=120) + + class Config: + json_schema_extra = { + "example": { + "event_time": "2025-12-10T14:00:00", + "iots": ["sensor_1", "sensor_2"], + "load_reduction_kw": 5.0, + "duration_minutes": 59 + } + } + + +class PowerSample(BaseModel): + """Individual power sample during event""" + timestamp: datetime = Field(..., description="Sample timestamp") + device_powers: Dict[str, float] = Field(..., description="Device power readings (device_id -> kW)") + interval_reduction_kwh: Optional[float] = Field(None, description="Reduction for this interval") + + +class DREvent(BaseModel): + """DR event execution model""" + event_id: str = Field(..., description="Unique event identifier") + invitation_id: Optional[str] = Field(None, description="Source invitation ID if applicable") + start_time: datetime = Field(..., description="Event start time") + end_time: datetime = Field(..., description="Event end time") + status: EventStatus = Field(..., description="Event status") + participating_devices: List[str] = Field(..., description="Device IDs participating") + target_reduction_kw: float = Field(..., description="Target power reduction in kW") + actual_reduction_kw: float = Field(0.0, description="Actual achieved reduction in kWh") + power_samples: List[Dict] = Field(default_factory=list, description="Power samples during event") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + + +class ActiveEventResponse(BaseModel): + """Response model for active event with real-time data""" + event_id: str = Field(..., description="Event identifier") + status: EventStatus = Field(..., description="Current status") + start_time: datetime = Field(..., description="Event start time") + end_time: datetime = Field(..., description="Event end time") + participating_devices: List[str] = Field(..., description="Participating devices") + target_reduction_kw: float = Field(..., description="Target reduction") + actual_reduction_kw: float = Field(..., description="Current achieved reduction") + current_device_powers: Optional[Dict[str, float]] = Field(None, description="Current device power readings") + progress_percentage: Optional[float] = Field(None, description="Event progress (0-100%)") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + + +class LoadReductionRequest(BaseModel): + """Request model for executing load reduction""" + event_time: datetime = Field(..., description="Event start time") + iot: str = Field(..., description="Device ID") + + class Config: + json_schema_extra = { + "example": { + "event_time": "2025-12-10T14:00:00", + "iot": "sensor_1" + } + } + + +# Flexibility Models +class DeviceFlexibility(BaseModel): + """Per-device flexibility information""" + device_id: str = Field(..., description="Device identifier") + available_kw: float = Field(..., description="Available flexibility in kW", ge=0) + instruction: str = Field(..., description="Current DR instruction") + current_power: float = Field(..., description="Current power consumption in kW", ge=0) + + +class FlexibilityResponse(BaseModel): + """Response model for current flexibility""" + timestamp: datetime = Field(..., description="Calculation timestamp") + total_flexibility_kw: float = Field(..., description="Total available flexibility in kW", ge=0) + devices: List[DeviceFlexibility] = Field(..., description="Per-device breakdown") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + json_schema_extra = { + "example": { + "timestamp": "2025-12-10T13:45:00", + "total_flexibility_kw": 15.5, + "devices": [ + { + "device_id": "sensor_1", + "available_kw": 3.5, + "instruction": "participation", + "current_power": 3.5 + }, + { + "device_id": "sensor_2", + "available_kw": 0.8, + "instruction": "shifting", + "current_power": 4.0 + } + ] + } + } + + +class DeviceInstructionUpdate(BaseModel): + """Model for updating device instructions""" + device_id: str = Field(..., description="Device identifier") + instructions: Dict[str, str] = Field(..., description="Hourly instructions (hour -> instruction type)") + + class Config: + json_schema_extra = { + "example": { + "device_id": "sensor_1", + "instructions": { + "0": "participation", + "1": "shifting", + "2": "off", + "3": "participation" + } + } + } + + +# Configuration Models +class AutoResponseConfig(BaseModel): + """Auto-response configuration model""" + enabled: bool = Field(..., description="Whether auto-response is enabled") + max_reduction_percentage: float = Field(20.0, description="Maximum reduction percentage for auto-accept", ge=0, le=100) + response_delay_seconds: int = Field(300, description="Delay before auto-responding (seconds)", ge=0) + min_notice_minutes: int = Field(60, description="Minimum notice required for auto-accept (minutes)", ge=0) + + class Config: + json_schema_extra = { + "example": { + "enabled": True, + "max_reduction_percentage": 20.0, + "response_delay_seconds": 300, + "min_notice_minutes": 60 + } + } + + +# Response Models +class InvitationSendResponse(BaseModel): + """Response for sending invitation""" + event_id: str = Field(..., description="Created event identifier") + response: str = Field(..., description="Initial response status") + message: str = Field(..., description="Status message") + + +class InvitationAnswerResponse(BaseModel): + """Response for answering invitation""" + success: bool = Field(..., description="Whether answer was recorded") + message: str = Field(..., description="Status message") + + +class EventScheduleResponse(BaseModel): + """Response for scheduling event""" + event_id: str = Field(..., description="Scheduled event identifier") + message: str = Field(..., description="Status message") + + +class PerformanceAnalytics(BaseModel): + """Performance analytics response""" + period_days: int = Field(..., description="Analysis period in days") + total_events: int = Field(..., description="Total number of events") + total_reduction_kwh: float = Field(..., description="Total energy reduced") + total_target_kwh: float = Field(..., description="Total target reduction") + average_reduction_kwh: float = Field(..., description="Average reduction per event") + achievement_rate: float = Field(..., description="Achievement rate (%)") + average_event_duration_minutes: int = Field(..., description="Average event duration") + + +# Health Check Model +class HealthResponse(BaseModel): + """Health check response model""" + service: str = Field(..., description="Service name") + status: str = Field(..., description="Service status") + timestamp: datetime = Field(..., description="Check timestamp") + version: str = Field(..., description="Service version") + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + json_schema_extra = { + "example": { + "service": "demand-response-service", + "status": "healthy", + "timestamp": "2025-12-10T13:45:00", + "version": "1.0.0" + } + } diff --git a/microservices/demand-response-service/requirements.txt b/microservices/demand-response-service/requirements.txt new file mode 100644 index 0000000..b96e94a --- /dev/null +++ b/microservices/demand-response-service/requirements.txt @@ -0,0 +1,11 @@ +fastapi>=0.104.0 +uvicorn[standard]>=0.24.0 +pymongo>=4.5.0 +motor>=3.3.0 +redis>=5.0.0 +python-dotenv>=1.0.0 +pydantic>=2.4.0 +aiohttp>=3.9.0 +pytest>=7.4.0 +pytest-asyncio>=0.21.0 +python-multipart diff --git a/microservices/demand-response-service/test_demand_response.py b/microservices/demand-response-service/test_demand_response.py new file mode 100644 index 0000000..3ad2963 --- /dev/null +++ b/microservices/demand-response-service/test_demand_response.py @@ -0,0 +1,524 @@ +""" +Unit tests for Demand Response Service +Run with: pytest test_demand_response.py -v +""" + +import pytest +import asyncio +from datetime import datetime, timedelta +from unittest.mock import AsyncMock, MagicMock, patch +import json + +from demand_response_service import DemandResponseService + + +# Test fixtures +@pytest.fixture +def mock_db(): + """Mock MongoDB database""" + db = MagicMock() + + # Mock collections + db.demand_response_invitations = MagicMock() + db.demand_response_events = MagicMock() + db.demand_response_responses = MagicMock() + db.auto_response_config = MagicMock() + db.device_instructions = MagicMock() + db.flexibility_snapshots = MagicMock() + + return db + + +@pytest.fixture +def mock_redis(): + """Mock Redis client""" + redis = AsyncMock() + redis.get = AsyncMock(return_value=None) + redis.setex = AsyncMock() + redis.delete = AsyncMock() + redis.publish = AsyncMock() + return redis + + +@pytest.fixture +def dr_service(mock_db, mock_redis): + """Create DemandResponseService instance with mocks""" + return DemandResponseService(mock_db, mock_redis) + + +# Test: Invitation Management + +@pytest.mark.asyncio +async def test_send_invitation_with_auto_accept(dr_service, mock_db, mock_redis): + """Test sending invitation with auto-accept enabled""" + # Mock auto-response config (enabled) + mock_db.auto_response_config.find_one = AsyncMock(return_value={ + "config_id": "default", + "enabled": True + }) + + mock_db.demand_response_invitations.insert_one = AsyncMock() + + event_time = datetime.utcnow() + timedelta(hours=2) + result = await dr_service.send_invitation( + event_time=event_time, + load_kwh=5.0, + load_percentage=15.0, + iots=["sensor_1", "sensor_2"], + duration_minutes=59 + ) + + assert "event_id" in result + assert result["response"] == "YES" + assert result["message"] == "Invitation created successfully" + + # Verify MongoDB insert was called + mock_db.demand_response_invitations.insert_one.assert_called_once() + + # Verify Redis caching + mock_redis.setex.assert_called() + mock_redis.publish.assert_called() + + +@pytest.mark.asyncio +async def test_send_invitation_manual(dr_service, mock_db, mock_redis): + """Test sending invitation with auto-accept disabled (manual mode)""" + # Mock auto-response config (disabled) + mock_db.auto_response_config.find_one = AsyncMock(return_value={ + "config_id": "default", + "enabled": False + }) + + mock_db.demand_response_invitations.insert_one = AsyncMock() + + event_time = datetime.utcnow() + timedelta(hours=2) + result = await dr_service.send_invitation( + event_time=event_time, + load_kwh=5.0, + load_percentage=15.0, + iots=["sensor_1", "sensor_2"], + duration_minutes=59 + ) + + assert result["response"] == "WAITING" + + +@pytest.mark.asyncio +async def test_answer_invitation_success(dr_service, mock_db, mock_redis): + """Test answering an invitation successfully""" + event_id = "test-event-123" + + # Mock get_invitation to return a valid invitation + dr_service.get_invitation = AsyncMock(return_value={ + "event_id": event_id, + "iots": ["sensor_1", "sensor_2"] + }) + + # Mock that device hasn't responded yet + mock_db.demand_response_responses.find_one = AsyncMock(return_value=None) + mock_db.demand_response_responses.insert_one = AsyncMock() + mock_db.demand_response_responses.count_documents = AsyncMock(return_value=1) + + result = await dr_service.answer_invitation( + event_id=event_id, + iot_id="sensor_1", + response="YES", + committed_reduction_kw=2.5 + ) + + assert result["success"] is True + assert result["message"] == "Response recorded successfully" + + # Verify response was stored + mock_db.demand_response_responses.insert_one.assert_called_once() + mock_redis.delete.assert_called() + mock_redis.publish.assert_called() + + +@pytest.mark.asyncio +async def test_answer_invitation_device_not_in_list(dr_service, mock_db, mock_redis): + """Test answering invitation for device not in invitation list""" + event_id = "test-event-123" + + dr_service.get_invitation = AsyncMock(return_value={ + "event_id": event_id, + "iots": ["sensor_1", "sensor_2"] + }) + + result = await dr_service.answer_invitation( + event_id=event_id, + iot_id="sensor_3", # Not in list + response="YES" + ) + + assert result["success"] is False + assert "not in invitation" in result["message"] + + +# Test: Event Execution + +@pytest.mark.asyncio +async def test_schedule_event(dr_service, mock_db, mock_redis): + """Test scheduling a DR event""" + mock_db.demand_response_events.insert_one = AsyncMock() + + event_time = datetime.utcnow() + timedelta(hours=1) + result = await dr_service.schedule_event( + event_time=event_time, + iots=["sensor_1", "sensor_2"], + load_reduction_kw=5.0, + duration_minutes=59 + ) + + assert "event_id" in result + assert result["message"] == "Event scheduled successfully" + + mock_db.demand_response_events.insert_one.assert_called_once() + mock_redis.publish.assert_called() + + +@pytest.mark.asyncio +async def test_execute_event(dr_service, mock_db, mock_redis): + """Test executing a DR event (spawns background task)""" + event_id = "test-event-456" + + # Mock event document + event = { + "event_id": event_id, + "start_time": datetime.utcnow(), + "end_time": datetime.utcnow() + timedelta(minutes=59), + "participating_devices": ["sensor_1"], + "target_reduction_kw": 5.0 + } + + mock_db.demand_response_events.find_one = AsyncMock(return_value=event) + mock_db.demand_response_events.update_one = AsyncMock() + + # Execute event (starts background task) + await dr_service.execute_event(event_id) + + # Verify event status updated to active + mock_db.demand_response_events.update_one.assert_called() + mock_redis.publish.assert_called() + + # Verify task was created and stored + assert event_id in dr_service.active_events + + # Cancel the task to prevent it from running + task = dr_service.active_events[event_id] + task.cancel() + try: + await task + except asyncio.CancelledError: + pass + + +@pytest.mark.asyncio +async def test_cancel_event(dr_service, mock_db, mock_redis): + """Test cancelling a running DR event""" + event_id = "test-event-789" + + # Create a mock task + mock_task = AsyncMock() + mock_task.done = MagicMock(return_value=False) + mock_task.cancel = MagicMock() + + dr_service.active_events[event_id] = mock_task + + # Mock database operations + mock_db.demand_response_events.find_one = AsyncMock(return_value={ + "event_id": event_id, + "status": "active" + }) + mock_db.demand_response_events.update_one = AsyncMock() + + await dr_service.cancel_event(event_id) + + # Verify task was cancelled + mock_task.cancel.assert_called_once() + + # Verify database updated + mock_db.demand_response_events.update_one.assert_called() + mock_redis.delete.assert_called() + mock_redis.publish.assert_called() + + +# Test: Device Power Integration + +@pytest.mark.asyncio +async def test_update_device_power_cache(dr_service): + """Test updating device power cache""" + dr_service.update_device_power_cache("sensor_1", 2.5) + + assert dr_service.device_power_cache["sensor_1"] == 2.5 + + dr_service.update_device_power_cache("sensor_1", 3.0) + assert dr_service.device_power_cache["sensor_1"] == 3.0 + + +@pytest.mark.asyncio +async def test_get_device_power(dr_service): + """Test getting device power from cache""" + dr_service.device_power_cache["sensor_1"] = 2.5 + + power = await dr_service.get_device_power("sensor_1") + assert power == 2.5 + + # Test non-existent device returns 0 + power = await dr_service.get_device_power("sensor_999") + assert power == 0.0 + + +# Test: Auto-Response Configuration + +@pytest.mark.asyncio +async def test_get_auto_response_config_exists(dr_service, mock_db): + """Test getting existing auto-response config""" + mock_config = { + "config_id": "default", + "enabled": True, + "max_reduction_percentage": 20.0 + } + + mock_db.auto_response_config.find_one = AsyncMock(return_value=mock_config) + + config = await dr_service.get_auto_response_config() + + assert config["enabled"] is True + assert config["max_reduction_percentage"] == 20.0 + + +@pytest.mark.asyncio +async def test_get_auto_response_config_creates_default(dr_service, mock_db): + """Test creating default config when none exists""" + mock_db.auto_response_config.find_one = AsyncMock(return_value=None) + mock_db.auto_response_config.insert_one = AsyncMock() + + config = await dr_service.get_auto_response_config() + + assert config["enabled"] is False + mock_db.auto_response_config.insert_one.assert_called_once() + + +@pytest.mark.asyncio +async def test_set_auto_response_config(dr_service, mock_db, mock_redis): + """Test updating auto-response configuration""" + mock_db.auto_response_config.update_one = AsyncMock() + mock_db.auto_response_config.find_one = AsyncMock(return_value={ + "config_id": "default", + "enabled": True, + "max_reduction_percentage": 25.0 + }) + + config = await dr_service.set_auto_response_config( + enabled=True, + max_reduction_percentage=25.0 + ) + + assert config["enabled"] is True + assert config["max_reduction_percentage"] == 25.0 + + mock_db.auto_response_config.update_one.assert_called_once() + mock_redis.delete.assert_called() + + +# Test: Auto-Response Processing + +@pytest.mark.asyncio +async def test_process_auto_responses_disabled(dr_service, mock_db): + """Test auto-response processing when disabled""" + mock_db.auto_response_config.find_one = AsyncMock(return_value={ + "config_id": "default", + "enabled": False + }) + + # Should return early without processing + await dr_service.process_auto_responses() + + # No invitations should be queried + mock_db.demand_response_invitations.find.assert_not_called() + + +@pytest.mark.asyncio +async def test_process_auto_responses_enabled(dr_service, mock_db, mock_redis): + """Test auto-response processing when enabled""" + # Mock enabled config + mock_db.auto_response_config.find_one = AsyncMock(return_value={ + "config_id": "default", + "enabled": True, + "max_reduction_percentage": 20.0, + "min_notice_minutes": 60 + }) + + # Mock pending invitation + future_time = datetime.utcnow() + timedelta(hours=2) + mock_invitation = { + "event_id": "test-event-auto", + "event_time": future_time, + "iots": ["sensor_1"] + } + + dr_service.get_unanswered_invitations = AsyncMock(return_value=[mock_invitation]) + dr_service.get_device_power = AsyncMock(return_value=5.0) + dr_service.answer_invitation = AsyncMock(return_value={"success": True}) + + mock_db.demand_response_responses.find_one = AsyncMock(return_value=None) + + await dr_service.process_auto_responses() + + # Should have auto-responded + dr_service.answer_invitation.assert_called_once() + + +# Test: Flexibility Calculation + +@pytest.mark.asyncio +async def test_get_current_flexibility(dr_service, mock_db, mock_redis): + """Test calculating current flexibility""" + # Mock device with instructions + mock_device = { + "device_id": "sensor_1", + "instructions": { + str(datetime.utcnow().hour): "participation" + } + } + + async def mock_cursor(): + yield mock_device + + mock_db.device_instructions.find = MagicMock(return_value=mock_cursor()) + mock_db.flexibility_snapshots.insert_one = AsyncMock() + + # Set device power in cache + dr_service.device_power_cache["sensor_1"] = 5.0 + + result = await dr_service.get_current_flexibility() + + assert result["total_flexibility_kw"] == 5.0 + assert len(result["devices"]) == 1 + assert result["devices"][0]["device_id"] == "sensor_1" + + mock_db.flexibility_snapshots.insert_one.assert_called_once() + mock_redis.setex.assert_called() + + +# Test: Device Instructions + +@pytest.mark.asyncio +async def test_update_device_instructions(dr_service, mock_db): + """Test updating device DR instructions""" + mock_db.device_instructions.update_one = AsyncMock() + + instructions = { + "0": "participation", + "1": "shifting", + "2": "off" + } + + await dr_service.update_device_instructions("sensor_1", instructions) + + mock_db.device_instructions.update_one.assert_called_once() + + +@pytest.mark.asyncio +async def test_get_device_instructions_single(dr_service, mock_db): + """Test getting instructions for single device""" + mock_instructions = { + "device_id": "sensor_1", + "instructions": {"0": "participation"} + } + + mock_db.device_instructions.find_one = AsyncMock(return_value=mock_instructions) + + result = await dr_service.get_device_instructions("sensor_1") + + assert result["device_id"] == "sensor_1" + assert "instructions" in result + + +# Test: Analytics + +@pytest.mark.asyncio +async def test_get_performance_analytics(dr_service, mock_db): + """Test getting performance analytics""" + # Mock completed events + mock_events = [ + {"actual_reduction_kw": 5.0, "target_reduction_kw": 6.0}, + {"actual_reduction_kw": 4.5, "target_reduction_kw": 5.0} + ] + + mock_cursor = AsyncMock() + mock_cursor.to_list = AsyncMock(return_value=mock_events) + + mock_db.demand_response_events.find = MagicMock(return_value=mock_cursor) + + analytics = await dr_service.get_performance_analytics(days=30) + + assert analytics["total_events"] == 2 + assert analytics["total_reduction_kwh"] == 9.5 + assert analytics["total_target_kwh"] == 11.0 + assert analytics["achievement_rate"] > 0 + + +@pytest.mark.asyncio +async def test_get_performance_analytics_no_events(dr_service, mock_db): + """Test analytics with no completed events""" + mock_cursor = AsyncMock() + mock_cursor.to_list = AsyncMock(return_value=[]) + + mock_db.demand_response_events.find = MagicMock(return_value=mock_cursor) + + analytics = await dr_service.get_performance_analytics(days=30) + + assert analytics["total_events"] == 0 + assert analytics["total_reduction_kwh"] == 0.0 + assert analytics["achievement_rate"] == 0.0 + + +# Integration-style tests + +@pytest.mark.asyncio +async def test_full_invitation_workflow(dr_service, mock_db, mock_redis): + """Test complete invitation workflow from creation to response""" + # Step 1: Create invitation + mock_db.auto_response_config.find_one = AsyncMock(return_value={ + "config_id": "default", + "enabled": False + }) + mock_db.demand_response_invitations.insert_one = AsyncMock() + + event_time = datetime.utcnow() + timedelta(hours=2) + invite_result = await dr_service.send_invitation( + event_time=event_time, + load_kwh=5.0, + load_percentage=15.0, + iots=["sensor_1", "sensor_2"], + duration_minutes=59 + ) + + event_id = invite_result["event_id"] + assert invite_result["response"] == "WAITING" + + # Step 2: Answer invitation for device 1 + dr_service.get_invitation = AsyncMock(return_value={ + "event_id": event_id, + "iots": ["sensor_1", "sensor_2"] + }) + mock_db.demand_response_responses.find_one = AsyncMock(return_value=None) + mock_db.demand_response_responses.insert_one = AsyncMock() + mock_db.demand_response_responses.count_documents = AsyncMock(side_effect=[1, 1, 2, 2]) + mock_db.demand_response_invitations.update_one = AsyncMock() + + answer1 = await dr_service.answer_invitation(event_id, "sensor_1", "YES", 2.5) + assert answer1["success"] is True + + # Step 3: Answer invitation for device 2 + answer2 = await dr_service.answer_invitation(event_id, "sensor_2", "YES", 2.5) + assert answer2["success"] is True + + # Verify final invitation update was called (all devices responded) + assert mock_db.demand_response_invitations.update_one.call_count >= 1 + + +if __name__ == "__main__": + pytest.main([__file__, "-v", "--tb=short"]) diff --git a/microservices/deploy.sh b/microservices/deploy.sh deleted file mode 100755 index 5aa37d3..0000000 --- a/microservices/deploy.sh +++ /dev/null @@ -1,314 +0,0 @@ -#!/bin/bash - -# Energy Management Microservices Deployment Script -# This script handles deployment, startup, and management of all microservices - -set -e # Exit on any error - -# Colors for output -RED='\033[0;31m' -GREEN='\033[0;32m' -YELLOW='\033[1;33m' -BLUE='\033[0;34m' -NC='\033[0m' # No Color - -# Configuration -COMPOSE_FILE="docker-compose.yml" -PROJECT_NAME="sa4cps" - -# Function to print colored output -print_status() { - echo -e "${BLUE}[INFO]${NC} $1" -} - -print_success() { - echo -e "${GREEN}[SUCCESS]${NC} $1" -} - -print_warning() { - echo -e "${YELLOW}[WARNING]${NC} $1" -} - -print_error() { - echo -e "${RED}[ERROR]${NC} $1" -} - -# Function to check if Docker and Docker Compose are installed -check_dependencies() { - print_status "Checking dependencies..." - - if ! command -v docker &> /dev/null; then - print_error "Docker is not installed. Please install Docker first." - exit 1 - fi - - if ! command -v docker compose &> /dev/null; then - print_error "Docker Compose is not installed. Please install Docker Compose first." - exit 1 - fi - - print_success "Dependencies check passed" -} - -# Function to create necessary directories and files -setup_environment() { - print_status "Setting up environment..." - - # Create nginx configuration directory - mkdir -p nginx/ssl - - # Create init-mongo directory for database initialization - mkdir -p init-mongo - - # Create a simple nginx configuration if it doesn't exist - if [ ! -f "nginx/nginx.conf" ]; then - cat > nginx/nginx.conf << 'EOF' -events { - worker_connections 1024; -} - -http { - upstream api_gateway { - server api-gateway:8000; - } - - server { - listen 80; - - location / { - proxy_pass http://api_gateway; - proxy_set_header Host $host; - proxy_set_header X-Real-IP $remote_addr; - proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; - proxy_set_header X-Forwarded-Proto $scheme; - } - - location /ws { - proxy_pass http://api_gateway; - proxy_http_version 1.1; - proxy_set_header Upgrade $http_upgrade; - proxy_set_header Connection "upgrade"; - proxy_set_header Host $host; - } - } -} -EOF - print_success "Created nginx configuration" - fi - - # Create MongoDB initialization script if it doesn't exist - if [ ! -f "init-mongo/init.js" ]; then - cat > init-mongo/init.js << 'EOF' -// MongoDB initialization script -db = db.getSiblingDB('energy_dashboard'); -db.createUser({ - user: 'dashboard_user', - pwd: 'dashboard_pass', - roles: [ - { role: 'readWrite', db: 'energy_dashboard' }, - { role: 'readWrite', db: 'energy_dashboard_tokens' }, - { role: 'readWrite', db: 'energy_dashboard_batteries' }, - { role: 'readWrite', db: 'energy_dashboard_demand_response' }, - { role: 'readWrite', db: 'energy_dashboard_p2p' }, - { role: 'readWrite', db: 'energy_dashboard_forecasting' }, - { role: 'readWrite', db: 'energy_dashboard_iot' } - ] -}); - -// Create initial collections and indexes -db.sensors.createIndex({ "sensor_id": 1 }, { unique: true }); -db.sensor_readings.createIndex({ "sensor_id": 1, "timestamp": -1 }); -db.room_metrics.createIndex({ "room": 1, "timestamp": -1 }); - -print("MongoDB initialization completed"); -EOF - print_success "Created MongoDB initialization script" - fi - - print_success "Environment setup completed" -} - -# Function to build all services -build_services() { - print_status "Building all microservices..." - - docker compose -f $COMPOSE_FILE build - - if [ $? -eq 0 ]; then - print_success "All services built successfully" - else - print_error "Failed to build services" - exit 1 - fi -} - -# Function to start all services -start_services() { - print_status "Starting all services..." - - docker compose -f $COMPOSE_FILE up -d - - if [ $? -eq 0 ]; then - print_success "All services started successfully" - else - print_error "Failed to start services" - exit 1 - fi -} - -# Function to stop all services -stop_services() { - print_status "Stopping all services..." - - docker compose -f $COMPOSE_FILE down - - print_success "All services stopped" -} - -# Function to restart all services -restart_services() { - stop_services - start_services -} - -# Function to show service status -show_status() { - print_status "Service status:" - docker compose -f $COMPOSE_FILE ps - - print_status "Service health checks:" - - # Wait a moment for services to start - sleep 5 - - # services=("api-gateway:8000" "token-service:8001" "battery-service:8002" "demand-response-service:8003") - services=("api-gateway:8000" "token-service:8001") - for service in "${services[@]}"; do - name="${service%:*}" - port="${service#*:}" - - if curl -f -s "http://localhost:$port/health" > /dev/null; then - print_success "$name is healthy" - else - print_warning "$name is not responding to health checks" - fi - done -} - -# Function to view logs -view_logs() { - if [ -z "$2" ]; then - print_status "Showing logs for all services..." - docker compose -f $COMPOSE_FILE logs -f - else - print_status "Showing logs for $2..." - docker compose -f $COMPOSE_FILE logs -f $2 - fi -} - -# Function to clean up everything -cleanup() { - print_warning "This will remove all containers, images, and volumes. Are you sure? (y/N)" - read -r response - if [[ "$response" =~ ^([yY][eE][sS]|[yY])$ ]]; then - print_status "Cleaning up everything..." - docker compose -f $COMPOSE_FILE down -v --rmi all - docker system prune -f - print_success "Cleanup completed" - else - print_status "Cleanup cancelled" - fi -} - -# Function to run database migrations or setup -setup_database() { - print_status "Setting up databases..." - - # Wait for MongoDB to be ready - print_status "Waiting for MongoDB to be ready..." - sleep 10 - - # Run any additional setup scripts here - print_success "Database setup completed" -} - -# Function to show help -show_help() { - echo "Energy Management Microservices Deployment Script" - echo "" - echo "Usage: $0 [COMMAND]" - echo "" - echo "Commands:" - echo " setup Setup environment and dependencies" - echo " build Build all microservices" - echo " start Start all services" - echo " stop Stop all services" - echo " restart Restart all services" - echo " status Show service status and health" - echo " logs Show logs for all services" - echo " logs Show logs for specific service" - echo " deploy Full deployment (setup + build + start)" - echo " db-setup Setup databases" - echo " cleanup Remove all containers, images, and volumes" - echo " help Show this help message" - echo "" - echo "Examples:" - echo " $0 deploy # Full deployment" - echo " $0 logs battery-service # Show battery service logs" - echo " $0 status # Check service health" -} - -# Main script logic -case "${1:-help}" in - setup) - check_dependencies - setup_environment - ;; - build) - check_dependencies - build_services - ;; - start) - check_dependencies - start_services - ;; - stop) - stop_services - ;; - rb) - check_dependencies - build_services - restart_services - ;; - restart) - restart_services - ;; - status) - show_status - ;; - logs) - view_logs $@ - ;; - deploy) - check_dependencies - setup_environment - build_services - start_services - setup_database - show_status - ;; - db-setup) - setup_database - ;; - cleanup) - cleanup - ;; - help|--help|-h) - show_help - ;; - *) - print_error "Unknown command: $1" - show_help - exit 1 - ;; -esac diff --git a/microservices/docker-compose.yml b/microservices/docker-compose.yml index 218e956..cc07640 100644 --- a/microservices/docker-compose.yml +++ b/microservices/docker-compose.yml @@ -54,8 +54,8 @@ services: - token-service - sensor-service - data-ingestion-service + - demand-response-service # - battery-service - # - demand-response-service networks: - energy-network @@ -95,23 +95,24 @@ services: # - energy-network # Demand Response Service - # demand-response-service: - # build: - # context: ./demand-response-service - # dockerfile: Dockerfile - # container_name: demand-response-service - # restart: unless-stopped - # ports: - # - "8003:8003" - # environment: - # - MONGO_URL=mongodb://admin:password123@mongodb:27017/energy_dashboard_demand_response?authSource=admin - # - REDIS_URL=redis://redis:6379 - # - IOT_CONTROL_SERVICE_URL=http://iot-control-service:8006 - # depends_on: - # - mongodb - # - redis - # networks: - # - energy-network + demand-response-service: + build: + context: ./demand-response-service + dockerfile: Dockerfile + container_name: demand-response-service + restart: unless-stopped + ports: + - "8003:8003" + environment: + - MONGO_URL=mongodb://admin:password123@mongodb:27017/energy_dashboard_demand_response?authSource=admin + - REDIS_URL=redis://redis:6379 + - SENSOR_SERVICE_URL=http://sensor-service:8007 + depends_on: + - mongodb + - redis + - sensor-service + networks: + - energy-network # P2P Trading Service # p2p-trading-service: diff --git a/microservices/openapi.yaml b/microservices/openapi.yaml new file mode 100644 index 0000000..49fe289 --- /dev/null +++ b/microservices/openapi.yaml @@ -0,0 +1,1748 @@ +openapi: 3.0.3 +info: + title: Energy Management API + description: | + Central API gateway for energy management microservices. + + This API provides comprehensive energy management functionality including: + - Token-based authentication and authorization + - Sensor and room monitoring + - Battery management and optimization + - Demand response event management + - Real-time data ingestion and analytics + - Data export and reporting + + All requests are routed through the API Gateway running on port 8000. + version: 1.0.0 + contact: + name: API Support + email: support@example.com + license: + name: MIT + url: https://opensource.org/licenses/MIT + +servers: + - url: http://localhost:8000 + description: Local development server + - url: http://localhost:8000/api/v1 + description: API Gateway v1 + +tags: + - name: Gateway + description: API Gateway health and statistics + - name: Authentication + description: Token management and authentication + - name: Sensors + description: Sensor management and data retrieval + - name: Rooms + description: Room management and monitoring + - name: Analytics + description: Data analytics and reporting + - name: Demand Response + description: Demand response event management + - name: Batteries + description: Battery monitoring and control + - name: Data Ingestion + description: Data ingestion and source management + +security: + - BearerAuth: [] + +components: + securitySchemes: + BearerAuth: + type: http + scheme: bearer + bearerFormat: JWT + description: JWT token obtained from /api/v1/tokens/generate + + schemas: + HealthResponse: + type: object + properties: + service: + type: string + example: api-gateway + status: + type: string + enum: [healthy, degraded, unhealthy] + example: healthy + timestamp: + type: string + format: date-time + version: + type: string + example: 1.0.0 + services: + type: object + additionalProperties: + type: object + healthy_services: + type: integer + total_services: + type: integer + + GatewayStats: + type: object + properties: + total_requests: + type: integer + example: 1523 + successful_requests: + type: integer + example: 1498 + failed_requests: + type: integer + example: 25 + success_rate: + type: number + format: float + example: 98.36 + uptime_seconds: + type: number + format: float + example: 3600.5 + service_requests: + type: object + additionalProperties: + type: integer + timestamp: + type: string + format: date-time + + TokenGenerateRequest: + type: object + required: + - name + - list_of_resources + properties: + name: + type: string + description: Token name or identifier + example: dashboard-token + list_of_resources: + type: array + items: + type: string + description: List of resources this token can access + example: [sensor_1, sensor_2, room_A] + data_aggregation: + type: boolean + default: false + description: Allow data aggregation + time_aggregation: + type: boolean + default: false + description: Allow time-based aggregation + embargo: + type: boolean + default: false + description: Apply embargo restrictions + exp_hours: + type: integer + default: 24 + description: Token expiration in hours + example: 24 + + TokenResponse: + type: object + properties: + token: + type: string + description: JWT token string + example: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9... + + TokenValidationResponse: + type: object + properties: + valid: + type: boolean + example: true + token: + type: string + decoded: + type: object + nullable: true + error: + type: string + nullable: true + + SensorMetadata: + type: object + required: + - sensor_id + - sensor_type + properties: + sensor_id: + type: string + example: sensor_1 + sensor_type: + type: string + enum: [temperature, humidity, co2, energy, occupancy] + room: + type: string + example: Room A + location: + type: string + example: Northeast corner + status: + type: string + enum: [active, inactive, maintenance, error] + default: active + metadata: + type: object + additionalProperties: true + + SensorReading: + type: object + required: + - sensor_id + - timestamp + - value + properties: + sensor_id: + type: string + example: sensor_1 + timestamp: + type: integer + format: int64 + description: Unix timestamp + example: 1702396800 + value: + type: number + format: float + example: 23.5 + unit: + type: string + example: celsius + room: + type: string + example: Room A + + RoomCreate: + type: object + required: + - name + properties: + name: + type: string + example: Conference Room A + description: + type: string + example: Main conference room on first floor + floor: + type: integer + example: 1 + area_sqm: + type: number + format: float + example: 45.5 + capacity: + type: integer + example: 12 + room_type: + type: string + example: conference + + RoomUpdate: + type: object + properties: + description: + type: string + floor: + type: integer + area_sqm: + type: number + format: float + capacity: + type: integer + room_type: + type: string + + DataQuery: + type: object + properties: + sensor_ids: + type: array + items: + type: string + example: [sensor_1, sensor_2] + start_time: + type: integer + format: int64 + description: Unix timestamp + end_time: + type: integer + format: int64 + description: Unix timestamp + limit: + type: integer + default: 100 + example: 100 + offset: + type: integer + default: 0 + example: 0 + + EventRequest: + type: object + required: + - event_time + - load_kwh + - load_percentage + - iots + properties: + event_time: + type: string + format: date-time + description: When the DR event should occur + example: 2025-12-10T14:00:00 + load_kwh: + type: number + format: float + minimum: 0 + exclusiveMinimum: true + description: Target load reduction in kWh + example: 5.0 + load_percentage: + type: number + format: float + minimum: 0 + maximum: 100 + description: Target reduction as percentage of total load + example: 15.0 + iots: + type: array + items: + type: string + minItems: 1 + description: List of device IDs to participate + example: [sensor_1, sensor_2] + duration_minutes: + type: integer + minimum: 1 + maximum: 120 + default: 59 + description: Event duration in minutes + example: 59 + + InvitationResponse: + type: object + required: + - event_id + - iot_id + - response + properties: + event_id: + type: string + example: 550e8400-e29b-41d4-a716-446655440000 + iot_id: + type: string + example: sensor_1 + response: + type: string + enum: [WAITING, YES, NO] + example: YES + committed_reduction_kw: + type: number + format: float + minimum: 0 + nullable: true + example: 2.5 + + LoadReductionRequest: + type: object + required: + - target_reduction_kw + - duration_minutes + properties: + target_reduction_kw: + type: number + format: float + minimum: 0 + exclusiveMinimum: true + example: 5.0 + duration_minutes: + type: integer + minimum: 1 + example: 60 + priority_iots: + type: array + items: + type: string + nullable: true + example: [sensor_1, sensor_2] + + FlexibilityResponse: + type: object + properties: + timestamp: + type: string + format: date-time + total_flexibility_kw: + type: number + format: float + minimum: 0 + example: 15.5 + devices: + type: array + items: + type: object + properties: + device_id: + type: string + example: sensor_1 + available_kw: + type: number + format: float + minimum: 0 + example: 3.5 + instruction: + type: string + example: participation + current_power: + type: number + format: float + minimum: 0 + example: 3.5 + + BatteryStatus: + type: object + properties: + battery_id: + type: string + example: battery_1 + capacity: + type: number + format: float + description: Total battery capacity in kWh + example: 13.5 + stored_energy: + type: number + format: float + description: Current stored energy in kWh + example: 10.8 + state_of_charge: + type: number + format: float + description: State of charge percentage (0-100) + example: 80.0 + status: + type: string + enum: [charging, discharging, idle, maintenance] + example: idle + power_kw: + type: number + format: float + description: Current power flow in kW (positive = charging) + example: 0.0 + last_updated: + type: string + format: date-time + + ChargingRequest: + type: object + required: + - power_kw + - duration_minutes + properties: + power_kw: + type: number + format: float + minimum: 0 + exclusiveMinimum: true + example: 3.5 + duration_minutes: + type: integer + minimum: 1 + example: 60 + + Error: + type: object + properties: + detail: + type: string + example: Resource not found + status_code: + type: integer + example: 404 + +paths: + # Gateway Endpoints + /health: + get: + tags: + - Gateway + summary: Gateway health check + description: Check the health status of the API Gateway and all registered microservices + operationId: getGatewayHealth + security: [] + responses: + '200': + description: Health check successful + content: + application/json: + schema: + $ref: '#/components/schemas/HealthResponse' + '503': + description: Service unavailable + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /services/status: + get: + tags: + - Gateway + summary: Get services status + description: Get detailed status of all registered microservices + operationId: getServicesStatus + security: [] + responses: + '200': + description: Services status retrieved successfully + content: + application/json: + schema: + type: object + properties: + services: + type: object + additionalProperties: + type: object + timestamp: + type: string + format: date-time + total_services: + type: integer + healthy_services: + type: integer + + /stats: + get: + tags: + - Gateway + summary: Get gateway statistics + description: Get API Gateway request statistics and performance metrics + operationId: getGatewayStats + security: [] + responses: + '200': + description: Statistics retrieved successfully + content: + application/json: + schema: + $ref: '#/components/schemas/GatewayStats' + + /api/v1/overview: + get: + tags: + - Gateway + summary: Get system overview + description: Get comprehensive overview from all services + operationId: getSystemOverview + responses: + '200': + description: System overview retrieved successfully + content: + application/json: + schema: + type: object + properties: + system_overview: + type: object + additionalProperties: + type: object + timestamp: + type: string + format: date-time + services_checked: + type: integer + + # Token Service Endpoints + /api/v1/tokens/generate: + post: + tags: + - Authentication + summary: Generate new JWT token + description: Generate a new JWT token with specified permissions and resources + operationId: generateToken + security: [] + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/TokenGenerateRequest' + responses: + '200': + description: Token generated successfully + content: + application/json: + schema: + $ref: '#/components/schemas/TokenResponse' + '500': + description: Internal server error + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /api/v1/tokens/validate: + post: + tags: + - Authentication + summary: Validate JWT token + description: Validate and decode a JWT token + operationId: validateToken + security: [] + requestBody: + required: true + content: + application/json: + schema: + type: object + required: + - token + properties: + token: + type: string + responses: + '200': + description: Token validation result + content: + application/json: + schema: + $ref: '#/components/schemas/TokenValidationResponse' + + /api/v1/tokens: + get: + tags: + - Authentication + summary: Get all tokens + description: Retrieve list of all tokens + operationId: getTokens + security: [] + responses: + '200': + description: Tokens retrieved successfully + content: + application/json: + schema: + type: object + properties: + tokens: + type: array + items: + type: object + count: + type: integer + + # Sensor Service Endpoints + /api/v1/sensors/get: + get: + tags: + - Sensors + summary: Get all sensors + description: Get all sensors with optional filtering by room, type, or status + operationId: getSensors + parameters: + - name: room + in: query + description: Filter by room name + schema: + type: string + - name: sensor_type + in: query + description: Filter by sensor type + schema: + type: string + enum: [temperature, humidity, co2, energy, occupancy] + - name: status + in: query + description: Filter by sensor status + schema: + type: string + enum: [active, inactive, maintenance, error] + responses: + '200': + description: Sensors retrieved successfully + content: + application/json: + schema: + type: object + properties: + sensors: + type: array + items: + $ref: '#/components/schemas/SensorMetadata' + count: + type: integer + filters: + type: object + + /api/v1/sensors/{sensor_id}: + get: + tags: + - Sensors + summary: Get sensor details + description: Get detailed information about a specific sensor + operationId: getSensorDetails + parameters: + - name: sensor_id + in: path + required: true + description: Sensor identifier + schema: + type: string + responses: + '200': + description: Sensor details retrieved successfully + content: + application/json: + schema: + $ref: '#/components/schemas/SensorMetadata' + '404': + description: Sensor not found + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + put: + tags: + - Sensors + summary: Update sensor + description: Update sensor metadata + operationId: updateSensor + parameters: + - name: sensor_id + in: path + required: true + schema: + type: string + requestBody: + required: true + content: + application/json: + schema: + type: object + additionalProperties: true + responses: + '200': + description: Sensor updated successfully + content: + application/json: + schema: + type: object + properties: + message: + type: string + sensor_id: + type: string + updated_at: + type: string + format: date-time + + delete: + tags: + - Sensors + summary: Delete sensor + description: Delete a sensor and all its data + operationId: deleteSensor + parameters: + - name: sensor_id + in: path + required: true + schema: + type: string + responses: + '200': + description: Sensor deleted successfully + content: + application/json: + schema: + type: object + properties: + message: + type: string + sensor_id: + type: string + readings_deleted: + type: integer + + /api/v1/sensors: + post: + tags: + - Sensors + summary: Create sensor + description: Register a new sensor + operationId: createSensor + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/SensorMetadata' + responses: + '200': + description: Sensor created successfully + content: + application/json: + schema: + type: object + properties: + message: + type: string + sensor_id: + type: string + created_at: + type: string + format: date-time + '400': + description: Invalid request + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /api/v1/sensors/{sensor_id}/data: + get: + tags: + - Sensors + summary: Get sensor data + description: Get historical data for a specific sensor + operationId: getSensorData + parameters: + - name: sensor_id + in: path + required: true + schema: + type: string + - name: start_time + in: query + description: Start timestamp (Unix) + schema: + type: integer + format: int64 + - name: end_time + in: query + description: End timestamp (Unix) + schema: + type: integer + format: int64 + - name: limit + in: query + description: Maximum records to return + schema: + type: integer + default: 100 + - name: offset + in: query + description: Records to skip + schema: + type: integer + default: 0 + responses: + '200': + description: Sensor data retrieved successfully + content: + application/json: + schema: + type: object + properties: + data: + type: array + items: + $ref: '#/components/schemas/SensorReading' + total_count: + type: integer + query: + $ref: '#/components/schemas/DataQuery' + execution_time_ms: + type: number + + # Room Management Endpoints + /api/v1/rooms/names: + get: + tags: + - Rooms + summary: Get room names + description: Get simple list of room names for dropdowns + operationId: getRoomNames + responses: + '200': + description: Room names retrieved successfully + content: + application/json: + schema: + type: object + properties: + rooms: + type: array + items: + type: string + count: + type: integer + + /api/v1/rooms: + get: + tags: + - Rooms + summary: Get all rooms + description: Get all rooms with sensor counts and metrics + operationId: getRooms + responses: + '200': + description: Rooms retrieved successfully + content: + application/json: + schema: + type: object + properties: + rooms: + type: array + items: + type: object + count: + type: integer + + post: + tags: + - Rooms + summary: Create room + description: Create a new room + operationId: createRoom + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/RoomCreate' + responses: + '200': + description: Room created successfully + content: + application/json: + schema: + type: object + properties: + message: + type: string + room: + type: string + created_at: + type: string + format: date-time + '400': + description: Invalid request + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /api/v1/rooms/{room_name}: + get: + tags: + - Rooms + summary: Get room details + description: Get detailed room information + operationId: getRoomDetails + parameters: + - name: room_name + in: path + required: true + schema: + type: string + responses: + '200': + description: Room details retrieved successfully + content: + application/json: + schema: + type: object + '404': + description: Room not found + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + put: + tags: + - Rooms + summary: Update room + description: Update an existing room + operationId: updateRoom + parameters: + - name: room_name + in: path + required: true + schema: + type: string + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/RoomUpdate' + responses: + '200': + description: Room updated successfully + content: + application/json: + schema: + type: object + properties: + message: + type: string + room: + type: string + updated_at: + type: string + format: date-time + modified: + type: boolean + + delete: + tags: + - Rooms + summary: Delete room + description: Delete a room + operationId: deleteRoom + parameters: + - name: room_name + in: path + required: true + schema: + type: string + responses: + '200': + description: Room deleted successfully + content: + application/json: + schema: + type: object + properties: + message: + type: string + + /api/v1/rooms/{room_name}/data: + get: + tags: + - Rooms + summary: Get room data + description: Get historical data for a specific room + operationId: getRoomData + parameters: + - name: room_name + in: path + required: true + schema: + type: string + - name: start_time + in: query + description: Start timestamp (Unix) + schema: + type: integer + format: int64 + - name: end_time + in: query + description: End timestamp (Unix) + schema: + type: integer + format: int64 + - name: limit + in: query + description: Maximum records to return + schema: + type: integer + default: 100 + responses: + '200': + description: Room data retrieved successfully + content: + application/json: + schema: + type: object + properties: + room: + type: string + room_metrics: + type: array + items: + type: object + sensor_readings: + type: array + items: + $ref: '#/components/schemas/SensorReading' + period: + type: object + properties: + start_time: + type: integer + end_time: + type: integer + + # Analytics Endpoints + /api/v1/data/query: + post: + tags: + - Analytics + summary: Query data + description: Advanced data querying with multiple filters + operationId: queryData + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/DataQuery' + responses: + '200': + description: Query executed successfully + content: + application/json: + schema: + type: object + + /api/v1/analytics/summary: + get: + tags: + - Analytics + summary: Get analytics summary + description: Get comprehensive analytics summary + operationId: getAnalyticsSummary + parameters: + - name: hours + in: query + description: Hours of data to analyze + schema: + type: integer + default: 24 + responses: + '200': + description: Analytics summary retrieved successfully + content: + application/json: + schema: + type: object + + /api/v1/analytics/energy: + get: + tags: + - Analytics + summary: Get energy analytics + description: Get energy-specific analytics + operationId: getEnergyAnalytics + parameters: + - name: hours + in: query + description: Hours of data to analyze + schema: + type: integer + default: 24 + - name: room + in: query + description: Filter by room + schema: + type: string + responses: + '200': + description: Energy analytics retrieved successfully + content: + application/json: + schema: + type: object + + /api/v1/export: + get: + tags: + - Analytics + summary: Export data + description: Export sensor data in JSON or CSV format + operationId: exportData + parameters: + - name: start_time + in: query + required: true + description: Start timestamp (Unix) + schema: + type: integer + format: int64 + - name: end_time + in: query + required: true + description: End timestamp (Unix) + schema: + type: integer + format: int64 + - name: sensor_ids + in: query + description: Comma-separated sensor IDs + schema: + type: string + - name: format + in: query + description: Export format + schema: + type: string + enum: [json, csv] + default: json + responses: + '200': + description: Data exported successfully + content: + application/json: + schema: + type: object + + /api/v1/events: + get: + tags: + - Analytics + summary: Get system events + description: Get system events and alerts + operationId: getSystemEvents + parameters: + - name: severity + in: query + description: Filter by severity + schema: + type: string + - name: event_type + in: query + description: Filter by event type + schema: + type: string + - name: hours + in: query + description: Hours of events to retrieve + schema: + type: integer + default: 24 + - name: limit + in: query + description: Maximum events to return + schema: + type: integer + default: 50 + responses: + '200': + description: Events retrieved successfully + content: + application/json: + schema: + type: object + properties: + events: + type: array + items: + type: object + count: + type: integer + period_hours: + type: integer + + # Demand Response Endpoints + /api/v1/demand-response/invitations/send: + post: + tags: + - Demand Response + summary: Send demand response invitation + description: Send demand response invitation to specified IoT devices + operationId: sendDRInvitation + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/EventRequest' + responses: + '200': + description: Invitation sent successfully + content: + application/json: + schema: + type: object + properties: + message: + type: string + event_id: + type: string + event_time: + type: string + format: date-time + participants: + type: integer + load_kwh: + type: number + load_percentage: + type: number + + /api/v1/demand-response/invitations/unanswered: + get: + tags: + - Demand Response + summary: Get unanswered invitations + description: Get all unanswered demand response invitations + operationId: getUnansweredInvitations + responses: + '200': + description: Unanswered invitations retrieved successfully + content: + application/json: + schema: + type: object + properties: + invitations: + type: array + items: + type: object + count: + type: integer + + /api/v1/demand-response/invitations/answered: + get: + tags: + - Demand Response + summary: Get answered invitations + description: Get answered demand response invitations + operationId: getAnsweredInvitations + parameters: + - name: hours + in: query + description: Hours to look back + schema: + type: integer + default: 24 + responses: + '200': + description: Answered invitations retrieved successfully + content: + application/json: + schema: + type: object + properties: + invitations: + type: array + items: + type: object + count: + type: integer + period_hours: + type: integer + + /api/v1/demand-response/invitations/answer: + post: + tags: + - Demand Response + summary: Answer invitation + description: Answer a demand response invitation + operationId: answerDRInvitation + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/InvitationResponse' + responses: + '200': + description: Response recorded successfully + content: + application/json: + schema: + type: object + properties: + message: + type: string + event_id: + type: string + iot_id: + type: string + response: + type: string + committed_reduction: + type: number + '400': + description: Invalid request + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /api/v1/demand-response/invitations/{event_id}: + get: + tags: + - Demand Response + summary: Get invitation details + description: Get details of a specific demand response invitation + operationId: getDRInvitation + parameters: + - name: event_id + in: path + required: true + schema: + type: string + responses: + '200': + description: Invitation details retrieved successfully + content: + application/json: + schema: + type: object + '404': + description: Invitation not found + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /api/v1/demand-response/events/active: + get: + tags: + - Demand Response + summary: Get active events + description: Get currently active demand response events + operationId: getActiveDREvents + responses: + '200': + description: Active events retrieved successfully + content: + application/json: + schema: + type: object + properties: + events: + type: array + items: + type: object + count: + type: integer + + /api/v1/demand-response/flexibility/current: + get: + tags: + - Demand Response + summary: Get current flexibility + description: Get current system flexibility capacity + operationId: getCurrentFlexibility + responses: + '200': + description: Flexibility retrieved successfully + content: + application/json: + schema: + type: object + properties: + timestamp: + type: string + format: date-time + flexibility: + $ref: '#/components/schemas/FlexibilityResponse' + + /api/v1/demand-response/flexibility/forecast: + get: + tags: + - Demand Response + summary: Get flexibility forecast + description: Get forecasted flexibility for the next specified hours + operationId: getFlexibilityForecast + parameters: + - name: hours + in: query + description: Forecast horizon in hours + schema: + type: integer + default: 24 + responses: + '200': + description: Forecast retrieved successfully + content: + application/json: + schema: + type: object + properties: + forecast_hours: + type: integer + flexibility_forecast: + type: object + generated_at: + type: string + format: date-time + + /api/v1/demand-response/load-reduction/execute: + post: + tags: + - Demand Response + summary: Execute load reduction + description: Execute immediate load reduction + operationId: executeLoadReduction + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/LoadReductionRequest' + responses: + '200': + description: Load reduction executed successfully + content: + application/json: + schema: + type: object + properties: + message: + type: string + target_reduction_kw: + type: number + actual_reduction_kw: + type: number + participating_devices: + type: array + items: + type: string + execution_time: + type: string + format: date-time + + /api/v1/demand-response/analytics/performance: + get: + tags: + - Demand Response + summary: Get performance analytics + description: Get demand response performance analytics + operationId: getDRPerformanceAnalytics + parameters: + - name: days + in: query + description: Analysis period in days + schema: + type: integer + default: 30 + responses: + '200': + description: Analytics retrieved successfully + content: + application/json: + schema: + type: object + properties: + period_days: + type: integer + analytics: + type: object + generated_at: + type: string + format: date-time + + # Battery Endpoints + /api/v1/batteries: + get: + tags: + - Batteries + summary: Get all batteries + description: Get all registered batteries + operationId: getBatteries + responses: + '200': + description: Batteries retrieved successfully + content: + application/json: + schema: + type: object + properties: + batteries: + type: array + items: + $ref: '#/components/schemas/BatteryStatus' + count: + type: integer + total_capacity: + type: number + total_stored_energy: + type: number + + /api/v1/batteries/{battery_id}: + get: + tags: + - Batteries + summary: Get battery status + description: Get specific battery status + operationId: getBatteryStatus + parameters: + - name: battery_id + in: path + required: true + schema: + type: string + responses: + '200': + description: Battery status retrieved successfully + content: + application/json: + schema: + type: object + properties: + battery_id: + type: string + status: + $ref: '#/components/schemas/BatteryStatus' + '404': + description: Battery not found + content: + application/json: + schema: + $ref: '#/components/schemas/Error' + + /api/v1/batteries/{battery_id}/charge: + post: + tags: + - Batteries + summary: Charge battery + description: Charge a battery with specified power + operationId: chargeBattery + parameters: + - name: battery_id + in: path + required: true + schema: + type: string + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/ChargingRequest' + responses: + '200': + description: Charging initiated successfully + content: + application/json: + schema: + type: object + properties: + message: + type: string + battery_id: + type: string + power_kw: + type: number + duration_minutes: + type: integer + estimated_completion: + type: string + format: date-time + + /api/v1/batteries/{battery_id}/discharge: + post: + tags: + - Batteries + summary: Discharge battery + description: Discharge a battery with specified power + operationId: dischargeBattery + parameters: + - name: battery_id + in: path + required: true + schema: + type: string + requestBody: + required: true + content: + application/json: + schema: + $ref: '#/components/schemas/ChargingRequest' + responses: + '200': + description: Discharging initiated successfully + content: + application/json: + schema: + type: object + properties: + message: + type: string + battery_id: + type: string + power_kw: + type: number + duration_minutes: + type: integer + estimated_completion: + type: string + format: date-time + + /api/v1/batteries/{battery_id}/history: + get: + tags: + - Batteries + summary: Get battery history + description: Get battery historical data + operationId: getBatteryHistory + parameters: + - name: battery_id + in: path + required: true + schema: + type: string + - name: hours + in: query + description: Hours of history to retrieve + schema: + type: integer + default: 24 + responses: + '200': + description: Battery history retrieved successfully + content: + application/json: + schema: + type: object + properties: + battery_id: + type: string + period_hours: + type: integer + history: + type: array + items: + type: object + data_points: + type: integer + + # Data Ingestion Endpoints + /api/v1/sources: + get: + tags: + - Data Ingestion + summary: Get data sources + description: Get all data ingestion sources + operationId: getDataSources + responses: + '200': + description: Data sources retrieved successfully + content: + application/json: + schema: + type: object + properties: + sources: + type: array + items: + type: object + count: + type: integer + + # WebSocket endpoint + /ws: + get: + tags: + - Sensors + summary: WebSocket connection + description: WebSocket endpoint for real-time sensor data streaming + operationId: websocketConnection + responses: + '101': + description: Switching Protocols - WebSocket connection established diff --git a/microservices/tasks b/microservices/tasks deleted file mode 100644 index aa90dc6..0000000 --- a/microservices/tasks +++ /dev/null @@ -1,47 +0,0 @@ -api-gateway -- Critical: Extend `SERVICES` and `service_requests` to include every routed microservice so proxy endpoints stop returning 404/KeyError for battery/demand-response/p2p/forecasting/iot routes (api-gateway/main.py:70-169). -- High: Guard `request_stats` updates against unknown services or look up configs dynamically before incrementing counters to avoid crashes (api-gateway/main.py:88-169). -- Possible Feature: Add per-service rate limiting and fallback routing with circuit breakers to keep the gateway responsive during downstream outages. -- Data to Store: Persist rolling latency/throughput metrics per backend plus authentication decision logs for audit and tuning. - -battery-service -- High: Handle zero or missing max charge/discharge power before dividing when optimising SOC to prevent ZeroDivisionError (battery-service/battery_service.py:205-213). -- Medium: Use the stored `capacity_kwh`/`stored_energy_kwh` fields when computing fleet totals so analytics reflect real values (battery-service/main.py:95-96). -- Possible Feature: Expose predictive maintenance recommendations based on usage profiles and integrate battery grouping/aggregation endpoints. -- Data to Store: Track per-cycle metadata (depth of discharge, temperatures) and maintenance events to support lifecycle analytics. - -data-ingestion-service -- High: Wrap the initial `wait_for(check_for_new_files)` call in error handling so startup connection/timeout issues don't kill the monitoring task (data-ingestion-service/src/ftp_monitor.py:62-100). -- Possible Feature: Provide a dashboard/REST endpoint for real-time ingestion status with manual retry controls and support for additional protocols (SFTP/HTTPS). -- Data to Store: Record per-file ingestion outcomes, error traces, and FTP scan history to analyse gaps and retry logic effectiveness. - -demand-response-service -- Critical: Restore the missing `models`, `database`, and `demand_response_service` modules referenced during import so the app can boot (demand-response-service/main.py:15-20). -- Possible Feature: Implement participant opt-in/opt-out scheduling, incentive tracking, and automated curtailment verification reports. -- Data to Store: Persist device participation history, response accuracy, and incentive payouts to evaluate program efficiency. - -forecasting-service -- Critical: Recreate the forecasting microservice implementation; the directory is empty so nothing can start (forecasting-service/). -- Possible Feature: Offer multiple forecast horizons with confidence intervals and expose model version management APIs. -- Data to Store: Keep training dataset metadata, forecast error metrics, and model artefacts to support retraining and auditing. - -iot-control-service -- Critical: Recreate the IoT control microservice implementation; the directory is empty so nothing can start (iot-control-service/). -- Possible Feature: Add device scheduling/policy engines with rules-based automation and rollback support for failed commands. -- Data to Store: Log all device capabilities, issued commands, acknowledgements, and firmware status to manage the fleet safely. - -p2p-trading-service -- Critical: Recreate the P2P trading microservice implementation; the directory is empty so nothing can start (p2p-trading-service/). -- Possible Feature: Build order-book style trading with price discovery, bidding windows, and settlement workflows. -- Data to Store: Capture trade offers, matched transactions, settlement receipts, and participant credit balances for compliance. - -sensor-service -- High: Fix the aggregation pipeline so sensor types are grouped correctly and room metrics use real readings instead of the constant "energy" fallback (sensor-service/room_service.py:408-420). -- Medium: Filter system events using comparable types (e.g., `created_at` or int timestamps) so queries return results (sensor-service/sensor_service.py:218-227). -- Possible Feature: Add anomaly detection on sensor streams and configurable alerting thresholds surfaced via dashboards/WebSockets. -- Data to Store: Maintain sensor calibration history, room occupancy patterns, and WebSocket subscription metrics for optimisation. - -token-service -- Medium: Stop forcibly overriding client-provided expiry/embargo values so custom lifetimes survive token generation (token-service/main.py:99-108). -- Possible Feature: Support role/permission templates with bulk token provisioning and self-service revocation flows. -- Data to Store: Persist token usage logs (IP, endpoint, timestamp) and refresh token metadata to improve security monitoring.