Coverage for integrations / coding_agent / claude_hive_session.py: 69.1%

595 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Claude Code Hive Session — Connect Claude Code to the HART OS hive. 

3 

4When a user opts in, their Claude Code session becomes a hive worker: 

5 - Receives coding tasks from the hive's task distributor 

6 - Executes them using Claude Code's native capabilities 

7 - Returns results through PeerLink 

8 - Earns Spark tokens for contributions 

9 

10Protocol: 

11 1. User runs: hart hive connect (or POST /api/hive/session/connect) 

12 2. Session registers with PeerLink as a CODING_AGENT peer 

13 3. Hive dispatcher sees this node as available for coding tasks 

14 4. Tasks arrive via instruction queue (privacy-filtered through shard engine) 

15 5. Claude Code executes: read files, edit code, run tests 

16 6. Results published back via PeerLink + EventBus 

17 7. Spark tokens awarded based on task complexity and quality 

18 

19Security: 

20 - Shard engine filters what code is shared (INTERFACES level for untrusted peers) 

21 - User can set scope: own repos only, public repos, or any hive task 

22 - All code changes require user approval before commit 

23 - Master key verification on task origin (no rogue dispatchers) 

24""" 

25 

26import hashlib 

27import json 

28import logging 

29import os 

30import threading 

31import time 

32import uuid 

33from typing import Any, Dict, List, Optional 

34 

35logger = logging.getLogger('hevolve.hive_session') 

36 

37# ─── Constants ─────────────────────────────────────────────────────── 

38 

39TASK_SCOPES = {'own_repos', 'public', 'any'} 

40 

41SESSION_CAPABILITIES_DEFAULT = { 

42 'languages': ['python', 'javascript', 'typescript', 'rust', 'go'], 

43 'frameworks': ['flask', 'fastapi', 'react', 'next.js'], 

44 'can_run_tests': True, 

45 'can_edit_files': True, 

46 'can_read_files': True, 

47 'can_run_commands': True, 

48 'max_file_size_kb': 500, 

49 'max_task_duration_minutes': 30, 

50} 

51 

52PEER_TYPE = 'CODING_AGENT' 

53 

54# Session status values 

55STATUS_DISCONNECTED = 'disconnected' 

56STATUS_CONNECTING = 'connecting' 

57STATUS_IDLE = 'idle' 

58STATUS_WORKING = 'working' 

59STATUS_PAUSED = 'paused' 

60 

61_VALID_STATUSES = { 

62 STATUS_DISCONNECTED, STATUS_CONNECTING, STATUS_IDLE, 

63 STATUS_WORKING, STATUS_PAUSED, 

64} 

65 

66# EventBus topic for hive task dispatch 

67EVENT_TASK_DISPATCHED = 'hive.task.dispatched' 

68EVENT_TASK_COMPLETED = 'hive.task.completed' 

69EVENT_SESSION_CONNECTED = 'hive.session.connected' 

70EVENT_SESSION_DISCONNECTED = 'hive.session.disconnected' 

71 

72# Spark reward constants (per-task base, scaled by complexity) 

73SPARK_BASE_REWARD = 10 

74SPARK_COMPLEXITY_MULTIPLIER = 5 

75SPARK_QUALITY_BONUS_THRESHOLD = 0.8 # quality score above this earns bonus 

76 

77# Max pending tasks before rejecting new ones 

78MAX_PENDING_TASKS = 20 

79 

80 

81class ClaudeHiveSession: 

82 """Manages a Claude Code session's connection to the HART OS hive. 

83 

84 Thread-safe. All public methods acquire self._lock before mutating state. 

85 """ 

86 

87 def __init__(self): 

88 self.session_id: str = '' 

89 self.user_id: str = '' 

90 self.status: str = STATUS_DISCONNECTED 

91 self.capabilities: Dict[str, Any] = {} 

92 self.task_scope: str = 'own_repos' 

93 self.current_task: Optional[Dict] = None 

94 self.stats: Dict[str, Any] = { 

95 'tasks_completed': 0, 

96 'tasks_failed': 0, 

97 'spark_earned': 0, 

98 'avg_quality_score': 0.0, 

99 'total_quality_points': 0.0, 

100 'connected_since': None, 

101 } 

102 

103 self._peer_link_id: Optional[str] = None 

104 self._instruction_queue = None # Lazy: InstructionQueue from instruction_queue.py 

105 self._lock = threading.Lock() 

106 self._event_subscriptions: List = [] 

107 self._pending_tasks: List[Dict] = [] 

108 self._completed_tasks: List[Dict] = [] 

109 self._max_completed_history = 100 

110 

111 # ─── Connection Lifecycle ──────────────────────────────────────── 

112 

113 def connect(self, user_id: str, capabilities: Dict = None, 

114 task_scope: str = 'own_repos') -> Dict: 

115 """Register with PeerLink as a CODING_AGENT worker node. 

116 

117 Subscribes to task dispatch events on EventBus so the hive 

118 dispatcher can route coding tasks to this session. 

119 

120 Args: 

121 user_id: HART OS user ID 

122 capabilities: What this session can do (languages, etc.) 

123 task_scope: 'own_repos', 'public', or 'any' 

124 

125 Returns: 

126 Session info dict with session_id, status, peer_link_id 

127 """ 

128 if task_scope not in TASK_SCOPES: 

129 return { 

130 'success': False, 

131 'error': f'Invalid task_scope: {task_scope}. Must be one of {TASK_SCOPES}', 

132 } 

133 

134 with self._lock: 

135 if self.status != STATUS_DISCONNECTED: 

136 return { 

137 'success': False, 

138 'error': f'Already connected (status={self.status})', 

139 'session_id': self.session_id, 

140 } 

141 

142 self.status = STATUS_CONNECTING 

143 self.user_id = user_id 

144 self.session_id = f'chs_{uuid.uuid4().hex[:12]}' 

145 self.capabilities = capabilities or dict(SESSION_CAPABILITIES_DEFAULT) 

146 self.task_scope = task_scope 

147 self.stats['connected_since'] = time.time() 

148 

149 # Register with PeerLink 

150 peer_link_id = self._register_peer_link() 

151 

152 # Subscribe to EventBus for task dispatch 

153 self._subscribe_events() 

154 

155 # Initialize local instruction queue 

156 self._init_instruction_queue() 

157 

158 with self._lock: 

159 self._peer_link_id = peer_link_id 

160 self.status = STATUS_IDLE 

161 

162 # Emit connection event 

163 self._emit_event(EVENT_SESSION_CONNECTED, { 

164 'session_id': self.session_id, 

165 'user_id': self.user_id, 

166 'capabilities': self.capabilities, 

167 'task_scope': self.task_scope, 

168 'peer_type': PEER_TYPE, 

169 }) 

170 

171 logger.info( 

172 "Hive session connected: session=%s user=%s scope=%s peer=%s", 

173 self.session_id, self.user_id, self.task_scope, 

174 peer_link_id or 'local-only', 

175 ) 

176 

177 return { 

178 'success': True, 

179 'session_id': self.session_id, 

180 'user_id': self.user_id, 

181 'status': STATUS_IDLE, 

182 'peer_link_id': peer_link_id, 

183 'capabilities': self.capabilities, 

184 'task_scope': self.task_scope, 

185 } 

186 

187 def disconnect(self) -> Dict: 

188 """Unregister from PeerLink, unsubscribe events, flush tasks. 

