Coverage for integrations / vlm / vlm_agent_integration.py: 100.0%

87 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

"""
VLM Agent Integration Module

Bridges VLM agent's visual computer use capabilities with the agent ledger system.
Provides visual feedback context injection and Windows command execution.

The VLM agent (from OmniParser) provides:
- Screen understanding via OmniParser
- GUI interaction (click, type, scroll, etc.)
- File operations (read, write, list, copy)
- Windows command execution

This module integrates VLM feedback into the agent ledger for:
- Visual task verification
- Screen state tracking
- GUI automation context
- Computer use feedback loop
"""

import json
import logging
import os
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, List, Optional

from core.http_pool import pooled_get, pooled_post

# Module-level logger; handlers/levels are configured by the application.
logger = logging.getLogger(__name__)

29 

30 

class VLMAgentContext:
    """
    Manages VLM agent context and feedback integration with the agent ledger.

    Provides methods to:
    1. Inject visual feedback from VLM agent into agent context
    2. Track screen state and GUI actions
    3. Update ledger with visual verification results

    NOTE: Windows command execution was removed from this class; use
    `execute_windows_or_android_command` in create_recipe.py instead (see the
    removal comment inside the class body for details).
    """

    # History caps so long-running sessions don't grow memory without bound.
    _MAX_SCREEN_HISTORY = 10
    _MAX_ACTION_HISTORY = 50

    def __init__(self, vlm_server_url: Optional[str] = None,
                 omniparser_url: Optional[str] = None) -> None:
        """
        Initialize VLM agent context manager.

        Args:
            vlm_server_url: URL of VLM agent server (agentic_rpc.py Flask
                server). Defaults to http://localhost:<VLM_GUI_PORT> (5001).
            omniparser_url: URL of OmniParser server for screen parsing.
                Defaults to http://localhost:<OMNIPARSER_PORT> (8080).
        """
        # Bug fix: this docstring previously sat *after* the default-resolution
        # code below, making it a no-op string expression (never __doc__).
        if vlm_server_url is None:
            vlm_server_url = f"http://localhost:{os.environ.get('VLM_GUI_PORT', '5001')}"
        if omniparser_url is None:
            omniparser_url = f"http://localhost:{os.environ.get('OMNIPARSER_PORT', '8080')}"
        self.vlm_server_url = vlm_server_url
        self.omniparser_url = omniparser_url
        # Rolling histories of captured screens and executed actions.
        self.screen_history: List[Dict[str, Any]] = []
        self.action_history: List[Dict[str, Any]] = []

    def is_vlm_available(self) -> bool:
        """Check if VLM agent server is available (GET /health returns 200)."""
        try:
            response = pooled_get(f"{self.vlm_server_url}/health", timeout=2)
            return response.status_code == 200
        except Exception:
            # Any connection/timeout failure means "not available".
            return False

    def is_omniparser_available(self) -> bool:
        """Check if OmniParser server is available (GET /probe returns 200)."""
        try:
            response = pooled_get(f"{self.omniparser_url}/probe", timeout=2)
            return response.status_code == 200
        except Exception:
            return False

    def get_screen_context(self) -> Optional[Dict[str, Any]]:
        """
        Get current screen context from OmniParser.

        Side effect: on success, appends a summary entry to
        ``self.screen_history`` (bounded to the last 10 captures).

        Returns:
            Dictionary with:
            - screenshot_base64: Base64 encoded screenshot
            - parsed_elements: List of detected UI elements
            - screen_info: Text description of screen
            - width, height: Screen dimensions
            or None if OmniParser is unavailable or the request fails.
        """
        try:
            if not self.is_omniparser_available():
                logger.warning("OmniParser not available, skipping screen context")
                return None

            # Request screen parsing from OmniParser.
            response = pooled_post(
                f"{self.omniparser_url}/parse_screen",
                json={"include_som": True},
                timeout=10
            )

            if response.status_code == 200:
                screen_data = response.json()

                # Store a lightweight summary in history (not the screenshot).
                self.screen_history.append({
                    "timestamp": datetime.now().isoformat(),
                    "screen_info": screen_data.get("screen_info", ""),
                    "element_count": len(screen_data.get("parsed_content_list", []))
                })

                # Keep only the most recent captures.
                if len(self.screen_history) > self._MAX_SCREEN_HISTORY:
                    self.screen_history.pop(0)

                return screen_data
            else:
                logger.error(f"OmniParser returned error: {response.status_code}")
                return None

        except Exception as e:
            logger.error(f"Error getting screen context: {e}")
            return None

    def inject_visual_context_into_ledger_task(self, task_context: Dict[str, Any]) -> Dict[str, Any]:
        """
        Inject current visual screen context into a ledger task's context.

        Mutates and returns ``task_context``, setting its "visual_context" key.

        Args:
            task_context: Existing task context dictionary

        Returns:
            Enhanced task context with visual information
        """
        screen_context = self.get_screen_context()

        if screen_context:
            task_context["visual_context"] = {
                "has_screen_info": True,
                "screen_summary": screen_context.get("screen_info", "")[:500],  # First 500 chars
                "visible_elements": len(screen_context.get("parsed_content_list", [])),
                "screen_dimensions": {
                    "width": screen_context.get("width"),
                    "height": screen_context.get("height")
                },
                "timestamp": datetime.now().isoformat()
            }
        else:
            task_context["visual_context"] = {
                "has_screen_info": False,
                "note": "VLM agent not available"
            }

        return task_context

    def execute_vlm_action(
        self,
        action: str,
        parameters: Optional[Dict[str, Any]] = None,
        user_id: str = "agent",
        prompt_id: str = "task"
    ) -> Dict[str, Any]:
        """
        Execute an action through VLM agent.

        Side effect: on a 200 response, records the action in
        ``self.action_history`` (bounded to the last 50 actions).

        Args:
            action: Action type (e.g., "type", "left_click", "list_folders_and_files")
            parameters: Action parameters (e.g., {"text": "Hello", "coordinate": [100, 200]})
            user_id: User identifier
            prompt_id: Prompt/task identifier

        Returns:
            Result dictionary with status and output; on failure, a dict with
            "status": "error", a "message", and the attempted "action".
        """
        try:
            if not self.is_vlm_available():
                return {
                    "status": "error",
                    "message": "VLM agent server not available",
                    "action": action
                }

            # Prepare request payload.
            payload = {
                "user_id": user_id,
                "prompt_id": prompt_id,
                "action": action,
                "parameters": parameters or {}
            }

            # Send action request to VLM agent.
            response = pooled_post(
                f"{self.vlm_server_url}/execute_action",
                json=payload,
                timeout=30
            )

            if response.status_code == 200:
                result = response.json()

                # Track action in history.
                self.action_history.append({
                    "timestamp": datetime.now().isoformat(),
                    "action": action,
                    "parameters": parameters,
                    "result": result.get("status", "unknown")
                })

                # Keep only the most recent actions.
                if len(self.action_history) > self._MAX_ACTION_HISTORY:
                    self.action_history.pop(0)

                return result
            else:
                return {
                    "status": "error",
                    "message": f"VLM agent returned error: {response.status_code}",
                    "action": action
                }

        except Exception as e:
            logger.error(f"Error executing VLM action '{action}': {e}")
            return {
                "status": "error",
                "message": str(e),
                "action": action
            }

    # execute_windows_command REMOVED — it was a Windows-specific parallel
    # path that simulated Win+R → type → Enter against the old OmniParser
    # HTTP server with no denylist, no NFKC normalization, no timeout, and
    # no audit log. The unified cross-OS replacement is
    # `execute_windows_or_android_command` in create_recipe.py, which:
    #
    # - supports windows / linux / macos / android via vlm_adapter →
    #   run_local_agentic_loop → shared shell/open_file_gui actions
    # - runs every command through _handle_shell_command_tool's denylist
    # - works in frozen builds (no pyautogui HTTP dependency)
    #
    # Callers that used `VLMAgentContext.execute_windows_command(cmd)` should
    # use the autogen-registered `execute_windows_or_android_command(
    # instructions=cmd, os_to_control='windows')` instead. See commit
    # history dce4b31..HEAD for the subprocess isolation + unified shell
    # handler context.

    def get_visual_feedback_for_task(self, task_description: str) -> str:
        """
        Get visual feedback about current screen state relevant to a task.

        Args:
            task_description: Description of the task being performed

        Returns:
            Text feedback about screen state (element count, dimensions,
            truncated screen_info, and up to the last 5 actions), or a
            fallback message when the VLM agent is unreachable.
        """
        screen_context = self.get_screen_context()

        if not screen_context:
            return "Visual feedback unavailable (VLM agent not accessible)"

        feedback_parts = []
        feedback_parts.append(f"Task: {task_description}")
        feedback_parts.append("\nScreen Analysis:")
        feedback_parts.append(f"- Detected {len(screen_context.get('parsed_content_list', []))} UI elements")
        feedback_parts.append(f"- Screen dimensions: {screen_context.get('width')}x{screen_context.get('height')}")

        # Add summary of visible elements.
        screen_info = screen_context.get("screen_info", "")
        if screen_info:
            feedback_parts.append("\nVisible elements:")
            feedback_parts.append(screen_info[:500])  # First 500 chars

        # Add recent action history.
        if self.action_history:
            feedback_parts.append("\nRecent actions (last 5):")
            for action_record in self.action_history[-5:]:
                feedback_parts.append(
                    f"- {action_record['action']} -> {action_record['result']}"
                )

        return "\n".join(feedback_parts)

    def create_vlm_enabled_tool(self, tool_name: str, tool_description: str) -> Dict[str, Any]:
        """
        Create a tool definition that can be used by agents to interact with VLM.

        Args:
            tool_name: Name of the tool
            tool_description: Description of what the tool does

        Returns:
            Tool definition dictionary (OpenAI function-calling schema shape).
        """
        return {
            "type": "function",
            "function": {
                "name": tool_name,
                "description": tool_description,
                "parameters": {
                    "type": "object",
                    "properties": {
                        "action": {
                            "type": "string",
                            "enum": [
                                "type", "left_click", "right_click", "double_click",
                                "hover", "scroll_up", "scroll_down", "wait", "hotkey",
                                "list_folders_and_files", "Open_file_and_copy_paste",
                                "open_file_gui", "write_file", "read_file_and_understand"
                            ],
                            "description": "The action to perform"
                        },
                        "parameters": {
                            "type": "object",
                            "description": "Parameters for the action (e.g., {'text': 'Hello'}, {'coordinate': [100, 200]})"
                        }
                    },
                    "required": ["action"]
                }
            }
        }

    def get_status_summary(self) -> Dict[str, Any]:
        """Get current status summary of VLM agent integration."""
        return {
            "vlm_available": self.is_vlm_available(),
            "omniparser_available": self.is_omniparser_available(),
            "screen_history_count": len(self.screen_history),
            "action_history_count": len(self.action_history),
            "last_screen_capture": self.screen_history[-1]["timestamp"] if self.screen_history else None,
            "last_action": self.action_history[-1] if self.action_history else None
        }

