Coverage for core / central_orchestrator_client.py: 67.3%
205 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Central Orchestrator Client — the deployment-side glue between a
running HART OS / Nunba node and hevolve.ai's central control plane.

Brief reference: ml_intern_brief_hevolveai_training.md §2.3 + §5-D.

Responsibilities:
  1. POST /heartbeat — every node reports health + benchmark best-scores
     to the central orchestrator so the ops dashboard shows live status.
  2. GET /halt — the node polls for the master kill signal. When
     present, the returned payload must carry a master-key signature:
     the client itself rejects unsigned payloads and hands signed ones
     to HiveCircuitBreaker.halt_network(), which verifies the signature
     before halting.
  3. TensorBoard — the HEVOLVE_TENSORBOARD_URL value is only surfaced
     through get_status(); this module does not send anything to it.

Design constraints:
  - NO URL is invented. Every endpoint is driven by environment
    variables. If the central URL env var is unset, the client does
    nothing (fails open — the node operates independently). This
    matches the brief's §2.3 note: "Do not invent these URLs. Ask the
    human-in-the-loop."
  - The poll loop runs in a single background daemon thread. Failed
    heartbeats back off exponentially (jittered, capped at 10 minutes)
    so a dead central is not spammed; the halt poll keeps its fixed
    interval.
  - When the node tier is 'central' itself, the client SHORTS OUT —
    a central node does not POST heartbeats to itself.
  - Master-kill signature checking is MANDATORY. A halt signal without
    a signature, or whose signature HiveCircuitBreaker.halt_network
    rejects, is logged at CRITICAL and not applied. This prevents a
    rogue central from halting the network without the physical
    master key.
