Coverage for integrations / agent_engine / marketing_tools.py: 41.5%

123 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

Unified Agent Goal Engine - Marketing Tools (Tier 2)

These tools are loaded ONLY when the agent is working on a marketing goal.
They wrap existing CampaignService, AdService, PostService, and channel adapters.

Tier 1 (Default): google_search, text_2_image, delegate_to_specialist, etc.
Tier 2 (Category): create_social_post, create_campaign, create_ad, post_to_channel,
                   create_referral_campaign, get_growth_metrics
Tier 3 (Runtime): delegate_to_specialist finds agents with needed skills via A2A

10""" 

11import json 

12import logging 

13from typing import Annotated, Optional 

14 

15logger = logging.getLogger('hevolve_social') 

16 

17 

def register_marketing_tools(helper, assistant, user_id: str):
    """Register marketing-specific tools with the agent (Tier 2).

    Six closures are defined and registered: ``create_social_post``,
    ``create_campaign``, ``create_ad``, ``post_to_channel``,
    ``create_referral_campaign`` and ``get_growth_metrics``. Each one wraps
    an existing service/model call and always returns a JSON string:
    ``{"success": true, ...}`` on success, ``{"success": false, "error": ...}``
    on any exception (the exception is also logged, never re-raised, so a
    failing tool cannot crash the agent loop).

    After tool registration the function makes a best-effort attempt to
    advertise the agent's skills under the id ``marketing_<user_id>``;
    a failure there is logged at debug level and otherwise ignored.

    Args:
        helper: AutoGen helper agent (registers for LLM)
        assistant: AutoGen assistant agent (registers for execution)
        user_id: Current user ID for ownership

    Returns:
        None.
    """
    from contextlib import closing

    owner_id = str(user_id)

    def _split_csv(raw):
        """Split a comma-separated string into stripped, non-empty items."""
        if not raw:
            return []
        # Drop blanks so "a,,b" or a trailing comma does not yield '' ids.
        return [item.strip() for item in raw.split(',') if item.strip()]

    def _failure(tool_name, exc):
        """Log a tool failure and build the JSON error payload for the agent."""
        logger.warning(
            "Marketing tool %s failed: %s", tool_name, exc, exc_info=True
        )
        return json.dumps({'success': False, 'error': str(exc)})

    def create_social_post(
        title: Annotated[str, "Post title"],
        content: Annotated[str, "Post content/body"],
        community_id: Annotated[Optional[str], "Community to post in (optional)"] = None,
        media_url: Annotated[Optional[str], "URL of media attachment (optional)"] = None,
    ) -> str:
        """Create a post on the HART social platform."""
        try:
            from integrations.social.models import get_db, Post
            with closing(get_db()) as db:
                post = Post(
                    author_id=owner_id,
                    title=title,
                    content=content,
                    community_id=community_id,
                    media_url=media_url or '',
                )
                db.add(post)
                db.commit()
                return json.dumps({'success': True, 'post': post.to_dict()})
        except Exception as e:
            return _failure('create_social_post', e)

    def create_campaign(
        name: Annotated[str, "Campaign name"],
        description: Annotated[str, "Campaign description and strategy"],
        campaign_type: Annotated[str, "Campaign type: awareness|engagement|conversion|retention"] = 'awareness',
        target_communities: Annotated[Optional[str], "Comma-separated community IDs to target"] = None,
        budget: Annotated[int, "Spark budget for this campaign"] = 50,
    ) -> str:
        """Create a marketing campaign using the Campaign Service."""
        try:
            from integrations.social.campaign_service import CampaignService
            from integrations.social.models import get_db
            with closing(get_db()) as db:
                result = CampaignService.create_campaign(
                    db,
                    creator_id=owner_id,
                    name=name,
                    description=description,
                    campaign_type=campaign_type,
                    target_communities=_split_csv(target_communities),
                    budget_spark=budget,
                )
                db.commit()
                return json.dumps({'success': True, 'campaign': result})
        except Exception as e:
            return _failure('create_campaign', e)

    def create_ad(
        title: Annotated[str, "Ad title"],
        content: Annotated[str, "Ad content/copy"],
        target_url: Annotated[str, "URL the ad links to"],
        ad_type: Annotated[str, "Ad type: banner|sponsored|native"] = 'native',
        budget: Annotated[int, "Spark budget for this ad"] = 50,
        target_audience: Annotated[Optional[str], "Target audience description"] = None,
    ) -> str:
        """Create a targeted ad unit using the Ad Service."""
        try:
            from integrations.social.ad_service import AdService
            from integrations.social.models import get_db
            with closing(get_db()) as db:
                result = AdService.create_ad(
                    db,
                    advertiser_id=owner_id,
                    title=title,
                    content=content,
                    target_url=target_url,
                    ad_type=ad_type,
                    budget_spark=budget,
                    targeting_json={'audience': target_audience} if target_audience else {},
                )
                db.commit()
                return json.dumps({'success': True, 'ad': result})
        except Exception as e:
            return _failure('create_ad', e)

    def post_to_channel(
        channel: Annotated[str, "Channel name: twitter|instagram|email|discord|telegram|whatsapp|slack|linkedin|nostr|matrix"],
        content: Annotated[str, "Content to post"],
        media_url: Annotated[Optional[str], "Media URL to include (optional)"] = None,
        extra_config: Annotated[Optional[str], "JSON string with channel-specific config (optional)"] = None,
    ) -> str:
        """Post content to an external channel via the unified channel adapter system.

        Looks the channel up among the available adapters: an adapter named
        exactly ``<channel>`` or ``<channel>_adapter`` wins; otherwise the
        first adapter whose name contains the channel string is used. When
        nothing matches, the returned error tells the agent to use
        ``delegate_to_specialist`` and lists the available adapter names.
        """
        try:
            from integrations.channels.extensions import get_available_adapters
            adapters = get_available_adapters()

            wanted = channel.strip().lower()
            if not wanted:
                # An empty string is a substring of every adapter name, so
                # without this guard the content would be sent to whichever
                # adapter happens to come first.
                return json.dumps({
                    'success': False,
                    'error': 'Channel name is required.',
                    'available_channels': list(adapters.keys()),
                })

            # Prefer an exact name so e.g. a short channel name cannot be
            # captured by an unrelated adapter that merely contains it.
            exact_names = {wanted, f"{wanted}_adapter"}
            matches = [
                factory for name, factory in adapters.items()
                if name.lower() in exact_names
            ]
            if not matches:
                matches = [
                    factory for name, factory in adapters.items()
                    if wanted in name.lower()
                ]
            adapter_factory = matches[0] if matches else None

            if adapter_factory:
                # Validate the config before building the adapter so a bad
                # payload fails with a clear message and no side effects.
                config = json.loads(extra_config) if extra_config else {}
                if not isinstance(config, dict):
                    return json.dumps({
                        'success': False,
                        'error': 'extra_config must be a JSON object.',
                    })
                adapter = adapter_factory()
                result = adapter.send_message(
                    content=content,
                    media_url=media_url,
                    **config,
                )
                return json.dumps({'success': True, 'channel': channel, 'result': str(result)})

            return json.dumps({
                'success': False,
                'error': f'Channel adapter for {channel} not available. '
                         f'Use delegate_to_specialist to find an agent with {channel} skills.',
                'available_channels': list(adapters.keys()),
            })
        except Exception as e:
            return _failure('post_to_channel', e)

    def create_referral_campaign(
        name: Annotated[str, "Campaign name"],
        description: Annotated[str, "Campaign description and referral strategy"],
        referral_message: Annotated[str, "Shareable referral message for users"] = "Join me on HART!",
        target_communities: Annotated[Optional[str], "Comma-separated community IDs to target"] = None,
        budget: Annotated[int, "Spark budget for this campaign"] = 100,
    ) -> str:
        """Create a referral-driven growth campaign with auto-generated referral code."""
        try:
            from integrations.social.campaign_service import CampaignService
            from integrations.social.distribution_service import DistributionService
            from integrations.social.models import get_db
            with closing(get_db()) as db:
                # Generate referral code for the campaign owner
                ref_result = DistributionService.get_or_create_referral_code(db, owner_id)
                ref_code = ref_result.get('code', '')

                result = CampaignService.create_campaign(
                    db,
                    creator_id=owner_id,
                    name=name,
                    description=description,
                    campaign_type='conversion',
                    target_communities=_split_csv(target_communities),
                    budget_spark=budget,
                )
                # Store referral mechanics in strategy. Use the canonical
                # ``invite_share_url`` builder (single source of truth) so
                # G1's Invite_Friend tool, this referral campaign, and any
                # future invite surface all emit the same shape. Honors
                # ``HEVOLVE_INVITE_BASE_URL`` env override automatically.
                if result and isinstance(result, dict):
                    from integrations.social.distribution_service import (
                        invite_share_url,
                    )
                    result['referral_code'] = ref_code
                    result['referral_message'] = referral_message
                    result['referral_link'] = (
                        invite_share_url(ref_code) if ref_code else ''
                    )

                db.commit()
                return json.dumps({'success': True, 'campaign': result})
        except Exception as e:
            return _failure('create_referral_campaign', e)

    def get_growth_metrics() -> str:
        """Get platform growth metrics including viral coefficient (K factor)."""
        try:
            from integrations.social.models import get_db, User
            from integrations.social.models import Campaign, Referral
            from datetime import datetime, timedelta, timezone
            with closing(get_db()) as db:
                # Naive UTC "now", equivalent to the deprecated
                # datetime.utcnow(); kept naive so comparisons against the
                # created_at columns behave exactly as before.
                now = datetime.now(timezone.utc).replace(tzinfo=None)
                week_ago = now - timedelta(days=7)

                total_users = db.query(User).count()
                new_users_7d = db.query(User).filter(User.created_at >= week_ago).count()

                # Referral metrics
                total_referrals = db.query(Referral).count()
                recent_referrals = db.query(Referral).filter(
                    Referral.created_at >= week_ago
                ).count()

                # Viral coefficient K = avg_referrals_per_user * conversion_rate
                # max(..., 1) guards the divisions on an empty platform.
                users_with_referrals = db.query(Referral.referrer_id).distinct().count()
                avg_referrals = total_referrals / max(users_with_referrals, 1)
                conversion_rate = total_referrals / max(total_users, 1)
                k_factor = avg_referrals * conversion_rate

                # Top campaigns (the five most recently created)
                top_campaigns = db.query(Campaign).order_by(
                    Campaign.created_at.desc()
                ).limit(5).all()

                db.commit()
                return json.dumps({
                    'success': True,
                    'metrics': {
                        'total_users': total_users,
                        'new_users_7d': new_users_7d,
                        'total_referrals': total_referrals,
                        'recent_referrals_7d': recent_referrals,
                        'users_who_referred': users_with_referrals,
                        'avg_referrals_per_referrer': round(avg_referrals, 2),
                        'conversion_rate': round(conversion_rate, 4),
                        'viral_coefficient_k': round(k_factor, 4),
                        'k_status': 'exponential' if k_factor > 1 else 'sub-viral',
                        'top_campaigns': [c.to_dict() for c in top_campaigns],
                    }
                })
        except Exception as e:
            return _failure('get_growth_metrics', e)

    # Register all marketing tools
    tools = [
        ('create_social_post', 'Create a post on the HART social platform for marketing', create_social_post),
        ('create_campaign', 'Create a marketing campaign with strategy, targeting, and budget', create_campaign),
        ('create_ad', 'Create a targeted ad unit with budget and audience targeting', create_ad),
        ('post_to_channel', 'Post content to external channels (Twitter, Instagram, Email, Discord, etc.)', post_to_channel),
        ('create_referral_campaign', 'Create a referral-driven growth campaign with auto-generated referral code', create_referral_campaign),
        ('get_growth_metrics', 'Get platform growth metrics including viral coefficient (K factor)', get_growth_metrics),
    ]

    for name, desc, func in tools:
        helper.register_for_llm(name=name, description=desc)(func)
        assistant.register_for_execution(name=name)(func)

    logger.info("Registered %d marketing tools for user %s", len(tools), user_id)

    # Register skills so OTHER agents can discover and delegate TO this agent
    try:
        from integrations.internal_comm.internal_agent_communication import register_agent_with_skills
        register_agent_with_skills(f"marketing_{user_id}", [
            {'name': 'social_posting', 'description': 'Create posts on HART platform', 'proficiency': 0.9},
            {'name': 'campaign_management', 'description': 'Create and manage marketing campaigns', 'proficiency': 0.9},
            {'name': 'ad_creation', 'description': 'Create targeted ad units', 'proficiency': 0.9},
            {'name': 'channel_distribution', 'description': 'Post to external channels', 'proficiency': 0.8},
            {'name': 'referral_campaigns', 'description': 'Create referral-driven growth campaigns', 'proficiency': 0.9},
            {'name': 'growth_analytics', 'description': 'Analyze growth metrics and viral coefficient', 'proficiency': 0.85},
        ])
    except Exception as e:
        # Deliberate best-effort: tool registration above already succeeded.
        logger.debug("Marketing skill registration skipped: %s", e)

291 

292 

def detect_goal_tags(prompt) -> list:
    """Detect goal type tags from a prompt for category-based tool loading.

    Returns a list of tags such as ['marketing'], ['coding'], or [] for a
    general prompt. Tags appear in a fixed order: marketing, coding,
    ip_protection, self_build, outreach, sales.

    Accepts any input: the prompt is coerced with ``str()`` before
    matching. The autogen agent-creation path (``create_recipe.py:1735``)
    sometimes hands over HARTOS's own ``helper.Action`` object
    (``helper.py:1363``, the multi-step task tracker) instead of a raw
    string. Without coercion, ``prompt.lower()`` raises ``AttributeError``
    and the surrounding ``except`` swallows it silently, leaving the agent
    with NO goal-specific tools registered.

    Matching is case-insensitive. Most keywords are matched as substrings
    (so prefixes like 'market' or 'advertis' cover their inflections).
    The short words 'ad', 'ads' and 'contact' are matched as whole words
    only: as substrings they fire inside unrelated words ("read ",
    "download ", "roads ") and miss the word at the end of the prompt or
    before punctuation ("create an ad", "run ads.").

    Args:
        prompt: The goal text, or any object whose ``str()`` yields it.

    Returns:
        List of matching tag strings; empty when nothing matches.
    """
    lower = str(prompt).lower()
    # Every non-alphanumeric character is a separator for whole-word matching.
    words = set(''.join(ch if ch.isalnum() else ' ' for ch in lower).split())

    # (tag, substring keywords, whole-word keywords), in output order.
    categories = (
        ('marketing', (
            'market', 'campaign', 'advertis', 'promotion', 'social media',
            'content marketing', 'brand', 'outbound', 'inbound', 'lead gen',
            'email marketing', 'seo', 'influencer', 'viral', 'engagement',
            'conversion', 'target audience', 'marketing goal',
        ), ('ad', 'ads')),
        ('coding', (
            'github', 'repository', 'codebase', 'refactor', 'implement',
            'bug fix', 'pull request', 'commit', 'branch', 'repo',
        ), ()),
        ('ip_protection', (
            'patent', 'trademark', 'copyright', 'intellectual property',
            'ip protection', 'infringement', 'prior art', 'cease and desist',
            'dmca', 'filing', 'provisional patent', 'claims',
        ), ()),
        ('self_build', (
            'self-build', 'self_build', 'nixos', 'nix package', 'install package',
            'runtime.nix', 'os rebuild', 'nixos-rebuild', 'system package',
            'hart-pkg', 'nix-env', 'system generation', 'rollback generation',
        ), ()),
        ('outreach', (
            'outreach', 'prospect', 'cold email', 'follow up', 'follow-up',
            'sales email', 'pipeline', 'crm', 'lead gen', 'deal stage',
            'send email to', 'outbound email', 'drip', 'sequence',
            'partnership outreach', 'b2b', 'sales funnel',
        ), ('contact',)),
        ('sales', (
            'sales agent', 'marketing agent', 'flywheel', 'user journey',
            'a/b test', 'ab test', 'prospect journey', 'sales pipeline',
            'meeting schedule', 'deal close', 'partner onboard',
            'channel outreach', 'multi-channel', 'nurture',
        ), ()),
    )

    return [
        tag
        for tag, substrings, whole_words in categories
        if any(kw in lower for kw in substrings)
        or not words.isdisjoint(whole_words)
    ]