Coverage for integrations / coding_agent / aider_core / utils.py: 28.5%
228 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1import os
2import platform
3import subprocess
4import sys
5import tempfile
6from pathlib import Path
8try:
9 import oslex
10except ImportError:
11 import shlex as oslex # Fallback to stdlib shlex
13from .dump import dump # noqa: F401
14from .waiting import Spinner
# File-name suffixes that is_image_file() accepts.
# NOTE(review): ".pdf" is listed alongside the raster image formats --
# presumably intentional; confirm with the callers of is_image_file().
IMAGE_EXTENSIONS = {
    ".png",
    ".jpg",
    ".jpeg",
    ".gif",
    ".bmp",
    ".tiff",
    ".webp",
    ".pdf",
}
class IgnorantTemporaryDirectory:
    """Wrapper around ``tempfile.TemporaryDirectory`` that ignores cleanup errors.

    The wrapped object is stored in ``self.temp_dir``. Any attribute not
    defined on this wrapper (for example ``name``) is looked up on the wrapped
    ``TemporaryDirectory``.
    """

    def __init__(self):
        # ``ignore_cleanup_errors`` is only passed on Python 3.10 and newer.
        if sys.version_info >= (3, 10):
            self.temp_dir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
        else:
            self.temp_dir = tempfile.TemporaryDirectory()

    def __enter__(self):
        """Enter the wrapped temporary directory and return what it returns."""
        return self.temp_dir.__enter__()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Clean up on exit; exceptions from the ``with`` body are not suppressed."""
        self.cleanup()

    def cleanup(self):
        """Remove the temporary directory, ignoring OS and recursion errors."""
        try:
            self.temp_dir.cleanup()
        except (OSError, RecursionError):
            # PermissionError is a subclass of OSError, so it is covered too.
            pass  # Ignore errors (Windows and potential recursion)

    def __getattr__(self, item):
        """Delegate unknown attributes to the wrapped ``TemporaryDirectory``."""
        # __getattr__ only runs when normal lookup fails. If ``temp_dir``
        # itself is missing (instance built without __init__, or __init__
        # failed), delegating would re-enter this method forever and end in a
        # RecursionError, so report the missing attribute instead.
        if item == "temp_dir":
            raise AttributeError(item)
        return getattr(self.temp_dir, item)
class ChdirTemporaryDirectory(IgnorantTemporaryDirectory):
    """Temporary directory that is also the working directory while entered.

    The working directory seen at construction time is remembered in
    ``self.cwd`` (``None`` if it could not be determined) and restored on
    exit, before the temporary directory is cleaned up.
    """

    def __init__(self):
        try:
            self.cwd = os.getcwd()
        except FileNotFoundError:
            # The current directory no longer exists; nothing to restore later.
            self.cwd = None

        super().__init__()

    def __enter__(self):
        """Enter the temporary directory, chdir into it and return its name.

        The process changes into the resolved path, while the value returned
        is whatever the parent ``__enter__`` produced.
        """
        res = super().__enter__()
        os.chdir(Path(self.temp_dir.name).resolve())
        return res

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Restore the original working directory, then clean up."""
        try:
            if self.cwd:
                try:
                    os.chdir(self.cwd)
                except FileNotFoundError:
                    # The original directory vanished while we were away.
                    pass
        finally:
            # Always clean up, even if restoring the cwd raised something
            # other than FileNotFoundError (that error still propagates).
            super().__exit__(exc_type, exc_val, exc_tb)
class GitTemporaryDirectory(ChdirTemporaryDirectory):
    """Temporary working directory with a git repository created in it.

    The repository object returned by ``make_repo`` is available as
    ``self.repo`` for as long as the context is entered.
    """

    def __enter__(self):
        """Enter the parent context, create the repo and return the directory."""
        tmp_path = super().__enter__()
        self.repo = make_repo(tmp_path)
        return tmp_path

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Drop the repo reference first, then run the parent's exit handling."""
        del self.repo
        super().__exit__(exc_type, exc_val, exc_tb)
