""" Battery management service implementation """ import asyncio from datetime import datetime, timedelta from typing import Dict, List, Optional, Any from motor.motor_asyncio import AsyncIOMotorDatabase import redis.asyncio as redis import logging import json from models import BatteryState, BatteryType, MaintenanceAlert logger = logging.getLogger(__name__) class BatteryService: """Service for managing battery operations and monitoring""" def __init__(self, db: AsyncIOMotorDatabase, redis_client: redis.Redis): self.db = db self.redis = redis_client self.batteries_collection = db.batteries self.battery_history_collection = db.battery_history self.maintenance_alerts_collection = db.maintenance_alerts async def get_batteries(self) -> List[Dict[str, Any]]: """Get all registered batteries""" cursor = self.batteries_collection.find({}) batteries = [] async for battery in cursor: battery["_id"] = str(battery["_id"]) # Convert datetime fields to ISO format for field in ["installed_date", "last_maintenance", "next_maintenance", "last_updated"]: if field in battery and battery[field]: battery[field] = battery[field].isoformat() batteries.append(battery) return batteries async def get_battery_status(self, battery_id: str) -> Optional[Dict[str, Any]]: """Get current status of a specific battery""" # First try to get from Redis cache cached_status = await self.redis.get(f"battery:status:{battery_id}") if cached_status: return json.loads(cached_status) # Fall back to database battery = await self.batteries_collection.find_one({"battery_id": battery_id}) if battery: battery["_id"] = str(battery["_id"]) # Convert datetime fields for field in ["installed_date", "last_maintenance", "next_maintenance", "last_updated"]: if field in battery and battery[field]: battery[field] = battery[field].isoformat() # Cache the result await self.redis.setex( f"battery:status:{battery_id}", 300, # 5 minutes TTL json.dumps(battery, default=str) ) return battery return None async def charge_battery(self, battery_id: str, power_kw: float, duration_minutes: Optional[int] = None) -> Dict[str, Any]: """Initiate battery charging""" battery = await self.get_battery_status(battery_id) if not battery: return {"success": False, "error": "Battery not found"} # Check if battery can accept charge current_soc = battery.get("state_of_charge", 0) max_charge_power = battery.get("max_charge_power_kw", 0) if current_soc >= 100: return {"success": False, "error": "Battery is already fully charged"} if power_kw > max_charge_power: return {"success": False, "error": f"Requested power ({power_kw} kW) exceeds maximum charge power ({max_charge_power} kW)"} # Update battery state now = datetime.utcnow() update_data = { "state": BatteryState.CHARGING.value, "current_power_kw": power_kw, "last_updated": now } if duration_minutes: update_data["charging_until"] = now + timedelta(minutes=duration_minutes) await self.batteries_collection.update_one( {"battery_id": battery_id}, {"$set": update_data} ) # Clear cache await self.redis.delete(f"battery:status:{battery_id}") # Log the charging event await self._log_battery_event(battery_id, "charging_started", { "power_kw": power_kw, "duration_minutes": duration_minutes }) # Publish event to Redis for real-time updates await self.redis.publish("battery_events", json.dumps({ "event": "charging_started", "battery_id": battery_id, "power_kw": power_kw, "timestamp": now.isoformat() })) return { "success": True, "estimated_completion": (now + timedelta(minutes=duration_minutes)).isoformat() if duration_minutes else None } async def discharge_battery(self, battery_id: str, power_kw: float, duration_minutes: Optional[int] = None) -> Dict[str, Any]: """Initiate battery discharging""" battery = await self.get_battery_status(battery_id) if not battery: return {"success": False, "error": "Battery not found"} # Check if battery can discharge current_soc = battery.get("state_of_charge", 0) max_discharge_power = battery.get("max_discharge_power_kw", 0) if current_soc <= 0: return {"success": False, "error": "Battery is already empty"} if power_kw > max_discharge_power: return {"success": False, "error": f"Requested power ({power_kw} kW) exceeds maximum discharge power ({max_discharge_power} kW)"} # Update battery state now = datetime.utcnow() update_data = { "state": BatteryState.DISCHARGING.value, "current_power_kw": -power_kw, # Negative for discharging "last_updated": now } if duration_minutes: update_data["discharging_until"] = now + timedelta(minutes=duration_minutes) await self.batteries_collection.update_one( {"battery_id": battery_id}, {"$set": update_data} ) # Clear cache await self.redis.delete(f"battery:status:{battery_id}") # Log the discharging event await self._log_battery_event(battery_id, "discharging_started", { "power_kw": power_kw, "duration_minutes": duration_minutes }) # Publish event await self.redis.publish("battery_events", json.dumps({ "event": "discharging_started", "battery_id": battery_id, "power_kw": power_kw, "timestamp": now.isoformat() })) return { "success": True, "estimated_completion": (now + timedelta(minutes=duration_minutes)).isoformat() if duration_minutes else None } async def optimize_battery(self, battery_id: str, target_soc: float) -> Dict[str, Any]: """Optimize battery charging/discharging to reach target SOC""" battery = await self.get_battery_status(battery_id) if not battery: return {"success": False, "error": "Battery not found"} current_soc = battery.get("state_of_charge", 0) capacity_kwh = battery.get("capacity_kwh", 0) # Calculate energy needed energy_difference_kwh = (target_soc - current_soc) / 100 * capacity_kwh if abs(energy_difference_kwh) < 0.1: # Within 0.1 kWh return {"message": "Battery is already at target SOC", "action": "none"} if energy_difference_kwh > 0: # Need to charge max_power = battery.get("max_charge_power_kw", 0) action = "charge" else: # Need to discharge max_power = battery.get("max_discharge_power_kw", 0) action = "discharge" energy_difference_kwh = abs(energy_difference_kwh) # Calculate optimal power and duration optimal_power = min(max_power, energy_difference_kwh * 2) # Conservative power level duration_hours = energy_difference_kwh / optimal_power duration_minutes = int(duration_hours * 60) # Execute the optimization if action == "charge": result = await self.charge_battery(battery_id, optimal_power, duration_minutes) else: result = await self.discharge_battery(battery_id, optimal_power, duration_minutes) return { "action": action, "power_kw": optimal_power, "duration_minutes": duration_minutes, "energy_difference_kwh": energy_difference_kwh, "result": result } async def get_battery_history(self, battery_id: str, hours: int = 24) -> List[Dict[str, Any]]: """Get historical data for a battery""" start_time = datetime.utcnow() - timedelta(hours=hours) cursor = self.battery_history_collection.find({ "battery_id": battery_id, "timestamp": {"$gte": start_time} }).sort("timestamp", -1) history = [] async for record in cursor: record["_id"] = str(record["_id"]) if "timestamp" in record: record["timestamp"] = record["timestamp"].isoformat() history.append(record) return history async def get_battery_analytics(self, hours: int = 24) -> Dict[str, Any]: """Get system-wide battery analytics""" start_time = datetime.utcnow() - timedelta(hours=hours) # Get all batteries batteries = await self.get_batteries() total_capacity = sum(b.get("capacity_kwh", 0) for b in batteries) total_stored = sum(b.get("stored_energy_kwh", 0) for b in batteries) active_count = sum(1 for b in batteries if b.get("state") != "error") # Aggregate historical data pipeline = [ {"$match": {"timestamp": {"$gte": start_time}}}, {"$group": { "_id": None, "total_energy_charged": {"$sum": {"$cond": [{"$gt": ["$power_kw", 0]}, {"$multiply": ["$power_kw", 0.5]}, 0]}}, # Approximate kWh "total_energy_discharged": {"$sum": {"$cond": [{"$lt": ["$power_kw", 0]}, {"$multiply": [{"$abs": "$power_kw"}, 0.5]}, 0]}}, "avg_efficiency": {"$avg": "$efficiency"} }} ] cursor = self.battery_history_collection.aggregate(pipeline) analytics_data = await cursor.to_list(length=1) if analytics_data: energy_data = analytics_data[0] else: energy_data = { "total_energy_charged": 0, "total_energy_discharged": 0, "avg_efficiency": 0.95 } # Calculate metrics average_soc = sum(b.get("state_of_charge", 0) for b in batteries) / len(batteries) if batteries else 0 average_health = sum(b.get("health_percentage", 100) for b in batteries) / len(batteries) if batteries else 100 return { "total_batteries": len(batteries), "active_batteries": active_count, "total_capacity_kwh": total_capacity, "total_stored_energy_kwh": total_stored, "average_soc": round(average_soc, 2), "total_energy_charged_kwh": round(energy_data["total_energy_charged"], 2), "total_energy_discharged_kwh": round(energy_data["total_energy_discharged"], 2), "net_energy_flow_kwh": round(energy_data["total_energy_charged"] - energy_data["total_energy_discharged"], 2), "round_trip_efficiency": round(energy_data.get("avg_efficiency", 0.95) * 100, 2), "capacity_utilization": round((total_stored / total_capacity * 100) if total_capacity > 0 else 0, 2), "average_health": round(average_health, 2), "batteries_needing_maintenance": sum(1 for b in batteries if b.get("health_percentage", 100) < 80) } async def update_battery_status(self, battery_id: str): """Update battery status with simulated or real data""" # This would typically connect to actual battery management systems # For now, we'll simulate some basic updates battery = await self.get_battery_status(battery_id) if not battery: return now = datetime.utcnow() current_power = battery.get("current_power_kw", 0) current_soc = battery.get("state_of_charge", 50) capacity = battery.get("capacity_kwh", 100) # Simulate SOC changes based on power flow if current_power != 0: # Convert power to SOC change (simplified) soc_change = (current_power * 0.5) / capacity * 100 # 0.5 hour interval new_soc = max(0, min(100, current_soc + soc_change)) # Calculate stored energy stored_energy = new_soc / 100 * capacity # Update database await self.batteries_collection.update_one( {"battery_id": battery_id}, { "$set": { "state_of_charge": round(new_soc, 2), "stored_energy_kwh": round(stored_energy, 2), "last_updated": now } } ) # Log historical data await self.battery_history_collection.insert_one({ "battery_id": battery_id, "timestamp": now, "state_of_charge": new_soc, "power_kw": current_power, "stored_energy_kwh": stored_energy, "efficiency": battery.get("efficiency", 0.95) }) # Clear cache await self.redis.delete(f"battery:status:{battery_id}") async def check_maintenance_alerts(self): """Check for batteries needing maintenance""" batteries = await self.get_batteries() for battery in batteries: alerts = [] # Check health health = battery.get("health_percentage", 100) if health < 70: alerts.append({ "alert_type": "health", "severity": "critical", "message": f"Battery health is critically low at {health}%", "recommended_action": "Schedule immediate maintenance and consider replacement" }) elif health < 85: alerts.append({ "alert_type": "health", "severity": "warning", "message": f"Battery health is declining at {health}%", "recommended_action": "Schedule maintenance inspection" }) # Check cycles cycles = battery.get("cycles_completed", 0) max_cycles = battery.get("max_cycles", 5000) if cycles > max_cycles * 0.9: alerts.append({ "alert_type": "cycles", "severity": "warning", "message": f"Battery has completed {cycles}/{max_cycles} cycles", "recommended_action": "Plan for battery replacement" }) # Check scheduled maintenance next_maintenance = battery.get("next_maintenance") if next_maintenance and datetime.fromisoformat(next_maintenance.replace('Z', '+00:00')) < datetime.utcnow(): alerts.append({ "alert_type": "scheduled", "severity": "info", "message": "Scheduled maintenance is due", "recommended_action": "Perform scheduled maintenance procedures" }) # Save alerts to database for alert in alerts: alert_doc = { "battery_id": battery["battery_id"], "timestamp": datetime.utcnow(), **alert } # Check if alert already exists to avoid duplicates existing = await self.maintenance_alerts_collection.find_one({ "battery_id": battery["battery_id"], "alert_type": alert["alert_type"], "severity": alert["severity"] }) if not existing: await self.maintenance_alerts_collection.insert_one(alert_doc) async def _log_battery_event(self, battery_id: str, event_type: str, data: Dict[str, Any]): """Log battery events for auditing""" event_doc = { "battery_id": battery_id, "event_type": event_type, "timestamp": datetime.utcnow(), "data": data } await self.db.battery_events.insert_one(event_doc)