Coverage for integrations / social / ad_service.py: 85.6%
222 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial - Ad Service
3Ad creation, serving, impression/click tracking, anti-fraud, revenue sharing.
4Advertisers spend Spark to run ads; peer node hosters earn 90% of ad revenue (90/9/1 split).
5"""
6import logging
7from datetime import datetime, timedelta
8from typing import Optional, Dict, List, Tuple
10from sqlalchemy import desc, func, and_
11from sqlalchemy.orm import Session
13from .models import (
14 User, AdUnit, AdPlacement, AdImpression, PeerNode,
15 CommunityMembership, HostingReward,
16)
17from .resonance_engine import ResonanceService
19logger = logging.getLogger('hevolve_social')
21# ─── Constants ───
23AD_COSTS = {
24 'default_cpi': 0.1, # Spark per impression
25 'default_cpc': 1.0, # Spark per click
26 'min_budget': 50, # Minimum Spark budget to create an ad
27}
29MAX_IMPRESSIONS_PER_USER_PER_AD_PER_HOUR = 3
30MAX_CLICKS_PER_USER_PER_AD_PER_HOUR = 1
32try:
33 from integrations.agent_engine.revenue_aggregator import REVENUE_SPLIT_USERS
34 HOSTER_REVENUE_SHARE = REVENUE_SPLIT_USERS # 0.90 (was 0.70)
35except ImportError:
36 HOSTER_REVENUE_SHARE = 0.90
37HOSTER_UNWITNESSED_SHARE = 0.50 # fraud-penalty rate (unchanged)
38PLATFORM_REVENUE_SHARE = 1.0 - HOSTER_REVENUE_SHARE # 0.10 (was 0.30)
40# Default placements to seed
41DEFAULT_PLACEMENTS = [
42 {'name': 'feed_top', 'display_name': 'Feed Top Banner',
43 'description': 'Banner ad at top of main feed', 'max_ads': 1},
44 {'name': 'sidebar', 'display_name': 'Sidebar Ad',
45 'description': 'Sidebar advertisement panel', 'max_ads': 2},
46 {'name': 'region_page', 'display_name': 'Region Page Ad',
47 'description': 'Ad shown on region landing pages', 'max_ads': 1},
48 {'name': 'post_interstitial', 'display_name': 'Post Interstitial',
49 'description': 'Ad between posts in feed', 'max_ads': 1},
50]
53class AdService:
55 # ─── CRUD ───
57 @staticmethod
58 def create_ad(db: Session, advertiser_id: str, title: str,
59 click_url: str, content: str = '',
60 image_url: str = '', ad_type: str = 'banner',
61 targeting: Dict = None,
62 budget_spark: int = 100,
63 cost_per_impression: float = 0.1,
64 cost_per_click: float = 1.0,
65 starts_at: str = None, ends_at: str = None) -> Dict:
66 """Create an ad unit. Debits Spark budget from advertiser."""
67 if budget_spark < AD_COSTS['min_budget']:
68 return {'error': f"Minimum budget is {AD_COSTS['min_budget']} Spark"}
70 if not title or not click_url:
71 return {'error': 'Title and click_url are required'}
73 # Debit Spark budget
74 success, remaining = ResonanceService.spend_spark(
75 db, advertiser_id, budget_spark, 'ad_budget', None,
76 f'Ad budget: {title[:50]}')
77 if not success:
78 return {'error': 'Insufficient Spark', 'spark_balance': remaining}
80 ad = AdUnit(
81 advertiser_id=advertiser_id,
82 title=title, content=content,
83 image_url=image_url, click_url=click_url,
84 ad_type=ad_type,
85 targeting_json=targeting or {},
86 budget_spark=budget_spark,
87 cost_per_impression=cost_per_impression,
88 cost_per_click=cost_per_click,
89 status='active',
90 )
92 if starts_at:
93 try:
94 ad.starts_at = datetime.fromisoformat(starts_at)
95 except (ValueError, TypeError):
96 pass
97 if ends_at:
98 try:
99 ad.ends_at = datetime.fromisoformat(ends_at)
100 except (ValueError, TypeError):
101 pass
103 db.add(ad)
104 db.flush()
105 return ad.to_dict()
107 @staticmethod
108 def get_ad(db: Session, ad_id: str) -> Optional[Dict]:
109 ad = db.query(AdUnit).filter_by(id=ad_id).first()
110 return ad.to_dict() if ad else None
112 @staticmethod
113 def list_my_ads(db: Session, advertiser_id: str,
114 status: str = None,
115 limit: int = 25, offset: int = 0) -> List[Dict]:
116 q = db.query(AdUnit).filter_by(advertiser_id=advertiser_id)
117 if status:
118 q = q.filter_by(status=status)
119 ads = q.order_by(desc(AdUnit.created_at)).offset(offset).limit(limit).all()
120 return [a.to_dict() for a in ads]
122 @staticmethod
123 def update_ad(db: Session, ad_id: str, advertiser_id: str,
124 updates: Dict) -> Optional[Dict]:
125 ad = db.query(AdUnit).filter_by(id=ad_id, advertiser_id=advertiser_id).first()
126 if not ad:
127 return None
128 for key in ['title', 'content', 'image_url', 'click_url', 'ad_type',
129 'targeting_json', 'status']:
130 if key in updates:
131 setattr(ad, key, updates[key])
132 db.flush()
133 return ad.to_dict()
135 @staticmethod
136 def pause_ad(db: Session, ad_id: str, advertiser_id: str) -> Optional[Dict]:
137 ad = db.query(AdUnit).filter_by(id=ad_id, advertiser_id=advertiser_id).first()
138 if not ad:
139 return None
140 ad.status = 'paused'
141 db.flush()
142 return ad.to_dict()
144 @staticmethod
145 def delete_ad(db: Session, ad_id: str, advertiser_id: str) -> Optional[Dict]:
146 """Cancel ad and refund unspent Spark."""
147 ad = db.query(AdUnit).filter_by(id=ad_id, advertiser_id=advertiser_id).first()
148 if not ad:
149 return None
150 unspent = max(0, (ad.budget_spark or 0) - (ad.spent_spark or 0))
151 if unspent > 0:
152 ResonanceService.award_spark(
153 db, advertiser_id, unspent, 'ad_refund', ad.id,
154 f'Ad refund: {ad.title[:50]}')
155 ad.status = 'completed'
156 db.flush()
157 return {'deleted': True, 'spark_refunded': unspent}
159 # ─── Ad Serving ───
161 @staticmethod
162 def serve_ad(db: Session, user_id: str = None,
163 region_id: str = None,
164 placement_name: str = 'feed_top',
165 node_id: str = None) -> Optional[Dict]:
166 """Select the best ad for a given context."""
167 now = datetime.utcnow()
169 # Active ads with remaining budget
170 q = db.query(AdUnit).filter(
171 AdUnit.status == 'active',
172 AdUnit.spent_spark < AdUnit.budget_spark,
173 )
174 q = q.filter(
175 (AdUnit.starts_at.is_(None)) | (AdUnit.starts_at <= now),
176 (AdUnit.ends_at.is_(None)) | (AdUnit.ends_at > now),
177 )
178 candidates = q.all()
179 if not candidates:
180 return None
182 # Targeting filters
183 user_communities = set()
184 user_type = None
185 if user_id:
186 memberships = db.query(CommunityMembership.community_id).filter_by(
187 user_id=user_id).all()
188 user_communities = {m[0] for m in memberships}
189 user_obj = db.query(User).filter_by(id=user_id).first()
190 user_type = user_obj.user_type if user_obj else None
192 scored = []
193 for ad in candidates:
194 targeting = ad.targeting_json or {}
195 target_regions = targeting.get('region_ids', [])
196 if target_regions and region_id and region_id not in target_regions:
197 continue
198 target_communities = targeting.get('community_ids', [])
199 if target_communities and not user_communities.intersection(set(target_communities)):
200 continue
201 target_user_types = targeting.get('user_types', [])
202 if target_user_types and user_type and user_type not in target_user_types:
203 continue
204 scored.append(ad)
206 if not scored:
207 return None
209 # Anti-fraud: exclude ads user has seen too many times
210 if user_id:
211 one_hour_ago = now - timedelta(hours=1)
212 filtered = []
213 for ad in scored:
214 count = db.query(func.count(AdImpression.id)).filter(
215 AdImpression.ad_id == ad.id,
216 AdImpression.user_id == user_id,
217 AdImpression.impression_type == 'view',
218 AdImpression.created_at >= one_hour_ago,
219 ).scalar() or 0
220 if count < MAX_IMPRESSIONS_PER_USER_PER_AD_PER_HOUR:
221 filtered.append(ad)
222 scored = filtered
224 if not scored:
225 return None
227 # Sort by remaining budget descending, pick top
228 scored.sort(key=lambda a: (a.budget_spark - a.spent_spark), reverse=True)
229 winner = scored[0]
231 return {
232 'ad': winner.to_dict(),
233 'placement': placement_name,
234 }
236 # ─── Impressions & Clicks ───
238 @staticmethod
239 def record_impression(db: Session, ad_id: str,
240 user_id: str = None,
241 node_id: str = None,
242 region_id: str = None,
243 placement_id: str = None,
244 ip_hash: str = None) -> Optional[Dict]:
245 """Record an ad view impression. Credits node hoster."""
246 ad = db.query(AdUnit).filter_by(id=ad_id, status='active').first()
247 if not ad:
248 return None
250 # Anti-fraud
251 if user_id and not AdService._check_rate_limit(
252 db, ad_id, user_id, 'view',
253 MAX_IMPRESSIONS_PER_USER_PER_AD_PER_HOUR
254 ):
255 return {'error': 'Rate limit exceeded', 'ad_id': ad_id}
257 # Budget check
258 cost = ad.cost_per_impression or AD_COSTS['default_cpi']
259 if (ad.spent_spark or 0) + cost > (ad.budget_spark or 0):
260 ad.status = 'exhausted'
261 db.flush()
262 return {'error': 'Ad budget exhausted'}
264 imp = AdImpression(
265 ad_id=ad_id, placement_id=placement_id,
266 node_id=node_id, region_id=region_id,
267 user_id=user_id, impression_type='view',
268 ip_hash=ip_hash,
269 )
270 db.add(imp)
271 db.flush()
273 ad.spent_spark = int((ad.spent_spark or 0) + cost)
274 ad.impression_count = (ad.impression_count or 0) + 1
276 # Request peer witness (best-effort, non-blocking)
277 witnessed = False
278 if node_id:
279 try:
280 from .integrity_service import IntegrityService
281 witness_result = IntegrityService.request_nearest_witness(
282 db, imp.id, ad_id, node_id)
283 if witness_result:
284 witnessed = True
285 # Seal the impression with witness data
286 imp.witness_node_id = witness_result.get(
287 'attester_node_id', '')
288 imp.witness_signature = witness_result.get(
289 'signature', '')
290 imp.sealed_hash = imp.compute_seal_hash
291 imp.sealed_at = datetime.utcnow()
292 except Exception:
293 pass
295 # Credit node hoster: 90% if witnessed, 50% if not (fraud penalty)
296 share = HOSTER_REVENUE_SHARE if witnessed else HOSTER_UNWITNESSED_SHARE
297 hoster_share = cost * share
298 AdService._credit_node_hoster(
299 db, node_id, hoster_share,
300 f'Ad impression revenue: ad {ad_id[:8]}')
302 # ── Viewer earns Spark for ad_impression_served ──────────────
303 # AWARD_TABLE['ad_impression_served'] = {'spark': 1} has
304 # existed in resonance_engine for every past release but was
305 # never invoked — the viewer's Spark wallet was always empty
306 # from ad exposure even though the node hoster was credited.
307 # Closing this loop is the "eyeball earns revenue" piece of
308 # the revenue-tracker flow.
309 #
310 # Gates already in place above keep this honest:
311 # * per-user-per-ad 3/hour rate limit (line 251) — anti-farm
312 # * budget-exhausted short-circuit (line 259) — no phantom
313 # * Provenance: ResonanceTransaction gets
314 # source_type='ad_impression_served', source_id=imp.id
315 # so every Spark credit traces back to the exact sealed
316 # impression row (user_id, node_id, witness, sealed_hash).
317 # * Isolation: award_action awards to `user_id` only; the
318 # outer API handler already scoped this to g.user_id via
319 # @require_auth before calling record_impression.
320 if user_id:
321 try:
322 ResonanceService.award_action(
323 db, user_id, 'ad_impression_served', imp.id)
324 except Exception as _ve:
325 logger.debug(
326 f"viewer spark credit skipped for imp {imp.id}: {_ve}")
328 db.flush()
329 result = imp.to_dict()
330 result['witnessed'] = witnessed
331 return result
333 @staticmethod
334 def record_click(db: Session, ad_id: str,
335 user_id: str = None,
336 node_id: str = None,
337 ip_hash: str = None) -> Optional[Dict]:
338 """Record an ad click. Credits node hoster."""
339 ad = db.query(AdUnit).filter_by(id=ad_id, status='active').first()
340 if not ad:
341 return None
343 # Anti-fraud
344 if user_id and not AdService._check_rate_limit(
345 db, ad_id, user_id, 'click',
346 MAX_CLICKS_PER_USER_PER_AD_PER_HOUR
347 ):
348 return {'error': 'Click rate limit exceeded', 'ad_id': ad_id}
350 cost = ad.cost_per_click or AD_COSTS['default_cpc']
351 if (ad.spent_spark or 0) + cost > (ad.budget_spark or 0):
352 ad.status = 'exhausted'
353 db.flush()
354 return {'error': 'Ad budget exhausted'}
356 imp = AdImpression(
357 ad_id=ad_id, node_id=node_id,
358 user_id=user_id, impression_type='click',
359 ip_hash=ip_hash,
360 )
361 db.add(imp)
362 db.flush()
364 ad.spent_spark = int((ad.spent_spark or 0) + cost)
365 ad.click_count = (ad.click_count or 0) + 1
367 # Request peer witness for click (best-effort)
368 witnessed = False
369 if node_id:
370 try:
371 from .integrity_service import IntegrityService
372 witnessed = IntegrityService.request_nearest_witness(
373 db, imp.id, ad_id, node_id)
374 except Exception:
375 pass
377 share = HOSTER_REVENUE_SHARE if witnessed else HOSTER_UNWITNESSED_SHARE
378 hoster_share = cost * share
379 AdService._credit_node_hoster(
380 db, node_id, hoster_share,
381 f'Ad click revenue: ad {ad_id[:8]}')
383 db.flush()
384 result = imp.to_dict()
385 result['witnessed'] = witnessed
386 return result
388 # ─── Analytics ───
390 @staticmethod
391 def get_analytics(db: Session, ad_id: str,
392 advertiser_id: str) -> Optional[Dict]:
393 ad = db.query(AdUnit).filter_by(
394 id=ad_id, advertiser_id=advertiser_id).first()
395 if not ad:
396 return None
398 impressions = ad.impression_count or 0
399 clicks = ad.click_count or 0
400 ctr = (clicks / impressions * 100) if impressions > 0 else 0.0
402 # Per-node breakdown
403 node_stats = db.query(
404 AdImpression.node_id,
405 func.count(AdImpression.id),
406 ).filter_by(ad_id=ad_id).group_by(
407 AdImpression.node_id,
408 ).all()
410 return {
411 'ad': ad.to_dict(),
412 'impressions': impressions,
413 'clicks': clicks,
414 'ctr': round(ctr, 2),
415 'spent_spark': ad.spent_spark or 0,
416 'remaining_spark': max(0, (ad.budget_spark or 0) - (ad.spent_spark or 0)),
417 'node_breakdown': [
418 {'node_id': nid, 'count': cnt}
419 for nid, cnt in node_stats if nid
420 ],
421 }
423 # ─── Internal Helpers ───
425 @staticmethod
426 def _check_rate_limit(db: Session, ad_id: str, user_id: str,
427 impression_type: str, max_count: int) -> bool:
428 """Returns True if under rate limit."""
429 one_hour_ago = datetime.utcnow() - timedelta(hours=1)
430 count = db.query(func.count(AdImpression.id)).filter(
431 AdImpression.ad_id == ad_id,
432 AdImpression.user_id == user_id,
433 AdImpression.impression_type == impression_type,
434 AdImpression.created_at >= one_hour_ago,
435 ).scalar() or 0
436 return count < max_count
438 @staticmethod
439 def _credit_node_hoster(db: Session, node_id: str,
440 spark_amount: float, reason: str):
441 """Credit node operator with their revenue share."""
442 peer = db.query(PeerNode).filter_by(node_id=node_id).first()
443 if not peer or not peer.node_operator_id:
444 return
445 if spark_amount >= 1:
446 ResonanceService.award_spark(
447 db, peer.node_operator_id, int(spark_amount),
448 'ad_revenue', node_id, reason)
450 # ─── Seeding ───
452 @staticmethod
453 def seed_placements(db: Session) -> int:
454 """Seed default ad placements. Returns count created."""
455 created = 0
456 for p in DEFAULT_PLACEMENTS:
457 existing = db.query(AdPlacement).filter_by(name=p['name']).first()
458 if not existing:
459 db.add(AdPlacement(**p))
460 created += 1
461 if created > 0:
462 db.flush()
463 return created