Files
venus/axiom-nmea/raymarine_nmea/nmea/generator.py
dev 9756538f16 Initial commit: Venus OS boat addons monorepo
Organizes 11 projects for Cerbo GX/Venus OS into a single repository:
- axiom-nmea: Raymarine LightHouse protocol decoder
- dbus-generator-ramp: Generator current ramp controller
- dbus-lightning: Blitzortung lightning monitor
- dbus-meteoblue-forecast: Meteoblue weather forecast
- dbus-no-foreign-land: noforeignland.com tracking
- dbus-tides: Tide prediction from depth + harmonics
- dbus-vrm-history: VRM cloud history proxy
- dbus-windy-station: Windy.com weather upload
- mfd-custom-app: MFD app deployment package
- venus-html5-app: Custom Victron HTML5 app fork
- watermaker: Watermaker PLC control UI

Adds root README, .gitignore, project template, and per-project
.gitignore files. Sensitive config files excluded via .gitignore
with .example templates provided.

Made-with: Cursor
2026-03-16 17:04:16 +00:00

524 lines
14 KiB
Python

"""
NMEA sentence generator.
Generates complete sets of NMEA 0183 sentences from sensor data.
"""
from datetime import datetime, timezone
from typing import List, Optional, Dict, Set

from ..data.store import SensorData
from ..sensors import get_tank_name, get_battery_name
from .sentences import (
    GGASentence,
    GLLSentence,
    RMCSentence,
    HDGSentence,
    HDTSentence,
    VTGSentence,
    VHWSentence,
    MWVSentence,
    MWDSentence,
    DPTSentence,
    DBTSentence,
    MTWSentence,
    MTASentence,
    XDRSentence,
)
class NMEAGenerator:
    """Generates NMEA 0183 sentences from sensor data.

    This class provides methods to generate various NMEA sentences
    from SensorData. It can generate individual sentence types or
    complete sets of all available sentences.

    Example:
        generator = NMEAGenerator()

        # Generate all sentences
        sentences = generator.generate_all(sensor_data)
        for sentence in sentences:
            print(sentence, end='')

        # Generate specific sentences
        gga = generator.generate_gga(sensor_data)
        mwd = generator.generate_mwd(sensor_data)

    Sentence Types Generated:
        GPS: GGA, GLL, RMC
        Navigation: HDG, HDT, VTG, VHW
        Wind: MWV (apparent & true), MWD
        Depth: DPT, DBT
        Temperature: MTW, MTA
        Transducers: XDR (tanks, batteries, pressure)
    """

    # Default magnetic variation (can be overridden)
    DEFAULT_MAG_VARIATION = 0.0

    # Sentence types that can be enabled/disabled
    SENTENCE_TYPES = {
        'gga', 'gll', 'rmc',                            # GPS
        'hdg', 'hdt', 'vtg', 'vhw',                     # Navigation
        'mwv_apparent', 'mwv_true', 'mwd',              # Wind
        'dpt', 'dbt',                                   # Depth
        'mtw', 'mta',                                   # Temperature
        'xdr_tanks', 'xdr_batteries', 'xdr_pressure',   # Transducers
    }

    # Emission order used by generate_all(), as
    # (sentence type, generator method name, method takes the timestamp).
    # Methods are looked up by name so subclass overrides are honoured.
    _GENERATION_ORDER = (
        # GPS
        ('gga', 'generate_gga', True),
        ('gll', 'generate_gll', True),
        ('rmc', 'generate_rmc', True),
        # Navigation
        ('hdg', 'generate_hdg', False),
        ('hdt', 'generate_hdt', False),
        ('vtg', 'generate_vtg', False),
        ('vhw', 'generate_vhw', False),
        # Wind
        ('mwv_apparent', 'generate_mwv_apparent', False),
        ('mwv_true', 'generate_mwv_true', False),
        ('mwd', 'generate_mwd', False),
        # Depth
        ('dpt', 'generate_dpt', False),
        ('dbt', 'generate_dbt', False),
        # Temperature
        ('mtw', 'generate_mtw', False),
        ('mta', 'generate_mta', False),
        # Transducers
        ('xdr_tanks', 'generate_xdr_tanks', False),
        ('xdr_batteries', 'generate_xdr_batteries', False),
        ('xdr_pressure', 'generate_xdr_pressure', False),
    )

    def __init__(
        self,
        mag_variation: Optional[float] = None,
        enabled_sentences: Optional[Set[str]] = None,
        transducer_offset: float = 0.0,
    ):
        """Initialize the NMEA generator.

        Args:
            mag_variation: Magnetic variation in degrees (positive=East).
                If None, DEFAULT_MAG_VARIATION is used.
            enabled_sentences: Set of sentence types to generate.
                If None, all sentences are enabled. An empty set
                disables every sentence type.
            transducer_offset: Depth transducer offset in meters
                (positive = to waterline, negative = to keel)
        """
        # Compare against None explicitly: 0.0 and an empty set are valid
        # caller choices and must not be replaced by the defaults.
        self.mag_variation = (
            self.DEFAULT_MAG_VARIATION if mag_variation is None
            else mag_variation
        )
        # Always work on a private copy so enable()/disable() never mutate
        # the caller's set or the class-level SENTENCE_TYPES.
        self.enabled = set(
            self.SENTENCE_TYPES if enabled_sentences is None
            else enabled_sentences
        )
        self.transducer_offset = transducer_offset

    def is_enabled(self, sentence_type: str) -> bool:
        """Check if a sentence type is enabled."""
        return sentence_type in self.enabled

    def enable(self, sentence_type: str) -> None:
        """Enable a sentence type.

        Names that are not in SENTENCE_TYPES are silently ignored.
        """
        if sentence_type in self.SENTENCE_TYPES:
            self.enabled.add(sentence_type)

    def disable(self, sentence_type: str) -> None:
        """Disable a sentence type (no-op if it is not currently enabled)."""
        self.enabled.discard(sentence_type)

    def _to_magnetic(self, true_deg: float) -> float:
        """Convert a true bearing to magnetic using ``self.mag_variation``.

        The result is wrapped into the range [0, 360).
        """
        return (true_deg - self.mag_variation) % 360

    def generate_all(self, data: SensorData) -> List[str]:
        """Generate all enabled NMEA sentences.

        Sentences are emitted in the order GPS, navigation, wind, depth,
        temperature, transducers. A generator that returns a falsy value
        (no data available) contributes nothing.

        Args:
            data: SensorData object with current sensor values

        Returns:
            List of NMEA sentence strings (with CRLF)
        """
        # datetime.utcnow() is deprecated since Python 3.12. Take the
        # current UTC time the supported way, then drop tzinfo so the
        # sentence classes keep receiving a naive UTC datetime as before.
        now = datetime.now(timezone.utc).replace(tzinfo=None)

        sentences: List[str] = []
        for sentence_type, method_name, takes_time in self._GENERATION_ORDER:
            if not self.is_enabled(sentence_type):
                continue
            generate = getattr(self, method_name)
            sentence = generate(data, now) if takes_time else generate(data)
            if sentence:
                sentences.append(sentence)
        return sentences

    # GPS Sentences

    def generate_gga(
        self,
        data: SensorData,
        time: Optional[datetime] = None,
    ) -> Optional[str]:
        """Generate GGA (GPS Fix Data) sentence.

        Args:
            data: SensorData providing ``latitude`` and ``longitude``.
            time: Timestamp passed through to the sentence (may be None).

        Returns:
            The sentence string, or None if either coordinate is missing.
        """
        with data._lock:
            lat = data.latitude
            lon = data.longitude
        if lat is None or lon is None:
            return None
        sentence = GGASentence(
            latitude=lat,
            longitude=lon,
            time=time,
        )
        return sentence.to_nmea()

    def generate_gll(
        self,
        data: SensorData,
        time: Optional[datetime] = None,
    ) -> Optional[str]:
        """Generate GLL (Geographic Position) sentence.

        Args:
            data: SensorData providing ``latitude`` and ``longitude``.
            time: Timestamp passed through to the sentence (may be None).

        Returns:
            The sentence string, or None if either coordinate is missing.
        """
        with data._lock:
            lat = data.latitude
            lon = data.longitude
        if lat is None or lon is None:
            return None
        sentence = GLLSentence(
            latitude=lat,
            longitude=lon,
            time=time,
        )
        return sentence.to_nmea()

    def generate_rmc(
        self,
        data: SensorData,
        time: Optional[datetime] = None,
    ) -> Optional[str]:
        """Generate RMC (Recommended Minimum) sentence.

        Args:
            data: SensorData providing ``latitude``, ``longitude``,
                ``sog_kts`` and ``cog_deg``.
            time: Timestamp passed through to the sentence (may be None).

        Returns:
            The sentence string, or None if either coordinate is missing.
            Speed and course are passed through even when they are None.
        """
        with data._lock:
            lat = data.latitude
            lon = data.longitude
            sog = data.sog_kts
            cog = data.cog_deg
        if lat is None or lon is None:
            return None
        sentence = RMCSentence(
            latitude=lat,
            longitude=lon,
            time=time,
            sog=sog,
            cog=cog,
            mag_var=self.mag_variation,
        )
        return sentence.to_nmea()

    # Navigation Sentences

    def generate_hdg(self, data: SensorData) -> Optional[str]:
        """Generate HDG (Heading, Deviation & Variation) sentence.

        Returns:
            The sentence string, or None if no heading is available.
        """
        with data._lock:
            heading = data.heading_deg
        if heading is None:
            return None
        sentence = HDGSentence(
            # Convert true heading to magnetic
            heading=self._to_magnetic(heading),
            deviation=0.0,
            variation=self.mag_variation,
        )
        return sentence.to_nmea()

    def generate_hdt(self, data: SensorData) -> Optional[str]:
        """Generate HDT (Heading True) sentence.

        Returns:
            The sentence string, or None if no heading is available.
        """
        with data._lock:
            heading = data.heading_deg
        if heading is None:
            return None
        return HDTSentence(heading=heading).to_nmea()

    def generate_vtg(self, data: SensorData) -> Optional[str]:
        """Generate VTG (Track Made Good) sentence.

        Returns:
            The sentence string, or None if both course and speed over
            ground are missing. Either one alone is enough to emit.
        """
        with data._lock:
            cog = data.cog_deg
            sog = data.sog_kts
        # Need at least one value
        if cog is None and sog is None:
            return None
        sentence = VTGSentence(
            cog_true=cog,
            cog_mag=None if cog is None else self._to_magnetic(cog),
            sog_kts=sog,
        )
        return sentence.to_nmea()

    def generate_vhw(self, data: SensorData) -> Optional[str]:
        """Generate VHW (Water Speed and Heading) sentence.

        Returns:
            The sentence string, or None if no heading is available.
        """
        with data._lock:
            heading = data.heading_deg
        # Note: We don't have water speed, so just use heading
        if heading is None:
            return None
        sentence = VHWSentence(
            heading_true=heading,
            heading_mag=self._to_magnetic(heading),
            speed_kts=None,  # No water speed available
        )
        return sentence.to_nmea()

    # Wind Sentences

    def generate_mwv_apparent(self, data: SensorData) -> Optional[str]:
        """Generate MWV sentence for apparent wind.

        Returns:
            The sentence string, or None if both apparent wind angle and
            speed are missing.
        """
        with data._lock:
            awa = data.awa_deg
            aws = data.aws_kts
        if awa is None and aws is None:
            return None
        sentence = MWVSentence(
            angle=awa,
            reference="R",  # Relative/Apparent
            speed=aws,
            speed_units="N",  # Knots
        )
        return sentence.to_nmea()

    def generate_mwv_true(self, data: SensorData) -> Optional[str]:
        """Generate MWV sentence for true wind.

        The MWV angle is measured relative to the bow, so it is derived
        from true wind direction minus heading and kept in [0, 360).
        Without a heading the angle is left empty: the compass-referenced
        wind direction is not a valid substitute for a bow-relative angle
        (MWD carries the direction instead).

        Returns:
            The sentence string, or None if neither a bow-relative angle
            nor a wind speed can be reported.
        """
        with data._lock:
            twd = data.twd_deg
            tws = data.tws_kts
            heading = data.heading_deg

        # Calculate true wind angle relative to bow (0 <= angle < 360)
        twa = None
        if twd is not None and heading is not None:
            twa = (twd - heading) % 360

        if twa is None and tws is None:
            return None
        sentence = MWVSentence(
            angle=twa,
            reference="T",  # True
            speed=tws,
            speed_units="N",  # Knots
        )
        return sentence.to_nmea()

    def generate_mwd(self, data: SensorData) -> Optional[str]:
        """Generate MWD (Wind Direction and Speed) sentence.

        Returns:
            The sentence string, or None if both true wind direction and
            speed are missing.
        """
        with data._lock:
            twd = data.twd_deg
            tws = data.tws_kts
        if twd is None and tws is None:
            return None
        sentence = MWDSentence(
            direction_true=twd,
            # Calculate magnetic wind direction
            direction_mag=None if twd is None else self._to_magnetic(twd),
            speed_kts=tws,
        )
        return sentence.to_nmea()

    # Depth Sentences

    def generate_dpt(self, data: SensorData) -> Optional[str]:
        """Generate DPT (Depth) sentence.

        The configured ``transducer_offset`` is passed along as the offset.

        Returns:
            The sentence string, or None if no depth is available.
        """
        with data._lock:
            depth = data.depth_m
        if depth is None:
            return None
        sentence = DPTSentence(
            depth_m=depth,
            offset_m=self.transducer_offset,
        )
        return sentence.to_nmea()

    def generate_dbt(self, data: SensorData) -> Optional[str]:
        """Generate DBT (Depth Below Transducer) sentence.

        Returns:
            The sentence string, or None if no depth is available.
        """
        with data._lock:
            depth = data.depth_m
        if depth is None:
            return None
        return DBTSentence(depth_m=depth).to_nmea()

    # Temperature Sentences

    def generate_mtw(self, data: SensorData) -> Optional[str]:
        """Generate MTW (Water Temperature) sentence.

        Returns:
            The sentence string, or None if no water temperature is available.
        """
        with data._lock:
            temp = data.water_temp_c
        if temp is None:
            return None
        return MTWSentence(temp_c=temp).to_nmea()

    def generate_mta(self, data: SensorData) -> Optional[str]:
        """Generate MTA (Air Temperature) sentence.

        Returns:
            The sentence string, or None if no air temperature is available.
        """
        with data._lock:
            temp = data.air_temp_c
        if temp is None:
            return None
        return MTASentence(temp_c=temp).to_nmea()

    # Transducer Sentences

    def generate_xdr_tanks(self, data: SensorData) -> Optional[str]:
        """Generate XDR sentence for tank levels.

        Returns:
            The sentence string, or None if there are no tanks.
        """
        with data._lock:
            # Copy under the lock so the mapping can be used after release.
            tanks = dict(data.tanks)
        if not tanks:
            return None
        # Build name mapping
        names = {tank_id: get_tank_name(tank_id) for tank_id in tanks}
        return XDRSentence.for_tanks(tanks, names).to_nmea()

    def generate_xdr_batteries(self, data: SensorData) -> Optional[str]:
        """Generate XDR sentence for battery voltages.

        Returns:
            The sentence string, or None if there are no batteries.
        """
        with data._lock:
            # Copy under the lock so the mapping can be used after release.
            batteries = dict(data.batteries)
        if not batteries:
            return None
        # Build name mapping
        names = {
            battery_id: get_battery_name(battery_id)
            for battery_id in batteries
        }
        return XDRSentence.for_batteries(batteries, names).to_nmea()

    def generate_xdr_pressure(self, data: SensorData) -> Optional[str]:
        """Generate XDR sentence for barometric pressure.

        Returns:
            The sentence string, or None if no pressure is available.
        """
        with data._lock:
            pressure = data.pressure_mbar
        if pressure is None:
            return None
        return XDRSentence.for_pressure(pressure).to_nmea()

    def generate_xdr_all(self, data: SensorData) -> List[str]:
        """Generate all XDR sentences (tanks, batteries, pressure).

        Returns separate sentences for each type to avoid
        exceeding NMEA sentence length limits. The enabled/disabled
        sentence set is not consulted here.
        """
        generators = (
            self.generate_xdr_tanks,
            self.generate_xdr_batteries,
            self.generate_xdr_pressure,
        )
        candidates = (generate(data) for generate in generators)
        return [sentence for sentence in candidates if sentence]