Coverage for integrations / social / api_compute_earnings.py: 22.8%

101 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1"""Compute earnings -- the user-visible side of idle-compute earning. 

2 

3Surface (idle_compute_workstream G3-G5): 

4 GET /api/compute/earnings/estimate?tier=standard&has_gpu=0&weekly_hours=168 

5 -> tier-typical Spark/USD/week range for the self-advertising banner 

6 (no auth -- used in the pre-opt-in dialog so brand-new visitors see 

7 a believable "you'd earn ~X this week" preview). 

8 

9 GET /api/compute/earnings/<uid>?days=7&limit=20 

10 -> live rolling earnings for the opted-in user. Sources rows from 

11 ResonanceTransaction where source_type matches a compute or API 

12 cost-recovery prefix -- no shadow ledger. Auth required so a user 

13 can only read their own earnings. 

14 

15 GET /api/compute/earnings/stream 

16 -> Server-Sent Events feed forwarding `compute.task_settled` from 

17 the platform EventBus. hevolve.ai's earnings drawer + Nunba 

18 shell tile keep this connection open and animate a row when a 

19 new settlement lands. Subscriber pattern mirrors HiveContest's 

20 /api/hive/contest/ideas/stream (single canonical SSE shape). 

21 

22The endpoint is the ONLY consumer of: 

23 - hosting_reward_service.estimate_weekly_spark (pre-opt-in helper) 

24 - revenue_aggregator's emit of `compute.task_settled` (post-settle) 

 - ResonanceTransaction rows whose source_type starts with
   `api_cost_recovery` (the canonical settled-Spark ledger); a `compute:`
   prefix is reserved for future writers and is not matched yet

27 

28No new tables. No parallel ledger. No bespoke event topic. 

29""" 

30from __future__ import annotations 

31 

32import logging 

33 

34from flask import Blueprint, Response, g, jsonify, request, stream_with_context 

35 

36from integrations.social.auth import require_auth 

37from integrations.social.hosting_reward_service import HostingRewardService 

38from integrations.social.models import get_db 

39 

# Module-level logger, named after this module's import path.
logger = logging.getLogger(__name__)

# Blueprint carrying every route in this module; all of them are mounted
# under the /api/compute/earnings URL prefix.
compute_earnings_bp = Blueprint(
    name='compute_earnings',
    import_name=__name__,
    url_prefix='/api/compute/earnings',
)

45 

46 

47# --- Estimate (pre-opt-in banner) ---------------------------------- 

48 

49# Tier names mirror security.system_requirements.NodeTierLevel -- the 

50# single source of truth. Built at import time so any future tier 

51# value automatically flows into the validator. 

52def _load_valid_tiers() -> tuple: 

53 try: 

54 from security.system_requirements import NodeTierLevel 

55 return tuple(t.value for t in NodeTierLevel) 

56 except Exception: 

57 # Fallback covers the current ladder; keeps the module 

58 # importable even when system_requirements can't load. 

59 return ( 

60 'embedded', 'observer', 'lite', 'standard', 'full', 'compute_host', 

61 ) 

62 

63 

64_VALID_TIERS = _load_valid_tiers() 

65 

66 

67def _parse_bool(raw: str) -> bool: 

68 return str(raw or '').strip().lower() in ('1', 'true', 'yes', 'y', 'on') 

69 

70 

@compute_earnings_bp.route('/estimate', methods=['GET'])
def estimate():
    """Pre-opt-in tier-typical Spark/week estimate.

    Public (no auth decorator) -- the banner shows this BEFORE a user opts
    into compute contribution, so the dialog can say "you'd earn ~X this
    week".

    Query params:
        tier:         tier name; anything not in ``_VALID_TIERS`` (or
                      missing) falls back to ``'standard'``.
        has_gpu:      boolean flag parsed by ``_parse_bool``.
        weekly_hours: integer clamped to 1..168; unparsable or missing
                      values fall back to 168.

    Returns:
        JSON ``{'data': <result of HostingRewardService.estimate_weekly_spark>}``.
    """
    args = request.args

    requested_tier = (args.get('tier') or 'standard').strip().lower()
    tier = requested_tier if requested_tier in _VALID_TIERS else 'standard'

    try:
        hours = int(args.get('weekly_hours', 168))
    except (TypeError, ValueError):
        hours = 168
    # Clamp into [1, 168] -- a week has 168 hours.
    hours = min(168, max(1, hours))

    gpu_flag = _parse_bool(args.get('has_gpu'))

    estimate_payload = HostingRewardService.estimate_weekly_spark(
        tier=tier,
        has_gpu=gpu_flag,
        weekly_hours=hours,
    )
    return jsonify({'data': estimate_payload})

