Files
venus/dbus-generator-ramp/overload_detector.py
dev 9756538f16 Initial commit: Venus OS boat addons monorepo
Organizes 11 projects for Cerbo GX/Venus OS into a single repository:
- axiom-nmea: Raymarine LightHouse protocol decoder
- dbus-generator-ramp: Generator current ramp controller
- dbus-lightning: Blitzortung lightning monitor
- dbus-meteoblue-forecast: Meteoblue weather forecast
- dbus-no-foreign-land: noforeignland.com tracking
- dbus-tides: Tide prediction from depth + harmonics
- dbus-vrm-history: VRM cloud history proxy
- dbus-windy-station: Windy.com weather upload
- mfd-custom-app: MFD app deployment package
- venus-html5-app: Custom Victron HTML5 app fork
- watermaker: Watermaker PLC control UI

Adds root README, .gitignore, project template, and per-project
.gitignore files. Sensitive config files excluded via .gitignore
with .example templates provided.

Made-with: Cursor
2026-03-16 17:04:16 +00:00

545 lines
20 KiB
Python

"""
Overload Detector for Generator Current Ramp Controller
Detects generator overload by analyzing power fluctuation patterns.
Uses a hybrid approach combining multiple detection methods.
Key Design Goals:
- Detect erratic oscillations (overload) - TRIGGER
- Ignore smooth load changes (normal) - IGNORE
- Minimize false positives with confirmation buffer
"""
import logging
from collections import deque
from time import time
try:
import numpy as np
HAS_NUMPY = True
except ImportError:
HAS_NUMPY = False
import math
from config import OVERLOAD_CONFIG
class OverloadDetector:
    """
    Hybrid overload detector combining:
    1. Rate-of-change reversal detection (catches rapid oscillations)
    2. Detrended standard deviation (ignores smooth ramps)

    Both methods must agree on a sample. That instantaneous result is then
    gated by three filters (minimum output power, downward trend, drop from
    the recent maximum) and must persist across the confirmation window
    before an overload is reported. A reported overload starts a lockout
    during which update() ignores incoming samples.

    Required config keys: std_dev_window_size, reversal_window_size,
    confirmation_window, confirmation_threshold, reversal_threshold,
    derivative_threshold, std_dev_threshold, use_smoothing,
    smoothing_alpha (only read when use_smoothing is true),
    lockout_duration, sample_interval_ms.
    Optional config keys (default): ramp_start_grace_period (30),
    min_output_power_for_overload (0), trend_drop_threshold (-100),
    max_power_drop_for_overload (1000).
    """

    # Fewest buffered power samples for which a linear fit is attempted.
    # NOTE(review): a std_dev_window_size below this value means the power
    # buffer can never reach it, so std dev and trend stay at 0.0 forever.
    _MIN_FIT_SAMPLES = 10

    def __init__(self, config=None):
        """
        Create a detector.

        Args:
            config: Mapping of tuning parameters. Falls back to the
                module-level OVERLOAD_CONFIG when omitted or empty.
        """
        self.config = config or OVERLOAD_CONFIG
        self.logger = logging.getLogger('OverloadDetector')

        # Power sample buffer for std dev calculation (INPUT power - actual generator load)
        self.power_buffer = deque(maxlen=self.config['std_dev_window_size'])
        # Derivative buffer for reversal detection
        self.derivative_buffer = deque(maxlen=self.config['reversal_window_size'])
        # Confirmation buffer (1 = instant detection on that sample, 0 = none)
        self.confirmation_buffer = deque(maxlen=self.config['confirmation_window'])

        # State tracking
        self.last_power = None
        self.last_derivative = None  # kept for compatibility; not read in this class
        self.lockout_until = 0

        # Grace period tracking (suppresses detection after ramp starts)
        self.ramp_start_time = None

        # Optional smoothing (exponential moving average state)
        self.smoothed_power = None

        # Diagnostics from the most recent fully-evaluated sample
        self.last_diagnostics = {}

    def set_ramp_start(self, timestamp: float = None):
        """
        Mark the start of a ramp phase. Suppresses detection during grace period.

        Args:
            timestamp: Time when ramp started (defaults to time.time())
        """
        if timestamp is None:
            timestamp = time()
        self.ramp_start_time = timestamp
        grace_period = self.config.get('ramp_start_grace_period', 30)
        self.logger.info(f"Ramp start marked - grace period: {grace_period}s")

    def update(self, l1_power: float, l2_power: float, timestamp: float = None,
               output_power: float = None) -> tuple:
        """
        Process new power readings.

        Args:
            l1_power: Power on L1 in Watts (None is treated as 0)
            l2_power: Power on L2 in Watts (0 or None if single phase)
            timestamp: Current time (defaults to time.time())
            output_power: Total AC output power (for filtering false positives);
                the output-power filter is skipped when this is None

        Returns:
            tuple: (is_overload: bool, diagnostics: dict). While in lockout,
            grace period or warm-up the dict only carries a 'status' plus a
            few progress fields, and last_diagnostics is left untouched.
        """
        if timestamp is None:
            timestamp = time()

        total_power = (l1_power or 0) + (l2_power or 0)

        # Check lockout - the sample is dropped, buffers are not updated
        if timestamp < self.lockout_until:
            return False, {
                'status': 'lockout',
                'lockout_remaining': self.lockout_until - timestamp
            }

        # Check grace period after ramp start - sample is dropped as well
        grace_period = self.config.get('ramp_start_grace_period', 30)
        if self.ramp_start_time is not None:
            elapsed_since_ramp = timestamp - self.ramp_start_time
            if elapsed_since_ramp < grace_period:
                return False, {
                    'status': 'grace_period',
                    'grace_remaining': grace_period - elapsed_since_ramp
                }

        # Optional smoothing (exponential moving average)
        if self.config['use_smoothing']:
            if self.smoothed_power is None:
                self.smoothed_power = total_power
            else:
                alpha = self.config['smoothing_alpha']
                self.smoothed_power = alpha * total_power + (1 - alpha) * self.smoothed_power
            analysis_power = self.smoothed_power
        else:
            analysis_power = total_power

        # Update power buffer (INPUT power - actual generator load)
        self.power_buffer.append(analysis_power)

        # Sample-to-sample derivative; the first sample has no predecessor
        if self.last_power is not None:
            self.derivative_buffer.append(analysis_power - self.last_power)
        self.last_power = analysis_power

        # Need minimum samples before detection
        min_samples = max(
            self.config['reversal_window_size'] // 2,
            self.config['std_dev_window_size'] // 2
        )
        if len(self.power_buffer) < min_samples:
            return False, {
                'status': 'warming_up',
                'samples': len(self.power_buffer),
                'needed': min_samples
            }

        # --- Method 1: Count derivative reversals ---
        reversals = self._count_reversals()
        method1_triggered = reversals >= self.config['reversal_threshold']

        # --- Method 2: Detrended standard deviation ---
        std_dev = self._detrended_std_dev()
        method2_triggered = std_dev > self.config['std_dev_threshold']

        # --- Combine methods: both must agree ---
        instant_detection = method1_triggered and method2_triggered

        # --- Check minimum output power requirement ---
        # A true overload requires significant load - without it, fluctuations
        # are likely just load switching (e.g., appliance turning off)
        min_output = self.config.get('min_output_power_for_overload', 0)
        output_power_ok = output_power is None or output_power >= min_output
        instant_detection = instant_detection and output_power_ok

        # --- Check trend direction ---
        # If power is dropping significantly, it's a load decrease, not overload
        # True overloads oscillate at/near a ceiling, not trend downward
        trend = self._calculate_trend()
        trend_threshold = self.config.get('trend_drop_threshold', -100)
        trend_ok = trend >= trend_threshold  # OK if trend is not dropping fast
        instant_detection = instant_detection and trend_ok

        # --- Check power drop from recent maximum ---
        # If current power is well below recent max, we've DROPPED from ceiling
        # True overloads oscillate AT/NEAR the ceiling, not below it
        # Note: This checks INPUT power - the actual generator load
        # Output power is NOT reliable because charger can take over freed capacity
        # NOTE(review): with use_smoothing enabled the buffer holds smoothed
        # values while total_power is the raw reading - confirm this mix is
        # intended before changing it.
        recent_max = max(self.power_buffer) if self.power_buffer else total_power
        power_drop = recent_max - total_power
        max_drop_allowed = self.config.get('max_power_drop_for_overload', 1000)
        power_drop_ok = power_drop <= max_drop_allowed
        instant_detection = instant_detection and power_drop_ok

        # --- Confirmation: sustained detection ---
        self.confirmation_buffer.append(1 if instant_detection else 0)
        confirmed_count = sum(self.confirmation_buffer)
        is_overload = confirmed_count >= self.config['confirmation_threshold']

        # Build diagnostics
        self.last_diagnostics = {
            'status': 'monitoring',
            'total_power': total_power,
            'output_power': output_power,
            'min_output_required': min_output,
            'output_power_ok': output_power_ok,
            'trend': round(trend, 1),
            'trend_threshold': trend_threshold,
            'trend_ok': trend_ok,
            'recent_max': round(recent_max, 0),
            'power_drop': round(power_drop, 0),
            'max_drop_allowed': max_drop_allowed,
            'power_drop_ok': power_drop_ok,
            'reversals': reversals,
            'reversal_threshold': self.config['reversal_threshold'],
            'method1_triggered': method1_triggered,
            'std_dev': round(std_dev, 1),
            'std_dev_threshold': self.config['std_dev_threshold'],
            'method2_triggered': method2_triggered,
            'instant_detection': instant_detection,
            'confirmed_count': confirmed_count,
            'confirmation_threshold': self.config['confirmation_threshold'],
            'is_overload': is_overload,
        }

        # Apply lockout if triggered
        if is_overload:
            self.lockout_until = timestamp + self.config['lockout_duration']
            self.logger.warning(
                f"OVERLOAD DETECTED! reversals={reversals}, std_dev={std_dev:.1f}W, "
                f"trend={trend:.1f}W/s, power_drop={power_drop:.0f}W"
            )
            # Clear confirmation buffer after trigger
            self.confirmation_buffer.clear()

        return is_overload, self.last_diagnostics

    def _count_reversals(self) -> int:
        """
        Count significant sign changes in the derivative.

        Derivatives whose magnitude is below derivative_threshold are
        skipped entirely, so a reversal is a sign change between two
        consecutive *significant* derivatives (small ones in between do not
        reset the comparison). Returns 0 while fewer than 3 derivatives
        are buffered.
        """
        if len(self.derivative_buffer) < 3:
            return 0

        threshold = self.config['derivative_threshold']
        reversals = 0
        prev_sign = None
        for d in self.derivative_buffer:
            # Ignore small changes
            if abs(d) < threshold:
                continue
            current_sign = 1 if d > 0 else -1
            if prev_sign is not None and current_sign != prev_sign:
                reversals += 1
            prev_sign = current_sign
        return reversals

    @staticmethod
    def _fit_line(samples):
        """
        Least-squares fit of a straight line through samples, using the
        sample index (0, 1, 2, ...) as x.

        Returns:
            (slope, intercept) as built-in floats; slope is per sample.
        """
        n = len(samples)
        if HAS_NUMPY:
            slope, intercept = np.polyfit(np.arange(n), np.array(samples), 1)
            return float(slope), float(intercept)

        # Pure Python fallback: closed-form simple linear regression
        x_mean = (n - 1) / 2.0
        y_mean = sum(samples) / n
        numerator = sum((i - x_mean) * (y - y_mean) for i, y in enumerate(samples))
        denominator = sum((i - x_mean) ** 2 for i in range(n))
        slope = numerator / denominator if denominator != 0 else 0
        return slope, y_mean - slope * x_mean

    def _detrended_std_dev(self) -> float:
        """
        Calculate standard deviation after removing linear trend.

        This allows us to ignore smooth ramps up or down while
        still detecting oscillations around the trend. Returns 0.0 until
        at least 10 power samples are buffered.
        """
        if len(self.power_buffer) < self._MIN_FIT_SAMPLES:
            return 0.0

        data = list(self.power_buffer)
        n = len(data)
        slope, intercept = self._fit_line(data)

        if HAS_NUMPY:
            # Use numpy for efficiency
            fitted = slope * np.arange(n) + intercept
            return float(np.std(np.array(data) - fitted))

        # Pure Python fallback: population std dev of the residuals
        detrended = [y - (slope * i + intercept) for i, y in enumerate(data)]
        mean_detrended = sum(detrended) / n
        variance = sum((r - mean_detrended) ** 2 for r in detrended) / n
        return math.sqrt(variance)

    def _calculate_trend(self) -> float:
        """
        Calculate the trend slope in watts per second.

        Returns positive for rising power, negative for falling power.
        This helps filter out load-drop scenarios from true overloads.
        Returns 0.0 until at least 10 power samples are buffered. The
        per-sample slope is scaled by config['sample_interval_ms'].
        """
        if len(self.power_buffer) < self._MIN_FIT_SAMPLES:
            return 0.0

        slope_per_sample, _ = self._fit_line(list(self.power_buffer))
        # Sample interval in seconds
        sample_interval = self.config['sample_interval_ms'] / 1000.0
        # Convert from watts/sample to watts/second
        return slope_per_sample / sample_interval

    def reset(self):
        """Reset detector state (e.g., after generator stops)"""
        self.power_buffer.clear()
        self.derivative_buffer.clear()
        self.confirmation_buffer.clear()
        self.last_power = None
        self.last_derivative = None
        self.smoothed_power = None
        self.lockout_until = 0
        self.ramp_start_time = None
        self.last_diagnostics = {}
        self.logger.debug("Detector reset")

    def get_diagnostics(self) -> dict:
        """Get the most recent diagnostics"""
        return self.last_diagnostics

    def dump_overload_debug(self, current_limit: float = None, output_power: float = None) -> str:
        """
        Generate comprehensive debug dump when overload is detected.

        Call this method when an overload triggers to log detailed data
        for algorithm tuning. Returns formatted string and logs it at
        WARNING level.

        Args:
            current_limit: Current input current limit setting (Amps).
                Printed as "N/A" only when None; 0 is a valid value.
            output_power: Total AC output power (Watts).
                Printed as "N/A" only when None; 0 is a valid value.

        Returns:
            Formatted debug string
        """
        diag = self.last_diagnostics
        deriv_threshold = self.config['derivative_threshold']

        # Get buffer contents
        power_samples = list(self.power_buffer)
        deriv_samples = list(self.derivative_buffer)
        confirm_samples = list(self.confirmation_buffer)

        # Calculate some additional stats
        if power_samples:
            power_min = min(power_samples)
            power_max = max(power_samples)
            power_range = power_max - power_min
            power_mean = sum(power_samples) / len(power_samples)
        else:
            power_min = power_max = power_range = power_mean = 0

        if deriv_samples:
            deriv_min = min(deriv_samples)
            deriv_max = max(deriv_samples)
            # Count positive/negative derivatives beyond the threshold
            deriv_pos = sum(1 for d in deriv_samples if d > deriv_threshold)
            deriv_neg = sum(1 for d in deriv_samples if d < -deriv_threshold)
        else:
            deriv_min = deriv_max = deriv_pos = deriv_neg = 0

        # Explicit None checks: a limit or power of 0 must still be shown
        if current_limit is not None:
            limit_line = f" Current Limit: {current_limit}A"
        else:
            limit_line = " Current Limit: N/A"
        if output_power is not None:
            output_line = f" Output Power: {output_power:.0f}W"
        else:
            output_line = " Output Power: N/A"

        # Format the dump
        lines = [
            "",
            "=" * 70,
            "OVERLOAD DEBUG DUMP",
            "=" * 70,
            "",
            "--- DETECTION RESULT ---",
            f" Overload Confirmed: {diag.get('is_overload', False)}",
            limit_line,
            output_line,
            "",
            "--- THRESHOLDS (config) ---",
            f" reversal_threshold: {self.config['reversal_threshold']}",
            f" derivative_threshold: {self.config['derivative_threshold']}W",
            f" std_dev_threshold: {self.config['std_dev_threshold']}W",
            f" confirmation_threshold: {self.config['confirmation_threshold']}/{self.config['confirmation_window']}",
            f" trend_drop_threshold: {self.config.get('trend_drop_threshold', -100)}W/s",
            f" max_power_drop_for_overload: {self.config.get('max_power_drop_for_overload', 1000)}W",
            f" min_output_power_for_overload: {self.config.get('min_output_power_for_overload', 0)}W",
            "",
            "--- METHOD 1: REVERSALS ---",
            f" Reversals counted: {diag.get('reversals', 0)} (threshold: {self.config['reversal_threshold']})",
            f" Method1 triggered: {diag.get('method1_triggered', False)}",
            f" Significant positive derivatives: {deriv_pos}",
            f" Significant negative derivatives: {deriv_neg}",
            f" Derivative range: {deriv_min:.0f}W to {deriv_max:.0f}W",
            "",
            "--- METHOD 2: STD DEV ---",
            f" Detrended std_dev: {diag.get('std_dev', 0):.1f}W (threshold: {self.config['std_dev_threshold']}W)",
            f" Method2 triggered: {diag.get('method2_triggered', False)}",
            f" Power range: {power_min:.0f}W to {power_max:.0f}W (delta: {power_range:.0f}W)",
            f" Power mean: {power_mean:.0f}W",
            "",
            "--- FILTERS ---",
            f" Trend: {diag.get('trend', 0):.1f}W/s (threshold: {diag.get('trend_threshold', -100)}W/s)",
            f" Trend OK: {diag.get('trend_ok', True)}",
            f" Recent max power: {diag.get('recent_max', 0):.0f}W",
            f" Power drop from max: {diag.get('power_drop', 0):.0f}W (max allowed: {diag.get('max_drop_allowed', 1000)}W)",
            f" Power drop OK: {diag.get('power_drop_ok', True)}",
            f" Output power OK: {diag.get('output_power_ok', True)}",
            "",
            "--- CONFIRMATION ---",
            f" Instant detection: {diag.get('instant_detection', False)}",
            f" Confirmed samples: {diag.get('confirmed_count', 0)}/{self.config['confirmation_threshold']}",
            f" Confirmation buffer: {confirm_samples}",
            "",
            "--- POWER BUFFER (last {0} samples) ---".format(len(power_samples)),
        ]

        # Add power samples in rows of 10
        for i in range(0, len(power_samples), 10):
            chunk = power_samples[i:i+10]
            formatted = [f"{p:.0f}" for p in chunk]
            lines.append(f" [{i:2d}-{i+len(chunk)-1:2d}]: {', '.join(formatted)}")

        lines.extend([
            "",
            "--- DERIVATIVE BUFFER (last {0} samples) ---".format(len(deriv_samples)),
        ])

        # Add derivative samples in rows of 10
        for i in range(0, len(deriv_samples), 10):
            chunk = deriv_samples[i:i+10]
            formatted = [f"{d:+.0f}" for d in chunk]
            lines.append(f" [{i:2d}-{i+len(chunk)-1:2d}]: {', '.join(formatted)}")

        lines.extend([
            "",
            "=" * 70,
            "",
        ])

        debug_output = "\n".join(lines)

        # Log it
        self.logger.warning(debug_output)
        return debug_output
