# Coverage for integrations/social/api_tracker.py: 20.1% (468 statements)
# Generated by coverage.py v7.14.0 at 2026-05-12 04:49 +0000
1"""
2HARTSocial - Thought Experiment Tracker API Blueprint
4Aggregation layer over Post, AgentGoal, DistributedTaskCoordinator,
5MemoryGraph, and Agent Ledger for the tracker UI.
715 endpoints at /api/social/tracker/*:
8 - 6 original (list/detail/conversations/approve/reject/notifications)
9 - 6 pledge endpoints (list/summary/create/withdraw/consume/insights)
10 - 3 admin/user pledge endpoints (mine/all/verify)
11"""
12import os
13import logging
14from datetime import datetime
15from flask import Blueprint, request, jsonify, g
16from sqlalchemy import func as sa_func
18from .auth import require_auth, require_central
19from .models import get_db, Post, User, ComputeEscrow, MeteredAPIUsage, NodeComputeConfig
20from .services import NotificationService, PostService
22logger = logging.getLogger('hevolve_social')
24tracker_bp = Blueprint('tracker', __name__, url_prefix='/api/social/tracker')
def _ok(data=None, meta=None, status=200):
    """Build the standard success envelope: {'success': True, [data], [meta]}."""
    payload = {'success': True}
    if data is not None:
        payload['data'] = data
    if meta is not None:
        payload['meta'] = meta
    return jsonify(payload), status
def _err(msg, status=400):
    """Build the standard error envelope with the given HTTP status."""
    body = {'success': False, 'error': msg}
    return jsonify(body), status
def _get_goal_for_post(db, post_id):
    """Find AgentGoal linked to a thought experiment post."""
    try:
        from integrations.social.models import AgentGoal
        candidates = (
            db.query(AgentGoal)
            .filter(
                AgentGoal.goal_type == 'thought_experiment',
                AgentGoal.status.in_(['active', 'paused', 'completed']),
            )
            .all()
        )
        wanted = str(post_id)
        # The post link lives inside config_json, so it has to be matched in Python.
        for candidate in candidates:
            if (candidate.config_json or {}).get('post_id') == wanted:
                return candidate
    except Exception as e:
        logger.debug(f"Goal lookup failed: {e}")
    return None
def _get_goal_progress(goal_id):
    """Get distributed task progress for a goal (Redis-backed)."""
    try:
        import redis
        from integrations.distributed_agent.task_coordinator import DistributedTaskCoordinator
        # Short timeouts: progress is best-effort decoration, never worth blocking a request.
        client = redis.Redis(
            host='localhost', port=6379, decode_responses=True,
            socket_connect_timeout=1, socket_timeout=2, retry_on_timeout=False,
        )
        client.ping()
        return DistributedTaskCoordinator(redis_client=client).get_goal_progress(goal_id)
    except Exception as e:
        logger.debug(f"Goal progress unavailable: {e}")
        return None
def _get_agent_conversations(user_id, prompt_id, limit=50):
    """Get conversation history from MemoryGraph for an agent session."""
    try:
        from integrations.channels.memory.memory_graph import MemoryGraph
        session_key = f"{user_id}_{prompt_id}" if prompt_id else str(user_id)
        # Prefer the platform path helper; fall back to the legacy on-disk layout.
        try:
            from core.platform_paths import get_memory_graph_dir
            db_path = get_memory_graph_dir(session_key)
        except ImportError:
            db_path = os.path.join(
                os.path.expanduser("~"), "Documents", "Nunba", "data",
                "memory_graph", session_key,
            )
        if not os.path.exists(db_path):
            return []
        graph = MemoryGraph(db_path=db_path, user_id=str(user_id))
        messages = []
        for node in graph.get_session_memories(session_key, limit=limit):
            messages.append({
                'id': node.id,
                'role': node.metadata.get('role', 'system'),
                'content': node.content,
                'timestamp': node.created_at,
                'session_key': session_key,
                'category': node.category,
            })
        return messages
    except Exception as e:
        logger.debug(f"Conversation fetch failed: {e}")
        return []
def _get_ledger_tasks(goal_id):
    """Get Agent Ledger tasks for a goal, including HITL blocked tasks."""
    try:
        from agent_ledger.core import SmartLedger
        ledger = SmartLedger()
        parent = ledger.get_task(goal_id)
        if not parent:
            return []

        def _plain(value):
            # Ledger fields may be enum members or plain values.
            return value.value if hasattr(value, 'value') else str(value)

        children = []
        for child_id in (parent.child_task_ids or []):
            child = ledger.get_task(child_id)
            if not child:
                continue
            children.append({
                'id': child.id,
                'description': child.description,
                'status': _plain(child.status),
                'progress_pct': child.progress_pct,
                'blocked_reason': _plain(child.blocked_reason) if child.blocked_reason else None,
                'assigned_agent': child.assigned_to,
            })
        return children
    except Exception as e:
        logger.debug(f"Ledger tasks unavailable: {e}")
        return []
