Coverage for integrations / agent_engine / erxes_client.py: 0.0%

245 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Native Erxes CRM v1 Client for HARTOS 

3 

4Handles: 

5 - Cookie-based authentication (login + auto-refresh) 

6 - Customer CRUD (create, find by email, update) 

7 - Company CRUD 

8 - Deal pipeline management (create, move stage, list) 

9 - Stage mapping between HARTOS names and Erxes IDs 

10 - Pipeline status queries 

11 

12Uses only stdlib (urllib, http.cookiejar) — no requests dependency. 

13 

14Environment: 

15 ERXES_API_URL — e.g. http://192.168.0.83:3300 

16 ERXES_EMAIL — admin email (default: sathish@hevolve.ai) 

17 ERXES_PASSWORD — admin password 

18 ERXES_BOARD_ID — deal board ID (auto-discovered if not set) 

19 ERXES_PIPELINE_ID — pipeline ID (auto-discovered if not set) 

20""" 

21import http.cookiejar 

22import json 

23import logging 

24import os 

25import threading 

26import time 

27import urllib.request 

28import urllib.error 

29from typing import Dict, List, Optional, Tuple 

30 

31logger = logging.getLogger('hevolve_erxes') 

32 

33# ── Singleton client ── 

34_client_instance = None 

35_client_lock = threading.Lock() 

36 

37 

38class ErxesCRMClient: 

39 """Thread-safe Erxes v1 GraphQL client with cookie-based auth.""" 

40 

41 def __init__(self, api_url: str, email: str, password: str): 

42 self.api_url = api_url.rstrip('/') 

43 self.graphql_url = self.api_url + '/graphql' 

44 self.email = email 

45 self.password = password 

46 

47 self._cookie_jar = http.cookiejar.CookieJar() 

48 self._opener = urllib.request.build_opener( 

49 urllib.request.HTTPCookieProcessor(self._cookie_jar) 

50 ) 

51 self._auth_lock = threading.Lock() 

52 self._logged_in = False 

53 self._login_time = 0 

54 

55 # Stage mapping: populated on first use 

56 self._stage_map = {} # name -> id 

57 self._stage_map_rev = {} # id -> name 

58 self._board_id = os.environ.get('ERXES_BOARD_ID', '') 

59 self._pipeline_id = os.environ.get('ERXES_PIPELINE_ID', '') 

60 

61 # ═══════════════════════════════════════════════════════════ 

62 # Core GraphQL transport 

63 # ═══════════════════════════════════════════════════════════ 

64 

65 def _gql(self, query: str, variables: Dict = None, retry_auth: bool = True) -> Dict: 

66 """Execute a GraphQL query/mutation. Auto-logs-in if needed.""" 

67 if not self._logged_in: 

68 self._login() 

69 

70 payload = {'query': query} 

71 if variables: 

72 payload['variables'] = variables 

73 

74 data = json.dumps(payload).encode('utf-8') 

75 req = urllib.request.Request( 

76 self.graphql_url, data=data, 

77 headers={'Content-Type': 'application/json'} 

78 ) 

79 try: 

80 resp = self._opener.open(req, timeout=15) 

81 result = json.loads(resp.read().decode('utf-8')) 

82 

83 # Check for auth errors 

84 errors = result.get('errors', []) 

85 if errors and any('Login required' in e.get('message', '') for e in errors): 

86 if retry_auth: 

87 self._logged_in = False 

88 self._login() 

89 return self._gql(query, variables, retry_auth=False) 

90 return result 

91 

92 except urllib.error.HTTPError as e: 

93 body = e.read().decode('utf-8', errors='replace')[:500] 

94 if 'Login required' in body and retry_auth: 

95 self._logged_in = False 

96 self._login() 

97 return self._gql(query, variables, retry_auth=False) 

98 return {'errors': [{'message': str(e), 'body': body}]} 

99 except Exception as e: 

100 return {'errors': [{'message': str(e)}]} 

101 

102 def _login(self): 

103 """Authenticate and store session cookie.""" 

104 with self._auth_lock: 

105 if self._logged_in and (time.time() - self._login_time) < 3600: 

106 return 

107 

108 payload = json.dumps({ 

109 'query': 'mutation Login($email: String!, $password: String!) { login(email: $email, password: $password) }', 

110 'variables': {'email': self.email, 'password': self.password} 

111 }).encode('utf-8') 

112 req = urllib.request.Request( 

113 self.graphql_url, data=payload, 

114 headers={'Content-Type': 'application/json'} 

115 ) 

116 try: 

117 resp = self._opener.open(req, timeout=15) 

118 result = json.loads(resp.read().decode('utf-8')) 

119 if result.get('data', {}).get('login') == 'loggedIn': 

120 self._logged_in = True 

121 self._login_time = time.time() 

122 logger.info('Erxes: logged in as %s', self.email) 

123 else: 

124 logger.error('Erxes login failed: %s', result) 

125 except Exception as e: 

126 logger.error('Erxes login error: %s', e) 

127 

128 # ═══════════════════════════════════════════════════════════ 

129 # Stage Mapping 

130 # ═══════════════════════════════════════════════════════════ 

131 

132 def _ensure_stage_map(self): 

133 """Load stage IDs from Erxes pipeline. Auto-discovers board/pipeline if not set.""" 

134 if self._stage_map: 

135 return 

136 

137 # Discover board if needed 

138 if not self._board_id: 

139 r = self._gql('{ boards(type: "deal") { _id name } }') 

140 boards = r.get('data', {}).get('boards', []) 

141 if boards: 

142 self._board_id = boards[0]['_id'] 

143 logger.info('Erxes: auto-discovered board %s (%s)', boards[0]['name'], self._board_id) 

144 

145 # Discover pipeline if needed — prefer the one that has stages 

146 if self._board_id and not self._pipeline_id: 

147 r = self._gql( 

148 'query($boardId: String!) { pipelines(boardId: $boardId) { _id name } }', 

149 {'boardId': self._board_id} 

150 ) 

151 pipelines = r.get('data', {}).get('pipelines', []) 

152 best = None 

153 for p in pipelines: 

154 sr = self._gql( 

155 'query($pid: String!) { stages(pipelineId: $pid) { _id } }', 

156 {'pid': p['_id']} 

157 ) 

158 stages = sr.get('data', {}).get('stages', []) 

159 if stages: 

160 best = p 

161 break 

162 if best: 

163 self._pipeline_id = best['_id'] 

164 logger.info('Erxes: auto-discovered pipeline %s (%s) with stages', best['name'], self._pipeline_id) 

165 elif pipelines: 

166 self._pipeline_id = pipelines[0]['_id'] 

167 logger.info('Erxes: auto-discovered pipeline %s (%s) (no stages)', pipelines[0]['name'], self._pipeline_id) 

168 

169 # Load stages 

170 if self._pipeline_id: 

171 r = self._gql( 

172 'query($pid: String!) { stages(pipelineId: $pid) { _id name order } }', 

173 {'pid': self._pipeline_id} 

174 ) 

175 stages = r.get('data', {}).get('stages', []) 

176 for s in stages: 

177 name_lower = s['name'].lower() 

178 self._stage_map[name_lower] = s['_id'] 

179 self._stage_map_rev[s['_id']] = name_lower 

180 logger.info('Erxes: loaded %d stages', len(self._stage_map)) 

181 

182 def stage_id(self, stage_name: str) -> Optional[str]: 

183 """Get Erxes stage ID from HARTOS stage name.""" 

184 self._ensure_stage_map() 

185 return self._stage_map.get(stage_name.lower()) 

186 

187 def stage_name(self, stage_id: str) -> Optional[str]: 

188 """Get HARTOS stage name from Erxes stage ID.""" 

189 self._ensure_stage_map() 

190 return self._stage_map_rev.get(stage_id) 

191 

192 # ═══════════════════════════════════════════════════════════ 

193 # Customer Operations 

194 # ═══════════════════════════════════════════════════════════ 

195 

196 def find_customer_by_email(self, email: str) -> Optional[Dict]: 

197 """Find a customer by email. Returns None if not found. 

