# Energy Dashboard - Modular Monolith
This is the modular-monolith version of the Energy Dashboard, refactored from the original microservices architecture.
## Architecture Overview
The application is structured as a modular monolith, combining the benefits of:
- Monolithic deployment: Single application, simpler operations
- Modular design: Clear module boundaries, maintainability
### Key Architectural Decisions
- Single Application: All modules run in one process
- Module Isolation: Each module has its own directory and clear interfaces
- Separate Databases: Each module maintains its own database for data isolation
- In-Process Event Bus: Replaces Redis pub/sub for inter-module communication
- Direct Dependency Injection: Modules communicate directly via function calls
- Shared Core: Common infrastructure (database, events, config) shared across modules
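The last two decisions can be sketched with plain constructor injection: one module receives another module's service as an in-process reference instead of calling it over the network. The class and method names below are illustrative only, not the project's actual API.

```python
# Minimal sketch of direct dependency injection between modules.
# SensorReader and LoadManager are illustrative names, not the project's API.

class SensorReader:
    """Stands in for the sensors module's service layer."""
    def __init__(self, readings: dict[str, float]):
        self._readings = readings

    def current_load(self, sensor_id: str) -> float:
        return self._readings.get(sensor_id, 0.0)

class LoadManager:
    """Stands in for the demand-response module; receives its dependency directly."""
    def __init__(self, sensors: SensorReader):
        self._sensors = sensors  # in-process reference, no HTTP/RPC hop

    def should_shed_load(self, sensor_id: str, threshold: float) -> bool:
        return self._sensors.current_load(sensor_id) > threshold

# Composition root: wiring happens once, at application startup.
sensors = SensorReader({"sensor_1": 3.5})
manager = LoadManager(sensors)
print(manager.should_shed_load("sensor_1", threshold=2.0))  # True
```

Because both objects live in the same process, a refactor of the dependency is a compile-time concern rather than an API-versioning one.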
## Project Structure

```
monolith/
├── src/
│   ├── main.py                  # Main FastAPI application
│   ├── core/                    # Shared core infrastructure
│   │   ├── config.py            # Centralized configuration
│   │   ├── database.py          # Database connection manager
│   │   ├── events.py            # In-process event bus
│   │   ├── redis.py             # Optional Redis cache
│   │   ├── dependencies.py      # FastAPI dependencies
│   │   └── logging_config.py    # Logging setup
│   ├── modules/                 # Business modules
│   │   ├── sensors/             # Sensor management module
│   │   │   ├── __init__.py
│   │   │   ├── router.py        # API routes
│   │   │   ├── models.py        # Data models
│   │   │   ├── sensor_service.py    # Business logic
│   │   │   ├── room_service.py
│   │   │   ├── analytics_service.py
│   │   │   └── websocket_manager.py
│   │   ├── demand_response/     # Demand response module
│   │   │   ├── __init__.py
│   │   │   ├── models.py
│   │   │   └── demand_response_service.py
│   │   └── data_ingestion/      # Data ingestion module
│   │       ├── __init__.py
│   │       ├── config.py
│   │       ├── ftp_monitor.py
│   │       ├── slg_processor.py
│   │       └── database.py
│   └── api/                     # API layer (if needed)
├── config/                      # Configuration files
├── tests/                       # Test files
├── requirements.txt             # Python dependencies
├── Dockerfile                   # Docker build file
├── docker-compose.yml           # Docker Compose configuration
└── README.md                    # This file
```
## Modules
### 1. Sensors Module (`src/modules/sensors`)
Responsibility: Sensor management, room management, real-time data, and analytics
Key Features:
- Sensor CRUD operations
- Room management
- Real-time data ingestion
- Analytics and reporting
- WebSocket support for live data streaming
Database: energy_dashboard_sensors
API Endpoints: /api/v1/sensors/*, /api/v1/rooms/*, /api/v1/analytics/*
### 2. Demand Response Module (`src/modules/demand_response`)
Responsibility: Grid interaction, demand response events, and load management
Key Features:
- Demand response event management
- Device flexibility calculation
- Auto-response configuration
- Load reduction requests
Database: energy_dashboard_demand_response
API Endpoints: /api/v1/demand-response/*
### 3. Data Ingestion Module (`src/modules/data_ingestion`)
Responsibility: FTP monitoring and SA4CPS data processing
Key Features:
- FTP file monitoring
- .sgl_v2 file processing
- Dynamic collection management
- Duplicate detection
Database: digitalmente_ingestion
API Endpoints: /api/v1/ingestion/*
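Duplicate detection for ingested files is commonly done with a content hash. The sketch below is a hedged illustration of that idea; the function and variable names are hypothetical, not the module's actual implementation.

```python
# Sketch of content-hash duplicate detection for ingested files.
# seen_hashes stands in for a persisted collection of processed-file digests.
import hashlib

seen_hashes: set[str] = set()

def is_duplicate(payload: bytes) -> bool:
    """Return True if this exact file content was already ingested."""
    digest = hashlib.sha256(payload).hexdigest()
    if digest in seen_hashes:
        return True
    seen_hashes.add(digest)
    return False

print(is_duplicate(b"2024-01-01;sensor_1;3.5"))  # False (first sight)
print(is_duplicate(b"2024-01-01;sensor_1;3.5"))  # True  (already seen)
```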
## Core Components
### Event Bus (`src/core/events.py`)
Replaces Redis pub/sub with an in-process event bus for inter-module communication.
Standard Event Topics:
- `energy_data`: Energy consumption updates
- `dr_events`: Demand response events
- `sensor_events`: Sensor-related events
- `system_events`: System-level events
- `data_ingestion`: Data ingestion events
Usage Example:

```python
from core.events import event_bus, EventTopics

# Publish an event
await event_bus.publish(EventTopics.ENERGY_DATA, {"sensor_id": "sensor_1", "value": 3.5})

# Subscribe to events
def handle_energy_data(data):
    print(f"Received energy data: {data}")

event_bus.subscribe(EventTopics.ENERGY_DATA, handle_energy_data)
```
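An in-process pub/sub of this kind fits in a few lines. The following is a hedged sketch of the pattern, not the actual contents of `src/core/events.py`:

```python
# Minimal in-process pub/sub supporting both sync and async handlers.
# A sketch only; the real src/core/events.py may differ.
import asyncio
from collections import defaultdict

class EventBus:
    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic: str, handler) -> None:
        self._subscribers[topic].append(handler)

    async def publish(self, topic: str, data: dict) -> None:
        for handler in self._subscribers[topic]:
            result = handler(data)
            if asyncio.iscoroutine(result):  # await async handlers
                await result

received = []
bus = EventBus()
bus.subscribe("energy_data", lambda data: received.append(data))
asyncio.run(bus.publish("energy_data", {"sensor_id": "sensor_1", "value": 3.5}))
print(received)  # [{'sensor_id': 'sensor_1', 'value': 3.5}]
```

Unlike Redis pub/sub, delivery here is a plain function call, so a slow subscriber blocks the publisher; long-running handlers should be scheduled as tasks instead.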
### Database Manager (`src/core/database.py`)
Centralized database connection management with separate databases per module.
Available Databases:
- `main_db`: Main application database
- `sensors_db`: Sensors module database
- `demand_response_db`: Demand response module database
- `data_ingestion_db`: Data ingestion module database
Usage Example:

```python
from fastapi import Depends

from core.dependencies import get_sensors_db

async def my_endpoint(db=Depends(get_sensors_db)):
    result = await db.sensors.find_one({"sensor_id": "sensor_1"})
```
### Configuration (`src/core/config.py`)
Centralized configuration using Pydantic Settings.
Configuration Sources:
- Environment variables
- `.env` file (if present)
- Default values
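That precedence (environment variable first, then `.env`, then the default) can be illustrated with the standard library alone. The real module relies on Pydantic Settings, which also handles type coercion, so the helper below is purely illustrative:

```python
# Stdlib-only sketch of the env-var > .env-file > default precedence.
# The real config module uses Pydantic Settings; load_setting is hypothetical.
import os

def load_setting(name: str, dotenv: dict[str, str], default: str) -> str:
    if name in os.environ:      # 1. environment variables win
        return os.environ[name]
    if name in dotenv:          # 2. then values parsed from a .env file
        return dotenv[name]
    return default              # 3. finally the hard-coded default

dotenv = {"PORT": "9000"}       # pretend this was parsed from .env
os.environ["HOST"] = "127.0.0.1"
os.environ.pop("PORT", None)    # make sure the shell isn't overriding the demo
os.environ.pop("DEBUG", None)

print(load_setting("HOST", dotenv, "0.0.0.0"))  # 127.0.0.1 (environment)
print(load_setting("PORT", dotenv, "8000"))     # 9000 (.env file)
print(load_setting("DEBUG", dotenv, "false"))   # false (default)
```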
## Getting Started
### Prerequisites
- Python 3.11+
- MongoDB 7.0+ (deployed separately)
- Redis 7+ (optional, for caching - deployed separately)
- Docker and Docker Compose (for containerized deployment)
### Local Development

1. Install dependencies:

   ```bash
   cd monolith
   pip install -r requirements.txt
   ```

2. Configure environment:

   ```bash
   cp .env.example .env
   # Edit .env with your MongoDB and Redis connection strings
   ```

3. Ensure MongoDB and Redis are accessible:
   - MongoDB should be running and accessible at the URL specified in `MONGO_URL`
   - Redis (optional) should be accessible at the URL specified in `REDIS_URL`

4. Run the application:

   ```bash
   cd src
   uvicorn main:app --reload --host 0.0.0.0 --port 8000
   ```

5. Access the application:
   - API: http://localhost:8000
   - Health Check: http://localhost:8000/health
   - API Docs: http://localhost:8000/docs
### Docker Deployment

Note: MongoDB and Redis are deployed separately and must be accessible before starting the application.

1. Configure environment variables:

   ```bash
   cp .env.example .env
   # Edit .env with your MongoDB and Redis connection strings
   ```

2. Build and start the application:

   ```bash
   cd monolith
   docker-compose up --build -d
   ```

3. View logs:

   ```bash
   docker-compose logs -f monolith
   ```

4. Stop the application:

   ```bash
   docker-compose down
   ```
## API Endpoints

### Global Endpoints
- `GET /`: Root endpoint
- `GET /health`: Global health check
- `GET /api/v1/overview`: System overview
### Sensors Module
- `GET /api/v1/sensors/get`: Get sensors with filters
- `GET /api/v1/sensors/{sensor_id}`: Get sensor details
- `GET /api/v1/sensors/{sensor_id}/data`: Get sensor data
- `POST /api/v1/sensors`: Create sensor
- `PUT /api/v1/sensors/{sensor_id}`: Update sensor
- `DELETE /api/v1/sensors/{sensor_id}`: Delete sensor
- `GET /api/v1/rooms`: Get all rooms
- `GET /api/v1/rooms/names`: Get room names
- `POST /api/v1/rooms`: Create room
- `GET /api/v1/rooms/{room_name}`: Get room details
- `PUT /api/v1/rooms/{room_name}`: Update room
- `DELETE /api/v1/rooms/{room_name}`: Delete room
- `GET /api/v1/analytics/summary`: Analytics summary
- `GET /api/v1/analytics/energy`: Energy analytics
- `POST /api/v1/data/query`: Advanced data query
- `WS /api/v1/ws`: WebSocket for real-time data
### Demand Response Module
- Endpoints for demand response events, invitations, and device management
- (To be fully documented when router is added)
### Data Ingestion Module
- Endpoints for FTP monitoring status and manual triggers
- (To be fully documented when router is added)
## Inter-Module Communication
Modules communicate in two ways:
### 1. Direct Dependency Injection

For synchronous operations, modules import and call each other's services directly:

```python
from modules.sensors import SensorService
from core.dependencies import get_sensors_db

sensor_service = SensorService(db=await get_sensors_db(), redis=None)
sensors = await sensor_service.get_sensors()
```
### 2. Event-Driven Communication

For asynchronous operations, modules use the event bus:

```python
from core.events import event_bus, EventTopics

# Publisher
await event_bus.publish(EventTopics.ENERGY_DATA, {
    "sensor_id": "sensor_1",
    "value": 3.5,
    "timestamp": 1234567890
})

# Subscriber
async def handle_energy_update(data):
    print(f"Energy update: {data}")

event_bus.subscribe(EventTopics.ENERGY_DATA, handle_energy_update)
```
## Background Tasks

The application runs several background tasks:

- Room Metrics Aggregation (every 5 minutes): Aggregates sensor data into room-level metrics
- Data Cleanup (daily): Removes sensor data older than 90 days
- Event Scheduler (every 60 seconds): Checks and executes scheduled demand response events
- Auto Response (every 30 seconds): Processes automatic demand response opportunities
- FTP Monitoring (every 6 hours, configurable): Monitors the FTP server for new SA4CPS data files
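Tasks like these are typically just asyncio loops. Below is a minimal sketch with the interval shortened and the iteration count bounded so it terminates; `run_periodically` is illustrative, not the application's actual scheduler:

```python
# Sketch of a periodic background task driven by asyncio.
# run_periodically is illustrative; the app may structure its tasks differently.
import asyncio

async def run_periodically(interval: float, task, iterations: int) -> None:
    """Invoke `task` every `interval` seconds, bounded so the sketch terminates."""
    for _ in range(iterations):
        task()
        await asyncio.sleep(interval)

ticks = []
asyncio.run(run_periodically(0.01, lambda: ticks.append("aggregate"), iterations=3))
print(len(ticks))  # 3
```

In the real application such loops would run forever via `asyncio.create_task` at startup, so they never block request handling.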
## Configuration Options

Key environment variables:

### Database
- `MONGO_URL`: MongoDB connection string
- `REDIS_URL`: Redis connection string
- `REDIS_ENABLED`: Enable/disable Redis (`true`/`false`)

### Application
- `DEBUG`: Enable debug mode (`true`/`false`)
- `HOST`: Application host (default: `0.0.0.0`)
- `PORT`: Application port (default: `8000`)

### FTP
- `FTP_SA4CPS_HOST`: FTP server host
- `FTP_SA4CPS_PORT`: FTP server port
- `FTP_SA4CPS_USERNAME`: FTP username
- `FTP_SA4CPS_PASSWORD`: FTP password
- `FTP_SA4CPS_REMOTE_PATH`: Remote directory path
- `FTP_CHECK_INTERVAL`: Check interval in seconds
- `FTP_SKIP_INITIAL_SCAN`: Skip the initial FTP scan (`true`/`false`)
## Migration from Microservices

See MIGRATION.md for a detailed migration guide.
## Development Guidelines

### Adding a New Module

1. Create the module directory: `src/modules/new_module/`

2. Add module files:
   - `__init__.py`: Module exports
   - `models.py`: Pydantic models
   - `service.py`: Business logic
   - `router.py`: API routes

3. Register the module in the main application:

   ```python
   from modules.new_module.router import router as new_module_router

   app.include_router(new_module_router, prefix="/api/v1/new-module", tags=["new-module"])
   ```

### Adding an Event Topic

1. Add the topic to the `EventTopics` class in `src/core/events.py`:

   ```python
   class EventTopics:
       NEW_TOPIC = "new_topic"
   ```

2. Use it in your module:

   ```python
   from core.events import event_bus, EventTopics

   await event_bus.publish(EventTopics.NEW_TOPIC, data)
   ```
## Testing

```bash
# Run all tests
pytest

# Run with coverage
pytest --cov=src --cov-report=html

# Run tests for a specific module
pytest tests/modules/sensors/
```
## Monitoring and Logging

- Logs: Application logs to stdout
- Log Level: Controlled by the `DEBUG` environment variable
- Health Checks: Available at the `/health` endpoint
- Metrics: System overview at `/api/v1/overview`
## Performance Considerations
- Database Indexing: Ensure proper indexes on frequently queried fields
- Redis Caching: Enable Redis for improved performance (optional)
- Connection Pooling: Motor (MongoDB) and Redis clients handle connection pooling
- Async Operations: All I/O operations are asynchronous
- Background Tasks: Long-running operations don't block request handling
## Security

- CORS: Configured in the main application
- Environment Variables: Use a `.env` file; never commit secrets
- Database Authentication: MongoDB requires authentication
- Input Validation: Pydantic models validate all inputs
- Error Handling: Sensitive information is not exposed in error messages
## Troubleshooting

### Database Connection Issues

```bash
# Test the MongoDB connection (update with your connection string)
mongosh "mongodb://admin:password123@mongodb-host:27017/?authSource=admin"

# Check if MongoDB is reachable from the container
docker-compose exec monolith ping mongodb-host
```

### Redis Connection Issues

```bash
# Test the Redis connection (update with your connection string)
redis-cli -h redis-host ping

# Check if Redis is reachable from the container
docker-compose exec monolith ping redis-host
```

### Application Won't Start

```bash
# Check logs
docker-compose logs monolith

# Verify environment variables
docker-compose exec monolith env | grep MONGO
```
## License

[Your License Here]

## Contributing

[Your Contributing Guidelines Here]