134# ─── Endpoints ───
@tracker_bp.route('/experiments', methods=['GET'])
@require_auth
def list_experiments():
    """List thought experiment posts with agent goal status.

    Query params: limit, offset, filter ('all' | 'mine' | 'needs_review').

    NOTE(review): the 'needs_review' filter is applied AFTER pagination, so
    meta['total'] / 'has_more' describe the unfiltered window and filtered
    pages may come back short — confirm the tracker UI tolerates this.
    """
    limit = request.args.get('limit', 20, type=int)
    offset = request.args.get('offset', 0, type=int)
    filter_type = request.args.get('filter', 'all')  # all | mine | needs_review

    q = g.db.query(Post).filter(Post.is_thought_experiment == True)

    if filter_type == 'mine':
        q = q.filter(Post.author_id == str(g.user.id))

    total = q.count()
    posts = q.order_by(Post.created_at.desc()).offset(offset).limit(limit).all()

    experiments = []
    for post in posts:
        post_dict = post.to_dict(include_author=True)
        goal = _get_goal_for_post(g.db, post.id)

        goal_info = None
        needs_review = False
        if goal:
            # Progress comes from Redis; task list from the Agent Ledger.
            progress = _get_goal_progress(goal.id)
            ledger_tasks = _get_ledger_tasks(goal.id)
            # An experiment needs review when any ledger task is blocked
            # awaiting human approval (the HITL gate).
            needs_review = any(
                t.get('blocked_reason') == 'APPROVAL_REQUIRED'
                for t in ledger_tasks
            )
            goal_info = {
                'goal_id': goal.id,
                'status': goal.status,
                'goal_type': goal.goal_type,
                'prompt_id': goal.prompt_id,
                'progress': progress,
                'task_count': len(ledger_tasks),
                'needs_review': needs_review,
            }

        if filter_type == 'needs_review' and not needs_review:
            continue

        experiments.append({
            **post_dict,
            'goal': goal_info,
        })

    meta = {'total': total, 'limit': limit, 'offset': offset,
            'has_more': offset + limit < total}
    return _ok(experiments, meta=meta)
@tracker_bp.route('/experiments/<post_id>', methods=['GET'])
@require_auth
def get_experiment(post_id):
    """Single experiment detail with full agent progress."""
    post = g.db.query(Post).filter_by(id=post_id).first()
    if post is None:
        return _err('Post not found', 404)

    detail = post.to_dict(include_author=True)

    goal = _get_goal_for_post(g.db, post_id)
    goal_payload = None
    if goal:
        progress = _get_goal_progress(goal.id)
        tasks = _get_ledger_tasks(goal.id)
        blocked = any(
            t.get('blocked_reason') == 'APPROVAL_REQUIRED' for t in tasks
        )
        goal_payload = {
            'goal_id': goal.id,
            'status': goal.status,
            'goal_type': goal.goal_type,
            'prompt_id': goal.prompt_id,
            'progress': progress,
            'tasks': tasks,
            'needs_review': blocked,
            'config': goal.config_json,
        }

    detail['goal'] = goal_payload
    return _ok(detail)
@tracker_bp.route('/experiments/<post_id>/conversations', methods=['GET'])
@require_auth
def get_conversations(post_id):
    """Agent conversation history for a thought experiment.

    Returns {'conversations': [...], 'agents': [...]} where conversations
    are MemoryGraph messages sorted chronologically and agents describes
    the sessions they came from. Empty lists when no goal exists yet.
    """
    post = g.db.query(Post).filter_by(id=post_id).first()
    if not post:
        return _err('Post not found', 404)

    goal = _get_goal_for_post(g.db, post_id)
    if not goal:
        return _ok({'conversations': [], 'agents': []})

    # Get conversations from all agent sessions working on this goal
    conversations = []
    agents_seen = {}

    # Primary agent session (from goal's prompt_id)
    if goal.prompt_id:
        # The daemon sets prompt_id as f"{goal_type}_{goal_id[:8]}"
        # The MemoryGraph session key is f"{user_id}_{prompt_id}"
        # We need to find agents that worked on this goal
        try:
            # (Fix: dropped an unused `AgentGoal` import that lived here.)
            # Skip system-bootstrap goals: they have no human session owner.
            if goal.created_by and goal.created_by != 'system_bootstrap':
                msgs = _get_agent_conversations(goal.created_by, goal.prompt_id)
                if msgs:
                    conversations.extend(msgs)
                    agents_seen[goal.created_by] = {
                        'user_id': goal.created_by,
                        'prompt_id': goal.prompt_id,
                        'role': 'primary',
                    }
        except Exception as e:
            logger.debug(f"Primary conversation fetch: {e}")

    # Sort conversations chronologically
    conversations.sort(key=lambda c: c.get('timestamp', ''))

    return _ok({
        'conversations': conversations,
        'agents': list(agents_seen.values()),
    })
