Organizes 11 projects for Cerbo GX/Venus OS into a single repository: - axiom-nmea: Raymarine LightHouse protocol decoder - dbus-generator-ramp: Generator current ramp controller - dbus-lightning: Blitzortung lightning monitor - dbus-meteoblue-forecast: Meteoblue weather forecast - dbus-no-foreign-land: noforeignland.com tracking - dbus-tides: Tide prediction from depth + harmonics - dbus-vrm-history: VRM cloud history proxy - dbus-windy-station: Windy.com weather upload - mfd-custom-app: MFD app deployment package - venus-html5-app: Custom Victron HTML5 app fork - watermaker: Watermaker PLC control UI Adds root README, .gitignore, project template, and per-project .gitignore files. Sensitive config files excluded via .gitignore with .example templates provided. Made-with: Cursor
825 lines
30 KiB
Python
825 lines
30 KiB
Python
"""
|
|
Tide prediction orchestrator.
|
|
|
|
Loads tidal constituent data from the bundled coastal grid (or fallback
|
|
sources), interpolates for the vessel position, and generates tide
|
|
predictions using the pure-Python harmonic engine.
|
|
"""
|
|
|
|
import gzip
|
|
import json
|
|
import logging
|
|
import math
|
|
import os
|
|
import time
|
|
|
|
from config import (
|
|
BESTFIT_MIN_HISTORY_HOURS,
|
|
CONSTITUENTS_DIR,
|
|
DATA_DIR,
|
|
GRID_FILE_NAME,
|
|
NOAA_NEARBY_COUNT,
|
|
NOAA_STATION_MAX_DISTANCE,
|
|
NOAA_STATIONS_FILE_NAME,
|
|
PREDICTION_HOURS,
|
|
PREDICTION_INTERVAL,
|
|
)
|
|
import tide_harmonics
|
|
|
|
# Harmonic predictions are smooth mathematical curves with no noise,
# so extrema filtering only needs to suppress shallow-water harmonic
# artifacts (typically < 1 cm). The observed-data threshold
# (MIN_TIDE_AMPLITUDE) is far too aggressive here and drops
# legitimate small-range tidal cycles caused by diurnal inequality.
_PREDICTION_EXTREMA_MIN = 0.02  # meters

logger = logging.getLogger('TidePredictor')
|
|
|
|
|
|
def _find_subordinate_ref_extrema(timestamps, heights, min_amplitude=0.4):
|
|
"""Single-pass extrema finder for subordinate reference curves.
|
|
|
|
The subordinate height-correction factor (h_high / h_low) is
|
|
interpolated between reference-curve extrema. The multi-pass
|
|
pair-removal in find_extrema() selects different surviving extrema,
|
|
shifting factor boundaries and distorting the subordinate curve.
|
|
This preserves the original sequential logic so the curve shape
|
|
remains stable.
|
|
"""
|
|
if len(timestamps) < 3:
|
|
return []
|
|
extrema = []
|
|
last_h = None
|
|
for i in range(1, len(heights) - 1):
|
|
hp = heights[i - 1]
|
|
hc = heights[i]
|
|
hn = heights[i + 1]
|
|
if hc > hp and hc > hn:
|
|
etype = "high"
|
|
elif hc < hp and hc < hn:
|
|
etype = "low"
|
|
else:
|
|
continue
|
|
if last_h is not None and abs(hc - last_h) < min_amplitude:
|
|
continue
|
|
extrema.append((timestamps[i], hc, etype))
|
|
last_h = hc
|
|
return extrema
|
|
|
|
|
|
class TidePredictor:
    """Manages constituent data and produces tide predictions."""

    def __init__(self):
        # Coastal grid (lazy-loaded; _grid_loaded marks the attempt so a
        # failed load is not retried).
        self._grid = None
        self._grid_loaded = False
        # NOAA station database (same lazy-load pattern).
        self._noaa_stations = None
        self._noaa_loaded = False
        # Active constituent set and its provenance.
        self._current_constituents = None
        self._current_source = None
        self._current_station = None
        # Subordinate-station offsets (None for harmonic stations).
        self._current_offsets = None
        # (distance_m, station_dict) pairs near the vessel.
        self._nearby_stations = []
        # Manually forced station id, or None.
        self._station_override = None
        # Last known vessel position.
        self._current_lat = None
        self._current_lon = None
        # Cache of the most recent predict() result.
        self._last_prediction = None
        self._last_extrema = None
        self._last_prediction_time = 0
