Coverage for integrations / social / ad_service.py: 85.6%

222 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Ad Service 

3Ad creation, serving, impression/click tracking, anti-fraud, revenue sharing. 

4Advertisers spend Spark to run ads; peer node hosters earn 90% of ad revenue (90/9/1 split). 

5""" 

6import logging 

7from datetime import datetime, timedelta 

8from typing import Optional, Dict, List, Tuple 

9 

10from sqlalchemy import desc, func, and_ 

11from sqlalchemy.orm import Session 

12 

13from .models import ( 

14 User, AdUnit, AdPlacement, AdImpression, PeerNode, 

15 CommunityMembership, HostingReward, 

16) 

17from .resonance_engine import ResonanceService 

18 

19logger = logging.getLogger('hevolve_social') 

20 

21# ─── Constants ─── 

22 

23AD_COSTS = { 

24 'default_cpi': 0.1, # Spark per impression 

25 'default_cpc': 1.0, # Spark per click 

26 'min_budget': 50, # Minimum Spark budget to create an ad 

27} 

28 

29MAX_IMPRESSIONS_PER_USER_PER_AD_PER_HOUR = 3 

30MAX_CLICKS_PER_USER_PER_AD_PER_HOUR = 1 

31 

32try: 

33 from integrations.agent_engine.revenue_aggregator import REVENUE_SPLIT_USERS 

34 HOSTER_REVENUE_SHARE = REVENUE_SPLIT_USERS # 0.90 (was 0.70) 

35except ImportError: 

36 HOSTER_REVENUE_SHARE = 0.90 

37HOSTER_UNWITNESSED_SHARE = 0.50 # fraud-penalty rate (unchanged) 

38PLATFORM_REVENUE_SHARE = 1.0 - HOSTER_REVENUE_SHARE # 0.10 (was 0.30) 

39 

40# Default placements to seed 

41DEFAULT_PLACEMENTS = [ 

42 {'name': 'feed_top', 'display_name': 'Feed Top Banner', 

43 'description': 'Banner ad at top of main feed', 'max_ads': 1}, 

44 {'name': 'sidebar', 'display_name': 'Sidebar Ad', 

45 'description': 'Sidebar advertisement panel', 'max_ads': 2}, 

46 {'name': 'region_page', 'display_name': 'Region Page Ad', 

47 'description': 'Ad shown on region landing pages', 'max_ads': 1}, 

48 {'name': 'post_interstitial', 'display_name': 'Post Interstitial', 

49 'description': 'Ad between posts in feed', 'max_ads': 1}, 

50] 

51 

52 

53class AdService: 

54 

55 # ─── CRUD ─── 

56 

57 @staticmethod 

58 def create_ad(db: Session, advertiser_id: str, title: str, 

59 click_url: str, content: str = '', 

60 image_url: str = '', ad_type: str = 'banner', 

61 targeting: Dict = None, 

62 budget_spark: int = 100, 

63 cost_per_impression: float = 0.1, 

64 cost_per_click: float = 1.0, 

65 starts_at: str = None, ends_at: str = None) -> Dict: 

66 """Create an ad unit. Debits Spark budget from advertiser.""" 

67 if budget_spark < AD_COSTS['min_budget']: 

68 return {'error': f"Minimum budget is {AD_COSTS['min_budget']} Spark"} 

69 

70 if not title or not click_url: 

71 return {'error': 'Title and click_url are required'} 

72 

73 # Debit Spark budget 

74 success, remaining = ResonanceService.spend_spark( 

75 db, advertiser_id, budget_spark, 'ad_budget', None, 

76 f'Ad budget: {title[:50]}') 

77 if not success: 

78 return {'error': 'Insufficient Spark', 'spark_balance': remaining} 

79 

80 ad = AdUnit( 

81 advertiser_id=advertiser_id, 

82 title=title, content=content, 

83 image_url=image_url, click_url=click_url, 

84 ad_type=ad_type, 

85 targeting_json=targeting or {}, 

86 budget_spark=budget_spark, 

87 cost_per_impression=cost_per_impression, 

88 cost_per_click=cost_per_click, 

89 status='active', 

90 ) 

91 

92 if starts_at: 

93 try: 

94 ad.starts_at = datetime.fromisoformat(starts_at) 

95 except (ValueError, TypeError): 

96 pass 

97 if ends_at: 

98 try: 

99 ad.ends_at = datetime.fromisoformat(ends_at) 

100 except (ValueError, TypeError): 

101 pass 

102 

103 db.add(ad) 

104 db.flush() 

105 return ad.to_dict() 

106 

107 @staticmethod 

108 def get_ad(db: Session, ad_id: str) -> Optional[Dict]: 

109 ad = db.query(AdUnit).filter_by(id=ad_id).first() 

110 return ad.to_dict() if ad else None 

111 

112 @staticmethod 

113 def list_my_ads(db: Session, advertiser_id: str, 

114 status: str = None, 

115 limit: int = 25, offset: int = 0) -> List[Dict]: 

116 q = db.query(AdUnit).filter_by(advertiser_id=advertiser_id) 

117 if status: 

118 q = q.filter_by(status=status) 

119 ads = q.order_by(desc(AdUnit.created_at)).offset(offset).limit(limit).all() 

120 return [a.to_dict() for a in ads] 

121 

122 @staticmethod 

123 def update_ad(db: Session, ad_id: str, advertiser_id: str, 

124 updates: Dict) -> Optional[Dict]: 

125 ad = db.query(AdUnit).filter_by(id=ad_id, advertiser_id=advertiser_id).first() 

126 if not ad: 

127 return None 

128 for key in ['title', 'content', 'image_url', 'click_url', 'ad_type', 

129 'targeting_json', 'status']: 

130 if key in updates: 

131 setattr(ad, key, updates[key]) 

132 db.flush() 

133 return ad.to_dict() 

134 

135 @staticmethod 

136 def pause_ad(db: Session, ad_id: str, advertiser_id: str) -> Optional[Dict]: 

137 ad = db.query(AdUnit).filter_by(id=ad_id, advertiser_id=advertiser_id).first() 

138 if not ad: 

139 return None 

140 ad.status = 'paused' 

141 db.flush() 

142 return ad.to_dict() 

143 

144 @staticmethod 

145 def delete_ad(db: Session, ad_id: str, advertiser_id: str) -> Optional[Dict]: 

146 """Cancel ad and refund unspent Spark.""" 

147 ad = db.query(AdUnit).filter_by(id=ad_id, advertiser_id=advertiser_id).first() 

148 if not ad: 

149 return None 

150 unspent = max(0, (ad.budget_spark or 0) - (ad.spent_spark or 0)) 

151 if unspent > 0: 

152 ResonanceService.award_spark( 

153 db, advertiser_id, unspent, 'ad_refund', ad.id, 

154 f'Ad refund: {ad.title[:50]}') 

155 ad.status = 'completed' 

156 db.flush() 

157 return {'deleted': True, 'spark_refunded': unspent} 

158 

159 # ─── Ad Serving ─── 

160 

161 @staticmethod 

162 def serve_ad(db: Session, user_id: str = None, 

163 region_id: str = None, 

164 placement_name: str = 'feed_top', 

165 node_id: str = None) -> Optional[Dict]: 

166 """Select the best ad for a given context.""" 

167 now = datetime.utcnow() 

168 

169 # Active ads with remaining budget 

170 q = db.query(AdUnit).filter( 

171 AdUnit.status == 'active', 

172 AdUnit.spent_spark < AdUnit.budget_spark, 

173 ) 

174 q = q.filter( 

175 (AdUnit.starts_at.is_(None)) | (AdUnit.starts_at <= now), 

176 (AdUnit.ends_at.is_(None)) | (AdUnit.ends_at > now), 

177 ) 

178 candidates = q.all() 

179 if not candidates: 

180 return None 

181 

182 # Targeting filters 

183 user_communities = set() 

184 user_type = None 

185 if user_id: 

186 memberships = db.query(CommunityMembership.community_id).filter_by( 

187 user_id=user_id).all() 

188 user_communities = {m[0] for m in memberships} 

189 user_obj = db.query(User).filter_by(id=user_id).first() 

190 user_type = user_obj.user_type if user_obj else None 

191 

192 scored = [] 

193 for ad in candidates: 

194 targeting = ad.targeting_json or {} 

195 target_regions = targeting.get('region_ids', []) 

196 if target_regions and region_id and region_id not in target_regions: 

197 continue 

198 target_communities = targeting.get('community_ids', []) 

199 if target_communities and not user_communities.intersection(set(target_communities)): 

200 continue 

201 target_user_types = targeting.get('user_types', []) 

202 if target_user_types and user_type and user_type not in target_user_types: 

203 continue 

204 scored.append(ad) 

205 

206 if not scored: 

207 return None 

208 

209 # Anti-fraud: exclude ads user has seen too many times 

210 if user_id: 

211 one_hour_ago = now - timedelta(hours=1) 

212 filtered = [] 

213 for ad in scored: 

214 count = db.query(func.count(AdImpression.id)).filter( 

215 AdImpression.ad_id == ad.id, 

216 AdImpression.user_id == user_id, 

217 AdImpression.impression_type == 'view', 

218 AdImpression.created_at >= one_hour_ago, 

219 ).scalar() or 0 

220 if count < MAX_IMPRESSIONS_PER_USER_PER_AD_PER_HOUR: 

221 filtered.append(ad) 

222 scored = filtered 

223 

224 if not scored: 

225 return None 

226 

227 # Sort by remaining budget descending, pick top 

228 scored.sort(key=lambda a: (a.budget_spark - a.spent_spark), reverse=True) 

229 winner = scored[0] 

230 

231 return { 

232 'ad': winner.to_dict(), 

233 'placement': placement_name, 

234 } 

235 

236 # ─── Impressions & Clicks ─── 

237 

238 @staticmethod 

239 def record_impression(db: Session, ad_id: str, 

240 user_id: str = None, 

241 node_id: str = None, 

242 region_id: str = None, 

243 placement_id: str = None, 

244 ip_hash: str = None) -> Optional[Dict]: 

245 """Record an ad view impression. Credits node hoster.""" 

246 ad = db.query(AdUnit).filter_by(id=ad_id, status='active').first() 

247 if not ad: 

248 return None 

249 

250 # Anti-fraud 

251 if user_id and not AdService._check_rate_limit( 

252 db, ad_id, user_id, 'view', 

253 MAX_IMPRESSIONS_PER_USER_PER_AD_PER_HOUR 

254 ): 

255 return {'error': 'Rate limit exceeded', 'ad_id': ad_id} 

256 

257 # Budget check 

258 cost = ad.cost_per_impression or AD_COSTS['default_cpi'] 

259 if (ad.spent_spark or 0) + cost > (ad.budget_spark or 0): 

260 ad.status = 'exhausted' 

261 db.flush() 

262 return {'error': 'Ad budget exhausted'} 

263 

264 imp = AdImpression( 

265 ad_id=ad_id, placement_id=placement_id, 

266 node_id=node_id, region_id=region_id, 

267 user_id=user_id, impression_type='view', 

268 ip_hash=ip_hash, 

269 ) 

270 db.add(imp) 

271 db.flush() 

272 

273 ad.spent_spark = int((ad.spent_spark or 0) + cost) 

274 ad.impression_count = (ad.impression_count or 0) + 1 

275 

276 # Request peer witness (best-effort, non-blocking) 

277 witnessed = False 

278 if node_id: 

279 try: 

280 from .integrity_service import IntegrityService 

281 witness_result = IntegrityService.request_nearest_witness( 

282 db, imp.id, ad_id, node_id) 

283 if witness_result: 

284 witnessed = True 

285 # Seal the impression with witness data 

286 imp.witness_node_id = witness_result.get( 

287 'attester_node_id', '') 

288 imp.witness_signature = witness_result.get( 

289 'signature', '') 

290 imp.sealed_hash = imp.compute_seal_hash 

291 imp.sealed_at = datetime.utcnow() 

292 except Exception: 

293 pass 

294 

295 # Credit node hoster: 90% if witnessed, 50% if not (fraud penalty) 

296 share = HOSTER_REVENUE_SHARE if witnessed else HOSTER_UNWITNESSED_SHARE 

297 hoster_share = cost * share 

298 AdService._credit_node_hoster( 

299 db, node_id, hoster_share, 

300 f'Ad impression revenue: ad {ad_id[:8]}') 

301 

302 # ── Viewer earns Spark for ad_impression_served ────────────── 

303 # AWARD_TABLE['ad_impression_served'] = {'spark': 1} has 

304 # existed in resonance_engine for every past release but was 

305 # never invoked — the viewer's Spark wallet was always empty 

306 # from ad exposure even though the node hoster was credited. 

307 # Closing this loop is the "eyeball earns revenue" piece of 

308 # the revenue-tracker flow. 

309 # 

310 # Gates already in place above keep this honest: 

311 # * per-user-per-ad 3/hour rate limit (line 251) — anti-farm 

312 # * budget-exhausted short-circuit (line 259) — no phantom 

313 # * Provenance: ResonanceTransaction gets 

314 # source_type='ad_impression_served', source_id=imp.id 

315 # so every Spark credit traces back to the exact sealed 

316 # impression row (user_id, node_id, witness, sealed_hash). 

317 # * Isolation: award_action awards to `user_id` only; the 

318 # outer API handler already scoped this to g.user_id via 

319 # @require_auth before calling record_impression. 

320 if user_id: 

321 try: 

322 ResonanceService.award_action( 

323 db, user_id, 'ad_impression_served', imp.id) 

324 except Exception as _ve: 

325 logger.debug( 

326 f"viewer spark credit skipped for imp {imp.id}: {_ve}") 

327 

328 db.flush() 

329 result = imp.to_dict() 

330 result['witnessed'] = witnessed 

331 return result 

332 

333 @staticmethod 

334 def record_click(db: Session, ad_id: str, 

335 user_id: str = None, 

336 node_id: str = None, 

337 ip_hash: str = None) -> Optional[Dict]: 

338 """Record an ad click. Credits node hoster.""" 

339 ad = db.query(AdUnit).filter_by(id=ad_id, status='active').first() 

340 if not ad: 

341 return None 

342 

343 # Anti-fraud 

344 if user_id and not AdService._check_rate_limit( 

345 db, ad_id, user_id, 'click', 

346 MAX_CLICKS_PER_USER_PER_AD_PER_HOUR 

347 ): 

348 return {'error': 'Click rate limit exceeded', 'ad_id': ad_id} 

349 

350 cost = ad.cost_per_click or AD_COSTS['default_cpc'] 

351 if (ad.spent_spark or 0) + cost > (ad.budget_spark or 0): 

352 ad.status = 'exhausted' 

353 db.flush() 

354 return {'error': 'Ad budget exhausted'} 

355 

356 imp = AdImpression( 

357 ad_id=ad_id, node_id=node_id, 

358 user_id=user_id, impression_type='click', 

359 ip_hash=ip_hash, 

360 ) 

361 db.add(imp) 

362 db.flush() 

363 

364 ad.spent_spark = int((ad.spent_spark or 0) + cost) 

365 ad.click_count = (ad.click_count or 0) + 1 

366 

367 # Request peer witness for click (best-effort) 

368 witnessed = False 

369 if node_id: 

370 try: 

371 from .integrity_service import IntegrityService 

372 witnessed = IntegrityService.request_nearest_witness( 

373 db, imp.id, ad_id, node_id) 

374 except Exception: 

375 pass 

376 

377 share = HOSTER_REVENUE_SHARE if witnessed else HOSTER_UNWITNESSED_SHARE 

378 hoster_share = cost * share 

379 AdService._credit_node_hoster( 

380 db, node_id, hoster_share, 

381 f'Ad click revenue: ad {ad_id[:8]}') 

382 

383 db.flush() 

384 result = imp.to_dict() 

385 result['witnessed'] = witnessed 

386 return result 

387 

388 # ─── Analytics ─── 

389 

390 @staticmethod 

391 def get_analytics(db: Session, ad_id: str, 

392 advertiser_id: str) -> Optional[Dict]: 

393 ad = db.query(AdUnit).filter_by( 

394 id=ad_id, advertiser_id=advertiser_id).first() 

395 if not ad: 

396 return None 

397 

398 impressions = ad.impression_count or 0 

399 clicks = ad.click_count or 0 

400 ctr = (clicks / impressions * 100) if impressions > 0 else 0.0 

401 

402 # Per-node breakdown 

403 node_stats = db.query( 

404 AdImpression.node_id, 

405 func.count(AdImpression.id), 

406 ).filter_by(ad_id=ad_id).group_by( 

407 AdImpression.node_id, 

408 ).all() 

409 

410 return { 

411 'ad': ad.to_dict(), 

412 'impressions': impressions, 

413 'clicks': clicks, 

414 'ctr': round(ctr, 2), 

415 'spent_spark': ad.spent_spark or 0, 

416 'remaining_spark': max(0, (ad.budget_spark or 0) - (ad.spent_spark or 0)), 

417 'node_breakdown': [ 

418 {'node_id': nid, 'count': cnt} 

419 for nid, cnt in node_stats if nid 

420 ], 

421 } 

422 

423 # ─── Internal Helpers ─── 

424 

425 @staticmethod 

426 def _check_rate_limit(db: Session, ad_id: str, user_id: str, 

427 impression_type: str, max_count: int) -> bool: 

428 """Returns True if under rate limit.""" 

429 one_hour_ago = datetime.utcnow() - timedelta(hours=1) 

430 count = db.query(func.count(AdImpression.id)).filter( 

431 AdImpression.ad_id == ad_id, 

432 AdImpression.user_id == user_id, 

433 AdImpression.impression_type == impression_type, 

434 AdImpression.created_at >= one_hour_ago, 

435 ).scalar() or 0 

436 return count < max_count 

437 

438 @staticmethod 

439 def _credit_node_hoster(db: Session, node_id: str, 

440 spark_amount: float, reason: str): 

441 """Credit node operator with their revenue share.""" 

442 peer = db.query(PeerNode).filter_by(node_id=node_id).first() 

443 if not peer or not peer.node_operator_id: 

444 return 

445 if spark_amount >= 1: 

446 ResonanceService.award_spark( 

447 db, peer.node_operator_id, int(spark_amount), 

448 'ad_revenue', node_id, reason) 

449 

450 # ─── Seeding ─── 

451 

452 @staticmethod 

453 def seed_placements(db: Session) -> int: 

454 """Seed default ad placements. Returns count created.""" 

455 created = 0 

456 for p in DEFAULT_PLACEMENTS: 

457 existing = db.query(AdPlacement).filter_by(name=p['name']).first() 

458 if not existing: 

459 db.add(AdPlacement(**p)) 

460 created += 1 

461 if created > 0: 

462 db.flush() 

463 return created