Files
venus/dbus-vrm-history/dbus-vrm-history.py
Paul G dd566f6975 Apply Venus OS best practices across all services
- Standardize multilog to s25000 n4 (~100KB cap) to prevent filling /data
- Use VeDbusService register=False + deferred register() in dbus-generator-ramp and dbus-vrm-history
- Add @exit_on_error decorator to template GLib callback
- Document best practices in README and template README

Made-with: Cursor
2026-03-16 18:40:54 +00:00

423 lines
15 KiB
Python

#!/usr/bin/env python3
"""
Venus OS VRM History Service
Proxies VRM API historical data requests through D-Bus/MQTT.
Data is requested by the frontend via MQTT W/ topics, fetched from the
VRM cloud API, and published back as JSON strings on N/ topics.
Service name: com.victronenergy.vrmhistory
MQTT paths: N/{portalId}/vrmhistory/0/...
"""
import sys
import os
import json
import logging
import signal
import threading
from time import time, sleep
sys.path.insert(1, os.path.join(os.path.dirname(__file__), 'ext', 'velib_python'))
sys.path.insert(1, '/opt/victronenergy/velib_python')
try:
from gi.repository import GLib
except ImportError:
print("ERROR: GLib not available. This script must run on Venus OS.")
sys.exit(1)
try:
import dbus
from dbus.mainloop.glib import DBusGMainLoop
from vedbus import VeDbusService
from settingsdevice import SettingsDevice
except ImportError as e:
print(f"ERROR: Required module not available: {e}")
sys.exit(1)
from config import DBUS_CONFIG, VRM_CONFIG, LOGGING_CONFIG, STATUS
from vrm_client import VrmClient
# Version string published on D-Bus as /Mgmt/ProcessVersion and /FirmwareVersion.
VERSION = '1.0.0'
# D-Bus service name this process registers; taken from the project config.
SERVICE_NAME = DBUS_CONFIG['service_name']
class VrmHistoryService:
    """D-Bus service that proxies VRM history requests.

    Writes to the ``/Request/*`` paths start a background fetch through
    ``VrmClient``; the results are published as JSON strings on the matching
    ``/History/*`` paths.  All D-Bus writes are marshalled onto the GLib main
    loop with ``GLib.idle_add``; network calls run on daemon threads that are
    serialised by ``_fetch_lock``.
    """

    def __init__(self):
        """Create the D-Bus service, load settings and start the refresh timer."""
        self._setup_logging()
        self.logger = logging.getLogger('VrmHistory')
        self.logger.info(f"Initializing VRM History Service v{VERSION}")
        self.bus = dbus.SystemBus()
        self.vrm_client = None
        self.portal_id = None
        # Defined up front so callbacks never hit a missing attribute.
        self.settings = None
        self._refresh_source_id = None
        self._fetch_lock = threading.Lock()
        self._read_portal_id()
        self._create_dbus_service()
        self._setup_settings()
        self._init_client()
        # Use the effective interval (persisted value if one was loaded),
        # not just the compiled-in default.
        self._schedule_refresh(self.dbus_service['/Config/RefreshInterval'])
        self.logger.info("VRM History Service initialized")

    def _setup_logging(self):
        """Configure root logging to stdout from LOGGING_CONFIG."""
        # Upper-casing tolerates 'info' in the config; the isinstance check
        # guards against names that resolve to functions (e.g. logging.info).
        level = getattr(logging, str(LOGGING_CONFIG['level']).upper(), logging.INFO)
        if not isinstance(level, int):
            level = logging.INFO
        fmt = '%(levelname)s %(name)s: %(message)s'
        if LOGGING_CONFIG['include_timestamp']:
            fmt = '%(asctime)s ' + fmt
        logging.basicConfig(level=level, format=fmt, stream=sys.stdout)

    def _read_portal_id(self):
        """Set ``self.portal_id`` from the unique-id file, else from D-Bus.

        Leaves ``self.portal_id`` as ``None`` when neither source works.
        """
        try:
            with open('/data/venus/unique-id', 'r') as f:
                portal_id = f.read().strip()
        except OSError:
            # Missing or unreadable file: fall through to the D-Bus lookup.
            portal_id = ''
        if portal_id:
            self.portal_id = portal_id
            self.logger.info(f"Portal ID: {self.portal_id}")
            return
        try:
            obj = self.bus.get_object('com.victronenergy.platform', '/Vrmportalid')
            self.portal_id = str(obj.GetValue())
            self.logger.info(f"Portal ID from dbus: {self.portal_id}")
        except Exception as e:
            self.logger.warning(f"Could not read portal ID: {e}")
            self.portal_id = None

    def _create_dbus_service(self):
        """Create the service, add every path, then register the bus name.

        Raises:
            dbus.exceptions.NameExistsException: if the name is still taken
                after all registration retries.
        """
        self.logger.info(f"Creating D-Bus service: {SERVICE_NAME}")
        svc = VeDbusService(SERVICE_NAME, self.bus, register=False)
        self.dbus_service = svc
        svc.add_path('/Mgmt/ProcessName', 'dbus-vrm-history')
        svc.add_path('/Mgmt/ProcessVersion', VERSION)
        svc.add_path('/Mgmt/Connection', 'local')
        svc.add_path('/DeviceInstance', 0)
        svc.add_path('/ProductId', 0xFFFE)
        svc.add_path('/ProductName', 'VRM History Proxy')
        svc.add_path('/FirmwareVersion', VERSION)
        svc.add_path('/Connected', 1)
        svc.add_path('/Status', STATUS['UNCONFIGURED'])
        svc.add_path('/Error', '')
        svc.add_path('/LastFetchTime', 0)
        svc.add_path('/Config/Token', '', writeable=True,
                     onchangecallback=self._on_token_changed)
        svc.add_path('/Config/SiteId', 0, writeable=True,
                     onchangecallback=self._on_setting_changed)
        svc.add_path('/Config/RefreshInterval',
                     VRM_CONFIG['refresh_interval_minutes'],
                     writeable=True, onchangecallback=self._on_setting_changed)
        for req_path in ('/Request/Soc', '/Request/Consumption',
                         '/Request/GeneratorRuntime', '/Request/BatteryCycles'):
            svc.add_path(req_path, '', writeable=True,
                         onchangecallback=self._on_request)
        for hist_path in ('/History/Soc', '/History/Consumption',
                          '/History/GeneratorRuntime', '/History/BatteryCycles'):
            svc.add_path(hist_path, '')
        # Register service after all paths added (Venus OS best practice)
        self._register_with_retry(svc)

    def _register_with_retry(self, svc, max_retries=5, initial_delay=1.0):
        """Call ``svc.register()``, retrying with doubling delay on a name clash.

        With ``register=False`` the bus name is requested by ``register()``,
        not by the constructor, so this is the call that needs the retry.
        """
        delay = initial_delay
        for attempt in range(1, max_retries + 1):
            try:
                svc.register()
                return
            except dbus.exceptions.NameExistsException:
                if attempt == max_retries:
                    raise
                self.logger.warning(f"D-Bus name exists, retrying ({attempt}/{max_retries})")
                sleep(delay)
                delay *= 2

    def _setup_settings(self):
        """Attach persistent settings and copy stored values onto D-Bus paths.

        On any failure ``self.settings`` is set to ``None`` and the service
        runs without persistence.
        """
        try:
            base = DBUS_CONFIG['settings_path']
            settings_def = {
                'Token': [base + '/Token', '', 0, 0],
                'SiteId': [base + '/SiteId', 0, 0, 999999999],
                'RefreshInterval': [base + '/RefreshInterval',
                                    VRM_CONFIG['refresh_interval_minutes'], 1, 1440],
            }
            self.settings = SettingsDevice(self.bus, settings_def,
                                           self._on_persistent_setting_changed)
            # Only non-empty stored values override the D-Bus defaults.
            for name, path in (('Token', '/Config/Token'),
                               ('SiteId', '/Config/SiteId'),
                               ('RefreshInterval', '/Config/RefreshInterval')):
                stored = self.settings[name]
                if stored:
                    self.dbus_service[path] = stored
            self.logger.info("Persistent settings loaded")
        except Exception as e:
            self.logger.warning(f"Could not init persistent settings: {e}")
            self.settings = None

    @staticmethod
    def _loggable_setting(name, value):
        """Return ``value`` for logging, masking a non-empty Token."""
        return '***' if name == 'Token' and value else value

    def _on_persistent_setting_changed(self, name, old, new):
        """Log a persistent-setting change without leaking the API token."""
        self.logger.info(f"Setting changed: {name} = {self._loggable_setting(name, new)}")

    def _init_client(self):
        """Build the VRM client from the current /Config/Token and /Config/SiteId."""
        self._configure_client(self.dbus_service['/Config/Token'],
                               self.dbus_service['/Config/SiteId'])

    def _configure_client(self, token, site_id):
        """(Re)create the VRM client and publish the resulting status.

        - no token: client dropped, status UNCONFIGURED
        - token and site ID: status READY
        - token and portal ID only: background site-ID discovery
        - token only: status ERROR
        """
        if not token:
            self.vrm_client = None
            self._set_status(STATUS['UNCONFIGURED'])
            return
        self.vrm_client = VrmClient(token, site_id if site_id else None)
        if site_id:
            self.vrm_client.site_id = site_id
            self._set_status(STATUS['READY'])
        elif self.portal_id:
            self._discover_site_id()
        else:
            self._set_error('No portal ID available for auto-discovery')

    def _discover_site_id(self):
        """Look up the site ID for ``self.portal_id`` on a daemon thread."""
        # Capture the client so a token change during discovery cannot swap
        # (or clear) the object this thread is using.
        client = self.vrm_client
        portal_id = self.portal_id

        def _apply(sid):
            # Runs on the main loop.
            if self.vrm_client is not client:
                return False  # client was replaced meanwhile; result is stale
            if not sid:
                # NOTE(review): assumes a falsy result means "not found";
                # confirm against VrmClient.discover_site_id.
                return self._set_error('Site ID discovery returned no site')
            client.site_id = sid
            self.dbus_service['/Config/SiteId'] = sid
            if self.settings:
                self.settings['SiteId'] = sid
            self._set_status(STATUS['READY'])
            self.logger.info(f"Auto-discovered site ID: {sid}")
            return False

        def _do_discover():
            GLib.idle_add(self._set_status, STATUS['DISCOVERING'])
            try:
                site_id = client.discover_site_id(portal_id)
            except Exception as e:
                self.logger.error(f"Site ID discovery failed: {e}")
                # Pass the message as an argument: `e` is unbound once this
                # except block ends, so a lambda closing over it would fail.
                GLib.idle_add(self._set_error, str(e))
                return
            GLib.idle_add(_apply, site_id)

        threading.Thread(target=_do_discover, daemon=True).start()

    def _set_status(self, status):
        """Publish ``status``; clear /Error unless it is ERROR. Returns False
        so it can be used directly as a one-shot GLib idle callback."""
        self.dbus_service['/Status'] = status
        if status != STATUS['ERROR']:
            self.dbus_service['/Error'] = ''
        return False

    def _set_error(self, msg):
        """Publish ERROR status with ``msg``. Returns False (one-shot idle)."""
        self.dbus_service['/Status'] = STATUS['ERROR']
        self.dbus_service['/Error'] = msg
        return False

    def _on_token_changed(self, path, value):
        """D-Bus write callback for /Config/Token; always accepts the value."""
        self.logger.info("Token updated")
        if self.settings:
            self.settings['Token'] = value
        # The new token is not stored on the path yet, so pass it explicitly.
        self._configure_client(value, self.dbus_service['/Config/SiteId'])
        return True

    @staticmethod
    def _parse_refresh_minutes(value):
        """Return ``value`` as an int in 1..1440 minutes, or None if invalid."""
        try:
            minutes = int(value)
        except (TypeError, ValueError):
            return None
        return minutes if 1 <= minutes <= 1440 else None

    def _schedule_refresh(self, minutes):
        """(Re)start the auto-refresh timer with a period of ``minutes``."""
        parsed = self._parse_refresh_minutes(minutes)
        if parsed is None:
            self.logger.warning(f"Invalid refresh interval {minutes!r}, using default")
            parsed = int(VRM_CONFIG['refresh_interval_minutes'])
        if self._refresh_source_id is not None:
            GLib.source_remove(self._refresh_source_id)
        self._refresh_source_id = GLib.timeout_add(parsed * 60 * 1000,
                                                   self._auto_refresh)

    def _on_setting_changed(self, path, value):
        """D-Bus write callback for /Config/SiteId and /Config/RefreshInterval.

        Returns:
            True to accept the write, False to reject an out-of-range
            refresh interval.
        """
        if path == '/Config/SiteId':
            # Persist even when no client exists yet (token not set).
            if self.settings:
                self.settings['SiteId'] = value
            if self.vrm_client:
                self.vrm_client.site_id = value
                if value:
                    self._set_status(STATUS['READY'])
        elif path == '/Config/RefreshInterval':
            minutes = self._parse_refresh_minutes(value)
            if minutes is None:
                self.logger.warning(f"Rejected refresh interval: {value!r}")
                return False
            if self.settings:
                self.settings['RefreshInterval'] = minutes
            self._schedule_refresh(minutes)
        return True

    def _on_request(self, path, value):
        """D-Bus write callback for /Request/*; starts a background fetch.

        ``value`` is a JSON object (string or already-decoded dict) with
        optional ``start``/``end`` (and ``interval`` for consumption).
        Always returns True so the write is accepted.
        """
        if not value:
            return True
        handler = {
            '/Request/Soc': self._fetch_soc,
            '/Request/Consumption': self._fetch_consumption,
            '/Request/GeneratorRuntime': self._fetch_generator_runtime,
            '/Request/BatteryCycles': self._fetch_battery_cycles,
        }.get(path)
        if not handler:
            return True
        try:
            params = json.loads(value) if isinstance(value, str) else value
        except (json.JSONDecodeError, TypeError):
            params = None
        # The fetchers call params.get(), so anything but a dict is invalid.
        if not isinstance(params, dict):
            self.logger.error(f"Invalid request JSON on {path}: {value}")
            return True
        if not self.vrm_client:
            self.logger.error(f"Fetch error for {path}: VRM client not configured")
            self._set_error('VRM client not configured')
            return True
        threading.Thread(target=self._run_fetch,
                         args=([(handler, params)], f"Fetch error for {path}"),
                         daemon=True).start()
        return True

    def _run_fetch(self, jobs, error_label):
        """Worker-thread body: run ``(handler, params)`` jobs under the fetch lock.

        Publishes FETCHING first, then READY on success or ERROR (with the
        exception text) on the first failure.
        """
        with self._fetch_lock:
            GLib.idle_add(self._set_status, STATUS['FETCHING'])
            try:
                for handler, params in jobs:
                    handler(params)
            except Exception as e:
                self.logger.error(f"{error_label}: {e}")
                # str(e) is evaluated now; `e` is gone when the idle runs.
                GLib.idle_add(self._set_error, str(e))
            else:
                GLib.idle_add(self._fetch_complete)

    def _fetch_complete(self):
        """Publish READY and the fetch timestamp. Returns False (one-shot idle)."""
        self.dbus_service['/Status'] = STATUS['READY']
        self.dbus_service['/LastFetchTime'] = int(time())
        return False

    @staticmethod
    def _series(resp, key):
        """Return ``resp['records'][key]`` if it is a list, else ``[]``.

        NOTE(review): tolerates a non-dict ``records`` or a non-list series
        (e.g. ``false``/``null`` when no data is available) - confirm the
        exact shapes against the VRM API.
        """
        records = resp.get('records') if isinstance(resp, dict) else None
        if not isinstance(records, dict):
            return []
        series = records.get(key)
        return series if isinstance(series, list) else []

    @classmethod
    def _soc_points(cls, resp):
        """Convert a 'bs' stats response into ``[{"ts", "soc"}, ...]``.

        Points that are too short or carry a null timestamp/value are skipped.
        """
        points = []
        for pt in cls._series(resp, 'bs'):
            if not isinstance(pt, (list, tuple)) or len(pt) < 2:
                continue
            if pt[0] is None or pt[1] is None:
                continue
            points.append({"ts": int(pt[0]), "soc": round(pt[1], 1)})
        return points

    @classmethod
    def _consumption_points(cls, resp):
        """Convert a 'kwh' stats response into per-interval kWh entries.

        A missing or null value counts as 0; the third column is optional.
        """
        points = []
        for pt in cls._series(resp, 'kwh'):
            if not isinstance(pt, (list, tuple)) or len(pt) < 2 or pt[0] is None:
                continue
            gen = 0 if pt[1] is None else pt[1]
            grid = pt[2] if len(pt) > 2 and pt[2] is not None else 0
            points.append({
                "ts": int(pt[0]),
                "genKwh": round(gen, 2),
                "gridKwh": round(grid, 2),
                "totalKwh": round(gen + grid, 2),
            })
        return points

    @staticmethod
    def _days_in_range(resp, start, end):
        """Return ``(timestamp, kwh_dict)`` for HistoricData days in [start, end]."""
        days = []
        for day in resp.get('records', {}).get('data', []):
            ts = day.get('timestamp', 0)
            if start <= ts <= end:
                days.append((ts, day.get('kwh', {})))
        return days

    @classmethod
    def _generator_rows(cls, resp, start, end):
        """Build generator runtime rows from a HistoricData response."""
        return [{"ts": ts,
                 "hours": round(kwh.get('generator_runtime_hours', 0), 1),
                 "starts": int(kwh.get('generator_starts', 0))}
                for ts, kwh in cls._days_in_range(resp, start, end)]

    @classmethod
    def _battery_rows(cls, resp, start, end):
        """Build battery charge/discharge rows from a HistoricData response."""
        return [{"ts": ts,
                 "chargeKwh": round(kwh.get('battery_charge', 0), 2),
                 "dischargeKwh": round(kwh.get('battery_discharge', 0), 2)}
                for ts, kwh in cls._days_in_range(resp, start, end)]

    def _fetch_soc(self, params):
        """Fetch SOC samples (default: last 24 h) and publish /History/Soc."""
        start = params.get('start', int(time()) - 86400)
        end = params.get('end', int(time()))
        resp = self.vrm_client.get_stats('live_feed', ['bs'], start, end)
        data = json.dumps(self._soc_points(resp))
        GLib.idle_add(self._publish, '/History/Soc', data)

    def _fetch_consumption(self, params):
        """Fetch kWh stats (default: last 7 days, daily) and publish
        /History/Consumption."""
        start = params.get('start', int(time()) - 604800)
        end = params.get('end', int(time()))
        interval = params.get('interval', 'days')
        resp = self.vrm_client.get_stats('kwh', ['kwh'], start, end, interval)
        data = json.dumps(self._consumption_points(resp))
        GLib.idle_add(self._publish, '/History/Consumption', data)

    def _fetch_generator_runtime(self, params):
        """Publish generator runtime per day on /History/GeneratorRuntime.

        Best effort: any failure is logged and an empty list is published.
        """
        start = params.get('start', int(time()) - 604800)
        end = params.get('end', int(time()))
        try:
            resp = self.vrm_client.get_widget('HistoricData')
            result = self._generator_rows(resp, start, end)
        except Exception as e:
            self.logger.warning(f"Generator runtime history unavailable: {e}")
            result = []
        GLib.idle_add(self._publish, '/History/GeneratorRuntime', json.dumps(result))

    def _fetch_battery_cycles(self, params):
        """Publish battery charge/discharge per day on /History/BatteryCycles.

        Best effort: any failure is logged and an empty list is published.
        """
        start = params.get('start', int(time()) - 604800)
        end = params.get('end', int(time()))
        try:
            resp = self.vrm_client.get_widget('HistoricData')
            result = self._battery_rows(resp, start, end)
        except Exception as e:
            self.logger.warning(f"Battery cycle history unavailable: {e}")
            result = []
        GLib.idle_add(self._publish, '/History/BatteryCycles', json.dumps(result))

    def _publish(self, path, value):
        """Write ``value`` to ``path``. Returns False (one-shot idle)."""
        self.dbus_service[path] = value
        return False

    def _auto_refresh(self):
        """Timer callback: refresh all four history paths in the background.

        Always returns True so the GLib timer keeps firing; does nothing
        while the service is unconfigured or has no site ID.
        """
        if self.dbus_service['/Status'] == STATUS['UNCONFIGURED']:
            return True
        if not self.vrm_client or not self.vrm_client.site_id:
            return True
        self.logger.info("Auto-refresh triggered")
        now = int(time())
        jobs = [
            (self._fetch_soc, {'start': now - 86400, 'end': now}),
            (self._fetch_consumption, {'start': now - 604800, 'end': now,
                                       'interval': 'days'}),
            (self._fetch_generator_runtime, {'start': now - 604800, 'end': now}),
            (self._fetch_battery_cycles, {'start': now - 604800, 'end': now}),
        ]
        threading.Thread(target=self._run_fetch,
                         args=(jobs, "Auto-refresh error"),
                         daemon=True).start()
        return True
def main():
    """Start the VRM history service and run the GLib main loop until
    SIGTERM or SIGINT is received."""
    DBusGMainLoop(set_as_default=True)
    # Held for the lifetime of the loop so the service object stays alive.
    service = VrmHistoryService()
    log = logging.getLogger('VrmHistory')
    # Create the loop before installing the handlers: the handler closes over
    # `mainloop`, so it must already be bound when a signal can arrive.
    mainloop = GLib.MainLoop()

    def signal_handler(signum, frame):
        log.info("Received signal, exiting...")
        mainloop.quit()

    signal.signal(signal.SIGTERM, signal_handler)
    signal.signal(signal.SIGINT, signal_handler)
    log.info("Starting main loop")
    mainloop.run()


if __name__ == '__main__':
    main()