Coverage for integrations/social/api_compute_earnings.py: 22.8% (101 statements)
1"""Compute earnings -- the user-visible side of idle-compute earning.
3Surface (idle_compute_workstream G3-G5):
4 GET /api/compute/earnings/estimate?tier=standard&has_gpu=0&weekly_hours=168
5 -> tier-typical Spark/USD/week range for the self-advertising banner
6 (no auth -- used in the pre-opt-in dialog so brand-new visitors see
7 a believable "you'd earn ~X this week" preview).
9 GET /api/compute/earnings/<uid>?days=7&limit=20
10 -> live rolling earnings for the opted-in user. Sources rows from
11 ResonanceTransaction where source_type matches a compute or API
12 cost-recovery prefix -- no shadow ledger. Auth required so a user
13 can only read their own earnings.
15 GET /api/compute/earnings/stream
16 -> Server-Sent Events feed forwarding `compute.task_settled` from
17 the platform EventBus. hevolve.ai's earnings drawer + Nunba
18 shell tile keep this connection open and animate a row when a
19 new settlement lands. Subscriber pattern mirrors HiveContest's
20 /api/hive/contest/ideas/stream (single canonical SSE shape).
22The endpoint is the ONLY consumer of:
23 - hosting_reward_service.estimate_weekly_spark (pre-opt-in helper)
24 - revenue_aggregator's emit of `compute.task_settled` (post-settle)
25 - ResonanceTransaction rows with source_type starting `compute:` or
26 `api_cost_recovery` (the canonical settled-Spark ledger)
28No new tables. No parallel ledger. No bespoke event topic.
29"""
from __future__ import annotations

import logging

from flask import Blueprint, Response, g, jsonify, request, stream_with_context

from integrations.social.auth import require_auth
from integrations.social.hosting_reward_service import HostingRewardService
from integrations.social.models import get_db

logger = logging.getLogger(__name__)

compute_earnings_bp = Blueprint(
    'compute_earnings', __name__, url_prefix='/api/compute/earnings',
)
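
# Hook-up sketch (illustrative -- `app` and the factory call are
# assumptions; the real mount point lives wherever the other social
# blueprints are registered):
#
#   from flask import Flask
#   app = Flask(__name__)
#   app.register_blueprint(compute_earnings_bp)
#   # all three routes now live under /api/compute/earnings/...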


# --- Estimate (pre-opt-in banner) ----------------------------------

# Tier names mirror security.system_requirements.NodeTierLevel -- the
# single source of truth. Built at import time so any future tier
# value automatically flows into the validator.
def _load_valid_tiers() -> tuple[str, ...]:
    try:
        from security.system_requirements import NodeTierLevel
        return tuple(t.value for t in NodeTierLevel)
    except Exception:
        # Fallback covers the current ladder; keeps the module
        # importable even when system_requirements can't load.
        return (
            'embedded', 'observer', 'lite', 'standard', 'full', 'compute_host',
        )


_VALID_TIERS = _load_valid_tiers()


def _parse_bool(raw: str) -> bool:
    return str(raw or '').strip().lower() in ('1', 'true', 'yes', 'y', 'on')


@compute_earnings_bp.route('/estimate', methods=['GET'])
def estimate():
    """Pre-opt-in tier-typical Spark/week estimate.

    Public -- the banner shows this BEFORE a user opts into compute
    contribution, so the dialog can say "you'd earn ~X this week".
    """
    tier = (request.args.get('tier') or 'standard').strip().lower()
    if tier not in _VALID_TIERS:
        tier = 'standard'
    has_gpu = _parse_bool(request.args.get('has_gpu'))
    try:
        weekly_hours = int(request.args.get('weekly_hours', 168))
    except (TypeError, ValueError):
        weekly_hours = 168
    weekly_hours = max(1, min(168, weekly_hours))

    data = HostingRewardService.estimate_weekly_spark(
        tier=tier, has_gpu=has_gpu, weekly_hours=weekly_hours,
    )
    return jsonify({'data': data})
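
# Illustrative exchange (the `data` envelope is all this route
# guarantees; its contents are whatever estimate_weekly_spark returns):
#
#   GET /api/compute/earnings/estimate?tier=full&has_gpu=1&weekly_hours=40
#   -> 200 {"data": {...}}
#
# Unknown tiers and out-of-range hours degrade to defaults instead of
# erroring, so the public banner never breaks on a malformed query.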


# --- Live earnings (opted-in user reads own ledger) ----------------

# The SQL `LIKE` patterns that mark a ResonanceTransaction as a
# compute / API earning. Today's only writer:
#   - revenue_aggregator.settle_metered_api_costs -> 'api_cost_recovery'
# When future writers ship (per-inference settle with 'compute:%' prefix
# or per-provider settle with 'api:%'), add the prefix here AND add the
# writer module to the WRITERS comment so the dependency stays explicit.
# WRITERS:
#   integrations/agent_engine/revenue_aggregator.py:settle_metered_api_costs
_EARNING_SOURCE_PREFIXES = ('api_cost_recovery%',)
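
# Matching is plain SQL LIKE, so 'api_cost_recovery%' accepts the bare
# 'api_cost_recovery' as well as any suffixed variant (say, a
# hypothetical 'api_cost_recovery:openai'), while 'compute:inference'
# stays excluded until a 'compute:%' pattern is added above.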


@compute_earnings_bp.route('/<uid>', methods=['GET'])
@require_auth
def list_earnings(uid):
    """Recent compute/API earnings for the authenticated user.

    Auth: the requester MUST equal `uid` -- no horizontal escalation.
    Returns up to `limit` (max 100) settled rows from the past `days`
    (max 90) days.
    """
    requester_id = getattr(g.user, 'id', None)
    if not requester_id or str(requester_id) != str(uid):
        return jsonify({'error': 'forbidden'}), 403

    try:
        days = min(90, max(1, int(request.args.get('days', 7))))
    except (TypeError, ValueError):
        days = 7
    try:
        limit = min(100, max(1, int(request.args.get('limit', 20))))
    except (TypeError, ValueError):
        limit = 20

    db = get_db()
    try:
        from datetime import datetime, timedelta
        from sqlalchemy import or_
        from integrations.social.models import ResonanceTransaction

        cutoff = datetime.utcnow() - timedelta(days=days)
        q = db.query(ResonanceTransaction).filter(
            ResonanceTransaction.user_id == str(uid),
            ResonanceTransaction.currency == 'spark',
            ResonanceTransaction.created_at >= cutoff,
            or_(*[
                ResonanceTransaction.source_type.like(p)
                for p in _EARNING_SOURCE_PREFIXES
            ]),
        ).order_by(ResonanceTransaction.created_at.desc()).limit(limit)

        rows = []
        total = 0
        for tx in q.all():
            amount = int(tx.amount or 0)
            total += amount
            rows.append({
                'id': getattr(tx, 'id', None),
                'amount_spark': amount,
                'source_type': tx.source_type,
                'source_id': tx.source_id,
                'description': tx.description,
                'created_at': tx.created_at.isoformat() if tx.created_at else None,
            })
        return jsonify({
            'data': rows,
            'meta': {
                'count': len(rows),
                'total_spark_in_window': total,
                'days': days,
            },
        })
    except Exception as exc:
        logger.debug(f'/api/compute/earnings list failed: {exc}')
        return jsonify({'error': 'lookup failed'}), 500
    finally:
        db.close()
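
# Illustrative response (values invented; the field set matches the row
# dict built above):
#
#   GET /api/compute/earnings/u123?days=7&limit=20
#   -> 200 {"data": [{"id": 9812, "amount_spark": 14,
#                     "source_type": "api_cost_recovery", "source_id": "...",
#                     "description": "...", "created_at": "2026-05-11T18:02:11"}],
#           "meta": {"count": 1, "total_spark_in_window": 14, "days": 7}}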


# --- SSE -- live drawer fan-out -------------------------------------

@compute_earnings_bp.route('/stream', methods=['GET'])
@require_auth
def stream():
    """Server-Sent Events feed for the live earnings drawer.

    AUTH REQUIRED. Subscribes to platform EventBus topic
    `compute.task_settled` (emitted by
    revenue_aggregator.settle_metered_api_costs after every Spark
    write). Same shape as HiveContest's
    /api/hive/contest/ideas/stream -- single canonical SSE pattern.

    Privacy: the upstream payload contains a raw `operator_id` (the
    PeerNode operator user id). This route filters server-side so
    only events whose `operator_id` matches the authenticated session
    are forwarded to the client. Client-side filtering alone would
    have leaked every node settlement to any anonymous listener --
    caught in self-review before commit.
    """
    user = getattr(g, 'user', None)
    requester_id = getattr(user, 'id', None) if user else None
    if not requester_id:
        return jsonify({'error': 'auth required'}), 401
    requester_id = str(requester_id)

    import json as _json
    import queue as _queue
    import time as _time

    q: _queue.Queue = _queue.Queue(maxsize=200)

    # Same registry-based bus accessor used by the api_hive_contest stream.
    try:
        from core.platform.registry import get_registry
        _reg = get_registry()
        bus = _reg.get('events') if _reg.has('events') else None
    except Exception:
        bus = None

    def _on_event(_topic, payload):
        # Defense-in-depth scope guard: forward only this user's
        # settlement events. Anonymous listeners cannot reach this
        # closure (the route is auth'd), but the explicit filter prevents
        # a logged-in user from seeing other users' rows even if the
        # auth layer is later bypassed.
        try:
            if payload and str(payload.get('operator_id') or '') == requester_id:
                q.put_nowait(payload)
        except _queue.Full:
            pass

    subscribed = False
    if bus is not None:
        try:
            bus.on('compute.task_settled', _on_event)
            subscribed = True
        except Exception as exc:
            logger.debug(f'compute earnings SSE subscribe failed: {exc}')

    def _generate():
        # The initial keepalive flushes proxies and lets the client
        # confirm the connection.
        yield 'event: ping\ndata: {}\n\n'
        last_ping = _time.time()
        try:
            while True:
                try:
                    payload = q.get(timeout=5.0)
                    yield (
                        f'event: compute.task_settled\n'
                        f'data: {_json.dumps(payload)}\n\n'
                    )
                except _queue.Empty:
                    pass
                if _time.time() - last_ping > 15:
                    yield 'event: ping\ndata: {}\n\n'
                    last_ping = _time.time()
        finally:
            if subscribed and bus is not None:
                try:
                    bus.off('compute.task_settled', _on_event)
                except Exception:
                    pass

    headers = {
        'Content-Type': 'text/event-stream',
        'Cache-Control': 'no-cache',
        'X-Accel-Buffering': 'no',
    }
    return Response(stream_with_context(_generate()), headers=headers)
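
# Wire format produced by _generate: a keepalive ping on connect and at
# least every ~15s, plus one event per settlement forwarded verbatim
# from the bus. Payload fields beyond `operator_id` are whatever
# revenue_aggregator emits; the extra field shown here is hypothetical.
#
#   event: ping
#   data: {}
#
#   event: compute.task_settled
#   data: {"operator_id": "u123", "amount_spark": 14, ...}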