Files
venus/dbus-tides/tide_predictor.py
dev 9756538f16 Initial commit: Venus OS boat addons monorepo
Organizes 11 projects for Cerbo GX/Venus OS into a single repository:
- axiom-nmea: Raymarine LightHouse protocol decoder
- dbus-generator-ramp: Generator current ramp controller
- dbus-lightning: Blitzortung lightning monitor
- dbus-meteoblue-forecast: Meteoblue weather forecast
- dbus-no-foreign-land: noforeignland.com tracking
- dbus-tides: Tide prediction from depth + harmonics
- dbus-vrm-history: VRM cloud history proxy
- dbus-windy-station: Windy.com weather upload
- mfd-custom-app: MFD app deployment package
- venus-html5-app: Custom Victron HTML5 app fork
- watermaker: Watermaker PLC control UI

Adds root README, .gitignore, project template, and per-project
.gitignore files. Sensitive config files excluded via .gitignore
with .example templates provided.

Made-with: Cursor
2026-03-16 17:04:16 +00:00

825 lines
30 KiB
Python

"""
Tide prediction orchestrator.
Loads tidal constituent data from the bundled coastal grid (or fallback
sources), interpolates for the vessel position, and generates tide
predictions using the pure-Python harmonic engine.
"""
import gzip
import json
import logging
import math
import os
import time
from config import (
BESTFIT_MIN_HISTORY_HOURS,
CONSTITUENTS_DIR,
DATA_DIR,
GRID_FILE_NAME,
NOAA_NEARBY_COUNT,
NOAA_STATION_MAX_DISTANCE,
NOAA_STATIONS_FILE_NAME,
PREDICTION_HOURS,
PREDICTION_INTERVAL,
)
import tide_harmonics
# Harmonic predictions are smooth mathematical curves with no noise,
# so extrema filtering only needs to suppress shallow-water harmonic
# artifacts (typically < 1 cm). The observed-data threshold
# (MIN_TIDE_AMPLITUDE) is far too aggressive here and drops
# legitimate small-range tidal cycles caused by diurnal inequality.
_PREDICTION_EXTREMA_MIN = 0.02
logger = logging.getLogger('TidePredictor')
def _find_subordinate_ref_extrema(timestamps, heights, min_amplitude=0.4):
"""Single-pass extrema finder for subordinate reference curves.
The subordinate height-correction factor (h_high / h_low) is
interpolated between reference-curve extrema. The multi-pass
pair-removal in find_extrema() selects different surviving extrema,
shifting factor boundaries and distorting the subordinate curve.
This preserves the original sequential logic so the curve shape
remains stable.
"""
if len(timestamps) < 3:
return []
extrema = []
last_h = None
for i in range(1, len(heights) - 1):
hp = heights[i - 1]
hc = heights[i]
hn = heights[i + 1]
if hc > hp and hc > hn:
etype = "high"
elif hc < hp and hc < hn:
etype = "low"
else:
continue
if last_h is not None and abs(hc - last_h) < min_amplitude:
continue
extrema.append((timestamps[i], hc, etype))
last_h = hc
return extrema
class TidePredictor:
    """Manages constituent data and produces tide predictions."""

    def __init__(self):
        """Initialize all state empty; data is loaded lazily on use."""
        # Coastal-grid state (lazy-loaded, load attempted at most once).
        self._grid = None
        self._grid_loaded = False
        # NOAA station database state (lazy-loaded as well).
        self._noaa_stations = None
        self._noaa_loaded = False
        # Currently selected constituent set and its provenance.
        self._current_constituents = None
        self._current_source = None
        self._current_station = None
        self._current_offsets = None
        self._current_lat = None
        self._current_lon = None
        # Candidate stations near the vessel and any manual override.
        self._nearby_stations = []
        self._station_override = None
        # Most recent prediction results (see get_last_prediction()).
        self._last_prediction = None
        self._last_extrema = None
        self._last_prediction_time = 0
