213 lines
7.7 KiB
Python
213 lines
7.7 KiB
Python
"""
|
|
Read GPS, wind, heading, depth, speed, and course from Venus OS D-Bus services.
|
|
"""
|
|
|
|
import logging
|
|
import time
|
|
from collections import namedtuple
|
|
|
|
import dbus
|
|
|
|
logger = logging.getLogger('dbus-anchor-alarm.sensors')
|
|
|
|
BUS_ITEM = 'com.victronenergy.BusItem'
|
|
|
|
GPS_SERVICE = 'com.victronenergy.gps.raymarine_0'
|
|
METEO_SERVICE = 'com.victronenergy.meteo.raymarine_0'
|
|
NAVIGATION_SERVICE = 'com.victronenergy.navigation.raymarine_0'
|
|
|
|
MS_TO_KNOTS = 1.94384
|
|
METERS_TO_FEET = 3.28084
|
|
|
|
SensorSnapshot = namedtuple('SensorSnapshot', [
|
|
'latitude',
|
|
'longitude',
|
|
'speed',
|
|
'course',
|
|
'heading',
|
|
'depth',
|
|
'wind_speed',
|
|
'wind_direction',
|
|
'timestamp',
|
|
])
|
|
|
|
|
|
def _unwrap(v):
|
|
"""Convert D-Bus value types to Python native types."""
|
|
if v is None:
|
|
return None
|
|
if isinstance(v, (dbus.Int16, dbus.Int32, dbus.Int64,
|
|
dbus.UInt16, dbus.UInt32, dbus.UInt64, dbus.Byte)):
|
|
return int(v)
|
|
if isinstance(v, dbus.Double):
|
|
return float(v)
|
|
if isinstance(v, (dbus.String, dbus.Signature)):
|
|
return str(v)
|
|
if isinstance(v, dbus.Boolean):
|
|
return bool(v)
|
|
if isinstance(v, dbus.Array):
|
|
return [_unwrap(x) for x in v] if len(v) > 0 else None
|
|
if isinstance(v, (dbus.Dictionary, dict)):
|
|
return {k: _unwrap(x) for k, x in v.items()}
|
|
return v
|
|
|
|
|
|
class SensorReader:
|
|
"""Reads navigation sensor data from Venus OS D-Bus services using signal subscriptions."""
|
|
|
|
def __init__(self, bus):
|
|
self._bus = bus
|
|
self._gps_available = False
|
|
self._proxy_cache = {}
|
|
|
|
self._latitude = None
|
|
self._longitude = None
|
|
self._fix = None
|
|
self._speed_ms = None
|
|
self._course = None
|
|
self._heading = None
|
|
self._depth_m = None
|
|
self._wind_speed = None
|
|
self._wind_direction = None
|
|
self._last_update = time.time()
|
|
|
|
self._poll_initial_values()
|
|
self._setup_signal_subscriptions()
|
|
|
|
def _get_proxy(self, service_name, path):
|
|
"""Return a cached D-Bus proxy, creating it only once per (service, path)."""
|
|
key = (service_name, path)
|
|
proxy = self._proxy_cache.get(key)
|
|
if proxy is not None:
|
|
return proxy
|
|
obj = self._bus.get_object(service_name, path, introspect=False)
|
|
proxy = dbus.Interface(obj, BUS_ITEM)
|
|
self._proxy_cache[key] = proxy
|
|
return proxy
|
|
|
|
def _read_dbus_value(self, service_name, path):
|
|
"""Read a single value from D-Bus. Returns None on any failure."""
|
|
try:
|
|
proxy = self._get_proxy(service_name, path)
|
|
return _unwrap(proxy.GetValue())
|
|
except dbus.exceptions.DBusException as e:
|
|
self._proxy_cache.pop((service_name, path), None)
|
|
logger.debug('D-Bus read failed: %s %s -- %s', service_name, path, e)
|
|
return None
|
|
|
|
def _poll_initial_values(self):
|
|
"""Poll initial values once at startup."""
|
|
self._latitude = self._read_dbus_value(GPS_SERVICE, '/Position/Latitude')
|
|
self._longitude = self._read_dbus_value(GPS_SERVICE, '/Position/Longitude')
|
|
self._fix = self._read_dbus_value(GPS_SERVICE, '/Fix')
|
|
self._speed_ms = self._read_dbus_value(GPS_SERVICE, '/Speed')
|
|
self._course = self._read_dbus_value(GPS_SERVICE, '/Course')
|
|
self._heading = self._read_dbus_value(NAVIGATION_SERVICE, '/Heading')
|
|
self._depth_m = self._read_dbus_value(NAVIGATION_SERVICE, '/Depth')
|
|
self._wind_speed = self._read_dbus_value(METEO_SERVICE, '/WindSpeed')
|
|
self._wind_direction = self._read_dbus_value(METEO_SERVICE, '/WindDirection')
|
|
self._update_gps_available()
|
|
|
|
def _setup_signal_subscriptions(self):
|
|
"""Subscribe to all sensor value changes."""
|
|
subscriptions = [
|
|
(GPS_SERVICE, '/Position/Latitude', '_on_latitude_changed'),
|
|
(GPS_SERVICE, '/Position/Longitude', '_on_longitude_changed'),
|
|
(GPS_SERVICE, '/Fix', '_on_fix_changed'),
|
|
(GPS_SERVICE, '/Speed', '_on_speed_changed'),
|
|
(GPS_SERVICE, '/Course', '_on_course_changed'),
|
|
(NAVIGATION_SERVICE, '/Heading', '_on_heading_changed'),
|
|
(NAVIGATION_SERVICE, '/Depth', '_on_depth_changed'),
|
|
(METEO_SERVICE, '/WindSpeed', '_on_wind_speed_changed'),
|
|
(METEO_SERVICE, '/WindDirection', '_on_wind_direction_changed'),
|
|
]
|
|
|
|
for service, path, handler_name in subscriptions:
|
|
try:
|
|
self._bus.add_signal_receiver(
|
|
getattr(self, handler_name),
|
|
signal_name='PropertiesChanged',
|
|
dbus_interface='com.victronenergy.BusItem',
|
|
bus_name=service,
|
|
path=path
|
|
)
|
|
except dbus.exceptions.DBusException as e:
|
|
logger.debug(f'Failed to subscribe to {service}{path}: {e}')
|
|
|
|
def _on_latitude_changed(self, changes):
|
|
if 'Value' in changes:
|
|
self._latitude = _unwrap(changes['Value'])
|
|
self._last_update = time.time()
|
|
|
|
def _on_longitude_changed(self, changes):
|
|
if 'Value' in changes:
|
|
self._longitude = _unwrap(changes['Value'])
|
|
self._last_update = time.time()
|
|
|
|
def _on_fix_changed(self, changes):
|
|
if 'Value' in changes:
|
|
self._fix = _unwrap(changes['Value'])
|
|
self._update_gps_available()
|
|
|
|
def _on_speed_changed(self, changes):
|
|
if 'Value' in changes:
|
|
self._speed_ms = _unwrap(changes['Value'])
|
|
|
|
def _on_course_changed(self, changes):
|
|
if 'Value' in changes:
|
|
self._course = _unwrap(changes['Value'])
|
|
|
|
def _on_heading_changed(self, changes):
|
|
if 'Value' in changes:
|
|
self._heading = _unwrap(changes['Value'])
|
|
|
|
def _on_depth_changed(self, changes):
|
|
if 'Value' in changes:
|
|
self._depth_m = _unwrap(changes['Value'])
|
|
|
|
def _on_wind_speed_changed(self, changes):
|
|
if 'Value' in changes:
|
|
self._wind_speed = _unwrap(changes['Value'])
|
|
|
|
def _on_wind_direction_changed(self, changes):
|
|
if 'Value' in changes:
|
|
self._wind_direction = _unwrap(changes['Value'])
|
|
|
|
def _update_gps_available(self):
|
|
"""Update GPS availability status."""
|
|
self._gps_available = (
|
|
self._latitude is not None and
|
|
self._longitude is not None and
|
|
self._fix is not None and
|
|
int(self._fix) >= 1
|
|
)
|
|
|
|
@property
|
|
def connected(self):
|
|
"""True if GPS service is reachable and has a fix."""
|
|
return self._gps_available
|
|
|
|
def read(self):
|
|
"""Return a SensorSnapshot from cached signal values.
|
|
|
|
No D-Bus GetValue() calls - all values updated via signals.
|
|
"""
|
|
speed = float(self._speed_ms) * MS_TO_KNOTS if self._speed_ms is not None else None
|
|
course = float(self._course) if self._course is not None else None
|
|
heading = float(self._heading) if self._heading is not None else None
|
|
depth = float(self._depth_m) * METERS_TO_FEET if self._depth_m is not None else None
|
|
wind_speed = float(self._wind_speed) if self._wind_speed is not None else None
|
|
wind_direction = float(self._wind_direction) if self._wind_direction is not None else None
|
|
|
|
return SensorSnapshot(
|
|
latitude=float(self._latitude) if self._latitude is not None else None,
|
|
longitude=float(self._longitude) if self._longitude is not None else None,
|
|
speed=speed,
|
|
course=course,
|
|
heading=heading,
|
|
depth=depth,
|
|
wind_speed=wind_speed,
|
|
wind_direction=wind_direction,
|
|
timestamp=time.time(),
|
|
)
|