Implement iterative FTP scan and skip logic with processed file cache

- Add iterative directory scanning to prevent infinite recursion
- Cache processed files in memory to avoid redundant database lookups
- Skip already processed files using cache and database fallback
- Add tests for skip logic and iterative scan behavior
- Change logging for MongoDB connection and file storage to debug level
- Clean up FastAPI app and remove redundant docstrings
Author: rafaeldpsilva
Date: 2025-09-12 13:43:21 +01:00
Parent: a703240b27
Commit: aa07347604
8 changed files with 906 additions and 136 deletions

@@ -12,8 +12,8 @@ FTP_CONFIG: Dict[str, Any] = {
"host": os.getenv("FTP_SA4CPS_HOST", "ftp.sa4cps.pt"),
"username": os.getenv("FTP_SA4CPS_USERNAME", "curvascarga@sa4cps.pt"),
"password": os.getenv("FTP_SA4CPS_PASSWORD", 'n$WFtz9+bleN'), # Set via environment variable
"base_path": os.getenv("FTP_SA4CPS_REMOTE_PATH", "/SLGs/Faial/"),
"check_interval": int(os.getenv("FTP_CHECK_INTERVAL", "21600")) # 6 hours default
"base_path": os.getenv("FTP_SA4CPS_REMOTE_PATH", "/SLGs/"),
"check_interval": int(os.getenv("FTP_CHECK_INTERVAL", "21600")), # 6 hours default
}
# MongoDB Configuration
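Widening FTP_SA4CPS_REMOTE_PATH from "/SLGs/Faial/" to "/SLGs/" hands responsibility for reaching the per-site subdirectories to the new iterative scanner below. A deployment that still wants the old single-site behaviour can narrow the scan through the environment; a minimal sketch with hypothetical values, set before the configuration module is imported:

import os

# Hypothetical overrides; the variable names match the configuration above.
os.environ["FTP_SA4CPS_REMOTE_PATH"] = "/SLGs/Faial/"   # restrict the scan to a single site
os.environ["FTP_CHECK_INTERVAL"] = "3600"                # poll hourly instead of every 6 hours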

@@ -58,7 +58,7 @@ class DatabaseManager:
"""Close MongoDB connection"""
if self.client:
self.client.close()
logger.info("MongoDB connection closed")
logger.debug("MongoDB connection closed")
async def ping(self):
"""Test database connection"""
@@ -68,7 +68,7 @@ class DatabaseManager:
try:
# The ping command is cheap and does not require auth.
self.client.admin.command('ping')
logger.info("MongoDB ping successful")
logger.debug("MongoDB ping successful")
except ConnectionFailure as e:
logger.error(f"MongoDB ping failed - Server not available: {e}")
raise
@@ -121,7 +121,7 @@ class DatabaseManager:
if records:
result = self.collections['energy_data'].insert_many(records)
inserted_count = len(result.inserted_ids)
logger.info(f"Stored {inserted_count} records from {filename}")
logger.debug(f"Stored {inserted_count} records from {filename}")
return True
return False
@@ -163,6 +163,10 @@ class DatabaseManager:
logger.error(f"Error getting processed files: {e}")
return []
async def is_file_processed(self, filename: str) -> bool:
"""Mock check if file is processed"""
return filename in await self.get_processed_files()
async def get_file_info(self, filename: str) -> Optional[Dict[str, Any]]:
"""Get information about a specific file"""
try:

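The new is_file_processed helper answers its question by pulling the full filename list from get_processed_files() on every call. Once the history grows, a single-document lookup is cheaper; a sketch of a drop-in replacement, keeping the synchronous PyMongo call style used elsewhere in DatabaseManager and assuming each stored record carries a filename field in the energy_data collection (both assumptions, not confirmed by this diff):

async def is_file_processed(self, filename: str) -> bool:
    """Return True if any stored record references this filename."""
    try:
        # find_one with a minimal projection avoids fetching every processed filename
        doc = self.collections['energy_data'].find_one({"filename": filename}, {"_id": 1})
        return doc is not None
    except Exception as e:
        logger.error(f"Error checking processed state for {filename}: {e}")
        return False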
@@ -48,11 +48,30 @@ class FTPMonitor:
logger.info(f"FTP Monitor initialized for {self.ftp_host}")
async def initialize_processed_files_cache(self):
"""Load already processed files from database into memory cache"""
try:
processed_file_names = await self.db_manager.get_processed_files()
# Convert filenames to full paths and add to processed_files set
for filename in processed_file_names:
# We'll use just the filename as the key since we check by filename
# But we need to be consistent with how we store paths
self.processed_files.add(filename)
logger.info(f"Loaded {len(processed_file_names)} already processed files from database")
return len(processed_file_names)
except Exception as e:
logger.error(f"Error loading processed files from database: {e}")
return 0
async def start_monitoring(self):
"""Start the monitoring loop"""
self.status = "running"
logger.info("Starting FTP monitoring loop")
# Initialize cache of processed files from database
await self.initialize_processed_files_cache()
while True:
try:
await self.check_for_new_files()
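The cache seeding above is straightforward to cover in isolation (the tests this commit adds are in files not included in this excerpt). A minimal pytest-asyncio sketch, assuming FTPMonitor is constructed with only the database manager, as the FastAPI app below does, and that processed_files starts out empty:

import pytest
from unittest.mock import AsyncMock

from ftp_monitor import FTPMonitor  # module name taken from the import in the FastAPI app below

@pytest.mark.asyncio
async def test_cache_is_seeded_from_database():
    db = AsyncMock()
    db.get_processed_files.return_value = ["a.sgl_v2", "b.sgl_v2"]
    monitor = FTPMonitor(db)  # constructed the same way the service does

    loaded = await monitor.initialize_processed_files_cache()

    assert loaded == 2
    assert monitor.processed_files == {"a.sgl_v2", "b.sgl_v2"}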
@@ -84,17 +103,35 @@ class FTPMonitor:
# Process new files
processed_count = 0
skipped_count = 0
for file_info in new_files:
if file_info.path not in self.processed_files:
success = await self._process_file(ftp, file_info)
if success:
self.processed_files.add(file_info.path)
processed_count += 1
self.files_processed_count += 1
# Check if file is already processed (using filename for cache consistency)
if file_info.name in self.processed_files:
logger.debug(f"Skipping already processed file (cached): {file_info.name}")
skipped_count += 1
continue
# Double-check with database (in case cache missed something)
if await self.db_manager.is_file_processed(file_info.name):
logger.debug(f"Skipping already processed file (database): {file_info.name}")
# Add to cache to avoid future database checks
self.processed_files.add(file_info.name)
skipped_count += 1
continue
# Process the file
logger.debug(f"Processing new file: {file_info.name}")
success = await self._process_file(ftp, file_info)
if success:
self.processed_files.add(file_info.name)
processed_count += 1
logger.debug(f"Successfully processed file: {file_info.name} ({processed_count} total)")
self.files_processed_count += 1
result = {
"files_found": len(new_files),
"files_processed": processed_count,
"files_skipped": skipped_count,
"timestamp": self.last_check.isoformat()
}
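Distilled, the per-file branch above is a cache-then-database predicate; a hypothetical helper, not part of this commit, showing the same decision in one place:

async def _should_skip(self, name: str) -> bool:
    """True if the file was already processed, checking the cache before the database."""
    if name in self.processed_files:                     # fast in-memory path
        return True
    if await self.db_manager.is_file_processed(name):    # database fallback
        self.processed_files.add(name)                   # warm the cache for later rounds
        return True
    return False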
@@ -110,55 +147,115 @@ class FTPMonitor:
files = []
try:
# Navigate to base path
ftp.cwd(self.base_path)
logger.info(f"Scanning directory: {self.base_path}")
# Get directory listing
dir_list = []
ftp.retrlines('LIST', dir_list.append)
logger.info(f"Received {len(dir_list)} directory entries")
for line in dir_list:
print(line)
parts = line.split()
if len(parts) >= 9:
filename = parts[-1]
# Check if it's a .slg_v2 file
if filename.endswith('.sgl_v2'):
print('found file')
try:
size = int(parts[4])
full_path = f"{self.base_path.rstrip('/')}/{filename}"
files.append(FTPFileInfo(
path=full_path,
name=filename,
size=size
))
except (ValueError, IndexError):
logger.warning(f"Could not parse file info for: {filename}")
logger.info(f"Found {len(files)} .slg_v2 files")
await self._scan_directories_iterative(ftp, self.base_path, files)
logger.info(f"Found {len(files)} .slg_v2 files across all directories")
return files
except Exception as e:
logger.error(f"Error scanning FTP directory: {e}")
return []
async def _scan_directories_iterative(self, ftp: FTP, base_path: str, files: List[FTPFileInfo]):
"""Iteratively scan directories for .slg_v2 files using a queue approach"""
# Queue of directories to scan: (directory_path, depth)
directories_to_scan = [(base_path, 0)]
visited_dirs = set()
while directories_to_scan:
current_dir, current_depth = directories_to_scan.pop(0) # FIFO queue
# Normalize directory path
normalized_path = current_dir.rstrip('/') if current_dir != '/' else '/'
# Skip if already visited (loop prevention)
if normalized_path in visited_dirs:
logger.debug(f"Skipping already visited directory: {normalized_path}")
continue
# Mark as visited
visited_dirs.add(normalized_path)
logger.debug(f"Scanning directory: {normalized_path} (depth: {current_depth}, queue: {len(directories_to_scan)})")
try:
# Navigate to directory
original_dir = ftp.pwd()
ftp.cwd(current_dir)
# Get directory listing
dir_list = []
ftp.retrlines('LIST', dir_list.append)
logger.debug(f"Found {len(dir_list)} entries in {normalized_path}")
# Process entries
for line in dir_list:
parts = line.split()
if len(parts) >= 9:
filename = parts[-1]
permissions = parts[0]
# Skip current and parent directory references
if filename in ['.', '..']:
continue
# Handle directories
if permissions.startswith('d'):
# Create full subdirectory path
if normalized_path == '/':
subdirectory_path = f"/{filename}"
else:
subdirectory_path = f"{normalized_path}/{filename}"
# Normalize subdirectory path
subdirectory_normalized = subdirectory_path.rstrip('/') if subdirectory_path != '/' else '/'
# Add to queue if not already visited
if subdirectory_normalized not in visited_dirs:
directories_to_scan.append((subdirectory_path, current_depth + 1))
logger.debug(f"Added to queue: {subdirectory_path}")
else:
logger.debug(f"Skipping already visited: {subdirectory_path}")
# Handle .slg_v2 files
elif filename.endswith('.sgl_v2'):
logger.debug(f"Found .slg_v2 file: {filename} in {normalized_path}")
try:
size = int(parts[4])
if normalized_path == '/':
full_path = f"/{filename}"
else:
full_path = f"{normalized_path}/{filename}"
files.append(FTPFileInfo(
path=full_path,
name=filename,
size=size
))
except (ValueError, IndexError):
logger.warning(f"Could not parse file info for: {filename}")
# Return to original directory
ftp.cwd(original_dir)
logger.debug(f"Completed scanning: {normalized_path}")
except Exception as e:
logger.warning(f"Error scanning directory {normalized_path}: {e}")
continue
logger.info(f"Iterative scan completed. Visited {len(visited_dirs)} directories")
async def _process_file(self, ftp: FTP, file_info: FTPFileInfo) -> bool:
"""Download and process a .slg_v2 file"""
logger.info(f"Processing file: {file_info.name} ({file_info.size} bytes)")
logger.debug(f"Processing file: {file_info.path} ({file_info.size} bytes)")
try:
# Create temporary file for download
with tempfile.NamedTemporaryFile(mode='wb', suffix='.slg_v2', delete=False) as temp_file:
temp_path = temp_file.name
# Download file
# Download file using full path
with open(temp_path, 'wb') as f:
ftp.retrbinary(f'RETR {file_info.name}', f.write)
# Use the full path for RETR command
ftp.retrbinary(f'RETR {file_info.path}', f.write)
# Process the downloaded file
records = await self.processor.process_file(temp_path, file_info.name)
@@ -166,7 +263,7 @@ class FTPMonitor:
# Store in database
if records:
await self.db_manager.store_file_data(file_info.name, records)
logger.info(f"Stored {len(records)} records from {file_info.name}")
logger.debug(f"Stored {len(records)} records from {file_info.name}")
return True
else:
logger.warning(f"No valid records found in {file_info.name}")
@@ -205,5 +302,5 @@ class FTPMonitor:
"processed_files_count": len(self.processed_files),
"check_interval_hours": self.check_interval / 3600,
"ftp_host": self.ftp_host,
"base_path": self.base_path
"base_path": self.base_path,
}
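_scan_directories_iterative pops from the front of a plain list, which is O(n) per pop; the traversal is otherwise a standard breadth-first walk guarded by a visited set. The same loop over collections.deque, sketched with the FTP LIST parsing abstracted behind a hypothetical list_dir(path) callable returning (subdirectories, files):

from collections import deque

def walk_directories(base_path, list_dir):
    """Breadth-first traversal mirroring _scan_directories_iterative above."""
    queue = deque([(base_path, 0)])
    visited = set()
    while queue:
        path, depth = queue.popleft()          # O(1) with deque, unlike list.pop(0)
        normalized = path.rstrip('/') or '/'
        if normalized in visited:              # loop prevention
            continue
        visited.add(normalized)
        subdirectories, files = list_dir(normalized)
        for name in subdirectories:
            child = f"/{name}" if normalized == '/' else f"{normalized}/{name}"
            queue.append((child, depth + 1))
        yield from files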

@@ -1,56 +1,43 @@
"""
SA4CPS Data Ingestion Service
Simple FTP monitoring service for .sgl_v2 files with MongoDB storage
"""
from fastapi import FastAPI, HTTPException
from contextlib import asynccontextmanager
import asyncio
import logging
from datetime import datetime
from typing import Dict, Any
from typing import Any
from ftp_monitor import FTPMonitor
from database import DatabaseManager
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global services
ftp_monitor = None
db_manager = None
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan management"""
global ftp_monitor, db_manager
logger.info("Starting SA4CPS Data Ingestion Service...")
# Initialize database connection
db_manager = DatabaseManager()
await db_manager.connect()
# Initialize FTP monitor
ftp_monitor = FTPMonitor(db_manager)
# Start background monitoring task
monitoring_task = asyncio.create_task(ftp_monitor.start_monitoring())
logger.info("Service started successfully")
yield
# Cleanup on shutdown
logger.info("Shutting down service...")
monitoring_task.cancel()
await db_manager.close()
logger.info("Service shutdown complete")
# Create FastAPI app
app = FastAPI(
title="SA4CPS Data Ingestion Service",
description="Monitors FTP server for .sgl_v2 files and stores data in MongoDB",
@@ -61,7 +48,6 @@ app = FastAPI(
@app.get("/")
async def root():
"""Root endpoint"""
return {
"service": "SA4CPS Data Ingestion Service",
"status": "running",
@@ -71,7 +57,6 @@ async def root():
@app.get("/health")
async def health_check():
"""Health check endpoint"""
global ftp_monitor, db_manager
health_status = {
@@ -101,7 +86,6 @@ async def health_check():
@app.get("/status")
async def get_status():
"""Detailed status endpoint"""
global ftp_monitor, db_manager
if not ftp_monitor:
@@ -116,7 +100,6 @@ async def get_status():
@app.post("/trigger-check")
async def trigger_manual_check():
"""Manually trigger FTP check"""
global ftp_monitor
if not ftp_monitor:
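The manual trigger endpoint is handy for exercising the skip logic end to end without waiting out the six-hour interval; a quick smoke test against a locally running instance (base URL and port are assumptions):

import requests

# POST to the /trigger-check endpoint shown above; adjust the URL to your deployment.
response = requests.post("http://localhost:8000/trigger-check")
print(response.status_code, response.json())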