Organizes 11 projects for Cerbo GX/Venus OS into a single repository: - axiom-nmea: Raymarine LightHouse protocol decoder - dbus-generator-ramp: Generator current ramp controller - dbus-lightning: Blitzortung lightning monitor - dbus-meteoblue-forecast: Meteoblue weather forecast - dbus-no-foreign-land: noforeignland.com tracking - dbus-tides: Tide prediction from depth + harmonics - dbus-vrm-history: VRM cloud history proxy - dbus-windy-station: Windy.com weather upload - mfd-custom-app: MFD app deployment package - venus-html5-app: Custom Victron HTML5 app fork - watermaker: Watermaker PLC control UI Adds root README, .gitignore, project template, and per-project .gitignore files. Sensitive config files excluded via .gitignore with .example templates provided. Made-with: Cursor
338 lines
12 KiB
Python
338 lines
12 KiB
Python
"""
|
|
Tide merger -- calibrates model predictions against observed depth.

The model predicts tide height relative to mean sea level. Observed
depth is measured from the transducer to the seabed. The merger
computes a "chart depth" offset so that:

    predicted_depth = chart_depth + predicted_tide_height

Where chart_depth is calibrated from:

    chart_depth = mean(observed_depth) - mean(predicted_tide_height)

This gives us predicted depths under the keel for the current location.
When the vessel moves more than CHART_DEPTH_RECAL_METERS, chart_depth
is recalibrated from recent observations at the new position.
|
|
"""
|
|
|
|
import logging
|
|
import math
|
|
import time
|
|
|
|
from config import CHART_DEPTH_RECAL_METERS
|
|
|
|
# Logger used by the code in this module; registered under the fixed
# name 'TideMerger' rather than the module's __name__.
logger = logging.getLogger('TideMerger')
|
|
|
|
|
|
def _haversine(lat1, lon1, lat2, lon2):
|
|
"""Great-circle distance in meters between two GPS coordinates."""
|
|
R = 6371000
|
|
phi1 = math.radians(lat1)
|
|
phi2 = math.radians(lat2)
|
|
dphi = math.radians(lat2 - lat1)
|
|
dlam = math.radians(lon2 - lon1)
|
|
a = (math.sin(dphi / 2) ** 2
|
|
+ math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
|
|
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
|
|
|
|
|
|
class TideMerger:
    """Merges observed depth with model predictions.

    Holds a single calibration state: the ``chart_depth`` offset that is
    added to MSL-relative tide heights to obtain predicted depths, plus
    the position at which that offset was computed.
    """

    # Observations younger than this many seconds count as "recent" when
    # calibrating after a position change.
    _RECENT_WINDOW_S = 1800
    # Minimum number of recent observations needed to calibrate from the
    # recent window alone.
    _MIN_RECENT_OBS = 3
    # Minimum number of observations inside the prediction time span
    # needed for an interpolated (non-rough) calibration.
    _MIN_OVERLAP_OBS = 6

    def __init__(self):
        self._chart_depth = None         # calibrated offset, None until calibrated
        self._calibrated = False
        self._calibration_samples = 0    # number of observations used last time
        self._cal_lat = None             # position passed to the last calibration
        self._cal_lon = None

    @property
    def is_calibrated(self):
        """True once a calibration has succeeded (and not been reset)."""
        return self._calibrated

    @property
    def chart_depth(self):
        """Current chart_depth offset, or None if not calibrated."""
        return self._chart_depth

    def needs_recalibration(self, lat, lon):
        """Check if chart_depth needs recalibrating due to a position change.

        Returns False when either the given position or the stored
        calibration position is unknown; otherwise True when the two are
        more than CHART_DEPTH_RECAL_METERS apart.
        """
        if lat is None or lon is None:
            return False
        if self._cal_lat is None or self._cal_lon is None:
            return False
        distance = _haversine(lat, lon, self._cal_lat, self._cal_lon)
        return distance > CHART_DEPTH_RECAL_METERS

    def calibrate(self, observed_history, predicted_ts, predicted_heights,
                  lat=None, lon=None):
        """Calibrate chart_depth from overlapping observed and predicted data.

        When lat/lon are provided and the vessel has moved more than
        CHART_DEPTH_RECAL_METERS from the last calibration position (or
        there is no stored calibration position), only recent observations
        (last 30 minutes) are used so that chart_depth reflects the new
        seabed depth immediately.  If fewer than 3 recent observations
        exist, an already-positioned calibration is kept unchanged, while
        a first calibration falls back to the full history.

        With at least 6 observations inside the prediction time span,
        chart_depth is the mean observed depth minus the mean of the
        predictions interpolated at the observation times.  Otherwise a
        rough calibration uses the mean of all selected observations minus
        the mean of all predicted heights.

        Args:
            observed_history: list of (timestamp, depth) from DepthRecorder
            predicted_ts: list of prediction timestamps
            predicted_heights: list of predicted tide heights (MSL-relative)
            lat, lon: current vessel position

        Returns:
            True if calibration succeeded.
        """
        if not observed_history or not predicted_ts or not predicted_heights:
            return False

        obs_to_use = observed_history

        if lat is not None and lon is not None:
            reason = self._recent_only_reason(lat, lon)
            if reason is not None:
                now = time.time()
                recent = [(ts, d) for ts, d in observed_history
                          if now - ts < self._RECENT_WINDOW_S]
                if len(recent) >= self._MIN_RECENT_OBS:
                    obs_to_use = recent
                    logger.info(
                        "%s, calibrating chart_depth from %d recent obs",
                        reason, len(recent))
                elif self._cal_lat is not None:
                    # Moved, but too little data at the new spot: keep the
                    # previous calibration rather than mixing locations.
                    logger.info(
                        "%s but only %d recent obs, "
                        "keeping current chart_depth",
                        reason, len(recent))
                    return False
                # Otherwise (no prior position) fall through and use the
                # full history.

        pred_start = predicted_ts[0]
        pred_end = predicted_ts[-1]

        overlapping_obs = [
            (ts, d) for ts, d in obs_to_use
            if pred_start <= ts <= pred_end
        ]

        if len(overlapping_obs) < self._MIN_OVERLAP_OBS:
            if not obs_to_use:
                return False
            # Too few time-aligned samples: compare plain means instead.
            mean_obs = sum(d for _, d in obs_to_use) / len(obs_to_use)
            mean_pred = sum(predicted_heights) / len(predicted_heights)
            self._store_calibration(mean_obs - mean_pred, len(obs_to_use),
                                    lat, lon)
            logger.info(
                "Rough calibration: chart_depth=%.2f (from %d obs, %d pred)",
                self._chart_depth, len(obs_to_use),
                len(predicted_heights))
            return True

        mean_obs = sum(d for _, d in overlapping_obs) / len(overlapping_obs)

        interp_pred = []
        for obs_ts, _ in overlapping_obs:
            h = self._interp(obs_ts, predicted_ts, predicted_heights)
            if h is not None:
                interp_pred.append(h)

        if not interp_pred:
            return False

        mean_pred = sum(interp_pred) / len(interp_pred)
        self._store_calibration(mean_obs - mean_pred, len(overlapping_obs),
                                lat, lon)

        logger.info(
            "Calibrated: chart_depth=%.2f (from %d overlapping samples)",
            self._chart_depth, len(overlapping_obs))
        return True

    def _recent_only_reason(self, lat, lon):
        """Return a log-ready reason to calibrate from recent data only, or None."""
        if self._cal_lat is None or self._cal_lon is None:
            return "initial calibration (no prior position)"
        dist = _haversine(lat, lon, self._cal_lat, self._cal_lon)
        if dist > CHART_DEPTH_RECAL_METERS:
            return "position moved %.0fm from calibration point" % dist
        return None

    def _store_calibration(self, chart_depth, samples, lat, lon):
        """Record a successful calibration; lat/lon are stored as given (may be None)."""
        self._chart_depth = chart_depth
        self._calibration_samples = samples
        self._calibrated = True
        self._cal_lat = lat
        self._cal_lon = lon

    def reset_calibration(self):
        """Reset calibration when vessel moves to a new location."""
        self._chart_depth = None
        self._calibrated = False
        self._calibration_samples = 0
        self._cal_lat = None
        self._cal_lon = None
        logger.info("Calibration reset (location change)")

    def merge_predictions(self, predicted_ts, predicted_heights):
        """Convert MSL-relative predictions to depth-under-keel predictions.

        Args:
            predicted_ts: list of timestamps (not used in the computation;
                kept for the existing call signature)
            predicted_heights: list of MSL-relative heights

        Returns:
            list of predicted depths under keel (chart_depth + height for
            each height), or None if not calibrated or if
            predicted_heights is None.
        """
        if not self._calibrated or self._chart_depth is None:
            return None
        if predicted_heights is None:
            # Sibling methods accept missing predictions; do the same here
            # instead of raising TypeError.
            return None
        return [self._chart_depth + h for h in predicted_heights]

    def get_next_tides(self, extrema, count=2):
        """Get the next N high and low tides from predicted extrema.

        The extrema are ordered by timestamp before selection, so the
        input does not have to be pre-sorted.

        Args:
            extrema: list of (timestamp, height, "high"|"low")
            count: how many of each type to return

        Returns:
            dict with keys:
                'high': [(ts, height), ...] -- next `count` highs
                'low': [(ts, height), ...] -- next `count` lows
            Heights are passed through from `extrema` unchanged.
        """
        now = time.time()
        future = [(ts, h, t) for ts, h, t in extrema if ts > now]
        # Stable sort on the timestamp only: already-sorted input keeps
        # exactly its original order.
        future.sort(key=lambda item: item[0])
        return self._take_extrema(future, count)

    def get_prev_tides(self, extrema, count=1):
        """Get the most recent N past high and low tides from predicted extrema.

        The extrema are ordered by timestamp before selection, so the
        input does not have to be pre-sorted.

        Args:
            extrema: list of (timestamp, height, "high"|"low")
            count: how many of each type to return

        Returns:
            dict with keys:
                'high': [(ts, height), ...] -- most recent `count` past highs
                'low': [(ts, height), ...] -- most recent `count` past lows
            Entries are listed newest first; heights are passed through
            from `extrema` unchanged.
        """
        now = time.time()
        past = [(ts, h, t) for ts, h, t in extrema if ts <= now]
        past.sort(key=lambda item: item[0])
        return self._take_extrema(reversed(past), count)

    @staticmethod
    def _take_extrema(ordered, count):
        """Collect up to `count` highs and `count` lows, in iteration order."""
        highs = []
        lows = []
        for ts, h, tide_type in ordered:
            if tide_type == "high" and len(highs) < count:
                highs.append((ts, h))
            elif tide_type == "low" and len(lows) < count:
                lows.append((ts, h))
            if len(highs) >= count and len(lows) >= count:
                break
        return {'high': highs, 'low': lows}

    def build_combined_curve(self, observed_history, predicted_ts,
                             predicted_depths, hours_back=24):
        """Build a combined depth curve: observed trailing + predicted future.

        Args:
            observed_history: list of (timestamp, depth), or None
            predicted_ts: list of predicted timestamps
            predicted_depths: list of predicted depths (calibrated)
            hours_back: how many hours of observed history to include

        Returns:
            dict with 'observed' and 'predicted' arrays of {ts, depth}.
            Timestamps are truncated to int and depths rounded to 2
            decimals; 'predicted' is empty when either prediction list is
            missing or empty.
        """
        return {
            'observed': self._observed_points(observed_history, hours_back),
            'predicted': self._curve_points(predicted_ts, predicted_depths),
        }

    def build_dual_curves(self, observed_history, station_ts, station_depths,
                          local_ts=None, local_depths=None, hours_back=24):
        """Build observed + station + local prediction curves.

        Args:
            observed_history: list of (timestamp, depth), or None
            station_ts: prediction timestamps (unadjusted model)
            station_depths: calibrated station depths
            local_ts: adapted prediction timestamps (or None)
            local_depths: adapted calibrated depths (or None)
            hours_back: hours of observed history to include

        Returns:
            dict with 'observed', 'station', and 'local' arrays of
            {ts, depth}.  Timestamps are truncated to int and depths
            rounded to 2 decimals; a curve is empty when either of its
            input lists is missing or empty.
        """
        return {
            'observed': self._observed_points(observed_history, hours_back),
            'station': self._curve_points(station_ts, station_depths),
            'local': self._curve_points(local_ts, local_depths),
        }

    @staticmethod
    def _observed_points(observed_history, hours_back):
        """Convert observations newer than `hours_back` hours into {ts, depth} dicts."""
        cutoff = time.time() - hours_back * 3600
        return [
            {'ts': int(ts), 'depth': round(d, 2)}
            for ts, d in (observed_history or ())
            if ts > cutoff
        ]

    @staticmethod
    def _curve_points(timestamps, depths):
        """Pair timestamps with depths as {ts, depth} dicts; [] if either is missing."""
        if not timestamps or not depths:
            return []
        return [
            {'ts': int(ts), 'depth': round(d, 2)}
            for ts, d in zip(timestamps, depths)
        ]

    @staticmethod
    def _interp(target_ts, timestamps, values):
        """Linear interpolation of a value at target_ts.

        Targets at or before the first timestamp return the first value;
        targets at or after the last timestamp return the last value
        (no extrapolation).

        Returns:
            The interpolated value, or None when `timestamps` is empty or
            no bracketing interval is found.
        """
        if not timestamps:
            return None
        if target_ts <= timestamps[0]:
            return values[0]
        if target_ts >= timestamps[-1]:
            return values[-1]

        for i in range(len(timestamps) - 1):
            if timestamps[i] <= target_ts <= timestamps[i + 1]:
                dt = timestamps[i + 1] - timestamps[i]
                if dt == 0:
                    # Duplicate timestamps: avoid dividing by zero.
                    return values[i]
                frac = (target_ts - timestamps[i]) / dt
                return values[i] + frac * (values[i + 1] - values[i])

        return None
|