# Coverage for integrations/vlm/parser.py: 97.6% (168 statements)
# Report generated by coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
integrations.vlm.parser — single source of truth for VLM response parsing.

Phase 5 of memory/vlm_best_of_all_worlds_plan.md §4.  Replaces three
parallel parsers that drifted apart over time:

  * ``local_loop._parse_vlm_response`` — JSON shape, used by the
    inline-prompt branch
  * ``qwen3vl_backend._parse_unified_response`` — JSON shape with
    UI_Elements, used by parse_and_reason / parse_screen
  * ``qwen3vl_backend._parse_action_response`` — free-text shape with
    <point>x,y</point> / TYPE: / DONE / scroll, used by point_and_act
    and the taskbar shortcut

The first two duplicated their JSON extraction.  This module
exposes:

  ``extract_json(text)``           — single canonical JSON extractor
                                     (handles ```json blocks, raw {},
                                     depth-counted nested objects)
  ``ParsedAction`` dataclass       — normalized result; same fields
                                     regardless of input shape so
                                     downstream code stops branching
                                     on which parser ran.
  ``parse_vlm_action(raw, ...)``   — single entry point keyed on
                                     ``expected_shape='action_json' |
                                     'som_bbox' | 'point_only'``.

Old parsers are shimmed onto this module — see the docstrings on
each shim for byte-equivalence notes.
"""
import json
import logging
import re
from dataclasses import asdict, dataclass, field
from typing import Callable, List, Optional
logger = logging.getLogger('hevolve.vlm.parser')


# ─── Pre-compiled regex (module-load cost is one-time) ────────────────

# Markdown fence (``` or ```json) wrapping a single {...} payload; the
# payload is captured in group 1.
_CODE_BLOCK_RE = re.compile(r'```(?:json)?\s*(\{.*?\})\s*```', re.DOTALL)
# Bare {...} span that tolerates at most one level of nested braces.
_RAW_BRACE_RE = re.compile(r'\{[^{}]*(?:\{[^{}]*\}[^{}]*)*\}', re.DOTALL)
# <point>x,y</point> marker; both coordinates are unsigned integers.
_POINT_RE = re.compile(r'<point>\s*(\d+)\s*,\s*(\d+)\s*</point>')
# Single-line response of the form "TYPE: <text>" (case-insensitive).
_TYPE_PREFIX_RE = re.compile(r'^TYPE:\s*(.+)$', re.IGNORECASE)
# Looser phrasing: "type"/"enter"/"input", then one or more of : - " ',
# then the text, which stops before a "<" or at end of string.
_TYPE_FREETEXT_RE = re.compile(
    r'(?:type|enter|input)\s*[:\-"\']+\s*(.+?)(?:\s*<|$)',
    re.IGNORECASE,
)
# Any run of digits; used for the number-pair coordinate fallback.
_NUMBER_RE = re.compile(r'\d+')
# ─── ParsedAction dataclass ──────────────────────────────────────────

@dataclass
class ParsedAction:
    """Normalized result of parsing any of the three VLM response
    shapes.  Fields not relevant to a given shape stay at their
    default values; consumers read whichever fields apply.

    Back-compat conversion methods:
      ``to_action_json_dict()`` reproduces what ``_parse_vlm_response``
        and ``_parse_unified_response`` historically returned.
      ``to_point_action_dict()`` reproduces the ``_parse_action_response``
        shape (the dict point_and_act builds its result from).
    """
    raw: str = ''
    action: str = 'none'   # 'left_click', 'type', 'scroll_down', 'done', 'none', ...
    reasoning: str = ''
    done: bool = False
    status: str = ''       # 'IN_PROGRESS', 'DONE' (JSON shapes)
    text: str = ''         # for 'type' actions
    norm_x: Optional[int] = None   # 0-1000 normalized
    norm_y: Optional[int] = None
    screen_x: int = 0
    screen_y: int = 0
    box_id: Optional[int] = None
    coordinate: Optional[List[int]] = None
    next_action: str = ''  # original 'Next Action' string
    ui_elements: List[dict] = field(default_factory=list)
    parsed_content_list: List[dict] = field(default_factory=list)

    def to_action_json_dict(self) -> dict:
        """Convert to the dict legacy ``_parse_vlm_response`` /
        ``_parse_unified_response`` callers consume.

        Keeps original casing of 'Next Action' / 'Status' / 'Reasoning'
        + 'Box ID' to avoid breaking downstream key access.

        Returns:
            A dict that always carries 'Next Action', 'Status' and
            'Reasoning'.  'value', 'coordinate', 'Box ID',
            'UI_Elements' and 'parsed_content_list' are added only
            when the corresponding field is populated.
        """
        out = {
            'Next Action': self.next_action or 'None',
            # An explicit status wins; otherwise derive it from ``done``.
            'Status': self.status or ('DONE' if self.done else 'IN_PROGRESS'),
            # No reasoning recorded: fall back to the head of the raw text.
            'Reasoning': self.reasoning or self.raw[:500],
        }
        if self.text:
            out['value'] = self.text
        # ``is not None`` (not truthiness) so an empty list or a
        # box id of 0 is still emitted.
        if self.coordinate is not None:
            out['coordinate'] = self.coordinate
        if self.box_id is not None:
            out['Box ID'] = self.box_id
        if self.ui_elements:
            out['UI_Elements'] = self.ui_elements
        if self.parsed_content_list:
            out['parsed_content_list'] = self.parsed_content_list
        return out

    def to_point_action_dict(self) -> dict:
        """Convert to the dict shape ``_parse_action_response`` returns
        for point_and_act.  Includes only keys the legacy code populated.

        Returns:
            A dict with 'action', 'screen_x', 'screen_y', 'text',
            'done', 'reasoning' and 'raw'; 'norm_x' / 'norm_y' are
            added only when they are not ``None``.
        """
        result = {
            'action': self.action,
            'screen_x': self.screen_x,
            'screen_y': self.screen_y,
            'text': self.text,
            'done': self.done,
            'reasoning': self.reasoning,
            'raw': self.raw,
        }
        if self.norm_x is not None:
            result['norm_x'] = self.norm_x
        if self.norm_y is not None:
            result['norm_y'] = self.norm_y
        return result
# ─── Extract JSON ─────────────────────────────────────────────────────

def _decode_object_at(text: str, start: int) -> Optional[dict]:
    """Decode the JSON object that begins at ``text[start]``, or return None.

    Uses the real JSON decoder rather than brace counting, so ``{`` / ``}``
    that appear inside string values cannot cut the object short.
    Trailing text after the object is ignored.
    """
    try:
        obj, _end = json.JSONDecoder().raw_decode(text[start:])
    except json.JSONDecodeError:
        return None
    return obj if isinstance(obj, dict) else None


def extract_json(text: str) -> Optional[dict]:
    """Extract a JSON object from VLM text.

    Tries in order:
      1. Markdown ```json ... ``` fenced block (most reliable —
         models trained on instruction data tend to fence JSON).
      2. Top-level brace walk: every ``{`` seen at nesting depth 0 is
         handed to the JSON decoder, so the OUTER object is returned
         for nested input.
      3. Last resort: every remaining ``{`` is tried in order.  This
         only recovers an inner object when no top-level candidate was
         parseable (e.g. the outer object was truncated mid-stream).

    The legacy ``_parse_unified_response`` had raw-brace BEFORE
    depth-counted, which on nested input like
    ``{"outer": {"inner": [{...}]}}`` returned the innermost
    ``{...}`` instead of the full object — a partial extraction
    bug that silently lost UI_Elements / Reasoning fields.  This
    implementation keeps the outer-first order.

    Compared with a pure brace counter, step 2 also copes with braces
    inside string values (``{"Reasoning": "press }"}``) and with stray
    closing braces in the surrounding prose, both of which previously
    made extraction fail.

    Args:
        text: Raw model output; may be empty or ``None``.

    Returns:
        The decoded ``dict``, or ``None`` when nothing parseable was
        found.
    """
    if not text:
        return None

    fenced = _CODE_BLOCK_RE.search(text)
    if fenced:
        try:
            return json.loads(fenced.group(1))
        except json.JSONDecodeError:
            pass  # malformed fence contents — fall through to the scans

    # Outer-first walk.  ``depth`` only decides which '{' count as
    # top-level candidates; the decoder does the actual delimiting.
    depth = 0
    tried = set()
    for idx, ch in enumerate(text):
        if ch == '{':
            if depth == 0:
                tried.add(idx)
                found = _decode_object_at(text, idx)
                if found is not None:
                    return found
            depth += 1
        elif ch == '}' and depth > 0:
            # Clamp at zero: an unmatched '}' in prose must not hide
            # every later object behind a negative depth.
            depth -= 1

    # Last resort: any '{' not already attempted above.
    pos = text.find('{')
    while pos != -1:
        if pos not in tried:
            found = _decode_object_at(text, pos)
            if found is not None:
                return found
        pos = text.find('{', pos + 1)
    return None
# ─── parse_vlm_action: single entry point ─────────────────────────────

def parse_vlm_action(
    raw: str,
    *,
    expected_shape: str = 'action_json',
    task: str = '',
    screen_w: Optional[int] = None,
    screen_h: Optional[int] = None,
    detect_action_type: Optional[Callable[[str, str], str]] = None,
    scroll_down_keywords: tuple = (),
    scroll_up_keywords: tuple = (),
) -> ParsedAction:
    """Parse *raw* VLM response into a normalized :class:`ParsedAction`.

    Args:
        raw: VLM response string.  ``None`` / empty is treated as ``''``;
            surrounding whitespace is stripped before parsing.
        expected_shape: Which schema to expect.
            ``'action_json'`` — Single-action JSON dict with keys
              ``Next Action``, ``Status``, ``Reasoning``, ``coordinate``,
              ``value``, ``Box ID`` (optional).  Local-loop inline branch
              + parse_and_reason action_json.
            ``'som_bbox'`` — Same as action_json but additionally
              extracts ``UI_Elements`` / ``parsed_content_list`` for
              the SoM-bbox view.
            ``'point_only'`` — Free-text response with
              ``<point>x,y</point>`` markers, ``TYPE:`` prefix,
              ``DONE``, or scroll keywords.  Used by ``point_and_act``
              and the taskbar shortcut.
        task: Original task string (only used by ``'point_only'`` for
            action-type detection).
        screen_w, screen_h: Screen dimensions for norm→screen-px
            scaling on ``'point_only'`` (caller passes
            ``pyautogui.size()``; falls back to no scaling if None).
        detect_action_type: Callable ``(task, raw) -> action_type``
            injected by ``Qwen3VLBackend`` so this parser doesn't
            need to know about the backend's keyword tables.
        scroll_down_keywords, scroll_up_keywords: Tuples of substrings
            the ``'point_only'`` parser checks against task+raw to
            detect scroll intent.

    Returns:
        The :class:`ParsedAction` produced by the shape-specific parser.

    Raises:
        ValueError: If *expected_shape* is not one of the three
            supported values.
    """
    raw = (raw or '').strip()

    if expected_shape in ('action_json', 'som_bbox'):
        return _parse_json_shape(raw, include_som=(expected_shape == 'som_bbox'))

    if expected_shape == 'point_only':
        return _parse_point_shape(
            raw, task=task,
            screen_w=screen_w, screen_h=screen_h,
            detect_action_type=detect_action_type,
            scroll_down_keywords=scroll_down_keywords,
            scroll_up_keywords=scroll_up_keywords,
        )

    raise ValueError(f"Unknown expected_shape: {expected_shape!r}")
def _coerce_text(value) -> str:
    """Return *value* as a ``str`` for the text-typed ParsedAction fields.

    Model output is untrusted: a field documented as a string can come
    back as ``null``, a number, a bool or a nested object.  Falsy values
    map to ``''`` (the same result the plain ``or ''`` idiom gives);
    any other non-string is stringified so later ``.upper()`` /
    ``.lower()`` calls cannot raise ``AttributeError``.
    """
    if not value:
        return ''
    return value if isinstance(value, str) else str(value)


def _parse_json_shape(raw: str, *, include_som: bool) -> ParsedAction:
    """Common JSON-shape parser used by both 'action_json' and
    'som_bbox'.

    Args:
        raw: Already-stripped VLM response text.
        include_som: When true, also populate ``ui_elements`` and
            ``parsed_content_list`` from the parsed dict's
            ``UI_Elements`` / ``parsed_content_list`` keys.

    Returns:
        A :class:`ParsedAction`.  When no JSON object can be extracted
        the result is a terminal fallback (``status='DONE'``,
        ``done=True``, ``action='none'``) so the calling loop stops
        instead of spinning on unparseable output.
    """
    pa = ParsedAction(raw=raw)
    parsed = extract_json(raw)
    if parsed is None:
        # Fallback shape — treat as DONE so the loop terminates safely.
        pa.next_action = 'None'
        pa.status = 'DONE'
        pa.done = True
        pa.action = 'none'
        pa.reasoning = raw[:500] if raw else 'Empty VLM response'
        return pa

    pa.next_action = _coerce_text(parsed.get('Next Action'))
    pa.status = _coerce_text(parsed.get('Status'))
    pa.reasoning = _coerce_text(parsed.get('Reasoning')) or raw[:500]
    pa.text = _coerce_text(parsed.get('value'))
    # Passed through untouched; callers own validation of these two.
    pa.coordinate = parsed.get('coordinate')
    pa.box_id = parsed.get('Box ID')
    # Strip before comparing so ' done ' still terminates the loop.
    pa.done = pa.status.strip().upper() == 'DONE'
    # Normalize action: 'left_click' → 'left_click', 'Left Click' → 'left_click'
    pa.action = (pa.next_action.strip() or 'none').lower().replace(' ', '_')
    if include_som:
        pa.ui_elements = parsed.get('UI_Elements', []) or []
        pa.parsed_content_list = parsed.get('parsed_content_list', []) or []
    return pa
# "done" as a standalone token: not preceded or followed by a letter, so
# "DONE", "Done." and "TASK_DONE" match while "abandoned" / "undone" do not.
_DONE_TOKEN_RE = re.compile(r'(?<![A-Za-z])done(?![A-Za-z])', re.IGNORECASE)


def _set_normalized_point(pa, nx, ny, screen_w, screen_h):
    """Record 0-1000 normalized coordinates on *pa* and, when both
    screen dimensions are known, the scaled pixel position as well."""
    pa.norm_x = nx
    pa.norm_y = ny
    if screen_w and screen_h:
        pa.screen_x = int(nx * screen_w / 1000)
        pa.screen_y = int(ny * screen_h / 1000)


def _parse_point_shape(
    raw: str, *,
    task: str,
    screen_w: Optional[int],
    screen_h: Optional[int],
    detect_action_type: Optional[Callable[[str, str], str]],
    scroll_down_keywords: tuple,
    scroll_up_keywords: tuple,
) -> ParsedAction:
    """Free-text shape parser — extracts <point>x,y</point>, TYPE:,
    DONE, scroll keywords.

    Follows the legacy ``_parse_action_response`` decision order but
    returns a ParsedAction instead of a 3-tuple (the shim adapts).
    One deliberate deviation: DONE is matched as a standalone token
    rather than as a bare substring, so words that merely contain
    "done" ("abandoned", "undone") no longer end the task.

    Checks, first match wins:
      1. DONE token                      → action 'done'
      2. ``TYPE: <text>`` prefix         → action 'type'
      3. free-text "type ..." phrasing, only when the response has no
         ``<point>`` marker              → action 'type'
      4. scroll keyword in task or raw   → 'scroll_down' / 'scroll_up'
      5. ``<point>x,y</point>``          → detected action at (x, y)
      6. first two integers in raw, both within 0..1000
                                         → detected action at (x, y)
      7. otherwise                       → action 'none' (warning logged)

    Args:
        raw: Already-stripped VLM response text.
        task: Original task string; consulted for scroll keywords and
            passed to *detect_action_type*.
        screen_w, screen_h: Screen size used to scale normalized
            coordinates to pixels; no scaling when either is falsy.
        detect_action_type: Optional ``(task, raw) -> action`` callable;
            ``'left_click'`` is used when it is not supplied.
        scroll_down_keywords, scroll_up_keywords: Substrings compared
            against the lower-cased task and response.

    Returns:
        The populated :class:`ParsedAction`.
    """
    pa = ParsedAction(raw=raw)

    # DONE — task complete signal.
    if _DONE_TOKEN_RE.search(raw):
        pa.action = 'done'
        pa.done = True
        pa.reasoning = raw
        return pa

    # TYPE: prefix variant (most reliable when present).
    m = _TYPE_PREFIX_RE.match(raw)
    if m:
        text = m.group(1).strip()
        pa.action = 'type'
        pa.text = text
        pa.reasoning = f'type "{text}"'
        return pa

    # Free-text "type X" variant — only when NO point marker is present
    # (otherwise the point should win).
    if '<point>' not in raw:
        m = _TYPE_FREETEXT_RE.search(raw)
        if m:
            text = m.group(1).strip().strip('"\'')
            pa.action = 'type'
            pa.text = text
            pa.reasoning = f'type "{text}"'
            return pa

    # Scroll keywords (task or raw).
    raw_lower = raw.lower()
    task_lower = task.lower() if task else ''
    if any(kw in task_lower or kw in raw_lower for kw in scroll_down_keywords):
        pa.action = 'scroll_down'
        pa.reasoning = 'scroll down'
        return pa
    if any(kw in task_lower or kw in raw_lower for kw in scroll_up_keywords):
        pa.action = 'scroll_up'
        pa.reasoning = 'scroll up'
        return pa

    # Coordinate extraction — <point> first, then number-pair fallback.
    action_type = (detect_action_type(task, raw)
                   if detect_action_type else 'left_click')
    m = _POINT_RE.search(raw)
    if m:
        nx, ny = int(m.group(1)), int(m.group(2))
        pa.action = action_type
        _set_normalized_point(pa, nx, ny, screen_w, screen_h)
        pa.reasoning = f'{action_type} at ({nx},{ny}) normalized'
        return pa

    nums = _NUMBER_RE.findall(raw)
    if len(nums) >= 2:
        nx, ny = int(nums[0]), int(nums[1])
        # Only trust bare numbers that fit the 0-1000 normalized grid.
        if 0 <= nx <= 1000 and 0 <= ny <= 1000:
            pa.action = action_type
            _set_normalized_point(pa, nx, ny, screen_w, screen_h)
            pa.reasoning = f'fallback {action_type} ({nx},{ny})'
            return pa

    # Couldn't extract anything actionable.
    logger.warning("Could not parse point_only response: %s", raw[:100])
    pa.action = 'none'
    pa.reasoning = raw[:500] or 'unparseable point_only response'
    return pa