- Implement FTP monitoring and ingestion for SA4CPS .slg_v2 files
- Add robust data processor with multi-format and unit inference support
- Publish parsed data to Redis topics for real-time dashboard simulation
- Include validation, monitoring, and auto-configuration scripts
- Provide documentation and test scripts for SA4CPS integration
"""
|
|
Data Ingestion Service
|
|
Monitors FTP servers for new time series data from real communities and publishes to Redis.
|
|
Provides realistic data feeds for simulation and analytics.
|
|
Port: 8008
|
|
"""
|
|
|
|
import asyncio
|
|
from datetime import datetime, timedelta
|
|
from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from contextlib import asynccontextmanager
|
|
import logging
|
|
from typing import List, Optional, Dict, Any
|
|
import json
|
|
from bson import ObjectId
|
|
|
|
from .models import (
|
|
DataSourceCreate, DataSourceUpdate, DataSourceResponse,
|
|
FileProcessingRequest, FileProcessingResponse, IngestionStats,
|
|
HealthStatus, QualityReport, TopicInfo, PublishingStats
|
|
)
|
|
from .database import db_manager, get_database, get_redis, DatabaseService
|
|
from .ftp_monitor import FTPMonitor
|
|
from .data_processor import DataProcessor
|
|
from .redis_publisher import RedisPublisher
|
|
from .data_validator import DataValidator
|
|
from .monitoring import ServiceMonitor, PerformanceMonitor, ErrorHandler
|
|
|
|
# Configure logging
|
|
logging.basicConfig(level=logging.INFO)
|
|
logger = logging.getLogger(__name__)
|
|
|
|
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager"""
    logger.info("Data Ingestion Service starting up...")

    try:
        # Connect to databases
        await db_manager.connect()

        # Initialize core components
        await initialize_data_sources()
        await initialize_components()

        # Start background tasks
        asyncio.create_task(ftp_monitoring_task())
        asyncio.create_task(data_processing_task())
        asyncio.create_task(health_monitoring_task())
        asyncio.create_task(cleanup_task())

        logger.info("Data Ingestion Service startup complete")

        yield

    except Exception as e:
        logger.error(f"Error during startup: {e}")
        raise
    finally:
        logger.info("Data Ingestion Service shutting down...")
        await db_manager.disconnect()
        logger.info("Data Ingestion Service shutdown complete")

app = FastAPI(
    title="Data Ingestion Service",
    description="FTP monitoring and time series data ingestion for real community data simulation",
    version="1.0.0",
    lifespan=lifespan
)

app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

# Global components
ftp_monitor = None
data_processor = None
redis_publisher = None
data_validator = None
service_monitor = None

# Dependencies
async def get_db():
    return await get_database()

async def get_ftp_monitor():
    global ftp_monitor
    if not ftp_monitor:
        db = await get_database()
        redis = await get_redis()
        ftp_monitor = FTPMonitor(db, redis)
    return ftp_monitor

async def get_data_processor():
    global data_processor
    if not data_processor:
        db = await get_database()
        redis = await get_redis()
        data_processor = DataProcessor(db, redis)
    return data_processor

async def get_redis_publisher():
    global redis_publisher
    if not redis_publisher:
        redis = await get_redis()
        redis_publisher = RedisPublisher(redis)
    return redis_publisher

async def get_data_validator():
    global data_validator
    if not data_validator:
        db = await get_database()
        redis = await get_redis()
        data_validator = DataValidator(db, redis)
    return data_validator

@app.get("/health", response_model=HealthStatus)
|
|
async def health_check():
|
|
"""Health check endpoint"""
|
|
try:
|
|
# Get database health
|
|
health_data = await db_manager.health_check()
|
|
|
|
# Get FTP connections status
|
|
ftp_status = await check_ftp_connections()
|
|
|
|
# Calculate uptime
|
|
app_start_time = getattr(app.state, 'start_time', datetime.utcnow())
|
|
uptime = (datetime.utcnow() - app_start_time).total_seconds()
|
|
|
|
# Get processing stats
|
|
processing_stats = await get_processing_queue_size()
|
|
|
|
overall_status = "healthy"
|
|
if not health_data["mongodb"] or not health_data["redis"]:
|
|
overall_status = "degraded"
|
|
elif ftp_status["healthy_connections"] == 0 and ftp_status["total_connections"] > 0:
|
|
overall_status = "degraded"
|
|
|
|
return HealthStatus(
|
|
status=overall_status,
|
|
timestamp=datetime.utcnow(),
|
|
uptime_seconds=uptime,
|
|
active_sources=ftp_status["healthy_connections"],
|
|
total_processed_files=processing_stats.get("total_processed", 0),
|
|
redis_connected=health_data["redis"],
|
|
mongodb_connected=health_data["mongodb"],
|
|
last_error=None
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Health check failed: {e}")
|
|
return HealthStatus(
|
|
status="unhealthy",
|
|
timestamp=datetime.utcnow(),
|
|
uptime_seconds=0,
|
|
active_sources=0,
|
|
total_processed_files=0,
|
|
redis_connected=False,
|
|
mongodb_connected=False,
|
|
last_error=str(e)
|
|
)
|
|
|
|
@app.get("/stats", response_model=IngestionStats)
|
|
async def get_ingestion_stats():
|
|
"""Get data ingestion statistics"""
|
|
try:
|
|
db = await get_database()
|
|
|
|
# Get statistics from database
|
|
stats_data = await db.ingestion_stats.find_one(
|
|
{"date": datetime.utcnow().strftime("%Y-%m-%d")}
|
|
) or {}
|
|
|
|
return IngestionStats(
|
|
files_processed_today=stats_data.get("files_processed", 0),
|
|
records_ingested_today=stats_data.get("records_ingested", 0),
|
|
errors_today=stats_data.get("errors", 0),
|
|
data_sources_active=stats_data.get("active_sources", 0),
|
|
average_processing_time_ms=stats_data.get("avg_processing_time", 0),
|
|
last_successful_ingestion=stats_data.get("last_success"),
|
|
redis_messages_published=stats_data.get("redis_published", 0),
|
|
data_quality_score=stats_data.get("quality_score", 100.0)
|
|
)
|
|
except Exception as e:
|
|
logger.error(f"Error getting ingestion stats: {e}")
|
|
raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
@app.get("/sources")
|
|
async def get_data_sources():
|
|
"""Get configured data sources"""
|
|
try:
|
|
db = await get_database()
|
|
cursor = db.data_sources.find({})
|
|
sources = []
|
|
|
|
async for source in cursor:
|
|
source["_id"] = str(source["_id"])
|
|
# Convert datetime fields
|
|
for field in ["created_at", "updated_at", "last_check", "last_success"]:
|
|
if field in source and source[field]:
|
|
source[field] = source[field].isoformat()
|
|
sources.append(source)
|
|
|
|
return {
|
|
"sources": sources,
|
|
"count": len(sources)
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error getting data sources: {e}")
|
|
raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
@app.post("/sources")
|
|
async def create_data_source(
|
|
source_config: DataSourceCreate,
|
|
background_tasks: BackgroundTasks
|
|
):
|
|
"""Create a new data source"""
|
|
try:
|
|
db = await get_database()
|
|
|
|
# Create source document
|
|
source_doc = {
|
|
"name": source_config.name,
|
|
"description": source_config.description,
|
|
"source_type": source_config.source_type,
|
|
"ftp_config": source_config.ftp_config.dict() if source_config.ftp_config else None,
|
|
"file_patterns": source_config.file_patterns,
|
|
"data_format": source_config.data_format.value,
|
|
"topics": [topic.dict() for topic in source_config.topics],
|
|
"redis_topics": [topic.topic_name for topic in source_config.topics],
|
|
"enabled": source_config.enabled,
|
|
"check_interval_seconds": source_config.polling_interval_minutes * 60,
|
|
"max_file_size_mb": source_config.max_file_size_mb,
|
|
"created_at": datetime.utcnow(),
|
|
"updated_at": datetime.utcnow(),
|
|
"status": "created"
|
|
}
|
|
|
|
result = await db.data_sources.insert_one(source_doc)
|
|
|
|
# Test connection in background
|
|
background_tasks.add_task(test_data_source_connection, str(result.inserted_id))
|
|
|
|
return {
|
|
"message": "Data source created successfully",
|
|
"source_id": str(result.inserted_id),
|
|
"name": source_config.name
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error creating data source: {e}")
|
|
raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
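# Example request body for POST /sources (illustrative only; field names are inferred
# from the DataSourceCreate usage above and from the default source further below, so
# exact schema details such as additional topic attributes are assumptions):
#
#   {
#       "name": "SA4CPS Community Feed",
#       "description": "FTP drop of .slg_v2 logger files",
#       "source_type": "ftp",
#       "ftp_config": {
#           "host": "ftp.example.com",
#           "port": 21,
#           "username": "energy_data",
#           "password": "********",
#           "remote_path": "/energy_data",
#           "use_ssl": false
#       },
#       "file_patterns": ["*.slg_v2", "*.csv"],
#       "data_format": "csv",
#       "topics": [{"topic_name": "energy_data"}],
#       "enabled": true,
#       "polling_interval_minutes": 5,
#       "max_file_size_mb": 50
#   }
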
@app.put("/sources/{source_id}")
|
|
async def update_data_source(
|
|
source_id: str,
|
|
source_config: DataSourceUpdate
|
|
):
|
|
"""Update an existing data source"""
|
|
try:
|
|
db = await get_database()
|
|
|
|
update_doc = {}
|
|
if source_config.name is not None:
|
|
update_doc["name"] = source_config.name
|
|
if source_config.description is not None:
|
|
update_doc["description"] = source_config.description
|
|
if source_config.ftp_config is not None:
|
|
update_doc["ftp_config"] = source_config.ftp_config.dict()
|
|
if source_config.file_patterns is not None:
|
|
update_doc["file_patterns"] = source_config.file_patterns
|
|
if source_config.data_format is not None:
|
|
update_doc["data_format"] = source_config.data_format.value
|
|
if source_config.topics is not None:
|
|
update_doc["topics"] = [topic.dict() for topic in source_config.topics]
|
|
update_doc["redis_topics"] = [topic.topic_name for topic in source_config.topics]
|
|
if source_config.enabled is not None:
|
|
update_doc["enabled"] = source_config.enabled
|
|
if source_config.polling_interval_minutes is not None:
|
|
update_doc["check_interval_seconds"] = source_config.polling_interval_minutes * 60
|
|
if source_config.max_file_size_mb is not None:
|
|
update_doc["max_file_size_mb"] = source_config.max_file_size_mb
|
|
|
|
update_doc["updated_at"] = datetime.utcnow()
|
|
|
|
result = await db.data_sources.update_one(
|
|
{"_id": ObjectId(source_id)},
|
|
{"$set": update_doc}
|
|
)
|
|
|
|
if result.matched_count == 0:
|
|
raise HTTPException(status_code=404, detail="Data source not found")
|
|
|
|
return {
|
|
"message": "Data source updated successfully",
|
|
"source_id": source_id
|
|
}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error updating data source: {e}")
|
|
raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
@app.delete("/sources/{source_id}")
async def delete_data_source(source_id: str):
    """Delete a data source"""
    try:
        db = await get_database()

        result = await db.data_sources.delete_one({"_id": ObjectId(source_id)})

        if result.deleted_count == 0:
            raise HTTPException(status_code=404, detail="Data source not found")

        return {
            "message": "Data source deleted successfully",
            "source_id": source_id
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error deleting data source: {e}")
        raise HTTPException(status_code=500, detail="Internal server error")

@app.post("/sources/{source_id}/test")
|
|
async def test_data_source(source_id: str):
|
|
"""Test connection to a data source"""
|
|
try:
|
|
db = await get_database()
|
|
source = await db.data_sources.find_one({"_id": ObjectId(source_id)})
|
|
|
|
if not source:
|
|
raise HTTPException(status_code=404, detail="Data source not found")
|
|
|
|
monitor = await get_ftp_monitor()
|
|
test_result = await monitor.test_connection(source)
|
|
|
|
return {
|
|
"source_id": source_id,
|
|
"connection_test": test_result,
|
|
"tested_at": datetime.utcnow().isoformat()
|
|
}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error testing data source: {e}")
|
|
raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
@app.post("/sources/{source_id}/trigger")
|
|
async def trigger_manual_check(
|
|
source_id: str,
|
|
background_tasks: BackgroundTasks
|
|
):
|
|
"""Manually trigger a check for new data"""
|
|
try:
|
|
db = await get_database()
|
|
source = await db.data_sources.find_one({"_id": ObjectId(source_id)})
|
|
|
|
if not source:
|
|
raise HTTPException(status_code=404, detail="Data source not found")
|
|
|
|
# Trigger check in background
|
|
background_tasks.add_task(process_data_source, source)
|
|
|
|
return {
|
|
"message": "Manual check triggered",
|
|
"source_id": source_id,
|
|
"triggered_at": datetime.utcnow().isoformat()
|
|
}
|
|
except HTTPException:
|
|
raise
|
|
except Exception as e:
|
|
logger.error(f"Error triggering manual check: {e}")
|
|
raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
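# Example manual invocations against a locally running instance (the service listens on
# port 8008 per the module docstring; <source_id> is the ObjectId string returned by
# POST /sources):
#
#   curl -X POST http://localhost:8008/sources/<source_id>/test
#   curl -X POST http://localhost:8008/sources/<source_id>/trigger
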
@app.get("/processing/status")
|
|
async def get_processing_status():
|
|
"""Get current processing status"""
|
|
try:
|
|
db = await get_database()
|
|
|
|
# Get recent processing jobs
|
|
cursor = db.processing_jobs.find().sort("started_at", -1).limit(20)
|
|
jobs = []
|
|
|
|
async for job in cursor:
|
|
job["_id"] = str(job["_id"])
|
|
for field in ["started_at", "completed_at", "created_at"]:
|
|
if field in job and job[field]:
|
|
job[field] = job[field].isoformat()
|
|
jobs.append(job)
|
|
|
|
# Get queue size
|
|
queue_size = await get_processing_queue_size()
|
|
|
|
return {
|
|
"processing_jobs": jobs,
|
|
"queue_size": queue_size,
|
|
"last_updated": datetime.utcnow().isoformat()
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error getting processing status: {e}")
|
|
raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
@app.get("/data-quality")
|
|
async def get_data_quality_metrics():
|
|
"""Get data quality metrics"""
|
|
try:
|
|
db = await get_database()
|
|
|
|
# Get recent quality metrics
|
|
cursor = db.data_quality_metrics.find().sort("timestamp", -1).limit(10)
|
|
metrics = []
|
|
|
|
async for metric in cursor:
|
|
metric["_id"] = str(metric["_id"])
|
|
if "timestamp" in metric:
|
|
metric["timestamp"] = metric["timestamp"].isoformat()
|
|
metrics.append(metric)
|
|
|
|
return {
|
|
"quality_metrics": metrics,
|
|
"count": len(metrics)
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error getting data quality metrics: {e}")
|
|
raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
@app.get("/redis/topics")
|
|
async def get_redis_topics():
|
|
"""Get active Redis topics"""
|
|
try:
|
|
redis = await get_redis()
|
|
publisher = await get_redis_publisher()
|
|
|
|
topics_info = await publisher.get_topics_info()
|
|
|
|
return {
|
|
"active_topics": topics_info,
|
|
"timestamp": datetime.utcnow().isoformat()
|
|
}
|
|
except Exception as e:
|
|
logger.error(f"Error getting Redis topics: {e}")
|
|
raise HTTPException(status_code=500, detail="Internal server error")
|
|
|
|
# Background task functions
async def initialize_data_sources():
    """Initialize data sources from database"""
    try:
        db = await get_database()

        # Create default data source if none exist
        count = await db.data_sources.count_documents({})
        if count == 0:
            default_source = {
                "name": "Community Energy Data",
                "source_type": "ftp",
                "ftp_config": {
                    "host": "ftp.example.com",
                    "port": 21,
                    "username": "energy_data",
                    "password": "password",
                    "remote_path": "/energy_data",
                    "use_ssl": False
                },
                "file_patterns": ["*.csv", "*.json", "energy_*.txt"],
                "data_format": "csv",
                "redis_topics": ["energy_data", "community_consumption", "real_time_metrics"],
                "enabled": False,  # Disabled by default until configured
                "check_interval_seconds": 300,
                "created_at": datetime.utcnow(),
                "updated_at": datetime.utcnow(),
                "status": "configured"
            }

            await db.data_sources.insert_one(default_source)
            logger.info("Created default data source configuration")

    except Exception as e:
        logger.error(f"Error initializing data sources: {e}")

async def initialize_components():
    """Initialize core service components"""
    try:
        # Initialize global components
        global ftp_monitor, data_processor, redis_publisher, data_validator, service_monitor

        db = await get_database()
        redis = await get_redis()

        # Initialize monitoring first
        service_monitor = ServiceMonitor(db, redis)
        await service_monitor.start_monitoring()

        # Initialize FTP monitor
        ftp_monitor = FTPMonitor(db, redis)

        # Initialize data processor
        data_processor = DataProcessor(db, redis)
        await data_processor.initialize()

        # Initialize Redis publisher
        redis_publisher = RedisPublisher(redis)
        await redis_publisher.initialize()

        # Initialize data validator
        data_validator = DataValidator(db, redis)
        await data_validator.initialize()

        # Store app start time for uptime calculation
        app.state.start_time = datetime.utcnow()

        logger.info("Core components initialized successfully")

    except Exception as e:
        logger.error(f"Error initializing components: {e}")
        if service_monitor:
            await service_monitor.error_handler.log_error(e, {"task": "component_initialization"})
        raise

async def ftp_monitoring_task():
    """Main FTP monitoring background task"""
    logger.info("Starting FTP monitoring task")

    while True:
        try:
            db = await get_database()

            # Get all enabled data sources
            cursor = db.data_sources.find({"enabled": True})

            async for source in cursor:
                try:
                    # Check if it's time to check this source
                    last_check = source.get("last_check")
                    check_interval = source.get("check_interval_seconds", 300)

                    if (not last_check or
                            (datetime.utcnow() - last_check).total_seconds() >= check_interval):

                        # Process this data source
                        await process_data_source(source)

                        # Update last check time
                        await db.data_sources.update_one(
                            {"_id": source["_id"]},
                            {"$set": {"last_check": datetime.utcnow()}}
                        )

                except Exception as e:
                    logger.error(f"Error processing data source {source.get('name', 'unknown')}: {e}")

            # Sleep between monitoring cycles
            await asyncio.sleep(30)

        except Exception as e:
            logger.error(f"Error in FTP monitoring task: {e}")
            await asyncio.sleep(60)

async def process_data_source(source: Dict[str, Any]):
    """Process a single data source"""
    try:
        monitor = await get_ftp_monitor()
        processor = await get_data_processor()
        publisher = await get_redis_publisher()

        # Get new files from FTP
        new_files = await monitor.check_for_new_files(source)

        if new_files:
            logger.info(f"Found {len(new_files)} new files for source: {source['name']}")

            for file_info in new_files:
                try:
                    # Download and process file
                    file_data = await monitor.download_file(source, file_info)

                    # Process the time series data
                    processed_data = await processor.process_time_series_data(
                        file_data, source["data_format"]
                    )

                    # Validate data quality
                    validator = await get_data_validator()
                    quality_metrics = await validator.validate_time_series(processed_data)

                    # Publish to Redis topics
                    for topic in source["redis_topics"]:
                        await publisher.publish_time_series_data(
                            topic, processed_data, source["name"]
                        )

                    # Record processing success
                    await record_processing_success(source, file_info, len(processed_data), quality_metrics)

                except Exception as e:
                    logger.error(f"Error processing file {file_info.get('filename', 'unknown')}: {e}")
                    await record_processing_error(source, file_info, str(e))

    except Exception as e:
        logger.error(f"Error in process_data_source for {source.get('name', 'unknown')}: {e}")

async def data_processing_task():
    """Background task for data processing queue"""
    logger.info("Starting data processing task")

    # This task handles queued processing jobs
    while True:
        try:
            await asyncio.sleep(10)  # Check every 10 seconds
            # Implementation for processing queued jobs would go here

        except Exception as e:
            logger.error(f"Error in data processing task: {e}")
            await asyncio.sleep(30)

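# Illustrative sketch only (not part of the original implementation): draining the
# `processing_queue` collection could look roughly like this, assuming queue documents
# carry `status`, `source_id`, and `file_info` fields (those field names are assumptions).
#
#     async def drain_processing_queue(db, batch_size: int = 10):
#         cursor = db.processing_queue.find({"status": "pending"}).limit(batch_size)
#         async for job in cursor:
#             source = await db.data_sources.find_one({"_id": job["source_id"]})
#             if source:
#                 await process_data_source(source)
#             await db.processing_queue.update_one(
#                 {"_id": job["_id"]},
#                 {"$set": {"status": "done", "completed_at": datetime.utcnow()}}
#             )
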
async def health_monitoring_task():
    """Background task for monitoring system health"""
    logger.info("Starting health monitoring task")

    while True:
        try:
            # Monitor FTP connections
            await monitor_ftp_health()

            # Monitor Redis publishing
            await monitor_redis_health()

            # Monitor processing performance
            await monitor_processing_performance()

            await asyncio.sleep(60)  # Check every minute

        except Exception as e:
            logger.error(f"Error in health monitoring task: {e}")
            await asyncio.sleep(120)

async def cleanup_task():
    """Background task for cleaning up old data"""
    logger.info("Starting cleanup task")

    while True:
        try:
            db = await get_database()

            # Clean up old processing jobs (keep last 1000)
            old_jobs = db.processing_jobs.find().sort("created_at", -1).skip(1000)
            async for job in old_jobs:
                await db.processing_jobs.delete_one({"_id": job["_id"]})

            # Clean up old quality metrics (keep last 30 days)
            cutoff_date = datetime.utcnow() - timedelta(days=30)
            await db.data_quality_metrics.delete_many({"timestamp": {"$lt": cutoff_date}})

            # Clean up old ingestion stats (keep last 90 days)
            cutoff_date = datetime.utcnow() - timedelta(days=90)
            await db.ingestion_stats.delete_many({"date": {"$lt": cutoff_date.strftime("%Y-%m-%d")}})

            await asyncio.sleep(3600)  # Run every hour

        except Exception as e:
            logger.error(f"Error in cleanup task: {e}")
            await asyncio.sleep(7200)

# Helper functions
async def check_ftp_connections() -> Dict[str, int]:
    """Check health of FTP connections"""
    try:
        db = await get_database()
        sources = await db.data_sources.find({"enabled": True}).to_list(None)

        total = len(sources)
        healthy = 0

        monitor = await get_ftp_monitor()
        for source in sources:
            try:
                if await monitor.test_connection(source):
                    healthy += 1
            except Exception:
                pass

        return {"total_connections": total, "healthy_connections": healthy}
    except Exception as e:
        logger.error(f"Error checking FTP connections: {e}")
        return {"total_connections": 0, "healthy_connections": 0}

async def get_processing_queue_size() -> int:
    """Get size of processing queue"""
    try:
        db = await get_database()
        return await db.processing_queue.count_documents({"status": "pending"})
    except Exception as e:
        logger.error(f"Error getting queue size: {e}")
        return 0

async def test_data_source_connection(source_id: str):
    """Test connection to a data source (background task)"""
    try:
        db = await get_database()
        source = await db.data_sources.find_one({"_id": ObjectId(source_id)})

        if source:
            monitor = await get_ftp_monitor()
            success = await monitor.test_connection(source)

            await db.data_sources.update_one(
                {"_id": ObjectId(source_id)},
                {"$set": {
                    "last_test": datetime.utcnow(),
                    "last_test_result": "success" if success else "failed"
                }}
            )
    except Exception as e:
        logger.error(f"Error testing connection for source {source_id}: {e}")

async def record_processing_success(source, file_info, record_count, quality_metrics):
    """Record successful processing"""
    try:
        db = await get_database()

        # Update source stats
        await db.data_sources.update_one(
            {"_id": source["_id"]},
            {"$set": {"last_success": datetime.utcnow()}}
        )

        # Update daily stats
        today = datetime.utcnow().strftime("%Y-%m-%d")
        await db.ingestion_stats.update_one(
            {"date": today},
            {
                "$inc": {
                    "files_processed": 1,
                    "records_ingested": record_count,
                    "redis_published": len(source["redis_topics"])
                },
                "$set": {
                    "last_success": datetime.utcnow(),
                    "quality_score": quality_metrics.get("overall_score", 100.0)
                }
            },
            upsert=True
        )

    except Exception as e:
        logger.error(f"Error recording processing success: {e}")

async def record_processing_error(source, file_info, error_message):
    """Record processing error"""
    try:
        db = await get_database()

        # Update daily stats
        today = datetime.utcnow().strftime("%Y-%m-%d")
        await db.ingestion_stats.update_one(
            {"date": today},
            {"$inc": {"errors": 1}},
            upsert=True
        )

        # Log error
        await db.processing_errors.insert_one({
            "source_id": source["_id"],
            "source_name": source["name"],
            "file_info": file_info,
            "error_message": error_message,
            "timestamp": datetime.utcnow()
        })

    except Exception as e:
        logger.error(f"Error recording processing error: {e}")

async def monitor_ftp_health():
    """Monitor FTP connection health"""
    # Implementation for FTP health monitoring
    pass

async def monitor_redis_health():
    """Monitor Redis publishing health"""
    # Implementation for Redis health monitoring
    pass

async def monitor_processing_performance():
    """Monitor processing performance metrics"""
    # Implementation for performance monitoring
    pass

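# Illustrative sketch only (not part of the original implementation): one way to fill in
# monitor_ftp_health() using helpers already defined in this module. How ServiceMonitor
# records results is unknown here, so the sketch writes to a hypothetical `health_checks`
# collection instead of assuming a ServiceMonitor API.
#
#     async def monitor_ftp_health():
#         db = await get_database()
#         status = await check_ftp_connections()
#         await db.health_checks.insert_one({
#             "component": "ftp",
#             "healthy_connections": status["healthy_connections"],
#             "total_connections": status["total_connections"],
#             "timestamp": datetime.utcnow(),
#         })
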
if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8008)
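
# Illustrative consumer sketch (not part of this service): how a dashboard process might
# read the published data, assuming RedisPublisher pushes JSON messages onto plain Redis
# pub/sub channels named after the configured topics (e.g. "energy_data"). If the
# publisher uses Redis Streams instead, the consumer would use XREAD rather than SUBSCRIBE.
#
#     import asyncio, json
#     import redis.asyncio as redis
#
#     async def consume(topic: str = "energy_data"):
#         client = redis.Redis(host="localhost", port=6379)
#         pubsub = client.pubsub()
#         await pubsub.subscribe(topic)
#         async for message in pubsub.listen():
#             if message["type"] == "message":
#                 print(json.loads(message["data"]))
#
#     # asyncio.run(consume())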