""" Read GPS, wind, heading, depth, speed, and course from Venus OS D-Bus services. """ import logging import time from collections import namedtuple import dbus logger = logging.getLogger('dbus-anchor-alarm.sensors') BUS_ITEM = 'com.victronenergy.BusItem' GPS_SERVICE = 'com.victronenergy.gps.raymarine_0' METEO_SERVICE = 'com.victronenergy.meteo.raymarine_0' NAVIGATION_SERVICE = 'com.victronenergy.navigation.raymarine_0' MS_TO_KNOTS = 1.94384 METERS_TO_FEET = 3.28084 SensorSnapshot = namedtuple('SensorSnapshot', [ 'latitude', 'longitude', 'speed', 'course', 'heading', 'depth', 'wind_speed', 'wind_direction', 'timestamp', ]) def _unwrap(v): """Convert D-Bus value types to Python native types.""" if v is None: return None if isinstance(v, (dbus.Int16, dbus.Int32, dbus.Int64, dbus.UInt16, dbus.UInt32, dbus.UInt64, dbus.Byte)): return int(v) if isinstance(v, dbus.Double): return float(v) if isinstance(v, (dbus.String, dbus.Signature)): return str(v) if isinstance(v, dbus.Boolean): return bool(v) if isinstance(v, dbus.Array): return [_unwrap(x) for x in v] if len(v) > 0 else None if isinstance(v, (dbus.Dictionary, dict)): return {k: _unwrap(x) for k, x in v.items()} return v class SensorReader: """Reads navigation sensor data from Venus OS D-Bus services.""" def __init__(self, bus): self._bus = bus self._gps_available = False self._proxy_cache = {} def _get_proxy(self, service_name, path): """Return a cached D-Bus proxy, creating it only once per (service, path).""" key = (service_name, path) proxy = self._proxy_cache.get(key) if proxy is not None: return proxy obj = self._bus.get_object(service_name, path, introspect=False) proxy = dbus.Interface(obj, BUS_ITEM) self._proxy_cache[key] = proxy return proxy def _read_dbus_value(self, service_name, path): """Read a single value from D-Bus. Returns None on any failure.""" try: proxy = self._get_proxy(service_name, path) return _unwrap(proxy.GetValue()) except dbus.exceptions.DBusException as e: self._proxy_cache.pop((service_name, path), None) logger.debug('D-Bus read failed: %s %s -- %s', service_name, path, e) return None @property def connected(self): """True if GPS service is reachable and has a fix.""" return self._gps_available def read(self): """Read all sensors and return a SensorSnapshot. Each field is None if the corresponding D-Bus read fails. Speed is converted from m/s to knots; depth from meters to feet. """ lat = self._read_dbus_value(GPS_SERVICE, '/Position/Latitude') lon = self._read_dbus_value(GPS_SERVICE, '/Position/Longitude') fix = self._read_dbus_value(GPS_SERVICE, '/Fix') self._gps_available = ( lat is not None and lon is not None and fix is not None and int(fix) >= 1 ) speed_ms = self._read_dbus_value(GPS_SERVICE, '/Speed') speed = float(speed_ms) * MS_TO_KNOTS if speed_ms is not None else None course = self._read_dbus_value(GPS_SERVICE, '/Course') if course is not None: course = float(course) heading = self._read_dbus_value(NAVIGATION_SERVICE, '/Heading') if heading is not None: heading = float(heading) depth_m = self._read_dbus_value(NAVIGATION_SERVICE, '/Depth') depth = float(depth_m) * METERS_TO_FEET if depth_m is not None else None wind_speed = self._read_dbus_value(METEO_SERVICE, '/WindSpeed') if wind_speed is not None: wind_speed = float(wind_speed) wind_direction = self._read_dbus_value(METEO_SERVICE, '/WindDirection') if wind_direction is not None: wind_direction = float(wind_direction) return SensorSnapshot( latitude=float(lat) if lat is not None else None, longitude=float(lon) if lon is not None else None, speed=speed, course=course, heading=heading, depth=depth, wind_speed=wind_speed, wind_direction=wind_direction, timestamp=time.time(), )