Coverage for core / compute_optimizer.py: 75.3%

477 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2OS Compute Optimizer — Make HARTOS a net positive on any system. 

3 

4Event-based system resource optimizer that: 

51. Monitors: CPU, RAM, disk I/O, network, swap, process priorities, temp files 

62. Optimizes: auto-tune process priorities, clean caches, manage swap, power profiles 

73. Proactive: random-interval hive action stream exploration for network-wide optimization 

84. Reports: emit EventBus events for dashboard, federation delta for hive-wide stats 

9 

10Design principles: 

11- Event-driven: only wake when thresholds breach or random hive tick fires 

12- Zero overhead when system is healthy — sleep when idle 

13- Cross-platform: Windows + Linux + macOS 

14- No ML in HARTOS — heuristic only 

15- NEVER kills processes, NEVER deletes user files — only system temp/cache 

16 

17Integration: 

18- EventBus: emits 'system.health.snapshot', 'system.optimization.applied' 

19- GPU detection: vram_manager.detect_gpu() (single source) 

20- Federation: anonymized stats contributed via delta channel 

21- Resource governor: respects governor mode (ACTIVE/IDLE/SLEEP) 

22 

23Singleton: get_optimizer() returns the module-level instance. 

24Flask blueprint: create_optimizer_blueprint() for REST API. 

25""" 

26 

27import collections 

28import enum 

29import logging 

30import os 

31import platform 

32import random 

33import tempfile 

34import threading 

35import time 

36from dataclasses import dataclass, field 

37from typing import Any, Dict, List, Optional 

38 

39logger = logging.getLogger(__name__) 

40 

41 

42# ===================================================================== 

43# Lazy Imports 

44# ===================================================================== 

45 

def _try_import_psutil():
    """Import psutil lazily.

    Returns:
        The ``psutil`` module, or ``None`` when importing it raises
        ``ImportError``.
    """
    try:
        import psutil
    except ImportError:
        return None
    return psutil

53 

54 

def _try_detect_gpu() -> Dict[str, Any]:
    """Ask vram_manager (the single source of truth) for GPU information.

    Returns:
        Whatever ``detect_gpu()`` returns. If the import or the call raises,
        a placeholder dict describing "no GPU" is returned instead.
    """
    try:
        from integrations.service_tools.vram_manager import detect_gpu
        gpu_info = detect_gpu()
    except Exception:
        gpu_info = {'name': 'none', 'total_gb': 0, 'free_gb': 0, 'cuda_available': False}
    return gpu_info

62 

63 

def _emit(topic: str, data: Any = None) -> None:
    """Publish ``data`` under ``topic`` on the platform EventBus, best-effort.

    Any failure (EventBus module missing, not bootstrapped, handler error)
    is swallowed on purpose so that emitting can never break the caller.
    """
    try:
        from core.platform.events import emit_event
        emit_event(topic, data)
    except Exception:
        # Deliberate no-op: events are optional telemetry.
        return None

71 

72 

73# ===================================================================== 

74# Thresholds (configurable via env) 

75# ===================================================================== 

76 

def _env_float(name: str, default: str) -> float:
    """Read a float setting from the environment, falling back to ``default``.

    A malformed value (e.g. ``OPTIMIZER_CPU_HIGH=eighty``) is logged and
    ignored rather than raising ``ValueError`` while this module is being
    imported, which would take the whole optimizer down over a typo.

    Args:
        name: Environment variable to read.
        default: String form of the fallback value; must parse as a float.

    Returns:
        The parsed environment value, or ``float(default)`` when the variable
        is unset or cannot be parsed.
    """
    raw = os.environ.get(name, default)
    try:
        return float(raw)
    except ValueError:
        logging.getLogger(__name__).warning(
            "Ignoring invalid %s=%r; using default %s", name, raw, default)
        return float(default)


# Utilization percentages above which the optimizer reacts
CPU_HIGH_THRESHOLD = _env_float('OPTIMIZER_CPU_HIGH', '80')
RAM_HIGH_THRESHOLD = _env_float('OPTIMIZER_RAM_HIGH', '85')
SWAP_HIGH_THRESHOLD = _env_float('OPTIMIZER_SWAP_HIGH', '50')
DISK_HIGH_THRESHOLD = _env_float('OPTIMIZER_DISK_HIGH', '90')

# Monitor loop sleeps this long between threshold checks (seconds)
MONITOR_INTERVAL = _env_float('OPTIMIZER_MONITOR_INTERVAL', '15')

# Hive exploration: random interval bounds (seconds)
HIVE_EXPLORE_MIN = _env_float('OPTIMIZER_HIVE_MIN', '300')  # 5 min
HIVE_EXPLORE_MAX = _env_float('OPTIMIZER_HIVE_MAX', '1800')  # 30 min

# Max age of temp files to clean (seconds) — default 24h
TEMP_CLEAN_MAX_AGE = _env_float('OPTIMIZER_TEMP_MAX_AGE', '86400')

# Optimization history buffer size
HISTORY_MAXLEN = 200

94 

95 

96# ===================================================================== 

97# Enums & Dataclasses 

98# ===================================================================== 

99 

@enum.unique
class ActionType(enum.Enum):
    """Closed set of optimization action kinds.

    The string values are what ``OptimizationAction.to_dict`` serializes and
    what the optimizer uses as cooldown keys.
    """

    # Process priority changes
    PRIORITY_ADJUST = 'priority_adjust'
    PROCESS_SUGGEST = 'process_suggest'

    # Memory / disk housekeeping
    CACHE_CLEAN = 'cache_clean'
    SWAP_MANAGE = 'swap_manage'

    # Host-level tuning
    POWER_TUNE = 'power_tune'
    NETWORK_TUNE = 'network_tune'

108 

109 

@dataclass
class SystemSnapshot:
    """Resource utilization of the host captured at one moment in time.

    All fields default to zero / empty so a snapshot can be created first
    and filled in incrementally by the collectors.
    """
    timestamp: float = 0.0
    cpu_percent: float = 0.0
    ram_percent: float = 0.0
    ram_used_gb: float = 0.0
    ram_total_gb: float = 0.0
    swap_percent: float = 0.0
    disk_usage_percent: float = 0.0  # Root/C: partition
    disk_io_read_mb: float = 0.0  # Cumulative MB read
    disk_io_write_mb: float = 0.0  # Cumulative MB written
    net_sent_mb: float = 0.0  # Cumulative MB sent
    net_recv_mb: float = 0.0  # Cumulative MB received
    top_processes: List[Dict] = field(default_factory=list)  # [{name, pid, cpu%, mem%}]
    gpu_util_percent: float = 0.0
    gpu_mem_used_gb: float = 0.0
    gpu_mem_total_gb: float = 0.0
    platform_name: str = ''

    def to_dict(self) -> Dict:
        """Serialize to a plain dict with rounded numbers.

        Percentages and MB counters are rounded to one decimal, GB values to
        two. At most ten process entries are included, and ``platform_name``
        is exported under the key ``'platform'``.
        """
        system_fields = (
            ('cpu_percent', 1),
            ('ram_percent', 1),
            ('ram_used_gb', 2),
            ('ram_total_gb', 2),
            ('swap_percent', 1),
            ('disk_usage_percent', 1),
            ('disk_io_read_mb', 1),
            ('disk_io_write_mb', 1),
            ('net_sent_mb', 1),
            ('net_recv_mb', 1),
        )
        gpu_fields = (
            ('gpu_util_percent', 1),
            ('gpu_mem_used_gb', 2),
            ('gpu_mem_total_gb', 2),
        )

        payload: Dict = {'timestamp': self.timestamp}
        for attr, digits in system_fields:
            payload[attr] = round(getattr(self, attr), digits)
        payload['top_processes'] = self.top_processes[:10]
        for attr, digits in gpu_fields:
            payload[attr] = round(getattr(self, attr), digits)
        payload['platform'] = self.platform_name
        return payload

149 

150 

@dataclass
class OptimizationAction:
    """One optimization step, either merely suggested or already executed.

    ``applied``, ``timestamp`` and ``result`` start out empty and are filled
    in when the action is run.
    """
    action_type: ActionType
    target: str  # What is being optimized (e.g., process name, path)
    params: Dict = field(default_factory=dict)
    impact_estimate: str = ''  # Human-readable expected impact
    applied: bool = False
    timestamp: float = 0.0
    result: str = ''  # Outcome after applying

    def to_dict(self) -> Dict:
        """Serialize to a plain dict; the enum is exported as its string value."""
        return dict(
            action_type=self.action_type.value,
            target=self.target,
            params=self.params,
            impact_estimate=self.impact_estimate,
            applied=self.applied,
            timestamp=self.timestamp,
            result=self.result,
        )

172 

173 

174# ===================================================================== 

175# ComputeOptimizer 

176# ===================================================================== 

177 

178class ComputeOptimizer: 

179 """Event-based system optimizer — makes HARTOS a net positive on any host. 

