Coverage for integrations / social / dashboard_service.py: 94.4%

161 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Agent Dashboard Service: Truth-Grounded Unified View 

3 

4Queries actual database state, applies staleness detection, computes priority. 

5Shows what is REALLY happening, not what we wish was happening. 

6 

7Consumed by Nunba (desktop), HART RN (mobile), and hevolve.ai (web). 

8""" 

9import hashlib 

10import logging 

11import os 

12from datetime import datetime, timedelta 

13from typing import Dict, List, Optional 

14 

15from sqlalchemy.orm import Session 

16 

17logger = logging.getLogger('hevolve_social') 

18 

19 

class DashboardService:
    """Static service class for the truth-grounded unified agent dashboard.

    Pure namespace: every method below is a @staticmethod and the class
    is never instantiated. Consumers poll get_dashboard() and may
    short-circuit with get_dashboard_version() for ETag/304 handling.
    """

    # Priority tier weights (higher = shown first). Consumed only by
    # _compute_priority(), which adds a 0-100 bonus from remaining spark
    # budget on top of the tier base.
    # NOTE(review): TIER_ACTIVE and TIER_STALLED are only 50 apart, so
    # the 0-100 spark bonus can let a stalled goal outrank an active one
    # — confirm that cross-tier bleed is intended.
    TIER_EXECUTING = 1000      # work executing/dispatching right now
    TIER_ACTIVE = 500          # active goals
    TIER_STALLED = 450         # active goals with stale dispatch timestamps
    TIER_FROZEN_DAEMON = 300   # daemons the watchdog reports as frozen
    TIER_DAEMON = 200          # healthy (or unknown-state) daemons
    TIER_IDLE = 50             # idle or merely-available agents
    TIER_COMPLETED = 10        # completed/paused/failed and everything else

    @staticmethod
    def get_dashboard(db: Session) -> Dict:
        """Build complete agent dashboard from live database state.

        Returns dict with timestamp, node_health, agents (priority-sorted),
        and summary counts.

        Every section is best-effort: the five agent sources, the
        watchdog and the world-model bridge each degrade to an empty or
        placeholder value instead of raising, so callers always receive
        a (possibly partial) dashboard dict.
        """
        now = datetime.utcnow()  # naive UTC; helpers compare it to DB datetimes (assumed naive — TODO confirm)
        agents: List[Dict] = []

        # 1. Agent goals (marketing, coding, analytics, etc.)
        agents.extend(DashboardService._get_agent_goals(db))

        # 2. Coding goals
        agents.extend(DashboardService._get_coding_goals(db))

        # 3. Background daemons (from watchdog)
        agents.extend(DashboardService._get_daemon_status())

        # 4. Trained agents (social users with user_type='agent')
        agents.extend(DashboardService._get_trained_agents(db))

        # 5. Expert agents (static registry)
        agents.extend(DashboardService._get_expert_agents())

        # Compute priority, sort descending. list.sort is stable, so
        # equal-priority entries keep the source ordering above.
        for agent in agents:
            agent['priority'] = DashboardService._compute_priority(agent)
        agents.sort(key=lambda a: -a['priority'])

        # Node health from watchdog (placeholder when it isn't running).
        node_health = {'watchdog': 'unavailable', 'threads': {}}
        try:
            from security.node_watchdog import get_watchdog
            wd = get_watchdog()
            if wd:
                node_health = wd.get_health()
        except Exception:
            pass

        # Summary counts
        summary: Dict = {'total': len(agents), 'by_type': {}, 'by_status': {}}
        for a in agents:
            t = a.get('type', 'unknown')
            s = a.get('status', 'unknown')
            summary['by_type'][t] = summary['by_type'].get(t, 0) + 1
            summary['by_status'][s] = summary['by_status'].get(s, 0) + 1

        # World model (HevolveAI) status: SUPPLEMENTARY data, must not
        # block the dashboard. `get_world_model_bridge()` first-call
        # bootstraps embodied_ai + vision + llama subsystems and can take
        # 60s+ when HARTOS Tier-1 init was skipped (e.g. transformers
        # crashed at boot). Without this timeout the entire dashboard
        # endpoint hangs every poll → 5-second React UI poll piles up →
        # frontend silently times out → "No agents running" appears even
        # though 351 goals are in the AgentGoal table.
        #
        # Regression observed 2026-04-26: Tier-1 KeyError at boot left
        # world_model_bridge un-warmed; every dashboard poll then took
        # 57s, queueing the waitress task list and emptying the admin UI.
        world_model = {'healthy': False, 'error': 'unavailable'}
        try:
            import concurrent.futures as _cf
            def _collect_world_model():
                # Runs on the pool worker; the import itself is the
                # potentially-60s operation being bounded here.
                from integrations.agent_engine.world_model_bridge import (
                    get_world_model_bridge)
                bridge = get_world_model_bridge()
                return {
                    'health': bridge.check_health(),
                    'stats': bridge.get_learning_stats(),
                }
            # CRITICAL: do NOT use `with ThreadPoolExecutor as ex:` here.
            # The context-manager ``__exit__`` calls ``shutdown(wait=True)``,
            # which join()s the pool's worker thread. When ``_fut.result``
            # times out, the worker is still inside the heavy
            # ``get_world_model_bridge()`` import and CAN'T finish, so the
            # ``with`` exit blocks forever — turning the 2s timeout into
            # an infinite hang and stacking every dashboard poll into a
            # permanently-stuck Hypercorn worker. Live thread dump
            # 2026-04-28 22:08 showed 15 nunba_X workers ALL frozen at
            # ``ThreadPoolExecutor.__exit__ → shutdown → join``. Fix:
            # manual try/finally + ``shutdown(wait=False,
            # cancel_futures=True)`` so the request returns even when
            # the worker is permanently wedged on the import lock.
            _ex = _cf.ThreadPoolExecutor(max_workers=1)
            try:
                _fut = _ex.submit(_collect_world_model)
                try:
                    _wm = _fut.result(timeout=2.0)
                    health = _wm['health']
                    stats = _wm['stats']
                    world_model = {
                        'healthy': health.get('healthy', False),
                        'learning_stats': stats.get('learning', {}),
                        'hivemind_stats': stats.get('hivemind', {}),
                        'bridge_stats': stats.get('bridge', {}),
                    }
                except _cf.TimeoutError:
                    # Bridge is cold or unreachable: surface that fact
                    # in the response without blocking the dashboard.
                    world_model = {'healthy': False, 'error': 'cold_or_unreachable'}
            finally:
                # Don't wait for the (potentially permanently-stuck)
                # worker thread. It's a daemon — interpreter shutdown
                # will reap it. Cancel any not-yet-started futures so
                # the pool doesn't pick up new work after we leave.
                _ex.shutdown(wait=False, cancel_futures=True)
        except Exception:
            pass

        return {
            'timestamp': now.isoformat(),
            'node_health': node_health,
            'world_model': world_model,
            'agents': agents,
            'summary': summary,
        }

149 

150 @staticmethod 

151 def _get_agent_goals(db: Session) -> List[Dict]: 

152 """Query AgentGoal table. Apply truth-grounding: detect stalled goals.""" 

153 try: 

154 from .models import AgentGoal 

155 except ImportError: 

156 return [] 

157 

158 poll_interval = int(os.environ.get('HEVOLVE_AGENT_POLL_INTERVAL', '30')) 

159 now = datetime.utcnow() 

160 

161 goals = db.query(AgentGoal).filter( 

162 AgentGoal.status.in_(['active', 'paused', 'completed', 'failed']) 

163 ).all() 

164 

165 result = [] 

166 for goal in goals: 

167 gd = goal.to_dict() 

168 

169 # Truth-grounding: detect stalled or idle 

170 real_status = goal.status 

171 if goal.status == 'active' and goal.last_dispatched_at: 

172 age = (now - goal.last_dispatched_at).total_seconds() 

173 if age > poll_interval * 2: 

174 real_status = 'stalled' 

175 elif goal.status == 'active' and not goal.last_dispatched_at: 

176 real_status = 'idle' 

177 

178 result.append({ 

179 'id': str(goal.id), 

180 'type': f'{goal.goal_type}_goal', 

181 'name': goal.title, 

182 'status': real_status, 

183 'current_task': f'{goal.goal_type}: {goal.title[:60]}', 

184 'skills': [goal.goal_type], 

185 'last_active': gd.get('last_dispatched_at'), 

186 'metrics': { 

187 'spark_spent': goal.spark_spent, 

188 'spark_budget': goal.spark_budget, 

189 }, 

190 }) 

191 return result 

192 

193 @staticmethod 

194 def _get_coding_goals(db: Session) -> List[Dict]: 

195 """Query CodingGoal table with task completion percentage.""" 

196 try: 

197 from .models import CodingGoal 

198 except ImportError: 

199 return [] 

200 

201 goals = db.query(CodingGoal).filter( 

202 CodingGoal.status.in_(['active', 'paused', 'completed']) 

203 ).all() 

204 

205 result = [] 

206 for goal in goals: 

207 total = getattr(goal, 'total_tasks', 0) or 0 

208 completed = getattr(goal, 'completed_tasks', 0) or 0 

209 pct = round(completed / total * 100, 1) if total > 0 else 0 

210 

211 result.append({ 

212 'id': str(goal.id), 

213 'type': 'coding_goal', 

214 'name': goal.title, 

215 'status': goal.status, 

216 'current_task': f'Coding: {goal.title[:60]} ({pct}% done)', 

217 'skills': ['coding'], 

218 'last_active': goal.updated_at.isoformat() if getattr( 

219 goal, 'updated_at', None) else None, 

220 'metrics': { 

221 'total_tasks': total, 

222 'completed_tasks': completed, 

223 'completion_pct': pct, 

224 }, 

225 }) 

226 return result 

227 

228 @staticmethod 

229 def _get_daemon_status() -> List[Dict]: 

230 """Get background daemon statuses from watchdog.""" 

231 result = [] 

232 try: 

233 from security.node_watchdog import get_watchdog 

234 wd = get_watchdog() 

235 if not wd: 

236 raise RuntimeError('no watchdog') 

237 

238 health = wd.get_health() 

239 for name, info in health.get('threads', {}).items(): 

240 result.append({ 

241 'id': f'daemon_{name}', 

242 'type': 'daemon', 

243 'name': name, 

244 'status': info.get('status', 'unknown'), 

245 'current_task': f'Background: {name}', 

246 'skills': [name.replace('_', ' ')], 

247 'last_active': info.get('last_heartbeat_iso'), 

248 'metrics': { 

249 'restart_count': info.get('restart_count', 0), 

250 'heartbeat_age_s': info.get('last_heartbeat_age_s'), 

251 }, 

252 }) 

253 except Exception: 

254 # Watchdog not available; enumerate known daemons 

255 for name in ('gossip', 'runtime_monitor', 'sync_engine', 

256 'agent_daemon', 'coding_daemon'): 

257 result.append({ 

258 'id': f'daemon_{name}', 

259 'type': 'daemon', 

260 'name': name, 

261 'status': 'unknown', 

262 'current_task': f'Background: {name}', 

263 'skills': [name.replace('_', ' ')], 

264 'last_active': None, 

265 'metrics': {}, 

266 }) 

267 return result 

268 

269 @staticmethod 

270 def _get_trained_agents(db: Session) -> List[Dict]: 

271 """Query users with user_type='agent'.""" 

272 try: 

273 from .models import User 

274 except ImportError: 

275 return [] 

276 

277 agents = db.query(User).filter(User.user_type == 'agent').all() 

278 

279 result = [] 

280 for agent in agents: 

281 last_active = getattr(agent, 'last_active_at', None) 

282 is_active = (last_active and 

283 (datetime.utcnow() - last_active).total_seconds() < 3600) 

284 

285 result.append({ 

286 'id': str(agent.id), 

287 'type': 'trained_agent', 

288 'name': getattr(agent, 'display_name', None) or agent.username, 

289 'status': 'active' if is_active else 'idle', 

290 'current_task': None, 

291 'skills': [], # loaded from skill badges if available 

292 'last_active': last_active.isoformat() if last_active else None, 

293 'metrics': { 

294 'karma_score': getattr(agent, 'karma_score', 0), 

295 }, 

296 }) 

297 return result 

298 

299 @staticmethod 

300 def _get_expert_agents() -> List[Dict]: 

301 """Load from ExpertAgentRegistry if available.""" 

302 result = [] 

303 try: 

304 from integrations.internal_comm.internal_agent_communication import ( 

305 AgentSkillRegistry) 

306 registry = AgentSkillRegistry.get_instance() 

307 for agent_id, agent_info in registry._agents.items(): 

308 result.append({ 

309 'id': f'expert_{agent_id}', 

310 'type': 'expert_agent', 

311 'name': agent_info.get('name', agent_id), 

312 'status': 'available', 

313 'current_task': None, 

314 'skills': list(agent_info.get('skills', {}).keys()), 

315 'last_active': None, 

316 'metrics': { 

317 'accuracy': agent_info.get('accuracy', 0), 

318 }, 

319 }) 

320 except Exception: 

321 pass 

322 return result 

323 

324 # ─────────────────────────────────────────────────────────────── 

325 # ETag short-circuit: cheap hash for client If-None-Match polls. 

326 # See api_dashboard.get_agent_dashboard / api_audit.list_agents. 

327 # The React UI polls every 5s; without this, every poll re-runs 

328 # 5 SQL queries + watchdog locks + 170-row serialization even when 

329 # nothing changed. Hash inputs are MAX(updated_at) on the two goal 

330 # tables + watchdog uptime bucket + trained-agent count: all cheap 

331 # reads that move whenever the dashboard would visibly change. 

332 # ─────────────────────────────────────────────────────────────── 

333 

334 @staticmethod 

335 def get_dashboard_version(db: Session) -> str: 

336 """Return a 16-char hash of dashboard inputs for ETag/304. 

