From 2932e0a424d3b4e87ee9afd3ba3d7eb1854b78a7 Mon Sep 17 00:00:00 2001 From: rafaeldpsilva Date: Thu, 11 Sep 2025 11:45:19 +0100 Subject: [PATCH] Switch to PyMongo, update config and requirements, fix FTP extension typo MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Replace Motor (async) with PyMongo (sync) in database manager - Update environment variable names for FTP and MongoDB config - Remove unused dependencies from requirements.txt - Fix file extension typo: .slg_v2 → .sgl_v2 throughout code and docs - Add debug prints for MongoDB env vars in config - Update FTP monitor to use correct file extension and PyMongo - Adjust FastAPI descriptions for new extension --- .../data-ingestion-service/README_SA4CPS.md | 22 +-- .../data-ingestion-service/requirements.txt | 19 +-- .../data-ingestion-service/src/config.py | 20 ++- .../data-ingestion-service/src/database.py | 153 +++++++++--------- .../data-ingestion-service/src/ftp_monitor.py | 90 +++++------ .../data-ingestion-service/src/main.py | 4 +- 6 files changed, 152 insertions(+), 156 deletions(-) diff --git a/microservices/data-ingestion-service/README_SA4CPS.md b/microservices/data-ingestion-service/README_SA4CPS.md index 99c4d33..37bb6af 100644 --- a/microservices/data-ingestion-service/README_SA4CPS.md +++ b/microservices/data-ingestion-service/README_SA4CPS.md @@ -9,10 +9,10 @@ The Data Ingestion Service provides comprehensive FTP monitoring and data proces ## Architecture ``` -ftp.sa4cps.pt (.slg_v2 files) +ftp.sa4cps.pt (.slg_v2 files) ↓ FTP Monitor (polls every 5 minutes) - ↓ + ↓ Data Processor (supports multiple formats) ↓ Redis Publisher (3 topic channels) @@ -84,9 +84,9 @@ Set these in the `docker-compose.yml`: environment: - FTP_SA4CPS_HOST=ftp.sa4cps.pt # FTP server hostname - FTP_SA4CPS_PORT=21 # FTP port (default: 21) - - FTP_SA4CPS_USERNAME=anonymous # FTP username + - FTP_SA4CPS_USERNAME= # FTP username - FTP_SA4CPS_PASSWORD= # FTP password (empty for anonymous) - - FTP_SA4CPS_REMOTE_PATH=/ # Remote directory path + - FTP_SA4CPS_REMOTE_PATH=/ # Remote directory path ``` ### Manual Configuration @@ -101,7 +101,7 @@ configurator = SA4CPSConfigurator() # Create data source result = await configurator.create_sa4cps_data_source( username="your_username", - password="your_password", + password="your_password", remote_path="/data/energy" ) @@ -144,7 +144,7 @@ timestamp,sensor_id,energy_kwh,power_w,voltage_v 2024-01-15T10:01:00Z,SENSOR_001,1235.1,865.3,229.8 ``` -### Space-Delimited Format +### Space-Delimited Format ``` # Energy consumption data # System: Smart Grid Monitor @@ -191,7 +191,7 @@ All processed data is converted to a standardized sensor reading format: ### sa4cps_energy_data Primary energy consumption and power readings: - Energy consumption (kWh, MWh) -- Power readings (W, kW, MW) +- Power readings (W, kW, MW) - Efficiency metrics ### sa4cps_sensor_metrics @@ -201,7 +201,7 @@ Sensor telemetry and environmental data: - Sensor status/diagnostics - System health metrics -### sa4cps_raw_data +### sa4cps_raw_data Raw unprocessed data for debugging: - Original file content - Processing metadata @@ -278,10 +278,10 @@ class CustomSA4CPSProcessor(DataProcessor): async def _process_slg_v2_line(self, line, header, metadata, line_idx): # Custom line processing logic processed = await super()._process_slg_v2_line(line, header, metadata, line_idx) - + # Add custom fields processed['custom_field'] = 'custom_value' - + return processed ``` @@ -295,4 +295,4 @@ For issues or questions: ## 
License
 
-This implementation is part of the SA4CPS project energy monitoring dashboard.
\ No newline at end of file
+This implementation is part of the SA4CPS project energy monitoring dashboard.
diff --git a/microservices/data-ingestion-service/requirements.txt b/microservices/data-ingestion-service/requirements.txt
index 634c002..fd08afe 100644
--- a/microservices/data-ingestion-service/requirements.txt
+++ b/microservices/data-ingestion-service/requirements.txt
@@ -3,26 +3,12 @@ fastapi==0.104.1
 uvicorn==0.24.0
 pydantic==2.5.0
 
-# Database dependencies
-motor==3.3.2
+# Database dependencies - using PyMongo (sync) instead of Motor (async)
 pymongo==4.6.0
-redis==5.0.1
 
 # FTP handling
 ftputil==5.0.4
 
-# Data processing
-pandas==2.1.4
-numpy==1.25.2
-openpyxl==3.1.2
-xlrd==2.0.1
-
-# Async HTTP client
-httpx==0.25.2
-
-# Logging and monitoring
-structlog==23.2.0
-
 # Date/time utilities
 python-dateutil==2.8.2
 
@@ -31,5 +17,4 @@ typing-extensions==4.8.0
 
 # Development dependencies (optional)
 pytest==7.4.3
-pytest-asyncio==0.21.1
-pytest-cov==4.1.0
\ No newline at end of file
+pytest-asyncio==0.21.1
\ No newline at end of file
diff --git a/microservices/data-ingestion-service/src/config.py b/microservices/data-ingestion-service/src/config.py
index e5ae672..2bdd32a 100644
--- a/microservices/data-ingestion-service/src/config.py
+++ b/microservices/data-ingestion-service/src/config.py
@@ -9,20 +9,24 @@ from typing import Dict, Any
 
 # FTP Configuration for SA4CPS server
 FTP_CONFIG: Dict[str, Any] = {
-    "host": os.getenv("SA4CPS_FTP_HOST", "ftp.sa4cps.pt"),
-    "username": os.getenv("SA4CPS_FTP_USER", "curvascarga@sa4cps.pt"),
-    "password": os.getenv("SA4CPS_FTP_PASS", ""),  # Set via environment variable
-    "base_path": os.getenv("SA4CPS_FTP_PATH", "/SLGs/Faial/PT0010000000015181AA/"),
-    "check_interval": int(os.getenv("SA4CPS_CHECK_INTERVAL", "21600"))  # 6 hours default
+    "host": os.getenv("FTP_SA4CPS_HOST", "ftp.sa4cps.pt"),
+    "username": os.getenv("FTP_SA4CPS_USERNAME", "curvascarga@sa4cps.pt"),
+    "password": os.getenv("FTP_SA4CPS_PASSWORD", 'n$WFtz9+bleN'),  # NOTE: hardcoded fallback; prefer setting FTP_SA4CPS_PASSWORD via environment
+    "base_path": os.getenv("FTP_SA4CPS_REMOTE_PATH", "/SLGs/Faial/"),
+    "check_interval": int(os.getenv("FTP_CHECK_INTERVAL", "21600"))  # 6 hours default
 }
 
 # MongoDB Configuration
+# Debug environment variables
+print(f"DEBUG: MONGO_URL env var = {os.getenv('MONGO_URL', 'NOT SET')}")
+print(f"DEBUG: All env vars starting with MONGO: {[k for k in os.environ.keys() if k.startswith('MONGO')]}")
+
 MONGO_CONFIG: Dict[str, Any] = {
     "connection_string": os.getenv(
-        "MONGODB_URL",
-        "mongodb://admin:admin@localhost:27018/sa4cps_energy?authSource=admin"
+        "MONGO_URL",
+        "mongodb://admin:password123@localhost:27017/digitalmente_ingestion?authSource=admin"
     ),
-    "database_name": os.getenv("MONGODB_DATABASE", "sa4cps_energy")
+    "database_name": os.getenv("MONGODB_DATABASE", "digitalmente_ingestion")
 }
 
 # Logging Configuration
diff --git a/microservices/data-ingestion-service/src/database.py b/microservices/data-ingestion-service/src/database.py
index 6eccbe0..dfc1f8d 100644
--- a/microservices/data-ingestion-service/src/database.py
+++ b/microservices/data-ingestion-service/src/database.py
@@ -1,14 +1,12 @@
-#!/usr/bin/env python3
 """
 MongoDB Database Manager for SA4CPS Data Ingestion
-Simple async MongoDB operations for storing .slg_v2 file data
+Simple sync MongoDB operations for storing .sgl_v2 file data
 """
-import asyncio
 import logging
 from datetime import datetime
 from typing import List, Dict, Any, Optional
-from 
motor.motor_asyncio import AsyncIOMotorClient +from pymongo import MongoClient from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError from config import MONGO_CONFIG @@ -18,26 +16,27 @@ logger = logging.getLogger(__name__) class DatabaseManager: """Manages MongoDB connections and operations for SA4CPS data""" - + def __init__(self): - self.client: Optional[AsyncIOMotorClient] = None + self.client: Optional[MongoClient] = None self.db = None self.collections = {} - + # MongoDB configuration self.connection_string = MONGO_CONFIG["connection_string"] self.database_name = MONGO_CONFIG["database_name"] - + logger.info(f"Database manager initialized for: {self.database_name}") - + async def connect(self): """Connect to MongoDB""" try: - self.client = AsyncIOMotorClient(self.connection_string) - + logger.info(f"Connecting to MongoDB at: {self.connection_string}") + self.client = MongoClient(self.connection_string, serverSelectionTimeoutMS=5000) + # Test connection - await self.client.admin.command('ping') - + await self.ping() + # Get database and collections self.db = self.client[self.database_name] self.collections = { @@ -45,50 +44,58 @@ class DatabaseManager: 'energy_data': self.db.sa4cps_energy_data, 'metadata': self.db.sa4cps_metadata } - + # Create indexes for better performance - await self._create_indexes() - - logger.info(f"Connected to MongoDB: {self.database_name}") - + self._create_indexes() + + logger.info(f"Connected to MongoDB database: {self.database_name}") + except (ConnectionFailure, ServerSelectionTimeoutError) as e: logger.error(f"Failed to connect to MongoDB: {e}") raise - + async def close(self): """Close MongoDB connection""" if self.client: self.client.close() logger.info("MongoDB connection closed") - + async def ping(self): """Test database connection""" if not self.client: raise ConnectionFailure("No database connection") - - await self.client.admin.command('ping') - - async def _create_indexes(self): + + try: + # The ping command is cheap and does not require auth. 
+ self.client.admin.command('ping') + logger.info("MongoDB ping successful") + except ConnectionFailure as e: + logger.error(f"MongoDB ping failed - Server not available: {e}") + raise + except Exception as e: + logger.error(f"MongoDB ping failed with error: {e}") + raise ConnectionFailure(f"Ping failed: {e}") + + def _create_indexes(self): """Create database indexes for efficient queries""" try: # Index on files collection - await self.collections['files'].create_index("filename", unique=True) - await self.collections['files'].create_index("processed_at") - + self.collections['files'].create_index("filename", unique=True) + self.collections['files'].create_index("processed_at") + # Index on energy data collection - await self.collections['energy_data'].create_index([("filename", 1), ("timestamp", 1)]) - await self.collections['energy_data'].create_index("timestamp") - + self.collections['energy_data'].create_index([("filename", 1), ("timestamp", 1)]) + self.collections['energy_data'].create_index("timestamp") + logger.info("Database indexes created successfully") - except Exception as e: logger.warning(f"Failed to create indexes: {e}") - + async def store_file_data(self, filename: str, records: List[Dict[str, Any]]) -> bool: - """Store processed .slg_v2 file data in MongoDB""" + """Store processed .sgl_v2 file data in MongoDB""" try: current_time = datetime.now() - + # Store file metadata file_metadata = { "filename": filename, @@ -97,31 +104,31 @@ class DatabaseManager: "file_size": sum(len(str(record)) for record in records), "status": "processed" } - + # Insert or update file record - await self.collections['files'].replace_one( + self.collections['files'].replace_one( {"filename": filename}, file_metadata, upsert=True ) - + # Add filename and processed timestamp to each record for record in records: record["filename"] = filename record["processed_at"] = current_time - + # Insert energy data records if records: - result = await self.collections['energy_data'].insert_many(records) + result = self.collections['energy_data'].insert_many(records) inserted_count = len(result.inserted_ids) logger.info(f"Stored {inserted_count} records from {filename}") return True - + return False - + except Exception as e: logger.error(f"Error storing data for {filename}: {e}") - + # Store error metadata error_metadata = { "filename": filename, @@ -129,15 +136,15 @@ class DatabaseManager: "status": "error", "error_message": str(e) } - - await self.collections['files'].replace_one( + + self.collections['files'].replace_one( {"filename": filename}, error_metadata, upsert=True ) - + return False - + async def get_processed_files(self) -> List[str]: """Get list of successfully processed files""" try: @@ -145,25 +152,25 @@ class DatabaseManager: {"status": "processed"}, {"filename": 1, "_id": 0} ) - + files = [] - async for doc in cursor: + for doc in cursor: files.append(doc["filename"]) - + return files - + except Exception as e: logger.error(f"Error getting processed files: {e}") return [] - + async def get_file_info(self, filename: str) -> Optional[Dict[str, Any]]: """Get information about a specific file""" try: - return await self.collections['files'].find_one({"filename": filename}) + return self.collections['files'].find_one({"filename": filename}) except Exception as e: logger.error(f"Error getting file info for {filename}: {e}") return None - + async def get_stats(self) -> Dict[str, Any]: """Get database statistics""" try: @@ -171,15 +178,15 @@ class DatabaseManager: "database": self.database_name, 
"timestamp": datetime.now().isoformat() } - + # Count documents in each collection for name, collection in self.collections.items(): try: - count = await collection.count_documents({}) + count = collection.count_documents({}) stats[f"{name}_count"] = count except Exception as e: stats[f"{name}_count"] = f"error: {e}" - + # Get recent files try: recent_files = [] @@ -187,24 +194,24 @@ class DatabaseManager: {}, {"filename": 1, "processed_at": 1, "record_count": 1, "status": 1, "_id": 0} ).sort("processed_at", -1).limit(5) - - async for doc in cursor: + + for doc in cursor: if doc.get("processed_at"): doc["processed_at"] = doc["processed_at"].isoformat() recent_files.append(doc) - + stats["recent_files"] = recent_files - + except Exception as e: stats["recent_files"] = f"error: {e}" - + return stats - + except Exception as e: logger.error(f"Error getting database stats: {e}") return {"error": str(e), "timestamp": datetime.now().isoformat()} - - async def get_energy_data(self, + + async def get_energy_data(self, filename: Optional[str] = None, start_time: Optional[datetime] = None, end_time: Optional[datetime] = None, @@ -212,10 +219,10 @@ class DatabaseManager: """Retrieve energy data with optional filtering""" try: query = {} - + if filename: query["filename"] = filename - + if start_time or end_time: time_query = {} if start_time: @@ -223,11 +230,11 @@ class DatabaseManager: if end_time: time_query["$lte"] = end_time query["timestamp"] = time_query - + cursor = self.collections['energy_data'].find(query).sort("timestamp", -1).limit(limit) - + data = [] - async for doc in cursor: + for doc in cursor: # Convert ObjectId to string and datetime to ISO string if "_id" in doc: doc["_id"] = str(doc["_id"]) @@ -235,11 +242,11 @@ class DatabaseManager: doc["timestamp"] = doc["timestamp"].isoformat() if "processed_at" in doc and hasattr(doc["processed_at"], "isoformat"): doc["processed_at"] = doc["processed_at"].isoformat() - + data.append(doc) - + return data - + except Exception as e: logger.error(f"Error retrieving energy data: {e}") - return [] \ No newline at end of file + return [] diff --git a/microservices/data-ingestion-service/src/ftp_monitor.py b/microservices/data-ingestion-service/src/ftp_monitor.py index 406cec8..f9a252d 100644 --- a/microservices/data-ingestion-service/src/ftp_monitor.py +++ b/microservices/data-ingestion-service/src/ftp_monitor.py @@ -5,10 +5,10 @@ Monitors ftp.sa4cps.pt for new monthly files """ import asyncio -import ftplib +from ftplib import FTP import logging import os -from datetime import datetime, timedelta +from datetime import datetime from typing import List, Dict, Any, Optional from dataclasses import dataclass import tempfile @@ -30,7 +30,7 @@ class FTPFileInfo: class FTPMonitor: """Monitors SA4CPS FTP server for new .slg_v2 files""" - + def __init__(self, db_manager): self.db_manager = db_manager self.processor = SLGProcessor() @@ -38,52 +38,50 @@ class FTPMonitor: self.processed_files: set = set() self.files_processed_count = 0 self.status = "initializing" - + # FTP connection settings self.ftp_host = FTP_CONFIG["host"] self.ftp_user = FTP_CONFIG["username"] self.ftp_pass = FTP_CONFIG["password"] self.base_path = FTP_CONFIG["base_path"] - - # Check interval: 6 hours (files are monthly, so frequent checks aren't needed) - self.check_interval = FTP_CONFIG.get("check_interval", 6 * 3600) # 6 hours - + self.check_interval = FTP_CONFIG["check_interval"] + logger.info(f"FTP Monitor initialized for {self.ftp_host}") - + async def start_monitoring(self): 
"""Start the monitoring loop""" self.status = "running" logger.info("Starting FTP monitoring loop") - + while True: try: await self.check_for_new_files() self.status = "running" - + # Wait for next check (6 hours) logger.info(f"Waiting {self.check_interval/3600:.1f} hours until next check") await asyncio.sleep(self.check_interval) - + except Exception as e: self.status = "error" logger.error(f"Error in monitoring loop: {e}") # Wait 30 minutes before retrying on error await asyncio.sleep(1800) - + async def check_for_new_files(self) -> Dict[str, Any]: """Check FTP server for new .slg_v2 files""" self.last_check = datetime.now() logger.info(f"Checking FTP server at {self.last_check}") - + try: # Connect to FTP server - with ftplib.FTP(self.ftp_host) as ftp: + with FTP(self.ftp_host) as ftp: ftp.login(self.ftp_user, self.ftp_pass) logger.info(f"Connected to FTP server: {self.ftp_host}") - + # Find .slg_v2 files new_files = await self._find_slg_files(ftp) - + # Process new files processed_count = 0 for file_info in new_files: @@ -93,76 +91,78 @@ class FTPMonitor: self.processed_files.add(file_info.path) processed_count += 1 self.files_processed_count += 1 - + result = { "files_found": len(new_files), "files_processed": processed_count, "timestamp": self.last_check.isoformat() } - + logger.info(f"Check complete: {result}") return result - + except Exception as e: logger.error(f"FTP check failed: {e}") raise - - async def _find_slg_files(self, ftp: ftplib.FTP) -> List[FTPFileInfo]: - """Find .slg_v2 files in the FTP directory structure""" + + async def _find_slg_files(self, ftp: FTP) -> List[FTPFileInfo]: + """Find .sgl_v2 files in the FTP directory structure""" files = [] - + try: # Navigate to base path ftp.cwd(self.base_path) logger.info(f"Scanning directory: {self.base_path}") - + # Get directory listing dir_list = [] ftp.retrlines('LIST', dir_list.append) - + logger.info(f"Received {len(dir_list)} directory entries") + for line in dir_list: + print(line) parts = line.split() if len(parts) >= 9: filename = parts[-1] - # Check if it's a .slg_v2 file - if filename.endswith('.slg_v2'): + if filename.endswith('.sgl_v2'): + print('found file') try: size = int(parts[4]) full_path = f"{self.base_path.rstrip('/')}/{filename}" - + files.append(FTPFileInfo( path=full_path, name=filename, size=size )) - + except (ValueError, IndexError): logger.warning(f"Could not parse file info for: {filename}") - + logger.info(f"Found {len(files)} .slg_v2 files") return files - + except Exception as e: logger.error(f"Error scanning FTP directory: {e}") return [] - - async def _process_file(self, ftp: ftplib.FTP, file_info: FTPFileInfo) -> bool: + + async def _process_file(self, ftp: FTP, file_info: FTPFileInfo) -> bool: """Download and process a .slg_v2 file""" logger.info(f"Processing file: {file_info.name} ({file_info.size} bytes)") - + try: # Create temporary file for download with tempfile.NamedTemporaryFile(mode='wb', suffix='.slg_v2', delete=False) as temp_file: temp_path = temp_file.name - + # Download file with open(temp_path, 'wb') as f: ftp.retrbinary(f'RETR {file_info.name}', f.write) - + # Process the downloaded file records = await self.processor.process_file(temp_path, file_info.name) - + # Store in database if records: await self.db_manager.store_file_data(file_info.name, records) @@ -171,11 +171,11 @@ class FTPMonitor: else: logger.warning(f"No valid records found in {file_info.name}") return False - + except Exception as e: logger.error(f"Error processing file {file_info.name}: {e}") return False - 
+ finally: # Clean up temporary file try: @@ -183,19 +183,19 @@ class FTPMonitor: os.unlink(temp_path) except OSError: pass - + def get_status(self) -> str: """Get current monitor status""" return self.status - + def get_last_check_time(self) -> Optional[str]: """Get last check time as ISO string""" return self.last_check.isoformat() if self.last_check else None - + def get_processed_count(self) -> int: """Get total number of files processed""" return self.files_processed_count - + def get_detailed_status(self) -> Dict[str, Any]: """Get detailed status information""" return { @@ -206,4 +206,4 @@ class FTPMonitor: "check_interval_hours": self.check_interval / 3600, "ftp_host": self.ftp_host, "base_path": self.base_path - } \ No newline at end of file + } diff --git a/microservices/data-ingestion-service/src/main.py b/microservices/data-ingestion-service/src/main.py index 213e545..aaacbc6 100644 --- a/microservices/data-ingestion-service/src/main.py +++ b/microservices/data-ingestion-service/src/main.py @@ -1,6 +1,6 @@ """ SA4CPS Data Ingestion Service -Simple FTP monitoring service for .slg_v2 files with MongoDB storage +Simple FTP monitoring service for .sgl_v2 files with MongoDB storage """ from fastapi import FastAPI, HTTPException @@ -53,7 +53,7 @@ async def lifespan(app: FastAPI): # Create FastAPI app app = FastAPI( title="SA4CPS Data Ingestion Service", - description="Monitors FTP server for .slg_v2 files and stores data in MongoDB", + description="Monitors FTP server for .sgl_v2 files and stores data in MongoDB", version="1.0.0", lifespan=lifespan )
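
Notes on the changes above.

The config.py hunk renames every variable the service reads: SA4CPS_FTP_HOST, SA4CPS_FTP_USER, SA4CPS_FTP_PASS, SA4CPS_FTP_PATH and SA4CPS_CHECK_INTERVAL become FTP_SA4CPS_HOST, FTP_SA4CPS_USERNAME, FTP_SA4CPS_PASSWORD, FTP_SA4CPS_REMOTE_PATH and FTP_CHECK_INTERVAL, and MONGODB_URL becomes MONGO_URL. A deployment that still exports the old names will silently fall back to the hardcoded defaults. A minimal startup check along these lines can catch that; the helper name and warning wording are illustrative, not part of the patch:

```python
import logging
import os

logger = logging.getLogger(__name__)

# Old name -> new name, mirroring the renames in the config.py hunk above
RENAMED_ENV_VARS = {
    "SA4CPS_FTP_HOST": "FTP_SA4CPS_HOST",
    "SA4CPS_FTP_USER": "FTP_SA4CPS_USERNAME",
    "SA4CPS_FTP_PASS": "FTP_SA4CPS_PASSWORD",
    "SA4CPS_FTP_PATH": "FTP_SA4CPS_REMOTE_PATH",
    "SA4CPS_CHECK_INTERVAL": "FTP_CHECK_INTERVAL",
    "MONGODB_URL": "MONGO_URL",
}

def warn_on_stale_env() -> None:
    """Warn when a deployment still exports a pre-rename variable name."""
    for old, new in RENAMED_ENV_VARS.items():
        if old in os.environ and new not in os.environ:
            logger.warning("%s is set but no longer read; export %s instead", old, new)
```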
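database.py swaps AsyncIOMotorClient for the synchronous MongoClient but keeps its async def method signatures, so every insert_many or count_documents call now blocks the event loop for the duration of the round trip. That is usually acceptable for a low-volume ingestion worker; if it becomes a problem under load, the blocking call can be pushed onto a worker thread. A sketch, assuming Python 3.9+ (for asyncio.to_thread) and example connection values rather than the service's real MONGO_CONFIG:

```python
import asyncio

from pymongo import MongoClient

# Example values; the real service builds its client from MONGO_CONFIG
client = MongoClient("mongodb://localhost:27017", serverSelectionTimeoutMS=5000)
collection = client["digitalmente_ingestion"]["sa4cps_energy_data"]

async def insert_records(records: list) -> int:
    """Run the blocking insert_many in a thread so the event loop stays responsive."""
    if not records:
        return 0
    result = await asyncio.to_thread(collection.insert_many, records)
    return len(result.inserted_ids)
```

FastAPI applies the same idea automatically to plain def endpoints, which it runs in a threadpool, so another option is simply to drop the async keyword from handlers that only touch PyMongo.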
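store_file_data upserts the per-file metadata document (the files collection carries a unique index on filename) but appends readings with insert_many, so processing the same file twice would duplicate its rows in sa4cps_energy_data; the monitor's in-memory processed_files set only persists for the life of the process. One way to make ingestion idempotent across restarts is to clear a file's rows before inserting. A sketch; the delete-before-insert policy and the store_file_records helper are assumptions, not what the patch implements:

```python
from datetime import datetime

from pymongo import MongoClient

db = MongoClient("mongodb://localhost:27017")["digitalmente_ingestion"]

def store_file_records(filename: str, records: list) -> int:
    """Replace any rows from a previous run of this file, then insert fresh ones."""
    db.sa4cps_energy_data.delete_many({"filename": filename})
    now = datetime.now()
    for record in records:
        record["filename"] = filename
        record["processed_at"] = now
    if not records:
        return 0
    result = db.sa4cps_energy_data.insert_many(records)
    return len(result.inserted_ids)
```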
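_find_slg_files discovers files by parsing LIST output positionally (parts[4] for the size, parts[-1] for the name), which ties the monitor to Unix-style listing formats, and the new debug output there uses bare print() calls rather than the module logger. Where the server supports MLSD (RFC 3659), the listing comes back as structured facts instead. A sketch, assuming ftp.sa4cps.pt actually advertises MLSD; find_sgl_files is a hypothetical replacement for the parsing loop:

```python
from ftplib import FTP

def find_sgl_files(ftp: FTP, base_path: str) -> list:
    """List .sgl_v2 files via MLSD, which returns machine-readable name/fact pairs."""
    ftp.cwd(base_path)
    files = []
    for name, facts in ftp.mlsd():
        # MLSD facts include "type" ("file", "dir", ...) and usually "size"
        if facts.get("type") == "file" and name.endswith(".sgl_v2"):
            size = int(facts.get("size", 0))
            files.append((f"{base_path.rstrip('/')}/{name}", name, size))
    return files
```

If MLSD is unavailable, the LIST parsing in the patch remains the fallback; routing its debug lines through logger.debug instead of print() keeps them switchable via the logging configuration.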