Coverage for integrations / agent_engine / speculative_dispatcher.py: 62.1%
457 statements
coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Unified Agent Goal Engine - Speculative Dispatcher

Draft-first + expert-takeover dispatcher.

Two entry points, one delivery channel:

dispatch_draft_first(prompt, ...)
    DRAFT tier (Qwen3.5-0.8B) replies SYNCHRONOUSLY in ~300ms with an
    envelope ``{reply, delegate: none|local|hive, confidence, ...}``.
    The reply is the user's standby answer; ``delegate`` is the draft's
    self-assessment of whether a heavier model needs to take the turn.

    Five guards can promote the turn to expert background regardless of
    the draft's own decision:
    * refusal pattern in the reply → REFUSAL_OVERRIDE
    * delegate=none + confidence < floor → LOW_CONFIDENCE
    * delegate=none + agent_bound prompt → AGENT_BOUND
    * delegate=none + classifier surfaces actionable intent
      (channel_connect / is_create_agent / language_change /
      invite / join_room / memory_query) → ACTIONABLE_INTENT
    * envelope parse failure → PARSE_FAILURE
    The canonical taxonomy lives in
    ``integrations.agent_engine.escalation_reasons.EscalationReason``;
    callers / observers / WorldModelBridge see the reason in the
    return dict and in ``self._active[speculation_id]``.

dispatch_speculative(prompt, ...)
    Legacy entry. Fast tier replies synchronously, expert runs in
    background. Same delivery channel.

Expert background path
----------------------

When a turn escalates, ``_expert_background_task`` dispatches via
``_dispatch_expert_langchain``:

- **Local expert** (``is_local=True``): routes through the full HARTOS
  /chat pipeline — full tool registry, full system prompt, full agent
  context. Bundled mode uses an in-process Flask test_client; HTTP
  mode POSTs to the configured backend. Re-entry guarded by
  ``speculative=False, draft_first=False`` in the payload.

- **Hive expert** (``is_local=False``): registered by
  ``HiveExpertDiscovery`` from ``peer.capability.announce`` gossip,
  routed via OpenAI-compatible POST to the peer's ``base_url`` with
  bearer auth. The hive peer's 27B / fine-tuned model takes the turn
  directly — no "review the draft" wrapper.

When the expert returns, ``_deliver_expert_response`` bubble-replaces
the standby via the existing ``speculation_id`` channel: SSE for the
chat UI + TTS pub/sub topic for voice. The fast_response stays only when
the expert returned empty or got guardrail-blocked.

Guardrails at every layer
-------------------------
- ``ConstitutionalFilter.check_prompt`` before dispatch and on expert output
- ``HiveCircuitBreaker.is_halted()`` before dispatch and again at task entry
- ``HiveEthos.rewrite_prompt_for_togetherness`` on every prompt
- Budget enforcement via ``_check_and_reserve_budget``
- Energy + latency tracking on every model call
- Per-peer install-validation gate via ``_pass_validation_gate``

Hive expert wiring (subscriber-side complete; producer pending)
---------------------------------------------------------------
``HiveExpertDiscovery`` listens for ``peer.capability.announce`` /
``peer.capability.revoke`` on the platform EventBus and auto-registers
reachable, trust-verified peers as ``ModelTier.EXPERT`` backends.
Until peers start emitting the announce gossip (producer daemon ships
separately), ``_pick_expert_for_delegate('hive', ...)`` falls back to
the local fast model and ``served_by`` in telemetry reads
``local_langchain_bg`` 100% of the time.
"""
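As a quick illustration of the contract described above, here is a hypothetical caller-side sketch. It is not part of this module: the dict literal and the `'refusal_override'` reason string are assumed example values, and `handle_turn` is an invented helper name. It only shows how the documented return shape drives the standby-vs-final decision.

```python
# Hypothetical caller sketch — field names come from the docstring above;
# the concrete values below are invented for illustration.
def handle_turn(result: dict) -> str:
    """Return 'final' when the draft reply stands, 'standby' when an
    expert is working in the background and will bubble-replace it."""
    if result.get('error'):
        return 'error'
    return 'standby' if result.get('expert_pending') else 'final'

example = {
    'response': 'Let me check that for you…',
    'speculation_id': 'abc123def456',        # example id, not real output
    'delegate': 'local',
    'expert_pending': True,
    'escalation_reason': 'refusal_override',  # assumed EscalationReason value
}
assert handle_turn(example) == 'standby'
```

A confident `delegate='none'` turn would instead carry `expert_pending=False` and the draft reply is the final answer.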
import atexit
import json
import logging
import os
import re
import time
import uuid
import threading
from collections import deque

from core.port_registry import get_port
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional

# Eager-import the history loader so the per-call inline import in
# dispatch_draft_first doesn't hit Python's import lock on every chat
# (sys.modules cache makes subsequent calls fast, but the FIRST call
# of a freshly-warmed process still pays the lock cost). Imported via
# try/except to keep the module load tolerant of test environments
# that mock out the social subtree.
try:
    from integrations.channels.memory.shared_history import (
        seed_autogen_from_shared_history)
except ImportError:
    seed_autogen_from_shared_history = None  # type: ignore[assignment]

from integrations.agent_engine.escalation_reasons import EscalationReason

logger = logging.getLogger('hevolve_social')


# Minimum draft confidence required to commit the draft's reply as the
# FINAL answer (delegate="none" path). Below this we schedule an expert
# verification in the background regardless of what the draft said —
# reasoning quality must never regress just because the 0.8B thought
# it could handle the question. A confident "none" still takes the
# fast path; an unsure "none" is treated as a quiet "local".
_DRAFT_CONFIDENCE_FLOOR = 0.85

# Refusal patterns the draft must NEVER emit. See the role-contract
# block in `_build_draft_classifier_prompt` — the draft is the
# first-responder, NOT the authority on system capability. Any reply
# that asserts the system can't do something is a prompt-following
# failure (or the model slipped a refusal through the system prompt)
# and must be replaced with a standby + escalation to the expert.
#
# Targets HIGH-CONFIDENCE capability refusals only — not legitimate
# negative phrasing like "I don't know the answer". Match shape:
# "I" + negation + (capability noun OR system-action verb). Word
# boundaries + IGNORECASE guard against false positives like
# "I cannot wait to help".

# Capability verbs the draft might falsely claim it can't perform.
# Factored out so the refusal-pattern alternatives stay in sync.
_REFUSAL_VERBS = (
    r"access|fetch|reach|browse|connect|connect to|read|retrieve|"
    r"download|verify|view|see|check|crawl|open|load|hit|resolve|"
    r"directly|currently|presently"
)
# Capability nouns paired with "I do(n't| not) have <NOUN>" or
# "I lack <NOUN>" / "I have no <NOUN>" — anything that frames an
# absent capability rather than negative recall ("I don't know").
_REFUSAL_NOUNS = (
    r"access|tools?|the ability|the capability|permission|a way|any way|"
    r"built-?in|external|the internet|web access|internet access|"
    r"means|way to"
)
_REFUSAL_PATTERN = re.compile(
    r"\b(?:"
    # "I cannot/can't <optional softener> <verb>"
    r"I (?:can'?t|cannot)\s+"
    r"(?:directly\s+|currently\s+|presently\s+)?"
    r"(?:" + _REFUSAL_VERBS + r")"
    r"|"
    # "I am unable/not able TO <optional softener> <verb>"
    # and contraction "I'm unable/not able TO <verb>"
    # (regex split because "I'm" has no space between I and m,
    # while "I am" requires the space)
    r"(?:I'?m|I am)\s+(?:unable|not able)\s+to\s+"
    r"(?:directly\s+|currently\s+|presently\s+)?"
    r"(?:" + _REFUSAL_VERBS + r")"
    r"|"
    # "I (don't|do not) have <noun>" e.g. "I don't have built-in tools"
    r"I do(?:n'?t| not) have\s+"
    r"(?:" + _REFUSAL_NOUNS + r")"
    r"|"
    # "I lack <noun>" e.g. "I lack access to GitHub"
    r"I lack\s+(?:" + _REFUSAL_NOUNS + r")"
    r"|"
    # "I have no <noun>" e.g. "I have no tools to retrieve…"
    r"I have no\s+(?:access|way|tools?|ability|means)"
    r"|"
    # "I'm just/only a (large language model|LLM|AI|chatbot|…)"
    # — split for I'm vs I am as above.
    r"(?:I'?m|I am) (?:just|only) (?:a|an) "
    r"(?:large language model|LLM|AI|language model|chatbot|"
    r"text-based assistant)"
    r")",
    re.IGNORECASE,
)
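The refusal matcher can be exercised standalone. The constants below are reproduced verbatim from this module so its behavior, including the deliberate "I cannot wait to help" non-match, can be checked in isolation:

```python
import re

# Reproduced from _REFUSAL_VERBS / _REFUSAL_NOUNS / _REFUSAL_PATTERN above.
REFUSAL_VERBS = (
    r"access|fetch|reach|browse|connect|connect to|read|retrieve|"
    r"download|verify|view|see|check|crawl|open|load|hit|resolve|"
    r"directly|currently|presently"
)
REFUSAL_NOUNS = (
    r"access|tools?|the ability|the capability|permission|a way|any way|"
    r"built-?in|external|the internet|web access|internet access|"
    r"means|way to"
)
REFUSAL_PATTERN = re.compile(
    r"\b(?:"
    r"I (?:can'?t|cannot)\s+"
    r"(?:directly\s+|currently\s+|presently\s+)?"
    r"(?:" + REFUSAL_VERBS + r")"
    r"|"
    r"(?:I'?m|I am)\s+(?:unable|not able)\s+to\s+"
    r"(?:directly\s+|currently\s+|presently\s+)?"
    r"(?:" + REFUSAL_VERBS + r")"
    r"|"
    r"I do(?:n'?t| not) have\s+(?:" + REFUSAL_NOUNS + r")"
    r"|"
    r"I lack\s+(?:" + REFUSAL_NOUNS + r")"
    r"|"
    r"I have no\s+(?:access|way|tools?|ability|means)"
    r"|"
    r"(?:I'?m|I am) (?:just|only) (?:a|an) "
    r"(?:large language model|LLM|AI|language model|chatbot|"
    r"text-based assistant)"
    r")",
    re.IGNORECASE,
)

# Capability refusals the guard must catch…
hits = [
    "I can't access that URL for you.",
    "I'm unable to browse the internet.",
    "I don't have web access.",
    "I lack permission to read that file.",
    "I have no tools to retrieve it.",
    "I'm just a large language model.",
]
# …and legitimate negative phrasing it must leave alone.
misses = [
    "I cannot wait to help!",
    "I don't know the answer to that.",
]
assert all(REFUSAL_PATTERN.search(t) for t in hits)
assert not any(REFUSAL_PATTERN.search(t) for t in misses)
```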

# Generic standby reply substituted when the draft slips a refusal
# through. Keeps the user comfortable while the expert path runs.
# Intentionally short and capability-neutral — the expert's actual
# answer will replace this within the latency budget.
_REFUSAL_STANDBY_REPLY = "Let me check that for you…"


def _capability_summary_safe() -> str:
    """Return the runtime capability summary for the draft prompt, or
    an empty string when the helper itself can't be imported.

    Lazy import — tool_allowlist pulls model_registry / ModelCatalog /
    MCP / channel subsystems on first call. In bare unit-test envs
    those aren't loaded, so we treat any failure as 'no summary' and
    let the rest of the prompt carry on. Empty string is the signal
    the f-string in _build_draft_classifier_prompt skips the section.
    """
    try:
        from integrations.agent_engine.tool_allowlist import (
            get_capability_summary,
        )
        return get_capability_summary() or ""
    except Exception:
        return ""
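The fail-soft lazy-import shape used here generalizes. A minimal sketch, where `optional_call` is a hypothetical helper name (not from this codebase), of resolving an optional dependency at call time and degrading to a neutral default instead of raising:

```python
import importlib

# Hypothetical generalization of _capability_summary_safe: resolve
# module.attr at call time, call it, and treat ANY failure (missing
# module, missing attribute, runtime error) as "no value".
def optional_call(module_name: str, attr: str, default: str = "") -> str:
    try:
        mod = importlib.import_module(module_name)
        return getattr(mod, attr)() or default
    except Exception:
        return default

assert optional_call("no.such.module", "whatever") == ""   # import fails → default
assert optional_call("os", "getcwd") != ""                 # real call succeeds
```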


class SpeculativeDispatcher:
    """Fast-first, expert-takeover speculative execution engine.

    Every method enforces guardrails — no code path bypasses safety.
    """

    def __init__(self, model_registry=None):
        from .model_registry import model_registry as _default_registry
        self._registry = model_registry or _default_registry
        self._expert_pool = ThreadPoolExecutor(
            max_workers=int(os.environ.get('HEVOLVE_EXPERT_WORKERS', '4')),
            thread_name_prefix='spec_expert',
        )
        atexit.register(lambda: self._expert_pool.shutdown(wait=False))
        self._active: Dict[str, dict] = {}  # speculation_id → metadata
        self._lock = threading.Lock()
        self._results: Dict[str, dict] = {}  # speculation_id → expert result
        self._results_max = 1000  # evict oldest when exceeded
        # HiveMind fusion consult results (speculation_id → {result, ts}).
        # Populated by the best-effort `_schedule_hive_consult` fired when
        # the user selected `intelligence_preference='hive_preferred'` AND
        # the draft self-delegated to hive. Read by observers / tests —
        # does NOT feed back into the chat reply path (non-blocking; the
        # hot-path latency budget is preserved). Capped at 1000 entries
        # via the same eviction helper as `_results`.
        self._last_hive_consult: Dict[str, dict] = {}

    # ─── Gate: should we speculate? ───

    def should_speculate(self, user_id: str, prompt_id: str,
                         prompt: str, goal: dict = None) -> bool:
        """Gate: expert model available + budget remaining + not halted + not casual."""
        # GUARDRAIL: circuit breaker
        from security.hive_guardrails import HiveCircuitBreaker
        if HiveCircuitBreaker.is_halted():
            return False

        # GUARDRAIL: constitutional check on prompt
        from security.hive_guardrails import ConstitutionalFilter
        passed, _ = ConstitutionalFilter.check_prompt(prompt)
        if not passed:
            return False

        # Need both a fast and expert model
        fast = self._registry.get_fast_model()
        expert = self._registry.get_expert_model()
        if not fast or not expert:
            return False
        if fast.model_id == expert.model_id:
            return False  # Same model — no point speculating

        # Budget check (if goal has spark budget)
        if goal and goal.get('spark_budget', 0) > 0:
            spent = goal.get('spark_spent', 0)
            remaining = goal['spark_budget'] - spent
            # Estimate ~4k tokens per speculation (prompt + response)
            if remaining < expert.cost_per_1k_tokens * 4:
                return False

        return True
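The budget gate above reduces to simple arithmetic: speculation is allowed only while the remaining spark budget covers the expert's per-1k-token cost times the ~4k-token estimate. A minimal worked sketch (the cost and budget figures are invented examples; real values come from the goal dict and the expert model's registry entry):

```python
# Standalone restatement of the spark-budget check in should_speculate.
# Numbers below are illustrative only.
def budget_allows(spark_budget: float, spark_spent: float,
                  cost_per_1k_tokens: float) -> bool:
    remaining = spark_budget - spark_spent
    # ~4k tokens per speculation (prompt + response), as estimated above
    return remaining >= cost_per_1k_tokens * 4

assert budget_allows(100.0, 60.0, 2.5)      # remaining 40 >= 10 → speculate
assert not budget_allows(100.0, 92.0, 2.5)  # remaining 8 < 10 → skip
```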

    # ─── Main entry point ───

    def dispatch_speculative(self, prompt: str, user_id: str, prompt_id: str,
                             goal_id: str = None, goal_type: str = 'general',
                             node_id: str = None) -> dict:
        """
        1. Guardrail-check the prompt
        2. Pick fast model → dispatch synchronously → user gets response
        3. Record compute contribution for hive node (ad revenue)
        4. Pick expert model → dispatch in background thread
        5. Return fast response immediately

        Returns:
            {
                'response': str,         # Fast agent's response
                'speculation_id': str,   # Track the background expert
                'fast_model': str,       # Which model served fast
                'expert_pending': bool,  # True if expert is working
                'latency_ms': float,     # Fast response latency
                'energy_kwh': float,     # Energy consumed
            }
        """
        speculation_id = str(uuid.uuid4())[:12]

        # GUARDRAIL: circuit breaker
        from security.hive_guardrails import HiveCircuitBreaker
        if HiveCircuitBreaker.is_halted():
            return {'response': '', 'speculation_id': speculation_id,
                    'error': 'Hive is halted', 'expert_pending': False}

        # GUARDRAIL: constitutional filter
        from security.hive_guardrails import ConstitutionalFilter
        passed, reason = ConstitutionalFilter.check_prompt(prompt)
        if not passed:
            return {'response': '', 'speculation_id': speculation_id,
                    'error': reason, 'expert_pending': False}

        # GUARDRAIL: rewrite prompt for togetherness
        from security.hive_guardrails import HiveEthos
        prompt = HiveEthos.rewrite_prompt_for_togetherness(prompt)

        # ── FAST PATH ──
        fast_model = self._registry.get_fast_model()
        if not fast_model:
            return {'response': '', 'speculation_id': speculation_id,
                    'error': 'No fast model available', 'expert_pending': False}

        start = time.time()
        fast_response = self._dispatch_to_model(
            fast_model, prompt, user_id, prompt_id, goal_type, goal_id)
        elapsed_ms = (time.time() - start) * 1000
        self._track_call_telemetry(fast_model, elapsed_ms, node_id)

        # ── EXPERT PATH (background) ──
        expert_model = self._registry.get_expert_model()
        expert_pending = self._schedule_expert_background(
            speculation_id=speculation_id,
            prompt=prompt,
            fast_response=fast_response,
            expert_model=expert_model,
            user_id=user_id, prompt_id=prompt_id,
            goal_id=goal_id, goal_type=goal_type,
            origin_model_id=fast_model.model_id,
            origin_model_role='fast_model',
        )

        return {
            'response': fast_response,
            'speculation_id': speculation_id,
            'fast_model': fast_model.model_id,
            'expert_pending': expert_pending,
            'latency_ms': round(elapsed_ms, 1),
            'energy_kwh': round(
                self._registry.get_total_energy_kwh(hours=0.01), 6),
        }
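`_parse_draft_envelope` is referenced throughout the draft-first path but not shown in this excerpt. The following is a simplified, assumed sketch of the contract it appears to enforce, not the real implementation: pull the first JSON object out of the raw model output, validate `delegate`, and return an empty dict on any failure (which upstream maps to the PARSE_FAILURE escalation reason):

```python
import json
import re

# Assumed sketch of the draft-envelope contract — the real
# _parse_draft_envelope is not visible in this coverage excerpt.
def parse_envelope_sketch(raw: str) -> dict:
    match = re.search(r"\{.*\}", raw, re.DOTALL)  # first {...} span in output
    if not match:
        return {}
    try:
        data = json.loads(match.group(0))
    except json.JSONDecodeError:
        return {}
    if data.get('delegate') not in ('none', 'local', 'hive'):
        return {}  # malformed delegate → treat as parse failure
    return data

ok = parse_envelope_sketch(
    '{"reply": "Hi!", "delegate": "none", "confidence": 0.9}')
assert ok['delegate'] == 'none'
assert parse_envelope_sketch("no json here") == {}      # → PARSE_FAILURE path
assert parse_envelope_sketch('{"delegate": "weird"}') == {}
```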

    # ─── Draft-first dispatch (Qwen3.5-0.8B standby + delegate signal) ───

    def dispatch_draft_first(self, prompt: str, user_id: str, prompt_id: str,
                             goal_id: str = None, goal_type: str = 'general',
                             node_id: str = None,
                             agent_persona: Optional[str] = None,
                             preferred_lang: str = 'en',
                             user_pref: str = 'auto',
                             agent_bound: bool = False) -> dict:
        """Draft-first dispatch: tiny model answers immediately, signals whether
        to delegate.

        Architecture (the layer added on top of the speculative dispatcher):

        1. The DRAFT tier model (Qwen3.5-0.8B) receives a wrapped prompt that
           asks it to emit JSON:
               { "reply": "...",
                 "delegate": "none" | "local" | "hive",
                 "confidence": 0.0-1.0 }
           `delegate` is the draft's self-assessment of its place in the
           hierarchy: can it answer, or should a bigger model take over?
        2. Regardless of the delegate signal, the draft's ``reply`` is
           returned SYNCHRONOUSLY as a standby response — the user sees
           something within ~300ms even when delegation is needed.
        3. When ``delegate != "none"`` (or the JSON can't be parsed), a
           background expert task runs on the local FAST tier or is
           dispatched to the hive — same code path as dispatch_speculative.
        4. Both the draft's reply AND the eventual expert reply get fed
           through ``WorldModelBridge.record_interaction`` with distinct
           model_id tags so HevolveAI's continual learner can distill
           the expert's improvements back into the draft over time.

        Guardrails: the outer /chat handler already ran GuardrailEnforcer +
        prompt_guard before calling us, so we only re-check the constitutional
        filter (cheap) and the circuit breaker here. Budget + hive_ethos still
        apply on the expert path via dispatch_speculative's helpers.

        Returns a dict shaped like dispatch_speculative's response plus
        ``delegate``, ``draft_model``, and ``draft_confidence`` fields so
        callers can discriminate.
        """
        speculation_id = str(uuid.uuid4())[:12]

        # ── 1. Local preconditions (cheapest first, so a missing registry
        #       entry never triggers an unnecessary network probe).
        draft_model = self._registry.get_draft_model()
        if draft_model is None:
            return {
                'response': '', 'speculation_id': speculation_id,
                'delegate': 'none', 'error': 'no_draft_model',
                'expert_pending': False,
            }

        # ── Language guard: skip 0.8B draft for non-Latin scripts ──
        # Qwen3.5-0.8B-UD-Q4_K_XL has weak Unicode coverage for the
        # scripts listed in core.constants.NON_LATIN_SCRIPT_LANGS —
        # falls back to Latin-transliterated output ("Vanakkam! Nan
        # ungal nanban..." for Tamil) which is unusable for TTS + UX.
        # The 4B main has native Unicode coverage. Canonical set lives
        # in core.constants; this file imports rather than duplicates.
        from core.constants import NON_LATIN_SCRIPT_LANGS
        _lang_key = (preferred_lang or 'en').split('-')[0].lower()
        if _lang_key and _lang_key != 'en' and _lang_key in NON_LATIN_SCRIPT_LANGS:
            logger.info(
                f"Skipping 0.8B draft for preferred_lang={_lang_key!r} "
                f"(weak Unicode script coverage); routing direct to 4B.",
            )
            return {
                'response': '', 'speculation_id': speculation_id,
                'delegate': 'none', 'error': 'draft_skipped_non_english',
                'expert_pending': False,
            }

        # ── 2. Gate checks (constitutional + circuit breaker + draft server probe)
        gate_error = self._check_draft_first_gates(prompt)
        if gate_error is not None:
            return {
                'response': '', 'speculation_id': speculation_id,
                'delegate': 'none', 'error': gate_error, 'expert_pending': False,
            }

        # ── 3. Load recent conversation history (best-effort, non-fatal) ──
        # Single source of truth — same seed_autogen_from_shared_history the
        # autogen GroupChat uses. Without this the draft sees each turn as
        # first-contact and emits generic greetings for follow-ups (witnessed
        # 2026-05-09 09:35 — user asked about WhatsApp at 09:34 then "what's
        # happening?" at 09:35; draft replied with a generic "Nothing
        # unusual…" because it had no memory of the WhatsApp turn 60s prior).
        # Cap at 4 messages — the 0.8B context budget can't fit more without
        # crowding out the answering rules + JSON schema.
        recent_turns: List[Dict] = []
        try:
            if seed_autogen_from_shared_history is None:
                raise ImportError(
                    "shared_history.seed_autogen_from_shared_history "
                    "not importable in this environment")
            recent_turns = seed_autogen_from_shared_history(
                user_id, max_messages=4) or []
            # NO dedup against the current prompt — users genuinely
            # repeat themselves (rephrasing, asking again, "hi" / "hi"
            # in casual back-and-forth). Dropping a recent turn just
            # because its text matches the current prompt would silently
            # lose legitimate conversation history. If the writer hook
            # in get_response_group / chatbot_routes persisted the
            # current turn before this dispatch ran, the prompt may
            # appear twice in the LLM's input — that's the correct
            # natural-language signal ("user said X, then said X again")
            # and the model should treat it as such.
        except Exception as _hist_err:
            logger.debug(f"draft history load failed: {_hist_err}")
            recent_turns = []

        # ── 4. Dispatch the draft with the classifier prompt ──
        draft_prompt = self._build_draft_classifier_prompt(
            prompt, agent_persona=agent_persona, preferred_lang=preferred_lang,
            recent_turns=recent_turns)
        start = time.time()
        draft_raw = self._dispatch_to_model(
            draft_model, draft_prompt, user_id, prompt_id, goal_type, goal_id)
        draft_latency_ms = (time.time() - start) * 1000
        self._track_call_telemetry(draft_model, draft_latency_ms, node_id)

        # ── 5. Parse envelope + record draft interaction ──
        parsed = self._parse_draft_envelope(draft_raw)
        draft_reply = parsed.get('reply') or draft_raw.strip()[:500]
        # Track whether the envelope parsed at all — an empty parsed dict
        # means we fell through to the delegate='local' default below,
        # which is materially different from the model emitting an
        # explicit 'local' decision. Stamp the reason for downstream
        # telemetry / continual-learning before any guard runs.
        parse_failed = not parsed
        delegate = parsed.get('delegate', 'local')  # default on parse fail
        confidence = float(parsed.get('confidence') or 0.0)
        # Canonical reason value — refined by each guard below. Starts
        # at PARSE_FAILURE when the envelope didn't parse, otherwise the
        # baseline CLASSIFIER_DELEGATE. Only meaningful when we end up
        # in the delegate in ('local', 'hive') branch — None otherwise.
        escalation_reason: Optional[EscalationReason] = (
            EscalationReason.PARSE_FAILURE if parse_failed
            else EscalationReason.CLASSIFIER_DELEGATE
        )

        # REFUSAL GUARD: the draft is the first-responder role, NOT the
        # authority on system capability. Any reply that asserts the
        # system can't do something is a prompt-following failure — the
        # role contract in the classifier prompt explicitly forbids
        # refusals of this shape. When the model slips one through
        # anyway (typically when the user asks for a tool-bound
        # capability the draft can't see — URL fetch, file read,
        # GitHub PR check, etc.), we replace the standby with a generic
        # holding reply and force escalation to the local expert. The
        # user never sees the refusal; the expert (with full tool
        # access) produces the real answer and the SSE/WAMP fan-out
        # delivers it to replace the standby.
        # Size-agnostic — same rule applies whether the draft slot
        # holds a 0.8B, 4B, or 27B model. None of them see the full
        # tool registry the expert binds.
        refusal_overridden = False
        if draft_reply and _REFUSAL_PATTERN.search(draft_reply):
            logger.info(
                "draft-first: refusal detected in draft reply "
                "(delegate=%r, conf=%.2f) — replacing with standby + "
                "forcing delegate=local. Original reply prefix: %r",
                delegate, confidence, draft_reply[:120],
            )
            draft_reply = _REFUSAL_STANDBY_REPLY
            delegate = 'local'
            refusal_overridden = True
            escalation_reason = EscalationReason.REFUSAL_OVERRIDE

        # REASONING-QUALITY GUARD: an unsure "none" is not good enough to
        # ship as the final answer. Promote it to "local" so an expert
        # verifier still runs in the background. Keeps the single
        # dispatch path — this is just delegate normalization, no new
        # branch below. Ensures the draft model can never regress the
        # reasoning quality the user gets — worst case they see the
        # draft reply briefly as a standby and it's replaced when the
        # 4B expert finishes via the existing crossbar delivery.
        if delegate == 'none' and confidence < _DRAFT_CONFIDENCE_FLOOR:
            logger.info(
                f"draft-first: low-confidence 'none' ({confidence:.2f} < "
                f"{_DRAFT_CONFIDENCE_FLOOR}) → escalating to local verifier"
            )
            delegate = 'local'
            escalation_reason = EscalationReason.LOW_CONFIDENCE

        # AGENT-BINDING GUARD: when the caller bound this turn to a
        # specific agent (prompt_id resolves to a real agent on disk,
        # not the request-id fallback), the user has chosen a
        # specialist and expects THAT specialist's voice — not the
        # 0.8B draft answering in its generic voice. Even a trivial
        # greeting like "hi" should pass through the specialist so
        # its persona / system prompt / tool registry shapes the
        # reply. Promote delegate=none → local so the expert path
        # always takes the turn for agent-bound requests.
        #
        # When agent_bound=False (no specific agent in scope, e.g.
        # default chat or guest free-floating), the draft's "none"
        # decision stays — the 0.8B can handle trivial questions
        # without paying the 4B cost.
        if delegate == 'none' and agent_bound:
            logger.info(
                "draft-first: prompt_id=%r is bound to a specific "
                "agent — escalating delegate=none → 'local' so the "
                "agent's expert path takes the turn instead of the "
                "0.8B draft's generic voice.",
                prompt_id,
            )
            delegate = 'local'
            escalation_reason = EscalationReason.AGENT_BOUND

        # ACTIONABLE-INTENT GUARD: when the draft's own classifier
        # surfaces an actionable intent flag (channel_connect,
        # is_create_agent, language_change), answering in-band would
        # orphan the action — there is no way to invoke the matching
        # tool (Connect_Channel / Create_Agent) from the casual draft
        # path because casual_conv=True skips the full tool registry
        # in hart_intelligence_entry.get_ans (see the is_first=True
        # branch). Promote delegate=none → 'local' so the expert
        # turn binds the full registry — Connect_Channel for
        # channel-add intents, Create_Agent for agent-build intents —
        # and actually fires the tool the LLM would otherwise have
        # described in free-form text.
        #
        # The draft's reply is also replaced with the standby (same
        # rationale as REFUSAL GUARD above): the draft on the
        # casual path has no tool access, so any in-band reply on
        # an actionable-intent turn is a verified-signal anti-pattern
        # — it claims action while taking none. The expert reply
        # replaces the standby via the existing speculation_id
        # bubble-replacement path (#204 already shipped).
        # Helper: treat the sentinel string 'none' as "no intent", matching
        # the classifier's own contract (it returns 'none' for *_intent /
        # memory_query when there's no actionable intent). Without this,
        # the literal 'none'.strip() evaluates truthy and the guard fires
        # on every casual turn → every reply gets replaced with the
        # "Let me check that for you…" placeholder, and the user never
        # receives the actual answer because the expert background task
        # has nothing real to refine (live evidence 2026-05-11 22:36:41
        # request 897fc534 — speculation_id 9a418ac4-1eb, message 'strange').
        def _intent_set(value) -> bool:
            if not value or value is False:
                return False
            v = str(value).strip().lower()
            return bool(v) and v != 'none'

        if delegate == 'none' and (
            _intent_set(parsed.get('channel_connect'))
            or parsed.get('is_create_agent')
            or _intent_set(parsed.get('language_change'))
            or _intent_set(parsed.get('invite_intent'))
            or _intent_set(parsed.get('join_room_intent'))
            or _intent_set(parsed.get('memory_query'))
        ):
            logger.info(
                "draft-first: actionable intent flag set "
                "(channel_connect=%r, is_create_agent=%r, "
                "language_change=%r, invite_intent=%r, "
                "join_room_intent=%r, memory_query=%r) — escalating "
                "delegate=none → 'local' so the expert tool registry "
                "handles the turn.",
                parsed.get('channel_connect'),
                parsed.get('is_create_agent'),
                parsed.get('language_change'),
                parsed.get('invite_intent'),
                parsed.get('join_room_intent'),
                parsed.get('memory_query'),
            )
            draft_reply = _REFUSAL_STANDBY_REPLY
            delegate = 'local'
            escalation_reason = EscalationReason.ACTIONABLE_INTENT

        # Non-Latin languages skip the draft entirely (hart_intelligence_entry.py)
        # so this code path is only reached for English/Latin-script languages.

        # ── Draft telemetry: log full envelope for offline calibration ──
        # The data scientist requires this to build a confidence calibration
        # curve and detect intent classification drift over time.
        try:
            _telemetry = {
                'speculation_id': speculation_id,
                'user_id': user_id,
                'confidence': confidence,
                'delegate': delegate,
                'is_casual': parsed.get('is_casual'),
                'is_correction': parsed.get('is_correction'),
                'is_create_agent': parsed.get('is_create_agent'),
                'channel_connect': parsed.get('channel_connect'),
                'language_change': parsed.get('language_change'),
                'draft_model': draft_model.model_id if draft_model else None,
                'latency_ms': draft_latency_ms,
                'reply_len': len(draft_reply) if draft_reply else 0,
                'escalated': delegate != parsed.get('delegate', 'local'),
                # refusal_overridden lets us calibrate per-model adherence
                # to the role contract. A draft model with the right
                # prompt should hit this near-zero — sustained non-zero
                # rate is a signal that either the prompt isn't being
                # followed (model too small / fine-tune mismatch) or the
                # model is the wrong fit for the draft slot on this
                # hardware.
                'refusal_overridden': refusal_overridden,
            }
            logger.info(f"draft-telemetry: {json.dumps(_telemetry)}")
        except Exception:
            pass  # telemetry must never break the hot path

        # When the draft path's net effect is "draft answered, no
        # escalation" (delegate=='none' and none of the guards fired),
        # the escalation_reason is meaningless — clear it so the
        # WorldModelBridge sees a clean "this draft was the final
        # answer" record.
        recorded_reason = (
            escalation_reason.value if delegate in ('local', 'hive')
            else None
        )
        self._record_interaction_safely(
            user_id=user_id, prompt_id=prompt_id, prompt=prompt,
            response=draft_reply, model_id=draft_model.model_id,
            latency_ms=draft_latency_ms, node_id=node_id, goal_id=goal_id,
            escalation_reason=recorded_reason,
        )

        # ── 6. Schedule expert if the draft self-delegated ──
        expert_pending = False
        hive_consult_scheduled = False
        if delegate in ('local', 'hive'):
            expert_model = self._pick_expert_for_delegate(
                delegate, user_pref=user_pref)
            expert_pending = self._schedule_expert_background(
                speculation_id=speculation_id,
                prompt=prompt,
                fast_response=draft_reply,
                expert_model=expert_model,
                user_id=user_id, prompt_id=prompt_id,
                goal_id=goal_id, goal_type=goal_type,
                origin_model_id=draft_model.model_id,
                origin_model_role='draft_model',
                delegate=delegate,
                escalation_reason=escalation_reason,
            )
            # When the user explicitly asked for `hive_preferred` AND the
            # draft self-delegated to hive, also fire a best-effort MoE
            # HiveMind fusion consult in the background. The consult
            # result is stored on self._last_hive_consult for observers
            # and future wiring; it does NOT replace the expert's reply
            # on the hot path (preserves the 1.5s chat budget). Safe on
            # `auto` and `local_only` paths — gated by user_pref so
            # existing callers see no behavior change.
            if delegate == 'hive' and user_pref == 'hive_preferred':
                hive_consult_scheduled = self._schedule_hive_consult(
                    prompt=prompt,
                    user_id=user_id,
                    speculation_id=speculation_id,
                )
693 # Channel name defensively coerced: draft model sometimes emits None,
694 # null, or a capitalised string. Normalise to a lowercased str so
695 # callers can treat an empty string as "no channel connect intent".
696 _channel = parsed.get('channel_connect') or ''
697 if not isinstance(_channel, str):
698 _channel = ''
699 # Language change — same defensive coercion as channel_connect.
700 # Validated against the canonical SUPPORTED_LANG_DICT (single
701 # source of truth for language codes, lives in hart_intelligence_entry).
702 _lang = parsed.get('language_change') or ''
703 if not isinstance(_lang, str):
704 _lang = ''
705 _lang = _lang.strip().lower()[:5]
706 if _lang:
707 from core.safe_hartos_attr import safe_hartos_attr
708 SUPPORTED_LANG_DICT = safe_hartos_attr('SUPPORTED_LANG_DICT')
709 if SUPPORTED_LANG_DICT is not None:
710 if _lang not in SUPPORTED_LANG_DICT:
711 logger.debug(
712 "draft: language_change '%s' not in "
713 "SUPPORTED_LANG_DICT — ignoring", _lang)
714 _lang = ''
715 # else: HARTOS not yet loaded — accept the code as-is, same
716 # fall-through the original ImportError branch had.
717 return {
718 'response': draft_reply,
719 'speculation_id': speculation_id,
720 'draft_model': draft_model.model_id,
721 'delegate': delegate,
722 'draft_confidence': confidence,
723 'is_correction': bool(parsed.get('is_correction', False)),
724 'is_casual': bool(parsed.get('is_casual', False)),
725 'is_create_agent': bool(parsed.get('is_create_agent', False)),
726 'channel_connect': _channel.strip().lower(),
727 'language_change': _lang,  # already stripped/lowered above
728 'expert_pending': expert_pending,
729 # Additive field: observers (J282, telemetry, admin/diag) can
730 # tell a hive fusion consult was fired in background. Legacy
731 # callers that don't read this field are unaffected.
732 'hive_consult_scheduled': hive_consult_scheduled,
733 # Additive field: which guard promoted this turn to expert
734 # (or ``None`` when the draft answered as final). Canonical
735 # values come from ``EscalationReason`` (see
736 # ``integrations.agent_engine.escalation_reasons``).
737 'escalation_reason': recorded_reason,
738 'latency_ms': round(draft_latency_ms, 1),
739 'energy_kwh': round(
740 self._registry.get_total_energy_kwh(hours=0.01), 6),
741 }
743 # ─── SRP helpers extracted from dispatch_draft_first ───
745 # Class-level toggle so tests can flip the health probe off. Prod
746 # leaves it enabled so dead-port POSTs short-circuit cleanly; unit
747 # tests that mock _dispatch_to_model set it to False so the mocked
748 # dispatch actually runs. Kept as a class attribute (not instance)
749 # so fixtures can patch once for the whole suite.
750 _health_probe_enabled: bool = True
752 def _check_draft_first_gates(self, prompt: str) -> Optional[str]:
753 """Run the cheap gates (circuit breaker + constitutional filter +
754 draft-server health probe) that must pass before we spend any
755 model time.
757 Returns None on success, or an error string identifying which gate
758 rejected the request. Keeps dispatch_draft_first's orchestration
759 thin — this method owns "is the system healthy enough to proceed".
760 """
761 from security.hive_guardrails import HiveCircuitBreaker, ConstitutionalFilter
762 if HiveCircuitBreaker.is_halted():
763 return 'Hive is halted'
764 passed, reason = ConstitutionalFilter.check_prompt(prompt)
765 if not passed:
766 return reason or 'constitutional filter'
767 # Fast TCP probe against the draft server. If the 0.8B caption
768 # server (port 8081) isn't listening, fall through to the normal
769 # 4B path instead of POSTing to a dead port and waiting for a
770 # socket timeout on every chat request. Cache the result for 30s
771 # so we don't probe on every message.
772 if self._health_probe_enabled and not self._draft_server_alive():
773 return 'draft_server_offline'
774 return None
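The gate ordering (cheapest checks first, model time spent only after all pass) can be sketched with stand-in gate callables. The trigger strings below are illustrative, not the real filter rules.

```python
def run_gates(prompt, gates):
    """Return the first gate's rejection string, or None when all pass."""
    for gate in gates:
        err = gate(prompt)
        if err:
            return err
    return None

# Stand-in gates, in the same order as _check_draft_first_gates:
# circuit breaker -> constitutional filter -> cached draft-server probe.
demo_gates = [
    lambda p: None,  # circuit breaker: not halted
    lambda p: 'constitutional filter' if 'forbidden' in p else None,
    lambda p: None,  # health probe: draft server alive
]
```

Short-circuiting on the first rejection keeps the hot path cheap when the system is unhealthy.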
776 _draft_probe_ts: float = 0.0
777 _draft_probe_ok: bool = False
779 def _draft_server_alive(self) -> bool:
780 """Cheap TCP probe against the draft server endpoint. Cached
781 for 30s so the dispatcher stays responsive under chat load.
782 Returns True if a connect() to the draft host:port succeeds."""
783 import socket
784 import time as _t
785 now = _t.time()
786 if now - self._draft_probe_ts < 30.0:
787 return self._draft_probe_ok
788 ok = False
789 try:
790 from core.port_registry import get_local_draft_url
791 url = get_local_draft_url()
792 # http://host:port/v1 → (host, port)
793 _body = url.split('://', 1)[-1].split('/', 1)[0]
794 host, _, port_s = _body.partition(':')
795 port = int(port_s) if port_s else 80
796 with socket.create_connection((host, port), timeout=0.5):
797 ok = True
798 except Exception:
799 ok = False
800 self.__class__._draft_probe_ts = now
801 self.__class__._draft_probe_ok = ok
802 return ok
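The `http://host:port/v1 -> (host, port)` split the probe relies on can be exercised standalone. The helper name and default port are illustrative; the real method inlines this before `socket.create_connection`.

```python
def parse_host_port(url: str, default_port: int = 80):
    """'http://host:port/path' -> (host, port); port defaults when absent."""
    body = url.split('://', 1)[-1].split('/', 1)[0]
    host, _, port_s = body.partition(':')
    return host, int(port_s) if port_s else default_port
```

The `split('://', 1)[-1]` form also tolerates scheme-less URLs, which keeps the probe robust to config drift.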
804 def _build_draft_classifier_prompt(
805 self, user_prompt: str, agent_persona: Optional[str] = None,
806 preferred_lang: str = 'en',
807 recent_turns: Optional[List[Dict]] = None,
808 ) -> str:
809 """Wrap the user prompt with the draft-first classifier instruction.
811 The draft's job is twofold: (a) produce a short standby reply fit
812 for simple chat, (b) self-assess whether a bigger model is needed.
813 The JSON schema is flat so a 0.8B model can reliably emit it.
815 If ``agent_persona`` is provided, it's prepended to the instruction
816 so the draft's reply is in the voice of the custom / system agent
817 the user is talking to instead of a generic first-responder. Used
818 for the Path-2 system-agent case (e.g. Nunba personality agent).
820 If ``recent_turns`` is provided, prior conversation context is
821 rendered as a "Recent conversation" block before the current user
822 prompt so the draft can answer follow-ups in context (e.g. user
823 asks "what's happening?" 60s after asking about WhatsApp — without
824 history the draft would treat each turn as first-contact and emit
825 a generic greeting). Capped at 4 turns to fit the 0.8B context
826 budget; oldest first; long messages are truncated to 400 chars.
828 Owns ONLY prompt construction — no I/O, no side effects.
829 """
830 # Default brand identity when no explicit persona is supplied.
831 # Without this fall-back, the user's "who are you?" turn drops
832 # through to the underlying model's training-default name
833 # ("I'm Qwen3.5...") because cf3e337 deliberately removed every
834 # "You are <internal-role>" sentence to fix an identity-leak
835 # where the draft echoed "first-responder" architecture jargon.
836 # That fix was correct in spirit but went one step too far —
837 # the BRAND identity (the user-facing product name "Nunba") is
838 # not architecture jargon and is exactly what the user expects
839 # to hear when no per-agent persona is selected. The
840 # ``agent_persona`` branch below overrides this for any turn
841 # where a specific persona is in scope, so explicit personas
842 # are unaffected.
843 #
844 # Single source of truth — core.constants.NUNBA_BRAND_IDENTITY.
845 # Same constant is imported by Nunba's _fallback_chat in
846 # routes/hartos_backend_adapter.py so the two paths can never
847 # drift on brand wording.
848 from core.constants import NUNBA_BRAND_IDENTITY
849 persona_block = f"{NUNBA_BRAND_IDENTITY}\n\n"
850 if agent_persona:
851 # Cap the persona at ~800 chars so a long system prompt doesn't
852 # blow the 0.8B model's context budget on a single-turn call.
853 snippet = agent_persona.strip()[:800]
854 persona_block = (
855 "You are playing the following persona — reply in this "
856 "voice, but keep the JSON schema below exactly as "
857 "specified. Persona:\n"
858 f"{snippet}\n\n"
859 )
860 lang_block = ''
861 if preferred_lang and not preferred_lang.startswith('en'):
862 try:
863 from core.constants import SUPPORTED_LANG_DICT
864 lang_name = SUPPORTED_LANG_DICT.get(preferred_lang[:2], preferred_lang)
865 except ImportError:
866 lang_name = preferred_lang
867 # Same language + tone prompt the 4B path uses (with examples, code-mixing rules)
868 _tone = ''
869 try:
870 from core.agent_personality import get_regional_tone_prompt
871 _tone = get_regional_tone_prompt(preferred_lang)
872 except Exception:
873 pass
874 lang_block = (
875 f"Answer questions accurately and respond as quickly as possible in {lang_name}. "
876 f"Keep responses under 200 words. Be colloquial and natural.\n"
877 f"{_tone}\n\n"
878 )
880 # Compute the runtime capability summary ONCE per prompt build so
881 # the static-tools / ModelCatalog / MCP / channel walks don't run
882 # twice for the conditional injection below.
883 cap_summary = _capability_summary_safe()
884 cap_block = (
885 f"Available capabilities (the system can do these via the "
886 f"routing path below): {cap_summary}.\n\n"
887 if cap_summary else ""
888 )
890 # Recent conversation context — single source via
891 # seed_autogen_from_shared_history (the autogen path uses the same
892 # call), formatted into a flat User:/Assistant: transcript so the
893 # 0.8B can read follow-ups in context. Capped at 4 turns + 400
894 # chars/turn to fit the draft's context budget.
895 history_block = ""
896 if recent_turns:
897 _hist_lines = []
898 for _turn in recent_turns[-4:]:
899 _role = _turn.get('role') or ''
900 _content = (_turn.get('content') or '').strip()
901 if not _content:
902 continue
903 if len(_content) > 400:
904 _content = _content[:400] + '…'
905 if _role == 'user':
906 _hist_lines.append(f"User: {_content}")
907 elif _role == 'assistant':
908 _hist_lines.append(f"Assistant: {_content}")
909 if _hist_lines:
910 history_block = (
911 "Recent conversation (oldest first; use this to "
912 "interpret the user's current message in context — "
913 "follow-ups like 'what's happening?' or 'why?' refer "
914 "back to these turns):\n"
915 + "\n".join(_hist_lines)
916 + "\n\n"
917 )
919 return (
920 persona_block
921 + lang_block
922 # ── Job + answering rules — size-agnostic, identity-free ─────
923 # Same wording works whether the draft is 0.8B, 4B, or 27B.
924 # The model in this slot does NOT see the full tool registry
925 # (web fetch, code exec, GitHub, filesystem, vision, computer
926 # control, MCP servers, channels), the user's loaded persona,
927 # multi-turn memory, or the ReAct loop — so it must never
928 # refuse on behalf of the system.
929 #
930 # The 3ea8648 prompt opened with "You are a fast local
931 # first-responder" and the model would echo it verbatim on
932 # "who are you?" → "I'm your fast local first-responder,
933 # ready to assist you right away." cf3e337 fixed that by
934 # removing every internal-role identity sentence, but went
935 # one step too far — with NO identity at all the 0.8B fell
936 # through to its training-default name ("I'm Qwen…"). The
937 # default brand identity ("You are Nunba…") now lives in
938 # persona_block above, so this section deliberately does
939 # NOT add another "You are <X>" line — only the BRAND
940 # identity above is allowed; INTERNAL-ROLE jargon
941 # ("first-responder", "draft", "classifier") stays out.
942 # All instructions below are phrased as the *job* and
943 # *rules*, never as architecture identity.
944 + "Your job is to produce a short reply to the user AND "
945 "classify the user's intent on several independent axes. The "
946 "classification flags route the message downstream — be "
947 "accurate.\n\n"
948 # Positive capability summary — primary teaching mechanism so
949 # the model knows what the system CAN do. Auto-discovered:
950 # static tool list + ModelCatalog (TTS/STT/VLM/video/audio,
951 # rolled up by type) + MCP servers + channels + expert-agent
952 # categories. Computed once into cap_block above.
953 + cap_block
954 + "ANSWERING RULES — READ BEFORE REPLYING:\n"
955 "You only see this single turn. The system's actual tool / "
956 "integration / capability set is dynamic and not visible from "
957 "here — so you don't get to decide what the system can or "
958 "can't do. Therefore:\n"
959 "- NEVER write 'I cannot', 'I don't have access', 'I'm unable', "
960 "'I'm just a', 'I do not have the ability', or any phrase "
961 "asserting the system can't do something.\n"
962 "- NEVER claim no internet/tools/file access; you have no way "
963 "to verify what is or isn't reachable.\n"
964 "- You MAY answer directly ONLY for trivial recall, simple "
965 "math, greetings, or questions that need NO live data, NO "
966 "external system access, NO filesystem, NO code execution, "
967 "and NO per-user state beyond this single turn.\n"
968 "- For ANYTHING else (URLs, code, files, current state, "
969 "multi-step reasoning, capability questions): set "
970 "delegate=\"local\" (or \"hive\" for very large requests) "
971 "and write a brief standby reply such as \"Let me check that "
972 "for you…\", \"Looking that up…\", or \"One moment…\". The "
973 "standby is replaced by the authoritative answer automatically "
974 "— your only job is to keep the user comfortable while that "
975 "runs.\n"
976 "- Refusals are not your call. If you ever feel the urge to "
977 "refuse: pick a standby instead and delegate.\n\n"
978 + history_block
979 + f"User: {user_prompt}\n\n"
980 "Respond with ONE JSON object on a single line and NOTHING else:\n"
981 '{"reply": "<your short reply to the user, 1-3 sentences>", '
982 '"delegate": "none" OR "local" OR "hive", '
983 '"confidence": <float 0-1>, '
984 '"is_correction": true OR false, '
985 '"is_casual": true OR false, '
986 '"is_create_agent": true OR false, '
987 '"channel_connect": "<channel name or empty string>", '
988 '"language_change": "<ISO 639-1 code or empty string>", '
989 '"invite_intent": "<short context if user wants invite link, or empty>", '
990 '"join_room_intent": "<platform + room/url if user wants agent to join, or empty>", '
991 '"memory_query": "<short context if user asks about past conversations, or empty>", '
992 '"reason": "<why you chose this delegate value>"}\n\n'
993 # ── delegate ────────────────────────────────────────────────
994 "delegate: Use \"none\" for greetings, small-talk, factual "
995 "questions you can fully answer yourself, or anything that needs "
996 "no external tools. Use \"local\" if the request needs tools, "
997 "code, reasoning, or multi-step work the 4B model can handle. "
998 "Use \"hive\" if it needs large-model expertise, long-context "
999 "research, or specialized skill distribution.\n\n"
1000 # ── is_correction ────────────────────────────────────────────
1001 "is_correction: true when the user is telling you something "
1002 "in the previous assistant turn was wrong, inaccurate, or "
1003 "they're restating what they actually meant (e.g. 'no that's "
1004 "wrong', 'actually, I meant X', 'not quite', 'you got it "
1005 "wrong'). Otherwise false. Routes the turn into the hive's "
1006 "real-time learning pipeline, so prefer false when unsure.\n\n"
1007 # ── is_casual ────────────────────────────────────────────────
1008 "is_casual: true when the message is pure chit-chat, a "
1009 "greeting, an acknowledgement, or anything that clearly "
1010 "doesn't need any tools, search, computer control, agent "
1011 "creation, or multi-step reasoning. Used to skip the heavy "
1012 "tool-resolution pipeline. If in doubt (looks substantive), "
1013 "prefer false.\n\n"
1014 # ── is_create_agent ─────────────────────────────────────────
1015 "is_create_agent: true when the user is explicitly asking to "
1016 "create, build, train, or set up a NEW AI agent / bot / "
1017 "assistant / automated workflow. Not true for questions "
1018 "ABOUT agents, or for using an existing agent. Routes the "
1019 "turn into the autogen CREATE flow.\n\n"
1020 # ── channel_connect ─────────────────────────────────────────
1021 "channel_connect: if the user is asking to connect, add, "
1022 "link, or set up a messaging channel (WhatsApp, Telegram, "
1023 "Slack, Discord, Gmail, SMS, Teams, Messenger, etc.) put "
1024 "the lowercased channel name here (e.g. \"whatsapp\"). "
1025 "Otherwise use an empty string \"\". This routes the turn "
1026 "to the Connect_Channel tool.\n\n"
1027 # ── language_change ─────────────────────────────────────────
1028 "language_change: if the user is asking to switch language "
1029 "(e.g. \"talk to me in tamil\", \"hablame en español\", "
1030 "\"parle-moi en français\", \"日本語で話して\"), put the "
1031 "ISO 639-1 code here (e.g. \"ta\" for Tamil, \"es\" for "
1032 "Spanish, \"fr\" for French, \"ja\" for Japanese, \"hi\" "
1033 "for Hindi, \"zh\" for Chinese, \"ko\" for Korean, \"ar\" "
1034 "for Arabic, \"de\" for German, \"ru\" for Russian). "
1035 "Otherwise use an empty string \"\". This overrides the "
1036 "session's preferred_lang so the main LLM responds in "
1037 "the requested language and TTS routes to an engine that "
1038 "supports it.\n\n"
1039 # ── invite_intent ───────────────────────────────────────────
1040 "invite_intent: if the user is asking to invite, share, or "
1041 "refer a friend / colleague / family member to Nunba (e.g. "
1042 "\"invite a friend\", \"give me an invite link\", \"share "
1043 "Nunba with my colleague\", \"how do I refer people\"), put "
1044 "a short freeform context here (e.g. \"work friend\" or "
1045 "\"family\") — empty string is fine for a generic shareable "
1046 "link. Otherwise use an empty string \"\". This routes the "
1047 "turn to the Invite_Friend tool.\n\n"
1048 # ── join_room_intent ────────────────────────────────────────
1049 "join_room_intent: if the user is asking the AI to JOIN an "
1050 "external room / channel / meeting / group as a co-pilot, "
1051 "note-taker, or participant (e.g. \"join my Discord audio "
1052 "room\", \"attend my Teams meet\", \"take notes in the "
1053 "WhatsApp family group\", \"co-pilot my Slack channel\"), "
1054 "put a short \"<platform> <room or url>\" string here "
1055 "(e.g. \"discord https://discord.com/channels/123/456\"). "
1056 "Otherwise use an empty string \"\". This routes the turn "
1057 "to the Join_External_Room tool, which always gates on "
1058 "consent and announces the agent's presence in the room.\n\n"
1059 # ── memory_query ────────────────────────────────────────────
1060 "memory_query: if the user is asking about something they "
1061 "discussed previously, what was said in past conversations, "
1062 "or asking the assistant to recall earlier context (e.g. "
1063 "\"what did we speak 2 days back\", \"do you remember when "
1064 "I asked about X\", \"what was that thing we discussed last "
1065 "week\", \"recall my previous question on Y\", \"what did I "
1066 "tell you about my project\"), put a short freeform context "
1067 "string here describing the topic / time window (e.g. "
1068 "\"conversations from last 2 days\", \"earlier project "
1069 "discussion\"). Otherwise use an empty string \"\". This "
1070 "routes the turn to the recall_memory tool which searches "
1071 "the memory graph with optional time filters."
1072 )
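A standalone rendition of the "Recent conversation" block construction, with the same caps as the inline loop above (last 4 turns, 400 chars/turn, oldest first). The helper name is illustrative.

```python
def render_history(recent_turns, max_turns=4, max_chars=400):
    """Flat User:/Assistant: transcript; non-chat roles are dropped."""
    lines = []
    for turn in recent_turns[-max_turns:]:
        role = turn.get('role') or ''
        content = (turn.get('content') or '').strip()
        if not content:
            continue
        if len(content) > max_chars:
            content = content[:max_chars] + '…'
        if role == 'user':
            lines.append(f"User: {content}")
        elif role == 'assistant':
            lines.append(f"Assistant: {content}")
    return "\n".join(lines)
```

Truncation happens per turn rather than on the joined transcript, so a single long message cannot starve the other turns of context budget.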
1074 def _track_call_telemetry(
1075 self, model: 'ModelBackend', latency_ms: float, node_id: Optional[str],
1076 ) -> None:
1077 """Record the per-model telemetry trio (energy + latency +
1078 compute-contribution for hive reward).
1080 Owns ONLY the telemetry side-effects so dispatch_draft_first,
1081 dispatch_speculative, and any future dispatch variant can share
1082 one call path. No return value — this is fire-and-forget."""
1083 self._registry.record_energy(model.model_id, latency_ms)
1084 self._registry.record_latency(model.model_id, latency_ms)
1085 self._record_compute_contribution(node_id, model.model_id, latency_ms)
1087 def _schedule_expert_background(
1088 self,
1089 speculation_id: str,
1090 prompt: str,
1091 fast_response: str,
1092 expert_model: Optional['ModelBackend'],
1093 user_id: str,
1094 prompt_id: str,
1095 goal_id: Optional[str],
1096 goal_type: str,
1097 origin_model_id: str,
1098 origin_model_role: str = 'fast_model',
1099 delegate: Optional[str] = None,
1100 escalation_reason: Optional['EscalationReason'] = None,
1101 ) -> bool:
1102 """Schedule the expert-improvement task in the background pool.
1104 Centralizes the registration into self._active + thread submit so
1105 both dispatch_draft_first and dispatch_speculative share one code
1106 path. Returns True if the expert was actually scheduled.
1108 ``escalation_reason`` is purely observability metadata: it gets
1109 stamped into ``self._active[speculation_id]`` so admin /diag and
1110 telemetry can ask "why was this turn escalated?" without
1111 re-deriving the heuristic. Optional + defaults to None so the
1112 legacy dispatch_speculative call site (which has no draft to
1113 derive a reason from) needs no change.
1115 Guards:
1116 - no expert model → nothing to schedule
1117 - expert_model.model_id == origin_model_id → pointless, skip
1118 - budget denied → skip
1119 """
1120 if expert_model is None:
1121 return False
1122 if expert_model.model_id == origin_model_id:
1123 return False
1124 if not self._check_and_reserve_budget(user_id, goal_id, expert_model):
1125 return False
1127 with self._lock:
1128 entry = {
1129 origin_model_role: origin_model_id,
1130 'expert_model': expert_model.model_id,
1131 'user_id': user_id,
1132 'prompt_id': prompt_id,
1133 'goal_id': goal_id,
1134 'started_at': time.time(),
1135 }
1136 if delegate is not None:
1137 entry['delegate'] = delegate
1138 if escalation_reason is not None:
1139 # Store the canonical string value (Enum's str inheritance
1140 # makes this safe for JSON / SSE round-trip).
1141 entry['escalation_reason'] = (
1142 escalation_reason.value
1143 if hasattr(escalation_reason, 'value')
1144 else str(escalation_reason)
1145 )
1146 self._active[speculation_id] = entry
1148 self._expert_pool.submit(
1149 self._expert_background_task,
1150 speculation_id, prompt, fast_response,
1151 expert_model, user_id, prompt_id, goal_id, goal_type,
1152 )
1153 return True
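The three scheduling guards, as a pure predicate. This is a hypothetical helper for illustration; the real method also registers the `_active` entry and submits the pool task.

```python
def should_schedule_expert(expert_model_id, origin_model_id, budget_ok):
    """Mirror of _schedule_expert_background's guard cascade."""
    if expert_model_id is None:
        return False            # no expert model: nothing to schedule
    if expert_model_id == origin_model_id:
        return False            # expert would rerun the origin model: pointless
    return bool(budget_ok)      # budget denied: skip
```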
1155 def _parse_draft_envelope(self, raw: str) -> dict:
1156 """Extract the {reply, delegate, confidence} JSON the draft should
1157 have produced. Tolerant of markdown fences, prose wrappers, and
1158 trailing commas.
1160 Returns an empty dict on total parse failure — callers should treat
1161 that as 'delegate to local' via the default in dispatch_draft_first."""
1162 if not raw:
1163 return {}
1164 import json as _json
1165 import re as _re
1167 text = raw.strip()
1169 # Strip ```json ... ``` fences if present
1170 fence = _re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, _re.DOTALL)
1171 if fence:
1172 text = fence.group(1)
1174 # Try raw parse first
1175 try:
1176 return _json.loads(text)
1177 except (_json.JSONDecodeError, TypeError):
1178 pass
1180 # Fall back to the first {...} we can find
1181 match = _re.search(r'\{.*\}', text, _re.DOTALL)
1182 if match:
1183 candidate = match.group(0)
1184 # Trim trailing commas before } or ]
1185 candidate = _re.sub(r',\s*([\}\]])', r'\1', candidate)
1186 try:
1187 return _json.loads(candidate)
1188 except (_json.JSONDecodeError, TypeError):
1189 pass
1191 logger.debug(f"draft envelope parse failed: {raw[:120]!r}")
1192 return {}
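The tolerant parse strategy can be exercised standalone: strip markdown fences, try a raw load, then fall back to the first `{...}` span with trailing commas trimmed. The function name is illustrative; the steps mirror `_parse_draft_envelope`.

```python
import json
import re

def parse_envelope(raw: str) -> dict:
    """Best-effort JSON extraction; {} signals total parse failure."""
    if not raw:
        return {}
    text = raw.strip()
    # Strip ```json ... ``` fences if present
    fence = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL)
    if fence:
        text = fence.group(1)
    try:
        return json.loads(text)
    except (json.JSONDecodeError, TypeError):
        pass
    # Fall back to the first {...} span, trimming trailing commas
    match = re.search(r'\{.*\}', text, re.DOTALL)
    if match:
        candidate = re.sub(r',\s*([\}\]])', r'\1', match.group(0))
        try:
            return json.loads(candidate)
        except (json.JSONDecodeError, TypeError):
            pass
    return {}
```

The greedy `\{.*\}` fallback deliberately spans nested braces so multi-level envelopes survive prose wrappers.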
1194 def _pick_expert_for_delegate(self, delegate: str,
1195 user_pref: str = 'auto'):
1196 """Select the background model for a given delegate value.
1198 - "local": local FAST-tier model (e.g. Qwen3.5-4B)
1199 - "hive": highest-accuracy hive/expert model, falls back to local
1200 if no remote expert is available
1202 ``user_pref`` honours the Demopage intelligence toggle
1203 (``local_only`` | ``auto`` | ``hive_preferred``):
1205 - ``local_only`` + ``delegate='hive'`` → **downgrade to local
1206 fast model**. The user explicitly opted out of hive compute;
1207 we respect that even if the draft self-delegated.
1208 - ``auto`` (default) → today's behavior: hive delegate picks the
1209 expert, falls back to fast if no expert registered.
1210 - ``hive_preferred`` → today's behavior for model selection; the
1211 ADDITIONAL MoE HiveMind fusion consult is fired separately by
1212 ``dispatch_draft_first`` via ``_schedule_hive_consult``.
1214 Install-validation gate:
1215 A model that was installed via `/api/admin/models/hub/install`
1216 is stamped ``capabilities['install_validated']`` — ``False`` on
1217 register, flipped to ``True`` only when the post-download
1218 ``loader.load()`` probe succeeded. If the dispatcher picks an
1219 entry whose validation has not yet flipped (or explicitly
1220 failed), fall back to the local fast model rather than serve
1221 the user an unproven weight. Seeded / manually-registered
1222 entries have no such flag and are trusted by default.
1224 Returns None if no suitable model exists — caller then treats
1225 the draft's reply as the final answer."""
1226 if delegate == 'local':
1227 return self._pass_validation_gate(
1228 self._registry.get_fast_model())
1229 if delegate == 'hive':
1230 # Respect the user's explicit "local only" preference even
1231 # when the draft says hive would be better — the toggle is
1232 # the final authority on egress.
1233 if user_pref == 'local_only':
1234 return self._pass_validation_gate(
1235 self._registry.get_fast_model())
1236 expert = self._registry.get_expert_model()
1237 gated = self._pass_validation_gate(expert)
1238 if gated:
1239 return gated
1240 # Graceful fallback: no (valid) hive expert → use local fast
1241 return self._pass_validation_gate(
1242 self._registry.get_fast_model())
1243 return None
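The delegate by user_pref matrix as a pure function. 'fast' and 'expert' stand in for the registry lookups, and the validation-gate cascade is omitted for brevity.

```python
def pick_tier(delegate: str, user_pref: str = 'auto', expert_available: bool = True):
    """Mirror of _pick_expert_for_delegate's routing decisions."""
    if delegate == 'local':
        return 'fast'
    if delegate == 'hive':
        if user_pref == 'local_only':
            return 'fast'       # the toggle is the final authority on egress
        return 'expert' if expert_available else 'fast'
    return None                 # draft reply stands as final
```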
1245 def _pass_validation_gate(self, model):
1246 """Refuse to route to a hub-installed model whose post-download
1247 load probe has not yet succeeded.
1249 Catalog entries registered by `/api/admin/models/hub/install`
1250 carry ``source == 'hub-install'`` and
1251 ``capabilities['install_validated']`` (False at register time,
1252 flipped to True by the background validate probe). Any other
1253 entry — seeded preset, manual register, legacy — has no such
1254 flag and passes through unchanged, preserving every existing
1255 code path byte-for-byte.
1257 Returns ``None`` when the gate rejects the pick so the caller
1258 can cascade to the next fallback."""
1259 if model is None:
1260 return None
1261 try:
1262 model_id = getattr(model, 'id', None) or getattr(model, 'model_id', None)
1263 if not model_id:
1264 return model # unknown shape — don't second-guess
1265 # Late import — the catalog lives in the Nunba-side shim
1266 # but is a transitive dependency through service_tools.
1267 from integrations.service_tools.model_catalog import (
1268 get_catalog,
1269 )
1270 entry = get_catalog().get(model_id)
1271 if entry is None:
1272 return model # not a catalog entry — trust the registry
1273 source = getattr(entry, 'source', '')
1274 if source != 'hub-install':
1275 return model # seeded / manual → no validation requirement
1276 caps = getattr(entry, 'capabilities', {}) or {}
1277 if caps.get('install_validated') is True:
1278 return model
1279 logger.info(
1280 f"[dispatcher] refusing unvalidated hub-install "
1281 f"{model_id}; caller will fall back"
1282 )
1283 return None
1284 except Exception as e:
1285 # Any error in the gate degrades open — the dispatcher must
1286 # not fail chat because the validation lookup misbehaved.
1287 logger.debug(f"[dispatcher] validation gate error: {e}")
1288 return model
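The gate's decision table as a pure function over dict-shaped stand-ins for catalog entries. Field names mirror the attributes read above; the real method reads them off the ModelCatalog entry object.

```python
def gate_allows(entry) -> bool:
    """True when the pick may be served; mirrors _pass_validation_gate."""
    if entry is None:
        return True             # not a catalog entry: trust the registry
    if entry.get('source') != 'hub-install':
        return True             # seeded / manual: no validation requirement
    caps = entry.get('capabilities') or {}
    return caps.get('install_validated') is True
```

Only the `hub-install` + not-yet-validated combination is rejected, so every pre-existing code path passes through unchanged.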
1290 def _schedule_hive_consult(self, prompt: str, user_id: str,
1291 speculation_id: str) -> bool:
1292 """Best-effort MoE HiveMind fusion consult on the expert pool.
1294 Fired only when the user selected
1295 ``intelligence_preference='hive_preferred'`` AND the draft
1296 self-delegated to ``hive``. Runs on the existing
1297 ``_expert_pool`` so it never contends with the chat hot path;
1298 the consult result populates ``self._last_hive_consult`` for
1299 observers (journey tests, admin/diag, future fusion wiring) to
1300 read.
1302 Guarantees:
1303 * Returns ``True`` iff a task was accepted onto the pool.
1304 * All failures are swallowed (``logger.debug`` only) — the
1305 chat reply path never blocks or errors on a missing /
1306 offline HiveMind.
1307 * No dependency on ``world_model_bridge`` at import time; the
1308 import is inside the worker so environments without
1309 hevolveai (pip) still load the dispatcher cleanly.
1310 """
1311 def _consult():
1312 try:
1313 from integrations.agent_engine.world_model_bridge import (
1314 get_world_model_bridge,
1315 )
1316 bridge = get_world_model_bridge()
1317 if not bridge:
1318 return
1319 # 1500ms timeout matches the default hive consult budget
1320 # in world_model_bridge. It cannot stall user-visible
1321 # latency regardless of the chat budget, because this runs
1322 # on a worker thread, not the request thread.
1323 result = bridge.query_hivemind(
1324 prompt, timeout_ms=1500, user_id=user_id,
1325 )
1326 if not result:
1327 return
1328 with self._lock:
1329 self._last_hive_consult[speculation_id] = {
1330 'result': result,
1331 'completed_at': time.time(),
1332 'user_id': user_id,
1333 }
1334 # Cap the dict at the same ceiling as _results to
1335 # avoid unbounded growth in long-running instances.
1336 if len(self._last_hive_consult) > self._results_max:
1337 oldest = sorted(
1338 self._last_hive_consult.items(),
1339 key=lambda kv: kv[1].get('completed_at', 0),
1340 )[:len(self._last_hive_consult) - self._results_max]
1341 for k, _ in oldest:
1342 self._last_hive_consult.pop(k, None)
1343 logger.info(
1344 f"[hive_consult] speculation={speculation_id} "
1345 f"user={user_id} fusion returned "
1346 f"{str(result)[:160]!r}"
1347 )
1348 except Exception as e:
1349 logger.debug(f"[hive_consult] failed: {e}")
1351 try:
1352 self._expert_pool.submit(_consult)
1353 return True
1354 except Exception as e:
1355 logger.debug(f"[hive_consult] pool submit failed: {e}")
1356 return False
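The completed_at-ordered eviction that caps `_last_hive_consult` can be sketched standalone. `cap` stands in for `self._results_max`; the real code runs this under `self._lock`.

```python
def evict_oldest(store: dict, cap: int) -> None:
    """Drop the oldest entries (by completed_at) once the cap is exceeded."""
    if len(store) <= cap:
        return
    oldest = sorted(store.items(), key=lambda kv: kv[1].get('completed_at', 0))
    for key, _ in oldest[:len(store) - cap]:
        store.pop(key, None)
```

Sorting the whole dict is O(n log n) but n is bounded by the cap plus a handful of in-flight inserts, so the cost stays negligible.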
1358 def _record_interaction_safely(self, **kwargs) -> None:
1359 """Feed an interaction into HevolveAI via WorldModelBridge. Never
1360 raises — continual learning is best-effort and the chat path must
1361 not break if HevolveAI is offline or the bridge is in circuit-open
1362 mode. WorldModelBridge already handles guardrail + secret redaction
1363 internally."""
1364 try:
1365 from integrations.agent_engine.world_model_bridge import get_world_model_bridge
1366 bridge = get_world_model_bridge()
1367 bridge.record_interaction(**kwargs)
1368 except Exception as e:
1369 logger.debug(f"record_interaction skipped: {e}")
1371 # ─── Background expert task ───
1372 #
1373 # Single path: expert (local langchain pipeline or hive peer) takes
1374 # the ORIGINAL turn directly. No "improve this draft" wrapper, no
1375 # similarity gate — when the expert is the actual expert (full tool
1376 # registry locally, 27B or fine-tuned on hive), its reply IS the
1377 # answer. The standby (fast_response) was already delivered by the
1378 # draft-first path; this task replaces it via the existing
1379 # speculation_id bubble-replacement channel on _deliver_expert_response.
1381 def _expert_background_task(self, speculation_id: str, original_prompt: str,
1382 fast_response: str, expert_model, user_id: str,
1383 prompt_id: str, goal_id: Optional[str], goal_type: str):
1384 """Background: dispatch expert → deliver (or fall through to draft
1385 standby). Outer try/finally owns the shared invariants —
1386 circuit-breaker gate, exception swallowing, ``_active`` cleanup —
1387 so the helper can focus on dispatch + delivery semantics.
1388 """
1389 try:
1390 # GUARDRAIL: circuit breaker (check again — may have been halted)
1391 from security.hive_guardrails import HiveCircuitBreaker
1392 if HiveCircuitBreaker.is_halted():
1393 return
1395 self._run_collapsed_expert_path(
1396 speculation_id, original_prompt, fast_response,
1397 expert_model, user_id, prompt_id, goal_id, goal_type)
1399 except Exception as e:
1400 logger.debug(f"Expert background task failed for {speculation_id}: {e}")
1401 finally:
1402 with self._lock:
1403 self._active.pop(speculation_id, None)
1405 def _run_collapsed_expert_path(self, speculation_id: str,
1406 original_prompt: str, fast_response: str,
1407 expert_model, user_id: str, prompt_id: str,
1408 goal_id: Optional[str], goal_type: str):
1409 """Collapsed path: expert takes the ORIGINAL turn through the full
1410 langchain pipeline (local) or the hive endpoint (remote). No
1411 'improve this draft' wrapper. Reuses ``_dispatch_expert_langchain``
1412 for routing + ``_deliver_expert_response`` for SSE/TTS fan-out.
1413 """
1414 start = time.time()
1415 expert_response = self._dispatch_expert_langchain(
1416 expert_model, original_prompt, user_id, prompt_id,
1417 goal_type, goal_id)
1418 elapsed_ms = (time.time() - start) * 1000
1420 # GUARDRAIL: energy + latency telemetry (same instrumentation as
1421 # the legacy path — the rollout flip must not lose these metrics).
1422 self._registry.record_energy(expert_model.model_id, elapsed_ms)
1423 self._registry.record_latency(expert_model.model_id, elapsed_ms)
1425 # Empty response → the draft standby stays as the final reply.
1426 # The user already saw it; nothing to deliver. Record the
1427 # _results entry so admin /diag can tell the expert ran and
1428 # returned empty (vs never ran).
1429 if not expert_response or not expert_response.strip():
1430 logger.debug(
1431 "collapsed expert returned empty for %s; "
1432 "draft standby remains the final reply", speculation_id)
1433 with self._lock:
1434 self._results[speculation_id] = {
1435 'response': fast_response,
1436 'model': expert_model.model_id,
1437 'latency_ms': round(elapsed_ms, 1),
1438 'improved': False,
1439 }
1440 self._evict_old_results()
1441 return
1443 # GUARDRAIL: constitutional check on expert output before delivery
1444 from security.hive_guardrails import ConstitutionalFilter
1445 passed, reason = ConstitutionalFilter.check_prompt(expert_response)
1446 if not passed:
1447 logger.warning(
1448 "collapsed expert response blocked by guardrail: %s",
1449 reason)
1450 return
1452 # Unconditional delivery: the expert is THE expert here, not a
1453 # "maybe improvement". Bubble-replace the standby via the
1454 # existing speculation_id channel (SSE + TTS — see
1455 # _deliver_expert_response for the dual-channel contract).
1456 self._deliver_expert_response(
1457 user_id, prompt_id, speculation_id, expert_response)
1459 # Feed continual learning. Stamp escalation_reason from the
1460 # _active entry so distillation can weight refusal-overridden
1461 # turns differently from clean classifier-delegate turns.
1462 with self._lock:
1463 active_entry = dict(self._active.get(speculation_id, {}))
1464 served_by = (
1465 'hive_langchain_bg' if not expert_model.is_local
1466 else 'local_langchain_bg'
1467 )
1468 self._record_interaction_safely(
1469 user_id=user_id, prompt_id=prompt_id,
1470 prompt=original_prompt, response=expert_response,
1471 model_id=expert_model.model_id, latency_ms=elapsed_ms,
1472 goal_id=goal_id,
1473 escalation_reason=active_entry.get('escalation_reason'),
1474 )
1476 with self._lock:
1477 self._results[speculation_id] = {
1478 'response': expert_response,
1479 'model': expert_model.model_id,
1480 'latency_ms': round(elapsed_ms, 1),
1481 'improved': True,
1482 'served_by': served_by,
1483 }
1484 self._evict_old_results()
    def _dispatch_expert_langchain(self, model, prompt: str, user_id: str,
                                   prompt_id: str, goal_type: str,
                                   goal_id: Optional[str]) -> str:
        """Send the expert turn through the right transport for its tier.

        - ``model.is_local=True``: route through the FULL HARTOS /chat
          pipeline (agent loading, autogen GroupChat, full tool registry)
          so actionable-intent / agent-bound turns actually fire their
          tools.
          * Bundled mode (NUNBA_BUNDLED / sys.frozen): in-process Flask
            ``test_client`` — port 6777 is not bound in bundled Nunba.
          * Non-bundled: HTTP POST to ``HEVOLVE_BASE_URL`` (or the
            port_registry-resolved backend).
          Re-entry is prevented by ``speculative=False, draft_first=False``
          in the payload — the inner /chat handler reads these and skips
          the dispatcher.

        - ``model.is_local=False`` (hive-served expert, registered by
          ``HiveExpertDiscovery``): OpenAI-compatible POST to
          ``{base_url}/chat/completions`` with the registered auth token.
          The hive peer's 27B / fine-tuned model takes the turn directly.

        Returns the response string, or ``''`` on any failure — caller
        falls back to ``fast_response`` so the user always sees the
        draft's standby.
        """
        if model is None:
            return ''

        # ── Hive path: OpenAI-compatible POST to peer base_url ──
        if not getattr(model, 'is_local', True):
            cfg = getattr(model, 'config_list_entry', {}) or {}
            base_url = (cfg.get('base_url') or '').rstrip('/')
            api_key = cfg.get('api_key') or ''
            inner_model_id = cfg.get('model') or model.model_id
            if not base_url:
                logger.debug(
                    "hive expert %s missing base_url — cannot dispatch",
                    model.model_id)
                return ''
            try:
                import requests as _req
                headers = (
                    {'Authorization': f'Bearer {api_key}'} if api_key else {})
                resp = _req.post(
                    f'{base_url}/chat/completions',
                    headers=headers,
                    json={
                        'model': inner_model_id,
                        'messages': [{'role': 'user', 'content': prompt}],
                        'max_tokens': 1500,
                        'temperature': 0.7,
                    },
                    timeout=60,
                )
                if resp.status_code == 200:
                    data = resp.json() or {}
                    choices = data.get('choices') or []
                    if choices:
                        msg = (choices[0] or {}).get('message') or {}
                        return msg.get('content') or ''
                else:
                    logger.debug(
                        "hive expert %s returned HTTP %s",
                        model.model_id, resp.status_code)
            except Exception as e:
                logger.debug(
                    "hive expert dispatch failed (%s): %s",
                    model.model_id, e)
            return ''

        # ── Local path: full HARTOS /chat pipeline ──
        payload = {
            'user_id': user_id,
            'prompt_id': (
                f'{goal_type}_{goal_id[:8]}' if goal_id else prompt_id),
            'prompt': prompt,
            'create_agent': True,
            'autonomous': True,
            'casual_conv': False,
            'model_config': model.to_config_list(),
            # Hard no-reentry: inner /chat reads these and skips the
            # dispatcher entirely so we never recursively re-enter.
            'speculative': False,
            'draft_first': False,
        }

        import sys as _sys
        _bundled = bool(
            os.environ.get('NUNBA_BUNDLED')
            or getattr(_sys, 'frozen', False)
        )
        if _bundled:
            try:
                # Late import — keeps module-load time independent of
                # hart_intelligence_entry's heavy boot graph. In bundled
                # Nunba this is cheap (already in sys.modules by the
                # time a chat turn fires).
                from hart_intelligence_entry import app as _app  # type: ignore
                with _app.test_client() as client:
                    resp = client.post('/chat', json=payload)
                    if resp.status_code == 200:
                        data = resp.get_json() or {}
                        return data.get('response') or ''
                    logger.debug(
                        "local expert /chat returned %s in bundled mode",
                        resp.status_code)
            except Exception as e:
                logger.debug(
                    "local expert bundled dispatch failed: %s", e)
            return ''

        # Non-bundled: HTTP POST to the configured backend.
        try:
            import requests as _req
            base = os.environ.get(
                'HEVOLVE_BASE_URL',
                f'http://localhost:{get_port("backend")}',
            )
            resp = _req.post(f'{base}/chat', json=payload, timeout=60)
            if resp.status_code == 200:
                return (resp.json() or {}).get('response') or ''
            logger.debug(
                "local expert /chat HTTP returned %s", resp.status_code)
        except Exception as e:
            logger.debug("local expert HTTP dispatch failed: %s", e)
        return ''
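The hive path above tolerates a missing or null value at every level of the OpenAI-compatible response body. That defensive extraction can be sketched standalone (the helper name `extract_chat_content` is illustrative, not part of this module):

```python
def extract_chat_content(data):
    """Pull choices[0].message.content out of an OpenAI-compatible
    chat-completions body, returning '' instead of raising when any
    level is missing, empty, or None."""
    choices = (data or {}).get('choices') or []
    if not choices:
        return ''
    # choices[0] itself may be null in a malformed body.
    msg = (choices[0] or {}).get('message') or {}
    return msg.get('content') or ''
```

The `or {}` / `or []` chain is what lets the caller treat every failure mode uniformly as the empty string, which the dispatcher maps to "fall back to the draft standby".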
    # ─── Helpers ───
    def _dispatch_to_model(self, model: 'ModelBackend', prompt: str,
                           user_id: str, prompt_id: str,
                           goal_type: str, goal_id: str = None) -> str:
        """Send prompt to a specific model via /chat endpoint with config override.

        Always passes ``speculative=False`` and ``draft_first=False`` on the
        inner call so the dispatcher can never recursively re-enter itself
        when HEVOLVE_DRAFT_FIRST or the legacy speculative flag is enabled
        upstream. The outer chat route triggered us, and that's where the
        decision to speculate was made.

        In bundled/in-process mode (Nunba desktop), uses Flask test_client()
        instead of HTTP — port 6777 is never bound in bundled mode.
        """
        payload = {
            'user_id': user_id,
            'prompt_id': f'{goal_type}_{goal_id[:8]}' if goal_id else prompt_id,
            'prompt': prompt,
            'create_agent': True,
            'autonomous': True,
            'casual_conv': False,
            'model_config': model.to_config_list(),
            # Hard no-reentry guard — inner dispatch never speculates
            'speculative': False,
            'draft_first': False,
        }

        # Bundled mode: call the model's llama-server directly on its port.
        # Do NOT use Flask test_client('/chat') — that re-enters the full
        # HARTOS pipeline (autogen, agent creation, etc.), causing re-entrancy.
        import sys as _sys
        _bundled = bool(
            os.environ.get('NUNBA_BUNDLED') or getattr(_sys, 'frozen', False))
        if _bundled:
            try:
                # Resolve the model's direct port from the catalog/port_registry
                _port = None
                if hasattr(model, 'port') and model.port:
                    _port = model.port
                if not _port:
                    try:
                        from core.port_registry import get_local_draft_url, get_local_llm_url
                        _url = get_local_draft_url() or get_local_llm_url()
                        if _url:
                            # Extract port from a URL like http://127.0.0.1:8081/v1
                            import re as _re
                            _m = _re.search(r':(\d+)', _url)
                            _port = int(_m.group(1)) if _m else 8081
                    except Exception:
                        pass
                    if not _port:
                        _port = 8081  # draft default (registry empty or failed)
                import requests as _req
                resp = _req.post(
                    f'http://127.0.0.1:{_port}/v1/chat/completions',
                    json={
                        'model': 'llama',
                        'messages': [{'role': 'user', 'content': prompt}],
                        'max_tokens': 500,
                        'temperature': 0.7,
                    },
                    timeout=15,
                )
                if resp.status_code == 200:
                    data = resp.json()
                    if 'choices' in data:
                        return data['choices'][0]['message']['content']
                    elif 'error' in data:
                        logger.debug(f"Draft model error: {data['error']}")
            except Exception as e:
                logger.debug(f"Direct draft dispatch failed ({model.model_id}): {e}")
            return ''

        # HTTP mode: external HARTOS server
        import requests as req
        base_url = os.environ.get('HEVOLVE_BASE_URL', f'http://localhost:{get_port("backend")}')
        try:
            resp = req.post(f'{base_url}/chat', json=payload, timeout=30)
            if resp.status_code == 200:
                return resp.json().get('response', '')
        except req.RequestException as e:
            logger.debug(f"Model dispatch failed ({model.model_id}): {e}")
        return ''
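The port-resolution fallback above pulls the port number out of a base URL with a regex. As a standalone sketch (the helper name `port_from_url` is illustrative):

```python
import re

def port_from_url(url, default=8081):
    """Extract the port from a base URL like http://127.0.0.1:8081/v1.

    The first ':' followed by digits is the port; the scheme's ':' is
    skipped because '//' is not a digit. Falls back to `default` when
    the URL is empty or carries no explicit port.
    """
    m = re.search(r':(\d+)', url or '')
    return int(m.group(1)) if m else default
```

This mirrors the `_re.search(r':(\d+)', _url)` call in the bundled branch, with the same 8081 draft default when nothing matches.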
    def _deliver_expert_response(self, user_id: str, prompt_id: str,
                                 speculation_id: str, response: str):
        """Dual-channel async delivery: Crossbar chat topic + TTS pupit topic.

        Worker-thread safe — uses ``core.safe_hartos_attr`` to read
        hart_intelligence symbols without triggering Python's per-module
        import lock (worker threads racing the canonical loader on the
        langchain_core / transformers import chain caused multi-minute
        agent_daemon freezes; resolving via sys.modules avoids the lock).
        """
        from core.safe_hartos_attr import safe_hartos_attr
        from core.peer_link.message_bus import chat_topic_for

        # 1. Publish text via canonical publish_async (MessageBus → Crossbar)
        try:
            publish_async = safe_hartos_attr('publish_async')
            if publish_async is not None:
                topic = chat_topic_for(user_id)
                publish_async(topic, response)
                logger.info(
                    "Expert chat publish: spec=%s user=%s topic=%s len=%d",
                    speculation_id, user_id, topic, len(response or ''),
                )
            else:
                logger.info(
                    "Expert chat publish skipped: spec=%s user=%s — "
                    "HARTOS publish_async not yet resolvable (loader still "
                    "initialising). Drop the speculative bubble; the main "
                    "reply path will deliver when ready.",
                    speculation_id, user_id,
                )
        except Exception as e:
            logger.warning(
                "Expert chat publish failed: spec=%s user=%s err=%s",
                speculation_id, user_id, e,
            )

        # 2. Synthesize TTS and publish to the pupit audio topic — ensures
        # speculative expert improvements get the SAME audio treatment as
        # regular replies (users on TTS-enabled sessions hear the improved
        # response).
        try:
            _tts_synthesize_and_publish = safe_hartos_attr(
                '_tts_synthesize_and_publish')
            if _tts_synthesize_and_publish is not None:
                _tts_synthesize_and_publish(
                    response, str(user_id), speculation_id)
                logger.info(
                    "Expert TTS publish: spec=%s user=%s",
                    speculation_id, user_id,
                )
            else:
                logger.info(
                    "Expert TTS publish skipped: spec=%s user=%s — "
                    "HARTOS _tts_synthesize_and_publish not yet resolvable.",
                    speculation_id, user_id,
                )
        except Exception as e:
            logger.debug(f"Expert TTS publish failed: spec={speculation_id} err={e}")

        logger.info(f"Expert enhancement delivered: spec={speculation_id}, "
                    f"user={user_id}")
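The docstring describes resolving HARTOS symbols through `sys.modules` so worker threads never take Python's per-module import lock. A minimal sketch of that pattern (the real `safe_hartos_attr` implementation may differ; `safe_attr` is an illustrative name):

```python
import sys

def safe_attr(module_name, attr):
    """Resolve `attr` from `module_name` only if that module is ALREADY
    imported. Never triggers an import, so a worker thread cannot block
    on the per-module import lock while the canonical loader is mid-way
    through a heavy import chain. Returns None when unresolvable."""
    mod = sys.modules.get(module_name)
    return getattr(mod, attr, None) if mod is not None else None
```

Callers then branch on `None`, exactly as `_deliver_expert_response` does: skip the publish and log, rather than wait on an import that may be minutes away from completing.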
    def _check_and_reserve_budget(self, user_id: str, goal_id: str,
                                  expert_model) -> bool:
        """Check Spark budget before expert execution (atomic row lock).

        Delegates to shared budget_gate.check_goal_budget() to avoid duplication.
        """
        if not goal_id:
            return True  # No goal = no budget constraint

        try:
            from .budget_gate import check_goal_budget
            cost = expert_model.cost_per_1k_tokens
            allowed, remaining, reason = check_goal_budget(goal_id, cost)
            return allowed
        except ImportError:
            return True  # Allow if budget system unavailable
    def _record_compute_contribution(self, node_id: str, model_id: str,
                                     latency_ms: float):
        """Credit hive node for serving fast response → ad revenue eligibility.

        GUARDRAIL: Only master_key_verified nodes get credit.
        GUARDRAIL: ComputeDemocracy.adjusted_reward() — logarithmic, not linear.
        """
        if not node_id:
            return
        try:
            from integrations.social.models import get_db, PeerNode
            db = get_db()
            try:
                peer = db.query(PeerNode).filter_by(node_id=node_id).first()
                if peer and peer.master_key_verified:
                    peer.agent_count = (peer.agent_count or 0) + 1
                    db.commit()
            finally:
                db.close()
        except Exception as e:
            logger.debug(f"Compute contribution recording skipped: {e}")
    def _evict_old_results(self):
        """Evict oldest results when over capacity. Must be called under self._lock."""
        if len(self._results) > self._results_max:
            # Remove oldest entries (dict preserves insertion order in Python 3.7+)
            excess = len(self._results) - self._results_max
            for key in list(self._results.keys())[:excess]:
                del self._results[key]
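The eviction relies on dict insertion order: the first keys are the oldest results. A standalone sketch of the same policy (the free function `evict_oldest` is illustrative; the method above operates on `self._results` under `self._lock`):

```python
def evict_oldest(results, max_size):
    """Trim `results` back to `max_size` entries by deleting the oldest.

    Python dicts preserve insertion order (guaranteed since 3.7), so the
    first keys are the oldest entries; delete exactly the excess. The
    max(..., 0) clamp makes the call a no-op when already under capacity.
    """
    excess = len(results) - max_size
    for key in list(results.keys())[:max(excess, 0)]:
        del results[key]
```

Materializing `list(results.keys())` before deleting avoids mutating the dict while iterating its live key view.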
    # ─── Status / results ───

    def get_speculation_status(self, speculation_id: str) -> dict:
        """Get status of a speculative dispatch."""
        with self._lock:
            if speculation_id in self._active:
                return {'status': 'pending', 'speculation_id': speculation_id}
            if speculation_id in self._results:
                result = self._results[speculation_id]
                return {'status': 'completed', **result}
            return {'status': 'unknown', 'speculation_id': speculation_id}

    def get_stats(self) -> dict:
        """Get dispatcher statistics."""
        with self._lock:
            return {
                'active_speculations': len(self._active),
                'completed': len(self._results),
                'total_energy_kwh_24h': round(
                    self._registry.get_total_energy_kwh(24), 4),
            }
# ─── Module-level singleton ───
_dispatcher = None
_dispatcher_lock = threading.Lock()


def get_speculative_dispatcher() -> SpeculativeDispatcher:
    """Get or create the singleton SpeculativeDispatcher."""
    global _dispatcher
    if _dispatcher is None:
        with _dispatcher_lock:
            if _dispatcher is None:
                _dispatcher = SpeculativeDispatcher()
    return _dispatcher
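The accessor uses double-checked locking: a cheap unlocked read on the hot path, then a re-check under the lock so two racing threads cannot both construct. The shape generalizes to any lazily built singleton (this generic sketch uses a hypothetical `factory` parameter; the real accessor always builds a SpeculativeDispatcher):

```python
import threading

_instance = None
_lock = threading.Lock()

def get_singleton(factory=object):
    """Double-checked lazy singleton: fast path reads without the lock;
    the second `is None` check under the lock closes the race where two
    threads both saw None before either acquired the lock."""
    global _instance
    if _instance is None:
        with _lock:
            if _instance is None:
                _instance = factory()
    return _instance
```

Repeated calls, from any thread, return the same object once the first construction completes.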