189 

190 Returns: 

191 Summary of session including final stats 

192 """ 

193 with self._lock: 

194 if self.status == STATUS_DISCONNECTED: 

195 return {'success': True, 'message': 'Already disconnected'} 

196 

197 prev_status = self.status 

198 self.status = STATUS_DISCONNECTED 

199 session_id = self.session_id 

200 final_stats = dict(self.stats) 

201 pending_count = len(self._pending_tasks) 

202 

203 # Unsubscribe from EventBus 

204 self._unsubscribe_events() 

205 

206 # Unregister from PeerLink 

207 self._unregister_peer_link() 

208 

209 # Flush pending tasks back to hive queue 

210 if pending_count > 0: 

211 self._flush_pending_tasks() 

212 

213 # Emit disconnection event 

214 self._emit_event(EVENT_SESSION_DISCONNECTED, { 

215 'session_id': session_id, 

216 'user_id': self.user_id, 

217 'final_stats': final_stats, 

218 }) 

219 

220 with self._lock: 

221 self._peer_link_id = None 

222 self.current_task = None 

223 self.session_id = '' 

224 

225 logger.info( 

226 "Hive session disconnected: session=%s stats=%s flushed=%d", 

227 session_id, final_stats, pending_count, 

228 ) 

229 

230 return { 

231 'success': True, 

232 'session_id': session_id, 

233 'previous_status': prev_status, 

234 'stats': final_stats, 

235 'flushed_tasks': pending_count, 

236 } 

237 

238 # ─── Task Reception ────────────────────────────────────────────── 

239 

240 def receive_task(self, task: Dict) -> bool: 

241 """Accept a coding task from the hive dispatcher. 

242 

243 Validates: 

244 1. Session is idle or has room in pending queue 

245 2. Task origin signature (master key verification) 

246 3. Task scope matches session scope setting 

247 4. Privacy filtering via shard engine 

248 

249 Args: 

250 task: Dict with keys: task_id, description, origin_signature, 

251 scope_level, target_files, test_expectations, etc. 

252 

253 Returns: 

254 True if task was accepted and queued 

255 """ 

256 task_id = task.get('task_id', 'unknown') 

257 

258 with self._lock: 

259 if self.status not in (STATUS_IDLE, STATUS_WORKING): 

260 logger.info("Task %s rejected: session status=%s", task_id, self.status) 

261 return False 

262 

263 if len(self._pending_tasks) >= MAX_PENDING_TASKS: 

264 logger.info("Task %s rejected: pending queue full (%d)", 

265 task_id, MAX_PENDING_TASKS) 

266 return False 

267 

268 # Validate task origin (master key signature) 

269 if not self._verify_task_origin(task): 

270 logger.warning("Task %s rejected: invalid origin signature", task_id) 

271 return False 

272 

273 # Check scope compatibility 

274 if not self._check_scope_match(task): 

275 logger.info("Task %s rejected: scope mismatch (session=%s)", 

276 task_id, self.task_scope) 

277 return False 

278 

279 # Privacy filter through shard engine 

280 filtered_task = self._apply_shard_filter(task) 

281 if filtered_task is None: 

282 logger.warning("Task %s rejected: shard filter denied", task_id) 

283 return False 

284 

285 # Queue the task 

286 with self._lock: 

287 filtered_task['received_at'] = time.time() 

288 filtered_task['status'] = 'pending' 

289 self._pending_tasks.append(filtered_task) 

290 

291 # Also add to instruction queue for dependency ordering 

292 self._enqueue_instruction(filtered_task) 

293 

294 logger.info("Task %s accepted (pending=%d)", task_id, 

295 len(self._pending_tasks)) 

296 

297 # If the session is idle, immediately start working on the task 

298 with self._lock: 

299 is_idle = (self.status == STATUS_IDLE and self.current_task is None) 

300 if is_idle: 

301 self._execute_next_task() 

302 

303 return True 

304 

305 def _execute_next_task(self): 

306 """Pick the highest-priority pending task and execute it. 

307 

308 Called after receive_task() when idle, or by the daemon tick. 

309 Runs synchronously -- caller should invoke from a background thread 

310 if non-blocking execution is required. 

311 """ 

312 with self._lock: 

313 if self.status not in (STATUS_IDLE,): 

314 return # Already working or paused/disconnected 

315 if not self._pending_tasks: 

316 return 

317 

318 # Sort by priority (descending), then by received_at (ascending) 

319 self._pending_tasks.sort( 

320 key=lambda t: (-t.get('priority', 0), t.get('received_at', 0)) 

321 ) 

322 task = self._pending_tasks[0] 

323 

324 task_id = task.get('task_id', 'unknown') 

325 logger.info("Executing next task: %s", task_id) 

326 

327 # Execute the task through the full pipeline 

328 result = self._execute_task_steps(task) 

329 

330 # Report the result back to the hive 

331 self._report_result(task_id, result) 

332 

333 def _execute_task_steps(self, task: Dict) -> Dict: 

334 """Full task execution pipeline: 

335 

336 1. Read instructions from task payload / shard 

337 2. Apply shard filter for privacy 

338 3. Execute via instruction queue + CREATE/REUSE pipeline 

339 4. Collect and return results 

340 

341 Args: 

342 task: Filtered task dict from the pending queue. 

343 

344 Returns: 

345 Result dict compatible with execute_task() output. 

