Coverage for integrations/agent_engine/world_model_bridge.py: 62.8%

707 statements

coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Unified Agent Goal Engine - World Model Bridge 

3 

4Bridge between LLM-langchain orchestration and HevolveAI's embodied AI. 

5Every agent interaction becomes training data for continuous learning. 

6Skills distribute via gossip notification + local RALT ingestion. 

7 

8Dual-mode operation: 

9 IN-PROCESS (flat/regional): Direct Python calls — zero HTTP overhead. 

10 HevolveAI is pip-installed, learning functions called directly. 

11 HTTP FALLBACK (central standalone): REST calls to remote HevolveAI. 

12 Used when services run as separate processes on different ports. 

13 

14Bootstrap: HevolveAI uses llama.cpp (Qwen3-VL-2B, Q4_K_XL, ~1.5GB) locally. 

15No external API key needed. Learning is local-first, distributed via RALT + WAMP. 

16 

17GUARDRAILS applied at every layer: 

18- ConstitutionalFilter on experiences before storage 

19- WorldModelSafetyBounds on RALT export (rate limit + witness) 

20- ConstructiveFilter on skill packets (no destructive capabilities) 

21""" 

import atexit
import json
import logging
import os
import sys
import threading
import time
from collections import deque
from concurrent.futures import ThreadPoolExecutor
from typing import Dict, List, Optional

import requests

from core.http_pool import pooled_get, pooled_post

logger = logging.getLogger('hevolve_social')
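
# Configuration note (illustrative, hedged): the bridge is driven entirely by
# environment variables read in __init__ below. The variable names are the
# ones this module actually reads; the values shown are just the defaults the
# code falls back to, set explicitly here for documentation purposes:
#
#     os.environ.setdefault('HEVOLVEAI_API_URL', 'http://localhost:8000')
#     os.environ.setdefault('HEVOLVE_NODE_TIER', 'flat')        # flat/regional/central
#     os.environ.setdefault('HEVOLVE_WM_FLUSH_BATCH', '50')     # experiences per flush
#     os.environ.setdefault('HEVOLVE_WM_FLUSH_TIMEOUT', '15')   # seconds
#     os.environ.setdefault('HEVOLVE_WM_CORRECTION_TIMEOUT', '30')
#     os.environ.setdefault('HEVOLVE_WM_HTTP_TIMEOUT', '10')
#     os.environ.setdefault('HEVOLVE_PARALLEL_SSM', '1')        # G12 student path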

class WorldModelBridge:
    """Bridge between LLM-langchain orchestration and HevolveAI embodied AI.

    Dual-mode: in-process direct Python calls when HevolveAI is co-located
    (flat/regional), HTTP fallback when running as separate processes (central).

    Two-tier thinking model:
    - Agent-level: Hevolve dispatches coarse-grained goals (task delegation)
    - Tensor-level: HevolveAI fuses heterogeneous agent thoughts (HiveMind)
    This bridge connects the two layers.
    """

    def __init__(self):
        self._api_url = os.environ.get(
            'HEVOLVEAI_API_URL', 'http://localhost:8000')
        self._node_tier = os.environ.get('HEVOLVE_NODE_TIER', 'flat')
        self._experience_queue: deque = deque(maxlen=10000)
        self._flush_executor = ThreadPoolExecutor(
            max_workers=2, thread_name_prefix='wm_flush')
        atexit.register(lambda: self._flush_executor.shutdown(wait=False))
        self._flush_batch_size = int(os.environ.get(
            'HEVOLVE_WM_FLUSH_BATCH', '50'))
        self._lock = threading.Lock()
        self._stats = {
            'total_recorded': 0,
            'total_flushed': 0,
            'total_corrections': 0,
            'total_hivemind_queries': 0,
            'total_skills_distributed': 0,
            'total_skills_blocked': 0,
            'node_tier': self._node_tier,
        }

        # Configurable HTTP timeouts
        self._timeout_flush = int(os.environ.get('HEVOLVE_WM_FLUSH_TIMEOUT', '15'))
        self._timeout_correction = int(os.environ.get('HEVOLVE_WM_CORRECTION_TIMEOUT', '30'))
        self._timeout_default = int(os.environ.get('HEVOLVE_WM_HTTP_TIMEOUT', '10'))

        # Circuit breaker: reusable implementation from core
        from core.circuit_breaker import CircuitBreaker
        self._circuit_breaker = CircuitBreaker(
            name='world_model_bridge', threshold=5, cooldown=60.0)

        # In-process mode: direct Python calls (no HTTP overhead)
        self._provider = None  # LearningLLMProvider
        self._hive_mind = None  # HiveMind
        self._in_process = False
        self._in_process_retry_done = False
        self._http_disabled = False  # Set True when bundled + no server to talk to
        self._federation_aggregated = {}

        # Cloud consent: per-user gate for non-local HTTP data sharing.
        # "If anyways it needs to be sent to cloud for something then
        # let's get consent from user at runtime." — project steward
        self._consent_cache: Dict[str, tuple] = {}  # user_id → (bool, timestamp)
        self._consent_cache_ttl = 300  # 5 minutes

        # CCT (Compute Contribution Token) cache for learning access gating
        self._cct_cache: Optional[tuple] = None  # (cct_string, timestamp)
        self._cct_cache_ttl = 300  # 5 minutes
        self._node_id: Optional[str] = None

        self._init_in_process()

        # Disable HTTP when there's no server to talk to.
        # If in-process failed AND no explicit HEVOLVEAI_API_URL was configured
        # (just the default localhost:8000), there's no point spamming HTTP.
        # Bundled mode (NUNBA_BUNDLED) always disables — Nunba owns the lifecycle.
        # Non-bundled with default URL also disables — no server was started.
        if not self._in_process:
            _explicit_url = os.environ.get('HEVOLVEAI_API_URL', '')
            if os.environ.get('NUNBA_BUNDLED') == '1' or not _explicit_url:
                self._http_disabled = True
                logger.info(
                    "[WorldModelBridge] Learning not available in-process, "
                    "no explicit HEVOLVEAI_API_URL — HTTP disabled")

        # Periodic HevolveAI integrity watcher (Gap 1 fix)
        self._crawl_watcher = None
        self._start_crawl_integrity_watcher()

    def _start_crawl_integrity_watcher(self) -> None:
        """Start periodic HevolveAI integrity watcher if in-process mode active.

        No-op if HTTP mode (nothing to watch — HTTP doesn't depend on
        HevolveAI's local file state).
        """
        if not self._in_process:
            return
        try:
            from security.source_protection import CrawlIntegrityWatcher
            watcher = CrawlIntegrityWatcher()
            watcher.register_tamper_callback(self._on_crawl_tamper_detected)
            watcher.start()
            self._crawl_watcher = watcher
            logger.info("[WorldModelBridge] CrawlIntegrityWatcher started")
        except ImportError:
            pass  # source_protection not available
        except Exception as e:
            logger.warning(
                f"[WorldModelBridge] CrawlIntegrityWatcher failed: {e}")

    def _on_crawl_tamper_detected(self) -> None:
        """Callback: HevolveAI files changed post-boot.

        Disable in-process mode and fall back to HTTP.
        Does NOT halt the hive — that requires master key.
        """
        logger.critical(
            "[WorldModelBridge] HevolveAI tampering detected — "
            "disabling in-process mode, falling back to HTTP")
        with self._lock:
            self._in_process = False
            self._provider = None
            self._hive_mind = None

    def _cb_is_open(self) -> bool:
        """Check if circuit breaker is open (blocking requests)."""
        return self._circuit_breaker.is_open()

    def _cb_record_success(self):
        """Reset circuit breaker on successful call."""
        self._circuit_breaker.record_success()

    def _cb_record_failure(self):
        """Record failure; open circuit at threshold."""
        self._circuit_breaker.record_failure()

    def _is_external_target(self) -> bool:
        """Check if the API URL points to a non-local (cloud) endpoint."""
        url = self._api_url.lower()
        local_prefixes = (
            'http://localhost', 'http://127.0.0.1',
            'http://0.0.0.0', 'http://[::1]',
        )
        return not any(url.startswith(p) for p in local_prefixes)

    def _has_cloud_consent(self, user_id: str) -> bool:
        """Check if a user has consented to cloud data sharing.

        Consent is stored in User.settings['cloud_data_consent'].
        Cached for 5 minutes to avoid DB lookups on every experience.
        In-process mode (local) does NOT require consent — data stays local.
        """
        if not user_id:
            return False

        now = time.time()
        cached = self._consent_cache.get(user_id)
        if cached and now - cached[1] < self._consent_cache_ttl:
            return cached[0]

        consent = False
        try:
            from integrations.social.models import get_db, User
            db = get_db()
            try:
                user = db.query(User).filter_by(id=user_id).first()
                if user:
                    consent = bool(
                        (user.settings or {}).get('cloud_data_consent', False))
            finally:
                db.close()
        except Exception:
            pass

        self._consent_cache[user_id] = (consent, now)
        return consent

    # U3: Hive participation opt-out (reuses consent cache pattern)
    def _has_hive_participation(self, user_id: str) -> bool:
        """Check if user has hive participation enabled (default: True).

        Users can opt out via User.settings['hive_participation'] = False.
        Cached alongside cloud consent (same TTL).
        """
        if not user_id:
            return True  # Default participate if no user context

        cache_key = f"hive_{user_id}"
        now = time.time()
        cached = self._consent_cache.get(cache_key)
        if cached and now - cached[1] < self._consent_cache_ttl:
            return cached[0]

        participate = True
        try:
            from integrations.social.models import get_db, User
            db = get_db()
            try:
                user = db.query(User).filter_by(id=user_id).first()
                if user:
                    participate = bool(
                        (user.settings or {}).get('hive_participation', True))
            finally:
                db.close()
        except Exception:
            pass

        self._consent_cache[cache_key] = (participate, now)
        return participate

    # ─── CCT (Compute Contribution Token) gating ─────────────────────

    def _load_cached_cct(self) -> Optional[str]:
        """Load CCT from file, cache in memory (5 min TTL)."""
        now = time.time()
        if self._cct_cache and now - self._cct_cache[1] < self._cct_cache_ttl:
            return self._cct_cache[0]

        try:
            from .continual_learner_gate import ContinualLearnerGateService
            cct = ContinualLearnerGateService.load_cct_from_file()
            if cct:
                self._cct_cache = (cct, now)
            return cct
        except Exception:
            return None

    def _check_cct_access(self, capability: str) -> bool:
        """Check if node's CCT grants a specific capability. Zero DB calls."""
        try:
            from .continual_learner_gate import ContinualLearnerGateService
            cct = self._load_cached_cct()
            if not cct:
                return False
            return ContinualLearnerGateService.check_cct_capability(
                cct, capability, self._node_id)
        except Exception:
            return False  # Fail-closed

    def _init_in_process(self):
        """Try to connect to in-process learning pipeline (zero HTTP overhead).

        When HevolveAI is pip-installed and _init_learning_pipeline() has run
        in hart_intelligence, we get direct references to the provider
        and hivemind instances. All subsequent calls bypass HTTP entirely.

        SECURITY: Integrity verification required before enabling in-process.
        If HevolveAI files don't match the signed manifest, fall back to HTTP.
        """
        # Integrity gate: verify HevolveAI installation before in-process
        try:
            from security.source_protection import SourceProtectionService
            integrity = SourceProtectionService.verify_hevolveai_integrity()
            if not integrity.get('verified', False):
                mismatched = integrity.get('mismatched_files', [])
                if mismatched:
                    logger.warning(
                        f"[WorldModelBridge] HevolveAI integrity FAILED: "
                        f"{len(mismatched)} mismatched files — forcing HTTP mode")
                logger.info(
                    f"[WorldModelBridge] HTTP mode: {self._api_url}")
                return
        except ImportError:
            pass  # source_protection not available — skip check
        except Exception as e:
            logger.debug(f"[WorldModelBridge] Integrity check skipped: {e}")

        # Worker threads MUST NOT trigger a `from hart_intelligence import …`
        # here. The hart_intelligence import chain (langchain, transformers,
        # autogen, multimodal stacks) can take 300+ seconds on first load,
        # and Python's per-module import lock is held for the entire
        # duration. When the watchdog declares the worker FROZEN at 300s
        # and "restarts" it, the original thread can't be killed (Python
        # has no thread.kill); it stays alive holding the lock. The new
        # thread runs the same `from hart_intelligence import …` and blocks
        # on the same lock. After ~10 cycles you have 10 zombie daemon
        # threads, zero goals dispatched, zero spark spent — exactly the
        # 2026-04-29 dashboard incident (daemon restart_count=9).
        #
        # Only consult `sys.modules` to see whether hart_intelligence was
        # already imported on the main thread (via the bootstrap pre-warm
        # path that owns the heavyweight import). If yes — use the
        # resolved module's accessors directly, no import lock taken.
        # If no — fall through to HTTP mode without forcing a worker-thread
        # import; the bootstrap path will retry the in-process upgrade on
        # the next record_interaction once it finishes the pre-warm.
        mod = sys.modules.get('hart_intelligence')
        if mod is not None:
            try:
                _get_provider = getattr(mod, 'get_learning_provider', None)
                _get_hive = getattr(mod, 'get_hive_mind', None)
                if _get_provider is not None:
                    provider = _get_provider()
                    hive = _get_hive() if _get_hive is not None else None
                    if provider is not None:
                        self._provider = provider
                        self._hive_mind = hive
                        self._in_process = True
                        logger.info(
                            "[WorldModelBridge] In-process mode: direct Python calls")
                        return
            except Exception as e:
                logger.debug(
                    f"[WorldModelBridge] in-process probe failed: {e}")
        logger.info(
            f"[WorldModelBridge] HTTP mode: {self._api_url}")

    # ─── Record interactions (auto-learn) ────────────────────────────

    def record_interaction(self, user_id: str, prompt_id: str,
                           prompt: str, response: str,
                           model_id: str = None, latency_ms: float = 0,
                           node_id: str = None, goal_id: str = None,
                           attribution_chain: dict = None,
                           escalation_reason: str = None):
        """Record every agent interaction as training data for HevolveAI.

        Called after EVERY /chat response. Batches experiences and flushes
        them to HevolveAI (in-process or HTTP).
        HevolveAI auto-learns from every completion (3-priority queue:
        expert > reality > distillation).

        ``escalation_reason`` (optional) — when the speculative dispatcher
        promoted this turn from draft to expert path, the canonical
        reason value from
        ``integrations.agent_engine.escalation_reasons.EscalationReason``.
        Stored on the experience so HevolveAI's distillation can weight
        refusal-overridden / parse-failure traces differently from
        clean classifier-delegate traces. ``None`` when the draft
        answered as final (no escalation).

        GUARDRAIL: ConstitutionalFilter screens before storage.
        """
        # Lazy in-process re-check: HevolveAI may have initialized after our __init__
        if not self._in_process and not self._in_process_retry_done:
            self._in_process_retry_done = True
            self._init_in_process()

        try:
            from security.hive_guardrails import ConstitutionalFilter
            passed, _ = ConstitutionalFilter.check_prompt(response)
            if not passed:
                return
        except ImportError:
            pass

        experience = {
            'prompt': prompt[:2000],
            'response': response[:5000],
            'model_id': model_id or 'unknown',
            'latency_ms': latency_ms,
            'user_id': str(user_id),
            'prompt_id': str(prompt_id),
            'node_id': node_id,
            'goal_id': goal_id,
            'timestamp': time.time(),
            'source': 'langchain_orchestration',
        }
        # Structured attribution chain — preferred carrier for
        # agent_attribution's step/observation/credit/parent_action_id
        # payload. Previously it was packed into the 'prompt' field
        # as stringified JSON, which confused HevolveAI's prompt-text
        # distillation path. Leave the legacy path intact for callers
        # that haven't migrated — they pass nothing, and HevolveAI
        # receives only the primary experience fields.
        if attribution_chain is not None:
            experience['attribution_chain'] = attribution_chain
        if escalation_reason:
            experience['escalation_reason'] = escalation_reason

        # PRIVACY: Redact secrets + anonymize user before shared ingestion.
        # The hive must NEVER leak secrets from one user to another.
        try:
            from security.secret_redactor import redact_experience
            experience = redact_experience(experience)
        except ImportError:
            pass

        self._experience_queue.append(experience)
        with self._lock:
            self._stats['total_recorded'] += 1

        # Durable local write: append the user↔assistant pair to
        # ConversationEntry so FULL_HISTORY and the channel unified
        # inbox can query it by time range. Previously record_interaction
        # only batched to the in-memory _experience_queue + async flush
        # to HevolveAI, so if HevolveAI was offline the row was lost on
        # process exit. Now chat history survives restarts even without
        # a hive connection. Failures here must not break the learning
        # path — wrap in try/except and log at debug level.
        try:
            self._persist_to_conversation_entry(
                user_id=str(user_id), prompt_id=str(prompt_id),
                prompt=prompt, response=response,
                model_id=model_id or 'unknown',
            )
        except Exception as e:
            logger.debug(f"ConversationEntry durable write skipped: {e}")

        if len(self._experience_queue) >= self._flush_batch_size:
            batch = []
            while self._experience_queue and len(batch) < self._flush_batch_size:
                try:
                    batch.append(self._experience_queue.popleft())
                except IndexError:
                    break
            if batch:
                self._flush_executor.submit(self._flush_to_world_model, batch)
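
    # Illustrative call-site sketch (hedged — `bridge` is assumed to be a
    # shared WorldModelBridge instance and the identifiers are hypothetical;
    # only the keyword names come from the signature above):
    #
    #     bridge.record_interaction(
    #         user_id='u-42', prompt_id='p-1001',
    #         prompt='Summarize the RALT loop.',
    #         response='RALT packets distribute learned skills ...',
    #         model_id='qwen3-vl-2b', latency_ms=850.0,
    #         escalation_reason=None,  # draft answered as final
    #     )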

    def _persist_to_conversation_entry(
            self, user_id: str, prompt_id: str,
            prompt: str, response: str, model_id: str,
    ) -> None:
        """Write the user prompt + assistant response to ConversationEntry
        as two rows so FULL_HISTORY can BETWEEN-query them by created_at.

        Keeps the single write-path that integrations.channels.response.router
        uses (same model, same field contract), so existing tooling that
        already reads ConversationEntry (unified inbox, time-based history)
        sees chat-route interactions too. agent_id is left empty — that
        maps to the default chat flow; channel-inbound flows still set it.
        """
        try:
            from integrations.social.models import get_db, ConversationEntry
        except ImportError:
            return
        db = None
        try:
            db = get_db()
            for role, content in (('user', prompt), ('assistant', response)):
                if not content:
                    continue
                entry = ConversationEntry(
                    user_id=user_id,
                    channel_type='chat',
                    role=role,
                    content=content[:10000],
                    agent_id=None,
                    prompt_id=prompt_id or None,
                )
                db.add(entry)
            db.commit()
        except Exception:
            # Roll back, then re-raise so the caller's try/except can
            # downgrade the failure to a debug log. The exception must
            # never propagate past record_interaction.
            if db is not None:
                try:
                    db.rollback()
                except Exception:
                    pass
            raise
        finally:
            if db is not None:
                try:
                    db.close()
                except Exception:
                    pass

    def _flush_to_world_model(self, batch: list):
        """Flush experience batch to HevolveAI's learning provider.

        In-process mode: calls provider.create_chat_completion() directly.
        HTTP mode: POST /v1/chat/completions (OpenAI format).
        """
        if self._in_process and self._provider:
            for exp in batch:
                try:
                    messages = [
                        {
                            'role': 'system',
                            'content': json.dumps({
                                'source': exp.get('source',
                                                  'langchain_orchestration'),
                                'user_id': exp.get('user_id'),
                                'prompt_id': exp.get('prompt_id'),
                                'goal_id': exp.get('goal_id'),
                                'model_id': exp.get('model_id'),
                                'latency_ms': exp.get('latency_ms'),
                                'node_id': exp.get('node_id'),
                            }),
                        },
                        {'role': 'user', 'content': exp['prompt']},
                        {'role': 'assistant', 'content': exp['response']},
                    ]
                    self._provider.create_chat_completion(
                        messages=messages,
                        model='hevolve-interaction-replay',
                        temperature=0,
                        max_tokens=1,
                    )
                    with self._lock:
                        self._stats['total_flushed'] += 1
                except Exception as e:
                    logger.debug(f"In-process flush error: {e}")
            return

        # HTTP fallback (central standalone or HevolveAI not in-process)
        if self._http_disabled or self._cb_is_open():
            return

        # CONSENT GATE: if target is external (cloud), filter to consented users only.
        # Local endpoints (localhost) don't require consent — data stays on-device.
        if self._is_external_target():
            original_count = len(batch)
            batch = [
                exp for exp in batch
                if self._has_cloud_consent(exp.get('user_id', ''))
            ]
            skipped = original_count - len(batch)
            if skipped > 0:
                logger.info(
                    f"[WorldModelBridge] Skipped {skipped}/{original_count} "
                    f"experiences — no cloud consent")
            if not batch:
                return

        for exp in batch:
            try:
                body = {
                    'model': 'hevolve-interaction-replay',
                    'messages': [
                        {
                            'role': 'system',
                            'content': json.dumps({
                                'source': exp.get('source',
                                                  'langchain_orchestration'),
                                'user_id': exp.get('user_id'),
                                'prompt_id': exp.get('prompt_id'),
                                'goal_id': exp.get('goal_id'),
                                'model_id': exp.get('model_id'),
                                'latency_ms': exp.get('latency_ms'),
                                'node_id': exp.get('node_id'),
                            }),
                        },
                        {'role': 'user', 'content': exp['prompt']},
                        {'role': 'assistant', 'content': exp['response']},
                    ],
                    'temperature': 0,
                    'max_tokens': 1,
                }
                pooled_post(
                    f'{self._api_url}/v1/chat/completions',
                    json=body,
                    timeout=self._timeout_flush,
                )
                self._cb_record_success()
                with self._lock:
                    self._stats['total_flushed'] += 1
            except requests.RequestException:
                self._cb_record_failure()
            except Exception as e:
                self._cb_record_failure()
                logger.debug(f"World model flush error: {e}")

    # ─── Expert corrections (RL-EF) ──────────────────────────────────

    def submit_correction(self, original_response: str,
                          corrected_response: str,
                          expert_id: str = 'hevolve_user',
                          confidence: float = 1.0,
                          explanation: str = None,
                          context: dict = None,
                          valid_until: str = None) -> dict:
        """Submit an expert correction to HevolveAI's RL-EF system.

        In-process: calls send_expert_correction() directly.
        HTTP: POST /v1/corrections.

        HevolveAI routes to:
        - Kernel Continual Learner (instant, no gradient) for factual corrections
        - Orthogonal LoRA (gradient-based, forgetting-safe) for conceptual ones

        Returns dict with success status.
        """
        try:
            from security.hive_guardrails import ConstitutionalFilter
            passed, reason = ConstitutionalFilter.check_prompt(corrected_response)
            if not passed:
                return {'success': False, 'reason': reason}
        except ImportError:
            pass

        # PRIVACY: Redact secrets from corrections before shared learning
        try:
            from security.secret_redactor import redact_secrets
            original_response, _ = redact_secrets(original_response)
            corrected_response, _ = redact_secrets(corrected_response)
            if explanation:
                explanation, _ = redact_secrets(explanation)
        except ImportError:
            pass

        # In-process direct call
        if self._in_process and self._provider:
            try:
                from hevolveai.embodied_ai.rl_ef import send_expert_correction
                result = send_expert_correction(
                    domain='general',
                    original_response=original_response[:5000],
                    corrected_response=corrected_response[:5000],
                    expert_id=expert_id,
                    confidence=max(0.0, min(1.0, confidence)),
                    explanation=explanation[:2000] if explanation else None,
                    valid_until=valid_until,
                )
                with self._lock:
                    self._stats['total_corrections'] += 1
                return result if isinstance(result, dict) else {'success': True}
            except Exception as e:
                logger.debug(f"In-process correction failed: {e}")

        # CONSENT GATE: external HTTP requires consent
        if self._is_external_target():
            user_id = (context or {}).get('user_id', expert_id)
            if not self._has_cloud_consent(user_id):
                return {'success': False, 'reason': 'Cloud data consent required'}

        # HTTP fallback
        if self._http_disabled:
            return {'success': False, 'reason': 'Learning not available (bundled mode)'}
        body = {
            'original_response': original_response[:5000],
            'corrected_response': corrected_response[:5000],
            'expert_id': expert_id,
            'confidence': max(0.0, min(1.0, confidence)),
        }
        if explanation:
            body['explanation'] = explanation[:2000]
        if context:
            body['context'] = context
        if valid_until:
            body['valid_until'] = valid_until

        if self._cb_is_open():
            return {'success': False, 'reason': 'Circuit breaker open'}

        try:
            resp = pooled_post(
                f'{self._api_url}/v1/corrections',
                json=body,
                timeout=self._timeout_correction,
            )
            self._cb_record_success()
            if resp.status_code == 200:
                with self._lock:
                    self._stats['total_corrections'] += 1
                return resp.json()
            return {'success': False, 'reason': f'HTTP {resp.status_code}'}
        except requests.RequestException as e:
            self._cb_record_failure()
            logger.debug(f"Correction submission failed: {e}")
            return {'success': False, 'reason': str(e)}
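
    # Illustrative correction flow (hedged sketch — the response strings and
    # expert id are hypothetical; only the keyword names come from the
    # signature above):
    #
    #     result = bridge.submit_correction(
    #         original_response='The capital of Australia is Sydney.',
    #         corrected_response='The capital of Australia is Canberra.',
    #         expert_id='reviewer-7', confidence=0.95,
    #         explanation='Factual fix — routes to the Kernel Continual Learner.',
    #     )
    #     if not result.get('success'):
    #         logger.debug(f"correction rejected: {result.get('reason')}")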

    # ─── HiveMind collective thinking ────────────────────────────────

    def register_peer_agent(
        self,
        peer_id: str,
        agent_type: str = 'remote_peer',
        latent_dim: int = 2048,
        capabilities: Optional[list] = None,
    ) -> bool:
        """Best-effort registration of a newly-linked peer with the
        in-process HiveMind so `fuse_thoughts` / `think_together_distributed`
        can include it in MoE consensus.

        Called from `core.peer_link.link_manager.upgrade_peer` right
        after a successful `link.connect()`. Safe to invoke in any
        topology:

        - In-process HiveMind loaded → real registration.
        - In-process HiveMind missing (no hevolveai, HTTP-only central
          tier) → logs at debug and returns False.
        - Any exception inside HiveMind → swallowed at debug; the link
          upgrade must never fail because the hive can't register.

        Returns ``True`` iff the peer was accepted into the HiveMind
        agent registry. Callers generally ignore the return value —
        this is a side-effect hook, not a prerequisite.
        """
        if not (self._in_process and self._hive_mind):
            logger.debug(
                f"[register_peer_agent] no in-process HiveMind; "
                f"skipping peer {peer_id[:8] if peer_id else '?'}"
            )
            return False
        try:
            # Resolve AgentCapability lazily — hevolveai might expose a
            # different enum shape across versions; fall back to string
            # capabilities when the enum isn't importable.
            caps: list
            try:
                from hevolveai.embodied_ai.learning.hive_mind import (
                    AgentCapability,
                )
                default_caps = [
                    AgentCapability.ENCODE,
                    AgentCapability.REASON,
                ]
                caps = list(capabilities) if capabilities else default_caps
            except Exception:
                caps = list(capabilities) if capabilities else [
                    'encode', 'reason',
                ]
            self._hive_mind.register_agent(
                agent_id=peer_id,
                agent_type=agent_type,
                latent_dim=latent_dim,
                capabilities=caps,
            )
            logger.info(
                f"[register_peer_agent] peer={peer_id[:8]} "
                f"type={agent_type} dim={latent_dim} registered"
            )
            return True
        except Exception as e:
            logger.debug(
                f"[register_peer_agent] peer={peer_id[:8] if peer_id else '?'} "
                f"skipped: {e}"
            )
            return False

    # ─── G12: parallel SSM student inference ─────────────────────────

    def predict_student(
        self,
        prompt: str,
        messages: Optional[List[Dict]] = None,
        temperature: float = 0.7,
        max_tokens: int = 256,
        timeout_s: float = 2.0,
    ) -> Optional[Dict]:
        """Run the HevolveAI SSM student forward pass in-process.

        G12 — closes the gap where HARTOS' CustomGPT posts directly to
        llama.cpp and never involves the SSM student. When this is
        available, callers can fire the student in parallel with the
        llama.cpp teacher and feed the (teacher, student) pair into the
        distillation engine for continuous learning.

        Returns:
            dict with keys {'response', 'action_tensor', 'epistemic_data'}
            on success, or None if:
            - In-process mode not active
            - Provider doesn't expose _generate_response
            - Student forward pass failed / timed out

        Design notes:
        * Synchronous call; use an executor at the call site if you
          want to overlap it with the teacher HTTP request.
        * Gated by HEVOLVE_PARALLEL_SSM=0 to disable per-deployment.
        * Never raises — the teacher path must not be blocked by a
          failing student.
        """
        # `os` is imported at module level; no local re-import needed.
        if os.environ.get('HEVOLVE_PARALLEL_SSM', '1') == '0':
            return None
        if not (self._in_process and self._provider):
            return None
        gen = getattr(self._provider, '_generate_response', None)
        if gen is None or not callable(gen):
            return None

        if messages is None:
            messages = [{'role': 'user', 'content': prompt}]
        try:
            # _generate_response returns (response_text, action_tensor,
            # epistemic_data). Signature verified against
            # learning_llm_provider.py:1511.
            res = gen(prompt, messages, temperature,
                      cached_vision_features=None)
        except Exception as e:
            logger.debug(f"[predict_student] SSM forward failed: {e}")
            return None

        try:
            if isinstance(res, tuple) and len(res) >= 3:
                response_text, action_tensor, epistemic_data = res[:3]
            elif isinstance(res, tuple) and len(res) == 2:
                response_text, action_tensor = res
                epistemic_data = None
            else:
                response_text, action_tensor, epistemic_data = (
                    str(res), None, None)
            with self._lock:
                self._stats.setdefault('student_inferences', 0)
                self._stats['student_inferences'] += 1
            return {
                'response': response_text,
                'action_tensor': action_tensor,
                'epistemic_data': epistemic_data,
            }
        except Exception as e:
            logger.debug(f"[predict_student] result unpack failed: {e}")
            return None

    def record_teacher_student_pair(
        self,
        prompt: str,
        teacher_response: str,
        student_response: str,
        student_action=None,
    ) -> bool:
        """Enqueue a (teacher, student) pair for Qwen distillation.

        Feeds the HevolveAI DistillationEngine the same way the in-process
        parallel path does (learning_llm_provider.py:1551-1558).
        Fire-and-forget; returns True iff enqueue succeeded.
        """
        if not (self._in_process and self._provider):
            return False
        engine = getattr(self._provider, 'distillation_engine', None)
        if engine is None:
            return False
        if not student_response or not teacher_response:
            return False
        if student_response == teacher_response:
            # No correction needed — skip.
            return False
        try:
            engine.enqueue(
                query=prompt,
                teacher_response=teacher_response,
                student_response=student_response,
                student_action=student_action,
            )
            with self._lock:
                self._stats.setdefault('distillation_pairs', 0)
                self._stats['distillation_pairs'] += 1
            return True
        except Exception as e:
            logger.debug(f"[distillation] enqueue failed: {e}")
            return False
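
    # Hedged sketch of the G12 parallel path described above: fire the SSM
    # student alongside the teacher call, then feed the pair to distillation.
    # `call_teacher` is a hypothetical stand-in for the caller's llama.cpp
    # request; everything else uses only methods defined in this class.
    #
    #     from concurrent.futures import ThreadPoolExecutor
    #
    #     with ThreadPoolExecutor(max_workers=1) as pool:
    #         student_future = pool.submit(bridge.predict_student, prompt)
    #         teacher_text = call_teacher(prompt)           # blocking HTTP call
    #         student = student_future.result(timeout=2.0) or {}
    #     if student.get('response'):
    #         bridge.record_teacher_student_pair(
    #             prompt=prompt,
    #             teacher_response=teacher_text,
    #             student_response=student['response'],
    #             student_action=student.get('action_tensor'),
    #         )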

    def query_hivemind(self, query_text: str,
                       timeout_ms: int = 1000,
                       user_id: str = None) -> Optional[dict]:
        """Query HevolveAI's HiveMind for distributed collective thinking.

        In-process: calls hive_mind.think_together_distributed() directly.
        HTTP: POST /v1/hivemind/think.

        HevolveAI:
        1. Encodes query via frozen Qwen alignment layer (2048-D)
        2. Publishes local thought to WAMP
        3. Waits for remote agent responses (timeout_ms)
        4. Fuses with attention-weighted method
        5. Returns collective thought with contributing agents + weights

        Use this for real-time multi-modal reasoning (tensor-level fusion).
        For coarse-grained task delegation, use agent-level dispatch instead.

        CCT gating: requires 'hivemind_query' capability. Without valid CCT,
        returns cached/stale response (graceful degradation, not hard block).
        """
        # U3: Check hive participation setting
        if user_id and not self._has_hive_participation(user_id):
            logger.debug(f"HiveMind query skipped: user {user_id} opted out of hive")
            return None

        # CCT access gate (graceful: degrade to cached, don't block)
        if not self._check_cct_access('hivemind_query'):
            logger.debug("HiveMind query: no CCT with hivemind_query capability")
            cached = self._federation_aggregated.get('last_thought')
            if cached:
                return {'thought': cached, 'source': 'cached', 'cct_gated': True}
            return None

        try:
            from security.hive_guardrails import ConstitutionalFilter
            passed, _ = ConstitutionalFilter.check_prompt(query_text)
            if not passed:
                return None
        except ImportError:
            pass

        # In-process direct call
        if self._in_process and self._hive_mind:
            try:
                import torch
                # Encode query text as a thought tensor.
                # Prefer the provider's encoder; if unavailable, fall back to
                # a deterministic hash-derived vector so repeated queries of
                # the same text yield the same latent (vs torch.randn which
                # would make every call dissimilar to itself).
                # Mirrors the fallback in api_server.py's /hivethink handler.
                thought = None
                if (self._provider and
                        hasattr(self._provider, 'embodied_agent') and
                        self._provider.embodied_agent and
                        hasattr(self._provider.embodied_agent, 'encoder')):
                    try:
                        encoder = self._provider.embodied_agent.encoder
                        if hasattr(encoder, 'encode_text'):
                            encoded = encoder.encode_text(query_text)
                            # encode_text may return a tensor or (tensor, info)
                            if isinstance(encoded, tuple):
                                encoded = encoded[0]
                            flat = encoded.detach().cpu().flatten()
                            if flat.shape[0] > 2048:
                                flat = flat[:2048]
                            elif flat.shape[0] < 2048:
                                flat = torch.nn.functional.pad(
                                    flat, (0, 2048 - flat.shape[0])
                                )
                            thought = torch.nn.functional.normalize(
                                flat, dim=0
                            ).unsqueeze(0)
                    except Exception as enc_err:
                        logger.debug(
                            f"[Bridge] Encoder fallback for hivemind_query: {enc_err}"
                        )
                        thought = None

                if thought is None:
                    # Deterministic hash -> 2048-dim bit-expanded vector.
                    # Same input yields the same vector within a process
                    # (unlike torch.randn). Note: Python salts str hashing
                    # per interpreter run unless PYTHONHASHSEED is fixed,
                    # so the vector is NOT stable across restarts.
                    query_hash = hash(query_text) % (2 ** 31)
                    vec = torch.zeros(2048, device='cpu')
                    for i in range(2048):
                        vec[i] = ((query_hash >> (i % 31)) & 1) * 0.02 - 0.01
                    thought = torch.nn.functional.normalize(vec, dim=0).unsqueeze(0)

                result = self._hive_mind.think_together_distributed(
                    local_thought=thought,
                    local_agent_id=getattr(
                        self._hive_mind, '_local_agent_id', 'local'),
                    timeout_ms=timeout_ms,
                    **({"owner_id": user_id} if user_id else {}),
                )
                with self._lock:
                    self._stats['total_hivemind_queries'] += 1
                return {
                    'thought': result.text if hasattr(result, 'text')
                    else str(result)
                }
            except Exception as e:
                logger.debug(f"In-process hivemind failed: {e}")

        # PRIVACY: Redact secrets from query before sending to shared hivemind
        try:
            from security.secret_redactor import redact_secrets
            query_text, _ = redact_secrets(query_text)
        except ImportError:
            pass

        # CONSENT GATE: external HTTP requires consent
        if self._is_external_target() and user_id:
            if not self._has_cloud_consent(user_id):
                return None

        # PeerLink path — collect thoughts from connected peers directly
        try:
            from core.peer_link.link_manager import get_link_manager
            mgr = get_link_manager()
            responses = mgr.collect('hivemind', timeout_ms=timeout_ms)
            if responses:
                with self._lock:
                    self._stats['total_hivemind_queries'] += 1
                # Each peer response is a legitimate (query -> answer) pair.
                # Feed them as training experiences so the local agent
                # learns from cross-peer knowledge, not just transient
                # query-response display. Deduplicated by peer_id +
                # response-hash in record_interaction's batcher.
                try:
                    for idx, peer_resp in enumerate(responses):
                        if not isinstance(peer_resp, dict):
                            continue
                        peer_id = (peer_resp.get('peer_id')
                                   or peer_resp.get('node_id')
                                   or f'peer_{idx}')
                        peer_text = (peer_resp.get('thought')
                                     or peer_resp.get('response')
                                     or peer_resp.get('text'))
                        if not peer_text:
                            continue
                        self.record_interaction(
                            user_id=user_id or 'hive',
                            prompt_id=f'peerlink_{peer_id}',
                            prompt=query_text[:2000],
                            response=str(peer_text)[:5000],
                            model_id=f'peerlink:{peer_id}',
                            latency_ms=float(timeout_ms),
                            node_id=peer_id,
                        )
                    with self._lock:
                        self._stats.setdefault('peerlink_responses_trained', 0)
                        self._stats['peerlink_responses_trained'] += len(responses)
                except Exception as train_err:
                    logger.debug(
                        f"[Bridge] Failed to feed PeerLink responses to training: {train_err}"
                    )
                return {
                    'thoughts': responses,
                    'source': 'peerlink',
                    'peer_count': len(responses),
                }
        except Exception:
            pass

        # HTTP fallback
        if self._http_disabled or self._cb_is_open():
            return None

        try:
            resp = pooled_post(
                f'{self._api_url}/v1/hivemind/think',
                json={'query': query_text[:2000], 'timeout_ms': timeout_ms},
                timeout=max(30, timeout_ms / 1000 + 5),
            )
            self._cb_record_success()
            if resp.status_code == 200:
                with self._lock:
                    self._stats['total_hivemind_queries'] += 1
                return resp.json()
        except requests.RequestException as e:
            self._cb_record_failure()
            logger.debug(f"HiveMind query failed: {e}")
        return None
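
    # Illustrative query (hedged — `bridge`, the query text, and the user id
    # are assumptions; the result keys come from the method above):
    #
    #     result = bridge.query_hivemind(
    #         'What changed in the workspace map?',
    #         timeout_ms=1500, user_id='u-42',
    #     )
    #     if result and result.get('source') == 'peerlink':
    #         print(f"{result['peer_count']} peers answered")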

    # ─── Learning stats ──────────────────────────────────────────────

    def get_learning_stats(self) -> dict:
        """Get merged learning + hivemind + bridge stats.

        In-process: calls provider.get_stats() + hive_mind.get_stats().
        HTTP: GET /v1/stats + GET /v1/hivemind/stats.
        Returns combined dict for dashboard consumption.
        """
        result = {'learning': {}, 'hivemind': {}, 'bridge': self.get_stats()}

        if self._in_process:
            if self._provider:
                try:
                    result['learning'] = self._provider.get_stats()
                except Exception:
                    pass
            if self._hive_mind:
                try:
                    result['hivemind'] = self._hive_mind.get_stats()
                except Exception:
                    pass
            return result

        # HTTP fallback — guarded by circuit breaker to avoid retry storms
        if self._http_disabled or self._circuit_breaker.is_open():
            return result
        try:
            resp = pooled_get(
                f'{self._api_url}/v1/stats', timeout=self._timeout_default)
            if resp.status_code == 200:
                result['learning'] = resp.json()
                self._circuit_breaker.record_success()
            else:
                self._circuit_breaker.record_failure()
        except requests.RequestException:
            self._circuit_breaker.record_failure()

        try:
            resp = pooled_get(
                f'{self._api_url}/v1/hivemind/stats', timeout=self._timeout_default)
            if resp.status_code == 200:
                result['hivemind'] = resp.json()
                self._circuit_breaker.record_success()
            else:
                self._circuit_breaker.record_failure()
        except requests.RequestException:
            self._circuit_breaker.record_failure()

        return result
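
    # The merged shape returned above, sketched for dashboard consumers.
    # Top-level keys come from this method; the nested contents depend on
    # the provider / hive_mind / HTTP responses and are not fixed here:
    #
    #     {
    #         'learning': {...},   # provider.get_stats() or GET /v1/stats
    #         'hivemind': {...},   # hive_mind.get_stats() or GET /v1/hivemind/stats
    #         'bridge':   {...},   # get_stats(): queue_size, api_url, counters
    #     }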

    def get_hivemind_agents(self) -> list:
        """Get list of connected HiveMind agents.

        In-process: calls hive_mind.get_all_agents().
        HTTP: GET /v1/hivemind/agents.
        Returns agent specs with capabilities, modality, latent dimensions.
        """
        if self._in_process and self._hive_mind:
            try:
                return self._hive_mind.get_all_agents()
            except Exception:
                pass

        # HTTP fallback — guarded by circuit breaker
        if self._http_disabled or self._circuit_breaker.is_open():
            return []
        try:
            resp = pooled_get(
                f'{self._api_url}/v1/hivemind/agents', timeout=self._timeout_default)
            if resp.status_code == 200:
                data = resp.json()
                self._circuit_breaker.record_success()
                return data.get('agents', data) if isinstance(data, dict) else data
            self._circuit_breaker.record_failure()
        except requests.RequestException:
            self._circuit_breaker.record_failure()
        return []

    # ─── RALT skill distribution ─────────────────────────────────────

    def distribute_skill_packet(self, ralt_packet: dict,
                                node_id: str,
                                target_nodes: list = None) -> dict:
        """Distribute a learned skill (RALT packet) across hive nodes.

        Gossip broadcasts a notification that a skill is available.
        Each receiving peer triggers RALT ingestion through their own
        local HevolveAI instance.

        GUARDRAILS:
        1. CCT gate — requires 'skill_distribution' capability
        2. WorldModelSafetyBounds.gate_ralt_export() — rate limit + witness
        3. ConstructiveFilter.check_output() — constructiveness check
        """
        # CCT access gate (hard block: cannot distribute without contribution)
        if not self._check_cct_access('skill_distribution'):
            logger.info("Skill distribution blocked: no CCT with "
                        "skill_distribution capability")
            with self._lock:
                self._stats['total_skills_blocked'] += 1
            return {'success': False, 'reason': 'no_cct_skill_distribution'}

        try:
            from security.hive_guardrails import WorldModelSafetyBounds
            passed, reason = WorldModelSafetyBounds.gate_ralt_export(
                ralt_packet, node_id)
            if not passed:
                with self._lock:
                    self._stats['total_skills_blocked'] += 1
                return {'success': False, 'reason': reason}
        except ImportError:
            pass

        try:
            from security.hive_guardrails import ConstructiveFilter
            desc = ralt_packet.get('description', '')
            passed, reason = ConstructiveFilter.check_output(desc)
            if not passed:
                with self._lock:
                    self._stats['total_skills_blocked'] += 1
                return {'success': False, 'reason': reason}
        except ImportError:
            pass

        # Notify peers via gossip that a skill is available.
        # Each peer ingests via their local HevolveAI RALT receiver.
        try:
            from integrations.social.peer_discovery import gossip
            gossip.broadcast({
                'type': 'ralt_skill_available',
                'packet_summary': {
                    'task_id': ralt_packet.get('task_id'),
                    'description': ralt_packet.get('description', '')[:200],
                    'complexity': ralt_packet.get('complexity', 'unknown'),
                },
                'source_node': node_id,
                'source_api_url': self._api_url,
                'timestamp': time.time(),
            }, targets=target_nodes)
            with self._lock:
                self._stats['total_skills_distributed'] += 1
            return {'success': True}
        except Exception as e:
            logger.debug(f"RALT distribution skipped: {e}")
            return {'success': False, 'reason': str(e)}
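
    # Illustrative outbound distribution (hedged — node ids are hypothetical,
    # and any packet fields beyond task_id/description/complexity are whatever
    # the local RALT exporter produced):
    #
    #     bridge.distribute_skill_packet(
    #         ralt_packet={'task_id': 'grasp-v2',
    #                      'description': 'two-finger grasp refinement',
    #                      'complexity': 'medium'},
    #         node_id='node-a1',
    #     )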

    # ─── RALT skill ingestion (inbound, peer → local) ────────────────
    #
    # Complements distribute_skill_packet(): that method is outbound
    # (local HevolveAI learned a skill → notify peers). The two methods
    # below are the inbound path (peer learned a skill → pull & install
    # into local HevolveAI). Receiver side of the RALT hive loop.
    #
    # Wiring:
    #   gossip peer_broadcast endpoint (discovery.py)
    #     -> handle_ralt_skill_notification(msg)   ← notification;
    #        pulls full packet from source_api_url
    #     -> ingest_skill_packet(packet_dict)      ← install;
    #        in-process: agent.import_skill(RALTPacket.from_wire(dict))
    #        HTTP: POST /v1/ralt/skills/install

    def handle_ralt_skill_notification(self, msg: dict) -> dict:
        """Pull a newly-announced skill from its source node and install it.

        Invoked by the /api/social/peers/broadcast gossip receiver when
        msg['type'] == 'ralt_skill_available'. The notification carries
        only a summary; we fetch the full packet from the sender's
        /v1/ralt/skills/export/{task_id} and hand it to
        ingest_skill_packet.

        Failure modes are returned as dicts rather than raised so the
        gossip dispatcher can reply with a structured diagnostic to the
        sender without crashing the blueprint.
        """
        summary = msg.get('packet_summary') or {}
        task_id = summary.get('task_id')
        source_url = (msg.get('source_api_url') or '').rstrip('/')
        source_node = msg.get('source_node', 'unknown')

        if not task_id or not source_url:
            return {'success': False,
                    'reason': 'missing_task_id_or_source_api_url'}

        # Echo prevention: don't ingest skills we ourselves broadcast.
        # _node_id on the bridge is currently unset; fall back to the
        # gossip singleton which holds the canonical local node_id.
        try:
            from integrations.social.peer_discovery import gossip
            local_node = self._node_id or getattr(gossip, 'node_id', None)
        except Exception:
            local_node = self._node_id
        if local_node and source_node == local_node:
            return {'success': False, 'reason': 'echo_skip'}

        # Pull the packet. Use circuit breaker so a repeatedly failing
        # source can't stall the gossip dispatcher.
        if self._circuit_breaker.is_open():
            return {'success': False, 'reason': 'circuit_open'}
        try:
            resp = pooled_get(
                f"{source_url}/v1/ralt/skills/export/{task_id}",
                timeout=self._timeout_default)
        except requests.RequestException as e:
            self._circuit_breaker.record_failure()
            return {'success': False, 'reason': f'fetch_failed: {e}'}

        if resp.status_code != 200:
            self._circuit_breaker.record_failure()
            return {'success': False,
                    'reason': f'export_http_{resp.status_code}'}
        try:
            packet_dict = resp.json()
        except ValueError as e:
            self._circuit_breaker.record_failure()
            return {'success': False,
                    'reason': f'export_not_json: {e}'}
        self._circuit_breaker.record_success()

        return self.ingest_skill_packet(packet_dict, source_node=source_node)
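
    # The gossip message this handler expects — exactly the payload built by
    # distribute_skill_packet above (the concrete values here are hypothetical):
    #
    #     bridge.handle_ralt_skill_notification({
    #         'type': 'ralt_skill_available',
    #         'packet_summary': {'task_id': 'grasp-v2',
    #                            'description': 'two-finger grasp refinement',
    #                            'complexity': 'medium'},
    #         'source_node': 'node-a1',
    #         'source_api_url': 'http://10.0.0.5:8000',
    #         'timestamp': 1767225600.0,
    #     })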

    def ingest_skill_packet(self, packet_dict: dict,
                            source_node: str = '') -> dict:
        """Install a wire-format RALT packet into the local HevolveAI.

        In-process: reconstruct RALTPacket and call embodied_agent
        .import_skill() directly (zero HTTP overhead, same code path as
        the local learning loop at integrated_realtime_agent.py:1331).

        HTTP: POST the dict to /v1/ralt/skills/install on the local
        HevolveAI API server.

        Guardrails:
        1. CCT gate — requires a token with 'skill_distribution'
           capability on the INGEST side too (prevents unauthenticated
           peers from pushing skills into our local model).
        2. WorldModelSafetyBounds.gate_ralt_ingest (if available) —
           mirror of gate_ralt_export used by distribute_skill_packet.
        """
        # Symmetric CCT gate: we require skill_distribution to ingest
        # because ingestion mutates the local world model. Mirrors the
        # check at the top of distribute_skill_packet above.
        if not self._check_cct_access('skill_distribution'):
            logger.info("Skill ingest blocked: no CCT with "
                        "skill_distribution capability")
            with self._lock:
                self._stats['total_skills_blocked'] += 1
            return {'success': False, 'reason': 'no_cct_skill_distribution'}

        # Optional ingest-side guardrail (inbound analog of
        # gate_ralt_export). Skip gracefully if not implemented.
        try:
            from security.hive_guardrails import WorldModelSafetyBounds
            if hasattr(WorldModelSafetyBounds, 'gate_ralt_ingest'):
                passed, reason = WorldModelSafetyBounds.gate_ralt_ingest(
                    packet_dict, source_node)
                if not passed:
                    with self._lock:
                        self._stats['total_skills_blocked'] += 1
                    return {'success': False, 'reason': reason}
        except ImportError:
            pass
        except Exception as e:
            logger.debug(f"gate_ralt_ingest skipped: {e}")

        # In-process fast path: reuse the exact same call that the
        # local learning loop uses after self-learning (see
        # integrated_realtime_agent.py:1331). Importing RALTPacket
        # lazily so this module works when HevolveAI isn't installed.
        if self._in_process and self._provider is not None:
            try:
                agent = getattr(self._provider, 'embodied_agent', None)
                if agent is not None and hasattr(agent, 'import_skill'):
                    from hevolveai.embodied_ai.learning.latent_transfer import (
                        RALTPacket)
                    pkt = RALTPacket.from_wire(packet_dict)
                    ok = bool(agent.import_skill(pkt, verify_topology=True))
                    with self._lock:
                        self._stats.setdefault('total_skills_received', 0)
                        self._stats.setdefault('total_skills_installed', 0)
                        self._stats['total_skills_received'] += 1
                        if ok:
                            self._stats['total_skills_installed'] += 1
                    return {
                        'success': ok,
                        'mode': 'in_process',
                        'task_id': pkt.task_id,
                        'source_id': pkt.source_id,
                    }
            except Exception as e:
                # Fall through to HTTP — in-process path failed but the
                # HTTP API may still be reachable in a hybrid setup.
                logger.debug(
                    f"[WorldModelBridge] in-process skill ingest failed, "
                    f"falling back to HTTP: {e}")

        # HTTP fallback: POST to local HevolveAI /v1/ralt/skills/install
        if self._http_disabled or self._circuit_breaker.is_open():
            return {'success': False,
                    'reason': 'http_disabled_or_circuit_open'}
        try:
            resp = pooled_post(
                f'{self._api_url}/v1/ralt/skills/install',
                json=packet_dict,
                timeout=self._timeout_default)
        except requests.RequestException as e:
            self._circuit_breaker.record_failure()
            return {'success': False, 'reason': f'install_fetch_failed: {e}'}

        if resp.status_code != 200:
            self._circuit_breaker.record_failure()
            return {'success': False,
                    'reason': f'install_http_{resp.status_code}'}
        self._circuit_breaker.record_success()
        try:
            body = resp.json()
        except ValueError:
            body = {'success': False, 'reason': 'install_not_json'}
        with self._lock:
            self._stats.setdefault('total_skills_received', 0)
            self._stats.setdefault('total_skills_installed', 0)
            self._stats['total_skills_received'] += 1
            if body.get('success'):
                self._stats['total_skills_installed'] += 1
        body.setdefault('mode', 'http')
        return body

    # ─── Health ──────────────────────────────────────────────────────

    def check_health(self) -> dict:
        """Check learning pipeline health.

        In-process: returns healthy if provider is available.
        HTTP: GET /health on HevolveAI API.
        """
        if self._in_process and self._provider:
            return {
                'healthy': True,
                'learning_active': True,
                'mode': 'in_process',
                'node_tier': self._node_tier,
            }

        # HTTP fallback
        if self._http_disabled:
            return {
                'healthy': False,
                'learning_active': False,
                'mode': 'disabled',
                'node_tier': self._node_tier,
            }
        try:
            resp = pooled_get(
                f'{self._api_url}/health', timeout=5)
            if resp.status_code == 200:
                data = resp.json() if resp.headers.get(
                    'content-type', '').startswith('application/json') else {}
                return {
                    'healthy': True,
                    'learning_active': True,
                    'mode': 'http',
                    'node_tier': self._node_tier,
                    'details': data,
                }
            return {
                'healthy': False,
                'learning_active': False,
                'mode': 'http',
                'node_tier': self._node_tier,
                'details': {'status_code': resp.status_code},
            }
        except requests.RequestException as e:
            return {
                'healthy': False,
                'learning_active': False,
                'mode': 'http',
                'node_tier': self._node_tier,
                'details': {'error': str(e)},
            }

    def get_stats(self) -> dict:
        """Get bridge-level statistics."""
        with self._lock:
            return {
                'queue_size': len(self._experience_queue),
                'api_url': self._api_url,
                'in_process': self._in_process,
                **self._stats,
            }

    # ─── Embodied interaction (same latent space) ────────────────────

    def send_action(self, action: dict) -> bool:
        """Send a motor/actuator command to HevolveAI's world model.

        The world model operates in one latent space — text, sensors,
        motors are all representations of the same world. Actions are
        predictions that the world model tests against reality.

        Args:
            action: Dict with type, target, params, timestamp.
                e.g. {'type': 'motor_velocity', 'target': 'left_wheel',
                      'params': {'velocity': 0.5}, 'timestamp': ...}

        Returns:
            True if action was sent successfully.
        """
        # Safety check before forwarding
        try:
            from integrations.robotics.safety_monitor import get_safety_monitor
            monitor = get_safety_monitor()
            if monitor.is_estopped:
                logger.warning("[WorldModelBridge] Action blocked: E-stop active")
                return False
            # Check position if action has spatial params
            position = action.get('params', {})
            if any(k in position for k in ('x', 'y', 'z')):
                if not monitor.check_position_safe(position):
                    logger.warning("[WorldModelBridge] Action blocked: outside workspace")
                    return False
        except ImportError:
            pass

        # In-process: actions route through submit_correction() for error
        # propagation when outcomes differ from predictions. No separate
        # action_stream module — the correction path handles backprop.

        # HTTP fallback
        if self._http_disabled or self._cb_is_open():
            return False

        try:
            resp = pooled_post(
                f'{self._api_url}/v1/actions',
                json=action,
                timeout=self._timeout_default,
            )
            if resp.status_code in (200, 201):
                self._cb_record_success()
                with self._lock:
                    self._stats['total_actions_sent'] = self._stats.get(
                        'total_actions_sent', 0) + 1
                return True
            self._cb_record_failure()
            return False
        except requests.RequestException:
            self._cb_record_failure()
            return False
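
    # Illustrative actuator command, matching the example in the docstring
    # above (hedged — the target and velocity values are hypothetical; `time`
    # is the module-level import):
    #
    #     bridge.send_action({
    #         'type': 'motor_velocity',
    #         'target': 'left_wheel',
    #         'params': {'velocity': 0.5},
    #         'timestamp': time.time(),
    #     })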

1501 def ingest_sensor_batch(self, readings: list) -> int: 

1502 """Feed sensor data to HevolveAI's world model for learning. 

1503 

1504 The world model's latent space includes physical sensor state. 

1505 This method batches sensor readings and flushes them to HevolveAI 

1506 for continuous learning — the same way text experiences are flushed. 

1507 

1508 Args: 

1509 readings: List of SensorReading.to_dict() dicts. 

1510 

1511 Returns: 

1512 Number of readings successfully ingested. 

1513 """ 

1514 if not readings: 

1515 return 0 

1516 

1517 # In-process: sensor data routes through submit_sensor_frame() → 

1518 # embodied.step(sensor, train=True). SensorInput dataclass already 

1519 # carries type/modality metadata. No separate sensor_ingest module. 

1520 

1521 # HTTP fallback 

1522 if self._http_disabled or self._cb_is_open(): 

1523 return 0 

1524 

1525 try: 

1526 resp = pooled_post( 

1527 f'{self._api_url}/v1/sensors/batch', 

1528 json={'readings': readings}, 

1529 timeout=self._timeout_flush, 

1530 ) 

1531 if resp.status_code in (200, 201): 

1532 self._cb_record_success() 

1533 with self._lock: 

1534 self._stats['total_sensor_readings'] = self._stats.get( 

1535 'total_sensor_readings', 0) + len(readings) 

1536 return len(readings) 

1537 self._cb_record_failure() 

1538 return 0 

1539 except requests.RequestException: 

1540 self._cb_record_failure() 

1541 return 0 

1542 
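# Usage sketch (illustrative): flushing a caller-side buffer of readings.
# 'imu_buffer' and its contents are hypothetical; only the to_dict()
# output shape matters to this method.
#
#     readings = [r.to_dict() for r in imu_buffer]
#     ingested = bridge.ingest_sensor_batch(readings)
#     if ingested == len(readings):
#         imu_buffer.clear()  # only drop readings that actually landed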

1543 def get_learning_feedback(self) -> Optional[Dict]: 

1544 """Poll HevolveAI for real-time learning feedback. 

1545 

1546 The world model continuously learns from sensor+action data. 

1547 This method retrieves corrections/predictions — trajectory 

1548 adjustments, new obstacle awareness, learned patterns. 

1549 

1550 Returns: 

1551 Feedback dict from HevolveAI, or None if unavailable. 

1552 """ 

1553 # In-process: feedback is returned inline by step() as InferenceStats 

1554 # (attention weights, epistemic uncertainty, kernel corrections). 

1555 # The provider exposes this via get_stats(). No separate module. 

1556 if self._in_process and self._provider: 

1557 try: 

1558 stats = self._provider.get_stats() 

1559 return stats.get('last_feedback') or stats 

1560 except Exception: 

1561 pass 

1562 

1563 # HTTP fallback 

1564 if self._http_disabled or self._cb_is_open(): 

1565 return None 

1566 

1567 try: 

1568 resp = pooled_get( 

1569 f'{self._api_url}/v1/feedback/latest', 

1570 timeout=self._timeout_default, 

1571 ) 

1572 if resp.status_code == 200: 

1573 self._cb_record_success() 

1574 return resp.json() 

1575 self._cb_record_failure() 

1576 return None 

1577 except requests.RequestException: 

1578 self._cb_record_failure() 

1579 return None 

1580 
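# Polling sketch (illustrative): a control loop consuming feedback.
# 'trajectory_adjustment' and 'planner' are assumptions; the actual
# feedback schema is defined by HevolveAI, not this bridge.
#
#     feedback = bridge.get_learning_feedback()
#     if feedback and feedback.get('trajectory_adjustment'):
#         planner.apply(feedback['trajectory_adjustment'])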

1581 def record_embodied_interaction( 

1582 self, action: dict, sensor_context: dict, outcome: dict, 

1583 ): 

1584 """Record an action+sensor+outcome triple for recipe learning. 

1585 

1586 This extends record_interaction() for physical actions. 

1587 The triple becomes CREATE mode training data: what action was 

1588 taken, what sensor state surrounded it, and what the outcome was. 

1589 

1590 Stored as experiences in the same queue that text interactions use. 

1591 Same latent space — the world model doesn't distinguish modalities. 

1592 """ 

1593 experience = { 

1594 'type': 'embodied_interaction', 

1595 'action': action, 

1596 'sensor_context': sensor_context, 

1597 'outcome': outcome, 

1598 'timestamp': action.get('timestamp', time.time()), 

1599 'node_tier': self._node_tier, 

1600 } 

1601 self._experience_queue.append(experience) 

1602 with self._lock: 

1603 self._stats['total_recorded'] = self._stats.get('total_recorded', 0) + 1 

1604 
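# Usage sketch (illustrative): one grasp attempt recorded as a triple.
# All field values below are made up for the example.
#
#     bridge.record_embodied_interaction(
#         action={'type': 'gripper_close', 'target': 'gripper',
#                 'params': {'force': 0.3}, 'timestamp': time.time()},
#         sensor_context={'force_feedback': 0.28, 'object_detected': True},
#         outcome={'grasp_success': True, 'slip': False},
#     )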

1605 def emergency_stop(self) -> bool: 

1606 """Send zero-velocity to all actuators via HevolveAI. 

1607 

1608 This is the bridge-level emergency stop — it tells HevolveAI to 

1609 immediately halt all physical outputs. 

1610 """ 

1611 estop_action = { 

1612 'type': 'emergency_stop', 

1613 'target': '*', 

1614 'params': {'velocity': 0, 'force': 0}, 

1615 } 

1616 

1617 # Both modes route through the same HTTP estop endpoint. 

1618 # Emergency stop must be reliable — no in-process shortcut that 

1619 # could silently fail on ImportError. 

1620 try: 

1621 resp = pooled_post( 

1622 f'{self._api_url}/v1/actions/estop', 

1623 json=estop_action, 

1624 timeout=3, 

1625 ) 

1626 return resp.status_code in (200, 201) 

1627 except requests.RequestException: 

1628 return False 

1629 
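# Watchdog sketch (illustrative): mirroring a local e-stop to the bridge.
# Uses the same safety_monitor import as send_action() above.
#
#     from integrations.robotics.safety_monitor import get_safety_monitor
#     if get_safety_monitor().is_estopped:
#         if not bridge.emergency_stop():
#             logger.critical("bridge e-stop failed; cut actuator power locally")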

1630 # ─── Sensor frame forwarding to HevolveAI ──────────────────── 

1631 

1632 def submit_sensor_frame( 

1633 self, user_id: str, frame_bytes: bytes, 

1634 channel: str = 'camera', reality_signature: float = 1.0, 

1635 ): 

1636 """Forward raw sensor frame to HevolveAI for visual encoding + learning. 

1637 

1638 HARTOS captures camera/screen frames locally. This method forwards 

1639 them to HevolveAI's /v1/sensor/ingest endpoint so the embodied AI 

1640 can: encode via Qwen -> predict next state -> compute error -> learn. 

1641 

1642 Called only on scene change (adaptive sampling) to avoid overwhelming 

1643 the learning pipeline. 

1644 

1645 In-process mode: encodes the frame and routes it through embodied.step(). 

1646 HTTP mode: POST /v1/sensor/ingest with base64 data. 

1647 """ 

1648 import base64 

1649 

1650 source = 'camera' if channel == 'camera' else 'screen' 

1651 sig = reality_signature if channel == 'camera' else 0.0 

1652 

1653 if self._in_process and self._provider: 

1654 # In-process: encode frame and feed through learning pipeline 

1655 try: 

1656 import torch 

1657 embodied = getattr(self._provider, 'embodied_agent', None) 

1658 encoder = getattr(embodied, 'encoder', None) if embodied else None 

1659 if encoder is None: 

1660 encoder = getattr(self._provider, 'qwen_encoder', None) 

1661 if encoder is not None: 

1662 from PIL import Image 

1663 import io 

1664 img = Image.open(io.BytesIO(frame_bytes)).convert('RGB') 

1665 encoding = encoder.encode_image(img) 

1666 

1667 # Route through embodied agent step() — same path as /v1/sensor/ingest 

1668 if embodied is not None and hasattr(embodied, 'step'): 

1669 from hevolveai.embodied_ai.types.sensor_input import SensorInput 

1670 sensor = SensorInput( 

1671 data=encoding if isinstance(encoding, torch.Tensor) 

1672 else torch.tensor(encoding).float(), 

1673 input_type='encoded_features', 

1674 modality='vision', 

1675 metadata={'reality_signature': sig, 'source': source}, 

1676 ) 

1677 embodied.step(sensor) 

1678 return 

1679 except Exception as e: 

1680 logger.debug(f"[WorldModelBridge] In-process sensor frame failed: {e}") 

1681 

1682 # HTTP fallback 

1683 if self._http_disabled or self._cb_is_open(): 

1684 return 

1685 

1686 try: 

1687 data_b64 = base64.b64encode(frame_bytes).decode('ascii') 

1688 resp = pooled_post( 

1689 f'{self._api_url}/v1/sensor/ingest', 

1690 json={ 

1691 'modality': 'vision', 

1692 'source': source, 

1693 'data': data_b64, 

1694 'format': 'jpeg', 

1695 'session_id': f'{user_id}_{source}', 

1696 'reality_signature': sig, 

1697 }, 

1698 timeout=self._timeout_default, 

1699 ) 

1700 if resp.status_code == 200: 

1701 self._cb_record_success() 

1702 else: 

1703 self._cb_record_failure() 

1704 except requests.RequestException: 

1705 self._cb_record_failure() 

1706 
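# Caller sketch (illustrative): the adaptive-sampling loop the docstring
# describes. capture_jpeg() and frame_changed() are hypothetical helpers
# on the HARTOS capture side.
#
#     jpeg = camera.capture_jpeg()
#     if frame_changed(jpeg, prev_jpeg):   # forward only on scene change
#         bridge.submit_sensor_frame('user-1', jpeg, channel='camera',
#                                    reality_signature=1.0)
#     prev_jpeg = jpeg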

1707 def submit_output_feedback( 

1708 self, output_modality: str, status: str, context: str, 

1709 model_used: str = 'unknown', error_message: Optional[str] = None, 

1710 generation_time_seconds: float = 0.0, user_id: str = 'default', 

1711 generated_data: Optional[bytes] = None, generated_format: Optional[str] = None, 

1712 ): 

1713 """Report output modality generation result to HevolveAI for learning. 

1714 

1715 Routes through existing endpoints (no redundant API surface): 

1716 - Success with data: /v1/sensor/ingest (generated output = observation) 

1717 - Error/rejection: /v1/corrections (correction signal) 

1718 - Success without data: record_interaction (text experience) 

1719 

1720 Generated outputs are just observations with reality_signature=0.0. 

1721 Errors are corrections that teach modality routing. 

1722 """ 

1723 import base64 

1724 

1725 # Success with generated data: treat as sensor observation 

1726 if status == 'completed' and generated_data is not None: 

1727 # Map modality format to sensor ingest format 

1728 modality_map = { 

1729 'image': 'vision', 

1730 'audio_speech': 'audio', 

1731 'audio_music': 'audio', 

1732 'video': 'vision', 

1733 'video_with_audio': 'multimodal', 

1734 } 

1735 sensor_modality = modality_map.get(output_modality, 'multimodal') 

1736 fmt = generated_format or 'jpeg' 

1737 

1738 if self._http_disabled or self._cb_is_open(): 

1739 return 

1740 

1741 try: 

1742 data_b64 = base64.b64encode(generated_data).decode('ascii') 

1743 resp = pooled_post( 

1744 f'{self._api_url}/v1/sensor/ingest', 

1745 json={ 

1746 'modality': sensor_modality, 

1747 'source': f'generated_{output_modality}', 

1748 'data': data_b64, 

1749 'format': fmt, 

1750 'session_id': f'{user_id}_output_{output_modality}', 

1751 'reality_signature': 0.0, 

1752 'text': context[:500], 

1753 }, 

1754 timeout=self._timeout_default, 

1755 ) 

1756 if resp.status_code == 200: 

1757 self._cb_record_success() 

1758 else: 

1759 self._cb_record_failure() 

1760 except requests.RequestException: 

1761 self._cb_record_failure() 

1762 return 

1763 

1764 # Error/rejection: route as correction. 

1765 # C1 hive-memory-on-high-error: before submitting the correction, 

1766 # ask the hive if peers have encountered this modality+error before. 

1767 # Any collective hint is attached to the correction as explanation 

1768 # so the KernelContinualLearner / LoRA learner gets richer context 

1769 # than the raw error string alone. This closes the loop between 

1770 # local failures and the shared error-memory store. 

1771 if status in ('error', 'user_rejected') and error_message: 

1772 hive_hint = None 

1773 try: 

1774 query_text = ( 

1775 f'output_failure modality={output_modality} ' 

1776 f'error={error_message[:200]} context={context[:200]}' 

1777 ) 

1778 hive_result = self.query_hivemind( 

1779 query_text, user_id=user_id, timeout_ms=200 

1780 ) 

1781 if hive_result: 

1782 if isinstance(hive_result, dict): 

1783 hive_hint = ( 

1784 hive_result.get('thought') 

1785 or hive_result.get('thoughts') 

1786 or hive_result.get('source') 

1787 ) 

1788 else: 

1789 hive_hint = str(hive_result)[:400] 

1790 if hive_hint: 

1791 with self._lock: 

1792 self._stats.setdefault( 

1793 'hive_hints_on_output_error', 0) 

1794 self._stats['hive_hints_on_output_error'] += 1 

1795 except Exception as hive_err: 

1796 logger.debug( 

1797 f"[Bridge] hive hint query failed for output error: {hive_err}" 

1798 ) 

1799 hive_hint = None 

1800 

1801 explanation = None 

1802 if hive_hint: 

1803 explanation = f'hive_hint: {str(hive_hint)[:400]}' 

1804 

1805 self.submit_correction( 

1806 original_response=f'[{output_modality}] {context[:500]}', 

1807 corrected_response=f'{output_modality} generation failed: {error_message}', 

1808 expert_id=user_id, 

1809 confidence=0.8 if status == 'user_rejected' else 0.5, 

1810 explanation=explanation, 

1811 context={'output_modality': output_modality, 

1812 'status': status, 

1813 'user_id': user_id, 

1814 'hive_hint_used': bool(hive_hint)}, 

1815 ) 

1816 return 

1817 

1818 # Success without data or pending: record as text interaction 

1819 self.record_interaction( 

1820 user_id=user_id, 

1821 prompt_id=f'output_{output_modality}', 

1822 prompt=context[:2000], 

1823 response=f'[{output_modality} {status}] generated by {model_used} in {generation_time_seconds:.1f}s', 

1824 model_id=model_used, 

1825 latency_ms=generation_time_seconds * 1000, 

1826 ) 

1827 
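# Usage sketch (illustrative): reporting both a success and a rejection.
# Model names, timings, and prompts are made up for the example.
#
#     bridge.submit_output_feedback(
#         output_modality='image', status='completed',
#         context='a red cube on a table', model_used='sdxl-local',
#         generation_time_seconds=4.2, user_id='user-1',
#         generated_data=png_bytes, generated_format='png',
#     )
#     bridge.submit_output_feedback(
#         output_modality='audio_speech', status='user_rejected',
#         context='read the weather report', model_used='tts-local',
#         error_message='voice sounded robotic', user_id='user-1',
#     )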

1828 # ─── Federation support ─────────────────────────────────────── 

1829 

1830 def extract_learning_delta(self) -> dict: 

1831 """Pull stats for federation delta extraction. 

1832 

1833 Used by FederatedAggregator.extract_local_delta() to build the 

1834 lightweight metric delta that gets broadcast to peers. 

1835 """ 

1836 stats = self.get_stats() 

1837 learning = self.get_learning_stats() 

1838 return { 

1839 'bridge': stats, 

1840 'learning': learning.get('learning', {}), 

1841 'hivemind': learning.get('hivemind', {}), 

1842 } 

1843 
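# Shape sketch (illustrative): the delta is metrics only, no weights.
#
#     {
#         'bridge':   {'queue_size': 3, 'total_recorded': 120, ...},
#         'learning': {...},   # get_learning_stats()['learning']
#         'hivemind': {...},   # get_learning_stats()['hivemind']
#     }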

1844 def apply_federation_update(self, aggregated: dict) -> bool: 

1845 """Store aggregated network-wide metrics locally and notify listeners. 

1846 

1847 Does NOT push to HevolveAI's parametric learning — federation metrics 

1848 are consumed by BenchmarkRegistry and dashboard, not the gradient 

1849 pipeline. But the EventBus emit lets any local subscriber 

1850 (dashboards, benchmark router, coding_agent benchmark_tracker) 

1851 react without polling _federation_aggregated. 

1852 

1853 Called by FederatedAggregator.apply_aggregated() (single entry point). 

1854 """ 

1855 self._federation_aggregated = aggregated 

1856 try: 

1857 from core.platform.events import emit_event 

1858 emit_event('learning.federation_update', { 

1859 'epoch': aggregated.get('epoch', 0), 

1860 'peer_count': aggregated.get('peer_count', 0), 

1861 'convergence': aggregated.get('convergence'), 

1862 }) 

1863 # Keep legacy topic for backward-compat with existing subscribers 

1864 emit_event('federation.aggregated', { 

1865 'epoch': aggregated.get('epoch', 0), 

1866 'peer_count': aggregated.get('peer_count', 0), 

1867 }) 

1868 except Exception as exc: 

1869 logger.debug(f"[WorldModelBridge] federation event emit failed: {exc}") 

1870 return True 

1871 
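# Caller sketch (illustrative): the payload FederatedAggregator passes in.
# Keys beyond epoch/peer_count/convergence are assumptions.
#
#     bridge.apply_federation_update({
#         'epoch': 12,
#         'peer_count': 5,
#         'convergence': 0.93,
#         'metrics': {...},   # aggregated network-wide metrics
#     })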

1872 

1873# ─── Module-level singleton ─── 

1874_bridge = None 

1875_bridge_lock = threading.Lock() 

1876 

1877 

1878def get_world_model_bridge() -> WorldModelBridge: 

1879 """Get or create the singleton WorldModelBridge.""" 

1880 global _bridge 

1881 if _bridge is None: 

1882 with _bridge_lock: 

1883 if _bridge is None: 

1884 _bridge = WorldModelBridge() 

1885 return _bridge
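# End-to-end sketch (illustrative): typical call sites for this module.
# Prompt/response strings and model id are made up for the example.
#
#     bridge = get_world_model_bridge()
#     bridge.record_interaction(
#         user_id='user-1', prompt_id='p-1',
#         prompt='plan a route to the charging dock',
#         response='route: corridor B, then dock',
#         model_id='qwen3-vl-2b', latency_ms=420,
#     )
#     print(bridge.get_stats()['queue_size'])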