Organizes 11 projects for Cerbo GX/Venus OS into a single repository: - axiom-nmea: Raymarine LightHouse protocol decoder - dbus-generator-ramp: Generator current ramp controller - dbus-lightning: Blitzortung lightning monitor - dbus-meteoblue-forecast: Meteoblue weather forecast - dbus-no-foreign-land: noforeignland.com tracking - dbus-tides: Tide prediction from depth + harmonics - dbus-vrm-history: VRM cloud history proxy - dbus-windy-station: Windy.com weather upload - mfd-custom-app: MFD app deployment package - venus-html5-app: Custom Victron HTML5 app fork - watermaker: Watermaker PLC control UI Adds root README, .gitignore, project template, and per-project .gitignore files. Sensitive config files excluded via .gitignore with .example templates provided. Made-with: Cursor
246 lines
8.3 KiB
Python
246 lines
8.3 KiB
Python
"""
|
|
Tide event detector.
|
|
|
|
Multi-stage filtering pipeline detects high/low tide events from noisy
|
|
depth observations taken at anchor.
|
|
|
|
Pipeline:
|
|
1. Median filter removes impulse noise from anchor swing
|
|
2. Triangular-weighted moving average extracts the tidal trend
|
|
3. Slope reversals identify candidate extrema
|
|
4. Iterative merge enforces strict high-low alternation
|
|
5. Amplitude gate rejects sub-tidal oscillations
|
|
6. Confirmation delay holds back the most recent candidate until the
|
|
smoothed curve has moved away from it, adapting naturally to
|
|
conditions: fast in calm water, slower in noisy anchor-swing.
|
|
"""
|
|
|
|
import logging
|
|
import time
|
|
from collections import deque
|
|
|
|
from config import (
|
|
DEPTH_AVG_WINDOW,
|
|
MEDIAN_FILTER_KERNEL,
|
|
MIN_TIDE_AMPLITUDE,
|
|
SMOOTHING_WINDOW,
|
|
TIDE_CONFIRMATION_DEPTH,
|
|
)
|
|
|
|
logger = logging.getLogger('TideDetector')
|
|
|
|
# Number of history samples covered by the smoothing window (integer
# division, so any remainder is dropped). TideDetector.update() requires
# more than this many samples before it runs, and _triangular_smooth()
# uses half of it on each side of the centre sample.
# NOTE(review): presumably both config values are durations in the same
# unit, with DEPTH_AVG_WINDOW being the spacing of history samples --
# confirm against config.py.
_SMOOTH_SLOTS = SMOOTHING_WINDOW // DEPTH_AVG_WINDOW
|
|
|
|
|
|
class TideDetector:
    """Detects high and low tide events from smoothed depth observations.

    Events are ``(timestamp, depth, type)`` tuples where ``type`` is the
    string ``"high"`` or ``"low"``.

    Attributes:
        min_amplitude: Minimum depth difference between two consecutive
            kept extrema; smaller swings are pruned by the amplitude gate.
        events: Bounded deque holding the 50 most recently reported events.
    """

    def __init__(self, min_amplitude=None):
        """Create a detector with an empty event log.

        Args:
            min_amplitude: Amplitude gate threshold. ``None`` selects
                MIN_TIDE_AMPLITUDE from config. An explicit ``0`` is
                honoured (it disables the gate) instead of being silently
                replaced by the default.
        """
        # Compare against None: `min_amplitude or DEFAULT` would also
        # discard a deliberate 0 / 0.0.
        if min_amplitude is None:
            min_amplitude = MIN_TIDE_AMPLITUDE
        self.min_amplitude = min_amplitude
        self.events = deque(maxlen=50)

    def update(self, history):
        """Process the full depth history and detect new tide events.

        Re-processes the entire smoothed history on each call.  The most
        recent candidate is held back until confirmed by a sustained depth
        excursion, so in calm conditions events are reported promptly while
        in noisy conditions (anchor swing) reporting waits until the trend
        is unambiguous.

        Args:
            history: list of (timestamp, depth_avg) from DepthRecorder

        Returns:
            Newly detected event tuple (timestamp, depth, type) or None.
            If one call uncovers several unreported events, all of them
            are stored in ``self.events`` and the last one is returned.
        """
        # Not enough samples to fill one smoothing window yet.
        if len(history) < _SMOOTH_SLOTS + 1:
            return None

        smoothed = self._smooth(history)
        # A slope needs at least two smoothed points.
        if len(smoothed) < 2:
            return None

        new_event = None
        for evt in self._detect_events(smoothed):
            ts, depth, kind = evt
            # The whole history is re-analysed every call, so most
            # confirmed events were already reported on earlier calls.
            if self._already_reported(ts):
                continue
            self.events.append(evt)
            logger.info(
                "Detected %s tide: depth=%.2fm at %s",
                kind, depth,
                time.strftime('%H:%M', time.localtime(ts)))
            new_event = evt

        return new_event

    def _already_reported(self, ts):
        """True if an event near this timestamp is already in the deque.

        "Near" means closer than SMOOTHING_WINDOW; this absorbs the small
        timestamp shifts an extremum can show between re-analyses.
        """
        return any(
            abs(existing[0] - ts) < SMOOTHING_WINDOW
            for existing in self.events)

    # ------------------------------------------------------------------
    # Smoothing
    # ------------------------------------------------------------------

    def _smooth(self, history):
        """Median pre-filter followed by triangular-weighted moving average.

        Histories shorter than one smoothing window are returned as an
        unmodified copy.
        """
        if len(history) < _SMOOTH_SLOTS:
            return list(history)

        filtered = self._median_filter(history)
        return self._triangular_smooth(filtered)

    @staticmethod
    def _median_filter(data):
        """Remove impulse noise from anchor swing.

        Each depth is replaced by the median of the samples within
        ``MEDIAN_FILTER_KERNEL // 2`` positions on either side; timestamps
        are kept.  Near the ends the window is truncated, and for an
        even-sized window the upper of the two middle values is used.

        Args:
            data: sliceable sequence of (timestamp, depth) pairs.

        Returns:
            New list of (timestamp, filtered_depth), same length as data.
        """
        half = MEDIAN_FILTER_KERNEL // 2
        n = len(data)
        result = []
        for i, (ts, _) in enumerate(data):
            lo = max(0, i - half)
            hi = min(n, i + half + 1)
            depths = sorted(d for _, d in data[lo:hi])
            result.append((ts, depths[len(depths) // 2]))
        return result

    @staticmethod
    def _triangular_smooth(data):
        """Triangular-weighted moving average for clean frequency rolloff.

        Only positions with a full window are emitted, so the result is
        shorter than the input by ``_SMOOTH_SLOTS // 2`` samples at each
        end.  Timestamps are averaged with the same weights as depths.

        Args:
            data: sliceable sequence of (timestamp, depth) pairs.

        Returns:
            List of (weighted_timestamp, weighted_depth); an unmodified
            copy of data if it is shorter than one smoothing window.
        """
        half = _SMOOTH_SLOTS // 2
        if len(data) < _SMOOTH_SLOTS:
            return list(data)

        # Every window below has exactly 2*half + 1 samples, so the
        # weights (peak 1.0 at the centre, falling off linearly) and their
        # sum are loop-invariant and computed once.
        weights = [
            1.0 - abs(j - half) / (half + 1)
            for j in range(2 * half + 1)
        ]
        w_sum = sum(weights)

        result = []
        for i in range(half, len(data) - half):
            window = data[i - half:i + half + 1]
            avg_ts = sum(
                w * ts for w, (ts, _) in zip(weights, window)) / w_sum
            avg_d = sum(
                w * d for w, (_, d) in zip(weights, window)) / w_sum
            result.append((avg_ts, avg_d))
        return result

    # ------------------------------------------------------------------
    # Event detection
    # ------------------------------------------------------------------

    def _detect_events(self, smoothed):
        """Find, merge, gate, and confirm tide events from smoothed data."""
        reversals = self._find_reversals(smoothed)
        cleaned = self._merge_and_gate(reversals)
        return self._confirm_events(smoothed, cleaned)

    @staticmethod
    def _find_reversals(smoothed):
        """Detect all slope sign changes in the smoothed series.

        A rising-to-falling change yields a "high", falling-to-rising a
        "low".  Flat steps are skipped without resetting the remembered
        slope, so a reversal across a plateau is placed at the plateau's
        last sample.

        Args:
            smoothed: indexable sequence of (timestamp, depth) pairs.

        Returns:
            List of candidate (timestamp, depth, type) tuples in order.
        """
        candidates = []
        prev_sign = 0  # 0 = no slope seen yet
        for i in range(1, len(smoothed)):
            ts_prev, d_prev = smoothed[i - 1]
            d_curr = smoothed[i][1]
            if d_curr > d_prev:
                new_sign = 1
            elif d_curr < d_prev:
                new_sign = -1
            else:
                continue
            if prev_sign != 0 and new_sign != prev_sign:
                typ = "high" if prev_sign == 1 else "low"
                # The extremum is the sample just before the slope flipped.
                candidates.append((ts_prev, d_prev, typ))
            prev_sign = new_sign
        return candidates

    def _merge_and_gate(self, candidates):
        """Iteratively merge same-type events and apply amplitude gate.

        When noise-induced mini-reversals produce consecutive same-type
        candidates the most extreme depth is kept.  Small-amplitude
        oscillations are then pruned.  Iteration continues until the
        sequence is stable and strictly alternates high-low.

        Args:
            candidates: list of (timestamp, depth, type) tuples.

        Returns:
            New list of surviving events; the input is not modified.
        """
        events = list(candidates)
        changed = True
        while changed:
            changed = False

            # Pass 1: collapse runs of the same type to their most
            # extreme member (highest high / lowest low).
            merged = []
            for ts, d, typ in events:
                if merged and merged[-1][2] == typ:
                    prev_d = merged[-1][1]
                    if ((typ == "high" and d > prev_d)
                            or (typ == "low" and d < prev_d)):
                        merged[-1] = (ts, d, typ)
                else:
                    merged.append((ts, d, typ))
            if len(merged) != len(events):
                changed = True

            # Pass 2: drop any event whose depth is within min_amplitude
            # of the last kept one.  This can leave same-type neighbours
            # behind, hence the outer loop.
            filtered = [merged[0]] if merged else []
            for i in range(1, len(merged)):
                ts, d, typ = merged[i]
                if abs(d - filtered[-1][1]) < self.min_amplitude:
                    changed = True
                    continue
                filtered.append((ts, d, typ))
            events = filtered
        return events

    @staticmethod
    def _confirm_events(smoothed, events):
        """Hold back the most recent event until confirmed.

        Confirmation requires smoothed depths after the candidate to have
        moved away by at least TIDE_CONFIRMATION_DEPTH.  The smoothing
        window naturally makes this condition harder to meet in noisy
        conditions, providing adaptive delay.

        Args:
            smoothed: iterable of (timestamp, depth) pairs.
            events: alternating event list from ``_merge_and_gate``.

        Returns:
            ``events`` unchanged if the last event is confirmed (or the
            list is empty), otherwise a copy without the last event.
        """
        if not events:
            return events

        last_ts, last_d, last_type = events[-1]
        max_excursion = 0.0
        for ts, d in smoothed:
            if ts <= last_ts:
                continue
            # Measure movement away from the extremum: downwards after a
            # high, upwards after a low.
            if last_type == "high":
                excursion = last_d - d
            else:
                excursion = d - last_d
            if excursion > max_excursion:
                max_excursion = excursion

        if max_excursion < TIDE_CONFIRMATION_DEPTH:
            return events[:-1]
        return events

    # ------------------------------------------------------------------
    # Public accessors
    # ------------------------------------------------------------------

    def get_recent_events(self, count=10):
        """Return the most recent tide events.

        Args:
            count: Maximum number of events to return.

        Returns:
            New list of up to ``count`` events, oldest first.  A count of
            zero or less yields an empty list.
        """
        # Guard explicitly: events[-0:] would be the whole list, and a
        # negative count would slice from the front.
        if count <= 0:
            return []
        return list(self.events)[-count:]

    def _last_of_type(self, typ):
        """Return the newest stored event of the given type, or None."""
        return next(
            (event for event in reversed(self.events) if event[2] == typ),
            None)

    def get_last_high(self):
        """Return the most recent high tide event, or None."""
        return self._last_of_type("high")

    def get_last_low(self):
        """Return the most recent low tide event, or None."""
        return self._last_of_type("low")
|