Coverage for integrations/agent_engine/instruction_queue.py: 88.5%

489 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Instruction Queue — Never miss a user instruction. 

3 

4Every user instruction is immediately queued and registered with SmartLedger 

5for LLM-based dependency analysis. When compute becomes available, 

6instructions are pulled in proper dependency order (not just priority), 

7correlated with existing context, and executed — individually or as a batch. 

8 

9SmartLedger integration: 

10 enqueue() → ledger.add_dynamic_task() (LLM classifies deps) 

11 pull_batch() → ledger.get_next/parallel() (respects dep graph) 

12 complete_batch() → ledger.complete_task_and_route() (unblocks dependents) 

13 

14Falls back to simple priority ordering when SmartLedger is unavailable 

15(e.g. no LLM reachable, agent_ledger package not installed). 

16 

17Batch mode: If multiple instructions queue up before compute arrives, 

18they are consolidated into a single prompt that fits the context window. 

19Related instructions are grouped, duplicates deduplicated, dependencies 

20ordered by the ledger's task graph. 

21 

22Architecture: 

23 User says "do X" → enqueue_instruction(user_id, "do X") 

24 

25 ├─ Stored in agent_data/instructions/{user_id}_queue.json 

26 ├─ Registered with SmartLedger via add_dynamic_task(): 

27 │ ├─ LLM classifies relationship to existing tasks 

28 │ ├─ Sets prerequisites, blockers, execution mode 

29 │ └─ Determines parallel vs sequential ordering 

30 ├─ Correlated with: 

31 │ ├─ Conversation state (recent messages) 

32 │ ├─ Related queued instructions (semantic grouping) 

33 │ ├─ Active goals (avoid duplication) 

34 │ └─ Ledger tasks (dependency tracking) 

35 

36 └─ When compute arrives: 

37 ├─ pull_batch(user_id, max_tokens=...) 

38 │ └─ Uses ledger.get_next_executable_task() for dep-aware ordering 

39 ├─ Execute via /chat or dispatch.py 

40 └─ complete_batch() → ledger.complete_task_and_route() 

41 

42Integration points: 

43 - hart_intelligence /chat endpoint: auto-enqueue if compute busy 

44 - hart_cli.py: `hart -p "task"` enqueues if server busy 

45 - dispatch.py: pull_batch when idle compute detected 

46 - agent_daemon.py: periodic batch check on tick 

