Coverage for integrations / social / realtime.py: 46.8%
79 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial - Real-time Events
4Publishes via MessageBus (LOCAL EventBus + PeerLink + Crossbar).
5Falls back to direct HTTP if MessageBus unavailable.
7Topic routing (MessageBus TOPIC_MAP):
8 chat.social → com.hertzai.hevolve.social.{user_id} (RN + web subscribe)
9 community.feed → com.hertzai.community.feed (RN global feed)
10 community.message → com.hertzai.hevolve.community.{id} (web per-community)
11"""
12import logging
13import json
15logger = logging.getLogger('hevolve_social')
17_publisher = None
20def _get_publisher():
21 global _publisher
22 if _publisher is not None:
23 return _publisher
24 try:
25 from crossbarhttp3 import CrossbarHttpPublisher
26 import os
27 url = os.environ.get('WAMP_URL', 'http://localhost:8088/publish')
28 _publisher = CrossbarHttpPublisher(url)
29 except ImportError:
30 logger.debug("crossbarhttp3 not available, real-time events disabled")
31 except Exception as e:
32 logger.debug(f"WAMP publisher init failed: {e}")
33 return _publisher
36# Topics that may fan out to all authenticated subscribers (public feed,
37# aggregate counters, community rooms the user has joined). Anything
38# else MUST be per-user — the topic string must end with the user_id
39# so the WAMP router can gate subscriptions via role-based authorizer.
40_PUBLIC_TOPIC_PREFIXES = (
41 'community.feed',
42 'community.message',
43 'social.post.', # post-scoped (aggregate vote counts)
44 'social.comment.', # comment-scoped
45 'social.user.', # user-scoped (public profile fan-out)
46 'chat.social', # per-user, user_id threaded via data
47 'dm.', # per-conversation, gated elsewhere
48 'presence.', # per-user presence
49 'game.', # game session id in the topic
50 'setup_progress', # boot-time setup progress (pre-auth, no user_id)
51 'setup.', # boot-time setup (broader)
52 'system.', # system-wide events (catalog/orchestrator)
53 'catalog.', # model catalog updates
54 'model.', # per-model lifecycle
55 'tts.', # per-user audio-ready event (audio URL per request)
56 'admin.', # admin-console broadcasts
57 'agent.', # agent lifecycle (creation/review/complete/error) — scoped by agent_id
58)
60# Phase 7c.7+ tenant-scoped topic shapes — checked separately so we
61# can apply per-shape rules (per-conv membership at service layer,
62# per-user topics enforce .{user_id} suffix here as defense in depth).
63# Reviewer-flagged H3: blanket 'tenant.' prefix was too permissive.
64_TENANT_PUBLIC_PREFIX = 'tenant.'
67def _authorize_topic_for_user_id(topic: str, user_id: str) -> bool:
68 """Validate that `topic` is publishable for `user_id`.
70 A topic is considered owned-by-user_id iff:
71 - it is in the public prefix whitelist above, OR
72 - it ends with `.{user_id}` or `/{user_id}` (per-user fanout topic), OR
73 - it is a tenant.* topic shape that the per-shape rule below
74 accepts (conv-scoped requires service-layer membership check;
75 user-scoped requires the standard .{user_id} suffix match).
77 Returns True when the pair is OK, False otherwise. Callers log +
78 refuse on False so the WAMP router's subscribe-side authorizer
79 has a defense-in-depth pair enforcing the same invariant at the
80 publish site.
81 """
82 if not topic:
83 return False
84 # Public / aggregate — every authenticated user may receive.
85 for pref in _PUBLIC_TOPIC_PREFIXES:
86 if topic == pref or topic.startswith(pref):
87 return True
88 # Tenant-scoped topics — split by shape:
89 # tenant.<tid>.conv.<cid>.{typing|read|message} → service-layer
90 # membership check is the gate (ConversationService.emit_typing /
91 # mark_read both call _is_member before publishing). We accept
92 # here because the topic doesn't carry the publisher's user_id;
93 # the second layer is the WAMP router's subscribe ACL (Phase 8).
94 # tenant.<tid>.user.<uid>.<event> → must end with .{user_id} so
95 # a user can't forge events targeting another user's inbox.
96 if topic.startswith(_TENANT_PUBLIC_PREFIX):
97 # Pass-2 N-NEW-4 + Review M2 fix: parse via shared helper so
98 # publish-side and subscribe-side gates can never drift on
99 # topic-shape semantics (substring vs segment match).
100 from .realtime_acl import parse_topic
101 parsed = parse_topic(topic)
102 if not parsed.is_tenant_scoped:
103 return False
104 scope = parsed.scope
105 if scope == 'conv':
106 # Conversation-scoped: service layer is the gate.
107 return True
108 # User-scoped: enforce suffix match.
109 if scope == 'user' and user_id:
110 if topic.endswith(f'.{user_id}') or topic.endswith(f'/{user_id}'):
111 return True
112 # Allow .user.<uid>.<event> shape — strip event suffix and
113 # check.
114 head, _, _ = topic.rpartition('.')
115 if head.endswith(f'.{user_id}'):
116 return True
117 return False
118 # Unknown tenant shape — refuse.
119 return False
120 # Per-user topic must end with the publisher's user_id.
121 if user_id:
122 if topic.endswith(f'.{user_id}') or topic.endswith(f'/{user_id}'):
123 return True
124 return False
127_PUBLISH_COUNTERS = {
128 'authorize_refused': 0,
129 'bus_ok': 0,
130 'bus_failed': 0,
131 'http_fallback_ok': 0,
132 'http_fallback_failed': 0,
133 'http_no_publisher': 0,
134}
137def get_publish_counters() -> dict:
138 """Pass-1 N3 instrumentation: returns a snapshot of the
139 publish-fan-out counters. Used by ops dashboards + smoke tests
140 to detect partial-fan-out regressions (e.g., a deploy where the
141 bus path silently fails on every call but the HTTP fallback
142 masks it, leaving PEERLINK + LOCAL legs dark)."""
143 return dict(_PUBLISH_COUNTERS)
146def reset_publish_counters() -> None:
147 """Test helper — zero the counters between tests."""
148 for k in _PUBLISH_COUNTERS:
149 _PUBLISH_COUNTERS[k] = 0
152def publish_event(topic: str, data: dict, user_id: str = ''):
153 """Publish via MessageBus (LOCAL + PEERLINK + CROSSBAR). Falls back to direct HTTP.
155 Authorization guard: cross-user topic publish is refused at this
156 entry so a compiled-in bug that emits e.g. user A's notification
157 to topic `...social.B` is caught here instead of leaking to B's
158 subscribe channel. The WAMP router's role-based subscribe
159 authorizer (when configured) enforces the same invariant on the
160 receive side — defense in depth.
162 Pass-1 N3 instrumentation:
163 Each leg increments a counter so ops can spot partial fan-out:
164 - bus_ok / bus_failed
165 - http_fallback_ok / http_fallback_failed / http_no_publisher
166 - authorize_refused
167 Read via get_publish_counters().
168 """
169 if not _authorize_topic_for_user_id(topic, user_id):
170 _PUBLISH_COUNTERS['authorize_refused'] += 1
171 logger.warning(
172 f"WAMP publish refused: user_id={user_id!r} cannot publish "
173 f"to topic={topic!r} (cross-user topic subscribe guard)"
174 )
175 return
176 try:
177 from core.peer_link.message_bus import get_message_bus
178 bus = get_message_bus()
179 bus.publish(topic, data, user_id=user_id)
180 _PUBLISH_COUNTERS['bus_ok'] += 1
181 return
182 except Exception as e:
183 _PUBLISH_COUNTERS['bus_failed'] += 1
184 logger.debug("MessageBus publish failed for %s: %s", topic, e)
185 # Fallback: direct HTTP (original path)
186 publisher = _get_publisher()
187 if publisher is None:
188 _PUBLISH_COUNTERS['http_no_publisher'] += 1
189 return
190 try:
191 publisher.publish(topic, json.dumps(data))
192 _PUBLISH_COUNTERS['http_fallback_ok'] += 1
193 except Exception as e:
194 _PUBLISH_COUNTERS['http_fallback_failed'] += 1
195 logger.debug(f"WAMP publish failed for {topic}: {e}")
198def on_new_post(post_dict: dict, community_name: str = None):
199 # Broadcast to global community feed (RN subscribes to com.hertzai.community.feed)
200 publish_event('community.feed', post_dict)
201 # Also per-community (web subscribes to com.hertzai.hevolve.community.{id})
202 if community_name:
203 data = dict(post_dict)
204 data['community_id'] = community_name
205 publish_event('community.message', data)
208def on_new_comment(comment_dict: dict, post_id: str):
209 # Local-only (no frontend subscribes to per-post WAMP topics)
210 publish_event(f'social.post.{post_id}.new_comment', comment_dict)
213def on_vote_update(target_type: str, target_id: str, score: int):
214 # Local-only (no frontend subscribes to per-target WAMP topics)
215 publish_event(f'social.{target_type}.{target_id}.vote', {'score': score})
218def on_notification(user_id: str, notification_dict: dict):
219 # Route to per-user social topic (RN + web subscribe to com.hertzai.hevolve.social.{user_id})
220 publish_event('chat.social', {
221 'type': 'notification',
222 **notification_dict,
223 }, user_id=user_id)
224 # Also broadcast to SSE clients (Nunba desktop) — scoped to the target user
225 from core.platform.events import broadcast_sse_safe
226 broadcast_sse_safe('notification', {
227 'user_id': user_id,
228 **notification_dict,
229 }, user_id=user_id)