#!/usr/bin/env python3
"""
Debug script to investigate DTS step transition issues.

This script monitors the DTS process and helps identify why step
transitions are not being detected.

Usage:
    python debug_dts_step_transition.py            # one-shot state analysis
    python debug_dts_step_transition.py monitor    # continuous monitoring
"""

import sys
import os
import time
from datetime import datetime

# Add the package directory to Python path so the project imports below
# resolve when the script is run directly from its own directory.
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

from watermaker_plc_api.services.plc_connection import get_plc_connection
from watermaker_plc_api.models.timer_mappings import (
    get_timer_for_dts_mode,
    get_timer_expected_start_value,
    calculate_timer_progress_percent,
    get_timer_info
)
from watermaker_plc_api.utils.logger import get_logger

logger = get_logger(__name__)

# Holding register that carries the current system mode (R1000).
_SYSTEM_MODE_REGISTER = 1000
# Seconds between polls while monitoring.
_POLL_INTERVAL_S = 1
# Seconds to wait before retrying after a failed system-mode read.
_RETRY_INTERVAL_S = 2


def monitor_dts_transition():
    """Poll the PLC once per second and report DTS timer/mode behaviour.

    Each cycle reads the system mode from R1000, looks up the timer
    register associated with that mode, and prints the timer value, its
    progress percentage and the time elapsed since the last mode change.
    Suspicious conditions (timer above its expected start value, timer not
    changing between polls, timer at zero with 0% progress) are flagged.

    The loop runs until interrupted with Ctrl-C or until an unexpected
    exception occurs; the PLC connection is always closed on exit.

    Returns:
        None. All results are printed to stdout.
    """
    plc = get_plc_connection()
    if not plc.connect():
        print("❌ Failed to connect to PLC")
        return

    print("🔍 DTS Step Transition Monitor")
    print("=" * 50)

    previous_mode = None
    previous_timer_value = None
    step_start_time = None

    try:
        while True:
            # Read current system mode
            current_mode = plc.read_holding_register(_SYSTEM_MODE_REGISTER)
            if current_mode is None:
                print("❌ Failed to read R1000 (system mode)")
                time.sleep(_RETRY_INTERVAL_S)
                continue

            # A mode change starts a new step: restart the elapsed clock and
            # forget the last timer value so the first sample of the new
            # step is not reported as "stuck".
            if current_mode != previous_mode:
                print(f"\n🔄 Mode Change Detected: {previous_mode} → {current_mode}")
                previous_mode = current_mode
                step_start_time = datetime.now()
                previous_timer_value = None

            # Get timer for current mode
            timer_address = get_timer_for_dts_mode(current_mode)
            if timer_address:
                timer_value = plc.read_holding_register(timer_address)

                if timer_value is not None:
                    expected_start = get_timer_expected_start_value(timer_address)
                    progress = calculate_timer_progress_percent(timer_address, timer_value)

                    # Check if timer value changed since the previous poll
                    timer_changed = timer_value != previous_timer_value
                    previous_timer_value = timer_value

                    # Calculate elapsed time since step started
                    elapsed = ""
                    if step_start_time is not None:
                        elapsed_seconds = (datetime.now() - step_start_time).total_seconds()
                        elapsed = f" (elapsed: {elapsed_seconds:.1f}s)"

                    # Print status
                    status_icon = "⏳" if timer_changed else "⚠️"
                    print(f"{status_icon} Mode {current_mode} | Timer R{timer_address}: {timer_value} | "
                          f"Progress: {progress}% | Expected Start: {expected_start}{elapsed}")

                    # Check for potential issues. The expected start value is
                    # guarded against None so that a timer without a known
                    # start value cannot abort the monitor with a TypeError.
                    if timer_value == 0 and progress == 0:
                        print(" 🚨 Timer is 0 but progress is 0% - possible issue!")
                    elif expected_start is not None and timer_value > expected_start:
                        print(f" 🚨 Timer value ({timer_value}) > expected start ({expected_start}) - unusual!")
                    elif not timer_changed and timer_value > 0:
                        print(" ⚠️ Timer not counting down - may be stuck!")

                    # Check if step should transition
                    if timer_value == 0 and current_mode == 5:
                        print(" ✅ Step 1 timer completed - should transition to Mode 6 soon")
                    elif timer_value == 0 and current_mode == 6:
                        print(" ✅ Step 2 timer completed - should transition to Mode 7 soon")
                else:
                    print(f"❌ Failed to read timer R{timer_address} for mode {current_mode}")
            else:
                print(f"ℹ️ Mode {current_mode} has no associated timer (production/transition mode)")

            time.sleep(_POLL_INTERVAL_S)  # Check every second

    except KeyboardInterrupt:
        print("\n👋 Monitoring stopped by user")
    except Exception as e:
        # Top-level boundary of a debug tool: report and fall through to cleanup.
        print(f"❌ Error during monitoring: {e}")
    finally:
        plc.disconnect()


