Coverage for integrations / agent_engine / outreach_crm_tools.py: 12.9%

294 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Unified Agent Goal Engine - Outreach CRM Tools (Tier 2) 

3 

4These tools are loaded ONLY when the agent is working on an 'outreach' goal. 

5They connect HARTOS agents to Erxes CRM for: 

6 - Lead/contact management 

7 - Deal pipeline tracking 

8 - Automated email outreach via cortext@hertzai.com 

9 - Follow-up sequence automation 

10 - Reply detection and pipeline stage updates 

11 

12Integrates with: 

13 - Erxes GraphQL API (http://192.168.0.9:3300) 

14 - HertzAI email service (http://192.168.0.9:4000/sendEmail) 

15 

16Tier 1 (Default): google_search, text_2_image, delegate_to_specialist, etc. 

17Tier 2 (Category): create_lead, send_outreach, check_replies, move_deal_stage 

18Tier 3 (Runtime): delegate_to_specialist for channel-specific follow-ups 

19""" 

20import html 

21import json 

22import logging 

23import os 

24import re 

25import threading 

26import time 

27from datetime import datetime, timedelta 

28from typing import Annotated, Optional, Dict, List 

29 

# Module logger; the name is fixed (not __name__) so log routing does not
# depend on the import path of this module.
logger = logging.getLogger('hevolve_outreach')

# ── Configuration ──
# Service base URLs. Each is read from the environment variable of the same
# name and falls back to a localhost default when the variable is unset.
ERXES_API_URL = os.environ.get('ERXES_API_URL', 'http://localhost:3300')
EMAIL_SERVICE_URL = os.environ.get('EMAIL_SERVICE_URL', 'http://localhost:4000')

# ── Thread lock for prospect file access (daemon + request thread) ──
# NOTE: threading.Lock is not reentrant — code that already holds this lock
# must not call _load_prospects/_save_prospects, which acquire it themselves.
_prospect_lock = threading.Lock()

# ── Local prospect store (file-backed, works without Erxes) ──
# Resolved relative to this file: <this dir>/../../agent_data/outreach_prospects.json
_PROSPECT_FILE = os.path.join(
    os.path.dirname(os.path.abspath(__file__)),
    '..', '..', 'agent_data', 'outreach_prospects.json'
)

44 

45 

def _load_prospects() -> Dict:
    """Load prospect data from the local JSON store. Thread-safe.

    Returns:
        A dict that always contains the three top-level keys the rest of
        this module indexes directly: 'prospects' (dict), 'sequences'
        (dict) and 'sent_log' (list). A missing or unparseable file, or a
        file whose top-level JSON value is not an object, yields an empty
        store.
    """
    with _prospect_lock:
        try:
            with open(_PROSPECT_FILE, 'r') as f:
                data = json.load(f)
        except (FileNotFoundError, json.JSONDecodeError):
            data = {}

    # A hand-edited or older file may be valid JSON but lack some keys (or
    # not be an object at all); normalise so callers can do data['prospects'].
    if not isinstance(data, dict):
        data = {}
    data.setdefault('prospects', {})
    data.setdefault('sequences', {})
    data.setdefault('sent_log', [])
    return data

54 

55 

def _save_prospects(data: Dict):
    """Save prospect data to the local JSON store. Thread-safe.

    Safety:
      * Refuses to overwrite a store that has prospects with data that has
        none (logs a warning and returns without writing).
      * Writes to a sibling temp file and swaps it in with os.replace, so an
        interrupted write cannot leave a truncated JSON file behind (which
        the loader would otherwise treat as an empty store).

    Args:
        data: Full store dict ('prospects', 'sequences', 'sent_log').
    """
    with _prospect_lock:
        # Guard: never overwrite existing data with empty
        if not data.get('prospects', {}):
            try:
                with open(_PROSPECT_FILE, 'r') as f:
                    existing = json.load(f)
                if isinstance(existing, dict) and existing.get('prospects'):
                    logger.warning('_save_prospects: refusing to overwrite %d prospects with empty data',
                                   len(existing['prospects']))
                    return
            except (FileNotFoundError, json.JSONDecodeError):
                # Nothing usable on disk, so there is nothing to protect.
                pass

        os.makedirs(os.path.dirname(_PROSPECT_FILE), exist_ok=True)
        tmp_path = f'{_PROSPECT_FILE}.tmp'
        try:
            with open(tmp_path, 'w') as f:
                json.dump(data, f, indent=2, default=str)
            # Atomic swap: readers see either the old file or the new one.
            os.replace(tmp_path, _PROSPECT_FILE)
        finally:
            # Only present if the dump or the swap failed part-way.
            if os.path.exists(tmp_path):
                os.remove(tmp_path)

77 

78 

def _get_erxes():
    """Return the native Erxes CRM client, or None when it cannot be obtained.

    The import is done lazily inside the function; any exception raised by
    the import or by get_erxes_client() is logged at debug level and turned
    into a None result.
    """
    client = None
    try:
        from integrations.agent_engine.erxes_client import get_erxes_client
        client = get_erxes_client()
    except Exception as e:
        logger.debug(f"Erxes client unavailable: {e}")
        client = None
    return client

87 

88 

def _sync_prospect_to_crm(prospect: Dict) -> Dict:
    """Push one prospect to Erxes and return the client's sync result.

    Returns {'synced': False, 'reason': 'erxes_unavailable'} when no client
    is available, and {'synced': False, 'error': <message>} when the client
    call raises. Otherwise the client's own result dict is returned as-is.
    """
    client = _get_erxes()
    if not client:
        return {'synced': False, 'reason': 'erxes_unavailable'}

    try:
        outcome = client.sync_prospect_to_erxes(prospect)
    except Exception as e:
        logger.debug(f"CRM sync failed: {e}")
        return {'synced': False, 'error': str(e)}

    try:
        if outcome.get('synced'):
            logger.info(f"CRM synced: {prospect.get('company')} "
                        f"(customer={outcome.get('erxes_customer_id')}, "
                        f"deal={outcome.get('erxes_deal_id')})")
    except Exception as e:
        # Same handling as a failed client call: report, do not raise.
        logger.debug(f"CRM sync failed: {e}")
        return {'synced': False, 'error': str(e)}
    return outcome

104 

105 

def _sync_stage_change(prospect: Dict, new_stage: str):
    """Forward a pipeline stage change to Erxes (best effort).

    Does nothing when no Erxes client is available. A failure in the client
    call is logged at debug level and never raised to the caller.
    """
    client = _get_erxes()
    if client:
        try:
            client.sync_stage_change(prospect, new_stage)
        except Exception as e:
            logger.debug(f"CRM stage sync failed: {e}")

115 

116 

def _sanitize_html(html_body: str) -> str:
    """Strip dangerous HTML tags and inline event handlers from an email body.

    Three passes:
      1. Remove script/iframe/object/embed/form/input elements that have a
         closing tag, together with their content.
      2. Remove any remaining opening, closing or void tag of those kinds
         (e.g. ``<input ...>``, ``<embed ...>`` or an unclosed ``<script>``),
         which the paired pattern alone cannot match.
      3. Remove ``on*=`` event-handler attributes, whether the value is
         double-quoted, single-quoted or unquoted.

    This is a regex-based best-effort filter, not a full HTML sanitizer.

    Args:
        html_body: Raw HTML; None or empty is accepted.

    Returns:
        The filtered HTML, or '' for empty input.
    """
    if not html_body:
        return ''
    tags = r'(?:script|iframe|object|embed|form|input)'
    # \b keeps look-alike tags such as <formula> from being treated as <form>.
    cleaned = re.sub(
        r'<(' + tags[3:-1] + r')\b[^>]*>.*?</\1\s*>',
        '', html_body, flags=re.IGNORECASE | re.DOTALL,
    )
    cleaned = re.sub(r'</?' + tags + r'\b[^>]*>', '', cleaned, flags=re.IGNORECASE)
    # Match each quoting style separately so a value like "alert('x')" is
    # removed whole instead of being cut at the first inner quote.
    cleaned = re.sub(
        r'\s+on\w+\s*=\s*(?:"[^"]*"|\'[^\']*\'|[^\s>]+)',
        '', cleaned, flags=re.IGNORECASE,
    )
    return cleaned

124 

125 

def _send_email(to_email: str, subject: str, html_body: str) -> Dict:
    """Send an email, preferring the email channel adapter over the mailer service.

    The body is passed through _sanitize_html first; a tag-stripped copy is
    used as the plain-text alternative for the channel adapter.

    Tier 1: the 'email' adapter from the channel registry, used only when it
    exposes send_email and the registry's event loop is running. Any
    exception on this path is logged at debug level and falls through.
    Tier 2: POST to ``{EMAIL_SERVICE_URL}/sendEmail``.

    Returns:
        A dict with 'success' plus either 'via'/'error' (tier 1),
        'via'/'status_code'/'response' (tier 2), or just 'error' when the
        tier-2 request itself raised.
    """
    safe_html = _sanitize_html(html_body)
    # Plain-text fallback: drop every remaining tag.
    text_only = re.sub(r'<[^>]+>', '', safe_html).strip()

    # Tier 1: Email channel adapter (user's SMTP)
    try:
        from integrations.channels.registry import get_registry
        import asyncio
        channel_registry = get_registry()
        email_adapter = channel_registry.get_adapter('email')
        adapter_usable = email_adapter and hasattr(email_adapter, 'send_email')
        event_loop = getattr(channel_registry, '_loop', None) if adapter_usable else None
        if event_loop and event_loop.is_running():
            # The adapter is async; hand the coroutine to the registry's loop
            # and block this thread for at most 30 seconds.
            pending = asyncio.run_coroutine_threadsafe(
                email_adapter.send_email(
                    to=to_email, subject=subject,
                    body=text_only, html_body=safe_html,
                ),
                event_loop,
            )
            outcome = pending.result(timeout=30)
            return {
                'success': outcome.success,
                'via': 'email_channel',
                'error': None if outcome.success else outcome.error,
            }
    except Exception as e:
        logger.debug(f"Email channel not available: {e}")

    # Tier 2: HertzAI mailer service (fallback)
    try:
        from core.http_pool import pooled_post
        recipients = [to_email] if isinstance(to_email, str) else to_email
        resp = pooled_post(
            f'{EMAIL_SERVICE_URL}/sendEmail',
            json={
                'emailList': recipients,
                'subject': subject,
                'message': safe_html,
            },
            headers={'Content-Type': 'application/json'},
            timeout=15,
        )
        try:
            payload = resp.json()
        except Exception:
            # Non-JSON reply: keep a short excerpt for diagnostics.
            payload = {'raw': resp.text[:200]}
        return {
            'success': resp.status_code < 400,
            'via': 'mailer_service',
            'status_code': resp.status_code,
            'response': payload,
        }
    except Exception as e:
        return {'success': False, 'error': str(e)}

179 

180 

def register_outreach_tools(helper, assistant, user_id: str):
    """Register outreach CRM tools with the agent (Tier 2).

    Defines the seven tool functions as closures (so they can stamp
    ``user_id`` on created prospects) and registers each one through
    ``register_labeled_function``. Each tool returns a JSON string with a
    'success' flag. The tool docstrings are passed on as the tool
    descriptions, so they are part of runtime behaviour.

    Args:
        helper: AutoGen helper agent (registers for LLM)
        assistant: AutoGen assistant agent (registers for execution)
        user_id: Current user ID for ownership
    """

    def create_prospect(
        company: Annotated[str, "Company name"],
        contact_name: Annotated[str, "Contact person's name"],
        email: Annotated[str, "Contact email address"],
        title: Annotated[Optional[str], "Contact's job title"] = None,
        vertical: Annotated[Optional[str], "Industry vertical (humanoid|healthcare|industrial|cobot)"] = None,
        notes: Annotated[Optional[str], "Additional notes about the prospect"] = None,
        tier: Annotated[int, "Prospect tier (1=top priority, 2=secondary)"] = 1,
    ) -> str:
        """Create a new prospect in the outreach CRM.

        Stores in local JSON and syncs to Erxes if available.
        """
        data = _load_prospects()
        prospect_id = f"{company.lower().replace(' ', '_')}_{int(time.time())}"
        now_iso = datetime.utcnow().isoformat()

        prospect = {
            'id': prospect_id,
            'company': company,
            'contact_name': contact_name,
            'email': email,
            'title': title or '',
            'vertical': vertical or 'general',
            'notes': notes or '',
            'tier': tier,
            'stage': 'new',  # new → contacted → replied → meeting → negotiation → won/lost
            'created_at': now_iso,
            'updated_at': now_iso,
            'created_by': user_id,
            'emails_sent': 0,
            'last_email_at': None,
            'last_reply_at': None,
        }

        # Persist locally first so the prospect exists even if the CRM is down.
        data['prospects'][prospect_id] = prospect
        _save_prospects(data)

        # Sync to Erxes CRM (native client)
        crm_result = _sync_prospect_to_crm(prospect)
        if crm_result.get('synced'):
            prospect['erxes_customer_id'] = crm_result.get('erxes_customer_id')
            prospect['erxes_deal_id'] = crm_result.get('erxes_deal_id')
            data['prospects'][prospect_id] = prospect
            _save_prospects(data)

        return json.dumps({'success': True, 'prospect': prospect, 'crm_sync': crm_result})

    def send_outreach_email(
        prospect_id: Annotated[str, "Prospect ID to send email to"],
        subject: Annotated[str, "Email subject line"],
        body_html: Annotated[str, "Email body in HTML format"],
        sequence_step: Annotated[int, "Sequence step number (1=initial, 2=first follow-up, etc.)"] = 1,
    ) -> str:
        """Send an outreach email to a prospect and log it.

        Uses the HertzAI email service (cortext@hertzai.com).
        Updates prospect stage to 'contacted' on first email.
        """
        data = _load_prospects()
        prospect = data['prospects'].get(prospect_id)
        if not prospect:
            return json.dumps({'success': False, 'error': f'Prospect {prospect_id} not found'})

        result = _send_email(
            to_email=prospect['email'],
            subject=subject,
            html_body=body_html,
        )
        delivered = bool(result.get('success'))
        now_iso = datetime.utcnow().isoformat()

        # Every attempt is logged, including failures, for auditability.
        data['sent_log'].append({
            'prospect_id': prospect_id,
            'to': prospect['email'],
            'subject': subject,
            'sequence_step': sequence_step,
            'sent_at': now_iso,
            'send_result': result,
        })

        # Only a successful send counts as contact: a failed attempt must not
        # bump the counter or move the prospect out of 'new'.
        if delivered:
            prospect['emails_sent'] = prospect.get('emails_sent', 0) + 1
            prospect['last_email_at'] = now_iso
            if prospect['stage'] == 'new':
                prospect['stage'] = 'contacted'
            prospect['updated_at'] = now_iso

        _save_prospects(data)
        return json.dumps({'success': delivered, 'send_result': result, 'prospect_stage': prospect['stage']})

    def create_followup_sequence(
        prospect_id: Annotated[str, "Prospect ID to create sequence for"],
        sequence_name: Annotated[str, "Name for this follow-up sequence"],
        steps: Annotated[str, "JSON array of sequence steps, each with: delay_days, subject, body_html"],
    ) -> str:
        """Create an automated follow-up sequence for a prospect.

        Each step has a delay (days after previous step), subject, and body.
        The agent daemon will check and execute pending steps daily.

        Example steps JSON:
        [
            {"delay_days": 3, "subject": "following up on HARTOS", "body_html": "<p>Hi...</p>"},
            {"delay_days": 5, "subject": "quick update", "body_html": "<p>Hey...</p>"},
            {"delay_days": 7, "subject": "last note", "body_html": "<p>Just wanted...</p>"}
        ]
        """
        data = _load_prospects()
        prospect = data['prospects'].get(prospect_id)
        if not prospect:
            return json.dumps({'success': False, 'error': f'Prospect {prospect_id} not found'})

        try:
            step_list = json.loads(steps) if isinstance(steps, str) else steps
        except json.JSONDecodeError:
            return json.dumps({'success': False, 'error': 'Invalid JSON for steps'})
        if not isinstance(step_list, list):
            return json.dumps({'success': False, 'error': 'steps must be a JSON array'})

        sequence_id = f"seq_{prospect_id}_{int(time.time())}"

        # Calculate scheduled dates from now; delays accumulate step to step.
        base_time = datetime.utcnow()
        scheduled_steps = []
        cumulative_days = 0
        for i, step in enumerate(step_list):
            if not isinstance(step, dict) or 'subject' not in step or 'body_html' not in step:
                return json.dumps({
                    'success': False,
                    'error': f'Step {i + 1} must be an object with subject and body_html',
                })
            # delay_days is optional (default 3); read it once so the stored
            # value and the schedule always agree.
            delay_days = step.get('delay_days', 3)
            if isinstance(delay_days, bool) or not isinstance(delay_days, (int, float)) or delay_days < 0:
                return json.dumps({
                    'success': False,
                    'error': f'Step {i + 1} has an invalid delay_days (must be a non-negative number)',
                })
            cumulative_days += delay_days
            scheduled_steps.append({
                'step_number': i + 2,  # step 1 was the initial outreach
                'delay_days': delay_days,
                'scheduled_at': (base_time + timedelta(days=cumulative_days)).isoformat(),
                'subject': step['subject'],
                'body_html': step['body_html'],
                'status': 'pending',  # pending → sent → skipped (if replied)
                'sent_at': None,
            })

        sequence = {
            'id': sequence_id,
            'prospect_id': prospect_id,
            'name': sequence_name,
            'steps': scheduled_steps,
            'created_at': datetime.utcnow().isoformat(),
            'status': 'active',  # active → paused → completed
            'exit_on_reply': True,
        }

        data['sequences'][sequence_id] = sequence
        _save_prospects(data)

        return json.dumps({'success': True, 'sequence': sequence})

    def check_pending_followups() -> str:
        """Check all active sequences for follow-ups that are due.

        Sends emails for due steps, skips sequences where the prospect has replied.
        Returns summary of actions taken.
        """
        return _check_pending_followups_impl()

    def move_prospect_stage(
        prospect_id: Annotated[str, "Prospect ID"],
        new_stage: Annotated[str, "New stage: new|contacted|replied|meeting|negotiation|won|lost"],
        notes: Annotated[Optional[str], "Notes about the stage change"] = None,
    ) -> str:
        """Move a prospect to a new pipeline stage."""
        valid_stages = ['new', 'contacted', 'replied', 'meeting', 'negotiation', 'won', 'lost']
        if new_stage not in valid_stages:
            return json.dumps({'success': False, 'error': f'Invalid stage. Use: {valid_stages}'})

        data = _load_prospects()
        prospect = data['prospects'].get(prospect_id)
        if not prospect:
            return json.dumps({'success': False, 'error': f'Prospect {prospect_id} not found'})

        old_stage = prospect['stage']
        prospect['stage'] = new_stage
        prospect['updated_at'] = datetime.utcnow().isoformat()
        if notes:
            # Notes are an append-only, timestamped history.
            prospect['notes'] = prospect.get('notes', '') + f'\n[{datetime.utcnow().isoformat()}] {old_stage}→{new_stage}: {notes}'

        # If prospect replied, mark it (this is what stops follow-up sequences)
        if new_stage == 'replied':
            prospect['last_reply_at'] = datetime.utcnow().isoformat()

        _save_prospects(data)

        # Sync stage change to Erxes CRM
        _sync_stage_change(prospect, new_stage)

        return json.dumps({'success': True, 'prospect': prospect, 'transition': f'{old_stage}→{new_stage}'})

    def get_pipeline_status() -> str:
        """Get the full outreach pipeline status — all prospects grouped by stage.

        Merges local prospect data with Erxes CRM pipeline view.
        """
        data = _load_prospects()
        pipeline = {}
        for pid, prospect in data.get('prospects', {}).items():
            stage = prospect.get('stage', 'new')
            pipeline.setdefault(stage, []).append({
                'id': pid,
                'company': prospect['company'],
                'contact': prospect['contact_name'],
                'email': prospect['email'],
                'emails_sent': prospect.get('emails_sent', 0),
                'last_email': prospect.get('last_email_at'),
                'tier': prospect.get('tier', 1),
                'erxes_deal_id': prospect.get('erxes_deal_id'),
            })

        # Count active sequences
        active_sequences = sum(
            1 for s in data.get('sequences', {}).values()
            if s.get('status') == 'active'
        )

        # Include Erxes CRM status if available
        erxes_status = None
        erxes = _get_erxes()
        if erxes:
            try:
                erxes_status = erxes.get_pipeline_status()
            except Exception as e:
                erxes_status = {'error': str(e)}

        return json.dumps({
            'success': True,
            'pipeline': pipeline,
            'total_prospects': len(data.get('prospects', {})),
            'active_sequences': active_sequences,
            'erxes_pipeline': erxes_status,
        })

    def list_sent_emails(
        prospect_id: Annotated[Optional[str], "Filter by prospect ID (optional)"] = None,
        limit: Annotated[int, "Max emails to return"] = 20,
    ) -> str:
        """List sent outreach emails, optionally filtered by prospect."""
        data = _load_prospects()
        log = data.get('sent_log', [])

        if prospect_id:
            log = [e for e in log if e.get('prospect_id') == prospect_id]

        # Most recent first
        log = sorted(log, key=lambda x: x.get('sent_at', ''), reverse=True)[:limit]
        return json.dumps({'success': True, 'emails': log, 'total': len(log)})

    # ── Register all tools ──
    # Routed through core.labeled_autogen_function so the UI spinner
    # shows a tool-specific label (publish_chat_stage('tool_call', …))
    # for autogen-invoked tools too. Same shape as LangChain's
    # _with_tool_logging chokepoint. UI labels live in
    # core/constants.py:TOOL_LABELS (single source of truth).
    from core.labeled_autogen_function import register_labeled_function
    from core.constants import TOOL_LABELS

    tools = [
        create_prospect,
        send_outreach_email,
        create_followup_sequence,
        check_pending_followups,
        move_prospect_stage,
        get_pipeline_status,
        list_sent_emails,
    ]
    for func in tools:
        register_labeled_function(
            func,
            caller=helper,
            executor=assistant,
            description=func.__doc__,
            ui_label=TOOL_LABELS.get(func.__name__, f'Running {func.__name__}…'),
        )

    # Count derived from the list so the message cannot drift from reality.
    logger.info(f"Registered {len(tools)} outreach CRM tools for user {user_id}")

468 

469 

def _check_pending_followups_impl() -> str:
    """Shared logic for checking pending follow-ups.

    Used by both the tool-level check_pending_followups and the daemon-level
    check_pending_followups_daemon.

    Locking: the caller must NOT hold _prospect_lock. _load_prospects and
    _save_prospects acquire it themselves and it is a non-reentrant
    threading.Lock, so holding it here would deadlock. The lock is not held
    while emails are being sent, so a concurrent writer between the load and
    the save can be overwritten.

    For each active sequence:
      * if the prospect has replied (and exit_on_reply is set) the sequence
        is completed without sending;
      * otherwise at most one due pending step is attempted. A successful
        send marks the step 'sent'; a failed send leaves it 'pending' for
        the next cycle and marks it 'failed' after three attempts.
      Every attempt is appended to 'sent_log' so follow-ups appear in the
      same history as initial outreach emails.

    Returns:
        JSON string: {'success': True, 'actions': [...], 'checked_at': iso}.
        Action kinds: 'sequence_completed', 'followup_sent', 'followup_failed'.
    """
    max_attempts = 3  # give up on a step after this many failed sends

    data = _load_prospects()
    now = datetime.utcnow()
    now_iso = now.isoformat()
    actions_taken = []
    sent_log = data.setdefault('sent_log', [])

    for seq_id, sequence in data.get('sequences', {}).items():
        if sequence['status'] != 'active':
            continue

        prospect = data['prospects'].get(sequence['prospect_id'])
        if not prospect:
            continue

        # Exit condition: prospect already replied
        if sequence.get('exit_on_reply') and prospect.get('last_reply_at'):
            sequence['status'] = 'completed'
            actions_taken.append({
                'action': 'sequence_completed',
                'reason': 'prospect_replied',
                'prospect': prospect['company'],
            })
            continue

        # Check each pending step
        for step in sequence['steps']:
            if step['status'] != 'pending':
                continue

            scheduled = datetime.fromisoformat(step['scheduled_at'])
            if now >= scheduled:
                result = _send_email(
                    to_email=prospect['email'],
                    subject=step['subject'],
                    html_body=step['body_html'],
                )
                sent_log.append({
                    'prospect_id': prospect.get('id', sequence['prospect_id']),
                    'to': prospect['email'],
                    'subject': step['subject'],
                    'sequence_step': step['step_number'],
                    'sent_at': now_iso,
                    'send_result': result,
                })

                if result.get('success'):
                    step['status'] = 'sent'
                    step['sent_at'] = now_iso
                    prospect['emails_sent'] = prospect.get('emails_sent', 0) + 1
                    prospect['last_email_at'] = now_iso
                    actions_taken.append({
                        'action': 'followup_sent',
                        'prospect': prospect['company'],
                        'step': step['step_number'],
                        'subject': step['subject'],
                        'result': result,
                    })
                else:
                    # Do not record an undelivered email as sent; retry on a
                    # later cycle, but not forever.
                    step['attempts'] = step.get('attempts', 0) + 1
                    if step['attempts'] >= max_attempts:
                        step['status'] = 'failed'
                    actions_taken.append({
                        'action': 'followup_failed',
                        'prospect': prospect['company'],
                        'step': step['step_number'],
                        'subject': step['subject'],
                        'attempts': step['attempts'],
                        'result': result,
                    })
                break  # Only send one step per check cycle

        # Check if all steps are done
        if all(s['status'] != 'pending' for s in sequence['steps']):
            sequence['status'] = 'completed'

    _save_prospects(data)
    return json.dumps({'success': True, 'actions': actions_taken, 'checked_at': now_iso})

531 

532 

533# ── Goal Type Registration ── 

534 

def build_outreach_prompt(goal_dict: Dict, product_dict: Dict = None) -> str:
    """Render the agent prompt for an outreach goal.

    The prompt embeds the goal's title and description, a one-line summary
    of every prospect in the local store, and the goal's config as JSON.
    ``product_dict`` is accepted but not used by this builder.

    This prompt is sent to /chat → CREATE/REUSE pipeline.
    """
    title = goal_dict.get('title', 'Sales Outreach')
    description = goal_dict.get('description', '')
    config = goal_dict.get('config', {})

    store = _load_prospects()
    prospects_summary = ''
    if store['prospects']:
        prospects_summary = '\n'.join(
            f"- {p['company']} ({p['contact_name']}, {p['email']}) — stage: {p['stage']}, emails sent: {p.get('emails_sent', 0)}"
            for p in store['prospects'].values()
        )

    return f"""You are a B2B sales outreach agent for HevolveAI.

PRODUCT: HARTOS (Hevolve Hive Agentic Runtime OS)
- On-device AI runtime for robotics: LLM inference, vision, speech, semantic memory, multi-agent orchestration
- Democratically evolving Hive network (not controlled by any single LLM vendor)
- Flywheel: more robots deployed → more data → better model → more developers → more robots
- Early partners shape how the intelligence evolves for their vertical

GOAL: {title}
{description}

CURRENT PIPELINE:
{prospects_summary or '(No prospects yet — use create_prospect to add them)'}

RULES:
1. Sound human. No em dashes. Casual lowercase subject lines. Short sentences.
2. Create FOMO: one partner per vertical, Q2 deadline, name competitors who locked into Big Tech
3. Follow-up sequence: day 3, day 7, day 14 — each shorter and more direct
4. If a prospect replies, move them to 'replied' stage and stop the sequence
5. Use the outreach CRM tools to track everything

CONFIG: {json.dumps(config)}
"""

575 

576 

def register_outreach_goal_type():
    """Register the 'outreach' goal type, then hook up reply detection.

    Registers build_outreach_prompt as the prompt builder with the tool tags
    'outreach', 'email' and 'crm', and afterwards calls
    register_reply_handler().

    Call this during HARTOS boot to enable outreach automation.
    """
    from integrations.agent_engine.goal_manager import register_goal_type

    outreach_tags = ['outreach', 'email', 'crm']
    register_goal_type(
        goal_type='outreach',
        build_prompt=build_outreach_prompt,
        tool_tags=outreach_tags,
    )
    logger.info("Registered 'outreach' goal type with prompt builder and tool tags")

    # Wire reply detection into email channel
    register_reply_handler()

591 

592 

def check_pending_followups_daemon() -> dict:
    """Run one follow-up check and summarise it for the agent daemon.

    Delegates to _check_pending_followups_impl, logs each 'followup_sent'
    and 'sequence_completed' action at info level, and returns
    {'sent': <number of followup_sent actions>, 'checked_at': <iso string>}.
    """
    outcome = json.loads(_check_pending_followups_impl())
    actions = outcome.get('actions', [])

    sent = 0
    for action in actions:
        kind = action.get('action')
        if kind == 'followup_sent':
            sent += 1
            logger.info(f"Follow-up sent: {action.get('prospect')} step {action.get('step')} subject='{action.get('subject')}'")
        elif kind == 'sequence_completed':
            logger.info(f"Sequence completed: {action.get('prospect')} ({action.get('reason')})")

    return {'sent': sent, 'checked_at': outcome.get('checked_at', '')}

608 

609 

610# ═══════════════════════════════════════════════════════════════ 

611# Inbound Reply Handler — wired into email channel adapter 

612# ═══════════════════════════════════════════════════════════════ 

613 

def handle_inbound_email(sender_email: str, subject: str, body: str, message_id: str = '') -> Optional[Dict]:
    """Check if an inbound email matches a prospect. If so, update CRM and trigger response flow.

    Called by the email channel adapter hook installed by
    register_reply_handler.

    Matching is by email address, case-insensitively. The sender may be a
    bare address or a display-name form such as ``Jane <jane@acme.com>``.

    On a match this:
      * records the reply (last_reply_at, updated_at, a timestamped note);
      * moves the prospect to 'replied' only if it is still in 'new' or
        'contacted' — a prospect already in 'meeting', 'negotiation', 'won'
        or 'lost' keeps its stage instead of being moved backwards;
      * completes every active sequence for the prospect;
      * saves the store, syncs a stage change to Erxes when one happened,
        pushes a notification and dispatches a draft-response goal.

    Args:
        sender_email: Sender address (bare or with display name).
        subject: Subject of the inbound email; None is treated as ''.
        body: Text body of the inbound email; None is treated as ''.
        message_id: Optional message identifier, passed through in the context.

    Returns:
        The context dict handed to the response-draft dispatch if the sender
        is a known prospect, otherwise None.
    """
    from email.utils import parseaddr

    subject = subject or ''
    body = body or ''
    raw_sender = sender_email or ''
    # parseaddr returns '' for input it cannot parse; fall back to the raw value.
    address = (parseaddr(raw_sender)[1] or raw_sender).strip().lower()
    if not address:
        return None

    data = _load_prospects()

    # Match sender to any prospect by email (first match wins)
    matched_prospect = None
    for prospect in data.get('prospects', {}).values():
        if (prospect.get('email') or '').strip().lower() == address:
            matched_prospect = prospect
            break

    if not matched_prospect:
        return None  # Not a prospect — normal email flow

    # ── Update prospect state ──
    now_iso = datetime.utcnow().isoformat()
    stage_changed = matched_prospect.get('stage', 'new') in ('new', 'contacted')
    if stage_changed:
        matched_prospect['stage'] = 'replied'
    matched_prospect['last_reply_at'] = now_iso
    matched_prospect['updated_at'] = now_iso
    matched_prospect['notes'] = matched_prospect.get('notes', '') + (
        f"\n[{now_iso}] REPLY received: {subject}"
    )

    # ── Complete all active sequences for this prospect ──
    for seq_id, sequence in data.get('sequences', {}).items():
        if sequence.get('prospect_id') == matched_prospect['id'] and sequence['status'] == 'active':
            sequence['status'] = 'completed'
            logger.info(f"Sequence {seq_id} auto-completed: prospect {matched_prospect['company']} replied")

    _save_prospects(data)

    # ── Sync stage change to Erxes CRM (only when the stage actually moved) ──
    if stage_changed:
        _sync_stage_change(matched_prospect, 'replied')

    # ── Build context for the agent to draft a response ──
    # Gather our side of the conversation, oldest first
    sent_emails = [
        e for e in data.get('sent_log', [])
        if e.get('prospect_id') == matched_prospect['id']
    ]
    sent_emails.sort(key=lambda x: x.get('sent_at', ''))

    context = {
        'prospect': matched_prospect,
        # Body is capped so a long thread cannot bloat the prompt.
        'their_reply': {'subject': subject, 'body': body[:2000], 'message_id': message_id},
        'our_emails': [
            {'subject': e.get('subject', ''), 'sent_at': e.get('sent_at', ''), 'step': e.get('sequence_step', 1)}
            for e in sent_emails[-5:]  # Last 5 emails we sent
        ],
        'campaign_stage': matched_prospect.get('stage', 'replied'),
        'total_emails_sent': matched_prospect.get('emails_sent', 0),
    }

    # ── Push notification to user ──
    _notify_prospect_replied(matched_prospect, subject, body, context)

    # ── Dispatch agent to draft response ──
    _dispatch_response_draft(matched_prospect, context)

    logger.info(
        f"Prospect reply detected: {matched_prospect['company']} ({sender_email}) "
        f"subject='{subject}' — stage is '{matched_prospect.get('stage')}', sequences completed"
    )
    return context

681 

682 

def _notify_prospect_replied(prospect: Dict, subject: str, body: str, context: Dict):
    """Tell the user a prospect replied, via two independent best-effort paths.

    First emits an 'outreach.prospect_replied' event, then routes a text
    notification through the response router with fan_out=True. Each path
    has its own try/except: a failure is logged at debug level and does not
    stop the other path or raise to the caller. ``context`` is accepted but
    not read here.
    """
    # Path 1: event bus
    try:
        from core.platform.events import emit_event
        payload = {
            'prospect_id': prospect['id'],
            'company': prospect['company'],
            'contact': prospect['contact_name'],
            'email': prospect['email'],
            'subject': subject,
            'body_preview': body[:200],
            'stage': prospect['stage'],
            'emails_sent': prospect.get('emails_sent', 0),
        }
        emit_event('outreach.prospect_replied', payload)
    except Exception as e:
        logger.debug(f"EventBus notification failed: {e}")

    # Path 2: direct push via channel system
    try:
        from integrations.channels.agent_tools import _get_user_id_from_threadlocal
        from integrations.channels.response.router import get_response_router
        # Prefer the prospect's owner; fall back to the thread-local user.
        user_id = prospect.get('created_by') or _get_user_id_from_threadlocal()
        router = get_response_router()
        lines = [
            f"📬 {prospect['company']} replied to your outreach!\n",
            f"From: {prospect['contact_name']} ({prospect['email']})\n",
            f"Subject: {subject}\n",
            f"Preview: {body[:150]}...",
        ]
        notification = ''.join(lines)
        router.route_response(user_id=user_id, response_text=notification, fan_out=True)
    except Exception as e:
        logger.debug(f"Push notification failed: {e}")

718 

719 

def _dispatch_response_draft(prospect: Dict, context: Dict):
    """Dispatch an 'outreach' goal asking the agent to draft a reply.

    Builds a prompt from the prospect, their reply and the list of our
    previous emails found in ``context``, then calls dispatch_goal with the
    prospect's owner ('system' if none is recorded). The prompt instructs
    the agent to present a draft rather than send it. Any exception is
    logged at error level and not raised.
    """
    try:
        from integrations.agent_engine.dispatch import dispatch_goal
        user_id = prospect.get('created_by', 'system')

        pieces = [
            f"A prospect has replied to our outreach. Draft a response.\n\n",
            f"PROSPECT: {prospect['company']} — {prospect['contact_name']} ({prospect['email']})\n",
            f"STAGE: {prospect.get('stage', 'replied')}\n",
            f"THEIR REPLY:\nSubject: {context['their_reply']['subject']}\n",
            f"Body: {context['their_reply']['body']}\n\n",
            f"OUR PREVIOUS EMAILS ({len(context['our_emails'])} total):\n",
        ]
        # One line per email we sent, in the order given by the context.
        for sent in context['our_emails']:
            pieces.append(f" - Step {sent['step']}: \"{sent['subject']}\" (sent {sent['sent_at']})\n")

        pieces.extend([
            f"\nRULES:\n",
            f"1. Be conversational, not salesy. They replied — that's interest.\n",
            f"2. Reference what they said specifically.\n",
            f"3. Propose a concrete next step (call, demo, meeting link).\n",
            f"4. Keep it short — 3-5 sentences max.\n",
            f"5. Do NOT send the email automatically — present the draft for user approval.\n",
        ])
        prompt = ''.join(pieces)

        dispatch_goal(
            prompt=prompt,
            user_id=user_id,
            goal_id=f"outreach_reply_{prospect['id']}_{int(time.time())}",
            goal_type='outreach',
        )
    except Exception as e:
        logger.error(f"Failed to dispatch response draft: {e}")

758 

759 

def register_reply_handler():
    """Register the inbound email handler with the email channel adapter.

    Called during boot (from register_outreach_goal_type) to wire
    reply detection into the channel system.

    If the registry has an 'email' adapter exposing ``on_message``, an async
    hook is registered that extracts sender, subject, text and message id
    from each message and passes them to handle_inbound_email when the
    sender contains '@'. Failures while registering are logged at debug
    level; failures while processing a message are logged with a traceback
    and swallowed so they cannot propagate into the adapter.
    """
    try:
        from integrations.channels.registry import get_registry
        registry = get_registry()
        adapter = registry.get_adapter('email')
        if adapter and hasattr(adapter, 'on_message'):
            async def _outreach_reply_hook(message):
                """Post-process inbound emails for prospect matching."""
                try:
                    sender = getattr(message, 'sender_id', '') or ''
                    # metadata may be absent or explicitly None on a message.
                    metadata = getattr(message, 'metadata', None) or {}
                    subject = metadata.get('subject', '') or ''
                    body = getattr(message, 'text', '') or ''
                    msg_id = getattr(message, 'message_id', '') or ''
                    if '@' in sender:
                        handle_inbound_email(sender, subject, body, msg_id)
                except Exception:
                    # Outreach bookkeeping must never break normal email flow.
                    logger.exception("Outreach reply hook failed")

            adapter.on_message(_outreach_reply_hook)
            logger.info("Outreach reply handler registered with email adapter")
    except Exception as e:
        logger.debug(f"Could not register reply handler: {e}")