Organizes 11 projects for Cerbo GX/Venus OS into a single repository: - axiom-nmea: Raymarine LightHouse protocol decoder - dbus-generator-ramp: Generator current ramp controller - dbus-lightning: Blitzortung lightning monitor - dbus-meteoblue-forecast: Meteoblue weather forecast - dbus-no-foreign-land: noforeignland.com tracking - dbus-tides: Tide prediction from depth + harmonics - dbus-vrm-history: VRM cloud history proxy - dbus-windy-station: Windy.com weather upload - mfd-custom-app: MFD app deployment package - venus-html5-app: Custom Victron HTML5 app fork - watermaker: Watermaker PLC control UI Adds root README, .gitignore, project template, and per-project .gitignore files. Sensitive config files excluded via .gitignore with .example templates provided. Made-with: Cursor
935 lines
34 KiB
Python
935 lines
34 KiB
Python
"""
|
|
Ramp Controller for Generator Current Ramp Controller
|
|
|
|
Manages the current limit ramping with:
|
|
- Initial ramp from 40A to 50A over 30 minutes
|
|
- Fast recovery to near-overload point after overload detection
|
|
- Adaptive margins based on rapid overload detection
|
|
- Output power correlation learning for smarter limits
|
|
"""
|
|
|
|
import logging
|
|
import json
|
|
import os
|
|
from time import time
|
|
from dataclasses import dataclass, field
|
|
from typing import List, Optional, Tuple, Dict
|
|
|
|
from config import RAMP_CONFIG, LEARNING_CONFIG
|
|
|
|
|
|
@dataclass
class OverloadEvent:
    """Snapshot of the controller taken when an overload is handled."""

    # Time the overload was handled (seconds, same clock as time()).
    timestamp: float
    # Current limit (A) that was active when the overload hit.
    current_at_overload: float
    # Last current limit (A) the controller considered stable.
    last_stable_current: float
    # Output power (W) at the time of the overload; defaults to 0.0.
    output_power: float = 0.0
|
|
|
|
|
|
@dataclass
class LearningDataPoint:
    """One observation fed to the power correlation model."""

    # Time the observation was recorded.
    timestamp: float
    # Output power in Watts.
    output_power: float
    # Amps - highest input current regarded as stable for this observation.
    safe_input_current: float
    # True when the observation was recorded at an overload.
    was_overload: bool
|
|
|
|
|
|
@dataclass
class RampState:
    """Mutable state of an in-progress (or idle) current-limit ramp."""

    # Limit currently commanded by the controller.
    current_limit: float
    # Limit the active ramp is heading towards.
    target_limit: float
    # When the active ramp phase started; None while no ramp is running.
    ramp_start_time: Optional[float] = None
    # Current limit at the moment the active ramp phase started.
    ramp_start_current: Optional[float] = None
    last_stable_current: float = 40.0
    # True when the ramp is a recovery after an overload.
    is_recovery: bool = False

    # Fast recovery tracking
    # Target of the fast ramp phase; None when no fast phase applies.
    fast_ramp_target: Optional[float] = None
    in_fast_ramp: bool = False
