#!/usr/bin/env python3
"""
Test script to validate the layered architecture structure.

This script checks the structure without requiring all dependencies to be installed.
"""

import ast
import os
import sys
from pathlib import Path


def check_file_structure():
    """Check that every expected file of the layered structure is present.

    Paths are resolved relative to the current working directory. One status
    line is printed per expected entry: the size in bytes when the path is a
    regular file, ``MISSING`` otherwise. A directory that merely carries the
    name of an expected file is reported as missing.

    Returns:
        bool: True when every expected path is an existing regular file,
        False otherwise.
    """
    expected_structure = {
        "layers/__init__.py": "Layers package init",
        "layers/infrastructure/__init__.py": "Infrastructure layer init",
        "layers/infrastructure/database_connection.py": "Database connection management",
        "layers/infrastructure/redis_connection.py": "Redis connection management",
        "layers/infrastructure/repositories.py": "Data access layer",
        "layers/business/__init__.py": "Business layer init",
        "layers/business/sensor_service.py": "Sensor business logic",
        "layers/business/room_service.py": "Room business logic",
        "layers/business/analytics_service.py": "Analytics business logic",
        "layers/business/cleanup_service.py": "Cleanup business logic",
        "layers/presentation/__init__.py": "Presentation layer init",
        "layers/presentation/websocket_handler.py": "WebSocket management",
        "layers/presentation/redis_subscriber.py": "Redis pub/sub handling",
        "layers/presentation/api_routes.py": "API route definitions",
        "main_layered.py": "Main application with layered architecture",
        "models.py": "Data models (existing)",
    }

    print("🔍 Checking layered architecture file structure...")
    print("=" * 60)

    all_files_exist = True
    for file_path, description in expected_structure.items():
        full_path = Path(file_path)
        # is_file() rather than exists(): a directory named like an expected
        # module must not be accepted as that module.
        if full_path.is_file():
            size = full_path.stat().st_size
            print(f"✅ {file_path:<40} ({size:,} bytes) - {description}")
        else:
            print(f"❌ {file_path:<40} MISSING - {description}")
            all_files_exist = False

    print("=" * 60)

    if all_files_exist:
        print("🎉 All files in layered structure exist!")
    else:
        print("❌ Some files are missing from the layered structure")
    return all_files_exist
def check_import_structure():
    """Check that no layer imports from a layer it must not depend on.

    Each listed file that exists (relative to the current working directory)
    is read as UTF-8 and parsed with ``ast``; nothing is imported or
    executed. Both relative (``from ..business import x``) and absolute
    (``from layers.business import x``, ``import layers.business``) imports
    are recognised. Text inside comments or strings is ignored.

    Files that do not exist are skipped silently. Files that cannot be read
    or parsed are reported with a warning and are not checked.

    Returns:
        bool: True when no forbidden import was found, False otherwise.
    """
    print("\n📋 Analyzing import dependencies...")
    print("=" * 60)

    # Define expected dependencies by layer
    layer_dependencies = {
        "Infrastructure Layer": {
            "files": [
                "layers/infrastructure/database_connection.py",
                "layers/infrastructure/redis_connection.py",
                "layers/infrastructure/repositories.py",
            ],
            "can_import_from": ["models", "external libraries"],
            "should_not_import_from": ["business", "presentation"],
        },
        "Business Layer": {
            "files": [
                "layers/business/sensor_service.py",
                "layers/business/room_service.py",
                "layers/business/analytics_service.py",
                "layers/business/cleanup_service.py",
            ],
            "can_import_from": ["models", "infrastructure", "external libraries"],
            "should_not_import_from": ["presentation"],
        },
        "Presentation Layer": {
            "files": [
                "layers/presentation/websocket_handler.py",
                "layers/presentation/redis_subscriber.py",
                "layers/presentation/api_routes.py",
            ],
            "can_import_from": ["models", "business", "infrastructure", "external libraries"],
            "should_not_import_from": [],
        },
    }

    violations = []

    for layer_name, layer_info in layer_dependencies.items():
        print(f"\n{layer_name}:")

        for file_path in layer_info["files"]:
            path = Path(file_path)
            if not path.exists():
                continue

            try:
                imported = _imported_layers(path.read_text(encoding="utf-8"))
            except (OSError, SyntaxError, ValueError) as e:
                # ValueError also covers UnicodeDecodeError.
                print(f" ⚠️ {path.name} - Could not analyze: {e}")
                continue

            offending = [
                forbidden
                for forbidden in layer_info["should_not_import_from"]
                if forbidden in imported
            ]
            for forbidden in offending:
                violations.append(f"{file_path} imports from {forbidden} layer (violation)")

            # Only mark the file as clean when it really is clean.
            marker = "❌" if offending else "✅"
            print(f" {marker} {path.name}")

    if violations:
        print(f"\n❌ Found {len(violations)} layering violations:")
        for violation in violations:
            print(f" - {violation}")
        return False

    print("\n✅ No layering violations detected!")
    return True


