Coverage for integrations / social / dashboard_service.py: 94.4%
161 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Agent Dashboard Service: Truth-Grounded Unified View

Queries actual database state, applies staleness detection, computes priority.
Shows what is REALLY happening, not what we wish was happening.

Consumed by Nunba (desktop), HART RN (mobile), and hevolve.ai (web).
"""
9import hashlib
10import logging
11import os
12from datetime import datetime, timedelta
13from typing import Dict, List, Optional
15from sqlalchemy.orm import Session
logger = logging.getLogger('hevolve_social')


class DashboardService:
    """Static service that assembles the truth-grounded unified agent dashboard.

    Every method is a ``@staticmethod``; the class is only a namespace.
    Optional subsystems (ORM models, watchdog, world-model bridge, expert
    registry) are imported lazily inside the methods, so a missing or
    broken subsystem degrades one dashboard section instead of failing the
    whole response.
    """

    # Priority tier weights (higher = shown first)
    TIER_EXECUTING = 1000
    TIER_ACTIVE = 500
    TIER_STALLED = 450
    TIER_FROZEN_DAEMON = 300
    TIER_DAEMON = 200
    TIER_IDLE = 50
    TIER_COMPLETED = 10

    # Fallback when HEVOLVE_AGENT_POLL_INTERVAL is unset or not an integer.
    _DEFAULT_POLL_INTERVAL_S = 30
    # Upper bound on how long one dashboard request waits for the world model.
    _WORLD_MODEL_TIMEOUT_S = 2.0
    # A trained agent seen within this window is reported as 'active'.
    _TRAINED_AGENT_ACTIVE_WINDOW_S = 3600
    # Daemons listed with status 'unknown' when the watchdog cannot be read.
    _KNOWN_DAEMONS = ('gossip', 'runtime_monitor', 'sync_engine',
                      'agent_daemon', 'coding_daemon')

    @staticmethod
    def get_dashboard(db: "Session") -> Dict:
        """Build the complete agent dashboard from live state.

        Args:
            db: SQLAlchemy session used for the goal and user queries.

        Returns:
            Dict with keys ``timestamp`` (ISO string of a naive UTC
            ``datetime``), ``node_health``, ``world_model``, ``agents``
            (sorted by descending ``priority``) and ``summary`` (total plus
            per-type and per-status counts).
        """
        now = datetime.utcnow()

        agents: List[Dict] = []
        # 1. Agent goals (marketing, coding, analytics, etc.)
        agents.extend(DashboardService._get_agent_goals(db))
        # 2. Coding goals
        agents.extend(DashboardService._get_coding_goals(db))
        # 3. Background daemons (from watchdog)
        agents.extend(DashboardService._get_daemon_status())
        # 4. Trained agents (social users with user_type='agent')
        agents.extend(DashboardService._get_trained_agents(db))
        # 5. Expert agents (static registry)
        agents.extend(DashboardService._get_expert_agents())

        # Compute priority, then sort descending. list.sort is stable, so
        # entries with equal priority keep their collection order above.
        for agent in agents:
            agent['priority'] = DashboardService._compute_priority(agent)
        agents.sort(key=lambda a: a['priority'], reverse=True)

        node_health = DashboardService._get_node_health()
        summary = DashboardService._summarize_agents(agents)
        world_model = DashboardService._get_world_model_status()

        return {
            'timestamp': now.isoformat(),
            'node_health': node_health,
            'world_model': world_model,
            'agents': agents,
            'summary': summary,
        }

    @staticmethod
    def _get_node_health() -> Dict:
        """Return the watchdog health dict, or an 'unavailable' placeholder."""
        node_health: Dict = {'watchdog': 'unavailable', 'threads': {}}
        try:
            from security.node_watchdog import get_watchdog
            wd = get_watchdog()
            if wd:
                node_health = wd.get_health()
        except Exception:
            # Best-effort section: never let the watchdog break the dashboard.
            logger.debug('Node health unavailable', exc_info=True)
        return node_health

    @staticmethod
    def _summarize_agents(agents: List[Dict]) -> Dict:
        """Count dashboard entries in total, per ``type`` and per ``status``.

        Entries missing ``type`` or ``status`` are counted under 'unknown'.
        """
        summary: Dict = {'total': len(agents), 'by_type': {}, 'by_status': {}}
        by_type = summary['by_type']
        by_status = summary['by_status']
        for entry in agents:
            kind = entry.get('type', 'unknown')
            state = entry.get('status', 'unknown')
            by_type[kind] = by_type.get(kind, 0) + 1
            by_status[state] = by_status.get(state, 0) + 1
        return summary

    @staticmethod
    def _get_world_model_status() -> Dict:
        """Collect world-model (HevolveAI) status without blocking the caller.

        Returns:
            ``healthy`` plus ``learning_stats`` / ``hivemind_stats`` /
            ``bridge_stats`` on success;
            ``{'healthy': False, 'error': 'cold_or_unreachable'}`` when the
            bridge does not answer within the timeout;
            ``{'healthy': False, 'error': 'unavailable'}`` on any other
            failure.
        """
        # World model status is SUPPLEMENTARY data and must not block the
        # dashboard. `get_world_model_bridge()` first-call bootstraps
        # embodied_ai + vision + llama subsystems and can take 60s+ when
        # HARTOS Tier-1 init was skipped (e.g. transformers crashed at
        # boot). Without this timeout the entire dashboard endpoint hangs
        # every poll -> the 5-second React UI poll piles up -> the frontend
        # silently times out -> "No agents running" appears even though
        # goals exist in the AgentGoal table.
        #
        # Regression observed 2026-04-26: Tier-1 KeyError at boot left
        # world_model_bridge un-warmed; every dashboard poll then took
        # 57s, queueing the waitress task list and emptying the admin UI.
        world_model: Dict = {'healthy': False, 'error': 'unavailable'}
        try:
            import concurrent.futures as _cf

            def _collect_world_model():
                from integrations.agent_engine.world_model_bridge import (
                    get_world_model_bridge)
                bridge = get_world_model_bridge()
                return {
                    'health': bridge.check_health(),
                    'stats': bridge.get_learning_stats(),
                }

            # CRITICAL: do NOT use `with ThreadPoolExecutor as ex:` here.
            # The context-manager ``__exit__`` calls ``shutdown(wait=True)``,
            # which join()s the pool's worker thread. When ``_fut.result``
            # times out, the worker is still inside the heavy
            # ``get_world_model_bridge()`` import and CAN'T finish, so the
            # ``with`` exit blocks forever, turning the timeout into an
            # infinite hang and stacking every dashboard poll into a
            # permanently-stuck server worker. Live thread dump
            # 2026-04-28 22:08 showed 15 nunba_X workers ALL frozen at
            # ``ThreadPoolExecutor.__exit__ -> shutdown -> join``. Fix:
            # manual try/finally + ``shutdown(wait=False,
            # cancel_futures=True)`` so the request returns even when the
            # worker is permanently wedged on the import lock.
            _ex = _cf.ThreadPoolExecutor(max_workers=1)
            try:
                _fut = _ex.submit(_collect_world_model)
                try:
                    _wm = _fut.result(
                        timeout=DashboardService._WORLD_MODEL_TIMEOUT_S)
                    health = _wm['health']
                    stats = _wm['stats']
                    world_model = {
                        'healthy': health.get('healthy', False),
                        'learning_stats': stats.get('learning', {}),
                        'hivemind_stats': stats.get('hivemind', {}),
                        'bridge_stats': stats.get('bridge', {}),
                    }
                except _cf.TimeoutError:
                    # Bridge is cold or unreachable: surface that fact
                    # in the response without blocking the dashboard.
                    world_model = {'healthy': False,
                                   'error': 'cold_or_unreachable'}
            finally:
                # Don't wait for the (potentially permanently-stuck)
                # worker thread. Cancel any not-yet-started futures so
                # the pool doesn't pick up new work after we leave.
                # NOTE(review): on Python 3.9+ (required for
                # ``cancel_futures``) pool workers are NOT daemon threads;
                # concurrent.futures joins them at interpreter exit, so a
                # permanently wedged worker can delay shutdown, and every
                # timed-out poll leaves one more blocked worker behind.
                # Consider a single-flight guard if that shows up in
                # practice.
                _ex.shutdown(wait=False, cancel_futures=True)
        except Exception:
            # Import failure or bridge error: keep the 'unavailable' default.
            logger.debug('World model status unavailable', exc_info=True)
        return world_model

    @staticmethod
    def _poll_interval_seconds() -> int:
        """Read HEVOLVE_AGENT_POLL_INTERVAL, falling back to 30 if invalid.

        A malformed value must not take the whole dashboard down, so it is
        logged and replaced by the default instead of raising.
        """
        raw = os.environ.get('HEVOLVE_AGENT_POLL_INTERVAL', '30')
        try:
            return int(raw)
        except ValueError:
            logger.warning(
                'Invalid HEVOLVE_AGENT_POLL_INTERVAL=%r; using %ss',
                raw, DashboardService._DEFAULT_POLL_INTERVAL_S)
            return DashboardService._DEFAULT_POLL_INTERVAL_S

    @staticmethod
    def _grounded_goal_status(status, last_dispatched_at, now,
                              poll_interval) -> str:
        """Replace a stored 'active' status with what the timestamps show.

        - not 'active'                         -> returned unchanged
        - 'active', never dispatched           -> 'idle'
        - 'active', last dispatch older than
          two poll intervals                   -> 'stalled'
        - otherwise                            -> 'active'
        """
        if status != 'active':
            return status
        if not last_dispatched_at:
            return 'idle'
        age = (now - last_dispatched_at).total_seconds()
        return 'stalled' if age > poll_interval * 2 else status

    @staticmethod
    def _get_agent_goals(db: "Session") -> List[Dict]:
        """Query AgentGoal rows and apply truth-grounding to their status.

        Returns an empty list when the models module cannot be imported.
        """
        try:
            from .models import AgentGoal
        except ImportError:
            return []

        poll_interval = DashboardService._poll_interval_seconds()
        # Naive UTC; presumably matches how last_dispatched_at is stored,
        # since the two values are subtracted directly.
        now = datetime.utcnow()

        goals = db.query(AgentGoal).filter(
            AgentGoal.status.in_(['active', 'paused', 'completed', 'failed'])
        ).all()

        result = []
        for goal in goals:
            serialized = goal.to_dict()
            # Guard the slice: a missing title must not raise TypeError.
            short_title = (goal.title or '')[:60]
            result.append({
                'id': str(goal.id),
                'type': f'{goal.goal_type}_goal',
                'name': goal.title,
                'status': DashboardService._grounded_goal_status(
                    goal.status, goal.last_dispatched_at, now,
                    poll_interval),
                'current_task': f'{goal.goal_type}: {short_title}',
                'skills': [goal.goal_type],
                'last_active': serialized.get('last_dispatched_at'),
                'metrics': {
                    'spark_spent': goal.spark_spent,
                    'spark_budget': goal.spark_budget,
                },
            })
        return result

    @staticmethod
    def _get_coding_goals(db: "Session") -> List[Dict]:
        """Query CodingGoal rows and attach a task completion percentage.

        Returns an empty list when the models module cannot be imported.
        """
        try:
            from .models import CodingGoal
        except ImportError:
            return []

        goals = db.query(CodingGoal).filter(
            CodingGoal.status.in_(['active', 'paused', 'completed'])
        ).all()

        result = []
        for goal in goals:
            total = getattr(goal, 'total_tasks', 0) or 0
            completed = getattr(goal, 'completed_tasks', 0) or 0
            pct = round(completed / total * 100, 1) if total > 0 else 0
            updated_at = getattr(goal, 'updated_at', None)
            # Guard the slice: a missing title must not raise TypeError.
            short_title = (goal.title or '')[:60]

            result.append({
                'id': str(goal.id),
                'type': 'coding_goal',
                'name': goal.title,
                'status': goal.status,
                'current_task': f'Coding: {short_title} ({pct}% done)',
                'skills': ['coding'],
                'last_active': updated_at.isoformat() if updated_at else None,
                'metrics': {
                    'total_tasks': total,
                    'completed_tasks': completed,
                    'completion_pct': pct,
                },
            })
        return result

    @staticmethod
    def _daemon_entry(name, status, last_active, metrics) -> Dict:
        """Build one dashboard entry for a background daemon."""
        return {
            'id': f'daemon_{name}',
            'type': 'daemon',
            'name': name,
            'status': status,
            'current_task': f'Background: {name}',
            'skills': [name.replace('_', ' ')],
            'last_active': last_active,
            'metrics': metrics,
        }

    @staticmethod
    def _get_daemon_status() -> List[Dict]:
        """Get background daemon statuses from the watchdog.

        When the watchdog is missing or reading it fails, the known daemons
        are listed with status 'unknown'. The two sources are never mixed:
        a failure half-way through the watchdog data discards the partial
        list rather than appending the fallback to it (which would produce
        duplicate ids).
        """
        try:
            from security.node_watchdog import get_watchdog
            wd = get_watchdog()
            if wd:
                threads = wd.get_health().get('threads', {})
                return [
                    DashboardService._daemon_entry(
                        name,
                        info.get('status', 'unknown'),
                        info.get('last_heartbeat_iso'),
                        {
                            'restart_count': info.get('restart_count', 0),
                            'heartbeat_age_s': info.get(
                                'last_heartbeat_age_s'),
                        },
                    )
                    for name, info in threads.items()
                ]
        except Exception:
            logger.debug('Watchdog unavailable; listing known daemons',
                         exc_info=True)

        # Watchdog not available; enumerate known daemons
        return [
            DashboardService._daemon_entry(name, 'unknown', None, {})
            for name in DashboardService._KNOWN_DAEMONS
        ]

    @staticmethod
    def _get_trained_agents(db: "Session") -> List[Dict]:
        """Query users with user_type='agent'.

        An agent is 'active' when ``last_active_at`` is within the last
        hour, otherwise 'idle'. Returns an empty list when the models
        module cannot be imported.
        """
        try:
            from .models import User
        except ImportError:
            return []

        agents = db.query(User).filter(User.user_type == 'agent').all()
        # One reference time for the whole batch instead of one per row.
        now = datetime.utcnow()
        window = DashboardService._TRAINED_AGENT_ACTIVE_WINDOW_S

        result = []
        for agent in agents:
            last_active = getattr(agent, 'last_active_at', None)
            is_active = bool(last_active) and (
                (now - last_active).total_seconds() < window)

            result.append({
                'id': str(agent.id),
                'type': 'trained_agent',
                'name': getattr(agent, 'display_name', None) or agent.username,
                'status': 'active' if is_active else 'idle',
                'current_task': None,
                'skills': [],  # loaded from skill badges if available
                'last_active': last_active.isoformat() if last_active else None,
                'metrics': {
                    'karma_score': getattr(agent, 'karma_score', 0),
                },
            })
        return result

    @staticmethod
    def _get_expert_agents() -> List[Dict]:
        """Load expert agents from AgentSkillRegistry if available.

        Best-effort: on any failure the entries collected so far (possibly
        none) are returned.
        """
        result = []
        try:
            from integrations.internal_comm.internal_agent_communication import (
                AgentSkillRegistry)
            registry = AgentSkillRegistry.get_instance()
            # NOTE(review): reads the registry's private ``_agents`` mapping;
            # switch to a public accessor if the registry offers one.
            for agent_id, agent_info in registry._agents.items():
                result.append({
                    'id': f'expert_{agent_id}',
                    'type': 'expert_agent',
                    'name': agent_info.get('name', agent_id),
                    'status': 'available',
                    'current_task': None,
                    'skills': list(agent_info.get('skills', {}).keys()),
                    'last_active': None,
                    'metrics': {
                        'accuracy': agent_info.get('accuracy', 0),
                    },
                })
        except Exception:
            logger.debug('Expert agent registry unavailable', exc_info=True)
        return result

    # ───────────────────────────────────────────────────────────────
    # ETag short-circuit: cheap hash for client If-None-Match polls.
    # See api_dashboard.get_agent_dashboard / api_audit.list_agents.
    # The React UI polls every 5s; without this, every poll re-runs
    # the SQL queries + watchdog reads + row serialization even when
    # nothing changed. Hash inputs are MAX(updated_at) on the two goal
    # tables + trained-agent count + watchdog restart-log length: all
    # cheap reads that move whenever the dashboard would visibly change.
    # ───────────────────────────────────────────────────────────────

    @staticmethod
    def get_dashboard_version(db: "Session") -> str:
        """Return a 16-char hash of dashboard inputs for ETag/304.

        Cheap by construction:
          - MAX(updated_at) on AgentGoal + CodingGoal
          - count of users with user_type == 'agent'
          - length of the watchdog restart log (uptime is deliberately
            NOT hashed, see the comment in the body)

        ANY caller depending on this for cache invalidation MUST also
        ensure their write site bumps `updated_at` (the SQLAlchemy
        `onupdate=func.now()` does this for AgentGoal/CodingGoal).
        For state changes that DON'T touch these tables (e.g. expert
        agent registry mutations, daemon thread restarts), the
        EventBus → SSE bridge in core/platform/bootstrap.py pushes
        `dashboard.invalidate` so the client re-fetches without
        waiting for the ETag to differ.
        """
        parts: List[str] = []
        try:
            from sqlalchemy import func
            from .models import AgentGoal, CodingGoal, User
            parts.append(str(db.query(func.max(AgentGoal.updated_at)).scalar()))
            parts.append(str(db.query(func.max(CodingGoal.updated_at)).scalar()))
            parts.append(str(
                db.query(func.count(User.id))
                .filter(User.user_type == 'agent').scalar()))
        except Exception:
            # Schema not migrated, in-memory test DB without tables, etc.
            # Fall through with whatever we collected; version will
            # still be stable per request, just less precise.
            logger.debug('Dashboard version: schema unavailable',
                         exc_info=True)
            parts.append('schema-unavailable')

        # Daemon restart_log length: bumps on every restart, stable
        # otherwise. We deliberately skip uptime: it ticks every
        # second and would invalidate the cache on every poll without
        # corresponding to a state the user sees change. Always
        # append something so the watchdog contributes exactly one
        # part whether or not it has been started yet.
        restart_count = ''
        try:
            from security.node_watchdog import get_watchdog
            wd = get_watchdog()
            if wd is not None:
                restart_count = str(len(wd.get_health().get('restart_log', [])))
        except Exception:
            logger.debug('Dashboard version: watchdog unavailable',
                         exc_info=True)
        parts.append(restart_count)

        return hashlib.sha256('|'.join(parts).encode()).hexdigest()[:16]

    @staticmethod
    def _compute_priority(agent_entry: Dict) -> int:
        """Compute the priority score used for dashboard ordering.

        Tiers, highest first (the first matching rule wins):
          1. status 'executing' / 'dispatching'           TIER_EXECUTING
          2. status 'active' and 'goal' in the type       TIER_ACTIVE
          3. status 'stalled'                             TIER_STALLED
          4. daemon with status 'frozen'                  TIER_FROZEN_DAEMON
          5. daemon with status 'healthy' / 'unknown'     TIER_DAEMON
          6. status 'idle' / 'available'                  TIER_IDLE
          7. everything else (completed, paused, ...)     TIER_COMPLETED

        Within a tier, entries are sub-sorted by remaining spark budget
        (``spark_budget - spark_spent``, clamped to 0..100).

        NOTE(review): the bonus can reach 100 while TIER_ACTIVE and
        TIER_STALLED are only 50 apart, so a stalled goal with budget left
        can outrank an active goal with none. Confirm whether that is
        intended.

        Args:
            agent_entry: Dashboard entry; ``status``, ``type`` and
                ``metrics`` are read, all optional. ``metrics`` may be
                ``None``.

        Returns:
            Tier weight plus budget bonus. This is an ``int`` unless the
            spark values are floats.
        """
        status = agent_entry.get('status', '')
        agent_type = agent_entry.get('type', '')
        is_daemon = agent_type == 'daemon'

        if status in ('executing', 'dispatching'):
            base = DashboardService.TIER_EXECUTING
        elif status == 'active' and 'goal' in agent_type:
            base = DashboardService.TIER_ACTIVE
        elif status == 'stalled':
            base = DashboardService.TIER_STALLED
        elif is_daemon and status == 'frozen':
            base = DashboardService.TIER_FROZEN_DAEMON
        elif is_daemon and status in ('healthy', 'unknown'):
            base = DashboardService.TIER_DAEMON
        elif status in ('idle', 'available'):
            base = DashboardService.TIER_IDLE
        else:
            base = DashboardService.TIER_COMPLETED

        # `or {}` also covers an entry that carries metrics=None.
        metrics = agent_entry.get('metrics') or {}
        budget = metrics.get('spark_budget') or 0
        spent = metrics.get('spark_spent') or 0
        remaining = max(0, budget - spent)

        return base + min(remaining, 100)