Coverage for integrations / social / campaign_service.py: 24.3%
115 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial - Campaign Service ("Make Me Viral")
3Users deploy their own trained agents to auto-market products.
4"""
5import json
6import logging
7from datetime import datetime, timedelta
8from typing import Optional, Dict, List
10from sqlalchemy import desc, func
11from sqlalchemy.orm import Session
13from .models import (
14 User, Post, Campaign, CampaignAction, ResonanceWallet,
15)
16from .resonance_engine import ResonanceService
# Shared logger for this module.
logger = logging.getLogger('hevolve_social')

# Spark charged per campaign operation, keyed by operation name.
CAMPAIGN_COSTS = {
    'generate_strategy': 10,
    'post': 15,
    'comment': 5,
    'boost': 0,  # variable, handled by boost system
}

# Rate limit: maximum number of 'post' actions one campaign may record per
# UTC day.
MAX_CAMPAIGN_POSTS_PER_DAY = 3
class CampaignService:
    """Stateless operations on "Make Me Viral" campaigns.

    Every method is a ``@staticmethod`` that receives the SQLAlchemy session
    to work on. Methods call ``db.flush()`` but never commit; committing or
    rolling back is left to the caller.
    """

    @staticmethod
    def create_campaign(db: Session, owner_id: str, name: str,
                        description: str = '', goal: str = 'awareness',
                        product_url: str = '', product_description: str = '',
                        agent_id: Optional[str] = None,
                        target_regions: Optional[List[str]] = None,
                        target_communities: Optional[List[str]] = None,
                        total_spark_budget: int = 100) -> Dict:
        """Create a new campaign in ``'draft'`` status.

        ``target_regions`` and ``target_communities`` are stored as JSON
        text; ``None`` is stored as an empty list. All counters start at 0.

        Returns:
            ``campaign.to_dict()`` for the newly flushed row.
        """
        campaign = Campaign(
            owner_id=owner_id,
            name=name,
            description=description,
            goal=goal,
            product_url=product_url,
            product_description=product_description,
            agent_id=agent_id,
            status='draft',
            target_regions=json.dumps(target_regions or []),
            target_communities=json.dumps(target_communities or []),
            total_spark_budget=total_spark_budget,
            spark_spent=0,
            impressions=0,
            clicks=0,
            conversions=0,
        )
        db.add(campaign)
        db.flush()
        return campaign.to_dict()

    @staticmethod
    def get_campaign(db: Session, campaign_id: str) -> Optional[Dict]:
        """Return one campaign as a dict, or ``None`` if the id is unknown.

        The dict is ``campaign.to_dict()`` plus an ``'action_count'`` key
        holding the number of ``CampaignAction`` rows for the campaign.
        """
        campaign = db.query(Campaign).filter_by(id=campaign_id).first()
        if not campaign:
            return None

        result = campaign.to_dict()
        action_count = db.query(func.count(CampaignAction.id)).filter_by(
            campaign_id=campaign_id
        ).scalar()
        result['action_count'] = action_count or 0
        return result

    @staticmethod
    def list_campaigns(db: Session, owner_id: Optional[str] = None,
                       status: Optional[str] = None,
                       limit: int = 25, offset: int = 0) -> List[Dict]:
        """List campaigns, newest first.

        ``owner_id`` and ``status`` are optional filters; a falsy value
        disables the corresponding filter. ``offset`` / ``limit`` paginate
        the result.
        """
        query = db.query(Campaign)
        if owner_id:
            query = query.filter_by(owner_id=owner_id)
        if status:
            query = query.filter_by(status=status)

        campaigns = (
            query.order_by(desc(Campaign.created_at))
            .offset(offset)
            .limit(limit)
            .all()
        )
        return [campaign.to_dict() for campaign in campaigns]

    @staticmethod
    def update_campaign(db: Session, campaign_id: str, owner_id: str,
                        updates: Dict) -> Optional[Dict]:
        """Update campaign settings.

        Only whitelisted keys of ``updates`` are applied; any other key is
        ignored. The campaign must belong to ``owner_id``.

        Returns:
            The updated ``campaign.to_dict()``, or ``None`` when no campaign
            matches ``campaign_id`` and ``owner_id``.
        """
        campaign = db.query(Campaign).filter_by(
            id=campaign_id, owner_id=owner_id
        ).first()
        if not campaign:
            return None

        # Plain columns that are copied over verbatim.
        for key in ('name', 'description', 'status', 'product_url',
                    'product_description', 'total_spark_budget'):
            if key in updates:
                setattr(campaign, key, updates[key])

        # List-valued columns are stored as JSON text. A None value is
        # stored as an empty list, as create_campaign does, rather than as
        # JSON null.
        for key in ('target_regions', 'target_communities'):
            if key in updates:
                setattr(campaign, key, json.dumps(updates[key] or []))

        # Status transition: stamp the start time on first activation only.
        if updates.get('status') == 'active' and not campaign.started_at:
            campaign.started_at = datetime.utcnow()

        db.flush()
        return campaign.to_dict()

    @staticmethod
    def generate_strategy(db: Session, campaign_id: str,
                          owner_id: str) -> Optional[Dict]:
        """Generate and store a marketing strategy for a campaign.

        Charges ``CAMPAIGN_COSTS['generate_strategy']`` Spark to the owner
        and adds it to ``campaign.spark_spent``. The strategy is currently a
        fixed placeholder (no LLM call is made here); it is saved on
        ``campaign.strategy_json``.

        Returns:
            ``None`` when no campaign matches ``campaign_id`` and
            ``owner_id``; ``{'error': 'Insufficient Spark', 'cost': ...}``
            when the charge is refused; otherwise a dict with ``'strategy'``,
            ``'spark_cost'`` and ``'campaign_id'``.
        """
        campaign = db.query(Campaign).filter_by(
            id=campaign_id, owner_id=owner_id
        ).first()
        if not campaign:
            return None

        # Charge strategy generation cost before producing anything.
        cost = CAMPAIGN_COSTS['generate_strategy']
        success, _ = ResonanceService.spend_spark(
            db, owner_id, cost, 'campaign_strategy', campaign_id,
            f'Strategy generation for {campaign.name}'
        )
        if not success:
            return {'error': 'Insufficient Spark', 'cost': cost}

        campaign.spark_spent = (campaign.spark_spent or 0) + cost

        # Generate strategy placeholder (would call agent's LLM in production)
        strategy = {
            'content_themes': [
                f'Highlight key features of {campaign.product_description or campaign.name}',
                'Share user testimonials and success stories',
                'Demonstrate unique value proposition',
            ],
            'posting_schedule': [
                {'day': 1, 'action': 'introduction_post', 'target': 'main_feed'},
                {'day': 2, 'action': 'feature_highlight', 'target': 'relevant_communities'},
                {'day': 3, 'action': 'engagement_post', 'target': 'target_regions'},
            ],
            'engagement_plan': 'Comment on trending posts in target communities, respond to all replies promptly',
            'estimated_reach': 500,
            'estimated_duration_days': 7,
        }

        campaign.strategy_json = json.dumps(strategy)
        db.flush()

        return {
            'strategy': strategy,
            'spark_cost': cost,
            'campaign_id': campaign_id,
        }

    @staticmethod
    def execute_campaign_step(db: Session, campaign_id: str,
                              owner_id: str) -> Optional[Dict]:
        """Execute the next step in a campaign by recording one 'post' action.

        Rate limited to ``MAX_CAMPAIGN_POSTS_PER_DAY`` posts per campaign per
        UTC day. Each post costs ``CAMPAIGN_COSTS['post']`` Spark, which must
        fit in the campaign's remaining budget and is charged to the owner.

        Returns:
            ``None`` when no *active* campaign matches ``campaign_id`` and
            ``owner_id``; an ``{'error': ...}`` dict when the daily limit is
            reached, the budget is exhausted or the Spark charge is refused;
            otherwise a dict with ``'action'``, ``'spark_cost'``,
            ``'budget_remaining'`` and ``'impressions'``.
        """
        campaign = db.query(Campaign).filter_by(
            id=campaign_id, owner_id=owner_id, status='active'
        ).first()
        if not campaign:
            return None

        # Check daily rate limit. Microseconds are zeroed as well so the
        # window starts exactly at 00:00:00 UTC; otherwise posts created in
        # the first fraction of a second after midnight are not counted.
        today_start = datetime.utcnow().replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        today_posts = db.query(func.count(CampaignAction.id)).filter(
            CampaignAction.campaign_id == campaign_id,
            CampaignAction.action_type == 'post',
            CampaignAction.created_at >= today_start,
        ).scalar() or 0

        if today_posts >= MAX_CAMPAIGN_POSTS_PER_DAY:
            return {'error': 'Daily post limit reached', 'limit': MAX_CAMPAIGN_POSTS_PER_DAY}

        # Check budget
        cost = CAMPAIGN_COSTS['post']
        remaining_budget = (campaign.total_spark_budget or 0) - (campaign.spark_spent or 0)
        if remaining_budget < cost:
            return {'error': 'Campaign budget exhausted', 'remaining': remaining_budget}

        # Spend Spark
        success, _ = ResonanceService.spend_spark(
            db, owner_id, cost, 'campaign_action', campaign_id,
            f'Campaign post for {campaign.name}'
        )
        if not success:
            return {'error': 'Insufficient Spark'}

        campaign.spark_spent = (campaign.spark_spent or 0) + cost

        # Record action (content is a placeholder string, not generated text).
        action = CampaignAction(
            campaign_id=campaign_id,
            agent_id=campaign.agent_id,
            action_type='post',
            content_generated=f'[Auto-generated campaign content for: {campaign.name}]',
            spark_cost=cost,
        )
        db.add(action)
        campaign.impressions = (campaign.impressions or 0) + 1

        # Auto-pause heuristic: once the campaign has more than 10
        # impressions, pause it as soon as over 80% of the budget is spent.
        if campaign.impressions > 10:
            if (campaign.spark_spent or 0) > (campaign.total_spark_budget or 0) * 0.8:
                campaign.status = 'paused'

        # Award agent XP for campaign action
        if campaign.agent_id:
            ResonanceService.award_xp(
                db, campaign.agent_id, 5, 'campaign_action', campaign_id,
                'Campaign action XP'
            )

        db.flush()
        return {
            'action': action.to_dict(),
            'spark_cost': cost,
            'budget_remaining': (campaign.total_spark_budget or 0) - (campaign.spark_spent or 0),
            'impressions': campaign.impressions,
        }

    @staticmethod
    def delete_campaign(db: Session, campaign_id: str,
                        owner_id: str) -> Optional[Dict]:
        """Delete/cancel a campaign and refund its unspent Spark budget.

        The row is not removed: the campaign is marked ``'completed'`` and
        ``ends_at`` is set to the current UTC time.

        Returns:
            ``None`` when no campaign matches ``campaign_id`` and
            ``owner_id``; otherwise ``{'deleted': True, 'spark_refunded': n}``.
        """
        campaign = db.query(Campaign).filter_by(
            id=campaign_id, owner_id=owner_id
        ).first()
        if not campaign:
            return None

        # Refund unspent budget (never negative).
        # NOTE(review): create_campaign in this class does not debit
        # total_spark_budget from the owner, and calling this method again on
        # the same campaign repeats the refund. Confirm the budget is
        # escrowed elsewhere and that callers cannot invoke this twice.
        unspent = max(0, (campaign.total_spark_budget or 0) - (campaign.spark_spent or 0))
        if unspent > 0:
            ResonanceService.award_spark(
                db, owner_id, unspent, 'campaign_refund', campaign_id,
                f'Campaign refund for {campaign.name}'
            )

        campaign.status = 'completed'
        campaign.ends_at = datetime.utcnow()
        db.flush()

        return {
            'deleted': True,
            'spark_refunded': unspent,
        }

    @staticmethod
    def get_leaderboard(db: Session, limit: int = 20) -> List[Dict]:
        """Get campaign leaderboard by ROI (conversions per spark spent).

        Considers only ``'active'`` and ``'completed'`` campaigns that have
        spent Spark. ROI is rounded to 4 decimals and the ranking is done in
        Python on the rounded value, best first; ties keep query order.

        Returns:
            At most ``limit`` summary dicts.
        """
        campaigns = db.query(Campaign).filter(
            Campaign.status.in_(['active', 'completed']),
            Campaign.spark_spent > 0,
        ).all()

        ranked = []
        for campaign in campaigns:
            # The query already guarantees spark_spent > 0; max() is a guard
            # against division by zero.
            roi = (campaign.conversions or 0) / max(campaign.spark_spent, 1)
            ranked.append({
                'campaign_id': campaign.id,
                'name': campaign.name,
                'goal': campaign.goal,
                'owner_id': campaign.owner_id,
                'impressions': campaign.impressions or 0,
                'clicks': campaign.clicks or 0,
                'conversions': campaign.conversions or 0,
                'spark_spent': campaign.spark_spent or 0,
                'roi': round(roi, 4),
                'status': campaign.status,
            })

        # reverse=True keeps the sort stable, so equal ROI values stay in
        # query order.
        ranked.sort(key=lambda entry: entry['roi'], reverse=True)
        return ranked[:limit]