691 lines
27 KiB
Python
Executable File
691 lines
27 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""
|
|
DTS API Test Suite - Comprehensive testing for DTS mode progression
|
|
Monitors all screen transitions, tracks timer progress, and provides detailed debugging information.
|
|
"""
|
|
|
|
import sys
|
|
import os
|
|
import time
|
|
import json
|
|
import csv
|
|
import argparse
|
|
import requests
|
|
from datetime import datetime, timedelta
|
|
from typing import Dict, List, Any, Optional, Tuple
|
|
from dataclasses import dataclass, asdict
|
|
from pathlib import Path
|
|
import logging
|
|
from urllib.parse import urljoin
|
|
|
|
# Add the package directory to Python path for imports
|
|
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
|
|
|
@dataclass
|
|
class TransitionEvent:
|
|
"""Represents a screen transition event"""
|
|
timestamp: str
|
|
from_mode: int
|
|
to_mode: int
|
|
from_screen: str
|
|
to_screen: str
|
|
timer_info: Dict[str, Any]
|
|
duration_seconds: float
|
|
api_response_time_ms: int
|
|
|
|
@dataclass
|
|
class TimerProgress:
|
|
"""Represents timer progress data"""
|
|
timestamp: str
|
|
mode: int
|
|
timer_address: Optional[int]
|
|
timer_value: Optional[int]
|
|
progress_percent: int
|
|
countdown_rate: float
|
|
expected_duration: Optional[int]
|
|
|
|
@dataclass
|
|
class TestResults:
|
|
"""Test execution results"""
|
|
start_time: Optional[str] = None
|
|
end_time: Optional[str] = None
|
|
total_duration_seconds: float = 0.0
|
|
transitions_detected: int = 0
|
|
screens_completed: int = 0
|
|
api_errors: int = 0
|
|
timer_issues: int = 0
|
|
success: bool = False
|
|
error_messages: List[str] = None
|
|
performance_metrics: Dict[str, Any] = None
|
|
|
|
def __post_init__(self):
|
|
if self.error_messages is None:
|
|
self.error_messages = []
|
|
if self.performance_metrics is None:
|
|
self.performance_metrics = {}
|
|
|
|
class TestConfig:
|
|
"""Configuration for DTS API testing"""
|
|
|
|
# API Settings
|
|
API_BASE_URL = "http://localhost:5000/api"
|
|
REQUEST_TIMEOUT = 10
|
|
RETRY_ATTEMPTS = 3
|
|
RETRY_DELAY = 1
|
|
|
|
# Monitoring Settings
|
|
POLLING_INTERVAL = 1.0 # seconds
|
|
TRANSITION_TIMEOUT = 300 # 5 minutes max per screen
|
|
PROGRESS_UPDATE_INTERVAL = 5 # seconds
|
|
|
|
# Output Settings
|
|
CONSOLE_VERBOSITY = "INFO" # DEBUG, INFO, WARNING, ERROR
|
|
LOG_FILE_ENABLED = True
|
|
CSV_EXPORT_ENABLED = True
|
|
HTML_REPORT_ENABLED = False
|
|
|
|
# Test Parameters
|
|
EXPECTED_SCREEN_DURATIONS = {
|
|
5: 180, # Priming: 3 minutes
|
|
6: 60, # Init: 1 minute
|
|
8: 60 # Flush: 1 minute
|
|
}
|
|
|
|
# Alert Thresholds
|
|
STUCK_TIMER_THRESHOLD = 30 # seconds without timer change
|
|
SLOW_TRANSITION_THRESHOLD = 1.5 # 150% of expected duration
|
|
|
|
class DTSAPITester:
|
|
"""Main test orchestrator for DTS API testing"""
|
|
|
|
# Screen mode mappings
|
|
SCREEN_MODES = {
|
|
34: "dts_requested_active",
|
|
5: "dts_priming_active",
|
|
6: "dts_init_active",
|
|
7: "dts_production_active",
|
|
8: "dts_flush_active",
|
|
2: "dts_process_complete"
|
|
}
|
|
|
|
# Timer mappings for progress tracking
|
|
TIMER_MAPPINGS = {
|
|
5: {"timer_address": 128, "expected_duration": 180, "name": "Priming"},
|
|
6: {"timer_address": 129, "expected_duration": 60, "name": "Init"},
|
|
8: {"timer_address": 133, "expected_duration": 60, "name": "Fresh Water Flush"}
|
|
}
|
|
|
|
def __init__(self, api_base_url: str = None, config: TestConfig = None):
|
|
"""Initialize the DTS API tester"""
|
|
self.config = config or TestConfig()
|
|
self.api_base_url = api_base_url or self.config.API_BASE_URL
|
|
self.session = requests.Session()
|
|
self.session.timeout = self.config.REQUEST_TIMEOUT
|
|
|
|
# Test state
|
|
self.current_task_id = None
|
|
self.transition_history: List[TransitionEvent] = []
|
|
self.timer_progress_history: List[TimerProgress] = []
|
|
self.start_time = None
|
|
self.test_results = TestResults()
|
|
self.previous_state = None
|
|
self.last_timer_values = {}
|
|
|
|
# Setup logging
|
|
self.logger = self._setup_logger()
|
|
|
|
# Create output directories
|
|
self._create_output_directories()
|
|
|
|
def _setup_logger(self) -> logging.Logger:
|
|
"""Setup logging configuration"""
|
|
logger = logging.getLogger('DTSAPITester')
|
|
logger.setLevel(getattr(logging, self.config.CONSOLE_VERBOSITY))
|
|
|
|
# Console handler
|
|
console_handler = logging.StreamHandler()
|
|
console_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
|
|
console_handler.setFormatter(console_formatter)
|
|
logger.addHandler(console_handler)
|
|
|
|
# File handler if enabled
|
|
if self.config.LOG_FILE_ENABLED:
|
|
log_dir = Path("logs")
|
|
log_dir.mkdir(exist_ok=True)
|
|
log_file = log_dir / f"dts_test_{datetime.now().strftime('%Y%m%d_%H%M%S')}.log"
|
|
|
|
file_handler = logging.FileHandler(log_file)
|
|
file_formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
|
|
file_handler.setFormatter(file_formatter)
|
|
logger.addHandler(file_handler)
|
|
|
|
self.log_file_path = log_file
|
|
|
|
return logger
|
|
|
|
def _create_output_directories(self):
|
|
"""Create necessary output directories"""
|
|
for directory in ["logs", "reports", "data"]:
|
|
Path(directory).mkdir(exist_ok=True)
|
|
|
|
def _make_api_request(self, method: str, endpoint: str, **kwargs) -> Tuple[Optional[Dict], int]:
|
|
"""Make API request with error handling and timing"""
|
|
url = urljoin(self.api_base_url + "/", endpoint.lstrip("/"))
|
|
start_time = time.time()
|
|
|
|
for attempt in range(self.config.RETRY_ATTEMPTS):
|
|
try:
|
|
response = self.session.request(method, url, **kwargs)
|
|
response_time_ms = int((time.time() - start_time) * 1000)
|
|
|
|
if response.status_code == 200:
|
|
return response.json(), response_time_ms
|
|
else:
|
|
self.logger.warning(f"API request failed: {response.status_code} - {response.text}")
|
|
if attempt == self.config.RETRY_ATTEMPTS - 1:
|
|
self.test_results.api_errors += 1
|
|
return None, response_time_ms
|
|
|
|
except requests.exceptions.RequestException as e:
|
|
self.logger.warning(f"API request error (attempt {attempt + 1}): {e}")
|
|
if attempt < self.config.RETRY_ATTEMPTS - 1:
|
|
time.sleep(self.config.RETRY_DELAY * (attempt + 1))
|
|
else:
|
|
self.test_results.api_errors += 1
|
|
return None, int((time.time() - start_time) * 1000)
|
|
|
|
return None, 0
|
|
|
|
def check_system_status(self) -> bool:
|
|
"""Check if the API and system are ready"""
|
|
self.logger.info("🔍 Checking system status...")
|
|
|
|
response, _ = self._make_api_request("GET", "/status")
|
|
if response:
|
|
connection_status = response.get("connection_status", "unknown")
|
|
plc_connected = response.get("plc_config", {}).get("connected", False)
|
|
|
|
if connection_status == "connected" and plc_connected:
|
|
self.logger.info("✅ System status: Connected")
|
|
return True
|
|
else:
|
|
self.logger.warning(f"⚠️ System status: {connection_status}, PLC connected: {plc_connected}")
|
|
return True # Still allow test to proceed
|
|
else:
|
|
self.logger.error("❌ System status: Failed to connect")
|
|
return False
|
|
|
|
def start_dts_sequence(self) -> bool:
|
|
"""Start DTS sequence via API"""
|
|
self.logger.info("🔄 Starting DTS sequence...")
|
|
|
|
response, response_time = self._make_api_request("POST", "/dts/start")
|
|
if response and response.get("success"):
|
|
self.current_task_id = response.get("data", {}).get("task_id")
|
|
self.logger.info(f"📋 DTS sequence started - Task ID: {self.current_task_id}")
|
|
self.logger.info(f"⚡ API Response Time: {response_time}ms")
|
|
return True
|
|
else:
|
|
self.logger.error("❌ Failed to start DTS sequence")
|
|
self.test_results.error_messages.append("Failed to start DTS sequence")
|
|
return False
|
|
|
|
def get_task_status(self) -> Optional[Dict]:
|
|
"""Get current task status"""
|
|
if not self.current_task_id:
|
|
return None
|
|
|
|
response, response_time = self._make_api_request("GET", f"/dts/status/{self.current_task_id}")
|
|
if response:
|
|
return response.get("task", {})
|
|
return None
|
|
|
|
def get_current_step_progress(self) -> Optional[Dict]:
|
|
"""Get real-time step progress"""
|
|
response, _ = self._make_api_request("GET", "/dts/current-step-progress")
|
|
return response
|
|
|
|
def detect_screen_transition(self, current_state: Dict, previous_state: Dict) -> Optional[TransitionEvent]:
|
|
"""Detect screen transitions"""
|
|
if not previous_state:
|
|
return None
|
|
|
|
# Check for mode change in timer_info
|
|
current_mode = current_state.get("timer_info", {}).get("current_mode")
|
|
previous_mode = previous_state.get("timer_info", {}).get("current_mode")
|
|
|
|
# Also check current_step for transitions
|
|
current_step = current_state.get("current_step", "")
|
|
previous_step = previous_state.get("current_step", "")
|
|
|
|
if current_mode != previous_mode and current_mode is not None:
|
|
# Mode transition detected
|
|
from_screen = self.SCREEN_MODES.get(previous_mode, f"mode_{previous_mode}")
|
|
to_screen = self.SCREEN_MODES.get(current_mode, f"mode_{current_mode}")
|
|
|
|
# Calculate duration since last transition
|
|
duration = 0.0
|
|
if self.transition_history:
|
|
last_transition = self.transition_history[-1]
|
|
last_time = datetime.fromisoformat(last_transition.timestamp.replace('Z', '+00:00'))
|
|
current_time = datetime.now()
|
|
duration = (current_time - last_time).total_seconds()
|
|
|
|
transition = TransitionEvent(
|
|
timestamp=datetime.now().isoformat(),
|
|
from_mode=previous_mode,
|
|
to_mode=current_mode,
|
|
from_screen=from_screen,
|
|
to_screen=to_screen,
|
|
timer_info=current_state.get("timer_info", {}),
|
|
duration_seconds=duration,
|
|
api_response_time_ms=0 # Will be updated by caller
|
|
)
|
|
|
|
return transition
|
|
|
|
return None
|
|
|
|
def log_transition_event(self, transition: TransitionEvent):
|
|
"""Log detailed transition information"""
|
|
self.transition_history.append(transition)
|
|
self.test_results.transitions_detected += 1
|
|
|
|
# Get screen names
|
|
from_name = self._get_screen_name(transition.from_mode)
|
|
to_name = self._get_screen_name(transition.to_mode)
|
|
|
|
# Format duration
|
|
duration_str = self._format_duration(transition.duration_seconds)
|
|
|
|
self.logger.info("📺 Screen Transition Detected:")
|
|
self.logger.info(f" {from_name} → {to_name} (Mode {transition.from_mode} → {transition.to_mode})")
|
|
|
|
# Log timer information
|
|
timer_info = transition.timer_info
|
|
if timer_info.get("timer_address"):
|
|
timer_addr = timer_info["timer_address"]
|
|
timer_val = timer_info.get("raw_timer_value", "N/A")
|
|
progress = timer_info.get("timer_progress", 0)
|
|
self.logger.info(f" ⏱️ Timer R{timer_addr}: {timer_val} ({progress}% progress)")
|
|
|
|
if transition.duration_seconds > 0:
|
|
self.logger.info(f" ⏳ Duration: {duration_str}")
|
|
|
|
# Check for timing issues
|
|
expected_duration = self.config.EXPECTED_SCREEN_DURATIONS.get(transition.from_mode)
|
|
if expected_duration and transition.duration_seconds > 0:
|
|
if transition.duration_seconds > expected_duration * self.config.SLOW_TRANSITION_THRESHOLD:
|
|
self.logger.warning(f" ⚠️ Slow transition: {duration_str} (expected ~{expected_duration}s)")
|
|
|
|
def analyze_timer_progress(self, current_state: Dict):
|
|
"""Analyze and log timer progress"""
|
|
timer_info = current_state.get("timer_info", {})
|
|
if not timer_info:
|
|
return
|
|
|
|
current_mode = timer_info.get("current_mode")
|
|
timer_address = timer_info.get("timer_address")
|
|
timer_value = timer_info.get("raw_timer_value")
|
|
progress = timer_info.get("timer_progress", 0)
|
|
|
|
if timer_address and timer_value is not None:
|
|
# Create timer progress record
|
|
timer_progress = TimerProgress(
|
|
timestamp=datetime.now().isoformat(),
|
|
mode=current_mode,
|
|
timer_address=timer_address,
|
|
timer_value=timer_value,
|
|
progress_percent=progress,
|
|
countdown_rate=self._calculate_countdown_rate(timer_address, timer_value),
|
|
expected_duration=self.config.EXPECTED_SCREEN_DURATIONS.get(current_mode)
|
|
)
|
|
|
|
self.timer_progress_history.append(timer_progress)
|
|
|
|
# Check for stuck timer
|
|
if self._is_timer_stuck(timer_address, timer_value):
|
|
self.logger.warning(f"⚠️ Timer R{timer_address} appears stuck at {timer_value}")
|
|
self.test_results.timer_issues += 1
|
|
|
|
def _calculate_countdown_rate(self, timer_address: int, current_value: int) -> float:
|
|
"""Calculate timer countdown rate"""
|
|
if timer_address not in self.last_timer_values:
|
|
self.last_timer_values[timer_address] = {
|
|
"value": current_value,
|
|
"timestamp": time.time()
|
|
}
|
|
return 0.0
|
|
|
|
last_data = self.last_timer_values[timer_address]
|
|
time_diff = time.time() - last_data["timestamp"]
|
|
value_diff = last_data["value"] - current_value
|
|
|
|
if time_diff > 0:
|
|
rate = value_diff / time_diff
|
|
|
|
# Update stored values
|
|
self.last_timer_values[timer_address] = {
|
|
"value": current_value,
|
|
"timestamp": time.time()
|
|
}
|
|
|
|
return rate
|
|
|
|
return 0.0
|
|
|
|
def _is_timer_stuck(self, timer_address: int, current_value: int) -> bool:
|
|
"""Check if timer appears to be stuck"""
|
|
if timer_address not in self.last_timer_values:
|
|
return False
|
|
|
|
last_data = self.last_timer_values[timer_address]
|
|
time_since_change = time.time() - last_data["timestamp"]
|
|
|
|
return (current_value == last_data["value"] and
|
|
current_value > 0 and
|
|
time_since_change > self.config.STUCK_TIMER_THRESHOLD)
|
|
|
|
def _get_screen_name(self, mode: int) -> str:
|
|
"""Get human-readable screen name"""
|
|
screen_names = {
|
|
34: "DTS Requested",
|
|
5: "Priming",
|
|
6: "Init",
|
|
7: "Production",
|
|
8: "Fresh Water Flush",
|
|
2: "Standby (Complete)"
|
|
}
|
|
return screen_names.get(mode, f"Mode {mode}")
|
|
|
|
def _format_duration(self, seconds: float) -> str:
|
|
"""Format duration in human-readable format"""
|
|
if seconds < 60:
|
|
return f"{seconds:.1f}s"
|
|
elif seconds < 3600:
|
|
minutes = int(seconds // 60)
|
|
secs = int(seconds % 60)
|
|
return f"{minutes}m {secs}s"
|
|
else:
|
|
hours = int(seconds // 3600)
|
|
minutes = int((seconds % 3600) // 60)
|
|
return f"{hours}h {minutes}m"
|
|
|
|
def display_progress_bar(self, progress: int, width: int = 20) -> str:
|
|
"""Create a visual progress bar"""
|
|
filled = int(width * progress / 100)
|
|
bar = "█" * filled + "░" * (width - filled)
|
|
return f"{bar} {progress}%"
|
|
|
|
def monitor_dts_progress(self) -> bool:
|
|
"""Main monitoring loop"""
|
|
self.logger.info("🔍 Starting DTS progress monitoring...")
|
|
|
|
start_time = time.time()
|
|
last_progress_update = 0
|
|
monitoring_active = True
|
|
|
|
while monitoring_active:
|
|
try:
|
|
# Get current task status
|
|
current_state = self.get_task_status()
|
|
if not current_state:
|
|
self.logger.error("❌ Failed to get task status")
|
|
time.sleep(self.config.POLLING_INTERVAL)
|
|
continue
|
|
|
|
# Check for transitions
|
|
if self.previous_state:
|
|
transition = self.detect_screen_transition(current_state, self.previous_state)
|
|
if transition:
|
|
self.log_transition_event(transition)
|
|
|
|
# Analyze timer progress
|
|
self.analyze_timer_progress(current_state)
|
|
|
|
# Display progress updates
|
|
current_time = time.time()
|
|
if current_time - last_progress_update >= self.config.PROGRESS_UPDATE_INTERVAL:
|
|
self._display_current_status(current_state)
|
|
last_progress_update = current_time
|
|
|
|
# Check if DTS process is complete
|
|
task_status = current_state.get("status", "")
|
|
current_step = current_state.get("current_step", "")
|
|
|
|
if task_status == "completed" or current_step == "dts_process_complete":
|
|
self.logger.info("✅ DTS process completed successfully!")
|
|
self.test_results.success = True
|
|
monitoring_active = False
|
|
elif task_status == "failed":
|
|
self.logger.error("❌ DTS process failed!")
|
|
error_msg = current_state.get("last_error", {}).get("message", "Unknown error")
|
|
self.test_results.error_messages.append(f"DTS process failed: {error_msg}")
|
|
monitoring_active = False
|
|
|
|
# Check for timeout
|
|
elapsed_time = current_time - start_time
|
|
if elapsed_time > self.config.TRANSITION_TIMEOUT:
|
|
self.logger.error(f"❌ Monitoring timeout after {elapsed_time:.1f}s")
|
|
self.test_results.error_messages.append("Monitoring timeout exceeded")
|
|
monitoring_active = False
|
|
|
|
# Store current state for next iteration
|
|
self.previous_state = current_state
|
|
|
|
# Wait before next poll
|
|
time.sleep(self.config.POLLING_INTERVAL)
|
|
|
|
except KeyboardInterrupt:
|
|
self.logger.info("👋 Monitoring stopped by user")
|
|
monitoring_active = False
|
|
except Exception as e:
|
|
self.logger.error(f"❌ Error during monitoring: {e}")
|
|
self.test_results.error_messages.append(f"Monitoring error: {str(e)}")
|
|
time.sleep(self.config.POLLING_INTERVAL)
|
|
|
|
# Calculate final results
|
|
self.test_results.total_duration_seconds = time.time() - start_time
|
|
self.test_results.screens_completed = len(self.transition_history)
|
|
|
|
return self.test_results.success
|
|
|
|
def _display_current_status(self, current_state: Dict):
|
|
"""Display current status information"""
|
|
current_step = current_state.get("current_step", "")
|
|
progress = current_state.get("progress_percent", 0)
|
|
timer_info = current_state.get("timer_info", {})
|
|
|
|
# Get current mode and screen name
|
|
current_mode = timer_info.get("current_mode", 0)
|
|
screen_name = self._get_screen_name(current_mode)
|
|
|
|
self.logger.info(f"⏳ Current: {screen_name} (Mode {current_mode})")
|
|
|
|
if timer_info.get("timer_address"):
|
|
timer_addr = timer_info["timer_address"]
|
|
timer_val = timer_info.get("raw_timer_value", 0)
|
|
progress_bar = self.display_progress_bar(progress)
|
|
self.logger.info(f"📊 Progress: {progress_bar}")
|
|
self.logger.info(f"⏱️ Timer R{timer_addr}: {timer_val}")
|
|
|
|
# Show elapsed time
|
|
if self.start_time:
|
|
elapsed = time.time() - self.start_time
|
|
self.logger.info(f"🕐 Elapsed: {self._format_duration(elapsed)}")
|
|
|
|
def generate_reports(self):
|
|
"""Generate test reports"""
|
|
self.logger.info("📊 Generating test reports...")
|
|
|
|
# Update final test results
|
|
self.test_results.end_time = datetime.now().isoformat()
|
|
|
|
# Generate JSON report
|
|
self._generate_json_report()
|
|
|
|
# Generate CSV report if enabled
|
|
if self.config.CSV_EXPORT_ENABLED:
|
|
self._generate_csv_report()
|
|
|
|
# Display summary
|
|
self._display_test_summary()
|
|
|
|
def _generate_json_report(self):
|
|
"""Generate detailed JSON report"""
|
|
report_data = {
|
|
"test_session": {
|
|
"start_time": self.test_results.start_time,
|
|
"end_time": self.test_results.end_time,
|
|
"api_endpoint": self.api_base_url,
|
|
"task_id": self.current_task_id,
|
|
"total_duration_seconds": self.test_results.total_duration_seconds
|
|
},
|
|
"results": asdict(self.test_results),
|
|
"transitions": [asdict(t) for t in self.transition_history],
|
|
"timer_progress": [asdict(t) for t in self.timer_progress_history]
|
|
}
|
|
|
|
# Save to file
|
|
report_file = Path("reports") / f"dts_test_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
|
|
with open(report_file, 'w') as f:
|
|
json.dump(report_data, f, indent=2)
|
|
|
|
self.logger.info(f"📄 JSON report saved: {report_file}")
|
|
|
|
def _generate_csv_report(self):
|
|
"""Generate CSV report for timer data"""
|
|
if not self.timer_progress_history:
|
|
return
|
|
|
|
csv_file = Path("data") / f"dts_timer_data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.csv"
|
|
|
|
with open(csv_file, 'w', newline='') as f:
|
|
writer = csv.writer(f)
|
|
writer.writerow([
|
|
"timestamp", "mode", "timer_address", "timer_value",
|
|
"progress_percent", "countdown_rate", "expected_duration"
|
|
])
|
|
|
|
for progress in self.timer_progress_history:
|
|
writer.writerow([
|
|
progress.timestamp, progress.mode, progress.timer_address,
|
|
progress.timer_value, progress.progress_percent,
|
|
progress.countdown_rate, progress.expected_duration
|
|
])
|
|
|
|
self.logger.info(f"📊 CSV data saved: {csv_file}")
|
|
|
|
def _display_test_summary(self):
|
|
"""Display final test summary"""
|
|
results = self.test_results
|
|
|
|
self.logger.info("\n" + "="*60)
|
|
self.logger.info("📊 DTS API Test Summary")
|
|
self.logger.info("="*60)
|
|
|
|
# Test outcome
|
|
status_icon = "✅" if results.success else "❌"
|
|
status_text = "SUCCESS" if results.success else "FAILED"
|
|
self.logger.info(f"Status: {status_icon} {status_text}")
|
|
|
|
# Timing information
|
|
self.logger.info(f"Total Duration: {self._format_duration(results.total_duration_seconds)}")
|
|
self.logger.info(f"Screens Completed: {results.screens_completed}")
|
|
self.logger.info(f"Transitions Detected: {results.transitions_detected}")
|
|
|
|
# Error information
|
|
if results.api_errors > 0:
|
|
self.logger.warning(f"API Errors: {results.api_errors}")
|
|
if results.timer_issues > 0:
|
|
self.logger.warning(f"Timer Issues: {results.timer_issues}")
|
|
|
|
# Error messages
|
|
if results.error_messages:
|
|
self.logger.error("Error Messages:")
|
|
for error in results.error_messages:
|
|
self.logger.error(f" - {error}")
|
|
|
|
# Transition summary
|
|
if self.transition_history:
|
|
self.logger.info("\nTransition Summary:")
|
|
for i, transition in enumerate(self.transition_history, 1):
|
|
from_name = self._get_screen_name(transition.from_mode)
|
|
to_name = self._get_screen_name(transition.to_mode)
|
|
duration_str = self._format_duration(transition.duration_seconds)
|
|
self.logger.info(f" {i}. {from_name} → {to_name} ({duration_str})")
|
|
|
|
self.logger.info("="*60)
|
|
|
|
def run_test(self) -> bool:
|
|
"""Run the complete DTS API test"""
|
|
self.logger.info("🚀 DTS API Test Suite v1.0")
|
|
self.logger.info(f"📡 API Endpoint: {self.api_base_url}")
|
|
self.logger.info(f"⏰ Started: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
|
|
|
self.start_time = time.time()
|
|
self.test_results.start_time = datetime.now().isoformat()
|
|
|
|
try:
|
|
# Step 1: Check system status
|
|
if not self.check_system_status():
|
|
return False
|
|
|
|
# Step 2: Start DTS sequence
|
|
if not self.start_dts_sequence():
|
|
return False
|
|
|
|
# Step 3: Monitor progress
|
|
success = self.monitor_dts_progress()
|
|
|
|
# Step 4: Generate reports
|
|
self.generate_reports()
|
|
|
|
return success
|
|
|
|
except Exception as e:
|
|
self.logger.error(f"❌ Test execution failed: {e}")
|
|
self.test_results.error_messages.append(f"Test execution failed: {str(e)}")
|
|
return False
|
|
|
|
def main():
|
|
"""Main entry point"""
|
|
parser = argparse.ArgumentParser(description="DTS API Test Suite")
|
|
parser.add_argument("--api-url", default="http://localhost:5000/api",
|
|
help="API base URL")
|
|
parser.add_argument("--verbose", action="store_true",
|
|
help="Enable verbose output")
|
|
parser.add_argument("--export-csv", action="store_true",
|
|
help="Export timer data to CSV")
|
|
parser.add_argument("--config", help="Configuration file path")
|
|
parser.add_argument("--polling-interval", type=float, default=1.0,
|
|
help="Polling interval in seconds")
|
|
|
|
args = parser.parse_args()
|
|
|
|
# Create configuration
|
|
config = TestConfig()
|
|
if args.verbose:
|
|
config.CONSOLE_VERBOSITY = "DEBUG"
|
|
if args.export_csv:
|
|
config.CSV_EXPORT_ENABLED = True
|
|
if args.polling_interval:
|
|
config.POLLING_INTERVAL = args.polling_interval
|
|
|
|
# Load custom config if provided
|
|
if args.config and os.path.exists(args.config):
|
|
with open(args.config, 'r') as f:
|
|
custom_config = json.load(f)
|
|
for key, value in custom_config.items():
|
|
if hasattr(config, key):
|
|
setattr(config, key, value)
|
|
|
|
# Create and run tester
|
|
tester = DTSAPITester(api_base_url=args.api_url, config=config)
|
|
success = tester.run_test()
|
|
|
|
# Exit with appropriate code
|
|
sys.exit(0 if success else 1)
|
|
|
|
if __name__ == "__main__":
|
|
main() |