92 

93 

94# --- Live earnings (opted-in user reads own ledger) ---------------- 

95 

96# The SQL `LIKE` patterns that mark a ResonanceTransaction as a 

97# compute / API earning. Today's only writer: 

98# - revenue_aggregator.settle_metered_api_costs -> 'api_cost_recovery' 

99# When future writers ship (per-inference settle with 'compute:%' prefix 

100# or per-provider settle with 'api:%'), add the prefix here AND add the 

101# writer module to the WRITERS comment so the dependency stays explicit. 

102# WRITERS: 

103# integrations/agent_engine/revenue_aggregator.py:settle_metered_api_costs 

104_EARNING_SOURCE_PREFIXES = ('api_cost_recovery%',) 

105 

106 

@compute_earnings_bp.route('/<uid>', methods=['GET'])
@require_auth
def list_earnings(uid):
    """Return recent compute/API earnings for the authenticated user.

    Auth: the requester MUST equal `uid` -- no horizontal escalation. A
    missing session user or a mismatching id yields 403.

    Query params:
        days:  look-back window in days, clamped to 1..90 (default 7;
               unparsable values fall back to 7).
        limit: maximum rows returned, clamped to 1..100 (default 20;
               unparsable values fall back to 20).

    Returns:
        200 with ``{'data': [row, ...], 'meta': {...}}`` where each row has
        ``id``, ``amount_spark``, ``source_type``, ``source_id``,
        ``description`` and ``created_at`` (ISO-8601 or None), newest first.
        ``meta.count`` is the number of rows returned;
        ``meta.total_spark_in_window`` is the sum over ALL matching rows in
        the window (not only the `limit` rows returned); ``meta.days`` is
        the effective window.
        403 ``{'error': 'forbidden'}`` on an auth mismatch.
        500 ``{'error': 'lookup failed'}`` when the query raises.
    """
    # Read g.user defensively (same approach as the SSE route) so a missing
    # attribute fails closed with 403 instead of raising AttributeError.
    requester_id = getattr(getattr(g, 'user', None), 'id', None)
    if not requester_id or str(requester_id) != str(uid):
        return jsonify({'error': 'forbidden'}), 403

    try:
        days = min(90, max(1, int(request.args.get('days', 7))))
    except (TypeError, ValueError):
        days = 7
    try:
        limit = min(100, max(1, int(request.args.get('limit', 20))))
    except (TypeError, ValueError):
        limit = 20

    db = get_db()
    try:
        from datetime import datetime, timedelta, timezone
        from sqlalchemy import func, or_
        from integrations.social.models import ResonanceTransaction

        # datetime.utcnow() is deprecated; build the same naive-UTC value
        # from an aware "now" so the comparison against created_at is
        # unchanged. NOTE(review): assumes created_at is stored as naive
        # UTC, as the original utcnow() comparison implied -- confirm.
        now_utc = datetime.now(timezone.utc).replace(tzinfo=None)
        cutoff = now_utc - timedelta(days=days)

        # Shared by the row query and the aggregate so both always
        # describe exactly the same set of transactions.
        conditions = (
            ResonanceTransaction.user_id == str(uid),
            ResonanceTransaction.currency == 'spark',
            ResonanceTransaction.created_at >= cutoff,
            or_(*[
                ResonanceTransaction.source_type.like(pattern)
                for pattern in _EARNING_SOURCE_PREFIXES
            ]),
        )

        recent = (
            db.query(ResonanceTransaction)
            .filter(*conditions)
            .order_by(ResonanceTransaction.created_at.desc())
            .limit(limit)
            .all()
        )
        rows = [
            {
                'id': getattr(tx, 'id', None),
                'amount_spark': int(tx.amount or 0),
                'source_type': tx.source_type,
                'source_id': tx.source_id,
                'description': tx.description,
                'created_at': (
                    tx.created_at.isoformat() if tx.created_at else None
                ),
            }
            for tx in recent
        ]

        # Aggregate in SQL over the whole window. Summing only the rows
        # above would silently under-report once the window holds more
        # than `limit` transactions.
        window_total = (
            db.query(func.coalesce(func.sum(ResonanceTransaction.amount), 0))
            .filter(*conditions)
            .scalar()
        )

        return jsonify({
            'data': rows,
            'meta': {
                'count': len(rows),
                'total_spark_in_window': int(window_total or 0),
                'days': days,
            },
        })
    except Exception as exc:
        # This path returns a 500 to the client, so record it at ERROR
        # level with the traceback rather than hiding it at DEBUG.
        logger.exception('/api/compute/earnings list failed: %s', exc)
        return jsonify({'error': 'lookup failed'}), 500
    finally:
        db.close()

