Files
venus/dbus-tides/tide_adapter.py
dev 9756538f16 Initial commit: Venus OS boat addons monorepo
Organizes 11 projects for Cerbo GX/Venus OS into a single repository:
- axiom-nmea: Raymarine LightHouse protocol decoder
- dbus-generator-ramp: Generator current ramp controller
- dbus-lightning: Blitzortung lightning monitor
- dbus-meteoblue-forecast: Meteoblue weather forecast
- dbus-no-foreign-land: noforeignland.com tracking
- dbus-tides: Tide prediction from depth + harmonics
- dbus-vrm-history: VRM cloud history proxy
- dbus-windy-station: Windy.com weather upload
- mfd-custom-app: MFD app deployment package
- venus-html5-app: Custom Victron HTML5 app fork
- watermaker: Watermaker PLC control UI

Adds root README, .gitignore, project template, and per-project
.gitignore files. Sensitive config files excluded via .gitignore
with .example templates provided.

Made-with: Cursor
2026-03-16 17:04:16 +00:00

361 lines
12 KiB
Python

"""
Tide adapter -- computes timing offset and amplitude scaling from
observed tide events matched against harmonic model predictions.
Local geography causes tides to arrive earlier/later and with
different amplitude compared to the open-ocean model. The adapter
estimates two correction factors:
Dt = timing offset (seconds, + means local tide arrives later)
k = amplitude scale factor (1.0 = no scaling)
The corrected prediction is:
local_height(t) = k * predicted_height(t - Dt)
"""
import logging
import math
from config import (
ADAPTATION_MAX_MATCH_WINDOW,
ADAPTATION_MIN_MATCHES,
ADAPTATION_MIN_RANGE_PAIRS,
RESIDUAL_DECAY_HOURS,
RESIDUAL_SMOOTHING_WINDOW,
)
logger = logging.getLogger('TideAdapter')
class TideAdapter:
"""Matches observed tide events to predictions and computes corrections."""
def __init__(self):
self._time_offset = 0.0
self._amp_scale = 1.0
self._match_count = 0
self._status = 0 # 0=no data, 1=time only, 2=time+amplitude
@property
def time_offset(self):
return self._time_offset
@property
def amp_scale(self):
return self._amp_scale
@property
def match_count(self):
return self._match_count
@property
def status(self):
return self._status
def reset(self):
"""Reset corrections (e.g. when vessel moves)."""
self._time_offset = 0.0
self._amp_scale = 1.0
self._match_count = 0
self._status = 0
logger.info("Adaptation reset")
def update(self, observed_events, predicted_extrema):
"""Match observed events to predicted extrema and compute corrections.
Args:
observed_events: list of (timestamp, depth, "high"|"low")
from TideDetector
predicted_extrema: list of (timestamp, height, "high"|"low")
from tide_harmonics.find_extrema()
Returns:
True if corrections were updated.
"""
if not observed_events or not predicted_extrema:
return False
matches = self._match_events(observed_events, predicted_extrema)
if len(matches) < ADAPTATION_MIN_MATCHES:
return False
time_offsets = [obs_ts - pred_ts
for obs_ts, _, pred_ts, _, _ in matches]
self._time_offset = _median(time_offsets)
self._match_count = len(matches)
highs_obs = [(obs_ts, obs_d) for obs_ts, obs_d, _, _, t in matches
if t == "high"]
lows_obs = [(obs_ts, obs_d) for obs_ts, obs_d, _, _, t in matches
if t == "low"]
highs_pred = [(pred_ts, pred_h) for _, _, pred_ts, pred_h, t in matches
if t == "high"]
lows_pred = [(pred_ts, pred_h) for _, _, pred_ts, pred_h, t in matches
if t == "low"]
amp_ratios = self._compute_amplitude_ratios(
highs_obs, lows_obs, highs_pred, lows_pred)
if len(amp_ratios) >= ADAPTATION_MIN_RANGE_PAIRS:
self._amp_scale = _median(amp_ratios)
self._amp_scale = max(0.3, min(self._amp_scale, 3.0))
self._status = 2
logger.info(
"Adaptation: Dt=%.0fs (%.1fmin), k=%.2f from %d matches "
"(%d range pairs)",
self._time_offset, self._time_offset / 60.0,
self._amp_scale, self._match_count, len(amp_ratios))
else:
self._amp_scale = 1.0
self._status = 1
logger.info(
"Adaptation: Dt=%.0fs (%.1fmin), k=1.0 (no range pairs) "
"from %d matches",
self._time_offset, self._time_offset / 60.0,
self._match_count)
return True
def adapt_curve(self, timestamps, heights):
"""Apply timing offset and amplitude scaling to a prediction curve.
Args:
timestamps: list of Unix timestamps (original prediction times)
heights: list of MSL-relative heights
Returns:
(adapted_timestamps, adapted_heights) -- same length as input.
Timestamps are shifted by +Dt, heights are scaled by k.
"""
if self._status == 0:
return timestamps, heights
dt = self._time_offset
k = self._amp_scale
adapted_ts = [t + dt for t in timestamps]
adapted_h = [k * h for h in heights]
return adapted_ts, adapted_h
def adapt_extrema(self, extrema):
"""Apply corrections to predicted extrema.
Args:
extrema: list of (timestamp, height, "high"|"low")
Returns:
list of (timestamp, height, type) with corrected values.
"""
if self._status == 0:
return extrema
dt = self._time_offset
k = self._amp_scale
return [(ts + dt, k * h, t) for ts, h, t in extrema]
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _match_events(self, observed, predicted):
"""Match each observed event to the nearest predicted event of the
same type within the match window.
Returns list of (obs_ts, obs_depth, pred_ts, pred_height, type).
"""
matches = []
used_pred = set()
for obs_ts, obs_depth, obs_type in observed:
best_dist = ADAPTATION_MAX_MATCH_WINDOW
best_idx = -1
for i, (pred_ts, pred_h, pred_type) in enumerate(predicted):
if pred_type != obs_type:
continue
if i in used_pred:
continue
dist = abs(obs_ts - pred_ts)
if dist < best_dist:
best_dist = dist
best_idx = i
if best_idx >= 0:
pred_ts, pred_h, pred_type = predicted[best_idx]
matches.append(
(obs_ts, obs_depth, pred_ts, pred_h, obs_type))
used_pred.add(best_idx)
return matches
@staticmethod
def _compute_amplitude_ratios(highs_obs, lows_obs, highs_pred, lows_pred):
"""Compute amplitude ratios from matched high/low pairs.
For each consecutive (high, low) or (low, high) pair in the
observations, compute the ratio of observed range to predicted range.
"""
if not highs_obs or not lows_obs or not highs_pred or not lows_pred:
return []
obs_range = abs(highs_obs[0][1] - lows_obs[0][1])
pred_range = abs(highs_pred[0][1] - lows_pred[0][1])
ratios = []
if pred_range > 0.05:
ratios.append(obs_range / pred_range)
for i in range(1, min(len(highs_obs), len(lows_obs),
len(highs_pred), len(lows_pred))):
or_i = abs(highs_obs[i][1] - lows_obs[i][1])
pr_i = abs(highs_pred[i][1] - lows_pred[i][1])
if pr_i > 0.05:
ratios.append(or_i / pr_i)
return ratios
# ------------------------------------------------------------------
# Residual curve correction
# ------------------------------------------------------------------
def compute_residual_correction(self, obs_history, pred_at_func,
chart_depth):
"""Compute a smoothed residual between observations and adapted model.
Args:
obs_history: list of (timestamp, depth) from the depth recorder
pred_at_func: callable(timestamps) -> list of MSL-relative heights
chart_depth: static depth offset (meters)
Returns:
list of (timestamp, smoothed_residual) or empty list.
"""
if not obs_history or len(obs_history) < 6:
return []
if self._status == 0:
return []
dt = self._time_offset
k = self._amp_scale
obs_ts_list = [ts for ts, _ in obs_history]
obs_depths = [d for _, d in obs_history]
shifted_ts = [ts - dt for ts in obs_ts_list]
raw_heights = pred_at_func(shifted_ts)
if raw_heights is None or len(raw_heights) != len(obs_ts_list):
return []
residuals = []
for i in range(len(obs_ts_list)):
adapted_depth = chart_depth + k * raw_heights[i]
residuals.append(obs_depths[i] - adapted_depth)
smoothed = _smooth_triangular(obs_ts_list, residuals,
RESIDUAL_SMOOTHING_WINDOW)
self._residual_correction = list(zip(obs_ts_list, smoothed))
n = len(self._residual_correction)
if n > 0:
logger.info(
"Residual correction: %d points, range [%.3f, %.3f]m",
n, min(smoothed), max(smoothed))
return self._residual_correction
def apply_residual_correction(self, pred_ts, pred_depths, residual_curve):
"""Apply the smoothed residual to a prediction curve.
In the overlap region the interpolated residual is added directly.
Beyond the last observation the residual decays exponentially.
Args:
pred_ts: prediction timestamps
pred_depths: prediction depths (adapted + chart_depth)
residual_curve: list of (timestamp, residual) from
compute_residual_correction()
Returns:
list of corrected depths (same length as pred_ts).
"""
if not residual_curve or not pred_ts:
return pred_depths
rc_ts = [t for t, _ in residual_curve]
rc_val = [v for _, v in residual_curve]
rc_start = rc_ts[0]
rc_end = rc_ts[-1]
last_r = rc_val[-1]
tau = RESIDUAL_DECAY_HOURS * 3600.0 / math.log(2.0)
corrected = []
for i, t in enumerate(pred_ts):
if t < rc_start:
r = rc_val[0]
elif t <= rc_end:
r = _interp(t, rc_ts, rc_val)
else:
elapsed = t - rc_end
r = last_r * math.exp(-elapsed / tau)
corrected.append(pred_depths[i] + r)
return corrected
def _interp(target, xs, ys):
"""Linear interpolation of ys at target within sorted xs."""
if target <= xs[0]:
return ys[0]
if target >= xs[-1]:
return ys[-1]
lo, hi = 0, len(xs) - 1
while hi - lo > 1:
mid = (lo + hi) // 2
if xs[mid] <= target:
lo = mid
else:
hi = mid
dx = xs[hi] - xs[lo]
if dx == 0:
return ys[lo]
frac = (target - xs[lo]) / dx
return ys[lo] + frac * (ys[hi] - ys[lo])
def _smooth_triangular(timestamps, values, window):
"""Triangular-kernel smoothing of (timestamps, values).
Each output value is a weighted average of all input values within
+/- window/2, weighted linearly by proximity (1 at centre, 0 at edge).
"""
half = window / 2.0
n = len(values)
result = [0.0] * n
for i in range(n):
t_i = timestamps[i]
w_sum = 0.0
v_sum = 0.0
for j in range(n):
dist = abs(timestamps[j] - t_i)
if dist > half:
continue
w = 1.0 - dist / half
w_sum += w
v_sum += w * values[j]
result[i] = v_sum / w_sum if w_sum > 0 else values[i]
return result
def _median(values):
"""Return the median of a list of numbers."""
if not values:
return 0.0
s = sorted(values)
n = len(s)
if n % 2 == 1:
return s[n // 2]
return (s[n // 2 - 1] + s[n // 2]) / 2.0