194 lines
7.5 KiB
Python
194 lines
7.5 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
Debug script to investigate DTS step transition issues.
|
||
This script will monitor the DTS process and help identify why transitions aren't being detected.
|
||
"""
|
||
|
||
import sys
|
||
import os
|
||
import time
|
||
from datetime import datetime
|
||
|
||
# Add the package directory to Python path
|
||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||
|
||
from watermaker_plc_api.services.plc_connection import get_plc_connection
|
||
from watermaker_plc_api.models.timer_mappings import (
|
||
get_timer_for_dts_mode,
|
||
get_timer_expected_start_value,
|
||
calculate_timer_progress_percent,
|
||
get_timer_info
|
||
)
|
||
from watermaker_plc_api.utils.logger import get_logger
|
||
|
||
logger = get_logger(__name__)
|
||
|
||
def monitor_dts_transition():
    """Poll the PLC about once per second and print DTS mode/timer status.

    Each iteration reads the system mode from holding register R1000, looks up
    the timer register associated with that mode, reads the timer, and prints a
    status line plus warnings for suspicious readings (timer at 0 with 0%
    progress, timer above its expected start value, timer not changing).

    The loop runs until interrupted with Ctrl+C. Any other exception is
    reported and ends the session. The PLC connection is always closed on exit.
    Returns None; all output goes to stdout.
    """
    plc = get_plc_connection()

    if not plc.connect():
        print("❌ Failed to connect to PLC")
        return

    print("🔍 DTS Step Transition Monitor")
    print("=" * 50)

    previous_mode = None
    previous_timer_value = None
    # Monotonic timestamp of the last observed mode change. A monotonic clock
    # is used so wall-clock adjustments cannot skew the elapsed time shown.
    step_started_at = None

    try:
        while True:
            # Read current system mode
            current_mode = plc.read_holding_register(1000)
            if current_mode is None:
                print("❌ Failed to read R1000 (system mode)")
                time.sleep(2)  # back off a little longer after a failed read
                continue

            # Check if mode changed
            if current_mode != previous_mode:
                print(f"\n🔄 Mode Change Detected: {previous_mode} → {current_mode}")
                previous_mode = current_mode
                step_started_at = time.monotonic()
                # Forget the old reading so the first sample of the new step
                # is not flagged as "not counting down".
                previous_timer_value = None

            # Get timer for current mode
            timer_address = get_timer_for_dts_mode(current_mode)

            if timer_address:
                timer_value = plc.read_holding_register(timer_address)

                if timer_value is not None:
                    expected_start = get_timer_expected_start_value(timer_address)
                    progress = calculate_timer_progress_percent(timer_address, timer_value)

                    # Check if timer value changed since the previous sample
                    timer_changed = timer_value != previous_timer_value
                    previous_timer_value = timer_value

                    # Calculate elapsed time since step started
                    elapsed = ""
                    if step_started_at is not None:
                        elapsed_seconds = time.monotonic() - step_started_at
                        elapsed = f" (elapsed: {elapsed_seconds:.1f}s)"

                    # Print status
                    status_icon = "⏳" if timer_changed else "⚠️"
                    print(f"{status_icon} Mode {current_mode} | Timer R{timer_address}: {timer_value} | "
                          f"Progress: {progress}% | Expected Start: {expected_start}{elapsed}")

                    # Check for potential issues
                    if timer_value == 0 and progress == 0:
                        print(" 🚨 Timer is 0 but progress is 0% - possible issue!")
                    elif expected_start is not None and timer_value > expected_start:
                        # NOTE(review): the None guard assumes the lookup may
                        # return None for a timer with no known start value;
                        # without it the comparison would raise TypeError and
                        # terminate monitoring.
                        print(f" 🚨 Timer value ({timer_value}) > expected start ({expected_start}) - unusual!")
                    elif not timer_changed and timer_value > 0:
                        print(" ⚠️ Timer not counting down - may be stuck!")

                    # Check if step should transition
                    if timer_value == 0 and current_mode == 5:
                        print(" ✅ Step 1 timer completed - should transition to Mode 6 soon")
                    elif timer_value == 0 and current_mode == 6:
                        print(" ✅ Step 2 timer completed - should transition to Mode 7 soon")
                else:
                    print(f"❌ Failed to read timer R{timer_address} for mode {current_mode}")
            else:
                print(f"ℹ️ Mode {current_mode} has no associated timer (production/transition mode)")

            time.sleep(1)  # Check every second

    except KeyboardInterrupt:
        print("\n👋 Monitoring stopped by user")
    except Exception as e:
        # Top-level boundary of a debug tool: report and fall through to cleanup.
        print(f"❌ Error during monitoring: {e}")
    finally:
        plc.disconnect()
|
||
|
||
def check_current_dts_state():
    """Print a one-shot snapshot of the current DTS state read from the PLC.

    Reads the system mode from holding register R1000, then (if the mode has an
    associated timer register) reads that timer and prints its metadata, value,
    expected start value and progress, followed by a short interpretation.
    Finally prints the raw values of R71, R138 and R128.

    A failed timer read is reported and the timer analysis is skipped, but the
    additional registers are still printed. Any exception is reported rather
    than propagated, and the PLC connection is always closed on exit.
    Returns None; all output goes to stdout.
    """
    plc = get_plc_connection()

    if not plc.connect():
        print("❌ Failed to connect to PLC")
        return

    print("📊 Current DTS State Analysis")
    print("=" * 40)

    try:
        # Read current mode
        current_mode = plc.read_holding_register(1000)
        print(f"Current Mode (R1000): {current_mode}")

        if current_mode is None:
            print("❌ Cannot read system mode")
            return

        # Get timer for current mode
        timer_address = get_timer_for_dts_mode(current_mode)

        if timer_address:
            timer_value = plc.read_holding_register(timer_address)

            if timer_value is None:
                # A failed read must not reach the numeric comparisons below
                # (None > int raises TypeError on Python 3).
                print(f"❌ Failed to read timer R{timer_address} for mode {current_mode}")
            else:
                # NOTE(review): "or {}" assumes the lookup may return None for
                # an unknown timer; it keeps the .get() calls below safe.
                timer_info = get_timer_info(timer_address) or {}
                expected_start = get_timer_expected_start_value(timer_address)
                progress = calculate_timer_progress_percent(timer_address, timer_value)

                print(f"Timer Address: R{timer_address}")
                print(f"Timer Name: {timer_info.get('name', 'Unknown')}")
                print(f"Current Value: {timer_value}")
                print(f"Expected Start: {expected_start}")
                print(f"Progress: {progress}%")
                print(f"Scale: {timer_info.get('scale', 'direct')}")
                print(f"Unit: {timer_info.get('unit', '')}")

                # Analysis
                if timer_value == 0:
                    print("✅ Timer completed - step should transition soon")
                elif timer_value == expected_start:
                    print("🔄 Timer at start value - step just began")
                elif expected_start is not None and timer_value > expected_start:
                    print("⚠️ Timer value higher than expected - unusual condition")
                else:
                    remaining_time = timer_value / 10  # Assuming ÷10 scale
                    print(f"⏳ Timer counting down - ~{remaining_time:.1f}s remaining")
        else:
            print("ℹ️ No timer for current mode (production/transition phase)")

        # Check other relevant registers
        print("\n🔍 Additional Register Values:")

        # Check R71 (command register)
        r71_value = plc.read_holding_register(71)
        print(f"R71 (Command): {r71_value}")

        # Check R138 specifically (DTS Step 1 timer)
        r138_value = plc.read_holding_register(138)
        print(f"R138 (DTS Step 1 Timer): {r138_value}")

        # Check R128 (DTS Step 2 timer)
        r128_value = plc.read_holding_register(128)
        print(f"R128 (DTS Step 2 Timer): {r128_value}")

    except Exception as e:
        # Top-level boundary of a debug tool: report and fall through to cleanup.
        print(f"❌ Error during analysis: {e}")
    finally:
        plc.disconnect()
|
||
|
||
def main():
    """Dispatch on the first command-line argument.

    When the first argument is exactly "monitor", run the continuous
    transition monitor. Otherwise run the one-shot state check and print a
    hint on how to start monitoring.
    """
    # An empty slice (no arguments) simply compares unequal to ["monitor"].
    monitor_requested = sys.argv[1:2] == ["monitor"]

    if monitor_requested:
        monitor_dts_transition()
        return

    check_current_dts_state()
    print("\n💡 To continuously monitor transitions, run:")
    print(" python debug_dts_step_transition.py monitor")
|
||
|
||
# Run the command-line entry point only when executed as a script,
# not when this module is imported.
if __name__ == "__main__":
    main()