Coverage for integrations / agent_engine / compute_mesh_service.py: 24.0%

246 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HART OS Compute Mesh Service — Privacy-Bounded Cross-Device Intelligence. 

3 

4Same user's devices automatically discover each other and share compute. 

5Privacy boundary = user_id (Ed25519 keypair). Only YOUR devices can join 

6YOUR mesh. Different users NEVER share compute through this service. 

7 

8Discovery: 

9 LAN → UDP beacon (port 6780) + device fingerprint 

10 WAN → STUN/TURN for NAT traversal, WireGuard tunnel 

11 Internet → WireGuard over public IP or relay 

12 

13Task relay protocol: 

14 POST /mesh/infer — Offload model inference 

15 POST /mesh/status — Device health + available compute 

16 GET /mesh/peers — List paired devices 

17 POST /mesh/pair — Initiate device pairing (challenge-response) 

18""" 

19import hashlib 

20import json 

21import logging 

22import os 

23import threading 

24import time 

25from typing import Any, Dict, List, Optional 

26 

27from core.port_registry import get_port 

28 

29logger = logging.getLogger('hevolve.compute_mesh') 

30 

31# ═══════════════════════════════════════════════════════════════ 

32# Compute Mesh Service 

33# ═══════════════════════════════════════════════════════════════ 

34 

class MeshPeer:
    """Represents a paired device in the compute mesh."""

    def __init__(self, peer_id: str, address: str, public_key: str,
                 capabilities: Optional[dict] = None):
        # Identity and reachability of the remote device.
        self.peer_id = peer_id
        self.address = address
        self.public_key = public_key
        self.capabilities = dict(capabilities) if capabilities else {}
        # Liveness/health tracking, refreshed by discovery.
        self.last_seen = time.time()
        self.latency_ms: Optional[int] = None
        self.available_compute: float = 0.0  # fraction of capacity, 0.0 to 1.0
        self.loaded_models: List[str] = []

    def to_dict(self) -> dict:
        """Serializable snapshot of this peer (public key is truncated)."""
        now = time.time()
        snapshot = {
            'peer_id': self.peer_id,
            'address': self.address,
            'public_key': self.public_key[:16] + '...',
            'capabilities': self.capabilities,
            'last_seen': self.last_seen,
            'latency_ms': self.latency_ms,
            'available_compute': self.available_compute,
            'loaded_models': self.loaded_models,
            'age_seconds': int(now - self.last_seen),
        }
        return snapshot

    def is_stale(self, max_age: int = 300) -> bool:
        """Peer is stale if not seen for max_age seconds."""
        age = time.time() - self.last_seen
        return age > max_age

65 

66 

class ComputeMeshService:
    """Same-user device compute aggregation.

    Discovers the user's other devices via the local discovery service,
    tracks their health and capabilities, and relays inference tasks to
    them (PeerLink when available, HTTP fallback).

    get_available_peers() and score() provide the interface used by
    CodingAgentOrchestrator for trust-based hive offload of coding tasks.
    """

    def __init__(
        self,
        task_relay_port: int = 6796,
        wg_port: int = 6795,
        max_offload_percent: int = 50,
        allow_wan: bool = True,
        stun_server: str = 'stun:stun.l.google.com:19302',
        mesh_interface: str = 'hart-mesh0',
        mesh_subnet: str = '10.99.0.0/16',
        auto_accept: bool = True,
    ):
        """Configure the mesh service; no server is started here.

        Args:
            task_relay_port: Port for the /mesh/* HTTP task-relay API.
            wg_port: WireGuard tunnel port (stored; not used directly here).
            max_offload_percent: Cap on local compute offered to peers.
            allow_wan: Whether WAN peering is permitted (reported in status).
            stun_server: STUN server for NAT traversal (stored only).
            mesh_interface: WireGuard mesh interface name (stored only).
            mesh_subnet: Mesh overlay subnet (stored only).
            auto_accept: Auto-accept incoming pairing challenges.
        """
        self.task_relay_port = task_relay_port
        self.wg_port = wg_port
        self.max_offload_percent = max_offload_percent
        self.allow_wan = allow_wan
        self.stun_server = stun_server
        self.mesh_interface = mesh_interface
        self.mesh_subnet = mesh_subnet
        self.auto_accept = auto_accept

        self._peers: Dict[str, MeshPeer] = {}
        self._lock = threading.Lock()
        self._running = False
        self._device_id: Optional[str] = None
        self._mesh_ip: Optional[str] = None
        # BUGFIX: always define this attribute.  _load_identity() previously
        # only assigned it on its success path, so an exception raised before
        # that point left the attribute unset and later reads raised
        # AttributeError instead of seeing "no key loaded".
        self._node_public_key: Optional[bytes] = None

        # Load device identity (best-effort; logs a warning on failure).
        self._load_identity()

        logger.info(
            f"ComputeMeshService initialized: relay_port={task_relay_port}, "
            f"wg_port={wg_port}, max_offload={max_offload_percent}%"
        )

    def _load_identity(self):
        """Load mesh device identity from filesystem.

        Reads the mesh IP, derives a 16-hex-char device ID from the SHA-256
        of the mesh public key, and loads the node public key used for
        user verification.  All failures are non-fatal (logged warning).
        """
        data_dir = os.environ.get('HEVOLVE_DATA_DIR', '/var/lib/hart')
        key_dir = os.path.join(data_dir, 'mesh', 'keys')

        try:
            mesh_ip_file = os.path.join(key_dir, 'mesh_ip')
            if os.path.exists(mesh_ip_file):
                with open(mesh_ip_file) as f:
                    self._mesh_ip = f.read().strip()
                logger.info(f"Mesh IP: {self._mesh_ip}")

            pub_key_file = os.path.join(key_dir, 'public.key')
            if os.path.exists(pub_key_file):
                with open(pub_key_file) as f:
                    pub_key = f.read().strip()
                self._device_id = hashlib.sha256(pub_key.encode()).hexdigest()[:16]
                logger.info(f"Device ID: {self._device_id}")

            # Load node identity for user verification
            node_key_file = os.path.join(data_dir, 'node_public.key')
            if os.path.exists(node_key_file):
                with open(node_key_file, 'rb') as f:
                    self._node_public_key = f.read()
            else:
                self._node_public_key = None
        except Exception as e:
            logger.warning(f"Could not load mesh identity: {e}")

    # ─── Peer Discovery ──────────────────────────────────────

    def discover_peers(self) -> List[Dict[str, Any]]:
        """Find same-user devices via discovery service.

        Queries the local backend for known peers, probes each one's
        /mesh/status endpoint, and records/refreshes reachable peers in
        self._peers.  Returns the dict snapshots of peers reached this
        round.  All network errors are swallowed (best-effort discovery).
        """
        from core.http_pool import pooled_get
        from urllib.parse import urlparse

        discovered = []

        # Query local discovery service for peers
        try:
            resp = pooled_get(f'http://localhost:{get_port("backend")}/api/social/peers', timeout=5)
            if resp.status_code == 200:
                peers = resp.json().get('peers', [])
                for peer in peers:
                    # Only mesh with same-user devices
                    # In production, verify user_id via Ed25519 signature
                    peer_url = peer.get('url', '')
                    peer_address = ''
                    if peer_url:
                        try:
                            peer_address = urlparse(peer_url).hostname or ''
                        except Exception:
                            pass
                    peer_id = peer.get('node_id', '')

                    if peer_address and peer_id:
                        # Check if this peer supports mesh
                        try:
                            mesh_resp = pooled_get(
                                f'http://{peer_address}:{self.task_relay_port}/mesh/status',
                                timeout=3,
                            )
                            if mesh_resp.status_code == 200:
                                mesh_data = mesh_resp.json()
                                with self._lock:
                                    if peer_id not in self._peers:
                                        self._peers[peer_id] = MeshPeer(
                                            peer_id=peer_id,
                                            address=peer_address,
                                            public_key=peer.get('public_key', ''),
                                            capabilities=mesh_data.get('capabilities', {}),
                                        )
                                    else:
                                        self._peers[peer_id].last_seen = time.time()
                                        self._peers[peer_id].capabilities = mesh_data.get('capabilities', {})
                                        self._peers[peer_id].available_compute = mesh_data.get('available_compute', 0)
                                        self._peers[peer_id].loaded_models = mesh_data.get('loaded_models', [])

                                    discovered.append(self._peers[peer_id].to_dict())
                        except Exception:
                            pass  # Peer doesn't support mesh
        except Exception as e:
            logger.debug(f"Peer discovery error: {e}")

        return discovered

    # ─── Task Offload ────────────────────────────────────────

    def offload_inference(
        self,
        peer_id: str,
        model_type: str,
        prompt: str,
        options: Optional[dict] = None,
    ) -> Dict[str, Any]:
        """Send inference request to a mesh peer — PeerLink first, HTTP fallback.

        Args:
            peer_id: ID of a previously discovered/paired peer.
            model_type: Model class to run (e.g. 'llm').
            prompt: Prompt text to send.
            options: Optional extras; 'timeout' (seconds, default 120) is honored.

        Returns:
            The peer's result dict annotated with 'offloaded_to' and
            'peer_address' (and 'transport' for PeerLink), or an
            {'error': ...} dict — unknown/stale peers never raise.
        """
        from core.http_pool import pooled_post

        with self._lock:
            peer = self._peers.get(peer_id)

        if not peer:
            return {'error': f'Unknown peer: {peer_id}'}

        if peer.is_stale():
            age = int(time.time() - peer.last_seen)
            return {'error': f'Peer {peer_id} is stale (last seen {age}s ago)'}

        payload = {
            'model_type': model_type,
            'prompt': prompt,
            'options': options or {},
            'source_device': self._device_id,
        }

        # Try PeerLink first (encrypted for cross-user, plain for same-user)
        try:
            from core.peer_link.link_manager import get_link_manager
            link = get_link_manager().get_link(peer_id)
            if link:
                result = link.send('compute', payload,
                                   wait_response=True,
                                   timeout=(options or {}).get('timeout', 120))
                if result and 'error' not in result:
                    result['offloaded_to'] = peer_id
                    result['peer_address'] = peer.address
                    result['transport'] = 'peerlink'
                    return result
        except Exception:
            pass  # fall through to HTTP

        # HTTP fallback
        try:
            resp = pooled_post(
                f'http://{peer.address}:{self.task_relay_port}/mesh/infer',
                json=payload,
                timeout=(options or {}).get('timeout', 120),
            )

            if resp.status_code == 200:
                result = resp.json()
                result['offloaded_to'] = peer_id
                result['peer_address'] = peer.address
                return result
            else:
                return {'error': f'Peer returned status {resp.status_code}'}
        except Exception as e:
            return {'error': f'Offload to {peer_id} failed: {str(e)}'}

    def offload_to_best_peer(
        self, model_type: str, prompt: str, options: Optional[dict] = None
    ) -> Dict[str, Any]:
        """Offload inference to the best available mesh peer.

        Candidates are fresh peers with >10% available compute, ranked by:
        model already loaded > available compute > lowest latency.
        """
        # Snapshot candidates under the lock only — offload_inference()
        # re-acquires self._lock, which is non-reentrant, so it must be
        # called after the lock is released.
        with self._lock:
            candidates = [
                p for p in self._peers.values()
                if not p.is_stale() and p.available_compute > 0.1
            ]

        if not candidates:
            return {'error': 'No mesh peers available for offload'}

        # Sort by: model already loaded > available compute > lowest latency
        def score(peer):
            model_bonus = 10 if model_type in peer.loaded_models else 0
            return model_bonus + peer.available_compute * 5 - (peer.latency_ms or 500) / 100

        candidates.sort(key=score, reverse=True)
        best = candidates[0]

        logger.info(
            f"Offloading {model_type} to peer {best.peer_id} "
            f"(compute={best.available_compute:.1%}, models={best.loaded_models})"
        )

        return self.offload_inference(best.peer_id, model_type, prompt, options)

    def get_available_peers(self) -> List[Dict[str, Any]]:
        """Return non-stale peers as dicts (used by CodingAgentOrchestrator)."""
        with self._lock:
            return [p.to_dict() for p in self._peers.values() if not p.is_stale()]

    def score(self, peer: Dict) -> float:
        """Score a peer dict by compute availability and latency.

        Higher is better: +5 per unit of available_compute, minus
        latency_ms/100 (missing latency is penalized as 500 ms).
        """
        compute = peer.get('available_compute', 0)
        latency = peer.get('latency_ms') or 500
        return compute * 5 - latency / 100

    # ─── Device Pairing ──────────────────────────────────────

    def pair_device(self, peer_address: str) -> Dict[str, Any]:
        """Initiate pairing with a new device.

        Sends a random challenge to the peer's /mesh/pair endpoint and, if
        accepted, registers it in self._peers.  Returns a status dict
        ({'status': 'paired'|'rejected', ...}) or {'error': ...}; never raises.
        """
        from core.http_pool import pooled_post

        try:
            # Send pairing challenge (random nonce; peer echoes acceptance)
            challenge = hashlib.sha256(os.urandom(32)).hexdigest()
            resp = pooled_post(
                f'http://{peer_address}:{self.task_relay_port}/mesh/pair',
                json={
                    'action': 'challenge',
                    'challenge': challenge,
                    'device_id': self._device_id,
                    'mesh_ip': self._mesh_ip,
                },
                timeout=10,
            )

            if resp.status_code == 200:
                result = resp.json()
                if result.get('accepted'):
                    # Fall back to the address as the peer ID if none given
                    peer_id = result.get('device_id', peer_address)
                    with self._lock:
                        self._peers[peer_id] = MeshPeer(
                            peer_id=peer_id,
                            address=peer_address,
                            public_key=result.get('public_key', ''),
                            capabilities=result.get('capabilities', {}),
                        )
                    logger.info(f"Paired with device: {peer_id} at {peer_address}")
                    return {'status': 'paired', 'peer_id': peer_id}
                else:
                    return {'status': 'rejected', 'reason': result.get('reason', 'unknown')}
            else:
                return {'error': f'Pairing failed: HTTP {resp.status_code}'}
        except Exception as e:
            return {'error': f'Pairing failed: {str(e)}'}

    # ─── Status ──────────────────────────────────────────────

    def get_mesh_status(self) -> Dict[str, Any]:
        """Get aggregate compute inventory across all paired devices."""
        with self._lock:
            active_peers = [p for p in self._peers.values() if not p.is_stale()]

        # Get local capabilities
        local_caps = self._get_local_capabilities()

        return {
            'status': 'running' if self._running else 'stopped',
            'device_id': self._device_id,
            'mesh_ip': self._mesh_ip,
            'peer_count': len(active_peers),
            'total_peers_known': len(self._peers),
            'local': local_caps,
            'peers': [p.to_dict() for p in active_peers],
            'aggregate': {
                'total_compute': local_caps.get('available_compute', 0) + sum(
                    p.available_compute for p in active_peers
                ),
                'total_models': list(set(
                    local_caps.get('loaded_models', []) +
                    [m for p in active_peers for m in p.loaded_models]
                )),
            },
            'max_offload_percent': self.max_offload_percent,
            'allow_wan': self.allow_wan,
        }

    def _get_local_capabilities(self) -> dict:
        """Detect local compute capabilities (CPU, GPU, RAM, loaded models).

        Every probe is best-effort; missing probes simply leave their key out.
        """
        caps = {
            'cpu_count': os.cpu_count() or 1,
            # Compute we keep for ourselves is the offload cap's complement
            'available_compute': 1.0 - (self.max_offload_percent / 100.0),
            'loaded_models': [],
        }

        # Detect GPU (delegate to VRAMManager — single source of truth)
        try:
            from integrations.service_tools.vram_manager import vram_manager
            gpu_info = vram_manager.detect_gpu()
            if gpu_info.get('cuda_available'):
                caps['gpu'] = f"{gpu_info.get('name', 'GPU')}, {gpu_info.get('total_gb', 0)}GB"
            else:
                caps['gpu'] = gpu_info.get('name') or None
        except Exception:
            pass

        # Detect RAM (cross-platform: psutil first, /proc/meminfo fallback)
        try:
            import psutil
            mem = psutil.virtual_memory()
            caps['ram_gb'] = round(mem.total / (1024 ** 3), 1)
        except ImportError:
            try:
                with open('/proc/meminfo') as f:
                    for line in f:
                        if line.startswith('MemTotal:'):
                            kb = int(line.split()[1])
                            caps['ram_gb'] = round(kb / 1024 / 1024, 1)
                            break
            except Exception:
                pass

        # Check which models are loaded on the local Model Bus
        from core.http_pool import pooled_get as _pooled_get
        try:
            resp = _pooled_get(f'http://localhost:{get_port("model_bus")}/v1/models', timeout=3)
            if resp.status_code == 200:
                models = resp.json().get('models', [])
                caps['loaded_models'] = [m.get('type', 'unknown') for m in models]
        except Exception:
            pass

        return caps

    # ─── HTTP Server ─────────────────────────────────────────

    def _create_flask_app(self):
        """Create Flask app for the task relay HTTP API.

        Routes: /mesh/status, /mesh/peers, /mesh/pair, /mesh/infer, /health.
        """
        from flask import Flask, request, jsonify

        app = Flask(__name__)

        @app.route('/mesh/status', methods=['GET', 'POST'])
        def mesh_status():
            return jsonify(self.get_mesh_status())

        @app.route('/mesh/peers', methods=['GET'])
        def mesh_peers():
            with self._lock:
                peers = [p.to_dict() for p in self._peers.values()]
            return jsonify({'peers': peers})

        @app.route('/mesh/pair', methods=['POST'])
        def mesh_pair():
            data = request.get_json(force=True)

            if data.get('action') == 'challenge':
                # Incoming pairing request
                if self.auto_accept:
                    # Auto-accept same-user devices
                    return jsonify({
                        'accepted': True,
                        'device_id': self._device_id,
                        'public_key': '',  # WireGuard public key
                        'capabilities': self._get_local_capabilities(),
                    })
                else:
                    return jsonify({'accepted': False, 'reason': 'manual approval required'})
            elif 'peer_address' in data:
                # Outgoing pairing request
                result = self.pair_device(data['peer_address'])
                return jsonify(result)
            else:
                return jsonify({'error': 'Invalid pairing request'}), 400

        @app.route('/mesh/infer', methods=['POST'])
        def mesh_infer():
            data = request.get_json(force=True)
            model_type = data.get('model_type', 'llm')
            prompt = data.get('prompt', '')

            # Forward to local Model Bus
            import requests as req
            try:
                resp = req.post(
                    f'http://localhost:{get_port("model_bus")}/v1/chat',
                    json={'prompt': prompt, 'model_type': model_type},
                    timeout=120,
                )
                if resp.status_code == 200:
                    result = resp.json()
                    result['served_by'] = self._device_id
                    return jsonify(result)
                else:
                    return jsonify({'error': f'Local inference failed: {resp.status_code}'}), 502
            except Exception as e:
                return jsonify({'error': f'Local inference error: {str(e)}'}), 502

        @app.route('/health', methods=['GET'])
        def health():
            return jsonify({'status': 'ok', 'service': 'compute-mesh'})

        return app

    # ─── Serve ───────────────────────────────────────────────

    def serve_forever(self):
        """Start the Compute Mesh service (blocks in the HTTP server).

        Spawns daemon threads for periodic peer discovery (every 30s) and
        stale-peer eviction (every 60s, 600s staleness), then serves the
        task-relay API via waitress, or Flask's dev server if waitress is
        unavailable.
        """
        self._running = True

        # Background: periodic peer discovery
        def _discovery_loop():
            while self._running:
                try:
                    self.discover_peers()
                except Exception as e:
                    logger.error(f"Peer discovery error: {e}")
                time.sleep(30)

        # Background: peer health check
        def _health_loop():
            while self._running:
                time.sleep(60)
                with self._lock:
                    stale = [pid for pid, p in self._peers.items() if p.is_stale(600)]
                    for pid in stale:
                        logger.info(f"Removing stale peer: {pid}")
                        del self._peers[pid]

        threading.Thread(target=_discovery_loop, daemon=True).start()
        threading.Thread(target=_health_loop, daemon=True).start()

        # Start Flask HTTP server for task relay
        app = self._create_flask_app()
        logger.info(f"Compute Mesh task relay starting on port {self.task_relay_port}")

        try:
            from waitress import serve
            serve(app, host='0.0.0.0', port=self.task_relay_port, threads=4)
        except ImportError:
            app.run(host='0.0.0.0', port=self.task_relay_port, threaded=True)

524 

525 

# ─── Module-level singleton ─────────────────────────────────
_mesh_instance: Optional[ComputeMeshService] = None
_mesh_lock = threading.Lock()


def get_compute_mesh() -> ComputeMeshService:
    """Return the process-wide ComputeMeshService, creating it on first use."""
    global _mesh_instance
    if _mesh_instance is not None:
        return _mesh_instance
    with _mesh_lock:
        # Re-check under the lock: another thread may have created it
        # between our unlocked check and acquiring the lock.
        if _mesh_instance is None:
            _mesh_instance = ComputeMeshService()
    return _mesh_instance