198 

199 Uses paginated listing with client-side match because Erxes v1 

200 searchValue has ES field mapping issues. 

201 """ 

202 page = 1 

203 while page <= 10: # safety limit 

204 r = self._gql( 

205 '{ customers(page: %d, perPage: 50) ' 

206 '{ _id firstName lastName primaryEmail state } }' % page 

207 ) 

208 customers = r.get('data', {}).get('customers', []) 

209 if not customers: 

210 break 

211 for c in customers: 

212 if c.get('primaryEmail', '').lower() == email.lower(): 

213 return c 

214 page += 1 

215 return None 

216 

217 def create_customer(self, first_name: str, last_name: str, email: str, 

218 state: str = 'lead') -> Optional[Dict]: 

219 """Create a customer. Returns created customer or None on failure.""" 

220 # Check if already exists 

221 existing = self.find_customer_by_email(email) 

222 if existing: 

223 return existing 

224 

225 mutation = ''' 

226 mutation AddCustomer($firstName: String, $lastName: String, 

227 $primaryEmail: String, $state: String) { 

228 customersAdd(firstName: $firstName, lastName: $lastName, 

229 primaryEmail: $primaryEmail, state: $state) { 

230 _id firstName lastName primaryEmail 

231 } 

232 }''' 

233 r = self._gql(mutation, { 

234 'firstName': first_name, 

235 'lastName': last_name, 

236 'primaryEmail': email, 

237 'state': state, 

238 }) 

239 created = r.get('data', {}).get('customersAdd') 

240 errors = r.get('errors', []) 

241 is_url_error = errors and all('Url is invalid' in e.get('message', '') for e in errors) 

242 

243 if created: 

244 logger.info('Erxes: created customer %s %s (%s)', first_name, last_name, email) 

245 return created 

246 elif 'Duplicated email' in str(errors): 

247 return self.find_customer_by_email(email) 

248 elif is_url_error: 

249 # Customer was created despite webhook error -- find it 

250 found = self.find_customer_by_email(email) 

251 if found: 

252 logger.info('Erxes: created customer %s (%s) (webhook warning suppressed)', first_name, email) 

253 return found 

254 else: 

255 logger.warning('Erxes: customer create failed: %s', errors) 

256 return None 

257 

258 # ═══════════════════════════════════════════════════════════ 

259 # Deal Operations 

260 # ═══════════════════════════════════════════════════════════ 

261 

262 def create_deal(self, name: str, stage_name: str = 'new', 

263 customer_ids: List[str] = None) -> Optional[Dict]: 

264 """Create a deal in the pipeline at the specified stage.""" 

265 self._ensure_stage_map() 

266 sid = self.stage_id(stage_name) 

267 if not sid: 

268 logger.error('Erxes: unknown stage "%s"', stage_name) 

269 return None 

270 

271 mutation = ''' 

272 mutation AddDeal($name: String!, $stageId: String!, $customerIds: [String]) { 

273 dealsAdd(name: $name, stageId: $stageId, customerIds: $customerIds) { 

274 _id name stageId 

275 } 

276 }''' 

277 r = self._gql(mutation, { 

278 'name': name, 

279 'stageId': sid, 

280 'customerIds': customer_ids or [], 

281 }) 

282 deal = r.get('data', {}).get('dealsAdd') 

283 errors = r.get('errors', []) 

284 is_url_error = errors and all('Url is invalid' in e.get('message', '') for e in errors) 

285 

286 if deal: 

287 logger.info('Erxes: created deal "%s" in stage %s', name, stage_name) 

288 return deal 

289 elif is_url_error: 

290 # Deal was created in MongoDB despite webhook error -- find it 

291 found = self.find_deal_by_name(name) 

292 if found: 

293 logger.info('Erxes: created deal "%s" in stage %s (webhook warning suppressed)', name, stage_name) 

294 return found 

295 logger.warning('Erxes: deal create had webhook error but deal not found in DB') 

296 else: 

297 logger.warning('Erxes: deal create failed: %s', errors) 

298 return None 

299 

300 def move_deal(self, deal_id: str, new_stage_name: str) -> Optional[Dict]: 

301 """Move a deal to a different pipeline stage. 