"""
32from __future__ import annotations
34import logging
35import os
36import random
37import threading
38import time
39from typing import Any, Dict, Optional
# ─── Environment variables — all URLs driven by these ───

# Base URL of the central orchestrator; unset/blank disables the client.
ENV_CENTRAL_URL = 'HEVOLVE_CENTRAL_ORCHESTRATOR_URL'
# Optional path overrides appended to the base URL.
ENV_HEARTBEAT_PATH = 'HEVOLVE_CENTRAL_HEARTBEAT_PATH'  # default /heartbeat
ENV_HALT_PATH = 'HEVOLVE_CENTRAL_HALT_PATH'  # default /halt
# Only echoed back in the status snapshot.
ENV_TENSORBOARD_URL = 'HEVOLVE_TENSORBOARD_URL'
# Loop cadences, in whole seconds.
ENV_HEARTBEAT_INTERVAL = 'HEVOLVE_CENTRAL_HEARTBEAT_INTERVAL_S'  # default 60
ENV_HALT_POLL_INTERVAL = 'HEVOLVE_CENTRAL_HALT_POLL_INTERVAL_S'  # default 30
# Node identity / tier reported in heartbeats.
ENV_NODE_ID = 'HEVOLVE_NODE_ID'
ENV_NODE_TIER = 'HEVOLVE_NODE_TIER'

# Fallback values used when the matching env var gives nothing usable.
_DEFAULT_HEARTBEAT_PATH = '/heartbeat'
_DEFAULT_HALT_PATH = '/halt'
_DEFAULT_HEARTBEAT_INTERVAL = 60
_DEFAULT_HALT_POLL_INTERVAL = 30

# Heartbeat retry backoff bounds, in seconds.
_INITIAL_BACKOFF = 5
_MAX_BACKOFF = 10 * 60  # 10 minutes

logger = logging.getLogger('hevolve_social')
def _int_env(name: str, default: int) -> int:
    """Read environment variable *name* as an int, else return *default*.

    An unset or empty variable yields ``int(default)``; a value that
    ``int()`` cannot parse (e.g. ``'abc'`` or ``'1.5'``) yields
    ``default``. Surrounding whitespace is tolerated by ``int()``.
    """
    try:
        raw = os.environ.get(name, '')
        return int(raw if raw else default)
    except (TypeError, ValueError):
        return default
class CentralOrchestratorClient:
    """Background client polling the hevolve.ai central orchestrator.

    Call `start()` from boot — idempotent, and a no-op when the central
    URL env var is unset or the node tier is 'central'. `stop()` is
    called at process shutdown; it signals the loop thread but does not
    join it.

    A single daemon thread runs `_loop`, which POSTs heartbeats (with
    jittered exponential backoff on failure) and polls the halt
    endpoint. Outcomes are recorded on the instance and exposed via
    `get_status()`.
    """

    # Floor (seconds) for the env-configured heartbeat / halt-poll
    # intervals. Without it a value of 0 or below would make the loop
    # contact the central every 0.5 s (its minimum sleep).
    _MIN_INTERVAL_S = 1

    def __init__(self):
        self._lock = threading.Lock()
        self._thread: Optional[threading.Thread] = None
        # Stop signal of the *current* loop thread; start() installs a
        # fresh Event for every thread it launches.
        self._stop_event = threading.Event()
        self._running = False
        self._last_heartbeat_ts: float = 0.0
        self._last_heartbeat_error: Optional[str] = None
        self._last_halt_poll_ts: float = 0.0
        self._last_halt_poll_error: Optional[str] = None
        self._halt_applied = False
        # Current heartbeat retry delay in seconds; doubles per failure.
        self._backoff = _INITIAL_BACKOFF

    # ─── Public API ───

    def is_configured(self) -> bool:
        """True when the central URL env var is set (non-blank)."""
        url = os.environ.get(ENV_CENTRAL_URL, '').strip()
        return bool(url)

    def start(self) -> bool:
        """Start the poll loop.

        No-op when the env is not configured, when the node tier is
        'central' (a central node doesn't phone itself), or when a loop
        thread is already running.

        Returns:
            True when a new background thread was actually started.
        """
        if not self.is_configured():
            logger.info(
                '[central_orchestrator] %s unset — client inactive',
                ENV_CENTRAL_URL,
            )
            return False
        # strip() for parity with is_configured(): a padded 'central'
        # value must still short-circuit, or a central would heartbeat
        # itself.
        tier = (os.environ.get(ENV_NODE_TIER, '') or '').strip().lower()
        if tier == 'central':
            logger.info(
                '[central_orchestrator] node tier=central — client '
                'does not self-heartbeat'
            )
            return False
        with self._lock:
            if self._running and self._thread and self._thread.is_alive():
                return False
            # A new Event per thread instead of clear()-ing a shared one.
            # After stop(), the previous thread may still be inside an
            # HTTP call, or between wait() returning and its loop check.
            # Clearing a shared Event would let it keep running next to
            # the new thread. Its own Event stays set, so it exits.
            stop_event = threading.Event()
            self._stop_event = stop_event
            self._thread = threading.Thread(
                target=self._loop,
                args=(stop_event,),
                name='central_orchestrator_client',
                daemon=True,
            )
            self._running = True
            self._thread.start()
        logger.info(
            '[central_orchestrator] started — target=%s heartbeat_interval=%ds',
            os.environ.get(ENV_CENTRAL_URL, ''),
            self._interval(ENV_HEARTBEAT_INTERVAL, _DEFAULT_HEARTBEAT_INTERVAL),
        )
        return True

    def stop(self) -> None:
        """Signal the loop to exit. Does NOT join.

        The loop thread notices the signal at its next check (its sleep
        is interrupted immediately); being a daemon, it also dies with
        the process.
        """
        with self._lock:
            self._stop_event.set()
            self._running = False

    def get_status(self) -> Dict[str, Any]:
        """Expose current state for /status endpoints + dashboards."""
        return {
            'configured': self.is_configured(),
            'running': self._running,
            'last_heartbeat_ts': self._last_heartbeat_ts,
            'last_heartbeat_error': self._last_heartbeat_error,
            'last_halt_poll_ts': self._last_halt_poll_ts,
            'last_halt_poll_error': self._last_halt_poll_error,
            'halt_applied': self._halt_applied,
            'central_url': os.environ.get(ENV_CENTRAL_URL, ''),
            'tensorboard_url': os.environ.get(ENV_TENSORBOARD_URL, ''),
        }

    # ─── Loop body ───

    @classmethod
    def _interval(cls, env_name: str, default: int) -> int:
        """Env-configured interval in seconds, floored at _MIN_INTERVAL_S."""
        return max(cls._MIN_INTERVAL_S, _int_env(env_name, default))

    def _loop(self, stop_event: Optional[threading.Event] = None) -> None:
        """Background-thread body: heartbeat + halt poll until stopped.

        Args:
            stop_event: Event that ends the loop when set. Defaults to
                the instance's current ``_stop_event``; start() passes
                the Event it created for this thread.
        """
        if stop_event is None:
            stop_event = self._stop_event
        heartbeat_interval = self._interval(
            ENV_HEARTBEAT_INTERVAL, _DEFAULT_HEARTBEAT_INTERVAL,
        )
        halt_interval = self._interval(
            ENV_HALT_POLL_INTERVAL, _DEFAULT_HALT_POLL_INTERVAL,
        )

        # 0.0 means "due now": the first heartbeat and halt poll run
        # immediately.
        next_heartbeat = 0.0
        next_halt_poll = 0.0

        while not stop_event.is_set():
            now = time.time()
            try:
                if now >= next_heartbeat:
                    if self._post_heartbeat():
                        self._backoff = _INITIAL_BACKOFF
                        next_heartbeat = now + heartbeat_interval
                    else:
                        # Backoff — jittered (+0..20%), capped at _MAX_BACKOFF.
                        delay = min(self._backoff, _MAX_BACKOFF)
                        delay += random.uniform(0, delay * 0.2)
                        next_heartbeat = now + delay
                        self._backoff = min(self._backoff * 2, _MAX_BACKOFF)

                if now >= next_halt_poll:
                    self._check_halt()
                    next_halt_poll = now + halt_interval
            except Exception as exc:
                # Never let an exception kill the loop — the whole point
                # of this thread is to survive central outages.
                logger.debug('[central_orchestrator] loop error: %s', exc)

            # Sleep until the sooner of the two next deadlines, clamped
            # to [0.5, 5] seconds so stop() takes effect promptly.
            sleep_for = max(
                0.5,
                min(5.0, min(next_heartbeat, next_halt_poll) - time.time()),
            )
            stop_event.wait(sleep_for)

    # ─── Heartbeat ───

    def _post_heartbeat(self) -> bool:
        """POST one heartbeat; return True on a 2xx response.

        Records the attempt time and the error (or None on success) for
        get_status(). Falls back to `requests` when core.http_pool
        cannot be imported.
        """
        url = self._url(ENV_HEARTBEAT_PATH, _DEFAULT_HEARTBEAT_PATH)
        if not url:
            return False
        payload = self._build_heartbeat_payload()
        try:
            from core.http_pool import pooled_post
            resp = pooled_post(url, json=payload, timeout=10)
            self._last_heartbeat_ts = time.time()
            if resp is None:
                self._last_heartbeat_error = 'no response'
                return False
            status = getattr(resp, 'status_code', 0)
            if 200 <= status < 300:
                self._last_heartbeat_error = None
                return True
            self._last_heartbeat_error = f'HTTP {status}'
            return False
        except ImportError:
            return self._post_heartbeat_requests(url, payload)
        except Exception as exc:
            self._last_heartbeat_ts = time.time()
            self._last_heartbeat_error = str(exc)
            return False

    def _post_heartbeat_requests(
        self, url: str, payload: Dict[str, Any],
    ) -> bool:
        """Fallback for environments where core.http_pool is absent
        (e.g. slim import-only test runs). Uses `requests` with a
        short timeout."""
        try:
            import requests
            resp = requests.post(url, json=payload, timeout=10)
            self._last_heartbeat_ts = time.time()
            if 200 <= resp.status_code < 300:
                self._last_heartbeat_error = None
                return True
            self._last_heartbeat_error = f'HTTP {resp.status_code}'
            return False
        except Exception as exc:
            self._last_heartbeat_ts = time.time()
            self._last_heartbeat_error = str(exc)
            return False

    def _build_heartbeat_payload(self) -> Dict[str, Any]:
        """Assemble the heartbeat body.

        Fields are intentionally conservative — no raw user data, no
        PII. The central orchestrator sees node_id, node_tier,
        guardrail_hash, and a small benchmark summary. Everything
        else stays on-device. Each optional section is best-effort: a
        failing import/call leaves it absent or empty.
        """
        payload: Dict[str, Any] = {
            'node_id': os.environ.get(ENV_NODE_ID, '') or _fallback_node_id(),
            'node_tier': os.environ.get(ENV_NODE_TIER, '') or 'flat',
            'timestamp': time.time(),
            'version': 1,
        }
        # Guardrail hash — proves we're still running genuine guardrails.
        try:
            from security.hive_guardrails import compute_guardrail_hash
            payload['guardrail_hash'] = compute_guardrail_hash()
        except Exception:
            pass
        # Halted-state flag — the central wants to know when a node
        # has self-tripped its circuit breaker.
        try:
            from security.hive_guardrails import HiveCircuitBreaker
            payload['halted'] = HiveCircuitBreaker.is_halted()
        except Exception:
            payload['halted'] = False
        # Benchmark best scores — lets the central dashboard rank nodes.
        try:
            from integrations.agent_engine.hive_benchmark_prover import (
                get_benchmark_prover,
            )
            prover = get_benchmark_prover()
            best = prover._leaderboard.get_best_scores() or {}
            # Only top-level benchmark scores — keep the payload small.
            payload['benchmark_best'] = {
                k: round(float(v.get('score', 0)), 4)
                for k, v in best.items()
            }
        except Exception:
            payload['benchmark_best'] = {}
        # WorldModelBridge stats — one-line summary of learning traffic.
        try:
            from integrations.agent_engine.world_model_bridge import (
                get_world_model_bridge,
            )
            b = get_world_model_bridge()
            stats = b.get_stats() if hasattr(b, 'get_stats') else {}
            payload['world_model'] = {
                'total_recorded': stats.get('total_recorded', 0),
                'total_flushed': stats.get('total_flushed', 0),
            }
        except Exception:
            payload['world_model'] = {}
        return payload

    # ─── Halt poll ───

    def _check_halt(self) -> None:
        """Poll the halt endpoint once and act on a signed halt signal.

        404 means "no halt". A 200 JSON object with a truthy ``halt``
        must also carry a non-empty ``signature``; unsigned halts are
        logged at CRITICAL and ignored. Signed halts go to
        `_apply_halt`, which leaves signature verification to
        HiveCircuitBreaker.halt_network.
        """
        url = self._url(ENV_HALT_PATH, _DEFAULT_HALT_PATH)
        if not url:
            return
        try:
            from core.http_pool import pooled_get
            resp = pooled_get(url, timeout=10)
        except ImportError:
            resp = self._get_halt_requests(url)
            if resp is None:
                # The fallback already stored the real failure reason;
                # don't clobber it with the generic 'no response' below.
                self._last_halt_poll_ts = time.time()
                return
        except Exception as exc:
            self._last_halt_poll_ts = time.time()
            self._last_halt_poll_error = str(exc)
            return

        self._last_halt_poll_ts = time.time()
        if resp is None:
            self._last_halt_poll_error = 'no response'
            return
        status = getattr(resp, 'status_code', 0)
        if status == 404:
            # Central orchestrator defines "no halt" as 404 on /halt —
            # cheap, no JSON parse needed.
            self._last_halt_poll_error = None
            return
        if status != 200:
            self._last_halt_poll_error = f'HTTP {status}'
            return
        try:
            body = resp.json() if hasattr(resp, 'json') else None
        except Exception:
            self._last_halt_poll_error = 'invalid JSON'
            return
        if not isinstance(body, dict) or not body.get('halt'):
            self._last_halt_poll_error = None
            return
        reason = str(body.get('reason', 'central halt'))
        signature = str(body.get('signature', ''))
        if not signature:
            logger.critical(
                '[central_orchestrator] halt signal WITHOUT signature '
                '— ignored. Reason: %s', reason,
            )
            self._last_halt_poll_error = 'halt without signature'
            return
        self._apply_halt(reason=reason, signature=signature)
        self._last_halt_poll_error = None

    def _get_halt_requests(self, url: str):
        """`requests` fallback for the halt GET.

        Returns the response, or None after recording the exception
        text in ``_last_halt_poll_error``.
        """
        try:
            import requests
            return requests.get(url, timeout=10)
        except Exception as exc:
            self._last_halt_poll_error = str(exc)
            return None

    def _apply_halt(self, reason: str, signature: str) -> None:
        """Trip the circuit breaker with the central's signed halt.

        HiveCircuitBreaker.halt_network itself verifies the signature,
        so a forged halt signal is rejected at the guardrail layer.
        This method is just the caller; every outcome is logged at
        CRITICAL.
        """
        try:
            from security.hive_guardrails import HiveCircuitBreaker
            ok = HiveCircuitBreaker.halt_network(
                reason=f'central:{reason}',
                signature=signature,
            )
            if ok:
                self._halt_applied = True
                logger.critical(
                    '[central_orchestrator] HIVE HALTED by central '
                    'orchestrator. Reason: %s', reason,
                )
            else:
                logger.critical(
                    '[central_orchestrator] halt signal rejected — '
                    'signature verification failed. Reason: %s',
                    reason,
                )
        except ImportError:
            logger.critical(
                '[central_orchestrator] guardrails unavailable — '
                'cannot apply halt: %s', reason,
            )
        except Exception as exc:
            logger.critical(
                '[central_orchestrator] halt apply failed: %s', exc,
            )

    # ─── URL helpers ───

    def _url(self, path_env: str, default_path: str) -> str:
        """Join the central base URL with the path from *path_env*.

        Returns '' when the base URL env var is unset/blank (or only
        slashes). All trailing slashes on the base are dropped and a
        leading '/' is ensured on the path, e.g. base 'http://c//' with
        path 'halt' -> 'http://c/halt'.
        """
        base = os.environ.get(ENV_CENTRAL_URL, '').strip().rstrip('/')
        if not base:
            return ''
        path = os.environ.get(path_env, '').strip() or default_path
        if not path.startswith('/'):
            path = '/' + path
        return base + path
def _fallback_node_id() -> str:
    """Node id used in heartbeats when HEVOLVE_NODE_ID is unset.

    Returns the first 16 characters of
    ``security.node_integrity.compute_code_hash()``, or the sentinel
    ``'unknown-node'`` if that module is missing or the call fails.

    NOTE(review): a *code* hash is presumably identical on every node
    running the same build — confirm it is really unique per install
    before relying on it as an identifier.
    """
    try:
        from security.node_integrity import compute_code_hash
        digest = compute_code_hash()
        return digest[:16]
    except Exception:
        # Missing module or hashing failure: use the fixed sentinel.
        return 'unknown-node'
# ─── Module singleton ───

# Lazily created process-wide client; _client_lock guards its creation.
_client: Optional[CentralOrchestratorClient] = None
_client_lock = threading.Lock()


def get_client() -> CentralOrchestratorClient:
    """Return the process-wide client, constructing it on first use.

    Double-checked locking: the unlocked fast path returns an existing
    instance, and the re-check under the lock ensures only one instance
    is ever created.
    """
    global _client
    existing = _client
    if existing is not None:
        return existing
    with _client_lock:
        if _client is None:
            _client = CentralOrchestratorClient()
        return _client
def start() -> bool:
    """Boot hook (call from hart_intelligence_entry boot).

    Starts the shared client's background loop. Safe to call repeatedly;
    returns True only when this call actually launched the loop thread.
    """
    client = get_client()
    return client.start()
def stop() -> None:
    """Shutdown hook (e.g. registered with atexit).

    Signals the shared client's loop to exit; repeated calls are harmless.
    """
    client = get_client()
    client.stop()
def get_status() -> Dict[str, Any]:
    """Return the shared client's status dict.

    Backs /api/social/dashboard/health.
    """
    client = get_client()
    return client.get_status()