337 

338 Cheap by construction: 

339 - MAX(updated_at) on agent_goals + coding_goals (indexed) 

340 - watchdog uptime in 5s buckets + restart-log length 

341 - count of trained_agent users 

342 

343 ANY caller depending on this for cache invalidation MUST also 

344 ensure their write site bumps `updated_at` (the SQLAlchemy 

345 `onupdate=func.now()` does this for AgentGoal/CodingGoal). 

346 For state changes that DON'T touch these tables (e.g. expert 

347 agent registry mutations, daemon thread restarts), the 

348 EventBus → SSE bridge in core/platform/bootstrap.py pushes 

349 `dashboard.invalidate` so the client re-fetches without 

350 waiting for the ETag to differ. 

351 """ 

352 parts: List[str] = [] 

353 try: 

354 from sqlalchemy import func 

355 from .models import AgentGoal, CodingGoal, User 

356 parts.append(str(db.query(func.max(AgentGoal.updated_at)).scalar())) 

357 parts.append(str(db.query(func.max(CodingGoal.updated_at)).scalar())) 

358 parts.append(str( 

359 db.query(func.count(User.id)) 

360 .filter(User.user_type == 'agent').scalar())) 

361 except Exception: 

362 # Schema not migrated, in-memory test DB without tables, etc. 

363 # Fall through with whatever we collected; version will 

364 # still be stable per request, just less precise. 

365 parts.append('schema-unavailable') 

366 

367 # Daemon restart_log length: bumps on every restart, stable 

368 # otherwise. We deliberately skip uptime: it ticks every 

369 # second and would invalidate the cache on every poll without 

370 # corresponding to a state the user sees change. Always 

371 # append something so the parts-list cardinality is stable 

372 # whether or not the watchdog has been started yet. 

373 restart_count = '' 

374 try: 

375 from security.node_watchdog import get_watchdog 

376 wd = get_watchdog() 

377 if wd is not None: 

378 restart_count = str(len(wd.get_health().get('restart_log', []))) 

379 except Exception: 

380 pass 

381 parts.append(restart_count) 

382 

383 return hashlib.sha256('|'.join(parts).encode()).hexdigest()[:16] 

384 

385 @staticmethod 

386 def _compute_priority(agent_entry: Dict) -> int: 

387 """Compute priority score for dashboard ordering. 

