Coverage for core / peer_link / crossbar_publish.py: 100.0%

33 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1"""Chat-bubble publishers — priority-49 'Thinking' bubble + future siblings. 

2 

3Sibling modules in this package, one per topic family: 

4 - ui_commands.py: HARTOS→phone fleet commands 

5 - realtime.py: social / community events 

6 - crossbar_publish.py (this): chat-bubble messages on 

7 com.hertzai.hevolve.chat.{user_id} 

8 

9All route through publish_async, which itself fans out via 

10MessageBus.publish (LOCAL + SSE + PEERLINK + CROSSBAR). The 

11hartos_backend_adapter._capture_thinking monkey-patch on 

12publish_async still fires, so per-request thinking buffers keep 

13working with no adapter change. 

14 

15The 'preffered_language' typo is preserved on the wire because 

16the historical schema has carried it for a long time and frontends 

17may key on the misspelt field name. 

18 

19Latent bug NOT fixed here: 

20 Two FULL-schema callers (autogen GroupChat + action-start tap) 

21 historically pass request_id="123456" as a literal placeholder. 

22 All their traces collapse into one buffer key in Nunba's 

23 adapter. Migrating those callers preserves the placeholder for 

24 byte-identical output; fixing the propagation is tracked 

25 separately. 

26""" 

27from __future__ import annotations 

28 

29import json 

30import logging 

31import uuid 

32from typing import Any 

33 

34logger = logging.getLogger(__name__) 

35 

36# Zoom-box stub — autogen FULL shape ships this so the avatar / 

37# book-parsing UI sees a stable schema. All-zero coords mean 

38# "no zoom"; consumer treats as absent. 

39_ZOOM_STUB: dict = { 

40 'top_left': {'x': 0, 'y': 0}, 

41 'top_right': {'x': 0, 'y': 0}, 

42 'bottom_right': {'x': 0, 'y': 0}, 

43 'bottom_left': {'x': 0, 'y': 0}, 

44} 

45 

46 

47def publish_thinking_trace( 

48 *, 

49 text: Any, 

50 user_id: str, 

51 request_id: str = '', 

52 bot_type: str = 'Agent', 

53 full_schema: bool = False, 

54 preffered_language: str = 'en-US', 

55) -> bool: 

56 """Build a priority-49 'Thinking' bubble + publish to the user's chat topic. 

57 

58 Args: 

59 text: Bubble body. Coerced to str via str() if not already a 

60 string — autogen taps occasionally pass non-str content 

61 whose default repr was historically what the wire carried. 

62 user_id: Target user. Empty / None returns False without publishing. 

63 request_id: Per-request span id used by Nunba's adapter to key 

64 trace buffers. 

65 bot_type: 'Agent' for normal pipeline traces, 'ComputeRouter' 

66 for compute-routing status pushes. 

67 full_schema: True for the autogen GroupChat / action-start 

68 taps (adds zoom_bounding_box, page/analogy image URLs, 

69 preffered_language). False for tool closures and routing 

70 status pushes. 

71 preffered_language: Language tag (typo preserved). Only 

72 emitted when full_schema=True. 

73 

74 Returns: 

75 True if publish_async was invoked, False if HARTOS publisher 

76 is unresolvable or user_id is empty. Exceptions inside 

77 publish_async are logged at debug and absorbed. 

78 """ 

79 if not user_id: 

80 return False 

81 

82 text_str = text if isinstance(text, str) else str(text) 

83 

84 # Per-event dedup id — uuid4 hex. Each thinking emit gets a unique 

85 # msg_id so multiple events sharing the same request_id (the typical 

86 # case — N thinking steps within one chat turn) are NOT collapsed by 

87 # client dedup (crossbarWorker.processedMessages, realtimeService. 

88 # _seenIds, Android processedRequestIds). request_id stays the 

89 # GROUPING / filtering key; msg_id is the dedup key. 

90 _msg_id = uuid.uuid4().hex 

91 

92 if full_schema: 

93 envelope = { 

94 'text': [text_str], 

95 'priority': 49, 

96 'action': 'Thinking', 

97 'historical_request_id': [], 

98 'preffered_language': preffered_language, 

99 'options': [], 

100 'newoptions': [], 

101 'bot_type': bot_type, 

102 'page_image_url': '', 

103 'analogy_image_url': '', 

104 'request_id': request_id, 

105 'msg_id': _msg_id, 

106 'zoom_bounding_box': _ZOOM_STUB, 

107 } 

108 else: 

109 envelope = { 

110 'text': [text_str], 

111 'priority': 49, 

112 'action': 'Thinking', 

113 'bot_type': bot_type, 

114 'request_id': request_id, 

115 'msg_id': _msg_id, 

116 'historical_request_id': [], 

117 'options': [], 

118 'newoptions': [], 

119 } 

120 

121 try: 

122 from core.safe_hartos_attr import safe_hartos_attr 

123 publish_async = safe_hartos_attr('publish_async') 

124 if publish_async is None: 

125 logger.debug( 

126 "publish_thinking_trace: HARTOS publish_async unresolvable") 

127 return False 

128 publish_async( 

129 f'com.hertzai.hevolve.chat.{user_id}', 

130 json.dumps(envelope), 

131 ) 

132 return True 

133 except Exception as e: 

134 logger.debug(f"publish_thinking_trace failed: {e}") 

135 return False 

136 

137 

138def publish_chat_stage(stage: str, *, user_id: str, request_id: str = '', text: str = None) -> bool: 

139 """Emit a chat-hot-path milestone (#508) via the canonical thinking-trace 

140 publisher. When `text` is provided, use it directly (per-tool calls); 

141 otherwise look up CHAT_STAGE_TEXTS[stage]. Returns False if user_id 

142 missing or stage/text unresolvable.""" 

143 if not user_id: 

144 return False 

145 if text is None: 

146 from core.constants import CHAT_STAGE_TEXTS 

147 text = CHAT_STAGE_TEXTS.get(stage) 

148 if not text: 

149 logger.warning("publish_chat_stage: unknown stage %r", stage) 

150 return False 

151 return publish_thinking_trace( 

152 text=text, user_id=str(user_id), request_id=str(request_id or ''), 

153 bot_type='Agent', full_schema=False, 

154 )