From 4bedcecf5dd2c584336e030e55c17c752efca96d Mon Sep 17 00:00:00 2001 From: rafaeldpsilva Date: Mon, 22 Dec 2025 16:35:22 +0000 Subject: [PATCH] feat: Implement HTTP Poller for IoT device data ingestion - Added iots-right.json configuration file to define IoT devices and their sensors. - Developed HttpPoller class to handle polling of IoT devices via HTTP. - Created IoT configuration loader to validate and load device configurations from JSON. - Introduced models for device status, polling metrics, and data sources. - Implemented API routes for health checks, device status retrieval, and configuration management. - Enhanced error handling and logging throughout the data ingestion process. --- .gitignore | 2 + monolith/.env.example | 22 +- monolith/iots-left.json | 1561 +++++++++++++++++ monolith/iots-right.json | 616 +++++++ monolith/src/core/config.py | 21 +- monolith/src/main.py | 177 +- .../src/modules/data_ingestion/__init__.py | 34 +- monolith/src/modules/data_ingestion/config.py | 45 - .../src/modules/data_ingestion/database.py | 478 ----- .../src/modules/data_ingestion/ftp_monitor.py | 339 ---- .../src/modules/data_ingestion/http_poller.py | 353 ++++ .../src/modules/data_ingestion/iot_config.py | 187 ++ monolith/src/modules/data_ingestion/models.py | 112 ++ monolith/src/modules/data_ingestion/router.py | 194 ++ .../modules/data_ingestion/slg_processor.py | 171 -- .../src/modules/demand_response/__init__.py | 8 +- monolith/src/modules/sensors/router.py | 2 +- .../src/modules/sensors/websocket_manager.py | 2 +- 18 files changed, 3157 insertions(+), 1167 deletions(-) create mode 100755 monolith/iots-left.json create mode 100755 monolith/iots-right.json delete mode 100644 monolith/src/modules/data_ingestion/config.py delete mode 100644 monolith/src/modules/data_ingestion/database.py delete mode 100644 monolith/src/modules/data_ingestion/ftp_monitor.py create mode 100644 monolith/src/modules/data_ingestion/http_poller.py create mode 100644 monolith/src/modules/data_ingestion/iot_config.py create mode 100644 monolith/src/modules/data_ingestion/models.py create mode 100644 monolith/src/modules/data_ingestion/router.py delete mode 100644 monolith/src/modules/data_ingestion/slg_processor.py diff --git a/.gitignore b/.gitignore index 901218e..f5f4762 100644 --- a/.gitignore +++ b/.gitignore @@ -173,4 +173,6 @@ poetry.toml # LSP config files pyrightconfig.json + +CLAUDE.md # End of https://www.toptal.com/developers/gitignore/api/python diff --git a/monolith/.env.example b/monolith/.env.example index 0523959..2df4233 100644 --- a/monolith/.env.example +++ b/monolith/.env.example @@ -1,20 +1,12 @@ -# MongoDB Configuration (external deployment) +# MongoDB Configuration # Update with your MongoDB connection string -MONGO_URL=mongodb://admin:password123@mongodb-host:27017/?authSource=admin +MONGO_URL=mongodb://admin:password123@localhost:27017/?authSource=admin -# Redis Configuration (external deployment, optional) -# Update with your Redis connection string -REDIS_URL=redis://redis-host:6379 -REDIS_ENABLED=false - -# FTP Configuration -FTP_SA4CPS_HOST=ftp.sa4cps.pt -FTP_SA4CPS_PORT=21 -FTP_SA4CPS_USERNAME=curvascarga@sa4cps.pt -FTP_SA4CPS_PASSWORD= -FTP_SA4CPS_REMOTE_PATH=/SLGs/ -FTP_CHECK_INTERVAL=21600 -FTP_SKIP_INITIAL_SCAN=true +# HTTP Poller Configuration +# IoT device polling settings +HTTP_POLL_INTERVAL=60 +HTTP_TIMEOUT=10 +HTTP_MAX_CONCURRENT=5 # Application Settings DEBUG=false diff --git a/monolith/iots-left.json b/monolith/iots-left.json new file mode 100755 index 0000000..aeb9735 --- /dev/null +++ b/monolith/iots-left.json @@ -0,0 +1,1561 @@ +{ + "iots": { + "hvac": [ + { + "name": "Air Conditioner 101", + "type": "hvac", + "uri": "http://192.168.2.5:8520/resource/Analyzer1_V4", + "sensors": [ + { + "type": "power", + "tag": ["Analyzer1_V4","Ph1_P"], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": ["Analyzer1_V4","Ph1_U"], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": ["Analyzer1_V4","Ph1_I"], + "data": "DOUBLE" + } + ] + }, + { + "name": "Air Conditioner 102", + "type": "hvac", + "uri": "http://192.168.2.5:8520/resource/Analyzer102ac_V1", + "sensors": [ + { + "type": "power", + "tag": [ + "Analyzer102ac_V1", + "AC102_P" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "Analyzer102ac_V1", + "AC102_U" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "Analyzer102ac_V1", + "AC102_I" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Air Conditioner 103", + "type": "hvac", + "uri": "http://192.168.2.5:8520/resource/Analyzer103ac_V1", + "sensors": [ + { + "type": "power", + "tag": [ + "Analyzer103ac_V1", + "AC103_P" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "Analyzer103ac_V1", + "AC103_U" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "Analyzer103ac_V1", + "AC103_I" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Air Conditioner 105", + "type": "hvac", + "uri": "http://192.168.2.5:8520/resource/Analyzer105ac_V1", + "sensors": [ + { + "type": "power", + "tag": [ + "Analyzer105ac_V1", + "AC105_P" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "Analyzer105ac_V1", + "AC105_U" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "Analyzer105ac_V1", + "AC105_I" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Air Conditioner 107", + "type": "hvac", + "uri": "http://192.168.2.5:8520/resource/Analyzer107_108_109ac_V1", + "sensors": [ + { + "type": "power", + "tag": [ + "Analyzer107_108_109ac_V1", + "AC107_P" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "Analyzer107_108_109ac_V1", + "AC107_U" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "Analyzer107_108_109ac_V1", + "AC107_I" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Air Conditioner 108", + "type": "hvac", + "uri": "http://192.168.2.5:8520/resource/Analyzer107_108_109ac_V1", + "sensors": [ + { + "type": "power", + "tag": [ + "Analyzer107_108_109ac_V1", + "AC108_P" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "Analyzer107_108_109ac_V1", + "AC108_U" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "Analyzer107_108_109ac_V1", + "AC108_I" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Air Conditioner 109", + "type": "hvac", + "uri": "http://192.168.2.5:8520/resource/Analyzer107_108_109ac_V1", + "sensors": [ + { + "type": "power", + "tag": [ + "Analyzer107_108_109ac_V1", + "AC109_P" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "Analyzer107_108_109ac_V1", + "AC109_U" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "Analyzer107_108_109ac_V1", + "AC109_I" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Air Conditioner Corredor", + "type": "hvac", + "uri": "http://192.168.2.5:8520/resource/Analyzer4_V4", + "sensors": [ + { + "type": "power", + "tag": [ + "Analyzer4_V4", + "P3" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "Analyzer4_V4", + "U3" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "Analyzer4_V4", + "I3" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Air Conditioner 101", + "type": "hvac", + "uri": "http://192.168.2.5:8520/resource/Analyzer1_V4", + "sensors": [ + { + "type": "power", + "tag": [ + "Analyzer1_V4", + "Ph1_P" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "Analyzer1_V4", + "Ph1_U" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "Analyzer1_V4", + "Ph1_I" + ], + "data": "DOUBLE" + } + ] + } + ], + "co2": [ + { + "name": "CO2 Sensor 101", + "type": "co2", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "co2", + "tag": [ + "Sensors_1_V4", + "N101_CO2" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "CO2 Sensor 102", + "type": "co2", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "co2", + "tag": [ + "Sensors_1_V4", + "N102_Co2" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "CO2 Sensor 103", + "type": "co2", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "co2", + "tag": [ + "Sensors_1_V4", + "N103_CO2" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "CO2 Sensor 104", + "type": "co2", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "co2", + "tag": [ + "Sensors_2_V3", + "N104_Co2" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "CO2 Sensor 105", + "type": "co2", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "co2", + "tag": [ + "Sensors_2_V3", + "N105_Co2" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "CO2 Sensor 106", + "type": "co2", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "co2", + "tag": [ + "Sensors_2_V3", + "N106_Co2" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "CO2 Sensor 108", + "type": "co2", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "co2", + "tag": [ + "Sensors_3_V2", + "N108_CO2" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "CO2 Sensor 109", + "type": "co2", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "co2", + "tag": [ + "Sensors_3_V2", + "N109_CO2" + ], + "data": "DOUBLE" + } + ] + } + ], + "voc": [ + { + "name": "VOC Sensor 101", + "type": "voc", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "voc", + "tag": [ + "Sensors_1_V4", + "N101_VOC_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "VOC Sensor 102", + "type": "voc", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "voc", + "tag": [ + "Sensors_1_V4", + "N102_VOC_Air_Quality_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "VOC Sensor 103", + "type": "voc", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "voc", + "tag": [ + "Sensors_1_V4", + "N103_VOC_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "VOC Sensor 104", + "type": "voc", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "voc", + "tag": [ + "Sensors_2_V3", + "N104_VOC_Air_Quality_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "VOC Sensor 105", + "type": "voc", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "voc", + "tag": [ + "Sensors_2_V3", + "N105_VOC_Air_Quality_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "VOC Sensor 106", + "type": "voc", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "voc", + "tag": [ + "Sensors_2_V3", + "N106_VOC_Air_Quality_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "VOC Sensor 108", + "type": "voc", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "voc", + "tag": [ + "Sensors_3_V2", + "N108_VOC_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "VOC Sensor 109", + "type": "voc", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "voc", + "tag": [ + "Sensors_3_V2", + "N109_VOC_x10" + ], + "data": "DOUBLE" + } + ] + } + ], + "battery": [ + { + "name": "Battery_4", + "type": "battery", + "uri": "192.168.2.57", + "sensors": [ + { + "type": "energy", + "tag": [ + "battery", + "stored_energy" + ], + "data": "DOUBLE" + }, + { + "type": "charging_rate", + "tag": [ + "battery", + "charging_rate" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Battery_5", + "type": "battery", + "uri": "192.168.2.58", + "sensors": [ + { + "type": "energy", + "tag": [ + "battery", + "stored_energy" + ], + "data": "DOUBLE" + }, + { + "type": "charging_rate", + "tag": [ + "battery", + "charging_rate" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Battery_6", + "type": "battery", + "uri": "192.168.2.59", + "sensors": [ + { + "type": "energy", + "tag": [ + "battery", + "stored_energy" + ], + "data": "DOUBLE" + }, + { + "type": "charging_rate", + "tag": [ + "battery", + "charging_rate" + ], + "data": "DOUBLE" + } + ] + } + ], + "generation": [ + { + "name": "PV", + "type": "generation", + "uri": "http://192.168.2.68:8089/desenrasca/generation/3750", + "sensors": [ + { + "type": "generation", + "tag": [ + "generation_w" + ], + "data": "DOUBLE" + } + ] + } + ], + "lamp": [ + { + "name": "Lamp 1_101", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/101/1", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Lamp 2_101", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/101/2", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Lamp 1_102", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/102/1", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Lamp 2_102", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/102/2", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Lamp 1_103", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/103/1", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Lamp 1_105", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/105/1", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Lamp 2_105", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/105/2", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Lamp 1_106", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/106/1", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Lamp 1_107", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/107/1", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Lamp 1_108", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/108/1", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Lamp 1_109", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/109/1", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + } + ], + "movement": [ + { + "name": "Movement Sensor 102_1", + "type": "movement", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "movement", + "tag": [ + "Sensors_1_V4", + "Movement_Sensor_N102_1" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Movement Sensor 102_2", + "type": "movement", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "movement", + "tag": [ + "Sensors_1_V4", + "Movement_Sensor_N102_2" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Movement Sensor 102_3", + "type": "movement", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "movement", + "tag": [ + "Sensors_1_V4", + "Movement_Sensor_N102_3" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Movement Sensor 105_1", + "type": "movement", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "movement", + "tag": [ + "Sensors_2_V3", + "Movement_Sensor_N105_1" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Movement Sensor 105_2", + "type": "movement", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "movement", + "tag": [ + "Sensors_2_V3", + "Movement_Sensor_N105_2" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Movement Contact 107", + "type": "movement", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "movement", + "tag": [ + "Sensors_3_V2", + "N107_Movement_Contact" + ], + "data": "DOUBLE" + } + ] + } + ], + "door": [ + { + "name": "Door Sensor 101", + "type": "door", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "door", + "tag": [ + "Sensors_1_V4", + "N101_Door_NO" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Door Sensor 102", + "type": "door", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "door", + "tag": [ + "Sensors_1_V4", + "N102_Door_NO" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Door Sensor 103", + "type": "door", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "door", + "tag": [ + "Sensors_1_V4", + "N103_Door_NO" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Door Sensor 104", + "type": "door", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "door", + "tag": [ + "Sensors_2_V3", + "N104_Door_NO" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Door Sensor 105", + "type": "door", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "door", + "tag": [ + "Sensors_2_V3", + "N105_Door_NO" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Door Sensor 106", + "type": "door", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "door", + "tag": [ + "Sensors_2_V3", + "N106_Door_NO" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Door Sensor 107", + "type": "door", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "door", + "tag": [ + "Sensors_3_V2", + "N107_Door_NO" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Door Sensor 108", + "type": "door", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "door", + "tag": [ + "Sensors_3_V2", + "N108_Door_NO" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Door Sensor 109", + "type": "door", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "door", + "tag": [ + "Sensors_3_V2", + "N109_Door_NO" + ], + "data": "DOUBLE" + } + ] + } + ], + "light": [ + { + "name": "Light Sensor 101", + "type": "light", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "light", + "tag": [ + "Sensors_1_V4", + "N101_Light_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Light Sensor 102", + "type": "light", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "light", + "tag": [ + "Sensors_1_V4", + "N102_Light_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Light Sensor 103", + "type": "light", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "light", + "tag": [ + "Sensors_1_V4", + "N103_Light_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Light Sensor 105", + "type": "light", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "light", + "tag": [ + "Sensors_2_V3", + "N105_Light_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Light Sensor 106", + "type": "light", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "light", + "tag": [ + "Sensors_2_V3", + "N106_Light_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Light Sensor Outside", + "type": "light", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "light", + "tag": [ + "Sensors_3_V2", + "Light_Sensor_Outside" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Light Sensor 107", + "type": "light", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "light", + "tag": [ + "Sensors_3_V2", + "N107_Light_Intensity_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Corridor Light", + "type": "light", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "light", + "tag": [ + "Sensors_3_V2", + "Corridor_Light_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Light Sensor 108", + "type": "light", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "light", + "tag": [ + "Sensors_3_V2", + "N108_Light_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Light Sensor 109", + "type": "light", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "light", + "tag": [ + "Sensors_3_V2", + "N109_Light_x10" + ], + "data": "DOUBLE" + } + ] + } + ], + "sockets": [ + { + "name": "Sockets-101-102-103", + "type": "sockets", + "uri": "http://192.168.2.5:8520/resource/Analyzer1_V4", + "sensors": [ + { + "type": "power", + "tag": [ + "Analyzer1_V4", + "Ph2_P" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "Analyzer1_V4", + "Ph2_U" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "Analyzer1_V4", + "Ph2_I" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Sockets-104-105-106", + "type": "sockets", + "uri": "http://192.168.2.5:8520/resource/Analyzer2_V3", + "sensors": [ + { + "type": "power", + "tag": [ + "Analyzer2_V3", + "Ph1_P" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "Analyzer2_V3", + "Ph1_U" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "Analyzer2_V3", + "Ph1_I" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Sockets-107-108-109", + "type": "sockets", + "uri": "http://192.168.2.5:8520/resource/Analyzer3_V3", + "sensors": [ + { + "type": "power", + "tag": [ + "Analyzer3_V3", + "Ph3_P" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "Analyzer3_V3", + "Ph3_U" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "Analyzer3_V3", + "Ph3_I" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Sockets-corredor", + "type": "sockets", + "uri": "http://192.168.2.5:8520/resource/Analyzer4_V4", + "sensors": [ + { + "type": "power", + "tag": [ + "Analyzer4_V4", + "P1" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "Analyzer4_V4", + "U1" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "Analyzer4_V4", + "I1" + ], + "data": "DOUBLE" + } + ] + } + ], + "temperature": [ + { + "name": "Temperature Sensor 101", + "type": "temperature", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "temperature", + "tag": [ + "Sensors_1_V4", + "N101_Temp_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Temperature Sensor 102", + "type": "temperature", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "temperature", + "tag": [ + "Sensors_1_V4", + "N102_Temperature_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Temperature Sensor 103", + "type": "temperature", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "temperature", + "tag": [ + "Sensors_1_V4", + "N103_Temp_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Temperature Sensor 104", + "type": "temperature", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "temperature", + "tag": [ + "Sensors_2_V3", + "N104_Temperature_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Temperature Sensor 105", + "type": "temperature", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "temperature", + "tag": [ + "Sensors_2_V3", + "N105_Temperature_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Temperature Sensor 106", + "type": "temperature", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "temperature", + "tag": [ + "Sensors_2_V3", + "N106_Temperature_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Temperature Sensor 107", + "type": "temperature", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "temperature", + "tag": [ + "Sensors_3_V2", + "N107_Temperature_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Temperature Sensor 108", + "type": "temperature", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "temperature", + "tag": [ + "Sensors_3_V2", + "N108_Temp_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Temperature Sensor 109", + "type": "temperature", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "temperature", + "tag": [ + "Sensors_3_V2", + "N109_Temp_x10" + ], + "data": "DOUBLE" + } + ] + } + ], + "humidity": [ + { + "name": "Humidity Sensor 101", + "type": "humidity", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "humidity", + "tag": [ + "Sensors_1_V4", + "N101_Hum_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Humidity Sensor 102", + "type": "humidity", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "humidity", + "tag": [ + "Sensors_1_V4", + "N102_Humidity_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Humidity Sensor 103", + "type": "humidity", + "uri": "http://192.168.2.5:8520/resource/Sensors_1_V4", + "sensors": [ + { + "type": "humidity", + "tag": [ + "Sensors_1_V4", + "N103_Hum_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Humidity Sensor 104", + "type": "humidity", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "humidity", + "tag": [ + "Sensors_2_V3", + "N104_Humidity_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Humidity Sensor 105", + "type": "humidity", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "humidity", + "tag": [ + "Sensors_2_V3", + "N105_Humidity_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Humidity Sensor 106", + "type": "humidity", + "uri": "http://192.168.2.5:8520/resource/Sensors_2_V3", + "sensors": [ + { + "type": "humidity", + "tag": [ + "Sensors_2_V3", + "N106_Humidity_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Humidity Sensor 107", + "type": "humidity", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "humidity", + "tag": [ + "Sensors_3_V2", + "N107_Humidity_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Humidity Sensor 108", + "type": "humidity", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "humidity", + "tag": [ + "Sensors_3_V2", + "N108_Hum_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Humidity Sensor 109", + "type": "humidity", + "uri": "http://192.168.2.5:8520/resource/Sensors_3_V2", + "sensors": [ + { + "type": "humidity", + "tag": [ + "Sensors_3_V2", + "N109_Hum_x10" + ], + "data": "DOUBLE" + } + ] + } + ], + "weather": [ + { + "name": "Weather", + "type": "weather", + "uri": "http://192.168.2.5:8520/resource/Weather", + "sensors": [ + { + "type": "temperature", + "tag": [ + "Weather", + "outdoor_app_temp" + ], + "data": "DOUBLE" + } + ] + } + ] + } +} \ No newline at end of file diff --git a/monolith/iots-right.json b/monolith/iots-right.json new file mode 100755 index 0000000..368efef --- /dev/null +++ b/monolith/iots-right.json @@ -0,0 +1,616 @@ +{ + "iots": { + "battery": [ + { + "name": "Battery_1", + "type": "battery", + "uri": "192.168.2.54", + "sensors": [ + { + "type": "energy", + "tag": [ + "battery", + "stored_energy" + ], + "data": "DOUBLE" + }, + { + "type": "charging_rate", + "tag": [ + "battery", + "charging_rate" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Battery_2", + "type": "battery", + "uri": "192.168.2.55", + "sensors": [ + { + "type": "energy", + "tag": [ + "battery", + "stored_energy" + ], + "data": "DOUBLE" + }, + { + "type": "charging_rate", + "tag": [ + "battery", + "charging_rate" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Battery_3", + "type": "battery", + "uri": "192.168.2.56", + "sensors": [ + { + "type": "energy", + "tag": [ + "battery", + "stored_energy" + ], + "data": "DOUBLE" + }, + { + "type": "charging_rate", + "tag": [ + "battery", + "charging_rate" + ], + "data": "DOUBLE" + } + ] + } + ], + "refrigerator": [ + { + "name": "Fridge", + "type": "refrigerator", + "uri": "http://192.168.2.5:8520/enaplug/read/170307001", + "sensors": [ + { + "type": "power", + "tag": [ + "enaplug_170307001", + "act1" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "enaplug_170307001", + "volt1" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "enaplug_170307001", + "curr1" + ], + "data": "DOUBLE" + }, + { + "type": "doorOpen", + "tag": [ + "enaplug_170307001", + "doorOpened" + ], + "data": "BOOLEAN" + }, + { + "type": "state", + "tag": [ + "enaplug_170307001", + "state" + ], + "data": "DOUBLE" + }, + { + "type": "internal Temperature", + "tag": [ + "enaplug_170307001", + "temp2" + ], + "data": "DOUBLE" + }, + { + "type": "external Temperature", + "tag": [ + "enaplug_170307001", + "temp1" + ], + "data": "DOUBLE" + }, + { + "type": "humidity", + "tag": [ + "enaplug_170307001", + "hum1" + ], + "data": "DOUBLE" + } + ] + } + ], + "waterheater": [ + { + "name": "Water Heater", + "type": "water heater", + "uri": "http://192.168.2.5:8520/enaplug/read/180717001", + "sensors": [ + { + "type": "power", + "tag": [ + "enaplug_180717001", + "act" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "enaplug_180717001", + "volt" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "enaplug_180717001", + "curr" + ], + "data": "DOUBLE" + }, + { + "type": "state", + "tag": [ + "enaplug_180717001", + "state" + ], + "data": "BOOLEAN" + }, + { + "type": "temperature", + "tag": [ + "enaplug_180717001", + "temp" + ], + "data": "DOUBLE" + } + ] + } + ], + "microwave": [ + { + "name": "Microwave", + "type": "microwave", + "uri": "http://192.168.2.5:8520/resource/AnalyzerKitHall_V2", + "sensors": [ + { + "type": "power", + "tag": [ + "AnalyzerKitHall_V2", + "microwave_active" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "AnalyzerKitHall_V2", + "microwave_voltage" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "AnalyzerKitHall_V2", + "microwave_current_x10" + ], + "data": "DOUBLE" + } + ] + } + ], + "dishwasher": [ + { + "name": "Dishwasher", + "type": "dishwasher", + "uri": "http://192.168.2.5:8520/resource/AnalyzerKitHall_V2", + "sensors": [ + { + "type": "power", + "tag": [ + "AnalyzerKitHall_V2", + "dishwasher_active" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "AnalyzerKitHall_V2", + "dishwasher_voltage" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "AnalyzerKitHall_V2", + "dishwasher_current_x10" + ], + "data": "DOUBLE" + } + ] + } + ], + "kettle": [ + { + "name": "Kettle", + "type": "kettle", + "uri": "http://192.168.2.5:8520/resource/AnalyzerKitHall_V2", + "sensors": [ + { + "type": "power", + "tag": [ + "AnalyzerKitHall_V2", + "kettle_active" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "AnalyzerKitHall_V2", + "kettle_voltage" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "AnalyzerKitHall_V2", + "kettle_current_x10" + ], + "data": "DOUBLE" + } + ] + } + ], + "hvac": [ + { + "name": "Air Conditioner Kitchen", + "type": "hvac", + "uri": "http://192.168.2.5:8520/resource/AnalyzerKitHall_V2", + "sensors": [ + { + "type": "power", + "tag": [ + "AnalyzerKitHall_V2", + "kitchen_ac_activePower" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "AnalyzerKitHall_V2", + "kitchen_ac_voltage" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "AnalyzerKitHall_V2", + "kitchen_ac_current_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Air Conditioner Hallway", + "type": "hvac", + "uri": "http://192.168.2.5:8520/resource/AnalyzerKitHall_V2", + "sensors": [ + { + "type": "power", + "tag": [ + "AnalyzerKitHall_V2", + "hallway_ac_activePower" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "AnalyzerKitHall_V2", + "hallway_ac_voltage" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "AnalyzerKitHall_V2", + "hallway_ac_current_x10" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Air Conditioner 112_115", + "type": "hvac", + "uri": "http://192.168.2.5:8520/resource/Analyzer115_V1", + "sensors": [ + { + "type": "power", + "tag": [ + "Analyzer115_V1", + "P2_W" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "Analyzer115_V1", + "U2N_Vx10" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "Analyzer115_V1", + "Curr2_mA" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Air Conditioner 111_116", + "type": "hvac", + "uri": "http://192.168.2.5:8520/resource/Analyzer116_V1", + "sensors": [ + { + "type": "power", + "tag": [ + "Analyzer116_V1", + "P2_W" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "Analyzer116_V1", + "U2N_V" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "Analyzer116_V1", + "Curr2_A" + ], + "data": "DOUBLE" + } + ] + } + ], + "sockets": [ + { + "name": "Sockets 112_115", + "type": "sockets", + "uri": "http://192.168.2.5:8520/resource/Analyzer115_V1", + "sensors": [ + { + "type": "power", + "tag": [ + "Analyzer115_V1", + "P1_W" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "Analyzer115_V1", + "U1N_Vx10" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "Analyzer115_V1", + "Curr1_mA" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Sockets 111_116", + "type": "sockets", + "uri": "http://192.168.2.5:8520/resource/Analyzer116_V1", + "sensors": [ + { + "type": "power", + "tag": [ + "Analyzer116_V1", + "P3_W" + ], + "data": "DOUBLE" + }, + { + "type": "voltage", + "tag": [ + "Analyzer116_V1", + "U3N_V" + ], + "data": "DOUBLE" + }, + { + "type": "current", + "tag": [ + "Analyzer116_V1", + "Curr3_A" + ], + "data": "DOUBLE" + } + ] + } + ], + "lamp": [ + { + "name": "Lamp 1_111", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/111/1", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Lamp 1_112", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/112/1", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Lamp 2_112", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/112/2", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Lamp 3_112", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/112/3", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Lamp 1_115", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/115/1", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Lamp 2_115", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/115/2", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Lamp 3_115", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/115/3", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + }, + { + "name": "Lamp 1_116", + "type": "lamp", + "uri": "http://192.168.2.68:8089/desenrasca/lamp/116/1", + "sensors": [ + { + "type": "power", + "tag": [ + "consumption_w" + ], + "data": "DOUBLE" + } + ] + } + ], + "generation": [ + { + "name": "Generation", + "type": "generation", + "uri": "http://192.168.2.68:8089/desenrasca/generation/3750", + "sensors": [ + { + "type": "generation", + "tag": [ + "generation_w" + ], + "data": "DOUBLE" + } + ] + } + ] + } +} \ No newline at end of file diff --git a/monolith/src/core/config.py b/monolith/src/core/config.py index 96134ab..6f9ea7b 100644 --- a/monolith/src/core/config.py +++ b/monolith/src/core/config.py @@ -23,18 +23,10 @@ class Settings(BaseSettings): data_ingestion_db_name: str = "digitalmente_ingestion" main_db_name: str = "energy_dashboard" - # Redis - redis_url: str = os.getenv("REDIS_URL", "redis://localhost:6379") - redis_enabled: bool = True # Can be disabled for full monolith mode - - # FTP Configuration (for data ingestion) - ftp_sa4cps_host: str = os.getenv("FTP_SA4CPS_HOST", "ftp.sa4cps.pt") - ftp_sa4cps_port: int = int(os.getenv("FTP_SA4CPS_PORT", "21")) - ftp_sa4cps_username: str = os.getenv("FTP_SA4CPS_USERNAME", "curvascarga@sa4cps.pt") - ftp_sa4cps_password: str = os.getenv("FTP_SA4CPS_PASSWORD", "") - ftp_sa4cps_remote_path: str = os.getenv("FTP_SA4CPS_REMOTE_PATH", "/SLGs/") - ftp_check_interval: int = int(os.getenv("FTP_CHECK_INTERVAL", "21600")) # 6 hours - ftp_skip_initial_scan: bool = os.getenv("FTP_SKIP_INITIAL_SCAN", "true").lower() == "true" + # HTTP Poller Configuration (for IoT devices) + http_poll_interval: int = int(os.getenv("HTTP_POLL_INTERVAL", "60")) # 60 seconds + http_timeout: int = int(os.getenv("HTTP_TIMEOUT", "10")) # 10 seconds + http_max_concurrent: int = int(os.getenv("HTTP_MAX_CONCURRENT", "5")) # 5 concurrent requests # CORS cors_origins: list = ["*"] @@ -42,11 +34,6 @@ class Settings(BaseSettings): cors_allow_methods: list = ["*"] cors_allow_headers: list = ["*"] - # Background Tasks - health_check_interval: int = 30 - event_scheduler_interval: int = 60 - auto_response_interval: int = 30 - class Config: env_file = ".env" case_sensitive = False diff --git a/monolith/src/main.py b/monolith/src/main.py index 2d07304..e072c69 100644 --- a/monolith/src/main.py +++ b/monolith/src/main.py @@ -14,14 +14,13 @@ from fastapi.middleware.cors import CORSMiddleware from core.config import settings from core.logging_config import setup_logging from core.database import db_manager -from core.redis import redis_manager from core.events import event_bus, EventTopics # Module imports from modules.sensors.router import router as sensors_router from modules.sensors.room_service import RoomService from modules.sensors import WebSocketManager -from modules.demand_response import DemandResponseService +# TEMPORARILY DISABLED: from modules.demand_response import DemandResponseService # Setup logging setup_logging() @@ -35,7 +34,7 @@ async def room_metrics_aggregation_task(): while True: try: - room_service = RoomService(db_manager.sensors_db, redis_manager.client) + room_service = RoomService(db_manager.sensors_db, None) await room_service.aggregate_all_room_metrics() await asyncio.sleep(300) # 5 minutes @@ -62,74 +61,66 @@ async def data_cleanup_task(): await asyncio.sleep(7200) -async def event_scheduler_task(): - """Background task for checking and executing scheduled DR events""" - logger.info("Starting event scheduler task") - - while True: - try: - service = DemandResponseService(db_manager.demand_response_db, redis_manager.client) - await service.check_scheduled_events() - await asyncio.sleep(settings.event_scheduler_interval) - - except Exception as e: - logger.error(f"Error in event scheduler task: {e}") - await asyncio.sleep(120) +# TEMPORARILY DISABLED: Demand Response background tasks +# async def event_scheduler_task(): +# """Background task for checking and executing scheduled DR events""" +# logger.info("Starting event scheduler task") +# +# while True: +# try: +# service = DemandResponseService(db_manager.demand_response_db, redis_manager.client) +# await service.check_scheduled_events() +# await asyncio.sleep(settings.event_scheduler_interval) +# +# except Exception as e: +# logger.error(f"Error in event scheduler task: {e}") +# await asyncio.sleep(120) -async def auto_response_task(): - """Background task for automatic demand response""" - logger.info("Starting auto-response task") - - while True: - try: - service = DemandResponseService(db_manager.demand_response_db, redis_manager.client) - await service.process_auto_responses() - await asyncio.sleep(settings.auto_response_interval) - - except Exception as e: - logger.error(f"Error in auto-response task: {e}") - await asyncio.sleep(90) +# async def auto_response_task(): +# """Background task for automatic demand response""" +# logger.info("Starting auto-response task") +# +# while True: +# try: +# service = DemandResponseService(db_manager.demand_response_db, redis_manager.client) +# await service.process_auto_responses() +# await asyncio.sleep(settings.auto_response_interval) +# +# except Exception as e: +# logger.error(f"Error in auto-response task: {e}") +# await asyncio.sleep(90) -async def energy_data_event_subscriber(): - """Subscribe to internal event bus for energy data events""" - logger.info("Starting energy data event subscriber") - - async def handle_energy_data(data): - """Handle energy data events""" - try: - service = DemandResponseService(db_manager.demand_response_db, redis_manager.client) - sensor_id = data.get("sensorId") or data.get("sensor_id") - power_kw = data.get("value", 0.0) - - if sensor_id: - service.update_device_power_cache(sensor_id, power_kw) - - except Exception as e: - logger.error(f"Error processing energy data event: {e}") - - # Subscribe to energy data events - event_bus.subscribe(EventTopics.ENERGY_DATA, handle_energy_data) +# async def energy_data_event_subscriber(): +# """Subscribe to internal event bus for energy data events""" +# logger.info("Starting energy data event subscriber") +# +# async def handle_energy_data(data): +# """Handle energy data events""" +# try: +# service = DemandResponseService(db_manager.demand_response_db, redis_manager.client) +# sensor_id = data.get("sensorId") or data.get("sensor_id") +# power_kw = data.get("value", 0.0) +# +# if sensor_id: +# service.update_device_power_cache(sensor_id, power_kw) +# +# except Exception as e: +# logger.error(f"Error processing energy data event: {e}") +# +# # Subscribe to energy data events +# event_bus.subscribe(EventTopics.ENERGY_DATA, handle_energy_data) -async def ftp_monitoring_task(): - """Background task for FTP monitoring""" - logger.info("Starting FTP monitoring task") +async def http_polling_task(http_poller): + """Background task for HTTP IoT device polling""" + logger.info("Starting HTTP polling task") - while True: - try: - from modules.data_ingestion import FTPMonitor, SLGProcessor - - ftp_monitor = FTPMonitor(db_manager.data_ingestion_db) - slg_processor = SLGProcessor(db_manager.data_ingestion_db) - - await ftp_monitor.check_and_process_files(slg_processor) - await asyncio.sleep(settings.ftp_check_interval) - - except Exception as e: - logger.error(f"Error in FTP monitoring task: {e}") - await asyncio.sleep(600) + try: + await http_poller.run() + except Exception as e: + logger.error(f"Error in HTTP polling task: {e}") # Application lifespan @@ -140,24 +131,33 @@ async def lifespan(app: FastAPI): # Connect to databases await db_manager.connect() - await redis_manager.connect() # Initialize default rooms - room_service = RoomService(db_manager.sensors_db, redis_manager.client) + room_service = RoomService(db_manager.sensors_db, None) await room_service.initialize_default_rooms() + # Initialize HTTP poller for IoT devices + from modules.data_ingestion import HttpPoller, set_http_poller + + http_poller = HttpPoller( + sensors_db=db_manager.sensors_db, + poll_interval=settings.http_poll_interval, + timeout=10, + max_concurrent=5 + ) + set_http_poller(http_poller) + # Subscribe to internal events - await energy_data_event_subscriber() + # TEMPORARILY DISABLED: await energy_data_event_subscriber() # Start background tasks asyncio.create_task(room_metrics_aggregation_task()) asyncio.create_task(data_cleanup_task()) - asyncio.create_task(event_scheduler_task()) - asyncio.create_task(auto_response_task()) + # TEMPORARILY DISABLED: asyncio.create_task(event_scheduler_task()) + # TEMPORARILY DISABLED: asyncio.create_task(auto_response_task()) - # Start FTP monitoring if not skipping initial scan - if not settings.ftp_skip_initial_scan: - asyncio.create_task(ftp_monitoring_task()) + # Start HTTP polling task + asyncio.create_task(http_polling_task(http_poller)) logger.info("Application startup complete") @@ -167,7 +167,6 @@ async def lifespan(app: FastAPI): # Disconnect from databases await db_manager.disconnect() - await redis_manager.disconnect() logger.info("Application shutdown complete") @@ -210,12 +209,6 @@ async def health_check(): # Check database connection await db_manager.main_db.command("ping") - # Check Redis connection (optional) - redis_status = "disabled" - if redis_manager.is_available: - await redis_manager.client.ping() - redis_status = "healthy" - return { "service": settings.app_name, "version": settings.app_version, @@ -223,12 +216,11 @@ async def health_check(): "timestamp": datetime.utcnow().isoformat(), "components": { "database": "healthy", - "redis": redis_status, "event_bus": "healthy" }, "modules": { "sensors": "loaded", - "demand_response": "loaded", + "demand_response": "disabled", "data_ingestion": "loaded" } } @@ -244,21 +236,21 @@ async def system_overview(): """Get system overview""" try: from modules.sensors import SensorService - from modules.demand_response import DemandResponseService + # TEMPORARILY DISABLED: from modules.demand_response import DemandResponseService - sensor_service = SensorService(db_manager.sensors_db, redis_manager.client) - dr_service = DemandResponseService(db_manager.demand_response_db, redis_manager.client) + sensor_service = SensorService(db_manager.sensors_db, None) + # TEMPORARILY DISABLED: dr_service = DemandResponseService(db_manager.demand_response_db, None) # Get sensor counts all_sensors = await sensor_service.get_sensors() active_sensors = [s for s in all_sensors if s.get("status") == "online"] # Get room counts - room_service = RoomService(db_manager.sensors_db, redis_manager.client) + room_service = RoomService(db_manager.sensors_db, None) all_rooms = await room_service.get_rooms() # Get DR event counts - active_events = len(dr_service.active_events) if hasattr(dr_service, 'active_events') else 0 + # TEMPORARILY DISABLED: active_events = len(dr_service.active_events) if hasattr(dr_service, 'active_events') else 0 return { "timestamp": datetime.utcnow().isoformat(), @@ -271,7 +263,7 @@ async def system_overview(): "total": len(all_rooms) }, "demand_response": { - "active_events": active_events + "status": "disabled" }, "event_bus": { "topics": event_bus.get_topics(), @@ -291,9 +283,16 @@ app.include_router( tags=["sensors"] ) -# Note: Demand Response and Data Ingestion routers would be added here +# Data Ingestion router +from modules.data_ingestion import router as data_ingestion_router +app.include_router( + data_ingestion_router, + prefix="/api/v1/ingestion", + tags=["data-ingestion"] +) + +# Note: Demand Response router would be added here (currently disabled) # app.include_router(demand_response_router, prefix="/api/v1/demand-response", tags=["demand-response"]) -# app.include_router(data_ingestion_router, prefix="/api/v1/ingestion", tags=["data-ingestion"]) if __name__ == "__main__": diff --git a/monolith/src/modules/data_ingestion/__init__.py b/monolith/src/modules/data_ingestion/__init__.py index d47f85a..82ffdb2 100644 --- a/monolith/src/modules/data_ingestion/__init__.py +++ b/monolith/src/modules/data_ingestion/__init__.py @@ -1,11 +1,31 @@ -"""Data Ingestion module - handles FTP monitoring and SA4CPS data processing.""" +"""Data Ingestion module - handles HTTP/MQTT IoT device polling.""" -from .ftp_monitor import FTPMonitor -from .slg_processor import SLGProcessor -from .config import Config +from .http_poller import HttpPoller +from .iot_config import IoTConfiguration, IoTDevice, get_iot_config, get_config_loader +from .models import ( + DataSourceType, PollingStatus, DeviceStatus, + DataSourceSummary, PollingMetrics +) +from .router import router, set_http_poller __all__ = [ - "FTPMonitor", - "SLGProcessor", - "Config", + # HTTP Poller + "HttpPoller", + "set_http_poller", + + # Configuration + "IoTConfiguration", + "IoTDevice", + "get_iot_config", + "get_config_loader", + + # Models + "DataSourceType", + "PollingStatus", + "DeviceStatus", + "DataSourceSummary", + "PollingMetrics", + + # Router + "router", ] diff --git a/monolith/src/modules/data_ingestion/config.py b/monolith/src/modules/data_ingestion/config.py deleted file mode 100644 index dbcb293..0000000 --- a/monolith/src/modules/data_ingestion/config.py +++ /dev/null @@ -1,45 +0,0 @@ -#!/usr/bin/env python3 -""" -Configuration for SA4CPS Data Ingestion Service -Simple configuration management for FTP and MongoDB connections -""" - -import os -from typing import Dict, Any - -# FTP Configuration for SA4CPS server -FTP_CONFIG: Dict[str, Any] = { - "host": os.getenv("FTP_SA4CPS_HOST", "ftp.sa4cps.pt"), - "username": os.getenv("FTP_SA4CPS_USERNAME", "curvascarga@sa4cps.pt"), - "password": os.getenv("FTP_SA4CPS_PASSWORD", 'n$WFtz9+bleN'), # Set via environment variable - "base_path": os.getenv("FTP_SA4CPS_REMOTE_PATH", "/SLGs/"), - "check_interval": int(os.getenv("FTP_CHECK_INTERVAL", "21600")), # 6 hours default - "skip_initial_scan": os.getenv("FTP_SKIP_INITIAL_SCAN", "true").lower() == "true", -} - -# MongoDB Configuration -# Debug environment variables -print(f"DEBUG: MONGO_URL env var = {os.getenv('MONGO_URL', 'NOT SET')}") -print(f"DEBUG: All env vars starting with MONGO: {[k for k in os.environ.keys() if k.startswith('MONGO')]}") - -MONGO_CONFIG: Dict[str, Any] = { - "connection_string": os.getenv( - "MONGO_URL", - "mongodb://admin:password123@localhost:27017/digitalmente_ingestion?authSource=admin" - ), - "database_name": os.getenv("MONGODB_DATABASE", "digitalmente_ingestion") -} - -# Logging Configuration -LOGGING_CONFIG: Dict[str, Any] = { - "level": os.getenv("LOG_LEVEL", "INFO"), - "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s" -} - -# Service Configuration -SERVICE_CONFIG: Dict[str, Any] = { - "name": "SA4CPS Data Ingestion Service", - "version": "1.0.0", - "port": int(os.getenv("SERVICE_PORT", "8008")), - "host": os.getenv("SERVICE_HOST", "0.0.0.0") -} diff --git a/monolith/src/modules/data_ingestion/database.py b/monolith/src/modules/data_ingestion/database.py deleted file mode 100644 index 485ec50..0000000 --- a/monolith/src/modules/data_ingestion/database.py +++ /dev/null @@ -1,478 +0,0 @@ -import logging -from datetime import datetime -from typing import List, Dict, Any, Optional -from pymongo import MongoClient -from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError - -from config import MONGO_CONFIG - -logger = logging.getLogger(__name__) - - -class DatabaseManager: - - def __init__(self): - self.client: Optional[MongoClient] = None - self.db = None - self.collections = {} - self.energy_collections_cache = {} # Cache for dynamically created energy data collections - - self.connection_string = MONGO_CONFIG["connection_string"] - self.database_name = MONGO_CONFIG["database_name"] - - logger.info(f"Database manager initialized for: {self.database_name}") - - async def connect(self): - try: - logger.info(f"Connecting to MongoDB at: {self.connection_string}") - self.client = MongoClient(self.connection_string, serverSelectionTimeoutMS=5000) - - await self.ping() - - self.db = self.client[self.database_name] - self.collections = { - 'files': self.db.sa4cps_files, - 'metadata': self.db.sa4cps_metadata, - 'scanned_directories': self.db.sa4cps_scanned_directories - } - - self._create_base_indexes() - - logger.info(f"Connected to MongoDB database: {self.database_name}") - - except (ConnectionFailure, ServerSelectionTimeoutError) as e: - logger.error(f"Failed to connect to MongoDB: {e}") - raise - - async def close(self): - """Close MongoDB connection""" - if self.client: - self.client.close() - logger.debug("MongoDB connection closed") - - async def ping(self): - """Test database connection""" - if not self.client: - raise ConnectionFailure("No database connection") - - try: - # Use async approach with timeout - import asyncio - import concurrent.futures - - # Run the ping command in a thread pool to avoid blocking - loop = asyncio.get_event_loop() - with concurrent.futures.ThreadPoolExecutor() as pool: - await asyncio.wait_for( - loop.run_in_executor(pool, self.client.admin.command, 'ping'), - timeout=3.0 # 3 second timeout for ping - ) - logger.debug("MongoDB ping successful") - except asyncio.TimeoutError: - logger.error("MongoDB ping timeout after 3 seconds") - raise ConnectionFailure("MongoDB ping timeout") - except ConnectionFailure as e: - logger.error(f"MongoDB ping failed - Server not available: {e}") - raise - except Exception as e: - logger.error(f"MongoDB ping failed with error: {e}") - raise ConnectionFailure(f"Ping failed: {e}") - - def _create_base_indexes(self): - """Create indexes for base collections (not energy data collections)""" - try: - self.collections['files'].create_index("filename", unique=True) - self.collections['files'].create_index("processed_at") - self.collections['files'].create_index("directory_path") - - self.collections['scanned_directories'].create_index("directory_path", unique=True) - self.collections['scanned_directories'].create_index("last_scanned") - self.collections['scanned_directories'].create_index("scan_status") - - logger.info("Database indexes created successfully") - except Exception as e: - logger.warning(f"Failed to create indexes: {e}") - - def _extract_level3_path(self, directory_path: str) -> Optional[str]: - """Extract level 3 directory path (SLGs/Community/Building) from full path""" - # Expected structure: /SLGs/Community/Building/... - parts = directory_path.strip('/').split('/') - - if len(parts) >= 3 and parts[0] == 'SLGs': - # Return SLGs/Community/Building - return '/'.join(parts[:3]) - - return None - - def _sanitize_collection_name(self, level3_path: str) -> str: - """Convert level 3 directory path to valid MongoDB collection name - - Example: SLGs/CommunityA/Building1 -> energy_data__CommunityA_Building1 - """ - parts = level3_path.strip('/').split('/') - - if len(parts) >= 3 and parts[0] == 'SLGs': - # Use Community_Building as the collection suffix - collection_suffix = f"{parts[1]}_{parts[2]}" - collection_name = f"energy_data__{collection_suffix}" - return collection_name - - # Fallback: sanitize the entire path - sanitized = level3_path.replace('/', '_').replace('.', '_').replace(' ', '_') - sanitized = sanitized.strip('_') - return f"energy_data__{sanitized}" - - def _get_energy_collection(self, directory_path: str): - """Get or create energy data collection for a specific level 3 directory path""" - level3_path = self._extract_level3_path(directory_path) - - if not level3_path: - logger.warning(f"Could not extract level 3 path from: {directory_path}, using default collection") - # Fallback to a default collection for non-standard paths - collection_name = "energy_data__other" - else: - collection_name = self._sanitize_collection_name(level3_path) - - # Check cache first - if collection_name in self.energy_collections_cache: - return self.energy_collections_cache[collection_name] - - # Create/get collection - collection = self.db[collection_name] - - # Create indexes for this energy collection - try: - collection.create_index([("filename", 1), ("timestamp", 1)]) - collection.create_index("timestamp") - collection.create_index("meter_id") - logger.debug(f"Created indexes for collection: {collection_name}") - except Exception as e: - logger.warning(f"Failed to create indexes for {collection_name}: {e}") - - # Cache the collection - self.energy_collections_cache[collection_name] = collection - logger.info(f"Initialized energy data collection: {collection_name} for path: {directory_path}") - - return collection - - def _list_energy_collections(self) -> List[str]: - """List all energy data collections in the database""" - try: - all_collections = self.db.list_collection_names() - # Filter collections that start with 'energy_data__' - energy_collections = [c for c in all_collections if c.startswith('energy_data__')] - return energy_collections - except Exception as e: - logger.error(f"Error listing energy collections: {e}") - return [] - - async def store_file_data(self, filename: str, records: List[Dict[str, Any]], directory_path: str = None) -> bool: - try: - current_time = datetime.now() - - # Determine which collection to use based on directory path - if directory_path: - energy_collection = self._get_energy_collection(directory_path) - level3_path = self._extract_level3_path(directory_path) - else: - logger.warning(f"No directory path provided for {filename}, using default collection") - energy_collection = self._get_energy_collection("/SLGs/unknown/unknown") - level3_path = None - - # Store file metadata - file_metadata = { - "filename": filename, - "directory_path": directory_path, - "level3_path": level3_path, - "record_count": len(records), - "processed_at": current_time, - "file_size": sum(len(str(record)) for record in records), - "status": "processed" - } - - # Insert or update file record - self.collections['files'].replace_one( - {"filename": filename}, - file_metadata, - upsert=True - ) - - # Add filename and processed timestamp to each record - for record in records: - record["filename"] = filename - record["processed_at"] = current_time - record["directory_path"] = directory_path - - # Insert energy data records into the appropriate collection - if records: - result = energy_collection.insert_many(records) - inserted_count = len(result.inserted_ids) - logger.debug(f"Stored {inserted_count} records from {filename} to {energy_collection.name}") - return True - - return False - - except Exception as e: - logger.error(f"Error storing data for {filename}: {e}") - - # Store error metadata - error_metadata = { - "filename": filename, - "directory_path": directory_path, - "processed_at": current_time, - "status": "error", - "error_message": str(e) - } - - self.collections['files'].replace_one( - {"filename": filename}, - error_metadata, - upsert=True - ) - - return False - - async def get_processed_files(self) -> List[str]: - """Get list of successfully processed files""" - try: - cursor = self.collections['files'].find( - {"status": "processed"}, - {"filename": 1, "_id": 0} - ) - - files = [] - for doc in cursor: - files.append(doc["filename"]) - - return files - - except Exception as e: - logger.error(f"Error getting processed files: {e}") - return [] - - async def is_file_processed(self, filename: str) -> bool: - """Mock check if file is processed""" - return filename in await self.get_processed_files() - - async def get_file_info(self, filename: str) -> Optional[Dict[str, Any]]: - """Get information about a specific file""" - try: - return self.collections['files'].find_one({"filename": filename}) - except Exception as e: - logger.error(f"Error getting file info for {filename}: {e}") - return None - - # Directory scanning tracking methods - # Note: Only level 4+ directories (/SLGs/Community/Building/SubDir) are tracked - # to avoid unnecessary caching of high-level organizational directories - - async def is_directory_scanned(self, directory_path: str, since_timestamp: datetime = None) -> bool: - """Check if directory has been scanned recently - - Note: Only level 4+ directories are tracked in the database - """ - try: - query = {"directory_path": directory_path, "scan_status": "complete"} - if since_timestamp: - query["last_scanned"] = {"$gte": since_timestamp} - - result = self.collections['scanned_directories'].find_one(query) - return result is not None - except Exception as e: - logger.error(f"Error checking directory scan status for {directory_path}: {e}") - return False - - async def mark_directory_scanned(self, directory_path: str, file_count: int, ftp_last_modified: datetime = None) -> bool: - """Mark directory as scanned with current timestamp""" - try: - scan_record = { - "directory_path": directory_path, - "last_scanned": datetime.now(), - "file_count": file_count, - "scan_status": "complete" - } - - if ftp_last_modified: - scan_record["ftp_last_modified"] = ftp_last_modified - - # Use upsert to update existing or create new record - self.collections['scanned_directories'].replace_one( - {"directory_path": directory_path}, - scan_record, - upsert=True - ) - - logger.debug(f"Marked directory as scanned: {directory_path} ({file_count} files)") - return True - - except Exception as e: - logger.error(f"Error marking directory as scanned {directory_path}: {e}") - return False - - async def get_scanned_directories(self) -> List[Dict[str, Any]]: - """Get all scanned directory records""" - try: - cursor = self.collections['scanned_directories'].find() - return list(cursor) - except Exception as e: - logger.error(f"Error getting scanned directories: {e}") - return [] - - async def should_skip_directory(self, directory_path: str, ftp_last_modified: datetime = None) -> bool: - """Determine if directory should be skipped based on scan history and modification time""" - try: - scan_record = self.collections['scanned_directories'].find_one( - {"directory_path": directory_path, "scan_status": "complete"} - ) - - if not scan_record: - return False # Never scanned, should scan - - # If we have FTP modification time and it's newer than our last scan, don't skip - if ftp_last_modified and scan_record.get("last_scanned"): - return ftp_last_modified <= scan_record["last_scanned"] - - # If directory was scanned successfully, skip it (assuming it's historical data) - return True - - except Exception as e: - logger.error(f"Error determining if directory should be skipped {directory_path}: {e}") - return False - - async def get_stats(self) -> Dict[str, Any]: - """Get database statistics including all energy collections""" - try: - stats = { - "database": self.database_name, - "timestamp": datetime.now().isoformat() - } - - # Count documents in base collections - for name, collection in self.collections.items(): - try: - count = collection.count_documents({}) - stats[f"{name}_count"] = count - except Exception as e: - stats[f"{name}_count"] = f"error: {e}" - - # Get all energy collections and their counts - try: - energy_collections = self._list_energy_collections() - energy_stats = [] - total_energy_records = 0 - - for collection_name in energy_collections: - collection = self.db[collection_name] - count = collection.count_documents({}) - total_energy_records += count - - energy_stats.append({ - "collection": collection_name, - "record_count": count - }) - - stats["energy_collections"] = energy_stats - stats["total_energy_collections"] = len(energy_collections) - stats["total_energy_records"] = total_energy_records - - except Exception as e: - stats["energy_collections"] = f"error: {e}" - - # Get recent files - try: - recent_files = [] - cursor = self.collections['files'].find( - {}, - {"filename": 1, "processed_at": 1, "record_count": 1, "status": 1, "directory_path": 1, "level3_path": 1, "_id": 0} - ).sort("processed_at", -1).limit(5) - - for doc in cursor: - if doc.get("processed_at"): - doc["processed_at"] = doc["processed_at"].isoformat() - recent_files.append(doc) - - stats["recent_files"] = recent_files - - except Exception as e: - stats["recent_files"] = f"error: {e}" - - return stats - - except Exception as e: - logger.error(f"Error getting database stats: {e}") - return {"error": str(e), "timestamp": datetime.now().isoformat()} - - async def get_energy_data(self, - filename: Optional[str] = None, - start_time: Optional[datetime] = None, - end_time: Optional[datetime] = None, - directory_path: Optional[str] = None, - limit: int = 100) -> List[Dict[str, Any]]: - """Retrieve energy data with optional filtering - - Args: - filename: Filter by specific filename - start_time: Filter by start timestamp - end_time: Filter by end timestamp - directory_path: Filter by specific directory path (level 3). If None, queries all collections - limit: Maximum number of records to return - """ - try: - query = {} - - if filename: - query["filename"] = filename - - if start_time or end_time: - time_query = {} - if start_time: - time_query["$gte"] = start_time - if end_time: - time_query["$lte"] = end_time - query["timestamp"] = time_query - - data = [] - - # If directory_path is specified, query only that collection - if directory_path: - collection = self._get_energy_collection(directory_path) - cursor = collection.find(query).sort("timestamp", -1).limit(limit) - - for doc in cursor: - data.append(self._format_energy_document(doc)) - - else: - # Query across all energy collections - energy_collection_names = self._list_energy_collections() - - # Collect data from all collections, then sort and limit - all_data = [] - per_collection_limit = max(limit, 1000) # Get more from each to ensure we have enough after sorting - - for collection_name in energy_collection_names: - collection = self.db[collection_name] - cursor = collection.find(query).sort("timestamp", -1).limit(per_collection_limit) - - for doc in cursor: - all_data.append(self._format_energy_document(doc)) - - # Sort all data by timestamp and apply final limit - all_data.sort(key=lambda x: x.get("timestamp", ""), reverse=True) - data = all_data[:limit] - - return data - - except Exception as e: - logger.error(f"Error retrieving energy data: {e}") - return [] - - def _format_energy_document(self, doc: Dict[str, Any]) -> Dict[str, Any]: - """Format energy document for API response""" - # Convert ObjectId to string and datetime to ISO string - if "_id" in doc: - doc["_id"] = str(doc["_id"]) - if "timestamp" in doc and hasattr(doc["timestamp"], "isoformat"): - doc["timestamp"] = doc["timestamp"].isoformat() - if "processed_at" in doc and hasattr(doc["processed_at"], "isoformat"): - doc["processed_at"] = doc["processed_at"].isoformat() - return doc diff --git a/monolith/src/modules/data_ingestion/ftp_monitor.py b/monolith/src/modules/data_ingestion/ftp_monitor.py deleted file mode 100644 index 8f2a1de..0000000 --- a/monolith/src/modules/data_ingestion/ftp_monitor.py +++ /dev/null @@ -1,339 +0,0 @@ -import asyncio -from ftplib import FTP -import logging -import os -from datetime import datetime -from typing import List, Dict, Any, Optional -from dataclasses import dataclass -import tempfile - -from config import FTP_CONFIG -from slg_processor import SLGProcessor - -logger = logging.getLogger(__name__) - -@dataclass -class FTPFileInfo: - path: str - name: str - size: int - directory_path: str # Directory containing the file - modified_time: Optional[datetime] = None - - -class FTPMonitor: - def __init__(self, db_manager): - self.db_manager = db_manager - self.processor = SLGProcessor() - self.last_check: Optional[datetime] = None - self.processed_files: set = set() - self.files_processed_count = 0 - self.status = "initializing" - - self.ftp_host = FTP_CONFIG["host"] - self.ftp_user = FTP_CONFIG["username"] - self.ftp_pass = FTP_CONFIG["password"] - self.base_path = FTP_CONFIG["base_path"] - self.check_interval = FTP_CONFIG["check_interval"] - self.skip_initial_scan = FTP_CONFIG["skip_initial_scan"] - - logger.info(f"FTP Monitor initialized for {self.ftp_host}") - - async def initialize_processed_files_cache(self): - try: - # Add timeout to prevent blocking startup indefinitely - processed_file_names = await asyncio.wait_for( - self.db_manager.get_processed_files(), - timeout=10.0 # 10 second timeout - ) - - for filename in processed_file_names: - self.processed_files.add(filename) - - logger.info(f"Loaded {len(processed_file_names)} already processed files from database") - return len(processed_file_names) - except asyncio.TimeoutError: - logger.warning("Timeout loading processed files cache - continuing with empty cache") - return 0 - except Exception as e: - logger.error(f"Error loading processed files from database: {e}") - return 0 - - async def start_monitoring(self): - self.status = "initializing" - logger.info("Starting FTP monitoring loop") - - try: - await self.initialize_processed_files_cache() - logger.info("FTP monitor initialization completed") - except asyncio.CancelledError: - logger.info("FTP monitor initialization cancelled") - self.status = "stopped" - return - except Exception as e: - logger.error(f"Error during FTP monitor initialization: {e}") - self.status = "error" - try: - await asyncio.sleep(1800) - except asyncio.CancelledError: - logger.info("FTP monitor cancelled during error recovery") - self.status = "stopped" - return - - await asyncio.wait_for(self.check_for_new_files(), timeout=300.0) # 5 minute timeout - self.status = "running" - - # Optionally skip initial scan and wait for first scheduled interval - if self.skip_initial_scan: - logger.info(f"Skipping initial scan - waiting {self.check_interval/3600:.1f} hours for first scheduled check") - try: - await asyncio.sleep(self.check_interval) - except asyncio.CancelledError: - logger.info("FTP monitoring cancelled during initial wait") - self.status = "stopped" - return - - while True: - try: - # Add timeout to prevent indefinite blocking on FTP operations - await asyncio.wait_for(self.check_for_new_files(), timeout=300.0) # 5 minute timeout - self.status = "running" - - logger.info(f"Waiting {self.check_interval/3600:.1f} hours until next check") - await asyncio.sleep(self.check_interval) - - except asyncio.TimeoutError: - logger.warning("FTP check timed out after 5 minutes - will retry") - self.status = "error" - try: - await asyncio.sleep(900) # Wait 15 minutes before retry - except asyncio.CancelledError: - logger.info("FTP monitoring task cancelled during timeout recovery") - self.status = "stopped" - break - except asyncio.CancelledError: - logger.info("FTP monitoring task cancelled - shutting down gracefully") - self.status = "stopped" - break - except Exception as e: - self.status = "error" - logger.error(f"Error in monitoring loop: {e}") - try: - await asyncio.sleep(1800) # Wait 30 minutes before retry - except asyncio.CancelledError: - logger.info("FTP monitoring task cancelled during error recovery") - self.status = "stopped" - break - - async def check_for_new_files(self) -> Dict[str, Any]: - self.last_check = datetime.now() - logger.info(f"Checking FTP server at {self.last_check}") - - try: - with FTP(self.ftp_host) as ftp: - ftp.login(self.ftp_user, self.ftp_pass) - logger.info(f"Connected to FTP server: {self.ftp_host}") - - new_files = await self._find_slg_files(ftp) - - processed_count = 0 - skipped_count = 0 - for file_info in new_files: - # Check for cancellation during file processing loop - if asyncio.current_task().cancelled(): - raise asyncio.CancelledError() - - if file_info.name in self.processed_files: - logger.debug(f"Skipping already processed file (cached): {file_info.name}") - skipped_count += 1 - continue - - if await self.db_manager.is_file_processed(file_info.name): - logger.debug(f"Skipping already processed file (database): {file_info.name}") - self.processed_files.add(file_info.name) - skipped_count += 1 - continue - - logger.debug(f"Processing new file: {file_info.name}") - success = await self._process_file(ftp, file_info) - if success: - self.processed_files.add(file_info.name) - processed_count += 1 - logger.debug(f"Successfully processed file: {file_info.name} ({processed_count} total)") - self.files_processed_count += 1 - - result = { - "files_found": len(new_files), - "files_processed": processed_count, - "files_skipped": skipped_count, - "timestamp": self.last_check.isoformat() - } - - logger.info(f"Check complete: {result}") - return result - - except Exception as e: - logger.error(f"FTP check failed: {e}") - raise - - async def _find_slg_files(self, ftp: FTP) -> List[FTPFileInfo]: - files = [] - - try: - await self._scan_directories_iterative(ftp, self.base_path, files) - logger.info(f"Found {len(files)} .slg_v2 files across all directories") - return files - except Exception as e: - logger.error(f"Error scanning FTP directory: {e}") - return [] - - async def _scan_directories_iterative(self, ftp: FTP, base_path: str, files: List[FTPFileInfo]): - directories_to_scan = [(base_path, 0)] - visited_dirs = set() - skipped_dirs = 0 - scanned_dirs = 0 - - while directories_to_scan: - current_dir, current_depth = directories_to_scan.pop(0) # FIFO queue - - normalized_path = current_dir.rstrip('/') if current_dir != '/' else '/' - - if normalized_path in visited_dirs: - logger.debug(f"Skipping already visited directory: {normalized_path}") - continue - - visited_dirs.add(normalized_path) - - # Determine directory depth (level 4 = /SLGs/Community/Building/SubDir) - path_parts = normalized_path.strip('/').split('/') - directory_level = len(path_parts) - - # Check if directory should be skipped based on previous scans (only for level 4+) - if directory_level >= 4 and await self.db_manager.should_skip_directory(normalized_path): - logger.info(f"Skipping previously scanned level {directory_level} directory: {normalized_path}") - skipped_dirs += 1 - continue - - logger.debug(f"Scanning directory: {normalized_path} (depth: {current_depth}, queue: {len(directories_to_scan)})") - scanned_dirs += 1 - - try: - original_dir = ftp.pwd() - ftp.cwd(current_dir) - - dir_list = [] - ftp.retrlines('LIST', dir_list.append) - logger.debug(f"Found {len(dir_list)} entries in {normalized_path}") - - # Count files found in this directory - files_found_in_dir = 0 - - for line in dir_list: - parts = line.split() - if len(parts) >= 9: - filename = parts[-1] - permissions = parts[0] - - if filename in ['.', '..']: - continue - - if permissions.startswith('d'): - if normalized_path == '/': - subdirectory_path = f"/{filename}" - else: - subdirectory_path = f"{normalized_path}/{filename}" - - subdirectory_normalized = subdirectory_path.rstrip('/') if subdirectory_path != '/' else '/' - - if subdirectory_normalized not in visited_dirs: - directories_to_scan.append((subdirectory_path, current_depth + 1)) - logger.debug(f"Added to queue: {subdirectory_path}") - else: - logger.debug(f"Skipping already visited: {subdirectory_path}") - - elif filename.endswith('.sgl_v2'): - logger.debug(f"Found .slg_v2 file: {filename} in {normalized_path}") - try: - size = int(parts[4]) - if normalized_path == '/': - full_path = f"/{filename}" - else: - full_path = f"{normalized_path}/{filename}" - - files.append(FTPFileInfo( - path=full_path, - name=filename, - size=size, - directory_path=normalized_path - )) - files_found_in_dir += 1 - - except (ValueError, IndexError): - logger.warning(f"Could not parse file info for: {filename}") - - ftp.cwd(original_dir) - - # Mark directory as scanned (only for level 4+ directories) - if directory_level >= 4: - await self.db_manager.mark_directory_scanned(normalized_path, files_found_in_dir) - logger.debug(f"Completed scanning level {directory_level} directory: {normalized_path} ({files_found_in_dir} files found)") - else: - logger.debug(f"Completed scanning level {directory_level} directory (not saved to cache): {normalized_path} ({files_found_in_dir} files found)") - - except Exception as e: - logger.warning(f"Error scanning directory {normalized_path}: {e}") - continue - - logger.info(f"Iterative scan completed. Scanned: {scanned_dirs} directories, Skipped: {skipped_dirs} directories (Total visited: {len(visited_dirs)})") - - async def _process_file(self, ftp: FTP, file_info: FTPFileInfo) -> bool: - logger.debug(f"Processing file: {file_info.path} ({file_info.size} bytes) from directory: {file_info.directory_path}") - - try: - with tempfile.NamedTemporaryFile(mode='wb', suffix='.slg_v2', delete=False) as temp_file: - temp_path = temp_file.name - - with open(temp_path, 'wb') as f: - ftp.retrbinary(f'RETR {file_info.path}', f.write) - - records = await self.processor.process_file(temp_path, file_info.name) - - if records: - # Pass directory path to store_file_data for collection selection - await self.db_manager.store_file_data(file_info.name, records, file_info.directory_path) - logger.debug(f"Stored {len(records)} records from {file_info.name} to collection for {file_info.directory_path}") - return True - else: - logger.warning(f"No valid records found in {file_info.name}") - return False - - except Exception as e: - logger.error(f"Error processing file {file_info.name}: {e}") - return False - - finally: - try: - if 'temp_path' in locals(): - os.unlink(temp_path) - except OSError: - pass - - def get_status(self) -> str: - return self.status - - def get_last_check_time(self) -> Optional[str]: - return self.last_check.isoformat() if self.last_check else None - - def get_processed_count(self) -> int: - return self.files_processed_count - - def get_detailed_status(self) -> Dict[str, Any]: - return { - "status": self.status, - "last_check": self.get_last_check_time(), - "files_processed": self.files_processed_count, - "processed_files_count": len(self.processed_files), - "check_interval_hours": self.check_interval / 3600, - "ftp_host": self.ftp_host, - "base_path": self.base_path, - } diff --git a/monolith/src/modules/data_ingestion/http_poller.py b/monolith/src/modules/data_ingestion/http_poller.py new file mode 100644 index 0000000..c762cba --- /dev/null +++ b/monolith/src/modules/data_ingestion/http_poller.py @@ -0,0 +1,353 @@ +""" +HTTP Poller Service +Polls IoT devices via HTTP and ingests sensor data +""" + +import asyncio +import logging +import time +from datetime import datetime +from typing import Dict, Optional, Any, List +import aiohttp +from motor.motor_asyncio import AsyncIOMotorDatabase + +from .iot_config import IoTConfiguration, IoTDevice, SensorConfig, get_iot_config +from .models import DeviceStatus, PollingStatus, PollingMetrics +from modules.sensors.models import SensorReading, SensorType +from core.events import event_bus, EventTopics + +logger = logging.getLogger(__name__) + + +class HttpPoller: + """HTTP-based IoT device poller""" + + def __init__( + self, + sensors_db: AsyncIOMotorDatabase, + config: Optional[IoTConfiguration] = None, + poll_interval: int = 60, + timeout: int = 10, + max_concurrent: int = 5 + ): + """ + Initialize HTTP poller + + Args: + sensors_db: Motor database for sensor data storage + config: IoT configuration (loads from file if None) + poll_interval: Seconds between polls + timeout: HTTP request timeout in seconds + max_concurrent: Maximum concurrent HTTP requests + """ + self.sensors_db = sensors_db + self.config = config or get_iot_config() + self.poll_interval = poll_interval + self.timeout = timeout + self.max_concurrent = max_concurrent + + # Metrics tracking + self.device_status: Dict[str, DeviceStatus] = {} + self.total_polls = 0 + self.successful_polls = 0 + self.failed_polls = 0 + self.poll_times: List[float] = [] + + # Control flags + self.running = False + self._semaphore = asyncio.Semaphore(max_concurrent) + + # Initialize device status + self._initialize_device_status() + + def _initialize_device_status(self): + """Initialize status tracking for all devices""" + for device in self.config.get_all_devices(): + self.device_status[device.name] = DeviceStatus( + device_name=device.name, + device_type=device.type, + uri=device.uri, + status=PollingStatus.INACTIVE, + sensors_count=len(device.sensors) + ) + + async def poll_device(self, device: IoTDevice) -> bool: + """ + Poll a single device and ingest its data + + Args: + device: IoT device configuration + + Returns: + True if successful, False otherwise + """ + async with self._semaphore: + status = self.device_status[device.name] + start_time = time.time() + + try: + # Update status + status.last_poll = datetime.utcnow() + status.total_polls += 1 + self.total_polls += 1 + + # Make HTTP request + async with aiohttp.ClientSession() as session: + async with session.get( + device.uri, + timeout=aiohttp.ClientTimeout(total=self.timeout) + ) as response: + if response.status != 200: + raise Exception(f"HTTP {response.status}: {await response.text()}") + + data = await response.json() + + # Extract and store sensor readings + timestamp = int(datetime.utcnow().timestamp()) + readings_stored = 0 + + for sensor in device.sensors: + try: + value = self._extract_value(data, sensor.tag) + if value is not None: + await self._store_reading( + device=device, + sensor=sensor, + value=value, + timestamp=timestamp + ) + readings_stored += 1 + except Exception as e: + logger.warning( + f"Failed to extract sensor {sensor.type} from {device.name}: {e}" + ) + + # Update success metrics + poll_time = (time.time() - start_time) * 1000 + self.poll_times.append(poll_time) + if len(self.poll_times) > 100: + self.poll_times.pop(0) + + status.successful_polls += 1 + status.last_success = datetime.utcnow() + status.status = PollingStatus.ACTIVE + status.last_error = None + self.successful_polls += 1 + + logger.debug( + f"Polled {device.name}: {readings_stored} readings in {poll_time:.1f}ms" + ) + return True + + except asyncio.TimeoutError: + error_msg = f"Timeout after {self.timeout}s" + status.failed_polls += 1 + status.status = PollingStatus.ERROR + status.last_error = error_msg + self.failed_polls += 1 + logger.error(f"Timeout polling {device.name}") + return False + + except Exception as e: + error_msg = str(e) + status.failed_polls += 1 + status.status = PollingStatus.ERROR + status.last_error = error_msg + self.failed_polls += 1 + logger.error(f"Error polling {device.name}: {e}") + return False + + def _extract_value(self, data: Dict[str, Any], tag_path: List[str]) -> Optional[Any]: + """ + Extract value from nested JSON using tag path + + Args: + data: JSON response data + tag_path: List of keys to traverse + + Returns: + Extracted value or None + """ + current = data + for key in tag_path: + if isinstance(current, dict): + current = current.get(key) + if current is None: + return None + else: + return None + return current + + async def _store_reading( + self, + device: IoTDevice, + sensor: SensorConfig, + value: Any, + timestamp: int + ): + """ + Store sensor reading in database and publish event + + Args: + device: Device configuration + sensor: Sensor configuration + value: Sensor value + timestamp: Unix timestamp + """ + # Map sensor type to SensorType enum (with fallback to ENERGY) + sensor_type_map = { + "power": SensorType.ENERGY, + "voltage": SensorType.ENERGY, + "current": SensorType.ENERGY, + "energy": SensorType.ENERGY, + "temperature": SensorType.TEMPERATURE, + "humidity": SensorType.HUMIDITY, + "co2": SensorType.CO2, + "generation": SensorType.ENERGY, + "charging_rate": SensorType.ENERGY, + "doorOpen": SensorType.OCCUPANCY, + "state": SensorType.OCCUPANCY, + } + + sensor_type = sensor_type_map.get(sensor.type, SensorType.ENERGY) + + # Create sensor ID from device name and sensor type + sensor_id = f"{device.name}_{sensor.type}".replace(" ", "_") + + # Convert value based on data type + if sensor.data == "BOOLEAN": + numeric_value = 1.0 if value else 0.0 + else: + numeric_value = float(value) + + # Create sensor reading + reading_data = { + "sensor_id": sensor_id, + "sensor_type": sensor_type.value, + "timestamp": timestamp, + "room": device.type, # Use device type as room (battery, refrigerator, etc.) + "metadata": { + "device_name": device.name, + "device_type": device.type, + "sensor_type": sensor.type, + "data_type": sensor.data, + "source": "http_poller" + } + } + + # Add specific sensor data fields + if sensor.type == "power": + reading_data["power"] = numeric_value + elif sensor.type == "voltage": + reading_data["voltage"] = numeric_value + elif sensor.type == "current": + reading_data["current"] = numeric_value + elif sensor.type == "temperature": + reading_data["temperature"] = numeric_value + elif sensor.type == "humidity": + reading_data["humidity"] = numeric_value + elif sensor.type == "energy": + reading_data["energy"] = numeric_value + elif sensor.type == "generation": + reading_data["generation"] = numeric_value + else: + reading_data["energy"] = numeric_value # Default field + + # Store in database + await self.sensors_db.sensor_readings.insert_one(reading_data) + + # Publish to event bus + await event_bus.publish(EventTopics.ENERGY_DATA, { + "sensor_id": sensor_id, + "value": numeric_value, + "timestamp": timestamp, + "device_name": device.name, + "sensor_type": sensor.type + }) + + async def poll_all_devices(self): + """Poll all configured devices concurrently""" + devices = self.config.get_all_devices() + + if not devices: + logger.warning("No devices configured for polling") + return + + logger.info(f"Polling {len(devices)} devices...") + + # Poll all devices concurrently + tasks = [self.poll_device(device) for device in devices] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Count successes + successes = sum(1 for r in results if r is True) + logger.info(f"Polling complete: {successes}/{len(devices)} successful") + + async def run(self): + """Run continuous polling loop""" + self.running = True + logger.info( + f"Starting HTTP poller: {len(self.config.get_all_devices())} devices, " + f"interval={self.poll_interval}s" + ) + + while self.running: + try: + await self.poll_all_devices() + await asyncio.sleep(self.poll_interval) + except Exception as e: + logger.error(f"Error in polling loop: {e}") + await asyncio.sleep(10) # Brief pause on error + + def stop(self): + """Stop the polling loop""" + logger.info("Stopping HTTP poller...") + self.running = False + + def get_metrics(self) -> PollingMetrics: + """Get current polling metrics""" + active_count = sum( + 1 for s in self.device_status.values() + if s.status == PollingStatus.ACTIVE + ) + inactive_count = sum( + 1 for s in self.device_status.values() + if s.status == PollingStatus.INACTIVE + ) + error_count = sum( + 1 for s in self.device_status.values() + if s.status == PollingStatus.ERROR + ) + + # Count devices by type + devices_by_type: Dict[str, int] = {} + for device in self.config.get_all_devices(): + devices_by_type[device.type] = devices_by_type.get(device.type, 0) + 1 + + # Calculate success rate + success_rate = 0.0 + if self.total_polls > 0: + success_rate = (self.successful_polls / self.total_polls) * 100 + + # Calculate average poll time + avg_poll_time = 0.0 + if self.poll_times: + avg_poll_time = sum(self.poll_times) / len(self.poll_times) + + return PollingMetrics( + timestamp=datetime.utcnow(), + total_devices=len(self.device_status), + active_devices=active_count, + inactive_devices=inactive_count, + error_devices=error_count, + total_polls=self.total_polls, + successful_polls=self.successful_polls, + failed_polls=self.failed_polls, + success_rate=success_rate, + average_poll_time_ms=avg_poll_time, + devices_by_type=devices_by_type + ) + + def get_device_statuses(self) -> List[DeviceStatus]: + """Get status for all devices""" + return list(self.device_status.values()) diff --git a/monolith/src/modules/data_ingestion/iot_config.py b/monolith/src/modules/data_ingestion/iot_config.py new file mode 100644 index 0000000..5740b3b --- /dev/null +++ b/monolith/src/modules/data_ingestion/iot_config.py @@ -0,0 +1,187 @@ +""" +IoT Configuration Loader +Loads and validates IoT device configuration from iots-right.json +""" + +import json +import logging +from pathlib import Path +from typing import Dict, List, Optional, Any +from pydantic import BaseModel, Field, validator + +logger = logging.getLogger(__name__) + + +class SensorConfig(BaseModel): + """Individual sensor configuration within a device""" + type: str = Field(..., description="Sensor type (power, voltage, temperature, etc.)") + tag: List[str] = Field(..., description="JSON path tags to extract value") + data: str = Field(..., description="Data type: DOUBLE, BOOLEAN, etc.") + + +class IoTDevice(BaseModel): + """IoT device configuration""" + name: str = Field(..., description="Device name") + type: str = Field(..., description="Device type (battery, refrigerator, hvac, etc.)") + uri: str = Field(..., description="HTTP endpoint URI") + sensors: List[SensorConfig] = Field(..., description="List of sensors") + + @validator('uri') + def validate_uri(cls, v): + """Ensure URI is valid""" + if not v.startswith(('http://', 'https://')): + raise ValueError(f"Invalid URI: {v}") + return v + + +class IoTConfiguration(BaseModel): + """Complete IoT configuration""" + iots: Dict[str, List[IoTDevice]] = Field(..., description="IoT devices grouped by category") + + def get_all_devices(self) -> List[IoTDevice]: + """Get flat list of all devices""" + devices = [] + for device_list in self.iots.values(): + devices.extend(device_list) + return devices + + def get_devices_by_type(self, device_type: str) -> List[IoTDevice]: + """Get devices by category type""" + return self.iots.get(device_type, []) + + def get_device_by_name(self, name: str) -> Optional[IoTDevice]: + """Find device by name""" + for device in self.get_all_devices(): + if device.name == name: + return device + return None + + +class IoTConfigLoader: + """Loads and manages IoT configuration""" + + def __init__(self, config_path: Optional[Path] = None): + """ + Initialize config loader + + Args: + config_path: Path to iots-right.json. If None, looks in monolith root + """ + self.config_path = config_path or self._find_config_file() + self.config: Optional[IoTConfiguration] = None + + def _find_config_file(self) -> Path: + """Find iots-right.json in monolith directory""" + # Start from current file location and go up to find monolith root + current = Path(__file__).parent + while current.name != 'monolith' and current.parent != current: + current = current.parent + + config_file = current / 'iots-right.json' + if not config_file.exists(): + logger.warning(f"Config file not found at {config_file}") + + return config_file + + def load(self) -> IoTConfiguration: + """ + Load configuration from file + + Returns: + IoTConfiguration object + + Raises: + FileNotFoundError: If config file doesn't exist + ValueError: If config is invalid + """ + if not self.config_path.exists(): + raise FileNotFoundError(f"IoT config not found: {self.config_path}") + + try: + with open(self.config_path, 'r') as f: + data = json.load(f) + + self.config = IoTConfiguration(**data) + + device_count = len(self.config.get_all_devices()) + logger.info(f"Loaded IoT config: {device_count} devices from {self.config_path}") + + # Log summary by type + for device_type, devices in self.config.iots.items(): + logger.info(f" - {device_type}: {len(devices)} devices") + + return self.config + + except json.JSONDecodeError as e: + logger.error(f"Invalid JSON in config file: {e}") + raise ValueError(f"Invalid JSON in {self.config_path}: {e}") + except Exception as e: + logger.error(f"Failed to load config: {e}") + raise + + def reload(self) -> IoTConfiguration: + """Reload configuration from file""" + return self.load() + + def get_config(self) -> IoTConfiguration: + """ + Get current configuration, loading if necessary + + Returns: + IoTConfiguration object + """ + if self.config is None: + self.load() + return self.config + + def get_device_summary(self) -> Dict[str, Any]: + """Get summary statistics of devices""" + config = self.get_config() + + total_devices = 0 + total_sensors = 0 + summary = {} + + for device_type, devices in config.iots.items(): + device_count = len(devices) + sensor_count = sum(len(d.sensors) for d in devices) + + total_devices += device_count + total_sensors += sensor_count + + summary[device_type] = { + "device_count": device_count, + "sensor_count": sensor_count, + "devices": [ + { + "name": d.name, + "uri": d.uri, + "sensor_count": len(d.sensors) + } + for d in devices + ] + } + + return { + "total_devices": total_devices, + "total_sensors": total_sensors, + "by_type": summary, + "config_file": str(self.config_path) + } + + +# Global config loader instance +_config_loader: Optional[IoTConfigLoader] = None + + +def get_config_loader() -> IoTConfigLoader: + """Get global config loader instance""" + global _config_loader + if _config_loader is None: + _config_loader = IoTConfigLoader() + return _config_loader + + +def get_iot_config() -> IoTConfiguration: + """Get IoT configuration""" + return get_config_loader().get_config() diff --git a/monolith/src/modules/data_ingestion/models.py b/monolith/src/modules/data_ingestion/models.py new file mode 100644 index 0000000..fe9a486 --- /dev/null +++ b/monolith/src/modules/data_ingestion/models.py @@ -0,0 +1,112 @@ +""" +Pydantic models for Data Ingestion module +""" + +from datetime import datetime +from typing import Optional, Dict, Any, List +from pydantic import BaseModel, Field +from enum import Enum + + +class PollingStatus(str, Enum): + """Device polling status""" + ACTIVE = "active" + INACTIVE = "inactive" + ERROR = "error" + DISABLED = "disabled" + + +class DataSourceType(str, Enum): + """Data source type""" + HTTP = "http" + MQTT = "mqtt" + + +class DeviceStatus(BaseModel): + """Device polling status information""" + device_name: str + device_type: str + uri: str + status: PollingStatus + last_poll: Optional[datetime] = None + last_success: Optional[datetime] = None + total_polls: int = 0 + successful_polls: int = 0 + failed_polls: int = 0 + last_error: Optional[str] = None + sensors_count: int = 0 + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() if v else None + } + + +class DataSourceSummary(BaseModel): + """Summary of data sources""" + source_type: DataSourceType + enabled: bool + total_devices: int + active_devices: int + total_sensors: int + devices: List[DeviceStatus] + + +class PollingMetrics(BaseModel): + """Polling performance metrics""" + timestamp: datetime + total_devices: int + active_devices: int + inactive_devices: int + error_devices: int + total_polls: int + successful_polls: int + failed_polls: int + success_rate: float + average_poll_time_ms: float + devices_by_type: Dict[str, int] + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } + + +class HttpPollerConfig(BaseModel): + """HTTP poller configuration""" + enabled: bool = True + poll_interval_seconds: int = Field(60, ge=5, le=3600) + timeout_seconds: int = Field(10, ge=1, le=60) + max_retries: int = Field(3, ge=0, le=10) + concurrent_requests: int = Field(5, ge=1, le=50) + + +class MqttSubscriberConfig(BaseModel): + """MQTT subscriber configuration (future)""" + enabled: bool = False + broker_host: str = "localhost" + broker_port: int = 1883 + username: Optional[str] = None + password: Optional[str] = None + topics: List[str] = [] + + +class DataIngestionConfig(BaseModel): + """Complete data ingestion configuration""" + http: HttpPollerConfig = HttpPollerConfig() + mqtt: MqttSubscriberConfig = MqttSubscriberConfig() + + +class HealthResponse(BaseModel): + """Health check response""" + service: str + status: str + timestamp: datetime + version: str + http_poller: Optional[Dict[str, Any]] = None + mqtt_subscriber: Optional[Dict[str, Any]] = None + + class Config: + json_encoders = { + datetime: lambda v: v.isoformat() + } diff --git a/monolith/src/modules/data_ingestion/router.py b/monolith/src/modules/data_ingestion/router.py new file mode 100644 index 0000000..db47cf8 --- /dev/null +++ b/monolith/src/modules/data_ingestion/router.py @@ -0,0 +1,194 @@ +"""Data Ingestion module API routes.""" + +import logging +from datetime import datetime +from fastapi import APIRouter, HTTPException, Depends +from typing import Optional + +from .models import ( + HealthResponse, DataSourceSummary, PollingMetrics, + DeviceStatus, DataSourceType +) +from .iot_config import get_config_loader +from .http_poller import HttpPoller +from core.dependencies import get_sensors_db + +logger = logging.getLogger(__name__) + +# Create router +router = APIRouter() + +# Global HTTP poller instance (will be set by main.py) +_http_poller: Optional[HttpPoller] = None + + +def set_http_poller(poller: HttpPoller): + """Set the global HTTP poller instance""" + global _http_poller + _http_poller = poller + + +def get_http_poller() -> HttpPoller: + """Get the HTTP poller instance""" + if _http_poller is None: + raise HTTPException(status_code=503, detail="HTTP poller not initialized") + return _http_poller + + +# Health check +@router.get("/health", response_model=HealthResponse) +async def health_check(db=Depends(get_sensors_db)): + """Health check endpoint for data ingestion module""" + try: + await db.command("ping") + + http_status = None + if _http_poller: + metrics = _http_poller.get_metrics() + http_status = { + "enabled": True, + "running": _http_poller.running, + "total_devices": metrics.total_devices, + "active_devices": metrics.active_devices, + "success_rate": metrics.success_rate + } + + return HealthResponse( + service="data-ingestion-module", + status="healthy", + timestamp=datetime.utcnow(), + version="1.0.0", + http_poller=http_status, + mqtt_subscriber={"enabled": False} + ) + except Exception as e: + logger.error(f"Health check failed: {e}") + raise HTTPException(status_code=503, detail="Service Unavailable") + + +@router.get("/sources", response_model=DataSourceSummary) +async def get_data_sources(): + """ + Get all data ingestion sources + Implements OpenAPI endpoint: GET /api/v1/sources + """ + try: + poller = get_http_poller() + config_loader = get_config_loader() + + device_statuses = poller.get_device_statuses() + active_count = sum(1 for d in device_statuses if d.status.value == "active") + + return DataSourceSummary( + source_type=DataSourceType.HTTP, + enabled=True, + total_devices=len(device_statuses), + active_devices=active_count, + total_sensors=sum(d.sensors_count for d in device_statuses), + devices=device_statuses + ) + + except Exception as e: + logger.error(f"Error getting data sources: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.get("/sources/summary") +async def get_sources_summary(): + """Get summary of configured data sources""" + try: + config_loader = get_config_loader() + summary = config_loader.get_device_summary() + return summary + + except Exception as e: + logger.error(f"Error getting sources summary: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.get("/sources/devices") +async def get_device_list(): + """Get list of all configured devices""" + try: + poller = get_http_poller() + devices = poller.get_device_statuses() + + return { + "devices": devices, + "count": len(devices), + "timestamp": datetime.utcnow().isoformat() + } + + except Exception as e: + logger.error(f"Error getting device list: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.get("/sources/devices/{device_name}", response_model=DeviceStatus) +async def get_device_status(device_name: str): + """Get status of a specific device""" + try: + poller = get_http_poller() + status = poller.device_status.get(device_name) + + if not status: + raise HTTPException(status_code=404, detail=f"Device '{device_name}' not found") + + return status + + except HTTPException: + raise + except Exception as e: + logger.error(f"Error getting device status: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.get("/metrics", response_model=PollingMetrics) +async def get_polling_metrics(): + """Get HTTP polling performance metrics""" + try: + poller = get_http_poller() + return poller.get_metrics() + + except Exception as e: + logger.error(f"Error getting metrics: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.post("/poll/trigger") +async def trigger_manual_poll(): + """Manually trigger a polling cycle for all devices""" + try: + poller = get_http_poller() + + # Trigger poll in background + import asyncio + asyncio.create_task(poller.poll_all_devices()) + + return { + "message": "Manual poll triggered", + "timestamp": datetime.utcnow().isoformat(), + "devices": len(poller.device_status) + } + + except Exception as e: + logger.error(f"Error triggering poll: {e}") + raise HTTPException(status_code=500, detail="Internal server error") + + +@router.post("/config/reload") +async def reload_configuration(): + """Reload IoT configuration from iots-right.json""" + try: + config_loader = get_config_loader() + config = config_loader.reload() + + return { + "message": "Configuration reloaded successfully", + "total_devices": len(config.get_all_devices()), + "timestamp": datetime.utcnow().isoformat() + } + + except Exception as e: + logger.error(f"Error reloading config: {e}") + raise HTTPException(status_code=500, detail=str(e)) diff --git a/monolith/src/modules/data_ingestion/slg_processor.py b/monolith/src/modules/data_ingestion/slg_processor.py deleted file mode 100644 index d23e848..0000000 --- a/monolith/src/modules/data_ingestion/slg_processor.py +++ /dev/null @@ -1,171 +0,0 @@ -#!/usr/bin/env python3 -""" -SA4CPS SLG_V2 File Processor -Simple parser for .slg_v2 energy data files -""" - -import logging -from datetime import datetime -from typing import List, Dict, Any, Optional -import re - -logger = logging.getLogger(__name__) - - -class SLGProcessor: - """Processes SA4CPS .slg_v2 files into structured energy data records""" - - def __init__(self): - self.processed_files = 0 - self.total_records = 0 - - async def process_file(self, file_path: str, filename: str) -> List[Dict[str, Any]]: - """Process a .slg_v2 file and return energy data records""" - logger.info(f"Processing SLG file: {filename}") - - try: - with open(file_path, 'r', encoding='utf-8') as file: - lines = file.readlines() - - records = [] - file_metadata = self._parse_metadata(lines[:5]) # Parse first 5 lines for metadata - - # Process data lines (lines starting with '20' are data records) - for line_num, line in enumerate(lines, 1): - line = line.strip() - - if line.startswith('20'): # Data record lines start with '20' (year) - record = self._parse_data_line(line, file_metadata, filename) - if record: - records.append(record) - - self.processed_files += 1 - self.total_records += len(records) - - logger.info(f"Processed {len(records)} records from {filename}") - return records - - except Exception as e: - logger.error(f"Error processing {filename}: {e}") - return [] - - def _parse_metadata(self, header_lines: List[str]) -> Dict[str, Any]: - """Parse metadata from SLG file header""" - metadata = { - "meter_id": None, - "measurement_type": None, - "unit": None, - "interval": None, - "period_start": None, - "period_end": None - } - - try: - for line in header_lines: - line = line.strip() - - if line.startswith('00'): # Header line with meter info - parts = line.split('\t') - if len(parts) >= 12: - metadata["meter_id"] = parts[3] # Meter ID - metadata["period_start"] = self._parse_date(parts[6]) - metadata["period_end"] = self._parse_date(parts[7]) - - elif line.startswith('01'): # Measurement configuration - parts = line.split('\t') - if len(parts) >= 10: - metadata["measurement_type"] = parts[4] # POTENCIA - metadata["unit"] = parts[5] # K (kW) - metadata["interval"] = parts[6] # 15M - - except Exception as e: - logger.warning(f"Error parsing metadata: {e}") - - return metadata - - def _parse_data_line(self, line: str, metadata: Dict[str, Any], filename: str) -> Optional[Dict[str, Any]]: - """Parse a data line into an energy record""" - try: - parts = line.split('\t') - - if len(parts) < 4: - return None - - # Parse timestamp (format: 20250201 0015) - date_part = parts[1] # 20250201 - time_part = parts[2] # 0015 - - # Convert to datetime - timestamp = self._parse_timestamp(date_part, time_part) - if not timestamp: - return None - - # Parse energy value - value_str = parts[3].replace('.', '') # Remove decimal separator - try: - value = float(value_str) / 1000.0 # Convert from thousandths - except ValueError: - value = 0.0 - - # Create record - record = { - "timestamp": timestamp, - "meter_id": metadata.get("meter_id", "unknown"), - "measurement_type": metadata.get("measurement_type", "energy"), - "value": value, - "unit": metadata.get("unit", "kW"), - "interval": metadata.get("interval", "15M"), - "filename": filename, - "quality": int(parts[4]) if len(parts) > 4 else 0 - } - - return record - - except Exception as e: - logger.warning(f"Error parsing data line '{line}': {e}") - return None - - def _parse_date(self, date_str: str) -> Optional[datetime]: - """Parse date string (YYYYMMDD format)""" - try: - if len(date_str) == 8 and date_str.isdigit(): - year = int(date_str[:4]) - month = int(date_str[4:6]) - day = int(date_str[6:8]) - return datetime(year, month, day) - except ValueError: - pass - return None - - def _parse_timestamp(self, date_str: str, time_str: str) -> Optional[datetime]: - """Parse timestamp from date and time strings""" - try: - # Parse date (YYYYMMDD) - if len(date_str) != 8 or not date_str.isdigit(): - return None - - year = int(date_str[:4]) - month = int(date_str[4:6]) - day = int(date_str[6:8]) - - # Parse time (HHMM) - if len(time_str) != 4 or not time_str.isdigit(): - return None - - hour = int(time_str[:2]) - if hour ==24: - hour = 0 - minute = int(time_str[2:4]) - - return datetime(year, month, day, hour, minute) - - except ValueError as e: - logger.warning(f"Error parsing timestamp '{date_str} {time_str}': {e}") - return None - - def get_stats(self) -> Dict[str, int]: - """Get processing statistics""" - return { - "files_processed": self.processed_files, - "total_records": self.total_records - } diff --git a/monolith/src/modules/demand_response/__init__.py b/monolith/src/modules/demand_response/__init__.py index 4f6bc16..ddcfebd 100644 --- a/monolith/src/modules/demand_response/__init__.py +++ b/monolith/src/modules/demand_response/__init__.py @@ -1,8 +1,8 @@ """Demand Response module - handles grid interaction and load management.""" from .models import ( - DemandResponseInvitation, - InvitationResponse, + DRInvitation, + DRInvitationResponse, EventRequest, EventStatus, LoadReductionRequest, @@ -11,8 +11,8 @@ from .models import ( from .demand_response_service import DemandResponseService __all__ = [ - "DemandResponseInvitation", - "InvitationResponse", + "DRInvitation", + "DRInvitationResponse", "EventRequest", "EventStatus", "LoadReductionRequest", diff --git a/monolith/src/modules/sensors/router.py b/monolith/src/modules/sensors/router.py index 019053e..0915e01 100644 --- a/monolith/src/modules/sensors/router.py +++ b/monolith/src/modules/sensors/router.py @@ -14,7 +14,7 @@ from .room_service import RoomService from .analytics_service import AnalyticsService from .websocket_manager import WebSocketManager -from src.core.dependencies import get_sensors_db, get_redis +from core.dependencies import get_sensors_db, get_redis logger = logging.getLogger(__name__) diff --git a/monolith/src/modules/sensors/websocket_manager.py b/monolith/src/modules/sensors/websocket_manager.py index 56b8e88..fbdec6e 100644 --- a/monolith/src/modules/sensors/websocket_manager.py +++ b/monolith/src/modules/sensors/websocket_manager.py @@ -8,7 +8,7 @@ from typing import List, Set, Dict, Any from fastapi import WebSocket, WebSocketDisconnect import logging -from models import SensorReading +from .models import SensorReading logger = logging.getLogger(__name__)