Simplify data ingestion service

Author: rafaeldpsilva
Date: 2025-09-10 15:47:10 +01:00
parent 13556347b0
commit b7e734e0d2
13 changed files with 474 additions and 4440 deletions


@@ -1,779 +1,139 @@
"""
Data Ingestion Service
Monitors FTP servers for new time series data from real communities and publishes to Redis.
Provides realistic data feeds for simulation and analytics.
Port: 8008
SA4CPS Data Ingestion Service
Simple FTP monitoring service for .slg_v2 files with MongoDB storage
"""
import asyncio
import json
import logging
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from typing import List, Optional, Dict, Any

from bson import ObjectId
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
from fastapi.middleware.cors import CORSMiddleware

from models import (
    DataSourceCreate, DataSourceUpdate, DataSourceResponse,
    FileProcessingRequest, FileProcessingResponse, IngestionStats,
    HealthStatus, QualityReport, TopicInfo, PublishingStats
)
from database import DatabaseManager, get_database, get_redis, DatabaseService
from ftp_monitor import FTPMonitor
from slg_v2_processor import SLGv2Processor
from redis_publisher import RedisPublisher
from data_validator import DataValidator
from monitoring import ServiceMonitor, PerformanceMonitor, ErrorHandler

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# Global services
ftp_monitor = None
db_manager = None
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan management"""
    global ftp_monitor, db_manager
    logger.info("Starting SA4CPS Data Ingestion Service...")
    # Initialize database connection
    db_manager = DatabaseManager()
    await db_manager.connect()
    # Initialize FTP monitor
    ftp_monitor = FTPMonitor(db_manager)
    # Start background monitoring task
    monitoring_task = asyncio.create_task(ftp_monitor.start_monitoring())
    logger.info("Service started successfully")
    yield
    # Cleanup on shutdown
    logger.info("Shutting down service...")
    monitoring_task.cancel()
    await db_manager.close()
    logger.info("Service shutdown complete")
# Create FastAPI app
app = FastAPI(
    title="SA4CPS Data Ingestion Service",
    description="Monitors FTP server for .slg_v2 files and stores data in MongoDB",
    version="1.0.0",
    lifespan=lifespan
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Global components for the full ingestion pipeline
data_processor = None
redis_publisher = None
data_validator = None
service_monitor = None
@app.get("/")
async def root():
"""Root endpoint"""
return {
"service": "SA4CPS Data Ingestion Service",
"status": "running",
"timestamp": datetime.now().isoformat()
}
# Dependencies
async def get_db():
return await get_database()
async def get_ftp_monitor():
global ftp_monitor
if not ftp_monitor:
db = await get_database()
redis = await get_redis()
ftp_monitor = FTPMonitor(db, redis)
return ftp_monitor
async def get_slg_processor():
global data_processor
if not data_processor:
db = await get_database()
redis = await get_redis()
data_processor = SLGv2Processor(db, redis)
return data_processor
async def get_redis_publisher():
global redis_publisher
if not redis_publisher:
redis = await get_redis()
redis_publisher = RedisPublisher(redis)
return redis_publisher
async def get_data_validator():
global data_validator
if not data_validator:
db = await get_database()
redis = await get_redis()
data_validator = DataValidator(db, redis)
return data_validator
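# The helpers above build each component lazily on first use and cache it in a
# module-level global, fetching MongoDB and Redis handles via get_database()/get_redis().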
@app.get("/health", response_model=HealthStatus)
@app.get("/health")
async def health_check():
"""Health check endpoint"""
try:
# Get database health
health_data = await db_manager.health_check()
# Get FTP connections status
ftp_status = await check_ftp_connections()
# Calculate uptime
app_start_time = getattr(app.state, 'start_time', datetime.utcnow())
uptime = (datetime.utcnow() - app_start_time).total_seconds()
# Get processing stats
processing_stats = await get_processing_queue_size()
overall_status = "healthy"
if not health_data["mongodb"] or not health_data["redis"]:
overall_status = "degraded"
elif ftp_status["healthy_connections"] == 0 and ftp_status["total_connections"] > 0:
overall_status = "degraded"
return HealthStatus(
status=overall_status,
timestamp=datetime.utcnow(),
uptime_seconds=uptime,
active_sources=ftp_status["healthy_connections"],
total_processed_files=processing_stats.get("total_processed", 0),
redis_connected=health_data["redis"],
mongodb_connected=health_data["mongodb"],
last_error=None
)
except Exception as e:
logger.error(f"Health check failed: {e}")
return HealthStatus(
status="unhealthy",
timestamp=datetime.utcnow(),
uptime_seconds=0,
active_sources=0,
total_processed_files=0,
redis_connected=False,
mongodb_connected=False,
last_error=str(e)
)
global ftp_monitor, db_manager
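# Example (assuming a local deployment on the service's default port 8008):
#   curl http://localhost:8008/health
# A failed database ping is reported as "database": "disconnected" and
# "service": "degraded" in the JSON body rather than as an HTTP error.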
@app.get("/stats", response_model=IngestionStats)
async def get_ingestion_stats():
"""Get data ingestion statistics"""
try:
db = await get_database()
# Get statistics from database
stats_data = await db.ingestion_stats.find_one(
{"date": datetime.utcnow().strftime("%Y-%m-%d")}
) or {}
return IngestionStats(
files_processed_today=stats_data.get("files_processed", 0),
records_ingested_today=stats_data.get("records_ingested", 0),
errors_today=stats_data.get("errors", 0),
data_sources_active=stats_data.get("active_sources", 0),
average_processing_time_ms=stats_data.get("avg_processing_time", 0),
last_successful_ingestion=stats_data.get("last_success"),
redis_messages_published=stats_data.get("redis_published", 0),
data_quality_score=stats_data.get("quality_score", 100.0)
)
except Exception as e:
logger.error(f"Error getting ingestion stats: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/sources")
async def get_data_sources():
"""Get configured data sources"""
try:
db = await get_database()
cursor = db.data_sources.find({})
sources = []
async for source in cursor:
source["_id"] = str(source["_id"])
# Convert datetime fields
for field in ["created_at", "updated_at", "last_check", "last_success"]:
if field in source and source[field]:
source[field] = source[field].isoformat()
sources.append(source)
return {
"sources": sources,
"count": len(sources)
}
except Exception as e:
logger.error(f"Error getting data sources: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@app.post("/sources")
async def create_data_source(
source_config: DataSourceCreate,
background_tasks: BackgroundTasks
):
"""Create a new data source"""
try:
db = await get_database()
# Create source document
source_doc = {
"name": source_config.name,
"description": source_config.description,
"source_type": source_config.source_type,
"ftp_config": source_config.ftp_config.dict() if source_config.ftp_config else None,
"file_patterns": source_config.file_patterns,
"data_format": source_config.data_format.value,
"topics": [topic.dict() for topic in source_config.topics],
"redis_topics": [topic.topic_name for topic in source_config.topics],
"enabled": source_config.enabled,
"check_interval_seconds": source_config.polling_interval_minutes * 60,
"max_file_size_mb": source_config.max_file_size_mb,
"created_at": datetime.utcnow(),
"updated_at": datetime.utcnow(),
"status": "created"
}
result = await db.data_sources.insert_one(source_doc)
# Test connection in background
background_tasks.add_task(test_data_source_connection, str(result.inserted_id))
return {
"message": "Data source created successfully",
"source_id": str(result.inserted_id),
"name": source_config.name
}
except Exception as e:
logger.error(f"Error creating data source: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
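# Illustrative request body for POST /sources (field names mirror the DataSourceCreate
# usage above; the authoritative schema lives in models.py and the values below are
# placeholders, not real credentials or topics):
#   {
#     "name": "community-a-ftp",
#     "description": "Smart-meter feed for community A",
#     "source_type": "ftp",
#     "ftp_config": { ...connection settings as defined in models.py... },
#     "file_patterns": ["*.slg_v2"],
#     "data_format": "slg_v2",
#     "topics": [{"topic_name": "community_a.energy"}],
#     "enabled": true,
#     "polling_interval_minutes": 5,
#     "max_file_size_mb": 50
#   }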
@app.put("/sources/{source_id}")
async def update_data_source(
source_id: str,
source_config: DataSourceUpdate
):
"""Update an existing data source"""
try:
db = await get_database()
update_doc = {}
if source_config.name is not None:
update_doc["name"] = source_config.name
if source_config.description is not None:
update_doc["description"] = source_config.description
if source_config.ftp_config is not None:
update_doc["ftp_config"] = source_config.ftp_config.dict()
if source_config.file_patterns is not None:
update_doc["file_patterns"] = source_config.file_patterns
if source_config.data_format is not None:
update_doc["data_format"] = source_config.data_format.value
if source_config.topics is not None:
update_doc["topics"] = [topic.dict() for topic in source_config.topics]
update_doc["redis_topics"] = [topic.topic_name for topic in source_config.topics]
if source_config.enabled is not None:
update_doc["enabled"] = source_config.enabled
if source_config.polling_interval_minutes is not None:
update_doc["check_interval_seconds"] = source_config.polling_interval_minutes * 60
if source_config.max_file_size_mb is not None:
update_doc["max_file_size_mb"] = source_config.max_file_size_mb
update_doc["updated_at"] = datetime.utcnow()
result = await db.data_sources.update_one(
{"_id": ObjectId(source_id)},
{"$set": update_doc}
)
if result.matched_count == 0:
raise HTTPException(status_code=404, detail="Data source not found")
return {
"message": "Data source updated successfully",
"source_id": source_id
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error updating data source: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@app.delete("/sources/{source_id}")
async def delete_data_source(source_id: str):
"""Delete a data source"""
try:
db = await get_database()
result = await db.data_sources.delete_one({"_id": ObjectId(source_id)})
if result.deleted_count == 0:
raise HTTPException(status_code=404, detail="Data source not found")
return {
"message": "Data source deleted successfully",
"source_id": source_id
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error deleting data source: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@app.post("/sources/{source_id}/test")
async def test_data_source(source_id: str):
"""Test connection to a data source"""
try:
db = await get_database()
source = await db.data_sources.find_one({"_id": ObjectId(source_id)})
if not source:
raise HTTPException(status_code=404, detail="Data source not found")
monitor = await get_ftp_monitor()
test_result = await monitor.test_connection(source)
return {
"source_id": source_id,
"connection_test": test_result,
"tested_at": datetime.utcnow().isoformat()
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error testing data source: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@app.post("/sources/{source_id}/trigger")
async def trigger_manual_check(
source_id: str,
background_tasks: BackgroundTasks
):
"""Manually trigger a check for new data"""
try:
db = await get_database()
source = await db.data_sources.find_one({"_id": ObjectId(source_id)})
if not source:
raise HTTPException(status_code=404, detail="Data source not found")
# Trigger check in background
background_tasks.add_task(process_data_source, source)
return {
"message": "Manual check triggered",
"source_id": source_id,
"triggered_at": datetime.utcnow().isoformat()
}
except HTTPException:
raise
except Exception as e:
logger.error(f"Error triggering manual check: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/processing/status")
async def get_processing_status():
"""Get current processing status"""
try:
db = await get_database()
# Get recent processing jobs
cursor = db.processing_jobs.find().sort("started_at", -1).limit(20)
jobs = []
async for job in cursor:
job["_id"] = str(job["_id"])
for field in ["started_at", "completed_at", "created_at"]:
if field in job and job[field]:
job[field] = job[field].isoformat()
jobs.append(job)
# Get queue size
queue_size = await get_processing_queue_size()
return {
"processing_jobs": jobs,
"queue_size": queue_size,
"last_updated": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Error getting processing status: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/data-quality")
async def get_data_quality_metrics():
"""Get data quality metrics"""
try:
db = await get_database()
# Get recent quality metrics
cursor = db.data_quality_metrics.find().sort("timestamp", -1).limit(10)
metrics = []
async for metric in cursor:
metric["_id"] = str(metric["_id"])
if "timestamp" in metric:
metric["timestamp"] = metric["timestamp"].isoformat()
metrics.append(metric)
return {
"quality_metrics": metrics,
"count": len(metrics)
}
except Exception as e:
logger.error(f"Error getting data quality metrics: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
@app.get("/redis/topics")
async def get_redis_topics():
"""Get active Redis topics"""
try:
redis = await get_redis()
publisher = await get_redis_publisher()
topics_info = await publisher.get_topics_info()
return {
"active_topics": topics_info,
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
logger.error(f"Error getting Redis topics: {e}")
raise HTTPException(status_code=500, detail="Internal server error")
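# The initializers and background tasks below implement the fuller ingestion pipeline
# (multi-source FTP monitoring, validation, Redis publishing); they are defined here
# but are not started by the simplified lifespan above.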
# Background task functions
async def initialize_data_sources():
"""Initialize data sources from database"""
try:
db = await get_database()
# Auto-configure SA4CPS source if none exist
count = await db.data_sources.count_documents({})
if count == 0:
from .simple_sa4cps_config import SimpleSA4CPSConfig
config = SimpleSA4CPSConfig()
result = await config.setup_sa4cps_source()
if result['success']:
logger.info(f"✅ Auto-configured SA4CPS source: {result['source_id']}")
else:
logger.warning(f"Failed to auto-configure SA4CPS: {result['message']}")
except Exception as e:
logger.error(f"Error initializing data sources: {e}")
async def initialize_components():
"""Initialize core service components"""
try:
# Initialize global components
global ftp_monitor, data_processor, redis_publisher, data_validator, service_monitor
db = await get_database()
redis = await get_redis()
# Initialize monitoring first
service_monitor = ServiceMonitor(db, redis)
await service_monitor.start_monitoring()
# Initialize FTP monitor
ftp_monitor = FTPMonitor(db, redis)
# Initialize SLG_v2 processor
data_processor = SLGv2Processor(db, redis)
# Initialize Redis publisher
redis_publisher = RedisPublisher(redis)
await redis_publisher.initialize()
# Initialize data validator
data_validator = DataValidator(db, redis)
await data_validator.initialize()
# Store app start time for uptime calculation
app.state.start_time = datetime.utcnow()
logger.info("Core components initialized successfully")
except Exception as e:
logger.error(f"Error initializing components: {e}")
if service_monitor:
await service_monitor.error_handler.log_error(e, {"task": "component_initialization"})
raise
async def ftp_monitoring_task():
"""Main FTP monitoring background task"""
logger.info("Starting FTP monitoring task")
while True:
try:
db = await get_database()
# Get all enabled data sources
cursor = db.data_sources.find({"enabled": True})
async for source in cursor:
try:
# Check if it's time to check this source
last_check = source.get("last_check")
check_interval = source.get("check_interval_seconds", 300)
if (not last_check or
(datetime.utcnow() - last_check).total_seconds() >= check_interval):
# Process this data source
await process_data_source(source)
# Update last check time
await db.data_sources.update_one(
{"_id": source["_id"]},
{"$set": {"last_check": datetime.utcnow()}}
)
except Exception as e:
logger.error(f"Error processing data source {source.get('name', 'unknown')}: {e}")
# Sleep between monitoring cycles
await asyncio.sleep(30)
except Exception as e:
logger.error(f"Error in FTP monitoring task: {e}")
await asyncio.sleep(60)
@app.get("/status")
async def get_status():
"""Detailed status endpoint"""
global ftp_monitor, db_manager
if not ftp_monitor:
raise HTTPException(status_code=503, detail="FTP monitor not initialized")
return {
"ftp_monitor": ftp_monitor.get_detailed_status(),
"database": await db_manager.get_stats() if db_manager else None,
"timestamp": datetime.now().isoformat()
}
@app.post("/trigger-check")
async def trigger_manual_check():
"""Manually trigger FTP check"""
global ftp_monitor
if not ftp_monitor:
raise HTTPException(status_code=503, detail="FTP monitor not initialized")
async def process_data_source(source: Dict[str, Any]):
"""Process a single data source"""
try:
monitor = await get_ftp_monitor()
processor = await get_slg_processor()
publisher = await get_redis_publisher()
# Get new files from FTP
new_files = await monitor.check_for_new_files(source)
if new_files:
logger.info(f"Found {len(new_files)} new .slg_v2 files for source: {source['name']}")
for file_info in new_files:
try:
# Download and process file
file_data = await monitor.download_file(source, file_info)
# Process the .slg_v2 file
processed_data = await processor.process_slg_v2_file(file_data)
# Validate data quality
validator = await get_data_validator()
quality_metrics = await validator.validate_time_series(processed_data)
# Publish to Redis topics
for topic in source["redis_topics"]:
await publisher.publish_time_series_data(
topic, processed_data, source["name"]
)
# Record processing success
await record_processing_success(source, file_info, len(processed_data), quality_metrics)
except Exception as e:
logger.error(f"Error processing file {file_info.get('filename', 'unknown')}: {e}")
await record_processing_error(source, file_info, str(e))
result = await ftp_monitor.check_for_new_files()
return {
"message": "Manual check completed",
"result": result,
"timestamp": datetime.now().isoformat()
}
except Exception as e:
logger.error(f"Error in process_data_source for {source.get('name', 'unknown')}: {e}")
logger.error(f"Manual check failed: {e}")
raise HTTPException(status_code=500, detail=f"Check failed: {str(e)}")
async def data_processing_task():
"""Background task for data processing queue"""
logger.info("Starting data processing task")
# This task handles queued processing jobs
while True:
try:
await asyncio.sleep(10) # Check every 10 seconds
# Implementation for processing queued jobs would go here
except Exception as e:
logger.error(f"Error in data processing task: {e}")
await asyncio.sleep(30)
async def health_monitoring_task():
"""Background task for monitoring system health"""
logger.info("Starting health monitoring task")
while True:
try:
# Monitor FTP connections
await monitor_ftp_health()
# Monitor Redis publishing
await monitor_redis_health()
# Monitor processing performance
await monitor_processing_performance()
await asyncio.sleep(60) # Check every minute
except Exception as e:
logger.error(f"Error in health monitoring task: {e}")
await asyncio.sleep(120)
async def cleanup_task():
"""Background task for cleaning up old data"""
logger.info("Starting cleanup task")
while True:
try:
db = await get_database()
# Clean up old processing jobs (keep last 1000)
            old_jobs = db.processing_jobs.find().sort("created_at", -1).skip(1000)
async for job in old_jobs:
await db.processing_jobs.delete_one({"_id": job["_id"]})
# Clean up old quality metrics (keep last 30 days)
cutoff_date = datetime.utcnow() - timedelta(days=30)
await db.data_quality_metrics.delete_many({"timestamp": {"$lt": cutoff_date}})
# Clean up old ingestion stats (keep last 90 days)
cutoff_date = datetime.utcnow() - timedelta(days=90)
await db.ingestion_stats.delete_many({"date": {"$lt": cutoff_date.strftime("%Y-%m-%d")}})
await asyncio.sleep(3600) # Run every hour
except Exception as e:
logger.error(f"Error in cleanup task: {e}")
await asyncio.sleep(7200)
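# Retention policy enforced above: the most recent 1000 processing jobs, 30 days of
# data-quality metrics, and 90 days of daily ingestion stats; the sweep runs hourly
# (backing off to every two hours after an error).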
# Helper functions
async def check_ftp_connections() -> Dict[str, int]:
"""Check health of FTP connections"""
try:
db = await get_database()
sources = await db.data_sources.find({"enabled": True}).to_list(None)
total = len(sources)
healthy = 0
monitor = await get_ftp_monitor()
for source in sources:
try:
if await monitor.test_connection(source):
healthy += 1
            except Exception:
                pass
return {"total_connections": total, "healthy_connections": healthy}
except Exception as e:
logger.error(f"Error checking FTP connections: {e}")
return {"total_connections": 0, "healthy_connections": 0}
async def get_processing_queue_size() -> int:
"""Get size of processing queue"""
try:
db = await get_database()
return await db.processing_queue.count_documents({"status": "pending"})
except Exception as e:
logger.error(f"Error getting queue size: {e}")
return 0
async def test_data_source_connection(source_id: str):
"""Test connection to a data source (background task)"""
try:
db = await get_database()
source = await db.data_sources.find_one({"_id": ObjectId(source_id)})
if source:
monitor = await get_ftp_monitor()
success = await monitor.test_connection(source)
await db.data_sources.update_one(
{"_id": ObjectId(source_id)},
{"$set": {
"last_test": datetime.utcnow(),
"last_test_result": "success" if success else "failed"
}}
)
except Exception as e:
logger.error(f"Error testing connection for source {source_id}: {e}")
async def record_processing_success(source, file_info, record_count, quality_metrics):
"""Record successful processing"""
try:
db = await get_database()
# Update source stats
await db.data_sources.update_one(
{"_id": source["_id"]},
{"$set": {"last_success": datetime.utcnow()}}
)
# Update daily stats
today = datetime.utcnow().strftime("%Y-%m-%d")
await db.ingestion_stats.update_one(
{"date": today},
{
"$inc": {
"files_processed": 1,
"records_ingested": record_count,
"redis_published": len(source["redis_topics"])
},
"$set": {
"last_success": datetime.utcnow(),
"quality_score": quality_metrics.get("overall_score", 100.0)
}
},
upsert=True
)
except Exception as e:
logger.error(f"Error recording processing success: {e}")
async def record_processing_error(source, file_info, error_message):
"""Record processing error"""
try:
db = await get_database()
# Update daily stats
today = datetime.utcnow().strftime("%Y-%m-%d")
await db.ingestion_stats.update_one(
{"date": today},
{"$inc": {"errors": 1}},
upsert=True
)
# Log error
await db.processing_errors.insert_one({
"source_id": source["_id"],
"source_name": source["name"],
"file_info": file_info,
"error_message": error_message,
"timestamp": datetime.utcnow()
})
except Exception as e:
logger.error(f"Error recording processing error: {e}")
async def monitor_ftp_health():
"""Monitor FTP connection health"""
# Implementation for FTP health monitoring
pass
async def monitor_redis_health():
"""Monitor Redis publishing health"""
# Implementation for Redis health monitoring
pass
async def monitor_processing_performance():
"""Monitor processing performance metrics"""
# Implementation for performance monitoring
pass
if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host="0.0.0.0", port=8008, reload=True)