Coverage for integrations/vlm/vlm_adapter.py: 98.3%

59 statements  

coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2vlm_adapter.py - Three-tier VLM execution adapter. 

3 

4Follows the same pattern as Nunba's hartos_backend_adapter.py: 

5- Tier 1: In-process (NUNBA_BUNDLED + pyautogui + OmniParser direct import) 

6- Tier 2: HTTP local (flat mode, localhost:5001 screenshot/execute + localhost:8080 parse) 

7- Tier 3: Crossbar WAMP (central/regional mode - caller handles via subscribe_and_return) 

8 

9Reuses existing env vars: NUNBA_BUNDLED, sys.frozen, HEVOLVE_NODE_TIER. 

10Circuit breaker: 2 consecutive failures → skip tier (same as hartos_backend_adapter.py). 

11""" 

12 

13import os 

14import sys 

15import time 

16import logging 

17import threading 

18 

19logger = logging.getLogger('hevolve.vlm_adapter') 

# ---------------------------------------------------------------------------
# Reuse existing bundled-mode flag (same as hart_intelligence:329)
# ---------------------------------------------------------------------------
_BUNDLED_MODE = bool(os.environ.get('NUNBA_BUNDLED') or getattr(sys, 'frozen', False))

# ---------------------------------------------------------------------------
# Reuse existing node tier (same as create_recipe.py:285)
# ---------------------------------------------------------------------------
_node_tier = os.environ.get('HEVOLVE_NODE_TIER', 'flat')

# ---------------------------------------------------------------------------
# Circuit breaker (same as hartos_backend_adapter.py:49-52)
# ---------------------------------------------------------------------------
_tier1_fail_count = 0
_tier2_fail_count = 0
_FAIL_THRESHOLD = 2
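# Note: these counters only reset on a successful call at that tier or via
# reset_circuit_breakers() below; there is no time-based decay.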

# ---------------------------------------------------------------------------
# Tier 1: try direct import of pyautogui at module level
# ---------------------------------------------------------------------------
_HAS_PYAUTOGUI = False
try:
    import pyautogui  # noqa: F401
    _HAS_PYAUTOGUI = True
except ImportError:
    pass
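# Note: evaluated once at import time; installing pyautogui into an
# already-running process will not enable Tier 1 until the process restarts.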

# ---------------------------------------------------------------------------
# Probe cache (avoid hammering localhost every call)
# ---------------------------------------------------------------------------
_probe_cache = {'ts': 0, 'result': None}
_PROBE_TTL = 60  # seconds

def execute_vlm_instruction(message: dict) -> dict | None:
    """
    Three-tier VLM execution.

    Returns:
        dict - {status, extracted_responses, execution_time_seconds} on success
        None - signals caller to fall back to Tier 3 (Crossbar subscribe_and_return)
    """
    global _tier1_fail_count, _tier2_fail_count

    # Tier 1: In-process (deps available + circuit breaker open)
    # Standalone HARTOS with pyautogui works - no need for NUNBA_BUNDLED gate
    if _HAS_PYAUTOGUI and _tier1_fail_count < _FAIL_THRESHOLD:
        try:
            from integrations.vlm.local_loop import run_local_agentic_loop
            result = run_local_agentic_loop(message, tier='inprocess')
            _tier1_fail_count = 0  # reset on success
            return result
        except Exception as e:
            _tier1_fail_count += 1
            logger.warning(
                f"VLM Tier 1 (in-process) failed "
                f"({_tier1_fail_count}/{_FAIL_THRESHOLD}): {e}"
            )

    # Tier 2: HTTP local (flat mode + circuit breaker open)
    if _node_tier == 'flat' and _tier2_fail_count < _FAIL_THRESHOLD:
        try:
            from integrations.vlm.local_loop import run_local_agentic_loop
            result = run_local_agentic_loop(message, tier='http')
            _tier2_fail_count = 0  # reset on success
            return result
        except Exception as e:
            _tier2_fail_count += 1
            logger.warning(
                f"VLM Tier 2 (HTTP local) failed "
                f"({_tier2_fail_count}/{_FAIL_THRESHOLD}): {e}"
            )

    # Tier 3: Crossbar WAMP - return None so caller uses subscribe_and_return()
    logger.info("VLM routing to Tier 3 (Crossbar WAMP)")
    return None
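# ---------------------------------------------------------------------------
# Illustrative caller sketch (not part of this module): how the None return
# is meant to be consumed. `subscribe_and_return` stands in for the caller's
# Crossbar helper named in the docstring; its exact signature is an assumption.
#
#     def dispatch_vlm(message: dict, subscribe_and_return) -> dict:
#         result = execute_vlm_instruction(message)
#         if result is not None:
#             return result  # handled by Tier 1 (in-process) or Tier 2 (HTTP)
#         # None means "route via Tier 3": hand off over Crossbar WAMP
#         return subscribe_and_return(message)
# ---------------------------------------------------------------------------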


def check_vlm_available() -> bool:
    """
    Quick availability check - replaces the 2-second Crossbar ping.

    Returns True if at least one tier is expected to work.
    """
    # Tier 1: available if pyautogui present (standalone or bundled)
    if _HAS_PYAUTOGUI:
        return True

    # Tier 2: flat mode - probe local services (cached)
    if _node_tier == 'flat':
        if _probe_local_services():
            return True

    # Tier 3: Crossbar assumed available (regional/central, or the flat-mode
    # fallback when local probes fail); actual connectivity is checked by
    # subscribe_and_return at call time
    return True

def _probe_local_services() -> bool:
    """Check if OmniParser (:8080) and omnitool-gui (:5001) are reachable."""
    now = time.time()
    if now - _probe_cache['ts'] < _PROBE_TTL and _probe_cache['result'] is not None:
        return _probe_cache['result']

    result = False
    try:
        import requests as _req
        # Quick health checks with short timeout
        omni_port = os.environ.get('OMNIPARSER_PORT', '8080')
        gui_port = os.environ.get('VLM_GUI_PORT', '5001')
        omni_ok = _req.get(f'http://localhost:{omni_port}/', timeout=2).status_code < 500
        gui_ok = _req.get(f'http://localhost:{gui_port}/', timeout=2).status_code < 500
        result = omni_ok and gui_ok
    except Exception:
        result = False

    _probe_cache['ts'] = now
    _probe_cache['result'] = result
    return result

def reset_circuit_breakers():
    """Reset all circuit breakers (useful after config change or reconnect)."""
    global _tier1_fail_count, _tier2_fail_count
    _tier1_fail_count = 0
    _tier2_fail_count = 0
    _probe_cache['ts'] = 0
    _probe_cache['result'] = None
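# ---------------------------------------------------------------------------
# Illustrative wiring sketch (assumption, not part of this module): a caller's
# reconnect handler could clear the breakers so previously skipped tiers get
# retried on the next instruction, and also force a fresh local-service probe.
#
#     def on_crossbar_reconnect():
#         reset_circuit_breakers()  # Tier 1/2 get another chance; probe cache cleared
# ---------------------------------------------------------------------------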