# ------------------------------------------------------------------
# NOAA station loading
# ------------------------------------------------------------------
def load_noaa_stations(self):
"""Load the pre-built NOAA station database from disk."""
if self._noaa_loaded:
return self._noaa_stations is not None
self._noaa_loaded = True
path = self._find_noaa_file()
if not path:
logger.info("No NOAA station file found")
return False
try:
if path.endswith('.gz'):
with gzip.open(path, 'rt', encoding='utf-8') as f:
self._noaa_stations = json.load(f)
else:
with open(path, 'r') as f:
self._noaa_stations = json.load(f)
n = len(self._noaa_stations.get('stations', []))
logger.info("Loaded NOAA station database: %d stations", n)
return True
except (OSError, json.JSONDecodeError) as e:
logger.error("Failed to load NOAA stations: %s", e)
self._noaa_stations = None
return False
def _find_noaa_file(self):
"""Search for the NOAA station file in known locations."""
candidates = [
os.path.join(DATA_DIR, 'constituents', NOAA_STATIONS_FILE_NAME),
os.path.join(CONSTITUENTS_DIR, NOAA_STATIONS_FILE_NAME),
os.path.join(os.path.dirname(__file__), 'constituents',
NOAA_STATIONS_FILE_NAME),
os.path.join(os.path.dirname(__file__), 'constituents',
NOAA_STATIONS_FILE_NAME.replace('.gz', '')),
]
for path in candidates:
if os.path.exists(path):
logger.info("Found NOAA station file: %s", path)
return path
return None
def _find_nearby_noaa_stations(self, lat, lon):
"""Find the N nearest NOAA stations within range.
Populates self._nearby_stations with (distance, station) pairs
for all stations within NOAA_STATION_MAX_DISTANCE, sorted by
distance (nearest first), up to NOAA_NEARBY_COUNT.
"""
self._nearby_stations = []
if not self.load_noaa_stations() or self._noaa_stations is None:
return
stations = self._noaa_stations.get('stations', [])
if not stations:
return
dists = []
for st in stations:
d = _haversine(lat, lon, st['lat'], st['lon'])
if d <= NOAA_STATION_MAX_DISTANCE:
dists.append((d, st))
dists.sort(key=lambda x: x[0])
self._nearby_stations = dists[:NOAA_NEARBY_COUNT]
if self._nearby_stations:
logger.info(
"Found %d NOAA stations within %.0f nm",
len(self._nearby_stations),
NOAA_STATION_MAX_DISTANCE / 1852)
for d, st in self._nearby_stations:
stype = ' (sub)' if st.get('type') == 'S' else ''
logger.info(
" %s %s (%.1f nm)%s",
st.get('id', '?'), st.get('name', '?'),
d / 1852, stype)
else:
logger.info(
"No NOAA stations within %.0f nm of %.2f, %.2f",
NOAA_STATION_MAX_DISTANCE / 1852, lat, lon)
    def _select_noaa_station(self, lat, lon, depth_history=None):
        """Select the best NOAA station for the current position.

        Selection priority:
        1. Manual override (if station_override is set and valid)
        2. Best-fit scoring against observed depth history
        3. Nearest station by distance

        Side effects: on success, sets _current_offsets,
        _current_station and _current_source for the chosen station.

        Returns a constituent list, or None if no station in range.
        """
        self._find_nearby_noaa_stations(lat, lon)
        if not self._nearby_stations:
            return None
        chosen_dist, chosen_station = None, None
        if self._station_override:
            # First look for the override among nearby stations, where
            # the distance has already been computed.
            for d, st in self._nearby_stations:
                if st.get('id') == self._station_override:
                    chosen_dist, chosen_station = d, st
                    logger.info(
                        "Using overridden station %s (%s)",
                        st.get('id', '?'), st.get('name', '?'))
                    break
            if chosen_station is None:
                # Override not nearby: honor it anyway by scanning the
                # whole database and computing its distance directly.
                all_stations = self._noaa_stations.get('stations', [])
                for st in all_stations:
                    if st.get('id') == self._station_override:
                        chosen_dist = _haversine(
                            lat, lon, st['lat'], st['lon'])
                        chosen_station = st
                        logger.info(
                            "Using overridden station %s (%s), %.1f km away",
                            st.get('id', '?'), st.get('name', '?'),
                            chosen_dist / 1000)
                        break
        if chosen_station is None and depth_history:
            # No (valid) override: try ranking candidates against the
            # observed depth data; scored[0] is the best fit.
            scored = self._rank_stations_by_fit(
                self._nearby_stations, depth_history)
            if scored:
                chosen_dist, chosen_station = scored[0]
        if chosen_station is None:
            # Final fallback: nearest station (list is distance-sorted).
            chosen_dist, chosen_station = self._nearby_stations[0]
        is_subordinate = chosen_station.get('type') == 'S'
        if is_subordinate:
            # Subordinate stations carry offsets; constituents come from
            # their reference station.
            constituents, offsets = self._resolve_subordinate(chosen_station)
        else:
            constituents = chosen_station.get('constituents', [])
            offsets = None
        if not constituents:
            return None
        self._current_offsets = offsets
        self._current_station = {
            'id': chosen_station.get('id', ''),
            'name': chosen_station.get('name', ''),
            'lat': chosen_station.get('lat', 0.0),
            'lon': chosen_station.get('lon', 0.0),
            'distance': chosen_dist,
        }
        if is_subordinate:
            self._current_station['type'] = 'S'
            self._current_station['ref_id'] = chosen_station.get('ref_id', '')
        self._current_source = 'noaa:%s' % chosen_station.get('id', '?')
        logger.info(
            "Selected NOAA station %s (%s), %.1f km, %d constituents%s",
            chosen_station.get('id', '?'),
            chosen_station.get('name', '?'),
            chosen_dist / 1000,
            len(constituents),
            ' (subordinate)' if is_subordinate else '')
        return constituents
