Simplify data ingestion service
microservices/data-ingestion-service/src/database.py (new file, 433 lines)
@@ -0,0 +1,433 @@
"""
|
||||
Database configuration and connection management for the data ingestion service.
|
||||
Handles MongoDB connections, index creation, and Redis connections.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from typing import Optional
|
||||
from contextlib import asynccontextmanager
|
||||
import os
|
||||
from datetime import datetime
|
||||
|
||||
import motor.motor_asyncio
|
||||
import redis.asyncio as redis
|
||||
from pymongo import IndexModel
|
||||
|
||||
from .models import (
|
||||
DataSourceSchema, ProcessedFileSchema, QualityReportSchema,
|
||||
IngestionStatsSchema, ErrorLogSchema, MonitoringAlertSchema
|
||||
)
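
# Note: each schema class imported above is expected to expose a
# `collection_name` attribute and a `get_indexes()` method returning a list
# of index specs; `_create_indexes` below relies on that contract.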

logger = logging.getLogger(__name__)


class DatabaseManager:
    """Manages database connections and operations"""

    def __init__(self, mongodb_url: Optional[str] = None, redis_url: Optional[str] = None):
        self.mongodb_url = mongodb_url or os.getenv("MONGODB_URL", "mongodb://localhost:27017")
        self.redis_url = redis_url or os.getenv("REDIS_URL", "redis://localhost:6379")

        self.mongodb_client: Optional[motor.motor_asyncio.AsyncIOMotorClient] = None
        self.db: Optional[motor.motor_asyncio.AsyncIOMotorDatabase] = None
        self.redis_client: Optional[redis.Redis] = None

        self._connection_status = {
            "mongodb": False,
            "redis": False,
            "last_check": None
        }

    async def connect(self):
        """Establish connections to MongoDB and Redis"""
        try:
            await self._connect_mongodb()
            await self._connect_redis()
            await self._create_indexes()

            logger.info("Database connections established successfully")

        except Exception as e:
            logger.error(f"Error establishing database connections: {e}")
            raise

    async def _connect_mongodb(self):
        """Connect to MongoDB"""
        try:
            # Parse database name from URL or use default
            db_name = "energy_dashboard"
            if self.mongodb_url.count("/") > 2:
                # Take the path segment and drop any query string
                # (e.g. "?authSource=admin").
                db_name = self.mongodb_url.split("/")[-1].split("?")[0] or "energy_dashboard"

            self.mongodb_client = motor.motor_asyncio.AsyncIOMotorClient(
                self.mongodb_url,
                serverSelectionTimeoutMS=5000,
                connectTimeoutMS=5000,
                maxPoolSize=50,
                minPoolSize=10
            )

            self.db = self.mongodb_client[db_name]

            # Test connection
            await self.mongodb_client.admin.command('ping')

            self._connection_status["mongodb"] = True
            logger.info(f"Connected to MongoDB: {self.mongodb_url}")

        except Exception as e:
            self._connection_status["mongodb"] = False
            logger.error(f"MongoDB connection failed: {e}")
            raise

    async def _connect_redis(self):
        """Connect to Redis"""
        try:
            self.redis_client = redis.from_url(
                self.redis_url,
                encoding="utf-8",
                decode_responses=True,
                socket_timeout=5,
                socket_connect_timeout=5,
                health_check_interval=30
            )

            # Test connection
            await self.redis_client.ping()

            self._connection_status["redis"] = True
            logger.info(f"Connected to Redis: {self.redis_url}")

        except Exception as e:
            self._connection_status["redis"] = False
            logger.error(f"Redis connection failed: {e}")
            raise

    async def _create_indexes(self):
        """Create database indexes for optimal performance"""
        try:
            schemas = [
                DataSourceSchema,
                ProcessedFileSchema,
                QualityReportSchema,
                IngestionStatsSchema,
                ErrorLogSchema,
                MonitoringAlertSchema
            ]

            for schema in schemas:
                collection = self.db[schema.collection_name]
                indexes = schema.get_indexes()
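
                # Each index spec is assumed to be a dict of the form
                # {"keys": [("field", 1)], "unique": True, ...}: "keys" feeds
                # IndexModel, and the remaining entries pass through as index
                # options.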
                if indexes:
                    index_models = []
                    for index_spec in indexes:
                        keys = index_spec["keys"]
                        options = {k: v for k, v in index_spec.items() if k != "keys"}
                        index_models.append(IndexModel(keys, **options))

                    await collection.create_indexes(index_models)
                    logger.debug(f"Created {len(index_models)} indexes for {schema.collection_name}")

            logger.info("Database indexes created successfully")

        except Exception as e:
            logger.error(f"Error creating database indexes: {e}")
            # Don't raise here - indexes are a performance optimization, not critical

    async def disconnect(self):
        """Close all database connections"""
        try:
            if self.redis_client:
                await self.redis_client.aclose()
                self._connection_status["redis"] = False

            if self.mongodb_client:
                self.mongodb_client.close()
                self._connection_status["mongodb"] = False

            logger.info("Database connections closed")

        except Exception as e:
            logger.error(f"Error closing database connections: {e}")

    async def health_check(self) -> dict:
        """Check health of database connections"""
        health = {
            "mongodb": False,
            "redis": False,
            "timestamp": datetime.utcnow().isoformat(),
            "details": {}
        }

        # Check MongoDB
        try:
            if self.mongodb_client:
                start_time = asyncio.get_event_loop().time()
                await self.mongodb_client.admin.command('ping')
                response_time = (asyncio.get_event_loop().time() - start_time) * 1000

                health["mongodb"] = True
                health["details"]["mongodb"] = {
                    "status": "healthy",
                    "response_time_ms": round(response_time, 2),
                    "server_info": await self.mongodb_client.server_info()
                }

        except Exception as e:
            health["details"]["mongodb"] = {
                "status": "unhealthy",
                "error": str(e)
            }

        # Check Redis
        try:
            if self.redis_client:
                start_time = asyncio.get_event_loop().time()
                await self.redis_client.ping()
                response_time = (asyncio.get_event_loop().time() - start_time) * 1000

                redis_info = await self.redis_client.info()

                health["redis"] = True
                health["details"]["redis"] = {
                    "status": "healthy",
                    "response_time_ms": round(response_time, 2),
                    "version": redis_info.get("redis_version"),
                    "connected_clients": redis_info.get("connected_clients"),
                    "used_memory_human": redis_info.get("used_memory_human")
                }

        except Exception as e:
            health["details"]["redis"] = {
                "status": "unhealthy",
                "error": str(e)
            }

        # Update connection status
        self._connection_status.update({
            "mongodb": health["mongodb"],
            "redis": health["redis"],
            "last_check": datetime.utcnow()
        })

        return health

    @property
    def is_connected(self) -> bool:
        """Check if all required connections are established"""
        return self._connection_status["mongodb"] and self._connection_status["redis"]

    @property
    def data_sources(self):
        """Data sources collection"""
        return self.db[DataSourceSchema.collection_name]

    @property
    def processed_files(self):
        """Processed files collection"""
        return self.db[ProcessedFileSchema.collection_name]

    @property
    def quality_reports(self):
        """Quality reports collection"""
        return self.db[QualityReportSchema.collection_name]

    @property
    def ingestion_stats(self):
        """Ingestion statistics collection"""
        return self.db[IngestionStatsSchema.collection_name]

    @property
    def error_logs(self):
        """Error logs collection"""
        return self.db[ErrorLogSchema.collection_name]

    @property
    def monitoring_alerts(self):
        """Monitoring alerts collection"""
        return self.db[MonitoringAlertSchema.collection_name]


# Global database manager instance
db_manager = DatabaseManager()


async def get_database():
    """Dependency function to get database instance"""
    if not db_manager.is_connected:
        await db_manager.connect()
    return db_manager.db


async def get_redis():
    """Dependency function to get Redis client"""
    if not db_manager.is_connected:
        await db_manager.connect()
    return db_manager.redis_client
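
# Illustrative wiring for a FastAPI app (a sketch; nothing in this module
# itself depends on FastAPI):
#
#     from fastapi import Depends, FastAPI
#
#     app = FastAPI()
#
#     @app.get("/health")
#     async def health():
#         return await db_manager.health_check()
#
#     @app.get("/sources")
#     async def sources(db=Depends(get_database)):
#         return [s async for s in db.data_sources.find({}, {"_id": 0})]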


@asynccontextmanager
async def get_db_session():
    """Context manager for database operations"""
    try:
        if not db_manager.is_connected:
            await db_manager.connect()
        yield db_manager.db
    except Exception as e:
        logger.error(f"Database session error: {e}")
        raise
    finally:
        # Connection pooling handles cleanup automatically
        pass


@asynccontextmanager
async def get_redis_session():
    """Context manager for Redis operations"""
    try:
        if not db_manager.is_connected:
            await db_manager.connect()
        yield db_manager.redis_client
    except Exception as e:
        logger.error(f"Redis session error: {e}")
        raise
    finally:
        # Connection pooling handles cleanup automatically
        pass
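
# Example usage of the session context managers (illustrative):
#
#     async with get_db_session() as db:
#         doc = await db.data_sources.find_one({"enabled": True})
#
#     async with get_redis_session() as r:
#         await r.set("ingestion:last_run", datetime.utcnow().isoformat())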


class DatabaseService:
    """High-level database service with common operations"""

    def __init__(self, db, redis_client):
        self.db = db
        self.redis = redis_client

    async def create_data_source(self, source_data: dict) -> str:
        """Create a new data source"""
        try:
            source_data["created_at"] = datetime.utcnow()
            source_data["updated_at"] = datetime.utcnow()
            source_data["status"] = "active"
            source_data["error_count"] = 0
            source_data["total_files_processed"] = 0

            result = await self.db.data_sources.insert_one(source_data)
            return str(result.inserted_id)

        except Exception as e:
            logger.error(f"Error creating data source: {e}")
            raise

    async def get_data_source(self, source_id: str) -> Optional[dict]:
        """Get data source by ID"""
        try:
            from bson import ObjectId
            source = await self.db.data_sources.find_one({"_id": ObjectId(source_id)})
            if source:
                source["_id"] = str(source["_id"])
            return source

        except Exception as e:
            logger.error(f"Error getting data source: {e}")
            return None

    async def update_data_source(self, source_id: str, update_data: dict) -> bool:
        """Update data source"""
        try:
            from bson import ObjectId
            update_data["updated_at"] = datetime.utcnow()

            result = await self.db.data_sources.update_one(
                {"_id": ObjectId(source_id)},
                {"$set": update_data}
            )

            return result.modified_count > 0

        except Exception as e:
            logger.error(f"Error updating data source: {e}")
            return False

    async def list_data_sources(self, enabled_only: bool = False) -> list:
        """List all data sources"""
        try:
            query = {"enabled": True} if enabled_only else {}
            cursor = self.db.data_sources.find(query).sort("created_at", -1)

            sources = []
            async for source in cursor:
                source["_id"] = str(source["_id"])
                sources.append(source)

            return sources

        except Exception as e:
            logger.error(f"Error listing data sources: {e}")
            return []

    async def log_error(self, error_data: dict):
        """Log an error to the database"""
        try:
            error_data["timestamp"] = datetime.utcnow()
            await self.db.error_logs.insert_one(error_data)

        except Exception as e:
            logger.error(f"Failed to write error log entry: {e}")

    async def update_ingestion_stats(self, stats_data: dict):
        """Update daily ingestion statistics"""
        try:
            today = datetime.utcnow().strftime("%Y-%m-%d")
            stats_data["date"] = today
            stats_data["timestamp"] = datetime.utcnow()

            await self.db.ingestion_stats.update_one(
                {"date": today},
                {"$set": stats_data},
                upsert=True
            )

        except Exception as e:
            logger.error(f"Error updating ingestion stats: {e}")

    async def get_latest_stats(self) -> Optional[dict]:
        """Get latest ingestion statistics"""
        try:
            stats = await self.db.ingestion_stats.find_one(
                sort=[("timestamp", -1)]
            )
            if stats:
                stats["_id"] = str(stats["_id"])
            return stats

        except Exception as e:
            logger.error(f"Error getting latest stats: {e}")
            return None

    async def cleanup_old_data(self, days: int = 30):
        """Clean up old data based on retention policy"""
        try:
            cutoff_date = datetime.utcnow() - timedelta(days=days)

            # Clean up old processed files records
            result1 = await self.db.processed_files.delete_many({
                "processed_at": {"$lt": cutoff_date}
            })

            # Clean up old error logs
            result2 = await self.db.error_logs.delete_many({
                "timestamp": {"$lt": cutoff_date}
            })

            # Clean up old quality reports (assumes processing_time stores a datetime)
            result3 = await self.db.quality_reports.delete_many({
                "processing_time": {"$lt": cutoff_date}
            })

            logger.info(f"Cleaned up old data: {result1.deleted_count} processed files, "
                        f"{result2.deleted_count} error logs, {result3.deleted_count} quality reports")

        except Exception as e:
            logger.error(f"Error cleaning up old data: {e}")


# Export the database manager and service for use in other modules
__all__ = [
    'DatabaseManager', 'DatabaseService', 'db_manager',
    'get_database', 'get_redis', 'get_db_session', 'get_redis_session'
]
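

if __name__ == "__main__":
    # Minimal connectivity smoke test (illustrative; assumes MongoDB and
    # Redis are reachable at the configured URLs).
    async def _smoke_test():
        await db_manager.connect()
        print(await db_manager.health_check())
        await db_manager.disconnect()

    asyncio.run(_smoke_test())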