Coverage for core / peer_link / local_subscribers.py: 60.5%

147 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Local Crossbar topic subscribers — replaces cloud chatbot_pipeline subscribers. 

3 

4The cloud chatbot_pipeline (Twisted WAMP ApplicationSession) subscribes to 

5~10 Crossbar topics for logging, confirmation tracking, error handling, etc. 

6This module provides local equivalents that work fully offline. 

7 

8Cloud subscribers replicated: 

9 confirmation.py → DeliveryTracker (delivery tracking + local notification) 

10 longrunning.py → LongRunningTracker (task status logging) 

11 intermediate.py → (inline — just log) 

12 exception.py → (inline — just log) 

13 timeout.py → (inline — just log) 

14 actions.py → (not wired up here — bootstrap_local_subscribers() registers no actions handler) 

15 probe.py → (inline — health probe response) 

16 

17Bootstrapped once via bootstrap_local_subscribers(). 

18""" 

19 

20import json 

21import logging 

22import threading 

23import time 

24from collections import OrderedDict 

25from typing import Dict, Optional 

26 

# Explicit logger name (rather than __name__) places records under the
# 'hevolve' logger hierarchy.
logger = logging.getLogger('hevolve.local_subscribers')

# ─── Delivery Tracker (replaces cloud confirmation.py) ─────────────────

# Age in seconds after which a still-pending message counts as unconfirmed.
_DELIVERY_TTL = 60

# Hard cap on tracked-but-unconfirmed messages; the oldest are evicted first.
_MAX_PENDING = 500

33 

34 

class DeliveryTracker:
    """Track message delivery and fire local notifications for unconfirmed messages.

    Cloud confirmation.py tracks pending messages and sends FCM push notifications
    if not confirmed within 30s. Locally, we:
    1. Track pending messages with timestamps
    2. After _DELIVERY_TTL seconds, emit a local ``notification.unconfirmed``
       event (no FCM — that's cloud-only)
    3. Multi-device peers handle their own delivery via PeerLink acks

    At most ``_MAX_PENDING`` entries are kept; beyond that the least recently
    tracked entry is dropped.
    """

    # Seconds between scans of the pending table by the cleanup thread.
    _CHECK_INTERVAL = 15

    def __init__(self):
        # tracking_key -> {topic, topic_name, timestamp, user_id}, ordered from
        # least to most recently tracked so eviction drops the stalest entry.
        self._pending: OrderedDict = OrderedDict()
        self._lock = threading.Lock()
        self._cleanup_thread: Optional[threading.Thread] = None
        self._running = False
        # Signalled by stop(). Replaced on every start() so a thread left over
        # from an earlier start/stop cycle cannot be revived by a restart.
        self._stop_event = threading.Event()

    def start(self):
        """Start the background cleanup thread (no-op if already running)."""
        with self._lock:
            if self._running:
                return
            self._running = True
            self._stop_event = threading.Event()
            stop_event = self._stop_event
        self._cleanup_thread = threading.Thread(
            target=self._cleanup_loop, args=(stop_event,), daemon=True,
            name='delivery_tracker',
        )
        self._cleanup_thread.start()

    def stop(self):
        """Ask the cleanup thread to exit; it wakes immediately rather than
        finishing its current sleep interval."""
        with self._lock:
            self._running = False
            self._stop_event.set()

    def track(self, topic: str, data: dict):
        """Track a published message for delivery confirmation.

        Keys by request_id (cloud primary key), falls back to msg_id.
        Stores topic_name as secondary info (cloud uses 2-level dict,
        we use flat dict with topic_name inside the value). Messages with
        neither key are ignored.
        """
        # Match cloud protocol: request_id is primary, msg_id is secondary
        tracking_key = data.get('request_id') or data.get('msg_id', '')
        if not tracking_key:
            return

        with self._lock:
            self._pending[tracking_key] = {
                'topic': topic,
                'topic_name': data.get('topic_name', topic),
                'timestamp': time.time(),
                'user_id': data.get('user_id', ''),
            }
            # Re-tracking refreshes the timestamp, so also refresh the
            # eviction order; otherwise the cap could drop a fresh entry.
            self._pending.move_to_end(tracking_key)
            while len(self._pending) > _MAX_PENDING:
                self._pending.popitem(last=False)

    def confirm(self, tracking_key: str):
        """Mark a message as delivered (confirmed by frontend)."""
        with self._lock:
            self._pending.pop(tracking_key, None)

    def on_confirmation_message(self, topic: str, data: dict):
        """Handle confirmation topic messages.

        Cloud protocol (confirmation.py):
          - 'confirmation' key PRESENT and False → new unconfirmed message, track it
          - 'confirmation' key ABSENT → confirmed by frontend, pop it

        We handle both cloud and local format:
          - confirmation=False (explicit) → track
          - confirmation=True (explicit) → confirm
          - confirmation absent → confirm (cloud format)

        ``data`` may be a dict or a JSON string. Undecodable strings and
        payloads that are not JSON objects are ignored.
        """
        if isinstance(data, str):
            try:
                data = json.loads(data)
            except (json.JSONDecodeError, TypeError):
                return
        if not isinstance(data, dict):
            # e.g. None, a JSON list or a number — nothing to key on.
            logger.debug("Ignoring non-object confirmation payload: %r", data)
            return

        tracking_key = data.get('request_id') or data.get('msg_id', '')

        if 'confirmation' in data:
            if not data['confirmation']:
                # confirmation=False → new unconfirmed message, track it
                self.track(data.get('topic_name', ''), data)
            elif tracking_key:
                # confirmation=True → confirmed, pop it
                self.confirm(tracking_key)
        elif tracking_key:
            # Key absent → cloud-format confirmation, pop it
            self.confirm(tracking_key)

    def _cleanup_loop(self, stop_event: Optional[threading.Event] = None):
        """Background loop: every _CHECK_INTERVAL seconds expire stale entries.

        Exits as soon as ``stop_event`` is set (by stop()).
        """
        if stop_event is None:
            stop_event = self._stop_event
        # Event.wait() returns True once stop() sets the event, ending the loop.
        while not stop_event.wait(self._CHECK_INTERVAL):
            if not self._running:
                break
            self._expire_pending(time.time())

    def _expire_pending(self, now: float) -> list:
        """Drop entries older than _DELIVERY_TTL and notify about each.

        Returns the list of ``(tracking_key, info)`` pairs that expired.
        """
        with self._lock:
            expired = [
                (key, info) for key, info in self._pending.items()
                if now - info['timestamp'] > _DELIVERY_TTL
            ]
            for key, _ in expired:
                del self._pending[key]

        # Notify outside the lock so a slow event sink cannot block track().
        for key, info in expired:
            logger.debug(
                "Unconfirmed delivery: %s (user=%s, age=%ds)",
                info['topic'], info['user_id'], int(now - info['timestamp']),
            )
            self._notify_unconfirmed(key, info)
        return expired

    def _notify_unconfirmed(self, msg_id: str, info: dict):
        """Best-effort emit of a local ``notification.unconfirmed`` event."""
        try:
            from core.platform.events import emit_event
            emit_event('notification.unconfirmed', {
                'msg_id': msg_id,
                'topic': info['topic'],
                'user_id': info['user_id'],
            })
        except Exception as exc:
            # The event system may be unavailable (e.g. offline/minimal
            # installs); never let that kill the cleanup thread.
            logger.debug("Could not emit unconfirmed-delivery event for %s: %s",
                         msg_id, exc)

    def get_stats(self) -> dict:
        """Return ``{'pending': <number of unconfirmed tracked messages>}``."""
        with self._lock:
            return {'pending': len(self._pending)}

157 

158 

159# ─── Long-Running Task Tracker (replaces cloud longrunning.py) ────────── 

160 

class LongRunningTracker:
    """Track long-running task status locally.

    Cloud longrunning.py logs task status to Teams webhook + tracks in memory.
    Locally, we just log and track in memory (no Teams webhook needed).

    Only the ``max_tasks`` most recently updated request_ids are retained.
    """

    def __init__(self, max_tasks: int = 200):
        """
        Args:
            max_tasks: Maximum number of request_ids kept in memory; the
                least recently updated entry is evicted beyond this.
        """
        # request_id -> latest status record, ordered from least to most
        # recently updated so eviction is O(1) via popitem(last=False).
        self._tasks: OrderedDict = OrderedDict()
        self._max_tasks = max_tasks
        self._lock = threading.Lock()

    def on_progress(self, topic: str, data: dict):
        """Handle longrunning.log messages.

        ``data`` may be a dict or a JSON string. Undecodable strings and
        payloads that are not JSON objects are ignored. Messages without a
        request_id are logged but not stored.
        """
        if isinstance(data, str):
            try:
                data = json.loads(data)
            except (json.JSONDecodeError, TypeError):
                return
        if not isinstance(data, dict):
            # e.g. None, a JSON list or a number — no fields to read.
            return

        request_id = data.get('request_id', '')
        task_name = data.get('task_name', '')
        status = data.get('status', '')

        if request_id:
            with self._lock:
                self._tasks[request_id] = {
                    'task_name': task_name,
                    'status': status,
                    'timestamp': time.time(),
                    'data': data,
                }
                # An update makes this the most recent entry.
                self._tasks.move_to_end(request_id)
                while len(self._tasks) > self._max_tasks:
                    self._tasks.popitem(last=False)

        # Log status transitions
        if status in ('ERROR', 'TIMEOUT'):
            logger.warning("Task %s (%s): %s", task_name, request_id, status)
        else:
            logger.debug("Task %s (%s): %s", task_name, request_id, status)

    def get_task_status(self, request_id: str) -> Optional[dict]:
        """Return the latest stored record for ``request_id`` or None."""
        with self._lock:
            return self._tasks.get(request_id)

    def get_stats(self) -> dict:
        """Return ``{'tracked_tasks': <number of stored request_ids>}``."""
        with self._lock:
            return {'tracked_tasks': len(self._tasks)}

214 

215 

216# ─── Global instances (thread-safe, matching MessageBus pattern) ──────── 

217 

# Serializes lazy construction of the tracker singletons below and the
# one-shot check performed by bootstrap_local_subscribers().
_singleton_lock = threading.Lock()

# Lazily populated by get_delivery_tracker() / get_longrunning_tracker().
_delivery_tracker: Optional[DeliveryTracker] = None
_longrunning_tracker: Optional[LongRunningTracker] = None

# Flipped to True by bootstrap_local_subscribers() so it runs only once.
_bootstrapped = False

222 

223 

def get_delivery_tracker() -> DeliveryTracker:
    """Return the module-level DeliveryTracker, creating it on first use.

    Reads the global without locking once it exists; construction is
    serialized on ``_singleton_lock`` and re-checked under it, so only one
    instance is ever built.
    """
    global _delivery_tracker
    existing = _delivery_tracker
    if existing is not None:
        return existing
    with _singleton_lock:
        if _delivery_tracker is None:
            _delivery_tracker = DeliveryTracker()
        return _delivery_tracker

231 

232 

def get_longrunning_tracker() -> LongRunningTracker:
    """Return the module-level LongRunningTracker, creating it on first use.

    Reads the global without locking once it exists; construction is
    serialized on ``_singleton_lock`` and re-checked under it, so only one
    instance is ever built.
    """
    global _longrunning_tracker
    existing = _longrunning_tracker
    if existing is not None:
        return existing
    with _singleton_lock:
        if _longrunning_tracker is None:
            _longrunning_tracker = LongRunningTracker()
        return _longrunning_tracker

240 

241 

def _coerce_payload(data) -> dict:
    """Normalize a bus payload to a dict.

    JSON strings are decoded. Anything that is not, or does not decode to,
    a JSON object is wrapped as ``{'raw': <original payload>}``.
    """
    original = data
    if isinstance(data, str):
        try:
            data = json.loads(data)
        except (ValueError, RecursionError):
            return {'raw': original}
    if isinstance(data, dict):
        return data
    return {'raw': original}


def _log_intermediate(topic, data):
    """Log an intermediate response (cloud intermediate.py only logs too)."""
    logger.debug("Intermediate response: %s", str(data)[:200])


def _log_exception(topic, data):
    """Log an agent exception reported on the bus (replaces cloud exception.py)."""
    payload = _coerce_payload(data)
    detail = payload.get('error', payload.get('raw', str(payload)[:200]))
    logger.error("Agent exception: %s", detail)


def _log_timeout(topic, data):
    """Log a task timeout reported on the bus (replaces cloud timeout.py)."""
    payload = _coerce_payload(data)
    logger.warning("Task timeout: %s (%s)",
                   payload.get('task_name', ''), payload.get('request_id', ''))


def bootstrap_local_subscribers() -> None:
    """Wire up local subscribers to the MessageBus.

    Call once at startup. Subscribes to all topics that the cloud
    chatbot_pipeline handles, using local handlers. Subsequent calls are
    no-ops. If the MessageBus cannot be obtained, a warning is logged and
    a later call may retry.
    """
    global _bootstrapped
    with _singleton_lock:
        if _bootstrapped:
            return
        # Claim the one-time setup up front so concurrent callers back off.
        _bootstrapped = True

    try:
        from core.peer_link.message_bus import get_message_bus
        bus = get_message_bus()
    except Exception as e:
        logger.warning("Cannot bootstrap local subscribers (no MessageBus): %s", e)
        # Release the claim: otherwise a transient failure here would leave
        # the process permanently without local subscribers.
        with _singleton_lock:
            _bootstrapped = False
        return

    # 1. Confirmation tracking (replaces cloud confirmation.py)
    tracker = get_delivery_tracker()
    tracker.start()
    bus.subscribe('task.confirmation', tracker.on_confirmation_message)

    # 2. Long-running task tracking (replaces cloud longrunning.py)
    lr = get_longrunning_tracker()
    bus.subscribe('task.progress', lr.on_progress)

    # 3-5. Intermediate responses, exceptions and timeouts are only logged.
    bus.subscribe('task.intermediate', _log_intermediate)
    bus.subscribe('task.exception', _log_exception)
    bus.subscribe('task.timeout', _log_timeout)

    # 6. Health probe response (replaces cloud probe.py); needs the bus and
    # trackers, hence a closure.
    def _on_probe(topic, data):
        logger.debug("Health probe received")
        try:
            bus.publish('task.probe_response', {
                'status': 'ok',
                'timestamp': time.time(),
                'delivery': tracker.get_stats(),
                'longrunning': lr.get_stats(),
            }, skip_crossbar=True)
        except Exception as exc:
            # Best-effort: a failed probe reply must not propagate into the bus.
            logger.debug("Failed to publish probe response: %s", exc)

    bus.subscribe('task.probe', _on_probe)

    logger.info(
        "Local subscribers bootstrapped: confirmation, longrunning, "
        "intermediate, exception, timeout, probe"
    )