Files
venus/dbus-anchor-alarm/sensor_reader.py

213 lines
7.7 KiB
Python

"""
Read GPS, wind, heading, depth, speed, and course from Venus OS D-Bus services.
"""
import logging
import time
from collections import namedtuple
import dbus
logger = logging.getLogger('dbus-anchor-alarm.sensors')
BUS_ITEM = 'com.victronenergy.BusItem'
GPS_SERVICE = 'com.victronenergy.gps.raymarine_0'
METEO_SERVICE = 'com.victronenergy.meteo.raymarine_0'
NAVIGATION_SERVICE = 'com.victronenergy.navigation.raymarine_0'
MS_TO_KNOTS = 1.94384
METERS_TO_FEET = 3.28084
SensorSnapshot = namedtuple('SensorSnapshot', [
'latitude',
'longitude',
'speed',
'course',
'heading',
'depth',
'wind_speed',
'wind_direction',
'timestamp',
])
def _unwrap(v):
"""Convert D-Bus value types to Python native types."""
if v is None:
return None
if isinstance(v, (dbus.Int16, dbus.Int32, dbus.Int64,
dbus.UInt16, dbus.UInt32, dbus.UInt64, dbus.Byte)):
return int(v)
if isinstance(v, dbus.Double):
return float(v)
if isinstance(v, (dbus.String, dbus.Signature)):
return str(v)
if isinstance(v, dbus.Boolean):
return bool(v)
if isinstance(v, dbus.Array):
return [_unwrap(x) for x in v] if len(v) > 0 else None
if isinstance(v, (dbus.Dictionary, dict)):
return {k: _unwrap(x) for k, x in v.items()}
return v
class SensorReader:
"""Reads navigation sensor data from Venus OS D-Bus services using signal subscriptions."""
def __init__(self, bus):
self._bus = bus
self._gps_available = False
self._proxy_cache = {}
self._latitude = None
self._longitude = None
self._fix = None
self._speed_ms = None
self._course = None
self._heading = None
self._depth_m = None
self._wind_speed = None
self._wind_direction = None
self._last_update = time.time()
self._poll_initial_values()
self._setup_signal_subscriptions()
def _get_proxy(self, service_name, path):
"""Return a cached D-Bus proxy, creating it only once per (service, path)."""
key = (service_name, path)
proxy = self._proxy_cache.get(key)
if proxy is not None:
return proxy
obj = self._bus.get_object(service_name, path, introspect=False)
proxy = dbus.Interface(obj, BUS_ITEM)
self._proxy_cache[key] = proxy
return proxy
def _read_dbus_value(self, service_name, path):
"""Read a single value from D-Bus. Returns None on any failure."""
try:
proxy = self._get_proxy(service_name, path)
return _unwrap(proxy.GetValue())
except dbus.exceptions.DBusException as e:
self._proxy_cache.pop((service_name, path), None)
logger.debug('D-Bus read failed: %s %s -- %s', service_name, path, e)
return None
def _poll_initial_values(self):
"""Poll initial values once at startup."""
self._latitude = self._read_dbus_value(GPS_SERVICE, '/Position/Latitude')
self._longitude = self._read_dbus_value(GPS_SERVICE, '/Position/Longitude')
self._fix = self._read_dbus_value(GPS_SERVICE, '/Fix')
self._speed_ms = self._read_dbus_value(GPS_SERVICE, '/Speed')
self._course = self._read_dbus_value(GPS_SERVICE, '/Course')
self._heading = self._read_dbus_value(NAVIGATION_SERVICE, '/Heading')
self._depth_m = self._read_dbus_value(NAVIGATION_SERVICE, '/Depth')
self._wind_speed = self._read_dbus_value(METEO_SERVICE, '/WindSpeed')
self._wind_direction = self._read_dbus_value(METEO_SERVICE, '/WindDirection')
self._update_gps_available()
def _setup_signal_subscriptions(self):
"""Subscribe to all sensor value changes."""
subscriptions = [
(GPS_SERVICE, '/Position/Latitude', '_on_latitude_changed'),
(GPS_SERVICE, '/Position/Longitude', '_on_longitude_changed'),
(GPS_SERVICE, '/Fix', '_on_fix_changed'),
(GPS_SERVICE, '/Speed', '_on_speed_changed'),
(GPS_SERVICE, '/Course', '_on_course_changed'),
(NAVIGATION_SERVICE, '/Heading', '_on_heading_changed'),
(NAVIGATION_SERVICE, '/Depth', '_on_depth_changed'),
(METEO_SERVICE, '/WindSpeed', '_on_wind_speed_changed'),
(METEO_SERVICE, '/WindDirection', '_on_wind_direction_changed'),
]
for service, path, handler_name in subscriptions:
try:
self._bus.add_signal_receiver(
getattr(self, handler_name),
signal_name='PropertiesChanged',
dbus_interface='com.victronenergy.BusItem',
bus_name=service,
path=path
)
except dbus.exceptions.DBusException as e:
logger.debug(f'Failed to subscribe to {service}{path}: {e}')
def _on_latitude_changed(self, changes):
if 'Value' in changes:
self._latitude = _unwrap(changes['Value'])
self._last_update = time.time()
def _on_longitude_changed(self, changes):
if 'Value' in changes:
self._longitude = _unwrap(changes['Value'])
self._last_update = time.time()
def _on_fix_changed(self, changes):
if 'Value' in changes:
self._fix = _unwrap(changes['Value'])
self._update_gps_available()
def _on_speed_changed(self, changes):
if 'Value' in changes:
self._speed_ms = _unwrap(changes['Value'])
def _on_course_changed(self, changes):
if 'Value' in changes:
self._course = _unwrap(changes['Value'])
def _on_heading_changed(self, changes):
if 'Value' in changes:
self._heading = _unwrap(changes['Value'])
def _on_depth_changed(self, changes):
if 'Value' in changes:
self._depth_m = _unwrap(changes['Value'])
def _on_wind_speed_changed(self, changes):
if 'Value' in changes:
self._wind_speed = _unwrap(changes['Value'])
def _on_wind_direction_changed(self, changes):
if 'Value' in changes:
self._wind_direction = _unwrap(changes['Value'])
def _update_gps_available(self):
"""Update GPS availability status."""
self._gps_available = (
self._latitude is not None and
self._longitude is not None and
self._fix is not None and
int(self._fix) >= 1
)
@property
def connected(self):
"""True if GPS service is reachable and has a fix."""
return self._gps_available
def read(self):
"""Return a SensorSnapshot from cached signal values.
No D-Bus GetValue() calls - all values updated via signals.
"""
speed = float(self._speed_ms) * MS_TO_KNOTS if self._speed_ms is not None else None
course = float(self._course) if self._course is not None else None
heading = float(self._heading) if self._heading is not None else None
depth = float(self._depth_m) * METERS_TO_FEET if self._depth_m is not None else None
wind_speed = float(self._wind_speed) if self._wind_speed is not None else None
wind_direction = float(self._wind_direction) if self._wind_direction is not None else None
return SensorSnapshot(
latitude=float(self._latitude) if self._latitude is not None else None,
longitude=float(self._longitude) if self._longitude is not None else None,
speed=speed,
course=course,
heading=heading,
depth=depth,
wind_speed=wind_speed,
wind_direction=wind_direction,
timestamp=time.time(),
)