Coverage for integrations / distributed_agent / worker_loop.py: 87.1%

147 statements  

coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Distributed Worker Loop — auto-claim and execute tasks on worker nodes. 

3 

4Runs as a background daemon thread on every node where the shared Redis 

5coordinator is reachable. No separate mode flag — if Redis exists and 

6this node is part of a hive, the worker loop auto-starts and claims tasks. 

7 

8Polls the shared DistributedTaskCoordinator for unclaimed tasks, 

9executes them via the local /chat endpoint, and submits results back. 

10""" 

11import os 

12import random 

13import time 

14import logging 

15import threading 

16import requests 

17from typing import Optional 

18from core.http_pool import pooled_post 

19 

20from core.constants import HIVE_DEPTH 

21from core.port_registry import get_port 

22 

23logger = logging.getLogger('hevolve_social') 

24 

25 

26class DistributedWorkerLoop: 

27 """Background loop: claim tasks from shared Redis, execute via local /chat, submit results.""" 

    # Exponential backoff bounds for Redis outages. Without this the
    # tick loop used to spin at self._interval (15s) while every Redis
    # call raised ConnectionError — not quite 100% CPU, but tens of
    # thousands of failed connect attempts per hour during a prolonged
    # outage. With backoff the extra sleep starts at 1s and doubles up
    # to a 60s cap, so a prolonged outage settles at roughly one
    # reconnect attempt per minute instead of one every 15s.
    _BACKOFF_MIN: float = 1.0
    _BACKOFF_MAX: float = 60.0
    # Jitter factor so N workers don't all reconnect in the same second
    # when Redis comes back. ±25% of the current backoff window.
    _BACKOFF_JITTER: float = 0.25
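    # Illustrative progression under a sustained outage (before jitter):
    # 1s -> 2s -> 4s -> 8s -> 16s -> 32s -> 60s (capped) -> 60s ..., each
    # added on top of the normal poll interval in _loop().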

    def __init__(self):
        self._interval = int(os.environ.get('HEVOLVE_WORKER_POLL_INTERVAL', '15'))
        self._running = False
        self._thread = None
        self._lock = threading.Lock()
        self._node_id = os.environ.get('HEVOLVE_NODE_ID', 'unknown')
        self._capabilities = self._detect_capabilities()
        # Current Redis backoff state — reset to 0 when a tick succeeds.
        self._redis_backoff: float = 0.0

    def _detect_capabilities(self):
        """Detect this node's capabilities from system_requirements."""
        caps = ['marketing', 'news', 'finance', 'revenue']  # Base capabilities
        try:
            from security.system_requirements import get_capabilities
            hw = get_capabilities()
            if hw:
                tier = hw.tier.value
                if tier in ('standard', 'performance', 'compute_host'):
                    caps.extend(['coding', 'ip_protection', 'provision'])
                if tier in ('performance', 'compute_host'):
                    caps.append('vision')
        except Exception:
            pass
        return caps
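
    # Descriptive summary of the mapping above: every node advertises the four
    # base capabilities; the 'standard', 'performance' and 'compute_host' tiers
    # add coding, ip_protection and provision; 'performance' and 'compute_host'
    # additionally add vision.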

    def start(self):
        """Start the worker loop if a shared Redis coordinator is reachable.

        No separate mode flag — if Redis is available, the worker loop
        starts and will claim tasks from the shared queue. This is how
        a node joins the distributed hive: just have Redis reachable.
        """
        if not self._is_enabled():
            logger.debug("Distributed worker loop: Redis coordinator not reachable, skipping")
            return

        with self._lock:
            if self._running:
                return
            self._running = True

        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()
        logger.info(f"Distributed worker loop started (interval={self._interval}s, "
                    f"capabilities={self._capabilities})")

    def stop(self):
        """Stop the worker loop."""
        with self._lock:
            self._running = False
        if self._thread:
            self._thread.join(timeout=10)

    @staticmethod
    def _is_enabled() -> bool:
        """Check if the shared coordinator is reachable (Redis available).

        Uses the existing tier system — no separate distributed mode flag.
        """
        try:
            from integrations.distributed_agent.api import _get_coordinator
            coord = _get_coordinator()
            return coord is not None
        except Exception:
            return False

    def _wd_heartbeat(self):
        """Send heartbeat to watchdog between potentially blocking operations."""
        try:
            from security.node_watchdog import get_watchdog
            wd = get_watchdog()
            if wd:
                wd.heartbeat('distributed_worker')
        except Exception:
            pass

    def _loop(self):
        # Lazy-import redis so tests that never touch Redis don't need
        # the dependency installed. When redis isn't importable,
        # ``RedisConnectionError`` falls back to the built-in
        # ``ConnectionError``, which is still correct (the generic
        # except branch below also traps connection-looking errors).
        try:
            from redis.exceptions import ConnectionError as RedisConnectionError
        except Exception:  # pragma: no cover — redis always installed in prod
            RedisConnectionError = ConnectionError  # type: ignore[misc, assignment]

        while self._running:
            # Sleep for poll interval + current backoff. Backoff is 0
            # during healthy operation so this collapses back to the
            # pre-fix behaviour once Redis recovers.
            sleep_for = self._interval + self._redis_backoff
            if self._redis_backoff > 0:
                # ±25% jitter prevents thundering-herd when many workers
                # simultaneously discover Redis has come back.
                jitter = self._redis_backoff * self._BACKOFF_JITTER
                sleep_for += random.uniform(-jitter, jitter)
            time.sleep(max(1.0, sleep_for))
            if not self._running:
                break
            self._wd_heartbeat()
            # GUARDRAIL: circuit breaker
            try:
                from security.hive_guardrails import HiveCircuitBreaker
                if HiveCircuitBreaker.is_halted():
                    continue
            except ImportError:
                pass
            try:
                self._tick()
                # Tick succeeded → reset backoff so the next cycle runs
                # at the normal poll cadence.
                if self._redis_backoff > 0:
                    logger.info(
                        f"Worker Redis recovered after "
                        f"{self._redis_backoff:.1f}s backoff")
                    self._redis_backoff = 0.0
            except RedisConnectionError as e:
                self._bump_redis_backoff()
                logger.warning(
                    f"Worker Redis ConnectionError: {e}; "
                    f"backoff={self._redis_backoff:.1f}s")
            except Exception as e:
                # Unknown errors: treat as transient, but if the message looks
                # connection-related, back off as well so a Redis outage that
                # surfaces through a different exception type doesn't spin.
                msg = str(e).lower()
                if 'connection' in msg or 'timeout' in msg or 'redis' in msg:
                    self._bump_redis_backoff()
                logger.debug(f"Distributed worker tick error: {e}")
            self._wd_heartbeat()

    def _bump_redis_backoff(self) -> None:
        """Double the backoff window, clamped to [MIN, MAX]."""
        if self._redis_backoff <= 0:
            self._redis_backoff = self._BACKOFF_MIN
        else:
            self._redis_backoff = min(self._redis_backoff * 2,
                                      self._BACKOFF_MAX)

    def _tick(self):
        """Try to claim and execute one task per tick."""
        coordinator = self._get_coordinator()
        if not coordinator:
            return

        # Claim next matching task
        task = coordinator.claim_next_task(
            agent_id=self._node_id,
            capabilities=self._capabilities,
        )
        if not task:
            return

        logger.info(f"Worker claimed task {task.task_id}: {task.description[:80]}")

        # HIVE_DEPTH enforcement — defense in depth. Coordinators stamp
        # hop at submit_goal, but a rogue peer could have forwarded a
        # task with an inflated hop; drop anything past the published
        # 3-level topology instead of executing it and re-propagating.
        try:
            task_hop = int(task.context.get('hop', 0) or 0)
        except (TypeError, ValueError):
            task_hop = 0
        if task_hop >= HIVE_DEPTH:
            logger.warning(
                f"Worker dropping task {task.task_id}: hop={task_hop} "
                f">= HIVE_DEPTH={HIVE_DEPTH}")
            return
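        # Illustrative effect of the check above, assuming HIVE_DEPTH is 3 as
        # the 3-level topology suggests: tasks with hop 0-2 execute here, while
        # a task forwarded with hop >= 3 is dropped and never re-propagated.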

        # Execute via local /chat
        result = self._execute_task(task)

        if result is not None:
            # Submit result back to coordinator
            try:
                coordinator.submit_result(task.task_id, self._node_id, result)
                logger.info(f"Worker completed task {task.task_id}")
            except Exception as e:
                logger.warning(f"Worker failed to submit result for {task.task_id}: {e}")
        else:
            logger.warning(f"Worker execution failed for task {task.task_id}")

    def _execute_task(self, task) -> Optional[str]:
        """Execute a distributed task via the local /chat endpoint.

        Uses the same guardrail pipeline as local dispatch.
        """
        prompt = task.context.get('prompt', task.description)
        goal_type = task.context.get('goal_type', 'coding')
        user_id = task.context.get('user_id', self._node_id)

        # GUARDRAIL: pre-dispatch gate
        try:
            from security.hive_guardrails import GuardrailEnforcer
            allowed, reason, prompt = GuardrailEnforcer.before_dispatch(prompt)
            if not allowed:
                logger.warning(f"Worker task {task.task_id} blocked by guardrail: {reason}")
                return None
        except ImportError:
            logger.error("CRITICAL: hive_guardrails not available — blocking worker dispatch")
            return None
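        # As used above, before_dispatch() returns (allowed, reason, prompt);
        # the returned prompt (presumably sanitized) replaces the original for
        # the rest of the dispatch.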

        base_url = os.environ.get('HEVOLVE_BASE_URL', f'http://localhost:{get_port("backend")}')
        prompt_id = f"{goal_type}_{task.task_id[:8]}"

        body = {
            'user_id': user_id,
            'prompt_id': prompt_id,
            'prompt': prompt,
            'create_agent': True,
            'autonomous': True,
            'casual_conv': False,
        }
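        # Assumed /chat contract (not defined in this file): a 200 response is
        # JSON whose reply text lives under the 'response' key, which is all
        # the code below relies on.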

        try:
            resp = pooled_post(f'{base_url}/chat', json=body, timeout=120)
            if resp.status_code == 200:
                result = resp.json()
                response = result.get('response', '')

                # GUARDRAIL: post-response check
                try:
                    from security.hive_guardrails import GuardrailEnforcer
                    passed, reason = GuardrailEnforcer.after_response(response)
                    if not passed:
                        logger.warning(f"Worker response filtered for {task.task_id}: {reason}")
                        return None
                except ImportError:
                    return None

                # Record to world model
                try:
                    from integrations.agent_engine.world_model_bridge import get_world_model_bridge
                    bridge = get_world_model_bridge()
                    bridge.record_interaction(
                        user_id=user_id,
                        prompt_id=prompt_id,
                        prompt=prompt,
                        response=response,
                        goal_id=task.task_id,
                    )
                except Exception:
                    pass

                return response
        except requests.RequestException as e:
            logger.warning(f"Worker local /chat failed for {task.task_id}: {e}")

        return None

    @staticmethod
    def _get_coordinator():
        """Get shared coordinator singleton."""
        try:
            from integrations.distributed_agent.api import _get_coordinator
            return _get_coordinator()
        except Exception:
            return None


# Module-level singleton
worker_loop = DistributedWorkerLoop()
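# Illustrative wiring sketch (assumed; the actual startup call site lives
# outside this module):
#
#     from integrations.distributed_agent.worker_loop import worker_loop
#     worker_loop.start()   # no-op if the shared Redis coordinator is unreachable
#     ...
#     worker_loop.stop()    # joins the daemon thread on shutdown (10s timeout)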