diff --git a/microservices/api-gateway/main.py b/microservices/api-gateway/main.py index d5edff0..f979381 100644 --- a/microservices/api-gateway/main.py +++ b/microservices/api-gateway/main.py @@ -33,6 +33,9 @@ async def lifespan(app: FastAPI): # Initialize service registry await service_registry.initialize() + # Register all services + await service_registry.register_services(SERVICES) + # Start health check task asyncio.create_task(health_check_task()) @@ -66,51 +69,51 @@ auth_middleware = AuthMiddleware() # Service configuration SERVICES = { - "token-service": ServiceConfig( - name="token-service", - base_url=os.getenv("TOKEN_SERVICE_URL", "http://energy-token-service:8001"), - health_endpoint="/health", - auth_required=False - ), - "battery-service": ServiceConfig( - name="battery-service", - base_url=os.getenv("BATTERY_SERVICE_URL", "http://energy-battery-service:8002"), - health_endpoint="/health", - auth_required=True - ), - "demand-response-service": ServiceConfig( - name="demand-response-service", - base_url=os.getenv("DEMAND_RESPONSE_SERVICE_URL", "http://energy-demand-response-service:8003"), - health_endpoint="/health", - auth_required=True - ), - "p2p-trading-service": ServiceConfig( - name="p2p-trading-service", - base_url=os.getenv("P2P_TRADING_SERVICE_URL", "http://energy-p2p-trading-service:8004"), - health_endpoint="/health", - auth_required=True - ), - "forecasting-service": ServiceConfig( - name="forecasting-service", - base_url=os.getenv("FORECASTING_SERVICE_URL", "http://energy-forecasting-service:8005"), - health_endpoint="/health", - auth_required=True - ), - "iot-control-service": ServiceConfig( - name="iot-control-service", - base_url=os.getenv("IOT_CONTROL_SERVICE_URL", "http://energy-iot-control-service:8006"), - health_endpoint="/health", - auth_required=True - ), + # "token-service": ServiceConfig( + # name="token-service", + # base_url=os.getenv("TOKEN_SERVICE_URL", "http://token-service:8001"), + # health_endpoint="/health", + # auth_required=False + # ), + # "battery-service": ServiceConfig( + # name="battery-service", + # base_url=os.getenv("BATTERY_SERVICE_URL", "http://battery-service:8002"), + # health_endpoint="/health", + # auth_required=True + # ), + # "demand-response-service": ServiceConfig( + # name="demand-response-service", + # base_url=os.getenv("DEMAND_RESPONSE_SERVICE_URL", "http://demand-response-service:8003"), + # health_endpoint="/health", + # auth_required=True + # ), + # "p2p-trading-service": ServiceConfig( + # name="p2p-trading-service", + # base_url=os.getenv("P2P_TRADING_SERVICE_URL", "http://p2p-trading-service:8004"), + # health_endpoint="/health", + # auth_required=True + # ), + # "forecasting-service": ServiceConfig( + # name="forecasting-service", + # base_url=os.getenv("FORECASTING_SERVICE_URL", "http://forecasting-service:8005"), + # health_endpoint="/health", + # auth_required=True + # ), + # "iot-control-service": ServiceConfig( + # name="iot-control-service", + # base_url=os.getenv("IOT_CONTROL_SERVICE_URL", "http://iot-control-service:8006"), + # health_endpoint="/health", + # auth_required=True + # ), "sensor-service": ServiceConfig( name="sensor-service", - base_url=os.getenv("SENSOR_SERVICE_URL", "http://energy-sensor-service:8007"), + base_url=os.getenv("SENSOR_SERVICE_URL", "http://sensor-service:8007"), health_endpoint="/health", auth_required=True ), "data-ingestion-service": ServiceConfig( name="data-ingestion-service", - base_url=os.getenv("DATA_INGESTION_SERVICE_URL", 
"http://energy-data-ingestion-service:8008"), + base_url=os.getenv("DATA_INGESTION_SERVICE_URL", "http://data-ingestion-service:8008"), health_endpoint="/health", auth_required=False ) @@ -437,9 +440,6 @@ async def health_check_task(): logger.error(f"Error in health check task: {e}") await asyncio.sleep(60) -# Initialize service registry with services -asyncio.create_task(service_registry.register_services(SERVICES)) - if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000) diff --git a/microservices/data-ingestion-service/src/config.py b/microservices/data-ingestion-service/src/config.py index 2bdd32a..27ca0b0 100644 --- a/microservices/data-ingestion-service/src/config.py +++ b/microservices/data-ingestion-service/src/config.py @@ -12,8 +12,8 @@ FTP_CONFIG: Dict[str, Any] = { "host": os.getenv("FTP_SA4CPS_HOST", "ftp.sa4cps.pt"), "username": os.getenv("FTP_SA4CPS_USERNAME", "curvascarga@sa4cps.pt"), "password": os.getenv("FTP_SA4CPS_PASSWORD", 'n$WFtz9+bleN'), # Set via environment variable - "base_path": os.getenv("FTP_SA4CPS_REMOTE_PATH", "/SLGs/Faial/"), - "check_interval": int(os.getenv("FTP_CHECK_INTERVAL", "21600")) # 6 hours default + "base_path": os.getenv("FTP_SA4CPS_REMOTE_PATH", "/SLGs/"), + "check_interval": int(os.getenv("FTP_CHECK_INTERVAL", "21600")), # 6 hours default } # MongoDB Configuration diff --git a/microservices/data-ingestion-service/src/database.py b/microservices/data-ingestion-service/src/database.py index dfc1f8d..33bb063 100644 --- a/microservices/data-ingestion-service/src/database.py +++ b/microservices/data-ingestion-service/src/database.py @@ -58,7 +58,7 @@ class DatabaseManager: """Close MongoDB connection""" if self.client: self.client.close() - logger.info("MongoDB connection closed") + logger.debug("MongoDB connection closed") async def ping(self): """Test database connection""" @@ -68,7 +68,7 @@ class DatabaseManager: try: # The ping command is cheap and does not require auth. 
self.client.admin.command('ping') - logger.info("MongoDB ping successful") + logger.debug("MongoDB ping successful") except ConnectionFailure as e: logger.error(f"MongoDB ping failed - Server not available: {e}") raise @@ -121,7 +121,7 @@ class DatabaseManager: if records: result = self.collections['energy_data'].insert_many(records) inserted_count = len(result.inserted_ids) - logger.info(f"Stored {inserted_count} records from {filename}") + logger.debug(f"Stored {inserted_count} records from {filename}") return True return False @@ -163,6 +163,10 @@ class DatabaseManager: logger.error(f"Error getting processed files: {e}") return [] + async def is_file_processed(self, filename: str) -> bool: + """Mock check if file is processed""" + return filename in await self.get_processed_files() + async def get_file_info(self, filename: str) -> Optional[Dict[str, Any]]: """Get information about a specific file""" try: diff --git a/microservices/data-ingestion-service/src/ftp_monitor.py b/microservices/data-ingestion-service/src/ftp_monitor.py index f9a252d..12e03fa 100644 --- a/microservices/data-ingestion-service/src/ftp_monitor.py +++ b/microservices/data-ingestion-service/src/ftp_monitor.py @@ -48,11 +48,30 @@ class FTPMonitor: logger.info(f"FTP Monitor initialized for {self.ftp_host}") + async def initialize_processed_files_cache(self): + """Load already processed files from database into memory cache""" + try: + processed_file_names = await self.db_manager.get_processed_files() + # Convert filenames to full paths and add to processed_files set + for filename in processed_file_names: + # We'll use just the filename as the key since we check by filename + # But we need to be consistent with how we store paths + self.processed_files.add(filename) + + logger.info(f"Loaded {len(processed_file_names)} already processed files from database") + return len(processed_file_names) + except Exception as e: + logger.error(f"Error loading processed files from database: {e}") + return 0 + async def start_monitoring(self): """Start the monitoring loop""" self.status = "running" logger.info("Starting FTP monitoring loop") + # Initialize cache of processed files from database + await self.initialize_processed_files_cache() + while True: try: await self.check_for_new_files() @@ -84,17 +103,35 @@ class FTPMonitor: # Process new files processed_count = 0 + skipped_count = 0 for file_info in new_files: - if file_info.path not in self.processed_files: - success = await self._process_file(ftp, file_info) - if success: - self.processed_files.add(file_info.path) - processed_count += 1 - self.files_processed_count += 1 + # Check if file is already processed (using filename for cache consistency) + if file_info.name in self.processed_files: + logger.debug(f"Skipping already processed file (cached): {file_info.name}") + skipped_count += 1 + continue + + # Double-check with database (in case cache missed something) + if await self.db_manager.is_file_processed(file_info.name): + logger.debug(f"Skipping already processed file (database): {file_info.name}") + # Add to cache to avoid future database checks + self.processed_files.add(file_info.name) + skipped_count += 1 + continue + + # Process the file + logger.debug(f"Processing new file: {file_info.name}") + success = await self._process_file(ftp, file_info) + if success: + self.processed_files.add(file_info.name) + processed_count += 1 + logger.debug(f"Successfully processed file: {file_info.name} ({processed_count} total)") + self.files_processed_count += 1 result = { 
"files_found": len(new_files), "files_processed": processed_count, + "files_skipped": skipped_count, "timestamp": self.last_check.isoformat() } @@ -110,55 +147,115 @@ class FTPMonitor: files = [] try: - # Navigate to base path - ftp.cwd(self.base_path) - logger.info(f"Scanning directory: {self.base_path}") - - # Get directory listing - dir_list = [] - ftp.retrlines('LIST', dir_list.append) - logger.info(f"Received {len(dir_list)} directory entries") - - for line in dir_list: - print(line) - parts = line.split() - if len(parts) >= 9: - filename = parts[-1] - # Check if it's a .slg_v2 file - if filename.endswith('.sgl_v2'): - print('found file') - try: - size = int(parts[4]) - full_path = f"{self.base_path.rstrip('/')}/{filename}" - - files.append(FTPFileInfo( - path=full_path, - name=filename, - size=size - )) - - except (ValueError, IndexError): - logger.warning(f"Could not parse file info for: {filename}") - - logger.info(f"Found {len(files)} .slg_v2 files") + await self._scan_directories_iterative(ftp, self.base_path, files) + logger.info(f"Found {len(files)} .slg_v2 files across all directories") return files - except Exception as e: logger.error(f"Error scanning FTP directory: {e}") return [] + async def _scan_directories_iterative(self, ftp: FTP, base_path: str, files: List[FTPFileInfo]): + """Iteratively scan directories for .slg_v2 files using a queue approach""" + # Queue of directories to scan: (directory_path, depth) + directories_to_scan = [(base_path, 0)] + visited_dirs = set() + + while directories_to_scan: + current_dir, current_depth = directories_to_scan.pop(0) # FIFO queue + + # Normalize directory path + normalized_path = current_dir.rstrip('/') if current_dir != '/' else '/' + + # Skip if already visited (loop prevention) + if normalized_path in visited_dirs: + logger.debug(f"Skipping already visited directory: {normalized_path}") + continue + + # Mark as visited + visited_dirs.add(normalized_path) + logger.debug(f"Scanning directory: {normalized_path} (depth: {current_depth}, queue: {len(directories_to_scan)})") + + try: + # Navigate to directory + original_dir = ftp.pwd() + ftp.cwd(current_dir) + + # Get directory listing + dir_list = [] + ftp.retrlines('LIST', dir_list.append) + logger.debug(f"Found {len(dir_list)} entries in {normalized_path}") + + # Process entries + for line in dir_list: + parts = line.split() + if len(parts) >= 9: + filename = parts[-1] + permissions = parts[0] + + # Skip current and parent directory references + if filename in ['.', '..']: + continue + + # Handle directories + if permissions.startswith('d'): + # Create full subdirectory path + if normalized_path == '/': + subdirectory_path = f"/{filename}" + else: + subdirectory_path = f"{normalized_path}/{filename}" + + # Normalize subdirectory path + subdirectory_normalized = subdirectory_path.rstrip('/') if subdirectory_path != '/' else '/' + + # Add to queue if not already visited + if subdirectory_normalized not in visited_dirs: + directories_to_scan.append((subdirectory_path, current_depth + 1)) + logger.debug(f"Added to queue: {subdirectory_path}") + else: + logger.debug(f"Skipping already visited: {subdirectory_path}") + + # Handle .slg_v2 files + elif filename.endswith('.sgl_v2'): + logger.debug(f"Found .slg_v2 file: {filename} in {normalized_path}") + try: + size = int(parts[4]) + if normalized_path == '/': + full_path = f"/{filename}" + else: + full_path = f"{normalized_path}/{filename}" + + files.append(FTPFileInfo( + path=full_path, + name=filename, + size=size + )) + + except 
(ValueError, IndexError): + logger.warning(f"Could not parse file info for: {filename}") + + # Return to original directory + ftp.cwd(original_dir) + logger.debug(f"Completed scanning: {normalized_path}") + + except Exception as e: + logger.warning(f"Error scanning directory {normalized_path}: {e}") + continue + + logger.info(f"Iterative scan completed. Visited {len(visited_dirs)} directories") + async def _process_file(self, ftp: FTP, file_info: FTPFileInfo) -> bool: """Download and process a .slg_v2 file""" - logger.info(f"Processing file: {file_info.name} ({file_info.size} bytes)") + logger.debug(f"Processing file: {file_info.path} ({file_info.size} bytes)") try: # Create temporary file for download with tempfile.NamedTemporaryFile(mode='wb', suffix='.slg_v2', delete=False) as temp_file: temp_path = temp_file.name - # Download file + # Download file using full path with open(temp_path, 'wb') as f: - ftp.retrbinary(f'RETR {file_info.name}', f.write) + # Use the full path for RETR command + ftp.retrbinary(f'RETR {file_info.path}', f.write) # Process the downloaded file records = await self.processor.process_file(temp_path, file_info.name) @@ -166,7 +263,7 @@ class FTPMonitor: # Store in database if records: await self.db_manager.store_file_data(file_info.name, records) - logger.info(f"Stored {len(records)} records from {file_info.name}") + logger.debug(f"Stored {len(records)} records from {file_info.name}") return True else: logger.warning(f"No valid records found in {file_info.name}") @@ -205,5 +302,5 @@ class FTPMonitor: "processed_files_count": len(self.processed_files), "check_interval_hours": self.check_interval / 3600, "ftp_host": self.ftp_host, - "base_path": self.base_path + "base_path": self.base_path, } diff --git a/microservices/data-ingestion-service/src/main.py b/microservices/data-ingestion-service/src/main.py index aaacbc6..c5a63db 100644 --- a/microservices/data-ingestion-service/src/main.py +++ b/microservices/data-ingestion-service/src/main.py @@ -1,56 +1,43 @@ -""" -SA4CPS Data Ingestion Service -Simple FTP monitoring service for .sgl_v2 files with MongoDB storage -""" - from fastapi import FastAPI, HTTPException from contextlib import asynccontextmanager import asyncio import logging from datetime import datetime -from typing import Dict, Any +from typing import Any from ftp_monitor import FTPMonitor from database import DatabaseManager -# Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -# Global services ftp_monitor = None db_manager = None @asynccontextmanager async def lifespan(app: FastAPI): - """Application lifespan management""" global ftp_monitor, db_manager logger.info("Starting SA4CPS Data Ingestion Service...") - # Initialize database connection db_manager = DatabaseManager() await db_manager.connect() - # Initialize FTP monitor ftp_monitor = FTPMonitor(db_manager) - # Start background monitoring task monitoring_task = asyncio.create_task(ftp_monitor.start_monitoring()) logger.info("Service started successfully") yield - # Cleanup on shutdown logger.info("Shutting down service...") monitoring_task.cancel() await db_manager.close() logger.info("Service shutdown complete") -# Create FastAPI app app = FastAPI( title="SA4CPS Data Ingestion Service", description="Monitors FTP server for .sgl_v2 files and stores data in MongoDB", @@ -61,7 +48,6 @@ app = FastAPI( @app.get("/") async def root(): - """Root endpoint""" return { "service": "SA4CPS Data Ingestion Service", "status": "running", @@ -71,7 +57,6 @@ async def 
root(): @app.get("/health") async def health_check(): - """Health check endpoint""" global ftp_monitor, db_manager health_status = { @@ -101,7 +86,6 @@ async def health_check(): @app.get("/status") async def get_status(): - """Detailed status endpoint""" global ftp_monitor, db_manager if not ftp_monitor: @@ -116,7 +100,6 @@ async def get_status(): @app.post("/trigger-check") async def trigger_manual_check(): - """Manually trigger FTP check""" global ftp_monitor if not ftp_monitor: diff --git a/microservices/data-ingestion-service/tests/test_database_skip.py b/microservices/data-ingestion-service/tests/test_database_skip.py new file mode 100644 index 0000000..d7b67e5 --- /dev/null +++ b/microservices/data-ingestion-service/tests/test_database_skip.py @@ -0,0 +1,357 @@ +#!/usr/bin/env python3 +""" +Test database skip functionality +Tests that already processed files are skipped to avoid reprocessing +""" + +import asyncio +import sys +import os +from pathlib import Path +from unittest.mock import MagicMock, patch, AsyncMock +from typing import List + +# Add src directory to path +sys.path.append(str(Path(__file__).parent.parent / "src")) + +from ftp_monitor import FTPMonitor, FTPFileInfo + + +class MockDatabaseManager: + """Mock database manager for testing skip functionality""" + + def __init__(self): + self.processed_files = set() + self.stored_files = {} + + async def is_file_processed(self, filename: str) -> bool: + """Mock check if file is processed""" + return filename in self.processed_files + + async def get_processed_files(self) -> List[str]: + """Mock get list of processed files""" + return list(self.processed_files) + + async def store_file_data(self, filename: str, records: List) -> bool: + """Mock store file data""" + self.processed_files.add(filename) + self.stored_files[filename] = records + return True + + def mark_as_processed(self, filename: str): + """Helper method to mark file as processed for testing""" + self.processed_files.add(filename) + + +class MockFTP: + """Mock FTP client""" + + def __init__(self, directory_structure): + self.directory_structure = directory_structure + self.current_dir = '/' + + def pwd(self): + return self.current_dir + + def cwd(self, path): + if path in self.directory_structure: + self.current_dir = path + else: + raise Exception(f"Directory not found: {path}") + + def retrlines(self, command, callback): + """Mock LIST command""" + if not command.startswith('LIST'): + raise Exception(f"Unsupported command: {command}") + + current_struct = self.directory_structure.get(self.current_dir, {}) + + # Add files + for filename in current_struct.get('files', []): + callback(f"-rw-r--r-- 1 user group 1024 Jan 01 12:00 {filename}") + + +async def test_skip_already_processed_files(): + """Test that already processed files are skipped""" + print("๐Ÿงช Testing skip already processed files") + print("-" * 40) + + # Create mock directory with files + directory_structure = { + '/': { + 'files': ['file1.sgl_v2', 'file2.sgl_v2', 'file3.sgl_v2'] + } + } + + # Create mock database with some files already processed + mock_db = MockDatabaseManager() + mock_db.mark_as_processed('file1.sgl_v2') # Already processed + mock_db.mark_as_processed('file3.sgl_v2') # Already processed + # file2.sgl_v2 is NOT processed + + with patch('ftp_monitor.FTP_CONFIG', { + 'host': 'test.example.com', + 'username': 'testuser', + 'password': 'testpass', + 'base_path': '/', + 'check_interval': 3600, + 'recursive_scan': False, + 'max_recursion_depth': 5 + }): + # Create FTP monitor with 
mock database + monitor = FTPMonitor(mock_db) + + # Initialize cache from database + cache_count = await monitor.initialize_processed_files_cache() + print(f" Loaded {cache_count} files from database cache") + + # Verify cache was loaded correctly + assert cache_count == 2, f"Expected 2 cached files, got {cache_count}" + assert 'file1.sgl_v2' in monitor.processed_files + assert 'file3.sgl_v2' in monitor.processed_files + assert 'file2.sgl_v2' not in monitor.processed_files + + mock_ftp = MockFTP(directory_structure) + + # Mock the _process_file method to track which files are processed + processed_files = [] + original_process_file = monitor._process_file + + async def mock_process_file(ftp, file_info): + processed_files.append(file_info.name) + return True + + monitor._process_file = mock_process_file + + # Test file processing + result = await monitor.check_for_new_files() + + print(f"โœ… Processing complete") + print(f" Files found: {result['files_found']}") + print(f" Files processed: {result['files_processed']}") + print(f" Files skipped: {result['files_skipped']}") + + # Verify results + assert result['files_found'] == 3, "Should find 3 files total" + assert result['files_processed'] == 1, "Should process only 1 new file" + assert result['files_skipped'] == 2, "Should skip 2 already processed files" + + # Verify only file2.sgl_v2 was processed + assert len(processed_files) == 1, f"Expected 1 processed file, got {len(processed_files)}" + assert 'file2.sgl_v2' in processed_files, "Should process file2.sgl_v2" + + print("โœ… Skip already processed files test passed") + + +async def test_database_lookup_fallback(): + """Test that database lookup works when cache misses""" + print("\n๐Ÿงช Testing database lookup fallback") + print("-" * 40) + + # Create mock directory with files + directory_structure = { + '/': { + 'files': ['new_file.sgl_v2', 'db_only_file.sgl_v2'] + } + } + + # Create mock database + mock_db = MockDatabaseManager() + # Simulate a file that exists in database but not in cache + mock_db.mark_as_processed('db_only_file.sgl_v2') + + with patch('ftp_monitor.FTP_CONFIG', { + 'host': 'test.example.com', + 'username': 'testuser', + 'password': 'testpass', + 'base_path': '/', + 'check_interval': 3600, + 'recursive_scan': False, + 'max_recursion_depth': 5 + }): + monitor = FTPMonitor(mock_db) + + # Don't initialize cache - simulate starting with empty cache + # but database has processed files + + mock_ftp = MockFTP(directory_structure) + + # Mock the _process_file method + processed_files = [] + + async def mock_process_file(ftp, file_info): + processed_files.append(file_info.name) + return True + + monitor._process_file = mock_process_file + + # Test file processing + result = await monitor.check_for_new_files() + + print(f"โœ… Database fallback test complete") + print(f" Files found: {result['files_found']}") + print(f" Files processed: {result['files_processed']}") + print(f" Files skipped: {result['files_skipped']}") + + # Verify results + assert result['files_found'] == 2, "Should find 2 files total" + assert result['files_processed'] == 1, "Should process only 1 new file" + assert result['files_skipped'] == 1, "Should skip 1 database-processed file" + + # Verify only new_file.sgl_v2 was processed + assert len(processed_files) == 1, f"Expected 1 processed file, got {len(processed_files)}" + assert 'new_file.sgl_v2' in processed_files, "Should process new_file.sgl_v2" + + # Verify cache was updated with database file + assert 'db_only_file.sgl_v2' in monitor.processed_files, 
"Cache should be updated with database file" + + print("โœ… Database lookup fallback test passed") + + +async def test_cache_initialization(): + """Test that cache is properly initialized from database""" + print("\n๐Ÿงช Testing cache initialization") + print("-" * 35) + + # Create mock database with processed files + mock_db = MockDatabaseManager() + mock_db.mark_as_processed('old_file1.sgl_v2') + mock_db.mark_as_processed('old_file2.sgl_v2') + mock_db.mark_as_processed('old_file3.sgl_v2') + + with patch('ftp_monitor.FTP_CONFIG', { + 'host': 'test.example.com', + 'username': 'testuser', + 'password': 'testpass', + 'base_path': '/', + 'check_interval': 3600, + 'recursive_scan': False, + 'max_recursion_depth': 5 + }): + monitor = FTPMonitor(mock_db) + + # Verify cache starts empty + assert len(monitor.processed_files) == 0, "Cache should start empty" + + # Initialize cache + cache_count = await monitor.initialize_processed_files_cache() + + print(f"โœ… Cache initialized with {cache_count} files") + + # Verify cache is populated + assert cache_count == 3, f"Expected 3 cached files, got {cache_count}" + assert len(monitor.processed_files) == 3, "Cache should contain 3 files" + + expected_files = {'old_file1.sgl_v2', 'old_file2.sgl_v2', 'old_file3.sgl_v2'} + assert monitor.processed_files == expected_files, "Cache should contain expected files" + + print("โœ… Cache initialization test passed") + + +async def test_performance_with_many_processed_files(): + """Test performance with many already processed files""" + print("\n๐Ÿงช Testing performance with many processed files") + print("-" * 50) + + # Create many files, mostly already processed + all_files = [f"file_{i:04d}.sgl_v2" for i in range(100)] + new_files = [f"new_file_{i}.sgl_v2" for i in range(3)] + + directory_structure = { + '/': { + 'files': all_files + new_files + } + } + + # Create mock database with most files already processed + mock_db = MockDatabaseManager() + for filename in all_files: + mock_db.mark_as_processed(filename) + + with patch('ftp_monitor.FTP_CONFIG', { + 'host': 'test.example.com', + 'username': 'testuser', + 'password': 'testpass', + 'base_path': '/', + 'check_interval': 3600, + 'recursive_scan': False, + 'max_recursion_depth': 5 + }): + monitor = FTPMonitor(mock_db) + + # Initialize cache + cache_count = await monitor.initialize_processed_files_cache() + print(f" Loaded {cache_count} files into cache") + + mock_ftp = MockFTP(directory_structure) + + # Mock the _process_file method to track processing + processed_files = [] + db_lookups = 0 + + # Track database lookups + original_is_file_processed = mock_db.is_file_processed + + async def tracked_is_file_processed(filename): + nonlocal db_lookups + db_lookups += 1 + return await original_is_file_processed(filename) + + mock_db.is_file_processed = tracked_is_file_processed + + async def mock_process_file(ftp, file_info): + processed_files.append(file_info.name) + return True + + monitor._process_file = mock_process_file + + # Test file processing + result = await monitor.check_for_new_files() + + print(f"โœ… Performance test complete") + print(f" Files found: {result['files_found']}") + print(f" Files processed: {result['files_processed']}") + print(f" Files skipped: {result['files_skipped']}") + print(f" Database lookups: {db_lookups}") + + # Verify results + assert result['files_found'] == 103, "Should find 103 files total" + assert result['files_processed'] == 3, "Should process only 3 new files" + assert result['files_skipped'] == 100, "Should skip 100 already 
processed files" + + # Verify performance: should have minimal database lookups due to caching + assert db_lookups == 3, f"Should have only 3 database lookups (for new files), got {db_lookups}" + + # Verify only new files were processed + assert len(processed_files) == 3, f"Expected 3 processed files, got {len(processed_files)}" + for new_file in new_files: + assert new_file in processed_files, f"Should process {new_file}" + + print("โœ… Performance test passed") + + +async def main(): + """Main test function""" + print("๐Ÿš€ Database Skip Functionality Test Suite") + print("=" * 50) + + try: + await test_skip_already_processed_files() + await test_database_lookup_fallback() + await test_cache_initialization() + await test_performance_with_many_processed_files() + + print("\n" + "=" * 50) + print("โœ… All database skip tests passed!") + print("๐Ÿ’พ File duplication prevention is working correctly") + print("๐Ÿš€ Performance optimizations are effective") + + except Exception as e: + print(f"\nโŒ Test failed: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/microservices/data-ingestion-service/tests/test_iterative_scan.py b/microservices/data-ingestion-service/tests/test_iterative_scan.py new file mode 100644 index 0000000..42fa98f --- /dev/null +++ b/microservices/data-ingestion-service/tests/test_iterative_scan.py @@ -0,0 +1,330 @@ +#!/usr/bin/env python3 +""" +Test FTP Monitor iterative directory scanning +Tests the new queue-based approach that prevents infinite loops +""" + +import asyncio +import sys +import os +from pathlib import Path +from unittest.mock import MagicMock, patch +from typing import List + +# Add src directory to path +sys.path.append(str(Path(__file__).parent.parent / "src")) + +from ftp_monitor import FTPMonitor, FTPFileInfo + + +class MockFTP: + """Mock FTP client for testing iterative scanning""" + + def __init__(self, directory_structure): + self.directory_structure = directory_structure + self.current_dir = '/' + self.operations_log = [] # Track all operations for debugging + + def pwd(self): + return self.current_dir + + def cwd(self, path): + self.operations_log.append(f"CWD: {path}") + if path in self.directory_structure: + self.current_dir = path + else: + raise Exception(f"Directory not found: {path}") + + def retrlines(self, command, callback): + """Mock LIST command""" + if not command.startswith('LIST'): + raise Exception(f"Unsupported command: {command}") + + self.operations_log.append(f"LIST: {self.current_dir}") + current_struct = self.directory_structure.get(self.current_dir, {}) + + # Add directories + for dirname in current_struct.get('directories', {}): + callback(f"drwxr-xr-x 2 user group 4096 Jan 01 12:00 {dirname}") + + # Add files + for filename in current_struct.get('files', []): + callback(f"-rw-r--r-- 1 user group 1024 Jan 01 12:00 {filename}") + + +async def test_simple_directory_structure(): + """Test iterative scanning with simple nested structure""" + print("๐Ÿงช Testing simple directory structure") + print("-" * 40) + + directory_structure = { + '/': { + 'files': ['root.sgl_v2'], + 'directories': { + 'level1': {}, + 'level2': {} + } + }, + '/level1': { + 'files': ['file1.sgl_v2'], + 'directories': { + 'nested': {} + } + }, + '/level1/nested': { + 'files': ['nested.sgl_v2'], + 'directories': {} + }, + '/level2': { + 'files': ['file2.sgl_v2'], + 'directories': {} + } + } + + mock_db = MagicMock() + + with 
patch('ftp_monitor.FTP_CONFIG', { + 'host': 'test.example.com', + 'username': 'testuser', + 'password': 'testpass', + 'base_path': '/', + 'check_interval': 3600, + 'recursive_scan': True, + 'max_recursion_depth': 10 + }): + monitor = FTPMonitor(mock_db) + mock_ftp = MockFTP(directory_structure) + + # Test iterative scan + files = [] + await monitor._scan_directories_iterative(mock_ftp, '/', files) + + print(f"โœ… Found {len(files)} files") + print(f" Operations: {len(mock_ftp.operations_log)}") + + # Verify all files were found + file_names = [f.name for f in files] + expected_files = ['root.sgl_v2', 'file1.sgl_v2', 'nested.sgl_v2', 'file2.sgl_v2'] + + assert len(files) == 4, f"Expected 4 files, got {len(files)}" + for expected_file in expected_files: + assert expected_file in file_names, f"Missing file: {expected_file}" + + # Check that operations are reasonable (no infinite loops) + assert len(mock_ftp.operations_log) < 20, f"Too many operations: {len(mock_ftp.operations_log)}" + + print("โœ… Simple structure test passed") + + +async def test_circular_references(): + """Test that circular references are handled correctly""" + print("\n๐Ÿงช Testing circular references") + print("-" * 40) + + # Create structure with circular reference + directory_structure = { + '/': { + 'files': ['root.sgl_v2'], + 'directories': { + 'dirA': {} + } + }, + '/dirA': { + 'files': ['fileA.sgl_v2'], + 'directories': { + 'dirB': {} + } + }, + '/dirA/dirB': { + 'files': ['fileB.sgl_v2'], + 'directories': { + 'dirA': {} # This would create A -> B -> A loop in recursive approach + } + } + } + + mock_db = MagicMock() + + with patch('ftp_monitor.FTP_CONFIG', { + 'host': 'test.example.com', + 'username': 'testuser', + 'password': 'testpass', + 'base_path': '/', + 'check_interval': 3600, + 'recursive_scan': True, + 'max_recursion_depth': 5 + }): + monitor = FTPMonitor(mock_db) + mock_ftp = MockFTP(directory_structure) + + # Test iterative scan + files = [] + await monitor._scan_directories_iterative(mock_ftp, '/', files) + + print(f"โœ… Handled circular references") + print(f" Files found: {len(files)}") + print(f" Operations: {len(mock_ftp.operations_log)}") + + # Should find all files without getting stuck + file_names = [f.name for f in files] + expected_files = ['root.sgl_v2', 'fileA.sgl_v2', 'fileB.sgl_v2'] + + assert len(files) == 3, f"Expected 3 files, got {len(files)}" + for expected_file in expected_files: + assert expected_file in file_names, f"Missing file: {expected_file}" + + # Should not have excessive operations (indicating no infinite loop) + assert len(mock_ftp.operations_log) < 15, f"Too many operations: {len(mock_ftp.operations_log)}" + + print("โœ… Circular references test passed") + + +async def test_deep_structure_with_limit(): + """Test deep directory structure respects depth limit""" + print("\n๐Ÿงช Testing deep structure with depth limit") + print("-" * 45) + + # Create deep structure + directory_structure = { + '/': { + 'files': ['root.sgl_v2'], + 'directories': { + 'level1': {} + } + }, + '/level1': { + 'files': ['file1.sgl_v2'], + 'directories': { + 'level2': {} + } + }, + '/level1/level2': { + 'files': ['file2.sgl_v2'], + 'directories': { + 'level3': {} + } + }, + '/level1/level2/level3': { + 'files': ['deep_file.sgl_v2'], # Should not be found due to depth limit + 'directories': {} + } + } + + mock_db = MagicMock() + + # Set low depth limit + with patch('ftp_monitor.FTP_CONFIG', { + 'host': 'test.example.com', + 'username': 'testuser', + 'password': 'testpass', + 'base_path': '/', + 
'check_interval': 3600, + 'recursive_scan': True, + 'max_recursion_depth': 2 # Should stop at level 2 + }): + monitor = FTPMonitor(mock_db) + mock_ftp = MockFTP(directory_structure) + + # Test iterative scan with depth limit + files = [] + await monitor._scan_directories_iterative(mock_ftp, '/', files) + + print(f"โœ… Depth limit respected") + print(f" Files found: {len(files)}") + + # Should find files up to depth 2, but not deeper + file_names = [f.name for f in files] + + assert 'root.sgl_v2' in file_names, "Should find root file (depth 0)" + assert 'file1.sgl_v2' in file_names, "Should find level 1 file (depth 1)" + assert 'file2.sgl_v2' in file_names, "Should find level 2 file (depth 2)" + assert 'deep_file.sgl_v2' not in file_names, "Should NOT find deep file (depth 3)" + + print("โœ… Depth limit test passed") + + +async def test_queue_behavior(): + """Test that the queue processes directories in FIFO order""" + print("\n๐Ÿงช Testing queue FIFO behavior") + print("-" * 35) + + directory_structure = { + '/': { + 'files': [], + 'directories': { + 'first': {}, + 'second': {}, + 'third': {} + } + }, + '/first': { + 'files': ['first.sgl_v2'], + 'directories': {} + }, + '/second': { + 'files': ['second.sgl_v2'], + 'directories': {} + }, + '/third': { + 'files': ['third.sgl_v2'], + 'directories': {} + } + } + + mock_db = MagicMock() + + with patch('ftp_monitor.FTP_CONFIG', { + 'host': 'test.example.com', + 'username': 'testuser', + 'password': 'testpass', + 'base_path': '/', + 'check_interval': 3600, + 'recursive_scan': True, + 'max_recursion_depth': 5 + }): + monitor = FTPMonitor(mock_db) + mock_ftp = MockFTP(directory_structure) + + # Test iterative scan + files = [] + await monitor._scan_directories_iterative(mock_ftp, '/', files) + + print(f"โœ… Queue behavior test completed") + print(f" Files found: {len(files)}") + + # Should find all files + assert len(files) == 3, f"Expected 3 files, got {len(files)}" + + file_names = [f.name for f in files] + expected_files = ['first.sgl_v2', 'second.sgl_v2', 'third.sgl_v2'] + + for expected_file in expected_files: + assert expected_file in file_names, f"Missing file: {expected_file}" + + print("โœ… Queue behavior test passed") + + +async def main(): + """Main test function""" + print("๐Ÿš€ FTP Monitor Iterative Scanning Test Suite") + print("=" * 55) + + try: + await test_simple_directory_structure() + await test_circular_references() + await test_deep_structure_with_limit() + await test_queue_behavior() + + print("\n" + "=" * 55) + print("โœ… All iterative scanning tests passed!") + print("๐Ÿ”„ Queue-based approach is working correctly") + + except Exception as e: + print(f"\nโŒ Test failed: {e}") + import traceback + traceback.print_exc() + sys.exit(1) + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/microservices/docker-compose.yml b/microservices/docker-compose.yml index 18a7628..33f695a 100644 --- a/microservices/docker-compose.yml +++ b/microservices/docker-compose.yml @@ -4,7 +4,7 @@ services: # Database Services mongodb: image: mongo:5.0 - container_name: energy-mongodb + container_name: mongodb restart: unless-stopped environment: MONGO_INITDB_ROOT_USERNAME: admin @@ -19,7 +19,7 @@ services: redis: image: redis:7-alpine - container_name: energy-redis + container_name: redis restart: unless-stopped ports: - "6379:6379" @@ -33,7 +33,7 @@ services: build: context: ./api-gateway dockerfile: Dockerfile - container_name: energy-api-gateway + container_name: api-gateway restart: unless-stopped 
ports: - "8000:8000" @@ -51,7 +51,7 @@ services: depends_on: - mongodb - redis - - token-service + # - token-service - sensor-service - data-ingestion-service # - battery-service @@ -60,28 +60,28 @@ services: - energy-network # Token Management Service - token-service: - build: - context: ./token-service - dockerfile: Dockerfile - container_name: energy-token-service - restart: unless-stopped - ports: - - "8001:8001" - environment: - - MONGO_URL=mongodb://admin:password123@localhost:27017/energy_dashboard_tokens?authSource=admin - - JWT_SECRET_KEY=your-super-secret-jwt-key-change-in-production - depends_on: - - mongodb - networks: - - energy-network + # token-service: + # build: + # context: ./token-service + # dockerfile: Dockerfile + # container_name: token-service + # restart: unless-stopped + # ports: + # - "8001:8001" + # environment: + # - MONGO_URL=mongodb://admin:password123@localhost:27017/energy_dashboard_tokens?authSource=admin + # - JWT_SECRET_KEY=your-super-secret-jwt-key-change-in-production + # depends_on: + # - mongodb + # networks: + # - energy-network # Battery Management Service # battery-service: # build: # context: ./battery-service # dockerfile: Dockerfile - # container_name: energy-battery-service + # container_name: battery-service # restart: unless-stopped # ports: # - "8002:8002" @@ -99,7 +99,7 @@ services: # build: # context: ./demand-response-service # dockerfile: Dockerfile - # container_name: energy-demand-response-service + # container_name: demand-response-service # restart: unless-stopped # ports: # - "8003:8003" @@ -118,7 +118,7 @@ services: # build: # context: ./p2p-trading-service # dockerfile: Dockerfile - # container_name: energy-p2p-trading-service + # container_name: p2p-trading-service # restart: unless-stopped # ports: # - "8004:8004" @@ -136,7 +136,7 @@ services: # build: # context: ./forecasting-service # dockerfile: Dockerfile - # container_name: energy-forecasting-service + # container_name: forecasting-service # restart: unless-stopped # ports: # - "8005:8005" @@ -154,7 +154,7 @@ services: # build: # context: ./iot-control-service # dockerfile: Dockerfile - # container_name: energy-iot-control-service + # container_name: iot-control-service # restart: unless-stopped # ports: # - "8006:8006" @@ -174,7 +174,7 @@ services: build: context: ./data-ingestion-service dockerfile: Dockerfile - container_name: energy-data-ingestion-service + container_name: data-ingestion-service restart: unless-stopped ports: - "8008:8008" @@ -183,8 +183,7 @@ services: - FTP_SA4CPS_HOST=ftp.sa4cps.pt - FTP_SA4CPS_PORT=21 - FTP_SA4CPS_USERNAME=curvascarga@sa4cps.pt - - FTP_SA4CPS_PASSWORD=n$WFtz9+bleN - - FTP_SA4CPS_REMOTE_PATH=/ + - FTP_SA4CPS_REMOTE_PATH=/SLGs/ - FTP_CHECK_INTERVAL=21600 depends_on: - mongodb @@ -196,7 +195,7 @@ services: build: context: ./sensor-service dockerfile: Dockerfile - container_name: energy-sensor-service + container_name: sensor-service restart: unless-stopped ports: - "8007:8007" @@ -213,7 +212,7 @@ services: # Monitoring and Management nginx: image: nginx:alpine - container_name: energy-nginx + container_name: nginx restart: unless-stopped ports: - "80:80"
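
For context on the gateway change: `service_registry.register_services(SERVICES)` is now awaited inside the `lifespan` handler, replacing the removed module-level `asyncio.create_task(...)`, which runs at import time when no event loop exists yet. A minimal sketch of the resulting startup order, assuming FastAPI is available; the `ServiceRegistry` internals here are illustrative, only the call sequence mirrors `main.py`:

```python
from contextlib import asynccontextmanager
from fastapi import FastAPI

class ServiceRegistry:
    """Illustrative stand-in; the real registry lives in the gateway's own modules."""
    def __init__(self) -> None:
        self.services: dict = {}

    async def initialize(self) -> None:
        self.services.clear()

    async def register_services(self, configs: dict) -> None:
        # Keyed by service name so the proxy layer can look up base_url, auth flags, etc.
        self.services.update(configs)

service_registry = ServiceRegistry()
SERVICES: dict = {}  # populated with ServiceConfig entries, as in main.py

@asynccontextmanager
async def lifespan(app: FastAPI):
    # uvicorn has already started the event loop here, so plain awaits are safe;
    # a module-level asyncio.create_task(...) would fail with "no running event loop".
    await service_registry.initialize()
    await service_registry.register_services(SERVICES)
    yield
    # shutdown cleanup would go here

app = FastAPI(lifespan=lifespan)
```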
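The duplicate-file handling added to `ftp_monitor.py` is two-level: an in-memory `processed_files` set seeded from MongoDB by `initialize_processed_files_cache()`, with `db_manager.is_file_processed()` as a fallback on cache misses (the path whose lookup count the performance test asserts on). A self-contained sketch of that check, using an illustrative in-memory stand-in for the database; only the two method names match the diff:

```python
import asyncio
from typing import Iterable, List, Set

class FakeDB:
    """In-memory stand-in exposing the two methods the monitor relies on."""
    def __init__(self, already_done: Iterable[str]) -> None:
        self._done = set(already_done)

    async def get_processed_files(self) -> List[str]:
        return list(self._done)

    async def is_file_processed(self, filename: str) -> bool:
        return filename in self._done

async def filter_new_files(db: FakeDB, cache: Set[str], candidates: Iterable[str]) -> List[str]:
    new_files = []
    for name in candidates:
        if name in cache:                     # level 1: in-memory cache hit
            continue
        if await db.is_file_processed(name):  # level 2: database fallback
            cache.add(name)                   # warm the cache so the next pass skips the DB call
            continue
        new_files.append(name)
    return new_files

async def demo() -> None:
    db = FakeDB(["file1.sgl_v2", "file3.sgl_v2"])
    cache = set(await db.get_processed_files())  # mirrors initialize_processed_files_cache()
    print(await filter_new_files(db, cache, ["file1.sgl_v2", "file2.sgl_v2", "file3.sgl_v2"]))
    # -> ['file2.sgl_v2']

asyncio.run(demo())
```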
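The directory walk is likewise reworked: `_scan_directories_iterative()` uses a FIFO queue plus a visited set instead of recursion, which is what the circular-reference test exercises. A standalone sketch of just that traversal logic over a dict-based fake tree (no `ftplib`, no depth limit; the names and tree structure here are illustrative):

```python
from collections import deque

def scan(tree: dict, start: str = "/", suffix: str = ".sgl_v2") -> list:
    """Breadth-first scan with loop prevention, mirroring the queue/visited-set idea."""
    found, visited = [], set()
    queue = deque([start])                 # FIFO queue of directory paths
    while queue:
        path = queue.popleft()
        if path in visited:                # skip directories we have already listed
            continue
        visited.add(path)
        entry = tree.get(path, {"files": [], "dirs": []})
        found += [f for f in entry["files"] if f.endswith(suffix)]
        queue.extend(entry["dirs"])        # subdirectories given as target paths
    return found

# 'dirs' entries point at other keys, so /a -> /a/b -> /a is a genuine cycle.
tree = {
    "/":    {"files": ["root.sgl_v2"],  "dirs": ["/a"]},
    "/a":   {"files": ["fileA.sgl_v2"], "dirs": ["/a/b"]},
    "/a/b": {"files": ["fileB.sgl_v2"], "dirs": ["/a"]},   # back-edge
}
print(scan(tree))  # ['root.sgl_v2', 'fileA.sgl_v2', 'fileB.sgl_v2']
```

Without the visited set, the back-edge from `/a/b` to `/a` would keep requeuing the same directories; with it, each directory is listed exactly once, which is the behavior the `test_circular_references` case checks by bounding the operation count.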