#!/usr/bin/env python3
"""
DTS API Test Suite - Comprehensive testing for DTS mode progression

Monitors all screen transitions, tracks timer progress, and provides
detailed debugging information.
"""

import sys
import os
import time
import json
import csv
import argparse
import requests
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Tuple
from dataclasses import dataclass, asdict, field
from pathlib import Path
import logging
from urllib.parse import urljoin

# Add the package directory to Python path for imports
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))


@dataclass
class TransitionEvent:
    """Represents a screen transition event"""
    timestamp: str                  # ISO-8601 time at which the transition was detected
    from_mode: Optional[int]        # mode reported by the previous poll (None if it had none)
    to_mode: int                    # mode reported by the current poll
    from_screen: str
    to_screen: str
    timer_info: Dict[str, Any]      # "timer_info" payload of the poll that revealed the change
    duration_seconds: float         # time since the previous recorded transition (0.0 for the first)
    api_response_time_ms: int       # response time of the status poll that revealed the change


@dataclass
class TimerProgress:
    """Represents timer progress data"""
    timestamp: str
    mode: Optional[int]
    timer_address: Optional[int]
    timer_value: Optional[int]
    progress_percent: int
    countdown_rate: float           # (previous value - current value) / seconds between samples
    expected_duration: Optional[int]


@dataclass
class TestResults:
    """Test execution results"""
    start_time: Optional[str] = None
    end_time: Optional[str] = None
    total_duration_seconds: float = 0.0
    transitions_detected: int = 0
    screens_completed: int = 0
    api_errors: int = 0
    timer_issues: int = 0
    success: bool = False
    error_messages: List[str] = field(default_factory=list)
    performance_metrics: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        # Callers that still pass an explicit None get fresh containers.
        if self.error_messages is None:
            self.error_messages = []
        if self.performance_metrics is None:
            self.performance_metrics = {}


class TestConfig:
    """Configuration for DTS API testing"""

    # API Settings
    API_BASE_URL = "http://localhost:5000/api"
    REQUEST_TIMEOUT = 10
    RETRY_ATTEMPTS = 3
    RETRY_DELAY = 1

    # Monitoring Settings
    POLLING_INTERVAL = 1.0  # seconds
    TRANSITION_TIMEOUT = 300  # 5 minutes max per screen
    PROGRESS_UPDATE_INTERVAL = 5  # seconds

    # Output Settings
    CONSOLE_VERBOSITY = "INFO"  # DEBUG, INFO, WARNING, ERROR
    LOG_FILE_ENABLED = True
    CSV_EXPORT_ENABLED = True
    HTML_REPORT_ENABLED = False

    # Test Parameters
    EXPECTED_SCREEN_DURATIONS = {
        5: 180,  # Priming: 3 minutes
        6: 60,   # Init: 1 minute
        8: 60    # Flush: 1 minute
    }

    # Alert Thresholds
    STUCK_TIMER_THRESHOLD = 30  # seconds without timer change
    SLOW_TRANSITION_THRESHOLD = 1.5  # 150% of expected duration


