Coverage for integrations / agent_engine / shard_engine.py: 47.6%
393 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Shard Engine — Context-scoped task decomposition.
4When a code task arrives:
5 1. Identify target function(s) to modify
6 2. Build call-chain context: upstream callers + downstream callees (FULL source)
7 3. Everything else: interfaces only (signatures + types + docstrings)
8 4. Agent sees enough to code accurately, exposure proportional to task
10Context model (CALL_CHAIN scope):
11 Target function: FULL implementation (what you're modifying)
12 Upstream callers: FULL implementation (who calls it, input contracts)
13 Downstream callees: FULL implementation (what it calls, output contracts)
14 Everything else: Interfaces only (signatures + types)
16Security model:
17 - Exposure proportional to task, not whole codebase
18 - E2E encrypted between nodes
19 - Trust-based peer access (SAME_USER / autotrust)
20 - Validation on trusted node before applying diffs
21"""
23import ast
24import hashlib
25import json
26import logging
27import os
28import time
29from dataclasses import dataclass, field, asdict
30from enum import Enum
31from typing import Any, Dict, List, Optional, Set, Tuple
33logger = logging.getLogger('hevolve.shard_engine')
36class ShardScope(str, Enum):
37 """What a shard agent is allowed to see."""
38 FULL_FILE = 'full_file' # See complete file(s) — trusted agent only
39 CALL_CHAIN = 'call_chain' # Target func + upstream/downstream FULL, rest interfaces
40 INTERFACES = 'interfaces' # See signatures + types, not implementations
41 SIGNATURES = 'signatures' # See function/class names + params only
42 MINIMAL = 'minimal' # See only the specific function to modify
45@dataclass
46class InterfaceSpec:
47 """Extracted interface from a Python file — what agents see instead of full code."""
48 file_path: str
49 classes: List[Dict[str, Any]] = field(default_factory=list)
50 functions: List[Dict[str, Any]] = field(default_factory=list)
51 imports: List[str] = field(default_factory=list)
52 constants: List[Dict[str, str]] = field(default_factory=list)
53 module_doc: str = ''
56@dataclass
57class CodeShard:
58 """An isolated unit of work for a compute agent."""
59 shard_id: str
60 task_description: str
61 scope: ShardScope
62 target_files: List[str] # Files to modify
63 interface_specs: List[InterfaceSpec] # What the agent can see
64 full_content: Dict[str, str] # file_path → content (only for scope=FULL_FILE)
65 test_expectations: List[str] # What the result should satisfy
66 dependencies: List[str] # Other shard IDs this depends on
67 expires_at: float # Auto-expire timestamp
68 obfuscated: bool = False # Whether paths are scrambled
70 def to_dict(self) -> Dict:
71 return {
72 'shard_id': self.shard_id,
73 'task_description': self.task_description,
74 'scope': self.scope.value,
75 'target_files': self.target_files,
76 'interfaces': [asdict(s) for s in self.interface_specs],
77 'full_content': self.full_content,
78 'test_expectations': self.test_expectations,
79 'dependencies': self.dependencies,
80 'expires_at': self.expires_at,
81 'obfuscated': self.obfuscated,
82 }
84 def is_expired(self) -> bool:
85 return time.time() > self.expires_at
88@dataclass
89class ShardResult:
90 """Result from an agent working on a shard."""
91 shard_id: str
92 diffs: Dict[str, str] # file_path → unified diff
93 test_results: Optional[str] # stdout from test run
94 success: bool
95 error: Optional[str] = None
98class CallGraphExtractor:
99 """Extract upstream callers and downstream callees for a target function.
101 Primary: Trueflow MCP (port 5681) — has runtime call tree + coverage.
102 Fallback: AST-based static analysis — walks function bodies for Name references.
103 """
105 @staticmethod
106 def get_call_chain(file_path: str, function_name: str,
107 code_root: str = '') -> Dict[str, List[str]]:
108 """Get upstream (callers) and downstream (callees) for a function.
110 Returns:
111 {'target': 'func_name',
112 'upstream': ['caller1', 'caller2'],
113 'downstream': ['callee1', 'callee2'],
114 'source': 'trueflow' | 'ast'}
115 """
116 # Try Trueflow MCP first (richer: runtime coverage + dynamic call tree)
117 try:
118 chain = CallGraphExtractor._from_trueflow(file_path, function_name)
119 if chain.get('upstream') or chain.get('downstream'):
120 return chain
121 except Exception:
122 pass
124 # Fallback: static AST analysis
125 return CallGraphExtractor._from_ast(file_path, function_name, code_root)
127 @staticmethod
128 def _from_trueflow(file_path: str, function_name: str) -> Dict:
129 """Get call chain from Trueflow MCP (IDE must be running)."""
130 from core.http_pool import pooled_post
131 trueflow_url = os.environ.get('TRUEFLOW_MCP_URL',
132 'http://localhost:5681')
133 resp = pooled_post(
134 f'{trueflow_url}/analyze_call_tree',
135 json={'file': file_path, 'function': function_name},
136 timeout=10,
137 )
138 if resp.status_code == 200:
139 data = resp.json()
140 return {
141 'target': function_name,
142 'upstream': data.get('callers', []),
143 'downstream': data.get('callees', []),
144 'source': 'trueflow',
145 }
146 return {'target': function_name, 'upstream': [], 'downstream': [],
147 'source': 'trueflow'}
149 @staticmethod
150 def _from_ast(file_path: str, function_name: str,
151 code_root: str = '') -> Dict:
152 """Static call graph via AST. Scans target file + importers."""
153 result = {
154 'target': function_name,
155 'upstream': [],
156 'downstream': [],
157 'source': 'ast',
158 }
160 try:
161 with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
162 source = f.read()
163 tree = ast.parse(source)
164 except (IOError, SyntaxError):
165 return result
167 all_functions = {} # name → ast node
168 for node in ast.walk(tree):
169 if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
170 all_functions[node.name] = node
172 # Downstream: what does the target function call?
173 target_node = all_functions.get(function_name)
174 if target_node:
175 for node in ast.walk(target_node):
176 if isinstance(node, ast.Call):
177 callee = None
178 if isinstance(node.func, ast.Name):
179 callee = node.func.id
180 elif isinstance(node.func, ast.Attribute):
181 callee = node.func.attr
182 if callee and callee != function_name:
183 if callee not in result['downstream']:
184 result['downstream'].append(callee)
186 # Upstream: which functions in this file call the target?
187 for fname, fnode in all_functions.items():
188 if fname == function_name:
189 continue
190 for node in ast.walk(fnode):
191 if isinstance(node, ast.Call):
192 callee = None
193 if isinstance(node.func, ast.Name):
194 callee = node.func.id
195 elif isinstance(node.func, ast.Attribute):
196 callee = node.func.attr
197 if callee == function_name:
198 if fname not in result['upstream']:
199 result['upstream'].append(fname)
200 break
202 # Cross-file upstream: scan other files that import from this module
203 if code_root:
204 target_module = os.path.relpath(file_path, code_root)
205 target_module = target_module.replace(os.sep, '.').rstrip('.py')
206 result['upstream'].extend(
207 CallGraphExtractor._find_cross_file_callers(
208 code_root, target_module, function_name))
210 return result
212 @staticmethod
213 def _find_cross_file_callers(code_root: str, target_module: str,
214 function_name: str) -> List[str]:
215 """Find functions in other files that import and call the target."""
216 callers = []
217 exclude = {'__pycache__', 'venv', '.venv', 'venv310', '.git',
218 'node_modules', 'agent_data', 'tests', 'build'}
219 for root, dirs, files in os.walk(code_root):
220 dirs[:] = [d for d in dirs if d not in exclude]
221 for fname in files:
222 if not fname.endswith('.py'):
223 continue
224 full_path = os.path.join(root, fname)
225 try:
226 with open(full_path, 'r', encoding='utf-8',
227 errors='ignore') as f:
228 src = f.read()
229 tree = ast.parse(src)
230 except (IOError, SyntaxError):
231 continue
233 # Check if this file imports our target function
234 imports_target = False
235 for node in ast.walk(tree):
236 if isinstance(node, ast.ImportFrom):
237 if node.module and target_module in node.module:
238 for alias in node.names:
239 if alias.name == function_name:
240 imports_target = True
241 break
242 if imports_target:
243 break
244 if not imports_target:
245 continue
247 # Find which functions call it
248 rel = os.path.relpath(full_path, code_root)
249 for node in ast.walk(tree):
250 if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
251 for child in ast.walk(node):
252 if isinstance(child, ast.Call):
253 cname = None
254 if isinstance(child.func, ast.Name):
255 cname = child.func.id
256 elif isinstance(child.func, ast.Attribute):
257 cname = child.func.attr
258 if cname == function_name:
259 caller_id = f"{rel}:{node.name}"
260 if caller_id not in callers:
261 callers.append(caller_id)
262 break
263 if len(callers) >= 20: # Cap cross-file search
264 break
265 return callers
267 @staticmethod
268 def extract_function_source(file_path: str, function_name: str) -> str:
269 """Extract the full source code of a specific function from a file."""
270 try:
271 with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
272 lines = f.readlines()
273 source = ''.join(lines)
274 tree = ast.parse(source)
275 except (IOError, SyntaxError):
276 return ''
278 for node in ast.walk(tree):
279 if (isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
280 and node.name == function_name):
281 start = node.lineno - 1
282 end = getattr(node, 'end_lineno', start + 1)
283 return ''.join(lines[start:end])
284 return ''
287class InterfaceExtractor:
288 """Extract function signatures, class definitions, and types from Python files.
290 This is what agents see instead of full implementations.
291 They get enough to understand the API contract without seeing the logic.
292 """
294 @staticmethod
295 def extract_from_file(file_path: str) -> InterfaceSpec:
296 """Extract public interface from a Python file."""
297 spec = InterfaceSpec(file_path=file_path)
299 try:
300 with open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
301 source = f.read()
302 except (IOError, OSError):
303 return spec
305 try:
306 tree = ast.parse(source)
307 except SyntaxError:
308 return spec
310 # Module docstring
311 if (tree.body and isinstance(tree.body[0], ast.Expr)
312 and isinstance(tree.body[0].value, (ast.Constant, ast.Str))):
313 doc = tree.body[0].value
314 spec.module_doc = doc.value if isinstance(doc, ast.Constant) else doc.s
316 for node in ast.iter_child_nodes(tree):
317 # Imports
318 if isinstance(node, ast.Import):
319 for alias in node.names:
320 spec.imports.append(f"import {alias.name}")
321 elif isinstance(node, ast.ImportFrom):
322 names = ', '.join(a.name for a in node.names)
323 spec.imports.append(f"from {node.module} import {names}")
325 # Functions
326 elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
327 if node.name.startswith('_') and not node.name.startswith('__'):
328 continue # Skip private functions
329 func_info = InterfaceExtractor._extract_function(node)
330 spec.functions.append(func_info)
332 # Classes
333 elif isinstance(node, ast.ClassDef):
334 cls_info = InterfaceExtractor._extract_class(node)
335 spec.classes.append(cls_info)
337 # Module-level constants (UPPER_CASE assignments)
338 elif isinstance(node, ast.Assign):
339 for target in node.targets:
340 if isinstance(target, ast.Name) and target.id.isupper():
341 value_str = ast.dump(node.value)[:100]
342 spec.constants.append({
343 'name': target.id,
344 'value_hint': value_str,
345 })
347 return spec
349 @staticmethod
350 def _extract_function(node) -> Dict:
351 """Extract function signature (name, args, return type, docstring)."""
352 args = []
353 for arg in node.args.args:
354 arg_info = {'name': arg.arg}
355 if arg.annotation:
356 arg_info['type'] = ast.unparse(arg.annotation)
357 args.append(arg_info)
359 # Defaults
360 defaults = node.args.defaults
361 if defaults:
362 offset = len(args) - len(defaults)
363 for i, d in enumerate(defaults):
364 try:
365 args[offset + i]['default'] = ast.unparse(d)
366 except Exception:
367 pass
369 info = {
370 'name': node.name,
371 'args': args,
372 'async': isinstance(node, ast.AsyncFunctionDef),
373 }
375 # Return type
376 if node.returns:
377 info['return_type'] = ast.unparse(node.returns)
379 # Docstring
380 if (node.body and isinstance(node.body[0], ast.Expr)
381 and isinstance(node.body[0].value, (ast.Constant, ast.Str))):
382 doc = node.body[0].value
383 info['docstring'] = doc.value if isinstance(doc, ast.Constant) else doc.s
385 # Decorators
386 if node.decorator_list:
387 info['decorators'] = [ast.unparse(d) for d in node.decorator_list]
389 return info
391 @staticmethod
392 def _extract_class(node: ast.ClassDef) -> Dict:
393 """Extract class interface (name, bases, methods, docstring)."""
394 info = {
395 'name': node.name,
396 'bases': [ast.unparse(b) for b in node.bases],
397 'methods': [],
398 }
400 # Docstring
401 if (node.body and isinstance(node.body[0], ast.Expr)
402 and isinstance(node.body[0].value, (ast.Constant, ast.Str))):
403 doc = node.body[0].value
404 info['docstring'] = doc.value if isinstance(doc, ast.Constant) else doc.s
406 # Methods (public only)
407 for item in node.body:
408 if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)):
409 if item.name.startswith('_') and not item.name.startswith('__'):
410 continue
411 method_info = InterfaceExtractor._extract_function(item)
412 info['methods'].append(method_info)
414 return info
416 @staticmethod
417 def extract_from_directory(dir_path: str,
418 extensions: Tuple[str, ...] = ('.py',),
419 exclude_dirs: Optional[Set[str]] = None
420 ) -> Dict[str, InterfaceSpec]:
421 """Extract interfaces from all Python files in a directory."""
422 exclude = exclude_dirs or {
423 '__pycache__', 'venv', '.venv', 'venv310', '.git',
424 'node_modules', 'agent_data', 'tests',
425 }
426 specs = {}
427 for root, dirs, files in os.walk(dir_path):
428 dirs[:] = [d for d in dirs if d not in exclude]
429 for fname in files:
430 if any(fname.endswith(ext) for ext in extensions):
431 full_path = os.path.join(root, fname)
432 rel_path = os.path.relpath(full_path, dir_path)
433 specs[rel_path] = InterfaceExtractor.extract_from_file(full_path)
434 return specs
437class ShardEngine:
438 """Context-scoped task decomposition engine.
440 Runs on the TRUSTED node (user's machine). Creates shards with
441 context proportional to the task — not the whole codebase.
443 CALL_CHAIN scope (default for distributed work):
444 Target function: FULL source
445 Upstream callers: FULL source (input contracts)
446 Downstream callees: FULL source (output contracts)
447 Everything else: Interfaces only (signatures + types)
449 Call graph from Trueflow MCP (when IDE running) or AST fallback.
450 """
452 def __init__(self, code_root: str = None, shard_ttl: int = 3600):
453 self.code_root = code_root or os.environ.get(
454 'HART_INSTALL_DIR',
455 os.path.dirname(os.path.dirname(os.path.dirname(
456 os.path.abspath(__file__))))
457 )
458 self.shard_ttl = shard_ttl # Shards expire after 1 hour
459 self._interface_cache: Dict[str, InterfaceSpec] = {}
460 self._active_shards: Dict[str, CodeShard] = {}
462 def get_interface_map(self, refresh: bool = False) -> Dict[str, InterfaceSpec]:
463 """Get the full interface map of the codebase.
465 This is what the cloud orchestrator sees (PARTIAL TRUST):
466 function signatures, types, imports — NOT implementations.
467 """
468 if self._interface_cache and not refresh:
469 return self._interface_cache
470 self._interface_cache = InterfaceExtractor.extract_from_directory(
471 self.code_root)
472 return self._interface_cache
474 def create_shard(self, task: str, target_files: List[str],
475 scope: ShardScope = ShardScope.INTERFACES,
476 related_files: Optional[List[str]] = None,
477 test_expectations: Optional[List[str]] = None,
478 depends_on: Optional[List[str]] = None,
479 obfuscate_paths: bool = False) -> CodeShard:
480 """Create a code shard for a compute agent.
482 Args:
483 task: Description of what the agent should do
484 target_files: Files the agent will modify
485 scope: How much of each file the agent can see
486 related_files: Additional files the agent needs for context (read-only)
487 test_expectations: What the result should satisfy
488 depends_on: Other shard IDs that must complete first
489 obfuscate_paths: Scramble file paths for extra privacy
490 """
491 shard_id = hashlib.sha256(
492 f"shard:{task}:{time.time()}".encode()
493 ).hexdigest()[:12]
495 all_files = list(set(target_files + (related_files or [])))
496 interface_specs = []
497 full_content = {}
499 for rel_path in all_files:
500 full_path = os.path.join(self.code_root, rel_path)
501 if not os.path.exists(full_path):
502 continue
504 if scope == ShardScope.FULL_FILE:
505 # Trusted agent — gets everything
506 try:
507 with open(full_path, 'r', encoding='utf-8', errors='ignore') as f:
508 full_content[rel_path] = f.read()
509 except IOError:
510 pass
511 elif scope in (ShardScope.INTERFACES, ShardScope.SIGNATURES):
512 # Gets signatures and types, not implementations
513 spec = InterfaceExtractor.extract_from_file(full_path)
514 if scope == ShardScope.SIGNATURES:
515 # Strip docstrings and constants for minimal scope
516 spec.module_doc = ''
517 spec.constants = []
518 for func in spec.functions:
519 func.pop('docstring', None)
520 for cls in spec.classes:
521 cls.pop('docstring', None)
522 for m in cls.get('methods', []):
523 m.pop('docstring', None)
524 interface_specs.append(spec)
525 elif scope == ShardScope.MINIMAL:
526 # Only give the specific function to modify
527 # Full file content but stripped to relevant functions
528 spec = InterfaceExtractor.extract_from_file(full_path)
529 interface_specs.append(spec)
531 # Obfuscate paths if requested
532 actual_target_files = target_files
533 if obfuscate_paths:
534 path_map = {}
535 for i, p in enumerate(all_files):
536 obf = f"module_{i:03d}.py"
537 path_map[p] = obf
538 actual_target_files = [path_map.get(f, f) for f in target_files]
539 for spec in interface_specs:
540 spec.file_path = path_map.get(spec.file_path, spec.file_path)
541 full_content = {path_map.get(k, k): v for k, v in full_content.items()}
543 shard = CodeShard(
544 shard_id=shard_id,
545 task_description=task,
546 scope=scope,
547 target_files=actual_target_files,
548 interface_specs=interface_specs,
549 full_content=full_content,
550 test_expectations=test_expectations or [],
551 dependencies=depends_on or [],
552 expires_at=time.time() + self.shard_ttl,
553 obfuscated=obfuscate_paths,
554 )
556 self._active_shards[shard_id] = shard
557 logger.info(
558 f"Shard [{shard_id}]: {len(target_files)} target files, "
559 f"scope={scope.value}, task={task[:60]}..."
560 )
561 return shard
563 def create_call_chain_shard(self, task: str, target_file: str,
564 target_function: str) -> CodeShard:
565 """Create a shard with call-chain context for a specific function.
567 Context model:
568 Target function: FULL source
569 Upstream callers: FULL source (who calls it)
570 Downstream callees: FULL source (what it calls)
571 Same-file others: Interfaces only
572 Other files: Interfaces only (imported modules)
574 Uses Trueflow MCP (analyze_call_tree) when IDE is running,
575 falls back to AST-based static analysis on headless nodes.
576 """
577 shard_id = hashlib.sha256(
578 f"cc:{target_file}:{target_function}:{time.time()}".encode()
579 ).hexdigest()[:12]
581 full_path = os.path.join(self.code_root, target_file)
583 # Get call chain (Trueflow → AST fallback)
584 chain = CallGraphExtractor.get_call_chain(
585 full_path, target_function, self.code_root)
587 # Build context: full source for call chain, interfaces for rest
588 full_content = {}
589 interface_specs = []
591 # 1. Target function — FULL source
592 target_src = CallGraphExtractor.extract_function_source(
593 full_path, target_function)
594 if target_src:
595 full_content[f'{target_file}::{target_function}'] = target_src
597 # 2. Upstream callers — FULL source
598 for caller in chain.get('upstream', []):
599 if ':' in caller: # Cross-file: "path/file.py:func_name"
600 cfile, cfunc = caller.rsplit(':', 1)
601 cfull = os.path.join(self.code_root, cfile)
602 src = CallGraphExtractor.extract_function_source(cfull, cfunc)
603 if src:
604 full_content[f'{cfile}::{cfunc}'] = src
605 else: # Same file
606 src = CallGraphExtractor.extract_function_source(
607 full_path, caller)
608 if src:
609 full_content[f'{target_file}::{caller}'] = src
611 # 3. Downstream callees — FULL source (same file only;
612 # cross-file callees get interfaces via imports)
613 for callee in chain.get('downstream', []):
614 src = CallGraphExtractor.extract_function_source(
615 full_path, callee)
616 if src:
617 full_content[f'{target_file}::{callee}'] = src
619 # 4. Same-file interfaces (everything NOT in the call chain)
620 spec = InterfaceExtractor.extract_from_file(full_path)
621 call_chain_names = (
622 {target_function} |
623 set(chain.get('upstream', [])) |
624 set(chain.get('downstream', []))
625 )
626 # Strip cross-file references from the set
627 call_chain_names = {
628 n.split(':')[-1] if ':' in n else n for n in call_chain_names
629 }
630 # Keep only interface entries for functions NOT in call chain
631 spec.functions = [
632 f for f in spec.functions if f['name'] not in call_chain_names
633 ]
634 interface_specs.append(spec)
636 # 5. Imported module interfaces (downstream cross-file context)
637 for imp in spec.imports:
638 # Resolve "from X import Y" to file path
639 if imp.startswith('from '):
640 parts = imp.split()
641 if len(parts) >= 2:
642 mod_path = parts[1].replace('.', os.sep) + '.py'
643 mod_full = os.path.join(self.code_root, mod_path)
644 if os.path.exists(mod_full):
645 mod_spec = InterfaceExtractor.extract_from_file(mod_full)
646 interface_specs.append(mod_spec)
648 shard = CodeShard(
649 shard_id=shard_id,
650 task_description=task,
651 scope=ShardScope.CALL_CHAIN,
652 target_files=[target_file],
653 interface_specs=interface_specs,
654 full_content=full_content,
655 test_expectations=[],
656 dependencies=[],
657 expires_at=time.time() + self.shard_ttl,
658 )
659 self._active_shards[shard_id] = shard
661 logger.info(
662 f"Call-chain shard [{shard_id}]: {target_file}::{target_function}, "
663 f"{len(chain.get('upstream', []))} upstream, "
664 f"{len(chain.get('downstream', []))} downstream, "
665 f"source={chain.get('source', 'unknown')}")
666 return shard
668 def decompose_task(self, task: str, scope: ShardScope = ShardScope.INTERFACES,
669 max_files_per_shard: int = 5) -> List[CodeShard]:
670 """Auto-decompose a task into multiple shards.
672 Uses the interface map to determine which files are relevant,
673 then groups them into shards with minimal overlap.
674 """
675 # Get interface map
676 imap = self.get_interface_map()
678 # Simple keyword matching to find relevant files
679 task_lower = task.lower()
680 relevant_files = []
681 for rel_path, spec in imap.items():
682 # Check if file content matches the task
683 all_names = (
684 [f['name'] for f in spec.functions] +
685 [c['name'] for c in spec.classes] +
686 [rel_path]
687 )
688 score = sum(1 for name in all_names if name.lower() in task_lower)
689 if score > 0:
690 relevant_files.append((rel_path, score))
692 if not relevant_files:
693 # Fallback: return a single shard with just the task description
694 return [self.create_shard(task, [], scope)]
696 # Sort by relevance
697 relevant_files.sort(key=lambda x: -x[1])
699 # Group into shards
700 shards = []
701 remaining = [f for f, _ in relevant_files]
702 while remaining:
703 chunk = remaining[:max_files_per_shard]
704 remaining = remaining[max_files_per_shard:]
705 shard = self.create_shard(
706 task=task,
707 target_files=chunk,
708 scope=scope,
709 )
710 shards.append(shard)
712 return shards
714 def validate_result(self, shard_id: str, result: ShardResult) -> Tuple[bool, str]:
715 """Validate a shard result before applying it.
717 Runs on the TRUSTED node only.
718 """
719 shard = self._active_shards.get(shard_id)
720 if not shard:
721 return False, 'Unknown shard ID'
722 if shard.is_expired():
723 return False, 'Shard has expired'
724 if not result.success:
725 return False, f'Agent reported failure: {result.error}'
727 # Check that diffs only touch target files
728 for diff_path in result.diffs:
729 if diff_path not in shard.target_files:
730 return False, f'Diff modifies unauthorized file: {diff_path}'
732 return True, 'Result validated'
734 def cleanup_expired(self) -> int:
735 """Remove expired shards."""
736 expired = [
737 sid for sid, s in self._active_shards.items()
738 if s.is_expired()
739 ]
740 for sid in expired:
741 del self._active_shards[sid]
742 return len(expired)
744 def get_stats(self) -> Dict:
745 """Engine statistics."""
746 return {
747 'active_shards': len(self._active_shards),
748 'interface_cache_size': len(self._interface_cache),
749 'code_root': self.code_root,
750 'shard_ttl': self.shard_ttl,
751 }
754# ═══════════════════════════════════════════════════════════════════════
755# Singleton
756# ═══════════════════════════════════════════════════════════════════════
758_engine: Optional[ShardEngine] = None
761def get_shard_engine() -> ShardEngine:
762 """Get or create the shard engine singleton."""
763 global _engine
764 if _engine is None:
765 _engine = ShardEngine()
766 return _engine