|
|
|
|
# ------------------------------------------------------------------
|
|
# NOAA station loading
|
|
# ------------------------------------------------------------------
|
|
|
|
def load_noaa_stations(self):
|
|
"""Load the pre-built NOAA station database from disk."""
|
|
if self._noaa_loaded:
|
|
return self._noaa_stations is not None
|
|
|
|
self._noaa_loaded = True
|
|
path = self._find_noaa_file()
|
|
if not path:
|
|
logger.info("No NOAA station file found")
|
|
return False
|
|
|
|
try:
|
|
if path.endswith('.gz'):
|
|
with gzip.open(path, 'rt', encoding='utf-8') as f:
|
|
self._noaa_stations = json.load(f)
|
|
else:
|
|
with open(path, 'r') as f:
|
|
self._noaa_stations = json.load(f)
|
|
|
|
n = len(self._noaa_stations.get('stations', []))
|
|
logger.info("Loaded NOAA station database: %d stations", n)
|
|
return True
|
|
except (OSError, json.JSONDecodeError) as e:
|
|
logger.error("Failed to load NOAA stations: %s", e)
|
|
self._noaa_stations = None
|
|
return False
|
|
|
|
def _find_noaa_file(self):
|
|
"""Search for the NOAA station file in known locations."""
|
|
candidates = [
|
|
os.path.join(DATA_DIR, 'constituents', NOAA_STATIONS_FILE_NAME),
|
|
os.path.join(CONSTITUENTS_DIR, NOAA_STATIONS_FILE_NAME),
|
|
os.path.join(os.path.dirname(__file__), 'constituents',
|
|
NOAA_STATIONS_FILE_NAME),
|
|
os.path.join(os.path.dirname(__file__), 'constituents',
|
|
NOAA_STATIONS_FILE_NAME.replace('.gz', '')),
|
|
]
|
|
for path in candidates:
|
|
if os.path.exists(path):
|
|
logger.info("Found NOAA station file: %s", path)
|
|
return path
|
|
return None
|
|
|
|
    def _find_nearby_noaa_stations(self, lat, lon):
        """Find the N nearest NOAA stations within range.

        Args:
            lat, lon: vessel position in decimal degrees.

        Populates self._nearby_stations with (distance, station) pairs
        for all stations within NOAA_STATION_MAX_DISTANCE, sorted by
        distance (nearest first), up to NOAA_NEARBY_COUNT. Distances
        are in meters; log output divides by 1852 for nautical miles.
        """
        self._nearby_stations = []
        if not self.load_noaa_stations() or self._noaa_stations is None:
            return

        stations = self._noaa_stations.get('stations', [])
        if not stations:
            return

        dists = []
        for st in stations:
            d = _haversine(lat, lon, st['lat'], st['lon'])
            if d <= NOAA_STATION_MAX_DISTANCE:
                dists.append((d, st))
        # Key on distance only -- the station dicts are not comparable,
        # so a bare tuple sort could raise on equal distances.
        dists.sort(key=lambda x: x[0])
        self._nearby_stations = dists[:NOAA_NEARBY_COUNT]

        if self._nearby_stations:
            logger.info(
                "Found %d NOAA stations within %.0f nm",
                len(self._nearby_stations),
                NOAA_STATION_MAX_DISTANCE / 1852)
            for d, st in self._nearby_stations:
                # Type 'S' marks a subordinate station (time/height
                # offsets relative to a reference station).
                stype = ' (sub)' if st.get('type') == 'S' else ''
                logger.info(
                    "  %s %s (%.1f nm)%s",
                    st.get('id', '?'), st.get('name', '?'),
                    d / 1852, stype)
        else:
            logger.info(
                "No NOAA stations within %.0f nm of %.2f, %.2f",
                NOAA_STATION_MAX_DISTANCE / 1852, lat, lon)
