Files
venus/dbus-lightning/analysis_engine.py
dev 9756538f16 Initial commit: Venus OS boat addons monorepo
Organizes 11 projects for Cerbo GX/Venus OS into a single repository:
- axiom-nmea: Raymarine LightHouse protocol decoder
- dbus-generator-ramp: Generator current ramp controller
- dbus-lightning: Blitzortung lightning monitor
- dbus-meteoblue-forecast: Meteoblue weather forecast
- dbus-no-foreign-land: noforeignland.com tracking
- dbus-tides: Tide prediction from depth + harmonics
- dbus-vrm-history: VRM cloud history proxy
- dbus-windy-station: Windy.com weather upload
- mfd-custom-app: MFD app deployment package
- venus-html5-app: Custom Victron HTML5 app fork
- watermaker: Watermaker PLC control UI

Adds root README, .gitignore, project template, and per-project
.gitignore files. Sensitive config files excluded via .gitignore
with .example templates provided.

Made-with: Cursor
2026-03-16 17:04:16 +00:00

224 lines
7.8 KiB
Python

"""
Lightning analysis engine: windowed aggregation, approach detection, ETA.
"""
import math
import time
from strike_buffer import haversine_miles, bearing_degrees
def _linear_regression(xs, ys):
"""Simple linear regression returning (slope, intercept, r_squared)."""
n = len(xs)
if n < 2:
return None, None, None
sum_x = sum(xs)
sum_y = sum(ys)
sum_xy = sum(x * y for x, y in zip(xs, ys))
sum_x2 = sum(x * x for x in xs)
sum_y2 = sum(y * y for y in ys)
denom = n * sum_x2 - sum_x * sum_x
if abs(denom) < 1e-12:
return None, None, None
slope = (n * sum_xy - sum_x * sum_y) / denom
intercept = (sum_y - slope * sum_x) / n
ss_tot = sum_y2 - (sum_y * sum_y) / n
if abs(ss_tot) < 1e-12:
return slope, intercept, 1.0
ss_res = sum((y - (slope * x + intercept)) ** 2 for x, y in zip(xs, ys))
r_squared = max(0.0, 1.0 - ss_res / ss_tot)
return slope, intercept, r_squared
def _bearing_stddev(bearings):
"""Circular standard deviation of bearings in degrees."""
if len(bearings) < 2:
return 0.0
to_rad = math.pi / 180
sin_sum = sum(math.sin(b * to_rad) for b in bearings)
cos_sum = sum(math.cos(b * to_rad) for b in bearings)
n = len(bearings)
r = math.sqrt((sin_sum / n) ** 2 + (cos_sum / n) ** 2)
r = min(r, 1.0)
if r >= 1.0:
return 0.0
return math.degrees(math.sqrt(-2.0 * math.log(r)))
def _bearing_to_cardinal(bearing):
"""Convert bearing to cardinal/intercardinal label."""
dirs = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']
idx = int(((bearing + 22.5) % 360) / 45)
return dirs[idx]
class AnalysisEngine:
"""Processes strike buffer into a summary for frontend display."""
def __init__(self, config):
self.window_size = config.get('window_size', 900)
self.num_windows = config.get('num_windows', 4)
self.min_strikes_active = config.get('min_strikes_active', 3)
self.min_windows_approach = config.get('min_windows_approach', 3)
self.r2_approaching = config.get('r2_approaching', 0.7)
self.r2_downgrade = config.get('r2_downgrade', 0.5)
self.max_bearing_stddev = config.get('max_bearing_stddev', 30.0)
self.max_eta_minutes = config.get('max_eta_minutes', 240)
def analyze(self, strikes, vessel_lat, vessel_lon):
"""Produce summary dict from current strike list and vessel position."""
now = time.time()
now_ms = now * 1000
recent_cutoff = now_ms - self.window_size * 1000
recent = [s for s in strikes if s.timestamp >= recent_cutoff]
active = len(recent) >= self.min_strikes_active
if not active:
return {
'active': False,
'strike_count_15m': len(recent),
'nearest_distance': None,
'centroid_bearing': None,
'centroid_distance': None,
'cardinal': None,
'approaching': False,
'approach_speed': None,
'eta_minutes': None,
'confidence': None,
}
nearest_distance = min(s.distance for s in recent) if recent else None
windows = self._build_windows(strikes, now_ms)
current_window = windows[-1] if windows else None
centroid_bearing = None
centroid_distance = None
cardinal = None
if current_window and current_window['count'] > 0:
centroid_bearing = current_window['centroid_bearing']
centroid_distance = current_window['centroid_distance']
cardinal = _bearing_to_cardinal(centroid_bearing)
approaching, approach_speed, eta_minutes, confidence = \
self._detect_approach(windows, vessel_lat, vessel_lon)
if centroid_distance is not None:
centroid_distance = round(centroid_distance, 1)
if nearest_distance is not None:
nearest_distance = round(nearest_distance, 1)
if centroid_bearing is not None:
centroid_bearing = round(centroid_bearing, 0)
if approach_speed is not None:
approach_speed = round(approach_speed, 0)
if eta_minutes is not None:
eta_minutes = round(eta_minutes, 0)
if confidence is not None:
confidence = round(confidence, 2)
return {
'active': True,
'strike_count_15m': len(recent),
'nearest_distance': nearest_distance,
'centroid_bearing': centroid_bearing,
'centroid_distance': centroid_distance,
'cardinal': cardinal,
'approaching': approaching,
'approach_speed': approach_speed,
'eta_minutes': eta_minutes,
'confidence': confidence,
}
def _build_windows(self, strikes, now_ms):
"""Divide strikes into 15-minute windows, most recent last."""
windows = []
for i in range(self.num_windows):
window_end = now_ms - i * self.window_size * 1000
window_start = window_end - self.window_size * 1000
window_mid = (window_start + window_end) / 2
window_strikes = [s for s in strikes
if window_start <= s.timestamp < window_end]
if not window_strikes:
windows.append({
'mid_time': window_mid,
'count': 0,
'centroid_lat': None,
'centroid_lon': None,
'centroid_distance': None,
'centroid_bearing': None,
})
continue
avg_lat = sum(s.lat for s in window_strikes) / len(window_strikes)
avg_lon = sum(s.lon for s in window_strikes) / len(window_strikes)
avg_dist = sum(s.distance for s in window_strikes) / len(window_strikes)
to_rad = math.pi / 180
sin_sum = sum(math.sin(s.bearing * to_rad) for s in window_strikes)
cos_sum = sum(math.cos(s.bearing * to_rad) for s in window_strikes)
avg_bearing = (math.degrees(math.atan2(sin_sum, cos_sum)) + 360) % 360
windows.append({
'mid_time': window_mid,
'count': len(window_strikes),
'centroid_lat': avg_lat,
'centroid_lon': avg_lon,
'centroid_distance': avg_dist,
'centroid_bearing': avg_bearing,
})
windows.reverse()
return windows
def _detect_approach(self, windows, vessel_lat, vessel_lon):
"""Detect if storm is approaching. Returns (approaching, speed_mph, eta_min, r2)."""
populated = [w for w in windows if w['count'] > 0]
if len(populated) < self.min_windows_approach:
return False, None, None, None
times = [w['mid_time'] / 60000 for w in populated]
distances = [w['centroid_distance'] for w in populated]
bearings = [w['centroid_bearing'] for w in populated]
bearing_sd = _bearing_stddev(bearings)
if bearing_sd > self.max_bearing_stddev:
return False, None, None, None
slope, intercept, r2 = _linear_regression(times, distances)
if slope is None or r2 is None:
return False, None, None, None
if slope >= 0:
return False, None, None, None
if r2 < self.r2_downgrade:
return False, None, None, None
approaching = r2 >= self.r2_approaching
if not approaching:
return False, None, None, r2
speed_mph = abs(slope) * 60
current_distance = populated[-1]['centroid_distance']
if speed_mph < 0.1:
return True, 0, None, r2
eta = current_distance / speed_mph * 60
if eta > self.max_eta_minutes:
eta = self.max_eta_minutes
return True, speed_mph, eta, r2