Files
venus/dbus-lightning/blitzortung_client.py
dev 9756538f16 Initial commit: Venus OS boat addons monorepo
Organizes 11 projects for Cerbo GX/Venus OS into a single repository:
- axiom-nmea: Raymarine LightHouse protocol decoder
- dbus-generator-ramp: Generator current ramp controller
- dbus-lightning: Blitzortung lightning monitor
- dbus-meteoblue-forecast: Meteoblue weather forecast
- dbus-no-foreign-land: noforeignland.com tracking
- dbus-tides: Tide prediction from depth + harmonics
- dbus-vrm-history: VRM cloud history proxy
- dbus-windy-station: Windy.com weather upload
- mfd-custom-app: MFD app deployment package
- venus-html5-app: Custom Victron HTML5 app fork
- watermaker: Watermaker PLC control UI

Adds root README, .gitignore, project template, and per-project
.gitignore files. Sensitive config files excluded via .gitignore
with .example templates provided.

Made-with: Cursor
2026-03-16 17:04:16 +00:00

379 lines
11 KiB
Python

"""
Blitzortung.org WebSocket client with LZW decoding and automatic reconnection.
Uses only Python stdlib (socket, ssl, hashlib, struct) -- no external dependencies.
Implements the WebSocket protocol (RFC 6455) at the minimum level needed for
text-frame communication with the Blitzortung push service.
"""
import hashlib
import json
import logging
import os
import socket
import ssl
import struct
import threading
import time
from base64 import b64encode
# Module-level logger; BlitzortungClient reports connection and message
# diagnostics through it.
logger = logging.getLogger('BlitzortungClient')
# Values taken by BlitzortungClient.status.
STATUS_CONNECTED = 'connected'
STATUS_DISCONNECTED = 'disconnected'
STATUS_RECONNECTING = 'reconnecting'
# GUID defined by RFC 6455: it is appended to the client's Sec-WebSocket-Key
# before hashing to obtain the expected Sec-WebSocket-Accept value.
_WS_MAGIC = b'258EAFA5-E914-47DA-95CA-5AB4B141E427'
# WebSocket frame opcodes (low four bits of the first byte of a frame).
OPCODE_TEXT = 0x1
OPCODE_CLOSE = 0x8
OPCODE_PING = 0x9
OPCODE_PONG = 0xA
def decode_lzw(b):
    """Decompress an LZW-encoded Blitzortung message and parse it as JSON.

    Every character of ``b`` is one LZW symbol: code points below 256 stand
    for themselves, higher code points refer to dictionary entries that are
    rebuilt on the fly while decoding.

    Args:
        b: The encoded message as a non-empty string.

    Returns:
        The object produced by ``json.loads`` on the decompressed text.

    Raises:
        IndexError: If ``b`` is empty.
        json.JSONDecodeError: If the decompressed text is not valid JSON.
    """
    symbols = list(b)
    first = symbols[0]
    table = {}
    next_code = 256
    previous = first   # entry emitted by the previous step
    lead = first       # first character of that entry
    pieces = [first]
    for symbol in symbols[1:]:
        code = ord(symbol)
        if code < 256:
            entry = symbol
        else:
            # A code that is not in the table yet can only be the entry
            # about to be created: the previous entry plus its own first
            # character.
            entry = table.get(code) or previous + lead
        pieces.append(entry)
        lead = entry[0]
        table[next_code] = previous + lead
        next_code += 1
        previous = entry
    return json.loads(''.join(pieces))
def _parse_wss_url(url):
    """Split a ``ws://`` or ``wss://`` URL into its connection parameters.

    Args:
        url: URL of the form ``ws[s]://host[:port][/path]``.

    Returns:
        A tuple ``(host, port, path, is_tls)``.  ``port`` falls back to 443
        for ``wss://`` and 80 for ``ws://``; ``path`` falls back to ``'/'``
        and otherwise contains everything from the first ``/`` after the
        host onwards.

    Raises:
        ValueError: If the scheme is neither ``ws://`` nor ``wss://``, or
            the port is not an integer.
    """
    is_tls = url.startswith('wss://')
    if is_tls:
        remainder = url[6:]
    elif url.startswith('ws://'):
        remainder = url[5:]
    else:
        raise ValueError(f"Unsupported URL scheme: {url}")

    authority, slash, tail = remainder.partition('/')
    path = slash + tail if slash else '/'

    host, colon, port_text = authority.rpartition(':')
    if colon:
        port = int(port_text)
    else:
        host = authority
        port = 443 if is_tls else 80
    return host, port, path, is_tls
