Coverage for integrations / agent_engine / erxes_client.py: 0.0%
245 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Native Erxes CRM v1 Client for HARTOS
4Handles:
5 - Cookie-based authentication (login + auto-refresh)
6 - Customer CRUD (create, find by email, update)
7 - Company CRUD
8 - Deal pipeline management (create, move stage, list)
9 - Stage mapping between HARTOS names and Erxes IDs
10 - Pipeline status queries
12Uses only stdlib (urllib, http.cookiejar) — no requests dependency.
14Environment:
15 ERXES_API_URL — e.g. http://192.168.0.83:3300
16 ERXES_EMAIL — admin email (required; no default is applied, and no client is created without it)
17 ERXES_PASSWORD — admin password
18 ERXES_BOARD_ID — deal board ID (auto-discovered if not set)
19 ERXES_PIPELINE_ID — pipeline ID (auto-discovered if not set)
20"""
21import http.cookiejar
22import json
23import logging
24import os
25import threading
26import time
27import urllib.request
28import urllib.error
29from typing import Dict, List, Optional, Tuple
# Module-wide logger; records are emitted under the 'hevolve_erxes' name.
logger = logging.getLogger('hevolve_erxes')

# ── Singleton client ──
# Lock taken while the shared client below is being created.
_client_lock = threading.Lock()
# Shared client instance; stays None until the first successful creation.
_client_instance = None
class ErxesCRMClient:
    """Erxes v1 GraphQL client with cookie-based authentication.

    All requests go to ``<api_url>/graphql`` through a urllib opener that
    carries a cookie jar, so the session cookie set by the login mutation is
    replayed on later calls.  Login is serialised with a lock; the lazily
    built stage-name cache is populated without one.

    Every public operation is best-effort: transport and GraphQL failures are
    logged and reported as ``None`` / ``[]`` / ``False`` rather than raised.
    """

    # A successful login is trusted for this many seconds before _login()
    # authenticates again instead of returning early.
    _SESSION_TTL_SECONDS = 3600
    # Socket timeout (seconds) applied to every HTTP request.
    _REQUEST_TIMEOUT = 15

    def __init__(self, api_url: str, email: str, password: str):
        """Store credentials and build the cookie-aware opener (no network I/O).

        Args:
            api_url: Base URL of the Erxes API; a trailing ``/`` is stripped.
            email: Login email.
            password: Login password.
        """
        self.api_url = api_url.rstrip('/')
        self.graphql_url = self.api_url + '/graphql'
        self.email = email
        self.password = password

        self._cookie_jar = http.cookiejar.CookieJar()
        self._opener = urllib.request.build_opener(
            urllib.request.HTTPCookieProcessor(self._cookie_jar)
        )
        self._auth_lock = threading.Lock()
        self._logged_in = False
        self._login_time = 0

        # Stage mapping: populated on first use
        self._stage_map = {}      # lower-cased stage name -> stage id
        self._stage_map_rev = {}  # stage id -> lower-cased stage name
        self._board_id = os.environ.get('ERXES_BOARD_ID', '')
        self._pipeline_id = os.environ.get('ERXES_PIPELINE_ID', '')

    # ═══════════════════════════════════════════════════════════
    # Response helpers
    # ═══════════════════════════════════════════════════════════

    @staticmethod
    def _field(result: Dict, key: str, default=None):
        """Return ``result['data'][key]``, or *default* when it is absent or null.

        GraphQL responses may carry ``"data": null`` (or a null field) next to
        ``errors``; a plain ``result.get('data', {})`` returns ``None`` in that
        case, so both levels are normalised here.
        """
        data = result.get('data') or {}
        value = data.get(key)
        return default if value is None else value

    @staticmethod
    def _errors(result: Dict) -> List[Dict]:
        """Return the response's ``errors`` list, or an empty list if none."""
        return result.get('errors') or []

    @staticmethod
    def _is_webhook_error(errors: List[Dict]) -> bool:
        """True when there are errors and every one is the 'Url is invalid' error."""
        return bool(errors) and all(
            'Url is invalid' in (e.get('message') or '') for e in errors
        )

    # ═══════════════════════════════════════════════════════════
    # Core GraphQL transport
    # ═══════════════════════════════════════════════════════════

    def _gql(self, query: str, variables: Optional[Dict] = None,
             retry_auth: bool = True) -> Dict:
        """Execute a GraphQL query/mutation, logging in first if needed.

        Args:
            query: GraphQL document.
            variables: Optional variables; omitted from the payload when empty.
            retry_auth: When True, a 'Login required' response triggers one
                re-login followed by a single retry.

        Returns:
            The decoded JSON response.  On any transport or decoding failure a
            dict of the form ``{'errors': [{'message': ...}]}`` is returned
            instead of raising.
        """
        if not self._logged_in:
            self._login()

        payload = {'query': query}
        if variables:
            payload['variables'] = variables

        req = urllib.request.Request(
            self.graphql_url,
            data=json.dumps(payload).encode('utf-8'),
            headers={'Content-Type': 'application/json'},
        )
        try:
            # Context manager closes the response (and its socket) promptly.
            with self._opener.open(req, timeout=self._REQUEST_TIMEOUT) as resp:
                result = json.loads(resp.read().decode('utf-8'))

            # Check for auth errors
            errors = result.get('errors') or []
            login_required = any(
                'Login required' in (e.get('message') or '') for e in errors
            )
            if login_required and retry_auth:
                self._logged_in = False
                self._login()
                return self._gql(query, variables, retry_auth=False)
            return result

        except urllib.error.HTTPError as e:
            body = e.read().decode('utf-8', errors='replace')[:500]
            if 'Login required' in body and retry_auth:
                self._logged_in = False
                self._login()
                return self._gql(query, variables, retry_auth=False)
            return {'errors': [{'message': str(e), 'body': body}]}
        except Exception as e:
            # Deliberate best-effort boundary: callers inspect 'errors'.
            return {'errors': [{'message': str(e)}]}

    def _login(self):
        """Authenticate and store the session cookie in the cookie jar.

        Returns early when a login succeeded less than the session TTL ago.
        Failures are logged, never raised; ``_logged_in`` reflects the outcome.
        """
        with self._auth_lock:
            age = time.time() - self._login_time
            if self._logged_in and age < self._SESSION_TTL_SECONDS:
                return

            payload = json.dumps({
                'query': 'mutation Login($email: String!, $password: String!) { login(email: $email, password: $password) }',
                'variables': {'email': self.email, 'password': self.password}
            }).encode('utf-8')
            req = urllib.request.Request(
                self.graphql_url, data=payload,
                headers={'Content-Type': 'application/json'}
            )
            try:
                with self._opener.open(req, timeout=self._REQUEST_TIMEOUT) as resp:
                    result = json.loads(resp.read().decode('utf-8'))
                if self._field(result, 'login') == 'loggedIn':
                    self._logged_in = True
                    self._login_time = time.time()
                    logger.info('Erxes: logged in as %s', self.email)
                else:
                    # A failed re-login must not leave a stale "logged in" flag.
                    self._logged_in = False
                    logger.error('Erxes login failed: %s', result)
            except Exception as e:
                self._logged_in = False
                logger.error('Erxes login error: %s', e)

    # ═══════════════════════════════════════════════════════════
    # Stage Mapping
    # ═══════════════════════════════════════════════════════════

    def _ensure_stage_map(self):
        """Load stage IDs from the Erxes pipeline on first use.

        Auto-discovers the board and pipeline when they were not provided via
        the environment.  Does nothing once the stage map is non-empty.
        """
        if self._stage_map:
            return
        if not self._board_id:
            self._discover_board()
        if self._board_id and not self._pipeline_id:
            self._discover_pipeline()
        if self._pipeline_id:
            self._load_stages()

    def _discover_board(self):
        """Adopt the first 'deal' board returned by Erxes, if any."""
        r = self._gql('{ boards(type: "deal") { _id name } }')
        boards = self._field(r, 'boards', [])
        if boards:
            self._board_id = boards[0]['_id']
            logger.info('Erxes: auto-discovered board %s (%s)', boards[0]['name'], self._board_id)

    def _discover_pipeline(self):
        """Pick the board's first pipeline that has stages, else its first pipeline."""
        r = self._gql(
            'query($boardId: String!) { pipelines(boardId: $boardId) { _id name } }',
            {'boardId': self._board_id}
        )
        pipelines = self._field(r, 'pipelines', [])
        best = None
        for p in pipelines:
            sr = self._gql(
                'query($pid: String!) { stages(pipelineId: $pid) { _id } }',
                {'pid': p['_id']}
            )
            if self._field(sr, 'stages', []):
                best = p
                break
        if best:
            self._pipeline_id = best['_id']
            logger.info('Erxes: auto-discovered pipeline %s (%s) with stages', best['name'], self._pipeline_id)
        elif pipelines:
            self._pipeline_id = pipelines[0]['_id']
            logger.info('Erxes: auto-discovered pipeline %s (%s) (no stages)', pipelines[0]['name'], self._pipeline_id)

    def _load_stages(self):
        """Fill both stage maps from the current pipeline's stages."""
        r = self._gql(
            'query($pid: String!) { stages(pipelineId: $pid) { _id name order } }',
            {'pid': self._pipeline_id}
        )
        for s in self._field(r, 'stages', []):
            name_lower = (s.get('name') or '').lower()
            if not name_lower:
                # A nameless stage cannot be addressed by name; skip it.
                continue
            self._stage_map[name_lower] = s['_id']
            self._stage_map_rev[s['_id']] = name_lower
        logger.info('Erxes: loaded %d stages', len(self._stage_map))

    def stage_id(self, stage_name: str) -> Optional[str]:
        """Get the Erxes stage ID for a HARTOS stage name (case-insensitive)."""
        if not stage_name:
            return None
        self._ensure_stage_map()
        return self._stage_map.get(stage_name.lower())

    def stage_name(self, stage_id: str) -> Optional[str]:
        """Get the (lower-cased) stage name for an Erxes stage ID."""
        self._ensure_stage_map()
        return self._stage_map_rev.get(stage_id)

    # ═══════════════════════════════════════════════════════════
    # Customer Operations
    # ═══════════════════════════════════════════════════════════

    def find_customer_by_email(self, email: str, max_pages: int = 10) -> Optional[Dict]:
        """Find a customer by email (case-insensitive). Returns None if not found.

        Uses paginated listing with client-side match because Erxes v1
        searchValue has ES field mapping issues.

        Args:
            email: Address to look for.  An empty value never matches, so
                customers without an email are not returned by accident.
            max_pages: Safety limit on the number of 50-customer pages scanned.
        """
        if not email:
            return None
        wanted = email.lower()
        for page in range(1, max_pages + 1):
            r = self._gql(
                '{ customers(page: %d, perPage: 50) '
                '{ _id firstName lastName primaryEmail state } }' % page
            )
            customers = self._field(r, 'customers', [])
            if not customers:
                break
            for c in customers:
                # primaryEmail may be present but null.
                if (c.get('primaryEmail') or '').lower() == wanted:
                    return c
        return None

    def create_customer(self, first_name: str, last_name: str, email: str,
                        state: str = 'lead') -> Optional[Dict]:
        """Create a customer, or return the existing one with the same email.

        Returns:
            The created (or already existing) customer dict, or None on failure.
        """
        # Check if already exists
        existing = self.find_customer_by_email(email)
        if existing:
            return existing

        mutation = '''
            mutation AddCustomer($firstName: String, $lastName: String,
                                 $primaryEmail: String, $state: String) {
                customersAdd(firstName: $firstName, lastName: $lastName,
                             primaryEmail: $primaryEmail, state: $state) {
                    _id firstName lastName primaryEmail
                }
            }'''
        r = self._gql(mutation, {
            'firstName': first_name,
            'lastName': last_name,
            'primaryEmail': email,
            'state': state,
        })
        created = self._field(r, 'customersAdd')
        errors = self._errors(r)

        if created:
            logger.info('Erxes: created customer %s %s (%s)', first_name, last_name, email)
            return created
        if 'Duplicated email' in str(errors):
            return self.find_customer_by_email(email)
        if self._is_webhook_error(errors):
            # Customer was created despite webhook error -- find it
            found = self.find_customer_by_email(email)
            if found:
                logger.info('Erxes: created customer %s (%s) (webhook warning suppressed)', first_name, email)
                return found
            logger.warning('Erxes: customer create had webhook error but customer not found in DB')
        else:
            logger.warning('Erxes: customer create failed: %s', errors)
        return None

    # ═══════════════════════════════════════════════════════════
    # Deal Operations
    # ═══════════════════════════════════════════════════════════

    def create_deal(self, name: str, stage_name: str = 'new',
                    customer_ids: Optional[List[str]] = None) -> Optional[Dict]:
        """Create a deal in the pipeline at the specified stage.

        Returns:
            The deal dict, or None when the stage is unknown or creation failed.
        """
        sid = self.stage_id(stage_name)  # loads the stage map on first use
        if not sid:
            logger.error('Erxes: unknown stage "%s"', stage_name)
            return None

        mutation = '''
            mutation AddDeal($name: String!, $stageId: String!, $customerIds: [String]) {
                dealsAdd(name: $name, stageId: $stageId, customerIds: $customerIds) {
                    _id name stageId
                }
            }'''
        r = self._gql(mutation, {
            'name': name,
            'stageId': sid,
            'customerIds': customer_ids or [],
        })
        deal = self._field(r, 'dealsAdd')
        errors = self._errors(r)

        if deal:
            logger.info('Erxes: created deal "%s" in stage %s', name, stage_name)
            return deal
        if self._is_webhook_error(errors):
            # Deal was created in MongoDB despite webhook error -- find it
            found = self.find_deal_by_name(name)
            if found:
                logger.info('Erxes: created deal "%s" in stage %s (webhook warning suppressed)', name, stage_name)
                return found
            logger.warning('Erxes: deal create had webhook error but deal not found in DB')
        else:
            logger.warning('Erxes: deal create failed: %s', errors)
        return None

    def move_deal(self, deal_id: str, new_stage_name: str) -> Optional[Dict]:
        """Move a deal to a different pipeline stage.

        Note: Erxes v1 returns 'Url is invalid' error from a post-update
        webhook, but the DB mutation succeeds. We treat this as success and
        return a synthetic dict flagged with ``webhook_warning``.

        Returns:
            The updated deal dict, or None when the stage is unknown or the
            update failed.
        """
        sid = self.stage_id(new_stage_name)
        if not sid:
            logger.error('Erxes: unknown stage "%s"', new_stage_name)
            return None

        mutation = '''
            mutation EditDeal($id: String!, $stageId: String) {
                dealsEdit(_id: $id, stageId: $stageId) {
                    _id name stageId
                }
            }'''
        r = self._gql(mutation, {'id': deal_id, 'stageId': sid})
        deal = self._field(r, 'dealsEdit')
        errors = self._errors(r)

        if deal:
            logger.info('Erxes: moved deal %s to stage %s', deal_id, new_stage_name)
            return deal
        if self._is_webhook_error(errors):
            # "Url is invalid" is a non-fatal webhook error -- the DB update succeeds
            logger.info('Erxes: moved deal %s to stage %s (webhook warning suppressed)', deal_id, new_stage_name)
            return {'_id': deal_id, 'stageId': sid, 'webhook_warning': True}
        logger.warning('Erxes: deal move failed: %s', errors)
        return None

    def find_deal_by_name(self, name: str) -> Optional[Dict]:
        """Find the first deal whose name contains *name* (case-insensitive).

        An empty *name* returns None without querying: the empty string is a
        substring of every name and would otherwise select an arbitrary deal.
        """
        if not name:
            return None
        r = self._gql('{ deals { _id name stageId customerIds } }')
        name_lower = name.lower()
        for d in self._field(r, 'deals', []):
            if name_lower in (d.get('name') or '').lower():
                return d
        return None

    def list_deals_by_stage(self, stage_name: str) -> List[Dict]:
        """List all deals in a given stage; empty list if the stage is unknown."""
        sid = self.stage_id(stage_name)
        if not sid:
            return []
        r = self._gql('{ deals { _id name stageId customerIds } }')
        return [d for d in self._field(r, 'deals', []) if d.get('stageId') == sid]

    def get_pipeline_status(self) -> Dict:
        """Get full pipeline status: deals grouped by stage with counts.

        Returns:
            ``{'pipeline': {stage_name: {'stage_id', 'count', 'deals'}},
            'total_deals': int}``.  Issues one deals query per known stage.
        """
        self._ensure_stage_map()
        pipeline = {}
        total = 0
        for name, sid in self._stage_map.items():
            r = self._gql(
                'query($sid: String!) { deals(stageId: $sid) { _id name } }',
                {'sid': sid}
            )
            deals = self._field(r, 'deals', [])
            pipeline[name] = {
                'stage_id': sid,
                'count': len(deals),
                'deals': [{'id': d['_id'], 'name': d['name']} for d in deals],
            }
            total += len(deals)
        return {'pipeline': pipeline, 'total_deals': total}

    # ═══════════════════════════════════════════════════════════
    # Prospect Sync — bidirectional HARTOS <-> Erxes
    # ═══════════════════════════════════════════════════════════

    def sync_prospect_to_erxes(self, prospect: Dict) -> Dict:
        """Sync a HARTOS prospect to Erxes (customer + deal).

        Uses existing IDs from the prospect if available to avoid duplicates.

        Args:
            prospect: Dict read for the keys ``erxes_customer_id``,
                ``erxes_deal_id``, ``contact_name``, ``company``, ``email``,
                ``stage`` and ``vertical``.

        Returns:
            Dict with ``synced`` plus ``erxes_customer_id`` / ``erxes_deal_id``
            when they are known.  ``synced`` stays False when no customer
            could be resolved (including a prospect without an email).
        """
        result = {'synced': False}

        # Use existing customer ID or create/find
        customer_id = prospect.get('erxes_customer_id')
        if not customer_id:
            email = prospect.get('email')
            if not email:
                logger.warning('Erxes: prospect has no email; cannot sync customer')
                return result
            name_parts = (prospect.get('contact_name') or '').split(None, 1)
            first_name = name_parts[0] if name_parts else prospect.get('company', '')
            last_name = name_parts[1] if len(name_parts) > 1 else ''
            customer = self.create_customer(first_name, last_name, email)
            if customer:
                customer_id = customer['_id']

        if customer_id:
            result['erxes_customer_id'] = customer_id

            # Use existing deal ID or find/create
            deal_id = prospect.get('erxes_deal_id')
            if deal_id:
                # Already linked -- just ensure correct stage
                target_stage = prospect.get('stage', 'new')
                if self.stage_id(target_stage):
                    self.move_deal(deal_id, target_stage)
                result['erxes_deal_id'] = deal_id
            else:
                # Find existing deal by company name
                deal = self.find_deal_by_name(prospect.get('company', ''))
                if not deal:
                    deal_name = '%s - %s' % (
                        prospect.get('company', 'Unknown'),
                        prospect.get('vertical', 'General').replace('_', ' ').title()
                    )
                    deal = self.create_deal(
                        name=deal_name,
                        stage_name=prospect.get('stage', 'new'),
                        customer_ids=[customer_id],
                    )
                if deal:
                    result['erxes_deal_id'] = deal['_id']

            result['synced'] = True

        return result

    def sync_stage_change(self, prospect: Dict, new_stage: str) -> bool:
        """Sync a stage change from HARTOS to the Erxes deal pipeline.

        Returns:
            True when the deal was found and moved, False otherwise.
        """
        deal_id = prospect.get('erxes_deal_id')
        if not deal_id:
            # Try to find the deal
            deal = self.find_deal_by_name(prospect.get('company', ''))
            if deal:
                deal_id = deal['_id']

        if not deal_id:
            return False
        return self.move_deal(deal_id, new_stage) is not None

    # ═══════════════════════════════════════════════════════════
    # Health / Status
    # ═══════════════════════════════════════════════════════════

    def is_available(self) -> bool:
        """Check if the Erxes API is reachable and we can authenticate."""
        try:
            self._login()
            return self._logged_in
        except Exception:
            return False

    def status(self) -> Dict:
        """Get connection status and, when available, the pipeline summary."""
        available = self.is_available()
        result = {
            'available': available,
            'api_url': self.api_url,
            'email': self.email,
            'logged_in': self._logged_in,
        }
        if available:
            result['pipeline'] = self.get_pipeline_status()
        return result
def get_erxes_client() -> Optional[ErxesCRMClient]:
    """Return the shared Erxes client, building it on the first call.

    Configuration is read from the environment: ERXES_API_URL, ERXES_EMAIL
    and ERXES_PASSWORD.

    Returns:
        The singleton client, or None when ERXES_API_URL is unset/empty or,
        on first creation, when the email or password is missing.
    """
    global _client_instance

    endpoint = os.environ.get('ERXES_API_URL', '')
    if endpoint == '':
        return None

    with _client_lock:
        # An already-built client is reused as-is.
        if _client_instance is not None:
            return _client_instance

        user = os.environ.get('ERXES_EMAIL', '')
        secret = os.environ.get('ERXES_PASSWORD', '')
        if not (user and secret):
            logger.warning('ERXES_EMAIL and ERXES_PASSWORD must be set')
            return None

        _client_instance = ErxesCRMClient(endpoint, user, secret)
        logger.info('Erxes client initialized: %s', endpoint)
        return _client_instance