Organizes 11 projects for Cerbo GX/Venus OS into a single repository: - axiom-nmea: Raymarine LightHouse protocol decoder - dbus-generator-ramp: Generator current ramp controller - dbus-lightning: Blitzortung lightning monitor - dbus-meteoblue-forecast: Meteoblue weather forecast - dbus-no-foreign-land: noforeignland.com tracking - dbus-tides: Tide prediction from depth + harmonics - dbus-vrm-history: VRM cloud history proxy - dbus-windy-station: Windy.com weather upload - mfd-custom-app: MFD app deployment package - venus-html5-app: Custom Victron HTML5 app fork - watermaker: Watermaker PLC control UI Adds root README, .gitignore, project template, and per-project .gitignore files. Sensitive config files excluded via .gitignore with .example templates provided. Made-with: Cursor
361 lines
12 KiB
Python
361 lines
12 KiB
Python
"""
|
|
Tide adapter -- computes timing offset and amplitude scaling from
|
|
observed tide events matched against harmonic model predictions.
|
|
|
|
Local geography causes tides to arrive earlier/later and with
|
|
different amplitude compared to the open-ocean model. The adapter
|
|
estimates two correction factors:
|
|
|
|
Dt = timing offset (seconds, + means local tide arrives later)
|
|
k = amplitude scale factor (1.0 = no scaling)
|
|
|
|
The corrected prediction is:
|
|
|
|
local_height(t) = k * predicted_height(t - Dt)
|
|
"""
|
|
|
|
import bisect
import logging
import math

