Coverage for integrations / social / sync_engine.py: 75.1%
241 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial - Offline-First Sync Engine
4Queues operations locally when offline, drains to regional/central when connected.
5Used by regional (sync to central) and local (sync to regional) tiers.
6"""
7import os
8import time
9import logging
10import threading
11import requests
12from datetime import datetime
14from core.http_pool import pooled_get, pooled_post
15from typing import Dict, Optional
17logger = logging.getLogger('hevolve_social')
class SyncEngine:
    """Offline-first sync engine for regional/local tiers.

    Operations are queued in the ``SyncQueue`` table while a node is offline
    and drained in batches to the parent tier (regional -> central,
    local -> regional) once connectivity is restored.
    """

    # Backpressure limit: per-node cap on pending ('queued'/'failed') items.
    MAX_QUEUE_SIZE = 10000

    def __init__(self):
        # Drain interval (seconds) and batch size are env-tunable.
        self._interval = int(os.environ.get('HEVOLVE_SYNC_INTERVAL', '60'))
        self._batch_size = int(os.environ.get('HEVOLVE_MAX_SYNC_BATCH', '50'))
        self._running = False
        self._thread = None
        self._lock = threading.Lock()
        # Lets stop_background_sync() interrupt the inter-drain wait
        # immediately instead of waiting out the full interval (with the
        # default 60s interval, a plain sleep outlives the 10s join timeout
        # and stop would return with the thread still alive).
        self._stop_event = threading.Event()

    @staticmethod
    def queue(db, target_tier: str, operation_type: str, payload: dict) -> Optional[str]:
        """Queue a sync operation for later delivery.

        Args:
            db: Database session.
            target_tier: Destination tier name (e.g. 'central', 'regional').
            operation_type: Operation discriminator (e.g. 'sync_user').
            payload: JSON-serializable operation payload.

        Returns:
            The new SyncQueue row id, or None if rejected by backpressure.
        """
        from .models import SyncQueue
        from security.node_integrity import get_public_key_hex

        try:
            node_id = get_public_key_hex()[:16]
        except Exception:
            node_id = 'unknown'

        # Backpressure: reject if queue is too large for this node
        current_count = db.query(SyncQueue).filter(
            SyncQueue.node_id == node_id,
            SyncQueue.status.in_(['queued', 'failed']),
        ).count()
        if current_count >= SyncEngine.MAX_QUEUE_SIZE:
            logger.warning(f"Sync queue backpressure: {current_count} items for node {node_id}, skipping insertion")
            return None

        item = SyncQueue(
            node_id=node_id,
            target_tier=target_tier,
            operation_type=operation_type,
            payload_json=payload,
            status='queued',
        )
        db.add(item)
        db.flush()
        return item.id

    @staticmethod
    def drain_queue(db, node_id: str, target_url: str, batch_size: int = 50) -> Dict:
        """Send queued operations to target. Returns counts.

        Claims up to ``batch_size`` 'queued'/'failed' items via an optimistic
        status update, POSTs them as one batch (E2E-encrypted when the peer
        has an X25519 key), then marks each item completed/failed based on
        the target's 'processed' list.

        Returns:
            Dict with 'sent', 'failed', and 'remaining' counts.
        """
        from .models import SyncQueue

        items = db.query(SyncQueue).filter(
            SyncQueue.node_id == node_id,
            SyncQueue.status.in_(['queued', 'failed']),
        ).order_by(SyncQueue.created_at).limit(batch_size).all()

        if not items:
            return {'sent': 0, 'failed': 0, 'remaining': 0}

        # Optimistic locking: atomically update status to 'in_progress' only
        # for items still 'queued'/'failed', so concurrent drainers cannot
        # double-send the same rows.
        item_ids = [item.id for item in items]
        updated_count = db.query(SyncQueue).filter(
            SyncQueue.id.in_(item_ids),
            SyncQueue.status.in_(['queued', 'failed']),
        ).update({'status': 'in_progress', 'last_attempt_at': datetime.utcnow()}, synchronize_session='fetch')
        db.flush()

        if updated_count == 0:
            return {'sent': 0, 'failed': 0, 'remaining': 0}

        # Re-fetch only items that were successfully claimed
        items = db.query(SyncQueue).filter(
            SyncQueue.id.in_(item_ids),
            SyncQueue.status == 'in_progress',
        ).all()

        if not items:
            return {'sent': 0, 'failed': 0, 'remaining': 0}

        # Build batch payload
        batch = []
        for item in items:
            batch.append({
                'id': item.id,
                'operation_type': item.operation_type,
                'payload': item.payload_json,
            })

        # Send batch — E2E encrypt if target has X25519 key
        sent = 0
        failed = 0
        send_payload = {'items': batch, 'node_id': node_id}
        try:
            target_x25519 = SyncEngine._get_target_x25519(db, target_url)
            if target_x25519:
                try:
                    from security.channel_encryption import encrypt_json_for_peer
                    send_payload = {'encrypted': True,
                                    'envelope': encrypt_json_for_peer(send_payload, target_x25519)}
                except Exception:
                    pass  # Encryption unavailable, send plaintext
        except Exception:
            pass
        try:
            resp = pooled_post(
                f"{target_url}/api/social/hierarchy/sync",
                json=send_payload,
                timeout=30,
            )
            if resp.status_code == 200:
                result = resp.json()
                processed_ids = set(result.get('processed', []))
                for item in items:
                    if item.id in processed_ids:
                        item.status = 'completed'
                        item.completed_at = datetime.utcnow()
                        sent += 1
                    else:
                        # Target answered 200 but did not acknowledge this
                        # item; retry it on a later drain.
                        item.status = 'failed'
                        item.retry_count = (item.retry_count or 0) + 1
                        item.error_message = 'Not in processed list'
                        failed += 1
            else:
                for item in items:
                    item.status = 'failed'
                    item.retry_count = (item.retry_count or 0) + 1
                    item.error_message = f'HTTP {resp.status_code}'
                    failed += 1
        except requests.RequestException as e:
            for item in items:
                item.status = 'failed'
                item.retry_count = (item.retry_count or 0) + 1
                item.error_message = str(e)
                failed += 1

        # Mark items that exceeded max retries as dead (stop retrying)
        for item in items:
            if item.status == 'failed' and (item.retry_count or 0) >= (item.max_retries or 5):
                item.status = 'dead'
                item.error_message = f'Max retries exceeded: {item.error_message}'

        db.flush()

        remaining = db.query(SyncQueue).filter(
            SyncQueue.node_id == node_id,
            SyncQueue.status.in_(['queued', 'failed']),
            SyncQueue.retry_count < SyncQueue.max_retries,
        ).count()

        return {'sent': sent, 'failed': failed, 'remaining': remaining}

    @staticmethod
    def _get_target_x25519(db, target_url: str) -> str:
        """Look up X25519 public key for a target node by URL.

        Returns '' when the peer is unknown, inactive, has no key, or any
        lookup error occurs (caller then falls back to plaintext).
        """
        try:
            from .models import PeerNode
            peer = db.query(PeerNode).filter(
                PeerNode.url == target_url.rstrip('/'),
                PeerNode.status == 'active',
            ).first()
            if peer and getattr(peer, 'x25519_public', None):
                return peer.x25519_public
        except Exception:
            pass
        return ''

    @staticmethod
    def receive_sync_batch(db, items: list) -> Dict:
        """Process incoming sync items from a child node.

        Dispatches each item by 'operation_type'; unknown types are logged
        and still acknowledged so the sender does not retry them forever.

        Returns:
            Dict with 'processed' (list of acked ids) and 'errors'
            (list of {'id', 'error'} dicts).
        """
        processed = []
        errors = []

        for item in items:
            op = item.get('operation_type', '')
            payload = item.get('payload', {})
            item_id = item.get('id', '')

            # Idempotency: skip already-processed items
            # NOTE(review): this checks the local SyncQueue table for the
            # sender's item id — presumably ids are globally unique; verify.
            if item_id and db:
                from .models import SyncQueue
                existing = db.query(SyncQueue).filter_by(id=item_id).first()
                if existing and existing.status in ('completed', 'dead'):
                    processed.append(item_id)
                    continue

            try:
                if op == 'register_agent':
                    # Agent data sync - store as metadata for now
                    logger.info(f"Sync: received agent registration from child")
                elif op == 'sync_post':
                    logger.info(f"Sync: received post sync from child")
                elif op == 'update_stats':
                    logger.info(f"Sync: received stats update from child")
                elif op == 'register_node':
                    logger.info(f"Sync: received node registration from child")
                elif op == 'coding_task_assign':
                    logger.info(f"Sync: received coding task assignment from parent")
                elif op == 'coding_submission':
                    logger.info(f"Sync: received coding submission from child")
                elif op == 'sync_user':
                    SyncEngine._handle_sync_user(db, payload)
                elif op == 'revoke_token':
                    SyncEngine._handle_revoke_token(payload)
                elif op == 'sync_blocklist':
                    SyncEngine._handle_sync_blocklist(payload)
                else:
                    logger.debug(f"Sync: unknown operation type: {op}")

                processed.append(item_id)
            except Exception as e:
                errors.append({'id': item_id, 'error': str(e)})

        return {'processed': processed, 'errors': errors}

    @staticmethod
    def _handle_sync_user(db, payload: dict):
        """Create or update a User record from sync data."""
        from .models import User

        user_id = payload.get('user_id')
        username = payload.get('username', '')
        if not user_id or not username:
            logger.warning("sync_user: missing user_id or username")
            return

        existing = db.query(User).filter_by(id=user_id).first()
        if existing:
            # Update fields from sync (don't overwrite local-only fields)
            if payload.get('handle'):
                existing.handle = payload['handle']
            if payload.get('display_name'):
                existing.display_name = payload['display_name']
            if payload.get('role'):
                existing.role = payload['role']
            logger.info(f"Sync: updated user {user_id} from sync")
        else:
            # Create new user record from sync; the API token is generated
            # locally, never propagated across tiers.
            from .auth import generate_api_token
            user = User(
                id=user_id,
                username=username,
                display_name=payload.get('display_name', username),
                handle=payload.get('handle', ''),
                role=payload.get('role', 'flat'),
                user_type=payload.get('user_type', 'human'),
                api_token=generate_api_token(),
            )
            db.add(user)
            logger.info(f"Sync: created user {user_id} from sync")

    @staticmethod
    def _handle_revoke_token(payload: dict):
        """Add a JTI to the local token blocklist."""
        jti = payload.get('jti', '')
        if not jti:
            logger.warning("revoke_token sync: missing jti")
            return
        try:
            from security.jwt_manager import _blocklist, ACCESS_TOKEN_EXPIRY
            expires_in = payload.get('expires_in', ACCESS_TOKEN_EXPIRY)
            _blocklist.add(jti, expires_in)
            logger.info(f"Sync: revoked token jti={jti}")
        except Exception as e:
            logger.warning(f"Sync: failed to revoke token: {e}")

    @staticmethod
    def _handle_sync_blocklist(payload: dict):
        """Bulk sync of blocked JTIs."""
        jtis = payload.get('jtis', [])
        if not jtis:
            return
        try:
            from security.jwt_manager import _blocklist, ACCESS_TOKEN_EXPIRY
            expires_in = payload.get('expires_in', ACCESS_TOKEN_EXPIRY)
            for jti in jtis:
                _blocklist.add(jti, expires_in)
            logger.info(f"Sync: bulk-revoked {len(jtis)} tokens")
        except Exception as e:
            logger.warning(f"Sync: failed to sync blocklist: {e}")

    @staticmethod
    def queue_user_sync(db, user_data: dict, direction: str = 'up') -> Optional[str]:
        """Queue a user creation/update for sync.

        Args:
            db: Database session
            user_data: Dict with user_id, username, handle, role, etc.
            direction: 'up' (to central) or 'down' (from central to nodes)

        Returns:
            The queued item id, or None if rejected by backpressure.
        """
        target = 'central' if direction == 'up' else 'regional'
        return SyncEngine.queue(db, target, 'sync_user', user_data)

    @staticmethod
    def is_connected_to(target_url: str) -> bool:
        """Check if we can reach the target URL."""
        try:
            resp = pooled_get(
                f"{target_url}/api/social/peers/health",
                timeout=5,
            )
            return resp.status_code == 200
        except requests.RequestException:
            return False

    def start_background_sync(self) -> None:
        """Start background sync drain thread (daemon). Idempotent."""
        with self._lock:
            if self._running:
                return
            self._running = True
            self._stop_event.clear()

        self._thread = threading.Thread(target=self._sync_loop, daemon=True)
        self._thread.start()
        logger.info(f"Sync engine started (interval={self._interval}s)")

    def stop_background_sync(self) -> None:
        """Stop the background sync thread and wait for it to exit."""
        with self._lock:
            self._running = False
            # Wake the loop out of its inter-drain wait so join() succeeds
            # promptly instead of timing out while the thread sleeps.
            self._stop_event.set()
            if self._thread:
                self._thread.join(timeout=10)

    def _wd_heartbeat(self) -> None:
        """Send heartbeat to watchdog between potentially blocking operations."""
        try:
            from security.node_watchdog import get_watchdog
            wd = get_watchdog()
            if wd:
                wd.heartbeat('sync_engine')
        except Exception:
            pass

    def _sync_loop(self) -> None:
        """Background loop: periodically drain sync queue until stopped."""
        while self._running:
            # Interruptible sleep: wait() returns True as soon as the stop
            # event is set, so shutdown never waits out the full interval.
            if self._stop_event.wait(self._interval):
                break
            if not self._running:
                break
            self._wd_heartbeat()
            try:
                self._do_sync_drain()
            except Exception as e:
                logger.debug(f"Sync drain error: {e}")
            self._wd_heartbeat()

    def _do_sync_drain(self) -> None:
        """Attempt to drain queued items to target."""
        from .models import get_db

        # Central takes precedence over regional as the drain target.
        target_url = os.environ.get('HEVOLVE_CENTRAL_URL', '') or \
                     os.environ.get('HEVOLVE_REGIONAL_URL', '')
        if not target_url:
            return

        if not self.is_connected_to(target_url):
            return

        db = get_db()
        try:
            from security.node_integrity import get_public_key_hex
            node_id = get_public_key_hex()[:16]
        except Exception:
            node_id = 'unknown'

        try:
            result = self.drain_queue(db, node_id, target_url, self._batch_size)
            if result['sent'] > 0:
                logger.info(f"Sync: drained {result['sent']} items to {target_url}")
            db.commit()
        except Exception as e:
            db.rollback()
            logger.debug(f"Sync drain error: {e}")
        finally:
            db.close()

    @staticmethod
    def get_queue_stats(db, node_id: str) -> Dict:
        """Get sync queue statistics for a node."""
        from .models import SyncQueue

        queued = db.query(SyncQueue).filter_by(
            node_id=node_id, status='queued').count()
        in_progress = db.query(SyncQueue).filter_by(
            node_id=node_id, status='in_progress').count()
        completed = db.query(SyncQueue).filter_by(
            node_id=node_id, status='completed').count()
        failed = db.query(SyncQueue).filter_by(
            node_id=node_id, status='failed').count()

        return {
            'queued': queued,
            'in_progress': in_progress,
            'completed': completed,
            'failed': failed,
            'total_pending': queued + in_progress,
        }
# Module-level singleton shared across the app; the background drain thread
# is NOT started here — callers invoke sync_engine.start_background_sync().
sync_engine = SyncEngine()