Coverage for integrations / agent_engine / journey_engine.py: 14.9%

343 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Agentic User Journey Engine for HARTOS Marketing Flywheel 

3 

4Orchestrates the full prospect lifecycle across channels: 

5 Discover -> Research -> Outreach -> Follow-up -> Reply -> Meeting -> Close 

6 

7Each stage has: 

8 - Entry conditions (triggers) 

9 - Agentic actions (what HARTOS does proactively) 

10 - Exit conditions (what moves to next stage) 

11 - Channel preferences (where to reach the prospect) 

12 

13The engine is: 

14 - Extensible: add stages/actions via register_stage() 

15 - Scalable: async execution via HARTOS workflow engine 

16 - Intuitive: config-driven, not code-driven 

17 - Agentic: HARTOS proactively researches, writes, follows up, schedules 

18 

19Integrates with: 

20 - outreach_crm_tools.py (prospect store + Erxes CRM) 

21 - erxes_client.py (native CRM sync) 

22 - channels/ (Discord, Telegram, Slack, WhatsApp, Email) 

23 - automation/workflows.py (async workflow execution) 

24 - automation/triggers.py (event-based triggers) 

25 - agent_daemon.py (periodic tick-based checks) 

26 - marketing_tools.py (social posting, campaigns) 

27""" 

28import json 

29import logging 

30import os 

31import threading 

32import time 

33from datetime import datetime, timedelta 

34from typing import Any, Callable, Dict, List, Optional 

35 

# Module-wide logger; every journey log line goes through this name.
logger = logging.getLogger('hevolve_journey')

# ---- Journey Stage Definitions ----
#
# Each entry maps a journey stage name to its configuration:
#   order        - position in the funnel ('lost' uses -1)
#   description  - human-readable summary of the stage
#   crm_stage    - value written to prospect['stage'] when the stage is entered
#   auto_actions - action handler names run for the stage
#   exit_to      - default next stage (None for terminal stages)
#   exit_on      - (optional) event name associated with leaving the stage

STAGES = {
    # Funnel entry: prospect known, nothing researched yet.
    'discover': {
        'order': 0,
        'description': 'Prospect identified but not yet researched',
        'crm_stage': 'new',
        'auto_actions': ['research_prospect'],
        'exit_to': 'research',
    },
    'research': {
        'order': 1,
        'description': 'Agent researching prospect (funding, tech, pain points)',
        'crm_stage': 'new',
        'auto_actions': ['deep_research', 'generate_personalized_email'],
        'exit_to': 'outreach',
    },
    'outreach': {
        'order': 2,
        'description': 'Initial personalized email sent',
        'crm_stage': 'contacted',
        'auto_actions': ['send_outreach_email'],
        'exit_to': 'nurture',
    },
    # The only stage that declares an explicit exit event.
    'nurture': {
        'order': 3,
        'description': 'Follow-up sequence running (day 3, 7, 14)',
        'crm_stage': 'contacted',
        'auto_actions': ['check_followups', 'try_alternate_channel'],
        'exit_to': 'engaged',
        'exit_on': 'reply_received',
    },
    'engaged': {
        'order': 4,
        'description': 'Prospect replied -- agent drafts response',
        'crm_stage': 'replied',
        'auto_actions': ['draft_response', 'notify_user', 'propose_meeting'],
        'exit_to': 'meeting',
    },
    'meeting': {
        'order': 5,
        'description': 'Meeting scheduled or in progress',
        'crm_stage': 'meeting',
        'auto_actions': ['prepare_demo_materials', 'send_calendar_invite'],
        'exit_to': 'negotiation',
    },
    'negotiation': {
        'order': 6,
        'description': 'Active deal negotiation',
        'crm_stage': 'negotiation',
        'auto_actions': ['track_deal_progress', 'generate_proposal'],
        'exit_to': 'won',
    },
    # Terminal stages: exit_to is None, so there is no default next stage.
    'won': {
        'order': 7,
        'description': 'Deal closed -- onboard partner',
        'crm_stage': 'won',
        'auto_actions': ['send_welcome_sequence', 'create_partner_channel'],
        'exit_to': None,
    },
    'lost': {
        'order': -1,
        'description': 'Deal lost -- schedule re-engagement',
        'crm_stage': 'lost',
        'auto_actions': ['schedule_reengagement', 'analyze_loss_reason'],
        'exit_to': None,
    },
}

106 

107# ---- Channel Strategy ---- 

108 

# Preferred channels per journey stage, in the order they are attempted.
# Stages without an entry here are looked up with an ['email'] default.
CHANNEL_PRIORITY = {
    'outreach': ['email'],
    'nurture': ['email', 'linkedin'],
    'engaged': ['email', 'slack', 'discord'],
    'meeting': ['email', 'slack', 'discord', 'whatsapp'],
    'negotiation': ['email', 'slack', 'whatsapp'],
    'won': ['email', 'slack', 'discord', 'telegram'],
}

117 

118# ---- A/B Test Tracking ---- 

119 

# Subject-line variants for A/B testing. 'id' is the single-letter variant
# label; 'template' is a str.format-style subject with named placeholders.
AB_VARIANTS = {
    'subject_direct': {
        'id': 'A',
        'template': 'quick question about {product_keyword}',
        'style': 'direct, curiosity-driven',
    },
    'subject_mutual': {
        'id': 'B',
        'template': '{company} + HARTOS -- better together?',
        'style': 'partnership-framed, collaborative',
    },
}

132 

133 

class JourneyEngine:
    """Orchestrates prospect journeys across the HARTOS flywheel.

    A prospect is a plain dict that the engine mutates in place. The engine
    tracks the journey stage in ``prospect['journey_stage']`` and mirrors the
    matching CRM stage into ``prospect['stage']``. Stages, action handlers
    and event hooks are all registered by name, so callers can extend the
    journey without subclassing.
    """

    # A prospect in 'nurture'/'contacted' whose last activity is more than
    # this many days old gets one nudge on an alternate channel.
    STALE_AFTER_DAYS = 21

    # Event name -> journey stages the event is accepted from, and the stage
    # the prospect is forced into when it fires.
    _EVENT_TRANSITIONS = {
        'reply_received': {
            'from': ['outreach', 'nurture', 'contacted'],
            'to': 'engaged',
        },
        'meeting_confirmed': {
            'from': ['engaged', 'replied'],
            'to': 'meeting',
        },
        'deal_won': {
            'from': ['negotiation', 'meeting'],
            'to': 'won',
        },
        'deal_lost': {
            'from': ['negotiation', 'meeting', 'engaged', 'nurture'],
            'to': 'lost',
        },
    }

    def __init__(self):
        # Copy the top-level mapping so adding/replacing a stage on this
        # instance does not add keys to the module-level STAGES dict. The
        # per-stage config dicts themselves are still shared.
        self._stages = dict(STAGES)
        self._action_handlers: Dict[str, Callable] = {}
        self._hooks: Dict[str, List[Callable]] = {}  # stage_enter, stage_exit, etc.
        # NOTE(review): this lock is created but never acquired in this class.
        self._lock = threading.Lock()
        self._register_default_actions()

    # ---- Time helpers ----

    @staticmethod
    def _utcnow() -> datetime:
        """Return the current UTC time as a naive datetime.

        Same value and ``isoformat()`` shape as ``datetime.utcnow()``, without
        calling the API deprecated since Python 3.12.
        """
        from datetime import timezone
        return datetime.now(timezone.utc).replace(tzinfo=None)

    @staticmethod
    def _parse_timestamp(value: Any) -> Optional[datetime]:
        """Parse an ISO-8601 timestamp into a naive UTC datetime.

        Accepts a trailing ``Z`` and arbitrary UTC offsets (converted to UTC).
        Returns None for empty, non-string or unparseable input instead of
        raising, so one bad record cannot abort a whole tick.
        """
        if not isinstance(value, str):
            return None
        text = value.strip()
        if not text:
            return None
        if text[-1] in 'Zz':
            # fromisoformat() only understands a literal 'Z' from 3.11 on.
            text = text[:-1] + '+00:00'
        try:
            parsed = datetime.fromisoformat(text)
        except ValueError:
            return None
        if parsed.tzinfo is not None:
            from datetime import timezone
            parsed = parsed.astimezone(timezone.utc).replace(tzinfo=None)
        return parsed

    # ---- Stage Management ----

    def register_stage(self, name: str, config: Dict):
        """Add or override a journey stage."""
        self._stages[name] = config
        logger.info('Journey: registered stage "%s"', name)

    def register_action(self, name: str, handler: Callable):
        """Register an action handler.

        Handler signature: handler(prospect: Dict, context: Dict) -> Dict
        """
        self._action_handlers[name] = handler
        logger.info('Journey: registered action "%s"', name)

    def register_hook(self, event: str, handler: Callable):
        """Register a hook for journey events.

        Events: stage_enter, stage_exit, email_sent, reply_received,
        meeting_scheduled, deal_won, deal_lost
        """
        self._hooks.setdefault(event, []).append(handler)

    # ---- Journey Execution ----

    def advance_prospect(self, prospect: Dict, force_stage: Optional[str] = None) -> Dict:
        """Move a prospect to the next stage and execute that stage's actions.

        Args:
            prospect: prospect record; mutated in place.
            force_stage: stage to move to. When falsy, the current stage's
                ``exit_to`` is used.

        Returns:
            ``{'moved': False, 'stage': current, 'actions': []}`` when there
            is nowhere to go (terminal stage or target equals current stage),
            otherwise ``{'moved': True, 'from', 'to', 'actions'}``.
        """
        current = prospect.get('journey_stage', prospect.get('stage', 'discover'))

        if force_stage:
            target = force_stage
        else:
            target = self._stages.get(current, {}).get('exit_to', current)

        if not target or target == current:
            return {'moved': False, 'stage': current, 'actions': []}

        self._fire_hooks('stage_exit', prospect=prospect, from_stage=current, to_stage=target)

        prospect['journey_stage'] = target
        # Unknown stages fall back to using the stage name as the CRM stage.
        prospect['stage'] = self._stages.get(target, {}).get('crm_stage', target)
        prospect['updated_at'] = self._utcnow().isoformat()

        self._fire_hooks('stage_enter', prospect=prospect, stage=target)

        actions_taken = self._execute_stage_actions(prospect, target)

        self._sync_crm(prospect)

        logger.info('Journey: %s moved %s -> %s (%d actions)',
                    prospect.get('company', '?'), current, target, len(actions_taken))

        return {'moved': True, 'from': current, 'to': target, 'actions': actions_taken}

    def run_stage_actions(self, prospect: Dict) -> List[Dict]:
        """Execute the auto-actions of the prospect's current stage.

        Called by ``tick`` for prospects that stay in their stage.
        """
        stage = prospect.get('journey_stage', prospect.get('stage', 'discover'))
        return self._execute_stage_actions(prospect, stage)

    def process_event(self, event_type: str, prospect: Dict,
                      event_data: Optional[Dict] = None) -> Dict:
        """Process an external event that may trigger a stage transition.

        Events with a transition:
            reply_received: prospect replied to email
            meeting_confirmed: meeting scheduled
            deal_won / deal_lost: final outcomes

        Any other event, or an event arriving in a stage it is not accepted
        from, is a no-op returning ``{'moved': False, 'stage', 'event'}``.
        """
        current = prospect.get('journey_stage', prospect.get('stage', 'discover'))

        transition = self._EVENT_TRANSITIONS.get(event_type)
        if transition and current in transition.get('from', []):
            # Event hooks run before the stage change, so they see the old stage.
            self._fire_hooks(event_type, prospect=prospect, data=event_data)
            return self.advance_prospect(prospect, force_stage=transition['to'])

        return {'moved': False, 'stage': current, 'event': event_type}

    # ---- Tick-based Processing (called by daemon) ----

    def tick(self, all_prospects: Dict) -> Dict:
        """Process every prospect once.

        Per prospect:
          - 'discover' and not yet researched -> advance to 'research'
          - 'research' with an email draft     -> advance to 'outreach'
          - 'outreach'/'nurture'/'contacted'   -> run the stage's auto-actions
          - stale 'nurture'/'contacted'        -> one alternate-channel nudge

        Returns:
            ``{'actions', 'transitions', 'followups_sent'}``. ``actions``
            counts executed auto-actions plus nudges that were actually
            delivered. ``followups_sent`` is reported for callers but is not
            incremented by this method.
        """
        summary = {'actions': 0, 'transitions': 0, 'followups_sent': 0}
        now = self._utcnow()

        for prospect in all_prospects.values():
            stage = prospect.get('journey_stage', prospect.get('stage', 'discover'))

            if stage == 'discover' and not prospect.get('researched'):
                if self.advance_prospect(prospect, 'research').get('moved'):
                    summary['transitions'] += 1

            elif stage == 'research' and prospect.get('email_draft'):
                if self.advance_prospect(prospect, 'outreach').get('moved'):
                    summary['transitions'] += 1

            elif stage in ('outreach', 'nurture', 'contacted'):
                summary['actions'] += len(self.run_stage_actions(prospect))

            if self._nudge_if_stale(prospect, stage, now):
                summary['actions'] += 1

        return summary

    def _nudge_if_stale(self, prospect: Dict, stage: str, now: datetime) -> bool:
        """Send one alternate-channel nudge to a stale prospect.

        Returns True only when a message was delivered. Prospects that were
        already nudged are skipped: ``last_email_at`` takes precedence over
        ``updated_at`` and is not refreshed by a nudge, so without this guard
        the same message would be re-sent on every tick.
        """
        if stage not in ('nurture', 'contacted'):
            return False
        if prospect.get('alternate_channel_tried'):
            return False
        last = self._parse_timestamp(
            prospect.get('last_email_at') or prospect.get('updated_at', ''))
        if last is None or (now - last).days <= self.STALE_AFTER_DAYS:
            return False
        return self._try_alternate_channel(prospect) is not None

    # ---- A/B Testing ----

    def get_ab_stats(self, prospects: Dict) -> Dict:
        """Analyze A/B test results across all prospects.

        A prospect belongs to a variant when its ``notes`` contain
        ``'variant: A'`` or ``'variant: B'`` (A wins if both appear);
        prospects without a marker are ignored.

        Returns:
            ``{'A': {...}, 'B': {...}, 'winner', 'confidence'}`` where each
            variant has sent/replied/meetings counts and percentage rates.
            ``winner`` is 'A', 'B' or 'tie'; ``confidence`` is 'low' (<20
            total sent), 'medium' (<50) or 'high'.
        """
        stats = {
            'A': {'sent': 0, 'replied': 0, 'meetings': 0},
            'B': {'sent': 0, 'replied': 0, 'meetings': 0},
        }

        for p in prospects.values():
            # 'notes' may be missing or explicitly None in stored records.
            notes = p.get('notes') or ''
            if not isinstance(notes, str):
                notes = str(notes)
            if 'variant: A' in notes:
                variant = 'A'
            elif 'variant: B' in notes:
                variant = 'B'
            else:
                continue

            bucket = stats[variant]
            bucket['sent'] += 1
            if p.get('last_reply_at'):
                bucket['replied'] += 1
            if p.get('stage') in ('meeting', 'negotiation', 'won'):
                bucket['meetings'] += 1

        for v in ('A', 'B'):
            sent = stats[v]['sent'] or 1  # avoid division by zero
            stats[v]['reply_rate'] = round(stats[v]['replied'] / sent * 100, 1)
            stats[v]['meeting_rate'] = round(stats[v]['meetings'] / sent * 100, 1)

        # Meetings are weighted twice as heavily as replies.
        a_score = stats['A']['reply_rate'] + stats['A']['meeting_rate'] * 2
        b_score = stats['B']['reply_rate'] + stats['B']['meeting_rate'] * 2
        if a_score > b_score:
            stats['winner'] = 'A'
        elif b_score > a_score:
            stats['winner'] = 'B'
        else:
            stats['winner'] = 'tie'

        total_sent = stats['A']['sent'] + stats['B']['sent']
        if total_sent < 20:
            stats['confidence'] = 'low'
        elif total_sent < 50:
            stats['confidence'] = 'medium'
        else:
            stats['confidence'] = 'high'

        return stats

    # ---- Channel Routing ----

    def get_channels_for_stage(self, stage: str) -> List[str]:
        """Get preferred channels for a journey stage (default: email only)."""
        return CHANNEL_PRIORITY.get(stage, ['email'])

    def send_via_channel(self, prospect: Dict, message: str,
                         channel: Optional[str] = None) -> Dict:
        """Send a message to a prospect via the best available channel.

        With ``channel`` given only that channel is tried; otherwise the
        stage's channels are tried in priority order and the first successful
        result is returned.
        """
        stage = prospect.get('journey_stage', 'outreach')
        channels = [channel] if channel else self.get_channels_for_stage(stage)

        for ch in channels:
            result = self._send_channel_message(prospect, message, ch)
            if result.get('success'):
                return result

        return {'success': False, 'error': 'no channel available'}

    # ---- Internal Methods ----

    def _register_default_actions(self):
        """Register built-in action handlers."""
        self._action_handlers.update({
            'research_prospect': self._action_research,
            'deep_research': self._action_deep_research,
            'generate_personalized_email': self._action_generate_email,
            'send_outreach_email': self._action_send_email,
            'check_followups': self._action_check_followups,
            'try_alternate_channel': self._action_try_alternate_channel,
            'draft_response': self._action_draft_response,
            'notify_user': self._action_notify_user,
            'propose_meeting': self._action_propose_meeting,
            'prepare_demo_materials': self._action_noop,
            'send_calendar_invite': self._action_noop,
            'track_deal_progress': self._action_noop,
            'generate_proposal': self._action_noop,
            'send_welcome_sequence': self._action_noop,
            'create_partner_channel': self._action_noop,
            'schedule_reengagement': self._action_schedule_reengagement,
            'analyze_loss_reason': self._action_noop,
        })

    def _execute_stage_actions(self, prospect: Dict, stage: str) -> List[Dict]:
        """Execute all auto-actions for a stage.

        A failing handler is logged and recorded as ``{'action', 'error'}``;
        it never stops the remaining actions. Actions with no registered
        handler are logged and skipped.
        """
        results = []

        for action_name in self._stages.get(stage, {}).get('auto_actions', []):
            handler = self._action_handlers.get(action_name)
            if not handler:
                logger.warning('Journey action %s has no registered handler', action_name)
                continue
            try:
                result = handler(prospect, {})
                results.append({'action': action_name, 'result': result})
            except Exception as e:
                logger.error('Journey action %s failed: %s', action_name, e)
                results.append({'action': action_name, 'error': str(e)})

        return results

    def _fire_hooks(self, event: str, **kwargs):
        """Fire all hooks for an event; a failing hook is logged, not raised."""
        for hook in self._hooks.get(event, []):
            try:
                hook(**kwargs)
            except Exception as e:
                logger.error('Journey hook %s failed: %s', event, e)

    def _sync_crm(self, prospect: Dict):
        """Sync prospect state to Erxes CRM (best effort, failures only logged)."""
        try:
            from integrations.agent_engine.erxes_client import get_erxes_client
            erxes = get_erxes_client()
            if erxes and prospect.get('erxes_deal_id'):
                crm_stage = prospect.get('stage', 'new')
                erxes.move_deal(prospect['erxes_deal_id'], crm_stage)
        except Exception as e:
            logger.debug('CRM sync failed: %s', e)

    def _send_channel_message(self, prospect: Dict, message: str, channel: str) -> Dict:
        """Send a message via a specific channel adapter.

        Email goes through the outreach tools; every other channel goes
        through the channel registry and needs both a running registry event
        loop and a per-channel id in ``prospect['channel_ids']``.
        """
        if channel == 'email':
            try:
                from integrations.agent_engine.outreach_crm_tools import _send_email
                result = _send_email(prospect['email'], 'Update from HevolveAI', message)
                return {'success': result.get('success', False), 'channel': 'email'}
            except Exception as e:
                return {'success': False, 'error': str(e)}

        # Other channels via registry
        try:
            chat_id = (prospect.get('channel_ids') or {}).get(channel, '')
            if not chat_id:
                # Nothing to address the message to; do not block on a send
                # that cannot reach this prospect.
                return {'success': False, 'channel': channel,
                        'error': 'no channel id for prospect'}

            from integrations.channels.registry import get_registry
            registry = get_registry()
            adapter = registry.get_adapter(channel)
            if adapter:
                import asyncio
                loop = getattr(registry, '_loop', None)
                if loop and loop.is_running():
                    # The adapter coroutine runs on the registry's loop; this
                    # thread waits for its result.
                    future = asyncio.run_coroutine_threadsafe(
                        adapter.send_message(chat_id=chat_id, text=message),
                        loop,
                    )
                    result = future.result(timeout=15)
                    return {'success': result.success, 'channel': channel}
        except Exception as e:
            logger.debug('Channel %s send failed: %s', channel, e)

        return {'success': False, 'channel': channel, 'error': 'adapter unavailable'}

    def _try_alternate_channel(self, prospect: Dict) -> Optional[str]:
        """Try reaching a prospect via a different channel than email.

        On the first successful send, records the channel in
        ``prospect['alternate_channel_tried']``, refreshes ``updated_at`` and
        returns the channel name. Returns None when nothing was delivered.
        """
        stage = prospect.get('journey_stage', 'nurture')
        message = (
            "hey, sent you an email about HARTOS a few days ago. "
            "just wanted to make sure it didn't get lost. "
            "happy to chat here if that's easier."
        )
        for ch in self.get_channels_for_stage(stage):
            if ch == 'email':
                continue
            result = self.send_via_channel(prospect, message, ch)
            if result.get('success'):
                prospect['alternate_channel_tried'] = ch
                prospect['updated_at'] = self._utcnow().isoformat()
                return ch
        return None

    # ---- Action Handlers ----

    def _action_research(self, prospect: Dict, ctx: Dict) -> Dict:
        """Use Crawl4AI to research a prospect's company.

        POSTs the prospect URL to ``$CRAWL4AI_URL/crawl`` and stores the first
        2000 characters of the returned markdown in ``prospect['research']``.
        """
        url = prospect.get('url', '')
        if not url:
            return {'skipped': True, 'reason': 'no url'}

        try:
            from urllib.request import Request, urlopen
            crawl_url = os.environ.get('CRAWL4AI_URL', 'http://172.17.0.1:8094')
            payload = json.dumps({'url': url, 'timeout': 60000}).encode('utf-8')
            req = Request(crawl_url + '/crawl', data=payload,
                          headers={'Content-Type': 'application/json'})
            # Context manager closes the HTTP response even if decoding fails.
            with urlopen(req, timeout=120) as resp:
                data = json.loads(resp.read().decode('utf-8'))
            if data.get('markdown'):
                prospect['research'] = data['markdown'][:2000]
                prospect['researched'] = True
                return {'success': True, 'words': data.get('word_count', 0)}
        except Exception as e:
            logger.debug('Research crawl failed: %s', e)

        return {'success': False}

    def _action_deep_research(self, prospect: Dict, ctx: Dict) -> Dict:
        """Agent-powered deep research via HARTOS dispatch (at most once)."""
        if prospect.get('deep_researched'):
            return {'skipped': True}

        try:
            from integrations.agent_engine.dispatch import dispatch_goal
            prompt = (
                "Research %s (%s). Find: funding stage, team size, tech stack, "
                "recent news, pain points that HARTOS could solve. "
                "Summarize in 3-4 bullet points."
                % (prospect.get('company', ''), prospect.get('url', ''))
            )
            dispatch_goal(
                user_id=prospect.get('created_by', 'system'),
                prompt=prompt,
                goal_tags=['research'],
            )
            prospect['deep_researched'] = True
            return {'dispatched': True}
        except Exception as e:
            return {'error': str(e)}

    def _action_generate_email(self, prospect: Dict, ctx: Dict) -> Dict:
        """Generate a personalized outreach email draft.

        Writes ``prospect['email_draft'] = {'subject', 'body'}``. The body is
        HTML, so prospect-supplied text is escaped before interpolation; the
        subject is plain text and left unescaped.
        """
        import html

        company = str(prospect.get('company') or '')
        # NOTE(review): falls back to 'notes', which get_ab_stats also scans
        # for 'variant: X' markers -- confirm notes are safe to show prospects.
        product = prospect.get('product', prospect.get('notes', ''))
        blurb = str(product)[:100] if product else ''

        # Simple template (agent can override with dispatch)
        prospect['email_draft'] = {
            'subject': 'quick question about %s' % company.lower(),
            'body': (
                '<p>hey,</p>'
                '<p>been looking at what %s is building. %s</p>'
                '<p>we have an open-source on-device AI runtime (HARTOS) '
                'that handles LLM inference, vision, speech, and multi-agent '
                'orchestration right on the hardware. no cloud dependency.</p>'
                '<p>worth a quick chat?</p>'
                '<p>sathish<br>founder, HevolveAI</p>'
            ) % (html.escape(company), html.escape(blurb)),
        }
        return {'generated': True}

    def _action_send_email(self, prospect: Dict, ctx: Dict) -> Dict:
        """Send the outreach email draft, at most once per prospect."""
        draft = prospect.get('email_draft')
        if not draft:
            return {'skipped': True, 'reason': 'no draft'}

        if prospect.get('emails_sent', 0) > 0:
            return {'skipped': True, 'reason': 'already sent'}

        try:
            from integrations.agent_engine.outreach_crm_tools import _send_email
            result = _send_email(prospect['email'], draft['subject'], draft['body'])
            if result.get('success') or result.get('via'):
                prospect['emails_sent'] = 1
                prospect['last_email_at'] = self._utcnow().isoformat()
                prospect['stage'] = 'contacted'
                return {'sent': True, 'result': result}
        except Exception as e:
            return {'error': str(e)}

        return {'sent': False}

    def _action_check_followups(self, prospect: Dict, ctx: Dict) -> Dict:
        """Check and send pending follow-ups (delegates to outreach tools).

        NOTE(review): the delegate takes no prospect argument, yet this
        handler runs once per prospect per tick -- verify it is cheap/idempotent.
        """
        try:
            from integrations.agent_engine.outreach_crm_tools import check_pending_followups_daemon
            return check_pending_followups_daemon()
        except Exception as e:
            return {'error': str(e)}

    def _action_try_alternate_channel(self, prospect: Dict, ctx: Dict) -> Dict:
        """Try reaching via alternate channel if email isn't working."""
        if prospect.get('alternate_channel_tried'):
            return {'skipped': True}
        self._try_alternate_channel(prospect)
        return {'tried': prospect.get('alternate_channel_tried', 'none')}

    def _action_draft_response(self, prospect: Dict, ctx: Dict) -> Dict:
        """Dispatch agent to draft a response to prospect's reply."""
        try:
            from integrations.agent_engine.outreach_crm_tools import _dispatch_response_draft
            context = {
                'prospect': prospect,
                'their_reply': prospect.get('last_reply', {}),
                'our_emails': [],
            }
            _dispatch_response_draft(prospect, context)
            return {'dispatched': True}
        except Exception as e:
            return {'error': str(e)}

    def _action_notify_user(self, prospect: Dict, ctx: Dict) -> Dict:
        """Push notification to user about prospect activity."""
        try:
            from integrations.channels.response.router import get_response_router
            router = get_response_router()
            msg = "%s (%s) is now in stage: %s" % (
                prospect.get('company', '?'),
                prospect.get('email', '?'),
                prospect.get('journey_stage', prospect.get('stage', '?')),
            )
            router.route_response(
                user_id=prospect.get('created_by', 'system'),
                response_text=msg,
                fan_out=True,
            )
            return {'notified': True}
        except Exception as e:
            return {'error': str(e)}

    def _action_propose_meeting(self, prospect: Dict, ctx: Dict) -> Dict:
        """Email the prospect a set of proposed meeting slots (at most once).

        ``meeting_proposed`` is only set when the send reports success, using
        the same success test as ``_action_send_email``, so a failed send is
        retried the next time the action runs.
        """
        if prospect.get('meeting_proposed'):
            return {'skipped': True}

        try:
            from integrations.agent_engine.outreach_crm_tools import _send_email
            result = _send_email(
                prospect['email'],
                're: HARTOS partnership',
                '<p>great to hear back from you! would any of these work for a quick call?</p>'
                '<ul>'
                '<li>this week: Thursday or Friday, 2-4pm PT</li>'
                '<li>next week: Tuesday or Wednesday, morning PT</li>'
                '</ul>'
                '<p>happy to adjust to your timezone.</p>'
                '<p>sathish</p>',
            )
            if result.get('success') or result.get('via'):
                prospect['meeting_proposed'] = True
                return {'proposed': True, 'result': result}
            return {'proposed': False, 'result': result}
        except Exception as e:
            return {'error': str(e)}

    def _action_schedule_reengagement(self, prospect: Dict, ctx: Dict) -> Dict:
        """Schedule re-engagement for lost deals (try again in 60 days)."""
        prospect['reengagement_at'] = (self._utcnow() + timedelta(days=60)).isoformat()
        return {'scheduled': True, 'date': prospect['reengagement_at']}

    def _action_noop(self, prospect: Dict, ctx: Dict) -> Dict:
        """Placeholder for actions not yet implemented."""
        return {'noop': True}

