Coverage for integrations / social / hosting_reward_service.py: 73.9%
176 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial - Hosting Reward Service
3Computes contribution scores, manages visibility tiers,
4distributes rewards to peer node operators.
5"""
6import logging
7from datetime import datetime, timedelta
8from typing import Optional, Dict, List
10from sqlalchemy import desc, func
11from sqlalchemy.orm import Session
13from .models import PeerNode, AdImpression, HostingReward, User
14from .resonance_engine import ResonanceService
# Logger shared by the HevolveSocial package.
logger = logging.getLogger('hevolve_social')

# --- Constants ---

# Points granted per unit of each contribution metric when scoring a node.
SCORE_WEIGHTS = dict(
    uptime_ratio=100.0,       # uptime ratio (0.0-1.0) scaled to 0-100 points
    agent_count=2.0,          # per agent hosted
    post_count=0.5,           # per post served
    ad_impressions=0.1,       # per ad impression served
    gpu_hours=5.0,            # per GPU-hour served
    inferences=0.01,          # per inference
    energy_kwh=2.0,           # per kWh contributed
    api_costs_absorbed=10.0,  # per USD of metered API cost absorbed for hive
)

# Minimum contribution score needed to reach each visibility tier.
TIER_THRESHOLDS = dict(standard=0, featured=100, priority=500)

# agent_count thresholds at which a hosting milestone is awarded.
HOSTING_MILESTONES = [10, 50, 100, 500]
# Operator share of ad revenue.  The canonical value is owned by the
# revenue aggregator; when that module cannot be imported we fall back
# to the same figure locally.
try:
    from integrations.agent_engine.revenue_aggregator import REVENUE_SPLIT_USERS
except ImportError:
    HOSTER_REVENUE_SHARE = 0.90
else:
    HOSTER_REVENUE_SHARE = REVENUE_SPLIT_USERS  # 0.90 (was 0.70)

# Spark-per-USD conversion rate, resolved once at module load with the
# same fallback discipline as above, so estimate_weekly_spark() never
# pays the import cost per call.
try:
    from integrations.agent_engine.revenue_aggregator import SPARK_PER_USD as _SPARK_PER_USD
except ImportError:
    _SPARK_PER_USD = 100.0
54class HostingRewardService:
56 # --- Contribution Scoring ---
58 @staticmethod
def compute_contribution_score(db: Session, node_id: str,
                               period_days: int = 7) -> Optional[Dict]:
    """Recalculate a node's contribution score and visibility tier.

    Looks up the PeerNode for ``node_id``, converts each contribution
    metric into points via ``SCORE_WEIGHTS``, stores the rounded total and
    the resulting tier on the node, and flushes the session.

    Args:
        db: Session used for the lookups and the flush.
        node_id: Identifier of the PeerNode to score.
        period_days: Look-back window (days) used only for counting ad
            impressions; every other metric is read from the totals
            stored on the node.

    Returns:
        ``None`` when the node does not exist, otherwise a dict with
        ``node_id``, ``score``, ``tier``, ``previous_tier`` and a
        per-metric ``breakdown`` of points.
    """
    node = db.query(PeerNode).filter_by(node_id=node_id).first()
    if not node:
        return None

    uptime = HostingRewardService.compute_uptime_ratio(db, node)

    # Ad impressions this node served inside the look-back window.
    window_start = datetime.utcnow() - timedelta(days=period_days)
    impressions = db.query(func.count(AdImpression.id)).filter(
        AdImpression.node_id == node_id,
        AdImpression.created_at >= window_start,
    ).scalar() or 0

    # Points per metric, in the same order they are summed and reported.
    points = {
        'uptime': uptime * SCORE_WEIGHTS['uptime_ratio'],
        'agents': (node.agent_count or 0) * SCORE_WEIGHTS['agent_count'],
        'posts': (node.post_count or 0) * SCORE_WEIGHTS['post_count'],
        'ad_impressions': impressions * SCORE_WEIGHTS['ad_impressions'],
        'gpu_hours': (node.gpu_hours_served or 0) * SCORE_WEIGHTS['gpu_hours'],
        'inferences': (node.total_inferences or 0) * SCORE_WEIGHTS['inferences'],
        'energy_kwh': (node.energy_kwh_contributed or 0) * SCORE_WEIGHTS['energy_kwh'],
        'api_costs_absorbed': (
            (node.metered_api_costs_absorbed or 0)
            * SCORE_WEIGHTS['api_costs_absorbed']
        ),
    }
    total = sum(points.values())

    previous_tier = node.visibility_tier
    node.contribution_score = round(total, 2)
    # The tier is decided on the unrounded total.
    node.visibility_tier = HostingRewardService._determine_tier(total)
    db.flush()

    # Only the uptime entry is rounded in the reported breakdown.
    points['uptime'] = round(points['uptime'], 2)
    return {
        'node_id': node_id,
        'score': node.contribution_score,
        'tier': node.visibility_tier,
        'previous_tier': previous_tier,
        'breakdown': points,
    }
108 @staticmethod
def compute_uptime_ratio(db: Session, peer: PeerNode) -> float:
    """Translate a node's status into an uptime ratio.

    Args:
        db: Unused here; kept so the signature matches the other helpers.
        peer: Node whose ``status`` attribute is inspected.

    Returns:
        1.0 for ``'active'``, 0.5 for ``'stale'``, and 0.0 for any other
        status.
    """
    ratio_by_status = {'active': 1.0, 'stale': 0.5}
    return ratio_by_status.get(peer.status, 0.0)
117 @staticmethod
def _determine_tier(score: float) -> str:
    """Return the visibility tier earned by ``score``.

    Tiers are checked from highest to lowest against ``TIER_THRESHOLDS``;
    a score below the 'featured' threshold yields 'standard'.
    """
    for tier in ('priority', 'featured'):
        if score >= TIER_THRESHOLDS[tier]:
            return tier
    return 'standard'
125 @staticmethod
def compute_all_scores(db: Session, period_days: int = 7) -> List[Dict]:
    """Rescore every PeerNode whose status is 'active' or 'stale'.

    Args:
        db: Session used to list the nodes and to score each one.
        period_days: Passed through to ``compute_contribution_score``.

    Returns:
        The non-empty score results, in the order the nodes were returned
        by the query.
    """
    live_nodes = db.query(PeerNode).filter(
        PeerNode.status.in_(['active', 'stale'])
    ).all()
    # Each node is scored by id, so compute_contribution_score looks it
    # up again; falsy results (e.g. None) are dropped from the output.
    scored = (
        HostingRewardService.compute_contribution_score(
            db, node.node_id, period_days)
        for node in live_nodes
    )
    return [outcome for outcome in scored if outcome]
139 # --- Pre-opt-in Earnings Estimate (idle_compute_workstream G3) ---
# Typical weekly hardware profile per node tier, used only by the
# pre-opt-in earnings estimate.  Keys are `NodeTierLevel.value` names
# from security.system_requirements (the single source of truth for
# tier names -- no parallel ladder).  The numbers are inputs to
# SCORE_WEIGHTS, so retuning the weights flows through automatically.
# Calibration: conservative top-quartile-of-observed estimates, bumped
# up at compute_host because that tier is explicitly built for serving
# regional inference traffic.
#
# Row layout: (tier, gpu_factor, agents, inf_per_week, kwh_per_week)
_TIER_PROFILE = {
    tier_name: {
        'gpu_factor': gpu_factor,
        'agents': agents,
        'inf_per_week': inferences,
        'kwh_per_week': kwh,
    }
    for tier_name, gpu_factor, agents, inferences, kwh in (
        ('embedded',     0.0, 0,  50,    0.2),
        ('observer',     0.0, 0,  100,   0.5),
        ('lite',         0.0, 1,  200,   1.0),
        ('standard',     0.5, 3,  1000,  2.5),
        ('full',         1.0, 5,  5000,  6.0),
        ('compute_host', 1.0, 12, 50000, 25.0),
    )
}
157 @staticmethod
def estimate_weekly_spark(
    tier: str = 'standard',
    has_gpu: bool = False,
    weekly_hours: int = 168,
) -> Dict:
    """Pre-opt-in Spark estimate for the self-advertising banner.

    Pure function over `SCORE_WEIGHTS` + the tier profile -- no DB read,
    no parallel scoring path.  The same weights drive realized scoring in
    `compute_contribution_score`, so any retune flows through both
    surfaces atomically.

    Args:
        tier: a `NodeTierLevel.value` -- embedded|observer|lite|
            standard|full|compute_host.  An unknown tier is estimated
            with the 'standard' profile, but is still echoed back
            unchanged in the result.
        has_gpu: when True, GPU hours are the online hours multiplied by
            the tier's gpu_factor; embedded/observer/lite have a factor
            of 0, so they earn no GPU points even with a GPU.
        weekly_hours: how many hours per week the node is online.
            168 = always-on; 40 = office-hours only.  Values outside
            0-168 are clamped to that range for both the uptime and the
            GPU-hours terms.

    Returns:
        {'weekly_spark': int, 'weekly_usd': float,
         'breakdown': {uptime, agents, gpu_hours, inferences,
                       energy_kwh},
         'tier': tier, 'assumptions': dict}.  ``assumptions['weekly_hours']``
        reports the clamped hours actually used.
    """
    hours_in_week = 168.0

    profiles = HostingRewardService._TIER_PROFILE
    profile = profiles.get(tier, profiles['standard'])

    # Clamp once and reuse: a node cannot be online for more than a full
    # week or for a negative duration.  Previously only the uptime term
    # was clamped, so out-of-range input produced impossible (or
    # negative) GPU-hour points.
    online_hours = max(0.0, min(hours_in_week, float(weekly_hours)))
    uptime_ratio = online_hours / hours_in_week

    # GPU hours over the week -- only when has_gpu AND the tier supports it.
    gpu_hours = online_hours * profile['gpu_factor'] if has_gpu else 0.0

    # Inference and energy figures are the profile's weekly totals; they
    # are not scaled by the online hours.
    breakdown = {
        'uptime': round(uptime_ratio * SCORE_WEIGHTS['uptime_ratio'], 2),
        'agents': profile['agents'] * SCORE_WEIGHTS['agent_count'],
        'gpu_hours': round(gpu_hours * SCORE_WEIGHTS['gpu_hours'], 2),
        'inferences': round(
            profile['inf_per_week'] * SCORE_WEIGHTS['inferences'], 2),
        'energy_kwh': round(
            profile['kwh_per_week'] * SCORE_WEIGHTS['energy_kwh'], 2),
    }
    weekly_score = sum(breakdown.values())

    # Score -> Spark: 1 score-point ~= 1 Spark in current calibration
    # (revenue_aggregator's 90/9/1 split applies at distribution time,
    # not here -- this is gross).  USD conversion uses the canonical
    # SPARK_PER_USD constant resolved at module load; a falsy value falls
    # back to 100.
    spark_per_usd = float(_SPARK_PER_USD or 100)
    usd_per_spark = 1.0 / spark_per_usd

    return {
        'tier': tier,
        'weekly_spark': int(round(weekly_score)),
        'weekly_usd': round(weekly_score * usd_per_spark, 2),
        'breakdown': breakdown,
        'assumptions': {
            'has_gpu': bool(has_gpu),
            'weekly_hours': int(online_hours),
            'gpu_factor': profile['gpu_factor'],
            'inferences_per_week': profile['inf_per_week'],
            'energy_kwh_per_week': profile['kwh_per_week'],
            'spark_per_usd': spark_per_usd,
        },
    }
222 # --- Reward Distribution ---
224 @staticmethod
def distribute_ad_revenue(db: Session, node_id: str,
                          period: str = 'daily') -> Optional[Dict]:
    """Pay a node operator their share of recent ad revenue.

    Counts the ad impressions the node served in the last 24 hours,
    converts them to Spark at a flat per-impression rate multiplied by
    ``HOSTER_REVENUE_SHARE`` (minimum payout 1 Spark), credits the
    operator, records a HostingReward row, and adds a bonus of 1 Spark
    per full 100 impressions.  The recorded reward amount covers the
    revenue share only, not the batch bonus.

    Args:
        db: Session used for queries, the new reward row and the flush.
        node_id: Identifier of the node whose impressions are paid out.
        period: Label stored on the reward row and in the audit entry.
            It does not change the impression window, which is always
            the last 24 hours.

    Returns:
        ``reward.to_dict()`` for the recorded reward, or ``None`` when
        the node is unknown, has no operator, or served no impressions
        in the window.
    """
    # NOTE(review): nothing here prevents a second call inside the same
    # 24h window from paying the same impressions again -- confirm the
    # caller runs this at most once per window.

    # Flat Spark value per impression; no per-ad pricing is applied.
    default_cpi = 0.1

    peer = db.query(PeerNode).filter_by(node_id=node_id).first()
    if not peer or not peer.node_operator_id:
        return None

    # Count impressions served by this node in the last 24h.
    cutoff = datetime.utcnow() - timedelta(hours=24)
    imp_count = db.query(func.count(AdImpression.id)).filter(
        AdImpression.node_id == node_id,
        AdImpression.created_at >= cutoff,
    ).scalar() or 0
    if imp_count == 0:
        return None

    # Revenue: default CPI * impressions * hoster share, at least 1 Spark.
    revenue_spark = max(
        1, int(imp_count * default_cpi * HOSTER_REVENUE_SHARE))

    # Credit operator.
    ResonanceService.award_spark(
        db, peer.node_operator_id, revenue_spark,
        'ad_revenue', node_id,
        f'Ad revenue: {imp_count} impressions served')

    # Record hosting reward.
    reward = HostingReward(
        node_id=node_id,
        operator_id=peer.node_operator_id,
        amount=revenue_spark,
        currency='spark',
        period=period,
        reason=f'Ad revenue share: {imp_count} impressions',
        ad_impressions_count=imp_count,
        uptime_ratio=HostingRewardService.compute_uptime_ratio(db, peer),
        contribution_score_snapshot=peer.contribution_score or 0,
    )
    db.add(reward)

    # Batch impression bonus (1 Spark per 100 impressions).
    batches = imp_count // 100
    if batches > 0:
        ResonanceService.award_spark(
            db, peer.node_operator_id, batches,
            'ad_impression_served', node_id,
            f'Batch impression bonus: {batches} x 100')

    db.flush()

    # Immutable audit trail for ad revenue distribution.  A failure here
    # must not block the distribution, but it is logged so a missing
    # audit entry can be noticed.
    try:
        from security.immutable_audit_log import get_audit_log
        get_audit_log().log_event(
            'revenue_distribution',
            actor_id='hosting_reward_service',
            action=f'Ad revenue distributed to {peer.node_operator_id}',
            detail={
                'node_id': node_id,
                'operator_id': peer.node_operator_id,
                'spark_amount': revenue_spark,
                'impressions': imp_count,
                'period': period,
            },
            target_id=node_id,
        )
    except Exception:
        logger.warning(
            'Audit log write failed for ad revenue distribution on node %s',
            node_id, exc_info=True)

    return reward.to_dict()
300 @staticmethod
def distribute_uptime_bonus(db: Session, node_id: str) -> Optional[Dict]:
    """Award the daily uptime bonus to the operator of an active node.

    The node must have status 'active', an operator, and -- when
    ``last_seen`` is set -- have been seen within the last 5 minutes.
    At most one uptime bonus is recorded per node per UTC day.

    Args:
        db: Session used for queries, the new reward row and the flush.
        node_id: Identifier of the node to reward.

    Returns:
        ``reward.to_dict()`` for the recorded bonus, or ``None`` when the
        node is not eligible or was already rewarded today.
    """
    # Single source for the amounts used by the awards, the reward row
    # and the audit entry, so they cannot drift apart.
    spark_bonus, pulse_bonus, xp_bonus = 10, 5, 20
    max_last_seen_age_s = 300  # 5 minutes

    peer = db.query(PeerNode).filter_by(
        node_id=node_id, status='active').first()
    if not peer or not peer.node_operator_id:
        return None

    # Check: last_seen within 5 minutes (alive).
    # NOTE(review): a node with no last_seen value skips this check and
    # remains eligible -- confirm that is intended.
    if peer.last_seen:
        age = (datetime.utcnow() - peer.last_seen).total_seconds()
        if age > max_last_seen_age_s:
            return None  # Not truly active

    # Check if already awarded today (since midnight UTC).
    today_start = datetime.utcnow().replace(
        hour=0, minute=0, second=0, microsecond=0)
    existing = db.query(HostingReward).filter(
        HostingReward.node_id == node_id,
        HostingReward.period == 'daily',
        HostingReward.reason.like('Uptime bonus%'),
        HostingReward.created_at >= today_start,
    ).first()
    if existing:
        return None  # Already awarded today

    # Award: 10 Spark + 5 Pulse + 20 XP.
    ResonanceService.award_spark(
        db, peer.node_operator_id, spark_bonus, 'hosting_uptime_bonus',
        node_id, 'Daily uptime bonus')
    ResonanceService.award_pulse(
        db, peer.node_operator_id, pulse_bonus, 'hosting_uptime_bonus',
        node_id, 'Daily uptime bonus')
    ResonanceService.award_xp(
        db, peer.node_operator_id, xp_bonus, 'hosting_uptime_bonus',
        node_id, 'Daily uptime bonus')

    # The reason must keep its 'Uptime bonus' prefix: the once-per-day
    # check above matches on it.
    reward = HostingReward(
        node_id=node_id,
        operator_id=peer.node_operator_id,
        amount=spark_bonus, currency='spark',
        period='daily',
        reason='Uptime bonus: 100% active',
        uptime_ratio=1.0,
        contribution_score_snapshot=peer.contribution_score or 0,
    )
    db.add(reward)
    db.flush()

    # Immutable audit trail for uptime bonus distribution.  A failure
    # here must not block the distribution, but it is logged so a
    # missing audit entry can be noticed.
    try:
        from security.immutable_audit_log import get_audit_log
        get_audit_log().log_event(
            'revenue_distribution',
            actor_id='hosting_reward_service',
            action=f'Uptime bonus distributed to {peer.node_operator_id}',
            detail={
                'node_id': node_id,
                'operator_id': peer.node_operator_id,
                'spark_amount': spark_bonus,
                'pulse_amount': pulse_bonus,
                'xp_amount': xp_bonus,
                'reason': 'daily_uptime_bonus',
            },
            target_id=node_id,
        )
    except Exception:
        logger.warning(
            'Audit log write failed for uptime bonus on node %s',
            node_id, exc_info=True)

    return reward.to_dict()
370 @staticmethod
def check_milestones(db: Session, node_id: str) -> Optional[Dict]:
    """Award any hosting milestones the node has reached but not yet received.

    For every entry in ``HOSTING_MILESTONES`` that the node's agent count
    meets, and for which no milestone reward row exists yet, the operator
    receives 50 Spark, 25 Pulse and 100 XP and a HostingReward row is
    added.

    Args:
        db: Session used for queries, new reward rows and the flush.
        node_id: Identifier of the node to check.

    Returns:
        ``to_dict()`` of the last milestone reward created in this call
        (earlier ones created in the same call are stored but not
        returned), or ``None`` when the node is unknown, has no operator,
        or nothing new was awarded.
    """
    node = db.query(PeerNode).filter_by(node_id=node_id).first()
    if not node or not node.node_operator_id:
        return None

    hosted_agents = node.agent_count or 0
    latest_reward = None

    for milestone in HOSTING_MILESTONES:
        if hosted_agents < milestone:
            continue

        # The reward row's reason doubles as the "already awarded" key.
        reward_reason = f'Hosting milestone: {milestone} agents'
        already_granted = db.query(HostingReward).filter(
            HostingReward.node_id == node_id,
            HostingReward.period == 'milestone',
            HostingReward.reason == reward_reason,
        ).first()
        if already_granted:
            continue

        # Spark, then Pulse, then XP -- all with the same description.
        award_note = f'Milestone: {milestone} agents hosted'
        for grant, amount in (
            (ResonanceService.award_spark, 50),
            (ResonanceService.award_pulse, 25),
            (ResonanceService.award_xp, 100),
        ):
            grant(db, node.node_operator_id, amount,
                  'hosting_milestone', node_id, award_note)

        latest_reward = HostingReward(
            node_id=node_id,
            operator_id=node.node_operator_id,
            amount=50, currency='spark',
            period='milestone',
            reason=reward_reason,
            contribution_score_snapshot=node.contribution_score or 0,
        )
        db.add(latest_reward)

    if not latest_reward:
        return None
    db.flush()
    return latest_reward.to_dict()
# --- Compute Stats Aggregation ---
425 @staticmethod
def aggregate_compute_stats(db: Session, node_id: str,
                            period_hours: int = 24) -> Optional[Dict]:
    """Fold recent metered API usage into a node's cumulative stats.

    Reads the node's MeteredAPIUsage rows from the last ``period_hours``
    whose ``task_source`` is not 'own', then adds the derived figures to
    the PeerNode's ``total_inferences``, ``gpu_hours_served``,
    ``energy_kwh_contributed`` and ``metered_api_costs_absorbed`` and
    flushes the session.

    Args:
        db: Session used for the queries and the flush.
        node_id: Identifier of the node to update.
        period_hours: Look-back window, in hours.

    Returns:
        A dict of the increments that were added, or ``None`` when the
        node does not exist or any step of the aggregation raised (the
        failure is logged at debug level).
    """
    # NOTE(review): each call adds the whole window to the running
    # totals, so overlapping windows would count the same rows twice --
    # confirm callers use non-overlapping periods.
    node = db.query(PeerNode).filter_by(node_id=node_id).first()
    if not node:
        return None

    try:
        from .models import MeteredAPIUsage

        window_start = datetime.utcnow() - timedelta(hours=period_hours)
        rows = db.query(MeteredAPIUsage).filter(
            MeteredAPIUsage.node_id == node_id,
            MeteredAPIUsage.task_source != 'own',
            MeteredAPIUsage.created_at >= window_start,
        ).all()

        # One pass over the rows: every row counts as one inference.
        inference_count = 0
        usd_absorbed = 0
        total_tokens = 0
        for row in rows:
            inference_count += 1
            usd_absorbed += row.actual_usd_cost or 0
            total_tokens += (row.tokens_in or 0) + (row.tokens_out or 0)

        # Estimate GPU hours: ~1 GPU-second per 1K tokens (rough heuristic).
        gpu_seconds = total_tokens / 1000.0
        gpu_hours_delta = gpu_seconds / 3600.0

        # Estimate energy: use ModelRegistry if available, else assume a
        # 170W TDP for the estimated GPU time.
        try:
            from integrations.agent_engine.model_registry import model_registry
            energy_delta = model_registry.get_total_energy_kwh(
                hours=period_hours)
        except Exception:
            energy_delta = gpu_hours_delta * 0.170  # 170W default TDP

        node.total_inferences = (node.total_inferences or 0) + inference_count
        node.gpu_hours_served = round(
            (node.gpu_hours_served or 0) + gpu_hours_delta, 4)
        node.energy_kwh_contributed = round(
            (node.energy_kwh_contributed or 0) + energy_delta, 4)
        node.metered_api_costs_absorbed = round(
            (node.metered_api_costs_absorbed or 0) + usd_absorbed, 4)
        db.flush()

        increments = {
            'node_id': node_id,
            'period_hours': period_hours,
            'inferences_added': inference_count,
            'gpu_hours_added': round(gpu_hours_delta, 4),
            'energy_kwh_added': round(energy_delta, 4),
            'usd_absorbed_added': round(usd_absorbed, 4),
        }
        return increments
    except Exception as exc:
        logger.debug(f"Compute stats aggregation failed: {exc}")
        return None
481 # --- Queries ---
483 @staticmethod
def get_rewards(db: Session, node_id: str = None,
                operator_id: str = None,
                limit: int = 50, offset: int = 0) -> List[Dict]:
    """List hosting rewards, newest first.

    Args:
        db: Session used for the query.
        node_id: When truthy, only rewards for this node are returned.
        operator_id: When truthy, only rewards for this operator are
            returned.
        limit: Maximum number of rewards to return.
        offset: Number of rewards to skip before the first result.

    Returns:
        ``to_dict()`` of each matching reward, ordered by ``created_at``
        descending.
    """
    query = db.query(HostingReward)
    # Apply each optional filter only when a value was supplied.
    for column, wanted in (('node_id', node_id),
                           ('operator_id', operator_id)):
        if wanted:
            query = query.filter_by(**{column: wanted})

    newest_first = query.order_by(desc(HostingReward.created_at))
    page = newest_first.offset(offset).limit(limit).all()
    return [reward.to_dict() for reward in page]
495 @staticmethod
def get_leaderboard(db: Session, limit: int = 50,
                    offset: int = 0) -> List[Dict]:
    """Return active/stale nodes ranked by contribution score, highest first.

    Args:
        db: Session used for the query.
        limit: Maximum number of nodes to return.
        offset: Number of ranked nodes to skip before the first result.

    Returns:
        ``to_dict()`` of each node on the requested page.
    """
    ranked = (
        db.query(PeerNode)
        .filter(PeerNode.status.in_(['active', 'stale']))
        .order_by(desc(PeerNode.contribution_score))
    )
    page = ranked.offset(offset).limit(limit).all()
    return [node.to_dict() for node in page]
503 @staticmethod
def get_reward_summary(db: Session, node_id: str) -> Dict:
    """Summarise a node's standing and the rewards recorded for it.

    Args:
        db: Session used for the queries.
        node_id: Identifier of the node to summarise.

    Returns:
        ``{'error': 'Node not found'}`` when the node does not exist;
        otherwise a dict with the node's score, tier, total Spark
        recorded in HostingReward rows, number of reward rows, agent and
        post counts, and status.
    """
    node = db.query(PeerNode).filter_by(node_id=node_id).first()
    if not node:
        return {'error': 'Node not found'}

    # Sum of Spark-denominated reward amounts; COALESCE yields 0 when the
    # node has no such rows.
    spark_sum = func.coalesce(func.sum(HostingReward.amount), 0)
    spark_total = db.query(spark_sum).filter(
        HostingReward.node_id == node_id,
        HostingReward.currency == 'spark',
    ).scalar()

    # Number of reward rows of any currency for this node.
    rewards_recorded = db.query(
        func.count(HostingReward.id)
    ).filter_by(node_id=node_id).scalar()

    summary = {
        'node_id': node_id,
        'contribution_score': node.contribution_score or 0,
        'visibility_tier': node.visibility_tier or 'standard',
        'total_spark_earned': int(spark_total),
        'total_rewards': rewards_recorded,
        'agent_count': node.agent_count or 0,
        'post_count': node.post_count or 0,
        'status': node.status,
    }
    return summary