#!/usr/bin/env python3
"""
Debug Input Tracker for Generator Overload Detection

Monitors and outputs all tracked input variables to help debug generator
overload detection. Samples internally at dbus rate (2Hz) while allowing
configurable output frequency.

Usage:
    ./debug_input_tracker.py                 # Output every 2 seconds
    ./debug_input_tracker.py --interval 5    # Output every 5 seconds
    ./debug_input_tracker.py --interval 0.5  # Output every sample
    ./debug_input_tracker.py --csv           # CSV output format
    ./debug_input_tracker.py --verbose       # Include buffer statistics

Author: Claude (Anthropic)
License: MIT
"""

import sys
import os
import argparse
import math
import random
from time import monotonic, sleep, time
from datetime import datetime
from collections import deque

# Add velib_python to path (Venus OS standard location)
sys.path.insert(1, os.path.join(os.path.dirname(__file__), 'ext', 'velib_python'))
sys.path.insert(1, '/opt/victronenergy/velib_python')

try:
    import dbus
except ImportError:
    # Report on stderr so the message never ends up inside redirected CSV
    # output (e.g. `--mock --csv > debug.csv`).
    print("ERROR: dbus not available. This script must run on Venus OS.",
          file=sys.stderr)
    print("For development testing, use --mock flag.", file=sys.stderr)
    dbus = None

from config import DBUS_CONFIG, OVERLOAD_CONFIG
from overload_detector import OverloadDetector


# Display names for the generator '/State' values shown in the table output.
_GENERATOR_STATE_NAMES = {
    0: 'Stopped',
    1: 'Running',
    2: 'Warm-up',
    3: 'Cool-down',
    10: 'Error',
}

# Shown verbatim by RawDescriptionHelpFormatter, so the line breaks matter.
_EPILOG = """
Examples:
  %(prog)s                    # Output every 2 seconds (default)
  %(prog)s --interval 5       # Output every 5 seconds
  %(prog)s --interval 0.5     # Output every sample (500ms)
  %(prog)s --csv              # CSV format for logging
  %(prog)s --csv > debug.csv  # Log to file
  %(prog)s --verbose          # Include buffer statistics
  %(prog)s --mock             # Use mock data for testing
"""


def _format_timestamp(timestamp):
    """Render an epoch timestamp as local 'YYYY-MM-DD HH:MM:SS.mmm'."""
    # %f yields microseconds; dropping the last three digits leaves milliseconds.
    return datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S.%f')[:-3]


