# File: FCI_WaterMaker_API/tests/test_controllers.py
# 2025-06-08 15:53:25 +00:00 | 304 lines | 12 KiB | Python

"""
Tests for API controllers.
"""
import pytest
import json
from unittest.mock import Mock, patch, MagicMock
import sys
import os
# Put the directory one level above this file's directory at the front of
# sys.path so the package under test can be imported without installation.
_PARENT_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, _PARENT_DIR)
class TestControllers:
    """Endpoint tests for the API controllers, driven through the Flask test client."""

    @pytest.fixture
    def app(self):
        """Build the Flask application under test with the service factories patched."""
        # The three service entry points are replaced while the app is being
        # created so that setup does not open a real PLC connection.  The
        # patches are released as soon as this fixture returns.
        plc_patch = patch('watermaker_plc_api.services.plc_connection.get_plc_connection')
        cache_patch = patch('watermaker_plc_api.services.data_cache.get_data_cache')
        updates_patch = patch('watermaker_plc_api.services.background_tasks.start_background_updates')
        with plc_patch, cache_patch, updates_patch:
            from watermaker_plc_api.app import create_app
            from watermaker_plc_api.config import TestingConfig
            flask_app = create_app(TestingConfig)
            flask_app.config['TESTING'] = True
            return flask_app

    @pytest.fixture
    def client(self, app):
        """Return a test client bound to the ``app`` fixture."""
        test_client = app.test_client()
        return test_client

    def test_status_endpoint(self, client):
        """GET /api/status answers 200 and exposes the expected top-level keys."""
        cache_target = 'watermaker_plc_api.controllers.system_controller.cache'
        plc_target = 'watermaker_plc_api.controllers.system_controller.plc'
        with patch(cache_target) as mock_cache, patch(plc_target) as mock_plc:
            mock_cache.get_connection_status.return_value = "connected"
            mock_cache.get_last_update.return_value = "2025-06-03T12:00:00"
            mock_plc.get_connection_status.return_value = {
                "ip_address": "127.0.0.1",
                "port": 502,
                "connected": True,
            }

            resp = client.get('/api/status')

            assert resp.status_code == 200
            payload = json.loads(resp.data)
            for key in ('connection_status', 'last_update', 'plc_config', 'timestamp'):
                assert key in payload

    def test_config_endpoint(self, client):
        """GET /api/config answers 200 and exposes the expected top-level keys."""
        resp = client.get('/api/config')

        assert resp.status_code == 200
        payload = json.loads(resp.data)
        for key in ('api_version', 'endpoints', 'variable_groups', 'total_variables'):
            assert key in payload

    def test_sensors_endpoint(self, client):
        """GET /api/sensors answers 200 and counts the single mocked sensor."""
        with patch('watermaker_plc_api.controllers.sensors_controller.cache') as mock_cache:
            # One cached sensor entry, keyed by its register address.
            system_mode = {
                "name": "System Mode",
                "raw_value": 5,
                "scaled_value": 5,
                "unit": "",
                "category": "system",
            }
            mock_cache.get_sensors.return_value = {"1000": system_mode}
            mock_cache.get_last_update.return_value = "2025-06-03T12:00:00"

            resp = client.get('/api/sensors')

            assert resp.status_code == 200
            payload = json.loads(resp.data)
            for key in ('sensors', 'last_update', 'count'):
                assert key in payload
            assert payload['count'] == 1

    def test_sensors_category_endpoint(self, client):
        """GET /api/sensors/category/<category> accepts 'system' and rejects 'invalid'."""
        with patch('watermaker_plc_api.controllers.sensors_controller.cache') as mock_cache:
            mock_cache.get_sensors_by_category.return_value = {}
            mock_cache.get_last_update.return_value = "2025-06-03T12:00:00"

            # A known category is served normally.
            ok_resp = client.get('/api/sensors/category/system')
            assert ok_resp.status_code == 200

            # An unknown category is reported as a client error.
            bad_resp = client.get('/api/sensors/category/invalid')
            assert bad_resp.status_code == 400

    def test_timers_endpoint(self, client):
        """GET /api/timers answers 200 and exposes timer lists and counts."""
        with patch('watermaker_plc_api.controllers.timers_controller.cache') as mock_cache:
            # One cached, inactive timer entry.
            fwf_timer = {
                "name": "FWF Timer",
                "raw_value": 0,
                "scaled_value": 0,
                "active": False,
            }
            mock_cache.get_timers.return_value = {"136": fwf_timer}
            mock_cache.get_active_timers.return_value = []
            mock_cache.get_last_update.return_value = "2025-06-03T12:00:00"

            resp = client.get('/api/timers')

            assert resp.status_code == 200
            payload = json.loads(resp.data)
            for key in ('timers', 'active_timers', 'total_count', 'active_count'):
                assert key in payload

    def test_outputs_endpoint(self, client):
        """GET /api/outputs answers 200 and exposes the expected top-level keys."""
        with patch('watermaker_plc_api.controllers.outputs_controller.cache') as mock_cache:
            # One cached output register with every bit cleared.
            register_entry = {
                "register": 40017,
                "value": 0,
                "binary": "0000000000000000",
                "bits": [],
            }
            mock_cache.get_outputs.return_value = {"40017": register_entry}
            mock_cache.get_last_update.return_value = "2025-06-03T12:00:00"

            resp = client.get('/api/outputs')

            assert resp.status_code == 200
            payload = json.loads(resp.data)
            for key in ('outputs', 'last_update', 'count'):
                assert key in payload

    def test_write_register_endpoint(self, client):
        """POST /api/write/register rejects non-JSON, empty, partial and invalid bodies."""
        url = '/api/write/register'

        # No body and no JSON content type at all.
        resp = client.post(url)
        assert resp.status_code == 400
        payload = json.loads(resp.data)
        assert "Request must be JSON" in payload['message']

        # JSON bodies that must each be refused, in order:
        # empty object, missing "value" field, negative address.
        rejected_bodies = (
            {},
            {"address": 1000},
            {"address": -1, "value": 5},
        )
        for body in rejected_bodies:
            resp = client.post(url,
                               data=json.dumps(body),
                               content_type='application/json')
            assert resp.status_code == 400

    def test_select_endpoint_no_params(self, client):
        """GET /api/select without parameters answers 400 and includes usage details."""
        resp = client.get('/api/select')

        assert resp.status_code == 400
        payload = json.loads(resp.data)
        assert 'usage' in payload['details']

    def test_dts_start_endpoint(self, client):
        """POST /api/dts/start answers 202 when the sequence start is reported as successful."""
        target = 'watermaker_plc_api.controllers.dts_controller.start_dts_sequence_async'
        with patch(target) as mock_start:
            # The patched starter reports success together with a task id.
            mock_start.return_value = (True, "DTS sequence started", {"task_id": "abc12345"})

            resp = client.post('/api/dts/start')

            assert resp.status_code == 202
            payload = json.loads(resp.data)
            assert payload['success'] is True
            for key in ('task_id', 'status_endpoint'):
                assert key in payload

    def test_dts_start_conflict(self, client):
        """POST /api/dts/start answers 409 when the starter reports an operation in progress."""
        target = 'watermaker_plc_api.controllers.dts_controller.start_dts_sequence_async'
        with patch(target) as mock_start:
            # The patched starter reports failure and names the existing task.
            mock_start.return_value = (False, "Operation already in progress", {"existing_task_id": "def67890"})

            resp = client.post('/api/dts/start')

            assert resp.status_code == 409
            payload = json.loads(resp.data)
            assert payload['success'] is False

    def test_dts_status_endpoint_not_found(self, client):
        """GET /api/dts/status/<task_id> answers 404 for an unknown task id."""
        resp = client.get('/api/dts/status/nonexistent')

        assert resp.status_code == 404
        payload = json.loads(resp.data)
        assert 'available_tasks' in payload['details']

    def test_dts_cancel_endpoint_not_found(self, client):
        """POST /api/dts/cancel/<task_id> answers 404 for an unknown task id."""
        resp = client.post('/api/dts/cancel/nonexistent')

        assert resp.status_code == 404
        payload = json.loads(resp.data)
        assert payload['success'] is False

    def test_dts_cancel_endpoint_success(self, client):
        """POST /api/dts/cancel/<task_id> answers 200 for a task whose status is 'running'."""
        with patch('watermaker_plc_api.controllers.dts_controller.dts_operations') as mock_operations:
            # Every lookup on the patched task registry yields a running task.
            mock_operations.get.return_value = {
                "task_id": "abc12345",
                "status": "running",
                "current_step": "waiting_for_valves",
            }

            resp = client.post('/api/dts/cancel/abc12345')

            assert resp.status_code == 200
            payload = json.loads(resp.data)
            assert payload['success'] is True

    def test_dts_cancel_endpoint_not_running(self, client):
        """POST /api/dts/cancel/<task_id> answers 400 for a task whose status is 'completed'."""
        with patch('watermaker_plc_api.controllers.dts_controller.dts_operations') as mock_operations:
            # Every lookup on the patched task registry yields a finished task.
            finished_task = {
                "task_id": "abc12345",
                "status": "completed",
                "current_step": "completed",
            }
            mock_operations.get.return_value = finished_task

            resp = client.post('/api/dts/cancel/abc12345')

            assert resp.status_code == 400
            payload = json.loads(resp.data)
            assert payload['success'] is False
