Coverage for integrations/agent_engine/compute_mesh_service.py: 24.0% (246 statements)
"""
HART OS Compute Mesh Service — Privacy-Bounded Cross-Device Intelligence.

Same user's devices automatically discover each other and share compute.
Privacy boundary = user_id (Ed25519 keypair). Only YOUR devices can join
YOUR mesh. Different users NEVER share compute through this service.

Discovery:
    LAN → UDP beacon (port 6780) + device fingerprint
    WAN → STUN/TURN for NAT traversal, WireGuard tunnel
    Internet → WireGuard over public IP or relay

Task relay protocol:
    POST /mesh/infer   — Offload model inference
    POST /mesh/status  — Device health + available compute
    GET  /mesh/peers   — List paired devices
    POST /mesh/pair    — Initiate device pairing (challenge-response)
"""
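# Client-side sketch of the task relay protocol above (illustrative only;
# the peer address and payload values are assumptions, not repo fixtures):
#
#     from core.http_pool import pooled_post
#     result = pooled_post(
#         'http://10.99.0.2:6796/mesh/infer',
#         json={'model_type': 'llm', 'prompt': 'summarize this'},
#         timeout=120,
#     ).json()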
import hashlib
import logging
import os
import threading
import time
from typing import Any, Dict, List, Optional

from core.port_registry import get_port

logger = logging.getLogger('hevolve.compute_mesh')

# ═══════════════════════════════════════════════════════════════
# Compute Mesh Service
# ═══════════════════════════════════════════════════════════════

class MeshPeer:
    """Represents a paired device in the compute mesh."""

    def __init__(self, peer_id: str, address: str, public_key: str,
                 capabilities: Optional[dict] = None):
        self.peer_id = peer_id
        self.address = address
        self.public_key = public_key
        self.capabilities = capabilities or {}
        self.last_seen = time.time()
        self.latency_ms: Optional[int] = None
        self.available_compute: float = 0.0  # 0.0 to 1.0
        self.loaded_models: List[str] = []

    def to_dict(self) -> dict:
        return {
            'peer_id': self.peer_id,
            'address': self.address,
            'public_key': self.public_key[:16] + '...',
            'capabilities': self.capabilities,
            'last_seen': self.last_seen,
            'latency_ms': self.latency_ms,
            'available_compute': self.available_compute,
            'loaded_models': self.loaded_models,
            'age_seconds': int(time.time() - self.last_seen),
        }

    def is_stale(self, max_age: int = 300) -> bool:
        """Peer is stale if not seen for max_age seconds."""
        return (time.time() - self.last_seen) > max_age
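
# Usage sketch for MeshPeer (values are illustrative assumptions):
#
#     peer = MeshPeer('a1b2c3d4', '10.99.0.2', 'wg-pubkey-base64')
#     peer.is_stale()         # False: just constructed
#     peer.last_seen -= 600   # simulate 10 minutes of silence
#     peer.is_stale()         # True: past the 300s default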

class ComputeMeshService:
    """Same-user device compute aggregation.

    get_available_peers() and score() provide the interface used by
    CodingAgentOrchestrator for trust-based hive offload of coding tasks.
    """

    def __init__(
        self,
        task_relay_port: int = 6796,
        wg_port: int = 6795,
        max_offload_percent: int = 50,
        allow_wan: bool = True,
        stun_server: str = 'stun:stun.l.google.com:19302',
        mesh_interface: str = 'hart-mesh0',
        mesh_subnet: str = '10.99.0.0/16',
        auto_accept: bool = True,
    ):
        self.task_relay_port = task_relay_port
        self.wg_port = wg_port
        self.max_offload_percent = max_offload_percent
        self.allow_wan = allow_wan
        self.stun_server = stun_server
        self.mesh_interface = mesh_interface
        self.mesh_subnet = mesh_subnet
        self.auto_accept = auto_accept

        self._peers: Dict[str, MeshPeer] = {}
        self._lock = threading.Lock()
        self._running = False
        self._device_id: Optional[str] = None
        self._mesh_ip: Optional[str] = None
        # Ensure the attribute always exists, even if _load_identity()
        # fails before reaching the node-key branch.
        self._node_public_key: Optional[bytes] = None

        # Load device identity
        self._load_identity()

        logger.info(
            f"ComputeMeshService initialized: relay_port={task_relay_port}, "
            f"wg_port={wg_port}, max_offload={max_offload_percent}%"
        )
    def _load_identity(self):
        """Load mesh device identity from filesystem."""
        data_dir = os.environ.get('HEVOLVE_DATA_DIR', '/var/lib/hart')
        key_dir = os.path.join(data_dir, 'mesh', 'keys')

        try:
            mesh_ip_file = os.path.join(key_dir, 'mesh_ip')
            if os.path.exists(mesh_ip_file):
                with open(mesh_ip_file) as f:
                    self._mesh_ip = f.read().strip()
                logger.info(f"Mesh IP: {self._mesh_ip}")

            pub_key_file = os.path.join(key_dir, 'public.key')
            if os.path.exists(pub_key_file):
                with open(pub_key_file) as f:
                    pub_key = f.read().strip()
                self._device_id = hashlib.sha256(pub_key.encode()).hexdigest()[:16]
                logger.info(f"Device ID: {self._device_id}")

            # Load node identity for user verification
            node_key_file = os.path.join(data_dir, 'node_public.key')
            if os.path.exists(node_key_file):
                with open(node_key_file, 'rb') as f:
                    self._node_public_key = f.read()
        except Exception as e:
            logger.warning(f"Could not load mesh identity: {e}")
    # ─── Peer Discovery ──────────────────────────────────────

    def discover_peers(self) -> List[Dict[str, Any]]:
        """Find same-user devices via discovery service."""
        from core.http_pool import pooled_get
        from urllib.parse import urlparse

        discovered = []

        # Query local discovery service for peers
        try:
            resp = pooled_get(f'http://localhost:{get_port("backend")}/api/social/peers', timeout=5)
            if resp.status_code == 200:
                peers = resp.json().get('peers', [])
                for peer in peers:
                    # Only mesh with same-user devices
                    # In production, verify user_id via Ed25519 signature
                    peer_url = peer.get('url', '')
                    peer_address = ''
                    if peer_url:
                        try:
                            peer_address = urlparse(peer_url).hostname or ''
                        except Exception:
                            pass
                    peer_id = peer.get('node_id', '')

                    if peer_address and peer_id:
                        # Check if this peer supports mesh
                        try:
                            mesh_resp = pooled_get(
                                f'http://{peer_address}:{self.task_relay_port}/mesh/status',
                                timeout=3,
                            )
                            if mesh_resp.status_code == 200:
                                mesh_data = mesh_resp.json()
                                with self._lock:
                                    if peer_id not in self._peers:
                                        self._peers[peer_id] = MeshPeer(
                                            peer_id=peer_id,
                                            address=peer_address,
                                            public_key=peer.get('public_key', ''),
                                            capabilities=mesh_data.get('capabilities', {}),
                                        )
                                    else:
                                        self._peers[peer_id].last_seen = time.time()
                                        self._peers[peer_id].capabilities = mesh_data.get('capabilities', {})
                                        self._peers[peer_id].available_compute = mesh_data.get('available_compute', 0)
                                        self._peers[peer_id].loaded_models = mesh_data.get('loaded_models', [])

                                    # Snapshot while still holding the lock so the
                                    # health loop cannot delete the peer mid-read.
                                    discovered.append(self._peers[peer_id].to_dict())
                        except Exception:
                            pass  # Peer doesn't support mesh
        except Exception as e:
            logger.debug(f"Peer discovery error: {e}")

        return discovered
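
    # Discovery sketch (assumes the backend discovery service is reachable):
    #
    #     mesh = get_compute_mesh()
    #     for peer in mesh.discover_peers():
    #         print(peer['peer_id'], peer['available_compute'])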
    # ─── Task Offload ────────────────────────────────────────

    def offload_inference(
        self,
        peer_id: str,
        model_type: str,
        prompt: str,
        options: Optional[dict] = None,
    ) -> Dict[str, Any]:
        """Send inference request to a mesh peer — PeerLink first, HTTP fallback."""
        from core.http_pool import pooled_post

        with self._lock:
            peer = self._peers.get(peer_id)

        if not peer:
            return {'error': f'Unknown peer: {peer_id}'}

        if peer.is_stale():
            age = int(time.time() - peer.last_seen)
            return {'error': f'Peer {peer_id} is stale (last seen {age}s ago)'}

        payload = {
            'model_type': model_type,
            'prompt': prompt,
            'options': options or {},
            'source_device': self._device_id,
        }

        # Try PeerLink first (encrypted for cross-user, plain for same-user)
        try:
            from core.peer_link.link_manager import get_link_manager
            link = get_link_manager().get_link(peer_id)
            if link:
                result = link.send('compute', payload,
                                   wait_response=True,
                                   timeout=(options or {}).get('timeout', 120))
                if result and 'error' not in result:
                    result['offloaded_to'] = peer_id
                    result['peer_address'] = peer.address
                    result['transport'] = 'peerlink'
                    return result
        except Exception:
            pass

        # HTTP fallback
        try:
            resp = pooled_post(
                f'http://{peer.address}:{self.task_relay_port}/mesh/infer',
                json=payload,
                timeout=(options or {}).get('timeout', 120),
            )

            if resp.status_code == 200:
                result = resp.json()
                result['offloaded_to'] = peer_id
                result['peer_address'] = peer.address
                return result
            else:
                return {'error': f'Peer returned status {resp.status_code}'}
        except Exception as e:
            return {'error': f'Offload to {peer_id} failed: {str(e)}'}
    def offload_to_best_peer(
        self, model_type: str, prompt: str, options: Optional[dict] = None
    ) -> Dict[str, Any]:
        """Offload inference to the best available mesh peer."""
        with self._lock:
            candidates = [
                p for p in self._peers.values()
                if not p.is_stale() and p.available_compute > 0.1
            ]

        if not candidates:
            return {'error': 'No mesh peers available for offload'}

        # Sort by: model already loaded > available compute > lowest latency
        def score(peer):
            model_bonus = 10 if model_type in peer.loaded_models else 0
            return model_bonus + peer.available_compute * 5 - (peer.latency_ms or 500) / 100
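        # Worked example with illustrative numbers: a peer that already has
        # the model (+10), 50% free compute (+2.5), and 200ms latency (-2.0)
        # scores 10.5; a cold peer at 90% compute and 50ms latency scores
        # 0 + 4.5 - 0.5 = 4.0, so the warm peer wins despite less headroom.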
        candidates.sort(key=score, reverse=True)
        best = candidates[0]

        logger.info(
            f"Offloading {model_type} to peer {best.peer_id} "
            f"(compute={best.available_compute:.1%}, models={best.loaded_models})"
        )

        return self.offload_inference(best.peer_id, model_type, prompt, options)
    def get_available_peers(self) -> List[Dict[str, Any]]:
        """Return non-stale peers as dicts (used by CodingAgentOrchestrator)."""
        with self._lock:
            return [p.to_dict() for p in self._peers.values() if not p.is_stale()]

    def score(self, peer: Dict) -> float:
        """Score a peer dict by compute availability and latency."""
        compute = peer.get('available_compute', 0)
        latency = peer.get('latency_ms') or 500
        return compute * 5 - latency / 100
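
    # Orchestrator-side sketch (peer dicts come from to_dict(); values
    # illustrative):
    #
    #     peers = mesh.get_available_peers()
    #     if peers:
    #         best = max(peers, key=mesh.score)
    #         # e.g. compute=0.8, latency_ms=40 scores 0.8*5 - 40/100 = 3.6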
    # ─── Device Pairing ──────────────────────────────────────

    def pair_device(self, peer_address: str) -> Dict[str, Any]:
        """Initiate pairing with a new device."""
        from core.http_pool import pooled_post

        try:
            # Send pairing challenge
            challenge = hashlib.sha256(os.urandom(32)).hexdigest()
            resp = pooled_post(
                f'http://{peer_address}:{self.task_relay_port}/mesh/pair',
                json={
                    'action': 'challenge',
                    'challenge': challenge,
                    'device_id': self._device_id,
                    'mesh_ip': self._mesh_ip,
                },
                timeout=10,
            )

            if resp.status_code == 200:
                result = resp.json()
                if result.get('accepted'):
                    peer_id = result.get('device_id', peer_address)
                    with self._lock:
                        self._peers[peer_id] = MeshPeer(
                            peer_id=peer_id,
                            address=peer_address,
                            public_key=result.get('public_key', ''),
                            capabilities=result.get('capabilities', {}),
                        )
                    logger.info(f"Paired with device: {peer_id} at {peer_address}")
                    return {'status': 'paired', 'peer_id': peer_id}
                else:
                    return {'status': 'rejected', 'reason': result.get('reason', 'unknown')}
            else:
                return {'error': f'Pairing failed: HTTP {resp.status_code}'}
        except Exception as e:
            return {'error': f'Pairing failed: {str(e)}'}
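
    # Pairing sketch (the address is an illustrative LAN host, not a fixture):
    #
    #     result = get_compute_mesh().pair_device('192.168.1.42')
    #     if result.get('status') == 'paired':
    #         print('paired with', result['peer_id'])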
    # ─── Status ──────────────────────────────────────────────

    def get_mesh_status(self) -> Dict[str, Any]:
        """Get aggregate compute inventory across all paired devices."""
        with self._lock:
            active_peers = [p for p in self._peers.values() if not p.is_stale()]

        # Get local capabilities
        local_caps = self._get_local_capabilities()

        return {
            'status': 'running' if self._running else 'stopped',
            'device_id': self._device_id,
            'mesh_ip': self._mesh_ip,
            'peer_count': len(active_peers),
            'total_peers_known': len(self._peers),
            'local': local_caps,
            'peers': [p.to_dict() for p in active_peers],
            'aggregate': {
                'total_compute': local_caps.get('available_compute', 0) + sum(
                    p.available_compute for p in active_peers
                ),
                'total_models': list(set(
                    local_caps.get('loaded_models', []) +
                    [m for p in active_peers for m in p.loaded_models]
                )),
            },
            'max_offload_percent': self.max_offload_percent,
            'allow_wan': self.allow_wan,
        }
    def _get_local_capabilities(self) -> dict:
        """Detect local compute capabilities."""
        caps = {
            'cpu_count': os.cpu_count() or 1,
            'available_compute': 1.0 - (self.max_offload_percent / 100.0),
            'loaded_models': [],
        }

        # Detect GPU (delegate to VRAMManager — single source of truth)
        try:
            from integrations.service_tools.vram_manager import vram_manager
            gpu_info = vram_manager.detect_gpu()
            if gpu_info.get('cuda_available'):
                caps['gpu'] = f"{gpu_info.get('name', 'GPU')}, {gpu_info.get('total_gb', 0)}GB"
            else:
                caps['gpu'] = gpu_info.get('name') or None
        except Exception:
            pass

        # Detect RAM (cross-platform: psutil first, /proc/meminfo fallback)
        try:
            import psutil
            mem = psutil.virtual_memory()
            caps['ram_gb'] = round(mem.total / (1024 ** 3), 1)
        except ImportError:
            try:
                with open('/proc/meminfo') as f:
                    for line in f:
                        if line.startswith('MemTotal:'):
                            kb = int(line.split()[1])
                            caps['ram_gb'] = round(kb / 1024 / 1024, 1)
                            break
            except Exception:
                pass

        # Check which models are loaded
        from core.http_pool import pooled_get as _pooled_get
        try:
            resp = _pooled_get(f'http://localhost:{get_port("model_bus")}/v1/models', timeout=3)
            if resp.status_code == 200:
                models = resp.json().get('models', [])
                caps['loaded_models'] = [m.get('type', 'unknown') for m in models]
        except Exception:
            pass

        return caps
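
    # Example return shape (values illustrative for a 16-core CUDA machine):
    #
    #     {'cpu_count': 16, 'available_compute': 0.5, 'loaded_models': ['llm'],
    #      'gpu': 'RTX 4090, 24GB', 'ram_gb': 64.0}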
    # ─── HTTP Server ─────────────────────────────────────────

    def _create_flask_app(self):
        """Create Flask app for task relay HTTP API."""
        from flask import Flask, request, jsonify

        app = Flask(__name__)

        @app.route('/mesh/status', methods=['GET', 'POST'])
        def mesh_status():
            return jsonify(self.get_mesh_status())

        @app.route('/mesh/peers', methods=['GET'])
        def mesh_peers():
            with self._lock:
                peers = [p.to_dict() for p in self._peers.values()]
            return jsonify({'peers': peers})

        @app.route('/mesh/pair', methods=['POST'])
        def mesh_pair():
            data = request.get_json(force=True)

            if data.get('action') == 'challenge':
                # Incoming pairing request
                if self.auto_accept:
                    # Auto-accept same-user devices
                    peer_id = data.get('device_id', 'unknown')
                    logger.info(f"Auto-accepting pairing request from device {peer_id}")
                    return jsonify({
                        'accepted': True,
                        'device_id': self._device_id,
                        'public_key': '',  # WireGuard public key
                        'capabilities': self._get_local_capabilities(),
                    })
                else:
                    return jsonify({'accepted': False, 'reason': 'manual approval required'})
            elif 'peer_address' in data:
                # Outgoing pairing request
                result = self.pair_device(data['peer_address'])
                return jsonify(result)
            else:
                return jsonify({'error': 'Invalid pairing request'}), 400

        @app.route('/mesh/infer', methods=['POST'])
        def mesh_infer():
            data = request.get_json(force=True)
            model_type = data.get('model_type', 'llm')
            prompt = data.get('prompt', '')

            # Forward to local Model Bus
            import requests as req
            try:
                resp = req.post(
                    f'http://localhost:{get_port("model_bus")}/v1/chat',
                    json={'prompt': prompt, 'model_type': model_type},
                    timeout=120,
                )
                if resp.status_code == 200:
                    result = resp.json()
                    result['served_by'] = self._device_id
                    return jsonify(result)
                else:
                    return jsonify({'error': f'Local inference failed: {resp.status_code}'}), 502
            except Exception as e:
                return jsonify({'error': f'Local inference error: {str(e)}'}), 502

        @app.route('/health', methods=['GET'])
        def health():
            return jsonify({'status': 'ok', 'service': 'compute-mesh'})

        return app
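
    # Smoke test against a running relay (default port 6796; shell commands
    # shown in comments for reference):
    #
    #     curl http://localhost:6796/health
    #     curl http://localhost:6796/mesh/peers
    #     curl -X POST http://localhost:6796/mesh/infer \
    #          -H 'Content-Type: application/json' \
    #          -d '{"model_type": "llm", "prompt": "hello"}'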
    # ─── Serve ───────────────────────────────────────────────

    def serve_forever(self):
        """Start the Compute Mesh service."""
        self._running = True

        # Background: periodic peer discovery
        def _discovery_loop():
            while self._running:
                try:
                    self.discover_peers()
                except Exception as e:
                    logger.error(f"Peer discovery error: {e}")
                time.sleep(30)

        # Background: peer health check
        def _health_loop():
            while self._running:
                time.sleep(60)
                with self._lock:
                    stale = [pid for pid, p in self._peers.items() if p.is_stale(600)]
                    for pid in stale:
                        logger.info(f"Removing stale peer: {pid}")
                        del self._peers[pid]

        threading.Thread(target=_discovery_loop, daemon=True).start()
        threading.Thread(target=_health_loop, daemon=True).start()

        # Start Flask HTTP server for task relay
        app = self._create_flask_app()
        logger.info(f"Compute Mesh task relay starting on port {self.task_relay_port}")

        try:
            from waitress import serve
            serve(app, host='0.0.0.0', port=self.task_relay_port, threads=4)
        except ImportError:
            app.run(host='0.0.0.0', port=self.task_relay_port, threaded=True)

# ─── Module-level singleton ─────────────────────────────────
_mesh_instance: Optional[ComputeMeshService] = None
_mesh_lock = threading.Lock()


def get_compute_mesh() -> ComputeMeshService:
    """Get or create the singleton ComputeMeshService."""
    global _mesh_instance
    if _mesh_instance is None:
        with _mesh_lock:
            if _mesh_instance is None:
                _mesh_instance = ComputeMeshService()
    return _mesh_instance
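
# Entry-point sketch; an assumption, since the repo's actual launcher for
# this service is not shown here:
if __name__ == '__main__':
    logging.basicConfig(level=logging.INFO)
    get_compute_mesh().serve_forever()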