Coverage for core / central_orchestrator_client.py: 67.3%

205 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Central Orchestrator Client — the deployment-side glue between a 

3running HART OS / Nunba node and hevolve.ai's central control plane. 

4 

5Brief reference: ml_intern_brief_hevolveai_training.md §2.3 + §5-D. 

6 

7Responsibilities: 

8 1. POST /heartbeat — every node reports health + benchmark best-scores 

9 to the central orchestrator so the ops dashboard shows live status. 

10 2. GET /halt — the node polls for the master kill signal. When 

11 present, the returned payload carries a master-key signature; we 

12 verify via security.master_key.verify_master_signature before 

13 calling HiveCircuitBreaker.halt_network(). 

14 3. (optional) emit TensorBoard local-writer path to a central ingest 

15 URL — consent-gated, controlled by a single env var. 

16 

17Design constraints: 

18 - NO URL is invented. Every endpoint is driven by environment 

19 variables. If the env var is unset, the client does nothing 

20 (fails open — the node operates independently). This matches 

21 the brief's §2.3 note: "Do not invent these URLs. Ask the 

22 human-in-the-loop." 

23 - Poll loop runs in a single background daemon thread with 

24 exponential backoff on failure. Never spams a dead central. 

25 - When the node tier is 'central' itself, the client SHORTS OUT — 

26 a central node does not POST heartbeats to itself. 

27 - Master-kill signature verification is MANDATORY. A halt signal 

28 without a verified master-key signature is logged at CRITICAL 

29 and ignored. This prevents a rogue central from halting the 

30 network without the physical master key. 

