Coverage for integrations/coding_agent/aider_native_backend.py: 39.0%
146 statements
« prev ^ index » next — coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Aider Native Backend — In-process coding backend using vendored Aider modules.

Unlike KiloCode/ClaudeCode/OpenCode backends which shell out via subprocess,
this backend runs Aider's code intelligence in-process for:
- Zero-latency startup (no subprocess spawn)
- Direct access to repo map, edit diffs, linting
- Recipe integration (edit results flow into HARTOS recipe pattern)
- Budget gate integration (metered usage tracking)

Requires: tree-sitter, tree-sitter-language-pack, grep-ast, diskcache,
diff-match-patch, gitpython (all in requirements.txt)
"""
import logging
import os
import time
from pathlib import Path
from typing import Dict, List, Optional

from .tool_backends import CodingToolBackend

logger = logging.getLogger('hevolve.coding_agent')

# Lazy import flag — set on first is_installed() check.
# None = not probed yet; True/False = cached result of _check_aider_core().
_AIDER_CORE_AVAILABLE = None
def _check_aider_core():
    """Check if vendored aider_core modules are importable.

    The probe result is cached in the module-level ``_AIDER_CORE_AVAILABLE``
    flag so the import attempt runs at most once per process.
    """
    global _AIDER_CORE_AVAILABLE
    if _AIDER_CORE_AVAILABLE is not None:
        return _AIDER_CORE_AVAILABLE
    try:
        # Import probe only — the names just need to resolve.
        from .aider_core.repomap import RepoMap  # noqa: F401
        from .aider_core.coders.search_replace import flexible_search_and_replace  # noqa: F401
        from .aider_core.io_adapter import SimpleIO  # noqa: F401
        from .aider_core.hart_model_adapter import HartModelAdapter  # noqa: F401
    except ImportError as exc:
        logger.debug(f"Aider core not available: {exc}")
        _AIDER_CORE_AVAILABLE = False
    else:
        _AIDER_CORE_AVAILABLE = True
    return _AIDER_CORE_AVAILABLE
class AiderNativeBackend(CodingToolBackend):
    """In-process Aider backend using vendored modules.

    This is NOT a subprocess wrapper — it runs Aider's code intelligence
    directly in the HARTOS Python process.
    """

    name = 'aider_native'
    binary = ''  # No external binary needed
    strengths = [
        'code_review', 'refactoring', 'multi_file_edit',
        'repo_understanding', 'architecture', 'debugging',
    ]

    def is_installed(self) -> bool:
        """Check if vendored aider_core modules are available."""
        return _check_aider_core()

    def build_command(self, task: str, context: Optional[Dict] = None) -> List[str]:
        """Not used — this backend runs in-process, not subprocess."""
        return []

    def parse_output(self, stdout: str, stderr: str, returncode: int) -> Dict:
        """Not used — this backend runs in-process, not subprocess."""
        return {'success': True, 'output': stdout}

    def get_capabilities(self) -> Dict:
        """Return base capabilities extended with native (in-process) features."""
        caps = super().get_capabilities()
        caps['type'] = 'native'
        caps['features'] = ['repo_map', 'search_replace', 'linting', 'recipe_capture']
        return caps

    def execute(self, task: str, context: Optional[Dict] = None,
                timeout: int = 300) -> Dict:
        """Execute a coding task using in-process Aider intelligence.

        This is a TERMINAL operation — runs code analysis/editing in-process.
        Never re-dispatches to /chat or creates new agents.

        Args:
            task: Natural-language coding task for the LLM.
            context: Optional dict; recognized keys include 'working_dir',
                'files', 'task_type', 'model', 'user_id'.
            timeout: Accepted for interface compatibility with subprocess
                backends; not enforced for the in-process call.

        Returns:
            {success, output, tool, execution_time_s, repo_map?, files_changed?, error?}
        """
        if not self.is_installed():
            return {
                'success': False,
                'output': '',
                'tool': self.name,
                'execution_time_s': 0,
                'error': 'Aider core not available (missing dependencies)',
            }

        start = time.time()
        try:
            result = self._execute_task(task, context or {})
            elapsed = time.time() - start
            result['tool'] = self.name
            result['execution_time_s'] = round(elapsed, 2)
            return result
        except Exception as e:
            # Broad catch at the backend boundary: callers expect a result
            # dict, never an exception. Full traceback goes to the log.
            elapsed = time.time() - start
            logger.error(f"Aider native execution failed: {e}", exc_info=True)
            return {
                'success': False,
                'output': '',
                'tool': self.name,
                'execution_time_s': round(elapsed, 2),
                'error': str(e),
            }

    def _execute_task(self, task: str, context: Dict) -> Dict:
        """Core task execution logic.

        Builds LLM context (repo map, or explicit file contents), sends one
        completion request, then parses and applies SEARCH/REPLACE edits.
        """
        from .aider_core.hart_model_adapter import HartModelAdapter, send_completion
        from .aider_core.io_adapter import SimpleIO

        working_dir = context.get('working_dir', '.')
        files = context.get('files', [])
        task_type = context.get('task_type', 'feature')

        io = SimpleIO()
        model = HartModelAdapter.from_hartos_config()

        # Build repo map for context (the key differentiator). Only done when
        # the caller did not pin specific files — with explicit files, their
        # full contents are sent below instead.
        repo_map_text = ''
        if not files:
            repo_map_text = self._get_repo_map(working_dir, io, model, files)

        # Build system prompt based on task type
        system_prompt = self._build_system_prompt(task_type, repo_map_text)

        # Send to LLM
        messages = [
            {'role': 'system', 'content': system_prompt},
            {'role': 'user', 'content': task},
        ]

        # Add file contents if specified (inserted before the task message so
        # the task itself is read last by the model).
        if files:
            file_contents = self._read_files(files, working_dir)
            if file_contents:
                messages.insert(1, {
                    'role': 'user',
                    'content': f"Here are the files to work with:\n\n{file_contents}",
                })

        model_name = context.get('model', '')
        user_id = context.get('user_id', '')
        response = send_completion(
            messages, model=model_name, user_id=user_id,
        )

        if response is None:
            return {
                'success': False,
                'output': '',
                'error': 'LLM completion failed',
            }

        # Parse edit blocks from response and apply them to disk
        applied_edits = self._apply_edits(response, working_dir, files)

        output_parts = [response]
        if applied_edits:
            output_parts.append(
                f"\n--- Applied {len(applied_edits)} edit(s) ---"
            )
            for edit in applied_edits:
                output_parts.append(f" {edit['file']}: {edit['status']}")

        return {
            'success': True,
            'output': '\n'.join(output_parts),
            'repo_map': repo_map_text[:2000] if repo_map_text else '',
            'files_changed': [e['file'] for e in applied_edits if e['status'] == 'applied'],
            'edits': applied_edits,
        }

    def get_repo_map(self, working_dir: str = '.', files: Optional[List[str]] = None,
                     max_tokens: int = 2048) -> str:
        """Get tree-sitter based repo map for a directory.

        Exposed for use by other HARTOS components (e.g., AutoGen tool).

        Args:
            working_dir: Root directory to map
            files: Optional list of files to focus on (chat files)
            max_tokens: Maximum tokens for the map

        Returns:
            Formatted repo map string with function/class signatures.
        """
        if not self.is_installed():
            return 'Repo map unavailable (aider core not installed)'

        from .aider_core.io_adapter import SimpleIO
        from .aider_core.hart_model_adapter import HartModelAdapter

        io = SimpleIO()
        model = HartModelAdapter.from_hartos_config()
        return self._get_repo_map(working_dir, io, model, files or [], max_tokens)

    def _get_repo_map(self, working_dir: str, io, model, chat_files: List[str],
                      max_tokens: int = 2048) -> str:
        """Internal: generate repo map using vendored RepoMap.

        Returns '' on any failure — repo map is best-effort context, never a
        hard requirement.
        """
        try:
            from .aider_core.repomap import RepoMap

            abs_dir = str(Path(working_dir).resolve())
            rm = RepoMap(
                root=abs_dir,
                io=io,
                main_model=model,
                map_tokens=max_tokens,
            )

            # Collect all source files in directory
            other_files = []
            for root, dirs, filenames in os.walk(abs_dir):
                # Skip hidden dirs and common non-source dirs (pruned in place
                # so os.walk never descends into them)
                dirs[:] = [d for d in dirs if not d.startswith('.') and d not in (
                    'node_modules', '__pycache__', 'venv', '.git', 'dist', 'build',
                )]
                for fname in filenames:
                    fpath = os.path.join(root, fname)
                    other_files.append(fpath)

            # Resolve chat files to absolute paths
            abs_chat = [str(Path(f).resolve()) for f in chat_files]

            # Remove chat files from other_files (set lookup keeps this O(n))
            chat_set = set(abs_chat)
            other_files = [f for f in other_files if f not in chat_set]

            repo_map = rm.get_repo_map(
                chat_files=abs_chat,
                other_files=other_files,
            )
            return repo_map or ''

        except Exception as e:
            logger.warning(f"Repo map generation failed: {e}")
            return ''

    def _build_system_prompt(self, task_type: str, repo_map: str) -> str:
        """Build system prompt for LLM based on task type.

        The SEARCH/REPLACE block instructions here must stay in sync with the
        parser in _parse_edit_blocks().
        """
        prompt = (
            "You are an expert coding assistant. "
            "When making code changes, use SEARCH/REPLACE blocks:\n\n"
            "```\n"
            "<<<<<<< SEARCH\n"
            "exact code to find\n"
            "=======\n"
            "replacement code\n"
            ">>>>>>> REPLACE\n"
            "```\n\n"
            "Always include the filename before each block as: `filename.py`\n"
        )

        if task_type == 'code_review':
            prompt += "\nFocus on code quality, bugs, security issues, and improvements.\n"
        elif task_type == 'refactor':
            prompt += "\nFocus on improving code structure without changing behavior.\n"
        elif task_type == 'bug_fix':
            prompt += "\nFocus on identifying and fixing the bug described.\n"

        if repo_map:
            prompt += f"\n## Repository structure:\n{repo_map}\n"

        return prompt

    def _read_files(self, files: List[str], working_dir: str) -> str:
        """Read file contents for inclusion in prompt.

        Silently caps at the first 10 files to bound prompt size; unreadable
        files are represented by a placeholder rather than aborting.
        """
        parts = []
        for fname in files[:10]:  # Cap at 10 files
            fpath = Path(working_dir) / fname
            try:
                content = fpath.read_text(encoding='utf-8', errors='replace')
                parts.append(f"### {fname}\n```\n{content}\n```\n")
            except (OSError, UnicodeDecodeError):
                parts.append(f"### {fname}\n(could not read)\n")
        return '\n'.join(parts)

    def _apply_edits(self, response: str, working_dir: str,
                     files: List[str]) -> List[Dict]:
        """Parse SEARCH/REPLACE blocks from LLM response and apply them.

        Args:
            response: Raw LLM response text containing edit blocks.
            working_dir: Directory that edit-block filenames resolve against.
            files: Files the caller offered for editing. Currently unused;
                kept for interface stability (future allow-listing).

        Returns:
            One dict per edit block: {'file', 'status', ...} where status is
            'applied', 'no_change', 'skipped', 'failed', or 'error'.
        """
        from .aider_core.coders.search_replace import (
            flexible_search_and_replace, editblock_strategies,
        )

        results = []
        for edit in self._parse_edit_blocks(response):
            fname = edit['file']
            fpath = Path(working_dir) / fname

            if not fpath.exists():
                results.append({'file': fname, 'status': 'skipped', 'reason': 'file not found'})
                continue

            try:
                original = fpath.read_text(encoding='utf-8', errors='replace')
                new_text = flexible_search_and_replace(
                    (edit['search'], edit['replace'], original),
                    editblock_strategies,
                )

                if new_text is None:
                    # No strategy located the search text — nothing written.
                    results.append({'file': fname, 'status': 'failed',
                                    'reason': 'search text not found'})
                elif new_text == original:
                    results.append({'file': fname, 'status': 'no_change'})
                else:
                    # NOTE: an empty new_text is a valid result (the edit
                    # deleted all content) and must still be written; the old
                    # truthiness check mis-reported it as a failure.
                    fpath.write_text(new_text, encoding='utf-8')
                    results.append({
                        'file': fname,
                        'status': 'applied',
                        'search': edit['search'][:100],
                        'replace': edit['replace'][:100],
                    })
                    logger.info(f"Applied edit to {fname}")
            except Exception as e:
                results.append({'file': fname, 'status': 'error', 'reason': str(e)})

        return results

    @staticmethod
    def _parse_edit_blocks(response: str) -> List[Dict]:
        """Parse SEARCH/REPLACE blocks from LLM response.

        Expected format:
            `filename.py`
            <<<<<<< SEARCH
            exact code to find
            =======
            replacement code
            >>>>>>> REPLACE

        Empty SEARCH (pure insertion) and empty REPLACE (pure deletion)
        sections are both accepted.

        Returns:
            List of {'file', 'search', 'replace'} dicts, in document order.
        """
        import re

        # Filename must stay on one line ([^`\n]); the payload groups capture
        # up to (and including) the newline before each delimiter line, which
        # lets them match zero lines as well.
        pattern = re.compile(
            r'`([^`\n]+\.\w+)`\s*\n'
            r'<<<<<<< SEARCH\n'
            r'(.*?)'
            r'^=======\n'
            r'(.*?)'
            r'^>>>>>>> REPLACE',
            re.DOTALL | re.MULTILINE,
        )

        def _chomp(text: str) -> str:
            # Drop the single trailing newline captured before the delimiter
            # so payloads match the literal block content.
            return text[:-1] if text.endswith('\n') else text

        blocks = []
        for match in pattern.finditer(response):
            blocks.append({
                'file': match.group(1).strip(),
                'search': _chomp(match.group(2)),
                'replace': _chomp(match.group(3)),
            })

        return blocks