# integrations/coding_agent/claude_hive_session.py
1"""
2Claude Code Hive Session — Connect Claude Code to the HART OS hive.
4When a user opts in, their Claude Code session becomes a hive worker:
5 - Receives coding tasks from the hive's task distributor
6 - Executes them using Claude Code's native capabilities
7 - Returns results through PeerLink
8 - Earns Spark tokens for contributions
10Protocol:
11 1. User runs: hart hive connect (or POST /api/hive/session/connect)
12 2. Session registers with PeerLink as a CODING_AGENT peer
13 3. Hive dispatcher sees this node as available for coding tasks
14 4. Tasks arrive via instruction queue (privacy-filtered through shard engine)
15 5. Claude Code executes: read files, edit code, run tests
16 6. Results published back via PeerLink + EventBus
17 7. Spark tokens awarded based on task complexity and quality
19Security:
20 - Shard engine filters what code is shared (INTERFACES level for untrusted peers)
21 - User can set scope: own repos only, public repos, or any hive task
22 - All code changes require user approval before commit
23 - Master key verification on task origin (no rogue dispatchers)
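
Example (illustrative only; assumes a running HART OS runtime):

    from integrations.coding_agent.claude_hive_session import get_hive_session

    session = get_hive_session()
    info = session.connect(user_id='u_123', task_scope='own_repos')
    # Tasks now arrive via EventBus dispatch / receive_task(); results are
    # reported automatically. When done:
    session.disconnect()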
24"""
26import hashlib
27import json
28import logging
29import os
30import threading
31import time
32import uuid
33from typing import Any, Dict, List, Optional

logger = logging.getLogger('hevolve.hive_session')

# ─── Constants ───────────────────────────────────────────────────────

TASK_SCOPES = {'own_repos', 'public', 'any'}

SESSION_CAPABILITIES_DEFAULT = {
    'languages': ['python', 'javascript', 'typescript', 'rust', 'go'],
    'frameworks': ['flask', 'fastapi', 'react', 'next.js'],
    'can_run_tests': True,
    'can_edit_files': True,
    'can_read_files': True,
    'can_run_commands': True,
    'max_file_size_kb': 500,
    'max_task_duration_minutes': 30,
}

PEER_TYPE = 'CODING_AGENT'

# Session status values
STATUS_DISCONNECTED = 'disconnected'
STATUS_CONNECTING = 'connecting'
STATUS_IDLE = 'idle'
STATUS_WORKING = 'working'
STATUS_PAUSED = 'paused'

_VALID_STATUSES = {
    STATUS_DISCONNECTED, STATUS_CONNECTING, STATUS_IDLE,
    STATUS_WORKING, STATUS_PAUSED,
}

# EventBus topics for hive task dispatch and session lifecycle
EVENT_TASK_DISPATCHED = 'hive.task.dispatched'
EVENT_TASK_COMPLETED = 'hive.task.completed'
EVENT_SESSION_CONNECTED = 'hive.session.connected'
EVENT_SESSION_DISCONNECTED = 'hive.session.disconnected'

# Spark reward constants (per-task base, scaled by complexity)
SPARK_BASE_REWARD = 10
SPARK_COMPLEXITY_MULTIPLIER = 5
SPARK_QUALITY_BONUS_THRESHOLD = 0.8  # quality score above this earns a bonus
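
# Worked example (illustrative): complexity_score=4 and quality_score=0.85
# yield int((10 + 4 * 5) * 1.5) = 45 Spark; below the 0.8 quality threshold
# the same task earns 10 + 4 * 5 = 30 Spark (see _calculate_spark_reward).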

# Max pending tasks before rejecting new ones
MAX_PENDING_TASKS = 20


class ClaudeHiveSession:
    """Manages a Claude Code session's connection to the HART OS hive.

    Thread-safe. All public methods acquire self._lock before mutating state.
    """

    def __init__(self):
        self.session_id: str = ''
        self.user_id: str = ''
        self.status: str = STATUS_DISCONNECTED
        self.capabilities: Dict[str, Any] = {}
        self.task_scope: str = 'own_repos'
        self.current_task: Optional[Dict] = None
        self.stats: Dict[str, Any] = {
            'tasks_completed': 0,
            'tasks_failed': 0,
            'spark_earned': 0,
            'avg_quality_score': 0.0,
            'total_quality_points': 0.0,
            'connected_since': None,
        }

        self._peer_link_id: Optional[str] = None
        self._instruction_queue = None  # Lazy: InstructionQueue from instruction_queue.py
        self._lock = threading.Lock()
        self._event_subscriptions: List = []
        self._pending_tasks: List[Dict] = []
        self._completed_tasks: List[Dict] = []
        self._max_completed_history = 100

    # ─── Connection Lifecycle ────────────────────────────────────────

    def connect(self, user_id: str, capabilities: Optional[Dict] = None,
                task_scope: str = 'own_repos') -> Dict:
        """Register with PeerLink as a CODING_AGENT worker node.

        Subscribes to task dispatch events on EventBus so the hive
        dispatcher can route coding tasks to this session.

        Args:
            user_id: HART OS user ID
            capabilities: What this session can do (languages, etc.)
            task_scope: 'own_repos', 'public', or 'any'

        Returns:
            Session info dict with session_id, status, peer_link_id
        """
        if task_scope not in TASK_SCOPES:
            return {
                'success': False,
                'error': f'Invalid task_scope: {task_scope}. Must be one of {TASK_SCOPES}',
            }

        with self._lock:
            if self.status != STATUS_DISCONNECTED:
                return {
                    'success': False,
                    'error': f'Already connected (status={self.status})',
                    'session_id': self.session_id,
                }

            self.status = STATUS_CONNECTING
            self.user_id = user_id
            self.session_id = f'chs_{uuid.uuid4().hex[:12]}'
            self.capabilities = capabilities or dict(SESSION_CAPABILITIES_DEFAULT)
            self.task_scope = task_scope
            self.stats['connected_since'] = time.time()

        # Register with PeerLink
        peer_link_id = self._register_peer_link()

        # Subscribe to EventBus for task dispatch
        self._subscribe_events()

        # Initialize local instruction queue
        self._init_instruction_queue()

        with self._lock:
            self._peer_link_id = peer_link_id
            self.status = STATUS_IDLE

        # Emit connection event
        self._emit_event(EVENT_SESSION_CONNECTED, {
            'session_id': self.session_id,
            'user_id': self.user_id,
            'capabilities': self.capabilities,
            'task_scope': self.task_scope,
            'peer_type': PEER_TYPE,
        })

        logger.info(
            "Hive session connected: session=%s user=%s scope=%s peer=%s",
            self.session_id, self.user_id, self.task_scope,
            peer_link_id or 'local-only',
        )

        return {
            'success': True,
            'session_id': self.session_id,
            'user_id': self.user_id,
            'status': STATUS_IDLE,
            'peer_link_id': peer_link_id,
            'capabilities': self.capabilities,
            'task_scope': self.task_scope,
        }

    def disconnect(self) -> Dict:
        """Unregister from PeerLink, unsubscribe events, flush tasks.

        Returns:
            Summary of session including final stats
        """
        with self._lock:
            if self.status == STATUS_DISCONNECTED:
                return {'success': True, 'message': 'Already disconnected'}

            prev_status = self.status
            self.status = STATUS_DISCONNECTED
            session_id = self.session_id
            final_stats = dict(self.stats)
            pending_count = len(self._pending_tasks)

        # Unsubscribe from EventBus
        self._unsubscribe_events()

        # Unregister from PeerLink
        self._unregister_peer_link()

        # Flush pending tasks back to hive queue
        if pending_count > 0:
            self._flush_pending_tasks()

        # Emit disconnection event
        self._emit_event(EVENT_SESSION_DISCONNECTED, {
            'session_id': session_id,
            'user_id': self.user_id,
            'final_stats': final_stats,
        })

        with self._lock:
            self._peer_link_id = None
            self.current_task = None
            self.session_id = ''

        logger.info(
            "Hive session disconnected: session=%s stats=%s flushed=%d",
            session_id, final_stats, pending_count,
        )

        return {
            'success': True,
            'session_id': session_id,
            'previous_status': prev_status,
            'stats': final_stats,
            'flushed_tasks': pending_count,
        }

    # ─── Task Reception ──────────────────────────────────────────────

    def receive_task(self, task: Dict) -> bool:
        """Accept a coding task from the hive dispatcher.

        Validates:
          1. Session is idle or has room in pending queue
          2. Task origin signature (master key verification)
          3. Task scope matches session scope setting
          4. Privacy filtering via shard engine

        Args:
            task: Dict with keys: task_id, description, origin_signature,
                scope_level, target_files, test_expectations, etc.

        Returns:
            True if task was accepted and queued
        """
        task_id = task.get('task_id', 'unknown')

        with self._lock:
            if self.status not in (STATUS_IDLE, STATUS_WORKING):
                logger.info("Task %s rejected: session status=%s", task_id, self.status)
                return False

            if len(self._pending_tasks) >= MAX_PENDING_TASKS:
                logger.info("Task %s rejected: pending queue full (%d)",
                            task_id, MAX_PENDING_TASKS)
                return False

        # Validate task origin (master key signature)
        if not self._verify_task_origin(task):
            logger.warning("Task %s rejected: invalid origin signature", task_id)
            return False

        # Check scope compatibility
        if not self._check_scope_match(task):
            logger.info("Task %s rejected: scope mismatch (session=%s)",
                        task_id, self.task_scope)
            return False

        # Privacy filter through shard engine
        filtered_task = self._apply_shard_filter(task)
        if filtered_task is None:
            logger.warning("Task %s rejected: shard filter denied", task_id)
            return False

        # Queue the task
        with self._lock:
            filtered_task['received_at'] = time.time()
            filtered_task['status'] = 'pending'
            self._pending_tasks.append(filtered_task)
            pending_count = len(self._pending_tasks)

        # Also add to the instruction queue for dependency ordering
        self._enqueue_instruction(filtered_task)

        logger.info("Task %s accepted (pending=%d)", task_id, pending_count)

        # If the session is idle, immediately start working on the task
        with self._lock:
            is_idle = (self.status == STATUS_IDLE and self.current_task is None)
        if is_idle:
            self._execute_next_task()

        return True

    def _execute_next_task(self):
        """Pick the highest-priority pending task and execute it.

        Called after receive_task() when idle, or by the daemon tick.
        Runs synchronously -- the caller should invoke it from a background
        thread if non-blocking execution is required.
        """
        with self._lock:
            if self.status != STATUS_IDLE:
                return  # Already working, paused, or disconnected
            if not self._pending_tasks:
                return

            # Sort by priority (descending), then by received_at (ascending)
            self._pending_tasks.sort(
                key=lambda t: (-t.get('priority', 0), t.get('received_at', 0))
            )
            task = self._pending_tasks[0]

        task_id = task.get('task_id', 'unknown')
        logger.info("Executing next task: %s", task_id)

        # Execute the task through the full pipeline
        result = self._execute_task_steps(task)

        # Report the result back to the hive
        self._report_result(task_id, result)

    def _execute_task_steps(self, task: Dict) -> Dict:
        """Full task execution pipeline:

        1. Read instructions from task payload / shard
        2. Apply shard filter for privacy
        3. Execute via instruction queue + CREATE/REUSE pipeline
        4. Collect and return results

        Args:
            task: Filtered task dict from the pending queue.

        Returns:
            Result dict compatible with execute_task() output.
        """
        task_id = task.get('task_id', 'unknown')
        start_time = time.time()

        with self._lock:
            self.status = STATUS_WORKING
            self.current_task = task

        result = {
            'task_id': task_id,
            'status': 'pending',
            'changes': [],
            'test_results': None,
            'error': None,
            'duration_s': 0.0,
            'complexity_score': 0,
        }

        try:
            # Step 1: Read instructions (may come from shard or full payload)
            description = task.get('description', '')
            instructions = task.get('instructions', description)
            target_files = task.get('target_files', task.get('files_scope', []))

            # Step 2: If a shard was provided, use the shard content
            shard = task.get('shard')
            if shard and isinstance(shard, dict):
                # Merge shard interface info into the instructions
                shard_content = shard.get('full_content', '')
                if shard_content:
                    instructions = (
                        f"{instructions}\n\n--- Shard context ---\n{shard_content}"
                    )

            # Step 3: Execute via instruction queue + pipeline
            plan = self._build_execution_plan(instructions, target_files)
            execution_result = self._dispatch_to_pipeline(
                instructions, target_files, plan)

            if execution_result.get('error'):
                result['status'] = 'error'
                result['error'] = execution_result['error']
            else:
                result['status'] = 'completed'
                result['changes'] = execution_result.get('changes', [])
                result['test_results'] = execution_result.get('test_results')

            # Step 4: Score complexity
            result['complexity_score'] = self._score_complexity(
                target_files, result['changes'])

        except Exception as e:
            logger.error("Task %s execution steps error: %s", task_id, e)
            result['status'] = 'error'
            result['error'] = str(e)

        result['duration_s'] = round(time.time() - start_time, 2)

        # Restore session state and drop the task from the pending queue
        with self._lock:
            self.current_task = None
            self.status = STATUS_IDLE
            self._pending_tasks = [
                t for t in self._pending_tasks
                if t.get('task_id') != task_id
            ]

        return result

    def _report_result(self, task_id: str, result: Dict):
        """Report task result back to the hive.

        1. Calls report_result() for local stats + PeerLink + EventBus
        2. Notifies HiveTaskDispatcher.on_task_result() for Spark award
        3. Emits EVENT_TASK_COMPLETED
        """
        # Update local stats and publish via PeerLink/EventBus
        self.report_result(task_id, result)

        # Notify the HiveTaskDispatcher for validation + Spark distribution
        try:
            from integrations.coding_agent.hive_task_protocol import get_dispatcher
            dispatcher = get_dispatcher()

            # Build the result dict expected by the dispatcher
            dispatcher_result = {
                'files_changed': [c.get('file') for c in result.get('changes', [])],
                'diff': '\n'.join(
                    c.get('diff', '') for c in result.get('changes', [])),
                'tests_passed': (
                    'passed' in (result.get('test_results') or '').lower()
                    if result.get('test_results') else None
                ),
                'test_output': result.get('test_results'),
                'error': result.get('error'),
            }

            reward_info = dispatcher.on_task_result(task_id, dispatcher_result)
            if reward_info.get('spark_awarded', 0) > 0:
                logger.info(
                    "Task %s earned %d Spark (quality=%.3f)",
                    task_id, reward_info['spark_awarded'],
                    reward_info.get('quality_score', 0.0),
                )
        except ImportError:
            logger.debug("HiveTaskDispatcher not available for result reporting")
        except Exception as e:
            logger.debug("Dispatcher result notification failed: %s", e)

    # ─── Task Execution ──────────────────────────────────────────────

    def execute_task(self, task: Dict) -> Dict:
        """Execute a coding task using Claude Code capabilities.

        This is the bridge between hive dispatch and Claude Code's
        native ability to read files, edit code, and run tests.

        Does NOT auto-commit -- returns proposed changes for user approval.

        Args:
            task: Filtered task dict with description, target_files, etc.

        Returns:
            Result dict: {task_id, status, changes, test_results, error,
            duration_s, complexity_score}
        """
        task_id = task.get('task_id', 'unknown')
        start_time = time.time()

        with self._lock:
            if self.status == STATUS_PAUSED:
                return {
                    'task_id': task_id,
                    'status': 'rejected',
                    'error': 'Session is paused',
                    'changes': [],
                    'test_results': None,
                }

            self.status = STATUS_WORKING
            self.current_task = task

        result = {
            'task_id': task_id,
            'status': 'pending',
            'changes': [],
            'test_results': None,
            'error': None,
            'duration_s': 0.0,
            'complexity_score': 0,
        }

        try:
            # 1. Parse task instructions
            description = task.get('description', '')
            target_files = task.get('target_files', [])
            test_expectations = task.get('test_expectations', [])

            # 2. Build execution plan
            plan = self._build_execution_plan(description, target_files)

            # 3. Dispatch through the CREATE/REUSE pipeline
            execution_result = self._dispatch_to_pipeline(
                description, target_files, plan)

            if execution_result.get('error'):
                result['status'] = 'error'
                result['error'] = execution_result['error']
            else:
                result['status'] = 'completed'
                result['changes'] = execution_result.get('changes', [])
                result['test_results'] = execution_result.get('test_results')

            # 4. Score complexity
            result['complexity_score'] = self._score_complexity(
                target_files, result['changes'])

        except Exception as e:
            logger.error("Task %s execution error: %s", task_id, e)
            result['status'] = 'error'
            result['error'] = str(e)

        result['duration_s'] = round(time.time() - start_time, 2)

        # Update session state and drop the task from the pending queue
        with self._lock:
            self.current_task = None
            self.status = STATUS_IDLE
            self._pending_tasks = [
                t for t in self._pending_tasks
                if t.get('task_id') != task_id
            ]

        return result

    def report_result(self, task_id: str, result: Dict) -> bool:
        """Publish task result back to the hive via PeerLink + EventBus.

        Includes quality metrics and triggers Spark reward calculation.

        Args:
            task_id: The completed task's ID
            result: Output from execute_task()

        Returns:
            True if result was successfully published
        """
        with self._lock:
            session_id = self.session_id
            user_id = self.user_id
            if not session_id:
                return False

        quality_score = self._compute_quality_score(result)
        spark_reward = self._calculate_spark_reward(result, quality_score)

        report = {
            'session_id': session_id,
            'user_id': user_id,
            'task_id': task_id,
            'status': result.get('status', 'unknown'),
            'changes_count': len(result.get('changes', [])),
            'test_results': result.get('test_results'),
            'error': result.get('error'),
            'duration_s': result.get('duration_s', 0),
            'quality_score': quality_score,
            'spark_reward': spark_reward,
            'complexity_score': result.get('complexity_score', 0),
            'reported_at': time.time(),
        }

        # Update local stats
        with self._lock:
            if result.get('status') == 'completed':
                self.stats['tasks_completed'] += 1
                total = self.stats['tasks_completed']
                old_total_q = self.stats['total_quality_points']
                new_total_q = old_total_q + quality_score
                self.stats['total_quality_points'] = new_total_q
                self.stats['avg_quality_score'] = (
                    round(new_total_q / total, 3) if total > 0 else 0.0
                )
            else:
                self.stats['tasks_failed'] += 1
            self.stats['spark_earned'] += spark_reward

            # Add to completed history (capped)
            self._completed_tasks.append(report)
            if len(self._completed_tasks) > self._max_completed_history:
                self._completed_tasks = self._completed_tasks[
                    -self._max_completed_history:]

        # Publish via PeerLink
        published = self._publish_via_peer_link('dispatch', {
            'type': 'task_result',
            'payload': report,
        })

        # Emit EventBus event
        self._emit_event(EVENT_TASK_COMPLETED, report)

        # Record Spark reward
        self._record_spark_reward(user_id, task_id, spark_reward)

        logger.info(
            "Task %s result reported: status=%s quality=%.2f spark=%d",
            task_id, result.get('status'), quality_score, spark_reward,
        )
        # The EventBus event is emitted even when PeerLink is unavailable,
        # so reporting is considered successful regardless of `published`.
        return True

    # ─── Status & Configuration ──────────────────────────────────────

    def get_status(self) -> Dict:
        """Return current session state, stats, and active task info."""
        with self._lock:
            return {
                'session_id': self.session_id,
                'user_id': self.user_id,
                'status': self.status,
                'task_scope': self.task_scope,
                'capabilities': self.capabilities,
                'current_task': (
                    {'task_id': self.current_task.get('task_id'),
                     'description': self.current_task.get('description', '')[:200]}
                    if self.current_task else None
                ),
                'pending_tasks': len(self._pending_tasks),
                'completed_tasks': len(self._completed_tasks),
                'stats': dict(self.stats),
                'peer_link_id': self._peer_link_id,
            }

    def set_task_scope(self, scope: str) -> Dict:
        """Update what tasks this session accepts.

        Args:
            scope: 'own_repos', 'public', or 'any'

        Returns:
            Result dict with success flag
        """
        if scope not in TASK_SCOPES:
            return {
                'success': False,
                'error': f'Invalid scope: {scope}. Must be one of {TASK_SCOPES}',
            }
        with self._lock:
            old_scope = self.task_scope
            self.task_scope = scope
        logger.info("Task scope changed: %s -> %s", old_scope, scope)
        return {'success': True, 'previous_scope': old_scope, 'scope': scope}

    def pause(self) -> Dict:
        """Temporarily stop accepting new tasks.

        Current task continues if one is in progress.
        """
        with self._lock:
            if self.status == STATUS_DISCONNECTED:
                return {'success': False, 'error': 'Not connected'}
            if self.status == STATUS_PAUSED:
                return {'success': True, 'message': 'Already paused'}
            prev = self.status
            self.status = STATUS_PAUSED
        logger.info("Hive session paused (was %s)", prev)
        return {'success': True, 'previous_status': prev, 'status': STATUS_PAUSED}

    def resume(self) -> Dict:
        """Resume accepting tasks after a pause."""
        with self._lock:
            if self.status != STATUS_PAUSED:
                return {
                    'success': False,
                    'error': f'Not paused (status={self.status})',
                }
            self.status = STATUS_IDLE
        logger.info("Hive session resumed")
        return {'success': True, 'status': STATUS_IDLE}

    def get_tasks(self) -> Dict:
        """List pending and completed tasks."""
        with self._lock:
            pending = [
                {'task_id': t.get('task_id'),
                 'description': t.get('description', '')[:200],
                 'received_at': t.get('received_at')}
                for t in self._pending_tasks
            ]
            completed = [
                {'task_id': t.get('task_id'), 'status': t.get('status'),
                 'quality_score': t.get('quality_score'),
                 'spark_reward': t.get('spark_reward'),
                 'reported_at': t.get('reported_at')}
                for t in self._completed_tasks[-50:]  # Last 50
            ]
        return {'pending': pending, 'completed': completed}

    # ─── Internal: PeerLink Integration ──────────────────────────────

    def _register_peer_link(self) -> Optional[str]:
        """Register this session as a CODING_AGENT peer in PeerLinkManager."""
        try:
            from core.peer_link.link_manager import get_link_manager
            manager = get_link_manager()
            if not manager:
                return None

            # Generate a peer ID for this session
            node_id = os.environ.get('HEVOLVE_NODE_ID', '')
            if not node_id:
                node_id = hashlib.sha256(
                    f"{self.user_id}:{self.session_id}".encode()
                ).hexdigest()[:16]

            peer_id = f"{PEER_TYPE}_{node_id}_{self.session_id}"

            # Advertise capabilities so the hive dispatcher can match tasks
            manager.broadcast('dispatch', {
                'type': 'peer_announce',
                'peer_type': PEER_TYPE,
                'peer_id': peer_id,
                'user_id': self.user_id,
                'session_id': self.session_id,
                'capabilities': self.capabilities,
                'task_scope': self.task_scope,
            })

            return peer_id
        except ImportError:
            logger.debug("PeerLink not available, running in local-only mode")
            return None
        except Exception as e:
            logger.warning("PeerLink registration failed: %s", e)
            return None

    def _unregister_peer_link(self):
        """Unregister from PeerLink on disconnect."""
        try:
            from core.peer_link.link_manager import get_link_manager
            manager = get_link_manager()
            if not manager:
                return

            manager.broadcast('dispatch', {
                'type': 'peer_depart',
                'peer_type': PEER_TYPE,
                'peer_id': self._peer_link_id or '',
                'user_id': self.user_id,
                'session_id': self.session_id,
            })
        except Exception:
            pass

    def _publish_via_peer_link(self, channel: str, data: Dict) -> bool:
        """Publish data via PeerLink broadcast."""
        try:
            from core.peer_link.link_manager import get_link_manager
            manager = get_link_manager()
            if manager:
                sent = manager.broadcast(channel, data)
                return sent > 0
        except Exception as e:
            logger.debug("PeerLink publish failed: %s", e)
        return False

    # ─── Internal: EventBus Integration ──────────────────────────────

    def _subscribe_events(self):
        """Subscribe to hive task dispatch events on the platform EventBus."""
        try:
            from core.platform.registry import get_registry
            registry = get_registry()
            if not registry or not registry.has('events'):
                return
            bus = registry.get('events')

            def on_task_dispatched(topic, data):
                """Handle incoming task from the hive dispatcher."""
                if not isinstance(data, dict):
                    return
                # Only accept tasks targeted at us or broadcast
                target = data.get('target_session')
                if target and target != self.session_id:
                    return
                self.receive_task(data)

            bus.on(EVENT_TASK_DISPATCHED, on_task_dispatched)
            self._event_subscriptions.append(
                (EVENT_TASK_DISPATCHED, on_task_dispatched))
        except Exception as e:
            logger.debug("EventBus subscription failed: %s", e)

    def _unsubscribe_events(self):
        """Unsubscribe all EventBus listeners."""
        try:
            from core.platform.registry import get_registry
            registry = get_registry()
            if not registry or not registry.has('events'):
                return
            bus = registry.get('events')
            for topic, callback in self._event_subscriptions:
                bus.off(topic, callback)
        except Exception:
            pass
        self._event_subscriptions.clear()

    def _emit_event(self, topic: str, data: Any):
        """Emit an event on the platform EventBus (best-effort)."""
        try:
            from core.platform.events import emit_event
            emit_event(topic, data)
        except Exception:
            pass

    # ─── Internal: Instruction Queue ─────────────────────────────────

    def _init_instruction_queue(self):
        """Initialize the local instruction queue for incoming tasks."""
        try:
            from integrations.agent_engine.instruction_queue import get_queue
            self._instruction_queue = get_queue(self.user_id)
        except Exception as e:
            logger.debug("Instruction queue init failed: %s", e)
            self._instruction_queue = None

    def _enqueue_instruction(self, task: Dict):
        """Add a task to the instruction queue for dependency ordering."""
        try:
            from integrations.agent_engine.instruction_queue import enqueue_instruction
            enqueue_instruction(
                user_id=self.user_id,
                text=task.get('description', '')[:2000],
                priority=task.get('priority', 5),
                tags=['hive_task', PEER_TYPE],
                context={
                    'task_id': task.get('task_id'),
                    'source': 'hive_session',
                    'session_id': self.session_id,
                    'target_files': task.get('target_files', []),
                },
            )
        except Exception as e:
            logger.debug("Instruction enqueue failed: %s", e)

    # ─── Internal: Security ──────────────────────────────────────────

    def _verify_task_origin(self, task: Dict) -> bool:
        """Verify that a task was dispatched by a legitimate hive node.

        Uses master key signature verification to prevent rogue dispatchers.
        Tasks without a signature are accepted only from SAME_USER trust level.
        """
        signature = task.get('origin_signature', '')
        if not signature:
            # Allow unsigned tasks only from SAME_USER trust (own devices)
            trust = task.get('trust_level', '')
            if trust == 'same_user':
                return True
            logger.debug("Task %s has no signature and trust=%s",
                         task.get('task_id'), trust)
            return False

        try:
            from security.master_key import verify_master_signature
            # Build the payload to verify (exclude the signature field itself)
            payload = {k: v for k, v in task.items()
                       if k != 'origin_signature'}
            return verify_master_signature(payload, signature)
        except ImportError:
            logger.warning("master_key module unavailable — rejecting task (cannot verify)")
            return False
        except Exception as e:
            logger.warning("Task origin verification failed: %s", e)
            return False

    def _check_scope_match(self, task: Dict) -> bool:
        """Check whether a task matches this session's scope setting."""
        task_scope_level = task.get('scope_level', 'any')

        if self.task_scope == 'any':
            return True
        if self.task_scope == 'public':
            return task_scope_level in ('public', 'own_repos')
        if self.task_scope == 'own_repos':
            # Only accept tasks for repos owned by this user
            task_owner = task.get('repo_owner', '')
            return task_owner == self.user_id or task_scope_level == 'own_repos'
        return False

    def _apply_shard_filter(self, task: Dict) -> Optional[Dict]:
        """Filter task content through the shard engine based on trust level.

        - SAME_USER trust: FULL_FILE scope (see everything)
        - PEER trust: INTERFACES scope (signatures + types only)
        - RELAY/unknown: SIGNATURES scope (minimal)
        """
        trust = task.get('trust_level', 'relay')

        try:
            from integrations.agent_engine.shard_engine import (
                ShardScope, get_shard_engine,
            )

            if trust == 'same_user':
                scope = ShardScope.FULL_FILE
            elif trust == 'peer':
                scope = ShardScope.INTERFACES
            else:
                scope = ShardScope.SIGNATURES

            # If the task has raw file content that must be filtered down,
            # run it through the shard engine
            if task.get('full_content') and scope != ShardScope.FULL_FILE:
                engine = get_shard_engine()
                shard = engine.create_shard(
                    task=task.get('description', ''),
                    target_files=task.get('target_files', []),
                    scope=scope,
                )
                # Replace full content with the filtered view
                filtered = dict(task)
                filtered['full_content'] = shard.full_content
                filtered['interface_specs'] = [
                    s.file_path for s in shard.interface_specs
                ]
                filtered['shard_scope'] = scope.value
                return filtered

        except ImportError:
            logger.debug("Shard engine not available, passing task as-is")
        except Exception as e:
            logger.warning("Shard filter error: %s", e)

        # Pass through unmodified (no shard engine or no filtering needed)
        return dict(task)

    # ─── Internal: Task Execution ────────────────────────────────────

    def _build_execution_plan(self, description: str,
                              target_files: List[str]) -> Dict:
        """Build an execution plan from the task description.

        Returns a structured plan dict that the pipeline can execute.
        """
        plan = {
            'description': description,
            'steps': [],
        }

        # Determine steps from the description
        if target_files:
            plan['steps'].append({
                'action': 'read_files',
                'files': target_files,
            })

        plan['steps'].append({
            'action': 'execute_instructions',
            'description': description,
        })

        if target_files:
            plan['steps'].append({
                'action': 'run_tests',
                'scope': 'affected',
            })

        return plan
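
    # Illustrative plan shape for description='Add retry logic' and
    # target_files=['client.py'] (mirrors the steps built above):
    #   {'description': 'Add retry logic',
    #    'steps': [{'action': 'read_files', 'files': ['client.py']},
    #              {'action': 'execute_instructions',
    #               'description': 'Add retry logic'},
    #              {'action': 'run_tests', 'scope': 'affected'}]}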

    def _dispatch_to_pipeline(self, description: str,
                              target_files: List[str],
                              plan: Dict) -> Dict:
        """Dispatch the task through the CREATE/REUSE pipeline.

        Uses dispatch_goal() (3-tier: in-process, HTTP, llama.cpp fallback)
        to execute the coding task through the standard agent pipeline.
        """
        result = {
            'changes': [],
            'test_results': None,
            'error': None,
        }

        prompt = self._build_task_prompt(description, target_files)

        try:
            from integrations.agent_engine.dispatch import dispatch_goal

            # Generate a deterministic goal_id from the task content
            goal_id = hashlib.sha256(
                f"hive:{self.session_id}:{description[:200]}".encode()
            ).hexdigest()[:16]

            response = dispatch_goal(
                prompt=prompt,
                user_id=self.user_id,
                goal_id=goal_id,
                goal_type='coding',
            )

            if response:
                result['changes'] = self._parse_changes_from_response(response)
                result['test_results'] = self._extract_test_results(response)
            else:
                result['error'] = 'Pipeline returned no response'

        except Exception as e:
            result['error'] = f'Pipeline dispatch failed: {e}'

        return result

    def _build_task_prompt(self, description: str,
                           target_files: List[str]) -> str:
        """Build a structured prompt for the CREATE/REUSE pipeline."""
        parts = [
            f"HIVE CODING TASK (session {self.session_id}):",
            f"\n{description}",
        ]
        if target_files:
            parts.append(f"\nTarget files: {', '.join(target_files)}")
        parts.append("\nProvide changes as file diffs. Do NOT commit.")
        return '\n'.join(parts)

    def _parse_changes_from_response(self, response: str) -> List[Dict]:
        """Extract file changes from the pipeline response."""
        changes = []
        # Look for unified-diff-like patterns in the response
        current_file = None
        current_diff_lines = []
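
        # Expected input is unified-diff-style text (illustrative sample):
        #   --- a/app/models.py
        #   +++ b/app/models.py
        #   @@ -10,6 +10,7 @@
        #    class User:
        #   +    email: str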
        for line in response.split('\n'):
            if line.startswith('--- a/') or line.startswith('+++ b/'):
                fname = line[6:].strip()
                if line.startswith('+++ b/') and fname:
                    # Flush the previous file's hunks before switching files
                    if current_file and current_diff_lines:
                        changes.append({
                            'file': current_file,
                            'diff': '\n'.join(current_diff_lines),
                        })
                        current_diff_lines = []
                    current_file = fname
            elif line.startswith('@@') and current_file:
                if current_diff_lines:
                    changes.append({
                        'file': current_file,
                        'diff': '\n'.join(current_diff_lines),
                    })
                current_diff_lines = [line]
            elif current_file and (line.startswith('+') or line.startswith('-')
                                   or line.startswith(' ')):
                current_diff_lines.append(line)

        # Flush the last accumulated change
        if current_file and current_diff_lines:
            changes.append({
                'file': current_file,
                'diff': '\n'.join(current_diff_lines),
            })

        return changes

    def _extract_test_results(self, response: str) -> Optional[str]:
        """Extract test results from the pipeline response, if any."""
        # Look for common test output patterns
        markers = ['PASSED', 'FAILED', 'ERROR', 'tests passed',
                   'test session', 'pytest']
        for marker in markers:
            idx = response.lower().find(marker.lower())
            if idx >= 0:
                # Return surrounding context
                start = max(0, idx - 200)
                end = min(len(response), idx + 500)
                return response[start:end]
        return None

    # ─── Internal: Quality & Rewards ─────────────────────────────────

    def _compute_quality_score(self, result: Dict) -> float:
        """Compute a quality score for a completed task (0.0 to 1.0)."""
        if result.get('status') != 'completed':
            return 0.0

        score = 0.5  # Base score for completion

        # Bonus for having changes
        changes = result.get('changes', [])
        if changes:
            score += 0.2

        # Bonus for passing tests
        test_results = result.get('test_results', '')
        if test_results:
            lower = test_results.lower()
            if 'passed' in lower and 'failed' not in lower:
                score += 0.3
            elif 'passed' in lower:
                score += 0.1

        return min(1.0, round(score, 3))

    def _calculate_spark_reward(self, result: Dict,
                                quality_score: float) -> int:
        """Calculate the Spark token reward for a completed task."""
        if result.get('status') != 'completed':
            return 0

        complexity = result.get('complexity_score', 1)
        base = SPARK_BASE_REWARD
        reward = base + (complexity * SPARK_COMPLEXITY_MULTIPLIER)

        # Quality bonus
        if quality_score >= SPARK_QUALITY_BONUS_THRESHOLD:
            reward = int(reward * 1.5)

        return int(reward)

    def _score_complexity(self, target_files: List[str],
                          changes: List[Dict]) -> int:
        """Score task complexity (1-10) based on files and changes."""
        score = 1
        score += min(len(target_files), 3)  # Up to +3 for multi-file
        score += min(len(changes), 3)       # Up to +3 for many changes

        # Count total diff lines
        total_lines = sum(
            len(c.get('diff', '').split('\n')) for c in changes
        )
        if total_lines > 100:
            score += 2
        elif total_lines > 30:
            score += 1

        return min(10, score)

    def _record_spark_reward(self, user_id: str, task_id: str,
                             spark_amount: int):
        """Record a Spark reward in the hosting reward system."""
        if spark_amount <= 0:
            return
        try:
            from integrations.agent_engine.hosting_reward_service import (
                get_hosting_reward_service,
            )
            svc = get_hosting_reward_service()
            if svc:
                svc.record_contribution(
                    user_id=user_id,
                    contribution_type='hive_coding_task',
                    amount=spark_amount,
                    metadata={'task_id': task_id, 'source': 'claude_hive_session'},
                )
        except Exception as e:
            logger.debug("Spark reward recording failed: %s", e)

    # ─── Internal: Flush on Disconnect ───────────────────────────────

    def _flush_pending_tasks(self):
        """Return pending tasks to the hive queue on disconnect."""
        with self._lock:
            tasks = list(self._pending_tasks)
            self._pending_tasks.clear()

        for task in tasks:
            try:
                self._publish_via_peer_link('dispatch', {
                    'type': 'task_returned',
                    'task_id': task.get('task_id'),
                    'reason': 'session_disconnect',
                    'session_id': self.session_id,
                })
            except Exception:
                pass


# ═════════════════════════════════════════════════════════════════════
# Module-level Singleton
# ═════════════════════════════════════════════════════════════════════

_session: Optional[ClaudeHiveSession] = None
_session_lock = threading.Lock()


def get_hive_session() -> ClaudeHiveSession:
    """Get or create the ClaudeHiveSession singleton."""
    global _session
    if _session is None:
        with _session_lock:
            if _session is None:
                _session = ClaudeHiveSession()
    return _session


class SessionRegistry:
    """Registry of all active Claude Code hive sessions.

    The HiveTaskDispatcher.match_session() queries this to find
    available sessions for task assignment. Tracks both the local
    singleton session and any remote sessions announced via PeerLink.

    Thread-safe: all mutations are guarded by ``_lock``.
    """

    def __init__(self):
        self._lock = threading.Lock()
        # session_id -> ClaudeHiveSession or Dict (remote announcement)
        self._sessions: Dict[str, Any] = {}

    def register(self, session) -> None:
        """Register a session (local ClaudeHiveSession or remote announcement dict).

        Args:
            session: A ClaudeHiveSession instance or a dict with at minimum
                'session_id' and 'status' keys.
        """
        sid = (session.session_id if hasattr(session, 'session_id')
               else session.get('session_id', ''))
        if not sid:
            return
        with self._lock:
            self._sessions[sid] = session
        logger.debug("Session registered in registry: %s", sid)

    def unregister(self, session_id: str) -> None:
        """Remove a session from the registry.

        Args:
            session_id: The session ID to remove.
        """
        with self._lock:
            removed = self._sessions.pop(session_id, None)
        if removed:
            logger.debug("Session unregistered from registry: %s", session_id)

    def get_session(self, session_id: str):
        """Get a specific session by ID.

        Returns:
            ClaudeHiveSession instance or None.
        """
        with self._lock:
            entry = self._sessions.get(session_id)

        # If it's a dict (remote announcement), return None
        # (only real session objects can execute tasks)
        if entry and hasattr(entry, 'receive_task'):
            return entry

        # Fallback: check the module-level singleton
        session = get_hive_session()
        if session.session_id == session_id:
            return session
        return None

    def get_available_sessions(self) -> List[Dict]:
        """Return list of session info dicts available for task assignment.

        Each dict contains the keys expected by HiveTaskDispatcher.match_session():
          - session_id, status, languages, quality_score, region, capabilities

        Returns:
            List of session info dicts.
        """
        results: List[Dict] = []

        # Always include the local singleton if it's active
        local = get_hive_session()
        if local.session_id and local.status in (STATUS_IDLE, STATUS_WORKING):
            results.append(self._session_to_dict(local))

        # Include registered remote sessions
        with self._lock:
            for entry in self._sessions.values():
                if hasattr(entry, 'session_id'):
                    # Local ClaudeHiveSession instance
                    if entry.session_id == local.session_id:
                        continue  # Already included above
                    if entry.status in (STATUS_IDLE, STATUS_WORKING):
                        results.append(self._session_to_dict(entry))
                elif isinstance(entry, dict):
                    # Remote announcement dict
                    status = entry.get('status', '')
                    if status in (STATUS_IDLE, STATUS_WORKING):
                        results.append(entry)

        return results

    @staticmethod
    def _session_to_dict(session) -> Dict:
        """Convert a ClaudeHiveSession to the dict format expected by the dispatcher."""
        caps = getattr(session, 'capabilities', {}) or {}
        stats = getattr(session, 'stats', {}) or {}
        return {
            'session_id': session.session_id,
            'user_id': getattr(session, 'user_id', ''),
            'status': session.status,
            'languages': caps.get('languages', []),
            'frameworks': caps.get('frameworks', []),
            'capabilities': caps,
            'task_scope': getattr(session, 'task_scope', 'own_repos'),
            'quality_score': stats.get('avg_quality_score', 0.5),
            'region': os.environ.get('HEVOLVE_REGION', ''),
            'peer_link_id': getattr(session, '_peer_link_id', ''),
        }


_registry = None
_registry_lock = threading.Lock()


def get_session_registry() -> SessionRegistry:
    """Get or create the SessionRegistry singleton."""
    global _registry
    if _registry is None:
        with _registry_lock:
            if _registry is None:
                _registry = SessionRegistry()
    return _registry
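

# Illustrative dispatcher-side lookup (HiveTaskDispatcher lives in
# hive_task_protocol; the exact matching logic is its concern):
#
#     registry = get_session_registry()
#     candidates = registry.get_available_sessions()
#     idle = [s for s in candidates if s['status'] == STATUS_IDLE]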


# ═════════════════════════════════════════════════════════════════════
# Flask Blueprint
# ═════════════════════════════════════════════════════════════════════

_blueprint: Optional[Any] = None
_blueprint_lock = threading.Lock()


def _create_blueprint():
    """Create the hive session Flask Blueprint."""
    try:
        from flask import Blueprint, request, jsonify
    except ImportError:
        return None

    bp = Blueprint('hive_session', __name__)
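
    # Example request (illustrative; host/port depend on the deployment):
    #   curl -X POST http://localhost:5000/api/hive/session/connect \
    #        -H 'Content-Type: application/json' \
    #        -d '{"user_id": "u_123", "task_scope": "own_repos"}'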

    @bp.route('/api/hive/session/connect', methods=['POST'])
    def api_connect():
        data = request.get_json(silent=True) or {}
        user_id = data.get('user_id', '')
        if not user_id:
            return jsonify({'success': False, 'error': 'user_id is required'}), 400

        capabilities = data.get('capabilities')
        task_scope = data.get('task_scope', 'own_repos')

        session = get_hive_session()
        result = session.connect(
            user_id=user_id,
            capabilities=capabilities,
            task_scope=task_scope,
        )

        status_code = 200 if result.get('success') else 400
        return jsonify(result), status_code

    @bp.route('/api/hive/session/disconnect', methods=['POST'])
    def api_disconnect():
        session = get_hive_session()
        result = session.disconnect()
        return jsonify(result), 200

    @bp.route('/api/hive/session/status', methods=['GET'])
    def api_status():
        session = get_hive_session()
        return jsonify(session.get_status()), 200

    @bp.route('/api/hive/session/pause', methods=['POST'])
    def api_pause():
        session = get_hive_session()
        result = session.pause()
        status_code = 200 if result.get('success') else 400
        return jsonify(result), status_code

    @bp.route('/api/hive/session/resume', methods=['POST'])
    def api_resume():
        session = get_hive_session()
        result = session.resume()
        status_code = 200 if result.get('success') else 400
        return jsonify(result), status_code

    @bp.route('/api/hive/session/scope', methods=['POST'])
    def api_set_scope():
        data = request.get_json(silent=True) or {}
        scope = data.get('scope', '')
        if not scope:
            return jsonify({'success': False, 'error': 'scope is required'}), 400

        session = get_hive_session()
        result = session.set_task_scope(scope)
        status_code = 200 if result.get('success') else 400
        return jsonify(result), status_code

    @bp.route('/api/hive/session/tasks', methods=['GET'])
    def api_tasks():
        session = get_hive_session()
        return jsonify(session.get_tasks()), 200

    @bp.route('/api/hive/session/task/<task_id>/result', methods=['POST'])
    def api_task_result(task_id):
        """Report a task result externally (e.g., from a remote session).

        Body JSON: {status, changes, test_results, error, duration_s, ...}
        """
        data = request.get_json(silent=True) or {}
        if not task_id:
            return jsonify({'success': False, 'error': 'task_id is required'}), 400

        session = get_hive_session()
        if session.status == STATUS_DISCONNECTED:
            return jsonify({'success': False, 'error': 'Session not connected'}), 400

        # Build the result dict from the request body
        result = {
            'task_id': task_id,
            'status': data.get('status', 'completed'),
            'changes': data.get('changes', []),
            'test_results': data.get('test_results'),
            'error': data.get('error'),
            'duration_s': data.get('duration_s', 0.0),
            'complexity_score': data.get('complexity_score', 0),
        }

        published = session.report_result(task_id, result)

        # Also notify the dispatcher if available
        reward_info = {}
        try:
            from integrations.coding_agent.hive_task_protocol import get_dispatcher
            dispatcher = get_dispatcher()
            dispatcher_result = {
                'files_changed': [c.get('file') for c in result.get('changes', [])],
                'diff': '\n'.join(
                    c.get('diff', '') for c in result.get('changes', [])),
                'tests_passed': data.get('tests_passed'),
                'test_output': data.get('test_results'),
                'error': data.get('error'),
            }
            reward_info = dispatcher.on_task_result(task_id, dispatcher_result)
        except Exception:
            pass

        return jsonify({
            'success': True,
            'task_id': task_id,
            'published': published,
            'reward': reward_info,
        }), 200

    return bp


def get_blueprint():
    """Get or create the hive_session Flask Blueprint.

    Returns None if Flask is not installed.
    """
    global _blueprint
    if _blueprint is None:
        with _blueprint_lock:
            if _blueprint is None:
                _blueprint = _create_blueprint()
    return _blueprint


# Public alias matching the naming convention in hive_signal_bridge.py
create_hive_session_blueprint = _create_blueprint


# Convenience alias for registration in hart_intelligence_entry.py:
#   from integrations.coding_agent.claude_hive_session import hive_session_bp
#   if hive_session_bp: app.register_blueprint(hive_session_bp)
hive_session_bp = get_blueprint()
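

if __name__ == '__main__':  # pragma: no cover
    # Minimal local smoke test (illustrative sketch). With PeerLink, the
    # EventBus, and the instruction queue unavailable, the session runs in
    # local-only mode, so this exercises connect/status/disconnect only.
    logging.basicConfig(level=logging.INFO)
    _demo = get_hive_session()
    print(_demo.connect(user_id='demo_user', task_scope='own_repos'))
    print(_demo.get_status()['status'])
    print(_demo.disconnect())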