Coverage for core / persona_registry.py: 77.9%

77 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1"""Per-(user, prompt) persona/role registry — single source of truth. 

2 

3A single agent instance can be shared across multiple personas (student / 

4parent / teacher). The Helper agent uses `send_message_to_roles(role, 

5message)` to deliver a message to a specific persona via crossbar. 

6 

7This module hosts the TTLCache singletons that map `{user_id}_{prompt_id}` 

8to the persona/role state, plus the canonical broadcast routine the 

9`send_message_to_roles` tool registers across both flows. 

10 

11Single-writer invariant: 

12 - These TTLCaches are declared ONLY here. 

13 - `reuse_recipe.py` and `create_recipe.py` import (not redeclare) them. 

14 - A drift-guard test enforces this. 

15 

16Populated by: 

17 - `register_persona_for_session(user_id, prompt_id, persona_list)` — 

18 called from create_recipe.set_for_creating_actions (after the 

19 gather-requirements config is loaded) and from reuse_recipe's 

20 update_persona / create_agents_for_user paths. 

21 

22#510 follow-up: this replaces the previous module-level declarations 

23in reuse_recipe.py:181, 185, 186 and the commented-out closure at 

24reuse_recipe.py:1203-1240. 

25""" 

26from __future__ import annotations 

27 

28import logging 

29from typing import Any, Dict, List, Optional 

30 

31from core.session_cache import TTLCache 

32 

33logger = logging.getLogger(__name__) 

34 

35 

36# {f"{user_id}_{prompt_id}": [ 

37# {'agentInstanceID': str, 'user_id': Any, 'role': str, 'deviceID': str, ...}, 

38# ... 

39# ]} 

40agents_session: TTLCache = TTLCache( 

41 ttl_seconds=7200, max_size=500, name='persona_agents_session') 

42 

43# {f"{user_id}_{prompt_id}": {user_id: role_name}} 

44agents_roles: TTLCache = TTLCache( 

45 ttl_seconds=7200, max_size=500, name='persona_agents_roles') 

46 

47# {user_id: {prompt_id: chat_creator_user_id}} — cross-user joined chat 

48chat_joinees: TTLCache = TTLCache( 

49 ttl_seconds=7200, max_size=500, name='persona_chat_joinees') 

50 

51 

52# Canonical WAMP topic base for multi-persona broadcast. Per-user 

53# suffix matches HARTOS's existing convention used by chat/action/ 

54# vision/analogy/social/fleet/game/community topics — so the WAMP 

55# router's per-user-topic ACL (#246) can gate this topic too, and 

56# multi-node hive deployments won't leak one tenant's broadcast to 

57# another via a shared subscription. 

58MULTICHAT_TOPIC_BASE = 'com.hertzai.hevolve.agent.multichat' 

59 

60 

61def multichat_topic_for(target_user_id) -> str: 

62 """Return the per-user WAMP topic for a multi-persona broadcast. 

63 

64 The target_user_id is the user who OWNS the target persona (i.e. 

65 `agents_session[key][i]['user_id']` for the matched entry). Only 

66 that user's HARTOS process subscribes to the suffixed topic, which 

67 contains cross-tenant leaks in a multi-node hive deployment. 

68 

69 Pattern mirrors `core.peer_link.message_bus.chat_topic_for(user_id)` 

70 for `com.hertzai.hevolve.chat.{user_id}`. 

71 """ 

72 return f'{MULTICHAT_TOPIC_BASE}.{target_user_id}' 

73 

74 

75def register_persona_for_session( 

76 user_id: Any, 

77 prompt_id: Any, 

78 persona_list: List[Dict[str, Any]], 

79 device_id: str = 'something', 

80) -> int: 

81 """Populate `agents_session` + `agents_roles` from a persona list. 

82 

83 Called at the start of each recipe flow: 

84 - create_recipe.set_for_creating_actions (after config load with 

85 `config['personas']` / `config['flows'][...]['persona']`) 

86 - reuse_recipe's update_persona flow (already does this inline 

87 at lines 723-731, 802-806 — those usage sites continue to work 

88 unchanged since `agents_session`/`agents_roles` are now 

89 imported from here) 

90 

91 `persona_list` items may be dicts with 'name'/'role' keys or plain 

92 strings. Idempotent — re-call overwrites the session's entries 

93 (useful after persona edits during recipe creation). 

94 

95 Returns the count of personas registered. Never raises — logs + 

96 returns 0 on a malformed persona_list (per `feedback_audit_evidence_discipline.md`: 

97 log everything, no silent gulps). 

98 """ 

99 if user_id is None or prompt_id is None: 

100 logger.warning( 

101 "register_persona_for_session: missing user_id/prompt_id " 

102 "(user_id=%r, prompt_id=%r); skipping", user_id, prompt_id) 

103 return 0 

104 

105 key = f"{user_id}_{prompt_id}" 

106 session_entries: List[Dict[str, Any]] = [] 

107 roles_map: Dict[Any, str] = {} 

108 

109 for persona in (persona_list or []): 

110 # Accept both string names and dict shapes 

111 if isinstance(persona, str): 

112 role_name = persona 

113 elif isinstance(persona, dict): 

114 role_name = persona.get('name') or persona.get('role') 

115 else: 

116 logger.warning( 

117 "register_persona_for_session: skipping unknown persona " 

118 "shape %r", type(persona)) 

119 continue 

120 

121 if not role_name: 

