Files
venus/dbus-windy-station/windy_station.py
Paul G 36a07dacb9 Extract shared signal-based D-Bus readers into lib/signal_reader.py
- Added lib/signal_reader.py with SignalGpsReader, SignalMeteoReader, and
  SignalDepthReader that use PropertiesChanged signal subscriptions instead
  of polling via GetValue(), reducing D-Bus overhead at steady state.
- Each reader discovers its service dynamically, seeds its cache with a
  one-shot GetValue, then relies on signals for all subsequent updates.
- Refactored dbus-tides, dbus-windy-station, dbus-no-foreign-land,
  dbus-lightning, and dbus-meteoblue-forecast to import from the shared
  library, removing ~600 lines of duplicated _unwrap() helpers and
  per-service GPS/meteo/depth reader classes.
- Updated install.sh for all five services to deploy signal_reader.py
  to /data/lib/ on the target device.
- Updated build-package.sh for all five services to bundle
  signal_reader.py into the .tar.gz package.
- Updated README.md with the new lib/ entry in the project table and
  documented the shared D-Bus readers pattern.
- Bumped version numbers in affected services (e.g. nfl_tracking 2.0.1).

Made-with: Cursor
2026-03-27 01:03:16 +00:00

906 lines
37 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Windy Station for Venus OS
Reads weather and GPS data from Venus OS D-Bus (published by Raymarine meteo
service) and uploads observations to Windy.com's weather station network.
Two types of API calls (matching the proven axiom-nmea windy_station example):
1. Station update (PUT /api/v2/pws/{id}): Updates GPS location and metadata
- On first GPS fix, every 60 minutes, or if GPS moves > 200 feet
2. Observation update (GET /api/v2/observation/update): Uploads weather data
- Every 5 minutes (configurable, min 300s)
Wind speed is reported as a 5-minute running average.
Wind gust is the peak 3-consecutive-sample average over a 10-minute window.
Settings are configurable via the Venus OS GUI (Settings -> Windy Station)
and persist across restarts via Venus localsettings.
"""
import json
import logging
import math
import os
import signal
import sys
import time
from collections import deque
from urllib.error import HTTPError, URLError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
sys.path.insert(1, os.path.join(os.path.dirname(__file__), 'ext', 'velib_python'))
sys.path.insert(1, '/opt/victronenergy/velib_python')
sys.path.insert(1, os.path.join(os.path.dirname(__file__), '..', 'lib'))
sys.path.insert(1, '/data/lib')
try:
from gi.repository import GLib
except ImportError:
print("ERROR: GLib not available. This script must run on Venus OS.")
sys.exit(1)
try:
import dbus
from dbus.mainloop.glib import DBusGMainLoop
from vedbus import VeDbusService
from settingsdevice import SettingsDevice
except ImportError as e:
print(f"ERROR: Required module not available: {e}")
print("This script must run on Venus OS.")
sys.exit(1)
from config import (
SERVICE_NAME, API_BASE, CONFIG_FILE,
OBSERVATION_MIN_INTERVAL, STATION_UPDATE_INTERVAL,
GPS_MOVEMENT_THRESHOLD_METERS,
WIND_AVG_WINDOW, GUST_WINDOW, WIND_DIR_HISTORY_WINDOW,
WIND_TREND_WINDOW,
LOGGING_CONFIG,
)
from signal_reader import SignalGpsReader, SignalMeteoReader
VERSION = '1.0.1'
def haversine_distance(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two GPS coordinates.

    Args:
        lat1, lon1: First point, in decimal degrees.
        lat2, lon2: Second point, in decimal degrees.

    Returns:
        Distance in meters along a sphere of radius 6,371,000 m.
    """
    R = 6371000  # sphere radius used for the calculation, in meters
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2 +
         math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
    # Rounding can leave `a` a few ULPs outside [0, 1] for (near-)antipodal
    # points; without the clamp math.sqrt(1 - a) raises ValueError.
    a = min(1.0, max(0.0, a))
    return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
# ---------------------------------------------------------------------------
# Wind calculations
# ---------------------------------------------------------------------------
class WindSpeedAverager:
    """Running mean of wind speed over a sliding time window.

    Samples are stored as ``(timestamp, speed)`` tuples in arrival order and
    dropped once they are older than ``window_seconds``.
    """

    def __init__(self, window_seconds=300.0):
        self.window_seconds = window_seconds
        self.samples = deque()

    def add_sample(self, wind_speed_ms):
        """Record one wind speed reading (m/s) stamped with the current time."""
        stamp = time.time()
        self.samples.append((stamp, wind_speed_ms))
        self._prune(stamp)

    def _prune(self, now):
        """Discard samples whose timestamp precedes ``now - window_seconds``."""
        oldest_allowed = now - self.window_seconds
        buf = self.samples
        while buf:
            if buf[0][0] >= oldest_allowed:
                break
            buf.popleft()

    def get_average(self):
        """Return average wind speed in m/s, or None if no samples."""
        self._prune(time.time())
        count = len(self.samples)
        if count == 0:
            return None
        return sum(speed for _, speed in self.samples) / count
