Coverage for core / peer_link / crossbar_publish.py: 100.0%
33 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""Chat-bubble publishers — priority-49 'Thinking' bubble + future siblings.
3Sibling modules in this package, one per topic family:
4 - ui_commands.py: HARTOS→phone fleet commands
5 - realtime.py: social / community events
6 - crossbar_publish.py (this): chat-bubble messages on
7 com.hertzai.hevolve.chat.{user_id}
9All route through publish_async, which itself fans out via
10MessageBus.publish (LOCAL + SSE + PEERLINK + CROSSBAR). The
11hartos_backend_adapter._capture_thinking monkey-patch on
12publish_async still fires, so per-request thinking buffers keep
13working with no adapter change.
15The 'preffered_language' typo is preserved on the wire because
16the historical schema has carried it for a long time and frontends
17may key on the misspelt field name.
19Latent bug NOT fixed here:
20 Two FULL-schema callers (autogen GroupChat + action-start tap)
21 historically pass request_id="123456" as a literal placeholder.
22 All their traces collapse into one buffer key in Nunba's
23 adapter. Migrating those callers preserves the placeholder for
24 byte-identical output; fixing the propagation is tracked
25 separately.
26"""
27from __future__ import annotations
29import json
30import logging
31import uuid
32from typing import Any
34logger = logging.getLogger(__name__)
36# Zoom-box stub — autogen FULL shape ships this so the avatar /
37# book-parsing UI sees a stable schema. All-zero coords mean
38# "no zoom"; consumer treats as absent.
39_ZOOM_STUB: dict = {
40 'top_left': {'x': 0, 'y': 0},
41 'top_right': {'x': 0, 'y': 0},
42 'bottom_right': {'x': 0, 'y': 0},
43 'bottom_left': {'x': 0, 'y': 0},
44}
47def publish_thinking_trace(
48 *,
49 text: Any,
50 user_id: str,
51 request_id: str = '',
52 bot_type: str = 'Agent',
53 full_schema: bool = False,
54 preffered_language: str = 'en-US',
55) -> bool:
56 """Build a priority-49 'Thinking' bubble + publish to the user's chat topic.
58 Args:
59 text: Bubble body. Coerced to str via str() if not already a
60 string — autogen taps occasionally pass non-str content
61 whose default repr was historically what the wire carried.
62 user_id: Target user. Empty / None returns False without publishing.
63 request_id: Per-request span id used by Nunba's adapter to key
64 trace buffers.
65 bot_type: 'Agent' for normal pipeline traces, 'ComputeRouter'
66 for compute-routing status pushes.
67 full_schema: True for the autogen GroupChat / action-start
68 taps (adds zoom_bounding_box, page/analogy image URLs,
69 preffered_language). False for tool closures and routing
70 status pushes.
71 preffered_language: Language tag (typo preserved). Only
72 emitted when full_schema=True.
74 Returns:
75 True if publish_async was invoked, False if HARTOS publisher
76 is unresolvable or user_id is empty. Exceptions inside
77 publish_async are logged at debug and absorbed.
78 """
79 if not user_id:
80 return False
82 text_str = text if isinstance(text, str) else str(text)
84 # Per-event dedup id — uuid4 hex. Each thinking emit gets a unique
85 # msg_id so multiple events sharing the same request_id (the typical
86 # case — N thinking steps within one chat turn) are NOT collapsed by
87 # client dedup (crossbarWorker.processedMessages, realtimeService.
88 # _seenIds, Android processedRequestIds). request_id stays the
89 # GROUPING / filtering key; msg_id is the dedup key.
90 _msg_id = uuid.uuid4().hex
92 if full_schema:
93 envelope = {
94 'text': [text_str],
95 'priority': 49,
96 'action': 'Thinking',
97 'historical_request_id': [],
98 'preffered_language': preffered_language,
99 'options': [],
100 'newoptions': [],
101 'bot_type': bot_type,
102 'page_image_url': '',
103 'analogy_image_url': '',
104 'request_id': request_id,
105 'msg_id': _msg_id,
106 'zoom_bounding_box': _ZOOM_STUB,
107 }
108 else:
109 envelope = {
110 'text': [text_str],
111 'priority': 49,
112 'action': 'Thinking',
113 'bot_type': bot_type,
114 'request_id': request_id,
115 'msg_id': _msg_id,
116 'historical_request_id': [],
117 'options': [],
118 'newoptions': [],
119 }
121 try:
122 from core.safe_hartos_attr import safe_hartos_attr
123 publish_async = safe_hartos_attr('publish_async')
124 if publish_async is None:
125 logger.debug(
126 "publish_thinking_trace: HARTOS publish_async unresolvable")
127 return False
128 publish_async(
129 f'com.hertzai.hevolve.chat.{user_id}',
130 json.dumps(envelope),
131 )
132 return True
133 except Exception as e:
134 logger.debug(f"publish_thinking_trace failed: {e}")
135 return False
138def publish_chat_stage(stage: str, *, user_id: str, request_id: str = '', text: str = None) -> bool:
139 """Emit a chat-hot-path milestone (#508) via the canonical thinking-trace
140 publisher. When `text` is provided, use it directly (per-tool calls);
141 otherwise look up CHAT_STAGE_TEXTS[stage]. Returns False if user_id
142 missing or stage/text unresolvable."""
143 if not user_id:
144 return False
145 if text is None:
146 from core.constants import CHAT_STAGE_TEXTS
147 text = CHAT_STAGE_TEXTS.get(stage)
148 if not text:
149 logger.warning("publish_chat_stage: unknown stage %r", stage)
150 return False
151 return publish_thinking_trace(
152 text=text, user_id=str(user_id), request_id=str(request_id or ''),
153 bot_type='Agent', full_schema=False,
154 )