122 logger.warning( 

123 "register_persona_for_session: skipping persona without " 

124 "name/role: %r", persona) 

125 continue 

126 

127 session_entries.append({ 

128 'agentInstanceID': f'com.hertzai.hevolve.chat.{prompt_id}.{user_id}', 

129 'user_id': user_id, 

130 'role': role_name, 

131 'deviceID': device_id, 

132 }) 

133 roles_map[user_id] = role_name 

134 

135 agents_session[key] = session_entries 

136 agents_roles[key] = roles_map 

137 logger.info( 

138 "register_persona_for_session: registered %d persona(s) for %s", 

139 len(session_entries), key) 

140 return len(session_entries) 

141 

142 

143def _resolve_session_for_user(user_id: Any, prompt_id: Any): 

144 """Return (session_entries, key) for the user, falling back to the 

145 chat-creator's session if this user joined another user's chat.""" 

146 key = f"{user_id}_{prompt_id}" 

147 sessions = agents_session.get(key, []) 

148 if sessions: 

149 return sessions, key 

150 

151 # Cross-user join fallback 

152 try: 

153 joinees_for_user = chat_joinees.get(user_id, {}) or {} 

154 creator = joinees_for_user.get(prompt_id) 

155 if creator: 

156 creator_key = f"{creator}_{prompt_id}" 

157 sessions = agents_session.get(creator_key, []) 

158 if sessions: 

159 return sessions, creator_key 

160 except Exception: 

161 logger.warning( 

162 "_resolve_session_for_user: chat_joinees lookup failed for %s", 

163 key, exc_info=True) 

164 

165 return [], key 

166 

167 

168def _send_message_to_roles_impl( 

169 user_id: Any, 

170 prompt_id: Any, 

171 role: str, 

172 message: str, 

173 publish_fn: Optional[Any] = None, 

174) -> str: 

175 """Canonical multi-persona broadcast. 

176 

177 Looks up the target persona by `role` in `agents_session`, then 

178 publishes the message to crossbar topic `MULTICHAT_TOPIC` with 

179 caller metadata so the receiving persona's agent loop can route it. 

180 

181 `publish_fn` is the crossbar publisher — pass 

182 `reuse_recipe.publish_async` (or `helper_fun.publish_async` / 

183 a `safe_hartos_attr('publish_async')`-resolved callable). If 

184 omitted, the function lazy-resolves the canonical 

185 `core.safe_hartos_attr.safe_hartos_attr('publish_async')`. 

186 

187 Returns a status string (per autogen tool contract — string return 

188 is fed back to the LLM as the tool's response). 

189 """ 

190 if not role: 

191 logger.warning( 

192 "send_message_to_roles: empty role arg for %s_%s", 

193 user_id, prompt_id) 

194 return "Cannot broadcast: role argument is empty" 

195 

196 sessions, key = _resolve_session_for_user(user_id, prompt_id) 

197 if not sessions: 

198 logger.warning( 

199 "send_message_to_roles: no agent_session for %s; " 

200 "personas not initialized yet?", key) 

201 return ( 

202 f"No personas registered for session {key}. " 

203 "Run gather-requirements first.") 

204 

205 caller_role = (agents_roles.get(key, {}) or {}).get(user_id) 

206 

207 # Resolve publisher if caller didn't pass one 

208 if publish_fn is None: 

209 try: 

210 from core.safe_hartos_attr import safe_hartos_attr 

211 publish_fn = safe_hartos_attr('publish_async') 

212 except Exception: 

213 logger.error( 

214 "send_message_to_roles: cannot resolve publish_async via " 

215 "safe_hartos_attr", exc_info=True) 

216 return "Crossbar publisher unavailable" 

217 if publish_fn is None: 

218 logger.error( 

219 "send_message_to_roles: publish_async resolved to None") 

220 return "Crossbar publisher unavailable" 

221 

222 for entry in sessions: 

223 if entry.get('role') == role: 

224 payload = dict(entry) 

225 payload.update({ 

226 'message': message, 

227 'caller_role': caller_role, 

228 'caller_user_id': user_id, 

229 'caller_prompt_id': prompt_id, 

230 }) 

231 # Per-user topic — only the target persona's owning user's 

232 # HARTOS process subscribes. Falls back to caller's user_id 

233 # if the persona entry doesn't carry one (defensive — every 

234 # entry SHOULD have a user_id per the register_persona_for_session 

235 # contract). 

236 target_uid = entry.get('user_id') or user_id 

237 topic = multichat_topic_for(target_uid) 

238 try: 

239 publish_fn(topic, payload) 

240 logger.info( 

241 "send_message_to_roles: published to %s role=%s " 

242 "via %s", key, role, topic) 

243 return 'Message sent Successfully' 

244 except Exception: 

245 logger.error( 

246 "send_message_to_roles: publish failed for %s role=%s", 

247 key, role, exc_info=True) 

248 return f"Failed to publish to role={role}" 

249 

250 logger.warning( 

251 "send_message_to_roles: no persona with role=%r in session %s " 

252 "(available roles=%s)", 

253 role, key, [e.get('role') for e in sessions]) 

254 return f"No persona with role={role!r} in session" 

255 

256 

257__all__ = [ 

258 'agents_session', 

259 'agents_roles', 

260 'chat_joinees', 

261 'MULTICHAT_TOPIC_BASE', 

262 'multichat_topic_for', 

263 'register_persona_for_session', 

264 '_send_message_to_roles_impl', 

265]