Files
venus/dbus-tides/tide_detector.py
dev 9756538f16 Initial commit: Venus OS boat addons monorepo
Organizes 11 projects for Cerbo GX/Venus OS into a single repository:
- axiom-nmea: Raymarine LightHouse protocol decoder
- dbus-generator-ramp: Generator current ramp controller
- dbus-lightning: Blitzortung lightning monitor
- dbus-meteoblue-forecast: Meteoblue weather forecast
- dbus-no-foreign-land: noforeignland.com tracking
- dbus-tides: Tide prediction from depth + harmonics
- dbus-vrm-history: VRM cloud history proxy
- dbus-windy-station: Windy.com weather upload
- mfd-custom-app: MFD app deployment package
- venus-html5-app: Custom Victron HTML5 app fork
- watermaker: Watermaker PLC control UI

Adds root README, .gitignore, project template, and per-project
.gitignore files. Sensitive config files excluded via .gitignore
with .example templates provided.

Made-with: Cursor
2026-03-16 17:04:16 +00:00

246 lines
8.3 KiB
Python

"""
Tide event detector.
Multi-stage filtering pipeline detects high/low tide events from noisy
depth observations taken at anchor.
Pipeline:
1. Median filter removes impulse noise from anchor swing
2. Triangular-weighted moving average extracts the tidal trend
3. Slope reversals identify candidate extrema
4. Iterative merge enforces strict high-low alternation
5. Amplitude gate rejects sub-tidal oscillations
6. Confirmation delay holds back the most recent candidate until the
smoothed curve has moved away from it, adapting naturally to
conditions: fast in calm water, slower in noisy anchor-swing.
"""
import logging
import time
from collections import deque
from config import (
DEPTH_AVG_WINDOW,
MEDIAN_FILTER_KERNEL,
MIN_TIDE_AMPLITUDE,
SMOOTHING_WINDOW,
TIDE_CONFIRMATION_DEPTH,
)
logger = logging.getLogger('TideDetector')
_SMOOTH_SLOTS = SMOOTHING_WINDOW // DEPTH_AVG_WINDOW
class TideDetector:
"""Detects high and low tide events from smoothed depth observations."""
def __init__(self, min_amplitude=None):
self.min_amplitude = min_amplitude or MIN_TIDE_AMPLITUDE
self.events = deque(maxlen=50)
def update(self, history):
"""Process the full depth history and detect new tide events.
Re-processes the entire smoothed history on each call. The most
recent candidate is held back until confirmed by a sustained depth
excursion, so in calm conditions events are reported promptly while
in noisy conditions (anchor swing) reporting waits until the trend
is unambiguous.
Args:
history: list of (timestamp, depth_avg) from DepthRecorder
Returns:
Newly detected event tuple (timestamp, depth, type) or None.
"""
if len(history) < _SMOOTH_SLOTS + 1:
return None
smoothed = self._smooth(history)
if len(smoothed) < 2:
return None
confirmed = self._detect_events(smoothed)
new_event = None
for evt in confirmed:
if not self._already_reported(evt[0]):
self.events.append(evt)
logger.info(
"Detected %s tide: depth=%.2fm at %s",
evt[2], evt[1],
time.strftime('%H:%M', time.localtime(evt[0])))
new_event = evt
return new_event
def _already_reported(self, ts):
"""True if an event near this timestamp is already in the deque."""
for existing in self.events:
if abs(existing[0] - ts) < SMOOTHING_WINDOW:
return True
return False
# ------------------------------------------------------------------
# Smoothing
# ------------------------------------------------------------------
def _smooth(self, history):
"""Median pre-filter followed by triangular-weighted moving average."""
if len(history) < _SMOOTH_SLOTS:
return list(history)
filtered = self._median_filter(history)
return self._triangular_smooth(filtered)
@staticmethod
def _median_filter(data):
"""Remove impulse noise from anchor swing."""
half = MEDIAN_FILTER_KERNEL // 2
result = []
for i in range(len(data)):
lo = max(0, i - half)
hi = min(len(data), i + half + 1)
depths = sorted(d for _, d in data[lo:hi])
result.append((data[i][0], depths[len(depths) // 2]))
return result
@staticmethod
def _triangular_smooth(data):
"""Triangular-weighted moving average for clean frequency rolloff."""
half = _SMOOTH_SLOTS // 2
if len(data) < _SMOOTH_SLOTS:
return list(data)
result = []
for i in range(half, len(data) - half):
window = data[i - half:i + half + 1]
weights = [
1.0 - abs(j - half) / (half + 1)
for j in range(len(window))
]
w_sum = sum(weights)
avg_ts = sum(
w * ts for w, (ts, _) in zip(weights, window)) / w_sum
avg_d = sum(
w * d for w, (_, d) in zip(weights, window)) / w_sum
result.append((avg_ts, avg_d))
return result
# ------------------------------------------------------------------
# Event detection
# ------------------------------------------------------------------
def _detect_events(self, smoothed):
"""Find, merge, gate, and confirm tide events from smoothed data."""
reversals = self._find_reversals(smoothed)
cleaned = self._merge_and_gate(reversals)
return self._confirm_events(smoothed, cleaned)
@staticmethod
def _find_reversals(smoothed):
"""Detect all slope sign changes in the smoothed series."""
candidates = []
prev_sign = 0
for i in range(1, len(smoothed)):
ts_prev, d_prev = smoothed[i - 1]
ts_curr, d_curr = smoothed[i]
if d_curr > d_prev:
new_sign = 1
elif d_curr < d_prev:
new_sign = -1
else:
continue
if prev_sign != 0 and new_sign != prev_sign:
typ = "high" if prev_sign == 1 else "low"
candidates.append((ts_prev, d_prev, typ))
prev_sign = new_sign
return candidates
def _merge_and_gate(self, candidates):
"""Iteratively merge same-type events and apply amplitude gate.
When noise-induced mini-reversals produce consecutive same-type
candidates the most extreme depth is kept. Small-amplitude
oscillations are then pruned. Iteration continues until the
sequence is stable and strictly alternates high-low.
"""
events = list(candidates)
changed = True
while changed:
changed = False
merged = []
for ts, d, typ in events:
if merged and merged[-1][2] == typ:
prev_d = merged[-1][1]
if ((typ == "high" and d > prev_d)
or (typ == "low" and d < prev_d)):
merged[-1] = (ts, d, typ)
else:
merged.append((ts, d, typ))
if len(merged) != len(events):
changed = True
filtered = [merged[0]] if merged else []
for i in range(1, len(merged)):
ts, d, typ = merged[i]
if abs(d - filtered[-1][1]) < self.min_amplitude:
changed = True
continue
filtered.append((ts, d, typ))
events = filtered
return events
@staticmethod
def _confirm_events(smoothed, events):
"""Hold back the most recent event until confirmed.
Confirmation requires smoothed depths after the candidate to have
moved away by at least TIDE_CONFIRMATION_DEPTH. The smoothing
window naturally makes this condition harder to meet in noisy
conditions, providing adaptive delay.
"""
if not events:
return events
last_ts, last_d, last_type = events[-1]
max_excursion = 0.0
for ts, d in smoothed:
if ts <= last_ts:
continue
if last_type == "high":
excursion = last_d - d
else:
excursion = d - last_d
if excursion > max_excursion:
max_excursion = excursion
if max_excursion < TIDE_CONFIRMATION_DEPTH:
return events[:-1]
return events
# ------------------------------------------------------------------
# Public accessors
# ------------------------------------------------------------------
def get_recent_events(self, count=10):
"""Return the most recent tide events."""
events = list(self.events)
return events[-count:] if len(events) > count else events
def get_last_high(self):
"""Return the most recent high tide event, or None."""
for event in reversed(self.events):
if event[2] == "high":
return event
return None
def get_last_low(self):
"""Return the most recent low tide event, or None."""
for event in reversed(self.events):
if event[2] == "low":
return event
return None