class WindGustTracker:
    """Gust estimator: the highest mean of three consecutive samples.

    Samples are ``(timestamp, speed)`` tuples kept for ``window_seconds``.
    """

    def __init__(self, window_seconds=600.0):
        self.window_seconds = window_seconds
        self.samples = deque()

    def add_sample(self, wind_speed_ms):
        """Record one wind speed reading (m/s) stamped with the current time."""
        stamp = time.time()
        self.samples.append((stamp, wind_speed_ms))
        self._prune(stamp)

    def _prune(self, now):
        """Discard samples whose timestamp precedes ``now - window_seconds``."""
        oldest_allowed = now - self.window_seconds
        buf = self.samples
        while buf:
            if buf[0][0] >= oldest_allowed:
                break
            buf.popleft()

    def get_gust(self):
        """Peak 3-consecutive-sample average in m/s, or None.

        None is returned when fewer than three samples are available or when
        no triple averages above zero.
        """
        self._prune(time.time())
        if len(self.samples) < 3:
            return None
        speeds = [speed for _, speed in self.samples]
        triple_means = [
            (first + second + third) / 3.0
            for first, second, third in zip(speeds, speeds[1:], speeds[2:])
        ]
        # Seeding with 0.0 keeps the floor the original loop started from.
        peak = max([0.0, *triple_means])
        if peak > 0:
            return peak
        return None
class WindDirHistory:
    """Time-windowed log of wind direction (and optional speed) readings.

    The serialized form is published for the UI trend visualization.
    """

    def __init__(self, window_seconds=1800.0):
        self.window_seconds = window_seconds
        self.samples = deque()

    def add_sample(self, direction_deg, speed_ms=None):
        """Record a direction reading, optionally paired with a speed."""
        stamp = time.time()
        self.samples.append((stamp, direction_deg, speed_ms))
        self._prune(stamp)

    def _prune(self, now):
        """Discard samples whose timestamp precedes ``now - window_seconds``."""
        oldest_allowed = now - self.window_seconds
        buf = self.samples
        while buf:
            if buf[0][0] >= oldest_allowed:
                break
            buf.popleft()

    def to_json(self):
        """Serialize to JSON array of {time, dir, spd} with millisecond timestamps.

        ``spd`` is omitted from a record when no speed was stored for it.
        """
        self._prune(time.time())
        records = []
        for stamp, heading, *extra in self.samples:
            speed = extra[0] if extra else None
            record = {"time": int(stamp * 1000), "dir": round(heading, 1)}
            if speed is not None:
                record["spd"] = round(speed, 2)
            records.append(record)
        return json.dumps(records)
class WindTrendTracker:
    """Sector-based wind trend: aggregates speed by directional sector over a
    rolling window so the UI can render a heatmap ring showing wind shifts."""

    SECTOR_COUNT = 120  # 3° per sector

    def __init__(self, window_seconds=3600.0):
        self.window_seconds = window_seconds
        self.sector_size = 360.0 / self.SECTOR_COUNT
        self.samples = deque()

    def add_sample(self, direction_deg, speed_ms):
        """Record a reading, bucketing the direction into its sector index.

        Args:
            direction_deg: Wind direction in degrees. Values outside
                [0, 360) wrap around, so -1° falls into the last sector.
            speed_ms: Wind speed in m/s, or None.
        """
        now = time.time()
        # floor (not int) so negative bearings wrap to the preceding sector
        # instead of being truncated toward sector 0.
        sector = (math.floor(direction_deg / self.sector_size)
                  % self.SECTOR_COUNT)
        self.samples.append((now, sector, speed_ms))
        self._prune(now)

    def _prune(self, now):
        """Discard samples whose timestamp precedes ``now - window_seconds``."""
        cutoff = now - self.window_seconds
        while self.samples and self.samples[0][0] < cutoff:
            self.samples.popleft()

    def to_json(self):
        """Compact JSON: {s: sector_count, w: window_ms,
        d: [[idx, last_seen_ms, peak_spd, count], ...]}

        Only sectors that received at least one sample within the window are
        listed, sorted by sector index.
        """
        self._prune(time.time())
        sectors = {}
        for ts, sector, spd in self.samples:
            entry = sectors.get(sector)
            if entry is None:
                entry = {"last": ts, "peak": spd or 0, "n": 0}
                sectors[sector] = entry
            if ts > entry["last"]:
                entry["last"] = ts
            if spd is not None and spd > entry["peak"]:
                entry["peak"] = spd
            entry["n"] += 1
        data = [
            [idx, int(info["last"] * 1000), round(info["peak"], 2), info["n"]]
            for idx, info in sorted(sectors.items())
        ]
        return json.dumps({"s": self.SECTOR_COUNT,
                           "w": int(self.window_seconds * 1000), "d": data})