47""" 

48 

49import json 

50import logging 

51import os 

52import threading 

53import time 

54import hashlib 

55from datetime import datetime 

56from enum import Enum 

57from typing import Any, Dict, List, Optional, Tuple 

58 

# Module-level logger for the instruction queue subsystem.
logger = logging.getLogger('hevolve.instruction_queue')

# SmartLedger — optional, graceful fallback when unavailable
try:
    from agent_ledger import SmartLedger
    _LEDGER_AVAILABLE = True
except ImportError:
    # No agent_ledger package installed: the queue falls back to simple
    # priority ordering instead of LLM-based dependency analysis.
    SmartLedger = None
    _LEDGER_AVAILABLE = False

# Directory holding the per-user queue and drain-lock files:
# {HART_INSTALL_DIR}/agent_data/instructions. When HART_INSTALL_DIR is
# unset, defaults to three directory levels above this file.
_QUEUE_DIR = os.path.join(
    os.environ.get('HART_INSTALL_DIR',
                   os.path.dirname(os.path.dirname(os.path.dirname(
                       os.path.abspath(__file__))))),
    'agent_data', 'instructions',
)

75 

76 

class InstructionStatus(str, Enum):
    """Lifecycle states of a queued instruction.

    Subclasses ``str`` so values serialize directly to JSON.
    """
    QUEUED = 'queued'            # Waiting for compute
    BATCHED = 'batched'          # Included in a batch, not yet executed
    IN_PROGRESS = 'in_progress'  # Currently being executed
    DONE = 'done'                # Successfully completed
    FAILED = 'failed'            # Execution failed
    CANCELLED = 'cancelled'      # User cancelled

84 

85 

class Instruction:
    """One user instruction plus its queue-lifecycle metadata.

    Slotted to keep per-instance memory small — a queue may hold many
    of these. Round-trips through to_dict()/from_dict().
    """

    __slots__ = (
        'id', 'user_id', 'text', 'status', 'created_at',
        'updated_at', 'priority', 'tags', 'context',
        'related_goal_id', 'batch_id', 'result', 'error',
    )

    def __init__(self, user_id: str, text: str, priority: int = 5,
                 tags: Optional[List[str]] = None,
                 context: Optional[Dict] = None,
                 related_goal_id: Optional[str] = None):
        # Content + wall-clock seed keeps ids unique even for repeated text.
        seed = f"{user_id}:{text}:{time.time()}"
        self.id = hashlib.sha256(seed.encode()).hexdigest()[:16]
        self.user_id = user_id
        self.text = text
        self.status = InstructionStatus.QUEUED
        now = datetime.utcnow().isoformat()
        self.created_at = now
        self.updated_at = now
        self.priority = priority  # 1=highest, 10=lowest
        self.tags = tags or []
        self.context = context or {}
        self.related_goal_id = related_goal_id
        self.batch_id = None
        self.result = None
        self.error = None

    def to_dict(self) -> Dict:
        """Serialize to a JSON-compatible dict."""
        status = self.status
        if isinstance(status, InstructionStatus):
            status = status.value
        return {
            'id': self.id,
            'user_id': self.user_id,
            'text': self.text,
            'status': status,
            'created_at': self.created_at,
            'updated_at': self.updated_at,
            'priority': self.priority,
            'tags': self.tags,
            'context': self.context,
            'related_goal_id': self.related_goal_id,
            'batch_id': self.batch_id,
            'result': self.result,
            'error': self.error,
        }

    @classmethod
    def from_dict(cls, d: Dict) -> 'Instruction':
        """Rehydrate from a dict produced by to_dict() (bypasses __init__)."""
        inst = cls.__new__(cls)
        inst.id = d['id']
        inst.user_id = d['user_id']
        inst.text = d['text']
        inst.status = InstructionStatus(d.get('status', 'queued'))
        inst.created_at = d.get('created_at', '')
        inst.updated_at = d.get('updated_at', '')
        inst.priority = d.get('priority', 5)
        inst.tags = d.get('tags', [])
        inst.context = d.get('context', {})
        # Optional fields default to None when absent
        for field in ('related_goal_id', 'batch_id', 'result', 'error'):
            setattr(inst, field, d.get(field))
        return inst

149 

150 

class InstructionBatch:
    """A consolidated batch of related instructions."""

    def __init__(self, batch_id: str, instructions: List[Instruction],
                 consolidated_prompt: str):
        self.batch_id = batch_id
        self.instructions = instructions
        self.consolidated_prompt = consolidated_prompt
        self.created_at = datetime.utcnow().isoformat()
        self.token_estimate = self._estimate_tokens(consolidated_prompt)

    @staticmethod
    def _estimate_tokens(text: str) -> int:
        """Rough token estimate (~4 chars per token for English)."""
        return len(text) // 4

    def to_dict(self) -> Dict:
        """Serialize for logging/API responses."""
        ids = [inst.id for inst in self.instructions]
        return {
            'batch_id': self.batch_id,
            'instruction_ids': ids,
            'consolidated_prompt': self.consolidated_prompt,
            'created_at': self.created_at,
            'token_estimate': self.token_estimate,
            'instruction_count': len(ids),
        }

176 

177 

class ExecutionPlan:
    """Dependency-aware execution plan for queued instructions.

    Instructions are arranged into *waves*: each wave is a group that can
    be dispatched concurrently, and wave N+1 only starts after every item
    in wave N completes.

    Example for [A, B, C, D, E] where A and B are independent, C depends
    on A, D on B, and E on both C and D:
        waves = [[A, B], [C, D], [E]]
    """

    def __init__(self, waves: List[List[Instruction]], batch_id: str):
        self.waves = waves  # Parallel groups in dependency order
        self.batch_id = batch_id
        self.created_at = datetime.utcnow().isoformat()
        self.total_instructions = sum(len(group) for group in waves)

    def to_dict(self) -> Dict:
        """Serialize for logging/API responses (texts truncated to 100 chars)."""
        wave_summaries = []
        for wave in self.waves:
            wave_summaries.append(
                [{'id': inst.id, 'text': inst.text[:100]} for inst in wave]
            )
        return {
            'batch_id': self.batch_id,
            'waves': wave_summaries,
            'total_instructions': self.total_instructions,
            'wave_count': len(self.waves),
        }

212 

213 

class InstructionQueue:
    """Persistent instruction queue per user.

    Thread-safe. File-backed (JSON). Survives restarts.
    Uses SmartLedger for LLM-based dependency analysis between instructions,
    falling back to simple priority ordering when the ledger is unavailable.

    Concurrency guarantees:
      - _lock (RLock): serializes ALL state mutations (instructions, ledger, file I/O)
      - _drain_lock (Lock): prevents concurrent drains for the same user
      - _save() uses atomic write (temp + rename) to prevent corruption
      - SmartLedger is always accessed inside _lock — no separate ledger lock needed
      - Cross-process safety: best-effort file lock via _QUEUE_DIR/{user}_drain.lock
    """

227 

    def __init__(self, user_id: str):
        """Create (or reload from disk) the persistent queue for *user_id*."""
        self.user_id = user_id
        self._lock = threading.RLock()  # Protects all state mutations
        self._drain_lock = threading.Lock()  # Prevents concurrent drains
        self._queue_path = os.path.join(_QUEUE_DIR, f'{user_id}_queue.json')
        self._drain_lock_path = os.path.join(_QUEUE_DIR, f'{user_id}_drain.lock')
        self._instructions: Dict[str, Instruction] = {}
        self._ledger = None  # Lazy-initialized SmartLedger
        self._task_map: Dict[str, str] = {}  # instruction_id → ledger_task_id
        # Restore any previously persisted instructions immediately
        self._load()

238 

    def acquire_drain_lock(self, timeout: float = 0) -> bool:
        """Acquire exclusive drain lock for this user's queue.

        Prevents concurrent drains (daemon tick + API call + another agent).
        Uses both an in-process threading.Lock AND a filesystem lock file
        for cross-process safety (daemon vs. API server in separate processes).
        The file lock is best-effort: I/O errors never block the drain.

        Args:
            timeout: Seconds to wait. 0 = non-blocking (return False immediately if busy).

        Returns:
            True if lock acquired, False if another drain is in progress.
        """
        # In-process gate first: cheap, and serializes the file handshake below.
        acquired = self._drain_lock.acquire(timeout=timeout) if timeout > 0 else self._drain_lock.acquire(blocking=False)
        if not acquired:
            return False

        # Cross-process file lock: write PID to lock file
        try:
            os.makedirs(os.path.dirname(self._drain_lock_path), exist_ok=True)
            if os.path.exists(self._drain_lock_path):
                try:
                    with open(self._drain_lock_path, 'r') as f:
                        lock_data = json.load(f)
                    lock_pid = lock_data.get('pid', -1)
                    lock_time = lock_data.get('time', 0)
                    # Check if lock holder is still alive. A pid equal to
                    # ours means a leftover file from this process — take it.
                    if lock_pid != os.getpid():
                        try:
                            os.kill(lock_pid, 0)  # Signal 0 = existence check only
                            # Process alive — check staleness (10 min max)
                            if time.time() - lock_time < 600:
                                # Undo the thread lock before bailing out
                                self._drain_lock.release()
                                return False
                            # Stale lock — take it
                        except (OSError, ProcessLookupError):
                            pass  # Dead process — take the lock
                except (json.JSONDecodeError, IOError):
                    pass  # Corrupt lock file — take it

            with open(self._drain_lock_path, 'w') as f:
                json.dump({'pid': os.getpid(), 'time': time.time(),
                           'user_id': self.user_id}, f)
        except IOError:
            pass  # File lock is best-effort — thread lock still held

        return True

286 

287 def release_drain_lock(self): 

288 """Release the drain lock.""" 

289 try: 

290 if os.path.exists(self._drain_lock_path): 

291 os.remove(self._drain_lock_path) 

292 except OSError: 

293 pass 

294 try: 

295 self._drain_lock.release() 

296 except RuntimeError: 

297 pass # Already released 

298 

299 def _get_ledger(self): 

300 """Get or create SmartLedger for this user's instruction queue. 