|
|
|
|
    def _select_noaa_station(self, lat, lon, depth_history=None):
        """Select the best NOAA station for the current position.

        Selection priority:
          1. Manual override (if station_override is set and valid)
          2. Best-fit scoring against observed depth history
          3. Nearest station by distance

        Side effects on success: sets _current_offsets, _current_station
        and _current_source.

        Returns a constituent list, or None if no station in range.
        """
        self._find_nearby_noaa_stations(lat, lon)
        if not self._nearby_stations:
            return None

        chosen_dist, chosen_station = None, None

        # Priority 1: manual override. Prefer a match in the nearby list
        # (distance already computed); otherwise scan the full database
        # so an override beyond NOAA_STATION_MAX_DISTANCE still works.
        if self._station_override:
            for d, st in self._nearby_stations:
                if st.get('id') == self._station_override:
                    chosen_dist, chosen_station = d, st
                    logger.info(
                        "Using overridden station %s (%s)",
                        st.get('id', '?'), st.get('name', '?'))
                    break
            if chosen_station is None:
                all_stations = self._noaa_stations.get('stations', [])
                for st in all_stations:
                    if st.get('id') == self._station_override:
                        chosen_dist = _haversine(
                            lat, lon, st['lat'], st['lon'])
                        chosen_station = st
                        logger.info(
                            "Using overridden station %s (%s), %.1f km away",
                            st.get('id', '?'), st.get('name', '?'),
                            chosen_dist / 1000)
                        break

        # Priority 2: best correlation with the observed depth curve.
        if chosen_station is None and depth_history:
            scored = self._rank_stations_by_fit(
                self._nearby_stations, depth_history)
            if scored:
                chosen_dist, chosen_station = scored[0]

        # Priority 3: nearest station (_nearby_stations is distance-sorted).
        if chosen_station is None:
            chosen_dist, chosen_station = self._nearby_stations[0]

        # Subordinate stations carry only offsets; borrow the harmonic
        # constituents from their reference station.
        is_subordinate = chosen_station.get('type') == 'S'
        if is_subordinate:
            constituents, offsets = self._resolve_subordinate(chosen_station)
        else:
            constituents = chosen_station.get('constituents', [])
            offsets = None

        if not constituents:
            return None

        self._current_offsets = offsets
        self._current_station = {
            'id': chosen_station.get('id', ''),
            'name': chosen_station.get('name', ''),
            'lat': chosen_station.get('lat', 0.0),
            'lon': chosen_station.get('lon', 0.0),
            'distance': chosen_dist,
        }
        if is_subordinate:
            self._current_station['type'] = 'S'
            self._current_station['ref_id'] = chosen_station.get('ref_id', '')
        self._current_source = 'noaa:%s' % chosen_station.get('id', '?')

        logger.info(
            "Selected NOAA station %s (%s), %.1f km, %d constituents%s",
            chosen_station.get('id', '?'),
            chosen_station.get('name', '?'),
            chosen_dist / 1000,
            len(constituents),
            ' (subordinate)' if is_subordinate else '')
        return constituents
|
|
|
|
    def _resolve_subordinate(self, sub_station):
        """Resolve a subordinate station to its reference station's
        constituents and the subordinate offsets.

        Returns (constituents, offsets) or (None, None).
        """
        ref_id = sub_station.get('ref_id', '')
        if not ref_id or not self._noaa_stations:
            return None, None

        # Linear scan of the database; this runs only on (re)selection.
        # Require non-empty constituents so a subordinate never chains
        # to another offsets-only entry.
        ref_station = None
        for st in self._noaa_stations.get('stations', []):
            if st.get('id') == ref_id and st.get('constituents'):
                ref_station = st
                break

        if not ref_station:
            logger.warning(
                "Reference station %s not found for subordinate %s",
                ref_id, sub_station.get('id', '?'))
            return None, None

        offsets = sub_station.get('offsets', {})
        # time_* offsets are minutes; height_* are ratios when
        # height_type == 'R', otherwise additive (see _predict_subordinate).
        logger.info(
            "Resolved subordinate %s -> reference %s (%s), "
            "time_high=%+dmin, time_low=%+dmin, "
            "height_high=%.2f, height_low=%.2f (%s)",
            sub_station.get('id', '?'),
            ref_id, ref_station.get('name', '?'),
            offsets.get('time_high', 0), offsets.get('time_low', 0),
            offsets.get('height_high', 1.0), offsets.get('height_low', 1.0),
            offsets.get('height_type', 'R'))
        return ref_station['constituents'], offsets
