Coverage for integrations / agent_engine / shard_engine.py: 47.6%

393 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Shard Engine — Context-scoped task decomposition. 

3 

4When a code task arrives: 

5 1. Identify target function(s) to modify 

6 2. Build call-chain context: upstream callers + downstream callees (FULL source) 

7 3. Everything else: interfaces only (signatures + types + docstrings) 

8 4. Agent sees enough to code accurately, exposure proportional to task 

9 

10Context model (CALL_CHAIN scope): 

11 Target function: FULL implementation (what you're modifying) 

12 Upstream callers: FULL implementation (who calls it, input contracts) 

13 Downstream callees: FULL implementation (what it calls, output contracts) 

14 Everything else: Interfaces only (signatures + types) 

15 

16Security model: 

17 - Exposure proportional to task, not whole codebase 

18 - E2E encrypted between nodes 

19 - Trust-based peer access (SAME_USER / autotrust) 

20 - Validation on trusted node before applying diffs 

21""" 

22 

23import ast 

24import hashlib 

25import json 

26import logging 

27import os 

28import time 

29from dataclasses import dataclass, field, asdict 

30from enum import Enum 

31from typing import Any, Dict, List, Optional, Set, Tuple 

32 

33logger = logging.getLogger('hevolve.shard_engine') 

34 

35 

36class ShardScope(str, Enum): 

37 """What a shard agent is allowed to see.""" 

38 FULL_FILE = 'full_file' # See complete file(s) — trusted agent only 

39 CALL_CHAIN = 'call_chain' # Target func + upstream/downstream FULL, rest interfaces 

40 INTERFACES = 'interfaces' # See signatures + types, not implementations 

41 SIGNATURES = 'signatures' # See function/class names + params only 

42 MINIMAL = 'minimal' # See only the specific function to modify 

43 

44 

45@dataclass 

46class InterfaceSpec: 

47 """Extracted interface from a Python file — what agents see instead of full code.""" 

48 file_path: str 

49 classes: List[Dict[str, Any]] = field(default_factory=list) 

50 functions: List[Dict[str, Any]] = field(default_factory=list) 

51 imports: List[str] = field(default_factory=list) 

52 constants: List[Dict[str, str]] = field(default_factory=list) 

53 module_doc: str = '' 

54 

55 

56@dataclass 

57class CodeShard: 

58 """An isolated unit of work for a compute agent.""" 

59 shard_id: str 

60 task_description: str 

61 scope: ShardScope 

62 target_files: List[str] # Files to modify 

63 interface_specs: List[InterfaceSpec] # What the agent can see 

64 full_content: Dict[str, str] # file_path → content (only for scope=FULL_FILE) 

65 test_expectations: List[str] # What the result should satisfy 

66 dependencies: List[str] # Other shard IDs this depends on 

67 expires_at: float # Auto-expire timestamp 

68 obfuscated: bool = False # Whether paths are scrambled 

69 

70 def to_dict(self) -> Dict: 

71 return { 

72 'shard_id': self.shard_id, 

73 'task_description': self.task_description, 

74 'scope': self.scope.value, 

75 'target_files': self.target_files, 

76 'interfaces': [asdict(s) for s in self.interface_specs], 

77 'full_content': self.full_content, 

78 'test_expectations': self.test_expectations, 

79 'dependencies': self.dependencies, 

80 'expires_at': self.expires_at, 

81 'obfuscated': self.obfuscated, 

82 } 

83 

84 def is_expired(self) -> bool: 

85 return time.time() > self.expires_at 

86 

87 

88@dataclass 

89class ShardResult: 

90 """Result from an agent working on a shard.""" 

91 shard_id: str 

92 diffs: Dict[str, str] # file_path → unified diff 

93 test_results: Optional[str] # stdout from test run 

94 success: bool 

95 error: Optional[str] = None 

96 

97 

98class CallGraphExtractor: 

99 """Extract upstream callers and downstream callees for a target function. 

100 

101 Primary: Trueflow MCP (port 5681) — has runtime call tree + coverage. 

102 Fallback: AST-based static analysis — walks function bodies for Name references. 

103 """ 

104 

105 @staticmethod 

106 def get_call_chain(file_path: str, function_name: str, 

107 code_root: str = '') -> Dict[str, List[str]]: 

108 """Get upstream (callers) and downstream (callees) for a function. 

109 

110 Returns: 

111 {'target': 'func_name', 

112 'upstream': ['caller1', 'caller2'], 

113 'downstream': ['callee1', 'callee2'], 

114 'source': 'trueflow' | 'ast'} 

115 """ 

116 # Try Trueflow MCP first (richer: runtime coverage + dynamic call tree) 

117 try: 

118 chain = CallGraphExtractor._from_trueflow(file_path, function_name) 

119 if chain.get('upstream') or chain.get('downstream'): 

120 return chain 

121 except Exception: 

122 pass 

123 

124 # Fallback: static AST analysis 

125 return CallGraphExtractor._from_ast(file_path, function_name, code_root) 

126 

127 @staticmethod 

128 def _from_trueflow(file_path: str, function_name: str) -> Dict: 

129 """Get call chain from Trueflow MCP (IDE must be running).""" 

130 from core.http_pool import pooled_post 

131 trueflow_url = os.environ.get('TRUEFLOW_MCP_URL', 

132 'http://localhost:5681') 

133 resp = pooled_post( 

134 f'{trueflow_url}/analyze_call_tree', 

135 json={'file': file_path, 'function': function_name}, 

136 timeout=10, 

137 ) 

138 if resp.status_code == 200: 

139 data = resp.json() 

140 return { 

141 'target': function_name, 

142 'upstream': data.get('callers', []), 

143 'downstream': data.get('callees', []), 

144 'source': 'trueflow', 

145 } 

146 return {'target': function_name, 'upstream': [], 'downstream': [], 

147 'source': 'trueflow'} 

148 

149 @staticmethod 

150 def _from_ast(file_path: str, function_name: str, 

151 code_root: str = '') -> Dict: 

152 """Static call graph via AST. Scans target file + importers.""" 

153 result = { 

154 'target': function_name, 

155 'upstream': [], 

156 'downstream': [], 

157 'source': 'ast', 

158 } 

159 

160 try: 

161 with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: 

162 source = f.read() 

163 tree = ast.parse(source) 

164 except (IOError, SyntaxError): 

165 return result 

166 

167 all_functions = {} # name → ast node 

168 for node in ast.walk(tree): 

169 if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): 

170 all_functions[node.name] = node 

171 

172 # Downstream: what does the target function call? 

173 target_node = all_functions.get(function_name) 

174 if target_node: 

175 for node in ast.walk(target_node): 

176 if isinstance(node, ast.Call): 

177 callee = None 

178 if isinstance(node.func, ast.Name): 

179 callee = node.func.id 

180 elif isinstance(node.func, ast.Attribute): 

181 callee = node.func.attr 

182 if callee and callee != function_name: 

183 if callee not in result['downstream']: 

184 result['downstream'].append(callee) 

185 

186 # Upstream: which functions in this file call the target? 

187 for fname, fnode in all_functions.items(): 

188 if fname == function_name: 

189 continue 

190 for node in ast.walk(fnode): 

191 if isinstance(node, ast.Call): 

192 callee = None 

193 if isinstance(node.func, ast.Name): 

194 callee = node.func.id 

195 elif isinstance(node.func, ast.Attribute): 

196 callee = node.func.attr 

197 if callee == function_name: 

198 if fname not in result['upstream']: 

199 result['upstream'].append(fname) 

200 break 

201 

202 # Cross-file upstream: scan other files that import from this module 

203 if code_root: 

204 target_module = os.path.relpath(file_path, code_root) 

205 target_module = target_module.replace(os.sep, '.').rstrip('.py') 

206 result['upstream'].extend( 

207 CallGraphExtractor._find_cross_file_callers( 

208 code_root, target_module, function_name)) 

209 

210 return result 

211 

212 @staticmethod 

213 def _find_cross_file_callers(code_root: str, target_module: str, 

214 function_name: str) -> List[str]: 

215 """Find functions in other files that import and call the target.""" 

216 callers = [] 

217 exclude = {'__pycache__', 'venv', '.venv', 'venv310', '.git', 

218 'node_modules', 'agent_data', 'tests', 'build'} 

219 for root, dirs, files in os.walk(code_root): 

220 dirs[:] = [d for d in dirs if d not in exclude] 

221 for fname in files: 

222 if not fname.endswith('.py'): 

223 continue 

224 full_path = os.path.join(root, fname) 

225 try: 

226 with open(full_path, 'r', encoding='utf-8', 

227 errors='ignore') as f: 

228 src = f.read() 

229 tree = ast.parse(src) 

230 except (IOError, SyntaxError): 

231 continue 

232 

233 # Check if this file imports our target function 

234 imports_target = False 

235 for node in ast.walk(tree): 

236 if isinstance(node, ast.ImportFrom): 

237 if node.module and target_module in node.module: 

238 for alias in node.names: 

239 if alias.name == function_name: 

240 imports_target = True 

241 break 

242 if imports_target: 

243 break 

244 if not imports_target: 

245 continue 

246 

247 # Find which functions call it 

248 rel = os.path.relpath(full_path, code_root) 

249 for node in ast.walk(tree): 

250 if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): 

251 for child in ast.walk(node): 

252 if isinstance(child, ast.Call): 

253 cname = None 

254 if isinstance(child.func, ast.Name): 

255 cname = child.func.id 

256 elif isinstance(child.func, ast.Attribute): 

257 cname = child.func.attr 

258 if cname == function_name: 

259 caller_id = f"{rel}:{node.name}" 

260 if caller_id not in callers: 

261 callers.append(caller_id) 

262 break 

263 if len(callers) >= 20: # Cap cross-file search 

264 break 

265 return callers 

266 

267 @staticmethod 

268 def extract_function_source(file_path: str, function_name: str) -> str: 

269 """Extract the full source code of a specific function from a file.""" 

270 try: 

271 with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: 

272 lines = f.readlines() 

273 source = ''.join(lines) 

274 tree = ast.parse(source) 

275 except (IOError, SyntaxError): 

276 return '' 

277 

278 for node in ast.walk(tree): 

279 if (isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)) 

280 and node.name == function_name): 

281 start = node.lineno - 1 

282 end = getattr(node, 'end_lineno', start + 1) 

283 return ''.join(lines[start:end]) 

284 return '' 

285 

286 

287class InterfaceExtractor: 

288 """Extract function signatures, class definitions, and types from Python files. 

289 

290 This is what agents see instead of full implementations. 

291 They get enough to understand the API contract without seeing the logic. 

292 """ 

293 

294 @staticmethod 

295 def extract_from_file(file_path: str) -> InterfaceSpec: 

296 """Extract public interface from a Python file.""" 

297 spec = InterfaceSpec(file_path=file_path) 

298 

299 try: 

300 with open(file_path, 'r', encoding='utf-8', errors='ignore') as f: 

301 source = f.read() 

302 except (IOError, OSError): 

303 return spec 

304 

305 try: 

306 tree = ast.parse(source) 

307 except SyntaxError: 

308 return spec 

309 

310 # Module docstring 

311 if (tree.body and isinstance(tree.body[0], ast.Expr) 

312 and isinstance(tree.body[0].value, (ast.Constant, ast.Str))): 

313 doc = tree.body[0].value 

314 spec.module_doc = doc.value if isinstance(doc, ast.Constant) else doc.s 

315 

316 for node in ast.iter_child_nodes(tree): 

317 # Imports 

318 if isinstance(node, ast.Import): 

319 for alias in node.names: 

320 spec.imports.append(f"import {alias.name}") 

321 elif isinstance(node, ast.ImportFrom): 

322 names = ', '.join(a.name for a in node.names) 

323 spec.imports.append(f"from {node.module} import {names}") 

324 

325 # Functions 

326 elif isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)): 

327 if node.name.startswith('_') and not node.name.startswith('__'): 

328 continue # Skip private functions 

329 func_info = InterfaceExtractor._extract_function(node) 

330 spec.functions.append(func_info) 

331 

332 # Classes 

333 elif isinstance(node, ast.ClassDef): 

334 cls_info = InterfaceExtractor._extract_class(node) 

335 spec.classes.append(cls_info) 

336 

337 # Module-level constants (UPPER_CASE assignments) 

338 elif isinstance(node, ast.Assign): 

339 for target in node.targets: 

340 if isinstance(target, ast.Name) and target.id.isupper(): 

341 value_str = ast.dump(node.value)[:100] 

342 spec.constants.append({ 

343 'name': target.id, 

344 'value_hint': value_str, 

345 }) 

346 

347 return spec 

348 

349 @staticmethod 

350 def _extract_function(node) -> Dict: 

351 """Extract function signature (name, args, return type, docstring).""" 

352 args = [] 

353 for arg in node.args.args: 

354 arg_info = {'name': arg.arg} 

355 if arg.annotation: 

356 arg_info['type'] = ast.unparse(arg.annotation) 

357 args.append(arg_info) 

358 

359 # Defaults 

360 defaults = node.args.defaults 

361 if defaults: 

362 offset = len(args) - len(defaults) 

363 for i, d in enumerate(defaults): 

364 try: 

365 args[offset + i]['default'] = ast.unparse(d) 

366 except Exception: 

367 pass 

368 

369 info = { 

370 'name': node.name, 

371 'args': args, 

372 'async': isinstance(node, ast.AsyncFunctionDef), 

373 } 

374 

375 # Return type 

376 if node.returns: 

377 info['return_type'] = ast.unparse(node.returns) 

378 

379 # Docstring 

380 if (node.body and isinstance(node.body[0], ast.Expr) 

381 and isinstance(node.body[0].value, (ast.Constant, ast.Str))): 

382 doc = node.body[0].value 

383 info['docstring'] = doc.value if isinstance(doc, ast.Constant) else doc.s 

384 

385 # Decorators 

386 if node.decorator_list: 

387 info['decorators'] = [ast.unparse(d) for d in node.decorator_list] 

388 

389 return info 

390 

391 @staticmethod 

392 def _extract_class(node: ast.ClassDef) -> Dict: 

393 """Extract class interface (name, bases, methods, docstring).""" 

394 info = { 

395 'name': node.name, 

396 'bases': [ast.unparse(b) for b in node.bases], 

397 'methods': [], 

398 } 

399 

400 # Docstring 

401 if (node.body and isinstance(node.body[0], ast.Expr) 

402 and isinstance(node.body[0].value, (ast.Constant, ast.Str))): 

403 doc = node.body[0].value 

404 info['docstring'] = doc.value if isinstance(doc, ast.Constant) else doc.s 

405 

406 # Methods (public only) 

407 for item in node.body: 

408 if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)): 

409 if item.name.startswith('_') and not item.name.startswith('__'): 

410 continue 

411 method_info = InterfaceExtractor._extract_function(item) 

412 info['methods'].append(method_info) 

413 

414 return info 

415 

416 @staticmethod 

417 def extract_from_directory(dir_path: str, 

418 extensions: Tuple[str, ...] = ('.py',), 

419 exclude_dirs: Optional[Set[str]] = None 

420 ) -> Dict[str, InterfaceSpec]: 

421 """Extract interfaces from all Python files in a directory.""" 

422 exclude = exclude_dirs or { 

423 '__pycache__', 'venv', '.venv', 'venv310', '.git', 

424 'node_modules', 'agent_data', 'tests', 

425 } 

426 specs = {} 

427 for root, dirs, files in os.walk(dir_path): 

428 dirs[:] = [d for d in dirs if d not in exclude] 

429 for fname in files: 

430 if any(fname.endswith(ext) for ext in extensions): 

431 full_path = os.path.join(root, fname) 

432 rel_path = os.path.relpath(full_path, dir_path) 

433 specs[rel_path] = InterfaceExtractor.extract_from_file(full_path) 

434 return specs 

435 

436 

437class ShardEngine: 

438 """Context-scoped task decomposition engine. 

439 

440 Runs on the TRUSTED node (user's machine). Creates shards with 

441 context proportional to the task — not the whole codebase. 

442 

443 CALL_CHAIN scope (default for distributed work): 

444 Target function: FULL source 

445 Upstream callers: FULL source (input contracts) 

446 Downstream callees: FULL source (output contracts) 

447 Everything else: Interfaces only (signatures + types) 

448 

449 Call graph from Trueflow MCP (when IDE running) or AST fallback. 

450 """ 

451 

452 def __init__(self, code_root: str = None, shard_ttl: int = 3600): 

453 self.code_root = code_root or os.environ.get( 

454 'HART_INSTALL_DIR', 

455 os.path.dirname(os.path.dirname(os.path.dirname( 

456 os.path.abspath(__file__)))) 

457 ) 

458 self.shard_ttl = shard_ttl # Shards expire after 1 hour 

459 self._interface_cache: Dict[str, InterfaceSpec] = {} 

460 self._active_shards: Dict[str, CodeShard] = {} 

461 

462 def get_interface_map(self, refresh: bool = False) -> Dict[str, InterfaceSpec]: 

463 """Get the full interface map of the codebase. 

464 

465 This is what the cloud orchestrator sees (PARTIAL TRUST): 

466 function signatures, types, imports — NOT implementations. 

467 """ 

468 if self._interface_cache and not refresh: 

469 return self._interface_cache 

470 self._interface_cache = InterfaceExtractor.extract_from_directory( 

471 self.code_root) 

472 return self._interface_cache 

473 

474 def create_shard(self, task: str, target_files: List[str], 

475 scope: ShardScope = ShardScope.INTERFACES, 

476 related_files: Optional[List[str]] = None, 

477 test_expectations: Optional[List[str]] = None, 

478 depends_on: Optional[List[str]] = None, 

479 obfuscate_paths: bool = False) -> CodeShard: 

480 """Create a code shard for a compute agent. 

481 

482 Args: 

483 task: Description of what the agent should do 

484 target_files: Files the agent will modify 

485 scope: How much of each file the agent can see 

486 related_files: Additional files the agent needs for context (read-only) 

487 test_expectations: What the result should satisfy 

488 depends_on: Other shard IDs that must complete first 

489 obfuscate_paths: Scramble file paths for extra privacy 

490 """ 

491 shard_id = hashlib.sha256( 

492 f"shard:{task}:{time.time()}".encode() 

493 ).hexdigest()[:12] 

494 

495 all_files = list(set(target_files + (related_files or []))) 

496 interface_specs = [] 

497 full_content = {} 

498 

499 for rel_path in all_files: 

500 full_path = os.path.join(self.code_root, rel_path) 

501 if not os.path.exists(full_path): 

502 continue 

503 

504 if scope == ShardScope.FULL_FILE: 

505 # Trusted agent — gets everything 

506 try: 

507 with open(full_path, 'r', encoding='utf-8', errors='ignore') as f: 

508 full_content[rel_path] = f.read() 

509 except IOError: 

510 pass 

511 elif scope in (ShardScope.INTERFACES, ShardScope.SIGNATURES): 

512 # Gets signatures and types, not implementations 

513 spec = InterfaceExtractor.extract_from_file(full_path) 

514 if scope == ShardScope.SIGNATURES: 

515 # Strip docstrings and constants for minimal scope 

516 spec.module_doc = '' 

517 spec.constants = [] 

518 for func in spec.functions: 

519 func.pop('docstring', None) 

520 for cls in spec.classes: 

521 cls.pop('docstring', None) 

522 for m in cls.get('methods', []): 

523 m.pop('docstring', None) 

524 interface_specs.append(spec) 

525 elif scope == ShardScope.MINIMAL: 

526 # Only give the specific function to modify 

527 # Full file content but stripped to relevant functions 

528 spec = InterfaceExtractor.extract_from_file(full_path) 

529 interface_specs.append(spec) 

530 

531 # Obfuscate paths if requested 

532 actual_target_files = target_files 

533 if obfuscate_paths: 

534 path_map = {} 

535 for i, p in enumerate(all_files): 

536 obf = f"module_{i:03d}.py" 

537 path_map[p] = obf 

538 actual_target_files = [path_map.get(f, f) for f in target_files] 

539 for spec in interface_specs: 

540 spec.file_path = path_map.get(spec.file_path, spec.file_path) 

541 full_content = {path_map.get(k, k): v for k, v in full_content.items()} 

542 

543 shard = CodeShard( 

544 shard_id=shard_id, 

545 task_description=task, 

546 scope=scope, 

547 target_files=actual_target_files, 

548 interface_specs=interface_specs, 

549 full_content=full_content, 

550 test_expectations=test_expectations or [], 

551 dependencies=depends_on or [], 

552 expires_at=time.time() + self.shard_ttl, 

553 obfuscated=obfuscate_paths, 

554 ) 

555 

556 self._active_shards[shard_id] = shard 

557 logger.info( 

558 f"Shard [{shard_id}]: {len(target_files)} target files, " 

559 f"scope={scope.value}, task={task[:60]}..." 

560 ) 

561 return shard 

562 

563 def create_call_chain_shard(self, task: str, target_file: str, 

564 target_function: str) -> CodeShard: 

565 """Create a shard with call-chain context for a specific function. 

566 

567 Context model: 

568 Target function: FULL source 

569 Upstream callers: FULL source (who calls it) 

570 Downstream callees: FULL source (what it calls) 

571 Same-file others: Interfaces only 

572 Other files: Interfaces only (imported modules) 

573 

574 Uses Trueflow MCP (analyze_call_tree) when IDE is running, 

575 falls back to AST-based static analysis on headless nodes. 

576 """ 

577 shard_id = hashlib.sha256( 

578 f"cc:{target_file}:{target_function}:{time.time()}".encode() 

579 ).hexdigest()[:12] 

580 

581 full_path = os.path.join(self.code_root, target_file) 

582 

583 # Get call chain (Trueflow → AST fallback) 

584 chain = CallGraphExtractor.get_call_chain( 

585 full_path, target_function, self.code_root) 

586 

587 # Build context: full source for call chain, interfaces for rest 

588 full_content = {} 

589 interface_specs = [] 

590 

591 # 1. Target function — FULL source 

592 target_src = CallGraphExtractor.extract_function_source( 

593 full_path, target_function) 

594 if target_src: 

595 full_content[f'{target_file}::{target_function}'] = target_src 

596 

597 # 2. Upstream callers — FULL source 

598 for caller in chain.get('upstream', []): 

599 if ':' in caller: # Cross-file: "path/file.py:func_name" 

600 cfile, cfunc = caller.rsplit(':', 1) 

601 cfull = os.path.join(self.code_root, cfile) 

602 src = CallGraphExtractor.extract_function_source(cfull, cfunc) 

603 if src: 

604 full_content[f'{cfile}::{cfunc}'] = src 

605 else: # Same file 

606 src = CallGraphExtractor.extract_function_source( 

607 full_path, caller) 

608 if src: 

609 full_content[f'{target_file}::{caller}'] = src 

610 

611 # 3. Downstream callees — FULL source (same file only; 

612 # cross-file callees get interfaces via imports) 

613 for callee in chain.get('downstream', []): 

614 src = CallGraphExtractor.extract_function_source( 

615 full_path, callee) 

616 if src: 

617 full_content[f'{target_file}::{callee}'] = src 

618 

619 # 4. Same-file interfaces (everything NOT in the call chain) 

620 spec = InterfaceExtractor.extract_from_file(full_path) 

621 call_chain_names = ( 

622 {target_function} | 

623 set(chain.get('upstream', [])) | 

624 set(chain.get('downstream', [])) 

625 ) 

626 # Strip cross-file references from the set 

627 call_chain_names = { 

628 n.split(':')[-1] if ':' in n else n for n in call_chain_names 

629 } 

630 # Keep only interface entries for functions NOT in call chain 

631 spec.functions = [ 

632 f for f in spec.functions if f['name'] not in call_chain_names 

633 ] 

634 interface_specs.append(spec) 

635 

636 # 5. Imported module interfaces (downstream cross-file context) 

637 for imp in spec.imports: 

638 # Resolve "from X import Y" to file path 

639 if imp.startswith('from '): 

640 parts = imp.split() 

641 if len(parts) >= 2: 

642 mod_path = parts[1].replace('.', os.sep) + '.py' 

643 mod_full = os.path.join(self.code_root, mod_path) 

644 if os.path.exists(mod_full): 

645 mod_spec = InterfaceExtractor.extract_from_file(mod_full) 

646 interface_specs.append(mod_spec) 

647 

648 shard = CodeShard( 

649 shard_id=shard_id, 

650 task_description=task, 

651 scope=ShardScope.CALL_CHAIN, 

652 target_files=[target_file], 

653 interface_specs=interface_specs, 

654 full_content=full_content, 

655 test_expectations=[], 

656 dependencies=[], 

657 expires_at=time.time() + self.shard_ttl, 

658 ) 

659 self._active_shards[shard_id] = shard 

660 

661 logger.info( 

662 f"Call-chain shard [{shard_id}]: {target_file}::{target_function}, " 

663 f"{len(chain.get('upstream', []))} upstream, " 

664 f"{len(chain.get('downstream', []))} downstream, " 

665 f"source={chain.get('source', 'unknown')}") 

666 return shard 

667 

668 def decompose_task(self, task: str, scope: ShardScope = ShardScope.INTERFACES, 

669 max_files_per_shard: int = 5) -> List[CodeShard]: 

670 """Auto-decompose a task into multiple shards. 

671 

672 Uses the interface map to determine which files are relevant, 

673 then groups them into shards with minimal overlap. 

674 """ 

675 # Get interface map 

676 imap = self.get_interface_map() 

677 

678 # Simple keyword matching to find relevant files 

679 task_lower = task.lower() 

680 relevant_files = [] 

681 for rel_path, spec in imap.items(): 

682 # Check if file content matches the task 

683 all_names = ( 

684 [f['name'] for f in spec.functions] + 

685 [c['name'] for c in spec.classes] + 

686 [rel_path] 

687 ) 

688 score = sum(1 for name in all_names if name.lower() in task_lower) 

689 if score > 0: 

690 relevant_files.append((rel_path, score)) 

691 

692 if not relevant_files: 

693 # Fallback: return a single shard with just the task description 

694 return [self.create_shard(task, [], scope)] 

695 

696 # Sort by relevance 

697 relevant_files.sort(key=lambda x: -x[1]) 

698 

699 # Group into shards 

700 shards = [] 

701 remaining = [f for f, _ in relevant_files] 

702 while remaining: 

703 chunk = remaining[:max_files_per_shard] 

704 remaining = remaining[max_files_per_shard:] 

705 shard = self.create_shard( 

706 task=task, 

707 target_files=chunk, 

708 scope=scope, 

709 ) 

710 shards.append(shard) 

711 

712 return shards 

713 

714 def validate_result(self, shard_id: str, result: ShardResult) -> Tuple[bool, str]: 

715 """Validate a shard result before applying it. 

716 

717 Runs on the TRUSTED node only. 

718 """ 

719 shard = self._active_shards.get(shard_id) 

720 if not shard: 

721 return False, 'Unknown shard ID' 

722 if shard.is_expired(): 

723 return False, 'Shard has expired' 

724 if not result.success: 

725 return False, f'Agent reported failure: {result.error}' 

726 

727 # Check that diffs only touch target files 

728 for diff_path in result.diffs: 

729 if diff_path not in shard.target_files: 

730 return False, f'Diff modifies unauthorized file: {diff_path}' 

731 

732 return True, 'Result validated' 

733 

734 def cleanup_expired(self) -> int: 

735 """Remove expired shards.""" 

736 expired = [ 

737 sid for sid, s in self._active_shards.items() 

738 if s.is_expired() 

739 ] 

740 for sid in expired: 

741 del self._active_shards[sid] 

742 return len(expired) 

743 

744 def get_stats(self) -> Dict: 

745 """Engine statistics.""" 

746 return { 

747 'active_shards': len(self._active_shards), 

748 'interface_cache_size': len(self._interface_cache), 

749 'code_root': self.code_root, 

750 'shard_ttl': self.shard_ttl, 

751 } 

752 

753 

754# ═══════════════════════════════════════════════════════════════════════ 

755# Singleton 

756# ═══════════════════════════════════════════════════════════════════════ 

757 

758_engine: Optional[ShardEngine] = None 

759 

760 

761def get_shard_engine() -> ShardEngine: 

762 """Get or create the shard engine singleton.""" 

763 global _engine 

764 if _engine is None: 

765 _engine = ShardEngine() 

766 return _engine