Files
venus/dbus-anchor-alarm/track_logger.py
2026-03-26 14:15:02 +00:00

398 lines
13 KiB
Python

"""
SQLite-backed track logging with automatic compaction.
Replaces the previous JSONL flat-file approach. Two tables:
- raw_points : full-resolution, rolling 2-hour window
- summary_points : compacted older data (5-min buckets 2-24h, 30-min 24h+)
Hard cap: summary_points older than 7 days are deleted.
Estimated max DB size ~500 KB at 1 Hz logging.
"""
import json
import logging
import os
import sqlite3
import time
from config import DATA_DIR
logger = logging.getLogger(__name__)

# -- file names inside DATA_DIR -------------------------------------------
DB_FILE = 'track.db'
JSONL_FILE = 'track.jsonl'

# -- logging / compaction tuning ------------------------------------------
RAW_WINDOW_SEC = 2 * 3600 # keep 2h of full-res data
COMPACT_INTERVAL_SEC = 10 * 60 # run compaction every 10 min
SUMMARY_BUCKET_RECENT_MIN = 5 # 2-24h: 5-min buckets
SUMMARY_BUCKET_OLD_MIN = 30 # 24h+: 30-min buckets
MAX_SUMMARY_AGE_SEC = 7 * 86400 # drop summaries older than 7 days

# -- display-resolution tiers (newest -> oldest) --------------------------
DISPLAY_FULL_RES_SEC = 30 * 60
DISPLAY_1H_BUCKET_MIN = 5
DISPLAY_6H_BUCKET_MIN = 15
DISPLAY_OLD_BUCKET_MIN = 30
ESTIMATION_LOG_MAX_AGE_SEC = 7 * 86400

