Coverage for integrations / coding_agent / aider_native_backend.py: 39.0%

146 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Aider Native Backend — In-process coding backend using vendored Aider modules. 

3 

4Unlike KiloCode/ClaudeCode/OpenCode backends which shell out via subprocess, 

5this backend runs Aider's code intelligence in-process for: 

6- Zero-latency startup (no subprocess spawn) 

7- Direct access to repo map, edit diffs, linting 

8- Recipe integration (edit results flow into HARTOS recipe pattern) 

9- Budget gate integration (metered usage tracking) 

10 

11Requires: tree-sitter, tree-sitter-language-pack, grep-ast, diskcache, 

12 diff-match-patch, gitpython (all in requirements.txt) 

13""" 

14import logging 

15import os 

16import time 

17from pathlib import Path 

18from typing import Dict, List, Optional 

19 

20from .tool_backends import CodingToolBackend 

21 

# Module-level logger shared with the rest of the coding_agent integration.
logger = logging.getLogger('hevolve.coding_agent')

# Lazy import flag — set on first is_installed() check.
# Tri-state cache: None = not probed yet, True/False = cached probe result.
_AIDER_CORE_AVAILABLE = None

26 

27 

def _check_aider_core():
    """Probe whether the vendored aider_core modules can be imported.

    The (potentially slow) import probe runs at most once per process; the
    outcome is cached in the module-level ``_AIDER_CORE_AVAILABLE`` flag and
    returned on every subsequent call.
    """
    global _AIDER_CORE_AVAILABLE
    if _AIDER_CORE_AVAILABLE is not None:
        # Already probed — return the cached result.
        return _AIDER_CORE_AVAILABLE
    try:
        # Imported only to verify availability; the names are unused here.
        from .aider_core.repomap import RepoMap  # noqa: F401
        from .aider_core.coders.search_replace import flexible_search_and_replace  # noqa: F401
        from .aider_core.io_adapter import SimpleIO  # noqa: F401
        from .aider_core.hart_model_adapter import HartModelAdapter  # noqa: F401
    except ImportError as exc:
        logger.debug(f"Aider core not available: {exc}")
        _AIDER_CORE_AVAILABLE = False
    else:
        _AIDER_CORE_AVAILABLE = True
    return _AIDER_CORE_AVAILABLE

42 

43 

class AiderNativeBackend(CodingToolBackend):
    """In-process Aider backend using vendored modules.

    This is NOT a subprocess wrapper — it runs Aider's code intelligence
    directly in the HARTOS Python process.
    """

    name = 'aider_native'
    binary = ''  # No external binary needed
    strengths = [
        'code_review', 'refactoring', 'multi_file_edit',
        'repo_understanding', 'architecture', 'debugging',
    ]

    def is_installed(self) -> bool:
        """Check if vendored aider_core modules are available."""
        return _check_aider_core()

    def build_command(self, task: str, context: Optional[Dict] = None) -> List[str]:
        """Not used — this backend runs in-process, not subprocess."""
        return []

    def parse_output(self, stdout: str, stderr: str, returncode: int) -> Dict:
        """Not used — this backend runs in-process, not subprocess."""
        return {'success': True, 'output': stdout}

    def get_capabilities(self) -> Dict:
        """Return base capabilities tagged as a native (in-process) backend."""
        caps = super().get_capabilities()
        caps['type'] = 'native'
        caps['features'] = ['repo_map', 'search_replace', 'linting', 'recipe_capture']
        return caps

    def execute(self, task: str, context: Optional[Dict] = None,
                timeout: int = 300) -> Dict:
        """Execute a coding task using in-process Aider intelligence.

        This is a TERMINAL operation — runs code analysis/editing in-process.
        Never re-dispatches to /chat or creates new agents.

        Args:
            task: Natural-language description of the coding task.
            context: Optional dict; recognized keys are 'working_dir',
                'files', 'task_type', 'model', 'user_id'.
            timeout: Accepted for interface parity with subprocess backends;
                not enforced here because execution is in-process.

        Returns:
            {success, output, tool, execution_time_s, repo_map?, files_changed?, error?}
        """
        if not self.is_installed():
            return {
                'success': False,
                'output': '',
                'tool': self.name,
                'execution_time_s': 0,
                'error': 'Aider core not available (missing dependencies)',
            }

        start = time.time()
        try:
            result = self._execute_task(task, context or {})
            elapsed = time.time() - start
            result['tool'] = self.name
            result['execution_time_s'] = round(elapsed, 2)
            return result
        except Exception as e:
            elapsed = time.time() - start
            logger.error(f"Aider native execution failed: {e}", exc_info=True)
            return {
                'success': False,
                'output': '',
                'tool': self.name,
                'execution_time_s': round(elapsed, 2),
                'error': str(e),
            }

    def _execute_task(self, task: str, context: Dict) -> Dict:
        """Core task execution: build context, call the LLM, apply edits.

        Returns a partial result dict; execute() adds 'tool' and
        'execution_time_s' on top.
        """
        # Only import what this method actually uses (the search/replace
        # helpers are imported where needed, in _apply_edits).
        from .aider_core.hart_model_adapter import HartModelAdapter, send_completion
        from .aider_core.io_adapter import SimpleIO

        working_dir = context.get('working_dir', '.')
        files = context.get('files', [])
        task_type = context.get('task_type', 'feature')

        io = SimpleIO()
        model = HartModelAdapter.from_hartos_config()

        # Build repo map for context (the key differentiator)
        repo_map_text = ''
        if not files:
            # No explicit files given: auto-discover relevant context
            repo_map_text = self._get_repo_map(working_dir, io, model, files)

        # Build system prompt based on task type
        system_prompt = self._build_system_prompt(task_type, repo_map_text)

        # Send to LLM
        messages = [
            {'role': 'system', 'content': system_prompt},
            {'role': 'user', 'content': task},
        ]

        # Add file contents if specified (inserted between system prompt and task)
        if files:
            file_contents = self._read_files(files, working_dir)
            if file_contents:
                messages.insert(1, {
                    'role': 'user',
                    'content': f"Here are the files to work with:\n\n{file_contents}",
                })

        model_name = context.get('model', '')
        user_id = context.get('user_id', '')
        response = send_completion(
            messages, model=model_name, user_id=user_id,
        )

        if response is None:
            return {
                'success': False,
                'output': '',
                'error': 'LLM completion failed',
            }

        # Parse edit blocks from response and apply them
        applied_edits = self._apply_edits(response, working_dir, files)

        output_parts = [response]
        if applied_edits:
            output_parts.append(
                f"\n--- Applied {len(applied_edits)} edit(s) ---"
            )
            for edit in applied_edits:
                output_parts.append(f" {edit['file']}: {edit['status']}")

        return {
            'success': True,
            'output': '\n'.join(output_parts),
            # Repo map is capped so the result payload stays bounded.
            'repo_map': repo_map_text[:2000] if repo_map_text else '',
            'files_changed': [e['file'] for e in applied_edits if e['status'] == 'applied'],
            'edits': applied_edits,
        }

    def get_repo_map(self, working_dir: str = '.', files: Optional[List[str]] = None,
                     max_tokens: int = 2048) -> str:
        """Get tree-sitter based repo map for a directory.

        Exposed for use by other HARTOS components (e.g., AutoGen tool).

        Args:
            working_dir: Root directory to map
            files: Optional list of files to focus on (chat files)
            max_tokens: Maximum tokens for the map

        Returns:
            Formatted repo map string with function/class signatures.
        """
        if not self.is_installed():
            return 'Repo map unavailable (aider core not installed)'

        from .aider_core.io_adapter import SimpleIO
        from .aider_core.hart_model_adapter import HartModelAdapter

        io = SimpleIO()
        model = HartModelAdapter.from_hartos_config()
        return self._get_repo_map(working_dir, io, model, files or [], max_tokens)

    def _get_repo_map(self, working_dir: str, io, model, chat_files: List[str],
                      max_tokens: int = 2048) -> str:
        """Internal: generate repo map using vendored RepoMap.

        Returns '' on any failure — repo map is best-effort context, never
        a hard requirement.
        """
        try:
            from .aider_core.repomap import RepoMap

            abs_dir = str(Path(working_dir).resolve())
            rm = RepoMap(
                root=abs_dir,
                io=io,
                main_model=model,
                map_tokens=max_tokens,
            )

            # Collect all source files in directory
            other_files = []
            for root, dirs, filenames in os.walk(abs_dir):
                # Skip hidden dirs and common non-source dirs (pruned in-place
                # so os.walk does not descend into them)
                dirs[:] = [d for d in dirs if not d.startswith('.') and d not in (
                    'node_modules', '__pycache__', 'venv', '.git', 'dist', 'build',
                )]
                for fname in filenames:
                    other_files.append(os.path.join(root, fname))

            # BUG FIX: resolve chat files against working_dir, not the process
            # CWD, matching how _read_files and _apply_edits resolve paths.
            # Absolute inputs are unaffected (joining with an absolute path
            # yields that absolute path).
            abs_chat = [str((Path(working_dir) / f).resolve()) for f in chat_files]

            # Remove chat files from other_files (set lookup keeps this O(n))
            chat_set = set(abs_chat)
            other_files = [f for f in other_files if f not in chat_set]

            repo_map = rm.get_repo_map(
                chat_files=abs_chat,
                other_files=other_files,
            )
            return repo_map or ''

        except Exception as e:
            logger.warning(f"Repo map generation failed: {e}")
            return ''

    def _build_system_prompt(self, task_type: str, repo_map: str) -> str:
        """Build the system prompt: edit-block instructions, task-type focus,
        and (optionally) the repo map appendix."""
        prompt = (
            "You are an expert coding assistant. "
            "When making code changes, use SEARCH/REPLACE blocks:\n\n"
            "```\n"
            "<<<<<<< SEARCH\n"
            "exact code to find\n"
            "=======\n"
            "replacement code\n"
            ">>>>>>> REPLACE\n"
            "```\n\n"
            "Always include the filename before each block as: `filename.py`\n"
        )

        # Task-type specific guidance; unknown types get the base prompt only.
        if task_type == 'code_review':
            prompt += "\nFocus on code quality, bugs, security issues, and improvements.\n"
        elif task_type == 'refactor':
            prompt += "\nFocus on improving code structure without changing behavior.\n"
        elif task_type == 'bug_fix':
            prompt += "\nFocus on identifying and fixing the bug described.\n"

        if repo_map:
            prompt += f"\n## Repository structure:\n{repo_map}\n"

        return prompt

    def _read_files(self, files: List[str], working_dir: str) -> str:
        """Read file contents (resolved against working_dir) for the prompt.

        Unreadable files are reported inline rather than raising, so one bad
        path does not abort the whole task.
        """
        parts = []
        for fname in files[:10]:  # Cap at 10 files to bound prompt size
            fpath = Path(working_dir) / fname
            try:
                content = fpath.read_text(encoding='utf-8', errors='replace')
                parts.append(f"### {fname}\n```\n{content}\n```\n")
            except (OSError, UnicodeDecodeError):
                parts.append(f"### {fname}\n(could not read)\n")
        return '\n'.join(parts)

    def _apply_edits(self, response: str, working_dir: str,
                     files: List[str]) -> List[Dict]:
        """Parse SEARCH/REPLACE blocks from LLM response and apply them.

        Args:
            response: Raw LLM response text containing edit blocks.
            working_dir: Directory against which edit filenames are resolved.
            files: Currently unused; kept for signature compatibility.

        Returns:
            One result dict per parsed edit: {'file', 'status', ...} where
            status is one of applied/no_change/failed/skipped/error.
        """
        from .aider_core.coders.search_replace import (
            flexible_search_and_replace, editblock_strategies,
        )

        edits = self._parse_edit_blocks(response)
        results = []

        for edit in edits:
            fname = edit['file']
            fpath = Path(working_dir) / fname

            if not fpath.exists():
                results.append({'file': fname, 'status': 'skipped', 'reason': 'file not found'})
                continue

            try:
                original = fpath.read_text(encoding='utf-8', errors='replace')
                texts = (edit['search'], edit['replace'], original)
                new_text = flexible_search_and_replace(texts, editblock_strategies)

                if new_text and new_text != original:
                    fpath.write_text(new_text, encoding='utf-8')
                    results.append({
                        'file': fname,
                        'status': 'applied',
                        # Truncated echoes for logging/recipe capture
                        'search': edit['search'][:100],
                        'replace': edit['replace'][:100],
                    })
                    logger.info(f"Applied edit to {fname}")
                elif new_text == original:
                    results.append({'file': fname, 'status': 'no_change'})
                else:
                    # flexible_search_and_replace returned None/empty
                    results.append({'file': fname, 'status': 'failed', 'reason': 'search text not found'})
            except Exception as e:
                results.append({'file': fname, 'status': 'error', 'reason': str(e)})

        return results

    @staticmethod
    def _parse_edit_blocks(response: str) -> List[Dict]:
        """Parse SEARCH/REPLACE blocks from LLM response.

        Expected format:
            `filename.py`
            <<<<<<< SEARCH
            exact code to find
            =======
            replacement code
            >>>>>>> REPLACE

        Returns:
            List of {'file', 'search', 'replace'} dicts in response order.
        """
        import re

        blocks = []
        # Match filename followed by search/replace block.
        # BUG FIX: the `\n?` after each captured section lets empty SEARCH
        # (file creation / pure insertion) and empty REPLACE (pure deletion)
        # sections match; the previous mandatory `\n` silently dropped such
        # blocks. Non-empty sections capture identically to before (no
        # trailing newline in the group).
        pattern = re.compile(
            r'`([^`]+\.\w+)`\s*\n'
            r'<<<<<<< SEARCH\n'
            r'(.*?)\n?'
            r'=======\n'
            r'(.*?)\n?'
            r'>>>>>>> REPLACE',
            re.DOTALL,
        )

        for match in pattern.finditer(response):
            blocks.append({
                'file': match.group(1).strip(),
                'search': match.group(2),
                'replace': match.group(3),
            })

        return blocks