Coverage for integrations / agent_engine / outreach_crm_tools.py: 12.9%
294 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Unified Agent Goal Engine - Outreach CRM Tools (Tier 2)

These tools are loaded ONLY when the agent is working on an 'outreach' goal.
They connect HARTOS agents to Erxes CRM for:
  - Lead/contact management
  - Deal pipeline tracking
  - Automated email outreach via cortext@hertzai.com
  - Follow-up sequence automation
  - Reply detection and pipeline stage updates

Integrates with:
  - Erxes GraphQL API (e.g. http://192.168.0.9:3300; set via ERXES_API_URL,
    default http://localhost:3300)
  - HertzAI email service (e.g. http://192.168.0.9:4000/sendEmail; base URL set
    via EMAIL_SERVICE_URL, default http://localhost:4000)

Tier 1 (Default): google_search, text_2_image, delegate_to_specialist, etc.
Tier 2 (Category): create_prospect, send_outreach_email, create_followup_sequence,
                   check_pending_followups, move_prospect_stage,
                   get_pipeline_status, list_sent_emails
Tier 3 (Runtime): delegate_to_specialist for channel-specific follow-ups
"""
20import html
21import json
22import logging
23import os
24import re
25import threading
26import time
27from datetime import datetime, timedelta
28from typing import Annotated, Optional, Dict, List
logger = logging.getLogger('hevolve_outreach')

# ── Configuration ──
# Both endpoints can be overridden through the environment; the defaults
# point at services running on the local machine.
ERXES_API_URL = os.getenv('ERXES_API_URL', 'http://localhost:3300')
EMAIL_SERVICE_URL = os.getenv('EMAIL_SERVICE_URL', 'http://localhost:4000')

# ── Thread lock for prospect file access (daemon + request thread) ──
# Plain (non-reentrant) lock: it is taken inside _load_prospects and
# _save_prospects, so code calling those helpers must not already hold it.
_prospect_lock = threading.Lock()

# ── Local prospect store (file-backed, works without Erxes) ──
# Resolved relative to this module: <module dir>/../../agent_data/outreach_prospects.json
_PROSPECT_FILE = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    os.pardir, os.pardir, 'agent_data', 'outreach_prospects.json',
)
def _load_prospects() -> Dict:
    """Load prospect data from the local JSON store. Thread-safe.

    The file is read while holding ``_prospect_lock``. A missing or
    unparseable file yields an empty store instead of raising.

    Returns:
        A dict that always contains the three top-level sections callers
        index directly: ``'prospects'`` (dict), ``'sequences'`` (dict) and
        ``'sent_log'`` (list). Sections that are absent or of the wrong type
        in the file are replaced with empty containers.
    """
    with _prospect_lock:
        try:
            with open(_PROSPECT_FILE, 'r', encoding='utf-8') as f:
                data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            data = {}

    # A hand-edited or partially written file may hold valid JSON that is not
    # the expected shape; normalise it so callers never hit KeyError/TypeError.
    if not isinstance(data, dict):
        data = {}
    for section, factory in (('prospects', dict), ('sequences', dict), ('sent_log', list)):
        if not isinstance(data.get(section), factory):
            data[section] = factory()
    return data
def _save_prospects(data: Dict):
    """Save prospect data to the local JSON store. Thread-safe.

    Safety:
        * Refuses to overwrite a store that has prospects with data that has
          none (logs a warning and returns without writing).
        * Writes to a sibling ``.tmp`` file and then renames it over the real
          file, so a failure during serialisation never leaves a truncated
          store behind (a truncated file would be read back as "empty").

    Args:
        data: The full store dict (``prospects`` / ``sequences`` / ``sent_log``).

    Raises:
        OSError, TypeError, ValueError: if the data cannot be written or
            serialised; the previous file content is left untouched.
    """
    with _prospect_lock:
        # Guard: never overwrite existing data with empty
        if not data.get('prospects', {}):
            try:
                with open(_PROSPECT_FILE, 'r', encoding='utf-8') as f:
                    existing = json.load(f)
                if isinstance(existing, dict) and existing.get('prospects'):
                    logger.warning('_save_prospects: refusing to overwrite %d prospects with empty data',
                                   len(existing['prospects']))
                    return
            except (FileNotFoundError, json.JSONDecodeError):
                pass

        os.makedirs(os.path.dirname(_PROSPECT_FILE), exist_ok=True)

        # Atomic replace: dump to a temp file first, then swap it in.
        tmp_path = f'{_PROSPECT_FILE}.tmp'
        try:
            with open(tmp_path, 'w', encoding='utf-8') as f:
                json.dump(data, f, indent=2, default=str)
            os.replace(tmp_path, _PROSPECT_FILE)
        except Exception:
            # Do not leave a half-written temp file lying around.
            try:
                os.remove(tmp_path)
            except OSError:
                pass
            raise
def _get_erxes():
    """Return the native Erxes CRM client, or None when it cannot be obtained.

    The client module is imported lazily so this file still loads when the
    Erxes integration is unavailable; any failure while importing or building
    the client is logged at debug level and reported as ``None``.
    """
    try:
        from integrations.agent_engine.erxes_client import get_erxes_client
        client = get_erxes_client()
    except Exception as exc:
        logger.debug(f"Erxes client unavailable: {exc}")
        client = None
    return client
def _sync_prospect_to_crm(prospect: Dict) -> Dict:
    """Push a prospect to Erxes CRM and report the outcome.

    Returns:
        The dict returned by the client's ``sync_prospect_to_erxes`` on a
        completed call, ``{'synced': False, 'reason': 'erxes_unavailable'}``
        when no client is available, or ``{'synced': False, 'error': ...}``
        when the call raised.
    """
    client = _get_erxes()
    if not client:
        return {'synced': False, 'reason': 'erxes_unavailable'}

    try:
        outcome = client.sync_prospect_to_erxes(prospect)
        if outcome.get('synced'):
            message = (f"CRM synced: {prospect.get('company')} "
                       f"(customer={outcome.get('erxes_customer_id')}, "
                       f"deal={outcome.get('erxes_deal_id')})")
            logger.info(message)
        return outcome
    except Exception as exc:
        logger.debug(f"CRM sync failed: {exc}")
        return {'synced': False, 'error': str(exc)}
def _sync_stage_change(prospect: Dict, new_stage: str):
    """Mirror a pipeline stage change into the Erxes deal pipeline.

    Best effort: does nothing when no Erxes client is available, and a failed
    sync is only logged at debug level. Always returns ``None``.
    """
    client = _get_erxes()
    if client:
        try:
            client.sync_stage_change(prospect, new_stage)
        except Exception as exc:
            logger.debug(f"CRM stage sync failed: {exc}")
# Elements that must never reach an outgoing email body.
# The lookahead after the tag name keeps look-alike names such as <formula>
# or <input-x> from being treated as <form>/<input>.
_PAIRED_DANGEROUS_RE = re.compile(
    r'<(script|iframe|object|embed|form|input)(?=[\s/>])[^>]*>.*?</\1\s*>',
    re.IGNORECASE | re.DOTALL,
)
# Opening/closing tags left over after the paired pass: void elements
# (<input>, <embed>), self-closing forms, and unclosed tags (<script src=...>).
_STRAY_DANGEROUS_RE = re.compile(
    r'</?(?:script|iframe|object|embed|form|input)(?=[\s/>])[^>]*>',
    re.IGNORECASE,
)
# Inline event handlers: on*="..." / on*='...' anywhere, and unquoted
# on*=value only when the text up to the next '>' contains no '<' (i.e. it
# looks like it sits inside a tag), so ordinary prose is not eaten.
_EVENT_HANDLER_RE = re.compile(
    r'''\s+on\w+\s*=\s*(?:"[^"]*"|'[^']*'|[^\s"'<>]+(?=[^<>]*>))''',
    re.IGNORECASE,
)


def _sanitize_html(html_body: str) -> str:
    """Strip dangerous HTML (script, iframe, object, embed, form, input) from an email body.

    Removes, in order:
      1. paired dangerous elements together with their content,
      2. any remaining opening/closing tag of a dangerous element,
      3. inline ``on*=`` event-handler attributes (quoted or unquoted).

    This is a regex-based filter, not a full HTML parser; it is meant as a
    safety net for agent-generated email bodies.

    Args:
        html_body: Raw HTML; ``None`` or an empty string is accepted.

    Returns:
        The cleaned HTML, or ``''`` for empty input.
    """
    if not html_body:
        return ''
    cleaned = _PAIRED_DANGEROUS_RE.sub('', html_body)
    cleaned = _STRAY_DANGEROUS_RE.sub('', cleaned)
    cleaned = _EVENT_HANDLER_RE.sub('', cleaned)
    return cleaned
def _send_email(to_email: str, subject: str, html_body: str) -> Dict:
    """Send email via the channel system (user's configured SMTP).

    Tries the email channel adapter first (user's own SMTP config).
    Falls back to the HertzAI mailer service if no email channel is
    configured, its event loop is not running, or the channel send raised.

    Args:
        to_email: Recipient address (a list of addresses is also accepted by
            the mailer-service fallback).
        subject: Subject line.
        html_body: HTML body; it is sanitised before sending.

    Returns:
        A dict with at least ``'success'`` (bool). Channel sends add
        ``'via': 'email_channel'`` and ``'error'``; mailer-service sends add
        ``'via': 'mailer_service'``, ``'status_code'`` and ``'response'``;
        a failed fallback returns ``{'success': False, 'error': ...}``.
    """
    sanitized_body = _sanitize_html(html_body)
    # Strip HTML for the plain-text fallback, then decode entities so the
    # text part reads "R&D" rather than "R&amp;D".
    plain_text = html.unescape(re.sub(r'<[^>]+>', '', sanitized_body)).strip()

    # Tier 1: Email channel adapter (user's SMTP)
    try:
        from integrations.channels.registry import get_registry
        import asyncio
        registry = get_registry()
        adapter = registry.get_adapter('email')
        if adapter and hasattr(adapter, 'send_email'):
            loop = getattr(registry, '_loop', None)
            if loop and loop.is_running():
                # The adapter coroutine must run on the registry's loop; block
                # this (worker) thread until it finishes or times out.
                future = asyncio.run_coroutine_threadsafe(
                    adapter.send_email(
                        to=to_email, subject=subject,
                        body=plain_text, html_body=sanitized_body,
                    ),
                    loop,
                )
                result = future.result(timeout=30)
                return {'success': result.success, 'via': 'email_channel',
                        'error': result.error if not result.success else None}
    except Exception as e:
        logger.debug(f"Email channel not available: {e}")

    # Tier 2: HertzAI mailer service (fallback)
    try:
        from core.http_pool import pooled_post
        resp = pooled_post(
            f'{EMAIL_SERVICE_URL}/sendEmail',
            json={
                'emailList': [to_email] if isinstance(to_email, str) else to_email,
                'subject': subject,
                'message': sanitized_body,
            },
            headers={'Content-Type': 'application/json'},
            timeout=15,
        )
        try:
            body = resp.json()
        except Exception:
            # Non-JSON reply: keep a short excerpt for diagnostics.
            body = {'raw': resp.text[:200]}
        return {'success': resp.status_code < 400, 'via': 'mailer_service',
                'status_code': resp.status_code, 'response': body}
    except Exception as e:
        return {'success': False, 'error': str(e)}
def register_outreach_tools(helper, assistant, user_id: str):
    """Register outreach CRM tools with the agent (Tier 2).

    Defines the seven outreach tools as closures (so they can stamp
    ``user_id`` on the records they create) and registers each of them via
    ``core.labeled_autogen_function.register_labeled_function``. Every tool
    returns a JSON string with a ``success`` flag.

    Args:
        helper: AutoGen helper agent (registers for LLM)
        assistant: AutoGen assistant agent (registers for execution)
        user_id: Current user ID for ownership
    """

    def create_prospect(
        company: Annotated[str, "Company name"],
        contact_name: Annotated[str, "Contact person's name"],
        email: Annotated[str, "Contact email address"],
        title: Annotated[Optional[str], "Contact's job title"] = None,
        vertical: Annotated[Optional[str], "Industry vertical (humanoid|healthcare|industrial|cobot)"] = None,
        notes: Annotated[Optional[str], "Additional notes about the prospect"] = None,
        tier: Annotated[int, "Prospect tier (1=top priority, 2=secondary)"] = 1,
    ) -> str:
        """Create a new prospect in the outreach CRM.

        Stores in local JSON and syncs to Erxes if available.
        """
        data = _load_prospects()
        prospects = data.setdefault('prospects', {})

        # IDs are "<company slug>_<unix seconds>"; two contacts for the same
        # company created within one second would collide and the second
        # would overwrite the first, so disambiguate with a numeric suffix.
        base_id = f"{company.lower().replace(' ', '_')}_{int(time.time())}"
        prospect_id = base_id
        suffix = 1
        while prospect_id in prospects:
            suffix += 1
            prospect_id = f"{base_id}_{suffix}"

        created_at = datetime.utcnow().isoformat()
        prospect = {
            'id': prospect_id,
            'company': company,
            'contact_name': contact_name,
            'email': email,
            'title': title or '',
            'vertical': vertical or 'general',
            'notes': notes or '',
            'tier': tier,
            'stage': 'new',  # new → contacted → replied → meeting → negotiation → won/lost
            'created_at': created_at,
            'updated_at': created_at,
            'created_by': user_id,
            'emails_sent': 0,
            'last_email_at': None,
            'last_reply_at': None,
        }

        prospects[prospect_id] = prospect
        _save_prospects(data)

        # Sync to Erxes CRM (native client); persist the remote IDs if we got any.
        crm_result = _sync_prospect_to_crm(prospect)
        if crm_result.get('synced'):
            prospect['erxes_customer_id'] = crm_result.get('erxes_customer_id')
            prospect['erxes_deal_id'] = crm_result.get('erxes_deal_id')
            prospects[prospect_id] = prospect
            _save_prospects(data)

        return json.dumps({'success': True, 'prospect': prospect, 'crm_sync': crm_result})

    def send_outreach_email(
        prospect_id: Annotated[str, "Prospect ID to send email to"],
        subject: Annotated[str, "Email subject line"],
        body_html: Annotated[str, "Email body in HTML format"],
        sequence_step: Annotated[int, "Sequence step number (1=initial, 2=first follow-up, etc.)"] = 1,
    ) -> str:
        """Send an outreach email to a prospect and log it.

        Uses the HertzAI email service (cortext@hertzai.com).
        Updates prospect stage to 'contacted' on first email.
        'success' is true only if the email was actually delivered.
        """
        data = _load_prospects()
        prospect = data.get('prospects', {}).get(prospect_id)
        if not prospect:
            return json.dumps({'success': False, 'error': f'Prospect {prospect_id} not found'})

        # Send via HertzAI
        result = _send_email(
            to_email=prospect['email'],
            subject=subject,
            html_body=body_html,
        )
        delivered = bool(result.get('success'))
        attempted_at = datetime.utcnow().isoformat()

        # Log every attempt (including failures) so the history is auditable.
        data.setdefault('sent_log', []).append({
            'prospect_id': prospect_id,
            'to': prospect['email'],
            'subject': subject,
            'sequence_step': sequence_step,
            'sent_at': attempted_at,
            'send_result': result,
        })

        # Only a delivered email counts towards the prospect's state; a failed
        # send must not mark the prospect as contacted.
        if delivered:
            prospect['emails_sent'] = prospect.get('emails_sent', 0) + 1
            prospect['last_email_at'] = attempted_at
            if prospect['stage'] == 'new':
                prospect['stage'] = 'contacted'
            prospect['updated_at'] = attempted_at

        _save_prospects(data)

        response = {'success': delivered, 'send_result': result, 'prospect_stage': prospect['stage']}
        if not delivered:
            response['error'] = result.get('error') or 'Email delivery failed'
        return json.dumps(response)

    def create_followup_sequence(
        prospect_id: Annotated[str, "Prospect ID to create sequence for"],
        sequence_name: Annotated[str, "Name for this follow-up sequence"],
        steps: Annotated[str, "JSON array of sequence steps, each with: delay_days, subject, body_html"],
    ) -> str:
        """Create an automated follow-up sequence for a prospect.

        Each step has a delay (days after previous step), subject, and body.
        The agent daemon will check and execute pending steps daily.

        Example steps JSON:
        [
            {"delay_days": 3, "subject": "following up on HARTOS", "body_html": "<p>Hi...</p>"},
            {"delay_days": 5, "subject": "quick update", "body_html": "<p>Hey...</p>"},
            {"delay_days": 7, "subject": "last note", "body_html": "<p>Just wanted...</p>"}
        ]
        """
        data = _load_prospects()
        prospect = data.get('prospects', {}).get(prospect_id)
        if not prospect:
            return json.dumps({'success': False, 'error': f'Prospect {prospect_id} not found'})

        try:
            step_list = json.loads(steps) if isinstance(steps, str) else steps
        except json.JSONDecodeError:
            return json.dumps({'success': False, 'error': 'Invalid JSON for steps'})
        if not isinstance(step_list, list) or not step_list:
            return json.dumps({'success': False, 'error': 'steps must be a non-empty JSON array'})

        sequence_id = f"seq_{prospect_id}_{int(time.time())}"

        # Calculate scheduled dates from now; delays accumulate step to step.
        base_time = datetime.utcnow()
        scheduled_steps = []
        cumulative_days = 0
        for i, step in enumerate(step_list):
            if not isinstance(step, dict):
                return json.dumps({'success': False, 'error': f'Step {i + 1} must be a JSON object'})
            missing = [key for key in ('subject', 'body_html') if not step.get(key)]
            if missing:
                return json.dumps({'success': False,
                                   'error': f'Step {i + 1} is missing required field(s): {missing}'})
            # delay_days is optional (default 3); the same value is used for
            # scheduling and for the stored step.
            delay_days = step.get('delay_days', 3)
            if isinstance(delay_days, bool) or not isinstance(delay_days, (int, float)) or delay_days < 0:
                return json.dumps({'success': False,
                                   'error': f'Step {i + 1} has invalid delay_days: {delay_days!r}'})

            cumulative_days += delay_days
            scheduled_steps.append({
                'step_number': i + 2,  # step 1 was the initial outreach
                'delay_days': delay_days,
                'scheduled_at': (base_time + timedelta(days=cumulative_days)).isoformat(),
                'subject': step['subject'],
                'body_html': step['body_html'],
                'status': 'pending',  # pending → sent → skipped (if replied)
                'sent_at': None,
            })

        sequence = {
            'id': sequence_id,
            'prospect_id': prospect_id,
            'name': sequence_name,
            'steps': scheduled_steps,
            'created_at': base_time.isoformat(),
            'status': 'active',  # active → paused → completed
            'exit_on_reply': True,
        }

        data.setdefault('sequences', {})[sequence_id] = sequence
        _save_prospects(data)

        return json.dumps({'success': True, 'sequence': sequence})

    def check_pending_followups() -> str:
        """Check all active sequences for follow-ups that are due.

        Sends emails for due steps, skips sequences where the prospect has replied.
        Returns summary of actions taken.
        """
        return _check_pending_followups_impl()

    def move_prospect_stage(
        prospect_id: Annotated[str, "Prospect ID"],
        new_stage: Annotated[str, "New stage: new|contacted|replied|meeting|negotiation|won|lost"],
        notes: Annotated[Optional[str], "Notes about the stage change"] = None,
    ) -> str:
        """Move a prospect to a new pipeline stage."""
        valid_stages = ['new', 'contacted', 'replied', 'meeting', 'negotiation', 'won', 'lost']
        if new_stage not in valid_stages:
            return json.dumps({'success': False, 'error': f'Invalid stage. Use: {valid_stages}'})

        data = _load_prospects()
        prospect = data.get('prospects', {}).get(prospect_id)
        if not prospect:
            return json.dumps({'success': False, 'error': f'Prospect {prospect_id} not found'})

        changed_at = datetime.utcnow().isoformat()
        old_stage = prospect['stage']
        prospect['stage'] = new_stage
        prospect['updated_at'] = changed_at
        if notes:
            # Append a timestamped audit line; tolerate a stored null note.
            prospect['notes'] = (prospect.get('notes') or '') + f'\n[{changed_at}] {old_stage}→{new_stage}: {notes}'

        # If prospect replied, mark it (this is what stops follow-up sequences)
        if new_stage == 'replied':
            prospect['last_reply_at'] = changed_at

        _save_prospects(data)

        # Sync stage change to Erxes CRM
        _sync_stage_change(prospect, new_stage)

        return json.dumps({'success': True, 'prospect': prospect, 'transition': f'{old_stage}→{new_stage}'})

    def get_pipeline_status() -> str:
        """Get the full outreach pipeline status — all prospects grouped by stage.

        Merges local prospect data with Erxes CRM pipeline view.
        """
        data = _load_prospects()
        prospects = data.get('prospects', {})

        pipeline = {}
        for pid, prospect in prospects.items():
            pipeline.setdefault(prospect.get('stage', 'new'), []).append({
                'id': pid,
                'company': prospect['company'],
                'contact': prospect['contact_name'],
                'email': prospect['email'],
                'emails_sent': prospect.get('emails_sent', 0),
                'last_email': prospect.get('last_email_at'),
                'tier': prospect.get('tier', 1),
                'erxes_deal_id': prospect.get('erxes_deal_id'),
            })

        # Count active sequences
        active_sequences = sum(
            1 for s in data.get('sequences', {}).values()
            if s.get('status') == 'active'
        )

        # Include Erxes CRM status if available
        erxes_status = None
        erxes = _get_erxes()
        if erxes:
            try:
                erxes_status = erxes.get_pipeline_status()
            except Exception as e:
                erxes_status = {'error': str(e)}

        return json.dumps({
            'success': True,
            'pipeline': pipeline,
            'total_prospects': len(prospects),
            'active_sequences': active_sequences,
            'erxes_pipeline': erxes_status,
        })

    def list_sent_emails(
        prospect_id: Annotated[Optional[str], "Filter by prospect ID (optional)"] = None,
        limit: Annotated[int, "Max emails to return"] = 20,
    ) -> str:
        """List sent outreach emails, optionally filtered by prospect."""
        data = _load_prospects()
        log = data.get('sent_log', [])

        if prospect_id:
            log = [e for e in log if e.get('prospect_id') == prospect_id]

        # Most recent first. A negative limit would slice from the wrong end,
        # so treat it as zero.
        log = sorted(log, key=lambda x: x.get('sent_at', ''), reverse=True)[:max(limit, 0)]
        return json.dumps({'success': True, 'emails': log, 'total': len(log)})

    # ── Register all tools ──
    # Routed through core.labeled_autogen_function so the UI spinner
    # shows a tool-specific label (publish_chat_stage('tool_call', …))
    # for autogen-invoked tools too. Same shape as LangChain's
    # _with_tool_logging chokepoint. UI labels live in
    # core/constants.py:TOOL_LABELS (single source of truth).
    from core.labeled_autogen_function import register_labeled_function
    from core.constants import TOOL_LABELS

    tools = [
        create_prospect,
        send_outreach_email,
        create_followup_sequence,
        check_pending_followups,
        move_prospect_stage,
        get_pipeline_status,
        list_sent_emails,
    ]
    for func in tools:
        register_labeled_function(
            func,
            caller=helper,
            executor=assistant,
            description=func.__doc__,
            ui_label=TOOL_LABELS.get(func.__name__, f'Running {func.__name__}…'),
        )

    logger.info(f"Registered {len(tools)} outreach CRM tools for user {user_id}")
# A follow-up step whose delivery keeps failing is retried on later checks,
# but only this many times before it is marked 'failed'.
_MAX_FOLLOWUP_ATTEMPTS = 3


def _check_pending_followups_impl() -> str:
    """Shared logic for checking pending follow-ups.

    Locking: the store is read and written through ``_load_prospects`` /
    ``_save_prospects``, which take ``_prospect_lock`` themselves. The lock
    is not reentrant, so the caller must NOT hold it when calling this.

    For every active sequence:
      * if the prospect has replied (and ``exit_on_reply`` is set) the
        sequence is completed without sending anything;
      * otherwise at most one due pending step is sent per call;
      * a step is marked ``'sent'`` only when delivery succeeded. A failed
        delivery leaves the step pending (with ``attempts`` / ``last_error``
        recorded) until ``_MAX_FOLLOWUP_ATTEMPTS`` is reached, after which
        the step is marked ``'failed'``;
      * the sequence is completed once no step is pending.

    Returns:
        JSON string ``{'success': True, 'actions': [...], 'checked_at': ...}``.
        Action kinds are ``'sequence_completed'``, ``'followup_sent'`` and
        ``'followup_failed'``. Used by both the tool-level
        check_pending_followups and the daemon-level
        check_pending_followups_daemon.
    """
    data = _load_prospects()
    now = datetime.utcnow()
    now_iso = now.isoformat()
    actions_taken = []

    for seq_id, sequence in data.get('sequences', {}).items():
        if sequence['status'] != 'active':
            continue

        prospect = data['prospects'].get(sequence['prospect_id'])
        if not prospect:
            continue

        # Exit condition: prospect already replied
        if sequence.get('exit_on_reply') and prospect.get('last_reply_at'):
            sequence['status'] = 'completed'
            actions_taken.append({
                'action': 'sequence_completed',
                'reason': 'prospect_replied',
                'prospect': prospect['company'],
            })
            continue

        # Check each pending step
        for step in sequence['steps']:
            if step['status'] != 'pending':
                continue

            scheduled = datetime.fromisoformat(step['scheduled_at'])
            if now < scheduled:
                continue

            result = _send_email(
                to_email=prospect['email'],
                subject=step['subject'],
                html_body=step['body_html'],
            )

            if result.get('success'):
                step['status'] = 'sent'
                step['sent_at'] = now_iso
                prospect['emails_sent'] = prospect.get('emails_sent', 0) + 1
                prospect['last_email_at'] = now_iso
                action = 'followup_sent'
            else:
                # Do not pretend the email went out: keep the step pending so
                # a later check retries it, up to the attempt limit.
                step['attempts'] = step.get('attempts', 0) + 1
                step['last_error'] = str(result.get('error') or result.get('status_code') or 'unknown')
                if step['attempts'] >= _MAX_FOLLOWUP_ATTEMPTS:
                    step['status'] = 'failed'
                action = 'followup_failed'
                logger.warning(
                    "Follow-up delivery failed for %s step %s (attempt %d): %s",
                    prospect.get('company'), step.get('step_number'),
                    step['attempts'], step['last_error'],
                )

            actions_taken.append({
                'action': action,
                'prospect': prospect['company'],
                'step': step['step_number'],
                'subject': step['subject'],
                'result': result,
            })
            break  # Only send one step per check cycle

        # Check if all steps are done
        if all(s['status'] != 'pending' for s in sequence['steps']):
            sequence['status'] = 'completed'

    _save_prospects(data)
    return json.dumps({'success': True, 'actions': actions_taken, 'checked_at': now_iso})
533# ── Goal Type Registration ──
def build_outreach_prompt(goal_dict: Dict, product_dict: Dict = None) -> str:
    """Render the system prompt for an outreach goal.

    The prompt combines the fixed HARTOS product pitch and outreach rules
    with the goal's title/description/config and a one-line-per-prospect
    snapshot of the current local pipeline. It is sent to /chat, which feeds
    the CREATE/REUSE pipeline.

    Args:
        goal_dict: Goal record; ``title``, ``description`` and ``config`` are
            read, each with a default.
        product_dict: Accepted for interface compatibility; not used here.

    Returns:
        The complete prompt text.
    """
    title = goal_dict.get('title', 'Sales Outreach')
    description = goal_dict.get('description', '')
    config = goal_dict.get('config', {})

    store = _load_prospects()
    prospects_summary = ''
    if store['prospects']:
        prospects_summary = '\n'.join(
            f"- {p['company']} ({p['contact_name']}, {p['email']}) — stage: {p['stage']}, emails sent: {p.get('emails_sent', 0)}"
            for p in store['prospects'].values()
        )
    pipeline_text = prospects_summary or '(No prospects yet — use create_prospect to add them)'

    return f"""You are a B2B sales outreach agent for HevolveAI.

PRODUCT: HARTOS (Hevolve Hive Agentic Runtime OS)
- On-device AI runtime for robotics: LLM inference, vision, speech, semantic memory, multi-agent orchestration
- Democratically evolving Hive network (not controlled by any single LLM vendor)
- Flywheel: more robots deployed → more data → better model → more developers → more robots
- Early partners shape how the intelligence evolves for their vertical

GOAL: {title}
{description}

CURRENT PIPELINE:
{pipeline_text}

RULES:
1. Sound human. No em dashes. Casual lowercase subject lines. Short sentences.
2. Create FOMO: one partner per vertical, Q2 deadline, name competitors who locked into Big Tech
3. Follow-up sequence: day 3, day 7, day 14 — each shorter and more direct
4. If a prospect replies, move them to 'replied' stage and stop the sequence
5. Use the outreach CRM tools to track everything

CONFIG: {json.dumps(config)}
"""
def register_outreach_goal_type():
    """Make 'outreach' available as a goal type in the agent engine.

    Registers the prompt builder and tool tags with the goal manager, then
    hooks reply detection into the email channel. Intended to be called once
    during HARTOS boot to enable outreach automation.
    """
    from integrations.agent_engine.goal_manager import register_goal_type

    registration = {
        'goal_type': 'outreach',
        'build_prompt': build_outreach_prompt,
        'tool_tags': ['outreach', 'email', 'crm'],
    }
    register_goal_type(**registration)
    logger.info("Registered 'outreach' goal type with prompt builder and tool tags")

    # Wire reply detection into email channel
    register_reply_handler()
def check_pending_followups_daemon() -> dict:
    """Run one follow-up check on behalf of agent_daemon (called on each tick).

    Delegates to ``_check_pending_followups_impl`` (the same logic the
    check_pending_followups tool uses), logs every sent follow-up and every
    completed sequence, and condenses the outcome.

    Returns:
        ``{'sent': <number of follow-ups sent>, 'checked_at': <timestamp or ''>}``.
    """
    outcome = json.loads(_check_pending_followups_impl())

    sent_count = 0
    for entry in outcome.get('actions', []):
        kind = entry.get('action')
        if kind == 'followup_sent':
            sent_count += 1
            logger.info(f"Follow-up sent: {entry.get('prospect')} step {entry.get('step')} subject='{entry.get('subject')}'")
        elif kind == 'sequence_completed':
            logger.info(f"Sequence completed: {entry.get('prospect')} ({entry.get('reason')})")

    return {'sent': sent_count, 'checked_at': outcome.get('checked_at', '')}
610# ═══════════════════════════════════════════════════════════════
611# Inbound Reply Handler — wired into email channel adapter
612# ═══════════════════════════════════════════════════════════════
def handle_inbound_email(sender_email: str, subject: str, body: str, message_id: str = '') -> Optional[Dict]:
    """Check if an inbound email matches a prospect. If so, update CRM and trigger response flow.

    Called by the email channel adapter's inbound hook.

    On a match this:
      * records the reply time and appends a note to the prospect,
      * advances the stage to 'replied' only when the prospect has not yet
        progressed past it (a prospect already in meeting/negotiation/won/
        lost keeps its stage),
      * completes every active follow-up sequence for the prospect,
      * persists the store, syncs an actual stage change to Erxes,
      * notifies the user and dispatches an agent goal to draft a response.

    Args:
        sender_email: Sender address; both ``a@b.com`` and
            ``Name <a@b.com>`` forms are accepted.
        subject: Subject of the inbound email (``None`` is treated as '').
        body: Body text of the inbound email (``None`` is treated as '').
        message_id: Optional message identifier, passed through in the context.

    Returns:
        The context dict handed to the response-draft agent if the sender is
        a known prospect, ``None`` otherwise (normal email flow).
    """
    from email.utils import parseaddr  # stdlib; only needed here

    subject = subject or ''
    body = body or ''
    raw_sender = (sender_email or '').strip()
    sender_address = (parseaddr(raw_sender)[1] or raw_sender).strip().lower()
    if not sender_address:
        # An empty sender must never match a prospect stored without an email.
        return None

    data = _load_prospects()

    # Match sender to any prospect by email (first match wins)
    matched_prospect = next(
        (p for p in data.get('prospects', {}).values()
         if (p.get('email') or '').strip().lower() == sender_address),
        None,
    )
    if not matched_prospect:
        return None  # Not a prospect — normal email flow

    # ── Update prospect state ──
    received_at = datetime.utcnow().isoformat()
    # Only move forward to 'replied'; never pull a prospect back from a
    # later pipeline stage just because they sent another email.
    stage_advanced = matched_prospect.get('stage', 'new') in ('new', 'contacted')
    if stage_advanced:
        matched_prospect['stage'] = 'replied'
    matched_prospect['last_reply_at'] = received_at
    matched_prospect['updated_at'] = received_at
    matched_prospect['notes'] = (matched_prospect.get('notes') or '') + (
        f"\n[{received_at}] REPLY received: {subject}"
    )

    # ── Stop all active sequences for this prospect ──
    for seq_id, sequence in data.get('sequences', {}).items():
        if sequence.get('prospect_id') == matched_prospect['id'] and sequence['status'] == 'active':
            sequence['status'] = 'completed'
            logger.info(f"Sequence {seq_id} auto-completed: prospect {matched_prospect['company']} replied")

    _save_prospects(data)

    # ── Sync stage change to Erxes CRM (only if the stage really changed) ──
    if stage_advanced:
        _sync_stage_change(matched_prospect, 'replied')

    # ── Build context for the agent to draft a response ──
    # Gather the conversation history, oldest first
    sent_emails = sorted(
        (e for e in data.get('sent_log', []) if e.get('prospect_id') == matched_prospect['id']),
        key=lambda x: x.get('sent_at', ''),
    )

    context = {
        'prospect': matched_prospect,
        'their_reply': {'subject': subject, 'body': body[:2000], 'message_id': message_id},
        'our_emails': [
            {'subject': e.get('subject', ''), 'sent_at': e.get('sent_at', ''), 'step': e.get('sequence_step', 1)}
            for e in sent_emails[-5:]  # Last 5 emails we sent
        ],
        'campaign_stage': matched_prospect.get('stage', 'replied'),
        'total_emails_sent': matched_prospect.get('emails_sent', 0),
    }

    # ── Push notification to user ──
    _notify_prospect_replied(matched_prospect, subject, body, context)

    # ── Dispatch agent to draft response ──
    _dispatch_response_draft(matched_prospect, context)

    logger.info(
        f"Prospect reply detected: {matched_prospect['company']} ({sender_address}) "
        f"subject='{subject}' — stage: {matched_prospect.get('stage')}, sequences stopped"
    )
    return context
def _notify_prospect_replied(prospect: Dict, subject: str, body: str, context: Dict):
    """Tell the user that a prospect answered an outreach email.

    Two independent, best-effort channels are tried; a failure in either one
    is logged at debug level and never raised:
      1. an ``outreach.prospect_replied`` event on the platform EventBus,
      2. a direct push through the channel response router (fan-out).

    ``context`` is accepted for interface symmetry but not read here.
    """
    # Channel 1: EventBus
    try:
        from core.platform.events import emit_event
        event_payload = dict(
            prospect_id=prospect['id'],
            company=prospect['company'],
            contact=prospect['contact_name'],
            email=prospect['email'],
            subject=subject,
            body_preview=body[:200],
            stage=prospect['stage'],
            emails_sent=prospect.get('emails_sent', 0),
        )
        emit_event('outreach.prospect_replied', event_payload)
    except Exception as exc:
        logger.debug(f"EventBus notification failed: {exc}")

    # Channel 2: direct Nunba push via channel system
    try:
        from integrations.channels.agent_tools import _get_user_id_from_threadlocal
        from integrations.channels.response.router import get_response_router
        # Prefer the prospect's owner; fall back to the thread-local user.
        recipient = prospect.get('created_by') or _get_user_id_from_threadlocal()
        response_router = get_response_router()
        message_lines = [
            f"📬 {prospect['company']} replied to your outreach!",
            f"From: {prospect['contact_name']} ({prospect['email']})",
            f"Subject: {subject}",
            f"Preview: {body[:150]}...",
        ]
        response_router.route_response(
            user_id=recipient,
            response_text='\n'.join(message_lines),
            fan_out=True,
        )
    except Exception as exc:
        logger.debug(f"Push notification failed: {exc}")
def _dispatch_response_draft(prospect: Dict, context: Dict):
    """Queue an agent goal that drafts a reply to the prospect's email.

    Builds a prompt from the prospect record, the prospect's reply and the
    summary of our previous emails held in ``context``, then hands it to the
    dispatch system as an 'outreach' goal. The prompt instructs the agent to
    present a draft for approval rather than send it. Any failure is logged
    as an error and swallowed.
    """
    try:
        from integrations.agent_engine.dispatch import dispatch_goal
        owner_id = prospect.get('created_by', 'system')

        parts = [
            "A prospect has replied to our outreach. Draft a response.\n\n",
            f"PROSPECT: {prospect['company']} — {prospect['contact_name']} ({prospect['email']})\n",
            f"STAGE: {prospect.get('stage', 'replied')}\n",
            f"THEIR REPLY:\nSubject: {context['their_reply']['subject']}\n",
            f"Body: {context['their_reply']['body']}\n\n",
            f"OUR PREVIOUS EMAILS ({len(context['our_emails'])} total):\n",
        ]
        for sent in context['our_emails']:
            parts.append(f" - Step {sent['step']}: \"{sent['subject']}\" (sent {sent['sent_at']})\n")
        parts.extend([
            "\nRULES:\n",
            "1. Be conversational, not salesy. They replied — that's interest.\n",
            "2. Reference what they said specifically.\n",
            "3. Propose a concrete next step (call, demo, meeting link).\n",
            "4. Keep it short — 3-5 sentences max.\n",
            "5. Do NOT send the email automatically — present the draft for user approval.\n",
        ])
        prompt = ''.join(parts)

        dispatch_goal(
            prompt=prompt,
            user_id=owner_id,
            goal_id=f"outreach_reply_{prospect['id']}_{int(time.time())}",
            goal_type='outreach',
        )
    except Exception as exc:
        logger.error(f"Failed to dispatch response draft: {exc}")
def register_reply_handler():
    """Register the inbound email handler with the email channel adapter.

    Called during boot (from register_outreach_goal_type) to wire
    reply detection into the channel system. If there is no email adapter,
    or it has no ``on_message`` hook, nothing is registered. Failures while
    registering are logged at debug level and never raised.

    The registered hook is defensive: a message with missing/``None``
    fields is tolerated, and an error inside outreach processing is logged
    instead of propagating into the adapter's inbound pipeline.
    """
    try:
        from integrations.channels.registry import get_registry
        adapter = get_registry().get_adapter('email')
        if not (adapter and hasattr(adapter, 'on_message')):
            logger.debug("No email adapter with on_message; outreach reply handler not registered")
            return

        async def _outreach_reply_hook(message):
            """Post-process inbound emails for prospect matching."""
            try:
                sender = getattr(message, 'sender_id', '') or ''
                if '@' not in sender:
                    return  # not an email address — nothing to match
                # metadata may be absent or explicitly None on some messages
                metadata = getattr(message, 'metadata', None) or {}
                subject = metadata.get('subject', '') or ''
                body = getattr(message, 'text', '') or ''
                msg_id = getattr(message, 'message_id', '') or ''
                handle_inbound_email(sender, subject, body, msg_id)
            except Exception:
                # Outreach bookkeeping must never break normal email handling.
                logger.exception("Outreach reply hook failed")

        adapter.on_message(_outreach_reply_hook)
        logger.info("Outreach reply handler registered with email adapter")
    except Exception as e:
        logger.debug(f"Could not register reply handler: {e}")