def _imported_layers(source):
    """Return the names of the ``layers`` sub-packages imported by *source*.

    *source* is the text of a module located at ``layers/<layer>/<file>.py``,
    so a two-dot relative import resolves against the ``layers`` package.
    Single-dot imports stay inside the module's own layer and are ignored.

    Raises:
        SyntaxError: if *source* is not valid Python.
        ValueError: if *source* contains null bytes.
    """
    found = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            dotted_names = [alias.name for alias in node.names]
        elif isinstance(node, ast.ImportFrom) and node.level in (0, 2):
            prefix = node.module or ""
            if node.level == 2:
                # "from ..x import y" and "from .. import x" both resolve
                # against the "layers" package.
                prefix = f"layers.{prefix}" if prefix else "layers"
            dotted_names = [f"{prefix}.{alias.name}" for alias in node.names]
        else:
            continue

        for dotted in dotted_names:
            parts = dotted.split(".")
            if len(parts) > 1 and parts[0] == "layers":
                found.add(parts[1])
    return found
def analyze_code_separation():
    """Print per-layer file and line counts for the layered structure.

    For each layer directory (relative to the current working directory)
    every ``*.py`` file directly inside it, except ``__init__.py``, is
    counted and its lines are summed. Files are decoded leniently so that a
    stray non-UTF-8 byte does not drop the file from the line total. A file
    that cannot be opened or read is reported with a warning and contributes
    zero lines, but is still included in the file count.

    Returns:
        bool: Always True; this step is informational only.
    """
    print("\n📊 Analyzing code separation...")
    print("=" * 60)

    analysis = {
        "Infrastructure Layer": {
            "responsibilities": ["Database connections", "Redis connections", "Data repositories"],
            "file_count": 0,
            "total_lines": 0,
        },
        "Business Layer": {
            "responsibilities": ["Business logic", "Data processing", "Analytics", "Cleanup"],
            "file_count": 0,
            "total_lines": 0,
        },
        "Presentation Layer": {
            "responsibilities": ["HTTP endpoints", "WebSocket handling", "Request/Response"],
            "file_count": 0,
            "total_lines": 0,
        },
    }

    layer_paths = {
        "Infrastructure Layer": "layers/infrastructure/",
        "Business Layer": "layers/business/",
        "Presentation Layer": "layers/presentation/",
    }

    for layer_name, layer_path in layer_paths.items():
        layer_dir = Path(layer_path)
        if not layer_dir.exists():
            continue

        py_files = [f for f in layer_dir.glob("*.py") if f.name != "__init__.py"]

        total_lines = 0
        for py_file in py_files:
            try:
                # errors="replace": only the number of lines matters here, so
                # undecodable bytes must not abort the count.
                with open(py_file, encoding="utf-8", errors="replace") as f:
                    total_lines += sum(1 for _ in f)
            except OSError as e:
                print(f" ⚠️ {py_file.name} - Could not count lines: {e}")

        analysis[layer_name]["file_count"] = len(py_files)
        analysis[layer_name]["total_lines"] = total_lines

    for layer_name, info in analysis.items():
        print(f"\n{layer_name}:")
        print(f" Files: {info['file_count']}")
        print(f" Lines of Code: {info['total_lines']:,}")
        print(f" Responsibilities: {', '.join(info['responsibilities'])}")

    total_files = sum(info["file_count"] for info in analysis.values())
    total_lines = sum(info["total_lines"] for info in analysis.values())

    print("\n📈 Total Separation Metrics:")
    print(f" Total Files: {total_files}")
    print(f" Total Lines: {total_lines:,}")
    print(" Layers: 3 (Infrastructure, Business, Presentation)")

    return True
def main():
    """Run all validation steps and print the overall verdict.

    Returns:
        bool: True when every step succeeded, False otherwise.
    """
    divider = "=" * 60

    print("🏗️ LAYERED ARCHITECTURE VALIDATION")
    print(divider)

    # Every step runs unconditionally so that each report is printed even
    # when an earlier step has already failed.
    outcomes = [
        check_file_structure(),    # Check file structure
        check_import_structure(),  # Check import structure
        analyze_code_separation(), # Analyze code separation
    ]
    passed = all(outcomes)

    print("\n" + divider)

    if not passed:
        print("❌ VALIDATION FAILED - Issues found in layered architecture")
        return False

    summary_lines = (
        "🎉 VALIDATION SUCCESSFUL - Layered architecture is properly structured!",
        "\n✨ Key Benefits Achieved:",
        " • Clear separation of concerns",
        " • Infrastructure isolated from business logic",
        " • Business logic separated from presentation",
        " • Easy to test individual layers",
        " • Maintainable and scalable structure",
    )
    for line in summary_lines:
        print(line)
    return True
if __name__ == "__main__":
    # Exit status 0 when validation succeeded, 1 otherwise.
    sys.exit(int(not main()))