172 

173 

174# --- SSE -- live drawer fan-out ------------------------------------- 

175 

@compute_earnings_bp.route('/stream', methods=['GET'])
@require_auth
def stream():
    """Server-Sent Events feed for the live earnings drawer.

    AUTH REQUIRED. Subscribes to platform EventBus topic
    `compute.task_settled` (emitted by
    revenue_aggregator.settle_metered_api_costs after every Spark
    write). Same shape as HiveContest's
    /api/hive/contest/ideas/stream -- single canonical SSE pattern.

    Privacy: the upstream payload contains a raw `operator_id` (the
    PeerNode operator user id). This route filters server-side so
    only events whose `operator_id` matches the authenticated session
    are forwarded to the client. Client-side filtering alone would
    leak every node settlement to any listener.

    Lifecycle: the bus subscription is created when the response
    generator starts and is removed in its `finally`, so it is released
    no matter where the client disconnects (including during the very
    first ping).

    Returns:
        A streaming `text/event-stream` response emitting `ping` frames
        (immediately, then roughly every 15 s) and one
        `compute.task_settled` frame per matching event; or
        401 ``{'error': 'auth required'}`` when no session user is set.
    """
    user = getattr(g, 'user', None)
    requester_id = getattr(user, 'id', None) if user else None
    if not requester_id:
        return jsonify({'error': 'auth required'}), 401
    requester_id = str(requester_id)

    import json as _json
    import queue as _queue
    import time as _time

    # Bounded so a stalled client cannot grow memory without limit.
    q: _queue.Queue = _queue.Queue(maxsize=200)

    # Same registry-based bus accessor used by api_hive_contest stream.
    try:
        from core.platform.registry import get_registry
        _reg = get_registry()
        bus = _reg.get('events') if _reg.has('events') else None
    except Exception:
        bus = None

    def _on_event(_topic, payload):
        # Defense-in-depth scope guard: forward only this user's
        # settlement events, even if the auth layer is later bypassed.
        try:
            if payload and str(payload.get('operator_id') or '') == requester_id:
                q.put_nowait(payload)
        except _queue.Full:
            # Slow consumer: drop the event rather than block the emitter.
            pass

    def _generate():
        subscribed = False
        try:
            # Subscribe inside the try so the finally below always pairs
            # with it. Subscribing before the generator runs (or yielding
            # outside the try) leaks the handler when the client goes away
            # at the first frame or the generator is never started.
            if bus is not None:
                try:
                    bus.on('compute.task_settled', _on_event)
                    subscribed = True
                except Exception as exc:
                    # Without a subscription the stream is ping-only, so
                    # make the failure visible.
                    logger.warning(
                        'compute earnings SSE subscribe failed: %s', exc,
                    )

            # initial keepalive flushes proxies / lets client confirm connect
            yield 'event: ping\ndata: {}\n\n'
            # Monotonic clock: keepalive cadence must not be affected by
            # wall-clock adjustments.
            last_ping = _time.monotonic()

            while True:
                try:
                    payload = q.get(timeout=5.0)
                except _queue.Empty:
                    payload = None

                if payload is not None:
                    try:
                        body = _json.dumps(payload)
                    except (TypeError, ValueError) as exc:
                        # One unserialisable event must not tear down the
                        # whole stream; skip it and keep going.
                        logger.warning(
                            'compute earnings SSE payload not serialisable: %s',
                            exc,
                        )
                    else:
                        yield (
                            f'event: compute.task_settled\n'
                            f'data: {body}\n\n'
                        )

                if _time.monotonic() - last_ping > 15:
                    yield 'event: ping\ndata: {}\n\n'
                    last_ping = _time.monotonic()
        finally:
            if subscribed and bus is not None:
                try:
                    bus.off('compute.task_settled', _on_event)
                except Exception as exc:
                    # Best-effort cleanup; nothing more can be done here.
                    logger.debug(
                        'compute earnings SSE unsubscribe failed: %s', exc,
                    )

    headers = {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        # Ask nginx-style proxies not to buffer the stream.
        'X-Accel-Buffering': 'no',
    }
    return Response(stream_with_context(_generate()), headers=headers)