Coverage for integrations/agent_engine/speculative_dispatcher.py: 62.1% (457 statements)

coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Unified Agent Goal Engine - Speculative Dispatcher 

3 

4Draft-first + expert-takeover dispatcher. 

5 

6Two entry points, one delivery channel: 

7 

8 dispatch_draft_first(prompt, ...) 

9 DRAFT tier (Qwen3.5-0.8B) replies SYNCHRONOUSLY in ~300ms with an 

10 envelope ``{reply, delegate: none|local|hive, confidence, ...}``. 

11 The reply is the user's standby answer; ``delegate`` is the draft's 

12 self-assessment of whether a heavier model needs to take the turn. 

13 

14 Five guards can promote the turn to expert background regardless of 

15 the draft's own decision: 

16 * refusal pattern in the reply → REFUSAL_OVERRIDE 

17 * delegate=none + confidence < floor → LOW_CONFIDENCE 

18 * delegate=none + agent_bound prompt → AGENT_BOUND 

19 * delegate=none + classifier surfaces actionable intent 

20 (channel_connect / is_create_agent / language_change / 

21 invite / join_room / memory_query) → ACTIONABLE_INTENT 

22 * envelope parse failure → PARSE_FAILURE 

23 The canonical taxonomy lives in 

24 ``integrations.agent_engine.escalation_reasons.EscalationReason``; 

25 callers / observers / WorldModelBridge see the reason in the 

26 return dict and in ``self._active[speculation_id]``. 

27 

28 dispatch_speculative(prompt, ...) 

29 Legacy entry. Fast tier replies synchronously, expert runs in 

30 background. Same delivery channel. 

31 

32Expert background path 

33---------------------- 

34 

35When a turn escalates, ``_expert_background_task`` dispatches via 

36``_dispatch_expert_langchain``: 

37 

38 - **Local expert** (``is_local=True``): routes through the full HARTOS 

39 /chat pipeline — full tool registry, full system prompt, full agent 

40 context. Bundled mode uses an in-process Flask test_client; HTTP 

41 mode POSTs to the configured backend. Re-entry guarded by 

42 ``speculative=False, draft_first=False`` in the payload. 

43 

44 - **Hive expert** (``is_local=False``): registered by 

45 ``HiveExpertDiscovery`` from ``peer.capability.announce`` gossip, 

46 routed via OpenAI-compatible POST to the peer's ``base_url`` with 

47 bearer auth. The hive peer's 27B / fine-tuned model takes the turn 

48 directly — no "review the draft" wrapper. 

49 

50When the expert returns, ``_deliver_expert_response`` bubble-replaces 

51the standby via the existing ``speculation_id`` channel: SSE for the 

52chat UI + TTS pupit topic for voice. The fast_response stays only when 

53the expert returned empty or got guardrail-blocked. 

54 

55Guardrails at every layer 

56------------------------- 

57- ``ConstitutionalFilter.check_prompt`` before dispatch and on expert output 

58- ``HiveCircuitBreaker.is_halted()`` before dispatch and again at task entry 

59- ``HiveEthos.rewrite_prompt_for_togetherness`` on every prompt 

60- Budget enforcement via ``_check_and_reserve_budget`` 

61- Energy + latency tracking on every model call 

62- Per-peer install-validation gate via ``_pass_validation_gate`` 

63 

64Hive expert wiring (subscriber-side complete; producer pending) 

65--------------------------------------------------------------- 

66``HiveExpertDiscovery`` listens for ``peer.capability.announce`` / 

67``peer.capability.revoke`` on the platform EventBus and auto-registers 

68reachable, trust-verified peers as ``ModelTier.EXPERT`` backends. 

69Until peers start emitting the announce gossip (producer daemon ships 

70separately), ``_pick_expert_for_delegate('hive', ...)`` falls back to 

71the local fast model and ``served_by`` in telemetry reads 

72``local_langchain_bg`` 100% of the time. 
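
Illustrative call (instance name and field values are examples only; the
authoritative return shape is documented on ``dispatch_draft_first``)::

    result = dispatcher.dispatch_draft_first(
        "can you check https://example.com?",
        user_id="u1", prompt_id="p1")
    result["response"]           # standby reply, e.g. "Let me check that for you…"
    result["delegate"]           # "local" (the draft self-delegated)
    result["expert_pending"]     # True (background expert scheduled)
    result["escalation_reason"]  # canonical EscalationReason value, or None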

73""" 

74import atexit 

75import json 

76import logging 

77import os 

78import re 

79import time 

80import uuid 

81import threading 

82from collections import deque 

83 

84from core.port_registry import get_port 

85from concurrent.futures import ThreadPoolExecutor 

86from typing import Dict, List, Optional 

87 

88# Eager-import the history loader so the per-call inline import in 

89# dispatch_draft_first doesn't hit Python's import lock on every chat 

90# (sys.modules cache makes subsequent calls fast, but the FIRST call 

91# of a freshly-warm process still pays the lock cost). Imported via 

92# try/except to keep the module load tolerant of test environments 

93# that mock out the social subtree. 

94try: 

95 from integrations.channels.memory.shared_history import ( 

96 seed_autogen_from_shared_history) 

97except ImportError: 

98 seed_autogen_from_shared_history = None # type: ignore[assignment] 

99 

100from integrations.agent_engine.escalation_reasons import EscalationReason 

101 

102logger = logging.getLogger('hevolve_social') 

103 

104 

105# Minimum draft confidence required to commit the draft's reply as the 

106# FINAL answer (delegate="none" path). Below this we schedule an expert 

107# verification in the background regardless of what the draft said — 

108# reasoning quality must never regress just because the 0.8B thought 

109# it could handle the question. A confident "none" still takes the 

110# fast path; an unsure "none" is treated as a quiet "local". 

111_DRAFT_CONFIDENCE_FLOOR = 0.85 
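# Worked example (illustrative numbers): an envelope of delegate="none",
# confidence=0.72 falls below the 0.85 floor, so the turn is promoted to
# delegate="local" and EscalationReason.LOW_CONFIDENCE is recorded; at
# confidence=0.90 the draft's reply ships as the final answer (assuming
# no other guard fires).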

# Refusal patterns the draft must NEVER emit. See the role-contract
# block in `_build_draft_classifier_prompt` — the draft is the
# first-responder, NOT the authority on system capability. Any reply
# that asserts the system can't do something is a prompt-following
# failure (or the model slipped a refusal through the system prompt)
# and must be replaced with a standby + escalation to the expert.
#
# Targets HIGH-CONFIDENCE capability refusals only — not legitimate
# negative phrasing like "I don't know the answer". Match shape:
# "I" + negation + (capability noun OR system-action verb). Word
# boundaries + IGNORECASE guard against false positives like
# "I cannot wait to help".
# Capability verbs the draft might falsely claim it can't perform.
# Factored out so the refusal-pattern alternatives stay in sync.
_REFUSAL_VERBS = (
    r"access|fetch|reach|browse|connect|connect to|read|retrieve|"
    r"download|verify|view|see|check|crawl|open|load|hit|resolve|"
    r"directly|currently|presently"
)
# Capability nouns paired with "I do(n't| not) have <NOUN>" or
# "I lack <NOUN>" / "I have no <NOUN>" — anything that frames an
# absent capability rather than negative recall ("I don't know").
_REFUSAL_NOUNS = (
    r"access|tools?|the ability|the capability|permission|a way|any way|"
    r"built-?in|external|the internet|web access|internet access|"
    r"means|way to"
)
_REFUSAL_PATTERN = re.compile(
    r"\b(?:"
    # "I cannot/can't <optional softener> <verb>"
    r"I (?:can'?t|cannot)\s+"
    r"(?:directly\s+|currently\s+|presently\s+)?"
    r"(?:" + _REFUSAL_VERBS + r")"
    r"|"
    # "I am unable/not able TO <optional softener> <verb>"
    # and contraction "I'm unable/not able TO <verb>"
    # (regex split because "I'm" has no space between I and m,
    # while "I am" requires the space)
    r"(?:I'?m|I am)\s+(?:unable|not able)\s+to\s+"
    r"(?:directly\s+|currently\s+|presently\s+)?"
    r"(?:" + _REFUSAL_VERBS + r")"
    r"|"
    # "I (don't|do not) have <noun>" e.g. "I don't have built-in tools"
    r"I do(?:n'?t| not) have\s+"
    r"(?:" + _REFUSAL_NOUNS + r")"
    r"|"
    # "I lack <noun>" e.g. "I lack access to GitHub"
    r"I lack\s+(?:" + _REFUSAL_NOUNS + r")"
    r"|"
    # "I have no <noun>" e.g. "I have no tools to retrieve…"
    r"I have no\s+(?:access|way|tools?|ability|means)"
    r"|"
    # "I'm just/only a (large language model|LLM|AI|chatbot|…)"
    # — split for I'm vs I am as above.
    r"(?:I'?m|I am) (?:just|only) (?:a|an) "
    r"(?:large language model|LLM|AI|language model|chatbot|"
    r"text-based assistant)"
    r")",
    re.IGNORECASE,
)
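# Spot-checks of the compiled pattern (illustrative, not exhaustive):
#   "I can't access the internet right now"  → match (capability refusal)
#   "I don't have built-in tools for that"   → match (capability refusal)
#   "I'm just an AI, so…"                    → match (identity refusal)
#   "I cannot wait to help"                  → no match ("wait" is not a
#                                              capability verb)
#   "I don't know the answer"                → no match (negative recall,
#                                              not a capability claim)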

# Generic standby reply substituted when the draft slips a refusal
# through. Keeps the user comfortable while the expert path runs.
# Intentionally short and capability-neutral — the expert's actual
# answer will replace this within the latency budget.
_REFUSAL_STANDBY_REPLY = "Let me check that for you…"


def _capability_summary_safe() -> str:
    """Return the runtime capability summary for the draft prompt, or
    an empty string when the helper itself can't be imported.

    Lazy import — tool_allowlist pulls model_registry / ModelCatalog /
    MCP / channel subsystems on first call. In bare unit-test envs
    those aren't loaded, so we treat any failure as 'no summary' and
    let the rest of the prompt carry on. Empty string is the signal
    the f-string in _build_draft_classifier_prompt skips the section.
    """
    try:
        from integrations.agent_engine.tool_allowlist import (
            get_capability_summary,
        )
        return get_capability_summary() or ""
    except Exception:
        return ""


class SpeculativeDispatcher:
    """Fast-first, expert-takeover speculative execution engine.

    Every method enforces guardrails — no code path bypasses safety.
    """

    def __init__(self, model_registry=None):
        from .model_registry import model_registry as _default_registry
        self._registry = model_registry or _default_registry
        self._expert_pool = ThreadPoolExecutor(
            max_workers=int(os.environ.get('HEVOLVE_EXPERT_WORKERS', '4')),
            thread_name_prefix='spec_expert',
        )
        atexit.register(lambda: self._expert_pool.shutdown(wait=False))
        self._active: Dict[str, dict] = {}  # speculation_id → metadata
        self._lock = threading.Lock()
        self._results: Dict[str, dict] = {}  # speculation_id → expert result
        self._results_max = 1000  # evict oldest when exceeded
        # HiveMind fusion consult results (speculation_id → {result, ts}).
        # Populated by the best-effort `_schedule_hive_consult` fired when
        # the user selected `intelligence_preference='hive_preferred'` AND
        # the draft self-delegated to hive. Read by observers / tests —
        # does NOT feed back into the chat reply path (non-blocking; the
        # hot-path latency budget is preserved). Capped at 1000 entries
        # via the same eviction helper as `_results`.
        self._last_hive_consult: Dict[str, dict] = {}

    # ─── Gate: should we speculate? ───

    def should_speculate(self, user_id: str, prompt_id: str,
                         prompt: str, goal: dict = None) -> bool:
        """Gate: expert model available + budget remaining + not halted + not casual."""
        # GUARDRAIL: circuit breaker
        from security.hive_guardrails import HiveCircuitBreaker
        if HiveCircuitBreaker.is_halted():
            return False

        # GUARDRAIL: constitutional check on prompt
        from security.hive_guardrails import ConstitutionalFilter
        passed, _ = ConstitutionalFilter.check_prompt(prompt)
        if not passed:
            return False

        # Need both a fast and expert model
        fast = self._registry.get_fast_model()
        expert = self._registry.get_expert_model()
        if not fast or not expert:
            return False
        if fast.model_id == expert.model_id:
            return False  # Same model — no point speculating

        # Budget check (if goal has spark budget)
        if goal and goal.get('spark_budget', 0) > 0:
            spent = goal.get('spark_spent', 0)
            remaining = goal['spark_budget'] - spent
            # Estimate ~4k tokens per speculation (prompt + response)
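            # Worked example (illustrative numbers): with
            # cost_per_1k_tokens = 0.5 sparks, the gate requires
            # remaining ≥ 0.5 × 4 = 2.0 sparks before speculating.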

            if remaining < expert.cost_per_1k_tokens * 4:
                return False

        return True

    # ─── Main entry point ───

    def dispatch_speculative(self, prompt: str, user_id: str, prompt_id: str,
                             goal_id: str = None, goal_type: str = 'general',
                             node_id: str = None) -> dict:
        """
        1. Guardrail-check the prompt
        2. Pick fast model → dispatch synchronously → user gets response
        3. Record compute contribution for hive node (ad revenue)
        4. Pick expert model → dispatch in background thread
        5. Return fast response immediately

        Returns:
            {
                'response': str,         # Fast agent's response
                'speculation_id': str,   # Track the background expert
                'fast_model': str,       # Which model served fast
                'expert_pending': bool,  # True if expert is working
                'latency_ms': float,     # Fast response latency
                'energy_kwh': float,     # Energy consumed
            }
        """
        speculation_id = str(uuid.uuid4())[:12]

        # GUARDRAIL: circuit breaker
        from security.hive_guardrails import HiveCircuitBreaker
        if HiveCircuitBreaker.is_halted():
            return {'response': '', 'speculation_id': speculation_id,
                    'error': 'Hive is halted', 'expert_pending': False}

        # GUARDRAIL: constitutional filter
        from security.hive_guardrails import ConstitutionalFilter
        passed, reason = ConstitutionalFilter.check_prompt(prompt)
        if not passed:
            return {'response': '', 'speculation_id': speculation_id,
                    'error': reason, 'expert_pending': False}

        # GUARDRAIL: rewrite prompt for togetherness
        from security.hive_guardrails import HiveEthos
        prompt = HiveEthos.rewrite_prompt_for_togetherness(prompt)

        # ── FAST PATH ──
        fast_model = self._registry.get_fast_model()
        if not fast_model:
            return {'response': '', 'speculation_id': speculation_id,
                    'error': 'No fast model available', 'expert_pending': False}

        start = time.time()
        fast_response = self._dispatch_to_model(
            fast_model, prompt, user_id, prompt_id, goal_type, goal_id)
        elapsed_ms = (time.time() - start) * 1000
        self._track_call_telemetry(fast_model, elapsed_ms, node_id)

        # ── EXPERT PATH (background) ──
        expert_model = self._registry.get_expert_model()
        expert_pending = self._schedule_expert_background(
            speculation_id=speculation_id,
            prompt=prompt,
            fast_response=fast_response,
            expert_model=expert_model,
            user_id=user_id, prompt_id=prompt_id,
            goal_id=goal_id, goal_type=goal_type,
            origin_model_id=fast_model.model_id,
            origin_model_role='fast_model',
        )

        return {
            'response': fast_response,
            'speculation_id': speculation_id,
            'fast_model': fast_model.model_id,
            'expert_pending': expert_pending,
            'latency_ms': round(elapsed_ms, 1),
            'energy_kwh': round(
                self._registry.get_total_energy_kwh(hours=0.01), 6),
        }

    # ─── Draft-first dispatch (Qwen3.5-0.8B standby + delegate signal) ───

    def dispatch_draft_first(self, prompt: str, user_id: str, prompt_id: str,
                             goal_id: str = None, goal_type: str = 'general',
                             node_id: str = None,
                             agent_persona: Optional[str] = None,
                             preferred_lang: str = 'en',
                             user_pref: str = 'auto',
                             agent_bound: bool = False) -> dict:
        """Draft-first dispatch: tiny model answers immediately, signals
        whether to delegate.

        Architecture (the piece the user asked for on top of the speculative
        dispatcher):

        1. The DRAFT tier model (Qwen3.5-0.8B) receives a wrapped prompt that
           asks it to emit JSON:
               { "reply": "...",
                 "delegate": "none" | "local" | "hive",
                 "confidence": 0.0-1.0 }
           `delegate` is the draft's self-assessment of its place in the
           hierarchy: can it answer, or should a bigger model take over?
        2. Regardless of the delegate signal, the draft's ``reply`` is
           returned SYNCHRONOUSLY as a standby response — the user sees
           something within ~300ms even when delegation is needed.
        3. When ``delegate != "none"`` (or the JSON can't be parsed), a
           background expert task runs on the local FAST tier or is
           dispatched to the hive — same code path as dispatch_speculative.
        4. Both the draft's reply AND the eventual expert reply get fed
           through ``WorldModelBridge.record_interaction`` with distinct
           model_id tags so HevolveAI's continual learner can distill
           the expert's improvements back into the draft over time.

        Guardrails: the outer /chat handler already ran GuardrailEnforcer +
        prompt_guard before calling us, so we only re-check the constitutional
        filter (cheap) and the circuit breaker here. Budget + hive_ethos still
        apply on the expert path via dispatch_speculative's helpers.

        Returns a dict shaped like dispatch_speculative's response plus
        ``delegate``, ``draft_model``, and ``draft_confidence`` fields so
        callers can discriminate.
        """
        speculation_id = str(uuid.uuid4())[:12]

        # ── 1. Local preconditions (cheapest first, so a missing registry
        #       entry never triggers an unnecessary network probe).
        draft_model = self._registry.get_draft_model()
        if draft_model is None:
            return {
                'response': '', 'speculation_id': speculation_id,
                'delegate': 'none', 'error': 'no_draft_model',
                'expert_pending': False,
            }

        # ── Language guard: skip 0.8B draft for non-Latin scripts ──
        # Qwen3.5-0.8B-UD-Q4_K_XL has weak Unicode coverage for the
        # scripts listed in core.constants.NON_LATIN_SCRIPT_LANGS —
        # falls back to Latin-transliterated output ("Vanakkam! Nan
        # ungal nanban..." for Tamil) which is unusable for TTS + UX.
        # The 4B main has native Unicode coverage. Canonical set lives
        # in core.constants; this file imports rather than duplicates.
        from core.constants import NON_LATIN_SCRIPT_LANGS
        _lang_key = (preferred_lang or 'en').split('-')[0].lower()
        if _lang_key and _lang_key != 'en' and _lang_key in NON_LATIN_SCRIPT_LANGS:
            logger.info(
                f"Skipping 0.8B draft for preferred_lang={_lang_key!r} "
                f"(weak Unicode script coverage); routing direct to 4B.",
            )
            return {
                'response': '', 'speculation_id': speculation_id,
                'delegate': 'none', 'error': 'draft_skipped_non_english',
                'expert_pending': False,
            }

        # ── 2. Gate checks (constitutional + circuit breaker + draft server probe)
        gate_error = self._check_draft_first_gates(prompt)
        if gate_error is not None:
            return {
                'response': '', 'speculation_id': speculation_id,
                'delegate': 'none', 'error': gate_error, 'expert_pending': False,
            }

        # ── 3. Load recent conversation history (best-effort, non-fatal) ──
        # Single source of truth — same seed_autogen_from_shared_history the
        # autogen GroupChat uses. Without this the draft sees each turn as
        # first-contact and emits generic greetings for follow-ups (witnessed
        # 2026-05-09 09:35 — user asked about WhatsApp at 09:34 then "what's
        # happening?" at 09:35; draft replied with a generic "Nothing
        # unusual…" because it had no memory of the WhatsApp turn 60s prior).
        # Cap at 4 messages — the 0.8B context budget can't fit more without
        # crowding out the answering rules + JSON schema.
        recent_turns: List[Dict] = []
        try:
            if seed_autogen_from_shared_history is None:
                raise ImportError(
                    "shared_history.seed_autogen_from_shared_history "
                    "not importable in this environment")
            recent_turns = seed_autogen_from_shared_history(
                user_id, max_messages=4) or []
            # NO dedup against the current prompt — users genuinely
            # repeat themselves (rephrasing, asking again, "hi" / "hi"
            # in casual back-and-forth). Dropping a recent turn just
            # because its text matches the current prompt would silently
            # lose legitimate conversation history. If the writer hook
            # in get_response_group / chatbot_routes persisted the
            # current turn before this dispatch ran, the prompt may
            # appear twice in the LLM's input — that's the correct
            # natural-language signal ("user said X, then said X again")
            # and the model should treat it as such.
        except Exception as _hist_err:
            logger.debug(f"draft history load failed: {_hist_err}")
            recent_turns = []

        # ── 4. Dispatch the draft with the classifier prompt ──
        draft_prompt = self._build_draft_classifier_prompt(
            prompt, agent_persona=agent_persona, preferred_lang=preferred_lang,
            recent_turns=recent_turns)
        start = time.time()
        draft_raw = self._dispatch_to_model(
            draft_model, draft_prompt, user_id, prompt_id, goal_type, goal_id)
        draft_latency_ms = (time.time() - start) * 1000
        self._track_call_telemetry(draft_model, draft_latency_ms, node_id)

        # ── 5. Parse envelope + record draft interaction ──
        parsed = self._parse_draft_envelope(draft_raw)
        draft_reply = parsed.get('reply') or draft_raw.strip()[:500]
        # Track whether the envelope parsed at all — an empty parsed dict
        # means we fell through to the delegate='local' default below,
        # which is materially different from the model emitting an
        # explicit 'local' decision. Stamp the reason for downstream
        # telemetry / continual-learning before any guard runs.
        parse_failed = not parsed
        delegate = parsed.get('delegate', 'local')  # default on parse fail
        confidence = float(parsed.get('confidence') or 0.0)
        # Canonical reason value — refined by each guard below. Starts
        # at PARSE_FAILURE when the envelope didn't parse, otherwise the
        # baseline CLASSIFIER_DELEGATE. Only meaningful when we end up
        # in the delegate in ('local', 'hive') branch — None otherwise.
        escalation_reason: Optional[EscalationReason] = (
            EscalationReason.PARSE_FAILURE if parse_failed
            else EscalationReason.CLASSIFIER_DELEGATE
        )

        # REFUSAL GUARD: the draft is the first-responder role, NOT the
        # authority on system capability. Any reply that asserts the
        # system can't do something is a prompt-following failure — the
        # role contract in the classifier prompt explicitly forbids
        # refusals of this shape. When the model slips one through
        # anyway (typically when the user asks for a tool-bound
        # capability the draft can't see — URL fetch, file read,
        # GitHub PR check, etc.), we replace the standby with a generic
        # holding reply and force escalation to the local expert. The
        # user never sees the refusal; the expert (with full tool
        # access) produces the real answer and the SSE/WAMP fan-out
        # delivers it to replace the standby.
        # Size-agnostic — same rule applies whether the draft slot
        # holds a 0.8B, 4B, or 27B model. None of them see the full
        # tool registry the expert binds.
        refusal_overridden = False
        if draft_reply and _REFUSAL_PATTERN.search(draft_reply):
            logger.info(
                "draft-first: refusal detected in draft reply "
                "(delegate=%r, conf=%.2f) — replacing with standby + "
                "forcing delegate=local. Original reply prefix: %r",
                delegate, confidence, draft_reply[:120],
            )
            draft_reply = _REFUSAL_STANDBY_REPLY
            delegate = 'local'
            refusal_overridden = True
            escalation_reason = EscalationReason.REFUSAL_OVERRIDE

        # REASONING-QUALITY GUARD: an unsure "none" is not good enough to
        # ship as the final answer. Promote it to "local" so an expert
        # verifier still runs in the background. Keeps the single
        # dispatch path — this is just delegate normalization, no new
        # branch below. Ensures the draft model can never regress the
        # reasoning quality the user gets — worst case they see the
        # draft reply briefly as a standby and it's replaced when the
        # 4B expert finishes via the existing crossbar delivery.
        if delegate == 'none' and confidence < _DRAFT_CONFIDENCE_FLOOR:
            logger.info(
                f"draft-first: low-confidence 'none' ({confidence:.2f} < "
                f"{_DRAFT_CONFIDENCE_FLOOR}) → escalating to local verifier"
            )
            delegate = 'local'
            escalation_reason = EscalationReason.LOW_CONFIDENCE

        # AGENT-BINDING GUARD: when the caller bound this turn to a
        # specific agent (prompt_id resolves to a real agent on disk,
        # not the request-id fallback), the user has chosen a
        # specialist and expects THAT specialist's voice — not the
        # 0.8B draft answering in its generic voice. Even a trivial
        # greeting like "hi" should pass through the specialist so
        # its persona / system prompt / tool registry shapes the
        # reply. Promote delegate=none → local so the expert path
        # always takes the turn for agent-bound requests.
        #
        # When agent_bound=False (no specific agent in scope, e.g.
        # default chat or guest free-floating), the draft's "none"
        # decision stays — the 0.8B can handle trivial questions
        # without paying the 4B cost.
        if delegate == 'none' and agent_bound:
            logger.info(
                "draft-first: prompt_id=%r is bound to a specific "
                "agent — escalating delegate=none → 'local' so the "
                "agent's expert path takes the turn instead of the "
                "0.8B draft's generic voice.",
                prompt_id,
            )
            delegate = 'local'
            escalation_reason = EscalationReason.AGENT_BOUND

        # ACTIONABLE-INTENT GUARD: when the draft's own classifier
        # surfaces an actionable intent flag (channel_connect,
        # is_create_agent, language_change), answering in-band would
        # orphan the action — there is no way to invoke the matching
        # tool (Connect_Channel / Create_Agent) from the casual draft
        # path because casual_conv=True skips the full tool registry
        # in hart_intelligence_entry.get_ans (see the is_first=True
        # branch). Promote delegate=none → 'local' so the expert
        # turn binds the full registry — Connect_Channel for
        # channel-add intents, Create_Agent for agent-build intents —
        # and actually fires the tool the LLM would otherwise have
        # described in free-form text.
        #
        # The draft's reply is also replaced with the standby (same
        # rationale as REFUSAL GUARD above): the draft on the
        # casual path has no tool access, so any in-band reply on
        # an actionable-intent turn is a verified-signal anti-pattern
        # — it claims action while taking none. The expert reply
        # replaces the standby via the existing speculation_id
        # bubble-replacement path (#204 already shipped).
        # Helper: treat the sentinel string 'none' as "no intent", matching
        # the classifier's own contract (it returns 'none' for *_intent /
        # memory_query when there's no actionable intent). Without this,
        # the literal 'none'.strip() evaluates truthy and the guard fires
        # on every casual turn → every reply gets replaced with the
        # "Let me check that for you…" placeholder, and the user never
        # receives the actual answer because the expert background task
        # has nothing real to refine (live evidence 2026-05-11 22:36:41
        # request 897fc534 — speculation_id 9a418ac4-1eb, message 'strange').
        def _intent_set(value) -> bool:
            if not value or value is False:
                return False
            v = str(value).strip().lower()
            return bool(v) and v != 'none'
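        # Spot-checks (illustrative): _intent_set('whatsapp') → True;
        # _intent_set('none') → False (classifier sentinel);
        # _intent_set('') / _intent_set(None) / _intent_set(False) → False.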


        if delegate == 'none' and (
                _intent_set(parsed.get('channel_connect'))
                or parsed.get('is_create_agent')
                or _intent_set(parsed.get('language_change'))
                or _intent_set(parsed.get('invite_intent'))
                or _intent_set(parsed.get('join_room_intent'))
                or _intent_set(parsed.get('memory_query'))
        ):
            logger.info(
                "draft-first: actionable intent flag set "
                "(channel_connect=%r, is_create_agent=%r, "
                "language_change=%r, invite_intent=%r, "
                "join_room_intent=%r, memory_query=%r) — escalating "
                "delegate=none → 'local' so the expert tool registry "
                "handles the turn.",
                parsed.get('channel_connect'),
                parsed.get('is_create_agent'),
                parsed.get('language_change'),
                parsed.get('invite_intent'),
                parsed.get('join_room_intent'),
                parsed.get('memory_query'),
            )
            draft_reply = _REFUSAL_STANDBY_REPLY
            delegate = 'local'
            escalation_reason = EscalationReason.ACTIONABLE_INTENT

        # Non-Latin languages skip the draft entirely (hart_intelligence_entry.py),
        # so this code path is only reached for English/Latin-script languages.

        # ── Draft telemetry: log full envelope for offline calibration ──
        # The data scientist requires this to build a confidence calibration
        # curve and detect intent-classification drift over time.
        try:
            _telemetry = {
                'speculation_id': speculation_id,
                'user_id': user_id,
                'confidence': confidence,
                'delegate': delegate,
                'is_casual': parsed.get('is_casual'),
                'is_correction': parsed.get('is_correction'),
                'is_create_agent': parsed.get('is_create_agent'),
                'channel_connect': parsed.get('channel_connect'),
                'language_change': parsed.get('language_change'),
                'draft_model': draft_model.model_id if draft_model else None,
                'latency_ms': draft_latency_ms,
                'reply_len': len(draft_reply) if draft_reply else 0,
                'escalated': delegate != parsed.get('delegate', 'local'),
                # refusal_overridden lets us calibrate per-model adherence
                # to the role contract. A draft model with the right
                # prompt should hit this near-zero — a sustained non-zero
                # rate is a signal that either the prompt isn't being
                # followed (model too small / fine-tune mismatch) or the
                # model is the wrong fit for the draft slot on this
                # hardware.
                'refusal_overridden': refusal_overridden,
            }
            logger.info(f"draft-telemetry: {json.dumps(_telemetry)}")
        except Exception:
            pass  # telemetry must never break the hot path

        # When the draft path's net effect is "draft answered, no
        # escalation" (delegate=='none' and none of the guards fired),
        # the escalation_reason is meaningless — clear it so the
        # WorldModelBridge sees a clean "this draft was the final
        # answer" record.
        recorded_reason = (
            escalation_reason.value if delegate in ('local', 'hive')
            else None
        )
        self._record_interaction_safely(
            user_id=user_id, prompt_id=prompt_id, prompt=prompt,
            response=draft_reply, model_id=draft_model.model_id,
            latency_ms=draft_latency_ms, node_id=node_id, goal_id=goal_id,
            escalation_reason=recorded_reason,
        )

        # ── 6. Schedule expert if the draft self-delegated ──
        expert_pending = False
        hive_consult_scheduled = False
        if delegate in ('local', 'hive'):
            expert_model = self._pick_expert_for_delegate(
                delegate, user_pref=user_pref)
            expert_pending = self._schedule_expert_background(
                speculation_id=speculation_id,
                prompt=prompt,
                fast_response=draft_reply,
                expert_model=expert_model,
                user_id=user_id, prompt_id=prompt_id,
                goal_id=goal_id, goal_type=goal_type,
                origin_model_id=draft_model.model_id,
                origin_model_role='draft_model',
                delegate=delegate,
                escalation_reason=escalation_reason,
            )
            # When the user explicitly asked for `hive_preferred` AND the
            # draft self-delegated to hive, also fire a best-effort MoE
            # HiveMind fusion consult in the background. The consult
            # result is stored on self._last_hive_consult for observers
            # and future wiring; it does NOT replace the expert's reply
            # on the hot path (preserves the 1.5s chat budget). Safe on
            # `auto` and `local_only` paths — gated by user_pref so
            # existing callers see no behavior change.
            if delegate == 'hive' and user_pref == 'hive_preferred':
                hive_consult_scheduled = self._schedule_hive_consult(
                    prompt=prompt,
                    user_id=user_id,
                    speculation_id=speculation_id,
                )

        # Channel name defensively coerced: the draft model sometimes emits
        # None, null, or a capitalised string. Normalise to a lowercased str
        # so callers can treat an empty string as "no channel-connect intent".
        _channel = parsed.get('channel_connect') or ''
        if not isinstance(_channel, str):
            _channel = ''
        # Language change — same defensive coercion as channel_connect.
        # Validated against the canonical SUPPORTED_LANG_DICT (single
        # source of truth for language codes, lives in hart_intelligence_entry).
        _lang = parsed.get('language_change') or ''
        if not isinstance(_lang, str):
            _lang = ''
        _lang = _lang.strip().lower()[:5]
        if _lang:
            from core.safe_hartos_attr import safe_hartos_attr
            SUPPORTED_LANG_DICT = safe_hartos_attr('SUPPORTED_LANG_DICT')
            if SUPPORTED_LANG_DICT is not None:
                if _lang not in SUPPORTED_LANG_DICT:
                    logger.debug(
                        "draft: language_change '%s' not in "
                        "SUPPORTED_LANG_DICT — ignoring", _lang)
                    _lang = ''
            # else: HARTOS not yet loaded — accept the code as-is, same
            # fall-through the original ImportError branch had.
        return {
            'response': draft_reply,
            'speculation_id': speculation_id,
            'draft_model': draft_model.model_id,
            'delegate': delegate,
            'draft_confidence': confidence,
            'is_correction': bool(parsed.get('is_correction', False)),
            'is_casual': bool(parsed.get('is_casual', False)),
            'is_create_agent': bool(parsed.get('is_create_agent', False)),
            'channel_connect': _channel.strip().lower(),
            'language_change': _lang.strip().lower(),
            'expert_pending': expert_pending,
            # Additive field: observers (J282, telemetry, admin/diag) can
            # tell a hive fusion consult was fired in background. Legacy
            # callers that don't read this field are unaffected.
            'hive_consult_scheduled': hive_consult_scheduled,
            # Additive field: which guard promoted this turn to expert
            # (or ``None`` when the draft answered as final). Canonical
            # values come from ``EscalationReason`` (see
            # ``integrations.agent_engine.escalation_reasons``).
            'escalation_reason': recorded_reason,
            'latency_ms': round(draft_latency_ms, 1),
            'energy_kwh': round(
                self._registry.get_total_energy_kwh(hours=0.01), 6),
        }

    # ─── SRP helpers extracted from dispatch_draft_first ───

    # Class-level toggle the health probe can flip off in tests. Prod
    # leaves it enabled so dead-port POSTs short-circuit cleanly; unit
    # tests that mock _dispatch_to_model set it to False so the mocked
    # dispatch actually runs. Kept as a class attribute (not instance)
    # so fixtures can patch once for the whole suite.
    _health_probe_enabled: bool = True

    def _check_draft_first_gates(self, prompt: str) -> Optional[str]:
        """Run the cheap gates (circuit breaker + constitutional filter +
        draft-server health probe) that must pass before we spend any
        model time.

        Returns None on success, or an error string identifying which gate
        rejected the request. Keeps dispatch_draft_first's orchestration
        thin — this method owns "is the system healthy enough to proceed".
        """
        from security.hive_guardrails import HiveCircuitBreaker, ConstitutionalFilter
        if HiveCircuitBreaker.is_halted():
            return 'Hive is halted'
        passed, reason = ConstitutionalFilter.check_prompt(prompt)
        if not passed:
            return reason or 'constitutional filter'
        # Fast TCP probe against the draft server. If the 0.8B caption
        # server (port 8081) isn't listening, fall through to the normal
        # 4B path instead of POSTing to a dead port and waiting for a
        # socket timeout on every chat request. Cache the result for 30s
        # so we don't probe on every message.
        if self._health_probe_enabled and not self._draft_server_alive():
            return 'draft_server_offline'
        return None

    _draft_probe_ts: float = 0.0
    _draft_probe_ok: bool = False

    def _draft_server_alive(self) -> bool:
        """Cheap TCP probe against the draft server endpoint. Cached
        for 30s so the dispatcher stays responsive under chat load.
        Returns True if a connect() to the draft host:port succeeds."""
        import socket
        import time as _t
        now = _t.time()
        if now - self._draft_probe_ts < 30.0:
            return self._draft_probe_ok
        ok = False
        try:
            from core.port_registry import get_local_draft_url
            url = get_local_draft_url()
            # http://host:port/v1 → (host, port)
            _body = url.split('://', 1)[-1].split('/', 1)[0]
            host, _, port_s = _body.partition(':')
            port = int(port_s) if port_s else 80
            with socket.create_connection((host, port), timeout=0.5):
                ok = True
        except Exception:
            ok = False
        self.__class__._draft_probe_ts = now
        self.__class__._draft_probe_ok = ok
        return ok

    def _build_draft_classifier_prompt(
            self, user_prompt: str, agent_persona: Optional[str] = None,
            preferred_lang: str = 'en',
            recent_turns: Optional[List[Dict]] = None,
    ) -> str:
        """Wrap the user prompt with the draft-first classifier instruction.

        The draft's job is twofold: (a) produce a short standby reply fit
        for simple chat, (b) self-assess whether a bigger model is needed.
        The JSON schema is flat so a 0.8B model can reliably emit it.

        If ``agent_persona`` is provided, it's prepended to the instruction
        so the draft's reply is in the voice of the custom / system agent
        the user is talking to instead of a generic first-responder. Used
        for the Path-2 system-agent case (e.g. Nunba personality agent).

        If ``recent_turns`` is provided, prior conversation context is
        rendered as a "Recent conversation" block before the current user
        prompt so the draft can answer follow-ups in context (e.g. user
        asks "what's happening?" 60s after asking about WhatsApp — without
        history the draft would treat each turn as first-contact and emit
        a generic greeting). Capped at 4 turns to fit the 0.8B context
        budget; oldest first; long messages are truncated to 400 chars.

        Owns ONLY prompt construction — no I/O, no side effects.
        """
        # Default brand identity when no explicit persona is supplied.
        # Without this fall-back, the user's "who are you?" turn drops
        # through to the underlying model's training-default name
        # ("I'm Qwen3.5...") because cf3e337 deliberately removed every
        # "You are <internal-role>" sentence to fix an identity-leak
        # where the draft echoed "first-responder" architecture jargon.
        # That fix was correct in spirit but went one step too far —
        # the BRAND identity (the user-facing product name "Nunba") is
        # not architecture jargon and is exactly what the user expects
        # to hear when no per-agent persona is selected. The
        # ``agent_persona`` branch below overrides this for any turn
        # where a specific persona is in scope, so explicit personas
        # are unaffected.
        #
        # Single source of truth — core.constants.NUNBA_BRAND_IDENTITY.
        # Same constant is imported by Nunba's _fallback_chat in
        # routes/hartos_backend_adapter.py so the two paths can never
        # drift on brand wording.
        from core.constants import NUNBA_BRAND_IDENTITY
        persona_block = f"{NUNBA_BRAND_IDENTITY}\n\n"
        if agent_persona:
            # Cap the persona at ~800 chars so a long system prompt doesn't
            # blow the 0.8B model's context budget on a single-turn call.
            snippet = agent_persona.strip()[:800]
            persona_block = (
                "You are playing the following persona — reply in this "
                "voice, but keep the JSON schema below exactly as "
                "specified. Persona:\n"
                f"{snippet}\n\n"
            )
        lang_block = ''
        if preferred_lang and not preferred_lang.startswith('en'):
            try:
                from core.constants import SUPPORTED_LANG_DICT
                lang_name = SUPPORTED_LANG_DICT.get(preferred_lang[:2], preferred_lang)
            except ImportError:
                lang_name = preferred_lang
            # Same language + tone prompt the 4B path uses (with examples,
            # code-mixing rules)
            _tone = ''
            try:
                from core.agent_personality import get_regional_tone_prompt
                _tone = get_regional_tone_prompt(preferred_lang)
            except Exception:
                pass
            lang_block = (
                f"Answer questions accurately and respond as quickly as possible in {lang_name}. "
                f"Keep responses under 200 words. Be colloquial and natural.\n"
                f"{_tone}\n\n"
            )

        # Compute the runtime capability summary ONCE per prompt build so
        # the static-tools / ModelCatalog / MCP / channel walks don't run
        # twice for the conditional injection below.
        cap_summary = _capability_summary_safe()
        cap_block = (
            f"Available capabilities (the system can do these via the "
            f"routing path below): {cap_summary}.\n\n"
            if cap_summary else ""
        )

        # Recent conversation context — single source via
        # seed_autogen_from_shared_history (the autogen path uses the same
        # call), formatted into a flat User:/Assistant: transcript so the
        # 0.8B can read follow-ups in context. Capped at 4 turns + 400
        # chars/turn to fit the draft's context budget.
        history_block = ""
        if recent_turns:
            _hist_lines = []
            for _turn in recent_turns[-4:]:
                _role = _turn.get('role') or ''
                _content = (_turn.get('content') or '').strip()
                if not _content:
                    continue
                if len(_content) > 400:
                    _content = _content[:400] + '…'
                if _role == 'user':
                    _hist_lines.append(f"User: {_content}")
                elif _role == 'assistant':
                    _hist_lines.append(f"Assistant: {_content}")
            if _hist_lines:
                history_block = (
                    "Recent conversation (oldest first; use this to "
                    "interpret the user's current message in context — "
                    "follow-ups like 'what's happening?' or 'why?' refer "
                    "back to these turns):\n"
                    + "\n".join(_hist_lines)
                    + "\n\n"
                )

        return (
            persona_block
            + lang_block
            # ── Job + answering rules — size-agnostic, identity-free ─────
            # Same wording works whether the draft is 0.8B, 4B, or 27B.
            # The model in this slot does NOT see the full tool registry
            # (web fetch, code exec, GitHub, filesystem, vision, computer
            # control, MCP servers, channels), the user's loaded persona,
            # multi-turn memory, or the ReAct loop — so it must never
            # refuse on behalf of the system.
            #
            # The 3ea8648 prompt opened with "You are a fast local
            # first-responder" and the model would echo it verbatim on
            # "who are you?" → "I'm your fast local first-responder,
            # ready to assist you right away." cf3e337 fixed that by
            # removing every internal-role identity sentence, but went
            # one step too far — with NO identity at all the 0.8B fell
            # through to its training-default name ("I'm Qwen…"). The
            # default brand identity ("You are Nunba…") now lives in
            # persona_block above, so this section deliberately does
            # NOT add another "You are <X>" line — only the BRAND
            # identity above is allowed; INTERNAL-ROLE jargon
            # ("first-responder", "draft", "classifier") stays out.
            # All instructions below are phrased as the *job* and
            # *rules*, never as architecture identity.
            + "Your job is to produce a short reply to the user AND "
              "classify the user's intent on several independent axes. The "
              "classification flags route the message downstream — be "
              "accurate.\n\n"
            # Positive capability summary — primary teaching mechanism so
            # the model knows what the system CAN do. Auto-discovered:
            # static tool list + ModelCatalog (TTS/STT/VLM/video/audio,
            # rolled up by type) + MCP servers + channels + expert-agent
            # categories. Computed once into cap_block above.
            + cap_block
            + "ANSWERING RULES — READ BEFORE REPLYING:\n"
              "You only see this single turn. The system's actual tool / "
              "integration / capability set is dynamic and not visible from "
              "here — so you don't get to decide what the system can or "
              "can't do. Therefore:\n"
              "- NEVER write 'I cannot', 'I don't have access', 'I'm unable', "
              "'I'm just a', 'I do not have the ability', or any phrase "
              "asserting the system can't do something.\n"
              "- NEVER claim no internet/tools/file access; you have no way "
              "to verify what is or isn't reachable.\n"
              "- You MAY answer directly ONLY for trivial recall, simple "
              "math, greetings, or questions that need NO live data, NO "
              "external system access, NO filesystem, NO code execution, "
              "and NO per-user state beyond this single turn.\n"
              "- For ANYTHING else (URLs, code, files, current state, "
              "multi-step reasoning, capability questions): set "
              "delegate=\"local\" (or \"hive\" for very large requests) "
              "and write a brief standby reply such as \"Let me check that "
              "for you…\", \"Looking that up…\", or \"One moment…\". The "
              "standby is replaced by the authoritative answer automatically "
              "— your only job is to keep the user comfortable while that "
              "runs.\n"
              "- Refusals are not your call. If you ever feel the urge to "
              "refuse: pick a standby instead and delegate.\n\n"
            + history_block
            + f"User: {user_prompt}\n\n"
              "Respond with ONE JSON object on a single line and NOTHING else:\n"
              '{"reply": "<your short reply to the user, 1-3 sentences>", '
              '"delegate": "none" OR "local" OR "hive", '
              '"confidence": <float 0-1>, '
              '"is_correction": true OR false, '
              '"is_casual": true OR false, '
              '"is_create_agent": true OR false, '
              '"channel_connect": "<channel name or empty string>", '
              '"language_change": "<ISO 639-1 code or empty string>", '
              '"invite_intent": "<short context if user wants invite link, or empty>", '
              '"join_room_intent": "<platform + room/url if user wants agent to join, or empty>", '
              '"memory_query": "<short context if user asks about past conversations, or empty>", '
              '"reason": "<why you chose this delegate value>"}\n\n'
            # ── delegate ────────────────────────────────────────────────
              "delegate: Use \"none\" for greetings, small-talk, factual "
              "questions you can fully answer yourself, or anything that needs "
              "no external tools. Use \"local\" if the request needs tools, "
              "code, reasoning, or multi-step work the 4B model can handle. "
              "Use \"hive\" if it needs large-model expertise, long-context "
              "research, or specialized skill distribution.\n\n"
            # ── is_correction ───────────────────────────────────────────
              "is_correction: true when the user is telling you something "
              "in the previous assistant turn was wrong, inaccurate, or "
              "they're restating what they actually meant (e.g. 'no that's "
              "wrong', 'actually, I meant X', 'not quite', 'you got it "
              "wrong'). Otherwise false. Routes the turn into the hive's "
              "real-time learning pipeline, so prefer false when unsure.\n\n"
            # ── is_casual ───────────────────────────────────────────────
              "is_casual: true when the message is pure chit-chat, a "
              "greeting, an acknowledgement, or anything that clearly "
              "doesn't need any tools, search, computer control, agent "
              "creation, or multi-step reasoning. Used to skip the heavy "
              "tool-resolution pipeline. If in doubt (looks substantive), "
              "prefer false.\n\n"
            # ── is_create_agent ─────────────────────────────────────────
              "is_create_agent: true when the user is explicitly asking to "
              "create, build, train, or set up a NEW AI agent / bot / "
              "assistant / automated workflow. Not true for questions "
              "ABOUT agents, or for using an existing agent. Routes the "
              "turn into the autogen CREATE flow.\n\n"
            # ── channel_connect ─────────────────────────────────────────
              "channel_connect: if the user is asking to connect, add, "
              "link, or set up a messaging channel (WhatsApp, Telegram, "
              "Slack, Discord, Gmail, SMS, Teams, Messenger, etc.) put "
              "the lowercased channel name here (e.g. \"whatsapp\"). "
              "Otherwise use an empty string \"\". This routes the turn "
              "to the Connect_Channel tool.\n\n"
            # ── language_change ─────────────────────────────────────────
              "language_change: if the user is asking to switch language "
              "(e.g. \"talk to me in tamil\", \"hablame en español\", "
              "\"parle-moi en français\", \"日本語で話して\"), put the "
              "ISO 639-1 code here (e.g. \"ta\" for Tamil, \"es\" for "
              "Spanish, \"fr\" for French, \"ja\" for Japanese, \"hi\" "
              "for Hindi, \"zh\" for Chinese, \"ko\" for Korean, \"ar\" "
              "for Arabic, \"de\" for German, \"ru\" for Russian). "
              "Otherwise use an empty string \"\". This overrides the "
              "session's preferred_lang so the main LLM responds in "
              "the requested language and TTS routes to an engine that "
              "supports it.\n\n"
            # ── invite_intent ───────────────────────────────────────────
              "invite_intent: if the user is asking to invite, share, or "
              "refer a friend / colleague / family member to Nunba (e.g. "
              "\"invite a friend\", \"give me an invite link\", \"share "
              "Nunba with my colleague\", \"how do I refer people\"), put "
              "a short freeform context here (e.g. \"work friend\" or "
              "\"family\") — empty string is fine for a generic shareable "
              "link. Otherwise use an empty string \"\". This routes the "
              "turn to the Invite_Friend tool.\n\n"
            # ── join_room_intent ────────────────────────────────────────
              "join_room_intent: if the user is asking the AI to JOIN an "
              "external room / channel / meeting / group as a co-pilot, "
              "note-taker, or participant (e.g. \"join my Discord audio "
              "room\", \"attend my Teams meet\", \"take notes in the "
              "WhatsApp family group\", \"co-pilot my Slack channel\"), "
              "put a short \"<platform> <room or url>\" string here "
              "(e.g. \"discord https://discord.com/channels/123/456\"). "
              "Otherwise use an empty string \"\". This routes the turn "
              "to the Join_External_Room tool, which always gates on "
              "consent and announces the agent's presence in the room.\n\n"
            # ── memory_query ────────────────────────────────────────────
              "memory_query: if the user is asking about something they "
              "discussed previously, what was said in past conversations, "
              "or asking the assistant to recall earlier context (e.g. "
              "\"what did we speak 2 days back\", \"do you remember when "
              "I asked about X\", \"what was that thing we discussed last "
              "week\", \"recall my previous question on Y\", \"what did I "
              "tell you about my project\"), put a short freeform context "
              "string here describing the topic / time window (e.g. "
              "\"conversations from last 2 days\", \"earlier project "
              "discussion\"). Otherwise use an empty string \"\". This "
              "routes the turn to the recall_memory tool which searches "
              "the memory graph with optional time filters."
        )

    def _track_call_telemetry(
            self, model: 'ModelBackend', latency_ms: float, node_id: Optional[str],
    ) -> None:
        """Record the per-model telemetry trio (energy + latency +
        compute-contribution for hive reward).

        Owns ONLY the telemetry side-effects so dispatch_draft_first,
        dispatch_speculative, and any future dispatch variant can share
        one call path. No return value — this is fire-and-forget."""
        self._registry.record_energy(model.model_id, latency_ms)
        self._registry.record_latency(model.model_id, latency_ms)
        self._record_compute_contribution(node_id, model.model_id, latency_ms)

    def _schedule_expert_background(
            self,
            speculation_id: str,
            prompt: str,
            fast_response: str,
            expert_model: Optional['ModelBackend'],
            user_id: str,
            prompt_id: str,
            goal_id: Optional[str],
            goal_type: str,
            origin_model_id: str,
            origin_model_role: str = 'fast_model',
            delegate: Optional[str] = None,
            escalation_reason: Optional['EscalationReason'] = None,
    ) -> bool:
        """Schedule the expert-improvement task in the background pool.

        Centralizes the registration into self._active + thread submit so
        both dispatch_draft_first and dispatch_speculative share one code
        path. Returns True if the expert was actually scheduled.

        ``escalation_reason`` is purely observability metadata: it gets
        stamped into ``self._active[speculation_id]`` so admin /diag and
        telemetry can ask "why was this turn escalated?" without
        re-deriving the heuristic. Optional + defaults to None so the
        legacy dispatch_speculative call site (which has no draft to
        derive a reason from) needs no change.

        Guards:
        - no expert model → nothing to schedule
        - expert_model.model_id == origin_model_id → pointless, skip
        - budget denied → skip
        """
        if expert_model is None:
            return False
        if expert_model.model_id == origin_model_id:
            return False
        if not self._check_and_reserve_budget(user_id, goal_id, expert_model):
            return False

        with self._lock:
            entry = {
                origin_model_role: origin_model_id,
                'expert_model': expert_model.model_id,
                'user_id': user_id,
                'prompt_id': prompt_id,
                'goal_id': goal_id,
                'started_at': time.time(),
            }
            if delegate is not None:
                entry['delegate'] = delegate
            if escalation_reason is not None:
                # Store the canonical string value (Enum's str inheritance
                # makes this safe for JSON / SSE round-trip).
                entry['escalation_reason'] = (
                    escalation_reason.value
                    if hasattr(escalation_reason, 'value')
                    else str(escalation_reason)
                )
            self._active[speculation_id] = entry

        self._expert_pool.submit(
            self._expert_background_task,
            speculation_id, prompt, fast_response,
            expert_model, user_id, prompt_id, goal_id, goal_type,
        )
        return True

    def _parse_draft_envelope(self, raw: str) -> dict:
        """Extract the {reply, delegate, confidence} JSON the draft should
        have produced. Tolerant of markdown fences, prose wrappers, and
        trailing commas.

        Returns an empty dict on total parse failure — callers should treat
        that as 'delegate to local' via the default in dispatch_draft_first."""
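        # Illustrative inputs this parser is expected to tolerate:
        #   '{"reply": "hi", "delegate": "none", "confidence": 0.9}'
        #   '```json\n{"reply": "hi", "delegate": "local"}\n```'  (fenced)
        #   'Sure! {"reply": "hi", "delegate": "hive",}'  (prose wrapper +
        #   trailing comma, handled by the fallback branch below)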

        if not raw:
            return {}
        import json as _json
        import re as _re

        text = raw.strip()

        # Strip ```json ... ``` fences if present
        fence = _re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, _re.DOTALL)
        if fence:
            text = fence.group(1)

        # Try raw parse first
        try:
            return _json.loads(text)
        except (_json.JSONDecodeError, TypeError):
            pass

        # Fall back to the first {...} we can find
        match = _re.search(r'\{.*\}', text, _re.DOTALL)
        if match:
            candidate = match.group(0)
            # Trim trailing commas before } or ]
            candidate = _re.sub(r',\s*([\}\]])', r'\1', candidate)
            try:
                return _json.loads(candidate)
            except (_json.JSONDecodeError, TypeError):
                pass

        logger.debug(f"draft envelope parse failed: {raw[:120]!r}")
        return {}

    def _pick_expert_for_delegate(self, delegate: str,
                                  user_pref: str = 'auto'):
        """Select the background model for a given delegate value.

        - "local": local FAST-tier model (e.g. Qwen3.5-4B)
        - "hive": highest-accuracy hive/expert model, falls back to local
          if no remote expert is available

        ``user_pref`` honours the Demopage intelligence toggle
        (``local_only`` | ``auto`` | ``hive_preferred``):

        - ``local_only`` + ``delegate='hive'`` → **downgrade to local
          fast model**. The user explicitly opted out of hive compute;
          we respect that even if the draft self-delegated.
        - ``auto`` (default) → today's behavior: hive delegate picks the
          expert, falls back to fast if no expert registered.
        - ``hive_preferred`` → today's behavior for model selection; the
          ADDITIONAL MoE HiveMind fusion consult is fired separately by
          ``dispatch_draft_first`` via ``_schedule_hive_consult``.

        Install-validation gate:
        A model that was installed via `/api/admin/models/hub/install`
        is stamped ``capabilities['install_validated']`` — ``False`` on
        register, flipped to ``True`` only when the post-download
        ``loader.load()`` probe succeeded. If the dispatcher picks an
        entry whose validation has not yet flipped (or explicitly
        failed), fall back to the local fast model rather than serve
        the user an unproven weight. Seeded / manually-registered
        entries have no such flag and are trusted by default.

        Returns None if no suitable model exists — caller then treats
        the draft's reply as the final answer."""

1226 if delegate == 'local': 

1227 return self._pass_validation_gate( 

1228 self._registry.get_fast_model()) 

1229 if delegate == 'hive': 

1230 # Respect the user's explicit "local only" preference even 

1231 # when the draft says hive would be better — the toggle is 

1232 # the final authority on egress. 

1233 if user_pref == 'local_only': 

1234 return self._pass_validation_gate( 

1235 self._registry.get_fast_model()) 

1236 expert = self._registry.get_expert_model() 

1237 gated = self._pass_validation_gate(expert) 

1238 if gated: 

1239 return gated 

1240 # Graceful fallback: no (valid) hive expert → use local fast 

1241 return self._pass_validation_gate( 

1242 self._registry.get_fast_model()) 

1243 return None 

1244 
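The delegate × user_pref matrix above, condensed into a pure function with
hypothetical stand-ins (FAST / EXPERT strings replace real ModelBackend
objects and the validation-gate cascade is elided):

    FAST, EXPERT = 'fast-local', 'hive-expert'

    def pick(delegate: str, user_pref: str = 'auto', expert_available: bool = True):
        if delegate == 'local':
            return FAST
        if delegate == 'hive':
            if user_pref == 'local_only':
                return FAST                    # the toggle outranks the draft
            return EXPERT if expert_available else FAST
        return None                            # anything else: draft reply is final

    assert pick('hive', 'local_only') == FAST
    assert pick('hive', expert_available=False) == FAST   # graceful fallback
    assert pick('local', 'hive_preferred') == FAST        # pref changes consults, not picks
    assert pick('none') is None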

1245 def _pass_validation_gate(self, model): 

1246 """Refuse to route to a hub-installed model whose post-download 

1247 load probe has not yet succeeded. 

1248 

1249 Catalog entries registered by `/api/admin/models/hub/install` 

1250 carry ``source == 'hub-install'`` and 

1251 ``capabilities['install_validated']`` (False at register time, 

1252 flipped to True by the background validate probe). Any other 

1253 entry — seeded preset, manual register, legacy — has no such 

1254 flag and passes through unchanged, preserving every existing 

1255 code path byte-for-byte. 

1256 

1257 Returns ``None`` when the gate rejects the pick so the caller 

1258 can cascade to the next fallback.""" 

1259 if model is None: 

1260 return None 

1261 try: 

1262 model_id = getattr(model, 'id', None) or getattr(model, 'model_id', None) 

1263 if not model_id: 

1264 return model # unknown shape — don't second-guess 

1265 # Late import — the catalog lives in the Nunba-side shim 

1266 # but is a transitive dependency through service_tools. 

1267 from integrations.service_tools.model_catalog import ( 

1268 get_catalog, 

1269 ) 

1270 entry = get_catalog().get(model_id) 

1271 if entry is None: 

1272 return model # not a catalog entry — trust the registry 

1273 source = getattr(entry, 'source', '') 

1274 if source != 'hub-install': 

1275 return model # seeded / manual → no validation requirement 

1276 caps = getattr(entry, 'capabilities', {}) or {} 

1277 if caps.get('install_validated') is True: 

1278 return model 

1279 logger.info( 

1280 f"[dispatcher] refusing unvalidated hub-install " 

1281 f"{model_id}; caller will fall back" 

1282 ) 

1283 return None 

1284 except Exception as e: 

1285 # Any error in the gate degrades open — the dispatcher must 

1286 # not fail chat because the validation lookup misbehaved. 

1287 logger.debug(f"[dispatcher] validation gate error: {e}") 

1288 return model 

1289 
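The gate's decision across the hub-install lifecycle, sketched against
hypothetical in-memory entries (the real catalog entry from
``integrations.service_tools.model_catalog`` may carry more fields):

    from types import SimpleNamespace

    def gate_allows(entry) -> bool:
        if entry is None or getattr(entry, 'source', '') != 'hub-install':
            return True                        # seeded / manual: trusted by default
        caps = getattr(entry, 'capabilities', {}) or {}
        return caps.get('install_validated') is True

    fresh = SimpleNamespace(source='hub-install',
                            capabilities={'install_validated': False})
    probed = SimpleNamespace(source='hub-install',
                             capabilities={'install_validated': True})
    seeded = SimpleNamespace(source='seed', capabilities={})

    assert gate_allows(fresh) is False    # refused until loader.load() flips the flag
    assert gate_allows(probed) is True
    assert gate_allows(seeded) is True    # no flag: passes through unchanged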

1290 def _schedule_hive_consult(self, prompt: str, user_id: str, 

1291 speculation_id: str) -> bool: 

1292 """Best-effort MoE HiveMind fusion consult on the expert pool. 

1293 

1294 Fired only when the user selected 

1295 ``intelligence_preference='hive_preferred'`` AND the draft 

1296 self-delegated to ``hive``. Runs on the existing 

1297 ``_expert_pool`` so it never contends with the chat hot path; 

1298 the consult result populates ``self._last_hive_consult`` for 

1299 observers (journey tests, admin/diag, future fusion wiring) to 

1300 read. 

1301 

1302 Guarantees: 

1303 * Returns ``True`` iff a task was accepted onto the pool. 

1304 * All failures are swallowed (``logger.debug`` only) — the 

1305 chat reply path never blocks or errors on a missing / 

1306 offline HiveMind. 

1307 * No dependency on ``world_model_bridge`` at import time; the 

1308 import is inside the worker so environments without 

1309 hevolveai (pip) still load the dispatcher cleanly. 

1310 """ 

1311 def _consult(): 

1312 try: 

1313 from integrations.agent_engine.world_model_bridge import ( 

1314 get_world_model_bridge, 

1315 ) 

1316 bridge = get_world_model_bridge() 

1317 if not bridge: 

1318 return 

1319 # 1500ms timeout matches the default hive consult budget 

1320 # in world_model_bridge and is half the chat hot-path 

1321 # ceiling; it cannot stall user-visible latency because

1322 # this runs on a worker thread, not the request thread. 

1323 result = bridge.query_hivemind( 

1324 prompt, timeout_ms=1500, user_id=user_id, 

1325 ) 

1326 if not result: 

1327 return 

1328 with self._lock: 

1329 self._last_hive_consult[speculation_id] = { 

1330 'result': result, 

1331 'completed_at': time.time(), 

1332 'user_id': user_id, 

1333 } 

1334 # Cap the dict at the same ceiling as _results to 

1335 # avoid unbounded growth in long-running instances. 

1336 if len(self._last_hive_consult) > self._results_max: 

1337 oldest = sorted( 

1338 self._last_hive_consult.items(), 

1339 key=lambda kv: kv[1].get('completed_at', 0), 

1340 )[:len(self._last_hive_consult) - self._results_max] 

1341 for k, _ in oldest: 

1342 self._last_hive_consult.pop(k, None) 

1343 logger.info( 

1344 f"[hive_consult] speculation={speculation_id} " 

1345 f"user={user_id} fusion returned " 

1346 f"{str(result)[:160]!r}" 

1347 ) 

1348 except Exception as e: 

1349 logger.debug(f"[hive_consult] failed: {e}") 

1350 

1351 try: 

1352 self._expert_pool.submit(_consult) 

1353 return True 

1354 except Exception as e: 

1355 logger.debug(f"[hive_consult] pool submit failed: {e}") 

1356 return False 

1357 
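The cache-cap trim above, in isolation: keep only the newest N consults by
``completed_at``. Timestamps are illustrative.

    import time

    cache = {f'spec-{i}': {'completed_at': time.time() + i} for i in range(5)}
    cap = 3
    if len(cache) > cap:
        oldest = sorted(cache.items(),
                        key=lambda kv: kv[1].get('completed_at', 0))[:len(cache) - cap]
        for k, _ in oldest:
            cache.pop(k, None)
    assert sorted(cache) == ['spec-2', 'spec-3', 'spec-4']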

1358 def _record_interaction_safely(self, **kwargs) -> None: 

1359 """Feed an interaction into HevolveAI via WorldModelBridge. Never 

1360 raises — continual learning is best-effort and the chat path must 

1361 not break if HevolveAI is offline or the bridge is in circuit-open 

1362 mode. WorldModelBridge already handles guardrail + secret redaction 

1363 internally.""" 

1364 try: 

1365 from integrations.agent_engine.world_model_bridge import get_world_model_bridge 

1366 bridge = get_world_model_bridge() 

1367 bridge.record_interaction(**kwargs) 

1368 except Exception as e: 

1369 logger.debug(f"record_interaction skipped: {e}") 

1370 

1371 # ─── Background expert task ─── 

1372 # 

1373 # Single path: expert (local langchain pipeline or hive peer) takes 

1374 # the ORIGINAL turn directly. No "improve this draft" wrapper, no 

1375 # similarity gate — when the expert is the actual expert (full tool 

1376 # registry locally, 27B or fine-tuned on hive), its reply IS the 

1377 # answer. The standby (fast_response) was already delivered by the 

1378 # draft-first path; this task replaces it via the existing 

1379 # speculation_id bubble-replacement channel on _deliver_expert_response. 

1380 

1381 def _expert_background_task(self, speculation_id: str, original_prompt: str, 

1382 fast_response: str, expert_model, user_id: str, 

1383 prompt_id: str, goal_id: str, goal_type: str): 

1384 """Background: dispatch expert → deliver (or fall through to draft 

1385 standby). Outer try/finally owns the shared invariants — 

1386 circuit-breaker gate, exception swallowing, ``_active`` cleanup — 

1387 so the helper can focus on dispatch + delivery semantics. 

1388 """ 

1389 try: 

1390 # GUARDRAIL: circuit breaker (check again — may have been halted) 

1391 from security.hive_guardrails import HiveCircuitBreaker 

1392 if HiveCircuitBreaker.is_halted(): 

1393 return 

1394 

1395 self._run_collapsed_expert_path( 

1396 speculation_id, original_prompt, fast_response, 

1397 expert_model, user_id, prompt_id, goal_id, goal_type) 

1398 

1399 except Exception as e: 

1400 logger.debug(f"Expert background task failed for {speculation_id}: {e}") 

1401 finally: 

1402 with self._lock: 

1403 self._active.pop(speculation_id, None) 

1404 
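The invariant the outer task owns, in miniature: even when the helper raises,
the ``_active`` entry is removed, so a crashed expert flips the speculation
from pending to unknown rather than leaking it as permanently pending.

    active = {'spec-1': {'started_at': 0.0}}

    def helper():
        raise RuntimeError('expert blew up')

    try:
        helper()
    except Exception:
        pass                          # logged at debug level in the real task
    finally:
        active.pop('spec-1', None)    # cleanup runs no matter what

    assert 'spec-1' not in active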

1405 def _run_collapsed_expert_path(self, speculation_id: str, 

1406 original_prompt: str, fast_response: str, 

1407 expert_model, user_id: str, prompt_id: str, 

1408 goal_id: str, goal_type: str): 

1409 """Collapsed path: expert takes the ORIGINAL turn through the full 

1410 langchain pipeline (local) or the hive endpoint (remote). No 

1411 'improve this draft' wrapper. Reuses ``_dispatch_expert_langchain`` 

1412 for routing + ``_deliver_expert_response`` for SSE/TTS fan-out. 

1413 """ 

1414 start = time.time() 

1415 expert_response = self._dispatch_expert_langchain( 

1416 expert_model, original_prompt, user_id, prompt_id, 

1417 goal_type, goal_id) 

1418 elapsed_ms = (time.time() - start) * 1000 

1419 

1420 # GUARDRAIL: energy + latency telemetry (same instrumentation as 

1421 # the legacy path — the rollout flip must not lose these metrics). 

1422 self._registry.record_energy(expert_model.model_id, elapsed_ms) 

1423 self._registry.record_latency(expert_model.model_id, elapsed_ms) 

1424 

1425 # Empty response → the draft standby stays as the final reply. 

1426 # The user already saw it; nothing to deliver. Record the 

1427 # _results entry so admin /diag can tell the expert ran and 

1428 # returned empty (vs never ran). 

1429 if not expert_response or not expert_response.strip(): 

1430 logger.debug( 

1431 "collapsed expert returned empty for %s; " 

1432 "draft standby remains the final reply", speculation_id) 

1433 with self._lock: 

1434 self._results[speculation_id] = { 

1435 'response': fast_response, 

1436 'model': expert_model.model_id, 

1437 'latency_ms': round(elapsed_ms, 1), 

1438 'improved': False, 

1439 } 

1440 self._evict_old_results() 

1441 return 

1442 

1443 # GUARDRAIL: constitutional check on expert output before delivery 

1444 from security.hive_guardrails import ConstitutionalFilter 

1445 passed, reason = ConstitutionalFilter.check_prompt(expert_response) 

1446 if not passed: 

1447 logger.warning( 

1448 "collapsed expert response blocked by guardrail: %s", 

1449 reason) 

1450 return 

1451 

1452 # Unconditional delivery: the expert is THE expert here, not a 

1453 # "maybe improvement". Bubble-replace the standby via the 

1454 # existing speculation_id channel (SSE + TTS — see 

1455 # _deliver_expert_response for the dual-channel contract). 

1456 self._deliver_expert_response( 

1457 user_id, prompt_id, speculation_id, expert_response) 

1458 

1459 # Feed continual learning. Stamp escalation_reason from the 

1460 # _active entry so distillation can weight refusal-overridden 

1461 # turns differently from clean classifier-delegate turns. 

1462 with self._lock: 

1463 active_entry = dict(self._active.get(speculation_id, {})) 

1464 served_by = ( 

1465 'hive_langchain_bg' if not expert_model.is_local 

1466 else 'local_langchain_bg' 

1467 ) 

1468 self._record_interaction_safely( 

1469 user_id=user_id, prompt_id=prompt_id, 

1470 prompt=original_prompt, response=expert_response, 

1471 model_id=expert_model.model_id, latency_ms=elapsed_ms, 

1472 goal_id=goal_id, 

1473 escalation_reason=active_entry.get('escalation_reason'), 

1474 ) 

1475 

1476 with self._lock: 

1477 self._results[speculation_id] = { 

1478 'response': expert_response, 

1479 'model': expert_model.model_id, 

1480 'latency_ms': round(elapsed_ms, 1), 

1481 'improved': True, 

1482 'served_by': served_by, 

1483 } 

1484 self._evict_old_results() 

1485 

1486 def _dispatch_expert_langchain(self, model, prompt: str, user_id: str, 

1487 prompt_id: str, goal_type: str, 

1488 goal_id: Optional[str]) -> str: 

1489 """Send the expert turn through the right transport for its tier. 

1490 

1491 - ``model.is_local=True``: route through the FULL HARTOS /chat 

1492 pipeline (agent loading, autogen GroupChat, full tool registry) 

1493 so actionable-intent / agent-bound turns actually fire their 

1494 tools. 

1495 * Bundled mode (NUNBA_BUNDLED / sys.frozen): in-process Flask 

1496 ``test_client`` — port 6777 is not bound in bundled Nunba. 

1497 * Non-bundled: HTTP POST to ``HEVOLVE_BASE_URL`` (or the 

1498 port_registry-resolved backend). 

1499 Re-entry is prevented by ``speculative=False, draft_first=False`` 

1500 in the payload — the inner /chat handler reads these and skips 

1501 the dispatcher. 

1502 

1503 - ``model.is_local=False`` (hive-served expert, registered by 

1504 ``HiveExpertDiscovery``): OpenAI-compatible POST to 

1505 ``{base_url}/chat/completions`` with the registered auth token. 

1506 The hive peer's 27B / fine-tuned model takes the turn directly. 

1507 

1508 Returns the response string, or ``''`` on any failure — caller 

1509 falls back to ``fast_response`` so the user always sees the 

1510 draft's standby. 

1511 """ 

1512 if model is None: 

1513 return '' 

1514 

1515 # ── Hive path: OpenAI-compatible POST to peer base_url ── 

1516 if not getattr(model, 'is_local', True): 

1517 cfg = getattr(model, 'config_list_entry', {}) or {} 

1518 base_url = (cfg.get('base_url') or '').rstrip('/') 

1519 api_key = cfg.get('api_key') or '' 

1520 inner_model_id = cfg.get('model') or model.model_id 

1521 if not base_url: 

1522 logger.debug( 

1523 "hive expert %s missing base_url — cannot dispatch", 

1524 model.model_id) 

1525 return '' 

1526 try: 

1527 import requests as _req 

1528 headers = ( 

1529 {'Authorization': f'Bearer {api_key}'} if api_key else {}) 

1530 resp = _req.post( 

1531 f'{base_url}/chat/completions', 

1532 headers=headers, 

1533 json={ 

1534 'model': inner_model_id, 

1535 'messages': [{'role': 'user', 'content': prompt}], 

1536 'max_tokens': 1500, 

1537 'temperature': 0.7, 

1538 }, 

1539 timeout=60, 

1540 ) 

1541 if resp.status_code == 200: 

1542 data = resp.json() or {} 

1543 choices = data.get('choices') or [] 

1544 if choices: 

1545 msg = (choices[0] or {}).get('message') or {} 

1546 return msg.get('content') or '' 

1547 else: 

1548 logger.debug( 

1549 "hive expert %s returned HTTP %s", 

1550 model.model_id, resp.status_code) 

1551 except Exception as e: 

1552 logger.debug( 

1553 "hive expert dispatch failed (%s): %s", 

1554 model.model_id, e) 

1555 return '' 

1556 

1557 # ── Local path: full HARTOS /chat pipeline ── 

1558 payload = { 

1559 'user_id': user_id, 

1560 'prompt_id': ( 

1561 f'{goal_type}_{goal_id[:8]}' if goal_id else prompt_id), 

1562 'prompt': prompt, 

1563 'create_agent': True, 

1564 'autonomous': True, 

1565 'casual_conv': False, 

1566 'model_config': model.to_config_list(), 

1567 # Hard no-reentry: inner /chat reads these and skips the 

1568 # dispatcher entirely so we never recursively re-enter. 

1569 'speculative': False, 

1570 'draft_first': False, 

1571 } 

1572 

1573 import sys as _sys 

1574 _bundled = bool( 

1575 os.environ.get('NUNBA_BUNDLED') 

1576 or getattr(_sys, 'frozen', False) 

1577 ) 

1578 if _bundled: 

1579 try: 

1580 # Late import — keeps module-load time independent of 

1581 # hart_intelligence_entry's heavy boot graph. In bundled 

1582 # Nunba this is cheap (already in sys.modules by the 

1583 # time a chat turn fires). 

1584 from hart_intelligence_entry import app as _app # type: ignore 

1585 with _app.test_client() as client: 

1586 resp = client.post('/chat', json=payload) 

1587 if resp.status_code == 200: 

1588 data = resp.get_json() or {} 

1589 return data.get('response') or '' 

1590 logger.debug( 

1591 "local expert /chat returned %s in bundled mode", 

1592 resp.status_code) 

1593 except Exception as e: 

1594 logger.debug( 

1595 "local expert bundled dispatch failed: %s", e) 

1596 return '' 

1597 

1598 # Non-bundled: HTTP POST to the configured backend. 

1599 try: 

1600 import requests as _req 

1601 base = os.environ.get( 

1602 'HEVOLVE_BASE_URL', 

1603 f'http://localhost:{get_port("backend")}', 

1604 ) 

1605 resp = _req.post(f'{base}/chat', json=payload, timeout=60) 

1606 if resp.status_code == 200: 

1607 return (resp.json() or {}).get('response') or '' 

1608 logger.debug( 

1609 "local expert /chat HTTP returned %s", resp.status_code) 

1610 except Exception as e: 

1611 logger.debug("local expert HTTP dispatch failed: %s", e) 

1612 return '' 

1613 
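The hive-path response parse above, factored out to show why every layer is
wrapped in ``or``: a half-formed peer reply degrades to '' (and thus to the
draft standby) instead of raising. Payloads are hand-written examples, not
captured responses.

    def extract(data) -> str:
        choices = (data or {}).get('choices') or []
        if not choices:
            return ''
        msg = (choices[0] or {}).get('message') or {}
        return msg.get('content') or ''

    assert extract({'choices': [{'message': {'content': 'expert answer'}}]}) == 'expert answer'
    assert extract({}) == ''
    assert extract({'choices': [None]}) == ''
    assert extract({'choices': [{'message': None}]}) == ''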

1614 # ─── Helpers ─── 

1615 

1616 def _dispatch_to_model(self, model: 'ModelBackend', prompt: str, 

1617 user_id: str, prompt_id: str, 

1618 goal_type: str, goal_id: str = None) -> str: 

1619 """Send prompt to a specific model via /chat endpoint with config override. 

1620 

1621 Always passes ``speculative=False`` and ``draft_first=False`` on the 

1622 inner call so the dispatcher can never recursively re-enter itself 

1623 when HEVOLVE_DRAFT_FIRST or the legacy speculative flag is enabled 

1624 upstream. The outer chat route triggered us, and that's where the 

1625 decision to speculate was made. 

1626 

1627 In bundled/in-process mode (Nunba desktop), uses Flask test_client() 

1628 instead of HTTP — port 6777 is never bound in bundled mode. 

1629 """ 

1630 payload = { 

1631 'user_id': user_id, 

1632 'prompt_id': f'{goal_type}_{goal_id[:8]}' if goal_id else prompt_id, 

1633 'prompt': prompt, 

1634 'create_agent': True, 

1635 'autonomous': True, 

1636 'casual_conv': False, 

1637 'model_config': model.to_config_list(), 

1638 # Hard no-reentry guard — inner dispatch never speculates 

1639 'speculative': False, 

1640 'draft_first': False, 

1641 } 

1642 

1643 # Bundled mode: call the model's llama-server directly on its port. 

1644 # Do NOT use Flask test_client('/chat') — that re-enters the full 

1645 # HARTOS pipeline (autogen, agent creation, etc.).

1646 _bundled = bool(os.environ.get('NUNBA_BUNDLED') or getattr(__import__('sys'), 'frozen', False)) 

1647 if _bundled: 

1648 try: 

1649 # Resolve the model's direct port from the catalog/port_registry 

1650 _port = None 

1651 if hasattr(model, 'port') and model.port: 

1652 _port = model.port 

1653 if not _port: 

1654 try: 

1655 from core.port_registry import get_local_draft_url, get_local_llm_url 

1656 _url = get_local_draft_url() or get_local_llm_url() 

1657 if _url: 

1658 # Extract port from URL like http://127.0.0.1:8081/v1 

1659 import re as _re 

1660 _m = _re.search(r':(\d+)', _url) 

1661 _port = int(_m.group(1)) if _m else 8081 

1662 except Exception: 

1663 _port = 8081 # draft default 

1664 import requests as _req 

1665 resp = _req.post( 

1666 f'http://127.0.0.1:{_port}/v1/chat/completions', 

1667 json={ 

1668 'model': 'llama', 

1669 'messages': [{'role': 'user', 'content': prompt}], 

1670 'max_tokens': 500, 

1671 'temperature': 0.7, 

1672 }, 

1673 timeout=15, 

1674 ) 

1675 if resp.status_code == 200: 

1676 data = resp.json() 

1677 if 'choices' in data: 

1678 return data['choices'][0]['message']['content'] 

1679 elif 'error' in data: 

1680 logger.debug(f"Draft model error: {data['error']}") 

1681 except Exception as e: 

1682 logger.debug(f"Direct draft dispatch failed ({model.model_id}): {e}") 

1683 return '' 

1684 

1685 # HTTP mode: external HARTOS server 

1686 import requests as req 

1687 base_url = os.environ.get('HEVOLVE_BASE_URL', f'http://localhost:{get_port("backend")}') 

1688 try: 

1689 resp = req.post(f'{base_url}/chat', json=payload, timeout=30) 

1690 if resp.status_code == 200: 

1691 return resp.json().get('response', '') 

1692 except req.RequestException as e: 

1693 logger.debug(f"Model dispatch failed ({model.model_id}): {e}") 

1694 return '' 

1695 
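The other half of the no-reentry contract, sketched as a hypothetical inner
/chat handler (names are illustrative; the real HARTOS route lives
elsewhere). Because the dispatcher always sends both flags as False, the
inner turn can never fan out into another speculation:

    def run_pipeline(payload: dict) -> str:
        return f"reply to {payload['prompt']}"    # stand-in for the real pipeline

    def handle_chat(payload: dict) -> str:
        if payload.get('speculative') or payload.get('draft_first'):
            # Outer turn: the real route would hand off to the dispatcher here.
            raise NotImplementedError('dispatcher path not shown in this sketch')
        return run_pipeline(payload)              # inner turn: plain pipeline only

    inner = {'prompt': 'hi', 'speculative': False, 'draft_first': False}
    assert handle_chat(inner) == 'reply to hi'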

1696 def _deliver_expert_response(self, user_id: str, prompt_id: str, 

1697 speculation_id: str, response: str): 

1698 """Dual-channel async delivery: Crossbar chat topic + TTS pupit topic. 

1699 

1700 Worker-thread safe — uses ``core.safe_hartos_attr`` to read 

1701 hart_intelligence symbols without triggering Python's per-module 

1702 import lock (worker threads racing the canonical loader on the 

1703 langchain_core / transformers import chain caused multi-minute 

1704 agent_daemon freezes; resolving via sys.modules avoids the lock). 

1705 """ 

1706 from core.safe_hartos_attr import safe_hartos_attr 

1707 from core.peer_link.message_bus import chat_topic_for 

1708 

1709 # 1. Publish text via canonical publish_async (MessageBus → Crossbar) 

1710 try: 

1711 publish_async = safe_hartos_attr('publish_async') 

1712 if publish_async is not None: 

1713 topic = chat_topic_for(user_id) 

1714 publish_async(topic, response) 

1715 logger.info( 

1716 "Expert chat publish: spec=%s user=%s topic=%s len=%d", 

1717 speculation_id, user_id, topic, len(response or ''), 

1718 ) 

1719 else: 

1720 logger.info( 

1721 "Expert chat publish skipped: spec=%s user=%s — " 

1722 "HARTOS publish_async not yet resolvable (loader still " 

1723 "initialising). Dropping the speculative bubble; the main "

1724 "reply path will deliver when ready.", 

1725 speculation_id, user_id, 

1726 ) 

1727 except Exception as e: 

1728 logger.warning( 

1729 "Expert chat publish failed: spec=%s user=%s err=%s", 

1730 speculation_id, user_id, e, 

1731 ) 

1732 

1733 # 2. Synthesize TTS and publish to pupit audio topic — ensures speculative 

1734 # expert improvements get the SAME audio treatment as regular replies 

1735 # (users on TTS-enabled sessions hear the improved response). 

1736 try: 

1737 _tts_synthesize_and_publish = safe_hartos_attr( 

1738 '_tts_synthesize_and_publish') 

1739 if _tts_synthesize_and_publish is not None: 

1740 _tts_synthesize_and_publish( 

1741 response, str(user_id), speculation_id) 

1742 logger.info( 

1743 "Expert TTS publish: spec=%s user=%s", 

1744 speculation_id, user_id, 

1745 ) 

1746 else: 

1747 logger.info( 

1748 "Expert TTS publish skipped: spec=%s user=%s — " 

1749 "HARTOS _tts_synthesize_and_publish not yet resolvable.", 

1750 speculation_id, user_id, 

1751 ) 

1752 except Exception as e: 

1753 logger.debug(f"Expert TTS publish failed: spec={speculation_id} err={e}") 

1754 

1755 logger.info(f"Expert enhancement delivered: spec={speculation_id}, " 

1756 f"user={user_id}") 

1757 
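The sys.modules resolution idea behind ``core.safe_hartos_attr``: a minimal
sketch showing the technique only (the real helper's name, signature, and
module target are HARTOS-specific). Reading ``sys.modules`` never enters the
import machinery, so a worker thread cannot block on another thread's
in-flight import of the same module graph.

    import sys

    def safe_attr(module_name: str, attr: str, default=None):
        mod = sys.modules.get(module_name)   # no __import__, no per-module lock
        if mod is None:
            return default                   # loader still initialising: skip
        return getattr(mod, attr, default)

    # e.g. publish = safe_attr('hart_intelligence', 'publish_async')
    # (module / attribute names here are assumptions for illustration)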

1758 def _check_and_reserve_budget(self, user_id: str, goal_id: str, 

1759 expert_model) -> bool: 

1760 """Check Spark budget before expert execution (atomic row lock). 

1761 

1762 Delegates to shared budget_gate.check_goal_budget() to avoid duplication. 

1763 """ 

1764 if not goal_id: 

1765 return True # No goal = no budget constraint 

1766 

1767 try: 

1768 from .budget_gate import check_goal_budget 

1769 cost = expert_model.cost_per_1k_tokens 

1770 allowed, remaining, reason = check_goal_budget(goal_id, cost) 

1771 return allowed 

1772 except ImportError: 

1773 return True # Allow if budget system unavailable 

1774 

1775 def _record_compute_contribution(self, node_id: str, model_id: str, 

1776 latency_ms: float): 

1777 """Credit hive node for serving fast response → ad revenue eligibility. 

1778 

1779 GUARDRAIL: Only master_key_verified nodes get credit. 

1780 GUARDRAIL: ComputeDemocracy.adjusted_reward() — logarithmic, not linear. 

1781 """ 

1782 if not node_id: 

1783 return 

1784 try: 

1785 from integrations.social.models import get_db, PeerNode 

1786 db = get_db() 

1787 try: 

1788 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

1789 if peer and peer.master_key_verified: 

1790 peer.agent_count = (peer.agent_count or 0) + 1 

1791 db.commit() 

1792 finally: 

1793 db.close() 

1794 except Exception as e: 

1795 logger.debug(f"Compute contribution recording skipped: {e}") 

1796 

1797 def _evict_old_results(self): 

1798 """Evict oldest results when over capacity. Must be called under self._lock.""" 

1799 if len(self._results) > self._results_max: 

1800 # Remove oldest entries (dict preserves insertion order in Python 3.7+) 

1801 excess = len(self._results) - self._results_max 

1802 for key in list(self._results.keys())[:excess]: 

1803 del self._results[key] 

1804 
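The same eviction on a plain dict: Python 3.7+ guarantees insertion order,
so the oldest results are simply the first keys. Values are illustrative.

    results = {f'spec-{i}': {'improved': bool(i % 2)} for i in range(5)}
    results_max = 3
    excess = len(results) - results_max
    for key in list(results)[:excess]:
        del results[key]
    assert list(results) == ['spec-2', 'spec-3', 'spec-4']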

1805 # ─── Status / results ─── 

1806 

1807 def get_speculation_status(self, speculation_id: str) -> dict: 

1808 """Get status of a speculative dispatch.""" 

1809 with self._lock: 

1810 if speculation_id in self._active: 

1811 return {'status': 'pending', 'speculation_id': speculation_id} 

1812 if speculation_id in self._results: 

1813 result = self._results[speculation_id] 

1814 return {'status': 'completed', **result} 

1815 return {'status': 'unknown', 'speculation_id': speculation_id} 

1816 

1817 def get_stats(self) -> dict: 

1818 """Get dispatcher statistics.""" 

1819 with self._lock: 

1820 return { 

1821 'active_speculations': len(self._active), 

1822 'completed': len(self._results), 

1823 'total_energy_kwh_24h': round( 

1824 self._registry.get_total_energy_kwh(24), 4), 

1825 } 

1826 

1827 

1828# ─── Module-level singleton ─── 

1829_dispatcher = None 

1830_dispatcher_lock = threading.Lock() 

1831 

1832 

1833def get_speculative_dispatcher() -> SpeculativeDispatcher: 

1834 """Get or create the singleton SpeculativeDispatcher.""" 

1835 global _dispatcher 

1836 if _dispatcher is None: 

1837 with _dispatcher_lock: 

1838 if _dispatcher is None: 

1839 _dispatcher = SpeculativeDispatcher() 

1840 return _dispatcher