Coverage for integrations / coding_agent / orchestrator.py: 32.4%
290 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Coding Agent Orchestrator — Unified entry point for coding tool execution.
4Singleton orchestrator that:
5- Auto-detects installed tools
6- Routes tasks to the best tool via CodingToolRouter
7- Records benchmarks for distributed learning
8- Supports compute-aware hive offload when local compute is insufficient
9- Exposes list_tools() and get_benchmarks() for Nunba settings UI
11This is a LEAF tool — it calls external CLI subprocesses, never /chat.
12This eliminates callback loops and double-dispatch antipatterns.
13"""
14import logging
15import os
16import threading
17from typing import Dict, Optional
19logger = logging.getLogger('hevolve.coding_agent')
class CodingAgentOrchestrator:
    """Singleton coding agent orchestrator.

    Unified entry point for coding tool execution. ``execute()`` classifies
    the task's privacy scope, then routes it to local subprocess execution,
    sharded multi-peer distribution, or single-peer offload. Access the
    shared instance via ``get_coding_orchestrator()``.
    """

    def __init__(self) -> None:
        # NOTE(review): self._lock is not used by any method visible in this
        # file — confirm it is needed (e.g. by subclasses) before removing.
        self._lock = threading.Lock()

    def execute(self, task: str, task_type: str = 'feature',
                preferred_tool: str = '', user_id: str = '',
                model: str = '', working_dir: str = '',
                data_scope: str = '') -> Dict:
        """Execute a coding task using the best available tool.

        This is a terminal operation — calls subprocess, never /chat.
        Safe to invoke from within an AutoGen agent's tool execution.

        Respects data classification:
        - edge_only / user_devices → local execution only
        - trusted_peer → shard with INTERFACES scope, E2E to trusted peers
        - federated / public → shard with FULL_FILE, fan out to N peers
        - (empty) → auto-classify based on file content (DLP scan)

        Args:
            task: The coding task description
            task_type: code_review, feature, bug_fix, refactor, app_build
            preferred_tool: User override (kilocode, claude_code, opencode, claw_native)
            user_id: For benchmark tracking
            model: LLM model override (empty = use tool's default)
            working_dir: Working directory for the coding tool
            data_scope: Privacy scope override (empty = auto-classify)

        Returns:
            {success, output, tool, execution_time_s, task_type, error?}
        """
        # ── Step 1: Classify data scope ──
        scope = self._classify_scope(task, working_dir, data_scope)
        logger.info(f"[CLASSIFY] task_type={task_type}, scope={scope}")

        # ── Step 2: Route by scope ──
        if scope in ('edge_only', 'user_devices'):
            # Private — never leaves this device (or user's own devices)
            return self._execute_local(task, task_type, preferred_tool,
                                       user_id, model, working_dir)

        if not self._can_run_locally():
            # Node can't run locally — distribute with scope-aware sharding
            return self._distribute_to_hive(task, task_type, preferred_tool,
                                            user_id, model, working_dir, scope)

        # Default: local compute is sufficient and scope permits it.
        return self._execute_local(task, task_type, preferred_tool,
                                   user_id, model, working_dir)

    def _classify_scope(self, task: str, working_dir: str,
                        override: str = '') -> str:
        """Classify the privacy scope of a coding task.

        Order: explicit override > DLP scan > default (trusted_peer).

        Returns one of the scope strings consumed by ``execute()``
        (e.g. 'edge_only', 'trusted_peer').
        """
        if override:
            return override

        # Auto-classify by scanning target files for secrets/PII
        try:
            from security.dlp_engine import get_dlp_engine

            if not working_dir:
                return 'trusted_peer'  # Default: shareable with trusted peers

            # Scan a sample of files for PII/secrets.
            # Secret-looking filenames short-circuit immediately; otherwise
            # up to 5 source files are content-scanned via the DLP engine.
            dlp = get_dlp_engine()
            has_secrets = False
            files_checked = 0
            for root, _, files in os.walk(working_dir):
                for fname in files:
                    if fname.endswith(('.env', '.pem', '.key', 'credentials.json',
                                       'config.json', '.secret')):
                        has_secrets = True
                        break
                    if files_checked >= 5:
                        # Cap applies to content-scanned source files only.
                        break
                    if fname.endswith(('.py', '.rs', '.js', '.ts')):
                        fpath = os.path.join(root, fname)
                        try:
                            with open(fpath, 'r', errors='ignore') as f:
                                # Only the first 4 KiB is sampled per file.
                                sample = f.read(4096)
                                findings = dlp.scan(sample)
                                if findings:
                                    has_secrets = True
                                    break
                        except IOError:
                            pass  # Unreadable file — skip, keep scanning
                        files_checked += 1
                if has_secrets:
                    break

            if has_secrets:
                return 'edge_only'  # Secrets found — local only
            return 'trusted_peer'  # Safe for trusted peers
        except Exception:
            return 'trusted_peer'  # Default if classification fails

    def _execute_local(self, task: str, task_type: str,
                       preferred_tool: str, user_id: str,
                       model: str, working_dir: str) -> Dict:
        """Execute locally via subprocess.

        Routes to a backend through CodingToolRouter, records a benchmark,
        captures successful edits as recipe steps, and emits a completion
        event. Returns the backend's result dict with 'task_type' added.
        """
        from .tool_router import CodingToolRouter
        from .benchmark_tracker import get_benchmark_tracker

        router = CodingToolRouter()
        backend = router.route(task, task_type, preferred_tool)

        if backend is None:
            # No tool installed at all — report actionable error to the UI.
            return {
                'success': False,
                'output': '',
                'tool': 'none',
                'task_type': task_type,
                'error': 'No coding tools installed. '
                         'Install one: kilocode, claude (Claude Code), or opencode.',
            }

        # Build context for the backend
        context = {}
        if model:
            context['model'] = model
        if working_dir:
            context['working_dir'] = working_dir

        # Execute via subprocess (leaf operation, no /chat re-entry)
        result = backend.execute(task, context)
        result['task_type'] = task_type

        # Record benchmark
        tracker = get_benchmark_tracker()
        tracker.record(
            task_type=task_type,
            tool_name=result.get('tool', backend.name),
            completion_time_s=result.get('execution_time_s', 0),
            success=result.get('success', False),
            model_name=model,
            user_id=user_id,
        )

        # Capture successful edits as recipe steps for REUSE mode
        if result.get('success') and result.get('edits'):
            try:
                from .recipe_bridge import CodingRecipeBridge
                bridge = CodingRecipeBridge()
                bridge.capture_edit_as_recipe_step(
                    task=task,
                    tool_name=result.get('tool', backend.name),
                    file_edits=result['edits'],
                    working_dir=working_dir,
                )
            except Exception as e:
                # Recipe capture is best-effort; never fail the task for it.
                logger.debug(f"Recipe capture: {e}")

        # Broadcast to EventBus — UI, ledger, Agent Lightning all subscribe
        try:
            from core.platform.events import emit_event
            emit_event('coding.task_completed', {
                'tool': result.get('tool', backend.name),
                'task_type': task_type,
                'success': result.get('success', False),
                'execution_time_s': result.get('execution_time_s', 0),
                'user_id': user_id,
                'working_dir': working_dir,
                'task_summary': task[:200],  # Truncate — events stay small
            })
        except Exception:
            pass  # EventBus optional — don't block on it

        return result

    def _can_run_locally(self) -> bool:
        """Check if this node has sufficient compute for coding tools.

        Coding tools are CLI subprocesses — they mostly need network
        (for API calls) and disk space, not GPU. So the gate is lenient.

        Returns True when the node's tier meets the 'coding_aggregator'
        feature's minimum tier, or when the check cannot be performed.
        """
        try:
            from security.system_requirements import (
                get_tier, _TIER_RANK, NodeTierLevel, FEATURE_TIER_MAP
            )
            feature_entry = FEATURE_TIER_MAP.get('coding_aggregator')
            if not feature_entry:
                return True  # Feature not in map yet, allow
            min_tier, _ = feature_entry
            current = get_tier()
            return _TIER_RANK[current] >= _TIER_RANK[min_tier]
        except Exception:
            return True  # If system_requirements unavailable, allow

    def _distribute_to_hive(self, task: str, task_type: str,
                            preferred_tool: str, user_id: str,
                            model: str, working_dir: str,
                            scope: str = 'trusted_peer') -> Dict:
        """Shard task, fan out to N peers in parallel, merge results.

        Flow:
        1. ShardEngine.decompose_task() → N shards
        2. ScopeGuard.check_egress() on each shard
        3. Fan out shards to available trusted peers (concurrent)
        4. Merge diffs, validate, record benchmarks
        5. Fall back to single-peer or local on failure

        Scope determines shard visibility:
        - trusted_peer → INTERFACES (signatures only, not implementations)
        - federated/public → FULL_FILE (peer sees everything)

        NOTE(review): the docstring above describes INTERFACES for
        trusted_peer, but the mapping below deliberately uses FULL_FILE for
        all scopes (see inline rationale) — the docstring lags the code.
        """
        import time
        from concurrent.futures import ThreadPoolExecutor, as_completed

        start = time.time()

        try:
            from integrations.agent_engine.shard_engine import ShardEngine, ShardScope
            from integrations.agent_engine.compute_mesh_service import get_compute_mesh
            from security.channel_encryption import encrypt_json_for_peer, decrypt_json_from_peer
            from security.edge_privacy import PrivacyScope, ScopeGuard
            from core.http_pool import pooled_post

            # ── Map scope to shard visibility ──
            # trusted_peer uses FULL_FILE because the payload is E2E encrypted
            # to the specific peer — the content is as private as the transport.
            # INTERFACES scope would send empty files, making the peer useless.
            shard_scope = {
                'trusted_peer': ShardScope.FULL_FILE,
                'federated': ShardScope.FULL_FILE,
                'public': ShardScope.FULL_FILE,
            }.get(scope, ShardScope.FULL_FILE)

            # ── Decompose task into shards ──
            engine = ShardEngine(code_root=working_dir) if working_dir else ShardEngine()
            shards = engine.decompose_task(task, scope=shard_scope, max_files_per_shard=5)

            if not shards:
                logger.info("[DISTRIBUTE] No shards — falling back to single-peer offload")
                return self._offload_to_hive(task, task_type, preferred_tool,
                                             user_id, model, working_dir)

            logger.info(f"[DISTRIBUTE] Decomposed into {len(shards)} shards (scope={shard_scope.value})")

            # ── Check egress on each shard ──
            guard = ScopeGuard()
            dest_privacy = {
                'trusted_peer': PrivacyScope.TRUSTED_PEER,
                'federated': PrivacyScope.FEDERATED,
                'public': PrivacyScope.PUBLIC,
            }.get(scope, PrivacyScope.TRUSTED_PEER)

            cleared_shards = []
            for shard in shards:
                # Summarize the shard for the egress check (scope + file list).
                shard_data = {
                    '_privacy_scope': scope,
                    'task': shard.task_description,
                    'files': list(shard.full_content.keys()) if shard.full_content else [],
                }
                allowed, reason = guard.check_egress(shard_data, dest_privacy)
                if allowed:
                    cleared_shards.append(shard)
                else:
                    logger.warning(f"[DISTRIBUTE] Shard blocked by ScopeGuard: {reason}")

            if not cleared_shards:
                logger.info("[DISTRIBUTE] All shards blocked — executing locally")
                return self._execute_local(task, task_type, preferred_tool,
                                           user_id, model, working_dir)

            # ── Get available peers ──
            mesh = get_compute_mesh()
            peers = mesh.get_available_peers()
            trusted_peers = [p for p in peers if self._is_code_trusted(p)]

            if not trusted_peers:
                logger.info("[DISTRIBUTE] No trusted peers — executing locally")
                return self._execute_local(task, task_type, preferred_tool,
                                           user_id, model, working_dir)

            # ── Fan out: assign shards to peers round-robin, execute in parallel ──
            assignments = []
            for i, shard in enumerate(cleared_shards):
                peer = trusted_peers[i % len(trusted_peers)]
                assignments.append((shard, peer))

            logger.info(f"[DISTRIBUTE] Dispatching {len(assignments)} shards to "
                        f"{len(trusted_peers)} peers")

            merged_output = []
            merged_diffs = {}
            failures = 0

            # Get our own public key so peers can encrypt responses back to us
            try:
                from security.channel_encryption import get_x25519_public_hex
                our_public_key = get_x25519_public_hex()
            except Exception:
                our_public_key = None

            def _dispatch_shard(shard, peer):
                """Send one shard to one peer, return result.

                Returns the peer's (decrypted) result dict, or None on any
                failure — missing key/URL, non-200 status, or exception.
                """
                peer_pub = peer.get('x25519_public_hex')
                peer_url = peer.get('url', '')
                if not peer_pub or not peer_url:
                    return None

                # Convert interface_specs to serializable dicts
                iface_dicts = []
                for spec in (shard.interface_specs or []):
                    try:
                        from dataclasses import asdict
                        iface_dicts.append(asdict(spec))
                    except Exception:
                        pass  # Non-dataclass spec — omit rather than fail

                payload = {
                    'task': shard.task_description,
                    'task_type': task_type,
                    'preferred_tool': preferred_tool,
                    'model': model,
                    'file_content': shard.full_content or {},
                    'interfaces': iface_dicts,
                    'shard_scope': shard.scope.value,
                    'sender_public_key': our_public_key,  # Inside envelope — MITM can't replace
                }
                envelope = encrypt_json_for_peer(payload, peer_pub)
                try:
                    resp = pooled_post(
                        f'{peer_url.rstrip("/")}/coding/execute',
                        json={'encrypted': envelope},
                        timeout=300,
                    )
                    if resp.status_code == 200:
                        resp_data = resp.json()
                        encrypted_result = resp_data.get('encrypted')
                        if encrypted_result:
                            return decrypt_json_from_peer(encrypted_result)
                        # Fallback: unencrypted result (SAME_USER trust)
                        return resp_data.get('result')
                except Exception as e:
                    logger.warning(f"[DISTRIBUTE] Peer {peer.get('node_id','?')} failed: {e}")
                return None  # Non-200 responses also land here

            with ThreadPoolExecutor(max_workers=min(len(assignments), 8)) as pool:
                futures = {
                    pool.submit(_dispatch_shard, shard, peer): (shard, peer)
                    for shard, peer in assignments
                }
                for future in as_completed(futures):
                    shard, peer = futures[future]
                    result = future.result()
                    peer_id = peer.get('node_id', 'unknown')
                    if result and result.get('success'):
                        merged_output.append(result.get('output', ''))
                        # Merge diffs (validate: only files in the shard's target list)
                        allowed_files = set(shard.target_files or [])
                        if shard.full_content:
                            allowed_files.update(shard.full_content.keys())
                        for fpath, diff in result.get('diffs', {}).items():
                            if not allowed_files or fpath in allowed_files:
                                merged_diffs[fpath] = diff
                            else:
                                logger.warning(f"[DISTRIBUTE] Peer {peer_id} "
                                               f"returned diff for unauthorized file: {fpath}")
                        self._record_peer_trust(peer_id, success=True)
                    else:
                        failures += 1
                        self._record_peer_trust(peer_id, success=False)

            elapsed = time.time() - start

            # ── Record benchmark ──
            from .benchmark_tracker import get_benchmark_tracker
            tracker = get_benchmark_tracker()
            tracker.record(
                task_type=task_type,
                tool_name='distributed',
                completion_time_s=elapsed,
                # Success = at least one shard completed
                success=failures < len(assignments),
                model_name=model,
                user_id=user_id,
                offloaded=True,
            )

            # ── Emit event ──
            try:
                from core.platform.events import emit_event
                emit_event('coding.distributed_complete', {
                    'shards': len(cleared_shards),
                    'peers_used': len(trusted_peers),
                    'failures': failures,
                    'elapsed_s': elapsed,
                    'scope': scope,
                    'task_type': task_type,
                })
            except Exception:
                pass  # EventBus optional — don't block on it

            return {
                'success': failures < len(assignments),
                'output': '\n---\n'.join(merged_output),
                'diffs': merged_diffs,
                'tool': 'distributed',
                'task_type': task_type,
                'execution_time_s': elapsed,
                'shards_total': len(cleared_shards),
                'shards_succeeded': len(cleared_shards) - failures,
                'peers_used': min(len(trusted_peers), len(cleared_shards)),
                'scope': scope,
            }

        except Exception as e:
            logger.warning(f"[DISTRIBUTE] Failed ({e}), falling back to single-peer")
            return self._offload_to_hive(task, task_type, preferred_tool,
                                         user_id, model, working_dir)

    def _offload_to_hive(self, task: str, task_type: str,
                         preferred_tool: str, user_id: str,
                         model: str, working_dir: str) -> Dict:
        """Offload to a trusted hive peer with sufficient compute.

        Security: E2E encryption (X25519 + AES-256-GCM) with full source context.
        Trust: Only offload code tasks to peers with sufficient trust score.
        Accuracy > Security theater: peers get full file content (encrypted),
        not interface stubs. An LLM without full context produces broken code.

        Autotrust: peers earn trust through successful task completion.
        After 5+ validated code tasks, a peer auto-promotes to code-trusted.

        Falls back to local execution on any failure path.
        """
        try:
            from integrations.agent_engine.compute_mesh_service import get_compute_mesh
            from security.channel_encryption import (
                encrypt_json_for_peer, decrypt_json_from_peer
            )
            from core.http_pool import pooled_post

            mesh = get_compute_mesh()
            peers = mesh.get_available_peers()

            if not peers:
                logger.info("No hive peers available, attempting local execution")
                return self._execute_local(task, task_type, preferred_tool,
                                           user_id, model, working_dir)

            # Filter to code-trusted peers only
            trusted_peers = [
                p for p in peers
                if self._is_code_trusted(p)
            ]

            if not trusted_peers:
                logger.info("No code-trusted peers, executing locally")
                return self._execute_local(task, task_type, preferred_tool,
                                           user_id, model, working_dir)

            # Pick best trusted peer (by compute score)
            best_peer = max(trusted_peers, key=lambda p: mesh.score(p))
            peer_pub = best_peer.get('x25519_public_hex')
            peer_url = best_peer.get('url', '')

            if not peer_pub or not peer_url:
                # Peer record is unusable without key + URL — run locally.
                return self._execute_local(task, task_type, preferred_tool,
                                           user_id, model, working_dir)

            # Include full source context for target files (encrypted)
            # Accuracy > security theater: the peer needs full context to code well
            file_content = {}
            if working_dir:
                file_content = self._read_target_files(task, working_dir)

            # Encrypt full payload (forward secrecy via ephemeral keys)
            payload = {
                'task': task,
                'task_type': task_type,
                'preferred_tool': preferred_tool,
                'model': model,
                'file_content': file_content,
                'working_dir_name': os.path.basename(working_dir) if working_dir else '',
            }
            envelope = encrypt_json_for_peer(payload, peer_pub)

            # POST to peer's /coding/execute endpoint
            resp = pooled_post(
                f'{peer_url.rstrip("/")}/coding/execute',
                json={'encrypted': envelope},
                timeout=300,
            )

            peer_id = best_peer.get('node_id', 'unknown')
            if resp.status_code == 200:
                encrypted_result = resp.json().get('encrypted')
                if encrypted_result:
                    result = decrypt_json_from_peer(encrypted_result)
                    if result:
                        result['offloaded'] = True
                        result['peer_id'] = peer_id

                        # Validate diffs only touch expected files
                        diffs = result.get('diffs', {})
                        if file_content and diffs:
                            unauthorized = [
                                f for f in diffs
                                if f not in file_content
                            ]
                            if unauthorized:
                                logger.warning(
                                    f"Peer {peer_id} returned diffs for "
                                    f"unauthorized files: {unauthorized}")
                                # Reject the whole result, not just the files
                                result['success'] = False
                                result['error'] = 'Unauthorized file modifications'

                        # Record benchmark
                        from .benchmark_tracker import get_benchmark_tracker
                        tracker = get_benchmark_tracker()
                        tracker.record(
                            task_type=task_type,
                            tool_name=result.get('tool', 'unknown'),
                            completion_time_s=result.get('execution_time_s', 0),
                            success=result.get('success', False),
                            model_name=model,
                            user_id=user_id,
                            offloaded=True,
                        )

                        # Autotrust: successful validated tasks build trust
                        if result.get('success'):
                            self._record_peer_trust(peer_id, success=True)
                            return result

            # Peer failed — record for trust scoring
            self._record_peer_trust(peer_id, success=False)

        except Exception as e:
            logger.warning(f"Hive offload failed ({e}), falling back to local")

        # Fallback to local
        return self._execute_local(task, task_type, preferred_tool,
                                   user_id, model, working_dir)

    @staticmethod
    def _is_code_trusted(peer: Dict) -> bool:
        """Check if a peer is trusted for code tasks.

        Trust sources (any one is sufficient):
        - SAME_USER: peer belongs to the same user (their other machine)
        - Explicit grant: peer has 'code_trusted' flag
        - Autotrust: peer has 5+ successful validated code tasks
        """
        if peer.get('trust_level') == 'SAME_USER':
            return True
        if peer.get('code_trusted'):
            return True
        # Autotrust: earned through track record
        successful_tasks = peer.get('successful_code_tasks', 0)
        return successful_tasks >= 5

    @staticmethod
    def _record_peer_trust(peer_id: str, success: bool) -> None:
        """Record a peer's code task result for autotrust scoring.

        Success increments the peer's counter by 1; failure decrements by 2
        (floored at 0), so trust decays faster than it builds. Best-effort:
        any DB error is swallowed.
        """
        try:
            from integrations.social.models import db_session, PeerNode
            with db_session() as db:
                node = db.query(PeerNode).filter_by(node_id=peer_id).first()
                if node and hasattr(node, 'successful_code_tasks'):
                    if success:
                        node.successful_code_tasks = (
                            node.successful_code_tasks or 0) + 1
                    else:
                        node.successful_code_tasks = max(
                            0, (node.successful_code_tasks or 0) - 2)
        except Exception:
            pass  # Trust bookkeeping must never break task flow

    @staticmethod
    def _read_target_files(task: str, working_dir: str) -> Dict[str, str]:
        """Read target files for the task from the working directory.

        Uses the shard engine's keyword matching to find relevant files,
        limited to a reasonable context window.

        Returns a mapping of relative path → file content (possibly empty).
        """
        file_content = {}
        max_files = 10
        max_total_chars = 200_000  # ~50K tokens
        total_chars = 0
        try:
            from integrations.agent_engine.shard_engine import ShardEngine
            engine = ShardEngine(code_root=working_dir)
            imap = engine.get_interface_map()

            # Score files by task keyword relevance: a file scores one point
            # per function/class/path name that appears in the task text.
            task_lower = task.lower()
            scored = []
            for rel_path, spec in imap.items():
                names = (
                    [f['name'] for f in spec.functions] +
                    [c['name'] for c in spec.classes] +
                    [rel_path]
                )
                score = sum(1 for n in names if n.lower() in task_lower)
                if score > 0:
                    scored.append((rel_path, score))
            scored.sort(key=lambda x: -x[1])

            for rel_path, _ in scored[:max_files]:
                full_path = os.path.join(working_dir, rel_path)
                if os.path.exists(full_path):
                    try:
                        with open(full_path, 'r', encoding='utf-8',
                                  errors='ignore') as f:
                            content = f.read()
                            # First file that would exceed the budget stops
                            # collection entirely (no smaller-file backfill).
                            if total_chars + len(content) > max_total_chars:
                                break
                            file_content[rel_path] = content
                            total_chars += len(content)
                    except IOError:
                        pass  # Unreadable file — skip
        except Exception as e:
            logger.debug(f"Target file reading: {e}")
        return file_content

    def list_tools(self) -> Dict:
        """List all tools with install status, capabilities, and benchmarks.

        Returns {tools, benchmarks, can_run_locally} for the settings UI.
        """
        from .installer import get_tool_info
        from .benchmark_tracker import get_benchmark_tracker

        tools = get_tool_info()
        try:
            benchmarks = get_benchmark_tracker().get_summary()
        except Exception:
            # Empty summary shape so the UI renders without benchmark data.
            benchmarks = {'total_benchmarks': 0, 'by_tool': [], 'by_task_type': []}

        return {
            'tools': tools,
            'benchmarks': benchmarks,
            'can_run_locally': self._can_run_locally(),
        }

    def get_benchmarks(self) -> Dict:
        """Get benchmark dashboard data (delegates to the tracker summary)."""
        from .benchmark_tracker import get_benchmark_tracker
        return get_benchmark_tracker().get_summary()
# ─── Module-level singleton ───
_orchestrator = None
_orchestrator_lock = threading.Lock()


def get_coding_orchestrator() -> CodingAgentOrchestrator:
    """Return the process-wide CodingAgentOrchestrator, creating it lazily.

    Double-checked locking: the common already-initialized path returns
    without touching the lock; the lock only guards first construction.
    """
    global _orchestrator
    if _orchestrator is not None:
        return _orchestrator
    with _orchestrator_lock:
        # Re-check under the lock — another thread may have won the race.
        if _orchestrator is None:
            _orchestrator = CodingAgentOrchestrator()
    return _orchestrator