Files
venus/dbus-anchor-alarm/drag_detector.py
2026-03-26 14:15:02 +00:00

270 lines
9.3 KiB
Python

"""
Drag detection and alarm logic for anchor alarm.
Pure math -- no D-Bus. Takes sensor data and tracker state as parameters.
"""
import math
import time
import logging
from collections import deque
from config import (
DRAG_SPEED_MIN_KTS,
DRAG_SPEED_MAX_KTS,
DRAG_RADIAL_MIN_KTS,
DRAG_SPEED_SUSTAIN_SEC,
DRAG_SWING_SUPPRESS_DEG_SEC,
)
# Module-level logger for the drag-detection component.
logger = logging.getLogger('dbus-anchor-alarm.drag')

# Sphere radius in feet used for great-circle distances
# (6371 km / 0.3048 m per ft, rounded to the nearest foot).
EARTH_RADIUS_FT = 20902231.0

# Values stored in DragDetector.alarm_type.
ALARM_NONE = "none"
ALARM_RADIUS = "radius"
ALARM_DRAG = "drag"
ALARM_SHIFT = "shift"

# Sliding-window lengths, in seconds, for the heading-rate and
# depth-rate estimates.
_SWING_RATE_WINDOW_SEC = 10.0
_DEPTH_RATE_WINDOW_SEC = 300.0

# Maximum number of samples retained in each history deque.
_HEADING_HISTORY_MAX = 30
_DEPTH_HISTORY_MAX = 600
def _haversine_ft(lat1, lon1, lat2, lon2):
    """Return the great-circle distance in feet between two positions.

    All four arguments are latitudes/longitudes in decimal degrees.  The
    haversine formula is evaluated on a sphere of radius EARTH_RADIUS_FT.
    """
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    delta_phi = math.radians(lat2 - lat1)
    delta_lambda = math.radians(lon2 - lon1)

    # Haversine of the central angle between the two points.
    hav = (math.sin(delta_phi / 2) ** 2
           + math.cos(phi1) * math.cos(phi2) * math.sin(delta_lambda / 2) ** 2)

    central_angle = 2.0 * math.atan2(math.sqrt(hav), math.sqrt(1.0 - hav))
    return EARTH_RADIUS_FT * central_angle
def _bearing_between(lat1, lon1, lat2, lon2):
    """Return the initial bearing from point 1 to point 2 in degrees (0-360).

    Inputs are latitudes/longitudes in decimal degrees; 0 is north and
    90 is east.
    """
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    delta_lambda = math.radians(lon2 - lon1)

    # East and north components of the direction towards point 2.
    east = math.sin(delta_lambda) * math.cos(phi2)
    north = (math.cos(phi1) * math.sin(phi2)
             - math.sin(phi1) * math.cos(phi2) * math.cos(delta_lambda))

    signed_deg = math.degrees(math.atan2(east, north))
    return signed_deg % 360.0
def _angle_diff(a, b):
    """Return the unsigned separation of two bearings in degrees (0-180).

    Both inputs are normalised modulo 360 first, so negative values and
    values above 360 are accepted.
    """
    separation = abs(a % 360.0 - b % 360.0)
    # Going the other way round the circle is shorter past 180 degrees.
    return 360.0 - separation if separation > 180.0 else separation
def _depth_rate_ft_per_min(depth_history):
    """Return the least-squares slope of depth over time, in ft/min.

    depth_history is a sequence of (depth_ft, timestamp) pairs with
    timestamps in seconds.  Returns 0.0 when fewer than two samples are
    given or when the samples have no spread in time.
    """
    count = len(depth_history)
    if count < 2:
        return 0.0

    origin = depth_history[0][1]
    s_x = s_y = s_xx = s_xy = 0.0
    for depth_ft, stamp in depth_history:
        # Minutes elapsed since the first sample.
        minutes = (stamp - origin) / 60.0
        s_x += minutes
        s_y += depth_ft
        s_xx += minutes * minutes
        s_xy += minutes * depth_ft

    spread = count * s_xx - s_x * s_x
    if abs(spread) < 1e-12:
        # All samples share (nearly) the same timestamp: slope undefined.
        return 0.0

    return (count * s_xy - s_x * s_y) / spread
