140 lines
5.3 KiB
Python
140 lines
5.3 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Test script to verify the DTS Single State Refactoring implementation.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'watermaker_plc_api'))
|
|
|
|
from watermaker_plc_api.services.operation_state import get_operation_state_manager
|
|
|
|
def test_operation_state_manager():
|
|
"""Test the basic functionality of the operation state manager"""
|
|
print("Testing Operation State Manager...")
|
|
|
|
state_manager = get_operation_state_manager()
|
|
|
|
# Test 1: Initial state should be idle
|
|
print("Test 1: Initial state")
|
|
assert state_manager.is_idle(), "Initial state should be idle"
|
|
assert not state_manager.is_running(), "Initial state should not be running"
|
|
print("✓ Initial state is idle")
|
|
|
|
# Test 2: Start an operation
|
|
print("\nTest 2: Start operation")
|
|
success, message, details = state_manager.start_operation("test_operation", "api")
|
|
assert success, f"Should be able to start operation: {message}"
|
|
assert state_manager.is_running(), "State should be running after start"
|
|
assert not state_manager.is_idle(), "State should not be idle after start"
|
|
print(f"✓ Operation started: {details['operation_id']}")
|
|
|
|
# Test 3: Try to start another operation (should fail)
|
|
print("\nTest 3: Conflict detection")
|
|
success2, message2, details2 = state_manager.start_operation("another_operation", "api")
|
|
assert not success2, "Should not be able to start second operation"
|
|
assert "already in progress" in message2.lower(), f"Should indicate conflict: {message2}"
|
|
print("✓ Conflict detection works")
|
|
|
|
# Test 4: Update state
|
|
print("\nTest 4: State updates")
|
|
state_manager.update_state({
|
|
"current_step": "test_step",
|
|
"progress_percent": 50,
|
|
"test_field": "test_value"
|
|
})
|
|
current_state = state_manager.get_current_state()
|
|
assert current_state["current_step"] == "test_step", "State update should work"
|
|
assert current_state["progress_percent"] == 50, "Progress should be updated"
|
|
assert current_state["test_field"] == "test_value", "Custom fields should be updated"
|
|
print("✓ State updates work")
|
|
|
|
# Test 5: Complete operation
|
|
print("\nTest 5: Complete operation")
|
|
state_manager.complete_operation(success=True)
|
|
assert not state_manager.is_running(), "Should not be running after completion"
|
|
current_state = state_manager.get_current_state()
|
|
assert current_state["status"] == "completed", "Status should be completed"
|
|
print("✓ Operation completion works")
|
|
|
|
# Test 6: Operation history
|
|
print("\nTest 6: Operation history")
|
|
history = state_manager.get_operation_history()
|
|
assert len(history) == 1, "Should have one operation in history"
|
|
assert history[0]["operation_type"] == "test_operation", "History should contain our operation"
|
|
print("✓ Operation history works")
|
|
|
|
# Test 7: Start new operation after completion
|
|
print("\nTest 7: New operation after completion")
|
|
success3, message3, details3 = state_manager.start_operation("new_operation", "external")
|
|
assert success3, f"Should be able to start new operation: {message3}"
|
|
assert state_manager.is_running(), "Should be running again"
|
|
print(f"✓ New operation started: {details3['operation_id']}")
|
|
|
|
# Test 8: Cancel operation
|
|
print("\nTest 8: Cancel operation")
|
|
cancel_success = state_manager.cancel_operation()
|
|
assert cancel_success, "Should be able to cancel running operation"
|
|
current_state = state_manager.get_current_state()
|
|
assert current_state["status"] == "cancelled", "Status should be cancelled"
|
|
print("✓ Operation cancellation works")
|
|
|
|
print("\n✅ All tests passed! Operation State Manager is working correctly.")
|
|
|
|
def test_import_structure():
|
|
"""Test that all the refactored imports work correctly"""
|
|
print("\nTesting Import Structure...")
|
|
|
|
try:
|
|
from watermaker_plc_api.controllers.dts_controller import (
|
|
get_operation_state_manager,
|
|
handle_external_dts_change,
|
|
start_dts_sequence_async,
|
|
start_stop_sequence_async,
|
|
start_skip_sequence_async,
|
|
update_dts_progress_from_timers
|
|
)
|
|
print("✓ DTS controller imports work")
|
|
|
|
from watermaker_plc_api.services.operation_state import OperationStateManager
|
|
print("✓ Operation state imports work")
|
|
|
|
from watermaker_plc_api.services.background_tasks import get_task_manager
|
|
print("✓ Background tasks imports work")
|
|
|
|
print("✅ All imports successful!")
|
|
|
|
except ImportError as e:
|
|
print(f"❌ Import error: {e}")
|
|
return False
|
|
|
|
return True
|
|
|
|
def main():
|
|
"""Run all tests"""
|
|
print("=" * 60)
|
|
print("DTS Single State Refactoring - Test Suite")
|
|
print("=" * 60)
|
|
|
|
try:
|
|
# Test imports first
|
|
if not test_import_structure():
|
|
return 1
|
|
|
|
# Test operation state manager
|
|
test_operation_state_manager()
|
|
|
|
print("\n" + "=" * 60)
|
|
print("🎉 ALL TESTS PASSED! Refactoring is successful!")
|
|
print("=" * 60)
|
|
|
|
return 0
|
|
|
|
except Exception as e:
|
|
print(f"\n❌ Test failed with error: {e}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
return 1
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(main()) |