Files
venus/dbus-generator-ramp/ramp_controller.py
dev 9756538f16 Initial commit: Venus OS boat addons monorepo
Organizes 11 projects for Cerbo GX/Venus OS into a single repository:
- axiom-nmea: Raymarine LightHouse protocol decoder
- dbus-generator-ramp: Generator current ramp controller
- dbus-lightning: Blitzortung lightning monitor
- dbus-meteoblue-forecast: Meteoblue weather forecast
- dbus-no-foreign-land: noforeignland.com tracking
- dbus-tides: Tide prediction from depth + harmonics
- dbus-vrm-history: VRM cloud history proxy
- dbus-windy-station: Windy.com weather upload
- mfd-custom-app: MFD app deployment package
- venus-html5-app: Custom Victron HTML5 app fork
- watermaker: Watermaker PLC control UI

Adds root README, .gitignore, project template, and per-project
.gitignore files. Sensitive config files excluded via .gitignore
with .example templates provided.

Made-with: Cursor
2026-03-16 17:04:16 +00:00

935 lines
34 KiB
Python

"""
Ramp Controller for Generator Current Ramp Controller
Manages the current limit ramping with:
- Initial ramp from 40A to 50A over 30 minutes
- Fast recovery to near-overload point after overload detection
- Adaptive margins based on rapid overload detection
- Output power correlation learning for smarter limits
"""
import logging
import json
import os
from time import time
from dataclasses import dataclass, field
from typing import List, Optional, Tuple, Dict
from config import RAMP_CONFIG, LEARNING_CONFIG
@dataclass
class OverloadEvent:
"""Record of an overload event"""
timestamp: float
current_at_overload: float
last_stable_current: float
output_power: float = 0.0 # Output power at time of overload
@dataclass
class LearningDataPoint:
"""A data point for correlation learning"""
timestamp: float
output_power: float # Watts
safe_input_current: float # Amps - highest current that was stable
was_overload: bool # True if this was recorded at overload
@dataclass
class RampState:
"""Current state of the ramp controller"""
current_limit: float
target_limit: float
ramp_start_time: Optional[float] = None
ramp_start_current: Optional[float] = None # Current limit when ramp started
last_stable_current: float = 40.0
is_recovery: bool = False
# Fast recovery tracking
fast_ramp_target: Optional[float] = None # Target for fast ramp phase
in_fast_ramp: bool = False
class PowerCorrelationModel:
"""
Learns the correlation between output power and maximum safe input current.
Model: max_input_current = base + slope * output_power
Key insight: When output loads are high, more input power passes through
directly to loads (not through inverter/battery). When output loads are low,
all input power goes to battery charging, stressing the inverter more.
"""
def __init__(self, config: Dict = None):
self.config = config or LEARNING_CONFIG
self.logger = logging.getLogger('PowerModel')
# Model parameters
self.base_current = self.config['initial_base_current']
self.slope = self.config['initial_slope']
# Data points for learning
self.data_points: List[LearningDataPoint] = []
# Track overload points specifically
self.overload_points: List[Tuple[float, float]] = [] # (output_power, input_current)
# Confidence level
self.confidence = 0
# Load saved model if exists
self._load_model()
def _model_path(self) -> str:
"""Path to saved model file"""
return '/data/dbus-generator-ramp/learned_model.json'
def _load_model(self):
"""Load saved model parameters"""
try:
path = self._model_path()
if os.path.exists(path):
with open(path, 'r') as f:
data = json.load(f)
self.base_current = data.get('base_current', self.base_current)
self.slope = data.get('slope', self.slope)
self.confidence = data.get('confidence', 0)
self.overload_points = [tuple(p) for p in data.get('overload_points', [])]
self.logger.info(
f"Loaded model: base={self.base_current:.1f}A, "
f"slope={self.slope:.6f}, confidence={self.confidence}"
)
except Exception as e:
self.logger.warning(f"Could not load model: {e}")
def _save_model(self):
"""Save model parameters to disk"""
try:
path = self._model_path()
os.makedirs(os.path.dirname(path), exist_ok=True)
data = {
'base_current': self.base_current,
'slope': self.slope,
'confidence': self.confidence,
'overload_points': self.overload_points[-20:], # Keep last 20
}
with open(path, 'w') as f:
json.dump(data, f, indent=2)
except Exception as e:
self.logger.warning(f"Could not save model: {e}")
def record_stable_operation(self, output_power: float, input_current: float,
timestamp: float = None):
"""
Record a data point of stable operation.
Args:
output_power: Current output power in Watts
input_current: Current input current limit in Amps
timestamp: Time of recording
"""
if timestamp is None:
timestamp = time()
point = LearningDataPoint(
timestamp=timestamp,
output_power=output_power,
safe_input_current=input_current,
was_overload=False
)
self.data_points.append(point)
# Trim old data points
max_points = self.config['max_data_points']
if len(self.data_points) > max_points:
self.data_points = self.data_points[-max_points:]
self.confidence = min(self.confidence + 1, max_points)
def record_overload(self, output_power: float, input_current: float,
timestamp: float = None):
"""
Record an overload event - this is valuable learning data.
Args:
output_power: Output power at time of overload
input_current: Input current when overload occurred
timestamp: Time of overload
"""
if timestamp is None:
timestamp = time()
# Record as data point
point = LearningDataPoint(
timestamp=timestamp,
output_power=output_power,
safe_input_current=input_current - 1.0, # Overload means this was too high
was_overload=True
)
self.data_points.append(point)
# Track overload point
self.overload_points.append((output_power, input_current))
if len(self.overload_points) > 20:
self.overload_points = self.overload_points[-20:]
# Update model based on this overload
self._update_model_from_overload(output_power, input_current)
self.confidence = min(self.confidence + 2, self.config['max_data_points'])
self._save_model()
self.logger.info(
f"Recorded overload: output={output_power:.0f}W, input={input_current:.1f}A"
)
def _update_model_from_overload(self, output_power: float, overload_current: float):
"""
Update model parameters based on overload event.
Uses gradient descent-like update.
"""
lr = self.config['learning_rate']
# Current model prediction
predicted_max = self.predict_max_current(output_power)
# The actual max should be below overload point
actual_max = overload_current - 1.0
# Error
error = predicted_max - actual_max
if error > 0: # Model was too optimistic
# Reduce base_current more aggressively
self.base_current -= lr * error * 0.8
# Adjust slope based on where in the power range this occurred
if output_power > 3000:
# Overload at high output - slope might be too high
self.slope -= lr * 0.0001
self.logger.info(
f"Model update: base={self.base_current:.1f}A, slope={self.slope:.6f}"
)
# Clamp to reasonable values
# Base is the low-output (0-1500W) achievable limit, typically 45A
# Allow learning to adjust slightly but keep within reasonable bounds
self.base_current = max(40.0, min(48.0, self.base_current))
self.slope = max(0.0, min(0.003, self.slope))
def predict_max_current(self, output_power: float) -> float:
"""
Predict the maximum safe input current for given output power.
Args:
output_power: Current output power in Watts
Returns:
Predicted maximum safe input current in Amps
"""
# Base prediction from linear model
predicted = self.base_current + (self.slope * output_power)
# Apply zone-based adjustments
zones = self.config['power_zones']
for zone_name, (min_p, max_p, offset) in zones.items():
if min_p <= output_power < max_p:
predicted += offset
break
# Check against recent overload points
for (op, oc) in self.overload_points:
if abs(op - output_power) < 500: # Similar output power
# Be conservative - use lower of predicted and (overload - margin)
predicted = min(predicted, oc - 3.0)
return predicted
def get_confidence_level(self) -> str:
"""Get confidence level as string"""
if self.confidence < 3:
return 'LOW'
elif self.confidence < self.config['min_confidence']:
return 'MEDIUM'
else:
return 'HIGH'
def should_use_model(self) -> bool:
"""Check if model has enough confidence to be used"""
return self.confidence >= self.config['min_confidence']
def get_status(self) -> Dict:
"""Get model status for monitoring"""
return {
'base_current': self.base_current,
'slope': self.slope,
'confidence': self.confidence,
'confidence_level': self.get_confidence_level(),
'data_points': len(self.data_points),
'overload_points': len(self.overload_points),
}
class RampController:
"""
Manages current limit ramping with overload protection.
Behavior:
- Normal ramp: 40A -> 50A over 30 minutes
- On overload: Return to 40A, wait cooldown, then FAST ramp to near-overload
- Fast recovery: Quickly ramp to (overload_point - 4A), then slow ramp to target
- Rapid overload: If overload again within 2 minutes, use larger margin (6A)
- Learning: Track output power correlation to predict safe limits
"""
def __init__(self, config=None):
self.config = config or RAMP_CONFIG
self.logger = logging.getLogger('RampController')
# Current state
self.state = RampState(
current_limit=self.config['initial_current'],
target_limit=self.config['target_current'],
last_stable_current=self.config['initial_current']
)
# Overload history
self.overload_history: List[OverloadEvent] = []
# Recovery target (may be lower than target_current after overloads)
self.recovery_target = self.config['target_current']
# Timestamp tracking
self.last_update_time = None
self.stable_since = None
# Fast recovery tracking
self.rapid_overload_count = 0 # Count of rapid successive overloads
self.last_overload_time = None
# Current output power (updated externally)
self.current_output_power = 0.0
# Output power at last overload (for detecting significant increases)
self.output_power_at_last_overload = 0.0
self.last_target_reevaluation = None
# Power correlation learning
self.power_model = PowerCorrelationModel(LEARNING_CONFIG)
self.learning_config = LEARNING_CONFIG
@property
def current_limit(self) -> float:
"""Get current limit value"""
return self.state.current_limit
@property
def is_ramping(self) -> bool:
"""Check if currently ramping"""
return self.state.ramp_start_time is not None
def set_output_power(self, power: float):
"""Update current output power reading"""
self.current_output_power = power
def set_initial_limit(self, current_setting: float) -> float:
"""
Called when generator enters warm-up.
Always sets limit to initial value for a known starting state.
"""
initial = self.config['initial_current']
if current_setting != initial:
self.logger.info(
f"Generator warm-up: setting limit from {current_setting}A to {initial}A"
)
else:
self.logger.info(
f"Generator warm-up: confirming limit at {initial}A"
)
self.state.current_limit = initial
return initial
def start_ramp(self, timestamp: float = None):
"""
Start ramping from current position.
"""
if timestamp is None:
timestamp = time()
self.state.ramp_start_time = timestamp
self.state.is_recovery = len(self.overload_history) > 0
if self.state.is_recovery:
# Recovery after cooldown - we rolled back to initial_current
self.state.ramp_start_current = self.config['initial_current']
self.state.target_limit = self.recovery_target
# Calculate fast ramp target
self._calculate_fast_recovery_target()
if self.state.fast_ramp_target and self.state.fast_ramp_target > self.config['initial_current']:
self.state.in_fast_ramp = True
self.logger.info(
f"Starting FAST recovery: {self.state.ramp_start_current}A -> "
f"{self.state.fast_ramp_target:.1f}A (fast), then -> "
f"{self.recovery_target}A (slow)"
)
else:
self.state.in_fast_ramp = False
self.logger.info(
f"Starting RECOVERY ramp to {self.recovery_target}A at "
f"{self.config['recovery_ramp_rate']} A/min"
)
else:
# Initial ramp or retry after stable - start from current position
self.state.ramp_start_current = self.state.current_limit
self.state.target_limit = self.config['target_current']
self.state.fast_ramp_target = None
self.state.in_fast_ramp = False
self.logger.info(
f"Starting INITIAL ramp from {self.state.ramp_start_current}A to "
f"{self.state.target_limit}A at {self.config['initial_ramp_rate']} A/min"
)
self.last_update_time = timestamp
def _calculate_fast_recovery_target(self):
"""
Calculate the target for fast recovery phase.
Logic:
- Base: (overload_current - fast_recovery_margin)
- If rapid overload: add extra margin
- Consider power model prediction if confident
"""
if not self.overload_history:
self.state.fast_ramp_target = None
return
last_overload = self.overload_history[-1]
overload_current = last_overload.current_at_overload
# Base margin
margin = self.config['fast_recovery_margin'] # Default: 4A
# Add extra margin for rapid overloads
if self.rapid_overload_count > 0:
extra = self.config['rapid_overload_extra_margin'] * self.rapid_overload_count
margin += extra
self.logger.info(
f"Rapid overload #{self.rapid_overload_count}: using margin of {margin}A"
)
# Calculate fast ramp target
fast_target = overload_current - margin
# Consider power model prediction
if self.power_model.should_use_model():
predicted_max = self.power_model.predict_max_current(self.current_output_power)
# Use the more conservative of the two
if predicted_max < fast_target:
self.logger.info(
f"Power model suggests max {predicted_max:.1f}A at "
f"{self.current_output_power:.0f}W output"
)
fast_target = min(fast_target, predicted_max - 1.0)
# Don't fast-ramp below initial_current
fast_target = max(fast_target, self.config['initial_current'] + 1.0)
# Don't fast-ramp above recovery target
fast_target = min(fast_target, self.recovery_target - 1.0)
self.state.fast_ramp_target = round(fast_target * 2) / 2 # Round to 0.5A
self.logger.info(
f"Fast recovery target: {self.state.fast_ramp_target}A "
f"(overload was at {overload_current}A, margin {margin}A)"
)
def update(self, timestamp: float = None) -> Optional[float]:
"""
Calculate and return the new current limit based on elapsed time.
"""
if timestamp is None:
timestamp = time()
if self.state.ramp_start_time is None:
return None
elapsed = timestamp - self.state.ramp_start_time
# Determine current phase and rate
if self.state.in_fast_ramp and self.state.fast_ramp_target:
# Fast ramp phase
rate = self.config['fast_ramp_rate'] # Fast: 5 A/min
phase_target = self.state.fast_ramp_target
elif self.state.is_recovery:
# Slow recovery phase
rate = self.config['recovery_ramp_rate']
phase_target = self.state.target_limit
else:
# Initial ramp phase
rate = self.config['initial_ramp_rate']
phase_target = self.state.target_limit
# Calculate new limit from ramp start position
start_current = self.state.ramp_start_current or self.config['initial_current']
new_limit = start_current + (elapsed * rate / 60.0)
# Check if we've completed fast ramp phase
if self.state.in_fast_ramp and self.state.fast_ramp_target:
if new_limit >= self.state.fast_ramp_target:
# Transition to slow ramp
self.logger.info(
f"Fast ramp complete at {self.state.fast_ramp_target}A, "
f"switching to slow ramp"
)
self.state.in_fast_ramp = False
# Reset ramp start time and current for slow phase
self.state.ramp_start_time = timestamp
self.state.ramp_start_current = self.state.fast_ramp_target
self.state.current_limit = self.state.fast_ramp_target
new_limit = self.state.fast_ramp_target
# Clamp to target
new_limit = min(new_limit, self.state.target_limit)
# Clamp to absolute maximum
new_limit = min(new_limit, self.config['maximum_current'])
# Round to nearest 0.5A
new_limit = round(new_limit * 2) / 2
# Check if we've reached target
if new_limit >= self.state.target_limit:
self.logger.info(f"Reached target: {new_limit}A")
self.state.ramp_start_time = None
self.state.in_fast_ramp = False
# Start tracking stable time
if self.stable_since is None:
self.stable_since = timestamp
# Update last stable tracking
if self.last_update_time is not None:
time_since_update = timestamp - self.last_update_time
if time_since_update >= self.config['ramp_update_interval']:
self.state.last_stable_current = max(
self.state.last_stable_current,
new_limit - 1.0
)
# Record stable operation for learning
self.power_model.record_stable_operation(
self.current_output_power,
new_limit,
timestamp
)
# Only return if changed
if abs(new_limit - self.state.current_limit) >= 0.5:
self.state.current_limit = new_limit
self.last_update_time = timestamp
return new_limit
return None
def handle_overload(self, timestamp: float = None, output_power: float = None) -> dict:
"""
Handle an overload event.
Returns:
dict with:
- new_limit: Immediate limit to set (initial_current)
- recovery_target: Target for eventual recovery
- overload_count: Total overloads this session
- fast_recovery_target: Target for fast ramp phase
- is_rapid_overload: True if this was a rapid successive overload
"""
if timestamp is None:
timestamp = time()
if output_power is None:
output_power = self.current_output_power
# Check if this is a rapid overload (within threshold of last one)
is_rapid = False
if self.last_overload_time is not None:
time_since_last = timestamp - self.last_overload_time
if time_since_last < self.config['rapid_overload_threshold']:
is_rapid = True
self.rapid_overload_count += 1
self.logger.warning(
f"RAPID overload detected! ({time_since_last:.0f}s since last) "
f"Rapid count: {self.rapid_overload_count}"
)
else:
# Reset rapid count if enough time has passed
self.rapid_overload_count = 0
self.last_overload_time = timestamp
# Record the overload
event = OverloadEvent(
timestamp=timestamp,
current_at_overload=self.state.current_limit,
last_stable_current=self.state.last_stable_current,
output_power=output_power
)
self.overload_history.append(event)
overload_count = len(self.overload_history)
self.logger.warning(
f"OVERLOAD #{overload_count} at {event.current_at_overload}A "
f"(last stable: {event.last_stable_current}A, "
f"output power: {output_power:.0f}W)"
)
# Record overload in power model
self.power_model.record_overload(output_power, event.current_at_overload, timestamp)
# Calculate new recovery target
base_margin = self.config['recovery_margin']
extra_margin = self.config['margin_per_overload'] * (overload_count - 1)
total_margin = base_margin + extra_margin
# Add extra margin for rapid overloads
if is_rapid:
total_margin += self.config['rapid_overload_extra_margin'] * self.rapid_overload_count
# If we were stable at current for long enough, return to that current after recovery
# (overload likely due to step load e.g. heater; generator can handle prior level)
return_to_stable = self.config.get('return_to_stable_after_overload', False)
min_stable_duration = self.config.get('return_to_stable_min_duration', 1800)
used_return_to_stable = False
if return_to_stable and self.stable_since is not None:
stable_duration = timestamp - self.stable_since
if stable_duration >= min_stable_duration:
self.recovery_target = min(
event.current_at_overload,
self.config['target_current']
)
used_return_to_stable = True
self.logger.info(
f"Stable for {stable_duration/60:.1f} min at {event.current_at_overload}A: "
f"recovery target set to {self.recovery_target}A (return to prior current)"
)
if not used_return_to_stable:
self.recovery_target = event.last_stable_current - total_margin
# Cap at (overload current - base margin)
self.recovery_target = min(
self.recovery_target,
event.current_at_overload - base_margin
)
# Don't go below minimum
self.recovery_target = max(
self.recovery_target,
self.config['minimum_current']
)
self.logger.info(
f"Recovery target set to {self.recovery_target}A "
f"(margin: {total_margin}A)"
)
# Calculate fast recovery target for info
fast_margin = self.config['fast_recovery_margin']
if is_rapid:
fast_margin += self.config['rapid_overload_extra_margin'] * self.rapid_overload_count
fast_target = max(
event.current_at_overload - fast_margin,
self.config['initial_current']
)
# Reset state
self.state.current_limit = self.config['initial_current']
self.state.ramp_start_time = None
self.state.in_fast_ramp = False
self.state.fast_ramp_target = None
self.stable_since = None
# Track output power at overload for later comparison
self.output_power_at_last_overload = output_power
self.last_target_reevaluation = timestamp
return {
'new_limit': self.config['initial_current'],
'recovery_target': self.recovery_target,
'overload_count': overload_count,
'fast_recovery_target': fast_target,
'is_rapid_overload': is_rapid,
'rapid_overload_count': self.rapid_overload_count,
}
def check_history_clear(self, timestamp: float = None) -> bool:
"""
Check if we should clear overload history after extended stable operation.
"""
if timestamp is None:
timestamp = time()
if not self.overload_history:
return False
if self.stable_since is None:
return False
stable_duration = timestamp - self.stable_since
if stable_duration >= self.config['history_clear_time']:
self.logger.info(
f"Stable for {stable_duration/60:.1f} minutes, "
f"clearing {len(self.overload_history)} overload events"
)
self.overload_history.clear()
self.recovery_target = self.config['target_current']
self.rapid_overload_count = 0
self.last_overload_time = None
return True
return False
def should_retry_full_power(self) -> bool:
"""
Check if we should attempt ramping to full power again.
"""
if self.overload_history:
return False
if self.state.current_limit >= self.config['target_current']:
return False
if self.is_ramping:
return False
return True
def check_output_power_increase(self, timestamp: float = None) -> Optional[dict]:
"""
Check if output power has increased significantly since last overload.
If so, re-evaluate and potentially raise the recovery target.
Returns:
dict with new_target and should_ramp if target was raised, None otherwise
"""
if timestamp is None:
timestamp = time()
# Only relevant if we have overload history
if not self.overload_history:
return None
# Check minimum interval between re-evaluations
min_interval = self.learning_config.get('min_reevaluation_interval', 60)
if self.last_target_reevaluation is not None:
if timestamp - self.last_target_reevaluation < min_interval:
return None
# Check if output power has increased significantly
threshold = self.learning_config.get('output_power_increase_threshold', 2000)
power_increase = self.current_output_power - self.output_power_at_last_overload
if power_increase < threshold:
return None
# Output power has increased significantly - re-evaluate target
self.logger.info(
f"Output power increased by {power_increase:.0f}W "
f"({self.output_power_at_last_overload:.0f}W -> {self.current_output_power:.0f}W)"
)
# Get model's prediction for current output power
if self.power_model.should_use_model():
new_suggested = self.power_model.predict_max_current(self.current_output_power)
else:
# Without confident model, use zone-based estimate
# Higher output = can handle more input
base = self.config['target_current']
# Add ~1A per 1000W of output power increase
bonus = power_increase / 1000.0
new_suggested = min(base, self.recovery_target + bonus)
# Only raise target, never lower it from this check
if new_suggested <= self.recovery_target:
self.logger.debug(
f"Model suggests {new_suggested:.1f}A, not higher than current "
f"recovery target {self.recovery_target:.1f}A"
)
return None
# Cap at target_current
new_target = min(new_suggested, self.config['target_current'])
# Apply a safety margin - don't go all the way to predicted max
new_target = new_target - 1.0
# Round to 0.5A
new_target = round(new_target * 2) / 2
if new_target <= self.recovery_target:
return None
old_target = self.recovery_target
self.recovery_target = new_target
self.last_target_reevaluation = timestamp
# Update the reference point for future comparisons
self.output_power_at_last_overload = self.current_output_power
self.logger.info(
f"Raised recovery target: {old_target:.1f}A -> {new_target:.1f}A "
f"(output power now {self.current_output_power:.0f}W)"
)
# Determine if we should start a new ramp
should_ramp = not self.is_ramping and self.state.current_limit < new_target
return {
'old_target': old_target,
'new_target': new_target,
'output_power': self.current_output_power,
'should_ramp': should_ramp,
}
def get_suggested_limit(self, output_power: float = None) -> float:
"""
Get the suggested current limit based on output power.
Uses the learned correlation model.
"""
if output_power is None:
output_power = self.current_output_power
if self.power_model.should_use_model():
suggested = self.power_model.predict_max_current(output_power)
return round(suggested * 2) / 2
else:
return self.config['target_current']
def reset(self):
"""Full reset (e.g., when generator stops)"""
self.state = RampState(
current_limit=self.config['initial_current'],
target_limit=self.config['target_current'],
last_stable_current=self.config['initial_current']
)
self.overload_history.clear()
self.recovery_target = self.config['target_current']
self.last_update_time = None
self.stable_since = None
self.rapid_overload_count = 0
self.last_overload_time = None
self.current_output_power = 0.0
self.output_power_at_last_overload = 0.0
self.last_target_reevaluation = None
self.logger.debug("Controller reset")
def get_status(self) -> dict:
"""Get current controller status"""
return {
'current_limit': self.state.current_limit,
'target_limit': self.state.target_limit,
'recovery_target': self.recovery_target,
'is_ramping': self.is_ramping,
'is_recovery': self.state.is_recovery,
'in_fast_ramp': self.state.in_fast_ramp,
'fast_ramp_target': self.state.fast_ramp_target,
'last_stable': self.state.last_stable_current,
'overload_count': len(self.overload_history),
'rapid_overload_count': self.rapid_overload_count,
'stable_since': self.stable_since,
'output_power': self.current_output_power,
'output_power_at_overload': self.output_power_at_last_overload,
'power_model': self.power_model.get_status(),
'suggested_limit': self.get_suggested_limit(),
}
# For testing
if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
controller = RampController()
print("=== Test: Normal Ramp ===")
controller.start_ramp(0)
for minute in range(35):
t = minute * 60
new_limit = controller.update(t)
if new_limit or minute % 5 == 0:
status = controller.get_status()
print(f" t={minute}min: limit={status['current_limit']}A, ramping={status['is_ramping']}")
print("\n=== Test: Overload at 45A with Fast Recovery ===")
controller.reset()
controller.set_output_power(2500) # Simulate output power
controller.start_ramp(0)
# Ramp to 45A (about 15 minutes)
for minute in range(16):
controller.update(minute * 60)
print(f" Before overload: limit={controller.get_status()['current_limit']}A")
# Simulate overload at 45A
result = controller.handle_overload(16 * 60, output_power=2500)
print(f" After overload: new_limit={result['new_limit']}A, "
f"recovery_target={result['recovery_target']}A, "
f"fast_target={result['fast_recovery_target']:.1f}A")
# After cooldown, start recovery with fast ramp
print("\n Starting recovery ramp...")
controller.start_ramp(21 * 60) # 5 min cooldown
for second in range(0, 600, 10): # 10 minutes in 10-second increments
t = 21 * 60 + second
new_limit = controller.update(t)
if new_limit:
status = controller.get_status()
phase = "FAST" if status['in_fast_ramp'] else "SLOW"
print(f" t={21 + second/60:.1f}min: limit={status['current_limit']}A [{phase}]")
print("\n=== Test: Rapid Successive Overloads ===")
controller.reset()
controller.set_output_power(1500) # Low output
controller.start_ramp(0)
# Ramp to 44A
for minute in range(12):
controller.update(minute * 60)
# First overload
result1 = controller.handle_overload(12 * 60)
print(f" 1st overload: margin=4A, fast_target={result1['fast_recovery_target']:.1f}A")
# Quick recovery and second overload within 2 minutes
controller.start_ramp(17 * 60)
for second in range(0, 90, 10):
controller.update(17 * 60 + second)
result2 = controller.handle_overload(18.5 * 60) # 1.5 min later
print(f" 2nd RAPID overload: margin=6A, fast_target={result2['fast_recovery_target']:.1f}A, "
f"is_rapid={result2['is_rapid_overload']}")
# Third overload also rapid
controller.start_ramp(24 * 60)
for second in range(0, 60, 10):
controller.update(24 * 60 + second)
result3 = controller.handle_overload(25 * 60) # 1 min later
print(f" 3rd RAPID overload: margin=8A, fast_target={result3['fast_recovery_target']:.1f}A, "
f"rapid_count={result3['rapid_overload_count']}")
print("\n=== Test: Power Model Suggestions ===")
model = controller.power_model
print(f" Model status: {model.get_status()}")
print(f" Suggested limit at 1500W output: {controller.get_suggested_limit(1500):.1f}A")
print(f" Suggested limit at 4000W output: {controller.get_suggested_limit(4000):.1f}A")
print(f" Suggested limit at 6000W output: {controller.get_suggested_limit(6000):.1f}A")