Coverage for core / peer_link / local_subscribers.py: 60.5%
147 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Local Crossbar topic subscribers — replaces cloud chatbot_pipeline subscribers.
4The cloud chatbot_pipeline (Twisted WAMP ApplicationSession) subscribes to
5~10 Crossbar topics for logging, confirmation tracking, error handling, etc.
6This module provides local equivalents that work fully offline.
8Cloud subscribers replicated:
9 confirmation.py → DeliveryTracker (delivery tracking + local notification)
10 longrunning.py → LongRunningTracker (task status logging)
11 intermediate.py → (inline — just log)
12 exception.py → (inline — just log)
13 timeout.py → (inline — just log)
14 actions.py → (inline — action tracking via DB)
15 probe.py → (inline — health probe response)
17Bootstrapped once via bootstrap_local_subscribers().
18"""
20import json
21import logging
22import threading
23import time
24from collections import OrderedDict
25from typing import Dict, Optional
27logger = logging.getLogger('hevolve.local_subscribers')
29# ─── Delivery Tracker (replaces cloud confirmation.py) ─────────────────
31_DELIVERY_TTL = 60 # seconds before a message is considered unconfirmed
32_MAX_PENDING = 500
35class DeliveryTracker:
36 """Track message delivery and fire local notifications for unconfirmed messages.
38 Cloud confirmation.py tracks pending messages and sends FCM push notifications
39 if not confirmed within 30s. Locally, we:
40 1. Track pending messages with timestamps
41 2. After TTL, emit a local notification event (no FCM — that's cloud-only)
42 3. Multi-device peers handle their own delivery via PeerLink acks
43 """
45 def __init__(self):
46 self._pending: OrderedDict = OrderedDict() # msg_id → {topic, timestamp, data}
47 self._lock = threading.Lock()
48 self._cleanup_thread = None
49 self._running = False
51 def start(self):
52 if self._running:
53 return
54 self._running = True
55 self._cleanup_thread = threading.Thread(
56 target=self._cleanup_loop, daemon=True,
57 name='delivery_tracker',
58 )
59 self._cleanup_thread.start()
61 def stop(self):
62 self._running = False
64 def track(self, topic: str, data: dict):
65 """Track a published message for delivery confirmation.
67 Keys by request_id (cloud primary key), falls back to msg_id.
68 Stores topic_name as secondary info (cloud uses 2-level dict,
69 we use flat dict with topic_name inside the value).
70 """
71 # Match cloud protocol: request_id is primary, msg_id is secondary
72 tracking_key = data.get('request_id') or data.get('msg_id', '')
73 if not tracking_key:
74 return
76 with self._lock:
77 self._pending[tracking_key] = {
78 'topic': topic,
79 'topic_name': data.get('topic_name', topic),
80 'timestamp': time.time(),
81 'user_id': data.get('user_id', ''),
82 }
83 # Cap size
84 while len(self._pending) > _MAX_PENDING:
85 self._pending.popitem(last=False)
87 def confirm(self, tracking_key: str):
88 """Mark a message as delivered (confirmed by frontend)."""
89 with self._lock:
90 self._pending.pop(tracking_key, None)
92 def on_confirmation_message(self, topic: str, data: dict):
93 """Handle confirmation topic messages.
95 Cloud protocol (confirmation.py):
96 - 'confirmation' key PRESENT and False → new unconfirmed message, track it
97 - 'confirmation' key ABSENT → confirmed by frontend, pop it
99 We handle both cloud and local format:
100 - confirmation=False (explicit) → track
101 - confirmation=True (explicit) → confirm
102 - confirmation absent → confirm (cloud format)
103 """
104 if isinstance(data, str):
105 try:
106 data = json.loads(data)
107 except (json.JSONDecodeError, TypeError):
108 return
110 tracking_key = data.get('request_id') or data.get('msg_id', '')
112 if 'confirmation' in data:
113 if not data['confirmation']:
114 # confirmation=False → new unconfirmed message, track it
115 self.track(data.get('topic_name', ''), data)
116 elif tracking_key:
117 # confirmation=True → confirmed, pop it
118 self.confirm(tracking_key)
119 elif tracking_key:
120 # Key absent → cloud-format confirmation, pop it
121 self.confirm(tracking_key)
123 def _cleanup_loop(self):
124 """Background loop: check for unconfirmed messages and emit notifications."""
125 while self._running:
126 time.sleep(15) # Check every 15s
127 now = time.time()
128 expired = []
130 with self._lock:
131 for msg_id, info in list(self._pending.items()):
132 if now - info['timestamp'] > _DELIVERY_TTL:
133 expired.append((msg_id, info))
135 for msg_id, _ in expired:
136 self._pending.pop(msg_id, None)
138 for msg_id, info in expired:
139 logger.debug(
140 f"Unconfirmed delivery: {info['topic']} "
141 f"(user={info['user_id']}, age={_DELIVERY_TTL}s)"
142 )
143 # Emit local notification event for unconfirmed messages
144 try:
145 from core.platform.events import emit_event
146 emit_event('notification.unconfirmed', {
147 'msg_id': msg_id,
148 'topic': info['topic'],
149 'user_id': info['user_id'],
150 })
151 except Exception:
152 pass
154 def get_stats(self) -> dict:
155 with self._lock:
156 return {'pending': len(self._pending)}
159# ─── Long-Running Task Tracker (replaces cloud longrunning.py) ──────────
161class LongRunningTracker:
162 """Track long-running task status locally.
164 Cloud longrunning.py logs task status to Teams webhook + tracks in memory.
165 Locally, we just log and track in memory (no Teams webhook needed).
166 """
168 def __init__(self):
169 self._tasks: Dict[str, dict] = {} # request_id → latest status
170 self._lock = threading.Lock()
172 def on_progress(self, topic: str, data: dict):
173 """Handle longrunning.log messages."""
174 if isinstance(data, str):
175 try:
176 data = json.loads(data)
177 except (json.JSONDecodeError, TypeError):
178 return
180 request_id = data.get('request_id', '')
181 task_name = data.get('task_name', '')
182 status = data.get('status', '')
184 if request_id:
185 with self._lock:
186 self._tasks[request_id] = {
187 'task_name': task_name,
188 'status': status,
189 'timestamp': time.time(),
190 'data': data,
191 }
193 # Log status transitions
194 if status in ('ERROR', 'TIMEOUT'):
195 logger.warning(f"Task {task_name} ({request_id}): {status}")
196 else:
197 logger.debug(f"Task {task_name} ({request_id}): {status}")
199 # Clean old entries (keep last 200)
200 with self._lock:
201 if len(self._tasks) > 200:
202 oldest = sorted(self._tasks.items(),
203 key=lambda x: x[1].get('timestamp', 0))
204 for rid, _ in oldest[:len(self._tasks) - 200]:
205 del self._tasks[rid]
207 def get_task_status(self, request_id: str) -> Optional[dict]:
208 with self._lock:
209 return self._tasks.get(request_id)
211 def get_stats(self) -> dict:
212 with self._lock:
213 return {'tracked_tasks': len(self._tasks)}
216# ─── Global instances (thread-safe, matching MessageBus pattern) ────────
218_delivery_tracker: Optional[DeliveryTracker] = None
219_longrunning_tracker: Optional[LongRunningTracker] = None
220_bootstrapped = False
221_singleton_lock = threading.Lock()
224def get_delivery_tracker() -> DeliveryTracker:
225 global _delivery_tracker
226 if _delivery_tracker is None:
227 with _singleton_lock:
228 if _delivery_tracker is None:
229 _delivery_tracker = DeliveryTracker()
230 return _delivery_tracker
233def get_longrunning_tracker() -> LongRunningTracker:
234 global _longrunning_tracker
235 if _longrunning_tracker is None:
236 with _singleton_lock:
237 if _longrunning_tracker is None:
238 _longrunning_tracker = LongRunningTracker()
239 return _longrunning_tracker
242def bootstrap_local_subscribers() -> None:
243 """Wire up local subscribers to the MessageBus.
245 Call once at startup. Subscribes to all topics that the cloud
246 chatbot_pipeline handles, using local handlers.
247 """
248 global _bootstrapped
249 with _singleton_lock:
250 if _bootstrapped:
251 return
252 _bootstrapped = True
254 try:
255 from core.peer_link.message_bus import get_message_bus
256 bus = get_message_bus()
257 except Exception as e:
258 logger.warning(f"Cannot bootstrap local subscribers (no MessageBus): {e}")
259 return
261 # 1. Confirmation tracking (replaces cloud confirmation.py)
262 tracker = get_delivery_tracker()
263 tracker.start()
264 bus.subscribe('task.confirmation', tracker.on_confirmation_message)
266 # 2. Long-running task tracking (replaces cloud longrunning.py)
267 lr = get_longrunning_tracker()
268 bus.subscribe('task.progress', lr.on_progress)
270 # 3. Intermediate responses — just log (cloud intermediate.py does the same)
271 def _on_intermediate(topic, data):
272 logger.debug(f"Intermediate response: {str(data)[:200]}")
274 bus.subscribe('task.intermediate', _on_intermediate)
276 # 4. Exception logging (replaces cloud exception.py)
277 def _on_exception(topic, data):
278 if isinstance(data, str):
279 try:
280 data = json.loads(data)
281 except Exception:
282 data = {'raw': data}
283 logger.error(f"Agent exception: {data.get('error', data.get('raw', str(data)[:200]))}")
285 bus.subscribe('task.exception', _on_exception)
287 # 5. Timeout tracking (replaces cloud timeout.py)
288 def _on_timeout(topic, data):
289 if isinstance(data, str):
290 try:
291 data = json.loads(data)
292 except Exception:
293 data = {'raw': data}
294 logger.warning(f"Task timeout: {data.get('task_name', '')} ({data.get('request_id', '')})")
296 bus.subscribe('task.timeout', _on_timeout)
298 # 6. Health probe response (replaces cloud probe.py)
299 def _on_probe(topic, data):
300 logger.debug("Health probe received")
301 try:
302 bus.publish('task.probe_response', {
303 'status': 'ok',
304 'timestamp': time.time(),
305 'delivery': tracker.get_stats(),
306 'longrunning': lr.get_stats(),
307 }, skip_crossbar=True)
308 except Exception:
309 pass
311 # Probe uses a different topic pattern but subscribe anyway
312 bus.subscribe('task.probe', _on_probe)
314 logger.info(
315 "Local subscribers bootstrapped: confirmation, longrunning, "
316 "intermediate, exception, timeout, probe"
317 )