432 lines
15 KiB
Python
432 lines
15 KiB
Python
"""
|
|
Anchor position estimation engine.
|
|
|
|
Combines heading-based projection and arc-based circle fitting to estimate
|
|
where the anchor sits on the seabed, with a 2-sigma uncertainty circle.
|
|
"""
|
|
|
|
import math
|
|
import time
|
|
import logging
|
|
from collections import deque
|
|
|
|
from catenary import catenary_distance, wind_force_lbs
|
|
|
|
logger = logging.getLogger('dbus-anchor-alarm.tracker')
|
|
|
|
EARTH_RADIUS_FT = 20902231.0
|
|
NM_TO_FT = 6076.12
|
|
|
|
|
|
def _haversine_ft(lat1, lon1, lat2, lon2):
|
|
"""Distance between two GPS points in feet."""
|
|
rlat1, rlat2 = math.radians(lat1), math.radians(lat2)
|
|
dlat = math.radians(lat2 - lat1)
|
|
dlon = math.radians(lon2 - lon1)
|
|
a = (math.sin(dlat / 2) ** 2
|
|
+ math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2) ** 2)
|
|
return EARTH_RADIUS_FT * 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a))
|
|
|
|
|
|
def _project_position(lat, lon, distance_ft, bearing_deg):
|
|
"""Project a point from (lat, lon) by distance_ft along bearing_deg."""
|
|
d = distance_ft / EARTH_RADIUS_FT
|
|
brng = math.radians(bearing_deg)
|
|
rlat = math.radians(lat)
|
|
rlon = math.radians(lon)
|
|
|
|
new_lat = math.asin(
|
|
math.sin(rlat) * math.cos(d)
|
|
+ math.cos(rlat) * math.sin(d) * math.cos(brng)
|
|
)
|
|
new_lon = rlon + math.atan2(
|
|
math.sin(brng) * math.sin(d) * math.cos(rlat),
|
|
math.cos(d) - math.sin(rlat) * math.sin(new_lat),
|
|
)
|
|
return math.degrees(new_lat), math.degrees(new_lon)
|
|
|
|
|
|
def _to_local_xy(lat, lon, ref_lat, ref_lon):
|
|
"""Convert (lat, lon) to local (x_ft, y_ft) relative to a reference."""
|
|
lat_rad = math.radians(ref_lat)
|
|
dx = (lon - ref_lon) * 60.0 * NM_TO_FT * math.cos(lat_rad)
|
|
dy = (lat - ref_lat) * 60.0 * NM_TO_FT
|
|
return dx, dy
|
|
|
|
|
|
def _from_local_xy(x_ft, y_ft, ref_lat, ref_lon):
|
|
"""Convert local (x_ft, y_ft) back to (lat, lon)."""
|
|
lat_rad = math.radians(ref_lat)
|
|
cos_lat = math.cos(lat_rad)
|
|
if abs(cos_lat) < 1e-12:
|
|
cos_lat = 1e-12
|
|
dlon = x_ft / (60.0 * NM_TO_FT * cos_lat)
|
|
dlat = y_ft / (60.0 * NM_TO_FT)
|
|
return ref_lat + dlat, ref_lon + dlon
|
|
|
|
|
|
def _circle_fit(points_xy):
|
|
"""Algebraic circle fit (Kasa method) for a list of (x, y) points.
|
|
|
|
Returns (cx, cy, radius, residual) or None if singular.
|
|
"""
|
|
n = len(points_xy)
|
|
if n < 3:
|
|
return None
|
|
|
|
sx = sy = sxx = syy = sxy = 0.0
|
|
sxxx = syyy = sxyy = syxx = 0.0
|
|
for x, y in points_xy:
|
|
x2, y2 = x * x, y * y
|
|
sx += x
|
|
sy += y
|
|
sxx += x2
|
|
syy += y2
|
|
sxy += x * y
|
|
sxxx += x2 * x
|
|
syyy += y2 * y
|
|
sxyy += x * y2
|
|
syxx += y * x2
|
|
|
|
a11 = 2.0 * (sx * sx - n * sxx)
|
|
a12 = 2.0 * (sx * sy - n * sxy)
|
|
a21 = a12
|
|
a22 = 2.0 * (sy * sy - n * syy)
|
|
|
|
b1 = (sx * sxx - n * sxxx) + (sx * syy - n * sxyy)
|
|
b2 = (sy * sxx - n * syxx) + (sy * syy - n * syyy)
|
|
|
|
det = a11 * a22 - a12 * a21
|
|
if abs(det) < 1e-12:
|
|
return None
|
|
|
|
cx = (b1 * a22 - b2 * a12) / det
|
|
cy = (a11 * b2 - a21 * b1) / det
|
|
|
|
radius_sq = (sxx + syy - 2.0 * cx * sx - 2.0 * cy * sy) / n + cx * cx + cy * cy
|
|
if radius_sq < 0:
|
|
return None
|
|
radius = math.sqrt(radius_sq)
|
|
|
|
residual = 0.0
|
|
for x, y in points_xy:
|
|
d = math.sqrt((x - cx) ** 2 + (y - cy) ** 2) - radius
|
|
residual += d * d
|
|
residual = math.sqrt(residual / n)
|
|
|
|
return cx, cy, radius, residual
|
|
|
|
|
|
def _angular_span(headings_deg):
|
|
"""Angular span of a list of headings, handling 0/360 wraparound."""
|
|
if len(headings_deg) < 2:
|
|
return 0.0
|
|
|
|
sorted_h = sorted(h % 360.0 for h in headings_deg)
|
|
max_gap = 0.0
|
|
for i in range(1, len(sorted_h)):
|
|
gap = sorted_h[i] - sorted_h[i - 1]
|
|
if gap > max_gap:
|
|
max_gap = gap
|
|
wrap_gap = (360.0 - sorted_h[-1]) + sorted_h[0]
|
|
if wrap_gap > max_gap:
|
|
max_gap = wrap_gap
|
|
|
|
return 360.0 - max_gap
|
|
|
|
|
|
class AnchorTracker:
|
|
"""Estimates anchor position from GPS track, heading, and catenary geometry."""
|
|
|
|
def __init__(self, bow_offset_ft=0.0):
|
|
self.anchor_set = False
|
|
self.marked_lat = None
|
|
self.marked_lon = None
|
|
self.estimated_lat = None
|
|
self.estimated_lon = None
|
|
self.uncertainty_radius_ft = 0.0
|
|
self.drift_ft = 0.0
|
|
self.estimated_distance_ft = 0.0
|
|
self.bow_offset_ft = bow_offset_ft
|
|
|
|
self._heading_estimates = deque(maxlen=300)
|
|
self._arc_points = deque(maxlen=1800)
|
|
self._estimated_history = deque(maxlen=100)
|
|
|
|
self.last_heading_est_lat = None
|
|
self.last_heading_est_lon = None
|
|
self.last_arc_center_lat = None
|
|
self.last_arc_center_lon = None
|
|
self.last_arc_radius_ft = None
|
|
self.last_arc_residual = None
|
|
self.last_arc_coverage = 0.0
|
|
self.last_arc_valid = False
|
|
self.last_weight_arc = 0.0
|
|
self.last_weight_heading = 0.0
|
|
self.last_arc_point_count = 0
|
|
|
|
# ------------------------------------------------------------------
|
|
# Public API
|
|
# ------------------------------------------------------------------
|
|
|
|
def drop_anchor(self, lat, lon, alarm_radius_ft=None):
|
|
"""Mark anchor position at the vessel's current GPS location."""
|
|
self.marked_lat = lat
|
|
self.marked_lon = lon
|
|
self.estimated_lat = lat
|
|
self.estimated_lon = lon
|
|
self.anchor_set = True
|
|
self.drift_ft = 0.0
|
|
self.estimated_distance_ft = 0.0
|
|
|
|
default = 100.0
|
|
if alarm_radius_ft is not None:
|
|
default = max(alarm_radius_ft / 2.0, 100.0)
|
|
self.uncertainty_radius_ft = default
|
|
|
|
self._heading_estimates.clear()
|
|
self._arc_points.clear()
|
|
self._estimated_history.clear()
|
|
logger.info(
|
|
'Anchor dropped at %.6f, %.6f uncertainty=%.0f ft',
|
|
lat, lon, self.uncertainty_radius_ft,
|
|
)
|
|
|
|
def after_drop(self, vessel_lat, vessel_lon, heading_deg,
|
|
chain_length_ft, depth_ft, freeboard_ft,
|
|
wind_speed_kts, windage_area, drag_coeff, chain_weight):
|
|
"""Estimate anchor position when vessel has already backed away."""
|
|
force = wind_force_lbs(wind_speed_kts, windage_area, drag_coeff)
|
|
result = catenary_distance(
|
|
chain_length_ft, depth_ft, freeboard_ft, force, chain_weight,
|
|
)
|
|
self.estimated_distance_ft = result.total_distance_ft
|
|
|
|
proj_dist = self.estimated_distance_ft + self.bow_offset_ft
|
|
anchor_lat, anchor_lon = _project_position(
|
|
vessel_lat, vessel_lon, proj_dist, heading_deg,
|
|
)
|
|
|
|
self.marked_lat = anchor_lat
|
|
self.marked_lon = anchor_lon
|
|
self.estimated_lat = anchor_lat
|
|
self.estimated_lon = anchor_lon
|
|
self.anchor_set = True
|
|
self.drift_ft = 0.0
|
|
self.uncertainty_radius_ft = 0.50 * chain_length_ft
|
|
|
|
self._heading_estimates.clear()
|
|
self._arc_points.clear()
|
|
self._estimated_history.clear()
|
|
logger.info(
|
|
'Anchor after-drop at %.6f, %.6f dist=%.0f ft uncertainty=%.0f ft',
|
|
anchor_lat, anchor_lon,
|
|
self.estimated_distance_ft, self.uncertainty_radius_ft,
|
|
)
|
|
|
|
def update(self, snapshot, chain_length_ft, depth_ft, freeboard_ft,
|
|
windage_area, drag_coeff, chain_weight):
|
|
"""Main loop tick -- refine anchor position estimate."""
|
|
if not self.anchor_set:
|
|
return
|
|
if snapshot.latitude is None or snapshot.longitude is None:
|
|
return
|
|
|
|
now = snapshot.timestamp or time.time()
|
|
vlat, vlon = snapshot.latitude, snapshot.longitude
|
|
|
|
# Step 1: catenary distance
|
|
wind = snapshot.wind_speed if snapshot.wind_speed is not None else 0.0
|
|
force = wind_force_lbs(wind, windage_area, drag_coeff)
|
|
cat = catenary_distance(
|
|
chain_length_ft, depth_ft, freeboard_ft, force, chain_weight,
|
|
)
|
|
self.estimated_distance_ft = cat.total_distance_ft
|
|
|
|
# Step 2: heading-based estimate (project from bow, not GPS)
|
|
if snapshot.heading is not None:
|
|
bow_lat, bow_lon = _project_position(
|
|
vlat, vlon, self.bow_offset_ft, snapshot.heading,
|
|
)
|
|
a_lat, a_lon = _project_position(
|
|
bow_lat, bow_lon, self.estimated_distance_ft, snapshot.heading,
|
|
)
|
|
weight = 1.5 if wind > 5.0 else 1.0
|
|
self._heading_estimates.append((a_lat, a_lon, now, weight))
|
|
|
|
# Step 3: arc-based estimation
|
|
if snapshot.heading is not None:
|
|
self._arc_points.append((vlat, vlon, snapshot.heading, now))
|
|
|
|
cutoff_10min = now - 600.0
|
|
recent_headings = [
|
|
h for _, _, h, t in self._arc_points if t >= cutoff_10min
|
|
]
|
|
arc_coverage = (
|
|
_angular_span(recent_headings) if len(recent_headings) >= 2 else 0.0
|
|
)
|
|
|
|
arc_center_lat = None
|
|
arc_center_lon = None
|
|
arc_valid = False
|
|
arc_radius = None
|
|
arc_residual = None
|
|
|
|
if arc_coverage > 30.0 and len(self._arc_points) >= 5:
|
|
ref_lat, ref_lon = self._arc_points[0][0], self._arc_points[0][1]
|
|
pts_xy = [
|
|
_to_local_xy(
|
|
*_project_position(la, lo, self.bow_offset_ft, h),
|
|
ref_lat, ref_lon,
|
|
)
|
|
for la, lo, h, _ in self._arc_points
|
|
]
|
|
fit = _circle_fit(pts_xy)
|
|
if fit is not None:
|
|
cx, cy, arc_radius, arc_residual = fit
|
|
if arc_residual < 0.5 * chain_length_ft:
|
|
arc_center_lat, arc_center_lon = _from_local_xy(
|
|
cx, cy, ref_lat, ref_lon,
|
|
)
|
|
arc_valid = True
|
|
|
|
# Step 4: combine estimates
|
|
heading_lat, heading_lon = self._weighted_heading_average()
|
|
|
|
wa, wh = 0.0, 0.0
|
|
|
|
if heading_lat is None and not arc_valid:
|
|
self._store_estimation_state(
|
|
heading_lat, heading_lon,
|
|
arc_center_lat, arc_center_lon, arc_radius, arc_residual,
|
|
arc_coverage, arc_valid, wa, wh,
|
|
)
|
|
return
|
|
|
|
if arc_valid and heading_lat is not None:
|
|
if arc_coverage > 180.0:
|
|
wa, wh = 0.85, 0.15
|
|
elif arc_coverage > 120.0:
|
|
wa, wh = 0.65, 0.35
|
|
elif arc_coverage > 60.0:
|
|
wa, wh = 0.30, 0.70
|
|
else:
|
|
wa, wh = 0.0, 1.0
|
|
|
|
if wa > 0:
|
|
self.estimated_lat = wa * arc_center_lat + wh * heading_lat
|
|
self.estimated_lon = wa * arc_center_lon + wh * heading_lon
|
|
else:
|
|
self.estimated_lat = heading_lat
|
|
self.estimated_lon = heading_lon
|
|
elif arc_valid:
|
|
wa, wh = 1.0, 0.0
|
|
self.estimated_lat = arc_center_lat
|
|
self.estimated_lon = arc_center_lon
|
|
else:
|
|
wa, wh = 0.0, 1.0
|
|
self.estimated_lat = heading_lat
|
|
self.estimated_lon = heading_lon
|
|
|
|
self._store_estimation_state(
|
|
heading_lat, heading_lon,
|
|
arc_center_lat, arc_center_lon, arc_radius, arc_residual,
|
|
arc_coverage, arc_valid, wa, wh,
|
|
)
|
|
|
|
# Step 5: uncertainty radius
|
|
if len(self._estimated_history) >= 10:
|
|
recent = list(self._estimated_history)[-50:]
|
|
mean_lat = sum(e[0] for e in recent) / len(recent)
|
|
mean_lon = sum(e[1] for e in recent) / len(recent)
|
|
var_ft = 0.0
|
|
for la, lo, _t, _u in recent:
|
|
dx, dy = _to_local_xy(la, lo, mean_lat, mean_lon)
|
|
var_ft += dx * dx + dy * dy
|
|
std_ft = math.sqrt(var_ft / len(recent))
|
|
self.uncertainty_radius_ft = max(
|
|
10.0, min(2.0 * std_ft, chain_length_ft),
|
|
)
|
|
|
|
# Step 6: drift
|
|
if self.marked_lat is not None and self.estimated_lat is not None:
|
|
self.drift_ft = _haversine_ft(
|
|
self.marked_lat, self.marked_lon,
|
|
self.estimated_lat, self.estimated_lon,
|
|
)
|
|
|
|
# Step 7: history
|
|
self._estimated_history.append(
|
|
(self.estimated_lat, self.estimated_lon, now,
|
|
self.uncertainty_radius_ft),
|
|
)
|
|
|
|
def weigh_anchor(self):
|
|
"""Reset all state."""
|
|
self.anchor_set = False
|
|
self.marked_lat = None
|
|
self.marked_lon = None
|
|
self.estimated_lat = None
|
|
self.estimated_lon = None
|
|
self.uncertainty_radius_ft = 0.0
|
|
self.drift_ft = 0.0
|
|
self.estimated_distance_ft = 0.0
|
|
self._heading_estimates.clear()
|
|
self._arc_points.clear()
|
|
self._estimated_history.clear()
|
|
self.last_heading_est_lat = None
|
|
self.last_heading_est_lon = None
|
|
self.last_arc_center_lat = None
|
|
self.last_arc_center_lon = None
|
|
self.last_arc_radius_ft = None
|
|
self.last_arc_residual = None
|
|
self.last_arc_coverage = 0.0
|
|
self.last_arc_valid = False
|
|
self.last_weight_arc = 0.0
|
|
self.last_weight_heading = 0.0
|
|
self.last_arc_point_count = 0
|
|
logger.info('Anchor weighed -- state reset')
|
|
|
|
def get_estimated_history_json(self):
|
|
"""Return JSON-serializable list of recent estimated positions."""
|
|
return [
|
|
{'lat': lat, 'lon': lon, 'ts': ts, 'uncertainty_ft': unc}
|
|
for lat, lon, ts, unc in self._estimated_history
|
|
]
|
|
|
|
# ------------------------------------------------------------------
|
|
# Internal
|
|
# ------------------------------------------------------------------
|
|
|
|
def _store_estimation_state(self, hdg_lat, hdg_lon, arc_lat, arc_lon,
|
|
arc_radius, arc_residual, arc_coverage,
|
|
arc_valid, wa, wh):
|
|
self.last_heading_est_lat = hdg_lat
|
|
self.last_heading_est_lon = hdg_lon
|
|
self.last_arc_center_lat = arc_lat
|
|
self.last_arc_center_lon = arc_lon
|
|
self.last_arc_radius_ft = arc_radius
|
|
self.last_arc_residual = arc_residual
|
|
self.last_arc_coverage = arc_coverage
|
|
self.last_arc_valid = arc_valid
|
|
self.last_weight_arc = wa
|
|
self.last_weight_heading = wh
|
|
self.last_arc_point_count = len(self._arc_points)
|
|
|
|
def _weighted_heading_average(self):
|
|
"""Weighted average of heading-based anchor position estimates."""
|
|
if not self._heading_estimates:
|
|
return None, None
|
|
|
|
total_w = 0.0
|
|
wlat = 0.0
|
|
wlon = 0.0
|
|
for lat, lon, _ts, w in self._heading_estimates:
|
|
wlat += lat * w
|
|
wlon += lon * w
|
|
total_w += w
|
|
|
|
if total_w == 0:
|
|
return None, None
|
|
return wlat / total_w, wlon / total_w
|