Coverage for core / platform / evolution_engine.py: 87.9%
132 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Evolution Engine — HART knows when code is being written for it.
4Self-aware static analysis that detects good patterns, anti-patterns,
5and core-subsystem changes. When code touches HART's own internals,
6the engine raises awareness, suggests improvements, and emits events
7so the platform can respond (extra review, CI gates, upgrade guards).
9Follows the ManifestValidator / budget_gate.py pattern:
10 - Static methods, fail-closed, clear reasons
11 - Zero new dependencies (stdlib ast + re only)
13Usage:
14 from core.platform.evolution_engine import EvolutionEngine
16 analysis = EvolutionEngine.analyze_changes(['core/platform/registry.py'])
17 anti = EvolutionEngine.detect_anti_patterns(source_code)
18 good = EvolutionEngine.detect_good_patterns(source_code)
19 suggestions = EvolutionEngine.suggest_improvements(analysis)
20 EvolutionEngine.emit_suggestions(suggestions, analysis)
21"""
23import ast
24import logging
25import re
26from typing import Any, Dict, List
28logger = logging.getLogger('hevolve.platform')
# ─── Good Patterns (to detect and reward) ────────────────────────
# Each value is a regex source string. detect_good_patterns() compiles them
# with re.DOTALL and scans the whole source at once, so '.' may span lines.

GOOD_PATTERNS: Dict[str, str] = {
    'budget_gate_validation': r'(valid|ok|allowed),\s*(reason|errors?|msg)',
    'frozen_values': r'_FrozenValues|__slots__\s*=\s*\(\)',
    'singleton_pattern': r'_instance\s*=\s*None.*\ndef\s+get_',
    'event_emission': r'emit_event\s*\(',
    'context_manager_db': r'with\s+db_session\(\)',
    'manifest_validation': r'ManifestValidator\.validate\(',
    'sandbox_analysis': r'ExtensionSandbox\.analyze',
    'service_registry_usage': r'registry\.(register|get|has)\(',
}

# ─── Anti-Patterns (to detect and flag) ──────────────────────────
# Each value is a regex source string applied line by line, or None when the
# check is implemented with the AST instead of a regex.

ANTI_PATTERNS: Dict[str, Any] = {
    # (?!\d) pins the end of the number so the engine cannot backtrack from a
    # 5-digit port to its first 4 digits and slip past the comment exemption.
    'hardcoded_port': (
        r'(?:port|PORT)\s*=\s*\d{4,5}(?!\d)'
        r'(?!\s*#\s*(?:default|fallback))'),
    # First alternative catches the canonical `session.close()` form; the
    # second keeps every line the original pattern matched.
    'manual_db_close': (
        r'(?:session|db|conn)\w*\.close\(\)'
        r'|\.close\(\).*(?:session|db|conn)'),
    'bare_except': r'except\s*:',
    'eval_usage': r'(?<![a-zA-Z_])eval\s*\(',
    'exec_usage': r'(?<![a-zA-Z_])exec\s*\(',
    # [\w.]+ also covers dotted and relative modules (from a.b import *).
    'star_import': r'from\s+[\w.]+\s+import\s+\*',
    'hardcoded_key': r'(?:api_key|secret|password|token)\s*=\s*["\'][^"\']{8,}',
    'missing_type_hints': None,  # Special: detected via AST
}

# ─── Self-Awareness Thresholds ───────────────────────────────────
# Counts are compared with >=, so a value of 1 triggers on the first file.

SELF_AWARENESS_THRESHOLDS: Dict[str, Any] = {
    'core_platform_changes': 3,
    'security_changes': 1,
    'manifest_or_registry_changes': 2,
    'test_coverage_decrease': 0.05,
}

# ─── Core Subsystem Identification ───────────────────────────────
# Maps a repo-relative path prefix to a subsystem name. 'core/platform/' is
# listed before 'core/' so the more specific prefix is seen first.

CORE_SUBSYSTEMS: Dict[str, str] = {
    'core/platform/': 'platform_layer',
    'core/': 'core',
    'security/': 'security',
    'hart_sdk/': 'sdk',
    'integrations/agent_engine/': 'agent_engine',
    'integrations/social/': 'social',
    'integrations/channels/': 'channels',
    'integrations/remote_desktop/': 'remote_desktop',
}

# ─── Anti-pattern descriptions ───────────────────────────────────

_ANTI_PATTERN_DESCRIPTIONS: Dict[str, str] = {
    'hardcoded_port': 'Hardcoded port number — use core/port_registry.py get_port() instead',
    'manual_db_close': 'Manual DB close — use "with db_session()" context manager instead',
    'bare_except': 'Bare except — catch specific exceptions to avoid masking bugs',
    'eval_usage': 'eval() usage — dangerous; use ast.literal_eval() or explicit parsing',
    'exec_usage': 'exec() usage — dangerous; use safer alternatives',
    'star_import': 'Star import — use explicit imports for clarity and linting',
    'hardcoded_key': 'Hardcoded secret — use environment variables or config.json',
    'missing_type_hints': 'Function missing return type annotation',
}

# ─── Good pattern descriptions ───────────────────────────────────

_GOOD_PATTERN_DESCRIPTIONS: Dict[str, str] = {
    'budget_gate_validation': 'Uses (valid, reason) return pattern (budget_gate style)',
    'frozen_values': 'Uses _FrozenValues / __slots__ immutability pattern',
    'singleton_pattern': 'Uses _instance = None + get_*() singleton pattern',
    'event_emission': 'Uses emit_event() for decoupled communication',
    'context_manager_db': 'Uses db_session() context manager for safe DB access',
    'manifest_validation': 'Uses ManifestValidator.validate() for app integrity',
    'sandbox_analysis': 'Uses ExtensionSandbox.analyze for safe extension loading',
    'service_registry_usage': 'Uses ServiceRegistry for dependency management',
}
class EvolutionEngine:
    """Self-aware evolution engine — HART knows when code is being written for it.

    All methods are static — no instance state needed.
    """

    @staticmethod
    def _normalize_paths(changed_files: List[str]) -> List[str]:
        """Return the paths with backslashes replaced by forward slashes.

        A falsy argument (None or an empty list) yields an empty list.
        """
        return [path.replace('\\', '/') for path in (changed_files or [])]

    @staticmethod
    def _count_subsystems(normalized: List[str]) -> Dict[str, int]:
        """Count changed files per core subsystem.

        Keys appear in the order the subsystems are first encountered. Each
        file is attributed to at most one subsystem.
        """
        # Try longer prefixes first so 'core/platform/' always wins over
        # 'core/', independent of the declaration order in CORE_SUBSYSTEMS.
        # sorted() is stable, so equal-length prefixes keep their dict order.
        prefixes = sorted(
            CORE_SUBSYSTEMS.items(),
            key=lambda item: len(item[0]),
            reverse=True,
        )
        counts: Dict[str, int] = {}
        for filepath in normalized:
            for prefix, subsystem in prefixes:
                if filepath.startswith(prefix):
                    counts[subsystem] = counts.get(subsystem, 0) + 1
                    break
        return counts

    @staticmethod
    def _threshold_messages(normalized: List[str],
                            counts: Dict[str, int]) -> List[str]:
        """Return one message per self-awareness threshold that is met.

        An empty list means no threshold was reached.
        """
        messages: List[str] = []

        platform_count = counts.get('platform_layer', 0)
        if platform_count >= SELF_AWARENESS_THRESHOLDS['core_platform_changes']:
            messages.append(
                f'Core platform layer has {platform_count} changed files '
                f'(threshold: {SELF_AWARENESS_THRESHOLDS["core_platform_changes"]}) '
                f'— extra review recommended')

        security_count = counts.get('security', 0)
        if security_count >= SELF_AWARENESS_THRESHOLDS['security_changes']:
            messages.append(
                f'Security subsystem has {security_count} changed file(s) '
                f'(threshold: {SELF_AWARENESS_THRESHOLDS["security_changes"]}) '
                f'— security audit required')

        # manifest_or_registry = any path containing 'manifest' or 'registry'
        # (case-insensitive), regardless of which subsystem it belongs to.
        registry_keywords = ('manifest', 'registry')
        registry_count = sum(
            1 for path in normalized
            if any(kw in path.lower() for kw in registry_keywords)
        )
        if registry_count >= SELF_AWARENESS_THRESHOLDS['manifest_or_registry_changes']:
            messages.append(
                f'{registry_count} manifest/registry files changed '
                f'(threshold: {SELF_AWARENESS_THRESHOLDS["manifest_or_registry_changes"]}) '
                f'— validate app catalog integrity')

        return messages

    @staticmethod
    def analyze_changes(changed_files: List[str], diff_content: str = '') -> dict:
        """Analyze a set of changed files for self-awareness.

        Identifies which core subsystems are affected, checks whether changes
        exceed self-awareness thresholds, and returns structured analysis.

        Args:
            changed_files: List of file paths (relative to repo root).
            diff_content: Optional unified diff content for deeper analysis.

        Returns:
            {
                'self_aware': bool,
                'affected_subsystems': list,
                'suggestions': list,
                'pattern_matches': dict,
            }
        """
        normalized = EvolutionEngine._normalize_paths(changed_files)
        subsystem_counts = EvolutionEngine._count_subsystems(normalized)
        suggestions = EvolutionEngine._threshold_messages(
            normalized, subsystem_counts)

        # Scan diff content for pattern matches
        pattern_matches: Dict[str, List[dict]] = {
            'anti_patterns': [],
            'good_patterns': [],
        }
        if diff_content:
            pattern_matches['anti_patterns'] = EvolutionEngine.detect_anti_patterns(
                diff_content, '<diff>')
            pattern_matches['good_patterns'] = EvolutionEngine.detect_good_patterns(
                diff_content, '<diff>')

        return {
            # Self-aware as soon as at least one threshold is met.
            'self_aware': bool(suggestions),
            # Dict keys preserve first-seen order of the subsystems.
            'affected_subsystems': list(subsystem_counts),
            'suggestions': suggestions,
            'pattern_matches': pattern_matches,
        }

    @staticmethod
    def detect_anti_patterns(source: str, filename: str = '') -> List[dict]:
        """Scan source code for anti-patterns.

        Checks regex-based anti-patterns line by line and uses the AST for
        missing_type_hints. Regex hits come first (grouped by pattern in
        ANTI_PATTERNS order), followed by AST hits.

        Args:
            source: Python source code to analyze.
            filename: Optional filename passed to the parser for context.

        Returns:
            List of {'pattern': name, 'line': lineno, 'description': str}
        """
        results: List[dict] = []
        lines = source.split('\n')

        # Regex-based anti-patterns
        for name, pattern in ANTI_PATTERNS.items():
            if pattern is None:
                continue  # AST-based — handled below
            regex = re.compile(pattern)
            description = _ANTI_PATTERN_DESCRIPTIONS.get(
                name, f'Anti-pattern: {name}')
            results.extend(
                {'pattern': name, 'line': lineno, 'description': description}
                for lineno, line in enumerate(lines, start=1)
                if regex.search(line)
            )

        # AST-based: missing_type_hints
        try:
            tree = ast.parse(source, filename=filename or '<string>')
        except (SyntaxError, ValueError):
            # Non-Python or invalid source — skip AST analysis. ValueError
            # covers NUL bytes in the source on Python versions before 3.12.
            return results

        for node in ast.walk(tree):
            if (isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
                    and node.returns is None):
                results.append({
                    'pattern': 'missing_type_hints',
                    'line': node.lineno,
                    'description': (
                        f'Function \'{node.name}\' missing return type '
                        f'annotation — {_ANTI_PATTERN_DESCRIPTIONS["missing_type_hints"]}'),
                })

        return results

    @staticmethod
    def detect_good_patterns(source: str, filename: str = '') -> List[dict]:
        """Scan source code for good patterns that follow HART conventions.

        Patterns are matched against the whole source with re.DOTALL, so a
        single match may span several lines; the reported line is the one on
        which the match starts.

        Args:
            source: Python source code to analyze.
            filename: Optional filename for context (currently unused).

        Returns:
            List of {'pattern': name, 'line': lineno, 'description': str}
        """
        results: List[dict] = []

        for name, pattern in GOOD_PATTERNS.items():
            regex = re.compile(pattern, re.DOTALL)
            description = _GOOD_PATTERN_DESCRIPTIONS.get(
                name, f'Good pattern: {name}')
            for match in regex.finditer(source):
                # Count newlines before the match without copying a slice.
                lineno = source.count('\n', 0, match.start()) + 1
                results.append({
                    'pattern': name,
                    'line': lineno,
                    'description': description,
                })

        return results

    @staticmethod
    def suggest_improvements(analysis: dict) -> List[str]:
        """Generate improvement suggestions from an analysis result.

        Starts from the suggestions already in the analysis, then adds a
        security reminder, one entry per distinct anti-pattern, a nudge to
        adopt HART patterns, and (best-effort) an AI-generated suggestion.

        Args:
            analysis: Result from analyze_changes().

        Returns:
            List of human-readable suggestion strings.
        """
        suggestions = list(analysis.get('suggestions', []))
        self_aware = analysis.get('self_aware')
        matches = analysis.get('pattern_matches', {})
        anti_patterns = matches.get('anti_patterns', [])
        good_patterns = matches.get('good_patterns', [])

        # Security subsystem needs extra review
        if self_aware and 'security' in analysis.get('affected_subsystems', []):
            sec_msg = ('Security subsystem affected — ensure guardrail '
                       'immutability is preserved and run security test suite')
            if sec_msg not in suggestions:
                suggestions.append(sec_msg)

        # Anti-pattern suggestions — one per distinct pattern name
        seen_patterns = set()
        for ap in anti_patterns:
            name = ap.get('pattern', '')
            if name in seen_patterns:
                continue
            seen_patterns.add(name)
            desc = _ANTI_PATTERN_DESCRIPTIONS.get(name, f'Fix: {name}')
            suggestions.append(f'Anti-pattern detected: {desc}')

        # If no good patterns found, suggest adoption
        if not good_patterns and self_aware:
            suggestions.append(
                'No recognized HART patterns found in diff — consider adopting '
                'emit_event(), db_session(), or ManifestValidator.validate()')

        # Try AI-powered suggestions via ModelBusService
        if anti_patterns or self_aware:
            try:
                from integrations.agent_engine.model_bus_service import get_model_bus_service
                bus = get_model_bus_service()
                if bus:
                    prompt = (
                        'Suggest improvements for HART OS code changes. '
                        f'Affected subsystems: {analysis.get("affected_subsystems", [])}. '
                        f'Anti-patterns found: {[ap.get("pattern") for ap in anti_patterns]}.')
                    result = bus.infer(prompt=prompt)
                    if result and 'response' in result:
                        response = result['response']
                        # Keep the List[str] contract: drop None, empty or
                        # non-string payloads.
                        if isinstance(response, str) and response:
                            suggestions.append(response)
            except Exception:
                # AI suggestions are best-effort: never fail the caller, but
                # leave a trace for debugging.
                logger.debug('AI-powered suggestion lookup failed',
                             exc_info=True)

        return suggestions

    @staticmethod
    def should_suggest(changed_files: List[str]) -> bool:
        """Determine whether the engine should emit suggestions for these changes.

        Returns True if the changed files meet or exceed any of the
        file-count thresholds in SELF_AWARENESS_THRESHOLDS — the same checks
        that set 'self_aware' in analyze_changes().

        Args:
            changed_files: List of file paths (relative to repo root).

        Returns:
            True if suggestions should be emitted.
        """
        if not changed_files:
            return False

        normalized = EvolutionEngine._normalize_paths(changed_files)
        subsystem_counts = EvolutionEngine._count_subsystems(normalized)
        return bool(
            EvolutionEngine._threshold_messages(normalized, subsystem_counts))

    @staticmethod
    def emit_suggestions(suggestions: List[str], analysis: dict) -> None:
        """Emit evolution events to the platform EventBus.

        Events emitted:
            - evolution.suggestion: For each suggestion
            - evolution.pattern_violation: For each anti-pattern detected
            - evolution.complexity_warning: When self_aware triggers

        Any failure (including the event module being unavailable) is logged
        at debug level and otherwise ignored.

        Args:
            suggestions: List of suggestion strings.
            analysis: Result from analyze_changes().
        """
        try:
            from core.platform.events import emit_event

            affected = analysis.get('affected_subsystems', [])

            # Emit each suggestion
            for suggestion in suggestions:
                emit_event('evolution.suggestion', {
                    'message': suggestion,
                    'affected_subsystems': affected,
                })

            # Emit pattern violations
            anti_patterns = analysis.get('pattern_matches', {}).get('anti_patterns', [])
            for ap in anti_patterns:
                emit_event('evolution.pattern_violation', {
                    'pattern': ap.get('pattern', ''),
                    'line': ap.get('line', 0),
                    'description': ap.get('description', ''),
                })

            # Emit complexity warning if self-aware
            if analysis.get('self_aware'):
                emit_event('evolution.complexity_warning', {
                    'affected_subsystems': affected,
                    'suggestion_count': len(suggestions),
                })

        except Exception:
            # Event emission is best-effort — never block callers, but leave
            # a trace for debugging.
            logger.debug('Evolution event emission failed', exc_info=True)