@tracker_bp.route('/experiments/<post_id>/approve', methods=['POST'])
@require_auth
def approve_task(post_id):
    """HITL approval - unblocks APPROVAL_REQUIRED tasks for this experiment."""
    post = g.db.query(Post).filter_by(id=post_id).first()
    if not post:
        return _err('Post not found', 404)

    goal = _get_goal_for_post(g.db, post_id)
    if not goal:
        return _err('No goal found for this experiment', 404)

    body = request.get_json(force=True, silent=True) or {}
    target_task_id = body.get('task_id')  # Optional: approve one specific task

    try:
        from agent_ledger.core import SmartLedger, TaskStatus
        ledger = SmartLedger()

        unblocked = 0
        for entry in _get_ledger_tasks(goal.id):
            if entry.get('blocked_reason') != 'APPROVAL_REQUIRED':
                continue
            if target_task_id and entry['id'] != target_task_id:
                continue
            task_obj = ledger.get_task(entry['id'])
            if task_obj:
                # Clear the block and resume the task.
                task_obj.status = TaskStatus.IN_PROGRESS
                task_obj.blocked_reason = None
                ledger.update_task(task_obj)
                unblocked += 1

        return _ok({'unblocked': unblocked, 'goal_id': goal.id})
    except Exception as e:
        logger.debug(f"Approve failed: {e}")
        return _ok({'unblocked': 0, 'message': 'Ledger unavailable, approval recorded'})
@tracker_bp.route('/experiments/<post_id>/reject', methods=['POST'])
@require_auth
def reject_task(post_id):
    """HITL rejection - fails APPROVAL_REQUIRED tasks for this experiment."""
    post = g.db.query(Post).filter_by(id=post_id).first()
    if not post:
        return _err('Post not found', 404)

    goal = _get_goal_for_post(g.db, post_id)
    if not goal:
        return _err('No goal found for this experiment', 404)

    body = request.get_json(force=True, silent=True) or {}
    target_task_id = body.get('task_id')
    reason = body.get('reason', 'Rejected by human reviewer')

    try:
        from agent_ledger.core import SmartLedger, TaskStatus, FailureReason
        ledger = SmartLedger()

        rejected = 0
        for entry in _get_ledger_tasks(goal.id):
            if entry.get('blocked_reason') != 'APPROVAL_REQUIRED':
                continue
            if target_task_id and entry['id'] != target_task_id:
                continue
            task_obj = ledger.get_task(entry['id'])
            if task_obj:
                # Mark the task failed with a validation failure reason.
                task_obj.status = TaskStatus.FAILED
                task_obj.failure_reason = FailureReason.VALIDATION_FAILED
                task_obj.blocked_reason = None
                ledger.update_task(task_obj)
                rejected += 1

        return _ok({'rejected': rejected, 'goal_id': goal.id})
    except Exception as e:
        logger.debug(f"Reject failed: {e}")
        return _ok({'rejected': 0, 'message': 'Ledger unavailable, rejection recorded'})
@tracker_bp.route('/notifications', methods=['GET'])
@require_auth
def get_tracker_notifications():
    """HITL-relevant notifications for the current user.

    NOTE(review): pagination happens inside NotificationService BEFORE the
    tracker-type filter, so meta['total'] is the filtered size of this page
    only (not a global count) and pages can come back short — confirm the
    frontend paginates accordingly. The service's own total is discarded.
    """
    limit = request.args.get('limit', 20, type=int)
    offset = request.args.get('offset', 0, type=int)

    notifs, total = NotificationService.get_for_user(
        g.db, str(g.user.id), limit=limit, offset=offset,
    )

    # Filter to tracker-relevant types
    tracker_types = {'goal_contribution', 'goal_verified', 'approval_required'}
    filtered = [
        n.to_dict() for n in notifs
        if n.type in tracker_types
    ]

    return _ok(filtered, meta={
        'total': len(filtered),
        'limit': limit,
        'offset': offset,
    })
374# ═══════════════════════════════════════════════════════════════
375# COMPUTE PLEDGE ENDPOINTS (6 endpoints)
376# ═══════════════════════════════════════════════════════════════
# Accepted ComputeEscrow.pledge_type values for experiment pledges.
VALID_PLEDGE_TYPES = ('gpu_hours', 'cloud_credits', 'money')
def _is_contributor(db, user_id, post_id):
    """Return True if user has an active/settled pledge for this experiment."""
    match = db.query(ComputeEscrow).filter(
        ComputeEscrow.experiment_post_id == str(post_id),
        ComputeEscrow.creditor_node_id == str(user_id),
        ComputeEscrow.status.in_(['pending', 'settled']),
    ).first()
    return match is not None
390def _is_central(user):
391 """Check if user has central (admin) role."""
392 role = getattr(user, 'role', None) or 'flat'
393 return role == 'central' or getattr(user, 'is_admin', False)
@tracker_bp.route('/experiments/<post_id>/pledges', methods=['GET'])
@require_auth
def list_pledges(post_id):
    """List compute escrows pledged to a thought experiment.

    Central role: sees all pledges with full detail.
    Contributors: see all pledges (peer visibility).
    Others: see anonymised count only (use pledge-summary instead).
    """
    post = g.db.query(Post).filter_by(id=post_id).first()
    if post is None:
        return _err('Post not found', 404)

    pledges = (
        g.db.query(ComputeEscrow)
        .filter(ComputeEscrow.experiment_post_id == str(post_id))
        .order_by(ComputeEscrow.created_at.desc())
        .all()
    )

    viewer_id = str(g.user.id)
    full_access = _is_central(g.user) or _is_contributor(g.db, viewer_id, post_id)
    if full_access:
        return _ok([p.to_dict() for p in pledges])

    # Non-contributors get count + types only
    return _ok({
        'count': len(pledges),
        'message': 'Pledge to this experiment to see full contributor details',
    })