|
|
|
|
|
|
class PowerCorrelationModel:
    """
    Learns the correlation between output power and maximum safe input current.

    Model: max_input_current = base + slope * output_power

    Key insight: When output loads are high, more input power passes through
    directly to loads (not through inverter/battery). When output loads are low,
    all input power goes to battery charging, stressing the inverter more.
    """

    # Number of overload points kept in memory and persisted to disk.
    MAX_OVERLOAD_POINTS = 20

    def __init__(self, config: Optional[Dict] = None):
        """
        Create the model from ``config`` (falls back to LEARNING_CONFIG) and
        load previously learned parameters from disk when available.
        """
        self.config = config or LEARNING_CONFIG
        self.logger = logging.getLogger('PowerModel')

        # Model parameters
        self.base_current = self.config['initial_base_current']
        self.slope = self.config['initial_slope']

        # Data points for learning
        self.data_points: List[LearningDataPoint] = []

        # Track overload points specifically: (output_power, input_current)
        self.overload_points: List[Tuple[float, float]] = []

        # Confidence level
        self.confidence = 0

        # Load saved model if exists
        self._load_model()

    def _model_path(self) -> str:
        """Path to saved model file"""
        return '/data/dbus-generator-ramp/learned_model.json'

    def _load_model(self):
        """
        Load saved model parameters.

        The file is parsed and validated completely before any attribute is
        touched, so a malformed file can never leave the model half-loaded
        (e.g. with a non-numeric slope). A missing file is not an error.
        """
        try:
            with open(self._model_path(), 'r') as f:
                data = json.load(f)

            base_current = float(data.get('base_current', self.base_current))
            slope = float(data.get('slope', self.slope))
            confidence = int(data.get('confidence', 0))
            overload_points = [
                (float(op), float(oc))
                for op, oc in data.get('overload_points', [])
            ]
        except FileNotFoundError:
            # Nothing has been learned yet - keep the configured defaults.
            return
        except Exception as e:
            # Best effort: a broken model file must never stop the controller.
            self.logger.warning(f"Could not load model: {e}")
            return

        self.base_current = base_current
        self.slope = slope
        self.confidence = confidence
        self.overload_points = overload_points
        self.logger.info(
            f"Loaded model: base={self.base_current:.1f}A, "
            f"slope={self.slope:.6f}, confidence={self.confidence}"
        )

    def _save_model(self):
        """
        Save model parameters to disk.

        The data is written to a temporary sibling file and then moved over
        the real file, so an interrupted write cannot truncate the existing
        model. Failures are logged and otherwise ignored.
        """
        path = self._model_path()
        tmp_path = f"{path}.tmp"
        try:
            os.makedirs(os.path.dirname(path), exist_ok=True)
            data = {
                'base_current': self.base_current,
                'slope': self.slope,
                'confidence': self.confidence,
                'overload_points': self.overload_points[-self.MAX_OVERLOAD_POINTS:],
            }
            with open(tmp_path, 'w') as f:
                json.dump(data, f, indent=2)
                f.flush()
                os.fsync(f.fileno())
            os.replace(tmp_path, path)
        except Exception as e:
            self.logger.warning(f"Could not save model: {e}")

    def _append_data_point(self, point) -> None:
        """Store a learning data point, keeping at most ``max_data_points``."""
        self.data_points.append(point)
        max_points = self.config['max_data_points']
        if len(self.data_points) > max_points:
            self.data_points = self.data_points[-max_points:]

    def record_stable_operation(self, output_power: float, input_current: float,
                                timestamp: Optional[float] = None):
        """
        Record a data point of stable operation.

        Args:
            output_power: Current output power in Watts
            input_current: Current input current limit in Amps
            timestamp: Time of recording (defaults to now)
        """
        if timestamp is None:
            timestamp = time()

        self._append_data_point(LearningDataPoint(
            timestamp=timestamp,
            output_power=output_power,
            safe_input_current=input_current,
            was_overload=False,
        ))

        self.confidence = min(self.confidence + 1, self.config['max_data_points'])

    def record_overload(self, output_power: float, input_current: float,
                        timestamp: Optional[float] = None):
        """
        Record an overload event - this is valuable learning data.

        Args:
            output_power: Output power at time of overload
            input_current: Input current when overload occurred
            timestamp: Time of overload (defaults to now)
        """
        if timestamp is None:
            timestamp = time()

        # Record as data point; overload means this current was too high.
        self._append_data_point(LearningDataPoint(
            timestamp=timestamp,
            output_power=output_power,
            safe_input_current=input_current - 1.0,
            was_overload=True,
        ))

        # Track overload point
        self.overload_points.append((output_power, input_current))
        if len(self.overload_points) > self.MAX_OVERLOAD_POINTS:
            self.overload_points = self.overload_points[-self.MAX_OVERLOAD_POINTS:]

        # Update model based on this overload
        self._update_model_from_overload(output_power, input_current)

        self.confidence = min(self.confidence + 2, self.config['max_data_points'])
        self._save_model()

        self.logger.info(
            f"Recorded overload: output={output_power:.0f}W, input={input_current:.1f}A"
        )

    def _update_model_from_overload(self, output_power: float, overload_current: float):
        """
        Update model parameters based on overload event.
        Uses gradient descent-like update.

        The error is measured against the parametric model only (base, slope
        and zone offset). Recorded overload points are deliberately ignored
        here: the overload that triggered this update is already in that list
        and would clamp the prediction below the overload current, so the
        model would never be seen as too optimistic and would never learn.
        """
        lr = self.config['learning_rate']

        # What the parametric model believed was safe at this output power.
        predicted_max = self._model_prediction(output_power)

        # The actual max should be below overload point
        actual_max = overload_current - 1.0

        error = predicted_max - actual_max
        too_optimistic = error > 0

        if too_optimistic:
            # Reduce base_current more aggressively
            self.base_current -= lr * error * 0.8

            # Overload at high output - slope might be too high
            if output_power > 3000:
                self.slope -= lr * 0.0001

        # Clamp to reasonable values
        # Base is the low-output (0-1500W) achievable limit, typically 45A
        # Allow learning to adjust slightly but keep within reasonable bounds
        self.base_current = max(40.0, min(48.0, self.base_current))
        self.slope = max(0.0, min(0.003, self.slope))

        if too_optimistic:
            # Logged after clamping so the message shows the values in effect.
            self.logger.info(
                f"Model update: base={self.base_current:.1f}A, slope={self.slope:.6f}"
            )

    def _model_prediction(self, output_power: float) -> float:
        """Linear model prediction plus the offset of the matching power zone."""
        predicted = self.base_current + (self.slope * output_power)

        # Apply zone-based adjustments (first matching zone wins)
        for min_p, max_p, offset in self.config['power_zones'].values():
            if min_p <= output_power < max_p:
                predicted += offset
                break

        return predicted

    def predict_max_current(self, output_power: float) -> float:
        """
        Predict the maximum safe input current for given output power.

        Args:
            output_power: Current output power in Watts

        Returns:
            Predicted maximum safe input current in Amps
        """
        predicted = self._model_prediction(output_power)

        # Check against recent overload points
        for op, oc in self.overload_points:
            if abs(op - output_power) < 500:  # Similar output power
                # Be conservative - use lower of predicted and (overload - margin)
                predicted = min(predicted, oc - 3.0)

        return predicted

    def get_confidence_level(self) -> str:
        """Get confidence level as string ('LOW', 'MEDIUM' or 'HIGH')"""
        if self.confidence < 3:
            return 'LOW'
        if self.confidence < self.config['min_confidence']:
            return 'MEDIUM'
        return 'HIGH'

    def should_use_model(self) -> bool:
        """Check if model has enough confidence to be used"""
        return self.confidence >= self.config['min_confidence']

    def get_status(self) -> Dict:
        """Get model status for monitoring"""
        return {
            'base_current': self.base_current,
            'slope': self.slope,
            'confidence': self.confidence,
            'confidence_level': self.get_confidence_level(),
            'data_points': len(self.data_points),
            'overload_points': len(self.overload_points),
        }