|
|
|
|
    def _rank_stations_by_fit(self, candidates, depth_history):
        """Score candidate stations against observed depth data.

        Runs a hindcast for each candidate over the observed time window,
        then picks the one whose predicted tidal shape best correlates
        with the detrended depth observations.

        Args:
            candidates: list of (distance_m, station_dict) pairs.
            depth_history: list of (unix_ts, depth_m) samples.

        Returns candidates re-sorted by fit quality (best first),
        or None if insufficient history.
        """
        # Need enough samples and enough elapsed hours to capture a
        # meaningful portion of a tidal cycle.
        if not depth_history or len(depth_history) < 6:
            return None

        span_hours = (depth_history[-1][0] - depth_history[0][0]) / 3600.0
        if span_hours < BESTFIT_MIN_HISTORY_HOURS:
            return None

        obs_ts = [ts for ts, _ in depth_history]
        obs_depths = [d for _, d in depth_history]

        # Center the observations so correlation sees only the tidal
        # variation, not the absolute depth.
        obs_mean = sum(obs_depths) / len(obs_depths)
        obs_centered = [d - obs_mean for d in obs_depths]

        scored = []
        for dist, st in candidates:
            if st.get('type') == 'S':
                constituents, offsets = self._resolve_subordinate(st)
            else:
                constituents = st.get('constituents', [])
                offsets = None
            if not constituents:
                continue

            if offsets:
                # Approximate subordinate correction for scoring: shift
                # time by the mean of the high/low offsets (minutes ->
                # seconds) and apply the mean height factor uniformly.
                time_shift = (offsets.get('time_high', 0) +
                              offsets.get('time_low', 0)) / 2.0 * 60.0
                shifted_ts = [t - time_shift for t in obs_ts]
                raw_heights = tide_harmonics.predict(
                    constituents, shifted_ts)
                if not raw_heights or len(raw_heights) != len(obs_ts):
                    continue
                h_high = offsets.get('height_high', 1.0)
                h_low = offsets.get('height_low', 1.0)
                h_type = offsets.get('height_type', 'R')
                avg_factor = (h_high + h_low) / 2.0
                if h_type == 'R':
                    # Ratio-type correction multiplies heights...
                    pred_heights = [h * avg_factor for h in raw_heights]
                else:
                    # ...offset-type correction adds to them.
                    pred_heights = [h + avg_factor for h in raw_heights]
            else:
                pred_heights = tide_harmonics.predict(constituents, obs_ts)
                if not pred_heights or len(pred_heights) != len(obs_ts):
                    continue

            pred_mean = sum(pred_heights) / len(pred_heights)
            pred_centered = [h - pred_mean for h in pred_heights]

            corr = _correlation(obs_centered, pred_centered)
            scored.append((dist, st, corr))

        if not scored:
            return None

        # Highest correlation first.
        scored.sort(key=lambda x: -x[2])

        for dist, st, corr in scored:
            stype = ' (sub)' if st.get('type') == 'S' else ''
            logger.info(
                "  Fit score: %s %s = %.3f (%.1f nm)%s",
                st.get('id', '?'), st.get('name', '?'),
                corr, dist / 1852, stype)

        best_corr = scored[0][2]
        # A weak best correlation means the observations cannot
        # discriminate between stations; let the caller fall back to
        # nearest-by-distance.
        if best_corr < 0.3:
            logger.info("Best fit correlation %.3f too low, using nearest",
                        best_corr)
            return None

        return [(d, st) for d, st, _ in scored]
|
|
|
|
    @property
    def station_override(self):
        """The manually forced NOAA station id, or None."""
        return self._station_override

    @station_override.setter
    def station_override(self, station_id):
        """Set a manual station override. Empty string or None to clear."""
        if station_id:
            # Normalize to a stripped string -- station ids are compared
            # as strings during selection.
            self._station_override = str(station_id).strip()
            logger.info("Station override set: %s", self._station_override)
        else:
            self._station_override = None
            logger.info("Station override cleared")
