Coverage for integrations / social / hosting_reward_service.py: 73.9%

176 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Hosting Reward Service 

3Computes contribution scores, manages visibility tiers, 

4distributes rewards to peer node operators. 

5""" 

6import logging 

7from datetime import datetime, timedelta 

8from typing import Optional, Dict, List 

9 

10from sqlalchemy import desc, func 

11from sqlalchemy.orm import Session 

12 

13from .models import PeerNode, AdImpression, HostingReward, User 

14from .resonance_engine import ResonanceService 

15 

# Shared logger for the social integration layer.
logger = logging.getLogger('hevolve_social')

# --- Constants ---

# Point multipliers applied to raw PeerNode metrics when computing the
# contribution score (see HostingRewardService.compute_contribution_score;
# estimate_weekly_spark reuses the same weights for pre-opt-in estimates).
SCORE_WEIGHTS = {
    'uptime_ratio': 100.0,  # uptime 0.0-1.0 * 100 = 0-100 points
    'agent_count': 2.0,  # 2 points per agent hosted
    'post_count': 0.5,  # 0.5 points per post served
    'ad_impressions': 0.1,  # 0.1 points per ad impression served
    'gpu_hours': 5.0,  # 5 points per GPU-hour served
    'inferences': 0.01,  # 0.01 points per inference
    'energy_kwh': 2.0,  # 2 points per kWh contributed
    'api_costs_absorbed': 10.0,  # 10 points per USD metered API absorbed for hive
}

# Minimum contribution score for each visibility tier
# (see HostingRewardService._determine_tier).
TIER_THRESHOLDS = {
    'standard': 0,
    'featured': 100,
    'priority': 500,
}

# One-time milestone payouts trigger at these thresholds
# (see HostingRewardService.check_milestones).
HOSTING_MILESTONES = [10, 50, 100, 500]  # agent_count thresholds

# Operator's share of ad revenue: sourced from the canonical
# revenue_aggregator constant when available, static fallback otherwise.
try:
    from integrations.agent_engine.revenue_aggregator import REVENUE_SPLIT_USERS
    HOSTER_REVENUE_SHARE = REVENUE_SPLIT_USERS  # 0.90 (was 0.70)
except ImportError:
    HOSTER_REVENUE_SHARE = 0.90

# Same canonical fallback discipline as REVENUE_SPLIT_USERS above --
# import once at module load so estimate_weekly_spark() does not pay
# the import cost on every call.
try:
    from integrations.agent_engine.revenue_aggregator import SPARK_PER_USD as _SPARK_PER_USD
except ImportError:
    _SPARK_PER_USD = 100.0

52 

53 

class HostingRewardService:
    """Contribution scoring, visibility tiers, and reward distribution
    for peer node operators."""

    # --- Contribution Scoring ---

    @staticmethod
    def compute_contribution_score(db: Session, node_id: str,
                                   period_days: int = 7) -> Optional[Dict]:
        """Compute and update contribution_score + visibility_tier for a PeerNode.

        Args:
            db: active SQLAlchemy session (flushed, not committed, here).
            node_id: PeerNode.node_id to score.
            period_days: lookback window for ad impressions served.

        Returns:
            Dict with score, tier, previous_tier and a per-component
            breakdown, or None when the node does not exist.
        """
        peer = db.query(PeerNode).filter_by(node_id=node_id).first()
        if not peer:
            return None

        uptime = HostingRewardService.compute_uptime_ratio(db, peer)

        # Ad impressions served by this node in the period
        cutoff = datetime.utcnow() - timedelta(days=period_days)
        ad_imp_count = db.query(func.count(AdImpression.id)).filter(
            AdImpression.node_id == node_id,
            AdImpression.created_at >= cutoff,
        ).scalar() or 0

        # Single source of truth for the per-metric terms: the score is the
        # sum of these components, so score and breakdown can never drift
        # apart (previously both were computed independently).
        components = {
            'uptime': uptime * SCORE_WEIGHTS['uptime_ratio'],
            'agents': (peer.agent_count or 0) * SCORE_WEIGHTS['agent_count'],
            'posts': (peer.post_count or 0) * SCORE_WEIGHTS['post_count'],
            'ad_impressions': ad_imp_count * SCORE_WEIGHTS['ad_impressions'],
            'gpu_hours': (peer.gpu_hours_served or 0) * SCORE_WEIGHTS['gpu_hours'],
            'inferences': (peer.total_inferences or 0) * SCORE_WEIGHTS['inferences'],
            'energy_kwh': (peer.energy_kwh_contributed or 0) * SCORE_WEIGHTS['energy_kwh'],
            'api_costs_absorbed': (peer.metered_api_costs_absorbed or 0) * SCORE_WEIGHTS['api_costs_absorbed'],
        }
        score = sum(components.values())

        old_tier = peer.visibility_tier
        peer.contribution_score = round(score, 2)
        peer.visibility_tier = HostingRewardService._determine_tier(score)
        db.flush()

        return {
            'node_id': node_id,
            'score': peer.contribution_score,
            'tier': peer.visibility_tier,
            'previous_tier': old_tier,
            # Rounded uniformly for display; raw values feed the score.
            'breakdown': {k: round(v, 2) for k, v in components.items()},
        }

107 

108 @staticmethod 

109 def compute_uptime_ratio(db: Session, peer: PeerNode) -> float: 

110 """Compute uptime ratio: active=1.0, stale=0.5, dead=0.0.""" 

111 if peer.status == 'active': 

112 return 1.0 

113 elif peer.status == 'stale': 

114 return 0.5 

115 return 0.0 

116 

117 @staticmethod 

118 def _determine_tier(score: float) -> str: 

119 if score >= TIER_THRESHOLDS['priority']: 

120 return 'priority' 

121 elif score >= TIER_THRESHOLDS['featured']: 

122 return 'featured' 

123 return 'standard' 

124 

125 @staticmethod 

126 def compute_all_scores(db: Session, period_days: int = 7) -> List[Dict]: 

127 """Batch: compute scores for all active/stale PeerNodes.""" 

128 peers = db.query(PeerNode).filter( 

129 PeerNode.status.in_(['active', 'stale']) 

130 ).all() 

131 results = [] 

132 for peer in peers: 

133 result = HostingRewardService.compute_contribution_score( 

134 db, peer.node_id, period_days) 

135 if result: 

136 results.append(result) 

137 return results 

138 

    # --- Pre-opt-in Earnings Estimate (idle_compute_workstream G3) ---

    # Tier-typical hardware profile keyed on `NodeTierLevel.value` from
    # security.system_requirements (single source of truth for tier
    # names -- no parallel ladder). Numbers are pure-function over
    # SCORE_WEIGHTS so any tweak there flows through automatically.
    # Calibration: conservative top-quartile-of-observed estimates;
    # bumped up at compute_host because that tier is explicitly built
    # for serving regional inference traffic.
    #
    # Per-tier keys (consumed by estimate_weekly_spark):
    #   gpu_factor:   fraction of online hours counted as GPU-hours
    #   agents:       typical number of hosted agents
    #   inf_per_week: typical inferences served per week
    #   kwh_per_week: typical energy contributed per week (kWh)
    _TIER_PROFILE = {
        'embedded': {'gpu_factor': 0.0, 'agents': 0, 'inf_per_week': 50, 'kwh_per_week': 0.2},
        'observer': {'gpu_factor': 0.0, 'agents': 0, 'inf_per_week': 100, 'kwh_per_week': 0.5},
        'lite': {'gpu_factor': 0.0, 'agents': 1, 'inf_per_week': 200, 'kwh_per_week': 1.0},
        'standard': {'gpu_factor': 0.5, 'agents': 3, 'inf_per_week': 1000, 'kwh_per_week': 2.5},
        'full': {'gpu_factor': 1.0, 'agents': 5, 'inf_per_week': 5000, 'kwh_per_week': 6.0},
        'compute_host': {'gpu_factor': 1.0, 'agents': 12,'inf_per_week': 50000, 'kwh_per_week': 25.0},
    }

156 

157 @staticmethod 

158 def estimate_weekly_spark( 

159 tier: str = 'standard', 

160 has_gpu: bool = False, 

161 weekly_hours: int = 168, 

162 ) -> Dict: 

163 """Pre-opt-in Spark estimate for the self-advertising banner. 

