Coverage for core / peer_link / hivemind_handler.py: 84.6%
65 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HiveMind channel (0x05 PRIVATE) receiver.
4The channel registry in `core.peer_link.channels` declares 0x05 = hivemind
5(PRIVATE). `world_model_bridge.query_hivemind` broadcasts on this channel via
6`PeerLinkManager.collect('hivemind', ...)` expecting each peer to reply with a
7thought. Before this module existed, no handler was registered on the
8receiving side — hive-scoped private messages were silently dropped on the
9floor and `collect()` returned an empty list regardless of peer count.
11This module fills that gap in two modes:
131. ``type == 'query'`` (the shape emitted by ``link_manager.collect``) —
14 forward to the in-process HiveMind via ``world_model_bridge`` and return
15 the peer's local thought. Returning a dict from a channel handler makes
16 the peer's reply travel back on the same link (see ChannelDispatcher
17 docstring).
192. ``type == 'deliver'`` (private agent-to-agent message) — deliver to the
20 target agent's mailbox via the agent_ledger `receive_message` primitive
21 so the receiving agent picks it up on its next tick.
23The handler is registered via ``bootstrap_hivemind_handler()`` from the same
24boot sequence that registers the `dispatch` handler in
25``embedded_main._register_device_control_handler`` (and its Nunba
26equivalent). Calls are safe no-ops when PeerLink is disabled or when the
27HiveMind bridge isn't loaded.
28"""
29import logging
30from typing import Any, Dict, Optional
32logger = logging.getLogger('hevolve.peer_link.hivemind')
34# Sentinel returned when the local peer has nothing to contribute. Making
35# this a module-level constant keeps the shape stable for test assertions
36# and avoids accidental None → "no handler registered" ambiguity on the
37# caller side (world_model_bridge treats None/empty as "no peer responded").
38_EMPTY_REPLY: Dict[str, Any] = {'type': 'reply', 'thought': ''}
41def handle_hivemind_message(data: Any, sender_peer_id: str) -> Optional[dict]:
42 """Route an incoming 0x05 hivemind frame.
44 Handler signature matches ``ChannelDispatcher.register`` expectations:
45 ``handler(data, sender_peer_id) -> Optional[dict]``. Returning a dict
46 sends a response back on the same link.
48 Failure modes:
49 - malformed payload (non-dict) → log at debug, return None
50 - unknown ``type`` → log at debug, return None
51 - HiveMind bridge missing (HTTP-only tier) → log at debug, return
52 ``_EMPTY_REPLY`` so the requesting peer's ``collect()`` still sees a
53 structured response rather than hanging on the timeout.
54 """
55 if not isinstance(data, dict):
56 logger.debug(
57 f"hivemind frame from {sender_peer_id[:8] if sender_peer_id else '?'} "
58 f"not a dict: {type(data).__name__}"
59 )
60 return None
62 msg_type = data.get('type', 'query')
64 if msg_type == 'query':
65 return _handle_query(data, sender_peer_id)
66 if msg_type == 'deliver':
67 return _handle_deliver(data, sender_peer_id)
69 logger.debug(
70 f"hivemind frame from {sender_peer_id[:8] if sender_peer_id else '?'} "
71 f"has unknown type={msg_type!r}"
72 )
73 return None
76def _handle_query(data: dict, sender_peer_id: str) -> Optional[dict]:
77 """Respond to a HiveMind thought query.
79 world_model_bridge.query_hivemind emits ``{'type': 'query'}`` by default
80 (see link_manager.collect). Callers may optionally include:
81 * 'query' — textual prompt (when a peer wants targeted fusion)
82 * 'user_id' — owner identity, used for consent checks if present
83 * 'timeout_ms' — the caller's own budget (informational; we don't
84 enforce it here because ChannelDispatcher dispatches synchronously)
85 """
86 try:
87 from integrations.agent_engine.world_model_bridge import (
88 get_world_model_bridge,
89 )
90 except ImportError:
91 # HTTP-only central tier — hevolveai not loaded, nothing to think.
92 return _EMPTY_REPLY
94 try:
95 bridge = get_world_model_bridge()
96 except Exception as e:
97 logger.debug(f"hivemind query: bridge unavailable ({e})")
98 return _EMPTY_REPLY
100 query_text = data.get('query', '') or ''
101 user_id = data.get('user_id') or None
103 try:
104 # query_hivemind already guards on in-process HiveMind availability
105 # and falls back to PeerLink broadcast on its own. We call it with
106 # a conservative timeout — the far-side caller is already waiting
107 # on their own timeout, so ours must be shorter.
108 result = bridge.query_hivemind(
109 query_text=query_text,
110 user_id=user_id,
111 timeout_ms=min(int(data.get('timeout_ms', 500)), 500),
112 )
113 except Exception as e:
114 logger.debug(f"hivemind query failed locally: {e}")
115 return _EMPTY_REPLY
117 if not result:
118 return _EMPTY_REPLY
120 # Shape matches what world_model_bridge.query_hivemind reads back
121 # (line 954–958 of world_model_bridge.py):
122 # peer_resp.get('thought') or peer_resp.get('response')
123 thought = (
124 (result.get('thought') if isinstance(result, dict) else None)
125 or (result.get('response') if isinstance(result, dict) else None)
126 or ''
127 )
128 return {
129 'type': 'reply',
130 'thought': thought,
131 'peer_id': sender_peer_id,
132 }
135def _handle_deliver(data: dict, sender_peer_id: str) -> Optional[dict]:
136 """Deliver a private message to the target agent's mailbox.
138 Uses the agent_ledger primitive so the receiving agent picks up the
139 message on its next dispatcher tick. Required fields:
140 * 'target_agent_id' — agent (task) to deliver to
141 * 'message' — arbitrary dict payload, forwarded as-is
143 The agent_ledger singleton is imported lazily so this handler works
144 when the ledger module isn't loaded (e.g. pure peer-relay topologies).
145 """
146 target = data.get('target_agent_id')
147 message = data.get('message')
148 if not target or not isinstance(message, dict):
149 logger.debug(
150 f"hivemind deliver from {sender_peer_id[:8] if sender_peer_id else '?'}: "
151 f"missing target_agent_id or message (target={target!r})"
152 )
153 return {'type': 'ack', 'delivered': False, 'reason': 'invalid_payload'}
155 try:
156 from integrations.distributed_agent.api import _get_coordinator
157 except ImportError:
158 logger.debug("hivemind deliver: distributed_agent not importable")
159 return {'type': 'ack', 'delivered': False, 'reason': 'no_coordinator'}
161 try:
162 coordinator = _get_coordinator()
163 if coordinator is None:
164 return {'type': 'ack', 'delivered': False, 'reason': 'no_coordinator'}
165 ledger = getattr(coordinator, '_ledger', None)
166 if ledger is None:
167 return {'type': 'ack', 'delivered': False, 'reason': 'no_ledger'}
168 task = ledger.get_task(target)
169 if task is None:
170 return {'type': 'ack', 'delivered': False, 'reason': 'unknown_agent'}
171 # Stamp sender so the receiving agent can filter by origin.
172 stamped = dict(message)
173 stamped.setdefault('from_peer_id', sender_peer_id)
174 task.receive_message(stamped)
175 try:
176 ledger.save()
177 except Exception:
178 # Best-effort persist; the in-memory receive is still valid.
179 pass
180 return {'type': 'ack', 'delivered': True, 'target_agent_id': target}
181 except Exception as e:
182 logger.debug(f"hivemind deliver failed for {target}: {e}")
183 return {'type': 'ack', 'delivered': False, 'reason': str(e)}
186def bootstrap_hivemind_handler() -> bool:
187 """Register the HiveMind handler on the channel dispatcher and on the
188 PeerLinkManager (so future links pick it up).
190 Idempotent — returns ``True`` on first successful registration,
191 ``False`` on any subsequent call (or when PeerLink is absent).
192 """
193 try:
194 from core.peer_link.channels import get_channel_dispatcher
195 from core.peer_link.link_manager import get_link_manager
196 except ImportError:
197 logger.debug("hivemind bootstrap: peer_link not importable")
198 return False
200 dispatcher = get_channel_dispatcher()
201 if dispatcher.has_handlers('hivemind'):
202 return False
204 dispatcher.register('hivemind', handle_hivemind_message)
206 try:
207 mgr = get_link_manager()
208 mgr.register_channel_handler('hivemind', handle_hivemind_message)
209 except Exception as e:
210 # dispatcher registration is the source of truth; manager is a
211 # convenience for per-link delivery. Failure here is non-fatal.
212 logger.debug(f"hivemind bootstrap: link_manager registration skipped: {e}")
214 logger.info("HiveMind (0x05 PRIVATE) channel handler registered")
215 return True