class DragDetector:
    """Detect anchor drag and manage the alarm state.

    Each call to update() evaluates three checks in priority order --
    alarm radius, sustained outward radial speed (drag), and estimated
    anchor shift -- and leaves the outcome in ``alarm_active``,
    ``alarm_type`` and ``alarm_message``.  At most one alarm is raised per
    tick; when no check fires the alarm state is cleared.
    """

    def __init__(self):
        """Create a detector with no alarm and empty sensor histories."""
        # Public alarm state, rewritten on every update().
        self.alarm_active = False
        self.alarm_type = ALARM_NONE
        self.alarm_message = ""
        # Derived sensor state, refreshed on every update().
        self.tide_influence_active = False
        self.swing_rate_deg_per_sec = 0.0
        # Time at which the current run of outward radial speed began,
        # or None when no run is in progress.
        self._drag_speed_start_time = None
        # Not read anywhere in this class; cleared by reset().
        self._last_estimated_anchor = None  # (lat, lon, timestamp)
        # Bounded histories of (value, timestamp) samples.
        self._depth_history = deque(maxlen=_DEPTH_HISTORY_MAX)
        self._heading_history = deque(maxlen=_HEADING_HISTORY_MAX)

    def update(self, snapshot, tracker, alarm_radius_ft, chain_length_ft):
        """Run drag / alarm checks for a single tick.

        Args:
            snapshot: sensor reading; the attributes ``timestamp``,
                ``latitude``, ``longitude``, ``speed``, ``course``,
                ``heading``, ``depth`` and ``wind_speed`` are read and any
                of them may be None.
            tracker: anchor tracker; ``marked_lat``, ``marked_lon``,
                ``estimated_lat``, ``estimated_lon`` and ``drift_ft`` are
                read.
            alarm_radius_ft: alarm circle radius around the marked anchor
                position, in feet.
            chain_length_ft: accepted for interface compatibility; not
                used by any check.
        """
        # Fall back to wall-clock time when the snapshot has no timestamp.
        now = snapshot.timestamp or time.time()

        self._update_swing_rate(snapshot, now)
        self._update_depth_history(snapshot, now)
        self._update_tide_influence(snapshot, now)

        # Both position-based checks need a marked anchor and a GPS fix.
        have_positions = (tracker.marked_lat is not None
                          and tracker.marked_lon is not None
                          and snapshot.latitude is not None
                          and snapshot.longitude is not None)

        # 1. Radius check (highest priority)
        alarm_set = (have_positions
                     and self._check_radius(snapshot, tracker,
                                            alarm_radius_ft))

        # 2. Radial-speed drag check.  Skipped entirely (sustain timer left
        #    untouched) when a higher-priority alarm fired or data is missing.
        if (not alarm_set
                and have_positions
                and snapshot.speed is not None
                and snapshot.course is not None):
            alarm_set = self._check_drag(snapshot, tracker, now)

        # 3. Anchor shift check
        if not alarm_set:
            alarm_set = self._check_shift(snapshot, tracker, alarm_radius_ft)

        # 4. Clear alarm if nothing triggered
        if not alarm_set:
            self._clear_alarm()

    def reset(self):
        """Clear all state. Called when anchor is weighed."""
        self._clear_alarm()
        self.tide_influence_active = False
        self.swing_rate_deg_per_sec = 0.0
        self._drag_speed_start_time = None
        self._last_estimated_anchor = None
        self._depth_history.clear()
        self._heading_history.clear()

    # ------------------------------------------------------------------
    # Alarm checks (each returns True when it raised an alarm)
    # ------------------------------------------------------------------
    def _check_radius(self, snapshot, tracker, alarm_radius_ft):
        """Raise the radius alarm when the vessel is outside the circle."""
        dist = _haversine_ft(
            snapshot.latitude, snapshot.longitude,
            tracker.marked_lat, tracker.marked_lon,
        )
        if dist > alarm_radius_ft:
            self._set_alarm(
                ALARM_RADIUS,
                "Vessel is {:.0f}ft from anchor, exceeds {:.0f}ft alarm radius"
                .format(dist, alarm_radius_ft),
            )
            return True
        return False

    def _check_drag(self, snapshot, tracker, now):
        """Raise the drag alarm on sustained outward radial speed.

        Vessel speed is decomposed into its radial component (away from
        the anchor).  Swinging on the rode produces tangential motion with
        near-zero radial speed; actual drag produces sustained outward
        radial speed.
        """
        if not (DRAG_SPEED_MIN_KTS <= snapshot.speed <= DRAG_SPEED_MAX_KTS):
            self._drag_speed_start_time = None
            return False

        bearing_to_anchor = _bearing_between(
            snapshot.latitude, snapshot.longitude,
            tracker.marked_lat, tracker.marked_lon,
        )
        bearing_away = (bearing_to_anchor + 180.0) % 360.0
        angle_from_away = _angle_diff(snapshot.course, bearing_away)
        radial_kts = snapshot.speed * math.cos(math.radians(angle_from_away))

        swinging = self.swing_rate_deg_per_sec >= DRAG_SWING_SUPPRESS_DEG_SEC
        moving_outward = radial_kts >= DRAG_RADIAL_MIN_KTS and not swinging
        if not moving_outward:
            self._drag_speed_start_time = None
            return False

        if self._drag_speed_start_time is None:
            self._drag_speed_start_time = now
        elapsed = now - self._drag_speed_start_time
        if elapsed < DRAG_SPEED_SUSTAIN_SEC:
            return False

        # Tide-based drag suppression.  Applied *before* raising the alarm
        # so that a suppressed drag neither logs a warning every tick nor
        # prevents the lower-priority shift check from running.  The
        # sustain timer keeps running, so the alarm fires as soon as the
        # tide influence ends if the outward motion is still present.
        if self.tide_influence_active:
            logger.debug(
                'Drag alarm suppressed by tide influence '
                '(%.2f kts radial for %.0fs)', radial_kts, elapsed)
            return False

        self._set_alarm(
            ALARM_DRAG,
            "Vessel dragging at {:.2f} kts radial for {:.0f}s"
            .format(radial_kts, elapsed),
        )
        return True

    def _check_shift(self, snapshot, tracker, alarm_radius_ft):
        """Raise the shift alarm when the estimated anchor has moved.

        Fires when the tracker's drift exceeds a quarter of the alarm
        radius while the reported wind speed is above 10.
        """
        if (tracker.estimated_lat is None
                or tracker.estimated_lon is None
                or tracker.drift_ft is None
                or tracker.marked_lat is None):
            return False

        threshold = 0.25 * alarm_radius_ft
        if tracker.drift_ft > threshold:
            wind = snapshot.wind_speed
            if wind is not None and wind > 10.0:
                self._set_alarm(
                    ALARM_SHIFT,
                    "Estimated anchor position shifted {:.0f}ft from marked position"
                    .format(tracker.drift_ft),
                )
                return True
        return False

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------
    def _set_alarm(self, alarm_type, message):
        """Activate an alarm of the given type and log it as a warning."""
        self.alarm_active = True
        self.alarm_type = alarm_type
        self.alarm_message = message
        logger.warning('ALARM %s: %s', alarm_type, message)

    def _clear_alarm(self):
        """Return the public alarm fields to the no-alarm state."""
        self.alarm_active = False
        self.alarm_type = ALARM_NONE
        self.alarm_message = ""

    def _update_depth_history(self, snapshot, now):
        """Record the current depth reading, if there is one."""
        if snapshot.depth is not None:
            self._depth_history.append((snapshot.depth, now))

    def _update_tide_influence(self, snapshot, now=None):
        """Flag tide influence when depth is changing and wind is calm.

        ``now`` anchors the regression window to the current tick so that
        depth samples from a sensor that has stopped reporting age out
        instead of being re-evaluated forever.  When omitted, the window
        is anchored to the newest depth sample.
        """
        if len(self._depth_history) < 10:
            self.tide_influence_active = False
            return

        if now is None:
            now = self._depth_history[-1][1]
        cutoff = now - _DEPTH_RATE_WINDOW_SEC
        window = [(d, t) for d, t in self._depth_history if t >= cutoff]
        if len(window) < 2:
            self.tide_influence_active = False
            return

        rate = _depth_rate_ft_per_min(window)
        # NOTE(review): a missing wind reading is treated as calm (0.0),
        # which allows tide suppression without wind data -- confirm that
        # this is intended.
        wind = snapshot.wind_speed if snapshot.wind_speed is not None else 0.0
        self.tide_influence_active = abs(rate) > 0.1 and wind < 5.0

    def _update_swing_rate(self, snapshot, now):
        """Track heading rate of change (deg/sec).

        The rate is the angular difference between the newest heading and
        the oldest heading inside the swing-rate window, divided by the
        time between them.
        """
        history = self._heading_history

        if snapshot.heading is None:
            # No new sample.  Keep the last rate only while it is still
            # fresh; a stale rate must not keep suppressing the drag check
            # after the heading source has gone silent.
            if history and now - history[-1][1] > _SWING_RATE_WINDOW_SEC:
                history.clear()
                self.swing_rate_deg_per_sec = 0.0
            return

        history.append((snapshot.heading, now))
        if len(history) < 2:
            self.swing_rate_deg_per_sec = 0.0
            return

        # Use oldest sample within the swing-rate window
        cutoff = now - _SWING_RATE_WINDOW_SEC
        oldest = next(((h, t) for h, t in history if t >= cutoff), None)

        # Require more than 0.5 s of baseline; this also guarantees a
        # non-zero divisor below.
        if oldest is None or oldest[1] >= now - 0.5:
            self.swing_rate_deg_per_sec = 0.0
            return

        newest_h, newest_t = history[-1]
        self.swing_rate_deg_per_sec = (
            _angle_diff(newest_h, oldest[0]) / (newest_t - oldest[1]))