Coverage for integrations/agent_engine/hive_expert_discovery.py: 0.0% (185 statements)
1"""Auto-register hive-served expert-tier models into the canonical
2``ModelRegistry`` by subscribing to ``peer.capability.*`` gossip on
3the platform EventBus.
5Design intent
6=============
8The speculative dispatcher's design has a "hive expert" tier: when a
9local draft escalates, the bigger / fine-tuned model that actually
10takes the turn should run on a peer compute node (regional or central
11deployment), not the same local 4B that served the draft. Without
12that, ``_pick_expert_for_delegate('hive', ...)`` falls back to local
13fast and the speculative refine path is just the local 4B refining
14its own kind of output — pure waste.
16This module closes that gap on the **consumer** side. When a peer
17emits a ``peer.capability.announce`` event (producer side ships
18separately on the peer's node daemon), this subscriber:
20 1. Validates the peer's trust signature against the master-key
21 delegation chain (``security.key_delegation.verify_peer_attestation``).
22 Until that API ships, an env-var allowlist
23 (``HEVOLVE_HIVE_TRUSTED_PEERS``) is the safe fallback so an
24 operator can manually whitelist peers they trust.
25 2. Pings the advertised endpoint for reachability and latency
26 (re-uses the same HTTP probe pattern ``LlamaConfig`` uses for
27 llama-server health — see commit 3f9be3be for the 503-aware
28 liveness check).
29 3. Registers each ``tier=expert`` model the peer advertises as a
30 ``ModelBackend(is_local=False, tier=EXPERT, ...)`` in the
31 canonical ``model_registry``. The dispatcher's existing
32 ``get_expert_model()`` then returns the hive-served entry
33 automatically — no dispatcher-side change required.
34 4. Health-checks each registered peer every ``_HEALTH_CHECK_INTERVAL_S``.
35 After ``_HEALTH_CHECK_FAIL_BUDGET`` consecutive failures the peer's
36 models get ``ModelRegistry.unregister``'d so the dispatcher never
37 dispatches to a dead endpoint.
38 5. ``peer.capability.revoke`` events drop the peer's models
39 immediately (clean shutdown path).
41Producer-side gap
42=================
44No peer emits ``peer.capability.announce`` yet — that producer ships
45separately on the peer node daemon. Until then this subscriber sits
46idle: ``on_peer_announce`` is never called, the registry has no hive
47entries, and the dispatcher transparently falls through to local
48langchain. Telemetry's ``served_by`` field will read ``local_langchain``
49100% of the time until peers start announcing, giving you a clean
50metric for hive-tier uptake afterwards.
52Reuse
53=====
55Nothing in this file duplicates existing primitives:
57 - ``ModelRegistry.register`` / ``unregister`` — canonical registration
58 - ``ModelBackend`` — same shape every other backend uses
59 - ``ModelTier.EXPERT`` — single source of truth for tier identification
60 - ``core.platform.registry.get_registry`` + ``.get('events')`` — same
61 EventBus access pattern as the rest of the platform (see
62 ``core.platform.events.emit_event``)
63 - ``security.key_delegation.verify_peer_attestation`` (when shipped) —
64 same trust chain the rest of the security layer uses
65 - ``requests`` — already a project dependency; no new transport
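
Example
=======

A minimal bootstrap wiring sketch (illustrative; per the design notes
above, call this after ``platform.init`` succeeds)::

    from integrations.agent_engine.hive_expert_discovery import (
        get_hive_expert_discovery,
    )

    # No-op (returns False) until the platform EventBus is bootstrapped.
    get_hive_expert_discovery().attach_to_event_bus()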
66"""
67from __future__ import annotations
69import logging
70import os
71import threading
72import time
73from concurrent.futures import ThreadPoolExecutor
74from typing import Any, Dict, List, Optional, Set
76import requests
78from integrations.agent_engine.model_registry import (
79 ModelBackend,
80 ModelRegistry,
81 ModelTier,
82 model_registry as _default_registry,
83)
85logger = logging.getLogger('hevolve_social')
87# Health-check cadence and fail budget. Tuned so a transient network
88# blip (one missed ping) doesn't drop a peer, but a genuinely dead peer
89# is dropped within ~3 minutes (3 misses × 60s).
90_HEALTH_CHECK_INTERVAL_S = 60
91_HEALTH_CHECK_FAIL_BUDGET = 3
92_PING_TIMEOUT_S = 2.0
94# Minimum verified_baseline a peer's advertised model needs in order to
95# be registered. Below this we treat the advertisement as untrusted
96# noise — the peer is free to advertise but the dispatcher must never
97# route real turns to a model whose own self-reported baseline is
98# below random-chance reasoning quality.
99_MIN_VERIFIED_BASELINE = 0.5
101# Gossip topics. Producer side (peer's capability advertiser daemon)
102# ships separately; this module just listens.
103_TOPIC_ANNOUNCE = 'peer.capability.announce'
104_TOPIC_REVOKE = 'peer.capability.revoke'


def _backend_id_for(peer_id: str, model_id: str) -> str:
    """Canonical ID format for a hive-served backend entry.

    Centralised so the announce / revoke / health-check paths all
    speak the same prefix convention.
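
    Example (doctest-style; mirrors the return expression below)::

        >>> _backend_id_for('node-a', 'llama3-70b')
        'hive-node-a-llama3-70b'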
112 """
113 return f'hive-{peer_id}-{model_id}'
116class HiveExpertDiscovery:
117 """Subscribes to ``peer.capability.*`` gossip and keeps the canonical
118 ``ModelRegistry`` in sync with reachable hive expert peers.
120 Thread-safety
121 -------------
122 Internal state (``_peer_models``, ``_fail_count``) is guarded by
123 ``self._lock``. ``ModelRegistry.register`` / ``unregister`` carry
124 their own lock so cross-thread registration is safe. The health
125 check loop runs on a dedicated ``ThreadPoolExecutor`` worker; it
126 holds ``self._lock`` only while reading the peer list, never while
127 pinging.
128 """
130 def __init__(self, registry: Optional[ModelRegistry] = None):
131 self._registry = registry or _default_registry
132 self._lock = threading.Lock()
133 self._peer_models: Dict[str, Set[str]] = {}
134 self._fail_count: Dict[str, int] = {}
135 # Worker pool size 2: one for the health-check loop, one slack
136 # for any future async announce-validation that needs more than
137 # the bus dispatcher's caller thread.
138 self._pool = ThreadPoolExecutor(
139 max_workers=2, thread_name_prefix='hive_expert_discovery')
140 self._stop = threading.Event()
141 self._subscribed = False
143 # ── Public API ──────────────────────────────────────────────────
    def attach_to_event_bus(self) -> bool:
        """Wire up gossip subscriptions on the platform EventBus.

        Idempotent: returns True on first successful attach, False on
        subsequent calls (already subscribed) or if the bus isn't
        bootstrapped.

        Safe to call from anywhere — degrades to no-op when the
        platform layer hasn't initialised yet. Bootstrap should call
        this after ``platform.init`` succeeds.
        """
        # Hold the lock across check-and-subscribe so two concurrent
        # callers can't both pass the idempotency check and register
        # duplicate bus handlers.
        with self._lock:
            if self._subscribed:
                return False
            try:
                from core.platform.registry import get_registry
                registry = get_registry()
                if not registry.has('events'):
                    logger.debug(
                        "HiveExpertDiscovery: EventBus not yet bootstrapped; "
                        "attach_to_event_bus is a no-op until platform.init runs")
                    return False
                bus = registry.get('events')
                bus.on(_TOPIC_ANNOUNCE, self._on_announce_event)
                bus.on(_TOPIC_REVOKE, self._on_revoke_event)
                self._subscribed = True
                # Kick off the background health checker. Submit to our
                # own pool so the dispatcher and platform shutdown lifecycle
                # can stop us cleanly via ``self._stop``.
                self._pool.submit(self._health_check_loop)
                logger.info(
                    "HiveExpertDiscovery: subscribed to %s + %s, "
                    "health check every %ds",
                    _TOPIC_ANNOUNCE, _TOPIC_REVOKE, _HEALTH_CHECK_INTERVAL_S)
                return True
            except Exception as e:
                logger.warning(
                    "HiveExpertDiscovery: attach_to_event_bus failed (%s) — "
                    "hive routing will not activate this session.", e)
                return False
    def shutdown(self) -> None:
        """Stop the health-check loop and the worker pool.

        Does NOT unregister already-registered hive backends — leave
        them in place for graceful drain. ``ModelRegistry.unregister``
        is still callable; callers that need a hard reset can iterate.
        """
        self._stop.set()
        self._pool.shutdown(wait=False)

    # ── EventBus callbacks (bus passes (topic, data)) ───────────────

    def _on_announce_event(self, topic: str, data: Any) -> None:
        if not isinstance(data, dict):
            return
        self.on_peer_announce(data)

    def _on_revoke_event(self, topic: str, data: Any) -> None:
        if not isinstance(data, dict):
            return
        peer_id = data.get('peer_id') or ''
        if peer_id:
            self._drop_peer(peer_id, reason='revoke')

    # ── Announce handling ──────────────────────────────────────────
    def on_peer_announce(self, msg: Dict[str, Any]) -> int:
        """Process a single ``peer.capability.announce`` payload.

        Expected payload (producer-side schema documented here so the
        producer daemon has a single source of truth to match):

            {
                "peer_id": str,          # opaque, stable per node
                "endpoint": str,         # https://node-x.example
                "auth_token": str,       # bearer for HTTP calls
                "trust_signature": str,  # signed by master-key chain
                "models": [
                    {
                        "model_id": str,
                        "display_name": str,
                        "tier": "expert" | "balanced" | "fast",
                        "verified_baseline": float,  # 0.0-1.0
                        "specialty": [str],          # optional
                        "capabilities": {...},       # optional
                    },
                    ...
                ]
            }

        Returns the number of expert-tier backends registered (or
        re-registered) for this peer after this announce processes.
        Zero means the announce was rejected (trust / reachability /
        no qualifying models) — caller may log; we don't raise.
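
        A minimal qualifying call with illustrative values (the trust
        gate and the reachability probe below must also pass before
        anything registers)::

            get_hive_expert_discovery().on_peer_announce({
                'peer_id': 'node-a',
                'endpoint': 'https://node-a.example',
                'auth_token': 'token',
                'trust_signature': '...',
                'models': [{
                    'model_id': 'llama3-70b',
                    'display_name': 'Llama 3 70B',
                    'tier': 'expert',
                    'verified_baseline': 0.8,
                }],
            })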
241 """
242 peer_id = (msg.get('peer_id') or '').strip()
243 endpoint = (msg.get('endpoint') or '').rstrip('/')
244 auth_token = msg.get('auth_token') or ''
245 if not peer_id or not endpoint:
246 logger.debug(
247 "HiveExpertDiscovery: announce missing peer_id/endpoint, "
248 "ignored")
249 return 0
251 if not self._verify_peer_trust(msg):
252 logger.warning(
253 "HiveExpertDiscovery: peer %s trust check failed — "
254 "refusing to register advertised expert models",
255 peer_id)
256 return 0
258 # Pre-commit reachability probe: don't register entries we
259 # can't actually reach right now. Producer's next announce
260 # will give us another chance.
261 latency_ms = self._ping_latency(endpoint, auth_token)
262 if latency_ms is None:
263 logger.info(
264 "HiveExpertDiscovery: peer %s unreachable on first probe "
265 "(endpoint=%s); will retry on next announce",
266 peer_id, endpoint)
267 return 0
269 # Compute the new set BEFORE touching the registry so a
270 # malformed payload doesn't half-drop the peer.
271 new_ids: Set[str] = set()
272 new_backends: List[ModelBackend] = []
273 for model in (msg.get('models') or []):
274 if not isinstance(model, dict):
275 continue
276 if model.get('tier') != 'expert':
277 continue
278 model_id = (model.get('model_id') or '').strip()
279 if not model_id:
280 continue
281 try:
282 baseline = float(model.get('verified_baseline', 0.0))
283 except (TypeError, ValueError):
284 continue
285 if baseline < _MIN_VERIFIED_BASELINE:
286 continue
288 backend_id = _backend_id_for(peer_id, model_id)
289 new_ids.add(backend_id)
290 new_backends.append(ModelBackend(
291 model_id=backend_id,
292 display_name=(
293 'Hive: ' + (model.get('display_name') or model_id)),
294 tier=ModelTier.EXPERT,
295 config_list_entry={
296 'model': model_id,
297 'api_key': auth_token or 'hive',
298 'base_url': f'{endpoint}/v1',
299 'price': [0, 0],
300 'specialty': list(model.get('specialty') or []),
301 },
302 avg_latency_ms=latency_ms,
303 accuracy_score=baseline,
304 cost_per_1k_tokens=0.0,
305 is_local=False,
306 hardware_dependent=False,
307 gpu_tdp_watts=0.0,
308 ))
310 # Diff against any existing registration for this peer. Register
311 # the new ones first (so the peer never goes to zero models
312 # mid-update), then unregister the ones that disappeared.
313 with self._lock:
314 prev_ids = self._peer_models.get(peer_id, set())
315 for backend in new_backends:
316 self._registry.register(backend)
317 dropped = prev_ids - new_ids
318 for backend_id in dropped:
319 self._registry.unregister(backend_id)
320 with self._lock:
321 self._peer_models[peer_id] = new_ids
322 self._fail_count[peer_id] = 0
324 if new_ids:
325 logger.info(
326 "HiveExpertDiscovery: peer %s registered %d expert "
327 "backend(s), dropped %d, latency=%.0fms",
328 peer_id, len(new_ids), len(dropped), latency_ms)
329 elif dropped:
330 logger.info(
331 "HiveExpertDiscovery: peer %s announced no qualifying "
332 "experts; dropped %d prior backend(s)",
333 peer_id, len(dropped))
334 return len(new_ids)
    # ── Health check ───────────────────────────────────────────────

    def _health_check_loop(self) -> None:
        """Background loop — pings every registered peer's endpoint
        every ``_HEALTH_CHECK_INTERVAL_S``. Drops peers after
        ``_HEALTH_CHECK_FAIL_BUDGET`` consecutive failures.

        Updates each surviving peer's per-backend ``avg_latency_ms``
        from the live ping so the dispatcher's selector reflects
        current network conditions instead of the original-announce
        snapshot.
        """
        while not self._stop.wait(_HEALTH_CHECK_INTERVAL_S):
            with self._lock:
                peers = list(self._peer_models.keys())
            for peer_id in peers:
                if self._stop.is_set():
                    return
                try:
                    self._check_one_peer(peer_id)
                except Exception as e:
                    logger.debug(
                        "HiveExpertDiscovery: health check raised for "
                        "%s: %s", peer_id, e)
    def _check_one_peer(self, peer_id: str) -> None:
        with self._lock:
            backend_ids = list(self._peer_models.get(peer_id, ()))
        if not backend_ids:
            return
        # Backends from one peer share an endpoint — probe via the
        # first registered backend's config_list_entry base_url.
        first = self._registry.get_model(backend_ids[0])
        if first is None:
            # Backend vanished from under us (manual unregister?) — sync
            # our state and move on.
            self._drop_peer(peer_id, reason='backend_missing')
            return
        cfg = first.config_list_entry or {}
        base_url = (cfg.get('base_url') or '').rstrip('/')
        if base_url.endswith('/v1'):
            endpoint = base_url[:-3]
        else:
            endpoint = base_url
        auth = cfg.get('api_key') or ''

        latency_ms = self._ping_latency(endpoint, auth)
        if latency_ms is None:
            with self._lock:
                self._fail_count[peer_id] = (
                    self._fail_count.get(peer_id, 0) + 1)
                fails = self._fail_count[peer_id]
            # Drop outside the lock: _drop_peer re-acquires it, and
            # ``threading.Lock`` is not reentrant.
            if fails >= _HEALTH_CHECK_FAIL_BUDGET:
                self._drop_peer(peer_id, reason=f'health_fail x{fails}')
            return

        # Live latency update — feeds the dispatcher's picker so a
        # peer whose ping latency drifts up loses to faster peers.
        with self._lock:
            self._fail_count[peer_id] = 0
        for backend_id in backend_ids:
            self._registry.record_latency(backend_id, latency_ms)
    # ── Helpers ────────────────────────────────────────────────────

    @staticmethod
    def _ping_latency(endpoint: str, auth_token: str) -> Optional[float]:
        """HTTP probe matching the same liveness contract llama-server
        speaks (commit 3f9be3be): HTTP 200 = alive, HTTP 503 with
        'Loading' body = alive-but-warming, anything else = dead.

        Returns latency in milliseconds, or None on failure.
        """
        if not endpoint:
            return None
        url = f'{endpoint.rstrip("/")}/health'
        headers = (
            {'Authorization': f'Bearer {auth_token}'} if auth_token else {})
        try:
            t0 = time.time()
            r = requests.get(url, headers=headers, timeout=_PING_TIMEOUT_S)
            elapsed_ms = (time.time() - t0) * 1000
            if r.status_code == 200:
                return elapsed_ms
            if r.status_code == 503:
                body_text = (r.text or '').lower()
                if 'loading' in body_text:
                    return elapsed_ms
            return None
        except requests.RequestException:
            return None
        except Exception:
            return None
    @staticmethod
    def _verify_peer_trust(msg: Dict[str, Any]) -> bool:
        """Trust gate. Prefer the master-key delegation chain when
        ``security.key_delegation.verify_peer_attestation`` is available.

        Until that API ships, fall back to an env-var allowlist so an
        operator can run the discovery path against a known-trusted
        peer they control (typical for a regional deployment in
        bring-up). When the API lands, the ImportError fallback goes
        dead without any code change here.
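
        Allowlist sketch (illustrative peer IDs; matches the
        comma-separated parsing below)::

            os.environ['HEVOLVE_HIVE_TRUSTED_PEERS'] = 'node-a,node-b'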
440 """
441 peer_id = (msg.get('peer_id') or '').strip()
442 if not peer_id:
443 return False
444 try:
445 from security.key_delegation import ( # type: ignore
446 verify_peer_attestation,
447 )
448 return bool(verify_peer_attestation(
449 peer_id=peer_id,
450 signature=msg.get('trust_signature', ''),
451 payload=msg,
452 ))
453 except ImportError:
454 allowlist_env = os.environ.get('HEVOLVE_HIVE_TRUSTED_PEERS', '')
455 trusted = {
456 p.strip() for p in allowlist_env.split(',') if p.strip()
457 }
458 return peer_id in trusted
459 except Exception as e:
460 logger.warning(
461 "HiveExpertDiscovery: trust verification raised for "
462 "peer %s: %s — denying", peer_id, e)
463 return False
    def _drop_peer(self, peer_id: str, *, reason: str) -> int:
        """Unregister every backend recorded for this peer.

        Returns the count of backends unregistered. Safe to call
        multiple times — no-op when the peer is unknown.
        """
        with self._lock:
            backend_ids = self._peer_models.pop(peer_id, set())
            self._fail_count.pop(peer_id, None)
        for backend_id in backend_ids:
            self._registry.unregister(backend_id)
        if backend_ids:
            logger.info(
                "HiveExpertDiscovery: dropped %d backend(s) from peer "
                "%s (%s)", len(backend_ids), peer_id, reason)
        return len(backend_ids)
# ─── Module-level singleton ───

_singleton: Optional[HiveExpertDiscovery] = None
_init_lock = threading.Lock()


def get_hive_expert_discovery() -> HiveExpertDiscovery:
    """Module-level singleton accessor — matches ``model_registry``'s
    own singleton pattern (see ``integrations.agent_engine.model_registry``).
    """
    global _singleton
    with _init_lock:
        if _singleton is None:
            _singleton = HiveExpertDiscovery()
        return _singleton