496 lines
20 KiB
Python
496 lines
20 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Static analysis script to identify unused code in the watermaker PLC API.
|
|
This script analyzes the main application package to find unused functions,
|
|
methods, imports, variables, and classes.
|
|
"""
|
|
|
|
import ast
|
|
import os
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Dict, List, Set, Tuple, Any
|
|
from collections import defaultdict
|
|
import json
|
|
|
|
class CodeAnalyzer(ast.NodeVisitor):
|
|
"""AST visitor to analyze Python code for unused elements."""
|
|
|
|
def __init__(self, file_path: str):
|
|
self.file_path = file_path
|
|
self.imports = {} # name -> (module, alias)
|
|
self.from_imports = {} # name -> (module, original_name)
|
|
self.function_defs = set() # function names defined
|
|
self.class_defs = set() # class names defined
|
|
self.method_defs = {} # class_name -> set of method names
|
|
self.variable_assignments = set() # variable names assigned
|
|
self.function_calls = set() # function names called
|
|
self.attribute_accesses = set() # attribute names accessed
|
|
self.name_references = set() # all name references
|
|
self.decorators = set() # decorator names
|
|
self.string_literals = set() # string literals (for dynamic calls)
|
|
|
|
def visit_Import(self, node):
|
|
"""Track import statements."""
|
|
for alias in node.names:
|
|
name = alias.asname if alias.asname else alias.name
|
|
self.imports[name] = (alias.name, alias.asname)
|
|
self.generic_visit(node)
|
|
|
|
def visit_ImportFrom(self, node):
|
|
"""Track from-import statements."""
|
|
module = node.module or ''
|
|
for alias in node.names:
|
|
name = alias.asname if alias.asname else alias.name
|
|
self.from_imports[name] = (module, alias.name)
|
|
self.generic_visit(node)
|
|
|
|
def visit_FunctionDef(self, node):
|
|
"""Track function definitions."""
|
|
self.function_defs.add(node.name)
|
|
# Track decorators
|
|
for decorator in node.decorator_list:
|
|
if isinstance(decorator, ast.Name):
|
|
self.decorators.add(decorator.id)
|
|
elif isinstance(decorator, ast.Attribute):
|
|
self.decorators.add(decorator.attr)
|
|
self.generic_visit(node)
|
|
|
|
def visit_AsyncFunctionDef(self, node):
|
|
"""Track async function definitions."""
|
|
self.function_defs.add(node.name)
|
|
# Track decorators
|
|
for decorator in node.decorator_list:
|
|
if isinstance(decorator, ast.Name):
|
|
self.decorators.add(decorator.id)
|
|
elif isinstance(decorator, ast.Attribute):
|
|
self.decorators.add(decorator.attr)
|
|
self.generic_visit(node)
|
|
|
|
def visit_ClassDef(self, node):
|
|
"""Track class definitions and their methods."""
|
|
self.class_defs.add(node.name)
|
|
self.method_defs[node.name] = set()
|
|
|
|
# Track decorators
|
|
for decorator in node.decorator_list:
|
|
if isinstance(decorator, ast.Name):
|
|
self.decorators.add(decorator.id)
|
|
elif isinstance(decorator, ast.Attribute):
|
|
self.decorators.add(decorator.attr)
|
|
|
|
# Track methods in this class
|
|
for item in node.body:
|
|
if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)):
|
|
self.method_defs[node.name].add(item.name)
|
|
|
|
self.generic_visit(node)
|
|
|
|
def visit_Assign(self, node):
|
|
"""Track variable assignments."""
|
|
for target in node.targets:
|
|
if isinstance(target, ast.Name):
|
|
self.variable_assignments.add(target.id)
|
|
elif isinstance(target, ast.Tuple) or isinstance(target, ast.List):
|
|
for elt in target.elts:
|
|
if isinstance(elt, ast.Name):
|
|
self.variable_assignments.add(elt.id)
|
|
self.generic_visit(node)
|
|
|
|
def visit_Call(self, node):
|
|
"""Track function calls."""
|
|
if isinstance(node.func, ast.Name):
|
|
self.function_calls.add(node.func.id)
|
|
elif isinstance(node.func, ast.Attribute):
|
|
self.function_calls.add(node.func.attr)
|
|
if isinstance(node.func.value, ast.Name):
|
|
self.name_references.add(node.func.value.id)
|
|
self.generic_visit(node)
|
|
|
|
def visit_Attribute(self, node):
|
|
"""Track attribute accesses."""
|
|
self.attribute_accesses.add(node.attr)
|
|
if isinstance(node.value, ast.Name):
|
|
self.name_references.add(node.value.id)
|
|
self.generic_visit(node)
|
|
|
|
def visit_Name(self, node):
|
|
"""Track name references."""
|
|
if isinstance(node.ctx, ast.Load):
|
|
self.name_references.add(node.id)
|
|
self.generic_visit(node)
|
|
|
|
def visit_Str(self, node):
|
|
"""Track string literals for potential dynamic calls."""
|
|
self.string_literals.add(node.s)
|
|
self.generic_visit(node)
|
|
|
|
def visit_Constant(self, node):
|
|
"""Track constant values including strings."""
|
|
if isinstance(node.value, str):
|
|
self.string_literals.add(node.value)
|
|
self.generic_visit(node)
|
|
|
|
|
|
class UnusedCodeDetector:
|
|
"""Main class for detecting unused code in the project."""
|
|
|
|
def __init__(self, project_root: str):
|
|
self.project_root = Path(project_root)
|
|
self.package_root = self.project_root / "watermaker_plc_api"
|
|
self.analyzers = {} # file_path -> CodeAnalyzer
|
|
self.all_functions = set()
|
|
self.all_classes = set()
|
|
self.all_methods = {} # class_name -> set of methods
|
|
self.all_imports = {} # file_path -> imports
|
|
self.all_variables = {} # file_path -> variables
|
|
self.used_names = set()
|
|
self.flask_routes = set()
|
|
self.entry_points = set()
|
|
|
|
def analyze_file(self, file_path: Path) -> CodeAnalyzer:
|
|
"""Analyze a single Python file."""
|
|
try:
|
|
with open(file_path, 'r', encoding='utf-8') as f:
|
|
content = f.read()
|
|
|
|
tree = ast.parse(content, filename=str(file_path))
|
|
analyzer = CodeAnalyzer(str(file_path))
|
|
analyzer.visit(tree)
|
|
return analyzer
|
|
except Exception as e:
|
|
print(f"Error analyzing {file_path}: {e}")
|
|
return None
|
|
|
|
def find_python_files(self) -> List[Path]:
|
|
"""Find all Python files in the main package."""
|
|
python_files = []
|
|
for root, dirs, files in os.walk(self.package_root):
|
|
# Skip __pycache__ directories
|
|
dirs[:] = [d for d in dirs if d != '__pycache__']
|
|
|
|
for file in files:
|
|
if file.endswith('.py'):
|
|
python_files.append(Path(root) / file)
|
|
|
|
return python_files
|
|
|
|
def identify_entry_points(self):
|
|
"""Identify entry points that should never be removed."""
|
|
entry_points = {
|
|
'main', # from __main__.py
|
|
'create_app', # from app.py
|
|
'parse_args', # from __main__.py
|
|
}
|
|
|
|
# Flask route handlers are entry points
|
|
for analyzer in self.analyzers.values():
|
|
for decorator in analyzer.decorators:
|
|
if 'route' in decorator:
|
|
# Find functions with route decorators
|
|
# This is a simplified approach
|
|
pass
|
|
|
|
return entry_points
|
|
|
|
def identify_flask_routes(self):
|
|
"""Identify Flask route handler functions."""
|
|
routes = set()
|
|
|
|
for file_path, analyzer in self.analyzers.items():
|
|
if 'controller' in file_path:
|
|
# Look for functions with @bp.route decorators
|
|
try:
|
|
with open(file_path, 'r') as f:
|
|
lines = f.readlines()
|
|
|
|
for i, line in enumerate(lines):
|
|
if '@' in line and '.route(' in line:
|
|
# Next non-empty line should be function definition
|
|
for j in range(i + 1, min(i + 5, len(lines))):
|
|
next_line = lines[j].strip()
|
|
if next_line.startswith('def '):
|
|
func_name = next_line.split('(')[0].replace('def ', '')
|
|
routes.add(func_name)
|
|
break
|
|
except Exception as e:
|
|
print(f"Error finding routes in {file_path}: {e}")
|
|
|
|
return routes
|
|
|
|
def build_usage_graph(self):
|
|
"""Build a graph of what functions/classes are used where."""
|
|
used_names = set()
|
|
|
|
# Start with entry points
|
|
used_names.update(self.entry_points)
|
|
used_names.update(self.flask_routes)
|
|
|
|
# Add commonly used patterns
|
|
always_used = {
|
|
'__init__', '__str__', '__repr__', '__call__',
|
|
'get_logger', 'setup_error_handlers', 'create_app',
|
|
'get_plc_connection', 'get_data_cache', 'get_operation_state_manager',
|
|
'start_background_updates', 'get_task_manager'
|
|
}
|
|
used_names.update(always_used)
|
|
|
|
# Iteratively find used functions
|
|
changed = True
|
|
iterations = 0
|
|
max_iterations = 10
|
|
|
|
while changed and iterations < max_iterations:
|
|
changed = False
|
|
iterations += 1
|
|
old_size = len(used_names)
|
|
|
|
for analyzer in self.analyzers.values():
|
|
# If any function in this file is used, mark its dependencies
|
|
file_has_used_function = bool(
|
|
analyzer.function_defs.intersection(used_names) or
|
|
analyzer.class_defs.intersection(used_names)
|
|
)
|
|
|
|
if file_has_used_function:
|
|
used_names.update(analyzer.function_calls)
|
|
used_names.update(analyzer.name_references)
|
|
used_names.update(analyzer.attribute_accesses)
|
|
|
|
# Mark imported names as potentially used
|
|
used_names.update(analyzer.imports.keys())
|
|
used_names.update(analyzer.from_imports.keys())
|
|
|
|
if len(used_names) > old_size:
|
|
changed = True
|
|
|
|
return used_names
|
|
|
|
def find_unused_imports(self) -> Dict[str, List[str]]:
|
|
"""Find unused import statements."""
|
|
unused_imports = {}
|
|
|
|
for file_path, analyzer in self.analyzers.items():
|
|
unused_in_file = []
|
|
|
|
# Check regular imports
|
|
for name, (module, alias) in analyzer.imports.items():
|
|
if name not in analyzer.name_references and name not in self.used_names:
|
|
# Check if it's used in string literals (dynamic imports)
|
|
used_in_strings = any(name in s for s in analyzer.string_literals)
|
|
if not used_in_strings:
|
|
unused_in_file.append(f"import {module}" + (f" as {alias}" if alias else ""))
|
|
|
|
# Check from imports
|
|
for name, (module, original) in analyzer.from_imports.items():
|
|
if name not in analyzer.name_references and name not in self.used_names:
|
|
# Check if it's used in string literals
|
|
used_in_strings = any(name in s for s in analyzer.string_literals)
|
|
if not used_in_strings:
|
|
import_stmt = f"from {module} import {original}"
|
|
if name != original:
|
|
import_stmt += f" as {name}"
|
|
unused_in_file.append(import_stmt)
|
|
|
|
if unused_in_file:
|
|
unused_imports[file_path] = unused_in_file
|
|
|
|
return unused_imports
|
|
|
|
def find_unused_functions(self) -> Dict[str, List[str]]:
|
|
"""Find unused function definitions."""
|
|
unused_functions = {}
|
|
|
|
for file_path, analyzer in self.analyzers.items():
|
|
unused_in_file = []
|
|
|
|
for func_name in analyzer.function_defs:
|
|
# Skip special methods and entry points
|
|
if (func_name.startswith('__') or
|
|
func_name in self.used_names or
|
|
func_name in self.flask_routes or
|
|
func_name in self.entry_points):
|
|
continue
|
|
|
|
# Check if function is called anywhere
|
|
is_used = False
|
|
for other_analyzer in self.analyzers.values():
|
|
if (func_name in other_analyzer.function_calls or
|
|
func_name in other_analyzer.name_references or
|
|
any(func_name in s for s in other_analyzer.string_literals)):
|
|
is_used = True
|
|
break
|
|
|
|
if not is_used:
|
|
unused_in_file.append(func_name)
|
|
|
|
if unused_in_file:
|
|
unused_functions[file_path] = unused_in_file
|
|
|
|
return unused_functions
|
|
|
|
def find_unused_variables(self) -> Dict[str, List[str]]:
|
|
"""Find unused variable assignments."""
|
|
unused_variables = {}
|
|
|
|
for file_path, analyzer in self.analyzers.items():
|
|
unused_in_file = []
|
|
|
|
for var_name in analyzer.variable_assignments:
|
|
# Skip special variables and constants
|
|
if (var_name.startswith('_') or
|
|
var_name.isupper() or # Constants
|
|
var_name in self.used_names):
|
|
continue
|
|
|
|
# Check if variable is referenced
|
|
if (var_name not in analyzer.name_references and
|
|
not any(var_name in s for s in analyzer.string_literals)):
|
|
unused_in_file.append(var_name)
|
|
|
|
if unused_in_file:
|
|
unused_variables[file_path] = unused_in_file
|
|
|
|
return unused_variables
|
|
|
|
def analyze_project(self):
|
|
"""Run complete analysis of the project."""
|
|
print("🔍 Analyzing watermaker PLC API for unused code...")
|
|
|
|
# Find and analyze all Python files
|
|
python_files = self.find_python_files()
|
|
print(f"Found {len(python_files)} Python files to analyze")
|
|
|
|
for file_path in python_files:
|
|
analyzer = self.analyze_file(file_path)
|
|
if analyzer:
|
|
self.analyzers[str(file_path)] = analyzer
|
|
|
|
print(f"Successfully analyzed {len(self.analyzers)} files")
|
|
|
|
# Identify entry points and Flask routes
|
|
self.entry_points = self.identify_entry_points()
|
|
self.flask_routes = self.identify_flask_routes()
|
|
|
|
print(f"Identified {len(self.flask_routes)} Flask route handlers")
|
|
|
|
# Build usage graph
|
|
self.used_names = self.build_usage_graph()
|
|
print(f"Identified {len(self.used_names)} used names")
|
|
|
|
# Find unused code
|
|
unused_imports = self.find_unused_imports()
|
|
unused_functions = self.find_unused_functions()
|
|
unused_variables = self.find_unused_variables()
|
|
|
|
return {
|
|
'unused_imports': unused_imports,
|
|
'unused_functions': unused_functions,
|
|
'unused_variables': unused_variables,
|
|
'flask_routes': list(self.flask_routes),
|
|
'entry_points': list(self.entry_points),
|
|
'used_names_count': len(self.used_names),
|
|
'total_files_analyzed': len(self.analyzers)
|
|
}
|
|
|
|
def generate_report(self, results: Dict) -> str:
|
|
"""Generate a detailed report of unused code."""
|
|
report = []
|
|
report.append("# Unused Code Analysis Report")
|
|
report.append(f"Generated: {os.popen('date').read().strip()}")
|
|
report.append("")
|
|
|
|
# Summary
|
|
total_unused_imports = sum(len(imports) for imports in results['unused_imports'].values())
|
|
total_unused_functions = sum(len(funcs) for funcs in results['unused_functions'].values())
|
|
total_unused_variables = sum(len(vars) for vars in results['unused_variables'].values())
|
|
|
|
report.append("## Summary")
|
|
report.append(f"- Files analyzed: {results['total_files_analyzed']}")
|
|
report.append(f"- Flask routes found: {len(results['flask_routes'])}")
|
|
report.append(f"- Used names identified: {results['used_names_count']}")
|
|
report.append(f"- Unused imports: {total_unused_imports}")
|
|
report.append(f"- Unused functions: {total_unused_functions}")
|
|
report.append(f"- Unused variables: {total_unused_variables}")
|
|
report.append("")
|
|
|
|
# Unused imports
|
|
if results['unused_imports']:
|
|
report.append("## Unused Imports")
|
|
for file_path, imports in results['unused_imports'].items():
|
|
rel_path = os.path.relpath(file_path, self.project_root)
|
|
report.append(f"### {rel_path}")
|
|
for imp in imports:
|
|
report.append(f"- `{imp}`")
|
|
report.append("")
|
|
|
|
# Unused functions
|
|
if results['unused_functions']:
|
|
report.append("## Unused Functions")
|
|
for file_path, functions in results['unused_functions'].items():
|
|
rel_path = os.path.relpath(file_path, self.project_root)
|
|
report.append(f"### {rel_path}")
|
|
for func in functions:
|
|
report.append(f"- `{func}()`")
|
|
report.append("")
|
|
|
|
# Unused variables
|
|
if results['unused_variables']:
|
|
report.append("## Unused Variables")
|
|
for file_path, variables in results['unused_variables'].items():
|
|
rel_path = os.path.relpath(file_path, self.project_root)
|
|
report.append(f"### {rel_path}")
|
|
for var in variables:
|
|
report.append(f"- `{var}`")
|
|
report.append("")
|
|
|
|
# Flask routes (for reference)
|
|
if results['flask_routes']:
|
|
report.append("## Flask Routes (Preserved)")
|
|
for route in sorted(results['flask_routes']):
|
|
report.append(f"- `{route}()`")
|
|
report.append("")
|
|
|
|
return "\n".join(report)
|
|
|
|
|
|
def main():
|
|
"""Main function to run the unused code analysis."""
|
|
project_root = os.getcwd()
|
|
|
|
print("🚀 Starting unused code analysis for Watermaker PLC API")
|
|
print(f"Project root: {project_root}")
|
|
|
|
detector = UnusedCodeDetector(project_root)
|
|
results = detector.analyze_project()
|
|
|
|
# Generate report
|
|
report = detector.generate_report(results)
|
|
|
|
# Save report to file
|
|
report_file = "unused_code_analysis_report.md"
|
|
with open(report_file, 'w') as f:
|
|
f.write(report)
|
|
|
|
print(f"📊 Analysis complete! Report saved to: {report_file}")
|
|
|
|
# Save detailed results as JSON
|
|
json_file = "unused_code_analysis_results.json"
|
|
with open(json_file, 'w') as f:
|
|
json.dump(results, f, indent=2, default=str)
|
|
|
|
print(f"📋 Detailed results saved to: {json_file}")
|
|
|
|
# Print summary
|
|
total_unused = (
|
|
sum(len(imports) for imports in results['unused_imports'].values()) +
|
|
sum(len(funcs) for funcs in results['unused_functions'].values()) +
|
|
sum(len(vars) for vars in results['unused_variables'].values())
|
|
)
|
|
|
|
print(f"\n✨ Found {total_unused} potentially unused code elements")
|
|
print("Review the report before proceeding with removal!")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |