Coverage for core / peer_link / hivemind_handler.py: 84.6%

65 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HiveMind channel (0x05 PRIVATE) receiver. 

3 

4The channel registry in `core.peer_link.channels` declares 0x05 = hivemind 

5(PRIVATE). `world_model_bridge.query_hivemind` broadcasts on this channel via 

6`PeerLinkManager.collect('hivemind', ...)` expecting each peer to reply with a 

7thought. Before this module existed, no handler was registered on the 

8receiving side — hive-scoped private messages were silently dropped on the 

9floor and `collect()` returned an empty list regardless of peer count. 

10 

11This module fills that gap in two modes: 

12 

131. ``type == 'query'`` (the shape emitted by ``link_manager.collect``) — 

14 forward to the in-process HiveMind via ``world_model_bridge`` and return 

15 the peer's local thought. Returning a dict from a channel handler makes 

16 the peer's reply travel back on the same link (see ChannelDispatcher 

17 docstring). 

18 

192. ``type == 'deliver'`` (private agent-to-agent message) — deliver to the 

20 target agent's mailbox via the agent_ledger `receive_message` primitive 

21 so the receiving agent picks it up on its next tick. 

22 

23The handler is registered via ``bootstrap_hivemind_handler()`` from the same 

24boot sequence that registers the `dispatch` handler in 

25``embedded_main._register_device_control_handler`` (and its Nunba 

26equivalent). Calls are safe no-ops when PeerLink is disabled or when the 

27HiveMind bridge isn't loaded. 

28""" 

29import logging 

30from typing import Any, Dict, Optional 

31 

32logger = logging.getLogger('hevolve.peer_link.hivemind') 

33 

34# Sentinel returned when the local peer has nothing to contribute. Making 

35# this a module-level constant keeps the shape stable for test assertions 

36# and avoids accidental None → "no handler registered" ambiguity on the 

37# caller side (world_model_bridge treats None/empty as "no peer responded"). 

38_EMPTY_REPLY: Dict[str, Any] = {'type': 'reply', 'thought': ''} 

39 

40 

41def handle_hivemind_message(data: Any, sender_peer_id: str) -> Optional[dict]: 

42 """Route an incoming 0x05 hivemind frame. 

43 

44 Handler signature matches ``ChannelDispatcher.register`` expectations: 

45 ``handler(data, sender_peer_id) -> Optional[dict]``. Returning a dict 

46 sends a response back on the same link. 

47 

48 Failure modes: 

49 - malformed payload (non-dict) → log at debug, return None 

50 - unknown ``type`` → log at debug, return None 

51 - HiveMind bridge missing (HTTP-only tier) → log at debug, return 

52 ``_EMPTY_REPLY`` so the requesting peer's ``collect()`` still sees a 

53 structured response rather than hanging on the timeout. 

54 """ 

55 if not isinstance(data, dict): 

56 logger.debug( 

57 f"hivemind frame from {sender_peer_id[:8] if sender_peer_id else '?'} " 

58 f"not a dict: {type(data).__name__}" 

59 ) 

60 return None 

61 

62 msg_type = data.get('type', 'query') 

63 

64 if msg_type == 'query': 

65 return _handle_query(data, sender_peer_id) 

66 if msg_type == 'deliver': 

67 return _handle_deliver(data, sender_peer_id) 

68 

69 logger.debug( 

70 f"hivemind frame from {sender_peer_id[:8] if sender_peer_id else '?'} " 

71 f"has unknown type={msg_type!r}" 

72 ) 

73 return None 

74 

75 

76def _handle_query(data: dict, sender_peer_id: str) -> Optional[dict]: 

77 """Respond to a HiveMind thought query. 

78 

79 world_model_bridge.query_hivemind emits ``{'type': 'query'}`` by default 

80 (see link_manager.collect). Callers may optionally include: 

81 * 'query' — textual prompt (when a peer wants targeted fusion) 

82 * 'user_id' — owner identity, used for consent checks if present 

83 * 'timeout_ms' — the caller's own budget (informational; we don't 

84 enforce it here because ChannelDispatcher dispatches synchronously) 