def make_repo(path=None):
    """Initialise a git repository and give it a fixed test identity.

    :param path: Where to create the repository; any falsy value means ".".
    :return: The ``git.Repo`` object for the new repository.
    """
    import git

    repo = git.Repo.init(path or ".")

    identity = (
        ("name", "Test User"),
        ("email", "testuser@example.com"),
    )
    for option, value in identity:
        # A fresh config writer per setting, released immediately after use.
        repo.config_writer().set_value("user", option, value).release()

    return repo
def is_image_file(file_name):
    """
    Check if the given file name has an image file extension.

    The comparison ignores case, so "photo.PNG" matches just like
    "photo.png".

    :param file_name: The name of the file to check (str or path-like).
    :return: True if the name ends with one of IMAGE_EXTENSIONS, False otherwise.
    """
    lowered_name = str(file_name).lower()  # accept Path objects, ignore case
    suffixes = tuple(ext.lower() for ext in IMAGE_EXTENSIONS)
    # str.endswith accepts a tuple and checks every suffix in one call.
    return lowered_name.endswith(suffixes)
def safe_abs_path(res):
    """Return *res* as an absolute path string.

    The path is resolved when possible (on Windows this gives the full name
    rather than the 8.3 short form). If resolving raises RuntimeError or
    OSError, the unresolved absolute path is returned instead.
    """
    try:
        return str(Path(res).resolve())
    except (RuntimeError, OSError):
        return str(Path(res).absolute())
def format_content(role, content):
    """Prefix every line of *content* with *role* and a space.

    :param role: Label placed at the start of each line.
    :param content: Text to format; it is split with ``str.splitlines``.
    :return: The prefixed lines joined with newlines ("" for empty content).
    """
    return "\n".join(f"{role} {text_line}" for text_line in content.splitlines())
def format_messages(messages, title=None):
    """Render a list of chat message dicts as plain text.

    Every message starts with a "-------" separator. String content is
    emitted line by line, each line prefixed with the upper-cased role. List
    content is emitted item by item: dict items produce one line per
    key/value pair (values that are dicts containing "url" get a dedicated
    "URL" line), other items are printed as-is. A truthy "function_call"
    entry adds a final line for the message.

    :param messages: Iterable of dicts with a "role" key and optional
        "content" / "function_call" keys.
    :param title: Optional heading, upper-cased and followed by 50 asterisks.
    :return: The rendered text, lines joined with newlines.
    """
    rendered = [f"{title.upper()} {'*' * 50}"] if title else []

    for message in messages:
        rendered.append("-------")
        role = message["role"].upper()
        body = message.get("content")

        if isinstance(body, str):
            # Plain text: prefix each line with the role, kept as one entry.
            rendered.append("\n".join(f"{role} {line}" for line in body.splitlines()))
        elif isinstance(body, list):
            # Structured content (e.g. image messages).
            for part in body:
                if not isinstance(part, dict):
                    rendered.append(f"{role} {part}")
                    continue
                for key, value in part.items():
                    if isinstance(value, dict) and "url" in value:
                        rendered.append(f"{role} {key.capitalize()} URL: {value['url']}")
                    else:
                        rendered.append(f"{role} {key}: {value}")

        function_call = message.get("function_call")
        if function_call:
            rendered.append(f"{role} Function Call: {function_call}")

    return "\n".join(rendered)
def show_messages(messages, title=None, functions=None):
    """Print the formatted messages and, if given, dump *functions*.

    :param messages: Message dicts, passed to ``format_messages``.
    :param title: Optional heading, passed to ``format_messages``.
    :param functions: Optional value handed to ``dump`` when truthy.
    """
    print(format_messages(messages, title))

    if not functions:
        return
    dump(functions)
