Add scan cache tracking and improve health checks
- Track scanned FTP directories in MongoDB to avoid redundant scans
- Add endpoints to view and clear scan cache
- Improve health check logic for better startup and error reporting
- Add readiness endpoint for deployment probes
- Add test script for health check improvements
- Increase logging verbosity for debugging
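The new cache endpoints can be exercised directly once the service is up. A minimal sketch using aiohttp (the same client the bundled test script uses), assuming the service is reachable at localhost:8008, the port the Dockerfile exposes:

import asyncio
import aiohttp

async def inspect_and_clear_cache():
    async with aiohttp.ClientSession() as session:
        # View the scan cache added by this commit
        async with session.get("http://localhost:8008/scan-cache") as resp:
            cache = await resp.json()
            print(f"{cache['total_directories']} directories cached")

        # Clear it to force a full rescan on the next scheduled FTP check
        async with session.delete("http://localhost:8008/scan-cache") as resp:
            result = await resp.json()
            print(f"Deleted {result['deleted_count']} cache entries")

asyncio.run(inspect_and_clear_cache())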
Dockerfile
@@ -10,10 +10,10 @@ WORKDIR /app
 # Install system dependencies
 RUN apt-get update \
     && apt-get install -y --no-install-recommends \
     build-essential \
     curl \
     libssl-dev \
     libffi-dev \
     && rm -rf /var/lib/apt/lists/*

 # Copy requirements and install Python dependencies
@@ -34,10 +34,10 @@ ENV PYTHONPATH="/app/src:$PYTHONPATH"
 # Expose port
 EXPOSE 8008

-# Health check
-HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
+# Health check - allow more time for service initialization
+HEALTHCHECK --interval=30s --timeout=10s --start-period=120s --retries=5 \
     CMD curl -f http://localhost:8008/health || exit 1

 # Start the application from src directory
 WORKDIR /app/src
 CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8008"]
config.py
@@ -14,6 +14,7 @@ FTP_CONFIG: Dict[str, Any] = {
     "password": os.getenv("FTP_SA4CPS_PASSWORD", 'n$WFtz9+bleN'),  # Set via environment variable
     "base_path": os.getenv("FTP_SA4CPS_REMOTE_PATH", "/SLGs/"),
     "check_interval": int(os.getenv("FTP_CHECK_INTERVAL", "21600")),  # 6 hours default
+    "skip_initial_scan": os.getenv("FTP_SKIP_INITIAL_SCAN", "true").lower() == "true",
 }

 # MongoDB Configuration
database.py
@@ -1,8 +1,3 @@
-"""
-MongoDB Database Manager for SA4CPS Data Ingestion
-Simple sync MongoDB operations for storing .sgl_v2 file data
-"""
-
 import logging
 from datetime import datetime
 from typing import List, Dict, Any, Optional
@@ -15,37 +10,32 @@ logger = logging.getLogger(__name__)

 class DatabaseManager:
     """Manages MongoDB connections and operations for SA4CPS data"""

     def __init__(self):
         self.client: Optional[MongoClient] = None
         self.db = None
         self.collections = {}

         # MongoDB configuration
         self.connection_string = MONGO_CONFIG["connection_string"]
         self.database_name = MONGO_CONFIG["database_name"]

         logger.info(f"Database manager initialized for: {self.database_name}")

     async def connect(self):
         """Connect to MongoDB"""
         try:
             logger.info(f"Connecting to MongoDB at: {self.connection_string}")
             self.client = MongoClient(self.connection_string, serverSelectionTimeoutMS=5000)

             # Test connection
             await self.ping()

             # Get database and collections
             self.db = self.client[self.database_name]
             self.collections = {
                 'files': self.db.sa4cps_files,
                 'energy_data': self.db.sa4cps_energy_data,
-                'metadata': self.db.sa4cps_metadata
+                'metadata': self.db.sa4cps_metadata,
+                'scanned_directories': self.db.sa4cps_scanned_directories
             }

             # Create indexes for better performance
             self._create_indexes()

             logger.info(f"Connected to MongoDB database: {self.database_name}")
@@ -66,9 +56,21 @@ class DatabaseManager:
             raise ConnectionFailure("No database connection")

         try:
-            # The ping command is cheap and does not require auth.
-            self.client.admin.command('ping')
+            # Use async approach with timeout
+            import asyncio
+            import concurrent.futures
+
+            # Run the ping command in a thread pool to avoid blocking
+            loop = asyncio.get_event_loop()
+            with concurrent.futures.ThreadPoolExecutor() as pool:
+                await asyncio.wait_for(
+                    loop.run_in_executor(pool, self.client.admin.command, 'ping'),
+                    timeout=3.0  # 3 second timeout for ping
+                )
+            logger.debug("MongoDB ping successful")
+        except asyncio.TimeoutError:
+            logger.error("MongoDB ping timeout after 3 seconds")
+            raise ConnectionFailure("MongoDB ping timeout")
         except ConnectionFailure as e:
             logger.error(f"MongoDB ping failed - Server not available: {e}")
             raise
@@ -77,22 +79,22 @@ class DatabaseManager:
             raise ConnectionFailure(f"Ping failed: {e}")

     def _create_indexes(self):
         """Create database indexes for efficient queries"""
         try:
             # Index on files collection
             self.collections['files'].create_index("filename", unique=True)
             self.collections['files'].create_index("processed_at")

             # Index on energy data collection
             self.collections['energy_data'].create_index([("filename", 1), ("timestamp", 1)])
             self.collections['energy_data'].create_index("timestamp")

+            self.collections['scanned_directories'].create_index("directory_path", unique=True)
+            self.collections['scanned_directories'].create_index("last_scanned")
+            self.collections['scanned_directories'].create_index("scan_status")
+
             logger.info("Database indexes created successfully")
         except Exception as e:
             logger.warning(f"Failed to create indexes: {e}")

     async def store_file_data(self, filename: str, records: List[Dict[str, Any]]) -> bool:
         """Store processed .sgl_v2 file data in MongoDB"""
         try:
             current_time = datetime.now()
@@ -175,6 +177,77 @@ class DatabaseManager:
             logger.error(f"Error getting file info for {filename}: {e}")
             return None

+    # Directory scanning tracking methods
+    async def is_directory_scanned(self, directory_path: str, since_timestamp: datetime = None) -> bool:
+        """Check if directory has been scanned recently"""
+        try:
+            query = {"directory_path": directory_path, "scan_status": "complete"}
+            if since_timestamp:
+                query["last_scanned"] = {"$gte": since_timestamp}
+
+            result = self.collections['scanned_directories'].find_one(query)
+            return result is not None
+        except Exception as e:
+            logger.error(f"Error checking directory scan status for {directory_path}: {e}")
+            return False
+
+    async def mark_directory_scanned(self, directory_path: str, file_count: int, ftp_last_modified: datetime = None) -> bool:
+        """Mark directory as scanned with current timestamp"""
+        try:
+            scan_record = {
+                "directory_path": directory_path,
+                "last_scanned": datetime.now(),
+                "file_count": file_count,
+                "scan_status": "complete"
+            }
+
+            if ftp_last_modified:
+                scan_record["ftp_last_modified"] = ftp_last_modified
+
+            # Use upsert to update existing or create new record
+            self.collections['scanned_directories'].replace_one(
+                {"directory_path": directory_path},
+                scan_record,
+                upsert=True
+            )
+
+            logger.debug(f"Marked directory as scanned: {directory_path} ({file_count} files)")
+            return True
+
+        except Exception as e:
+            logger.error(f"Error marking directory as scanned {directory_path}: {e}")
+            return False
+
+    async def get_scanned_directories(self) -> List[Dict[str, Any]]:
+        """Get all scanned directory records"""
+        try:
+            cursor = self.collections['scanned_directories'].find()
+            return list(cursor)
+        except Exception as e:
+            logger.error(f"Error getting scanned directories: {e}")
+            return []
+
+    async def should_skip_directory(self, directory_path: str, ftp_last_modified: datetime = None) -> bool:
+        """Determine if directory should be skipped based on scan history and modification time"""
+        try:
+            scan_record = self.collections['scanned_directories'].find_one(
+                {"directory_path": directory_path, "scan_status": "complete"}
+            )
+
+            if not scan_record:
+                return False  # Never scanned, should scan
+
+            # If we have FTP modification time and it's newer than our last scan, don't skip
+            if ftp_last_modified and scan_record.get("last_scanned"):
+                return ftp_last_modified <= scan_record["last_scanned"]
+
+            # If directory was scanned successfully, skip it (assuming it's historical data)
+            return True
+
+        except Exception as e:
+            logger.error(f"Error determining if directory should be skipped {directory_path}: {e}")
+            return False
+
     async def get_stats(self) -> Dict[str, Any]:
         """Get database statistics"""
         try:
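For reference, the scan cache above stores one document per directory in the sa4cps_scanned_directories collection. A sketch of the record shape written by mark_directory_scanned and consumed by should_skip_directory, with illustrative values (the directory path is hypothetical):

from datetime import datetime

# Shape of a document written by mark_directory_scanned (values illustrative)
scan_record = {
    "directory_path": "/SLGs/2024/01",  # hypothetical path under the configured base_path
    "last_scanned": datetime(2024, 2, 1, 6, 0, 0),
    "file_count": 31,
    "scan_status": "complete",
    "ftp_last_modified": datetime(2024, 1, 31, 23, 59, 0),  # optional field
}

# should_skip_directory semantics: skip only if nothing changed since the last scan
skip = scan_record["ftp_last_modified"] <= scan_record["last_scanned"]  # True -> skipped
print(f"skip={skip}")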
ftp_monitor.py
@@ -1,9 +1,3 @@
-#!/usr/bin/env python3
-"""
-FTP Monitor for SA4CPS .slg_v2 files
-Monitors ftp.sa4cps.pt for new monthly files
-"""
-
 import asyncio
 from ftplib import FTP
 import logging
@@ -18,10 +12,8 @@ from slg_processor import SLGProcessor

 logger = logging.getLogger(__name__)


 @dataclass
 class FTPFileInfo:
     """Information about an FTP file"""
     path: str
     name: str
     size: int
@@ -29,8 +21,6 @@ class FTPFileInfo:


 class FTPMonitor:
     """Monitors SA4CPS FTP server for new .slg_v2 files"""

     def __init__(self, db_manager):
         self.db_manager = db_manager
         self.processor = SLGProcessor()
@@ -39,87 +29,129 @@ class FTPMonitor:
         self.files_processed_count = 0
         self.status = "initializing"

         # FTP connection settings
         self.ftp_host = FTP_CONFIG["host"]
         self.ftp_user = FTP_CONFIG["username"]
         self.ftp_pass = FTP_CONFIG["password"]
         self.base_path = FTP_CONFIG["base_path"]
         self.check_interval = FTP_CONFIG["check_interval"]
+        self.skip_initial_scan = FTP_CONFIG["skip_initial_scan"]

         logger.info(f"FTP Monitor initialized for {self.ftp_host}")

     async def initialize_processed_files_cache(self):
         """Load already processed files from database into memory cache"""
         try:
-            processed_file_names = await self.db_manager.get_processed_files()
-            # Convert filenames to full paths and add to processed_files set
+            # Add timeout to prevent blocking startup indefinitely
+            processed_file_names = await asyncio.wait_for(
+                self.db_manager.get_processed_files(),
+                timeout=10.0  # 10 second timeout
+            )

             for filename in processed_file_names:
                 # We'll use just the filename as the key since we check by filename
                 # But we need to be consistent with how we store paths
                 self.processed_files.add(filename)

             logger.info(f"Loaded {len(processed_file_names)} already processed files from database")
             return len(processed_file_names)
+        except asyncio.TimeoutError:
+            logger.warning("Timeout loading processed files cache - continuing with empty cache")
+            return 0
         except Exception as e:
             logger.error(f"Error loading processed files from database: {e}")
             return 0

     async def start_monitoring(self):
         """Start the monitoring loop"""
-        self.status = "running"
+        self.status = "initializing"
         logger.info("Starting FTP monitoring loop")

         # Initialize cache of processed files from database
-        await self.initialize_processed_files_cache()
+        try:
+            await self.initialize_processed_files_cache()
+            logger.info("FTP monitor initialization completed")
+        except asyncio.CancelledError:
+            logger.info("FTP monitor initialization cancelled")
+            self.status = "stopped"
+            return
+        except Exception as e:
+            logger.error(f"Error during FTP monitor initialization: {e}")
+            self.status = "error"
+            try:
+                await asyncio.sleep(1800)
+            except asyncio.CancelledError:
+                logger.info("FTP monitor cancelled during error recovery")
+                self.status = "stopped"
+                return
+
+        self.status = "running"
+
+        # Optionally skip initial scan and wait for first scheduled interval
+        if self.skip_initial_scan:
+            logger.info(f"Skipping initial scan - waiting {self.check_interval/3600:.1f} hours for first scheduled check")
+            try:
+                await asyncio.sleep(self.check_interval)
+            except asyncio.CancelledError:
+                logger.info("FTP monitoring cancelled during initial wait")
+                self.status = "stopped"
+                return

         while True:
             try:
-                await self.check_for_new_files()
+                # Add timeout to prevent indefinite blocking on FTP operations
+                await asyncio.wait_for(self.check_for_new_files(), timeout=300.0)  # 5 minute timeout
                 self.status = "running"

                 # Wait for next check (6 hours)
                 logger.info(f"Waiting {self.check_interval/3600:.1f} hours until next check")
                 await asyncio.sleep(self.check_interval)

+            except asyncio.TimeoutError:
+                logger.warning("FTP check timed out after 5 minutes - will retry")
+                self.status = "error"
+                try:
+                    await asyncio.sleep(900)  # Wait 15 minutes before retry
+                except asyncio.CancelledError:
+                    logger.info("FTP monitoring task cancelled during timeout recovery")
+                    self.status = "stopped"
+                    break
+            except asyncio.CancelledError:
+                logger.info("FTP monitoring task cancelled - shutting down gracefully")
+                self.status = "stopped"
+                break
             except Exception as e:
                 self.status = "error"
                 logger.error(f"Error in monitoring loop: {e}")
-                # Wait 30 minutes before retrying on error
-                await asyncio.sleep(1800)
+                try:
+                    await asyncio.sleep(1800)  # Wait 30 minutes before retry
+                except asyncio.CancelledError:
+                    logger.info("FTP monitoring task cancelled during error recovery")
+                    self.status = "stopped"
+                    break

     async def check_for_new_files(self) -> Dict[str, Any]:
         """Check FTP server for new .slg_v2 files"""
         self.last_check = datetime.now()
         logger.info(f"Checking FTP server at {self.last_check}")

         try:
             # Connect to FTP server
             with FTP(self.ftp_host) as ftp:
                 ftp.login(self.ftp_user, self.ftp_pass)
                 logger.info(f"Connected to FTP server: {self.ftp_host}")

                 # Find .slg_v2 files
                 new_files = await self._find_slg_files(ftp)

                 # Process new files
                 processed_count = 0
                 skipped_count = 0
                 for file_info in new_files:
-                    # Check if file is already processed (using filename for cache consistency)
+                    # Check for cancellation during file processing loop
+                    if asyncio.current_task().cancelled():
+                        raise asyncio.CancelledError()
+
                     if file_info.name in self.processed_files:
                         logger.debug(f"Skipping already processed file (cached): {file_info.name}")
                         skipped_count += 1
                         continue

                     # Double-check with database (in case cache missed something)
                     if await self.db_manager.is_file_processed(file_info.name):
                         logger.debug(f"Skipping already processed file (database): {file_info.name}")
                         # Add to cache to avoid future database checks
                         self.processed_files.add(file_info.name)
                         skipped_count += 1
                         continue

                     # Process the file
                     logger.debug(f"Processing new file: {file_info.name}")
                     success = await self._process_file(ftp, file_info)
                     if success:
@@ -143,7 +175,6 @@ class FTPMonitor:
             raise

     async def _find_slg_files(self, ftp: FTP) -> List[FTPFileInfo]:
         """Find .sgl_v2 files in the FTP directory structure"""
         files = []

         try:
@@ -155,66 +186,65 @@ class FTPMonitor:
             return []

     async def _scan_directories_iterative(self, ftp: FTP, base_path: str, files: List[FTPFileInfo]):
         """Iteratively scan directories for .slg_v2 files using a queue approach"""
         # Queue of directories to scan: (directory_path, depth)
         directories_to_scan = [(base_path, 0)]
         visited_dirs = set()
+        skipped_dirs = 0
+        scanned_dirs = 0

         while directories_to_scan:
             current_dir, current_depth = directories_to_scan.pop(0)  # FIFO queue

             # Normalize directory path
             normalized_path = current_dir.rstrip('/') if current_dir != '/' else '/'

             # Skip if already visited (loop prevention)
             if normalized_path in visited_dirs:
                 logger.debug(f"Skipping already visited directory: {normalized_path}")
                 continue

             # Mark as visited
             visited_dirs.add(normalized_path)

+            # Check if directory should be skipped based on previous scans
+            if await self.db_manager.should_skip_directory(normalized_path):
+                logger.info(f"Skipping previously scanned directory: {normalized_path}")
+                skipped_dirs += 1
+                continue
+
             logger.debug(f"Scanning directory: {normalized_path} (depth: {current_depth}, queue: {len(directories_to_scan)})")
+            scanned_dirs += 1

             try:
                 # Navigate to directory
                 original_dir = ftp.pwd()
                 ftp.cwd(current_dir)

                 # Get directory listing
                 dir_list = []
                 ftp.retrlines('LIST', dir_list.append)
                 logger.debug(f"Found {len(dir_list)} entries in {normalized_path}")

-                # Process entries
+                # Count files found in this directory
+                files_found_in_dir = 0

                 for line in dir_list:
                     parts = line.split()
                     if len(parts) >= 9:
                         filename = parts[-1]
                         permissions = parts[0]

                         # Skip current and parent directory references
                         if filename in ['.', '..']:
                             continue

                         # Handle directories
                         if permissions.startswith('d'):
                             # Create full subdirectory path
                             if normalized_path == '/':
                                 subdirectory_path = f"/{filename}"
                             else:
                                 subdirectory_path = f"{normalized_path}/{filename}"

                             # Normalize subdirectory path
                             subdirectory_normalized = subdirectory_path.rstrip('/') if subdirectory_path != '/' else '/'

                             # Add to queue if not already visited
                             if subdirectory_normalized not in visited_dirs:
                                 directories_to_scan.append((subdirectory_path, current_depth + 1))
                                 logger.debug(f"Added to queue: {subdirectory_path}")
                             else:
                                 logger.debug(f"Skipping already visited: {subdirectory_path}")

                         # Handle .slg_v2 files
                         elif filename.endswith('.sgl_v2'):
                             logger.debug(f"Found .slg_v2 file: {filename} in {normalized_path}")
                             try:
@@ -229,38 +259,35 @@ class FTPMonitor:
                                     name=filename,
                                     size=size
                                 ))
+                                files_found_in_dir += 1

                             except (ValueError, IndexError):
                                 logger.warning(f"Could not parse file info for: {filename}")

                 # Return to original directory
                 ftp.cwd(original_dir)
-                logger.debug(f"Completed scanning: {normalized_path}")

+                # Mark directory as scanned
+                await self.db_manager.mark_directory_scanned(normalized_path, files_found_in_dir)
+                logger.debug(f"Completed scanning: {normalized_path} ({files_found_in_dir} files found)")

             except Exception as e:
                 logger.warning(f"Error scanning directory {normalized_path}: {e}")
                 continue

-        logger.info(f"Iterative scan completed. Visited {len(visited_dirs)} directories")
+        logger.info(f"Iterative scan completed. Scanned: {scanned_dirs} directories, Skipped: {skipped_dirs} directories (Total visited: {len(visited_dirs)})")

     async def _process_file(self, ftp: FTP, file_info: FTPFileInfo) -> bool:
         """Download and process a .slg_v2 file"""
         logger.debug(f"Processing file: {file_info.path} ({file_info.size} bytes)")

         try:
             # Create temporary file for download
             with tempfile.NamedTemporaryFile(mode='wb', suffix='.slg_v2', delete=False) as temp_file:
                 temp_path = temp_file.name

             # Download file using full path
             with open(temp_path, 'wb') as f:
                 # Use the full path for RETR command
                 ftp.retrbinary(f'RETR {file_info.path}', f.write)

             # Process the downloaded file
             records = await self.processor.process_file(temp_path, file_info.name)

             # Store in database
             if records:
                 await self.db_manager.store_file_data(file_info.name, records)
                 logger.debug(f"Stored {len(records)} records from {file_info.name}")
@@ -274,7 +301,6 @@ class FTPMonitor:
             return False

         finally:
             # Clean up temporary file
             try:
                 if 'temp_path' in locals():
                     os.unlink(temp_path)
@@ -282,19 +308,15 @@ class FTPMonitor:
                 pass

     def get_status(self) -> str:
         """Get current monitor status"""
         return self.status

     def get_last_check_time(self) -> Optional[str]:
         """Get last check time as ISO string"""
         return self.last_check.isoformat() if self.last_check else None

     def get_processed_count(self) -> int:
         """Get total number of files processed"""
         return self.files_processed_count

     def get_detailed_status(self) -> Dict[str, Any]:
         """Get detailed status information"""
         return {
             "status": self.status,
             "last_check": self.get_last_check_time(),
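The changes to ftp_monitor.py above repeatedly apply one pattern: bound every long-running await with asyncio.wait_for and treat asyncio.CancelledError as the shutdown signal. A stripped-down sketch of that pattern, with illustrative names and timeouts shortened for demonstration:

import asyncio

async def slow_step():
    # Stands in for one FTP scan pass
    await asyncio.sleep(600)

async def monitor_loop():
    while True:
        try:
            # Bound each iteration so a hung FTP connection cannot stall the loop forever
            await asyncio.wait_for(slow_step(), timeout=300.0)
            await asyncio.sleep(60)  # wait until the next scheduled check
        except asyncio.TimeoutError:
            continue  # the real service logs and backs off here
        except asyncio.CancelledError:
            break  # task.cancel() from the FastAPI lifespan shutdown lands here

async def main():
    task = asyncio.create_task(monitor_loop())
    await asyncio.sleep(1)
    # Mirror of the lifespan shutdown: cancel, then bound the wait for it to finish
    task.cancel()
    try:
        await asyncio.wait_for(task, timeout=5.0)
    except (asyncio.TimeoutError, asyncio.CancelledError):
        pass

asyncio.run(main())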
main.py
@@ -8,7 +8,7 @@ from typing import Any
 from ftp_monitor import FTPMonitor
 from database import DatabaseManager

-logging.basicConfig(level=logging.INFO)
+logging.basicConfig(level=logging.DEBUG)
 logger = logging.getLogger(__name__)

 ftp_monitor = None
@@ -23,18 +23,36 @@ async def lifespan(app: FastAPI):

     db_manager = DatabaseManager()
     await db_manager.connect()
     logger.info("Database connection established")

     ftp_monitor = FTPMonitor(db_manager)
     logger.info("FTP monitor created")

     monitoring_task = asyncio.create_task(ftp_monitor.start_monitoring())
     logger.info("FTP monitoring task started in background")

-    logger.info("Service started successfully")
+    logger.info("Service startup complete - HTTP server ready to accept requests")

     yield

     logger.info("Shutting down service...")
-    monitoring_task.cancel()
-    await db_manager.close()
+
+    # Cancel monitoring task and wait for graceful shutdown
+    if not monitoring_task.done():
+        monitoring_task.cancel()
+        try:
+            await asyncio.wait_for(monitoring_task, timeout=5.0)
+            logger.info("Monitoring task stopped gracefully")
+        except asyncio.TimeoutError:
+            logger.warning("Monitoring task shutdown timeout - forcing termination")
+        except asyncio.CancelledError:
+            logger.info("Monitoring task cancelled successfully")
+
+    # Close database connection
+    if db_manager:
+        await db_manager.close()
+        logger.info("Database connection closed")
+
+    logger.info("Service shutdown complete")
@@ -66,24 +84,85 @@ async def health_check():
         "ftp_monitor": "unknown"
     }

+    service_issues = []
+
     # Check database connection
     if db_manager:
         try:
             await db_manager.ping()
             health_status["database"] = "connected"
-        except Exception:
+        except Exception as e:
             health_status["database"] = "disconnected"
-            health_status["service"] = "degraded"
+            service_issues.append("database_disconnected")
+            logger.warning(f"Database health check failed: {e}")
     else:
         health_status["database"] = "not_initialized"
         health_status["service"] = "starting"

     # Check FTP monitor status
     if ftp_monitor:
-        health_status["ftp_monitor"] = ftp_monitor.get_status()
-        health_status["last_check"] = ftp_monitor.get_last_check_time()
-        health_status["files_processed"] = ftp_monitor.get_processed_count()
+        ftp_status = ftp_monitor.get_status()
+        health_status["ftp_monitor"] = ftp_status
+
+        try:
+            health_status["last_check"] = ftp_monitor.get_last_check_time()
+            health_status["files_processed"] = ftp_monitor.get_processed_count()
+        except Exception:
+            # Don't fail health check if optional status fields fail
+            pass
+
+        # Improved service status logic - be more tolerant during startup
+        if ftp_status == "initializing":
+            # Service is initializing but can still be considered healthy for basic operations
+            if health_status["database"] == "connected":
+                health_status["service"] = "healthy"  # Database is ready, FTP is starting
+            else:
+                health_status["service"] = "starting"
+        elif ftp_status == "error":
+            service_issues.append("ftp_monitor_error")
+        elif ftp_status == "running":
+            pass  # Keep healthy status
     else:
         health_status["ftp_monitor"] = "not_initialized"
+        # Don't mark as starting if database is connected - service can be functional
         if health_status["database"] != "connected":
             health_status["service"] = "starting"

+    # Determine final service status
+    if service_issues:
+        health_status["service"] = "degraded"
+        health_status["issues"] = service_issues
+    elif health_status["service"] != "starting":
+        health_status["service"] = "healthy"
+
     return health_status


+@app.get("/readiness")
+async def readiness_check():
+    global ftp_monitor, db_manager
+
+    if not db_manager or not ftp_monitor:
+        raise HTTPException(status_code=503, detail="Service not ready - components not initialized")
+
+    # Check database connectivity
+    try:
+        await db_manager.ping()
+    except Exception as e:
+        raise HTTPException(status_code=503, detail=f"Service not ready - database issue: {str(e)}")
+
+    # FTP monitor should be at least initializing
+    ftp_status = ftp_monitor.get_status()
+    if ftp_status == "error":
+        raise HTTPException(status_code=503, detail="Service not ready - FTP monitor in error state")
+
+    return {
+        "ready": True,
+        "timestamp": datetime.now().isoformat(),
+        "ftp_monitor_status": ftp_status
+    }


 @app.get("/status")
 async def get_status():
     global ftp_monitor, db_manager
@@ -117,6 +196,44 @@ async def trigger_manual_check():
         raise HTTPException(status_code=500, detail=f"Check failed: {str(e)}")


+@app.get("/scan-cache")
+async def get_scan_cache():
+    global db_manager
+
+    if not db_manager:
+        raise HTTPException(status_code=503, detail="Database not initialized")
+
+    try:
+        scanned_dirs = await db_manager.get_scanned_directories()
+        return {
+            "scanned_directories": scanned_dirs,
+            "total_directories": len(scanned_dirs),
+            "timestamp": datetime.now().isoformat()
+        }
+    except Exception as e:
+        logger.error(f"Error getting scan cache: {e}")
+        raise HTTPException(status_code=500, detail=f"Failed to get scan cache: {str(e)}")
+
+
+@app.delete("/scan-cache")
+async def clear_scan_cache():
+    global db_manager
+
+    if not db_manager:
+        raise HTTPException(status_code=503, detail="Database not initialized")
+
+    try:
+        result = db_manager.collections['scanned_directories'].delete_many({})
+        return {
+            "message": "Scan cache cleared successfully",
+            "deleted_count": result.deleted_count,
+            "timestamp": datetime.now().isoformat()
+        }
+    except Exception as e:
+        logger.error(f"Error clearing scan cache: {e}")
+        raise HTTPException(status_code=500, detail=f"Failed to clear scan cache: {str(e)}")
+
+
 if __name__ == "__main__":
     import uvicorn
     uvicorn.run("main:app", host="0.0.0.0", port=8008, reload=True)
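Taken together, the health-check branches above settle on one of three response shapes. The payloads below are illustrative examples implied by the code, not captured output:

# Healthy: database up, FTP monitor running (or still initializing)
healthy = {"service": "healthy", "database": "connected", "ftp_monitor": "running"}

# Starting: components not yet initialized and the database not reachable yet
starting = {"service": "starting", "database": "not_initialized", "ftp_monitor": "not_initialized"}

# Degraded: at least one issue collected in service_issues
degraded = {
    "service": "degraded",
    "database": "connected",
    "ftp_monitor": "error",
    "issues": ["ftp_monitor_error"],
}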
microservices/data-ingestion-service/test_health_check.py (new file, 132 lines)
@@ -0,0 +1,132 @@
+#!/usr/bin/env python3
+"""
+Test script to verify health check improvements for data-ingestion-service
+"""
+
+import asyncio
+import aiohttp
+import time
+import sys
+from datetime import datetime
+
+async def test_health_check():
+    """Test the health check endpoint with improved startup handling"""
+
+    print("Testing data-ingestion-service health check improvements...")
+    print("=" * 60)
+
+    service_url = "http://localhost:8008"
+
+    async with aiohttp.ClientSession() as session:
+
+        # Test 1: Check if service responds at all
+        print("Test 1: Basic connectivity")
+        try:
+            async with session.get(f"{service_url}/", timeout=5) as response:
+                if response.status == 200:
+                    data = await response.json()
+                    print(f"✅ Service responding: {data['service']}")
+                else:
+                    print(f"❌ Service returned status {response.status}")
+        except Exception as e:
+            print(f"❌ Service not reachable: {e}")
+            print("Make sure the service is running: python main.py")
+            return False
+
+        print()
+
+        # Test 2: Health check during startup (multiple checks)
+        print("Test 2: Health check progression during startup")
+        print("Checking health status every 2 seconds for 30 seconds...")
+
+        startup_healthy = False
+        for i in range(15):  # 30 seconds total
+            try:
+                async with session.get(f"{service_url}/health", timeout=3) as response:
+                    data = await response.json()
+                    service_status = data.get('service', 'unknown')
+                    db_status = data.get('database', 'unknown')
+                    ftp_status = data.get('ftp_monitor', 'unknown')
+
+                    status_icon = "✅" if service_status == "healthy" else "🟡" if service_status == "starting" else "❌"
+
+                    print(f"  {i+1:2d}s: {status_icon} Service={service_status}, DB={db_status}, FTP={ftp_status}")
+
+                    if service_status == "healthy":
+                        startup_healthy = True
+                        print(f"  🎉 Service became healthy after {(i+1)*2} seconds!")
+                        break
+
+                    if service_status not in ["starting", "healthy"]:
+                        print(f"  ❌ Service in unexpected state: {service_status}")
+                        break
+
+            except asyncio.TimeoutError:
+                print(f"  {i+1:2d}s: ⏰ Health check timeout")
+            except Exception as e:
+                print(f"  {i+1:2d}s: ❌ Error: {e}")
+
+            await asyncio.sleep(2)
+
+        print()
+
+        # Test 3: Final detailed health status
+        print("Test 3: Detailed health status")
+        try:
+            async with session.get(f"{service_url}/health", timeout=5) as response:
+                data = await response.json()
+                print("Final health status:")
+                print(f"  Service Status: {data.get('service', 'unknown')}")
+                print(f"  Database: {data.get('database', 'unknown')}")
+                print(f"  FTP Monitor: {data.get('ftp_monitor', 'unknown')}")
+                print(f"  Last Check: {data.get('last_check', 'none')}")
+                print(f"  Files Processed: {data.get('files_processed', 0)}")
+                if 'issues' in data:
+                    print(f"  Issues: {data['issues']}")
+        except Exception as e:
+            print(f"❌ Error getting final status: {e}")
+
+        print()
+
+        # Test 4: Readiness check
+        print("Test 4: Readiness check")
+        try:
+            async with session.get(f"{service_url}/readiness", timeout=5) as response:
+                if response.status == 200:
+                    data = await response.json()
+                    print(f"✅ Service is ready: {data.get('ready', False)}")
+                    print(f"  FTP Monitor Status: {data.get('ftp_monitor_status', 'unknown')}")
+                else:
+                    text = await response.text()
+                    print(f"❌ Service not ready (HTTP {response.status}): {text}")
+        except Exception as e:
+            print(f"❌ Error checking readiness: {e}")
+
+        print()
+        print("=" * 60)
+
+        if startup_healthy:
+            print("✅ SUCCESS: Service health check improvements are working!")
+            print("   - Service can become healthy even during FTP initialization")
+            print("   - Health checks are responsive and don't block")
+            return True
+        else:
+            print("⚠️ WARNING: Service didn't become healthy within 30 seconds")
+            print("   This might be expected if:")
+            print("   - Database connection is slow")
+            print("   - FTP server is unreachable")
+            print("   - Service is still initializing (check logs)")
+            return False
+
+if __name__ == "__main__":
+    print(f"Starting health check test at {datetime.now()}")
+
+    try:
+        success = asyncio.run(test_health_check())
+        sys.exit(0 if success else 1)
+    except KeyboardInterrupt:
+        print("\nTest interrupted by user")
+        sys.exit(1)
+    except Exception as e:
+        print(f"Test failed with error: {e}")
+        sys.exit(1)