346 """ 

347 task_id = task.get('task_id', 'unknown') 

348 start_time = time.time() 

349 

350 with self._lock: 

351 self.status = STATUS_WORKING 

352 self.current_task = task 

353 

354 result = { 

355 'task_id': task_id, 

356 'status': 'pending', 

357 'changes': [], 

358 'test_results': None, 

359 'error': None, 

360 'duration_s': 0.0, 

361 'complexity_score': 0, 

362 } 

363 

364 try: 

365 # Step 1: Read instructions (may come from shard or full payload) 

366 description = task.get('description', '') 

367 instructions = task.get('instructions', description) 

368 target_files = task.get('target_files', task.get('files_scope', [])) 

369 

370 # Step 2: If shard was provided, use shard content 

371 shard = task.get('shard') 

372 if shard and isinstance(shard, dict): 

373 # Merge shard interface info into instructions 

374 shard_content = shard.get('full_content', '') 

375 if shard_content: 

376 instructions = ( 

377 f"{instructions}\n\n--- Shard context ---\n{shard_content}" 

378 ) 

379 

380 # Step 3: Execute via instruction queue + pipeline 

381 plan = self._build_execution_plan(instructions, target_files) 

382 execution_result = self._dispatch_to_pipeline( 

383 instructions, target_files, plan) 

384 

385 if execution_result.get('error'): 

386 result['status'] = 'error' 

387 result['error'] = execution_result['error'] 

388 else: 

389 result['status'] = 'completed' 

390 result['changes'] = execution_result.get('changes', []) 

391 result['test_results'] = execution_result.get('test_results') 

392 

393 # Step 4: Score complexity 

394 result['complexity_score'] = self._score_complexity( 

395 target_files, result['changes']) 

396 

397 except Exception as e: 

398 logger.error("Task %s execution steps error: %s", task_id, e) 

399 result['status'] = 'error' 

400 result['error'] = str(e) 

401 

402 result['duration_s'] = round(time.time() - start_time, 2) 

403 

404 # Restore session state 

405 with self._lock: 

406 self.current_task = None 

407 self.status = STATUS_IDLE 

408 

409 # Remove from pending queue 

410 self._pending_tasks = [ 

411 t for t in self._pending_tasks 

412 if t.get('task_id') != task_id 

413 ] 

414 

415 return result 

416 

417 def _report_result(self, task_id: str, result: Dict): 

418 """Report task result back to the hive. 

419 

420 1. Calls report_result() for local stats + PeerLink + EventBus 

421 2. Notifies HiveTaskDispatcher.on_task_result() for Spark award 

422 3. Emits EVENT_TASK_COMPLETED 

423 """ 

424 # Update local stats and publish via PeerLink/EventBus 

425 self.report_result(task_id, result) 

426 

427 # Notify the HiveTaskDispatcher for validation + Spark distribution 

428 try: 

429 from integrations.coding_agent.hive_task_protocol import get_dispatcher 

430 dispatcher = get_dispatcher() 

431 

432 # Build result dict expected by the dispatcher 

433 dispatcher_result = { 

434 'files_changed': [c.get('file') for c in result.get('changes', [])], 

435 'diff': '\n'.join( 

436 c.get('diff', '') for c in result.get('changes', [])), 

437 'tests_passed': ( 

438 'passed' in (result.get('test_results') or '').lower() 

439 if result.get('test_results') else None 

440 ), 

441 'test_output': result.get('test_results'), 

442 'error': result.get('error'), 

443 } 

444 

445 reward_info = dispatcher.on_task_result(task_id, dispatcher_result) 

446 if reward_info.get('spark_awarded', 0) > 0: 

447 logger.info( 

448 "Task %s earned %d Spark (quality=%.3f)", 

449 task_id, reward_info['spark_awarded'], 

450 reward_info.get('quality_score', 0.0), 

451 ) 

452 except ImportError: 

453 logger.debug("HiveTaskDispatcher not available for result reporting") 

454 except Exception as e: 

455 logger.debug("Dispatcher result notification failed: %s", e) 

456 

457 # ─── Task Execution ────────────────────────────────────────────── 

458 

459 def execute_task(self, task: Dict) -> Dict: 

460 """Execute a coding task using Claude Code capabilities. 

461 

462 This is the bridge between hive dispatch and Claude Code's 

463 native ability to read files, edit code, and run tests. 

464 

465 Does NOT auto-commit -- returns proposed changes for user approval. 

466 

467 Args: 

468 task: Filtered task dict with description, target_files, etc. 

469 

470 Returns: 

471 Result dict: {task_id, status, changes, test_results, error, 

472 duration_s, complexity_score} 

473 """ 

474 task_id = task.get('task_id', 'unknown') 

475 start_time = time.time() 

476 

477 with self._lock: 

478 if self.status == STATUS_PAUSED: 

479 return { 

480 'task_id': task_id, 

481 'status': 'rejected', 

482 'error': 'Session is paused', 

483 'changes': [], 

484 'test_results': None, 

485 } 

486 

487 self.status = STATUS_WORKING 

488 self.current_task = task 

489 

490 result = { 

491 'task_id': task_id, 

492 'status': 'pending', 

493 'changes': [], 

494 'test_results': None, 

495 'error': None, 

496 'duration_s': 0.0, 

497 'complexity_score': 0, 

498 } 

499 

500 try: 

501 # 1. Parse task instructions 

502 description = task.get('description', '') 

503 target_files = task.get('target_files', []) 

504 test_expectations = task.get('test_expectations', []) 

505 

506 # 2. Build execution plan 

507 plan = self._build_execution_plan(description, target_files) 

508 

509 # 3. Dispatch through CREATE/REUSE pipeline 

510 execution_result = self._dispatch_to_pipeline( 

511 description, target_files, plan) 

512 

513 if execution_result.get('error'): 

514 result['status'] = 'error' 

515 result['error'] = execution_result['error'] 

516 else: 

517 result['status'] = 'completed' 

518 result['changes'] = execution_result.get('changes', []) 

519 result['test_results'] = execution_result.get('test_results') 

520 

521 # 4. Score complexity 

522 result['complexity_score'] = self._score_complexity( 

523 target_files, result['changes']) 

524 

525 except Exception as e: 

526 logger.error("Task %s execution error: %s", task_id, e) 

527 result['status'] = 'error' 

528 result['error'] = str(e) 

529 

530 result['duration_s'] = round(time.time() - start_time, 2) 

531 

532 # Update session state 

533 with self._lock: 

534 self.current_task = None 

535 self.status = STATUS_IDLE 

536 

537 # Remove from pending 

538 self._pending_tasks = [ 

539 t for t in self._pending_tasks 

540 if t.get('task_id') != task_id 

541 ] 

542 

543 return result 

544 

545 def report_result(self, task_id: str, result: Dict) -> bool: 

546 """Publish task result back to the hive via PeerLink + EventBus. 

547 

548 Includes quality metrics and triggers Spark reward calculation. 

549 

550 Args: 

551 task_id: The completed task's ID 

552 result: Output from execute_task() 

553 

554 Returns: 

555 True if result was successfully published 

