#!/usr/bin/env python3 """ Venus OS VRM History Service Proxies VRM API historical data requests through D-Bus/MQTT. Data is requested by the frontend via MQTT W/ topics, fetched from the VRM cloud API, and published back as JSON strings on N/ topics. Service name: com.victronenergy.vrmhistory MQTT paths: N/{portalId}/vrmhistory/0/... """ import sys import os import json import logging import signal import threading from time import time, sleep sys.path.insert(1, os.path.join(os.path.dirname(__file__), 'ext', 'velib_python')) sys.path.insert(1, '/opt/victronenergy/velib_python') try: from gi.repository import GLib except ImportError: print("ERROR: GLib not available. This script must run on Venus OS.") sys.exit(1) try: import dbus from dbus.mainloop.glib import DBusGMainLoop from vedbus import VeDbusService from settingsdevice import SettingsDevice except ImportError as e: print(f"ERROR: Required module not available: {e}") sys.exit(1) from config import DBUS_CONFIG, VRM_CONFIG, LOGGING_CONFIG, STATUS from vrm_client import VrmClient VERSION = '1.0.0' SERVICE_NAME = DBUS_CONFIG['service_name'] class VrmHistoryService: def __init__(self): self._setup_logging() self.logger = logging.getLogger('VrmHistory') self.logger.info(f"Initializing VRM History Service v{VERSION}") self.bus = dbus.SystemBus() self.vrm_client = None self.portal_id = None self._fetch_lock = threading.Lock() self._read_portal_id() self._create_dbus_service() self._setup_settings() self._init_client() refresh_ms = VRM_CONFIG['refresh_interval_minutes'] * 60 * 1000 GLib.timeout_add(refresh_ms, self._auto_refresh) self.logger.info("VRM History Service initialized") def _setup_logging(self): level = getattr(logging, LOGGING_CONFIG['level'], logging.INFO) fmt = '%(levelname)s %(name)s: %(message)s' if LOGGING_CONFIG['include_timestamp']: fmt = '%(asctime)s ' + fmt logging.basicConfig(level=level, format=fmt, stream=sys.stdout) def _read_portal_id(self): try: with open('/data/venus/unique-id', 'r') as f: self.portal_id = f.read().strip() self.logger.info(f"Portal ID: {self.portal_id}") except FileNotFoundError: try: bus = dbus.SystemBus() obj = bus.get_object('com.victronenergy.platform', '/Vrmportalid') self.portal_id = str(obj.GetValue()) self.logger.info(f"Portal ID from dbus: {self.portal_id}") except Exception as e: self.logger.warning(f"Could not read portal ID: {e}") self.portal_id = None def _create_dbus_service(self): self.logger.info(f"Creating D-Bus service: {SERVICE_NAME}") max_retries = 5 retry_delay = 1.0 for attempt in range(max_retries): try: self.dbus_service = VeDbusService(SERVICE_NAME, self.bus, register=False) break except dbus.exceptions.NameExistsException: if attempt < max_retries - 1: self.logger.warning(f"D-Bus name exists, retrying ({attempt + 1}/{max_retries})") sleep(retry_delay) retry_delay *= 2 else: raise svc = self.dbus_service svc.add_path('/Mgmt/ProcessName', 'dbus-vrm-history') svc.add_path('/Mgmt/ProcessVersion', VERSION) svc.add_path('/Mgmt/Connection', 'local') svc.add_path('/DeviceInstance', 0) svc.add_path('/ProductId', 0xFFFE) svc.add_path('/ProductName', 'VRM History Proxy') svc.add_path('/FirmwareVersion', VERSION) svc.add_path('/Connected', 1) svc.add_path('/Status', STATUS['UNCONFIGURED']) svc.add_path('/Error', '') svc.add_path('/LastFetchTime', 0) svc.add_path('/Config/Token', '', writeable=True, onchangecallback=self._on_token_changed) svc.add_path('/Config/SiteId', 0, writeable=True, onchangecallback=self._on_setting_changed) svc.add_path('/Config/RefreshInterval', VRM_CONFIG['refresh_interval_minutes'], writeable=True, onchangecallback=self._on_setting_changed) for req_path in ['/Request/Soc', '/Request/Consumption', '/Request/GeneratorRuntime', '/Request/BatteryCycles']: svc.add_path(req_path, '', writeable=True, onchangecallback=self._on_request) svc.add_path('/History/Soc', '') svc.add_path('/History/Consumption', '') svc.add_path('/History/GeneratorRuntime', '') svc.add_path('/History/BatteryCycles', '') # Register service after all paths added (Venus OS best practice) svc.register() def _setup_settings(self): try: base = DBUS_CONFIG['settings_path'] settings_def = { 'Token': [base + '/Token', '', 0, 0], 'SiteId': [base + '/SiteId', 0, 0, 999999999], 'RefreshInterval': [base + '/RefreshInterval', VRM_CONFIG['refresh_interval_minutes'], 1, 1440], } self.settings = SettingsDevice(self.bus, settings_def, self._on_persistent_setting_changed) token = self.settings['Token'] site_id = self.settings['SiteId'] refresh = self.settings['RefreshInterval'] if token: self.dbus_service['/Config/Token'] = token if site_id: self.dbus_service['/Config/SiteId'] = site_id if refresh: self.dbus_service['/Config/RefreshInterval'] = refresh self.logger.info("Persistent settings loaded") except Exception as e: self.logger.warning(f"Could not init persistent settings: {e}") self.settings = None def _on_persistent_setting_changed(self, name, old, new): self.logger.info(f"Setting changed: {name} = {new}") def _init_client(self): token = self.dbus_service['/Config/Token'] site_id = self.dbus_service['/Config/SiteId'] if not token: self.dbus_service['/Status'] = STATUS['UNCONFIGURED'] return self.vrm_client = VrmClient(token, site_id if site_id else None) if not site_id and self.portal_id: self._discover_site_id() elif site_id: self.vrm_client.site_id = site_id self.dbus_service['/Status'] = STATUS['READY'] else: self.dbus_service['/Status'] = STATUS['ERROR'] self.dbus_service['/Error'] = 'No portal ID available for auto-discovery' def _discover_site_id(self): def _do_discover(): try: GLib.idle_add(lambda: self._set_status(STATUS['DISCOVERING'])) site_id = self.vrm_client.discover_site_id(self.portal_id) self.vrm_client.site_id = site_id def _apply(sid): self.dbus_service['/Config/SiteId'] = sid if self.settings: self.settings['SiteId'] = sid self._set_status(STATUS['READY']) self.logger.info(f"Auto-discovered site ID: {sid}") return False GLib.idle_add(_apply, site_id) except Exception as e: self.logger.error(f"Site ID discovery failed: {e}") GLib.idle_add(lambda: self._set_error(str(e))) t = threading.Thread(target=_do_discover, daemon=True) t.start() def _set_status(self, status): self.dbus_service['/Status'] = status if status != STATUS['ERROR']: self.dbus_service['/Error'] = '' return False def _set_error(self, msg): self.dbus_service['/Status'] = STATUS['ERROR'] self.dbus_service['/Error'] = msg return False def _on_token_changed(self, path, value): self.logger.info("Token updated") if self.settings: self.settings['Token'] = value self.vrm_client = VrmClient(value) site_id = self.dbus_service['/Config/SiteId'] if site_id: self.vrm_client.site_id = site_id self.dbus_service['/Status'] = STATUS['READY'] elif self.portal_id: self._discover_site_id() return True def _on_setting_changed(self, path, value): if path == '/Config/SiteId' and self.vrm_client: self.vrm_client.site_id = value if self.settings: self.settings['SiteId'] = value if value: self.dbus_service['/Status'] = STATUS['READY'] elif path == '/Config/RefreshInterval': if self.settings: self.settings['RefreshInterval'] = value return True def _on_request(self, path, value): if not value: return True request_map = { '/Request/Soc': self._fetch_soc, '/Request/Consumption': self._fetch_consumption, '/Request/GeneratorRuntime': self._fetch_generator_runtime, '/Request/BatteryCycles': self._fetch_battery_cycles, } handler = request_map.get(path) if not handler: return True try: params = json.loads(value) if isinstance(value, str) else value except (json.JSONDecodeError, TypeError): self.logger.error(f"Invalid request JSON on {path}: {value}") return True def _bg(): with self._fetch_lock: try: GLib.idle_add(lambda: self._set_status(STATUS['FETCHING'])) handler(params) GLib.idle_add(self._fetch_complete) except Exception as e: self.logger.error(f"Fetch error for {path}: {e}") GLib.idle_add(lambda: self._set_error(str(e))) t = threading.Thread(target=_bg, daemon=True) t.start() return True def _fetch_complete(self): self.dbus_service['/Status'] = STATUS['READY'] self.dbus_service['/LastFetchTime'] = int(time()) return False def _fetch_soc(self, params): start = params.get('start', int(time()) - 86400) end = params.get('end', int(time())) resp = self.vrm_client.get_stats('live_feed', ['bs'], start, end) records = resp.get('records', {}) bs_data = records.get('bs', []) result = [{"ts": int(pt[0]), "soc": round(pt[1], 1)} for pt in bs_data if len(pt) >= 2] data = json.dumps(result) GLib.idle_add(lambda: self._publish('/History/Soc', data)) def _fetch_consumption(self, params): start = params.get('start', int(time()) - 604800) end = params.get('end', int(time())) interval = params.get('interval', 'days') resp = self.vrm_client.get_stats('kwh', ['kwh'], start, end, interval) records = resp.get('records', {}) kwh_data = records.get('kwh', []) result = [] for pt in kwh_data: if len(pt) < 2: continue entry = { "ts": int(pt[0]), "genKwh": round(pt[1] if len(pt) > 1 else 0, 2), "gridKwh": round(pt[2] if len(pt) > 2 else 0, 2), "totalKwh": round((pt[1] if len(pt) > 1 else 0) + (pt[2] if len(pt) > 2 else 0), 2), } result.append(entry) data = json.dumps(result) GLib.idle_add(lambda: self._publish('/History/Consumption', data)) def _fetch_generator_runtime(self, params): start = params.get('start', int(time()) - 604800) end = params.get('end', int(time())) try: resp = self.vrm_client.get_widget('HistoricData') hist = resp.get('records', {}).get('data', []) result = [] for day in hist: ts = day.get('timestamp', 0) if ts < start or ts > end: continue hours = day.get('kwh', {}).get('generator_runtime_hours', 0) starts = day.get('kwh', {}).get('generator_starts', 0) result.append({"ts": ts, "hours": round(hours, 1), "starts": int(starts)}) except Exception: result = [] data = json.dumps(result) GLib.idle_add(lambda: self._publish('/History/GeneratorRuntime', data)) def _fetch_battery_cycles(self, params): start = params.get('start', int(time()) - 604800) end = params.get('end', int(time())) try: resp = self.vrm_client.get_widget('HistoricData') hist = resp.get('records', {}).get('data', []) result = [] for day in hist: ts = day.get('timestamp', 0) if ts < start or ts > end: continue charge = day.get('kwh', {}).get('battery_charge', 0) discharge = day.get('kwh', {}).get('battery_discharge', 0) result.append({"ts": ts, "chargeKwh": round(charge, 2), "dischargeKwh": round(discharge, 2)}) except Exception: result = [] data = json.dumps(result) GLib.idle_add(lambda: self._publish('/History/BatteryCycles', data)) def _publish(self, path, value): self.dbus_service[path] = value return False def _auto_refresh(self): if self.dbus_service['/Status'] == STATUS['UNCONFIGURED']: return True if not self.vrm_client or not self.vrm_client.site_id: return True self.logger.info("Auto-refresh triggered") now = int(time()) def _bg(): with self._fetch_lock: try: GLib.idle_add(lambda: self._set_status(STATUS['FETCHING'])) self._fetch_soc({'start': now - 86400, 'end': now}) self._fetch_consumption({'start': now - 604800, 'end': now, 'interval': 'days'}) self._fetch_generator_runtime({'start': now - 604800, 'end': now}) self._fetch_battery_cycles({'start': now - 604800, 'end': now}) GLib.idle_add(self._fetch_complete) except Exception as e: self.logger.error(f"Auto-refresh error: {e}") GLib.idle_add(lambda: self._set_error(str(e))) t = threading.Thread(target=_bg, daemon=True) t.start() return True def main(): DBusGMainLoop(set_as_default=True) service = VrmHistoryService() def signal_handler(signum, frame): logging.getLogger('VrmHistory').info("Received signal, exiting...") mainloop.quit() signal.signal(signal.SIGTERM, signal_handler) signal.signal(signal.SIGINT, signal_handler) mainloop = GLib.MainLoop() logging.getLogger('VrmHistory').info("Starting main loop") mainloop.run() if __name__ == '__main__': main()