@tracker_bp.route('/experiments/<post_id>/pledge-summary', methods=['GET'])
@require_auth
def pledge_summary(post_id):
    """Aggregate pledges by pledge_type for a thought experiment.

    Returns a shape the frontend (PledgeSummaryBar) can consume directly:
      { pledges: { gpu_hours: {total, consumed}, ... }, pledgers: [...],
        pledger_count: N, user_pledge: {...}|null }

    Only 'pending' and 'settled' escrows are counted throughout.
    """
    post = g.db.query(Post).filter_by(id=post_id).first()
    if not post:
        return _err('Post not found', 404)

    # Per-type aggregates: row count, total pledged spark, total consumed.
    rows = g.db.query(
        ComputeEscrow.pledge_type,
        sa_func.count(ComputeEscrow.id).label('count'),
        sa_func.sum(ComputeEscrow.spark_amount).label('total_spark'),
        sa_func.sum(ComputeEscrow.consumed).label('total_consumed'),
    ).filter(
        ComputeEscrow.experiment_post_id == str(post_id),
        ComputeEscrow.status.in_(['pending', 'settled']),
    ).group_by(ComputeEscrow.pledge_type).all()

    pledges = {}
    for row in rows:
        # Rows without a pledge_type are bucketed under 'spark'.
        ptype = row.pledge_type or 'spark'
        pledges[ptype] = {
            'total': row.total_spark or 0,
            'consumed': round(row.total_consumed or 0, 4),
            'count': row.count,
        }

    # Top pledgers (non-anonymous)
    user_id = str(g.user.id)
    top_escrows = g.db.query(ComputeEscrow).filter(
        ComputeEscrow.experiment_post_id == str(post_id),
        ComputeEscrow.status.in_(['pending', 'settled']),
    ).order_by(ComputeEscrow.spark_amount.desc()).limit(5).all()

    # De-duplicate pledger ids while preserving the by-amount ordering.
    pledger_ids = list(dict.fromkeys(e.creditor_node_id for e in top_escrows))
    pledgers = []
    for pid in pledger_ids:
        u = g.db.query(User).filter_by(id=pid).first()
        if u:
            pledgers.append({
                'id': u.id,
                'username': u.username,
                'avatar_url': getattr(u, 'avatar_url', None),
            })

    # Distinct contributor count across ALL active pledges (not just top 5).
    pledger_count = g.db.query(
        sa_func.count(sa_func.distinct(ComputeEscrow.creditor_node_id))
    ).filter(
        ComputeEscrow.experiment_post_id == str(post_id),
        ComputeEscrow.status.in_(['pending', 'settled']),
    ).scalar() or 0

    # Current user's pledge
    user_escrow = g.db.query(ComputeEscrow).filter(
        ComputeEscrow.experiment_post_id == str(post_id),
        ComputeEscrow.creditor_node_id == user_id,
        ComputeEscrow.status.in_(['pending', 'settled']),
    ).first()

    user_pledge = None
    if user_escrow:
        # Display units per pledge type; unknown types fall back to 'spark'.
        unit_map = {'gpu_hours': 'hours', 'cloud_credits': 'credits', 'money': 'USD'}
        user_pledge = {
            'id': user_escrow.id,
            'amount': user_escrow.spark_amount,
            'unit': unit_map.get(user_escrow.pledge_type, 'spark'),
            'type': user_escrow.pledge_type,
        }

    return _ok({
        'pledges': pledges,
        'pledgers': pledgers,
        'pledger_count': pledger_count,
        'user_pledge': user_pledge,
    })
