""" Data Ingestion Service Monitors FTP servers for new time series data from real communities and publishes to Redis. Provides realistic data feeds for simulation and analytics. Port: 8008 """ import asyncio from datetime import datetime, timedelta from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks from fastapi.middleware.cors import CORSMiddleware from contextlib import asynccontextmanager import logging from typing import List, Optional, Dict, Any import json from bson import ObjectId from .models import ( DataSourceCreate, DataSourceUpdate, DataSourceResponse, FileProcessingRequest, FileProcessingResponse, IngestionStats, HealthStatus, QualityReport, TopicInfo, PublishingStats ) from .database import db_manager, get_database, get_redis, DatabaseService from .ftp_monitor import FTPMonitor from .data_processor import DataProcessor from .redis_publisher import RedisPublisher from .data_validator import DataValidator from .monitoring import ServiceMonitor, PerformanceMonitor, ErrorHandler # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @asynccontextmanager async def lifespan(app: FastAPI): """Application lifespan manager""" logger.info("Data Ingestion Service starting up...") try: # Connect to databases await db_manager.connect() # Initialize core components await initialize_data_sources() await initialize_components() # Start background tasks asyncio.create_task(ftp_monitoring_task()) asyncio.create_task(data_processing_task()) asyncio.create_task(health_monitoring_task()) asyncio.create_task(cleanup_task()) logger.info("Data Ingestion Service startup complete") yield except Exception as e: logger.error(f"Error during startup: {e}") raise finally: logger.info("Data Ingestion Service shutting down...") await db_manager.disconnect() logger.info("Data Ingestion Service shutdown complete") app = FastAPI( title="Data Ingestion Service", description="FTP monitoring and time series data ingestion for real community data simulation", version="1.0.0", lifespan=lifespan ) app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) # Global components ftp_monitor = None data_processor = None redis_publisher = None data_validator = None service_monitor = None # Dependencies async def get_db(): return await get_database() async def get_ftp_monitor(): global ftp_monitor if not ftp_monitor: db = await get_database() redis = await get_redis() ftp_monitor = FTPMonitor(db, redis) return ftp_monitor async def get_data_processor(): global data_processor if not data_processor: db = await get_database() redis = await get_redis() data_processor = DataProcessor(db, redis) return data_processor async def get_redis_publisher(): global redis_publisher if not redis_publisher: redis = await get_redis() redis_publisher = RedisPublisher(redis) return redis_publisher async def get_data_validator(): global data_validator if not data_validator: db = await get_database() redis = await get_redis() data_validator = DataValidator(db, redis) return data_validator @app.get("/health", response_model=HealthStatus) async def health_check(): """Health check endpoint""" try: # Get database health health_data = await db_manager.health_check() # Get FTP connections status ftp_status = await check_ftp_connections() # Calculate uptime app_start_time = getattr(app.state, 'start_time', datetime.utcnow()) uptime = (datetime.utcnow() - app_start_time).total_seconds() # Get processing stats processing_stats = 
        # Get processing stats (today's ingestion counters)
        db = await get_database()
        processing_stats = await db.ingestion_stats.find_one(
            {"date": datetime.utcnow().strftime("%Y-%m-%d")}
        ) or {}

        overall_status = "healthy"
        if not health_data["mongodb"] or not health_data["redis"]:
            overall_status = "degraded"
        elif ftp_status["healthy_connections"] == 0 and ftp_status["total_connections"] > 0:
            overall_status = "degraded"

        return HealthStatus(
            status=overall_status,
            timestamp=datetime.utcnow(),
            uptime_seconds=uptime,
            active_sources=ftp_status["healthy_connections"],
            total_processed_files=processing_stats.get("files_processed", 0),
            redis_connected=health_data["redis"],
            mongodb_connected=health_data["mongodb"],
            last_error=None
        )
    except Exception as e:
        logger.error(f"Health check failed: {e}")
        return HealthStatus(
            status="unhealthy",
            timestamp=datetime.utcnow(),
            uptime_seconds=0,
            active_sources=0,
            total_processed_files=0,
            redis_connected=False,
            mongodb_connected=False,
            last_error=str(e)
        )


@app.get("/stats", response_model=IngestionStats)
async def get_ingestion_stats():
    """Get data ingestion statistics"""
    try:
        db = await get_database()

        # Get statistics from database
        stats_data = await db.ingestion_stats.find_one(
            {"date": datetime.utcnow().strftime("%Y-%m-%d")}
        ) or {}

        return IngestionStats(
            files_processed_today=stats_data.get("files_processed", 0),
            records_ingested_today=stats_data.get("records_ingested", 0),
            errors_today=stats_data.get("errors", 0),
            data_sources_active=stats_data.get("active_sources", 0),
            average_processing_time_ms=stats_data.get("avg_processing_time", 0),
            last_successful_ingestion=stats_data.get("last_success"),
            redis_messages_published=stats_data.get("redis_published", 0),
            data_quality_score=stats_data.get("quality_score", 100.0)
        )
    except Exception as e:
        logger.error(f"Error getting ingestion stats: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")


@app.get("/sources")
async def get_data_sources():
    """Get configured data sources"""
    try:
        db = await get_database()
        cursor = db.data_sources.find({})
        sources = []
        async for source in cursor:
            source["_id"] = str(source["_id"])
            # Convert datetime fields
            for field in ["created_at", "updated_at", "last_check", "last_success"]:
                if field in source and source[field]:
                    source[field] = source[field].isoformat()
            sources.append(source)

        return {
            "sources": sources,
            "count": len(sources)
        }
    except Exception as e:
        logger.error(f"Error getting data sources: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")


@app.post("/sources")
async def create_data_source(
    source_config: DataSourceCreate,
    background_tasks: BackgroundTasks
):
    """Create a new data source"""
    try:
        db = await get_database()

        # Create source document
        source_doc = {
            "name": source_config.name,
            "description": source_config.description,
            "source_type": source_config.source_type,
            "ftp_config": source_config.ftp_config.dict() if source_config.ftp_config else None,
            "file_patterns": source_config.file_patterns,
            "data_format": source_config.data_format.value,
            "topics": [topic.dict() for topic in source_config.topics],
            "redis_topics": [topic.topic_name for topic in source_config.topics],
            "enabled": source_config.enabled,
            "check_interval_seconds": source_config.polling_interval_minutes * 60,
            "max_file_size_mb": source_config.max_file_size_mb,
            "created_at": datetime.utcnow(),
            "updated_at": datetime.utcnow(),
            "status": "created"
        }

        result = await db.data_sources.insert_one(source_doc)

        # Test connection in background
        background_tasks.add_task(test_data_source_connection, str(result.inserted_id))

        return {
            "message": "Data source created successfully",
            "source_id": str(result.inserted_id),
            "name": source_config.name
        }
    except Exception as e:
        logger.error(f"Error creating data source: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")
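
# Illustrative request body for POST /sources. This is only a sketch: the nested
# FTP and topic schemas live in .models (DataSourceCreate), so the exact field
# names inside "ftp_config" and "topics" are assumptions based on how this module
# uses them above and in initialize_data_sources().
#
#   {
#     "name": "Community Energy Data",
#     "description": "Hourly consumption exports from the community FTP drop",
#     "source_type": "ftp",
#     "ftp_config": {
#       "host": "ftp.example.com",
#       "port": 21,
#       "username": "energy_data",
#       "password": "********",
#       "remote_path": "/energy_data",
#       "use_ssl": false
#     },
#     "file_patterns": ["*.csv"],
#     "data_format": "csv",
#     "topics": [{"topic_name": "energy_data"}],
#     "enabled": false,
#     "polling_interval_minutes": 5,
#     "max_file_size_mb": 50
#   }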


@app.put("/sources/{source_id}")
async def update_data_source(
    source_id: str,
    source_config: DataSourceUpdate
):
    """Update an existing data source"""
    try:
        db = await get_database()

        update_doc = {}
        if source_config.name is not None:
            update_doc["name"] = source_config.name
        if source_config.description is not None:
            update_doc["description"] = source_config.description
        if source_config.ftp_config is not None:
            update_doc["ftp_config"] = source_config.ftp_config.dict()
        if source_config.file_patterns is not None:
            update_doc["file_patterns"] = source_config.file_patterns
        if source_config.data_format is not None:
            update_doc["data_format"] = source_config.data_format.value
        if source_config.topics is not None:
            update_doc["topics"] = [topic.dict() for topic in source_config.topics]
            update_doc["redis_topics"] = [topic.topic_name for topic in source_config.topics]
        if source_config.enabled is not None:
            update_doc["enabled"] = source_config.enabled
        if source_config.polling_interval_minutes is not None:
            update_doc["check_interval_seconds"] = source_config.polling_interval_minutes * 60
        if source_config.max_file_size_mb is not None:
            update_doc["max_file_size_mb"] = source_config.max_file_size_mb

        update_doc["updated_at"] = datetime.utcnow()

        result = await db.data_sources.update_one(
            {"_id": ObjectId(source_id)},
            {"$set": update_doc}
        )

        if result.matched_count == 0:
            raise HTTPException(status_code=404, detail="Data source not found")

        return {
            "message": "Data source updated successfully",
            "source_id": source_id
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error updating data source: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")


@app.delete("/sources/{source_id}")
async def delete_data_source(source_id: str):
    """Delete a data source"""
    try:
        db = await get_database()

        result = await db.data_sources.delete_one({"_id": ObjectId(source_id)})

        if result.deleted_count == 0:
            raise HTTPException(status_code=404, detail="Data source not found")

        return {
            "message": "Data source deleted successfully",
            "source_id": source_id
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error deleting data source: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")


@app.post("/sources/{source_id}/test")
async def test_data_source(source_id: str):
    """Test connection to a data source"""
    try:
        db = await get_database()

        source = await db.data_sources.find_one({"_id": ObjectId(source_id)})
        if not source:
            raise HTTPException(status_code=404, detail="Data source not found")

        monitor = await get_ftp_monitor()
        test_result = await monitor.test_connection(source)

        return {
            "source_id": source_id,
            "connection_test": test_result,
            "tested_at": datetime.utcnow().isoformat()
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error testing data source: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")


@app.post("/sources/{source_id}/trigger")
async def trigger_manual_check(
    source_id: str,
    background_tasks: BackgroundTasks
):
    """Manually trigger a check for new data"""
    try:
        db = await get_database()

        source = await db.data_sources.find_one({"_id": ObjectId(source_id)})
        if not source:
            raise HTTPException(status_code=404, detail="Data source not found")

        # Trigger check in background
        background_tasks.add_task(process_data_source, source)

        return {
            "message": "Manual check triggered",
            "source_id": source_id,
            "triggered_at": datetime.utcnow().isoformat()
        }
check triggered", "source_id": source_id, "triggered_at": datetime.utcnow().isoformat() } except HTTPException: raise except Exception as e: logger.error(f"Error triggering manual check: {e}") raise HTTPException(status_code=500, detail="Internal server error") @app.get("/processing/status") async def get_processing_status(): """Get current processing status""" try: db = await get_database() # Get recent processing jobs cursor = db.processing_jobs.find().sort("started_at", -1).limit(20) jobs = [] async for job in cursor: job["_id"] = str(job["_id"]) for field in ["started_at", "completed_at", "created_at"]: if field in job and job[field]: job[field] = job[field].isoformat() jobs.append(job) # Get queue size queue_size = await get_processing_queue_size() return { "processing_jobs": jobs, "queue_size": queue_size, "last_updated": datetime.utcnow().isoformat() } except Exception as e: logger.error(f"Error getting processing status: {e}") raise HTTPException(status_code=500, detail="Internal server error") @app.get("/data-quality") async def get_data_quality_metrics(): """Get data quality metrics""" try: db = await get_database() # Get recent quality metrics cursor = db.data_quality_metrics.find().sort("timestamp", -1).limit(10) metrics = [] async for metric in cursor: metric["_id"] = str(metric["_id"]) if "timestamp" in metric: metric["timestamp"] = metric["timestamp"].isoformat() metrics.append(metric) return { "quality_metrics": metrics, "count": len(metrics) } except Exception as e: logger.error(f"Error getting data quality metrics: {e}") raise HTTPException(status_code=500, detail="Internal server error") @app.get("/redis/topics") async def get_redis_topics(): """Get active Redis topics""" try: redis = await get_redis() publisher = await get_redis_publisher() topics_info = await publisher.get_topics_info() return { "active_topics": topics_info, "timestamp": datetime.utcnow().isoformat() } except Exception as e: logger.error(f"Error getting Redis topics: {e}") raise HTTPException(status_code=500, detail="Internal server error") # Background task functions async def initialize_data_sources(): """Initialize data sources from database""" try: db = await get_database() # Create default data source if none exist count = await db.data_sources.count_documents({}) if count == 0: default_source = { "name": "Community Energy Data", "source_type": "ftp", "ftp_config": { "host": "ftp.example.com", "port": 21, "username": "energy_data", "password": "password", "remote_path": "/energy_data", "use_ssl": False }, "file_patterns": ["*.csv", "*.json", "energy_*.txt"], "data_format": "csv", "redis_topics": ["energy_data", "community_consumption", "real_time_metrics"], "enabled": False, # Disabled by default until configured "check_interval_seconds": 300, "created_at": datetime.utcnow(), "updated_at": datetime.utcnow(), "status": "configured" } await db.data_sources.insert_one(default_source) logger.info("Created default data source configuration") except Exception as e: logger.error(f"Error initializing data sources: {e}") async def initialize_components(): """Initialize core service components""" try: # Initialize global components global ftp_monitor, data_processor, redis_publisher, data_validator, service_monitor db = await get_database() redis = await get_redis() # Initialize monitoring first service_monitor = ServiceMonitor(db, redis) await service_monitor.start_monitoring() # Initialize FTP monitor ftp_monitor = FTPMonitor(db, redis) # Initialize data processor data_processor = DataProcessor(db, redis) 
        await data_processor.initialize()

        # Initialize Redis publisher
        redis_publisher = RedisPublisher(redis)
        await redis_publisher.initialize()

        # Initialize data validator
        data_validator = DataValidator(db, redis)
        await data_validator.initialize()

        # Store app start time for uptime calculation
        app.state.start_time = datetime.utcnow()

        logger.info("Core components initialized successfully")
    except Exception as e:
        logger.error(f"Error initializing components: {e}")
        if service_monitor:
            await service_monitor.error_handler.log_error(e, {"task": "component_initialization"})
        raise


async def ftp_monitoring_task():
    """Main FTP monitoring background task"""
    logger.info("Starting FTP monitoring task")

    while True:
        try:
            db = await get_database()

            # Get all enabled data sources
            cursor = db.data_sources.find({"enabled": True})
            async for source in cursor:
                try:
                    # Check if it's time to check this source
                    last_check = source.get("last_check")
                    check_interval = source.get("check_interval_seconds", 300)

                    if (not last_check or
                            (datetime.utcnow() - last_check).total_seconds() >= check_interval):
                        # Process this data source
                        await process_data_source(source)

                        # Update last check time
                        await db.data_sources.update_one(
                            {"_id": source["_id"]},
                            {"$set": {"last_check": datetime.utcnow()}}
                        )
                except Exception as e:
                    logger.error(f"Error processing data source {source.get('name', 'unknown')}: {e}")

            # Sleep between monitoring cycles
            await asyncio.sleep(30)
        except Exception as e:
            logger.error(f"Error in FTP monitoring task: {e}")
            await asyncio.sleep(60)


async def process_data_source(source: Dict[str, Any]):
    """Process a single data source"""
    try:
        monitor = await get_ftp_monitor()
        processor = await get_data_processor()
        publisher = await get_redis_publisher()

        # Get new files from FTP
        new_files = await monitor.check_for_new_files(source)

        if new_files:
            logger.info(f"Found {len(new_files)} new files for source: {source['name']}")

            for file_info in new_files:
                try:
                    # Download and process file
                    file_data = await monitor.download_file(source, file_info)

                    # Process the time series data
                    processed_data = await processor.process_time_series_data(
                        file_data, source["data_format"]
                    )

                    # Validate data quality
                    validator = await get_data_validator()
                    quality_metrics = await validator.validate_time_series(processed_data)

                    # Publish to Redis topics
                    for topic in source["redis_topics"]:
                        await publisher.publish_time_series_data(
                            topic, processed_data, source["name"]
                        )

                    # Record processing success
                    await record_processing_success(source, file_info, len(processed_data), quality_metrics)
                except Exception as e:
                    logger.error(f"Error processing file {file_info.get('filename', 'unknown')}: {e}")
                    await record_processing_error(source, file_info, str(e))
    except Exception as e:
        logger.error(f"Error in process_data_source for {source.get('name', 'unknown')}: {e}")


async def data_processing_task():
    """Background task for data processing queue"""
    logger.info("Starting data processing task")

    # This task handles queued processing jobs
    while True:
        try:
            await asyncio.sleep(10)  # Check every 10 seconds
            # Implementation for processing queued jobs would go here
        except Exception as e:
            logger.error(f"Error in data processing task: {e}")
            await asyncio.sleep(30)
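

# Minimal sketch of what the queued-job handling inside data_processing_task() could
# look like. It only assumes the processing_queue collection and its "pending" status
# that get_processing_queue_size() already relies on; the job fields used here
# (source_id, created_at) are assumptions about the queue document shape.
#
#   db = await get_database()
#   job = await db.processing_queue.find_one_and_update(
#       {"status": "pending"},
#       {"$set": {"status": "in_progress", "started_at": datetime.utcnow()}},
#       sort=[("created_at", 1)]
#   )
#   if job:
#       source = await db.data_sources.find_one({"_id": job["source_id"]})
#       if source:
#           await process_data_source(source)
#       await db.processing_queue.update_one(
#           {"_id": job["_id"]},
#           {"$set": {"status": "completed", "completed_at": datetime.utcnow()}}
#       )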


async def health_monitoring_task():
    """Background task for monitoring system health"""
    logger.info("Starting health monitoring task")

    while True:
        try:
            # Monitor FTP connections
            await monitor_ftp_health()

            # Monitor Redis publishing
            await monitor_redis_health()

            # Monitor processing performance
            await monitor_processing_performance()

            await asyncio.sleep(60)  # Check every minute
        except Exception as e:
            logger.error(f"Error in health monitoring task: {e}")
            await asyncio.sleep(120)


async def cleanup_task():
    """Background task for cleaning up old data"""
    logger.info("Starting cleanup task")

    while True:
        try:
            db = await get_database()

            # Clean up old processing jobs (keep last 1000)
            old_jobs = db.processing_jobs.find().sort("created_at", -1).skip(1000)
            async for job in old_jobs:
                await db.processing_jobs.delete_one({"_id": job["_id"]})

            # Clean up old quality metrics (keep last 30 days)
            cutoff_date = datetime.utcnow() - timedelta(days=30)
            await db.data_quality_metrics.delete_many({"timestamp": {"$lt": cutoff_date}})

            # Clean up old ingestion stats (keep last 90 days)
            cutoff_date = datetime.utcnow() - timedelta(days=90)
            await db.ingestion_stats.delete_many({"date": {"$lt": cutoff_date.strftime("%Y-%m-%d")}})

            await asyncio.sleep(3600)  # Run every hour
        except Exception as e:
            logger.error(f"Error in cleanup task: {e}")
            await asyncio.sleep(7200)


# Helper functions
async def check_ftp_connections() -> Dict[str, int]:
    """Check health of FTP connections"""
    try:
        db = await get_database()
        sources = await db.data_sources.find({"enabled": True}).to_list(None)

        total = len(sources)
        healthy = 0

        monitor = await get_ftp_monitor()
        for source in sources:
            try:
                if await monitor.test_connection(source):
                    healthy += 1
            except Exception:
                pass

        return {"total_connections": total, "healthy_connections": healthy}
    except Exception as e:
        logger.error(f"Error checking FTP connections: {e}")
        return {"total_connections": 0, "healthy_connections": 0}


async def get_processing_queue_size() -> int:
    """Get size of processing queue"""
    try:
        db = await get_database()
        return await db.processing_queue.count_documents({"status": "pending"})
    except Exception as e:
        logger.error(f"Error getting queue size: {e}")
        return 0


async def test_data_source_connection(source_id: str):
    """Test connection to a data source (background task)"""
    try:
        db = await get_database()
        source = await db.data_sources.find_one({"_id": ObjectId(source_id)})

        if source:
            monitor = await get_ftp_monitor()
            success = await monitor.test_connection(source)

            await db.data_sources.update_one(
                {"_id": ObjectId(source_id)},
                {"$set": {
                    "last_test": datetime.utcnow(),
                    "last_test_result": "success" if success else "failed"
                }}
            )
    except Exception as e:
        logger.error(f"Error testing connection for source {source_id}: {e}")


async def record_processing_success(source, file_info, record_count, quality_metrics):
    """Record successful processing"""
    try:
        db = await get_database()

        # Update source stats
        await db.data_sources.update_one(
            {"_id": source["_id"]},
            {"$set": {"last_success": datetime.utcnow()}}
        )

        # Update daily stats
        today = datetime.utcnow().strftime("%Y-%m-%d")
        await db.ingestion_stats.update_one(
            {"date": today},
            {
                "$inc": {
                    "files_processed": 1,
                    "records_ingested": record_count,
                    "redis_published": len(source["redis_topics"])
                },
                "$set": {
                    "last_success": datetime.utcnow(),
                    "quality_score": quality_metrics.get("overall_score", 100.0)
                }
            },
            upsert=True
        )
    except Exception as e:
        logger.error(f"Error recording processing success: {e}")


async def record_processing_error(source, file_info, error_message):
    """Record processing error"""
    try:
        db = await get_database()

        # Update daily stats
        today = datetime.utcnow().strftime("%Y-%m-%d")
        await db.ingestion_stats.update_one(
            {"date": today},
            {"$inc": {"errors": 1}},
            upsert=True
        )

        # Log error
        await db.processing_errors.insert_one({
            "source_id": source["_id"],
            "source_name": source["name"],
            "file_info": file_info,
            "error_message": error_message,
            "timestamp": datetime.utcnow()
        })
    except Exception as e:
        logger.error(f"Error recording processing error: {e}")


async def monitor_ftp_health():
    """Monitor FTP connection health"""
    # Implementation for FTP health monitoring
    pass


async def monitor_redis_health():
    """Monitor Redis publishing health"""
    # Implementation for Redis health monitoring
    pass


async def monitor_processing_performance():
    """Monitor processing performance metrics"""
    # Implementation for performance monitoring
    pass
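

# Hedged sketch of one way monitor_ftp_health() could be filled in. It reuses
# check_ftp_connections() from this module; the service_health collection is an
# assumption and is not defined anywhere else in this service.
#
#   status = await check_ftp_connections()
#   if status["total_connections"] and status["healthy_connections"] == 0:
#       logger.warning("All enabled FTP sources are currently unreachable")
#   db = await get_database()
#   await db.service_health.insert_one(
#       {"component": "ftp", **status, "timestamp": datetime.utcnow()}
#   )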
"file_info": file_info, "error_message": error_message, "timestamp": datetime.utcnow() }) except Exception as e: logger.error(f"Error recording processing error: {e}") async def monitor_ftp_health(): """Monitor FTP connection health""" # Implementation for FTP health monitoring pass async def monitor_redis_health(): """Monitor Redis publishing health""" # Implementation for Redis health monitoring pass async def monitor_processing_performance(): """Monitor processing performance metrics""" # Implementation for performance monitoring pass if __name__ == "__main__": import uvicorn from bson import ObjectId uvicorn.run(app, host="0.0.0.0", port=8008)