|
|
|
|
# ------------------------------------------------------------------
|
|
# Grid loading
|
|
# ------------------------------------------------------------------
|
|
|
|
def load_grid(self):
|
|
"""Load the pre-computed coastal grid from disk."""
|
|
if self._grid_loaded:
|
|
return self._grid is not None
|
|
|
|
self._grid_loaded = True
|
|
grid_path = self._find_grid_file()
|
|
if not grid_path:
|
|
logger.warning("No coastal grid file found")
|
|
return False
|
|
|
|
try:
|
|
if grid_path.endswith('.gz'):
|
|
with gzip.open(grid_path, 'rt', encoding='utf-8') as f:
|
|
self._grid = json.load(f)
|
|
else:
|
|
with open(grid_path, 'r') as f:
|
|
self._grid = json.load(f)
|
|
|
|
n_points = len(self._grid.get('points', []))
|
|
regions = self._grid.get('regions', [])
|
|
logger.info(
|
|
"Loaded coastal grid: %d points, regions: %s",
|
|
n_points, regions)
|
|
return True
|
|
except (OSError, json.JSONDecodeError) as e:
|
|
logger.error("Failed to load grid: %s", e)
|
|
self._grid = None
|
|
return False
|
|
|
|
def _find_grid_file(self):
|
|
"""Search for the grid file in known locations."""
|
|
candidates = [
|
|
os.path.join(DATA_DIR, 'constituents', GRID_FILE_NAME),
|
|
os.path.join(CONSTITUENTS_DIR, GRID_FILE_NAME),
|
|
os.path.join(os.path.dirname(__file__), 'constituents', GRID_FILE_NAME),
|
|
os.path.join(os.path.dirname(__file__), 'constituents',
|
|
GRID_FILE_NAME.replace('.gz', '')),
|
|
]
|
|
for path in candidates:
|
|
if os.path.exists(path):
|
|
logger.info("Found grid file: %s", path)
|
|
return path
|
|
return None
|
|
|
|
# ------------------------------------------------------------------
|
|
# Location update
|
|
# ------------------------------------------------------------------
|
|
|
|
    def update_location(self, lat, lon, depth_history=None):
        """Update constituents for a new vessel position.

        Tries data sources in priority order:
          1. NOAA station (observation-derived, best near coast)
          2. Coastal grid (global model, interpolated)
          3. Custom JSON (manual fallback)

        When depth_history is provided and long enough, candidate NOAA
        stations are scored against the observations to pick the best fit.

        Returns True if new constituents were loaded.
        """
        self._current_lat = lat
        self._current_lon = lon
        # Reset provenance; _select_noaa_station repopulates station and
        # offset metadata on success.
        self._current_source = None
        self._current_station = None
        self._current_offsets = None

        constituents = self._select_noaa_station(
            lat, lon, depth_history=depth_history)
        if constituents is None:
            constituents = self._interpolate_from_grid(lat, lon)
            if constituents is not None:
                self._current_source = 'grid'
        if constituents is None:
            constituents = self._load_custom_json()
            if constituents is not None:
                self._current_source = 'custom'
        if constituents is None:
            # NOTE(review): on total failure the previously loaded
            # constituents are intentionally kept (stale predictions
            # continue) while the source metadata above is cleared --
            # confirm callers expect this asymmetry.
            logger.warning("No constituent data for %.2f, %.2f", lat, lon)
            return False

        self._current_constituents = constituents
        logger.info(
            "Loaded %d constituents for %.2f, %.2f (source: %s)",
            len(constituents), lat, lon, self._current_source)
        return True
|
|
|
|
    @property
    def current_source(self):
        """Return the name of the data source for current constituents.

        One of 'noaa:<station id>', 'grid', 'custom', or None when no
        source has been selected.
        """
        return self._current_source
|
|
|
|
    @property
    def current_station(self):
        """Return metadata dict for the selected NOAA station, or None.

        Keys: id, name, lat, lon, distance (meters); subordinate
        stations additionally carry type='S' and ref_id.
        """
        return self._current_station
