Coverage for integrations / agent_engine / revenue_aggregator.py: 97.4%

116 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Revenue Aggregator — connects ad/API revenue to trading agent funding. 

3 

4Monitors platform revenue streams, automatically funds paper trading 

5goals when thresholds are exceeded, and tracks profit distribution. 

6 

7Revenue → Spark accumulation → threshold → fund trading goals → reinvestment. 

8 

9NOTE: Revenue query logic lives in query_revenue_streams() (shared with 

10finance_tools.get_financial_health). Do not duplicate DB queries here. 

11""" 

12import logging 

13import os 

14from datetime import datetime, timedelta 

15from typing import Dict, Optional 

16 

17logger = logging.getLogger(__name__) 

18 

19# Configurable via environment 

20_FUNDING_THRESHOLD = float(os.environ.get('HEVOLVE_REVENUE_FUNDING_THRESHOLD', '1000')) 

21_FUNDING_ALLOCATION_PCT = float(os.environ.get('HEVOLVE_REVENUE_FUNDING_PCT', '0.10')) 

22# ─── 90/9/1 Revenue Split Model ─── 

23# 90% → User Pool (distributed proportional to contribution_score) 

24# 9% → Infrastructure Pool (regional + central, proportional to compute spent) 

25# 1% → Central (flat unconditional take) 

26REVENUE_SPLIT_USERS = 0.90 

27REVENUE_SPLIT_INFRA = 0.09 

28REVENUE_SPLIT_CENTRAL = 0.01 

29_PROFIT_PLATFORM_SHARE = REVENUE_SPLIT_INFRA + REVENUE_SPLIT_CENTRAL # backward compat 

30 

31 

32# ─── Shared revenue query (used by both RevenueAggregator and finance_tools) ── 

33 

34def query_revenue_streams(db, period_days: int = 30) -> Dict: 

35 """Core revenue query — single source of truth for revenue data. 

36 

37 Returns: {period_days, api_revenue, ad_revenue, hosting_payouts, 

38 total_gross, platform_share} 

39 

40 Used by: RevenueAggregator.get_revenue_streams(), finance_tools.get_financial_health() 

41 """ 

42 from sqlalchemy import func 

43 cutoff = datetime.utcnow() - timedelta(days=period_days) 

44 

45 result = { 

46 'period_days': period_days, 

47 'api_revenue': 0.0, 

48 'ad_revenue': 0.0, 

49 'hosting_payouts': 0.0, 

50 'total_gross': 0.0, 

51 'user_pool_share': 0.0, 

52 'infra_pool_share': 0.0, 

53 'central_share': 0.0, 

54 'platform_share': 0.0, 

55 } 

56 

57 # API revenue 

58 try: 

59 from integrations.social.models import APIUsageLog 

60 api_total = db.query( 

61 func.coalesce(func.sum(APIUsageLog.cost_credits), 0.0) 

62 ).filter(APIUsageLog.created_at >= cutoff).scalar() or 0.0 

63 result['api_revenue'] = float(api_total) 

64 except Exception as e: 

65 logger.debug(f"API revenue query: {e}") 

66 

67 # Ad revenue (Spark spent by advertisers) 

68 try: 

69 from integrations.social.models import AdUnit 

70 ad_total = db.query( 

71 func.coalesce(func.sum(AdUnit.spent_spark), 0) 

72 ).filter(AdUnit.created_at >= cutoff).scalar() or 0 

73 result['ad_revenue'] = float(ad_total) 

74 except Exception as e: 

75 logger.debug(f"Ad revenue query: {e}") 

76 

77 # Hosting payouts (outgoing — for tracking net) 

78 try: 

79 from integrations.social.models import HostingReward 

80 hosting_total = db.query( 

81 func.coalesce(func.sum(HostingReward.amount), 0.0) 

82 ).filter(HostingReward.created_at >= cutoff).scalar() or 0.0 

83 result['hosting_payouts'] = float(hosting_total) 

84 except Exception as e: 

85 logger.debug(f"Hosting reward query: {e}") 

86 

87 gross = result['api_revenue'] + result['ad_revenue'] 

88 result['total_gross'] = gross 

89 result['user_pool_share'] = gross * REVENUE_SPLIT_USERS 

90 result['infra_pool_share'] = gross * REVENUE_SPLIT_INFRA 

91 result['central_share'] = gross * REVENUE_SPLIT_CENTRAL 

92 result['platform_share'] = gross * (REVENUE_SPLIT_INFRA + REVENUE_SPLIT_CENTRAL) # backward compat 

93 return result 

94 

95 

96class RevenueAggregator: 

97 """Connects revenue streams to trading goal funding.""" 

98 

99 @staticmethod 

100 def get_revenue_streams(db, period_days: int = 30) -> Dict: 

101 """Aggregate all revenue streams over a period. 

102 

