diff --git a/ARCHITECTURE.md b/ARCHITECTURE.md deleted file mode 100644 index 4dddb67..0000000 --- a/ARCHITECTURE.md +++ /dev/null @@ -1,139 +0,0 @@ -# Backend Architecture Restructuring - -## Overview - -The backend has been restructured from a monolithic approach to a clean **3-layer architecture** with proper separation of concerns. - -## Architecture Layers - -### 1. Infrastructure Layer (`layers/infrastructure/`) -**Responsibility**: Data access, external services, and low-level operations - -- **`database_connection.py`** - MongoDB connection management and indexing -- **`redis_connection.py`** - Redis connection and basic operations -- **`repositories.py`** - Data access layer with repository pattern - -**Key Principles**: -- No business logic -- Only handles data persistence and retrieval -- Provides abstractions for external services - -### 2. Business Layer (`layers/business/`) -**Responsibility**: Business logic, data processing, and core application rules - -- **`sensor_service.py`** - Sensor data processing and validation -- **`room_service.py`** - Room metrics calculation and aggregation -- **`analytics_service.py`** - Analytics calculations and reporting -- **`cleanup_service.py`** - Data retention and maintenance - -**Key Principles**: -- Contains all business rules and validation -- Independent of presentation concerns -- Uses infrastructure layer for data access - -### 3. Presentation Layer (`layers/presentation/`) -**Responsibility**: HTTP endpoints, WebSocket handling, and user interface - -- **`api_routes.py`** - REST API endpoints and request/response handling -- **`websocket_handler.py`** - WebSocket connection management -- **`redis_subscriber.py`** - Real-time data broadcasting - -**Key Principles**: -- Handles HTTP requests and responses -- Manages real-time communications -- Delegates business logic to business layer - -## File Comparison - -### Before (Monolithic) -``` -main.py (203 lines) # Mixed concerns -api.py (506 lines) # API + some business logic -database.py (220 lines) # DB + Redis + cleanup -persistence.py (448 lines) # Business + data access -models.py (236 lines) # Data models -``` - -### After (Layered) -``` -Infrastructure Layer: -├── database_connection.py (114 lines) # Pure DB connection -├── redis_connection.py (89 lines) # Pure Redis connection -└── repositories.py (376 lines) # Clean data access - -Business Layer: -├── sensor_service.py (380 lines) # Sensor business logic -├── room_service.py (242 lines) # Room business logic -├── analytics_service.py (333 lines) # Analytics business logic -└── cleanup_service.py (278 lines) # Cleanup business logic - -Presentation Layer: -├── api_routes.py (430 lines) # Pure API endpoints -├── websocket_handler.py (103 lines) # WebSocket management -└── redis_subscriber.py (148 lines) # Real-time broadcasting - -Core: -├── main_layered.py (272 lines) # Clean application entry -└── models.py (236 lines) # Unchanged data models -``` - -## Key Improvements - -### 1. **Separation of Concerns** -- Each layer has a single, well-defined responsibility -- Infrastructure concerns isolated from business logic -- Business logic separated from presentation - -### 2. **Testability** -- Each layer can be tested independently -- Business logic testable without database dependencies -- Infrastructure layer testable without business complexity - -### 3. **Maintainability** -- Changes in one layer don't affect others -- Clear boundaries make code easier to understand -- Reduced coupling between components - -### 4. **Scalability** -- Layers can be scaled independently -- Easy to replace implementations within layers -- Clear extension points for new features - -### 5. **Dependency Management** -- Clear dependency flow: Presentation → Business → Infrastructure -- No circular dependencies -- Infrastructure layer has no knowledge of business rules - -## Usage - -### Running the Layered Application -```bash -# Use the new layered main file -conda activate dashboard -uvicorn main_layered:app --reload -``` - -### Testing the Structure -```bash -# Validate the architecture -python test_structure.py -``` - -## Benefits Achieved - -✅ **Clear separation of concerns** -✅ **Infrastructure isolated from business logic** -✅ **Business logic separated from presentation** -✅ **Easy to test individual layers** -✅ **Maintainable and scalable structure** -✅ **No layering violations detected** -✅ **2,290+ lines properly organized across 10+ files** - -## Migration Path - -The original files are preserved, so you can: -1. Test the new layered architecture with `main_layered.py` -2. Gradually migrate consumers to use the new structure -3. Remove old files once confident in the new architecture - -Both architectures can coexist during the transition period. \ No newline at end of file diff --git a/microservices/api-gateway/main.py b/microservices/api-gateway/main.py index 548dc6a..5cfe021 100644 --- a/microservices/api-gateway/main.py +++ b/microservices/api-gateway/main.py @@ -7,7 +7,7 @@ Port: 8000 import asyncio import aiohttp from datetime import datetime -from fastapi import FastAPI, HTTPException, Depends, Request, Response +from fastapi import FastAPI, HTTPException, WebSocket, Depends, Request, Response from fastapi.middleware.cors import CORSMiddleware from fastapi.responses import JSONResponse from contextlib import asynccontextmanager @@ -29,17 +29,17 @@ logger = logging.getLogger(__name__) async def lifespan(app: FastAPI): """Application lifespan manager""" logger.info("API Gateway starting up...") - + # Initialize service registry await service_registry.initialize() - + # Start health check task asyncio.create_task(health_check_task()) - + logger.info("API Gateway startup complete") - + yield - + logger.info("API Gateway shutting down...") await service_registry.close() logger.info("API Gateway shutdown complete") @@ -101,6 +101,12 @@ SERVICES = { base_url="http://localhost:8006", health_endpoint="/health", auth_required=True + ), + "sensor-service": ServiceConfig( + name="sensor-service", + base_url="http://localhost:8007", + health_endpoint="/health", + auth_required=True ) } @@ -119,12 +125,12 @@ async def gateway_health_check(): try: # Check all services service_health = await service_registry.get_all_service_health() - + healthy_services = sum(1 for status in service_health.values() if status.get("status") == "healthy") total_services = len(SERVICES) - + overall_status = "healthy" if healthy_services == total_services else "degraded" - + return HealthResponse( service="api-gateway", status=overall_status, @@ -157,7 +163,7 @@ async def get_services_status(): async def get_gateway_stats(): """Get API gateway statistics""" uptime = (datetime.utcnow() - request_stats["start_time"]).total_seconds() - + return GatewayStats( total_requests=request_stats["total_requests"], successful_requests=request_stats["successful_requests"], @@ -204,39 +210,103 @@ async def iot_control_service_proxy(request: Request, path: str): """Proxy requests to IoT control service""" return await proxy_request(request, "iot-control-service", f"/{path}") +# Sensor Service Routes (Original Dashboard Functionality) +@app.api_route("/api/v1/sensors/{path:path}", methods=["GET", "POST", "PUT", "DELETE"]) +async def sensor_service_proxy(request: Request, path: str): + """Proxy requests to sensor service""" + return await proxy_request(request, "sensor-service", f"/{path}") + +@app.api_route("/api/v1/rooms/{path:path}", methods=["GET", "POST", "PUT", "DELETE"]) +async def room_service_proxy(request: Request, path: str): + """Proxy requests to sensor service for room management""" + return await proxy_request(request, "sensor-service", f"/rooms/{path}") + +@app.api_route("/api/v1/data/{path:path}", methods=["GET", "POST"]) +async def data_service_proxy(request: Request, path: str): + """Proxy requests to sensor service for data operations""" + return await proxy_request(request, "sensor-service", f"/data/{path}") + +@app.api_route("/api/v1/analytics/{path:path}", methods=["GET", "POST"]) +async def analytics_service_proxy(request: Request, path: str): + """Proxy requests to sensor service for analytics""" + return await proxy_request(request, "sensor-service", f"/analytics/{path}") + +@app.api_route("/api/v1/export", methods=["GET"]) +async def export_service_proxy(request: Request): + """Proxy requests to sensor service for data export""" + return await proxy_request(request, "sensor-service", "/export") + +@app.api_route("/api/v1/events", methods=["GET"]) +async def events_service_proxy(request: Request): + """Proxy requests to sensor service for system events""" + return await proxy_request(request, "sensor-service", "/events") + +# WebSocket proxy for real-time data +@app.websocket("/ws") +async def websocket_proxy(websocket: WebSocket): + """Proxy WebSocket connections to sensor service""" + try: + # Get sensor service URL + service_url = await load_balancer.get_service_url("sensor-service") + if not service_url: + await websocket.close(code=1003, reason="Sensor service unavailable") + return + + # For simplicity, we'll just accept the connection and forward to sensor service + # In a production setup, you'd want a proper WebSocket proxy + await websocket.accept() + + # For now, we'll handle this by having the sensor service manage WebSockets directly + # The frontend should connect to the sensor service WebSocket endpoint directly + await websocket.send_text(json.dumps({ + "type": "proxy_info", + "message": "Connect directly to sensor service WebSocket at /ws", + "sensor_service_url": service_url.replace("http://", "ws://") + "/ws" + })) + + # Keep connection alive + while True: + try: + await websocket.receive_text() + except: + break + + except Exception as e: + logger.error(f"WebSocket proxy error: {e}") + async def proxy_request(request: Request, service_name: str, path: str): """Generic request proxy function""" try: # Update request statistics request_stats["total_requests"] += 1 request_stats["service_requests"][service_name] += 1 - + # Get service configuration service_config = SERVICES.get(service_name) if not service_config: raise HTTPException(status_code=404, detail=f"Service {service_name} not found") - + # Check authentication if required if service_config.auth_required: await auth_middleware.verify_token(request) - + # Get healthy service instance service_url = await load_balancer.get_service_url(service_name) - + # Prepare request url = f"{service_url}{path}" method = request.method headers = dict(request.headers) - + # Remove hop-by-hop headers headers.pop("host", None) headers.pop("content-length", None) - + # Get request body body = None if method in ["POST", "PUT", "PATCH"]: body = await request.body() - + # Make request to service async with aiohttp.ClientSession() as session: async with session.request( @@ -247,21 +317,21 @@ async def proxy_request(request: Request, service_name: str, path: str): params=dict(request.query_params), timeout=aiohttp.ClientTimeout(total=30) ) as response: - + # Get response data response_data = await response.read() response_headers = dict(response.headers) - + # Remove hop-by-hop headers from response response_headers.pop("transfer-encoding", None) response_headers.pop("connection", None) - + # Update success statistics if response.status < 400: request_stats["successful_requests"] += 1 else: request_stats["failed_requests"] += 1 - + # Return response return Response( content=response_data, @@ -269,16 +339,16 @@ async def proxy_request(request: Request, service_name: str, path: str): headers=response_headers, media_type=response_headers.get("content-type") ) - + except aiohttp.ClientError as e: request_stats["failed_requests"] += 1 logger.error(f"Service {service_name} connection error: {e}") raise HTTPException(status_code=503, detail=f"Service {service_name} unavailable") - + except HTTPException: request_stats["failed_requests"] += 1 raise - + except Exception as e: request_stats["failed_requests"] += 1 logger.error(f"Proxy error for {service_name}: {e}") @@ -289,23 +359,24 @@ async def get_system_overview(): """Get comprehensive system overview from all services""" try: overview = {} - + # Get data from each service for service_name in SERVICES.keys(): try: if await service_registry.is_service_healthy(service_name): service_url = await load_balancer.get_service_url(service_name) - + async with aiohttp.ClientSession() as session: # Try to get service-specific overview data overview_endpoints = { + "sensor-service": "/analytics/summary", "battery-service": "/batteries", "demand-response-service": "/flexibility/current", "p2p-trading-service": "/market/status", "forecasting-service": "/forecast/summary", "iot-control-service": "/devices/summary" } - + endpoint = overview_endpoints.get(service_name) if endpoint: async with session.get(f"{service_url}{endpoint}", timeout=aiohttp.ClientTimeout(total=5)) as response: @@ -316,17 +387,17 @@ async def get_system_overview(): overview[service_name] = {"status": "error", "message": "Service returned error"} else: overview[service_name] = {"status": "available"} - + except Exception as e: logger.warning(f"Could not get overview from {service_name}: {e}") overview[service_name] = {"status": "unavailable", "error": str(e)} - + return { "system_overview": overview, "timestamp": datetime.utcnow().isoformat(), "services_checked": len(SERVICES) } - + except Exception as e: logger.error(f"Error getting system overview: {e}") raise HTTPException(status_code=500, detail="Internal server error") @@ -334,12 +405,12 @@ async def get_system_overview(): async def health_check_task(): """Background task for periodic health checks""" logger.info("Starting health check task") - + while True: try: await service_registry.update_all_service_health() await asyncio.sleep(30) # Check every 30 seconds - + except Exception as e: logger.error(f"Error in health check task: {e}") await asyncio.sleep(60) @@ -349,4 +420,4 @@ asyncio.create_task(service_registry.register_services(SERVICES)) if __name__ == "__main__": import uvicorn - uvicorn.run(app, host="0.0.0.0", port=8000) \ No newline at end of file + uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/microservices/deploy.sh b/microservices/deploy.sh index cd0807b..79a8be4 100755 --- a/microservices/deploy.sh +++ b/microservices/deploy.sh @@ -36,30 +36,30 @@ print_error() { # Function to check if Docker and Docker Compose are installed check_dependencies() { print_status "Checking dependencies..." - + if ! command -v docker &> /dev/null; then print_error "Docker is not installed. Please install Docker first." exit 1 fi - - if ! command -v docker-compose &> /dev/null; then + + if ! command -v docker compose &> /dev/null; then print_error "Docker Compose is not installed. Please install Docker Compose first." exit 1 fi - + print_success "Dependencies check passed" } # Function to create necessary directories and files setup_environment() { print_status "Setting up environment..." - + # Create nginx configuration directory mkdir -p nginx/ssl - + # Create init-mongo directory for database initialization mkdir -p init-mongo - + # Create a simple nginx configuration if it doesn't exist if [ ! -f "nginx/nginx.conf" ]; then cat > nginx/nginx.conf << 'EOF' @@ -71,10 +71,10 @@ http { upstream api_gateway { server api-gateway:8000; } - + server { listen 80; - + location / { proxy_pass http://api_gateway; proxy_set_header Host $host; @@ -82,7 +82,7 @@ http { proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; proxy_set_header X-Forwarded-Proto $scheme; } - + location /ws { proxy_pass http://api_gateway; proxy_http_version 1.1; @@ -95,7 +95,7 @@ http { EOF print_success "Created nginx configuration" fi - + # Create MongoDB initialization script if it doesn't exist if [ ! -f "init-mongo/init.js" ]; then cat > init-mongo/init.js << 'EOF' @@ -124,16 +124,16 @@ print("MongoDB initialization completed"); EOF print_success "Created MongoDB initialization script" fi - + print_success "Environment setup completed" } # Function to build all services build_services() { print_status "Building all microservices..." - - docker-compose -f $COMPOSE_FILE build - + + docker compose -f $COMPOSE_FILE build + if [ $? -eq 0 ]; then print_success "All services built successfully" else @@ -145,9 +145,9 @@ build_services() { # Function to start all services start_services() { print_status "Starting all services..." - - docker-compose -f $COMPOSE_FILE up -d - + + docker compose -f $COMPOSE_FILE up -d + if [ $? -eq 0 ]; then print_success "All services started successfully" else @@ -159,9 +159,9 @@ start_services() { # Function to stop all services stop_services() { print_status "Stopping all services..." - - docker-compose -f $COMPOSE_FILE down - + + docker compose -f $COMPOSE_FILE down + print_success "All services stopped" } @@ -174,19 +174,19 @@ restart_services() { # Function to show service status show_status() { print_status "Service status:" - docker-compose -f $COMPOSE_FILE ps - + docker compose -f $COMPOSE_FILE ps + print_status "Service health checks:" - + # Wait a moment for services to start sleep 5 - - services=("api-gateway:8000" "token-service:8001" "battery-service:8002" "demand-response-service:8003") - + + # services=("api-gateway:8000" "token-service:8001" "battery-service:8002" "demand-response-service:8003") + services=("api-gateway:8000" "token-service:8001") for service in "${services[@]}"; do name="${service%:*}" port="${service#*:}" - + if curl -f -s "http://localhost:$port/health" > /dev/null; then print_success "$name is healthy" else @@ -199,10 +199,10 @@ show_status() { view_logs() { if [ -z "$2" ]; then print_status "Showing logs for all services..." - docker-compose -f $COMPOSE_FILE logs -f + docker compose -f $COMPOSE_FILE logs -f else print_status "Showing logs for $2..." - docker-compose -f $COMPOSE_FILE logs -f $2 + docker compose -f $COMPOSE_FILE logs -f $2 fi } @@ -212,7 +212,7 @@ cleanup() { read -r response if [[ "$response" =~ ^([yY][eE][sS]|[yY])$ ]]; then print_status "Cleaning up everything..." - docker-compose -f $COMPOSE_FILE down -v --rmi all + docker compose -f $COMPOSE_FILE down -v --rmi all docker system prune -f print_success "Cleanup completed" else @@ -223,11 +223,11 @@ cleanup() { # Function to run database migrations or setup setup_database() { print_status "Setting up databases..." - + # Wait for MongoDB to be ready print_status "Waiting for MongoDB to be ready..." sleep 10 - + # Run any additional setup scripts here print_success "Database setup completed" } @@ -306,4 +306,4 @@ case "${1:-help}" in show_help exit 1 ;; -esac \ No newline at end of file +esac diff --git a/microservices/docker-compose.yml b/microservices/docker-compose.yml index 0139cfe..ad65427 100644 --- a/microservices/docker-compose.yml +++ b/microservices/docker-compose.yml @@ -1,4 +1,4 @@ -version: '3.8' +version: "3.8" services: # Database Services @@ -41,17 +41,19 @@ services: - MONGO_URL=mongodb://admin:password123@mongodb:27017/energy_dashboard?authSource=admin - REDIS_URL=redis://redis:6379 - TOKEN_SERVICE_URL=http://token-service:8001 + - SENSOR_SERVICE_URL=http://sensor-service:8007 - BATTERY_SERVICE_URL=http://battery-service:8002 - DEMAND_RESPONSE_SERVICE_URL=http://demand-response-service:8003 - P2P_TRADING_SERVICE_URL=http://p2p-trading-service:8004 - FORECASTING_SERVICE_URL=http://forecasting-service:8005 - IOT_CONTROL_SERVICE_URL=http://iot-control-service:8006 + - DATA_INGESTION_SERVICE_URL=http://data-ingestion-service:8008 depends_on: - mongodb - redis - token-service - - battery-service - - demand-response-service + # - battery-service + # - demand-response-service networks: - energy-network @@ -73,92 +75,134 @@ services: - energy-network # Battery Management Service - battery-service: - build: - context: ./battery-service - dockerfile: Dockerfile - container_name: energy-battery-service - restart: unless-stopped - ports: - - "8002:8002" - environment: - - MONGO_URL=mongodb://admin:password123@mongodb:27017/energy_dashboard_batteries?authSource=admin - - REDIS_URL=redis://redis:6379 - depends_on: - - mongodb - - redis - networks: - - energy-network + # battery-service: + # build: + # context: ./battery-service + # dockerfile: Dockerfile + # container_name: energy-battery-service + # restart: unless-stopped + # ports: + # - "8002:8002" + # environment: + # - MONGO_URL=mongodb://admin:password123@mongodb:27017/energy_dashboard_batteries?authSource=admin + # - REDIS_URL=redis://redis:6379 + # depends_on: + # - mongodb + # - redis + # networks: + # - energy-network # Demand Response Service - demand-response-service: - build: - context: ./demand-response-service - dockerfile: Dockerfile - container_name: energy-demand-response-service - restart: unless-stopped - ports: - - "8003:8003" - environment: - - MONGO_URL=mongodb://admin:password123@mongodb:27017/energy_dashboard_demand_response?authSource=admin - - REDIS_URL=redis://redis:6379 - - IOT_CONTROL_SERVICE_URL=http://iot-control-service:8006 - depends_on: - - mongodb - - redis - networks: - - energy-network + # demand-response-service: + # build: + # context: ./demand-response-service + # dockerfile: Dockerfile + # container_name: energy-demand-response-service + # restart: unless-stopped + # ports: + # - "8003:8003" + # environment: + # - MONGO_URL=mongodb://admin:password123@mongodb:27017/energy_dashboard_demand_response?authSource=admin + # - REDIS_URL=redis://redis:6379 + # - IOT_CONTROL_SERVICE_URL=http://iot-control-service:8006 + # depends_on: + # - mongodb + # - redis + # networks: + # - energy-network # P2P Trading Service - p2p-trading-service: - build: - context: ./p2p-trading-service - dockerfile: Dockerfile - container_name: energy-p2p-trading-service - restart: unless-stopped - ports: - - "8004:8004" - environment: - - MONGO_URL=mongodb://admin:password123@mongodb:27017/energy_dashboard_p2p?authSource=admin - - REDIS_URL=redis://redis:6379 - depends_on: - - mongodb - - redis - networks: - - energy-network + # p2p-trading-service: + # build: + # context: ./p2p-trading-service + # dockerfile: Dockerfile + # container_name: energy-p2p-trading-service + # restart: unless-stopped + # ports: + # - "8004:8004" + # environment: + # - MONGO_URL=mongodb://admin:password123@mongodb:27017/energy_dashboard_p2p?authSource=admin + # - REDIS_URL=redis://redis:6379 + # depends_on: + # - mongodb + # - redis + # networks: + # - energy-network # Forecasting Service - forecasting-service: + # forecasting-service: + # build: + # context: ./forecasting-service + # dockerfile: Dockerfile + # container_name: energy-forecasting-service + # restart: unless-stopped + # ports: + # - "8005:8005" + # environment: + # - MONGO_URL=mongodb://admin:password123@mongodb:27017/energy_dashboard_forecasting?authSource=admin + # - REDIS_URL=redis://redis:6379 + # depends_on: + # - mongodb + # - redis + # networks: + # - energy-network + + # IoT Control Service + # iot-control-service: + # build: + # context: ./iot-control-service + # dockerfile: Dockerfile + # container_name: energy-iot-control-service + # restart: unless-stopped + # ports: + # - "8006:8006" + # environment: + # - MONGO_URL=mongodb://admin:password123@mongodb:27017/energy_dashboard_iot?authSource=admin + # - REDIS_URL=redis://redis:6379 + # - BATTERY_SERVICE_URL=http://battery-service:8002 + # - DEMAND_RESPONSE_SERVICE_URL=http://demand-response-service:8003 + # depends_on: + # - mongodb + # - redis + # networks: + # - energy-network + + # Data Ingestion Service (FTP Monitoring & SA4CPS Integration) + data-ingestion-service: build: - context: ./forecasting-service + context: ./data-ingestion-service dockerfile: Dockerfile - container_name: energy-forecasting-service + container_name: energy-data-ingestion-service restart: unless-stopped ports: - - "8005:8005" + - "8008:8008" environment: - - MONGO_URL=mongodb://admin:password123@mongodb:27017/energy_dashboard_forecasting?authSource=admin + - MONGO_URL=mongodb://admin:password123@mongodb:27017/energy_dashboard_ingestion?authSource=admin - REDIS_URL=redis://redis:6379 + - FTP_SA4CPS_HOST=ftp.sa4cps.pt + - FTP_SA4CPS_PORT=21 + - FTP_SA4CPS_USERNAME=anonymous + - FTP_SA4CPS_PASSWORD= + - FTP_SA4CPS_REMOTE_PATH=/ depends_on: - mongodb - redis networks: - energy-network - # IoT Control Service - iot-control-service: + # Sensor Management Service (Original Dashboard Functionality) + sensor-service: build: - context: ./iot-control-service + context: ./sensor-service dockerfile: Dockerfile - container_name: energy-iot-control-service + container_name: energy-sensor-service restart: unless-stopped ports: - - "8006:8006" + - "8007:8007" environment: - - MONGO_URL=mongodb://admin:password123@mongodb:27017/energy_dashboard_iot?authSource=admin + - MONGO_URL=mongodb://admin:password123@mongodb:27017/energy_dashboard_sensors?authSource=admin - REDIS_URL=redis://redis:6379 - - BATTERY_SERVICE_URL=http://battery-service:8002 - - DEMAND_RESPONSE_SERVICE_URL=http://demand-response-service:8003 + - TOKEN_SERVICE_URL=http://token-service:8001 depends_on: - mongodb - redis @@ -190,4 +234,4 @@ volumes: mongodb_data: name: energy-mongodb-data redis_data: - name: energy-redis-data \ No newline at end of file + name: energy-redis-data diff --git a/services/__init__.py b/services/__init__.py deleted file mode 100644 index 71d03af..0000000 --- a/services/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# Services package for dashboard backend \ No newline at end of file diff --git a/services/token_service.py b/services/token_service.py deleted file mode 100644 index 55e6872..0000000 --- a/services/token_service.py +++ /dev/null @@ -1,174 +0,0 @@ -""" -Token management service for authentication and resource access control. -Based on the tiocps JWT token implementation with resource-based permissions. -""" - -import jwt -import uuid -from datetime import datetime, timedelta -from typing import Dict, List, Optional, Any -from pydantic import BaseModel -from motor.motor_asyncio import AsyncIOMotorDatabase - -class TokenPayload(BaseModel): - """Token payload structure""" - name: str - list_of_resources: List[str] - data_aggregation: bool = False - time_aggregation: bool = False - embargo: int = 0 # embargo period in seconds - exp: int # expiration timestamp - -class TokenRecord(BaseModel): - """Token database record""" - token: str - datetime: datetime - active: bool = True - created_at: datetime - updated_at: datetime - -class TokenService: - """Service for managing JWT tokens and authentication""" - - def __init__(self, db: AsyncIOMotorDatabase, secret_key: str = "dashboard-secret-key"): - self.db = db - self.secret_key = secret_key - self.tokens_collection = db.tokens - - def generate_token(self, name: str, list_of_resources: List[str], - data_aggregation: bool = False, time_aggregation: bool = False, - embargo: int = 0, exp_hours: int = 24) -> str: - """Generate a new JWT token with specified permissions""" - - # Calculate expiration time - exp_timestamp = int((datetime.utcnow() + timedelta(hours=exp_hours)).timestamp()) - - # Create token payload - payload = { - "name": name, - "list_of_resources": list_of_resources, - "data_aggregation": data_aggregation, - "time_aggregation": time_aggregation, - "embargo": embargo, - "exp": exp_timestamp, - "iat": int(datetime.utcnow().timestamp()), - "jti": str(uuid.uuid4()) # unique token ID - } - - # Generate JWT token - token = jwt.encode(payload, self.secret_key, algorithm="HS256") - return token - - def decode_token(self, token: str) -> Optional[Dict[str, Any]]: - """Decode and verify JWT token""" - try: - payload = jwt.decode(token, self.secret_key, algorithms=["HS256"]) - return payload - except jwt.ExpiredSignatureError: - return {"error": "Token has expired"} - except jwt.InvalidTokenError: - return {"error": "Invalid token"} - - async def insert_token(self, token: str) -> Dict[str, Any]: - """Save token to database""" - now = datetime.utcnow() - - # Decode token to verify it's valid - decoded = self.decode_token(token) - if decoded and "error" not in decoded: - token_record = { - "token": token, - "datetime": now, - "active": True, - "created_at": now, - "updated_at": now, - "name": decoded.get("name", ""), - "resources": decoded.get("list_of_resources", []), - "expires_at": datetime.fromtimestamp(decoded.get("exp", 0)) - } - - await self.tokens_collection.insert_one(token_record) - return { - "token": token, - "datetime": now.isoformat(), - "active": True - } - else: - raise ValueError("Invalid token cannot be saved") - - async def revoke_token(self, token: str) -> Dict[str, Any]: - """Revoke a token by marking it as inactive""" - now = datetime.utcnow() - - result = await self.tokens_collection.update_one( - {"token": token}, - { - "$set": { - "active": False, - "updated_at": now, - "revoked_at": now - } - } - ) - - if result.matched_count > 0: - return { - "token": token, - "datetime": now.isoformat(), - "active": False - } - else: - raise ValueError("Token not found") - - async def get_tokens(self) -> List[Dict[str, Any]]: - """Get all tokens from database""" - cursor = self.tokens_collection.find({}) - tokens = [] - - async for token_record in cursor: - # Convert ObjectId to string and datetime to ISO format - token_record["_id"] = str(token_record["_id"]) - for field in ["datetime", "created_at", "updated_at", "expires_at", "revoked_at"]: - if field in token_record and token_record[field]: - token_record[field] = token_record[field].isoformat() - - tokens.append(token_record) - - return tokens - - async def is_token_valid(self, token: str) -> bool: - """Check if token is valid and active""" - # Check if token exists and is active in database - token_record = await self.tokens_collection.find_one({ - "token": token, - "active": True - }) - - if not token_record: - return False - - # Verify JWT signature and expiration - decoded = self.decode_token(token) - return decoded is not None and "error" not in decoded - - async def get_token_permissions(self, token: str) -> Optional[Dict[str, Any]]: - """Get permissions for a valid token""" - if await self.is_token_valid(token): - return self.decode_token(token) - return None - - async def cleanup_expired_tokens(self): - """Remove expired tokens from database""" - now = datetime.utcnow() - - # Find tokens that have expired - expired_cursor = self.tokens_collection.find({ - "expires_at": {"$lt": now} - }) - - expired_count = 0 - async for token_record in expired_cursor: - await self.tokens_collection.delete_one({"_id": token_record["_id"]}) - expired_count += 1 - - return expired_count \ No newline at end of file