class InputTracker:
    """
    Tracks generator input power and deviation metrics for debugging.

    Each call to sample() reads the inputs (from D-Bus, or from the built-in
    mock source), feeds the per-phase power into an OverloadDetector and
    updates both a rolling buffer of total power and the statistics
    accumulated since the last reset_interval_stats() call.
    """

    def __init__(self, use_mock=False):
        """
        Create a tracker.

        Args:
            use_mock: When True, values come from _get_mock_value() and no
                D-Bus connection is opened.

        Raises:
            RuntimeError: If use_mock is False and the dbus module could not
                be imported.
        """
        self.use_mock = use_mock
        self.bus = None

        if not use_mock:
            if dbus is None:
                raise RuntimeError("dbus module not available")
            self.bus = dbus.SystemBus()

        # (service, path) -> proxy object, so each proxy is created only once
        self._proxy_cache = {}

        # Services
        self.vebus_service = DBUS_CONFIG['vebus_service']
        self.generator_service = DBUS_CONFIG['generator_service']

        # Overload detector for deviation tracking
        self.detector = OverloadDetector(OVERLOAD_CONFIG)

        # Internal sample buffers for statistics
        self.sample_count = 0
        self.power_samples = deque(maxlen=100)  # Last 100 samples for stats

        # Track min/max/avg since last output
        self.reset_interval_stats()

        # Mock state for testing
        self._mock_time = 0
        self._mock_base_power = 8000

    def reset_interval_stats(self):
        """Reset statistics tracked between output intervals"""
        self.interval_samples = 0
        # +/- infinity so the first sample always replaces both extremes
        self.interval_power_min = float('inf')
        self.interval_power_max = float('-inf')
        self.interval_power_sum = 0
        self.interval_overload_triggers = 0

    def _get_proxy(self, service, path):
        """Return the cached D-Bus proxy for (service, path), creating it if needed."""
        key = (service, path)
        proxy = self._proxy_cache.get(key)
        if proxy is None:
            proxy = self.bus.get_object(service, path, introspect=False)
            self._proxy_cache[key] = proxy
        return proxy

    def _get_dbus_value(self, service, path):
        """
        Get a value from D-Bus service.

        Returns the mock value when running in mock mode. Returns None when
        the D-Bus read fails for any reason.
        """
        if self.use_mock:
            return self._get_mock_value(service, path)

        try:
            obj = self._get_proxy(service, path)
            return obj.GetValue(dbus_interface='com.victronenergy.BusItem')
        except Exception:
            # Deliberately best-effort: a failed read is reported as None.
            # Drop the cached proxy so the next read creates a fresh one.
            self._proxy_cache.pop((service, path), None)
            return None

    def _get_mock_value(self, service, path):
        """
        Return mock values for development testing.

        L1 power is the base power plus a sine oscillation and Gaussian
        noise; every other known path returns a constant. Unknown paths
        return None.
        """
        if path == '/Ac/ActiveIn/L1/P':
            # Advance the simulated clock here only. read_inputs() reads this
            # path once per sample, whereas advancing on every call moved the
            # clock once per value read (five times per sample).
            self._mock_time += 0.5
            # Simulate power with some oscillation
            oscillation = 200 * math.sin(self._mock_time * 0.5)
            noise = random.gauss(0, 50)
            return self._mock_base_power + oscillation + noise
        elif path == '/Ac/ActiveIn/L2/P':
            return 0  # Single phase
        elif path == '/Ac/ActiveIn/Connected':
            return 1
        elif path == '/State':
            return 1  # Running
        elif path == '/Ac/In/1/CurrentLimit':
            return 45.0

        return None

    def read_inputs(self):
        """
        Read all input values from dbus.

        Values that could not be read (None) are replaced by 0.0 / False / 0.

        Returns:
            dict with keys 'timestamp', 'l1_power', 'l2_power',
            'total_power', 'ac_connected', 'generator_state' and
            'current_limit'.
        """
        # Power readings
        l1_power = self._get_dbus_value(self.vebus_service, '/Ac/ActiveIn/L1/P')
        l2_power = self._get_dbus_value(self.vebus_service, '/Ac/ActiveIn/L2/P')

        l1_power = float(l1_power) if l1_power is not None else 0.0
        l2_power = float(l2_power) if l2_power is not None else 0.0
        total_power = l1_power + l2_power

        # AC connection status
        ac_connected = self._get_dbus_value(self.vebus_service, '/Ac/ActiveIn/Connected')
        ac_connected = bool(ac_connected) if ac_connected is not None else False

        # Generator state
        gen_state = self._get_dbus_value(self.generator_service, '/State')
        gen_state = int(gen_state) if gen_state is not None else 0

        # Current limit
        current_limit = self._get_dbus_value(self.vebus_service, '/Ac/In/1/CurrentLimit')
        current_limit = float(current_limit) if current_limit is not None else 0.0

        return {
            'timestamp': time(),
            'l1_power': l1_power,
            'l2_power': l2_power,
            'total_power': total_power,
            'ac_connected': ac_connected,
            'generator_state': gen_state,
            'current_limit': current_limit,
        }

    def sample(self):
        """
        Take a single sample and update internal tracking.

        Returns sample data with overload detection diagnostics: the
        read_inputs() dict extended with 'is_overload', 'diagnostics' and
        'sample_number'.
        """
        inputs = self.read_inputs()
        total_power = inputs['total_power']

        # Update overload detector
        is_overload, diag = self.detector.update(
            inputs['l1_power'],
            inputs['l2_power'],
            inputs['timestamp']
        )

        self.sample_count += 1
        self.power_samples.append(total_power)

        # Update interval statistics
        self.interval_samples += 1
        self.interval_power_sum += total_power
        self.interval_power_min = min(self.interval_power_min, total_power)
        self.interval_power_max = max(self.interval_power_max, total_power)
        if is_overload:
            self.interval_overload_triggers += 1

        return {
            **inputs,
            'is_overload': is_overload,
            'diagnostics': diag,
            'sample_number': self.sample_count,
        }

    def get_interval_stats(self):
        """
        Get statistics for the current output interval.

        Returns None when no sample was taken since the last reset.
        """
        if self.interval_samples == 0:
            return None

        return {
            'samples': self.interval_samples,
            'power_min': self.interval_power_min,
            'power_max': self.interval_power_max,
            'power_avg': self.interval_power_sum / self.interval_samples,
            'power_range': self.interval_power_max - self.interval_power_min,
            'overload_triggers': self.interval_overload_triggers,
        }

    def get_buffer_stats(self):
        """
        Get statistics from the power sample buffer.

        Returns None while the buffer holds fewer than two samples. The
        standard deviation is the population form (divides by n).
        """
        if len(self.power_samples) < 2:
            return None

        samples = list(self.power_samples)
        n = len(samples)
        mean = sum(samples) / n
        variance = sum((x - mean) ** 2 for x in samples) / n
        std_dev = variance ** 0.5

        return {
            'buffer_size': n,
            'mean': mean,
            'std_dev': std_dev,
            'min': min(samples),
            'max': max(samples),
        }