103 Delegates to shared query_revenue_streams() — single source of truth. 

104 """ 

105 return query_revenue_streams(db, period_days) 

106 

107 @staticmethod 

108 def check_and_fund_trading(db) -> Dict: 

109 """If platform wallet excess > threshold, allocate to trading. 

110 

111 Creates a new paper trading goal via GoalManager when threshold hit. 

112 Returns: {funded: bool, amount: float, goal_id: str} 

113 """ 

114 revenue = RevenueAggregator.get_revenue_streams(db, period_days=30) 

115 platform_excess = revenue['platform_share'] - revenue['hosting_payouts'] 

116 

117 if platform_excess < _FUNDING_THRESHOLD: 

118 return { 

119 'funded': False, 

120 'platform_excess': round(platform_excess, 2), 

121 'threshold': _FUNDING_THRESHOLD, 

122 } 

123 

124 funding_amount = platform_excess * _FUNDING_ALLOCATION_PCT 

125 

126 # Check if there's already an auto-funded active trading goal 

127 try: 

128 from integrations.social.models import AgentGoal 

129 existing = db.query(AgentGoal).filter( 

130 AgentGoal.goal_type == 'trading', 

131 AgentGoal.status == 'active', 

132 AgentGoal.created_by == 'revenue_aggregator', 

133 ).first() 

134 if existing: 

135 return { 

136 'funded': False, 

137 'reason': 'active_trading_goal_exists', 

138 'goal_id': existing.id, 

139 } 

140 except Exception: 

141 pass 

142 

143 # Create trading goal 

144 try: 

145 from .goal_manager import GoalManager 

146 result = GoalManager.create_goal( 

147 db, 

148 goal_type='trading', 

149 title='Revenue-Funded Paper Trading', 

150 description=( 

151 f'Auto-funded from platform revenue excess. ' 

152 f'Budget: {int(funding_amount)} Spark. ' 

153 f'Strategy: long_term diversified. ' 

154 f'Paper trading only — live requires constitutional vote.' 

155 ), 

156 config={ 

157 'strategy': 'long_term', 

158 'paper_trading': True, 

159 'market': 'crypto', 

160 'max_budget': int(funding_amount), 

161 'max_loss_pct': 10, 

162 'auto_funded': True, 

163 'funding_source': 'platform_revenue', 

164 }, 

165 spark_budget=int(funding_amount), 

166 created_by='revenue_aggregator', 

167 ) 

168 if result.get('success'): 

169 logger.info( 

170 f"Revenue aggregator: funded trading goal with " 

171 f"{int(funding_amount)} Spark") 

172 return { 

173 'funded': True, 

174 'amount': round(funding_amount, 2), 

175 'goal_id': result['goal'].get('id'), 

176 } 

177 except Exception as e: 

178 logger.debug(f"Revenue funding failed: {e}") 

179 

180 return {'funded': False, 'error': 'goal_creation_failed'} 

181 

182 @staticmethod 

183 def distribute_trading_profits(db, portfolio_id: str) -> Dict: 

184 """Record profit distribution from a paper portfolio. 

185 

186 Paper profits are tracked only. Live profits follow 90/9/1 split. 

187 """ 

188 try: 

189 from integrations.social.models import PaperPortfolio 

190 portfolio = db.query(PaperPortfolio).filter_by(id=portfolio_id).first() 

191 if not portfolio: 

192 return {'error': 'portfolio_not_found'} 

193 

194 if portfolio.total_pnl <= 0: 

195 return {'profit': 0.0, 'distributed': False, 

196 'reason': 'no_profit'} 

197 

198 platform_share = portfolio.total_pnl * _PROFIT_PLATFORM_SHARE 

199 provider_share = portfolio.total_pnl * (1 - _PROFIT_PLATFORM_SHARE) 

200 

201 return { 

202 'portfolio_id': portfolio_id, 

203 'total_pnl': round(portfolio.total_pnl, 2), 

204 'platform_share': round(platform_share, 2), 

205 'provider_share': round(provider_share, 2), 

206 'distributed': True, 

207 'note': 'Paper trading — profits are simulated', 

208 } 

209 except Exception as e: 

210 return {'error': str(e)} 

211 

212 @staticmethod 

213 def get_dashboard(db) -> Dict: 

214 """Full revenue dashboard for /api/revenue/dashboard.""" 

215 revenue = RevenueAggregator.get_revenue_streams(db) 

216 

217 # Trading P&L 

218 trading_pnl = 0.0 

219 active_portfolios = 0 

220 try: 

221 from integrations.social.models import PaperPortfolio 

222 portfolios = db.query(PaperPortfolio).filter_by(status='active').all() 

223 active_portfolios = len(portfolios) 

224 trading_pnl = sum(p.total_pnl or 0 for p in portfolios) 

225 except Exception: 

226 pass 

227 

228 return { 

229 'revenue': revenue, 

230 'trading': { 

231 'active_portfolios': active_portfolios, 

232 'total_pnl': round(trading_pnl, 2), 

233 }, 

234 'funding': { 

235 'threshold': _FUNDING_THRESHOLD, 

236 'allocation_pct': _FUNDING_ALLOCATION_PCT, 

237 'platform_excess': round( 

238 revenue['platform_share'] - revenue['hosting_payouts'], 2), 

239 }, 

240 } 

241 

242 

243# Module singleton 

244_revenue_aggregator = None 

245 

246 

247def get_revenue_aggregator() -> RevenueAggregator: 

248 global _revenue_aggregator 

249 if _revenue_aggregator is None: 

250 _revenue_aggregator = RevenueAggregator() 

251 return _revenue_aggregator 

252 

253 

254# ─── Metered API Cost Settlement ─────────────────────────────────────── 

255 

256SPARK_PER_USD = int(os.environ.get('HEVOLVE_SPARK_PER_USD', '100')) 

257 

258 

259def settle_metered_api_costs(db, period_hours: int = 24) -> Dict: 

260 """Auto-settle pending MeteredAPIUsage records where task_source != 'own'. 