|
|
|
|
|
|
class RampController:
    """
    Manages current limit ramping with overload protection.

    Behavior:
    - Normal ramp: 40A -> 50A over 30 minutes
    - On overload: Return to 40A, wait cooldown, then FAST ramp to near-overload
    - Fast recovery: Quickly ramp to (overload_point - 4A), then slow ramp to target
    - Rapid overload: If overload again within 2 minutes, use larger margin (6A)
    - Learning: Track output power correlation to predict safe limits
    """

    def __init__(self, config=None):
        """Create a controller from ``config`` (falls back to RAMP_CONFIG)."""
        self.config = config or RAMP_CONFIG
        self.logger = logging.getLogger('RampController')

        # Current state
        self.state = self._initial_state()

        # Overload history
        self.overload_history: List[OverloadEvent] = []

        # Recovery target (may be lower than target_current after overloads)
        self.recovery_target = self.config['target_current']

        # Timestamp tracking
        self.last_update_time = None
        self.stable_since = None

        # Fast recovery tracking
        self.rapid_overload_count = 0  # Count of rapid successive overloads
        self.last_overload_time = None

        # Current output power (updated externally)
        self.current_output_power = 0.0

        # Output power at last overload (for detecting significant increases)
        self.output_power_at_last_overload = 0.0
        self.last_target_reevaluation = None

        # Power correlation learning
        self.power_model = PowerCorrelationModel(LEARNING_CONFIG)
        self.learning_config = LEARNING_CONFIG

    def _initial_state(self):
        """Build the ramp state used at construction time and after reset()."""
        return RampState(
            current_limit=self.config['initial_current'],
            target_limit=self.config['target_current'],
            last_stable_current=self.config['initial_current'],
        )

    @property
    def current_limit(self) -> float:
        """Get current limit value"""
        return self.state.current_limit

    @property
    def is_ramping(self) -> bool:
        """Check if currently ramping"""
        return self.state.ramp_start_time is not None

    def set_output_power(self, power: float):
        """Update current output power reading"""
        self.current_output_power = power

    def set_initial_limit(self, current_setting: float) -> float:
        """
        Called when generator enters warm-up.
        Always sets limit to initial value for a known starting state.

        Args:
            current_setting: The limit currently configured (used for logging)

        Returns:
            The initial current limit that is now in effect
        """
        initial = self.config['initial_current']

        if current_setting != initial:
            self.logger.info(
                f"Generator warm-up: setting limit from {current_setting}A to {initial}A"
            )
        else:
            self.logger.info(
                f"Generator warm-up: confirming limit at {initial}A"
            )

        self.state.current_limit = initial
        return initial

    def start_ramp(self, timestamp: Optional[float] = None):
        """
        Start ramping from current position.

        With overload history present this is a recovery ramp that starts at
        initial_current and heads for recovery_target (optionally with a fast
        first phase). Otherwise it is an initial ramp from the current limit
        to target_current.
        """
        if timestamp is None:
            timestamp = time()

        state = self.state
        state.ramp_start_time = timestamp
        state.is_recovery = len(self.overload_history) > 0

        if state.is_recovery:
            # Recovery after cooldown - we rolled back to initial_current
            state.ramp_start_current = self.config['initial_current']
            state.target_limit = self.recovery_target

            # Calculate fast ramp target
            self._calculate_fast_recovery_target()

            if state.fast_ramp_target and state.fast_ramp_target > self.config['initial_current']:
                state.in_fast_ramp = True
                self.logger.info(
                    f"Starting FAST recovery: {state.ramp_start_current}A -> "
                    f"{state.fast_ramp_target:.1f}A (fast), then -> "
                    f"{self.recovery_target}A (slow)"
                )
            else:
                state.in_fast_ramp = False
                self.logger.info(
                    f"Starting RECOVERY ramp to {self.recovery_target}A at "
                    f"{self.config['recovery_ramp_rate']} A/min"
                )
        else:
            # Initial ramp or retry after stable - start from current position
            state.ramp_start_current = state.current_limit
            state.target_limit = self.config['target_current']
            state.fast_ramp_target = None
            state.in_fast_ramp = False
            self.logger.info(
                f"Starting INITIAL ramp from {state.ramp_start_current}A to "
                f"{state.target_limit}A at {self.config['initial_ramp_rate']} A/min"
            )

        self.last_update_time = timestamp

    def _calculate_fast_recovery_target(self):
        """
        Calculate the target for fast recovery phase.

        Logic:
        - Base: (overload_current - fast_recovery_margin)
        - If rapid overload: add extra margin
        - Consider power model prediction if confident

        The result is stored in ``state.fast_ramp_target`` (None when there is
        no overload history).
        """
        if not self.overload_history:
            self.state.fast_ramp_target = None
            return

        overload_current = self.overload_history[-1].current_at_overload

        # Base margin
        margin = self.config['fast_recovery_margin']

        # Add extra margin for rapid overloads
        if self.rapid_overload_count > 0:
            margin += self.config['rapid_overload_extra_margin'] * self.rapid_overload_count
            self.logger.info(
                f"Rapid overload #{self.rapid_overload_count}: using margin of {margin}A"
            )

        fast_target = overload_current - margin

        # Consider power model prediction
        if self.power_model.should_use_model():
            predicted_max = self.power_model.predict_max_current(self.current_output_power)
            # Use the more conservative of the two
            if predicted_max < fast_target:
                self.logger.info(
                    f"Power model suggests max {predicted_max:.1f}A at "
                    f"{self.current_output_power:.0f}W output"
                )
                fast_target = min(fast_target, predicted_max - 1.0)

        # Don't fast-ramp below initial_current
        fast_target = max(fast_target, self.config['initial_current'] + 1.0)

        # Don't fast-ramp above recovery target
        fast_target = min(fast_target, self.recovery_target - 1.0)

        self.state.fast_ramp_target = round(fast_target * 2) / 2  # Round to 0.5A

        self.logger.info(
            f"Fast recovery target: {self.state.fast_ramp_target}A "
            f"(overload was at {overload_current}A, margin {margin}A)"
        )

    def update(self, timestamp: Optional[float] = None) -> Optional[float]:
        """
        Calculate and return the new current limit based on elapsed time.

        Returns:
            The new limit (rounded to 0.5A) when it differs from the current
            limit by at least 0.5A, otherwise None. None is also returned when
            no ramp is running.
        """
        if timestamp is None:
            timestamp = time()

        state = self.state
        if state.ramp_start_time is None:
            return None

        elapsed = timestamp - state.ramp_start_time
        fast_phase = bool(state.in_fast_ramp and state.fast_ramp_target)

        # Determine rate of the current phase (A/min)
        if fast_phase:
            rate = self.config['fast_ramp_rate']
        elif state.is_recovery:
            rate = self.config['recovery_ramp_rate']
        else:
            rate = self.config['initial_ramp_rate']

        # Calculate new limit from ramp start position
        start_current = state.ramp_start_current or self.config['initial_current']
        new_limit = start_current + (elapsed * rate / 60.0)

        # Check if we've completed fast ramp phase
        if fast_phase and new_limit >= state.fast_ramp_target:
            self.logger.info(
                f"Fast ramp complete at {state.fast_ramp_target}A, "
                f"switching to slow ramp"
            )
            state.in_fast_ramp = False
            # Restart the ramp clock for the slow phase
            state.ramp_start_time = timestamp
            state.ramp_start_current = state.fast_ramp_target
            new_limit = state.fast_ramp_target
            # state.current_limit is intentionally NOT written here: the
            # change detection at the end of this method must still see the
            # previous limit, otherwise the value reached at the transition
            # would never be returned to the caller.

        # Clamp to target and to the absolute maximum. Using the lower of the
        # two as the finish line lets the ramp complete even when the target
        # is configured above maximum_current.
        ceiling = min(state.target_limit, self.config['maximum_current'])
        new_limit = min(new_limit, ceiling)

        # Round to nearest 0.5A
        new_limit = round(new_limit * 2) / 2

        # Check if we've reached target
        if new_limit >= ceiling:
            self.logger.info(f"Reached target: {new_limit}A")
            state.ramp_start_time = None
            state.in_fast_ramp = False

            # Start tracking stable time
            if self.stable_since is None:
                self.stable_since = timestamp

        # Update last stable tracking
        if self.last_update_time is not None:
            time_since_update = timestamp - self.last_update_time
            if time_since_update >= self.config['ramp_update_interval']:
                state.last_stable_current = max(
                    state.last_stable_current,
                    new_limit - 1.0
                )
                # Record stable operation for learning
                self.power_model.record_stable_operation(
                    self.current_output_power,
                    new_limit,
                    timestamp
                )

        # Only return if changed
        if abs(new_limit - state.current_limit) >= 0.5:
            state.current_limit = new_limit
            self.last_update_time = timestamp
            return new_limit

        return None

    def handle_overload(self, timestamp: Optional[float] = None,
                        output_power: Optional[float] = None) -> dict:
        """
        Handle an overload event.

        Args:
            timestamp: Time of the overload (defaults to now)
            output_power: Output power at the overload (defaults to the last
                value passed to set_output_power)

        Returns:
            dict with:
            - new_limit: Immediate limit to set (initial_current)
            - recovery_target: Target for eventual recovery
            - overload_count: Total overloads this session
            - fast_recovery_target: Target for fast ramp phase
            - is_rapid_overload: True if this was a rapid successive overload
            - rapid_overload_count: Number of rapid successive overloads
        """
        if timestamp is None:
            timestamp = time()

        if output_power is None:
            output_power = self.current_output_power

        # Check if this is a rapid overload (within threshold of last one)
        is_rapid = False
        if self.last_overload_time is not None:
            time_since_last = timestamp - self.last_overload_time
            if time_since_last < self.config['rapid_overload_threshold']:
                is_rapid = True
                self.rapid_overload_count += 1
                self.logger.warning(
                    f"RAPID overload detected! ({time_since_last:.0f}s since last) "
                    f"Rapid count: {self.rapid_overload_count}"
                )
            else:
                # Reset rapid count if enough time has passed
                self.rapid_overload_count = 0

        self.last_overload_time = timestamp

        # Record the overload
        event = OverloadEvent(
            timestamp=timestamp,
            current_at_overload=self.state.current_limit,
            last_stable_current=self.state.last_stable_current,
            output_power=output_power
        )
        self.overload_history.append(event)
        overload_count = len(self.overload_history)

        self.logger.warning(
            f"OVERLOAD #{overload_count} at {event.current_at_overload}A "
            f"(last stable: {event.last_stable_current}A, "
            f"output power: {output_power:.0f}W)"
        )

        # Record overload in power model
        self.power_model.record_overload(output_power, event.current_at_overload, timestamp)

        # Calculate new recovery target
        base_margin = self.config['recovery_margin']
        extra_margin = self.config['margin_per_overload'] * (overload_count - 1)
        total_margin = base_margin + extra_margin

        # Add extra margin for rapid overloads
        if is_rapid:
            total_margin += self.config['rapid_overload_extra_margin'] * self.rapid_overload_count

        # If we were stable at current for long enough, return to that current after recovery
        # (overload likely due to step load e.g. heater; generator can handle prior level)
        return_to_stable = self.config.get('return_to_stable_after_overload', False)
        min_stable_duration = self.config.get('return_to_stable_min_duration', 1800)
        used_return_to_stable = False
        if return_to_stable and self.stable_since is not None:
            stable_duration = timestamp - self.stable_since
            if stable_duration >= min_stable_duration:
                self.recovery_target = min(
                    event.current_at_overload,
                    self.config['target_current']
                )
                used_return_to_stable = True
                self.logger.info(
                    f"Stable for {stable_duration/60:.1f} min at {event.current_at_overload}A: "
                    f"recovery target set to {self.recovery_target}A (return to prior current)"
                )
        if not used_return_to_stable:
            self.recovery_target = event.last_stable_current - total_margin
            # Cap at (overload current - base margin)
            self.recovery_target = min(
                self.recovery_target,
                event.current_at_overload - base_margin
            )

        # Don't go below minimum
        self.recovery_target = max(
            self.recovery_target,
            self.config['minimum_current']
        )

        self.logger.info(
            f"Recovery target set to {self.recovery_target}A "
            f"(margin: {total_margin}A)"
        )

        # Calculate fast recovery target for info
        fast_margin = self.config['fast_recovery_margin']
        if is_rapid:
            fast_margin += self.config['rapid_overload_extra_margin'] * self.rapid_overload_count
        fast_target = max(
            event.current_at_overload - fast_margin,
            self.config['initial_current']
        )

        # Reset state: drop back to the initial limit and stop any ramp
        self.state.current_limit = self.config['initial_current']
        self.state.ramp_start_time = None
        self.state.in_fast_ramp = False
        self.state.fast_ramp_target = None
        self.stable_since = None

        # Track output power at overload for later comparison
        self.output_power_at_last_overload = output_power
        self.last_target_reevaluation = timestamp

        return {
            'new_limit': self.config['initial_current'],
            'recovery_target': self.recovery_target,
            'overload_count': overload_count,
            'fast_recovery_target': fast_target,
            'is_rapid_overload': is_rapid,
            'rapid_overload_count': self.rapid_overload_count,
        }

    def check_history_clear(self, timestamp: Optional[float] = None) -> bool:
        """
        Check if we should clear overload history after extended stable operation.

        Returns:
            True when the history was cleared, False otherwise
        """
        if timestamp is None:
            timestamp = time()

        if not self.overload_history:
            return False

        if self.stable_since is None:
            return False

        stable_duration = timestamp - self.stable_since
        if stable_duration < self.config['history_clear_time']:
            return False

        self.logger.info(
            f"Stable for {stable_duration/60:.1f} minutes, "
            f"clearing {len(self.overload_history)} overload events"
        )
        self.overload_history.clear()
        self.recovery_target = self.config['target_current']
        self.rapid_overload_count = 0
        self.last_overload_time = None
        return True

    def should_retry_full_power(self) -> bool:
        """
        Check if we should attempt ramping to full power again.

        True only when there is no overload history, the limit is below
        target_current and no ramp is running.
        """
        if self.overload_history:
            return False

        if self.state.current_limit >= self.config['target_current']:
            return False

        if self.is_ramping:
            return False

        return True

    def check_output_power_increase(self, timestamp: Optional[float] = None) -> Optional[dict]:
        """
        Check if output power has increased significantly since last overload.
        If so, re-evaluate and potentially raise the recovery target.

        Returns:
            dict with old_target, new_target, output_power and should_ramp if
            target was raised, None otherwise
        """
        if timestamp is None:
            timestamp = time()

        # Only relevant if we have overload history
        if not self.overload_history:
            return None

        # Check minimum interval between re-evaluations
        min_interval = self.learning_config.get('min_reevaluation_interval', 60)
        if self.last_target_reevaluation is not None:
            if timestamp - self.last_target_reevaluation < min_interval:
                return None

        # Check if output power has increased significantly
        threshold = self.learning_config.get('output_power_increase_threshold', 2000)
        power_increase = self.current_output_power - self.output_power_at_last_overload

        if power_increase < threshold:
            return None

        # A real re-evaluation happens from here on. Record it now so that an
        # evaluation that cannot raise the target is still rate limited by
        # min_reevaluation_interval instead of repeating (and logging at INFO
        # level) on every call.
        self.last_target_reevaluation = timestamp

        self.logger.info(
            f"Output power increased by {power_increase:.0f}W "
            f"({self.output_power_at_last_overload:.0f}W -> {self.current_output_power:.0f}W)"
        )

        # Get model's prediction for current output power
        if self.power_model.should_use_model():
            new_suggested = self.power_model.predict_max_current(self.current_output_power)
        else:
            # Without confident model, use zone-based estimate
            # Higher output = can handle more input
            base = self.config['target_current']
            # Add ~1A per 1000W of output power increase
            bonus = power_increase / 1000.0
            new_suggested = min(base, self.recovery_target + bonus)

        # Only raise target, never lower it from this check
        if new_suggested <= self.recovery_target:
            self.logger.debug(
                f"Model suggests {new_suggested:.1f}A, not higher than current "
                f"recovery target {self.recovery_target:.1f}A"
            )
            return None

        # Cap at target_current
        new_target = min(new_suggested, self.config['target_current'])

        # Apply a safety margin - don't go all the way to predicted max
        new_target = new_target - 1.0

        # Round to 0.5A
        new_target = round(new_target * 2) / 2

        if new_target <= self.recovery_target:
            return None

        old_target = self.recovery_target
        self.recovery_target = new_target

        # Update the reference point for future comparisons
        self.output_power_at_last_overload = self.current_output_power

        self.logger.info(
            f"Raised recovery target: {old_target:.1f}A -> {new_target:.1f}A "
            f"(output power now {self.current_output_power:.0f}W)"
        )

        # Determine if we should start a new ramp
        should_ramp = not self.is_ramping and self.state.current_limit < new_target

        return {
            'old_target': old_target,
            'new_target': new_target,
            'output_power': self.current_output_power,
            'should_ramp': should_ramp,
        }

    def get_suggested_limit(self, output_power: Optional[float] = None) -> float:
        """
        Get the suggested current limit based on output power.
        Uses the learned correlation model when it is confident enough,
        otherwise returns target_current.
        """
        if output_power is None:
            output_power = self.current_output_power

        if not self.power_model.should_use_model():
            return self.config['target_current']

        suggested = self.power_model.predict_max_current(output_power)
        return round(suggested * 2) / 2

    def reset(self):
        """Full reset (e.g., when generator stops); the power model is kept."""
        self.state = self._initial_state()
        self.overload_history.clear()
        self.recovery_target = self.config['target_current']
        self.last_update_time = None
        self.stable_since = None
        self.rapid_overload_count = 0
        self.last_overload_time = None
        self.current_output_power = 0.0
        self.output_power_at_last_overload = 0.0
        self.last_target_reevaluation = None
        self.logger.debug("Controller reset")

    def get_status(self) -> dict:
        """Get current controller status"""
        return {
            'current_limit': self.state.current_limit,
            'target_limit': self.state.target_limit,
            'recovery_target': self.recovery_target,
            'is_ramping': self.is_ramping,
            'is_recovery': self.state.is_recovery,
            'in_fast_ramp': self.state.in_fast_ramp,
            'fast_ramp_target': self.state.fast_ramp_target,
            'last_stable': self.state.last_stable_current,
            'overload_count': len(self.overload_history),
            'rapid_overload_count': self.rapid_overload_count,
            'stable_since': self.stable_since,
            'output_power': self.current_output_power,
            'output_power_at_overload': self.output_power_at_last_overload,
            'power_model': self.power_model.get_status(),
            'suggested_limit': self.get_suggested_limit(),
        }
