#!/usr/bin/env python3 """ Static analysis script to identify unused code in the watermaker PLC API. This script analyzes the main application package to find unused functions, methods, imports, variables, and classes. """ import ast import os import sys from pathlib import Path from typing import Dict, List, Set, Tuple, Any from collections import defaultdict import json class CodeAnalyzer(ast.NodeVisitor): """AST visitor to analyze Python code for unused elements.""" def __init__(self, file_path: str): self.file_path = file_path self.imports = {} # name -> (module, alias) self.from_imports = {} # name -> (module, original_name) self.function_defs = set() # function names defined self.class_defs = set() # class names defined self.method_defs = {} # class_name -> set of method names self.variable_assignments = set() # variable names assigned self.function_calls = set() # function names called self.attribute_accesses = set() # attribute names accessed self.name_references = set() # all name references self.decorators = set() # decorator names self.string_literals = set() # string literals (for dynamic calls) def visit_Import(self, node): """Track import statements.""" for alias in node.names: name = alias.asname if alias.asname else alias.name self.imports[name] = (alias.name, alias.asname) self.generic_visit(node) def visit_ImportFrom(self, node): """Track from-import statements.""" module = node.module or '' for alias in node.names: name = alias.asname if alias.asname else alias.name self.from_imports[name] = (module, alias.name) self.generic_visit(node) def visit_FunctionDef(self, node): """Track function definitions.""" self.function_defs.add(node.name) # Track decorators for decorator in node.decorator_list: if isinstance(decorator, ast.Name): self.decorators.add(decorator.id) elif isinstance(decorator, ast.Attribute): self.decorators.add(decorator.attr) self.generic_visit(node) def visit_AsyncFunctionDef(self, node): """Track async function definitions.""" self.function_defs.add(node.name) # Track decorators for decorator in node.decorator_list: if isinstance(decorator, ast.Name): self.decorators.add(decorator.id) elif isinstance(decorator, ast.Attribute): self.decorators.add(decorator.attr) self.generic_visit(node) def visit_ClassDef(self, node): """Track class definitions and their methods.""" self.class_defs.add(node.name) self.method_defs[node.name] = set() # Track decorators for decorator in node.decorator_list: if isinstance(decorator, ast.Name): self.decorators.add(decorator.id) elif isinstance(decorator, ast.Attribute): self.decorators.add(decorator.attr) # Track methods in this class for item in node.body: if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)): self.method_defs[node.name].add(item.name) self.generic_visit(node) def visit_Assign(self, node): """Track variable assignments.""" for target in node.targets: if isinstance(target, ast.Name): self.variable_assignments.add(target.id) elif isinstance(target, ast.Tuple) or isinstance(target, ast.List): for elt in target.elts: if isinstance(elt, ast.Name): self.variable_assignments.add(elt.id) self.generic_visit(node) def visit_Call(self, node): """Track function calls.""" if isinstance(node.func, ast.Name): self.function_calls.add(node.func.id) elif isinstance(node.func, ast.Attribute): self.function_calls.add(node.func.attr) if isinstance(node.func.value, ast.Name): self.name_references.add(node.func.value.id) self.generic_visit(node) def visit_Attribute(self, node): """Track attribute accesses.""" self.attribute_accesses.add(node.attr) if isinstance(node.value, ast.Name): self.name_references.add(node.value.id) self.generic_visit(node) def visit_Name(self, node): """Track name references.""" if isinstance(node.ctx, ast.Load): self.name_references.add(node.id) self.generic_visit(node) def visit_Str(self, node): """Track string literals for potential dynamic calls.""" self.string_literals.add(node.s) self.generic_visit(node) def visit_Constant(self, node): """Track constant values including strings.""" if isinstance(node.value, str): self.string_literals.add(node.value) self.generic_visit(node) class UnusedCodeDetector: """Main class for detecting unused code in the project.""" def __init__(self, project_root: str): self.project_root = Path(project_root) self.package_root = self.project_root / "watermaker_plc_api" self.analyzers = {} # file_path -> CodeAnalyzer self.all_functions = set() self.all_classes = set() self.all_methods = {} # class_name -> set of methods self.all_imports = {} # file_path -> imports self.all_variables = {} # file_path -> variables self.used_names = set() self.flask_routes = set() self.entry_points = set() def analyze_file(self, file_path: Path) -> CodeAnalyzer: """Analyze a single Python file.""" try: with open(file_path, 'r', encoding='utf-8') as f: content = f.read() tree = ast.parse(content, filename=str(file_path)) analyzer = CodeAnalyzer(str(file_path)) analyzer.visit(tree) return analyzer except Exception as e: print(f"Error analyzing {file_path}: {e}") return None def find_python_files(self) -> List[Path]: """Find all Python files in the main package.""" python_files = [] for root, dirs, files in os.walk(self.package_root): # Skip __pycache__ directories dirs[:] = [d for d in dirs if d != '__pycache__'] for file in files: if file.endswith('.py'): python_files.append(Path(root) / file) return python_files def identify_entry_points(self): """Identify entry points that should never be removed.""" entry_points = { 'main', # from __main__.py 'create_app', # from app.py 'parse_args', # from __main__.py } # Flask route handlers are entry points for analyzer in self.analyzers.values(): for decorator in analyzer.decorators: if 'route' in decorator: # Find functions with route decorators # This is a simplified approach pass return entry_points def identify_flask_routes(self): """Identify Flask route handler functions.""" routes = set() for file_path, analyzer in self.analyzers.items(): if 'controller' in file_path: # Look for functions with @bp.route decorators try: with open(file_path, 'r') as f: lines = f.readlines() for i, line in enumerate(lines): if '@' in line and '.route(' in line: # Next non-empty line should be function definition for j in range(i + 1, min(i + 5, len(lines))): next_line = lines[j].strip() if next_line.startswith('def '): func_name = next_line.split('(')[0].replace('def ', '') routes.add(func_name) break except Exception as e: print(f"Error finding routes in {file_path}: {e}") return routes def build_usage_graph(self): """Build a graph of what functions/classes are used where.""" used_names = set() # Start with entry points used_names.update(self.entry_points) used_names.update(self.flask_routes) # Add commonly used patterns always_used = { '__init__', '__str__', '__repr__', '__call__', 'get_logger', 'setup_error_handlers', 'create_app', 'get_plc_connection', 'get_data_cache', 'get_operation_state_manager', 'start_background_updates', 'get_task_manager' } used_names.update(always_used) # Iteratively find used functions changed = True iterations = 0 max_iterations = 10 while changed and iterations < max_iterations: changed = False iterations += 1 old_size = len(used_names) for analyzer in self.analyzers.values(): # If any function in this file is used, mark its dependencies file_has_used_function = bool( analyzer.function_defs.intersection(used_names) or analyzer.class_defs.intersection(used_names) ) if file_has_used_function: used_names.update(analyzer.function_calls) used_names.update(analyzer.name_references) used_names.update(analyzer.attribute_accesses) # Mark imported names as potentially used used_names.update(analyzer.imports.keys()) used_names.update(analyzer.from_imports.keys()) if len(used_names) > old_size: changed = True return used_names def find_unused_imports(self) -> Dict[str, List[str]]: """Find unused import statements.""" unused_imports = {} for file_path, analyzer in self.analyzers.items(): unused_in_file = [] # Check regular imports for name, (module, alias) in analyzer.imports.items(): if name not in analyzer.name_references and name not in self.used_names: # Check if it's used in string literals (dynamic imports) used_in_strings = any(name in s for s in analyzer.string_literals) if not used_in_strings: unused_in_file.append(f"import {module}" + (f" as {alias}" if alias else "")) # Check from imports for name, (module, original) in analyzer.from_imports.items(): if name not in analyzer.name_references and name not in self.used_names: # Check if it's used in string literals used_in_strings = any(name in s for s in analyzer.string_literals) if not used_in_strings: import_stmt = f"from {module} import {original}" if name != original: import_stmt += f" as {name}" unused_in_file.append(import_stmt) if unused_in_file: unused_imports[file_path] = unused_in_file return unused_imports def find_unused_functions(self) -> Dict[str, List[str]]: """Find unused function definitions.""" unused_functions = {} for file_path, analyzer in self.analyzers.items(): unused_in_file = [] for func_name in analyzer.function_defs: # Skip special methods and entry points if (func_name.startswith('__') or func_name in self.used_names or func_name in self.flask_routes or func_name in self.entry_points): continue # Check if function is called anywhere is_used = False for other_analyzer in self.analyzers.values(): if (func_name in other_analyzer.function_calls or func_name in other_analyzer.name_references or any(func_name in s for s in other_analyzer.string_literals)): is_used = True break if not is_used: unused_in_file.append(func_name) if unused_in_file: unused_functions[file_path] = unused_in_file return unused_functions def find_unused_variables(self) -> Dict[str, List[str]]: """Find unused variable assignments.""" unused_variables = {} for file_path, analyzer in self.analyzers.items(): unused_in_file = [] for var_name in analyzer.variable_assignments: # Skip special variables and constants if (var_name.startswith('_') or var_name.isupper() or # Constants var_name in self.used_names): continue # Check if variable is referenced if (var_name not in analyzer.name_references and not any(var_name in s for s in analyzer.string_literals)): unused_in_file.append(var_name) if unused_in_file: unused_variables[file_path] = unused_in_file return unused_variables def analyze_project(self): """Run complete analysis of the project.""" print("šŸ” Analyzing watermaker PLC API for unused code...") # Find and analyze all Python files python_files = self.find_python_files() print(f"Found {len(python_files)} Python files to analyze") for file_path in python_files: analyzer = self.analyze_file(file_path) if analyzer: self.analyzers[str(file_path)] = analyzer print(f"Successfully analyzed {len(self.analyzers)} files") # Identify entry points and Flask routes self.entry_points = self.identify_entry_points() self.flask_routes = self.identify_flask_routes() print(f"Identified {len(self.flask_routes)} Flask route handlers") # Build usage graph self.used_names = self.build_usage_graph() print(f"Identified {len(self.used_names)} used names") # Find unused code unused_imports = self.find_unused_imports() unused_functions = self.find_unused_functions() unused_variables = self.find_unused_variables() return { 'unused_imports': unused_imports, 'unused_functions': unused_functions, 'unused_variables': unused_variables, 'flask_routes': list(self.flask_routes), 'entry_points': list(self.entry_points), 'used_names_count': len(self.used_names), 'total_files_analyzed': len(self.analyzers) } def generate_report(self, results: Dict) -> str: """Generate a detailed report of unused code.""" report = [] report.append("# Unused Code Analysis Report") report.append(f"Generated: {os.popen('date').read().strip()}") report.append("") # Summary total_unused_imports = sum(len(imports) for imports in results['unused_imports'].values()) total_unused_functions = sum(len(funcs) for funcs in results['unused_functions'].values()) total_unused_variables = sum(len(vars) for vars in results['unused_variables'].values()) report.append("## Summary") report.append(f"- Files analyzed: {results['total_files_analyzed']}") report.append(f"- Flask routes found: {len(results['flask_routes'])}") report.append(f"- Used names identified: {results['used_names_count']}") report.append(f"- Unused imports: {total_unused_imports}") report.append(f"- Unused functions: {total_unused_functions}") report.append(f"- Unused variables: {total_unused_variables}") report.append("") # Unused imports if results['unused_imports']: report.append("## Unused Imports") for file_path, imports in results['unused_imports'].items(): rel_path = os.path.relpath(file_path, self.project_root) report.append(f"### {rel_path}") for imp in imports: report.append(f"- `{imp}`") report.append("") # Unused functions if results['unused_functions']: report.append("## Unused Functions") for file_path, functions in results['unused_functions'].items(): rel_path = os.path.relpath(file_path, self.project_root) report.append(f"### {rel_path}") for func in functions: report.append(f"- `{func}()`") report.append("") # Unused variables if results['unused_variables']: report.append("## Unused Variables") for file_path, variables in results['unused_variables'].items(): rel_path = os.path.relpath(file_path, self.project_root) report.append(f"### {rel_path}") for var in variables: report.append(f"- `{var}`") report.append("") # Flask routes (for reference) if results['flask_routes']: report.append("## Flask Routes (Preserved)") for route in sorted(results['flask_routes']): report.append(f"- `{route}()`") report.append("") return "\n".join(report) def main(): """Main function to run the unused code analysis.""" project_root = os.getcwd() print("šŸš€ Starting unused code analysis for Watermaker PLC API") print(f"Project root: {project_root}") detector = UnusedCodeDetector(project_root) results = detector.analyze_project() # Generate report report = detector.generate_report(results) # Save report to file report_file = "unused_code_analysis_report.md" with open(report_file, 'w') as f: f.write(report) print(f"šŸ“Š Analysis complete! Report saved to: {report_file}") # Save detailed results as JSON json_file = "unused_code_analysis_results.json" with open(json_file, 'w') as f: json.dump(results, f, indent=2, default=str) print(f"šŸ“‹ Detailed results saved to: {json_file}") # Print summary total_unused = ( sum(len(imports) for imports in results['unused_imports'].values()) + sum(len(funcs) for funcs in results['unused_functions'].values()) + sum(len(vars) for vars in results['unused_variables'].values()) ) print(f"\n✨ Found {total_unused} potentially unused code elements") print("Review the report before proceeding with removal!") if __name__ == "__main__": main()