def format_table_output(sample, interval_stats, buffer_stats, verbose=False):
    """
    Format output as a readable table.

    Args:
        sample: Dict returned by InputTracker.sample().
        interval_stats: Dict from get_interval_stats(), or None to omit the
            interval section.
        buffer_stats: Dict from get_buffer_stats(), or None.
        verbose: Include the buffer statistics section when buffer_stats is
            available.

    Returns:
        The report as one newline-joined string.
    """
    lines = []

    # Header with timestamp
    ts = _format_timestamp(sample['timestamp'])
    lines.append(f"\n{'='*70}")
    lines.append(f"Debug Input Tracker - {ts}")
    lines.append(f"{'='*70}")

    # Generator Status
    gen_state_name = _GENERATOR_STATE_NAMES.get(
        sample['generator_state'],
        f"Unknown({sample['generator_state']})")
    lines.append("\n--- Generator Status ---")
    lines.append(f" State: {gen_state_name}")
    lines.append(f" AC Connected: {'Yes' if sample['ac_connected'] else 'No'}")
    lines.append(f" Current Limit: {sample['current_limit']:.1f} A")

    # Power Readings
    lines.append("\n--- Power Readings ---")
    lines.append(f" L1 Power: {sample['l1_power']:>8.1f} W")
    lines.append(f" L2 Power: {sample['l2_power']:>8.1f} W")
    lines.append(f" Total Power: {sample['total_power']:>8.1f} W")

    # Interval Statistics
    if interval_stats:
        lines.append(f"\n--- Interval Stats ({interval_stats['samples']} samples) ---")
        lines.append(f" Power Min: {interval_stats['power_min']:>8.1f} W")
        lines.append(f" Power Max: {interval_stats['power_max']:>8.1f} W")
        lines.append(f" Power Avg: {interval_stats['power_avg']:>8.1f} W")
        lines.append(f" Power Range: {interval_stats['power_range']:>8.1f} W")

    # Deviation/Detection Metrics
    diag = sample['diagnostics']
    lines.append("\n--- Overload Detection ---")

    if diag.get('status') == 'warming_up':
        lines.append(f" Status: Warming up ({diag.get('samples', 0)}/{diag.get('needed', '?')} samples)")
    elif diag.get('status') == 'lockout':
        lines.append(f" Status: Lockout ({diag.get('lockout_remaining', 0):.1f}s remaining)")
    else:
        # Thresholds fall back to OVERLOAD_CONFIG when the diagnostics do not
        # carry them.

        # Method 1: Reversals
        reversals = diag.get('reversals', 0)
        reversal_thresh = diag.get('reversal_threshold', OVERLOAD_CONFIG['reversal_threshold'])
        method1 = diag.get('method1_triggered', False)
        lines.append(f" Reversals: {reversals:>3d} / {reversal_thresh} threshold {'[TRIGGERED]' if method1 else ''}")

        # Method 2: Std Dev
        std_dev = diag.get('std_dev', 0)
        std_thresh = diag.get('std_dev_threshold', OVERLOAD_CONFIG['std_dev_threshold'])
        method2 = diag.get('method2_triggered', False)
        lines.append(f" Std Deviation: {std_dev:>7.1f} W / {std_thresh} W threshold {'[TRIGGERED]' if method2 else ''}")

        # Combined detection
        instant = diag.get('instant_detection', False)
        confirmed = diag.get('confirmed_count', 0)
        confirm_thresh = diag.get('confirmation_threshold', OVERLOAD_CONFIG['confirmation_threshold'])
        lines.append(f" Instant Detect: {'Yes' if instant else 'No'}")
        lines.append(f" Confirmations: {confirmed:>3d} / {confirm_thresh} threshold")

    # Final overload status
    is_overload = sample['is_overload']
    status = "*** OVERLOAD DETECTED ***" if is_overload else "Normal"
    lines.append(f" Status: {status}")

    # Verbose buffer data
    if verbose and buffer_stats:
        lines.append(f"\n--- Buffer Statistics ({buffer_stats['buffer_size']} samples) ---")
        lines.append(f" Mean Power: {buffer_stats['mean']:>8.1f} W")
        lines.append(f" Std Deviation: {buffer_stats['std_dev']:>8.1f} W")
        lines.append(f" Min: {buffer_stats['min']:>8.1f} W")
        lines.append(f" Max: {buffer_stats['max']:>8.1f} W")

    lines.append(f"\n Sample #: {sample['sample_number']}")

    return '\n'.join(lines)