31""" 

32from __future__ import annotations 

33 

34import logging 

35import os 

36import random 

37import threading 

38import time 

39from typing import Any, Dict, Optional 

40 

logger = logging.getLogger('hevolve_social')


# ─── Names of the environment variables that configure this client ───
# No URL is hard-coded: the base URL and both endpoint paths are read
# from the environment at call time.

ENV_CENTRAL_URL = 'HEVOLVE_CENTRAL_ORCHESTRATOR_URL'
ENV_HEARTBEAT_PATH = 'HEVOLVE_CENTRAL_HEARTBEAT_PATH'  # falls back to /heartbeat
ENV_HALT_PATH = 'HEVOLVE_CENTRAL_HALT_PATH'  # falls back to /halt
ENV_TENSORBOARD_URL = 'HEVOLVE_TENSORBOARD_URL'
ENV_HEARTBEAT_INTERVAL = 'HEVOLVE_CENTRAL_HEARTBEAT_INTERVAL_S'  # falls back to 60
ENV_HALT_POLL_INTERVAL = 'HEVOLVE_CENTRAL_HALT_POLL_INTERVAL_S'  # falls back to 30
ENV_NODE_ID = 'HEVOLVE_NODE_ID'
ENV_NODE_TIER = 'HEVOLVE_NODE_TIER'


# ─── Fallbacks used when the corresponding variable is unset ───

_DEFAULT_HEARTBEAT_PATH = '/heartbeat'
_DEFAULT_HALT_PATH = '/halt'
_DEFAULT_HEARTBEAT_INTERVAL = 60  # seconds between heartbeats
_DEFAULT_HALT_POLL_INTERVAL = 30  # seconds between halt polls
_MAX_BACKOFF = 600  # retry-delay ceiling: 10 minutes
_INITIAL_BACKOFF = 5  # first retry delay: 5 seconds

62 

63 

def _int_env(name: str, default: int) -> int:
    """Read environment variable *name* as an integer.

    Returns *default* when the variable is unset or empty, and also
    when its value cannot be converted by ``int()``.
    """
    raw = os.environ.get(name, '')
    # An empty/unset value is replaced by the default before conversion.
    candidate = raw if raw else default
    try:
        parsed = int(candidate)
    except (TypeError, ValueError):
        return default
    return parsed

69 

70 

class CentralOrchestratorClient:
    """Background client polling the hevolve.ai central orchestrator.

    Call `start()` from boot — idempotent and a no-op when the central
    URL env var is unset. `stop()` is called at process shutdown.

    The status fields (`_last_*`, `_halt_applied`) are written by the
    loop thread and read by `get_status()` without taking the lock.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._thread: Optional[threading.Thread] = None
        # Stop flag of the most recently started loop; `start()` installs
        # a fresh event for every run.
        self._stop_event = threading.Event()
        self._running = False
        self._last_heartbeat_ts: float = 0.0
        self._last_heartbeat_error: Optional[str] = None
        self._last_halt_poll_ts: float = 0.0
        self._last_halt_poll_error: Optional[str] = None
        self._halt_applied = False
        self._backoff = _INITIAL_BACKOFF

    # ─── Public API ───

    def is_configured(self) -> bool:
        """True when the central URL env var is set (non-empty)."""
        url = os.environ.get(ENV_CENTRAL_URL, '').strip()
        return bool(url)

    def start(self) -> bool:
        """Start the poll loop in a daemon thread.

        No-op when the central URL env var is unset, when the node tier
        is 'central' (a central node doesn't phone itself), or when a
        loop started by this client is already running.

        Returns:
            True when a new thread was actually started, else False.
        """
        if not self.is_configured():
            logger.info(
                '[central_orchestrator] %s unset — client inactive',
                ENV_CENTRAL_URL,
            )
            return False
        # Strip before comparing so a padded value such as ' central '
        # is still recognised (the URL check strips as well).
        tier = (os.environ.get(ENV_NODE_TIER, '') or '').strip().lower()
        if tier == 'central':
            logger.info(
                '[central_orchestrator] node tier=central — client '
                'does not self-heartbeat'
            )
            return False
        with self._lock:
            if self._running and self._thread and self._thread.is_alive():
                return False
            # Give every run its own stop event. Re-using (clearing) the
            # previous event would un-stop a loop thread that `stop()`
            # signalled but that has not exited yet, leaving two loops
            # running side by side.
            stop_event = threading.Event()
            self._stop_event = stop_event
            self._thread = threading.Thread(
                target=self._loop,
                args=(stop_event,),
                name='central_orchestrator_client',
                daemon=True,
            )
            self._running = True
            self._thread.start()
        logger.info(
            '[central_orchestrator] started — target=%s heartbeat_interval=%ds',
            os.environ.get(ENV_CENTRAL_URL, ''),
            _int_env(ENV_HEARTBEAT_INTERVAL, _DEFAULT_HEARTBEAT_INTERVAL),
        )
        return True

    def stop(self) -> None:
        """Signal the loop to exit. Does NOT join — daemon thread
        exits when the process exits."""
        with self._lock:
            # Read the event under the lock so we signal the event that
            # belongs to the current run.
            self._stop_event.set()
            self._running = False

    def get_status(self) -> Dict[str, Any]:
        """Expose current state for /status endpoints + dashboards."""
        return {
            'configured': self.is_configured(),
            'running': self._running,
            'last_heartbeat_ts': self._last_heartbeat_ts,
            'last_heartbeat_error': self._last_heartbeat_error,
            'last_halt_poll_ts': self._last_halt_poll_ts,
            'last_halt_poll_error': self._last_halt_poll_error,
            'halt_applied': self._halt_applied,
            'central_url': os.environ.get(ENV_CENTRAL_URL, ''),
            'tensorboard_url': os.environ.get(ENV_TENSORBOARD_URL, ''),
        }

    # ─── Loop body ───

    def _loop(self, stop_event: Optional[threading.Event] = None) -> None:
        """Run heartbeat + halt polling until the stop event is set.

        Args:
            stop_event: Event that ends this run. Defaults to the
                client's current `_stop_event` when omitted.
        """
        if stop_event is None:
            stop_event = self._stop_event

        heartbeat_interval = _int_env(
            ENV_HEARTBEAT_INTERVAL, _DEFAULT_HEARTBEAT_INTERVAL,
        )
        halt_interval = _int_env(
            ENV_HALT_POLL_INTERVAL, _DEFAULT_HALT_POLL_INTERVAL,
        )

        # 0.0 means "due immediately" on the first iteration.
        next_heartbeat = 0.0
        next_halt_poll = 0.0

        while not stop_event.is_set():
            now = time.time()
            try:
                if now >= next_heartbeat:
                    if self._post_heartbeat():
                        self._backoff = _INITIAL_BACKOFF
                        next_heartbeat = now + heartbeat_interval
                    else:
                        # Backoff — jittered, capped at _MAX_BACKOFF
                        delay = min(self._backoff, _MAX_BACKOFF)
                        delay = delay + random.uniform(0, delay * 0.2)
                        next_heartbeat = now + delay
                        self._backoff = min(self._backoff * 2, _MAX_BACKOFF)

                if now >= next_halt_poll:
                    self._check_halt()
                    next_halt_poll = now + halt_interval
            except Exception as exc:
                # Never let an exception kill the loop — the whole point
                # of this thread is to survive central outages.
                logger.debug('[central_orchestrator] loop error: %s', exc)

            # Sleep until the sooner of the two next deadlines, clamped
            # to the range 0.5–5 seconds. The wait returns early as soon
            # as the stop event is set.
            sleep_for = max(
                0.5,
                min(5.0, min(next_heartbeat, next_halt_poll) - time.time()),
            )
            stop_event.wait(sleep_for)

    # ─── Heartbeat ───

    def _post_heartbeat(self) -> bool:
        """POST one heartbeat; return True on a 2xx response.

        Uses `core.http_pool.pooled_post` and falls back to `requests`
        when that import fails. Failures are recorded in
        `_last_heartbeat_error` rather than raised.
        """
        url = self._url(ENV_HEARTBEAT_PATH, _DEFAULT_HEARTBEAT_PATH)
        if not url:
            return False
        payload = self._build_heartbeat_payload()
        try:
            from core.http_pool import pooled_post
            resp = pooled_post(url, json=payload, timeout=10)
            return self._record_heartbeat_result(resp)
        except ImportError:
            return self._post_heartbeat_requests(url, payload)
        except Exception as exc:
            self._last_heartbeat_ts = time.time()
            self._last_heartbeat_error = str(exc)
            return False

    def _post_heartbeat_requests(
        self, url: str, payload: Dict[str, Any],
    ) -> bool:
        """Fallback for environments where core.http_pool is absent
        (e.g. slim import-only test runs). Uses `requests` with a
        short timeout."""
        try:
            import requests
            resp = requests.post(url, json=payload, timeout=10)
            return self._record_heartbeat_result(resp)
        except Exception as exc:
            self._last_heartbeat_ts = time.time()
            self._last_heartbeat_error = str(exc)
            return False

    def _record_heartbeat_result(self, resp: Any) -> bool:
        """Update heartbeat status fields from *resp*.

        Shared by the pooled and `requests` code paths so both classify
        responses the same way.

        Returns:
            True when *resp* carries a 2xx `status_code`; False for
            `None` or any other status.
        """
        self._last_heartbeat_ts = time.time()
        if resp is None:
            self._last_heartbeat_error = 'no response'
            return False
        status = getattr(resp, 'status_code', 0)
        if 200 <= status < 300:
            self._last_heartbeat_error = None
            return True
        self._last_heartbeat_error = f'HTTP {status}'
        return False

    def _build_heartbeat_payload(self) -> Dict[str, Any]:
        """Assemble the heartbeat body.

        Fields are intentionally conservative — no raw user data, no
        PII. The central orchestrator sees node_id, node_tier,
        guardrail_hash, and a small benchmark summary. Everything
        else stays on-device.

        Each optional section is best-effort: a failure in one of the
        project imports below never prevents the heartbeat being built.
        """
        payload: Dict[str, Any] = {
            'node_id': os.environ.get(ENV_NODE_ID, '') or _fallback_node_id(),
            'node_tier': os.environ.get(ENV_NODE_TIER, '') or 'flat',
            'timestamp': time.time(),
            'version': 1,
        }
        # Guardrail hash — proves we're still running genuine guardrails.
        # The key is simply omitted when the hash cannot be computed.
        try:
            from security.hive_guardrails import compute_guardrail_hash
            payload['guardrail_hash'] = compute_guardrail_hash()
        except Exception as exc:
            logger.debug(
                '[central_orchestrator] guardrail hash unavailable: %s', exc,
            )
        # Halted-state flag — the central wants to know when a node
        # has self-tripped its circuit breaker.
        try:
            from security.hive_guardrails import HiveCircuitBreaker
            payload['halted'] = HiveCircuitBreaker.is_halted()
        except Exception:
            payload['halted'] = False
        # Benchmark best scores — lets the central dashboard rank nodes.
        try:
            from integrations.agent_engine.hive_benchmark_prover import (
                get_benchmark_prover,
            )
            prover = get_benchmark_prover()
            best = prover._leaderboard.get_best_scores() or {}
            # Only top-level benchmark scores — keep the payload small.
            payload['benchmark_best'] = {
                k: round(float(v.get('score', 0)), 4)
                for k, v in best.items()
            }
        except Exception:
            payload['benchmark_best'] = {}
        # WorldModelBridge stats — one-line summary of learning traffic.
        try:
            from integrations.agent_engine.world_model_bridge import (
                get_world_model_bridge,
            )
            b = get_world_model_bridge()
            stats = b.get_stats() if hasattr(b, 'get_stats') else {}
            payload['world_model'] = {
                'total_recorded': stats.get('total_recorded', 0),
                'total_flushed': stats.get('total_flushed', 0),
            }
        except Exception:
            payload['world_model'] = {}
        return payload

    # ─── Halt poll ───

    @staticmethod
    def _halt_fields(body: Any) -> Optional[Dict[str, str]]:
        """Extract the halt reason and signature from a /halt JSON body.

        Returns:
            None when *body* is not a dict or its 'halt' value is falsy.
            Otherwise a dict with 'reason' (defaulting to
            'central halt') and 'signature' ('' when absent). A missing,
            null or empty value counts as absent, so a JSON ``null``
            signature is never turned into the string 'None'.
        """
        if not isinstance(body, dict) or not body.get('halt'):
            return None
        raw_reason = body.get('reason')
        raw_signature = body.get('signature')
        return {
            'reason': str(raw_reason) if raw_reason else 'central halt',
            'signature': str(raw_signature) if raw_signature else '',
        }

    def _check_halt(self) -> None:
        """Poll the halt endpoint once and apply a signed halt signal.

        Outcomes are recorded in `_last_halt_poll_ts` and
        `_last_halt_poll_error`; nothing is raised for HTTP or parsing
        failures. A halt body without a signature is logged at CRITICAL
        and ignored.
        """
        url = self._url(ENV_HALT_PATH, _DEFAULT_HALT_PATH)
        if not url:
            return
        try:
            from core.http_pool import pooled_get
            resp = pooled_get(url, timeout=10)
        except ImportError:
            resp = self._get_halt_requests(url)
            if resp is None:
                # The fallback already stored the specific error; return
                # here so it is not overwritten with 'no response'.
                self._last_halt_poll_ts = time.time()
                return
        except Exception as exc:
            self._last_halt_poll_ts = time.time()
            self._last_halt_poll_error = str(exc)
            return

        self._last_halt_poll_ts = time.time()
        if resp is None:
            self._last_halt_poll_error = 'no response'
            return
        status = getattr(resp, 'status_code', 0)
        if status == 404:
            # Central orchestrator defines "no halt" as 404 on /halt —
            # cheap, no JSON parse needed.
            self._last_halt_poll_error = None
            return
        if status != 200:
            self._last_halt_poll_error = f'HTTP {status}'
            return
        try:
            body = resp.json() if hasattr(resp, 'json') else None
        except Exception:
            self._last_halt_poll_error = 'invalid JSON'
            return
        fields = self._halt_fields(body)
        if fields is None:
            # 200 response that does not request a halt.
            self._last_halt_poll_error = None
            return
        reason = fields['reason']
        signature = fields['signature']
        if not signature:
            logger.critical(
                '[central_orchestrator] halt signal WITHOUT signature '
                '— ignored. Reason: %s', reason,
            )
            self._last_halt_poll_error = 'halt without signature'
            return
        self._apply_halt(reason=reason, signature=signature)
        self._last_halt_poll_error = None

    def _get_halt_requests(self, url: str):
        """Fallback GET via `requests`.

        Returns the response, or None after storing the exception text
        in `_last_halt_poll_error`.
        """
        try:
            import requests
            return requests.get(url, timeout=10)
        except Exception as exc:
            self._last_halt_poll_error = str(exc)
            return None

    def _apply_halt(self, reason: str, signature: str) -> None:
        """Hand the halt signal to the circuit breaker.

        Calls `HiveCircuitBreaker.halt_network` with the reason and the
        signature; signature verification is expected to happen inside
        that call (implemented outside this file), so this method is
        just the caller. `_halt_applied` is set only when the call
        returns a truthy result. Import or runtime failures are logged
        at CRITICAL and not raised.
        """
        try:
            from security.hive_guardrails import HiveCircuitBreaker
            ok = HiveCircuitBreaker.halt_network(
                reason=f'central:{reason}',
                signature=signature,
            )
            if ok:
                self._halt_applied = True
                logger.critical(
                    '[central_orchestrator] HIVE HALTED by central '
                    'orchestrator. Reason: %s', reason,
                )
            else:
                logger.critical(
                    '[central_orchestrator] halt signal rejected — '
                    'signature verification failed. Reason: %s',
                    reason,
                )
        except ImportError:
            logger.critical(
                '[central_orchestrator] guardrails unavailable — '
                'cannot apply halt: %s', reason,
            )
        except Exception as exc:
            logger.critical(
                '[central_orchestrator] halt apply failed: %s', exc,
            )

    # ─── URL helpers ───

    def _url(self, path_env: str, default_path: str) -> str:
        """Join the central base URL with an endpoint path.

        Args:
            path_env: Name of the env var that may override the path.
            default_path: Path used when that env var is unset/empty.

        Returns:
            The full URL, or '' when the central URL env var is unset.
        """
        base = os.environ.get(ENV_CENTRAL_URL, '').strip()
        if not base:
            return ''
        if base.endswith('/'):
            base = base[:-1]
        path = os.environ.get(path_env, '').strip() or default_path
        if not path.startswith('/'):
            path = '/' + path
        return base + path