class SimpleThresholdDetector:
    """
    Simple alternative detector using just power variance.

    Use this if the hybrid approach is too sensitive or complex.
    An overload is reported whenever the population variance of the
    buffered total-power samples exceeds variance_threshold.
    """

    # Preferred number of samples before the variance is evaluated.
    _WARMUP_SAMPLES = 10

    def __init__(self, variance_threshold=100000, window_size=20):
        """
        Args:
            variance_threshold: Variance above which update() reports an
                overload.
            window_size: Maximum number of samples kept in the sliding
                window.
        """
        self.variance_threshold = variance_threshold
        self.buffer = deque(maxlen=window_size)
        self.logger = logging.getLogger('SimpleDetector')

    def update(self, l1_power: float, l2_power: float, timestamp: float = None) -> tuple:
        """
        Add one reading and evaluate the variance of the window.

        Args:
            l1_power: Power on L1 (None is treated as 0)
            l2_power: Power on L2 (None is treated as 0)
            timestamp: Accepted for signature parity with
                OverloadDetector.update; not used here.

        Returns:
            tuple: (is_overload: bool, diagnostics: dict). During warm-up
            the dict is {'status': 'warming_up'}.
        """
        total_power = (l1_power or 0) + (l2_power or 0)
        self.buffer.append(total_power)

        # A window smaller than the preferred warm-up can never hold that
        # many samples, so cap the requirement at the window capacity.
        # Keep it at least 1 so an empty buffer never reaches the division.
        capacity = self.buffer.maxlen
        needed = self._WARMUP_SAMPLES
        if capacity is not None:
            needed = max(1, min(needed, capacity))
        if len(self.buffer) < needed:
            return False, {'status': 'warming_up'}

        # Calculate (population) variance
        data = list(self.buffer)
        mean = sum(data) / len(data)
        variance = sum((x - mean) ** 2 for x in data) / len(data)
        is_overload = variance > self.variance_threshold
        return is_overload, {
            'variance': variance,
            'threshold': self.variance_threshold,
            'is_overload': is_overload
        }

    def reset(self):
        """Discard all buffered samples."""
        self.buffer.clear()
