Coverage for integrations/coding_agent/remote_executor.py: 0.0%
62 statements
« prev ^ index » next — coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Remote Desktop Executor — CLI bridge to Nunba /execute and /screenshot endpoints.

Handles:
- HTTP dispatch to remote Nunba instance
- Security pre-checks (action_classifier, DLP)
- Screenshot capture
- Local VLM desktop automation (via existing local_loop)
"""
10import json
11import logging
12import os
13from typing import Dict, Optional
15from core.http_pool import pooled_get, pooled_post
16from core.port_registry import get_port
18logger = logging.getLogger('hevolve.coding_agent')
class RemoteDesktopExecutor:
    """Bridge CLI commands to Nunba /execute and /screenshot endpoints."""

    def __init__(self, nunba_url: Optional[str] = None):
        """Initialize the executor.

        Args:
            nunba_url: Base URL of the Nunba instance. When omitted/empty,
                resolved lazily here as http://localhost:<backend port>.
                (The previous f-string default called get_port() once at
                class-definition time, freezing the port at import and
                adding an import-time side effect.)
        """
        if not nunba_url:
            nunba_url = f'http://localhost:{get_port("backend")}'
        self.base_url = nunba_url.rstrip('/')

    def execute(self, command: str, timeout: int = 120,
                force: bool = False) -> Dict:
        """Execute a command on a remote machine via Nunba /execute endpoint.

        Args:
            command: The command to execute
            timeout: Execution timeout in seconds
            force: If True, bypass destructive command check

        Returns:
            {success, output, returncode?, error?}
        """
        # Security pre-check: destructive command detection
        if not force:
            blocked = self._check_security(command)
            if blocked:
                return blocked

        # Imported locally so module import never hard-requires requests;
        # only needed here to match the exception types pooled_post raises.
        import requests

        try:
            resp = pooled_post(
                f'{self.base_url}/execute',
                json={'command': command, 'timeout': timeout},
                # Give the HTTP layer headroom over the remote execution timeout.
                timeout=timeout + 10,
            )
            if resp.status_code == 200:
                data = resp.json()
                return {
                    # Remote command succeeded only on returncode 0; a missing
                    # returncode is treated as failure (default 1).
                    'success': data.get('returncode', 1) == 0,
                    'output': data.get('output', ''),
                    'returncode': data.get('returncode', -1),
                }
            else:
                return {
                    'success': False,
                    'output': '',
                    'error': f'HTTP {resp.status_code}: {resp.text[:200]}',
                }
        except requests.ConnectionError:
            return {
                'success': False,
                'output': '',
                'error': f'Cannot connect to Nunba at {self.base_url}',
            }
        except requests.Timeout:
            return {
                'success': False,
                'output': '',
                'error': f'Request timed out after {timeout}s',
            }
        except Exception as e:
            # Catch-all boundary: any other transport/parse error becomes a
            # structured failure result rather than an exception for the CLI.
            return {
                'success': False,
                'output': '',
                'error': str(e),
            }

    def screenshot(self) -> Dict:
        """Capture screenshot from remote Nunba /screenshot endpoint.

        Returns:
            {success, image_base64?, content_type?, error?}
        """
        import requests

        try:
            resp = pooled_get(
                f'{self.base_url}/screenshot',
                timeout=30,
            )
            if resp.status_code == 200:
                content_type = resp.headers.get('Content-Type', '')
                if 'json' in content_type:
                    # JSON payload: server already base64-encodes the image
                    # under 'image' (or legacy 'screenshot').
                    data = resp.json()
                    return {
                        'success': True,
                        'image_base64': data.get('image', data.get('screenshot', '')),
                        'content_type': 'image/png',
                    }
                else:
                    # Raw bytes payload: encode to base64 ourselves.
                    import base64
                    return {
                        'success': True,
                        'image_base64': base64.b64encode(resp.content).decode(),
                        'content_type': content_type,
                    }
            else:
                return {
                    'success': False,
                    'error': f'HTTP {resp.status_code}: {resp.text[:200]}',
                }
        except requests.ConnectionError:
            return {
                'success': False,
                'error': f'Cannot connect to Nunba at {self.base_url}',
            }
        except requests.Timeout:
            # Previously fell through to the generic handler; made explicit
            # for a clearer message, consistent with execute().
            return {
                'success': False,
                'error': 'Request timed out after 30s',
            }
        except Exception as e:
            return {
                'success': False,
                'error': str(e),
            }

    def execute_desktop_task(self, instruction: str, target: str = 'local',
                             nunba_url: str = '') -> Dict:
        """Execute a desktop automation task (VLM agentic loop).

        Args:
            instruction: Natural language instruction (e.g., "open Chrome")
            target: 'local' for in-process VLM loop, 'remote' for Nunba dispatch
            nunba_url: Override Nunba URL for remote execution

        Returns:
            {success, output, error?}
        """
        if target == 'remote':
            url = nunba_url or self.base_url
            # Fresh executor so the override URL is normalized the same way.
            executor = RemoteDesktopExecutor(url)
            return executor.execute(instruction)

        # Local execution via existing VLM pipeline
        try:
            from integrations.vlm.local_loop import run_local_agentic_loop
            result = run_local_agentic_loop(instruction)
            return {
                'success': True,
                # default=str so non-JSON-native result fields still serialize.
                'output': json.dumps(result, default=str),
            }
        except ImportError:
            return {
                'success': False,
                'output': '',
                'error': 'VLM pipeline not available (pyautogui/OmniParser not installed)',
            }
        except Exception as e:
            return {
                'success': False,
                'output': '',
                'error': str(e),
            }

    def _check_security(self, command: str) -> Optional[Dict]:
        """Run security checks on command before remote dispatch.

        Returns error dict if blocked, None if OK. Each check is best-effort:
        a missing security module silently skips that check (deliberate —
        security tooling is optional in some deployments).
        """
        # Action classifier — detect destructive patterns
        try:
            from security.action_classifier import classify_action
            classification = classify_action(command)
            if classification == 'destructive':
                return {
                    'success': False,
                    'output': '',
                    'error': (
                        f'Destructive command detected: {command[:100]}. '
                        'Use --force to override.'
                    ),
                }
        except ImportError:
            pass

        # DLP — scan for PII before sending to remote
        try:
            from security.dlp_engine import get_dlp_engine
            dlp = get_dlp_engine()
            allowed, reason = dlp.check_outbound(command)
            if not allowed:
                return {
                    'success': False,
                    'output': '',
                    'error': f'DLP blocked: {reason or "PII detected"}',
                }
        except ImportError:
            pass

        return None