def check_current_dts_state():
    """Print a one-shot analysis of the current DTS state.

    Reads the system mode from R1000, then, if the mode has an associated
    timer, prints that timer's metadata, current value and progress along
    with a short interpretation. Finally prints the raw values of R71,
    R138 and R128. The PLC connection is always closed on exit.

    Returns:
        None. All results are printed to stdout.
    """
    plc = get_plc_connection()
    if not plc.connect():
        print("❌ Failed to connect to PLC")
        return

    print("📊 Current DTS State Analysis")
    print("=" * 40)

    try:
        # Read current mode
        current_mode = plc.read_holding_register(_SYSTEM_MODE_REGISTER)
        print(f"Current Mode (R1000): {current_mode}")

        if current_mode is None:
            print("❌ Cannot read system mode")
            return

        # Get timer for current mode
        timer_address = get_timer_for_dts_mode(current_mode)
        if timer_address:
            timer_value = plc.read_holding_register(timer_address)

            if timer_value is None:
                # A failed read must not reach the numeric comparisons below;
                # report it and still show the additional registers.
                print(f"❌ Failed to read timer R{timer_address} for mode {current_mode}")
            else:
                # Fall back to an empty mapping so the .get() lookups below
                # stay safe if no info record exists for this address.
                timer_info = get_timer_info(timer_address) or {}
                expected_start = get_timer_expected_start_value(timer_address)
                progress = calculate_timer_progress_percent(timer_address, timer_value)

                print(f"Timer Address: R{timer_address}")
                print(f"Timer Name: {timer_info.get('name', 'Unknown')}")
                print(f"Current Value: {timer_value}")
                print(f"Expected Start: {expected_start}")
                print(f"Progress: {progress}%")
                print(f"Scale: {timer_info.get('scale', 'direct')}")
                print(f"Unit: {timer_info.get('unit', '')}")

                # Analysis
                if timer_value == 0:
                    print("✅ Timer completed - step should transition soon")
                elif timer_value == expected_start:
                    print("🔄 Timer at start value - step just began")
                elif expected_start is not None and timer_value > expected_start:
                    print("⚠️ Timer value higher than expected - unusual condition")
                else:
                    # Assumes a ÷10 scale (raw value in tenths of a second).
                    # TODO: confirm against the timer's configured scale.
                    remaining_time = timer_value / 10
                    print(f"⏳ Timer counting down - ~{remaining_time:.1f}s remaining")
        else:
            print("ℹ️ No timer for current mode (production/transition phase)")

        # Check other relevant registers
        print("\n🔍 Additional Register Values:")

        # Check R71 (command register)
        r71_value = plc.read_holding_register(71)
        print(f"R71 (Command): {r71_value}")

        # Check R138 specifically (DTS Step 1 timer)
        r138_value = plc.read_holding_register(138)
        print(f"R138 (DTS Step 1 Timer): {r138_value}")

        # Check R128 (DTS Step 2 timer)
        r128_value = plc.read_holding_register(128)
        print(f"R128 (DTS Step 2 Timer): {r128_value}")

    except Exception as e:
        # Top-level boundary of a debug tool: report and fall through to cleanup.
        print(f"❌ Error during analysis: {e}")
    finally:
        plc.disconnect()


def main():
    """Run continuous monitoring if invoked with ``monitor``, else a one-shot check."""
    if len(sys.argv) > 1 and sys.argv[1] == "monitor":
        monitor_dts_transition()
    else:
        check_current_dts_state()
        print("\n💡 To continuously monitor transitions, run:")
        print(" python debug_dts_step_transition.py monitor")


if __name__ == "__main__":
    main()