Coverage for integrations / social / campaign_service.py: 24.3%

115 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Campaign Service ("Make Me Viral") 

3Users deploy their own trained agents to auto-market products. 

4""" 

5import json 

6import logging 

7from datetime import datetime, timedelta 

8from typing import Optional, Dict, List 

9 

10from sqlalchemy import desc, func 

11from sqlalchemy.orm import Session 

12 

13from .models import ( 

14 User, Post, Campaign, CampaignAction, ResonanceWallet, 

15) 

16from .resonance_engine import ResonanceService 

17 

# Logger registered under the fixed 'hevolve_social' name.
logger: logging.Logger = logging.getLogger('hevolve_social')

# Spark charged per campaign action, keyed by action type.
# Within this module 'generate_strategy' and 'post' are the keys that are
# looked up when charging the campaign owner.
CAMPAIGN_COSTS: Dict[str, int] = {
    'generate_strategy': 10,
    'post': 15,
    'comment': 5,
    'boost': 0,  # variable, handled by boost system
}

# Rate limit: maximum number of 'post' actions one campaign may record per day.
MAX_CAMPAIGN_POSTS_PER_DAY: int = 3

30 

31 

class CampaignService:
    """Operations on "Make Me Viral" marketing campaigns.

    Every method is a ``@staticmethod`` that works on the SQLAlchemy session
    passed in as ``db``. Methods only ``flush()`` their changes; nothing in
    this class commits or rolls back, so transaction control stays with the
    caller.
    """

    @staticmethod
    def create_campaign(db: Session, owner_id: str, name: str,
                        description: str = '', goal: str = 'awareness',
                        product_url: str = '', product_description: str = '',
                        agent_id: Optional[str] = None,
                        target_regions: Optional[List[str]] = None,
                        target_communities: Optional[List[str]] = None,
                        total_spark_budget: int = 100) -> Dict:
        """Create a campaign in ``'draft'`` status and return it as a dict.

        ``target_regions`` and ``target_communities`` are stored as JSON
        text; ``None`` is stored as an empty list. The ``spark_spent``,
        ``impressions``, ``clicks`` and ``conversions`` counters start at 0.

        Returns:
            ``campaign.to_dict()`` for the newly added row.
        """
        campaign = Campaign(
            owner_id=owner_id,
            name=name,
            description=description,
            goal=goal,
            product_url=product_url,
            product_description=product_description,
            agent_id=agent_id,
            status='draft',
            target_regions=json.dumps(target_regions or []),
            target_communities=json.dumps(target_communities or []),
            total_spark_budget=total_spark_budget,
            spark_spent=0,
            impressions=0,
            clicks=0,
            conversions=0,
        )
        db.add(campaign)
        # Flush before serialising (presumably so database-generated values
        # such as the primary key are populated -- confirm against the model).
        db.flush()
        return campaign.to_dict()

    @staticmethod
    def get_campaign(db: Session, campaign_id: str) -> Optional[Dict]:
        """Return one campaign as a dict, plus an ``action_count`` entry.

        Returns:
            ``None`` when no campaign has this id; otherwise the campaign's
            ``to_dict()`` with ``action_count`` set to the number of
            ``CampaignAction`` rows recorded for it (0 when there are none).
        """
        campaign = db.query(Campaign).filter_by(id=campaign_id).first()
        if not campaign:
            return None

        result = campaign.to_dict()
        action_count = db.query(func.count(CampaignAction.id)).filter_by(
            campaign_id=campaign_id
        ).scalar()
        result['action_count'] = action_count or 0
        return result

    @staticmethod
    def list_campaigns(db: Session, owner_id: Optional[str] = None,
                       status: Optional[str] = None,
                       limit: int = 25, offset: int = 0) -> List[Dict]:
        """List campaigns, newest first, as dicts.

        Args:
            owner_id: When truthy, only this owner's campaigns are returned.
            status: When truthy, only campaigns in this status are returned.
            limit: Maximum number of rows to return.
            offset: Number of rows to skip (applied before ``limit``).
        """
        q = db.query(Campaign)
        if owner_id:
            q = q.filter_by(owner_id=owner_id)
        if status:
            q = q.filter_by(status=status)
        campaigns = (
            q.order_by(desc(Campaign.created_at))
            .offset(offset)
            .limit(limit)
            .all()
        )
        return [c.to_dict() for c in campaigns]

    @staticmethod
    def update_campaign(db: Session, campaign_id: str, owner_id: str,
                        updates: Dict) -> Optional[Dict]:
        """Apply a whitelist of field updates to a campaign owned by ``owner_id``.

        Only ``name``, ``description``, ``status``, ``product_url``,
        ``product_description``, ``total_spark_budget``, ``target_regions``
        and ``target_communities`` are honoured; any other key in ``updates``
        is ignored. The first time the status is set to ``'active'``,
        ``started_at`` is stamped with the current UTC time.

        Returns:
            ``None`` when the campaign does not exist or belongs to another
            owner; otherwise the updated ``campaign.to_dict()``.
        """
        campaign = db.query(Campaign).filter_by(
            id=campaign_id, owner_id=owner_id
        ).first()
        if not campaign:
            return None

        allowed = ('name', 'description', 'status', 'product_url',
                   'product_description', 'total_spark_budget')
        for key in allowed:
            if key in updates:
                setattr(campaign, key, updates[key])

        # Stored as JSON text. A None value is normalised to an empty list,
        # matching create_campaign, instead of being persisted as JSON null.
        if 'target_regions' in updates:
            campaign.target_regions = json.dumps(updates['target_regions'] or [])
        if 'target_communities' in updates:
            campaign.target_communities = json.dumps(
                updates['target_communities'] or []
            )

        # Status transition: record the first activation time only.
        if updates.get('status') == 'active' and not campaign.started_at:
            campaign.started_at = datetime.utcnow()

        db.flush()
        return campaign.to_dict()

    @staticmethod
    def generate_strategy(db: Session, campaign_id: str,
                          owner_id: str) -> Optional[Dict]:
        """Charge the owner and store a marketing strategy on the campaign.

        The strategy is currently a fixed placeholder built from the
        campaign's name / product description; per the original note it
        would call the agent's LLM in production.

        Returns:
            * ``None`` when the campaign does not exist or belongs to
              another owner.
            * ``{'error': 'Insufficient Spark', 'cost': ...}`` when
              ``ResonanceService.spend_spark`` reports failure.
            * ``{'strategy': ..., 'spark_cost': ..., 'campaign_id': ...}``
              on success.
        """
        campaign = db.query(Campaign).filter_by(
            id=campaign_id, owner_id=owner_id
        ).first()
        if not campaign:
            return None

        # Charge strategy generation cost before doing any work.
        cost = CAMPAIGN_COSTS['generate_strategy']
        success, _ = ResonanceService.spend_spark(
            db, owner_id, cost, 'campaign_strategy', campaign_id,
            f'Strategy generation for {campaign.name}'
        )
        if not success:
            return {'error': 'Insufficient Spark', 'cost': cost}

        campaign.spark_spent = (campaign.spark_spent or 0) + cost

        # Generate strategy placeholder (would call agent's LLM in production)
        strategy = {
            'content_themes': [
                f'Highlight key features of {campaign.product_description or campaign.name}',
                'Share user testimonials and success stories',
                'Demonstrate unique value proposition',
            ],
            'posting_schedule': [
                {'day': 1, 'action': 'introduction_post', 'target': 'main_feed'},
                {'day': 2, 'action': 'feature_highlight', 'target': 'relevant_communities'},
                {'day': 3, 'action': 'engagement_post', 'target': 'target_regions'},
            ],
            'engagement_plan': 'Comment on trending posts in target communities, respond to all replies promptly',
            'estimated_reach': 500,
            'estimated_duration_days': 7,
        }

        campaign.strategy_json = json.dumps(strategy)
        db.flush()

        return {
            'strategy': strategy,
            'spark_cost': cost,
            'campaign_id': campaign_id,
        }

    @staticmethod
    def execute_campaign_step(db: Session, campaign_id: str,
                              owner_id: str) -> Optional[Dict]:
        """Record one auto-generated ``'post'`` action for an active campaign.

        The step is rejected when the campaign already has
        ``MAX_CAMPAIGN_POSTS_PER_DAY`` posts since UTC midnight, when the
        remaining campaign budget is below the post cost, or when
        ``ResonanceService.spend_spark`` reports failure.

        Returns:
            * ``None`` when no *active* campaign with this id belongs to
              ``owner_id``.
            * ``{'error': ...}`` (with ``'limit'`` or ``'remaining'`` where
              applicable) when the step is rejected.
            * ``{'action', 'spark_cost', 'budget_remaining', 'impressions'}``
              on success.
        """
        campaign = db.query(Campaign).filter_by(
            id=campaign_id, owner_id=owner_id, status='active'
        ).first()
        if not campaign:
            return None

        # Daily rate limit. microsecond must be zeroed as well, otherwise
        # actions created in the first fraction of a second after midnight
        # fall before `today_start` and escape the count.
        today_start = datetime.utcnow().replace(
            hour=0, minute=0, second=0, microsecond=0
        )
        today_posts = db.query(func.count(CampaignAction.id)).filter(
            CampaignAction.campaign_id == campaign_id,
            CampaignAction.action_type == 'post',
            CampaignAction.created_at >= today_start,
        ).scalar() or 0

        if today_posts >= MAX_CAMPAIGN_POSTS_PER_DAY:
            return {'error': 'Daily post limit reached', 'limit': MAX_CAMPAIGN_POSTS_PER_DAY}

        # Check the campaign budget before touching the owner's Spark.
        cost = CAMPAIGN_COSTS['post']
        remaining_budget = (campaign.total_spark_budget or 0) - (campaign.spark_spent or 0)
        if remaining_budget < cost:
            return {'error': 'Campaign budget exhausted', 'remaining': remaining_budget}

        # Spend Spark
        success, _ = ResonanceService.spend_spark(
            db, owner_id, cost, 'campaign_action', campaign_id,
            f'Campaign post for {campaign.name}'
        )
        if not success:
            return {'error': 'Insufficient Spark'}

        campaign.spark_spent = (campaign.spark_spent or 0) + cost

        # Record action
        action = CampaignAction(
            campaign_id=campaign_id,
            agent_id=campaign.agent_id,
            action_type='post',
            content_generated=f'[Auto-generated campaign content for: {campaign.name}]',
            spark_cost=cost,
        )
        db.add(action)
        campaign.impressions = (campaign.impressions or 0) + 1

        # Auto-pause heuristic: once the campaign has more than 10
        # impressions, pause it as soon as over 80% of the budget is spent.
        # (No engagement or downvote signal is consulted here.)
        budget = campaign.total_spark_budget or 0
        if campaign.impressions > 10 and (campaign.spark_spent or 0) > budget * 0.8:
            campaign.status = 'paused'

        # Award agent XP for campaign action
        if campaign.agent_id:
            ResonanceService.award_xp(
                db, campaign.agent_id, 5, 'campaign_action', campaign_id,
                'Campaign action XP'
            )

        db.flush()
        return {
            'action': action.to_dict(),
            'spark_cost': cost,
            'budget_remaining': (campaign.total_spark_budget or 0) - (campaign.spark_spent or 0),
            'impressions': campaign.impressions,
        }

    @staticmethod
    def delete_campaign(db: Session, campaign_id: str,
                        owner_id: str) -> Optional[Dict]:
        """Close a campaign and refund its unspent Spark budget to the owner.

        The row is not removed: the campaign is marked ``'completed'`` and
        ``ends_at`` is set to the current UTC time.

        Returns:
            ``None`` when the campaign does not exist or belongs to another
            owner; otherwise ``{'deleted': True, 'spark_refunded': <amount>}``.
        """
        campaign = db.query(Campaign).filter_by(
            id=campaign_id, owner_id=owner_id
        ).first()
        if not campaign:
            return None

        # NOTE(review): nothing in this class debits the owner for
        # total_spark_budget up front -- confirm the budget is reserved
        # elsewhere before this refund is relied on.
        spent = campaign.spark_spent or 0
        unspent = max(0, (campaign.total_spark_budget or 0) - spent)
        if unspent > 0:
            ResonanceService.award_spark(
                db, owner_id, unspent, 'campaign_refund', campaign_id,
                f'Campaign refund for {campaign.name}'
            )
            # Collapse the budget to what was actually spent so that calling
            # delete again finds nothing left to refund (previously every
            # repeated call awarded the same refund again).
            campaign.total_spark_budget = spent

        campaign.status = 'completed'
        campaign.ends_at = datetime.utcnow()
        db.flush()

        return {
            'deleted': True,
            'spark_refunded': unspent,
        }

    @staticmethod
    def get_leaderboard(db: Session, limit: int = 20) -> List[Dict]:
        """Rank active/completed campaigns by ROI (conversions per Spark spent).

        Only campaigns with ``spark_spent > 0`` are considered. ROI is
        rounded to 4 decimal places and the ranking is computed in Python
        after loading every matching campaign.

        Returns:
            Up to ``limit`` summary dicts, highest ROI first.
        """
        campaigns = db.query(Campaign).filter(
            Campaign.status.in_(['active', 'completed']),
            Campaign.spark_spent > 0,
        ).all()

        ranked = []
        for c in campaigns:
            # The query already excludes spark_spent <= 0; max() keeps the
            # division safe regardless.
            roi = (c.conversions or 0) / max(c.spark_spent, 1)
            ranked.append({
                'campaign_id': c.id,
                'name': c.name,
                'goal': c.goal,
                'owner_id': c.owner_id,
                'impressions': c.impressions or 0,
                'clicks': c.clicks or 0,
                'conversions': c.conversions or 0,
                'spark_spent': c.spark_spent or 0,
                'roi': round(roi, 4),
                'status': c.status,
            })

        # Stable descending sort: campaigns with equal ROI keep query order.
        ranked.sort(key=lambda row: row['roi'], reverse=True)
        return ranked[:limit]

286 return ranked[:limit]