634 

635 

636# ---- Singleton ---- 

637 

# Process-wide engine instance, created on first use and guarded by a lock.
_engine = None
_engine_lock = threading.Lock()


def get_journey_engine() -> JourneyEngine:
    """Return the shared JourneyEngine, building it on the first call.

    The whole check-and-create sequence runs under ``_engine_lock``.
    """
    global _engine
    with _engine_lock:
        if _engine is not None:
            return _engine
        _engine = JourneyEngine()
        logger.info('Journey engine initialized with %d stages', len(_engine._stages))
        return _engine

650 

651 

652# ---- Agent Tool Registration ---- 

653 

def register_journey_tools(helper, assistant, user_id: str):
    """Register journey tools for the marketing/sales agent.

    Defines four closures over the shared journey engine and registers each
    with ``register_labeled_function`` (caller=helper, executor=assistant).
    Every tool returns a JSON string. Each tool's docstring is passed on as
    the tool description, so those docstrings are runtime text.

    Args:
        helper: passed as ``caller`` to the registration factory.
        assistant: passed as ``executor`` to the registration factory.
        user_id: only used in the final log line.
    """
    # Routed through core.labeled_autogen_function so each tool emits a
    # publish_chat_stage('tool_call', text=…) before invocation — same UI
    # status pipeline as LangChain's _with_tool_logging chokepoint.
    from core.labeled_autogen_function import register_labeled_function

    engine = get_journey_engine()

    def view_journey_pipeline() -> str:
        """View the full prospect journey pipeline with stage counts and A/B stats."""
        from integrations.agent_engine.outreach_crm_tools import _load_prospects
        data = _load_prospects()
        prospects = data.get('prospects', {})

        # Group by journey stage
        pipeline = {}
        for p in prospects.values():
            stage = p.get('journey_stage', p.get('stage', 'unknown'))
            pipeline.setdefault(stage, []).append({
                'company': p.get('company'),
                'email': p.get('email'),
                'emails_sent': p.get('emails_sent', 0),
                'last_activity': p.get('last_email_at') or p.get('updated_at'),
            })

        # A/B stats
        ab = engine.get_ab_stats(prospects)

        return json.dumps({
            'pipeline': {k: {'count': len(v), 'prospects': v} for k, v in pipeline.items()},
            'total': len(prospects),
            'ab_test': ab,
        }, default=str)

    # The tool signatures below are left exactly as declared (including the
    # ``str = None`` defaults): the registration factory receives these
    # functions and may derive the tool schema from their annotations.
    def advance_prospect_stage(
        prospect_id: str,
        target_stage: str = None,
    ) -> str:
        """Move a prospect to the next journey stage (or a specific stage).

        Executes all auto-actions for the new stage.
        """
        from integrations.agent_engine.outreach_crm_tools import _load_prospects, _save_prospects
        data = _load_prospects()
        # .get() like the sibling tools: a store without a 'prospects' key is
        # reported as "not found" instead of raising KeyError.
        prospect = data.get('prospects', {}).get(prospect_id)
        if not prospect:
            return json.dumps({'error': 'prospect not found'})

        result = engine.advance_prospect(prospect, force_stage=target_stage)
        _save_prospects(data)
        return json.dumps(result, default=str)

    def run_journey_tick() -> str:
        """Run one tick of the journey engine across all prospects.

        Checks follow-ups, stage transitions, stale prospects, and A/B results.
        """
        from integrations.agent_engine.outreach_crm_tools import _load_prospects, _save_prospects
        data = _load_prospects()
        summary = engine.tick(data.get('prospects', {}))
        _save_prospects(data)
        return json.dumps(summary, default=str)

    def send_prospect_message(
        prospect_id: str,
        message: str,
        channel: str = None,
    ) -> str:
        """Send a message to a prospect via the best available channel.

        Channels: email, slack, discord, telegram, whatsapp.
        If no channel specified, uses the best one for the prospect's journey stage.
        """
        from integrations.agent_engine.outreach_crm_tools import _load_prospects
        data = _load_prospects()
        prospect = data.get('prospects', {}).get(prospect_id)
        if not prospect:
            return json.dumps({'error': 'prospect not found'})

        result = engine.send_via_channel(prospect, message, channel)
        return json.dumps(result, default=str)

    # UI labels for these tool names live in core/constants.py:TOOL_LABELS
    # (single source of truth) — looked up at registration time so the
    # factory's mandatory ui_label kwarg is satisfied without duplicating
    # the strings here.
    from core.constants import TOOL_LABELS
    for func in [view_journey_pipeline, advance_prospect_stage, run_journey_tick, send_prospect_message]:
        register_labeled_function(
            func,
            caller=helper,
            executor=assistant,
            description=func.__doc__,
            ui_label=TOOL_LABELS.get(func.__name__, f'Running {func.__name__}…'),
        )

    logger.info('Registered 4 journey tools for user %s', user_id)

