Coverage for integrations / agent_engine / journey_engine.py: 14.9%
343 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Agentic User Journey Engine for HARTOS Marketing Flywheel
4Orchestrates the full prospect lifecycle across channels:
5 Discover -> Research -> Outreach -> Follow-up -> Reply -> Meeting -> Close
7Each stage has:
8 - Entry conditions (triggers)
9 - Agentic actions (what HARTOS does proactively)
10 - Exit conditions (what moves to next stage)
11 - Channel preferences (where to reach the prospect)
13The engine is:
14 - Extensible: add stages/actions via register_stage()
15 - Scalable: async execution via HARTOS workflow engine
16 - Intuitive: config-driven, not code-driven
17 - Agentic: HARTOS proactively researches, writes, follows up, schedules
19Integrates with:
20 - outreach_crm_tools.py (prospect store + Erxes CRM)
21 - erxes_client.py (native CRM sync)
22 - channels/ (Discord, Telegram, Slack, WhatsApp, Email)
23 - automation/workflows.py (async workflow execution)
24 - automation/triggers.py (event-based triggers)
25 - agent_daemon.py (periodic tick-based checks)
26 - marketing_tools.py (social posting, campaigns)
27"""
28import json
29import logging
30import os
31import threading
32import time
33from datetime import datetime, timedelta
34from typing import Any, Callable, Dict, List, Optional
36logger = logging.getLogger('hevolve_journey')
38# ---- Journey Stage Definitions ----
40STAGES = {
41 'discover': {
42 'order': 0,
43 'description': 'Prospect identified but not yet researched',
44 'crm_stage': 'new',
45 'auto_actions': ['research_prospect'],
46 'exit_to': 'research',
47 },
48 'research': {
49 'order': 1,
50 'description': 'Agent researching prospect (funding, tech, pain points)',
51 'crm_stage': 'new',
52 'auto_actions': ['deep_research', 'generate_personalized_email'],
53 'exit_to': 'outreach',
54 },
55 'outreach': {
56 'order': 2,
57 'description': 'Initial personalized email sent',
58 'crm_stage': 'contacted',
59 'auto_actions': ['send_outreach_email'],
60 'exit_to': 'nurture',
61 },
62 'nurture': {
63 'order': 3,
64 'description': 'Follow-up sequence running (day 3, 7, 14)',
65 'crm_stage': 'contacted',
66 'auto_actions': ['check_followups', 'try_alternate_channel'],
67 'exit_to': 'engaged',
68 'exit_on': 'reply_received',
69 },
70 'engaged': {
71 'order': 4,
72 'description': 'Prospect replied -- agent drafts response',
73 'crm_stage': 'replied',
74 'auto_actions': ['draft_response', 'notify_user', 'propose_meeting'],
75 'exit_to': 'meeting',
76 },
77 'meeting': {
78 'order': 5,
79 'description': 'Meeting scheduled or in progress',
80 'crm_stage': 'meeting',
81 'auto_actions': ['prepare_demo_materials', 'send_calendar_invite'],
82 'exit_to': 'negotiation',
83 },
84 'negotiation': {
85 'order': 6,
86 'description': 'Active deal negotiation',
87 'crm_stage': 'negotiation',
88 'auto_actions': ['track_deal_progress', 'generate_proposal'],
89 'exit_to': 'won',
90 },
91 'won': {
92 'order': 7,
93 'description': 'Deal closed -- onboard partner',
94 'crm_stage': 'won',
95 'auto_actions': ['send_welcome_sequence', 'create_partner_channel'],
96 'exit_to': None,
97 },
98 'lost': {
99 'order': -1,
100 'description': 'Deal lost -- schedule re-engagement',
101 'crm_stage': 'lost',
102 'auto_actions': ['schedule_reengagement', 'analyze_loss_reason'],
103 'exit_to': None,
104 },
105}
107# ---- Channel Strategy ----
109CHANNEL_PRIORITY = {
110 'outreach': ['email'],
111 'nurture': ['email', 'linkedin'],
112 'engaged': ['email', 'slack', 'discord'],
113 'meeting': ['email', 'slack', 'discord', 'whatsapp'],
114 'negotiation': ['email', 'slack', 'whatsapp'],
115 'won': ['email', 'slack', 'discord', 'telegram'],
116}
118# ---- A/B Test Tracking ----
120AB_VARIANTS = {
121 'subject_direct': {
122 'id': 'A',
123 'template': 'quick question about {product_keyword}',
124 'style': 'direct, curiosity-driven',
125 },
126 'subject_mutual': {
127 'id': 'B',
128 'template': '{company} + HARTOS -- better together?',
129 'style': 'partnership-framed, collaborative',
130 },
131}
134class JourneyEngine:
135 """Orchestrates prospect journeys across the HARTOS flywheel."""
137 def __init__(self):
138 self._stages = dict(STAGES)
139 self._action_handlers: Dict[str, Callable] = {}
140 self._hooks: Dict[str, List[Callable]] = {} # stage_enter, stage_exit, etc.
141 self._lock = threading.Lock()
142 self._register_default_actions()
144 # ---- Stage Management ----
146 def register_stage(self, name: str, config: Dict):
147 """Add or override a journey stage."""
148 self._stages[name] = config
149 logger.info('Journey: registered stage "%s"', name)
151 def register_action(self, name: str, handler: Callable):
152 """Register an action handler.
154 Handler signature: handler(prospect: Dict, context: Dict) -> Dict
155 """
156 self._action_handlers[name] = handler
157 logger.info('Journey: registered action "%s"', name)
159 def register_hook(self, event: str, handler: Callable):
160 """Register a hook for journey events.
162 Events: stage_enter, stage_exit, email_sent, reply_received,
163 meeting_scheduled, deal_won, deal_lost
164 """
165 self._hooks.setdefault(event, []).append(handler)
167 # ---- Journey Execution ----
169 def advance_prospect(self, prospect: Dict, force_stage: str = None) -> Dict:
170 """Move a prospect to the next stage and execute actions.
172 Returns updated prospect with actions taken.
173 """
174 current = prospect.get('journey_stage', prospect.get('stage', 'discover'))
175 stage_config = self._stages.get(current, {})
177 # Determine target stage
178 if force_stage:
179 target = force_stage
180 else:
181 target = stage_config.get('exit_to', current)
183 if not target or target == current:
184 return {'moved': False, 'stage': current, 'actions': []}
186 # Fire exit hooks
187 self._fire_hooks('stage_exit', prospect=prospect, from_stage=current, to_stage=target)
189 # Update prospect
190 prospect['journey_stage'] = target
191 prospect['stage'] = self._stages.get(target, {}).get('crm_stage', target)
192 prospect['updated_at'] = datetime.utcnow().isoformat()
194 # Fire enter hooks
195 self._fire_hooks('stage_enter', prospect=prospect, stage=target)
197 # Execute auto-actions for the new stage
198 actions_taken = self._execute_stage_actions(prospect, target)
200 # Sync to CRM
201 self._sync_crm(prospect)
203 logger.info('Journey: %s moved %s -> %s (%d actions)',
204 prospect.get('company', '?'), current, target, len(actions_taken))
206 return {'moved': True, 'from': current, 'to': target, 'actions': actions_taken}
208 def run_stage_actions(self, prospect: Dict) -> List[Dict]:
209 """Execute pending actions for the prospect's current stage.
211 Called by the daemon on each tick.
212 """
213 stage = prospect.get('journey_stage', prospect.get('stage', 'discover'))
214 return self._execute_stage_actions(prospect, stage)
216 def process_event(self, event_type: str, prospect: Dict, event_data: Dict = None) -> Dict:
217 """Process an external event that may trigger a stage transition.
219 Events:
220 reply_received: prospect replied to email
221 meeting_confirmed: meeting scheduled
222 deal_won / deal_lost: final outcomes
223 channel_message: message on any channel
224 link_clicked: tracking pixel fired
225 """
226 current = prospect.get('journey_stage', prospect.get('stage', 'discover'))
228 transitions = {
229 'reply_received': {
230 'from': ['outreach', 'nurture', 'contacted'],
231 'to': 'engaged',
232 },
233 'meeting_confirmed': {
234 'from': ['engaged', 'replied'],
235 'to': 'meeting',
236 },
237 'deal_won': {
238 'from': ['negotiation', 'meeting'],
239 'to': 'won',
240 },
241 'deal_lost': {
242 'from': ['negotiation', 'meeting', 'engaged', 'nurture'],
243 'to': 'lost',
244 },
245 }
247 transition = transitions.get(event_type)
248 if transition and current in transition.get('from', []):
249 self._fire_hooks(event_type, prospect=prospect, data=event_data)
250 return self.advance_prospect(prospect, force_stage=transition['to'])
252 return {'moved': False, 'stage': current, 'event': event_type}
254 # ---- Tick-based Processing (called by daemon) ----
256 def tick(self, all_prospects: Dict) -> Dict:
257 """Process all prospects on a daemon tick.
259 Checks for:
260 - Pending follow-ups to send
261 - Stage transitions that should happen
262 - Prospects stuck in a stage too long
263 - A/B test results to analyze
265 Returns summary of actions taken.
266 """
267 summary = {'actions': 0, 'transitions': 0, 'followups_sent': 0}
269 for pid, prospect in all_prospects.items():
270 stage = prospect.get('journey_stage', prospect.get('stage', 'discover'))
271 stage_config = self._stages.get(stage, {})
273 # Check if prospect should auto-advance
274 if stage == 'discover' and not prospect.get('researched'):
275 result = self.advance_prospect(prospect, 'research')
276 if result.get('moved'):
277 summary['transitions'] += 1
279 elif stage == 'research' and prospect.get('email_draft'):
280 result = self.advance_prospect(prospect, 'outreach')
281 if result.get('moved'):
282 summary['transitions'] += 1
284 elif stage in ('outreach', 'nurture', 'contacted'):
285 # Check for due follow-ups
286 actions = self.run_stage_actions(prospect)
287 summary['actions'] += len(actions)
289 # Check for stale prospects (no activity for 21+ days)
290 last_activity = prospect.get('last_email_at') or prospect.get('updated_at', '')
291 if last_activity:
292 try:
293 last = datetime.fromisoformat(last_activity.replace('Z', '+00:00').replace('+00:00', ''))
294 days_stale = (datetime.utcnow() - last).days
295 if days_stale > 21 and stage in ('nurture', 'contacted'):
296 # Try alternate channel
297 self._try_alternate_channel(prospect)
298 summary['actions'] += 1
299 except (ValueError, TypeError):
300 pass
302 return summary
304 # ---- A/B Testing ----
306 def get_ab_stats(self, prospects: Dict) -> Dict:
307 """Analyze A/B test results across all prospects."""
308 stats = {'A': {'sent': 0, 'replied': 0, 'meetings': 0},
309 'B': {'sent': 0, 'replied': 0, 'meetings': 0}}
311 for pid, p in prospects.items():
312 variant = 'A' if 'variant: A' in p.get('notes', '') else 'B' if 'variant: B' in p.get('notes', '') else None
313 if not variant:
314 continue
315 stats[variant]['sent'] += 1
316 if p.get('last_reply_at'):
317 stats[variant]['replied'] += 1
318 if p.get('stage') in ('meeting', 'negotiation', 'won'):
319 stats[variant]['meetings'] += 1
321 # Calculate rates
322 for v in ('A', 'B'):
323 sent = stats[v]['sent'] or 1
324 stats[v]['reply_rate'] = round(stats[v]['replied'] / sent * 100, 1)
325 stats[v]['meeting_rate'] = round(stats[v]['meetings'] / sent * 100, 1)
327 # Determine winner
328 a_score = stats['A']['reply_rate'] + stats['A']['meeting_rate'] * 2
329 b_score = stats['B']['reply_rate'] + stats['B']['meeting_rate'] * 2
330 stats['winner'] = 'A' if a_score > b_score else 'B' if b_score > a_score else 'tie'
331 stats['confidence'] = 'low' if (stats['A']['sent'] + stats['B']['sent']) < 20 else 'medium' if (stats['A']['sent'] + stats['B']['sent']) < 50 else 'high'
333 return stats
335 # ---- Channel Routing ----
337 def get_channels_for_stage(self, stage: str) -> List[str]:
338 """Get preferred channels for a journey stage."""
339 return CHANNEL_PRIORITY.get(stage, ['email'])
341 def send_via_channel(self, prospect: Dict, message: str, channel: str = None) -> Dict:
342 """Send a message to a prospect via the best available channel."""
343 stage = prospect.get('journey_stage', 'outreach')
344 channels = [channel] if channel else self.get_channels_for_stage(stage)
346 for ch in channels:
347 result = self._send_channel_message(prospect, message, ch)
348 if result.get('success'):
349 return result
351 return {'success': False, 'error': 'no channel available'}
353 # ---- Internal Methods ----
355 def _register_default_actions(self):
356 """Register built-in action handlers."""
357 self._action_handlers.update({
358 'research_prospect': self._action_research,
359 'deep_research': self._action_deep_research,
360 'generate_personalized_email': self._action_generate_email,
361 'send_outreach_email': self._action_send_email,
362 'check_followups': self._action_check_followups,
363 'try_alternate_channel': self._action_try_alternate_channel,
364 'draft_response': self._action_draft_response,
365 'notify_user': self._action_notify_user,
366 'propose_meeting': self._action_propose_meeting,
367 'prepare_demo_materials': self._action_noop,
368 'send_calendar_invite': self._action_noop,
369 'track_deal_progress': self._action_noop,
370 'generate_proposal': self._action_noop,
371 'send_welcome_sequence': self._action_noop,
372 'create_partner_channel': self._action_noop,
373 'schedule_reengagement': self._action_schedule_reengagement,
374 'analyze_loss_reason': self._action_noop,
375 })
377 def _execute_stage_actions(self, prospect: Dict, stage: str) -> List[Dict]:
378 """Execute all auto-actions for a stage."""
379 stage_config = self._stages.get(stage, {})
380 actions = stage_config.get('auto_actions', [])
381 results = []
383 for action_name in actions:
384 handler = self._action_handlers.get(action_name)
385 if handler:
386 try:
387 result = handler(prospect, {})
388 results.append({'action': action_name, 'result': result})
389 except Exception as e:
390 logger.error('Journey action %s failed: %s', action_name, e)
391 results.append({'action': action_name, 'error': str(e)})
393 return results
395 def _fire_hooks(self, event: str, **kwargs):
396 """Fire all hooks for an event."""
397 for hook in self._hooks.get(event, []):
398 try:
399 hook(**kwargs)
400 except Exception as e:
401 logger.error('Journey hook %s failed: %s', event, e)
403 def _sync_crm(self, prospect: Dict):
404 """Sync prospect state to Erxes CRM."""
405 try:
406 from integrations.agent_engine.erxes_client import get_erxes_client
407 erxes = get_erxes_client()
408 if erxes and prospect.get('erxes_deal_id'):
409 crm_stage = prospect.get('stage', 'new')
410 erxes.move_deal(prospect['erxes_deal_id'], crm_stage)
411 except Exception as e:
412 logger.debug('CRM sync failed: %s', e)
414 def _send_channel_message(self, prospect: Dict, message: str, channel: str) -> Dict:
415 """Send a message via a specific channel adapter."""
416 if channel == 'email':
417 try:
418 from integrations.agent_engine.outreach_crm_tools import _send_email
419 result = _send_email(prospect['email'], 'Update from HevolveAI', message)
420 return {'success': result.get('success', False), 'channel': 'email'}
421 except Exception as e:
422 return {'success': False, 'error': str(e)}
424 # Other channels via registry
425 try:
426 from integrations.channels.registry import get_registry
427 registry = get_registry()
428 adapter = registry.get_adapter(channel)
429 if adapter:
430 import asyncio
431 loop = getattr(registry, '_loop', None)
432 if loop and loop.is_running():
433 future = asyncio.run_coroutine_threadsafe(
434 adapter.send_message(
435 chat_id=prospect.get('channel_ids', {}).get(channel, ''),
436 text=message,
437 ),
438 loop,
439 )
440 result = future.result(timeout=15)
441 return {'success': result.success, 'channel': channel}
442 except Exception as e:
443 logger.debug('Channel %s send failed: %s', channel, e)
445 return {'success': False, 'channel': channel, 'error': 'adapter unavailable'}
447 def _try_alternate_channel(self, prospect: Dict):
448 """Try reaching a prospect via a different channel than email."""
449 stage = prospect.get('journey_stage', 'nurture')
450 channels = self.get_channels_for_stage(stage)
451 for ch in channels:
452 if ch != 'email':
453 message = (
454 "hey, sent you an email about HARTOS a few days ago. "
455 "just wanted to make sure it didn't get lost. "
456 "happy to chat here if that's easier."
457 )
458 result = self.send_via_channel(prospect, message, ch)
459 if result.get('success'):
460 prospect['alternate_channel_tried'] = ch
461 prospect['updated_at'] = datetime.utcnow().isoformat()
462 return
464 # ---- Action Handlers ----
466 def _action_research(self, prospect: Dict, ctx: Dict) -> Dict:
467 """Use Crawl4AI to research a prospect's company."""
468 url = prospect.get('url', '')
469 if not url:
470 return {'skipped': True, 'reason': 'no url'}
472 try:
473 from urllib.request import Request, urlopen
474 crawl_url = os.environ.get('CRAWL4AI_URL', 'http://172.17.0.1:8094')
475 payload = json.dumps({'url': url, 'timeout': 60000}).encode('utf-8')
476 req = Request(crawl_url + '/crawl', data=payload,
477 headers={'Content-Type': 'application/json'})
478 resp = urlopen(req, timeout=120)
479 data = json.loads(resp.read().decode('utf-8'))
480 if data.get('markdown'):
481 prospect['research'] = data['markdown'][:2000]
482 prospect['researched'] = True
483 return {'success': True, 'words': data.get('word_count', 0)}
484 except Exception as e:
485 logger.debug('Research crawl failed: %s', e)
487 return {'success': False}
489 def _action_deep_research(self, prospect: Dict, ctx: Dict) -> Dict:
490 """Agent-powered deep research via HARTOS dispatch."""
491 if prospect.get('deep_researched'):
492 return {'skipped': True}
494 try:
495 from integrations.agent_engine.dispatch import dispatch_goal
496 prompt = (
497 "Research %s (%s). Find: funding stage, team size, tech stack, "
498 "recent news, pain points that HARTOS could solve. "
499 "Summarize in 3-4 bullet points."
500 % (prospect.get('company', ''), prospect.get('url', ''))
501 )
502 dispatch_goal(
503 user_id=prospect.get('created_by', 'system'),
504 prompt=prompt,
505 goal_tags=['research'],
506 )
507 prospect['deep_researched'] = True
508 return {'dispatched': True}
509 except Exception as e:
510 return {'error': str(e)}
512 def _action_generate_email(self, prospect: Dict, ctx: Dict) -> Dict:
513 """Generate a personalized outreach email based on research."""
514 research = prospect.get('research', '')
515 company = prospect.get('company', '')
516 product = prospect.get('product', prospect.get('notes', ''))
518 # Simple template (agent can override with dispatch)
519 prospect['email_draft'] = {
520 'subject': 'quick question about %s' % company.lower(),
521 'body': (
522 '<p>hey,</p>'
523 '<p>been looking at what %s is building. %s</p>'
524 '<p>we have an open-source on-device AI runtime (HARTOS) '
525 'that handles LLM inference, vision, speech, and multi-agent '
526 'orchestration right on the hardware. no cloud dependency.</p>'
527 '<p>worth a quick chat?</p>'
528 '<p>sathish<br>founder, HevolveAI</p>'
529 ) % (company, product[:100] if product else ''),
530 }
531 return {'generated': True}
533 def _action_send_email(self, prospect: Dict, ctx: Dict) -> Dict:
534 """Send the outreach email."""
535 draft = prospect.get('email_draft')
536 if not draft:
537 return {'skipped': True, 'reason': 'no draft'}
539 if prospect.get('emails_sent', 0) > 0:
540 return {'skipped': True, 'reason': 'already sent'}
542 try:
543 from integrations.agent_engine.outreach_crm_tools import _send_email
544 result = _send_email(prospect['email'], draft['subject'], draft['body'])
545 if result.get('success') or result.get('via'):
546 prospect['emails_sent'] = 1
547 prospect['last_email_at'] = datetime.utcnow().isoformat()
548 prospect['stage'] = 'contacted'
549 return {'sent': True, 'result': result}
550 except Exception as e:
551 return {'error': str(e)}
553 return {'sent': False}
555 def _action_check_followups(self, prospect: Dict, ctx: Dict) -> Dict:
556 """Check and send pending follow-ups (delegates to outreach tools)."""
557 try:
558 from integrations.agent_engine.outreach_crm_tools import check_pending_followups_daemon
559 return check_pending_followups_daemon()
560 except Exception as e:
561 return {'error': str(e)}
563 def _action_try_alternate_channel(self, prospect: Dict, ctx: Dict) -> Dict:
564 """Try reaching via alternate channel if email isn't working."""
565 if prospect.get('alternate_channel_tried'):
566 return {'skipped': True}
567 self._try_alternate_channel(prospect)
568 return {'tried': prospect.get('alternate_channel_tried', 'none')}
570 def _action_draft_response(self, prospect: Dict, ctx: Dict) -> Dict:
571 """Dispatch agent to draft a response to prospect's reply."""
572 try:
573 from integrations.agent_engine.outreach_crm_tools import _dispatch_response_draft
574 context = {
575 'prospect': prospect,
576 'their_reply': prospect.get('last_reply', {}),
577 'our_emails': [],
578 }
579 _dispatch_response_draft(prospect, context)
580 return {'dispatched': True}
581 except Exception as e:
582 return {'error': str(e)}
584 def _action_notify_user(self, prospect: Dict, ctx: Dict) -> Dict:
585 """Push notification to user about prospect activity."""
586 try:
587 from integrations.channels.response.router import get_response_router
588 router = get_response_router()
589 msg = "%s (%s) is now in stage: %s" % (
590 prospect.get('company', '?'),
591 prospect.get('email', '?'),
592 prospect.get('journey_stage', prospect.get('stage', '?')),
593 )
594 router.route_response(
595 user_id=prospect.get('created_by', 'system'),
596 response_text=msg,
597 fan_out=True,
598 )
599 return {'notified': True}
600 except Exception as e:
601 return {'error': str(e)}
603 def _action_propose_meeting(self, prospect: Dict, ctx: Dict) -> Dict:
604 """Agent proposes a meeting time to the prospect."""
605 if prospect.get('meeting_proposed'):
606 return {'skipped': True}
608 try:
609 from integrations.agent_engine.outreach_crm_tools import _send_email
610 result = _send_email(
611 prospect['email'],
612 're: HARTOS partnership',
613 '<p>great to hear back from you! would any of these work for a quick call?</p>'
614 '<ul>'
615 '<li>this week: Thursday or Friday, 2-4pm PT</li>'
616 '<li>next week: Tuesday or Wednesday, morning PT</li>'
617 '</ul>'
618 '<p>happy to adjust to your timezone.</p>'
619 '<p>sathish</p>',
620 )
621 prospect['meeting_proposed'] = True
622 return {'proposed': True, 'result': result}
623 except Exception as e:
624 return {'error': str(e)}
626 def _action_schedule_reengagement(self, prospect: Dict, ctx: Dict) -> Dict:
627 """Schedule re-engagement for lost deals (try again in 60 days)."""
628 prospect['reengagement_at'] = (datetime.utcnow() + timedelta(days=60)).isoformat()
629 return {'scheduled': True, 'date': prospect['reengagement_at']}
631 def _action_noop(self, prospect: Dict, ctx: Dict) -> Dict:
632 """Placeholder for actions not yet implemented."""
633 return {'noop': True}
636# ---- Singleton ----
638_engine = None
639_engine_lock = threading.Lock()
642def get_journey_engine() -> JourneyEngine:
643 """Get or create the singleton journey engine."""
644 global _engine
645 with _engine_lock:
646 if _engine is None:
647 _engine = JourneyEngine()
648 logger.info('Journey engine initialized with %d stages', len(_engine._stages))
649 return _engine
652# ---- Agent Tool Registration ----
654def register_journey_tools(helper, assistant, user_id: str):
655 """Register journey tools for the marketing/sales agent."""
656 # Routed through core.labeled_autogen_function so each tool emits a
657 # publish_chat_stage('tool_call', text=…) before invocation — same UI
658 # status pipeline as LangChain's _with_tool_logging chokepoint.
659 from core.labeled_autogen_function import register_labeled_function
661 engine = get_journey_engine()
663 def view_journey_pipeline() -> str:
664 """View the full prospect journey pipeline with stage counts and A/B stats."""
665 from integrations.agent_engine.outreach_crm_tools import _load_prospects
666 data = _load_prospects()
667 prospects = data.get('prospects', {})
669 # Group by journey stage
670 pipeline = {}
671 for pid, p in prospects.items():
672 stage = p.get('journey_stage', p.get('stage', 'unknown'))
673 pipeline.setdefault(stage, []).append({
674 'company': p.get('company'),
675 'email': p.get('email'),
676 'emails_sent': p.get('emails_sent', 0),
677 'last_activity': p.get('last_email_at') or p.get('updated_at'),
678 })
680 # A/B stats
681 ab = engine.get_ab_stats(prospects)
683 return json.dumps({
684 'pipeline': {k: {'count': len(v), 'prospects': v} for k, v in pipeline.items()},
685 'total': len(prospects),
686 'ab_test': ab,
687 }, default=str)
689 def advance_prospect_stage(
690 prospect_id: str,
691 target_stage: str = None,
692 ) -> str:
693 """Move a prospect to the next journey stage (or a specific stage).
695 Executes all auto-actions for the new stage.
696 """
697 from integrations.agent_engine.outreach_crm_tools import _load_prospects, _save_prospects
698 data = _load_prospects()
699 prospect = data['prospects'].get(prospect_id)
700 if not prospect:
701 return json.dumps({'error': 'prospect not found'})
703 result = engine.advance_prospect(prospect, force_stage=target_stage)
704 _save_prospects(data)
705 return json.dumps(result, default=str)
707 def run_journey_tick() -> str:
708 """Run one tick of the journey engine across all prospects.
710 Checks follow-ups, stage transitions, stale prospects, and A/B results.
711 """
712 from integrations.agent_engine.outreach_crm_tools import _load_prospects, _save_prospects
713 data = _load_prospects()
714 summary = engine.tick(data.get('prospects', {}))
715 _save_prospects(data)
716 return json.dumps(summary, default=str)
718 def send_prospect_message(
719 prospect_id: str,
720 message: str,
721 channel: str = None,
722 ) -> str:
723 """Send a message to a prospect via the best available channel.
725 Channels: email, slack, discord, telegram, whatsapp.
726 If no channel specified, uses the best one for the prospect's journey stage.
727 """
728 from integrations.agent_engine.outreach_crm_tools import _load_prospects
729 data = _load_prospects()
730 prospect = data['prospects'].get(prospect_id)
731 if not prospect:
732 return json.dumps({'error': 'prospect not found'})
734 result = engine.send_via_channel(prospect, message, channel)
735 return json.dumps(result, default=str)
737 # UI labels for these tool names live in core/constants.py:TOOL_LABELS
738 # (single source of truth) — looked up at registration time so the
739 # factory's mandatory ui_label kwarg is satisfied without duplicating
740 # the strings here.
741 from core.constants import TOOL_LABELS
742 for func in [view_journey_pipeline, advance_prospect_stage, run_journey_tick, send_prospect_message]:
743 register_labeled_function(
744 func,
745 caller=helper,
746 executor=assistant,
747 description=func.__doc__,
748 ui_label=TOOL_LABELS.get(func.__name__, f'Running {func.__name__}…'),
749 )
751 logger.info('Registered 4 journey tools for user %s', user_id)
754# ---- Daemon Integration ----
756def journey_daemon_tick() -> Dict:
757 """Called by agent_daemon.py on each tick.
759 Runs the journey engine tick and returns summary.
760 Only saves data back if something actually changed.
761 """
762 try:
763 from integrations.agent_engine.outreach_crm_tools import _load_prospects, _save_prospects
764 engine = get_journey_engine()
765 data = _load_prospects()
766 if not data.get('prospects'):
767 return {'skipped': True, 'reason': 'no prospects'}
768 summary = engine.tick(data.get('prospects', {}))
769 # Only save if something happened
770 if summary.get('transitions', 0) > 0 or summary.get('actions', 0) > 0:
771 _save_prospects(data)
772 return summary
773 except Exception as e:
774 logger.error('Journey daemon tick failed: %s', e)
775 return {'error': str(e)}
778# ---- Goal Type Registration ----
780def register_sales_goal_type():
781 """Register 'sales' as a goal type in the agent engine.
783 The sales agent proactively manages the entire flywheel:
784 - Discovers and researches prospects
785 - Writes personalized outreach
786 - Manages follow-up sequences
787 - Handles replies and schedules meetings
788 - Tracks pipeline and A/B tests
789 """
790 try:
791 from integrations.agent_engine.goal_manager import register_goal_type
792 register_goal_type(
793 goal_type='sales',
794 build_prompt=_build_sales_prompt,
795 tool_tags=['sales', 'outreach', 'marketing', 'email', 'crm'],
796 )
797 logger.info("Registered 'sales' goal type")
798 except Exception as e:
799 logger.error('Failed to register sales goal type: %s', e)
802def _build_sales_prompt(goal_dict: Dict, product_dict: Dict = None) -> str:
803 """Build the system prompt for the sales agent.
805 Injects live pipeline state, A/B stats, and vertical coverage
806 so the agent can make informed decisions each dispatch.
807 """
808 from integrations.agent_engine.outreach_crm_tools import _load_prospects
809 data = _load_prospects()
810 prospects = data.get('prospects', {})
811 sequences = data.get('sequences', {})
812 sent_log = data.get('sent_log', [])
814 engine = get_journey_engine()
815 ab_stats = engine.get_ab_stats(prospects)
817 # Pipeline summary
818 pipeline = {}
819 for pid, p in prospects.items():
820 stage = p.get('journey_stage', p.get('stage', 'unknown'))
821 pipeline.setdefault(stage, []).append(p.get('company', '?'))
823 pipeline_text = '\n'.join(
824 ' %s: %d [%s]' % (stage, len(companies), ', '.join(companies[:5]))
825 for stage, companies in sorted(pipeline.items())
826 )
828 # Vertical coverage
829 verticals = {}
830 for p in prospects.values():
831 v = p.get('vertical', 'unknown')
832 verticals[v] = verticals.get(v, 0) + 1
833 vertical_text = ', '.join('%s(%d)' % (v, c) for v, c in sorted(verticals.items(), key=lambda x: -x[1]))
835 # Pending follow-ups
836 pending_followups = 0
837 overdue_followups = 0
838 now_iso = datetime.utcnow().isoformat()
839 for seq in sequences.values():
840 if seq.get('status') != 'active':
841 continue
842 for step in seq.get('steps', []):
843 if step.get('status') == 'pending':
844 pending_followups += 1
845 if step.get('scheduled_at', '9999') < now_iso:
846 overdue_followups += 1
848 # Stale prospects (no activity > 7 days)
849 stale = []
850 for pid, p in prospects.items():
851 last = p.get('last_email_at') or p.get('updated_at', '')
852 if last and last < (datetime.utcnow() - timedelta(days=7)).isoformat():
853 if p.get('stage') in ('contacted', 'nurture'):
854 stale.append(p.get('company', '?'))
856 return """You are the HARTOS Sales & Marketing Agent. You run CONTINUOUSLY inside the HARTOS daemon.
858============================================================
859IDENTITY
860============================================================
861Name: Cortext (the HARTOS marketing intelligence)
862Creator: Sathish, founder of HevolveAI
863Email from: cortext@hertzai.com
864Replies go to: sathish@hertzai.com
865Website: https://hevolve.ai
867============================================================
868PRODUCT: HARTOS (Hevolve Hive Agentic Runtime OS)
869============================================================
870What it is:
871 - Open-source on-device AI runtime for robotics
872 - Runs LLM inference, computer vision, speech recognition, and multi-agent
873 orchestration DIRECTLY on robot hardware. No cloud round-trips.
874 - Every robot running HARTOS joins the Hive intelligence network:
875 private data stays local, but collective models improve with scale.
877Why robotics companies care:
878 - Eliminates cloud latency for real-time autonomy
879 - Works offline (warehouses, surgical theaters, field, underwater, space)
880 - One OS handles perception + reasoning + action (replaces 5+ SDKs)
881 - Early partners shape the intelligence layer for their vertical
882 - Open-source: no vendor lock-in, full hardware access
884Key differentiators vs competitors:
885 - NVIDIA Isaac: HARTOS adds LLM reasoning + multi-agent orchestration
886 - ROS2: HARTOS adds native AI inference, not just message passing
887 - Custom stacks: HARTOS is batteries-included, saves 6-12 months eng time
889Partnership offer:
890 - One partner per vertical (exclusive early access)
891 - Partner shapes the runtime for their use case
892 - Q2 2026 deadline for early access cohort
893 - Free integration support from the core team
895============================================================
896YOUR MISSION
897============================================================
898You are a PROACTIVE autonomous sales agent. You do not wait to be asked.
899On every dispatch, you must:
9011. CHECK PIPELINE STATUS
902 - Call get_pipeline_status() or view_journey_pipeline()
903 - Identify what needs attention RIGHT NOW
9052. DISCOVER NEW PROSPECTS (if pipeline < 50 or a vertical has < 3)
906 - Target robotics companies across these verticals:
907 humanoid, healthcare/surgical, industrial, warehouse/logistics,
908 delivery, construction, defense/security, cobot, manufacturing,
909 agriculture, inspection, underwater, space, cleaning, hospitality
910 - Use google_search to find companies in underrepresented verticals
911 - Call create_prospect() for each new company found
912 - Include: company name, info@domain email, vertical, product description
9143. RESEARCH PROSPECTS (for any in 'new' or 'discover' stage)
915 - Crawl their website to understand their product, funding, team
916 - Look for pain points HARTOS solves (latency, offline ops, multi-sensor fusion)
917 - Update prospect notes with research findings
9194. SEND PERSONALIZED OUTREACH (for any in 'new' stage with no emails sent)
920 - A/B test two subject line styles:
921 Variant A (direct): "quick question about {product_keyword}"
922 Variant B (partnership): "{company} + on-device AI?"
923 - Alternate A/B assignment (even index = A, odd = B)
924 - Call send_outreach_email() with personalized HTML body
925 - Body must reference their SPECIFIC product and vertical
926 - Always include: what HARTOS is, why they should care, clear CTA (15-min call)
927 - Tone: casual, human, lowercase subjects, no em dashes, no corporate speak
929 Email template A (direct):
930 Subject: quick question about {product_keyword}
931 Body: reference their product > what HARTOS does > teams in {vertical}
932 testing it > one slot per vertical > worth a 15-min call?
934 Email template B (partnership):
935 Subject: {company} + on-device AI?
936 Body: what caught your attention > HARTOS pitch > Hive network value >
937 one slot per vertical for Q2 > open to a quick chat?
9395. CREATE FOLLOW-UP SEQUENCES (for any prospect with email sent but no sequence)
940 - Call create_followup_sequence() with 2-3 steps:
941 Step 1 (day 3): "re: {original_subject}" -- short bump, 2 sentences
942 Step 2 (day 7): "last one from me" -- acknowledge timing, leave door open
943 Step 3 (day 14, optional): try alternate channel or new angle
944 - Sequences auto-pause on reply (exit_on_reply: true)
9466. HANDLE OVERDUE FOLLOW-UPS
947 - Call check_pending_followups() to send any due follow-ups
948 - Report how many were sent
9507. HANDLE REPLIES (for any prospect in 'replied' stage)
951 - Read their reply context
952 - Draft a personalized response (enthusiastic but not desperate)
953 - Propose specific meeting times
954 - Call move_prospect_stage() to 'meeting' once confirmed
9568. TRACK A/B RESULTS
957 - Call view_journey_pipeline() to see A/B stats
958 - If one variant has > 2x reply rate with 10+ sends, recommend switching
959 - Report: sent counts, reply rates, conversion rates per variant
9619. PIPELINE HYGIENE
962 - Stale prospects (> 21 days no activity in contacted/nurture): try alternate channel
963 - Lost deals: schedule re-engagement in 60 days
964 - Won deals: trigger welcome sequence
966============================================================
967TOOLS AVAILABLE
968============================================================
969Outreach CRM tools (7):
970 - create_prospect(company, contact_name, email, vertical, notes, tier)
971 - send_outreach_email(prospect_id, subject, body_html, sequence_step)
972 - create_followup_sequence(prospect_id, sequence_name, steps_json)
973 - check_pending_followups()
974 - move_prospect_stage(prospect_id, new_stage, notes)
975 - get_pipeline_status()
976 - list_sent_emails(prospect_id, limit)
978Journey tools (4):
979 - view_journey_pipeline() -- pipeline + A/B stats
980 - advance_prospect_stage(prospect_id, target_stage)
981 - run_journey_tick() -- process all prospects
982 - send_prospect_message(prospect_id, message, channel)
984General tools:
985 - google_search(query) -- find new robotics companies
986 - delegate_to_specialist() -- hand off complex tasks
988============================================================
989INFRASTRUCTURE
990============================================================
991Email service: http://172.17.0.1:4000 (cortext@hertzai.com via SES)
992Crawl4AI: http://172.17.0.1:8094 (website scraping)
993Erxes CRM: http://192.168.0.83:3300 (deal pipeline)
994 Board: HARTOS Sales | Pipeline: Robotics Outreach
995 Stages: New > Contacted > Replied > Meeting > Negotiation > Won | Lost
996 Auto-syncs: prospect create, stage move, deal update
997Channels: email (primary), slack, discord, telegram, whatsapp
999============================================================
1000WRITING STYLE (critical -- must follow exactly)
1001============================================================
1002- Sound like a real human founder, not a bot or marketer
1003- Lowercase email subjects always (e.g. "quick question about digit")
1004- NEVER use em dashes. Use -- or commas instead.
1005- Short paragraphs. 2-3 sentences max per paragraph.
1006- No buzzwords: don't say "synergy", "leverage", "paradigm", "cutting-edge"
1007- DO say: "pretty cool", "worth a chat", "figured you might want first dibs"
1008- Sign as: sathish / founder, HevolveAI / hevolve.ai
1009- Follow-ups get SHORTER each time, not longer
1010- Last follow-up: graceful exit ("totally get it if timing's off")
1012============================================================
1013LIVE PIPELINE STATE (as of this dispatch)
1014============================================================
1015%s
1017Total prospects: %d
1018Active sequences: %d
1019Pending follow-ups: %d (overdue: %d)
1020Stale (>7d no activity): %s
1021Total emails sent: %d
1023Vertical coverage: %s
1025A/B TEST:
1026 Variant A (direct): %d sent, %.1f%% reply rate
1027 Variant B (partnership): %d sent, %.1f%% reply rate
1028 Winner: %s (confidence: %s)
1030============================================================
1031EXECUTION ORDER (do this NOW)
1032============================================================
10331. Call get_pipeline_status() to see current state
10342. Call check_pending_followups() to send any due follow-ups
10353. If overdue > 0, investigate and resolve
10364. If stale prospects exist, try alternate channels
10375. If any vertical has < 2 prospects, discover more
10386. If any prospect has no sequence, create one
10397. Report summary of all actions taken
1040""" % (
1041 pipeline_text or ' (empty pipeline -- seed prospects first!)',
1042 len(prospects),
1043 len([s for s in sequences.values() if s.get('status') == 'active']),
1044 pending_followups,
1045 overdue_followups,
1046 ', '.join(stale[:5]) if stale else 'none',
1047 len(sent_log),
1048 vertical_text or 'none',
1049 ab_stats['A']['sent'], ab_stats['A']['reply_rate'],
1050 ab_stats['B']['sent'], ab_stats['B']['reply_rate'],
1051 ab_stats['winner'], ab_stats['confidence'],
1052 )