556 """ 

557 with self._lock: 

558 session_id = self.session_id 

559 user_id = self.user_id 

560 if not session_id: 

561 return False 

562 

563 quality_score = self._compute_quality_score(result) 

564 spark_reward = self._calculate_spark_reward(result, quality_score) 

565 

566 report = { 

567 'session_id': session_id, 

568 'user_id': user_id, 

569 'task_id': task_id, 

570 'status': result.get('status', 'unknown'), 

571 'changes_count': len(result.get('changes', [])), 

572 'test_results': result.get('test_results'), 

573 'error': result.get('error'), 

574 'duration_s': result.get('duration_s', 0), 

575 'quality_score': quality_score, 

576 'spark_reward': spark_reward, 

577 'complexity_score': result.get('complexity_score', 0), 

578 'reported_at': time.time(), 

579 } 

580 

581 # Update local stats 

582 with self._lock: 

583 if result.get('status') == 'completed': 

584 self.stats['tasks_completed'] += 1 

585 total = self.stats['tasks_completed'] 

586 old_total_q = self.stats['total_quality_points'] 

587 new_total_q = old_total_q + quality_score 

588 self.stats['total_quality_points'] = new_total_q 

589 self.stats['avg_quality_score'] = ( 

590 round(new_total_q / total, 3) if total > 0 else 0.0 

591 ) 

592 else: 

593 self.stats['tasks_failed'] += 1 

594 self.stats['spark_earned'] += spark_reward 

595 

596 # Add to completed history (capped) 

597 self._completed_tasks.append(report) 

598 if len(self._completed_tasks) > self._max_completed_history: 

599 self._completed_tasks = self._completed_tasks[ 

600 -self._max_completed_history:] 

601 

602 # Publish via PeerLink 

603 published = self._publish_via_peer_link('dispatch', { 

604 'type': 'task_result', 

605 'payload': report, 

606 }) 

607 

608 # Emit EventBus event 

609 self._emit_event(EVENT_TASK_COMPLETED, report) 

610 

611 # Record Spark reward 

612 self._record_spark_reward(user_id, task_id, spark_reward) 

613 

614 logger.info( 

615 "Task %s result reported: status=%s quality=%.2f spark=%d", 

616 task_id, result.get('status'), quality_score, spark_reward, 

617 ) 

618 return published or True # Event was emitted even if PeerLink unavailable 

619 

620 # ─── Status & Configuration ────────────────────────────────────── 

621 

622 def get_status(self) -> Dict: 

623 """Return current session state, stats, and active task info.""" 

624 with self._lock: 

625 return { 

626 'session_id': self.session_id, 

627 'user_id': self.user_id, 

628 'status': self.status, 

629 'task_scope': self.task_scope, 

630 'capabilities': self.capabilities, 

631 'current_task': ( 

632 {'task_id': self.current_task.get('task_id'), 

633 'description': self.current_task.get('description', '')[:200]} 

634 if self.current_task else None 

635 ), 

636 'pending_tasks': len(self._pending_tasks), 

637 'completed_tasks': len(self._completed_tasks), 

638 'stats': dict(self.stats), 

639 'peer_link_id': self._peer_link_id, 

640 } 

641 

642 def set_task_scope(self, scope: str) -> Dict: 

643 """Update what tasks this session accepts. 

644 

645 Args: 

646 scope: 'own_repos', 'public', or 'any' 

647 

648 Returns: 

649 Result dict with success flag 

650 """ 

651 if scope not in TASK_SCOPES: 

652 return { 

653 'success': False, 

654 'error': f'Invalid scope: {scope}. Must be one of {TASK_SCOPES}', 

655 } 

656 with self._lock: 

657 old_scope = self.task_scope 

658 self.task_scope = scope 

659 logger.info("Task scope changed: %s -> %s", old_scope, scope) 

660 return {'success': True, 'previous_scope': old_scope, 'scope': scope} 

661 

662 def pause(self) -> Dict: 

663 """Temporarily stop accepting new tasks. 

664 

665 Current task continues if one is in progress. 