180 

181 Only wakes when thresholds breach or a random hive-exploration tick fires. 

182 All optimizations are non-destructive: never kills processes, never deletes 

183 user files, only cleans system temp/cache and adjusts priorities. 

184 """ 

185 

186 def __init__(self): 

187 self._lock = threading.Lock() 

188 self._running = False 

189 self._monitor_thread: Optional[threading.Thread] = None 

190 self._hive_thread: Optional[threading.Thread] = None 

191 self._stop_event = threading.Event() 

192 

193 # Snapshot history (bounded) 

194 self._snapshots: collections.deque = collections.deque(maxlen=60) 

195 self._last_snapshot: Optional[SystemSnapshot] = None 

196 

197 # Optimization history (bounded) 

198 self._history: collections.deque = collections.deque(maxlen=HISTORY_MAXLEN) 

199 

200 # Stats counters 

201 self._optimizations_applied: int = 0 

202 self._suggestions_made: int = 0 

203 self._hive_explorations: int = 0 

204 self._cache_bytes_freed: int = 0 

205 

206 # Cooldowns: action_type -> last_applied_timestamp 

207 self._cooldowns: Dict[str, float] = {} 

208 

209 # Platform 

210 self._platform = platform.system() # 'Windows', 'Linux', 'Darwin' 

211 

212 # ── Lifecycle ───────────────────────────────────────────────── 

213 

214 def start(self) -> None: 

215 """Start background monitor and hive exploration threads.""" 

216 with self._lock: 

217 if self._running: 

218 return 

219 self._running = True 

220 self._stop_event.clear() 

221 

222 self._monitor_thread = threading.Thread( 

223 target=self._monitor_loop, 

224 name='compute_optimizer_monitor', 

225 daemon=True, 

226 ) 

227 self._hive_thread = threading.Thread( 

228 target=self._hive_explore_loop, 

229 name='compute_optimizer_hive', 

230 daemon=True, 

231 ) 

232 self._monitor_thread.start() 

233 self._hive_thread.start() 

234 logger.info("ComputeOptimizer started (monitor=%.0fs, hive=%.0f-%.0fs)", 

235 MONITOR_INTERVAL, HIVE_EXPLORE_MIN, HIVE_EXPLORE_MAX) 

236 

237 def stop(self) -> None: 

238 """Stop background threads gracefully.""" 

239 with self._lock: 

240 if not self._running: 

241 return 

242 self._running = False 

243 self._stop_event.set() 

244 if self._monitor_thread and self._monitor_thread.is_alive(): 

245 self._monitor_thread.join(timeout=5) 

246 if self._hive_thread and self._hive_thread.is_alive(): 

247 self._hive_thread.join(timeout=5) 

248 logger.info("ComputeOptimizer stopped") 

249 

250 # ── Monitor Loop (event-driven: sleep between checks) ───────── 

251 

252 def _monitor_loop(self) -> None: 

253 """Periodic threshold check — sleeps between iterations.""" 

254 while not self._stop_event.is_set(): 

255 try: 

256 snap = self.snapshot() 

257 self._emit_stats(snap) 

258 

259 # Only act when thresholds breached 

260 actions = self._check_thresholds(snap) 

261 if actions: 

262 suggestions = self._suggest_optimizations(snap) 

263 for action in suggestions: 

264 self._apply_optimization(action) 

265 except Exception as e: 

266 logger.debug("ComputeOptimizer monitor error: %s", e) 

267 

268 self._stop_event.wait(MONITOR_INTERVAL) 

269 

270 def _hive_explore_loop(self) -> None: 

271 """Random-interval hive action stream exploration.""" 

272 while not self._stop_event.is_set(): 

273 delay = random.uniform(HIVE_EXPLORE_MIN, HIVE_EXPLORE_MAX) 

274 if self._stop_event.wait(delay): 

275 break # Stop requested during sleep 

276 try: 

277 self._explore_hive_stream() 

278 with self._lock: 

279 self._hive_explorations += 1 

280 except Exception as e: 

281 logger.debug("ComputeOptimizer hive exploration error: %s", e) 

282 

283 # ── Snapshot ────────────────────────────────────────────────── 

284 

285 # ── Snapshot collectors (extracted helpers — single source of truth 

286 # for cpu/ram/swap/disk/network reads + per-process walk + GPU 

287 # detection. Both ``snapshot()`` (the canonical full-snapshot 

288 # entry, behavior preserved exactly) and ``get_latest_snapshot`` 

289 # (the cached cross-module accessor) compose these helpers. 

290 # Splitting the collectors lets cross-module readers (e.g. 

291 # ResourceGovernor, which only needs aggregates) request just 

292 # the cheap path without paying the per-process walk's GIL cost. 

293 # ────────────────────────────────────────────────────────────────── 

294 

295 def _collect_aggregates(self, snap: SystemSnapshot) -> None: 

296 """Populate cpu/ram/swap/disk/network on snap (cheap, GIL-light). 

