diff --git a/microservices/data-ingestion-service/Dockerfile b/microservices/data-ingestion-service/Dockerfile
index af25270..45ad51c 100644
--- a/microservices/data-ingestion-service/Dockerfile
+++ b/microservices/data-ingestion-service/Dockerfile
@@ -10,10 +10,10 @@ WORKDIR /app
 
 # Install system dependencies
 RUN apt-get update \
     && apt-get install -y --no-install-recommends \
-       build-essential \
-       curl \
-       libssl-dev \
-       libffi-dev \
+        build-essential \
+        curl \
+        libssl-dev \
+        libffi-dev \
     && rm -rf /var/lib/apt/lists/*
 
 # Copy requirements and install Python dependencies
@@ -34,10 +34,10 @@ ENV PYTHONPATH="/app/src:$PYTHONPATH"
 
 # Expose port
 EXPOSE 8008
 
-# Health check
-HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
+# Health check - allow more time for service initialization
+HEALTHCHECK --interval=30s --timeout=10s --start-period=120s --retries=5 \
     CMD curl -f http://localhost:8008/health || exit 1
 
 # Start the application from src directory
 WORKDIR /app/src
-CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8008"]
\ No newline at end of file
+CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8008"]
diff --git a/microservices/data-ingestion-service/src/config.py b/microservices/data-ingestion-service/src/config.py
index 27ca0b0..dbcb293 100644
--- a/microservices/data-ingestion-service/src/config.py
+++ b/microservices/data-ingestion-service/src/config.py
@@ -14,6 +14,7 @@ FTP_CONFIG: Dict[str, Any] = {
     "password": os.getenv("FTP_SA4CPS_PASSWORD", 'n$WFtz9+bleN'),  # Set via environment variable
     "base_path": os.getenv("FTP_SA4CPS_REMOTE_PATH", "/SLGs/"),
     "check_interval": int(os.getenv("FTP_CHECK_INTERVAL", "21600")),  # 6 hours default
+    "skip_initial_scan": os.getenv("FTP_SKIP_INITIAL_SCAN", "true").lower() == "true",
 }
 
 # MongoDB Configuration
diff --git a/microservices/data-ingestion-service/src/database.py b/microservices/data-ingestion-service/src/database.py
index 33bb063..a75346d 100644
--- a/microservices/data-ingestion-service/src/database.py
+++ b/microservices/data-ingestion-service/src/database.py
@@ -1,8 +1,3 @@
-"""
-MongoDB Database Manager for SA4CPS Data Ingestion
-Simple sync MongoDB operations for storing .sgl_v2 file data
-"""
-
 import logging
 from datetime import datetime
 from typing import List, Dict, Any, Optional
@@ -15,37 +10,32 @@ logger = logging.getLogger(__name__)
 
 
 class DatabaseManager:
-    """Manages MongoDB connections and operations for SA4CPS data"""
 
     def __init__(self):
         self.client: Optional[MongoClient] = None
         self.db = None
         self.collections = {}
 
-        # MongoDB configuration
        self.connection_string = MONGO_CONFIG["connection_string"]
         self.database_name = MONGO_CONFIG["database_name"]
 
         logger.info(f"Database manager initialized for: {self.database_name}")
 
     async def connect(self):
-        """Connect to MongoDB"""
         try:
             logger.info(f"Connecting to MongoDB at: {self.connection_string}")
             self.client = MongoClient(self.connection_string, serverSelectionTimeoutMS=5000)
 
-            # Test connection
             await self.ping()
 
-            # Get database and collections
             self.db = self.client[self.database_name]
             self.collections = {
                 'files': self.db.sa4cps_files,
                 'energy_data': self.db.sa4cps_energy_data,
-                'metadata': self.db.sa4cps_metadata
+                'metadata': self.db.sa4cps_metadata,
+                'scanned_directories': self.db.sa4cps_scanned_directories
             }
 
-            # Create indexes for better performance
             self._create_indexes()
 
             logger.info(f"Connected to MongoDB database: {self.database_name}")
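Note on the config.py change above: the boolean parsing accepts only the literal string "true", so values like "1", "yes", or "on" silently disable the flag. Also, the committed fallback password contradicts its own "# Set via environment variable" comment; an empty default would fail loudly instead of shipping a credential in source. A small helper (hypothetical, not part of this diff) makes the flag parsing more forgiving:

    import os

    def env_flag(name: str, default: bool = True) -> bool:
        """Parse a boolean environment variable, accepting common truthy spellings."""
        value = os.getenv(name)
        if value is None:
            return default
        return value.strip().lower() in ("1", "true", "yes", "on")

    # e.g. "skip_initial_scan": env_flag("FTP_SKIP_INITIAL_SCAN", default=True)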
connection") try: - # The ping command is cheap and does not require auth. - self.client.admin.command('ping') + # Use async approach with timeout + import asyncio + import concurrent.futures + + # Run the ping command in a thread pool to avoid blocking + loop = asyncio.get_event_loop() + with concurrent.futures.ThreadPoolExecutor() as pool: + await asyncio.wait_for( + loop.run_in_executor(pool, self.client.admin.command, 'ping'), + timeout=3.0 # 3 second timeout for ping + ) logger.debug("MongoDB ping successful") + except asyncio.TimeoutError: + logger.error("MongoDB ping timeout after 3 seconds") + raise ConnectionFailure("MongoDB ping timeout") except ConnectionFailure as e: logger.error(f"MongoDB ping failed - Server not available: {e}") raise @@ -77,22 +79,22 @@ class DatabaseManager: raise ConnectionFailure(f"Ping failed: {e}") def _create_indexes(self): - """Create database indexes for efficient queries""" try: - # Index on files collection self.collections['files'].create_index("filename", unique=True) self.collections['files'].create_index("processed_at") - # Index on energy data collection self.collections['energy_data'].create_index([("filename", 1), ("timestamp", 1)]) self.collections['energy_data'].create_index("timestamp") + self.collections['scanned_directories'].create_index("directory_path", unique=True) + self.collections['scanned_directories'].create_index("last_scanned") + self.collections['scanned_directories'].create_index("scan_status") + logger.info("Database indexes created successfully") except Exception as e: logger.warning(f"Failed to create indexes: {e}") async def store_file_data(self, filename: str, records: List[Dict[str, Any]]) -> bool: - """Store processed .sgl_v2 file data in MongoDB""" try: current_time = datetime.now() @@ -175,6 +177,77 @@ class DatabaseManager: logger.error(f"Error getting file info for {filename}: {e}") return None + # Directory scanning tracking methods + async def is_directory_scanned(self, directory_path: str, since_timestamp: datetime = None) -> bool: + """Check if directory has been scanned recently""" + try: + query = {"directory_path": directory_path, "scan_status": "complete"} + if since_timestamp: + query["last_scanned"] = {"$gte": since_timestamp} + + result = self.collections['scanned_directories'].find_one(query) + return result is not None + except Exception as e: + logger.error(f"Error checking directory scan status for {directory_path}: {e}") + return False + + async def mark_directory_scanned(self, directory_path: str, file_count: int, ftp_last_modified: datetime = None) -> bool: + """Mark directory as scanned with current timestamp""" + try: + scan_record = { + "directory_path": directory_path, + "last_scanned": datetime.now(), + "file_count": file_count, + "scan_status": "complete" + } + + if ftp_last_modified: + scan_record["ftp_last_modified"] = ftp_last_modified + + # Use upsert to update existing or create new record + self.collections['scanned_directories'].replace_one( + {"directory_path": directory_path}, + scan_record, + upsert=True + ) + + logger.debug(f"Marked directory as scanned: {directory_path} ({file_count} files)") + return True + + except Exception as e: + logger.error(f"Error marking directory as scanned {directory_path}: {e}") + return False + + async def get_scanned_directories(self) -> List[Dict[str, Any]]: + """Get all scanned directory records""" + try: + cursor = self.collections['scanned_directories'].find() + return list(cursor) + except Exception as e: + logger.error(f"Error getting 
@@ -77,22 +79,22 @@ class DatabaseManager:
             raise ConnectionFailure(f"Ping failed: {e}")
 
     def _create_indexes(self):
-        """Create database indexes for efficient queries"""
         try:
-            # Index on files collection
             self.collections['files'].create_index("filename", unique=True)
             self.collections['files'].create_index("processed_at")
 
-            # Index on energy data collection
             self.collections['energy_data'].create_index([("filename", 1), ("timestamp", 1)])
             self.collections['energy_data'].create_index("timestamp")
 
+            self.collections['scanned_directories'].create_index("directory_path", unique=True)
+            self.collections['scanned_directories'].create_index("last_scanned")
+            self.collections['scanned_directories'].create_index("scan_status")
+
             logger.info("Database indexes created successfully")
 
         except Exception as e:
             logger.warning(f"Failed to create indexes: {e}")
 
     async def store_file_data(self, filename: str, records: List[Dict[str, Any]]) -> bool:
-        """Store processed .sgl_v2 file data in MongoDB"""
         try:
             current_time = datetime.now()
@@ -175,6 +177,77 @@ class DatabaseManager:
             logger.error(f"Error getting file info for {filename}: {e}")
             return None
 
+    # Directory scanning tracking methods
+    async def is_directory_scanned(self, directory_path: str, since_timestamp: datetime = None) -> bool:
+        """Check if directory has been scanned recently"""
+        try:
+            query = {"directory_path": directory_path, "scan_status": "complete"}
+            if since_timestamp:
+                query["last_scanned"] = {"$gte": since_timestamp}
+
+            result = self.collections['scanned_directories'].find_one(query)
+            return result is not None
+        except Exception as e:
+            logger.error(f"Error checking directory scan status for {directory_path}: {e}")
+            return False
+
+    async def mark_directory_scanned(self, directory_path: str, file_count: int, ftp_last_modified: datetime = None) -> bool:
+        """Mark directory as scanned with current timestamp"""
+        try:
+            scan_record = {
+                "directory_path": directory_path,
+                "last_scanned": datetime.now(),
+                "file_count": file_count,
+                "scan_status": "complete"
+            }
+
+            if ftp_last_modified:
+                scan_record["ftp_last_modified"] = ftp_last_modified
+
+            # Use upsert to update existing or create new record
+            self.collections['scanned_directories'].replace_one(
+                {"directory_path": directory_path},
+                scan_record,
+                upsert=True
+            )
+
+            logger.debug(f"Marked directory as scanned: {directory_path} ({file_count} files)")
+            return True
+
+        except Exception as e:
+            logger.error(f"Error marking directory as scanned {directory_path}: {e}")
+            return False
+
+    async def get_scanned_directories(self) -> List[Dict[str, Any]]:
+        """Get all scanned directory records"""
+        try:
+            # Exclude MongoDB's ObjectId so the records are JSON-serializable
+            cursor = self.collections['scanned_directories'].find({}, {"_id": 0})
+            return list(cursor)
+        except Exception as e:
+            logger.error(f"Error getting scanned directories: {e}")
+            return []
+
+    async def should_skip_directory(self, directory_path: str, ftp_last_modified: datetime = None) -> bool:
+        """Determine if directory should be skipped based on scan history and modification time"""
+        try:
+            scan_record = self.collections['scanned_directories'].find_one(
+                {"directory_path": directory_path, "scan_status": "complete"}
+            )
+
+            if not scan_record:
+                return False  # Never scanned, should scan
+
+            # Skip only if the directory has not been modified since our last scan
+            if ftp_last_modified and scan_record.get("last_scanned"):
+                return ftp_last_modified <= scan_record["last_scanned"]
+
+            # If directory was scanned successfully, skip it (assuming it's historical data)
+            return True
+
+        except Exception as e:
+            logger.error(f"Error determining if directory should be skipped {directory_path}: {e}")
+            return False
+
     async def get_stats(self) -> Dict[str, Any]:
         """Get database statistics"""
         try:
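One design consequence of the methods above: a directory with scan_status "complete" is skipped forever unless callers pass an FTP modification time, so new files landing in an old directory are only picked up after a manual DELETE /scan-cache (added in main.py below). If periodic re-scanning is acceptable, a MongoDB TTL index on last_scanned would expire cache entries automatically; a sketch, assuming a 30-day re-scan window is tolerable:

    # Hypothetical alternative inside _create_indexes(): a TTL index so scan
    # records expire and directories get re-scanned without manual clearing.
    self.collections['scanned_directories'].create_index(
        "last_scanned",
        expireAfterSeconds=30 * 24 * 3600,
    )

The existing plain index on last_scanned would have to be dropped first, since MongoDB rejects a second index on the same key with different options.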
diff --git a/microservices/data-ingestion-service/src/ftp_monitor.py b/microservices/data-ingestion-service/src/ftp_monitor.py
index 12e03fa..79556fd 100644
--- a/microservices/data-ingestion-service/src/ftp_monitor.py
+++ b/microservices/data-ingestion-service/src/ftp_monitor.py
@@ -1,9 +1,3 @@
-#!/usr/bin/env python3
-"""
-FTP Monitor for SA4CPS .slg_v2 files
-Monitors ftp.sa4cps.pt for new monthly files
-"""
-
 import asyncio
 from ftplib import FTP
 import logging
@@ -18,10 +12,8 @@ from slg_processor import SLGProcessor
 
 logger = logging.getLogger(__name__)
 
-
 @dataclass
 class FTPFileInfo:
-    """Information about an FTP file"""
     path: str
     name: str
     size: int
@@ -29,8 +21,6 @@ class FTPMonitor:
 
 class FTPMonitor:
-    """Monitors SA4CPS FTP server for new .slg_v2 files"""
-
     def __init__(self, db_manager):
         self.db_manager = db_manager
         self.processor = SLGProcessor()
@@ -39,87 +29,129 @@ class FTPMonitor:
         self.files_processed_count = 0
         self.status = "initializing"
 
-        # FTP connection settings
         self.ftp_host = FTP_CONFIG["host"]
         self.ftp_user = FTP_CONFIG["username"]
         self.ftp_pass = FTP_CONFIG["password"]
         self.base_path = FTP_CONFIG["base_path"]
         self.check_interval = FTP_CONFIG["check_interval"]
+        self.skip_initial_scan = FTP_CONFIG["skip_initial_scan"]
 
         logger.info(f"FTP Monitor initialized for {self.ftp_host}")
 
     async def initialize_processed_files_cache(self):
-        """Load already processed files from database into memory cache"""
         try:
-            processed_file_names = await self.db_manager.get_processed_files()
-            # Convert filenames to full paths and add to processed_files set
+            # Add timeout to prevent blocking startup indefinitely
+            processed_file_names = await asyncio.wait_for(
+                self.db_manager.get_processed_files(),
+                timeout=10.0  # 10 second timeout
+            )
+
             for filename in processed_file_names:
-                # We'll use just the filename as the key since we check by filename
-                # But we need to be consistent with how we store paths
                 self.processed_files.add(filename)
 
             logger.info(f"Loaded {len(processed_file_names)} already processed files from database")
             return len(processed_file_names)
+        except asyncio.TimeoutError:
+            logger.warning("Timeout loading processed files cache - continuing with empty cache")
+            return 0
         except Exception as e:
             logger.error(f"Error loading processed files from database: {e}")
             return 0
 
     async def start_monitoring(self):
-        """Start the monitoring loop"""
-        self.status = "running"
+        self.status = "initializing"
         logger.info("Starting FTP monitoring loop")
 
-        # Initialize cache of processed files from database
-        await self.initialize_processed_files_cache()
+        try:
+            await self.initialize_processed_files_cache()
+            logger.info("FTP monitor initialization completed")
+        except asyncio.CancelledError:
+            logger.info("FTP monitor initialization cancelled")
+            self.status = "stopped"
+            return
+        except Exception as e:
+            logger.error(f"Error during FTP monitor initialization: {e}")
+            self.status = "error"
+            try:
+                await asyncio.sleep(1800)
+            except asyncio.CancelledError:
+                logger.info("FTP monitor cancelled during error recovery")
+                self.status = "stopped"
+                return
+
+        self.status = "running"
+
+        # Optionally skip initial scan and wait for first scheduled interval
+        if self.skip_initial_scan:
+            logger.info(f"Skipping initial scan - waiting {self.check_interval/3600:.1f} hours for first scheduled check")
+            try:
+                await asyncio.sleep(self.check_interval)
+            except asyncio.CancelledError:
+                logger.info("FTP monitoring cancelled during initial wait")
+                self.status = "stopped"
+                return
 
         while True:
             try:
-                await self.check_for_new_files()
+                # Add timeout to prevent indefinite blocking on FTP operations
+                await asyncio.wait_for(self.check_for_new_files(), timeout=300.0)  # 5 minute timeout
                 self.status = "running"
 
-                # Wait for next check (6 hours)
                 logger.info(f"Waiting {self.check_interval/3600:.1f} hours until next check")
                 await asyncio.sleep(self.check_interval)
 
+            except asyncio.TimeoutError:
+                logger.warning("FTP check timed out after 5 minutes - will retry")
+                self.status = "error"
+                try:
+                    await asyncio.sleep(900)  # Wait 15 minutes before retry
+                except asyncio.CancelledError:
+                    logger.info("FTP monitoring task cancelled during timeout recovery")
+                    self.status = "stopped"
+                    break
+            except asyncio.CancelledError:
+                logger.info("FTP monitoring task cancelled - shutting down gracefully")
+                self.status = "stopped"
+                break
             except Exception as e:
                 self.status = "error"
                 logger.error(f"Error in monitoring loop: {e}")
-                # Wait 30 minutes before retrying on error
-                await asyncio.sleep(1800)
+                try:
+                    await asyncio.sleep(1800)  # Wait 30 minutes before retry
+                except asyncio.CancelledError:
+                    logger.info("FTP monitoring task cancelled during error recovery")
+                    self.status = "stopped"
+                    break
 
     async def check_for_new_files(self) -> Dict[str, Any]:
-        """Check FTP server for new .slg_v2 files"""
         self.last_check = datetime.now()
         logger.info(f"Checking FTP server at {self.last_check}")
 
         try:
-            # Connect to FTP server
             with FTP(self.ftp_host) as ftp:
                 ftp.login(self.ftp_user, self.ftp_pass)
                 logger.info(f"Connected to FTP server: {self.ftp_host}")
 
-                # Find .slg_v2 files
                 new_files = await self._find_slg_files(ftp)
 
-                # Process new files
                 processed_count = 0
                 skipped_count = 0
 
                 for file_info in new_files:
-                    # Check if file is already processed (using filename for cache consistency)
+                    # Yield to the event loop so a pending cancellation can be
+                    # delivered (Task.cancelled() is never True inside a task
+                    # that is still running, so polling it here would be a no-op)
+                    await asyncio.sleep(0)
+
                     if file_info.name in self.processed_files:
                         logger.debug(f"Skipping already processed file (cached): {file_info.name}")
                         skipped_count += 1
                         continue
 
-                    # Double-check with database (in case cache missed something)
                     if await self.db_manager.is_file_processed(file_info.name):
                         logger.debug(f"Skipping already processed file (database): {file_info.name}")
-                        # Add to cache to avoid future database checks
                        self.processed_files.add(file_info.name)
                         skipped_count += 1
                         continue
 
-                    # Process the file
                     logger.debug(f"Processing new file: {file_info.name}")
                     success = await self._process_file(ftp, file_info)
                     if success:
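A caveat about the wait_for timeout above: asyncio can only interrupt a coroutine at an await point, and the ftplib calls inside check_for_new_files (login, retrlines, retrbinary) are blocking, so a hung FTP socket still stalls the event loop, and the /health endpoint with it, until the socket itself gives up. The same limitation applies to cancellation. One way to keep the loop responsive is to run the blocking FTP work in a worker thread and give the socket its own timeout; a sketch under that assumption (blocking_ftp_list is a hypothetical helper, Python 3.9+):

    import asyncio
    from ftplib import FTP

    def blocking_ftp_list(host: str, user: str, password: str, path: str) -> list:
        """List one directory over FTP; runs in a worker thread, so the
        socket-level timeout (not the event loop) bounds each call."""
        with FTP(host, timeout=60) as ftp:
            ftp.login(user, password)
            ftp.cwd(path)
            return ftp.nlst()

    # Inside check_for_new_files():
    #     names = await asyncio.wait_for(
    #         asyncio.to_thread(blocking_ftp_list, self.ftp_host,
    #                           self.ftp_user, self.ftp_pass, self.base_path),
    #         timeout=300.0,
    #     )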
@@ -143,7 +175,6 @@ class FTPMonitor:
             raise
 
     async def _find_slg_files(self, ftp: FTP) -> List[FTPFileInfo]:
-        """Find .sgl_v2 files in the FTP directory structure"""
         files = []
 
         try:
@@ -155,66 +186,65 @@ class FTPMonitor:
         return []
 
     async def _scan_directories_iterative(self, ftp: FTP, base_path: str, files: List[FTPFileInfo]):
-        """Iteratively scan directories for .slg_v2 files using a queue approach"""
-        # Queue of directories to scan: (directory_path, depth)
         directories_to_scan = [(base_path, 0)]
         visited_dirs = set()
+        skipped_dirs = 0
+        scanned_dirs = 0
 
         while directories_to_scan:
             current_dir, current_depth = directories_to_scan.pop(0)  # FIFO queue
 
-            # Normalize directory path
             normalized_path = current_dir.rstrip('/') if current_dir != '/' else '/'
 
-            # Skip if already visited (loop prevention)
             if normalized_path in visited_dirs:
                 logger.debug(f"Skipping already visited directory: {normalized_path}")
                 continue
 
-            # Mark as visited
             visited_dirs.add(normalized_path)
+
+            # Check if directory should be skipped based on previous scans
+            if await self.db_manager.should_skip_directory(normalized_path):
+                logger.info(f"Skipping previously scanned directory: {normalized_path}")
+                skipped_dirs += 1
+                continue
+
             logger.debug(f"Scanning directory: {normalized_path} (depth: {current_depth}, queue: {len(directories_to_scan)})")
+            scanned_dirs += 1
 
             try:
-                # Navigate to directory
                 original_dir = ftp.pwd()
                 ftp.cwd(current_dir)
 
-                # Get directory listing
                 dir_list = []
                 ftp.retrlines('LIST', dir_list.append)
 
                 logger.debug(f"Found {len(dir_list)} entries in {normalized_path}")
 
-                # Process entries
+                # Count files found in this directory
+                files_found_in_dir = 0
+
                 for line in dir_list:
                     parts = line.split()
                     if len(parts) >= 9:
                         filename = parts[-1]
                         permissions = parts[0]
 
-                        # Skip current and parent directory references
                         if filename in ['.', '..']:
                             continue
 
-                        # Handle directories
                         if permissions.startswith('d'):
-                            # Create full subdirectory path
                             if normalized_path == '/':
                                 subdirectory_path = f"/{filename}"
                             else:
                                 subdirectory_path = f"{normalized_path}/{filename}"
 
-                            # Normalize subdirectory path
                             subdirectory_normalized = subdirectory_path.rstrip('/') if subdirectory_path != '/' else '/'
 
-                            # Add to queue if not already visited
                             if subdirectory_normalized not in visited_dirs:
                                 directories_to_scan.append((subdirectory_path, current_depth + 1))
                                 logger.debug(f"Added to queue: {subdirectory_path}")
                             else:
                                 logger.debug(f"Skipping already visited: {subdirectory_path}")
 
-                        # Handle .slg_v2 files
                         elif filename.endswith('.sgl_v2'):
                             logger.debug(f"Found .slg_v2 file: {filename} in {normalized_path}")
                             try:
@@ -229,38 +259,35 @@ class FTPMonitor:
                                 name=filename,
                                 size=size
                             ))
+                            files_found_in_dir += 1
                         except (ValueError, IndexError):
                             logger.warning(f"Could not parse file info for: {filename}")
 
-                # Return to original directory
                 ftp.cwd(original_dir)
-                logger.debug(f"Completed scanning: {normalized_path}")
+
+                # Mark directory as scanned
+                await self.db_manager.mark_directory_scanned(normalized_path, files_found_in_dir)
+                logger.debug(f"Completed scanning: {normalized_path} ({files_found_in_dir} files found)")
 
             except Exception as e:
                 logger.warning(f"Error scanning directory {normalized_path}: {e}")
                 continue
 
-        logger.info(f"Iterative scan completed. Visited {len(visited_dirs)} directories")
+        logger.info(f"Iterative scan completed. Scanned: {scanned_dirs} directories, Skipped: {skipped_dirs} directories (Total visited: {len(visited_dirs)})")
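The scanner above parses raw LIST output, which assumes a Unix-style listing and breaks on file names containing spaces (parts[-1] keeps only the last token, and the len(parts) >= 9 check is format-specific). Where the server supports it, the MLSD command returns machine-readable entries that make the directory test and size lookup reliable; a sketch, assuming ftp.sa4cps.pt implements MLSD:

    from ftplib import FTP

    def list_entries(ftp: FTP, path: str):
        """Yield (name, is_dir, size) via MLSD; robust to spaces in names."""
        for name, facts in ftp.mlsd(path):
            if name in ('.', '..'):
                continue
            is_dir = facts.get('type') == 'dir'
            size = int(facts.get('size', 0)) if not is_dir else 0
            yield name, is_dir, size

Servers that reject MLSD raise error_perm, so the LIST parsing above would still be needed as a fallback.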
 
     async def _process_file(self, ftp: FTP, file_info: FTPFileInfo) -> bool:
-        """Download and process a .slg_v2 file"""
         logger.debug(f"Processing file: {file_info.path} ({file_info.size} bytes)")
 
         try:
-            # Create temporary file for download
             with tempfile.NamedTemporaryFile(mode='wb', suffix='.slg_v2', delete=False) as temp_file:
                 temp_path = temp_file.name
 
-            # Download file using full path
             with open(temp_path, 'wb') as f:
-                # Use the full path for RETR command
                 ftp.retrbinary(f'RETR {file_info.path}', f.write)
 
-            # Process the downloaded file
             records = await self.processor.process_file(temp_path, file_info.name)
 
-            # Store in database
             if records:
                 await self.db_manager.store_file_data(file_info.name, records)
                 logger.debug(f"Stored {len(records)} records from {file_info.name}")
@@ -274,7 +301,6 @@ class FTPMonitor:
             return False
 
         finally:
-            # Clean up temporary file
             try:
                 if 'temp_path' in locals():
                     os.unlink(temp_path)
@@ -282,19 +308,15 @@ class FTPMonitor:
                 pass
 
     def get_status(self) -> str:
-        """Get current monitor status"""
         return self.status
 
     def get_last_check_time(self) -> Optional[str]:
-        """Get last check time as ISO string"""
         return self.last_check.isoformat() if self.last_check else None
 
     def get_processed_count(self) -> int:
-        """Get total number of files processed"""
         return self.files_processed_count
 
     def get_detailed_status(self) -> Dict[str, Any]:
-        """Get detailed status information"""
         return {
             "status": self.status,
             "last_check": self.get_last_check_time(),
diff --git a/microservices/data-ingestion-service/src/main.py b/microservices/data-ingestion-service/src/main.py
index c5a63db..bf02b30 100644
--- a/microservices/data-ingestion-service/src/main.py
+++ b/microservices/data-ingestion-service/src/main.py
@@ -8,7 +8,7 @@ from typing import Any
 from ftp_monitor import FTPMonitor
 from database import DatabaseManager
 
-logging.basicConfig(level=logging.INFO)
+logging.basicConfig(level=logging.DEBUG)
 logger = logging.getLogger(__name__)
 
 ftp_monitor = None
@@ -23,18 +23,36 @@ async def lifespan(app: FastAPI):
 
     db_manager = DatabaseManager()
     await db_manager.connect()
+    logger.info("Database connection established")
 
     ftp_monitor = FTPMonitor(db_manager)
+    logger.info("FTP monitor created")
 
     monitoring_task = asyncio.create_task(ftp_monitor.start_monitoring())
+    logger.info("FTP monitoring task started in background")
 
-    logger.info("Service started successfully")
+    logger.info("Service startup complete - HTTP server ready to accept requests")
 
     yield
 
     logger.info("Shutting down service...")
-    monitoring_task.cancel()
-    await db_manager.close()
+
+    # Cancel monitoring task and wait for graceful shutdown
+    if not monitoring_task.done():
+        monitoring_task.cancel()
+        try:
+            await asyncio.wait_for(monitoring_task, timeout=5.0)
+            logger.info("Monitoring task stopped gracefully")
+        except asyncio.TimeoutError:
+            logger.warning("Monitoring task shutdown timeout - forcing termination")
+        except asyncio.CancelledError:
+            logger.info("Monitoring task cancelled successfully")
+
+    # Close database connection
+    if db_manager:
+        await db_manager.close()
+        logger.info("Database connection closed")
+
     logger.info("Service shutdown complete")
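Note on the logging change above: raising the root logger to DEBUG also enables debug output from every library in the process (pymongo's server-monitoring chatter alone is substantial). If the intent is verbose logs from this service only, keeping the root at INFO and raising just the service's own module loggers is quieter; a sketch using the module names from this diff:

    import logging

    logging.basicConfig(level=logging.INFO)
    # DEBUG only for this service's modules; third-party loggers stay at INFO.
    for name in ("main", "ftp_monitor", "database", "slg_processor"):
        logging.getLogger(name).setLevel(logging.DEBUG)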
health_status["database"] = "disconnected" - health_status["service"] = "degraded" + service_issues.append("database_disconnected") + logger.warning(f"Database health check failed: {e}") + else: + health_status["database"] = "not_initialized" + health_status["service"] = "starting" # Check FTP monitor status if ftp_monitor: - health_status["ftp_monitor"] = ftp_monitor.get_status() - health_status["last_check"] = ftp_monitor.get_last_check_time() - health_status["files_processed"] = ftp_monitor.get_processed_count() + ftp_status = ftp_monitor.get_status() + health_status["ftp_monitor"] = ftp_status + + try: + health_status["last_check"] = ftp_monitor.get_last_check_time() + health_status["files_processed"] = ftp_monitor.get_processed_count() + except: + # Don't fail health check if optional status fields fail + pass + + # Improved service status logic - be more tolerant during startup + if ftp_status == "initializing": + # Service is initializing but can still be considered healthy for basic operations + if health_status["database"] == "connected": + health_status["service"] = "healthy" # Database is ready, FTP is starting + else: + health_status["service"] = "starting" + elif ftp_status == "error": + service_issues.append("ftp_monitor_error") + elif ftp_status == "running": + pass # Keep healthy status + else: + health_status["ftp_monitor"] = "not_initialized" + # Don't mark as starting if database is connected - service can be functional + if health_status["database"] != "connected": + health_status["service"] = "starting" + + # Determine final service status + if service_issues: + health_status["service"] = "degraded" + health_status["issues"] = service_issues + elif health_status["service"] != "starting": + health_status["service"] = "healthy" return health_status +@app.get("/readiness") +async def readiness_check(): + global ftp_monitor, db_manager + + if not db_manager or not ftp_monitor: + raise HTTPException(status_code=503, detail="Service not ready - components not initialized") + + # Check database connectivity + try: + await db_manager.ping() + except Exception as e: + raise HTTPException(status_code=503, detail=f"Service not ready - database issue: {str(e)}") + + # FTP monitor should be at least initializing + ftp_status = ftp_monitor.get_status() + if ftp_status == "error": + raise HTTPException(status_code=503, detail="Service not ready - FTP monitor in error state") + + return { + "ready": True, + "timestamp": datetime.now().isoformat(), + "ftp_monitor_status": ftp_status + } + + @app.get("/status") async def get_status(): global ftp_monitor, db_manager @@ -117,6 +196,44 @@ async def trigger_manual_check(): raise HTTPException(status_code=500, detail=f"Check failed: {str(e)}") +@app.get("/scan-cache") +async def get_scan_cache(): + global db_manager + + if not db_manager: + raise HTTPException(status_code=503, detail="Database not initialized") + + try: + scanned_dirs = await db_manager.get_scanned_directories() + return { + "scanned_directories": scanned_dirs, + "total_directories": len(scanned_dirs), + "timestamp": datetime.now().isoformat() + } + except Exception as e: + logger.error(f"Error getting scan cache: {e}") + raise HTTPException(status_code=500, detail=f"Failed to get scan cache: {str(e)}") + + +@app.delete("/scan-cache") +async def clear_scan_cache(): + global db_manager + + if not db_manager: + raise HTTPException(status_code=503, detail="Database not initialized") + + try: + result = db_manager.collections['scanned_directories'].delete_many({}) + return { + 
"message": "Scan cache cleared successfully", + "deleted_count": result.deleted_count, + "timestamp": datetime.now().isoformat() + } + except Exception as e: + logger.error(f"Error clearing scan cache: {e}") + raise HTTPException(status_code=500, detail=f"Failed to clear scan cache: {str(e)}") + + if __name__ == "__main__": import uvicorn uvicorn.run("main:app", host="0.0.0.0", port=8008, reload=True) diff --git a/microservices/data-ingestion-service/test_health_check.py b/microservices/data-ingestion-service/test_health_check.py new file mode 100644 index 0000000..b9cbdaf --- /dev/null +++ b/microservices/data-ingestion-service/test_health_check.py @@ -0,0 +1,132 @@ +#!/usr/bin/env python3 +""" +Test script to verify health check improvements for data-ingestion-service +""" + +import asyncio +import aiohttp +import time +import sys +from datetime import datetime + +async def test_health_check(): + """Test the health check endpoint with improved startup handling""" + + print("Testing data-ingestion-service health check improvements...") + print("=" * 60) + + service_url = "http://localhost:8008" + + async with aiohttp.ClientSession() as session: + + # Test 1: Check if service responds at all + print("Test 1: Basic connectivity") + try: + async with session.get(f"{service_url}/", timeout=5) as response: + if response.status == 200: + data = await response.json() + print(f"✅ Service responding: {data['service']}") + else: + print(f"❌ Service returned status {response.status}") + except Exception as e: + print(f"❌ Service not reachable: {e}") + print("Make sure the service is running: python main.py") + return False + + print() + + # Test 2: Health check during startup (multiple checks) + print("Test 2: Health check progression during startup") + print("Checking health status every 2 seconds for 30 seconds...") + + startup_healthy = False + for i in range(15): # 30 seconds total + try: + async with session.get(f"{service_url}/health", timeout=3) as response: + data = await response.json() + service_status = data.get('service', 'unknown') + db_status = data.get('database', 'unknown') + ftp_status = data.get('ftp_monitor', 'unknown') + + status_icon = "✅" if service_status == "healthy" else "🟡" if service_status == "starting" else "❌" + + print(f" {i+1:2d}s: {status_icon} Service={service_status}, DB={db_status}, FTP={ftp_status}") + + if service_status == "healthy": + startup_healthy = True + print(f" 🎉 Service became healthy after {(i+1)*2} seconds!") + break + + if service_status not in ["starting", "healthy"]: + print(f" ❌ Service in unexpected state: {service_status}") + break + + except asyncio.TimeoutError: + print(f" {i+1:2d}s: ⏰ Health check timeout") + except Exception as e: + print(f" {i+1:2d}s: ❌ Error: {e}") + + await asyncio.sleep(2) + + print() + + # Test 3: Final detailed health status + print("Test 3: Detailed health status") + try: + async with session.get(f"{service_url}/health", timeout=5) as response: + data = await response.json() + print("Final health status:") + print(f" Service Status: {data.get('service', 'unknown')}") + print(f" Database: {data.get('database', 'unknown')}") + print(f" FTP Monitor: {data.get('ftp_monitor', 'unknown')}") + print(f" Last Check: {data.get('last_check', 'none')}") + print(f" Files Processed: {data.get('files_processed', 0)}") + if 'issues' in data: + print(f" Issues: {data['issues']}") + except Exception as e: + print(f"❌ Error getting final status: {e}") + + print() + + # Test 4: Readiness check + print("Test 4: Readiness check") + try: + 
async with session.get(f"{service_url}/readiness", timeout=5) as response: + if response.status == 200: + data = await response.json() + print(f"✅ Service is ready: {data.get('ready', False)}") + print(f" FTP Monitor Status: {data.get('ftp_monitor_status', 'unknown')}") + else: + text = await response.text() + print(f"❌ Service not ready (HTTP {response.status}): {text}") + except Exception as e: + print(f"❌ Error checking readiness: {e}") + + print() + print("=" * 60) + + if startup_healthy: + print("✅ SUCCESS: Service health check improvements are working!") + print(" - Service can become healthy even during FTP initialization") + print(" - Health checks are responsive and don't block") + return True + else: + print("⚠️ WARNING: Service didn't become healthy within 30 seconds") + print(" This might be expected if:") + print(" - Database connection is slow") + print(" - FTP server is unreachable") + print(" - Service is still initializing (check logs)") + return False + +if __name__ == "__main__": + print(f"Starting health check test at {datetime.now()}") + + try: + success = asyncio.run(test_health_check()) + sys.exit(0 if success else 1) + except KeyboardInterrupt: + print("\nTest interrupted by user") + sys.exit(1) + except Exception as e: + print(f"Test failed with error: {e}") + sys.exit(1) \ No newline at end of file