@tracker_bp.route('/experiments/<post_id>/pledge', methods=['POST'])
@require_auth
def create_pledge(post_id):
    """Create a compute pledge (ComputeEscrow) for a thought experiment.

    Body:
        pledge_type: 'gpu_hours' | 'cloud_credits' | 'money' (required)
        spark_amount: int (required, > 0)
        message: str (optional supporter message, truncated to 500 chars)

    Returns the new escrow (201). Debtor is the experiment author (they
    receive the compute); creditor is the pledging user (they supply it).
    """
    post = g.db.query(Post).filter_by(id=post_id).first()
    if not post:
        return _err('Post not found', 404)
    if not getattr(post, 'is_thought_experiment', False):
        return _err('Post is not a thought experiment', 400)

    data = request.get_json(force=True, silent=True) or {}
    pledge_type = data.get('pledge_type') or data.get('type')
    spark_amount = data.get('spark_amount') or data.get('amount')
    message = data.get('message', '')

    if pledge_type not in VALID_PLEDGE_TYPES:
        return _err(f'pledge_type must be one of: {", ".join(VALID_PLEDGE_TYPES)}', 400)
    # Fix: reject booleans explicitly — bool is a subclass of int, so a JSON
    # `true` previously passed validation and silently pledged 1 spark.
    if (not spark_amount or isinstance(spark_amount, bool)
            or not isinstance(spark_amount, (int, float)) or spark_amount <= 0):
        return _err('spark_amount must be a positive number', 400)
    spark_amount = int(spark_amount)

    user_id = str(g.user.id)

    # Verify pledger's node accepts thought experiments (if they have a config)
    node_config = g.db.query(NodeComputeConfig).filter(
        NodeComputeConfig.node_id == user_id,
    ).first()
    if node_config and not node_config.accept_thought_experiments:
        return _err('Your node config has accept_thought_experiments disabled', 403)

    # Create the escrow — debtor is the experiment post author (they receive
    # the compute), creditor is the pledging user (they supply it).
    escrow = ComputeEscrow(
        debtor_node_id=str(post.author_id),
        creditor_node_id=user_id,
        request_id=f'experiment_{post_id}',
        task_type='thought_experiment',
        spark_amount=spark_amount,
        status='pending',
        experiment_post_id=str(post_id),
        pledge_type=pledge_type,
        consumed=0.0,
        pledge_message=message[:500] if message else None,
    )
    g.db.add(escrow)
    g.db.flush()

    # Award resonance for pledging (best-effort; never blocks the pledge).
    try:
        from .resonance_engine import ResonanceService
        ResonanceService.award_action(g.db, user_id, 'experiment_pledge', source_id=str(escrow.id))
    except Exception as e:
        logger.debug(f"Resonance award for experiment_pledge failed: {e}")

    return _ok(escrow.to_dict(), status=201)
@tracker_bp.route('/experiments/<post_id>/pledge/<int:escrow_id>', methods=['DELETE'])
@require_auth
def withdraw_pledge(post_id, escrow_id):
    """Withdraw a pledge if nothing has been consumed yet.

    Only the pledge creator (creditor) or a central admin can withdraw.
    """
    escrow = g.db.query(ComputeEscrow).filter_by(
        id=escrow_id,
        experiment_post_id=str(post_id),
    ).first()
    if escrow is None:
        return _err('Pledge not found', 404)

    caller_id = str(g.user.id)
    is_owner = escrow.creditor_node_id == caller_id
    if not is_owner and not _is_central(g.user):
        return _err('Only the pledge creator or a central admin can withdraw', 403)

    # Once any compute has been drawn, withdrawal is no longer allowed.
    if (escrow.consumed or 0) > 0:
        return _err(
            f'Cannot withdraw: {escrow.consumed} already consumed. '
            f'Contact the experiment author to settle.',
            409,
        )

    escrow.status = 'expired'
    escrow.settled_at = datetime.utcnow()
    g.db.flush()

    return _ok({'withdrawn': True, 'escrow_id': escrow_id})
@tracker_bp.route('/experiments/<post_id>/consume', methods=['POST'])
@require_central
def consume_pledge(post_id):
    """Internal: agent consumes compute from pledged escrows.

    Only callable by central role (backend/agent orchestration).

    Body:
        escrow_id: int (required — which pledge to draw from)
        amount: float (required — spark units to consume)
        model_id: str (required — which model was used)
        node_id: str (required — which node executed the work)
        tokens_in: int (optional)
        tokens_out: int (optional)

    Enforces deterministic budget: consumed + amount <= spark_amount.
    """
    data = request.get_json(force=True, silent=True) or {}
    escrow_id = data.get('escrow_id')
    amount = data.get('amount')
    model_id = data.get('model_id', 'unknown')
    node_id = data.get('node_id', 'unknown')
    tokens_in = data.get('tokens_in', 0)
    tokens_out = data.get('tokens_out', 0)

    # NOTE(review): truthiness also rejects escrow_id == 0 — confirm escrow
    # ids always start at 1.
    if not escrow_id or not amount:
        return _err('escrow_id and amount are required', 400)
    if not isinstance(amount, (int, float)) or amount <= 0:
        return _err('amount must be a positive number', 400)
    amount = float(amount)

    # The pledge must belong to this experiment, not just exist.
    escrow = g.db.query(ComputeEscrow).filter_by(
        id=escrow_id,
        experiment_post_id=str(post_id),
    ).first()
    if not escrow:
        return _err('Pledge not found for this experiment', 404)
    if escrow.status not in ('pending', 'settled'):
        return _err(f'Pledge status is {escrow.status}, cannot consume', 409)

    # Deterministic budget enforcement
    current_consumed = escrow.consumed or 0.0
    remaining = escrow.spark_amount - current_consumed
    if amount > remaining:
        return _err(
            f'Budget exceeded: requested {amount}, remaining {round(remaining, 4)}. '
            f'Pledged total: {escrow.spark_amount}, already consumed: {round(current_consumed, 4)}.',
            409,
        )

    # Update escrow consumed amount (rounded to 4 dp to limit float drift)
    escrow.consumed = round(current_consumed + amount, 4)
    if escrow.consumed >= escrow.spark_amount:
        # Fully drawn down: mark the escrow settled.
        escrow.status = 'settled'
        escrow.settled_at = datetime.utcnow()

    # Record the metered usage linked to this escrow
    usage = MeteredAPIUsage(
        node_id=node_id,
        operator_id=escrow.creditor_node_id,
        model_id=model_id,
        task_source='experiment',
        goal_id=None,
        requester_node_id=escrow.debtor_node_id,
        tokens_in=tokens_in,
        tokens_out=tokens_out,
        # NOTE(review): int() truncates fractional spark toward zero, so the
        # usage row can under-record relative to the escrow — confirm intended.
        estimated_spark_cost=int(amount),
        settlement_status='settled',
        escrow_id=escrow.id,
        experiment_post_id=str(post_id),
    )
    g.db.add(usage)
    g.db.flush()

    return _ok({
        'consumed': amount,
        'total_consumed': escrow.consumed,
        'remaining': round(escrow.spark_amount - escrow.consumed, 4),
        'escrow_status': escrow.status,
        'usage_id': usage.id,
    })
