diff --git a/microservices/api-gateway/main.py b/microservices/api-gateway/main.py index 5cfe021..d5edff0 100644 --- a/microservices/api-gateway/main.py +++ b/microservices/api-gateway/main.py @@ -68,45 +68,51 @@ auth_middleware = AuthMiddleware() SERVICES = { "token-service": ServiceConfig( name="token-service", - base_url="http://localhost:8001", + base_url=os.getenv("TOKEN_SERVICE_URL", "http://energy-token-service:8001"), health_endpoint="/health", auth_required=False ), "battery-service": ServiceConfig( name="battery-service", - base_url="http://localhost:8002", + base_url=os.getenv("BATTERY_SERVICE_URL", "http://energy-battery-service:8002"), health_endpoint="/health", auth_required=True ), "demand-response-service": ServiceConfig( name="demand-response-service", - base_url="http://localhost:8003", + base_url=os.getenv("DEMAND_RESPONSE_SERVICE_URL", "http://energy-demand-response-service:8003"), health_endpoint="/health", auth_required=True ), "p2p-trading-service": ServiceConfig( name="p2p-trading-service", - base_url="http://localhost:8004", + base_url=os.getenv("P2P_TRADING_SERVICE_URL", "http://energy-p2p-trading-service:8004"), health_endpoint="/health", auth_required=True ), "forecasting-service": ServiceConfig( name="forecasting-service", - base_url="http://localhost:8005", + base_url=os.getenv("FORECASTING_SERVICE_URL", "http://energy-forecasting-service:8005"), health_endpoint="/health", auth_required=True ), "iot-control-service": ServiceConfig( name="iot-control-service", - base_url="http://localhost:8006", + base_url=os.getenv("IOT_CONTROL_SERVICE_URL", "http://energy-iot-control-service:8006"), health_endpoint="/health", auth_required=True ), "sensor-service": ServiceConfig( name="sensor-service", - base_url="http://localhost:8007", + base_url=os.getenv("SENSOR_SERVICE_URL", "http://energy-sensor-service:8007"), health_endpoint="/health", auth_required=True + ), + "data-ingestion-service": ServiceConfig( + name="data-ingestion-service", + base_url=os.getenv("DATA_INGESTION_SERVICE_URL", "http://energy-data-ingestion-service:8008"), + health_endpoint="/health", + auth_required=False ) } @@ -216,6 +222,22 @@ async def sensor_service_proxy(request: Request, path: str): """Proxy requests to sensor service""" return await proxy_request(request, "sensor-service", f"/{path}") +# Data Ingestion Service Routes (SA4CPS FTP Monitoring) +@app.api_route("/api/v1/ingestion/{path:path}", methods=["GET", "POST", "PUT", "DELETE"]) +async def data_ingestion_service_proxy(request: Request, path: str): + """Proxy requests to data ingestion service""" + return await proxy_request(request, "data-ingestion-service", f"/{path}") + +@app.api_route("/api/v1/sources/{path:path}", methods=["GET", "POST", "PUT", "DELETE"]) +async def data_sources_proxy(request: Request, path: str): + """Proxy requests to data ingestion service for data sources""" + return await proxy_request(request, "data-ingestion-service", f"/sources/{path}") + +@app.get("/api/v1/sources") +async def data_sources_list_proxy(request: Request): + """Proxy requests to data ingestion service for sources list""" + return await proxy_request(request, "data-ingestion-service", "/sources") + @app.api_route("/api/v1/rooms/{path:path}", methods=["GET", "POST", "PUT", "DELETE"]) async def room_service_proxy(request: Request, path: str): """Proxy requests to sensor service for room management""" diff --git a/microservices/docker-compose.yml b/microservices/docker-compose.yml index ad65427..18a7628 100644 --- a/microservices/docker-compose.yml +++ b/microservices/docker-compose.yml @@ -52,6 +52,8 @@ services: - mongodb - redis - token-service + - sensor-service + - data-ingestion-service # - battery-service # - demand-response-service networks: @@ -67,7 +69,7 @@ services: ports: - "8001:8001" environment: - - MONGO_URL=mongodb://admin:password123@mongodb:27017/energy_dashboard_tokens?authSource=admin + - MONGO_URL=mongodb://admin:password123@localhost:27017/energy_dashboard_tokens?authSource=admin - JWT_SECRET_KEY=your-super-secret-jwt-key-change-in-production depends_on: - mongodb @@ -177,16 +179,15 @@ services: ports: - "8008:8008" environment: - - MONGO_URL=mongodb://admin:password123@mongodb:27017/energy_dashboard_ingestion?authSource=admin - - REDIS_URL=redis://redis:6379 + - MONGO_URL=mongodb://admin:password123@mongodb:27017/ - FTP_SA4CPS_HOST=ftp.sa4cps.pt - FTP_SA4CPS_PORT=21 - - FTP_SA4CPS_USERNAME=anonymous - - FTP_SA4CPS_PASSWORD= + - FTP_SA4CPS_USERNAME=curvascarga@sa4cps.pt + - FTP_SA4CPS_PASSWORD=n$WFtz9+bleN - FTP_SA4CPS_REMOTE_PATH=/ + - FTP_CHECK_INTERVAL=21600 depends_on: - mongodb - - redis networks: - energy-network diff --git a/microservices/sensor-service/main.py b/microservices/sensor-service/main.py index a0b8331..51c9eac 100644 --- a/microservices/sensor-service/main.py +++ b/microservices/sensor-service/main.py @@ -9,7 +9,7 @@ import asyncio from datetime import datetime, timedelta from fastapi import FastAPI, HTTPException, Depends, WebSocket, WebSocketDisconnect, Query, BackgroundTasks from fastapi.middleware.cors import CORSMiddleware -from contextual import asynccontextmanager +from contextlib import asynccontextmanager import logging from typing import List, Optional, Dict, Any import json @@ -20,7 +20,7 @@ from models import ( ) from database import connect_to_mongo, close_mongo_connection, get_database, connect_to_redis, get_redis from sensor_service import SensorService -from room_service import RoomService +from room_service import RoomService from analytics_service import AnalyticsService from websocket_manager import WebSocketManager @@ -37,16 +37,16 @@ async def lifespan(app: FastAPI): logger.info("Sensor Service starting up...") await connect_to_mongo() await connect_to_redis() - + # Start background tasks asyncio.create_task(redis_subscriber_task()) asyncio.create_task(room_metrics_aggregation_task()) asyncio.create_task(data_cleanup_task()) - + logger.info("Sensor Service startup complete") - + yield - + logger.info("Sensor Service shutting down...") await close_mongo_connection() logger.info("Sensor Service shutdown complete") @@ -88,10 +88,10 @@ async def health_check(): try: db = await get_database() await db.command("ping") - + redis = await get_redis() await redis.ping() - + return HealthResponse( service="sensor-service", status="healthy", @@ -147,7 +147,7 @@ async def get_sensor(sensor_id: str, service: SensorService = Depends(get_sensor sensor = await service.get_sensor_details(sensor_id) if not sensor: raise HTTPException(status_code=404, detail="Sensor not found") - + return sensor except HTTPException: raise @@ -173,7 +173,7 @@ async def get_sensor_data( limit=limit, offset=offset ) - + return DataResponse( data=data["readings"], total_count=data["total_count"], @@ -220,7 +220,7 @@ async def update_sensor( result = await service.update_sensor(sensor_id, update_data) if not result: raise HTTPException(status_code=404, detail="Sensor not found") - + return { "message": "Sensor updated successfully", "sensor_id": sensor_id, @@ -289,7 +289,7 @@ async def get_room(room_name: str, service: RoomService = Depends(get_room_servi room = await service.get_room_details(room_name) if not room: raise HTTPException(status_code=404, detail="Room not found") - + return room except HTTPException: raise @@ -313,7 +313,7 @@ async def get_room_data( end_time=end_time, limit=limit ) - + return { "room": room_name, "room_metrics": data.get("room_metrics", []), @@ -385,7 +385,7 @@ async def export_data( sensor_ids=sensor_ids, format=format ) - + return export_data except Exception as e: logger.error(f"Error exporting data: {e}") @@ -408,7 +408,7 @@ async def get_events( hours=hours, limit=limit ) - + return { "events": events, "count": len(events), @@ -429,11 +429,11 @@ async def ingest_sensor_data( try: # Process and store sensor data result = await service.ingest_sensor_data(sensor_data) - + # Schedule background tasks for analytics background_tasks.add_task(update_room_metrics, sensor_data) background_tasks.add_task(broadcast_sensor_data, sensor_data) - + return { "message": "Sensor data ingested successfully", "sensor_id": sensor_data.sensor_id, @@ -466,42 +466,42 @@ async def broadcast_sensor_data(sensor_data: SensorReading): async def redis_subscriber_task(): """Subscribe to Redis channels for real-time data""" logger.info("Starting Redis subscriber task") - + try: redis = await get_redis() pubsub = redis.pubsub() await pubsub.subscribe("energy_data", "sensor_events") - + while True: try: message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=1.0) if message: # Process incoming message and broadcast to WebSocket clients await websocket_manager.broadcast_raw_data(message['data']) - + except Exception as e: logger.error(f"Error processing Redis message: {e}") await asyncio.sleep(5) - + except Exception as e: logger.error(f"Redis subscriber task failed: {e}") async def room_metrics_aggregation_task(): """Periodically aggregate room-level metrics""" logger.info("Starting room metrics aggregation task") - + while True: try: db = await get_database() redis = await get_redis() room_service = RoomService(db, redis) - + # Aggregate metrics for all rooms await room_service.aggregate_all_room_metrics() - + # Sleep for 5 minutes between aggregations await asyncio.sleep(300) - + except Exception as e: logger.error(f"Error in room metrics aggregation: {e}") await asyncio.sleep(600) # Wait longer on error @@ -509,23 +509,23 @@ async def room_metrics_aggregation_task(): async def data_cleanup_task(): """Periodic cleanup of old data""" logger.info("Starting data cleanup task") - + while True: try: db = await get_database() service = SensorService(db, None) - + # Clean up data older than 90 days cleanup_date = datetime.utcnow() - timedelta(days=90) await service.cleanup_old_data(cleanup_date) - + # Sleep for 24 hours between cleanups await asyncio.sleep(86400) - + except Exception as e: logger.error(f"Error in data cleanup task: {e}") await asyncio.sleep(7200) # Wait 2 hours on error if __name__ == "__main__": import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8007) \ No newline at end of file + uvicorn.run(app, host="0.0.0.0", port=8007)