Coverage for integrations / social / realtime.py: 46.8%

79 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Real-time Events 

3 

4Publishes via MessageBus (LOCAL EventBus + PeerLink + Crossbar). 

5Falls back to direct HTTP if MessageBus unavailable. 

6 

7Topic routing (MessageBus TOPIC_MAP): 

8 chat.social → com.hertzai.hevolve.social.{user_id} (RN + web subscribe) 

9 community.feed → com.hertzai.community.feed (RN global feed) 

10 community.message → com.hertzai.hevolve.community.{id} (web per-community) 

11""" 

12import logging 

13import json 

14 

15logger = logging.getLogger('hevolve_social') 

16 

17_publisher = None 

18 

19 

20def _get_publisher(): 

21 global _publisher 

22 if _publisher is not None: 

23 return _publisher 

24 try: 

25 from crossbarhttp3 import CrossbarHttpPublisher 

26 import os 

27 url = os.environ.get('WAMP_URL', 'http://localhost:8088/publish') 

28 _publisher = CrossbarHttpPublisher(url) 

29 except ImportError: 

30 logger.debug("crossbarhttp3 not available, real-time events disabled") 

31 except Exception as e: 

32 logger.debug(f"WAMP publisher init failed: {e}") 

33 return _publisher 

34 

35 

36# Topics that may fan out to all authenticated subscribers (public feed, 

37# aggregate counters, community rooms the user has joined). Anything 

38# else MUST be per-user — the topic string must end with the user_id 

39# so the WAMP router can gate subscriptions via role-based authorizer. 

40_PUBLIC_TOPIC_PREFIXES = ( 

41 'community.feed', 

42 'community.message', 

43 'social.post.', # post-scoped (aggregate vote counts) 

44 'social.comment.', # comment-scoped 

45 'social.user.', # user-scoped (public profile fan-out) 

46 'chat.social', # per-user, user_id threaded via data 

47 'dm.', # per-conversation, gated elsewhere 

48 'presence.', # per-user presence 

49 'game.', # game session id in the topic 

50 'setup_progress', # boot-time setup progress (pre-auth, no user_id) 

51 'setup.', # boot-time setup (broader) 

52 'system.', # system-wide events (catalog/orchestrator) 

53 'catalog.', # model catalog updates 

54 'model.', # per-model lifecycle 

55 'tts.', # per-user audio-ready event (audio URL per request) 

56 'admin.', # admin-console broadcasts 

57 'agent.', # agent lifecycle (creation/review/complete/error) — scoped by agent_id 

58) 

59 

60# Phase 7c.7+ tenant-scoped topic shapes — checked separately so we 

61# can apply per-shape rules (per-conv membership at service layer, 

62# per-user topics enforce .{user_id} suffix here as defense in depth). 

63# Reviewer-flagged H3: blanket 'tenant.' prefix was too permissive. 

64_TENANT_PUBLIC_PREFIX = 'tenant.' 

65 

66 

67def _authorize_topic_for_user_id(topic: str, user_id: str) -> bool: 

68 """Validate that `topic` is publishable for `user_id`. 

69 

70 A topic is considered owned-by-user_id iff: 

71 - it is in the public prefix whitelist above, OR 

72 - it ends with `.{user_id}` or `/{user_id}` (per-user fanout topic), OR 

73 - it is a tenant.* topic shape that the per-shape rule below 

74 accepts (conv-scoped requires service-layer membership check; 

75 user-scoped requires the standard .{user_id} suffix match). 

76 

77 Returns True when the pair is OK, False otherwise. Callers log + 

78 refuse on False so the WAMP router's subscribe-side authorizer 

79 has a defense-in-depth pair enforcing the same invariant at the 

80 publish site. 

81 """ 

82 if not topic: 

83 return False 

84 # Public / aggregate — every authenticated user may receive. 

85 for pref in _PUBLIC_TOPIC_PREFIXES: 

86 if topic == pref or topic.startswith(pref): 

87 return True 

88 # Tenant-scoped topics — split by shape: 

89 # tenant.<tid>.conv.<cid>.{typing|read|message} → service-layer 

90 # membership check is the gate (ConversationService.emit_typing / 

91 # mark_read both call _is_member before publishing). We accept 

92 # here because the topic doesn't carry the publisher's user_id; 

93 # the second layer is the WAMP router's subscribe ACL (Phase 8). 

94 # tenant.<tid>.user.<uid>.<event> → must end with .{user_id} so 

95 # a user can't forge events targeting another user's inbox. 

96 if topic.startswith(_TENANT_PUBLIC_PREFIX): 

97 # Pass-2 N-NEW-4 + Review M2 fix: parse via shared helper so 

98 # publish-side and subscribe-side gates can never drift on 

99 # topic-shape semantics (substring vs segment match). 

100 from .realtime_acl import parse_topic 

101 parsed = parse_topic(topic) 

102 if not parsed.is_tenant_scoped: 

103 return False 

104 scope = parsed.scope 

105 if scope == 'conv': 

106 # Conversation-scoped: service layer is the gate. 

107 return True 

108 # User-scoped: enforce suffix match. 

109 if scope == 'user' and user_id: 

110 if topic.endswith(f'.{user_id}') or topic.endswith(f'/{user_id}'): 

111 return True 

112 # Allow .user.<uid>.<event> shape — strip event suffix and 

113 # check. 

114 head, _, _ = topic.rpartition('.') 

115 if head.endswith(f'.{user_id}'): 

116 return True 

117 return False 

118 # Unknown tenant shape — refuse. 

119 return False 

120 # Per-user topic must end with the publisher's user_id. 

121 if user_id: 

122 if topic.endswith(f'.{user_id}') or topic.endswith(f'/{user_id}'): 

123 return True 

124 return False 

125 

126 

127_PUBLISH_COUNTERS = { 

128 'authorize_refused': 0, 

129 'bus_ok': 0, 

130 'bus_failed': 0, 

131 'http_fallback_ok': 0, 

132 'http_fallback_failed': 0, 

133 'http_no_publisher': 0, 

134} 

135 

136 

137def get_publish_counters() -> dict: 

138 """Pass-1 N3 instrumentation: returns a snapshot of the 

