Switch to PyMongo, update config and requirements, fix FTP extension typo

- Replace Motor (async) with PyMongo (sync) in database manager
- Update environment variable names for FTP and MongoDB config
- Remove unused dependencies from requirements.txt
- Fix file extension typo: .slg_v2 → .sgl_v2 throughout code and docs
- Add debug prints for MongoDB env vars in config
- Update FTP monitor to use correct file extension and PyMongo
- Adjust FastAPI descriptions for new extension
This commit is contained in:
rafaeldpsilva
2025-09-11 11:45:19 +01:00
parent b2a5b3d229
commit 2932e0a424
6 changed files with 152 additions and 156 deletions
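The central change is replacing Motor's awaited driver calls with PyMongo's blocking equivalents. A minimal sketch of the before/after pattern, assuming a local MongoDB instance (connection string and sample record are illustrative):

```python
from pymongo import MongoClient

# Before (Motor, async):
#     client = AsyncIOMotorClient(connection_string)
#     await client.admin.command('ping')
#     result = await collection.insert_many(records)

# After (PyMongo, sync): same method names, but calls block instead of await.
client = MongoClient("mongodb://localhost:27017", serverSelectionTimeoutMS=5000)
client.admin.command('ping')  # raises ConnectionFailure if the server is unreachable
db = client["sa4cps_energy"]
result = db.sa4cps_energy_data.insert_many([{"energy_kwh": 1.5}])
print(len(result.inserted_ids))
client.close()
```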

View File

@@ -9,10 +9,10 @@ The Data Ingestion Service provides comprehensive FTP monitoring and data processing
## Architecture
```
ftp.sa4cps.pt (.sgl_v2 files)
FTP Monitor (polls every 6 hours by default)
Data Processor (supports multiple formats)
Redis Publisher (3 topic channels)
@@ -84,9 +84,9 @@ Set these in the `docker-compose.yml`:
environment:
- FTP_SA4CPS_HOST=ftp.sa4cps.pt # FTP server hostname
- FTP_SA4CPS_PORT=21 # FTP port (default: 21)
- FTP_SA4CPS_USERNAME=anonymous # FTP username
- FTP_SA4CPS_USERNAME= # FTP username
- FTP_SA4CPS_PASSWORD= # FTP password (empty for anonymous)
- FTP_SA4CPS_REMOTE_PATH=/ # Remote directory path
```
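For reference, a minimal sketch of how a service might read these variables in Python (the names match the block above; the fallback values here are illustrative):

```python
import os

# Illustrative reader for the docker-compose environment block above;
# an empty username/password pair falls back to anonymous FTP login.
ftp_settings = {
    "host": os.getenv("FTP_SA4CPS_HOST", "ftp.sa4cps.pt"),
    "port": int(os.getenv("FTP_SA4CPS_PORT", "21")),
    "username": os.getenv("FTP_SA4CPS_USERNAME", ""),
    "password": os.getenv("FTP_SA4CPS_PASSWORD", ""),
    "remote_path": os.getenv("FTP_SA4CPS_REMOTE_PATH", "/"),
}
```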
### Manual Configuration
@@ -101,7 +101,7 @@ configurator = SA4CPSConfigurator()
# Create data source
result = await configurator.create_sa4cps_data_source(
    username="your_username",
    password="your_password",
    remote_path="/data/energy"
)
@@ -144,7 +144,7 @@ timestamp,sensor_id,energy_kwh,power_w,voltage_v
2024-01-15T10:01:00Z,SENSOR_001,1235.1,865.3,229.8
```
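A minimal sketch of parsing this layout with the standard library (field names follow the header row above; the real Data Processor may apply additional validation):

```python
import csv
import io
from datetime import datetime

sample = """timestamp,sensor_id,energy_kwh,power_w,voltage_v
2024-01-15T10:01:00Z,SENSOR_001,1235.1,865.3,229.8"""

# Each row becomes a typed reading dict keyed by the header fields.
for row in csv.DictReader(io.StringIO(sample)):
    reading = {
        "timestamp": datetime.fromisoformat(row["timestamp"].replace("Z", "+00:00")),
        "sensor_id": row["sensor_id"],
        "energy_kwh": float(row["energy_kwh"]),
        "power_w": float(row["power_w"]),
        "voltage_v": float(row["voltage_v"]),
    }
    print(reading)
```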
### Space-Delimited Format
```
# Energy consumption data
# System: Smart Grid Monitor
@@ -191,7 +191,7 @@ All processed data is converted to a standardized sensor reading format:
### sa4cps_energy_data
Primary energy consumption and power readings:
- Energy consumption (kWh, MWh)
- Power readings (W, kW, MW)
- Efficiency metrics
### sa4cps_sensor_metrics
@@ -201,7 +201,7 @@ Sensor telemetry and environmental data:
- Sensor status/diagnostics
- System health metrics
### sa4cps_raw_data
Raw unprocessed data for debugging:
- Original file content
- Processing metadata
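As an illustration only, hypothetical document shapes for the three collections (field names are assumptions inferred from the format descriptions above, not the service's exact schema):

```python
# Hypothetical example documents; the authoritative shape comes from the
# Data Processor's standardized sensor reading format.
energy_reading = {   # -> sa4cps_energy_data
    "sensor_id": "SENSOR_001",
    "timestamp": "2024-01-15T10:01:00Z",
    "energy_kwh": 1235.1,
    "power_w": 865.3,
}
sensor_metric = {    # -> sa4cps_sensor_metrics
    "sensor_id": "SENSOR_001",
    "timestamp": "2024-01-15T10:01:00Z",
    "voltage_v": 229.8,
    "status": "ok",
}
raw_record = {       # -> sa4cps_raw_data
    "filename": "202401.sgl_v2",
    "line": "2024-01-15T10:01:00Z,SENSOR_001,1235.1,865.3,229.8",
    "metadata": {"line_idx": 1},
}
```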
@@ -278,10 +278,10 @@ class CustomSA4CPSProcessor(DataProcessor):
    async def _process_slg_v2_line(self, line, header, metadata, line_idx):
        # Custom line processing logic
        processed = await super()._process_slg_v2_line(line, header, metadata, line_idx)
        # Add custom fields
        processed['custom_field'] = 'custom_value'
        return processed
```
@@ -295,4 +295,4 @@ For issues or questions:
## License
This implementation is part of the SA4CPS project energy monitoring dashboard.

View File

@@ -3,26 +3,12 @@ fastapi==0.104.1
uvicorn==0.24.0
pydantic==2.5.0
# Database dependencies
motor==3.3.2
# Database dependencies - using PyMongo (sync) instead of Motor (async)
pymongo==4.6.0
redis==5.0.1
# FTP handling
ftputil==5.0.4
# Data processing
pandas==2.1.4
numpy==1.25.2
openpyxl==3.1.2
xlrd==2.0.1
# Async HTTP client
httpx==0.25.2
# Logging and monitoring
structlog==23.2.0
# Date/time utilities
python-dateutil==2.8.2
@@ -31,5 +17,4 @@ typing-extensions==4.8.0
# Development dependencies (optional)
pytest==7.4.3
pytest-asyncio==0.21.1
pytest-cov==4.1.0

View File

@@ -9,20 +9,24 @@ from typing import Dict, Any
# FTP Configuration for SA4CPS server
FTP_CONFIG: Dict[str, Any] = {
"host": os.getenv("SA4CPS_FTP_HOST", "ftp.sa4cps.pt"),
"username": os.getenv("SA4CPS_FTP_USER", "curvascarga@sa4cps.pt"),
"password": os.getenv("SA4CPS_FTP_PASS", ""), # Set via environment variable
"base_path": os.getenv("SA4CPS_FTP_PATH", "/SLGs/Faial/PT0010000000015181AA/"),
"check_interval": int(os.getenv("SA4CPS_CHECK_INTERVAL", "21600")) # 6 hours default
"host": os.getenv("FTP_SA4CPS_HOST", "ftp.sa4cps.pt"),
"username": os.getenv("FTP_SA4CPS_USERNAME", "curvascarga@sa4cps.pt"),
"password": os.getenv("FTP_SA4CPS_PASSWORD", 'n$WFtz9+bleN'), # Set via environment variable
"base_path": os.getenv("FTP_SA4CPS_REMOTE_PATH", "/SLGs/Faial/"),
"check_interval": int(os.getenv("FTP_CHECK_INTERVAL", "21600")) # 6 hours default
}
# MongoDB Configuration
# Debug environment variables
print(f"DEBUG: MONGO_URL env var = {os.getenv('MONGO_URL', 'NOT SET')}")
print(f"DEBUG: All env vars starting with MONGO: {[k for k in os.environ.keys() if k.startswith('MONGO')]}")
MONGO_CONFIG: Dict[str, Any] = {
"connection_string": os.getenv(
"MONGODB_URL",
"mongodb://admin:admin@localhost:27018/sa4cps_energy?authSource=admin"
"MONGO_URL",
"mongodb://admin:password123@localhost:27017/digitalmente_ingestion?authSource=admin"
),
"database_name": os.getenv("MONGODB_DATABASE", "sa4cps_energy")
"database_name": os.getenv("MONGODB_DATABASE", "digitalmente_ingestion")
}
# Logging Configuration
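A note on the debug prints above: they execute at import time and echo raw environment values. A hedged sketch of an alternative that reports which variables are set without leaking a connection string that may embed credentials:

```python
import logging
import os

logger = logging.getLogger(__name__)

# Report presence, not values, so credentials inside MONGO_URL never
# reach stdout or log files.
mongo_vars = sorted(k for k in os.environ if k.startswith("MONGO"))
logger.debug("MONGO* env vars present: %s", mongo_vars)
logger.debug("MONGO_URL is %s", "set" if os.getenv("MONGO_URL") else "NOT SET")
```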

View File

@@ -1,14 +1,12 @@
#!/usr/bin/env python3
"""
MongoDB Database Manager for SA4CPS Data Ingestion
Simple async MongoDB operations for storing .slg_v2 file data
Simple sync MongoDB operations for storing .sgl_v2 file data
"""
import asyncio
import logging
from datetime import datetime
from typing import List, Dict, Any, Optional
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure, ServerSelectionTimeoutError
from config import MONGO_CONFIG
@@ -18,26 +16,27 @@ logger = logging.getLogger(__name__)
class DatabaseManager:
"""Manages MongoDB connections and operations for SA4CPS data"""
def __init__(self):
self.client: Optional[AsyncIOMotorClient] = None
self.client: Optional[MongoClient] = None
self.db = None
self.collections = {}
# MongoDB configuration
self.connection_string = MONGO_CONFIG["connection_string"]
self.database_name = MONGO_CONFIG["database_name"]
logger.info(f"Database manager initialized for: {self.database_name}")
async def connect(self):
"""Connect to MongoDB"""
try:
self.client = AsyncIOMotorClient(self.connection_string)
logger.info(f"Connecting to MongoDB at: {self.connection_string}")
self.client = MongoClient(self.connection_string, serverSelectionTimeoutMS=5000)
# Test connection
await self.client.admin.command('ping')
await self.ping()
# Get database and collections
self.db = self.client[self.database_name]
self.collections = {
@@ -45,50 +44,58 @@ class DatabaseManager:
'energy_data': self.db.sa4cps_energy_data,
'metadata': self.db.sa4cps_metadata
}
# Create indexes for better performance
await self._create_indexes()
logger.info(f"Connected to MongoDB: {self.database_name}")
self._create_indexes()
logger.info(f"Connected to MongoDB database: {self.database_name}")
except (ConnectionFailure, ServerSelectionTimeoutError) as e:
logger.error(f"Failed to connect to MongoDB: {e}")
raise
async def close(self):
"""Close MongoDB connection"""
if self.client:
self.client.close()
logger.info("MongoDB connection closed")
async def ping(self):
"""Test database connection"""
if not self.client:
raise ConnectionFailure("No database connection")
await self.client.admin.command('ping')
async def _create_indexes(self):
try:
# The ping command is cheap and does not require auth.
self.client.admin.command('ping')
logger.info("MongoDB ping successful")
except ConnectionFailure as e:
logger.error(f"MongoDB ping failed - Server not available: {e}")
raise
except Exception as e:
logger.error(f"MongoDB ping failed with error: {e}")
raise ConnectionFailure(f"Ping failed: {e}")
def _create_indexes(self):
"""Create database indexes for efficient queries"""
try:
# Index on files collection
await self.collections['files'].create_index("filename", unique=True)
await self.collections['files'].create_index("processed_at")
self.collections['files'].create_index("filename", unique=True)
self.collections['files'].create_index("processed_at")
# Index on energy data collection
await self.collections['energy_data'].create_index([("filename", 1), ("timestamp", 1)])
await self.collections['energy_data'].create_index("timestamp")
self.collections['energy_data'].create_index([("filename", 1), ("timestamp", 1)])
self.collections['energy_data'].create_index("timestamp")
logger.info("Database indexes created successfully")
except Exception as e:
logger.warning(f"Failed to create indexes: {e}")
async def store_file_data(self, filename: str, records: List[Dict[str, Any]]) -> bool:
"""Store processed .slg_v2 file data in MongoDB"""
"""Store processed .sgl_v2 file data in MongoDB"""
try:
current_time = datetime.now()
# Store file metadata
file_metadata = {
"filename": filename,
@@ -97,31 +104,31 @@ class DatabaseManager:
"file_size": sum(len(str(record)) for record in records),
"status": "processed"
}
# Insert or update file record
await self.collections['files'].replace_one(
self.collections['files'].replace_one(
{"filename": filename},
file_metadata,
upsert=True
)
# Add filename and processed timestamp to each record
for record in records:
record["filename"] = filename
record["processed_at"] = current_time
# Insert energy data records
if records:
result = await self.collections['energy_data'].insert_many(records)
result = self.collections['energy_data'].insert_many(records)
inserted_count = len(result.inserted_ids)
logger.info(f"Stored {inserted_count} records from {filename}")
return True
return False
except Exception as e:
logger.error(f"Error storing data for {filename}: {e}")
# Store error metadata
error_metadata = {
"filename": filename,
@@ -129,15 +136,15 @@ class DatabaseManager:
"status": "error",
"error_message": str(e)
}
await self.collections['files'].replace_one(
self.collections['files'].replace_one(
{"filename": filename},
error_metadata,
upsert=True
)
return False
async def get_processed_files(self) -> List[str]:
"""Get list of successfully processed files"""
try:
@@ -145,25 +152,25 @@ class DatabaseManager:
{"status": "processed"},
{"filename": 1, "_id": 0}
)
files = []
async for doc in cursor:
for doc in cursor:
files.append(doc["filename"])
return files
except Exception as e:
logger.error(f"Error getting processed files: {e}")
return []
async def get_file_info(self, filename: str) -> Optional[Dict[str, Any]]:
"""Get information about a specific file"""
try:
return await self.collections['files'].find_one({"filename": filename})
return self.collections['files'].find_one({"filename": filename})
except Exception as e:
logger.error(f"Error getting file info for {filename}: {e}")
return None
async def get_stats(self) -> Dict[str, Any]:
"""Get database statistics"""
try:
@@ -171,15 +178,15 @@ class DatabaseManager:
"database": self.database_name,
"timestamp": datetime.now().isoformat()
}
# Count documents in each collection
for name, collection in self.collections.items():
try:
count = await collection.count_documents({})
count = collection.count_documents({})
stats[f"{name}_count"] = count
except Exception as e:
stats[f"{name}_count"] = f"error: {e}"
# Get recent files
try:
recent_files = []
@@ -187,24 +194,24 @@ class DatabaseManager:
{},
{"filename": 1, "processed_at": 1, "record_count": 1, "status": 1, "_id": 0}
).sort("processed_at", -1).limit(5)
async for doc in cursor:
for doc in cursor:
if doc.get("processed_at"):
doc["processed_at"] = doc["processed_at"].isoformat()
recent_files.append(doc)
stats["recent_files"] = recent_files
except Exception as e:
stats["recent_files"] = f"error: {e}"
return stats
except Exception as e:
logger.error(f"Error getting database stats: {e}")
return {"error": str(e), "timestamp": datetime.now().isoformat()}
async def get_energy_data(self,
filename: Optional[str] = None,
start_time: Optional[datetime] = None,
end_time: Optional[datetime] = None,
@@ -212,10 +219,10 @@ class DatabaseManager:
"""Retrieve energy data with optional filtering"""
try:
query = {}
if filename:
query["filename"] = filename
if start_time or end_time:
time_query = {}
if start_time:
@@ -223,11 +230,11 @@ class DatabaseManager:
if end_time:
time_query["$lte"] = end_time
query["timestamp"] = time_query
cursor = self.collections['energy_data'].find(query).sort("timestamp", -1).limit(limit)
data = []
async for doc in cursor:
for doc in cursor:
# Convert ObjectId to string and datetime to ISO string
if "_id" in doc:
doc["_id"] = str(doc["_id"])
@@ -235,11 +242,11 @@ class DatabaseManager:
doc["timestamp"] = doc["timestamp"].isoformat()
if "processed_at" in doc and hasattr(doc["processed_at"], "isoformat"):
doc["processed_at"] = doc["processed_at"].isoformat()
data.append(doc)
return data
except Exception as e:
logger.error(f"Error retrieving energy data: {e}")
return []
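A minimal usage sketch of the now-synchronous manager (hypothetical driver script; the `async def` wrappers remain, so callers still `await` them even though the underlying PyMongo calls block):

```python
import asyncio
from datetime import datetime

from database import DatabaseManager  # assumes this module is database.py

async def main() -> None:
    db = DatabaseManager()
    await db.connect()  # blocking PyMongo work inside an async wrapper
    await db.store_file_data(
        "202401.sgl_v2",  # illustrative filename
        [{"timestamp": datetime.now(), "energy_kwh": 1.5}],
    )
    print(await db.get_stats())
    await db.close()

asyncio.run(main())
```

Because every call now blocks the event loop, a busier service might wrap these operations in `asyncio.to_thread` rather than awaiting them directly.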

View File

@@ -5,10 +5,10 @@ Monitors ftp.sa4cps.pt for new monthly files
"""
import asyncio
import ftplib
from ftplib import FTP
import logging
import os
from datetime import datetime, timedelta
from datetime import datetime
from typing import List, Dict, Any, Optional
from dataclasses import dataclass
import tempfile
@@ -30,7 +30,7 @@ class FTPFileInfo:
class FTPMonitor:
"""Monitors SA4CPS FTP server for new .slg_v2 files"""
def __init__(self, db_manager):
self.db_manager = db_manager
self.processor = SLGProcessor()
@@ -38,52 +38,50 @@ class FTPMonitor:
self.processed_files: set = set()
self.files_processed_count = 0
self.status = "initializing"
# FTP connection settings
self.ftp_host = FTP_CONFIG["host"]
self.ftp_user = FTP_CONFIG["username"]
self.ftp_pass = FTP_CONFIG["password"]
self.base_path = FTP_CONFIG["base_path"]
# Check interval: 6 hours (files are monthly, so frequent checks aren't needed)
self.check_interval = FTP_CONFIG.get("check_interval", 6 * 3600) # 6 hours
self.check_interval = FTP_CONFIG["check_interval"]
logger.info(f"FTP Monitor initialized for {self.ftp_host}")
async def start_monitoring(self):
"""Start the monitoring loop"""
self.status = "running"
logger.info("Starting FTP monitoring loop")
while True:
try:
await self.check_for_new_files()
self.status = "running"
# Wait for next check (6 hours)
logger.info(f"Waiting {self.check_interval/3600:.1f} hours until next check")
await asyncio.sleep(self.check_interval)
except Exception as e:
self.status = "error"
logger.error(f"Error in monitoring loop: {e}")
# Wait 30 minutes before retrying on error
await asyncio.sleep(1800)
async def check_for_new_files(self) -> Dict[str, Any]:
"""Check FTP server for new .slg_v2 files"""
self.last_check = datetime.now()
logger.info(f"Checking FTP server at {self.last_check}")
try:
# Connect to FTP server
with ftplib.FTP(self.ftp_host) as ftp:
with FTP(self.ftp_host) as ftp:
ftp.login(self.ftp_user, self.ftp_pass)
logger.info(f"Connected to FTP server: {self.ftp_host}")
# Find .sgl_v2 files
new_files = await self._find_slg_files(ftp)
# Process new files
processed_count = 0
for file_info in new_files:
@@ -93,76 +91,78 @@ class FTPMonitor:
self.processed_files.add(file_info.path)
processed_count += 1
self.files_processed_count += 1
result = {
"files_found": len(new_files),
"files_processed": processed_count,
"timestamp": self.last_check.isoformat()
}
logger.info(f"Check complete: {result}")
return result
except Exception as e:
logger.error(f"FTP check failed: {e}")
raise
async def _find_slg_files(self, ftp: ftplib.FTP) -> List[FTPFileInfo]:
"""Find .slg_v2 files in the FTP directory structure"""
async def _find_slg_files(self, ftp: FTP) -> List[FTPFileInfo]:
"""Find .sgl_v2 files in the FTP directory structure"""
files = []
try:
# Navigate to base path
ftp.cwd(self.base_path)
logger.info(f"Scanning directory: {self.base_path}")
# Get directory listing
dir_list = []
ftp.retrlines('LIST', dir_list.append)
logger.info(f"Received {len(dir_list)} directory entries")
for line in dir_list:
print(line)
parts = line.split()
if len(parts) >= 9:
filename = parts[-1]
# Check if it's a .sgl_v2 file
if filename.endswith('.slg_v2'):
if filename.endswith('.sgl_v2'):
print('found file')
try:
size = int(parts[4])
full_path = f"{self.base_path.rstrip('/')}/{filename}"
files.append(FTPFileInfo(
path=full_path,
name=filename,
size=size
))
except (ValueError, IndexError):
logger.warning(f"Could not parse file info for: {filename}")
logger.info(f"Found {len(files)} .slg_v2 files")
return files
except Exception as e:
logger.error(f"Error scanning FTP directory: {e}")
return []
async def _process_file(self, ftp: ftplib.FTP, file_info: FTPFileInfo) -> bool:
async def _process_file(self, ftp: FTP, file_info: FTPFileInfo) -> bool:
"""Download and process a .slg_v2 file"""
logger.info(f"Processing file: {file_info.name} ({file_info.size} bytes)")
try:
# Create temporary file for download
with tempfile.NamedTemporaryFile(mode='wb', suffix='.sgl_v2', delete=False) as temp_file:
temp_path = temp_file.name
# Download file
with open(temp_path, 'wb') as f:
ftp.retrbinary(f'RETR {file_info.name}', f.write)
# Process the downloaded file
records = await self.processor.process_file(temp_path, file_info.name)
# Store in database
if records:
await self.db_manager.store_file_data(file_info.name, records)
@@ -171,11 +171,11 @@ class FTPMonitor:
else:
logger.warning(f"No valid records found in {file_info.name}")
return False
except Exception as e:
logger.error(f"Error processing file {file_info.name}: {e}")
return False
finally:
# Clean up temporary file
try:
@@ -183,19 +183,19 @@ class FTPMonitor:
os.unlink(temp_path)
except OSError:
pass
def get_status(self) -> str:
"""Get current monitor status"""
return self.status
def get_last_check_time(self) -> Optional[str]:
"""Get last check time as ISO string"""
return self.last_check.isoformat() if self.last_check else None
def get_processed_count(self) -> int:
"""Get total number of files processed"""
return self.files_processed_count
def get_detailed_status(self) -> Dict[str, Any]:
"""Get detailed status information"""
return {
@@ -206,4 +206,4 @@ class FTPMonitor:
"check_interval_hours": self.check_interval / 3600,
"ftp_host": self.ftp_host,
"base_path": self.base_path
}
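One caveat: `_find_slg_files` parses `LIST` output positionally (`parts[4]` for the size), which varies between FTP servers. A hedged sketch of a more portable alternative using `MLSD`, where the server supports it:

```python
from ftplib import FTP, error_perm
from typing import List, Tuple

def list_sgl_files(ftp: FTP, base_path: str) -> List[Tuple[str, int]]:
    """Return (name, size) pairs for .sgl_v2 files via MLSD, if supported."""
    try:
        return [
            (name, int(facts.get("size", 0)))
            for name, facts in ftp.mlsd(base_path)
            if name.endswith(".sgl_v2")
        ]
    except error_perm:
        # Server rejected MLSD; fall back to the LIST parsing shown above.
        return []
```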

View File

@@ -1,6 +1,6 @@
"""
SA4CPS Data Ingestion Service
Simple FTP monitoring service for .slg_v2 files with MongoDB storage
Simple FTP monitoring service for .sgl_v2 files with MongoDB storage
"""
from fastapi import FastAPI, HTTPException
@@ -53,7 +53,7 @@ async def lifespan(app: FastAPI):
# Create FastAPI app
app = FastAPI(
title="SA4CPS Data Ingestion Service",
description="Monitors FTP server for .slg_v2 files and stores data in MongoDB",
description="Monitors FTP server for .sgl_v2 files and stores data in MongoDB",
version="1.0.0",
lifespan=lifespan
)
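For context, a minimal sketch of the lifespan pattern the app uses (the handlers here are placeholders; the real `lifespan` wires up the database manager and the FTP monitor loop):

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup: the real service connects DatabaseManager and starts
    # the FTPMonitor loop as a background task here.
    yield
    # Shutdown: close the MongoDB client and stop the monitor.

app = FastAPI(
    title="SA4CPS Data Ingestion Service",
    description="Monitors FTP server for .sgl_v2 files and stores data in MongoDB",
    version="1.0.0",
    lifespan=lifespan,
)

@app.get("/health")  # hypothetical endpoint for illustration
async def health():
    return {"status": "ok"}
```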