302 

303 Note: Erxes v1 returns 'Url is invalid' error from a post-update 

304 webhook, but the DB mutation succeeds. We treat this as success. 

305 """ 

306 sid = self.stage_id(new_stage_name) 

307 if not sid: 

308 logger.error('Erxes: unknown stage "%s"', new_stage_name) 

309 return None 

310 

311 mutation = ''' 

312 mutation EditDeal($id: String!, $stageId: String) { 

313 dealsEdit(_id: $id, stageId: $stageId) { 

314 _id name stageId 

315 } 

316 }''' 

317 r = self._gql(mutation, {'id': deal_id, 'stageId': sid}) 

318 deal = r.get('data', {}).get('dealsEdit') 

319 

320 # "Url is invalid" is a non-fatal webhook error -- the DB update succeeds 

321 errors = r.get('errors', []) 

322 is_url_error = errors and all('Url is invalid' in e.get('message', '') for e in errors) 

323 if deal: 

324 logger.info('Erxes: moved deal %s to stage %s', deal_id, new_stage_name) 

325 return deal 

326 elif is_url_error: 

327 logger.info('Erxes: moved deal %s to stage %s (webhook warning suppressed)', deal_id, new_stage_name) 

328 return {'_id': deal_id, 'stageId': sid, 'webhook_warning': True} 

329 return None 

330 

331 def find_deal_by_name(self, name: str) -> Optional[Dict]: 

332 """Find a deal by name (partial match via client-side filter).""" 

333 r = self._gql('{ deals { _id name stageId customerIds } }') 

334 deals = r.get('data', {}).get('deals', []) 

335 name_lower = name.lower() 

336 for d in deals: 

337 if name_lower in d.get('name', '').lower(): 

338 return d 

339 return None 

340 

341 def list_deals_by_stage(self, stage_name: str) -> List[Dict]: 

342 """List all deals in a given stage.""" 

343 sid = self.stage_id(stage_name) 

344 if not sid: 

345 return [] 

346 r = self._gql('{ deals { _id name stageId customerIds } }') 

347 deals = r.get('data', {}).get('deals', []) 

348 return [d for d in deals if d.get('stageId') == sid] 

349 

350 def get_pipeline_status(self) -> Dict: 

351 """Get full pipeline status: deals grouped by stage with counts.""" 

352 self._ensure_stage_map() 

353 pipeline = {} 

354 total = 0 

355 for stage_name, stage_id in self._stage_map.items(): 

356 r = self._gql( 

357 'query($sid: String!) { deals(stageId: $sid) { _id name } }', 

358 {'sid': stage_id} 

359 ) 

360 deals = r.get('data', {}).get('deals', []) 

361 pipeline[stage_name] = { 

362 'stage_id': stage_id, 

363 'count': len(deals), 

364 'deals': [{'id': d['_id'], 'name': d['name']} for d in deals], 

365 } 

366 total += len(deals) 

367 return {'pipeline': pipeline, 'total_deals': total} 

368 

369 # ═══════════════════════════════════════════════════════════ 

370 # Prospect Sync — bidirectional HARTOS <-> Erxes 

371 # ═══════════════════════════════════════════════════════════ 

372 

373 def sync_prospect_to_erxes(self, prospect: Dict) -> Dict: 

374 """Sync a HARTOS prospect to Erxes (customer + deal). 