666 """ 

667 with self._lock: 

668 if self.status == STATUS_DISCONNECTED: 

669 return {'success': False, 'error': 'Not connected'} 

670 if self.status == STATUS_PAUSED: 

671 return {'success': True, 'message': 'Already paused'} 

672 prev = self.status 

673 self.status = STATUS_PAUSED 

674 logger.info("Hive session paused (was %s)", prev) 

675 return {'success': True, 'previous_status': prev, 'status': STATUS_PAUSED} 

676 

677 def resume(self) -> Dict: 

678 """Resume accepting tasks after a pause.""" 

679 with self._lock: 

680 if self.status != STATUS_PAUSED: 

681 return { 

682 'success': False, 

683 'error': f'Not paused (status={self.status})', 

684 } 

685 self.status = STATUS_IDLE 

686 logger.info("Hive session resumed") 

687 return {'success': True, 'status': STATUS_IDLE} 

688 

689 def get_tasks(self) -> Dict: 

690 """List pending and completed tasks.""" 

691 with self._lock: 

692 pending = [ 

693 {'task_id': t.get('task_id'), 'description': t.get('description', '')[:200], 

694 'received_at': t.get('received_at')} 

695 for t in self._pending_tasks 

696 ] 

697 completed = [ 

698 {'task_id': t.get('task_id'), 'status': t.get('status'), 

699 'quality_score': t.get('quality_score'), 

700 'spark_reward': t.get('spark_reward'), 

701 'reported_at': t.get('reported_at')} 

702 for t in self._completed_tasks[-50:] # Last 50 

703 ] 

704 return {'pending': pending, 'completed': completed} 

705 

706 # ─── Internal: PeerLink Integration ────────────────────────────── 

707 

708 def _register_peer_link(self) -> Optional[str]: 

709 """Register this session as a CODING_AGENT peer in PeerLinkManager.""" 

710 try: 

711 from core.peer_link.link_manager import get_link_manager 

712 manager = get_link_manager() 

713 if not manager: 

714 return None 

715 

716 # Generate a peer ID for this session 

717 node_id = os.environ.get('HEVOLVE_NODE_ID', '') 

718 if not node_id: 

719 node_id = hashlib.sha256( 

720 f"{self.user_id}:{self.session_id}".encode() 

721 ).hexdigest()[:16] 

722 

723 peer_id = f"{PEER_TYPE}_{node_id}_{self.session_id}" 

724 

725 # Advertise capabilities so the hive dispatcher can match tasks 

726 manager.broadcast('dispatch', { 

727 'type': 'peer_announce', 

728 'peer_type': PEER_TYPE, 

729 'peer_id': peer_id, 

730 'user_id': self.user_id, 

731 'session_id': self.session_id, 

732 'capabilities': self.capabilities, 

733 'task_scope': self.task_scope, 

734 }) 

735 

736 return peer_id 

737 except ImportError: 

738 logger.debug("PeerLink not available, running in local-only mode") 

739 return None 

740 except Exception as e: 

741 logger.warning("PeerLink registration failed: %s", e) 

742 return None 

743 

744 def _unregister_peer_link(self): 

745 """Unregister from PeerLink on disconnect.""" 

746 try: 

747 from core.peer_link.link_manager import get_link_manager 

748 manager = get_link_manager() 

749 if not manager: 

750 return 

751 

752 manager.broadcast('dispatch', { 

753 'type': 'peer_depart', 

754 'peer_type': PEER_TYPE, 

755 'peer_id': self._peer_link_id or '', 

756 'user_id': self.user_id, 

757 'session_id': self.session_id, 

758 }) 

759 except Exception: 

760 pass 

761 

762 def _publish_via_peer_link(self, channel: str, data: Dict) -> bool: 

763 """Publish data via PeerLink broadcast.""" 

764 try: 

765 from core.peer_link.link_manager import get_link_manager 

766 manager = get_link_manager() 

767 if manager: 

768 sent = manager.broadcast(channel, data) 

769 return sent > 0 

770 except Exception as e: 

771 logger.debug("PeerLink publish failed: %s", e) 

772 return False 

773 

774 # ─── Internal: EventBus Integration ────────────────────────────── 

775 

776 def _subscribe_events(self): 

777 """Subscribe to hive task dispatch events on the platform EventBus.""" 

778 try: 

779 from core.platform.registry import get_registry 

780 registry = get_registry() 

781 if not registry or not registry.has('events'): 

782 return 

783 bus = registry.get('events') 

784 

785 def on_task_dispatched(topic, data): 

786 """Handle incoming task from hive dispatcher.""" 

787 if not isinstance(data, dict): 

788 return 

789 # Only accept tasks targeted at us or broadcast 

790 target = data.get('target_session') 

791 if target and target != self.session_id: 

792 return 

793 self.receive_task(data) 

794 

795 bus.on(EVENT_TASK_DISPATCHED, on_task_dispatched) 

796 self._event_subscriptions.append( 

797 (EVENT_TASK_DISPATCHED, on_task_dispatched)) 

798 except Exception as e: 

799 logger.debug("EventBus subscription failed: %s", e) 

800 

801 def _unsubscribe_events(self): 

802 """Unsubscribe all EventBus listeners.""" 

803 try: 

804 from core.platform.registry import get_registry 

805 registry = get_registry() 

806 if not registry or not registry.has('events'): 

807 return 

808 bus = registry.get('events') 

809 for topic, callback in self._event_subscriptions: 

810 bus.off(topic, callback) 

811 except Exception: 

812 pass 

813 self._event_subscriptions.clear() 

814 

815 def _emit_event(self, topic: str, data: Any): 

816 """Emit an event on the platform EventBus (best-effort).""" 

817 try: 

818 from core.platform.events import emit_event 

819 emit_event(topic, data) 

820 except Exception: 

821 pass 

822 

823 # ─── Internal: Instruction Queue ───────────────────────────────── 

824 

825 def _init_instruction_queue(self): 

826 """Initialize local instruction queue for incoming tasks.""" 

827 try: 

828 from integrations.agent_engine.instruction_queue import get_queue 

829 self._instruction_queue = get_queue(self.user_id) 

830 except Exception as e: 

831 logger.debug("Instruction queue init failed: %s", e) 

832 self._instruction_queue = None 

833 

834 def _enqueue_instruction(self, task: Dict): 

835 """Add a task to the instruction queue for dependency ordering.""" 

836 try: 

837 from integrations.agent_engine.instruction_queue import enqueue_instruction 

838 enqueue_instruction( 

839 user_id=self.user_id, 

840 text=task.get('description', '')[:2000], 

841 priority=task.get('priority', 5), 

842 tags=['hive_task', PEER_TYPE], 

843 context={ 

844 'task_id': task.get('task_id'), 

845 'source': 'hive_session', 

846 'session_id': self.session_id, 

847 'target_files': task.get('target_files', []), 

848 }, 

849 ) 

850 except Exception as e: 

851 logger.debug("Instruction enqueue failed: %s", e) 

852 

853 # ─── Internal: Security ────────────────────────────────────────── 

854 

855 def _verify_task_origin(self, task: Dict) -> bool: 

856 """Verify that a task was dispatched by a legitimate hive node. 

857 

858 Uses master key signature verification to prevent rogue dispatchers. 

859 Tasks without a signature are accepted only from SAME_USER trust level. 

860 """ 

861 signature = task.get('origin_signature', '') 

862 if not signature: 

863 # Allow unsigned tasks only from SAME_USER trust (own devices) 

864 trust = task.get('trust_level', '') 

865 if trust == 'same_user': 

866 return True 

867 logger.debug("Task %s has no signature and trust=%s", 

868 task.get('task_id'), trust) 

869 return False 

870 

871 try: 

872 from security.master_key import verify_master_signature 

873 # Build payload to verify (exclude the signature field itself) 

874 payload = {k: v for k, v in task.items() 

875 if k != 'origin_signature'} 

876 return verify_master_signature(payload, signature) 

877 except ImportError: 

878 logger.warning("master_key module unavailable — rejecting task (cannot verify)") 

879 return False 

880 except Exception as e: 

881 logger.warning("Task origin verification failed: %s", e) 

882 return False 

883 

884 def _check_scope_match(self, task: Dict) -> bool: 

885 """Check whether a task matches this session's scope setting.""" 

886 task_scope_level = task.get('scope_level', 'any') 

887 

888 if self.task_scope == 'any': 

889 return True 

890 if self.task_scope == 'public': 

891 return task_scope_level in ('public', 'own_repos') 

892 if self.task_scope == 'own_repos': 

893 # Only accept tasks for repos owned by this user 

894 task_owner = task.get('repo_owner', '') 

895 return task_owner == self.user_id or task_scope_level == 'own_repos' 

896 return False 

897 

898 def _apply_shard_filter(self, task: Dict) -> Optional[Dict]: 

899 """Filter task content through the shard engine based on trust level. 

900 

901 - SAME_USER trust: FULL_FILE scope (see everything) 

902 - PEER trust: INTERFACES scope (signatures + types only) 

903 - RELAY/unknown: SIGNATURES scope (minimal) 

904 """ 

905 trust = task.get('trust_level', 'relay') 

906 

907 try: 

908 from integrations.agent_engine.shard_engine import ( 

909 ShardScope, ShardEngine, get_shard_engine, 

910 ) 

911 

912 if trust == 'same_user': 

913 scope = ShardScope.FULL_FILE 

914 elif trust == 'peer': 

915 scope = ShardScope.INTERFACES 

916 else: 

917 scope = ShardScope.SIGNATURES 

918 

919 # If task has raw file content and we need to filter it down, 

920 # run it through the shard engine 

921 if task.get('full_content') and scope != ShardScope.FULL_FILE: 

922 engine = get_shard_engine() 

923 shard = engine.create_shard( 

924 task=task.get('description', ''), 

925 target_files=task.get('target_files', []), 

926 scope=scope, 

927 ) 

