Coverage for core / compute_optimizer.py: 75.3%
477 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2OS Compute Optimizer — Make HARTOS a net positive on any system.
4Event-based system resource optimizer that:
51. Monitors: CPU, RAM, disk I/O, network, swap, process priorities, temp files
62. Optimizes: auto-tune process priorities, clean caches, manage swap, power profiles
73. Proactive: random-interval hive action stream exploration for network-wide optimization
84. Reports: emit EventBus events for dashboard, federation delta for hive-wide stats
10Design principles:
11- Event-driven: only wake when thresholds breach or random hive tick fires
12- Zero overhead when system is healthy — sleep when idle
13- Cross-platform: Windows + Linux + macOS
14- No ML in HARTOS — heuristic only
15- NEVER kills processes, NEVER deletes user files — only system temp/cache
17Integration:
18- EventBus: emits 'system.health.snapshot', 'system.optimization.applied'
19- GPU detection: vram_manager.detect_gpu() (single source)
20- Federation: anonymized stats contributed via delta channel
21- Resource governor: respects governor mode (ACTIVE/IDLE/SLEEP)
23Singleton: get_optimizer() returns the module-level instance.
24Flask blueprint: create_optimizer_blueprint() for REST API.
25"""
27import collections
28import enum
29import logging
30import os
31import platform
32import random
33import tempfile
34import threading
35import time
36from dataclasses import dataclass, field
37from typing import Any, Dict, List, Optional
39logger = logging.getLogger(__name__)
42# =====================================================================
43# Lazy Imports
44# =====================================================================
46def _try_import_psutil():
47 """Return psutil module or None if unavailable."""
48 try:
49 import psutil
50 return psutil
51 except ImportError:
52 return None
55def _try_detect_gpu() -> Dict[str, Any]:
56 """Detect GPU via vram_manager (single source of truth)."""
57 try:
58 from integrations.service_tools.vram_manager import detect_gpu
59 return detect_gpu()
60 except Exception:
61 return {'name': 'none', 'total_gb': 0, 'free_gb': 0, 'cuda_available': False}
64def _emit(topic: str, data: Any = None) -> None:
65 """Emit an event on the platform EventBus (best-effort, no-op if not bootstrapped)."""
66 try:
67 from core.platform.events import emit_event
68 emit_event(topic, data)
69 except Exception:
70 pass
73# =====================================================================
74# Thresholds (configurable via env)
75# =====================================================================
# Utilization thresholds (percent, 0-100) above which the monitor reacts.
CPU_HIGH_THRESHOLD = float(os.environ.get('OPTIMIZER_CPU_HIGH', '80'))
RAM_HIGH_THRESHOLD = float(os.environ.get('OPTIMIZER_RAM_HIGH', '85'))
SWAP_HIGH_THRESHOLD = float(os.environ.get('OPTIMIZER_SWAP_HIGH', '50'))
DISK_HIGH_THRESHOLD = float(os.environ.get('OPTIMIZER_DISK_HIGH', '90'))

# Monitor loop sleeps this long between threshold checks (seconds)
MONITOR_INTERVAL = float(os.environ.get('OPTIMIZER_MONITOR_INTERVAL', '15'))

# Hive exploration: random interval bounds (seconds)
HIVE_EXPLORE_MIN = float(os.environ.get('OPTIMIZER_HIVE_MIN', '300'))  # 5 min
HIVE_EXPLORE_MAX = float(os.environ.get('OPTIMIZER_HIVE_MAX', '1800'))  # 30 min

# Max age of temp files to clean (seconds) — default 24h
TEMP_CLEAN_MAX_AGE = float(os.environ.get('OPTIMIZER_TEMP_MAX_AGE', '86400'))

# Optimization history buffer size (bounds ComputeOptimizer._history)
HISTORY_MAXLEN = 200
96# =====================================================================
97# Enums & Dataclasses
98# =====================================================================
class ActionType(enum.Enum):
    """Types of optimization actions the optimizer can take."""
    PRIORITY_ADJUST = 'priority_adjust'    # lower a process's scheduling priority
    CACHE_CLEAN = 'cache_clean'            # remove stale system temp files/dirs
    SWAP_MANAGE = 'swap_manage'            # relieve swap pressure (drop caches)
    POWER_TUNE = 'power_tune'              # switch OS power profile / governor
    PROCESS_SUGGEST = 'process_suggest'    # suggest priority change for a CPU hog
    NETWORK_TUNE = 'network_tune'          # advisory-only network note
@dataclass
class SystemSnapshot:
    """Point-in-time snapshot of system resource utilization.

    Percentage fields are 0-100; ``*_gb`` fields are gibibytes; the
    disk/net I/O counters are cumulative megabytes (as reported by the
    OS counters, not per-interval deltas).
    """
    timestamp: float = 0.0
    cpu_percent: float = 0.0
    ram_percent: float = 0.0
    ram_used_gb: float = 0.0
    ram_total_gb: float = 0.0
    swap_percent: float = 0.0
    disk_usage_percent: float = 0.0  # Root/C: partition
    disk_io_read_mb: float = 0.0     # Cumulative MB read
    disk_io_write_mb: float = 0.0    # Cumulative MB written
    net_sent_mb: float = 0.0         # Cumulative MB sent
    net_recv_mb: float = 0.0         # Cumulative MB received
    top_processes: List[Dict] = field(default_factory=list)  # [{name, pid, cpu%, mem%}]
    gpu_util_percent: float = 0.0
    gpu_mem_used_gb: float = 0.0
    gpu_mem_total_gb: float = 0.0
    platform_name: str = ''

    def to_dict(self) -> Dict:
        """Serialize for EventBus/REST: rounded floats, at most 10 processes."""
        # (key, value, round-digits) — None means "emit as-is".
        spec = [
            ('timestamp', self.timestamp, None),
            ('cpu_percent', self.cpu_percent, 1),
            ('ram_percent', self.ram_percent, 1),
            ('ram_used_gb', self.ram_used_gb, 2),
            ('ram_total_gb', self.ram_total_gb, 2),
            ('swap_percent', self.swap_percent, 1),
            ('disk_usage_percent', self.disk_usage_percent, 1),
            ('disk_io_read_mb', self.disk_io_read_mb, 1),
            ('disk_io_write_mb', self.disk_io_write_mb, 1),
            ('net_sent_mb', self.net_sent_mb, 1),
            ('net_recv_mb', self.net_recv_mb, 1),
            ('top_processes', self.top_processes[:10], None),
            ('gpu_util_percent', self.gpu_util_percent, 1),
            ('gpu_mem_used_gb', self.gpu_mem_used_gb, 2),
            ('gpu_mem_total_gb', self.gpu_mem_total_gb, 2),
            ('platform', self.platform_name, None),
        ]
        return {key: (val if nd is None else round(val, nd))
                for key, val, nd in spec}
@dataclass
class OptimizationAction:
    """A single optimization action the optimizer can take or suggest."""
    action_type: ActionType          # category of the action
    target: str                      # what is optimized (process name, path, ...)
    params: Dict = field(default_factory=dict)  # action-specific arguments
    impact_estimate: str = ''        # human-readable expected impact
    applied: bool = False            # True once the action was executed
    timestamp: float = 0.0           # set when the action is attempted
    result: str = ''                 # outcome message after applying

    def to_dict(self) -> Dict:
        """Serialize for history/EventBus; the enum collapses to its value."""
        return {
            'action_type': self.action_type.value,
            **{name: getattr(self, name)
               for name in ('target', 'params', 'impact_estimate',
                            'applied', 'timestamp', 'result')},
        }
174# =====================================================================
175# ComputeOptimizer
176# =====================================================================
178class ComputeOptimizer:
179 """Event-based system optimizer — makes HARTOS a net positive on any host.
181 Only wakes when thresholds breach or a random hive-exploration tick fires.
182 All optimizations are non-destructive: never kills processes, never deletes
183 user files, only cleans system temp/cache and adjusts priorities.
184 """
186 def __init__(self):
187 self._lock = threading.Lock()
188 self._running = False
189 self._monitor_thread: Optional[threading.Thread] = None
190 self._hive_thread: Optional[threading.Thread] = None
191 self._stop_event = threading.Event()
193 # Snapshot history (bounded)
194 self._snapshots: collections.deque = collections.deque(maxlen=60)
195 self._last_snapshot: Optional[SystemSnapshot] = None
197 # Optimization history (bounded)
198 self._history: collections.deque = collections.deque(maxlen=HISTORY_MAXLEN)
200 # Stats counters
201 self._optimizations_applied: int = 0
202 self._suggestions_made: int = 0
203 self._hive_explorations: int = 0
204 self._cache_bytes_freed: int = 0
206 # Cooldowns: action_type -> last_applied_timestamp
207 self._cooldowns: Dict[str, float] = {}
209 # Platform
210 self._platform = platform.system() # 'Windows', 'Linux', 'Darwin'
212 # ── Lifecycle ─────────────────────────────────────────────────
214 def start(self) -> None:
215 """Start background monitor and hive exploration threads."""
216 with self._lock:
217 if self._running:
218 return
219 self._running = True
220 self._stop_event.clear()
222 self._monitor_thread = threading.Thread(
223 target=self._monitor_loop,
224 name='compute_optimizer_monitor',
225 daemon=True,
226 )
227 self._hive_thread = threading.Thread(
228 target=self._hive_explore_loop,
229 name='compute_optimizer_hive',
230 daemon=True,
231 )
232 self._monitor_thread.start()
233 self._hive_thread.start()
234 logger.info("ComputeOptimizer started (monitor=%.0fs, hive=%.0f-%.0fs)",
235 MONITOR_INTERVAL, HIVE_EXPLORE_MIN, HIVE_EXPLORE_MAX)
237 def stop(self) -> None:
238 """Stop background threads gracefully."""
239 with self._lock:
240 if not self._running:
241 return
242 self._running = False
243 self._stop_event.set()
244 if self._monitor_thread and self._monitor_thread.is_alive():
245 self._monitor_thread.join(timeout=5)
246 if self._hive_thread and self._hive_thread.is_alive():
247 self._hive_thread.join(timeout=5)
248 logger.info("ComputeOptimizer stopped")
250 # ── Monitor Loop (event-driven: sleep between checks) ─────────
252 def _monitor_loop(self) -> None:
253 """Periodic threshold check — sleeps between iterations."""
254 while not self._stop_event.is_set():
255 try:
256 snap = self.snapshot()
257 self._emit_stats(snap)
259 # Only act when thresholds breached
260 actions = self._check_thresholds(snap)
261 if actions:
262 suggestions = self._suggest_optimizations(snap)
263 for action in suggestions:
264 self._apply_optimization(action)
265 except Exception as e:
266 logger.debug("ComputeOptimizer monitor error: %s", e)
268 self._stop_event.wait(MONITOR_INTERVAL)
270 def _hive_explore_loop(self) -> None:
271 """Random-interval hive action stream exploration."""
272 while not self._stop_event.is_set():
273 delay = random.uniform(HIVE_EXPLORE_MIN, HIVE_EXPLORE_MAX)
274 if self._stop_event.wait(delay):
275 break # Stop requested during sleep
276 try:
277 self._explore_hive_stream()
278 with self._lock:
279 self._hive_explorations += 1
280 except Exception as e:
281 logger.debug("ComputeOptimizer hive exploration error: %s", e)
283 # ── Snapshot ──────────────────────────────────────────────────
285 # ── Snapshot collectors (extracted helpers — single source of truth
286 # for cpu/ram/swap/disk/network reads + per-process walk + GPU
287 # detection. Both ``snapshot()`` (the canonical full-snapshot
288 # entry, behavior preserved exactly) and ``get_latest_snapshot``
289 # (the cached cross-module accessor) compose these helpers.
290 # Splitting the collectors lets cross-module readers (e.g.
291 # ResourceGovernor, which only needs aggregates) request just
292 # the cheap path without paying the per-process walk's GIL cost.
293 # ──────────────────────────────────────────────────────────────────
295 def _collect_aggregates(self, snap: SystemSnapshot) -> None:
296 """Populate cpu/ram/swap/disk/network on snap (cheap, GIL-light).
298 Always safe to call — has zero per-process / per-PID cost. Lifted
299 verbatim from the original ``snapshot`` body, behavior preserved.
300 """
301 psutil = _try_import_psutil()
302 if psutil is None:
303 return
304 # CPU — non-blocking (interval=None returns since-last-call)
305 snap.cpu_percent = psutil.cpu_percent(interval=None)
307 # RAM
308 mem = psutil.virtual_memory()
309 snap.ram_percent = mem.percent
310 snap.ram_used_gb = mem.used / (1024 ** 3)
311 snap.ram_total_gb = mem.total / (1024 ** 3)
313 # Swap
314 swap = psutil.swap_memory()
315 snap.swap_percent = swap.percent if swap.total > 0 else 0.0
317 # Disk usage (root partition)
318 try:
319 root = 'C:\\' if self._platform == 'Windows' else '/'
320 disk = psutil.disk_usage(root)
321 snap.disk_usage_percent = disk.percent
322 except Exception:
323 pass
325 # Disk I/O
326 try:
327 dio = psutil.disk_io_counters()
328 if dio:
329 snap.disk_io_read_mb = dio.read_bytes / (1024 ** 2)
330 snap.disk_io_write_mb = dio.write_bytes / (1024 ** 2)
331 except Exception:
332 pass
334 # Network I/O
335 try:
336 nio = psutil.net_io_counters()
337 if nio:
338 snap.net_sent_mb = nio.bytes_sent / (1024 ** 2)
339 snap.net_recv_mb = nio.bytes_recv / (1024 ** 2)
340 except Exception:
341 pass
343 def _collect_top_processes(self, snap: SystemSnapshot) -> None:
344 """Populate snap.top_processes via psutil.process_iter (expensive).
346 WHY GATED: ``psutil.process_iter(['memory_percent'])`` on Windows
347 walks every PID and issues OpenProcess + GetProcessMemoryInfo
348 per process, all under the GIL. On a typical Windows box
349 (~150-300 PIDs) that's 200ms-2s of GIL-held CPU per call — which,
350 fired every MONITOR_INTERVAL seconds, periodically stalls WAMP
351 heartbeats, the chat hot path, and daemon yield gates. py-spy
352 thread dumps caught exactly this loop active+gil. The data is
353 only read by ``_suggest_optimizations`` when a threshold is
354 breached; emitting an empty list when nothing is hot costs
355 nothing downstream. Gate covers cpu/ram/swap/disk so any
356 breach (incl. 89.5% RAM situations) still gets the per-process
357 detail it needs.
358 """
359 psutil = _try_import_psutil()
360 if psutil is None:
361 return
362 if not (snap.cpu_percent > CPU_HIGH_THRESHOLD
363 or snap.ram_percent > RAM_HIGH_THRESHOLD
364 or snap.swap_percent > SWAP_HIGH_THRESHOLD
365 or snap.disk_usage_percent > DISK_HIGH_THRESHOLD):
366 return
367 try:
368 procs = []
369 for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_percent']):
370 info = proc.info
371 if info.get('cpu_percent', 0) > 0.1:
372 procs.append({
373 'pid': info['pid'],
374 'name': info.get('name', ''),
375 'cpu_percent': round(info.get('cpu_percent', 0), 1),
376 'mem_percent': round(info.get('memory_percent', 0), 1),
377 })
378 procs.sort(key=lambda p: p['cpu_percent'], reverse=True)
379 snap.top_processes = procs[:10]
380 except Exception:
381 pass
383 def _collect_gpu(self, snap: SystemSnapshot) -> None:
384 """Populate snap GPU fields via vram_manager (single source of truth)."""
385 gpu_info = _try_detect_gpu()
386 if gpu_info.get('cuda_available'):
387 snap.gpu_mem_total_gb = gpu_info.get('total_gb', 0)
388 snap.gpu_mem_used_gb = snap.gpu_mem_total_gb - gpu_info.get('free_gb', 0)
389 if snap.gpu_mem_total_gb > 0:
390 snap.gpu_util_percent = (snap.gpu_mem_used_gb / snap.gpu_mem_total_gb) * 100
392 def _store_snapshot(self, snap: SystemSnapshot) -> None:
393 """Append to history deque + update _last_snapshot under lock."""
394 with self._lock:
395 self._snapshots.append(snap)
396 self._last_snapshot = snap
398 def snapshot(self) -> SystemSnapshot:
399 """Capture current system state. Returns a SystemSnapshot.
401 Uses psutil if available; falls back to minimal OS-level data.
402 GPU info via vram_manager (single source of truth).
404 Behavior preserved from pre-refactor (2026-05-01 monitor
405 unification): aggregates + per-process top-N (when threshold
406 breached) + GPU + history append + _last_snapshot update +
407 return snap. Body factored into ``_collect_*`` helpers so the
408 cross-module ``get_latest_snapshot`` accessor can reuse the
409 cheap aggregate path without paying the per-process GIL cost.
410 Existing callers (``_monitor_loop``, ``_emit_stats``, the
411 ``/optimizer/snapshot`` route, the test suite mocks) see the
412 identical SystemSnapshot they always have.
413 """
414 snap = SystemSnapshot(
415 timestamp=time.time(),
416 platform_name=self._platform,
417 )
418 self._collect_aggregates(snap)
419 self._collect_top_processes(snap)
420 self._collect_gpu(snap)
421 self._store_snapshot(snap)
422 return snap
424 def get_latest_snapshot(self,
425 max_age_s: float = 10.0,
426 include_per_process: bool = False
427 ) -> Optional[SystemSnapshot]:
428 """Cached system snapshot for cross-module readers.
430 Primary consumer: ``ResourceGovernor._monitor_loop`` (5s cadence)
431 which historically polled psutil itself, duplicating this
432 module's polls. Returning a cached snapshot eliminates the
433 parallel psutil call site (root cause logged 2026-05-01:
434 py-spy caught both ``compute_optimizer_monitor`` AND
435 ``ResourceGovernor-Monitor`` active+gil simultaneously, both
436 in psutil cpu/memory paths).
438 Args:
439 max_age_s: return cached snap if it's younger than this.
440 Pass 0 to always force a fresh read. Default 10s — well
441 under ResourceGovernor's IDLE_THRESHOLD_SECONDS (120s)
442 buffer for ACTIVE→IDLE transitions, so worst-case stale
443 data can't cause a perceptible mode-transition lag.
444 include_per_process: if True, run the expensive
445 ``process_iter`` walk during a refresh. Default False —
446 cross-module readers like ResourceGovernor only need
447 aggregates and would otherwise force a per-process walk
448 on every 5s tick under sustained pressure.
450 Returns:
451 The most-recent SystemSnapshot. Never None in steady state
452 (a fresh snapshot is built when cache is empty). Returns
453 None only if snapshot construction itself raises (defensive
454 — caller should fall back to its own data source).
455 """
456 # Lock-free read of the atomic ref — fast path when cache hot.
457 last = self._last_snapshot
458 if last is not None and (time.time() - last.timestamp) < max_age_s:
459 return last
460 # Stale or missing — build a fresh snap. Aggregates are
461 # always cheap; top-processes only if caller asked.
462 try:
463 snap = SystemSnapshot(
464 timestamp=time.time(),
465 platform_name=self._platform,
466 )
467 self._collect_aggregates(snap)
468 if include_per_process:
469 self._collect_top_processes(snap)
470 self._collect_gpu(snap)
471 self._store_snapshot(snap)
472 return snap
473 except Exception as e:
474 logger.debug("get_latest_snapshot refresh failed: %s", e)
475 return None
477 # ── Threshold Detection ───────────────────────────────────────
479 def _check_thresholds(self, snap: SystemSnapshot) -> List[str]:
480 """Detect which thresholds are breached. Returns list of breach names."""
481 breaches = []
482 if snap.cpu_percent > CPU_HIGH_THRESHOLD:
483 breaches.append('cpu_high')
484 if snap.ram_percent > RAM_HIGH_THRESHOLD:
485 breaches.append('ram_high')
486 if snap.swap_percent > SWAP_HIGH_THRESHOLD:
487 breaches.append('swap_high')
488 if snap.disk_usage_percent > DISK_HIGH_THRESHOLD:
489 breaches.append('disk_high')
490 return breaches
492 # ── Suggest Optimizations (heuristic) ─────────────────────────
494 def _suggest_optimizations(self, snap: SystemSnapshot) -> List[OptimizationAction]:
495 """Generate non-destructive optimization suggestions based on snapshot."""
496 actions = []
497 now = time.time()
499 # CPU high: suggest lowering heavy non-essential processes
500 if snap.cpu_percent > CPU_HIGH_THRESHOLD:
501 if self._cooldown_ok(ActionType.PROCESS_SUGGEST, now, 120):
502 hogs = [p for p in snap.top_processes
503 if p.get('cpu_percent', 0) > 20 and p.get('name', '') != '']
504 for proc in hogs[:3]:
505 actions.append(OptimizationAction(
506 action_type=ActionType.PROCESS_SUGGEST,
507 target=proc.get('name', 'unknown'),
508 params={'pid': proc.get('pid', 0), 'cpu_percent': proc['cpu_percent']},
509 impact_estimate=f"Lower priority of {proc['name']} "
510 f"(using {proc['cpu_percent']}% CPU)",
511 ))
513 # RAM high: clean temp/cache
514 if snap.ram_percent > RAM_HIGH_THRESHOLD:
515 if self._cooldown_ok(ActionType.CACHE_CLEAN, now, 300):
516 actions.append(OptimizationAction(
517 action_type=ActionType.CACHE_CLEAN,
518 target='system_temp',
519 params={'max_age_seconds': TEMP_CLEAN_MAX_AGE},
520 impact_estimate='Free RAM by clearing stale temp files',
521 ))
523 # Swap high: suggest reducing memory pressure
524 if snap.swap_percent > SWAP_HIGH_THRESHOLD:
525 if self._cooldown_ok(ActionType.SWAP_MANAGE, now, 600):
526 actions.append(OptimizationAction(
527 action_type=ActionType.SWAP_MANAGE,
528 target='swap_pressure',
529 params={'swap_percent': snap.swap_percent},
530 impact_estimate='Reduce swap usage by freeing cached memory',
531 ))
533 # Disk high: clean temp directories
534 if snap.disk_usage_percent > DISK_HIGH_THRESHOLD:
535 if self._cooldown_ok(ActionType.CACHE_CLEAN, now, 600):
536 actions.append(OptimizationAction(
537 action_type=ActionType.CACHE_CLEAN,
538 target='disk_temp',
539 params={'max_age_seconds': TEMP_CLEAN_MAX_AGE},
540 impact_estimate='Free disk space by clearing stale temp files',
541 ))
543 with self._lock:
544 self._suggestions_made += len(actions)
546 return actions
548 def _cooldown_ok(self, action_type: ActionType, now: float,
549 min_interval: float) -> bool:
550 """Check if enough time has passed since last action of this type."""
551 key = action_type.value
552 last = self._cooldowns.get(key, 0)
553 return (now - last) >= min_interval
555 # ── Apply Optimizations (safe, non-destructive) ───────────────
557 def _apply_optimization(self, action: OptimizationAction) -> None:
558 """Execute an optimization action. Only non-destructive operations."""
559 action.timestamp = time.time()
561 try:
562 if action.action_type == ActionType.PROCESS_SUGGEST:
563 action.result = self._apply_priority_adjust(action)
564 elif action.action_type == ActionType.CACHE_CLEAN:
565 action.result = self._apply_cache_clean(action)
566 elif action.action_type == ActionType.SWAP_MANAGE:
567 action.result = self._apply_swap_manage(action)
568 elif action.action_type == ActionType.POWER_TUNE:
569 action.result = self._apply_power_tune(action)
570 elif action.action_type == ActionType.NETWORK_TUNE:
571 action.result = self._apply_network_tune(action)
572 else:
573 action.result = 'no handler'
574 return
576 action.applied = True
577 with self._lock:
578 self._optimizations_applied += 1
579 self._cooldowns[action.action_type.value] = time.time()
580 self._history.append(action)
582 logger.info("Optimization applied: %s on %s -> %s",
583 action.action_type.value, action.target, action.result)
584 _emit('system.optimization.applied', action.to_dict())
586 except Exception as e:
587 action.result = f'error: {e}'
588 with self._lock:
589 self._history.append(action)
590 logger.debug("Optimization failed: %s on %s: %s",
591 action.action_type.value, action.target, e)
    def _apply_priority_adjust(self, action: OptimizationAction) -> str:
        """Lower priority of a CPU-hogging process (NEVER kill).

        Expects ``action.params['pid']``. Returns a human-readable
        outcome string (stored in ``action.result`` by the caller).
        """
        psutil = _try_import_psutil()
        if psutil is None:
            return 'psutil not available'

        pid = action.params.get('pid', 0)
        if not pid:
            return 'no pid'

        try:
            proc = psutil.Process(pid)
            name = proc.name()

            # Safety: never touch system-critical processes
            protected = {'systemd', 'init', 'kernel', 'csrss.exe', 'svchost.exe',
                         'wininit.exe', 'services.exe', 'lsass.exe', 'smss.exe',
                         'System', 'explorer.exe', 'dwm.exe', 'loginwindow',
                         'launchd', 'WindowServer'}
            if name in protected:
                return f'skipped (protected: {name})'

            # Lower priority by one notch — never idle/realtime
            if self._platform == 'Windows':
                current = proc.nice()
                # Windows priority classes: IDLE=64, BELOW_NORMAL=16384,
                # NORMAL=32, ABOVE_NORMAL=32768, HIGH=128, REALTIME=256
                if current in (128, 256, 32768):  # HIGH, REALTIME, ABOVE_NORMAL
                    proc.nice(32)  # NORMAL_PRIORITY_CLASS
                    return f'lowered {name} (pid={pid}) to NORMAL priority'
            else:
                # POSIX: raise the nice value to 10 (lower priority) — but
                # never make an already-nicer process less nice.
                current_nice = proc.nice()
                if current_nice < 10:
                    proc.nice(10)
                    return f'set {name} (pid={pid}) nice=10'

            return f'{name} already at acceptable priority'

        except psutil.NoSuchProcess:
            return f'process {pid} no longer exists'
        except psutil.AccessDenied:
            return f'access denied for pid {pid}'
        except Exception as e:
            return f'error: {e}'
    def _apply_cache_clean(self, action: OptimizationAction) -> str:
        """Clean stale temp files. NEVER deletes user files.

        Scope is strictly the system temp directory; only entries older
        than ``action.params['max_age_seconds']`` are touched, and
        directories are removed only when already empty (``os.rmdir``
        refuses non-empty ones). Returns a summary string with the item
        count and MB freed.
        """
        max_age = action.params.get('max_age_seconds', TEMP_CLEAN_MAX_AGE)
        now = time.time()
        freed = 0    # bytes reclaimed (files only)
        cleaned = 0  # entries removed (files + empty dirs)

        # System temp directory only
        temp_dir = tempfile.gettempdir()
        try:
            for entry in os.scandir(temp_dir):
                try:
                    # Only delete old files (not directories with content)
                    stat = entry.stat(follow_symlinks=False)
                    age = now - stat.st_mtime
                    if age < max_age:
                        continue

                    if entry.is_file(follow_symlinks=False):
                        size = stat.st_size
                        os.unlink(entry.path)
                        freed += size
                        cleaned += 1
                    elif entry.is_dir(follow_symlinks=False):
                        # Only empty directories
                        try:
                            os.rmdir(entry.path)
                            cleaned += 1
                        except OSError:
                            pass  # Not empty — skip
                except (PermissionError, OSError):
                    # Locked / in-use entries are expected — skip quietly.
                    continue
        except Exception as e:
            logger.debug("Cache clean scan error: %s", e)

        with self._lock:
            self._cache_bytes_freed += freed

        freed_mb = freed / (1024 * 1024)
        return f'cleaned {cleaned} items, freed {freed_mb:.1f} MB from {temp_dir}'
679 def _apply_swap_manage(self, action: OptimizationAction) -> str:
680 """Reduce swap pressure by dropping filesystem caches (Linux only)."""
681 if self._platform == 'Linux':
682 try:
683 # Drop page cache only (safest option: value 1)
684 # Requires root — will fail gracefully if not root
685 with open('/proc/sys/vm/drop_caches', 'w') as f:
686 f.write('1')
687 return 'dropped page cache (Linux)'
688 except PermissionError:
689 return 'drop_caches requires root — skipped'
690 except Exception as e:
691 return f'swap manage error: {e}'
692 elif self._platform == 'Windows':
693 # On Windows, cleaning temp files is the safest approach
694 return self._apply_cache_clean(OptimizationAction(
695 action_type=ActionType.CACHE_CLEAN,
696 target='swap_relief',
697 params={'max_age_seconds': 3600}, # More aggressive: 1h
698 ))
699 return 'swap management not available on this platform'
701 def _apply_power_tune(self, action: OptimizationAction) -> str:
702 """Adjust power profile. Invoked by hive exploration when needed."""
703 profile = action.params.get('profile', 'balanced')
705 if self._platform == 'Windows':
706 # Windows power scheme GUIDs
707 schemes = {
708 'powersave': '381b4222-f694-41f0-9685-ff5bb260df2e', # Balanced (most compatible)
709 'balanced': '381b4222-f694-41f0-9685-ff5bb260df2e',
710 'performance': '8c5e7fda-e8bf-4a96-9a85-a6e23a8c635c',
711 }
712 guid = schemes.get(profile, schemes['balanced'])
713 try:
714 import subprocess
715 from core.subprocess_safe import hidden_popen_kwargs
716 subprocess.run(
717 ['powercfg', '/setactive', guid],
718 capture_output=True, timeout=10,
719 **hidden_popen_kwargs(),
720 )
721 return f'set power scheme to {profile}'
722 except Exception as e:
723 return f'power tune error: {e}'
725 elif self._platform == 'Linux':
726 # cpufreq governor
727 governors = {
728 'powersave': 'powersave',
729 'balanced': 'schedutil',
730 'performance': 'performance',
731 }
732 gov = governors.get(profile, 'schedutil')
733 try:
734 # Apply to all CPUs
735 cpu_count = os.cpu_count() or 1
736 applied = 0
737 for i in range(cpu_count):
738 path = f'/sys/devices/system/cpu/cpu{i}/cpufreq/scaling_governor'
739 try:
740 with open(path, 'w') as f:
741 f.write(gov)
742 applied += 1
743 except (PermissionError, FileNotFoundError):
744 pass
745 return f'set governor={gov} on {applied}/{cpu_count} CPUs'
746 except Exception as e:
747 return f'power tune error: {e}'
749 return 'power tuning not available on this platform'
751 def _apply_network_tune(self, action: OptimizationAction) -> str:
752 """Network tuning — currently informational only."""
753 return 'network tuning noted (advisory only)'
755 # ── Hive Exploration (proactive, random-interval) ─────────────
757 def _explore_hive_stream(self) -> None:
758 """Check hive action stream for optimization goals from other nodes.
760 At random intervals (5-30 min), peek at the hive to see if:
761 - Nodes in the region are overloaded -> this node can accept more tasks
762 - A benchmark is running -> minimize background work
763 - New optimization strategies discovered -> apply locally
764 - Resource bottlenecks detected network-wide -> contribute stats
765 """
766 snap = self._last_snapshot
767 if snap is None:
768 snap = self.snapshot()
770 # Contribute anonymized stats to hive
771 self._contribute_to_hive(snap)
773 # Check for hive optimization goals
774 goals = self._fetch_hive_goals()
775 if not goals:
776 return
778 for goal in goals:
779 goal_type = goal.get('type', '')
781 if goal_type == 'region_overloaded':
782 # Other nodes struggling — if we have headroom, signal availability
783 if snap.cpu_percent < 50 and snap.ram_percent < 60:
784 _emit('system.hive.available', {
785 'cpu_headroom': round(100 - snap.cpu_percent, 1),
786 'ram_headroom': round(100 - snap.ram_percent, 1),
787 'goal_id': goal.get('id', ''),
788 })
789 logger.info("Hive: signaled availability for overloaded region")
791 elif goal_type == 'benchmark_running':
792 # Hive is benchmarking — reduce our background work
793 action = OptimizationAction(
794 action_type=ActionType.PRIORITY_ADJUST,
795 target='self_throttle',
796 params={'reason': 'hive_benchmark'},
797 impact_estimate='Reduce background work during hive benchmark',
798 )
799 self._apply_optimization(action)
801 elif goal_type == 'power_save_request':
802 # Network-wide power saving (e.g., peak grid hours)
803 action = OptimizationAction(
804 action_type=ActionType.POWER_TUNE,
805 target='power_profile',
806 params={'profile': 'powersave'},
807 impact_estimate='Switch to powersave for network-wide efficiency',
808 )
809 self._apply_optimization(action)
811 def _fetch_hive_goals(self) -> List[Dict]:
812 """Fetch optimization goals from the hive (best-effort).
814 Queries the goal manager for active system-optimization goals.
815 Returns empty list if hive is not available.
816 """
817 try:
818 from integrations.agent_engine.goal_manager import get_goal_manager
819 gm = get_goal_manager()
820 if gm is None:
821 return []
822 # Look for active optimization goals
823 goals = []
824 all_goals = getattr(gm, 'get_active_goals', lambda: [])()
825 for g in all_goals:
826 tags = getattr(g, 'tags', []) or []
827 if 'system_optimization' in tags or 'compute_optimization' in tags:
828 goals.append({
829 'id': getattr(g, 'id', ''),
830 'type': getattr(g, 'goal_type', ''),
831 'params': getattr(g, 'params', {}),
832 })
833 return goals
834 except Exception:
835 return []
837 # ── EventBus & Federation ─────────────────────────────────────
839 def _emit_stats(self, snap: SystemSnapshot) -> None:
840 """Emit system health snapshot to EventBus for dashboard consumption."""
841 data = snap.to_dict()
842 data['health_score'] = self.get_health_score(snap)
843 _emit('system.health.snapshot', data)
845 def _contribute_to_hive(self, snap: SystemSnapshot) -> None:
846 """Share anonymized system stats via federation delta channel.
848 Stats are aggregated, not personally identifiable:
849 - CPU/RAM/disk utilization bands (not exact values)
850 - Optimization counts
851 - Platform type
852 No IP addresses, hostnames, or process names are shared.
853 """
854 try:
855 from integrations.agent_engine.federated_aggregator import get_aggregator
856 agg = get_aggregator()
857 if agg is None:
858 return
860 # Anonymized: bucket utilization into bands (low/medium/high/critical)
861 def _band(pct: float) -> str:
862 if pct < 30:
863 return 'low'
864 if pct < 60:
865 return 'medium'
866 if pct < 85:
867 return 'high'
868 return 'critical'
870 delta = {
871 'channel': 'compute_health',
872 'cpu_band': _band(snap.cpu_percent),
873 'ram_band': _band(snap.ram_percent),
874 'disk_band': _band(snap.disk_usage_percent),
875 'optimizations_applied': self._optimizations_applied,
876 'platform': self._platform,
877 'has_gpu': snap.gpu_mem_total_gb > 0,
878 'timestamp': time.time(),
879 }
881 # Use broadcast_delta if available
882 broadcast = getattr(agg, 'broadcast_delta', None)
883 if callable(broadcast):
884 broadcast(delta)
885 except Exception:
886 pass
888 # ── Public API ────────────────────────────────────────────────
890 def get_stats(self) -> Dict:
891 """Return optimization statistics and recent history."""
892 with self._lock:
893 history = [a.to_dict() for a in self._history]
894 return {
895 'running': self._running,
896 'optimizations_applied': self._optimizations_applied,
897 'suggestions_made': self._suggestions_made,
898 'hive_explorations': self._hive_explorations,
899 'cache_bytes_freed': self._cache_bytes_freed,
900 'cache_mb_freed': round(self._cache_bytes_freed / (1024 * 1024), 2),
901 'history_count': len(history),
902 'recent_history': history[-20:],
903 'platform': self._platform,
904 }
906 def get_health_score(self, snap: Optional[SystemSnapshot] = None) -> float:
907 """Compute overall system health score (0.0 = critical, 1.0 = healthy).
909 Weighted average of resource utilization:
910 - CPU: 30% weight
911 - RAM: 30% weight
912 - Swap: 20% weight
913 - Disk: 20% weight
915 Lower utilization = higher score. Score inverts percentage to health.
916 """
917 if snap is None:
918 snap = self._last_snapshot
919 if snap is None:
920 return 1.0 # No data = assume healthy
922 # Convert utilization % to health (100% used = 0.0, 0% used = 1.0)
923 cpu_health = max(0.0, 1.0 - snap.cpu_percent / 100.0)
924 ram_health = max(0.0, 1.0 - snap.ram_percent / 100.0)
925 swap_health = max(0.0, 1.0 - snap.swap_percent / 100.0)
926 disk_health = max(0.0, 1.0 - snap.disk_usage_percent / 100.0)
928 score = (cpu_health * 0.30
929 + ram_health * 0.30
930 + swap_health * 0.20
931 + disk_health * 0.20)
933 return round(max(0.0, min(1.0, score)), 3)
    def trigger_optimization(self) -> Dict:
        """Manually trigger an optimization check. Returns actions taken.

        Feeds attribution chain: health baseline → actions → final health score.

        Returns:
            Dict with keys 'snapshot' (the baseline, pre-optimization snapshot),
            'health_score' (computed from the post-optimization snapshot), and
            'actions' (list of applied-action dicts).
        """
        # Attribution: track this optimization cycle.
        # Every attribution call below is wrapped in try/except so a missing
        # or failing attribution module never blocks the optimization itself.
        attribution_id = None
        try:
            from integrations.agent_engine.agent_attribution import (
                begin_action, record_step, complete_action,
            )
            attribution_id = begin_action(
                agent_id='compute_optimizer',
                action_type='optimization_cycle',
                expected_outcome={'health_score_delta': 0.05},
                acceptance_criteria=['health_score > baseline'],
            )
        except Exception:
            pass

        # Capture the pre-optimization baseline used for the attribution delta.
        snap = self.snapshot()
        baseline_health = self.get_health_score(snap)
        if attribution_id:
            try:
                record_step(attribution_id, 'baseline_snapshot',
                            state={'cpu_pct': snap.cpu_percent, 'ram_pct': snap.ram_percent,
                                   'health_score': baseline_health},
                            decision='assess', confidence=1.0)
            except Exception:
                pass

        # Apply every suggested optimization, recording each as an attribution step.
        suggestions = self._suggest_optimizations(snap)
        results = []
        for action in suggestions:
            self._apply_optimization(action)
            results.append(action.to_dict())
            if attribution_id:
                try:
                    record_step(attribution_id, f'applied:{action.action_type.value}',
                                state={'target': action.target,
                                       'result': action.to_dict().get('result', '')},
                                decision=action.action_type.value, confidence=0.7)
                except Exception:
                    pass

        # If no threshold breached, still do a cache clean as maintenance
        if not results:
            action = OptimizationAction(
                action_type=ActionType.CACHE_CLEAN,
                target='maintenance',
                params={'max_age_seconds': TEMP_CLEAN_MAX_AGE},
                impact_estimate='Routine maintenance cache clean',
            )
            self._apply_optimization(action)
            results.append(action.to_dict())

        # Re-measure after applying actions so the delta reflects their effect.
        final_snap = self.snapshot()
        final_health = self.get_health_score(final_snap)

        # Attribution: complete with delta outcome
        if attribution_id:
            try:
                complete_action(attribution_id, outcome={
                    'status': 'completed',
                    'baseline_health_score': baseline_health,
                    'final_health_score': final_health,
                    'health_score_delta': round(final_health - baseline_health, 4),
                    'actions_applied': len(results),
                })
            except Exception:
                pass

        # NOTE(review): 'snapshot' is the pre-optimization snapshot while
        # 'health_score' is post-optimization — confirm this mix is intended
        # by dashboard consumers (final_snap itself is not returned).
        return {
            'snapshot': snap.to_dict(),
            'health_score': final_health,
            'actions': results,
        }
1014# =====================================================================
1015# Singleton
1016# =====================================================================
_optimizer: Optional[ComputeOptimizer] = None
_optimizer_lock = threading.Lock()


def get_optimizer() -> ComputeOptimizer:
    """Return the process-wide ComputeOptimizer, creating it on first call."""
    global _optimizer
    # Fast path: already constructed, no locking needed.
    if _optimizer is not None:
        return _optimizer
    with _optimizer_lock:
        # Re-check under the lock: another thread may have won the race.
        if _optimizer is None:
            _optimizer = ComputeOptimizer()
    return _optimizer
1032# =====================================================================
1033# Flask Blueprint
1034# =====================================================================
def create_optimizer_blueprint():
    """Build the Flask Blueprint exposing the system optimizer REST API.

    Endpoints:
        GET  /api/system/health        - Current snapshot + health score
        GET  /api/system/optimizations - Recent optimizations applied
        POST /api/system/optimize      - Trigger manual optimization check

    Returns:
        Flask Blueprint instance, or None if Flask is not available.
    """
    try:
        from flask import Blueprint, jsonify, request
    except ImportError:
        logger.debug("Flask not available -- optimizer blueprint not created")
        return None

    bp = Blueprint('compute_optimizer', __name__, url_prefix='/api/system')

    # Route function names double as Flask endpoint names — keep them stable.
    @bp.route('/health', methods=['GET'])
    def system_health():
        opt = get_optimizer()
        current = opt.snapshot()
        return jsonify({
            'health_score': opt.get_health_score(current),
            'snapshot': current.to_dict(),
            'optimizations_applied': opt.get_stats()['optimizations_applied'],
        })

    @bp.route('/optimizations', methods=['GET'])
    def system_optimizations():
        stats = get_optimizer().get_stats()
        # Optional ?limit=N trims the returned history (default 20 entries).
        limit = request.args.get('limit', 20, type=int)
        stats['recent_history'] = stats['recent_history'][-limit:]
        return jsonify(stats)

    @bp.route('/optimize', methods=['POST'])
    def trigger_optimize():
        return jsonify(get_optimizer().trigger_optimization())

    return bp