Coverage for integrations/distributed_agent/worker_loop.py: 87.1% (147 statements)
1"""
2Distributed Worker Loop — auto-claim and execute tasks on worker nodes.
4Runs as a background daemon thread on every node where the shared Redis
5coordinator is reachable. No separate mode flag — if Redis exists and
6this node is part of a hive, the worker loop auto-starts and claims tasks.
8Polls the shared DistributedTaskCoordinator for unclaimed tasks,
9executes them via the local /chat endpoint, and submits results back.
10"""
11import os
12import random
13import time
14import logging
15import threading
16import requests
17from typing import Optional
18from core.http_pool import pooled_post
20from core.constants import HIVE_DEPTH
21from core.port_registry import get_port
23logger = logging.getLogger('hevolve_social')
26class DistributedWorkerLoop:
27 """Background loop: claim tasks from shared Redis, execute via local /chat, submit results."""

    # Exponential backoff bounds for Redis outages. Without this the
    # tick loop used to spin at self._interval (15s) while every Redis
    # call raised ConnectionError — not quite 100% CPU, but tens of
    # thousands of failed connect attempts per hour during a prolonged
    # outage. With backoff we start at 1s (so recovery is detected almost
    # as quickly as under the normal poll cadence) and cap at 60s (well
    # above the 15s poll interval, keeping reconnect attempts rare during
    # a prolonged outage).
    _BACKOFF_MIN: float = 1.0
    _BACKOFF_MAX: float = 60.0
    # Jitter factor so N workers don't all reconnect in the same second
    # when Redis comes back. ±25% of the current backoff window.
    _BACKOFF_JITTER: float = 0.25
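
    # Worked example at the default 15s poll interval: successive failed
    # ticks sleep roughly 15+1, 15+2, 15+4, ... up to 15+60 seconds, with
    # the ±25% jitter applied to the backoff portion only; at backoff=8s
    # the sleep lands somewhere in [21s, 25s].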

    def __init__(self):
        self._interval = int(os.environ.get('HEVOLVE_WORKER_POLL_INTERVAL', '15'))
        self._running = False
        self._thread: Optional[threading.Thread] = None
        self._lock = threading.Lock()
        self._node_id = os.environ.get('HEVOLVE_NODE_ID', 'unknown')
        self._capabilities = self._detect_capabilities()
        # Current Redis backoff state — reset to 0 when a tick succeeds.
        self._redis_backoff: float = 0.0

    def _detect_capabilities(self):
        """Detect this node's capabilities from system_requirements."""
        caps = ['marketing', 'news', 'finance', 'revenue']  # Base capabilities
        try:
            from security.system_requirements import get_capabilities
            hw = get_capabilities()
            if hw:
                tier = hw.tier.value
                if tier in ('standard', 'performance', 'compute_host'):
                    caps.extend(['coding', 'ip_protection', 'provision'])
                if tier in ('performance', 'compute_host'):
                    caps.append('vision')
        except Exception:
            pass
        return caps
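
    # Example: a 'performance'-tier node ends up advertising
    # ['marketing', 'news', 'finance', 'revenue', 'coding',
    #  'ip_protection', 'provision', 'vision']; if hardware detection
    # fails, only the four base capabilities remain.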

    def start(self):
        """Start the worker loop if a shared Redis coordinator is reachable.

        No separate mode flag — if Redis is available, the worker loop
        starts and will claim tasks from the shared queue. This is how
        a node joins the distributed hive: just have Redis reachable.
        """
        if not self._is_enabled():
            logger.debug("Distributed worker loop: Redis coordinator not reachable, skipping")
            return

        with self._lock:
            if self._running:
                return
            self._running = True

        self._thread = threading.Thread(target=self._loop, daemon=True)
        self._thread.start()
        logger.info(f"Distributed worker loop started (interval={self._interval}s, "
                    f"capabilities={self._capabilities})")

    def stop(self):
        """Stop the worker loop."""
        with self._lock:
            self._running = False
        if self._thread:
            self._thread.join(timeout=10)

    @staticmethod
    def _is_enabled() -> bool:
        """Check if the shared coordinator is reachable (Redis available).

        Uses the existing tier system — no separate distributed mode flag.
        """
        try:
            from integrations.distributed_agent.api import _get_coordinator
            coord = _get_coordinator()
            return coord is not None
        except Exception:
            return False

    def _wd_heartbeat(self):
        """Send heartbeat to watchdog between potentially blocking operations."""
        try:
            from security.node_watchdog import get_watchdog
            wd = get_watchdog()
            if wd:
                wd.heartbeat('distributed_worker')
        except Exception:
            pass

    def _loop(self):
        # Lazy-import redis so tests that never touch Redis don't need
        # the dependency installed. ``RedisConnectionError`` falls back
        # to the builtin ``ConnectionError`` when redis isn't available,
        # which is still correct (the generic except branch below also
        # traps connection-flavoured errors).
        try:
            from redis.exceptions import ConnectionError as RedisConnectionError
        except Exception:  # pragma: no cover — redis always installed in prod
            RedisConnectionError = ConnectionError  # type: ignore[misc, assignment]

        while self._running:
            # Sleep for poll interval + current backoff. Backoff is 0
            # during healthy operation so this collapses back to the
            # pre-fix behaviour once Redis recovers.
            sleep_for = self._interval + self._redis_backoff
            if self._redis_backoff > 0:
                # ±25% jitter prevents a thundering herd when many workers
                # simultaneously discover Redis has come back.
                jitter = self._redis_backoff * self._BACKOFF_JITTER
                sleep_for += random.uniform(-jitter, jitter)
            time.sleep(max(1.0, sleep_for))
            if not self._running:
                break
            self._wd_heartbeat()
            # GUARDRAIL: circuit breaker
            try:
                from security.hive_guardrails import HiveCircuitBreaker
                if HiveCircuitBreaker.is_halted():
                    continue
            except ImportError:
                pass
            try:
                self._tick()
                # Tick succeeded → reset backoff so the next cycle runs
                # at the normal poll cadence.
                if self._redis_backoff > 0:
                    logger.info(
                        f"Worker Redis recovered after "
                        f"{self._redis_backoff:.1f}s backoff")
                    self._redis_backoff = 0.0
            except RedisConnectionError as e:
                self._bump_redis_backoff()
                logger.warning(
                    f"Worker Redis ConnectionError: {e}; "
                    f"backoff={self._redis_backoff:.1f}s")
            except Exception as e:
                # Unknown errors: treat as transient. If the message looks
                # connection-related, also back off so a Redis outage
                # surfacing through a different exception type doesn't spin.
                msg = str(e).lower()
                if 'connection' in msg or 'timeout' in msg or 'redis' in msg:
                    self._bump_redis_backoff()
                logger.debug(f"Distributed worker tick error: {e}")
            self._wd_heartbeat()

    def _bump_redis_backoff(self) -> None:
        """Double the backoff window, clamped to [MIN, MAX]."""
        if self._redis_backoff <= 0:
            self._redis_backoff = self._BACKOFF_MIN
        else:
            self._redis_backoff = min(self._redis_backoff * 2,
                                      self._BACKOFF_MAX)
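
    # Successive bumps from a cold start: 1 -> 2 -> 4 -> 8 -> 16 -> 32 -> 60
    # (64 clamps to _BACKOFF_MAX); the window then holds at 60s until a
    # tick succeeds and _loop resets it to 0.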

    def _tick(self):
        """Try to claim and execute one task per tick."""
        coordinator = self._get_coordinator()
        if not coordinator:
            return

        # Claim next matching task
        task = coordinator.claim_next_task(
            agent_id=self._node_id,
            capabilities=self._capabilities,
        )
        if not task:
            return

        logger.info(f"Worker claimed task {task.task_id}: {task.description[:80]}")

        # HIVE_DEPTH enforcement — defense in depth. Coordinators stamp
        # hop at submit_goal, but a rogue peer could have forwarded a
        # task with an inflated hop; drop anything past the published
        # 3-level topology instead of executing it and re-propagating.
        try:
            task_hop = int(task.context.get('hop', 0) or 0)
        except (TypeError, ValueError):
            task_hop = 0
        if task_hop >= HIVE_DEPTH:
            logger.warning(
                f"Worker dropping task {task.task_id}: hop={task_hop} "
                f">= HIVE_DEPTH={HIVE_DEPTH}")
            return
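
        # Worked example, per the 3-level topology noted above
        # (HIVE_DEPTH=3): tasks stamped hop 0, 1, or 2 execute here;
        # hop 3 or higher is dropped before it can re-enter the queue.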

        # Execute via local /chat
        result = self._execute_task(task)

        if result is not None:
            # Submit result back to coordinator
            try:
                coordinator.submit_result(task.task_id, self._node_id, result)
                logger.info(f"Worker completed task {task.task_id}")
            except Exception as e:
                logger.warning(f"Worker failed to submit result for {task.task_id}: {e}")
        else:
            logger.warning(f"Worker execution failed for task {task.task_id}")
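
    # Shape of a claimed task as relied on above (a sketch inferred from
    # usage in this module, not the coordinator's full schema):
    #   task.task_id     -- str; first 8 chars are reused in prompt_id
    #   task.description -- str; human-readable summary
    #   task.context     -- dict; optional keys 'hop', 'prompt',
    #                       'goal_type', 'user_id'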

    def _execute_task(self, task) -> Optional[str]:
        """Execute a distributed task via the local /chat endpoint.

        Uses the same guardrail pipeline as local dispatch.
        """
        prompt = task.context.get('prompt', task.description)
        goal_type = task.context.get('goal_type', 'coding')
        user_id = task.context.get('user_id', self._node_id)

        # GUARDRAIL: pre-dispatch gate
        try:
            from security.hive_guardrails import GuardrailEnforcer
            allowed, reason, prompt = GuardrailEnforcer.before_dispatch(prompt)
            if not allowed:
                logger.warning(f"Worker task {task.task_id} blocked by guardrail: {reason}")
                return None
        except ImportError:
            logger.error("CRITICAL: hive_guardrails not available — blocking worker dispatch")
            return None

        base_url = os.environ.get('HEVOLVE_BASE_URL', f'http://localhost:{get_port("backend")}')
        prompt_id = f"{goal_type}_{task.task_id[:8]}"

        body = {
            'user_id': user_id,
            'prompt_id': prompt_id,
            'prompt': prompt,
            'create_agent': True,
            'autonomous': True,
            'casual_conv': False,
        }

        try:
            resp = pooled_post(f'{base_url}/chat', json=body, timeout=120)
            if resp.status_code == 200:
                result = resp.json()
                response = result.get('response', '')

                # GUARDRAIL: post-response check
                try:
                    from security.hive_guardrails import GuardrailEnforcer
                    passed, reason = GuardrailEnforcer.after_response(response)
                    if not passed:
                        logger.warning(f"Worker response filtered for {task.task_id}: {reason}")
                        return None
                except ImportError:
                    return None

                # Record to world model
                try:
                    from integrations.agent_engine.world_model_bridge import get_world_model_bridge
                    bridge = get_world_model_bridge()
                    bridge.record_interaction(
                        user_id=user_id,
                        prompt_id=prompt_id,
                        prompt=prompt,
                        response=response,
                        goal_id=task.task_id,
                    )
                except Exception:
                    pass

                return response
        except requests.RequestException as e:
            logger.warning(f"Worker local /chat failed for {task.task_id}: {e}")

        return None
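
    # The /chat exchange assumed above (inferred from this method, not
    # the endpoint's full contract): POST {base_url}/chat with the JSON
    # body built above; a 200 reply carries a JSON object with a
    # 'response' string. Any non-200 status or RequestException yields
    # None, which _tick reports as an execution failure.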

    @staticmethod
    def _get_coordinator():
        """Get shared coordinator singleton."""
        try:
            from integrations.distributed_agent.api import _get_coordinator
            return _get_coordinator()
        except Exception:
            return None


# Module-level singleton
worker_loop = DistributedWorkerLoop()
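

if __name__ == '__main__':  # pragma: no cover
    # Minimal manual smoke-run: a sketch, assuming this module's core.*
    # and security.* dependencies are importable; production startup
    # wiring lives elsewhere. start() is a no-op when no shared Redis
    # coordinator is reachable.
    logging.basicConfig(level=logging.DEBUG)
    worker_loop.start()
    try:
        time.sleep(35)  # let the loop run a couple of 15s poll cycles
    finally:
        worker_loop.stop()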