328 

329 

# Singleton instance, created lazily by get_vlm_context().
_vlm_context = None


def get_vlm_context(vlm_server_url: Optional[str] = None,
                    omniparser_url: Optional[str] = None) -> VLMAgentContext:
    """
    Get or create the singleton VLM context instance.

    Args:
        vlm_server_url: Optional VLM agent server URL. Only honored on the
            first call, when the singleton is constructed.
        omniparser_url: Optional OmniParser server URL. Likewise only used
            on the first call.

    Returns:
        The shared VLMAgentContext instance.

    NOTE: URLs passed on subsequent calls are silently ignored, because the
    already-created singleton is returned unchanged.
    """
    global _vlm_context
    if _vlm_context is None:
        _vlm_context = VLMAgentContext(vlm_server_url, omniparser_url)
    return _vlm_context

339 

340 

if __name__ == "__main__":
    # Manual smoke test: exercises the integration against locally running
    # VLM / OmniParser servers and prints what it finds.
    print("Testing VLM Agent Integration\n")

    ctx = get_vlm_context()

    # Server availability.
    print(f"VLM Agent available: {ctx.is_vlm_available()}")
    print(f"OmniParser available: {ctx.is_omniparser_available()}")

    # Aggregate status summary.
    summary = ctx.get_status_summary()
    print(f"\nStatus: {json.dumps(summary, indent=2)}")

    # Screen capture path, only when OmniParser is reachable.
    if ctx.is_omniparser_available():
        print("\nGetting screen context...")
        parsed = ctx.get_screen_context()
        if parsed:
            print(f"Screen dimensions: {parsed.get('width')}x{parsed.get('height')}")
            print(f"Detected elements: {len(parsed.get('parsed_content_list', []))}")

    # Ledger-context injection path.
    print("\nTesting context injection...")
    sample_task = {
        "task_id": "test_task",
        "description": "Test task for VLM integration",
    }
    enriched = ctx.inject_visual_context_into_ledger_task(sample_task)
    print(f"Enhanced context: {json.dumps(enriched, indent=2)}")