Coverage for integrations / social / api_audit.py: 17.8%
202 statements
« prev ^ index » coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
HevolveSocial - Agent Audit Trail & Compute Tracking API
Thin aggregation layer - no new tables. Reads from existing sources:
 - DashboardService (agents), MemoryGraph (conversations/lifecycle),
 - SmartLedger (task events), AgentGoal (daemon goals),
 - RegionalHostRegistry (compute nodes), APIUsageLog (compute usage).
"""
import logging
from datetime import datetime, timedelta

from flask import Blueprint, request, jsonify, g, make_response

from .auth import require_auth, optional_auth
from .models import get_db, AgentGoal, APIUsageLog, User

logger = logging.getLogger('hevolve_social')

audit_bp = Blueprint('audit', __name__, url_prefix='/api/social/audit')
def _ok(data=None, meta=None, status=200):
    """Build the standard success envelope, omitting absent data/meta keys."""
    payload = {'success': True}
    for key, value in (('data', data), ('meta', meta)):
        if value is not None:
            payload[key] = value
    return jsonify(payload), status
def _err(msg, status=400):
    """Build the standard failure envelope carrying an error message."""
    body = {'success': False, 'error': msg}
    return jsonify(body), status
33# ═══════════════════════════════════════════════════════════════
34# AGENTS - Unified list (local + cloud + daemon)
35# ═══════════════════════════════════════════════════════════════
@audit_bp.route('/agents', methods=['GET'])
@require_auth
def list_agents():
    """Unified agent list from DashboardService + local prompts.

    Honors ``If-None-Match`` for the unfiltered case (and includes the
    requested ``type`` filter in the ETag). See
    DashboardService.get_dashboard_version for hash inputs. The React
    UI polls this endpoint every few seconds; without 304 the full
    pipeline runs each tick and starves waitress workers under
    ResourceGovernor's "active" throttle (cores [12,13,14,15] @ 25%).

    Query params:
        type: optional filter — local|cloud|daemon|all.

    Returns:
        200 JSON envelope with the agent list (empty list on any
        internal failure — best-effort for the polling UI), or 304
        when the client's ETag still matches.
    """
    agent_type = request.args.get('type')  # local|cloud|daemon|all
    db = get_db()
    try:
        from .dashboard_service import DashboardService

        # ETag short-circuit — include the type filter so /agents and
        # /agents?type=daemon don't share a cache slot.
        version = DashboardService.get_dashboard_version(db)
        etag = f'W/"audit-agents-{version}-{agent_type or "all"}"'
        if request.headers.get('If-None-Match') == etag:
            resp = make_response('', 304)
            resp.headers['ETag'] = etag
            resp.headers['Cache-Control'] = 'private, max-age=2'
            return resp

        dashboard = DashboardService.get_dashboard(db)
        agents = dashboard.get('agents', [])

        # Filter by type if requested
        if agent_type and agent_type != 'all':
            agents = [a for a in agents if a.get('type') == agent_type]

        # Enrich with user info if agent has a social user record.
        # Batch all lookups into a single IN query instead of one
        # query per agent (the original was an N+1 pattern).
        user_ids = {a['social_user_id'] for a in agents if a.get('social_user_id')}
        if user_ids:
            users = {u.id: u for u in
                     db.query(User).filter(User.id.in_(user_ids)).all()}
            for agent in agents:
                user = users.get(agent.get('social_user_id'))
                if user:
                    agent['display_name'] = user.display_name
                    agent['avatar_url'] = user.avatar_url

        body, status = _ok(agents)
        resp = make_response(body, status)
        resp.headers['ETag'] = etag
        resp.headers['Cache-Control'] = 'private, max-age=2'
        return resp
    except Exception as e:
        # Best-effort: the polling UI treats an empty list as "no data yet".
        logger.error("Audit list_agents failed: %s", e)
        return _ok([])
    finally:
        db.close()
@audit_bp.route('/agents/<agent_id>/timeline', methods=['GET'])
@require_auth
def get_agent_timeline(agent_id):
    """Chronological activity: conversations, tool calls, status transitions, thinking.

    Query params:
        limit: max events returned, clamped to 1..200 (default 50).
               Non-numeric input falls back to the default instead of
               raising (the bare int() previously 500'd on ?limit=abc).
    """
    try:
        limit = int(request.args.get('limit', 50))
    except (TypeError, ValueError):
        limit = 50
    limit = max(1, min(limit, 200))
    events = []

    # 1. MemoryGraph lifecycle + conversation events
    try:
        from integrations.channels.memory.memory_graph import MemoryGraph
        # Try common session key patterns; first key that yields events wins.
        user_id = g.user.id
        for session_key in [f"{user_id}_{agent_id}", f"default_{agent_id}", agent_id]:
            try:
                mg = MemoryGraph(session_key)
                memories = mg.get_session_memories(session_key, limit=limit)
                for m in memories:
                    md = m.to_dict() if hasattr(m, 'to_dict') else {'content': str(m)}
                    events.append({
                        'type': md.get('memory_type', 'conversation'),
                        'timestamp': md.get('created_at'),
                        'content': md.get('content', ''),
                        'metadata': md.get('metadata', {}),
                        'source': 'memory_graph',
                    })
                if events:
                    break
            except Exception:
                continue
    except ImportError:
        pass

    # 2. Ledger task events
    try:
        from agent_ledger.core import SmartLedger
        ledger = SmartLedger(agent_id=agent_id)
        ledger_events = ledger.get_events(limit=limit)
        for ev in ledger_events:
            events.append({
                'type': 'task_event',
                'timestamp': ev.get('timestamp'),
                'content': ev.get('description', ev.get('event_type', '')),
                'metadata': ev,
                'source': 'ledger',
            })
    except Exception:
        pass

    # Sort chronologically, newest first; None timestamps sort last.
    events.sort(key=lambda e: e.get('timestamp') or '', reverse=True)
    return _ok(events[:limit])
@audit_bp.route('/agents/<agent_id>/conversations', methods=['GET'])
@require_auth
def get_agent_conversations(agent_id):
    """Agent conversation history from MemoryGraph."""
    conversations = []
    try:
        from integrations.channels.memory.memory_graph import MemoryGraph
        # Probe candidate session keys; stop at the first one with hits.
        candidate_keys = (
            f"{g.user.id}_{agent_id}",
            f"default_{agent_id}",
            agent_id,
        )
        for session_key in candidate_keys:
            try:
                graph = MemoryGraph(session_key)
                for memory in graph.get_session_memories(session_key, limit=100):
                    record = (memory.to_dict() if hasattr(memory, 'to_dict')
                              else {'content': str(memory)})
                    if record.get('memory_type') != 'conversation':
                        continue
                    conversations.append({
                        'id': record.get('id'),
                        'role': record.get('metadata', {}).get('role', 'assistant'),
                        'content': record.get('content', ''),
                        'timestamp': record.get('created_at'),
                    })
                if conversations:
                    break
            except Exception:
                continue
    except ImportError:
        pass
    return _ok(conversations)
@audit_bp.route('/agents/<agent_id>/thinking', methods=['GET'])
@require_auth
def get_agent_thinking(agent_id):
    """Agent reasoning chain from MemoryGraph lifecycle events."""
    thinking = []
    # Memory types that count as "reasoning" entries.
    reasoning_types = ('lifecycle', 'thinking', 'tool_call')
    try:
        from integrations.channels.memory.memory_graph import MemoryGraph
        candidate_keys = (
            f"{g.user.id}_{agent_id}",
            f"default_{agent_id}",
            agent_id,
        )
        for session_key in candidate_keys:
            try:
                graph = MemoryGraph(session_key)
                for memory in graph.get_session_memories(session_key, limit=100):
                    record = (memory.to_dict() if hasattr(memory, 'to_dict')
                              else {'content': str(memory)})
                    if record.get('memory_type') not in reasoning_types:
                        continue
                    thinking.append({
                        'type': record.get('memory_type'),
                        'content': record.get('content', ''),
                        'timestamp': record.get('created_at'),
                        'metadata': record.get('metadata', {}),
                    })
                if thinking:
                    break
            except Exception:
                continue
    except ImportError:
        pass
    return _ok(thinking)
204# ═══════════════════════════════════════════════════════════════
205# DAEMON - Background agent activity
206# ═══════════════════════════════════════════════════════════════
@audit_bp.route('/daemon/activity', methods=['GET'])
@require_auth
def get_daemon_activity():
    """Recent daemon actions: goal dispatches, idle agent detection, remediation.

    Query params:
        limit: max events returned, clamped to 1..100 (default 30).
               Non-numeric input falls back to the default instead of
               raising (the bare int() previously 500'd on ?limit=abc).
    """
    try:
        limit = int(request.args.get('limit', 30))
    except (TypeError, ValueError):
        limit = 30
    limit = max(1, min(limit, 100))
    activity = []

    # Recent goal dispatches from the last 24 hours
    db = get_db()
    try:
        cutoff = datetime.utcnow() - timedelta(hours=24)
        goals = (db.query(AgentGoal)
                 .filter(AgentGoal.created_at >= cutoff)
                 .order_by(AgentGoal.created_at.desc())
                 .limit(limit)
                 .all())
        for goal in goals:
            activity.append({
                'type': 'goal_dispatch',
                'timestamp': goal.created_at.isoformat() if goal.created_at else None,
                'goal_id': goal.id,
                'goal_type': goal.goal_type,
                'status': goal.status,
                'description': goal.description,
                'assigned_agent_id': goal.assigned_agent_id,
            })
    except Exception as e:
        logger.debug("Daemon activity query failed: %s", e)
    finally:
        db.close()

    # Ledger daemon events (best-effort — ledger may be absent)
    try:
        from agent_ledger.core import SmartLedger
        ledger = SmartLedger(agent_id='daemon')
        events = ledger.get_events(limit=limit)
        for ev in events:
            activity.append({
                'type': ev.get('event_type', 'daemon_tick'),
                'timestamp': ev.get('timestamp'),
                'content': ev.get('description', ''),
                'metadata': ev,
            })
    except Exception:
        pass

    # Newest first; None timestamps sort last.
    activity.sort(key=lambda a: a.get('timestamp') or '', reverse=True)
    return _ok(activity[:limit])
@audit_bp.route('/daemon/goals', methods=['GET'])
@require_auth
def get_daemon_goals():
    """Active and completed goals with progress.

    Query params:
        status: goal status filter (default 'active'); 'all' disables it.

    Returns:
        200 JSON envelope with up to 50 goal dicts, each carrying a
        'progress' key (None when the coordinator is unavailable).
    """
    status_filter = request.args.get('status', 'active')
    db = get_db()
    try:
        q = db.query(AgentGoal)
        if status_filter != 'all':
            q = q.filter(AgentGoal.status == status_filter)
        goals = q.order_by(AgentGoal.created_at.desc()).limit(50).all()

        # Build the coordinator once (the original re-imported and
        # re-instantiated it for every goal in the loop).
        try:
            from integrations.distributed_agent.task_coordinator import DistributedTaskCoordinator
            coord = DistributedTaskCoordinator()
        except Exception:
            coord = None

        result = []
        for goal in goals:
            gd = goal.to_dict()
            try:
                gd['progress'] = coord.get_goal_progress(goal.id) if coord else None
            except Exception:
                gd['progress'] = None
            result.append(gd)

        return _ok(result)
    except Exception as e:
        logger.error("Daemon goals query failed: %s", e)
        return _ok([])
    finally:
        db.close()
293# ═══════════════════════════════════════════════════════════════
294# COMPUTE - Node tracking (regional/central only for full view)
295# ═══════════════════════════════════════════════════════════════
@audit_bp.route('/compute/nodes', methods=['GET'])
@require_auth
def get_compute_nodes():
    """All compute nodes from RegionalHostRegistry."""
    nodes = []
    try:
        from integrations.distributed_agent.host_registry import RegionalHostRegistry
        nodes = RegionalHostRegistry().get_all_hosts()
    except Exception as e:
        logger.debug(f"Compute nodes query failed: {e}")

    # Flat users only see nodes they own; central/regional see everything.
    if g.user.role not in ('central', 'regional'):
        nodes = [node for node in nodes if node.get('owner_id') == g.user.id]

    return _ok(nodes)
@audit_bp.route('/compute/usage', methods=['GET'])
@require_auth
def get_compute_usage():
    """Compute usage aggregated by user/agent from api_usage_log.

    Query params:
        days: lookback window, clamped to 1..30 (default 7).
              Non-numeric input falls back to the default instead of
              raising (the bare int() previously 500'd on ?days=abc).
    """
    try:
        days = int(request.args.get('days', 7))
    except (TypeError, ValueError):
        days = 7
    days = max(1, min(days, 30))
    cutoff = datetime.utcnow() - timedelta(days=days)

    db = get_db()
    try:
        from sqlalchemy import func as sqlfunc
        rows = (db.query(
            APIUsageLog.api_key_id,
            sqlfunc.count(APIUsageLog.id).label('request_count'),
            sqlfunc.sum(APIUsageLog.tokens_in).label('total_tokens_in'),
            sqlfunc.sum(APIUsageLog.tokens_out).label('total_tokens_out'),
            sqlfunc.sum(APIUsageLog.compute_ms).label('total_compute_ms'),
            sqlfunc.sum(APIUsageLog.cost_credits).label('total_cost'),
        ).filter(APIUsageLog.created_at >= cutoff)
         .group_by(APIUsageLog.api_key_id)
         .all())

        # SUM() over no rows is NULL -> coalesce to 0 for the JSON payload.
        usage = [{
            'api_key_id': row.api_key_id,
            'request_count': row.request_count or 0,
            'total_tokens_in': row.total_tokens_in or 0,
            'total_tokens_out': row.total_tokens_out or 0,
            'total_compute_ms': row.total_compute_ms or 0,
            'total_cost': float(row.total_cost or 0),
        } for row in rows]

        # Flat users only see usage for their own API keys.
        if g.user.role not in ('central', 'regional'):
            from .models import CommercialAPIKey
            user_keys = {k.id for k in
                         db.query(CommercialAPIKey.id).filter_by(user_id=g.user.id).all()}
            usage = [u for u in usage if u['api_key_id'] in user_keys]

        return _ok(usage)
    except Exception as e:
        logger.error("Compute usage query failed: %s", e)
        return _ok([])
    finally:
        db.close()
@audit_bp.route('/compute/routing', methods=['GET'])
@require_auth
def get_compute_routing():
    """Current routing info: node tier, LLM backend, routing reasons."""
    import os

    routing = {
        'node_tier': 'flat',
        'llm_backend': 'unknown',
        'routing_reasons': [],
    }

    # Node tier from the delegation layer; default 'flat' stands if unavailable.
    try:
        from security.key_delegation import get_node_tier
        routing['node_tier'] = get_node_tier()
    except Exception:
        pass

    # Backend selection: an HEVOLVE_BACKEND_URL env var means the hart
    # intelligence backend; otherwise requests go to llama directly.
    backend_url = os.environ.get('HEVOLVE_BACKEND_URL', '')
    if backend_url:
        routing['llm_backend'] = 'hart_intelligence'
        routing['llm_url'] = backend_url
    else:
        routing['llm_backend'] = 'direct_llama'

    # Probe the local LLM health endpoint (any failure counts as unavailable).
    try:
        from core.http_pool import pooled_get
        health = pooled_get('http://localhost:8080/health', timeout=2)
        routing['local_llm_available'] = health.status_code == 200
    except Exception:
        routing['local_llm_available'] = False

    # Explain why traffic would leave this node.
    if not routing['local_llm_available']:
        routing['routing_reasons'].append({
            'reason': 'compute_unavailable',
            'description': 'Local LLM not running - requests routed to regional/cloud',
        })

    # Count of nodes known to the regional registry.
    try:
        from integrations.distributed_agent.host_registry import RegionalHostRegistry
        routing['connected_nodes'] = len(RegionalHostRegistry().get_all_hosts())
    except Exception:
        routing['connected_nodes'] = 0

    return _ok(routing)