297 

298 Always safe to call — has zero per-process / per-PID cost. Lifted 

299 verbatim from the original ``snapshot`` body, behavior preserved. 

300 """ 

301 psutil = _try_import_psutil() 

302 if psutil is None: 

303 return 

304 # CPU — non-blocking (interval=None returns since-last-call) 

305 snap.cpu_percent = psutil.cpu_percent(interval=None) 

306 

307 # RAM 

308 mem = psutil.virtual_memory() 

309 snap.ram_percent = mem.percent 

310 snap.ram_used_gb = mem.used / (1024 ** 3) 

311 snap.ram_total_gb = mem.total / (1024 ** 3) 

312 

313 # Swap 

314 swap = psutil.swap_memory() 

315 snap.swap_percent = swap.percent if swap.total > 0 else 0.0 

316 

317 # Disk usage (root partition) 

318 try: 

319 root = 'C:\\' if self._platform == 'Windows' else '/' 

320 disk = psutil.disk_usage(root) 

321 snap.disk_usage_percent = disk.percent 

322 except Exception: 

323 pass 

324 

325 # Disk I/O 

326 try: 

327 dio = psutil.disk_io_counters() 

328 if dio: 

329 snap.disk_io_read_mb = dio.read_bytes / (1024 ** 2) 

330 snap.disk_io_write_mb = dio.write_bytes / (1024 ** 2) 

331 except Exception: 

332 pass 

333 

334 # Network I/O 

335 try: 

336 nio = psutil.net_io_counters() 

337 if nio: 

338 snap.net_sent_mb = nio.bytes_sent / (1024 ** 2) 

339 snap.net_recv_mb = nio.bytes_recv / (1024 ** 2) 

340 except Exception: 

341 pass 

342 

343 def _collect_top_processes(self, snap: SystemSnapshot) -> None: 

344 """Populate snap.top_processes via psutil.process_iter (expensive). 

345 

346 WHY GATED: ``psutil.process_iter(['memory_percent'])`` on Windows 

347 walks every PID and issues OpenProcess + GetProcessMemoryInfo 

348 per process, all under the GIL. On a typical Windows box 

349 (~150-300 PIDs) that's 200ms-2s of GIL-held CPU per call — which, 

350 fired every MONITOR_INTERVAL seconds, periodically stalls WAMP 

351 heartbeats, the chat hot path, and daemon yield gates. py-spy 

352 thread dumps caught exactly this loop active+gil. The data is 

353 only read by ``_suggest_optimizations`` when a threshold is 

354 breached; emitting an empty list when nothing is hot costs 

355 nothing downstream. Gate covers cpu/ram/swap/disk so any 

356 breach (incl. 89.5% RAM situations) still gets the per-process 

357 detail it needs. 

358 """ 

359 psutil = _try_import_psutil() 

360 if psutil is None: 

361 return 

362 if not (snap.cpu_percent > CPU_HIGH_THRESHOLD 

363 or snap.ram_percent > RAM_HIGH_THRESHOLD 

364 or snap.swap_percent > SWAP_HIGH_THRESHOLD 

365 or snap.disk_usage_percent > DISK_HIGH_THRESHOLD): 

366 return 

367 try: 

368 procs = [] 

369 for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']): 

370 info = proc.info 

371 if info.get('cpu_percent', 0) > 0.1: 

372 procs.append({ 

373 'pid': info['pid'], 

374 'name': info.get('name', ''), 

375 'cpu_percent': round(info.get('cpu_percent', 0), 1), 

376 'mem_percent': round(info.get('memory_percent', 0), 1), 

377 }) 

378 procs.sort(key=lambda p: p['cpu_percent'], reverse=True) 

379 snap.top_processes = procs[:10] 

380 except Exception: 

381 pass 

382 

383 def _collect_gpu(self, snap: SystemSnapshot) -> None: 

384 """Populate snap GPU fields via vram_manager (single source of truth).""" 

385 gpu_info = _try_detect_gpu() 

386 if gpu_info.get('cuda_available'): 

387 snap.gpu_mem_total_gb = gpu_info.get('total_gb', 0) 

388 snap.gpu_mem_used_gb = snap.gpu_mem_total_gb - gpu_info.get('free_gb', 0) 

389 if snap.gpu_mem_total_gb > 0: 

390 snap.gpu_util_percent = (snap.gpu_mem_used_gb / snap.gpu_mem_total_gb) * 100 

391 

392 def _store_snapshot(self, snap: SystemSnapshot) -> None: 

393 """Append to history deque + update _last_snapshot under lock.""" 

394 with self._lock: 

395 self._snapshots.append(snap) 

396 self._last_snapshot = snap 

397 

398 def snapshot(self) -> SystemSnapshot: 

399 """Capture current system state. Returns a SystemSnapshot. 