class _WebSocketConn:
    """Minimal RFC 6455 WebSocket client using stdlib only.

    Constructing an instance opens the TCP (and, for ``wss://`` URLs, TLS)
    connection and performs the HTTP upgrade handshake.  Afterwards
    ``send_text`` sends masked text frames and ``recv`` returns complete
    text messages, answering ping and close frames internally.

    Received bytes stay in an internal buffer until a whole frame is
    available, so a ``socket.timeout`` raised by ``recv`` never discards
    data: the caller can simply call ``recv`` again.
    """

    _OPCODE_CONTINUATION = 0x0
    _RECV_CHUNK = 4096

    def __init__(self, url, timeout=30):
        """Connect to *url* and complete the WebSocket handshake.

        Args:
            url: ``ws://`` or ``wss://`` URL of the server.
            timeout: Socket timeout in seconds, used for connecting and for
                every later send/receive call.

        Raises:
            ValueError: If the URL cannot be parsed.
            ConnectionError: If the server rejects or breaks the handshake.
            OSError: On socket or TLS errors.
        """
        self._sock = None
        self._closed = False
        # Bytes received from the socket but not yet consumed as frames.
        self._rbuf = bytearray()
        # Payload of the fragments of a message whose final frame has not
        # arrived yet.
        self._fragments = bytearray()
        host, port, path, is_tls = _parse_wss_url(url)
        try:
            self._connect(host, port, path, is_tls, timeout)
        except BaseException:
            # Do not leak the socket when the TLS wrap or handshake fails.
            self._closed = True
            self._close_socket()
            raise

    def _connect(self, host, port, path, is_tls, timeout):
        """Open the socket, wrap it in TLS if requested, then upgrade."""
        raw = socket.create_connection((host, port), timeout=timeout)
        # Track the socket right away so __init__ can close it if a later
        # step fails.
        self._sock = raw
        raw.settimeout(timeout)
        if is_tls:
            ctx = ssl.create_default_context()
            # NOTE(review): certificate and hostname verification are
            # disabled, so the server is not authenticated and the link can
            # be intercepted.  Kept as-is because enabling verification may
            # break devices without a usable CA store -- confirm and enable
            # if possible.
            ctx.check_hostname = False
            ctx.verify_mode = ssl.CERT_NONE
            self._sock = ctx.wrap_socket(raw, server_hostname=host)
        default_port = 443 if is_tls else 80
        # The Host header must carry the port when it is not the default.
        host_header = host if port == default_port else f"{host}:{port}"
        self._handshake(host, host_header, path)

    def _handshake(self, host, host_header, path):
        """Send the HTTP upgrade request and validate the server response.

        Raises:
            ConnectionError: If the connection closes early, the status is
                not 101, or Sec-WebSocket-Accept does not match the key.
        """
        key_bytes = b64encode(os.urandom(16))
        key = key_bytes.decode('ascii')
        request = (
            f"GET {path} HTTP/1.1\r\n"
            f"Host: {host_header}\r\n"
            "Upgrade: websocket\r\n"
            "Connection: Upgrade\r\n"
            f"Sec-WebSocket-Key: {key}\r\n"
            "Sec-WebSocket-Version: 13\r\n"
            f"Origin: https://{host}\r\n"
            "\r\n"
        )
        self._sock.sendall(request.encode('ascii'))

        response = bytearray()
        while b'\r\n\r\n' not in response:
            chunk = self._sock.recv(self._RECV_CHUNK)
            if not chunk:
                raise ConnectionError("Connection closed during handshake")
            response += chunk
        head, _, leftover = bytes(response).partition(b'\r\n\r\n')

        header_lines = head.decode('ascii', errors='replace').split('\r\n')
        status_line = header_lines[0]
        status_fields = status_line.split()
        # Compare the status-code field itself; a substring test would also
        # accept e.g. a reason phrase that happens to contain "101".
        if len(status_fields) < 2 or status_fields[1] != '101':
            raise ConnectionError(f"WebSocket handshake failed: {status_line}")

        expected_accept = b64encode(
            hashlib.sha1(key_bytes + _WS_MAGIC).digest()
        ).decode('ascii')
        accept = None
        for line in header_lines[1:]:
            name, colon, value = line.partition(':')
            if colon and name.strip().lower() == 'sec-websocket-accept':
                accept = value.strip()
                break
        if accept != expected_accept:
            raise ConnectionError(
                "WebSocket handshake failed: invalid Sec-WebSocket-Accept header"
            )

        # Anything after the blank line already belongs to the frame stream.
        self._rbuf += leftover

    def _send_frame(self, opcode, payload):
        """Send one final, masked frame with the given opcode and payload."""
        frame = bytearray([0x80 | opcode])
        length = len(payload)
        if length < 126:
            frame.append(0x80 | length)
        elif length < 65536:
            frame.append(0x80 | 126)
            frame += struct.pack('!H', length)
        else:
            frame.append(0x80 | 127)
            frame += struct.pack('!Q', length)
        mask = os.urandom(4)
        frame += mask
        frame += bytes(b ^ mask[i % 4] for i, b in enumerate(payload))
        self._sock.sendall(frame)

    def send_text(self, text):
        """Send a masked text frame."""
        self._send_frame(OPCODE_TEXT, text.encode('utf-8'))

    def _fill(self, n):
        """Read from the socket until at least n bytes are buffered.

        Data read so far stays in the buffer if the socket times out.

        Raises:
            ConnectionError: If the peer closes the connection.
        """
        while len(self._rbuf) < n:
            chunk = self._sock.recv(self._RECV_CHUNK)
            if not chunk:
                raise ConnectionError("Connection closed")
            self._rbuf += chunk

    def _recv_exact(self, n):
        """Read exactly n bytes from the connection."""
        self._fill(n)
        data = bytes(self._rbuf[:n])
        del self._rbuf[:n]
        return data

    def _read_frame(self):
        """Return ``(fin, opcode, payload)`` for the next frame.

        The frame is removed from the buffer only once it has been received
        completely, so an exception raised while waiting loses nothing.
        """
        self._fill(2)
        b0, b1 = self._rbuf[0], self._rbuf[1]
        length = b1 & 0x7F
        offset = 2
        if length == 126:
            self._fill(4)
            length = struct.unpack_from('!H', self._rbuf, 2)[0]
            offset = 4
        elif length == 127:
            self._fill(10)
            length = struct.unpack_from('!Q', self._rbuf, 2)[0]
            offset = 10
        mask = None
        if b1 & 0x80:
            self._fill(offset + 4)
            mask = bytes(self._rbuf[offset:offset + 4])
            offset += 4
        end = offset + length
        self._fill(end)
        payload = bytes(self._rbuf[offset:end])
        del self._rbuf[:end]
        if mask is not None:
            payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))
        return bool(b0 & 0x80), b0 & 0x0F, payload

    def recv(self):
        """Receive next text message, handling control frames internally.

        Returns the text payload as a string, or None on close.
        Raises socket.timeout on timeout; it is safe to call recv() again
        afterwards because partially received data is retained.
        """
        while True:
            fin, opcode, payload = self._read_frame()
            if opcode == OPCODE_CLOSE:
                self._send_close()
                return None
            if opcode == OPCODE_PING:
                self._send_pong(payload)
                continue
            if opcode == OPCODE_PONG:
                continue
            if opcode == OPCODE_TEXT or opcode == self._OPCODE_CONTINUATION:
                self._fragments += payload
                if fin:
                    text = self._fragments.decode('utf-8', errors='replace')
                    self._fragments = bytearray()
                    return text
            # Frames with any other opcode are ignored.

    def _send_close(self):
        """Send a close frame (best effort)."""
        try:
            self._send_frame(OPCODE_CLOSE, b'')
        except OSError:
            # The connection is going away anyway; nothing useful to do.
            pass

    def _send_pong(self, data):
        """Send a pong frame echoing the ping payload (best effort)."""
        try:
            self._send_frame(OPCODE_PONG, data)
        except OSError:
            # A dead connection will surface on the next read.
            pass

    def _close_socket(self):
        """Shut down and close the underlying socket, ignoring errors."""
        sock = self._sock
        if sock is None:
            return
        try:
            sock.shutdown(socket.SHUT_RDWR)
        except OSError:
            pass  # not connected (any more)
        try:
            sock.close()
        except OSError:
            pass

    def close(self):
        """Send a close frame and release the socket; safe to call twice."""
        if self._closed:
            return
        self._closed = True
        self._send_close()
        self._close_socket()