85 """ 

86 try: 

87 from integrations.agent_engine.world_model_bridge import ( 

88 get_world_model_bridge, 

89 ) 

90 except ImportError: 

91 # HTTP-only central tier — hevolveai not loaded, nothing to think. 

92 return _EMPTY_REPLY 

93 

94 try: 

95 bridge = get_world_model_bridge() 

96 except Exception as e: 

97 logger.debug(f"hivemind query: bridge unavailable ({e})") 

98 return _EMPTY_REPLY 

99 

100 query_text = data.get('query', '') or '' 

101 user_id = data.get('user_id') or None 

102 

103 try: 

104 # query_hivemind already guards on in-process HiveMind availability 

105 # and falls back to PeerLink broadcast on its own. We call it with 

106 # a conservative timeout — the far-side caller is already waiting 

107 # on their own timeout, so ours must be shorter. 

108 result = bridge.query_hivemind( 

109 query_text=query_text, 

110 user_id=user_id, 

111 timeout_ms=min(int(data.get('timeout_ms', 500)), 500), 

112 ) 

113 except Exception as e: 

114 logger.debug(f"hivemind query failed locally: {e}") 

115 return _EMPTY_REPLY 

116 

117 if not result: 

118 return _EMPTY_REPLY 

119 

120 # Shape matches what world_model_bridge.query_hivemind reads back 

121 # (line 954–958 of world_model_bridge.py): 

122 # peer_resp.get('thought') or peer_resp.get('response') 

123 thought = ( 

124 (result.get('thought') if isinstance(result, dict) else None) 

125 or (result.get('response') if isinstance(result, dict) else None) 

126 or '' 

127 ) 

128 return { 

129 'type': 'reply', 

130 'thought': thought, 

131 'peer_id': sender_peer_id, 

132 } 

133 

134 

135def _handle_deliver(data: dict, sender_peer_id: str) -> Optional[dict]: 

136 """Deliver a private message to the target agent's mailbox. 

137 

138 Uses the agent_ledger primitive so the receiving agent picks up the 

139 message on its next dispatcher tick. Required fields: 

140 * 'target_agent_id' — agent (task) to deliver to 

141 * 'message' — arbitrary dict payload, forwarded as-is 

142 

143 The agent_ledger singleton is imported lazily so this handler works 

144 when the ledger module isn't loaded (e.g. pure peer-relay topologies). 

145 """ 

146 target = data.get('target_agent_id') 

147 message = data.get('message') 

148 if not target or not isinstance(message, dict): 

149 logger.debug( 

150 f"hivemind deliver from {sender_peer_id[:8] if sender_peer_id else '?'}: " 

151 f"missing target_agent_id or message (target={target!r})" 

152 ) 

153 return {'type': 'ack', 'delivered': False, 'reason': 'invalid_payload'} 

154 

155 try: 

156 from integrations.distributed_agent.api import _get_coordinator 

157 except ImportError: 

158 logger.debug("hivemind deliver: distributed_agent not importable") 

159 return {'type': 'ack', 'delivered': False, 'reason': 'no_coordinator'} 

160 

161 try: 

162 coordinator = _get_coordinator() 

163 if coordinator is None: 

164 return {'type': 'ack', 'delivered': False, 'reason': 'no_coordinator'} 

165 ledger = getattr(coordinator, '_ledger', None) 

166 if ledger is None: 

167 return {'type': 'ack', 'delivered': False, 'reason': 'no_ledger'} 

168 task = ledger.get_task(target) 

169 if task is None: 

170 return {'type': 'ack', 'delivered': False, 'reason': 'unknown_agent'} 

171 # Stamp sender so the receiving agent can filter by origin. 

172 stamped = dict(message) 

173 stamped.setdefault('from_peer_id', sender_peer_id) 

174 task.receive_message(stamped) 

175 try: 

176 ledger.save() 

177 except Exception: 

178 # Best-effort persist; the in-memory receive is still valid. 

179 pass 

180 return {'type': 'ack', 'delivered': True, 'target_agent_id': target} 

181 except Exception as e: 

182 logger.debug(f"hivemind deliver failed for {target}: {e}") 

183 return {'type': 'ack', 'delivered': False, 'reason': str(e)} 

184 

185 

186def bootstrap_hivemind_handler() -> bool: 

187 """Register the HiveMind handler on the channel dispatcher and on the 

188 PeerLinkManager (so future links pick it up). 

189 

190 Idempotent — returns ``True`` on first successful registration, 

191 ``False`` on any subsequent call (or when PeerLink is absent). 

192 """ 

193 try: 

194 from core.peer_link.channels import get_channel_dispatcher 

195 from core.peer_link.link_manager import get_link_manager 

196 except ImportError: 

197 logger.debug("hivemind bootstrap: peer_link not importable") 

198 return False 

199 

200 dispatcher = get_channel_dispatcher() 

201 if dispatcher.has_handlers('hivemind'): 

202 return False 

203 

204 dispatcher.register('hivemind', handle_hivemind_message) 

205 

206 try: 

207 mgr = get_link_manager() 

208 mgr.register_channel_handler('hivemind', handle_hivemind_message) 

209 except Exception as e: 

210 # dispatcher registration is the source of truth; manager is a 

211 # convenience for per-link delivery. Failure here is non-fatal. 

212 logger.debug(f"hivemind bootstrap: link_manager registration skipped: {e}") 

213 

214 logger.info("HiveMind (0x05 PRIVATE) channel handler registered") 

215 return True