400 

401 Uses psutil if available; falls back to minimal OS-level data. 

402 GPU info via vram_manager (single source of truth). 

403 

404 Behavior preserved from pre-refactor (2026-05-01 monitor 

405 unification): aggregates + per-process top-N (when threshold 

406 breached) + GPU + history append + _last_snapshot update + 

407 return snap. Body factored into ``_collect_*`` helpers so the 

408 cross-module ``get_latest_snapshot`` accessor can reuse the 

409 cheap aggregate path without paying the per-process GIL cost. 

410 Existing callers (``_monitor_loop``, ``_emit_stats``, the 

411 ``/optimizer/snapshot`` route, the test suite mocks) see the 

412 identical SystemSnapshot they always have. 

413 """ 

414 snap = SystemSnapshot( 

415 timestamp=time.time(), 

416 platform_name=self._platform, 

417 ) 

418 self._collect_aggregates(snap) 

419 self._collect_top_processes(snap) 

420 self._collect_gpu(snap) 

421 self._store_snapshot(snap) 

422 return snap 

423 

424 def get_latest_snapshot(self, 

425 max_age_s: float = 10.0, 

426 include_per_process: bool = False 

427 ) -> Optional[SystemSnapshot]: 

428 """Cached system snapshot for cross-module readers. 

429 

430 Primary consumer: ``ResourceGovernor._monitor_loop`` (5s cadence) 

431 which historically polled psutil itself, duplicating this 

432 module's polls. Returning a cached snapshot eliminates the 

433 parallel psutil call site (root cause logged 2026-05-01: 

434 py-spy caught both ``compute_optimizer_monitor`` AND 

435 ``ResourceGovernor-Monitor`` active+gil simultaneously, both 

436 in psutil cpu/memory paths). 

437 

438 Args: 

439 max_age_s: return cached snap if it's younger than this. 

440 Pass 0 to always force a fresh read. Default 10s — well 

441 under ResourceGovernor's IDLE_THRESHOLD_SECONDS (120s) 

442 buffer for ACTIVE→IDLE transitions, so worst-case stale 

443 data can't cause a perceptible mode-transition lag. 

444 include_per_process: if True, run the expensive 

445 ``process_iter`` walk during a refresh. Default False — 

446 cross-module readers like ResourceGovernor only need 

447 aggregates and would otherwise force a per-process walk 

448 on every 5s tick under sustained pressure. 

449 

450 Returns: 

451 The most-recent SystemSnapshot. Never None in steady state 

452 (a fresh snapshot is built when cache is empty). Returns 

453 None only if snapshot construction itself raises (defensive 

454 — caller should fall back to its own data source). 