def _resolve_subordinate(self, sub_station):
"""Resolve a subordinate station to its reference station's
constituents and the subordinate offsets.
Returns (constituents, offsets) or (None, None).
"""
ref_id = sub_station.get('ref_id', '')
if not ref_id or not self._noaa_stations:
return None, None
ref_station = None
for st in self._noaa_stations.get('stations', []):
if st.get('id') == ref_id and st.get('constituents'):
ref_station = st
break
if not ref_station:
logger.warning(
"Reference station %s not found for subordinate %s",
ref_id, sub_station.get('id', '?'))
return None, None
offsets = sub_station.get('offsets', {})
logger.info(
"Resolved subordinate %s -> reference %s (%s), "
"time_high=%+dmin, time_low=%+dmin, "
"height_high=%.2f, height_low=%.2f (%s)",
sub_station.get('id', '?'),
ref_id, ref_station.get('name', '?'),
offsets.get('time_high', 0), offsets.get('time_low', 0),
offsets.get('height_high', 1.0), offsets.get('height_low', 1.0),
offsets.get('height_type', 'R'))
return ref_station['constituents'], offsets
    def _rank_stations_by_fit(self, candidates, depth_history):
        """Score candidate stations against observed depth data.

        Runs a hindcast for each candidate over the observed time window,
        then picks the one whose predicted tidal shape best correlates
        with the detrended depth observations.

        Args:
            candidates: list of (distance_m, station_dict) pairs.
            depth_history: list of (unix_ts, depth) samples, assumed
                time-ordered — TODO confirm caller sorts them.

        Returns candidates re-sorted by fit quality (best first),
        or None if insufficient history or no usable score.
        """
        # Need a minimum number of samples and a minimum time span for
        # the correlation to be meaningful.
        if not depth_history or len(depth_history) < 6:
            return None
        span_hours = (depth_history[-1][0] - depth_history[0][0]) / 3600.0
        if span_hours < BESTFIT_MIN_HISTORY_HOURS:
            return None
        obs_ts = [ts for ts, _ in depth_history]
        obs_depths = [d for _, d in depth_history]
        # Center observations so correlation ignores the mean depth.
        obs_mean = sum(obs_depths) / len(obs_depths)
        obs_centered = [d - obs_mean for d in obs_depths]
        scored = []
        for dist, st in candidates:
            if st.get('type') == 'S':
                constituents, offsets = self._resolve_subordinate(st)
            else:
                constituents = st.get('constituents', [])
                offsets = None
            if not constituents:
                continue
            if offsets:
                # Approximate the subordinate correction for scoring:
                # average time offset (minutes -> seconds) and a single
                # average height factor, rather than the full
                # extrema-interpolated correction used for prediction.
                time_shift = (offsets.get('time_high', 0) +
                              offsets.get('time_low', 0)) / 2.0 * 60.0
                shifted_ts = [t - time_shift for t in obs_ts]
                raw_heights = tide_harmonics.predict(
                    constituents, shifted_ts)
                if not raw_heights or len(raw_heights) != len(obs_ts):
                    continue
                h_high = offsets.get('height_high', 1.0)
                h_low = offsets.get('height_low', 1.0)
                h_type = offsets.get('height_type', 'R')
                avg_factor = (h_high + h_low) / 2.0
                # 'R' = ratio (multiplicative), otherwise additive.
                if h_type == 'R':
                    pred_heights = [h * avg_factor for h in raw_heights]
                else:
                    pred_heights = [h + avg_factor for h in raw_heights]
            else:
                pred_heights = tide_harmonics.predict(constituents, obs_ts)
                if not pred_heights or len(pred_heights) != len(obs_ts):
                    continue
            # Center predictions too: only the tidal *shape* is scored.
            pred_mean = sum(pred_heights) / len(pred_heights)
            pred_centered = [h - pred_mean for h in pred_heights]
            corr = _correlation(obs_centered, pred_centered)
            scored.append((dist, st, corr))
        if not scored:
            return None
        # Best correlation first.
        scored.sort(key=lambda x: -x[2])
        for dist, st, corr in scored:
            stype = ' (sub)' if st.get('type') == 'S' else ''
            logger.info(
                " Fit score: %s %s = %.3f (%.1f nm)%s",
                st.get('id', '?'), st.get('name', '?'),
                corr, dist / 1852, stype)
        best_corr = scored[0][2]
        # Below 0.3 the fit is considered noise; caller falls back to
        # nearest-by-distance selection.
        if best_corr < 0.3:
            logger.info("Best fit correlation %.3f too low, using nearest",
                        best_corr)
            return None
        return [(d, st) for d, st, _ in scored]
