Coverage for integrations/vlm/vlm_agent_integration.py: 100.0% (87 statements)
coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2VLM Agent Integration Module
4Bridges VLM agent's visual computer use capabilities with the agent ledger system.
5Provides visual feedback context injection and Windows command execution.
7The VLM agent (from OmniParser) provides:
8- Screen understanding via OmniParser
9- GUI interaction (click, type, scroll, etc.)
10- File operations (read, write, list, copy)
11- Windows command execution
13This module integrates VLM feedback into the agent ledger for:
14- Visual task verification
15- Screen state tracking
16- GUI automation context
17- Computer use feedback loop
18"""
import json
import logging
import os
from typing import Dict, Any, Optional, List
from datetime import datetime
from pathlib import Path

from core.http_pool import pooled_get, pooled_post

logger = logging.getLogger(__name__)
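
# Quick-start sketch (comment-only, illustrative). Endpoints default to
# localhost ports taken from VLM_GUI_PORT / OMNIPARSER_PORT, as set up in
# VLMAgentContext.__init__ below:
#
#     ctx = get_vlm_context()
#     if ctx.is_vlm_available():
#         feedback = ctx.get_visual_feedback_for_task("Open the browser")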


class VLMAgentContext:
    """
    Manages VLM agent context and feedback integration with the agent ledger.

    Provides methods to:
    1. Inject visual feedback from the VLM agent into agent context
    2. Track screen state and GUI actions
    3. Execute GUI and file actions through the VLM agent
    4. Update the ledger with visual verification results
    """

    def __init__(
        self,
        vlm_server_url: Optional[str] = None,
        omniparser_url: Optional[str] = None,
    ):
        """
        Initialize the VLM agent context manager.

        Args:
            vlm_server_url: URL of the VLM agent server (agentic_rpc.py Flask server)
            omniparser_url: URL of the OmniParser server for screen parsing
        """
        if vlm_server_url is None:
            vlm_server_url = f"http://localhost:{os.environ.get('VLM_GUI_PORT', '5001')}"
        if omniparser_url is None:
            omniparser_url = f"http://localhost:{os.environ.get('OMNIPARSER_PORT', '8080')}"
        self.vlm_server_url = vlm_server_url
        self.omniparser_url = omniparser_url
        self.screen_history: List[Dict[str, Any]] = []
        self.action_history: List[Dict[str, Any]] = []

    def is_vlm_available(self) -> bool:
        """Check if the VLM agent server is available."""
        try:
            response = pooled_get(f"{self.vlm_server_url}/health", timeout=2)
            return response.status_code == 200
        except Exception:
            return False

    def is_omniparser_available(self) -> bool:
        """Check if the OmniParser server is available."""
        try:
            response = pooled_get(f"{self.omniparser_url}/probe", timeout=2)
            return response.status_code == 200
        except Exception:
            return False

    def get_screen_context(self) -> Optional[Dict[str, Any]]:
        """
        Get current screen context from OmniParser.

        Returns:
            Dictionary with:
            - screenshot_base64: Base64 encoded screenshot
            - parsed_elements: List of detected UI elements
            - screen_info: Text description of screen
            - width, height: Screen dimensions
        """
        try:
            if not self.is_omniparser_available():
                logger.warning("OmniParser not available, skipping screen context")
                return None

            # Request screen parsing from OmniParser
            response = pooled_post(
                f"{self.omniparser_url}/parse_screen",
                json={"include_som": True},
                timeout=10
            )

            if response.status_code == 200:
                screen_data = response.json()

                # Store in history
                self.screen_history.append({
                    "timestamp": datetime.now().isoformat(),
                    "screen_info": screen_data.get("screen_info", ""),
                    "element_count": len(screen_data.get("parsed_content_list", []))
                })

                # Keep only the last 10 screens
                if len(self.screen_history) > 10:
                    self.screen_history.pop(0)

                return screen_data
            else:
                logger.error(f"OmniParser returned error: {response.status_code}")
                return None

        except Exception as e:
            logger.error(f"Error getting screen context: {e}")
            return None
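
    # Consumption sketch (comment-only, illustrative). Only the keys read
    # above (`screen_info`, `parsed_content_list`, `width`, `height`) are
    # relied on by this module; any further element fields are a
    # server-specific assumption:
    #
    #     ctx = VLMAgentContext()
    #     screen = ctx.get_screen_context()
    #     if screen is not None:
    #         for element in screen.get("parsed_content_list", []):
    #             print(element)  # one detected UI widget per entry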

    def inject_visual_context_into_ledger_task(self, task_context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Inject current visual screen context into a ledger task's context.

        Args:
            task_context: Existing task context dictionary

        Returns:
            Enhanced task context with visual information
        """
        screen_context = self.get_screen_context()

        if screen_context:
            task_context["visual_context"] = {
                "has_screen_info": True,
                "screen_summary": screen_context.get("screen_info", "")[:500],  # First 500 chars
                "visible_elements": len(screen_context.get("parsed_content_list", [])),
                "screen_dimensions": {
                    "width": screen_context.get("width"),
                    "height": screen_context.get("height")
                },
                "timestamp": datetime.now().isoformat()
            }
        else:
            task_context["visual_context"] = {
                "has_screen_info": False,
                "note": "VLM agent not available"
            }

        return task_context
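
    # Usage sketch (comment-only; the task dict shape here is a minimal
    # assumption, not a ledger contract):
    #
    #     task = {"task_id": "t1", "description": "Open the settings page"}
    #     task = get_vlm_context().inject_visual_context_into_ledger_task(task)
    #     if not task["visual_context"]["has_screen_info"]:
    #         ...  # downstream consumers must tolerate a missing screen state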

    def execute_vlm_action(
        self,
        action: str,
        parameters: Optional[Dict[str, Any]] = None,
        user_id: str = "agent",
        prompt_id: str = "task"
    ) -> Dict[str, Any]:
        """
        Execute an action through the VLM agent.

        Args:
            action: Action type (e.g., "type", "left_click", "list_folders_and_files")
            parameters: Action parameters (e.g., {"text": "Hello", "coordinate": [100, 200]})
            user_id: User identifier
            prompt_id: Prompt/task identifier

        Returns:
            Result dictionary with status and output
        """
        try:
            if not self.is_vlm_available():
                return {
                    "status": "error",
                    "message": "VLM agent server not available",
                    "action": action
                }

            # Prepare request payload
            payload = {
                "user_id": user_id,
                "prompt_id": prompt_id,
                "action": action,
                "parameters": parameters or {}
            }

            # Send action request to VLM agent
            response = pooled_post(
                f"{self.vlm_server_url}/execute_action",
                json=payload,
                timeout=30
            )

            if response.status_code == 200:
                result = response.json()

                # Track action in history
                self.action_history.append({
                    "timestamp": datetime.now().isoformat(),
                    "action": action,
                    "parameters": parameters,
                    "result": result.get("status", "unknown")
                })

                # Keep only the last 50 actions
                if len(self.action_history) > 50:
                    self.action_history.pop(0)

                return result
            else:
                return {
                    "status": "error",
                    "message": f"VLM agent returned error: {response.status_code}",
                    "action": action
                }

        except Exception as e:
            logger.error(f"Error executing VLM action '{action}': {e}")
            return {
                "status": "error",
                "message": str(e),
                "action": action
            }
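
    # Call sketch (comment-only, illustrative). The action names come from
    # the tool schema in create_vlm_enabled_tool below; the coordinates are
    # placeholder values:
    #
    #     result = ctx.execute_vlm_action(
    #         "left_click", {"coordinate": [100, 200]},
    #         user_id="agent", prompt_id="demo")
    #     if result.get("status") == "error":
    #         logger.warning("click failed: %s", result.get("message"))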

    # execute_windows_command REMOVED — it was a Windows-specific parallel
    # path that simulated Win+R → type → Enter against the old OmniParser
    # HTTP server with no denylist, no NFKC normalization, no timeout, and
    # no audit log. The unified cross-OS replacement is
    # `execute_windows_or_android_command` in create_recipe.py, which:
    #
    # - supports windows / linux / macos / android via vlm_adapter →
    #   run_local_agentic_loop → shared shell/open_file_gui actions
    # - runs every command through _handle_shell_command_tool's denylist
    # - works in frozen builds (no pyautogui HTTP dependency)
    #
    # Callers that used `VLMAgentContext.execute_windows_command(cmd)` should
    # use the autogen-registered `execute_windows_or_android_command(
    # instructions=cmd, os_to_control='windows')` instead. See commit
    # history dce4b31..HEAD for the subprocess isolation + unified shell
    # handler context.
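    #
    # Migration sketch (comment-only; assumes the autogen-registered tool
    # named above is importable at the call site; the command string is a
    # placeholder):
    #
    #     # old (removed):
    #     #   ctx.execute_windows_command("ipconfig /all")
    #     # new:
    #     #   execute_windows_or_android_command(
    #     #       instructions="ipconfig /all", os_to_control="windows")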

    def get_visual_feedback_for_task(self, task_description: str) -> str:
        """
        Get visual feedback about the current screen state relevant to a task.

        Args:
            task_description: Description of the task being performed

        Returns:
            Text feedback about screen state
        """
        screen_context = self.get_screen_context()

        if not screen_context:
            return "Visual feedback unavailable (VLM agent not accessible)"

        feedback_parts = []
        feedback_parts.append(f"Task: {task_description}")
        feedback_parts.append("\nScreen Analysis:")
        feedback_parts.append(f"- Detected {len(screen_context.get('parsed_content_list', []))} UI elements")
        feedback_parts.append(f"- Screen dimensions: {screen_context.get('width')}x{screen_context.get('height')}")

        # Add summary of visible elements
        screen_info = screen_context.get("screen_info", "")
        if screen_info:
            feedback_parts.append("\nVisible elements:")
            feedback_parts.append(screen_info[:500])  # First 500 chars

        # Add recent action history
        if self.action_history:
            feedback_parts.append("\nRecent actions (last 5):")
            for action_record in self.action_history[-5:]:
                feedback_parts.append(
                    f"- {action_record['action']} -> {action_record['result']}"
                )

        return "\n".join(feedback_parts)

    def create_vlm_enabled_tool(self, tool_name: str, tool_description: str) -> Dict[str, Any]:
        """
        Create a tool definition that agents can use to interact with the VLM.

        Args:
            tool_name: Name of the tool
            tool_description: Description of what the tool does

        Returns:
            Tool definition dictionary
        """
        return {
            "type": "function",
            "function": {
                "name": tool_name,
                "description": tool_description,
                "parameters": {
                    "type": "object",
                    "properties": {
                        "action": {
                            "type": "string",
                            "enum": [
                                "type", "left_click", "right_click", "double_click",
                                "hover", "scroll_up", "scroll_down", "wait", "hotkey",
                                "list_folders_and_files", "Open_file_and_copy_paste",
                                "open_file_gui", "write_file", "read_file_and_understand"
                            ],
                            "description": "The action to perform"
                        },
                        "parameters": {
                            "type": "object",
                            "description": "Parameters for the action (e.g., {'text': 'Hello'}, {'coordinate': [100, 200]})"
                        }
                    },
                    "required": ["action"]
                }
            }
        }
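
    # Wiring sketch (comment-only). The definition follows the OpenAI-style
    # function-calling schema, so it can go straight into a `tools` list;
    # the dispatch step is an assumption about the caller's agent loop:
    #
    #     tool = ctx.create_vlm_enabled_tool(
    #         "computer_use", "Interact with the screen via the VLM agent")
    #     # ... hand [tool] to the LLM; when it returns a tool call:
    #     #     args = json.loads(tool_call.function.arguments)
    #     #     ctx.execute_vlm_action(args["action"], args.get("parameters"))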

    def get_status_summary(self) -> Dict[str, Any]:
        """Get current status summary of VLM agent integration."""
        return {
            "vlm_available": self.is_vlm_available(),
            "omniparser_available": self.is_omniparser_available(),
            "screen_history_count": len(self.screen_history),
            "action_history_count": len(self.action_history),
            "last_screen_capture": self.screen_history[-1]["timestamp"] if self.screen_history else None,
            "last_action": self.action_history[-1] if self.action_history else None
        }


# Singleton instance
_vlm_context = None


def get_vlm_context(vlm_server_url: Optional[str] = None, omniparser_url: Optional[str] = None) -> VLMAgentContext:
    """Get or create the singleton VLM context instance."""
    global _vlm_context
    if _vlm_context is None:
        _vlm_context = VLMAgentContext(vlm_server_url, omniparser_url)
    return _vlm_context


if __name__ == "__main__":
    # Test the VLM integration
    print("Testing VLM Agent Integration\n")

    vlm = get_vlm_context()

    # Check availability
    print(f"VLM Agent available: {vlm.is_vlm_available()}")
    print(f"OmniParser available: {vlm.is_omniparser_available()}")

    # Get status
    status = vlm.get_status_summary()
    print(f"\nStatus: {json.dumps(status, indent=2)}")

    # Test getting screen context (if available)
    if vlm.is_omniparser_available():
        print("\nGetting screen context...")
        screen = vlm.get_screen_context()
        if screen:
            print(f"Screen dimensions: {screen.get('width')}x{screen.get('height')}")
            print(f"Detected elements: {len(screen.get('parsed_content_list', []))}")

    # Test context injection
    print("\nTesting context injection...")
    task_context = {
        "task_id": "test_task",
        "description": "Test task for VLM integration"
    }
    enhanced_context = vlm.inject_visual_context_into_ledger_task(task_context)
    print(f"Enhanced context: {json.dumps(enhanced_context, indent=2)}")