|
|
|
|
@property
|
|
def nearby_stations(self):
|
|
"""Return list of nearby station dicts with distance info."""
|
|
result = []
|
|
for dist, st in self._nearby_stations:
|
|
entry = {
|
|
'id': st.get('id', ''),
|
|
'name': st.get('name', ''),
|
|
'lat': st.get('lat', 0.0),
|
|
'lon': st.get('lon', 0.0),
|
|
'distance': round(dist / 1000, 1),
|
|
}
|
|
if st.get('type') == 'S':
|
|
entry['type'] = 'S'
|
|
entry['ref_id'] = st.get('ref_id', '')
|
|
result.append(entry)
|
|
return result
|
|
|
|
# ------------------------------------------------------------------
|
|
# Prediction
|
|
# ------------------------------------------------------------------
|
|
|
|
def predict(self, from_time=None, duration_hours=None):
|
|
"""Generate a tide prediction curve.
|
|
|
|
Args:
|
|
from_time: Unix timestamp for the start of the curve
|
|
(default: now).
|
|
duration_hours: length of the curve in hours
|
|
(default: PREDICTION_HOURS).
|
|
|
|
Returns:
|
|
(timestamps, heights, extrema) or (None, None, None).
|
|
"""
|
|
if not self._current_constituents:
|
|
return None, None, None
|
|
|
|
if from_time is None:
|
|
from_time = time.time()
|
|
if duration_hours is None:
|
|
duration_hours = PREDICTION_HOURS
|
|
|
|
n_steps = duration_hours * 3600 // PREDICTION_INTERVAL
|
|
timestamps = [
|
|
from_time + i * PREDICTION_INTERVAL
|
|
for i in range(n_steps + 1)
|
|
]
|
|
|
|
if self._current_offsets:
|
|
heights = self._predict_subordinate(timestamps)
|
|
else:
|
|
heights = tide_harmonics.predict(
|
|
self._current_constituents, timestamps)
|
|
|
|
if not heights:
|
|
return None, None, None
|
|
|
|
extrema = tide_harmonics.find_extrema(
|
|
timestamps, heights, min_amplitude=_PREDICTION_EXTREMA_MIN)
|
|
|
|
self._last_prediction = (timestamps, heights)
|
|
self._last_extrema = extrema
|
|
self._last_prediction_time = time.time()
|
|
|
|
logger.info(
|
|
"Prediction: %d points, %d extrema over %dh",
|
|
len(timestamps), len(extrema), duration_hours)
|
|
|
|
return timestamps, heights, extrema
|
|
|
|
    def _predict_subordinate(self, timestamps):
        """Predict tide heights for a subordinate station.

        Strategy:
          1. Time-shift the entire reference curve by the average of
             the high/low time offsets (handles bulk timing difference).
          2. Find extrema in the shifted reference curve.
          3. Linearly interpolate the height correction factor between
             successive extrema (h_high at highs, h_low at lows).
          4. Apply the interpolated factor to produce the final curve.

        This yields a smooth continuous curve that matches NOAA's
        subordinate methodology at the extrema and transitions
        smoothly between them.
        """
        offsets = self._current_offsets
        # Offsets are stored in minutes; convert to seconds.
        time_high_s = offsets.get('time_high', 0) * 60.0
        time_low_s = offsets.get('time_low', 0) * 60.0
        h_high = offsets.get('height_high', 1.0)
        h_low = offsets.get('height_low', 1.0)
        h_type = offsets.get('height_type', 'R')  # 'R' = ratio, else additive

        avg_time_shift = (time_high_s + time_low_s) / 2.0

        # Evaluate the reference harmonics at shifted times so the local
        # curve is delayed/advanced relative to the reference station.
        shifted_ts = [t - avg_time_shift for t in timestamps]
        ref_heights = tide_harmonics.predict(
            self._current_constituents, shifted_ts)
        if not ref_heights:
            return []

        ref_extrema = _find_subordinate_ref_extrema(
            shifted_ts, ref_heights)

        # Too few extrema to interpolate between: fall back to a
        # constant average correction over the whole curve.
        if len(ref_extrema) < 2:
            avg_factor = (h_high + h_low) / 2.0
            if h_type == 'R':
                return [h * avg_factor for h in ref_heights]
            return [h + avg_factor for h in ref_heights]

        # Map extremum times back to local (unshifted) time and pin the
        # appropriate correction factor at each one.
        ext_times = [e[0] + avg_time_shift for e in ref_extrema]
        ext_factors = [h_high if e[2] == 'high' else h_low
                       for e in ref_extrema]

        result = []
        for i, t in enumerate(timestamps):
            # Clamp to the end factors outside the extrema range;
            # linearly interpolate inside it.
            if t <= ext_times[0]:
                factor = ext_factors[0]
            elif t >= ext_times[-1]:
                factor = ext_factors[-1]
            else:
                j = 0
                for k in range(len(ext_times) - 1):
                    if ext_times[k] <= t < ext_times[k + 1]:
                        j = k
                        break
                span = ext_times[j + 1] - ext_times[j]
                # max(span, 1.0) guards against division by zero for
                # coincident extremum times.
                frac = (t - ext_times[j]) / max(span, 1.0)
                factor = (ext_factors[j] +
                          frac * (ext_factors[j + 1] - ext_factors[j]))

            if h_type == 'R':
                result.append(ref_heights[i] * factor)
            else:
                result.append(ref_heights[i] + factor)

        return result