455 """ 

456 # Lock-free read of the atomic ref — fast path when cache hot. 

457 last = self._last_snapshot 

458 if last is not None and (time.time() - last.timestamp) < max_age_s: 

459 return last 

460 # Stale or missing — build a fresh snap. Aggregates are 

461 # always cheap; top-processes only if caller asked. 

462 try: 

463 snap = SystemSnapshot( 

464 timestamp=time.time(), 

465 platform_name=self._platform, 

466 ) 

467 self._collect_aggregates(snap) 

468 if include_per_process: 

469 self._collect_top_processes(snap) 

470 self._collect_gpu(snap) 

471 self._store_snapshot(snap) 

472 return snap 

473 except Exception as e: 

474 logger.debug("get_latest_snapshot refresh failed: %s", e) 

475 return None 

476 

477 # ── Threshold Detection ─────────────────────────────────────── 

478 

479 def _check_thresholds(self, snap: SystemSnapshot) -> List[str]: 

480 """Detect which thresholds are breached. Returns list of breach names.""" 

481 breaches = [] 

482 if snap.cpu_percent > CPU_HIGH_THRESHOLD: 

483 breaches.append('cpu_high') 

484 if snap.ram_percent > RAM_HIGH_THRESHOLD: 

485 breaches.append('ram_high') 

486 if snap.swap_percent > SWAP_HIGH_THRESHOLD: 

487 breaches.append('swap_high') 

488 if snap.disk_usage_percent > DISK_HIGH_THRESHOLD: 

489 breaches.append('disk_high') 

490 return breaches 

491 

492 # ── Suggest Optimizations (heuristic) ───────────────────────── 

493 

494 def _suggest_optimizations(self, snap: SystemSnapshot) -> List[OptimizationAction]: 

495 """Generate non-destructive optimization suggestions based on snapshot.""" 

496 actions = [] 

497 now = time.time() 

498 

499 # CPU high: suggest lowering heavy non-essential processes 

500 if snap.cpu_percent > CPU_HIGH_THRESHOLD: 

501 if self._cooldown_ok(ActionType.PROCESS_SUGGEST, now, 120): 

502 hogs = [p for p in snap.top_processes 

503 if p.get('cpu_percent', 0) > 20 and p.get('name', '') != ''] 

504 for proc in hogs[:3]: 

505 actions.append(OptimizationAction( 

506 action_type=ActionType.PROCESS_SUGGEST, 

507 target=proc.get('name', 'unknown'), 

508 params={'pid': proc.get('pid', 0), 'cpu_percent': proc['cpu_percent']}, 

509 impact_estimate=f"Lower priority of {proc['name']} " 

510 f"(using {proc['cpu_percent']}% CPU)", 

511 )) 

512 

513 # RAM high: clean temp/cache 

514 if snap.ram_percent > RAM_HIGH_THRESHOLD: 

515 if self._cooldown_ok(ActionType.CACHE_CLEAN, now, 300): 

516 actions.append(OptimizationAction( 

517 action_type=ActionType.CACHE_CLEAN, 

518 target='system_temp', 

519 params={'max_age_seconds': TEMP_CLEAN_MAX_AGE}, 

520 impact_estimate='Free RAM by clearing stale temp files', 

521 )) 

522 

523 # Swap high: suggest reducing memory pressure 

524 if snap.swap_percent > SWAP_HIGH_THRESHOLD: 

525 if self._cooldown_ok(ActionType.SWAP_MANAGE, now, 600): 

526 actions.append(OptimizationAction( 

527 action_type=ActionType.SWAP_MANAGE, 

528 target='swap_pressure', 

529 params={'swap_percent': snap.swap_percent}, 

530 impact_estimate='Reduce swap usage by freeing cached memory', 

531 )) 

532 

533 # Disk high: clean temp directories 

534 if snap.disk_usage_percent > DISK_HIGH_THRESHOLD: 

535 if self._cooldown_ok(ActionType.CACHE_CLEAN, now, 600): 

536 actions.append(OptimizationAction( 

537 action_type=ActionType.CACHE_CLEAN, 

538 target='disk_temp', 

539 params={'max_age_seconds': TEMP_CLEAN_MAX_AGE}, 

540 impact_estimate='Free disk space by clearing stale temp files', 

541 )) 

542 

543 with self._lock: 

544 self._suggestions_made += len(actions) 

545 

546 return actions 

547 

548 def _cooldown_ok(self, action_type: ActionType, now: float, 

549 min_interval: float) -> bool: 

550 """Check if enough time has passed since last action of this type.""" 

551 key = action_type.value 

552 last = self._cooldowns.get(key, 0) 

553 return (now - last) >= min_interval 

554 

555 # ── Apply Optimizations (safe, non-destructive) ─────────────── 

556 

557 def _apply_optimization(self, action: OptimizationAction) -> None: 

558 """Execute an optimization action. Only non-destructive operations.""" 

559 action.timestamp = time.time() 

560 

561 try: 

562 if action.action_type == ActionType.PROCESS_SUGGEST: 

563 action.result = self._apply_priority_adjust(action) 

564 elif action.action_type == ActionType.CACHE_CLEAN: 

565 action.result = self._apply_cache_clean(action) 

566 elif action.action_type == ActionType.SWAP_MANAGE: 

567 action.result = self._apply_swap_manage(action) 

568 elif action.action_type == ActionType.POWER_TUNE: 

569 action.result = self._apply_power_tune(action) 

570 elif action.action_type == ActionType.NETWORK_TUNE: 

571 action.result = self._apply_network_tune(action) 

572 else: 

573 action.result = 'no handler' 

574 return 

575 

576 action.applied = True 

577 with self._lock: 

578 self._optimizations_applied += 1 

579 self._cooldowns[action.action_type.value] = time.time() 

580 self._history.append(action) 

581 

582 logger.info("Optimization applied: %s on %s -> %s", 

583 action.action_type.value, action.target, action.result) 

584 _emit('system.optimization.applied', action.to_dict()) 

585 

586 except Exception as e: 

587 action.result = f'error: {e}' 

588 with self._lock: 

589 self._history.append(action) 

590 logger.debug("Optimization failed: %s on %s: %s", 

591 action.action_type.value, action.target, e) 

592 

593 def _apply_priority_adjust(self, action: OptimizationAction) -> str: 

594 """Lower priority of a CPU-hogging process (NEVER kill).""" 

595 psutil = _try_import_psutil() 

596 if psutil is None: 

597 return 'psutil not available' 

598 

599 pid = action.params.get('pid', 0) 

600 if not pid: 

601 return 'no pid' 

602 

603 try: 

604 proc = psutil.Process(pid) 

605 name = proc.name() 

606 

607 # Safety: never touch system-critical processes 

608 protected = {'systemd', 'init', 'kernel', 'csrss.exe', 'svchost.exe', 

609 'wininit.exe', 'services.exe', 'lsass.exe', 'smss.exe', 

610 'System', 'explorer.exe', 'dwm.exe', 'loginwindow', 

611 'launchd', 'WindowServer'} 

612 if name in protected: 

613 return f'skipped (protected: {name})' 

614 

615 # Lower priority by one notch — never idle/realtime 

616 if self._platform == 'Windows': 

617 current = proc.nice() 

618 # Windows priority classes: IDLE=64, BELOW_NORMAL=16384, 

619 # NORMAL=32, ABOVE_NORMAL=32768, HIGH=128, REALTIME=256 

620 if current in (128, 256, 32768): # HIGH, REALTIME, ABOVE_NORMAL 

621 proc.nice(32) # NORMAL_PRIORITY_CLASS 

622 return f'lowered {name} (pid={pid}) to NORMAL priority' 

623 else: 

624 current_nice = proc.nice() 

625 if current_nice < 10: 

626 proc.nice(10) 

627 return f'set {name} (pid={pid}) nice=10' 

628 

629 return f'{name} already at acceptable priority' 

630 

631 except psutil.NoSuchProcess: 

632 return f'process {pid} no longer exists' 

633 except psutil.AccessDenied: 

634 return f'access denied for pid {pid}' 

635 except Exception as e: 

636 return f'error: {e}' 

637 

638 def _apply_cache_clean(self, action: OptimizationAction) -> str: 

639 """Clean stale temp files. NEVER deletes user files.""" 

640 max_age = action.params.get('max_age_seconds', TEMP_CLEAN_MAX_AGE) 

641 now = time.time() 

642 freed = 0 

643 cleaned = 0 

644 

645 # System temp directory only 

646 temp_dir = tempfile.gettempdir() 

647 try: 

648 for entry in os.scandir(temp_dir): 

649 try: 

650 # Only delete old files (not directories with content) 

651 stat = entry.stat(follow_symlinks=False) 

652 age = now - stat.st_mtime 

653 if age < max_age: 

654 continue 

655 

656 if entry.is_file(follow_symlinks=False): 

657 size = stat.st_size 

658 os.unlink(entry.path) 

659 freed += size 

660 cleaned += 1 

661 elif entry.is_dir(follow_symlinks=False): 

662 # Only empty directories 

663 try: 

664 os.rmdir(entry.path) 

665 cleaned += 1 

666 except OSError: 

667 pass # Not empty — skip 

668 except (PermissionError, OSError): 

669 continue 

670 except Exception as e: 

671 logger.debug("Cache clean scan error: %s", e) 

672 

673 with self._lock: 

674 self._cache_bytes_freed += freed 

675 

676 freed_mb = freed / (1024 * 1024) 

677 return f'cleaned {cleaned} items, freed {freed_mb:.1f} MB from {temp_dir}' 

678 

679 def _apply_swap_manage(self, action: OptimizationAction) -> str: 

680 """Reduce swap pressure by dropping filesystem caches (Linux only).""" 

681 if self._platform == 'Linux': 

682 try: 

683 # Drop page cache only (safest option: value 1) 

684 # Requires root — will fail gracefully if not root 

685 with open('/proc/sys/vm/drop_caches', 'w') as f: 

686 f.write('1') 

687 return 'dropped page cache (Linux)' 

688 except PermissionError: 

689 return 'drop_caches requires root — skipped' 

690 except Exception as e: 

691 return f'swap manage error: {e}' 

692 elif self._platform == 'Windows': 

693 # On Windows, cleaning temp files is the safest approach 

694 return self._apply_cache_clean(OptimizationAction( 

695 action_type=ActionType.CACHE_CLEAN, 

696 target='swap_relief', 

697 params={'max_age_seconds': 3600}, # More aggressive: 1h 

698 )) 

699 return 'swap management not available on this platform' 

700 

701 def _apply_power_tune(self, action: OptimizationAction) -> str: 

702 """Adjust power profile. Invoked by hive exploration when needed.""" 

703 profile = action.params.get('profile', 'balanced') 

704 

705 if self._platform == 'Windows': 

706 # Windows power scheme GUIDs 

707 schemes = { 

708 'powersave': '381b4222-f694-41f0-9685-ff5bb260df2e', # Balanced (most compatible) 

709 'balanced': '381b4222-f694-41f0-9685-ff5bb260df2e', 

710 'performance': '8c5e7fda-e8bf-4a96-9a85-a6e23a8c635c', 

711 } 

712 guid = schemes.get(profile, schemes['balanced']) 

713 try: 

714 import subprocess 

715 from core.subprocess_safe import hidden_popen_kwargs 

716 subprocess.run( 

717 ['powercfg', '/setactive', guid], 

718 capture_output=True, timeout=10, 

719 **hidden_popen_kwargs(), 

720 ) 

721 return f'set power scheme to {profile}' 

722 except Exception as e: 

723 return f'power tune error: {e}' 

724 

725 elif self._platform == 'Linux': 

726 # cpufreq governor 

727 governors = { 

728 'powersave': 'powersave', 

729 'balanced': 'schedutil', 

730 'performance': 'performance', 

731 } 

732 gov = governors.get(profile, 'schedutil') 

733 try: 

734 # Apply to all CPUs 

735 cpu_count = os.cpu_count() or 1 

736 applied = 0 

737 for i in range(cpu_count): 

738 path = f'/sys/devices/system/cpu/cpu{i}/cpufreq/scaling_governor' 

739 try: 

740 with open(path, 'w') as f: 

741 f.write(gov) 

742 applied += 1 

743 except (PermissionError, FileNotFoundError): 

744 pass 

745 return f'set governor={gov} on {applied}/{cpu_count} CPUs' 

746 except Exception as e: 

747 return f'power tune error: {e}' 

748 

749 return 'power tuning not available on this platform' 

750 

751 def _apply_network_tune(self, action: OptimizationAction) -> str: 

752 """Network tuning — currently informational only.""" 

753 return 'network tuning noted (advisory only)' 

754 

755 # ── Hive Exploration (proactive, random-interval) ───────────── 

756 

757 def _explore_hive_stream(self) -> None: 

758 """Check hive action stream for optimization goals from other nodes. 