404 

405 

def _fallback_node_id() -> str:
    """Derive a node id for use when HEVOLVE_NODE_ID is unset.

    Returns the first 16 characters of
    `security.node_integrity.compute_code_hash()`, or the literal
    'unknown-node' when that import or call fails for any reason.
    """
    node_id = 'unknown-node'
    try:
        from security.node_integrity import compute_code_hash
        digest = compute_code_hash()
        node_id = digest[:16]
    except Exception:
        # Best-effort: keep the placeholder id.
        node_id = 'unknown-node'
    return node_id

413 

414 

415# ─── Module singleton ─── 

416 

# Process-wide client instance, created on demand by get_client().
_client: Optional[CentralOrchestratorClient] = None
# Guards creation of the instance above.
_client_lock = threading.Lock()


def get_client() -> CentralOrchestratorClient:
    """Return the shared client, building it the first time it is needed.

    The instance is checked once without the lock and again while
    holding `_client_lock`, so at most one client is constructed.
    """
    global _client
    if _client is not None:
        return _client
    with _client_lock:
        # Re-check: another thread may have created it while we waited.
        if _client is None:
            _client = CentralOrchestratorClient()
    return _client

429 

430 

def start() -> bool:
    """Bootstrap helper intended for hart_intelligence_entry boot.

    Delegates to the singleton client's `start()`; safe to call more
    than once. Returns True only when the background loop was actually
    started by this call.
    """
    client = get_client()
    started = client.start()
    return started

435 

436 

def stop() -> None:
    """Shutdown helper intended for atexit; safe to call repeatedly.

    Delegates to the singleton client's `stop()`.
    """
    client = get_client()
    client.stop()

440 

441 

def get_status() -> Dict[str, Any]:
    """Return the singleton client's status dict.

    Used as the status snapshot for /api/social/dashboard/health.
    """
    client = get_client()
    snapshot = client.get_status()
    return snapshot