Coverage for integrations / social / api_tracker.py: 20.1%

468 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HARTSocial - Thought Experiment Tracker API Blueprint 

3 

4Aggregation layer over Post, AgentGoal, DistributedTaskCoordinator, 

5MemoryGraph, and Agent Ledger for the tracker UI. 

6 

715 endpoints at /api/social/tracker/*: 

8 - 6 original (list/detail/conversations/approve/reject/notifications) 

9 - 6 pledge endpoints (list/summary/create/withdraw/consume/insights) 

10 - 3 admin/user pledge endpoints (mine/all/verify) 

11""" 

12import os 

13import logging 

14from datetime import datetime 

15from flask import Blueprint, request, jsonify, g 

16from sqlalchemy import func as sa_func 

17 

18from .auth import require_auth, require_central 

19from .models import get_db, Post, User, ComputeEscrow, MeteredAPIUsage, NodeComputeConfig 

20from .services import NotificationService, PostService 

21 

# Module-level logger registered under the name 'hevolve_social'.
logger = logging.getLogger('hevolve_social')

# Flask blueprint for the tracker endpoints; every route declared on it is
# served under the /api/social/tracker URL prefix.
tracker_bp = Blueprint('tracker', __name__, url_prefix='/api/social/tracker')

25 

26 

def _ok(data=None, meta=None, status=200):
    """Build a success JSON response.

    The envelope always carries ``success: True``; ``data`` and ``meta`` are
    attached only when they are not ``None``.

    Returns:
        A ``(response, status)`` tuple, where the response comes from
        ``jsonify``.
    """
    optional_fields = (('data', data), ('meta', meta))
    envelope = {'success': True}
    envelope.update(
        {key: value for key, value in optional_fields if value is not None}
    )
    return jsonify(envelope), status

34 

35 

def _err(msg, status=400):
    """Build a failure JSON response with ``msg`` under the ``error`` key.

    Returns:
        A ``(response, status)`` tuple; ``status`` defaults to 400.
    """
    envelope = {'success': False, 'error': msg}
    return jsonify(envelope), status

38 

39 

def _get_goal_for_post(db, post_id):
    """Return the AgentGoal linked to a thought experiment post, if any.

    Loads every goal of type ``thought_experiment`` whose status is active,
    paused or completed, then returns the first one whose
    ``config_json['post_id']`` equals ``str(post_id)``.

    Any exception along the way is logged at debug level and treated as
    "no goal", so the function returns ``None`` in that case as well.
    """
    try:
        from integrations.social.models import AgentGoal
        candidates = db.query(AgentGoal).filter(
            AgentGoal.goal_type == 'thought_experiment',
            AgentGoal.status.in_(['active', 'paused', 'completed']),
        ).all()
        return next(
            (
                candidate for candidate in candidates
                if (candidate.config_json or {}).get('post_id') == str(post_id)
            ),
            None,
        )
    except Exception as e:
        logger.debug(f"Goal lookup failed: {e}")
    return None

55 

56 

def _get_goal_progress(goal_id):
    """Return distributed task progress for a goal, or ``None``.

    Opens a Redis client against localhost:6379 with short connect/read
    timeouts, pings it, and asks ``DistributedTaskCoordinator`` for the
    goal's progress. Any failure (missing modules, Redis unreachable,
    coordinator error) is logged at debug level and yields ``None``.
    """
    try:
        from integrations.distributed_agent.task_coordinator import DistributedTaskCoordinator
        import redis
        redis_options = {
            'host': 'localhost',
            'port': 6379,
            'decode_responses': True,
            'socket_connect_timeout': 1,
            'socket_timeout': 2,
            'retry_on_timeout': False,
        }
        client = redis.Redis(**redis_options)
        # Fail fast before handing the client to the coordinator.
        client.ping()
        return DistributedTaskCoordinator(redis_client=client).get_goal_progress(goal_id)
    except Exception as e:
        logger.debug(f"Goal progress unavailable: {e}")
        return None

71 

72 

def _get_agent_conversations(user_id, prompt_id, limit=50):
    """Return conversation history stored in MemoryGraph for an agent session.

    The session key is ``"{user_id}_{prompt_id}"`` when ``prompt_id`` is
    truthy, otherwise ``str(user_id)``. The graph directory comes from
    ``core.platform_paths.get_memory_graph_dir`` when that import succeeds,
    and from ``~/Documents/Nunba/data/memory_graph/<session_key>`` otherwise.

    Returns:
        A list of dicts (id, role, content, timestamp, session_key,
        category), or an empty list when the directory does not exist or
        anything fails (failures are logged at debug level).
    """
    try:
        from integrations.channels.memory.memory_graph import MemoryGraph
        if prompt_id:
            session_key = f"{user_id}_{prompt_id}"
        else:
            session_key = str(user_id)

        try:
            from core.platform_paths import get_memory_graph_dir
            db_path = get_memory_graph_dir(session_key)
        except ImportError:
            home = os.path.expanduser("~")
            db_path = os.path.join(
                home, "Documents", "Nunba", "data", "memory_graph", session_key,
            )

        if not os.path.exists(db_path):
            return []

        graph = MemoryGraph(db_path=db_path, user_id=str(user_id))
        history = []
        for node in graph.get_session_memories(session_key, limit=limit):
            history.append({
                'id': node.id,
                'role': node.metadata.get('role', 'system'),
                'content': node.content,
                'timestamp': node.created_at,
                'session_key': session_key,
                'category': node.category,
            })
        return history
    except Exception as e:
        logger.debug(f"Conversation fetch failed: {e}")
        return []

104 

105 

def _get_ledger_tasks(goal_id):
    """Return the Agent Ledger child tasks of a goal as plain dicts.

    Looks up the task whose id is ``goal_id`` in a fresh ``SmartLedger`` and
    serialises each child task that can still be fetched. ``status`` and
    ``blocked_reason`` are reduced to their ``.value`` when they have one,
    otherwise to ``str(...)``; a falsy ``blocked_reason`` becomes ``None``.

    Returns:
        A list of task dicts; empty when the parent task is missing or when
        anything fails (failures are logged at debug level).
    """
    def _plain(value):
        # Enum-like objects expose .value; anything else is stringified.
        return value.value if hasattr(value, 'value') else str(value)

    try:
        from agent_ledger.core import SmartLedger
        ledger = SmartLedger()
        parent = ledger.get_task(goal_id)
        if not parent:
            return []

        serialised = []
        for child_id in (parent.child_task_ids or []):
            child = ledger.get_task(child_id)
            if not child:
                continue
            serialised.append({
                'id': child.id,
                'description': child.description,
                'status': _plain(child.status),
                'progress_pct': child.progress_pct,
                'blocked_reason': _plain(child.blocked_reason) if child.blocked_reason else None,
                'assigned_agent': child.assigned_to,
            })
        return serialised
    except Exception as e:
        logger.debug(f"Ledger tasks unavailable: {e}")
        return []

132 

133 

134# ─── Endpoints ─── 

135 

136 

@tracker_bp.route('/experiments', methods=['GET'])
@require_auth
def list_experiments():
    """List thought experiment posts with agent goal status.

    Query parameters:
        limit: page size (default 20). Negative values are treated as 0.
        offset: rows to skip (default 0). Negative values are treated as 0.
        filter: 'all' (default), 'mine' (posts authored by the current user)
            or 'needs_review' (posts whose goal has a ledger task blocked on
            APPROVAL_REQUIRED).

    Returns:
        Success envelope whose data is a list of post dicts, each extended
        with a 'goal' entry (``None`` when no goal is linked), plus
        pagination meta (total, limit, offset, has_more).
    """
    # Clamp paging inputs: a negative LIMIT/OFFSET is an error on some SQL
    # backends and means "no limit" on others, so never pass one through.
    limit = max(request.args.get('limit', 20, type=int), 0)
    offset = max(request.args.get('offset', 0, type=int), 0)
    filter_type = request.args.get('filter', 'all')  # all | mine | needs_review

    q = g.db.query(Post).filter(Post.is_thought_experiment == True)

    if filter_type == 'mine':
        q = q.filter(Post.author_id == str(g.user.id))

    total = q.count()
    posts = q.order_by(Post.created_at.desc()).offset(offset).limit(limit).all()

    experiments = []
    for post in posts:
        post_dict = post.to_dict(include_author=True)
        goal = _get_goal_for_post(g.db, post.id)

        goal_info = None
        needs_review = False
        if goal:
            progress = _get_goal_progress(goal.id)
            ledger_tasks = _get_ledger_tasks(goal.id)
            needs_review = any(
                t.get('blocked_reason') == 'APPROVAL_REQUIRED'
                for t in ledger_tasks
            )
            goal_info = {
                'goal_id': goal.id,
                'status': goal.status,
                'goal_type': goal.goal_type,
                'prompt_id': goal.prompt_id,
                'progress': progress,
                'task_count': len(ledger_tasks),
                'needs_review': needs_review,
            }

        # NOTE(review): 'needs_review' is applied after the SQL page has been
        # fetched, so 'total'/'has_more' below describe the unfiltered query
        # and a page may come back shorter than 'limit'.
        if filter_type == 'needs_review' and not needs_review:
            continue

        experiments.append({
            **post_dict,
            'goal': goal_info,
        })

    meta = {'total': total, 'limit': limit, 'offset': offset,
            'has_more': offset + limit < total}
    return _ok(experiments, meta=meta)

188 

189 

@tracker_bp.route('/experiments/<post_id>', methods=['GET'])
@require_auth
def get_experiment(post_id):
    """Return one experiment post together with its agent goal details.

    Responds 404 when the post does not exist. Otherwise the post dict is
    returned with a 'goal' entry holding status, progress, ledger tasks, the
    needs_review flag and the goal config, or ``None`` when no goal is
    linked to the post.
    """
    post = g.db.query(Post).filter_by(id=post_id).first()
    if not post:
        return _err('Post not found', 404)

    payload = post.to_dict(include_author=True)
    goal = _get_goal_for_post(g.db, post_id)

    goal_summary = None
    if goal:
        progress = _get_goal_progress(goal.id)
        tasks = _get_ledger_tasks(goal.id)

        # A goal needs review as soon as one task waits on human approval.
        awaiting_approval = False
        for task in tasks:
            if task.get('blocked_reason') == 'APPROVAL_REQUIRED':
                awaiting_approval = True
                break

        goal_summary = {
            'goal_id': goal.id,
            'status': goal.status,
            'goal_type': goal.goal_type,
            'prompt_id': goal.prompt_id,
            'progress': progress,
            'tasks': tasks,
            'needs_review': awaiting_approval,
            'config': goal.config_json,
        }

    return _ok(dict(payload, goal=goal_summary))

224 

225 

@tracker_bp.route('/experiments/<post_id>/conversations', methods=['GET'])
@require_auth
def get_conversations(post_id):
    """Agent conversation history for a thought experiment.

    Responds 404 when the post does not exist. When no goal is linked to the
    post, an empty conversations/agents payload is returned. Otherwise the
    messages of the primary agent session (goal creator + goal prompt_id)
    are returned in chronological order, together with the list of agents
    whose sessions contributed messages.
    """
    post = g.db.query(Post).filter_by(id=post_id).first()
    if not post:
        return _err('Post not found', 404)

    goal = _get_goal_for_post(g.db, post_id)
    if not goal:
        return _ok({'conversations': [], 'agents': []})

    # Get conversations from all agent sessions working on this goal
    conversations = []
    agents_seen = {}

    # Primary agent session (from goal's prompt_id)
    if goal.prompt_id:
        # The daemon sets prompt_id as f"{goal_type}_{goal_id[:8]}"
        # The MemoryGraph session key is f"{user_id}_{prompt_id}"
        try:
            # Goals created by the bootstrap pseudo-user have no session.
            if goal.created_by and goal.created_by != 'system_bootstrap':
                msgs = _get_agent_conversations(goal.created_by, goal.prompt_id)
                if msgs:
                    conversations.extend(msgs)
                    agents_seen[goal.created_by] = {
                        'user_id': goal.created_by,
                        'prompt_id': goal.prompt_id,
                        'role': 'primary',
                    }
        except Exception as e:
            logger.debug(f"Primary conversation fetch: {e}")

    # Sort chronologically. Entries without a timestamp are placed last
    # instead of being compared against real timestamps, which would raise
    # TypeError.
    def _chronological(entry):
        stamp = entry.get('timestamp')
        return (stamp is None, 0 if stamp is None else stamp)

    conversations.sort(key=_chronological)

    return _ok({
        'conversations': conversations,
        'agents': list(agents_seen.values()),
    })

269 

270 

@tracker_bp.route('/experiments/<post_id>/approve', methods=['POST'])
@require_auth
def approve_task(post_id):
    """HITL approval - unblocks APPROVAL_REQUIRED tasks for this experiment.

    Body (JSON, optional):
        task_id: when given, only that task is approved; otherwise every
            task of the goal blocked on APPROVAL_REQUIRED is approved.

    Responds 404 when the post or its goal is missing. On success returns
    the number of tasks moved to IN_PROGRESS. When the ledger cannot be used
    the response is still a success envelope with ``unblocked: 0``.
    """
    post = g.db.query(Post).filter_by(id=post_id).first()
    if not post:
        return _err('Post not found', 404)

    goal = _get_goal_for_post(g.db, post_id)
    if not goal:
        return _err('No goal found for this experiment', 404)

    data = request.get_json(force=True, silent=True) or {}
    if not isinstance(data, dict):
        # A JSON array or scalar carries no fields; treat it as an empty body
        # rather than failing on .get().
        data = {}
    task_id = data.get('task_id')  # Optional: approve specific task

    try:
        from agent_ledger.core import SmartLedger, TaskStatus
        ledger = SmartLedger()

        unblocked = 0
        tasks = _get_ledger_tasks(goal.id)
        for t in tasks:
            if t.get('blocked_reason') == 'APPROVAL_REQUIRED':
                if task_id and t['id'] != task_id:
                    continue
                task_obj = ledger.get_task(t['id'])
                if task_obj:
                    task_obj.status = TaskStatus.IN_PROGRESS
                    task_obj.blocked_reason = None
                    ledger.update_task(task_obj)
                    unblocked += 1

        return _ok({'unblocked': unblocked, 'goal_id': goal.id})
    except Exception as e:
        # Nothing was unblocked here; log loudly enough to be noticed, since
        # the client still receives a success envelope.
        # NOTE(review): the message claims the approval was recorded, but no
        # persistence happens in this function - confirm the intended contract.
        logger.warning("Approve failed for goal %s: %s", goal.id, e, exc_info=True)
        return _ok({'unblocked': 0, 'message': 'Ledger unavailable, approval recorded'})

307 

308 

@tracker_bp.route('/experiments/<post_id>/reject', methods=['POST'])
@require_auth
def reject_task(post_id):
    """HITL rejection - fails APPROVAL_REQUIRED tasks for this experiment.

    Body (JSON, optional):
        task_id: when given, only that task is rejected; otherwise every
            task of the goal blocked on APPROVAL_REQUIRED is rejected.
        reason: free-text reason (default 'Rejected by human reviewer').
            It is written to the log next to each rejected task id.

    Responds 404 when the post or its goal is missing. On success returns
    the number of tasks moved to FAILED. When the ledger cannot be used the
    response is still a success envelope with ``rejected: 0``.
    """
    post = g.db.query(Post).filter_by(id=post_id).first()
    if not post:
        return _err('Post not found', 404)

    goal = _get_goal_for_post(g.db, post_id)
    if not goal:
        return _err('No goal found for this experiment', 404)

    data = request.get_json(force=True, silent=True) or {}
    if not isinstance(data, dict):
        # A JSON array or scalar carries no fields; treat it as an empty body
        # rather than failing on .get().
        data = {}
    task_id = data.get('task_id')
    reason = data.get('reason', 'Rejected by human reviewer')

    try:
        from agent_ledger.core import SmartLedger, TaskStatus, FailureReason
        ledger = SmartLedger()

        rejected = 0
        tasks = _get_ledger_tasks(goal.id)
        for t in tasks:
            if t.get('blocked_reason') == 'APPROVAL_REQUIRED':
                if task_id and t['id'] != task_id:
                    continue
                task_obj = ledger.get_task(t['id'])
                if task_obj:
                    task_obj.status = TaskStatus.FAILED
                    task_obj.failure_reason = FailureReason.VALIDATION_FAILED
                    task_obj.blocked_reason = None
                    ledger.update_task(task_obj)
                    rejected += 1
                    # The reviewer's reason is not stored on the task here,
                    # so keep it in the log.
                    logger.info("Task %s rejected: %s", t['id'], reason)

        return _ok({'rejected': rejected, 'goal_id': goal.id})
    except Exception as e:
        # Nothing was rejected here; log loudly enough to be noticed, since
        # the client still receives a success envelope.
        # NOTE(review): the message claims the rejection was recorded, but no
        # persistence happens in this function - confirm the intended contract.
        logger.warning("Reject failed for goal %s: %s", goal.id, e, exc_info=True)
        return _ok({'rejected': 0, 'message': 'Ledger unavailable, rejection recorded'})

347 

348 

@tracker_bp.route('/notifications', methods=['GET'])
@require_auth
def get_tracker_notifications():
    """Return the current user's notifications that matter to the tracker.

    One page of notifications is fetched through ``NotificationService``
    using the ``limit``/``offset`` query parameters (defaults 20 and 0), and
    only the types goal_contribution, goal_verified and approval_required
    are kept. ``meta.total`` is the number of entries kept from this page.
    """
    limit = request.args.get('limit', 20, type=int)
    offset = request.args.get('offset', 0, type=int)

    page, _unfiltered_total = NotificationService.get_for_user(
        g.db, str(g.user.id), limit=limit, offset=offset,
    )

    # Only these notification types are relevant to the tracker UI.
    relevant_types = {'goal_contribution', 'goal_verified', 'approval_required'}
    relevant = []
    for notification in page:
        if notification.type in relevant_types:
            relevant.append(notification.to_dict())

    meta = {
        'total': len(relevant),
        'limit': limit,
        'offset': offset,
    }
    return _ok(relevant, meta=meta)

372 

373 

374# ═══════════════════════════════════════════════════════════════ 

375# COMPUTE PLEDGE ENDPOINTS (6 endpoints) 

376# ═══════════════════════════════════════════════════════════════ 

377 

# Pledge kinds accepted by create_pledge and by the pledge_type filter of
# all_pledges.
VALID_PLEDGE_TYPES = ('gpu_hours', 'cloud_credits', 'money')

379 

380 

def _is_contributor(db, user_id, post_id):
    """Tell whether ``user_id`` has a pending or settled pledge on ``post_id``.

    Both ids are compared as strings against the escrow's
    ``creditor_node_id`` and ``experiment_post_id`` columns.
    """
    matching_pledge = db.query(ComputeEscrow).filter(
        ComputeEscrow.experiment_post_id == str(post_id),
        ComputeEscrow.creditor_node_id == str(user_id),
        ComputeEscrow.status.in_(['pending', 'settled']),
    ).first()
    return matching_pledge is not None

388 

389 

390def _is_central(user): 

391 """Check if user has central (admin) role.""" 

392 role = getattr(user, 'role', None) or 'flat' 

393 return role == 'central' or getattr(user, 'is_admin', False) 

394 

395 

@tracker_bp.route('/experiments/<post_id>/pledges', methods=['GET'])
@require_auth
def list_pledges(post_id):
    """List the compute escrows pledged to a thought experiment.

    Central users and contributors to this experiment receive every pledge
    (newest first) in full. Everyone else receives only the pledge count and
    a hint to pledge. Responds 404 when the post does not exist.
    """
    post = g.db.query(Post).filter_by(id=post_id).first()
    if not post:
        return _err('Post not found', 404)

    pledge_query = g.db.query(ComputeEscrow).filter(
        ComputeEscrow.experiment_post_id == str(post_id),
    )
    pledges = pledge_query.order_by(ComputeEscrow.created_at.desc()).all()

    user_id = str(g.user.id)
    may_see_details = _is_central(g.user) or _is_contributor(g.db, user_id, post_id)
    if not may_see_details:
        # Non-contributors only learn how many pledges exist.
        return _ok({
            'count': len(pledges),
            'message': 'Pledge to this experiment to see full contributor details',
        })

    return _ok([pledge.to_dict() for pledge in pledges])

422 

423 

@tracker_bp.route('/experiments/<post_id>/pledge-summary', methods=['GET'])
@require_auth
def pledge_summary(post_id):
    """Summarise pending/settled pledges of a thought experiment.

    Response data:
        pledges: per pledge_type totals ``{total, consumed, count}``; rows
            without a pledge_type are reported under 'spark'.
        pledgers: id/username/avatar_url of the users behind the five
            largest escrows (duplicates removed, order preserved).
        pledger_count: number of distinct creditors.
        user_pledge: the current user's pledge (id, amount, unit, type) or
            ``None``.

    Responds 404 when the post does not exist.
    """
    post = g.db.query(Post).filter_by(id=post_id).first()
    if not post:
        return _err('Post not found', 404)

    post_key = str(post_id)
    live_statuses = ['pending', 'settled']

    # Totals grouped by pledge type
    grouped = g.db.query(
        ComputeEscrow.pledge_type,
        sa_func.count(ComputeEscrow.id).label('count'),
        sa_func.sum(ComputeEscrow.spark_amount).label('total_spark'),
        sa_func.sum(ComputeEscrow.consumed).label('total_consumed'),
    ).filter(
        ComputeEscrow.experiment_post_id == post_key,
        ComputeEscrow.status.in_(live_statuses),
    ).group_by(ComputeEscrow.pledge_type).all()

    pledges = {
        (entry.pledge_type or 'spark'): {
            'total': entry.total_spark or 0,
            'consumed': round(entry.total_consumed or 0, 4),
            'count': entry.count,
        }
        for entry in grouped
    }

    # Users behind the five largest escrows
    user_id = str(g.user.id)
    largest = g.db.query(ComputeEscrow).filter(
        ComputeEscrow.experiment_post_id == post_key,
        ComputeEscrow.status.in_(live_statuses),
    ).order_by(ComputeEscrow.spark_amount.desc()).limit(5).all()

    # dict.fromkeys removes duplicates while keeping first-seen order.
    unique_creditors = list(dict.fromkeys(esc.creditor_node_id for esc in largest))
    pledgers = []
    for creditor_id in unique_creditors:
        account = g.db.query(User).filter_by(id=creditor_id).first()
        if not account:
            continue
        pledgers.append({
            'id': account.id,
            'username': account.username,
            'avatar_url': getattr(account, 'avatar_url', None),
        })

    distinct_creditors = g.db.query(
        sa_func.count(sa_func.distinct(ComputeEscrow.creditor_node_id))
    ).filter(
        ComputeEscrow.experiment_post_id == post_key,
        ComputeEscrow.status.in_(live_statuses),
    ).scalar()
    pledger_count = distinct_creditors or 0

    # Current user's pledge
    own_escrow = g.db.query(ComputeEscrow).filter(
        ComputeEscrow.experiment_post_id == post_key,
        ComputeEscrow.creditor_node_id == user_id,
        ComputeEscrow.status.in_(live_statuses),
    ).first()

    user_pledge = None
    if own_escrow:
        units = {'gpu_hours': 'hours', 'cloud_credits': 'credits', 'money': 'USD'}
        user_pledge = {
            'id': own_escrow.id,
            'amount': own_escrow.spark_amount,
            'unit': units.get(own_escrow.pledge_type, 'spark'),
            'type': own_escrow.pledge_type,
        }

    summary = {
        'pledges': pledges,
        'pledgers': pledgers,
        'pledger_count': pledger_count,
        'user_pledge': user_pledge,
    }
    return _ok(summary)

504 

505 

@tracker_bp.route('/experiments/<post_id>/pledge', methods=['POST'])
@require_auth
def create_pledge(post_id):
    """Create a compute pledge (ComputeEscrow) for a thought experiment.

    Body:
        pledge_type: 'gpu_hours' | 'cloud_credits' | 'money' (required;
            'type' is accepted as an alias)
        spark_amount: number (required; 'amount' is accepted as an alias).
            The value is truncated to an integer, and the truncated value
            must be at least 1.
        message: str (optional supporter message, stored cut to 500 chars)

    Responses:
        201 with the new escrow on success.
        404 when the post does not exist.
        400 when the post is not a thought experiment or a field is invalid.
        403 when the pledger's NodeComputeConfig has
            accept_thought_experiments disabled.
    """
    # Function-scope import, like the other lazy imports in this module.
    import math

    post = g.db.query(Post).filter_by(id=post_id).first()
    if not post:
        return _err('Post not found', 404)
    if not getattr(post, 'is_thought_experiment', False):
        return _err('Post is not a thought experiment', 400)

    data = request.get_json(force=True, silent=True) or {}
    if not isinstance(data, dict):
        # A JSON array or scalar carries no fields; the required-field checks
        # below then reject the request with a 400 instead of a crash.
        data = {}
    pledge_type = data.get('pledge_type') or data.get('type')
    raw_amount = data.get('spark_amount') or data.get('amount')
    message = data.get('message', '')

    if pledge_type not in VALID_PLEDGE_TYPES:
        return _err(f'pledge_type must be one of: {", ".join(VALID_PLEDGE_TYPES)}', 400)

    # bool is a subclass of int, and the JSON parser accepts NaN/Infinity,
    # so both have to be excluded explicitly before int() is applied.
    amount_is_numeric = (
        isinstance(raw_amount, (int, float))
        and not isinstance(raw_amount, bool)
        and not (isinstance(raw_amount, float) and not math.isfinite(raw_amount))
    )
    spark_amount = int(raw_amount) if amount_is_numeric else 0
    # Checked after truncation so that e.g. 0.5 cannot become a 0-spark pledge.
    if spark_amount <= 0:
        return _err('spark_amount must be a positive number', 400)

    if message is None:
        message = ''
    if not isinstance(message, str):
        return _err('message must be a string', 400)

    user_id = str(g.user.id)

    # Verify pledger's node accepts thought experiments (if they have a config)
    node_config = g.db.query(NodeComputeConfig).filter(
        NodeComputeConfig.node_id == user_id,
    ).first()
    if node_config and not node_config.accept_thought_experiments:
        return _err('Your node config has accept_thought_experiments disabled', 403)

    # Create the escrow — debtor is the experiment post author (they receive
    # the compute), creditor is the pledging user (they supply it).
    escrow = ComputeEscrow(
        debtor_node_id=str(post.author_id),
        creditor_node_id=user_id,
        request_id=f'experiment_{post_id}',
        task_type='thought_experiment',
        spark_amount=spark_amount,
        status='pending',
        experiment_post_id=str(post_id),
        pledge_type=pledge_type,
        consumed=0.0,
        pledge_message=message[:500] if message else None,
    )
    g.db.add(escrow)
    # Flush so escrow.id is available for the resonance award below.
    g.db.flush()

    # Award resonance for pledging (best effort: a failure must not undo
    # the pledge itself)
    try:
        from .resonance_engine import ResonanceService
        ResonanceService.award_action(g.db, user_id, 'experiment_pledge', source_id=str(escrow.id))
    except Exception as e:
        logger.debug(f"Resonance award for experiment_pledge failed: {e}")

    return _ok(escrow.to_dict(), status=201)

567 

568 

@tracker_bp.route('/experiments/<post_id>/pledge/<int:escrow_id>', methods=['DELETE'])
@require_auth
def withdraw_pledge(post_id, escrow_id):
    """Withdraw a pledge if nothing has been consumed yet.

    Only the pledge creator (creditor) or a central admin can withdraw.
    Withdrawal marks the escrow 'expired' and stamps ``settled_at``.
    Repeating the call on an escrow that is already 'expired' succeeds
    without touching it again.

    Responses:
        200 ``{'withdrawn': True, 'escrow_id': ...}`` on success.
        404 when no such pledge exists for this experiment.
        403 when the caller is neither the creditor nor central.
        409 when part of the pledge has already been consumed.
    """
    escrow = g.db.query(ComputeEscrow).filter_by(
        id=escrow_id,
        experiment_post_id=str(post_id),
    ).first()
    if not escrow:
        return _err('Pledge not found', 404)

    user_id = str(g.user.id)
    if escrow.creditor_node_id != user_id and not _is_central(g.user):
        return _err('Only the pledge creator or a central admin can withdraw', 403)

    if (escrow.consumed or 0) > 0:
        return _err(
            f'Cannot withdraw: {escrow.consumed} already consumed. '
            f'Contact the experiment author to settle.',
            409,
        )

    # Already withdrawn/expired: keep the original settled_at instead of
    # overwriting it on every repeated DELETE.
    if escrow.status != 'expired':
        escrow.status = 'expired'
        escrow.settled_at = datetime.utcnow()
        g.db.flush()

    return _ok({'withdrawn': True, 'escrow_id': escrow_id})

599 

600 

@tracker_bp.route('/experiments/<post_id>/consume', methods=['POST'])
@require_central
def consume_pledge(post_id):
    """Internal: agent consumes compute from pledged escrows.

    Only callable by central role (backend/agent orchestration).

    Body:
        escrow_id: int (required — which pledge to draw from)
        amount: float (required — spark units to consume; must be a finite
            number greater than 0)
        model_id: str (optional, defaults to 'unknown' — which model was used)
        node_id: str (optional, defaults to 'unknown' — which node executed
            the work)
        tokens_in: int (optional, defaults to 0)
        tokens_out: int (optional, defaults to 0)

    Enforces deterministic budget: consumed + amount <= spark_amount.

    Responses:
        200 with consumed / total_consumed / remaining / escrow_status /
            usage_id.
        400 when escrow_id or amount is missing or amount is invalid.
        404 when the pledge does not belong to this experiment.
        409 when the pledge is not pending/settled or the budget would be
            exceeded.
    """
    # Function-scope import, like the other lazy imports in this module.
    import math

    data = request.get_json(force=True, silent=True) or {}
    if not isinstance(data, dict):
        # A JSON array or scalar carries no fields; the required-field check
        # below then answers with a 400 instead of a crash.
        data = {}
    escrow_id = data.get('escrow_id')
    amount = data.get('amount')
    model_id = data.get('model_id', 'unknown')
    node_id = data.get('node_id', 'unknown')
    tokens_in = data.get('tokens_in', 0)
    tokens_out = data.get('tokens_out', 0)

    if not escrow_id or not amount:
        return _err('escrow_id and amount are required', 400)
    # bool is a subclass of int; True must not count as "1 spark".
    if isinstance(amount, bool) or not isinstance(amount, (int, float)):
        return _err('amount must be a positive number', 400)
    try:
        amount = float(amount)
    except OverflowError:
        # Integers too large for a float cannot be a valid spark amount.
        return _err('amount must be a positive number', 400)
    # NaN compares False against everything, so without this check it would
    # slip past both "<= 0" and the budget comparison and corrupt `consumed`.
    if not math.isfinite(amount) or amount <= 0:
        return _err('amount must be a positive number', 400)

    escrow = g.db.query(ComputeEscrow).filter_by(
        id=escrow_id,
        experiment_post_id=str(post_id),
    ).first()
    if not escrow:
        return _err('Pledge not found for this experiment', 404)
    if escrow.status not in ('pending', 'settled'):
        return _err(f'Pledge status is {escrow.status}, cannot consume', 409)

    # Deterministic budget enforcement
    # NOTE(review): the escrow row is read and updated without a lock here;
    # confirm whether concurrent consume calls are serialised elsewhere.
    current_consumed = escrow.consumed or 0.0
    remaining = escrow.spark_amount - current_consumed
    if amount > remaining:
        return _err(
            f'Budget exceeded: requested {amount}, remaining {round(remaining, 4)}. '
            f'Pledged total: {escrow.spark_amount}, already consumed: {round(current_consumed, 4)}.',
            409,
        )

    # Update escrow consumed amount; a fully used pledge becomes settled.
    escrow.consumed = round(current_consumed + amount, 4)
    if escrow.consumed >= escrow.spark_amount:
        escrow.status = 'settled'
        escrow.settled_at = datetime.utcnow()

    # Record the metered usage linked to this escrow
    usage = MeteredAPIUsage(
        node_id=node_id,
        operator_id=escrow.creditor_node_id,
        model_id=model_id,
        task_source='experiment',
        goal_id=None,
        requester_node_id=escrow.debtor_node_id,
        tokens_in=tokens_in,
        tokens_out=tokens_out,
        # NOTE(review): int() drops the fractional part, so a sub-1 amount is
        # recorded as cost 0 here while the escrow keeps the exact value.
        estimated_spark_cost=int(amount),
        settlement_status='settled',
        escrow_id=escrow.id,
        experiment_post_id=str(post_id),
    )
    g.db.add(usage)
    # Flush so usage.id is populated for the response.
    g.db.flush()

    return _ok({
        'consumed': amount,
        'total_consumed': escrow.consumed,
        'remaining': round(escrow.spark_amount - escrow.consumed, 4),
        'escrow_status': escrow.status,
        'usage_id': usage.id,
    })

682 

683 

@tracker_bp.route('/experiments/<post_id>/insights', methods=['GET'])
@require_auth
def experiment_insights(post_id):
    """Detailed progress and budget insights for a thought experiment.

    Access is limited to central users and to contributors (users holding a
    pending or settled pledge on this experiment); everyone else gets 403.
    Responds 404 when the post does not exist.

    Response data: post_id, goal (status, progress, ledger tasks,
    needs_review, or ``None``), budget (pledged / consumed / remaining spark
    and pledge count) and consumption_by_model (metered usage grouped by
    model).
    """
    post = g.db.query(Post).filter_by(id=post_id).first()
    if not post:
        return _err('Post not found', 404)

    user_id = str(g.user.id)
    allowed = _is_central(g.user) or _is_contributor(g.db, user_id, post_id)
    if not allowed:
        return _err('Insights are exclusive to contributors. Pledge compute to unlock.', 403)

    # Goal section
    goal = _get_goal_for_post(g.db, post_id)
    goal_section = None
    if goal:
        progress = _get_goal_progress(goal.id)
        tasks = _get_ledger_tasks(goal.id)
        blocked_on_approval = [
            task for task in tasks
            if task.get('blocked_reason') == 'APPROVAL_REQUIRED'
        ]
        goal_section = {
            'goal_id': goal.id,
            'status': goal.status,
            'goal_type': goal.goal_type,
            'progress': progress,
            'tasks': tasks,
            'needs_review': bool(blocked_on_approval),
        }

    # Metered usage of this experiment, grouped by model
    usage_by_model = g.db.query(
        MeteredAPIUsage.model_id,
        sa_func.count(MeteredAPIUsage.id).label('call_count'),
        sa_func.sum(MeteredAPIUsage.tokens_in).label('total_tokens_in'),
        sa_func.sum(MeteredAPIUsage.tokens_out).label('total_tokens_out'),
        sa_func.sum(MeteredAPIUsage.estimated_spark_cost).label('total_spark_cost'),
    ).filter(
        MeteredAPIUsage.experiment_post_id == str(post_id),
    ).group_by(MeteredAPIUsage.model_id).all()

    consumption = [
        {
            'model_id': usage.model_id,
            'call_count': usage.call_count,
            'total_tokens_in': usage.total_tokens_in or 0,
            'total_tokens_out': usage.total_tokens_out or 0,
            'total_spark_cost': usage.total_spark_cost or 0,
        }
        for usage in usage_by_model
    ]

    # Budget figures from pending/settled pledges
    live_pledges = g.db.query(ComputeEscrow).filter(
        ComputeEscrow.experiment_post_id == str(post_id),
        ComputeEscrow.status.in_(['pending', 'settled']),
    ).all()
    total_pledged = 0
    total_consumed = 0
    for pledge in live_pledges:
        total_pledged = total_pledged + pledge.spark_amount
    for pledge in live_pledges:
        total_consumed = total_consumed + (pledge.consumed or 0)

    budget = {
        'total_pledged_spark': total_pledged,
        'total_consumed_spark': round(total_consumed, 4),
        'remaining_spark': round(total_pledged - total_consumed, 4),
        'pledge_count': len(live_pledges),
    }
    return _ok({
        'post_id': post_id,
        'goal': goal_section,
        'budget': budget,
        'consumption_by_model': consumption,
    })

760 

761 

762# ═══════════════════════════════════════════════════════════════ 

763# MY PLEDGES / ADMIN VIEW / NODE VERIFICATION 

764# ═══════════════════════════════════════════════════════════════ 

765 

@tracker_bp.route('/pledges/mine', methods=['GET'])
@require_auth
def my_pledges():
    """Get all pledges made by the current user across experiments.

    Query parameters:
        limit: page size (default 50). Negative values are treated as 0.
        offset: rows to skip (default 0). Negative values are treated as 0.
        status: optional exact match on the escrow status.

    Returns:
        Success envelope with the escrow dicts (newest first) and pagination
        meta (total, limit, offset, has_more).
    """
    user_id = str(g.user.id)
    # Clamp paging inputs: a negative LIMIT/OFFSET is an error on some SQL
    # backends and means "no limit" on others, so never pass one through.
    limit = max(request.args.get('limit', 50, type=int), 0)
    offset = max(request.args.get('offset', 0, type=int), 0)
    status_filter = request.args.get('status')

    # Only escrows tied to an experiment post count as pledges.
    q = g.db.query(ComputeEscrow).filter(
        ComputeEscrow.creditor_node_id == user_id,
        ComputeEscrow.experiment_post_id.isnot(None),
    )
    if status_filter:
        q = q.filter_by(status=status_filter)

    total = q.count()
    escrows = q.order_by(
        ComputeEscrow.created_at.desc()).offset(offset).limit(limit).all()

    return _ok(
        [e.to_dict() for e in escrows],
        meta={'total': total, 'limit': limit, 'offset': offset,
              'has_more': offset + limit < total},
    )

791 

792 

@tracker_bp.route('/pledges/all', methods=['GET'])
@require_central
def all_pledges():
    """Central admin: view all experiment pledges system-wide.

    Query parameters:
        limit: page size (default 100). Negative values are treated as 0.
        offset: rows to skip (default 0). Negative values are treated as 0.
        status: optional exact match on the escrow status.
        pledge_type: optional filter; ignored unless it is one of
            VALID_PLEDGE_TYPES.
        post_id: optional filter on the experiment post.

    Returns:
        Success envelope with the escrow dicts (newest first) and pagination
        meta (total, limit, offset, has_more).
    """
    # Clamp paging inputs: a negative LIMIT/OFFSET is an error on some SQL
    # backends and means "no limit" on others, so never pass one through.
    limit = max(request.args.get('limit', 100, type=int), 0)
    offset = max(request.args.get('offset', 0, type=int), 0)
    status_filter = request.args.get('status')
    pledge_type = request.args.get('pledge_type')
    post_id = request.args.get('post_id')

    # Only escrows tied to an experiment post count as pledges.
    q = g.db.query(ComputeEscrow).filter(
        ComputeEscrow.experiment_post_id.isnot(None),
    )
    if status_filter:
        q = q.filter_by(status=status_filter)
    if pledge_type and pledge_type in VALID_PLEDGE_TYPES:
        q = q.filter_by(pledge_type=pledge_type)
    if post_id:
        q = q.filter_by(experiment_post_id=str(post_id))

    total = q.count()
    escrows = q.order_by(
        ComputeEscrow.created_at.desc()).offset(offset).limit(limit).all()

    return _ok(
        [e.to_dict() for e in escrows],
        meta={'total': total, 'limit': limit, 'offset': offset,
              'has_more': offset + limit < total},
    )

822 

823 

@tracker_bp.route('/pledges/<int:escrow_id>/verify', methods=['POST'])
@require_auth
def verify_pledge(escrow_id):
    """Verify node capacity for a gpu_hours pledge.

    Only the pledge owner or a regional/central admin can verify.

    The pledged amount is compared against 30 days of the creditor node's
    ``offered_gpu_hours_per_day``. When the node has no config, or offers no
    GPU hours, no comparison is made: the response carries a warning and
    ``verified`` stays True.

    Responses:
        200 with verified / capacity / escrow.
        404 when the pledge does not exist.
        403 when the caller is neither the owner nor an admin.
        400 when the pledge is not a gpu_hours pledge.
    """
    escrow = g.db.query(ComputeEscrow).filter_by(id=escrow_id).first()
    if not escrow:
        return _err('Pledge not found', 404)

    user_id = str(g.user.id)
    user_role = getattr(g.user, 'role', None) or 'flat'
    is_admin = user_role in ('central', 'regional') or getattr(g.user, 'is_admin', False)
    if escrow.creditor_node_id != user_id and not is_admin:
        return _err('Access denied', 403)

    if escrow.pledge_type != 'gpu_hours':
        return _err('Only gpu_hours pledges require node verification', 400)

    node_config = g.db.query(NodeComputeConfig).filter_by(
        node_id=escrow.creditor_node_id).first()

    # An unset (None) offer is treated like 0 so the comparison below cannot
    # raise TypeError.
    offered_daily = (node_config.offered_gpu_hours_per_day or 0) if node_config else 0

    capacity_ok = True
    if offered_daily > 0:
        max_monthly = offered_daily * 30
        capacity_ok = escrow.spark_amount <= max_monthly
        capacity_details = {
            'offered_daily': offered_daily,
            'max_monthly': round(max_monthly, 1),
            'pledged': escrow.spark_amount,
            'within_capacity': capacity_ok,
        }
    else:
        capacity_details = {'warning': 'No NodeComputeConfig found'}

    return _ok({
        'verified': capacity_ok,
        'capacity': capacity_details,
        'escrow': escrow.to_dict(),
    })

866 

867 

868# ── Hive View Endpoints (extend tracker, no separate blueprint) ────── 

869 

870 

@tracker_bp.route('/experiments/<post_id>/inject', methods=['POST'])
@require_auth
def inject_variable(post_id):
    """God's-eye variable injection — push new context into a running agent.

    Body:
        variable: str (required) — the text to inject.
        injection_type: str (optional, default 'info') — constraint | info |
            question.

    The injection is registered as a memory in the MemoryGraph of the goal
    owner's session, and a 'hive_injection' event is published to live UIs
    on a best-effort basis.

    Responses:
        200 with memory_id / injection_type / message.
        400 when variable is missing or a field is not a string.
        404 when no goal is linked to the experiment.
        500 when the injection itself fails (details go to the log only).
    """
    # NOTE(review): any authenticated user can reach this endpoint; confirm
    # whether it should be limited to central users or the goal owner.
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        # A JSON array or scalar carries no fields; the required-field check
        # below then answers with a 400.
        data = {}
    variable = data.get('variable', '')
    injection_type = data.get('injection_type', 'info')  # constraint | info | question

    if not variable:
        return _err('variable is required', 400)
    # Both values are sliced / upper-cased below, so they must be strings.
    if not isinstance(variable, str) or not isinstance(injection_type, str):
        return _err('variable and injection_type must be strings', 400)

    goal = _get_goal_for_post(g.db, post_id)
    if not goal:
        return _err('No active agent for this experiment', 404)

    try:
        from integrations.channels.memory.memory_graph import MemoryGraph
        # NOTE(review): get_conversations keys the session on goal.created_by
        # while this endpoint uses goal.owner_id — confirm both refer to the
        # same session.
        session_key = f"{goal.owner_id}_{goal.prompt_id}" if goal.prompt_id else str(goal.owner_id)
        try:
            from core.platform_paths import get_memory_graph_dir
            db_path = get_memory_graph_dir(session_key)
        except ImportError:
            db_path = os.path.join(
                os.path.expanduser("~"), "Documents", "Nunba", "data",
                "memory_graph", session_key)
        os.makedirs(db_path, exist_ok=True)
        graph = MemoryGraph(db_path=db_path, user_id=str(goal.owner_id))
        memory_id = graph.register(
            content=f"[INJECTED {injection_type.upper()}] {variable}",
            metadata={
                'memory_type': 'injection', 'injection_type': injection_type,
                'source_agent': 'god_eye', 'session_id': session_key,
                # Same accessor as every other endpoint in this module.
                'injected_by': str(g.user.id),
                'injected_at': datetime.utcnow().isoformat(),
            },
            context_snapshot=f"God's-eye {injection_type} injection during experiment",
        )

        # Notify live UIs (best effort: the injection is already stored)
        try:
            from .realtime import publish_event
            publish_event('chat.social', {
                'type': 'hive_injection', 'goal_id': goal.id,
                'injection_type': injection_type, 'variable': variable[:200],
            }, user_id=goal.owner_id)
        except Exception as e:
            logger.debug("hive_injection publish failed: %s", e)

        return _ok({'memory_id': memory_id, 'injection_type': injection_type,
                    'message': f'{injection_type.capitalize()} injected into agent context.'})
    except Exception:
        # Keep the traceback in the log; do not echo internal error text
        # back to the client.
        logger.exception("Variable injection failed")
        return _err('Variable injection failed', 500)

924 

925 

@tracker_bp.route('/experiments/<post_id>/interview', methods=['POST'])
@require_auth
def interview_agent(post_id):
    """Post-experiment agent interview — ask any agent about its reasoning.

    Request JSON:
        question (required): the question to put to the agent.

    The question is wrapped in an interview preamble naming the goal's title
    and POSTed to the local backend ``/chat`` endpoint on behalf of the goal's
    owner, with a 60 second timeout.

    Returns:
        200 with ``question``, ``answer`` and ``goal_id`` when the backend
        answers with HTTP 200 (``answer`` falls back to 'No response.');
        400 when ``question`` is missing;
        404 when no goal is linked to ``post_id``;
        502 when the backend answers with a non-200 status;
        500 when the call itself raises.
    """
    payload = request.get_json(silent=True) or {}
    question = payload.get('question', '')
    if not question:
        return _err('question is required', 400)

    goal = _get_goal_for_post(g.db, post_id)
    if not goal:
        return _err('No agent for this experiment', 404)

    try:
        from core.http_pool import pooled_post
        from core.port_registry import get_port

        backend_port = get_port('backend')
        chat_url = f"http://localhost:{backend_port}/chat"

        # Preamble that switches the agent into reflective "interview" mode.
        preamble = (
            f"[INTERVIEW MODE — You are being asked about your reasoning on the experiment "
            f"'{goal.title}'. Reflect on your work and explain your thought process.]"
        )
        interview_prompt = f"{preamble}\n\nQuestion: {question}"

        chat_request = {
            'user_id': goal.owner_id,
            'prompt_id': goal.prompt_id or 0,
            'prompt': interview_prompt,
        }
        resp = pooled_post(chat_url, json=chat_request, timeout=60)

        # Anything other than 200 from the backend is surfaced as a bad gateway.
        if resp.status_code != 200:
            return _err(f'Agent returned {resp.status_code}', 502)

        answer = resp.json().get('response', 'No response.')
        return _ok({'question': question, 'answer': answer, 'goal_id': goal.id})
    except Exception as exc:
        logger.error("Agent interview failed: %s", exc)
        return _err(str(exc), 500)

965 

966 

@tracker_bp.route('/dual-context', methods=['POST'])
@require_auth
def launch_dual_context():
    """Clone an experiment into N parallel contexts with different overrides.

    Request JSON:
        post_id (required): post whose linked goal is used as the template.
        contexts (list of objects, at least 2): each may carry ``label``
            (defaults to 'variant') and ``system_prompt_override`` (defaults
            to '').

    For every context a new AgentGoal is added that copies the source goal's
    owner, type, description, priority, spark budget and config, with the
    label, override and source goal id recorded in its config. The new goals
    are flushed to the session; no commit is issued here.

    Returns:
        201 with ``source_post_id``, the created ``contexts`` and a message;
        400 when ``post_id`` is missing, ``contexts`` is not a list of at
        least two items, or any item is not an object;
        404 when no goal is linked to ``post_id``.
    """
    import uuid as _uuid
    from .models import AgentGoal

    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        # Non-object JSON bodies carry no fields; fall through to the 400.
        data = {}
    source_post_id = data.get('post_id')
    contexts = data.get('contexts', [])

    # A string or dict also has a len(), so check the type explicitly rather
    # than failing later on ctx.get().
    if not source_post_id or not isinstance(contexts, list) or len(contexts) < 2:
        return _err('post_id and at least 2 contexts required', 400)
    # Validate every entry before touching the session so a bad item cannot
    # leave earlier goals half-added.
    if not all(isinstance(ctx, dict) for ctx in contexts):
        return _err('each context must be an object', 400)

    source_goal = _get_goal_for_post(g.db, source_post_id)
    if not source_goal:
        return _err('No agent for this experiment', 404)

    new_goals = []
    for ctx in contexts:
        label = ctx.get('label', 'variant')
        new_id = str(_uuid.uuid4())[:16]

        # Shallow copy so the source goal's config is not mutated.
        cfg = dict(source_goal.config_json or {})
        cfg['dual_context_label'] = label
        cfg['system_prompt_override'] = ctx.get('system_prompt_override', '')
        cfg['source_goal_id'] = source_goal.id

        new_goal = AgentGoal(
            id=new_id, owner_id=source_goal.owner_id,
            goal_type=source_goal.goal_type,
            title=f"{source_goal.title} [{label}]",
            description=source_goal.description,
            status='active', priority=source_goal.priority,
            config_json=cfg, spark_budget=source_goal.spark_budget,
            created_by=g.user_id,
        )
        g.db.add(new_goal)
        new_goals.append({'goal_id': new_id, 'label': label, 'status': 'active'})

    g.db.flush()
    # Same payload as before, built through the shared response helper.
    return _ok({
        'source_post_id': source_post_id, 'contexts': new_goals,
        'message': f'{len(new_goals)} parallel contexts launched.',
    }, status=201)

1010 

1011 

@tracker_bp.route('/encounters', methods=['GET'])
@require_auth
def get_encounter_graph():
    """Agent collaboration graph from Encounter data.

    Loads up to 200 Encounter rows with ``bond_level > 0``, newest
    ``latest_at`` first, and returns them as a node/edge graph:

        nodes: one entry per User found among the encounter participants,
            with ``id``, ``name`` (display name, else username) and ``type``
            ('human' when the user has no ``user_type`` attribute).
        edges: one entry per encounter with ``source``, ``target``,
            ``bond_level``, ``encounter_count`` and ``context_type``.

    Returns 500 with the error text if anything raises.
    """
    try:
        from .models import Encounter

        bonded = g.db.query(Encounter).filter(Encounter.bond_level > 0)
        encounters = bonded.order_by(Encounter.latest_at.desc()).limit(200).all()

        # Every user that appears on either side of an encounter.
        participant_ids = {
            uid
            for enc in encounters
            for uid in (enc.user_a_id, enc.user_b_id)
        }

        nodes_by_id = {}
        if participant_ids:
            rows = g.db.query(User).filter(User.id.in_(list(participant_ids))).all()
            for row in rows:
                nodes_by_id[row.id] = {
                    'id': row.id,
                    'name': row.display_name or row.username,
                    'type': getattr(row, 'user_type', 'human'),
                }

        nodes = list(nodes_by_id.values())
        edges = []
        for enc in encounters:
            edges.append({
                'source': enc.user_a_id, 'target': enc.user_b_id,
                'bond_level': enc.bond_level, 'encounter_count': enc.encounter_count,
                'context_type': enc.context_type,
            })

        return _ok({'nodes': nodes, 'edges': edges})
    except Exception as exc:
        logger.error("Encounter graph failed: %s", exc)
        return _err(str(exc), 500)