388 

389 Priority reflects what matters most RIGHT NOW: 

390 - Broken things first (frozen daemons) 

391 - Active work next (executing/active goals) 

392 - Background services 

393 - Idle/completed last 

394 """ 

395 status = agent_entry.get('status', '') 

396 agent_type = agent_entry.get('type', '') 

397 

398 # Tier 1: Currently executing 

399 if status in ('executing', 'dispatching'): 

400 base = DashboardService.TIER_EXECUTING 

401 # Tier 2: Active goals 

402 elif status == 'active' and 'goal' in agent_type: 

403 base = DashboardService.TIER_ACTIVE 

404 # Tier 3: Stalled goals (need attention) 

405 elif status == 'stalled': 

406 base = DashboardService.TIER_STALLED 

407 # Tier 4: Frozen daemons (need immediate attention) 

408 elif agent_type == 'daemon' and status == 'frozen': 

409 base = DashboardService.TIER_FROZEN_DAEMON 

410 # Tier 5: Healthy daemons 

411 elif agent_type == 'daemon' and status in ('healthy', 'unknown'): 

412 base = DashboardService.TIER_DAEMON 

413 # Tier 6: Idle/available 

414 elif status in ('idle', 'available'): 

415 base = DashboardService.TIER_IDLE 

416 # Tier 7: Completed/paused/failed 

417 else: 

418 base = DashboardService.TIER_COMPLETED 

419 

420 # Sub-sort by remaining spark budget (for goals) 

421 metrics = agent_entry.get('metrics', {}) 

422 budget = metrics.get('spark_budget') or 0 

423 spent = metrics.get('spark_spent') or 0 

424 remaining = max(0, budget - spent) 

425 base += min(remaining, 100) 

426 

427 return base