def format_csv_output(sample, interval_stats, include_header=False):
    """
    Format output as CSV.

    Args:
        sample: Dict returned by InputTracker.sample().
        interval_stats: Dict from get_interval_stats(), or None to leave the
            five interval columns empty.
        include_header: Prepend the column header line.

    Returns:
        One CSV row, preceded by the header row when requested.
    """
    headers = [
        'timestamp', 'sample_num', 'gen_state', 'ac_connected', 'current_limit',
        'l1_power', 'l2_power', 'total_power',
        'reversals', 'std_dev', 'method1_triggered', 'method2_triggered',
        'instant_detection', 'confirmed_count', 'is_overload',
        'interval_samples', 'power_min', 'power_max', 'power_avg', 'power_range'
    ]

    diag = sample['diagnostics']

    # Diagnostics keys that are absent are written as empty cells (numeric
    # fields) or 0 (boolean flags).
    values = [
        _format_timestamp(sample['timestamp']),
        sample['sample_number'],
        sample['generator_state'],
        1 if sample['ac_connected'] else 0,
        f"{sample['current_limit']:.1f}",
        f"{sample['l1_power']:.1f}",
        f"{sample['l2_power']:.1f}",
        f"{sample['total_power']:.1f}",
        diag.get('reversals', ''),
        f"{diag.get('std_dev', 0):.1f}" if 'std_dev' in diag else '',
        1 if diag.get('method1_triggered', False) else 0,
        1 if diag.get('method2_triggered', False) else 0,
        1 if diag.get('instant_detection', False) else 0,
        diag.get('confirmed_count', ''),
        1 if sample['is_overload'] else 0,
    ]

    # Interval stats
    if interval_stats:
        values.extend([
            interval_stats['samples'],
            f"{interval_stats['power_min']:.1f}",
            f"{interval_stats['power_max']:.1f}",
            f"{interval_stats['power_avg']:.1f}",
            f"{interval_stats['power_range']:.1f}",
        ])
    else:
        values.extend(['', '', '', '', ''])

    lines = []
    if include_header:
        lines.append(','.join(headers))
    lines.append(','.join(str(v) for v in values))

    return '\n'.join(lines)


