Files
venus/dbus-tides/tide_merger.py
dev 9756538f16 Initial commit: Venus OS boat addons monorepo
Organizes 11 projects for Cerbo GX/Venus OS into a single repository:
- axiom-nmea: Raymarine LightHouse protocol decoder
- dbus-generator-ramp: Generator current ramp controller
- dbus-lightning: Blitzortung lightning monitor
- dbus-meteoblue-forecast: Meteoblue weather forecast
- dbus-no-foreign-land: noforeignland.com tracking
- dbus-tides: Tide prediction from depth + harmonics
- dbus-vrm-history: VRM cloud history proxy
- dbus-windy-station: Windy.com weather upload
- mfd-custom-app: MFD app deployment package
- venus-html5-app: Custom Victron HTML5 app fork
- watermaker: Watermaker PLC control UI

Adds root README, .gitignore, project template, and per-project
.gitignore files. Sensitive config files excluded via .gitignore
with .example templates provided.

Made-with: Cursor
2026-03-16 17:04:16 +00:00

338 lines
12 KiB
Python

"""
Tide merger -- calibrates model predictions against observed depth.

The model predicts tide height relative to mean sea level. Observed
depth is measured from the transducer to the seabed. The merger
computes a "chart depth" offset so that:

    predicted_depth = chart_depth + predicted_tide_height

where chart_depth is calibrated from:

    chart_depth = mean(observed_depth) - mean(predicted_tide_height)

This gives us predicted depths under the keel for the current location.

When the vessel moves more than CHART_DEPTH_RECAL_METERS, chart_depth
is recalibrated from recent observations at the new position.
"""
import logging
import math
import time
from config import CHART_DEPTH_RECAL_METERS
logger = logging.getLogger('TideMerger')
def _haversine(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in meters between two GPS coordinates.

    Uses the haversine formula on a sphere of radius 6,371,000 m.

    Args:
        lat1, lon1: first position, in decimal degrees.
        lat2, lon2: second position, in decimal degrees.

    Returns:
        Distance between the two positions in meters (float).
    """
    earth_radius_m = 6371000
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlam = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
    # Floating-point rounding can push `a` marginally outside [0, 1] for
    # (near-)antipodal points; sqrt(1 - a) would then raise ValueError.
    a = min(1.0, max(0.0, a))
    return earth_radius_m * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
class TideMerger:
    """Merges observed depth with model predictions.

    Holds a single calibration state: the ``chart_depth`` offset that is
    added to MSL-relative tide heights to obtain predicted depths, plus the
    position (lat/lon) at which that offset was last computed.
    """

    # Observations newer than this many seconds count as "recent" when
    # calibrating at a new (or first known) position.
    RECENT_WINDOW_SECONDS = 1800
    # Minimum number of recent observations required to calibrate from the
    # recent window alone.
    MIN_RECENT_OBS = 3
    # Minimum number of observations inside the prediction time range
    # required for an interpolated (non-rough) calibration.
    MIN_OVERLAP_OBS = 6

    def __init__(self):
        self._chart_depth = None
        self._calibrated = False
        self._calibration_samples = 0
        self._cal_lat = None
        self._cal_lon = None

    @property
    def is_calibrated(self):
        """True once a calibration has succeeded (and not been reset)."""
        return self._calibrated

    @property
    def chart_depth(self):
        """Current chart-depth offset, or None if never calibrated."""
        return self._chart_depth

    def needs_recalibration(self, lat, lon):
        """Check if chart_depth needs recalibrating due to a position change.

        Returns False when either the given position or the stored
        calibration position is unknown; otherwise True when the distance
        between them exceeds CHART_DEPTH_RECAL_METERS.
        """
        if lat is None or lon is None:
            return False
        if self._cal_lat is None or self._cal_lon is None:
            return False
        moved = _haversine(lat, lon, self._cal_lat, self._cal_lon)
        return moved > CHART_DEPTH_RECAL_METERS

    def _recent_only_reason(self, lat, lon):
        """Return why only recent observations should be used, or None."""
        if self._cal_lat is None or self._cal_lon is None:
            return "initial calibration (no prior position)"
        dist = _haversine(lat, lon, self._cal_lat, self._cal_lon)
        if dist > CHART_DEPTH_RECAL_METERS:
            return "position moved %.0fm from calibration point" % dist
        return None

    def _store_calibration(self, chart_depth, samples, lat, lon):
        """Record a successful calibration and the position it was made at."""
        self._chart_depth = chart_depth
        self._calibration_samples = samples
        self._calibrated = True
        self._cal_lat = lat
        self._cal_lon = lon

    def calibrate(self, observed_history, predicted_ts, predicted_heights,
                  lat=None, lon=None):
        """Calibrate chart_depth from overlapping observed and predicted data.

        When lat/lon are provided and there is no prior calibration
        position, or the vessel has moved more than
        CHART_DEPTH_RECAL_METERS from it, only recent observations (the
        last RECENT_WINDOW_SECONDS, 30 minutes by default) are used so that
        chart_depth reflects the new seabed depth immediately.

        Args:
            observed_history: list of (timestamp, depth) from DepthRecorder
            predicted_ts: list of prediction timestamps; the first and last
                entries are treated as the start and end of the prediction
                range
            predicted_heights: list of predicted tide heights (MSL-relative)
            lat, lon: current vessel position

        Returns:
            True if calibration succeeded.
        """
        if not observed_history or not predicted_ts or not predicted_heights:
            return False

        obs_to_use = observed_history
        if lat is not None and lon is not None:
            reason = self._recent_only_reason(lat, lon)
            if reason is not None:
                now = time.time()
                recent = [(ts, d) for ts, d in observed_history
                          if now - ts < self.RECENT_WINDOW_SECONDS]
                if len(recent) >= self.MIN_RECENT_OBS:
                    obs_to_use = recent
                    logger.info(
                        "%s, calibrating chart_depth from %d recent obs",
                        reason, len(recent))
                elif self._cal_lat is not None:
                    # Too little data at the new position: keep the old
                    # offset rather than calibrating from stale depths.
                    logger.info(
                        "%s but only %d recent obs, "
                        "keeping current chart_depth",
                        reason, len(recent))
                    return False
                # Otherwise (no prior calibration at all) fall through and
                # calibrate from the full history.

        pred_start = predicted_ts[0]
        pred_end = predicted_ts[-1]
        overlapping_obs = [
            (ts, d) for ts, d in obs_to_use
            if pred_start <= ts <= pred_end
        ]

        if len(overlapping_obs) < self.MIN_OVERLAP_OBS:
            # Not enough time overlap to interpolate: compare plain means.
            # obs_to_use is never empty here (guarded above).
            mean_obs = sum(d for _, d in obs_to_use) / len(obs_to_use)
            mean_pred = sum(predicted_heights) / len(predicted_heights)
            self._store_calibration(
                mean_obs - mean_pred, len(obs_to_use), lat, lon)
            logger.info(
                "Rough calibration: chart_depth=%.2f (from %d obs, %d pred)",
                self._chart_depth, len(obs_to_use),
                len(predicted_heights))
            return True

        # Pair every observation with the prediction interpolated at the
        # same instant so both means cover exactly the same samples.
        pairs = []
        for obs_ts, depth in overlapping_obs:
            height = self._interp(obs_ts, predicted_ts, predicted_heights)
            if height is not None:
                pairs.append((depth, height))
        if not pairs:
            return False

        mean_obs = sum(d for d, _ in pairs) / len(pairs)
        mean_pred = sum(h for _, h in pairs) / len(pairs)
        self._store_calibration(mean_obs - mean_pred, len(pairs), lat, lon)
        logger.info(
            "Calibrated: chart_depth=%.2f (from %d overlapping samples)",
            self._chart_depth, len(pairs))
        return True

    def reset_calibration(self):
        """Reset calibration when vessel moves to a new location."""
        self._chart_depth = None
        self._calibrated = False
        self._calibration_samples = 0
        self._cal_lat = None
        self._cal_lon = None
        logger.info("Calibration reset (location change)")

    def merge_predictions(self, predicted_ts, predicted_heights):
        """Convert MSL-relative predictions to depth-under-keel predictions.

        Args:
            predicted_ts: list of timestamps (not used in the computation)
            predicted_heights: list of MSL-relative heights

        Returns:
            list of chart_depth + height for each height, or None if not
            calibrated.
        """
        if not self._calibrated or self._chart_depth is None:
            return None
        offset = self._chart_depth
        return [offset + h for h in predicted_heights]

    @staticmethod
    def _pick_extrema(candidates, count):
        """Collect the first `count` highs and lows, in iteration order."""
        highs = []
        lows = []
        for ts, h, tide_type in candidates:
            if tide_type == "high" and len(highs) < count:
                highs.append((ts, h))
            elif tide_type == "low" and len(lows) < count:
                lows.append((ts, h))
            if len(highs) >= count and len(lows) >= count:
                break
        return {'high': highs, 'low': lows}

    def get_next_tides(self, extrema, count=2):
        """Get the next N high and low tides from predicted extrema.

        Entries are taken in the order given, so `extrema` is expected to
        be in ascending time order.

        Args:
            extrema: list of (timestamp, height, "high"|"low")
            count: how many of each type to return

        Returns:
            dict with keys:
                'high': [(ts, height), ...] -- next `count` highs
                'low': [(ts, height), ...] -- next `count` lows
            Heights are passed through unchanged from `extrema`.
        """
        now = time.time()
        future = [(ts, h, t) for ts, h, t in extrema if ts > now]
        return self._pick_extrema(future, count)

    def get_prev_tides(self, extrema, count=1):
        """Get the most recent N past high and low tides from predicted extrema.

        Entries are scanned in reverse of the order given, so `extrema` is
        expected to be in ascending time order.

        Args:
            extrema: list of (timestamp, height, "high"|"low")
            count: how many of each type to return

        Returns:
            dict with keys:
                'high': [(ts, height), ...] -- most recent `count` past highs
                'low': [(ts, height), ...] -- most recent `count` past lows
            Heights are passed through unchanged from `extrema`.
        """
        now = time.time()
        past = [(ts, h, t) for ts, h, t in extrema if ts <= now]
        return self._pick_extrema(reversed(past), count)

    @staticmethod
    def _recent_observed_points(observed_history, hours_back):
        """Return {ts, depth} points for observations newer than the cutoff."""
        cutoff = time.time() - hours_back * 3600
        return [
            {'ts': int(ts), 'depth': round(d, 2)}
            for ts, d in observed_history
            if ts > cutoff
        ]

    @staticmethod
    def _curve_points(timestamps, depths):
        """Zip timestamps and depths into {ts, depth} points ([] if missing)."""
        if not timestamps or not depths:
            return []
        return [
            {'ts': int(ts), 'depth': round(d, 2)}
            for ts, d in zip(timestamps, depths)
        ]

    def build_combined_curve(self, observed_history, predicted_ts,
                             predicted_depths, hours_back=24):
        """Build a combined depth curve: observed trailing + predicted future.

        Args:
            observed_history: list of (timestamp, depth)
            predicted_ts: list of predicted timestamps
            predicted_depths: list of predicted depths (calibrated)
            hours_back: how many hours of observed history to include

        Returns:
            dict with 'observed' and 'predicted' arrays of {ts, depth};
            ts is truncated to int and depth rounded to 2 decimals.
        """
        return {
            'observed': self._recent_observed_points(
                observed_history, hours_back),
            'predicted': self._curve_points(predicted_ts, predicted_depths),
        }

    def build_dual_curves(self, observed_history, station_ts, station_depths,
                          local_ts=None, local_depths=None, hours_back=24):
        """Build observed + station + local prediction curves.

        Args:
            observed_history: list of (timestamp, depth)
            station_ts: prediction timestamps (unadjusted model)
            station_depths: calibrated station depths
            local_ts: adapted prediction timestamps (or None)
            local_depths: adapted calibrated depths (or None)
            hours_back: hours of observed history to include

        Returns:
            dict with 'observed', 'station', and 'local' arrays of
            {ts, depth}; ts is truncated to int and depth rounded to
            2 decimals.
        """
        return {
            'observed': self._recent_observed_points(
                observed_history, hours_back),
            'station': self._curve_points(station_ts, station_depths),
            'local': self._curve_points(local_ts, local_depths),
        }

    @staticmethod
    def _interp(target_ts, timestamps, values):
        """Linear interpolation of a value at target_ts.

        `timestamps` must be in ascending order. Targets at or beyond
        either end are clamped to the first/last value. Returns None when
        `timestamps` is empty or no bracketing interval can be found.
        """
        # Function-scope import keeps this block self-contained.
        from bisect import bisect_left

        if not timestamps:
            return None
        if target_ts <= timestamps[0]:
            return values[0]
        if target_ts >= timestamps[-1]:
            return values[-1]

        # First index whose timestamp is >= target; the bracketing interval
        # is [hi - 1, hi]. Binary search replaces a per-call linear scan.
        hi = bisect_left(timestamps, target_ts)
        if hi <= 0 or hi >= len(timestamps):
            # Only reachable for non-comparable targets such as NaN.
            return None
        lo = hi - 1
        dt = timestamps[hi] - timestamps[lo]
        if dt == 0:
            return values[lo]
        frac = (target_ts - timestamps[lo]) / dt
        return values[lo] + frac * (values[hi] - values[lo])