759 

760 At random intervals (5-30 min), peek at the hive to see if: 

761 - Nodes in the region are overloaded -> this node can accept more tasks 

762 - A benchmark is running -> minimize background work 

763 - New optimization strategies discovered -> apply locally 

764 - Resource bottlenecks detected network-wide -> contribute stats 

765 """ 

766 snap = self._last_snapshot 

767 if snap is None: 

768 snap = self.snapshot() 

769 

770 # Contribute anonymized stats to hive 

771 self._contribute_to_hive(snap) 

772 

773 # Check for hive optimization goals 

774 goals = self._fetch_hive_goals() 

775 if not goals: 

776 return 

777 

778 for goal in goals: 

779 goal_type = goal.get('type', '') 

780 

781 if goal_type == 'region_overloaded': 

782 # Other nodes struggling — if we have headroom, signal availability 

783 if snap.cpu_percent < 50 and snap.ram_percent < 60: 

784 _emit('system.hive.available', { 

785 'cpu_headroom': round(100 - snap.cpu_percent, 1), 

786 'ram_headroom': round(100 - snap.ram_percent, 1), 

787 'goal_id': goal.get('id', ''), 

788 }) 

789 logger.info("Hive: signaled availability for overloaded region") 

790 

791 elif goal_type == 'benchmark_running': 

792 # Hive is benchmarking — reduce our background work 

793 action = OptimizationAction( 

794 action_type=ActionType.PRIORITY_ADJUST, 

795 target='self_throttle', 

796 params={'reason': 'hive_benchmark'}, 

797 impact_estimate='Reduce background work during hive benchmark', 

798 ) 

799 self._apply_optimization(action) 

800 

801 elif goal_type == 'power_save_request': 

802 # Network-wide power saving (e.g., peak grid hours) 

803 action = OptimizationAction( 

804 action_type=ActionType.POWER_TUNE, 

805 target='power_profile', 

806 params={'profile': 'powersave'}, 

807 impact_estimate='Switch to powersave for network-wide efficiency', 

808 ) 

809 self._apply_optimization(action) 

810 

811 def _fetch_hive_goals(self) -> List[Dict]: 

812 """Fetch optimization goals from the hive (best-effort). 