@property
def station_override(self):
return self._station_override
@station_override.setter
def station_override(self, station_id):
"""Set a manual station override. Empty string or None to clear."""
if station_id:
self._station_override = str(station_id).strip()
logger.info("Station override set: %s", self._station_override)
else:
self._station_override = None
logger.info("Station override cleared")
# ------------------------------------------------------------------
# Grid loading
# ------------------------------------------------------------------
def load_grid(self):
"""Load the pre-computed coastal grid from disk."""
if self._grid_loaded:
return self._grid is not None
self._grid_loaded = True
grid_path = self._find_grid_file()
if not grid_path:
logger.warning("No coastal grid file found")
return False
try:
if grid_path.endswith('.gz'):
with gzip.open(grid_path, 'rt', encoding='utf-8') as f:
self._grid = json.load(f)
else:
with open(grid_path, 'r') as f:
self._grid = json.load(f)
n_points = len(self._grid.get('points', []))
regions = self._grid.get('regions', [])
logger.info(
"Loaded coastal grid: %d points, regions: %s",
n_points, regions)
return True
except (OSError, json.JSONDecodeError) as e:
logger.error("Failed to load grid: %s", e)
self._grid = None
return False
def _find_grid_file(self):
"""Search for the grid file in known locations."""
candidates = [
os.path.join(DATA_DIR, 'constituents', GRID_FILE_NAME),
os.path.join(CONSTITUENTS_DIR, GRID_FILE_NAME),
os.path.join(os.path.dirname(__file__), 'constituents', GRID_FILE_NAME),
os.path.join(os.path.dirname(__file__), 'constituents',
GRID_FILE_NAME.replace('.gz', '')),
]
for path in candidates:
if os.path.exists(path):
logger.info("Found grid file: %s", path)
return path
return None
# ------------------------------------------------------------------
# Location update
# ------------------------------------------------------------------
def update_location(self, lat, lon, depth_history=None):
"""Update constituents for a new vessel position.
Tries data sources in priority order:
1. NOAA station (observation-derived, best near coast)
2. Coastal grid (global model, interpolated)
3. Custom JSON (manual fallback)
When depth_history is provided and long enough, candidate NOAA
stations are scored against the observations to pick the best fit.
Returns True if new constituents were loaded.
"""
self._current_lat = lat
self._current_lon = lon
self._current_source = None
self._current_station = None
self._current_offsets = None
constituents = self._select_noaa_station(
lat, lon, depth_history=depth_history)
if constituents is None:
constituents = self._interpolate_from_grid(lat, lon)
if constituents is not None:
self._current_source = 'grid'
if constituents is None:
constituents = self._load_custom_json()
if constituents is not None:
self._current_source = 'custom'
if constituents is None:
logger.warning("No constituent data for %.2f, %.2f", lat, lon)
return False
self._current_constituents = constituents
logger.info(
"Loaded %d constituents for %.2f, %.2f (source: %s)",
len(constituents), lat, lon, self._current_source)
return True
@property
def current_source(self):
"""Return the name of the data source for current constituents."""
return self._current_source
@property
def current_station(self):
"""Return metadata dict for the selected NOAA station, or None."""
return self._current_station
@property
def nearby_stations(self):
"""Return list of nearby station dicts with distance info."""
result = []
for dist, st in self._nearby_stations:
entry = {
'id': st.get('id', ''),
'name': st.get('name', ''),
'lat': st.get('lat', 0.0),
'lon': st.get('lon', 0.0),
'distance': round(dist / 1000, 1),
}
if st.get('type') == 'S':
entry['type'] = 'S'
entry['ref_id'] = st.get('ref_id', '')
result.append(entry)
return result
# ------------------------------------------------------------------
# Prediction
# ------------------------------------------------------------------
def predict(self, from_time=None, duration_hours=None):
"""Generate a tide prediction curve.
Args:
from_time: Unix timestamp for the start of the curve
(default: now).
duration_hours: length of the curve in hours
(default: PREDICTION_HOURS).
Returns:
(timestamps, heights, extrema) or (None, None, None).
"""
if not self._current_constituents:
return None, None, None
if from_time is None:
from_time = time.time()
if duration_hours is None:
duration_hours = PREDICTION_HOURS
n_steps = duration_hours * 3600 // PREDICTION_INTERVAL
timestamps = [
from_time + i * PREDICTION_INTERVAL
for i in range(n_steps + 1)
]
if self._current_offsets:
heights = self._predict_subordinate(timestamps)
else:
heights = tide_harmonics.predict(
self._current_constituents, timestamps)
if not heights:
return None, None, None
extrema = tide_harmonics.find_extrema(
timestamps, heights, min_amplitude=_PREDICTION_EXTREMA_MIN)
self._last_prediction = (timestamps, heights)
self._last_extrema = extrema
self._last_prediction_time = time.time()
logger.info(
"Prediction: %d points, %d extrema over %dh",
len(timestamps), len(extrema), duration_hours)
return timestamps, heights, extrema
    def _predict_subordinate(self, timestamps):
        """Predict tide heights for a subordinate station.

        Strategy:
        1. Time-shift the entire reference curve by the average of
           the high/low time offsets (handles bulk timing difference).
        2. Find extrema in the shifted reference curve.
        3. Linearly interpolate the height correction factor between
           successive extrema (h_high at highs, h_low at lows).
        4. Apply the interpolated factor to produce the final curve.

        This yields a smooth continuous curve that matches NOAA's
        subordinate methodology at the extrema and transitions
        smoothly between them.

        Returns a list of heights parallel to ``timestamps`` ([] when
        the reference prediction fails).
        """
        offsets = self._current_offsets
        # Offsets are stored in minutes; work in seconds here.
        time_high_s = offsets.get('time_high', 0) * 60.0
        time_low_s = offsets.get('time_low', 0) * 60.0
        h_high = offsets.get('height_high', 1.0)
        h_low = offsets.get('height_low', 1.0)
        # 'R' = ratio (multiplicative), otherwise additive offset.
        h_type = offsets.get('height_type', 'R')
        avg_time_shift = (time_high_s + time_low_s) / 2.0
        # Sample the reference station at times shifted back by the
        # average offset, so the result lands on the requested times.
        shifted_ts = [t - avg_time_shift for t in timestamps]
        ref_heights = tide_harmonics.predict(
            self._current_constituents, shifted_ts)
        if not ref_heights:
            return []
        ref_extrema = _find_subordinate_ref_extrema(
            shifted_ts, ref_heights)
        if len(ref_extrema) < 2:
            # Too few extrema to interpolate between: fall back to a
            # constant average correction for the whole window.
            avg_factor = (h_high + h_low) / 2.0
            if h_type == 'R':
                return [h * avg_factor for h in ref_heights]
            return [h + avg_factor for h in ref_heights]
        # Extrema times expressed back in the caller's time frame.
        ext_times = [e[0] + avg_time_shift for e in ref_extrema]
        ext_factors = [h_high if e[2] == 'high' else h_low
                       for e in ref_extrema]
        result = []
        for i, t in enumerate(timestamps):
            # Clamp to the nearest end factor outside the extrema span.
            if t <= ext_times[0]:
                factor = ext_factors[0]
            elif t >= ext_times[-1]:
                factor = ext_factors[-1]
            else:
                # Find the extrema interval containing t and
                # linearly interpolate the correction factor.
                j = 0
                for k in range(len(ext_times) - 1):
                    if ext_times[k] <= t < ext_times[k + 1]:
                        j = k
                        break
                # max(span, 1.0) guards against a degenerate
                # zero-length interval.
                span = ext_times[j + 1] - ext_times[j]
                frac = (t - ext_times[j]) / max(span, 1.0)
                factor = (ext_factors[j] +
                          frac * (ext_factors[j + 1] - ext_factors[j]))
            if h_type == 'R':
                result.append(ref_heights[i] * factor)
            else:
                result.append(ref_heights[i] + factor)
        return result