# DDL executed once at startup (idempotent via IF NOT EXISTS).  ts is the
# PRIMARY KEY in every table, so INSERT OR REPLACE dedupes by timestamp.
_SCHEMA = """
CREATE TABLE IF NOT EXISTS raw_points (
ts REAL PRIMARY KEY,
lat REAL,
lon REAL,
hdg REAL,
cog REAL,
spd REAL,
ws REAL,
wd REAL,
dist REAL,
depth REAL
);
CREATE TABLE IF NOT EXISTS summary_points (
ts REAL PRIMARY KEY,
lat REAL,
lon REAL,
hdg REAL,
hdg_min REAL,
hdg_max REAL,
cog REAL,
spd REAL,
ws REAL,
ws_min REAL,
ws_max REAL,
wd REAL,
dist REAL,
depth REAL,
count INTEGER
);
CREATE TABLE IF NOT EXISTS estimation_log (
ts REAL PRIMARY KEY,
marked_lat REAL,
marked_lon REAL,
est_lat REAL,
est_lon REAL,
uncertainty_ft REAL,
drift_ft REAL,
hdg_est_lat REAL,
hdg_est_lon REAL,
arc_center_lat REAL,
arc_center_lon REAL,
arc_radius_ft REAL,
arc_residual REAL,
arc_coverage REAL,
arc_valid INTEGER,
arc_point_count INTEGER,
weight_arc REAL,
weight_heading REAL,
cat_dist_ft REAL,
vessel_lat REAL,
vessel_lon REAL
);
"""
class TrackLogger:
    """SQLite-backed anchor-track logger with rolling compaction.

    Full-resolution points go into ``raw_points``; ``_compact`` moves rows
    older than RAW_WINDOW_SEC into time-bucketed ``summary_points`` and
    prunes summaries older than MAX_SUMMARY_AGE_SEC.  A separate
    ``estimation_log`` table records anchor-estimation state for offline
    analysis.  Display output merges both track tables at mixed resolution.
    """

    def __init__(self, data_dir=DATA_DIR):
        self._data_dir = data_dir
        os.makedirs(data_dir, exist_ok=True)
        db_path = os.path.join(data_dir, DB_FILE)
        # check_same_thread=False: callers may log from more than one
        # thread; WAL + synchronous=NORMAL keeps 1 Hz commits cheap.
        self._conn = sqlite3.connect(db_path, check_same_thread=False)
        self._conn.execute('PRAGMA journal_mode=WAL')
        self._conn.execute('PRAGMA synchronous=NORMAL')
        self._conn.executescript(_SCHEMA)
        self._conn.commit()
        self._last_compact_ts = time.time()
        self._display_cache = None   # cached display list, None until built
        self._display_dirty = True   # True => rebuild cache on next read
        self._maybe_migrate_jsonl()

    # -- public API ----------------------------------------------------------

    def close(self):
        """Close the SQLite connection.  No further logging is possible."""
        try:
            self._conn.close()
        except sqlite3.Error:
            logger.exception('Failed to close track DB')

    def log_point(self, snapshot, estimated_distance_ft):
        """Append a track point from *snapshot* if GPS is valid.

        Also triggers a compaction pass at most once per
        COMPACT_INTERVAL_SEC.
        """
        if snapshot.latitude is None or snapshot.longitude is None:
            return  # no GPS fix -> nothing worth recording
        ts = snapshot.timestamp or time.time()
        try:
            self._conn.execute(
                'INSERT OR REPLACE INTO raw_points '
                '(ts, lat, lon, hdg, cog, spd, ws, wd, dist, depth) '
                'VALUES (?,?,?,?,?,?,?,?,?,?)',
                (ts, snapshot.latitude, snapshot.longitude,
                 snapshot.heading, snapshot.course, snapshot.speed,
                 snapshot.wind_speed, snapshot.wind_direction,
                 estimated_distance_ft, snapshot.depth),
            )
            self._conn.commit()
        except sqlite3.Error:
            logger.exception('Failed to insert track point')
        else:
            # Invalidate the display cache only when the write landed.
            self._display_dirty = True
        now = time.time()
        if now - self._last_compact_ts >= COMPACT_INTERVAL_SEC:
            self._compact(now)
            self._last_compact_ts = now

    def log_estimation(self, tracker, vessel_lat, vessel_lon):
        """Persist the tracker's current estimation state for offline analysis."""
        if not tracker.anchor_set:
            return
        ts = time.time()
        try:
            self._conn.execute(
                'INSERT OR REPLACE INTO estimation_log '
                '(ts, marked_lat, marked_lon, est_lat, est_lon, '
                'uncertainty_ft, drift_ft, hdg_est_lat, hdg_est_lon, '
                'arc_center_lat, arc_center_lon, arc_radius_ft, '
                'arc_residual, arc_coverage, arc_valid, arc_point_count, '
                'weight_arc, weight_heading, cat_dist_ft, '
                'vessel_lat, vessel_lon) '
                'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)',
                (ts,
                 tracker.marked_lat, tracker.marked_lon,
                 tracker.estimated_lat, tracker.estimated_lon,
                 tracker.uncertainty_radius_ft, tracker.drift_ft,
                 tracker.last_heading_est_lat, tracker.last_heading_est_lon,
                 tracker.last_arc_center_lat, tracker.last_arc_center_lon,
                 tracker.last_arc_radius_ft, tracker.last_arc_residual,
                 tracker.last_arc_coverage,
                 int(tracker.last_arc_valid),
                 tracker.last_arc_point_count,
                 tracker.last_weight_arc, tracker.last_weight_heading,
                 tracker.estimated_distance_ft,
                 vessel_lat, vessel_lon),
            )
            self._conn.commit()
        except sqlite3.Error:
            logger.exception('Failed to insert estimation log')
        # Rolling retention for the estimation table.
        cutoff = ts - ESTIMATION_LOG_MAX_AGE_SEC
        try:
            self._conn.execute(
                'DELETE FROM estimation_log WHERE ts < ?', (cutoff,))
            self._conn.commit()
        except sqlite3.Error:
            # Best-effort pruning; failure here must never break logging.
            logger.debug('Failed to prune estimation_log', exc_info=True)

    def get_display_points_json(self):
        """Return a JSON-serializable list of merged display points."""
        if self._display_dirty or self._display_cache is None:
            self._build_display_cache()
        return self._display_cache

    def get_point_count(self):
        """Return the total number of stored track points (raw + summary)."""
        try:
            row = self._conn.execute(
                'SELECT (SELECT COUNT(*) FROM raw_points) + '
                '(SELECT COUNT(*) FROM summary_points)',
            ).fetchone()
            return row[0] if row else 0
        except sqlite3.Error:
            return 0

    def clear(self):
        """Remove all track data."""
        try:
            self._conn.execute('DELETE FROM raw_points')
            self._conn.execute('DELETE FROM summary_points')
            self._conn.execute('DELETE FROM estimation_log')
            self._conn.commit()
        except sqlite3.Error:
            logger.exception('Failed to clear track tables')
            # Tables may still hold data; force a rebuild on the next read
            # instead of presenting an (incorrectly) empty cache.
            self._display_dirty = True
        else:
            self._display_cache = []
            self._display_dirty = False

    # -- internals -----------------------------------------------------------

    def _compact(self, now):
        """Move raw_points older than RAW_WINDOW_SEC into summary_points,
        and prune summaries older than MAX_SUMMARY_AGE_SEC."""
        cutoff = now - RAW_WINDOW_SEC
        try:
            rows = self._conn.execute(
                'SELECT ts, lat, lon, hdg, cog, spd, ws, wd, dist, depth '
                'FROM raw_points WHERE ts < ? ORDER BY ts',
                (cutoff,),
            ).fetchall()
            if rows:
                summaries = self._aggregate_rows(rows, now)
                self._conn.executemany(
                    'INSERT OR REPLACE INTO summary_points '
                    '(ts, lat, lon, hdg, hdg_min, hdg_max, cog, spd, '
                    'ws, ws_min, ws_max, wd, dist, depth, count) '
                    'VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)',
                    summaries,
                )
                self._conn.execute('DELETE FROM raw_points WHERE ts < ?', (cutoff,))
            old_cutoff = now - MAX_SUMMARY_AGE_SEC
            self._conn.execute('DELETE FROM summary_points WHERE ts < ?', (old_cutoff,))
            self._conn.commit()
            self._display_dirty = True
            if rows:
                logger.info(
                    'Compacted %d raw points into %d summaries',
                    len(rows), len(summaries),
                )
        except sqlite3.Error:
            logger.exception('Compaction failed')

    @staticmethod
    def _column_stats(pts, idx):
        """Return (avg, min, max) over non-None values of column *idx*,
        or (None, None, None) when the column is empty for this bucket."""
        vals = [p[idx] for p in pts if p[idx] is not None]
        if not vals:
            return None, None, None
        return sum(vals) / len(vals), min(vals), max(vals)

    def _aggregate_rows(self, rows, now):
        """Group rows into time buckets and return summary tuples."""
        twenty_four_h_ago = now - 24 * 3600
        groups = {}
        for row in rows:
            ts = row[0]
            # Finer buckets for the last 24h, coarser beyond that.
            if ts >= twenty_four_h_ago:
                bucket_sec = SUMMARY_BUCKET_RECENT_MIN * 60
            else:
                bucket_sec = SUMMARY_BUCKET_OLD_MIN * 60
            bucket_key = int(ts // bucket_sec) * bucket_sec
            groups.setdefault(bucket_key, []).append(row)
        summaries = []
        for pts in groups.values():
            # stats[idx] = (avg, min, max) for raw_points column idx.
            stats = {idx: self._column_stats(pts, idx) for idx in range(1, 10)}
            timestamps = sorted(p[0] for p in pts)
            mid_ts = timestamps[len(timestamps) // 2]  # median timestamp
            # NOTE(review): hdg/cog/wd are compass angles; an arithmetic
            # mean is wrong across the 0/360 wrap (350 and 10 average to
            # 180).  Kept as-is to preserve existing behavior — consider a
            # vector (circular) mean if downstream consumers care.
            summaries.append((
                mid_ts,
                stats[1][0],   # lat (avg)
                stats[2][0],   # lon (avg)
                stats[3][0],   # hdg (avg)
                stats[3][1],   # hdg_min
                stats[3][2],   # hdg_max
                stats[4][0],   # cog (avg)
                stats[5][0],   # spd (avg)
                stats[6][0],   # ws (avg)
                stats[6][1],   # ws_min
                stats[6][2],   # ws_max
                stats[7][0],   # wd (avg)
                stats[8][0],   # dist (avg)
                stats[9][0],   # depth (avg)
                len(pts),
            ))
        return summaries

    def _build_display_cache(self):
        """Query both tables and merge for display (oldest first)."""
        now = time.time()
        full_res_cutoff = now - DISPLAY_FULL_RES_SEC
        one_hour_cutoff = now - 3600
        six_hour_cutoff = now - 6 * 3600
        try:
            raw = self._conn.execute(
                'SELECT ts, lat, lon FROM raw_points '
                'WHERE ts >= ? ORDER BY ts',
                (full_res_cutoff,),
            ).fetchall()
            summaries = self._conn.execute(
                'SELECT ts, lat, lon FROM summary_points ORDER BY ts',
            ).fetchall()
        except sqlite3.Error:
            logger.exception('Failed to query display points')
            self._display_cache = []
            self._display_dirty = False
            return
        recent_raw = [
            {'ts': r[0], 'lat': r[1], 'lon': r[2]}
            for r in raw
        ]
        # Split summaries into age tiers so each can be thinned differently.
        bucket_1h = []
        bucket_6h = []
        bucket_old = []
        for s in summaries:
            ts = s[0]
            pt = {'ts': ts, 'lat': s[1], 'lon': s[2], 'merged': True}
            if ts >= one_hour_cutoff:
                bucket_1h.append(pt)
            elif ts >= six_hour_cutoff:
                bucket_6h.append(pt)
            else:
                bucket_old.append(pt)
        merged = []
        merged.extend(_downsample(bucket_old, DISPLAY_OLD_BUCKET_MIN))
        merged.extend(_downsample(bucket_6h, DISPLAY_6H_BUCKET_MIN))
        merged.extend(_downsample(bucket_1h, DISPLAY_1H_BUCKET_MIN))
        merged.extend(recent_raw)
        self._display_cache = merged
        self._display_dirty = False

    def _maybe_migrate_jsonl(self):
        """One-time migration from the legacy JSONL file.

        On success the JSONL file is renamed with a ``.migrated`` suffix so
        the migration never runs twice.  Unparseable lines are skipped.
        """
        jsonl_path = os.path.join(self._data_dir, JSONL_FILE)
        if not os.path.isfile(jsonl_path):
            return
        count = 0
        try:
            with open(jsonl_path, 'r') as f:
                for line in f:
                    line = line.strip()
                    if not line:
                        continue
                    try:
                        pt = json.loads(line)
                    except json.JSONDecodeError:
                        continue
                    self._conn.execute(
                        'INSERT OR IGNORE INTO raw_points '
                        '(ts, lat, lon, hdg, cog, spd, ws, wd, dist, depth) '
                        'VALUES (?,?,?,?,?,?,?,?,?,?)',
                        (pt.get('ts'), pt.get('lat'), pt.get('lon'),
                         pt.get('hdg'), pt.get('cog'), pt.get('spd'),
                         pt.get('ws'), pt.get('wd'), pt.get('dist'),
                         pt.get('depth')),
                    )
                    count += 1
            self._conn.commit()
            backup = jsonl_path + '.migrated'
            os.rename(jsonl_path, backup)
            logger.info('Migrated %d points from JSONL -> SQLite (backup: %s)', count, backup)
        except (OSError, sqlite3.Error):
            logger.exception('JSONL migration failed')
def _downsample(points, interval_min):
"""Thin a list of dicts by keeping one per interval_min bucket."""
if not points:
return []
interval_sec = interval_min * 60
result = []
last_bucket = None
for pt in points:
bucket = int(pt['ts'] // interval_sec)
if bucket != last_bucket:
result.append(pt)
last_bucket = bucket
return result