|
|
|
|
def predict_at(self, timestamps):
|
|
"""Evaluate the harmonic model at arbitrary timestamps.
|
|
|
|
Returns a list of MSL-relative heights, or None if no constituents.
|
|
"""
|
|
if not self._current_constituents or not timestamps:
|
|
return None
|
|
return tide_harmonics.predict(self._current_constituents, timestamps)
|
|
|
|
def get_last_prediction(self):
|
|
"""Return the most recent prediction result."""
|
|
if self._last_prediction:
|
|
return (*self._last_prediction, self._last_extrema)
|
|
return None, None, None
|
|
|
|
def has_constituents(self):
|
|
return self._current_constituents is not None
|
|
|
|
    @property
    def last_prediction_time(self):
        """Unix timestamp of the last successful predict() call (0 if never)."""
        return self._last_prediction_time
|
|
|
|
def estimate_datum_offset(self):
|
|
"""Estimate MSL-above-MLLW datum offset from constituent amplitudes.
|
|
|
|
Runs a 30-day prediction, finds all lows, groups into daily
|
|
pairs, averages the lower of each pair (the MLLW definition).
|
|
Returns the offset in meters (MSL - MLLW, always >= 0).
|
|
"""
|
|
if not self._current_constituents:
|
|
return 0.0
|
|
|
|
now = time.time()
|
|
step = 600
|
|
n_steps = 30 * 86400 // step
|
|
timestamps = [now + i * step for i in range(n_steps + 1)]
|
|
|
|
heights = tide_harmonics.predict(
|
|
self._current_constituents, timestamps)
|
|
if not heights:
|
|
return 0.0
|
|
|
|
extrema = tide_harmonics.find_extrema(
|
|
timestamps, heights, min_amplitude=_PREDICTION_EXTREMA_MIN)
|
|
|
|
lows = [h for _, h, t in extrema if t == "low"]
|
|
if len(lows) < 2:
|
|
return abs(min(lows)) if lows else 0.0
|
|
|
|
lower_lows = []
|
|
for i in range(0, len(lows) - 1, 2):
|
|
lower_lows.append(min(lows[i], lows[i + 1]))
|
|
if len(lows) % 2 == 1:
|
|
lower_lows.append(lows[-1])
|
|
|
|
mllw = sum(lower_lows) / len(lower_lows)
|
|
offset = -mllw
|
|
logger.info(
|
|
"Estimated datum offset (MSL above MLLW): %.3fm", offset)
|
|
return max(offset, 0.0)
|
|
|
|
# ------------------------------------------------------------------
|
|
# Constituent interpolation
|
|
# ------------------------------------------------------------------
|
|
|
|
    def _interpolate_from_grid(self, lat, lon):
        """Bilinear interpolation of constituents from the coastal grid.

        Returns a constituent list, or None when the grid is missing,
        empty, or the position is too far from any grid point.
        """
        if not self.load_grid() or self._grid is None:
            return None

        points = self._grid.get('points', [])
        const_names = self._grid.get('constituents', [])
        if not points or not const_names:
            return None

        nearest = self._find_nearest_points(points, lat, lon, count=4)
        if not nearest:
            return None

        closest_dist = nearest[0][0]
        closest_pt = nearest[0][1]

        # Too far from any grid point (> 100 km): refuse rather than
        # extrapolate.
        if closest_dist > 100000:
            logger.warning(
                "Nearest grid point is %.0fkm away", closest_dist / 1000)
            return None

        # Very close to a single point (< 1 km) -- no interpolation needed
        if closest_dist < 1000 or len(nearest) < 4:
            return self._point_to_constituents(closest_pt, const_names)

        return self._bilinear_interpolate(nearest, const_names)