813 

814 Queries the goal manager for active system-optimization goals. 

815 Returns empty list if hive is not available. 

816 """ 

817 try: 

818 from integrations.agent_engine.goal_manager import get_goal_manager 

819 gm = get_goal_manager() 

820 if gm is None: 

821 return [] 

822 # Look for active optimization goals 

823 goals = [] 

824 all_goals = getattr(gm, 'get_active_goals', lambda: [])() 

825 for g in all_goals: 

826 tags = getattr(g, 'tags', []) or [] 

827 if 'system_optimization' in tags or 'compute_optimization' in tags: 

828 goals.append({ 

829 'id': getattr(g, 'id', ''), 

830 'type': getattr(g, 'goal_type', ''), 

831 'params': getattr(g, 'params', {}), 

832 }) 

833 return goals 

834 except Exception: 

835 return [] 

836 

837 # ── EventBus & Federation ───────────────────────────────────── 

838 

839 def _emit_stats(self, snap: SystemSnapshot) -> None: 

840 """Emit system health snapshot to EventBus for dashboard consumption.""" 

841 data = snap.to_dict() 

842 data['health_score'] = self.get_health_score(snap) 

843 _emit('system.health.snapshot', data) 

844 

845 def _contribute_to_hive(self, snap: SystemSnapshot) -> None: 

846 """Share anonymized system stats via federation delta channel. 

847 

848 Stats are aggregated, not personally identifiable: 

849 - CPU/RAM/disk utilization bands (not exact values) 

850 - Optimization counts 

851 - Platform type 

852 No IP addresses, hostnames, or process names are shared. 

