feat: Implement HTTP Poller for IoT device data ingestion

- Added iots-left.json and iots-right.json configuration files defining IoT devices and their sensors.
- Developed HttpPoller class to handle polling of IoT devices via HTTP.
- Created IoT configuration loader to validate and load device configurations from JSON.
- Introduced models for device status, polling metrics, and data sources.
- Implemented API routes for health checks, device status retrieval, and configuration management.
- Enhanced error handling and logging throughout the data ingestion process.
Author: rafaeldpsilva
Date: 2025-12-22 16:35:22 +00:00
Parent: ccf5f5a5c3
Commit: 4bedcecf5d

18 changed files with 3157 additions and 1167 deletions

.gitignore (2 changes)

@@ -173,4 +173,6 @@ poetry.toml
# LSP config files
pyrightconfig.json
CLAUDE.md
# End of https://www.toptal.com/developers/gitignore/api/python

.env.example

@@ -1,20 +1,12 @@
-# MongoDB Configuration (external deployment)
+# MongoDB Configuration
# Update with your MongoDB connection string
-MONGO_URL=mongodb://admin:password123@mongodb-host:27017/?authSource=admin
+MONGO_URL=mongodb://admin:password123@localhost:27017/?authSource=admin
-# Redis Configuration (external deployment, optional)
-# Update with your Redis connection string
-REDIS_URL=redis://redis-host:6379
+REDIS_ENABLED=false
-# FTP Configuration
-FTP_SA4CPS_HOST=ftp.sa4cps.pt
-FTP_SA4CPS_PORT=21
-FTP_SA4CPS_USERNAME=curvascarga@sa4cps.pt
-FTP_SA4CPS_PASSWORD=
-FTP_SA4CPS_REMOTE_PATH=/SLGs/
-FTP_CHECK_INTERVAL=21600
-FTP_SKIP_INITIAL_SCAN=true
+# HTTP Poller Configuration
+# IoT device polling settings
+HTTP_POLL_INTERVAL=60
+HTTP_TIMEOUT=10
+HTTP_MAX_CONCURRENT=5
# Application Settings
DEBUG=false

monolith/iots-left.json (new executable file, 1561 lines)

File diff suppressed because it is too large.

monolith/iots-right.json (new executable file, 616 lines)

@@ -0,0 +1,616 @@
{
"iots": {
"battery": [
{
"name": "Battery_1",
"type": "battery",
"uri": "192.168.2.54",
"sensors": [
{
"type": "energy",
"tag": [
"battery",
"stored_energy"
],
"data": "DOUBLE"
},
{
"type": "charging_rate",
"tag": [
"battery",
"charging_rate"
],
"data": "DOUBLE"
}
]
},
{
"name": "Battery_2",
"type": "battery",
"uri": "192.168.2.55",
"sensors": [
{
"type": "energy",
"tag": [
"battery",
"stored_energy"
],
"data": "DOUBLE"
},
{
"type": "charging_rate",
"tag": [
"battery",
"charging_rate"
],
"data": "DOUBLE"
}
]
},
{
"name": "Battery_3",
"type": "battery",
"uri": "192.168.2.56",
"sensors": [
{
"type": "energy",
"tag": [
"battery",
"stored_energy"
],
"data": "DOUBLE"
},
{
"type": "charging_rate",
"tag": [
"battery",
"charging_rate"
],
"data": "DOUBLE"
}
]
}
],
"refrigerator": [
{
"name": "Fridge",
"type": "refrigerator",
"uri": "http://192.168.2.5:8520/enaplug/read/170307001",
"sensors": [
{
"type": "power",
"tag": [
"enaplug_170307001",
"act1"
],
"data": "DOUBLE"
},
{
"type": "voltage",
"tag": [
"enaplug_170307001",
"volt1"
],
"data": "DOUBLE"
},
{
"type": "current",
"tag": [
"enaplug_170307001",
"curr1"
],
"data": "DOUBLE"
},
{
"type": "doorOpen",
"tag": [
"enaplug_170307001",
"doorOpened"
],
"data": "BOOLEAN"
},
{
"type": "state",
"tag": [
"enaplug_170307001",
"state"
],
"data": "DOUBLE"
},
{
"type": "internal Temperature",
"tag": [
"enaplug_170307001",
"temp2"
],
"data": "DOUBLE"
},
{
"type": "external Temperature",
"tag": [
"enaplug_170307001",
"temp1"
],
"data": "DOUBLE"
},
{
"type": "humidity",
"tag": [
"enaplug_170307001",
"hum1"
],
"data": "DOUBLE"
}
]
}
],
"waterheater": [
{
"name": "Water Heater",
"type": "water heater",
"uri": "http://192.168.2.5:8520/enaplug/read/180717001",
"sensors": [
{
"type": "power",
"tag": [
"enaplug_180717001",
"act"
],
"data": "DOUBLE"
},
{
"type": "voltage",
"tag": [
"enaplug_180717001",
"volt"
],
"data": "DOUBLE"
},
{
"type": "current",
"tag": [
"enaplug_180717001",
"curr"
],
"data": "DOUBLE"
},
{
"type": "state",
"tag": [
"enaplug_180717001",
"state"
],
"data": "BOOLEAN"
},
{
"type": "temperature",
"tag": [
"enaplug_180717001",
"temp"
],
"data": "DOUBLE"
}
]
}
],
"microwave": [
{
"name": "Microwave",
"type": "microwave",
"uri": "http://192.168.2.5:8520/resource/AnalyzerKitHall_V2",
"sensors": [
{
"type": "power",
"tag": [
"AnalyzerKitHall_V2",
"microwave_active"
],
"data": "DOUBLE"
},
{
"type": "voltage",
"tag": [
"AnalyzerKitHall_V2",
"microwave_voltage"
],
"data": "DOUBLE"
},
{
"type": "current",
"tag": [
"AnalyzerKitHall_V2",
"microwave_current_x10"
],
"data": "DOUBLE"
}
]
}
],
"dishwasher": [
{
"name": "Dishwasher",
"type": "dishwasher",
"uri": "http://192.168.2.5:8520/resource/AnalyzerKitHall_V2",
"sensors": [
{
"type": "power",
"tag": [
"AnalyzerKitHall_V2",
"dishwasher_active"
],
"data": "DOUBLE"
},
{
"type": "voltage",
"tag": [
"AnalyzerKitHall_V2",
"dishwasher_voltage"
],
"data": "DOUBLE"
},
{
"type": "current",
"tag": [
"AnalyzerKitHall_V2",
"dishwasher_current_x10"
],
"data": "DOUBLE"
}
]
}
],
"kettle": [
{
"name": "Kettle",
"type": "kettle",
"uri": "http://192.168.2.5:8520/resource/AnalyzerKitHall_V2",
"sensors": [
{
"type": "power",
"tag": [
"AnalyzerKitHall_V2",
"kettle_active"
],
"data": "DOUBLE"
},
{
"type": "voltage",
"tag": [
"AnalyzerKitHall_V2",
"kettle_voltage"
],
"data": "DOUBLE"
},
{
"type": "current",
"tag": [
"AnalyzerKitHall_V2",
"kettle_current_x10"
],
"data": "DOUBLE"
}
]
}
],
"hvac": [
{
"name": "Air Conditioner Kitchen",
"type": "hvac",
"uri": "http://192.168.2.5:8520/resource/AnalyzerKitHall_V2",
"sensors": [
{
"type": "power",
"tag": [
"AnalyzerKitHall_V2",
"kitchen_ac_activePower"
],
"data": "DOUBLE"
},
{
"type": "voltage",
"tag": [
"AnalyzerKitHall_V2",
"kitchen_ac_voltage"
],
"data": "DOUBLE"
},
{
"type": "current",
"tag": [
"AnalyzerKitHall_V2",
"kitchen_ac_current_x10"
],
"data": "DOUBLE"
}
]
},
{
"name": "Air Conditioner Hallway",
"type": "hvac",
"uri": "http://192.168.2.5:8520/resource/AnalyzerKitHall_V2",
"sensors": [
{
"type": "power",
"tag": [
"AnalyzerKitHall_V2",
"hallway_ac_activePower"
],
"data": "DOUBLE"
},
{
"type": "voltage",
"tag": [
"AnalyzerKitHall_V2",
"hallway_ac_voltage"
],
"data": "DOUBLE"
},
{
"type": "current",
"tag": [
"AnalyzerKitHall_V2",
"hallway_ac_current_x10"
],
"data": "DOUBLE"
}
]
},
{
"name": "Air Conditioner 112_115",
"type": "hvac",
"uri": "http://192.168.2.5:8520/resource/Analyzer115_V1",
"sensors": [
{
"type": "power",
"tag": [
"Analyzer115_V1",
"P2_W"
],
"data": "DOUBLE"
},
{
"type": "voltage",
"tag": [
"Analyzer115_V1",
"U2N_Vx10"
],
"data": "DOUBLE"
},
{
"type": "current",
"tag": [
"Analyzer115_V1",
"Curr2_mA"
],
"data": "DOUBLE"
}
]
},
{
"name": "Air Conditioner 111_116",
"type": "hvac",
"uri": "http://192.168.2.5:8520/resource/Analyzer116_V1",
"sensors": [
{
"type": "power",
"tag": [
"Analyzer116_V1",
"P2_W"
],
"data": "DOUBLE"
},
{
"type": "voltage",
"tag": [
"Analyzer116_V1",
"U2N_V"
],
"data": "DOUBLE"
},
{
"type": "current",
"tag": [
"Analyzer116_V1",
"Curr2_A"
],
"data": "DOUBLE"
}
]
}
],
"sockets": [
{
"name": "Sockets 112_115",
"type": "sockets",
"uri": "http://192.168.2.5:8520/resource/Analyzer115_V1",
"sensors": [
{
"type": "power",
"tag": [
"Analyzer115_V1",
"P1_W"
],
"data": "DOUBLE"
},
{
"type": "voltage",
"tag": [
"Analyzer115_V1",
"U1N_Vx10"
],
"data": "DOUBLE"
},
{
"type": "current",
"tag": [
"Analyzer115_V1",
"Curr1_mA"
],
"data": "DOUBLE"
}
]
},
{
"name": "Sockets 111_116",
"type": "sockets",
"uri": "http://192.168.2.5:8520/resource/Analyzer116_V1",
"sensors": [
{
"type": "power",
"tag": [
"Analyzer116_V1",
"P3_W"
],
"data": "DOUBLE"
},
{
"type": "voltage",
"tag": [
"Analyzer116_V1",
"U3N_V"
],
"data": "DOUBLE"
},
{
"type": "current",
"tag": [
"Analyzer116_V1",
"Curr3_A"
],
"data": "DOUBLE"
}
]
}
],
"lamp": [
{
"name": "Lamp 1_111",
"type": "lamp",
"uri": "http://192.168.2.68:8089/desenrasca/lamp/111/1",
"sensors": [
{
"type": "power",
"tag": [
"consumption_w"
],
"data": "DOUBLE"
}
]
},
{
"name": "Lamp 1_112",
"type": "lamp",
"uri": "http://192.168.2.68:8089/desenrasca/lamp/112/1",
"sensors": [
{
"type": "power",
"tag": [
"consumption_w"
],
"data": "DOUBLE"
}
]
},
{
"name": "Lamp 2_112",
"type": "lamp",
"uri": "http://192.168.2.68:8089/desenrasca/lamp/112/2",
"sensors": [
{
"type": "power",
"tag": [
"consumption_w"
],
"data": "DOUBLE"
}
]
},
{
"name": "Lamp 3_112",
"type": "lamp",
"uri": "http://192.168.2.68:8089/desenrasca/lamp/112/3",
"sensors": [
{
"type": "power",
"tag": [
"consumption_w"
],
"data": "DOUBLE"
}
]
},
{
"name": "Lamp 1_115",
"type": "lamp",
"uri": "http://192.168.2.68:8089/desenrasca/lamp/115/1",
"sensors": [
{
"type": "power",
"tag": [
"consumption_w"
],
"data": "DOUBLE"
}
]
},
{
"name": "Lamp 2_115",
"type": "lamp",
"uri": "http://192.168.2.68:8089/desenrasca/lamp/115/2",
"sensors": [
{
"type": "power",
"tag": [
"consumption_w"
],
"data": "DOUBLE"
}
]
},
{
"name": "Lamp 3_115",
"type": "lamp",
"uri": "http://192.168.2.68:8089/desenrasca/lamp/115/3",
"sensors": [
{
"type": "power",
"tag": [
"consumption_w"
],
"data": "DOUBLE"
}
]
},
{
"name": "Lamp 1_116",
"type": "lamp",
"uri": "http://192.168.2.68:8089/desenrasca/lamp/116/1",
"sensors": [
{
"type": "power",
"tag": [
"consumption_w"
],
"data": "DOUBLE"
}
]
}
],
"generation": [
{
"name": "Generation",
"type": "generation",
"uri": "http://192.168.2.68:8089/desenrasca/generation/3750",
"sensors": [
{
"type": "generation",
"tag": [
"generation_w"
],
"data": "DOUBLE"
}
]
}
]
}
}
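Each sensor's `tag` array is a key path into the JSON document returned by the device's `uri`; the poller walks it key by key (see `_extract_value` in http_poller.py below). A minimal sketch of that traversal, against a hypothetical payload shaped like the enaplug responses above (the payload values are illustrative, not captured from a device):

```python
# Hypothetical response from http://192.168.2.5:8520/enaplug/read/170307001
payload = {
    "enaplug_170307001": {
        "act1": 82.5,         # active power, matches tag ["enaplug_170307001", "act1"]
        "volt1": 231.0,       # voltage
        "doorOpened": False,  # BOOLEAN sensor
    }
}

def extract(data, tag_path):
    """Walk a tag path through nested dicts; return None if any key is missing."""
    current = data
    for key in tag_path:
        if not isinstance(current, dict):
            return None
        current = current.get(key)
        if current is None:
            return None
    return current

assert extract(payload, ["enaplug_170307001", "act1"]) == 82.5
assert extract(payload, ["enaplug_170307001", "missing"]) is None
```

Single-element tags like the lamps' ["consumption_w"] imply a flat payload with the value at the top level.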

monolith/core/config.py

@@ -23,18 +23,10 @@ class Settings(BaseSettings):
data_ingestion_db_name: str = "digitalmente_ingestion"
main_db_name: str = "energy_dashboard"
-# Redis
-redis_url: str = os.getenv("REDIS_URL", "redis://localhost:6379")
-redis_enabled: bool = True  # Can be disabled for full monolith mode
-# FTP Configuration (for data ingestion)
-ftp_sa4cps_host: str = os.getenv("FTP_SA4CPS_HOST", "ftp.sa4cps.pt")
-ftp_sa4cps_port: int = int(os.getenv("FTP_SA4CPS_PORT", "21"))
-ftp_sa4cps_username: str = os.getenv("FTP_SA4CPS_USERNAME", "curvascarga@sa4cps.pt")
-ftp_sa4cps_password: str = os.getenv("FTP_SA4CPS_PASSWORD", "")
-ftp_sa4cps_remote_path: str = os.getenv("FTP_SA4CPS_REMOTE_PATH", "/SLGs/")
-ftp_check_interval: int = int(os.getenv("FTP_CHECK_INTERVAL", "21600"))  # 6 hours
-ftp_skip_initial_scan: bool = os.getenv("FTP_SKIP_INITIAL_SCAN", "true").lower() == "true"
+# HTTP Poller Configuration (for IoT devices)
+http_poll_interval: int = int(os.getenv("HTTP_POLL_INTERVAL", "60"))  # 60 seconds
+http_timeout: int = int(os.getenv("HTTP_TIMEOUT", "10"))  # 10 seconds
+http_max_concurrent: int = int(os.getenv("HTTP_MAX_CONCURRENT", "5"))  # 5 concurrent requests
# CORS
cors_origins: list = ["*"]
@@ -42,11 +34,6 @@ class Settings(BaseSettings):
cors_allow_methods: list = ["*"]
cors_allow_headers: list = ["*"]
# Background Tasks
health_check_interval: int = 30
-event_scheduler_interval: int = 60
-auto_response_interval: int = 30
class Config:
env_file = ".env"
case_sensitive = False
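The new `http_*` settings are read with `os.getenv` at class-definition time, so they must be in the process environment (or exported before `core.config` is first imported) rather than relying solely on pydantic's `.env` loading. A quick sketch, assuming the `Settings` class above is importable as `core.config`:

```python
import os

# Override before core.config is imported, since os.getenv runs at class definition.
os.environ["HTTP_POLL_INTERVAL"] = "30"

from core.config import Settings

settings = Settings()
print(settings.http_poll_interval)   # 30, from the environment
print(settings.http_timeout)         # 10, the default
print(settings.http_max_concurrent)  # 5, the default
```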

monolith/main.py

@@ -14,14 +14,13 @@ from fastapi.middleware.cors import CORSMiddleware
from core.config import settings
from core.logging_config import setup_logging
from core.database import db_manager
-from core.redis import redis_manager
from core.events import event_bus, EventTopics
# Module imports
from modules.sensors.router import router as sensors_router
from modules.sensors.room_service import RoomService
from modules.sensors import WebSocketManager
-from modules.demand_response import DemandResponseService
+# TEMPORARILY DISABLED: from modules.demand_response import DemandResponseService
# Setup logging
setup_logging()
@@ -35,7 +34,7 @@ async def room_metrics_aggregation_task():
while True:
try:
-room_service = RoomService(db_manager.sensors_db, redis_manager.client)
+room_service = RoomService(db_manager.sensors_db, None)
await room_service.aggregate_all_room_metrics()
await asyncio.sleep(300)  # 5 minutes
@@ -62,74 +61,66 @@ async def data_cleanup_task():
await asyncio.sleep(7200)
-async def event_scheduler_task():
-"""Background task for checking and executing scheduled DR events"""
-logger.info("Starting event scheduler task")
-while True:
-try:
-service = DemandResponseService(db_manager.demand_response_db, redis_manager.client)
-await service.check_scheduled_events()
-await asyncio.sleep(settings.event_scheduler_interval)
-except Exception as e:
-logger.error(f"Error in event scheduler task: {e}")
-await asyncio.sleep(120)
-async def auto_response_task():
-"""Background task for automatic demand response"""
-logger.info("Starting auto-response task")
-while True:
-try:
-service = DemandResponseService(db_manager.demand_response_db, redis_manager.client)
-await service.process_auto_responses()
-await asyncio.sleep(settings.auto_response_interval)
-except Exception as e:
-logger.error(f"Error in auto-response task: {e}")
-await asyncio.sleep(90)
-async def energy_data_event_subscriber():
-"""Subscribe to internal event bus for energy data events"""
-logger.info("Starting energy data event subscriber")
-async def handle_energy_data(data):
-"""Handle energy data events"""
-try:
-service = DemandResponseService(db_manager.demand_response_db, redis_manager.client)
-sensor_id = data.get("sensorId") or data.get("sensor_id")
-power_kw = data.get("value", 0.0)
-if sensor_id:
-service.update_device_power_cache(sensor_id, power_kw)
-except Exception as e:
-logger.error(f"Error processing energy data event: {e}")
-# Subscribe to energy data events
-event_bus.subscribe(EventTopics.ENERGY_DATA, handle_energy_data)
-async def ftp_monitoring_task():
-"""Background task for FTP monitoring"""
-logger.info("Starting FTP monitoring task")
-while True:
-try:
-from modules.data_ingestion import FTPMonitor, SLGProcessor
-ftp_monitor = FTPMonitor(db_manager.data_ingestion_db)
-slg_processor = SLGProcessor(db_manager.data_ingestion_db)
-await ftp_monitor.check_and_process_files(slg_processor)
-await asyncio.sleep(settings.ftp_check_interval)
-except Exception as e:
-logger.error(f"Error in FTP monitoring task: {e}")
-await asyncio.sleep(600)
+# TEMPORARILY DISABLED: Demand Response background tasks
+# async def event_scheduler_task():
+# """Background task for checking and executing scheduled DR events"""
+# logger.info("Starting event scheduler task")
+#
+# while True:
+# try:
+# service = DemandResponseService(db_manager.demand_response_db, redis_manager.client)
+# await service.check_scheduled_events()
+# await asyncio.sleep(settings.event_scheduler_interval)
+#
+# except Exception as e:
+# logger.error(f"Error in event scheduler task: {e}")
+# await asyncio.sleep(120)
+# async def auto_response_task():
+# """Background task for automatic demand response"""
+# logger.info("Starting auto-response task")
+#
+# while True:
+# try:
+# service = DemandResponseService(db_manager.demand_response_db, redis_manager.client)
+# await service.process_auto_responses()
+# await asyncio.sleep(settings.auto_response_interval)
+#
+# except Exception as e:
+# logger.error(f"Error in auto-response task: {e}")
+# await asyncio.sleep(90)
+# async def energy_data_event_subscriber():
+# """Subscribe to internal event bus for energy data events"""
+# logger.info("Starting energy data event subscriber")
+#
+# async def handle_energy_data(data):
+# """Handle energy data events"""
+# try:
+# service = DemandResponseService(db_manager.demand_response_db, redis_manager.client)
+# sensor_id = data.get("sensorId") or data.get("sensor_id")
+# power_kw = data.get("value", 0.0)
+#
+# if sensor_id:
+# service.update_device_power_cache(sensor_id, power_kw)
+#
+# except Exception as e:
+# logger.error(f"Error processing energy data event: {e}")
+#
+# # Subscribe to energy data events
+# event_bus.subscribe(EventTopics.ENERGY_DATA, handle_energy_data)
+async def http_polling_task(http_poller):
+"""Background task for HTTP IoT device polling"""
+logger.info("Starting HTTP polling task")
+while True:
+try:
+await http_poller.run()
+except Exception as e:
+logger.error(f"Error in HTTP polling task: {e}")
# Application lifespan
@@ -140,24 +131,33 @@ async def lifespan(app: FastAPI):
# Connect to databases
await db_manager.connect()
-await redis_manager.connect()
# Initialize default rooms
-room_service = RoomService(db_manager.sensors_db, redis_manager.client)
+room_service = RoomService(db_manager.sensors_db, None)
await room_service.initialize_default_rooms()
+# Initialize HTTP poller for IoT devices
+from modules.data_ingestion import HttpPoller, set_http_poller
+http_poller = HttpPoller(
+sensors_db=db_manager.sensors_db,
+poll_interval=settings.http_poll_interval,
+timeout=10,
+max_concurrent=5
+)
+set_http_poller(http_poller)
# Subscribe to internal events
-await energy_data_event_subscriber()
+# TEMPORARILY DISABLED: await energy_data_event_subscriber()
# Start background tasks
asyncio.create_task(room_metrics_aggregation_task())
asyncio.create_task(data_cleanup_task())
-asyncio.create_task(event_scheduler_task())
-asyncio.create_task(auto_response_task())
+# TEMPORARILY DISABLED: asyncio.create_task(event_scheduler_task())
+# TEMPORARILY DISABLED: asyncio.create_task(auto_response_task())
-# Start FTP monitoring if not skipping initial scan
-if not settings.ftp_skip_initial_scan:
-asyncio.create_task(ftp_monitoring_task())
+# Start HTTP polling task
+asyncio.create_task(http_polling_task(http_poller))
logger.info("Application startup complete")
@@ -167,7 +167,6 @@ async def lifespan(app: FastAPI):
# Disconnect from databases
await db_manager.disconnect()
-await redis_manager.disconnect()
logger.info("Application shutdown complete")
@@ -210,12 +209,6 @@ async def health_check():
# Check database connection
await db_manager.main_db.command("ping")
-# Check Redis connection (optional)
-redis_status = "disabled"
-if redis_manager.is_available:
-await redis_manager.client.ping()
-redis_status = "healthy"
return {
"service": settings.app_name,
"version": settings.app_version,
@@ -223,12 +216,11 @@ async def health_check():
"timestamp": datetime.utcnow().isoformat(),
"components": {
"database": "healthy",
"redis": redis_status,
"event_bus": "healthy"
},
"modules": {
"sensors": "loaded",
"demand_response": "loaded",
"demand_response": "disabled",
"data_ingestion": "loaded"
}
}
@@ -244,21 +236,21 @@ async def system_overview():
"""Get system overview"""
try:
from modules.sensors import SensorService
-from modules.demand_response import DemandResponseService
+# TEMPORARILY DISABLED: from modules.demand_response import DemandResponseService
-sensor_service = SensorService(db_manager.sensors_db, redis_manager.client)
-dr_service = DemandResponseService(db_manager.demand_response_db, redis_manager.client)
+sensor_service = SensorService(db_manager.sensors_db, None)
+# TEMPORARILY DISABLED: dr_service = DemandResponseService(db_manager.demand_response_db, None)
# Get sensor counts
all_sensors = await sensor_service.get_sensors()
active_sensors = [s for s in all_sensors if s.get("status") == "online"]
# Get room counts
-room_service = RoomService(db_manager.sensors_db, redis_manager.client)
+room_service = RoomService(db_manager.sensors_db, None)
all_rooms = await room_service.get_rooms()
# Get DR event counts
-active_events = len(dr_service.active_events) if hasattr(dr_service, 'active_events') else 0
+# TEMPORARILY DISABLED: active_events = len(dr_service.active_events) if hasattr(dr_service, 'active_events') else 0
return {
"timestamp": datetime.utcnow().isoformat(),
@@ -271,7 +263,7 @@ async def system_overview():
"total": len(all_rooms)
},
"demand_response": {
"active_events": active_events
"status": "disabled"
},
"event_bus": {
"topics": event_bus.get_topics(),
@@ -291,9 +283,16 @@ app.include_router(
tags=["sensors"]
)
-# Note: Demand Response and Data Ingestion routers would be added here
+# Data Ingestion router
+from modules.data_ingestion import router as data_ingestion_router
+app.include_router(
+data_ingestion_router,
+prefix="/api/v1/ingestion",
+tags=["data-ingestion"]
+)
+# Note: Demand Response router would be added here (currently disabled)
# app.include_router(demand_response_router, prefix="/api/v1/demand-response", tags=["demand-response"])
-# app.include_router(data_ingestion_router, prefix="/api/v1/ingestion", tags=["data-ingestion"])
if __name__ == "__main__":
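With the ingestion router mounted under `/api/v1/ingestion`, the reshaped health payload can be verified end to end. A hedged sketch: the `/health` path and port 8000 are assumptions, only the response fields come from the handler above:

```python
import requests

resp = requests.get("http://localhost:8000/health", timeout=5)  # path and port assumed
body = resp.json()

print(body["components"])                  # e.g. {"database": "healthy", "event_bus": "healthy"}
print(body["modules"]["demand_response"])  # "disabled" after this change
```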

monolith/modules/data_ingestion/__init__.py

@@ -1,11 +1,31 @@
"""Data Ingestion module - handles FTP monitoring and SA4CPS data processing."""
"""Data Ingestion module - handles HTTP/MQTT IoT device polling."""
from .ftp_monitor import FTPMonitor
from .slg_processor import SLGProcessor
from .config import Config
from .http_poller import HttpPoller
from .iot_config import IoTConfiguration, IoTDevice, get_iot_config, get_config_loader
from .models import (
DataSourceType, PollingStatus, DeviceStatus,
DataSourceSummary, PollingMetrics
)
from .router import router, set_http_poller
__all__ = [
"FTPMonitor",
"SLGProcessor",
"Config",
# HTTP Poller
"HttpPoller",
"set_http_poller",
# Configuration
"IoTConfiguration",
"IoTDevice",
"get_iot_config",
"get_config_loader",
# Models
"DataSourceType",
"PollingStatus",
"DeviceStatus",
"DataSourceSummary",
"PollingMetrics",
# Router
"router",
]
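Everything needed to stand the poller up outside the FastAPI lifespan is re-exported here. A minimal wiring sketch using only exported names (the Mongo URI and database name are stand-ins, and the device config is assumed to pass `validate_uri`):

```python
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
from modules.data_ingestion import HttpPoller, set_http_poller

async def main():
    client = AsyncIOMotorClient("mongodb://localhost:27017")  # stand-in URI
    poller = HttpPoller(
        sensors_db=client["energy_dashboard"],  # stand-in database name
        poll_interval=60,
        timeout=10,
        max_concurrent=5,
    )
    set_http_poller(poller)  # make the instance visible to the API routes
    await poller.run()       # loops until poller.stop() is called

asyncio.run(main())
```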

config.py (deleted)

@@ -1,45 +0,0 @@
#!/usr/bin/env python3
"""
Configuration for SA4CPS Data Ingestion Service
Simple configuration management for FTP and MongoDB connections
"""
import os
from typing import Dict, Any
# FTP Configuration for SA4CPS server
FTP_CONFIG: Dict[str, Any] = {
"host": os.getenv("FTP_SA4CPS_HOST", "ftp.sa4cps.pt"),
"username": os.getenv("FTP_SA4CPS_USERNAME", "curvascarga@sa4cps.pt"),
"password": os.getenv("FTP_SA4CPS_PASSWORD", 'n$WFtz9+bleN'), # Set via environment variable
"base_path": os.getenv("FTP_SA4CPS_REMOTE_PATH", "/SLGs/"),
"check_interval": int(os.getenv("FTP_CHECK_INTERVAL", "21600")), # 6 hours default
"skip_initial_scan": os.getenv("FTP_SKIP_INITIAL_SCAN", "true").lower() == "true",
}
# MongoDB Configuration
# Debug environment variables
print(f"DEBUG: MONGO_URL env var = {os.getenv('MONGO_URL', 'NOT SET')}")
print(f"DEBUG: All env vars starting with MONGO: {[k for k in os.environ.keys() if k.startswith('MONGO')]}")
MONGO_CONFIG: Dict[str, Any] = {
"connection_string": os.getenv(
"MONGO_URL",
"mongodb://admin:password123@localhost:27017/digitalmente_ingestion?authSource=admin"
),
"database_name": os.getenv("MONGODB_DATABASE", "digitalmente_ingestion")
}
# Logging Configuration
LOGGING_CONFIG: Dict[str, Any] = {
"level": os.getenv("LOG_LEVEL", "INFO"),
"format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
}
# Service Configuration
SERVICE_CONFIG: Dict[str, Any] = {
"name": "SA4CPS Data Ingestion Service",
"version": "1.0.0",
"port": int(os.getenv("SERVICE_PORT", "8008")),
"host": os.getenv("SERVICE_HOST", "0.0.0.0")
}

database.py (deleted)

@@ -1,478 +0,0 @@
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
from config import MONGO_CONFIG
logger = logging.getLogger(__name__)
class DatabaseManager:
def __init__(self):
self.client: Optional[MongoClient] = None
self.db = None
self.collections = {}
self.energy_collections_cache = {} # Cache for dynamically created energy data collections
self.connection_string = MONGO_CONFIG["connection_string"]
self.database_name = MONGO_CONFIG["database_name"]
logger.info(f"Database manager initialized for: {self.database_name}")
async def connect(self):
try:
logger.info(f"Connecting to MongoDB at: {self.connection_string}")
self.client = MongoClient(self.connection_string, serverSelectionTimeoutMS=5000)
await self.ping()
self.db = self.client[self.database_name]
self.collections = {
'files': self.db.sa4cps_files,
'metadata': self.db.sa4cps_metadata,
'scanned_directories': self.db.sa4cps_scanned_directories
}
self._create_base_indexes()
logger.info(f"Connected to MongoDB database: {self.database_name}")
except (ConnectionFailure, ServerSelectionTimeoutError) as e:
logger.error(f"Failed to connect to MongoDB: {e}")
raise
async def close(self):
"""Close MongoDB connection"""
if self.client:
self.client.close()
logger.debug("MongoDB connection closed")
async def ping(self):
"""Test database connection"""
if not self.client:
raise ConnectionFailure("No database connection")
try:
# Use async approach with timeout
import asyncio
import concurrent.futures
# Run the ping command in a thread pool to avoid blocking
loop = asyncio.get_event_loop()
with concurrent.futures.ThreadPoolExecutor() as pool:
await asyncio.wait_for(
loop.run_in_executor(pool, self.client.admin.command, 'ping'),
timeout=3.0 # 3 second timeout for ping
)
logger.debug("MongoDB ping successful")
except asyncio.TimeoutError:
logger.error("MongoDB ping timeout after 3 seconds")
raise ConnectionFailure("MongoDB ping timeout")
except ConnectionFailure as e:
logger.error(f"MongoDB ping failed - Server not available: {e}")
raise
except Exception as e:
logger.error(f"MongoDB ping failed with error: {e}")
raise ConnectionFailure(f"Ping failed: {e}")
def _create_base_indexes(self):
"""Create indexes for base collections (not energy data collections)"""
try:
self.collections['files'].create_index("filename", unique=True)
self.collections['files'].create_index("processed_at")
self.collections['files'].create_index("directory_path")
self.collections['scanned_directories'].create_index("directory_path", unique=True)
self.collections['scanned_directories'].create_index("last_scanned")
self.collections['scanned_directories'].create_index("scan_status")
logger.info("Database indexes created successfully")
except Exception as e:
logger.warning(f"Failed to create indexes: {e}")
def _extract_level3_path(self, directory_path: str) -> Optional[str]:
"""Extract level 3 directory path (SLGs/Community/Building) from full path"""
# Expected structure: /SLGs/Community/Building/...
parts = directory_path.strip('/').split('/')
if len(parts) >= 3 and parts[0] == 'SLGs':
# Return SLGs/Community/Building
return '/'.join(parts[:3])
return None
def _sanitize_collection_name(self, level3_path: str) -> str:
"""Convert level 3 directory path to valid MongoDB collection name
Example: SLGs/CommunityA/Building1 -> energy_data__CommunityA_Building1
"""
parts = level3_path.strip('/').split('/')
if len(parts) >= 3 and parts[0] == 'SLGs':
# Use Community_Building as the collection suffix
collection_suffix = f"{parts[1]}_{parts[2]}"
collection_name = f"energy_data__{collection_suffix}"
return collection_name
# Fallback: sanitize the entire path
sanitized = level3_path.replace('/', '_').replace('.', '_').replace(' ', '_')
sanitized = sanitized.strip('_')
return f"energy_data__{sanitized}"
def _get_energy_collection(self, directory_path: str):
"""Get or create energy data collection for a specific level 3 directory path"""
level3_path = self._extract_level3_path(directory_path)
if not level3_path:
logger.warning(f"Could not extract level 3 path from: {directory_path}, using default collection")
# Fallback to a default collection for non-standard paths
collection_name = "energy_data__other"
else:
collection_name = self._sanitize_collection_name(level3_path)
# Check cache first
if collection_name in self.energy_collections_cache:
return self.energy_collections_cache[collection_name]
# Create/get collection
collection = self.db[collection_name]
# Create indexes for this energy collection
try:
collection.create_index([("filename", 1), ("timestamp", 1)])
collection.create_index("timestamp")
collection.create_index("meter_id")
logger.debug(f"Created indexes for collection: {collection_name}")
except Exception as e:
logger.warning(f"Failed to create indexes for {collection_name}: {e}")
# Cache the collection
self.energy_collections_cache[collection_name] = collection
logger.info(f"Initialized energy data collection: {collection_name} for path: {directory_path}")
return collection
def _list_energy_collections(self) -> List[str]:
"""List all energy data collections in the database"""
try:
all_collections = self.db.list_collection_names()
# Filter collections that start with 'energy_data__'
energy_collections = [c for c in all_collections if c.startswith('energy_data__')]
return energy_collections
except Exception as e:
logger.error(f"Error listing energy collections: {e}")
return []
async def store_file_data(self, filename: str, records: List[Dict[str, Any]], directory_path: str = None) -> bool:
try:
current_time = datetime.now()
# Determine which collection to use based on directory path
if directory_path:
energy_collection = self._get_energy_collection(directory_path)
level3_path = self._extract_level3_path(directory_path)
else:
logger.warning(f"No directory path provided for {filename}, using default collection")
energy_collection = self._get_energy_collection("/SLGs/unknown/unknown")
level3_path = None
# Store file metadata
file_metadata = {
"filename": filename,
"directory_path": directory_path,
"level3_path": level3_path,
"record_count": len(records),
"processed_at": current_time,
"file_size": sum(len(str(record)) for record in records),
"status": "processed"
}
# Insert or update file record
self.collections['files'].replace_one(
{"filename": filename},
file_metadata,
upsert=True
)
# Add filename and processed timestamp to each record
for record in records:
record["filename"] = filename
record["processed_at"] = current_time
record["directory_path"] = directory_path
# Insert energy data records into the appropriate collection
if records:
result = energy_collection.insert_many(records)
inserted_count = len(result.inserted_ids)
logger.debug(f"Stored {inserted_count} records from {filename} to {energy_collection.name}")
return True
return False
except Exception as e:
logger.error(f"Error storing data for {filename}: {e}")
# Store error metadata
error_metadata = {
"filename": filename,
"directory_path": directory_path,
"processed_at": current_time,
"status": "error",
"error_message": str(e)
}
self.collections['files'].replace_one(
{"filename": filename},
error_metadata,
upsert=True
)
return False
async def get_processed_files(self) -> List[str]:
"""Get list of successfully processed files"""
try:
cursor = self.collections['files'].find(
{"status": "processed"},
{"filename": 1, "_id": 0}
)
files = []
for doc in cursor:
files.append(doc["filename"])
return files
except Exception as e:
logger.error(f"Error getting processed files: {e}")
return []
async def is_file_processed(self, filename: str) -> bool:
"""Mock check if file is processed"""
return filename in await self.get_processed_files()
async def get_file_info(self, filename: str) -> Optional[Dict[str, Any]]:
"""Get information about a specific file"""
try:
return self.collections['files'].find_one({"filename": filename})
except Exception as e:
logger.error(f"Error getting file info for {filename}: {e}")
return None
# Directory scanning tracking methods
# Note: Only level 4+ directories (/SLGs/Community/Building/SubDir) are tracked
# to avoid unnecessary caching of high-level organizational directories
async def is_directory_scanned(self, directory_path: str, since_timestamp: datetime = None) -> bool:
"""Check if directory has been scanned recently
Note: Only level 4+ directories are tracked in the database
"""
try:
query = {"directory_path": directory_path, "scan_status": "complete"}
if since_timestamp:
query["last_scanned"] = {"$gte": since_timestamp}
result = self.collections['scanned_directories'].find_one(query)
return result is not None
except Exception as e:
logger.error(f"Error checking directory scan status for {directory_path}: {e}")
return False
async def mark_directory_scanned(self, directory_path: str, file_count: int, ftp_last_modified: datetime = None) -> bool:
"""Mark directory as scanned with current timestamp"""
try:
scan_record = {
"directory_path": directory_path,
"last_scanned": datetime.now(),
"file_count": file_count,
"scan_status": "complete"
}
if ftp_last_modified:
scan_record["ftp_last_modified"] = ftp_last_modified
# Use upsert to update existing or create new record
self.collections['scanned_directories'].replace_one(
{"directory_path": directory_path},
scan_record,
upsert=True
)
logger.debug(f"Marked directory as scanned: {directory_path} ({file_count} files)")
return True
except Exception as e:
logger.error(f"Error marking directory as scanned {directory_path}: {e}")
return False
async def get_scanned_directories(self) -> List[Dict[str, Any]]:
"""Get all scanned directory records"""
try:
cursor = self.collections['scanned_directories'].find()
return list(cursor)
except Exception as e:
logger.error(f"Error getting scanned directories: {e}")
return []
async def should_skip_directory(self, directory_path: str, ftp_last_modified: datetime = None) -> bool:
"""Determine if directory should be skipped based on scan history and modification time"""
try:
scan_record = self.collections['scanned_directories'].find_one(
{"directory_path": directory_path, "scan_status": "complete"}
)
if not scan_record:
return False # Never scanned, should scan
# If we have FTP modification time and it's newer than our last scan, don't skip
if ftp_last_modified and scan_record.get("last_scanned"):
return ftp_last_modified <= scan_record["last_scanned"]
# If directory was scanned successfully, skip it (assuming it's historical data)
return True
except Exception as e:
logger.error(f"Error determining if directory should be skipped {directory_path}: {e}")
return False
async def get_stats(self) -> Dict[str, Any]:
"""Get database statistics including all energy collections"""
try:
stats = {
"database": self.database_name,
"timestamp": datetime.now().isoformat()
}
# Count documents in base collections
for name, collection in self.collections.items():
try:
count = collection.count_documents({})
stats[f"{name}_count"] = count
except Exception as e:
stats[f"{name}_count"] = f"error: {e}"
# Get all energy collections and their counts
try:
energy_collections = self._list_energy_collections()
energy_stats = []
total_energy_records = 0
for collection_name in energy_collections:
collection = self.db[collection_name]
count = collection.count_documents({})
total_energy_records += count
energy_stats.append({
"collection": collection_name,
"record_count": count
})
stats["energy_collections"] = energy_stats
stats["total_energy_collections"] = len(energy_collections)
stats["total_energy_records"] = total_energy_records
except Exception as e:
stats["energy_collections"] = f"error: {e}"
# Get recent files
try:
recent_files = []
cursor = self.collections['files'].find(
{},
{"filename": 1, "processed_at": 1, "record_count": 1, "status": 1, "directory_path": 1, "level3_path": 1, "_id": 0}
).sort("processed_at", -1).limit(5)
for doc in cursor:
if doc.get("processed_at"):
doc["processed_at"] = doc["processed_at"].isoformat()
recent_files.append(doc)
stats["recent_files"] = recent_files
except Exception as e:
stats["recent_files"] = f"error: {e}"
return stats
except Exception as e:
logger.error(f"Error getting database stats: {e}")
return {"error": str(e), "timestamp": datetime.now().isoformat()}
async def get_energy_data(self,
filename: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
directory_path: Optional[str] = None,
limit: int = 100) -> List[Dict[str, Any]]:
"""Retrieve energy data with optional filtering
Args:
filename: Filter by specific filename
start_time: Filter by start timestamp
end_time: Filter by end timestamp
directory_path: Filter by specific directory path (level 3). If None, queries all collections
limit: Maximum number of records to return
"""
try:
query = {}
if filename:
query["filename"] = filename
if start_time or end_time:
time_query = {}
if start_time:
time_query["$gte"] = start_time
if end_time:
time_query["$lte"] = end_time
query["timestamp"] = time_query
data = []
# If directory_path is specified, query only that collection
if directory_path:
collection = self._get_energy_collection(directory_path)
cursor = collection.find(query).sort("timestamp", -1).limit(limit)
for doc in cursor:
data.append(self._format_energy_document(doc))
else:
# Query across all energy collections
energy_collection_names = self._list_energy_collections()
# Collect data from all collections, then sort and limit
all_data = []
per_collection_limit = max(limit, 1000) # Get more from each to ensure we have enough after sorting
for collection_name in energy_collection_names:
collection = self.db[collection_name]
cursor = collection.find(query).sort("timestamp", -1).limit(per_collection_limit)
for doc in cursor:
all_data.append(self._format_energy_document(doc))
# Sort all data by timestamp and apply final limit
all_data.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
data = all_data[:limit]
return data
except Exception as e:
logger.error(f"Error retrieving energy data: {e}")
return []
def _format_energy_document(self, doc: Dict[str, Any]) -> Dict[str, Any]:
"""Format energy document for API response"""
# Convert ObjectId to string and datetime to ISO string
if "_id" in doc:
doc["_id"] = str(doc["_id"])
if "timestamp" in doc and hasattr(doc["timestamp"], "isoformat"):
doc["timestamp"] = doc["timestamp"].isoformat()
if "processed_at" in doc and hasattr(doc["processed_at"], "isoformat"):
doc["processed_at"] = doc["processed_at"].isoformat()
return doc

ftp_monitor.py (deleted)

@@ -1,339 +0,0 @@
import asyncio
from ftplib import FTP
import logging
import os
from datetime import datetime
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import tempfile
from config import FTP_CONFIG
from slg_processor import SLGProcessor
logger = logging.getLogger(__name__)
@dataclass
class FTPFileInfo:
path: str
name: str
size: int
directory_path: str # Directory containing the file
modified_time: Optional[datetime] = None
class FTPMonitor:
def __init__(self, db_manager):
self.db_manager = db_manager
self.processor = SLGProcessor()
self.last_check: Optional[datetime] = None
self.processed_files: set = set()
self.files_processed_count = 0
self.status = "initializing"
self.ftp_host = FTP_CONFIG["host"]
self.ftp_user = FTP_CONFIG["username"]
self.ftp_pass = FTP_CONFIG["password"]
self.base_path = FTP_CONFIG["base_path"]
self.check_interval = FTP_CONFIG["check_interval"]
self.skip_initial_scan = FTP_CONFIG["skip_initial_scan"]
logger.info(f"FTP Monitor initialized for {self.ftp_host}")
async def initialize_processed_files_cache(self):
try:
# Add timeout to prevent blocking startup indefinitely
processed_file_names = await asyncio.wait_for(
self.db_manager.get_processed_files(),
timeout=10.0 # 10 second timeout
)
for filename in processed_file_names:
self.processed_files.add(filename)
logger.info(f"Loaded {len(processed_file_names)} already processed files from database")
return len(processed_file_names)
except asyncio.TimeoutError:
logger.warning("Timeout loading processed files cache - continuing with empty cache")
return 0
except Exception as e:
logger.error(f"Error loading processed files from database: {e}")
return 0
async def start_monitoring(self):
self.status = "initializing"
logger.info("Starting FTP monitoring loop")
try:
await self.initialize_processed_files_cache()
logger.info("FTP monitor initialization completed")
except asyncio.CancelledError:
logger.info("FTP monitor initialization cancelled")
self.status = "stopped"
return
except Exception as e:
logger.error(f"Error during FTP monitor initialization: {e}")
self.status = "error"
try:
await asyncio.sleep(1800)
except asyncio.CancelledError:
logger.info("FTP monitor cancelled during error recovery")
self.status = "stopped"
return
await asyncio.wait_for(self.check_for_new_files(), timeout=300.0) # 5 minute timeout
self.status = "running"
# Optionally skip initial scan and wait for first scheduled interval
if self.skip_initial_scan:
logger.info(f"Skipping initial scan - waiting {self.check_interval/3600:.1f} hours for first scheduled check")
try:
await asyncio.sleep(self.check_interval)
except asyncio.CancelledError:
logger.info("FTP monitoring cancelled during initial wait")
self.status = "stopped"
return
while True:
try:
# Add timeout to prevent indefinite blocking on FTP operations
await asyncio.wait_for(self.check_for_new_files(), timeout=300.0) # 5 minute timeout
self.status = "running"
logger.info(f"Waiting {self.check_interval/3600:.1f} hours until next check")
await asyncio.sleep(self.check_interval)
except asyncio.TimeoutError:
logger.warning("FTP check timed out after 5 minutes - will retry")
self.status = "error"
try:
await asyncio.sleep(900) # Wait 15 minutes before retry
except asyncio.CancelledError:
logger.info("FTP monitoring task cancelled during timeout recovery")
self.status = "stopped"
break
except asyncio.CancelledError:
logger.info("FTP monitoring task cancelled - shutting down gracefully")
self.status = "stopped"
break
except Exception as e:
self.status = "error"
logger.error(f"Error in monitoring loop: {e}")
try:
await asyncio.sleep(1800) # Wait 30 minutes before retry
except asyncio.CancelledError:
logger.info("FTP monitoring task cancelled during error recovery")
self.status = "stopped"
break
async def check_for_new_files(self) -> Dict[str, Any]:
self.last_check = datetime.now()
logger.info(f"Checking FTP server at {self.last_check}")
try:
with FTP(self.ftp_host) as ftp:
ftp.login(self.ftp_user, self.ftp_pass)
logger.info(f"Connected to FTP server: {self.ftp_host}")
new_files = await self._find_slg_files(ftp)
processed_count = 0
skipped_count = 0
for file_info in new_files:
# Check for cancellation during file processing loop
if asyncio.current_task().cancelled():
raise asyncio.CancelledError()
if file_info.name in self.processed_files:
logger.debug(f"Skipping already processed file (cached): {file_info.name}")
skipped_count += 1
continue
if await self.db_manager.is_file_processed(file_info.name):
logger.debug(f"Skipping already processed file (database): {file_info.name}")
self.processed_files.add(file_info.name)
skipped_count += 1
continue
logger.debug(f"Processing new file: {file_info.name}")
success = await self._process_file(ftp, file_info)
if success:
self.processed_files.add(file_info.name)
processed_count += 1
logger.debug(f"Successfully processed file: {file_info.name} ({processed_count} total)")
self.files_processed_count += 1
result = {
"files_found": len(new_files),
"files_processed": processed_count,
"files_skipped": skipped_count,
"timestamp": self.last_check.isoformat()
}
logger.info(f"Check complete: {result}")
return result
except Exception as e:
logger.error(f"FTP check failed: {e}")
raise
async def _find_slg_files(self, ftp: FTP) -> List[FTPFileInfo]:
files = []
try:
await self._scan_directories_iterative(ftp, self.base_path, files)
logger.info(f"Found {len(files)} .slg_v2 files across all directories")
return files
except Exception as e:
logger.error(f"Error scanning FTP directory: {e}")
return []
async def _scan_directories_iterative(self, ftp: FTP, base_path: str, files: List[FTPFileInfo]):
directories_to_scan = [(base_path, 0)]
visited_dirs = set()
skipped_dirs = 0
scanned_dirs = 0
while directories_to_scan:
current_dir, current_depth = directories_to_scan.pop(0) # FIFO queue
normalized_path = current_dir.rstrip('/') if current_dir != '/' else '/'
if normalized_path in visited_dirs:
logger.debug(f"Skipping already visited directory: {normalized_path}")
continue
visited_dirs.add(normalized_path)
# Determine directory depth (level 4 = /SLGs/Community/Building/SubDir)
path_parts = normalized_path.strip('/').split('/')
directory_level = len(path_parts)
# Check if directory should be skipped based on previous scans (only for level 4+)
if directory_level >= 4 and await self.db_manager.should_skip_directory(normalized_path):
logger.info(f"Skipping previously scanned level {directory_level} directory: {normalized_path}")
skipped_dirs += 1
continue
logger.debug(f"Scanning directory: {normalized_path} (depth: {current_depth}, queue: {len(directories_to_scan)})")
scanned_dirs += 1
try:
original_dir = ftp.pwd()
ftp.cwd(current_dir)
dir_list = []
ftp.retrlines('LIST', dir_list.append)
logger.debug(f"Found {len(dir_list)} entries in {normalized_path}")
# Count files found in this directory
files_found_in_dir = 0
for line in dir_list:
parts = line.split()
if len(parts) >= 9:
filename = parts[-1]
permissions = parts[0]
if filename in ['.', '..']:
continue
if permissions.startswith('d'):
if normalized_path == '/':
subdirectory_path = f"/{filename}"
else:
subdirectory_path = f"{normalized_path}/{filename}"
subdirectory_normalized = subdirectory_path.rstrip('/') if subdirectory_path != '/' else '/'
if subdirectory_normalized not in visited_dirs:
directories_to_scan.append((subdirectory_path, current_depth + 1))
logger.debug(f"Added to queue: {subdirectory_path}")
else:
logger.debug(f"Skipping already visited: {subdirectory_path}")
elif filename.endswith('.sgl_v2'):
logger.debug(f"Found .slg_v2 file: {filename} in {normalized_path}")
try:
size = int(parts[4])
if normalized_path == '/':
full_path = f"/{filename}"
else:
full_path = f"{normalized_path}/{filename}"
files.append(FTPFileInfo(
path=full_path,
name=filename,
size=size,
directory_path=normalized_path
))
files_found_in_dir += 1
except (ValueError, IndexError):
logger.warning(f"Could not parse file info for: {filename}")
ftp.cwd(original_dir)
# Mark directory as scanned (only for level 4+ directories)
if directory_level >= 4:
await self.db_manager.mark_directory_scanned(normalized_path, files_found_in_dir)
logger.debug(f"Completed scanning level {directory_level} directory: {normalized_path} ({files_found_in_dir} files found)")
else:
logger.debug(f"Completed scanning level {directory_level} directory (not saved to cache): {normalized_path} ({files_found_in_dir} files found)")
except Exception as e:
logger.warning(f"Error scanning directory {normalized_path}: {e}")
continue
logger.info(f"Iterative scan completed. Scanned: {scanned_dirs} directories, Skipped: {skipped_dirs} directories (Total visited: {len(visited_dirs)})")
async def _process_file(self, ftp: FTP, file_info: FTPFileInfo) -> bool:
logger.debug(f"Processing file: {file_info.path} ({file_info.size} bytes) from directory: {file_info.directory_path}")
try:
with tempfile.NamedTemporaryFile(mode='wb', suffix='.slg_v2', delete=False) as temp_file:
temp_path = temp_file.name
with open(temp_path, 'wb') as f:
ftp.retrbinary(f'RETR {file_info.path}', f.write)
records = await self.processor.process_file(temp_path, file_info.name)
if records:
# Pass directory path to store_file_data for collection selection
await self.db_manager.store_file_data(file_info.name, records, file_info.directory_path)
logger.debug(f"Stored {len(records)} records from {file_info.name} to collection for {file_info.directory_path}")
return True
else:
logger.warning(f"No valid records found in {file_info.name}")
return False
except Exception as e:
logger.error(f"Error processing file {file_info.name}: {e}")
return False
finally:
try:
if 'temp_path' in locals():
os.unlink(temp_path)
except OSError:
pass
def get_status(self) -> str:
return self.status
def get_last_check_time(self) -> Optional[str]:
return self.last_check.isoformat() if self.last_check else None
def get_processed_count(self) -> int:
return self.files_processed_count
def get_detailed_status(self) -> Dict[str, Any]:
return {
"status": self.status,
"last_check": self.get_last_check_time(),
"files_processed": self.files_processed_count,
"processed_files_count": len(self.processed_files),
"check_interval_hours": self.check_interval / 3600,
"ftp_host": self.ftp_host,
"base_path": self.base_path,
}

monolith/modules/data_ingestion/http_poller.py (new file)

@@ -0,0 +1,353 @@
"""
HTTP Poller Service
Polls IoT devices via HTTP and ingests sensor data
"""
import asyncio
import logging
import time
from datetime import datetime
from typing import Dict, Optional, Any, List
import aiohttp
from motor.motor_asyncio import AsyncIOMotorDatabase
from .iot_config import IoTConfiguration, IoTDevice, SensorConfig, get_iot_config
from .models import DeviceStatus, PollingStatus, PollingMetrics
from modules.sensors.models import SensorReading, SensorType
from core.events import event_bus, EventTopics
logger = logging.getLogger(__name__)
class HttpPoller:
"""HTTP-based IoT device poller"""
def __init__(
self,
sensors_db: AsyncIOMotorDatabase,
config: Optional[IoTConfiguration] = None,
poll_interval: int = 60,
timeout: int = 10,
max_concurrent: int = 5
):
"""
Initialize HTTP poller
Args:
sensors_db: Motor database for sensor data storage
config: IoT configuration (loads from file if None)
poll_interval: Seconds between polls
timeout: HTTP request timeout in seconds
max_concurrent: Maximum concurrent HTTP requests
"""
self.sensors_db = sensors_db
self.config = config or get_iot_config()
self.poll_interval = poll_interval
self.timeout = timeout
self.max_concurrent = max_concurrent
# Metrics tracking
self.device_status: Dict[str, DeviceStatus] = {}
self.total_polls = 0
self.successful_polls = 0
self.failed_polls = 0
self.poll_times: List[float] = []
# Control flags
self.running = False
self._semaphore = asyncio.Semaphore(max_concurrent)
# Initialize device status
self._initialize_device_status()
def _initialize_device_status(self):
"""Initialize status tracking for all devices"""
for device in self.config.get_all_devices():
self.device_status[device.name] = DeviceStatus(
device_name=device.name,
device_type=device.type,
uri=device.uri,
status=PollingStatus.INACTIVE,
sensors_count=len(device.sensors)
)
async def poll_device(self, device: IoTDevice) -> bool:
"""
Poll a single device and ingest its data
Args:
device: IoT device configuration
Returns:
True if successful, False otherwise
"""
async with self._semaphore:
status = self.device_status[device.name]
start_time = time.time()
try:
# Update status
status.last_poll = datetime.utcnow()
status.total_polls += 1
self.total_polls += 1
# Make HTTP request
async with aiohttp.ClientSession() as session:
async with session.get(
device.uri,
timeout=aiohttp.ClientTimeout(total=self.timeout)
) as response:
if response.status != 200:
raise Exception(f"HTTP {response.status}: {await response.text()}")
data = await response.json()
# Extract and store sensor readings
timestamp = int(datetime.utcnow().timestamp())
readings_stored = 0
for sensor in device.sensors:
try:
value = self._extract_value(data, sensor.tag)
if value is not None:
await self._store_reading(
device=device,
sensor=sensor,
value=value,
timestamp=timestamp
)
readings_stored += 1
except Exception as e:
logger.warning(
f"Failed to extract sensor {sensor.type} from {device.name}: {e}"
)
# Update success metrics
poll_time = (time.time() - start_time) * 1000
self.poll_times.append(poll_time)
if len(self.poll_times) > 100:
self.poll_times.pop(0)
status.successful_polls += 1
status.last_success = datetime.utcnow()
status.status = PollingStatus.ACTIVE
status.last_error = None
self.successful_polls += 1
logger.debug(
f"Polled {device.name}: {readings_stored} readings in {poll_time:.1f}ms"
)
return True
except asyncio.TimeoutError:
error_msg = f"Timeout after {self.timeout}s"
status.failed_polls += 1
status.status = PollingStatus.ERROR
status.last_error = error_msg
self.failed_polls += 1
logger.error(f"Timeout polling {device.name}")
return False
except Exception as e:
error_msg = str(e)
status.failed_polls += 1
status.status = PollingStatus.ERROR
status.last_error = error_msg
self.failed_polls += 1
logger.error(f"Error polling {device.name}: {e}")
return False
def _extract_value(self, data: Dict[str, Any], tag_path: List[str]) -> Optional[Any]:
"""
Extract value from nested JSON using tag path
Args:
data: JSON response data
tag_path: List of keys to traverse
Returns:
Extracted value or None
"""
current = data
for key in tag_path:
if isinstance(current, dict):
current = current.get(key)
if current is None:
return None
else:
return None
return current
async def _store_reading(
self,
device: IoTDevice,
sensor: SensorConfig,
value: Any,
timestamp: int
):
"""
Store sensor reading in database and publish event
Args:
device: Device configuration
sensor: Sensor configuration
value: Sensor value
timestamp: Unix timestamp
"""
# Map sensor type to SensorType enum (with fallback to ENERGY)
sensor_type_map = {
"power": SensorType.ENERGY,
"voltage": SensorType.ENERGY,
"current": SensorType.ENERGY,
"energy": SensorType.ENERGY,
"temperature": SensorType.TEMPERATURE,
"humidity": SensorType.HUMIDITY,
"co2": SensorType.CO2,
"generation": SensorType.ENERGY,
"charging_rate": SensorType.ENERGY,
"doorOpen": SensorType.OCCUPANCY,
"state": SensorType.OCCUPANCY,
}
sensor_type = sensor_type_map.get(sensor.type, SensorType.ENERGY)
# Create sensor ID from device name and sensor type
sensor_id = f"{device.name}_{sensor.type}".replace(" ", "_")
# Convert value based on data type
if sensor.data == "BOOLEAN":
numeric_value = 1.0 if value else 0.0
else:
numeric_value = float(value)
# Create sensor reading
reading_data = {
"sensor_id": sensor_id,
"sensor_type": sensor_type.value,
"timestamp": timestamp,
"room": device.type, # Use device type as room (battery, refrigerator, etc.)
"metadata": {
"device_name": device.name,
"device_type": device.type,
"sensor_type": sensor.type,
"data_type": sensor.data,
"source": "http_poller"
}
}
# Add specific sensor data fields
if sensor.type == "power":
reading_data["power"] = numeric_value
elif sensor.type == "voltage":
reading_data["voltage"] = numeric_value
elif sensor.type == "current":
reading_data["current"] = numeric_value
elif sensor.type == "temperature":
reading_data["temperature"] = numeric_value
elif sensor.type == "humidity":
reading_data["humidity"] = numeric_value
elif sensor.type == "energy":
reading_data["energy"] = numeric_value
elif sensor.type == "generation":
reading_data["generation"] = numeric_value
else:
reading_data["energy"] = numeric_value # Default field
# Store in database
await self.sensors_db.sensor_readings.insert_one(reading_data)
# Publish to event bus
await event_bus.publish(EventTopics.ENERGY_DATA, {
"sensor_id": sensor_id,
"value": numeric_value,
"timestamp": timestamp,
"device_name": device.name,
"sensor_type": sensor.type
})
async def poll_all_devices(self):
"""Poll all configured devices concurrently"""
devices = self.config.get_all_devices()
if not devices:
logger.warning("No devices configured for polling")
return
logger.info(f"Polling {len(devices)} devices...")
# Poll all devices concurrently
tasks = [self.poll_device(device) for device in devices]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Count successes
successes = sum(1 for r in results if r is True)
logger.info(f"Polling complete: {successes}/{len(devices)} successful")
async def run(self):
"""Run continuous polling loop"""
self.running = True
logger.info(
f"Starting HTTP poller: {len(self.config.get_all_devices())} devices, "
f"interval={self.poll_interval}s"
)
while self.running:
try:
await self.poll_all_devices()
await asyncio.sleep(self.poll_interval)
except Exception as e:
logger.error(f"Error in polling loop: {e}")
await asyncio.sleep(10) # Brief pause on error
def stop(self):
"""Stop the polling loop"""
logger.info("Stopping HTTP poller...")
self.running = False
def get_metrics(self) -> PollingMetrics:
"""Get current polling metrics"""
active_count = sum(
1 for s in self.device_status.values()
if s.status == PollingStatus.ACTIVE
)
inactive_count = sum(
1 for s in self.device_status.values()
if s.status == PollingStatus.INACTIVE
)
error_count = sum(
1 for s in self.device_status.values()
if s.status == PollingStatus.ERROR
)
# Count devices by type
devices_by_type: Dict[str, int] = {}
for device in self.config.get_all_devices():
devices_by_type[device.type] = devices_by_type.get(device.type, 0) + 1
# Calculate success rate
success_rate = 0.0
if self.total_polls > 0:
success_rate = (self.successful_polls / self.total_polls) * 100
# Calculate average poll time
avg_poll_time = 0.0
if self.poll_times:
avg_poll_time = sum(self.poll_times) / len(self.poll_times)
return PollingMetrics(
timestamp=datetime.utcnow(),
total_devices=len(self.device_status),
active_devices=active_count,
inactive_devices=inactive_count,
error_devices=error_count,
total_polls=self.total_polls,
successful_polls=self.successful_polls,
failed_polls=self.failed_polls,
success_rate=success_rate,
average_poll_time_ms=avg_poll_time,
devices_by_type=devices_by_type
)
def get_device_statuses(self) -> List[DeviceStatus]:
"""Get status for all devices"""
return list(self.device_status.values())
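The metrics surface is synchronous, so a supervisor can sample it while `run()` executes as a task. A sketch under the same stand-in assumptions as the wiring example above:

```python
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient
from modules.data_ingestion import HttpPoller

async def main():
    client = AsyncIOMotorClient("mongodb://localhost:27017")  # stand-in URI
    poller = HttpPoller(sensors_db=client["energy_dashboard"])
    task = asyncio.create_task(poller.run())

    await asyncio.sleep(120)  # let roughly two 60 s polling rounds finish
    m = poller.get_metrics()
    print(f"{m.successful_polls}/{m.total_polls} polls ok, "
          f"avg {m.average_poll_time_ms:.0f} ms, {m.error_devices} devices erroring")

    poller.stop()  # clears the running flag; run() exits after its current sleep
    await task

asyncio.run(main())
```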

monolith/modules/data_ingestion/iot_config.py (new file)

@@ -0,0 +1,187 @@
"""
IoT Configuration Loader
Loads and validates IoT device configuration from iots-right.json
"""
import json
import logging
from pathlib import Path
from typing import Dict, List, Optional, Any
from pydantic import BaseModel, Field, validator
logger = logging.getLogger(__name__)
class SensorConfig(BaseModel):
"""Individual sensor configuration within a device"""
type: str = Field(..., description="Sensor type (power, voltage, temperature, etc.)")
tag: List[str] = Field(..., description="JSON path tags to extract value")
data: str = Field(..., description="Data type: DOUBLE, BOOLEAN, etc.")
class IoTDevice(BaseModel):
"""IoT device configuration"""
name: str = Field(..., description="Device name")
type: str = Field(..., description="Device type (battery, refrigerator, hvac, etc.)")
uri: str = Field(..., description="HTTP endpoint URI")
sensors: List[SensorConfig] = Field(..., description="List of sensors")
@validator('uri')
def validate_uri(cls, v):
"""Ensure URI is valid"""
if not v.startswith(('http://', 'https://')):
raise ValueError(f"Invalid URI: {v}")
return v
class IoTConfiguration(BaseModel):
"""Complete IoT configuration"""
iots: Dict[str, List[IoTDevice]] = Field(..., description="IoT devices grouped by category")
def get_all_devices(self) -> List[IoTDevice]:
"""Get flat list of all devices"""
devices = []
for device_list in self.iots.values():
devices.extend(device_list)
return devices
def get_devices_by_type(self, device_type: str) -> List[IoTDevice]:
"""Get devices by category type"""
return self.iots.get(device_type, [])
def get_device_by_name(self, name: str) -> Optional[IoTDevice]:
"""Find device by name"""
for device in self.get_all_devices():
if device.name == name:
return device
return None
class IoTConfigLoader:
"""Loads and manages IoT configuration"""
def __init__(self, config_path: Optional[Path] = None):
"""
Initialize config loader
Args:
config_path: Path to iots-right.json. If None, looks in monolith root
"""
self.config_path = config_path or self._find_config_file()
self.config: Optional[IoTConfiguration] = None
def _find_config_file(self) -> Path:
"""Find iots-right.json in monolith directory"""
# Start from current file location and go up to find monolith root
current = Path(__file__).parent
while current.name != 'monolith' and current.parent != current:
current = current.parent
config_file = current / 'iots-right.json'
if not config_file.exists():
logger.warning(f"Config file not found at {config_file}")
return config_file
def load(self) -> IoTConfiguration:
"""
Load configuration from file
Returns:
IoTConfiguration object
Raises:
FileNotFoundError: If config file doesn't exist
ValueError: If config is invalid
"""
if not self.config_path.exists():
raise FileNotFoundError(f"IoT config not found: {self.config_path}")
try:
            with open(self.config_path, 'r', encoding='utf-8') as f:
data = json.load(f)
self.config = IoTConfiguration(**data)
device_count = len(self.config.get_all_devices())
logger.info(f"Loaded IoT config: {device_count} devices from {self.config_path}")
# Log summary by type
for device_type, devices in self.config.iots.items():
logger.info(f" - {device_type}: {len(devices)} devices")
return self.config
except json.JSONDecodeError as e:
logger.error(f"Invalid JSON in config file: {e}")
raise ValueError(f"Invalid JSON in {self.config_path}: {e}")
except Exception as e:
logger.error(f"Failed to load config: {e}")
raise
def reload(self) -> IoTConfiguration:
"""Reload configuration from file"""
return self.load()
def get_config(self) -> IoTConfiguration:
"""
Get current configuration, loading if necessary
Returns:
IoTConfiguration object
"""
        if self.config is None:
            return self.load()
        return self.config
def get_device_summary(self) -> Dict[str, Any]:
"""Get summary statistics of devices"""
config = self.get_config()
total_devices = 0
total_sensors = 0
summary = {}
for device_type, devices in config.iots.items():
device_count = len(devices)
sensor_count = sum(len(d.sensors) for d in devices)
total_devices += device_count
total_sensors += sensor_count
summary[device_type] = {
"device_count": device_count,
"sensor_count": sensor_count,
"devices": [
{
"name": d.name,
"uri": d.uri,
"sensor_count": len(d.sensors)
}
for d in devices
]
}
return {
"total_devices": total_devices,
"total_sensors": total_sensors,
"by_type": summary,
"config_file": str(self.config_path)
}
# Global config loader instance
_config_loader: Optional[IoTConfigLoader] = None
def get_config_loader() -> IoTConfigLoader:
"""Get global config loader instance"""
global _config_loader
if _config_loader is None:
_config_loader = IoTConfigLoader()
return _config_loader
def get_iot_config() -> IoTConfiguration:
"""Get IoT configuration"""
return get_config_loader().get_config()
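# Minimal smoke test (a sketch using only the helpers defined above): load the
# configuration and print one line per device.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    loaded = get_config_loader().load()
    for device in loaded.get_all_devices():
        print(f"{device.name} ({device.type}): {len(device.sensors)} sensors at {device.uri}")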

View File

@@ -0,0 +1,112 @@
"""
Pydantic models for Data Ingestion module
"""
from datetime import datetime
from typing import Optional, Dict, Any, List
from pydantic import BaseModel, Field
from enum import Enum
class PollingStatus(str, Enum):
"""Device polling status"""
ACTIVE = "active"
INACTIVE = "inactive"
ERROR = "error"
DISABLED = "disabled"
class DataSourceType(str, Enum):
"""Data source type"""
HTTP = "http"
MQTT = "mqtt"
class DeviceStatus(BaseModel):
"""Device polling status information"""
device_name: str
device_type: str
uri: str
status: PollingStatus
last_poll: Optional[datetime] = None
last_success: Optional[datetime] = None
total_polls: int = 0
successful_polls: int = 0
failed_polls: int = 0
last_error: Optional[str] = None
sensors_count: int = 0
class Config:
json_encoders = {
datetime: lambda v: v.isoformat() if v else None
}
class DataSourceSummary(BaseModel):
"""Summary of data sources"""
source_type: DataSourceType
enabled: bool
total_devices: int
active_devices: int
total_sensors: int
devices: List[DeviceStatus]
class PollingMetrics(BaseModel):
"""Polling performance metrics"""
timestamp: datetime
total_devices: int
active_devices: int
inactive_devices: int
error_devices: int
total_polls: int
successful_polls: int
failed_polls: int
success_rate: float
average_poll_time_ms: float
devices_by_type: Dict[str, int]
class Config:
json_encoders = {
datetime: lambda v: v.isoformat()
}
class HttpPollerConfig(BaseModel):
"""HTTP poller configuration"""
enabled: bool = True
poll_interval_seconds: int = Field(60, ge=5, le=3600)
timeout_seconds: int = Field(10, ge=1, le=60)
max_retries: int = Field(3, ge=0, le=10)
concurrent_requests: int = Field(5, ge=1, le=50)
class MqttSubscriberConfig(BaseModel):
"""MQTT subscriber configuration (future)"""
enabled: bool = False
broker_host: str = "localhost"
broker_port: int = 1883
username: Optional[str] = None
password: Optional[str] = None
topics: List[str] = []
class DataIngestionConfig(BaseModel):
"""Complete data ingestion configuration"""
http: HttpPollerConfig = HttpPollerConfig()
mqtt: MqttSubscriberConfig = MqttSubscriberConfig()
class HealthResponse(BaseModel):
"""Health check response"""
service: str
status: str
timestamp: datetime
version: str
http_poller: Optional[Dict[str, Any]] = None
mqtt_subscriber: Optional[Dict[str, Any]] = None
class Config:
json_encoders = {
datetime: lambda v: v.isoformat()
}
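# Quick demonstration (a sketch): the Field bounds declared above are enforced
# at construction time, so out-of-range settings fail fast.
if __name__ == "__main__":
    from pydantic import ValidationError
    cfg = DataIngestionConfig()
    print(cfg.http.poll_interval_seconds)  # -> 60 (default)
    try:
        HttpPollerConfig(poll_interval_seconds=2)  # violates ge=5
    except ValidationError as exc:
        print(f"Rejected out-of-range interval: {exc.errors()[0]['loc']}")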

View File

@@ -0,0 +1,194 @@
"""Data Ingestion module API routes."""
import asyncio
import logging
from datetime import datetime
from fastapi import APIRouter, HTTPException, Depends
from typing import Optional
from .models import (
HealthResponse, DataSourceSummary, PollingMetrics,
DeviceStatus, DataSourceType
)
from .iot_config import get_config_loader
from .http_poller import HttpPoller
from core.dependencies import get_sensors_db
logger = logging.getLogger(__name__)
# Create router
router = APIRouter()
# Global HTTP poller instance (will be set by main.py)
_http_poller: Optional[HttpPoller] = None
def set_http_poller(poller: HttpPoller):
"""Set the global HTTP poller instance"""
global _http_poller
_http_poller = poller
def get_http_poller() -> HttpPoller:
"""Get the HTTP poller instance"""
if _http_poller is None:
raise HTTPException(status_code=503, detail="HTTP poller not initialized")
return _http_poller
# Health check
@router.get("/health", response_model=HealthResponse)
async def health_check(db=Depends(get_sensors_db)):
"""Health check endpoint for data ingestion module"""
try:
await db.command("ping")
http_status = None
if _http_poller:
metrics = _http_poller.get_metrics()
http_status = {
"enabled": True,
"running": _http_poller.running,
"total_devices": metrics.total_devices,
"active_devices": metrics.active_devices,
"success_rate": metrics.success_rate
}
return HealthResponse(
service="data-ingestion-module",
status="healthy",
timestamp=datetime.utcnow(),
version="1.0.0",
http_poller=http_status,
mqtt_subscriber={"enabled": False}
)
except Exception as e:
logger.error(f"Health check failed: {e}")
raise HTTPException(status_code=503, detail="Service Unavailable")
@router.get("/sources", response_model=DataSourceSummary)
async def get_data_sources():
"""
Get all data ingestion sources
Implements OpenAPI endpoint: GET /api/v1/sources
"""
try:
poller = get_http_poller()
device_statuses = poller.get_device_statuses()
active_count = sum(1 for d in device_statuses if d.status.value == "active")
return DataSourceSummary(
source_type=DataSourceType.HTTP,
enabled=True,
total_devices=len(device_statuses),
active_devices=active_count,
total_sensors=sum(d.sensors_count for d in device_statuses),
devices=device_statuses
)
except Exception as e:
logger.error(f"Error getting data sources: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/sources/summary")
async def get_sources_summary():
"""Get summary of configured data sources"""
try:
config_loader = get_config_loader()
summary = config_loader.get_device_summary()
return summary
except Exception as e:
logger.error(f"Error getting sources summary: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/sources/devices")
async def get_device_list():
"""Get list of all configured devices"""
try:
poller = get_http_poller()
devices = poller.get_device_statuses()
return {
"devices": devices,
"count": len(devices),
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Error getting device list: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/sources/devices/{device_name}", response_model=DeviceStatus)
async def get_device_status(device_name: str):
"""Get status of a specific device"""
try:
poller = get_http_poller()
status = poller.device_status.get(device_name)
if not status:
raise HTTPException(status_code=404, detail=f"Device '{device_name}' not found")
return status
except HTTPException:
raise
except Exception as e:
logger.error(f"Error getting device status: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.get("/metrics", response_model=PollingMetrics)
async def get_polling_metrics():
"""Get HTTP polling performance metrics"""
try:
poller = get_http_poller()
return poller.get_metrics()
except Exception as e:
logger.error(f"Error getting metrics: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.post("/poll/trigger")
async def trigger_manual_poll():
"""Manually trigger a polling cycle for all devices"""
try:
poller = get_http_poller()
        # Trigger the poll in the background without blocking the request
        asyncio.create_task(poller.poll_all_devices())
return {
"message": "Manual poll triggered",
"timestamp": datetime.utcnow().isoformat(),
"devices": len(poller.device_status)
}
except Exception as e:
logger.error(f"Error triggering poll: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@router.post("/config/reload")
async def reload_configuration():
"""Reload IoT configuration from iots-right.json"""
try:
config_loader = get_config_loader()
config = config_loader.reload()
return {
"message": "Configuration reloaded successfully",
"total_devices": len(config.get_all_devices()),
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Error reloading config: {e}")
raise HTTPException(status_code=500, detail=str(e))
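# Client-side sketch (illustrative): exercising the routes above with the
# standard library. The base URL assumes the router is mounted under /api/v1
# on localhost:8000; neither the mount point nor the port is defined here.
if __name__ == "__main__":
    import json
    import urllib.request
    base = "http://localhost:8000/api/v1"  # assumed mount point and port
    for path in ("/health", "/sources/summary", "/metrics"):
        with urllib.request.urlopen(base + path) as resp:
            print(path, "->", json.dumps(json.load(resp))[:120])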

View File

@@ -1,171 +0,0 @@
#!/usr/bin/env python3
"""
SA4CPS SLG_V2 File Processor
Simple parser for .slg_v2 energy data files
"""
import logging
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional
import re
logger = logging.getLogger(__name__)
class SLGProcessor:
"""Processes SA4CPS .slg_v2 files into structured energy data records"""
def __init__(self):
self.processed_files = 0
self.total_records = 0
async def process_file(self, file_path: str, filename: str) -> List[Dict[str, Any]]:
"""Process a .slg_v2 file and return energy data records"""
logger.info(f"Processing SLG file: {filename}")
try:
with open(file_path, 'r', encoding='utf-8') as file:
lines = file.readlines()
records = []
file_metadata = self._parse_metadata(lines[:5]) # Parse first 5 lines for metadata
# Process data lines (lines starting with '20' are data records)
for line_num, line in enumerate(lines, 1):
line = line.strip()
if line.startswith('20'): # Data record lines start with '20' (year)
record = self._parse_data_line(line, file_metadata, filename)
if record:
records.append(record)
self.processed_files += 1
self.total_records += len(records)
logger.info(f"Processed {len(records)} records from {filename}")
return records
except Exception as e:
logger.error(f"Error processing {filename}: {e}")
return []
def _parse_metadata(self, header_lines: List[str]) -> Dict[str, Any]:
"""Parse metadata from SLG file header"""
metadata = {
"meter_id": None,
"measurement_type": None,
"unit": None,
"interval": None,
"period_start": None,
"period_end": None
}
try:
for line in header_lines:
line = line.strip()
if line.startswith('00'): # Header line with meter info
parts = line.split('\t')
if len(parts) >= 12:
metadata["meter_id"] = parts[3] # Meter ID
metadata["period_start"] = self._parse_date(parts[6])
metadata["period_end"] = self._parse_date(parts[7])
elif line.startswith('01'): # Measurement configuration
parts = line.split('\t')
if len(parts) >= 10:
metadata["measurement_type"] = parts[4] # POTENCIA
metadata["unit"] = parts[5] # K (kW)
metadata["interval"] = parts[6] # 15M
except Exception as e:
logger.warning(f"Error parsing metadata: {e}")
return metadata
def _parse_data_line(self, line: str, metadata: Dict[str, Any], filename: str) -> Optional[Dict[str, Any]]:
"""Parse a data line into an energy record"""
try:
parts = line.split('\t')
if len(parts) < 4:
return None
# Parse timestamp (format: 20250201 0015)
date_part = parts[1] # 20250201
time_part = parts[2] # 0015
# Convert to datetime
timestamp = self._parse_timestamp(date_part, time_part)
if not timestamp:
return None
            # Parse energy value: the field carries three decimal places, so
            # stripping the separator and dividing by 1000 restores the value
            # (e.g. "1.234" -> "1234" -> 1.234)
            value_str = parts[3].replace('.', '')
            try:
                value = float(value_str) / 1000.0
except ValueError:
value = 0.0
# Create record
record = {
"timestamp": timestamp,
"meter_id": metadata.get("meter_id", "unknown"),
"measurement_type": metadata.get("measurement_type", "energy"),
"value": value,
"unit": metadata.get("unit", "kW"),
"interval": metadata.get("interval", "15M"),
"filename": filename,
"quality": int(parts[4]) if len(parts) > 4 else 0
}
return record
except Exception as e:
logger.warning(f"Error parsing data line '{line}': {e}")
return None
def _parse_date(self, date_str: str) -> Optional[datetime]:
"""Parse date string (YYYYMMDD format)"""
try:
if len(date_str) == 8 and date_str.isdigit():
year = int(date_str[:4])
month = int(date_str[4:6])
day = int(date_str[6:8])
return datetime(year, month, day)
except ValueError:
pass
return None
def _parse_timestamp(self, date_str: str, time_str: str) -> Optional[datetime]:
"""Parse timestamp from date and time strings"""
try:
# Parse date (YYYYMMDD)
if len(date_str) != 8 or not date_str.isdigit():
return None
year = int(date_str[:4])
month = int(date_str[4:6])
day = int(date_str[6:8])
# Parse time (HHMM)
if len(time_str) != 4 or not time_str.isdigit():
return None
            hour = int(time_str[:2])
            minute = int(time_str[2:4])
            # "2400" marks the end of the day; roll over to 00:00 the next day
            if hour == 24:
                return datetime(year, month, day) + timedelta(days=1)
            return datetime(year, month, day, hour, minute)
except ValueError as e:
logger.warning(f"Error parsing timestamp '{date_str} {time_str}': {e}")
return None
def get_stats(self) -> Dict[str, int]:
"""Get processing statistics"""
return {
"files_processed": self.processed_files,
"total_records": self.total_records
}

View File

@@ -1,8 +1,8 @@
"""Demand Response module - handles grid interaction and load management."""
from .models import (
-    DemandResponseInvitation,
-    InvitationResponse,
+    DRInvitation,
+    DRInvitationResponse,
    EventRequest,
    EventStatus,
    LoadReductionRequest,
@@ -11,8 +11,8 @@ from .models import (
from .demand_response_service import DemandResponseService
__all__ = [
"DemandResponseInvitation",
"InvitationResponse",
"DRInvitation",
"DRInvitationResponse",
"EventRequest",
"EventStatus",
"LoadReductionRequest",

View File

@@ -14,7 +14,7 @@ from .room_service import RoomService
from .analytics_service import AnalyticsService
from .websocket_manager import WebSocketManager
-from src.core.dependencies import get_sensors_db, get_redis
+from core.dependencies import get_sensors_db, get_redis
logger = logging.getLogger(__name__)

View File

@@ -8,7 +8,7 @@ from typing import List, Set, Dict, Any
from fastapi import WebSocket, WebSocketDisconnect
import logging
-from models import SensorReading
+from .models import SensorReading
logger = logging.getLogger(__name__)