|
|
|
|
def _find_nearest_points(self, points, lat, lon, count=4):
|
|
"""Find the N nearest ocean grid points by haversine distance."""
|
|
dists = []
|
|
for pt in points:
|
|
d = _haversine(lat, lon, pt['lat'], pt['lon'])
|
|
dists.append((d, pt))
|
|
dists.sort(key=lambda x: x[0])
|
|
return dists[:count]
|
|
|
|
    def _bilinear_interpolate(self, nearest, const_names):
        """Inverse-distance-weighted interpolation of constituent data.

        Args:
            nearest: list of (distance_m, grid_point) pairs.
            const_names: constituent names, index-aligned with each
                point's 'amp'/'phase' arrays.

        Phase interpolation uses circular averaging (cos/sin components)
        to avoid wraparound artifacts at 0/360 degrees.
        """
        total_weight = 0.0
        n = len(const_names)
        weighted_amp = [0.0] * n
        weighted_phase_cos = [0.0] * n
        weighted_phase_sin = [0.0] * n

        for dist, pt in nearest:
            # Inverse-distance weight, clamped at 1 m so a coincident
            # point cannot divide by zero.
            w = 1.0 / max(dist, 1.0)
            total_weight += w

            amps = pt.get('amp', [])
            phases = pt.get('phase', [])

            # min() tolerates points with truncated amp/phase arrays.
            for j in range(min(n, len(amps), len(phases))):
                weighted_amp[j] += w * amps[j]
                ph_rad = phases[j] * tide_harmonics.DEG2RAD
                weighted_phase_cos[j] += w * math.cos(ph_rad)
                weighted_phase_sin[j] += w * math.sin(ph_rad)

        if total_weight == 0:
            return None

        constituents = []
        for j, name in enumerate(const_names):
            amp = weighted_amp[j] / total_weight
            # atan2 of the summed components yields the weighted circular
            # mean phase; normalize to [0, 360).
            phase = math.atan2(
                weighted_phase_sin[j], weighted_phase_cos[j]
            ) * tide_harmonics.RAD2DEG
            if phase < 0:
                phase += 360.0
            constituents.append({
                'name': name,
                'amplitude': amp,
                'phase': phase,
            })

        return constituents
|
|
|
|
@staticmethod
|
|
def _point_to_constituents(pt, const_names):
|
|
"""Convert a single grid point to a constituent list."""
|
|
amps = pt.get('amp', [])
|
|
phases = pt.get('phase', [])
|
|
constituents = []
|
|
for j in range(min(len(const_names), len(amps), len(phases))):
|
|
constituents.append({
|
|
'name': const_names[j],
|
|
'amplitude': amps[j],
|
|
'phase': phases[j],
|
|
})
|
|
return constituents
|
|
|
|
def _load_custom_json(self):
|
|
"""Try loading a custom constituent JSON file."""
|
|
for base in (DATA_DIR, os.path.dirname(__file__)):
|
|
path = os.path.join(base, 'constituents', 'custom.json')
|
|
if os.path.exists(path):
|
|
try:
|
|
with open(path, 'r') as f:
|
|
data = json.load(f)
|
|
constituents = data.get('constituents', [])
|
|
if constituents:
|
|
logger.info(
|
|
"Loaded %d constituents from %s",
|
|
len(constituents), path)
|
|
return constituents
|
|
except (OSError, json.JSONDecodeError) as e:
|
|
logger.warning("Failed to load %s: %s", path, e)
|
|
return None
|
|
|
|
|
|
def _haversine(lat1, lon1, lat2, lon2):
|
|
"""Great-circle distance in meters between two GPS coordinates."""
|
|
R = 6371000
|
|
phi1 = math.radians(lat1)
|
|
phi2 = math.radians(lat2)
|
|
dphi = math.radians(lat2 - lat1)
|
|
dlam = math.radians(lon2 - lon1)
|
|
a = (math.sin(dphi / 2) ** 2 +
|
|
math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
|
|
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
|
|
|
|
|
|
def _correlation(xs, ys):
|
|
"""Pearson correlation coefficient between two centered sequences."""
|
|
n = len(xs)
|
|
if n == 0:
|
|
return 0.0
|
|
sum_xy = sum(x * y for x, y in zip(xs, ys))
|
|
sum_x2 = sum(x * x for x in xs)
|
|
sum_y2 = sum(y * y for y in ys)
|
|
denom = math.sqrt(sum_x2 * sum_y2)
|
|
if denom < 1e-12:
|
|
return 0.0
|
|
return sum_xy / denom
|