Coverage for integrations / coding_agent / aider_core / utils.py: 28.5%

228 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1import os 

2import platform 

3import subprocess 

4import sys 

5import tempfile 

6from pathlib import Path 

7 

8try: 

9 import oslex 

10except ImportError: 

11 import shlex as oslex # Fallback to stdlib shlex 

12 

13from .dump import dump # noqa: F401 

14from .waiting import Spinner 

15 

# Lower-case file suffixes that is_image_file() matches against.
# Note that ".pdf" is listed here alongside the raster image formats.
IMAGE_EXTENSIONS = {
    ".png",
    ".jpg",
    ".jpeg",
    ".gif",
    ".bmp",
    ".tiff",
    ".webp",
    ".pdf",
}

17 

18 

class IgnorantTemporaryDirectory:
    """Temporary directory wrapper that ignores errors raised during cleanup.

    Wraps ``tempfile.TemporaryDirectory`` and forwards unknown attribute
    lookups (for example ``name``) to the wrapped object.
    """

    def __init__(self):
        # ``ignore_cleanup_errors`` is only passed on Python 3.10 and newer.
        if sys.version_info >= (3, 10):
            self.temp_dir = tempfile.TemporaryDirectory(ignore_cleanup_errors=True)
        else:
            self.temp_dir = tempfile.TemporaryDirectory()

    def __enter__(self):
        """Enter the wrapped temporary directory and return what it returns."""
        return self.temp_dir.__enter__()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Clean up the directory; exceptions from the body are not suppressed."""
        self.cleanup()

    def cleanup(self):
        """Remove the temporary directory, ignoring cleanup failures."""
        try:
            self.temp_dir.cleanup()
        except (OSError, PermissionError, RecursionError):
            pass  # Ignore errors (Windows and potential recursion)

    def __getattr__(self, item):
        """Delegate attributes that are not found on the wrapper to ``temp_dir``.

        Raises:
            AttributeError: if ``temp_dir`` itself is not set yet, or if the
                wrapped object has no such attribute.
        """
        # __getattr__ only runs when normal lookup failed. If the missing
        # attribute is ``temp_dir`` itself (instance created without __init__),
        # delegating would call __getattr__ again forever.
        if item == "temp_dir":
            raise AttributeError(item)
        return getattr(self.temp_dir, item)

40 

41 

class ChdirTemporaryDirectory(IgnorantTemporaryDirectory):
    """Temporary directory that becomes the working directory while entered.

    The working directory seen at construction time is remembered and
    restored on exit, before the temporary directory is cleaned up.
    """

    def __init__(self):
        # os.getcwd() raises FileNotFoundError when the current directory no
        # longer exists; in that case there is nothing to return to later.
        try:
            self.cwd = os.getcwd()
        except FileNotFoundError:
            self.cwd = None

        super().__init__()

    def __enter__(self):
        entered = super().__enter__()
        resolved_dir = Path(self.temp_dir.name).resolve()
        os.chdir(resolved_dir)
        return entered

    def __exit__(self, exc_type, exc_val, exc_tb):
        previous_dir = self.cwd
        if previous_dir:
            # The original directory may have been removed in the meantime.
            try:
                os.chdir(previous_dir)
            except FileNotFoundError:
                pass
        super().__exit__(exc_type, exc_val, exc_tb)

63 

64 

class GitTemporaryDirectory(ChdirTemporaryDirectory):
    """Temporary working directory with a repository created by make_repo()."""

    def __enter__(self):
        workdir = super().__enter__()
        # Keep the repo object on the instance for the lifetime of the context.
        self.repo = make_repo(workdir)
        return workdir

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Drop our reference to the repo before the parent restores the
        # working directory and removes the temporary directory.
        del self.repo
        super().__exit__(exc_type, exc_val, exc_tb)

74 

75 

def make_repo(path=None):
    """Initialise a git repository and give it a test user identity.

    :param path: Directory to initialise; a falsy value means ".".
    :return: The ``git.Repo`` object returned by ``git.Repo.init``.
    """
    import git

    repo = git.Repo.init(path if path else ".")

    # Each setting is written through its own config writer, released right away.
    identity = (("name", "Test User"), ("email", "testuser@example.com"))
    for option, value in identity:
        repo.config_writer().set_value("user", option, value).release()

    return repo

86 

87 

def is_image_file(file_name):
    """
    Check if the given file name has an image file extension.

    The comparison is case-insensitive, so "photo.PNG" and "photo.png" are
    treated alike.

    :param file_name: The name of the file to check (str or path-like object).
    :return: True if the name ends with one of IMAGE_EXTENSIONS, False otherwise.
    """
    # Lower-case both sides so upper- or mixed-case suffixes still match.
    lowered = str(file_name).lower()
    suffixes = tuple(ext.lower() for ext in IMAGE_EXTENSIONS)
    return lowered.endswith(suffixes)

97 

98 

def safe_abs_path(res):
    """Return ``res`` as an absolute path string.

    ``Path.resolve()`` is tried first; if it raises RuntimeError or OSError
    the result of ``Path.absolute()`` is used instead.
    """
    candidate = Path(res)
    try:
        return str(candidate.resolve())
    except (RuntimeError, OSError):
        return str(candidate.absolute())

106 

107 

def format_content(role, content):
    """Prefix every line of ``content`` with ``role`` and a space.

    :param role: Label placed at the start of each line.
    :param content: Text to format; it is split with ``str.splitlines``.
    :return: The prefixed lines joined with newlines ("" for empty content).
    """
    return "\n".join(f"{role} {text}" for text in content.splitlines())

113 

114 

def format_messages(messages, title=None):
    """Render chat messages as plain text, one role-prefixed line per entry.

    :param messages: Iterable of dicts with a "role" key and optional
        "content" (str or list) and "function_call" keys.
    :param title: Optional heading; printed upper-cased followed by 50 "*".
    :return: All rendered lines joined with newlines.
    """
    rendered = [f"{title.upper()} {'*' * 50}"] if title else []

    for message in messages:
        rendered.append("-------")
        speaker = message["role"].upper()
        body = message.get("content")

        if isinstance(body, list):
            # List content (e.g., image messages): one line per item or key.
            for part in body:
                if not isinstance(part, dict):
                    rendered.append(f"{speaker} {part}")
                    continue
                for field, payload in part.items():
                    if isinstance(payload, dict) and "url" in payload:
                        rendered.append(f"{speaker} {field.capitalize()} URL: {payload['url']}")
                    else:
                        rendered.append(f"{speaker} {field}: {payload}")
        elif isinstance(body, str):
            rendered.append(format_content(speaker, body))

        call = message.get("function_call")
        if call:
            rendered.append(f"{speaker} Function Call: {call}")

    return "\n".join(rendered)

141 

142 

def show_messages(messages, title=None, functions=None):
    """Print the formatted messages, then dump ``functions`` when it is truthy.

    :param messages: Messages passed on to format_messages().
    :param title: Optional title passed on to format_messages().
    :param functions: Optional value handed to dump() after printing.
    """
    print(format_messages(messages, title))

    if not functions:
        return
    dump(functions)

149 

150 

def split_chat_history_markdown(text, include_tool=False):
    """Split a markdown chat transcript into role-tagged messages.

    Line classification:
      * lines starting with "# " are skipped;
      * lines starting with "> " are tool output (the prefix is stripped);
      * lines starting with "#### " are user input (the prefix is stripped);
      * every other line belongs to the assistant.

    Consecutive lines of the same role are concatenated into one message.
    Messages whose content is only whitespace are dropped.

    :param text: The transcript text.
    :param include_tool: When False (default), "tool" messages are removed
        from the result.
    :return: List of ``{"role": ..., "content": ...}`` dicts in transcript order.
    """
    messages = []
    buffers = {"user": [], "assistant": [], "tool": []}

    def flush(*roles):
        # Emit (and reset) the pending text of each given role, in order.
        for role in roles:
            content = "".join(buffers[role])
            buffers[role] = []
            if content.strip():
                messages.append(dict(role=role, content=content))

    for line in text.splitlines(keepends=True):
        if line.startswith("# "):
            continue

        if line.startswith("> "):
            flush("assistant", "user")
            buffers["tool"].append(line[2:])
        elif line.startswith("#### "):
            flush("assistant", "tool")
            buffers["user"].append(line[5:])
        else:
            flush("user", "tool")
            buffers["assistant"].append(line)

    # Flush whatever is still pending. The tool buffer is included so that a
    # transcript ending in tool output keeps it when include_tool=True.
    flush("assistant", "user", "tool")

    if not include_tool:
        messages = [m for m in messages if m["role"] != "tool"]

    return messages

200 

201 

def get_pip_install(args):
    """Build a ``pip install --upgrade`` command for the running interpreter.

    :param args: Iterable of extra arguments appended after the fixed options.
    :return: The command as a list, starting with ``sys.executable``.
    """
    return [
        sys.executable,
        "-m",
        "pip",
        "install",
        "--upgrade",
        "--upgrade-strategy",
        "only-if-needed",
        *args,
    ]

214 

215 

def run_install(cmd):
    """Run an install command while showing a spinner, capturing its output.

    ``ensurepip`` is attempted first on a best-effort basis. The command's
    stdout and stderr are merged and collected.

    :param cmd: Command as a list of arguments, passed to ``subprocess.Popen``.
    :return: ``(True, output)`` when the command exits with status 0, otherwise
        ``(False, output)``; ``output`` is always a string and may be partial
        or empty if the process could not be started.
    """
    print()
    print("Installing:", printable_shell_command(cmd))

    # Optional project helper supplying extra Popen kwargs; absence is fine.
    try:
        from core.subprocess_safe import hidden_popen_kwargs

        _hide = hidden_popen_kwargs()
    except Exception:
        _hide = {}

    # First ensure pip is available
    ensurepip_cmd = [sys.executable, "-m", "ensurepip", "--upgrade"]
    try:
        subprocess.run(ensurepip_cmd, capture_output=True, check=False, **_hide)
    except Exception:
        pass  # Continue even if ensurepip fails

    # sys.stdout can be None (no console attached); encoding=None lets Popen
    # fall back to its default text encoding instead of raising here.
    encoding = getattr(sys.stdout, "encoding", None)

    chunks = []
    try:
        # The context manager closes the pipe and waits for the child.
        with subprocess.Popen(
            cmd,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            text=True,
            bufsize=1,
            encoding=encoding,
            errors="replace",
            **_hide,
        ) as process:
            spinner = Spinner("Installing...")
            try:
                # Read one character at a time so the spinner advances with output.
                while True:
                    char = process.stdout.read(1)
                    if not char:
                        break

                    chunks.append(char)
                    spinner.step()
            finally:
                spinner.end()
            return_code = process.wait()

        if return_code == 0:
            print("Installation complete.")
            print()
            return True, "".join(chunks)

    except (OSError, subprocess.SubprocessError) as e:
        # Popen raises OSError (e.g. executable not found), not CalledProcessError.
        print(f"\nError running pip install: {e}")

    print("\nInstallation failed.\n")

    return False, "".join(chunks)

270 

271 

def find_common_root(abs_fnames):
    """Return the directory shared by all given file names.

    With a single name, its parent directory is returned. With several, their
    common path is returned. If there are no names, or the common path cannot
    be computed, the current working directory is used instead.

    :param abs_fnames: Iterable of file paths (may be empty or None).
    :return: Path string from safe_abs_path(), or "." when the current
        working directory no longer exists.
    """
    fnames = list(abs_fnames or ())

    try:
        if len(fnames) == 1:
            return safe_abs_path(os.path.dirname(fnames[0]))
        if fnames:
            return safe_abs_path(os.path.commonpath(fnames))
    except (OSError, ValueError):
        # os.path.commonpath raises ValueError for paths on different drives
        # or a mix of absolute and relative paths; fall back to the cwd.
        pass

    try:
        return safe_abs_path(os.getcwd())
    except FileNotFoundError:
        # Fallback if cwd is deleted
        return "."

286 

287 

def format_tokens(count):
    """Format a token count compactly for display.

    * below 1000: the number itself ("999");
    * below 10000: thousands with one decimal ("1.5k");
    * otherwise: whole thousands ("15k"), using ``round()``.

    Counts just under 10000 that would round to "10.0k" are shown as "10k",
    matching the output for 10000.

    :param count: Number of tokens.
    :return: The formatted string.
    """
    if count < 1000:
        return f"{count}"

    if count < 10000:
        thousands = f"{count / 1000:.1f}"
        # e.g. 9999 formats as "10.0"; fall through to the whole-thousands form.
        if thousands != "10.0":
            return f"{thousands}k"

    return f"{round(count / 1000)}k"

295 

296 

def touch_file(fname):
    """Create ``fname`` (and any missing parent directories) if possible.

    :param fname: Path of the file to touch.
    :return: True on success, False if an OSError occurred.
    """
    target = Path(fname)
    try:
        target.parent.mkdir(parents=True, exist_ok=True)
        target.touch()
    except OSError:
        return False
    return True

305 

306 

def check_pip_install_extra(io, module, prompt, pip_install_cmd, self_update=False):
    """Make sure ``module`` is importable, offering to pip-install it if not.

    :param io: Object providing tool_warning, tool_output, tool_error and
        confirm_ask.
    :param module: Module name to import, or a falsy value to skip the check.
    :param prompt: Optional warning shown before asking to install.
    :param pip_install_cmd: Arguments handed to get_pip_install().
    :param self_update: When true on Windows, only print the command.
    :return: True if the module imports (or the install succeeded and no
        module was given); None in every other case.
    """
    import_failures = (ImportError, ModuleNotFoundError, RuntimeError)

    if module:
        try:
            __import__(module)
        except import_failures:
            pass
        else:
            return True

    cmd = get_pip_install(pip_install_cmd)

    if prompt:
        io.tool_warning(prompt)

    if self_update and platform.system() == "Windows":
        io.tool_output("Run this command to update:")
        print()
        print(printable_shell_command(cmd))  # plain print so it doesn't line-wrap
        return

    confirmed = io.confirm_ask(
        "Run pip install?", default="y", subject=printable_shell_command(cmd)
    )
    if not confirmed:
        return

    success, output = run_install(cmd)
    if success and not module:
        return True
    if success:
        # The install reported success; verify the module really imports now.
        try:
            __import__(module)
        except import_failures as err:
            io.tool_error(str(err))
        else:
            return True

    io.tool_error(output)

    print()
    print("Install failed, try running this command manually:")
    print(printable_shell_command(cmd))

345 

346 

def printable_shell_command(cmd_list):
    """
    Convert a list of command arguments to a properly shell-escaped string.

    Arguments may be str, bytes or path-like objects; each one is converted
    with ``os.fsdecode`` before joining.

    Args:
        cmd_list (list): List of command arguments.

    Returns:
        str: Shell-escaped command string.

    Raises:
        TypeError: If an argument is not str, bytes or path-like.
    """
    # The join function only accepts strings, so normalise each argument first.
    return oslex.join([os.fsdecode(arg) for arg in cmd_list])