301 

302 The ledger provides LLM-based dependency analysis between queued 

303 instructions via add_dynamic_task(). Returns None when SmartLedger 

304 is unavailable — caller must fall back to simple priority ordering. 

305 """ 

306 if self._ledger is not None: 

307 return self._ledger 

308 if not _LEDGER_AVAILABLE: 

309 return None 

310 try: 

311 # Dedicated ledger for the instruction queue — separate from 

312 # per-prompt action ledgers in create_recipe.py. 

313 # agent_id = 'iq_{user}' distinguishes this from action ledgers. 

314 self._ledger = SmartLedger( 

315 agent_id=f'iq_{self.user_id}', 

316 session_id=f'{self.user_id}_instruction_queue', 

317 ) 

318 logger.info(f"SmartLedger initialized for instruction queue [{self.user_id}]") 

319 return self._ledger 

320 except Exception as e: 

321 logger.debug(f"SmartLedger unavailable for instruction queue: {e}") 

322 return None 

323 

324 def _register_with_ledger(self, inst: 'Instruction') -> Optional[str]: 

325 """Register an instruction with SmartLedger for dependency analysis. 

326 

327 Calls ledger.add_dynamic_task() which uses LLM to classify the 

328 relationship between this instruction and all existing tasks: 

329 - child/sibling/sequential/conditional/independent 

330 - prerequisites, blockers, execution mode (parallel/sequential) 

331 - delegation needs, scheduling, retry config 

332 

333 Returns the ledger task_id on success, None on failure. 

