Coverage for integrations / agentic_router.py: 37.7%
204 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Agentic Intent Router — detects when a user prompt requires multi-step
3execution and routes from LangChain to autogen.
5Used by the LangChain Agentic_Router tool. When a prompt is classified as
6agentic, this module:
71. Uses the LLM to semantically match against 96 expert agents + user recipes
82. Uses the LLM to generate a real execution plan (3-7 steps)
93. Returns structured plan data for the frontend Plan Mode UI
11Intent classification itself is handled by the LLM deciding whether to call
12the Agentic_Router tool — no keyword heuristics needed.
13"""
15import json
16import logging
17import os
18from typing import Dict, List, Optional
20logger = logging.getLogger(__name__)
23def find_matching_agent(prompt: str, prompts_dir: str = None) -> Optional[Dict]:
24 """Use LLM to semantically match prompt against available agents + recipes.
26 Sends agent summaries to the LLM and asks it to select the best match.
27 Falls back to None if LLM fails or no match found.
28 """
29 agent_summaries = _build_agent_catalog(prompts_dir)
30 if not agent_summaries:
31 return None
33 try:
34 from core.safe_hartos_attr import safe_hartos_attr
35 get_llm = safe_hartos_attr('get_llm')
36 if get_llm is None:
37 logger.info(
38 "Agent matching unavailable: HARTOS get_llm not yet "
39 "resolvable — returning None (caller falls back to "
40 "default agent dispatch)."
41 )
42 return None
43 llm = get_llm(temperature=0.1, max_tokens=300)
44 logger.info(
45 "Agent matching: prompt=%r catalog_size=%d",
46 (prompt or '')[:80], len(agent_summaries),
47 )
49 catalog_text = "\n".join(
50 f"- ID:{a['id']} | {a['name']} | {a['source']} | {a['description'][:120]}"
51 for a in agent_summaries[:50]
52 )
54 result = llm.invoke(
55 f"Given this user task, select the single best matching agent from the catalog below. "
56 f"If no agent is a good semantic match, respond with just 'NONE'.\n"
57 f"Otherwise respond with ONLY the agent ID.\n\n"
58 f"User task: {prompt}\n\n"
59 f"Agent catalog:\n{catalog_text}"
60 )
62 text = (result.content if hasattr(result, 'content') else str(result)).strip()
64 if text.upper() == 'NONE' or not text:
65 return None
67 for a in agent_summaries:
68 if a['id'] in text:
69 return {
70 'agent_id': a['id'],
71 'name': a['name'],
72 'score': 15,
73 'source': a['source'],
74 'description': a['description'],
75 }
76 return None
77 except Exception as e:
78 logger.warning(f"LLM agent matching failed: {e}")
79 return None
82def _build_agent_catalog(prompts_dir: str = None) -> List[Dict]:
83 """Build unified catalog of expert agents + user recipes for LLM matching."""
84 catalog = []
86 try:
87 from integrations.expert_agents.registry import ExpertAgentRegistry
88 registry = ExpertAgentRegistry()
89 for agent in registry.agents.values():
90 catalog.append({
91 'id': agent.agent_id,
92 'name': agent.name,
93 'description': agent.description,
94 'source': 'expert',
95 })
96 except Exception:
97 pass
99 if prompts_dir and os.path.isdir(prompts_dir):
100 try:
101 for fname in os.listdir(prompts_dir):
102 if not fname.endswith('.json') or '_recipe' in fname:
103 continue
104 try:
105 with open(os.path.join(prompts_dir, fname)) as f:
106 recipe = json.load(f)
107 catalog.append({
108 'id': fname.replace('.json', ''),
109 'name': recipe.get('name', fname),
110 'description': recipe.get('goal', ''),
111 'source': 'recipe',
112 })
113 except Exception:
114 continue
115 except Exception:
116 pass
118 # Hive recipes — federated index from peer nodes
119 try:
120 from integrations.agent_engine.federated_aggregator import get_federated_aggregator
121 agg = get_federated_aggregator()
122 hive_index = agg.aggregate_recipes()
123 if hive_index and isinstance(hive_index, dict):
124 for rid, info in hive_index.items():
125 if isinstance(info, dict) and info.get('name'):
126 catalog.append({
127 'id': rid,
128 'name': info['name'],
129 'description': info.get('description', ''),
130 'source': 'hive',
131 })
132 except Exception:
133 pass
135 # Google A2A registered agents
136 try:
137 from integrations.google_a2a.dynamic_agent_registry import get_registry
138 a2a_registry = get_registry()
139 for agent in a2a_registry.list_agents():
140 catalog.append({
141 'id': agent.get('id', ''),
142 'name': agent.get('name', ''),
143 'description': agent.get('description', ''),
144 'source': 'a2a',
145 })
146 except Exception:
147 pass
149 return catalog
152def generate_plan_steps(prompt: str, matched_agent: Optional[Dict] = None) -> List[Dict]:
153 """Generate plan steps using the LLM. Falls back to generic steps on failure."""
154 try:
155 from core.safe_hartos_attr import safe_hartos_attr
156 get_llm = safe_hartos_attr('get_llm')
157 if get_llm is None:
158 logger.info(
159 "Plan generation falling back to generic steps: HARTOS "
160 "get_llm not yet resolvable (loader still init)."
161 )
162 raise RuntimeError("HARTOS get_llm unavailable")
163 llm = get_llm(temperature=0.3, max_tokens=800)
164 logger.info(
165 "Plan generation: prompt=%r matched_agent=%s",
166 (prompt or '')[:80],
167 matched_agent.get('name') if matched_agent else None,
168 )
170 agent_context = ""
171 if matched_agent:
172 agent_context = (f"\nMatched expert: {matched_agent['name']} — "
173 f"{matched_agent.get('description', '')}")
175 result = llm.invoke(
176 f"Generate a 3-7 step execution plan for this task. "
177 f"Return ONLY a JSON array: "
178 f'[{{"step_num": 1, "description": "...", "tool_or_agent": "..."}}]'
179 f"{agent_context}\n\nTask: {prompt}"
180 )
182 text = result.content if hasattr(result, 'content') else str(result)
183 import re
184 match = re.search(r'\[.*\]', text, re.DOTALL)
185 if match:
186 steps = json.loads(match.group())
187 if isinstance(steps, list) and len(steps) >= 2:
188 return steps
189 except Exception as e:
190 logger.warning(f"LLM plan generation failed, using fallback: {e}")
192 agent_name = matched_agent['name'] if matched_agent else 'execution'
193 return [
194 {'step_num': 1, 'description': 'Analyze requirements and gather context', 'tool_or_agent': 'analysis'},
195 {'step_num': 2, 'description': 'Plan approach and identify resources', 'tool_or_agent': 'planning'},
196 {'step_num': 3, 'description': 'Execute the task', 'tool_or_agent': agent_name},
197 {'step_num': 4, 'description': 'Deliver results and get feedback', 'tool_or_agent': 'delivery'},
198 ]
201def should_auto_create_agent(prompt: str, prompts_dir: str = None) -> bool:
202 """Return True only if NO existing agent can handle this task.
204 This is the gate that prevents unnecessary agent creation.
205 """
206 match = find_matching_agent(prompt, prompts_dir)
207 return match is None
210def build_agentic_plan(prompt: str, prompts_dir: str = None) -> Dict:
211 """Full pipeline: match → plan. Returns structured plan dict."""
212 matched_agent = find_matching_agent(prompt, prompts_dir)
213 plan_steps = generate_plan_steps(prompt, matched_agent)
215 return {
216 'task_description': prompt,
217 'steps': plan_steps,
218 'matched_agent_id': matched_agent['agent_id'] if matched_agent else None,
219 'matched_agent_name': matched_agent['name'] if matched_agent else None,
220 'matched_agent_source': matched_agent['source'] if matched_agent else None,
221 'confidence': 'high' if matched_agent else 'medium',
222 'requires_new_agent': matched_agent is None,
223 }
226# ─────────────────────────────────────────────────────────────────────
227# Direct-named dispatch (Phase 7b — mention path)
228# Plan reference: sunny-gliding-eich.md, Part B.4 + Part E.5.
229#
230# Used when a specific agent is named (`@solar-architect`) so we don't
231# need to run the matcher — the caller already knows which agent should
232# reply. The dispatch runs the full guardrail pipeline that any other
233# agent action runs through, and posts the reply via
234# CommentService.create so it picks up DLP, classification, fan-out,
235# resonance, etc. — same path a human reply takes. No new privileged
236# code path.
237#
238# Async by default: a daemon thread does the LLM call so the calling
239# Flask request (post create / comment create) returns immediately.
240# If anything fails — guardrail rejection, LLM unreachable, source
241# missing — the worker logs and exits. The Mention + Notification rows
242# persisted upstream survive, so the agent runtime can pick the work up
243# asynchronously the next tick.
244# ─────────────────────────────────────────────────────────────────────
246# Cap agent reply chains to keep two misbehaving agents in a group
247# from spinning up an unbounded thread chain. Reviewer-flagged M2.
248# The depth is carried through the context dict so each recursive
249# trigger increments and refuses past the ceiling.
250_MAX_AGENT_DISPATCH_DEPTH = 2
253def dispatch_to_agent(agent_id: str, prompt: str,
254 context: Dict = None,
255 synchronous: bool = False) -> None:
256 """Spawn an async worker to generate + post the agent's reply.
258 Args:
259 agent_id: User.id of the @-mentioned agent (User.user_type='agent').
260 prompt: Inline content the agent should reason about — already
261 includes surrounding source text from MentionService.
262 context: dict with 'source_kind' ('post' | 'comment' | 'message'),
263 'source_id', 'author_id', 'tenant_id', and optional
264 '_dispatch_depth' (recursion counter, set automatically).
265 synchronous: testing knob. When True, run inline so unit tests can
266 assert post-conditions without thread coordination. Default
267 False — production callers always want fire-and-forget.
269 Recursion guard: if context['_dispatch_depth'] > _MAX_AGENT_DISPATCH_DEPTH
270 the call is refused. This prevents two mention-each-other agents
271 from spinning an unbounded chain (M2 — reviewer-flagged).
273 Never raises — agent dispatch failure must never break the calling
274 post/comment create path.
275 """
276 ctx = dict(context) if context else {}
277 depth = int(ctx.get('_dispatch_depth') or 0)
278 # `>=` so the cap names the max number of levels exactly. With
279 # _MAX_AGENT_DISPATCH_DEPTH=2 we permit depth 0 → 1 (the call
280 # itself) and depth 1 → 2 (one recursive level), refusing at 2.
281 # Reviewer N-NEW-3.
282 if depth >= _MAX_AGENT_DISPATCH_DEPTH:
283 logger.info(
284 "dispatch_to_agent: refusing recursive dispatch at depth=%d "
285 "(ceiling=%d) for agent=%s",
286 depth, _MAX_AGENT_DISPATCH_DEPTH, agent_id)
287 return
288 ctx['_dispatch_depth'] = depth + 1
289 args = (agent_id, prompt, ctx)
290 if synchronous:
291 _dispatch_to_agent_worker(*args)
292 return
293 import threading
294 threading.Thread(
295 target=_dispatch_to_agent_worker,
296 args=args,
297 daemon=True,
298 name=f'mention_dispatch_{(agent_id or "?")[:8]}',
299 ).start()
302def _dispatch_via_chat(agent_id: str, rewritten_prompt: str,
303 context: Dict) -> Optional[str]:
304 """Reuse the canonical /chat endpoint instead of doing a raw
305 ``llm.invoke``.
307 /chat already runs the full agent runtime (autogen / langchain /
308 draft-first routing, persona via
309 ``agent_identity.build_identity_prompt``, dynamic per-agent tools,
310 multi-turn history) that ``flask_integration._handle_message`` has
311 used for 31-channel adapter inbound since 2026-01-31. Social-
312 platform mention dispatch should use the same runtime — this
313 helper lets it.
315 Body shape mirrors what ``flask_integration._handle_message``
316 builds at line 150 so /chat sees the same payload regardless of
317 which caller dispatched. Missing ``agent_id``/``prompt_id``/recipe
318 is the existing /chat fallback signal that routes to LangChain.
320 Transport is HTTP loopback to localhost via ``pooled_post`` — the
321 same connection-pooled transport ``flask_integration`` uses, so we
322 don't open a parallel HTTP client. In every HARTOS deploy mode
323 (flat / regional / central / Docker / ISO / pip-installed server)
324 the HARTOS Flask app is reachable on its configured backend port,
325 so loopback is universally available.
327 Returns the agent reply text on success, or ``None`` on any failure
328 (chat unreachable, non-200, malformed response, missing
329 ``response`` field). Caller falls back to raw ``llm.invoke`` so
330 agents still respond rather than going silent.
332 Behind the ``HEVOLVE_FLAG_DISPATCH_VIA_CHAT`` env flag — default
333 off, dormant until production verifies the /chat path for social-
334 platform agent dispatches.
335 """
336 try:
337 from core.http_pool import pooled_post
338 except Exception as e:
339 logger.warning("dispatch_via_chat: core.http_pool unavailable (%s)", e)
340 return None
342 try:
343 from core.constants import DEFAULT_USER_ID, DEFAULT_PROMPT_ID
344 except Exception:
345 DEFAULT_USER_ID, DEFAULT_PROMPT_ID = 10077, 8888
347 try:
348 from core.port_registry import get_port
349 port = get_port('backend')
350 except Exception:
351 port = int(os.environ.get('FLASK_PORT')
352 or os.environ.get('HEVOLVE_BACKEND_PORT')
353 or '5000')
355 body = {
356 'user_id': context.get('owner_id') or DEFAULT_USER_ID,
357 # /chat early-validation requires prompt_id key to exist —
358 # DEFAULT_PROMPT_ID is the social-platform fallback. /chat's
359 # own logic resolves the actual recipe / persona for this
360 # dispatch from agent_id (when set) before falling back to the
361 # prompt_id default; this field is just the validation
362 # placeholder.
363 'prompt_id': context.get('prompt_id') or DEFAULT_PROMPT_ID,
364 'prompt': rewritten_prompt,
365 # agent_id is the social-platform agent's User.id — /chat uses
366 # this to load the persona via agent_identity, register tools,
367 # and route to the autogen runtime. When absent /chat
368 # gracefully falls back to LangChain (per the existing check).
369 'agent_id': agent_id,
370 # Don't auto-create — social agents are pre-registered via
371 # UserService.register_agent at signup / onboarding time.
372 'create_agent': False,
373 # Forward the originating room context so /chat's response
374 # router can fan out to the right surface.
375 'channel_context': dict({
376 'source_kind': context.get('source_kind'),
377 'source_id': context.get('source_id'),
378 }, **(context.get('channel_context') or {})),
379 }
381 url = f"http://localhost:{port}/chat"
382 try:
383 resp = pooled_post(url, json=body, timeout=120)
384 except Exception as e:
385 logger.warning(
386 "dispatch_via_chat: pooled_post to %s failed: %s", url, e)
387 return None
389 if getattr(resp, 'status_code', 0) != 200:
390 logger.warning(
391 "dispatch_via_chat: /chat returned %s for agent=%s — body=%s",
392 getattr(resp, 'status_code', '?'),
393 agent_id,
394 (getattr(resp, 'text', '') or '')[:200])
395 return None
397 try:
398 data = resp.json()
399 except Exception as e:
400 logger.warning(
401 "dispatch_via_chat: response not JSON for agent=%s: %s",
402 agent_id, e)
403 return None
405 reply = data.get('response') if isinstance(data, dict) else None
406 if not isinstance(reply, str) or not reply.strip():
407 return None
408 return reply.strip()
411def _dispatch_to_agent_worker(agent_id: str, prompt: str, context: Dict):
412 """Worker body: guardrails → LLM → guardrails → post as Comment.
414 Each stage degrades gracefully — missing LLM or guardrails just
415 short-circuits the worker, never bubbles. The Mention rows are
416 already persisted by MentionService so the conversation isn't lost.
417 """
418 if not agent_id or not prompt:
419 return
421 # 1. Pre-dispatch: Constitutional Filter + Ethos + circuit breaker.
422 rewritten = prompt
423 try:
424 from security.hive_guardrails import GuardrailEnforcer
425 passed, reason, rewritten = GuardrailEnforcer.before_dispatch(prompt)
426 if not passed:
427 logger.info("dispatch_to_agent: rejected pre-dispatch (%s); "
428 "agent=%s", reason, agent_id)
429 return
430 except Exception as e:
431 # Guardrails module missing — degrade open in dev, but still
432 # block in any production where the module is expected. Log
433 # loudly so this is visible in audits.
434 logger.warning("dispatch_to_agent: guardrails unavailable; "
435 "proceeding without pre-dispatch check (%s)", e)
437 # 2. Run prompt through the canonical agent runtime.
438 #
439 # Two paths, gated by HEVOLVE_FLAG_DISPATCH_VIA_CHAT:
440 #
441 # ON (target state) — delegate to the canonical /chat HTTP
442 # endpoint flask_integration._handle_message has used for
443 # 31-channel adapter inbound since 2026-01-31. /chat brings:
444 # - autogen / langchain / draft-first routing (chooses the
445 # right runtime per request, with the existing
446 # "no agent_id/prompt_id/recipe → langchain" fallback)
447 # - per-agent persona via build_identity_prompt(agent_config)
448 # - dynamic tool registration per agent
449 # - multi-turn conversation history
450 # This is the unification the social-platform dispatch was
451 # missing — without it, social agents make raw single-shot
452 # llm.invoke calls and have NO tools / persona / history.
453 #
454 # OFF (default — preserves existing behavior) — raw `llm.invoke`
455 # via safe_hartos_attr('get_llm'). Same code path social
456 # agents have used since the 2026-05-04 Phase-7+8+9 mega-commit.
457 # Flag stays off until the /chat path is validated for social-
458 # platform dispatches; flipping the flag is the rollout switch.
459 #
460 # On flag-on `_dispatch_via_chat` returning None (chat endpoint
461 # unreachable, non-200, malformed response), we fall back to raw
462 # llm.invoke so agents still respond rather than going silent.
463 reply_text: Optional[str] = None
464 use_chat = (
465 os.environ.get('HEVOLVE_FLAG_DISPATCH_VIA_CHAT', '').strip().lower()
466 in ('1', 'true', 'yes', 'on')
467 )
468 if use_chat:
469 reply_text = _dispatch_via_chat(agent_id, rewritten, context)
470 if reply_text is None:
471 logger.info(
472 "dispatch_to_agent: /chat path failed; falling back to "
473 "raw LLM for agent=%s", agent_id)
475 if not reply_text:
476 try:
477 from core.safe_hartos_attr import safe_hartos_attr
478 get_llm = safe_hartos_attr('get_llm')
479 if get_llm is None:
480 logger.info("dispatch_to_agent: get_llm unresolved; "
481 "agent=%s — runtime will pick up", agent_id)
482 return
483 llm = get_llm(temperature=0.7, max_tokens=600)
484 result = llm.invoke(rewritten)
485 reply_text = (result.content if hasattr(result, 'content')
486 else str(result)).strip()
487 except Exception as e:
488 logger.warning(
489 "dispatch_to_agent: LLM call failed for agent=%s: %s",
490 agent_id, e)
491 return
493 if not reply_text:
494 return
496 # 3. Post-response: Constructive Filter + energy tracking.
497 try:
498 from security.hive_guardrails import GuardrailEnforcer
499 passed, reason = GuardrailEnforcer.after_response(reply_text)
500 if not passed:
501 logger.info("dispatch_to_agent: response blocked post-LLM "
502 "(%s); agent=%s", reason, agent_id)
503 return
504 except Exception:
505 pass
507 # 4. Post via existing CommentService — same fan-out a human gets.
508 _post_agent_reply(agent_id, context or {}, reply_text)
511def _post_agent_reply(agent_id: str, context: Dict, reply_text: str):
512 """Post the agent's reply through the appropriate existing surface.
514 For source_kind='post' → top-level comment on that post.
515 For source_kind='comment' → reply nested under the parent comment.
516 For source_kind='message' → message in the same conversation
517 (Phase 7c.3 path; agent's reply gets the same fan-out + mention
518 parsing any human reply gets — no privileged path).
519 Unknown source kinds are logged and skipped.
520 """
521 source_kind = context.get('source_kind')
522 source_id = context.get('source_id')
523 if not source_kind or not source_id:
524 return
525 try:
526 from integrations.social.models import db_session, User, Post, Comment
527 from integrations.social.services import CommentService
528 with db_session() as db:
529 agent = db.query(User).filter(User.id == agent_id).first()
530 if not agent:
531 logger.info("dispatch_to_agent: agent user_id=%s not found",
532 agent_id)
533 return
535 if source_kind == 'post':
536 post = db.query(Post).filter(Post.id == source_id).first()
537 if post:
538 CommentService.create(db, post, agent, reply_text)
539 elif source_kind == 'comment':
540 parent = db.query(Comment).filter(
541 Comment.id == source_id).first()
542 if parent:
543 post = db.query(Post).filter(
544 Post.id == parent.post_id).first()
545 if post:
546 CommentService.create(db, post, agent, reply_text,
547 parent_id=parent.id)
548 elif source_kind == 'message':
549 # Conversation message — find the parent conversation
550 # via the source message row, post the agent's reply
551 # as a new Message in that same conversation.
552 from sqlalchemy import text as _text
553 row = db.execute(_text(
554 "SELECT parent_id FROM messages WHERE id = :mid"),
555 {'mid': source_id}
556 ).fetchone()
557 if row is None:
558 logger.info("dispatch_to_agent: source message %s not "
559 "found", source_id)
560 return
561 conv_id = row[0]
562 from integrations.social.conversation_service import (
563 ConversationService)
564 # Auto-add the agent as a conversation member if its
565 # owner already is and the conversation allows agents.
566 # For now we require the agent to already be a member —
567 # joining is handled by AgentJoinGrant in Phase 7d.
568 from integrations.social.conversation_service import (
569 _is_member)
570 if not _is_member(db, conv_id, agent.id):
571 logger.info("dispatch_to_agent: agent %s not a member "
572 "of conv %s; skipping reply",
573 agent.id, conv_id)
574 return
575 ConversationService.send_message(
576 db, conv_id=conv_id, author_id=agent.id,
577 content=reply_text,
578 tenant_id=context.get('tenant_id'))
579 elif source_kind == 'call':
580 # Live voice/video call — the agent's reply text is
581 # destined for an audio publisher (PocketTTS → LiveKit
582 # frames). Three deliveries:
583 #
584 # 1. TTS outbox enqueue → bridge worker drains, hands
585 # to audio publisher (canonical reply path for the
586 # magic loop: speaker → STT → router → TTS → speaker).
587 # 2. Cross-channel transcript persist → the call
588 # transcript reads chronologically alongside any
589 # adapter-channel chat (UNIF-G3). Best-effort.
590 # 3. Liquid UI meet_copilot card emit → the user sees
591 # the agent's reply in the UI overlay even before
592 # audio arrives (UNIF-G5). Best-effort.
593 #
594 # source_id == call_id (CallSession). The agent must
595 # already be attached as a CallParticipant; any
596 # AgentJoinGrant + scope.can_voice gating happens at
597 # CallService.attach_agent before the bridge spins up.
598 call_id = source_id
599 try:
600 from integrations.social.agent_voice_bridge import (
601 enqueue_tts_text)
602 enqueue_tts_text(call_id, agent.id, reply_text)
603 except Exception as e:
604 logger.warning(
605 "dispatch_to_agent: TTS outbox enqueue failed "
606 "(call=%s, agent=%s): %s",
607 call_id, agent.id, e)
608 try:
609 from integrations.social.chat_messages import (
610 persist_external_room_event)
611 persist_external_room_event(
612 user_id=str(context.get('owner_id') or agent.id),
613 platform=str(context.get('platform') or 'livekit'),
614 room_id=str(call_id),
615 sender_id=str(agent.id),
616 text=reply_text,
617 kind='agent_reply',
618 )
619 except Exception as e:
620 logger.debug(
621 "dispatch_to_agent: cross-channel persist "
622 "skipped (%s)", e)
623 try:
624 from core.platform.service_registry import (
625 ServiceRegistry)
626 svc = ServiceRegistry.get('LiquidUIService')
627 if svc is not None:
628 svc.agent_ui_update(
629 str(agent.id),
630 {
631 'type': 'meet_copilot',
632 'call_id': str(call_id),
633 'platform': str(
634 context.get('platform') or 'livekit'),
635 'room_id': str(call_id),
636 'state': 'live',
637 'agent_reply': reply_text,
638 },
639 )
640 except Exception as e:
641 logger.debug(
642 "dispatch_to_agent: meet_copilot emit "
643 "skipped (%s)", e)
644 elif source_kind == 'external_room':
645 # Adapter-bound rooms (Discord channel / WhatsApp group /
646 # Slack channel / Matrix room / Teams channel / etc.).
647 # The agent's reply text is destined for the SAME room
648 # the user wrote in. Delegate to the canonical
649 # ``ChannelResponseRouter.route_response`` — it:
650 # 1. Logs the assistant turn to ConversationEntry.
651 # 2. Sends back to the originating channel via the
652 # ChannelRegistry (each adapter's send_message).
653 # 3. Optionally fans out to other bound channels.
654 # 4. WAMP-notifies the user's desktop.
655 #
656 # Caller's ``context['channel_context']`` MUST carry
657 # ``{channel, chat_id, sender_id}`` — the same dict
658 # ``flask_integration._handle_message`` already builds
659 # for the legacy /chat path. Reusing the same shape
660 # means a future migration can swap the legacy path
661 # over by changing one call site.
662 ch_ctx = context.get('channel_context') or {}
663 if not ch_ctx.get('channel') or not ch_ctx.get('chat_id'):
664 logger.info(
665 "dispatch_to_agent: source_kind='external_room' "
666 "needs context['channel_context'] with "
667 "{channel, chat_id}; got %r — skipping reply",
668 ch_ctx)
669 return
670 try:
671 from integrations.channels.response.router import (
672 get_response_router)
673 get_response_router().route_response(
674 user_id=context.get('owner_id') or agent.id,
675 response_text=reply_text,
676 channel_context=ch_ctx,
677 agent_id=agent.id,
678 # Fan-out is a caller decision — default False
679 # so the agent reply lands in the originating
680 # room only. Callers that want bound-channel
681 # broadcast can pass `fan_out_external=True` in
682 # context.
683 fan_out=bool(context.get('fan_out_external')),
684 )
685 except Exception as e:
686 logger.warning(
687 "dispatch_to_agent: external_room reply via "
688 "ChannelResponseRouter failed (channel=%s "
689 "chat_id=%s): %s",
690 ch_ctx.get('channel'), ch_ctx.get('chat_id'), e)
691 else:
692 logger.info("dispatch_to_agent: source_kind=%s not yet "
693 "supported", source_kind)
694 except Exception as e:
695 logger.warning("dispatch_to_agent: post-reply persist failed: %s", e)