Files
venus/dbus-generator-ramp/dbus-generator-ramp.py

1121 lines
50 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Venus OS Generator Current Ramp Controller
Monitors generator operation and dynamically adjusts inverter/charger
input current limit to prevent generator overload.
This version publishes its own D-Bus service so status is visible via MQTT:
N/<vrm-id>/generatorramp/0/...
Features:
- Preemptively sets 40A limit when generator enters warm-up
- Ramps from 40A to 50A over 30 minutes after AC connects
- Detects generator overload via power fluctuation analysis
- Rolls back to 40A on overload, then conservatively ramps back up
- Publishes status to D-Bus/MQTT for monitoring
- Settings adjustable via D-Bus/MQTT
Author: Claude (Anthropic)
License: MIT
"""
import sys
import os
import logging
import signal
from time import time, sleep
# Add velib_python to path (Venus OS standard location)
sys.path.insert(1, os.path.join(os.path.dirname(__file__), 'ext', 'velib_python'))
sys.path.insert(1, '/opt/victronenergy/velib_python')
try:
from gi.repository import GLib
except ImportError:
print("ERROR: GLib not available. This script must run on Venus OS.")
sys.exit(1)
try:
import dbus
from dbus.mainloop.glib import DBusGMainLoop
from vedbus import VeDbusService
from settingsdevice import SettingsDevice
except ImportError as e:
print(f"ERROR: Required module not available: {e}")
print("This script must run on Venus OS.")
sys.exit(1)
from config import (
DBUS_CONFIG, GENERATOR_STATE, RAMP_CONFIG,
OVERLOAD_CONFIG, LOGGING_CONFIG, TIMING_CONFIG, LEARNING_CONFIG
)
from overload_detector import OverloadDetector
from ramp_controller import RampController
# Addon version; published on /Mgmt/ProcessVersion and /FirmwareVersion
VERSION = '2.2.0'
# D-Bus service name under which this addon publishes its own status paths
SERVICE_NAME = 'com.victronenergy.generatorramp'
class GeneratorRampController:
"""
Main controller that coordinates:
- D-Bus monitoring of generator and inverter state
- Overload detection from power readings
- Current limit ramping
- Publishing status to D-Bus (visible via MQTT)
"""
# Controller states
STATE_IDLE = 0
STATE_WARMUP = 1
STATE_RAMPING = 2
STATE_COOLDOWN = 3
STATE_RECOVERY = 4
STATE_STABLE = 5
STATE_NAMES = {
0: 'Idle',
1: 'Warm-up',
2: 'Ramping',
3: 'Cooldown',
4: 'Recovery',
5: 'Stable',
}
def __init__(self):
    """
    Build the controller and arm its periodic main loop.

    The order of the steps below matters: logging is configured first, the
    detector and ramp components are created, our own D-Bus service is
    published, persistent settings are loaded (loading writes into that
    service and into the ramp controller), the monitored services are read
    and subscribed, and only then is the GLib timer started.
    """
    self._setup_logging()
    self.logger = logging.getLogger('GenRampCtrl')
    self.logger.info(f"Initializing Generator Ramp Controller v{VERSION}")
    # Components
    self.overload_detector = OverloadDetector(OVERLOAD_CONFIG)
    self.ramp_controller = RampController(RAMP_CONFIG)
    # State (one of the STATE_* class constants) and when it was entered
    self.state = self.STATE_IDLE
    self.state_enter_time = time()
    # D-Bus proxy cache (avoid per-tick get_object leak)
    self._proxy_cache = {}
    # Cached values from D-Bus
    self.generator_state = GENERATOR_STATE['STOPPED']
    self.ac_connected = False
    self.current_l1_power = 0
    self.current_l2_power = 0
    self.current_limit_setting = 0
    # Output power tracking (loads on inverter output)
    self.output_l1_power = 0
    self.output_l2_power = 0
    # Enabled flag (mirrors /Settings/Enabled)
    self.enabled = True
    # Energy accumulation (accumulated in Wh, published in kWh)
    self._run_energy_wh = 0.0
    self._last_run_energy_wh = 0.0
    # Timestamp of the previous accumulation step; None while not accumulating
    self._energy_last_time = None
    # Signal subscription tracking (power signals are ignored while False)
    self._power_subscriptions_active = False
    # D-Bus connection
    self.bus = dbus.SystemBus()
    # Create our D-Bus service for publishing status
    self._create_dbus_service()
    # Set up settings (stored in Venus localsettings)
    self._setup_settings()
    # Connect to VE.Bus and Generator services
    self._init_dbus_monitors()
    # Start main loop timer
    interval_ms = TIMING_CONFIG['main_loop_interval_ms']
    GLib.timeout_add(interval_ms, self._main_loop)
    self.logger.info(f"Initialized. Main loop interval: {interval_ms}ms")
def _setup_logging(self):
    """
    Configure the root logger from LOGGING_CONFIG.

    The level name is looked up on the ``logging`` module and falls back to
    INFO when it is not a known attribute; records go to stdout, with or
    without a leading timestamp depending on ``include_timestamp``.
    """
    log_level = getattr(logging, LOGGING_CONFIG['level'], logging.INFO)
    formats = {
        True: '%(asctime)s %(levelname)s %(name)s: %(message)s',
        False: '%(levelname)s %(name)s: %(message)s',
    }
    chosen_format = formats[bool(LOGGING_CONFIG['include_timestamp'])]
    logging.basicConfig(stream=sys.stdout, format=chosen_format, level=log_level)
def _create_dbus_service(self):
    """
    Create, populate and register our own D-Bus service.

    The service object is created with ``register=False`` so that all paths
    can be added before the bus name is claimed. Because the name is claimed
    by ``register()`` rather than by the constructor, the retry loop for a
    name still held by a previous instance wraps ``register()``.

    Raises:
        dbus.exceptions.NameExistsException: if the bus name is still owned
            by another process after all retries.
    """
    self.logger.info(f"Creating D-Bus service: {SERVICE_NAME}")

    # Text formatters, called by the service as gettextcallback(path, value).
    def amps(p, v):
        return f"{v:.1f}A"

    def whole_amps(p, v):
        return f"{v:.0f}A"

    def watts(p, v):
        return f"{v:.0f}W"

    def minutes(p, v):
        return f"{v} min"

    def kwh(p, v):
        return f"{v:.1f} kWh"

    self.dbus_service = VeDbusService(SERVICE_NAME, self.bus, register=False)
    add = self.dbus_service.add_path

    def add_setting(path, default, text=None):
        # Writable path whose changes are routed through _on_setting_changed.
        add(path, default, writeable=True,
            onchangecallback=self._on_setting_changed,
            gettextcallback=text)

    # Add management paths (required for Venus)
    add('/Mgmt/ProcessName', 'dbus-generator-ramp')
    add('/Mgmt/ProcessVersion', VERSION)
    add('/Mgmt/Connection', 'local')

    # Add device info
    add('/DeviceInstance', 0)
    add('/ProductId', 0xFFFF)
    add('/ProductName', 'Generator Ramp Controller')
    add('/FirmwareVersion', VERSION)
    add('/Connected', 1)

    # Status paths (read-only) - these will be visible in MQTT
    add('/State', self.STATE_IDLE,
        gettextcallback=lambda p, v: self.STATE_NAMES.get(v, 'Unknown'))
    add('/CurrentLimit', 0.0, gettextcallback=amps)
    add('/TargetLimit', RAMP_CONFIG['target_current'], gettextcallback=amps)
    add('/RecoveryTarget', RAMP_CONFIG['target_current'], gettextcallback=amps)
    add('/OverloadCount', 0)
    add('/LastStableCurrent', RAMP_CONFIG['initial_current'], gettextcallback=amps)

    # Input power monitoring
    add('/Power/L1', 0.0, gettextcallback=watts)
    add('/Power/L2', 0.0, gettextcallback=watts)
    add('/Power/Total', 0.0, gettextcallback=watts)

    # Output power monitoring (loads on inverter output)
    add('/OutputPower/L1', 0.0, gettextcallback=watts)
    add('/OutputPower/L2', 0.0, gettextcallback=watts)
    add('/OutputPower/Total', 0.0, gettextcallback=watts)

    # Overload detection diagnostics
    add('/Detection/Reversals', 0)
    add('/Detection/StdDev', 0.0, gettextcallback=lambda p, v: f"{v:.1f}W")
    add('/Detection/IsOverload', 0)
    add('/Detection/OutputPowerOk', 1)  # 1=sufficient load for detection
    add('/Detection/Trend', 0.0, gettextcallback=lambda p, v: f"{v:.1f}W/s")
    add('/Detection/TrendOk', 1)  # 1=not dropping fast
    add('/Detection/PowerDrop', 0.0, gettextcallback=watts)
    add('/Detection/PowerDropOk', 1)  # 1=input not dropped from ceiling

    # Generator and AC status (mirrors)
    add('/Generator/State', 0, gettextcallback=self._generator_state_text)
    add('/AcInput/Connected', 0)

    # Ramp progress
    add('/Ramp/Progress', 0, gettextcallback=lambda p, v: f"{v}%")
    add('/Ramp/TimeRemaining', 0,
        gettextcallback=lambda p, v: f"{v//60}m {v%60}s" if v > 0 else "0s")

    # Energy accumulation (kWh)
    add('/Energy/CurrentRun', 0.0, gettextcallback=kwh)
    add('/Energy/LastRun', 0.0, gettextcallback=kwh)

    # Fast recovery status
    add('/Recovery/InFastRamp', 0)
    add('/Recovery/FastRampTarget', 0.0, gettextcallback=amps)
    add('/Recovery/RapidOverloadCount', 0)

    # Learning model status
    add('/Learning/Confidence', 0)
    add('/Learning/ConfidenceLevel', 'LOW')
    add('/Learning/BaseCurrent', LEARNING_CONFIG['initial_base_current'],
        gettextcallback=amps)
    add('/Learning/SuggestedLimit', RAMP_CONFIG['target_current'],
        gettextcallback=amps)
    add('/Learning/DataPoints', 0)
    add('/Learning/OverloadPoints', 0)

    # Writable settings (can be changed via D-Bus/MQTT)
    add_setting('/Settings/InitialCurrent', RAMP_CONFIG['initial_current'], amps)
    add_setting('/Settings/TargetCurrent', RAMP_CONFIG['target_current'], amps)
    add_setting('/Settings/RampDuration', 30, minutes)  # minutes
    add_setting('/Settings/CooldownDuration',
                int(RAMP_CONFIG['cooldown_duration'] // 60), minutes)
    add_setting('/Settings/Enabled', 1)

    # Power zone settings (output-based input current limits)
    # These control how input current limit varies with output load
    base_current = LEARNING_CONFIG['initial_base_current']
    zones = LEARNING_CONFIG['power_zones']
    add_setting('/Settings/LowOutputLimit', base_current, whole_amps)
    add_setting('/Settings/MediumOutputLimit',
                base_current + zones['MEDIUM'][2], whole_amps)
    add_setting('/Settings/HighOutputLimit',
                base_current + zones['HIGH'][2], whole_amps)
    add_setting('/Settings/LowOutputThreshold', zones['LOW'][1], watts)
    add_setting('/Settings/HighOutputThreshold', zones['MEDIUM'][1], watts)

    # Return to prior stable current after overload (if stable > N minutes)
    add_setting('/Settings/ReturnToStableAfterOverload',
                1 if RAMP_CONFIG.get('return_to_stable_after_overload', True) else 0)
    add_setting('/Settings/ReturnToStableMinMinutes',
                int(RAMP_CONFIG.get('return_to_stable_min_duration', 1800) // 60),
                minutes)

    # Claim the bus name last. A previous instance may not have released it
    # yet, so retry with exponential backoff before giving up.
    max_retries = 5
    retry_delay = 1.0  # seconds
    for attempt in range(1, max_retries + 1):
        try:
            self.dbus_service.register()
            break
        except dbus.exceptions.NameExistsException:
            if attempt == max_retries:
                self.logger.error("Failed to acquire D-Bus name after retries")
                raise
            self.logger.warning(
                f"D-Bus name exists, retrying in {retry_delay}s "
                f"(attempt {attempt}/{max_retries})"
            )
            sleep(retry_delay)
            retry_delay *= 2  # Exponential backoff

    self.logger.info("D-Bus service created")
def _generator_state_text(self, path, value):
"""Get text for generator state"""
states = {0: 'Stopped', 1: 'Running', 2: 'Warm-up', 3: 'Cool-down', 10: 'Error'}
return states.get(value, f'Unknown ({value})')
def _on_setting_changed(self, path, value):
"""Handle setting changes from D-Bus/MQTT"""
self.logger.info(f"Setting changed: {path} = {value}")
if path == '/Settings/InitialCurrent':
RAMP_CONFIG['initial_current'] = float(value)
self._save_setting('InitialCurrent', float(value))
elif path == '/Settings/TargetCurrent':
RAMP_CONFIG['target_current'] = float(value)
self.dbus_service['/TargetLimit'] = float(value)
self._save_setting('TargetCurrent', float(value))
elif path == '/Settings/RampDuration':
duration_min = int(value)
delta = RAMP_CONFIG['target_current'] - RAMP_CONFIG['initial_current']
RAMP_CONFIG['initial_ramp_rate'] = delta / duration_min if duration_min > 0 else 0.333
self._save_setting('RampDuration', duration_min)
elif path == '/Settings/CooldownDuration':
RAMP_CONFIG['cooldown_duration'] = int(value) * 60
self._save_setting('CooldownDuration', int(value))
elif path == '/Settings/Enabled':
self.enabled = bool(value)
self._save_setting('Enabled', int(value))
if not self.enabled:
self.logger.info("Controller disabled")
self._transition_to(self.STATE_IDLE)
# Power zone settings
elif path == '/Settings/LowOutputLimit':
self._update_power_zones(low_limit=float(value))
self._save_setting('LowOutputLimit', float(value))
elif path == '/Settings/MediumOutputLimit':
self._update_power_zones(medium_limit=float(value))
self._save_setting('MediumOutputLimit', float(value))
elif path == '/Settings/HighOutputLimit':
self._update_power_zones(high_limit=float(value))
self._save_setting('HighOutputLimit', float(value))
elif path == '/Settings/LowOutputThreshold':
self._update_power_zones(low_threshold=int(value))
self._save_setting('LowOutputThreshold', int(value))
elif path == '/Settings/HighOutputThreshold':
self._update_power_zones(high_threshold=int(value))
self._save_setting('HighOutputThreshold', int(value))
elif path == '/Settings/ReturnToStableAfterOverload':
RAMP_CONFIG['return_to_stable_after_overload'] = bool(value)
self._save_setting('ReturnToStableAfterOverload', int(value))
elif path == '/Settings/ReturnToStableMinMinutes':
RAMP_CONFIG['return_to_stable_min_duration'] = int(value) * 60
self._save_setting('ReturnToStableMinMinutes', int(value))
return True
def _save_setting(self, name, value):
"""Save a setting to localsettings"""
if self.settings:
try:
self.settings[name] = value
except Exception as e:
self.logger.warning(f"Failed to save setting {name}: {e}")
def _update_power_zones(self, low_limit=None, medium_limit=None, high_limit=None,
                        low_threshold=None, high_threshold=None):
    """
    Update the power zone configuration from UI-style settings.

    The UI exposes an absolute current limit per zone, while
    LEARNING_CONFIG stores a base current plus a per-zone offset. Each zone
    is a tuple whose first two items are the lower and upper output-power
    bounds and whose third item is the offset added to the base current.

    Args:
        low_limit: absolute limit for the LOW zone; becomes the new base
            current. The MEDIUM/HIGH offsets are recomputed so their
            absolute limits stay put unless new ones are passed as well.
        medium_limit: absolute limit for the MEDIUM zone.
        high_limit: absolute limit for the HIGH zone (VERY_HIGH gets the
            same offset).
        low_threshold: output power separating LOW from MEDIUM.
        high_threshold: output power separating MEDIUM from HIGH.

    Arguments left as None are not changed. A limit of 0 is a real value
    and is applied, not treated as "not given".
    """
    base = LEARNING_CONFIG['initial_base_current']
    zones = LEARNING_CONFIG['power_zones']

    if low_limit is not None:
        # Low limit becomes the new base
        new_base = float(low_limit)
        # Keep the other zones' absolute limits unless new ones were given
        if medium_limit is not None:
            medium_abs = float(medium_limit)
        else:
            medium_abs = base + zones['MEDIUM'][2]
        if high_limit is not None:
            high_abs = float(high_limit)
        else:
            high_abs = base + zones['HIGH'][2]
        medium_offset = medium_abs - new_base
        high_offset = high_abs - new_base

        LEARNING_CONFIG['initial_base_current'] = new_base
        zones['LOW'] = (zones['LOW'][0], zones['LOW'][1], 0.0)
        zones['MEDIUM'] = (zones['MEDIUM'][0], zones['MEDIUM'][1], medium_offset)
        zones['HIGH'] = (zones['HIGH'][0], zones['HIGH'][1], high_offset)
        zones['VERY_HIGH'] = (zones['VERY_HIGH'][0], zones['VERY_HIGH'][1], high_offset)
        # Update the ramp controller's model
        self.ramp_controller.power_model.base_current = new_base
        self.logger.info(f"Power zones updated: base={new_base}A")
    else:
        # Base unchanged: express the new absolute limits relative to it
        if medium_limit is not None:
            medium_offset = float(medium_limit) - base
            zones['MEDIUM'] = (zones['MEDIUM'][0], zones['MEDIUM'][1], medium_offset)
            self.logger.info(f"Medium zone limit: {medium_limit}A (offset={medium_offset}A)")
        if high_limit is not None:
            high_offset = float(high_limit) - base
            zones['HIGH'] = (zones['HIGH'][0], zones['HIGH'][1], high_offset)
            zones['VERY_HIGH'] = (zones['VERY_HIGH'][0], zones['VERY_HIGH'][1], high_offset)
            self.logger.info(f"High zone limit: {high_limit}A (offset={high_offset}A)")

    if low_threshold is not None:
        # Update LOW zone upper bound and MEDIUM zone lower bound
        zones['LOW'] = (0, int(low_threshold), zones['LOW'][2])
        zones['MEDIUM'] = (int(low_threshold), zones['MEDIUM'][1], zones['MEDIUM'][2])
        self.logger.info(f"Low/Medium threshold: {low_threshold}W")

    if high_threshold is not None:
        # Update MEDIUM zone upper bound and HIGH zone lower bound
        zones['MEDIUM'] = (zones['MEDIUM'][0], int(high_threshold), zones['MEDIUM'][2])
        zones['HIGH'] = (int(high_threshold), zones['HIGH'][1], zones['HIGH'][2])
        self.logger.info(f"Medium/High threshold: {high_threshold}W")
def _setup_settings(self):
    """
    Create the persistent settings device and load stored values.

    Best effort: if the settings device cannot be created or loading
    fails, a warning is logged and the controller carries on without
    persistence.
    """
    self.settings = None
    root = '/Settings/GeneratorRamp'
    # One row per setting: (name, default, min, max)
    spec = (
        ('InitialCurrent', 40.0, 10.0, 100.0),
        ('TargetCurrent', 54.0, 10.0, 100.0),
        ('RampDuration', 30, 1, 120),
        ('CooldownDuration', 5, 1, 30),
        ('Enabled', 1, 0, 1),
        # Return to prior stable current after overload if stable > N min
        ('ReturnToStableAfterOverload', 1, 0, 1),
        ('ReturnToStableMinMinutes', 30, 5, 120),
        # Power zone settings (output-based input current limits)
        ('LowOutputLimit', 45.0, 30.0, 100.0),
        ('MediumOutputLimit', 49.0, 30.0, 100.0),
        ('HighOutputLimit', 54.0, 30.0, 100.0),
        ('LowOutputThreshold', 1500, 0, 10000),
        ('HighOutputThreshold', 2500, 0, 10000),
    )
    try:
        # SettingsDevice expects {name: [path, default, min, max]}
        definitions = {
            name: [root + '/' + name, default, lowest, highest]
            for name, default, lowest, highest in spec
        }
        self.settings = SettingsDevice(
            self.bus, definitions, self._on_persistent_setting_changed)
        # Load saved settings
        if self.settings:
            self._load_settings()
        self.logger.info("Persistent settings initialized")
    except Exception as e:
        self.logger.warning(f"Could not initialize persistent settings: {e}")
        self.logger.warning("Settings will not persist across restarts")
def _load_settings(self):
    """
    Copy the persisted settings into the runtime configuration.

    Updates RAMP_CONFIG, the enabled flag and the power zones, and mirrors
    every value onto our /Settings/* D-Bus paths. Does nothing without a
    settings device; any error while loading is logged as a warning.
    """
    if not self.settings:
        return
    stored = self.settings
    published = self.dbus_service
    try:
        initial = float(stored['InitialCurrent'])
        RAMP_CONFIG['initial_current'] = initial
        target = float(stored['TargetCurrent'])
        RAMP_CONFIG['target_current'] = target
        duration_min = int(stored['RampDuration'])
        # Amps per minute so the ramp spans the configured duration
        if duration_min > 0:
            RAMP_CONFIG['initial_ramp_rate'] = (target - initial) / duration_min
        else:
            RAMP_CONFIG['initial_ramp_rate'] = 0.333
        RAMP_CONFIG['cooldown_duration'] = int(stored['CooldownDuration']) * 60
        self.enabled = bool(stored['Enabled'])

        # Update D-Bus paths
        published['/Settings/InitialCurrent'] = initial
        published['/Settings/TargetCurrent'] = target
        published['/Settings/RampDuration'] = duration_min
        published['/Settings/CooldownDuration'] = int(stored['CooldownDuration'])
        published['/Settings/Enabled'] = 1 if self.enabled else 0
        published['/TargetLimit'] = target

        return_to_stable = bool(stored['ReturnToStableAfterOverload'])
        RAMP_CONFIG['return_to_stable_after_overload'] = return_to_stable
        stable_minutes = int(stored['ReturnToStableMinMinutes'])
        RAMP_CONFIG['return_to_stable_min_duration'] = stable_minutes * 60
        published['/Settings/ReturnToStableAfterOverload'] = 1 if return_to_stable else 0
        published['/Settings/ReturnToStableMinMinutes'] = stable_minutes

        # Load power zone settings
        zone_values = {
            'low_limit': float(stored['LowOutputLimit']),
            'medium_limit': float(stored['MediumOutputLimit']),
            'high_limit': float(stored['HighOutputLimit']),
            'low_threshold': int(stored['LowOutputThreshold']),
            'high_threshold': int(stored['HighOutputThreshold']),
        }
        # Update power zones configuration
        self._update_power_zones(**zone_values)

        # Update D-Bus paths for power zones
        low_limit = zone_values['low_limit']
        medium_limit = zone_values['medium_limit']
        high_limit = zone_values['high_limit']
        low_threshold = zone_values['low_threshold']
        high_threshold = zone_values['high_threshold']
        published['/Settings/LowOutputLimit'] = low_limit
        published['/Settings/MediumOutputLimit'] = medium_limit
        published['/Settings/HighOutputLimit'] = high_limit
        published['/Settings/LowOutputThreshold'] = low_threshold
        published['/Settings/HighOutputThreshold'] = high_threshold

        self.logger.info(
            f"Loaded settings: {RAMP_CONFIG['initial_current']}A -> "
            f"{RAMP_CONFIG['target_current']}A over {duration_min}min"
        )
        self.logger.info(
            f"Power zones: {low_limit}A (0-{low_threshold}W), "
            f"{medium_limit}A ({low_threshold}-{high_threshold}W), "
            f"{high_limit}A ({high_threshold}W+)"
        )
    except Exception as e:
        self.logger.warning(f"Error loading settings: {e}")
def _on_persistent_setting_changed(self, setting, old_value, new_value):
"""Called when a persistent setting changes externally"""
self.logger.info(f"Persistent setting changed: {setting} = {new_value}")
self._load_settings()
def _init_dbus_monitors(self):
    """
    Record the monitored service names, read their initial values and
    subscribe to their change signals.

    Raises:
        dbus.exceptions.DBusException: logged and re-raised when it
            escapes any of the steps.
    """
    try:
        vebus = DBUS_CONFIG['vebus_service']
        self.vebus_service = vebus
        generator = DBUS_CONFIG['generator_service']
        self.generator_service = generator
        self.logger.info(f"Monitoring VE.Bus: {vebus}")
        self.logger.info(f"Monitoring Generator: {generator}")

        # Read initial values
        self._read_generator_state()
        self._read_ac_state()
        self._read_current_limit()

        # Set up signal subscriptions
        self._setup_signal_subscriptions()
    except dbus.exceptions.DBusException as err:
        self.logger.error(f"D-Bus initialization failed: {err}")
        raise
def _setup_signal_subscriptions(self):
"""Subscribe to D-Bus property changes instead of polling."""
try:
self.bus.add_signal_receiver(
self._on_generator_state_changed,
signal_name='PropertiesChanged',
dbus_interface='com.victronenergy.BusItem',
bus_name=self.generator_service,
path='/State'
)
self.logger.debug(f"Subscribed to generator state changes")
self.bus.add_signal_receiver(
self._on_ac_state_changed,
signal_name='PropertiesChanged',
dbus_interface='com.victronenergy.BusItem',
bus_name=self.vebus_service,
path='/Ac/ActiveIn/Connected'
)
self.logger.debug(f"Subscribed to AC connection changes")
except dbus.exceptions.DBusException as e:
self.logger.warning(f"Failed to set up signal subscriptions: {e}")
def _subscribe_power_readings(self):
"""Subscribe to power readings when generator is running."""
if self._power_subscriptions_active:
return
paths = [
'/Ac/ActiveIn/L1/P',
'/Ac/ActiveIn/L2/P',
'/Ac/Out/L1/P',
'/Ac/Out/L2/P',
'/Ac/In/1/CurrentLimit',
]
try:
for path in paths:
self.bus.add_signal_receiver(
lambda changes, p=path: self._on_power_changed(p, changes),
signal_name='PropertiesChanged',
dbus_interface='com.victronenergy.BusItem',
bus_name=self.vebus_service,
path=path
)
self._power_subscriptions_active = True
self.logger.debug("Subscribed to power readings")
except dbus.exceptions.DBusException as e:
self.logger.warning(f"Failed to subscribe to power readings: {e}")
def _unsubscribe_power_readings(self):
"""Mark power readings as unsubscribed (signals ignored via flag)."""
if self._power_subscriptions_active:
self._power_subscriptions_active = False
self.logger.debug("Power reading signals now ignored")
def _on_generator_state_changed(self, changes):
"""Handle generator state change signal."""
if 'Value' in changes:
self.generator_state = int(changes['Value'])
self.dbus_service['/Generator/State'] = self.generator_state
self.logger.debug(f"Generator state changed: {self.generator_state}")
def _on_ac_state_changed(self, changes):
"""Handle AC connection state change signal."""
if 'Value' in changes:
self.ac_connected = bool(changes['Value'])
self.dbus_service['/AcInput/Connected'] = 1 if self.ac_connected else 0
self.logger.debug(f"AC connected: {self.ac_connected}")
def _on_power_changed(self, path, changes):
"""Handle power reading change signal."""
if not self._power_subscriptions_active:
return
if 'Value' in changes:
value = float(changes['Value'])
if path == '/Ac/ActiveIn/L1/P':
self.current_l1_power = value
self.dbus_service['/Power/L1'] = value
elif path == '/Ac/ActiveIn/L2/P':
self.current_l2_power = value
self.dbus_service['/Power/L2'] = value
self.dbus_service['/Power/Total'] = self.current_l1_power + self.current_l2_power
elif path == '/Ac/Out/L1/P':
self.output_l1_power = value
self.dbus_service['/OutputPower/L1'] = value
elif path == '/Ac/Out/L2/P':
self.output_l2_power = value
self.dbus_service['/OutputPower/L2'] = value
total = self.output_l1_power + self.output_l2_power
self.dbus_service['/OutputPower/Total'] = total
self.ramp_controller.set_output_power(total)
elif path == '/Ac/In/1/CurrentLimit':
self.current_limit_setting = value
self.dbus_service['/CurrentLimit'] = value
def _get_proxy(self, service, path):
"""Return a cached D-Bus proxy, creating it only once per (service, path)."""
key = (service, path)
proxy = self._proxy_cache.get(key)
if proxy is not None:
return proxy
obj = self.bus.get_object(service, path, introspect=False)
self._proxy_cache[key] = obj
return obj
def _get_dbus_value(self, service, path):
"""Get a value from D-Bus service"""
try:
obj = self._get_proxy(service, path)
return obj.GetValue(dbus_interface='com.victronenergy.BusItem')
except dbus.exceptions.DBusException as e:
self._proxy_cache.pop((service, path), None)
self.logger.debug(f"Failed to get {service}{path}: {e}")
return None
def _set_dbus_value(self, service, path, value):
"""Set a value on D-Bus service"""
try:
obj = self._get_proxy(service, path)
if isinstance(value, float):
dbus_value = dbus.Double(value, variant_level=1)
elif isinstance(value, int):
dbus_value = dbus.Int32(value, variant_level=1)
else:
dbus_value = value
obj.SetValue(dbus_value, dbus_interface='com.victronenergy.BusItem')
self.logger.debug(f"Set {path} = {value}")
return True
except dbus.exceptions.DBusException as e:
self._proxy_cache.pop((service, path), None)
self.logger.error(f"Failed to set {path}: {e}")
return False
def _read_generator_state(self):
"""Read generator state from D-Bus"""
value = self._get_dbus_value(self.generator_service, '/State')
self.generator_state = int(value) if value is not None else 0
self.dbus_service['/Generator/State'] = self.generator_state
def _read_ac_state(self):
"""Read AC input connection state from D-Bus"""
value = self._get_dbus_value(self.vebus_service, '/Ac/ActiveIn/Connected')
self.ac_connected = bool(value) if value is not None else False
self.dbus_service['/AcInput/Connected'] = 1 if self.ac_connected else 0
def _read_power(self):
"""Read AC input power from D-Bus"""
value = self._get_dbus_value(self.vebus_service, '/Ac/ActiveIn/L1/P')
self.current_l1_power = float(value) if value is not None else 0
value = self._get_dbus_value(self.vebus_service, '/Ac/ActiveIn/L2/P')
self.current_l2_power = float(value) if value is not None else 0
self.dbus_service['/Power/L1'] = self.current_l1_power
self.dbus_service['/Power/L2'] = self.current_l2_power
self.dbus_service['/Power/Total'] = self.current_l1_power + self.current_l2_power
def _read_current_limit(self):
"""Read current input limit setting from D-Bus"""
value = self._get_dbus_value(self.vebus_service, '/Ac/In/1/CurrentLimit')
self.current_limit_setting = float(value) if value is not None else 0
self.dbus_service['/CurrentLimit'] = self.current_limit_setting
def _read_output_power(self):
"""Read AC output power (loads) from D-Bus"""
value = self._get_dbus_value(self.vebus_service, '/Ac/Out/L1/P')
self.output_l1_power = float(value) if value is not None else 0
value = self._get_dbus_value(self.vebus_service, '/Ac/Out/L2/P')
self.output_l2_power = float(value) if value is not None else 0
total_output = self.output_l1_power + self.output_l2_power
self.dbus_service['/OutputPower/L1'] = self.output_l1_power
self.dbus_service['/OutputPower/L2'] = self.output_l2_power
self.dbus_service['/OutputPower/Total'] = total_output
# Update ramp controller with current output power
self.ramp_controller.set_output_power(total_output)
def _set_current_limit(self, limit: float) -> bool:
"""Set the input current limit"""
limit = round(limit, 1)
self.logger.info(f"Setting current limit to {limit}A")
success = self._set_dbus_value(self.vebus_service, '/Ac/In/1/CurrentLimit', limit)
if success:
self.dbus_service['/CurrentLimit'] = limit
return success
def _transition_to(self, new_state: int):
"""Transition to a new controller state"""
old_name = self.STATE_NAMES.get(self.state, 'Unknown')
new_name = self.STATE_NAMES.get(new_state, 'Unknown')
self.logger.info(f"State: {old_name} -> {new_name}")
self.state = new_state
self.state_enter_time = time()
self.dbus_service['/State'] = new_state
if new_state == self.STATE_IDLE:
self.overload_detector.reset()
self.ramp_controller.reset()
self.dbus_service['/Ramp/Progress'] = 0
self.dbus_service['/Ramp/TimeRemaining'] = 0
def _update_ramp_progress(self):
"""Update ramp progress indicators"""
if not self.ramp_controller.is_ramping:
# When stable at target, show 100%
if self.state == self.STATE_STABLE:
self.dbus_service['/Ramp/Progress'] = 100
self.dbus_service['/Ramp/TimeRemaining'] = 0
return
current = self.ramp_controller.current_limit
initial = RAMP_CONFIG['initial_current']
target = self.ramp_controller.state.target_limit
if target > initial:
progress = int(100 * (current - initial) / (target - initial))
progress = max(0, min(100, progress))
else:
progress = 100
self.dbus_service['/Ramp/Progress'] = progress
if self.ramp_controller.state.is_recovery:
rate = RAMP_CONFIG['recovery_ramp_rate']
else:
rate = RAMP_CONFIG['initial_ramp_rate']
remaining_amps = target - current
if rate > 0:
remaining_seconds = int(remaining_amps / rate * 60)
else:
remaining_seconds = 0
self.dbus_service['/Ramp/TimeRemaining'] = max(0, remaining_seconds)
def _main_loop(self) -> bool:
    """
    Main control loop - called periodically by GLib.

    One tick does, in order: refresh the inputs needed for the current
    state, accumulate run energy, handle a stopped generator, run overload
    detection, dispatch to the handler of the current state, and publish
    progress and status on our D-Bus service.

    Returns:
        True on every path (including after an error, which is logged),
        so the GLib timeout that calls this method stays scheduled.
    """
    try:
        now = time()
        # Check if enabled
        if not self.enabled:
            return True
        # Conditional D-Bus reading based on state
        if self.state == self.STATE_IDLE:
            # Idle: poll only the generator state; power signals are ignored
            self._read_generator_state()
            self._unsubscribe_power_readings()
        else:
            self._subscribe_power_readings()
            # One-off poll right after entering WARMUP.
            # NOTE(review): state_enter_time is set when the transition
            # happens, which for handler-driven transitions is the previous
            # tick - this window looks reachable only if the loop interval
            # is below 0.1 s. Confirm the initial read actually runs.
            if self.state == self.STATE_WARMUP and (now - self.state_enter_time) < 0.1:
                self._read_ac_state()
                self._read_power()
                self._read_output_power()
                self._read_current_limit()
        # Accumulate energy while generator is providing AC power
        if self.ac_connected and self.generator_state in [
            GENERATOR_STATE['RUNNING'], GENERATOR_STATE['WARMUP'], GENERATOR_STATE['COOLDOWN']
        ]:
            total_power = self.current_l1_power + self.current_l2_power
            # Integrate power over the time since the previous tick (W * h = Wh)
            if self._energy_last_time is not None and total_power > 0:
                dt_hours = (now - self._energy_last_time) / 3600.0
                self._run_energy_wh += total_power * dt_hours
                self.dbus_service['/Energy/CurrentRun'] = round(self._run_energy_wh / 1000.0, 2)
            self._energy_last_time = now
        else:
            # Not accumulating: forget the timestamp so the gap is not integrated later
            self._energy_last_time = None
        # Check for generator stop
        if self.generator_state == GENERATOR_STATE['STOPPED']:
            if self.state != self.STATE_IDLE:
                # Close out the run: move its energy to LastRun and reset the counter
                if self._run_energy_wh > 0:
                    self._last_run_energy_wh = self._run_energy_wh
                    self.dbus_service['/Energy/LastRun'] = round(self._last_run_energy_wh / 1000.0, 2)
                    self.logger.info(f"Run energy: {self._last_run_energy_wh / 1000.0:.2f} kWh")
                self._run_energy_wh = 0.0
                self.dbus_service['/Energy/CurrentRun'] = 0.0
                self.logger.info("Generator stopped")
                self._transition_to(self.STATE_IDLE)
            return True
        # Run overload detection
        is_overload = False
        if self.ac_connected and self.state in [self.STATE_RAMPING, self.STATE_RECOVERY, self.STATE_STABLE]:
            total_output_power = self.output_l1_power + self.output_l2_power
            is_overload, diag = self.overload_detector.update(
                self.current_l1_power,
                self.current_l2_power,
                now,
                output_power=total_output_power
            )
            # Publish diagnostics only when the detector returned them
            if 'reversals' in diag:
                self.dbus_service['/Detection/Reversals'] = diag.get('reversals', 0)
                self.dbus_service['/Detection/StdDev'] = diag.get('std_dev', 0)
                self.dbus_service['/Detection/IsOverload'] = 1 if is_overload else 0
                self.dbus_service['/Detection/OutputPowerOk'] = 1 if diag.get('output_power_ok', True) else 0
                self.dbus_service['/Detection/Trend'] = diag.get('trend', 0.0)
                self.dbus_service['/Detection/TrendOk'] = 1 if diag.get('trend_ok', True) else 0
                self.dbus_service['/Detection/PowerDrop'] = diag.get('power_drop', 0.0)
                self.dbus_service['/Detection/PowerDropOk'] = 1 if diag.get('power_drop_ok', True) else 0
        # State machine
        if self.state == self.STATE_IDLE:
            self._handle_idle(now)
        elif self.state == self.STATE_WARMUP:
            self._handle_warmup(now)
        elif self.state == self.STATE_RAMPING:
            self._handle_ramping(now, is_overload)
        elif self.state == self.STATE_COOLDOWN:
            self._handle_cooldown(now)
        elif self.state == self.STATE_RECOVERY:
            self._handle_recovery(now, is_overload)
        elif self.state == self.STATE_STABLE:
            self._handle_stable(now, is_overload)
        # Update progress
        self._update_ramp_progress()
        # Update status
        status = self.ramp_controller.get_status()
        self.dbus_service['/RecoveryTarget'] = status['recovery_target']
        self.dbus_service['/OverloadCount'] = status['overload_count']
        self.dbus_service['/LastStableCurrent'] = status['last_stable']
        # Update fast recovery status
        self.dbus_service['/Recovery/InFastRamp'] = 1 if status.get('in_fast_ramp') else 0
        self.dbus_service['/Recovery/FastRampTarget'] = status.get('fast_ramp_target') or 0.0
        self.dbus_service['/Recovery/RapidOverloadCount'] = status.get('rapid_overload_count', 0)
        # Update learning model status
        power_model = status.get('power_model', {})
        self.dbus_service['/Learning/Confidence'] = power_model.get('confidence', 0)
        self.dbus_service['/Learning/ConfidenceLevel'] = power_model.get('confidence_level', 'LOW')
        self.dbus_service['/Learning/BaseCurrent'] = power_model.get('base_current', 42.0)
        self.dbus_service['/Learning/SuggestedLimit'] = status.get('suggested_limit', 50.0)
        self.dbus_service['/Learning/DataPoints'] = power_model.get('data_points', 0)
        self.dbus_service['/Learning/OverloadPoints'] = power_model.get('overload_points', 0)
    except Exception as e:
        # Never let one bad tick stop the timer; log with traceback and carry on
        self.logger.error(f"Main loop error: {e}", exc_info=True)
    return True
def _handle_idle(self, now: float):
    """IDLE: Wait for generator to start warm-up.

    Leaves IDLE in one of two ways:

    - Generator reports WARMUP: the initial limit computed by
      ``ramp_controller.set_initial_limit`` is applied and the state
      machine moves to STATE_WARMUP.
    - Generator reports RUNNING and AC is already connected: the initial
      limit is applied, the ramp is started immediately and the state
      machine moves to STATE_RAMPING.

    In every other situation the controller stays in IDLE.

    Args:
        now: Timestamp handed to ``start_ramp`` / ``set_ramp_start``.
    """
    if self.generator_state == GENERATOR_STATE['WARMUP']:
        self.logger.info("Generator warm-up detected")
        new_limit = self.ramp_controller.set_initial_limit(self.current_limit_setting)
        self._set_current_limit(new_limit)
        self._transition_to(self.STATE_WARMUP)

    elif self.generator_state == GENERATOR_STATE['RUNNING'] and self.ac_connected:
        self.logger.info("Generator already running with AC connected")
        new_limit = self.ramp_controller.set_initial_limit(self.current_limit_setting)
        self._set_current_limit(new_limit)
        # No warm-up phase was observed, so begin ramping right away.
        self.ramp_controller.start_ramp(now)
        self.overload_detector.set_ramp_start(now)
        self._transition_to(self.STATE_RAMPING)
def _handle_warmup(self, now: float):
    """WARMUP: Generator warming up, AC not yet connected.

    Starts the ramp once the generator is RUNNING with AC connected.
    Falls back to IDLE when the generator is in neither WARMUP nor RUNNING.
    Otherwise (still warming up, or running without AC) nothing changes.

    Args:
        now: Timestamp handed to ``start_ramp`` / ``set_ramp_start``.
    """
    if self.generator_state == GENERATOR_STATE['RUNNING'] and self.ac_connected:
        self.logger.info("Warm-up complete, AC connected - starting ramp")
        self.ramp_controller.start_ramp(now)
        self.overload_detector.set_ramp_start(now)
        self._transition_to(self.STATE_RAMPING)

    elif self.generator_state not in (GENERATOR_STATE['WARMUP'], GENERATOR_STATE['RUNNING']):
        self._transition_to(self.STATE_IDLE)
def _handle_ramping(self, now: float, is_overload: bool):
    """RAMPING: Increasing current limit.

    An overload short-circuits everything else and is delegated to
    ``_handle_overload_event``. Otherwise the ramp controller is advanced;
    a non-None result is written as the new current limit, and once the
    controller reports it is no longer ramping the state becomes STABLE.

    Args:
        now: Timestamp for this tick.
        is_overload: Whether an overload was detected on this tick.
    """
    if is_overload:
        self._handle_overload_event(now)
        return

    new_limit = self.ramp_controller.update(now)
    # update() returns None when there is no new limit to apply.
    if new_limit is not None:
        self._set_current_limit(new_limit)

    if not self.ramp_controller.is_ramping:
        self.logger.info("Ramp complete, entering stable state")
        self._transition_to(self.STATE_STABLE)
def _handle_cooldown(self, now: float):
    """COOLDOWN: Waiting at initial current after overload.

    Publishes the remaining cooldown time (whole seconds, never negative)
    to ``/Ramp/TimeRemaining``. Once ``RAMP_CONFIG['cooldown_duration']``
    has elapsed since the state was entered, the ramp is restarted and the
    state machine moves to STATE_RECOVERY.

    Args:
        now: Timestamp for this tick, compared against ``state_enter_time``.
    """
    cooldown_duration = RAMP_CONFIG['cooldown_duration']
    elapsed = now - self.state_enter_time
    remaining = cooldown_duration - elapsed
    self.dbus_service['/Ramp/TimeRemaining'] = max(0, int(remaining))

    if elapsed >= cooldown_duration:
        self.logger.info("Cooldown complete, starting recovery ramp")
        self.ramp_controller.start_ramp(now)
        self.overload_detector.set_ramp_start(now)
        self._transition_to(self.STATE_RECOVERY)
def _handle_recovery(self, now: float, is_overload: bool):
    """RECOVERY: Ramping back up after overload.

    An overload is delegated to ``_handle_overload_event`` and ends the
    tick. Otherwise the ramp controller is asked whether output power has
    increased (a truthy result is logged with its ``new_target``), the ramp
    is advanced, and a non-None limit is applied. When the controller is no
    longer ramping the state becomes STABLE.

    Args:
        now: Timestamp for this tick.
        is_overload: Whether an overload was detected on this tick.
    """
    if is_overload:
        self._handle_overload_event(now)
        return

    # Check if output power increased - may allow higher target
    result = self.ramp_controller.check_output_power_increase(now)
    if result:
        self.logger.info(
            f"Output power increase detected during recovery: "
            f"target raised to {result['new_target']}A"
        )

    new_limit = self.ramp_controller.update(now)
    if new_limit is not None:
        self._set_current_limit(new_limit)

    if not self.ramp_controller.is_ramping:
        self.logger.info("Recovery complete, entering stable state")
        self._transition_to(self.STATE_STABLE)
def _handle_stable(self, now: float, is_overload: bool):
    """STABLE: Maintaining current limit, monitoring for overload.

    Checks, in order:

    1. Overload: delegated to ``_handle_overload_event``; ends the tick.
    2. Output power increase flagged with ``should_ramp``: the ramp is
       restarted and the state becomes RECOVERY; ends the tick.
    3. Overload history cleared: logged, and if the ramp controller says a
       full-power retry is due, the ramp is restarted and the state
       becomes RAMPING.

    Args:
        now: Timestamp for this tick.
        is_overload: Whether an overload was detected on this tick.
    """
    if is_overload:
        self._handle_overload_event(now)
        return

    # Check if output power increased - may allow higher target
    result = self.ramp_controller.check_output_power_increase(now)
    if result and result.get('should_ramp'):
        self.logger.info(
            f"Output power increased to {result['output_power']:.0f}W, "
            f"raising target to {result['new_target']}A and resuming ramp"
        )
        self.ramp_controller.start_ramp(now)
        self.overload_detector.set_ramp_start(now)
        self._transition_to(self.STATE_RECOVERY)
        return

    if self.ramp_controller.check_history_clear(now):
        self.logger.info("Overload history cleared after stable operation")
        # NOTE(review): the full-power retry is treated as conditional on the
        # history having just been cleared - confirm this nesting is intended.
        if self.ramp_controller.should_retry_full_power():
            self.logger.info("Attempting full power ramp")
            self.ramp_controller.start_ramp(now)
            self.overload_detector.set_ramp_start(now)
            self._transition_to(self.STATE_RAMPING)
def _handle_overload_event(self, now: float):
    """Handle an overload detection.

    Sequence:

    1. Dump detector debug info (best-effort) while the detector still
       holds the data that triggered the overload.
    2. Ask the ramp controller for the rollback decision.
    3. Log the decision, apply the rolled-back current limit, reset the
       detector and enter COOLDOWN.

    Args:
        now: Timestamp of the overload, forwarded to the ramp controller.
    """
    output_power = self.output_l1_power + self.output_l2_power
    current_limit = self.current_limit_setting

    # Dump verbose debug info BEFORE resetting the detector.
    # The dump is diagnostic only: a failure here must not prevent the
    # rollback below from being applied, so it is logged and ignored.
    try:
        self.overload_detector.dump_overload_debug(
            current_limit=current_limit,
            output_power=output_power
        )
    except Exception as e:
        self.logger.error(f"Overload debug dump failed: {e}", exc_info=True)

    result = self.ramp_controller.handle_overload(now, output_power=output_power)

    rapid_info = ""
    if result.get('is_rapid_overload'):
        rapid_info = f" [RAPID #{result['rapid_overload_count']}]"

    # A missing/None fast target would make the ':.1f' format raise and
    # abort the handler before the limit is rolled back.
    fast_target = result.get('fast_recovery_target')
    fast_info = f"{fast_target:.1f}A" if fast_target is not None else "n/a"

    self.logger.warning(
        f"Overload #{result['overload_count']}{rapid_info}: "
        f"rolling back to {result['new_limit']}A, "
        f"recovery target: {result['recovery_target']}A, "
        f"fast target: {fast_info} "
        f"(output: {output_power:.0f}W)"
    )

    self._set_current_limit(result['new_limit'])
    self.overload_detector.reset()
    self._transition_to(self.STATE_COOLDOWN)
def main():
    """Main entry point.

    Installs the GLib D-Bus main loop as default, prints a startup banner,
    registers SIGTERM/SIGINT handlers that quit the main loop, then creates
    the controller and runs the GLib main loop until it is stopped.

    Exits the process with status 1 on an unexpected exception; "Service
    stopped" is logged on every exit path.
    """
    DBusGMainLoop(set_as_default=True)

    print("=" * 60)
    print(f"Generator Current Ramp Controller v{VERSION}")
    print("=" * 60)

    # Assigned inside the try block; the handler below reads it through the
    # closure, so it must exist (as None) before the handler can fire.
    mainloop = None

    def signal_handler(signum, frame):
        """Handle shutdown signals gracefully"""
        sig_name = signal.Signals(signum).name
        logging.info(f"Received {sig_name}, shutting down...")
        if mainloop is not None:
            mainloop.quit()

    # Register signal handlers for graceful shutdown
    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)

    try:
        # Held in a local so the instance stays referenced while the loop runs.
        controller = GeneratorRampController()
        mainloop = GLib.MainLoop()
        mainloop.run()
    except KeyboardInterrupt:
        print("\nShutdown requested")
    except Exception as e:
        logging.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)
    finally:
        logging.info("Service stopped")
# Run the service only when executed as a script, not when imported.
if __name__ == '__main__':
    main()