Data ingestion service tests
40  microservices/data-ingestion-service/src/config.py  Normal file
@@ -0,0 +1,40 @@
#!/usr/bin/env python3
"""
Configuration for SA4CPS Data Ingestion Service
Simple configuration management for FTP and MongoDB connections
"""

import os
from typing import Dict, Any

# FTP Configuration for SA4CPS server
FTP_CONFIG: Dict[str, Any] = {
    "host": os.getenv("SA4CPS_FTP_HOST", "ftp.sa4cps.pt"),
    "username": os.getenv("SA4CPS_FTP_USER", "curvascarga@sa4cps.pt"),
    "password": os.getenv("SA4CPS_FTP_PASS", ""),  # Set via environment variable
    "base_path": os.getenv("SA4CPS_FTP_PATH", "/SLGs/Faial/PT0010000000015181AA/"),
    "check_interval": int(os.getenv("SA4CPS_CHECK_INTERVAL", "21600"))  # 6 hours default
}

# MongoDB Configuration
MONGO_CONFIG: Dict[str, Any] = {
    "connection_string": os.getenv(
        "MONGODB_URL",
        "mongodb://admin:admin@localhost:27018/sa4cps_energy?authSource=admin"
    ),
    "database_name": os.getenv("MONGODB_DATABASE", "sa4cps_energy")
}

# Logging Configuration
LOGGING_CONFIG: Dict[str, Any] = {
    "level": os.getenv("LOG_LEVEL", "INFO"),
    "format": "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
}

# Service Configuration
SERVICE_CONFIG: Dict[str, Any] = {
    "name": "SA4CPS Data Ingestion Service",
    "version": "1.0.0",
    "port": int(os.getenv("SERVICE_PORT", "8008")),
    "host": os.getenv("SERVICE_HOST", "0.0.0.0")
}
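For illustration only (not part of this commit): ftp_monitor.py and database.py are not shown in this diff, so the following is a minimal sketch of how FTP_CONFIG and MONGO_CONFIG might be consumed, assuming the standard-library ftplib client and the pymongo driver.

# Sketch only: illustrates consuming the config dictionaries defined above;
# the actual ftp_monitor.py / database.py implementations are not in this diff.
from ftplib import FTP

from pymongo import MongoClient

from config import FTP_CONFIG, MONGO_CONFIG

# List files in the monitored SA4CPS directory
ftp = FTP(FTP_CONFIG["host"])
ftp.login(FTP_CONFIG["username"], FTP_CONFIG["password"])
ftp.cwd(FTP_CONFIG["base_path"])
print(ftp.nlst())
ftp.quit()

# Connect to the target database
client = MongoClient(MONGO_CONFIG["connection_string"])
db = client[MONGO_CONFIG["database_name"]]
print(db.list_collection_names())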
171  microservices/data-ingestion-service/src/slg_processor.py  Normal file
@@ -0,0 +1,171 @@
#!/usr/bin/env python3
"""
SA4CPS SLG_V2 File Processor
Simple parser for .slg_v2 energy data files
"""

import logging
from datetime import datetime, timedelta
from typing import List, Dict, Any, Optional

logger = logging.getLogger(__name__)

class SLGProcessor:
    """Processes SA4CPS .slg_v2 files into structured energy data records"""

    def __init__(self):
        self.processed_files = 0
        self.total_records = 0

    async def process_file(self, file_path: str, filename: str) -> List[Dict[str, Any]]:
        """Process a .slg_v2 file and return energy data records"""
        logger.info(f"Processing SLG file: {filename}")

        try:
            with open(file_path, 'r', encoding='utf-8') as file:
                lines = file.readlines()

            records = []
            file_metadata = self._parse_metadata(lines[:5])  # Metadata lives in the first 5 lines

            # Process data lines (lines starting with '20' are data records)
            for line in lines:
                line = line.strip()

                if line.startswith('20'):  # Data record lines start with '20' (year)
                    record = self._parse_data_line(line, file_metadata, filename)
                    if record:
                        records.append(record)

            self.processed_files += 1
            self.total_records += len(records)

            logger.info(f"Processed {len(records)} records from {filename}")
            return records

        except Exception as e:
            logger.error(f"Error processing {filename}: {e}")
            return []

    def _parse_metadata(self, header_lines: List[str]) -> Dict[str, Any]:
        """Parse metadata from SLG file header"""
        metadata = {
            "meter_id": None,
            "measurement_type": None,
            "unit": None,
            "interval": None,
            "period_start": None,
            "period_end": None
        }

        try:
            for line in header_lines:
                line = line.strip()

                if line.startswith('00'):  # Header line with meter info
                    parts = line.split('\t')
                    if len(parts) >= 12:
                        metadata["meter_id"] = parts[3]  # Meter ID
                        metadata["period_start"] = self._parse_date(parts[6])
                        metadata["period_end"] = self._parse_date(parts[7])

                elif line.startswith('01'):  # Measurement configuration
                    parts = line.split('\t')
                    if len(parts) >= 10:
                        metadata["measurement_type"] = parts[4]  # e.g. POTENCIA
                        metadata["unit"] = parts[5]  # e.g. K (kW)
                        metadata["interval"] = parts[6]  # e.g. 15M

        except Exception as e:
            logger.warning(f"Error parsing metadata: {e}")

        return metadata

    def _parse_data_line(self, line: str, metadata: Dict[str, Any], filename: str) -> Optional[Dict[str, Any]]:
        """Parse a data line into an energy record"""
        try:
            parts = line.split('\t')

            if len(parts) < 4:
                return None

            # Parse timestamp (format: 20250201 0015)
            date_part = parts[1]  # 20250201
            time_part = parts[2]  # 0015

            # Convert to datetime
            timestamp = self._parse_timestamp(date_part, time_part)
            if not timestamp:
                return None

            # Parse energy value: values carry three decimal places, so stripping
            # the '.' separator and dividing by 1000 restores the magnitude
            value_str = parts[3].replace('.', '')
            try:
                value = float(value_str) / 1000.0
            except ValueError:
                value = 0.0

            # Create record
            record = {
                "timestamp": timestamp,
                "meter_id": metadata.get("meter_id", "unknown"),
                "measurement_type": metadata.get("measurement_type", "energy"),
                "value": value,
                "unit": metadata.get("unit", "kW"),
                "interval": metadata.get("interval", "15M"),
                "filename": filename,
                "quality": int(parts[4]) if len(parts) > 4 else 0
            }

            return record

        except Exception as e:
            logger.warning(f"Error parsing data line '{line}': {e}")
            return None

    def _parse_date(self, date_str: str) -> Optional[datetime]:
        """Parse a date string (YYYYMMDD format)"""
        try:
            if len(date_str) == 8 and date_str.isdigit():
                year = int(date_str[:4])
                month = int(date_str[4:6])
                day = int(date_str[6:8])
                return datetime(year, month, day)
        except ValueError:
            pass
        return None

    def _parse_timestamp(self, date_str: str, time_str: str) -> Optional[datetime]:
        """Parse a timestamp from date and time strings"""
        try:
            # Parse date (YYYYMMDD)
            if len(date_str) != 8 or not date_str.isdigit():
                return None

            year = int(date_str[:4])
            month = int(date_str[4:6])
            day = int(date_str[6:8])

            # Parse time (HHMM)
            if len(time_str) != 4 or not time_str.isdigit():
                return None

            hour = int(time_str[:2])
            minute = int(time_str[2:4])

            # '2400' marks end of day; roll it over to midnight of the next day
            if hour == 24:
                return datetime(year, month, day, 0, minute) + timedelta(days=1)

            return datetime(year, month, day, hour, minute)

        except ValueError as e:
            logger.warning(f"Error parsing timestamp '{date_str} {time_str}': {e}")
            return None

    def get_stats(self) -> Dict[str, int]:
        """Get processing statistics"""
        return {
            "files_processed": self.processed_files,
            "total_records": self.total_records
        }
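For illustration (not part of this commit): a minimal round-trip sketch of the processor. The field layout below is inferred from the parser itself — a tab-separated '00' header, '01' measurement line, and a '20…' data record — not from an official SLG_V2 spec, and the sample values are made up.

# Sketch only: run from the src directory so slg_processor is importable.
import asyncio
import os
import tempfile

from slg_processor import SLGProcessor

# Synthetic sample assembled to satisfy the parser's expectations above
SAMPLE = "\n".join([
    # '00' header: meter id at index 3, period start/end at indices 6 and 7
    "\t".join(["00", "-", "-", "PT0010000000015181AA", "-", "-",
               "20250201", "20250228", "-", "-", "-", "-"]),
    # '01' measurement line: type, unit, interval at indices 4-6
    "\t".join(["01", "-", "-", "-", "POTENCIA", "K", "15M", "-", "-", "-"]),
    # data record: date, time, value, quality at indices 1-4
    "\t".join(["20250201", "20250201", "0015", "0.123", "0"]),
])

async def demo():
    with tempfile.NamedTemporaryFile("w", suffix=".slg_v2", delete=False) as tmp:
        tmp.write(SAMPLE)
    records = await SLGProcessor().process_file(tmp.name, "sample.slg_v2")
    os.unlink(tmp.name)
    print(records)  # expect one record: value 0.123 at 2025-02-01 00:15

asyncio.run(demo())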
File diff suppressed because it is too large
1  microservices/data-ingestion-service/tests/path.txt  Normal file
@@ -0,0 +1 @@
ftp.sa4cps.pt/SLGs/Faial/PT0010000000015181AA/2025_02/
104  microservices/data-ingestion-service/tests/test_slg_processor.py  Normal file
@@ -0,0 +1,104 @@
#!/usr/bin/env python3
"""
Test SLG Processor with sample SA4CPS data
Simple test to validate .slg_v2 file processing
"""

import asyncio
import sys
from pathlib import Path

# Add src directory to path
sys.path.append(str(Path(__file__).parent.parent / "src"))

from slg_processor import SLGProcessor


async def test_sample_file():
    """Test processing the sample .slg_v2 file"""
    print("🧪 Testing SLG Processor with sample file")
    print("=" * 45)

    # Path to sample file
    sample_file = Path(__file__).parent / "ORDCELPE_20250907_8947167363.sgl_v2"

    if not sample_file.exists():
        print(f"❌ Sample file not found: {sample_file}")
        return

    processor = SLGProcessor()

    try:
        # Process the file
        records = await processor.process_file(str(sample_file), sample_file.name)

        print("✅ File processed successfully")
        print(f"📊 Records extracted: {len(records)}")

        if records:
            # Show first few records
            print("\n📋 Sample Records:")
            for i, record in enumerate(records[:3]):
                print(f"  {i+1}. {record['timestamp']} - {record['value']} {record['unit']} (Meter: {record['meter_id']})")

            if len(records) > 3:
                print(f"  ... and {len(records) - 3} more records")

            # Show statistics (readings are power values, so this is a plain sum, not energy)
            total_power = sum(r['value'] for r in records)
            print("\n📈 Statistics:")
            print(f"  Sum of readings: {total_power:.3f} kW")
            print(f"  Average: {total_power/len(records):.3f} kW")
            print(f"  Time Range: {records[0]['timestamp']} to {records[-1]['timestamp']}")

        else:
            print("⚠️ No records extracted from file")

    except Exception as e:
        print(f"❌ Error processing file: {e}")
        import traceback
        traceback.print_exc()

def validate_environment():
    """Validate the test environment"""
    print("🔧 Validating Test Environment")
    print("-" * 35)

    # Check if the sample file exists
    sample_file = Path(__file__).parent / "ORDCELPE_20250907_8947167363.sgl_v2"
    if sample_file.exists():
        print(f"✅ Sample file found: {sample_file.name}")
        print(f"   Size: {sample_file.stat().st_size} bytes")
    else:
        print(f"❌ Sample file missing: {sample_file}")

    # Check source files
    src_dir = Path(__file__).parent.parent / "src"
    required_files = ["main.py", "slg_processor.py", "config.py", "ftp_monitor.py", "database.py"]

    for filename in required_files:
        file_path = src_dir / filename
        if file_path.exists():
            print(f"✅ Source file: {filename}")
        else:
            print(f"❌ Missing: {filename}")

async def main():
    """Main test function"""
    print("🚀 SA4CPS Data Ingestion Service - Test Suite")
    print("=" * 50)

    validate_environment()
    print()

    await test_sample_file()

    print("\n" + "=" * 50)
    print("✅ Test suite completed")


if __name__ == "__main__":
    asyncio.run(main())
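The script above is a standalone runner. As a sketch only (assuming the pytest-asyncio plugin, which this commit does not declare), an equivalent pytest test might look like:

# Sketch of a pytest adaptation; pytest-asyncio is an assumed dependency.
import sys
from pathlib import Path

import pytest

sys.path.append(str(Path(__file__).parent.parent / "src"))
from slg_processor import SLGProcessor

SAMPLE = Path(__file__).parent / "ORDCELPE_20250907_8947167363.sgl_v2"

@pytest.mark.asyncio
async def test_process_sample_file():
    if not SAMPLE.exists():
        pytest.skip("sample .sgl_v2 file not available")
    records = await SLGProcessor().process_file(str(SAMPLE), SAMPLE.name)
    assert records, "expected at least one record"
    assert {"timestamp", "value", "unit", "meter_id"} <= records[0].keys()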