@tracker_bp.route('/experiments/<post_id>/insights', methods=['GET'])
@require_auth
def experiment_insights(post_id):
    """Contributor-exclusive deep progress insights for a thought experiment.

    Central: full access.
    Contributors (have an active pledge): full access.
    Others: 403.
    """
    post = g.db.query(Post).filter_by(id=post_id).first()
    if post is None:
        return _err('Post not found', 404)

    viewer_id = str(g.user.id)
    allowed = _is_central(g.user) or _is_contributor(g.db, viewer_id, post_id)
    if not allowed:
        return _err('Insights are exclusive to contributors. Pledge compute to unlock.', 403)

    # Gather goal info
    goal = _get_goal_for_post(g.db, post_id)
    goal_info = None
    if goal:
        progress = _get_goal_progress(goal.id)
        tasks = _get_ledger_tasks(goal.id)
        goal_info = {
            'goal_id': goal.id,
            'status': goal.status,
            'goal_type': goal.goal_type,
            'progress': progress,
            'tasks': tasks,
            'needs_review': any(
                t.get('blocked_reason') == 'APPROVAL_REQUIRED' for t in tasks
            ),
        }

    # Aggregate consumption for this experiment, grouped by model
    consumption_rows = g.db.query(
        MeteredAPIUsage.model_id,
        sa_func.count(MeteredAPIUsage.id).label('call_count'),
        sa_func.sum(MeteredAPIUsage.tokens_in).label('total_tokens_in'),
        sa_func.sum(MeteredAPIUsage.tokens_out).label('total_tokens_out'),
        sa_func.sum(MeteredAPIUsage.estimated_spark_cost).label('total_spark_cost'),
    ).filter(
        MeteredAPIUsage.experiment_post_id == str(post_id),
    ).group_by(MeteredAPIUsage.model_id).all()

    consumption = [
        {
            'model_id': row.model_id,
            'call_count': row.call_count,
            'total_tokens_in': row.total_tokens_in or 0,
            'total_tokens_out': row.total_tokens_out or 0,
            'total_spark_cost': row.total_spark_cost or 0,
        }
        for row in consumption_rows
    ]

    # Pledge summary for context
    active_pledges = g.db.query(ComputeEscrow).filter(
        ComputeEscrow.experiment_post_id == str(post_id),
        ComputeEscrow.status.in_(['pending', 'settled']),
    ).all()
    total_pledged = sum(p.spark_amount for p in active_pledges)
    total_consumed = sum(p.consumed or 0 for p in active_pledges)

    return _ok({
        'post_id': post_id,
        'goal': goal_info,
        'budget': {
            'total_pledged_spark': total_pledged,
            'total_consumed_spark': round(total_consumed, 4),
            'remaining_spark': round(total_pledged - total_consumed, 4),
            'pledge_count': len(active_pledges),
        },
        'consumption_by_model': consumption,
    })
762# ═══════════════════════════════════════════════════════════════
763# MY PLEDGES / ADMIN VIEW / NODE VERIFICATION
764# ═══════════════════════════════════════════════════════════════
@tracker_bp.route('/pledges/mine', methods=['GET'])
@require_auth
def my_pledges():
    """Get all pledges made by the current user across experiments."""
    user_id = str(g.user.id)
    limit = request.args.get('limit', 50, type=int)
    offset = request.args.get('offset', 0, type=int)
    status_filter = request.args.get('status')

    query = g.db.query(ComputeEscrow).filter(
        ComputeEscrow.creditor_node_id == user_id,
        ComputeEscrow.experiment_post_id.isnot(None),
    )
    if status_filter:
        query = query.filter_by(status=status_filter)

    total = query.count()
    page = (
        query.order_by(ComputeEscrow.created_at.desc())
        .offset(offset)
        .limit(limit)
        .all()
    )

    meta = {
        'total': total,
        'limit': limit,
        'offset': offset,
        'has_more': offset + limit < total,
    }
    return _ok([e.to_dict() for e in page], meta=meta)