334 """ 

335 ledger = self._get_ledger() 

336 if ledger is None: 

337 return None 

338 

339 try: 

340 task = ledger.add_dynamic_task( 

341 task_description=inst.text, 

342 context={ 

343 'current_action_id': None, 

344 'previous_outcome': None, 

345 'user_message': inst.text, 

346 'discovered_by': 'instruction_queue', 

347 'instruction_id': inst.id, 

348 'priority': inst.priority, 

349 'tags': inst.tags, 

350 'related_goal_id': inst.related_goal_id, 

351 }, 

352 ) 

353 if task: 

354 task_id = task.task_id 

355 self._task_map[inst.id] = task_id 

356 inst.context['ledger_task_id'] = task_id 

357 logger.info( 

358 f"Instruction [{inst.id}] registered with ledger as [{task_id}]" 

359 ) 

360 return task_id 

361 except Exception as e: 

362 logger.debug(f"Ledger registration failed for [{inst.id}]: {e}") 

363 return None 

364 

365 def _load(self): 

366 """Load queue from disk. Auto-detects encrypted vs plaintext.""" 

367 if not os.path.exists(self._queue_path): 

368 return 

369 try: 

370 try: 

371 from security.crypto import decrypt_json_file 

372 data = decrypt_json_file(self._queue_path) 

373 except ImportError: 

374 with open(self._queue_path, 'r', encoding='utf-8') as f: 

375 data = json.load(f) 

376 if data is None: 

377 return 

378 for d in data.get('instructions', []): 

379 inst = Instruction.from_dict(d) 

380 self._instructions[inst.id] = inst 

381 # Restore task map from persisted context 

382 ltid = inst.context.get('ledger_task_id') 

383 if ltid: 

384 self._task_map[inst.id] = ltid 

385 except (json.JSONDecodeError, IOError, KeyError) as e: 

386 logger.warning(f"Failed to load instruction queue: {e}") 

387 

388 def _save(self): 

389 """Persist queue to disk using atomic write (temp + rename). 

390 

391 Encrypted at rest when HEVOLVE_DATA_KEY is configured. 

392 Prevents corruption if process crashes mid-write: the rename is 

393 atomic on POSIX and near-atomic on Windows (NTFS MoveFileEx). 

394 """ 

395 os.makedirs(os.path.dirname(self._queue_path), exist_ok=True) 

396 data = { 

397 'user_id': self.user_id, 

398 'updated_at': datetime.utcnow().isoformat(), 

399 'instructions': [i.to_dict() for i in self._instructions.values()], 

400 } 

401 tmp_path = self._queue_path + '.tmp' 

402 try: 

403 try: 

404 from security.crypto import encrypt_json_file 

405 encrypt_json_file(tmp_path, data) 

406 except ImportError: 

407 with open(tmp_path, 'w', encoding='utf-8') as f: 

408 json.dump(data, f, indent=2) 

409 f.flush() 

410 os.fsync(f.fileno()) 

411 # Atomic rename (replaces target on POSIX; os.replace on all platforms) 

412 os.replace(tmp_path, self._queue_path) 

413 except IOError as e: 

414 logger.error(f"Failed to save instruction queue: {e}") 

415 # Clean up temp file on failure 

416 try: 

417 os.remove(tmp_path) 

418 except OSError: 

419 pass 

420 

421 def enqueue(self, text: str, priority: int = 5, 

422 tags: Optional[List[str]] = None, 

423 context: Optional[Dict] = None, 

424 related_goal_id: Optional[str] = None) -> Instruction: 

425 """Add an instruction to the queue. 

426 

427 The instruction is persisted to JSON AND registered with SmartLedger 

428 for LLM-based dependency analysis. The ledger classifies how this 

429 instruction relates to all other queued instructions (child, sibling, 

430 sequential, conditional, independent) and sets prerequisites/blockers. 

431 

432 When SmartLedger is unavailable, falls back to simple priority ordering. 