928 # Replace full content with filtered view 

929 filtered = dict(task) 

930 filtered['full_content'] = shard.full_content 

931 filtered['interface_specs'] = [ 

932 s.file_path for s in shard.interface_specs 

933 ] 

934 filtered['shard_scope'] = scope.value 

935 return filtered 

936 

937 except ImportError: 

938 logger.debug("Shard engine not available, passing task as-is") 

939 except Exception as e: 

940 logger.warning("Shard filter error: %s", e) 

941 

942 # Pass through unmodified (no shard engine or no filtering needed) 

943 return dict(task) 

944 

945 # ─── Internal: Task Execution ──────────────────────────────────── 

946 

947 def _build_execution_plan(self, description: str, 

948 target_files: List[str]) -> Dict: 

949 """Build an execution plan from task description. 

950 

951 Returns a structured plan dict that the pipeline can execute. 

952 """ 

953 plan = { 

954 'description': description, 

955 'steps': [], 

956 } 

957 

958 # Determine steps from description 

959 if target_files: 

960 plan['steps'].append({ 

961 'action': 'read_files', 

962 'files': target_files, 

963 }) 

964 

965 plan['steps'].append({ 

966 'action': 'execute_instructions', 

967 'description': description, 

968 }) 

969 

970 if target_files: 

971 plan['steps'].append({ 

972 'action': 'run_tests', 

973 'scope': 'affected', 

974 }) 

975 

976 return plan 

977 

978 def _dispatch_to_pipeline(self, description: str, 

979 target_files: List[str], 

980 plan: Dict) -> Dict: 

981 """Dispatch the task through the CREATE/REUSE pipeline. 

982 

983 Uses dispatch_goal() (3-tier: in-process, HTTP, llama.cpp fallback) 

984 to execute the coding task through the standard agent pipeline. 

985 """ 

986 result = { 

987 'changes': [], 

988 'test_results': None, 

989 'error': None, 

990 } 

991 

992 prompt = self._build_task_prompt(description, target_files) 

993 

994 try: 

995 from integrations.agent_engine.dispatch import dispatch_goal 

996 

997 # Generate a deterministic goal_id from task content 

998 goal_id = hashlib.sha256( 

999 f"hive:{self.session_id}:{description[:200]}".encode() 

1000 ).hexdigest()[:16] 

1001 

1002 response = dispatch_goal( 

1003 prompt=prompt, 

1004 user_id=self.user_id, 

1005 goal_id=goal_id, 

1006 goal_type='coding', 

1007 ) 

1008 

1009 if response: 

1010 result['changes'] = self._parse_changes_from_response(response) 

1011 result['test_results'] = self._extract_test_results(response) 

1012 else: 

1013 result['error'] = 'Pipeline returned no response' 

1014 

1015 except Exception as e: 

1016 result['error'] = f'Pipeline dispatch failed: {e}' 

1017 

1018 return result 

1019 

1020 def _build_task_prompt(self, description: str, 

1021 target_files: List[str]) -> str: 

1022 """Build a structured prompt for the CREATE/REUSE pipeline.""" 

1023 parts = [ 

1024 f"HIVE CODING TASK (session {self.session_id}):", 

1025 f"\n{description}", 

1026 ] 

1027 if target_files: 

1028 parts.append(f"\nTarget files: {', '.join(target_files)}") 

1029 parts.append("\nProvide changes as file diffs. Do NOT commit.") 

1030 return '\n'.join(parts) 

1031 

1032 def _parse_changes_from_response(self, response: str) -> List[Dict]: 

1033 """Extract file changes from pipeline response.""" 

1034 changes = [] 

1035 # Look for diff-like patterns in the response 

1036 current_file = None 

1037 current_diff_lines = [] 

1038 

1039 for line in response.split('\n'): 

1040 if line.startswith('--- a/') or line.startswith('+++ b/'): 

1041 fname = line[6:].strip() 

1042 if line.startswith('+++ b/') and fname: 

1043 current_file = fname 

1044 elif line.startswith('@@') and current_file: 

1045 if current_diff_lines and current_file: 

1046 changes.append({ 

1047 'file': current_file, 

1048 'diff': '\n'.join(current_diff_lines), 

1049 }) 

1050 current_diff_lines = [line] 

1051 elif current_file and (line.startswith('+') or line.startswith('-') 

1052 or line.startswith(' ')): 

1053 current_diff_lines.append(line) 

1054 

1055 # Flush last change 

1056 if current_file and current_diff_lines: 

1057 changes.append({ 

1058 'file': current_file, 

1059 'diff': '\n'.join(current_diff_lines), 

1060 }) 

1061 

1062 return changes 

1063 

1064 def _extract_test_results(self, response: str) -> Optional[str]: 

1065 """Extract test results from pipeline response, if any.""" 

1066 # Look for common test output patterns 

1067 markers = ['PASSED', 'FAILED', 'ERROR', 'tests passed', 

1068 'test session', 'pytest'] 

1069 for marker in markers: 

1070 idx = response.lower().find(marker.lower()) 

1071 if idx >= 0: 

1072 # Return surrounding context 

1073 start = max(0, idx - 200) 

1074 end = min(len(response), idx + 500) 

1075 return response[start:end] 

1076 return None 

1077 

1078 # ─── Internal: Quality & Rewards ───────────────────────────────── 

1079 

1080 def _compute_quality_score(self, result: Dict) -> float: 

1081 """Compute a quality score for a completed task (0.0 to 1.0).""" 

1082 if result.get('status') != 'completed': 

1083 return 0.0 

1084 

1085 score = 0.5 # Base score for completion 

1086 

1087 # Bonus for having changes 

1088 changes = result.get('changes', []) 

1089 if changes: 

1090 score += 0.2 

1091 

1092 # Bonus for passing tests 

1093 test_results = result.get('test_results', '') 

1094 if test_results: 

1095 lower = test_results.lower() 

1096 if 'passed' in lower and 'failed' not in lower: 

1097 score += 0.3 

1098 elif 'passed' in lower: 

1099 score += 0.1 

1100 

1101 return min(1.0, round(score, 3)) 

1102 

1103 def _calculate_spark_reward(self, result: Dict, 

1104 quality_score: float) -> int: 

1105 """Calculate Spark token reward for a completed task.""" 

1106 if result.get('status') != 'completed': 

1107 return 0 

1108 

1109 complexity = result.get('complexity_score', 1) 

1110 base = SPARK_BASE_REWARD 

1111 reward = base + (complexity * SPARK_COMPLEXITY_MULTIPLIER) 

1112 

1113 # Quality bonus 

1114 if quality_score >= SPARK_QUALITY_BONUS_THRESHOLD: 

1115 reward = int(reward * 1.5) 

1116 

1117 return int(reward) 