261 

262 For each pending record: 

263 1. Look up operator_id (the user whose API was consumed) 

264 2. Convert actual_usd_cost to Spark: spark = max(1, int(usd * SPARK_PER_USD)) 

265 3. award_spark(db, operator_id, spark, 'api_cost_recovery', usage.id, desc) 

266 4. Mark settlement_status = 'settled' 

267 

268 Returns: {settled_count, total_spark_awarded, total_usd_settled} 

269 """ 

270 from sqlalchemy import and_ 

271 from integrations.social.models import MeteredAPIUsage 

272 from integrations.social.resonance_engine import ResonanceService 

273 

274 cutoff = datetime.utcnow() - timedelta(hours=period_hours) 

275 pending = db.query(MeteredAPIUsage).filter( 

276 and_( 

277 MeteredAPIUsage.settlement_status == 'pending', 

278 MeteredAPIUsage.task_source != 'own', 

279 MeteredAPIUsage.created_at >= cutoff, 

280 ) 

281 ).all() 

282 

283 settled_count = 0 

284 total_spark = 0 

285 total_usd = 0.0 

286 

287 for usage in pending: 

288 if not usage.operator_id: 

289 usage.settlement_status = 'written_off' 

290 continue 

291 

292 spark_amount = max(1, int(usage.actual_usd_cost * SPARK_PER_USD)) 

293 try: 

294 ResonanceService.award_spark( 

295 db, usage.operator_id, spark_amount, 

296 'api_cost_recovery', usage.id, 

297 f'API cost recovery: {usage.model_id} ({usage.task_source})') 

298 usage.settlement_status = 'settled' 

299 settled_count += 1 

300 total_spark += spark_amount 

301 total_usd += usage.actual_usd_cost 

302 # Realtime fan-out for the live earnings drawer (idle-compute 

303 # workstream G5). Subscribers: hevolve.ai compute dashboard 

304 # SSE, Nunba shell tile. Best-effort — settlement must 

305 # never block on event emit. 

306 try: 

307 from core.platform.events import emit_event 

308 emit_event('compute.task_settled', { 

309 'usage_id': usage.id, 

310 'operator_id': usage.operator_id, 

311 'node_id': usage.node_id, 

312 'model_id': usage.model_id, 

313 'task_source': usage.task_source, 

314 'usd_cost': round(usage.actual_usd_cost, 6), 

315 'spark_awarded': spark_amount, 

316 'source_type': 'api_cost_recovery', 

317 }) 

318 except Exception: 

319 pass 

320 except Exception as e: 

321 logger.debug(f"Settlement failed for usage {usage.id}: {e}") 

322 

323 if settled_count > 0: 

324 db.flush() 

325 logger.info(f"Metered API settlement: {settled_count} records, " 

326 f"{total_spark} Spark, ${total_usd:.2f} USD") 

327 

328 # Immutable audit trail for revenue settlements 

329 try: 

330 from security.immutable_audit_log import get_audit_log 

331 get_audit_log().log_event( 

332 'revenue_settlement', 

333 actor_id='revenue_aggregator', 

334 action=f'Settled {settled_count} metered API records', 

335 detail={ 

336 'settled_count': settled_count, 

337 'total_spark_awarded': total_spark, 

338 'total_usd_settled': round(total_usd, 4), 

339 'period_hours': period_hours, 

340 'spark_per_usd': SPARK_PER_USD, 

341 }, 

342 ) 

343 except Exception: 

344 pass # Audit log failure must not block settlement 

345 

346 return { 

347 'settled_count': settled_count, 

348 'total_spark_awarded': total_spark, 

349 'total_usd_settled': round(total_usd, 4), 

350 }