67 lines
2.0 KiB
Python
67 lines
2.0 KiB
Python
"""
|
|
In-memory track buffer with spatial deduplication.
|
|
|
|
Replaces SQLite-backed TrackLogger for normal operation — zero SD card writes.
|
|
Points are stored as (ts, lat, lon) tuples and only appended when the vessel
|
|
has moved more than DEDUP_THRESHOLD_FT from the last stored point.
|
|
"""
|
|
|
|
import math
|
|
import logging
|
|
|
|
logger = logging.getLogger('dbus-anchor-alarm.track_buffer')
|
|
|
|
EARTH_RADIUS_FT = 20902231.0
|
|
DEDUP_THRESHOLD_FT = 7.0
|
|
MAX_POINTS = 2000
|
|
TRIM_FRACTION = 0.20
|
|
|
|
|
|
def _haversine_ft(lat1, lon1, lat2, lon2):
|
|
rlat1, rlat2 = math.radians(lat1), math.radians(lat2)
|
|
dlat = math.radians(lat2 - lat1)
|
|
dlon = math.radians(lon2 - lon1)
|
|
a = (math.sin(dlat / 2) ** 2
|
|
+ math.cos(rlat1) * math.cos(rlat2) * math.sin(dlon / 2) ** 2)
|
|
return EARTH_RADIUS_FT * 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a))
|
|
|
|
|
|
class TrackBuffer:
|
|
"""In-memory GPS track with spatial deduplication."""
|
|
|
|
def __init__(self):
|
|
self._points = [] # [(ts, lat, lon)]
|
|
|
|
def add_if_moved(self, snapshot):
|
|
"""Append point only if vessel moved > DEDUP_THRESHOLD_FT from last stored point."""
|
|
if snapshot.latitude is None or snapshot.longitude is None:
|
|
return
|
|
|
|
ts = snapshot.timestamp
|
|
lat, lon = snapshot.latitude, snapshot.longitude
|
|
|
|
if self._points:
|
|
_, last_lat, last_lon = self._points[-1]
|
|
if _haversine_ft(last_lat, last_lon, lat, lon) < DEDUP_THRESHOLD_FT:
|
|
return
|
|
|
|
self._points.append((ts, lat, lon))
|
|
|
|
if len(self._points) > MAX_POINTS:
|
|
trim = int(MAX_POINTS * TRIM_FRACTION)
|
|
self._points = self._points[trim:]
|
|
logger.info('Track buffer trimmed to %d points', len(self._points))
|
|
|
|
def get_display_points_json(self):
|
|
"""Return JSON-serialisable list of all stored points."""
|
|
return [
|
|
{'ts': ts, 'lat': lat, 'lon': lon}
|
|
for ts, lat, lon in self._points
|
|
]
|
|
|
|
def get_point_count(self):
|
|
return len(self._points)
|
|
|
|
def clear(self):
|
|
self._points.clear()
|