from config import (
    ADAPTATION_MAX_MATCH_WINDOW,
    ADAPTATION_MIN_MATCHES,
    ADAPTATION_MIN_RANGE_PAIRS,
    RESIDUAL_DECAY_HOURS,
    RESIDUAL_SMOOTHING_WINDOW,
)
|
|
|
|
logger = logging.getLogger('TideAdapter')
|
|
|
|
|
|
class TideAdapter:
    """Matches observed tide events to predictions and computes corrections.

    Two correction factors are estimated from observed high/low water
    events matched against the harmonic model:

        Dt = timing offset (seconds, + means local tide arrives later)
        k  = amplitude scale factor (1.0 = no scaling)

    On top of the adapted curve, a smoothed residual correction can be
    computed from the recent depth history (see
    compute_residual_correction / apply_residual_correction).
    """

    def __init__(self):
        self._time_offset = 0.0   # Dt, seconds
        self._amp_scale = 1.0     # k, dimensionless
        self._match_count = 0
        self._status = 0  # 0=no data, 1=time only, 2=time+amplitude
        # Most recent smoothed (timestamp, residual) curve; empty until
        # compute_residual_correction() succeeds.
        self._residual_correction = []

    @property
    def time_offset(self):
        # Dt in seconds (+ means local tide arrives later than predicted).
        return self._time_offset

    @property
    def amp_scale(self):
        # Amplitude scale factor k (1.0 = no scaling).
        return self._amp_scale

    @property
    def match_count(self):
        # Number of observed/predicted event pairs behind the current fit.
        return self._match_count

    @property
    def status(self):
        # 0 = no data, 1 = time-only correction, 2 = time + amplitude.
        return self._status

    def reset(self):
        """Reset corrections (e.g. when vessel moves)."""
        self._time_offset = 0.0
        self._amp_scale = 1.0
        self._match_count = 0
        self._status = 0
        # A residual curve measured at the previous location must not
        # leak into predictions at the new one.
        self._residual_correction = []
        logger.info("Adaptation reset")

    def update(self, observed_events, predicted_extrema):
        """Match observed events to predicted extrema and compute corrections.

        Args:
            observed_events: list of (timestamp, depth, "high"|"low")
                from TideDetector
            predicted_extrema: list of (timestamp, height, "high"|"low")
                from tide_harmonics.find_extrema()

        Returns:
            True if corrections were updated.
        """
        if not observed_events or not predicted_extrema:
            return False

        matches = self._match_events(observed_events, predicted_extrema)
        if len(matches) < ADAPTATION_MIN_MATCHES:
            return False

        # Median rather than mean: robust against a single badly
        # detected event.
        time_offsets = [obs_ts - pred_ts
                        for obs_ts, _, pred_ts, _, _ in matches]
        self._time_offset = _median(time_offsets)
        self._match_count = len(matches)

        highs_obs = [(obs_ts, obs_d) for obs_ts, obs_d, _, _, t in matches
                     if t == "high"]
        lows_obs = [(obs_ts, obs_d) for obs_ts, obs_d, _, _, t in matches
                    if t == "low"]
        highs_pred = [(pred_ts, pred_h) for _, _, pred_ts, pred_h, t in matches
                      if t == "high"]
        lows_pred = [(pred_ts, pred_h) for _, _, pred_ts, pred_h, t in matches
                     if t == "low"]

        amp_ratios = self._compute_amplitude_ratios(
            highs_obs, lows_obs, highs_pred, lows_pred)

        if len(amp_ratios) >= ADAPTATION_MIN_RANGE_PAIRS:
            self._amp_scale = _median(amp_ratios)
            # Clamp: a transducer glitch must not blow up the whole
            # prediction curve.
            self._amp_scale = max(0.3, min(self._amp_scale, 3.0))
            self._status = 2
            logger.info(
                "Adaptation: Dt=%.0fs (%.1fmin), k=%.2f from %d matches "
                "(%d range pairs)",
                self._time_offset, self._time_offset / 60.0,
                self._amp_scale, self._match_count, len(amp_ratios))
        else:
            # Not enough high/low pairs to trust an amplitude fit.
            self._amp_scale = 1.0
            self._status = 1
            logger.info(
                "Adaptation: Dt=%.0fs (%.1fmin), k=1.0 (no range pairs) "
                "from %d matches",
                self._time_offset, self._time_offset / 60.0,
                self._match_count)

        return True

    def adapt_curve(self, timestamps, heights):
        """Apply timing offset and amplitude scaling to a prediction curve.

        Args:
            timestamps: list of Unix timestamps (original prediction times)
            heights: list of MSL-relative heights

        Returns:
            (adapted_timestamps, adapted_heights) -- same length as input.
            Timestamps are shifted by +Dt, heights are scaled by k.
        """
        if self._status == 0:
            # No corrections available yet: pass through unchanged.
            return timestamps, heights

        dt = self._time_offset
        k = self._amp_scale

        adapted_ts = [t + dt for t in timestamps]
        adapted_h = [k * h for h in heights]
        return adapted_ts, adapted_h

    def adapt_extrema(self, extrema):
        """Apply corrections to predicted extrema.

        Args:
            extrema: list of (timestamp, height, "high"|"low")

        Returns:
            list of (timestamp, height, type) with corrected values.
        """
        if self._status == 0:
            return extrema

        dt = self._time_offset
        k = self._amp_scale
        return [(ts + dt, k * h, t) for ts, h, t in extrema]

    # ------------------------------------------------------------------
    # Internal
    # ------------------------------------------------------------------

    def _match_events(self, observed, predicted):
        """Match each observed event to the nearest predicted event of the
        same type within the match window.

        Greedy, in observation order; each predicted event is used at
        most once.

        Returns list of (obs_ts, obs_depth, pred_ts, pred_height, type).
        """
        matches = []
        used_pred = set()

        for obs_ts, obs_depth, obs_type in observed:
            # Candidates farther than the window are rejected outright.
            best_dist = ADAPTATION_MAX_MATCH_WINDOW
            best_idx = -1

            for i, (pred_ts, pred_h, pred_type) in enumerate(predicted):
                if pred_type != obs_type:
                    continue
                if i in used_pred:
                    continue
                dist = abs(obs_ts - pred_ts)
                if dist < best_dist:
                    best_dist = dist
                    best_idx = i

            if best_idx >= 0:
                pred_ts, pred_h, pred_type = predicted[best_idx]
                matches.append(
                    (obs_ts, obs_depth, pred_ts, pred_h, obs_type))
                used_pred.add(best_idx)

        return matches

    @staticmethod
    def _compute_amplitude_ratios(highs_obs, lows_obs, highs_pred, lows_pred):
        """Compute amplitude ratios from matched high/low pairs.

        For each index-aligned (high, low) pair, compute the ratio of
        observed range to predicted range. Pairs whose predicted range
        is below 5 cm are skipped -- dividing by near-zero ranges would
        amplify sensor noise into the scale factor.
        """
        ratios = []
        n_pairs = min(len(highs_obs), len(lows_obs),
                      len(highs_pred), len(lows_pred))
        for i in range(n_pairs):
            obs_range = abs(highs_obs[i][1] - lows_obs[i][1])
            pred_range = abs(highs_pred[i][1] - lows_pred[i][1])
            if pred_range > 0.05:
                ratios.append(obs_range / pred_range)

        return ratios

    # ------------------------------------------------------------------
    # Residual curve correction
    # ------------------------------------------------------------------

    def compute_residual_correction(self, obs_history, pred_at_func,
                                    chart_depth):
        """Compute a smoothed residual between observations and adapted model.

        Args:
            obs_history: list of (timestamp, depth) from the depth recorder
            pred_at_func: callable(timestamps) -> list of MSL-relative heights
            chart_depth: static depth offset (meters)

        Returns:
            list of (timestamp, smoothed_residual) or empty list.
        """
        # Need a minimum of history and a valid Dt/k fit to compare
        # against.
        if not obs_history or len(obs_history) < 6:
            return []
        if self._status == 0:
            return []

        dt = self._time_offset
        k = self._amp_scale

        obs_ts_list = [ts for ts, _ in obs_history]
        obs_depths = [d for _, d in obs_history]

        # Evaluate the model at (t - Dt) so the adapted prediction lines
        # up with the observation timestamps.
        shifted_ts = [ts - dt for ts in obs_ts_list]
        raw_heights = pred_at_func(shifted_ts)
        if raw_heights is None or len(raw_heights) != len(obs_ts_list):
            return []

        residuals = []
        for i in range(len(obs_ts_list)):
            adapted_depth = chart_depth + k * raw_heights[i]
            residuals.append(obs_depths[i] - adapted_depth)

        smoothed = _smooth_triangular(obs_ts_list, residuals,
                                      RESIDUAL_SMOOTHING_WINDOW)

        self._residual_correction = list(zip(obs_ts_list, smoothed))
        n = len(self._residual_correction)
        if n > 0:
            logger.info(
                "Residual correction: %d points, range [%.3f, %.3f]m",
                n, min(smoothed), max(smoothed))
        return self._residual_correction

    def apply_residual_correction(self, pred_ts, pred_depths, residual_curve):
        """Apply the smoothed residual to a prediction curve.

        In the overlap region the interpolated residual is added directly.
        Beyond the last observation the residual decays exponentially.

        Args:
            pred_ts: prediction timestamps
            pred_depths: prediction depths (adapted + chart_depth)
            residual_curve: list of (timestamp, residual) from
                compute_residual_correction()

        Returns:
            list of corrected depths (same length as pred_ts).
        """
        if not residual_curve or not pred_ts:
            return pred_depths

        rc_ts = [t for t, _ in residual_curve]
        rc_val = [v for _, v in residual_curve]
        rc_start = rc_ts[0]
        rc_end = rc_ts[-1]
        last_r = rc_val[-1]

        # Time constant so the residual halves every RESIDUAL_DECAY_HOURS.
        tau = RESIDUAL_DECAY_HOURS * 3600.0 / math.log(2.0)

        corrected = []
        for i, t in enumerate(pred_ts):
            if t < rc_start:
                # Before the observation window: hold the first residual.
                r = rc_val[0]
            elif t <= rc_end:
                r = _interp(t, rc_ts, rc_val)
            else:
                # Past the last observation: decay toward zero.
                elapsed = t - rc_end
                r = last_r * math.exp(-elapsed / tau)
            corrected.append(pred_depths[i] + r)

        return corrected