853 """ 

854 try: 

855 from integrations.agent_engine.federated_aggregator import get_aggregator 

856 agg = get_aggregator() 

857 if agg is None: 

858 return 

859 

860 # Anonymized: bucket utilization into bands (low/medium/high/critical) 

861 def _band(pct: float) -> str: 

862 if pct < 30: 

863 return 'low' 

864 if pct < 60: 

865 return 'medium' 

866 if pct < 85: 

867 return 'high' 

868 return 'critical' 

869 

870 delta = { 

871 'channel': 'compute_health', 

872 'cpu_band': _band(snap.cpu_percent), 

873 'ram_band': _band(snap.ram_percent), 

874 'disk_band': _band(snap.disk_usage_percent), 

875 'optimizations_applied': self._optimizations_applied, 

876 'platform': self._platform, 

877 'has_gpu': snap.gpu_mem_total_gb > 0, 

878 'timestamp': time.time(), 

879 } 

880 

881 # Use broadcast_delta if available 

882 broadcast = getattr(agg, 'broadcast_delta', None) 

883 if callable(broadcast): 

884 broadcast(delta) 

885 except Exception: 

886 pass 

887 

888 # ── Public API ──────────────────────────────────────────────── 

889 

def get_stats(self) -> Dict:
    """Summarize optimizer counters together with the recent action history.

    The history is serialized via each entry's ``to_dict()`` while holding
    ``self._lock``; the counters are read after the lock is released.

    Returns:
        Dict with the running flag, the applied/suggested/hive-exploration
        counters, freed cache size in bytes and in MiB (rounded to two
        decimals), the total history length, the last 20 history entries
        and the platform name.
    """
    with self._lock:
        serialized = [entry.to_dict() for entry in self._history]

    freed_bytes = self._cache_bytes_freed
    bytes_per_mb = 1024 * 1024
    stats = {
        'running': self._running,
        'optimizations_applied': self._optimizations_applied,
        'suggestions_made': self._suggestions_made,
        'hive_explorations': self._hive_explorations,
        'cache_bytes_freed': freed_bytes,
        'cache_mb_freed': round(freed_bytes / bytes_per_mb, 2),
        'history_count': len(serialized),
        'recent_history': serialized[-20:],
        'platform': self._platform,
    }
    return stats

905 

def get_health_score(self, snap: Optional[SystemSnapshot] = None) -> float:
    """Rate overall system health from 0.0 (critical) to 1.0 (healthy).

    Each utilization percentage is inverted into a health value
    (0% used -> 1.0, 100% or more used -> 0.0) and the values are combined
    as a weighted sum:

    - CPU:  30%
    - RAM:  30%
    - Swap: 20%
    - Disk: 20%

    Args:
        snap: Snapshot to score. When omitted, the last recorded snapshot
            (``self._last_snapshot``) is used.

    Returns:
        The score clamped to [0.0, 1.0] and rounded to three decimals, or
        1.0 when no snapshot is available at all.
    """
    snapshot = self._last_snapshot if snap is None else snap
    if snapshot is None:
        # Nothing measured yet: assume the system is healthy.
        return 1.0

    weighted_usage = (
        (snapshot.cpu_percent, 0.30),
        (snapshot.ram_percent, 0.30),
        (snapshot.swap_percent, 0.20),
        (snapshot.disk_usage_percent, 0.20),
    )

    total = 0.0
    for used_pct, weight in weighted_usage:
        # Utilization above 100% must not push a component below zero.
        total += max(0.0, 1.0 - used_pct / 100.0) * weight

    return round(max(0.0, min(1.0, total)), 3)

934 

def trigger_optimization(self) -> Dict:
    """Run one optimization pass on demand and report what was done.

    Flow: take a baseline snapshot and health score, apply every action
    suggested for that snapshot (or a single maintenance cache clean when
    nothing is suggested), then take a second snapshot to score the result.

    Attribution tracking is optional. If the attribution module cannot be
    imported, or any attribution call raises, the pass continues without
    it. When tracking is active it records the baseline, one step per
    suggested action, and a completion record with the health-score delta.

    Returns:
        Dict with ``snapshot`` (the baseline snapshot as a dict),
        ``health_score`` (score of the post-optimization snapshot) and
        ``actions`` (``to_dict()`` of every action applied).
    """
    # Open an attribution record for this cycle (best-effort).
    tracking_id = None
    try:
        from integrations.agent_engine.agent_attribution import (
            begin_action,
            complete_action,
            record_step,
        )
        tracking_id = begin_action(
            agent_id='compute_optimizer',
            action_type='optimization_cycle',
            expected_outcome={'health_score_delta': 0.05},
            acceptance_criteria=['health_score > baseline'],
        )
    except Exception:
        pass

    before = self.snapshot()
    health_before = self.get_health_score(before)
    if tracking_id:
        try:
            record_step(
                tracking_id,
                'baseline_snapshot',
                state={
                    'cpu_pct': before.cpu_percent,
                    'ram_pct': before.ram_percent,
                    'health_score': health_before,
                },
                decision='assess',
                confidence=1.0,
            )
        except Exception:
            pass

    pending = self._suggest_optimizations(before)
    applied = []
    for step in pending:
        self._apply_optimization(step)
        applied.append(step.to_dict())
        if not tracking_id:
            continue
        try:
            record_step(
                tracking_id,
                f'applied:{step.action_type.value}',
                state={
                    'target': step.target,
                    'result': step.to_dict().get('result', ''),
                },
                decision=step.action_type.value,
                confidence=0.7,
            )
        except Exception:
            pass

    # Nothing breached a threshold: still run a routine cache clean.
    if not applied:
        maintenance = OptimizationAction(
            action_type=ActionType.CACHE_CLEAN,
            target='maintenance',
            params={'max_age_seconds': TEMP_CLEAN_MAX_AGE},
            impact_estimate='Routine maintenance cache clean',
        )
        self._apply_optimization(maintenance)
        applied.append(maintenance.to_dict())

    after = self.snapshot()
    health_after = self.get_health_score(after)

    # Close the attribution record with the measured outcome.
    if tracking_id:
        try:
            complete_action(tracking_id, outcome={
                'status': 'completed',
                'baseline_health_score': health_before,
                'final_health_score': health_after,
                'health_score_delta': round(health_after - health_before, 4),
                'actions_applied': len(applied),
            })
        except Exception:
            pass

    return {
        'snapshot': before.to_dict(),
        'health_score': health_after,
        'actions': applied,
    }

1012 

1013 

1014# ===================================================================== 

1015# Singleton 

1016# ===================================================================== 

1017 

# Module-level singleton instance, created on first use by get_optimizer().
_optimizer: Optional[ComputeOptimizer] = None
# Serializes first-time construction of the singleton.
_optimizer_lock = threading.Lock()


def get_optimizer() -> ComputeOptimizer:
    """Return the shared ComputeOptimizer, constructing it on first call.

    An already-built instance is returned without taking the lock;
    otherwise the instance is re-checked and created while holding
    ``_optimizer_lock``.
    """
    global _optimizer
    existing = _optimizer
    if existing is not None:
        return existing
    with _optimizer_lock:
        # Re-check under the lock before constructing.
        if _optimizer is None:
            _optimizer = ComputeOptimizer()
        return _optimizer

1030 

1031 

1032# ===================================================================== 

1033# Flask Blueprint 

1034# ===================================================================== 

1035 

def create_optimizer_blueprint():
    """Create a Flask Blueprint for system optimizer API endpoints.

    Endpoints (all under ``/api/system``):
        GET  /health         - Fresh snapshot, its health score, and the
                               number of optimizations applied so far.
        GET  /optimizations  - Optimizer statistics. The optional ``limit``
                               query parameter (default 20) keeps only the
                               newest ``limit`` history entries; a limit of
                               zero or less yields an empty history. The
                               history comes from ``get_stats()``, which
                               already returns at most the last 20 entries,
                               so larger limits return no more than that.
        POST /optimize       - Trigger a manual optimization pass and return
                               its result.

    Flask is imported lazily so this module stays importable without it.

    Returns:
        Flask Blueprint instance, or None if Flask is not available.
    """
    try:
        from flask import Blueprint, jsonify, request
    except ImportError:
        logger.debug("Flask not available -- optimizer blueprint not created")
        return None

    bp = Blueprint('compute_optimizer', __name__, url_prefix='/api/system')

    @bp.route('/health', methods=['GET'])
    def system_health():
        optimizer = get_optimizer()
        snap = optimizer.snapshot()
        return jsonify({
            'health_score': optimizer.get_health_score(snap),
            'snapshot': snap.to_dict(),
            'optimizations_applied': optimizer.get_stats()['optimizations_applied'],
        })

    @bp.route('/optimizations', methods=['GET'])
    def system_optimizations():
        optimizer = get_optimizer()
        stats = optimizer.get_stats()
        limit = request.args.get('limit', 20, type=int)
        # history[-0:] is the whole list and a negative limit would slice
        # from the front, so any non-positive limit returns no entries.
        if limit > 0:
            stats['recent_history'] = stats['recent_history'][-limit:]
        else:
            stats['recent_history'] = []
        return jsonify(stats)

    @bp.route('/optimize', methods=['POST'])
    def trigger_optimize():
        optimizer = get_optimizer()
        result = optimizer.trigger_optimization()
        return jsonify(result)

    return bp