375 

376 Returns dict with erxes_customer_id and erxes_deal_id. 

377 Uses existing IDs from prospect if available to avoid duplicates. 

378 """ 

379 result = {'synced': False} 

380 

381 # Use existing customer ID or create/find 

382 customer_id = prospect.get('erxes_customer_id') 

383 if not customer_id: 

384 name_parts = prospect.get('contact_name', '').split(None, 1) 

385 first_name = name_parts[0] if name_parts else prospect.get('company', '') 

386 last_name = name_parts[1] if len(name_parts) > 1 else '' 

387 customer = self.create_customer(first_name, last_name, prospect['email']) 

388 if customer: 

389 customer_id = customer['_id'] 

390 

391 if customer_id: 

392 result['erxes_customer_id'] = customer_id 

393 

394 # Use existing deal ID or find/create 

395 deal_id = prospect.get('erxes_deal_id') 

396 if deal_id: 

397 # Already linked -- just ensure correct stage 

398 self._ensure_stage_map() 

399 target_stage = prospect.get('stage', 'new') 

400 sid = self.stage_id(target_stage) 

401 if sid: 

402 # Verify deal exists and move if needed 

403 self.move_deal(deal_id, target_stage) 

404 result['erxes_deal_id'] = deal_id 

405 else: 

406 # Find existing deal by company name 

407 deal = self.find_deal_by_name(prospect.get('company', '')) 

408 if not deal: 

409 deal_name = '%s - %s' % ( 

410 prospect.get('company', 'Unknown'), 

411 prospect.get('vertical', 'General').replace('_', ' ').title() 

412 ) 

413 deal = self.create_deal( 

414 name=deal_name, 

415 stage_name=prospect.get('stage', 'new'), 

416 customer_ids=[customer_id], 

417 ) 

418 if deal: 

419 result['erxes_deal_id'] = deal['_id'] 

420 

421 result['synced'] = True 

422 

423 return result 

424 

425 def sync_stage_change(self, prospect: Dict, new_stage: str) -> bool: 

426 """Sync a stage change from HARTOS to Erxes deal pipeline.""" 

427 deal_id = prospect.get('erxes_deal_id') 

428 if not deal_id: 

429 # Try to find the deal 

430 deal = self.find_deal_by_name(prospect.get('company', '')) 

431 if deal: 

432 deal_id = deal['_id'] 

433 

434 if deal_id: 

435 result = self.move_deal(deal_id, new_stage) 

436 return result is not None 

437 return False 

438 

439 # ═══════════════════════════════════════════════════════════ 

440 # Health / Status 

441 # ═══════════════════════════════════════════════════════════ 

442 

443 def is_available(self) -> bool: 

444 """Check if Erxes API is reachable and we can authenticate.""" 

445 try: 

446 self._login() 

447 return self._logged_in 

448 except Exception: 

449 return False 

450 

451 def status(self) -> Dict: 

452 """Get connection status and pipeline summary.""" 

453 available = self.is_available() 

454 result = { 

455 'available': available, 

456 'api_url': self.api_url, 

457 'email': self.email, 

458 'logged_in': self._logged_in, 

459 } 

460 if available: 

461 pipeline = self.get_pipeline_status() 

462 result['pipeline'] = pipeline 

463 return result 

464 

465 

466def get_erxes_client() -> Optional[ErxesCRMClient]: 

467 """Get or create the singleton Erxes client. 

468 

469 Returns None if ERXES_API_URL is not configured. 

470 """ 

471 global _client_instance 

472 

473 api_url = os.environ.get('ERXES_API_URL', '') 

474 if not api_url: 

475 return None 

476 

477 with _client_lock: 

478 if _client_instance is None: 

479 email = os.environ.get('ERXES_EMAIL', '') 

480 password = os.environ.get('ERXES_PASSWORD', '') 

481 if not email or not password: 

482 logger.warning('ERXES_EMAIL and ERXES_PASSWORD must be set') 

483 return None 

484 _client_instance = ErxesCRMClient(api_url, email, password) 

485 logger.info('Erxes client initialized: %s', api_url) 

486 return _client_instance