class TestErrorHandling:
    """Tests for the JSON error responses produced for unknown routes and methods."""

    @pytest.fixture
    def app(self):
        """Build the Flask application under test with the service factories patched."""
        # The three service entry points are replaced while the app is being
        # created so that setup does not open a real PLC connection.  The
        # patches are released as soon as this fixture returns.
        plc_patch = patch('watermaker_plc_api.services.plc_connection.get_plc_connection')
        cache_patch = patch('watermaker_plc_api.services.data_cache.get_data_cache')
        updates_patch = patch('watermaker_plc_api.services.background_tasks.start_background_updates')
        with plc_patch, cache_patch, updates_patch:
            from watermaker_plc_api.app import create_app
            from watermaker_plc_api.config import TestingConfig
            flask_app = create_app(TestingConfig)
            flask_app.config['TESTING'] = True
            return flask_app

    @pytest.fixture
    def client(self, app):
        """Return a test client bound to the ``app`` fixture."""
        test_client = app.test_client()
        return test_client

    def test_404_error(self, client):
        """An unknown route answers 404 with a JSON error body."""
        resp = client.get('/api/nonexistent')

        assert resp.status_code == 404
        payload = json.loads(resp.data)
        assert payload['success'] is False
        assert payload['error'] == 'Not Found'

    def test_405_method_not_allowed(self, client):
        """DELETE on /api/status answers 405 with a JSON error body."""
        # The test expects the status route to refuse DELETE requests.
        resp = client.delete('/api/status')

        assert resp.status_code == 405
        payload = json.loads(resp.data)
        assert payload['success'] is False
        assert payload['error'] == 'Method Not Allowed'