def split_chat_history_markdown(text, include_tool=False):
    """Split a markdown chat history into a list of role/content messages.

    Lines are classified by prefix:

    * ``"# "``    -- skipped entirely.
    * ``"> "``    -- tool output (prefix removed).
    * ``"#### "`` -- user input (prefix removed).
    * otherwise   -- assistant text (kept verbatim).

    Consecutive lines of the same kind are merged into one message, and
    messages that contain only whitespace are dropped.

    :param text: The markdown chat history.
    :param include_tool: When False (default), tool messages are removed
        from the result.
    :return: List of ``{"role": ..., "content": ...}`` dicts in document order.
    """
    messages = []
    user = []
    assistant = []
    tool = []

    def append_msg(role, chunks):
        # Emit the buffered chunks as one message unless they are blank.
        content = "".join(chunks)
        if content.strip():
            messages.append(dict(role=role, content=content))

    for line in text.splitlines(keepends=True):
        if line.startswith("# "):
            continue

        if line.startswith("> "):
            # Tool output: close any open assistant/user message first.
            append_msg("assistant", assistant)
            assistant = []
            append_msg("user", user)
            user = []
            tool.append(line[2:])
            continue

        if line.startswith("#### "):
            # User input: close any open assistant/tool message first.
            append_msg("assistant", assistant)
            assistant = []
            append_msg("tool", tool)
            tool = []
            user.append(line[5:])
            continue

        # Anything else is assistant text.
        append_msg("user", user)
        user = []
        append_msg("tool", tool)
        tool = []
        assistant.append(line)

    # Flush whatever is still buffered. Only one buffer can be non-empty
    # here, because each branch above empties the other two. The tool buffer
    # must be flushed too, or a history ending in tool output loses it.
    append_msg("assistant", assistant)
    append_msg("user", user)
    append_msg("tool", tool)

    if not include_tool:
        messages = [m for m in messages if m["role"] != "tool"]

    return messages
def get_pip_install(args):
    """Build the ``pip install`` command line for the running interpreter.

    :param args: Iterable of extra arguments appended after the fixed options.
    :return: A new list: ``sys.executable -m pip install --upgrade
        --upgrade-strategy only-if-needed`` followed by *args*.
    """
    base_command = (
        sys.executable,
        "-m",
        "pip",
        "install",
        "--upgrade",
        "--upgrade-strategy",
        "only-if-needed",
    )
    return [*base_command, *args]
def run_install(cmd):
    """Run an install command while showing a spinner, capturing its output.

    ``ensurepip`` is attempted first on a best-effort basis. The command's
    stdout and stderr are merged and read one character at a time so the
    spinner advances as output arrives.

    :param cmd: The command to run, as a list of arguments.
    :return: ``(success, output)`` -- *success* is True only when the process
        exits with code 0; *output* is always the captured text (possibly
        empty, e.g. when the process could not be started).
    """
    print()
    print("Installing:", printable_shell_command(cmd))

    # Extra Popen keyword arguments from an optional project helper.
    # NOTE(review): presumably these hide the console window on Windows --
    # confirm against core.subprocess_safe. Deliberately best-effort.
    try:
        from core.subprocess_safe import hidden_popen_kwargs

        _hide = hidden_popen_kwargs()
    except Exception:
        _hide = {}

    # First ensure pip is available
    ensurepip_cmd = [sys.executable, "-m", "ensurepip", "--upgrade"]
    try:
        subprocess.run(ensurepip_cmd, capture_output=True, check=False, **_hide)
    except Exception:
        pass  # Continue even if ensurepip fails

    captured = []
    try:
        # The context manager closes the stdout pipe and reaps the process
        # even if reading the output fails part-way through.
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
            # sys.stdout can be None (e.g. no console); None means default.
            encoding=getattr(sys.stdout, "encoding", None),
            errors="replace",
            **_hide,
        ) as process:
            spinner = Spinner("Installing...")
            try:
                while True:
                    char = process.stdout.read(1)
                    if not char:
                        break

                    captured.append(char)
                    spinner.step()
            finally:
                # Always end the spinner, even when reading raised.
                spinner.end()
            return_code = process.wait()

        if return_code == 0:
            print("Installation complete.")
            print()
            return True, "".join(captured)

    except (OSError, subprocess.SubprocessError) as e:
        # Popen reports launch failures as OSError; CalledProcessError, the
        # only type caught before, is a SubprocessError and stays covered.
        print(f"\nError running pip install: {e}")

    print("\nInstallation failed.\n")

    return False, "".join(captured)
