Coverage for integrations/coding_agent/hive_task_protocol.py: 72.7%

381 statements  

coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Hive Task Protocol — Define, dispatch, and track distributed coding tasks. 

3 

4Tasks flow through the hive: 

5 1. A seeded agent (or user) creates a HiveTask 

6 2. HiveTaskDispatcher finds the best available Claude Code session 

7 3. Task is shard-filtered based on trust level 

8 4. Session executes and reports result 

9 5. Result is validated, Spark reward calculated, capital distributed 

10 

11Task types map to seeded agents: 

12 - CODE_REVIEW: Review PR/code for quality, security 

13 - CODE_WRITE: Write new code for a feature/fix 

14 - CODE_TEST: Write or run tests 

15 - MODEL_ONBOARD: Quantize + onboard a new HF model 

16 - BENCHMARK: Run benchmarks on a model 

17 - DOCUMENTATION: Write docs for code 

18 - BUG_FIX: Fix a reported bug 

19 - REFACTOR: Improve code structure 

20 

21Storage: agent_data/hive_tasks.json (portable across nodes). 

22Thread-safe via Lock on all mutations. 

23""" 

24 

25import json 

26import logging 

27import os 

28import threading 

29import time 

30import uuid 

31from dataclasses import dataclass, field, asdict 

32from enum import Enum 

33from typing import Any, Dict, List, Optional 

34 

35logger = logging.getLogger(__name__) 

36 

37# ─── Storage path ─────────────────────────────────────────────────────── 

38# Must be user-writable. Previously resolved to the HART_INSTALL_DIR 

39# fallback which in bundled Nunba builds is `C:\Program Files (x86)\ 

40# HevolveAI\Nunba` — read-only for non-admin users, causing every 

41# `_save()` to fail with `[Errno 13] Permission denied` (task #250 

42# regression found by runtime-log-watcher on 2026-04-15). Use the 

43# platform-standard user data dir: `~/Documents/Nunba/data/agent_data` 

44# on all OSes, matching MemoryGraph + every other Nunba data path. 

45def _resolve_data_dir() -> str: 

46 try: 

47 from core.platform_paths import get_data_dir as _gdd 

48 _base = _gdd() 

49 except Exception: 

50 _base = os.path.join( 

51 os.path.expanduser('~'), 'Documents', 'Nunba', 'data', 

52 ) 

53 _dir = os.path.join(_base, 'agent_data') 

54 try: 

55 os.makedirs(_dir, exist_ok=True) 

56 except OSError: 

57 pass 

58 return _dir 

59 

60 

61_DATA_DIR = _resolve_data_dir() 

62_TASKS_FILE = os.path.join(_DATA_DIR, 'hive_tasks.json') 

63 

64 

65# ─── Task Types ───────────────────────────────────────────────────────── 

66 

67class HiveTaskType(str, Enum): 

68 CODE_REVIEW = 'code_review' 

69 CODE_WRITE = 'code_write' 

70 CODE_TEST = 'code_test' 

71 MODEL_ONBOARD = 'model_onboard' 

72 BENCHMARK = 'benchmark' 

73 DOCUMENTATION = 'documentation' 

74 BUG_FIX = 'bug_fix' 

75 REFACTOR = 'refactor' 

76 

77 

78class HiveTaskStatus(str, Enum): 

79 PENDING = 'pending' 

80 ASSIGNED = 'assigned' 

81 IN_PROGRESS = 'in_progress' 

82 COMPLETED = 'completed' 

83 FAILED = 'failed' 

84 VALIDATED = 'validated' 

85 CANCELLED = 'cancelled' 

86 

87 

88# Map seeded-agent slugs to the task types they create. 

89# The daemon can use this to auto-create tasks from active goals. 

90AGENT_TASK_MAP = { 

91 'bootstrap_compute_recruiter': HiveTaskType.CODE_WRITE, 

92 'bootstrap_model_provisioner': HiveTaskType.MODEL_ONBOARD, 

93 'bootstrap_capital_distributor': HiveTaskType.CODE_WRITE, 

94 'bootstrap_hive_model_trainer': HiveTaskType.BENCHMARK, 

95 'bootstrap_opensource_evangelist': HiveTaskType.DOCUMENTATION, 

96 'bootstrap_node_health_optimizer': HiveTaskType.BUG_FIX, 

97} 
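
# Illustrative sketch (not called from this module): how a daemon tick could
# turn an active goal into a hive task via AGENT_TASK_MAP. The `goal` object
# and its `agent_slug` / `title` / `description` attributes are hypothetical
# placeholders for whatever goal model the daemon uses.
#
#     def task_from_goal(goal) -> 'HiveTask':
#         task_type = AGENT_TASK_MAP.get(goal.agent_slug, HiveTaskType.CODE_WRITE)
#         return get_dispatcher().create_task(
#             task_type=task_type.value,
#             title=goal.title,
#             description=goal.description,
#             instructions=goal.description,
#         )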


# Base Spark reward ranges per task type (min, max).
_SPARK_RANGES = {
    HiveTaskType.CODE_REVIEW: (5, 30),
    HiveTaskType.CODE_WRITE: (10, 100),
    HiveTaskType.CODE_TEST: (8, 50),
    HiveTaskType.MODEL_ONBOARD: (15, 80),
    HiveTaskType.BENCHMARK: (5, 40),
    HiveTaskType.DOCUMENTATION: (3, 25),
    HiveTaskType.BUG_FIX: (10, 80),
    HiveTaskType.REFACTOR: (10, 60),
}


# ─── Dataclass ──────────────────────────────────────────────────────────

@dataclass
class HiveTask:
    """A single distributed coding task for the hive."""

    task_id: str                   # UUID4
    task_type: str                 # HiveTaskType value
    title: str
    description: str
    instructions: str              # Detailed instructions for Claude Code
    repo_url: str = ''             # Git repo URL (empty for hive-internal)
    files_scope: List[str] = field(default_factory=list)  # Files involved
    shard_level: str = 'INTERFACES'  # Privacy level for untrusted sessions
    priority: int = 50             # 0-100 (higher = more urgent)
    spark_reward: int = 10         # Spark tokens for completion
    max_duration_minutes: int = 30
    requires_tests: bool = True
    requires_review: bool = True
    origin_node_id: str = ''       # Who created this task
    origin_signature: str = ''     # Ed25519 signature for verification
    assigned_session_id: str = ''
    status: str = 'pending'        # HiveTaskStatus value
    result: Dict[str, Any] = field(default_factory=dict)
    created_at: float = 0.0
    completed_at: float = 0.0

    def to_dict(self) -> Dict:
        return asdict(self)

    @classmethod
    def from_dict(cls, d: Dict) -> 'HiveTask':
        """Reconstruct from a JSON-serialised dict, tolerating unknown or missing keys."""
        known_fields = {f.name for f in cls.__dataclass_fields__.values()}
        filtered = {k: v for k, v in d.items() if k in known_fields}
        return cls(**filtered)
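
# Round-trip sketch: unknown keys from older or newer nodes are dropped and
# optional fields fall back to their defaults, so tasks stay portable across
# versions. The extra 'legacy_field' below is a hypothetical example.
#
#     t = HiveTask(task_id='x', task_type='bug_fix', title='t',
#                  description='d', instructions='i')
#     d = t.to_dict()
#     d['legacy_field'] = 123           # ignored by from_dict
#     restored = HiveTask.from_dict(d)  # equal to t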



# ─── Helpers ────────────────────────────────────────────────────────────

def estimate_complexity(instructions: str) -> int:
    """Heuristic complexity estimate. Returns a Spark value 1-100.

    Signals:
        - Instruction length (more text = more work)
        - File references (paths ending in .py, .js, etc.)
        - Presence of testing requirements
        - Presence of refactoring/migration keywords
        - Presence of security-sensitive keywords
    """
    score = 0

    # Length component: 1 point per 200 characters, capped at 40
    score += min(40, len(instructions) // 200)

    instructions_lower = instructions.lower()

    # File references
    file_extensions = ('.py', '.js', '.ts', '.rs', '.go', '.java', '.c', '.cpp')
    file_count = sum(
        1 for word in instructions.split()
        if any(word.endswith(ext) for ext in file_extensions)
    )
    score += min(20, file_count * 3)

    # Testing requirements
    test_keywords = ('test', 'pytest', 'unittest', 'coverage', 'assert')
    if any(kw in instructions_lower for kw in test_keywords):
        score += 10

    # Refactoring/migration keywords
    hard_keywords = ('refactor', 'migrate', 'rewrite', 'restructure',
                     'breaking change', 'backward compat')
    if any(kw in instructions_lower for kw in hard_keywords):
        score += 15

    # Security keywords
    security_keywords = ('security', 'vulnerability', 'cve', 'injection',
                         'authentication', 'authorization')
    if any(kw in instructions_lower for kw in security_keywords):
        score += 10

    return max(1, min(100, score))
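
# Worked example (hypothetical input): a 1,000-character instruction that
# mentions `api.py` and `tests/test_api.py` and asks for pytest coverage
# scores 5 (length) + 6 (2 file refs * 3) + 10 (test keywords) = 21 Spark.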



def validate_result(task: HiveTask, result: Dict) -> float:
    """Quality score 0.0-1.0 for a task result.

    Checks:
        - Result dict contains expected keys
        - Files changed match the task scope
        - Tests included if the task requires them
        - No errors reported
        - No PII leakage (DLP scan)
    """
    score = 0.0
    checks_total = 0

    # 1. Result structure: must have 'files_changed' or 'diff'
    checks_total += 1
    if result.get('files_changed') or result.get('diff'):
        score += 1.0

    # 2. Files match scope (if scope was defined)
    if task.files_scope:
        checks_total += 1
        changed = set(result.get('files_changed', []))
        scope = set(task.files_scope)
        if changed and changed.issubset(scope):
            score += 1.0
        elif changed:
            # Partial credit: fraction within scope
            overlap = len(changed & scope)
            score += overlap / len(changed)

    # 3. Tests included when required
    if task.requires_tests:
        checks_total += 1
        if result.get('tests_passed') is not None:
            score += 1.0 if result['tests_passed'] else 0.3
        elif result.get('test_output'):
            score += 0.5  # Tests ran but no pass/fail indicator

    # 4. No errors reported
    checks_total += 1
    if not result.get('error'):
        score += 1.0

    # 5. DLP scan on result text (try/except — optional dependency)
    result_text = json.dumps(result, default=str)
    checks_total += 1
    try:
        from security.dlp_engine import get_dlp_engine
        dlp = get_dlp_engine()
        findings = dlp.scan(result_text)
        if not findings:
            score += 1.0
        else:
            logger.warning(
                "DLP findings in task %s result: %d PII items",
                task.task_id, len(findings),
            )
            # Partial credit: penalise proportionally
            score += max(0.0, 1.0 - len(findings) * 0.25)
    except Exception:
        # DLP not available — give benefit of the doubt
        score += 1.0

    return round(score / checks_total, 3) if checks_total > 0 else 0.0
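
# Worked example (hypothetical result): for a task scoped to ['api.py'] with
# requires_tests=True, a result of {'files_changed': ['api.py'],
# 'tests_passed': True} passes all five checks when the DLP engine is absent,
# so the score is 5.0 / 5 = 1.0. If the tests had merely produced output
# without a pass/fail flag, check 3 would contribute 0.5 and the score 0.9.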



# ─── Persistence ────────────────────────────────────────────────────────

def _load_tasks() -> List[Dict]:
    """Load task list from JSON file."""
    if not os.path.exists(_TASKS_FILE):
        return []
    try:
        with open(_TASKS_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return data if isinstance(data, list) else []
    except (json.JSONDecodeError, IOError) as exc:
        logger.warning("Failed to load hive tasks: %s", exc)
        return []


def _save_tasks(tasks: List[Dict]) -> None:
    """Atomically save task list to JSON file."""
    os.makedirs(os.path.dirname(_TASKS_FILE), exist_ok=True)
    tmp_path = _TASKS_FILE + '.tmp'
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(tasks, f, indent=2, default=str)
        # Atomic rename (works on POSIX; best-effort on Windows)
        if os.path.exists(_TASKS_FILE):
            os.replace(tmp_path, _TASKS_FILE)
        else:
            os.rename(tmp_path, _TASKS_FILE)
    except IOError as exc:
        logger.error("Failed to save hive tasks: %s", exc)


# ─── Dispatcher ─────────────────────────────────────────────────────────

class HiveTaskDispatcher:
    """Create, dispatch, and track distributed coding tasks.

    Finds pending tasks, matches them to connected Claude Code sessions
    based on capabilities and trust, then tracks results and distributes
    Spark rewards via the revenue aggregator.

    Thread-safe: all task mutations are guarded by ``_lock``.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._tasks: Dict[str, HiveTask] = {}
        self._stats = {
            'total_created': 0,
            'total_completed': 0,
            'total_failed': 0,
            'total_spark_distributed': 0,
            'quality_scores': [],  # Rolling window for avg
        }
        self._load_from_disk()

    # ── Persistence ──────────────────────────────────────────────────

    def _load_from_disk(self) -> None:
        raw = _load_tasks()
        for d in raw:
            try:
                task = HiveTask.from_dict(d)
                self._tasks[task.task_id] = task
            except Exception as exc:
                logger.debug("Skipping malformed task: %s", exc)

    def _persist(self) -> None:
        """Save current task state to disk. Caller must hold _lock."""
        _save_tasks([t.to_dict() for t in self._tasks.values()])

    # ── Task creation ────────────────────────────────────────────────

    def create_task(self, task_type: str, title: str, description: str,
                    instructions: str, **kwargs) -> HiveTask:
        """Create and queue a new hive task.

        Auto-calculates ``spark_reward`` from instruction complexity if
        not explicitly provided via *kwargs*.

        Args:
            task_type: One of HiveTaskType values.
            title: Short human-readable title.
            description: What the task accomplishes.
            instructions: Detailed instructions for Claude Code.
            **kwargs: Override any HiveTask field (priority, spark_reward, etc.)

        Returns:
            The newly created HiveTask (status='pending').
        """
        task_id = str(uuid.uuid4())

        # Auto-calculate Spark reward if not provided
        if 'spark_reward' not in kwargs:
            complexity = estimate_complexity(instructions)
            try:
                tt = HiveTaskType(task_type)
                lo, hi = _SPARK_RANGES.get(tt, (5, 50))
            except ValueError:
                lo, hi = 5, 50
            kwargs['spark_reward'] = max(lo, min(hi, complexity))

        task = HiveTask(
            task_id=task_id,
            task_type=task_type,
            title=title,
            description=description,
            instructions=instructions,
            created_at=time.time(),
            **kwargs,
        )

        with self._lock:
            self._tasks[task_id] = task
            self._stats['total_created'] += 1
            self._persist()

        logger.info(
            "Hive task created: [%s] %s (type=%s, spark=%d, priority=%d)",
            task_id[:8], title, task_type, task.spark_reward, task.priority,
        )
        return task


    # ── Dispatch ─────────────────────────────────────────────────────

    def dispatch_pending(self) -> int:
        """Find pending tasks and dispatch to available sessions.

        Called by the agent daemon on each tick. Matches tasks to the
        best available Claude Code session based on capabilities and
        trust level, then calls ``session.receive_task()``.

        Returns:
            Number of tasks successfully dispatched.
        """
        dispatched = 0
        pending = self.get_pending_tasks()
        if not pending:
            return 0

        for task in pending:
            session_id = self.match_session(task)
            if not session_id:
                continue

            # Deliver task to session
            delivered = self._deliver_to_session(session_id, task)
            if delivered:
                with self._lock:
                    task.status = HiveTaskStatus.ASSIGNED.value
                    task.assigned_session_id = session_id
                    self._persist()
                dispatched += 1
                logger.info(
                    "Dispatched task [%s] -> session [%s]",
                    task.task_id[:8], session_id[:8],
                )

        return dispatched

    def match_session(self, task: HiveTask) -> Optional[str]:
        """Find the best session for a task.

        Selection criteria (in priority order):
            1. Filter by task scope (own_repos vs public vs any)
            2. Filter by capabilities (language, framework match)
            3. Prefer sessions with higher quality scores
            4. Prefer sessions with lower latency (same region)

        Returns:
            session_id or None if no suitable session found.
        """
        try:
            from integrations.coding_agent.claude_hive_session import (
                get_session_registry,
            )
            registry = get_session_registry()
        except ImportError:
            logger.debug("claude_hive_session not available for matching")
            return None
        except Exception as exc:
            logger.debug("Session registry unavailable: %s", exc)
            return None

        if not hasattr(registry, 'get_available_sessions'):
            return None

        try:
            sessions = registry.get_available_sessions()
        except Exception:
            return None

        if not sessions:
            return None

        # Score each session
        best_id = None
        best_score = -1.0

        for session in sessions:
            sid = session.get('session_id', '')
            if not sid:
                continue

            score = 0.0

            # Capability match: language overlap
            session_langs = set(session.get('languages', []))
            task_langs = set()
            for fpath in task.files_scope:
                ext = os.path.splitext(fpath)[1].lower()
                lang_map = {
                    '.py': 'python', '.js': 'javascript', '.ts': 'typescript',
                    '.rs': 'rust', '.go': 'go', '.java': 'java',
                    '.c': 'c', '.cpp': 'cpp',
                }
                if ext in lang_map:
                    task_langs.add(lang_map[ext])
            if not task_langs or task_langs & session_langs:
                score += 2.0  # Match or no constraint

            # Quality score
            quality = session.get('quality_score', 0.5)
            score += quality * 3.0

            # Latency / region preference
            session_region = session.get('region', '')
            task_region = task.origin_node_id[:3] if task.origin_node_id else ''
            if session_region and task_region and session_region == task_region:
                score += 1.0

            # Availability: prefer idle sessions
            if session.get('status') == 'idle':
                score += 1.5

            if score > best_score:
                best_score = score
                best_id = sid

        return best_id


    def _deliver_to_session(self, session_id: str, task: HiveTask) -> bool:
        """Send a task to a Claude Code session. Returns True on success."""
        try:
            from integrations.coding_agent.claude_hive_session import (
                get_session_registry,
            )
            registry = get_session_registry()
            session = registry.get_session(session_id)
            if session is None:
                return False

            # Apply shard filtering based on trust level
            task_payload = task.to_dict()
            session_trust = getattr(session, 'trust_level', 'PEER')
            if session_trust not in ('SAME_USER', 'FULL_FILE'):
                task_payload = self._apply_shard_filter(task, session_trust)

            return session.receive_task(task_payload)
        except ImportError:
            logger.debug("Cannot deliver: claude_hive_session not importable")
            return False
        except Exception as exc:
            logger.warning("Task delivery failed for [%s]: %s",
                           task.task_id[:8], exc)
            return False

    def _apply_shard_filter(self, task: HiveTask, trust_level: str) -> Dict:
        """Reduce the task payload based on shard scope for untrusted sessions.

        Uses the ShardEngine to strip implementation details, leaving
        only interface-level information for untrusted peers.
        """
        payload = task.to_dict()
        try:
            from integrations.agent_engine.shard_engine import (
                ShardEngine, ShardScope,
            )
            scope_map = {
                'INTERFACES': ShardScope.INTERFACES,
                'SIGNATURES': ShardScope.SIGNATURES,
                'MINIMAL': ShardScope.MINIMAL,
            }
            scope = scope_map.get(task.shard_level, ShardScope.INTERFACES)
            engine = ShardEngine()
            shard = engine.create_shard(
                task=task.instructions,
                target_files=task.files_scope,
                scope=scope,
            )
            payload['shard'] = shard.to_dict()
            # Strip raw instructions for untrusted sessions
            if scope != ShardScope.FULL_FILE:
                payload.pop('instructions', None)
        except Exception as exc:
            logger.debug("Shard filtering failed, sending metadata only: %s", exc)
            payload.pop('instructions', None)
        return payload

    # ── Result handling ──────────────────────────────────────────────

    def on_task_result(self, task_id: str, result: Dict) -> Dict:
        """Called when a session reports a task result.

        Validates the result, calculates a quality score, awards Spark
        via the revenue aggregator, and updates the task status.

        Args:
            task_id: UUID of the completed task.
            result: Dict with keys like files_changed, diff, tests_passed,
                test_output, error, etc.

        Returns:
            {spark_awarded: int, quality_score: float, validated: bool}
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if not task:
                return {'spark_awarded': 0, 'quality_score': 0.0,
                        'validated': False, 'error': 'unknown_task'}

            quality = validate_result(task, result)
            task.result = result
            task.completed_at = time.time()

            # Quality gate: require >= 0.4 to count as completed
            validated = quality >= 0.4
            if validated:
                task.status = HiveTaskStatus.VALIDATED.value
            else:
                task.status = HiveTaskStatus.FAILED.value
                self._stats['total_failed'] += 1

            # Calculate Spark reward: base * quality
            spark_awarded = 0
            if validated:
                spark_awarded = max(1, int(task.spark_reward * quality))
                self._stats['total_completed'] += 1
                self._stats['total_spark_distributed'] += spark_awarded
                self._distribute_spark(task, spark_awarded)

            self._stats['quality_scores'].append(quality)
            # Keep a rolling window of the last 500 scores
            if len(self._stats['quality_scores']) > 500:
                self._stats['quality_scores'] = self._stats['quality_scores'][-500:]

            self._persist()

        logger.info(
            "Task result [%s]: quality=%.3f, spark=%d, validated=%s",
            task_id[:8], quality, spark_awarded, validated,
        )
        return {
            'spark_awarded': spark_awarded,
            'quality_score': quality,
            'validated': validated,
        }

    def _distribute_spark(self, task: HiveTask, spark_amount: int) -> None:
        """Award Spark to the session operator via the revenue aggregator.

        Follows the 90/9/1 split model:
            90% to the compute contributor (session operator)
            9% to the infrastructure pool
            1% to central
        """
        try:
            from integrations.agent_engine.revenue_aggregator import (
                REVENUE_SPLIT_USERS,
                REVENUE_SPLIT_INFRA,
                REVENUE_SPLIT_CENTRAL,
            )
            user_share = max(1, int(spark_amount * REVENUE_SPLIT_USERS))
            infra_share = int(spark_amount * REVENUE_SPLIT_INFRA)
            central_share = int(spark_amount * REVENUE_SPLIT_CENTRAL)

            logger.info(
                "Spark distribution for task [%s]: "
                "user=%d, infra=%d, central=%d (total=%d)",
                task.task_id[:8], user_share, infra_share, central_share,
                spark_amount,
            )
            # Actual wallet crediting would go through ResonanceService.award_spark()
            # when the social models are available. Log-only for now.
        except ImportError:
            logger.debug("revenue_aggregator not available for Spark distribution")
        except Exception as exc:
            logger.warning("Spark distribution failed: %s", exc)
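
    # Worked example (assuming the aggregator exports the canonical 0.90 /
    # 0.09 / 0.01 split constants): spark_amount=40 gives user=36, infra=3,
    # central=0 after integer truncation; the max(1, ...) guard only matters
    # for tiny awards, e.g. spark_amount=1 still credits the operator 1.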


    # ── Query ────────────────────────────────────────────────────────

    def get_pending_tasks(self) -> List[HiveTask]:
        """List all pending tasks sorted by priority (highest first)."""
        with self._lock:
            pending = [
                t for t in self._tasks.values()
                if t.status == HiveTaskStatus.PENDING.value
            ]
        pending.sort(key=lambda t: (-t.priority, t.created_at))
        return pending

    def get_task(self, task_id: str) -> Optional[HiveTask]:
        """Retrieve a task by ID."""
        with self._lock:
            return self._tasks.get(task_id)

    def cancel_task(self, task_id: str) -> bool:
        """Cancel a pending or assigned task. Returns True if cancelled."""
        with self._lock:
            task = self._tasks.get(task_id)
            if not task:
                return False
            if task.status in (HiveTaskStatus.COMPLETED.value,
                               HiveTaskStatus.VALIDATED.value):
                return False  # Cannot cancel finished tasks
            task.status = HiveTaskStatus.CANCELLED.value
            self._persist()
        logger.info("Task [%s] cancelled", task_id[:8])
        return True

    def get_stats(self) -> Dict:
        """Dispatcher statistics.

        Returns:
            Dict with total_created, total_completed, total_failed,
            total_spark_distributed, avg_quality, pending_count,
            active_count, and total_tasks.
        """
        with self._lock:
            scores = self._stats['quality_scores']
            avg_quality = (
                round(sum(scores) / len(scores), 3) if scores else 0.0
            )
            return {
                'total_created': self._stats['total_created'],
                'total_completed': self._stats['total_completed'],
                'total_failed': self._stats['total_failed'],
                'total_spark_distributed': self._stats['total_spark_distributed'],
                'avg_quality': avg_quality,
                'pending_count': sum(
                    1 for t in self._tasks.values()
                    if t.status == HiveTaskStatus.PENDING.value
                ),
                'active_count': sum(
                    1 for t in self._tasks.values()
                    if t.status in (HiveTaskStatus.ASSIGNED.value,
                                    HiveTaskStatus.IN_PROGRESS.value)
                ),
                'total_tasks': len(self._tasks),
            }


# ═══════════════════════════════════════════════════════════════════════
# Singleton
# ═══════════════════════════════════════════════════════════════════════

_dispatcher: Optional[HiveTaskDispatcher] = None
_dispatcher_lock = threading.Lock()


def get_dispatcher() -> HiveTaskDispatcher:
    """Get or create the HiveTaskDispatcher singleton."""
    global _dispatcher
    if _dispatcher is None:
        with _dispatcher_lock:
            if _dispatcher is None:
                _dispatcher = HiveTaskDispatcher()
    return _dispatcher
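
# End-to-end usage sketch (illustrative; the result dict shown is a
# hypothetical session report, not something this module produces itself):
#
#     dispatcher = get_dispatcher()
#     task = dispatcher.create_task(
#         task_type=HiveTaskType.BUG_FIX.value,
#         title='Fix crash in importer',
#         description='Importer crashes on empty CSV files',
#         instructions='Guard against empty files in importer.py and add a test.',
#         files_scope=['importer.py'],
#     )
#     dispatcher.dispatch_pending()      # daemon tick: assign to a session
#     dispatcher.on_task_result(task.task_id, {
#         'files_changed': ['importer.py'],
#         'tests_passed': True,
#     })                                 # -> {'spark_awarded': ..., 'validated': True}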



# ═══════════════════════════════════════════════════════════════════════
# Hive ledger reader (L5 acceptance contract)
# ═══════════════════════════════════════════════════════════════════════
#
# The Hive Test Ledger is a durable, append-only markdown file in the
# user's Claude Code project memory dir. Every Hive participant pulls
# open items from the same file, so we benchmark Claude Code, Nunba's
# in-app coding agent, local Qwen, closed APIs, and HF models on the
# identical task list. This module is the canonical reader; the ledger
# spec lives inside the file itself (memory/project_hive_test_ledger.md).
#
# The parser assumes the ledger uses the standard 6-column markdown table
# (# | Timestamp | Verbatim ask | Success criterion | Acceptance test |
# Status) and that no cell contains an unescaped pipe — true by the
# ledger's own append-only contract.

@dataclass(frozen=True)
class LedgerEntry:
    """One row of the Hive Test Ledger."""
    id: str
    timestamp: str
    ask: str
    success_criterion: str
    acceptance_test: str
    status: str


def _resolve_ledger_path(explicit: Optional[str] = None) -> Optional[str]:
    """Find the active project_hive_test_ledger.md.

    Resolution order:
        1. explicit arg (when the caller knows the path)
        2. HIVE_LEDGER_PATH env var (cross-agent override)
        3. Most-recently-modified ~/.claude/projects/*/memory/
           project_hive_test_ledger.md (Claude Code default location)

    Returns None if no candidate exists. Pure read-only — no side
    effects, no caching.
    """
    if explicit:
        return explicit if os.path.isfile(explicit) else None
    env = os.environ.get('HIVE_LEDGER_PATH')
    if env and os.path.isfile(env):
        return env
    base = os.path.join(os.path.expanduser('~'), '.claude', 'projects')
    if not os.path.isdir(base):
        return None
    candidates: List[tuple] = []
    try:
        projects = os.listdir(base)
    except OSError:
        return None
    for project in projects:
        path = os.path.join(
            base, project, 'memory', 'project_hive_test_ledger.md',
        )
        if os.path.isfile(path):
            try:
                candidates.append((os.path.getmtime(path), path))
            except OSError:
                continue
    if not candidates:
        return None
    candidates.sort(reverse=True)
    return candidates[0][1]


def _parse_ledger_row(line: str) -> Optional[LedgerEntry]:
    """Parse one markdown table row into a LedgerEntry, or return None if
    the line is not a data row (header, separator, prose, blank).
    """
    line = line.strip()
    if not line.startswith('|'):
        return None
    if line.startswith('|---') or line.startswith('| ---'):
        return None
    if line.startswith('| #'):
        return None
    # Strip leading + trailing pipes, split on pipe.
    parts = [p.strip() for p in line.strip('|').split('|')]
    if len(parts) != 6:
        return None
    entry_id, ts, ask, ok, atest, status = parts
    if not entry_id or entry_id == '#':
        return None
    return LedgerEntry(
        id=entry_id,
        timestamp=ts,
        ask=ask.strip('"'),
        success_criterion=ok,
        acceptance_test=atest,
        status=status,
    )
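
# Example (hypothetical row; column layout per the contract above):
#
#     | 7 | 2026-05-10 | "add retry to uploader" | retries 3x | pytest -k retry | open |
#
# parses to LedgerEntry(id='7', timestamp='2026-05-10',
# ask='add retry to uploader', success_criterion='retries 3x',
# acceptance_test='pytest -k retry', status='open'); the header row and
# the |---| separator row both return None.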



def load_user_ledger(
    path: Optional[str] = None,
    include_closed: bool = False,
) -> List[LedgerEntry]:
    """Parse the Hive Test Ledger and return open items (by default).

    Args:
        path: explicit ledger file location. If None, resolves via the
            HIVE_LEDGER_PATH env var or auto-discovery under
            ~/.claude/projects/*/memory/.
        include_closed: if True, include all rows regardless of status
            (e.g. `done_by_` / `claimed_by_`); by default only rows with
            status == 'open' are returned.

    Returns:
        Ordered list of LedgerEntry in file order. Empty list if the
        ledger does not exist or contains no parseable rows. Never
        raises — read failures degrade to a warning plus an empty list,
        so agents that depend on this never crash on a missing memory
        directory.
    """
    resolved = _resolve_ledger_path(path)
    if resolved is None:
        logger.info('hive_task_protocol.load_user_ledger: no ledger found')
        return []
    try:
        with open(resolved, 'r', encoding='utf-8') as fh:
            lines = fh.readlines()
    except OSError as e:
        logger.warning(
            'hive_task_protocol.load_user_ledger: read failed %s: %s',
            resolved, e,
        )
        return []
    out: List[LedgerEntry] = []
    for line in lines:
        entry = _parse_ledger_row(line)
        if entry is None:
            continue
        if include_closed or entry.status == 'open':
            out.append(entry)
    return out
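
# Usage sketch (assuming a ledger exists at the default Claude Code location
# or that HIVE_LEDGER_PATH points at one):
#
#     open_items = load_user_ledger()
#     for item in open_items:
#         print(item.id, item.ask, item.acceptance_test)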