def main():
    """
    Command-line entry point.

    Samples every --sample-rate seconds and prints a report (table or CSV)
    every --interval seconds until interrupted with Ctrl+C.
    """
    parser = argparse.ArgumentParser(
        description='Debug tracker for generator input variables and overload detection',
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=_EPILOG
    )

    parser.add_argument(
        '-i', '--interval',
        type=float,
        default=2.0,
        help='Output interval in seconds (default: 2.0, min: 0.5)'
    )
    parser.add_argument(
        '--csv',
        action='store_true',
        help='Output in CSV format'
    )
    parser.add_argument(
        '-v', '--verbose',
        action='store_true',
        help='Include detailed buffer statistics'
    )
    parser.add_argument(
        '--mock',
        action='store_true',
        help='Use mock data for development testing'
    )
    parser.add_argument(
        '--sample-rate',
        type=float,
        default=0.5,
        help='Internal sample rate in seconds (default: 0.5 = 2Hz dbus rate)'
    )

    args = parser.parse_args()

    # A zero or negative period would divide by zero in the banner below,
    # make sleep() raise, or spin the loop without pause.
    if args.sample_rate <= 0:
        parser.error('--sample-rate must be greater than 0')

    # Validate interval: output cannot happen more often than sampling
    if args.interval < args.sample_rate:
        args.interval = args.sample_rate

    # Initialize tracker
    try:
        tracker = InputTracker(use_mock=args.mock)
    except RuntimeError as e:
        print(f"ERROR: {e}", file=sys.stderr)
        sys.exit(1)

    if not args.csv:
        print("Starting debug input tracker...")
        print(f" Sample rate: {args.sample_rate}s ({1/args.sample_rate:.1f} Hz)")
        print(f" Output interval: {args.interval}s")
        print(" Press Ctrl+C to stop\n")

    # Output CSV header
    first_output = True

    # Timing uses the monotonic clock so a wall-clock adjustment cannot stall
    # or burst the output. None forces a report on the very first sample.
    last_output_time = None

    try:
        while True:
            loop_start = monotonic()

            # Take a sample
            sample = tracker.sample()

            # Check if it's time to output
            if (last_output_time is None
                    or loop_start - last_output_time >= args.interval):
                interval_stats = tracker.get_interval_stats()
                buffer_stats = tracker.get_buffer_stats() if args.verbose else None

                if args.csv:
                    print(format_csv_output(sample, interval_stats, include_header=first_output))
                else:
                    print(format_table_output(sample, interval_stats, buffer_stats, args.verbose))

                # Flush for real-time output when piping
                sys.stdout.flush()

                # Reset interval tracking
                tracker.reset_interval_stats()
                last_output_time = loop_start
                first_output = False

            # Wait for next sample: sleep only for what is left of the sample
            # period so the time spent reading and printing does not stretch
            # the sampling interval.
            elapsed = monotonic() - loop_start
            sleep(max(0.0, args.sample_rate - elapsed))

    except KeyboardInterrupt:
        if not args.csv:
            print("\n\nStopped.")
        sys.exit(0)


if __name__ == '__main__':
    main()