1"""Auto-register hive-served expert-tier models into the canonical 

2``ModelRegistry`` by subscribing to ``peer.capability.*`` gossip on 

3the platform EventBus. 

4 

5Design intent 

6============= 

7 

8The speculative dispatcher's design has a "hive expert" tier: when a 

9local draft escalates, the bigger / fine-tuned model that actually 

10takes the turn should run on a peer compute node (regional or central 

11deployment), not the same local 4B that served the draft. Without 

12that, ``_pick_expert_for_delegate('hive', ...)`` falls back to local 

13fast and the speculative refine path is just the local 4B refining 

14its own kind of output — pure waste. 

15 

16This module closes that gap on the **consumer** side. When a peer 

17emits a ``peer.capability.announce`` event (producer side ships 

18separately on the peer's node daemon), this subscriber: 

19 

20 1. Validates the peer's trust signature against the master-key 

21 delegation chain (``security.key_delegation.verify_peer_attestation``). 

22 Until that API ships, an env-var allowlist 

23 (``HEVOLVE_HIVE_TRUSTED_PEERS``) is the safe fallback so an 

24 operator can manually whitelist peers they trust. 

25 2. Pings the advertised endpoint for reachability and latency 

26 (re-uses the same HTTP probe pattern ``LlamaConfig`` uses for 

27 llama-server health — see commit 3f9be3be for the 503-aware 

28 liveness check). 

29 3. Registers each ``tier=expert`` model the peer advertises as a 

30 ``ModelBackend(is_local=False, tier=EXPERT, ...)`` in the 

31 canonical ``model_registry``. The dispatcher's existing 

32 ``get_expert_model()`` then returns the hive-served entry 

33 automatically — no dispatcher-side change required. 

34 4. Health-checks each registered peer every ``_HEALTH_CHECK_INTERVAL_S``. 

35 After ``_HEALTH_CHECK_FAIL_BUDGET`` consecutive failures the peer's 

36 models get ``ModelRegistry.unregister``'d so the dispatcher never 

37 dispatches to a dead endpoint. 

38 5. ``peer.capability.revoke`` events drop the peer's models 

39 immediately (clean shutdown path). 

40 

41Producer-side gap 

42================= 

43 

44No peer emits ``peer.capability.announce`` yet — that producer ships 

45separately on the peer node daemon. Until then this subscriber sits 

46idle: ``on_peer_announce`` is never called, the registry has no hive 

47entries, and the dispatcher transparently falls through to local 

48langchain. Telemetry's ``served_by`` field will read ``local_langchain`` 

49100% of the time until peers start announcing, giving you a clean 

50metric for hive-tier uptake afterwards. 

51 

52Reuse 

53===== 

54 

55Nothing in this file duplicates existing primitives: 

56 

57 - ``ModelRegistry.register`` / ``unregister`` — canonical registration 

58 - ``ModelBackend`` — same shape every other backend uses 

59 - ``ModelTier.EXPERT`` — single source of truth for tier identification 

60 - ``core.platform.registry.get_registry`` + ``.get('events')`` — same 

61 EventBus access pattern as the rest of the platform (see 

62 ``core.platform.events.emit_event``) 

63 - ``security.key_delegation.verify_peer_attestation`` (when shipped) — 

64 same trust chain the rest of the security layer uses 

65 - ``requests`` — already a project dependency; no new transport 
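
Usage (sketch)
==============

Minimal consumer-side bring-up; per ``attach_to_event_bus`` below,
this is a safe no-op until ``platform.init`` has run::

    from integrations.agent_engine.hive_expert_discovery import (
        get_hive_expert_discovery,
    )

    discovery = get_hive_expert_discovery()
    discovery.attach_to_event_bus()  # True on first successful attach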

66""" 

67from __future__ import annotations 

68 

69import logging 

70import os 

71import threading 

72import time 

73from concurrent.futures import ThreadPoolExecutor 

74from typing import Any, Dict, List, Optional, Set 

75 

76import requests 

77 

78from integrations.agent_engine.model_registry import ( 

79 ModelBackend, 

80 ModelRegistry, 

81 ModelTier, 

82 model_registry as _default_registry, 

83) 

84 

85logger = logging.getLogger('hevolve_social') 

86 

87# Health-check cadence and fail budget. Tuned so a transient network 

88# blip (one missed ping) doesn't drop a peer, but a genuinely dead peer 

89# is dropped within ~3 minutes (3 misses × 60s). 

90_HEALTH_CHECK_INTERVAL_S = 60 

91_HEALTH_CHECK_FAIL_BUDGET = 3 

92_PING_TIMEOUT_S = 2.0 

93 

94# Minimum verified_baseline a peer's advertised model needs in order to 

95# be registered. Below this we treat the advertisement as untrusted 

96# noise — the peer is free to advertise but the dispatcher must never 

97# route real turns to a model whose own self-reported baseline is 

98# below random-chance reasoning quality. 

99_MIN_VERIFIED_BASELINE = 0.5 

100 

101# Gossip topics. Producer side (peer's capability advertiser daemon) 

102# ships separately; this module just listens. 

103_TOPIC_ANNOUNCE = 'peer.capability.announce' 

104_TOPIC_REVOKE = 'peer.capability.revoke' 

105 

106 

107def _backend_id_for(peer_id: str, model_id: str) -> str: 

108 """Canonical ID format for a hive-served backend entry. 

109 

110 Centralised so the announce / revoke / health-check paths all 

111 speak the same prefix convention. 
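
    For example (hypothetical IDs), ``_backend_id_for('node-eu-1',
    'qwen-72b')`` yields ``'hive-node-eu-1-qwen-72b'``.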

112 """ 

113 return f'hive-{peer_id}-{model_id}' 

114 

115 

116class HiveExpertDiscovery: 

117 """Subscribes to ``peer.capability.*`` gossip and keeps the canonical 

118 ``ModelRegistry`` in sync with reachable hive expert peers. 

119 

120 Thread-safety 

121 ------------- 

122 Internal state (``_peer_models``, ``_fail_count``) is guarded by 

123 ``self._lock``. ``ModelRegistry.register`` / ``unregister`` carry 

124 their own lock so cross-thread registration is safe. The health 

125 check loop runs on a dedicated ``ThreadPoolExecutor`` worker; it 

126 holds ``self._lock`` only while reading the peer list, never while 

127 pinging. 

128 """ 

129 

130 def __init__(self, registry: Optional[ModelRegistry] = None): 

131 self._registry = registry or _default_registry 

132 self._lock = threading.Lock() 

133 self._peer_models: Dict[str, Set[str]] = {} 

134 self._fail_count: Dict[str, int] = {} 

135 # Worker pool size 2: one for the health-check loop, one slack 

136 # for any future async announce-validation that needs more than 

137 # the bus dispatcher's caller thread. 

138 self._pool = ThreadPoolExecutor( 

139 max_workers=2, thread_name_prefix='hive_expert_discovery') 

140 self._stop = threading.Event() 

141 self._subscribed = False 

142 

143 # ── Public API ────────────────────────────────────────────────── 

144 

145 def attach_to_event_bus(self) -> bool: 

146 """Wire up gossip subscriptions on the platform EventBus. 

147 

148 Idempotent: returns True on first successful attach, False on 

149 subsequent calls (already subscribed) or if the bus isn't 

150 bootstrapped. 

151 

152 Safe to call from anywhere — degrades to no-op when the 

153 platform layer hasn't initialised yet. Bootstrap should call 

154 this after ``platform.init`` succeeds. 

155 """ 

156 with self._lock: 

157 if self._subscribed: 

158 return False 

159 try: 

160 from core.platform.registry import get_registry 

161 registry = get_registry() 

162 if not registry.has('events'): 

163 logger.debug( 

164 "HiveExpertDiscovery: EventBus not yet bootstrapped; " 

165 "attach_to_event_bus is a no-op until platform.init runs") 

166 return False 

167 bus = registry.get('events') 

168 bus.on(_TOPIC_ANNOUNCE, self._on_announce_event) 

169 bus.on(_TOPIC_REVOKE, self._on_revoke_event) 

170 with self._lock: 

171 self._subscribed = True 

172 # Kick off the background health checker. Submit to our 

173 # own pool so the dispatcher and platform shutdown lifecycle 

174 # can stop us cleanly via ``self._stop``. 

175 self._pool.submit(self._health_check_loop) 

176 logger.info( 

177 "HiveExpertDiscovery: subscribed to %s + %s, " 

178 "health check every %ds", 

179 _TOPIC_ANNOUNCE, _TOPIC_REVOKE, _HEALTH_CHECK_INTERVAL_S) 

180 return True 

181 except Exception as e: 

182 logger.warning( 

183 "HiveExpertDiscovery: attach_to_event_bus failed (%s) — " 

184 "hive routing will not activate this session.", e) 

185 return False 

186 

187 def shutdown(self) -> None: 

188 """Stop the health-check loop and the worker pool. 

189 

190 Does NOT unregister already-registered hive backends — leave 

191 them in place for graceful drain. ``ModelRegistry.unregister`` 

192 is still callable; callers that need a hard reset can iterate. 
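
        Hard-reset sketch (assuming ``discovery`` is this instance;
        the ``reason`` label is illustrative)::

            for peer_id in list(discovery._peer_models):
                discovery._drop_peer(peer_id, reason='hard_reset')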

193 """ 

194 self._stop.set() 

195 self._pool.shutdown(wait=False) 

196 

197 # ── EventBus callbacks (bus passes (topic, data)) ─────────────── 

198 

199 def _on_announce_event(self, topic: str, data: Any) -> None: 

200 if not isinstance(data, dict): 

201 return 

202 self.on_peer_announce(data) 

203 

204 def _on_revoke_event(self, topic: str, data: Any) -> None: 

205 if not isinstance(data, dict): 

206 return 

207 peer_id = data.get('peer_id') or '' 

208 if peer_id: 

209 self._drop_peer(peer_id, reason='revoke') 

210 

211 # ── Announce handling ────────────────────────────────────────── 

212 

213 def on_peer_announce(self, msg: Dict[str, Any]) -> int: 

214 """Process a single ``peer.capability.announce`` payload. 

215 

216 Expected payload (producer-side schema documented here so the 

217 producer daemon has a single source of truth to match): 

218 

219 { 

220 "peer_id": str, # opaque, stable per node 

221 "endpoint": str, # https://node-x.example 

222 "auth_token": str, # bearer for HTTP calls 

223 "trust_signature": str, # signed by master-key chain 

224 "models": [ 

225 { 

226 "model_id": str, 

227 "display_name": str, 

228 "tier": "expert" | "balanced" | "fast", 

229 "verified_baseline": float, # 0.0-1.0 

230 "specialty": [str], # optional 

231 "capabilities": {...}, # optional 

232 }, 

233 ... 

234 ] 

235 } 

236 

237 Returns the number of expert-tier backends registered (or 

238 re-registered) for this peer after this announce processes. 

239 Zero means the announce was rejected (trust / reachability / 

240 no qualifying models) — caller may log; we don't raise. 
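
        A minimal qualifying announce, with hypothetical peer and
        endpoint values (shape matches the schema above)::

            discovery.on_peer_announce({
                'peer_id': 'node-eu-1',
                'endpoint': 'https://node-eu-1.example',
                'auth_token': 'tok',
                'trust_signature': '<signed-by-master-key>',
                'models': [{
                    'model_id': 'qwen-72b',
                    'display_name': 'Qwen 72B',
                    'tier': 'expert',
                    'verified_baseline': 0.82,
                }],
            })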

241 """ 

242 peer_id = (msg.get('peer_id') or '').strip() 

243 endpoint = (msg.get('endpoint') or '').rstrip('/') 

244 auth_token = msg.get('auth_token') or '' 

245 if not peer_id or not endpoint: 

246 logger.debug( 

247 "HiveExpertDiscovery: announce missing peer_id/endpoint, " 

248 "ignored") 

249 return 0 

250 

251 if not self._verify_peer_trust(msg): 

252 logger.warning( 

253 "HiveExpertDiscovery: peer %s trust check failed — " 

254 "refusing to register advertised expert models", 

255 peer_id) 

256 return 0 

257 

258 # Pre-commit reachability probe: don't register entries we 

259 # can't actually reach right now. Producer's next announce 

260 # will give us another chance. 

261 latency_ms = self._ping_latency(endpoint, auth_token) 

262 if latency_ms is None: 

263 logger.info( 

264 "HiveExpertDiscovery: peer %s unreachable on first probe " 

265 "(endpoint=%s); will retry on next announce", 

266 peer_id, endpoint) 

267 return 0 

268 

269 # Compute the new set BEFORE touching the registry so a 

270 # malformed payload doesn't half-drop the peer. 

271 new_ids: Set[str] = set() 

272 new_backends: List[ModelBackend] = [] 

273 for model in (msg.get('models') or []): 

274 if not isinstance(model, dict): 

275 continue 

276 if model.get('tier') != 'expert': 

277 continue 

278 model_id = (model.get('model_id') or '').strip() 

279 if not model_id: 

280 continue 

281 try: 

282 baseline = float(model.get('verified_baseline', 0.0)) 

283 except (TypeError, ValueError): 

284 continue 

285 if baseline < _MIN_VERIFIED_BASELINE: 

286 continue 

287 

288 backend_id = _backend_id_for(peer_id, model_id) 

289 new_ids.add(backend_id) 

290 new_backends.append(ModelBackend( 

291 model_id=backend_id, 

292 display_name=( 

293 'Hive: ' + (model.get('display_name') or model_id)), 

294 tier=ModelTier.EXPERT, 

295 config_list_entry={ 

296 'model': model_id, 

297 'api_key': auth_token or 'hive', 

298 'base_url': f'{endpoint}/v1', 

299 'price': [0, 0], 

300 'specialty': list(model.get('specialty') or []), 

301 }, 

302 avg_latency_ms=latency_ms, 

303 accuracy_score=baseline, 

304 cost_per_1k_tokens=0.0, 

305 is_local=False, 

306 hardware_dependent=False, 

307 gpu_tdp_watts=0.0, 

308 )) 

309 

310 # Diff against any existing registration for this peer. Register 

311 # the new ones first (so the peer never goes to zero models 

312 # mid-update), then unregister the ones that disappeared. 

313 with self._lock: 

314 prev_ids = self._peer_models.get(peer_id, set()) 

315 for backend in new_backends: 

316 self._registry.register(backend) 

317 dropped = prev_ids - new_ids 

318 for backend_id in dropped: 

319 self._registry.unregister(backend_id) 

320 with self._lock: 

321 self._peer_models[peer_id] = new_ids 

322 self._fail_count[peer_id] = 0 

323 

324 if new_ids: 

325 logger.info( 

326 "HiveExpertDiscovery: peer %s registered %d expert " 

327 "backend(s), dropped %d, latency=%.0fms", 

328 peer_id, len(new_ids), len(dropped), latency_ms) 

329 elif dropped: 

330 logger.info( 

331 "HiveExpertDiscovery: peer %s announced no qualifying " 

332 "experts; dropped %d prior backend(s)", 

333 peer_id, len(dropped)) 

334 return len(new_ids) 

335 

336 # ── Health check ─────────────────────────────────────────────── 

337 

338 def _health_check_loop(self) -> None: 

339 """Background loop — pings every registered peer's endpoint 

340 every ``_HEALTH_CHECK_INTERVAL_S``. Drops peers after 

341 ``_HEALTH_CHECK_FAIL_BUDGET`` consecutive failures. 

342 

343 Updates each surviving peer's per-backend ``avg_latency_ms`` 

344 from the live ping so the dispatcher's selector reflects 

345 current network conditions instead of the original-announce 

346 snapshot. 

347 """ 

348 while not self._stop.wait(_HEALTH_CHECK_INTERVAL_S): 

349 with self._lock: 

350 peers = list(self._peer_models.keys()) 

351 for peer_id in peers: 

352 if self._stop.is_set(): 

353 return 

354 try: 

355 self._check_one_peer(peer_id) 

356 except Exception as e: 

357 logger.debug( 

358 "HiveExpertDiscovery: health check raised for " 

359 "%s: %s", peer_id, e) 

360 

361 def _check_one_peer(self, peer_id: str) -> None: 

362 with self._lock: 

363 backend_ids = list(self._peer_models.get(peer_id, ())) 

364 if not backend_ids: 

365 return 

366 # Backends from one peer share an endpoint — probe via the 

367 # first registered backend's config_list_entry base_url. 

368 first = self._registry.get_model(backend_ids[0]) 

369 if first is None: 

370 # Backend vanished from under us (manual unregister?) — sync 

371 # our state and move on. 

372 self._drop_peer(peer_id, reason='backend_missing') 

373 return 

374 cfg = first.config_list_entry or {} 

375 base_url = (cfg.get('base_url') or '').rstrip('/') 

376 if base_url.endswith('/v1'): 

377 endpoint = base_url[:-3] 

378 else: 

379 endpoint = base_url 

380 auth = cfg.get('api_key') or '' 

381 

382 latency_ms = self._ping_latency(endpoint, auth) 

383 if latency_ms is None: 

384 with self._lock: 

385 self._fail_count[peer_id] = ( 

386 self._fail_count.get(peer_id, 0) + 1) 

387 fails = self._fail_count[peer_id] 

388 if fails >= _HEALTH_CHECK_FAIL_BUDGET: 

389 self._drop_peer(peer_id, reason=f'health_fail x{fails}') 

390 return 

391 

392 # Live latency update — feeds the dispatcher's picker so a 

393 # peer whose ping latency drifts up loses to faster peers. 

394 with self._lock: 

395 self._fail_count[peer_id] = 0 

396 for backend_id in backend_ids: 

397 self._registry.record_latency(backend_id, latency_ms) 

398 

399 # ── Helpers ──────────────────────────────────────────────────── 

400 

401 @staticmethod 

402 def _ping_latency(endpoint: str, auth_token: str) -> Optional[float]: 

403 """HTTP probe matching the same liveness contract llama-server 

404 speaks (commit 3f9be3be): HTTP 200 = alive, HTTP 503 with 

405 'Loading' body = alive-but-warming, anything else = dead. 

406 

407 Returns latency in milliseconds, or None on failure. 

408 """ 

409 if not endpoint: 

410 return None 

411 url = f'{endpoint.rstrip("/")}/health' 

412 headers = ( 

413 {'Authorization': f'Bearer {auth_token}'} if auth_token else {}) 

414 try: 

415 t0 = time.time() 

416 r = requests.get(url, headers=headers, timeout=_PING_TIMEOUT_S) 

417 elapsed_ms = (time.time() - t0) * 1000 

418 if r.status_code == 200: 

419 return elapsed_ms 

420 if r.status_code == 503: 

421 body_text = (r.text or '').lower() 

422 if 'loading' in body_text: 

423 return elapsed_ms 

424 return None 

425 except requests.RequestException: 

426 return None 

427 except Exception: 

428 return None 

429 

430 @staticmethod 

431 def _verify_peer_trust(msg: Dict[str, Any]) -> bool: 

432 """Trust gate. Prefer the master-key delegation chain when 

433 ``security.key_delegation.verify_peer_attestation`` is available. 

434 

435 Until that API ships, fall back to an env-var allowlist so an 

436 operator can run the discovery path against a known-trusted 

437 peer they control (typical for a regional deployment in 

438 bring-up). When the API lands, ImportError path goes dead 

439 without code change here. 
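
        Allowlist format (comma-separated peer IDs; the IDs below are
        hypothetical)::

            os.environ['HEVOLVE_HIVE_TRUSTED_PEERS'] = 'node-eu-1,node-us-2'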

440 """ 

441 peer_id = (msg.get('peer_id') or '').strip() 

442 if not peer_id: 

443 return False 

444 try: 

445 from security.key_delegation import ( # type: ignore 

446 verify_peer_attestation, 

447 ) 

448 return bool(verify_peer_attestation( 

449 peer_id=peer_id, 

450 signature=msg.get('trust_signature', ''), 

451 payload=msg, 

452 )) 

453 except ImportError: 

454 allowlist_env = os.environ.get('HEVOLVE_HIVE_TRUSTED_PEERS', '') 

455 trusted = { 

456 p.strip() for p in allowlist_env.split(',') if p.strip() 

457 } 

458 return peer_id in trusted 

459 except Exception as e: 

460 logger.warning( 

461 "HiveExpertDiscovery: trust verification raised for " 

462 "peer %s: %s — denying", peer_id, e) 

463 return False 

464 

465 def _drop_peer(self, peer_id: str, *, reason: str) -> int: 

466 """Unregister every backend recorded for this peer. 

467 

468 Returns the count of backends unregistered. Safe to call 

469 multiple times — no-op when the peer is unknown. 

470 """ 

471 with self._lock: 

472 backend_ids = self._peer_models.pop(peer_id, set()) 

473 self._fail_count.pop(peer_id, None) 

474 for backend_id in backend_ids: 

475 self._registry.unregister(backend_id) 

476 if backend_ids: 

477 logger.info( 

478 "HiveExpertDiscovery: dropped %d backend(s) from peer " 

479 "%s (%s)", len(backend_ids), peer_id, reason) 

480 return len(backend_ids) 

481 

482 

483# ─── Module-level singleton ─── 

484_singleton: Optional[HiveExpertDiscovery] = None 

485_init_lock = threading.Lock() 

486 

487 

488def get_hive_expert_discovery() -> HiveExpertDiscovery: 

489 """Module-level singleton accessor — matches ``model_registry``'s 

490 own singleton pattern (see ``integrations.agent_engine.model_registry``). 

491 """ 

492 global _singleton 

493 with _init_lock: 

494 if _singleton is None: 

495 _singleton = HiveExpertDiscovery() 

496 return _singleton