|
|
|
|
|
|
def _interp(target, xs, ys):
|
|
"""Linear interpolation of ys at target within sorted xs."""
|
|
if target <= xs[0]:
|
|
return ys[0]
|
|
if target >= xs[-1]:
|
|
return ys[-1]
|
|
lo, hi = 0, len(xs) - 1
|
|
while hi - lo > 1:
|
|
mid = (lo + hi) // 2
|
|
if xs[mid] <= target:
|
|
lo = mid
|
|
else:
|
|
hi = mid
|
|
dx = xs[hi] - xs[lo]
|
|
if dx == 0:
|
|
return ys[lo]
|
|
frac = (target - xs[lo]) / dx
|
|
return ys[lo] + frac * (ys[hi] - ys[lo])
|
|
|
|
|
|
def _smooth_triangular(timestamps, values, window):
|
|
"""Triangular-kernel smoothing of (timestamps, values).
|
|
|
|
Each output value is a weighted average of all input values within
|
|
+/- window/2, weighted linearly by proximity (1 at centre, 0 at edge).
|
|
"""
|
|
half = window / 2.0
|
|
n = len(values)
|
|
result = [0.0] * n
|
|
|
|
for i in range(n):
|
|
t_i = timestamps[i]
|
|
w_sum = 0.0
|
|
v_sum = 0.0
|
|
for j in range(n):
|
|
dist = abs(timestamps[j] - t_i)
|
|
if dist > half:
|
|
continue
|
|
w = 1.0 - dist / half
|
|
w_sum += w
|
|
v_sum += w * values[j]
|
|
result[i] = v_sum / w_sum if w_sum > 0 else values[i]
|
|
|
|
return result
|
|
|
|
|
|
def _median(values):
|
|
"""Return the median of a list of numbers."""
|
|
if not values:
|
|
return 0.0
|
|
s = sorted(values)
|
|
n = len(s)
|
|
if n % 2 == 1:
|
|
return s[n // 2]
|
|
return (s[n // 2 - 1] + s[n // 2]) / 2.0
|