Coverage for core / persona_registry.py: 77.9%
77 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""Per-(user, prompt) persona/role registry — single source of truth.
3A single agent instance can be shared across multiple personas (student /
4parent / teacher). The Helper agent uses `send_message_to_roles(role,
5message)` to deliver a message to a specific persona via crossbar.
7This module hosts the TTLCache singletons that map `{user_id}_{prompt_id}`
8to the persona/role state, plus the canonical broadcast routine the
9`send_message_to_roles` tool registers across both flows.
11Single-writer invariant:
12 - These TTLCaches are declared ONLY here.
13 - `reuse_recipe.py` and `create_recipe.py` import (not redeclare) them.
14 - A drift-guard test enforces this.
16Populated by:
17 - `register_persona_for_session(user_id, prompt_id, persona_list)` —
18 called from create_recipe.set_for_creating_actions (after the
19 gather-requirements config is loaded) and from reuse_recipe's
20 update_persona / create_agents_for_user paths.
22#510 follow-up: this replaces the previous module-level declarations
23in reuse_recipe.py:181, 185, 186 and the commented-out closure at
24reuse_recipe.py:1203-1240.
25"""
26from __future__ import annotations
28import logging
29from typing import Any, Dict, List, Optional
31from core.session_cache import TTLCache
33logger = logging.getLogger(__name__)
36# {f"{user_id}_{prompt_id}": [
37# {'agentInstanceID': str, 'user_id': Any, 'role': str, 'deviceID': str, ...},
38# ...
39# ]}
40agents_session: TTLCache = TTLCache(
41 ttl_seconds=7200, max_size=500, name='persona_agents_session')
43# {f"{user_id}_{prompt_id}": {user_id: role_name}}
44agents_roles: TTLCache = TTLCache(
45 ttl_seconds=7200, max_size=500, name='persona_agents_roles')
47# {user_id: {prompt_id: chat_creator_user_id}} — cross-user joined chat
48chat_joinees: TTLCache = TTLCache(
49 ttl_seconds=7200, max_size=500, name='persona_chat_joinees')
52# Canonical WAMP topic base for multi-persona broadcast. Per-user
53# suffix matches HARTOS's existing convention used by chat/action/
54# vision/analogy/social/fleet/game/community topics — so the WAMP
55# router's per-user-topic ACL (#246) can gate this topic too, and
56# multi-node hive deployments won't leak one tenant's broadcast to
57# another via a shared subscription.
58MULTICHAT_TOPIC_BASE = 'com.hertzai.hevolve.agent.multichat'
61def multichat_topic_for(target_user_id) -> str:
62 """Return the per-user WAMP topic for a multi-persona broadcast.
64 The target_user_id is the user who OWNS the target persona (i.e.
65 `agents_session[key][i]['user_id']` for the matched entry). Only
66 that user's HARTOS process subscribes to the suffixed topic, which
67 contains cross-tenant leaks in a multi-node hive deployment.
69 Pattern mirrors `core.peer_link.message_bus.chat_topic_for(user_id)`
70 for `com.hertzai.hevolve.chat.{user_id}`.
71 """
72 return f'{MULTICHAT_TOPIC_BASE}.{target_user_id}'
75def register_persona_for_session(
76 user_id: Any,
77 prompt_id: Any,
78 persona_list: List[Dict[str, Any]],
79 device_id: str = 'something',
80) -> int:
81 """Populate `agents_session` + `agents_roles` from a persona list.
83 Called at the start of each recipe flow:
84 - create_recipe.set_for_creating_actions (after config load with
85 `config['personas']` / `config['flows'][...]['persona']`)
86 - reuse_recipe's update_persona flow (already does this inline
87 at lines 723-731, 802-806 — those usage sites continue to work
88 unchanged since `agents_session`/`agents_roles` are now
89 imported from here)
91 `persona_list` items may be dicts with 'name'/'role' keys or plain
92 strings. Idempotent — re-call overwrites the session's entries
93 (useful after persona edits during recipe creation).
95 Returns the count of personas registered. Never raises — logs +
96 returns 0 on a malformed persona_list (per `feedback_audit_evidence_discipline.md`:
97 log everything, no silent gulps).
98 """
99 if user_id is None or prompt_id is None:
100 logger.warning(
101 "register_persona_for_session: missing user_id/prompt_id "
102 "(user_id=%r, prompt_id=%r); skipping", user_id, prompt_id)
103 return 0
105 key = f"{user_id}_{prompt_id}"
106 session_entries: List[Dict[str, Any]] = []
107 roles_map: Dict[Any, str] = {}
109 for persona in (persona_list or []):
110 # Accept both string names and dict shapes
111 if isinstance(persona, str):
112 role_name = persona
113 elif isinstance(persona, dict):
114 role_name = persona.get('name') or persona.get('role')
115 else:
116 logger.warning(
117 "register_persona_for_session: skipping unknown persona "
118 "shape %r", type(persona))
119 continue
121 if not role_name:
122 logger.warning(
123 "register_persona_for_session: skipping persona without "
124 "name/role: %r", persona)
125 continue
127 session_entries.append({
128 'agentInstanceID': f'com.hertzai.hevolve.chat.{prompt_id}.{user_id}',
129 'user_id': user_id,
130 'role': role_name,
131 'deviceID': device_id,
132 })
133 roles_map[user_id] = role_name
135 agents_session[key] = session_entries
136 agents_roles[key] = roles_map
137 logger.info(
138 "register_persona_for_session: registered %d persona(s) for %s",
139 len(session_entries), key)
140 return len(session_entries)
143def _resolve_session_for_user(user_id: Any, prompt_id: Any):
144 """Return (session_entries, key) for the user, falling back to the
145 chat-creator's session if this user joined another user's chat."""
146 key = f"{user_id}_{prompt_id}"
147 sessions = agents_session.get(key, [])
148 if sessions:
149 return sessions, key
151 # Cross-user join fallback
152 try:
153 joinees_for_user = chat_joinees.get(user_id, {}) or {}
154 creator = joinees_for_user.get(prompt_id)
155 if creator:
156 creator_key = f"{creator}_{prompt_id}"
157 sessions = agents_session.get(creator_key, [])
158 if sessions:
159 return sessions, creator_key
160 except Exception:
161 logger.warning(
162 "_resolve_session_for_user: chat_joinees lookup failed for %s",
163 key, exc_info=True)
165 return [], key
168def _send_message_to_roles_impl(
169 user_id: Any,
170 prompt_id: Any,
171 role: str,
172 message: str,
173 publish_fn: Optional[Any] = None,
174) -> str:
175 """Canonical multi-persona broadcast.
177 Looks up the target persona by `role` in `agents_session`, then
178 publishes the message to crossbar topic `MULTICHAT_TOPIC` with
179 caller metadata so the receiving persona's agent loop can route it.
181 `publish_fn` is the crossbar publisher — pass
182 `reuse_recipe.publish_async` (or `helper_fun.publish_async` /
183 a `safe_hartos_attr('publish_async')`-resolved callable). If
184 omitted, the function lazy-resolves the canonical
185 `core.safe_hartos_attr.safe_hartos_attr('publish_async')`.
187 Returns a status string (per autogen tool contract — string return
188 is fed back to the LLM as the tool's response).
189 """
190 if not role:
191 logger.warning(
192 "send_message_to_roles: empty role arg for %s_%s",
193 user_id, prompt_id)
194 return "Cannot broadcast: role argument is empty"
196 sessions, key = _resolve_session_for_user(user_id, prompt_id)
197 if not sessions:
198 logger.warning(
199 "send_message_to_roles: no agent_session for %s; "
200 "personas not initialized yet?", key)
201 return (
202 f"No personas registered for session {key}. "
203 "Run gather-requirements first.")
205 caller_role = (agents_roles.get(key, {}) or {}).get(user_id)
207 # Resolve publisher if caller didn't pass one
208 if publish_fn is None:
209 try:
210 from core.safe_hartos_attr import safe_hartos_attr
211 publish_fn = safe_hartos_attr('publish_async')
212 except Exception:
213 logger.error(
214 "send_message_to_roles: cannot resolve publish_async via "
215 "safe_hartos_attr", exc_info=True)
216 return "Crossbar publisher unavailable"
217 if publish_fn is None:
218 logger.error(
219 "send_message_to_roles: publish_async resolved to None")
220 return "Crossbar publisher unavailable"
222 for entry in sessions:
223 if entry.get('role') == role:
224 payload = dict(entry)
225 payload.update({
226 'message': message,
227 'caller_role': caller_role,
228 'caller_user_id': user_id,
229 'caller_prompt_id': prompt_id,
230 })
231 # Per-user topic — only the target persona's owning user's
232 # HARTOS process subscribes. Falls back to caller's user_id
233 # if the persona entry doesn't carry one (defensive — every
234 # entry SHOULD have a user_id per the register_persona_for_session
235 # contract).
236 target_uid = entry.get('user_id') or user_id
237 topic = multichat_topic_for(target_uid)
238 try:
239 publish_fn(topic, payload)
240 logger.info(
241 "send_message_to_roles: published to %s role=%s "
242 "via %s", key, role, topic)
243 return 'Message sent Successfully'
244 except Exception:
245 logger.error(
246 "send_message_to_roles: publish failed for %s role=%s",
247 key, role, exc_info=True)
248 return f"Failed to publish to role={role}"
250 logger.warning(
251 "send_message_to_roles: no persona with role=%r in session %s "
252 "(available roles=%s)",
253 role, key, [e.get('role') for e in sessions])
254 return f"No persona with role={role!r} in session"
257__all__ = [
258 'agents_session',
259 'agents_roles',
260 'chat_joinees',
261 'MULTICHAT_TOPIC_BASE',
262 'multichat_topic_for',
263 'register_persona_for_session',
264 '_send_message_to_roles_impl',
265]