752 

753 

754# ---- Daemon Integration ---- 

755 

def journey_daemon_tick() -> Dict:
    """Run one journey-engine tick over the stored prospects.

    Loads the prospect store, ticks the shared engine, and writes the store
    back only when the tick reports at least one transition or action.
    Any exception is logged and returned as ``{'error': <message>}``.
    """
    try:
        from integrations.agent_engine.outreach_crm_tools import _load_prospects, _save_prospects
        engine = get_journey_engine()
        store = _load_prospects()
        prospects = store.get('prospects')
        if not prospects:
            return {'skipped': True, 'reason': 'no prospects'}

        summary = engine.tick(prospects)

        # Persist only when the tick reports that something happened.
        had_transitions = summary.get('transitions', 0) > 0
        if had_transitions or summary.get('actions', 0) > 0:
            _save_prospects(store)
        return summary
    except Exception as exc:
        logger.error('Journey daemon tick failed: %s', exc)
        return {'error': str(exc)}

776 

777 

778# ---- Goal Type Registration ---- 

779 

def register_sales_goal_type():
    """Register the 'sales' goal type with the agent engine's goal manager.

    The goal type is wired to ``_build_sales_prompt`` as its prompt builder
    and tagged for sales, outreach, marketing, email and CRM tools. Failures
    (including a missing goal manager module) are logged, never raised.
    """
    try:
        from integrations.agent_engine.goal_manager import register_goal_type

        sales_tool_tags = ['sales', 'outreach', 'marketing', 'email', 'crm']
        register_goal_type(
            goal_type='sales',
            build_prompt=_build_sales_prompt,
            tool_tags=sales_tool_tags,
        )
        logger.info("Registered 'sales' goal type")
    except Exception as exc:
        logger.error('Failed to register sales goal type: %s', exc)

