""" Depth recording with averaging and persistence. Collects raw depth readings, computes 5-minute averages to smooth wave noise, stores them in a 96-hour circular buffer, and persists to SQLite across restarts. Each record includes the vessel's GPS position so that observations can be filtered to only those taken near the current anchorage. """ import logging import math import os import sqlite3 import time from collections import deque from config import ( DEPTH_AVG_WINDOW, DEPTH_HISTORY_HOURS, DB_FILE, STATIONARY_RADIUS_METERS, ) logger = logging.getLogger('DepthRecorder') _SLOTS = DEPTH_HISTORY_HOURS * 3600 // DEPTH_AVG_WINDOW def _haversine(lat1, lon1, lat2, lon2): """Great-circle distance in meters between two GPS coordinates.""" R = 6371000 phi1 = math.radians(lat1) phi2 = math.radians(lat2) dphi = math.radians(lat2 - lat1) dlam = math.radians(lon2 - lon1) a = (math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2) return R * 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a)) class DepthRecorder: """Accumulates raw depth readings, averages them, and stores history.""" def __init__(self, db_path=None): self.db_path = db_path or DB_FILE self._raw_window = [] self._window_start = 0.0 # Ring buffer: (timestamp, depth_avg, lat, lon) self.history = deque(maxlen=_SLOTS) self._init_db() self._load_history() # ------------------------------------------------------------------ # Database # ------------------------------------------------------------------ def _init_db(self): db_dir = os.path.dirname(self.db_path) if db_dir and not os.path.exists(db_dir): try: os.makedirs(db_dir, exist_ok=True) except OSError: pass try: self._conn = sqlite3.connect(self.db_path) self._conn.execute( 'CREATE TABLE IF NOT EXISTS depth_history ' '(timestamp REAL PRIMARY KEY, depth_avg REAL, lat REAL, lon REAL)') self._conn.execute( 'CREATE INDEX IF NOT EXISTS idx_ts ON depth_history(timestamp)') self._conn.commit() logger.info(f"Database opened: {self.db_path}") except sqlite3.Error as e: logger.error(f"Database init failed: {e}") self._conn = None def _load_history(self): """Load recent history from SQLite on startup.""" if not self._conn: return try: cutoff = time.time() - DEPTH_HISTORY_HOURS * 3600 rows = self._conn.execute( 'SELECT timestamp, depth_avg, lat, lon FROM depth_history ' 'WHERE timestamp > ? ORDER BY timestamp', (cutoff,) ).fetchall() for ts, depth, lat, lon in rows: self.history.append((ts, depth, lat, lon)) logger.info(f"Loaded {len(rows)} depth history records") except sqlite3.Error as e: logger.warning(f"Failed to load history: {e}") def _persist(self, ts, depth_avg, lat, lon): if not self._conn: return try: self._conn.execute( 'INSERT OR REPLACE INTO depth_history ' '(timestamp, depth_avg, lat, lon) VALUES (?, ?, ?, ?)', (ts, depth_avg, lat, lon)) self._conn.commit() except sqlite3.Error as e: logger.warning(f"Failed to persist depth: {e}") def _prune_db(self): """Remove records older than the history window.""" if not self._conn: return try: cutoff = time.time() - DEPTH_HISTORY_HOURS * 3600 self._conn.execute( 'DELETE FROM depth_history WHERE timestamp < ?', (cutoff,)) self._conn.commit() except sqlite3.Error: pass # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ def add_reading(self, depth, lat=None, lon=None, now=None): """Add a raw depth reading. Returns a new average if the window closed.""" if now is None: now = time.time() if not self._raw_window: self._window_start = now self._raw_window.append((now, depth, lat, lon)) elapsed = now - self._window_start if elapsed >= DEPTH_AVG_WINDOW and self._raw_window: return self._flush_window() return None def _flush_window(self): """Compute the average of the current window and store it.""" if not self._raw_window: return None depths = [d for _, d, _, _ in self._raw_window] avg_depth = sum(depths) / len(depths) mid_ts = (self._window_start + self._raw_window[-1][0]) / 2.0 last_lat = None last_lon = None for _, _, lat, lon in reversed(self._raw_window): if lat is not None and lon is not None: last_lat, last_lon = lat, lon break self.history.append((mid_ts, avg_depth, last_lat, last_lon)) self._persist(mid_ts, avg_depth, last_lat, last_lon) self._raw_window.clear() self._window_start = 0.0 if len(self.history) % 100 == 0: self._prune_db() return (mid_ts, avg_depth) def get_history(self, lat=None, lon=None, radius=None): """Return list of (timestamp, depth) tuples, optionally filtered by position. Args: lat, lon: current vessel position for proximity filtering radius: max distance in meters (default: STATIONARY_RADIUS_METERS) When lat/lon are provided, only records taken within `radius` of that position are returned. Records with no stored position are included (they predate GPS-tagged recording). """ if radius is None: radius = STATIONARY_RADIUS_METERS result = [] for entry in self.history: ts, depth = entry[0], entry[1] rec_lat = entry[2] if len(entry) > 2 else None rec_lon = entry[3] if len(entry) > 3 else None if lat is not None and lon is not None: if (rec_lat is not None and rec_lon is not None and _haversine(lat, lon, rec_lat, rec_lon) > radius): continue result.append((ts, depth)) return result def get_latest_depth(self): """Return the most recent averaged depth, or None.""" if self.history: return self.history[-1][1] if self._raw_window: depths = [d for _, d, _, _ in self._raw_window] return sum(depths) / len(depths) return None def close(self): if self._conn: try: self._conn.close() except sqlite3.Error: pass