Coverage for integrations/social/sync_engine.py: 75.1% (241 statements)


1""" 

2HevolveSocial - Offline-First Sync Engine 

3 

4Queues operations locally when offline, drains to regional/central when connected. 

5Used by regional (sync to central) and local (sync to regional) tiers. 

6""" 

7import os 

8import time 

9import logging 

10import threading 

11import requests 

12from datetime import datetime 

13 

14from core.http_pool import pooled_get, pooled_post 

15from typing import Dict, Optional 

16 

17logger = logging.getLogger('hevolve_social') 

18 

19 
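
# Usage sketch (illustrative only; the `db` session and the payload contents below are
# assumptions, not defined in this module):
#
#     op_id = SyncEngine.queue(db, 'central', 'sync_post', {'post_id': '...'})
#     sync_engine.start_background_sync()  # drains every HEVOLVE_SYNC_INTERVAL seconds
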

class SyncEngine:
    """Offline-first sync engine for regional/local tiers."""

    def __init__(self):
        self._interval = int(os.environ.get('HEVOLVE_SYNC_INTERVAL', '60'))
        self._batch_size = int(os.environ.get('HEVOLVE_MAX_SYNC_BATCH', '50'))
        self._running = False
        self._thread = None
        self._lock = threading.Lock()

    MAX_QUEUE_SIZE = 10000


    @staticmethod
    def queue(db, target_tier: str, operation_type: str, payload: dict) -> Optional[str]:
        """Queue a sync operation for later delivery."""
        from .models import SyncQueue
        from security.node_integrity import get_public_key_hex

        try:
            node_id = get_public_key_hex()[:16]
        except Exception:
            node_id = 'unknown'

        # Backpressure: reject if the queue is too large for this node
        current_count = db.query(SyncQueue).filter(
            SyncQueue.node_id == node_id,
            SyncQueue.status.in_(['queued', 'failed']),
        ).count()
        if current_count >= SyncEngine.MAX_QUEUE_SIZE:
            logger.warning(f"Sync queue backpressure: {current_count} items for node {node_id}, skipping insertion")
            return None

        item = SyncQueue(
            node_id=node_id,
            target_tier=target_tier,
            operation_type=operation_type,
            payload_json=payload,
            status='queued',
        )
        db.add(item)
        db.flush()
        return item.id


    @staticmethod
    def drain_queue(db, node_id: str, target_url: str, batch_size: int = 50) -> Dict:
        """Send queued operations to target. Returns counts."""
        from .models import SyncQueue

        items = db.query(SyncQueue).filter(
            SyncQueue.node_id == node_id,
            SyncQueue.status.in_(['queued', 'failed']),
        ).order_by(SyncQueue.created_at).limit(batch_size).all()

        if not items:
            return {'sent': 0, 'failed': 0, 'remaining': 0}

        # Optimistic locking: atomically move items to 'in_progress' only if still 'queued'/'failed'
        item_ids = [item.id for item in items]
        updated_count = db.query(SyncQueue).filter(
            SyncQueue.id.in_(item_ids),
            SyncQueue.status.in_(['queued', 'failed']),
        ).update({'status': 'in_progress', 'last_attempt_at': datetime.utcnow()},
                 synchronize_session='fetch')
        db.flush()

        if updated_count == 0:
            return {'sent': 0, 'failed': 0, 'remaining': 0}

        # Re-fetch only items that were successfully claimed
        items = db.query(SyncQueue).filter(
            SyncQueue.id.in_(item_ids),
            SyncQueue.status == 'in_progress',
        ).all()

        if not items:
            return {'sent': 0, 'failed': 0, 'remaining': 0}

        # Build batch payload
        batch = []
        for item in items:
            batch.append({
                'id': item.id,
                'operation_type': item.operation_type,
                'payload': item.payload_json,
            })

        # Send batch - E2E encrypt if the target has an X25519 key
        sent = 0
        failed = 0
        send_payload = {'items': batch, 'node_id': node_id}
        try:
            target_x25519 = SyncEngine._get_target_x25519(db, target_url)
            if target_x25519:
                try:
                    from security.channel_encryption import encrypt_json_for_peer
                    send_payload = {'encrypted': True,
                                    'envelope': encrypt_json_for_peer(send_payload, target_x25519)}
                except Exception:
                    pass  # Encryption unavailable, send plaintext
        except Exception:
            pass
        try:
            resp = pooled_post(
                f"{target_url}/api/social/hierarchy/sync",
                json=send_payload,
                timeout=30,
            )
            if resp.status_code == 200:
                result = resp.json()
                processed_ids = set(result.get('processed', []))
                for item in items:
                    if item.id in processed_ids:
                        item.status = 'completed'
                        item.completed_at = datetime.utcnow()
                        sent += 1
                    else:
                        item.status = 'failed'
                        item.retry_count = (item.retry_count or 0) + 1
                        item.error_message = 'Not in processed list'
                        failed += 1
            else:
                for item in items:
                    item.status = 'failed'
                    item.retry_count = (item.retry_count or 0) + 1
                    item.error_message = f'HTTP {resp.status_code}'
                    failed += 1
        except requests.RequestException as e:
            for item in items:
                item.status = 'failed'
                item.retry_count = (item.retry_count or 0) + 1
                item.error_message = str(e)
                failed += 1

        # Mark items that exceeded max retries as dead (stop retrying)
        for item in items:
            if item.status == 'failed' and (item.retry_count or 0) >= (item.max_retries or 5):
                item.status = 'dead'
                item.error_message = f'Max retries exceeded: {item.error_message}'

        db.flush()

        remaining = db.query(SyncQueue).filter(
            SyncQueue.node_id == node_id,
            SyncQueue.status.in_(['queued', 'failed']),
            SyncQueue.retry_count < SyncQueue.max_retries,
        ).count()

        return {'sent': sent, 'failed': failed, 'remaining': remaining}
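
    # Wire format (as implied by drain_queue above and receive_sync_batch below): the
    # batch POST body is {'items': [{'id', 'operation_type', 'payload'}, ...],
    # 'node_id': ...}, or {'encrypted': True, 'envelope': ...} when the peer publishes an
    # X25519 key; the receiver replies with {'processed': [item_ids], 'errors': [...]},
    # and only ids echoed in 'processed' are marked completed.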


    @staticmethod
    def _get_target_x25519(db, target_url: str) -> str:
        """Look up the X25519 public key for a target node by URL."""
        try:
            from .models import PeerNode
            peer = db.query(PeerNode).filter(
                PeerNode.url == target_url.rstrip('/'),
                PeerNode.status == 'active',
            ).first()
            if peer and getattr(peer, 'x25519_public', None):
                return peer.x25519_public
        except Exception:
            pass
        return ''


    @staticmethod
    def receive_sync_batch(db, items: list) -> Dict:
        """Process incoming sync items from a child node."""
        processed = []
        errors = []

        for item in items:
            op = item.get('operation_type', '')
            payload = item.get('payload', {})
            item_id = item.get('id', '')

            # Idempotency: skip already-processed items
            if item_id and db:
                from .models import SyncQueue
                existing = db.query(SyncQueue).filter_by(id=item_id).first()
                if existing and existing.status in ('completed', 'dead'):
                    processed.append(item_id)
                    continue

            try:
                if op == 'register_agent':
                    # Agent data sync - store as metadata for now
                    logger.info("Sync: received agent registration from child")
                elif op == 'sync_post':
                    logger.info("Sync: received post sync from child")
                elif op == 'update_stats':
                    logger.info("Sync: received stats update from child")
                elif op == 'register_node':
                    logger.info("Sync: received node registration from child")
                elif op == 'coding_task_assign':
                    logger.info("Sync: received coding task assignment from parent")
                elif op == 'coding_submission':
                    logger.info("Sync: received coding submission from child")
                elif op == 'sync_user':
                    SyncEngine._handle_sync_user(db, payload)
                elif op == 'revoke_token':
                    SyncEngine._handle_revoke_token(payload)
                elif op == 'sync_blocklist':
                    SyncEngine._handle_sync_blocklist(payload)
                else:
                    logger.debug(f"Sync: unknown operation type: {op}")

                processed.append(item_id)
            except Exception as e:
                errors.append({'id': item_id, 'error': str(e)})

        return {'processed': processed, 'errors': errors}


    @staticmethod
    def _handle_sync_user(db, payload: dict):
        """Create or update a User record from sync data."""
        from .models import User

        user_id = payload.get('user_id')
        username = payload.get('username', '')
        if not user_id or not username:
            logger.warning("sync_user: missing user_id or username")
            return

        existing = db.query(User).filter_by(id=user_id).first()
        if existing:
            # Update fields from sync (don't overwrite local-only fields)
            if payload.get('handle'):
                existing.handle = payload['handle']
            if payload.get('display_name'):
                existing.display_name = payload['display_name']
            if payload.get('role'):
                existing.role = payload['role']
            logger.info(f"Sync: updated user {user_id} from sync")
        else:
            # Create a new user record from sync
            from .auth import generate_api_token
            user = User(
                id=user_id,
                username=username,
                display_name=payload.get('display_name', username),
                handle=payload.get('handle', ''),
                role=payload.get('role', 'flat'),
                user_type=payload.get('user_type', 'human'),
                api_token=generate_api_token(),
            )
            db.add(user)
            logger.info(f"Sync: created user {user_id} from sync")
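
    # Illustrative `sync_user` payload (field names taken from the handler above; the
    # values themselves are placeholders): {'user_id': 'u-123', 'username': 'alice',
    # 'handle': '@alice', 'display_name': 'Alice', 'role': 'flat', 'user_type': 'human'}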


    @staticmethod
    def _handle_revoke_token(payload: dict):
        """Add a JTI to the local token blocklist."""
        jti = payload.get('jti', '')
        if not jti:
            logger.warning("revoke_token sync: missing jti")
            return
        try:
            from security.jwt_manager import _blocklist, ACCESS_TOKEN_EXPIRY
            expires_in = payload.get('expires_in', ACCESS_TOKEN_EXPIRY)
            _blocklist.add(jti, expires_in)
            logger.info(f"Sync: revoked token jti={jti}")
        except Exception as e:
            logger.warning(f"Sync: failed to revoke token: {e}")

    @staticmethod
    def _handle_sync_blocklist(payload: dict):
        """Bulk sync of blocked JTIs."""
        jtis = payload.get('jtis', [])
        if not jtis:
            return
        try:
            from security.jwt_manager import _blocklist, ACCESS_TOKEN_EXPIRY
            expires_in = payload.get('expires_in', ACCESS_TOKEN_EXPIRY)
            for jti in jtis:
                _blocklist.add(jti, expires_in)
            logger.info(f"Sync: bulk-revoked {len(jtis)} tokens")
        except Exception as e:
            logger.warning(f"Sync: failed to sync blocklist: {e}")


    @staticmethod
    def queue_user_sync(db, user_data: dict, direction: str = 'up'):
        """Queue a user creation/update for sync.

        Args:
            db: Database session
            user_data: Dict with user_id, username, handle, role, etc.
            direction: 'up' (to central) or 'down' (from central to nodes)
        """
        target = 'central' if direction == 'up' else 'regional'
        return SyncEngine.queue(db, target, 'sync_user', user_data)

    @staticmethod
    def is_connected_to(target_url: str) -> bool:
        """Check if we can reach the target URL."""
        try:
            resp = pooled_get(
                f"{target_url}/api/social/peers/health",
                timeout=5,
            )
            return resp.status_code == 200
        except requests.RequestException:
            return False


    def start_background_sync(self):
        """Start background sync drain thread (daemon)."""
        with self._lock:
            if self._running:
                return
            self._running = True

        self._thread = threading.Thread(target=self._sync_loop, daemon=True)
        self._thread.start()
        logger.info(f"Sync engine started (interval={self._interval}s)")

    def stop_background_sync(self):
        """Stop the background sync thread."""
        with self._lock:
            self._running = False
        if self._thread:
            self._thread.join(timeout=10)

    def _wd_heartbeat(self):
        """Send heartbeat to watchdog between potentially blocking operations."""
        try:
            from security.node_watchdog import get_watchdog
            wd = get_watchdog()
            if wd:
                wd.heartbeat('sync_engine')
        except Exception:
            pass

    def _sync_loop(self):
        """Background loop: periodically drain sync queue."""
        while self._running:
            time.sleep(self._interval)
            if not self._running:
                break
            self._wd_heartbeat()
            try:
                self._do_sync_drain()
            except Exception as e:
                logger.debug(f"Sync drain error: {e}")
            self._wd_heartbeat()


    def _do_sync_drain(self):
        """Attempt to drain queued items to target."""
        from .models import get_db

        target_url = os.environ.get('HEVOLVE_CENTRAL_URL', '') or \
            os.environ.get('HEVOLVE_REGIONAL_URL', '')
        if not target_url:
            return

        if not self.is_connected_to(target_url):
            return

        db = get_db()
        try:
            from security.node_integrity import get_public_key_hex
            node_id = get_public_key_hex()[:16]
        except Exception:
            node_id = 'unknown'

        try:
            result = self.drain_queue(db, node_id, target_url, self._batch_size)
            if result['sent'] > 0:
                logger.info(f"Sync: drained {result['sent']} items to {target_url}")
            db.commit()
        except Exception as e:
            db.rollback()
            logger.debug(f"Sync drain error: {e}")
        finally:
            db.close()


    @staticmethod
    def get_queue_stats(db, node_id: str) -> Dict:
        """Get sync queue statistics for a node."""
        from .models import SyncQueue

        queued = db.query(SyncQueue).filter_by(
            node_id=node_id, status='queued').count()
        in_progress = db.query(SyncQueue).filter_by(
            node_id=node_id, status='in_progress').count()
        completed = db.query(SyncQueue).filter_by(
            node_id=node_id, status='completed').count()
        failed = db.query(SyncQueue).filter_by(
            node_id=node_id, status='failed').count()

        return {
            'queued': queued,
            'in_progress': in_progress,
            'completed': completed,
            'failed': failed,
            'total_pending': queued + in_progress,
        }


# Module-level singleton
sync_engine = SyncEngine()
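
# Startup wiring sketch (illustrative; the calling site is outside this module): a
# regional or local node sets HEVOLVE_CENTRAL_URL or HEVOLVE_REGIONAL_URL and calls
# sync_engine.start_background_sync() once at boot, and sync_engine.stop_background_sync()
# on shutdown; queue depth can be inspected via SyncEngine.get_queue_stats(db, node_id).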