Coverage for integrations / agent_engine / revenue_aggregator.py: 97.4%
116 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Revenue Aggregator — connects ad/API revenue to trading agent funding.
4Monitors platform revenue streams, automatically funds paper trading
5goals when thresholds are exceeded, and tracks profit distribution.
7Revenue → Spark accumulation → threshold → fund trading goals → reinvestment.
9NOTE: Revenue query logic lives in query_revenue_streams() (shared with
10finance_tools.get_financial_health). Do not duplicate DB queries here.
11"""
12import logging
13import os
14from datetime import datetime, timedelta
15from typing import Dict, Optional
17logger = logging.getLogger(__name__)
# Configurable via environment
# Minimum platform excess (platform revenue share minus hosting payouts),
# in Spark, before a trading goal is auto-funded.
_FUNDING_THRESHOLD = float(os.environ.get('HEVOLVE_REVENUE_FUNDING_THRESHOLD', '1000'))
# Fraction of the excess committed to a new trading goal (default 10%).
_FUNDING_ALLOCATION_PCT = float(os.environ.get('HEVOLVE_REVENUE_FUNDING_PCT', '0.10'))
# ─── 90/9/1 Revenue Split Model ───
# 90% → User Pool (distributed proportional to contribution_score)
# 9% → Infrastructure Pool (regional + central, proportional to compute spent)
# 1% → Central (flat unconditional take)
REVENUE_SPLIT_USERS = 0.90
REVENUE_SPLIT_INFRA = 0.09
REVENUE_SPLIT_CENTRAL = 0.01
# Legacy combined platform take (infra + central = 10%); kept so older
# callers that read a single 'platform_share' keep working.
_PROFIT_PLATFORM_SHARE = REVENUE_SPLIT_INFRA + REVENUE_SPLIT_CENTRAL  # backward compat
32# ─── Shared revenue query (used by both RevenueAggregator and finance_tools) ──
def query_revenue_streams(db, period_days: int = 30) -> Dict:
    """Core revenue query — single source of truth for revenue data.

    Sums API revenue, ad spend, and hosting payouts over the trailing
    *period_days* window, then applies the 90/9/1 split to gross revenue.
    Gross = API + ads; hosting payouts are outgoing and tracked separately.

    Each stream is queried best-effort: a failed model import or query
    leaves that stream at 0.0 instead of aborting the whole report.

    Returns: {period_days, api_revenue, ad_revenue, hosting_payouts,
    total_gross, platform_share}

    Used by: RevenueAggregator.get_revenue_streams(), finance_tools.get_financial_health()
    """
    from sqlalchemy import func
    window_start = datetime.utcnow() - timedelta(days=period_days)

    report = {
        'period_days': period_days,
        'api_revenue': 0.0,
        'ad_revenue': 0.0,
        'hosting_payouts': 0.0,
        'total_gross': 0.0,
        'user_pool_share': 0.0,
        'infra_pool_share': 0.0,
        'central_share': 0.0,
        'platform_share': 0.0,
    }

    # API revenue (incoming Spark credits charged for API usage)
    try:
        from integrations.social.models import APIUsageLog
        report['api_revenue'] = float(
            db.query(func.coalesce(func.sum(APIUsageLog.cost_credits), 0.0))
            .filter(APIUsageLog.created_at >= window_start)
            .scalar() or 0.0)
    except Exception as e:
        logger.debug(f"API revenue query: {e}")

    # Ad revenue (Spark spent by advertisers)
    try:
        from integrations.social.models import AdUnit
        report['ad_revenue'] = float(
            db.query(func.coalesce(func.sum(AdUnit.spent_spark), 0))
            .filter(AdUnit.created_at >= window_start)
            .scalar() or 0)
    except Exception as e:
        logger.debug(f"Ad revenue query: {e}")

    # Hosting payouts (outgoing — for tracking net)
    try:
        from integrations.social.models import HostingReward
        report['hosting_payouts'] = float(
            db.query(func.coalesce(func.sum(HostingReward.amount), 0.0))
            .filter(HostingReward.created_at >= window_start)
            .scalar() or 0.0)
    except Exception as e:
        logger.debug(f"Hosting reward query: {e}")

    gross = report['api_revenue'] + report['ad_revenue']
    report['total_gross'] = gross
    report['user_pool_share'] = gross * REVENUE_SPLIT_USERS
    report['infra_pool_share'] = gross * REVENUE_SPLIT_INFRA
    report['central_share'] = gross * REVENUE_SPLIT_CENTRAL
    # Legacy consumers read the combined 10% take under 'platform_share'.
    report['platform_share'] = gross * (REVENUE_SPLIT_INFRA + REVENUE_SPLIT_CENTRAL)  # backward compat
    return report
class RevenueAggregator:
    """Connects revenue streams to trading goal funding.

    Stateless facade — every method is a @staticmethod that takes an open
    DB session. Funding policy: when the 30-day platform revenue share
    minus hosting payouts exceeds _FUNDING_THRESHOLD, a fraction
    (_FUNDING_ALLOCATION_PCT) of the excess seeds a paper-trading goal.
    """

    @staticmethod
    def get_revenue_streams(db, period_days: int = 30) -> Dict:
        """Aggregate all revenue streams over a period.

        Delegates to shared query_revenue_streams() — single source of truth.
        """
        return query_revenue_streams(db, period_days)

    @staticmethod
    def check_and_fund_trading(db) -> Dict:
        """If platform wallet excess > threshold, allocate to trading.

        Creates a new paper trading goal via GoalManager when threshold hit.
        Skips funding when an auto-funded trading goal is already active.
        Returns: {funded: bool, amount: float, goal_id: str}
        """
        revenue = RevenueAggregator.get_revenue_streams(db, period_days=30)
        platform_excess = revenue['platform_share'] - revenue['hosting_payouts']

        if platform_excess < _FUNDING_THRESHOLD:
            return {
                'funded': False,
                'platform_excess': round(platform_excess, 2),
                'threshold': _FUNDING_THRESHOLD,
            }

        funding_amount = platform_excess * _FUNDING_ALLOCATION_PCT

        # Check if there's already an auto-funded active trading goal
        try:
            from integrations.social.models import AgentGoal
            existing = db.query(AgentGoal).filter(
                AgentGoal.goal_type == 'trading',
                AgentGoal.status == 'active',
                AgentGoal.created_by == 'revenue_aggregator',
            ).first()
            if existing:
                return {
                    'funded': False,
                    'reason': 'active_trading_goal_exists',
                    'goal_id': existing.id,
                }
        except Exception as e:
            # Best-effort check, consistent with the module's other queries:
            # log at debug and proceed (worst case: a duplicate goal attempt
            # that GoalManager can reject).
            logger.debug(f"Active trading goal check: {e}")

        # Create trading goal
        try:
            from .goal_manager import GoalManager
            result = GoalManager.create_goal(
                db,
                goal_type='trading',
                title='Revenue-Funded Paper Trading',
                description=(
                    f'Auto-funded from platform revenue excess. '
                    f'Budget: {int(funding_amount)} Spark. '
                    f'Strategy: long_term diversified. '
                    f'Paper trading only — live requires constitutional vote.'
                ),
                config={
                    'strategy': 'long_term',
                    'paper_trading': True,
                    'market': 'crypto',
                    'max_budget': int(funding_amount),
                    'max_loss_pct': 10,
                    'auto_funded': True,
                    'funding_source': 'platform_revenue',
                },
                spark_budget=int(funding_amount),
                created_by='revenue_aggregator',
            )
            if result.get('success'):
                logger.info(
                    f"Revenue aggregator: funded trading goal with "
                    f"{int(funding_amount)} Spark")
                return {
                    'funded': True,
                    'amount': round(funding_amount, 2),
                    'goal_id': result['goal'].get('id'),
                }
        except Exception as e:
            logger.debug(f"Revenue funding failed: {e}")

        return {'funded': False, 'error': 'goal_creation_failed'}

    @staticmethod
    def distribute_trading_profits(db, portfolio_id: str) -> Dict:
        """Record profit distribution from a paper portfolio.

        Paper profits are tracked only. Live profits follow 90/9/1 split.
        Returns an {'error': ...} dict on any failure (never raises).
        """
        try:
            from integrations.social.models import PaperPortfolio
            portfolio = db.query(PaperPortfolio).filter_by(id=portfolio_id).first()
            if not portfolio:
                return {'error': 'portfolio_not_found'}

            if portfolio.total_pnl <= 0:
                return {'profit': 0.0, 'distributed': False,
                        'reason': 'no_profit'}

            # 10% combined platform take; remainder to the liquidity provider.
            platform_share = portfolio.total_pnl * _PROFIT_PLATFORM_SHARE
            provider_share = portfolio.total_pnl * (1 - _PROFIT_PLATFORM_SHARE)

            return {
                'portfolio_id': portfolio_id,
                'total_pnl': round(portfolio.total_pnl, 2),
                'platform_share': round(platform_share, 2),
                'provider_share': round(provider_share, 2),
                'distributed': True,
                'note': 'Paper trading — profits are simulated',
            }
        except Exception as e:
            return {'error': str(e)}

    @staticmethod
    def get_dashboard(db) -> Dict:
        """Full revenue dashboard for /api/revenue/dashboard."""
        revenue = RevenueAggregator.get_revenue_streams(db)

        # Trading P&L — best-effort; dashboard still renders without it.
        trading_pnl = 0.0
        active_portfolios = 0
        try:
            from integrations.social.models import PaperPortfolio
            portfolios = db.query(PaperPortfolio).filter_by(status='active').all()
            active_portfolios = len(portfolios)
            trading_pnl = sum(p.total_pnl or 0 for p in portfolios)
        except Exception as e:
            # Previously swallowed silently; log at debug for consistency
            # with the rest of this module's best-effort queries.
            logger.debug(f"Portfolio P&L query: {e}")

        return {
            'revenue': revenue,
            'trading': {
                'active_portfolios': active_portfolios,
                'total_pnl': round(trading_pnl, 2),
            },
            'funding': {
                'threshold': _FUNDING_THRESHOLD,
                'allocation_pct': _FUNDING_ALLOCATION_PCT,
                'platform_excess': round(
                    revenue['platform_share'] - revenue['hosting_payouts'], 2),
            },
        }
# Module singleton — RevenueAggregator is stateless, so one shared
# instance suffices; created lazily on first access.
_revenue_aggregator: Optional[RevenueAggregator] = None


def get_revenue_aggregator() -> RevenueAggregator:
    """Return the process-wide RevenueAggregator, creating it on first call."""
    global _revenue_aggregator
    if _revenue_aggregator is None:
        _revenue_aggregator = RevenueAggregator()
    return _revenue_aggregator
# ─── Metered API Cost Settlement ───────────────────────────────────────
# Spark↔USD conversion used when recovering API costs; configurable via
# HEVOLVE_SPARK_PER_USD (default: 100 Spark per USD).
SPARK_PER_USD = int(os.environ.get('HEVOLVE_SPARK_PER_USD', '100'))
def settle_metered_api_costs(db, period_hours: int = 24) -> Dict:
    """Auto-settle pending MeteredAPIUsage records where task_source != 'own'.

    For each pending record in the trailing *period_hours* window:
    1. Look up operator_id (the user whose API was consumed); records with
       no operator are marked 'written_off' instead.
    2. Convert actual_usd_cost to Spark: spark = max(1, int(usd * SPARK_PER_USD))
    3. award_spark(db, operator_id, spark, 'api_cost_recovery', usage.id, desc)
    4. Mark settlement_status = 'settled'

    Returns: {settled_count, total_spark_awarded, total_usd_settled,
    written_off_count}
    """
    from sqlalchemy import and_
    from integrations.social.models import MeteredAPIUsage
    from integrations.social.resonance_engine import ResonanceService

    cutoff = datetime.utcnow() - timedelta(hours=period_hours)
    pending = db.query(MeteredAPIUsage).filter(
        and_(
            MeteredAPIUsage.settlement_status == 'pending',
            MeteredAPIUsage.task_source != 'own',
            MeteredAPIUsage.created_at >= cutoff,
        )
    ).all()

    settled_count = 0
    written_off_count = 0
    total_spark = 0
    total_usd = 0.0

    for usage in pending:
        if not usage.operator_id:
            # No one to credit — write the cost off so it is not retried forever.
            usage.settlement_status = 'written_off'
            written_off_count += 1
            continue

        spark_amount = max(1, int(usage.actual_usd_cost * SPARK_PER_USD))
        try:
            ResonanceService.award_spark(
                db, usage.operator_id, spark_amount,
                'api_cost_recovery', usage.id,
                f'API cost recovery: {usage.model_id} ({usage.task_source})')
            usage.settlement_status = 'settled'
            settled_count += 1
            total_spark += spark_amount
            total_usd += usage.actual_usd_cost
            # Realtime fan-out for the live earnings drawer (idle-compute
            # workstream G5). Subscribers: hevolve.ai compute dashboard
            # SSE, Nunba shell tile. Best-effort — settlement must
            # never block on event emit.
            try:
                from core.platform.events import emit_event
                emit_event('compute.task_settled', {
                    'usage_id': usage.id,
                    'operator_id': usage.operator_id,
                    'node_id': usage.node_id,
                    'model_id': usage.model_id,
                    'task_source': usage.task_source,
                    'usd_cost': round(usage.actual_usd_cost, 6),
                    'spark_awarded': spark_amount,
                    'source_type': 'api_cost_recovery',
                })
            except Exception:
                pass
        except Exception as e:
            logger.debug(f"Settlement failed for usage {usage.id}: {e}")

    # Flush whenever ANY record's settlement_status changed. Previously the
    # flush was gated on settled_count > 0 alone, so runs that only wrote
    # records off never flushed those status changes.
    if settled_count > 0 or written_off_count > 0:
        db.flush()

    if settled_count > 0:
        logger.info(f"Metered API settlement: {settled_count} records, "
                    f"{total_spark} Spark, ${total_usd:.2f} USD")

        # Immutable audit trail for revenue settlements
        try:
            from security.immutable_audit_log import get_audit_log
            get_audit_log().log_event(
                'revenue_settlement',
                actor_id='revenue_aggregator',
                action=f'Settled {settled_count} metered API records',
                detail={
                    'settled_count': settled_count,
                    'total_spark_awarded': total_spark,
                    'total_usd_settled': round(total_usd, 4),
                    'period_hours': period_hours,
                    'spark_per_usd': SPARK_PER_USD,
                },
            )
        except Exception:
            pass  # Audit log failure must not block settlement

    return {
        'settled_count': settled_count,
        'total_spark_awarded': total_spark,
        'total_usd_settled': round(total_usd, 4),
        'written_off_count': written_off_count,
    }