1118 

1119 def _score_complexity(self, target_files: List[str], 

1120 changes: List[Dict]) -> int: 

1121 """Score task complexity (1-10) based on files and changes.""" 

1122 score = 1 

1123 score += min(len(target_files), 3) # Up to +3 for multi-file 

1124 score += min(len(changes), 3) # Up to +3 for many changes 

1125 

1126 # Count total diff lines 

1127 total_lines = sum( 

1128 len(c.get('diff', '').split('\n')) for c in changes 

1129 ) 

1130 if total_lines > 100: 

1131 score += 2 

1132 elif total_lines > 30: 

1133 score += 1 

1134 

1135 return min(10, score) 

1136 

1137 def _record_spark_reward(self, user_id: str, task_id: str, 

1138 spark_amount: int): 

1139 """Record Spark reward in the hosting reward system.""" 

1140 if spark_amount <= 0: 

1141 return 

1142 try: 

1143 from integrations.agent_engine.hosting_reward_service import ( 

1144 get_hosting_reward_service, 

1145 ) 

1146 svc = get_hosting_reward_service() 

1147 if svc: 

1148 svc.record_contribution( 

1149 user_id=user_id, 

1150 contribution_type='hive_coding_task', 

1151 amount=spark_amount, 

1152 metadata={'task_id': task_id, 'source': 'claude_hive_session'}, 

1153 ) 

1154 except Exception as e: 

1155 logger.debug("Spark reward recording failed: %s", e) 

1156 

1157 # ─── Internal: Flush on Disconnect ─────────────────────────────── 

1158 

1159 def _flush_pending_tasks(self): 

1160 """Return pending tasks to the hive queue on disconnect.""" 

1161 with self._lock: 

1162 tasks = list(self._pending_tasks) 

1163 self._pending_tasks.clear() 

1164 

1165 for task in tasks: 

1166 try: 

1167 self._publish_via_peer_link('dispatch', { 

1168 'type': 'task_returned', 

1169 'task_id': task.get('task_id'), 

1170 'reason': 'session_disconnect', 

1171 'session_id': self.session_id, 

1172 }) 

1173 except Exception: 

1174 pass 

1175 

1176 

1177# ═════════════════════════════════════════════════════════════════════ 

1178# Module-level Singleton 

1179# ═════════════════════════════════════════════════════════════════════ 

1180 

1181_session: Optional[ClaudeHiveSession] = None 

1182_session_lock = threading.Lock() 

1183 

1184 

1185def get_hive_session() -> ClaudeHiveSession: 

1186 """Get or create the ClaudeHiveSession singleton.""" 

1187 global _session 

1188 if _session is None: 

1189 with _session_lock: 

1190 if _session is None: 

1191 _session = ClaudeHiveSession() 

1192 return _session 

1193 

1194 

1195class SessionRegistry: 

1196 """Registry of all active Claude Code hive sessions. 

1197 

1198 The HiveTaskDispatcher.match_session() queries this to find 

1199 available sessions for task assignment. Tracks both the local 

1200 singleton session and any remote sessions announced via PeerLink. 

1201 

1202 Thread-safe: all mutations are guarded by ``_lock``. 

1203 """ 

1204 

1205 def __init__(self): 

1206 self._lock = threading.Lock() 

1207 # session_id -> ClaudeHiveSession or Dict (remote announcement) 

1208 self._sessions: Dict[str, Any] = {} 

1209 

1210 def register(self, session) -> None: 

1211 """Register a session (local ClaudeHiveSession or remote announcement dict). 

1212 

1213 Args: 

1214 session: A ClaudeHiveSession instance or a dict with at minimum 

1215 'session_id' and 'status' keys. 

1216 """ 

1217 sid = (session.session_id if hasattr(session, 'session_id') 

1218 else session.get('session_id', '')) 

1219 if not sid: 

1220 return 

1221 with self._lock: 

1222 self._sessions[sid] = session 

1223 logger.debug("Session registered in registry: %s", sid) 

1224 

1225 def unregister(self, session_id: str) -> None: 

1226 """Remove a session from the registry. 

1227 

1228 Args: 

1229 session_id: The session ID to remove. 

1230 """ 

1231 with self._lock: 

1232 removed = self._sessions.pop(session_id, None) 

1233 if removed: 

1234 logger.debug("Session unregistered from registry: %s", session_id) 

1235 

1236 def get_session(self, session_id: str): 

1237 """Get a specific session by ID. 

1238 

1239 Returns: 

1240 ClaudeHiveSession instance or None. 

1241 """ 

1242 with self._lock: 

1243 entry = self._sessions.get(session_id) 

1244 

1245 # If it's a dict (remote announcement), return None 

1246 # (only real session objects can execute tasks) 

1247 if entry and hasattr(entry, 'receive_task'): 

1248 return entry 

1249 

1250 # Fallback: check the module-level singleton 

1251 session = get_hive_session() 

1252 if session.session_id == session_id: 

1253 return session 

1254 return None 

1255 

1256 def get_available_sessions(self) -> List[Dict]: 

1257 """Return list of session info dicts available for task assignment. 

1258 

1259 Each dict contains the keys expected by HiveTaskDispatcher.match_session(): 

1260 - session_id, status, languages, quality_score, region, capabilities 

1261 

1262 Returns: 

1263 List of session info dicts. 

1264 """ 

1265 results: List[Dict] = [] 

1266 

1267 # Always include the local singleton if it's active 

1268 local = get_hive_session() 

1269 if local.session_id and local.status in (STATUS_IDLE, STATUS_WORKING): 

1270 results.append(self._session_to_dict(local)) 

1271 

1272 # Include registered remote sessions 

1273 with self._lock: 

1274 for sid, entry in self._sessions.items(): 

1275 if hasattr(entry, 'session_id'): 

1276 # Local ClaudeHiveSession instance 

1277 if entry.session_id == local.session_id: 

1278 continue # Already included above 

1279 if entry.status in (STATUS_IDLE, STATUS_WORKING): 

1280 results.append(self._session_to_dict(entry)) 

1281 elif isinstance(entry, dict): 

1282 # Remote announcement dict 

1283 status = entry.get('status', '') 

1284 if status in (STATUS_IDLE, STATUS_WORKING): 

1285 results.append(entry) 

1286 

1287 return results 

1288 

1289 @staticmethod 

1290 def _session_to_dict(session) -> Dict: 

1291 """Convert a ClaudeHiveSession to the dict format expected by the dispatcher.""" 

1292 caps = getattr(session, 'capabilities', {}) or {} 

1293 stats = getattr(session, 'stats', {}) or {} 

