""" Shared signal-based D-Bus readers for Venus OS services. Provides GPS, meteo, and depth readers that use PropertiesChanged signal subscriptions instead of polling via GetValue(). Each reader discovers its service dynamically, seeds its cache with a one-shot GetValue, then relies on signals for all subsequent updates. Usage: from signal_reader import SignalGpsReader, SignalMeteoReader, SignalDepthReader bus = dbus.SystemBus() gps = SignalGpsReader(bus) pos = gps.get_position() # (lat, lon) or None -- reads from cache """ import logging import time import dbus logger = logging.getLogger(__name__) BUS_ITEM = 'com.victronenergy.BusItem' SYSTEM_SERVICE = 'com.victronenergy.system' def _unwrap(v): """Convert D-Bus value types to Python native types.""" if v is None: return None if isinstance(v, (dbus.Int16, dbus.Int32, dbus.Int64, dbus.UInt16, dbus.UInt32, dbus.UInt64, dbus.Byte)): return int(v) if isinstance(v, dbus.Double): return float(v) if isinstance(v, (dbus.String, dbus.Signature)): return str(v) if isinstance(v, dbus.Boolean): return bool(v) if isinstance(v, dbus.Array): return [_unwrap(x) for x in v] if len(v) > 0 else None if isinstance(v, (dbus.Dictionary, dict)): return {k: _unwrap(x) for k, x in v.items()} return v class _BaseSignalReader: """Base for D-Bus readers that bootstrap with GetValue then use signals.""" def __init__(self, bus): self._bus = bus self._proxy_cache = {} self._subscribed = False def _get_proxy(self, service, path): key = (service, path) proxy = self._proxy_cache.get(key) if proxy is not None: return proxy try: obj = self._bus.get_object(service, path, introspect=False) proxy = dbus.Interface(obj, BUS_ITEM) self._proxy_cache[key] = proxy return proxy except dbus.exceptions.DBusException: return None def _read_value(self, service, path): proxy = self._get_proxy(service, path) if proxy is None: return None try: return _unwrap(proxy.GetValue()) except dbus.exceptions.DBusException: self._proxy_cache.pop((service, path), None) return None def _subscribe(self, service, path, handler): try: self._bus.add_signal_receiver( handler, signal_name='PropertiesChanged', dbus_interface=BUS_ITEM, bus_name=service, path=path, ) except dbus.exceptions.DBusException as e: logger.debug('Failed to subscribe to %s%s: %s', service, path, e) def _list_service_names(self, prefix): """Return the first D-Bus service name matching *prefix*, or None.""" try: bus_obj = self._bus.get_object( 'org.freedesktop.DBus', '/org/freedesktop/DBus') iface = dbus.Interface(bus_obj, 'org.freedesktop.DBus') for name in iface.ListNames(): name_str = str(name) if name_str.startswith(prefix): return name_str except dbus.exceptions.DBusException: pass return None class SignalGpsReader(_BaseSignalReader): """Signal-driven GPS reader with dynamic service discovery. Resolves the GPS service via com.victronenergy.system /GpsService, bootstraps lat/lon/speed/fix with GetValue, then subscribes to PropertiesChanged for zero-cost steady-state reads. """ def __init__(self, bus): super().__init__(bus) self._service = None self._lat_path = None self._lon_path = None self._latitude = None self._longitude = None self._speed = None self._fix = None self._last_update = 0 self._discover_and_subscribe() def _discover_and_subscribe(self): try: proxy = self._get_proxy(SYSTEM_SERVICE, '/GpsService') if proxy: svc = _unwrap(proxy.GetValue()) if svc and isinstance(svc, str): self._service = svc self._detect_paths() self._bootstrap() self._setup_subscriptions() return except dbus.exceptions.DBusException: pass logger.debug('GPS service not yet available') def _detect_paths(self): """Determine whether GPS exposes /Position/Latitude or /Latitude.""" proxy = self._get_proxy(self._service, '/Position/Latitude') if proxy: try: proxy.GetValue() self._lat_path = '/Position/Latitude' self._lon_path = '/Position/Longitude' return except dbus.exceptions.DBusException: pass self._lat_path = '/Latitude' self._lon_path = '/Longitude' def _bootstrap(self): self._latitude = self._read_value(self._service, self._lat_path) self._longitude = self._read_value(self._service, self._lon_path) self._speed = self._read_value(self._service, '/Speed') self._fix = self._read_value(self._service, '/Fix') if self._latitude is not None: self._last_update = time.time() def _setup_subscriptions(self): if self._subscribed: return svc = self._service self._subscribe(svc, self._lat_path, self._on_lat) self._subscribe(svc, self._lon_path, self._on_lon) self._subscribe(svc, '/Speed', self._on_speed) self._subscribe(svc, '/Fix', self._on_fix) self._subscribed = True logger.debug('GPS signal subscriptions active on %s', svc) def _on_lat(self, changes): if 'Value' in changes: self._latitude = _unwrap(changes['Value']) self._last_update = time.time() def _on_lon(self, changes): if 'Value' in changes: self._longitude = _unwrap(changes['Value']) self._last_update = time.time() def _on_speed(self, changes): if 'Value' in changes: self._speed = _unwrap(changes['Value']) def _on_fix(self, changes): if 'Value' in changes: self._fix = _unwrap(changes['Value']) def _ensure_service(self): if self._service: return True self._discover_and_subscribe() return self._service is not None def get_position(self): """Return (lat, lon) or None.""" if not self._ensure_service(): return None lat, lon = self._latitude, self._longitude if lat is None or lon is None: return None try: lat_f, lon_f = float(lat), float(lon) except (TypeError, ValueError): return None if -90 <= lat_f <= 90 and -180 <= lon_f <= 180: return (lat_f, lon_f) return None def get_speed(self): """Return speed in m/s, or None.""" if not self._ensure_service(): return None if self._speed is not None: try: return float(self._speed) except (TypeError, ValueError): pass return None @property def last_update(self): return self._last_update class SignalMeteoReader(_BaseSignalReader): """Signal-driven meteo reader with dynamic service discovery. Discovers the first com.victronenergy.meteo.* service via ListNames, bootstraps wind/temp/pressure with GetValue, then subscribes to PropertiesChanged. """ PATHS = ('/WindDirection', '/WindSpeed', '/ExternalTemperature', '/Pressure') def __init__(self, bus): super().__init__(bus) self._service = None self._values = {p: None for p in self.PATHS} self._discover_and_subscribe() def _discover_and_subscribe(self): svc = self._list_service_names('com.victronenergy.meteo.') if svc: self._service = svc self._bootstrap() self._setup_subscriptions() else: logger.debug('Meteo service not yet available') def _bootstrap(self): for path in self.PATHS: self._values[path] = self._read_value(self._service, path) def _setup_subscriptions(self): if self._subscribed: return for path in self.PATHS: self._subscribe( self._service, path, lambda changes, p=path: self._on_changed(p, changes)) self._subscribed = True logger.debug('Meteo signal subscriptions active on %s', self._service) def _on_changed(self, path, changes): if 'Value' in changes: self._values[path] = _unwrap(changes['Value']) def _ensure_service(self): if self._service: return True self._discover_and_subscribe() return self._service is not None def _get(self, path): if not self._ensure_service(): return None return self._values.get(path) def get_wind_direction(self): """Wind direction in degrees (0-360).""" return self._get('/WindDirection') def get_wind_speed(self): """Wind speed in m/s.""" return self._get('/WindSpeed') def get_temperature(self): """Air temperature in Celsius.""" return self._get('/ExternalTemperature') def get_pressure(self): """Barometric pressure in hPa (mbar).""" return self._get('/Pressure') class SignalDepthReader(_BaseSignalReader): """Signal-driven depth reader with dynamic service discovery. Discovers the first com.victronenergy.navigation.* service via ListNames, bootstraps depth with GetValue, then subscribes to PropertiesChanged. """ def __init__(self, bus): super().__init__(bus) self._service = None self._depth = None self._discover_and_subscribe() def _discover_and_subscribe(self): svc = self._list_service_names('com.victronenergy.navigation.') if svc: self._service = svc self._bootstrap() self._setup_subscriptions() else: logger.debug('Navigation service not yet available') def _bootstrap(self): self._depth = self._read_value(self._service, '/Depth') def _setup_subscriptions(self): if self._subscribed: return self._subscribe(self._service, '/Depth', self._on_depth) self._subscribed = True logger.debug('Depth signal subscription active on %s', self._service) def _on_depth(self, changes): if 'Value' in changes: self._depth = _unwrap(changes['Value']) def _ensure_service(self): if self._service: return True self._discover_and_subscribe() return self._service is not None def get_depth(self): """Return depth in meters, or None.""" if not self._ensure_service(): return None if self._depth is not None: try: d = float(self._depth) if d > 0: return d except (TypeError, ValueError): pass return None