Coverage for core / platform / evolution_engine.py: 87.9%

132 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Evolution Engine — HART knows when code is being written for it. 

3 

4Self-aware static analysis that detects good patterns, anti-patterns, 

5and core-subsystem changes. When code touches HART's own internals, 

6the engine raises awareness, suggests improvements, and emits events 

7so the platform can respond (extra review, CI gates, upgrade guards). 

8 

9Follows the ManifestValidator / budget_gate.py pattern: 

10 - Static methods, fail-closed, clear reasons 

11 - Zero new dependencies (stdlib ast + re only) 

12 

13Usage: 

14 from core.platform.evolution_engine import EvolutionEngine 

15 

16 analysis = EvolutionEngine.analyze_changes(['core/platform/registry.py']) 

17 anti = EvolutionEngine.detect_anti_patterns(source_code) 

18 good = EvolutionEngine.detect_good_patterns(source_code) 

19 suggestions = EvolutionEngine.suggest_improvements(analysis) 

20 EvolutionEngine.emit_suggestions(suggestions, analysis) 

21""" 

22 

23import ast 

24import logging 

25import re 

26from typing import Any, Dict, List 

27 

logger = logging.getLogger('hevolve.platform')

# ─── Good Patterns (to detect and reward) ────────────────────────
# Maps pattern name -> regex.  EvolutionEngine.detect_good_patterns()
# compiles each with re.DOTALL and scans the whole source text, so '.'
# may span newlines (the singleton pattern relies on this).

GOOD_PATTERNS: Dict[str, str] = {
    'budget_gate_validation': r'(valid|ok|allowed),\s*(reason|errors?|msg)',
    'frozen_values': r'_FrozenValues|__slots__\s*=\s*\(\)',
    'singleton_pattern': r'_instance\s*=\s*None.*\ndef\s+get_',
    'event_emission': r'emit_event\s*\(',
    'context_manager_db': r'with\s+db_session\(\)',
    'manifest_validation': r'ManifestValidator\.validate\(',
    'sandbox_analysis': r'ExtensionSandbox\.analyze',
    'service_registry_usage': r'registry\.(register|get|has)\(',
}

# ─── Anti-Patterns (to detect and flag) ──────────────────────────
# Maps pattern name -> regex, or None for patterns detected via AST.
# EvolutionEngine.detect_anti_patterns() applies each regex line by line.

ANTI_PATTERNS: Dict[str, Any] = {
    # (?!\d) pins the end of the number so the engine cannot backtrack from
    # five digits to four and slip past the "# default" / "# fallback"
    # exemption (e.g. "port = 50000  # default").
    'hardcoded_port': (
        r'(?:port|PORT)\s*=\s*\d{4,5}(?!\d)'
        r'(?!\s*#\s*(?:default|fallback))'),
    # First alternative catches the usual receiver-first spelling
    # ("session.close()", "self.db_conn.close()"); the second keeps the
    # historical ".close() ... session" match for backward compatibility.
    'manual_db_close': (
        r'(?:session|db|conn)\w*\.close\(\)'
        r'|\.close\(\).*(?:session|db|conn)'),
    'bare_except': r'except\s*:',
    'eval_usage': r'(?<![a-zA-Z_])eval\s*\(',
    'exec_usage': r'(?<![a-zA-Z_])exec\s*\(',
    # [\w.]+ so dotted and relative modules ("from a.b import *",
    # "from . import *") are flagged too.
    'star_import': r'from\s+[\w.]+\s+import\s+\*',
    'hardcoded_key': r'(?:api_key|secret|password|token)\s*=\s*["\'][^"\']{8,}',
    'missing_type_hints': None,  # Special: detected via AST
}

# ─── Self-Awareness Thresholds ───────────────────────────────────
# A count that is >= its threshold marks the change set as "self-aware".
# NOTE(review): 'test_coverage_decrease' is not read by any code in this
# module — presumably consumed elsewhere; confirm before changing.

SELF_AWARENESS_THRESHOLDS: Dict[str, Any] = {
    'core_platform_changes': 3,
    'security_changes': 1,
    'manifest_or_registry_changes': 2,
    'test_coverage_decrease': 0.05,
}

# ─── Core Subsystem Identification ───────────────────────────────
# Maps repo-relative path prefix -> subsystem name.  'core/platform/' is
# listed before 'core/' so the more specific prefix is seen first.

CORE_SUBSYSTEMS: Dict[str, str] = {
    'core/platform/': 'platform_layer',
    'core/': 'core',
    'security/': 'security',
    'hart_sdk/': 'sdk',
    'integrations/agent_engine/': 'agent_engine',
    'integrations/social/': 'social',
    'integrations/channels/': 'channels',
    'integrations/remote_desktop/': 'remote_desktop',
}

# ─── Anti-pattern descriptions ───────────────────────────────────
# Human-readable message for each key of ANTI_PATTERNS.

_ANTI_PATTERN_DESCRIPTIONS: Dict[str, str] = {
    'hardcoded_port': 'Hardcoded port number — use core/port_registry.py get_port() instead',
    'manual_db_close': 'Manual DB close — use "with db_session()" context manager instead',
    'bare_except': 'Bare except — catch specific exceptions to avoid masking bugs',
    'eval_usage': 'eval() usage — dangerous; use ast.literal_eval() or explicit parsing',
    'exec_usage': 'exec() usage — dangerous; use safer alternatives',
    'star_import': 'Star import — use explicit imports for clarity and linting',
    'hardcoded_key': 'Hardcoded secret — use environment variables or config.json',
    'missing_type_hints': 'Function missing return type annotation',
}

# ─── Good pattern descriptions ───────────────────────────────────
# Human-readable message for each key of GOOD_PATTERNS.

_GOOD_PATTERN_DESCRIPTIONS: Dict[str, str] = {
    'budget_gate_validation': 'Uses (valid, reason) return pattern (budget_gate style)',
    'frozen_values': 'Uses _FrozenValues / __slots__ immutability pattern',
    'singleton_pattern': 'Uses _instance = None + get_*() singleton pattern',
    'event_emission': 'Uses emit_event() for decoupled communication',
    'context_manager_db': 'Uses db_session() context manager for safe DB access',
    'manifest_validation': 'Uses ManifestValidator.validate() for app integrity',
    'sandbox_analysis': 'Uses ExtensionSandbox.analyze for safe extension loading',
    'service_registry_usage': 'Uses ServiceRegistry for dependency management',
}

103 

104 

class EvolutionEngine:
    """Self-aware evolution engine — HART knows when code is being written for it.

    All methods are static — no instance state needed.  The public entry
    points are analyze_changes(), detect_anti_patterns(),
    detect_good_patterns(), suggest_improvements(), should_suggest() and
    emit_suggestions(); underscore-prefixed methods are shared helpers.
    """

    # ── shared helpers ───────────────────────────────────────────

    @staticmethod
    def _normalize_paths(changed_files: List[str]) -> List[str]:
        """Return the paths with backslashes replaced by forward slashes.

        ``None`` or an empty list yields an empty list.
        """
        return [path.replace('\\', '/') for path in (changed_files or [])]

    @staticmethod
    def _count_subsystems(normalized: List[str]) -> Dict[str, int]:
        """Count changed files per core subsystem.

        Each path is attributed to the subsystem with the longest matching
        prefix in CORE_SUBSYSTEMS, so the result does not depend on the
        order in which that dict happens to be declared.  Keys appear in
        first-seen order.
        """
        # Sort once, outside the per-file loop; sorted() is stable so equal
        # length prefixes keep their declared order.
        prefixes = sorted(
            CORE_SUBSYSTEMS.items(),
            key=lambda item: len(item[0]),
            reverse=True,
        )
        counts: Dict[str, int] = {}
        for filepath in normalized:
            for prefix, subsystem in prefixes:
                if filepath.startswith(prefix):
                    counts[subsystem] = counts.get(subsystem, 0) + 1
                    break
        return counts

    @staticmethod
    def _threshold_messages(normalized: List[str],
                            counts: Dict[str, int]) -> List[str]:
        """Return one message per SELF_AWARENESS_THRESHOLDS entry that is met.

        A threshold is met when the relevant count is greater than or equal
        to it.  An empty result means no threshold was met.
        """
        messages: List[str] = []

        platform_count = counts.get('platform_layer', 0)
        if platform_count >= SELF_AWARENESS_THRESHOLDS['core_platform_changes']:
            messages.append(
                f'Core platform layer has {platform_count} changed files '
                f'(threshold: {SELF_AWARENESS_THRESHOLDS["core_platform_changes"]}) '
                f'— extra review recommended')

        security_count = counts.get('security', 0)
        if security_count >= SELF_AWARENESS_THRESHOLDS['security_changes']:
            messages.append(
                f'Security subsystem has {security_count} changed file(s) '
                f'(threshold: {SELF_AWARENESS_THRESHOLDS["security_changes"]}) '
                f'— security audit required')

        # Any path mentioning "manifest" or "registry" (case-insensitive)
        # counts, regardless of which subsystem it lives in.
        registry_keywords = ('manifest', 'registry')
        registry_count = sum(
            1 for path in normalized
            if any(kw in path.lower() for kw in registry_keywords)
        )
        if registry_count >= SELF_AWARENESS_THRESHOLDS['manifest_or_registry_changes']:
            messages.append(
                f'{registry_count} manifest/registry files changed '
                f'(threshold: {SELF_AWARENESS_THRESHOLDS["manifest_or_registry_changes"]}) '
                f'— validate app catalog integrity')

        return messages

    # ── public API ───────────────────────────────────────────────

    @staticmethod
    def analyze_changes(changed_files: List[str], diff_content: str = '') -> dict:
        """Analyze a set of changed files for self-awareness.

        Identifies which core subsystems are affected, checks whether changes
        meet the self-awareness thresholds, and returns structured analysis.

        Args:
            changed_files: List of file paths (relative to repo root).
                ``None`` is treated as an empty list.
            diff_content: Optional unified diff content for deeper analysis.
                When non-empty it is scanned as-is by detect_anti_patterns()
                and detect_good_patterns(), so reported line numbers are
                positions within the diff text.

        Returns:
            {
                'self_aware': bool,            # True if any threshold was met
                'affected_subsystems': list,   # subsystem names, first-seen order
                'suggestions': list,           # one message per met threshold
                'pattern_matches': dict,       # 'anti_patterns' / 'good_patterns'
            }
        """
        normalized = EvolutionEngine._normalize_paths(changed_files)
        counts = EvolutionEngine._count_subsystems(normalized)
        suggestions = EvolutionEngine._threshold_messages(normalized, counts)

        pattern_matches: Dict[str, List[dict]] = {
            'anti_patterns': [],
            'good_patterns': [],
        }
        if diff_content:
            pattern_matches['anti_patterns'] = EvolutionEngine.detect_anti_patterns(
                diff_content, '<diff>')
            pattern_matches['good_patterns'] = EvolutionEngine.detect_good_patterns(
                diff_content, '<diff>')

        return {
            # Every met threshold contributes exactly one message, so a
            # non-empty list is equivalent to "some threshold was met".
            'self_aware': bool(suggestions),
            'affected_subsystems': list(counts),
            'suggestions': suggestions,
            'pattern_matches': pattern_matches,
        }

    @staticmethod
    def detect_anti_patterns(source: str, filename: str = '') -> List[dict]:
        """Scan source code for anti-patterns.

        Regex-based patterns from ANTI_PATTERNS are applied line by line;
        'missing_type_hints' is detected by walking the AST.  If the source
        cannot be parsed (e.g. it is a diff), only the regex results are
        returned.

        Args:
            source: Python source code to analyze.
            filename: Optional filename, passed to ast.parse for context.

        Returns:
            List of {'pattern': name, 'line': lineno, 'description': str},
            regex hits first (grouped by pattern, then by line), followed by
            AST hits.
        """
        results: List[dict] = []
        lines = source.split('\n')

        for name, pattern in ANTI_PATTERNS.items():
            if pattern is None:
                continue  # AST-based — handled below
            regex = re.compile(pattern)
            description = _ANTI_PATTERN_DESCRIPTIONS.get(
                name, f'Anti-pattern: {name}')
            results.extend(
                {'pattern': name, 'line': lineno, 'description': description}
                for lineno, line in enumerate(lines, start=1)
                if regex.search(line)
            )

        try:
            tree = ast.parse(source, filename=filename or '<string>')
        except (SyntaxError, ValueError):
            # SyntaxError: non-Python or invalid source.
            # ValueError: raised for NUL bytes on Python < 3.12.
            return results

        for node in ast.walk(tree):
            if (isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
                    and node.returns is None):
                results.append({
                    'pattern': 'missing_type_hints',
                    'line': node.lineno,
                    'description': (
                        f"Function '{node.name}' missing return type "
                        f'annotation — {_ANTI_PATTERN_DESCRIPTIONS["missing_type_hints"]}'),
                })

        return results

    @staticmethod
    def detect_good_patterns(source: str, filename: str = '') -> List[dict]:
        """Scan source code for good patterns that follow HART conventions.

        Each regex in GOOD_PATTERNS is compiled with re.DOTALL and matched
        against the whole text, so a single pattern may span several lines.

        Args:
            source: Python source code to analyze.
            filename: Optional filename for context (currently unused).

        Returns:
            List of {'pattern': name, 'line': lineno, 'description': str},
            where lineno is the 1-based line on which the match starts.
        """
        results: List[dict] = []

        for name, pattern in GOOD_PATTERNS.items():
            regex = re.compile(pattern, re.DOTALL)
            description = _GOOD_PATTERN_DESCRIPTIONS.get(
                name, f'Good pattern: {name}')
            for match in regex.finditer(source):
                results.append({
                    'pattern': name,
                    # Newlines before the match start give the 0-based line.
                    'line': source.count('\n', 0, match.start()) + 1,
                    'description': description,
                })

        return results

    @staticmethod
    def suggest_improvements(analysis: dict) -> List[str]:
        """Generate improvement suggestions from an analysis result.

        Starts from the threshold messages already in *analysis* and adds a
        security reminder, one line per distinct anti-pattern, a nudge when
        no good patterns were seen, and — best effort — one AI-generated
        suggestion from ModelBusService.

        Args:
            analysis: Result from analyze_changes().

        Returns:
            List of human-readable suggestion strings.
        """
        suggestions = list(analysis.get('suggestions') or [])
        self_aware = analysis.get('self_aware')
        matches = analysis.get('pattern_matches') or {}
        anti_patterns = matches.get('anti_patterns') or []
        good_patterns = matches.get('good_patterns') or []

        # Security subsystem needs extra review
        if self_aware and 'security' in (analysis.get('affected_subsystems') or []):
            sec_msg = ('Security subsystem affected — ensure guardrail '
                       'immutability is preserved and run security test suite')
            if sec_msg not in suggestions:
                suggestions.append(sec_msg)

        # One suggestion per distinct anti-pattern, in first-seen order
        seen_patterns = set()
        for ap in anti_patterns:
            name = ap.get('pattern', '')
            if name in seen_patterns:
                continue
            seen_patterns.add(name)
            desc = _ANTI_PATTERN_DESCRIPTIONS.get(name, f'Fix: {name}')
            suggestions.append(f'Anti-pattern detected: {desc}')

        # If no good patterns found, suggest adoption
        if self_aware and not good_patterns:
            suggestions.append(
                'No recognized HART patterns found in diff — consider adopting '
                'emit_event(), db_session(), or ManifestValidator.validate()')

        # Try AI-powered suggestions via ModelBusService
        if anti_patterns or self_aware:
            try:
                from integrations.agent_engine.model_bus_service import get_model_bus_service
                bus = get_model_bus_service()
                if bus:
                    prompt = (
                        'Suggest improvements for HART OS code changes. '
                        f'Affected subsystems: {analysis.get("affected_subsystems", [])}. '
                        f'Anti-patterns found: {[ap.get("pattern") for ap in anti_patterns]}.')
                    result = bus.infer(prompt=prompt)
                    if result and 'response' in result:
                        response = result['response']
                        # Only keep a usable string so the return value
                        # really is a list of suggestion strings.
                        if isinstance(response, str) and response.strip():
                            suggestions.append(response)
            except Exception:
                # AI suggestions are best-effort: never fail the caller, but
                # leave a trace for debugging.
                logger.debug('AI-powered evolution suggestions unavailable',
                             exc_info=True)

        return suggestions

    @staticmethod
    def should_suggest(changed_files: List[str]) -> bool:
        """Determine whether the engine should emit suggestions for these changes.

        Returns True if any count checked against SELF_AWARENESS_THRESHOLDS
        (platform-layer files, security files, manifest/registry files)
        meets or exceeds its threshold — the same rule analyze_changes()
        uses to set 'self_aware'.

        Args:
            changed_files: List of file paths (relative to repo root).

        Returns:
            True if suggestions should be emitted; always False for an
            empty or missing file list.
        """
        if not changed_files:
            return False

        normalized = EvolutionEngine._normalize_paths(changed_files)
        counts = EvolutionEngine._count_subsystems(normalized)
        return bool(EvolutionEngine._threshold_messages(normalized, counts))

    @staticmethod
    def emit_suggestions(suggestions: List[str], analysis: dict) -> None:
        """Emit evolution events to the platform EventBus.

        Events emitted:
            - evolution.suggestion: For each suggestion
            - evolution.pattern_violation: For each anti-pattern detected
            - evolution.complexity_warning: When self_aware triggers

        Any failure (including the events module being unavailable) is
        logged at debug level and otherwise ignored.

        Args:
            suggestions: List of suggestion strings.
            analysis: Result from analyze_changes().
        """
        try:
            from core.platform.events import emit_event

            affected = analysis.get('affected_subsystems', [])

            # Emit each suggestion
            for suggestion in suggestions:
                emit_event('evolution.suggestion', {
                    'message': suggestion,
                    'affected_subsystems': affected,
                })

            # Emit pattern violations
            matches = analysis.get('pattern_matches') or {}
            for ap in matches.get('anti_patterns') or []:
                emit_event('evolution.pattern_violation', {
                    'pattern': ap.get('pattern', ''),
                    'line': ap.get('line', 0),
                    'description': ap.get('description', ''),
                })

            # Emit complexity warning if self-aware
            if analysis.get('self_aware'):
                emit_event('evolution.complexity_warning', {
                    'affected_subsystems': affected,
                    'suggestion_count': len(suggestions),
                })

        except Exception:
            # Event emission is best-effort — never block callers.
            logger.debug('Failed to emit evolution events', exc_info=True)