def find_common_root(abs_fnames):
    """Return the directory shared by all the given file names.

    With exactly one file name its parent directory is used; with several,
    their common path. If no names are given, or the common path cannot be
    computed, the current working directory is returned instead.

    :param abs_fnames: Sized collection of file paths.
    :return: Path string from ``safe_abs_path``, or "." when even the current
        working directory is unavailable.
    """
    try:
        if len(abs_fnames) == 1:
            return safe_abs_path(os.path.dirname(list(abs_fnames)[0]))
        elif abs_fnames:
            return safe_abs_path(os.path.commonpath(list(abs_fnames)))
    except (OSError, ValueError):
        # os.path.commonpath raises ValueError when the paths mix absolute
        # and relative names or sit on different drives; fall back to the
        # cwd in that case, the same as for OS-level errors.
        pass

    try:
        return safe_abs_path(os.getcwd())
    except FileNotFoundError:
        # Fallback if cwd is deleted
        return "."
def format_tokens(count):
    """Format a token count compactly.

    Counts below 1000 are shown as-is, counts below 10000 in thousands with
    one decimal place (e.g. "1.5k"), and larger counts in whole thousands
    (e.g. "12k").

    :param count: The number of tokens.
    :return: The formatted string.
    """
    if count < 1000:
        return f"{count}"

    thousands = count / 1000
    if count < 10000:
        return f"{thousands:.1f}k"

    return f"{round(thousands)}k"
def touch_file(fname):
    """Create *fname* (and any missing parent directories) if possible.

    :param fname: Path of the file to touch.
    :return: True on success, False if an OSError occurred.
    """
    target = Path(fname)
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        target.touch()
    except OSError:
        return False
    return True
def check_pip_install_extra(io, module, prompt, pip_install_cmd, self_update=False):
    """Make sure *module* is importable, offering to pip-install it if not.

    :param io: Object providing ``tool_warning``, ``tool_output``,
        ``tool_error`` and ``confirm_ask``.
    :param module: Name of the module to import as the check; when falsy no
        import check is done and the install is always offered.
    :param prompt: Optional warning shown before offering the install.
    :param pip_install_cmd: Arguments passed to ``get_pip_install``.
    :param self_update: When True on Windows, the command is only printed for
        the user to run and nothing is installed.
    :return: True if the module is importable (or, without a module, if the
        install succeeded); None otherwise.
    """
    import_failures = (ImportError, ModuleNotFoundError, RuntimeError)

    if module:
        try:
            __import__(module)
        except import_failures:
            pass
        else:
            return True

    cmd = get_pip_install(pip_install_cmd)

    if prompt:
        io.tool_warning(prompt)

    if self_update and platform.system() == "Windows":
        io.tool_output("Run this command to update:")
        print()
        print(printable_shell_command(cmd))  # plain print so it doesn't line-wrap
        return

    confirmed = io.confirm_ask(
        "Run pip install?", default="y", subject=printable_shell_command(cmd)
    )
    if not confirmed:
        return

    success, output = run_install(cmd)
    if success and not module:
        return True
    if success:
        # The install reported success; check the module really imports now.
        try:
            __import__(module)
        except import_failures as err:
            io.tool_error(str(err))
        else:
            return True

    io.tool_error(output)

    print()
    print("Install failed, try running this command manually:")
    print(printable_shell_command(cmd))
def printable_shell_command(cmd_list):
    """
    Render a list of command arguments as one shell-escaped string.

    Args:
        cmd_list (list): The command and its arguments.

    Returns:
        str: The arguments joined by ``oslex.join``.
    """
    escaped_command = oslex.join(cmd_list)
    return escaped_command