# ---------------------------------------------------------------------------
# Main controller
# ---------------------------------------------------------------------------
class WindyStationController:
    """Coordinates D-Bus monitoring, wind calculations, and Windy API uploads.

    Constructing the controller registers the D-Bus service, loads settings
    (Venus localsettings first, then the optional JSON config file), creates
    the signal-based meteo/GPS readers and schedules ``_main_loop`` to run
    once per second on the GLib main loop.
    """

    def __init__(self):
        self._setup_logging()
        self.logger = logging.getLogger('WindyStation')
        self.logger.info(f"Initializing Windy Station v{VERSION}")
        self.bus = dbus.SystemBus()
        self._create_dbus_service()
        self._setup_settings()
        self.meteo = SignalMeteoReader(self.bus)
        self.gps = SignalGpsReader(self.bus)
        self.wind_averager = WindSpeedAverager(WIND_AVG_WINDOW)
        self.gust_tracker = WindGustTracker(GUST_WINDOW)
        self.wind_dir_history = WindDirHistory(WIND_DIR_HISTORY_WINDOW)
        self.wind_trend = WindTrendTracker(WIND_TREND_WINDOW)
        # Timing state (epoch seconds; 0 means "never")
        self.last_observation_time = 0
        self.last_observation_success = 0
        self.last_station_update = 0
        self.last_station_attempt = 0
        self.last_station_lat = None
        self.last_station_lon = None
        self.last_wind_sample = 0
        self.last_dir_history_publish = 0
        GLib.timeout_add(1000, self._main_loop)
        self.logger.info("Initialized. Polling every 1s, observations every "
                         f"{OBSERVATION_MIN_INTERVAL}s")

    def _setup_logging(self):
        """Configure root logging to stdout from LOGGING_CONFIG."""
        level = getattr(logging, LOGGING_CONFIG['level'], logging.INFO)
        fmt = ('%(asctime)s %(levelname)s %(name)s: %(message)s'
               if LOGGING_CONFIG['include_timestamp']
               else '%(levelname)s %(name)s: %(message)s')
        logging.basicConfig(level=level, format=fmt, stream=sys.stdout)

    # -- D-Bus service creation -----------------------------------------------
    def _create_dbus_service(self):
        """Create the VeDbusService, add all paths, then register it.

        Claiming the bus name is retried up to five times with exponential
        back-off when the name is still held; the final failure is re-raised.
        """
        self.logger.info(f"Creating D-Bus service: {SERVICE_NAME}")
        max_retries = 5
        retry_delay = 1.0
        for attempt in range(max_retries):
            try:
                self.dbus_service = VeDbusService(
                    SERVICE_NAME, self.bus, register=False)
                break
            except dbus.exceptions.NameExistsException:
                if attempt >= max_retries - 1:
                    raise
                self.logger.warning(
                    f"D-Bus name exists, retrying in {retry_delay}s "
                    f"(attempt {attempt + 1}/{max_retries})")
                time.sleep(retry_delay)
                retry_delay *= 2

        svc = self.dbus_service

        # Management paths
        svc.add_path('/Mgmt/ProcessName', 'dbus-windy-station')
        svc.add_path('/Mgmt/ProcessVersion', VERSION)
        svc.add_path('/Mgmt/Connection', 'local')
        # Device info
        svc.add_path('/DeviceInstance', 0)
        svc.add_path('/ProductId', 0xA160)
        svc.add_path('/ProductName', 'Windy Station')
        svc.add_path('/FirmwareVersion', VERSION)
        svc.add_path('/Connected', 1)

        # Status paths (read-only, shown on GUI)
        def _status_text(p, v):
            labels = {0: 'Idle', 1: 'Active', 2: 'Uploading', 3: 'Error'}
            return labels.get(v, 'Unknown') if v is not None else 'Unknown'

        def _formatted(template):
            """Build a text callback rendering the value, or "--" for None."""
            return lambda p, v: template.format(v) if v is not None else "--"

        svc.add_path('/Status', 0, gettextcallback=_status_text)
        svc.add_path('/WindSpeed', None,
                     gettextcallback=_formatted("{:.1f} m/s"))
        svc.add_path('/WindGust', None,
                     gettextcallback=_formatted("{:.1f} m/s"))
        svc.add_path('/WindDirection', None,
                     gettextcallback=_formatted("{:.0f}°"))
        svc.add_path('/Temperature', None,
                     gettextcallback=_formatted("{:.1f}°C"))
        svc.add_path('/Pressure', None,
                     gettextcallback=_formatted("{:.1f} hPa"))
        svc.add_path('/LastReportTimeAgo', 'Never')
        svc.add_path('/LastReportTime', 0,
                     gettextcallback=self._report_time_text)
        svc.add_path('/WindDirHistory', '[]')
        svc.add_path('/WindTrend', '{}')

        # Writable settings
        def _add_setting(path, initial, text=None):
            kwargs = {'writeable': True,
                      'onchangecallback': self._on_setting_changed}
            if text is not None:
                kwargs['gettextcallback'] = text
            svc.add_path(path, initial, **kwargs)

        _add_setting('/Settings/Enabled', 1)
        _add_setting('/Settings/ApiKey', '')
        _add_setting('/Settings/StationId', '')
        _add_setting('/Settings/StationPassword', '')
        _add_setting('/Settings/UploadInterval', OBSERVATION_MIN_INTERVAL,
                     _formatted("{}s"))
        _add_setting('/Settings/Name', '')
        _add_setting('/Settings/ShareOption', 'public')
        _add_setting('/Settings/StationType', '')
        _add_setting('/Settings/ElevM', 0, _formatted("{} m"))
        _add_setting('/Settings/AglWind', 10, _formatted("{} m"))
        _add_setting('/Settings/AglTemp', 2, _formatted("{} m"))
        _add_setting('/Settings/Units', 0)

        svc.register()
        self.logger.info("D-Bus service created")

    # -- Persistent settings --------------------------------------------------
    def _setup_settings(self):
        """Load settings from localsettings, then apply the config file.

        Attributes are seeded with defaults first so they always exist, even
        if the settings backend is unavailable.
        """
        self.settings = None
        self._set_defaults()
        try:
            path = '/Settings/WindyStation'
            # Each entry: [localsettings path, default, minimum, maximum]
            settings_def = {
                'Enabled': [path + '/Enabled', 1, 0, 1],
                'ApiKey': [path + '/ApiKey', '', 0, 0],
                'StationId': [path + '/StationId', '', 0, 0],
                'StationPassword': [path + '/StationPassword', '', 0, 0],
                'UploadInterval': [path + '/UploadInterval',
                                   OBSERVATION_MIN_INTERVAL, 300, 900],
                'Name': [path + '/Name', '', 0, 0],
                'ShareOption': [path + '/ShareOption', 'public', 0, 0],
                'StationType': [path + '/StationType', '', 0, 0],
                'ElevM': [path + '/ElevM', 0, 0, 1000],
                'AglWind': [path + '/AglWind', 10, 0, 100],
                'AglTemp': [path + '/AglTemp', 2, 0, 100],
                'Units': [path + '/Units', 0, 0, 1],
            }
            self.settings = SettingsDevice(
                self.bus, settings_def,
                self._on_persistent_setting_changed)
            if self.settings:
                self._load_settings()
            self.logger.info("Persistent settings initialized")
        except Exception as e:
            self.logger.warning(f"Could not initialize persistent settings: {e}")
            self._set_defaults()
        self._load_config_file()

    def _set_defaults(self):
        """Reset every setting attribute to its built-in default."""
        self.enabled = True
        self.api_key = ''
        self.station_id = ''
        self.station_password = ''
        self.upload_interval = OBSERVATION_MIN_INTERVAL
        self.station_name = ''
        self.share_option = 'public'
        self.station_type = ''
        self.elev_m = 0
        self.agl_wind = 10
        self.agl_temp = 2
        self.units = 0  # 0=Metric, 1=Imperial

    def _load_config_file(self):
        """Load station_config.json if it exists. Overrides localsettings.

        Values are normalized (share option aliases, upload interval bounds)
        before they are persisted and published, so D-Bus and localsettings
        hold the same values the service actually uses.
        """
        try:
            if not os.path.exists(CONFIG_FILE):
                return
            with open(CONFIG_FILE, 'r') as f:
                data = json.load(f)
            self.logger.info(f"Loading config from {CONFIG_FILE}")
            if not isinstance(data, dict):
                self.logger.error(
                    f"Error reading {CONFIG_FILE}: top-level JSON value "
                    f"must be an object")
                return
            # JSON key -> (setting name, attribute name, type cast)
            field_map = {
                'api_key': ('ApiKey', 'api_key', str),
                'apiKey': ('ApiKey', 'api_key', str),
                'station_id': ('StationId', 'station_id', str),
                'stationId': ('StationId', 'station_id', str),
                'id': ('StationId', 'station_id', str),
                'station_password': ('StationPassword', 'station_password', str),
                'stationPassword': ('StationPassword', 'station_password', str),
                'password': ('StationPassword', 'station_password', str),
                'update_interval': ('UploadInterval', 'upload_interval', int),
                'updateInterval': ('UploadInterval', 'upload_interval', int),
                'interval': ('UploadInterval', 'upload_interval', int),
                'name': ('Name', 'station_name', str),
                'share_option': ('ShareOption', 'share_option', str),
                'shareOption': ('ShareOption', 'share_option', str),
                'station_type': ('StationType', 'station_type', str),
                'stationType': ('StationType', 'station_type', str),
                'elev_m': ('ElevM', 'elev_m', float),
                'elevM': ('ElevM', 'elev_m', float),
                'agl_wind': ('AglWind', 'agl_wind', float),
                'aglWind': ('AglWind', 'agl_wind', float),
                'agl_temp': ('AglTemp', 'agl_temp', float),
                'aglTemp': ('AglTemp', 'agl_temp', float),
            }
            applied = {}  # setting name -> attribute name, in file order
            for json_key, value in data.items():
                mapping = field_map.get(json_key)
                if mapping is None or value is None:
                    continue
                setting_name, attr_name, cast = mapping
                try:
                    setattr(self, attr_name, cast(value))
                except (ValueError, TypeError):
                    self.logger.warning(
                        f"Invalid value for {json_key}: {value}")
                    continue
                applied[setting_name] = attr_name
            # Normalize share_option
            share_map = {
                'public': 'public', 'open': 'public',
                'only_windy': 'only_windy', 'onlywindy': 'only_windy',
                'private': 'private',
            }
            normalized = share_map.get(
                self.share_option.lower().replace(' ', ''))
            if normalized:
                self.share_option = normalized
            # Enforce interval bounds
            self.upload_interval = max(300, min(900, self.upload_interval))
            # Persist/publish only after normalization so stored values match
            # the ones in use.
            for setting_name, attr_name in applied.items():
                final_value = getattr(self, attr_name)
                self._save_setting(setting_name, final_value)
                self.dbus_service[f'/Settings/{setting_name}'] = final_value
            self.logger.info(
                f"Config file applied: station_id={self.station_id}, "
                f"name={self.station_name}")
        except json.JSONDecodeError as e:
            self.logger.error(f"Invalid JSON in {CONFIG_FILE}: {e}")
        except Exception as e:
            self.logger.error(f"Error reading {CONFIG_FILE}: {e}")

    @staticmethod
    def _numeric_setting(raw, default, cast):
        """Cast a stored numeric setting, using *default* only when the value
        is missing or not convertible (a stored 0 is a valid value)."""
        if raw is None:
            return cast(default)
        try:
            return cast(raw)
        except (TypeError, ValueError):
            return cast(default)

    def _load_settings(self):
        """Copy persisted settings into attributes and mirror them on D-Bus.

        On any error all attributes are reset to their defaults.
        """
        if not self.settings:
            return
        try:
            stored = self.settings
            self.enabled = bool(stored['Enabled'])
            self.api_key = str(stored['ApiKey'] or '')
            self.station_id = str(stored['StationId'] or '')
            self.station_password = str(stored['StationPassword'] or '')
            self.upload_interval = max(
                300, min(900, int(stored['UploadInterval'])))
            self.station_name = str(stored['Name'] or '')
            self.share_option = str(stored['ShareOption'] or 'public')
            self.station_type = str(stored['StationType'] or '')
            # `value or default` would turn a legitimate 0 m height into the
            # default, so only a missing/invalid value falls back.
            self.elev_m = self._numeric_setting(stored['ElevM'], 0, float)
            self.agl_wind = self._numeric_setting(stored['AglWind'], 10, float)
            self.agl_temp = self._numeric_setting(stored['AglTemp'], 2, float)
            self.units = self._numeric_setting(stored['Units'], 0, int)

            published = self.dbus_service
            published['/Settings/Enabled'] = 1 if self.enabled else 0
            published['/Settings/ApiKey'] = self.api_key
            published['/Settings/StationId'] = self.station_id
            published['/Settings/StationPassword'] = self.station_password
            published['/Settings/UploadInterval'] = self.upload_interval
            published['/Settings/Name'] = self.station_name
            published['/Settings/ShareOption'] = self.share_option
            published['/Settings/StationType'] = self.station_type
            published['/Settings/ElevM'] = self.elev_m
            published['/Settings/AglWind'] = self.agl_wind
            published['/Settings/AglTemp'] = self.agl_temp
            published['/Settings/Units'] = self.units
            self.logger.info(
                f"Loaded settings: station_id={self.station_id or '(empty)'}, "
                f"interval={self.upload_interval}s")
        except Exception as e:
            self.logger.warning(f"Error loading settings: {e}")
            self._set_defaults()

    def _on_persistent_setting_changed(self, setting, old_value, new_value):
        """SettingsDevice callback: reload everything from localsettings."""
        self.logger.info(f"Persistent setting changed: {setting} = {new_value}")
        self._load_settings()

    def _on_setting_changed(self, path, value):
        """D-Bus write callback for the /Settings/* paths.

        Updates the matching attribute, persists it, and returns True so the
        written value is accepted.
        """
        self.logger.info(f"Setting changed: {path} = {value}")
        if path == '/Settings/Enabled':
            self.enabled = bool(value)
            self._save_setting('Enabled', 1 if self.enabled else 0)
        elif path == '/Settings/ApiKey':
            self.api_key = str(value) if value else ''
            self._save_setting('ApiKey', self.api_key)
        elif path == '/Settings/StationId':
            self.station_id = str(value) if value else ''
            self._save_setting('StationId', self.station_id)
        elif path == '/Settings/StationPassword':
            self.station_password = str(value) if value else ''
            self._save_setting('StationPassword', self.station_password)
        elif path == '/Settings/UploadInterval':
            val = int(value) if value else OBSERVATION_MIN_INTERVAL
            self.upload_interval = max(300, min(900, val))
            self._save_setting('UploadInterval', self.upload_interval)
        elif path == '/Settings/Name':
            self.station_name = str(value) if value else ''
            self._save_setting('Name', self.station_name)
        elif path == '/Settings/ShareOption':
            self.share_option = str(value) if value else 'public'
            self._save_setting('ShareOption', self.share_option)
        elif path == '/Settings/StationType':
            self.station_type = str(value) if value else ''
            self._save_setting('StationType', self.station_type)
        elif path == '/Settings/ElevM':
            self.elev_m = float(value) if value is not None else 0
            self._save_setting('ElevM', self.elev_m)
        elif path == '/Settings/AglWind':
            self.agl_wind = float(value) if value is not None else 10
            self._save_setting('AglWind', self.agl_wind)
        elif path == '/Settings/AglTemp':
            self.agl_temp = float(value) if value is not None else 2
            self._save_setting('AglTemp', self.agl_temp)
        elif path == '/Settings/Units':
            self.units = int(value) if value is not None else 0
            self._save_setting('Units', self.units)
        return True

    def _save_setting(self, name, value):
        """Best-effort write of one value to localsettings (no-op without it)."""
        if self.settings:
            try:
                self.settings[name] = value
            except Exception as e:
                self.logger.warning(f"Failed to save setting {name}: {e}")

    # -- Display helpers ------------------------------------------------------
    def _report_time_text(self, path, value):
        """Text callback for /LastReportTime: relative age of an epoch time.

        Returns "Never" for missing/non-positive/invalid values, a relative
        string for ages under a day, otherwise a local date and time.
        """
        try:
            if value is None or (isinstance(value, (int, float)) and value <= 0):
                return "Never"
            diff = time.time() - float(value)
            if diff < 60:
                return "Just now"
            if diff < 3600:
                return f"{int(diff / 60)}m ago"
            if diff < 86400:
                return f"{int(diff / 3600)}h ago"
            return time.strftime("%Y-%m-%d %H:%M", time.localtime(float(value)))
        except (TypeError, ValueError):
            return "Never"

    def _update_report_time_ago(self):
        """Publish the age of the last successful upload as "Xh Ym" / "Ym"."""
        if self.last_observation_success <= 0:
            self.dbus_service['/LastReportTimeAgo'] = 'Never'
            return
        elapsed = time.time() - self.last_observation_success
        hours, minutes = divmod(int(elapsed / 60), 60)
        if hours > 0:
            self.dbus_service['/LastReportTimeAgo'] = f"{hours}h {minutes}m"
        else:
            self.dbus_service['/LastReportTimeAgo'] = f"{minutes}m"

    @staticmethod
    def _http_error_body(error):
        """Return the decoded body of an HTTPError, or '' if unreadable."""
        try:
            return error.read().decode('utf-8')
        except Exception:
            return ""

    # -- Station update (PUT) -------------------------------------------------
    def _should_update_station(self, lat, lon):
        """Decide whether station metadata/location should be pushed now.

        True on the first fix, after STATION_UPDATE_INTERVAL has elapsed, or
        when the position moved at least GPS_MOVEMENT_THRESHOLD_METERS.
        Attempts are never made more often than once per 60 seconds.
        """
        if lat is None or lon is None:
            return False
        now = time.time()
        if self.last_station_attempt > 0 and now - self.last_station_attempt < 60:
            return False
        if self.last_station_update == 0:
            return True
        if now - self.last_station_update >= STATION_UPDATE_INTERVAL:
            return True
        if self.last_station_lat is not None and self.last_station_lon is not None:
            dist = haversine_distance(
                self.last_station_lat, self.last_station_lon, lat, lon)
            if dist >= GPS_MOVEMENT_THRESHOLD_METERS:
                return True
        return False

    def _update_station(self, lat, lon):
        """PUT station metadata and GPS location to Windy API.

        Returns:
            True when the API answered with status 200, otherwise False.
        """
        self.last_station_attempt = time.time()
        if not self.api_key or not self.station_id:
            self.logger.warning("Cannot update station: API key or station ID not set")
            return False
        url = f"{API_BASE}/pws/{self.station_id}"
        payload = {
            "share_option": self.share_option or "public",
        }
        if self.station_type:
            payload["station_type"] = self.station_type
        if self.station_name:
            payload["name"] = self.station_name
        if self.elev_m is not None:
            payload["elev_m"] = self.elev_m
        if self.agl_wind is not None:
            payload["agl_wind"] = self.agl_wind
        if self.agl_temp is not None:
            payload["agl_temp"] = self.agl_temp
        if lat is not None and lon is not None:
            payload["lat"] = round(lat, 6)
            payload["lon"] = round(lon, 6)
        try:
            request = Request(
                url,
                data=json.dumps(payload).encode('utf-8'),
                headers={
                    "Content-Type": "application/json",
                    "windy-api-key": self.api_key,
                },
                method='PUT'
            )
            with urlopen(request, timeout=30) as response:
                if response.status == 200:
                    self.last_station_update = time.time()
                    if lat is not None and lon is not None:
                        self.last_station_lat = lat
                        self.last_station_lon = lon
                    self.logger.info(f"Station updated: {lat:.6f}, {lon:.6f}")
                    return True
                body = response.read().decode('utf-8')
                self.logger.error(f"Station update error {response.status}: {body}")
                return False
        except HTTPError as e:
            body = self._http_error_body(e)
            self.logger.error(f"Station update HTTP {e.code}: {e.reason} {body}")
            return False
        except Exception as e:
            # Network boundary: URLError, timeouts and anything unexpected
            # are logged rather than allowed to kill the timer callback.
            self.logger.error(f"Station update error: {e}")
            return False

    # -- Observation upload (GET) ---------------------------------------------
    def _send_observation(self):
        """GET observation update to Windy API with query parameters.

        Returns:
            True when the API answered with status 200. False when
            credentials are missing, there is no weather data, or the request
            failed (a 429 additionally defers the next attempt).
        """
        if not self.station_id or not self.station_password:
            self.logger.warning(
                "Cannot send observation: station ID or password not set")
            return False
        params = {
            "id": self.station_id,
            "PASSWORD": self.station_password,
        }
        has_data = False
        wind_dir = self.meteo.get_wind_direction()
        if wind_dir is not None:
            params["winddir"] = int(round(wind_dir))
            has_data = True
        avg_wind = self.wind_averager.get_average()
        if avg_wind is not None:
            params["wind"] = round(avg_wind, 1)
            has_data = True
        # A gust alone does not count as data worth uploading.
        gust = self.gust_tracker.get_gust()
        if gust is not None:
            params["gust"] = round(gust, 1)
        temp = self.meteo.get_temperature()
        if temp is not None:
            params["temp"] = round(temp, 1)
            has_data = True
        pressure = self.meteo.get_pressure()
        if pressure is not None:
            params["pressure"] = round(pressure * 100)  # hPa -> Pa
            has_data = True
        if not has_data:
            self.logger.debug("No weather data to send")
            return False
        url = f"{API_BASE}/observation/update?{urlencode(params)}"
        try:
            request = Request(url, method='GET')
            with urlopen(request, timeout=30) as response:
                if response.status == 200:
                    self.logger.info(
                        f"Observation sent: wind={params.get('wind', 'N/A')} "
                        f"gust={params.get('gust', 'N/A')} "
                        f"temp={params.get('temp', 'N/A')} "
                        f"pressure={params.get('pressure', 'N/A')}")
                    return True
                body = response.read().decode('utf-8')
                self.logger.error(
                    f"Observation error {response.status}: {body}")
                return False
        except HTTPError as e:
            body = self._http_error_body(e)
            self.logger.error(
                f"Observation HTTP {e.code}: {e.reason} {body}")
            if e.code == 429:
                self._handle_rate_limit(body)
            return False
        except Exception as e:
            # Network boundary: URLError, timeouts and anything unexpected.
            self.logger.error(f"Observation error: {e}")
            return False

    def _observation_interval(self):
        """Seconds between observation uploads (never below 300)."""
        return max(300, self.upload_interval)

    def _defer_next_observation(self, delay):
        """Arrange for the next upload to be due *delay* seconds from now.

        The main loop uploads once ``now - last_observation_time`` reaches the
        upload interval, so the interval is subtracted here; storing
        ``now + delay`` would postpone the upload by delay + interval.
        """
        self.last_observation_time = (
            time.time() + delay - self._observation_interval())

    @staticmethod
    def _parse_retry_after(body):
        """Extract ``retry_after`` from a 429 response body.

        Returns:
            (seconds_until_retry, raw_timestamp), or (None, None) when the
            body has no usable ISO-8601 ``retry_after`` value.
        """
        from datetime import datetime, timezone
        try:
            retry_iso = json.loads(body).get("retry_after")
            if not retry_iso:
                return None, None
            retry_dt = datetime.fromisoformat(retry_iso.replace("Z", "+00:00"))
        except (ValueError, TypeError, AttributeError):
            return None, None
        if retry_dt.tzinfo is None:
            # assumes a timestamp without offset is UTC — TODO confirm
            retry_dt = retry_dt.replace(tzinfo=timezone.utc)
        delay = (retry_dt - datetime.now(timezone.utc)).total_seconds()
        return delay, retry_iso

    def _handle_rate_limit(self, body):
        """Parse a 429 response and defer the next observation until retry_after.

        Falls back to a 300 s deferral when the body carries no usable
        ``retry_after`` in the future. The regular upload interval still
        applies, so the next upload happens at whichever is later.
        """
        delay, retry_iso = self._parse_retry_after(body)
        if delay is not None and delay > 0:
            self._defer_next_observation(delay)
            self.logger.warning(
                f"Rate-limited by API; deferring next observation "
                f"{delay:.0f}s until {retry_iso}")
            return
        self._defer_next_observation(300)
        self.logger.warning(
            "Rate-limited by API; deferring next observation 300s")

    # -- Main loop ------------------------------------------------------------
    def _main_loop(self):
        """One-second GLib timer callback driving sampling and uploads.

        Always returns True so GLib keeps the timer alive; errors are logged
        instead of propagated.
        """
        try:
            if not self.enabled:
                self.dbus_service['/Status'] = 0
                self._update_report_time_ago()
                return True
            now = time.time()
            # Sample wind every second for averaging and gust tracking
            wind_dir = self.meteo.get_wind_direction()
            if now - self.last_wind_sample >= 1.0:
                wind_ms = self.meteo.get_wind_speed()
                if wind_ms is not None:
                    self.wind_averager.add_sample(wind_ms)
                    self.gust_tracker.add_sample(wind_ms)
                    if wind_dir is not None:
                        self.wind_dir_history.add_sample(wind_dir, wind_ms)
                        self.wind_trend.add_sample(wind_dir, wind_ms)
                self.last_wind_sample = now
            # Update current conditions on D-Bus (for GUI display)
            avg_wind = self.wind_averager.get_average()
            self.dbus_service['/WindSpeed'] = (
                round(avg_wind, 1) if avg_wind is not None else None)
            gust = self.gust_tracker.get_gust()
            self.dbus_service['/WindGust'] = (
                round(gust, 1) if gust is not None else None)
            self.dbus_service['/WindDirection'] = wind_dir
            self.dbus_service['/Temperature'] = self.meteo.get_temperature()
            self.dbus_service['/Pressure'] = self.meteo.get_pressure()
            # Publish direction history (throttled)
            if now - self.last_dir_history_publish >= 15.0:
                self.dbus_service['/WindDirHistory'] = (
                    self.wind_dir_history.to_json())
                self.dbus_service['/WindTrend'] = (
                    self.wind_trend.to_json())
                self.last_dir_history_publish = now
            # GPS position
            pos = self.gps.get_position()
            lat = pos[0] if pos else None
            lon = pos[1] if pos else None
            # Check credentials configured
            has_creds = bool(self.station_id and self.station_password)
            if not has_creds:
                self.dbus_service['/Status'] = 0  # Idle - not configured
                self._update_report_time_ago()
                return True
            self.dbus_service['/Status'] = 1  # Active
            # Station location update
            if self.api_key and self._should_update_station(lat, lon):
                self.logger.info("Updating station location...")
                self._update_station(lat, lon)
            # Observation upload
            if now - self.last_observation_time >= self._observation_interval():
                self.dbus_service['/Status'] = 2  # Uploading
                if self._send_observation():
                    self.last_observation_success = int(now)
                    self.dbus_service['/LastReportTime'] = (
                        self.last_observation_success)
                # A rate-limit deferral may have moved the timestamp into the
                # future; only overwrite it when that did not happen.
                if self.last_observation_time <= now:
                    self.last_observation_time = now
                self.dbus_service['/Status'] = 1
            self._update_report_time_ago()
        except dbus.exceptions.DBusException as e:
            self.logger.warning(f"D-Bus error: {e}")
        except Exception as e:
            self.logger.exception(f"Unexpected error: {e}")
        return True
def main():
    """Entry point: install signal handlers, build the controller, run GLib.

    SIGTERM and SIGINT stop the main loop if it has been created. Any other
    failure is logged and the process exits with status 1.
    """
    DBusGMainLoop(set_as_default=True)
    rule = "=" * 60
    print(rule)
    print(f"Windy Station v{VERSION}")
    print(rule)

    # Filled in once the GLib loop exists; the handler reads it lazily.
    running = {'loop': None}

    def signal_handler(signum, frame):
        try:
            sig_name = signal.Signals(signum).name
        except ValueError:
            sig_name = str(signum)
        logging.info(f"Received {sig_name}, shutting down...")
        loop = running['loop']
        if loop is not None:
            loop.quit()

    for signo in (signal.SIGTERM, signal.SIGINT):
        signal.signal(signo, signal_handler)

    try:
        # Bound to a name so the controller stays referenced while the loop runs.
        controller = WindyStationController()
        running['loop'] = GLib.MainLoop()
        running['loop'].run()
    except KeyboardInterrupt:
        print("\nShutdown requested")
    except Exception as e:
        logging.error(f"Fatal error: {e}", exc_info=True)
        sys.exit(1)
    finally:
        logging.info("Service stopped")
if __name__ == '__main__':
main()