# Manual smoke test - only runs when this module is executed as a script.
# Feeds three synthetic single-phase power profiles through the detector
# (built from the default config) and prints the outcome for inspection.
if __name__ == '__main__':
    import random

    logging.basicConfig(level=logging.DEBUG)
    detector = OverloadDetector()

    # Profile 1: flat 8000 W with Gaussian noise (sigma 30 W)
    print("Testing with stable power...")
    for step in range(50):
        watts = 8000 + random.gauss(0, 30)
        overloaded, _ = detector.update(watts, 0)
        if step % 10 == 0:
            print(f" Sample {step}: power={watts:.0f}, overload={overloaded}")

    # Profile 2: steady climb of 50 W per sample on top of the same noise
    print("\nTesting with smooth ramp...")
    detector.reset()
    for step in range(50):
        watts = 8000 + step * 50 + random.gauss(0, 30)  # Ramping up
        overloaded, _ = detector.update(watts, 0)
        if step % 10 == 0:
            print(f" Sample {step}: power={watts:.0f}, overload={overloaded}")

    # Profile 3: +/-600 W square wave with a 4-sample period; every
    # triggering sample is printed together with its diagnostics
    print("\nTesting with oscillation (overload)...")
    detector.reset()
    for step in range(50):
        swing = 600 if step % 4 < 2 else -600  # Square wave oscillation
        watts = 8000 + swing + random.gauss(0, 30)
        overloaded, details = detector.update(watts, 0)
        if overloaded or step % 10 == 0:
            print(f" Sample {step}: power={watts:.0f}, overload={overloaded}, diag={details}")