class DTSAPITester:
    """Main test orchestrator for DTS API testing"""

    # Screen mode mappings
    SCREEN_MODES = {
        34: "dts_requested_active",
        5: "dts_priming_active",
        6: "dts_init_active",
        7: "dts_production_active",
        8: "dts_flush_active",
        2: "dts_process_complete"
    }

    # Human-readable screen names, keyed by mode
    SCREEN_NAMES = {
        34: "DTS Requested",
        5: "Priming",
        6: "Init",
        7: "Production",
        8: "Fresh Water Flush",
        2: "Standby (Complete)"
    }

    # Timer mappings for progress tracking
    TIMER_MAPPINGS = {
        5: {"timer_address": 128, "expected_duration": 180, "name": "Priming"},
        6: {"timer_address": 129, "expected_duration": 60, "name": "Init"},
        8: {"timer_address": 133, "expected_duration": 60, "name": "Fresh Water Flush"}
    }

    def __init__(self, api_base_url: Optional[str] = None, config: Optional[TestConfig] = None):
        """Initialize the DTS API tester.

        Args:
            api_base_url: Base URL of the API; falls back to
                ``config.API_BASE_URL`` when omitted.
            config: Test configuration; a default ``TestConfig`` is used
                when omitted.
        """
        self.config = config or TestConfig()
        self.api_base_url = api_base_url or self.config.API_BASE_URL
        # NOTE: requests.Session ignores a ``timeout`` attribute, so the
        # timeout is applied per request in _make_api_request instead.
        self.session = requests.Session()

        # Test state
        self.current_task_id = None
        self.transition_history: List[TransitionEvent] = []
        self.timer_progress_history: List[TimerProgress] = []
        self.start_time = None
        self.test_results = TestResults()
        self.previous_state = None
        self.last_timer_values = {}
        self.last_response_time_ms = 0  # response time of the latest status poll
        self.log_file_path = None

        # Setup logging
        self.logger = self._setup_logger()

        # Create output directories
        self._create_output_directories()

    def _setup_logger(self) -> logging.Logger:
        """Setup logging configuration.

        Returns:
            The ``DTSAPITester`` logger with a console handler and, when
            ``LOG_FILE_ENABLED`` is set, a timestamped file handler under
            ``logs/``.
        """
        logger = logging.getLogger('DTSAPITester')
        level = getattr(logging, str(self.config.CONSOLE_VERBOSITY).upper(), None)
        logger.setLevel(level if isinstance(level, int) else logging.INFO)

        # The named logger is shared process-wide; drop handlers left by an
        # earlier instance so every message is emitted only once.
        for stale_handler in list(logger.handlers):
            logger.removeHandler(stale_handler)
            stale_handler.close()

        # Console handler
        console_handler = logging.StreamHandler()
        console_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
        console_handler.setFormatter(console_formatter)
        logger.addHandler(console_handler)

        # File handler if enabled
        self.log_file_path = None
        if self.config.LOG_FILE_ENABLED:
            log_dir = Path("logs")
            log_dir.mkdir(exist_ok=True)
            log_file = log_dir / f"dts_test_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
            # Messages contain emoji; force UTF-8 so the platform default
            # encoding cannot make the write fail.
            file_handler = logging.FileHandler(log_file, encoding="utf-8")
            file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
            file_handler.setFormatter(file_formatter)
            logger.addHandler(file_handler)
            self.log_file_path = log_file

        return logger

    def _create_output_directories(self):
        """Create necessary output directories"""
        for directory in ["logs", "reports", "data"]:
            Path(directory).mkdir(exist_ok=True)

    def _make_api_request(self, method: str, endpoint: str, **kwargs) -> Tuple[Optional[Dict], int]:
        """Make API request with error handling and timing.

        The request is attempted up to ``RETRY_ATTEMPTS`` times with a
        linearly growing delay between attempts. ``REQUEST_TIMEOUT`` is
        applied unless the caller passes its own ``timeout``.

        Args:
            method: HTTP method name.
            endpoint: Path relative to the API base URL.
            **kwargs: Extra arguments forwarded to ``Session.request``.

        Returns:
            ``(decoded JSON body, response time in ms)`` on HTTP 200, or
            ``(None, response time in ms of the last attempt)`` once all
            attempts have failed.
        """
        url = urljoin(self.api_base_url.rstrip("/") + "/", endpoint.lstrip("/"))
        kwargs.setdefault("timeout", self.config.REQUEST_TIMEOUT)

        attempts = self.config.RETRY_ATTEMPTS
        response_time_ms = 0

        for attempt in range(attempts):
            # Time each attempt on its own so retry delays are not reported
            # as API latency.
            attempt_start = time.time()
            try:
                response = self.session.request(method, url, **kwargs)
                response_time_ms = int((time.time() - attempt_start) * 1000)

                if response.status_code == 200:
                    return response.json(), response_time_ms

                self.logger.warning(f"API request failed: {response.status_code} - {response.text}")

            except (requests.exceptions.RequestException, ValueError) as e:
                # ValueError covers an undecodable JSON body on requests
                # versions whose JSON error is not a RequestException.
                response_time_ms = int((time.time() - attempt_start) * 1000)
                self.logger.warning(f"API request error (attempt {attempt + 1}): {e}")

            if attempt >= attempts - 1:
                self.test_results.api_errors += 1
                return None, response_time_ms

            time.sleep(self.config.RETRY_DELAY * (attempt + 1))

        return None, 0

    def check_system_status(self) -> bool:
        """Check if the API and system are ready.

        Returns:
            ``False`` only when the status endpoint gave no usable response;
            a reachable API with a disconnected PLC is logged as a warning
            and still returns ``True``.
        """
        self.logger.info("🔍 Checking system status...")

        response, _ = self._make_api_request("GET", "/status")

        if response:
            connection_status = response.get("connection_status", "unknown")
            plc_connected = (response.get("plc_config") or {}).get("connected", False)

            if connection_status == "connected" and plc_connected:
                self.logger.info("✅ System status: Connected")
                return True
            else:
                self.logger.warning(f"⚠️ System status: {connection_status}, PLC connected: {plc_connected}")
                return True  # Still allow test to proceed
        else:
            self.logger.error("❌ System status: Failed to connect")
            return False

    def start_dts_sequence(self) -> bool:
        """Start DTS sequence via API.

        Returns:
            ``True`` when the API reported success; the task id from the
            response is stored in ``current_task_id``.
        """
        self.logger.info("🔄 Starting DTS sequence...")

        response, response_time = self._make_api_request("POST", "/dts/start")

        if response and response.get("success"):
            self.current_task_id = (response.get("data") or {}).get("task_id")
            self.logger.info(f"📋 DTS sequence started - Task ID: {self.current_task_id}")
            self.logger.info(f"⚡ API Response Time: {response_time}ms")
            return True
        else:
            self.logger.error("❌ Failed to start DTS sequence")
            self.test_results.error_messages.append("Failed to start DTS sequence")
            return False

    def get_task_status(self) -> Optional[Dict]:
        """Get current task status.

        Returns:
            The ``task`` object of the status response (``{}`` when the key
            is missing), or ``None`` when there is no task id or the request
            failed. The poll's response time is kept in
            ``last_response_time_ms``.
        """
        if not self.current_task_id:
            return None

        response, response_time = self._make_api_request("GET", f"/dts/status/{self.current_task_id}")
        self.last_response_time_ms = response_time

        if response:
            return response.get("task", {})
        return None

    def get_current_step_progress(self) -> Optional[Dict]:
        """Get real-time step progress"""
        response, _ = self._make_api_request("GET", "/dts/current-step-progress")
        return response

    def detect_screen_transition(self, current_state: Dict, previous_state: Dict) -> Optional[TransitionEvent]:
        """Detect screen transitions.

        A transition is a change of ``timer_info.current_mode`` between two
        consecutive task states.

        Args:
            current_state: Task state from the latest poll.
            previous_state: Task state from the poll before it.

        Returns:
            A ``TransitionEvent`` when the mode changed to a non-null value,
            otherwise ``None``. ``api_response_time_ms`` is left at 0 for
            the caller to fill in.
        """
        if not previous_state:
            return None

        current_info = current_state.get("timer_info") or {}
        previous_info = previous_state.get("timer_info") or {}
        current_mode = current_info.get("current_mode")
        previous_mode = previous_info.get("current_mode")

        if current_mode is None or current_mode == previous_mode:
            return None

        # Mode transition detected
        from_screen = self.SCREEN_MODES.get(previous_mode, f"mode_{previous_mode}")
        to_screen = self.SCREEN_MODES.get(current_mode, f"mode_{current_mode}")

        # Calculate duration since last transition
        now = datetime.now()
        duration = 0.0
        if self.transition_history:
            last_stamp = self.transition_history[-1].timestamp
            last_time = datetime.fromisoformat(last_stamp.replace('Z', '+00:00'))
            # Naive and aware datetimes cannot be subtracted; compare in the
            # recorded timestamp's zone when it carries one.
            reference = datetime.now(last_time.tzinfo) if last_time.tzinfo else now
            duration = (reference - last_time).total_seconds()

        return TransitionEvent(
            timestamp=now.isoformat(),
            from_mode=previous_mode,
            to_mode=current_mode,
            from_screen=from_screen,
            to_screen=to_screen,
            timer_info=current_info,
            duration_seconds=duration,
            api_response_time_ms=0  # Will be updated by caller
        )

    def log_transition_event(self, transition: TransitionEvent):
        """Record a transition in the history and log its details.

        Also warns when the screen being left lasted longer than its
        expected duration times ``SLOW_TRANSITION_THRESHOLD``.
        """
        self.transition_history.append(transition)
        self.test_results.transitions_detected += 1

        # Get screen names
        from_name = self._get_screen_name(transition.from_mode)
        to_name = self._get_screen_name(transition.to_mode)

        # Format duration
        duration_str = self._format_duration(transition.duration_seconds)

        self.logger.info("📺 Screen Transition Detected:")
        self.logger.info(f" {from_name} → {to_name} (Mode {transition.from_mode} → {transition.to_mode})")

        # Log timer information
        timer_info = transition.timer_info or {}
        if timer_info.get("timer_address"):
            timer_addr = timer_info["timer_address"]
            timer_val = timer_info.get("raw_timer_value", "N/A")
            progress = timer_info.get("timer_progress", 0)
            self.logger.info(f" ⏱️ Timer R{timer_addr}: {timer_val} ({progress}% progress)")

        if transition.duration_seconds > 0:
            self.logger.info(f" ⏳ Duration: {duration_str}")

        # Check for timing issues
        expected_duration = self.config.EXPECTED_SCREEN_DURATIONS.get(transition.from_mode)
        if expected_duration and transition.duration_seconds > 0:
            if transition.duration_seconds > expected_duration * self.config.SLOW_TRANSITION_THRESHOLD:
                self.logger.warning(f" ⚠️ Slow transition: {duration_str} (expected ~{expected_duration}s)")

    def analyze_timer_progress(self, current_state: Dict):
        """Analyze and log timer progress.

        Appends a ``TimerProgress`` sample when the state carries a timer
        address and value, and counts one timer issue per episode in which
        the timer value stops changing.
        """
        timer_info = current_state.get("timer_info") or {}
        if not timer_info:
            return

        current_mode = timer_info.get("current_mode")
        timer_address = timer_info.get("timer_address")
        timer_value = timer_info.get("raw_timer_value")
        progress = timer_info.get("timer_progress", 0)

        if timer_address and timer_value is not None:
            # Create timer progress record
            timer_progress = TimerProgress(
                timestamp=datetime.now().isoformat(),
                mode=current_mode,
                timer_address=timer_address,
                timer_value=timer_value,
                progress_percent=progress,
                countdown_rate=self._calculate_countdown_rate(timer_address, timer_value),
                expected_duration=self.config.EXPECTED_SCREEN_DURATIONS.get(current_mode)
            )

            self.timer_progress_history.append(timer_progress)

            # Check for stuck timer
            if self._is_timer_stuck(timer_address, timer_value):
                record = self.last_timer_values[timer_address]
                # Report each stuck episode once instead of on every poll.
                if not record.get("stuck_reported"):
                    record["stuck_reported"] = True
                    self.logger.warning(f"⚠️ Timer R{timer_address} appears stuck at {timer_value}")
                    self.test_results.timer_issues += 1

    def _calculate_countdown_rate(self, timer_address: int, current_value: int) -> float:
        """Calculate timer countdown rate.

        Returns:
            Units the timer decreased per second since the previous sample
            for this address; 0.0 for the first sample or when no time has
            elapsed.
        """
        now = time.time()
        last_data = self.last_timer_values.get(timer_address)

        if last_data is None:
            self.last_timer_values[timer_address] = {
                "value": current_value,
                "timestamp": now,
                "changed_at": now,
                "stuck_reported": False
            }
            return 0.0

        time_diff = now - last_data["timestamp"]
        if time_diff <= 0:
            return 0.0

        rate = (last_data["value"] - current_value) / time_diff

        if current_value == last_data["value"]:
            # Only the sample time moves forward; "changed_at" must keep
            # pointing at the last real change so stuck detection can work.
            last_data["timestamp"] = now
        else:
            self.last_timer_values[timer_address] = {
                "value": current_value,
                "timestamp": now,
                "changed_at": now,
                "stuck_reported": False
            }

        return rate

    def _is_timer_stuck(self, timer_address: int, current_value: int) -> bool:
        """Check if timer appears to be stuck.

        A timer counts as stuck when its positive value has not changed for
        more than ``STUCK_TIMER_THRESHOLD`` seconds.
        """
        last_data = self.last_timer_values.get(timer_address)
        if last_data is None:
            return False

        if current_value != last_data["value"] or current_value <= 0:
            return False

        changed_at = last_data.get("changed_at", last_data["timestamp"])
        return time.time() - changed_at > self.config.STUCK_TIMER_THRESHOLD

    def _get_screen_name(self, mode: int) -> str:
        """Get human-readable screen name"""
        return self.SCREEN_NAMES.get(mode, f"Mode {mode}")

    def _format_duration(self, seconds: float) -> str:
        """Format duration in human-readable format"""
        if seconds < 60:
            return f"{seconds:.1f}s"
        elif seconds < 3600:
            minutes = int(seconds // 60)
            secs = int(seconds % 60)
            return f"{minutes}m {secs}s"
        else:
            hours = int(seconds // 3600)
            minutes = int((seconds % 3600) // 60)
            return f"{hours}h {minutes}m"

    def display_progress_bar(self, progress: int, width: int = 20) -> str:
        """Create a visual progress bar.

        Args:
            progress: Percentage to display; a non-numeric value is treated
                as 0.
            width: Bar width in characters.

        Returns:
            A bar that is always ``width`` characters long (the fill is
            clamped to 0-100%), followed by the reported percentage.
        """
        value = progress if isinstance(progress, (int, float)) else 0
        clamped = min(max(value, 0), 100)
        filled = int(width * clamped / 100)
        bar = "█" * filled + "░" * (width - filled)
        return f"{bar} {value}%"

    def monitor_dts_progress(self) -> bool:
        """Main monitoring loop.

        Polls the task status until the task completes or fails, the user
        interrupts, or no screen transition has been seen for
        ``TRANSITION_TIMEOUT`` seconds. The timeout is also evaluated while
        status polls keep failing, so an unreachable API cannot stall the
        loop forever.

        Returns:
            ``True`` when the DTS process completed successfully.
        """
        self.logger.info("🔍 Starting DTS progress monitoring...")

        start_time = time.time()
        screen_started_at = start_time  # restarted on every detected transition
        last_progress_update = 0

        try:
            while True:
                current_time = time.time()
                try:
                    # Get current task status
                    current_state = self.get_task_status()
                    current_time = time.time()

                    if not current_state:
                        self.logger.error("❌ Failed to get task status")
                    else:
                        # Check for transitions
                        if self.previous_state:
                            transition = self.detect_screen_transition(current_state, self.previous_state)
                            if transition:
                                transition.api_response_time_ms = self.last_response_time_ms
                                self.log_transition_event(transition)
                                screen_started_at = current_time

                        # Analyze timer progress
                        self.analyze_timer_progress(current_state)

                        # Display progress updates
                        if current_time - last_progress_update >= self.config.PROGRESS_UPDATE_INTERVAL:
                            self._display_current_status(current_state)
                            last_progress_update = current_time

                        # Check if DTS process is complete
                        task_status = current_state.get("status", "")
                        current_step = current_state.get("current_step", "")

                        if task_status == "completed" or current_step == "dts_process_complete":
                            self.logger.info("✅ DTS process completed successfully!")
                            self.test_results.success = True
                            break

                        if task_status == "failed":
                            self.logger.error("❌ DTS process failed!")
                            error_msg = (current_state.get("last_error") or {}).get("message", "Unknown error")
                            self.test_results.error_messages.append(f"DTS process failed: {error_msg}")
                            break

                        # Store current state for next iteration
                        self.previous_state = current_state

                except Exception as e:
                    self.logger.error(f"❌ Error during monitoring: {e}")
                    self.test_results.error_messages.append(f"Monitoring error: {str(e)}")

                # Check for timeout (per screen; also reached after a failed poll)
                screen_elapsed = current_time - screen_started_at
                if screen_elapsed > self.config.TRANSITION_TIMEOUT:
                    self.logger.error(
                        f"❌ Monitoring timeout: no screen transition for {screen_elapsed:.1f}s"
                    )
                    self.test_results.error_messages.append("Monitoring timeout exceeded")
                    break

                # Wait before next poll
                time.sleep(self.config.POLLING_INTERVAL)

        except KeyboardInterrupt:
            self.logger.info("👋 Monitoring stopped by user")

        # Calculate final results
        self.test_results.total_duration_seconds = time.time() - start_time
        self.test_results.screens_completed = len(self.transition_history)

        return self.test_results.success

    def _display_current_status(self, current_state: Dict):
        """Display current status information"""
        progress = current_state.get("progress_percent", 0)
        timer_info = current_state.get("timer_info") or {}

        # Get current mode and screen name
        current_mode = timer_info.get("current_mode", 0)
        screen_name = self._get_screen_name(current_mode)

        self.logger.info(f"⏳ Current: {screen_name} (Mode {current_mode})")

        if timer_info.get("timer_address"):
            timer_addr = timer_info["timer_address"]
            timer_val = timer_info.get("raw_timer_value", 0)
            progress_bar = self.display_progress_bar(progress)
            self.logger.info(f"📊 Progress: {progress_bar}")
            self.logger.info(f"⏱️ Timer R{timer_addr}: {timer_val}")

        # Show elapsed time
        if self.start_time:
            elapsed = time.time() - self.start_time
            self.logger.info(f"🕐 Elapsed: {self._format_duration(elapsed)}")

    def generate_reports(self):
        """Generate test reports"""
        self.logger.info("📊 Generating test reports...")

        # Update final test results
        self.test_results.end_time = datetime.now().isoformat()

        # Generate JSON report
        self._generate_json_report()

        # Generate CSV report if enabled
        if self.config.CSV_EXPORT_ENABLED:
            self._generate_csv_report()

        # Display summary
        self._display_test_summary()

    def _generate_json_report(self):
        """Generate detailed JSON report under ``reports/``"""
        report_data = {
            "test_session": {
                "start_time": self.test_results.start_time,
                "end_time": self.test_results.end_time,
                "api_endpoint": self.api_base_url,
                "task_id": self.current_task_id,
                "total_duration_seconds": self.test_results.total_duration_seconds
            },
            "results": asdict(self.test_results),
            "transitions": [asdict(t) for t in self.transition_history],
            "timer_progress": [asdict(t) for t in self.timer_progress_history]
        }

        # Save to file
        report_file = Path("reports") / f"dts_test_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
        with open(report_file, 'w', encoding="utf-8") as f:
            json.dump(report_data, f, indent=2)

        self.logger.info(f"📄 JSON report saved: {report_file}")

    def _generate_csv_report(self):
        """Generate CSV report for timer data under ``data/``.

        Nothing is written when no timer samples were collected.
        """
        if not self.timer_progress_history:
            return

        csv_file = Path("data") / f"dts_timer_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"

        with open(csv_file, 'w', newline='', encoding="utf-8") as f:
            writer = csv.writer(f)
            writer.writerow([
                "timestamp", "mode", "timer_address", "timer_value",
                "progress_percent", "countdown_rate", "expected_duration"
            ])

            for progress in self.timer_progress_history:
                writer.writerow([
                    progress.timestamp,
                    progress.mode,
                    progress.timer_address,
                    progress.timer_value,
                    progress.progress_percent,
                    progress.countdown_rate,
                    progress.expected_duration
                ])

        self.logger.info(f"📊 CSV data saved: {csv_file}")

    def _display_test_summary(self):
        """Display final test summary"""
        results = self.test_results

        self.logger.info("\n" + "="*60)
        self.logger.info("📊 DTS API Test Summary")
        self.logger.info("="*60)

        # Test outcome
        status_icon = "✅" if results.success else "❌"
        status_text = "SUCCESS" if results.success else "FAILED"
        self.logger.info(f"Status: {status_icon} {status_text}")

        # Timing information
        self.logger.info(f"Total Duration: {self._format_duration(results.total_duration_seconds)}")
        self.logger.info(f"Screens Completed: {results.screens_completed}")
        self.logger.info(f"Transitions Detected: {results.transitions_detected}")

        # Error information
        if results.api_errors > 0:
            self.logger.warning(f"API Errors: {results.api_errors}")
        if results.timer_issues > 0:
            self.logger.warning(f"Timer Issues: {results.timer_issues}")

        # Error messages
        if results.error_messages:
            self.logger.error("Error Messages:")
            for error in results.error_messages:
                self.logger.error(f" - {error}")

        # Transition summary
        if self.transition_history:
            self.logger.info("\nTransition Summary:")
            for i, transition in enumerate(self.transition_history, 1):
                from_name = self._get_screen_name(transition.from_mode)
                to_name = self._get_screen_name(transition.to_mode)
                duration_str = self._format_duration(transition.duration_seconds)
                self.logger.info(f" {i}. {from_name} → {to_name} ({duration_str})")

        self.logger.info("="*60)

    def run_test(self) -> bool:
        """Run the complete DTS API test.

        Checks the system status, starts the DTS sequence, monitors it and
        generates the reports.

        Returns:
            ``True`` when the DTS process completed successfully; ``False``
            when a step failed or raised.
        """
        self.logger.info("🚀 DTS API Test Suite v1.0")
        self.logger.info(f"📡 API Endpoint: {self.api_base_url}")
        self.logger.info(f"⏰ Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")

        self.start_time = time.time()
        self.test_results.start_time = datetime.now().isoformat()

        try:
            # Step 1: Check system status
            if not self.check_system_status():
                return False

            # Step 2: Start DTS sequence
            if not self.start_dts_sequence():
                return False

            # Step 3: Monitor progress
            success = self.monitor_dts_progress()

            # Step 4: Generate reports
            self.generate_reports()

            return success

        except Exception as e:
            self.logger.error(f"❌ Test execution failed: {e}")
            self.test_results.error_messages.append(f"Test execution failed: {str(e)}")
            return False


def _apply_config_file(config: TestConfig, config_path: str) -> None:
    """Copy known settings from a JSON config file onto ``config``.

    Only upper-case keys that already exist on ``config`` are applied.

    Raises:
        OSError: The file cannot be read.
        ValueError: The file is not valid JSON, is not a JSON object, or
            has a non-integer key in ``EXPECTED_SCREEN_DURATIONS``.
    """
    with open(config_path, 'r', encoding="utf-8") as f:
        custom_config = json.load(f)

    if not isinstance(custom_config, dict):
        raise ValueError("configuration file must contain a JSON object")

    for key, value in custom_config.items():
        if not (key.isupper() and hasattr(config, key)):
            continue
        if key == "EXPECTED_SCREEN_DURATIONS" and isinstance(value, dict):
            # JSON object keys are always strings, while modes are looked
            # up as integers.
            value = {int(mode): seconds for mode, seconds in value.items()}
        setattr(config, key, value)


def main():
    """Main entry point.

    Settings are resolved in the order: built-in defaults, then the
    ``--config`` file, then explicit command-line options. Exits with status
    0 when the test succeeded and 1 otherwise.
    """
    parser = argparse.ArgumentParser(description="DTS API Test Suite")
    parser.add_argument("--api-url", default=None,
                        help="API base URL (default: http://localhost:5000/api)")
    parser.add_argument("--verbose", action="store_true",
                        help="Enable verbose output")
    parser.add_argument("--export-csv", action="store_true",
                        help="Export timer data to CSV")
    parser.add_argument("--config", help="Configuration file path")
    parser.add_argument("--polling-interval", type=float, default=None,
                        help="Polling interval in seconds (default: 1.0)")

    args = parser.parse_args()

    if args.polling_interval is not None and args.polling_interval <= 0:
        parser.error("--polling-interval must be greater than 0")

    # Create configuration
    config = TestConfig()

    # Load custom config first so explicit command-line options override it
    if args.config:
        if os.path.exists(args.config):
            try:
                _apply_config_file(config, args.config)
            except (OSError, ValueError) as e:
                parser.error(f"could not load config file {args.config}: {e}")
        else:
            print(f"Warning: config file not found, using defaults: {args.config}", file=sys.stderr)

    if args.verbose:
        config.CONSOLE_VERBOSITY = "DEBUG"
    if args.export_csv:
        config.CSV_EXPORT_ENABLED = True
    if args.polling_interval is not None:
        config.POLLING_INTERVAL = args.polling_interval

    # Create and run tester (a missing --api-url falls back to config.API_BASE_URL)
    tester = DTSAPITester(api_base_url=args.api_url, config=config)
    success = tester.run_test()

    # Exit with appropriate code
    sys.exit(0 if success else 1)


if __name__ == "__main__":
    main()