Coverage for integrations / vlm / vlm_adapter.py: 98.3%
59 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2vlm_adapter.py - Three-tier VLM execution adapter.
4Follows the same pattern as Nunba's hartos_backend_adapter.py:
5- Tier 1: In-process (NUNBA_BUNDLED + pyautogui + OmniParser direct import)
6- Tier 2: HTTP local (flat mode, localhost:5001 screenshot/execute + localhost:8080 parse)
7- Tier 3: Crossbar WAMP (central/regional mode - caller handles via subscribe_and_return)
9Reuses existing env vars: NUNBA_BUNDLED, sys.frozen, HEVOLVE_NODE_TIER.
10Circuit breaker: 2 consecutive failures → skip tier (same as hartos_backend_adapter.py).
11"""
13import os
14import sys
15import time
16import logging
17import threading
19logger = logging.getLogger('hevolve.vlm_adapter')
21# ---------------------------------------------------------------------------
22# Reuse existing bundled-mode flag (same as hart_intelligence:329)
23# ---------------------------------------------------------------------------
24_BUNDLED_MODE = bool(os.environ.get('NUNBA_BUNDLED') or getattr(sys, 'frozen', False))
26# ---------------------------------------------------------------------------
27# Reuse existing node tier (same as create_recipe.py:285)
28# ---------------------------------------------------------------------------
29_node_tier = os.environ.get('HEVOLVE_NODE_TIER', 'flat')
31# ---------------------------------------------------------------------------
32# Circuit breaker (same as hartos_backend_adapter.py:49-52)
33# ---------------------------------------------------------------------------
34_tier1_fail_count = 0
35_tier2_fail_count = 0
36_FAIL_THRESHOLD = 2
38# ---------------------------------------------------------------------------
39# Tier 1: try direct import of pyautogui at module level
40# ---------------------------------------------------------------------------
41_HAS_PYAUTOGUI = False
42try:
43 import pyautogui # noqa: F401
44 _HAS_PYAUTOGUI = True
45except ImportError:
46 pass
48# ---------------------------------------------------------------------------
49# Probe cache (avoid hammering localhost every call)
50# ---------------------------------------------------------------------------
51_probe_cache = {'ts': 0, 'result': None}
52_PROBE_TTL = 60 # seconds
55def execute_vlm_instruction(message: dict) -> dict | None:
56 """
57 Three-tier VLM execution.
59 Returns:
60 dict - {status, extracted_responses, execution_time_seconds} on success
61 None - signals caller to fall back to Tier 3 (Crossbar subscribe_and_return)
62 """
63 global _tier1_fail_count, _tier2_fail_count
65 # Tier 1: In-process (deps available + circuit breaker open)
66 # Standalone HARTOS with pyautogui works — no need for NUNBA_BUNDLED gate
67 if _HAS_PYAUTOGUI and _tier1_fail_count < _FAIL_THRESHOLD:
68 try:
69 from integrations.vlm.local_loop import run_local_agentic_loop
70 result = run_local_agentic_loop(message, tier='inprocess')
71 _tier1_fail_count = 0 # reset on success
72 return result
73 except Exception as e:
74 _tier1_fail_count += 1
75 logger.warning(
76 f"VLM Tier 1 (in-process) failed "
77 f"({_tier1_fail_count}/{_FAIL_THRESHOLD}): {e}"
78 )
80 # Tier 2: HTTP local (flat mode + circuit breaker open)
81 if _node_tier == 'flat' and _tier2_fail_count < _FAIL_THRESHOLD:
82 try:
83 from integrations.vlm.local_loop import run_local_agentic_loop
84 result = run_local_agentic_loop(message, tier='http')
85 _tier2_fail_count = 0 # reset on success
86 return result
87 except Exception as e:
88 _tier2_fail_count += 1
89 logger.warning(
90 f"VLM Tier 2 (HTTP local) failed "
91 f"({_tier2_fail_count}/{_FAIL_THRESHOLD}): {e}"
92 )
94 # Tier 3: Crossbar WAMP - return None so caller uses subscribe_and_return()
95 logger.info("VLM routing to Tier 3 (Crossbar WAMP)")
96 return None
99def check_vlm_available() -> bool:
100 """
101 Quick availability check - replaces the 2-second Crossbar ping.
103 Returns True if at least one tier is expected to work.
104 """
105 # Tier 1: available if pyautogui present (standalone or bundled)
106 if _HAS_PYAUTOGUI:
107 return True
109 # Tier 2: flat mode - probe local services (cached)
110 if _node_tier == 'flat':
111 if _probe_local_services():
112 return True
114 # Tier 3: Crossbar assumed available for regional/central
115 # (actual connectivity checked by subscribe_and_return at call time)
116 return True
119def _probe_local_services() -> bool:
120 """Check if OmniParser (:8080) and omnitool-gui (:5001) are reachable."""
121 global _probe_cache
122 now = time.time()
123 if now - _probe_cache['ts'] < _PROBE_TTL and _probe_cache['result'] is not None:
124 return _probe_cache['result']
126 result = False
127 try:
128 import requests as _req
129 # Quick health checks with short timeout
130 omni_port = os.environ.get('OMNIPARSER_PORT', '8080')
131 gui_port = os.environ.get('VLM_GUI_PORT', '5001')
132 omni_ok = _req.get(f'http://localhost:{omni_port}/', timeout=2).status_code < 500
133 gui_ok = _req.get(f'http://localhost:{gui_port}/', timeout=2).status_code < 500
134 result = omni_ok and gui_ok
135 except Exception:
136 result = False
138 _probe_cache['ts'] = now
139 _probe_cache['result'] = result
140 return result
143def reset_circuit_breakers():
144 """Reset all circuit breakers (useful after config change or reconnect)."""
145 global _tier1_fail_count, _tier2_fail_count
146 _tier1_fail_count = 0
147 _tier2_fail_count = 0
148 _probe_cache['ts'] = 0
149 _probe_cache['result'] = None