|
|
|
|
|
|
# Manual smoke test: run this module directly to print a few ramp scenarios.
if __name__ == '__main__':
    logging.basicConfig(level=logging.DEBUG)

    controller = RampController()

    # --- Scenario 1: undisturbed ramp, sampled once per minute -------------
    print("=== Test: Normal Ramp ===")
    controller.start_ramp(0)
    for minute in range(35):
        changed = controller.update(minute * 60)
        if changed or minute % 5 == 0:
            snapshot = controller.get_status()
            print(f" t={minute}min: limit={snapshot['current_limit']}A, ramping={snapshot['is_ramping']}")

    # --- Scenario 2: single overload followed by a fast recovery -----------
    print("\n=== Test: Overload at 45A with Fast Recovery ===")
    controller.reset()
    controller.set_output_power(2500)  # Simulate output power
    controller.start_ramp(0)

    # Ramp for 16 one-minute steps before the overload
    for minute in range(16):
        controller.update(minute * 60)

    print(f" Before overload: limit={controller.get_status()['current_limit']}A")

    # Simulate the overload
    result = controller.handle_overload(16 * 60, output_power=2500)
    print(f" After overload: new_limit={result['new_limit']}A, "
          f"recovery_target={result['recovery_target']}A, "
          f"fast_target={result['fast_recovery_target']:.1f}A")

    # After a 5 minute cooldown, start recovery with fast ramp
    print("\n Starting recovery ramp...")
    recovery_start = 21 * 60
    controller.start_ramp(recovery_start)

    # 10 minutes in 10-second increments
    for second in range(0, 600, 10):
        changed = controller.update(recovery_start + second)
        if not changed:
            continue
        snapshot = controller.get_status()
        phase = "FAST" if snapshot['in_fast_ramp'] else "SLOW"
        print(f" t={21 + second/60:.1f}min: limit={snapshot['current_limit']}A [{phase}]")

    # --- Scenario 3: overloads in quick succession -------------------------
    print("\n=== Test: Rapid Successive Overloads ===")
    controller.reset()
    controller.set_output_power(1500)  # Low output
    controller.start_ramp(0)

    # Ramp for 12 one-minute steps
    for minute in range(12):
        controller.update(minute * 60)

    # First overload
    result1 = controller.handle_overload(12 * 60)
    print(f" 1st overload: margin=4A, fast_target={result1['fast_recovery_target']:.1f}A")

    # Quick recovery and second overload within 2 minutes
    controller.start_ramp(17 * 60)
    for second in range(0, 90, 10):
        controller.update(17 * 60 + second)

    result2 = controller.handle_overload(18.5 * 60)  # 1.5 min later
    print(f" 2nd RAPID overload: margin=6A, fast_target={result2['fast_recovery_target']:.1f}A, "
          f"is_rapid={result2['is_rapid_overload']}")

    # Third overload also rapid
    controller.start_ramp(24 * 60)
    for second in range(0, 60, 10):
        controller.update(24 * 60 + second)

    result3 = controller.handle_overload(25 * 60)  # 1 min later
    print(f" 3rd RAPID overload: margin=8A, fast_target={result3['fast_recovery_target']:.1f}A, "
          f"rapid_count={result3['rapid_overload_count']}")

    # --- Scenario 4: what the learned model currently suggests -------------
    print("\n=== Test: Power Model Suggestions ===")
    model = controller.power_model
    print(f" Model status: {model.get_status()}")
    print(f" Suggested limit at 1500W output: {controller.get_suggested_limit(1500):.1f}A")
    print(f" Suggested limit at 4000W output: {controller.get_suggested_limit(4000):.1f}A")
    print(f" Suggested limit at 6000W output: {controller.get_suggested_limit(6000):.1f}A")
|