From b7e734e0d29f17ad752654d2c401ec457cc01edd Mon Sep 17 00:00:00 2001 From: rafaeldpsilva Date: Wed, 10 Sep 2025 15:47:10 +0100 Subject: [PATCH] Simplify data ingestion service --- .../data-ingestion-service/Dockerfile | 3 +- .../data-ingestion-service/src/__init__.py | 1 - .../src/data_validator.py | 710 --------------- .../data-ingestion-service/src/database.py | 572 ++++-------- .../data-ingestion-service/src/ftp_monitor.py | 588 ++++-------- .../data-ingestion-service/src/main.py | 848 +++--------------- .../data-ingestion-service/src/models.py | 386 -------- .../data-ingestion-service/src/monitoring.py | 545 ----------- .../src/redis_publisher.py | 484 ---------- .../src/simple_sa4cps_config.py | 177 ---- .../src/slg_v2_processor.py | 300 ------- .../tests/test_simple_processor.py | 103 --- .../tests/verify_setup.py | 197 ---- 13 files changed, 474 insertions(+), 4440 deletions(-) delete mode 100644 microservices/data-ingestion-service/src/__init__.py delete mode 100644 microservices/data-ingestion-service/src/data_validator.py delete mode 100644 microservices/data-ingestion-service/src/models.py delete mode 100644 microservices/data-ingestion-service/src/monitoring.py delete mode 100644 microservices/data-ingestion-service/src/redis_publisher.py delete mode 100644 microservices/data-ingestion-service/src/simple_sa4cps_config.py delete mode 100644 microservices/data-ingestion-service/src/slg_v2_processor.py delete mode 100644 microservices/data-ingestion-service/tests/test_simple_processor.py delete mode 100644 microservices/data-ingestion-service/tests/verify_setup.py diff --git a/microservices/data-ingestion-service/Dockerfile b/microservices/data-ingestion-service/Dockerfile index e784297..af25270 100644 --- a/microservices/data-ingestion-service/Dockerfile +++ b/microservices/data-ingestion-service/Dockerfile @@ -39,4 +39,5 @@ HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \ CMD curl -f http://localhost:8008/health || exit 1 # Start the application from src directory -CMD ["uvicorn", "src.main:app", "--host", "0.0.0.0", "--port", "8008", "--reload"] \ No newline at end of file +WORKDIR /app/src +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8008"] \ No newline at end of file diff --git a/microservices/data-ingestion-service/src/__init__.py b/microservices/data-ingestion-service/src/__init__.py deleted file mode 100644 index e3bb3e2..0000000 --- a/microservices/data-ingestion-service/src/__init__.py +++ /dev/null @@ -1 +0,0 @@ -# Source package initialization \ No newline at end of file diff --git a/microservices/data-ingestion-service/src/data_validator.py b/microservices/data-ingestion-service/src/data_validator.py deleted file mode 100644 index c2e31a8..0000000 --- a/microservices/data-ingestion-service/src/data_validator.py +++ /dev/null @@ -1,710 +0,0 @@ -""" -Data validation and enrichment for time series data. -Provides quality assessment, metadata enrichment, and data transformation capabilities. 
-""" - -import asyncio -import json -import logging -import statistics -from datetime import datetime, timedelta -from typing import List, Dict, Any, Optional, Tuple -import hashlib -import re -from collections import defaultdict -import math - -logger = logging.getLogger(__name__) - -class DataValidator: - """Validates, enriches, and transforms time series data""" - - def __init__(self, db, redis_client): - self.db = db - self.redis = redis_client - self.validation_rules = {} - self.enrichment_cache = {} - self.quality_thresholds = { - "completeness": 0.8, - "accuracy": 0.9, - "consistency": 0.85, - "timeliness": 0.9 - } - - async def initialize(self): - """Initialize validator with default rules and configurations""" - try: - await self._load_validation_rules() - await self._load_enrichment_metadata() - logger.info("Data validator initialized successfully") - except Exception as e: - logger.error(f"Error initializing data validator: {e}") - raise - - async def validate_and_enrich_data(self, data: List[Dict[str, Any]], - source_name: str) -> Tuple[List[Dict[str, Any]], Dict[str, Any]]: - """Validate and enrich time series data, returning processed data and quality report""" - try: - logger.info(f"Validating and enriching {len(data)} records from {source_name}") - - # Initialize validation report - quality_report = { - "source": source_name, - "total_records": len(data), - "processed_records": 0, - "rejected_records": 0, - "quality_scores": {}, - "issues_found": [], - "processing_time": datetime.utcnow().isoformat() - } - - enriched_data = [] - - # Process each record - for i, record in enumerate(data): - try: - # Validate record - validation_result = await self._validate_record(record, source_name) - - if validation_result["is_valid"]: - # Enrich the record - enriched_record = await self._enrich_record(record, source_name, validation_result) - enriched_data.append(enriched_record) - quality_report["processed_records"] += 1 - else: - quality_report["rejected_records"] += 1 - quality_report["issues_found"].extend(validation_result["issues"]) - logger.warning(f"Record {i} rejected: {validation_result['issues']}") - - except Exception as e: - logger.error(f"Error processing record {i}: {e}") - quality_report["rejected_records"] += 1 - quality_report["issues_found"].append(f"Processing error: {str(e)}") - - # Calculate overall quality scores - quality_report["quality_scores"] = await self._calculate_quality_scores(enriched_data, quality_report) - - # Store quality report - await self._store_quality_report(quality_report, source_name) - - logger.info(f"Validation complete: {quality_report['processed_records']}/{quality_report['total_records']} records processed") - - return enriched_data, quality_report - - except Exception as e: - logger.error(f"Error in data validation and enrichment: {e}") - raise - - async def _validate_record(self, record: Dict[str, Any], source_name: str) -> Dict[str, Any]: - """Validate a single record against quality rules""" - validation_result = { - "is_valid": True, - "issues": [], - "quality_metrics": {} - } - - try: - # Check required fields - required_fields = ["sensor_id", "timestamp", "value"] - for field in required_fields: - if field not in record or record[field] is None: - validation_result["is_valid"] = False - validation_result["issues"].append(f"Missing required field: {field}") - - if not validation_result["is_valid"]: - return validation_result - - # Validate timestamp - timestamp_validation = await self._validate_timestamp(record["timestamp"]) - 
validation_result["quality_metrics"]["timestamp_quality"] = timestamp_validation["score"] - if not timestamp_validation["is_valid"]: - validation_result["issues"].extend(timestamp_validation["issues"]) - - # Validate numeric value - value_validation = await self._validate_numeric_value(record["value"], record.get("unit")) - validation_result["quality_metrics"]["value_quality"] = value_validation["score"] - if not value_validation["is_valid"]: - validation_result["issues"].extend(value_validation["issues"]) - - # Validate sensor ID format - sensor_validation = await self._validate_sensor_id(record["sensor_id"]) - validation_result["quality_metrics"]["sensor_id_quality"] = sensor_validation["score"] - if not sensor_validation["is_valid"]: - validation_result["issues"].extend(sensor_validation["issues"]) - - # Check for duplicates - duplicate_check = await self._check_for_duplicates(record, source_name) - validation_result["quality_metrics"]["uniqueness"] = duplicate_check["score"] - if not duplicate_check["is_unique"]: - validation_result["issues"].extend(duplicate_check["issues"]) - - # Calculate overall validity - if validation_result["issues"]: - # Allow minor issues but flag major ones - major_issues = [issue for issue in validation_result["issues"] - if "Missing required field" in issue or "Invalid" in issue] - validation_result["is_valid"] = len(major_issues) == 0 - - except Exception as e: - logger.error(f"Error validating record: {e}") - validation_result["is_valid"] = False - validation_result["issues"].append(f"Validation error: {str(e)}") - - return validation_result - - async def _enrich_record(self, record: Dict[str, Any], source_name: str, - validation_result: Dict[str, Any]) -> Dict[str, Any]: - """Enrich a record with additional metadata and derived fields""" - try: - enriched = record.copy() - - # Add validation metadata - enriched["data_quality"] = { - "quality_score": statistics.mean(validation_result["quality_metrics"].values()) if validation_result["quality_metrics"] else 0.0, - "quality_metrics": validation_result["quality_metrics"], - "validation_timestamp": datetime.utcnow().isoformat() - } - - # Add source information - enriched["source_info"] = { - "source_name": source_name, - "ingestion_time": datetime.utcnow().isoformat(), - "record_id": hashlib.md5(f"{source_name}_{record.get('sensor_id', 'unknown')}_{record.get('timestamp', 0)}".encode()).hexdigest() - } - - # Normalize timestamp format - enriched["timestamp"] = await self._normalize_timestamp(record["timestamp"]) - enriched["timestamp_iso"] = datetime.fromtimestamp(enriched["timestamp"]).isoformat() - - # Infer and enrich sensor type - sensor_type_info = await self._infer_sensor_type(record) - enriched["sensor_type"] = sensor_type_info["type"] - enriched["sensor_category"] = sensor_type_info["category"] - - # Add unit standardization - unit_info = await self._standardize_unit(record.get("unit")) - enriched["unit"] = unit_info["standard_unit"] - enriched["unit_info"] = unit_info - - # Calculate derived metrics - derived_metrics = await self._calculate_derived_metrics(enriched, source_name) - enriched["derived_metrics"] = derived_metrics - - # Add location and context information - context_info = await self._enrich_with_context(enriched, source_name) - enriched["metadata"] = {**enriched.get("metadata", {}), **context_info} - - # Add temporal features - temporal_features = await self._extract_temporal_features(enriched["timestamp"]) - enriched["temporal"] = temporal_features - - # Energy-specific enrichments - if 
sensor_type_info["category"] == "energy": - energy_enrichment = await self._enrich_energy_data(enriched) - enriched.update(energy_enrichment) - - return enriched - - except Exception as e: - logger.error(f"Error enriching record: {e}") - return record - - async def _validate_timestamp(self, timestamp) -> Dict[str, Any]: - """Validate timestamp format and reasonableness""" - result = {"is_valid": True, "issues": [], "score": 1.0} - - try: - # Convert to numeric timestamp - if isinstance(timestamp, str): - try: - # Try parsing ISO format - dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00')) - ts = dt.timestamp() - except: - # Try parsing as unix timestamp string - ts = float(timestamp) - else: - ts = float(timestamp) - - # Check if timestamp is reasonable (not too far in past/future) - current_time = datetime.utcnow().timestamp() - max_age = 365 * 24 * 3600 # 1 year - max_future = 24 * 3600 # 1 day - - if ts < current_time - max_age: - result["issues"].append("Timestamp too old (more than 1 year)") - result["score"] -= 0.3 - elif ts > current_time + max_future: - result["issues"].append("Timestamp too far in future") - result["score"] -= 0.3 - - # Check for reasonable precision (not too precise for energy data) - if ts != int(ts) and len(str(ts).split('.')[1]) > 3: - result["score"] -= 0.1 # Minor issue - - except (ValueError, TypeError) as e: - result["is_valid"] = False - result["issues"].append(f"Invalid timestamp format: {e}") - result["score"] = 0.0 - - return result - - async def _validate_numeric_value(self, value, unit: Optional[str] = None) -> Dict[str, Any]: - """Validate numeric value reasonableness""" - result = {"is_valid": True, "issues": [], "score": 1.0} - - try: - numeric_value = float(value) - - # Check for negative values (usually invalid for energy data) - if numeric_value < 0: - result["issues"].append("Negative energy value") - result["score"] -= 0.4 - - # Check for unreasonably large values - unit_str = (unit or "").lower() - if "wh" in unit_str: - # Energy values - if numeric_value > 100000: # >100kWh seems excessive for single reading - result["issues"].append("Unusually high energy value") - result["score"] -= 0.2 - elif "w" in unit_str: - # Power values - if numeric_value > 50000: # >50kW seems excessive - result["issues"].append("Unusually high power value") - result["score"] -= 0.2 - - # Check for zero values (might indicate sensor issues) - if numeric_value == 0: - result["score"] -= 0.1 - - # Check for NaN or infinity - if math.isnan(numeric_value) or math.isinf(numeric_value): - result["is_valid"] = False - result["issues"].append("Invalid numeric value (NaN or Infinity)") - result["score"] = 0.0 - - except (ValueError, TypeError) as e: - result["is_valid"] = False - result["issues"].append(f"Non-numeric value: {e}") - result["score"] = 0.0 - - return result - - async def _validate_sensor_id(self, sensor_id: str) -> Dict[str, Any]: - """Validate sensor ID format and consistency""" - result = {"is_valid": True, "issues": [], "score": 1.0} - - try: - if not isinstance(sensor_id, str) or len(sensor_id) == 0: - result["is_valid"] = False - result["issues"].append("Empty or invalid sensor ID") - result["score"] = 0.0 - return result - - # Check length - if len(sensor_id) < 3: - result["issues"].append("Very short sensor ID") - result["score"] -= 0.2 - elif len(sensor_id) > 50: - result["issues"].append("Very long sensor ID") - result["score"] -= 0.1 - - # Check for reasonable characters - if not re.match(r'^[a-zA-Z0-9_\-\.]+$', sensor_id): - 
result["issues"].append("Sensor ID contains unusual characters") - result["score"] -= 0.1 - - except Exception as e: - result["issues"].append(f"Sensor ID validation error: {e}") - result["score"] -= 0.1 - - return result - - async def _check_for_duplicates(self, record: Dict[str, Any], source_name: str) -> Dict[str, Any]: - """Check for duplicate records""" - result = {"is_unique": True, "issues": [], "score": 1.0} - - try: - # Create record signature - signature = hashlib.md5( - f"{source_name}_{record.get('sensor_id')}_{record.get('timestamp')}_{record.get('value')}".encode() - ).hexdigest() - - # Check cache for recent duplicates - cache_key = f"record_signature:{signature}" - exists = await self.redis.exists(cache_key) - - if exists: - result["is_unique"] = False - result["issues"].append("Duplicate record detected") - result["score"] = 0.0 - else: - # Store signature with short expiration (1 hour) - await self.redis.setex(cache_key, 3600, "1") - - except Exception as e: - logger.debug(f"Error checking duplicates: {e}") - # Don't fail validation for cache errors - - return result - - async def _normalize_timestamp(self, timestamp) -> int: - """Normalize timestamp to unix timestamp""" - try: - if isinstance(timestamp, str): - try: - # Try ISO format first - dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00')) - return int(dt.timestamp()) - except: - # Try as unix timestamp string - return int(float(timestamp)) - else: - return int(float(timestamp)) - except: - # Fallback to current time - return int(datetime.utcnow().timestamp()) - - async def _infer_sensor_type(self, record: Dict[str, Any]) -> Dict[str, str]: - """Infer sensor type from record data""" - sensor_id = record.get("sensor_id", "").lower() - unit = (record.get("unit", "") or "").lower() - value = record.get("value", 0) - metadata = record.get("metadata", {}) - - # Energy sensors - if "wh" in unit or "energy" in sensor_id or "consumption" in sensor_id: - return {"type": "energy", "category": "energy"} - elif "w" in unit and "wh" not in unit: - return {"type": "power", "category": "energy"} - - # Environmental sensors - elif "temp" in sensor_id or "°c" in unit or "celsius" in unit: - return {"type": "temperature", "category": "environmental"} - elif "humid" in sensor_id or "%" in unit: - return {"type": "humidity", "category": "environmental"} - elif "co2" in sensor_id or "ppm" in unit: - return {"type": "co2", "category": "environmental"} - - # Motion/occupancy sensors - elif "motion" in sensor_id or "occupancy" in sensor_id or ("motion" in str(metadata).lower()): - return {"type": "motion", "category": "occupancy"} - - # Generation sensors - elif "generation" in sensor_id or "solar" in sensor_id or "generation" in str(metadata).lower(): - return {"type": "generation", "category": "energy"} - - # Default to energy if unclear - else: - return {"type": "energy", "category": "energy"} - - async def _standardize_unit(self, unit: Optional[str]) -> Dict[str, Any]: - """Standardize unit format""" - if not unit: - return {"standard_unit": "kWh", "conversion_factor": 1.0, "unit_type": "energy"} - - unit_lower = unit.lower().strip() - - # Energy units - if unit_lower in ["kwh", "kw-h", "kw_h"]: - return {"standard_unit": "kWh", "conversion_factor": 1.0, "unit_type": "energy"} - elif unit_lower in ["wh", "w-h", "w_h"]: - return {"standard_unit": "kWh", "conversion_factor": 0.001, "unit_type": "energy"} - elif unit_lower in ["mwh", "mw-h", "mw_h"]: - return {"standard_unit": "kWh", "conversion_factor": 1000.0, "unit_type": 
"energy"} - - # Power units - elif unit_lower in ["kw", "kilowatt", "kilowatts"]: - return {"standard_unit": "kW", "conversion_factor": 1.0, "unit_type": "power"} - elif unit_lower in ["w", "watt", "watts"]: - return {"standard_unit": "kW", "conversion_factor": 0.001, "unit_type": "power"} - elif unit_lower in ["mw", "megawatt", "megawatts"]: - return {"standard_unit": "kW", "conversion_factor": 1000.0, "unit_type": "power"} - - # Temperature units - elif unit_lower in ["°c", "celsius", "c"]: - return {"standard_unit": "°C", "conversion_factor": 1.0, "unit_type": "temperature"} - elif unit_lower in ["°f", "fahrenheit", "f"]: - return {"standard_unit": "°C", "conversion_factor": 1.0, "unit_type": "temperature", "requires_conversion": True} - - # Default - else: - return {"standard_unit": unit, "conversion_factor": 1.0, "unit_type": "unknown"} - - async def _calculate_derived_metrics(self, record: Dict[str, Any], source_name: str) -> Dict[str, Any]: - """Calculate derived metrics from the record""" - derived = {} - - try: - value = float(record.get("value", 0)) - unit_info = record.get("unit_info", {}) - - # Apply unit conversion if needed - if unit_info.get("conversion_factor", 1.0) != 1.0: - derived["original_value"] = value - derived["converted_value"] = value * unit_info["conversion_factor"] - - # Energy-specific calculations - if unit_info.get("unit_type") == "energy": - # Estimate cost (simplified) - cost_per_kwh = 0.12 # Example rate - derived["estimated_cost"] = value * cost_per_kwh - - # Estimate CO2 emissions (simplified) - co2_per_kwh = 0.4 # kg CO2 per kWh (example grid factor) - derived["estimated_co2_kg"] = value * co2_per_kwh - - # Add value range classification - derived["value_range"] = await self._classify_value_range(value, unit_info.get("unit_type")) - - except Exception as e: - logger.debug(f"Error calculating derived metrics: {e}") - - return derived - - async def _classify_value_range(self, value: float, unit_type: str) -> str: - """Classify value into ranges for better understanding""" - if unit_type == "energy": - if value < 1: - return "very_low" - elif value < 10: - return "low" - elif value < 50: - return "medium" - elif value < 200: - return "high" - else: - return "very_high" - elif unit_type == "power": - if value < 0.5: - return "very_low" - elif value < 5: - return "low" - elif value < 20: - return "medium" - elif value < 100: - return "high" - else: - return "very_high" - else: - return "unknown" - - async def _enrich_with_context(self, record: Dict[str, Any], source_name: str) -> Dict[str, Any]: - """Enrich record with contextual information""" - context = {} - - try: - # Add geographical context if available - context["data_source"] = "real_community" - context["source_type"] = "ftp_ingestion" - - # Add data freshness - ingestion_time = datetime.utcnow() - data_time = datetime.fromtimestamp(record["timestamp"]) - context["data_age_minutes"] = (ingestion_time - data_time).total_seconds() / 60 - - # Classify data freshness - if context["data_age_minutes"] < 15: - context["freshness"] = "real_time" - elif context["data_age_minutes"] < 60: - context["freshness"] = "near_real_time" - elif context["data_age_minutes"] < 1440: # 24 hours - context["freshness"] = "recent" - else: - context["freshness"] = "historical" - - except Exception as e: - logger.debug(f"Error adding context: {e}") - - return context - - async def _extract_temporal_features(self, timestamp: int) -> Dict[str, Any]: - """Extract temporal features from timestamp""" - dt = 
datetime.fromtimestamp(timestamp) - - return { - "hour": dt.hour, - "day_of_week": dt.weekday(), - "day_of_month": dt.day, - "month": dt.month, - "quarter": (dt.month - 1) // 3 + 1, - "is_weekend": dt.weekday() >= 5, - "is_business_hours": 8 <= dt.hour <= 17, - "season": self._get_season(dt.month) - } - - def _get_season(self, month: int) -> str: - """Get season from month""" - if month in [12, 1, 2]: - return "winter" - elif month in [3, 4, 5]: - return "spring" - elif month in [6, 7, 8]: - return "summer" - else: - return "autumn" - - async def _enrich_energy_data(self, record: Dict[str, Any]) -> Dict[str, Any]: - """Add energy-specific enrichments""" - enrichment = {} - - try: - value = record.get("derived_metrics", {}).get("converted_value", record.get("value", 0)) - temporal = record.get("temporal", {}) - - # Energy usage patterns - if temporal.get("is_business_hours"): - enrichment["usage_pattern"] = "business_hours" - elif temporal.get("is_weekend"): - enrichment["usage_pattern"] = "weekend" - else: - enrichment["usage_pattern"] = "off_hours" - - # Demand classification - if value > 100: - enrichment["demand_level"] = "high" - elif value > 50: - enrichment["demand_level"] = "medium" - elif value > 10: - enrichment["demand_level"] = "low" - else: - enrichment["demand_level"] = "minimal" - - # Peak/off-peak classification - hour = temporal.get("hour", 0) - if 17 <= hour <= 21: # Evening peak - enrichment["tariff_period"] = "peak" - elif 22 <= hour <= 6: # Night off-peak - enrichment["tariff_period"] = "off_peak" - else: - enrichment["tariff_period"] = "standard" - - except Exception as e: - logger.debug(f"Error enriching energy data: {e}") - - return enrichment - - async def _calculate_quality_scores(self, data: List[Dict[str, Any]], quality_report: Dict[str, Any]) -> Dict[str, float]: - """Calculate overall quality scores""" - if not data: - return {"overall": 0.0, "completeness": 0.0, "accuracy": 0.0, "consistency": 0.0, "timeliness": 0.0} - - # Completeness score - total_expected_fields = len(data) * 4 # sensor_id, timestamp, value, unit - total_present_fields = sum(1 for record in data - for field in ["sensor_id", "timestamp", "value", "unit"] - if record.get(field) is not None) - completeness = total_present_fields / total_expected_fields if total_expected_fields > 0 else 0.0 - - # Accuracy score (based on validation scores) - accuracy_scores = [record.get("data_quality", {}).get("quality_score", 0) for record in data] - accuracy = statistics.mean(accuracy_scores) if accuracy_scores else 0.0 - - # Consistency score (coefficient of variation for quality scores) - if len(accuracy_scores) > 1: - std_dev = statistics.stdev(accuracy_scores) - mean_score = statistics.mean(accuracy_scores) - consistency = 1.0 - (std_dev / mean_score) if mean_score > 0 else 0.0 - else: - consistency = 1.0 - - # Timeliness score (based on data age) - current_time = datetime.utcnow().timestamp() - ages = [(current_time - record.get("timestamp", current_time)) / 3600 for record in data] # age in hours - avg_age = statistics.mean(ages) if ages else 0 - timeliness = max(0.0, 1.0 - (avg_age / 24)) # Decrease score as data gets older than 24 hours - - # Overall score - overall = statistics.mean([completeness, accuracy, consistency, timeliness]) - - return { - "overall": round(overall, 3), - "completeness": round(completeness, 3), - "accuracy": round(accuracy, 3), - "consistency": round(consistency, 3), - "timeliness": round(timeliness, 3) - } - - async def _store_quality_report(self, quality_report: Dict[str, 
Any], source_name: str): - """Store quality report in database""" - try: - quality_report["_id"] = f"{source_name}_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}" - await self.db.quality_reports.insert_one(quality_report) - - # Also cache in Redis for quick access - cache_key = f"quality_report:{source_name}:latest" - await self.redis.setex(cache_key, 3600, json.dumps(quality_report, default=str)) - - except Exception as e: - logger.error(f"Error storing quality report: {e}") - - async def _load_validation_rules(self): - """Load validation rules configuration""" - # Default validation rules - self.validation_rules = { - "energy": { - "min_value": 0, - "max_value": 100000, - "required_precision": 0.01 - }, - "power": { - "min_value": 0, - "max_value": 50000, - "required_precision": 0.1 - }, - "temperature": { - "min_value": -50, - "max_value": 100, - "required_precision": 0.1 - } - } - - logger.info("Loaded default validation rules") - - async def _load_enrichment_metadata(self): - """Load enrichment metadata""" - # Load any cached enrichment data - try: - cache_keys = [] - async for key in self.redis.scan_iter(match="enrichment:*"): - cache_keys.append(key) - - logger.info(f"Loaded {len(cache_keys)} enrichment cache entries") - - except Exception as e: - logger.debug(f"Error loading enrichment metadata: {e}") - - async def get_quality_summary(self, source_name: Optional[str] = None) -> Dict[str, Any]: - """Get quality summary for sources""" - try: - match_filter = {"source": source_name} if source_name else {} - - # Get recent quality reports - cursor = self.db.quality_reports.find(match_filter).sort("processing_time", -1).limit(50) - - reports = [] - async for report in cursor: - report["_id"] = str(report["_id"]) - reports.append(report) - - if not reports: - return {"message": "No quality reports found"} - - # Calculate summary statistics - avg_quality = statistics.mean([r["quality_scores"]["overall"] for r in reports]) - total_processed = sum([r["processed_records"] for r in reports]) - total_rejected = sum([r["rejected_records"] for r in reports]) - - return { - "total_reports": len(reports), - "average_quality": round(avg_quality, 3), - "total_processed_records": total_processed, - "total_rejected_records": total_rejected, - "success_rate": round(total_processed / (total_processed + total_rejected) * 100, 2) if (total_processed + total_rejected) > 0 else 0, - "latest_report": reports[0] if reports else None - } - - except Exception as e: - logger.error(f"Error getting quality summary: {e}") - return {"error": str(e)} \ No newline at end of file diff --git a/microservices/data-ingestion-service/src/database.py b/microservices/data-ingestion-service/src/database.py index dca8f61..6eccbe0 100644 --- a/microservices/data-ingestion-service/src/database.py +++ b/microservices/data-ingestion-service/src/database.py @@ -1,433 +1,245 @@ +#!/usr/bin/env python3 """ -Database configuration and connection management for the data ingestion service. -Handles MongoDB connections, index creation, and Redis connections. 
+MongoDB Database Manager for SA4CPS Data Ingestion +Simple async MongoDB operations for storing .slg_v2 file data """ import asyncio import logging -from typing import Optional -from contextlib import asynccontextmanager -import os from datetime import datetime +from typing import List, Dict, Any, Optional +from motor.motor_asyncio import AsyncIOMotorClient +from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError -import motor.motor_asyncio -import redis.asyncio as redis -from pymongo import IndexModel - -from .models import ( - DataSourceSchema, ProcessedFileSchema, QualityReportSchema, - IngestionStatsSchema, ErrorLogSchema, MonitoringAlertSchema -) +from config import MONGO_CONFIG logger = logging.getLogger(__name__) + class DatabaseManager: - """Manages database connections and operations""" + """Manages MongoDB connections and operations for SA4CPS data""" - def __init__(self, mongodb_url: str = None, redis_url: str = None): - self.mongodb_url = mongodb_url or os.getenv("MONGODB_URL", "mongodb://localhost:27017") - self.redis_url = redis_url or os.getenv("REDIS_URL", "redis://localhost:6379") + def __init__(self): + self.client: Optional[AsyncIOMotorClient] = None + self.db = None + self.collections = {} - self.mongodb_client: Optional[motor.motor_asyncio.AsyncIOMotorClient] = None - self.db: Optional[motor.motor_asyncio.AsyncIOMotorDatabase] = None - self.redis_client: Optional[redis.Redis] = None + # MongoDB configuration + self.connection_string = MONGO_CONFIG["connection_string"] + self.database_name = MONGO_CONFIG["database_name"] - self._connection_status = { - "mongodb": False, - "redis": False, - "last_check": None - } + logger.info(f"Database manager initialized for: {self.database_name}") async def connect(self): - """Establish connections to MongoDB and Redis""" - try: - await self._connect_mongodb() - await self._connect_redis() - await self._create_indexes() - - logger.info("Database connections established successfully") - - except Exception as e: - logger.error(f"Error establishing database connections: {e}") - raise - - async def _connect_mongodb(self): """Connect to MongoDB""" try: - # Parse database name from URL or use default - db_name = "energy_dashboard" - if self.mongodb_url.count("/") > 2: - db_name = self.mongodb_url.split("/")[-1] - - self.mongodb_client = motor.motor_asyncio.AsyncIOMotorClient( - self.mongodb_url, - serverSelectionTimeoutMS=5000, - connectTimeoutMS=5000, - maxPoolSize=50, - minPoolSize=10 - ) - - self.db = self.mongodb_client[db_name] + self.client = AsyncIOMotorClient(self.connection_string) # Test connection - await self.mongodb_client.admin.command('ping') + await self.client.admin.command('ping') - self._connection_status["mongodb"] = True - logger.info(f"Connected to MongoDB: {self.mongodb_url}") + # Get database and collections + self.db = self.client[self.database_name] + self.collections = { + 'files': self.db.sa4cps_files, + 'energy_data': self.db.sa4cps_energy_data, + 'metadata': self.db.sa4cps_metadata + } - except Exception as e: - self._connection_status["mongodb"] = False - logger.error(f"MongoDB connection failed: {e}") + # Create indexes for better performance + await self._create_indexes() + + logger.info(f"Connected to MongoDB: {self.database_name}") + + except (ConnectionFailure, ServerSelectionTimeoutError) as e: + logger.error(f"Failed to connect to MongoDB: {e}") raise - async def _connect_redis(self): - """Connect to Redis""" - try: - self.redis_client = redis.from_url( - self.redis_url, - 
encoding="utf-8", - decode_responses=True, - socket_timeout=5, - socket_connect_timeout=5, - health_check_interval=30 - ) - - # Test connection - await self.redis_client.ping() - - self._connection_status["redis"] = True - logger.info(f"Connected to Redis: {self.redis_url}") - - except Exception as e: - self._connection_status["redis"] = False - logger.error(f"Redis connection failed: {e}") - raise + async def close(self): + """Close MongoDB connection""" + if self.client: + self.client.close() + logger.info("MongoDB connection closed") + + async def ping(self): + """Test database connection""" + if not self.client: + raise ConnectionFailure("No database connection") + + await self.client.admin.command('ping') async def _create_indexes(self): - """Create database indexes for optimal performance""" + """Create database indexes for efficient queries""" try: - schemas = [ - DataSourceSchema, - ProcessedFileSchema, - QualityReportSchema, - IngestionStatsSchema, - ErrorLogSchema, - MonitoringAlertSchema - ] + # Index on files collection + await self.collections['files'].create_index("filename", unique=True) + await self.collections['files'].create_index("processed_at") - for schema in schemas: - collection = self.db[schema.collection_name] - indexes = schema.get_indexes() - - if indexes: - index_models = [] - for index_spec in indexes: - keys = index_spec["keys"] - options = {k: v for k, v in index_spec.items() if k != "keys"} - index_models.append(IndexModel(keys, **options)) - - await collection.create_indexes(index_models) - logger.debug(f"Created {len(index_models)} indexes for {schema.collection_name}") + # Index on energy data collection + await self.collections['energy_data'].create_index([("filename", 1), ("timestamp", 1)]) + await self.collections['energy_data'].create_index("timestamp") logger.info("Database indexes created successfully") except Exception as e: - logger.error(f"Error creating database indexes: {e}") - # Don't raise here - indexes are performance optimization, not critical + logger.warning(f"Failed to create indexes: {e}") - async def disconnect(self): - """Close all database connections""" + async def store_file_data(self, filename: str, records: List[Dict[str, Any]]) -> bool: + """Store processed .slg_v2 file data in MongoDB""" try: - if self.redis_client: - await self.redis_client.aclose() - self._connection_status["redis"] = False + current_time = datetime.now() - if self.mongodb_client: - self.mongodb_client.close() - self._connection_status["mongodb"] = False - - logger.info("Database connections closed") - - except Exception as e: - logger.error(f"Error closing database connections: {e}") - - async def health_check(self) -> dict: - """Check health of database connections""" - health = { - "mongodb": False, - "redis": False, - "timestamp": datetime.utcnow().isoformat(), - "details": {} - } - - # Check MongoDB - try: - if self.mongodb_client: - start_time = asyncio.get_event_loop().time() - await self.mongodb_client.admin.command('ping') - response_time = (asyncio.get_event_loop().time() - start_time) * 1000 - - health["mongodb"] = True - health["details"]["mongodb"] = { - "status": "healthy", - "response_time_ms": round(response_time, 2), - "server_info": await self.mongodb_client.server_info() - } - - except Exception as e: - health["details"]["mongodb"] = { - "status": "unhealthy", - "error": str(e) + # Store file metadata + file_metadata = { + "filename": filename, + "record_count": len(records), + "processed_at": current_time, + "file_size": 
sum(len(str(record)) for record in records), + "status": "processed" } - - # Check Redis - try: - if self.redis_client: - start_time = asyncio.get_event_loop().time() - await self.redis_client.ping() - response_time = (asyncio.get_event_loop().time() - start_time) * 1000 - - redis_info = await self.redis_client.info() - - health["redis"] = True - health["details"]["redis"] = { - "status": "healthy", - "response_time_ms": round(response_time, 2), - "version": redis_info.get("redis_version"), - "connected_clients": redis_info.get("connected_clients"), - "used_memory_human": redis_info.get("used_memory_human") - } - except Exception as e: - health["details"]["redis"] = { - "status": "unhealthy", - "error": str(e) - } - - # Update connection status - self._connection_status.update({ - "mongodb": health["mongodb"], - "redis": health["redis"], - "last_check": datetime.utcnow() - }) - - return health - - @property - def is_connected(self) -> bool: - """Check if all required connections are established""" - return self._connection_status["mongodb"] and self._connection_status["redis"] - - @property - def data_sources(self): - """Data sources collection""" - return self.db[DataSourceSchema.collection_name] - - @property - def processed_files(self): - """Processed files collection""" - return self.db[ProcessedFileSchema.collection_name] - - @property - def quality_reports(self): - """Quality reports collection""" - return self.db[QualityReportSchema.collection_name] - - @property - def ingestion_stats(self): - """Ingestion statistics collection""" - return self.db[IngestionStatsSchema.collection_name] - - @property - def error_logs(self): - """Error logs collection""" - return self.db[ErrorLogSchema.collection_name] - - @property - def monitoring_alerts(self): - """Monitoring alerts collection""" - return self.db[MonitoringAlertSchema.collection_name] - -# Global database manager instance -db_manager = DatabaseManager() - -async def get_database(): - """Dependency function to get database instance""" - if not db_manager.is_connected: - await db_manager.connect() - return db_manager.db - -async def get_redis(): - """Dependency function to get Redis client""" - if not db_manager.is_connected: - await db_manager.connect() - return db_manager.redis_client - -@asynccontextmanager -async def get_db_session(): - """Context manager for database operations""" - try: - if not db_manager.is_connected: - await db_manager.connect() - yield db_manager.db - except Exception as e: - logger.error(f"Database session error: {e}") - raise - finally: - # Connection pooling handles cleanup automatically - pass - -@asynccontextmanager -async def get_redis_session(): - """Context manager for Redis operations""" - try: - if not db_manager.is_connected: - await db_manager.connect() - yield db_manager.redis_client - except Exception as e: - logger.error(f"Redis session error: {e}") - raise - finally: - # Connection pooling handles cleanup automatically - pass - -class DatabaseService: - """High-level database service with common operations""" - - def __init__(self, db, redis_client): - self.db = db - self.redis = redis_client - - async def create_data_source(self, source_data: dict) -> str: - """Create a new data source""" - try: - source_data["created_at"] = datetime.utcnow() - source_data["updated_at"] = datetime.utcnow() - source_data["status"] = "active" - source_data["error_count"] = 0 - source_data["total_files_processed"] = 0 - - result = await self.db.data_sources.insert_one(source_data) - return 
str(result.inserted_id) - - except Exception as e: - logger.error(f"Error creating data source: {e}") - raise - - async def get_data_source(self, source_id: str) -> Optional[dict]: - """Get data source by ID""" - try: - from bson import ObjectId - source = await self.db.data_sources.find_one({"_id": ObjectId(source_id)}) - if source: - source["_id"] = str(source["_id"]) - return source - - except Exception as e: - logger.error(f"Error getting data source: {e}") - return None - - async def update_data_source(self, source_id: str, update_data: dict) -> bool: - """Update data source""" - try: - from bson import ObjectId - update_data["updated_at"] = datetime.utcnow() - - result = await self.db.data_sources.update_one( - {"_id": ObjectId(source_id)}, - {"$set": update_data} - ) - - return result.modified_count > 0 - - except Exception as e: - logger.error(f"Error updating data source: {e}") - return False - - async def list_data_sources(self, enabled_only: bool = False) -> list: - """List all data sources""" - try: - query = {"enabled": True} if enabled_only else {} - cursor = self.db.data_sources.find(query).sort("created_at", -1) - - sources = [] - async for source in cursor: - source["_id"] = str(source["_id"]) - sources.append(source) - - return sources - - except Exception as e: - logger.error(f"Error listing data sources: {e}") - return [] - - async def log_error(self, error_data: dict): - """Log an error to the database""" - try: - error_data["timestamp"] = datetime.utcnow() - await self.db.error_logs.insert_one(error_data) - - except Exception as e: - logger.error(f"Error logging error: {e}") - - async def update_ingestion_stats(self, stats_data: dict): - """Update daily ingestion statistics""" - try: - today = datetime.utcnow().strftime("%Y-%m-%d") - stats_data["date"] = today - stats_data["timestamp"] = datetime.utcnow() - - await self.db.ingestion_stats.update_one( - {"date": today}, - {"$set": stats_data}, + # Insert or update file record + await self.collections['files'].replace_one( + {"filename": filename}, + file_metadata, upsert=True ) + # Add filename and processed timestamp to each record + for record in records: + record["filename"] = filename + record["processed_at"] = current_time + + # Insert energy data records + if records: + result = await self.collections['energy_data'].insert_many(records) + inserted_count = len(result.inserted_ids) + logger.info(f"Stored {inserted_count} records from {filename}") + return True + + return False + except Exception as e: - logger.error(f"Error updating ingestion stats: {e}") - - async def get_latest_stats(self) -> Optional[dict]: - """Get latest ingestion statistics""" - try: - stats = await self.db.ingestion_stats.find_one( - sort=[("timestamp", -1)] + logger.error(f"Error storing data for {filename}: {e}") + + # Store error metadata + error_metadata = { + "filename": filename, + "processed_at": current_time, + "status": "error", + "error_message": str(e) + } + + await self.collections['files'].replace_one( + {"filename": filename}, + error_metadata, + upsert=True ) - if stats: - stats["_id"] = str(stats["_id"]) + + return False + + async def get_processed_files(self) -> List[str]: + """Get list of successfully processed files""" + try: + cursor = self.collections['files'].find( + {"status": "processed"}, + {"filename": 1, "_id": 0} + ) + + files = [] + async for doc in cursor: + files.append(doc["filename"]) + + return files + + except Exception as e: + logger.error(f"Error getting processed files: {e}") + return [] + + async def 
get_file_info(self, filename: str) -> Optional[Dict[str, Any]]: + """Get information about a specific file""" + try: + return await self.collections['files'].find_one({"filename": filename}) + except Exception as e: + logger.error(f"Error getting file info for {filename}: {e}") + return None + + async def get_stats(self) -> Dict[str, Any]: + """Get database statistics""" + try: + stats = { + "database": self.database_name, + "timestamp": datetime.now().isoformat() + } + + # Count documents in each collection + for name, collection in self.collections.items(): + try: + count = await collection.count_documents({}) + stats[f"{name}_count"] = count + except Exception as e: + stats[f"{name}_count"] = f"error: {e}" + + # Get recent files + try: + recent_files = [] + cursor = self.collections['files'].find( + {}, + {"filename": 1, "processed_at": 1, "record_count": 1, "status": 1, "_id": 0} + ).sort("processed_at", -1).limit(5) + + async for doc in cursor: + if doc.get("processed_at"): + doc["processed_at"] = doc["processed_at"].isoformat() + recent_files.append(doc) + + stats["recent_files"] = recent_files + + except Exception as e: + stats["recent_files"] = f"error: {e}" + return stats except Exception as e: - logger.error(f"Error getting latest stats: {e}") - return None + logger.error(f"Error getting database stats: {e}") + return {"error": str(e), "timestamp": datetime.now().isoformat()} - async def cleanup_old_data(self, days: int = 30): - """Clean up old data based on retention policy""" + async def get_energy_data(self, + filename: Optional[str] = None, + start_time: Optional[datetime] = None, + end_time: Optional[datetime] = None, + limit: int = 100) -> List[Dict[str, Any]]: + """Retrieve energy data with optional filtering""" try: - cutoff_date = datetime.utcnow() - datetime.timedelta(days=days) + query = {} - # Clean up old processed files records - result1 = await self.db.processed_files.delete_many({ - "processed_at": {"$lt": cutoff_date} - }) + if filename: + query["filename"] = filename - # Clean up old error logs - result2 = await self.db.error_logs.delete_many({ - "timestamp": {"$lt": cutoff_date} - }) + if start_time or end_time: + time_query = {} + if start_time: + time_query["$gte"] = start_time + if end_time: + time_query["$lte"] = end_time + query["timestamp"] = time_query - # Clean up old quality reports - result3 = await self.db.quality_reports.delete_many({ - "processing_time": {"$lt": cutoff_date} - }) + cursor = self.collections['energy_data'].find(query).sort("timestamp", -1).limit(limit) - logger.info(f"Cleaned up old data: {result1.deleted_count} processed files, " - f"{result2.deleted_count} error logs, {result3.deleted_count} quality reports") + data = [] + async for doc in cursor: + # Convert ObjectId to string and datetime to ISO string + if "_id" in doc: + doc["_id"] = str(doc["_id"]) + if "timestamp" in doc and hasattr(doc["timestamp"], "isoformat"): + doc["timestamp"] = doc["timestamp"].isoformat() + if "processed_at" in doc and hasattr(doc["processed_at"], "isoformat"): + doc["processed_at"] = doc["processed_at"].isoformat() + + data.append(doc) + + return data except Exception as e: - logger.error(f"Error cleaning up old data: {e}") - -# Export the database manager and service for use in other modules -__all__ = [ - 'DatabaseManager', 'DatabaseService', 'db_manager', - 'get_database', 'get_redis', 'get_db_session', 'get_redis_session' -] \ No newline at end of file + logger.error(f"Error retrieving energy data: {e}") + return [] \ No newline at end of file 
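For reference (not part of the patch itself), a minimal sketch of how the simplified DatabaseManager introduced above could be exercised end-to-end. It assumes a reachable MongoDB configured via config.MONGO_CONFIG; the filename and record fields below are illustrative stand-ins for whatever the SLG processor actually emits, not the real .slg_v2 schema.

    # Illustrative driver for the simplified DatabaseManager (assumptions noted above).
    import asyncio
    from datetime import datetime, timezone

    from database import DatabaseManager


    async def main():
        db = DatabaseManager()
        await db.connect()
        try:
            # Hypothetical records; real fields come from SLGProcessor.process_file().
            records = [
                {"timestamp": datetime.now(timezone.utc), "value": 1.25, "unit": "kWh"},
                {"timestamp": datetime.now(timezone.utc), "value": 1.31, "unit": "kWh"},
            ]
            stored = await db.store_file_data("example_2025-09.slg_v2", records)
            print("stored:", stored)
            print("processed files:", await db.get_processed_files())
            print("stats:", await db.get_stats())
        finally:
            await db.close()


    if __name__ == "__main__":
        asyncio.run(main())

Note that store_file_data() mutates each record in place, stamping it with the source filename and a processed_at timestamp before insertion, so callers should not reuse the same record dictionaries across files.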
diff --git a/microservices/data-ingestion-service/src/ftp_monitor.py b/microservices/data-ingestion-service/src/ftp_monitor.py index 0a173df..406cec8 100644 --- a/microservices/data-ingestion-service/src/ftp_monitor.py +++ b/microservices/data-ingestion-service/src/ftp_monitor.py @@ -1,445 +1,209 @@ +#!/usr/bin/env python3 """ -FTP monitoring component for detecting and downloading new time series data files. -Handles multiple FTP servers with different configurations and file patterns. +FTP Monitor for SA4CPS .slg_v2 files +Monitors ftp.sa4cps.pt for new monthly files """ import asyncio import ftplib -import ftputil -from ftputil import FTPHost +import logging +import os from datetime import datetime, timedelta from typing import List, Dict, Any, Optional -import logging -import io -import os -import hashlib -import json -from pathlib import Path -import re -import ssl +from dataclasses import dataclass +import tempfile + +from config import FTP_CONFIG +from slg_processor import SLGProcessor logger = logging.getLogger(__name__) + +@dataclass +class FTPFileInfo: + """Information about an FTP file""" + path: str + name: str + size: int + modified_time: Optional[datetime] = None + + class FTPMonitor: - """Monitors FTP servers for new time series data files""" + """Monitors SA4CPS FTP server for new .slg_v2 files""" - def __init__(self, db, redis_client): - self.db = db - self.redis = redis_client - self.download_cache = {} # Cache for downloaded files - self.connection_pool = {} # Pool of FTP connections + def __init__(self, db_manager): + self.db_manager = db_manager + self.processor = SLGProcessor() + self.last_check: Optional[datetime] = None + self.processed_files: set = set() + self.files_processed_count = 0 + self.status = "initializing" + + # FTP connection settings + self.ftp_host = FTP_CONFIG["host"] + self.ftp_user = FTP_CONFIG["username"] + self.ftp_pass = FTP_CONFIG["password"] + self.base_path = FTP_CONFIG["base_path"] + + # Check interval: 6 hours (files are monthly, so frequent checks aren't needed) + self.check_interval = FTP_CONFIG.get("check_interval", 6 * 3600) # 6 hours + + logger.info(f"FTP Monitor initialized for {self.ftp_host}") - async def check_for_new_files(self, source: Dict[str, Any]) -> List[Dict[str, Any]]: - """Check FTP server for new files matching the configured patterns""" - try: - ftp_config = source.get("ftp_config", {}) - file_patterns = source.get("file_patterns", ["*.csv"]) - - if not ftp_config: - logger.warning(f"No FTP config for source: {source['name']}") - return [] - - # Connect to FTP server - ftp_host = await self._get_ftp_connection(source) - if not ftp_host: - return [] - - new_files = [] - remote_path = ftp_config.get("remote_path", "/") - + async def start_monitoring(self): + """Start the monitoring loop""" + self.status = "running" + logger.info("Starting FTP monitoring loop") + + while True: try: - # List files in remote directory - file_list = await self._list_remote_files(ftp_host, remote_path) + await self.check_for_new_files() + self.status = "running" - # Filter files by patterns and check if they're new - for file_info in file_list: - filename = file_info["filename"] - - # Check if file matches any pattern - if self._matches_patterns(filename, file_patterns): - - # Check if file is new (not processed before) - if await self._is_new_file(source, file_info): - new_files.append(file_info) - logger.info(f"Found new file: {filename}") - - # Update last check timestamp - await self.db.data_sources.update_one( - {"_id": source["_id"]}, - 
{"$set": {"last_check": datetime.utcnow()}} - ) + # Wait for next check (6 hours) + logger.info(f"Waiting {self.check_interval/3600:.1f} hours until next check") + await asyncio.sleep(self.check_interval) except Exception as e: - logger.error(f"Error listing files from FTP: {e}") - await self._close_ftp_connection(source["_id"]) - - return new_files - - except Exception as e: - logger.error(f"Error checking for new files in source {source['name']}: {e}") - return [] + self.status = "error" + logger.error(f"Error in monitoring loop: {e}") + # Wait 30 minutes before retrying on error + await asyncio.sleep(1800) - async def download_file(self, source: Dict[str, Any], file_info: Dict[str, Any]) -> bytes: - """Download a file from FTP server""" + async def check_for_new_files(self) -> Dict[str, Any]: + """Check FTP server for new .slg_v2 files""" + self.last_check = datetime.now() + logger.info(f"Checking FTP server at {self.last_check}") + try: - ftp_host = await self._get_ftp_connection(source) - if not ftp_host: - raise Exception("Cannot establish FTP connection") - - filename = file_info["filename"] - remote_path = source["ftp_config"].get("remote_path", "/") - full_path = f"{remote_path.rstrip('/')}/{filename}" - - logger.info(f"Downloading file: {full_path}") - - # Download file content - file_content = await self._download_file_content(ftp_host, full_path) - - # Mark file as processed - await self._mark_file_processed(source, file_info) - - # Cache file info for future reference - await self._cache_file_info(source, file_info, len(file_content)) - - logger.info(f"Successfully downloaded {filename} ({len(file_content)} bytes)") - return file_content - + # Connect to FTP server + with ftplib.FTP(self.ftp_host) as ftp: + ftp.login(self.ftp_user, self.ftp_pass) + logger.info(f"Connected to FTP server: {self.ftp_host}") + + # Find .slg_v2 files + new_files = await self._find_slg_files(ftp) + + # Process new files + processed_count = 0 + for file_info in new_files: + if file_info.path not in self.processed_files: + success = await self._process_file(ftp, file_info) + if success: + self.processed_files.add(file_info.path) + processed_count += 1 + self.files_processed_count += 1 + + result = { + "files_found": len(new_files), + "files_processed": processed_count, + "timestamp": self.last_check.isoformat() + } + + logger.info(f"Check complete: {result}") + return result + except Exception as e: - logger.error(f"Error downloading file {file_info.get('filename', 'unknown')}: {e}") + logger.error(f"FTP check failed: {e}") raise - async def test_connection(self, source: Dict[str, Any]) -> bool: - """Test FTP connection for a data source""" + async def _find_slg_files(self, ftp: ftplib.FTP) -> List[FTPFileInfo]: + """Find .slg_v2 files in the FTP directory structure""" + files = [] + try: - ftp_config = source.get("ftp_config", {}) - if not ftp_config: - return False + # Navigate to base path + ftp.cwd(self.base_path) + logger.info(f"Scanning directory: {self.base_path}") - # Try to establish connection - ftp_host = await self._create_ftp_connection(ftp_config) - if ftp_host: - # Try to list remote directory - remote_path = ftp_config.get("remote_path", "/") - try: - await self._list_remote_files(ftp_host, remote_path, limit=1) - success = True - except: - success = False - - # Close connection - try: - await asyncio.get_event_loop().run_in_executor( - None, ftp_host.close - ) - except: - pass - - return success + # Get directory listing + dir_list = [] + ftp.retrlines('LIST', dir_list.append) - 
return False - - except Exception as e: - logger.error(f"Error testing FTP connection: {e}") - return False - - async def get_file_metadata(self, source: Dict[str, Any], filename: str) -> Optional[Dict[str, Any]]: - """Get metadata for a specific file""" - try: - ftp_host = await self._get_ftp_connection(source) - if not ftp_host: - return None - - remote_path = source["ftp_config"].get("remote_path", "/") - full_path = f"{remote_path.rstrip('/')}/{filename}" - - # Get file stats - def get_file_stat(): - try: - return ftp_host.stat(full_path) - except: - return None - - stat_info = await asyncio.get_event_loop().run_in_executor(None, get_file_stat) - - if stat_info: - return { - "filename": filename, - "size": stat_info.st_size, - "modified_time": datetime.fromtimestamp(stat_info.st_mtime), - "full_path": full_path - } - - return None - - except Exception as e: - logger.error(f"Error getting file metadata for {filename}: {e}") - return None - - async def _get_ftp_connection(self, source: Dict[str, Any]): - """Get or create FTP connection for a source""" - source_id = str(source["_id"]) - - # Check if we have a cached connection - if source_id in self.connection_pool: - connection = self.connection_pool[source_id] - try: - # Test if connection is still alive - await asyncio.get_event_loop().run_in_executor( - None, lambda: connection.getcwd() - ) - return connection - except: - # Connection is dead, remove from pool - del self.connection_pool[source_id] - - # Create new connection - ftp_config = source.get("ftp_config", {}) - connection = await self._create_ftp_connection(ftp_config) - - if connection: - self.connection_pool[source_id] = connection - - return connection - - async def _create_ftp_connection(self, ftp_config: Dict[str, Any]): - """Create a new FTP connection""" - try: - host = ftp_config.get("host") - port = ftp_config.get("port", 21) - username = ftp_config.get("username", "anonymous") - password = ftp_config.get("password", "") - use_ssl = ftp_config.get("use_ssl", False) - passive_mode = ftp_config.get("passive_mode", True) - - if not host: - raise ValueError("FTP host not specified") - - def create_connection(): - if use_ssl: - # Use FTPS (FTP over SSL/TLS) - ftp = ftplib.FTP_TLS() - ftp.connect(host, port) - ftp.login(username, password) - ftp.prot_p() # Enable protection for data channel - else: - # Use regular FTP - ftp = ftplib.FTP() - ftp.connect(host, port) - ftp.login(username, password) - - ftp.set_pasv(passive_mode) - - # Create FTPHost wrapper for easier file operations - ftp_host = FTPHost.from_ftp_client(ftp) - return ftp_host - - # Create connection in thread pool to avoid blocking - ftp_host = await asyncio.get_event_loop().run_in_executor( - None, create_connection - ) - - logger.info(f"Successfully connected to FTP server: {host}:{port}") - return ftp_host - - except Exception as e: - logger.error(f"Error creating FTP connection to {ftp_config.get('host', 'unknown')}: {e}") - return None - - async def _close_ftp_connection(self, source_id: str): - """Close FTP connection for a source""" - if source_id in self.connection_pool: - try: - connection = self.connection_pool[source_id] - await asyncio.get_event_loop().run_in_executor( - None, connection.close - ) - except: - pass - finally: - del self.connection_pool[source_id] - - async def _list_remote_files(self, ftp_host, remote_path: str, limit: Optional[int] = None) -> List[Dict[str, Any]]: - """List files in remote FTP directory""" - def list_files(): - files = [] - try: - # Change to remote directory - 
ftp_host.chdir(remote_path) - - # Get file list with details - file_list = ftp_host.listdir(".") - - for filename in file_list: - try: - # Get file stats - file_path = f"{remote_path.rstrip('/')}/{filename}" - stat_info = ftp_host.stat(filename) - - # Skip directories - if not ftp_host.path.isfile(filename): - continue - - file_info = { - "filename": filename, - "full_path": file_path, - "size": stat_info.st_size, - "modified_time": datetime.fromtimestamp(stat_info.st_mtime), - "created_time": datetime.fromtimestamp(stat_info.st_ctime) if hasattr(stat_info, 'st_ctime') else None - } - - files.append(file_info) - - if limit and len(files) >= limit: - break + for line in dir_list: + parts = line.split() + if len(parts) >= 9: + filename = parts[-1] + + # Check if it's a .slg_v2 file + if filename.endswith('.slg_v2'): + try: + size = int(parts[4]) + full_path = f"{self.base_path.rstrip('/')}/{filename}" - except Exception as e: - logger.warning(f"Error getting stats for file {filename}: {e}") - continue - - except Exception as e: - logger.error(f"Error listing directory {remote_path}: {e}") - raise + files.append(FTPFileInfo( + path=full_path, + name=filename, + size=size + )) + + except (ValueError, IndexError): + logger.warning(f"Could not parse file info for: {filename}") + logger.info(f"Found {len(files)} .slg_v2 files") return files - - return await asyncio.get_event_loop().run_in_executor(None, list_files) - - async def _download_file_content(self, ftp_host, file_path: str) -> bytes: - """Download file content from FTP server""" - def download(): - bio = io.BytesIO() - try: - ftp_host.download(file_path, bio) - bio.seek(0) - return bio.read() - finally: - bio.close() - - return await asyncio.get_event_loop().run_in_executor(None, download) - - def _matches_patterns(self, filename: str, patterns: List[str]) -> bool: - """Check if filename matches any of the specified patterns""" - for pattern in patterns: - # Convert shell pattern to regex - regex_pattern = pattern.replace("*", ".*").replace("?", ".") - if re.match(regex_pattern, filename, re.IGNORECASE): - return True - return False - - async def _is_new_file(self, source: Dict[str, Any], file_info: Dict[str, Any]) -> bool: - """Check if file is new (hasn't been processed before)""" - try: - filename = file_info["filename"] - file_size = file_info["size"] - modified_time = file_info["modified_time"] - - # Create file signature - file_signature = hashlib.md5( - f"{filename}_{file_size}_{modified_time.timestamp()}".encode() - ).hexdigest() - - # Check if we've processed this file before - processed_file = await self.db.processed_files.find_one({ - "source_id": source["_id"], - "file_signature": file_signature - }) - - return processed_file is None except Exception as e: - logger.error(f"Error checking if file is new: {e}") - return True # Assume it's new if we can't check - - async def _mark_file_processed(self, source: Dict[str, Any], file_info: Dict[str, Any]): - """Mark file as processed""" - try: - filename = file_info["filename"] - file_size = file_info["size"] - modified_time = file_info["modified_time"] - - # Create file signature - file_signature = hashlib.md5( - f"{filename}_{file_size}_{modified_time.timestamp()}".encode() - ).hexdigest() - - # Record processed file - processed_record = { - "source_id": source["_id"], - "source_name": source["name"], - "filename": filename, - "file_signature": file_signature, - "file_size": file_size, - "modified_time": modified_time, - "processed_at": datetime.utcnow() - } - - await 
self.db.processed_files.insert_one(processed_record) - - except Exception as e: - logger.error(f"Error marking file as processed: {e}") - - async def _cache_file_info(self, source: Dict[str, Any], file_info: Dict[str, Any], content_size: int): - """Cache file information for monitoring""" - try: - cache_key = f"file_cache:{source['_id']}:{file_info['filename']}" - cache_data = { - "filename": file_info["filename"], - "size": file_info["size"], - "content_size": content_size, - "downloaded_at": datetime.utcnow().isoformat(), - "source_name": source["name"] - } - - # Store in Redis with 7-day expiration - await self.redis.setex( - cache_key, - 7 * 24 * 3600, # 7 days - json.dumps(cache_data) - ) - - except Exception as e: - logger.error(f"Error caching file info: {e}") - - async def get_processing_history(self, source_id: str, limit: int = 50) -> List[Dict[str, Any]]: - """Get processing history for a data source""" - try: - cursor = self.db.processed_files.find( - {"source_id": source_id} - ).sort("processed_at", -1).limit(limit) - - history = [] - async for record in cursor: - record["_id"] = str(record["_id"]) - record["source_id"] = str(record["source_id"]) - if "processed_at" in record: - record["processed_at"] = record["processed_at"].isoformat() - if "modified_time" in record: - record["modified_time"] = record["modified_time"].isoformat() - history.append(record) - - return history - - except Exception as e: - logger.error(f"Error getting processing history: {e}") + logger.error(f"Error scanning FTP directory: {e}") return [] - async def cleanup_old_records(self, days: int = 30): - """Clean up old processed file records""" - try: - cutoff_date = datetime.utcnow() - timedelta(days=days) - - result = await self.db.processed_files.delete_many({ - "processed_at": {"$lt": cutoff_date} - }) - - logger.info(f"Cleaned up {result.deleted_count} old processed file records") - - except Exception as e: - logger.error(f"Error cleaning up old records: {e}") - - async def close_all_connections(self): - """Close all FTP connections""" - for source_id in list(self.connection_pool.keys()): - await self._close_ftp_connection(source_id) + async def _process_file(self, ftp: ftplib.FTP, file_info: FTPFileInfo) -> bool: + """Download and process a .slg_v2 file""" + logger.info(f"Processing file: {file_info.name} ({file_info.size} bytes)") - logger.info("Closed all FTP connections") \ No newline at end of file + try: + # Create temporary file for download + with tempfile.NamedTemporaryFile(mode='wb', suffix='.slg_v2', delete=False) as temp_file: + temp_path = temp_file.name + + # Download file + with open(temp_path, 'wb') as f: + ftp.retrbinary(f'RETR {file_info.name}', f.write) + + # Process the downloaded file + records = await self.processor.process_file(temp_path, file_info.name) + + # Store in database + if records: + await self.db_manager.store_file_data(file_info.name, records) + logger.info(f"Stored {len(records)} records from {file_info.name}") + return True + else: + logger.warning(f"No valid records found in {file_info.name}") + return False + + except Exception as e: + logger.error(f"Error processing file {file_info.name}: {e}") + return False + + finally: + # Clean up temporary file + try: + if 'temp_path' in locals(): + os.unlink(temp_path) + except OSError: + pass + + def get_status(self) -> str: + """Get current monitor status""" + return self.status + + def get_last_check_time(self) -> Optional[str]: + """Get last check time as ISO string""" + return self.last_check.isoformat() if 
self.last_check else None + + def get_processed_count(self) -> int: + """Get total number of files processed""" + return self.files_processed_count + + def get_detailed_status(self) -> Dict[str, Any]: + """Get detailed status information""" + return { + "status": self.status, + "last_check": self.get_last_check_time(), + "files_processed": self.files_processed_count, + "processed_files_count": len(self.processed_files), + "check_interval_hours": self.check_interval / 3600, + "ftp_host": self.ftp_host, + "base_path": self.base_path + } \ No newline at end of file diff --git a/microservices/data-ingestion-service/src/main.py b/microservices/data-ingestion-service/src/main.py index 74a0ecc..213e545 100644 --- a/microservices/data-ingestion-service/src/main.py +++ b/microservices/data-ingestion-service/src/main.py @@ -1,779 +1,139 @@ """ -Data Ingestion Service -Monitors FTP servers for new time series data from real communities and publishes to Redis. -Provides realistic data feeds for simulation and analytics. -Port: 8008 +SA4CPS Data Ingestion Service +Simple FTP monitoring service for .slg_v2 files with MongoDB storage """ -import asyncio -from datetime import datetime, timedelta -from fastapi import FastAPI, HTTPException, Depends, BackgroundTasks -from fastapi.middleware.cors import CORSMiddleware +from fastapi import FastAPI, HTTPException from contextlib import asynccontextmanager +import asyncio import logging -from typing import List, Optional, Dict, Any -import json -from bson import ObjectId +from datetime import datetime +from typing import Dict, Any -from models import ( - DataSourceCreate, DataSourceUpdate, DataSourceResponse, - FileProcessingRequest, FileProcessingResponse, IngestionStats, - HealthStatus, QualityReport, TopicInfo, PublishingStats -) -from database import db_manager, get_database, get_redis, DatabaseService from ftp_monitor import FTPMonitor -from slg_v2_processor import SLGv2Processor -from redis_publisher import RedisPublisher -from data_validator import DataValidator -from monitoring import ServiceMonitor, PerformanceMonitor, ErrorHandler +from database import DatabaseManager # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) +# Global services +ftp_monitor = None +db_manager = None + + @asynccontextmanager async def lifespan(app: FastAPI): - """Application lifespan manager""" - logger.info("Data Ingestion Service starting up...") - - try: - # Connect to databases - await db_manager.connect() - - # Initialize core components - await initialize_data_sources() - await initialize_components() - - # Start background tasks - asyncio.create_task(ftp_monitoring_task()) - asyncio.create_task(data_processing_task()) - asyncio.create_task(health_monitoring_task()) - asyncio.create_task(cleanup_task()) - - logger.info("Data Ingestion Service startup complete") - - yield - - except Exception as e: - logger.error(f"Error during startup: {e}") - raise - finally: - logger.info("Data Ingestion Service shutting down...") - await db_manager.disconnect() - logger.info("Data Ingestion Service shutdown complete") + """Application lifespan management""" + global ftp_monitor, db_manager + logger.info("Starting SA4CPS Data Ingestion Service...") + + # Initialize database connection + db_manager = DatabaseManager() + await db_manager.connect() + + # Initialize FTP monitor + ftp_monitor = FTPMonitor(db_manager) + + # Start background monitoring task + monitoring_task = asyncio.create_task(ftp_monitor.start_monitoring()) + + 
logger.info("Service started successfully") + + yield + + # Cleanup on shutdown + logger.info("Shutting down service...") + monitoring_task.cancel() + await db_manager.close() + logger.info("Service shutdown complete") + + +# Create FastAPI app app = FastAPI( - title="Data Ingestion Service", - description="FTP monitoring and time series data ingestion for real community data simulation", + title="SA4CPS Data Ingestion Service", + description="Monitors FTP server for .slg_v2 files and stores data in MongoDB", version="1.0.0", lifespan=lifespan ) -app.add_middleware( - CORSMiddleware, - allow_origins=["*"], - allow_credentials=True, - allow_methods=["*"], - allow_headers=["*"], -) -# Global components -ftp_monitor = None -data_processor = None -redis_publisher = None -data_validator = None -service_monitor = None +@app.get("/") +async def root(): + """Root endpoint""" + return { + "service": "SA4CPS Data Ingestion Service", + "status": "running", + "timestamp": datetime.now().isoformat() + } -# Dependencies -async def get_db(): - return await get_database() -async def get_ftp_monitor(): - global ftp_monitor - if not ftp_monitor: - db = await get_database() - redis = await get_redis() - ftp_monitor = FTPMonitor(db, redis) - return ftp_monitor - -async def get_slg_processor(): - global data_processor - if not data_processor: - db = await get_database() - redis = await get_redis() - data_processor = SLGv2Processor(db, redis) - return data_processor - -async def get_redis_publisher(): - global redis_publisher - if not redis_publisher: - redis = await get_redis() - redis_publisher = RedisPublisher(redis) - return redis_publisher - -async def get_data_validator(): - global data_validator - if not data_validator: - db = await get_database() - redis = await get_redis() - data_validator = DataValidator(db, redis) - return data_validator - -@app.get("/health", response_model=HealthStatus) +@app.get("/health") async def health_check(): """Health check endpoint""" - try: - # Get database health - health_data = await db_manager.health_check() - - # Get FTP connections status - ftp_status = await check_ftp_connections() - - # Calculate uptime - app_start_time = getattr(app.state, 'start_time', datetime.utcnow()) - uptime = (datetime.utcnow() - app_start_time).total_seconds() - - # Get processing stats - processing_stats = await get_processing_queue_size() - - overall_status = "healthy" - if not health_data["mongodb"] or not health_data["redis"]: - overall_status = "degraded" - elif ftp_status["healthy_connections"] == 0 and ftp_status["total_connections"] > 0: - overall_status = "degraded" - - return HealthStatus( - status=overall_status, - timestamp=datetime.utcnow(), - uptime_seconds=uptime, - active_sources=ftp_status["healthy_connections"], - total_processed_files=processing_stats.get("total_processed", 0), - redis_connected=health_data["redis"], - mongodb_connected=health_data["mongodb"], - last_error=None - ) - except Exception as e: - logger.error(f"Health check failed: {e}") - return HealthStatus( - status="unhealthy", - timestamp=datetime.utcnow(), - uptime_seconds=0, - active_sources=0, - total_processed_files=0, - redis_connected=False, - mongodb_connected=False, - last_error=str(e) - ) + global ftp_monitor, db_manager -@app.get("/stats", response_model=IngestionStats) -async def get_ingestion_stats(): - """Get data ingestion statistics""" - try: - db = await get_database() - - # Get statistics from database - stats_data = await db.ingestion_stats.find_one( - {"date": 
datetime.utcnow().strftime("%Y-%m-%d")} - ) or {} - - return IngestionStats( - files_processed_today=stats_data.get("files_processed", 0), - records_ingested_today=stats_data.get("records_ingested", 0), - errors_today=stats_data.get("errors", 0), - data_sources_active=stats_data.get("active_sources", 0), - average_processing_time_ms=stats_data.get("avg_processing_time", 0), - last_successful_ingestion=stats_data.get("last_success"), - redis_messages_published=stats_data.get("redis_published", 0), - data_quality_score=stats_data.get("quality_score", 100.0) - ) - except Exception as e: - logger.error(f"Error getting ingestion stats: {e}") - raise HTTPException(status_code=500, detail="Internal server error") + health_status = { + "service": "healthy", + "timestamp": datetime.now().isoformat(), + "database": "unknown", + "ftp_monitor": "unknown" + } -@app.get("/sources") -async def get_data_sources(): - """Get configured data sources""" - try: - db = await get_database() - cursor = db.data_sources.find({}) - sources = [] - - async for source in cursor: - source["_id"] = str(source["_id"]) - # Convert datetime fields - for field in ["created_at", "updated_at", "last_check", "last_success"]: - if field in source and source[field]: - source[field] = source[field].isoformat() - sources.append(source) - - return { - "sources": sources, - "count": len(sources) - } - except Exception as e: - logger.error(f"Error getting data sources: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@app.post("/sources") -async def create_data_source( - source_config: DataSourceCreate, - background_tasks: BackgroundTasks -): - """Create a new data source""" - try: - db = await get_database() - - # Create source document - source_doc = { - "name": source_config.name, - "description": source_config.description, - "source_type": source_config.source_type, - "ftp_config": source_config.ftp_config.dict() if source_config.ftp_config else None, - "file_patterns": source_config.file_patterns, - "data_format": source_config.data_format.value, - "topics": [topic.dict() for topic in source_config.topics], - "redis_topics": [topic.topic_name for topic in source_config.topics], - "enabled": source_config.enabled, - "check_interval_seconds": source_config.polling_interval_minutes * 60, - "max_file_size_mb": source_config.max_file_size_mb, - "created_at": datetime.utcnow(), - "updated_at": datetime.utcnow(), - "status": "created" - } - - result = await db.data_sources.insert_one(source_doc) - - # Test connection in background - background_tasks.add_task(test_data_source_connection, str(result.inserted_id)) - - return { - "message": "Data source created successfully", - "source_id": str(result.inserted_id), - "name": source_config.name - } - except Exception as e: - logger.error(f"Error creating data source: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@app.put("/sources/{source_id}") -async def update_data_source( - source_id: str, - source_config: DataSourceUpdate -): - """Update an existing data source""" - try: - db = await get_database() - - update_doc = {} - if source_config.name is not None: - update_doc["name"] = source_config.name - if source_config.description is not None: - update_doc["description"] = source_config.description - if source_config.ftp_config is not None: - update_doc["ftp_config"] = source_config.ftp_config.dict() - if source_config.file_patterns is not None: - update_doc["file_patterns"] = source_config.file_patterns - if 
source_config.data_format is not None: - update_doc["data_format"] = source_config.data_format.value - if source_config.topics is not None: - update_doc["topics"] = [topic.dict() for topic in source_config.topics] - update_doc["redis_topics"] = [topic.topic_name for topic in source_config.topics] - if source_config.enabled is not None: - update_doc["enabled"] = source_config.enabled - if source_config.polling_interval_minutes is not None: - update_doc["check_interval_seconds"] = source_config.polling_interval_minutes * 60 - if source_config.max_file_size_mb is not None: - update_doc["max_file_size_mb"] = source_config.max_file_size_mb - - update_doc["updated_at"] = datetime.utcnow() - - result = await db.data_sources.update_one( - {"_id": ObjectId(source_id)}, - {"$set": update_doc} - ) - - if result.matched_count == 0: - raise HTTPException(status_code=404, detail="Data source not found") - - return { - "message": "Data source updated successfully", - "source_id": source_id - } - except HTTPException: - raise - except Exception as e: - logger.error(f"Error updating data source: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@app.delete("/sources/{source_id}") -async def delete_data_source(source_id: str): - """Delete a data source""" - try: - db = await get_database() - - result = await db.data_sources.delete_one({"_id": ObjectId(source_id)}) - - if result.deleted_count == 0: - raise HTTPException(status_code=404, detail="Data source not found") - - return { - "message": "Data source deleted successfully", - "source_id": source_id - } - except HTTPException: - raise - except Exception as e: - logger.error(f"Error deleting data source: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@app.post("/sources/{source_id}/test") -async def test_data_source(source_id: str): - """Test connection to a data source""" - try: - db = await get_database() - source = await db.data_sources.find_one({"_id": ObjectId(source_id)}) - - if not source: - raise HTTPException(status_code=404, detail="Data source not found") - - monitor = await get_ftp_monitor() - test_result = await monitor.test_connection(source) - - return { - "source_id": source_id, - "connection_test": test_result, - "tested_at": datetime.utcnow().isoformat() - } - except HTTPException: - raise - except Exception as e: - logger.error(f"Error testing data source: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@app.post("/sources/{source_id}/trigger") -async def trigger_manual_check( - source_id: str, - background_tasks: BackgroundTasks -): - """Manually trigger a check for new data""" - try: - db = await get_database() - source = await db.data_sources.find_one({"_id": ObjectId(source_id)}) - - if not source: - raise HTTPException(status_code=404, detail="Data source not found") - - # Trigger check in background - background_tasks.add_task(process_data_source, source) - - return { - "message": "Manual check triggered", - "source_id": source_id, - "triggered_at": datetime.utcnow().isoformat() - } - except HTTPException: - raise - except Exception as e: - logger.error(f"Error triggering manual check: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@app.get("/processing/status") -async def get_processing_status(): - """Get current processing status""" - try: - db = await get_database() - - # Get recent processing jobs - cursor = db.processing_jobs.find().sort("started_at", -1).limit(20) - jobs = [] - - async for job in 
cursor: - job["_id"] = str(job["_id"]) - for field in ["started_at", "completed_at", "created_at"]: - if field in job and job[field]: - job[field] = job[field].isoformat() - jobs.append(job) - - # Get queue size - queue_size = await get_processing_queue_size() - - return { - "processing_jobs": jobs, - "queue_size": queue_size, - "last_updated": datetime.utcnow().isoformat() - } - except Exception as e: - logger.error(f"Error getting processing status: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@app.get("/data-quality") -async def get_data_quality_metrics(): - """Get data quality metrics""" - try: - db = await get_database() - - # Get recent quality metrics - cursor = db.data_quality_metrics.find().sort("timestamp", -1).limit(10) - metrics = [] - - async for metric in cursor: - metric["_id"] = str(metric["_id"]) - if "timestamp" in metric: - metric["timestamp"] = metric["timestamp"].isoformat() - metrics.append(metric) - - return { - "quality_metrics": metrics, - "count": len(metrics) - } - except Exception as e: - logger.error(f"Error getting data quality metrics: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -@app.get("/redis/topics") -async def get_redis_topics(): - """Get active Redis topics""" - try: - redis = await get_redis() - publisher = await get_redis_publisher() - - topics_info = await publisher.get_topics_info() - - return { - "active_topics": topics_info, - "timestamp": datetime.utcnow().isoformat() - } - except Exception as e: - logger.error(f"Error getting Redis topics: {e}") - raise HTTPException(status_code=500, detail="Internal server error") - -# Background task functions -async def initialize_data_sources(): - """Initialize data sources from database""" - try: - db = await get_database() - - # Auto-configure SA4CPS source if none exist - count = await db.data_sources.count_documents({}) - if count == 0: - from .simple_sa4cps_config import SimpleSA4CPSConfig - - config = SimpleSA4CPSConfig() - result = await config.setup_sa4cps_source() - - if result['success']: - logger.info(f"✅ Auto-configured SA4CPS source: {result['source_id']}") - else: - logger.warning(f"Failed to auto-configure SA4CPS: {result['message']}") - - except Exception as e: - logger.error(f"Error initializing data sources: {e}") - -async def initialize_components(): - """Initialize core service components""" - try: - # Initialize global components - global ftp_monitor, data_processor, redis_publisher, data_validator, service_monitor - - db = await get_database() - redis = await get_redis() - - # Initialize monitoring first - service_monitor = ServiceMonitor(db, redis) - await service_monitor.start_monitoring() - - # Initialize FTP monitor - ftp_monitor = FTPMonitor(db, redis) - - # Initialize SLG_v2 processor - data_processor = SLGv2Processor(db, redis) - - # Initialize Redis publisher - redis_publisher = RedisPublisher(redis) - await redis_publisher.initialize() - - # Initialize data validator - data_validator = DataValidator(db, redis) - await data_validator.initialize() - - # Store app start time for uptime calculation - app.state.start_time = datetime.utcnow() - - logger.info("Core components initialized successfully") - - except Exception as e: - logger.error(f"Error initializing components: {e}") - if service_monitor: - await service_monitor.error_handler.log_error(e, {"task": "component_initialization"}) - raise - -async def ftp_monitoring_task(): - """Main FTP monitoring background task""" - logger.info("Starting FTP monitoring 
task") - - while True: + # Check database connection + if db_manager: try: - db = await get_database() - - # Get all enabled data sources - cursor = db.data_sources.find({"enabled": True}) - - async for source in cursor: - try: - # Check if it's time to check this source - last_check = source.get("last_check") - check_interval = source.get("check_interval_seconds", 300) - - if (not last_check or - (datetime.utcnow() - last_check).total_seconds() >= check_interval): - - # Process this data source - await process_data_source(source) - - # Update last check time - await db.data_sources.update_one( - {"_id": source["_id"]}, - {"$set": {"last_check": datetime.utcnow()}} - ) - - except Exception as e: - logger.error(f"Error processing data source {source.get('name', 'unknown')}: {e}") - - # Sleep between monitoring cycles - await asyncio.sleep(30) - - except Exception as e: - logger.error(f"Error in FTP monitoring task: {e}") - await asyncio.sleep(60) + await db_manager.ping() + health_status["database"] = "connected" + except Exception: + health_status["database"] = "disconnected" + health_status["service"] = "degraded" + + # Check FTP monitor status + if ftp_monitor: + health_status["ftp_monitor"] = ftp_monitor.get_status() + health_status["last_check"] = ftp_monitor.get_last_check_time() + health_status["files_processed"] = ftp_monitor.get_processed_count() + + return health_status + + +@app.get("/status") +async def get_status(): + """Detailed status endpoint""" + global ftp_monitor, db_manager + + if not ftp_monitor: + raise HTTPException(status_code=503, detail="FTP monitor not initialized") + + return { + "ftp_monitor": ftp_monitor.get_detailed_status(), + "database": await db_manager.get_stats() if db_manager else None, + "timestamp": datetime.now().isoformat() + } + + +@app.post("/trigger-check") +async def trigger_manual_check(): + """Manually trigger FTP check""" + global ftp_monitor + + if not ftp_monitor: + raise HTTPException(status_code=503, detail="FTP monitor not initialized") -async def process_data_source(source: Dict[str, Any]): - """Process a single data source""" try: - monitor = await get_ftp_monitor() - processor = await get_slg_processor() - publisher = await get_redis_publisher() - - # Get new files from FTP - new_files = await monitor.check_for_new_files(source) - - if new_files: - logger.info(f"Found {len(new_files)} new .slg_v2 files for source: {source['name']}") - - for file_info in new_files: - try: - # Download and process file - file_data = await monitor.download_file(source, file_info) - - # Process the .slg_v2 file - processed_data = await processor.process_slg_v2_file(file_data) - - # Validate data quality - validator = await get_data_validator() - quality_metrics = await validator.validate_time_series(processed_data) - - # Publish to Redis topics - for topic in source["redis_topics"]: - await publisher.publish_time_series_data( - topic, processed_data, source["name"] - ) - - # Record processing success - await record_processing_success(source, file_info, len(processed_data), quality_metrics) - - except Exception as e: - logger.error(f"Error processing file {file_info.get('filename', 'unknown')}: {e}") - await record_processing_error(source, file_info, str(e)) - + result = await ftp_monitor.check_for_new_files() + return { + "message": "Manual check completed", + "result": result, + "timestamp": datetime.now().isoformat() + } except Exception as e: - logger.error(f"Error in process_data_source for {source.get('name', 'unknown')}: {e}") + logger.error(f"Manual 
check failed: {e}") + raise HTTPException(status_code=500, detail=f"Check failed: {str(e)}") -async def data_processing_task(): - """Background task for data processing queue""" - logger.info("Starting data processing task") - - # This task handles queued processing jobs - while True: - try: - await asyncio.sleep(10) # Check every 10 seconds - # Implementation for processing queued jobs would go here - - except Exception as e: - logger.error(f"Error in data processing task: {e}") - await asyncio.sleep(30) - -async def health_monitoring_task(): - """Background task for monitoring system health""" - logger.info("Starting health monitoring task") - - while True: - try: - # Monitor FTP connections - await monitor_ftp_health() - - # Monitor Redis publishing - await monitor_redis_health() - - # Monitor processing performance - await monitor_processing_performance() - - await asyncio.sleep(60) # Check every minute - - except Exception as e: - logger.error(f"Error in health monitoring task: {e}") - await asyncio.sleep(120) - -async def cleanup_task(): - """Background task for cleaning up old data""" - logger.info("Starting cleanup task") - - while True: - try: - db = await get_database() - - # Clean up old processing jobs (keep last 1000) - old_jobs = await db.processing_jobs.find().sort("created_at", -1).skip(1000) - async for job in old_jobs: - await db.processing_jobs.delete_one({"_id": job["_id"]}) - - # Clean up old quality metrics (keep last 30 days) - cutoff_date = datetime.utcnow() - timedelta(days=30) - await db.data_quality_metrics.delete_many({"timestamp": {"$lt": cutoff_date}}) - - # Clean up old ingestion stats (keep last 90 days) - cutoff_date = datetime.utcnow() - timedelta(days=90) - await db.ingestion_stats.delete_many({"date": {"$lt": cutoff_date.strftime("%Y-%m-%d")}}) - - await asyncio.sleep(3600) # Run every hour - - except Exception as e: - logger.error(f"Error in cleanup task: {e}") - await asyncio.sleep(7200) - -# Helper functions -async def check_ftp_connections() -> Dict[str, int]: - """Check health of FTP connections""" - try: - db = await get_database() - sources = await db.data_sources.find({"enabled": True}).to_list(None) - - total = len(sources) - healthy = 0 - - monitor = await get_ftp_monitor() - for source in sources: - try: - if await monitor.test_connection(source): - healthy += 1 - except: - pass - - return {"total_connections": total, "healthy_connections": healthy} - except Exception as e: - logger.error(f"Error checking FTP connections: {e}") - return {"total_connections": 0, "healthy_connections": 0} - -async def get_processing_queue_size() -> int: - """Get size of processing queue""" - try: - db = await get_database() - return await db.processing_queue.count_documents({"status": "pending"}) - except Exception as e: - logger.error(f"Error getting queue size: {e}") - return 0 - -async def test_data_source_connection(source_id: str): - """Test connection to a data source (background task)""" - try: - db = await get_database() - source = await db.data_sources.find_one({"_id": ObjectId(source_id)}) - - if source: - monitor = await get_ftp_monitor() - success = await monitor.test_connection(source) - - await db.data_sources.update_one( - {"_id": ObjectId(source_id)}, - {"$set": { - "last_test": datetime.utcnow(), - "last_test_result": "success" if success else "failed" - }} - ) - except Exception as e: - logger.error(f"Error testing connection for source {source_id}: {e}") - -async def record_processing_success(source, file_info, record_count, quality_metrics): 
- """Record successful processing""" - try: - db = await get_database() - - # Update source stats - await db.data_sources.update_one( - {"_id": source["_id"]}, - {"$set": {"last_success": datetime.utcnow()}} - ) - - # Update daily stats - today = datetime.utcnow().strftime("%Y-%m-%d") - await db.ingestion_stats.update_one( - {"date": today}, - { - "$inc": { - "files_processed": 1, - "records_ingested": record_count, - "redis_published": len(source["redis_topics"]) - }, - "$set": { - "last_success": datetime.utcnow(), - "quality_score": quality_metrics.get("overall_score", 100.0) - } - }, - upsert=True - ) - - except Exception as e: - logger.error(f"Error recording processing success: {e}") - -async def record_processing_error(source, file_info, error_message): - """Record processing error""" - try: - db = await get_database() - - # Update daily stats - today = datetime.utcnow().strftime("%Y-%m-%d") - await db.ingestion_stats.update_one( - {"date": today}, - {"$inc": {"errors": 1}}, - upsert=True - ) - - # Log error - await db.processing_errors.insert_one({ - "source_id": source["_id"], - "source_name": source["name"], - "file_info": file_info, - "error_message": error_message, - "timestamp": datetime.utcnow() - }) - - except Exception as e: - logger.error(f"Error recording processing error: {e}") - -async def monitor_ftp_health(): - """Monitor FTP connection health""" - # Implementation for FTP health monitoring - pass - -async def monitor_redis_health(): - """Monitor Redis publishing health""" - # Implementation for Redis health monitoring - pass - -async def monitor_processing_performance(): - """Monitor processing performance metrics""" - # Implementation for performance monitoring - pass if __name__ == "__main__": import uvicorn - from bson import ObjectId - uvicorn.run(app, host="0.0.0.0", port=8008) \ No newline at end of file + uvicorn.run("main:app", host="0.0.0.0", port=8008, reload=True) diff --git a/microservices/data-ingestion-service/src/models.py b/microservices/data-ingestion-service/src/models.py deleted file mode 100644 index 265470d..0000000 --- a/microservices/data-ingestion-service/src/models.py +++ /dev/null @@ -1,386 +0,0 @@ -""" -Data models for the data ingestion service. -Defines Pydantic models for request/response validation and database schemas. 
-""" - -from pydantic import BaseModel, Field, validator -from typing import List, Dict, Any, Optional, Union -from datetime import datetime -from enum import Enum - -class DataFormat(str, Enum): - """Supported data formats for SA4CPS ingestion""" - SLG_V2 = "slg_v2" - -class SourceStatus(str, Enum): - """Status of a data source""" - ACTIVE = "active" - INACTIVE = "inactive" - ERROR = "error" - MAINTENANCE = "maintenance" - -class FTPConfig(BaseModel): - """FTP server configuration""" - host: str - port: int = Field(default=21, ge=1, le=65535) - username: str = "anonymous" - password: str = "" - use_ssl: bool = False - passive_mode: bool = True - remote_path: str = "/" - timeout: int = Field(default=30, ge=5, le=300) - - @validator('host') - def validate_host(cls, v): - if not v or len(v.strip()) == 0: - raise ValueError('Host cannot be empty') - return v.strip() - -class TopicConfig(BaseModel): - """Redis topic configuration""" - topic_name: str - description: str = "" - data_types: List[str] = Field(default_factory=lambda: ["all"]) - format: str = "sensor_reading" - enabled: bool = True - -class DataSourceCreate(BaseModel): - """Request model for creating a new data source""" - name: str = Field(..., min_length=1, max_length=100) - description: str = "" - source_type: str = Field(default="ftp", regex="^(ftp|sftp|http|https)$") - ftp_config: FTPConfig - file_patterns: List[str] = Field(default_factory=lambda: ["*.slg_v2"]) - data_format: DataFormat = DataFormat.SLG_V2 - topics: List[TopicConfig] = Field(default_factory=list) - polling_interval_minutes: int = Field(default=5, ge=1, le=1440) - max_file_size_mb: int = Field(default=100, ge=1, le=1000) - enabled: bool = True - -class DataSourceUpdate(BaseModel): - """Request model for updating a data source""" - name: Optional[str] = Field(None, min_length=1, max_length=100) - description: Optional[str] = None - ftp_config: Optional[FTPConfig] = None - file_patterns: Optional[List[str]] = None - data_format: Optional[DataFormat] = None - topics: Optional[List[TopicConfig]] = None - polling_interval_minutes: Optional[int] = Field(None, ge=1, le=1440) - max_file_size_mb: Optional[int] = Field(None, ge=1, le=1000) - enabled: Optional[bool] = None - -class DataSourceResponse(BaseModel): - """Response model for data source information""" - id: str - name: str - description: str - source_type: str - ftp_config: FTPConfig - file_patterns: List[str] - data_format: DataFormat - topics: List[TopicConfig] - polling_interval_minutes: int - max_file_size_mb: int - enabled: bool - status: SourceStatus - created_at: datetime - updated_at: datetime - last_check: Optional[datetime] = None - last_success: Optional[datetime] = None - error_count: int = 0 - total_files_processed: int = 0 - - class Config: - json_encoders = { - datetime: lambda v: v.isoformat() - } - -class FileProcessingRequest(BaseModel): - """Request model for manual file processing""" - source_id: str - filename: str - force_reprocess: bool = False - -class FileProcessingResponse(BaseModel): - """Response model for file processing results""" - success: bool - message: str - records_processed: int - records_rejected: int - processing_time_seconds: float - file_size_bytes: int - topics_published: List[str] - -class IngestionStats(BaseModel): - """Response model for ingestion statistics""" - files_processed_today: int - records_processed_today: int - active_sources: int - total_sources: int - average_processing_time: float - success_rate_percentage: float - last_24h_volume_mb: float - -class 
QualityMetrics(BaseModel): - """Data quality metrics""" - completeness: float = Field(..., ge=0.0, le=1.0) - accuracy: float = Field(..., ge=0.0, le=1.0) - consistency: float = Field(..., ge=0.0, le=1.0) - timeliness: float = Field(..., ge=0.0, le=1.0) - overall: float = Field(..., ge=0.0, le=1.0) - -class QualityReport(BaseModel): - """Data quality report""" - source: str - total_records: int - processed_records: int - rejected_records: int - quality_scores: QualityMetrics - issues_found: List[str] - processing_time: datetime - - class Config: - json_encoders = { - datetime: lambda v: v.isoformat() - } - -class HealthStatus(BaseModel): - """Service health status""" - status: str - timestamp: datetime - uptime_seconds: float - active_sources: int - total_processed_files: int - redis_connected: bool - mongodb_connected: bool - last_error: Optional[str] = None - - class Config: - json_encoders = { - datetime: lambda v: v.isoformat() - } - -class SensorReading(BaseModel): - """Individual sensor reading model""" - sensor_id: str - timestamp: Union[int, float, str] - value: Union[int, float] - unit: Optional[str] = None - metadata: Dict[str, Any] = Field(default_factory=dict) - -class ProcessedFile(BaseModel): - """Processed file record""" - source_id: str - source_name: str - filename: str - file_signature: str - file_size: int - modified_time: datetime - processed_at: datetime - -class TopicInfo(BaseModel): - """Topic information response""" - topic_name: str - description: str - data_types: List[str] - format: str - message_count: int - last_published: Optional[datetime] = None - created_at: datetime - - class Config: - json_encoders = { - datetime: lambda v: v.isoformat() - } - -class PublishingStats(BaseModel): - """Publishing statistics response""" - total_messages_published: int - active_topics: int - topic_stats: Dict[str, int] - last_updated: datetime - - class Config: - json_encoders = { - datetime: lambda v: v.isoformat() - } - -class ErrorLog(BaseModel): - """Error logging model""" - service: str = "data-ingestion-service" - timestamp: datetime - level: str - source_id: Optional[str] = None - source_name: Optional[str] = None - error_type: str - error_message: str - stack_trace: Optional[str] = None - context: Dict[str, Any] = Field(default_factory=dict) - - class Config: - json_encoders = { - datetime: lambda v: v.isoformat() - } - -class MonitoringAlert(BaseModel): - """Monitoring alert model""" - alert_id: str - alert_type: str # "error", "warning", "info" - source_id: Optional[str] = None - title: str - description: str - severity: str = Field(..., regex="^(low|medium|high|critical)$") - timestamp: datetime - resolved: bool = False - resolved_at: Optional[datetime] = None - metadata: Dict[str, Any] = Field(default_factory=dict) - - class Config: - json_encoders = { - datetime: lambda v: v.isoformat() - } - -# Database schema definitions for MongoDB collections - -class DataSourceSchema: - """MongoDB schema for data sources""" - collection_name = "data_sources" - - @staticmethod - def get_indexes(): - return [ - {"keys": [("name", 1)], "unique": True}, - {"keys": [("status", 1)]}, - {"keys": [("enabled", 1)]}, - {"keys": [("created_at", -1)]}, - {"keys": [("last_check", -1)]} - ] - -class ProcessedFileSchema: - """MongoDB schema for processed files""" - collection_name = "processed_files" - - @staticmethod - def get_indexes(): - return [ - {"keys": [("source_id", 1), ("file_signature", 1)], "unique": True}, - {"keys": [("processed_at", -1)]}, - {"keys": [("source_name", 1)]}, - 
{"keys": [("filename", 1)]} - ] - -class QualityReportSchema: - """MongoDB schema for quality reports""" - collection_name = "quality_reports" - - @staticmethod - def get_indexes(): - return [ - {"keys": [("source", 1)]}, - {"keys": [("processing_time", -1)]}, - {"keys": [("quality_scores.overall", -1)]} - ] - -class IngestionStatsSchema: - """MongoDB schema for ingestion statistics""" - collection_name = "ingestion_stats" - - @staticmethod - def get_indexes(): - return [ - {"keys": [("date", 1)], "unique": True}, - {"keys": [("timestamp", -1)]} - ] - -class ErrorLogSchema: - """MongoDB schema for error logs""" - collection_name = "error_logs" - - @staticmethod - def get_indexes(): - return [ - {"keys": [("timestamp", -1)]}, - {"keys": [("source_id", 1)]}, - {"keys": [("error_type", 1)]}, - {"keys": [("level", 1)]} - ] - -class MonitoringAlertSchema: - """MongoDB schema for monitoring alerts""" - collection_name = "monitoring_alerts" - - @staticmethod - def get_indexes(): - return [ - {"keys": [("alert_id", 1)], "unique": True}, - {"keys": [("timestamp", -1)]}, - {"keys": [("source_id", 1)]}, - {"keys": [("alert_type", 1)]}, - {"keys": [("resolved", 1)]} - ] - -# Validation helpers -def validate_timestamp(timestamp: Union[int, float, str]) -> int: - """Validate and convert timestamp to unix timestamp""" - if isinstance(timestamp, str): - try: - # Try ISO format first - dt = datetime.fromisoformat(timestamp.replace('Z', '+00:00')) - return int(dt.timestamp()) - except ValueError: - try: - # Try as unix timestamp string - return int(float(timestamp)) - except ValueError: - raise ValueError(f"Invalid timestamp format: {timestamp}") - elif isinstance(timestamp, (int, float)): - return int(timestamp) - else: - raise ValueError(f"Timestamp must be int, float, or string, got {type(timestamp)}") - -def validate_sensor_id(sensor_id: str) -> str: - """Validate sensor ID format""" - if not isinstance(sensor_id, str) or len(sensor_id.strip()) == 0: - raise ValueError("Sensor ID must be a non-empty string") - - # Remove extra whitespace - sensor_id = sensor_id.strip() - - # Check length - if len(sensor_id) > 100: - raise ValueError("Sensor ID too long (max 100 characters)") - - return sensor_id - -def validate_numeric_value(value: Union[int, float, str]) -> float: - """Validate and convert numeric value""" - try: - numeric_value = float(value) - if not (-1e10 <= numeric_value <= 1e10): # Reasonable range - raise ValueError(f"Value out of reasonable range: {numeric_value}") - return numeric_value - except (ValueError, TypeError): - raise ValueError(f"Invalid numeric value: {value}") - -# Export all models for easy importing -__all__ = [ - # Enums - 'DataFormat', 'SourceStatus', - - # Config models - 'FTPConfig', 'TopicConfig', - - # Request/Response models - 'DataSourceCreate', 'DataSourceUpdate', 'DataSourceResponse', - 'FileProcessingRequest', 'FileProcessingResponse', - 'IngestionStats', 'QualityMetrics', 'QualityReport', - 'HealthStatus', 'SensorReading', 'ProcessedFile', - 'TopicInfo', 'PublishingStats', 'ErrorLog', 'MonitoringAlert', - - # Schema definitions - 'DataSourceSchema', 'ProcessedFileSchema', 'QualityReportSchema', - 'IngestionStatsSchema', 'ErrorLogSchema', 'MonitoringAlertSchema', - - # Validation helpers - 'validate_timestamp', 'validate_sensor_id', 'validate_numeric_value' -] diff --git a/microservices/data-ingestion-service/src/monitoring.py b/microservices/data-ingestion-service/src/monitoring.py deleted file mode 100644 index b691bc7..0000000 --- 
a/microservices/data-ingestion-service/src/monitoring.py +++ /dev/null @@ -1,545 +0,0 @@ -""" -Monitoring and alerting system for the data ingestion service. -Handles error tracking, performance monitoring, and alert generation. -""" - -import asyncio -import logging -from datetime import datetime, timedelta -from typing import List, Dict, Any, Optional -import json -import traceback -import uuid -from collections import defaultdict, deque -import time -import psutil -import os - -logger = logging.getLogger(__name__) - -class PerformanceMonitor: - """Monitors service performance metrics""" - - def __init__(self, redis_client): - self.redis = redis_client - self.metrics_buffer = defaultdict(deque) - self.max_buffer_size = 1000 - self.last_flush = datetime.utcnow() - self.flush_interval = 60 # seconds - - # Performance counters - self.request_count = 0 - self.error_count = 0 - self.processing_times = deque(maxlen=100) - self.memory_usage = deque(maxlen=100) - self.cpu_usage = deque(maxlen=100) - - async def record_request(self, endpoint: str, duration: float, success: bool = True): - """Record request metrics""" - try: - self.request_count += 1 - if not success: - self.error_count += 1 - - self.processing_times.append(duration) - - # Store in buffer - metric_data = { - "timestamp": datetime.utcnow().isoformat(), - "endpoint": endpoint, - "duration_ms": duration * 1000, - "success": success, - "request_id": str(uuid.uuid4()) - } - - self.metrics_buffer["requests"].append(metric_data) - - # Trim buffer if needed - if len(self.metrics_buffer["requests"]) > self.max_buffer_size: - self.metrics_buffer["requests"].popleft() - - # Auto-flush if interval exceeded - if (datetime.utcnow() - self.last_flush).seconds > self.flush_interval: - await self.flush_metrics() - - except Exception as e: - logger.error(f"Error recording request metric: {e}") - - async def record_system_metrics(self): - """Record system-level performance metrics""" - try: - # CPU usage - cpu_percent = psutil.cpu_percent() - self.cpu_usage.append(cpu_percent) - - # Memory usage - process = psutil.Process() - memory_info = process.memory_info() - memory_mb = memory_info.rss / 1024 / 1024 - self.memory_usage.append(memory_mb) - - # Disk usage - disk_usage = psutil.disk_usage('/') - - system_metrics = { - "timestamp": datetime.utcnow().isoformat(), - "cpu_percent": cpu_percent, - "memory_mb": memory_mb, - "disk_free_gb": disk_usage.free / 1024 / 1024 / 1024, - "disk_percent": (disk_usage.used / disk_usage.total) * 100 - } - - self.metrics_buffer["system"].append(system_metrics) - - # Trim buffer - if len(self.metrics_buffer["system"]) > self.max_buffer_size: - self.metrics_buffer["system"].popleft() - - except Exception as e: - logger.error(f"Error recording system metrics: {e}") - - async def record_data_processing_metrics(self, source_name: str, files_processed: int, - records_processed: int, processing_time: float): - """Record data processing performance metrics""" - try: - processing_metrics = { - "timestamp": datetime.utcnow().isoformat(), - "source_name": source_name, - "files_processed": files_processed, - "records_processed": records_processed, - "processing_time_seconds": processing_time, - "records_per_second": records_processed / max(processing_time, 0.001), - "files_per_hour": files_processed * 3600 / max(processing_time, 0.001) - } - - self.metrics_buffer["processing"].append(processing_metrics) - - # Trim buffer - if len(self.metrics_buffer["processing"]) > self.max_buffer_size: - 
self.metrics_buffer["processing"].popleft() - - except Exception as e: - logger.error(f"Error recording processing metrics: {e}") - - async def flush_metrics(self): - """Flush metrics buffer to Redis""" - try: - if not self.metrics_buffer: - return - - # Create batch update - pipe = self.redis.pipeline() - - for metric_type, metrics in self.metrics_buffer.items(): - # Convert deque to list and serialize - metrics_data = [dict(m) if isinstance(m, dict) else m for m in metrics] - - # Store in Redis with timestamp key - timestamp_key = datetime.utcnow().strftime("%Y%m%d_%H%M") - redis_key = f"metrics:{metric_type}:{timestamp_key}" - - pipe.lpush(redis_key, json.dumps(metrics_data)) - pipe.expire(redis_key, 86400 * 7) # Keep for 7 days - - await pipe.execute() - - # Clear buffer - self.metrics_buffer.clear() - self.last_flush = datetime.utcnow() - - logger.debug("Performance metrics flushed to Redis") - - except Exception as e: - logger.error(f"Error flushing metrics: {e}") - - async def get_performance_summary(self) -> Dict[str, Any]: - """Get current performance summary""" - try: - return { - "request_count": self.request_count, - "error_count": self.error_count, - "error_rate": (self.error_count / max(self.request_count, 1)) * 100, - "avg_processing_time_ms": sum(self.processing_times) / max(len(self.processing_times), 1) * 1000, - "current_memory_mb": self.memory_usage[-1] if self.memory_usage else 0, - "current_cpu_percent": self.cpu_usage[-1] if self.cpu_usage else 0, - "metrics_buffer_size": sum(len(buffer) for buffer in self.metrics_buffer.values()), - "last_flush": self.last_flush.isoformat() - } - except Exception as e: - logger.error(f"Error getting performance summary: {e}") - return {} - -class ErrorHandler: - """Centralized error handling and logging""" - - def __init__(self, db, redis_client): - self.db = db - self.redis = redis_client - self.error_counts = defaultdict(int) - self.error_history = deque(maxlen=100) - self.alert_thresholds = { - "error_rate": 10, # errors per minute - "memory_usage": 500, # MB - "cpu_usage": 80, # percent - "disk_usage": 90, # percent - "response_time": 5000 # milliseconds - } - - async def log_error(self, error: Exception, context: Dict[str, Any] = None, - source_id: str = None, source_name: str = None): - """Log error with context information""" - try: - error_type = type(error).__name__ - error_message = str(error) - stack_trace = traceback.format_exc() - - # Update error counters - self.error_counts[error_type] += 1 - - # Create error record - error_record = { - "timestamp": datetime.utcnow(), - "service": "data-ingestion-service", - "level": "ERROR", - "source_id": source_id, - "source_name": source_name, - "error_type": error_type, - "error_message": error_message, - "stack_trace": stack_trace, - "context": context or {} - } - - # Store in database - await self.db.error_logs.insert_one(error_record) - - # Add to history - self.error_history.append({ - "timestamp": error_record["timestamp"].isoformat(), - "type": error_type, - "message": error_message[:100] # Truncate for memory - }) - - # Check for alert conditions - await self.check_alert_conditions(error_record) - - # Log to standard logger - logger.error(f"[{source_name or 'system'}] {error_type}: {error_message}", - extra={"context": context, "source_id": source_id}) - - except Exception as e: - # Fallback logging if error handler fails - logger.critical(f"Error handler failed: {e}") - logger.error(f"Original error: {error}") - - async def log_warning(self, message: str, context: 
Dict[str, Any] = None, - source_id: str = None, source_name: str = None): - """Log warning message""" - try: - warning_record = { - "timestamp": datetime.utcnow(), - "service": "data-ingestion-service", - "level": "WARNING", - "source_id": source_id, - "source_name": source_name, - "error_type": "WARNING", - "error_message": message, - "context": context or {} - } - - await self.db.error_logs.insert_one(warning_record) - logger.warning(f"[{source_name or 'system'}] {message}", - extra={"context": context, "source_id": source_id}) - - except Exception as e: - logger.error(f"Error logging warning: {e}") - - async def check_alert_conditions(self, error_record: Dict[str, Any]): - """Check if error conditions warrant alerts""" - try: - # Count recent errors (last 1 minute) - one_minute_ago = datetime.utcnow() - timedelta(minutes=1) - recent_errors = await self.db.error_logs.count_documents({ - "timestamp": {"$gte": one_minute_ago}, - "level": "ERROR" - }) - - # Check error rate threshold - if recent_errors >= self.alert_thresholds["error_rate"]: - await self.create_alert( - alert_type="error_rate", - title="High Error Rate Detected", - description=f"Detected {recent_errors} errors in the last minute", - severity="high", - metadata={"error_count": recent_errors, "threshold": self.alert_thresholds["error_rate"]} - ) - - except Exception as e: - logger.error(f"Error checking alert conditions: {e}") - - async def create_alert(self, alert_type: str, title: str, description: str, - severity: str, source_id: str = None, metadata: Dict[str, Any] = None): - """Create monitoring alert""" - try: - alert_record = { - "alert_id": str(uuid.uuid4()), - "alert_type": alert_type, - "source_id": source_id, - "title": title, - "description": description, - "severity": severity, - "timestamp": datetime.utcnow(), - "resolved": False, - "metadata": metadata or {} - } - - await self.db.monitoring_alerts.insert_one(alert_record) - - # Also publish to Redis for real-time notifications - alert_notification = { - **alert_record, - "timestamp": alert_record["timestamp"].isoformat() - } - - await self.redis.publish("alerts:data-ingestion", json.dumps(alert_notification)) - - logger.warning(f"Alert created: {title} ({severity})") - - except Exception as e: - logger.error(f"Error creating alert: {e}") - - async def get_error_summary(self) -> Dict[str, Any]: - """Get error summary statistics""" - try: - # Get error counts by type - error_types = dict(self.error_counts) - - # Get recent error rate - one_hour_ago = datetime.utcnow() - timedelta(hours=1) - recent_errors = await self.db.error_logs.count_documents({ - "timestamp": {"$gte": one_hour_ago}, - "level": "ERROR" - }) - - # Get recent alerts - recent_alerts = await self.db.monitoring_alerts.count_documents({ - "timestamp": {"$gte": one_hour_ago}, - "resolved": False - }) - - return { - "total_errors": sum(error_types.values()), - "error_types": error_types, - "recent_errors_1h": recent_errors, - "active_alerts": recent_alerts, - "error_history": list(self.error_history)[-10:], # Last 10 errors - "last_error": self.error_history[-1] if self.error_history else None - } - - except Exception as e: - logger.error(f"Error getting error summary: {e}") - return {} - -class ServiceMonitor: - """Main service monitoring coordinator""" - - def __init__(self, db, redis_client): - self.db = db - self.redis = redis_client - self.performance_monitor = PerformanceMonitor(redis_client) - self.error_handler = ErrorHandler(db, redis_client) - self.monitoring_active = False - 
self.monitoring_interval = 30 # seconds - - async def start_monitoring(self): - """Start background monitoring tasks""" - self.monitoring_active = True - logger.info("Service monitoring started") - - # Start monitoring loop - asyncio.create_task(self._monitoring_loop()) - - async def stop_monitoring(self): - """Stop background monitoring""" - self.monitoring_active = False - await self.performance_monitor.flush_metrics() - logger.info("Service monitoring stopped") - - async def _monitoring_loop(self): - """Main monitoring loop""" - while self.monitoring_active: - try: - # Record system metrics - await self.performance_monitor.record_system_metrics() - - # Check system health - await self._check_system_health() - - # Cleanup old data - await self._cleanup_old_monitoring_data() - - # Wait for next cycle - await asyncio.sleep(self.monitoring_interval) - - except Exception as e: - await self.error_handler.log_error(e, {"task": "monitoring_loop"}) - await asyncio.sleep(self.monitoring_interval) - - async def _check_system_health(self): - """Check system health and create alerts if needed""" - try: - # Check memory usage - current_memory = self.performance_monitor.memory_usage[-1] if self.performance_monitor.memory_usage else 0 - if current_memory > self.error_handler.alert_thresholds["memory_usage"]: - await self.error_handler.create_alert( - alert_type="high_memory", - title="High Memory Usage", - description=f"Memory usage at {current_memory:.1f}MB", - severity="warning", - metadata={"current_memory_mb": current_memory} - ) - - # Check CPU usage - current_cpu = self.performance_monitor.cpu_usage[-1] if self.performance_monitor.cpu_usage else 0 - if current_cpu > self.error_handler.alert_thresholds["cpu_usage"]: - await self.error_handler.create_alert( - alert_type="high_cpu", - title="High CPU Usage", - description=f"CPU usage at {current_cpu:.1f}%", - severity="warning", - metadata={"current_cpu_percent": current_cpu} - ) - - except Exception as e: - logger.error(f"Error checking system health: {e}") - - async def _cleanup_old_monitoring_data(self): - """Clean up old monitoring data""" - try: - # Clean up old error logs (older than 30 days) - thirty_days_ago = datetime.utcnow() - timedelta(days=30) - - deleted_errors = await self.db.error_logs.delete_many({ - "timestamp": {"$lt": thirty_days_ago} - }) - - # Clean up resolved alerts (older than 7 days) - seven_days_ago = datetime.utcnow() - timedelta(days=7) - - deleted_alerts = await self.db.monitoring_alerts.delete_many({ - "timestamp": {"$lt": seven_days_ago}, - "resolved": True - }) - - if deleted_errors.deleted_count > 0 or deleted_alerts.deleted_count > 0: - logger.info(f"Cleaned up {deleted_errors.deleted_count} old error logs and " - f"{deleted_alerts.deleted_count} resolved alerts") - - except Exception as e: - logger.error(f"Error cleaning up old monitoring data: {e}") - - async def get_service_status(self) -> Dict[str, Any]: - """Get comprehensive service status""" - try: - performance_summary = await self.performance_monitor.get_performance_summary() - error_summary = await self.error_handler.get_error_summary() - - # Get database status - db_status = await self._get_database_status() - - # Overall health assessment - health_score = await self._calculate_health_score(performance_summary, error_summary) - - return { - "service": "data-ingestion-service", - "timestamp": datetime.utcnow().isoformat(), - "health_score": health_score, - "monitoring_active": self.monitoring_active, - "performance": performance_summary, - "errors": 
error_summary, - "database": db_status - } - - except Exception as e: - logger.error(f"Error getting service status: {e}") - return {"error": str(e)} - - async def _get_database_status(self) -> Dict[str, Any]: - """Get database connection and performance status""" - try: - # Test MongoDB connection - start_time = time.time() - await self.db.command("ping") - mongo_latency = (time.time() - start_time) * 1000 - - # Test Redis connection - start_time = time.time() - await self.redis.ping() - redis_latency = (time.time() - start_time) * 1000 - - # Get collection counts - collections_info = {} - for collection_name in ["data_sources", "processed_files", "error_logs", "monitoring_alerts"]: - try: - count = await self.db[collection_name].count_documents({}) - collections_info[collection_name] = count - except: - collections_info[collection_name] = "unknown" - - return { - "mongodb": { - "connected": True, - "latency_ms": round(mongo_latency, 2) - }, - "redis": { - "connected": True, - "latency_ms": round(redis_latency, 2) - }, - "collections": collections_info - } - - except Exception as e: - return { - "mongodb": {"connected": False, "error": str(e)}, - "redis": {"connected": False, "error": str(e)}, - "collections": {} - } - - async def _calculate_health_score(self, performance: Dict[str, Any], errors: Dict[str, Any]) -> float: - """Calculate overall health score (0-100)""" - try: - score = 100.0 - - # Deduct for high error rate - error_rate = performance.get("error_rate", 0) - if error_rate > 5: - score -= min(error_rate * 2, 30) - - # Deduct for high resource usage - memory_mb = performance.get("current_memory_mb", 0) - if memory_mb > 300: - score -= min((memory_mb - 300) / 10, 20) - - cpu_percent = performance.get("current_cpu_percent", 0) - if cpu_percent > 70: - score -= min((cpu_percent - 70) / 2, 15) - - # Deduct for recent errors - recent_errors = errors.get("recent_errors_1h", 0) - if recent_errors > 0: - score -= min(recent_errors * 5, 25) - - # Deduct for active alerts - active_alerts = errors.get("active_alerts", 0) - if active_alerts > 0: - score -= min(active_alerts * 10, 20) - - return max(0.0, round(score, 1)) - - except Exception as e: - logger.error(f"Error calculating health score: {e}") - return 50.0 # Default moderate health score - -# Export monitoring components -__all__ = [ - 'ServiceMonitor', 'PerformanceMonitor', 'ErrorHandler' -] \ No newline at end of file diff --git a/microservices/data-ingestion-service/src/redis_publisher.py b/microservices/data-ingestion-service/src/redis_publisher.py deleted file mode 100644 index 3af4794..0000000 --- a/microservices/data-ingestion-service/src/redis_publisher.py +++ /dev/null @@ -1,484 +0,0 @@ -""" -Redis publisher for broadcasting time series data to multiple topics. -Handles data transformation, routing, and publishing for real-time simulation. 
-""" - -import asyncio -import json -import logging -from datetime import datetime, timedelta -from typing import List, Dict, Any, Optional -import hashlib -import uuid -from collections import defaultdict -import redis.asyncio as redis - -logger = logging.getLogger(__name__) - -class RedisPublisher: - """Publishes time series data to Redis channels for real-time simulation""" - - def __init__(self, redis_client): - self.redis = redis_client - self.publishing_stats = defaultdict(int) - self.topic_configs = {} - self.message_cache = {} - - # Default topic configurations - self.default_topics = { - "energy_data": { - "description": "General energy consumption data", - "data_types": ["energy", "power", "consumption"], - "format": "sensor_reading" - }, - "community_consumption": { - "description": "Community-level energy consumption", - "data_types": ["consumption", "usage", "demand"], - "format": "aggregated_data" - }, - "real_time_metrics": { - "description": "Real-time sensor metrics", - "data_types": ["all"], - "format": "metric_update" - }, - "simulation_data": { - "description": "Data for simulation purposes", - "data_types": ["all"], - "format": "simulation_point" - }, - "community_generation": { - "description": "Community energy generation data", - "data_types": ["generation", "production", "renewable"], - "format": "generation_data" - }, - "grid_events": { - "description": "Grid-related events and alerts", - "data_types": ["events", "alerts", "grid_status"], - "format": "event_data" - } - } - - async def initialize(self): - """Initialize publisher with default topic configurations""" - try: - for topic, config in self.default_topics.items(): - await self.configure_topic(topic, config) - - logger.info(f"Initialized Redis publisher with {len(self.default_topics)} default topics") - - except Exception as e: - logger.error(f"Error initializing Redis publisher: {e}") - raise - - async def publish_time_series_data(self, topic: str, data: List[Dict[str, Any]], source_name: str): - """Publish time series data to a specific Redis topic""" - try: - if not data: - logger.warning(f"No data to publish to topic: {topic}") - return - - logger.info(f"Publishing {len(data)} records to topic: {topic}") - - # Get topic configuration - topic_config = self.topic_configs.get(topic, {}) - data_format = topic_config.get("format", "sensor_reading") - - # Process and publish each data point - published_count = 0 - for record in data: - try: - # Transform data based on topic format - message = await self._transform_data_for_topic(record, data_format, source_name) - - # Add publishing metadata - message["published_at"] = datetime.utcnow().isoformat() - message["topic"] = topic - message["message_id"] = str(uuid.uuid4()) - - # Publish to Redis - await self.redis.publish(topic, json.dumps(message)) - - published_count += 1 - self.publishing_stats[topic] += 1 - - except Exception as e: - logger.warning(f"Error publishing record to {topic}: {e}") - continue - - logger.info(f"Successfully published {published_count}/{len(data)} records to {topic}") - - # Update topic statistics - await self._update_topic_stats(topic, published_count) - - except Exception as e: - logger.error(f"Error publishing to topic {topic}: {e}") - raise - - async def publish_single_message(self, topic: str, message: Dict[str, Any]): - """Publish a single message to a Redis topic""" - try: - # Add metadata - message["published_at"] = datetime.utcnow().isoformat() - message["topic"] = topic - message["message_id"] = str(uuid.uuid4()) - - # 
Publish - await self.redis.publish(topic, json.dumps(message)) - - self.publishing_stats[topic] += 1 - logger.debug(f"Published single message to {topic}") - - except Exception as e: - logger.error(f"Error publishing single message to {topic}: {e}") - raise - - async def publish_batch(self, topic_messages: Dict[str, List[Dict[str, Any]]]): - """Publish multiple messages to multiple topics""" - try: - total_published = 0 - - for topic, messages in topic_messages.items(): - for message in messages: - await self.publish_single_message(topic, message) - total_published += 1 - - logger.info(f"Batch published {total_published} messages across {len(topic_messages)} topics") - - except Exception as e: - logger.error(f"Error in batch publishing: {e}") - raise - - async def configure_topic(self, topic: str, config: Dict[str, Any]): - """Configure a topic with specific settings""" - try: - self.topic_configs[topic] = { - "description": config.get("description", ""), - "data_types": config.get("data_types", ["all"]), - "format": config.get("format", "generic"), - "created_at": datetime.utcnow().isoformat(), - "message_count": 0 - } - - logger.info(f"Configured topic: {topic}") - - except Exception as e: - logger.error(f"Error configuring topic {topic}: {e}") - raise - - async def get_topics_info(self) -> Dict[str, Any]: - """Get information about all configured topics""" - try: - topics_info = {} - - for topic, config in self.topic_configs.items(): - # Get recent message count - message_count = self.publishing_stats.get(topic, 0) - - topics_info[topic] = { - **config, - "message_count": message_count, - "last_published": await self._get_last_published_time(topic) - } - - return topics_info - - except Exception as e: - logger.error(f"Error getting topics info: {e}") - return {} - - async def get_publishing_stats(self) -> Dict[str, Any]: - """Get publishing statistics""" - try: - total_messages = sum(self.publishing_stats.values()) - - return { - "total_messages_published": total_messages, - "active_topics": len(self.topic_configs), - "topic_stats": dict(self.publishing_stats), - "last_updated": datetime.utcnow().isoformat() - } - - except Exception as e: - logger.error(f"Error getting publishing stats: {e}") - return {} - - async def _transform_data_for_topic(self, record: Dict[str, Any], format_type: str, source_name: str) -> Dict[str, Any]: - """Transform data based on topic format requirements""" - try: - base_message = { - "source": source_name, - "format": format_type - } - - if format_type == "sensor_reading": - return await self._format_as_sensor_reading(record, base_message) - elif format_type == "aggregated_data": - return await self._format_as_aggregated_data(record, base_message) - elif format_type == "metric_update": - return await self._format_as_metric_update(record, base_message) - elif format_type == "simulation_point": - return await self._format_as_simulation_point(record, base_message) - elif format_type == "generation_data": - return await self._format_as_generation_data(record, base_message) - elif format_type == "event_data": - return await self._format_as_event_data(record, base_message) - else: - # Generic format - return {**base_message, **record} - - except Exception as e: - logger.error(f"Error transforming data for format {format_type}: {e}") - return {**base_message, **record} - - async def _format_as_sensor_reading(self, record: Dict[str, Any], base_message: Dict[str, Any]) -> Dict[str, Any]: - """Format data as sensor reading for energy dashboard""" - return { - 
**base_message, - "type": "sensor_data", - "sensorId": record.get("sensor_id", "unknown"), - "sensor_id": record.get("sensor_id", "unknown"), - "timestamp": record.get("timestamp", int(datetime.utcnow().timestamp())), - "value": record.get("value", 0), - "unit": record.get("unit", "kWh"), - "room": record.get("metadata", {}).get("room"), - "sensor_type": self._infer_sensor_type(record), - "metadata": record.get("metadata", {}), - "data_quality": await self._assess_data_quality(record) - } - - async def _format_as_aggregated_data(self, record: Dict[str, Any], base_message: Dict[str, Any]) -> Dict[str, Any]: - """Format data as aggregated community data""" - return { - **base_message, - "type": "aggregated_consumption", - "community_id": record.get("sensor_id", "community_1"), - "timestamp": record.get("timestamp", int(datetime.utcnow().timestamp())), - "total_consumption": record.get("value", 0), - "unit": record.get("unit", "kWh"), - "period": "real_time", - "households": record.get("metadata", {}).get("households", 1), - "average_per_household": record.get("value", 0) / max(record.get("metadata", {}).get("households", 1), 1) - } - - async def _format_as_metric_update(self, record: Dict[str, Any], base_message: Dict[str, Any]) -> Dict[str, Any]: - """Format data as real-time metric update""" - return { - **base_message, - "type": "metric_update", - "metric_id": record.get("sensor_id", "unknown"), - "metric_type": self._infer_metric_type(record), - "timestamp": record.get("timestamp", int(datetime.utcnow().timestamp())), - "current_value": record.get("value", 0), - "unit": record.get("unit", "kWh"), - "trend": await self._calculate_trend(record), - "metadata": record.get("metadata", {}) - } - - async def _format_as_simulation_point(self, record: Dict[str, Any], base_message: Dict[str, Any]) -> Dict[str, Any]: - """Format data for simulation purposes""" - return { - **base_message, - "type": "simulation_data", - "simulation_id": f"sim_{record.get('sensor_id', 'unknown')}", - "timestamp": record.get("timestamp", int(datetime.utcnow().timestamp())), - "energy_value": record.get("value", 0), - "unit": record.get("unit", "kWh"), - "scenario": record.get("metadata", {}).get("scenario", "baseline"), - "location": record.get("metadata", {}).get("location", "unknown"), - "data_source": record.get("data_source", "real_community"), - "quality_score": await self._assess_data_quality(record) - } - - async def _format_as_generation_data(self, record: Dict[str, Any], base_message: Dict[str, Any]) -> Dict[str, Any]: - """Format data as energy generation data""" - return { - **base_message, - "type": "generation_data", - "generator_id": record.get("sensor_id", "unknown"), - "timestamp": record.get("timestamp", int(datetime.utcnow().timestamp())), - "generation_value": record.get("value", 0), - "unit": record.get("unit", "kWh"), - "generation_type": record.get("metadata", {}).get("type", "renewable"), - "efficiency": record.get("metadata", {}).get("efficiency", 0.85), - "weather_conditions": record.get("metadata", {}).get("weather") - } - - async def _format_as_event_data(self, record: Dict[str, Any], base_message: Dict[str, Any]) -> Dict[str, Any]: - """Format data as grid event""" - return { - **base_message, - "type": "grid_event", - "event_id": str(uuid.uuid4()), - "timestamp": record.get("timestamp", int(datetime.utcnow().timestamp())), - "event_type": await self._classify_event_type(record), - "severity": await self._assess_event_severity(record), - "affected_area": record.get("metadata", 
{}).get("area", "unknown"), - "value": record.get("value", 0), - "unit": record.get("unit", "kWh"), - "description": f"Energy event detected: {record.get('value', 0)} {record.get('unit', 'kWh')}" - } - - def _infer_sensor_type(self, record: Dict[str, Any]) -> str: - """Infer sensor type from record data""" - value = record.get("value", 0) - unit = record.get("unit", "").lower() - metadata = record.get("metadata", {}) - - if "generation" in str(metadata).lower() or "solar" in str(metadata).lower(): - return "generation" - elif "temperature" in str(metadata).lower() or "temp" in str(metadata).lower(): - return "temperature" - elif "co2" in str(metadata).lower() or "carbon" in str(metadata).lower(): - return "co2" - elif "humidity" in str(metadata).lower(): - return "humidity" - elif "motion" in str(metadata).lower() or "occupancy" in str(metadata).lower(): - return "motion" - else: - return "energy" - - def _infer_metric_type(self, record: Dict[str, Any]) -> str: - """Infer metric type from record""" - unit = record.get("unit", "").lower() - - if "wh" in unit: - return "energy" - elif "w" in unit: - return "power" - elif "°c" in unit or "celsius" in unit or "temp" in unit: - return "temperature" - elif "%" in unit: - return "percentage" - elif "ppm" in unit or "co2" in unit: - return "co2" - else: - return "generic" - - async def _calculate_trend(self, record: Dict[str, Any]) -> str: - """Calculate trend for metric (simplified)""" - # This is a simplified trend calculation - # In a real implementation, you'd compare with historical values - value = record.get("value", 0) - - if value > 100: - return "increasing" - elif value < 50: - return "decreasing" - else: - return "stable" - - async def _assess_data_quality(self, record: Dict[str, Any]) -> float: - """Assess data quality score (0-1)""" - score = 1.0 - - # Check for missing fields - if not record.get("timestamp"): - score -= 0.2 - if not record.get("sensor_id"): - score -= 0.2 - if record.get("value") is None: - score -= 0.3 - if not record.get("unit"): - score -= 0.1 - - # Check for reasonable values - value = record.get("value", 0) - if value < 0: - score -= 0.1 - if value > 10000: # Unusually high energy value - score -= 0.1 - - return max(0.0, score) - - async def _classify_event_type(self, record: Dict[str, Any]) -> str: - """Classify event type based on data""" - value = record.get("value", 0) - - if value > 1000: - return "high_consumption" - elif value < 10: - return "low_consumption" - else: - return "normal_operation" - - async def _assess_event_severity(self, record: Dict[str, Any]) -> str: - """Assess event severity""" - value = record.get("value", 0) - - if value > 5000: - return "critical" - elif value > 1000: - return "warning" - elif value < 5: - return "info" - else: - return "normal" - - async def _update_topic_stats(self, topic: str, count: int): - """Update topic statistics""" - try: - stats_key = f"topic_stats:{topic}" - await self.redis.hincrby(stats_key, "message_count", count) - await self.redis.hset(stats_key, "last_published", datetime.utcnow().isoformat()) - await self.redis.expire(stats_key, 86400) # Expire after 24 hours - - except Exception as e: - logger.error(f"Error updating topic stats: {e}") - - async def _get_last_published_time(self, topic: str) -> Optional[str]: - """Get last published time for a topic""" - try: - stats_key = f"topic_stats:{topic}" - return await self.redis.hget(stats_key, "last_published") - except Exception as e: - logger.debug(f"Error getting last published time for {topic}: {e}") 
- return None - - async def create_data_stream(self, topic: str, data_stream: List[Dict[str, Any]], - interval_seconds: float = 1.0): - """Create a continuous data stream by publishing data at intervals""" - try: - logger.info(f"Starting data stream for topic {topic} with {len(data_stream)} points") - - for i, data_point in enumerate(data_stream): - await self.publish_single_message(topic, data_point) - - # Add stream metadata - stream_info = { - "type": "stream_info", - "topic": topic, - "current_point": i + 1, - "total_points": len(data_stream), - "progress": (i + 1) / len(data_stream) * 100, - "timestamp": datetime.utcnow().isoformat() - } - - await self.publish_single_message(f"{topic}_stream_info", stream_info) - - # Wait before next data point - if i < len(data_stream) - 1: - await asyncio.sleep(interval_seconds) - - logger.info(f"Completed data stream for topic {topic}") - - except Exception as e: - logger.error(f"Error creating data stream: {e}") - raise - - async def cleanup_old_stats(self, days: int = 7): - """Clean up old topic statistics""" - try: - # Get all topic stat keys - pattern = "topic_stats:*" - keys = [] - - async for key in self.redis.scan_iter(match=pattern): - keys.append(key) - - # Delete old keys (Redis TTL should handle this, but cleanup anyway) - if keys: - await self.redis.delete(*keys) - logger.info(f"Cleaned up {len(keys)} old topic stat keys") - - except Exception as e: - logger.error(f"Error cleaning up old stats: {e}") \ No newline at end of file diff --git a/microservices/data-ingestion-service/src/simple_sa4cps_config.py b/microservices/data-ingestion-service/src/simple_sa4cps_config.py deleted file mode 100644 index 7b448dc..0000000 --- a/microservices/data-ingestion-service/src/simple_sa4cps_config.py +++ /dev/null @@ -1,177 +0,0 @@ -""" -Simplified SA4CPS Configuration -Auto-configures for ftp.sa4cps.pt with .slg_v2 files only -""" - -import asyncio -import logging -from datetime import datetime -from typing import Dict, Any -from database import get_database - -logger = logging.getLogger(__name__) - -class SimpleSA4CPSConfig: - """Simplified SA4CPS configuration for .slg_v2 files only""" - - def __init__(self): - self.ftp_host = "ftp.sa4cps.pt" - self.source_name = "SA4CPS Smart Grid Data" - - async def setup_sa4cps_source(self, username: str = "curvascarga@sa4cps.pt", - password: str = "n$WFtz9+bleN", - remote_path: str = "/") -> Dict[str, Any]: - """Create the SA4CPS data source""" - try: - db = await get_database() - - # Check if already exists - existing = await db.data_sources.find_one({"name": self.source_name}) - if existing: - logger.info("SA4CPS source already configured") - return { - "success": True, - "message": "Already configured", - "source_id": str(existing["_id"]) - } - - # Create simplified SA4CPS data source - source_doc = { - "name": self.source_name, - "description": "SA4CPS Smart Grid .slg_v2 data from ftp.sa4cps.pt", - "source_type": "ftp", - "ftp_config": { - "host": self.ftp_host, - "port": 21, - "username": username, - "password": password, - "remote_path": remote_path, - "use_ssl": False, - "passive_mode": True, - "timeout": 30 - }, - "file_patterns": ["*.slg_v2"], - "data_format": "slg_v2", - "redis_topics": ["sa4cps_energy_data", "sa4cps_raw_data"], - "enabled": True, - "check_interval_seconds": 300, # 5 minutes - "created_at": datetime.utcnow(), - "updated_at": datetime.utcnow(), - "status": "configured" - } - - result = await db.data_sources.insert_one(source_doc) - source_id = str(result.inserted_id) - - 
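            # Editorial sketch (an assumption, not part of the original file): the
            # "configure once" behaviour above, implemented as find_one followed by
            # insert_one, could also be expressed as a single idempotent upsert:
            #
            #   await db.data_sources.update_one(
            #       {"name": self.source_name},
            #       {"$setOnInsert": source_doc},
            #       upsert=True,
            #   )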
logger.info(f"✅ SA4CPS source configured: {source_id}") - - return { - "success": True, - "message": "SA4CPS source configured successfully", - "source_id": source_id, - "ftp_host": self.ftp_host, - "file_pattern": "*.slg_v2", - "topics": ["sa4cps_energy_data", "sa4cps_raw_data"] - } - - except Exception as e: - logger.error(f"❌ Failed to configure SA4CPS source: {e}") - return { - "success": False, - "message": f"Configuration failed: {str(e)}" - } - - async def test_connection(self) -> Dict[str, Any]: - """Test SA4CPS FTP connection""" - try: - from ftp_monitor import FTPMonitor - from database import get_redis - - db = await get_database() - redis = await get_redis() - - source = await db.data_sources.find_one({"name": self.source_name}) - if not source: - return {"success": False, "message": "SA4CPS source not configured"} - - monitor = FTPMonitor(db, redis) - connection_test = await monitor.test_connection(source) - - if connection_test: - files = await monitor.check_for_new_files(source) - return { - "success": True, - "message": f"✅ Connected to {self.ftp_host}", - "files_found": len(files), - "sample_files": [f["filename"] for f in files[:5]] - } - else: - return { - "success": False, - "message": f"❌ Cannot connect to {self.ftp_host}" - } - - except Exception as e: - logger.error(f"Connection test failed: {e}") - return { - "success": False, - "message": f"Connection test error: {str(e)}" - } - - async def get_status(self) -> Dict[str, Any]: - """Get SA4CPS source status""" - try: - db = await get_database() - source = await db.data_sources.find_one({"name": self.source_name}) - - if not source: - return {"configured": False, "message": "Not configured"} - - # Get processing stats - processed_count = await db.processed_files.count_documents({"source_id": source["_id"]}) - - return { - "configured": True, - "source_id": str(source["_id"]), - "name": source["name"], - "enabled": source.get("enabled", False), - "ftp_host": self.ftp_host, - "last_check": source.get("last_check").isoformat() if source.get("last_check") else None, - "files_processed": processed_count, - "status": "✅ Ready for .slg_v2 files" - } - - except Exception as e: - return {"configured": False, "error": str(e)} - -async def quick_setup(): - """Quick setup for SA4CPS""" - print("🚀 Setting up SA4CPS .slg_v2 data ingestion...") - - config = SimpleSA4CPSConfig() - - # Setup source - result = await config.setup_sa4cps_source() - print(f"Setup: {result['message']}") - - if result['success']: - # Test connection - test_result = await config.test_connection() - print(f"Connection: {test_result['message']}") - - if test_result['success']: - print(f"📁 Found {test_result.get('files_found', 0)} .slg_v2 files") - - # Show status - status = await config.get_status() - print(f"Status: {status.get('status', 'Unknown')}") - - print("\n✅ SA4CPS setup complete!") - print("📊 Data will be published to Redis topics:") - print(" • sa4cps_energy_data (processed sensor readings)") - print(" • sa4cps_raw_data (raw .slg_v2 content)") - else: - print("❌ Setup failed. 
Check configuration and try again.") - -if __name__ == "__main__": - asyncio.run(quick_setup()) \ No newline at end of file diff --git a/microservices/data-ingestion-service/src/slg_v2_processor.py b/microservices/data-ingestion-service/src/slg_v2_processor.py deleted file mode 100644 index 2a6f44b..0000000 --- a/microservices/data-ingestion-service/src/slg_v2_processor.py +++ /dev/null @@ -1,300 +0,0 @@ -""" -Simplified SA4CPS .slg_v2 file processor -Focused exclusively on processing .slg_v2 files from ftp.sa4cps.pt -""" - -import logging -from datetime import datetime, timedelta -from typing import List, Dict, Any, Optional -import re - -logger = logging.getLogger(__name__) - -class SLGv2Processor: - """Simplified processor for SA4CPS .slg_v2 files only""" - - def __init__(self, db, redis_client): - self.db = db - self.redis = redis_client - - async def process_slg_v2_file(self, file_content: bytes) -> List[Dict[str, Any]]: - """Process a .slg_v2 file and return standardized sensor readings""" - try: - # Decode file content - try: - text_content = file_content.decode('utf-8') - except UnicodeDecodeError: - text_content = file_content.decode('latin1', errors='ignore') - - logger.info(f"Processing SLG_V2 file ({len(file_content)} bytes)") - - lines = text_content.strip().split('\n') - if not lines: - logger.warning("SLG_V2 file is empty") - return [] - - processed_data = [] - header = None - metadata = {} - - for line_idx, line in enumerate(lines): - line = line.strip() - - if not line: - continue - - # Extract metadata from comment lines - if line.startswith('#') or line.startswith('//'): - comment = line[1:].strip() if line.startswith('#') else line[2:].strip() - if ':' in comment: - key, value = comment.split(':', 1) - metadata[key.strip()] = value.strip() - continue - - # Detect header line - if header is None and self._is_header_line(line): - header = self._parse_header(line) - continue - - # Process data lines - try: - processed_row = self._process_data_line(line, header, metadata, line_idx) - if processed_row: - processed_data.append(processed_row) - except Exception as e: - logger.warning(f"Error processing SLG_V2 line {line_idx}: {e}") - continue - - logger.info(f"Successfully processed {len(processed_data)} SLG_V2 records") - return processed_data - - except Exception as e: - logger.error(f"Error processing SLG_V2 file: {e}") - raise - - def _is_header_line(self, line: str) -> bool: - """Check if line appears to be a header""" - # Common SA4CPS header patterns - header_keywords = ['timestamp', 'time', 'date', 'sensor', 'id', 'energy', 'power', 'voltage', 'current'] - line_lower = line.lower() - - has_keywords = any(keyword in line_lower for keyword in header_keywords) - - # Check if most parts are non-numeric (likely header) - parts = re.split(r'[,;\t\s]+', line) - numeric_parts = 0 - for part in parts: - try: - float(part.strip()) - numeric_parts += 1 - except ValueError: - continue - - return has_keywords and (numeric_parts < len(parts) / 2) - - def _parse_header(self, line: str) -> List[str]: - """Parse header line and return column names""" - # Try different delimiters - for delimiter in [',', ';', '\t']: - if delimiter in line: - parts = [part.strip() for part in line.split(delimiter) if part.strip()] - if len(parts) > 1: - return parts - - # Default to whitespace splitting - return [part.strip() for part in line.split() if part.strip()] - - def _process_data_line(self, line: str, header: Optional[List[str]], - metadata: Dict[str, Any], line_idx: int) -> Optional[Dict[str, 
Any]]: - """Process a single data line into a sensor reading""" - try: - # Parse line into parts - parts = self._parse_line_parts(line) - if not parts: - return None - - # Map parts to columns - if header and len(parts) >= len(header): - row_dict = dict(zip(header, parts[:len(header)])) - else: - row_dict = {f"col_{i}": part for i, part in enumerate(parts)} - - # Extract core sensor reading fields - processed_row = { - 'timestamp': self._extract_timestamp(row_dict, line_idx), - 'sensor_id': self._extract_sensor_id(row_dict, line_idx), - 'value': self._extract_primary_value(row_dict), - 'unit': self._infer_unit(row_dict), - 'metadata': { - **metadata, # File-level metadata - **row_dict, # All row data - 'line_number': line_idx, - 'raw_line': line - }, - 'processed_at': datetime.utcnow().isoformat(), - 'data_source': 'sa4cps_slg_v2', - 'file_format': 'SLG_V2' - } - - # Extract additional numeric values - additional_values = self._extract_additional_values(row_dict) - if additional_values: - processed_row['additional_values'] = additional_values - - return processed_row - - except Exception as e: - logger.error(f"Error processing data line {line_idx}: {e}") - return None - - def _parse_line_parts(self, line: str) -> List[str]: - """Parse line into parts using appropriate delimiter""" - for delimiter in [',', ';', '\t']: - if delimiter in line: - parts = [part.strip() for part in line.split(delimiter) if part.strip()] - if len(parts) > 1: - return parts - - # Fallback to whitespace - return [part.strip() for part in line.split() if part.strip()] - - def _extract_timestamp(self, row_dict: Dict[str, str], line_idx: int) -> int: - """Extract timestamp from row data""" - # Look for timestamp columns - for key, val in row_dict.items(): - if any(ts_word in key.lower() for ts_word in ['time', 'date', 'timestamp', 'ts']): - timestamp = self._parse_timestamp(val) - if timestamp: - return int(timestamp.timestamp()) - - # Use current time with line offset if no timestamp found - return int((datetime.utcnow() + timedelta(seconds=line_idx)).timestamp()) - - def _extract_sensor_id(self, row_dict: Dict[str, str], line_idx: int) -> str: - """Extract sensor ID from row data""" - for key, val in row_dict.items(): - if any(id_word in key.lower() for id_word in ['sensor', 'device', 'meter', 'id']): - return str(val).strip() - - return f"sa4cps_sensor_{line_idx}" - - def _extract_primary_value(self, row_dict: Dict[str, str]) -> Optional[float]: - """Extract the primary numeric value (typically energy)""" - # Priority order for SA4CPS data - priority_keys = ['energy', 'consumption', 'kwh', 'power', 'watt', 'value'] - - # First, try priority keys - for priority_key in priority_keys: - for key, val in row_dict.items(): - if priority_key in key.lower(): - numeric_val = self._parse_numeric(val) - if numeric_val is not None: - return numeric_val - - # Fallback: first numeric value found - for key, val in row_dict.items(): - if not any(skip_word in key.lower() for skip_word in ['time', 'date', 'id', 'sensor', 'device']): - numeric_val = self._parse_numeric(val) - if numeric_val is not None: - return numeric_val - - return None - - def _extract_additional_values(self, row_dict: Dict[str, str]) -> Dict[str, Dict[str, Any]]: - """Extract additional numeric values beyond the primary one""" - additional = {} - - for key, val in row_dict.items(): - if any(skip_word in key.lower() for skip_word in ['time', 'date', 'id', 'sensor', 'device']): - continue - - numeric_val = self._parse_numeric(val) - if numeric_val is not None: - 
additional[key] = { - 'value': numeric_val, - 'unit': self._infer_unit_from_key(key, numeric_val) - } - - return additional - - def _infer_unit(self, row_dict: Dict[str, str]) -> str: - """Infer unit from column names and values""" - for key in row_dict.keys(): - unit = self._infer_unit_from_key(key, 0) - if unit != "unknown": - return unit - return "kWh" # Default for SA4CPS energy data - - def _infer_unit_from_key(self, key: str, value: float) -> str: - """Infer unit based on column name""" - key_lower = key.lower() - - if any(word in key_lower for word in ['energy', 'kwh', 'consumption']): - return "kWh" - elif any(word in key_lower for word in ['power', 'watt', 'w']): - return "W" - elif any(word in key_lower for word in ['voltage', 'volt', 'v']): - return "V" - elif any(word in key_lower for word in ['current', 'amp', 'a']): - return "A" - elif any(word in key_lower for word in ['temp', 'temperature']): - return "°C" - elif any(word in key_lower for word in ['freq', 'frequency']): - return "Hz" - elif any(word in key_lower for word in ['percent', '%']): - return "%" - else: - return "unknown" - - def _parse_timestamp(self, timestamp_str: str) -> Optional[datetime]: - """Parse timestamp from string""" - try: - # Common SA4CPS timestamp formats - formats = [ - "%Y-%m-%d %H:%M:%S", - "%Y-%m-%dT%H:%M:%S", - "%Y-%m-%dT%H:%M:%SZ", - "%d/%m/%Y %H:%M:%S", - "%Y/%m/%d %H:%M:%S" - ] - - for fmt in formats: - try: - return datetime.strptime(timestamp_str.strip(), fmt) - except ValueError: - continue - - # Try parsing as unix timestamp - try: - timestamp_num = float(timestamp_str) - if timestamp_num > 1e10: # Milliseconds - timestamp_num = timestamp_num / 1000 - return datetime.fromtimestamp(timestamp_num) - except: - pass - - return None - - except Exception as e: - logger.debug(f"Error parsing timestamp '{timestamp_str}': {e}") - return None - - def _parse_numeric(self, value_str: str) -> Optional[float]: - """Parse numeric value from string""" - try: - # Clean the string of non-numeric characters (except decimal point and minus) - cleaned = re.sub(r'[^\d.-]', '', value_str.strip()) - if cleaned: - return float(cleaned) - return None - except Exception: - return None - - async def get_processing_stats(self) -> Dict[str, Any]: - """Get processing statistics""" - return { - "supported_formats": ["slg_v2"], - "format_description": "SA4CPS Smart Grid Data Format v2", - "specializations": ["energy_monitoring", "smart_grid", "sensor_telemetry"], - "last_updated": datetime.utcnow().isoformat() - } \ No newline at end of file diff --git a/microservices/data-ingestion-service/tests/test_simple_processor.py b/microservices/data-ingestion-service/tests/test_simple_processor.py deleted file mode 100644 index 929fa61..0000000 --- a/microservices/data-ingestion-service/tests/test_simple_processor.py +++ /dev/null @@ -1,103 +0,0 @@ -#!/usr/bin/env python3 -""" -Simple test for the streamlined SA4CPS .slg_v2 processor -""" - -import asyncio -import json -import sys -from pathlib import Path - -# Add src directory to path -sys.path.append(str(Path(__file__).parent.parent / "src")) -from slg_v2_processor import SLGv2Processor - -# Sample SA4CPS .slg_v2 test data -SAMPLE_SLG_V2_DATA = """# SA4CPS Smart Grid Data Export -# Location: Research Building A -# System: Energy Monitoring v2.1 -# Date: 2024-01-15 -timestamp,sensor_id,energy_kwh,power_w,voltage_v,current_a -2024-01-15T10:00:00,GRID_A_001,1234.5,850.2,230.1,3.7 -2024-01-15T10:01:00,GRID_A_001,1235.1,865.3,229.8,3.8 
-2024-01-15T10:02:00,GRID_A_002,987.3,654.2,228.9,2.9 -2024-01-15T10:03:00,GRID_A_002,988.1,661.5,229.2,2.9 -""" - -SPACE_DELIMITED_DATA = """# Smart Building Energy Data -# Building: Laboratory Complex -2024-01-15T10:00:00 LAB_SENSOR_01 1500.23 750.5 240.1 -2024-01-15T10:01:00 LAB_SENSOR_01 1501.85 780.2 239.8 -2024-01-15T10:02:00 LAB_SENSOR_02 890.45 420.8 241.2 -""" - -class MockProcessor(SLGv2Processor): - def __init__(self): - # Mock without database dependencies - pass - -async def test_slg_v2_processing(): - """Test the simplified .slg_v2 processor""" - print("🧪 Testing Simplified SA4CPS .slg_v2 Processor") - print("=" * 50) - - processor = MockProcessor() - - # Test 1: CSV-style .slg_v2 - print("\n📋 Test 1: CSV-style SA4CPS data") - try: - result1 = await processor.process_slg_v2_file(SAMPLE_SLG_V2_DATA.encode('utf-8')) - print(f"✅ Processed {len(result1)} records") - - if result1: - sample = result1[0] - print("📄 Sample record:") - print(f" Sensor: {sample['sensor_id']}") - print(f" Timestamp: {sample['timestamp']}") - print(f" Value: {sample['value']} {sample['unit']}") - print(f" Additional values: {len(sample.get('additional_values', {}))}") - - except Exception as e: - print(f"❌ Test 1 failed: {e}") - - # Test 2: Space-delimited data - print("\n📋 Test 2: Space-delimited SA4CPS data") - try: - result2 = await processor.process_slg_v2_file(SPACE_DELIMITED_DATA.encode('utf-8')) - print(f"✅ Processed {len(result2)} records") - - if result2: - sample = result2[0] - print("📄 Sample record:") - print(f" Sensor: {sample['sensor_id']}") - print(f" Value: {sample['value']} {sample['unit']}") - print(f" Metadata keys: {len(sample.get('metadata', {}))}") - - except Exception as e: - print(f"❌ Test 2 failed: {e}") - - # Test 3: Processing stats - print("\n📊 Test 3: Processing statistics") - try: - stats = await processor.get_processing_stats() - print("✅ Processor statistics:") - print(f" Supported formats: {stats['supported_formats']}") - print(f" Description: {stats['format_description']}") - print(f" Specializations: {', '.join(stats['specializations'])}") - - except Exception as e: - print(f"❌ Test 3 failed: {e}") - - print("\n🎉 Testing complete!") - print("\n📈 Benefits of simplified processor:") - print(" • 70% less code complexity") - print(" • Focused only on SA4CPS .slg_v2 format") - print(" • Optimized for energy monitoring data") - print(" • Faster processing and easier maintenance") - print("\n🔗 Integration:") - print(" • Auto-connects to ftp.sa4cps.pt") - print(" • Processes *.slg_v2 files automatically") - print(" • Publishes to sa4cps_energy_data Redis topic") - -if __name__ == "__main__": - asyncio.run(test_slg_v2_processing()) \ No newline at end of file diff --git a/microservices/data-ingestion-service/tests/verify_setup.py b/microservices/data-ingestion-service/tests/verify_setup.py deleted file mode 100644 index d83ba5b..0000000 --- a/microservices/data-ingestion-service/tests/verify_setup.py +++ /dev/null @@ -1,197 +0,0 @@ -#!/usr/bin/env python3 -""" -Verification script for simplified SA4CPS data ingestion service -Checks all components without requiring database connections -""" - -import os -import sys -from pathlib import Path - -def check_file_exists(filepath, description): - """Check if a file exists and report status""" - if Path(filepath).exists(): - print(f"✅ {description}: {filepath}") - return True - else: - print(f"❌ MISSING {description}: {filepath}") - return False - -def check_directory_structure(): - """Verify all required files are present""" - 
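    # Editorial note (a sketch under assumptions, not part of the original file):
    # the checks below use paths relative to the current working directory, so the
    # script expects to be run from the data-ingestion-service root. Anchoring the
    # paths to that root regardless of where the script is invoked could look like:
    #
    #   service_root = Path(__file__).resolve().parent.parent
    #   os.chdir(service_root)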
print("📁 Checking SA4CPS Data Ingestion Service Structure") - print("=" * 55) - - src_files = [ - ("src/main.py", "FastAPI main application"), - ("src/models.py", "Pydantic data models"), - ("src/database.py", "Database connection manager"), - ("src/slg_v2_processor.py", "SA4CPS .slg_v2 file processor"), - ("src/simple_sa4cps_config.py", "Simplified SA4CPS configuration"), - ("src/ftp_monitor.py", "FTP monitoring service"), - ("src/redis_publisher.py", "Redis message publisher"), - ("src/data_validator.py", "Data validation utilities"), - ("src/monitoring.py", "Service monitoring components") - ] - - test_files = [ - ("tests/test_simple_processor.py", "Processor test suite"), - ("tests/verify_setup.py", "Setup verification script") - ] - - config_files = [ - ("requirements.txt", "Python dependencies"), - ("Dockerfile", "Docker container configuration") - ] - - files_to_check = src_files + test_files + config_files - - all_present = True - for filename, description in files_to_check: - if not check_file_exists(filename, description): - all_present = False - - return all_present - -def check_configuration(): - """Verify SA4CPS configuration""" - print(f"\n🔧 Checking SA4CPS Configuration") - print("-" * 35) - - # Check if simple_sa4cps_config.py has correct settings - try: - with open("src/simple_sa4cps_config.py", "r") as f: - content = f.read() - - if "ftp.sa4cps.pt" in content: - print("✅ FTP host configured: ftp.sa4cps.pt") - else: - print("❌ FTP host not found in config") - - if "curvascarga@sa4cps.pt" in content: - print("✅ FTP username configured") - else: - print("❌ FTP username not found") - - if ".slg_v2" in content: - print("✅ SLG_V2 file format configured") - else: - print("❌ SLG_V2 format not configured") - - if "sa4cps_energy_data" in content: - print("✅ Redis topics configured") - else: - print("❌ Redis topics not configured") - - return True - except Exception as e: - print(f"❌ Error reading config: {e}") - return False - -def check_processor(): - """Verify processor functionality""" - print(f"\n⚙️ Checking SLG_V2 Processor") - print("-" * 30) - - try: - # Import without database dependencies - sys.path.append('.') - - # Check if processor can be imported - print("✅ SLGv2Processor class available") - - # Check test file - if Path("tests/test_simple_processor.py").exists(): - with open("tests/test_simple_processor.py", "r") as f: - test_content = f.read() - - if "CSV-style SA4CPS data" in test_content: - print("✅ CSV format test available") - if "Space-delimited SA4CPS data" in test_content: - print("✅ Space-delimited format test available") - if "Processing statistics" in test_content: - print("✅ Statistics test available") - - return True - except Exception as e: - print(f"❌ Processor check failed: {e}") - return False - -def check_docker_setup(): - """Verify Docker configuration""" - print(f"\n🐳 Checking Docker Configuration") - print("-" * 35) - - # Check Dockerfile - if Path("Dockerfile").exists(): - with open("Dockerfile", "r") as f: - dockerfile_content = f.read() - - if "python:3.9-slim" in dockerfile_content: - print("✅ Python 3.9 base image") - if "requirements.txt" in dockerfile_content: - print("✅ Dependencies installation configured") - if "8008" in dockerfile_content: - print("✅ Port 8008 exposed") - if "uvicorn" in dockerfile_content: - print("✅ ASGI server configured") - else: - print("❌ Dockerfile missing") - return False - - # Check requirements.txt - if Path("requirements.txt").exists(): - with open("requirements.txt", "r") as f: - requirements = f.read() - - 
required_deps = ["fastapi", "motor", "redis", "ftputil", "pandas"] - for dep in required_deps: - if dep in requirements: - print(f"✅ {dep} dependency listed") - else: - print(f"❌ {dep} dependency missing") - - return True - -def generate_summary(): - """Generate setup summary""" - print(f"\n📊 SA4CPS Service Summary") - print("=" * 30) - print("🎯 Purpose: Monitor ftp.sa4cps.pt for .slg_v2 files") - print("📁 File Format: SA4CPS Smart Grid Data (.slg_v2)") - print("🌐 FTP Server: ftp.sa4cps.pt") - print("👤 Username: curvascarga@sa4cps.pt") - print("🔄 Processing: Real-time sensor data extraction") - print("📤 Output: Redis topics (sa4cps_energy_data, sa4cps_raw_data)") - print("🐳 Deployment: Docker container on port 8008") - - print(f"\n🚀 Next Steps:") - print("1. Run: docker-compose up data-ingestion-service") - print("2. Test: python test_simple_processor.py") - print("3. Configure: python simple_sa4cps_config.py") - print("4. Monitor: Check /health endpoint") - -def main(): - """Main verification function""" - print("🔍 SA4CPS Data Ingestion Service Verification") - print("=" * 50) - - # Run all checks - structure_ok = check_directory_structure() - config_ok = check_configuration() - processor_ok = check_processor() - docker_ok = check_docker_setup() - - # Final status - print(f"\n{'='*50}") - if all([structure_ok, config_ok, processor_ok, docker_ok]): - print("🎉 SA4CPS Data Ingestion Service: READY FOR DEPLOYMENT") - print("✅ All components verified successfully") - else: - print("⚠️ SA4CPS Data Ingestion Service: ISSUES FOUND") - print("❌ Please fix the issues above before deployment") - - generate_summary() - -if __name__ == "__main__": - main() \ No newline at end of file