Coverage for integrations / vlm / qwen3vl_backend.py: 79.4%

373 statements  


1""" 

2qwen3vl_backend.py - Unified Qwen3-VL backend for Computer Use. 

3 

4Replaces the 3-model pipeline (OmniParser + MiniCPM + separate LLM) with a 

5single Qwen3-VL call that handles screen parsing, bbox grounding, scene 

6description, and action reasoning in one pass. 

7 

8Qwen3-VL returns bounding boxes in normalized [0, 1000] coordinates. 

9This module converts them to pixel coordinates for pyautogui consumption. 
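
For example, a normalized bbox [500, 500, 600, 600] on a 1920x1080
screenshot becomes the pixel bbox [960, 540, 1152, 648] (each value is
scaled by dimension / 1000).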

Usage:
    backend = get_qwen3vl_backend()
    result = backend.parse_and_reason(screenshot_b64, "Click the Save button")
    # result = {screen_info, parsed_content_list, action_json, reasoning}
"""

import os
import io
import json
import re
import base64
import logging
import time

logger = logging.getLogger('hevolve.vlm.qwen3vl_backend')

_instance = None

# Prompt for unified screen parsing + action reasoning
UNIFIED_PROMPT = """You are a computer use agent analyzing a screenshot.

Task: {instruction}

Analyze the screenshot and:
1. Identify all visible UI elements (buttons, text fields, links, menus, icons, checkboxes, tabs).
2. For each element, provide its bounding box as [x1, y1, x2, y2] in pixel coordinates.
3. Given the task, decide the next action.

Output ONLY valid JSON:
{{
    "UI_Elements": [
        {{"id": 1, "type": "button", "label": "element text", "bbox": [x1, y1, x2, y2]}},
        ...
    ],
    "Reasoning": "Brief explanation of current screen state and why this action is needed",
    "Next Action": "left_click | right_click | double_click | type | key | hotkey | scroll_up | scroll_down | wait | None",
    "Box ID": null,
    "coordinate": [x, y],
    "value": "text to type or key to press (if applicable)",
    "Status": "IN_PROGRESS | DONE"
}}

When the task is complete, set "Next Action": "None" and "Status": "DONE".
If clicking a UI element, set "Box ID" to the element's id and "coordinate" to its center."""
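
# Illustrative UNIFIED_PROMPT response (values are made up; the model may
# also return bbox values in [0, 1000] normalized space, which the caller
# rescales via _is_normalized_1000 / _normalize_bbox below):
# {
#   "UI_Elements": [{"id": 1, "type": "button", "label": "Save",
#                    "bbox": [880, 40, 940, 70]}],
#   "Reasoning": "The document is open; clicking Save completes the task.",
#   "Next Action": "left_click",
#   "Box ID": 1,
#   "coordinate": [910, 55],
#   "value": "",
#   "Status": "IN_PROGRESS"
# }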

# Prompt for screen parsing only (drop-in replacement for OmniParser).
# Sent verbatim (never .format()-ed), so the braces below are single.
PARSE_ONLY_PROMPT = """Analyze this screenshot. List every visible UI element.

For each element provide:
- Sequential ID number
- Element type (button, textfield, link, icon, menu, tab, checkbox, label, image, dropdown)
- Label or text content
- Bounding box as [x1, y1, x2, y2] in pixel coordinates

Output ONLY valid JSON:
{
    "UI_Elements": [
        {"id": 1, "type": "button", "label": "Save", "bbox": [100, 50, 200, 80]},
        {"id": 2, "type": "textfield", "label": "filename", "bbox": [210, 50, 400, 80]}
    ]
}"""


def get_qwen3vl_backend():
    """Get singleton Qwen3VLBackend instance."""
    global _instance
    if _instance is None:
        _instance = Qwen3VLBackend()
    return _instance


class Qwen3VLBackend:
    """Unified screen parsing + action reasoning via Qwen3-VL."""

    def __init__(self, base_url=None, model_name=None):
        try:
            from core.port_registry import get_port
            _llm_port = get_port('llm')
        except Exception as _port_err:
            # Sensible fallback: port_registry unavailable means we are
            # running outside the bundled Nunba context (test / standalone).
            # Honour HEVOLVE_LLM_PORT env or default 8080.
            logger.debug(f"port_registry unavailable, using env/8080: {_port_err}")
            _llm_port = int(os.environ.get('HEVOLVE_LLM_PORT', 8080))
        self.base_url = base_url or os.environ.get(
            'HEVOLVE_VLM_ENDPOINT_URL',
            os.environ.get('HEVOLVE_LLM_ENDPOINT_URL', f'http://127.0.0.1:{_llm_port}/v1')
        )
        self.model_name = model_name or os.environ.get(
            'HEVOLVE_VLM_MODEL_NAME',
            os.environ.get('HEVOLVE_LLM_MODEL_NAME', 'local')
        )
        self.api_key = os.environ.get(
            'HEVOLVE_VLM_API_KEY',
            os.environ.get('HEVOLVE_LLM_API_KEY', 'dummy')
        )
        self.timeout = int(os.environ.get('HEVOLVE_VLM_TIMEOUT', '90'))

    def parse_and_reason(self, screenshot_b64, task_instruction, history=None):
        """
        Single call: screenshot → UI elements + bbox + action decision.

        Args:
            screenshot_b64: Base64-encoded PNG screenshot
            task_instruction: What the user wants done
            history: Optional conversation history (list of message dicts)

        Returns:
            dict with keys:
                - screen_info: str (ID→label text for display)
                - parsed_content_list: list of {id, type, label, bbox}
                - action_json: dict with Next Action, coordinate, value, Status
                - reasoning: str
                - latency: float
        """
        prompt_text = UNIFIED_PROMPT.format(instruction=task_instruction)
        start = time.time()

        messages = list(history) if history else []
        messages.append({
            "role": "user",
            "content": [
                {"type": "text", "text": prompt_text},
                {"type": "image_url", "image_url": {
                    "url": f"data:image/jpeg;base64,{screenshot_b64}"
                }},
            ]
        })

        raw = self._call_api(messages)
        latency = time.time() - start

        parsed = self._parse_unified_response(raw)

        # Get image dimensions for coordinate normalization
        img_w, img_h = self._get_image_dimensions(screenshot_b64)

        # Build OmniParser-compatible output
        ui_elements = parsed.get('UI_Elements', [])
        normalized_elements = []
        screen_info_lines = []

        for elem in ui_elements:
            bbox = elem.get('bbox', [])
            if len(bbox) == 4 and self._is_normalized_1000(bbox, img_w, img_h):
                bbox = self._normalize_bbox(bbox, img_w, img_h)

            normalized_elements.append({
                'idx': elem.get('id', 0),
                'type': elem.get('type', 'unknown'),
                'content': elem.get('label', ''),
                'bbox': bbox,
            })
            screen_info_lines.append(
                f"{elem.get('id', 0)}: {elem.get('type', '')} \"{elem.get('label', '')}\""
            )

        # Resolve Box ID → coordinate if needed
        action_json = {
            'Reasoning': parsed.get('Reasoning', ''),
            'Next Action': parsed.get('Next Action', 'None'),
            'Box ID': parsed.get('Box ID'),
            'coordinate': parsed.get('coordinate'),
            'value': parsed.get('value', ''),
            'Status': parsed.get('Status', 'IN_PROGRESS'),
        }

        if action_json['coordinate'] is None and action_json['Box ID'] is not None:
            for elem in normalized_elements:
                if elem['idx'] == action_json['Box ID']:
                    bbox = elem['bbox']
                    if len(bbox) == 4:
                        action_json['coordinate'] = [
                            int((bbox[0] + bbox[2]) / 2),
                            int((bbox[1] + bbox[3]) / 2),
                        ]
                    break

        return {
            'screen_info': '\n'.join(screen_info_lines),
            'parsed_content_list': normalized_elements,
            'action_json': action_json,
            'reasoning': parsed.get('Reasoning', ''),
            'latency': latency,
        }

    def parse_screen(self, screenshot_b64):
        """
        Screen parsing only — drop-in replacement for local_omniparser.parse_screen.

        Returns the same dict format as OmniParser for backward compatibility.
        """
        start = time.time()

        messages = [{
            "role": "user",
            "content": [
                {"type": "text", "text": PARSE_ONLY_PROMPT},
                {"type": "image_url", "image_url": {
                    "url": f"data:image/jpeg;base64,{screenshot_b64}"
                }},
            ]
        }]

        raw = self._call_api(messages)
        latency = time.time() - start

        parsed = self._parse_unified_response(raw)
        img_w, img_h = self._get_image_dimensions(screenshot_b64)

        ui_elements = parsed.get('UI_Elements', [])
        content_list = []
        screen_info_lines = []

        for elem in ui_elements:
            bbox = elem.get('bbox', [])
            if len(bbox) == 4 and self._is_normalized_1000(bbox, img_w, img_h):
                bbox = self._normalize_bbox(bbox, img_w, img_h)

            content_list.append({
                'idx': elem.get('id', 0),
                'type': elem.get('type', 'unknown'),
                'content': elem.get('label', ''),
                'bbox': bbox,
            })
            screen_info_lines.append(
                f"{elem.get('id', 0)}: {elem.get('type', '')} \"{elem.get('label', '')}\""
            )

        return {
            'screen_info': '\n'.join(screen_info_lines),
            'parsed_content_list': content_list,
            'som_image_base64': screenshot_b64,
            'original_screenshot_base64': screenshot_b64,
            'width': img_w,
            'height': img_h,
            'latency': latency,
        }

    # Taskbar keywords — if the task mentions any of these, use the
    # taskbar_list strategy
    _TASKBAR_KEYWORDS = {
        'taskbar', 'start button', 'start menu', 'search icon', 'search bar',
        'chrome', 'edge', 'firefox', 'file explorer', 'explorer icon',
        'clock', 'time display', 'system tray', 'notification', 'volume',
        'wifi', 'network', 'battery', 'spotify', 'discord', 'teams',
        'pinned', 'xbox', 'game bar',
        # App names that are typically in the taskbar
        'open chrome', 'open edge', 'open firefox', 'open explorer',
        'open spotify', 'open discord', 'open teams', 'open steam',
        'launch chrome', 'launch edge', 'launch firefox',
    }

    # Action keywords for detecting non-click actions from task text
    _RIGHT_CLICK_KEYWORDS = {'right-click', 'right click', 'context menu', 'rightclick'}
    _DOUBLE_CLICK_KEYWORDS = {'double-click', 'double click', 'doubleclick'}
    _SCROLL_DOWN_KEYWORDS = {'scroll down', 'scroll below', 'page down'}
    _SCROLL_UP_KEYWORDS = {'scroll up', 'scroll above', 'page up'}

    def _get_os_context(self):
        """Get OS window list with foreground/z-index info for grounding context."""
        try:
            import subprocess, platform
            from core.subprocess_safe import hidden_popen_kwargs
            _hide = hidden_popen_kwargs()
            _os = platform.system()
            if _os == 'Windows':
                # Get the foreground window title via PowerShell.
                # _hide adds CREATE_NO_WINDOW so the powershell child does
                # not flicker a console window on every VLM probe.
                _fg = subprocess.run(
                    ['powershell', '-NoProfile', '-Command',
                     'Add-Type @"\nusing System;\nusing System.Runtime.InteropServices;\n'
                     'public class FG { [DllImport("user32.dll")] public static extern IntPtr GetForegroundWindow(); '
                     '[DllImport("user32.dll")] public static extern int GetWindowText(IntPtr h, System.Text.StringBuilder t, int c); }\n"@; '
                     '$h=[FG]::GetForegroundWindow(); $sb=New-Object System.Text.StringBuilder 256; '
                     '[void][FG]::GetWindowText($h,$sb,256); $sb.ToString()'],
                    capture_output=True, text=True, timeout=5, **_hide)
                fg_title = _fg.stdout.strip() if _fg.returncode == 0 else ''

                # Get all windows
                _r = subprocess.run(
                    ['powershell', '-NoProfile', '-Command',
                     'Get-Process | Where-Object {$_.MainWindowTitle -ne ""} | '
                     'Select-Object ProcessName, MainWindowTitle | ConvertTo-Json'],
                    capture_output=True, text=True, timeout=5, **_hide)
                if _r.returncode == 0:
                    _wins = json.loads(_r.stdout)
                    if isinstance(_wins, dict):
                        _wins = [_wins]
                    _win_list = ', '.join(f'{w["ProcessName"]}:{w["MainWindowTitle"]}'
                                          for w in _wins if w.get('MainWindowTitle'))
                    fg_info = f' FOREGROUND (topmost): "{fg_title}".' if fg_title else ''
                    return f'OS: Windows.{fg_info} Open windows: [{_win_list}]\n'
            elif _os == 'Linux':
                # Get the foreground window
                _fg = subprocess.run(['xdotool', 'getactivewindow', 'getwindowname'],
                                     capture_output=True, text=True, timeout=3)
                fg_title = _fg.stdout.strip() if _fg.returncode == 0 else ''
                _r = subprocess.run(['wmctrl', '-l'], capture_output=True, text=True, timeout=3)
                if _r.returncode == 0:
                    fg_info = f' FOREGROUND: "{fg_title}".' if fg_title else ''
                    return f'OS: Linux.{fg_info} Open windows: [{_r.stdout.strip()}]\n'
            elif _os == 'Darwin':
                # Get the frontmost app
                _fg = subprocess.run(
                    ['osascript', '-e',
                     'tell application "System Events" to get name of first process whose frontmost is true'],
                    capture_output=True, text=True, timeout=3)
                fg_title = _fg.stdout.strip() if _fg.returncode == 0 else ''
                _r = subprocess.run(
                    ['osascript', '-e',
                     'tell application "System Events" to get name of every process whose visible is true'],
                    capture_output=True, text=True, timeout=3)
                if _r.returncode == 0:
                    fg_info = f' FOREGROUND: "{fg_title}".' if fg_title else ''
                    return f'OS: macOS.{fg_info} Visible apps: [{_r.stdout.strip()}]\n'
        except Exception as e:
            # OS-context probes are nice-to-have: the VLM still works
            # without them. Log so a silent fallback doesn't mask a broken
            # probe (osascript/wmctrl/PowerShell missing).
            logger.debug(f"_get_os_context probe failed: {e}")
        return ''
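
    # Illustrative return value on Windows (window titles are made up):
    #   'OS: Windows. FOREGROUND (topmost): "Untitled - Notepad". '
    #   'Open windows: [chrome:New Tab - Google Chrome, notepad:Untitled - Notepad]\n'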

    def _detect_action_type(self, task, raw_response=''):
        """Detect the action type from the task text and VLM response.

        Returns one of: 'left_click', 'right_click', 'double_click',
        'scroll_up', 'scroll_down' ('left_click' when nothing matches).
        """
        task_lower = task.lower()
        raw_lower = raw_response.lower()
        combined = task_lower + ' ' + raw_lower

        if any(kw in combined for kw in self._RIGHT_CLICK_KEYWORDS):
            return 'right_click'
        if any(kw in combined for kw in self._DOUBLE_CLICK_KEYWORDS):
            return 'double_click'
        if any(kw in combined for kw in self._SCROLL_DOWN_KEYWORDS):
            return 'scroll_down'
        if any(kw in combined for kw in self._SCROLL_UP_KEYWORDS):
            return 'scroll_up'
        return 'left_click'
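
    # Keyword-scan examples (first match wins, left_click is the default):
    #   _detect_action_type('right click the file')      -> 'right_click'
    #   _detect_action_type('double click the icon')     -> 'double_click'
    #   _detect_action_type('scroll down to the footer') -> 'scroll_down'
    #   _detect_action_type('click Save')                -> 'left_click'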

    def _parse_action_response(self, raw, img_w, img_h, task=''):
        """Parse a VLM response into an action dict. Returns
        ``(result_dict, nx, ny)`` or ``(result_dict, None, None)``.

        Phase 5: thin shim onto :func:`integrations.vlm.parser.parse_vlm_action`
        with ``expected_shape='point_only'``. The byte-equivalent legacy
        fields are reproduced via :meth:`ParsedAction.to_point_action_dict`.

        The ``img_w``/``img_h`` args are kept for back-compat — historically
        the function fell back to image dims when pyautogui.size() failed.
        The pyautogui screen size is the source of truth (we use it for the
        actual click), so we pass it through to the parser as the scaling
        target.
        """
        from integrations.vlm.parser import parse_vlm_action
        try:
            import pyautogui as _pag
            _screen_w, _screen_h = _pag.size()
        except Exception as _pag_err:
            # pyautogui can fail when no display is attached (CI /
            # headless). Fall back to image dims so the parser at least
            # produces stable norm_x/norm_y; downstream callers that need
            # true screen px will see them mismatch.
            logger.debug(f"pyautogui.size() unavailable, using image dims: {_pag_err}")
            _screen_w, _screen_h = img_w, img_h
        pa = parse_vlm_action(
            raw, expected_shape='point_only',
            task=task,
            screen_w=_screen_w, screen_h=_screen_h,
            detect_action_type=self._detect_action_type,
            scroll_down_keywords=self._SCROLL_DOWN_KEYWORDS,
            scroll_up_keywords=self._SCROLL_UP_KEYWORDS,
        )
        return pa.to_point_action_dict(), pa.norm_x, pa.norm_y

    def _is_taskbar_task(self, task):
        """Check if the task involves taskbar elements."""
        task_lower = task.lower()
        return any(kw in task_lower for kw in self._TASKBAR_KEYWORDS)

    def _taskbar_list_lookup(self, screenshot_b64, target_name):
        """
        Taskbar list strategy: ask the model to list ALL taskbar icons with
        coords, then find the target by name. Avg error=50, best for
        taskbar targets.

        Two passes: first ask the model for the full icon list, then match
        the target locally via alias-aware keyword scoring (more robust
        than matching on the raw task string alone).
        """
        list_raw = self._call_api([{
            "role": "user",
            "content": [
                {"type": "text", "text": (
                    'List every icon in the taskbar at the bottom of the screen, from LEFT to RIGHT. '
                    'For each icon give its <point>x,y</point> location. Format:\n'
                    '1. [icon name] <point>x,y</point>\n'
                    '2. [icon name] <point>x,y</point>\n...'
                )},
                {"type": "image_url", "image_url": {
                    "url": f"data:image/jpeg;base64,{screenshot_b64}"
                }},
            ]
        }])

        # Extract all items with coords from the list
        items = []
        for line in list_raw.split('\n'):
            m = re.search(r'<point>\s*(\d+)\s*,\s*(\d+)\s*</point>', line)
            if m:
                items.append((int(m.group(1)), int(m.group(2)), line.strip()))

        if not items:
            return None, list_raw

        # Smart matching: extract target keywords and score each item.
        # Map common task phrases to icon names.
        _ALIASES = {
            'start': ['start', 'windows', 'menu'],
            'search': ['search', 'magnif'],
            'chrome': ['chrome', 'google'],
            'edge': ['edge', 'microsoft edge'],
            'explorer': ['explorer', 'file', 'folder'],
            'clock': ['clock', 'time', 'date'],
            'volume': ['volume', 'sound', 'speaker'],
            'network': ['network', 'wifi', 'internet'],
        }

        task_lower = target_name.lower()
        search_terms = []
        for key, aliases in _ALIASES.items():
            if key in task_lower:
                search_terms.extend(aliases)
        if not search_terms:
            # Fallback: use significant words from the task
            search_terms = [w for w in task_lower.split() if len(w) > 2
                            and w not in ('the', 'click', 'open', 'icon', 'button', 'taskbar')]

        best_match = None
        best_score = 0
        for nx, ny, line_text in items:
            line_lower = line_text.lower()
            score = sum(1 for term in search_terms if term in line_lower)
            if score > best_score:
                best_score = score
                best_match = (nx, ny, line_text)

        return best_match, list_raw
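
    # Matching example (the model list line is hypothetical):
    #   target_name='open chrome' -> search_terms ['chrome', 'google'];
    #   the item '3. Google Chrome <point>512,980</point>' scores 2 and
    #   wins, so the lookup returns ((512, 980, '3. Google Chrome ...'), list_raw).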

    # ─── Phase 10: P2P inference resolver ──────────────────────────────
    # Mobile devices can't competitively run a 4B+ multimodal model;
    # they capture, transmit to a paired peer (typically the user's
    # desktop Nunba) for inference, then execute the action locally.
    # This resolver picks where the VLM call goes based on
    # intelligence_preference + reachability. Plan §8 / §10.

    def dispatch_inference(self, request: dict, *,
                           peer_dispatch=None,
                           intelligence_preference: str = 'hybrid'
                           ) -> dict:
        """Pick the right tier for a VLM inference request and run it.

        Args:
            request: dict with at least
                ``{'method', 'screenshot_b64', 'task'}``. Optional keys:
                ``history``, ``window_rect``, ``platform``,
                ``request_id``, ``prefer_local``.
            peer_dispatch: optional callable
                ``peer_dispatch(channel, payload, timeout)`` to route to
                a paired peer over PeerLink. When None, only local +
                cloud tiers are considered.
            intelligence_preference: ``'local_only'`` (default for
                desktop) | ``'hybrid'`` (try local first, peer as
                fallback) | ``'hive'`` (prefer peer/hive when local is
                busy or unreachable).

        Returns:
            dict with grounding result + ``'tier'`` field set to
            whichever path executed: ``'local'`` | ``'paired_peer'`` |
            ``'hive'`` | ``'cloud'`` | ``'no_route'``.
        """
        method = request.get('method', 'point_and_act')
        screenshot_b64 = request.get('screenshot_b64', '')
        task = request.get('task', '')
        history = request.get('history')
        prefer_local = request.get('prefer_local', True)

        local_available = self._is_local_vlm_available()

        # Tier orderings per plan §10:
        #   local_only → local (or no_route)
        #   hybrid     → local → paired_peer → hive → cloud (always all 4)
        #   hive       → paired_peer → hive → local → cloud
        # Reviewer flagged that the prior 'hybrid' order excluded 'cloud'
        # when local was reachable, which contradicted the plan's "fall
        # through all four tiers" wording. Now matches.
        if intelligence_preference == 'local_only':
            tiers = ['local'] if local_available else []
        elif intelligence_preference == 'hive':
            tiers = ['paired_peer', 'hive']
            if local_available:
                tiers.append('local')
            tiers.append('cloud')
        else:  # 'hybrid' (default)
            tiers = []
            if local_available and prefer_local:
                tiers.append('local')
            tiers += ['paired_peer', 'hive']
            if local_available and not prefer_local:
                tiers.append('local')
            tiers.append('cloud')

        for tier in tiers:
            try:
                if tier == 'local':
                    result = self._dispatch_local(method, screenshot_b64,
                                                  task, history)
                elif tier == 'paired_peer':
                    if peer_dispatch is None:
                        continue
                    result = self._dispatch_paired_peer(
                        request, peer_dispatch)
                    if result is None:
                        continue
                elif tier == 'hive':
                    if peer_dispatch is None:
                        continue
                    result = self._dispatch_hive(request, peer_dispatch)
                    if result is None:
                        continue
                elif tier == 'cloud':
                    result = self._dispatch_cloud(request)
                    if result is None:
                        continue
                else:
                    continue
                result['tier'] = tier
                return result
            except Exception as e:
                logger.debug(f'tier {tier} failed: {e}')
                continue

        return {'tier': 'no_route',
                'error': f'no inference path available '
                         f'(intelligence_preference={intelligence_preference})'}
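
    # Resulting tier order, assuming local is reachable:
    #   local_only                  -> ['local']
    #   hybrid (prefer_local=True)  -> ['local', 'paired_peer', 'hive', 'cloud']
    #   hybrid (prefer_local=False) -> ['paired_peer', 'hive', 'local', 'cloud']
    #   hive                        -> ['paired_peer', 'hive', 'local', 'cloud']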

    def _is_local_vlm_available(self) -> bool:
        """Quick reachability probe for the local VLM endpoint.

        Uses ``self.base_url`` (constructor attribute) — an earlier
        version of this method referenced ``self.api_url``, which doesn't
        exist; a reviewer caught the typo before it shipped to a real
        caller. llama-server's /health returns 200 OK when ready, 503
        when warming up, and anything else when down.
        """
        try:
            from core.http_pool import pooled_get
            health_url = self.base_url.rstrip('/').replace('/v1', '') + '/health'
            r = pooled_get(health_url, timeout=1)
            return r.status_code == 200
        except Exception as e:
            logger.debug(f'_is_local_vlm_available probe failed: {e}')
            return False
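
    # e.g. base_url 'http://127.0.0.1:8080/v1' probes
    # 'http://127.0.0.1:8080/health'.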

    def _dispatch_local(self, method, screenshot_b64, task, history):
        """Execute the requested method against the local VLM."""
        if method == 'parse_and_reason':
            return self.parse_and_reason(screenshot_b64, task,
                                         history=history)
        if method == 'point_and_act':
            return self.point_and_act(screenshot_b64, task,
                                      history=history)
        # Default to point_and_act for unknown methods.
        return self.point_and_act(screenshot_b64, task, history=history)

    def _dispatch_paired_peer(self, request, peer_dispatch):
        """Route to a paired peer over the PeerLink compute channel.
        Same wire shape both sides agree on (see plan §8 for the
        request/response schemas)."""
        try:
            payload = dict(request, type='vlm_grounding')
            response = peer_dispatch('compute', payload, timeout=60)
            if response and response.get('type') == 'vlm_grounding_result':
                return response
        except Exception as e:
            logger.debug(f'paired peer dispatch failed: {e}')
        return None

    def _dispatch_hive(self, request, peer_dispatch):
        """Same shape as paired_peer but routed via the hivemind channel
        for hive-grade VLM nodes (compute-host tier)."""
        try:
            payload = dict(request, type='vlm_grounding')
            response = peer_dispatch('hivemind', payload, timeout=60)
            if response and response.get('type') == 'vlm_grounding_result':
                return response
        except Exception as e:
            logger.debug(f'hive dispatch failed: {e}')
        return None

    def _dispatch_cloud(self, request):
        """Last resort — Hevolve.ai cloud VLM via WorldModelBridge."""
        try:
            from integrations.world_model_bridge import dispatch_to_cloud
        except ImportError:
            return None
        try:
            return dispatch_to_cloud('vlm_grounding', request)
        except Exception as e:
            logger.debug(f'cloud dispatch failed: {e}')
            return None

    # ─── Phase 3.5: Complementary path router ──────────────────────────
    # The keystone of vlm_best_of_all_worlds_plan.md. The three sibling
    # methods (point_and_act / parse_and_reason / run_local_agentic_loop)
    # aren't competitors — each has a real specialty. route_task picks
    # the right path per task class instead of always hitting the same
    # primary first. See plan §13 for the full design rationale.

    # Compiled at module-import time. Word-boundary anchored so 'list'
    # inside 'specialist' doesn't trip the enumerate route. Patterns
    # ordered most-specific-first within each list.
    _ENUMERATE_PATTERNS = [
        re.compile(r'\blist (?:all|every|each)\b', re.I),
        re.compile(r"\bwhat(?:'s| is) on (?:the )?screen\b", re.I),
        re.compile(r'\bshow me (?:all|every|each)\b', re.I),
        re.compile(r'\bfind all\b', re.I),
        re.compile(r'\benumerate\b', re.I),
        re.compile(r'\bevery (?:clickable|button|icon|element|link|item)\b',
                   re.I),
        re.compile(r'\bhow many\b', re.I),
    ]
    _MULTI_STEP_PATTERNS = [
        re.compile(r'\b(?:and then|after that|then click|then type)\b',
                   re.I),
        re.compile(r'\bnavigate to\b', re.I),
        re.compile(r'\bfill (?:in|out)\b', re.I),
        re.compile(
            r'\b(?:open|launch|start|run)\b.+\band\b.+'
            r'\b(?:click|type|select|press|enter|play|search)\b',
            re.I,
        ),
        re.compile(r'\b(?:step \d+|first[,.]?\s+then|step-by-step)\b',
                   re.I),
    ]

    def route_task(self, task: str, context: dict = None) -> str:
        """Pick the best grounding path for *task*.

        Returns one of:
            ``'enumerate'``   — task asks about multiple/all UI elements
                → use :meth:`parse_and_reason` for the SoM bbox view
                  (revives the otherwise-dead path)
            ``'multi_step'``  — task chains multiple actions
                → caller should drive
                  :func:`integrations.vlm.local_loop.run_local_agentic_loop`
            ``'single_shot'`` — one action on one target (default)
                → use :meth:`point_and_act`

        Heuristic v1 (this implementation): a keyword classifier on the
        task string only. Fast (microseconds), no VLM call. Plan §13 v2:
        the draft 0.8B can self-classify in the same prompt that produces
        the action — deferred until the v1 baseline is established.

        An empty / None task returns 'single_shot' (the safest default —
        a single VLM call, no over-commitment to a multi-iter loop).

        ``context`` is reserved for future use (re-dispatch hints from
        prior iterations: e.g. the loop's body sees ``Status: DONE``
        after one click and feeds back ``{'observed_done_after': 1}``,
        which would downgrade a multi_step verdict to single_shot).
        Currently ignored.
        """
        if not task:
            return 'single_shot'
        for pat in self._ENUMERATE_PATTERNS:
            if pat.search(task):
                return 'enumerate'
        for pat in self._MULTI_STEP_PATTERNS:
            if pat.search(task):
                return 'multi_step'
        return 'single_shot'

695 def dispatch_grounding(self, screenshot_b64, task, *, 

696 history=None, prev_screenshot_b64=None, 

697 route: str = None): 

698 """Route *task* to the best grounding method via :meth:`route_task`, 

699 then call it. Single entry point so callers don't have to know 

700 which of the three siblings to invoke for which task class. 

701 

702 Behavior per route: 

703 * ``'enumerate'`` → :meth:`parse_and_reason` (SoM result) 

704 * ``'single_shot'`` → :meth:`point_and_act` (drop-in shape) 

705 * ``'multi_step'`` → returns a sentinel 

706 ``{'route': 'multi_step', 'recommend': 

707 'run_local_agentic_loop', 'reasoning': '...'}`` 

708 so the caller can escalate to the loop dispatcher (which 

709 lives in local_loop.py and would create a circular import 

710 if called from inside the backend). 

711 

712 ``route`` may be passed explicitly to override the heuristic 

713 (e.g. the loop dispatcher already decided multi_step and is 

714 calling per-iteration with route='single_shot'). 

715 

716 Every result has ``'route'`` set so the regression gate can 

717 catch silent routing drift across runs. 

718 """ 

719 if route is None: 

720 route = self.route_task(task) 

721 

722 if route == 'enumerate': 

723 result = self.parse_and_reason( 

724 screenshot_b64, task, history=history) 

725 result.setdefault('route', 'enumerate') 

726 return result 

727 

728 if route == 'multi_step': 

729 # Sentinel — local_loop owns the multi-iter dispatch. 

730 # Returning instead of importing avoids backend → loop → 

731 # backend circular dependency. 

732 return { 

733 'action': None, 

734 'route': 'multi_step', 

735 'recommend': 'run_local_agentic_loop', 

736 'reasoning': ( 

737 'task chains multiple actions; caller should ' 

738 'dispatch to run_local_agentic_loop which calls ' 

739 'this backend per-iteration with route=single_shot' 

740 ), 

741 'latency': 0.0, 

742 } 

743 

744 # Default: single_shot via point_and_act. 

745 result = self.point_and_act( 

746 screenshot_b64, task, 

747 history=history, prev_screenshot_b64=prev_screenshot_b64) 

748 result.setdefault('route', 'single_shot') 

749 return result 
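
    # Caller-side sketch (hypothetical; the exact run_local_agentic_loop
    # signature lives in integrations/vlm/local_loop.py):
    #
    #   out = backend.dispatch_grounding(screenshot_b64, task)
    #   if out.get('recommend') == 'run_local_agentic_loop':
    #       from integrations.vlm.local_loop import run_local_agentic_loop
    #       out = run_local_agentic_loop(task)  # args are an assumption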

    def point_and_act(self, screenshot_b64, task, history=None, prev_screenshot_b64=None):
        """
        Optimized hybrid grounding strategy based on benchmark results.

        Strategy selection (benchmark-driven):
            1. Taskbar targets → taskbar_list (list all icons, pick by name) avg=50
            2. All targets → describe_first (describe position, then point) avg=78
            3. Suspicious center coords → elimination retry (halving search)

        Args:
            screenshot_b64: Current screenshot (base64 JPEG/PNG)
            task: What to accomplish (e.g. "Click the Start button")
            history: List of previous action strings for context
            prev_screenshot_b64: Previous screenshot for state change detection

        Returns:
            dict with: action, screen_x, screen_y, text, done, reasoning, raw
        """
        start = time.time()
        hist_text = ' → '.join(history[-3:]) if history else 'None'
        os_context = self._get_os_context()
        img_w, img_h = self._get_image_dimensions(screenshot_b64)

        # Screen dimensions for pyautogui coordinate scaling
        try:
            import pyautogui as _pag
            screen_w, screen_h = _pag.size()
        except Exception as _pag_err:
            logger.debug(
                f"pyautogui.size() unavailable, using image dims: {_pag_err}")
            screen_w, screen_h = img_w, img_h

        # --- Strategy 1: Taskbar pre-check via shared helper ---
        # Phase 3 of vlm_best_of_all_worlds_plan.md: replaced inline
        # taskbar_list code with a call to try_taskbar_pre_check (the
        # b7936bf helper). Behavior is byte-identical to the prior
        # inline implementation — same _is_taskbar_task gate, same
        # _taskbar_list_lookup call, same return-dict shape, same
        # fall-through when no match. Verified by the existing
        # TestPointAndActBottomEdgeRetry suite.
        taskbar_action = self.try_taskbar_pre_check(
            screenshot_b64, task, screen_w, screen_h, start)
        if taskbar_action is not None:
            return taskbar_action

        # --- Strategy 2: describe_first (primary, avg=78) ---
        state_hint = ''
        if prev_screenshot_b64:
            state_hint = (
                'Compare this screenshot with the previous one. '
                'Did the screen change from the last action? '
                'If so, proceed to the next step. If not, the last action may have missed its target.\n\n'
            )

        prompt_text = (
            f'{os_context}'
            f'{state_hint}'
            f'Task: {task}\n'
            f'Previous actions: {hist_text}\n\n'
            f'What is the single next action? Do NOT repeat previous actions.\n\n'
            f'- To click: first describe WHERE the target is on screen '
            f'(which edge, which corner, left/right side), '
            f'then give <point>x,y</point> (0-1000 normalized).\n'
            f'- To right-click: describe WHERE, then give <point>x,y</point>\n'
            f'- To double-click: describe WHERE, then give <point>x,y</point>\n'
            f'- To type text: reply TYPE:the text here\n'
            f'- To scroll: reply SCROLL_UP or SCROLL_DOWN\n'
            f'- If task is complete: reply DONE'
        )

        messages = []
        if prev_screenshot_b64:
            messages.append({
                "role": "user",
                "content": [
                    {"type": "text", "text": "Previous screenshot (before last action):"},
                    {"type": "image_url", "image_url": {
                        "url": f"data:image/jpeg;base64,{prev_screenshot_b64}"
                    }},
                ]
            })
            messages.append({
                "role": "assistant",
                "content": f"Previous action: {history[-1] if history else 'none'}"
            })
        messages.append({
            "role": "user",
            "content": [
                {"type": "text", "text": prompt_text},
                {"type": "image_url", "image_url": {
                    "url": f"data:image/jpeg;base64,{screenshot_b64}"
                }},
            ]
        })

        raw = self._call_api(messages)
        result, nx, ny = self._parse_action_response(raw, img_w, img_h, task=task)

        # --- Strategy 3: bias detection + elimination retry via helpers ---
        # Phase 3 refactor: replaced inline center/bottom/top-edge bias
        # checks + elimination prompt construction with detect_grounding_bias
        # + retry_with_elimination (b7936bf helpers). Same patterns
        # detected, same retry prompt, same reproduced-bias rejection
        # rule. Verified by TestPointAndActBottomEdgeRetry.
        bias_kind = self.detect_grounding_bias(nx, ny, result['action'], task)
        if bias_kind is not None:
            retry = self.retry_with_elimination(
                screenshot_b64, task, img_w, img_h, bias_kind)
            if retry is not None:
                # Helper returns (result, nx, ny) — strategy already
                # tagged 'elimination_retry' on the inner result.
                result, nx, ny = retry

        latency = time.time() - start
        result['latency'] = latency
        result.setdefault('strategy', 'describe_first')
        return result

    # ─── Shared grounding-strategy helpers ─────────────────────────────
    # Extracted from point_and_act so the multi-iteration agentic loop
    # (integrations/vlm/local_loop.py) can use them too. point_and_act
    # was refactored in Phase 3 of vlm_best_of_all_worlds_plan.md to
    # call these helpers instead of maintaining inline copies, so
    # there is now ONE source of truth for taskbar shortcut + bias
    # detection + elimination retry — no parallel paths.
    #
    # Why it matters: commit 8fa6e97 (Apr 10, 2026 — "Single VLM call:
    # plan + ground in one prompt — halves per-step latency") moved the
    # loop OFF point_and_act onto its own inline prompt to halve
    # latency. That trade-off shipped the latency win but silently
    # dropped point_and_act's smart grounding (taskbar_list shortcut +
    # center/bottom/top-edge bias detection + elimination_retry).
    # These helpers restore those strategies to the loop without
    # paying point_and_act's two-phase latency cost.

    def try_taskbar_pre_check(self, screenshot_b64, task,
                              screen_w, screen_h, started_at):
        """Pre-VLM-call taskbar shortcut.

        When the task targets a taskbar item ("open Chrome", "click
        Start button", etc.), skip the heavy describe_first VLM call
        and use _taskbar_list_lookup directly. Returns the click
        action dict on a hit, None on a miss (the caller falls through
        to its normal VLM grounding path).

        Args:
            screenshot_b64: current screen as base64 (JPEG/PNG)
            task: user instruction
            screen_w, screen_h: physical screen pixel dimensions for
                pyautogui coordinate scaling (norm 0-1000 → screen px)
            started_at: time.time() value from the caller's start —
                used to compute total latency for telemetry parity
                with point_and_act.

        Returns:
            dict (point_and_act-compatible action shape) or None.
        """
        if not self._is_taskbar_task(task):
            return None
        logger.info(f"Using taskbar_list strategy for: {task}")
        match, list_raw = self._taskbar_list_lookup(screenshot_b64, task)
        if not match:
            logger.info("taskbar_list: no match found, falling through")
            return None
        nx, ny, match_line = match
        px = int(nx * screen_w / 1000)
        py = int(ny * screen_h / 1000)
        return {
            'action': 'left_click',
            'screen_x': px, 'screen_y': py,
            'norm_x': nx, 'norm_y': ny,
            'text': '', 'done': False,
            'reasoning': f'taskbar_list: {match_line}',
            'raw': list_raw,
            'latency': time.time() - started_at,
            'strategy': 'taskbar_list',
        }
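
    # Scaling example: a model point <point>512,980</point> on a 1920x1080
    # screen clicks at px = int(512 * 1920 / 1000) = 983,
    # py = int(980 * 1080 / 1000) = 1058.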

    def detect_grounding_bias(self, nx, ny, action, task):
        """Pure-function bias detector for VLM-grounded click coords.

        Returns 'center' | 'bottom-edge' | 'top-edge' | None. Mirrors
        the inline checks in point_and_act so the loop can ask the
        same question on its own grounded coords. Coordinates are
        in 0-1000 normalized space.
        """
        if nx is None or ny is None or action != 'left_click':
            return None
        is_center = (350 < nx < 650 and 350 < ny < 650)
        task_lower = task.lower()
        task_is_taskbar = self._is_taskbar_task(task) or any(
            kw in task_lower for kw in
            ('taskbar', 'start button', 'system tray')
        )
        is_bottom_edge = (ny > 930 and not task_is_taskbar)
        is_top_edge = (ny < 30)
        if is_bottom_edge:
            return 'bottom-edge'
        if is_top_edge:
            return 'top-edge'
        if is_center:
            return 'center'
        return None
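
    # Examples (0-1000 normalized coords, action='left_click'):
    #   (500, 500) for 'Click the page body'   -> 'center'
    #   (500, 960) for 'Click the page footer' -> 'bottom-edge'
    #   (500, 960) for 'Click the taskbar'     -> None (taskbar task)
    #   (120, 400) for 'Click the sidebar'     -> None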

    def retry_with_elimination(self, screenshot_b64, task,
                               img_w, img_h, bias_kind):
        """Elimination-retry VLM call for biased coordinates.

        When detect_grounding_bias flags a coord, this re-asks the VLM
        with a more pointed prompt (top/bottom/left/right thirds,
        avoid the taskbar strip). Returns (result, nx, ny) on a clean
        re-grounding, None when the retry reproduces the same bias
        (the caller keeps the original coords).

        bias_kind: one of 'center' | 'bottom-edge' | 'top-edge'.
        """
        logger.info(
            f"{bias_kind}-biased coords for non-taskbar task, "
            f"retrying with elimination strategy"
        )
        elim_prompt = (
            f'I need to find the target for: {task}\n'
            f'Describe its location precisely BEFORE giving coordinates:\n'
            f'  - Top half or bottom half?\n'
            f'  - Left third, middle third, or right third?\n'
            f'  - Is it inside a window, in a menu, or on the taskbar?\n'
            f'If the task asks to open an app and that app is not '
            f'already visible, the correct action is usually NOT a '
            f'click — respond with DONE and I will use a keyboard '
            f'shortcut instead.\n'
            f'Otherwise, give the precise <point>x,y</point> (0-1000 normalized) '
            f'and avoid the taskbar strip (y > 930) unless the target '
            f'is an actual taskbar icon.'
        )
        elim_raw = self._call_api([{
            "role": "user",
            "content": [
                {"type": "text", "text": elim_prompt},
                {"type": "image_url", "image_url": {
                    "url": f"data:image/jpeg;base64,{screenshot_b64}"
                }},
            ]
        }])
        elim_result, enx, eny = self._parse_action_response(
            elim_raw, img_w, img_h, task=task,
        )
        # Reject only if the retry reproduced the original bias.
        if enx is None or eny is None:
            return None
        if bias_kind == 'bottom-edge':
            task_lower = task.lower()
            task_is_taskbar = self._is_taskbar_task(task) or any(
                kw in task_lower for kw in
                ('taskbar', 'start button', 'system tray')
            )
            if eny > 930 and not task_is_taskbar:
                return None
        elif bias_kind == 'top-edge':
            if eny < 30:
                return None
        elif bias_kind == 'center':
            if 350 < enx < 650 and 350 < eny < 650:
                return None
        elim_result['strategy'] = 'elimination_retry'
        logger.info(f"Elimination retry gave ({enx},{eny}) — using it")
        return elim_result, enx, eny

    def verify_goal(self, screenshot_b64, goal):
        """Check if the goal is achieved by looking at the current screenshot.

        Returns: (bool, str) — (achieved, explanation)
        """
        raw = self._call_api([{
            "role": "user",
            "content": [
                {"type": "text", "text": (
                    f'Is this goal achieved? Goal: "{goal}"\n'
                    f'Reply YES or NO and one sentence why.'
                )},
                {"type": "image_url", "image_url": {
                    "url": f"data:image/jpeg;base64,{screenshot_b64}"
                }},
            ]
        }])
        achieved = 'YES' in raw.upper().split('.')[0]
        return achieved, raw.strip()
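
    # The YES check looks only at the first sentence:
    #   'YES. The Save dialog is closed.' -> (True, 'YES. The Save dialog is closed.')
    #   'NO. The dialog is still open.'   -> (False, 'NO. The dialog is still open.')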

    def describe_scene(self, screenshot_b64, prompt='Describe what you see in this image'):
        """Scene description — drop-in replacement for the MiniCPM backend."""
        messages = [{
            "role": "user",
            "content": [
                {"type": "text", "text": prompt},
                {"type": "image_url", "image_url": {
                    "url": f"data:image/jpeg;base64,{screenshot_b64}"
                }},
            ]
        }]
        return self._call_api(messages)

    def _call_api(self, messages):
        """Call the Qwen3-VL OpenAI-compatible API."""
        from core.http_pool import pooled_post

        try:
            resp = pooled_post(
                f'{self.base_url.rstrip("/")}/chat/completions',
                json={
                    'model': self.model_name,
                    'messages': messages,
                    'max_tokens': 4096,
                    'temperature': 0.0,
                },
                headers={'Authorization': f'Bearer {self.api_key}'},
                timeout=self.timeout,
            )
            resp.raise_for_status()
            data = resp.json()
            msg = data['choices'][0]['message']
            # Qwen3.5 thinking mode: content may be None if all output is
            # in reasoning_content. Fall back to reasoning_content when
            # content is empty.
            content = msg.get('content')
            if not content and msg.get('reasoning_content'):
                content = msg['reasoning_content']
            return content or ''
        except Exception as e:
            logger.error(f"Qwen3-VL API call failed: {e}")
            raise

    def _parse_unified_response(self, response_text):
        """Parse a Qwen3-VL JSON response, handling markdown blocks and
        partial JSON.

        Phase 5: thin shim onto :mod:`integrations.vlm.parser`. Same
        dict shape (UI_Elements + Next Action + Status + Reasoning)
        as the historical inline implementation, but the JSON
        extraction (code-block / raw-brace / depth-counted) lives in
        one canonical place now.
        """
        from integrations.vlm.parser import parse_vlm_action
        pa = parse_vlm_action(
            response_text or '', expected_shape='som_bbox')
        result = pa.to_action_json_dict()
        # Legacy callers expect UI_Elements always present (default to []).
        result.setdefault('UI_Elements', [])
        return result

    @staticmethod
    def _get_image_dimensions(b64_data):
        """Get (width, height) from a base64 PNG/JPEG image."""
        try:
            from PIL import Image
            img_bytes = base64.b64decode(b64_data)
            img = Image.open(io.BytesIO(img_bytes))
            return img.width, img.height
        except Exception as e:
            # Fallback to a common resolution. Log because using the
            # wrong resolution causes coord-scaling drift downstream;
            # a silent fallback would be diagnosable only via wrong-
            # location-click symptoms in production.
            logger.debug(f"_get_image_dimensions failed, using 1920x1080 fallback: {e}")
            return 1920, 1080

    @staticmethod
    def _is_normalized_1000(bbox, img_w, img_h):
        """Check if bbox values are in Qwen3-VL's [0, 1000] normalized range."""
        if not bbox or len(bbox) != 4:
            return False
        # If all values are <= 1000 and the image is larger than 1000px,
        # these are probably normalized coordinates.
        max_val = max(bbox)
        return max_val <= 1000 and (img_w > 1000 or img_h > 1000)

    @staticmethod
    def _normalize_bbox(bbox_1000, img_w, img_h):
        """Convert a Qwen3-VL [0, 1000] normalized bbox to pixel coordinates."""
        return [
            int(bbox_1000[0] * img_w / 1000),
            int(bbox_1000[1] * img_h / 1000),
            int(bbox_1000[2] * img_w / 1000),
            int(bbox_1000[3] * img_h / 1000),
        ]
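
    # Worked example on a 1920x1080 screenshot:
    #   _is_normalized_1000([500, 500, 600, 600], 1920, 1080) -> True
    #   _normalize_bbox([500, 500, 600, 600], 1920, 1080)     -> [960, 540, 1152, 648]
    # A pixel bbox like [100, 50, 1400, 80] has max > 1000, so the callers'
    # _is_normalized_1000 gate leaves it untouched.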