1294 return { 

1295 'session_id': session.session_id, 

1296 'user_id': getattr(session, 'user_id', ''), 

1297 'status': session.status, 

1298 'languages': caps.get('languages', []), 

1299 'frameworks': caps.get('frameworks', []), 

1300 'capabilities': caps, 

1301 'task_scope': getattr(session, 'task_scope', 'own_repos'), 

1302 'quality_score': stats.get('avg_quality_score', 0.5), 

1303 'region': os.environ.get('HEVOLVE_REGION', ''), 

1304 'peer_link_id': getattr(session, '_peer_link_id', ''), 

1305 } 

1306 

1307 

1308_registry = None 

1309_registry_lock = threading.Lock() 

1310 

1311 

1312def get_session_registry() -> SessionRegistry: 

1313 """Get or create the SessionRegistry singleton.""" 

1314 global _registry 

1315 if _registry is None: 

1316 with _registry_lock: 

1317 if _registry is None: 

1318 _registry = SessionRegistry() 

1319 return _registry 

1320 

1321 

1322# ═════════════════════════════════════════════════════════════════════ 

1323# Flask Blueprint 

1324# ═════════════════════════════════════════════════════════════════════ 

1325 

1326_blueprint: Optional[Any] = None 

1327_blueprint_lock = threading.Lock() 

1328 

1329 

1330def _create_blueprint(): 

1331 """Create the hive session Flask Blueprint.""" 

1332 try: 

1333 from flask import Blueprint, request, jsonify 

1334 except ImportError: 

1335 return None 

1336 

1337 bp = Blueprint('hive_session', __name__) 

1338 

1339 @bp.route('/api/hive/session/connect', methods=['POST']) 

1340 def api_connect(): 

1341 data = request.get_json(silent=True) or {} 

1342 user_id = data.get('user_id', '') 

1343 if not user_id: 

1344 return jsonify({'success': False, 'error': 'user_id is required'}), 400 

1345 

1346 capabilities = data.get('capabilities') 

1347 task_scope = data.get('task_scope', 'own_repos') 

1348 

1349 session = get_hive_session() 

1350 result = session.connect( 

1351 user_id=user_id, 

1352 capabilities=capabilities, 

1353 task_scope=task_scope, 

1354 ) 

1355 

1356 status_code = 200 if result.get('success') else 400 

1357 return jsonify(result), status_code 

1358 

1359 @bp.route('/api/hive/session/disconnect', methods=['POST']) 

1360 def api_disconnect(): 

1361 session = get_hive_session() 

1362 result = session.disconnect() 

1363 return jsonify(result), 200 

1364 

1365 @bp.route('/api/hive/session/status', methods=['GET']) 

1366 def api_status(): 

1367 session = get_hive_session() 

1368 return jsonify(session.get_status()), 200 

1369 

1370 @bp.route('/api/hive/session/pause', methods=['POST']) 

1371 def api_pause(): 

1372 session = get_hive_session() 

1373 result = session.pause() 

1374 status_code = 200 if result.get('success') else 400 

1375 return jsonify(result), status_code 

1376 

1377 @bp.route('/api/hive/session/resume', methods=['POST']) 

1378 def api_resume(): 

1379 session = get_hive_session() 

1380 result = session.resume() 

1381 status_code = 200 if result.get('success') else 400 

1382 return jsonify(result), status_code 

1383 

1384 @bp.route('/api/hive/session/scope', methods=['POST']) 

1385 def api_set_scope(): 

1386 data = request.get_json(silent=True) or {} 

1387 scope = data.get('scope', '') 

1388 if not scope: 

1389 return jsonify({'success': False, 'error': 'scope is required'}), 400 

1390 

1391 session = get_hive_session() 

1392 result = session.set_task_scope(scope) 

1393 status_code = 200 if result.get('success') else 400 

1394 return jsonify(result), status_code 

1395 

1396 @bp.route('/api/hive/session/tasks', methods=['GET']) 

1397 def api_tasks(): 

1398 session = get_hive_session() 

1399 return jsonify(session.get_tasks()), 200 

1400 

1401 @bp.route('/api/hive/session/task/<task_id>/result', methods=['POST']) 

1402 def api_task_result(task_id): 

1403 """Report a task result externally (e.g., from a remote session). 

1404 

1405 Body JSON: {status, changes, test_results, error, duration_s, ...} 

1406 """ 

1407 data = request.get_json(silent=True) or {} 

1408 if not task_id: 

1409 return jsonify({'success': False, 'error': 'task_id is required'}), 400 

1410 

1411 session = get_hive_session() 

1412 if session.status == STATUS_DISCONNECTED: 

1413 return jsonify({'success': False, 'error': 'Session not connected'}), 400 

1414 

1415 # Build result dict from request body 

1416 result = { 

1417 'task_id': task_id, 

1418 'status': data.get('status', 'completed'), 

1419 'changes': data.get('changes', []), 

1420 'test_results': data.get('test_results'), 

1421 'error': data.get('error'), 

1422 'duration_s': data.get('duration_s', 0.0), 

1423 'complexity_score': data.get('complexity_score', 0), 

1424 } 

1425 

1426 published = session.report_result(task_id, result) 

1427 

1428 # Also notify the dispatcher if available 

1429 reward_info = {} 

1430 try: 

1431 from integrations.coding_agent.hive_task_protocol import get_dispatcher 

1432 dispatcher = get_dispatcher() 

1433 dispatcher_result = { 

1434 'files_changed': [c.get('file') for c in result.get('changes', [])], 

1435 'diff': '\n'.join( 

1436 c.get('diff', '') for c in result.get('changes', [])), 

1437 'tests_passed': data.get('tests_passed'), 

1438 'test_output': data.get('test_results'), 

1439 'error': data.get('error'), 

1440 } 

1441 reward_info = dispatcher.on_task_result(task_id, dispatcher_result) 

1442 except Exception: 

1443 pass 

1444 

1445 return jsonify({ 

1446 'success': True, 

1447 'task_id': task_id, 

1448 'published': published, 

1449 'reward': reward_info, 

1450 }), 200 

1451 

1452 return bp 

1453 

1454 

1455def get_blueprint(): 

1456 """Get or create the hive_session Flask Blueprint. 

1457 

1458 Returns None if Flask is not installed. 

1459 """ 

1460 global _blueprint 

1461 if _blueprint is None: 

1462 with _blueprint_lock: 

1463 if _blueprint is None: 

1464 _blueprint = _create_blueprint() 

1465 return _blueprint 

1466 

1467 

1468# Public alias matching the naming convention in hive_signal_bridge.py 

1469create_hive_session_blueprint = _create_blueprint 

1470 

1471 

1472# Convenience alias for registration in hart_intelligence_entry.py: 

1473# from integrations.coding_agent.claude_hive_session import hive_session_bp 

1474# if hive_session_bp: app.register_blueprint(hive_session_bp) 

1475hive_session_bp = get_blueprint()