139 publish-fan-out counters. Used by ops dashboards + smoke tests 

140 to detect partial-fan-out regressions (e.g., a deploy where the 

141 bus path silently fails on every call but the HTTP fallback 

142 masks it, leaving PEERLINK + LOCAL legs dark).""" 

143 return dict(_PUBLISH_COUNTERS) 

144 

145 

146def reset_publish_counters() -> None: 

147 """Test helper — zero the counters between tests.""" 

148 for k in _PUBLISH_COUNTERS: 

149 _PUBLISH_COUNTERS[k] = 0 

150 

151 

152def publish_event(topic: str, data: dict, user_id: str = ''): 

153 """Publish via MessageBus (LOCAL + PEERLINK + CROSSBAR). Falls back to direct HTTP. 

154 

155 Authorization guard: cross-user topic publish is refused at this 

156 entry so a compiled-in bug that emits e.g. user A's notification 

157 to topic `...social.B` is caught here instead of leaking to B's 

158 subscribe channel. The WAMP router's role-based subscribe 

159 authorizer (when configured) enforces the same invariant on the 

160 receive side — defense in depth. 

161 

162 Pass-1 N3 instrumentation: 

163 Each leg increments a counter so ops can spot partial fan-out: 

164 - bus_ok / bus_failed 

165 - http_fallback_ok / http_fallback_failed / http_no_publisher 

166 - authorize_refused 

167 Read via get_publish_counters(). 

168 """ 

169 if not _authorize_topic_for_user_id(topic, user_id): 

170 _PUBLISH_COUNTERS['authorize_refused'] += 1 

171 logger.warning( 

172 f"WAMP publish refused: user_id={user_id!r} cannot publish " 

173 f"to topic={topic!r} (cross-user topic subscribe guard)" 

174 ) 

175 return 

176 try: 

177 from core.peer_link.message_bus import get_message_bus 

178 bus = get_message_bus() 

179 bus.publish(topic, data, user_id=user_id) 

180 _PUBLISH_COUNTERS['bus_ok'] += 1 

181 return 

182 except Exception as e: 

183 _PUBLISH_COUNTERS['bus_failed'] += 1 

184 logger.debug("MessageBus publish failed for %s: %s", topic, e) 

185 # Fallback: direct HTTP (original path) 

186 publisher = _get_publisher() 

187 if publisher is None: 

188 _PUBLISH_COUNTERS['http_no_publisher'] += 1 

189 return 

190 try: 

191 publisher.publish(topic, json.dumps(data)) 

192 _PUBLISH_COUNTERS['http_fallback_ok'] += 1 

193 except Exception as e: 

194 _PUBLISH_COUNTERS['http_fallback_failed'] += 1 

195 logger.debug(f"WAMP publish failed for {topic}: {e}") 

196 

197 

198def on_new_post(post_dict: dict, community_name: str = None): 

199 # Broadcast to global community feed (RN subscribes to com.hertzai.community.feed) 

200 publish_event('community.feed', post_dict) 

201 # Also per-community (web subscribes to com.hertzai.hevolve.community.{id}) 

202 if community_name: 

203 data = dict(post_dict) 

204 data['community_id'] = community_name 

205 publish_event('community.message', data) 

206 

207 

208def on_new_comment(comment_dict: dict, post_id: str): 

209 # Local-only (no frontend subscribes to per-post WAMP topics) 

210 publish_event(f'social.post.{post_id}.new_comment', comment_dict) 

211 

212 

213def on_vote_update(target_type: str, target_id: str, score: int): 

214 # Local-only (no frontend subscribes to per-target WAMP topics) 

215 publish_event(f'social.{target_type}.{target_id}.vote', {'score': score}) 

216 

217 

218def on_notification(user_id: str, notification_dict: dict): 

219 # Route to per-user social topic (RN + web subscribe to com.hertzai.hevolve.social.{user_id}) 

220 publish_event('chat.social', { 

221 'type': 'notification', 

222 **notification_dict, 

223 }, user_id=user_id) 

224 # Also broadcast to SSE clients (Nunba desktop) — scoped to the target user 

225 from core.platform.events import broadcast_sse_safe 

226 broadcast_sse_safe('notification', { 

227 'user_id': user_id, 

228 **notification_dict, 

229 }, user_id=user_id)