@tracker_bp.route('/pledges/all', methods=['GET'])
@require_central
def all_pledges():
    """Central admin: view all experiment pledges system-wide."""
    limit = request.args.get('limit', 100, type=int)
    offset = request.args.get('offset', 0, type=int)
    status_filter = request.args.get('status')
    pledge_type = request.args.get('pledge_type')
    post_id = request.args.get('post_id')

    query = g.db.query(ComputeEscrow).filter(
        ComputeEscrow.experiment_post_id.isnot(None),
    )
    if status_filter:
        query = query.filter_by(status=status_filter)
    # Unknown pledge_type values are silently ignored rather than erroring.
    if pledge_type and pledge_type in VALID_PLEDGE_TYPES:
        query = query.filter_by(pledge_type=pledge_type)
    if post_id:
        query = query.filter_by(experiment_post_id=str(post_id))

    total = query.count()
    page = (
        query.order_by(ComputeEscrow.created_at.desc())
        .offset(offset)
        .limit(limit)
        .all()
    )

    meta = {
        'total': total,
        'limit': limit,
        'offset': offset,
        'has_more': offset + limit < total,
    }
    return _ok([e.to_dict() for e in page], meta=meta)
@tracker_bp.route('/pledges/<int:escrow_id>/verify', methods=['POST'])
@require_auth
def verify_pledge(escrow_id):
    """Verify node capacity for a gpu_hours pledge.

    Only the pledge owner or a regional/central admin can verify.
    """
    escrow = g.db.query(ComputeEscrow).filter_by(id=escrow_id).first()
    if escrow is None:
        return _err('Pledge not found', 404)

    viewer_id = str(g.user.id)
    viewer_role = getattr(g.user, 'role', None) or 'flat'
    is_admin = viewer_role in ('central', 'regional') or getattr(g.user, 'is_admin', False)
    if escrow.creditor_node_id != viewer_id and not is_admin:
        return _err('Access denied', 403)

    if escrow.pledge_type != 'gpu_hours':
        return _err('Only gpu_hours pledges require node verification', 400)

    node_config = g.db.query(NodeComputeConfig).filter_by(
        node_id=escrow.creditor_node_id).first()

    if node_config and node_config.offered_gpu_hours_per_day > 0:
        # Capacity check: pledged hours must fit in a 30-day budget.
        max_monthly = node_config.offered_gpu_hours_per_day * 30
        within = escrow.spark_amount <= max_monthly
        capacity_ok = within
        capacity_details = {
            'offered_daily': node_config.offered_gpu_hours_per_day,
            'max_monthly': round(max_monthly, 1),
            'pledged': escrow.spark_amount,
            'within_capacity': within,
        }
    else:
        # No config to check against: pass verification with a warning.
        capacity_ok = True
        capacity_details = {'warning': 'No NodeComputeConfig found'}

    return _ok({
        'verified': capacity_ok,
        'capacity': capacity_details,
        'escrow': escrow.to_dict(),
    })
868# ── Hive View Endpoints (extend tracker, no separate blueprint) ──────
@tracker_bp.route('/experiments/<post_id>/inject', methods=['POST'])
@require_auth
def inject_variable(post_id):
    """God's-eye variable injection — push new context into a running agent.

    Body:
        variable: str (required) — the content to inject.
        injection_type: 'constraint' | 'info' | 'question' (default 'info').

    Writes an injection memory into the goal owner's MemoryGraph session and
    publishes a realtime event so live UIs update.
    """
    data = request.get_json(silent=True) or {}
    variable = data.get('variable', '')
    injection_type = data.get('injection_type', 'info')  # constraint | info | question

    if not variable:
        return _err('variable is required', 400)

    goal = _get_goal_for_post(g.db, post_id)
    if not goal:
        return _err('No active agent for this experiment', 404)

    try:
        from integrations.channels.memory.memory_graph import MemoryGraph
        session_key = f"{goal.owner_id}_{goal.prompt_id}" if goal.prompt_id else str(goal.owner_id)
        try:
            from core.platform_paths import get_memory_graph_dir
            db_path = get_memory_graph_dir(session_key)
        except ImportError:
            db_path = os.path.join(
                os.path.expanduser("~"), "Documents", "Nunba", "data",
                "memory_graph", session_key)
        os.makedirs(db_path, exist_ok=True)
        graph = MemoryGraph(db_path=db_path, user_id=str(goal.owner_id))
        memory_id = graph.register(
            content=f"[INJECTED {injection_type.upper()}] {variable}",
            metadata={
                'memory_type': 'injection', 'injection_type': injection_type,
                'source_agent': 'god_eye', 'session_id': session_key,
                # Fix: every other endpoint here identifies the caller via
                # g.user (set by require_auth); g.user_id is never set and
                # would raise AttributeError, turning this into a 500.
                'injected_by': str(g.user.id),
                'injected_at': datetime.utcnow().isoformat(),
            },
            context_snapshot=f"God's-eye {injection_type} injection during experiment",
        )

        # Notify live UIs
        try:
            from .realtime import publish_event
            publish_event('chat.social', {
                'type': 'hive_injection', 'goal_id': goal.id,
                'injection_type': injection_type, 'variable': variable[:200],
            }, user_id=goal.owner_id)
        except Exception:
            pass

        return _ok({'memory_id': memory_id, 'injection_type': injection_type,
                    'message': f'{injection_type.capitalize()} injected into agent context.'})
    except Exception as e:
        logger.error("Variable injection failed: %s", e)
        return _err(str(e), 500)