433 """ 

434 with self._lock: 

435 # Deduplicate: if exact same text is already queued, skip 

436 for inst in self._instructions.values(): 

437 if inst.text == text and inst.status == InstructionStatus.QUEUED: 

438 logger.debug(f"Duplicate instruction skipped: {text[:50]}...") 

439 return inst 

440 

441 inst = Instruction( 

442 user_id=self.user_id, 

443 text=text, 

444 priority=priority, 

445 tags=tags, 

446 context=context, 

447 related_goal_id=related_goal_id, 

448 ) 

449 self._instructions[inst.id] = inst 

450 

451 # Register with SmartLedger for LLM dependency analysis. 

452 # This call may invoke the LLM to classify task relationships. 

453 self._register_with_ledger(inst) 

454 

455 self._save() 

456 logger.info(f"Instruction queued [{inst.id}]: {text[:80]}...") 

457 return inst 

458 

459 def get_pending(self) -> List[Instruction]: 

460 """Get all queued (unconsumed) instructions, sorted by priority then time.""" 

461 with self._lock: 

462 pending = [ 

463 i for i in self._instructions.values() 

464 if i.status == InstructionStatus.QUEUED 

465 ] 

466 pending.sort(key=lambda i: (i.priority, i.created_at)) 

467 return pending 

468 

469 def get_all(self) -> List[Instruction]: 

470 """Get all instructions regardless of status.""" 

471 with self._lock: 

472 return list(self._instructions.values()) 

473 

474 def mark_status(self, instruction_id: str, status: InstructionStatus, 

475 result: Optional[str] = None, 

476 error: Optional[str] = None): 

477 """Update instruction status.""" 

478 with self._lock: 

479 inst = self._instructions.get(instruction_id) 

480 if inst: 

481 inst.status = status 

482 inst.updated_at = datetime.utcnow().isoformat() 

483 if result: 

484 inst.result = result 

485 if error: 

486 inst.error = error 

487 self._save() 

488 

489 def cancel(self, instruction_id: str): 

490 """Cancel a queued instruction.""" 

491 self.mark_status(instruction_id, InstructionStatus.CANCELLED) 

492 

    def _get_ledger_ordered_pending(self) -> List['Instruction']:
        """Return pending instructions ordered by the ledger dependency graph.

        Order of precedence in the result:
          1. Parallel-ready tasks (get_parallel_executable_tasks)
          2. Sequentially executable tasks (get_next_executable_task, looped)
          3. Mapped-but-blocked tasks (deps unmet — appended at the end,
             they'll be pulled in a future batch after dependencies finish)
          4. Instructions with no ledger mapping at all

        Returns the simple priority-sorted list when the ledger is
        unavailable or no instruction has a ledger task mapping.
        """
        pending = self.get_pending()
        if not pending:
            return []

        ledger = self._get_ledger()
        if ledger is None or not self._task_map:
            return pending  # Fallback: simple priority order

        # Build reverse map: ledger_task_id → Instruction
        task_to_inst: Dict[str, 'Instruction'] = {}
        unmapped: List['Instruction'] = []
        for inst in pending:
            ltid = self._task_map.get(inst.id)
            if ltid and ltid in ledger.tasks:
                task_to_inst[ltid] = inst
            else:
                # Never registered, or its ledger task has vanished
                unmapped.append(inst)

        if not task_to_inst:
            return pending  # No mapped tasks, use simple ordering

        # Collect executable tasks from ledger in dependency order
        ordered: List['Instruction'] = []
        seen: set = set()

        # First: all parallel-ready tasks (can run concurrently)
        try:
            parallel = ledger.get_parallel_executable_tasks()
            for task in parallel:
                if task.task_id in task_to_inst and task.task_id not in seen:
                    ordered.append(task_to_inst[task.task_id])
                    seen.add(task.task_id)
        except Exception as e:
            logger.debug(f"Ledger parallel query failed: {e}")

        # Then: sequential tasks in dependency order. The ledger may keep
        # returning the same head task until it completes, so the loop is
        # bounded and breaks on the first repeat.
        try:
            max_iter = len(task_to_inst) + 5  # safety bound
            for _ in range(max_iter):
                task = ledger.get_next_executable_task()
                if task is None:
                    break
                if task.task_id in task_to_inst and task.task_id not in seen:
                    ordered.append(task_to_inst[task.task_id])
                    seen.add(task.task_id)
                elif task.task_id in seen:
                    # Already included, skip to avoid infinite loop
                    break
                else:
                    break  # Not an instruction queue task
        except Exception as e:
            logger.debug(f"Ledger sequential query failed: {e}")

        # Append any mapped but not-yet-executable instructions at the end
        # (their deps aren't met — they'll wait for next batch)
        for ltid, inst in task_to_inst.items():
            if ltid not in seen:
                ordered.append(inst)

        # Append unmapped instructions last
        ordered.extend(unmapped)

        logger.info(
            f"Ledger-ordered batch: {len(ordered)} instructions "
            f"({len(seen)} dependency-resolved, {len(unmapped)} unmapped)"
        )
        return ordered

570 

571 def pull_batch(self, max_tokens: int = 8000, 

572 max_instructions: int = 20) -> Optional[InstructionBatch]: 

573 """Pull queued instructions into a consolidated batch. 

574 

575 Uses SmartLedger dependency graph to order instructions correctly: 

576 - Prerequisites are satisfied before dependents 

577 - Parallel-safe tasks are grouped together 

578 - Sequential chains maintain proper order 

579 

580 Falls back to priority+tag ordering when ledger is unavailable. 

581 

582 Args: 

583 max_tokens: Maximum estimated tokens for the batch prompt 

584 max_instructions: Maximum number of instructions in one batch 

585 

586 Returns: 

587 InstructionBatch if there are pending instructions, else None 

588 """ 

589 with self._lock: 

590 # Use ledger-aware ordering when available 

591 pending = self._get_ledger_ordered_pending() 

592 if not pending: 

593 return None 

594 

595 # Select instructions that fit within token budget 

596 selected = [] 

597 total_chars = 0 

598 char_budget = max_tokens * 4 # ~4 chars per token 

599 

600 for inst in pending[:max_instructions]: 

601 inst_chars = len(inst.text) + len(json.dumps(inst.context)) 

602 if total_chars + inst_chars > char_budget and selected: 

603 break # Stop adding — budget exceeded 

604 selected.append(inst) 

605 total_chars += inst_chars 

606 

607 if not selected: 

608 return None 

609 

610 # Generate batch ID 

611 batch_id = hashlib.sha256( 

612 f"batch:{self.user_id}:{time.time()}".encode() 

613 ).hexdigest()[:12] 

614 

615 # Consolidate into a single prompt 

616 prompt = self._consolidate(selected) 

617 

618 # Mark as batched 

619 for inst in selected: 

620 inst.status = InstructionStatus.BATCHED 

621 inst.batch_id = batch_id 

622 inst.updated_at = datetime.utcnow().isoformat() 

623 

624 self._save() 

625 

626 batch = InstructionBatch(batch_id, selected, prompt) 

627 logger.info( 

628 f"Batch [{batch_id}]: {len(selected)} instructions, " 

629 f"~{batch.token_estimate} tokens" 

630 ) 

631 return batch 

632 

633 def pull_execution_plan(self, max_tokens: int = 8000, 

634 max_instructions: int = 20) -> Optional[ExecutionPlan]: 

635 """Pull queued instructions as a dependency-aware execution plan. 

636 

637 Returns an ExecutionPlan with waves of instructions: 

638 - Each wave is a group of independent instructions (dispatch in parallel) 

639 - Waves are ordered by dependency (wave N+1 depends on wave N) 

640 

641 When SmartLedger is unavailable, all instructions go into a single wave 

642 (effectively the same as pull_batch but wrapped in ExecutionPlan). 

643 

644 Args: 

645 max_tokens: Maximum estimated tokens across all waves 

646 max_instructions: Maximum total instructions 

647 

648 Returns: 

649 ExecutionPlan with parallel waves, or None if queue empty 

650 """ 

651 with self._lock: 

652 pending = self.get_pending() 

653 if not pending: 

654 return None 

655 

656 # Budget filter 

657 selected = [] 

658 total_chars = 0 

659 char_budget = max_tokens * 4 

660 for inst in pending[:max_instructions]: 

661 inst_chars = len(inst.text) + len(json.dumps(inst.context)) 

662 if total_chars + inst_chars > char_budget and selected: 

663 break 

664 selected.append(inst) 

665 total_chars += inst_chars 

666 

667 if not selected: 

668 return None 

669 

670 batch_id = hashlib.sha256( 

671 f"plan:{self.user_id}:{time.time()}".encode() 

672 ).hexdigest()[:12] 

673 

674 # Build waves using ledger dependency graph 

675 waves = self._build_waves(selected) 

676 

677 # Mark all as batched 

678 for inst in selected: 

679 inst.status = InstructionStatus.BATCHED 

680 inst.batch_id = batch_id 

681 inst.updated_at = datetime.utcnow().isoformat() 

682 self._save() 

683 

684 plan = ExecutionPlan(waves, batch_id) 

685 logger.info( 

686 f"Execution plan [{batch_id}]: {plan.total_instructions} " 

687 f"instructions in {len(waves)} waves" 

688 ) 

689 return plan 

690 

    def _build_waves(self, instructions: List[Instruction]) -> List[List[Instruction]]:
        """Split instructions into dependency-ordered parallel waves.

        Uses SmartLedger to determine which instructions are independent
        (same wave) vs dependent (later waves):

        Wave 0: all instructions with no prerequisites
        Wave N: instructions whose prerequisites are all in waves < N

        Falls back to a single wave when the ledger is unavailable or no
        instruction maps to a ledger task.
        """
        ledger = self._get_ledger()
        if ledger is None or not self._task_map:
            # No ledger — everything in one wave
            return [instructions]

        # Build maps between instructions and their ledger tasks.
        # NOTE(review): inst_by_id and task_by_inst are populated but not
        # read anywhere in this method — candidates for removal.
        inst_by_id: Dict[str, Instruction] = {i.id: i for i in instructions}
        inst_by_task: Dict[str, Instruction] = {}
        task_by_inst: Dict[str, str] = {}
        unmapped: List[Instruction] = []

        for inst in instructions:
            ltid = self._task_map.get(inst.id)
            if ltid and ltid in ledger.tasks:
                inst_by_task[ltid] = inst
                task_by_inst[inst.id] = ltid
            else:
                unmapped.append(inst)

        if not inst_by_task:
            return [instructions]

        # Collect task IDs that are in our instruction set
        our_task_ids = set(inst_by_task.keys())

        # Build dependency graph restricted to our task set.
        # For each task, find which of OUR tasks it depends on — deps on
        # tasks outside this selection are ignored here.
        deps: Dict[str, set] = {}  # task_id → set of task_ids it depends on
        for tid in our_task_ids:
            task = ledger.tasks.get(tid)
            if task is None:
                deps[tid] = set()
                continue
            task_deps = set()
            # Check prerequisites
            if hasattr(task, 'prerequisites') and task.prerequisites:
                for prereq in task.prerequisites:
                    if prereq in our_task_ids:
                        task_deps.add(prereq)
            # Check blocked_by
            if hasattr(task, 'blocked_by') and task.blocked_by:
                for blocker in task.blocked_by:
                    if blocker in our_task_ids:
                        task_deps.add(blocker)
            # Check parent (child must wait for parent's other children)
            if hasattr(task, 'parent_task_id') and task.parent_task_id:
                if task.parent_task_id in our_task_ids:
                    task_deps.add(task.parent_task_id)
            deps[tid] = task_deps

        # Topological sort into waves (Kahn's algorithm by level)
        waves: List[List[Instruction]] = []
        placed: set = set()
        remaining = set(our_task_ids)

        # Bounded: each pass places at least one task (or dumps the rest).
        max_iterations = len(our_task_ids) + 1
        for _ in range(max_iterations):
            if not remaining:
                break
            # Find tasks whose deps are all placed
            wave_tasks = [
                tid for tid in remaining
                if deps[tid].issubset(placed)
            ]
            if not wave_tasks:
                # Circular dependency — dump remaining into last wave
                wave_tasks = list(remaining)
            wave = [inst_by_task[tid] for tid in wave_tasks if tid in inst_by_task]
            if wave:
                waves.append(wave)
            placed.update(wave_tasks)
            remaining -= set(wave_tasks)

        # Unmapped instructions go in the first wave (no known deps)
        if unmapped:
            if waves:
                waves[0] = unmapped + waves[0]
            else:
                waves = [unmapped]

        return waves if waves else [instructions]

784 

785 def complete_instruction(self, instruction_id: str, result: Optional[str] = None): 

786 """Mark a single instruction as done and notify ledger. 

787 

788 Used by parallel dispatch to complete individual instructions 

789 (vs complete_batch which completes all at once). 

790 """ 

791 with self._lock: 

792 inst = self._instructions.get(instruction_id) 

793 if not inst: 

794 return 

795 inst.status = InstructionStatus.DONE 

796 inst.updated_at = datetime.utcnow().isoformat() 

797 if result: 

798 inst.result = result 

799 # Notify ledger — unblocks dependents 

800 ledger = self._get_ledger() 

801 if ledger is not None: 

802 ltid = self._task_map.get(instruction_id) 

803 if ltid: 

804 try: 

805 ledger.complete_task_and_route( 

806 ltid, outcome='success', result=result, 

807 ) 

808 except Exception as e: 

809 logger.debug(f"Ledger completion failed for [{ltid}]: {e}") 

810 self._save() 

811 

812 def fail_instruction(self, instruction_id: str, error: str): 

813 """Mark a single instruction as failed and return to queue.""" 

814 with self._lock: 

815 inst = self._instructions.get(instruction_id) 

816 if not inst: 

817 return 

818 inst.status = InstructionStatus.QUEUED 

819 inst.batch_id = None 

820 inst.error = error 

821 inst.updated_at = datetime.utcnow().isoformat() 

822 ledger = self._get_ledger() 

823 if ledger is not None: 

824 ltid = self._task_map.get(instruction_id) 

825 if ltid: 

826 try: 

827 ledger.complete_task_and_route( 

828 ltid, outcome='failure', result=error, 

829 ) 

830 except Exception as e: 

831 logger.debug(f"Ledger failure routing for [{ltid}]: {e}") 

832 self._save() 

833 

    def _consolidate(self, instructions: List[Instruction]) -> str:
        """Consolidate multiple instructions into a single prompt.

        Single instruction: its text, optionally preceded by its context.
        Multiple instructions: a numbered listing grouped by tag set, with
        a processing preamble; numbering runs continuously across groups.
        """
        if len(instructions) == 1:
            inst = instructions[0]
            prompt = inst.text
            if inst.context:
                ctx_str = json.dumps(inst.context, indent=2)
                prompt = f"Context:\n{ctx_str}\n\nInstruction:\n{inst.text}"
            return prompt

        # Multiple instructions → structured batch
        lines = [
            f"You have {len(instructions)} queued instructions to execute.",
            "Process them in the order listed. Each instruction may depend on previous ones.",
            "",
        ]

        # Group by tags — the sorted, comma-joined tag list is the key,
        # so instructions with the same tag set land in one group.
        tagged: Dict[str, List[Instruction]] = {}
        untagged: List[Instruction] = []
        for inst in instructions:
            if inst.tags:
                key = ', '.join(sorted(inst.tags))
                tagged.setdefault(key, []).append(inst)
            else:
                untagged.append(inst)

        idx = 1  # Continuous numbering across all groups
        if tagged:
            for tag_group, group_insts in tagged.items():
                lines.append(f"## Group: {tag_group}")
                for inst in group_insts:
                    lines.append(f"{idx}. [P{inst.priority}] {inst.text}")
                    if inst.context:
                        lines.append(f" Context: {json.dumps(inst.context)}")
                    idx += 1
                lines.append("")

        if untagged:
            if tagged:
                lines.append("## Other Instructions")
            for inst in untagged:
                lines.append(f"{idx}. [P{inst.priority}] {inst.text}")
                if inst.context:
                    lines.append(f" Context: {json.dumps(inst.context)}")
                idx += 1

        return '\n'.join(lines)

885 

886 def complete_batch(self, batch_id: str, result: Optional[str] = None): 

887 """Mark all instructions in a batch as done. 

888 

889 Also calls ledger.complete_task_and_route() for each instruction 

890 so the SmartLedger can unblock dependent tasks and activate 

891 conditional/sequential follow-ups. 

892 """ 

893 with self._lock: 

894 ledger = self._get_ledger() 

895 for inst in self._instructions.values(): 

896 if inst.batch_id == batch_id: 

897 inst.status = InstructionStatus.DONE 

898 inst.updated_at = datetime.utcnow().isoformat() 

899 if result: 

900 inst.result = result 

901 # Notify ledger — unblocks dependents 

902 if ledger is not None: 

903 ltid = self._task_map.get(inst.id) 

904 if ltid: 

905 try: 

906 ledger.complete_task_and_route( 

907 ltid, outcome='success', result=result, 

908 ) 

909 except Exception as e: 

910 logger.debug(f"Ledger completion failed for [{ltid}]: {e}") 

911 self._save() 

912 

913 def fail_batch(self, batch_id: str, error: str): 

914 """Mark batch instructions as failed — they return to QUEUED for retry. 

915 

916 Also notifies the ledger so dependent tasks remain blocked 

917 until the retry succeeds. 

918 """ 

919 with self._lock: 

920 ledger = self._get_ledger() 

921 for inst in self._instructions.values(): 

922 if inst.batch_id == batch_id and inst.status == InstructionStatus.BATCHED: 

923 inst.status = InstructionStatus.QUEUED 

924 inst.batch_id = None 

925 inst.error = error 

926 inst.updated_at = datetime.utcnow().isoformat() 

927 # Notify ledger of failure 

928 if ledger is not None: 

929 ltid = self._task_map.get(inst.id) 

930 if ltid: 

931 try: 

932 ledger.complete_task_and_route( 

933 ltid, outcome='failure', result=error, 

934 ) 

935 except Exception as e: 

936 logger.debug(f"Ledger failure routing failed for [{ltid}]: {e}") 

937 self._save() 

938 

939 def stats(self) -> Dict: 

940 """Queue statistics.""" 

941 with self._lock: 

942 statuses = {} 

943 for inst in self._instructions.values(): 

944 s = inst.status.value if isinstance(inst.status, InstructionStatus) else inst.status 

945 statuses[s] = statuses.get(s, 0) + 1 

946 return { 

947 'user_id': self.user_id, 

948 'total': len(self._instructions), 

949 'by_status': statuses, 

950 'pending': statuses.get('queued', 0), 

951 } 

952 

953 def clear_done(self): 

954 """Remove completed/cancelled instructions to prevent unbounded growth.""" 

955 with self._lock: 

956 to_remove = [ 

957 iid for iid, inst in self._instructions.items() 

958 if inst.status in (InstructionStatus.DONE, InstructionStatus.CANCELLED) 

959 ] 

960 for iid in to_remove: 

961 del self._instructions[iid] 

962 if to_remove: 

963 self._save() 

964 return len(to_remove) 

965 

966 

# ═══════════════════════════════════════════════════════════════════════
# Singleton registry — one queue per user
# ═══════════════════════════════════════════════════════════════════════

# Process-wide cache of per-user queues, guarded by _queue_lock so that
# concurrent get_queue() calls never construct two queues for one user.
_queues: Dict[str, InstructionQueue] = {}
_queue_lock = threading.Lock()

973 

974 

def get_queue(user_id: str) -> InstructionQueue:
    """Return the user's singleton queue, constructing it on first access."""
    with _queue_lock:
        queue = _queues.get(user_id)
        if queue is None:
            queue = InstructionQueue(user_id)
            _queues[user_id] = queue
        return queue

981 

982 

def enqueue_instruction(user_id: str, text: str, **kwargs) -> Instruction:
    """Convenience: enqueue an instruction for a user."""
    queue = get_queue(user_id)
    return queue.enqueue(text, **kwargs)

986 

987 

def pull_user_batch(user_id: str, max_tokens: int = 8000) -> Optional[InstructionBatch]:
    """Convenience: pull a batch for a user."""
    queue = get_queue(user_id)
    return queue.pull_batch(max_tokens=max_tokens)

991 

992 

def get_all_pending() -> Dict[str, List[Dict]]:
    """Get pending instructions across all users.

    Scans the queue directory for ``{user_id}_queue.json`` files and
    returns a mapping of user_id -> list of pending-instruction dicts.
    Users with no pending instructions are omitted.
    """
    suffix = '_queue.json'
    result: Dict[str, List[Dict]] = {}
    # Scan queue directory for all users
    if os.path.isdir(_QUEUE_DIR):
        for fname in os.listdir(_QUEUE_DIR):
            if not fname.endswith(suffix):
                continue
            # Strip only the trailing suffix. str.replace() would also
            # remove the substring from the middle of a user id that
            # happened to contain '_queue.json'.
            uid = fname[:-len(suffix)]
            pending = get_queue(uid).get_pending()
            if pending:
                result[uid] = [i.to_dict() for i in pending]
    return result