- Add iterative directory scanning to prevent infinite recursion
- Cache processed files in memory to avoid redundant database lookups
- Skip already processed files using cache and database fallback (see the sketch below)
- Add tests for skip logic and iterative scan behavior
- Change logging for MongoDB connection and file storage to debug level
- Clean up FastAPI app and remove redundant docstrings
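The cache-plus-database-fallback skip check is the behaviour the test file below exercises. A minimal standalone sketch of that logic (the function name and signature here are illustrative, not the monitor's actual method; it assumes only a db object with an async is_file_processed()):

async def is_already_processed(filename, cache, db):
    # Fast path: a cache hit avoids a database round trip entirely
    if filename in cache:
        return True
    # Fallback: consult the database on a cache miss, then backfill the
    # cache so the next scan skips this file without another lookup
    if await db.is_file_processed(filename):
        cache.add(filename)
        return True
    return False

The MockDatabaseManager defined in the tests satisfies the db parameter, so this sketch can be driven directly against it with an ordinary set as the cache.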
357 lines · 12 KiB · Python
#!/usr/bin/env python3
"""
Test database skip functionality

Tests that already processed files are skipped to avoid reprocessing.
"""

import asyncio
import sys
from pathlib import Path
from unittest.mock import patch
from typing import List

# Add src directory to path
sys.path.append(str(Path(__file__).parent.parent / "src"))

from ftp_monitor import FTPMonitor, FTPFileInfo

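# FTPMonitor API exercised by these tests (inferred from the assertions
# below, not from ftp_monitor itself):
#   - FTPMonitor(db) exposes a processed_files set used as an in-memory cache
#   - initialize_processed_files_cache() loads processed filenames from the
#     database into that set and returns how many were loaded
#   - check_for_new_files() scans the FTP server and returns a dict with
#     'files_found', 'files_processed', and 'files_skipped' counts
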
class MockDatabaseManager:
    """Mock database manager for testing skip functionality"""

    def __init__(self):
        self.processed_files = set()
        self.stored_files = {}

    async def is_file_processed(self, filename: str) -> bool:
        """Mock check if a file has been processed"""
        return filename in self.processed_files

    async def get_processed_files(self) -> List[str]:
        """Mock get the list of processed files"""
        return list(self.processed_files)

    async def store_file_data(self, filename: str, records: List) -> bool:
        """Mock store file data and mark the file as processed"""
        self.processed_files.add(filename)
        self.stored_files[filename] = records
        return True

    def mark_as_processed(self, filename: str):
        """Helper to mark a file as processed during test setup"""
        self.processed_files.add(filename)


class MockFTP:
    """Mock FTP client backed by an in-memory directory structure"""

    def __init__(self, directory_structure):
        self.directory_structure = directory_structure
        self.current_dir = '/'

    def login(self, user='', passwd=''):
        """No-op stub for the connection handshake the monitor may perform"""
        pass

    def quit(self):
        """No-op stub for connection teardown"""
        pass

    def pwd(self):
        return self.current_dir

    def cwd(self, path):
        if path in self.directory_structure:
            self.current_dir = path
        else:
            raise Exception(f"Directory not found: {path}")

    def retrlines(self, command, callback):
        """Mock LIST command: emit one Unix-style listing line per file"""
        if not command.startswith('LIST'):
            raise Exception(f"Unsupported command: {command}")

        current_struct = self.directory_structure.get(self.current_dir, {})

        # Emit files in the current directory
        for filename in current_struct.get('files', []):
            callback(f"-rw-r--r-- 1 user group 1024 Jan 01 12:00 {filename}")

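# MockFTP expects a mapping of absolute paths to listings, e.g.
#   {'/': {'files': ['a.sgl_v2', 'b.sgl_v2']}}
# Only plain files are modelled; these tests run with recursive_scan
# disabled, so subdirectories are never listed.
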
async def test_skip_already_processed_files():
    """Test that already processed files are skipped"""
    print("🧪 Testing skip already processed files")
    print("-" * 40)

    # Create mock directory with files
    directory_structure = {
        '/': {
            'files': ['file1.sgl_v2', 'file2.sgl_v2', 'file3.sgl_v2']
        }
    }

    # Create mock database with some files already processed
    mock_db = MockDatabaseManager()
    mock_db.mark_as_processed('file1.sgl_v2')  # Already processed
    mock_db.mark_as_processed('file3.sgl_v2')  # Already processed
    # file2.sgl_v2 is NOT processed

    with patch('ftp_monitor.FTP_CONFIG', {
        'host': 'test.example.com',
        'username': 'testuser',
        'password': 'testpass',
        'base_path': '/',
        'check_interval': 3600,
        'recursive_scan': False,
        'max_recursion_depth': 5
    }):
        # Create FTP monitor with mock database
        monitor = FTPMonitor(mock_db)

        # Initialize cache from database
        cache_count = await monitor.initialize_processed_files_cache()
        print(f" Loaded {cache_count} files from database cache")

        # Verify cache was loaded correctly
        assert cache_count == 2, f"Expected 2 cached files, got {cache_count}"
        assert 'file1.sgl_v2' in monitor.processed_files
        assert 'file3.sgl_v2' in monitor.processed_files
        assert 'file2.sgl_v2' not in monitor.processed_files

        mock_ftp = MockFTP(directory_structure)

        # Mock the _process_file method to track which files are processed
        processed_files = []

        async def mock_process_file(ftp, file_info):
            processed_files.append(file_info.name)
            return True

        monitor._process_file = mock_process_file

        # Run the check with the FTP client patched to the mock (assumes
        # ftp_monitor imports FTP from ftplib; adjust the target otherwise)
        with patch('ftp_monitor.FTP', return_value=mock_ftp):
            result = await monitor.check_for_new_files()

        print("✅ Processing complete")
        print(f" Files found: {result['files_found']}")
        print(f" Files processed: {result['files_processed']}")
        print(f" Files skipped: {result['files_skipped']}")

        # Verify results
        assert result['files_found'] == 3, "Should find 3 files total"
        assert result['files_processed'] == 1, "Should process only 1 new file"
        assert result['files_skipped'] == 2, "Should skip 2 already processed files"

        # Verify only file2.sgl_v2 was processed
        assert len(processed_files) == 1, f"Expected 1 processed file, got {len(processed_files)}"
        assert 'file2.sgl_v2' in processed_files, "Should process file2.sgl_v2"

        print("✅ Skip already processed files test passed")


async def test_database_lookup_fallback():
    """Test that database lookup works when the cache misses"""
    print("\n🧪 Testing database lookup fallback")
    print("-" * 40)

    # Create mock directory with files
    directory_structure = {
        '/': {
            'files': ['new_file.sgl_v2', 'db_only_file.sgl_v2']
        }
    }

    # Create mock database
    mock_db = MockDatabaseManager()
    # Simulate a file that exists in the database but not in the cache
    mock_db.mark_as_processed('db_only_file.sgl_v2')

    with patch('ftp_monitor.FTP_CONFIG', {
        'host': 'test.example.com',
        'username': 'testuser',
        'password': 'testpass',
        'base_path': '/',
        'check_interval': 3600,
        'recursive_scan': False,
        'max_recursion_depth': 5
    }):
        monitor = FTPMonitor(mock_db)

        # Don't initialize the cache: simulate starting with an empty cache
        # while the database already has processed files

        mock_ftp = MockFTP(directory_structure)

        # Mock the _process_file method
        processed_files = []

        async def mock_process_file(ftp, file_info):
            processed_files.append(file_info.name)
            return True

        monitor._process_file = mock_process_file

        # Run the check with the FTP client patched to the mock (assumes
        # ftp_monitor imports FTP from ftplib; adjust the target otherwise)
        with patch('ftp_monitor.FTP', return_value=mock_ftp):
            result = await monitor.check_for_new_files()

        print("✅ Database fallback test complete")
        print(f" Files found: {result['files_found']}")
        print(f" Files processed: {result['files_processed']}")
        print(f" Files skipped: {result['files_skipped']}")

        # Verify results
        assert result['files_found'] == 2, "Should find 2 files total"
        assert result['files_processed'] == 1, "Should process only 1 new file"
        assert result['files_skipped'] == 1, "Should skip 1 database-processed file"

        # Verify only new_file.sgl_v2 was processed
        assert len(processed_files) == 1, f"Expected 1 processed file, got {len(processed_files)}"
        assert 'new_file.sgl_v2' in processed_files, "Should process new_file.sgl_v2"

        # Verify the cache was updated with the database file
        assert 'db_only_file.sgl_v2' in monitor.processed_files, "Cache should be updated with database file"

        print("✅ Database lookup fallback test passed")


async def test_cache_initialization():
    """Test that the cache is properly initialized from the database"""
    print("\n🧪 Testing cache initialization")
    print("-" * 35)

    # Create mock database with processed files
    mock_db = MockDatabaseManager()
    mock_db.mark_as_processed('old_file1.sgl_v2')
    mock_db.mark_as_processed('old_file2.sgl_v2')
    mock_db.mark_as_processed('old_file3.sgl_v2')

    with patch('ftp_monitor.FTP_CONFIG', {
        'host': 'test.example.com',
        'username': 'testuser',
        'password': 'testpass',
        'base_path': '/',
        'check_interval': 3600,
        'recursive_scan': False,
        'max_recursion_depth': 5
    }):
        monitor = FTPMonitor(mock_db)

        # Verify cache starts empty
        assert len(monitor.processed_files) == 0, "Cache should start empty"

        # Initialize cache
        cache_count = await monitor.initialize_processed_files_cache()

        print(f"✅ Cache initialized with {cache_count} files")

        # Verify cache is populated
        assert cache_count == 3, f"Expected 3 cached files, got {cache_count}"
        assert len(monitor.processed_files) == 3, "Cache should contain 3 files"

        expected_files = {'old_file1.sgl_v2', 'old_file2.sgl_v2', 'old_file3.sgl_v2'}
        assert monitor.processed_files == expected_files, "Cache should contain expected files"

        print("✅ Cache initialization test passed")


async def test_performance_with_many_processed_files():
    """Test performance with many already processed files"""
    print("\n🧪 Testing performance with many processed files")
    print("-" * 50)

    # Create many files, mostly already processed
    all_files = [f"file_{i:04d}.sgl_v2" for i in range(100)]
    new_files = [f"new_file_{i}.sgl_v2" for i in range(3)]

    directory_structure = {
        '/': {
            'files': all_files + new_files
        }
    }

    # Create mock database with most files already processed
    mock_db = MockDatabaseManager()
    for filename in all_files:
        mock_db.mark_as_processed(filename)

    with patch('ftp_monitor.FTP_CONFIG', {
        'host': 'test.example.com',
        'username': 'testuser',
        'password': 'testpass',
        'base_path': '/',
        'check_interval': 3600,
        'recursive_scan': False,
        'max_recursion_depth': 5
    }):
        monitor = FTPMonitor(mock_db)

        # Initialize cache
        cache_count = await monitor.initialize_processed_files_cache()
        print(f" Loaded {cache_count} files into cache")

        mock_ftp = MockFTP(directory_structure)

        # Track which files get processed and how often the database is hit
        processed_files = []
        db_lookups = 0

        original_is_file_processed = mock_db.is_file_processed

        async def tracked_is_file_processed(filename):
            nonlocal db_lookups
            db_lookups += 1
            return await original_is_file_processed(filename)

        mock_db.is_file_processed = tracked_is_file_processed

        async def mock_process_file(ftp, file_info):
            processed_files.append(file_info.name)
            return True

        monitor._process_file = mock_process_file

        # Run the check with the FTP client patched to the mock (assumes
        # ftp_monitor imports FTP from ftplib; adjust the target otherwise)
        with patch('ftp_monitor.FTP', return_value=mock_ftp):
            result = await monitor.check_for_new_files()

        print("✅ Performance test complete")
        print(f" Files found: {result['files_found']}")
        print(f" Files processed: {result['files_processed']}")
        print(f" Files skipped: {result['files_skipped']}")
        print(f" Database lookups: {db_lookups}")

        # Verify results
        assert result['files_found'] == 103, "Should find 103 files total"
        assert result['files_processed'] == 3, "Should process only 3 new files"
        assert result['files_skipped'] == 100, "Should skip 100 already processed files"

        # Verify performance: cache hits avoid the database entirely, so only
        # the cache misses (the new files) should trigger lookups
        assert db_lookups == 3, f"Should have only 3 database lookups (for new files), got {db_lookups}"

        # Verify only new files were processed
        assert len(processed_files) == 3, f"Expected 3 processed files, got {len(processed_files)}"
        for new_file in new_files:
            assert new_file in processed_files, f"Should process {new_file}"

        print("✅ Performance test passed")


async def main():
    """Main test function"""
    print("🚀 Database Skip Functionality Test Suite")
    print("=" * 50)

    try:
        await test_skip_already_processed_files()
        await test_database_lookup_fallback()
        await test_cache_initialization()
        await test_performance_with_many_processed_files()

        print("\n" + "=" * 50)
        print("✅ All database skip tests passed!")
        print("💾 File duplication prevention is working correctly")
        print("🚀 Performance optimizations are effective")

    except Exception as e:
        print(f"\n❌ Test failed: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    asyncio.run(main())