164 

165 Pure function over `SCORE_WEIGHTS` + tier profile -- no DB read, 

166 no parallel scoring path. The same weights drive realized 

167 scoring in `compute_contribution_score`, so any retune flows 

168 through both surfaces atomically. 

169 

170 Args: 

171 tier: a `NodeTierLevel.value` -- embedded|observer|lite| 

172 standard|full|compute_host (from 

173 `security.system_requirements.get_capabilities`). 

174 has_gpu: when True, multiplies the gpu_hours term by the 

175 tier's gpu_factor; embedded/observer/lite tiers 

176 default to 0 even with a GPU because the profile 

177 assumes they don't continuously serve GPU jobs. 

178 weekly_hours: how many hours per week the node is online. 

179 168 = always-on; 40 = office-hours only. 

180 

181 Returns: 

182 {'weekly_spark': int, 'weekly_usd': float, 

183 'breakdown': {gpu_hours, inferences, energy_kwh, agents, 

184 uptime}, 'tier': tier, 'assumptions': dict} 

185 """ 

186 profile = HostingRewardService._TIER_PROFILE.get( 

187 tier, HostingRewardService._TIER_PROFILE['standard']) 

188 # Uptime fraction over the week (168h) 

189 uptime_ratio = max(0.0, min(1.0, weekly_hours / 168.0)) 

190 # GPU hours over the week -- only when has_gpu AND tier supports it 

191 gpu_hours = weekly_hours * profile['gpu_factor'] if has_gpu else 0.0 

192 breakdown = { 

193 'uptime': round(uptime_ratio * SCORE_WEIGHTS['uptime_ratio'], 2), 

194 'agents': profile['agents'] * SCORE_WEIGHTS['agent_count'], 

195 'gpu_hours': round(gpu_hours * SCORE_WEIGHTS['gpu_hours'], 2), 

196 'inferences': round( 

197 profile['inf_per_week'] * SCORE_WEIGHTS['inferences'], 2), 

198 'energy_kwh': round( 

199 profile['kwh_per_week'] * SCORE_WEIGHTS['energy_kwh'], 2), 

200 } 

201 weekly_score = sum(breakdown.values()) 

202 # Score -> Spark: 1 score-point ~= 1 Spark in current calibration 

203 # (revenue_aggregator's 90/9/1 split applies at distribution 

204 # time, not here -- this is gross). USD conversion uses the 

205 # canonical SPARK_PER_USD constant imported at module load. 

206 usd_per_spark = 1.0 / float(_SPARK_PER_USD or 100) 

207 return { 

208 'tier': tier, 

209 'weekly_spark': int(round(weekly_score)), 

210 'weekly_usd': round(weekly_score * usd_per_spark, 2), 

211 'breakdown': breakdown, 

212 'assumptions': { 

213 'has_gpu': bool(has_gpu), 

214 'weekly_hours': int(weekly_hours), 

215 'gpu_factor': profile['gpu_factor'], 

216 'inferences_per_week': profile['inf_per_week'], 

217 'energy_kwh_per_week': profile['kwh_per_week'], 

218 'spark_per_usd': float(_SPARK_PER_USD or 100), 

219 }, 

220 } 

221 

222 # --- Reward Distribution --- 

223 

224 @staticmethod 

225 def distribute_ad_revenue(db: Session, node_id: str, 

226 period: str = 'daily') -> Optional[Dict]: 

227 """Distribute ad revenue to node operator for a period.""" 

228 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

229 if not peer or not peer.node_operator_id: 

230 return None 

231 

232 # Count impressions served by this node in the last 24h 

233 cutoff = datetime.utcnow() - timedelta(hours=24) 

234 imp_count = db.query(func.count(AdImpression.id)).filter( 

235 AdImpression.node_id == node_id, 

236 AdImpression.created_at >= cutoff, 

237 ).scalar() or 0 

238 

239 if imp_count == 0: 

240 return None 

241 

242 # Calculate revenue: avg CPI * impressions * hoster share 

243 avg_cpi = db.query(func.avg(AdImpression.ad.has())).scalar() or 0.1 

244 # Simpler: use default CPI 

245 revenue_spark = int(imp_count * 0.1 * HOSTER_REVENUE_SHARE) 

246 if revenue_spark < 1: 

247 revenue_spark = 1 

248 

249 # Credit operator 

250 ResonanceService.award_spark( 

251 db, peer.node_operator_id, revenue_spark, 

252 'ad_revenue', node_id, 

253 f'Ad revenue: {imp_count} impressions served') 

254 

255 # Record hosting reward 

256 reward = HostingReward( 

257 node_id=node_id, 

258 operator_id=peer.node_operator_id, 

259 amount=revenue_spark, 

260 currency='spark', 

261 period=period, 

262 reason=f'Ad revenue share: {imp_count} impressions', 

263 ad_impressions_count=imp_count, 

264 uptime_ratio=HostingRewardService.compute_uptime_ratio(db, peer), 

265 contribution_score_snapshot=peer.contribution_score or 0, 

266 ) 

267 db.add(reward) 

268 

269 # Batch impression bonus (1 Spark per 100 impressions) 

270 batches = imp_count // 100 

271 if batches > 0: 

272 ResonanceService.award_spark( 

273 db, peer.node_operator_id, batches, 

274 'ad_impression_served', node_id, 

275 f'Batch impression bonus: {batches} x 100') 

276 

277 db.flush() 

278 

279 # Immutable audit trail for ad revenue distribution 

280 try: 

281 from security.immutable_audit_log import get_audit_log 

282 get_audit_log().log_event( 

283 'revenue_distribution', 

284 actor_id='hosting_reward_service', 

285 action=f'Ad revenue distributed to {peer.node_operator_id}', 

286 detail={ 

287 'node_id': node_id, 

288 'operator_id': peer.node_operator_id, 

289 'spark_amount': revenue_spark, 

290 'impressions': imp_count, 

291 'period': period, 

292 }, 

293 target_id=node_id, 

294 ) 

295 except Exception: 

296 pass # Audit log failure must not block distribution 

297 

298 return reward.to_dict() 

299 

    @staticmethod
    def distribute_uptime_bonus(db: Session, node_id: str) -> Optional[Dict]:
        """Award daily bonus for 100% uptime (status='active').

        Awards 10 Spark + 5 Pulse + 20 XP to the node's operator at most
        once per UTC day, records a HostingReward row, and writes a
        best-effort immutable audit event.

        Returns:
            The HostingReward as a dict, or None when the node is not
            active, has no operator, failed the liveness check, or was
            already rewarded today.
        """
        peer = db.query(PeerNode).filter_by(
            node_id=node_id, status='active').first()
        if not peer or not peer.node_operator_id:
            return None

        # Check: last_seen within 5 minutes (alive)
        # NOTE(review): a node with last_seen unset skips this liveness
        # check entirely -- confirm that is intended.
        if peer.last_seen:
            age = (datetime.utcnow() - peer.last_seen).total_seconds()
            if age > 300:
                return None  # Not truly active

        # Check if already awarded today -- dedup keyed on the
        # 'Uptime bonus%' reason prefix and the UTC midnight boundary.
        today_start = datetime.utcnow().replace(hour=0, minute=0, second=0, microsecond=0)
        existing = db.query(HostingReward).filter(
            HostingReward.node_id == node_id,
            HostingReward.period == 'daily',
            HostingReward.reason.like('Uptime bonus%'),
            HostingReward.created_at >= today_start,
        ).first()
        if existing:
            return None  # Already awarded today

        # Award: 10 Spark + 5 Pulse + 20 XP
        ResonanceService.award_spark(
            db, peer.node_operator_id, 10, 'hosting_uptime_bonus', node_id,
            'Daily uptime bonus')
        ResonanceService.award_pulse(
            db, peer.node_operator_id, 5, 'hosting_uptime_bonus', node_id,
            'Daily uptime bonus')
        ResonanceService.award_xp(
            db, peer.node_operator_id, 20, 'hosting_uptime_bonus', node_id,
            'Daily uptime bonus')

        # Persist the Spark portion as the auditable HostingReward record;
        # its reason string must keep the 'Uptime bonus' prefix used by
        # the dedup query above.
        reward = HostingReward(
            node_id=node_id,
            operator_id=peer.node_operator_id,
            amount=10, currency='spark',
            period='daily',
            reason='Uptime bonus: 100% active',
            uptime_ratio=1.0,
            contribution_score_snapshot=peer.contribution_score or 0,
        )
        db.add(reward)
        db.flush()

        # Immutable audit trail for uptime bonus distribution
        try:
            from security.immutable_audit_log import get_audit_log
            get_audit_log().log_event(
                'revenue_distribution',
                actor_id='hosting_reward_service',
                action=f'Uptime bonus distributed to {peer.node_operator_id}',
                detail={
                    'node_id': node_id,
                    'operator_id': peer.node_operator_id,
                    'spark_amount': 10,
                    'pulse_amount': 5,
                    'xp_amount': 20,
                    'reason': 'daily_uptime_bonus',
                },
                target_id=node_id,
            )
        except Exception:
            pass  # Audit log failure must not block distribution

        return reward.to_dict()

369 

370 @staticmethod 

371 def check_milestones(db: Session, node_id: str) -> Optional[Dict]: 

372 """Check and award hosting milestones based on agent_count.""" 

373 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

374 if not peer or not peer.node_operator_id: 

375 return None 

376 

377 agent_count = peer.agent_count or 0 

378 awarded = None 

379 

380 for threshold in HOSTING_MILESTONES: 

381 if agent_count >= threshold: 

382 # Check if already awarded 

383 existing = db.query(HostingReward).filter( 

384 HostingReward.node_id == node_id, 

385 HostingReward.period == 'milestone', 

386 HostingReward.reason == f'Hosting milestone: {threshold} agents', 

387 ).first() 

388 if existing: 

389 continue 

390 

391 # Award milestone 

392 ResonanceService.award_spark( 

393 db, peer.node_operator_id, 50, 

394 'hosting_milestone', node_id, 

395 f'Milestone: {threshold} agents hosted') 

396 ResonanceService.award_pulse( 

397 db, peer.node_operator_id, 25, 

398 'hosting_milestone', node_id, 

399 f'Milestone: {threshold} agents hosted') 

400 ResonanceService.award_xp( 

401 db, peer.node_operator_id, 100, 

402 'hosting_milestone', node_id, 

403 f'Milestone: {threshold} agents hosted') 

404 

405 reward = HostingReward( 

406 node_id=node_id, 

407 operator_id=peer.node_operator_id, 

408 amount=50, currency='spark', 

409 period='milestone', 

410 reason=f'Hosting milestone: {threshold} agents', 

411 contribution_score_snapshot=peer.contribution_score or 0, 

412 ) 

413 db.add(reward) 

414 awarded = reward 

415 

416 if awarded: 

417 db.flush() 

418 return awarded.to_dict() 

419 return None 

420 

# --- Compute Stats Aggregation ---

424 

    @staticmethod
    def aggregate_compute_stats(db: Session, node_id: str,
                                period_hours: int = 24) -> Optional[Dict]:
        """Aggregate metered API usage into PeerNode cumulative stats.

        Queries MeteredAPIUsage for hive/idle tasks and updates PeerNode's
        gpu_hours_served, total_inferences, energy_kwh_contributed,
        metered_api_costs_absorbed columns.

        Args:
            db: active SQLAlchemy session (flushed, not committed, here).
            node_id: PeerNode.node_id whose cumulative stats to update.
            period_hours: lookback window over MeteredAPIUsage rows.

        Returns:
            Dict of the deltas added this call, or None when the node is
            missing or aggregation failed.
        """
        peer = db.query(PeerNode).filter_by(node_id=node_id).first()
        if not peer:
            return None

        try:
            from .models import MeteredAPIUsage
            cutoff = datetime.utcnow() - timedelta(hours=period_hours)
            # task_source != 'own' keeps only work done for others
            # (hive/idle tasks), per the docstring contract above.
            usages = db.query(MeteredAPIUsage).filter(
                MeteredAPIUsage.node_id == node_id,
                MeteredAPIUsage.task_source != 'own',
                MeteredAPIUsage.created_at >= cutoff,
            ).all()

            inference_count = len(usages)
            usd_absorbed = sum(u.actual_usd_cost or 0 for u in usages)
            total_tokens = sum((u.tokens_in or 0) + (u.tokens_out or 0) for u in usages)

            # Estimate GPU hours: ~1 GPU-second per 1K tokens (rough heuristic)
            gpu_hours_delta = (total_tokens / 1000.0) / 3600.0

            # Estimate energy: use ModelRegistry if available, else 170W TDP default
            # NOTE(review): get_total_energy_kwh(hours=...) looks registry-wide
            # rather than per-node -- confirm attributing it all to this node
            # is intended.
            try:
                from integrations.agent_engine.model_registry import model_registry
                energy_delta = model_registry.get_total_energy_kwh(hours=period_hours)
            except Exception:
                energy_delta = gpu_hours_delta * 0.170  # 170W default TDP

            # Cumulative columns: add this period's deltas on top of any
            # existing totals (None treated as 0).
            peer.total_inferences = (peer.total_inferences or 0) + inference_count
            peer.gpu_hours_served = round((peer.gpu_hours_served or 0) + gpu_hours_delta, 4)
            peer.energy_kwh_contributed = round(
                (peer.energy_kwh_contributed or 0) + energy_delta, 4)
            peer.metered_api_costs_absorbed = round(
                (peer.metered_api_costs_absorbed or 0) + usd_absorbed, 4)

            db.flush()
            return {
                'node_id': node_id,
                'period_hours': period_hours,
                'inferences_added': inference_count,
                'gpu_hours_added': round(gpu_hours_delta, 4),
                'energy_kwh_added': round(energy_delta, 4),
                'usd_absorbed_added': round(usd_absorbed, 4),
            }
        except Exception as e:
            # Best-effort: aggregation is non-critical, so failures are
            # logged at debug level and swallowed.
            logger.debug(f"Compute stats aggregation failed: {e}")
            return None

480 

481 # --- Queries --- 

482 

483 @staticmethod 

484 def get_rewards(db: Session, node_id: str = None, 

485 operator_id: str = None, 

486 limit: int = 50, offset: int = 0) -> List[Dict]: 

487 q = db.query(HostingReward) 

488 if node_id: 

489 q = q.filter_by(node_id=node_id) 

490 if operator_id: 

491 q = q.filter_by(operator_id=operator_id) 

492 rewards = q.order_by(desc(HostingReward.created_at)).offset(offset).limit(limit).all() 

493 return [r.to_dict() for r in rewards] 

494 

495 @staticmethod 

496 def get_leaderboard(db: Session, limit: int = 50, 

497 offset: int = 0) -> List[Dict]: 

498 nodes = db.query(PeerNode).filter( 

499 PeerNode.status.in_(['active', 'stale']) 

500 ).order_by(desc(PeerNode.contribution_score)).offset(offset).limit(limit).all() 

501 return [n.to_dict() for n in nodes] 

502 

503 @staticmethod 

504 def get_reward_summary(db: Session, node_id: str) -> Dict: 

505 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

506 if not peer: 

507 return {'error': 'Node not found'} 

508 

509 total_spark = db.query(func.coalesce( 

510 func.sum(HostingReward.amount), 0 

511 )).filter( 

512 HostingReward.node_id == node_id, 

513 HostingReward.currency == 'spark', 

514 ).scalar() 

515 

516 reward_count = db.query(func.count(HostingReward.id)).filter_by( 

517 node_id=node_id).scalar() 

518 

519 return { 

520 'node_id': node_id, 

521 'contribution_score': peer.contribution_score or 0, 

522 'visibility_tier': peer.visibility_tier or 'standard', 

523 'total_spark_earned': int(total_spark), 

524 'total_rewards': reward_count, 

525 'agent_count': peer.agent_count or 0, 

526 'post_count': peer.post_count or 0, 

527 'status': peer.status, 

528 }