800 

801 

def _build_sales_prompt(goal_dict: Dict, product_dict: Optional[Dict] = None) -> str:
    """Build the system prompt for the sales agent.

    Injects live pipeline state, A/B stats, and vertical coverage
    so the agent can make informed decisions each dispatch.

    Args:
        goal_dict: Goal definition passed by the caller. Not read by this
            function; kept so the signature stays unchanged.
        product_dict: Optional product definition. Not read by this function.

    Returns:
        The static prompt text with the "LIVE PIPELINE STATE" section filled
        in from the prospect store and the journey engine's A/B stats.
    """
    from datetime import timezone

    from integrations.agent_engine.outreach_crm_tools import _load_prospects

    data = _load_prospects()
    prospects = data.get('prospects', {})
    sequences = data.get('sequences', {})
    sent_log = data.get('sent_log', [])

    ab_stats = get_journey_engine().get_ab_stats(prospects)

    # One clock reading for the whole prompt. The value is naive UTC, i.e. the
    # same thing datetime.utcnow() returned (utcnow() is deprecated since 3.12).
    # Timestamps are compared as ISO strings, as the stored values are strings.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    now_iso = now.isoformat()
    stale_cutoff_iso = (now - timedelta(days=7)).isoformat()

    # Pipeline summary: stage -> company names.
    # `or` fallbacks (instead of .get defaults) also cover keys stored as None,
    # which would otherwise break sorted() / str.join below.
    pipeline: Dict[str, List[str]] = {}
    for p in prospects.values():
        stage = p.get('journey_stage') or p.get('stage') or 'unknown'
        pipeline.setdefault(stage, []).append(p.get('company') or '?')

    pipeline_text = '\n'.join(
        ' %s: %d [%s]' % (stage, len(companies), ', '.join(companies[:5]))
        for stage, companies in sorted(pipeline.items())
    )

    # Vertical coverage, most populated vertical first.
    verticals: Dict[str, int] = {}
    for p in prospects.values():
        v = p.get('vertical') or 'unknown'
        verticals[v] = verticals.get(v, 0) + 1
    vertical_text = ', '.join(
        '%s(%d)' % (name, count)
        for name, count in sorted(verticals.items(), key=lambda item: -item[1])
    )

    # Follow-up counters; only sequences whose status is 'active' are counted.
    active_sequences = 0
    pending_followups = 0
    overdue_followups = 0
    for seq in sequences.values():
        if seq.get('status') != 'active':
            continue
        active_sequences += 1
        for step in seq.get('steps') or []:
            if step.get('status') != 'pending':
                continue
            pending_followups += 1
            # A pending step without a schedule is never treated as overdue.
            if (step.get('scheduled_at') or '9999') < now_iso:
                overdue_followups += 1

    # Stale prospects: 'contacted'/'nurture' with no activity for > 7 days.
    # NOTE(review): section 9 of the prompt text below says "> 21 days" while
    # this threshold is 7 days -- confirm which one is intended.
    stale = []
    for p in prospects.values():
        if p.get('stage') not in ('contacted', 'nurture'):
            continue
        last = p.get('last_email_at') or p.get('updated_at', '')
        if last and last < stale_cutoff_iso:
            stale.append(p.get('company') or '?')

    # The template is filled with printf-style `%`: the `{...}` placeholders
    # are literal text for the agent, and any literal percent sign in the
    # template must be written as `%%`.
    return """You are the HARTOS Sales & Marketing Agent. You run CONTINUOUSLY inside the HARTOS daemon.

============================================================
IDENTITY
============================================================
Name: Cortext (the HARTOS marketing intelligence)
Creator: Sathish, founder of HevolveAI
Email from: cortext@hertzai.com
Replies go to: sathish@hertzai.com
Website: https://hevolve.ai

============================================================
PRODUCT: HARTOS (Hevolve Hive Agentic Runtime OS)
============================================================
What it is:
 - Open-source on-device AI runtime for robotics
 - Runs LLM inference, computer vision, speech recognition, and multi-agent
 orchestration DIRECTLY on robot hardware. No cloud round-trips.
 - Every robot running HARTOS joins the Hive intelligence network:
 private data stays local, but collective models improve with scale.

Why robotics companies care:
 - Eliminates cloud latency for real-time autonomy
 - Works offline (warehouses, surgical theaters, field, underwater, space)
 - One OS handles perception + reasoning + action (replaces 5+ SDKs)
 - Early partners shape the intelligence layer for their vertical
 - Open-source: no vendor lock-in, full hardware access

Key differentiators vs competitors:
 - NVIDIA Isaac: HARTOS adds LLM reasoning + multi-agent orchestration
 - ROS2: HARTOS adds native AI inference, not just message passing
 - Custom stacks: HARTOS is batteries-included, saves 6-12 months eng time

Partnership offer:
 - One partner per vertical (exclusive early access)
 - Partner shapes the runtime for their use case
 - Q2 2026 deadline for early access cohort
 - Free integration support from the core team

============================================================
YOUR MISSION
============================================================
You are a PROACTIVE autonomous sales agent. You do not wait to be asked.
On every dispatch, you must:

1. CHECK PIPELINE STATUS
 - Call get_pipeline_status() or view_journey_pipeline()
 - Identify what needs attention RIGHT NOW

2. DISCOVER NEW PROSPECTS (if pipeline < 50 or a vertical has < 3)
 - Target robotics companies across these verticals:
 humanoid, healthcare/surgical, industrial, warehouse/logistics,
 delivery, construction, defense/security, cobot, manufacturing,
 agriculture, inspection, underwater, space, cleaning, hospitality
 - Use google_search to find companies in underrepresented verticals
 - Call create_prospect() for each new company found
 - Include: company name, info@domain email, vertical, product description

3. RESEARCH PROSPECTS (for any in 'new' or 'discover' stage)
 - Crawl their website to understand their product, funding, team
 - Look for pain points HARTOS solves (latency, offline ops, multi-sensor fusion)
 - Update prospect notes with research findings

4. SEND PERSONALIZED OUTREACH (for any in 'new' stage with no emails sent)
 - A/B test two subject line styles:
 Variant A (direct): "quick question about {product_keyword}"
 Variant B (partnership): "{company} + on-device AI?"
 - Alternate A/B assignment (even index = A, odd = B)
 - Call send_outreach_email() with personalized HTML body
 - Body must reference their SPECIFIC product and vertical
 - Always include: what HARTOS is, why they should care, clear CTA (15-min call)
 - Tone: casual, human, lowercase subjects, no em dashes, no corporate speak

 Email template A (direct):
 Subject: quick question about {product_keyword}
 Body: reference their product > what HARTOS does > teams in {vertical}
 testing it > one slot per vertical > worth a 15-min call?

 Email template B (partnership):
 Subject: {company} + on-device AI?
 Body: what caught your attention > HARTOS pitch > Hive network value >
 one slot per vertical for Q2 > open to a quick chat?

5. CREATE FOLLOW-UP SEQUENCES (for any prospect with email sent but no sequence)
 - Call create_followup_sequence() with 2-3 steps:
 Step 1 (day 3): "re: {original_subject}" -- short bump, 2 sentences
 Step 2 (day 7): "last one from me" -- acknowledge timing, leave door open
 Step 3 (day 14, optional): try alternate channel or new angle
 - Sequences auto-pause on reply (exit_on_reply: true)

6. HANDLE OVERDUE FOLLOW-UPS
 - Call check_pending_followups() to send any due follow-ups
 - Report how many were sent

7. HANDLE REPLIES (for any prospect in 'replied' stage)
 - Read their reply context
 - Draft a personalized response (enthusiastic but not desperate)
 - Propose specific meeting times
 - Call move_prospect_stage() to 'meeting' once confirmed

8. TRACK A/B RESULTS
 - Call view_journey_pipeline() to see A/B stats
 - If one variant has > 2x reply rate with 10+ sends, recommend switching
 - Report: sent counts, reply rates, conversion rates per variant

9. PIPELINE HYGIENE
 - Stale prospects (> 21 days no activity in contacted/nurture): try alternate channel
 - Lost deals: schedule re-engagement in 60 days
 - Won deals: trigger welcome sequence

============================================================
TOOLS AVAILABLE
============================================================
Outreach CRM tools (7):
 - create_prospect(company, contact_name, email, vertical, notes, tier)
 - send_outreach_email(prospect_id, subject, body_html, sequence_step)
 - create_followup_sequence(prospect_id, sequence_name, steps_json)
 - check_pending_followups()
 - move_prospect_stage(prospect_id, new_stage, notes)
 - get_pipeline_status()
 - list_sent_emails(prospect_id, limit)

Journey tools (4):
 - view_journey_pipeline() -- pipeline + A/B stats
 - advance_prospect_stage(prospect_id, target_stage)
 - run_journey_tick() -- process all prospects
 - send_prospect_message(prospect_id, message, channel)

General tools:
 - google_search(query) -- find new robotics companies
 - delegate_to_specialist() -- hand off complex tasks

============================================================
INFRASTRUCTURE
============================================================
Email service: http://172.17.0.1:4000 (cortext@hertzai.com via SES)
Crawl4AI: http://172.17.0.1:8094 (website scraping)
Erxes CRM: http://192.168.0.83:3300 (deal pipeline)
 Board: HARTOS Sales | Pipeline: Robotics Outreach
 Stages: New > Contacted > Replied > Meeting > Negotiation > Won | Lost
 Auto-syncs: prospect create, stage move, deal update
Channels: email (primary), slack, discord, telegram, whatsapp

============================================================
WRITING STYLE (critical -- must follow exactly)
============================================================
- Sound like a real human founder, not a bot or marketer
- Lowercase email subjects always (e.g. "quick question about digit")
- NEVER use em dashes. Use -- or commas instead.
- Short paragraphs. 2-3 sentences max per paragraph.
- No buzzwords: don't say "synergy", "leverage", "paradigm", "cutting-edge"
- DO say: "pretty cool", "worth a chat", "figured you might want first dibs"
- Sign as: sathish / founder, HevolveAI / hevolve.ai
- Follow-ups get SHORTER each time, not longer
- Last follow-up: graceful exit ("totally get it if timing's off")

============================================================
LIVE PIPELINE STATE (as of this dispatch)
============================================================
%s

Total prospects: %d
Active sequences: %d
Pending follow-ups: %d (overdue: %d)
Stale (>7d no activity): %s
Total emails sent: %d

Vertical coverage: %s

A/B TEST:
 Variant A (direct): %d sent, %.1f%% reply rate
 Variant B (partnership): %d sent, %.1f%% reply rate
 Winner: %s (confidence: %s)

============================================================
EXECUTION ORDER (do this NOW)
============================================================
1. Call get_pipeline_status() to see current state
2. Call check_pending_followups() to send any due follow-ups
3. If overdue > 0, investigate and resolve
4. If stale prospects exist, try alternate channels
5. If any vertical has < 2 prospects, discover more
6. If any prospect has no sequence, create one
7. Report summary of all actions taken
""" % (
        pipeline_text or ' (empty pipeline -- seed prospects first!)',
        len(prospects),
        active_sequences,
        pending_followups,
        overdue_followups,
        ', '.join(stale[:5]) if stale else 'none',
        len(sent_log),
        vertical_text or 'none',
        ab_stats['A']['sent'], ab_stats['A']['reply_rate'],
        ab_stats['B']['sent'], ab_stats['B']['reply_rate'],
        ab_stats['winner'], ab_stats['confidence'],
    )