Organizes 11 projects for Cerbo GX/Venus OS into a single repository: - axiom-nmea: Raymarine LightHouse protocol decoder - dbus-generator-ramp: Generator current ramp controller - dbus-lightning: Blitzortung lightning monitor - dbus-meteoblue-forecast: Meteoblue weather forecast - dbus-no-foreign-land: noforeignland.com tracking - dbus-tides: Tide prediction from depth + harmonics - dbus-vrm-history: VRM cloud history proxy - dbus-windy-station: Windy.com weather upload - mfd-custom-app: MFD app deployment package - venus-html5-app: Custom Victron HTML5 app fork - watermaker: Watermaker PLC control UI Adds root README, .gitignore, project template, and per-project .gitignore files. Sensitive config files excluded via .gitignore with .example templates provided. Made-with: Cursor
379 lines
11 KiB
Python
379 lines
11 KiB
Python
"""
|
|
Blitzortung.org WebSocket client with LZW decoding and automatic reconnection.
|
|
|
|
Uses only Python stdlib (socket, ssl, hashlib, struct) -- no external dependencies.
|
|
Implements the WebSocket protocol (RFC 6455) at the minimum level needed for
|
|
text-frame communication with the Blitzortung push service.
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import logging
|
|
import os
|
|
import socket
|
|
import ssl
|
|
import struct
|
|
import threading
|
|
import time
|
|
from base64 import b64encode
|
|
|
|
logger = logging.getLogger('BlitzortungClient')
|
|
|
|
STATUS_CONNECTED = 'connected'
|
|
STATUS_DISCONNECTED = 'disconnected'
|
|
STATUS_RECONNECTING = 'reconnecting'
|
|
|
|
_WS_MAGIC = b'258EAFA5-E914-47DA-95CA-5AB4B141E427'
|
|
|
|
OPCODE_TEXT = 0x1
|
|
OPCODE_CLOSE = 0x8
|
|
OPCODE_PING = 0x9
|
|
OPCODE_PONG = 0xA
|
|
|
|
|
|
def decode_lzw(b):
    """Decode an LZW-obfuscated Blitzortung message and parse it as JSON.

    Symbols whose code point is below 256 stand for themselves. Higher code
    points refer to entries of a dictionary that is rebuilt while decoding,
    with the first new entry stored under code 256.

    Returns whatever ``json.loads`` produces for the decoded text.
    Raises IndexError when ``b`` is empty and json.JSONDecodeError when the
    decoded text is not valid JSON.
    """
    first_code = 256
    symbols = list(b)
    table = {}

    previous = symbols[0]
    lead = previous
    pieces = [previous]
    next_code = first_code

    for symbol in symbols[1:]:
        code = ord(symbol)
        if code < first_code:
            entry = symbol
        elif table.get(code):
            entry = table[code]
        else:
            # Code not in the table yet: it can only be the entry that is
            # about to be created (previous entry + its own first character).
            entry = previous + lead
        pieces.append(entry)

        lead = entry[0]
        table[next_code] = previous + lead
        next_code += 1
        previous = entry

    return json.loads(''.join(pieces))
|
|
|
|
|
|
def _parse_wss_url(url):
|
|
"""Parse 'wss://host[:port][/path]' into (host, port, path)."""
|
|
if url.startswith('wss://'):
|
|
rest = url[6:]
|
|
elif url.startswith('ws://'):
|
|
rest = url[5:]
|
|
else:
|
|
raise ValueError(f"Unsupported URL scheme: {url}")
|
|
|
|
is_tls = url.startswith('wss://')
|
|
default_port = 443 if is_tls else 80
|
|
|
|
slash = rest.find('/')
|
|
if slash >= 0:
|
|
host_part = rest[:slash]
|
|
path = rest[slash:]
|
|
else:
|
|
host_part = rest
|
|
path = '/'
|
|
|
|
if ':' in host_part:
|
|
host, port_str = host_part.rsplit(':', 1)
|
|
port = int(port_str)
|
|
else:
|
|
host = host_part
|
|
port = default_port
|
|
|
|
return host, port, path, is_tls
|
|
|
|
|
|
class _WebSocketConn:
    """Minimal RFC 6455 WebSocket client using stdlib only.

    The constructor connects, optionally wraps the socket in TLS and performs
    the HTTP upgrade handshake. Afterwards ``send_text`` / ``recv`` exchange
    text messages; ping, pong and close frames are handled inside ``recv``.

    All incoming bytes pass through an internal buffer, so a ``socket.timeout``
    raised in the middle of a frame does not lose data: calling ``recv`` again
    resumes where the previous call stopped.
    """

    # Upper bound on the HTTP upgrade response we are willing to buffer.
    _MAX_HANDSHAKE_BYTES = 65536
    # Upper bound on a single frame's payload; guards against a peer
    # announcing an absurd length and exhausting memory.
    _MAX_FRAME_BYTES = 16 * 1024 * 1024
    # Size of each socket read.
    _READ_CHUNK = 65536

    def __init__(self, url, timeout=30, verify_tls=False):
        """Connect to ``url`` ('ws://' or 'wss://') and complete the handshake.

        ``timeout`` is applied to connecting and to every later socket
        operation. ``verify_tls`` enables certificate and hostname
        verification for 'wss://' URLs; it defaults to False to keep the
        historical behaviour of this client.

        Raises ValueError for an unsupported URL, OSError for socket/TLS
        failures and ConnectionError when the handshake is rejected.
        """
        self._sock = None
        self._closed = False
        self._rx = bytearray()         # received but not yet consumed bytes
        self._fragments = bytearray()  # payload of an unfinished message
        host, port, path, is_tls = _parse_wss_url(url)
        self._connect(host, port, path, is_tls, timeout, verify_tls)

    def _connect(self, host, port, path, is_tls, timeout, verify_tls=False):
        """Open the socket, wrap it in TLS if requested, then handshake.

        The socket is closed again if any step fails, so a failed connection
        attempt does not leak a file descriptor.
        """
        raw = socket.create_connection((host, port), timeout=timeout)
        try:
            raw.settimeout(timeout)

            if is_tls:
                ctx = ssl.create_default_context()
                if not verify_tls:
                    # NOTE(review): without verification the peer is not
                    # authenticated and the link is open to interception.
                    # Kept as the default only for backward compatibility;
                    # pass verify_tls=True where a CA bundle is available.
                    ctx.check_hostname = False
                    ctx.verify_mode = ssl.CERT_NONE
                self._sock = ctx.wrap_socket(raw, server_hostname=host)
            else:
                self._sock = raw

            self._handshake(host, port, path, is_tls)
        except Exception:
            sock = self._sock if self._sock is not None else raw
            try:
                sock.close()
            except OSError:
                pass
            raise

    def _handshake(self, host, port, path, is_tls):
        """Send the HTTP upgrade request and validate the server's answer.

        Raises ConnectionError when the connection closes early, the response
        is oversized, the status code is not 101, or the
        Sec-WebSocket-Accept header does not match the key we sent.
        """
        key_bytes = b64encode(os.urandom(16))
        key = key_bytes.decode('ascii')

        # The Host header carries the port only when it is not the default.
        default_port = 443 if is_tls else 80
        host_header = host if port == default_port else f"{host}:{port}"

        request = (
            f"GET {path} HTTP/1.1\r\n"
            f"Host: {host_header}\r\n"
            f"Upgrade: websocket\r\n"
            f"Connection: Upgrade\r\n"
            f"Sec-WebSocket-Key: {key}\r\n"
            f"Sec-WebSocket-Version: 13\r\n"
            f"Origin: https://{host}\r\n"
            f"\r\n"
        )
        self._sock.sendall(request.encode('ascii'))

        response = bytearray()
        while b'\r\n\r\n' not in response:
            if len(response) > self._MAX_HANDSHAKE_BYTES:
                raise ConnectionError("WebSocket handshake response too large")
            chunk = self._sock.recv(4096)
            if not chunk:
                raise ConnectionError("Connection closed during handshake")
            response += chunk

        head, _, leftover = bytes(response).partition(b'\r\n\r\n')
        lines = head.decode('ascii', errors='replace').split('\r\n')

        status_line = lines[0]
        status_parts = status_line.split(None, 2)
        if len(status_parts) < 2 or status_parts[1] != '101':
            raise ConnectionError(f"WebSocket handshake failed: {status_line}")

        headers = {}
        for line in lines[1:]:
            name, sep, value = line.partition(':')
            if sep:
                headers[name.strip().lower()] = value.strip()

        expected_accept = b64encode(
            hashlib.sha1(key_bytes + _WS_MAGIC).digest()
        ).decode('ascii')
        if headers.get('sec-websocket-accept') != expected_accept:
            raise ConnectionError(
                "WebSocket handshake failed: invalid Sec-WebSocket-Accept"
            )

        # Anything the server sent right behind the headers already belongs
        # to the frame stream and must not be dropped.
        self._rx.extend(leftover)

    def _send_frame(self, opcode, payload):
        """Send one final (FIN=1), masked frame with the given opcode."""
        frame = bytearray()
        frame.append(0x80 | opcode)

        length = len(payload)
        if length < 126:
            frame.append(0x80 | length)
        elif length < 65536:
            frame.append(0x80 | 126)
            frame.extend(struct.pack('!H', length))
        else:
            frame.append(0x80 | 127)
            frame.extend(struct.pack('!Q', length))

        # Client-to-server frames are always masked with a fresh random key.
        mask = os.urandom(4)
        frame.extend(mask)
        frame.extend(b ^ mask[i % 4] for i, b in enumerate(payload))

        self._sock.sendall(frame)

    def send_text(self, text):
        """Send ``text`` as a single masked text frame (UTF-8 encoded)."""
        self._send_frame(OPCODE_TEXT, text.encode('utf-8'))

    def _fill(self, n):
        """Read from the socket until at least ``n`` bytes are buffered.

        Nothing is consumed, so a timeout raised here leaves the bytes
        received so far in the buffer. Raises ConnectionError if the peer
        closes the connection.
        """
        while len(self._rx) < n:
            chunk = self._sock.recv(self._READ_CHUNK)
            if not chunk:
                raise ConnectionError("Connection closed")
            self._rx += chunk

    def _recv_exact(self, n):
        """Read exactly n bytes from the connection and consume them."""
        self._fill(n)
        data = bytes(self._rx[:n])
        del self._rx[:n]
        return data

    def recv(self):
        """Receive next text message, handling control frames internally.

        Returns the text payload as a string, or None on close.
        Raises socket.timeout on timeout; the call can simply be repeated,
        because partially received frames and message fragments are kept.
        Raises ConnectionError if the connection drops or a frame announces
        a payload larger than the allowed maximum.
        """
        while True:
            # Parse the frame in place and only consume it once it has been
            # received completely.
            self._fill(2)
            b0, b1 = self._rx[0], self._rx[1]
            fin = b0 & 0x80
            opcode = b0 & 0x0F
            masked = b1 & 0x80
            length = b1 & 0x7F
            offset = 2

            if length == 126:
                self._fill(offset + 2)
                length = struct.unpack_from('!H', self._rx, offset)[0]
                offset += 2
            elif length == 127:
                self._fill(offset + 8)
                length = struct.unpack_from('!Q', self._rx, offset)[0]
                offset += 8

            if length > self._MAX_FRAME_BYTES:
                raise ConnectionError(f"WebSocket frame too large: {length} bytes")

            mask = None
            if masked:
                self._fill(offset + 4)
                mask = bytes(self._rx[offset:offset + 4])
                offset += 4

            self._fill(offset + length)
            payload = bytes(self._rx[offset:offset + length])
            del self._rx[:offset + length]

            if mask is not None:
                payload = bytes(b ^ mask[i % 4] for i, b in enumerate(payload))

            if opcode == OPCODE_CLOSE:
                self._send_close()
                return None
            if opcode == OPCODE_PING:
                self._send_pong(payload)
                continue
            if opcode == OPCODE_PONG:
                continue
            if opcode == OPCODE_TEXT or opcode == 0x0:
                if opcode == OPCODE_TEXT:
                    # A text frame starts a new message; drop stale fragments.
                    self._fragments = bytearray()
                self._fragments.extend(payload)
                if fin:
                    message = bytes(self._fragments)
                    self._fragments = bytearray()
                    return message.decode('utf-8', errors='replace')
            # Frames with any other opcode are ignored.

    def _send_close(self):
        """Send a close frame; failures are ignored (best effort)."""
        try:
            self._send_frame(OPCODE_CLOSE, b'')
        except Exception:
            # The connection is going away anyway; nothing useful to do.
            pass

    def _send_pong(self, data):
        """Send a pong frame echoing the ping payload (best effort)."""
        try:
            self._send_frame(OPCODE_PONG, data)
        except Exception:
            # A broken connection will surface on the next read instead.
            pass

    def close(self):
        """Close the connection. Safe to call repeatedly; never raises."""
        if self._closed:
            return
        self._closed = True
        self._send_close()
        # Teardown is best effort: the socket may already be gone.
        try:
            self._sock.shutdown(socket.SHUT_RDWR)
        except Exception:
            pass
        try:
            self._sock.close()
        except Exception:
            pass
|
|
|
|
|
|
class BlitzortungClient:
    """Persistent WebSocket connection to Blitzortung with auto-reconnect.

    A background daemon thread connects to one of ``servers``, sends
    ``init_msg`` and passes every decoded strike to ``on_strike``. When the
    connection fails or drops, the next server in the list is tried after an
    exponentially growing delay (``base_delay`` doubling up to ``max_delay``).
    The delay is reset to ``base_delay`` each time a connection succeeds.
    """

    def __init__(self, servers, init_msg, on_strike, base_delay=1.0, max_delay=60.0):
        """Store the configuration; no connection is made until ``start()``.

        servers    -- list of 'ws://' / 'wss://' URLs, tried round-robin
        init_msg   -- text sent to the server right after connecting
        on_strike  -- callable invoked as on_strike(lat, lon, ts_ms) from
                      the worker thread
        base_delay -- initial reconnect delay in seconds
        max_delay  -- upper bound for the reconnect delay in seconds
        """
        self.servers = servers
        self.init_msg = init_msg
        self.on_strike = on_strike
        self.base_delay = base_delay
        self.max_delay = max_delay

        self.status = STATUS_DISCONNECTED
        self._server_index = 0
        self._delay = base_delay
        self._running = False
        self._thread = None
        self._ws = None
        self._lock = threading.Lock()
        # Set by stop() so a pending reconnect delay ends immediately.
        self._stop_event = threading.Event()

    def start(self):
        """Start the background worker thread; a no-op if already running."""
        if self._running:
            return
        self._running = True
        self._stop_event.clear()
        self._thread = threading.Thread(target=self._run_loop, daemon=True)
        self._thread.start()
        logger.info("Client started")

    def stop(self):
        """Stop the worker and close the connection.

        Waits up to 5 seconds for the worker thread to finish. May be called
        from the worker thread itself (e.g. inside ``on_strike``); in that
        case the join is skipped.
        """
        self._running = False
        self._stop_event.set()

        with self._lock:
            ws = self._ws
        if ws:
            # Closing the socket makes a blocked recv() in the worker fail.
            try:
                ws.close()
            except Exception:
                pass

        thread = self._thread
        if thread is not None and thread is not threading.current_thread():
            thread.join(timeout=5)
        self.status = STATUS_DISCONNECTED
        logger.info("Client stopped")

    def _run_loop(self):
        """Worker body: connect, then back off and rotate servers on failure."""
        while self._running:
            url = self.servers[self._server_index]
            try:
                self._connect(url)
            except Exception as e:
                logger.warning(f"Connection to {url} failed: {e}")

            if not self._running:
                break

            self.status = STATUS_RECONNECTING
            logger.info(f"Reconnecting in {self._delay:.0f}s...")
            # Interruptible sleep: returns True as soon as stop() is called.
            if self._stop_event.wait(self._delay):
                break

            self._delay = min(self._delay * 2, self.max_delay)
            self._server_index = (self._server_index + 1) % len(self.servers)

    def _connect(self, url):
        """Run one connection: connect, send init, read until it ends.

        Returns normally when the server closes the connection, a receive
        error occurs or the client is stopped. Re-raises connect and
        init-send failures. The socket is always closed and the client state
        reset before this method returns or raises.
        """
        logger.info(f"Connecting to {url}")

        try:
            ws = _WebSocketConn(url, timeout=30)
        except Exception as e:
            logger.warning(f"WebSocket connect error: {e}")
            raise

        with self._lock:
            self._ws = ws

        try:
            self.status = STATUS_CONNECTED
            self._delay = self.base_delay
            logger.info(f"Connected to {url}")

            try:
                ws.send_text(self.init_msg)
                logger.info(f"Sent init message: {self.init_msg}")
            except Exception as e:
                logger.error(f"Failed to send init: {e}")
                raise

            while self._running:
                try:
                    raw = ws.recv()
                except socket.timeout:
                    # Nothing received in time; keep waiting.
                    continue
                except (ConnectionError, OSError) as e:
                    logger.warning(f"WebSocket recv error: {e}")
                    break
                if raw is None:
                    logger.warning("WebSocket connection closed by server")
                    break
                self._handle_message(raw)
        finally:
            self.status = STATUS_DISCONNECTED
            with self._lock:
                self._ws = None
            ws.close()

    @staticmethod
    def _to_epoch_ms(strike_time):
        """Convert a strike timestamp to integer milliseconds.

        The unit is guessed from the magnitude: values above 1e15 are divided
        by 1e6 (i.e. treated as nanoseconds), values above 1e12 are taken as
        milliseconds, anything else is multiplied by 1000 (treated as
        seconds). ``None`` yields the current wall-clock time.
        """
        if strike_time is None:
            return int(time.time() * 1000)
        if strike_time > 1e15:
            return int(strike_time / 1e6)
        if strike_time > 1e12:
            return int(strike_time)
        return int(strike_time * 1000)

    def _handle_message(self, raw):
        """Decode one raw message and forward it to ``on_strike``.

        Messages that are not JSON objects or lack 'lat'/'lon' are ignored.
        No exception escapes: JSON errors are dropped silently, anything else
        (including errors raised by ``on_strike``) is logged at debug level.
        """
        try:
            data = decode_lzw(raw)

            if not isinstance(data, dict):
                return

            lat = data.get('lat')
            lon = data.get('lon')
            if lat is None or lon is None:
                return

            ts_ms = self._to_epoch_ms(data.get('time'))
            self.on_strike(float(lat), float(lon), ts_ms)
        except json.JSONDecodeError:
            pass
        except Exception as e:
            logger.debug(f"Message decode error: {e}")
|