diff --git a/microservices/data-ingestion-service/src/database.py b/microservices/data-ingestion-service/src/database.py
index a75346d..485ec50 100644
--- a/microservices/data-ingestion-service/src/database.py
+++ b/microservices/data-ingestion-service/src/database.py
@@ -15,6 +15,7 @@ class DatabaseManager:
         self.client: Optional[MongoClient] = None
         self.db = None
         self.collections = {}
+        self.energy_collections_cache = {}  # Cache for dynamically created energy data collections
         self.connection_string = MONGO_CONFIG["connection_string"]
         self.database_name = MONGO_CONFIG["database_name"]
 
@@ -31,12 +32,11 @@ class DatabaseManager:
         self.db = self.client[self.database_name]
         self.collections = {
             'files': self.db.sa4cps_files,
-            'energy_data': self.db.sa4cps_energy_data,
             'metadata': self.db.sa4cps_metadata,
             'scanned_directories': self.db.sa4cps_scanned_directories
         }
 
-        self._create_indexes()
+        self._create_base_indexes()
 
         logger.info(f"Connected to MongoDB database: {self.database_name}")
 
@@ -78,13 +78,12 @@ class DatabaseManager:
             logger.error(f"MongoDB ping failed with error: {e}")
             raise ConnectionFailure(f"Ping failed: {e}")
 
-    def _create_indexes(self):
+    def _create_base_indexes(self):
+        """Create indexes for base collections (not energy data collections)"""
         try:
             self.collections['files'].create_index("filename", unique=True)
             self.collections['files'].create_index("processed_at")
-
-            self.collections['energy_data'].create_index([("filename", 1), ("timestamp", 1)])
-            self.collections['energy_data'].create_index("timestamp")
+            self.collections['files'].create_index("directory_path")
 
             self.collections['scanned_directories'].create_index("directory_path", unique=True)
             self.collections['scanned_directories'].create_index("last_scanned")
@@ -94,13 +93,97 @@ class DatabaseManager:
         except Exception as e:
             logger.warning(f"Failed to create indexes: {e}")
 
-    async def store_file_data(self, filename: str, records: List[Dict[str, Any]]) -> bool:
+    def _extract_level3_path(self, directory_path: str) -> Optional[str]:
+        """Extract level 3 directory path (SLGs/Community/Building) from full path"""
+        # Expected structure: /SLGs/Community/Building/...
+        parts = directory_path.strip('/').split('/')
+
+        if len(parts) >= 3 and parts[0] == 'SLGs':
+            # Return SLGs/Community/Building
+            return '/'.join(parts[:3])
+
+        return None
+
+    def _sanitize_collection_name(self, level3_path: str) -> str:
+        """Convert level 3 directory path to valid MongoDB collection name
+
+        Example: SLGs/CommunityA/Building1 -> energy_data__CommunityA_Building1
+        """
+        parts = level3_path.strip('/').split('/')
+
+        if len(parts) >= 3 and parts[0] == 'SLGs':
+            # Use Community_Building as the collection suffix
+            collection_suffix = f"{parts[1]}_{parts[2]}"
+            collection_name = f"energy_data__{collection_suffix}"
+            return collection_name
+
+        # Fallback: sanitize the entire path
+        sanitized = level3_path.replace('/', '_').replace('.', '_').replace(' ', '_')
+        sanitized = sanitized.strip('_')
+        return f"energy_data__{sanitized}"
+
+    def _get_energy_collection(self, directory_path: str):
+        """Get or create energy data collection for a specific level 3 directory path"""
+        level3_path = self._extract_level3_path(directory_path)
+
+        if not level3_path:
+            logger.warning(f"Could not extract level 3 path from: {directory_path}, using default collection")
+            # Fallback to a default collection for non-standard paths
+            collection_name = "energy_data__other"
+        else:
+            collection_name = self._sanitize_collection_name(level3_path)
+
+        # Check cache first
+        if collection_name in self.energy_collections_cache:
+            return self.energy_collections_cache[collection_name]
+
+        # Create/get collection
+        collection = self.db[collection_name]
+
+        # Create indexes for this energy collection
+        try:
+            collection.create_index([("filename", 1), ("timestamp", 1)])
+            collection.create_index("timestamp")
+            collection.create_index("meter_id")
+            logger.debug(f"Created indexes for collection: {collection_name}")
+        except Exception as e:
+            logger.warning(f"Failed to create indexes for {collection_name}: {e}")
+
+        # Cache the collection
+        self.energy_collections_cache[collection_name] = collection
+        logger.info(f"Initialized energy data collection: {collection_name} for path: {directory_path}")
+
+        return collection
+
+    def _list_energy_collections(self) -> List[str]:
+        """List all energy data collections in the database"""
+        try:
+            all_collections = self.db.list_collection_names()
+            # Filter collections that start with 'energy_data__'
+            energy_collections = [c for c in all_collections if c.startswith('energy_data__')]
+            return energy_collections
+        except Exception as e:
+            logger.error(f"Error listing energy collections: {e}")
+            return []
+
+    async def store_file_data(self, filename: str, records: List[Dict[str, Any]], directory_path: str = None) -> bool:
         try:
             current_time = datetime.now()
 
+            # Determine which collection to use based on directory path
+            if directory_path:
+                energy_collection = self._get_energy_collection(directory_path)
+                level3_path = self._extract_level3_path(directory_path)
+            else:
+                logger.warning(f"No directory path provided for {filename}, using default collection")
+                energy_collection = self._get_energy_collection("/SLGs/unknown/unknown")
+                level3_path = None
+
             # Store file metadata
             file_metadata = {
                 "filename": filename,
+                "directory_path": directory_path,
+                "level3_path": level3_path,
                 "record_count": len(records),
                 "processed_at": current_time,
                 "file_size": sum(len(str(record)) for record in records),
@@ -118,12 +201,13 @@ class DatabaseManager:
             for record in records:
                 record["filename"] = filename
                 record["processed_at"] = current_time
+                record["directory_path"] = directory_path
 
-            # Insert energy data records
+            # Insert energy data records into the appropriate collection
             if records:
-                result = self.collections['energy_data'].insert_many(records)
+                result = energy_collection.insert_many(records)
                 inserted_count = len(result.inserted_ids)
-                logger.debug(f"Stored {inserted_count} records from {filename}")
+                logger.debug(f"Stored {inserted_count} records from {filename} to {energy_collection.name}")
                 return True
 
             return False
@@ -134,6 +218,7 @@ class DatabaseManager:
             # Store error metadata
             error_metadata = {
                 "filename": filename,
+                "directory_path": directory_path,
                 "processed_at": current_time,
                 "status": "error",
                 "error_message": str(e)
@@ -178,8 +263,14 @@ class DatabaseManager:
             return None
 
     # Directory scanning tracking methods
+    # Note: Only level 4+ directories (/SLGs/Community/Building/SubDir) are tracked
+    # to avoid unnecessary caching of high-level organizational directories
+
     async def is_directory_scanned(self, directory_path: str, since_timestamp: datetime = None) -> bool:
-        """Check if directory has been scanned recently"""
+        """Check if directory has been scanned recently
+
+        Note: Only level 4+ directories are tracked in the database
+        """
         try:
             query = {"directory_path": directory_path, "scan_status": "complete"}
             if since_timestamp:
@@ -249,14 +340,14 @@ class DatabaseManager:
             return False
 
     async def get_stats(self) -> Dict[str, Any]:
-        """Get database statistics"""
+        """Get database statistics including all energy collections"""
         try:
             stats = {
                 "database": self.database_name,
                 "timestamp": datetime.now().isoformat()
             }
 
-            # Count documents in each collection
+            # Count documents in base collections
             for name, collection in self.collections.items():
                 try:
                     count = collection.count_documents({})
@@ -264,12 +355,35 @@ class DatabaseManager:
                 except Exception as e:
                     stats[f"{name}_count"] = f"error: {e}"
 
+            # Get all energy collections and their counts
+            try:
+                energy_collections = self._list_energy_collections()
+                energy_stats = []
+                total_energy_records = 0
+
+                for collection_name in energy_collections:
+                    collection = self.db[collection_name]
+                    count = collection.count_documents({})
+                    total_energy_records += count
+
+                    energy_stats.append({
+                        "collection": collection_name,
+                        "record_count": count
+                    })
+
+                stats["energy_collections"] = energy_stats
+                stats["total_energy_collections"] = len(energy_collections)
+                stats["total_energy_records"] = total_energy_records
+
+            except Exception as e:
+                stats["energy_collections"] = f"error: {e}"
+
             # Get recent files
             try:
                 recent_files = []
                 cursor = self.collections['files'].find(
                     {},
-                    {"filename": 1, "processed_at": 1, "record_count": 1, "status": 1, "_id": 0}
+                    {"filename": 1, "processed_at": 1, "record_count": 1, "status": 1, "directory_path": 1, "level3_path": 1, "_id": 0}
                 ).sort("processed_at", -1).limit(5)
 
                 for doc in cursor:
@@ -292,8 +406,17 @@ class DatabaseManager:
                               filename: Optional[str] = None,
                               start_time: Optional[datetime] = None,
                               end_time: Optional[datetime] = None,
+                              directory_path: Optional[str] = None,
                               limit: int = 100) -> List[Dict[str, Any]]:
-        """Retrieve energy data with optional filtering"""
+        """Retrieve energy data with optional filtering
+
+        Args:
+            filename: Filter by specific filename
+            start_time: Filter by start timestamp
+            end_time: Filter by end timestamp
+            directory_path: Filter by specific directory path (level 3). If None, queries all collections
+            limit: Maximum number of records to return
+        """
         try:
             query = {}
 
@@ -308,22 +431,48 @@ class DatabaseManager:
                 time_query["$lte"] = end_time
                 query["timestamp"] = time_query
 
-            cursor = self.collections['energy_data'].find(query).sort("timestamp", -1).limit(limit)
-
             data = []
-            for doc in cursor:
-                # Convert ObjectId to string and datetime to ISO string
-                if "_id" in doc:
-                    doc["_id"] = str(doc["_id"])
-                if "timestamp" in doc and hasattr(doc["timestamp"], "isoformat"):
-                    doc["timestamp"] = doc["timestamp"].isoformat()
-                if "processed_at" in doc and hasattr(doc["processed_at"], "isoformat"):
-                    doc["processed_at"] = doc["processed_at"].isoformat()
-                data.append(doc)
+            # If directory_path is specified, query only that collection
+            if directory_path:
+                collection = self._get_energy_collection(directory_path)
+                cursor = collection.find(query).sort("timestamp", -1).limit(limit)
+
+                for doc in cursor:
+                    data.append(self._format_energy_document(doc))
+
+            else:
+                # Query across all energy collections
+                energy_collection_names = self._list_energy_collections()
+
+                # Collect data from all collections, then sort and limit
+                all_data = []
+                per_collection_limit = max(limit, 1000)  # Get more from each to ensure we have enough after sorting
+
+                for collection_name in energy_collection_names:
+                    collection = self.db[collection_name]
+                    cursor = collection.find(query).sort("timestamp", -1).limit(per_collection_limit)
+
+                    for doc in cursor:
+                        all_data.append(self._format_energy_document(doc))
+
+                # Sort all data by timestamp and apply final limit
+                all_data.sort(key=lambda x: x.get("timestamp", ""), reverse=True)
+                data = all_data[:limit]
 
             return data
 
         except Exception as e:
             logger.error(f"Error retrieving energy data: {e}")
             return []
+
+    def _format_energy_document(self, doc: Dict[str, Any]) -> Dict[str, Any]:
+        """Format energy document for API response"""
+        # Convert ObjectId to string and datetime to ISO string
+        if "_id" in doc:
+            doc["_id"] = str(doc["_id"])
+        if "timestamp" in doc and hasattr(doc["timestamp"], "isoformat"):
+            doc["timestamp"] = doc["timestamp"].isoformat()
+        if "processed_at" in doc and hasattr(doc["processed_at"], "isoformat"):
+            doc["processed_at"] = doc["processed_at"].isoformat()
+        return doc
diff --git a/microservices/data-ingestion-service/src/ftp_monitor.py b/microservices/data-ingestion-service/src/ftp_monitor.py
index 79556fd..8f2a1de 100644
--- a/microservices/data-ingestion-service/src/ftp_monitor.py
+++ b/microservices/data-ingestion-service/src/ftp_monitor.py
@@ -17,6 +17,7 @@ class FTPFileInfo:
     path: str
     name: str
    size: int
+    directory_path: str  # Directory containing the file
     modified_time: Optional[datetime] = None
 
 
@@ -79,6 +80,7 @@ class FTPMonitor:
             self.status = "stopped"
             return
 
+        await asyncio.wait_for(self.check_for_new_files(), timeout=300.0)  # 5 minute timeout
         self.status = "running"
 
         # Optionally skip initial scan and wait for first scheduled interval
@@ -202,9 +204,13 @@ class FTPMonitor:
 
             visited_dirs.add(normalized_path)
 
-            # Check if directory should be skipped based on previous scans
-            if await self.db_manager.should_skip_directory(normalized_path):
-                logger.info(f"Skipping previously scanned directory: {normalized_path}")
+            # Determine directory depth (level 4 = /SLGs/Community/Building/SubDir)
+            path_parts = normalized_path.strip('/').split('/')
+            directory_level = len(path_parts)
+
+            # Check if directory should be skipped based on previous scans (only for level 4+)
+            if directory_level >= 4 and await self.db_manager.should_skip_directory(normalized_path):
+                logger.info(f"Skipping previously scanned level {directory_level} directory: {normalized_path}")
                 skipped_dirs += 1
                 continue
 
@@ -221,7 +227,7 @@ class FTPMonitor:
 
                 # Count files found in this directory
                 files_found_in_dir = 0
-                
+
                 for line in dir_list:
                     parts = line.split()
                     if len(parts) >= 9:
@@ -257,7 +263,8 @@ class FTPMonitor:
                             files.append(FTPFileInfo(
                                 path=full_path,
                                 name=filename,
-                                size=size
+                                size=size,
+                                directory_path=normalized_path
                             ))
                             files_found_in_dir += 1
 
@@ -265,10 +272,13 @@ class FTPMonitor:
                             logger.warning(f"Could not parse file info for: {filename}")
 
                 ftp.cwd(original_dir)
-                
-                # Mark directory as scanned
-                await self.db_manager.mark_directory_scanned(normalized_path, files_found_in_dir)
-                logger.debug(f"Completed scanning: {normalized_path} ({files_found_in_dir} files found)")
+
+                # Mark directory as scanned (only for level 4+ directories)
+                if directory_level >= 4:
+                    await self.db_manager.mark_directory_scanned(normalized_path, files_found_in_dir)
+                    logger.debug(f"Completed scanning level {directory_level} directory: {normalized_path} ({files_found_in_dir} files found)")
+                else:
+                    logger.debug(f"Completed scanning level {directory_level} directory (not saved to cache): {normalized_path} ({files_found_in_dir} files found)")
 
             except Exception as e:
                 logger.warning(f"Error scanning directory {normalized_path}: {e}")
@@ -277,7 +287,7 @@ class FTPMonitor:
         logger.info(f"Iterative scan completed. Scanned: {scanned_dirs} directories, Skipped: {skipped_dirs} directories (Total visited: {len(visited_dirs)})")
 
     async def _process_file(self, ftp: FTP, file_info: FTPFileInfo) -> bool:
-        logger.debug(f"Processing file: {file_info.path} ({file_info.size} bytes)")
+        logger.debug(f"Processing file: {file_info.path} ({file_info.size} bytes) from directory: {file_info.directory_path}")
 
         try:
             with tempfile.NamedTemporaryFile(mode='wb', suffix='.slg_v2', delete=False) as temp_file:
@@ -289,8 +299,9 @@ class FTPMonitor:
             records = await self.processor.process_file(temp_path, file_info.name)
 
             if records:
-                await self.db_manager.store_file_data(file_info.name, records)
-                logger.debug(f"Stored {len(records)} records from {file_info.name}")
+                # Pass directory path to store_file_data for collection selection
+                await self.db_manager.store_file_data(file_info.name, records, file_info.directory_path)
+                logger.debug(f"Stored {len(records)} records from {file_info.name} to collection for {file_info.directory_path}")
                 return True
             else:
                 logger.warning(f"No valid records found in {file_info.name}")
diff --git a/microservices/data-ingestion-service/tests/test_database_skip.py b/microservices/data-ingestion-service/tests/test_database_skip.py
index d7b67e5..ebd6d58 100644
--- a/microservices/data-ingestion-service/tests/test_database_skip.py
+++ b/microservices/data-ingestion-service/tests/test_database_skip.py
@@ -32,7 +32,7 @@ class MockDatabaseManager:
         """Mock get list of processed files"""
         return list(self.processed_files)
 
-    async def store_file_data(self, filename: str, records: List) -> bool:
+    async def store_file_data(self, filename: str, records: List, directory_path: str = None) -> bool:
         """Mock store file data"""
         self.processed_files.add(filename)
         self.stored_files[filename] = records
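
Note (reviewer sketch, not part of the patch): the snippet below mirrors the path-to-collection routing this change adds to DatabaseManager (_extract_level3_path / _sanitize_collection_name), so the naming convention can be checked in isolation. Only the SLGs/<Community>/<Building> convention, the energy_data__ prefix, and the energy_data__other fallback come from the diff; the directory names in the example are made up.

# Standalone illustration of the collection-name mapping introduced in database.py.
# It is not the service code itself; it only reproduces the naming rules for inspection.
from typing import Optional


def extract_level3_path(directory_path: str) -> Optional[str]:
    """Return 'SLGs/Community/Building' for paths under /SLGs, else None."""
    parts = directory_path.strip('/').split('/')
    if len(parts) >= 3 and parts[0] == 'SLGs':
        return '/'.join(parts[:3])
    return None


def collection_name_for(directory_path: str) -> str:
    """Map a scanned directory to its per-building energy data collection name."""
    level3 = extract_level3_path(directory_path)
    if level3 is None:
        return "energy_data__other"  # fallback used for non-standard paths
    _, community, building = level3.split('/')
    return f"energy_data__{community}_{building}"


if __name__ == "__main__":
    # Hypothetical example paths, not real data.
    print(collection_name_for("/SLGs/CommunityA/Building1/2024"))  # -> energy_data__CommunityA_Building1
    print(collection_name_for("/incoming/misc"))                   # -> energy_data__other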