commit aabc180d20
2025-06-08 15:53:25 +00:00
37 changed files with 4858 additions and 0 deletions

53
.gitignore vendored Normal file

@@ -0,0 +1,53 @@
# Python bytecode
__pycache__/
*.py[cod]
*$py.class
# Distribution / packaging
dist/
build/
*.egg-info/
*.egg
# Unit test / coverage reports
htmlcov/
.tox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
.hypothesis/
pytest_cache/
.pytest_cache/
# Virtual environments
venv/
env/
ENV/
.env
.venv
# IDE specific files
.idea/
.vscode/
*.swp
*.swo
.DS_Store
# Project specific
*.log
logs/
temp/
tmp/
# Local configuration
.env.local
.env.development.local
.env.test.local
.env.production.local
# Database
*.db
*.sqlite3

158
README.md Normal file

@@ -0,0 +1,158 @@
# Watermaker PLC API
RESTful API for monitoring and controlling watermaker PLC systems. Provides access to sensors, timers, controls, and watermaker operation sequences.
## Features
- **Real-time Data Monitoring**: Sensors, timers, outputs, runtime hours, water counters
- **Selective Data Retrieval**: Bandwidth-optimized API for specific data groups/keys
- **Asynchronous Control Operations**: DTS start/stop/skip sequences with progress tracking
- **Background Data Updates**: Continuous PLC monitoring with configurable intervals
- **Thread-safe Data Caching**: Centralized cache with concurrent access support
- **Comprehensive Error Handling**: Structured error responses and logging
- **Modular Architecture**: Clean separation of concerns with service layers
## Quick Start
### Installation
```bash
# Clone repository
git clone <repository-url>
cd watermaker-plc-api
# Install dependencies
pip install -r requirements.txt
# Install package
pip install -e .
```
### Configuration
Set environment variables:
```bash
export PLC_IP=192.168.1.15
export PLC_PORT=502
export DATA_UPDATE_INTERVAL=5
export LOG_LEVEL=INFO
```
### Running the API
```bash
# Using the command line entry point
watermaker-api --host 0.0.0.0 --port 5000
# Or run directly
python -m watermaker_plc_api.main
```
## API Endpoints
### Data Monitoring
- `GET /api/status` - Connection and system status
- `GET /api/all` - All PLC data in one response
- `GET /api/select` - Selective data retrieval (bandwidth optimized)
- `GET /api/sensors` - All sensor data
- `GET /api/timers` - All timer data
- `GET /api/outputs` - Output control data
- `GET /api/runtime` - Runtime hours data
- `GET /api/water_counters` - Water production counters
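
The monitoring endpoints above return plain JSON. A minimal client sketch for `/api/status` (field names follow the test suite; the `requests` package is assumed and is not part of `requirements.txt`):
```python
import requests

# Assumes the API is running locally on the default port.
resp = requests.get("http://localhost:5000/api/status", timeout=5)
resp.raise_for_status()
status = resp.json()

# Fields asserted in tests/test_controllers.py
print(status["connection_status"])         # e.g. "connected"
print(status["last_update"])               # timestamp of the last cache refresh
print(status["plc_config"]["ip_address"])  # PLC target address
```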
### Control Operations
- `POST /api/dts/start` - Start DTS watermaker sequence (async)
- `POST /api/dts/stop` - Stop watermaker sequence (async)
- `POST /api/dts/skip` - Skip current step (async)
- `GET /api/dts/status/<task_id>` - Get DTS operation status (omit `<task_id>` for the latest task)
- `POST /api/write/register` - Write single register
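
`POST /api/write/register` expects a JSON body with `address` and `value` fields, mirroring the controller tests. A hedged sketch (the values are illustrative only; writing registers changes PLC state, and `requests` is assumed to be installed):
```python
import requests

payload = {"address": 1000, "value": 34}  # illustrative register address/value
resp = requests.post(
    "http://localhost:5000/api/write/register",
    json=payload,
    timeout=5,
)

# Missing or invalid fields return HTTP 400 with a structured error body.
print(resp.status_code, resp.json())
```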
### Selective API Examples
```bash
# Get temperature and pressure sensors only
curl "http://localhost:5000/api/select?groups=temperature,pressure"
# Get specific registers
curl "http://localhost:5000/api/select?keys=1036,1003,1017,1121"
# Combined approach
curl "http://localhost:5000/api/select?groups=dts_timer&keys=1036"
```
### Control Examples
```bash
# Start DTS sequence (returns immediately with task_id)
curl -X POST http://localhost:5000/api/dts/start
# Poll for progress
curl http://localhost:5000/api/dts/status/abc12345
# Stop watermaker
curl -X POST http://localhost:5000/api/dts/stop
```
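
Control endpoints return `202 Accepted` with a `task_id`, so clients poll the status endpoint until the task reports completion. A minimal polling sketch (field names taken from the DTS controller; `requests` assumed installed):
```python
import time
import requests

BASE = "http://localhost:5000/api"

# Start the sequence; the 202 response carries the task_id to poll.
task_id = requests.post(f"{BASE}/dts/start", timeout=5).json()["task_id"]

# Poll roughly once per second, as the response's polling_info recommends.
while True:
    task = requests.get(f"{BASE}/dts/status/{task_id}", timeout=5).json()["task"]
    print(f"{task['progress_percent']}% - {task['step_description']}")
    if task["is_complete"]:
        print("final status:", task["status"])
        break
    time.sleep(1)
```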
## Architecture
```
watermaker_plc_api/
├── app.py # Flask application factory
├── config.py # Configuration management
├── controllers/ # API route handlers
├── services/ # Business logic layer
├── models/ # Data models and mappings
└── utils/ # Utilities and helpers
```
### Key Components
- **PLCConnection**: Thread-safe Modbus TCP communication
- **DataCache**: Centralized, thread-safe data storage
- **RegisterReader**: Service for reading PLC registers
- **RegisterWriter**: Service for writing PLC registers
- **BackgroundTaskManager**: Continuous data updates
- **Controllers**: RESTful API endpoints
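
These components are wired together by the application factory. A minimal embedding sketch that mirrors `run_server.py`:
```python
from watermaker_plc_api.app import create_app
from watermaker_plc_api.config import Config
from watermaker_plc_api.services.background_tasks import start_background_updates

# Point the API at the PLC before creating the app (same as run_server.py).
Config.PLC_IP = "192.168.1.15"
Config.PLC_PORT = 502

app = create_app()           # registers blueprints, CORS and error handlers
start_background_updates()   # continuous cache refresh every DATA_UPDATE_INTERVAL seconds
app.run(host="0.0.0.0", port=5000, threaded=True)
```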
## Variable Groups
| Group | Description | Count |
|-------|-------------|-------|
| system | System status and mode | 2 |
| pressure | Water pressure sensors | 3 |
| temperature | Temperature monitoring | 2 |
| flow | Flow rate meters | 3 |
| quality | Water quality (TDS) sensors | 2 |
| fwf_timer | Fresh water flush timers | 1 |
| dts_timer | DTS process step timers | 6 |
| rtc | Real-time clock data | 6 |
| outputs | Digital output controls | 6 |
| runtime | System runtime hours | 1 |
| water_counters | Water production counters | 4 |
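
Groups and keys can be combined in a single `/api/select` request. The response shape follows the `RegisterReader.read_selective_data` service (see the register-reader tests): `sensors`, `timers`, `requested_groups`, `requested_keys`, and `summary`. A sketch, again assuming `requests`:
```python
import requests

params = {"groups": "temperature,pressure", "keys": "1036"}
data = requests.get("http://localhost:5000/api/select", params=params, timeout=5).json()

print(data["requested_groups"], data["requested_keys"])
# Per-register entries mirror the sensors cache (name, scaled_value, unit, ...)
for register, sensor in data.get("sensors", {}).items():
    print(register, sensor.get("name"), sensor.get("scaled_value"), sensor.get("unit"))
```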
## Development
### Running Tests
```bash
python -m pytest tests/
```
### Environment Setup
```bash
# Development environment
export FLASK_ENV=development
export DEBUG=true
export PLC_IP=127.0.0.1 # For simulator
# Production environment
export FLASK_ENV=production
export SECRET_KEY=your-secret-key
export PLC_IP=192.168.1.15
```
## License
MIT License - see LICENSE file for details.

13
pytest.ini Normal file

@@ -0,0 +1,13 @@
[pytest]
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts =
-v
--tb=short
--strict-markers
--disable-warnings
filterwarnings =
ignore::DeprecationWarning
ignore::PendingDeprecationWarning

3
requirements.txt Normal file

@@ -0,0 +1,3 @@
Flask==2.3.3
Flask-CORS==4.0.0
pymodbus==3.4.1

85
run_server.py Executable file

@@ -0,0 +1,85 @@
#!/usr/bin/env python3
"""
Standalone script to run the Watermaker PLC API server.
Use this to run the server directly from the source tree when the package is not installed.
"""
import sys
import os
import argparse
# Add the package directory to Python path
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
from watermaker_plc_api.app import create_app
from watermaker_plc_api.config import Config
from watermaker_plc_api.utils.logger import get_logger
from watermaker_plc_api.services.background_tasks import start_background_updates
logger = get_logger(__name__)
def parse_args():
"""Parse command line arguments"""
parser = argparse.ArgumentParser(description='Watermaker PLC API Server')
parser.add_argument('--host', default='0.0.0.0',
help='Host to bind to (default: 0.0.0.0)')
parser.add_argument('--port', type=int, default=5000,
help='Port to bind to (default: 5000)')
parser.add_argument('--plc-ip', default='192.168.1.15',
help='PLC IP address (default: 192.168.1.15)')
parser.add_argument('--plc-port', type=int, default=502,
help='PLC Modbus port (default: 502)')
parser.add_argument('--debug', action='store_true',
help='Enable debug mode')
parser.add_argument('--no-background-updates', action='store_true',
help='Disable background data updates')
return parser.parse_args()
def main():
"""Main application entry point"""
args = parse_args()
# Update config with command line arguments
Config.PLC_IP = args.plc_ip
Config.PLC_PORT = args.plc_port
Config.DEBUG = args.debug
logger.info("Starting Watermaker PLC API Server (Standalone)")
logger.info(f"API Version: 1.1.0")
logger.info(f"PLC Target: {Config.PLC_IP}:{Config.PLC_PORT}")
logger.info(f"Server: http://{args.host}:{args.port}")
# Create Flask application
app = create_app()
# Start background data updates (unless disabled)
if not args.no_background_updates:
start_background_updates()
logger.info("Background data update thread started")
else:
logger.warning("Background data updates disabled")
logger.info("🚀 Server starting...")
logger.info("📊 API Documentation: http://localhost:5000/api/config")
logger.info("❤️ Health Check: http://localhost:5000/api/status")
try:
# Start the Flask server
app.run(
host=args.host,
port=args.port,
debug=args.debug,
threaded=True
)
except KeyboardInterrupt:
logger.info("Server shutdown requested by user")
sys.exit(0)
except Exception as e:
logger.error(f"Failed to start server: {e}")
sys.exit(1)
if __name__ == '__main__':
main()

37
setup.py Normal file

@@ -0,0 +1,37 @@
from setuptools import setup, find_packages
with open("README.md", "r", encoding="utf-8") as fh:
long_description = fh.read()
with open("requirements.txt", "r", encoding="utf-8") as fh:
requirements = [line.strip() for line in fh if line.strip() and not line.startswith("#")]
setup(
name="watermaker-plc-api",
version="1.1.0",
author="Your Name",
author_email="paul@golownia.com",
description="RESTful API for Watermaker PLC monitoring and control",
long_description=long_description,
long_description_content_type="text/markdown",
url="https://github.com/terbonium/watermaker-plc-api.git",
packages=find_packages(),
classifiers=[
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
"Programming Language :: Python :: 3",
"Programming Language :: Python :: 3.8",
"Programming Language :: Python :: 3.9",
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
],
python_requires=">=3.8",
install_requires=requirements,
entry_points={
"console_scripts": [
"watermaker-api=watermaker_plc_api.main:main",
],
},
)

5
tests/__init__.py Normal file

@@ -0,0 +1,5 @@
"""
Test suite for the Watermaker PLC API.
"""
# Test configuration and utilities can be placed here

304
tests/test_controllers.py Normal file

@@ -0,0 +1,304 @@
"""
Tests for API controllers.
"""
import pytest
import json
from unittest.mock import Mock, patch, MagicMock
import sys
import os
# Add the parent directory to the path so we can import the package
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
class TestControllers:
"""Test cases for API controllers"""
@pytest.fixture
def app(self):
"""Create test Flask application"""
# Mock the services to avoid actual PLC connections during testing
with patch('watermaker_plc_api.services.plc_connection.get_plc_connection'), \
patch('watermaker_plc_api.services.data_cache.get_data_cache'), \
patch('watermaker_plc_api.services.background_tasks.start_background_updates'):
from watermaker_plc_api.app import create_app
from watermaker_plc_api.config import TestingConfig
app = create_app(TestingConfig)
app.config['TESTING'] = True
return app
@pytest.fixture
def client(self, app):
"""Create test client"""
return app.test_client()
def test_status_endpoint(self, client):
"""Test /api/status endpoint"""
with patch('watermaker_plc_api.controllers.system_controller.cache') as mock_cache, \
patch('watermaker_plc_api.controllers.system_controller.plc') as mock_plc:
mock_cache.get_connection_status.return_value = "connected"
mock_cache.get_last_update.return_value = "2025-06-03T12:00:00"
mock_plc.get_connection_status.return_value = {
"ip_address": "127.0.0.1",
"port": 502,
"connected": True
}
response = client.get('/api/status')
assert response.status_code == 200
data = json.loads(response.data)
assert 'connection_status' in data
assert 'last_update' in data
assert 'plc_config' in data
assert 'timestamp' in data
def test_config_endpoint(self, client):
"""Test /api/config endpoint"""
response = client.get('/api/config')
assert response.status_code == 200
data = json.loads(response.data)
assert 'api_version' in data
assert 'endpoints' in data
assert 'variable_groups' in data
assert 'total_variables' in data
def test_sensors_endpoint(self, client):
"""Test /api/sensors endpoint"""
with patch('watermaker_plc_api.controllers.sensors_controller.cache') as mock_cache:
# Mock cache data
mock_cache.get_sensors.return_value = {
"1000": {
"name": "System Mode",
"raw_value": 5,
"scaled_value": 5,
"unit": "",
"category": "system"
}
}
mock_cache.get_last_update.return_value = "2025-06-03T12:00:00"
response = client.get('/api/sensors')
assert response.status_code == 200
data = json.loads(response.data)
assert 'sensors' in data
assert 'last_update' in data
assert 'count' in data
assert data['count'] == 1
def test_sensors_category_endpoint(self, client):
"""Test /api/sensors/category/<category> endpoint"""
with patch('watermaker_plc_api.controllers.sensors_controller.cache') as mock_cache:
mock_cache.get_sensors_by_category.return_value = {}
mock_cache.get_last_update.return_value = "2025-06-03T12:00:00"
# Test valid category
response = client.get('/api/sensors/category/system')
assert response.status_code == 200
# Test invalid category
response = client.get('/api/sensors/category/invalid')
assert response.status_code == 400
def test_timers_endpoint(self, client):
"""Test /api/timers endpoint"""
with patch('watermaker_plc_api.controllers.timers_controller.cache') as mock_cache:
# Mock cache data
mock_cache.get_timers.return_value = {
"136": {
"name": "FWF Timer",
"raw_value": 0,
"scaled_value": 0,
"active": False
}
}
mock_cache.get_active_timers.return_value = []
mock_cache.get_last_update.return_value = "2025-06-03T12:00:00"
response = client.get('/api/timers')
assert response.status_code == 200
data = json.loads(response.data)
assert 'timers' in data
assert 'active_timers' in data
assert 'total_count' in data
assert 'active_count' in data
def test_outputs_endpoint(self, client):
"""Test /api/outputs endpoint"""
with patch('watermaker_plc_api.controllers.outputs_controller.cache') as mock_cache:
# Mock cache data
mock_cache.get_outputs.return_value = {
"40017": {
"register": 40017,
"value": 0,
"binary": "0000000000000000",
"bits": []
}
}
mock_cache.get_last_update.return_value = "2025-06-03T12:00:00"
response = client.get('/api/outputs')
assert response.status_code == 200
data = json.loads(response.data)
assert 'outputs' in data
assert 'last_update' in data
assert 'count' in data
def test_write_register_endpoint(self, client):
"""Test /api/write/register endpoint"""
# Test missing content-type (no JSON)
response = client.post('/api/write/register')
assert response.status_code == 400
data = json.loads(response.data)
assert "Request must be JSON" in data['message']
# Test missing data with proper content-type
response = client.post('/api/write/register',
data=json.dumps({}),
content_type='application/json')
assert response.status_code == 400
# Test missing fields
response = client.post('/api/write/register',
data=json.dumps({"address": 1000}),
content_type='application/json')
assert response.status_code == 400
# Test invalid values
response = client.post('/api/write/register',
data=json.dumps({"address": -1, "value": 5}),
content_type='application/json')
assert response.status_code == 400
def test_select_endpoint_no_params(self, client):
"""Test /api/select endpoint without parameters"""
response = client.get('/api/select')
assert response.status_code == 400
data = json.loads(response.data)
assert 'usage' in data['details']
def test_dts_start_endpoint(self, client):
"""Test /api/dts/start endpoint"""
with patch('watermaker_plc_api.controllers.dts_controller.start_dts_sequence_async') as mock_start:
# Mock successful start
mock_start.return_value = (True, "DTS sequence started", {"task_id": "abc12345"})
response = client.post('/api/dts/start')
assert response.status_code == 202
data = json.loads(response.data)
assert data['success'] is True
assert 'task_id' in data
assert 'status_endpoint' in data
def test_dts_start_conflict(self, client):
"""Test /api/dts/start endpoint with conflict"""
with patch('watermaker_plc_api.controllers.dts_controller.start_dts_sequence_async') as mock_start:
# Mock operation already running
mock_start.return_value = (False, "Operation already in progress", {"existing_task_id": "def67890"})
response = client.post('/api/dts/start')
assert response.status_code == 409
data = json.loads(response.data)
assert data['success'] is False
def test_dts_status_endpoint_not_found(self, client):
"""Test /api/dts/status/<task_id> endpoint with non-existent task"""
response = client.get('/api/dts/status/nonexistent')
assert response.status_code == 404
data = json.loads(response.data)
assert 'available_tasks' in data['details']
def test_dts_cancel_endpoint_not_found(self, client):
"""Test /api/dts/cancel/<task_id> endpoint with non-existent task"""
response = client.post('/api/dts/cancel/nonexistent')
assert response.status_code == 404
data = json.loads(response.data)
assert data['success'] is False
def test_dts_cancel_endpoint_success(self, client):
"""Test successful task cancellation"""
with patch('watermaker_plc_api.controllers.dts_controller.dts_operations') as mock_operations:
# Mock existing running task
mock_task = {
"task_id": "abc12345",
"status": "running",
"current_step": "waiting_for_valves"
}
mock_operations.get.return_value = mock_task
response = client.post('/api/dts/cancel/abc12345')
assert response.status_code == 200
data = json.loads(response.data)
assert data['success'] is True
def test_dts_cancel_endpoint_not_running(self, client):
"""Test cancelling non-running task"""
with patch('watermaker_plc_api.controllers.dts_controller.dts_operations') as mock_operations:
# Mock existing completed task
mock_operations.get.return_value = {
"task_id": "abc12345",
"status": "completed",
"current_step": "completed"
}
response = client.post('/api/dts/cancel/abc12345')
assert response.status_code == 400
data = json.loads(response.data)
assert data['success'] is False
class TestErrorHandling:
"""Test cases for error handling across controllers"""
@pytest.fixture
def app(self):
"""Create test Flask application"""
# Mock the services to avoid actual PLC connections during testing
with patch('watermaker_plc_api.services.plc_connection.get_plc_connection'), \
patch('watermaker_plc_api.services.data_cache.get_data_cache'), \
patch('watermaker_plc_api.services.background_tasks.start_background_updates'):
from watermaker_plc_api.app import create_app
from watermaker_plc_api.config import TestingConfig
app = create_app(TestingConfig)
app.config['TESTING'] = True
return app
@pytest.fixture
def client(self, app):
"""Create test client"""
return app.test_client()
def test_404_error(self, client):
"""Test 404 error handling"""
response = client.get('/api/nonexistent')
assert response.status_code == 404
data = json.loads(response.data)
assert data['success'] is False
assert data['error'] == 'Not Found'
def test_405_method_not_allowed(self, client):
"""Test 405 method not allowed error"""
response = client.delete('/api/status') # DELETE not allowed on status endpoint
assert response.status_code == 405
data = json.loads(response.data)
assert data['success'] is False
assert data['error'] == 'Method Not Allowed'


@@ -0,0 +1,130 @@
"""
Tests for data conversion utilities.
"""
import pytest
import struct
import sys
import os
# Add the parent directory to the path so we can import the package
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from watermaker_plc_api.utils.data_conversion import (
scale_value,
convert_ieee754_float,
convert_gallon_counter,
get_descriptive_value,
validate_register_value,
format_binary_string
)
class TestDataConversion:
"""Test cases for data conversion utilities"""
def test_scale_value_direct(self):
"""Test direct scaling (no change)"""
assert scale_value(100, "direct") == 100
assert scale_value(0, "direct") == 0
def test_scale_value_divide(self):
"""Test division scaling"""
assert scale_value(100, "÷10") == 10.0
assert scale_value(250, "÷10") == 25.0
assert scale_value(1000, "÷100") == 10.0
def test_scale_value_multiply(self):
"""Test multiplication scaling"""
assert scale_value(10, "×10") == 100.0
assert scale_value(5, "×2") == 10.0
def test_scale_value_invalid(self):
"""Test invalid scaling types"""
# Should return original value for invalid scale types
assert scale_value(100, "invalid") == 100
assert scale_value(100, "÷0") == 100 # Division by zero
assert scale_value(100, "×abc") == 100 # Invalid multiplier
def test_convert_ieee754_float(self):
"""Test IEEE 754 float conversion"""
# Test known values
# 1.0 in IEEE 754: 0x3F800000
high = 0x3F80
low = 0x0000
result = convert_ieee754_float(high, low)
assert result == 1.0
# Test another known value
# 3.14159 in IEEE 754: approximately 0x40490FD0
high = 0x4049
low = 0x0FD0
result = convert_ieee754_float(high, low)
assert abs(result - 3.14) < 0.01 # Allow small floating point differences
def test_convert_gallon_counter(self):
"""Test gallon counter conversion (same as IEEE 754)"""
high = 0x3F80
low = 0x0000
result = convert_gallon_counter(high, low)
assert result == 1.0
def test_get_descriptive_value_with_mapping(self):
"""Test getting descriptive value with value mapping"""
config = {
"values": {
"0": "Standby",
"5": "Running",
"7": "Service"
}
}
assert get_descriptive_value(0, config) == "Standby"
assert get_descriptive_value(5, config) == "Running"
assert get_descriptive_value(7, config) == "Service"
assert get_descriptive_value(99, config) == "Unknown (99)"
def test_get_descriptive_value_without_mapping(self):
"""Test getting descriptive value without value mapping"""
config = {}
assert get_descriptive_value(100, config) == 100
def test_validate_register_value(self):
"""Test register value validation"""
# Valid values
assert validate_register_value(0) is True
assert validate_register_value(1000) is True
assert validate_register_value(65533) is True
# Invalid values
assert validate_register_value(None) is False
assert validate_register_value(-1) is False
assert validate_register_value(65534) is False
assert validate_register_value(65535) is False
assert validate_register_value("string") is False
def test_validate_register_value_custom_max(self):
"""Test register value validation with custom maximum"""
assert validate_register_value(100, max_value=1000) is True
assert validate_register_value(1000, max_value=1000) is False
assert validate_register_value(999, max_value=1000) is True
def test_format_binary_string(self):
"""Test binary string formatting"""
assert format_binary_string(5) == "0000000000000101"
assert format_binary_string(255) == "0000000011111111"
assert format_binary_string(0) == "0000000000000000"
# Test custom width
assert format_binary_string(5, width=8) == "00000101"
assert format_binary_string(15, width=4) == "1111"
def test_ieee754_edge_cases(self):
"""Test IEEE 754 conversion edge cases"""
# Test with None return on error
result = convert_ieee754_float(None, 0)
assert result is None
# Test zero
result = convert_ieee754_float(0, 0)
assert result == 0.0


@@ -0,0 +1,125 @@
"""
Tests for PLC connection functionality.
"""
import pytest
import sys
import os
from unittest.mock import Mock, patch
# Add the parent directory to the path so we can import the package
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from watermaker_plc_api.services.plc_connection import PLCConnection, get_plc_connection
from watermaker_plc_api.utils.error_handler import PLCConnectionError
class TestPLCConnection:
"""Test cases for PLCConnection class"""
def setup_method(self):
"""Reset the global connection instance before each test"""
# Clear the global singleton for clean tests
import watermaker_plc_api.services.plc_connection
watermaker_plc_api.services.plc_connection._plc_connection = None
def test_singleton_pattern(self):
"""Test that get_plc_connection returns the same instance"""
conn1 = get_plc_connection()
conn2 = get_plc_connection()
assert conn1 is conn2
@patch('watermaker_plc_api.services.plc_connection.ModbusTcpClient')
def test_successful_connection(self, mock_client_class):
"""Test successful PLC connection"""
# Setup mock
mock_client = Mock()
mock_client.connect.return_value = True
mock_client_class.return_value = mock_client
# Test connection
plc = PLCConnection()
result = plc.connect()
assert result is True
assert plc.is_connected is True
mock_client.connect.assert_called_once()
@patch('watermaker_plc_api.services.plc_connection.ModbusTcpClient')
def test_failed_connection(self, mock_client_class):
"""Test failed PLC connection"""
# Setup mock
mock_client = Mock()
mock_client.connect.return_value = False
mock_client_class.return_value = mock_client
# Test connection
plc = PLCConnection()
result = plc.connect()
assert result is False
assert plc.is_connected is False
@patch('watermaker_plc_api.services.plc_connection.ModbusTcpClient')
def test_read_input_register(self, mock_client_class):
"""Test reading input register"""
# Setup mock
mock_client = Mock()
mock_client.connect.return_value = True
mock_result = Mock()
mock_result.registers = [1234]
mock_result.isError.return_value = False
mock_client.read_input_registers.return_value = mock_result
mock_client_class.return_value = mock_client
# Test read
plc = PLCConnection()
plc.connect()
value = plc.read_input_register(1000)
assert value == 1234
mock_client.read_input_registers.assert_called_with(1000, 1, slave=1)
@patch('watermaker_plc_api.services.plc_connection.ModbusTcpClient')
def test_write_holding_register(self, mock_client_class):
"""Test writing holding register"""
# Setup mock
mock_client = Mock()
mock_client.connect.return_value = True
mock_result = Mock()
mock_result.isError.return_value = False
mock_client.write_register.return_value = mock_result
mock_client_class.return_value = mock_client
# Test write
plc = PLCConnection()
plc.connect()
success = plc.write_holding_register(1000, 5)
assert success is True
mock_client.write_register.assert_called_with(1000, 5, slave=1)
def test_write_without_connection(self):
"""Test writing register without PLC connection"""
with patch('watermaker_plc_api.services.plc_connection.ModbusTcpClient') as mock_client_class:
# Setup mock to fail connection
mock_client = Mock()
mock_client.connect.return_value = False
mock_client_class.return_value = mock_client
plc = PLCConnection()
with pytest.raises(PLCConnectionError):
plc.write_holding_register(1000, 5)
def test_get_connection_status(self):
"""Test getting connection status information"""
plc = PLCConnection()
status = plc.get_connection_status()
assert isinstance(status, dict)
assert "connected" in status
assert "ip_address" in status
assert "port" in status
assert "unit_id" in status
assert "timeout" in status


@@ -0,0 +1,140 @@
"""
Tests for register reader service.
"""
import pytest
import sys
import os
from unittest.mock import Mock, patch
# Add the parent directory to the path so we can import the package
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from watermaker_plc_api.services.register_reader import RegisterReader
class TestRegisterReader:
"""Test cases for RegisterReader service"""
@patch('watermaker_plc_api.services.register_reader.get_plc_connection')
@patch('watermaker_plc_api.services.register_reader.get_data_cache')
def test_update_sensors_success(self, mock_cache, mock_plc):
"""Test successful sensor data update"""
# Setup mocks
mock_plc_instance = Mock()
mock_plc_instance.read_input_register.return_value = 1000
mock_plc.return_value = mock_plc_instance
mock_cache_instance = Mock()
mock_cache.return_value = mock_cache_instance
# Test update
reader = RegisterReader()
result = reader.update_sensors()
assert result is True
mock_cache_instance.set_sensors.assert_called_once()
@patch('watermaker_plc_api.services.register_reader.get_plc_connection')
@patch('watermaker_plc_api.services.register_reader.get_data_cache')
def test_update_sensors_read_failure(self, mock_cache, mock_plc):
"""Test sensor update with read failure"""
# Setup mocks
mock_plc_instance = Mock()
mock_plc_instance.read_input_register.return_value = None # Read failure
mock_plc.return_value = mock_plc_instance
mock_cache_instance = Mock()
mock_cache.return_value = mock_cache_instance
# Test update
reader = RegisterReader()
result = reader.update_sensors()
assert result is True # Should still succeed even if no valid reads
mock_cache_instance.set_sensors.assert_called_once()
@patch('watermaker_plc_api.services.register_reader.get_plc_connection')
@patch('watermaker_plc_api.services.register_reader.get_data_cache')
def test_update_timers_success(self, mock_cache, mock_plc):
"""Test successful timer data update"""
# Setup mocks
mock_plc_instance = Mock()
mock_plc_instance.read_holding_register.return_value = 100
mock_plc.return_value = mock_plc_instance
mock_cache_instance = Mock()
mock_cache.return_value = mock_cache_instance
# Test update
reader = RegisterReader()
result = reader.update_timers()
assert result is True
mock_cache_instance.set_timers.assert_called_once()
@patch('watermaker_plc_api.services.register_reader.get_plc_connection')
@patch('watermaker_plc_api.services.register_reader.get_data_cache')
def test_read_register_pair_success(self, mock_cache, mock_plc):
"""Test successful register pair reading"""
# Setup mocks
mock_plc_instance = Mock()
mock_plc_instance.read_holding_register.side_effect = [0x3F80, 0x0000] # IEEE 754 for 1.0
mock_plc.return_value = mock_plc_instance
mock_cache_instance = Mock()
mock_cache.return_value = mock_cache_instance
# Test read
reader = RegisterReader()
success, converted, high, low = reader.read_register_pair(5014, 5015, "ieee754")
assert success is True
assert converted == 1.0
assert high == 0x3F80
assert low == 0x0000
@patch('watermaker_plc_api.services.register_reader.get_plc_connection')
@patch('watermaker_plc_api.services.register_reader.get_data_cache')
def test_read_register_pair_failure(self, mock_cache, mock_plc):
"""Test register pair reading with failure"""
# Setup mocks
mock_plc_instance = Mock()
mock_plc_instance.read_holding_register.return_value = None # Read failure
mock_plc.return_value = mock_plc_instance
mock_cache_instance = Mock()
mock_cache.return_value = mock_cache_instance
# Test read
reader = RegisterReader()
success, converted, high, low = reader.read_register_pair(5014, 5015, "ieee754")
assert success is False
assert converted is None
@patch('watermaker_plc_api.services.register_reader.get_plc_connection')
@patch('watermaker_plc_api.services.register_reader.get_data_cache')
def test_read_selective_data(self, mock_cache, mock_plc):
"""Test selective data reading"""
# Setup mocks
mock_plc_instance = Mock()
mock_plc_instance.read_input_register.return_value = 500
mock_plc_instance.read_holding_register.return_value = 100
mock_plc.return_value = mock_plc_instance
mock_cache_instance = Mock()
mock_cache.return_value = mock_cache_instance
# Test selective read
reader = RegisterReader()
result = reader.read_selective_data(["temperature"], ["1036"])
assert "sensors" in result
assert "timers" in result
assert "requested_groups" in result
assert "requested_keys" in result
assert "summary" in result
assert result["requested_groups"] == ["temperature"]
assert result["requested_keys"] == ["1036"]


@@ -0,0 +1,12 @@
{
"folders": [
{
"path": "/root/FCI/api"
}
],
"settings": {
"python.defaultInterpreterPath": "/root/FCI/api/venv/bin/python",
"python.terminal.activateEnvironment": true,
"terminal.integrated.cwd": "/root/FCI/api"
}
}

15
watermaker_plc_api/__init__.py Normal file

@@ -0,0 +1,15 @@
"""
Watermaker PLC API Package
RESTful API for monitoring and controlling watermaker PLC systems.
Provides access to sensors, timers, controls, and watermaker operation sequences.
"""
__version__ = "1.1.0"
__author__ = "Your Name"
__email__ = "your.email@example.com"
from .app import create_app
from .config import Config
__all__ = ['create_app', 'Config']

100
watermaker_plc_api/main.py Normal file

@@ -0,0 +1,100 @@
#!/usr/bin/env python3
"""
Main entry point for the Watermaker PLC API server.
"""
import argparse
import sys
from .app import create_app
from .config import Config
from .utils.logger import get_logger
from .services.background_tasks import start_background_updates
logger = get_logger(__name__)
def parse_args():
"""Parse command line arguments"""
parser = argparse.ArgumentParser(description='Watermaker PLC API Server')
parser.add_argument('--host', default='0.0.0.0',
help='Host to bind to (default: 0.0.0.0)')
parser.add_argument('--port', type=int, default=5000,
help='Port to bind to (default: 5000)')
parser.add_argument('--plc-ip', default='192.168.1.15',
help='PLC IP address (default: 192.168.1.15)')
parser.add_argument('--plc-port', type=int, default=502,
help='PLC Modbus port (default: 502)')
parser.add_argument('--debug', action='store_true',
help='Enable debug mode')
parser.add_argument('--no-background-updates', action='store_true',
help='Disable background data updates')
return parser.parse_args()
def main():
"""Main application entry point"""
args = parse_args()
# Update config with command line arguments
Config.PLC_IP = args.plc_ip
Config.PLC_PORT = args.plc_port
Config.DEBUG = args.debug
logger.info("Starting Watermaker PLC API Server")
logger.info(f"API Version: 1.1.0")
logger.info(f"PLC Target: {Config.PLC_IP}:{Config.PLC_PORT}")
logger.info(f"Server: http://{args.host}:{args.port}")
# Create Flask application
app = create_app()
# Start background data updates (unless disabled)
if not args.no_background_updates:
start_background_updates()
logger.info("Background data update thread started")
else:
logger.warning("Background data updates disabled")
# Log available endpoints
logger.info("Available endpoints:")
logger.info(" Data Monitoring:")
logger.info(" GET /api/all - All PLC data")
logger.info(" GET /api/select - Selective data (bandwidth optimized)")
logger.info(" GET /api/sensors - All sensors")
logger.info(" GET /api/timers - All timers")
logger.info(" GET /api/outputs - Output controls")
logger.info(" GET /api/runtime - Runtime hours")
logger.info(" GET /api/water_counters - Water production counters")
logger.info(" GET /api/status - Connection status")
logger.info(" GET /api/config - API configuration")
logger.info("")
logger.info(" Control Operations:")
logger.info(" POST /api/dts/start - Start DTS watermaker sequence")
logger.info(" POST /api/dts/stop - Stop watermaker sequence")
logger.info(" POST /api/dts/skip - Skip current step")
logger.info(" GET /api/dts/status - Get DTS operation status")
logger.info(" POST /api/write/register - Write single register")
logger.info("")
logger.info(" Examples:")
logger.info(" /api/select?groups=temperature,pressure")
logger.info(" /api/select?keys=1036,1003,1017")
logger.info(" curl -X POST http://localhost:5000/api/dts/start")
try:
# Start the Flask server
app.run(
host=args.host,
port=args.port,
debug=args.debug,
threaded=True
)
except KeyboardInterrupt:
logger.info("Server shutdown requested by user")
sys.exit(0)
except Exception as e:
logger.error(f"Failed to start server: {e}")
sys.exit(1)
if __name__ == '__main__':
main()

79
watermaker_plc_api/app.py Normal file

@@ -0,0 +1,79 @@
"""
Flask application factory and setup.
"""
from flask import Flask
from flask_cors import CORS
from .config import Config
from .utils.logger import get_logger
from .utils.error_handler import setup_error_handlers
# Import controllers
from .controllers.system_controller import system_bp
from .controllers.sensors_controller import sensors_bp
from .controllers.timers_controller import timers_bp
from .controllers.outputs_controller import outputs_bp
from .controllers.dts_controller import dts_bp
logger = get_logger(__name__)
def create_app(config_object=None):
"""
Application factory pattern for creating Flask app.
Args:
config_object: Configuration class or object to use
Returns:
Flask: Configured Flask application
"""
app = Flask(__name__)
# Configure the app
if config_object is None:
config_object = Config
app.config.from_object(config_object)
# Enable CORS for web-based control panels
if config_object.CORS_ENABLED:
CORS(app)
logger.info("CORS enabled for web applications")
# Setup error handlers
setup_error_handlers(app)
# Register blueprints
register_blueprints(app)
# Log application startup
logger.info(f"Flask application created")
logger.info(f"Debug mode: {app.config.get('DEBUG', False)}")
logger.info(f"PLC target: {config_object.PLC_IP}:{config_object.PLC_PORT}")
return app
def register_blueprints(app):
"""Register all route blueprints with the Flask app"""
# System and status endpoints
app.register_blueprint(system_bp, url_prefix='/api')
# Data monitoring endpoints
app.register_blueprint(sensors_bp, url_prefix='/api')
app.register_blueprint(timers_bp, url_prefix='/api')
app.register_blueprint(outputs_bp, url_prefix='/api')
# Control endpoints
app.register_blueprint(dts_bp, url_prefix='/api')
logger.info("All blueprints registered successfully")
# Log registered routes
if app.config.get('DEBUG', False):
logger.debug("Registered routes:")
for rule in app.url_map.iter_rules():
methods = ','.join(rule.methods - {'OPTIONS', 'HEAD'})
logger.debug(f" {rule.rule} [{methods}] -> {rule.endpoint}")

94
watermaker_plc_api/config.py Normal file

@@ -0,0 +1,94 @@
"""
Configuration settings for the Watermaker PLC API.
"""
import os
from typing import Dict, Any
class Config:
"""Application configuration"""
# Flask Settings
DEBUG = os.getenv('DEBUG', 'False').lower() == 'true'
SECRET_KEY = os.getenv('SECRET_KEY', 'watermaker-plc-api-dev-key')
# PLC Connection Settings
PLC_IP = os.getenv('PLC_IP', '192.168.1.15')
PLC_PORT = int(os.getenv('PLC_PORT', '502'))
PLC_UNIT_ID = int(os.getenv('PLC_UNIT_ID', '1'))
PLC_TIMEOUT = int(os.getenv('PLC_TIMEOUT', '3'))
PLC_CONNECTION_RETRY_INTERVAL = int(os.getenv('PLC_CONNECTION_RETRY_INTERVAL', '30'))
# API Settings
API_VERSION = "1.1"
CORS_ENABLED = True
# Background Task Settings
DATA_UPDATE_INTERVAL = int(os.getenv('DATA_UPDATE_INTERVAL', '5')) # seconds
ERROR_RETRY_INTERVAL = int(os.getenv('ERROR_RETRY_INTERVAL', '10')) # seconds
MAX_CACHED_ERRORS = int(os.getenv('MAX_CACHED_ERRORS', '10'))
# Logging Settings
LOG_LEVEL = os.getenv('LOG_LEVEL', 'INFO').upper()
LOG_FORMAT = '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
@classmethod
def get_plc_config(cls) -> Dict[str, Any]:
"""Get PLC connection configuration"""
return {
"ip_address": cls.PLC_IP,
"port": cls.PLC_PORT,
"unit_id": cls.PLC_UNIT_ID,
"timeout": cls.PLC_TIMEOUT,
"connected": False,
"client": None,
"last_connection_attempt": 0,
"connection_retry_interval": cls.PLC_CONNECTION_RETRY_INTERVAL
}
@classmethod
def get_api_info(cls) -> Dict[str, Any]:
"""Get API information"""
return {
"version": cls.API_VERSION,
"debug": cls.DEBUG,
"plc_target": f"{cls.PLC_IP}:{cls.PLC_PORT}",
"update_interval": f"{cls.DATA_UPDATE_INTERVAL} seconds"
}
class DevelopmentConfig(Config):
"""Development configuration"""
DEBUG = True
PLC_IP = '127.0.0.1' # Simulator for development
class ProductionConfig(Config):
"""Production configuration"""
DEBUG = False
@property
def SECRET_KEY(self):
"""Get SECRET_KEY from environment, required in production"""
secret_key = os.getenv('SECRET_KEY')
if not secret_key:
raise ValueError("SECRET_KEY environment variable must be set in production")
return secret_key
class TestingConfig(Config):
"""Testing configuration"""
TESTING = True
DEBUG = True
PLC_IP = '127.0.0.1'
DATA_UPDATE_INTERVAL = 1 # Faster updates for testing
# Configuration mapping
config_map = {
'development': DevelopmentConfig,
'production': ProductionConfig,
'testing': TestingConfig,
'default': Config
}

17
watermaker_plc_api/controllers/__init__.py Normal file

@@ -0,0 +1,17 @@
"""
Controller modules for handling API routes and business logic.
"""
from .system_controller import system_bp
from .sensors_controller import sensors_bp
from .timers_controller import timers_bp
from .outputs_controller import outputs_bp
from .dts_controller import dts_bp
__all__ = [
'system_bp',
'sensors_bp',
'timers_bp',
'outputs_bp',
'dts_bp'
]

732
watermaker_plc_api/controllers/dts_controller.py Normal file

@@ -0,0 +1,732 @@
"""
DTS controller for watermaker control operations and task management.
"""
import uuid
import time
import threading
from datetime import datetime
from flask import Blueprint, jsonify
from ..services.plc_connection import get_plc_connection
from ..services.register_writer import RegisterWriter
from ..services.data_cache import get_data_cache
from ..utils.logger import get_logger
from ..utils.error_handler import create_error_response, create_success_response, DTSOperationError
logger = get_logger(__name__)
# Create blueprint
dts_bp = Blueprint('dts', __name__)
# Initialize services
plc = get_plc_connection()
writer = RegisterWriter()
cache = get_data_cache()
# DTS operation tracking with task management
dts_operations = {} # Track multiple operations by task_id
def create_dts_task():
"""Create a new DTS task with unique ID"""
task_id = str(uuid.uuid4())[:8] # Short UUID for easy reference
dts_operations[task_id] = {
"task_id": task_id,
"status": "pending", # pending, running, completed, failed, cancelled
"progress_percent": 0,
"current_step": "",
"start_time": None,
"end_time": None,
"steps_completed": [],
"last_error": None,
"step_descriptions": {
"checking_system_mode": "Checking current system mode",
"setting_prep_mode": "Setting preparation mode",
"setting_r71_256": "Activating valve sequence",
"setting_r71_0": "Completing valve command",
"waiting_for_valves": "Waiting for valve positioning",
"starting_dts_mode": "Starting DTS operation",
"completed": "DTS sequence completed",
"failed": "DTS sequence failed"
}
}
return task_id
def get_latest_dts_task():
"""Get the most recent DTS task"""
if not dts_operations:
return None
return max(dts_operations.values(), key=lambda x: x.get("start_time") or "")  # start_time is None until the task starts running
def execute_dts_sequence(task_id):
"""
Execute the DTS sequence in background thread.
Updates task status as it progresses.
"""
task = dts_operations[task_id]
try:
task["status"] = "running"
task["start_time"] = datetime.now().isoformat()
task["progress_percent"] = 5
if not plc.connect():
raise DTSOperationError("Failed to connect to PLC")
# Step 1: Check value of R1000
task["current_step"] = "checking_system_mode"
task["progress_percent"] = 10
logger.info("DTS Start: Checking system mode (R1000)")
current_mode = plc.read_holding_register(1000)
if current_mode is None:
raise DTSOperationError("Failed to read system mode register R1000")
logger.info(f"DTS Start: Current system mode = {current_mode}")
task["steps_completed"].append({
"step": "read_r1000",
"value": current_mode,
"timestamp": datetime.now().isoformat()
})
task["progress_percent"] = 20
# Step 2: If not 34, set R1000=34
if current_mode != 34:
task["current_step"] = "setting_prep_mode"
task["progress_percent"] = 25
logger.info("DTS Start: Setting system mode to 34 (prep mode)")
if not writer.write_holding_register(1000, 34):
raise DTSOperationError("Failed to write R1000=34")
task["steps_completed"].append({
"step": "write_r1000_34",
"timestamp": datetime.now().isoformat()
})
# Wait 2 seconds
logger.info("DTS Start: Waiting 2 seconds after mode change")
time.sleep(2)
else:
logger.info("DTS Start: System already in prep mode (34)")
task["steps_completed"].append({
"step": "r1000_already_34",
"timestamp": datetime.now().isoformat()
})
task["progress_percent"] = 35
# Step 3: Set R71=256
task["current_step"] = "setting_r71_256"
task["progress_percent"] = 40
logger.info("DTS Start: Setting R71=256")
if not writer.write_holding_register(71, 256):
raise DTSOperationError("Failed to write R71=256")
task["steps_completed"].append({
"step": "write_r71_256",
"timestamp": datetime.now().isoformat()
})
# Wait 2 seconds
logger.info("DTS Start: Waiting 2 seconds after R71=256")
time.sleep(2)
task["progress_percent"] = 50
# Step 4: Set R71=0
task["current_step"] = "setting_r71_0"
task["progress_percent"] = 55
logger.info("DTS Start: Setting R71=0")
if not writer.write_holding_register(71, 0):
raise DTSOperationError("Failed to write R71=0")
task["steps_completed"].append({
"step": "write_r71_0",
"timestamp": datetime.now().isoformat()
})
task["progress_percent"] = 60
# Step 5: Wait for R138 to count down (valves moving into position)
task["current_step"] = "waiting_for_valves"
task["progress_percent"] = 65
logger.info("DTS Start: Waiting for valve positioning (monitoring R138)")
# Monitor R138 for up to 15 seconds
valve_timeout = time.time() + 15
r138_initial = None
while time.time() < valve_timeout:
r138_value = plc.read_holding_register(138)
if r138_value is None:
logger.warning("Failed to read R138 during valve wait")
time.sleep(0.5)
continue
if r138_initial is None:
r138_initial = r138_value
logger.info(f"DTS Start: R138 initial value = {r138_value}")
# Update progress based on R138 countdown
if r138_initial and r138_initial > 0:
valve_progress = max(0, (r138_initial - r138_value) / r138_initial)
task["progress_percent"] = 65 + int(valve_progress * 20) # 65% to 85%
# When R138 reaches 0, valves are positioned
if r138_value == 0:
logger.info("DTS Start: Valve positioning complete (R138 = 0)")
task["steps_completed"].append({
"step": "valves_positioned",
"r138_initial": r138_initial,
"timestamp": datetime.now().isoformat()
})
break
time.sleep(0.2) # Check every 200ms
else:
# Timeout occurred
raise DTSOperationError("Timeout waiting for valve positioning (R138 did not reach 0)")
task["progress_percent"] = 90
# Step 6: Set R1000=5 (start DTS operation)
task["current_step"] = "starting_dts_mode"
task["progress_percent"] = 95
logger.info("DTS Start: Setting R1000=5 (starting DTS mode)")
if not writer.write_holding_register(1000, 5):
raise DTSOperationError("Failed to write R1000=5")
task["steps_completed"].append({
"step": "write_r1000_5",
"timestamp": datetime.now().isoformat()
})
# Sequence completed successfully
task["current_step"] = "completed"
task["status"] = "completed"
task["progress_percent"] = 100
task["end_time"] = datetime.now().isoformat()
logger.info("DTS Start: Sequence completed successfully")
except Exception as e:
# Error occurred during sequence
error_msg = str(e)
logger.error(f"DTS Start: Error during sequence: {error_msg}")
task["last_error"] = {
"message": error_msg,
"step": task["current_step"],
"timestamp": datetime.now().isoformat()
}
task["status"] = "failed"
task["current_step"] = "failed"
task["end_time"] = datetime.now().isoformat()
cache.add_error(f"DTS Start Failed: {error_msg}")
def execute_stop_sequence(task_id):
"""
Execute the watermaker stop sequence in background thread.
Stop sequence varies based on current system mode (R1000).
"""
task = dts_operations[task_id]
try:
task["status"] = "running"
task["start_time"] = datetime.now().isoformat()
task["progress_percent"] = 5
if not plc.connect():
raise DTSOperationError("Failed to connect to PLC")
# Step 1: Read current system mode
task["current_step"] = "reading_system_mode"
task["progress_percent"] = 10
logger.info("Stop: Reading current system mode (R1000)")
current_mode = plc.read_holding_register(1000)
if current_mode is None:
raise DTSOperationError("Failed to read system mode register R1000")
logger.info(f"Stop: Current system mode = {current_mode}")
task["steps_completed"].append({
"step": "read_current_mode",
"value": current_mode,
"timestamp": datetime.now().isoformat()
})
task["progress_percent"] = 20
# Step 2: Execute appropriate stop sequence based on mode
if current_mode == 7:
# Mode 7 stop sequence
task["current_step"] = "stopping_mode_7"
task["progress_percent"] = 30
logger.info("Stop: Executing mode 7 stop sequence (R71=513)")
if not writer.write_holding_register(71, 513):
raise DTSOperationError("Failed to write R71=513")
task["steps_completed"].append({
"step": "write_r71_513",
"timestamp": datetime.now().isoformat()
})
time.sleep(1)
task["progress_percent"] = 60
if not writer.write_holding_register(71, 0):
raise DTSOperationError("Failed to write R71=0 after 513")
task["steps_completed"].append({
"step": "write_r71_0_after_513",
"timestamp": datetime.now().isoformat()
})
task["progress_percent"] = 80
if not writer.write_holding_register(1000, 8):
raise DTSOperationError("Failed to write R1000=8")
task["steps_completed"].append({
"step": "write_r1000_8",
"timestamp": datetime.now().isoformat()
})
elif current_mode == 5:
# Mode 5 stop sequence
task["current_step"] = "stopping_mode_5"
task["progress_percent"] = 30
logger.info("Stop: Executing mode 5 stop sequence (R71=512)")
if not writer.write_holding_register(71, 512):
raise DTSOperationError("Failed to write R71=512")
task["steps_completed"].append({
"step": "write_r71_512",
"timestamp": datetime.now().isoformat()
})
time.sleep(1)
task["progress_percent"] = 60
if not writer.write_holding_register(71, 0):
raise DTSOperationError("Failed to write R71=0 after 512")
task["steps_completed"].append({
"step": "write_r71_0_after_512",
"timestamp": datetime.now().isoformat()
})
task["progress_percent"] = 80
if not writer.write_holding_register(1000, 8):
raise DTSOperationError("Failed to write R1000=8")
task["steps_completed"].append({
"step": "write_r1000_8",
"timestamp": datetime.now().isoformat()
})
elif current_mode == 8:
# Mode 8 stop sequence (flush screen)
task["current_step"] = "stopping_mode_8_flush"
task["progress_percent"] = 30
logger.info("Stop: Executing mode 8 flush stop sequence (R71=1024)")
if not writer.write_holding_register(71, 1024):
raise DTSOperationError("Failed to write R71=1024")
task["steps_completed"].append({
"step": "write_r71_1024",
"timestamp": datetime.now().isoformat()
})
time.sleep(1)
task["progress_percent"] = 60
if not writer.write_holding_register(71, 0):
raise DTSOperationError("Failed to write R71=0 after 1024")
task["steps_completed"].append({
"step": "write_r71_0_after_1024",
"timestamp": datetime.now().isoformat()
})
task["progress_percent"] = 80
if not writer.write_holding_register(1000, 2):
raise DTSOperationError("Failed to write R1000=2")
task["steps_completed"].append({
"step": "write_r1000_2",
"timestamp": datetime.now().isoformat()
})
else:
raise DTSOperationError(f"Cannot stop from current mode {current_mode}. Valid modes: 5, 7, 8")
# Sequence completed successfully
task["current_step"] = "completed"
task["status"] = "completed"
task["progress_percent"] = 100
task["end_time"] = datetime.now().isoformat()
logger.info(f"Stop: Sequence completed successfully from mode {current_mode}")
except Exception as e:
# Error occurred during sequence
error_msg = str(e)
logger.error(f"Stop: Error during sequence: {error_msg}")
task["last_error"] = {
"message": error_msg,
"step": task["current_step"],
"timestamp": datetime.now().isoformat()
}
task["status"] = "failed"
task["current_step"] = "failed"
task["end_time"] = datetime.now().isoformat()
cache.add_error(f"Stop Failed: {error_msg}")
def execute_skip_sequence(task_id):
"""
Execute step skip sequence in background thread.
Automatically determines next step based on current mode.
"""
task = dts_operations[task_id]
try:
task["status"] = "running"
task["start_time"] = datetime.now().isoformat()
task["progress_percent"] = 5
if not plc.connect():
raise DTSOperationError("Failed to connect to PLC")
# Step 1: Read current system mode
task["current_step"] = "reading_system_mode"
task["progress_percent"] = 10
logger.info("Skip: Reading current system mode (R1000)")
current_mode = plc.read_holding_register(1000)
if current_mode is None:
raise DTSOperationError("Failed to read system mode register R1000")
logger.info(f"Skip: Current system mode = {current_mode}")
task["steps_completed"].append({
"step": "read_current_mode",
"value": current_mode,
"timestamp": datetime.now().isoformat()
})
task["progress_percent"] = 20
# Step 2: Execute appropriate skip sequence
if current_mode == 5:
# Skip step 2: Mode 5 -> step 3 (Mode 6)
target_step = 3
task["current_step"] = "skipping_step_2"
task["progress_percent"] = 40
logger.info("Skip: Skipping step 2 from mode 5 to step 3 (R67=32841)")
if not writer.write_holding_register(67, 32841):
raise DTSOperationError("Failed to write R67=32841")
task["steps_completed"].append({
"step": "write_r67_32841",
"from_mode": 5,
"to_step": 3,
"note": "PLC will advance to R1000=6",
"timestamp": datetime.now().isoformat()
})
task["progress_percent"] = 80
elif current_mode == 6:
# Skip step 3: Mode 6 -> step 4 (Mode 7)
target_step = 4
task["current_step"] = "skipping_step_3"
task["progress_percent"] = 40
logger.info("Skip: Skipping step 3 from mode 6 to step 4 (R67=32968)")
if not writer.write_holding_register(67, 32968):
raise DTSOperationError("Failed to write R67=32968")
task["steps_completed"].append({
"step": "write_r67_32968",
"from_mode": 6,
"to_step": 4,
"timestamp": datetime.now().isoformat()
})
time.sleep(1)
task["progress_percent"] = 70
if not writer.write_holding_register(1000, 7):
raise DTSOperationError("Failed to write R1000=7")
task["steps_completed"].append({
"step": "write_r1000_7",
"from_mode": 6,
"to_step": 4,
"timestamp": datetime.now().isoformat()
})
task["progress_percent"] = 80
else:
raise DTSOperationError(f"Cannot skip from current mode {current_mode}. Skip only available from modes 5 (step 2) or 6 (step 3)")
# Sequence completed successfully
task["current_step"] = "completed"
task["status"] = "completed"
task["progress_percent"] = 100
task["end_time"] = datetime.now().isoformat()
logger.info(f"Skip: Sequence completed successfully from mode {current_mode} to step {target_step}")
except Exception as e:
# Error occurred during sequence
error_msg = str(e)
logger.error(f"Skip: Error during sequence: {error_msg}")
task["last_error"] = {
"message": error_msg,
"step": task["current_step"],
"timestamp": datetime.now().isoformat()
}
task["status"] = "failed"
task["current_step"] = "failed"
task["end_time"] = datetime.now().isoformat()
cache.add_error(f"Skip Failed: {error_msg}")
def start_dts_sequence_async():
"""Start DTS sequence asynchronously"""
# Check if another operation is running
for task in dts_operations.values():
if task["status"] == "running":
return False, "Operation already in progress", {"existing_task_id": task["task_id"]}
# Create new task
task_id = create_dts_task()
# Start background thread
thread = threading.Thread(target=execute_dts_sequence, args=(task_id,), daemon=True)
thread.start()
logger.info(f"DTS Start: Started async operation with task_id {task_id}")
return True, "DTS sequence started", {"task_id": task_id}
def start_stop_sequence_async():
"""Start watermaker stop sequence asynchronously"""
# Check if another operation is running
for task in dts_operations.values():
if task["status"] == "running":
return False, "Operation already in progress", {"existing_task_id": task["task_id"]}
# Create new task
task_id = create_dts_task()
task = dts_operations[task_id]
task["step_descriptions"].update({
"reading_system_mode": "Reading current system mode",
"stopping_mode_5": "Stopping from DTS mode (5)",
"stopping_mode_7": "Stopping from service mode (7)",
"stopping_mode_8_flush": "Stopping from flush mode (8)",
"completed": "Stop sequence completed",
"failed": "Stop sequence failed"
})
# Start background thread
thread = threading.Thread(target=execute_stop_sequence, args=(task_id,), daemon=True)
thread.start()
logger.info(f"Stop: Started async operation with task_id {task_id}")
return True, "Stop sequence started", {"task_id": task_id}
def start_skip_sequence_async():
"""Start step skip sequence asynchronously"""
# Check if another operation is running
for task in dts_operations.values():
if task["status"] == "running":
return False, "Operation already in progress", {"existing_task_id": task["task_id"]}
# Create new task
task_id = create_dts_task()
task = dts_operations[task_id]
task["step_descriptions"].update({
"reading_system_mode": "Reading current system mode",
"skipping_step_2": "Skipping step 2 (mode 5 → step 3)",
"skipping_step_3": "Skipping step 3 (mode 6 → step 4)",
"completed": "Skip sequence completed",
"failed": "Skip sequence failed"
})
# Start background thread
thread = threading.Thread(target=execute_skip_sequence, args=(task_id,), daemon=True)
thread.start()
logger.info(f"Skip: Started async operation with task_id {task_id}")
return True, "Skip sequence started", {"task_id": task_id}
# DTS Control Endpoints
@dts_bp.route('/dts/start', methods=['POST'])
def start_dts():
"""Start DTS watermaker sequence asynchronously"""
success, message, details = start_dts_sequence_async()
if success:
return create_success_response(
message,
{
"task_id": details["task_id"],
"status_endpoint": f"/api/dts/status/{details['task_id']}",
"polling_info": {
"recommended_interval": "1 second",
"check_status_at": f"/api/dts/status/{details['task_id']}"
}
},
202 # 202 Accepted (async operation started)
)
else:
return create_error_response(
"Conflict",
message,
409, # 409 Conflict (operation already running)
details
)
@dts_bp.route('/dts/stop', methods=['POST'])
def stop_watermaker():
"""Stop watermaker sequence (mode-dependent)"""
success, message, details = start_stop_sequence_async()
if success:
return create_success_response(
message,
{
"task_id": details["task_id"],
"status_endpoint": f"/api/dts/status/{details['task_id']}",
"polling_info": {
"recommended_interval": "1 second",
"check_status_at": f"/api/dts/status/{details['task_id']}"
},
"note": "Stop sequence varies by current mode (5, 7, or 8)"
},
202 # 202 Accepted (async operation started)
)
else:
return create_error_response(
"Conflict",
message,
409, # 409 Conflict (operation already running)
details
)
@dts_bp.route('/dts/skip', methods=['POST'])
def skip_step():
"""Skip current step automatically (determines next step based on current mode)"""
success, message, details = start_skip_sequence_async()
if success:
return create_success_response(
message,
{
"task_id": details["task_id"],
"status_endpoint": f"/api/dts/status/{details['task_id']}",
"polling_info": {
"recommended_interval": "1 second",
"check_status_at": f"/api/dts/status/{details['task_id']}"
},
"note": "Auto-skip: Mode 5 → Step 3, Mode 6 → Step 4",
"valid_from_modes": [5, 6]
},
202 # 202 Accepted (async operation started)
)
else:
return create_error_response(
"Conflict",
message,
409, # 409 Conflict (operation already running)
details
)
@dts_bp.route('/dts/status')
@dts_bp.route('/dts/status/<task_id>')
def get_dts_status(task_id=None):
"""Get DTS operation status for specific task or latest task"""
if task_id:
# Get specific task
task = dts_operations.get(task_id)
if not task:
return create_error_response(
"Not Found",
f"Task {task_id} not found",
404,
{"available_tasks": list(dts_operations.keys())}
)
# Add user-friendly progress info
task_response = dict(task)
task_response["step_description"] = task["step_descriptions"].get(task["current_step"], task["current_step"])
task_response["is_complete"] = task["status"] in ["completed", "failed", "cancelled"]
task_response["is_running"] = task["status"] == "running"
return jsonify({
"task": task_response,
"timestamp": datetime.now().isoformat()
})
else:
# Get latest task or all tasks summary
latest_task = get_latest_dts_task()
if latest_task:
latest_task["step_description"] = latest_task["step_descriptions"].get(
latest_task["current_step"], latest_task["current_step"]
)
latest_task["is_complete"] = latest_task["status"] in ["completed", "failed", "cancelled"]
latest_task["is_running"] = latest_task["status"] == "running"
return jsonify({
"latest_task": latest_task,
"total_tasks": len(dts_operations),
"active_tasks": [t["task_id"] for t in dts_operations.values() if t["status"] == "running"],
"all_task_ids": list(dts_operations.keys())[-5:], # Last 5 task IDs
"timestamp": datetime.now().isoformat()
            })
        else:
            # No DTS operations have been started yet
            return jsonify({
                "latest_task": None,
                "total_tasks": len(dts_operations),
                "active_tasks": [],
                "all_task_ids": [],
                "timestamp": datetime.now().isoformat()
            })
@dts_bp.route('/dts/cancel/<task_id>', methods=['POST'])
def cancel_dts_task(task_id):
"""Cancel a running DTS task (if possible)"""
task = dts_operations.get(task_id)
if not task:
return create_error_response(
"Not Found",
f"Task {task_id} not found",
404
)
if task["status"] != "running":
return create_error_response(
"Bad Request",
f"Task {task_id} is not running (status: {task['status']})",
400
)
# Mark task as cancelled
task["status"] = "cancelled"
task["current_step"] = "cancelled"
task["end_time"] = datetime.now().isoformat()
task["last_error"] = {
"message": "Operation cancelled by user",
"timestamp": datetime.now().isoformat()
}
return create_success_response(
f"Task {task_id} marked as cancelled",
{"note": "Background operation may continue briefly"}
)
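# --- Illustration only (not part of the API) ---------------------------------
# The async endpoints above return a task_id that a client is expected to poll.
# A minimal client sketch, assuming the API runs on localhost:5000 and that the
# third-party `requests` package is available (it is not a dependency of this
# module); the exact envelope returned by create_success_response is assumed to
# expose the details dict either at the top level or under a "data" key.
if __name__ == "__main__":  # pragma: no cover - usage sketch
    import time
    import requests

    start = requests.post("http://localhost:5000/api/dts/start").json()
    task_id = start.get("data", {}).get("task_id") or start.get("task_id")
    while task_id:
        status = requests.get(f"http://localhost:5000/api/dts/status/{task_id}").json()
        task = status["task"]
        print(task["current_step"], "-", task.get("step_description", ""))
        if task["is_complete"]:
            break
        time.sleep(1)  # recommended polling interval from the start response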

View File

@@ -0,0 +1,46 @@
"""
Outputs controller for digital output control endpoints.
"""
from flask import Blueprint, jsonify
from ..services.data_cache import get_data_cache
from ..utils.logger import get_logger
logger = get_logger(__name__)
# Create blueprint
outputs_bp = Blueprint('outputs', __name__)
# Initialize services
cache = get_data_cache()
@outputs_bp.route('/outputs')
def get_outputs():
"""Get output control data"""
outputs = cache.get_outputs()
return jsonify({
"outputs": outputs,
"last_update": cache.get_last_update(),
"count": len(outputs)
})
@outputs_bp.route('/outputs/active')
def get_active_outputs():
"""Get only active output controls"""
active_outputs = cache.get_active_outputs()
# Calculate total active outputs across all registers
total_active = sum(
len(output.get("active_bits", []))
for output in active_outputs.values()
)
return jsonify({
"active_outputs": active_outputs,
"total_active": total_active,
"register_count": len(active_outputs),
"last_update": cache.get_last_update()
})

View File

@@ -0,0 +1,75 @@
"""
Sensors controller for sensor data and runtime/water counter endpoints.
"""
from flask import Blueprint, jsonify
from ..services.data_cache import get_data_cache
from ..utils.logger import get_logger
from ..utils.error_handler import create_error_response
logger = get_logger(__name__)
# Create blueprint
sensors_bp = Blueprint('sensors', __name__)
# Initialize services
cache = get_data_cache()
@sensors_bp.route('/sensors')
def get_sensors():
"""Get all sensor data"""
sensors = cache.get_sensors()
last_update = cache.get_last_update()
return jsonify({
"sensors": sensors,
"last_update": last_update,
"count": len(sensors)
})
@sensors_bp.route('/sensors/category/<category>')
def get_sensors_by_category(category):
"""Get sensors filtered by category"""
valid_categories = ['system', 'pressure', 'temperature', 'flow', 'quality']
if category not in valid_categories:
return create_error_response(
"Bad Request",
f"Invalid category '{category}'. Valid categories: {', '.join(valid_categories)}",
400
)
filtered_sensors = cache.get_sensors_by_category(category)
return jsonify({
"category": category,
"sensors": filtered_sensors,
"count": len(filtered_sensors),
"last_update": cache.get_last_update()
})
@sensors_bp.route('/runtime')
def get_runtime():
"""Get runtime hours data (IEEE 754 float)"""
runtime_data = cache.get_runtime()
return jsonify({
"runtime": runtime_data,
"last_update": cache.get_last_update(),
"count": len(runtime_data)
})
@sensors_bp.route('/water_counters')
def get_water_counters():
"""Get water production counter data (gallon totals)"""
water_counter_data = cache.get_water_counters()
return jsonify({
"water_counters": water_counter_data,
"last_update": cache.get_last_update(),
"count": len(water_counter_data)
})

View File

@@ -0,0 +1,359 @@
"""
System controller for status, configuration, and general API endpoints.
"""
from flask import Blueprint, jsonify, request
from datetime import datetime
from ..config import Config
from ..services.data_cache import get_data_cache
from ..services.plc_connection import get_plc_connection
from ..services.register_reader import RegisterReader
from ..services.register_writer import RegisterWriter
from ..utils.logger import get_logger
from ..utils.error_handler import create_error_response, create_success_response, RegisterWriteError, PLCConnectionError
logger = get_logger(__name__)
# Create blueprint
system_bp = Blueprint('system', __name__)
# Initialize services
cache = get_data_cache()
plc = get_plc_connection()
reader = RegisterReader()
writer = RegisterWriter()
@system_bp.route('/status')
def get_status():
"""Get connection and system status"""
plc_status = plc.get_connection_status()
return jsonify({
"connection_status": cache.get_connection_status(),
"last_update": cache.get_last_update(),
"plc_config": {
"ip": plc_status["ip_address"],
"port": plc_status["port"],
"connected": plc_status["connected"]
},
"timestamp": datetime.now().isoformat()
})
@system_bp.route('/all')
def get_all_data():
"""Get all PLC data in one response"""
all_data = cache.get_all_data()
summary = cache.get_summary_stats()
return jsonify({
"status": {
"connection_status": all_data["connection_status"],
"last_update": all_data["last_update"],
"connected": plc.is_connected
},
"sensors": all_data["sensors"],
"timers": all_data["timers"],
"rtc": all_data["rtc"],
"outputs": all_data["outputs"],
"runtime": all_data["runtime"],
"water_counters": all_data["water_counters"],
"summary": summary
})
@system_bp.route('/select')
def get_selected_data():
"""Get only selected variables by groups and/or keys to reduce bandwidth and PLC traffic"""
# Get query parameters
groups_param = request.args.get('groups', '')
keys_param = request.args.get('keys', '')
# Parse groups and keys
requested_groups = [g.strip() for g in groups_param.split(',') if g.strip()] if groups_param else []
requested_keys = [k.strip() for k in keys_param.split(',') if k.strip()] if keys_param else []
if not requested_groups and not requested_keys:
return create_error_response(
"Bad Request",
"Must specify either 'groups' or 'keys' parameter",
400,
{
"usage": {
"groups": "Comma-separated list: system,pressure,temperature,flow,quality,fwf_timer,dts_timer,rtc,outputs,runtime,water_counters",
"keys": "Comma-separated list of register numbers: 1000,1003,1017,136,138,5014,5024",
"examples": [
"/api/select?groups=temperature,pressure",
"/api/select?keys=1036,1003,1017",
"/api/select?groups=dts_timer&keys=1036",
"/api/select?groups=runtime,water_counters"
]
}
}
)
# Check PLC connection
if not plc.is_connected:
if not plc.connect():
return create_error_response(
"Service Unavailable",
"PLC connection failed",
503,
{"connection_status": cache.get_connection_status()}
)
try:
# Read selective data
result = reader.read_selective_data(requested_groups, requested_keys)
result["timestamp"] = datetime.now().isoformat()
return jsonify(result)
except Exception as e:
logger.error(f"Error reading selective data: {e}")
return create_error_response(
"Internal Server Error",
f"Failed to read selective data: {str(e)}",
500
)
@system_bp.route('/errors')
def get_errors():
"""Get recent errors"""
errors = cache.get_errors(limit=10)
return jsonify({
"errors": errors,
"count": len(errors)
})
@system_bp.route('/write/register', methods=['POST'])
def write_register():
"""Write to a single holding register"""
try:
# Check if request has JSON data
if not request.is_json:
return create_error_response(
"Bad Request",
"Request must be JSON with Content-Type: application/json",
400
)
data = request.get_json()
if not data or 'address' not in data or 'value' not in data:
return create_error_response(
"Bad Request",
"Must provide 'address' and 'value' in JSON body",
400
)
address = int(data['address'])
value = int(data['value'])
# Validate the write operation
is_valid, error_msg = writer.validate_write_operation(address, value)
if not is_valid:
return create_error_response("Bad Request", error_msg, 400)
# Perform the write
        success = writer.write_holding_register(address, value)
        if not success:
            return create_error_response(
                "Internal Server Error",
                f"Write of {value} to register {address} failed",
                500
            )
        return create_success_response(
f"Successfully wrote {value} to register {address}",
{
"address": address,
"value": value,
"timestamp": datetime.now().isoformat()
}
)
except ValueError as e:
return create_error_response(
"Bad Request",
f"Invalid address or value: {e}",
400
)
except (RegisterWriteError, PLCConnectionError) as e:
return create_error_response(
"Service Unavailable",
str(e),
503
)
except Exception as e:
logger.error(f"Unexpected error in write_register: {e}")
return create_error_response(
"Internal Server Error",
"An unexpected error occurred",
500
)
@system_bp.route('/config')
def get_config():
"""Get API configuration and available endpoints"""
return jsonify({
"api_version": Config.API_VERSION,
"endpoints": {
"/api/status": "Connection and system status",
"/api/sensors": "All sensor data",
"/api/sensors/category/<category>": "Sensors by category (system, pressure, temperature, flow, quality)",
"/api/timers": "All timer data",
"/api/timers/dts": "DTS timer data",
"/api/timers/fwf": "FWF timer data",
"/api/rtc": "Real-time clock data",
"/api/outputs": "Output control data",
"/api/outputs/active": "Active output controls only",
"/api/runtime": "Runtime hours data (IEEE 754 float)",
"/api/water_counters": "Water production counters (gallon totals)",
"/api/all": "All data in one response",
"/api/select": "Selective data retrieval (groups and/or keys) - BANDWIDTH OPTIMIZED",
"/api/errors": "Recent errors",
"/api/config": "This configuration",
"/api/dts/start": "POST - Start DTS watermaker sequence (async)",
"/api/dts/stop": "POST - Stop watermaker sequence (async, mode-dependent)",
"/api/dts/skip": "POST - Skip current step automatically (async)",
"/api/dts/status": "Get latest DTS operation status",
"/api/dts/status/<task_id>": "Get specific DTS task status",
"/api/dts/cancel/<task_id>": "POST - Cancel running DTS task",
"/api/write/register": "POST - Write single holding register"
},
"control_endpoints": {
"/api/dts/start": {
"method": "POST",
"description": "Start DTS watermaker sequence (ASYNC)",
"parameters": "None required",
"returns": "task_id for status polling",
"response_time": "< 100ms (immediate)",
"sequence": [
"Check R1000 value",
"Set R1000=34 if not already",
"Wait 2 seconds",
"Set R71=256",
"Wait 2 seconds",
"Set R71=0",
"Monitor R138 for valve positioning",
"Set R1000=5 to start DTS mode"
],
"polling": {
"status_endpoint": "/api/dts/status/{task_id}",
"recommended_interval": "1 second",
"total_duration": "~10 seconds"
}
},
"/api/dts/stop": {
"method": "POST",
"description": "Stop watermaker sequence (ASYNC, mode-dependent)",
"parameters": "None required",
"returns": "task_id for status polling",
"response_time": "< 100ms (immediate)",
"mode_sequences": {
"mode_5_dts": "R71=512, wait 1s, R71=0, R1000=8",
"mode_7_service": "R71=513, wait 1s, R71=0, R1000=8",
"mode_8_flush": "R71=1024, wait 1s, R71=0, R1000=2"
},
"note": "Watermaker always ends with flush screen (mode 8)"
},
"/api/dts/skip": {
"method": "POST",
"description": "Skip current step automatically (ASYNC)",
"parameters": "None required - auto-determines next step",
"returns": "task_id for status polling",
"response_time": "< 100ms (immediate)",
"auto_logic": {
"from_mode_5": "Skip step 2 → step 3: R67=32841 (PLC advances to mode 6)",
"from_mode_6": "Skip step 3 → step 4: R67=32968, wait 1s, R1000=7"
},
"valid_from_modes": [5, 6],
"example": "/api/dts/skip"
},
"/api/write/register": {
"method": "POST",
"description": "Write single holding register",
"body": {"address": "register_number", "value": "value_to_write"},
"example": {"address": 1000, "value": 5}
}
},
"variable_groups": {
"system": {
"description": "System status and operational mode",
"keys": ["1000", "1036"],
"count": 2
},
"pressure": {
"description": "Water pressure sensors",
"keys": ["1003", "1007", "1008"],
"count": 3
},
"temperature": {
"description": "Temperature monitoring",
"keys": ["1017", "1125"],
"count": 2
},
"flow": {
"description": "Flow rate meters",
"keys": ["1120", "1121", "1122"],
"count": 3
},
"quality": {
"description": "Water quality (TDS) sensors",
"keys": ["1123", "1124"],
"count": 2
},
"fwf_timer": {
"description": "Fresh water flush timers",
"keys": ["136"],
"count": 1
},
"dts_timer": {
"description": "DTS process step timers",
"keys": ["138", "128", "129", "133", "135", "139"],
"count": 6
},
"rtc": {
"description": "Real-time clock registers",
"keys": ["513", "514", "516", "517", "518", "519"],
"count": 6
},
"outputs": {
"description": "Digital output controls",
"keys": ["257", "258", "259", "260", "264", "265"],
"count": 6
},
"runtime": {
"description": "System runtime hours (IEEE 754 float)",
"keys": ["5014"],
"count": 1,
"note": "32-bit float from register pairs R5014+R5015"
},
"water_counters": {
"description": "Water production counters (gallon totals)",
"keys": ["5024", "5028", "5032", "5034"],
"count": 4,
"note": "32-bit floats from register pairs (Single/Double/DTS Total/Since Last)"
}
},
"selective_api_usage": {
"endpoint": "/api/select",
"description": "Retrieve only specified variables to reduce bandwidth and PLC traffic",
"parameters": {
"groups": "Comma-separated group names (system,pressure,temperature,flow,quality,fwf_timer,dts_timer,rtc,outputs,runtime,water_counters)",
"keys": "Comma-separated register numbers (1000,1003,1017,136,etc.)"
},
"examples": {
"temperature_and_pressure": "/api/select?groups=temperature,pressure",
"specific_sensors": "/api/select?keys=1036,1003,1017,1121",
"dts_monitoring": "/api/select?groups=dts_timer&keys=1036",
"critical_only": "/api/select?keys=1036,1003,1121,1123",
"runtime_and_counters": "/api/select?groups=runtime,water_counters"
}
},
"total_variables": 36,
"update_interval": f"{Config.DATA_UPDATE_INTERVAL} seconds (full scan) / on-demand (selective)",
"plc_config": {
"ip": Config.PLC_IP,
"port": Config.PLC_PORT
}
})

View File

@@ -0,0 +1,78 @@
"""
Timers controller for timer and RTC data endpoints.
"""
from flask import Blueprint, jsonify
from ..services.data_cache import get_data_cache
from ..utils.logger import get_logger
logger = get_logger(__name__)
# Create blueprint
timers_bp = Blueprint('timers', __name__)
# Initialize services
cache = get_data_cache()
@timers_bp.route('/timers')
def get_timers():
"""Get all timer data"""
timers = cache.get_timers()
active_timers = cache.get_active_timers()
return jsonify({
"timers": timers,
"last_update": cache.get_last_update(),
"active_timers": active_timers,
"total_count": len(timers),
"active_count": len(active_timers)
})
@timers_bp.route('/timers/dts')
def get_dts_timers():
"""Get DTS timer data"""
dts_timers = cache.get_timers_by_category("dts_timer")
active_dts_timers = [
addr for addr, timer in dts_timers.items()
if timer.get("active", False)
]
return jsonify({
"dts_timers": dts_timers,
"active_timers": active_dts_timers,
"total_count": len(dts_timers),
"active_count": len(active_dts_timers),
"last_update": cache.get_last_update()
})
@timers_bp.route('/timers/fwf')
def get_fwf_timers():
"""Get Fresh Water Flush timer data"""
fwf_timers = cache.get_timers_by_category("fwf_timer")
active_fwf_timers = [
addr for addr, timer in fwf_timers.items()
if timer.get("active", False)
]
return jsonify({
"fwf_timers": fwf_timers,
"active_timers": active_fwf_timers,
"total_count": len(fwf_timers),
"active_count": len(active_fwf_timers),
"last_update": cache.get_last_update()
})
@timers_bp.route('/rtc')
def get_rtc():
"""Get real-time clock data"""
rtc_data = cache.get_rtc()
return jsonify({
"rtc": rtc_data,
"last_update": cache.get_last_update(),
"count": len(rtc_data)
})

View File

@@ -0,0 +1,20 @@
"""
Data models and register mappings for PLC variables.
"""
from .sensor_mappings import KNOWN_SENSORS, get_sensor_by_category
from .timer_mappings import TIMER_REGISTERS, RTC_REGISTERS, get_timer_by_category
from .output_mappings import OUTPUT_CONTROLS, get_output_controls
from .runtime_mappings import RUNTIME_REGISTERS, WATER_COUNTER_REGISTERS
__all__ = [
'KNOWN_SENSORS',
'TIMER_REGISTERS',
'RTC_REGISTERS',
'OUTPUT_CONTROLS',
'RUNTIME_REGISTERS',
'WATER_COUNTER_REGISTERS',
'get_sensor_by_category',
'get_timer_by_category',
'get_output_controls'
]

View File

@@ -0,0 +1,151 @@
"""
Output control register mappings and configuration.
"""
from typing import Dict, List, Any
# Output control mappings
OUTPUT_CONTROLS = {
257: {"name": "Low Pressure Pump", "register": 40017, "bit": 0},
258: {"name": "High Pressure Pump", "register": 40017, "bit": 1},
259: {"name": "Product Divert Valve", "register": 40017, "bit": 2},
260: {"name": "Flush solenoid", "register": 40017, "bit": 3},
264: {"name": "Double Pass Solenoid", "register": 40017, "bit": 7},
265: {"name": "Shore Feed Solenoid", "register": 40017, "bit": 8}
}
def get_output_controls() -> Dict[int, Dict[str, Any]]:
"""
Get all output control configurations.
Returns:
Dict of output control configurations
"""
return OUTPUT_CONTROLS.copy()
def get_output_registers() -> List[int]:
"""
Get list of unique output register addresses.
Returns:
List of register addresses (e.g., [40017, 40018, ...])
"""
registers = set()
for config in OUTPUT_CONTROLS.values():
registers.add(config["register"])
return sorted(list(registers))
def get_output_addresses_by_group(group: str) -> List[int]:
"""
Get output addresses for the outputs group.
Args:
group: Group name ("outputs")
Returns:
List of output control addresses
"""
if group == "outputs":
return list(OUTPUT_CONTROLS.keys())
return []
def get_controls_by_register(register: int) -> Dict[int, Dict[str, Any]]:
"""
Get all output controls for a specific register.
Args:
register: Register address (e.g., 40017)
Returns:
Dict of controls mapped to that register
"""
return {
addr: config for addr, config in OUTPUT_CONTROLS.items()
if config["register"] == register
}
def validate_output_address(address: int) -> bool:
"""
Check if an address is a known output control.
Args:
address: Output control address to validate
Returns:
True if address is a known output control
"""
return address in OUTPUT_CONTROLS
def get_output_info(address: int) -> Dict[str, Any]:
"""
Get configuration info for a specific output control.
Args:
address: Output control address
Returns:
Output configuration dict or empty dict if not found
"""
return OUTPUT_CONTROLS.get(address, {})
def calculate_modbus_address(register: int) -> int:
"""
Convert holding register address to Modbus address.
Args:
register: Holding register address (e.g., 40017)
Returns:
Modbus address (e.g., 16)
"""
return register - 40001
def extract_bit_value(register_value: int, bit_position: int) -> int:
"""
Extract a specific bit value from a register.
Args:
register_value: Full register value
bit_position: Bit position (0-15)
Returns:
Bit value (0 or 1)
"""
return (register_value >> bit_position) & 1
def create_output_bit_info(register: int, register_value: int) -> List[Dict[str, Any]]:
"""
Create bit information for all outputs in a register.
Args:
register: Register address
register_value: Current register value
Returns:
List of bit information dicts
"""
bits = []
for bit in range(16):
bit_value = extract_bit_value(register_value, bit)
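        # Map (register, bit) to an output-control address: for register 40017,
        # bit 0 -> 257, bit 1 -> 258, ... matching the OUTPUT_CONTROLS keys above.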
output_addr = ((register - 40017) * 16) + (bit + 1) + 256
control_info = OUTPUT_CONTROLS.get(output_addr, {})
bits.append({
"bit": bit,
"address": output_addr,
"value": bit_value,
"name": control_info.get("name", f"Output {output_addr}"),
"active": bit_value == 1
})
return bits
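# Illustration only: decode a hypothetical raw value of register 40017 and
# print which named outputs would be reported as active.
if __name__ == "__main__":  # pragma: no cover - usage sketch
    sample_value = 0b0000000110000011  # bits 0, 1, 7 and 8 set (hypothetical)
    for info in create_output_bit_info(40017, sample_value):
        if info["active"]:
            print(f"bit {info['bit']:2d} -> {info['name']}")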

View File

@@ -0,0 +1,165 @@
"""
Runtime and water counter register mappings (32-bit values from register pairs).
"""
from typing import Dict, List, Any
# Runtime register mappings (32-bit IEEE 754 float pairs)
RUNTIME_REGISTERS = {
5014: {"name": "Runtime Hours", "scale": "ieee754", "unit": "hours", "category": "runtime",
"pair_register": 5015, "description": "Total system runtime"}
}
# Water counter register mappings (32-bit gallon counters)
WATER_COUNTER_REGISTERS = {
5024: {"name": "Single-Pass Total Gallons", "scale": "gallon_counter", "unit": "gallons", "category": "water_counters",
"pair_register": 5025, "description": "Total single-pass water produced"},
5026: {"name": "Single-Pass Total Gallons since last", "scale": "gallon_counter", "unit": "gallons", "category": "water_counters",
"pair_register": 5027, "description": "Total single-pass water produced since last"},
5028: {"name": "Double-Pass Total Gallons", "scale": "gallon_counter", "unit": "gallons", "category": "water_counters",
"pair_register": 5029, "description": "Total double-pass water produced"},
5030: {"name": "Double-Pass Total Gallons since last", "scale": "gallon_counter", "unit": "gallons", "category": "water_counters",
"pair_register": 5031, "description": "Total double-pass water produced since last"},
5032: {"name": "DTS Total Gallons", "scale": "gallon_counter", "unit": "gallons", "category": "water_counters",
"pair_register": 5033, "description": "Total DTS water produced"},
5034: {"name": "DTS Since Last Gallons", "scale": "gallon_counter", "unit": "gallons", "category": "water_counters",
"pair_register": 5035, "description": "DTS water since last reset"}
}
def get_runtime_registers() -> Dict[int, Dict[str, Any]]:
"""
Get all runtime register configurations.
Returns:
Dict of runtime register configurations
"""
return RUNTIME_REGISTERS.copy()
def get_water_counter_registers() -> Dict[int, Dict[str, Any]]:
"""
Get all water counter register configurations.
Returns:
Dict of water counter register configurations
"""
return WATER_COUNTER_REGISTERS.copy()
def get_runtime_addresses_by_group(group: str) -> List[int]:
"""
Get register addresses for runtime-related groups.
Args:
group: Group name (runtime, water_counters)
Returns:
List of register addresses
"""
if group == "runtime":
return list(RUNTIME_REGISTERS.keys())
elif group == "water_counters":
return list(WATER_COUNTER_REGISTERS.keys())
return []
def validate_runtime_address(address: int) -> bool:
"""
Check if an address is a known runtime register.
Args:
address: Register address to validate
Returns:
True if address is a known runtime register
"""
return address in RUNTIME_REGISTERS
def validate_water_counter_address(address: int) -> bool:
"""
Check if an address is a known water counter register.
Args:
address: Register address to validate
Returns:
True if address is a known water counter register
"""
return address in WATER_COUNTER_REGISTERS
def get_runtime_info(address: int) -> Dict[str, Any]:
"""
Get configuration info for a specific runtime register.
Args:
address: Runtime register address
Returns:
Runtime configuration dict or empty dict if not found
"""
return RUNTIME_REGISTERS.get(address, {})
def get_water_counter_info(address: int) -> Dict[str, Any]:
"""
Get configuration info for a specific water counter register.
Args:
address: Water counter register address
Returns:
Water counter configuration dict or empty dict if not found
"""
return WATER_COUNTER_REGISTERS.get(address, {})
def get_register_pair(address: int) -> tuple:
"""
Get the register pair (high, low) for a given address.
Args:
address: Primary register address
Returns:
Tuple of (high_register, low_register) or (None, None) if not found
"""
# Check runtime registers
if address in RUNTIME_REGISTERS:
config = RUNTIME_REGISTERS[address]
return (address, config["pair_register"])
# Check water counter registers
if address in WATER_COUNTER_REGISTERS:
config = WATER_COUNTER_REGISTERS[address]
return (address, config["pair_register"])
return (None, None)
def get_all_32bit_addresses() -> List[int]:
"""
Get all addresses that use 32-bit register pairs.
Returns:
List of primary register addresses
"""
addresses = []
addresses.extend(RUNTIME_REGISTERS.keys())
addresses.extend(WATER_COUNTER_REGISTERS.keys())
return sorted(addresses)
def is_32bit_register(address: int) -> bool:
"""
Check if an address represents a 32-bit register pair.
Args:
address: Register address to check
Returns:
True if address is part of a 32-bit register pair
"""
return address in RUNTIME_REGISTERS or address in WATER_COUNTER_REGISTERS
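# Illustration only: show which register pairs hold the 32-bit runtime and
# water counter values; the actual decoding lives in utils.data_conversion.
if __name__ == "__main__":  # pragma: no cover - usage sketch
    for addr in get_all_32bit_addresses():
        config = RUNTIME_REGISTERS.get(addr) or WATER_COUNTER_REGISTERS.get(addr, {})
        high, low = get_register_pair(addr)
        print(f"{config.get('name', 'Unknown')}: R{high}/R{low}")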

View File

@@ -0,0 +1,126 @@
"""
Sensor register mappings and configuration.
"""
from typing import Dict, List, Any
# Known sensor mappings with categorization
KNOWN_SENSORS = {
# System Status & Control
1000: {"name": "System Mode", "scale": "direct", "unit": "", "category": "system",
"values": {
"65535": "Standby / Screen Saver",
"2": "Idle / home",
"3": "alarm list",
"34": "DTS requested - step 1 press and hold DTS to START",
"5": "DTS mode startup - step 2, flush with shore pressure - 180 second timer",
"6": "DTS mode startup - step 3, high pressure pump on, 60 second timer product valve divert",
"7": "DTS mode running - step 4 high pressure pump on - water flowing to tank",
"8": "fresh water flush",
"9": "setup screen",
"15": "toggle flush valve , quality valve, low pressure pump, high pressure pump, enable/disable service mode",
"16": "toggle double pass / feed valve (from service menu)",
"17": "needle valve control (from service menu)",
"18": "sensor reading overview (from service menu)",
"31": "Overview System diagram Map",
"32": "contact support screen",
"33": "seawater home - pick single or double"
}},
1036: {"name": "System Status", "scale": "direct", "unit": "", "category": "system",
"values": {"0": "Standby", "5": "FWF", "7": "Service Mode"}},
# Pressure Sensors
1003: {"name": "Feed Pressure", "scale": "direct", "unit": "PSI", "category": "pressure"},
1007: {"name": "High Pressure #2", "scale": "direct", "unit": "PSI", "category": "pressure"},
1008: {"name": "High Pressure #1", "scale": "direct", "unit": "PSI", "category": "pressure"},
# Temperature Sensors
1017: {"name": "Water Temperature", "scale": "÷10", "unit": "°F", "category": "temperature"},
1125: {"name": "System Temperature", "scale": "÷10", "unit": "°F", "category": "temperature"},
# Flow Meters
1120: {"name": "Brine Flowmeter", "scale": "÷10", "unit": "GPM", "category": "flow"},
1121: {"name": "1st Pass Product Flowmeter", "scale": "÷10", "unit": "GPM", "category": "flow"},
1122: {"name": "2nd Pass Product Flowmeter", "scale": "÷10", "unit": "GPM", "category": "flow"},
# Water Quality
1123: {"name": "Product TDS #1", "scale": "direct", "unit": "PPM", "category": "quality"},
1124: {"name": "Product TDS #2", "scale": "direct", "unit": "PPM", "category": "quality"}
}
def get_sensor_by_category(category: str) -> Dict[int, Dict[str, Any]]:
"""
Get sensors filtered by category.
Args:
category: Sensor category (system, pressure, temperature, flow, quality)
Returns:
Dict of sensors in the specified category
"""
return {
addr: config for addr, config in KNOWN_SENSORS.items()
if config.get("category") == category
}
def get_sensor_categories() -> List[str]:
"""
Get list of all available sensor categories.
Returns:
List of category names
"""
categories = set()
for config in KNOWN_SENSORS.values():
if "category" in config:
categories.add(config["category"])
return sorted(list(categories))
def get_sensor_addresses_by_group(group: str) -> List[int]:
"""
Get sensor addresses for a specific group.
Args:
group: Group name (system, pressure, temperature, flow, quality)
Returns:
List of register addresses
"""
group_mappings = {
"system": [1000, 1036],
"pressure": [1003, 1007, 1008],
"temperature": [1017, 1125],
"flow": [1120, 1121, 1122],
"quality": [1123, 1124]
}
return group_mappings.get(group, [])
def validate_sensor_address(address: int) -> bool:
"""
Check if an address is a known sensor register.
Args:
address: Register address to validate
Returns:
True if address is a known sensor
"""
return address in KNOWN_SENSORS
def get_sensor_info(address: int) -> Dict[str, Any]:
"""
Get configuration info for a specific sensor.
Args:
address: Sensor register address
Returns:
Sensor configuration dict or empty dict if not found
"""
return KNOWN_SENSORS.get(address, {})
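# Illustration only: list the temperature sensors and their scaling strings
# (a scale of "÷10" means the raw register value is divided by ten upstream).
if __name__ == "__main__":  # pragma: no cover - usage sketch
    for addr, cfg in get_sensor_by_category("temperature").items():
        print(f"R{addr}: {cfg['name']} [{cfg['scale']}] -> {cfg['unit']}")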

View File

@@ -0,0 +1,148 @@
"""
Timer and RTC register mappings and configuration.
"""
from typing import Dict, List, Any
# Timer register mappings
TIMER_REGISTERS = {
# FWF Mode Timer
136: {"name": "FWF Flush Timer", "scale": "÷10", "unit": "sec", "category": "fwf_timer"},
# DTS Mode Timers
138: {"name": "DTS Step 1 Timer", "scale": "÷10", "unit": "sec", "category": "dts_timer"},
128: {"name": "DTS Step 2 Priming Timer", "scale": "÷10", "unit": "sec", "category": "dts_timer"},
129: {"name": "DTS Step 3 Init Timer", "scale": "÷10", "unit": "sec", "category": "dts_timer"},
133: {"name": "DTS Step 4 Timer", "scale": "÷10", "unit": "sec", "category": "dts_timer"},
135: {"name": "DTS Step 5 Stop Timer", "scale": "÷10", "unit": "sec", "category": "dts_timer"},
139: {"name": "DTS Step 6 Flush Timer", "scale": "÷10", "unit": "sec", "category": "dts_timer"}
}
# RTC register mappings
RTC_REGISTERS = {
513: {"name": "RTC Minutes", "scale": "direct", "unit": "min", "category": "rtc"},
514: {"name": "RTC Seconds", "scale": "direct", "unit": "sec", "category": "rtc"},
516: {"name": "RTC Year", "scale": "direct", "unit": "", "category": "rtc"},
517: {"name": "RTC Month", "scale": "direct", "unit": "", "category": "rtc"},
518: {"name": "RTC Day", "scale": "direct", "unit": "", "category": "rtc"},
519: {"name": "RTC Month (Alt)", "scale": "direct", "unit": "", "category": "rtc"}
}
def get_timer_by_category(category: str) -> Dict[int, Dict[str, Any]]:
"""
Get timers filtered by category.
Args:
category: Timer category (fwf_timer, dts_timer)
Returns:
Dict of timers in the specified category
"""
return {
addr: config for addr, config in TIMER_REGISTERS.items()
if config.get("category") == category
}
def get_rtc_registers() -> Dict[int, Dict[str, Any]]:
"""
Get all RTC register configurations.
Returns:
Dict of RTC register configurations
"""
return RTC_REGISTERS.copy()
def get_timer_addresses_by_group(group: str) -> List[int]:
"""
Get timer addresses for a specific group.
Args:
group: Group name (fwf_timer, dts_timer, rtc)
Returns:
List of register addresses
"""
group_mappings = {
"fwf_timer": [136],
"dts_timer": [138, 128, 129, 133, 135, 139],
"rtc": [513, 514, 516, 517, 518, 519]
}
return group_mappings.get(group, [])
def get_dts_timer_addresses() -> List[int]:
"""
Get all DTS timer register addresses.
Returns:
List of DTS timer addresses
"""
return [addr for addr, config in TIMER_REGISTERS.items()
if config.get("category") == "dts_timer"]
def get_fwf_timer_addresses() -> List[int]:
"""
Get FWF timer register addresses.
Returns:
List of FWF timer addresses
"""
return [addr for addr, config in TIMER_REGISTERS.items()
if config.get("category") == "fwf_timer"]
def validate_timer_address(address: int) -> bool:
"""
Check if an address is a known timer register.
Args:
address: Register address to validate
Returns:
True if address is a known timer
"""
return address in TIMER_REGISTERS
def validate_rtc_address(address: int) -> bool:
"""
Check if an address is a known RTC register.
Args:
address: Register address to validate
Returns:
True if address is a known RTC register
"""
return address in RTC_REGISTERS
def get_timer_info(address: int) -> Dict[str, Any]:
"""
Get configuration info for a specific timer.
Args:
address: Timer register address
Returns:
Timer configuration dict or empty dict if not found
"""
return TIMER_REGISTERS.get(address, {})
def get_rtc_info(address: int) -> Dict[str, Any]:
"""
Get configuration info for a specific RTC register.
Args:
address: RTC register address
Returns:
RTC configuration dict or empty dict if not found
"""
return RTC_REGISTERS.get(address, {})
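# Illustration only: print the DTS step timers defined above in register order.
if __name__ == "__main__":  # pragma: no cover - usage sketch
    for addr in sorted(get_dts_timer_addresses()):
        print(f"R{addr}: {TIMER_REGISTERS[addr]['name']}")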

View File

@@ -0,0 +1,20 @@
"""
Service layer for PLC communication, data caching, and background tasks.
"""
from .plc_connection import PLCConnection, get_plc_connection
from .data_cache import DataCache, get_data_cache
from .register_reader import RegisterReader
from .register_writer import RegisterWriter
from .background_tasks import start_background_updates, BackgroundTaskManager
__all__ = [
'PLCConnection',
'get_plc_connection',
'DataCache',
'get_data_cache',
'RegisterReader',
'RegisterWriter',
'start_background_updates',
'BackgroundTaskManager'
]

View File

@@ -0,0 +1,111 @@
"""
Background task management for continuous data updates.
"""
import threading
import time
from typing import Optional
from ..config import Config
from ..utils.logger import get_logger
from .register_reader import RegisterReader
logger = get_logger(__name__)
class BackgroundTaskManager:
"""Manages background tasks for PLC data updates"""
def __init__(self):
self.reader = RegisterReader()
self._update_thread: Optional[threading.Thread] = None
self._running = False
self._stop_event = threading.Event()
def start_data_updates(self):
"""Start the background data update thread"""
if self._running:
logger.warning("Background data updates already running")
return
self._running = True
self._stop_event.clear()
self._update_thread = threading.Thread(
target=self._data_update_loop,
daemon=True,
name="PLCDataUpdater"
)
self._update_thread.start()
logger.info("Background data update thread started")
def stop_data_updates(self):
"""Stop the background data update thread"""
if not self._running:
return
self._running = False
self._stop_event.set()
if self._update_thread and self._update_thread.is_alive():
self._update_thread.join(timeout=5)
if self._update_thread.is_alive():
logger.warning("Background thread did not stop gracefully")
logger.info("Background data updates stopped")
def _data_update_loop(self):
"""Main data update loop running in background thread"""
logger.info("Starting PLC data update loop")
while self._running and not self._stop_event.is_set():
try:
# Update all PLC data
self.reader.update_all_data()
# Wait for next update cycle
self._stop_event.wait(Config.DATA_UPDATE_INTERVAL)
except Exception as e:
logger.error(f"Error in data update loop: {e}")
# Wait longer on error to avoid rapid retries
self._stop_event.wait(Config.ERROR_RETRY_INTERVAL)
logger.info("PLC data update loop ended")
def is_running(self) -> bool:
"""Check if background updates are running"""
return self._running and self._update_thread is not None and self._update_thread.is_alive()
# Global background task manager instance
_task_manager: Optional[BackgroundTaskManager] = None
def get_task_manager() -> BackgroundTaskManager:
"""
Get the global background task manager instance.
Returns:
BackgroundTaskManager instance
"""
global _task_manager
if _task_manager is None:
_task_manager = BackgroundTaskManager()
return _task_manager
def start_background_updates():
"""Start background data updates using the global task manager"""
manager = get_task_manager()
manager.start_data_updates()
def stop_background_updates():
"""Stop background data updates using the global task manager"""
manager = get_task_manager()
manager.stop_data_updates()
def is_background_updates_running() -> bool:
"""Check if background updates are currently running"""
manager = get_task_manager()
return manager.is_running()
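# Illustration only: a minimal sketch of running the updater standalone,
# assuming a reachable PLC configured via the environment (e.g. PLC_IP/PLC_PORT).
if __name__ == "__main__":  # pragma: no cover - usage sketch
    import time
    start_background_updates()
    try:
        time.sleep(15)  # let a few update cycles run (README example interval is 5 s)
    finally:
        stop_background_updates()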

View File

@@ -0,0 +1,249 @@
"""
Centralized data cache for PLC sensor data, timers, and status.
"""
import threading
from datetime import datetime
from typing import Dict, List, Any, Optional
from ..config import Config
from ..utils.logger import get_logger
logger = get_logger(__name__)
class DataCache:
"""Thread-safe data cache for PLC data"""
def __init__(self):
self._lock = threading.RLock()
self._data = {
"sensors": {},
"timers": {},
"rtc": {},
"outputs": {},
"runtime": {},
"water_counters": {},
"last_update": None,
"connection_status": "disconnected",
"errors": []
}
def get_all_data(self) -> Dict[str, Any]:
"""
Get all cached data (thread-safe).
Returns:
Copy of all cached data
"""
with self._lock:
return {
"sensors": self._data["sensors"].copy(),
"timers": self._data["timers"].copy(),
"rtc": self._data["rtc"].copy(),
"outputs": self._data["outputs"].copy(),
"runtime": self._data["runtime"].copy(),
"water_counters": self._data["water_counters"].copy(),
"last_update": self._data["last_update"],
"connection_status": self._data["connection_status"],
"errors": self._data["errors"].copy()
}
def get_sensors(self) -> Dict[str, Any]:
"""Get all sensor data"""
with self._lock:
return self._data["sensors"].copy()
def get_sensors_by_category(self, category: str) -> Dict[str, Any]:
"""Get sensors filtered by category"""
with self._lock:
return {
addr: sensor for addr, sensor in self._data["sensors"].items()
if sensor.get("category") == category
}
def get_timers(self) -> Dict[str, Any]:
"""Get all timer data"""
with self._lock:
return self._data["timers"].copy()
def get_active_timers(self) -> List[str]:
"""Get list of active timer addresses"""
with self._lock:
return [
addr for addr, timer in self._data["timers"].items()
if timer.get("active", False)
]
def get_timers_by_category(self, category: str) -> Dict[str, Any]:
"""Get timers filtered by category"""
with self._lock:
return {
addr: timer for addr, timer in self._data["timers"].items()
if timer.get("category") == category
}
def get_rtc(self) -> Dict[str, Any]:
"""Get RTC data"""
with self._lock:
return self._data["rtc"].copy()
def get_outputs(self) -> Dict[str, Any]:
"""Get output data"""
with self._lock:
return self._data["outputs"].copy()
def get_active_outputs(self) -> Dict[str, Any]:
"""Get only active output controls"""
with self._lock:
active_outputs = {}
for reg, output in self._data["outputs"].items():
active_bits = [bit for bit in output.get("bits", []) if bit.get("active", False)]
if active_bits:
active_outputs[reg] = {
**output,
"active_bits": active_bits
}
return active_outputs
def get_runtime(self) -> Dict[str, Any]:
"""Get runtime data"""
with self._lock:
return self._data["runtime"].copy()
def get_water_counters(self) -> Dict[str, Any]:
"""Get water counter data"""
with self._lock:
return self._data["water_counters"].copy()
def set_sensors(self, sensors: Dict[str, Any]):
"""Update sensor data"""
with self._lock:
self._data["sensors"] = sensors
self._data["last_update"] = datetime.now().isoformat()
def set_timers(self, timers: Dict[str, Any]):
"""Update timer data"""
with self._lock:
self._data["timers"] = timers
self._data["last_update"] = datetime.now().isoformat()
def set_rtc(self, rtc: Dict[str, Any]):
"""Update RTC data"""
with self._lock:
self._data["rtc"] = rtc
self._data["last_update"] = datetime.now().isoformat()
def set_outputs(self, outputs: Dict[str, Any]):
"""Update output data"""
with self._lock:
self._data["outputs"] = outputs
self._data["last_update"] = datetime.now().isoformat()
def set_runtime(self, runtime: Dict[str, Any]):
"""Update runtime data"""
with self._lock:
self._data["runtime"] = runtime
self._data["last_update"] = datetime.now().isoformat()
def set_water_counters(self, water_counters: Dict[str, Any]):
"""Update water counter data"""
with self._lock:
self._data["water_counters"] = water_counters
self._data["last_update"] = datetime.now().isoformat()
def set_connection_status(self, status: str):
"""Update connection status"""
with self._lock:
self._data["connection_status"] = status
def add_error(self, error: str):
"""Add error to error list (thread-safe)"""
with self._lock:
self._data["errors"].append({
"timestamp": datetime.now().isoformat(),
"error": error
})
# Keep only last N errors
max_errors = Config.MAX_CACHED_ERRORS
if len(self._data["errors"]) > max_errors:
self._data["errors"] = self._data["errors"][-max_errors:]
def get_errors(self, limit: int = 10) -> List[Dict[str, Any]]:
"""Get recent errors"""
with self._lock:
return self._data["errors"][-limit:]
def clear_errors(self):
"""Clear all errors"""
with self._lock:
self._data["errors"] = []
def get_last_update(self) -> Optional[str]:
"""Get last update timestamp"""
with self._lock:
return self._data["last_update"]
def get_connection_status(self) -> str:
"""Get current connection status"""
with self._lock:
return self._data["connection_status"]
def get_summary_stats(self) -> Dict[str, int]:
"""Get summary statistics"""
with self._lock:
return {
"sensor_count": len(self._data["sensors"]),
"active_timer_count": len([
t for t in self._data["timers"].values()
if t.get("active", False)
]),
"active_output_count": sum(
len([b for b in output.get("bits", []) if b.get("active", False)])
for output in self._data["outputs"].values()
),
"runtime_count": len(self._data["runtime"]),
"water_counter_count": len(self._data["water_counters"]),
"error_count": len(self._data["errors"])
}
def update_sensor(self, address: str, sensor_data: Dict[str, Any]):
"""Update single sensor (thread-safe)"""
with self._lock:
self._data["sensors"][address] = sensor_data
self._data["last_update"] = datetime.now().isoformat()
def update_timer(self, address: str, timer_data: Dict[str, Any]):
"""Update single timer (thread-safe)"""
with self._lock:
self._data["timers"][address] = timer_data
self._data["last_update"] = datetime.now().isoformat()
# Global data cache instance
_data_cache: Optional[DataCache] = None
def get_data_cache() -> DataCache:
"""
Get the global data cache instance (singleton pattern).
Returns:
DataCache instance
"""
global _data_cache
if _data_cache is None:
_data_cache = DataCache()
logger.info("Data cache initialized")
return _data_cache
def initialize_data_cache() -> DataCache:
"""
Initialize and return the data cache.
Returns:
DataCache instance
"""
cache = get_data_cache()
logger.info("Data cache ready")
return cache

View File

@@ -0,0 +1,216 @@
"""
PLC connection management and Modbus communication.
"""
import time
from typing import Optional
from pymodbus.client import ModbusTcpClient
from ..config import Config
from ..utils.logger import get_logger
from ..utils.error_handler import PLCConnectionError
logger = get_logger(__name__)
class PLCConnection:
"""Manages PLC connection and provides thread-safe access"""
def __init__(self):
self.config = Config.get_plc_config()
self.client: Optional[ModbusTcpClient] = None
self._is_connected = False
@property
def is_connected(self) -> bool:
"""Check if PLC is currently connected"""
return self._is_connected and self.client is not None
def connect(self) -> bool:
"""
Establish connection to PLC with retry logic.
Returns:
True if connected successfully, False otherwise
"""
current_time = time.time()
# Check if we should retry connection
if (self.config["last_connection_attempt"] +
self.config["connection_retry_interval"]) > current_time:
return self._is_connected
self.config["last_connection_attempt"] = current_time
try:
# Close existing connection if any
if self.client:
self.client.close()
# Create new client
self.client = ModbusTcpClient(
host=self.config["ip_address"],
port=self.config["port"],
timeout=self.config["timeout"]
)
# Attempt connection
self._is_connected = self.client.connect()
self.config["connected"] = self._is_connected
if self._is_connected:
logger.info(f"Connected to PLC at {self.config['ip_address']}:{self.config['port']}")
return True
else:
logger.error(f"Failed to connect to PLC at {self.config['ip_address']}:{self.config['port']}")
return False
except Exception as e:
logger.error(f"Error connecting to PLC: {e}")
self._is_connected = False
self.config["connected"] = False
return False
def disconnect(self):
"""Disconnect from PLC"""
if self.client:
try:
self.client.close()
logger.info("Disconnected from PLC")
except Exception as e:
logger.warning(f"Error during PLC disconnect: {e}")
finally:
self.client = None
self._is_connected = False
self.config["connected"] = False
def read_input_register(self, address: int) -> Optional[int]:
"""
Read input register (function code 4).
Args:
address: Register address
Returns:
Register value or None if read failed
"""
if not self.is_connected:
if not self.connect():
return None
try:
result = self.client.read_input_registers(
address, 1, slave=self.config["unit_id"]
)
if hasattr(result, 'registers') and not result.isError():
return result.registers[0]
else:
logger.warning(f"Failed to read input register {address}: {result}")
return None
except Exception as e:
logger.error(f"Error reading input register {address}: {e}")
self._is_connected = False
return None
def read_holding_register(self, address: int) -> Optional[int]:
"""
Read holding register (function code 3).
Args:
address: Register address
Returns:
Register value or None if read failed
"""
if not self.is_connected:
if not self.connect():
return None
try:
result = self.client.read_holding_registers(
address, 1, slave=self.config["unit_id"]
)
if hasattr(result, 'registers') and not result.isError():
return result.registers[0]
else:
logger.warning(f"Failed to read holding register {address}: {result}")
return None
except Exception as e:
logger.error(f"Error reading holding register {address}: {e}")
self._is_connected = False
return None
def write_holding_register(self, address: int, value: int) -> bool:
"""
Write single holding register (function code 6).
Args:
address: Register address
value: Value to write
Returns:
True if write successful, False otherwise
"""
if not self.is_connected:
if not self.connect():
raise PLCConnectionError("Cannot write register - PLC not connected")
try:
result = self.client.write_register(
address, value, slave=self.config["unit_id"]
)
if not result.isError():
logger.info(f"Successfully wrote value {value} to register {address}")
return True
else:
logger.error(f"Error writing register {address}: {result}")
return False
except Exception as e:
logger.error(f"Exception writing register {address}: {e}")
self._is_connected = False
raise PLCConnectionError(f"Failed to write register {address}: {e}")
def get_connection_status(self) -> dict:
"""
Get current connection status information.
Returns:
Dict with connection status details
"""
return {
"connected": self.is_connected,
"ip_address": self.config["ip_address"],
"port": self.config["port"],
"unit_id": self.config["unit_id"],
"timeout": self.config["timeout"],
"last_connection_attempt": self.config["last_connection_attempt"],
"retry_interval": self.config["connection_retry_interval"]
}
# Global PLC connection instance
_plc_connection: Optional[PLCConnection] = None
def get_plc_connection() -> PLCConnection:
"""
Get the global PLC connection instance (singleton pattern).
Returns:
PLCConnection instance
"""
global _plc_connection
if _plc_connection is None:
_plc_connection = PLCConnection()
return _plc_connection
def initialize_plc_connection() -> PLCConnection:
"""
Initialize and return the PLC connection.
Returns:
PLCConnection instance
"""
connection = get_plc_connection()
connection.connect()
return connection
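# Illustration only: one-shot connectivity check, assuming PLC_IP/PLC_PORT are
# set in the environment; register 1000 (System Mode) is read as an input register.
if __name__ == "__main__":  # pragma: no cover - usage sketch
    plc = get_plc_connection()
    if plc.connect():
        print("System Mode (R1000):", plc.read_input_register(1000))
        plc.disconnect()
    else:
        print("Connection failed:", plc.get_connection_status())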

View File

@@ -0,0 +1,487 @@
"""
Service for reading PLC registers and updating the data cache.
"""
from typing import Optional, Tuple, Dict, Any, List
from ..models import (
KNOWN_SENSORS, TIMER_REGISTERS, RTC_REGISTERS,
RUNTIME_REGISTERS, WATER_COUNTER_REGISTERS
)
from ..models.output_mappings import get_output_registers, create_output_bit_info
from ..utils.data_conversion import (
scale_value, get_descriptive_value, validate_register_value,
convert_ieee754_float, convert_gallon_counter, format_binary_string
)
from ..utils.logger import get_logger
from .plc_connection import get_plc_connection
from .data_cache import get_data_cache
logger = get_logger(__name__)
class RegisterReader:
"""Service for reading PLC registers and updating cache"""
def __init__(self):
self.plc = get_plc_connection()
self.cache = get_data_cache()
def read_register_pair(self, high_address: int, low_address: int, conversion_type: str) -> Tuple[bool, Optional[float], Optional[int], Optional[int]]:
"""
Read a pair of registers and convert them based on type.
Args:
high_address: High register address
low_address: Low register address
conversion_type: Conversion type (ieee754, gallon_counter)
Returns:
Tuple of (success, converted_value, raw_high, raw_low)
"""
high_value = self.plc.read_holding_register(high_address)
low_value = self.plc.read_holding_register(low_address)
if not validate_register_value(high_value) or not validate_register_value(low_value):
return False, None, high_value, low_value
if conversion_type == "ieee754":
converted = convert_ieee754_float(high_value, low_value)
elif conversion_type == "gallon_counter":
converted = convert_gallon_counter(high_value, low_value)
else:
converted = None
return True, converted, high_value, low_value
def update_sensors(self) -> bool:
"""
Update all sensor data in cache.
Returns:
True if successful, False otherwise
"""
try:
sensors = {}
for address, config in KNOWN_SENSORS.items():
raw_value = self.plc.read_input_register(address)
if validate_register_value(raw_value):
scaled_value = scale_value(raw_value, config["scale"])
descriptive_value = get_descriptive_value(raw_value, config)
sensors[str(address)] = {
"name": config["name"],
"raw_value": raw_value,
"scaled_value": scaled_value,
"descriptive_value": descriptive_value if isinstance(descriptive_value, str) else scaled_value,
"unit": config["unit"],
"category": config["category"],
"scale": config["scale"]
}
self.cache.set_sensors(sensors)
return True
except Exception as e:
logger.error(f"Error updating sensors: {e}")
self.cache.add_error(f"Sensor update failed: {e}")
return False
def update_timers(self) -> bool:
"""
Update all timer data in cache.
Returns:
True if successful, False otherwise
"""
try:
timers = {}
for address, config in TIMER_REGISTERS.items():
raw_value = self.plc.read_holding_register(address)
if validate_register_value(raw_value):
scaled_value = scale_value(raw_value, config["scale"])
timers[str(address)] = {
"name": config["name"],
"raw_value": raw_value,
"scaled_value": scaled_value,
"unit": config["unit"],
"category": config["category"],
"active": raw_value > 0
}
self.cache.set_timers(timers)
return True
except Exception as e:
logger.error(f"Error updating timers: {e}")
self.cache.add_error(f"Timer update failed: {e}")
return False
def update_rtc(self) -> bool:
"""
Update RTC data in cache.
Returns:
True if successful, False otherwise
"""
try:
rtc_data = {}
for address, config in RTC_REGISTERS.items():
raw_value = self.plc.read_holding_register(address)
if validate_register_value(raw_value):
rtc_data[str(address)] = {
"name": config["name"],
"value": raw_value,
"unit": config["unit"]
}
self.cache.set_rtc(rtc_data)
return True
except Exception as e:
logger.error(f"Error updating RTC: {e}")
self.cache.add_error(f"RTC update failed: {e}")
return False
def update_runtime(self) -> bool:
"""
Update runtime data in cache.
Returns:
True if successful, False otherwise
"""
try:
runtime_data = {}
for address, config in RUNTIME_REGISTERS.items():
success, converted_value, high_raw, low_raw = self.read_register_pair(
address, config["pair_register"], "ieee754"
)
if success and converted_value is not None:
runtime_data[str(address)] = {
"name": config["name"],
"value": converted_value,
"unit": config["unit"],
"category": config["category"],
"description": config["description"],
"raw_high": high_raw,
"raw_low": low_raw,
"high_register": address,
"low_register": config["pair_register"]
}
self.cache.set_runtime(runtime_data)
return True
except Exception as e:
logger.error(f"Error updating runtime: {e}")
self.cache.add_error(f"Runtime update failed: {e}")
return False
def update_water_counters(self) -> bool:
"""
Update water counter data in cache.
Returns:
True if successful, False otherwise
"""
try:
water_counter_data = {}
for address, config in WATER_COUNTER_REGISTERS.items():
success, converted_value, high_raw, low_raw = self.read_register_pair(
address, config["pair_register"], "gallon_counter"
)
if success and converted_value is not None:
water_counter_data[str(address)] = {
"name": config["name"],
"value": converted_value,
"unit": config["unit"],
"category": config["category"],
"description": config["description"],
"raw_high": high_raw,
"raw_low": low_raw,
"high_register": address,
"low_register": config["pair_register"]
}
self.cache.set_water_counters(water_counter_data)
return True
except Exception as e:
logger.error(f"Error updating water counters: {e}")
self.cache.add_error(f"Water counter update failed: {e}")
return False
def update_outputs(self) -> bool:
"""
Update output control data in cache.
Returns:
True if successful, False otherwise
"""
try:
outputs = {}
output_registers = get_output_registers()
for reg in output_registers:
modbus_addr = reg - 40001
raw_value = self.plc.read_holding_register(modbus_addr)
if validate_register_value(raw_value):
outputs[str(reg)] = {
"register": reg,
"value": raw_value,
"binary": format_binary_string(raw_value),
"bits": create_output_bit_info(reg, raw_value)
}
self.cache.set_outputs(outputs)
return True
except Exception as e:
logger.error(f"Error updating outputs: {e}")
self.cache.add_error(f"Output update failed: {e}")
return False
def update_all_data(self) -> bool:
"""
Update all PLC data in cache.
Returns:
True if all updates successful, False if any failed
"""
success = True
# Update connection status
if self.plc.is_connected:
self.cache.set_connection_status("connected")
        else:
            self.cache.set_connection_status("disconnected")
            if not self.plc.connect():
                self.cache.set_connection_status("connection_failed")
                return False
            # Reconnect succeeded; reflect that in the cached status
            self.cache.set_connection_status("connected")
# Update all data types
success &= self.update_sensors()
success &= self.update_timers()
success &= self.update_rtc()
success &= self.update_runtime()
success &= self.update_water_counters()
success &= self.update_outputs()
if success:
logger.debug("All PLC data updated successfully")
else:
logger.warning("Some PLC data updates failed")
return success
def read_selective_data(self, groups: List[str], keys: List[str]) -> Dict[str, Any]:
"""
Read only selected variables by groups and/or keys.
Args:
groups: List of group names
keys: List of register keys
Returns:
Dict containing selected data
"""
result = {
"sensors": {},
"timers": {},
"rtc": {},
"outputs": {},
"runtime": {},
"water_counters": {},
"requested_groups": groups,
"requested_keys": keys
}
# Collect addresses to read based on groups and keys
sensor_addresses = set()
timer_addresses = set()
output_registers = set()
runtime_addresses = set()
water_counter_addresses = set()
# Add addresses by groups
group_mappings = {
"system": [1000, 1036],
"pressure": [1003, 1007, 1008],
"temperature": [1017, 1125],
"flow": [1120, 1121, 1122],
"quality": [1123, 1124],
"fwf_timer": [136],
"dts_timer": [138, 128, 129, 133, 135, 139],
"rtc": [513, 514, 516, 517, 518, 519],
"outputs": [40017, 40018, 40019, 40020, 40021, 40022],
"runtime": list(RUNTIME_REGISTERS.keys()),
"water_counters": list(WATER_COUNTER_REGISTERS.keys())
}
for group in groups:
if group in group_mappings:
addresses = group_mappings[group]
if group == "outputs":
output_registers.update(addresses)
elif group == "runtime":
runtime_addresses.update(addresses)
elif group == "water_counters":
water_counter_addresses.update(addresses)
elif group in ["fwf_timer", "dts_timer", "rtc"]:
timer_addresses.update(addresses)
else:
sensor_addresses.update(addresses)
# Add specific requested keys
for key in keys:
try:
addr = int(key)
if addr in KNOWN_SENSORS:
sensor_addresses.add(addr)
elif addr in TIMER_REGISTERS or addr in RTC_REGISTERS:
timer_addresses.add(addr)
elif addr in RUNTIME_REGISTERS:
runtime_addresses.add(addr)
elif addr in WATER_COUNTER_REGISTERS:
water_counter_addresses.add(addr)
elif addr >= 40017 and addr <= 40022:
output_registers.add(addr)
else:
sensor_addresses.add(addr)
except ValueError:
continue
# Read and populate selected data
total_reads = 0
# Read sensors
for address in sensor_addresses:
raw_value = self.plc.read_input_register(address)
if validate_register_value(raw_value):
config = KNOWN_SENSORS.get(address, {
"name": f"Register {address}",
"scale": "direct",
"unit": "",
"category": "unknown"
})
scaled_value = scale_value(raw_value, config["scale"])
descriptive_value = get_descriptive_value(raw_value, config)
result["sensors"][str(address)] = {
"name": config["name"],
"raw_value": raw_value,
"scaled_value": scaled_value,
"descriptive_value": descriptive_value if isinstance(descriptive_value, str) else scaled_value,
"unit": config["unit"],
"category": config.get("category", "unknown"),
"scale": config["scale"]
}
total_reads += 1
# Read timers/RTC
for address in timer_addresses:
raw_value = self.plc.read_holding_register(address)
if validate_register_value(raw_value):
if address in TIMER_REGISTERS:
config = TIMER_REGISTERS[address]
scaled_value = scale_value(raw_value, config["scale"])
result["timers"][str(address)] = {
"name": config["name"],
"raw_value": raw_value,
"scaled_value": scaled_value,
"unit": config["unit"],
"category": config["category"],
"active": raw_value > 0
}
elif address in RTC_REGISTERS:
config = RTC_REGISTERS[address]
result["rtc"][str(address)] = {
"name": config["name"],
"value": raw_value,
"unit": config["unit"]
}
total_reads += 1
# Read runtime registers
for address in runtime_addresses:
if address in RUNTIME_REGISTERS:
config = RUNTIME_REGISTERS[address]
success, converted_value, high_raw, low_raw = self.read_register_pair(
address, config["pair_register"], "ieee754"
)
if success and converted_value is not None:
result["runtime"][str(address)] = {
"name": config["name"],
"value": converted_value,
"unit": config["unit"],
"category": config["category"],
"description": config["description"],
"raw_high": high_raw,
"raw_low": low_raw,
"high_register": address,
"low_register": config["pair_register"]
}
total_reads += 2 # Register pair
# Read water counter registers
for address in water_counter_addresses:
if address in WATER_COUNTER_REGISTERS:
config = WATER_COUNTER_REGISTERS[address]
success, converted_value, high_raw, low_raw = self.read_register_pair(
address, config["pair_register"], "gallon_counter"
)
if success and converted_value is not None:
result["water_counters"][str(address)] = {
"name": config["name"],
"value": converted_value,
"unit": config["unit"],
"category": config["category"],
"description": config["description"],
"raw_high": high_raw,
"raw_low": low_raw,
"high_register": address,
"low_register": config["pair_register"]
}
total_reads += 2 # Register pair
# Read outputs
for reg in output_registers:
modbus_addr = reg - 40001
raw_value = self.plc.read_holding_register(modbus_addr)
if validate_register_value(raw_value):
result["outputs"][str(reg)] = {
"register": reg,
"value": raw_value,
"binary": format_binary_string(raw_value),
"bits": create_output_bit_info(reg, raw_value)
}
total_reads += 1
# Add summary
result["summary"] = {
"sensors_read": len(result["sensors"]),
"timers_read": len(result["timers"]),
"rtc_read": len(result["rtc"]),
"outputs_read": len(result["outputs"]),
"runtime_read": len(result["runtime"]),
"water_counters_read": len(result["water_counters"]),
"total_plc_reads": total_reads
}
return result
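# Usage sketch (hedged; the enclosing class name is not shown here, so "DataUpdater"
# is an assumed name for illustration). This mirrors the /api/select endpoint, e.g.
# GET /api/select?groups=temperature,pressure&keys=1036
#
#     updater = DataUpdater()
#     data = updater.read_selective_data(groups=["temperature", "pressure"], keys=["1036"])
#     print(data["summary"]["total_plc_reads"])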

View File

@@ -0,0 +1,111 @@
"""
Service for writing to PLC registers.
"""
from typing import Dict, Any
from ..utils.logger import get_logger
from ..utils.error_handler import RegisterWriteError, PLCConnectionError
from .plc_connection import get_plc_connection
from .data_cache import get_data_cache
logger = get_logger(__name__)
class RegisterWriter:
"""Service for writing to PLC registers"""
def __init__(self):
self.plc = get_plc_connection()
self.cache = get_data_cache()
def write_holding_register(self, address: int, value: int) -> bool:
"""
Write a single holding register.
Args:
address: Register address
value: Value to write
Returns:
True if write successful
Raises:
RegisterWriteError: If write operation fails
PLCConnectionError: If PLC connection fails
"""
try:
# Validate inputs
if not isinstance(address, int) or address < 0:
raise RegisterWriteError(f"Invalid register address: {address}")
if not isinstance(value, int) or value < 0 or value > 65535:
raise RegisterWriteError(f"Invalid register value: {value}. Must be 0-65535")
# Ensure PLC connection
if not self.plc.is_connected:
if not self.plc.connect():
raise PLCConnectionError("Failed to connect to PLC")
# Perform write operation
success = self.plc.write_holding_register(address, value)
if success:
logger.info(f"Successfully wrote {value} to register {address}")
return True
else:
raise RegisterWriteError(f"Failed to write register {address}")
except (RegisterWriteError, PLCConnectionError):
# Re-raise our custom exceptions
raise
except Exception as e:
error_msg = f"Unexpected error writing register {address}: {e}"
logger.error(error_msg)
self.cache.add_error(error_msg)
raise RegisterWriteError(error_msg)
def write_multiple_registers(self, writes: Dict[int, int]) -> Dict[int, bool]:
"""
Write multiple holding registers.
Args:
writes: Dict mapping addresses to values
Returns:
Dict mapping addresses to success status
"""
results = {}
for address, value in writes.items():
try:
results[address] = self.write_holding_register(address, value)
except (RegisterWriteError, PLCConnectionError) as e:
logger.error(f"Failed to write register {address}: {e}")
results[address] = False
return results
def validate_write_operation(self, address: int, value: int) -> tuple:
"""
Validate a write operation before execution.
Args:
address: Register address
value: Value to write
Returns:
Tuple of (is_valid, error_message)
"""
if not isinstance(address, int):
return False, "Address must be an integer"
if address < 0:
return False, "Address must be non-negative"
if not isinstance(value, int):
return False, "Value must be an integer"
if value < 0 or value > 65535:
return False, "Value must be between 0 and 65535"
return True, ""
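# Usage sketch (hedged; the register/value pairs below are placeholders for
# illustration only - consult the PLC register map before writing anything):
#
#     writer = RegisterWriter()
#     ok, msg = writer.validate_write_operation(136, 120)
#     if ok:
#         results = writer.write_multiple_registers({136: 120, 138: 30})
#
# Note that write_holding_register() raises RegisterWriteError / PLCConnectionError,
# while write_multiple_registers() converts those failures into per-register False results.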

View File

@@ -0,0 +1,15 @@
"""
Utility modules for data conversion, logging, and error handling.
"""
from .logger import get_logger
from .data_conversion import scale_value, convert_ieee754_float, convert_gallon_counter
from .error_handler import setup_error_handlers
__all__ = [
'get_logger',
'scale_value',
'convert_ieee754_float',
'convert_gallon_counter',
'setup_error_handlers'
]

View File

@@ -0,0 +1,144 @@
"""
Data conversion utilities for PLC register values.
"""
import struct
from typing import Union, Optional
from .logger import get_logger
logger = get_logger(__name__)
def scale_value(value: Union[int, float], scale_type: str) -> Union[int, float]:
"""
Apply scaling to sensor values based on scale type.
Args:
value: Raw register value
scale_type: Scaling type (e.g., "direct", "÷10", "×100")
Returns:
Scaled value
"""
if scale_type == "direct":
return value
elif scale_type.startswith("÷"):
try:
divisor = float(scale_type[1:])
return value / divisor
except (ValueError, ZeroDivisionError):
logger.warning(f"Invalid divisor in scale_type: {scale_type}")
return value
elif scale_type.startswith("×"):
try:
multiplier = float(scale_type[1:])
return value * multiplier
except ValueError:
logger.warning(f"Invalid multiplier in scale_type: {scale_type}")
return value
else:
logger.warning(f"Unknown scale_type: {scale_type}")
return value
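# Worked examples (illustrative inputs):
#   scale_value(257, "÷10")  -> 25.7
#   scale_value(12, "×100")  -> 1200.0
#   scale_value(5, "direct") -> 5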
def convert_ieee754_float(high_register: int, low_register: int) -> Optional[float]:
"""
Convert two 16-bit registers to IEEE 754 32-bit float.
Args:
high_register: High 16 bits
low_register: Low 16 bits
Returns:
Float value or None if conversion fails
"""
try:
# Combine registers into 32-bit value (big-endian)
combined_32bit = (high_register << 16) | low_register
# Convert to bytes and then to IEEE 754 float
bytes_value = struct.pack('>I', combined_32bit) # Big-endian unsigned int
float_value = struct.unpack('>f', bytes_value)[0] # Big-endian float
return round(float_value, 2) # Round to 2 decimal places like HMI
except Exception as e:
logger.error(f"Error converting IEEE 754 float: {e}")
return None
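# Worked example: registers (0x42C8, 0x0000) combine to 0x42C80000, the IEEE 754
# single-precision encoding of 100.0, so convert_ieee754_float(0x42C8, 0x0000) -> 100.0.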
def convert_gallon_counter(high_register: int, low_register: int) -> Optional[float]:
"""
Convert two 16-bit registers to gallon counter value.
Args:
high_register: High 16 bits
low_register: Low 16 bits
Returns:
Gallon count as float or None if conversion fails
"""
try:
# Combine registers into 32-bit value
combined_32bit = (high_register << 16) | low_register
# Convert to IEEE 754 float (same as runtime hours)
bytes_value = struct.pack('>I', combined_32bit)
float_value = struct.unpack('>f', bytes_value)[0]
return round(float_value, 2)
except Exception as e:
logger.error(f"Error converting gallon counter: {e}")
return None
def get_descriptive_value(value: Union[int, float], sensor_config: dict) -> Union[str, int, float]:
"""
Convert numeric values to descriptive text where applicable.
Args:
value: Numeric register value
sensor_config: Sensor configuration containing value mappings
Returns:
Descriptive string or original value
"""
if "values" in sensor_config and isinstance(sensor_config["values"], dict):
return sensor_config["values"].get(str(value), f"Unknown ({value})")
return value
def validate_register_value(value: Optional[int], max_value: int = 65536) -> bool:
"""
Validate that a register value is within acceptable range.
Args:
value: Register value to validate
max_value: Exclusive upper bound for valid values (default: 65536, i.e. valid raw 16-bit values are 0-65535)
Returns:
True if value is valid, False otherwise
"""
if value is None:
return False
if not isinstance(value, (int, float)):
return False
if value < 0 or value >= max_value:
return False
return True
def format_binary_string(value: int, width: int = 16) -> str:
"""
Format an integer as a binary string with specified width.
Args:
value: Integer value to format
width: Number of bits to display (default: 16)
Returns:
Binary string representation
"""
return format(value, f'0{width}b')
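if __name__ == "__main__":
    # Minimal local sketch (not part of the service API): exercise the pure helpers
    # with made-up register values and a hypothetical status mapping.
    print(scale_value(257, "÷10"))                    # 25.7
    print(convert_ieee754_float(0x42C8, 0x0000))      # 100.0
    print(get_descriptive_value(1, {"values": {"0": "Stopped", "1": "Running"}}))  # Running
    print(validate_register_value(70000))             # False (outside 16-bit range)
    print(format_binary_string(42))                   # 0000000000101010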

View File

@@ -0,0 +1,167 @@
"""
Centralized error handling for the Flask application.
"""
from flask import Flask, jsonify, request
from datetime import datetime
from typing import Dict, Any
from .logger import get_logger
logger = get_logger(__name__)
def setup_error_handlers(app: Flask):
"""
Setup error handlers for the Flask application.
Args:
app: Flask application instance
"""
@app.errorhandler(400)
def bad_request(error):
"""Handle 400 Bad Request errors"""
logger.warning(f"Bad request: {request.url} - {error.description}")
return jsonify({
"success": False,
"error": "Bad Request",
"message": error.description or "Invalid request parameters",
"timestamp": datetime.now().isoformat()
}), 400
@app.errorhandler(404)
def not_found(error):
"""Handle 404 Not Found errors"""
logger.warning(f"Not found: {request.url}")
return jsonify({
"success": False,
"error": "Not Found",
"message": f"Resource not found: {request.path}",
"timestamp": datetime.now().isoformat()
}), 404
@app.errorhandler(405)
def method_not_allowed(error):
"""Handle 405 Method Not Allowed errors"""
logger.warning(f"Method not allowed: {request.method} {request.url}")
return jsonify({
"success": False,
"error": "Method Not Allowed",
"message": f"Method {request.method} not allowed for {request.path}",
"allowed_methods": list(error.valid_methods) if hasattr(error, 'valid_methods') else [],
"timestamp": datetime.now().isoformat()
}), 405
@app.errorhandler(409)
def conflict(error):
"""Handle 409 Conflict errors"""
logger.warning(f"Conflict: {request.url} - {error.description}")
return jsonify({
"success": False,
"error": "Conflict",
"message": error.description or "Request conflicts with current state",
"timestamp": datetime.now().isoformat()
}), 409
@app.errorhandler(500)
def internal_error(error):
"""Handle 500 Internal Server Error"""
logger.error(f"Internal server error: {request.url} - {str(error)}")
return jsonify({
"success": False,
"error": "Internal Server Error",
"message": "An unexpected error occurred",
"timestamp": datetime.now().isoformat()
}), 500
@app.errorhandler(503)
def service_unavailable(error):
"""Handle 503 Service Unavailable errors"""
logger.error(f"Service unavailable: {request.url} - {error.description}")
return jsonify({
"success": False,
"error": "Service Unavailable",
"message": error.description or "PLC connection unavailable",
"timestamp": datetime.now().isoformat()
}), 503
logger.info("Error handlers configured")
def create_error_response(
error_type: str,
message: str,
status_code: int = 400,
details: Dict[str, Any] = None
) -> tuple:
"""
Create a standardized error response.
Args:
error_type: Type of error (e.g., "Bad Request", "PLC Error")
message: Error message
status_code: HTTP status code
details: Optional additional error details
Returns:
Tuple of (Flask JSON response, status_code)
"""
response = {
"success": False,
"error": error_type,
"message": message,
"timestamp": datetime.now().isoformat()
}
if details:
response["details"] = details
return jsonify(response), status_code
def create_success_response(
message: str = "Success",
data: Dict[str, Any] = None,
status_code: int = 200
) -> tuple:
"""
Create a standardized success response.
Args:
message: Success message
data: Optional response data
status_code: HTTP status code
Returns:
Tuple of (Flask JSON response, status_code)
"""
response = {
"success": True,
"message": message,
"timestamp": datetime.now().isoformat()
}
if data:
response.update(data)
return jsonify(response), status_code
class PLCConnectionError(Exception):
"""Exception raised when PLC connection fails"""
pass
class RegisterReadError(Exception):
"""Exception raised when register read operation fails"""
pass
class RegisterWriteError(Exception):
"""Exception raised when register write operation fails"""
pass
class DTSOperationError(Exception):
"""Exception raised when DTS operation fails"""
pass
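if __name__ == "__main__":
    # Minimal local sketch (not part of the service): exercise the response helpers
    # inside a throwaway app/request context, since jsonify() needs one. The register
    # address/value shown are placeholder data.
    app = Flask(__name__)
    setup_error_handlers(app)
    with app.test_request_context():
        body, status = create_error_response("PLC Error", "Sensor read timed out", 503)
        print(status, body.get_json())
        body, status = create_success_response("Register written", {"address": 136, "value": 120})
        print(status, body.get_json())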

View File

@@ -0,0 +1,68 @@
"""
Centralized logging configuration for the application.
"""
import logging
import sys
from typing import Optional
from ..config import Config
def get_logger(name: str, level: Optional[str] = None) -> logging.Logger:
"""
Get a configured logger instance.
Args:
name: Logger name (typically __name__)
level: Optional log level override
Returns:
logging.Logger: Configured logger instance
"""
logger = logging.getLogger(name)
# Only configure if not already configured
if not logger.handlers:
# Set log level
log_level = level or Config.LOG_LEVEL
logger.setLevel(getattr(logging, log_level, logging.INFO))
# Create console handler
handler = logging.StreamHandler(sys.stdout)
handler.setLevel(logger.level)
# Create formatter
formatter = logging.Formatter(Config.LOG_FORMAT)
handler.setFormatter(formatter)
# Add handler to logger
logger.addHandler(handler)
# Prevent duplicate logs from parent loggers
logger.propagate = False
return logger
def setup_logging(level: Optional[str] = None):
"""
Setup application-wide logging configuration.
Args:
level: Optional log level to set globally
"""
log_level = level or Config.LOG_LEVEL
# Configure root logger
logging.basicConfig(
level=getattr(logging, log_level, logging.INFO),
format=Config.LOG_FORMAT,
handlers=[logging.StreamHandler(sys.stdout)]
)
# Suppress verbose logs from external libraries
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('pymodbus').setLevel(logging.WARNING)
logger = get_logger(__name__)
logger.info(f"Logging configured at {log_level} level")