Coverage for integrations / agentic_router.py: 37.7%

204 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Agentic Intent Router — detects when a user prompt requires multi-step 

3execution and routes from LangChain to autogen. 

4 

5Used by the LangChain Agentic_Router tool. When a prompt is classified as 

6agentic, this module: 

71. Uses the LLM to semantically match against 96 expert agents + user recipes 

82. Uses the LLM to generate a real execution plan (3-7 steps) 

93. Returns structured plan data for the frontend Plan Mode UI 

10 

11Intent classification itself is handled by the LLM deciding whether to call 

12the Agentic_Router tool — no keyword heuristics needed. 

13""" 

14 

15import json 

16import logging 

17import os 

18from typing import Dict, List, Optional 

19 

20logger = logging.getLogger(__name__) 

21 

22 

def find_matching_agent(prompt: str, prompts_dir: Optional[str] = None) -> Optional[Dict]:
    """Use LLM to semantically match prompt against available agents + recipes.

    Sends agent summaries to the LLM and asks it to select the best match.
    Falls back to None if LLM fails or no match found.

    Args:
        prompt: User task text to match against the catalog.
        prompts_dir: Optional directory of user recipe JSON files, forwarded
            to ``_build_agent_catalog``.

    Returns:
        Dict with ``agent_id``/``name``/``score``/``source``/``description``
        for the selected agent, or None when the catalog is empty, the LLM
        is unavailable/fails, or the LLM selects nothing.
    """
    agent_summaries = _build_agent_catalog(prompts_dir)
    if not agent_summaries:
        return None

    try:
        from core.safe_hartos_attr import safe_hartos_attr
        get_llm = safe_hartos_attr('get_llm')
        if get_llm is None:
            logger.info(
                "Agent matching unavailable: HARTOS get_llm not yet "
                "resolvable — returning None (caller falls back to "
                "default agent dispatch)."
            )
            return None
        llm = get_llm(temperature=0.1, max_tokens=300)
        logger.info(
            "Agent matching: prompt=%r catalog_size=%d",
            (prompt or '')[:80], len(agent_summaries),
        )

        # Cap the catalog at 50 entries and descriptions at 120 chars so
        # the selection prompt stays within the 300-token reply budget.
        catalog_text = "\n".join(
            f"- ID:{a['id']} | {a['name']} | {a['source']} | {a['description'][:120]}"
            for a in agent_summaries[:50]
        )

        result = llm.invoke(
            f"Given this user task, select the single best matching agent from the catalog below. "
            f"If no agent is a good semantic match, respond with just 'NONE'.\n"
            f"Otherwise respond with ONLY the agent ID.\n\n"
            f"User task: {prompt}\n\n"
            f"Agent catalog:\n{catalog_text}"
        )

        # LangChain message objects expose .content; plain strings don't.
        text = (result.content if hasattr(result, 'content') else str(result)).strip()

        if not text or text.upper() == 'NONE':
            return None

        # Substring match tolerates the LLM wrapping the ID in extra prose.
        for a in agent_summaries:
            if a['id'] in text:
                return {
                    'agent_id': a['id'],
                    'name': a['name'],
                    'score': 15,
                    'source': a['source'],
                    'description': a['description'],
                }
        return None
    except Exception as e:
        # Lazy %-formatting so the message is only built when emitted.
        logger.warning("LLM agent matching failed: %s", e)
        return None

80 

81 

def _build_agent_catalog(prompts_dir: Optional[str] = None) -> List[Dict]:
    """Build unified catalog of expert agents + user recipes for LLM matching.

    Aggregates four best-effort sources; each one degrades to "skipped"
    when its module is unavailable or errors:

    1. Expert agents from ``ExpertAgentRegistry`` (source='expert').
    2. Local recipe JSON files in *prompts_dir* (source='recipe').
    3. Hive recipes from the federated aggregator (source='hive').
    4. Google A2A registered agents (source='a2a').

    Returns:
        List of dicts, each with ``id``/``name``/``description``/``source``.
    """
    catalog = []

    try:
        from integrations.expert_agents.registry import ExpertAgentRegistry
        registry = ExpertAgentRegistry()
        for agent in registry.agents.values():
            catalog.append({
                'id': agent.agent_id,
                'name': agent.name,
                'description': agent.description,
                'source': 'expert',
            })
    except Exception:
        pass

    if prompts_dir and os.path.isdir(prompts_dir):
        try:
            for fname in os.listdir(prompts_dir):
                # Only plain recipe files; '_recipe'-suffixed files are skipped
                # (presumably generated variants — TODO confirm with author).
                if not fname.endswith('.json') or '_recipe' in fname:
                    continue
                try:
                    with open(os.path.join(prompts_dir, fname), encoding='utf-8') as f:
                        recipe = json.load(f)
                    catalog.append({
                        'id': fname.replace('.json', ''),
                        'name': recipe.get('name', fname),
                        'description': recipe.get('goal', ''),
                        'source': 'recipe',
                    })
                except Exception:
                    # One malformed recipe file must not kill the catalog.
                    continue
        except Exception:
            pass

    # Hive recipes — federated index from peer nodes
    try:
        from integrations.agent_engine.federated_aggregator import get_federated_aggregator
        agg = get_federated_aggregator()
        hive_index = agg.aggregate_recipes()
        if hive_index and isinstance(hive_index, dict):
            for rid, info in hive_index.items():
                if isinstance(info, dict) and info.get('name'):
                    catalog.append({
                        'id': rid,
                        'name': info['name'],
                        'description': info.get('description', ''),
                        'source': 'hive',
                    })
    except Exception:
        pass

    # Google A2A registered agents
    try:
        from integrations.google_a2a.dynamic_agent_registry import get_registry
        a2a_registry = get_registry()
        for agent in a2a_registry.list_agents():
            catalog.append({
                'id': agent.get('id', ''),
                'name': agent.get('name', ''),
                'description': agent.get('description', ''),
                'source': 'a2a',
            })
    except Exception:
        pass

    return catalog

150 

151 

def generate_plan_steps(prompt: str, matched_agent: Optional[Dict] = None) -> List[Dict]:
    """Produce an execution plan for *prompt* via the LLM.

    Asks the HARTOS LLM for a 3-7 step plan as a JSON array. Whenever the
    LLM is unreachable, errors, or returns something unparseable, a
    generic four-step fallback plan is returned instead, so callers
    always receive a usable plan.
    """
    try:
        from core.safe_hartos_attr import safe_hartos_attr
        get_llm = safe_hartos_attr('get_llm')
        if get_llm is None:
            logger.info(
                "Plan generation falling back to generic steps: HARTOS "
                "get_llm not yet resolvable (loader still init)."
            )
            raise RuntimeError("HARTOS get_llm unavailable")
        llm = get_llm(temperature=0.3, max_tokens=800)
        logger.info(
            "Plan generation: prompt=%r matched_agent=%s",
            (prompt or '')[:80],
            matched_agent.get('name') if matched_agent else None,
        )

        # Enrich the instruction with the matched expert, when one exists.
        agent_context = ""
        if matched_agent:
            agent_context = (
                f"\nMatched expert: {matched_agent['name']} — "
                f"{matched_agent.get('description', '')}"
            )

        result = llm.invoke(
            f"Generate a 3-7 step execution plan for this task. "
            f"Return ONLY a JSON array: "
            f'[{{"step_num": 1, "description": "...", "tool_or_agent": "..."}}]'
            f"{agent_context}\n\nTask: {prompt}"
        )

        raw = result.content if hasattr(result, 'content') else str(result)
        import re
        found = re.search(r'\[.*\]', raw, re.DOTALL)
        if found:
            parsed = json.loads(found.group())
            # Require a real multi-step plan; anything less falls through.
            if isinstance(parsed, list) and len(parsed) >= 2:
                return parsed
    except Exception as e:
        logger.warning(f"LLM plan generation failed, using fallback: {e}")

    fallback_agent = matched_agent['name'] if matched_agent else 'execution'
    return [
        {'step_num': 1, 'description': 'Analyze requirements and gather context', 'tool_or_agent': 'analysis'},
        {'step_num': 2, 'description': 'Plan approach and identify resources', 'tool_or_agent': 'planning'},
        {'step_num': 3, 'description': 'Execute the task', 'tool_or_agent': fallback_agent},
        {'step_num': 4, 'description': 'Deliver results and get feedback', 'tool_or_agent': 'delivery'},
    ]

199 

200 

def should_auto_create_agent(prompt: str, prompts_dir: str = None) -> bool:
    """Return True only if NO existing agent can handle this task.

    Acts as the gate against unnecessary agent creation: a fresh agent
    should only be spun up when the semantic matcher finds nothing
    suitable in the existing catalog.
    """
    return find_matching_agent(prompt, prompts_dir) is None

208 

209 

def build_agentic_plan(prompt: str, prompts_dir: str = None) -> Dict:
    """Full pipeline: match → plan. Returns structured plan dict.

    Runs the semantic matcher first, then plan generation (which uses
    the match, when any, to enrich its prompt). The returned dict is the
    structured payload the frontend Plan Mode UI consumes.
    """
    matched = find_matching_agent(prompt, prompts_dir)
    steps = generate_plan_steps(prompt, matched)

    return {
        'task_description': prompt,
        'steps': steps,
        'matched_agent_id': matched['agent_id'] if matched else None,
        'matched_agent_name': matched['name'] if matched else None,
        'matched_agent_source': matched['source'] if matched else None,
        # A concrete match raises confidence; generic plans stay 'medium'.
        'confidence': 'high' if matched else 'medium',
        'requires_new_agent': matched is None,
    }

224 

225 

# ─────────────────────────────────────────────────────────────────────
# Direct-named dispatch (Phase 7b — mention path)
# Plan reference: sunny-gliding-eich.md, Part B.4 + Part E.5.
#
# Used when a specific agent is named (`@solar-architect`) so we don't
# need to run the matcher — the caller already knows which agent should
# reply. The dispatch runs the full guardrail pipeline that any other
# agent action runs through, and posts the reply via
# CommentService.create so it picks up DLP, classification, fan-out,
# resonance, etc. — same path a human reply takes. No new privileged
# code path.
#
# Async by default: a daemon thread does the LLM call so the calling
# Flask request (post create / comment create) returns immediately.
# If anything fails — guardrail rejection, LLM unreachable, source
# missing — the worker logs and exits. The Mention + Notification rows
# persisted upstream survive, so the agent runtime can pick the work up
# asynchronously the next tick.
# ─────────────────────────────────────────────────────────────────────

# Cap agent reply chains to keep two misbehaving agents in a group
# from spinning up an unbounded thread chain. Reviewer-flagged M2.
# The depth is carried through the context dict so each recursive
# trigger increments and refuses past the ceiling.
_MAX_AGENT_DISPATCH_DEPTH: int = 2

251 

252 

def dispatch_to_agent(agent_id: str, prompt: str,
                      context: Optional[Dict] = None,
                      synchronous: bool = False) -> None:
    """Spawn an async worker to generate + post the agent's reply.

    Args:
        agent_id: User.id of the @-mentioned agent (User.user_type='agent').
        prompt: Inline content the agent should reason about — already
            includes surrounding source text from MentionService.
        context: dict with 'source_kind' ('post' | 'comment' | 'message'),
            'source_id', 'author_id', 'tenant_id', and optional
            '_dispatch_depth' (recursion counter, set automatically).
        synchronous: testing knob. When True, run inline so unit tests can
            assert post-conditions without thread coordination. Default
            False — production callers always want fire-and-forget.

    Recursion guard: if context['_dispatch_depth'] > _MAX_AGENT_DISPATCH_DEPTH
    the call is refused. This prevents two mention-each-other agents
    from spinning an unbounded chain (M2 — reviewer-flagged).

    Never raises — agent dispatch failure must never break the calling
    post/comment create path.
    """
    # Copy so we never mutate the caller's dict when bumping the depth.
    ctx = dict(context) if context else {}
    depth = int(ctx.get('_dispatch_depth') or 0)
    # `>=` so the cap names the max number of levels exactly. With
    # _MAX_AGENT_DISPATCH_DEPTH=2 we permit depth 0 → 1 (the call
    # itself) and depth 1 → 2 (one recursive level), refusing at 2.
    # Reviewer N-NEW-3.
    if depth >= _MAX_AGENT_DISPATCH_DEPTH:
        logger.info(
            "dispatch_to_agent: refusing recursive dispatch at depth=%d "
            "(ceiling=%d) for agent=%s",
            depth, _MAX_AGENT_DISPATCH_DEPTH, agent_id)
        return
    ctx['_dispatch_depth'] = depth + 1
    args = (agent_id, prompt, ctx)
    if synchronous:
        _dispatch_to_agent_worker(*args)
        return
    import threading
    threading.Thread(
        target=_dispatch_to_agent_worker,
        args=args,
        daemon=True,
        name=f'mention_dispatch_{(agent_id or "?")[:8]}',
    ).start()

300 

301 

def _dispatch_via_chat(agent_id: str, rewritten_prompt: str,
                       context: Dict) -> Optional[str]:
    """Reuse the canonical /chat endpoint instead of doing a raw
    ``llm.invoke``.

    /chat already runs the full agent runtime (autogen / langchain /
    draft-first routing, persona via
    ``agent_identity.build_identity_prompt``, dynamic per-agent tools,
    multi-turn history) that ``flask_integration._handle_message`` has
    used for 31-channel adapter inbound since 2026-01-31. Social-
    platform mention dispatch should use the same runtime — this
    helper lets it.

    Body shape mirrors what ``flask_integration._handle_message``
    builds at line 150 so /chat sees the same payload regardless of
    which caller dispatched. Missing ``agent_id``/``prompt_id``/recipe
    is the existing /chat fallback signal that routes to LangChain.

    Transport is HTTP loopback to localhost via ``pooled_post`` — the
    same connection-pooled transport ``flask_integration`` uses, so we
    don't open a parallel HTTP client. In every HARTOS deploy mode
    (flat / regional / central / Docker / ISO / pip-installed server)
    the HARTOS Flask app is reachable on its configured backend port,
    so loopback is universally available.

    Returns the agent reply text on success, or ``None`` on any failure
    (chat unreachable, non-200, malformed response, missing
    ``response`` field). Caller falls back to raw ``llm.invoke`` so
    agents still respond rather than going silent.

    Behind the ``HEVOLVE_FLAG_DISPATCH_VIA_CHAT`` env flag — default
    off, dormant until production verifies the /chat path for social-
    platform agent dispatches.
    """
    # No transport → no chat path; caller falls back to raw llm.invoke.
    try:
        from core.http_pool import pooled_post
    except Exception as e:
        logger.warning("dispatch_via_chat: core.http_pool unavailable (%s)", e)
        return None

    # Hard-coded fallbacks mirror core.constants for environments where
    # that module isn't importable yet.
    try:
        from core.constants import DEFAULT_USER_ID, DEFAULT_PROMPT_ID
    except Exception:
        DEFAULT_USER_ID, DEFAULT_PROMPT_ID = 10077, 8888

    # Resolve the backend port: registry first, then env vars, then the
    # conventional Flask default 5000.
    try:
        from core.port_registry import get_port
        port = get_port('backend')
    except Exception:
        port = int(os.environ.get('FLASK_PORT')
                   or os.environ.get('HEVOLVE_BACKEND_PORT')
                   or '5000')

    body = {
        'user_id': context.get('owner_id') or DEFAULT_USER_ID,
        # /chat early-validation requires prompt_id key to exist —
        # DEFAULT_PROMPT_ID is the social-platform fallback. /chat's
        # own logic resolves the actual recipe / persona for this
        # dispatch from agent_id (when set) before falling back to the
        # prompt_id default; this field is just the validation
        # placeholder.
        'prompt_id': context.get('prompt_id') or DEFAULT_PROMPT_ID,
        'prompt': rewritten_prompt,
        # agent_id is the social-platform agent's User.id — /chat uses
        # this to load the persona via agent_identity, register tools,
        # and route to the autogen runtime. When absent /chat
        # gracefully falls back to LangChain (per the existing check).
        'agent_id': agent_id,
        # Don't auto-create — social agents are pre-registered via
        # UserService.register_agent at signup / onboarding time.
        'create_agent': False,
        # Forward the originating room context so /chat's response
        # router can fan out to the right surface. Caller-supplied
        # channel_context keys win over the two defaults here.
        'channel_context': dict({
            'source_kind': context.get('source_kind'),
            'source_id': context.get('source_id'),
        }, **(context.get('channel_context') or {})),
    }

    url = f"http://localhost:{port}/chat"
    try:
        resp = pooled_post(url, json=body, timeout=120)
    except Exception as e:
        logger.warning(
            "dispatch_via_chat: pooled_post to %s failed: %s", url, e)
        return None

    # getattr defaults guard against transport objects that lack the
    # usual requests-style attributes.
    if getattr(resp, 'status_code', 0) != 200:
        logger.warning(
            "dispatch_via_chat: /chat returned %s for agent=%s — body=%s",
            getattr(resp, 'status_code', '?'),
            agent_id,
            (getattr(resp, 'text', '') or '')[:200])
        return None

    try:
        data = resp.json()
    except Exception as e:
        logger.warning(
            "dispatch_via_chat: response not JSON for agent=%s: %s",
            agent_id, e)
        return None

    # Only a non-empty string 'response' field counts as success.
    reply = data.get('response') if isinstance(data, dict) else None
    if not isinstance(reply, str) or not reply.strip():
        return None
    return reply.strip()

409 

410 

def _dispatch_to_agent_worker(agent_id: str, prompt: str, context: Dict) -> None:
    """Worker body: guardrails → LLM → guardrails → post as Comment.

    Each stage degrades gracefully — missing LLM or guardrails just
    short-circuits the worker, never bubbles. The Mention rows are
    already persisted by MentionService so the conversation isn't lost.
    """
    if not agent_id or not prompt:
        return

    # 1. Pre-dispatch: Constitutional Filter + Ethos + circuit breaker.
    #    `rewritten` defaults to the raw prompt so the LLM stage still
    #    works when guardrails are unavailable.
    rewritten = prompt
    try:
        from security.hive_guardrails import GuardrailEnforcer
        passed, reason, rewritten = GuardrailEnforcer.before_dispatch(prompt)
        if not passed:
            logger.info("dispatch_to_agent: rejected pre-dispatch (%s); "
                        "agent=%s", reason, agent_id)
            return
    except Exception as e:
        # Guardrails module missing — degrade open in dev, but still
        # block in any production where the module is expected. Log
        # loudly so this is visible in audits.
        logger.warning("dispatch_to_agent: guardrails unavailable; "
                       "proceeding without pre-dispatch check (%s)", e)

    # 2. Run prompt through the canonical agent runtime.
    #
    # Two paths, gated by HEVOLVE_FLAG_DISPATCH_VIA_CHAT:
    #
    # ON (target state) — delegate to the canonical /chat HTTP
    # endpoint flask_integration._handle_message has used for
    # 31-channel adapter inbound since 2026-01-31. /chat brings:
    #   - autogen / langchain / draft-first routing (chooses the
    #     right runtime per request, with the existing
    #     "no agent_id/prompt_id/recipe → langchain" fallback)
    #   - per-agent persona via build_identity_prompt(agent_config)
    #   - dynamic tool registration per agent
    #   - multi-turn conversation history
    # This is the unification the social-platform dispatch was
    # missing — without it, social agents make raw single-shot
    # llm.invoke calls and have NO tools / persona / history.
    #
    # OFF (default — preserves existing behavior) — raw `llm.invoke`
    # via safe_hartos_attr('get_llm'). Same code path social
    # agents have used since the 2026-05-04 Phase-7+8+9 mega-commit.
    # Flag stays off until the /chat path is validated for social-
    # platform dispatches; flipping the flag is the rollout switch.
    #
    # On flag-on `_dispatch_via_chat` returning None (chat endpoint
    # unreachable, non-200, malformed response), we fall back to raw
    # llm.invoke so agents still respond rather than going silent.
    reply_text: Optional[str] = None
    use_chat = (
        os.environ.get('HEVOLVE_FLAG_DISPATCH_VIA_CHAT', '').strip().lower()
        in ('1', 'true', 'yes', 'on')
    )
    if use_chat:
        reply_text = _dispatch_via_chat(agent_id, rewritten, context)
        if reply_text is None:
            logger.info(
                "dispatch_to_agent: /chat path failed; falling back to "
                "raw LLM for agent=%s", agent_id)

    if not reply_text:
        try:
            from core.safe_hartos_attr import safe_hartos_attr
            get_llm = safe_hartos_attr('get_llm')
            if get_llm is None:
                logger.info("dispatch_to_agent: get_llm unresolved; "
                            "agent=%s — runtime will pick up", agent_id)
                return
            llm = get_llm(temperature=0.7, max_tokens=600)
            result = llm.invoke(rewritten)
            reply_text = (result.content if hasattr(result, 'content')
                          else str(result)).strip()
        except Exception as e:
            logger.warning(
                "dispatch_to_agent: LLM call failed for agent=%s: %s",
                agent_id, e)
            return

    # Empty reply after both paths → nothing to post.
    if not reply_text:
        return

    # 3. Post-response: Constructive Filter + energy tracking.
    #    Missing guardrails here is a silent pass — the response was
    #    already gated once before dispatch.
    try:
        from security.hive_guardrails import GuardrailEnforcer
        passed, reason = GuardrailEnforcer.after_response(reply_text)
        if not passed:
            logger.info("dispatch_to_agent: response blocked post-LLM "
                        "(%s); agent=%s", reason, agent_id)
            return
    except Exception:
        pass

    # 4. Post via existing CommentService — same fan-out a human gets.
    _post_agent_reply(agent_id, context or {}, reply_text)

510 

def _post_agent_reply(agent_id: str, context: Dict, reply_text: str) -> None:
    """Post the agent's reply through the appropriate existing surface.

    For source_kind='post' → top-level comment on that post.
    For source_kind='comment' → reply nested under the parent comment.
    For source_kind='message' → message in the same conversation
        (Phase 7c.3 path; agent's reply gets the same fan-out + mention
        parsing any human reply gets — no privileged path).
    For source_kind='call' → reply text enqueued for TTS on the live
        call, plus best-effort transcript persist and UI card emit.
    For source_kind='external_room' → routed back to the originating
        adapter room via ChannelResponseRouter.
    Unknown source kinds are logged and skipped.

    Never raises — any persistence failure is logged as a warning.
    """
    source_kind = context.get('source_kind')
    source_id = context.get('source_id')
    # Without both identifiers there is nowhere to post — silent no-op.
    if not source_kind or not source_id:
        return
    try:
        from integrations.social.models import db_session, User, Post, Comment
        from integrations.social.services import CommentService
        with db_session() as db:
            agent = db.query(User).filter(User.id == agent_id).first()
            if not agent:
                logger.info("dispatch_to_agent: agent user_id=%s not found",
                            agent_id)
                return

            if source_kind == 'post':
                post = db.query(Post).filter(Post.id == source_id).first()
                if post:
                    CommentService.create(db, post, agent, reply_text)
            elif source_kind == 'comment':
                parent = db.query(Comment).filter(
                    Comment.id == source_id).first()
                if parent:
                    post = db.query(Post).filter(
                        Post.id == parent.post_id).first()
                    if post:
                        CommentService.create(db, post, agent, reply_text,
                                              parent_id=parent.id)
            elif source_kind == 'message':
                # Conversation message — find the parent conversation
                # via the source message row, post the agent's reply
                # as a new Message in that same conversation.
                from sqlalchemy import text as _text
                row = db.execute(_text(
                    "SELECT parent_id FROM messages WHERE id = :mid"),
                    {'mid': source_id}
                ).fetchone()
                if row is None:
                    logger.info("dispatch_to_agent: source message %s not "
                                "found", source_id)
                    return
                conv_id = row[0]
                from integrations.social.conversation_service import (
                    ConversationService)
                # Auto-add the agent as a conversation member if its
                # owner already is and the conversation allows agents.
                # For now we require the agent to already be a member —
                # joining is handled by AgentJoinGrant in Phase 7d.
                from integrations.social.conversation_service import (
                    _is_member)
                if not _is_member(db, conv_id, agent.id):
                    logger.info("dispatch_to_agent: agent %s not a member "
                                "of conv %s; skipping reply",
                                agent.id, conv_id)
                    return
                ConversationService.send_message(
                    db, conv_id=conv_id, author_id=agent.id,
                    content=reply_text,
                    tenant_id=context.get('tenant_id'))
            elif source_kind == 'call':
                # Live voice/video call — the agent's reply text is
                # destined for an audio publisher (PocketTTS → LiveKit
                # frames). Three deliveries:
                #
                # 1. TTS outbox enqueue → bridge worker drains, hands
                #    to audio publisher (canonical reply path for the
                #    magic loop: speaker → STT → router → TTS → speaker).
                # 2. Cross-channel transcript persist → the call
                #    transcript reads chronologically alongside any
                #    adapter-channel chat (UNIF-G3). Best-effort.
                # 3. Liquid UI meet_copilot card emit → the user sees
                #    the agent's reply in the UI overlay even before
                #    audio arrives (UNIF-G5). Best-effort.
                #
                # source_id == call_id (CallSession). The agent must
                # already be attached as a CallParticipant; any
                # AgentJoinGrant + scope.can_voice gating happens at
                # CallService.attach_agent before the bridge spins up.
                call_id = source_id
                try:
                    from integrations.social.agent_voice_bridge import (
                        enqueue_tts_text)
                    enqueue_tts_text(call_id, agent.id, reply_text)
                except Exception as e:
                    logger.warning(
                        "dispatch_to_agent: TTS outbox enqueue failed "
                        "(call=%s, agent=%s): %s",
                        call_id, agent.id, e)
                try:
                    from integrations.social.chat_messages import (
                        persist_external_room_event)
                    persist_external_room_event(
                        user_id=str(context.get('owner_id') or agent.id),
                        platform=str(context.get('platform') or 'livekit'),
                        room_id=str(call_id),
                        sender_id=str(agent.id),
                        text=reply_text,
                        kind='agent_reply',
                    )
                except Exception as e:
                    logger.debug(
                        "dispatch_to_agent: cross-channel persist "
                        "skipped (%s)", e)
                try:
                    from core.platform.service_registry import (
                        ServiceRegistry)
                    svc = ServiceRegistry.get('LiquidUIService')
                    if svc is not None:
                        svc.agent_ui_update(
                            str(agent.id),
                            {
                                'type': 'meet_copilot',
                                'call_id': str(call_id),
                                'platform': str(
                                    context.get('platform') or 'livekit'),
                                'room_id': str(call_id),
                                'state': 'live',
                                'agent_reply': reply_text,
                            },
                        )
                except Exception as e:
                    logger.debug(
                        "dispatch_to_agent: meet_copilot emit "
                        "skipped (%s)", e)
            elif source_kind == 'external_room':
                # Adapter-bound rooms (Discord channel / WhatsApp group /
                # Slack channel / Matrix room / Teams channel / etc.).
                # The agent's reply text is destined for the SAME room
                # the user wrote in. Delegate to the canonical
                # ``ChannelResponseRouter.route_response`` — it:
                #   1. Logs the assistant turn to ConversationEntry.
                #   2. Sends back to the originating channel via the
                #      ChannelRegistry (each adapter's send_message).
                #   3. Optionally fans out to other bound channels.
                #   4. WAMP-notifies the user's desktop.
                #
                # Caller's ``context['channel_context']`` MUST carry
                # ``{channel, chat_id, sender_id}`` — the same dict
                # ``flask_integration._handle_message`` already builds
                # for the legacy /chat path. Reusing the same shape
                # means a future migration can swap the legacy path
                # over by changing one call site.
                ch_ctx = context.get('channel_context') or {}
                if not ch_ctx.get('channel') or not ch_ctx.get('chat_id'):
                    logger.info(
                        "dispatch_to_agent: source_kind='external_room' "
                        "needs context['channel_context'] with "
                        "{channel, chat_id}; got %r — skipping reply",
                        ch_ctx)
                    return
                try:
                    from integrations.channels.response.router import (
                        get_response_router)
                    get_response_router().route_response(
                        user_id=context.get('owner_id') or agent.id,
                        response_text=reply_text,
                        channel_context=ch_ctx,
                        agent_id=agent.id,
                        # Fan-out is a caller decision — default False
                        # so the agent reply lands in the originating
                        # room only. Callers that want bound-channel
                        # broadcast can pass `fan_out_external=True` in
                        # context.
                        fan_out=bool(context.get('fan_out_external')),
                    )
                except Exception as e:
                    logger.warning(
                        "dispatch_to_agent: external_room reply via "
                        "ChannelResponseRouter failed (channel=%s "
                        "chat_id=%s): %s",
                        ch_ctx.get('channel'), ch_ctx.get('chat_id'), e)
            else:
                logger.info("dispatch_to_agent: source_kind=%s not yet "
                            "supported", source_kind)
    except Exception as e:
        logger.warning("dispatch_to_agent: post-reply persist failed: %s", e)