Organizes 11 projects for Cerbo GX/Venus OS into a single repository: - axiom-nmea: Raymarine LightHouse protocol decoder - dbus-generator-ramp: Generator current ramp controller - dbus-lightning: Blitzortung lightning monitor - dbus-meteoblue-forecast: Meteoblue weather forecast - dbus-no-foreign-land: noforeignland.com tracking - dbus-tides: Tide prediction from depth + harmonics - dbus-vrm-history: VRM cloud history proxy - dbus-windy-station: Windy.com weather upload - mfd-custom-app: MFD app deployment package - venus-html5-app: Custom Victron HTML5 app fork - watermaker: Watermaker PLC control UI Adds root README, .gitignore, project template, and per-project .gitignore files. Sensitive config files excluded via .gitignore with .example templates provided. Made-with: Cursor
224 lines
7.8 KiB
Python
224 lines
7.8 KiB
Python
"""
|
|
Lightning analysis engine: windowed aggregation, approach detection, ETA.
|
|
"""
|
|
|
|
import math
|
|
import time
|
|
|
|
from strike_buffer import haversine_miles, bearing_degrees
|
|
|
|
|
|
def _linear_regression(xs, ys):
|
|
"""Simple linear regression returning (slope, intercept, r_squared)."""
|
|
n = len(xs)
|
|
if n < 2:
|
|
return None, None, None
|
|
|
|
sum_x = sum(xs)
|
|
sum_y = sum(ys)
|
|
sum_xy = sum(x * y for x, y in zip(xs, ys))
|
|
sum_x2 = sum(x * x for x in xs)
|
|
sum_y2 = sum(y * y for y in ys)
|
|
|
|
denom = n * sum_x2 - sum_x * sum_x
|
|
if abs(denom) < 1e-12:
|
|
return None, None, None
|
|
|
|
slope = (n * sum_xy - sum_x * sum_y) / denom
|
|
intercept = (sum_y - slope * sum_x) / n
|
|
|
|
ss_tot = sum_y2 - (sum_y * sum_y) / n
|
|
if abs(ss_tot) < 1e-12:
|
|
return slope, intercept, 1.0
|
|
|
|
ss_res = sum((y - (slope * x + intercept)) ** 2 for x, y in zip(xs, ys))
|
|
r_squared = max(0.0, 1.0 - ss_res / ss_tot)
|
|
|
|
return slope, intercept, r_squared
|
|
|
|
|
|
def _bearing_stddev(bearings):
|
|
"""Circular standard deviation of bearings in degrees."""
|
|
if len(bearings) < 2:
|
|
return 0.0
|
|
to_rad = math.pi / 180
|
|
sin_sum = sum(math.sin(b * to_rad) for b in bearings)
|
|
cos_sum = sum(math.cos(b * to_rad) for b in bearings)
|
|
n = len(bearings)
|
|
r = math.sqrt((sin_sum / n) ** 2 + (cos_sum / n) ** 2)
|
|
r = min(r, 1.0)
|
|
if r >= 1.0:
|
|
return 0.0
|
|
return math.degrees(math.sqrt(-2.0 * math.log(r)))
|
|
|
|
|
|
def _bearing_to_cardinal(bearing):
|
|
"""Convert bearing to cardinal/intercardinal label."""
|
|
dirs = ['N', 'NE', 'E', 'SE', 'S', 'SW', 'W', 'NW']
|
|
idx = int(((bearing + 22.5) % 360) / 45)
|
|
return dirs[idx]
|
|
|
|
|
|
class AnalysisEngine:
|
|
"""Processes strike buffer into a summary for frontend display."""
|
|
|
|
def __init__(self, config):
|
|
self.window_size = config.get('window_size', 900)
|
|
self.num_windows = config.get('num_windows', 4)
|
|
self.min_strikes_active = config.get('min_strikes_active', 3)
|
|
self.min_windows_approach = config.get('min_windows_approach', 3)
|
|
self.r2_approaching = config.get('r2_approaching', 0.7)
|
|
self.r2_downgrade = config.get('r2_downgrade', 0.5)
|
|
self.max_bearing_stddev = config.get('max_bearing_stddev', 30.0)
|
|
self.max_eta_minutes = config.get('max_eta_minutes', 240)
|
|
|
|
def analyze(self, strikes, vessel_lat, vessel_lon):
|
|
"""Produce summary dict from current strike list and vessel position."""
|
|
now = time.time()
|
|
now_ms = now * 1000
|
|
|
|
recent_cutoff = now_ms - self.window_size * 1000
|
|
recent = [s for s in strikes if s.timestamp >= recent_cutoff]
|
|
|
|
active = len(recent) >= self.min_strikes_active
|
|
|
|
if not active:
|
|
return {
|
|
'active': False,
|
|
'strike_count_15m': len(recent),
|
|
'nearest_distance': None,
|
|
'centroid_bearing': None,
|
|
'centroid_distance': None,
|
|
'cardinal': None,
|
|
'approaching': False,
|
|
'approach_speed': None,
|
|
'eta_minutes': None,
|
|
'confidence': None,
|
|
}
|
|
|
|
nearest_distance = min(s.distance for s in recent) if recent else None
|
|
|
|
windows = self._build_windows(strikes, now_ms)
|
|
|
|
current_window = windows[-1] if windows else None
|
|
centroid_bearing = None
|
|
centroid_distance = None
|
|
cardinal = None
|
|
|
|
if current_window and current_window['count'] > 0:
|
|
centroid_bearing = current_window['centroid_bearing']
|
|
centroid_distance = current_window['centroid_distance']
|
|
cardinal = _bearing_to_cardinal(centroid_bearing)
|
|
|
|
approaching, approach_speed, eta_minutes, confidence = \
|
|
self._detect_approach(windows, vessel_lat, vessel_lon)
|
|
|
|
if centroid_distance is not None:
|
|
centroid_distance = round(centroid_distance, 1)
|
|
if nearest_distance is not None:
|
|
nearest_distance = round(nearest_distance, 1)
|
|
if centroid_bearing is not None:
|
|
centroid_bearing = round(centroid_bearing, 0)
|
|
if approach_speed is not None:
|
|
approach_speed = round(approach_speed, 0)
|
|
if eta_minutes is not None:
|
|
eta_minutes = round(eta_minutes, 0)
|
|
if confidence is not None:
|
|
confidence = round(confidence, 2)
|
|
|
|
return {
|
|
'active': True,
|
|
'strike_count_15m': len(recent),
|
|
'nearest_distance': nearest_distance,
|
|
'centroid_bearing': centroid_bearing,
|
|
'centroid_distance': centroid_distance,
|
|
'cardinal': cardinal,
|
|
'approaching': approaching,
|
|
'approach_speed': approach_speed,
|
|
'eta_minutes': eta_minutes,
|
|
'confidence': confidence,
|
|
}
|
|
|
|
def _build_windows(self, strikes, now_ms):
|
|
"""Divide strikes into 15-minute windows, most recent last."""
|
|
windows = []
|
|
for i in range(self.num_windows):
|
|
window_end = now_ms - i * self.window_size * 1000
|
|
window_start = window_end - self.window_size * 1000
|
|
window_mid = (window_start + window_end) / 2
|
|
|
|
window_strikes = [s for s in strikes
|
|
if window_start <= s.timestamp < window_end]
|
|
|
|
if not window_strikes:
|
|
windows.append({
|
|
'mid_time': window_mid,
|
|
'count': 0,
|
|
'centroid_lat': None,
|
|
'centroid_lon': None,
|
|
'centroid_distance': None,
|
|
'centroid_bearing': None,
|
|
})
|
|
continue
|
|
|
|
avg_lat = sum(s.lat for s in window_strikes) / len(window_strikes)
|
|
avg_lon = sum(s.lon for s in window_strikes) / len(window_strikes)
|
|
avg_dist = sum(s.distance for s in window_strikes) / len(window_strikes)
|
|
|
|
to_rad = math.pi / 180
|
|
sin_sum = sum(math.sin(s.bearing * to_rad) for s in window_strikes)
|
|
cos_sum = sum(math.cos(s.bearing * to_rad) for s in window_strikes)
|
|
avg_bearing = (math.degrees(math.atan2(sin_sum, cos_sum)) + 360) % 360
|
|
|
|
windows.append({
|
|
'mid_time': window_mid,
|
|
'count': len(window_strikes),
|
|
'centroid_lat': avg_lat,
|
|
'centroid_lon': avg_lon,
|
|
'centroid_distance': avg_dist,
|
|
'centroid_bearing': avg_bearing,
|
|
})
|
|
|
|
windows.reverse()
|
|
return windows
|
|
|
|
def _detect_approach(self, windows, vessel_lat, vessel_lon):
|
|
"""Detect if storm is approaching. Returns (approaching, speed_mph, eta_min, r2)."""
|
|
populated = [w for w in windows if w['count'] > 0]
|
|
if len(populated) < self.min_windows_approach:
|
|
return False, None, None, None
|
|
|
|
times = [w['mid_time'] / 60000 for w in populated]
|
|
distances = [w['centroid_distance'] for w in populated]
|
|
bearings = [w['centroid_bearing'] for w in populated]
|
|
|
|
bearing_sd = _bearing_stddev(bearings)
|
|
if bearing_sd > self.max_bearing_stddev:
|
|
return False, None, None, None
|
|
|
|
slope, intercept, r2 = _linear_regression(times, distances)
|
|
if slope is None or r2 is None:
|
|
return False, None, None, None
|
|
|
|
if slope >= 0:
|
|
return False, None, None, None
|
|
|
|
if r2 < self.r2_downgrade:
|
|
return False, None, None, None
|
|
|
|
approaching = r2 >= self.r2_approaching
|
|
if not approaching:
|
|
return False, None, None, r2
|
|
|
|
speed_mph = abs(slope) * 60
|
|
current_distance = populated[-1]['centroid_distance']
|
|
|
|
if speed_mph < 0.1:
|
|
return True, 0, None, r2
|
|
|
|
eta = current_distance / speed_mph * 60
|
|
if eta > self.max_eta_minutes:
|
|
eta = self.max_eta_minutes
|
|
|
|
return True, speed_mph, eta, r2
|