Coverage for integrations/coding_agent/hive_task_protocol.py: 72.7% (381 statements)
1"""
2Hive Task Protocol — Define, dispatch, and track distributed coding tasks.
4Tasks flow through the hive:
5 1. A seeded agent (or user) creates a HiveTask
6 2. HiveTaskDispatcher finds the best available Claude Code session
7 3. Task is shard-filtered based on trust level
8 4. Session executes and reports result
9 5. Result is validated, Spark reward calculated, capital distributed
11Task types map to seeded agents:
12 - CODE_REVIEW: Review PR/code for quality, security
13 - CODE_WRITE: Write new code for a feature/fix
14 - CODE_TEST: Write or run tests
15 - MODEL_ONBOARD: Quantize + onboard a new HF model
16 - BENCHMARK: Run benchmarks on a model
17 - DOCUMENTATION: Write docs for code
18 - BUG_FIX: Fix a reported bug
19 - REFACTOR: Improve code structure
21Storage: agent_data/hive_tasks.json (portable across nodes).
22Thread-safe via Lock on all mutations.
23"""

import json
import logging
import os
import threading
import time
import uuid
from dataclasses import dataclass, field, asdict
from enum import Enum
from typing import Any, Dict, List, Optional

logger = logging.getLogger(__name__)

# ─── Storage path ───────────────────────────────────────────────────────
# Must be user-writable. Previously resolved to the HART_INSTALL_DIR
# fallback, which in bundled Nunba builds is `C:\Program Files (x86)\
# HevolveAI\Nunba` — read-only for non-admin users, causing every
# `_save()` to fail with `[Errno 13] Permission denied` (task #250
# regression found by runtime-log-watcher on 2026-04-15). Use the
# platform-standard user data dir: `~/Documents/Nunba/data/agent_data`
# on all OSes, matching MemoryGraph and every other Nunba data path.
def _resolve_data_dir() -> str:
    try:
        from core.platform_paths import get_data_dir as _gdd
        _base = _gdd()
    except Exception:
        _base = os.path.join(
            os.path.expanduser('~'), 'Documents', 'Nunba', 'data',
        )
    _dir = os.path.join(_base, 'agent_data')
    try:
        os.makedirs(_dir, exist_ok=True)
    except OSError:
        pass
    return _dir


_DATA_DIR = _resolve_data_dir()
_TASKS_FILE = os.path.join(_DATA_DIR, 'hive_tasks.json')


# ─── Task Types ─────────────────────────────────────────────────────────

class HiveTaskType(str, Enum):
    CODE_REVIEW = 'code_review'
    CODE_WRITE = 'code_write'
    CODE_TEST = 'code_test'
    MODEL_ONBOARD = 'model_onboard'
    BENCHMARK = 'benchmark'
    DOCUMENTATION = 'documentation'
    BUG_FIX = 'bug_fix'
    REFACTOR = 'refactor'


class HiveTaskStatus(str, Enum):
    PENDING = 'pending'
    ASSIGNED = 'assigned'
    IN_PROGRESS = 'in_progress'
    COMPLETED = 'completed'
    FAILED = 'failed'
    VALIDATED = 'validated'
    CANCELLED = 'cancelled'


# Map seeded-agent slugs to the task types they create.
# The daemon can use this to auto-create tasks from active goals.
AGENT_TASK_MAP = {
    'bootstrap_compute_recruiter': HiveTaskType.CODE_WRITE,
    'bootstrap_model_provisioner': HiveTaskType.MODEL_ONBOARD,
    'bootstrap_capital_distributor': HiveTaskType.CODE_WRITE,
    'bootstrap_hive_model_trainer': HiveTaskType.BENCHMARK,
    'bootstrap_opensource_evangelist': HiveTaskType.DOCUMENTATION,
    'bootstrap_node_health_optimizer': HiveTaskType.BUG_FIX,
}

# Base Spark reward ranges per task type (min, max).
_SPARK_RANGES = {
    HiveTaskType.CODE_REVIEW: (5, 30),
    HiveTaskType.CODE_WRITE: (10, 100),
    HiveTaskType.CODE_TEST: (8, 50),
    HiveTaskType.MODEL_ONBOARD: (15, 80),
    HiveTaskType.BENCHMARK: (5, 40),
    HiveTaskType.DOCUMENTATION: (3, 25),
    HiveTaskType.BUG_FIX: (10, 80),
    HiveTaskType.REFACTOR: (10, 60),
}


# ─── Dataclass ──────────────────────────────────────────────────────────

@dataclass
class HiveTask:
    """A single distributed coding task for the hive."""

    task_id: str                      # UUID4
    task_type: str                    # HiveTaskType value
    title: str
    description: str
    instructions: str                 # Detailed instructions for Claude Code
    repo_url: str = ''                # Git repo URL (empty for hive-internal)
    files_scope: List[str] = field(default_factory=list)  # Files involved
    shard_level: str = 'INTERFACES'   # Privacy level for untrusted sessions
    priority: int = 50                # 0-100 (higher = more urgent)
    spark_reward: int = 10            # Spark tokens for completion
    max_duration_minutes: int = 30
    requires_tests: bool = True
    requires_review: bool = True
    origin_node_id: str = ''          # Who created this task
    origin_signature: str = ''        # Ed25519 signature for verification
    assigned_session_id: str = ''
    status: str = 'pending'           # HiveTaskStatus value
    result: Dict[str, Any] = field(default_factory=dict)
    created_at: float = 0.0
    completed_at: float = 0.0

    def to_dict(self) -> Dict:
        return asdict(self)

    @classmethod
    def from_dict(cls, d: Dict) -> 'HiveTask':
        """Reconstruct from a JSON-serialised dict, tolerating missing keys."""
        known_fields = {f.name for f in cls.__dataclass_fields__.values()}
        filtered = {k: v for k, v in d.items() if k in known_fields}
        return cls(**filtered)


# ─── Helpers ────────────────────────────────────────────────────────────

def estimate_complexity(instructions: str) -> int:
    """Heuristic complexity estimate. Returns a Spark value 1-100.

    Signals:
        - Instruction length (more text = more work)
        - File references (paths ending in .py, .js, etc.)
        - Presence of testing requirements
        - Presence of refactoring/migration keywords
        - Presence of security-sensitive keywords
    """
    score = 0

    # Length component: 1 point per 200 characters, capped at 40
    score += min(40, len(instructions) // 200)

    instructions_lower = instructions.lower()

    # File references
    file_extensions = ('.py', '.js', '.ts', '.rs', '.go', '.java', '.c', '.cpp')
    file_count = sum(
        1 for word in instructions.split()
        if any(word.endswith(ext) for ext in file_extensions)
    )
    score += min(20, file_count * 3)

    # Testing requirements
    test_keywords = ('test', 'pytest', 'unittest', 'coverage', 'assert')
    if any(kw in instructions_lower for kw in test_keywords):
        score += 10

    # Refactoring/migration keywords
    hard_keywords = ('refactor', 'migrate', 'rewrite', 'restructure',
                     'breaking change', 'backward compat')
    if any(kw in instructions_lower for kw in hard_keywords):
        score += 15

    # Security keywords
    security_keywords = ('security', 'vulnerability', 'cve', 'injection',
                         'authentication', 'authorization')
    if any(kw in instructions_lower for kw in security_keywords):
        score += 10

    return max(1, min(100, score))
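

# Worked example: a short brief that names one .py file and hits both the
# test-keyword and refactor-keyword buckets scores 0 (length) + 3 (file
# reference) + 10 (tests) + 15 (refactor) = 28:
#
#     >>> estimate_complexity("Refactor utils.py and add pytest coverage")
#     28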


def validate_result(task: HiveTask, result: Dict) -> float:
    """Quality score 0.0-1.0 for a task result.

    Checks:
        - Result dict contains expected keys (files_changed or diff)
        - Files changed match the task scope
        - Tests included if task requires them
        - No errors reported
        - No PII leakage (DLP scan)
    """
    score = 0.0
    checks_total = 0

    # 1. Result structure: must have 'files_changed' or 'diff'
    checks_total += 1
    if result.get('files_changed') or result.get('diff'):
        score += 1.0

    # 2. Files match scope (if scope was defined)
    if task.files_scope:
        checks_total += 1
        changed = set(result.get('files_changed', []))
        scope = set(task.files_scope)
        if changed and changed.issubset(scope):
            score += 1.0
        elif changed:
            # Partial credit: fraction within scope
            overlap = len(changed & scope)
            score += overlap / len(changed)

    # 3. Tests included when required
    if task.requires_tests:
        checks_total += 1
        if result.get('tests_passed') is not None:
            score += 1.0 if result['tests_passed'] else 0.3
        elif result.get('test_output'):
            score += 0.5  # Tests ran but no pass/fail indicator

    # 4. No errors reported
    checks_total += 1
    if not result.get('error'):
        score += 1.0

    # 5. DLP scan on result text (try/except — optional dependency)
    result_text = json.dumps(result, default=str)
    checks_total += 1
    try:
        from security.dlp_engine import get_dlp_engine
        dlp = get_dlp_engine()
        findings = dlp.scan(result_text)
        if not findings:
            score += 1.0
        else:
            logger.warning(
                "DLP findings in task %s result: %d PII items",
                task.task_id, len(findings),
            )
            # Partial credit: penalise proportionally
            score += max(0.0, 1.0 - len(findings) * 0.25)
    except Exception:
        # DLP not available — give benefit of the doubt
        score += 1.0

    return round(score / checks_total, 3) if checks_total > 0 else 0.0
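

# Worked example: for a task with files_scope=['utils.py'] and
# requires_tests=True, a result {'files_changed': ['utils.py'],
# 'tests_passed': True} passes all five checks (structure, scope, tests,
# no error, and DLP, which credits 1.0 when the engine is unavailable),
# so validate_result returns 5 / 5 = 1.0.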


# ─── Persistence ────────────────────────────────────────────────────────

def _load_tasks() -> List[Dict]:
    """Load task list from JSON file."""
    if not os.path.exists(_TASKS_FILE):
        return []
    try:
        with open(_TASKS_FILE, 'r', encoding='utf-8') as f:
            data = json.load(f)
        return data if isinstance(data, list) else []
    except (json.JSONDecodeError, IOError) as exc:
        logger.warning("Failed to load hive tasks: %s", exc)
        return []


def _save_tasks(tasks: List[Dict]) -> None:
    """Atomically save task list to JSON file."""
    os.makedirs(os.path.dirname(_TASKS_FILE), exist_ok=True)
    tmp_path = _TASKS_FILE + '.tmp'
    try:
        with open(tmp_path, 'w', encoding='utf-8') as f:
            json.dump(tasks, f, indent=2, default=str)
        # os.replace is atomic on POSIX and overwrites an existing target
        # on Windows too, so no exists()/rename branch is needed.
        os.replace(tmp_path, _TASKS_FILE)
    except IOError as exc:
        logger.error("Failed to save hive tasks: %s", exc)


# ─── Dispatcher ─────────────────────────────────────────────────────────

class HiveTaskDispatcher:
    """Create, dispatch, and track distributed coding tasks.

    Finds pending tasks, matches them to connected Claude Code sessions
    based on capabilities and trust, then tracks results and distributes
    Spark rewards via the revenue aggregator.

    Thread-safe: all task mutations are guarded by ``_lock``.
    """

    def __init__(self):
        self._lock = threading.Lock()
        self._tasks: Dict[str, HiveTask] = {}
        self._stats = {
            'total_created': 0,
            'total_completed': 0,
            'total_failed': 0,
            'total_spark_distributed': 0,
            'quality_scores': [],  # Rolling window for avg
        }
        self._load_from_disk()

    # ── Persistence ──────────────────────────────────────────────────

    def _load_from_disk(self) -> None:
        raw = _load_tasks()
        for d in raw:
            try:
                task = HiveTask.from_dict(d)
                self._tasks[task.task_id] = task
            except Exception as exc:
                logger.debug("Skipping malformed task: %s", exc)

    def _persist(self) -> None:
        """Save current task state to disk. Caller must hold _lock."""
        _save_tasks([t.to_dict() for t in self._tasks.values()])

    # ── Task creation ────────────────────────────────────────────────

    def create_task(self, task_type: str, title: str, description: str,
                    instructions: str, **kwargs) -> HiveTask:
        """Create and queue a new hive task.

        Auto-calculates ``spark_reward`` from instruction complexity if
        not explicitly provided via *kwargs*.

        Args:
            task_type: One of HiveTaskType values.
            title: Short human-readable title.
            description: What the task accomplishes.
            instructions: Detailed instructions for Claude Code.
            **kwargs: Override any HiveTask field (priority, spark_reward, etc.)

        Returns:
            The newly created HiveTask (status='pending').
        """
        task_id = str(uuid.uuid4())

        # Auto-calculate Spark reward if not provided
        if 'spark_reward' not in kwargs:
            complexity = estimate_complexity(instructions)
            try:
                tt = HiveTaskType(task_type)
                lo, hi = _SPARK_RANGES.get(tt, (5, 50))
            except ValueError:
                lo, hi = 5, 50
            kwargs['spark_reward'] = max(lo, min(hi, complexity))

        task = HiveTask(
            task_id=task_id,
            task_type=task_type,
            title=title,
            description=description,
            instructions=instructions,
            created_at=time.time(),
            **kwargs,
        )

        with self._lock:
            self._tasks[task_id] = task
            self._stats['total_created'] += 1
            self._persist()

        logger.info(
            "Hive task created: [%s] %s (type=%s, spark=%d, priority=%d)",
            task_id[:8], title, task_type, task.spark_reward, task.priority,
        )
        return task
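
    # Worked example of the auto-reward clamp (illustrative, assuming a
    # brief whose estimate_complexity(...) comes out at 28): for
    # task_type='code_write' the range (10, 100) leaves spark_reward=28,
    # while 'documentation' with range (3, 25) clamps it to 25, via
    # max(lo, min(hi, complexity)).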

    # ── Dispatch ─────────────────────────────────────────────────────

    def dispatch_pending(self) -> int:
        """Find pending tasks and dispatch to available sessions.

        Called by the agent daemon on each tick. Matches tasks to the
        best available Claude Code session based on capabilities and
        trust level, then calls ``session.receive_task()``.

        Returns:
            Number of tasks successfully dispatched.
        """
        dispatched = 0
        pending = self.get_pending_tasks()
        if not pending:
            return 0

        for task in pending:
            session_id = self.match_session(task)
            if not session_id:
                continue

            # Deliver task to session
            delivered = self._deliver_to_session(session_id, task)
            if delivered:
                with self._lock:
                    task.status = HiveTaskStatus.ASSIGNED.value
                    task.assigned_session_id = session_id
                    self._persist()
                dispatched += 1
                logger.info(
                    "Dispatched task [%s] -> session [%s]",
                    task.task_id[:8], session_id[:8],
                )

        return dispatched

    def match_session(self, task: HiveTask) -> Optional[str]:
        """Find the best session for a task.

        Selection criteria (in priority order):
            1. Filter by task scope (own_repos vs public vs any)
            2. Filter by capabilities (language, framework match)
            3. Prefer sessions with higher quality scores
            4. Prefer sessions with lower latency (same region)

        Returns:
            session_id or None if no suitable session found.
        """
        try:
            from integrations.coding_agent.claude_hive_session import (
                get_session_registry,
            )
            registry = get_session_registry()
        except ImportError:
            logger.debug("claude_hive_session not available for matching")
            return None
        except Exception as exc:
            logger.debug("Session registry unavailable: %s", exc)
            return None

        if not hasattr(registry, 'get_available_sessions'):
            return None

        try:
            sessions = registry.get_available_sessions()
        except Exception:
            return None

        if not sessions:
            return None

        # Score each session
        best_id = None
        best_score = -1.0

        for session in sessions:
            sid = session.get('session_id', '')
            if not sid:
                continue

            score = 0.0

            # Capability match: language overlap
            session_langs = set(session.get('languages', []))
            task_langs = set()
            lang_map = {
                '.py': 'python', '.js': 'javascript', '.ts': 'typescript',
                '.rs': 'rust', '.go': 'go', '.java': 'java',
                '.c': 'c', '.cpp': 'cpp',
            }
            for fpath in task.files_scope:
                ext = os.path.splitext(fpath)[1].lower()
                if ext in lang_map:
                    task_langs.add(lang_map[ext])
            if not task_langs or task_langs & session_langs:
                score += 2.0  # Match or no constraint

            # Quality score
            quality = session.get('quality_score', 0.5)
            score += quality * 3.0

            # Latency / region preference
            session_region = session.get('region', '')
            task_region = task.origin_node_id[:3] if task.origin_node_id else ''
            if session_region and task_region and session_region == task_region:
                score += 1.0

            # Availability: prefer idle sessions
            if session.get('status') == 'idle':
                score += 1.5

            if score > best_score:
                best_score = score
                best_id = sid

        return best_id
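
    # Worked scoring example (illustrative): for a task scoped to foo.py, a
    # session {'languages': ['python'], 'quality_score': 0.8, 'status': 'idle'}
    # in a non-matching region scores 2.0 (language overlap) +
    # 0.8 * 3.0 (quality) + 1.5 (idle) = 5.9.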

    def _deliver_to_session(self, session_id: str, task: HiveTask) -> bool:
        """Send a task to a Claude Code session. Returns True on success."""
        try:
            from integrations.coding_agent.claude_hive_session import (
                get_session_registry,
            )
            registry = get_session_registry()
            session = registry.get_session(session_id)
            if session is None:
                return False

            # Apply shard filtering based on trust level
            task_payload = task.to_dict()
            session_trust = getattr(session, 'trust_level', 'PEER')
            if session_trust not in ('SAME_USER', 'FULL_FILE'):
                task_payload = self._apply_shard_filter(task, session_trust)

            return session.receive_task(task_payload)
        except ImportError:
            logger.debug("Cannot deliver: claude_hive_session not importable")
            return False
        except Exception as exc:
            logger.warning("Task delivery failed for [%s]: %s",
                           task.task_id[:8], exc)
            return False

    def _apply_shard_filter(self, task: HiveTask, trust_level: str) -> Dict:
        """Reduce task payload based on shard scope for untrusted sessions.

        Uses the ShardEngine to strip implementation details, leaving
        only interface-level information for untrusted peers.
        """
        payload = task.to_dict()
        try:
            from integrations.agent_engine.shard_engine import (
                ShardEngine, ShardScope,
            )
            scope_map = {
                'INTERFACES': ShardScope.INTERFACES,
                'SIGNATURES': ShardScope.SIGNATURES,
                'MINIMAL': ShardScope.MINIMAL,
            }
            scope = scope_map.get(task.shard_level, ShardScope.INTERFACES)
            engine = ShardEngine()
            shard = engine.create_shard(
                task=task.instructions,
                target_files=task.files_scope,
                scope=scope,
            )
            payload['shard'] = shard.to_dict()
            # Strip raw instructions for untrusted sessions
            if scope != ShardScope.FULL_FILE:
                payload.pop('instructions', None)
        except Exception as exc:
            logger.debug("Shard filtering failed, sending metadata only: %s", exc)
            payload.pop('instructions', None)
        return payload

    # ── Result handling ──────────────────────────────────────────────

    def on_task_result(self, task_id: str, result: Dict) -> Dict:
        """Called when a session reports a task result.

        Validates the result, calculates a quality score, awards Spark
        via the revenue aggregator, and updates task status.

        Args:
            task_id: UUID of the completed task.
            result: Dict with keys like files_changed, diff, tests_passed,
                test_output, error, etc.

        Returns:
            {spark_awarded: int, quality_score: float, validated: bool}
        """
        with self._lock:
            task = self._tasks.get(task_id)
            if not task:
                return {'spark_awarded': 0, 'quality_score': 0.0,
                        'validated': False, 'error': 'unknown_task'}

            quality = validate_result(task, result)
            task.result = result
            task.completed_at = time.time()

            # Quality gate: require >= 0.4 to count as completed
            validated = quality >= 0.4
            if validated:
                task.status = HiveTaskStatus.VALIDATED.value
            else:
                task.status = HiveTaskStatus.FAILED.value
                self._stats['total_failed'] += 1

            # Calculate Spark reward: base * quality
            spark_awarded = 0
            if validated:
                spark_awarded = max(1, int(task.spark_reward * quality))
                self._stats['total_completed'] += 1
                self._stats['total_spark_distributed'] += spark_awarded
                self._distribute_spark(task, spark_awarded)

            self._stats['quality_scores'].append(quality)
            # Keep rolling window of last 500 scores
            if len(self._stats['quality_scores']) > 500:
                self._stats['quality_scores'] = self._stats['quality_scores'][-500:]

            self._persist()

        logger.info(
            "Task result [%s]: quality=%.3f, spark=%d, validated=%s",
            task_id[:8], quality, spark_awarded, validated,
        )
        return {
            'spark_awarded': spark_awarded,
            'quality_score': quality,
            'validated': validated,
        }

    def _distribute_spark(self, task: HiveTask, spark_amount: int) -> None:
        """Award Spark to the session operator via revenue aggregator.

        Follows the 90/9/1 split model:
            90% to the compute contributor (session operator)
            9% to infrastructure pool
            1% to central
        """
        try:
            from integrations.agent_engine.revenue_aggregator import (
                REVENUE_SPLIT_USERS,
                REVENUE_SPLIT_INFRA,
                REVENUE_SPLIT_CENTRAL,
            )
            user_share = max(1, int(spark_amount * REVENUE_SPLIT_USERS))
            infra_share = int(spark_amount * REVENUE_SPLIT_INFRA)
            central_share = int(spark_amount * REVENUE_SPLIT_CENTRAL)

            logger.info(
                "Spark distribution for task [%s]: "
                "user=%d, infra=%d, central=%d (total=%d)",
                task.task_id[:8], user_share, infra_share, central_share,
                spark_amount,
            )
            # Actual wallet crediting would go through
            # ResonanceService.award_spark() when the social models are
            # available. Log-only for now.
        except ImportError:
            logger.debug("revenue_aggregator not available for Spark distribution")
        except Exception as exc:
            logger.warning("Spark distribution failed: %s", exc)
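
    # Worked split example (illustrative, assuming splits of 0.90/0.09/0.01):
    # spark_amount=40 gives user=int(40*0.90)=36, infra=int(40*0.09)=3,
    # central=int(40*0.01)=0. Integer truncation means the shares may sum
    # to less than the total (39 here).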

    # ── Query ────────────────────────────────────────────────────────

    def get_pending_tasks(self) -> List[HiveTask]:
        """List all pending tasks sorted by priority (highest first)."""
        with self._lock:
            pending = [
                t for t in self._tasks.values()
                if t.status == HiveTaskStatus.PENDING.value
            ]
            pending.sort(key=lambda t: (-t.priority, t.created_at))
            return pending

    def get_task(self, task_id: str) -> Optional[HiveTask]:
        """Retrieve a task by ID."""
        with self._lock:
            return self._tasks.get(task_id)

    def cancel_task(self, task_id: str) -> bool:
        """Cancel a pending or assigned task. Returns True if cancelled."""
        with self._lock:
            task = self._tasks.get(task_id)
            if not task:
                return False
            if task.status in (HiveTaskStatus.COMPLETED.value,
                               HiveTaskStatus.VALIDATED.value):
                return False  # Cannot cancel finished tasks
            task.status = HiveTaskStatus.CANCELLED.value
            self._persist()
        logger.info("Task [%s] cancelled", task_id[:8])
        return True

    def get_stats(self) -> Dict:
        """Dispatcher statistics.

        Returns:
            Dict with total_created, total_completed, total_failed,
            total_spark_distributed, avg_quality, pending_count,
            active_count, total_tasks.
        """
        with self._lock:
            scores = self._stats['quality_scores']
            avg_quality = (
                round(sum(scores) / len(scores), 3) if scores else 0.0
            )
            return {
                'total_created': self._stats['total_created'],
                'total_completed': self._stats['total_completed'],
                'total_failed': self._stats['total_failed'],
                'total_spark_distributed': self._stats['total_spark_distributed'],
                'avg_quality': avg_quality,
                'pending_count': sum(
                    1 for t in self._tasks.values()
                    if t.status == HiveTaskStatus.PENDING.value
                ),
                'active_count': sum(
                    1 for t in self._tasks.values()
                    if t.status in (HiveTaskStatus.ASSIGNED.value,
                                    HiveTaskStatus.IN_PROGRESS.value)
                ),
                'total_tasks': len(self._tasks),
            }


# ═══════════════════════════════════════════════════════════════════════
# Singleton
# ═══════════════════════════════════════════════════════════════════════

_dispatcher: Optional[HiveTaskDispatcher] = None
_dispatcher_lock = threading.Lock()


def get_dispatcher() -> HiveTaskDispatcher:
    """Get or create the HiveTaskDispatcher singleton."""
    global _dispatcher
    if _dispatcher is None:
        with _dispatcher_lock:
            if _dispatcher is None:
                _dispatcher = HiveTaskDispatcher()
    return _dispatcher


# ═══════════════════════════════════════════════════════════════════════
# Hive ledger reader (L5 acceptance contract)
# ═══════════════════════════════════════════════════════════════════════
#
# The Hive Test Ledger is a durable, append-only markdown file in the
# user's Claude Code project memory dir. Every Hive participant pulls
# open items from the same file so we benchmark Claude Code, Nunba's
# in-app coding agent, local Qwen, closed APIs, and HF models on the
# identical task list. This module is the canonical reader; the ledger
# spec lives inside the file itself (memory/project_hive_test_ledger.md).
#
# Parser assumes the ledger uses the standard 6-column markdown table
# (# | Timestamp | Verbatim ask | Success criterion | Acceptance test |
# Status) and that no cell contains an unescaped pipe — true by the
# ledger's own append-only contract.
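#
# Example row in that format (illustrative values; the timestamp format is
# whatever the ledger itself uses):
#
#   | 12 | 2026-05-01 | "Add retry to sync" | Retries logged on failure | pytest tests/test_sync.py | open |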

@dataclass(frozen=True)
class LedgerEntry:
    """One row of the Hive Test Ledger."""
    id: str
    timestamp: str
    ask: str
    success_criterion: str
    acceptance_test: str
    status: str


def _resolve_ledger_path(explicit: Optional[str] = None) -> Optional[str]:
    """Find the active project_hive_test_ledger.md.

    Resolution order:
        1. explicit arg (when caller knows the path)
        2. HIVE_LEDGER_PATH env var (cross-agent override)
        3. Most-recently-modified ~/.claude/projects/*/memory/
           project_hive_test_ledger.md (Claude-Code default location)

    Returns None if no candidate exists. Pure read-only — no side
    effects, no caching.
    """
    if explicit:
        return explicit if os.path.isfile(explicit) else None
    env = os.environ.get('HIVE_LEDGER_PATH')
    if env and os.path.isfile(env):
        return env
    base = os.path.join(os.path.expanduser('~'), '.claude', 'projects')
    if not os.path.isdir(base):
        return None
    candidates: List[tuple] = []
    try:
        projects = os.listdir(base)
    except OSError:
        return None
    for project in projects:
        path = os.path.join(
            base, project, 'memory', 'project_hive_test_ledger.md',
        )
        if os.path.isfile(path):
            try:
                candidates.append((os.path.getmtime(path), path))
            except OSError:
                continue
    if not candidates:
        return None
    candidates.sort(reverse=True)
    return candidates[0][1]


def _parse_ledger_row(line: str) -> Optional[LedgerEntry]:
    """Parse one markdown table row into a LedgerEntry, or None if the
    line is not a data row (header, separator, prose, blank).
    """
    line = line.strip()
    if not line.startswith('|'):
        return None
    if line.startswith('|---') or line.startswith('| ---'):
        return None
    if line.startswith('| #'):
        return None
    # Strip leading + trailing pipes, split on pipe.
    parts = [p.strip() for p in line.strip('|').split('|')]
    if len(parts) != 6:
        return None
    entry_id, ts, ask, ok, atest, status = parts
    if not entry_id or entry_id == '#':
        return None
    return LedgerEntry(
        id=entry_id,
        timestamp=ts,
        ask=ask.strip('"'),
        success_criterion=ok,
        acceptance_test=atest,
        status=status,
    )


def load_user_ledger(
    path: Optional[str] = None,
    include_closed: bool = False,
) -> List[LedgerEntry]:
    """Parse the Hive Test Ledger and return open items (default).

    Args:
        path: explicit ledger file location. If None, resolves via
            HIVE_LEDGER_PATH env or auto-discovery under
            ~/.claude/projects/*/memory/.
        include_closed: if True, include every parsed row regardless of
            status (e.g. `done_by_*`, `claimed_by_*`); by default only
            rows with status == 'open' are returned.

    Returns:
        Ordered list of LedgerEntry in file order. Empty list if the
        ledger does not exist or contains no parseable rows. Never
        raises — read failures degrade to a warning + empty list so
        agents that depend on this never crash on a missing memory
        directory.
    """
    resolved = _resolve_ledger_path(path)
    if resolved is None:
        logger.info('hive_task_protocol.load_user_ledger: no ledger found')
        return []
    try:
        with open(resolved, 'r', encoding='utf-8') as fh:
            lines = fh.readlines()
    except OSError as e:
        logger.warning(
            'hive_task_protocol.load_user_ledger: read failed %s: %s',
            resolved, e,
        )
        return []
    out: List[LedgerEntry] = []
    for line in lines:
        entry = _parse_ledger_row(line)
        if entry is None:
            continue
        if include_closed or entry.status == 'open':
            out.append(entry)
    return out
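

# Usage sketch (read-only; degrades to an empty list when no ledger exists):
#
#     open_items = load_user_ledger()
#     everything = load_user_ledger(include_closed=True)
#     for entry in open_items:
#         print(entry.id, entry.ask, entry.acceptance_test)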