Coverage for integrations / providers / revenue_tracker.py: 88.7%

142 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

Revenue Tracker — understand earning spark vs compute spent.

Tracks two sides of every transaction:
  COST:    what we paid (API calls, local GPU time, bandwidth)
  REVENUE: what we earned (affiliate commissions, user credits, subscriptions)

Key metrics:
  earning_spark    = revenue / cost  (>1.0 = profitable)
  cost_per_request = total_cost / total_requests
  revenue_per_user = total_revenue / active_users
  net_margin       = (revenue - cost) / revenue

Persisted at ~/Documents/Nunba/data/revenue_tracker.json.
Exposed via /api/admin/providers/revenue endpoint.

16""" 

17 

18import json 

19import logging 

20import os 

21import threading 

22import time 

23from dataclasses import dataclass, field, asdict 

24from pathlib import Path 

25from typing import Any, Dict, List, Optional 

26 

27logger = logging.getLogger(__name__) 

28 

29 

@dataclass
class CostEntry:
    """One recorded spend event (an API call, compute time, bandwidth, ...).

    All fields have defaults, so an entry can be rebuilt from a partial
    dictionary of keyword arguments.
    """

    # Unix time (seconds) at which the cost was recorded.
    timestamp: float = 0.0
    # Identifier of the provider that was paid.
    provider_id: str = ""
    # Identifier of the model the request was served by.
    model_id: str = ""
    # Amount spent, in US dollars.
    cost_usd: float = 0.0
    # Kind of spend: api, compute, bandwidth.
    cost_type: str = "api"
    # Number of tokens consumed by the request.
    tokens_used: int = 0
    # Kind of request: llm, image_gen, video_gen, etc.
    request_type: str = "llm"

40 

41 

@dataclass
class RevenueEntry:
    """One recorded income event.

    All fields have defaults, so an entry can be rebuilt from a partial
    dictionary of keyword arguments.
    """

    # Unix time (seconds) at which the revenue was recorded.
    timestamp: float = 0.0
    # Where the money came from: affiliate, credits, subscription.
    source: str = ""
    # Identifier of the provider associated with the revenue, if any.
    provider_id: str = ""
    # Amount earned, in US dollars.
    amount_usd: float = 0.0
    # Identifier of the user the revenue is attributed to, if any.
    user_id: str = ""
    # What happened: purchase, commission, referral.
    event_type: str = ""

51 

52 

@dataclass
class PeriodStats:
    """Totals and breakdowns computed over a single time window."""

    # Label of the window: 'hour', 'day', 'week', 'month'.
    period: str = ""
    # Window boundaries as Unix timestamps (seconds).
    start_ts: float = 0.0
    end_ts: float = 0.0
    # Sums over the window.
    total_cost: float = 0.0
    total_revenue: float = 0.0
    total_requests: int = 0
    # revenue / cost
    earning_spark: float = 0.0
    # (revenue - cost) / revenue
    net_margin: float = 0.0
    # Breakdown containers; each instance gets its own fresh dict/list.
    cost_by_provider: Dict[str, float] = field(default_factory=dict)
    cost_by_type: Dict[str, float] = field(default_factory=dict)
    revenue_by_source: Dict[str, float] = field(default_factory=dict)
    top_models: List[Dict] = field(default_factory=list)

68 

69 

70class RevenueTracker: 

71 """Track revenue vs compute cost for the earning spark metric.""" 

72 

73 def __init__(self, tracker_path: Optional[str] = None): 

74 try: 

75 from core.platform_paths import get_db_dir 

76 data_dir = Path(get_db_dir()) 

77 except ImportError: 

78 data_dir = Path.home() / 'Documents' / 'Nunba' / 'data' 

79 data_dir.mkdir(parents=True, exist_ok=True) 

80 

81 self._path = Path(tracker_path) if tracker_path else data_dir / 'revenue_tracker.json' 

82 self._lock = threading.Lock() 

83 

84 # In-memory buffers (persisted periodically) 

85 self._costs: List[CostEntry] = [] 

86 self._revenues: List[RevenueEntry] = [] 

87 

88 # Running totals 

89 self._total_cost = 0.0 

90 self._total_revenue = 0.0 

91 self._total_requests = 0 

92 

93 self._load() 

94 

95 def _load(self): 

96 if self._path.exists(): 

97 try: 

98 with open(self._path, 'r') as f: 

99 data = json.load(f) 

100 self._total_cost = data.get('total_cost', 0.0) 

101 self._total_revenue = data.get('total_revenue', 0.0) 

102 self._total_requests = data.get('total_requests', 0) 

103 # Load recent entries (last 24h only — older data is aggregated) 

104 for c in data.get('recent_costs', []): 

105 self._costs.append(CostEntry(**{ 

106 k: v for k, v in c.items() 

107 if k in CostEntry.__dataclass_fields__})) 

108 for r in data.get('recent_revenues', []): 

109 self._revenues.append(RevenueEntry(**{ 

110 k: v for k, v in r.items() 

111 if k in RevenueEntry.__dataclass_fields__})) 

112 except Exception as e: 

113 logger.warning("Failed to load revenue tracker: %s", e) 

114 

115 def save(self): 

116 # Keep only last 24h of entries 

117 cutoff = time.time() - 86400 

118 with self._lock: 

119 recent_costs = [asdict(c) for c in self._costs if c.timestamp > cutoff] 

120 recent_revenues = [asdict(r) for r in self._revenues if r.timestamp > cutoff] 

121 data = { 

122 'total_cost': self._total_cost, 

123 'total_revenue': self._total_revenue, 

124 'total_requests': self._total_requests, 

125 'recent_costs': recent_costs, 

126 'recent_revenues': recent_revenues, 

127 'last_saved': time.time(), 

128 } 

129 try: 

130 with open(self._path, 'w') as f: 

131 json.dump(data, f, indent=2) 

132 except Exception as e: 

133 logger.error("Failed to save revenue tracker: %s", e) 

134 

135 # ── Recording ───────────────────────────────────────────────────── 

136 

137 def record_cost(self, provider_id: str, model_id: str, cost_usd: float, 

138 tokens_used: int = 0, request_type: str = 'llm', 

139 cost_type: str = 'api'): 

140 """Record an API cost event (called by gateway after each request).""" 

141 entry = CostEntry( 

142 timestamp=time.time(), provider_id=provider_id, 

143 model_id=model_id, cost_usd=cost_usd, 

144 cost_type=cost_type, tokens_used=tokens_used, 

145 request_type=request_type, 

146 ) 

147 with self._lock: 

148 self._costs.append(entry) 

149 self._total_cost += cost_usd 

150 self._total_requests += 1 

151 

152 # Auto-save and trim every 50 requests 

153 if self._total_requests % 50 == 0: 

154 self._trim_old_entries() 

155 self.save() 

156 

157 def _trim_old_entries(self): 

158 """Remove entries older than 24h from in-memory lists.""" 

159 cutoff = time.time() - 86400 

160 with self._lock: 

161 self._costs = [c for c in self._costs if c.timestamp > cutoff] 

162 self._revenues = [r for r in self._revenues if r.timestamp > cutoff] 

163 

164 def record_revenue(self, source: str, amount_usd: float, 

165 provider_id: str = '', user_id: str = '', 

166 event_type: str = 'commission'): 

167 """Record a revenue event and credit the contributing user. 

