Coverage for integrations / providers / revenue_tracker.py: 88.7%
142 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Revenue Tracker — understand earning spark vs compute spent.
4Tracks two sides of every transaction:
5 COST: what we paid (API calls, local GPU time, bandwidth)
6 REVENUE: what we earned (affiliate commissions, user credits, subscriptions)
8Key metrics:
9 earning_spark = revenue / cost (>1.0 = profitable)
10 cost_per_request = total_cost / total_requests
11 revenue_per_user = total_revenue / active_users
12 net_margin = (revenue - cost) / revenue
14Persisted at ~/Documents/Nunba/data/revenue_tracker.json.
15Exposed via /api/admin/providers/revenue endpoint.
16"""
18import json
19import logging
20import os
21import threading
22import time
23from dataclasses import dataclass, field, asdict
24from pathlib import Path
25from typing import Any, Dict, List, Optional
27logger = logging.getLogger(__name__)
30@dataclass
31class CostEntry:
32 """A single cost event."""
33 timestamp: float = 0.0
34 provider_id: str = ''
35 model_id: str = ''
36 cost_usd: float = 0.0
37 cost_type: str = 'api' # api, compute, bandwidth
38 tokens_used: int = 0
39 request_type: str = 'llm' # llm, image_gen, video_gen, etc.
42@dataclass
43class RevenueEntry:
44 """A single revenue event."""
45 timestamp: float = 0.0
46 source: str = '' # affiliate, credits, subscription
47 provider_id: str = ''
48 amount_usd: float = 0.0
49 user_id: str = ''
50 event_type: str = '' # purchase, commission, referral
53@dataclass
54class PeriodStats:
55 """Aggregated stats for a time period."""
56 period: str = '' # 'hour', 'day', 'week', 'month'
57 start_ts: float = 0.0
58 end_ts: float = 0.0
59 total_cost: float = 0.0
60 total_revenue: float = 0.0
61 total_requests: int = 0
62 earning_spark: float = 0.0 # revenue / cost
63 net_margin: float = 0.0 # (revenue - cost) / revenue
64 cost_by_provider: Dict[str, float] = field(default_factory=dict)
65 cost_by_type: Dict[str, float] = field(default_factory=dict)
66 revenue_by_source: Dict[str, float] = field(default_factory=dict)
67 top_models: List[Dict] = field(default_factory=list)
70class RevenueTracker:
71 """Track revenue vs compute cost for the earning spark metric."""
73 def __init__(self, tracker_path: Optional[str] = None):
74 try:
75 from core.platform_paths import get_db_dir
76 data_dir = Path(get_db_dir())
77 except ImportError:
78 data_dir = Path.home() / 'Documents' / 'Nunba' / 'data'
79 data_dir.mkdir(parents=True, exist_ok=True)
81 self._path = Path(tracker_path) if tracker_path else data_dir / 'revenue_tracker.json'
82 self._lock = threading.Lock()
84 # In-memory buffers (persisted periodically)
85 self._costs: List[CostEntry] = []
86 self._revenues: List[RevenueEntry] = []
88 # Running totals
89 self._total_cost = 0.0
90 self._total_revenue = 0.0
91 self._total_requests = 0
93 self._load()
95 def _load(self):
96 if self._path.exists():
97 try:
98 with open(self._path, 'r') as f:
99 data = json.load(f)
100 self._total_cost = data.get('total_cost', 0.0)
101 self._total_revenue = data.get('total_revenue', 0.0)
102 self._total_requests = data.get('total_requests', 0)
103 # Load recent entries (last 24h only — older data is aggregated)
104 for c in data.get('recent_costs', []):
105 self._costs.append(CostEntry(**{
106 k: v for k, v in c.items()
107 if k in CostEntry.__dataclass_fields__}))
108 for r in data.get('recent_revenues', []):
109 self._revenues.append(RevenueEntry(**{
110 k: v for k, v in r.items()
111 if k in RevenueEntry.__dataclass_fields__}))
112 except Exception as e:
113 logger.warning("Failed to load revenue tracker: %s", e)
115 def save(self):
116 # Keep only last 24h of entries
117 cutoff = time.time() - 86400
118 with self._lock:
119 recent_costs = [asdict(c) for c in self._costs if c.timestamp > cutoff]
120 recent_revenues = [asdict(r) for r in self._revenues if r.timestamp > cutoff]
121 data = {
122 'total_cost': self._total_cost,
123 'total_revenue': self._total_revenue,
124 'total_requests': self._total_requests,
125 'recent_costs': recent_costs,
126 'recent_revenues': recent_revenues,
127 'last_saved': time.time(),
128 }
129 try:
130 with open(self._path, 'w') as f:
131 json.dump(data, f, indent=2)
132 except Exception as e:
133 logger.error("Failed to save revenue tracker: %s", e)
135 # ── Recording ─────────────────────────────────────────────────────
137 def record_cost(self, provider_id: str, model_id: str, cost_usd: float,
138 tokens_used: int = 0, request_type: str = 'llm',
139 cost_type: str = 'api'):
140 """Record an API cost event (called by gateway after each request)."""
141 entry = CostEntry(
142 timestamp=time.time(), provider_id=provider_id,
143 model_id=model_id, cost_usd=cost_usd,
144 cost_type=cost_type, tokens_used=tokens_used,
145 request_type=request_type,
146 )
147 with self._lock:
148 self._costs.append(entry)
149 self._total_cost += cost_usd
150 self._total_requests += 1
152 # Auto-save and trim every 50 requests
153 if self._total_requests % 50 == 0:
154 self._trim_old_entries()
155 self.save()
157 def _trim_old_entries(self):
158 """Remove entries older than 24h from in-memory lists."""
159 cutoff = time.time() - 86400
160 with self._lock:
161 self._costs = [c for c in self._costs if c.timestamp > cutoff]
162 self._revenues = [r for r in self._revenues if r.timestamp > cutoff]
164 def record_revenue(self, source: str, amount_usd: float,
165 provider_id: str = '', user_id: str = '',
166 event_type: str = 'commission'):
167 """Record a revenue event and credit the contributing user.
169 When a user's compute, API keys, agents, or content generates revenue,
170 they earn spark proportional to the amount. Requires revenue_share consent.
171 """
172 # Credit the contributing user via ResonanceService (if consented)
173 if user_id and amount_usd > 0:
174 try:
175 from integrations.social.consent_service import ConsentService
176 from integrations.social.models import db_session
177 with db_session() as db:
178 if ConsentService.check_consent(db, user_id, 'revenue_share'):
179 from integrations.social.resonance_engine import ResonanceService
180 # 1 USD = 100 spark (configurable exchange rate)
181 spark_amount = max(1, int(amount_usd * 100))
182 ResonanceService.award_spark(
183 db, user_id, spark_amount,
184 source_type='revenue_share',
185 description=f'Earned from {source}: ${amount_usd:.4f} ({provider_id})',
186 )
187 db.commit()
188 except Exception as _e:
189 logger.debug("Revenue credit failed for user %s: %s", user_id, _e)
191 entry = RevenueEntry(
192 timestamp=time.time(), source=source,
193 provider_id=provider_id, amount_usd=amount_usd,
194 user_id=user_id, event_type=event_type,
195 )
196 with self._lock:
197 self._revenues.append(entry)
198 self._total_revenue += amount_usd
199 self.save()
201 # ── Analytics ─────────────────────────────────────────────────────
203 def get_earning_spark(self) -> float:
204 """Revenue / Cost ratio. >1.0 = profitable."""
205 if self._total_cost <= 0:
206 return float('inf') if self._total_revenue > 0 else 0.0
207 return self._total_revenue / self._total_cost
209 def get_summary(self) -> Dict[str, Any]:
210 """Full analytics summary for dashboard."""
211 spark = self.get_earning_spark()
212 net_margin = ((self._total_revenue - self._total_cost) / self._total_revenue
213 if self._total_revenue > 0 else 0.0)
215 # Cost breakdown by provider
216 cost_by_provider: Dict[str, float] = {}
217 cost_by_type: Dict[str, float] = {}
218 for c in self._costs:
219 cost_by_provider[c.provider_id] = cost_by_provider.get(
220 c.provider_id, 0) + c.cost_usd
221 cost_by_type[c.request_type] = cost_by_type.get(
222 c.request_type, 0) + c.cost_usd
224 # Revenue breakdown
225 revenue_by_source: Dict[str, float] = {}
226 for r in self._revenues:
227 revenue_by_source[r.source] = revenue_by_source.get(
228 r.source, 0) + r.amount_usd
230 # Top cost models
231 model_costs: Dict[str, float] = {}
232 for c in self._costs:
233 key = f"{c.provider_id}/{c.model_id}"
234 model_costs[key] = model_costs.get(key, 0) + c.cost_usd
235 top_models = sorted(model_costs.items(), key=lambda x: x[1], reverse=True)[:10]
237 return {
238 'total_cost_usd': round(self._total_cost, 6),
239 'total_revenue_usd': round(self._total_revenue, 6),
240 'net_profit_usd': round(self._total_revenue - self._total_cost, 6),
241 'earning_spark': round(spark, 3),
242 'net_margin_pct': round(net_margin * 100, 1),
243 'total_requests': self._total_requests,
244 'cost_per_request': round(self._total_cost / max(1, self._total_requests), 6),
245 'cost_by_provider': {k: round(v, 6) for k, v in
246 sorted(cost_by_provider.items(), key=lambda x: x[1], reverse=True)},
247 'cost_by_type': {k: round(v, 6) for k, v in cost_by_type.items()},
248 'revenue_by_source': {k: round(v, 6) for k, v in revenue_by_source.items()},
249 'top_cost_models': [{'model': m, 'cost': round(c, 6)} for m, c in top_models],
250 }
252 def get_period_stats(self, hours: int = 24) -> PeriodStats:
253 """Get stats for a specific time period (last N hours)."""
254 cutoff = time.time() - (hours * 3600)
255 costs = [c for c in self._costs if c.timestamp > cutoff]
256 revenues = [r for r in self._revenues if r.timestamp > cutoff]
258 total_cost = sum(c.cost_usd for c in costs)
259 total_revenue = sum(r.amount_usd for r in revenues)
260 spark = total_revenue / total_cost if total_cost > 0 else 0
262 return PeriodStats(
263 period=f'last_{hours}h',
264 start_ts=cutoff,
265 end_ts=time.time(),
266 total_cost=total_cost,
267 total_revenue=total_revenue,
268 total_requests=len(costs),
269 earning_spark=spark,
270 net_margin=((total_revenue - total_cost) / total_revenue
271 if total_revenue > 0 else 0),
272 )
275# ═══════════════════════════════════════════════════════════════════════
276# Singleton
277# ═══════════════════════════════════════════════════════════════════════
279_tracker: Optional[RevenueTracker] = None
280_tracker_lock = threading.Lock()
283def get_tracker() -> RevenueTracker:
284 global _tracker
285 if _tracker is None:
286 with _tracker_lock:
287 if _tracker is None:
288 _tracker = RevenueTracker()
289 return _tracker