class BlitzortungClient:
    """Persistent WebSocket connection to Blitzortung with auto-reconnect.

    A background thread connects to one of ``servers``, sends ``init_msg``
    and passes every decoded strike to ``on_strike``.  When the connection
    fails or drops, the thread waits with exponential backoff and tries the
    next server.  ``status`` always holds one of the ``STATUS_*`` values.
    """

    def __init__(self, servers, init_msg, on_strike, base_delay=1.0, max_delay=60.0):
        """Store the configuration; no connection is made until start().

        Args:
            servers: Sequence of ``ws://``/``wss://`` URLs, tried round-robin.
            init_msg: Text sent to the server right after connecting.
            on_strike: Callable invoked as ``on_strike(lat, lon, ts_ms)``
                with float coordinates and an integer millisecond timestamp.
            base_delay: Initial reconnect delay in seconds.
            max_delay: Upper bound for the reconnect delay in seconds.
        """
        self.servers = servers
        self.init_msg = init_msg
        self.on_strike = on_strike
        self.base_delay = base_delay
        self.max_delay = max_delay
        self.status = STATUS_DISCONNECTED
        self._server_index = 0
        self._delay = base_delay
        self._running = False
        self._thread = None
        self._ws = None
        self._lock = threading.Lock()
        # Set by stop() so that a pending reconnect wait ends immediately.
        self._stop_event = threading.Event()

    def start(self):
        """Start the background connection thread (no-op if running)."""
        if self._running:
            return
        self._running = True
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run_loop, daemon=True)
        self._thread.start()
        logger.info("Client started")

    def stop(self):
        """Stop the background thread and close the current connection."""
        self._running = False
        self._stop_event.set()
        with self._lock:
            if self._ws:
                try:
                    self._ws.close()
                except Exception:
                    logger.debug("Error while closing WebSocket", exc_info=True)
        thread = self._thread
        # Joining the current thread would raise RuntimeError; that happens
        # when stop() is called from inside the on_strike callback.
        if thread and thread is not threading.current_thread():
            thread.join(timeout=5)
        self.status = STATUS_DISCONNECTED
        logger.info("Client stopped")

    def _run_loop(self):
        """Thread body: connect, and reconnect with backoff until stopped."""
        if not self.servers:
            logger.error("No servers configured")
            self._running = False
            return
        while self._running:
            url = self.servers[self._server_index]
            try:
                self._connect(url)
            except Exception as e:
                logger.warning("Connection to %s failed: %s", url, e)
            if not self._running:
                break
            self.status = STATUS_RECONNECTING
            logger.info("Reconnecting in %.0fs...", self._delay)
            # Unlike time.sleep(), this returns as soon as stop() is called.
            self._stop_event.wait(self._delay)
            self._delay = min(self._delay * 2, self.max_delay)
            self._server_index = (self._server_index + 1) % len(self.servers)

    def _connect(self, url):
        """Run one connection: connect, send init, read until it ends.

        Returns normally when the connection is closed or lost, or when the
        client is stopped.  Raises if connecting or sending the init message
        fails.  The connection is always closed on exit.
        """
        logger.info("Connecting to %s", url)
        try:
            ws = _WebSocketConn(url, timeout=30)
        except Exception as e:
            logger.warning("WebSocket connect error: %s", e)
            raise
        with self._lock:
            self._ws = ws
        try:
            self.status = STATUS_CONNECTED
            # A successful connection resets the backoff.
            self._delay = self.base_delay
            logger.info("Connected to %s", url)
            try:
                ws.send_text(self.init_msg)
            except Exception as e:
                logger.error("Failed to send init: %s", e)
                raise
            logger.info("Sent init message: %s", self.init_msg)
            while self._running:
                try:
                    raw = ws.recv()
                except socket.timeout:
                    # No traffic within the timeout; re-check _running.
                    continue
                except (ConnectionError, OSError) as e:
                    logger.warning("WebSocket recv error: %s", e)
                    break
                if raw is None:
                    logger.warning("WebSocket connection closed by server")
                    break
                self._handle_message(raw)
        finally:
            self.status = STATUS_DISCONNECTED
            with self._lock:
                self._ws = None
            ws.close()

    @staticmethod
    def _to_epoch_ms(strike_time):
        """Convert a strike timestamp to integer milliseconds.

        Values above 1e15 are treated as nanoseconds, values above 1e12 as
        milliseconds, anything else as seconds.  ``None`` yields the current
        time.
        """
        if strike_time is None:
            return int(time.time() * 1000)
        if strike_time > 1e15:
            return int(strike_time / 1e6)
        if strike_time > 1e12:
            return int(strike_time)
        return int(strike_time * 1000)

    def _handle_message(self, raw):
        """Decode one raw message and report the strike via on_strike.

        Messages that cannot be decoded, are not JSON objects, or lack
        ``lat``/``lon`` are dropped.  Exceptions raised by the callback are
        logged and do not interrupt the receive loop.
        """
        try:
            data = decode_lzw(raw)
        except json.JSONDecodeError:
            return
        except Exception as e:
            logger.debug("Message decode error: %s", e)
            return
        if not isinstance(data, dict):
            return
        lat = data.get('lat')
        lon = data.get('lon')
        if lat is None or lon is None:
            return
        try:
            ts_ms = self._to_epoch_ms(data.get('time'))
            lat = float(lat)
            lon = float(lon)
        except (TypeError, ValueError, OverflowError) as e:
            logger.debug("Message decode error: %s", e)
            return
        try:
            self.on_strike(lat, lon, ts_ms)
        except Exception:
            # A failing callback is a bug in the consumer, not a decode
            # problem, so report it visibly with a traceback.
            logger.exception("on_strike callback failed")