# integrations/agent_engine/world_model_bridge.py
1"""
2Unified Agent Goal Engine - World Model Bridge
4Bridge between LLM-langchain orchestration and HevolveAI's embodied AI.
5Every agent interaction becomes training data for continuous learning.
6Skills distribute via gossip notification + local RALT ingestion.
8Dual-mode operation:
9 IN-PROCESS (flat/regional): Direct Python calls — zero HTTP overhead.
10 HevolveAI is pip-installed, learning functions called directly.
11 HTTP FALLBACK (central standalone): REST calls to remote HevolveAI.
12 Used when services run as separate processes on different ports.
14Bootstrap: HevolveAI uses llama.cpp (Qwen3-VL-2B, Q4_K_XL, ~1.5GB) locally.
15No external API key needed. Learning is local-first, distributed via RALT + WAMP.
17GUARDRAILS applied at every layer:
18- ConstitutionalFilter on experiences before storage
19- WorldModelSafetyBounds on RALT export (rate limit + witness)
20- ConstructiveFilter on skill packets (no destructive capabilities)
21"""
import atexit
import json
import logging
import os
import sys
import threading
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional

import requests

from core.http_pool import pooled_get, pooled_post

logger = logging.getLogger('hevolve_social')
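
# Configuration sketch (illustrative values; every variable below is read
# somewhere in this module, with defaults as the code defines them):
#
#     HEVOLVEAI_API_URL=http://localhost:8000   # HTTP-fallback target
#     HEVOLVE_NODE_TIER=flat                    # node tier label
#     HEVOLVE_WM_FLUSH_BATCH=50                 # experiences per flush
#     HEVOLVE_WM_FLUSH_TIMEOUT=15               # flush timeout (s)
#     HEVOLVE_WM_CORRECTION_TIMEOUT=30          # correction timeout (s)
#     HEVOLVE_WM_HTTP_TIMEOUT=10                # default HTTP timeout (s)
#     HEVOLVE_PARALLEL_SSM=1                    # 0 disables predict_student
#     NUNBA_BUNDLED=1                           # forces HTTP off when not in-process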

class WorldModelBridge:
    """Bridge between LLM-langchain orchestration and HevolveAI embodied AI.

    Dual-mode: in-process direct Python calls when HevolveAI is co-located
    (flat/regional), HTTP fallback when running as separate processes (central).

    Two-tier thinking model:
    - Agent-level: Hevolve dispatches coarse-grained goals (task delegation)
    - Tensor-level: HevolveAI fuses heterogeneous agent thoughts (HiveMind)
    This bridge connects the two layers.
    """

    def __init__(self):
        self._api_url = os.environ.get(
            'HEVOLVEAI_API_URL', 'http://localhost:8000')
        self._node_tier = os.environ.get('HEVOLVE_NODE_TIER', 'flat')
        self._experience_queue: deque = deque(maxlen=10000)
        self._flush_executor = ThreadPoolExecutor(
            max_workers=2, thread_name_prefix='wm_flush')
        atexit.register(lambda: self._flush_executor.shutdown(wait=False))
        self._flush_batch_size = int(os.environ.get(
            'HEVOLVE_WM_FLUSH_BATCH', '50'))
        self._lock = threading.Lock()
        self._stats = {
            'total_recorded': 0,
            'total_flushed': 0,
            'total_corrections': 0,
            'total_hivemind_queries': 0,
            'total_skills_distributed': 0,
            'total_skills_blocked': 0,
            'node_tier': self._node_tier,
        }

        # Configurable HTTP timeouts
        self._timeout_flush = int(os.environ.get('HEVOLVE_WM_FLUSH_TIMEOUT', '15'))
        self._timeout_correction = int(os.environ.get('HEVOLVE_WM_CORRECTION_TIMEOUT', '30'))
        self._timeout_default = int(os.environ.get('HEVOLVE_WM_HTTP_TIMEOUT', '10'))

        # Circuit breaker: reusable implementation from core
        from core.circuit_breaker import CircuitBreaker
        self._circuit_breaker = CircuitBreaker(
            name='world_model_bridge', threshold=5, cooldown=60.0)

        # In-process mode: direct Python calls (no HTTP overhead)
        self._provider = None  # LearningLLMProvider
        self._hive_mind = None  # HiveMind
        self._in_process = False
        self._in_process_retry_done = False
        self._http_disabled = False  # Set True when bundled + no server to talk to
        self._federation_aggregated = {}

        # Cloud consent: per-user gate for non-local HTTP data sharing.
        # "If anyways it needs to be sent to cloud for something then
        # let's get consent from user at runtime." — project steward
        self._consent_cache: Dict[str, tuple] = {}  # user_id → (bool, timestamp)
        self._consent_cache_ttl = 300  # 5 minutes

        # CCT (Compute Contribution Token) cache for learning access gating
        self._cct_cache: Optional[tuple] = None  # (cct_string, timestamp)
        self._cct_cache_ttl = 300  # 5 minutes
        self._node_id: Optional[str] = None

        self._init_in_process()

        # Disable HTTP when there's no server to talk to.
        # If in-process failed AND no explicit HEVOLVEAI_API_URL was configured
        # (just the default localhost:8000), there's no point spamming HTTP.
        # Bundled mode (NUNBA_BUNDLED) always disables — Nunba owns the lifecycle.
        # Non-bundled with default URL also disables — no server was started.
        if not self._in_process:
            _explicit_url = os.environ.get('HEVOLVEAI_API_URL', '')
            if os.environ.get('NUNBA_BUNDLED') == '1' or not _explicit_url:
                self._http_disabled = True
                logger.info(
                    "[WorldModelBridge] Learning not available in-process, "
                    "no explicit HEVOLVEAI_API_URL — HTTP disabled")

        # Periodic HevolveAI integrity watcher (Gap 1 fix)
        self._crawl_watcher = None
        self._start_crawl_integrity_watcher()

    def _start_crawl_integrity_watcher(self) -> None:
        """Start the periodic HevolveAI integrity watcher if in-process mode is active.

        No-op in HTTP mode (nothing to watch — HTTP doesn't depend on
        HevolveAI's local file state).
        """
        if not self._in_process:
            return
        try:
            from security.source_protection import CrawlIntegrityWatcher
            watcher = CrawlIntegrityWatcher()
            watcher.register_tamper_callback(self._on_crawl_tamper_detected)
            watcher.start()
            self._crawl_watcher = watcher
            logger.info("[WorldModelBridge] CrawlIntegrityWatcher started")
        except ImportError:
            pass  # source_protection not available
        except Exception as e:
            logger.warning(
                f"[WorldModelBridge] CrawlIntegrityWatcher failed: {e}")

    def _on_crawl_tamper_detected(self) -> None:
        """Callback: HevolveAI files changed post-boot.

        Disables in-process mode and falls back to HTTP.
        Does NOT halt the hive — that requires the master key.
        """
        logger.critical(
            "[WorldModelBridge] HevolveAI tampering detected — "
            "disabling in-process mode, falling back to HTTP")
        with self._lock:
            self._in_process = False
            self._provider = None
            self._hive_mind = None

    def _cb_is_open(self) -> bool:
        """Check if the circuit breaker is open (blocking requests)."""
        return self._circuit_breaker.is_open()

    def _cb_record_success(self):
        """Reset the circuit breaker on a successful call."""
        self._circuit_breaker.record_success()

    def _cb_record_failure(self):
        """Record a failure; the circuit opens at the threshold."""
        self._circuit_breaker.record_failure()

    def _is_external_target(self) -> bool:
        """Check if the API URL points to a non-local (cloud) endpoint."""
        url = self._api_url.lower()
        local_prefixes = (
            'http://localhost', 'http://127.0.0.1',
            'http://0.0.0.0', 'http://[::1]',
        )
        return not any(url.startswith(p) for p in local_prefixes)

    def _has_cloud_consent(self, user_id: str) -> bool:
        """Check if a user has consented to cloud data sharing.

        Consent is stored in User.settings['cloud_data_consent'].
        Cached for 5 minutes to avoid DB lookups on every experience.
        In-process mode (local) does NOT require consent — data stays local.
        """
        if not user_id:
            return False

        now = time.time()
        cached = self._consent_cache.get(user_id)
        if cached and now - cached[1] < self._consent_cache_ttl:
            return cached[0]

        consent = False
        try:
            from integrations.social.models import get_db, User
            db = get_db()
            try:
                user = db.query(User).filter_by(id=user_id).first()
                if user:
                    consent = bool(
                        (user.settings or {}).get('cloud_data_consent', False))
            finally:
                db.close()
        except Exception:
            pass

        self._consent_cache[user_id] = (consent, now)
        return consent

    # U3: Hive participation opt-out (reuses the consent cache pattern)
    def _has_hive_participation(self, user_id: str) -> bool:
        """Check if the user has hive participation enabled (default: True).

        Users can opt out via User.settings['hive_participation'] = False.
        Cached alongside cloud consent (same TTL).
        """
        if not user_id:
            return True  # Default to participating when there is no user context

        cache_key = f"hive_{user_id}"
        now = time.time()
        cached = self._consent_cache.get(cache_key)
        if cached and now - cached[1] < self._consent_cache_ttl:
            return cached[0]

        participate = True
        try:
            from integrations.social.models import get_db, User
            db = get_db()
            try:
                user = db.query(User).filter_by(id=user_id).first()
                if user:
                    participate = bool(
                        (user.settings or {}).get('hive_participation', True))
            finally:
                db.close()
        except Exception:
            pass

        self._consent_cache[cache_key] = (participate, now)
        return participate

    # ─── CCT (Compute Contribution Token) gating ──────────────────

    def _load_cached_cct(self) -> Optional[str]:
        """Load the CCT from file and cache it in memory (5 min TTL)."""
        now = time.time()
        if self._cct_cache and now - self._cct_cache[1] < self._cct_cache_ttl:
            return self._cct_cache[0]

        try:
            from .continual_learner_gate import ContinualLearnerGateService
            cct = ContinualLearnerGateService.load_cct_from_file()
            if cct:
                self._cct_cache = (cct, now)
            return cct
        except Exception:
            return None

    def _check_cct_access(self, capability: str) -> bool:
        """Check whether the node's CCT grants a specific capability. Zero DB calls."""
        try:
            from .continual_learner_gate import ContinualLearnerGateService
            cct = self._load_cached_cct()
            if not cct:
                return False
            return ContinualLearnerGateService.check_cct_capability(
                cct, capability, self._node_id)
        except Exception:
            return False  # Fail closed
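
    # Gating policy note (illustrative): callers below choose their own
    # degradation when _check_cct_access() returns False. query_hivemind
    # degrades gracefully to a cached thought; distribute_skill_packet and
    # ingest_skill_packet hard-block with a structured failure dict:
    #
    #     if not self._check_cct_access('hivemind_query'):
    #         ...  # serve _federation_aggregated['last_thought'] if present
    #     if not self._check_cct_access('skill_distribution'):
    #         ...  # return {'success': False, 'reason': 'no_cct_skill_distribution'}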

    def _init_in_process(self):
        """Try to connect to the in-process learning pipeline (zero HTTP overhead).

        When HevolveAI is pip-installed and _init_learning_pipeline() has run
        in hart_intelligence, we get direct references to the provider
        and hivemind instances. All subsequent calls bypass HTTP entirely.

        SECURITY: Integrity verification is required before enabling in-process
        mode. If HevolveAI files don't match the signed manifest, fall back to HTTP.
        """
        # Integrity gate: verify the HevolveAI installation before going in-process
        try:
            from security.source_protection import SourceProtectionService
            integrity = SourceProtectionService.verify_hevolveai_integrity()
            if not integrity.get('verified', False):
                mismatched = integrity.get('mismatched_files', [])
                if mismatched:
                    logger.warning(
                        f"[WorldModelBridge] HevolveAI integrity FAILED: "
                        f"{len(mismatched)} mismatched files — forcing HTTP mode")
                logger.info(
                    f"[WorldModelBridge] HTTP mode: {self._api_url}")
                return
        except ImportError:
            pass  # source_protection not available — skip the check
        except Exception as e:
            logger.debug(f"[WorldModelBridge] Integrity check skipped: {e}")

        # Worker threads MUST NOT trigger a `from hart_intelligence import …`
        # here. The hart_intelligence import chain (langchain, transformers,
        # autogen, multimodal stacks) can take 300+ seconds on first load,
        # and Python's per-module import lock is held for the entire
        # duration. When the watchdog declares the worker FROZEN at 300s
        # and "restarts" it, the original thread can't be killed (Python
        # has no thread.kill); it stays alive holding the lock. The new
        # thread runs the same `from hart_intelligence import …` and blocks
        # on the same lock. After ~10 cycles you have 10 zombie daemon
        # threads, zero goals dispatched, zero spark spent — exactly the
        # 2026-04-29 dashboard incident (daemon restart_count=9).
        #
        # Only consult `sys.modules` to see whether hart_intelligence was
        # already imported on the main thread (via the bootstrap pre-warm
        # path that owns the heavyweight import). If yes — use the
        # resolved module's accessors directly; no import lock is taken.
        # If no — fall through to HTTP mode without forcing a worker-thread
        # import; the bootstrap path will retry the in-process upgrade on
        # the next record_interaction once it finishes the pre-warm.
        mod = sys.modules.get('hart_intelligence')
        if mod is not None:
            try:
                _get_provider = getattr(mod, 'get_learning_provider', None)
                _get_hive = getattr(mod, 'get_hive_mind', None)
                if _get_provider is not None:
                    provider = _get_provider()
                    hive = _get_hive() if _get_hive is not None else None
                    if provider is not None:
                        self._provider = provider
                        self._hive_mind = hive
                        self._in_process = True
                        logger.info(
                            "[WorldModelBridge] In-process mode: direct Python calls")
                        return
            except Exception as e:
                logger.debug(
                    f"[WorldModelBridge] in-process probe failed: {e}")
        logger.info(
            f"[WorldModelBridge] HTTP mode: {self._api_url}")

    # ─── Record interactions (auto-learn) ────────────────────────────

    def record_interaction(self, user_id: str, prompt_id: str,
                           prompt: str, response: str,
                           model_id: str = None, latency_ms: float = 0,
                           node_id: str = None, goal_id: str = None,
                           attribution_chain: dict = None,
                           escalation_reason: str = None):
        """Record every agent interaction as training data for HevolveAI.

        Called after EVERY /chat response. Batches experiences and flushes
        them to HevolveAI (in-process or HTTP).
        HevolveAI auto-learns from every completion (3-priority queue:
        expert > reality > distillation).

        ``escalation_reason`` (optional) — when the speculative dispatcher
        promoted this turn from the draft to the expert path, the canonical
        reason value from
        ``integrations.agent_engine.escalation_reasons.EscalationReason``.
        Stored on the experience so HevolveAI's distillation can weight
        refusal-overridden / parse-failure traces differently from
        clean classifier-delegate traces. ``None`` when the draft
        answered as final (no escalation).

        GUARDRAIL: ConstitutionalFilter screens before storage.
        """
        # Lazy in-process re-check: HevolveAI may have initialized after our __init__
        if not self._in_process and not self._in_process_retry_done:
            self._in_process_retry_done = True
            self._init_in_process()

        try:
            from security.hive_guardrails import ConstitutionalFilter
            passed, _ = ConstitutionalFilter.check_prompt(response)
            if not passed:
                return
        except ImportError:
            pass

        experience = {
            'prompt': prompt[:2000],
            'response': response[:5000],
            'model_id': model_id or 'unknown',
            'latency_ms': latency_ms,
            'user_id': str(user_id),
            'prompt_id': str(prompt_id),
            'node_id': node_id,
            'goal_id': goal_id,
            'timestamp': time.time(),
            'source': 'langchain_orchestration',
        }
        # Structured attribution chain — the preferred carrier for
        # agent_attribution's step/observation/credit/parent_action_id
        # payload. Previously it was packed into the 'prompt' field
        # as stringified JSON, which confused HevolveAI's prompt-text
        # distillation path. The legacy path stays intact for callers
        # that haven't migrated — they pass nothing, and HevolveAI
        # receives only the primary experience fields.
        if attribution_chain is not None:
            experience['attribution_chain'] = attribution_chain
        if escalation_reason:
            experience['escalation_reason'] = escalation_reason

        # PRIVACY: Redact secrets + anonymize the user before shared ingestion.
        # The hive must NEVER leak secrets from one user to another.
        try:
            from security.secret_redactor import redact_experience
            experience = redact_experience(experience)
        except ImportError:
            pass

        self._experience_queue.append(experience)
        with self._lock:
            self._stats['total_recorded'] += 1

        # Durable local write: append the user↔assistant pair to
        # ConversationEntry so FULL_HISTORY and the channel unified
        # inbox can query it by time range. Previously record_interaction
        # only batched to the in-memory _experience_queue + async flush
        # to HevolveAI, so if HevolveAI was offline the row was lost on
        # process exit. Now chat history survives restarts even without
        # a hive connection. Failures here must not break the learning
        # path — wrap in try/except and log at debug level.
        try:
            self._persist_to_conversation_entry(
                user_id=str(user_id), prompt_id=str(prompt_id),
                prompt=prompt, response=response,
                model_id=model_id or 'unknown',
            )
        except Exception as e:
            logger.debug(f"ConversationEntry durable write skipped: {e}")

        if len(self._experience_queue) >= self._flush_batch_size:
            batch = []
            while self._experience_queue and len(batch) < self._flush_batch_size:
                try:
                    batch.append(self._experience_queue.popleft())
                except IndexError:
                    break
            if batch:
                self._flush_executor.submit(self._flush_to_world_model, batch)
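
    # Usage sketch (illustrative; `bridge` is a WorldModelBridge instance
    # and the argument values are made up):
    #
    #     bridge.record_interaction(
    #         user_id='u-42', prompt_id='p-1001',
    #         prompt='hello', response='hi there',
    #         model_id='qwen3-vl-2b', latency_ms=182.0)
    #     # Experiences batch in memory; a background flush fires once
    #     # HEVOLVE_WM_FLUSH_BATCH (default 50) of them accumulate.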

    def _persist_to_conversation_entry(
        self, user_id: str, prompt_id: str,
        prompt: str, response: str, model_id: str,
    ) -> None:
        """Write the user prompt + assistant response to ConversationEntry
        as two rows so FULL_HISTORY can BETWEEN-query them by created_at.

        Keeps the single write path that integrations.channels.response.router
        uses (same model, same field contract), so existing tooling that
        already reads ConversationEntry (unified inbox, time-based history)
        sees chat-route interactions too. agent_id is left empty — that
        maps to the default chat flow; channel-inbound flows still set it.
        """
        try:
            from integrations.social.models import get_db, ConversationEntry
        except ImportError:
            return
        db = None
        try:
            db = get_db()
            for role, content in (('user', prompt), ('assistant', response)):
                if not content:
                    continue
                entry = ConversationEntry(
                    user_id=user_id,
                    channel_type='chat',
                    role=role,
                    content=content[:10000],
                    agent_id=None,
                    prompt_id=prompt_id or None,
                )
                db.add(entry)
            db.commit()
        except Exception:
            # Roll back so the session is clean, then re-raise; the
            # caller's try/except downgrades the failure to a debug log.
            if db is not None:
                try:
                    db.rollback()
                except Exception:
                    pass
            raise
        finally:
            if db is not None:
                try:
                    db.close()
                except Exception:
                    pass

    def _flush_to_world_model(self, batch: list):
        """Flush an experience batch to HevolveAI's learning provider.

        In-process mode: calls provider.create_chat_completion() directly.
        HTTP mode: POST /v1/chat/completions (OpenAI format).
        """
        if self._in_process and self._provider:
            for exp in batch:
                try:
                    messages = [
                        {
                            'role': 'system',
                            'content': json.dumps({
                                'source': exp.get('source',
                                                  'langchain_orchestration'),
                                'user_id': exp.get('user_id'),
                                'prompt_id': exp.get('prompt_id'),
                                'goal_id': exp.get('goal_id'),
                                'model_id': exp.get('model_id'),
                                'latency_ms': exp.get('latency_ms'),
                                'node_id': exp.get('node_id'),
                            }),
                        },
                        {'role': 'user', 'content': exp['prompt']},
                        {'role': 'assistant', 'content': exp['response']},
                    ]
                    self._provider.create_chat_completion(
                        messages=messages,
                        model='hevolve-interaction-replay',
                        temperature=0,
                        max_tokens=1,
                    )
                    with self._lock:
                        self._stats['total_flushed'] += 1
                except Exception as e:
                    logger.debug(f"In-process flush error: {e}")
            return

        # HTTP fallback (central standalone, or HevolveAI not in-process)
        if self._http_disabled or self._cb_is_open():
            return

        # CONSENT GATE: if the target is external (cloud), filter to consented users only.
        # Local endpoints (localhost) don't require consent — data stays on-device.
        if self._is_external_target():
            original_count = len(batch)
            batch = [
                exp for exp in batch
                if self._has_cloud_consent(exp.get('user_id', ''))
            ]
            skipped = original_count - len(batch)
            if skipped > 0:
                logger.info(
                    f"[WorldModelBridge] Skipped {skipped}/{original_count} "
                    f"experiences — no cloud consent")
            if not batch:
                return

        for exp in batch:
            try:
                body = {
                    'model': 'hevolve-interaction-replay',
                    'messages': [
                        {
                            'role': 'system',
                            'content': json.dumps({
                                'source': exp.get('source',
                                                  'langchain_orchestration'),
                                'user_id': exp.get('user_id'),
                                'prompt_id': exp.get('prompt_id'),
                                'goal_id': exp.get('goal_id'),
                                'model_id': exp.get('model_id'),
                                'latency_ms': exp.get('latency_ms'),
                                'node_id': exp.get('node_id'),
                            }),
                        },
                        {'role': 'user', 'content': exp['prompt']},
                        {'role': 'assistant', 'content': exp['response']},
                    ],
                    'temperature': 0,
                    'max_tokens': 1,
                }
                pooled_post(
                    f'{self._api_url}/v1/chat/completions',
                    json=body,
                    timeout=self._timeout_flush,
                )
                self._cb_record_success()
                with self._lock:
                    self._stats['total_flushed'] += 1
            except requests.RequestException:
                self._cb_record_failure()
            except Exception as e:
                self._cb_record_failure()
                logger.debug(f"World model flush error: {e}")

    # ─── Expert corrections (RL-EF) ─────────────────────────────────

    def submit_correction(self, original_response: str,
                          corrected_response: str,
                          expert_id: str = 'hevolve_user',
                          confidence: float = 1.0,
                          explanation: str = None,
                          context: dict = None,
                          valid_until: str = None) -> dict:
        """Submit an expert correction to HevolveAI's RL-EF system.

        In-process: calls send_expert_correction() directly.
        HTTP: POST /v1/corrections.

        HevolveAI routes to:
        - Kernel Continual Learner (instant, no gradient) for factual corrections
        - Orthogonal LoRA (gradient-based, forgetting-safe) for conceptual ones

        Returns a dict with the success status.
        """
        try:
            from security.hive_guardrails import ConstitutionalFilter
            passed, reason = ConstitutionalFilter.check_prompt(corrected_response)
            if not passed:
                return {'success': False, 'reason': reason}
        except ImportError:
            pass

        # PRIVACY: Redact secrets from corrections before shared learning
        try:
            from security.secret_redactor import redact_secrets
            original_response, _ = redact_secrets(original_response)
            corrected_response, _ = redact_secrets(corrected_response)
            if explanation:
                explanation, _ = redact_secrets(explanation)
        except ImportError:
            pass

        # In-process direct call
        if self._in_process and self._provider:
            try:
                from hevolveai.embodied_ai.rl_ef import send_expert_correction
                result = send_expert_correction(
                    domain='general',
                    original_response=original_response[:5000],
                    corrected_response=corrected_response[:5000],
                    expert_id=expert_id,
                    confidence=max(0.0, min(1.0, confidence)),
                    explanation=explanation[:2000] if explanation else None,
                    valid_until=valid_until,
                )
                with self._lock:
                    self._stats['total_corrections'] += 1
                return result if isinstance(result, dict) else {'success': True}
            except Exception as e:
                logger.debug(f"In-process correction failed: {e}")

        # CONSENT GATE: external HTTP requires consent
        if self._is_external_target():
            user_id = (context or {}).get('user_id', expert_id)
            if not self._has_cloud_consent(user_id):
                return {'success': False, 'reason': 'Cloud data consent required'}

        # HTTP fallback
        if self._http_disabled:
            return {'success': False, 'reason': 'Learning not available (bundled mode)'}
        body = {
            'original_response': original_response[:5000],
            'corrected_response': corrected_response[:5000],
            'expert_id': expert_id,
            'confidence': max(0.0, min(1.0, confidence)),
        }
        if explanation:
            body['explanation'] = explanation[:2000]
        if context:
            body['context'] = context
        if valid_until:
            body['valid_until'] = valid_until

        if self._cb_is_open():
            return {'success': False, 'reason': 'Circuit breaker open'}

        try:
            resp = pooled_post(
                f'{self._api_url}/v1/corrections',
                json=body,
                timeout=self._timeout_correction,
            )
            self._cb_record_success()
            if resp.status_code == 200:
                with self._lock:
                    self._stats['total_corrections'] += 1
                return resp.json()
            return {'success': False, 'reason': f'HTTP {resp.status_code}'}
        except requests.RequestException as e:
            self._cb_record_failure()
            logger.debug(f"Correction submission failed: {e}")
            return {'success': False, 'reason': str(e)}
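
    # Usage sketch (illustrative values):
    #
    #     result = bridge.submit_correction(
    #         original_response='The capital of Australia is Sydney.',
    #         corrected_response='The capital of Australia is Canberra.',
    #         confidence=0.9,
    #         explanation='Factual fix; per the docstring above this routes '
    #                     'to the Kernel Continual Learner.')
    #     if not result.get('success'):
    #         print(result.get('reason'))  # e.g. 'Circuit breaker open'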

    # ─── HiveMind collective thinking ────────────────────────────────

    def register_peer_agent(
        self,
        peer_id: str,
        agent_type: str = 'remote_peer',
        latent_dim: int = 2048,
        capabilities: Optional[list] = None,
    ) -> bool:
        """Best-effort registration of a newly-linked peer with the in-
        process HiveMind so `fuse_thoughts` / `think_together_distributed`
        can include it in MoE consensus.

        Called from `core.peer_link.link_manager.upgrade_peer` right
        after a successful `link.connect()`. Safe to invoke in any
        topology:

        - In-process HiveMind loaded → real registration.
        - In-process HiveMind missing (no hevolveai, HTTP-only central
          tier) → logs at debug and returns False.
        - Any exception inside HiveMind → swallowed at debug; the link
          upgrade must never fail because the hive can't register.

        Returns ``True`` iff the peer was accepted into the HiveMind
        agent registry. Callers generally ignore the return value —
        this is a side-effect hook, not a prerequisite.
        """
        if not (self._in_process and self._hive_mind):
            logger.debug(
                f"[register_peer_agent] no in-process HiveMind; "
                f"skipping peer {peer_id[:8] if peer_id else '?'}"
            )
            return False
        try:
            # Resolve AgentCapability lazily — hevolveai might expose a
            # different enum shape across versions; fall back to string
            # capabilities when the enum isn't importable.
            caps: list
            try:
                from hevolveai.embodied_ai.learning.hive_mind import (
                    AgentCapability,
                )
                default_caps = [
                    AgentCapability.ENCODE,
                    AgentCapability.REASON,
                ]
                caps = list(capabilities) if capabilities else default_caps
            except Exception:
                caps = list(capabilities) if capabilities else [
                    'encode', 'reason',
                ]
            self._hive_mind.register_agent(
                agent_id=peer_id,
                agent_type=agent_type,
                latent_dim=latent_dim,
                capabilities=caps,
            )
            logger.info(
                f"[register_peer_agent] peer={peer_id[:8]} "
                f"type={agent_type} dim={latent_dim} registered"
            )
            return True
        except Exception as e:
            logger.debug(
                f"[register_peer_agent] peer={peer_id[:8] if peer_id else '?'} "
                f"skipped: {e}"
            )
            return False

    # ─── G12: parallel SSM student inference ─────────────────────────

    def predict_student(
        self,
        prompt: str,
        messages: Optional[List[Dict]] = None,
        temperature: float = 0.7,
        max_tokens: int = 256,
        timeout_s: float = 2.0,
    ) -> Optional[Dict]:
        """Run the HevolveAI SSM student forward pass in-process.

        G12 — closes the gap where HARTOS' CustomGPT posts directly to
        llama.cpp and never involves the SSM student. When this is
        available, callers can fire the student in parallel with the
        llama.cpp teacher and feed the (teacher, student) pair into the
        distillation engine for continuous learning.

        Returns:
            dict with keys {'response', 'action_tensor', 'epistemic_data'}
            on success, or None if:
            - In-process mode is not active
            - The provider doesn't expose _generate_response
            - The student forward pass failed / timed out

        Design notes:
        * Synchronous call; use an executor at the call site if you
          want to overlap it with the teacher HTTP request (see the
          sketch after record_teacher_student_pair below).
        * Set HEVOLVE_PARALLEL_SSM=0 to disable per deployment.
        * Never raises — the teacher path must not be blocked by a
          failing student.
        """
        if os.environ.get('HEVOLVE_PARALLEL_SSM', '1') == '0':
            return None
        if not (self._in_process and self._provider):
            return None
        gen = getattr(self._provider, '_generate_response', None)
        if gen is None or not callable(gen):
            return None

        if messages is None:
            messages = [{'role': 'user', 'content': prompt}]
        try:
            # _generate_response returns (response_text, action_tensor,
            # epistemic_data). Signature verified against
            # learning_llm_provider.py:1511.
            res = gen(prompt, messages, temperature,
                      cached_vision_features=None)
        except Exception as e:
            logger.debug(f"[predict_student] SSM forward failed: {e}")
            return None

        try:
            if isinstance(res, tuple) and len(res) >= 3:
                response_text, action_tensor, epistemic_data = res[:3]
            elif isinstance(res, tuple) and len(res) == 2:
                response_text, action_tensor = res
                epistemic_data = None
            else:
                response_text, action_tensor, epistemic_data = (
                    str(res), None, None)
            with self._lock:
                self._stats.setdefault('student_inferences', 0)
                self._stats['student_inferences'] += 1
            return {
                'response': response_text,
                'action_tensor': action_tensor,
                'epistemic_data': epistemic_data,
            }
        except Exception as e:
            logger.debug(f"[predict_student] result unpack failed: {e}")
            return None

    def record_teacher_student_pair(
        self,
        prompt: str,
        teacher_response: str,
        student_response: str,
        student_action=None,
    ) -> bool:
        """Enqueue a (teacher, student) pair for Qwen distillation.

        Feeds the HevolveAI DistillationEngine the same way the in-process
        parallel path does (learning_llm_provider.py:1551-1558). Fire-and-
        forget; returns True iff the enqueue succeeded.
        """
        if not (self._in_process and self._provider):
            return False
        engine = getattr(self._provider, 'distillation_engine', None)
        if engine is None:
            return False
        if not student_response or not teacher_response:
            return False
        if student_response == teacher_response:
            # No correction needed — skip.
            return False
        try:
            engine.enqueue(
                query=prompt,
                teacher_response=teacher_response,
                student_response=student_response,
                student_action=student_action,
            )
            with self._lock:
                self._stats.setdefault('distillation_pairs', 0)
                self._stats['distillation_pairs'] += 1
            return True
        except Exception as e:
            logger.debug(f"[distillation] enqueue failed: {e}")
            return False
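
    # Overlap sketch (illustrative; `post_teacher` is a hypothetical caller-
    # side helper that performs the blocking llama.cpp teacher HTTP request):
    #
    #     from concurrent.futures import ThreadPoolExecutor
    #     with ThreadPoolExecutor(max_workers=1) as pool:
    #         student_future = pool.submit(bridge.predict_student, prompt)
    #         teacher_text = post_teacher(prompt)           # blocking HTTP call
    #         student = student_future.result(timeout=2.0)  # mirrors timeout_s
    #     if student:
    #         bridge.record_teacher_student_pair(
    #             prompt, teacher_text, student['response'],
    #             student_action=student.get('action_tensor'))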

    def query_hivemind(self, query_text: str,
                       timeout_ms: int = 1000,
                       user_id: str = None) -> Optional[dict]:
        """Query HevolveAI's HiveMind for distributed collective thinking.

        In-process: calls hive_mind.think_together_distributed() directly.
        HTTP: POST /v1/hivemind/think.

        HevolveAI:
        1. Encodes the query via the frozen Qwen alignment layer (2048-D)
        2. Publishes the local thought to WAMP
        3. Waits for remote agent responses (timeout_ms)
        4. Fuses with the attention-weighted method
        5. Returns the collective thought with contributing agents + weights

        Use this for real-time multi-modal reasoning (tensor-level fusion).
        For coarse-grained task delegation, use agent-level dispatch instead.

        CCT gating: requires the 'hivemind_query' capability. Without a
        valid CCT, returns a cached/stale response (graceful degradation,
        not a hard block).
        """
        # U3: Check the hive participation setting
        if user_id and not self._has_hive_participation(user_id):
            logger.debug(f"HiveMind query skipped: user {user_id} opted out of hive")
            return None

        # CCT access gate (graceful: degrade to cached, don't block)
        if not self._check_cct_access('hivemind_query'):
            logger.debug("HiveMind query: no CCT with hivemind_query capability")
            cached = self._federation_aggregated.get('last_thought')
            if cached:
                return {'thought': cached, 'source': 'cached', 'cct_gated': True}
            return None

        try:
            from security.hive_guardrails import ConstitutionalFilter
            passed, _ = ConstitutionalFilter.check_prompt(query_text)
            if not passed:
                return None
        except ImportError:
            pass

        # In-process direct call
        if self._in_process and self._hive_mind:
            try:
                import torch
                # Encode the query text as a thought tensor.
                # Prefer the provider's encoder; if unavailable, fall back to
                # a deterministic hash-derived vector so repeated queries of
                # the same text yield the same latent (vs torch.randn, which
                # would make every call dissimilar to itself).
                # Mirrors the fallback in api_server.py's /hivethink handler.
                thought = None
                if (self._provider and
                        hasattr(self._provider, 'embodied_agent') and
                        self._provider.embodied_agent and
                        hasattr(self._provider.embodied_agent, 'encoder')):
                    try:
                        encoder = self._provider.embodied_agent.encoder
                        if hasattr(encoder, 'encode_text'):
                            encoded = encoder.encode_text(query_text)
                            # encode_text may return a tensor or (tensor, info)
                            if isinstance(encoded, tuple):
                                encoded = encoded[0]
                            flat = encoded.detach().cpu().flatten()
                            if flat.shape[0] > 2048:
                                flat = flat[:2048]
                            elif flat.shape[0] < 2048:
                                flat = torch.nn.functional.pad(
                                    flat, (0, 2048 - flat.shape[0])
                                )
                            thought = torch.nn.functional.normalize(
                                flat, dim=0
                            ).unsqueeze(0)
                    except Exception as enc_err:
                        logger.debug(
                            f"[Bridge] Encoder fallback for hivemind_query: {enc_err}"
                        )
                        thought = None

                if thought is None:
                    # Deterministic hash -> 2048-dim bit-expanded vector.
                    # The same input yields the same vector (unlike torch.randn).
                    query_hash = hash(query_text) % (2 ** 31)
                    vec = torch.zeros(2048, device='cpu')
                    for i in range(2048):
                        vec[i] = ((query_hash >> (i % 31)) & 1) * 0.02 - 0.01
                    thought = torch.nn.functional.normalize(vec, dim=0).unsqueeze(0)

                result = self._hive_mind.think_together_distributed(
                    local_thought=thought,
                    local_agent_id=getattr(
                        self._hive_mind, '_local_agent_id', 'local'),
                    timeout_ms=timeout_ms,
                    **({"owner_id": user_id} if user_id else {}),
                )
                with self._lock:
                    self._stats['total_hivemind_queries'] += 1
                return {
                    'thought': result.text if hasattr(result, 'text')
                    else str(result)
                }
            except Exception as e:
                logger.debug(f"In-process hivemind failed: {e}")

        # PRIVACY: Redact secrets from the query before sending to the shared hivemind
        try:
            from security.secret_redactor import redact_secrets
            query_text, _ = redact_secrets(query_text)
        except ImportError:
            pass

        # CONSENT GATE: external HTTP requires consent
        if self._is_external_target() and user_id:
            if not self._has_cloud_consent(user_id):
                return None

        # PeerLink path — collect thoughts from connected peers directly
        try:
            from core.peer_link.link_manager import get_link_manager
            mgr = get_link_manager()
            responses = mgr.collect('hivemind', timeout_ms=timeout_ms)
            if responses:
                with self._lock:
                    self._stats['total_hivemind_queries'] += 1
                # Each peer response is a legitimate (query -> answer) pair.
                # Feed them as training experiences so the local agent
                # learns from cross-peer knowledge, not just a transient
                # query-response display. Deduplicated by peer_id +
                # response hash in record_interaction's batcher.
                try:
                    for idx, peer_resp in enumerate(responses):
                        if not isinstance(peer_resp, dict):
                            continue
                        peer_id = (peer_resp.get('peer_id')
                                   or peer_resp.get('node_id')
                                   or f'peer_{idx}')
                        peer_text = (peer_resp.get('thought')
                                     or peer_resp.get('response')
                                     or peer_resp.get('text'))
                        if not peer_text:
                            continue
                        self.record_interaction(
                            user_id=user_id or 'hive',
                            prompt_id=f'peerlink_{peer_id}',
                            prompt=query_text[:2000],
                            response=str(peer_text)[:5000],
                            model_id=f'peerlink:{peer_id}',
                            latency_ms=float(timeout_ms),
                            node_id=peer_id,
                        )
                    with self._lock:
                        self._stats.setdefault('peerlink_responses_trained', 0)
                        self._stats['peerlink_responses_trained'] += len(responses)
                except Exception as train_err:
                    logger.debug(
                        f"[Bridge] Failed to feed PeerLink responses to training: {train_err}"
                    )
                return {
                    'thoughts': responses,
                    'source': 'peerlink',
                    'peer_count': len(responses),
                }
        except Exception:
            pass

        # HTTP fallback
        if self._http_disabled or self._cb_is_open():
            return None

        try:
            resp = pooled_post(
                f'{self._api_url}/v1/hivemind/think',
                json={'query': query_text[:2000], 'timeout_ms': timeout_ms},
                timeout=max(30, timeout_ms / 1000 + 5),
            )
            self._cb_record_success()
            if resp.status_code == 200:
                with self._lock:
                    self._stats['total_hivemind_queries'] += 1
                return resp.json()
        except requests.RequestException as e:
            self._cb_record_failure()
            logger.debug(f"HiveMind query failed: {e}")
        return None
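
    # Usage sketch (illustrative): the return shape depends on which path
    # answered, so callers branch on 'source' / the keys present. The HTTP
    # path returns the server's JSON as-is, so its keys are an assumption:
    #
    #     result = bridge.query_hivemind('describe the scene',
    #                                    timeout_ms=1000, user_id='u-42')
    #     if result is None:
    #         pass                                # opted out, gated, or offline
    #     elif result.get('source') == 'peerlink':
    #         thoughts = result['thoughts']       # raw per-peer responses
    #     else:
    #         collective = result.get('thought')  # fused, cached, or server JSON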

    # ─── Learning stats ──────────────────────────────────────────────

    def get_learning_stats(self) -> dict:
        """Get merged learning + hivemind + bridge stats.

        In-process: calls provider.get_stats() + hive_mind.get_stats().
        HTTP: GET /v1/stats + GET /v1/hivemind/stats.
        Returns a combined dict for dashboard consumption.
        """
        result = {'learning': {}, 'hivemind': {}, 'bridge': self.get_stats()}

        if self._in_process:
            if self._provider:
                try:
                    result['learning'] = self._provider.get_stats()
                except Exception:
                    pass
            if self._hive_mind:
                try:
                    result['hivemind'] = self._hive_mind.get_stats()
                except Exception:
                    pass
            return result

        # HTTP fallback — guarded by the circuit breaker to avoid retry storms
        if self._http_disabled or self._circuit_breaker.is_open():
            return result
        try:
            resp = pooled_get(
                f'{self._api_url}/v1/stats', timeout=self._timeout_default)
            if resp.status_code == 200:
                result['learning'] = resp.json()
                self._circuit_breaker.record_success()
            else:
                self._circuit_breaker.record_failure()
        except requests.RequestException:
            self._circuit_breaker.record_failure()

        try:
            resp = pooled_get(
                f'{self._api_url}/v1/hivemind/stats', timeout=self._timeout_default)
            if resp.status_code == 200:
                result['hivemind'] = resp.json()
                self._circuit_breaker.record_success()
            else:
                self._circuit_breaker.record_failure()
        except requests.RequestException:
            self._circuit_breaker.record_failure()

        return result

    def get_hivemind_agents(self) -> list:
        """Get the list of connected HiveMind agents.

        In-process: calls hive_mind.get_all_agents().
        HTTP: GET /v1/hivemind/agents.
        Returns agent specs with capabilities, modality, latent dimensions.
        """
        if self._in_process and self._hive_mind:
            try:
                return self._hive_mind.get_all_agents()
            except Exception:
                pass

        # HTTP fallback — guarded by the circuit breaker
        if self._http_disabled or self._circuit_breaker.is_open():
            return []
        try:
            resp = pooled_get(
                f'{self._api_url}/v1/hivemind/agents', timeout=self._timeout_default)
            if resp.status_code == 200:
                data = resp.json()
                self._circuit_breaker.record_success()
                return data.get('agents', data) if isinstance(data, dict) else data
            self._circuit_breaker.record_failure()
        except requests.RequestException:
            self._circuit_breaker.record_failure()
        return []

    # ─── RALT skill distribution ─────────────────────────────────────

    def distribute_skill_packet(self, ralt_packet: dict,
                                node_id: str,
                                target_nodes: list = None) -> dict:
        """Distribute a learned skill (RALT packet) across hive nodes.

        Gossip broadcasts a notification that a skill is available.
        Each receiving peer triggers RALT ingestion through its own
        local HevolveAI instance.

        GUARDRAILS:
        1. CCT gate — requires the 'skill_distribution' capability
        2. WorldModelSafetyBounds.gate_ralt_export() — rate limit + witness
        3. ConstructiveFilter.check_output() — constructiveness check
        """
        # CCT access gate (hard block: cannot distribute without contribution)
        if not self._check_cct_access('skill_distribution'):
            logger.info("Skill distribution blocked: no CCT with "
                        "skill_distribution capability")
            with self._lock:
                self._stats['total_skills_blocked'] += 1
            return {'success': False, 'reason': 'no_cct_skill_distribution'}

        try:
            from security.hive_guardrails import WorldModelSafetyBounds
            passed, reason = WorldModelSafetyBounds.gate_ralt_export(
                ralt_packet, node_id)
            if not passed:
                with self._lock:
                    self._stats['total_skills_blocked'] += 1
                return {'success': False, 'reason': reason}
        except ImportError:
            pass

        try:
            from security.hive_guardrails import ConstructiveFilter
            desc = ralt_packet.get('description', '')
            passed, reason = ConstructiveFilter.check_output(desc)
            if not passed:
                with self._lock:
                    self._stats['total_skills_blocked'] += 1
                return {'success': False, 'reason': reason}
        except ImportError:
            pass

        # Notify peers via gossip that a skill is available.
        # Each peer ingests via its local HevolveAI RALT receiver.
        try:
            from integrations.social.peer_discovery import gossip
            gossip.broadcast({
                'type': 'ralt_skill_available',
                'packet_summary': {
                    'task_id': ralt_packet.get('task_id'),
                    'description': ralt_packet.get('description', '')[:200],
                    'complexity': ralt_packet.get('complexity', 'unknown'),
                },
                'source_node': node_id,
                'source_api_url': self._api_url,
                'timestamp': time.time(),
            }, targets=target_nodes)
            with self._lock:
                self._stats['total_skills_distributed'] += 1
            return {'success': True}
        except Exception as e:
            logger.debug(f"RALT distribution skipped: {e}")
            return {'success': False, 'reason': str(e)}

    # ─── RALT skill ingestion (inbound, peer → local) ────────────────
    #
    # Complements distribute_skill_packet(): that method is outbound
    # (local HevolveAI learned a skill → notify peers). The two methods
    # below are the inbound path (a peer learned a skill → pull & install
    # into the local HevolveAI). Receiver side of the RALT hive loop.
    #
    # Wiring:
    #   gossip peer_broadcast endpoint (discovery.py)
    #     -> handle_ralt_skill_notification(msg)   ← notification
    #          pulls the full packet from source_api_url
    #     -> ingest_skill_packet(packet_dict)      ← install
    #          in-process: agent.import_skill(RALTPacket.from_wire(dict))
    #          HTTP: POST /v1/ralt/skills/install

    def handle_ralt_skill_notification(self, msg: dict) -> dict:
        """Pull a newly-announced skill from its source node and install it.

        Invoked by the /api/social/peers/broadcast gossip receiver when
        msg['type'] == 'ralt_skill_available'. The notification carries
        only a summary; we fetch the full packet from the sender's
        /v1/ralt/skills/export/{task_id} and hand it to
        ingest_skill_packet.

        Failure modes are returned as dicts rather than raised so the
        gossip dispatcher can reply with a structured diagnostic to the
        sender without crashing the blueprint.
        """
        summary = msg.get('packet_summary') or {}
        task_id = summary.get('task_id')
        source_url = (msg.get('source_api_url') or '').rstrip('/')
        source_node = msg.get('source_node', 'unknown')

        if not task_id or not source_url:
            return {'success': False,
                    'reason': 'missing_task_id_or_source_api_url'}

        # Echo prevention: don't ingest skills we ourselves broadcast.
        # _node_id on the bridge is currently unset; fall back to the
        # gossip singleton, which holds the canonical local node_id.
        try:
            from integrations.social.peer_discovery import gossip
            local_node = self._node_id or getattr(gossip, 'node_id', None)
        except Exception:
            local_node = self._node_id
        if local_node and source_node == local_node:
            return {'success': False, 'reason': 'echo_skip'}

        # Pull the packet. Use the circuit breaker so a repeatedly failing
        # source can't stall the gossip dispatcher.
        if self._circuit_breaker.is_open():
            return {'success': False, 'reason': 'circuit_open'}
        try:
            resp = pooled_get(
                f"{source_url}/v1/ralt/skills/export/{task_id}",
                timeout=self._timeout_default)
        except requests.RequestException as e:
            self._circuit_breaker.record_failure()
            return {'success': False, 'reason': f'fetch_failed: {e}'}

        if resp.status_code != 200:
            self._circuit_breaker.record_failure()
            return {'success': False,
                    'reason': f'export_http_{resp.status_code}'}
        try:
            packet_dict = resp.json()
        except ValueError as e:
            self._circuit_breaker.record_failure()
            return {'success': False,
                    'reason': f'export_not_json: {e}'}
        self._circuit_breaker.record_success()

        return self.ingest_skill_packet(packet_dict, source_node=source_node)

    def ingest_skill_packet(self, packet_dict: dict,
                            source_node: str = '') -> dict:
        """Install a wire-format RALT packet into the local HevolveAI.

        In-process: reconstruct the RALTPacket and call embodied_agent
        .import_skill() directly (zero HTTP overhead, same code path as
        the local learning loop at integrated_realtime_agent.py:1331).

        HTTP: POST the dict to /v1/ralt/skills/install on the local
        HevolveAI API server.

        Guardrails:
        1. CCT gate — requires a token with the 'skill_distribution'
           capability on the INGEST side too (prevents unauthenticated
           peers from pushing skills into our local model).
        2. WorldModelSafetyBounds.gate_ralt_ingest (if available) —
           mirror of the gate_ralt_export used by distribute_skill_packet.
        """
        # Symmetric CCT gate: we require skill_distribution to ingest
        # because ingestion mutates the local world model. Mirrors the
        # check in distribute_skill_packet.
        if not self._check_cct_access('skill_distribution'):
            logger.info("Skill ingest blocked: no CCT with "
                        "skill_distribution capability")
            with self._lock:
                self._stats['total_skills_blocked'] += 1
            return {'success': False, 'reason': 'no_cct_skill_distribution'}

        # Optional ingest-side guardrail (inbound analog of
        # gate_ralt_export). Skip gracefully if not implemented.
        try:
            from security.hive_guardrails import WorldModelSafetyBounds
            if hasattr(WorldModelSafetyBounds, 'gate_ralt_ingest'):
                passed, reason = WorldModelSafetyBounds.gate_ralt_ingest(
                    packet_dict, source_node)
                if not passed:
                    with self._lock:
                        self._stats['total_skills_blocked'] += 1
                    return {'success': False, 'reason': reason}
        except ImportError:
            pass
        except Exception as e:
            logger.debug(f"gate_ralt_ingest skipped: {e}")

        # In-process fast path: reuse the exact same call that the
        # local learning loop uses after self-learning (see
        # integrated_realtime_agent.py:1331). RALTPacket is imported
        # lazily so this module works when HevolveAI isn't installed.
        if self._in_process and self._provider is not None:
            try:
                agent = getattr(self._provider, 'embodied_agent', None)
                if agent is not None and hasattr(agent, 'import_skill'):
                    from hevolveai.embodied_ai.learning.latent_transfer import (
                        RALTPacket)
                    pkt = RALTPacket.from_wire(packet_dict)
                    ok = bool(agent.import_skill(pkt, verify_topology=True))
                    with self._lock:
                        self._stats.setdefault('total_skills_received', 0)
                        self._stats.setdefault('total_skills_installed', 0)
                        self._stats['total_skills_received'] += 1
                        if ok:
                            self._stats['total_skills_installed'] += 1
                    return {
                        'success': ok,
                        'mode': 'in_process',
                        'task_id': pkt.task_id,
                        'source_id': pkt.source_id,
                    }
            except Exception as e:
                # Fall through to HTTP — the in-process path failed but the
                # HTTP API may still be reachable in a hybrid setup.
                logger.debug(
                    f"[WorldModelBridge] in-process skill ingest failed, "
                    f"falling back to HTTP: {e}")

        # HTTP fallback: POST to the local HevolveAI /v1/ralt/skills/install
        if self._http_disabled or self._circuit_breaker.is_open():
            return {'success': False,
                    'reason': 'http_disabled_or_circuit_open'}
        try:
            resp = pooled_post(
                f'{self._api_url}/v1/ralt/skills/install',
                json=packet_dict,
                timeout=self._timeout_default)
        except requests.RequestException as e:
            self._circuit_breaker.record_failure()
            return {'success': False, 'reason': f'install_fetch_failed: {e}'}

        if resp.status_code != 200:
            self._circuit_breaker.record_failure()
            return {'success': False,
                    'reason': f'install_http_{resp.status_code}'}
        self._circuit_breaker.record_success()
        try:
            body = resp.json()
        except ValueError:
            body = {'success': False, 'reason': 'install_not_json'}
        with self._lock:
            self._stats.setdefault('total_skills_received', 0)
            self._stats.setdefault('total_skills_installed', 0)
            self._stats['total_skills_received'] += 1
            if body.get('success'):
                self._stats['total_skills_installed'] += 1
        body.setdefault('mode', 'http')
        return body
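
    # Dispatcher sketch (illustrative; the real gossip receiver lives in the
    # /api/social/peers/broadcast blueprint, not in this module):
    #
    #     if msg.get('type') == 'ralt_skill_available':
    #         result = bridge.handle_ralt_skill_notification(msg)
    #         # Always a structured dict, never an exception, e.g.
    #         # {'success': False, 'reason': 'echo_skip'} or
    #         # {'success': True, 'mode': 'in_process', 'task_id': ...}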

    # ─── Health ──────────────────────────────────────────────────────

    def check_health(self) -> dict:
        """Check learning pipeline health.

        In-process: returns healthy if the provider is available.
        HTTP: GET /health on the HevolveAI API.
        """
        if self._in_process and self._provider:
            return {
                'healthy': True,
                'learning_active': True,
                'mode': 'in_process',
                'node_tier': self._node_tier,
            }

        # HTTP fallback
        if self._http_disabled:
            return {
                'healthy': False,
                'learning_active': False,
                'mode': 'disabled',
                'node_tier': self._node_tier,
            }
        try:
            resp = pooled_get(
                f'{self._api_url}/health', timeout=5)
            if resp.status_code == 200:
                data = resp.json() if resp.headers.get(
                    'content-type', '').startswith('application/json') else {}
                return {
                    'healthy': True,
                    'learning_active': True,
                    'mode': 'http',
                    'node_tier': self._node_tier,
                    'details': data,
                }
            return {
                'healthy': False,
                'learning_active': False,
                'mode': 'http',
                'node_tier': self._node_tier,
                'details': {'status_code': resp.status_code},
            }
        except requests.RequestException as e:
            return {
                'healthy': False,
                'learning_active': False,
                'mode': 'http',
                'node_tier': self._node_tier,
                'details': {'error': str(e)},
            }

    def get_stats(self) -> dict:
        """Get bridge-level statistics."""
        with self._lock:
            return {
                'queue_size': len(self._experience_queue),
                'api_url': self._api_url,
                'in_process': self._in_process,
                **self._stats,
            }

    # ─── Embodied interaction (same latent space) ────────────────

    def send_action(self, action: dict) -> bool:
        """Send a motor/actuator command to HevolveAI's world model.

        The world model operates in one latent space — text, sensors,
        and motors are all representations of the same world. Actions are
        predictions that the world model tests against reality.

        Args:
            action: Dict with type, target, params, timestamp.
                e.g. {'type': 'motor_velocity', 'target': 'left_wheel',
                      'params': {'velocity': 0.5}, 'timestamp': ...}

        Returns:
            True if the action was sent successfully.
        """
        # Safety check before forwarding
        try:
            from integrations.robotics.safety_monitor import get_safety_monitor
            monitor = get_safety_monitor()
            if monitor.is_estopped:
                logger.warning("[WorldModelBridge] Action blocked: E-stop active")
                return False
            # Check position if the action has spatial params
            position = action.get('params', {})
            if any(k in position for k in ('x', 'y', 'z')):
                if not monitor.check_position_safe(position):
                    logger.warning("[WorldModelBridge] Action blocked: outside workspace")
                    return False
        except ImportError:
            pass

        # In-process: actions route through submit_correction() for error
        # propagation when outcomes differ from predictions. No separate
        # action_stream module — the correction path handles backprop.

        # HTTP fallback
        if self._http_disabled or self._cb_is_open():
            return False

        try:
            resp = pooled_post(
                f'{self._api_url}/v1/actions',
                json=action,
                timeout=self._timeout_default,
            )
            if resp.status_code in (200, 201):
                self._cb_record_success()
                with self._lock:
                    self._stats['total_actions_sent'] = self._stats.get(
                        'total_actions_sent', 0) + 1
                return True
            self._cb_record_failure()
            return False
        except requests.RequestException:
            self._cb_record_failure()
            return False

    def ingest_sensor_batch(self, readings: list) -> int:
        """Feed sensor data to HevolveAI's world model for learning.

        The world model's latent space includes physical sensor state.
        This method batches sensor readings and flushes them to HevolveAI
        for continuous learning — the same way text experiences are flushed.

        Args:
            readings: List of SensorReading.to_dict() dicts.

        Returns:
            Number of readings successfully ingested.
        """
        if not readings:
            return 0

        # In-process: sensor data routes through submit_sensor_frame() →
        # embodied.step(sensor, train=True). The SensorInput dataclass already
        # carries type/modality metadata. No separate sensor_ingest module.

        # HTTP fallback
        if self._http_disabled or self._cb_is_open():
            return 0

        try:
            resp = pooled_post(
                f'{self._api_url}/v1/sensors/batch',
                json={'readings': readings},
                timeout=self._timeout_flush,
            )
            if resp.status_code in (200, 201):
                self._cb_record_success()
                with self._lock:
                    self._stats['total_sensor_readings'] = self._stats.get(
                        'total_sensor_readings', 0) + len(readings)
                return len(readings)
            self._cb_record_failure()
            return 0
        except requests.RequestException:
            self._cb_record_failure()
            return 0
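
    # Payload sketch (illustrative; the field names inside each reading are
    # assumptions beyond the documented SensorReading.to_dict() contract):
    #
    #     readings = [{'sensor_id': 'imu0', 'modality': 'imu',
    #                  'value': [0.0, 0.0, 9.81], 'timestamp': time.time()}]
    #     ingested = bridge.ingest_sensor_batch(readings)  # 1 on HTTP 200/201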

    def get_learning_feedback(self) -> Optional[Dict]:
        """Poll HevolveAI for real-time learning feedback.

        The world model continuously learns from sensor+action data.
        This method retrieves corrections/predictions — trajectory
        adjustments, new obstacle awareness, learned patterns.

        Returns:
            Feedback dict from HevolveAI, or None if unavailable.
        """
        # In-process: feedback is returned inline by step() as InferenceStats
        # (attention weights, epistemic uncertainty, kernel corrections).
        # The provider exposes this via get_stats(). No separate module.
        if self._in_process and self._provider:
            try:
                stats = self._provider.get_stats()
                return stats.get('last_feedback') or stats
            except Exception:
                pass

        # HTTP fallback
        if self._http_disabled or self._cb_is_open():
            return None

        try:
            resp = pooled_get(
                f'{self._api_url}/v1/feedback/latest',
                timeout=self._timeout_default,
            )
            if resp.status_code == 200:
                self._cb_record_success()
                return resp.json()
            self._cb_record_failure()
            return None
        except requests.RequestException:
            self._cb_record_failure()
            return None

    def record_embodied_interaction(
        self, action: dict, sensor_context: dict, outcome: dict,
    ):
        """Record an action+sensor+outcome triple for recipe learning.

        This extends record_interaction() to physical actions.
        The triple becomes CREATE-mode training data: what action was
        taken, what sensor state surrounded it, and what the outcome was.

        Stored as experiences in the same queue that text interactions use.
        Same latent space — the world model doesn't distinguish modalities.
        """
        experience = {
            'type': 'embodied_interaction',
            'action': action,
            'sensor_context': sensor_context,
            'outcome': outcome,
            'timestamp': action.get('timestamp', 0),
            'node_tier': self._node_tier,
        }
        self._experience_queue.append(experience)
        with self._lock:
            self._stats['total_recorded'] = self._stats.get('total_recorded', 0) + 1

    def emergency_stop(self) -> bool:
        """Send zero-velocity to all actuators via HevolveAI.

        This is the bridge-level emergency stop — it tells HevolveAI to
        immediately halt all physical outputs.
        """
        estop_action = {
            'type': 'emergency_stop',
            'target': '*',
            'params': {'velocity': 0, 'force': 0},
        }

        # Even in-process, send zero-velocity via the same HTTP estop
        # endpoint. The emergency stop must be reliable — no in-process
        # shortcut that could silently fail on ImportError.
        try:
            resp = pooled_post(
                f'{self._api_url}/v1/actions/estop',
                json=estop_action,
                timeout=3,
            )
            return resp.status_code in (200, 201)
        except requests.RequestException:
            return False
1630 # ─── Sensor frame forwarding to HevolveAI ────────────────────
1632 def submit_sensor_frame(
1633 self, user_id: str, frame_bytes: bytes,
1634 channel: str = 'camera', reality_signature: float = 1.0,
1635 ):
1636 """Forward raw sensor frame to HevolveAI for visual encoding + learning.
1638 HARTOS captures camera/screen frames locally. This method forwards
1639 them to HevolveAI's /v1/sensor/ingest endpoint so the embodied AI
1640 can: encode via Qwen → predict next state → compute error → learn.
1642 Called only on scene change (adaptive sampling) to avoid overwhelming
1643 the learning pipeline.
1645 In-process mode: encodes the frame and routes it through embodied.step().
1646 HTTP mode: POST /v1/sensor/ingest with base64-encoded data.
1647 """
1648 import base64
1650 source = 'camera' if channel == 'camera' else 'screen'
1651 sig = reality_signature if channel == 'camera' else 0.0
1653 if self._in_process and self._provider:
1654 # In-process: encode frame and feed through learning pipeline
1655 try:
1656 import torch
1657 embodied = getattr(self._provider, 'embodied_agent', None)
1658 encoder = getattr(embodied, 'encoder', None) if embodied else None
1659 if encoder is None:
1660 encoder = getattr(self._provider, 'qwen_encoder', None)
1661 if encoder is not None:
1662 from PIL import Image
1663 import io
1664 img = Image.open(io.BytesIO(frame_bytes)).convert('RGB')
1665 encoding = encoder.encode_image(img)
1667 # Route through embodied agent step() — same path as /v1/sensor/ingest
1668 if embodied is not None and hasattr(embodied, 'step'):
1669 from hevolveai.embodied_ai.types.sensor_input import SensorInput
1670 sensor = SensorInput(
1671 data=encoding if isinstance(encoding, torch.Tensor)
1672 else torch.tensor(encoding).float(),
1673 input_type='encoded_features',
1674 modality='vision',
1675 metadata={'reality_signature': sig, 'source': source},
1676 )
1677 embodied.step(sensor)
1678 return
1679 except Exception as e:
1680 logger.debug(f"[WorldModelBridge] In-process sensor frame failed: {e}")
1682 # HTTP fallback
1683 if self._http_disabled or self._cb_is_open():
1684 return
1686 try:
1687 data_b64 = base64.b64encode(frame_bytes).decode('ascii')
1688 resp = pooled_post(
1689 f'{self._api_url}/v1/sensor/ingest',
1690 json={
1691 'modality': 'vision',
1692 'source': source,
1693 'data': data_b64,
1694 'format': 'jpeg',
1695 'session_id': f'{user_id}_{source}',
1696 'reality_signature': sig,
1697 },
1698 timeout=self._timeout_default,
1699 )
1700 if resp.status_code == 200:
1701 self._cb_record_success()
1702 else:
1703 self._cb_record_failure()
1704 except requests.RequestException:
1705 self._cb_record_failure()
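# Usage sketch on a detected scene change (the frame source is
# hypothetical; parameters match the signature above):
#
#     with open('/tmp/latest_frame.jpg', 'rb') as f:
#         frame = f.read()
#     get_world_model_bridge().submit_sensor_frame(
#         user_id='operator_1', frame_bytes=frame,
#         channel='camera', reality_signature=1.0)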
1707 def submit_output_feedback(
1708 self, output_modality: str, status: str, context: str,
1709 model_used: str = 'unknown', error_message: Optional[str] = None,
1710 generation_time_seconds: float = 0.0, user_id: str = 'default',
1711 generated_data: Optional[bytes] = None, generated_format: Optional[str] = None,
1712 ):
1713 """Report output modality generation result to HevolveAI for learning.
1715 Routes through existing endpoints (no redundant API surface):
1716 - Success with data: /v1/sensor/ingest (generated output = observation)
1717 - Error/rejection: /v1/corrections (correction signal)
1718 - Success without data: record_interaction (text experience)
1720 Generated outputs are just observations with reality_signature=0.0.
1721 Errors are corrections that teach modality routing.
1722 """
1723 import base64
1725 # Success with generated data: treat as sensor observation
1726 if status == 'completed' and generated_data is not None:
1727 # Map modality format to sensor ingest format
1728 modality_map = {
1729 'image': 'vision',
1730 'audio_speech': 'audio',
1731 'audio_music': 'audio',
1732 'video': 'vision',
1733 'video_with_audio': 'multimodal',
1734 }
1735 sensor_modality = modality_map.get(output_modality, 'multimodal')
1736 fmt = generated_format or 'jpeg'
1738 if self._http_disabled or self._cb_is_open():
1739 return
1741 try:
1742 data_b64 = base64.b64encode(generated_data).decode('ascii')
1743 resp = pooled_post(
1744 f'{self._api_url}/v1/sensor/ingest',
1745 json={
1746 'modality': sensor_modality,
1747 'source': f'generated_{output_modality}',
1748 'data': data_b64,
1749 'format': fmt,
1750 'session_id': f'{user_id}_output_{output_modality}',
1751 'reality_signature': 0.0,
1752 'text': context[:500],
1753 },
1754 timeout=self._timeout_default,
1755 )
1756 if resp.status_code == 200:
1757 self._cb_record_success()
1758 else:
1759 self._cb_record_failure()
1760 except requests.RequestException:
1761 self._cb_record_failure()
1762 return
1764 # Error/rejection: route as correction.
1765 # C1 hive-memory-on-high-error: before submitting the correction,
1766 ask the hive whether peers have encountered this modality+error before.
1767 # Any collective hint is attached to the correction as explanation
1768 # so the KernelContinualLearner / LoRA learner gets richer context
1769 # than the raw error string alone. This closes the loop between
1770 # local failures and the shared error-memory store.
1771 if status in ('error', 'user_rejected') and error_message:
1772 hive_hint = None
1773 try:
1774 query_text = (
1775 f'output_failure modality={output_modality} '
1776 f'error={error_message[:200]} context={context[:200]}'
1777 )
1778 hive_result = self.query_hivemind(
1779 query_text, user_id=user_id, timeout_ms=200
1780 )
1781 if hive_result:
1782 if isinstance(hive_result, dict):
1783 hive_hint = (
1784 hive_result.get('thought')
1785 or hive_result.get('thoughts')
1786 or hive_result.get('source')
1787 )
1788 else:
1789 hive_hint = str(hive_result)[:400]
1790 if hive_hint:
1791 with self._lock:
1792 self._stats.setdefault(
1793 'hive_hints_on_output_error', 0)
1794 self._stats['hive_hints_on_output_error'] += 1
1795 except Exception as hive_err:
1796 logger.debug(
1797 f"[Bridge] hive hint query failed for output error: {hive_err}"
1798 )
1799 hive_hint = None
1801 explanation = None
1802 if hive_hint:
1803 explanation = f'hive_hint: {str(hive_hint)[:400]}'
1805 self.submit_correction(
1806 original_response=f'[{output_modality}] {context[:500]}',
1807 corrected_response=f'{output_modality} generation failed: {error_message}',
1808 expert_id=user_id,
1809 confidence=0.8 if status == 'user_rejected' else 0.5,
1810 explanation=explanation,
1811 context={'output_modality': output_modality,
1812 'status': status,
1813 'user_id': user_id,
1814 'hive_hint_used': bool(hive_hint)},
1815 )
1816 return
1818 # Success without data or pending: record as text interaction
1819 self.record_interaction(
1820 user_id=user_id,
1821 prompt_id=f'output_{output_modality}',
1822 prompt=context[:2000],
1823 response=f'[{output_modality} {status}] generated by {model_used} in {generation_time_seconds:.1f}s',
1824 model_id=model_used,
1825 latency_ms=generation_time_seconds * 1000,
1826 )
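# One illustrative call per route above (model names and byte payloads
# are assumptions):
#
#     bridge = get_world_model_bridge()
#     # 1. Success with data → sensor observation, reality_signature=0.0
#     bridge.submit_output_feedback(
#         'image', 'completed', 'a red cube on a table',
#         model_used='sdxl', generated_data=png_bytes, generated_format='png')
#     # 2. Error → correction, optionally enriched with a hive hint
#     bridge.submit_output_feedback(
#         'audio_speech', 'error', 'read the report aloud',
#         error_message='TTS backend timeout')
#     # 3. Success without data → plain text interaction
#     bridge.submit_output_feedback(
#         'video', 'completed', 'demo clip', model_used='wan',
#         generation_time_seconds=42.0)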
1828 # ─── Federation support ───────────────────────────────────────
1830 def extract_learning_delta(self) -> dict:
1831 """Pull stats for federation delta extraction.
1833 Used by FederatedAggregator.extract_local_delta() to build the
1834 lightweight metric delta that gets broadcast to peers.
1835 """
1836 stats = self.get_stats()
1837 learning = self.get_learning_stats()
1838 return {
1839 'bridge': stats,
1840 'learning': learning.get('learning', {}),
1841 'hivemind': learning.get('hivemind', {}),
1842 }
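# Shape of the returned delta. The top-level keys mirror the code above;
# the nested contents come from get_stats()/get_learning_stats(), defined
# elsewhere in this module:
#
#     {'bridge': {...},      # bridge counters, e.g. total_recorded
#      'learning': {...},    # 'learning' sub-dict of get_learning_stats()
#      'hivemind': {...}}    # 'hivemind' sub-dict of get_learning_stats()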
1844 def apply_federation_update(self, aggregated: dict) -> bool:
1845 """Store aggregated network-wide metrics locally and notify listeners.
1847 Does NOT push to HevolveAI's parametric learning — federation metrics
1848 are consumed by BenchmarkRegistry and dashboard, not the gradient
1849 pipeline. But the EventBus emit lets any local subscriber
1850 (dashboards, benchmark router, coding_agent benchmark_tracker)
1851 react without polling _federation_aggregated.
1853 Called by FederatedAggregator.apply_aggregated() (single entry point).
1854 """
1855 self._federation_aggregated = aggregated
1856 try:
1857 from core.platform.events import emit_event
1858 emit_event('learning.federation_update', {
1859 'epoch': aggregated.get('epoch', 0),
1860 'peer_count': aggregated.get('peer_count', 0),
1861 'convergence': aggregated.get('convergence'),
1862 })
1863 # Keep legacy topic for backward-compat with existing subscribers
1864 emit_event('federation.aggregated', {
1865 'epoch': aggregated.get('epoch', 0),
1866 'peer_count': aggregated.get('peer_count', 0),
1867 })
1868 except Exception as exc:
1869 logger.debug(f"[WorldModelBridge] federation event emit failed: {exc}")
1870 return True
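# Illustrative aggregated payload. Only epoch, peer_count, and
# convergence are read here; anything else the FederatedAggregator
# sends is stored untouched in _federation_aggregated:
#
#     bridge.apply_federation_update(
#         {'epoch': 12, 'peer_count': 5, 'convergence': 0.93})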
1873# ─── Module-level singleton ───
1874_bridge = None
1875_bridge_lock = threading.Lock()
1878def get_world_model_bridge() -> WorldModelBridge:
1879 """Get or create the singleton WorldModelBridge."""
1880 global _bridge
1881 if _bridge is None:
1882 with _bridge_lock:
1883 if _bridge is None:
1884 _bridge = WorldModelBridge()
1885 return _bridge
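# Typical call site (safe from any thread; the double-checked locking
# above makes first construction race-free):
#
#     from integrations.agent_engine.world_model_bridge import (
#         get_world_model_bridge)
#     bridge = get_world_model_bridge()
#     bridge.record_embodied_interaction(action, sensors, outcome)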