168 

169 When a user's compute, API keys, agents, or content generates revenue, 

170 they earn spark proportional to the amount. Requires revenue_share consent. 

171 """ 

172 # Credit the contributing user via ResonanceService (if consented) 

173 if user_id and amount_usd > 0: 

174 try: 

175 from integrations.social.consent_service import ConsentService 

176 from integrations.social.models import db_session 

177 with db_session() as db: 

178 if ConsentService.check_consent(db, user_id, 'revenue_share'): 

179 from integrations.social.resonance_engine import ResonanceService 

180 # 1 USD = 100 spark (configurable exchange rate) 

181 spark_amount = max(1, int(amount_usd * 100)) 

182 ResonanceService.award_spark( 

183 db, user_id, spark_amount, 

184 source_type='revenue_share', 

185 description=f'Earned from {source}: ${amount_usd:.4f} ({provider_id})', 

186 ) 

187 db.commit() 

188 except Exception as _e: 

189 logger.debug("Revenue credit failed for user %s: %s", user_id, _e) 

190 

191 entry = RevenueEntry( 

192 timestamp=time.time(), source=source, 

193 provider_id=provider_id, amount_usd=amount_usd, 

194 user_id=user_id, event_type=event_type, 

195 ) 

196 with self._lock: 

197 self._revenues.append(entry) 

198 self._total_revenue += amount_usd 

199 self.save() 

200 

201 # ── Analytics ───────────────────────────────────────────────────── 

202 

203 def get_earning_spark(self) -> float: 

204 """Revenue / Cost ratio. >1.0 = profitable.""" 

205 if self._total_cost <= 0: 

206 return float('inf') if self._total_revenue > 0 else 0.0 

207 return self._total_revenue / self._total_cost 

208 

209 def get_summary(self) -> Dict[str, Any]: 

210 """Full analytics summary for dashboard.""" 

211 spark = self.get_earning_spark() 

212 net_margin = ((self._total_revenue - self._total_cost) / self._total_revenue 

213 if self._total_revenue > 0 else 0.0) 

214 

215 # Cost breakdown by provider 

216 cost_by_provider: Dict[str, float] = {} 

217 cost_by_type: Dict[str, float] = {} 

218 for c in self._costs: 

219 cost_by_provider[c.provider_id] = cost_by_provider.get( 

220 c.provider_id, 0) + c.cost_usd 

221 cost_by_type[c.request_type] = cost_by_type.get( 

222 c.request_type, 0) + c.cost_usd 

223 

224 # Revenue breakdown 

225 revenue_by_source: Dict[str, float] = {} 

226 for r in self._revenues: 

227 revenue_by_source[r.source] = revenue_by_source.get( 

228 r.source, 0) + r.amount_usd 

229 

230 # Top cost models 

231 model_costs: Dict[str, float] = {} 

232 for c in self._costs: 

233 key = f"{c.provider_id}/{c.model_id}" 

234 model_costs[key] = model_costs.get(key, 0) + c.cost_usd 

235 top_models = sorted(model_costs.items(), key=lambda x: x[1], reverse=True)[:10] 

236 

237 return { 

238 'total_cost_usd': round(self._total_cost, 6), 

239 'total_revenue_usd': round(self._total_revenue, 6), 

240 'net_profit_usd': round(self._total_revenue - self._total_cost, 6), 

241 'earning_spark': round(spark, 3), 

242 'net_margin_pct': round(net_margin * 100, 1), 

243 'total_requests': self._total_requests, 

244 'cost_per_request': round(self._total_cost / max(1, self._total_requests), 6), 

245 'cost_by_provider': {k: round(v, 6) for k, v in 

246 sorted(cost_by_provider.items(), key=lambda x: x[1], reverse=True)}, 

247 'cost_by_type': {k: round(v, 6) for k, v in cost_by_type.items()}, 

248 'revenue_by_source': {k: round(v, 6) for k, v in revenue_by_source.items()}, 

249 'top_cost_models': [{'model': m, 'cost': round(c, 6)} for m, c in top_models], 

250 } 

251 

252 def get_period_stats(self, hours: int = 24) -> PeriodStats: 

253 """Get stats for a specific time period (last N hours).""" 

254 cutoff = time.time() - (hours * 3600) 

255 costs = [c for c in self._costs if c.timestamp > cutoff] 

256 revenues = [r for r in self._revenues if r.timestamp > cutoff] 

257 

258 total_cost = sum(c.cost_usd for c in costs) 

259 total_revenue = sum(r.amount_usd for r in revenues) 

260 spark = total_revenue / total_cost if total_cost > 0 else 0 

261 

262 return PeriodStats( 

263 period=f'last_{hours}h', 

264 start_ts=cutoff, 

265 end_ts=time.time(), 

266 total_cost=total_cost, 

267 total_revenue=total_revenue, 

268 total_requests=len(costs), 

269 earning_spark=spark, 

270 net_margin=((total_revenue - total_cost) / total_revenue 

271 if total_revenue > 0 else 0), 

272 ) 

273 

274 

275# ═══════════════════════════════════════════════════════════════════════ 

276# Singleton 

277# ═══════════════════════════════════════════════════════════════════════ 

278 

# Process-wide tracker instance; stays None until get_tracker() first runs.
_tracker: Optional[RevenueTracker] = None
# Guards creation of the shared instance.
_tracker_lock = threading.Lock()


def get_tracker() -> RevenueTracker:
    """Return the shared RevenueTracker, creating it on first use.

    The instance is built with the default tracker path. Creation happens
    under ``_tracker_lock`` and the instance is re-checked after acquiring
    it, so concurrent first callers end up with the same object.
    """
    global _tracker
    # Fast path: already created, no locking needed.
    if _tracker is not None:
        return _tracker
    with _tracker_lock:
        # Another thread may have created it while we waited for the lock.
        if _tracker is None:
            _tracker = RevenueTracker()
        return _tracker