def predict_at(self, timestamps):
"""Evaluate the harmonic model at arbitrary timestamps.
Returns a list of MSL-relative heights, or None if no constituents.
"""
if not self._current_constituents or not timestamps:
return None
return tide_harmonics.predict(self._current_constituents, timestamps)
def get_last_prediction(self):
"""Return the most recent prediction result."""
if self._last_prediction:
return (*self._last_prediction, self._last_extrema)
return None, None, None
def has_constituents(self):
return self._current_constituents is not None
@property
def last_prediction_time(self):
return self._last_prediction_time
def estimate_datum_offset(self):
"""Estimate MSL-above-MLLW datum offset from constituent amplitudes.
Runs a 30-day prediction, finds all lows, groups into daily
pairs, averages the lower of each pair (the MLLW definition).
Returns the offset in meters (MSL - MLLW, always >= 0).
"""
if not self._current_constituents:
return 0.0
now = time.time()
step = 600
n_steps = 30 * 86400 // step
timestamps = [now + i * step for i in range(n_steps + 1)]
heights = tide_harmonics.predict(
self._current_constituents, timestamps)
if not heights:
return 0.0
extrema = tide_harmonics.find_extrema(
timestamps, heights, min_amplitude=_PREDICTION_EXTREMA_MIN)
lows = [h for _, h, t in extrema if t == "low"]
if len(lows) < 2:
return abs(min(lows)) if lows else 0.0
lower_lows = []
for i in range(0, len(lows) - 1, 2):
lower_lows.append(min(lows[i], lows[i + 1]))
if len(lows) % 2 == 1:
lower_lows.append(lows[-1])
mllw = sum(lower_lows) / len(lower_lows)
offset = -mllw
logger.info(
"Estimated datum offset (MSL above MLLW): %.3fm", offset)
return max(offset, 0.0)
# ------------------------------------------------------------------
# Constituent interpolation
# ------------------------------------------------------------------
def _interpolate_from_grid(self, lat, lon):
"""Bilinear interpolation of constituents from the coastal grid."""
if not self.load_grid() or self._grid is None:
return None
points = self._grid.get('points', [])
const_names = self._grid.get('constituents', [])
if not points or not const_names:
return None
nearest = self._find_nearest_points(points, lat, lon, count=4)
if not nearest:
return None
closest_dist = nearest[0][0]
closest_pt = nearest[0][1]
# Too far from any grid point
if closest_dist > 100000:
logger.warning(
"Nearest grid point is %.0fkm away", closest_dist / 1000)
return None
# Very close to a single point -- no interpolation needed
if closest_dist < 1000 or len(nearest) < 4:
return self._point_to_constituents(closest_pt, const_names)
return self._bilinear_interpolate(nearest, const_names)
def _find_nearest_points(self, points, lat, lon, count=4):
"""Find the N nearest ocean grid points by haversine distance."""
dists = []
for pt in points:
d = _haversine(lat, lon, pt['lat'], pt['lon'])
dists.append((d, pt))
dists.sort(key=lambda x: x[0])
return dists[:count]
def _bilinear_interpolate(self, nearest, const_names):
"""Inverse-distance-weighted interpolation of constituent data.
Phase interpolation uses circular averaging (cos/sin components)
to avoid wraparound artifacts at 0/360 degrees.
"""
total_weight = 0.0
n = len(const_names)
weighted_amp = [0.0] * n
weighted_phase_cos = [0.0] * n
weighted_phase_sin = [0.0] * n
for dist, pt in nearest:
w = 1.0 / max(dist, 1.0)
total_weight += w
amps = pt.get('amp', [])
phases = pt.get('phase', [])
for j in range(min(n, len(amps), len(phases))):
weighted_amp[j] += w * amps[j]
ph_rad = phases[j] * tide_harmonics.DEG2RAD
weighted_phase_cos[j] += w * math.cos(ph_rad)
weighted_phase_sin[j] += w * math.sin(ph_rad)
if total_weight == 0:
return None
constituents = []
for j, name in enumerate(const_names):
amp = weighted_amp[j] / total_weight
phase = math.atan2(
weighted_phase_sin[j], weighted_phase_cos[j]
) * tide_harmonics.RAD2DEG
if phase < 0:
phase += 360.0
constituents.append({
'name': name,
'amplitude': amp,
'phase': phase,
})
return constituents
@staticmethod
def _point_to_constituents(pt, const_names):
"""Convert a single grid point to a constituent list."""
amps = pt.get('amp', [])
phases = pt.get('phase', [])
constituents = []
for j in range(min(len(const_names), len(amps), len(phases))):
constituents.append({
'name': const_names[j],
'amplitude': amps[j],
'phase': phases[j],
})
return constituents
def _load_custom_json(self):
"""Try loading a custom constituent JSON file."""
for base in (DATA_DIR, os.path.dirname(__file__)):
path = os.path.join(base, 'constituents', 'custom.json')
if os.path.exists(path):
try:
with open(path, 'r') as f:
data = json.load(f)
constituents = data.get('constituents', [])
if constituents:
logger.info(
"Loaded %d constituents from %s",
len(constituents), path)
return constituents
except (OSError, json.JSONDecodeError) as e:
logger.warning("Failed to load %s: %s", path, e)
return None
def _haversine(lat1, lon1, lat2, lon2):
"""Great-circle distance in meters between two GPS coordinates."""
R = 6371000
phi1 = math.radians(lat1)
phi2 = math.radians(lat2)
dphi = math.radians(lat2 - lat1)
dlam = math.radians(lon2 - lon1)
a = (math.sin(dphi / 2) ** 2 +
math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2)
return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
def _correlation(xs, ys):
"""Pearson correlation coefficient between two centered sequences."""
n = len(xs)
if n == 0:
return 0.0
sum_xy = sum(x * y for x, y in zip(xs, ys))
sum_x2 = sum(x * x for x in xs)
sum_y2 = sum(y * y for y in ys)
denom = math.sqrt(sum_x2 * sum_y2)
if denom < 1e-12:
return 0.0
return sum_xy / denom