304 lines
12 KiB
Python
304 lines
12 KiB
Python
"""
|
|
Tests for API controllers.
|
|
"""
|
|
|
|
import pytest
|
|
import json
|
|
from unittest.mock import Mock, patch, MagicMock
|
|
import sys
|
|
import os
|
|
|
|
# Add the parent directory to the path so we can import the package
|
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
|
|
|
|
|
class TestControllers:
|
|
"""Test cases for API controllers"""
|
|
|
|
@pytest.fixture
|
|
def app(self):
|
|
"""Create test Flask application"""
|
|
# Mock the services to avoid actual PLC connections during testing
|
|
with patch('watermaker_plc_api.services.plc_connection.get_plc_connection'), \
|
|
patch('watermaker_plc_api.services.data_cache.get_data_cache'), \
|
|
patch('watermaker_plc_api.services.background_tasks.start_background_updates'):
|
|
|
|
from watermaker_plc_api.app import create_app
|
|
from watermaker_plc_api.config import TestingConfig
|
|
|
|
app = create_app(TestingConfig)
|
|
app.config['TESTING'] = True
|
|
return app
|
|
|
|
@pytest.fixture
|
|
def client(self, app):
|
|
"""Create test client"""
|
|
return app.test_client()
|
|
|
|
def test_status_endpoint(self, client):
|
|
"""Test /api/status endpoint"""
|
|
with patch('watermaker_plc_api.controllers.system_controller.cache') as mock_cache, \
|
|
patch('watermaker_plc_api.controllers.system_controller.plc') as mock_plc:
|
|
|
|
mock_cache.get_connection_status.return_value = "connected"
|
|
mock_cache.get_last_update.return_value = "2025-06-03T12:00:00"
|
|
mock_plc.get_connection_status.return_value = {
|
|
"ip_address": "127.0.0.1",
|
|
"port": 502,
|
|
"connected": True
|
|
}
|
|
|
|
response = client.get('/api/status')
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert 'connection_status' in data
|
|
assert 'last_update' in data
|
|
assert 'plc_config' in data
|
|
assert 'timestamp' in data
|
|
|
|
def test_config_endpoint(self, client):
|
|
"""Test /api/config endpoint"""
|
|
response = client.get('/api/config')
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert 'api_version' in data
|
|
assert 'endpoints' in data
|
|
assert 'variable_groups' in data
|
|
assert 'total_variables' in data
|
|
|
|
def test_sensors_endpoint(self, client):
|
|
"""Test /api/sensors endpoint"""
|
|
with patch('watermaker_plc_api.controllers.sensors_controller.cache') as mock_cache:
|
|
# Mock cache data
|
|
mock_cache.get_sensors.return_value = {
|
|
"1000": {
|
|
"name": "System Mode",
|
|
"raw_value": 5,
|
|
"scaled_value": 5,
|
|
"unit": "",
|
|
"category": "system"
|
|
}
|
|
}
|
|
mock_cache.get_last_update.return_value = "2025-06-03T12:00:00"
|
|
|
|
response = client.get('/api/sensors')
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert 'sensors' in data
|
|
assert 'last_update' in data
|
|
assert 'count' in data
|
|
assert data['count'] == 1
|
|
|
|
def test_sensors_category_endpoint(self, client):
|
|
"""Test /api/sensors/category/<category> endpoint"""
|
|
with patch('watermaker_plc_api.controllers.sensors_controller.cache') as mock_cache:
|
|
mock_cache.get_sensors_by_category.return_value = {}
|
|
mock_cache.get_last_update.return_value = "2025-06-03T12:00:00"
|
|
|
|
# Test valid category
|
|
response = client.get('/api/sensors/category/system')
|
|
assert response.status_code == 200
|
|
|
|
# Test invalid category
|
|
response = client.get('/api/sensors/category/invalid')
|
|
assert response.status_code == 400
|
|
|
|
def test_timers_endpoint(self, client):
|
|
"""Test /api/timers endpoint"""
|
|
with patch('watermaker_plc_api.controllers.timers_controller.cache') as mock_cache:
|
|
# Mock cache data
|
|
mock_cache.get_timers.return_value = {
|
|
"136": {
|
|
"name": "FWF Timer",
|
|
"raw_value": 0,
|
|
"scaled_value": 0,
|
|
"active": False
|
|
}
|
|
}
|
|
mock_cache.get_active_timers.return_value = []
|
|
mock_cache.get_last_update.return_value = "2025-06-03T12:00:00"
|
|
|
|
response = client.get('/api/timers')
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert 'timers' in data
|
|
assert 'active_timers' in data
|
|
assert 'total_count' in data
|
|
assert 'active_count' in data
|
|
|
|
def test_outputs_endpoint(self, client):
|
|
"""Test /api/outputs endpoint"""
|
|
with patch('watermaker_plc_api.controllers.outputs_controller.cache') as mock_cache:
|
|
# Mock cache data
|
|
mock_cache.get_outputs.return_value = {
|
|
"40017": {
|
|
"register": 40017,
|
|
"value": 0,
|
|
"binary": "0000000000000000",
|
|
"bits": []
|
|
}
|
|
}
|
|
mock_cache.get_last_update.return_value = "2025-06-03T12:00:00"
|
|
|
|
response = client.get('/api/outputs')
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert 'outputs' in data
|
|
assert 'last_update' in data
|
|
assert 'count' in data
|
|
|
|
def test_write_register_endpoint(self, client):
|
|
"""Test /api/write/register endpoint"""
|
|
# Test missing content-type (no JSON)
|
|
response = client.post('/api/write/register')
|
|
assert response.status_code == 400
|
|
data = json.loads(response.data)
|
|
assert "Request must be JSON" in data['message']
|
|
|
|
# Test missing data with proper content-type
|
|
response = client.post('/api/write/register',
|
|
data=json.dumps({}),
|
|
content_type='application/json')
|
|
assert response.status_code == 400
|
|
|
|
# Test missing fields
|
|
response = client.post('/api/write/register',
|
|
data=json.dumps({"address": 1000}),
|
|
content_type='application/json')
|
|
assert response.status_code == 400
|
|
|
|
# Test invalid values
|
|
response = client.post('/api/write/register',
|
|
data=json.dumps({"address": -1, "value": 5}),
|
|
content_type='application/json')
|
|
assert response.status_code == 400
|
|
|
|
def test_select_endpoint_no_params(self, client):
|
|
"""Test /api/select endpoint without parameters"""
|
|
response = client.get('/api/select')
|
|
assert response.status_code == 400
|
|
|
|
data = json.loads(response.data)
|
|
assert 'usage' in data['details']
|
|
|
|
def test_dts_start_endpoint(self, client):
|
|
"""Test /api/dts/start endpoint"""
|
|
with patch('watermaker_plc_api.controllers.dts_controller.start_dts_sequence_async') as mock_start:
|
|
# Mock successful start
|
|
mock_start.return_value = (True, "DTS sequence started", {"task_id": "abc12345"})
|
|
|
|
response = client.post('/api/dts/start')
|
|
assert response.status_code == 202
|
|
|
|
data = json.loads(response.data)
|
|
assert data['success'] is True
|
|
assert 'task_id' in data
|
|
assert 'status_endpoint' in data
|
|
|
|
def test_dts_start_conflict(self, client):
|
|
"""Test /api/dts/start endpoint with conflict"""
|
|
with patch('watermaker_plc_api.controllers.dts_controller.start_dts_sequence_async') as mock_start:
|
|
# Mock operation already running
|
|
mock_start.return_value = (False, "Operation already in progress", {"existing_task_id": "def67890"})
|
|
|
|
response = client.post('/api/dts/start')
|
|
assert response.status_code == 409
|
|
|
|
data = json.loads(response.data)
|
|
assert data['success'] is False
|
|
|
|
def test_dts_status_endpoint_not_found(self, client):
|
|
"""Test /api/dts/status/<task_id> endpoint with non-existent task"""
|
|
response = client.get('/api/dts/status/nonexistent')
|
|
assert response.status_code == 404
|
|
|
|
data = json.loads(response.data)
|
|
assert 'available_tasks' in data['details']
|
|
|
|
def test_dts_cancel_endpoint_not_found(self, client):
|
|
"""Test /api/dts/cancel/<task_id> endpoint with non-existent task"""
|
|
response = client.post('/api/dts/cancel/nonexistent')
|
|
assert response.status_code == 404
|
|
|
|
data = json.loads(response.data)
|
|
assert data['success'] is False
|
|
|
|
def test_dts_cancel_endpoint_success(self, client):
|
|
"""Test successful task cancellation"""
|
|
with patch('watermaker_plc_api.controllers.dts_controller.dts_operations') as mock_operations:
|
|
# Mock existing running task
|
|
mock_task = {
|
|
"task_id": "abc12345",
|
|
"status": "running",
|
|
"current_step": "waiting_for_valves"
|
|
}
|
|
mock_operations.get.return_value = mock_task
|
|
|
|
response = client.post('/api/dts/cancel/abc12345')
|
|
assert response.status_code == 200
|
|
|
|
data = json.loads(response.data)
|
|
assert data['success'] is True
|
|
|
|
def test_dts_cancel_endpoint_not_running(self, client):
|
|
"""Test cancelling non-running task"""
|
|
with patch('watermaker_plc_api.controllers.dts_controller.dts_operations') as mock_operations:
|
|
# Mock existing completed task
|
|
mock_operations.get.return_value = {
|
|
"task_id": "abc12345",
|
|
"status": "completed",
|
|
"current_step": "completed"
|
|
}
|
|
|
|
response = client.post('/api/dts/cancel/abc12345')
|
|
assert response.status_code == 400
|
|
|
|
data = json.loads(response.data)
|
|
assert data['success'] is False
|
|
|
|
|
|
class TestErrorHandling:
|
|
"""Test cases for error handling across controllers"""
|
|
|
|
@pytest.fixture
|
|
def app(self):
|
|
"""Create test Flask application"""
|
|
# Mock the services to avoid actual PLC connections during testing
|
|
with patch('watermaker_plc_api.services.plc_connection.get_plc_connection'), \
|
|
patch('watermaker_plc_api.services.data_cache.get_data_cache'), \
|
|
patch('watermaker_plc_api.services.background_tasks.start_background_updates'):
|
|
|
|
from watermaker_plc_api.app import create_app
|
|
from watermaker_plc_api.config import TestingConfig
|
|
|
|
app = create_app(TestingConfig)
|
|
app.config['TESTING'] = True
|
|
return app
|
|
|
|
@pytest.fixture
|
|
def client(self, app):
|
|
"""Create test client"""
|
|
return app.test_client()
|
|
|
|
def test_404_error(self, client):
|
|
"""Test 404 error handling"""
|
|
response = client.get('/api/nonexistent')
|
|
assert response.status_code == 404
|
|
|
|
data = json.loads(response.data)
|
|
assert data['success'] is False
|
|
assert data['error'] == 'Not Found'
|
|
|
|
def test_405_method_not_allowed(self, client):
|
|
"""Test 405 method not allowed error"""
|
|
response = client.delete('/api/status') # DELETE not allowed on status endpoint
|
|
assert response.status_code == 405
|
|
|
|
data = json.loads(response.data)
|
|
assert data['success'] is False
|
|
assert data['error'] == 'Method Not Allowed' |