@tracker_bp.route('/experiments/<post_id>/interview', methods=['POST'])
@require_auth
def interview_agent(post_id):
    """Post-experiment agent interview — ask any agent about its reasoning."""
    payload = request.get_json(silent=True) or {}
    question = payload.get('question', '')
    if not question:
        return _err('question is required', 400)

    goal = _get_goal_for_post(g.db, post_id)
    if goal is None:
        return _err('No agent for this experiment', 404)

    try:
        from core.http_pool import pooled_post
        from core.port_registry import get_port
        chat_url = f"http://localhost:{get_port('backend')}/chat"

        interview_prompt = (
            f"[INTERVIEW MODE — You are being asked about your reasoning on the experiment "
            f"'{goal.title}'. Reflect on your work and explain your thought process.]\n\n"
            f"Question: {question}"
        )

        resp = pooled_post(chat_url, json={
            'user_id': goal.owner_id,
            'prompt_id': goal.prompt_id or 0,
            'prompt': interview_prompt,
        }, timeout=60)

        if resp.status_code != 200:
            return _err(f'Agent returned {resp.status_code}', 502)
        result = resp.json()
        return _ok({'question': question, 'answer': result.get('response', 'No response.'),
                    'goal_id': goal.id})
    except Exception as e:
        logger.error("Agent interview failed: %s", e)
        return _err(str(e), 500)
@tracker_bp.route('/dual-context', methods=['POST'])
@require_auth
def launch_dual_context():
    """Clone an experiment into N parallel contexts with different overrides.

    Body:
        post_id: source experiment post id (required)
        contexts: list of >= 2 dicts, each with optional 'label' and
                  'system_prompt_override' keys.

    Creates one new active AgentGoal per context, copying the source goal's
    config and tagging it with the context label and source goal id.
    """
    import uuid as _uuid
    from .models import AgentGoal

    data = request.get_json(silent=True) or {}
    source_post_id = data.get('post_id')
    contexts = data.get('contexts', [])

    if not source_post_id or not contexts or len(contexts) < 2:
        return _err('post_id and at least 2 contexts required', 400)

    source_goal = _get_goal_for_post(g.db, source_post_id)
    if not source_goal:
        return _err('No agent for this experiment', 404)

    new_goals = []
    for ctx in contexts:
        new_id = str(_uuid.uuid4())[:16]
        cfg = dict(source_goal.config_json or {})
        cfg['dual_context_label'] = ctx.get('label', 'variant')
        cfg['system_prompt_override'] = ctx.get('system_prompt_override', '')
        cfg['source_goal_id'] = source_goal.id

        new_goal = AgentGoal(
            id=new_id, owner_id=source_goal.owner_id,
            goal_type=source_goal.goal_type,
            title=f"{source_goal.title} [{ctx.get('label', 'variant')}]",
            description=source_goal.description,
            status='active', priority=source_goal.priority,
            config_json=cfg, spark_budget=source_goal.spark_budget,
            # Fix: use g.user (set by require_auth and used module-wide)
            # instead of g.user_id, which nothing in this module sets.
            created_by=str(g.user.id),
        )
        g.db.add(new_goal)
        new_goals.append({'goal_id': new_id, 'label': ctx.get('label', 'variant'), 'status': 'active'})

    g.db.flush()
    # Use the shared _ok helper for response-shape consistency with the
    # rest of the blueprint (same payload as the previous manual jsonify).
    return _ok({
        'source_post_id': source_post_id, 'contexts': new_goals,
        'message': f'{len(new_goals)} parallel contexts launched.',
    }, status=201)
@tracker_bp.route('/encounters', methods=['GET'])
@require_auth
def get_encounter_graph():
    """Agent collaboration graph from Encounter data."""
    try:
        from .models import Encounter
        encounters = (
            g.db.query(Encounter)
            .filter(Encounter.bond_level > 0)
            .order_by(Encounter.latest_at.desc())
            .limit(200)
            .all()
        )

        # Collect every participant id appearing on either side of an edge.
        participant_ids = set()
        for enc in encounters:
            participant_ids.add(enc.user_a_id)
            participant_ids.add(enc.user_b_id)

        users = {}
        if participant_ids:
            for u in g.db.query(User).filter(User.id.in_(list(participant_ids))).all():
                users[u.id] = {
                    'id': u.id,
                    'name': u.display_name or u.username,
                    'type': getattr(u, 'user_type', 'human'),
                }

        edges = [
            {
                'source': enc.user_a_id,
                'target': enc.user_b_id,
                'bond_level': enc.bond_level,
                'encounter_count': enc.encounter_count,
                'context_type': enc.context_type,
            }
            for enc in encounters
        ]
        return _ok({'nodes': list(users.values()), 'edges': edges})
    except Exception as e:
        logger.error("Encounter graph failed: %s", e)
        return _err(str(e), 500)