Coverage for integrations / social / api_audit.py: 17.8%

202 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Agent Audit Trail & Compute Tracking API 

3Thin aggregation layer - no new tables. Reads from existing sources: 

4 - DashboardService (agents), MemoryGraph (conversations/lifecycle), 

5 - SmartLedger (task events), AgentGoal (daemon goals), 

6 - RegionalHostRegistry (compute nodes), APIUsageLog (compute usage). 

7""" 

8import logging 

9from datetime import datetime, timedelta 

10from flask import Blueprint, request, jsonify, g, make_response 

11 

12from .auth import require_auth, optional_auth 

13from .models import get_db, AgentGoal, APIUsageLog, User 

14 

# Shared logger name used across the hevolve_social package.
logger = logging.getLogger(name='hevolve_social')

# Every audit endpoint below is mounted under /api/social/audit.
audit_bp = Blueprint(
    name='audit',
    import_name=__name__,
    url_prefix='/api/social/audit',
)

18 

19 

def _ok(data=None, meta=None, status=200):
    """Build a success envelope for a JSON response.

    The body always has ``success: True``. ``data`` and ``meta`` are added
    only when they are not ``None``, so an explicit empty list or dict is
    still included.

    Returns:
        A ``(response, status)`` tuple that a Flask view can return directly.
    """
    optional_fields = (('data', data), ('meta', meta))
    payload = {'success': True}
    payload.update({name: value for name, value in optional_fields if value is not None})
    return jsonify(payload), status

27 

28 

def _err(msg, status=400):
    """Build a failure envelope ``{'success': False, 'error': msg}``.

    Returns:
        A ``(response, status)`` tuple; ``status`` defaults to 400.
    """
    body = dict(success=False, error=msg)
    return jsonify(body), status

31 

32 

33# ═══════════════════════════════════════════════════════════════ 

34# AGENTS - Unified list (local + cloud + daemon) 

35# ═══════════════════════════════════════════════════════════════ 

36 

@audit_bp.route('/agents', methods=['GET'])
@require_auth
def list_agents():
    """Unified agent list from DashboardService + local prompts.

    Query params:
        type: optional ``local|cloud|daemon|all`` filter. If it is omitted
            or set to ``all``, every agent is returned.

    Honors ``If-None-Match`` for the unfiltered case (and includes the
    requested ``type`` filter in the ETag). See
    DashboardService.get_dashboard_version for hash inputs. The React
    UI polls this endpoint every few seconds; without 304 the full
    pipeline runs each tick and starves waitress workers under
    ResourceGovernor's "active" throttle (cores [12,13,14,15] @ 25%).

    Returns:
        A JSON ``{'success': True, 'data': [...]}`` response with an ``ETag``
        and ``Cache-Control: private, max-age=2``. If the client's ETag
        matches, an empty 304 is returned instead. On any internal failure
        the error is logged and an empty list is returned, so the polling
        UI degrades gracefully.
    """
    agent_type = request.args.get('type')  # local|cloud|daemon|all
    db = get_db()
    try:
        from .dashboard_service import DashboardService

        # ETag short-circuit: include the type filter so /agents and
        # /agents?type=daemon don't share a cache slot.
        # NOTE(review): display_name/avatar_url are looked up from User below.
        # Unless get_dashboard_version hashes User rows, profile edits will
        # not invalidate this ETag. Confirm.
        version = DashboardService.get_dashboard_version(db)
        etag = f'W/"audit-agents-{version}-{agent_type or "all"}"'
        if request.headers.get('If-None-Match') == etag:
            resp = make_response('', 304)
            resp.headers['ETag'] = etag
            resp.headers['Cache-Control'] = 'private, max-age=2'
            return resp

        dashboard = DashboardService.get_dashboard(db)
        agents = dashboard.get('agents', [])

        # Filter by type if requested
        if agent_type and agent_type != 'all':
            agents = [a for a in agents if a.get('type') == agent_type]

        # Enrich with user info using ONE query for all referenced users.
        # The previous per-agent lookup was an N+1 pattern on a hot, polled
        # endpoint. The map is keyed by str(id) so an int/str mismatch
        # between social_user_id and User.id still matches.
        user_ids = {a['social_user_id'] for a in agents if a.get('social_user_id')}
        if user_ids:
            users_by_id = {
                str(u.id): u
                for u in db.query(User).filter(User.id.in_(user_ids)).all()
            }
            for agent in agents:
                if not agent.get('social_user_id'):
                    continue
                user = users_by_id.get(str(agent['social_user_id']))
                if user:
                    agent['display_name'] = user.display_name
                    agent['avatar_url'] = user.avatar_url

        body, status = _ok(agents)
        resp = make_response(body, status)
        resp.headers['ETag'] = etag
        resp.headers['Cache-Control'] = 'private, max-age=2'
        return resp
    except Exception as e:
        # Best-effort: keep the UI polling, but record the traceback.
        logger.exception(f"Audit list_agents failed: {e}")
        return _ok([])
    finally:
        db.close()

89 

90 

@audit_bp.route('/agents/<agent_id>/timeline', methods=['GET'])
@require_auth
def get_agent_timeline(agent_id):
    """Chronological activity: conversations, tool calls, status transitions, thinking.

    Merges two optional sources, newest first:

    * MemoryGraph memories from the first session key that yields any
      (``"{user_id}_{agent_id}"``, then ``"default_{agent_id}"``, then
      ``agent_id``).
    * SmartLedger events for ``agent_id``.

    Query params:
        limit: maximum number of events, clamped to 1..200 (default 50).
            It is passed to each source and applied again to the merged list.

    Returns:
        ``_ok(list_of_events)``, or a 400 via ``_err`` if ``limit`` is not an
        integer. A source that is missing or failing is logged at debug level
        and skipped.
    """
    try:
        limit = int(request.args.get('limit', 50))
    except (TypeError, ValueError):
        return _err('limit must be an integer')
    # Clamp the lower bound too. Without it, a negative limit was forwarded
    # to the backends and made events[:limit] drop items from the end.
    limit = max(1, min(limit, 200))
    events = []

    # 1. MemoryGraph lifecycle + conversation events
    try:
        from integrations.channels.memory.memory_graph import MemoryGraph
        # Try common session key patterns.
        # NOTE(review): the "default_" and bare agent_id keys are not scoped
        # to the calling user. Confirm that is intended for this endpoint.
        user_id = g.user.id
        for session_key in [f"{user_id}_{agent_id}", f"default_{agent_id}", agent_id]:
            try:
                mg = MemoryGraph(session_key)
                memories = mg.get_session_memories(session_key, limit=limit)
                for m in memories:
                    md = m.to_dict() if hasattr(m, 'to_dict') else {'content': str(m)}
                    events.append({
                        'type': md.get('memory_type', 'conversation'),
                        'timestamp': md.get('created_at'),
                        'content': md.get('content', ''),
                        'metadata': md.get('metadata', {}),
                        'source': 'memory_graph',
                    })
                if events:
                    break
            except Exception as e:
                logger.debug(f"Timeline: MemoryGraph lookup for {session_key!r} failed: {e}")
                continue
    except ImportError:
        pass

    # 2. Ledger task events (optional dependency; best-effort)
    try:
        from agent_ledger.core import SmartLedger
        ledger = SmartLedger(agent_id=agent_id)
        ledger_events = ledger.get_events(limit=limit)
        for ev in ledger_events:
            events.append({
                'type': 'task_event',
                'timestamp': ev.get('timestamp'),
                'content': ev.get('description', ev.get('event_type', '')),
                'metadata': ev,
                'source': 'ledger',
            })
    except Exception as e:
        logger.debug(f"Timeline: ledger events for {agent_id!r} unavailable: {e}")

    # Sort newest first.
    # NOTE(review): this assumes both sources use comparable timestamp types
    # (e.g. ISO strings). A mix of str and float would raise TypeError here.
    events.sort(key=lambda e: e.get('timestamp') or '', reverse=True)
    return _ok(events[:limit])

142 

143 

@audit_bp.route('/agents/<agent_id>/conversations', methods=['GET'])
@require_auth
def get_agent_conversations(agent_id):
    """Agent conversation history from MemoryGraph.

    Session keys are probed in this order:

    1. ``"{user_id}_{agent_id}"``
    2. ``"default_{agent_id}"``
    3. the bare ``agent_id``

    Probing stops at the first key that yields at least one ``conversation``
    memory. Up to 100 memories are read per key.

    Returns:
        ``_ok(list)`` of ``{id, role, content, timestamp}`` dicts.
        ``role`` falls back to ``'assistant'`` when the memory's metadata
        has none. The list is empty if MemoryGraph cannot be imported.
    """
    conversations = []
    try:
        from integrations.channels.memory.memory_graph import MemoryGraph
        # NOTE(review): the "default_" and bare agent_id keys are not scoped
        # to the calling user. Confirm that is intended for this endpoint.
        user_id = g.user.id
        for session_key in [f"{user_id}_{agent_id}", f"default_{agent_id}", agent_id]:
            try:
                mg = MemoryGraph(session_key)
                memories = mg.get_session_memories(session_key, limit=100)
                for m in memories:
                    md = m.to_dict() if hasattr(m, 'to_dict') else {'content': str(m)}
                    if md.get('memory_type') != 'conversation':
                        continue
                    # metadata can be present but None (or not a dict). The
                    # old md.get('metadata', {}).get(...) raised in that case,
                    # and the except below then discarded this whole session.
                    meta = md.get('metadata')
                    role = meta.get('role', 'assistant') if isinstance(meta, dict) else 'assistant'
                    conversations.append({
                        'id': md.get('id'),
                        'role': role,
                        'content': md.get('content', ''),
                        'timestamp': md.get('created_at'),
                    })
                if conversations:
                    break
            except Exception as e:
                logger.debug(f"Conversations: MemoryGraph lookup for {session_key!r} failed: {e}")
                continue
    except ImportError:
        pass
    return _ok(conversations)

172 

173 

@audit_bp.route('/agents/<agent_id>/thinking', methods=['GET'])
@require_auth
def get_agent_thinking(agent_id):
    """Return the agent's reasoning chain from MemoryGraph.

    Keeps only memories whose ``memory_type`` is ``lifecycle``, ``thinking``
    or ``tool_call``. Three session keys are tried in order (caller-scoped,
    ``default_``-prefixed, then bare ``agent_id``), and probing stops once
    any matching memory has been collected. Up to 100 memories are read per
    key, and a key whose lookup raises is skipped.

    Returns:
        ``_ok(list)`` of ``{type, content, timestamp, metadata}`` dicts. The
        list is empty when MemoryGraph cannot be imported.
    """
    reasoning_kinds = ('lifecycle', 'thinking', 'tool_call')
    steps = []

    try:
        from integrations.channels.memory.memory_graph import MemoryGraph
    except ImportError:
        return _ok(steps)

    uid = g.user.id
    candidate_keys = (f"{uid}_{agent_id}", f"default_{agent_id}", agent_id)
    for key in candidate_keys:
        try:
            graph = MemoryGraph(key)
            for mem in graph.get_session_memories(key, limit=100):
                record = mem.to_dict() if hasattr(mem, 'to_dict') else {'content': str(mem)}
                kind = record.get('memory_type')
                if kind not in reasoning_kinds:
                    continue
                steps.append({
                    'type': kind,
                    'content': record.get('content', ''),
                    'timestamp': record.get('created_at'),
                    'metadata': record.get('metadata', {}),
                })
            if steps:
                break
        except Exception:
            continue

    return _ok(steps)

202 

203 

204# ═══════════════════════════════════════════════════════════════ 

205# DAEMON - Background agent activity 

206# ═══════════════════════════════════════════════════════════════ 

207 

@audit_bp.route('/daemon/activity', methods=['GET'])
@require_auth
def get_daemon_activity():
    """Recent daemon actions: goal dispatches, idle agent detection, remediation.

    Merges AgentGoal rows created in the last 24 hours with SmartLedger
    events recorded under agent id ``'daemon'``, newest first.

    Query params:
        limit: maximum number of items, clamped to 1..100 (default 30).
            It is applied to each source and again to the merged list.

    Returns:
        ``_ok(list_of_activity)``, or a 400 via ``_err`` if ``limit`` is not
        an integer. Source failures are logged at debug level and skipped.
    """
    try:
        limit = int(request.args.get('limit', 30))
    except (TypeError, ValueError):
        return _err('limit must be an integer')
    # Clamp the lower bound too, so a negative value never reaches
    # .limit() or the final slice (which would drop items from the end).
    limit = max(1, min(limit, 100))
    activity = []

    # Recent goal dispatches
    db = get_db()
    try:
        # NOTE(review): naive utcnow() assumes AgentGoal.created_at is stored
        # as naive UTC. Confirm before switching to aware datetimes.
        cutoff = datetime.utcnow() - timedelta(hours=24)
        goals = (db.query(AgentGoal)
                 .filter(AgentGoal.created_at >= cutoff)
                 .order_by(AgentGoal.created_at.desc())
                 .limit(limit)
                 .all())
        for goal in goals:
            activity.append({
                'type': 'goal_dispatch',
                'timestamp': goal.created_at.isoformat() if goal.created_at else None,
                'goal_id': goal.id,
                'goal_type': goal.goal_type,
                'status': goal.status,
                'description': goal.description,
                'assigned_agent_id': goal.assigned_agent_id,
            })
    except Exception as e:
        logger.debug(f"Daemon activity query failed: {e}")
    finally:
        db.close()

    # Ledger daemon events (optional dependency; best-effort)
    try:
        from agent_ledger.core import SmartLedger
        ledger = SmartLedger(agent_id='daemon')
        events = ledger.get_events(limit=limit)
        for ev in events:
            activity.append({
                'type': ev.get('event_type', 'daemon_tick'),
                'timestamp': ev.get('timestamp'),
                'content': ev.get('description', ''),
                'metadata': ev,
            })
    except Exception as e:
        logger.debug(f"Daemon ledger events unavailable: {e}")

    # NOTE(review): goal timestamps are ISO strings. If ledger timestamps are
    # numeric, this comparison would raise TypeError. Confirm the ledger format.
    activity.sort(key=lambda a: a.get('timestamp') or '', reverse=True)
    return _ok(activity[:limit])

256 

257 

@audit_bp.route('/daemon/goals', methods=['GET'])
@require_auth
def get_daemon_goals():
    """Active and completed goals with progress.

    Query params:
        status: ``AgentGoal.status`` value to match (default ``'active'``).
            Passing ``'all'`` disables the filter.

    Returns:
        ``_ok(list)`` of up to 50 ``goal.to_dict()`` payloads, newest first.
        Each payload has a ``progress`` key from DistributedTaskCoordinator,
        which is None when the coordinator is unavailable or its lookup
        fails. If the query fails, the error is logged and an empty list is
        returned.
    """
    status_filter = request.args.get('status', 'active')
    db = get_db()
    try:
        q = db.query(AgentGoal)
        if status_filter != 'all':
            q = q.filter(AgentGoal.status == status_filter)
        goals = q.order_by(AgentGoal.created_at.desc()).limit(50).all()

        # Import and construct the coordinator once per request rather than
        # once per goal (up to 50 times). It is only built when there are
        # goals to annotate.
        coord = None
        if goals:
            try:
                from integrations.distributed_agent.task_coordinator import DistributedTaskCoordinator
                coord = DistributedTaskCoordinator()
            except Exception as e:
                logger.debug(f"Goal progress unavailable: {e}")

        result = []
        for goal in goals:
            gd = goal.to_dict()
            progress = None
            if coord is not None:
                # Each lookup is independently best-effort.
                try:
                    progress = coord.get_goal_progress(goal.id)
                except Exception:
                    progress = None
            gd['progress'] = progress
            result.append(gd)

        return _ok(result)
    except Exception as e:
        logger.error(f"Daemon goals query failed: {e}")
        return _ok([])
    finally:
        db.close()

291 

292 

293# ═══════════════════════════════════════════════════════════════ 

294# COMPUTE - Node tracking (regional/central only for full view) 

295# ═══════════════════════════════════════════════════════════════ 

296 

@audit_bp.route('/compute/nodes', methods=['GET'])
@require_auth
def get_compute_nodes():
    """List compute nodes known to RegionalHostRegistry.

    Callers whose role is ``central`` or ``regional`` see every node. All
    other callers see only nodes whose ``owner_id`` equals their user id.
    If the registry cannot be imported or queried, the failure is logged at
    debug level and the result is an empty list.
    """
    try:
        from integrations.distributed_agent.host_registry import RegionalHostRegistry
        nodes = RegionalHostRegistry().get_all_hosts()
    except Exception as e:
        logger.debug(f"Compute nodes query failed: {e}")
        nodes = []

    has_full_view = g.user.role in ('central', 'regional')
    if not has_full_view:
        nodes = list(filter(lambda node: node.get('owner_id') == g.user.id, nodes))

    return _ok(nodes)

314 

315 

@audit_bp.route('/compute/usage', methods=['GET'])
@require_auth
def get_compute_usage():
    """Compute usage aggregated per API key from api_usage_log.

    Query params:
        days: look-back window in days, clamped to 1..30 (default 7).

    Callers with role ``central`` or ``regional`` see every key. Other
    callers see only the API keys in ``CommercialAPIKey`` that they own.

    Returns:
        ``_ok(list)`` of per-key dicts with ``request_count``, token totals,
        ``total_compute_ms`` and ``total_cost`` (a float). A 400 via ``_err``
        is returned if ``days`` is not an integer. If the query fails, the
        error is logged and an empty list is returned.
    """
    try:
        days = int(request.args.get('days', 7))
    except (TypeError, ValueError):
        return _err('days must be an integer')
    # Without a lower bound, days <= 0 put the cutoff at or after "now".
    days = max(1, min(days, 30))
    cutoff = datetime.utcnow() - timedelta(days=days)

    db = get_db()
    try:
        from sqlalchemy import func as sqlfunc
        query = (db.query(
            APIUsageLog.api_key_id,
            sqlfunc.count(APIUsageLog.id).label('request_count'),
            sqlfunc.sum(APIUsageLog.tokens_in).label('total_tokens_in'),
            sqlfunc.sum(APIUsageLog.tokens_out).label('total_tokens_out'),
            sqlfunc.sum(APIUsageLog.compute_ms).label('total_compute_ms'),
            sqlfunc.sum(APIUsageLog.cost_credits).label('total_cost'),
        ).filter(APIUsageLog.created_at >= cutoff))

        # For flat users, restrict to their own keys in SQL. Previously every
        # tenant's usage was aggregated and then discarded in Python.
        if g.user.role not in ('central', 'regional'):
            from .models import CommercialAPIKey
            user_keys = [k.id for k in
                         db.query(CommercialAPIKey.id).filter_by(user_id=g.user.id).all()]
            if not user_keys:
                return _ok([])
            query = query.filter(APIUsageLog.api_key_id.in_(user_keys))

        rows = query.group_by(APIUsageLog.api_key_id).all()
        usage = [
            {
                'api_key_id': row.api_key_id,
                'request_count': row.request_count or 0,
                'total_tokens_in': row.total_tokens_in or 0,
                'total_tokens_out': row.total_tokens_out or 0,
                'total_compute_ms': row.total_compute_ms or 0,
                'total_cost': float(row.total_cost or 0),
            }
            for row in rows
        ]
        return _ok(usage)
    except Exception as e:
        logger.error(f"Compute usage query failed: {e}")
        return _ok([])
    finally:
        db.close()

362 

363 

@audit_bp.route('/compute/routing', methods=['GET'])
@require_auth
def get_compute_routing():
    """Current routing info: node tier, LLM backend, routing reasons.

    Returns:
        ``_ok(dict)`` with these keys:

        * ``node_tier``: ``'flat'`` unless ``security.key_delegation``
          provides a tier.
        * ``llm_backend``: ``'hart_intelligence'`` when
          ``HEVOLVE_BACKEND_URL`` is set, otherwise ``'direct_llama'``.
        * ``llm_url``: present only when that env var is set, with any
          ``user:password@`` credentials removed.
        * ``local_llm_available``: result of a 2-second health probe of
          ``http://localhost:8080/health``.
        * ``routing_reasons``: list of reason dicts.
        * ``connected_nodes``: count from RegionalHostRegistry, or 0.
    """
    import os
    from urllib.parse import urlsplit, urlunsplit

    routing = {
        'node_tier': 'flat',
        'llm_backend': 'unknown',
        'routing_reasons': [],
    }

    # Detect node tier
    try:
        from security.key_delegation import get_node_tier
        routing['node_tier'] = get_node_tier()
    except Exception:
        pass

    # Detect LLM backend
    hevolve_url = os.environ.get('HEVOLVE_BACKEND_URL', '')
    if hevolve_url:
        routing['llm_backend'] = 'hart_intelligence'
        # Any authenticated user can read this endpoint, so never echo
        # credentials embedded in the URL (scheme://user:token@host/...).
        try:
            parts = urlsplit(hevolve_url)
            if '@' in parts.netloc:
                hevolve_url = urlunsplit(
                    parts._replace(netloc=parts.netloc.rpartition('@')[2]))
        except ValueError:
            pass  # Malformed URL: nothing parseable to redact.
        routing['llm_url'] = hevolve_url
    else:
        routing['llm_backend'] = 'direct_llama'

    # Check local LLM availability.
    # NOTE(review): the health URL is hard-coded to localhost:8080. Confirm
    # it matches the deployed llama server port.
    try:
        from core.http_pool import pooled_get
        resp = pooled_get('http://localhost:8080/health', timeout=2)
        routing['local_llm_available'] = resp.status_code == 200
    except Exception:
        routing['local_llm_available'] = False

    # Routing reasons
    if not routing['local_llm_available']:
        routing['routing_reasons'].append({
            'reason': 'compute_unavailable',
            'description': 'Local LLM not running - requests routed to regional/cloud',
        })

    # Check host registry for connected nodes
    try:
        from integrations.distributed_agent.host_registry import RegionalHostRegistry
        registry = RegionalHostRegistry()
        hosts = registry.get_all_hosts()
        routing['connected_nodes'] = len(hosts)
    except Exception:
        routing['connected_nodes'] = 0

    return _ok(routing)