Coverage for integrations / agent_engine / instruction_queue.py: 88.5%
489 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Instruction Queue — Never miss a user instruction.
4Every user instruction is immediately queued and registered with SmartLedger
5for LLM-based dependency analysis. When compute becomes available,
6instructions are pulled in proper dependency order (not just priority),
7correlated with existing context, and executed — individually or as a batch.
9SmartLedger integration:
10 enqueue() → ledger.add_dynamic_task() (LLM classifies deps)
11 pull_batch() → ledger.get_next/parallel() (respects dep graph)
12 complete_batch() → ledger.complete_task_and_route() (unblocks dependents)
14Falls back to simple priority ordering when SmartLedger is unavailable
15(e.g. no LLM reachable, agent_ledger package not installed).
17Batch mode: If multiple instructions queue up before compute arrives,
18they are consolidated into a single prompt that fits the context window.
19Related instructions are grouped, duplicates deduplicated, dependencies
20ordered by the ledger's task graph.
22Architecture:
23 User says "do X" → enqueue_instruction(user_id, "do X")
24 │
25 ├─ Stored in agent_data/instructions/{user_id}_queue.json
26 ├─ Registered with SmartLedger via add_dynamic_task():
27 │ ├─ LLM classifies relationship to existing tasks
28 │ ├─ Sets prerequisites, blockers, execution mode
29 │ └─ Determines parallel vs sequential ordering
30 ├─ Correlated with:
31 │ ├─ Conversation state (recent messages)
32 │ ├─ Related queued instructions (semantic grouping)
33 │ ├─ Active goals (avoid duplication)
34 │ └─ Ledger tasks (dependency tracking)
35 │
36 └─ When compute arrives:
37 ├─ pull_batch(user_id, max_tokens=...)
38 │ └─ Uses ledger.get_next_executable_task() for dep-aware ordering
39 ├─ Execute via /chat or dispatch.py
40 └─ complete_batch() → ledger.complete_task_and_route()
42Integration points:
43 - hart_intelligence /chat endpoint: auto-enqueue if compute busy
44 - hart_cli.py: `hart -p "task"` enqueues if server busy
45 - dispatch.py: pull_batch when idle compute detected
46 - agent_daemon.py: periodic batch check on tick
47"""
49import json
50import logging
51import os
52import threading
53import time
54import hashlib
55from datetime import datetime
56from enum import Enum
57from typing import Any, Dict, List, Optional, Tuple
59logger = logging.getLogger('hevolve.instruction_queue')
61# SmartLedger — optional, graceful fallback when unavailable
62try:
63 from agent_ledger import SmartLedger
64 _LEDGER_AVAILABLE = True
65except ImportError:
66 SmartLedger = None
67 _LEDGER_AVAILABLE = False
69_QUEUE_DIR = os.path.join(
70 os.environ.get('HART_INSTALL_DIR',
71 os.path.dirname(os.path.dirname(os.path.dirname(
72 os.path.abspath(__file__))))),
73 'agent_data', 'instructions',
74)
77class InstructionStatus(str, Enum):
78 QUEUED = 'queued' # Waiting for compute
79 BATCHED = 'batched' # Included in a batch, not yet executed
80 IN_PROGRESS = 'in_progress' # Currently being executed
81 DONE = 'done' # Successfully completed
82 FAILED = 'failed' # Execution failed
83 CANCELLED = 'cancelled' # User cancelled
86class Instruction:
87 """A single user instruction with metadata."""
89 __slots__ = (
90 'id', 'user_id', 'text', 'status', 'created_at',
91 'updated_at', 'priority', 'tags', 'context',
92 'related_goal_id', 'batch_id', 'result', 'error',
93 )
95 def __init__(self, user_id: str, text: str, priority: int = 5,
96 tags: Optional[List[str]] = None,
97 context: Optional[Dict] = None,
98 related_goal_id: Optional[str] = None):
99 self.id = hashlib.sha256(
100 f"{user_id}:{text}:{time.time()}".encode()
101 ).hexdigest()[:16]
102 self.user_id = user_id
103 self.text = text
104 self.status = InstructionStatus.QUEUED
105 self.created_at = datetime.utcnow().isoformat()
106 self.updated_at = self.created_at
107 self.priority = priority # 1=highest, 10=lowest
108 self.tags = tags or []
109 self.context = context or {}
110 self.related_goal_id = related_goal_id
111 self.batch_id = None
112 self.result = None
113 self.error = None
115 def to_dict(self) -> Dict:
116 return {
117 'id': self.id,
118 'user_id': self.user_id,
119 'text': self.text,
120 'status': self.status.value if isinstance(self.status, InstructionStatus) else self.status,
121 'created_at': self.created_at,
122 'updated_at': self.updated_at,
123 'priority': self.priority,
124 'tags': self.tags,
125 'context': self.context,
126 'related_goal_id': self.related_goal_id,
127 'batch_id': self.batch_id,
128 'result': self.result,
129 'error': self.error,
130 }
132 @classmethod
133 def from_dict(cls, d: Dict) -> 'Instruction':
134 inst = cls.__new__(cls)
135 inst.id = d['id']
136 inst.user_id = d['user_id']
137 inst.text = d['text']
138 inst.status = InstructionStatus(d.get('status', 'queued'))
139 inst.created_at = d.get('created_at', '')
140 inst.updated_at = d.get('updated_at', '')
141 inst.priority = d.get('priority', 5)
142 inst.tags = d.get('tags', [])
143 inst.context = d.get('context', {})
144 inst.related_goal_id = d.get('related_goal_id')
145 inst.batch_id = d.get('batch_id')
146 inst.result = d.get('result')
147 inst.error = d.get('error')
148 return inst
151class InstructionBatch:
152 """A consolidated batch of related instructions."""
154 def __init__(self, batch_id: str, instructions: List[Instruction],
155 consolidated_prompt: str):
156 self.batch_id = batch_id
157 self.instructions = instructions
158 self.consolidated_prompt = consolidated_prompt
159 self.created_at = datetime.utcnow().isoformat()
160 self.token_estimate = self._estimate_tokens(consolidated_prompt)
162 @staticmethod
163 def _estimate_tokens(text: str) -> int:
164 """Rough token estimate (~4 chars per token for English)."""
165 return len(text) // 4
167 def to_dict(self) -> Dict:
168 return {
169 'batch_id': self.batch_id,
170 'instruction_ids': [i.id for i in self.instructions],
171 'consolidated_prompt': self.consolidated_prompt,
172 'created_at': self.created_at,
173 'token_estimate': self.token_estimate,
174 'instruction_count': len(self.instructions),
175 }
178class ExecutionPlan:
179 """Dependency-aware execution plan for queued instructions.
181 Splits instructions into:
182 - parallel_groups: list of groups that can run concurrently within each group
183 - sequential_chain: ordered list where each depends on the previous
185 Dispatcher executes all items in a parallel group concurrently,
186 waits for completion, then moves to the next group.
188 Example for instructions: [A, B, C, D, E]
189 If A,B are independent and C depends on A, D depends on B, E depends on C+D:
190 waves = [[A, B], [C, D], [E]]
191 Wave 1: dispatch A and B in parallel
192 Wave 2: after A+B complete, dispatch C and D in parallel
193 Wave 3: after C+D complete, dispatch E
194 """
196 def __init__(self, waves: List[List[Instruction]], batch_id: str):
197 self.waves = waves # List of parallel groups in dependency order
198 self.batch_id = batch_id
199 self.created_at = datetime.utcnow().isoformat()
200 self.total_instructions = sum(len(w) for w in waves)
202 def to_dict(self) -> Dict:
203 return {
204 'batch_id': self.batch_id,
205 'waves': [
206 [{'id': i.id, 'text': i.text[:100]} for i in wave]
207 for wave in self.waves
208 ],
209 'total_instructions': self.total_instructions,
210 'wave_count': len(self.waves),
211 }
214class InstructionQueue:
215 """Persistent instruction queue per user.
217 Thread-safe. File-backed (JSON). Survives restarts.
218 Uses SmartLedger for LLM-based dependency analysis between instructions.
220 Concurrency guarantees:
221 - _lock (RLock): serializes ALL state mutations (instructions, ledger, file I/O)
222 - _drain_lock (Lock): prevents concurrent drains for same user
223 - _save() uses atomic write (temp + rename) to prevent corruption
224 - SmartLedger is always accessed inside _lock — no separate ledger lock needed
225 - Cross-process safety: file lock via _QUEUE_DIR/{user}_drain.lock
226 """
228 def __init__(self, user_id: str):
229 self.user_id = user_id
230 self._lock = threading.RLock() # Protects all state mutations
231 self._drain_lock = threading.Lock() # Prevents concurrent drains
232 self._queue_path = os.path.join(_QUEUE_DIR, f'{user_id}_queue.json')
233 self._drain_lock_path = os.path.join(_QUEUE_DIR, f'{user_id}_drain.lock')
234 self._instructions: Dict[str, Instruction] = {}
235 self._ledger = None # Lazy-initialized SmartLedger
236 self._task_map: Dict[str, str] = {} # instruction_id → ledger_task_id
237 self._load()
239 def acquire_drain_lock(self, timeout: float = 0) -> bool:
240 """Acquire exclusive drain lock for this user's queue.
242 Prevents concurrent drains (daemon tick + API call + another agent).
243 Uses both an in-process threading.Lock AND a filesystem lock file
244 for cross-process safety (daemon vs. API server in separate processes).
246 Args:
247 timeout: Seconds to wait. 0 = non-blocking (return False immediately if busy).
249 Returns:
250 True if lock acquired, False if another drain is in progress.
251 """
252 acquired = self._drain_lock.acquire(timeout=timeout) if timeout > 0 else self._drain_lock.acquire(blocking=False)
253 if not acquired:
254 return False
256 # Cross-process file lock: write PID to lock file
257 try:
258 os.makedirs(os.path.dirname(self._drain_lock_path), exist_ok=True)
259 if os.path.exists(self._drain_lock_path):
260 try:
261 with open(self._drain_lock_path, 'r') as f:
262 lock_data = json.load(f)
263 lock_pid = lock_data.get('pid', -1)
264 lock_time = lock_data.get('time', 0)
265 # Check if lock holder is still alive
266 if lock_pid != os.getpid():
267 try:
268 os.kill(lock_pid, 0) # Check if process exists
269 # Process alive — check staleness (10 min max)
270 if time.time() - lock_time < 600:
271 self._drain_lock.release()
272 return False
273 # Stale lock — take it
274 except (OSError, ProcessLookupError):
275 pass # Dead process — take the lock
276 except (json.JSONDecodeError, IOError):
277 pass # Corrupt lock file — take it
279 with open(self._drain_lock_path, 'w') as f:
280 json.dump({'pid': os.getpid(), 'time': time.time(),
281 'user_id': self.user_id}, f)
282 except IOError:
283 pass # File lock is best-effort — thread lock still held
285 return True
287 def release_drain_lock(self):
288 """Release the drain lock."""
289 try:
290 if os.path.exists(self._drain_lock_path):
291 os.remove(self._drain_lock_path)
292 except OSError:
293 pass
294 try:
295 self._drain_lock.release()
296 except RuntimeError:
297 pass # Already released
299 def _get_ledger(self):
300 """Get or create SmartLedger for this user's instruction queue.
302 The ledger provides LLM-based dependency analysis between queued
303 instructions via add_dynamic_task(). Returns None when SmartLedger
304 is unavailable — caller must fall back to simple priority ordering.
305 """
306 if self._ledger is not None:
307 return self._ledger
308 if not _LEDGER_AVAILABLE:
309 return None
310 try:
311 # Dedicated ledger for the instruction queue — separate from
312 # per-prompt action ledgers in create_recipe.py.
313 # agent_id = 'iq_{user}' distinguishes this from action ledgers.
314 self._ledger = SmartLedger(
315 agent_id=f'iq_{self.user_id}',
316 session_id=f'{self.user_id}_instruction_queue',
317 )
318 logger.info(f"SmartLedger initialized for instruction queue [{self.user_id}]")
319 return self._ledger
320 except Exception as e:
321 logger.debug(f"SmartLedger unavailable for instruction queue: {e}")
322 return None
324 def _register_with_ledger(self, inst: 'Instruction') -> Optional[str]:
325 """Register an instruction with SmartLedger for dependency analysis.
327 Calls ledger.add_dynamic_task() which uses LLM to classify the
328 relationship between this instruction and all existing tasks:
329 - child/sibling/sequential/conditional/independent
330 - prerequisites, blockers, execution mode (parallel/sequential)
331 - delegation needs, scheduling, retry config
333 Returns the ledger task_id on success, None on failure.
334 """
335 ledger = self._get_ledger()
336 if ledger is None:
337 return None
339 try:
340 task = ledger.add_dynamic_task(
341 task_description=inst.text,
342 context={
343 'current_action_id': None,
344 'previous_outcome': None,
345 'user_message': inst.text,
346 'discovered_by': 'instruction_queue',
347 'instruction_id': inst.id,
348 'priority': inst.priority,
349 'tags': inst.tags,
350 'related_goal_id': inst.related_goal_id,
351 },
352 )
353 if task:
354 task_id = task.task_id
355 self._task_map[inst.id] = task_id
356 inst.context['ledger_task_id'] = task_id
357 logger.info(
358 f"Instruction [{inst.id}] registered with ledger as [{task_id}]"
359 )
360 return task_id
361 except Exception as e:
362 logger.debug(f"Ledger registration failed for [{inst.id}]: {e}")
363 return None
365 def _load(self):
366 """Load queue from disk. Auto-detects encrypted vs plaintext."""
367 if not os.path.exists(self._queue_path):
368 return
369 try:
370 try:
371 from security.crypto import decrypt_json_file
372 data = decrypt_json_file(self._queue_path)
373 except ImportError:
374 with open(self._queue_path, 'r', encoding='utf-8') as f:
375 data = json.load(f)
376 if data is None:
377 return
378 for d in data.get('instructions', []):
379 inst = Instruction.from_dict(d)
380 self._instructions[inst.id] = inst
381 # Restore task map from persisted context
382 ltid = inst.context.get('ledger_task_id')
383 if ltid:
384 self._task_map[inst.id] = ltid
385 except (json.JSONDecodeError, IOError, KeyError) as e:
386 logger.warning(f"Failed to load instruction queue: {e}")
388 def _save(self):
389 """Persist queue to disk using atomic write (temp + rename).
391 Encrypted at rest when HEVOLVE_DATA_KEY is configured.
392 Prevents corruption if process crashes mid-write: the rename is
393 atomic on POSIX and near-atomic on Windows (NTFS MoveFileEx).
394 """
395 os.makedirs(os.path.dirname(self._queue_path), exist_ok=True)
396 data = {
397 'user_id': self.user_id,
398 'updated_at': datetime.utcnow().isoformat(),
399 'instructions': [i.to_dict() for i in self._instructions.values()],
400 }
401 tmp_path = self._queue_path + '.tmp'
402 try:
403 try:
404 from security.crypto import encrypt_json_file
405 encrypt_json_file(tmp_path, data)
406 except ImportError:
407 with open(tmp_path, 'w', encoding='utf-8') as f:
408 json.dump(data, f, indent=2)
409 f.flush()
410 os.fsync(f.fileno())
411 # Atomic rename (replaces target on POSIX; os.replace on all platforms)
412 os.replace(tmp_path, self._queue_path)
413 except IOError as e:
414 logger.error(f"Failed to save instruction queue: {e}")
415 # Clean up temp file on failure
416 try:
417 os.remove(tmp_path)
418 except OSError:
419 pass
421 def enqueue(self, text: str, priority: int = 5,
422 tags: Optional[List[str]] = None,
423 context: Optional[Dict] = None,
424 related_goal_id: Optional[str] = None) -> Instruction:
425 """Add an instruction to the queue.
427 The instruction is persisted to JSON AND registered with SmartLedger
428 for LLM-based dependency analysis. The ledger classifies how this
429 instruction relates to all other queued instructions (child, sibling,
430 sequential, conditional, independent) and sets prerequisites/blockers.
432 When SmartLedger is unavailable, falls back to simple priority ordering.
433 """
434 with self._lock:
435 # Deduplicate: if exact same text is already queued, skip
436 for inst in self._instructions.values():
437 if inst.text == text and inst.status == InstructionStatus.QUEUED:
438 logger.debug(f"Duplicate instruction skipped: {text[:50]}...")
439 return inst
441 inst = Instruction(
442 user_id=self.user_id,
443 text=text,
444 priority=priority,
445 tags=tags,
446 context=context,
447 related_goal_id=related_goal_id,
448 )
449 self._instructions[inst.id] = inst
451 # Register with SmartLedger for LLM dependency analysis.
452 # This call may invoke the LLM to classify task relationships.
453 self._register_with_ledger(inst)
455 self._save()
456 logger.info(f"Instruction queued [{inst.id}]: {text[:80]}...")
457 return inst
459 def get_pending(self) -> List[Instruction]:
460 """Get all queued (unconsumed) instructions, sorted by priority then time."""
461 with self._lock:
462 pending = [
463 i for i in self._instructions.values()
464 if i.status == InstructionStatus.QUEUED
465 ]
466 pending.sort(key=lambda i: (i.priority, i.created_at))
467 return pending
469 def get_all(self) -> List[Instruction]:
470 """Get all instructions regardless of status."""
471 with self._lock:
472 return list(self._instructions.values())
474 def mark_status(self, instruction_id: str, status: InstructionStatus,
475 result: Optional[str] = None,
476 error: Optional[str] = None):
477 """Update instruction status."""
478 with self._lock:
479 inst = self._instructions.get(instruction_id)
480 if inst:
481 inst.status = status
482 inst.updated_at = datetime.utcnow().isoformat()
483 if result:
484 inst.result = result
485 if error:
486 inst.error = error
487 self._save()
489 def cancel(self, instruction_id: str):
490 """Cancel a queued instruction."""
491 self.mark_status(instruction_id, InstructionStatus.CANCELLED)
493 def _get_ledger_ordered_pending(self) -> List['Instruction']:
494 """Get pending instructions ordered by SmartLedger dependency graph.
496 Uses get_next_executable_task() and get_parallel_executable_tasks()
497 to build a dependency-aware execution order. Tasks whose
498 prerequisites aren't met yet are excluded (they'll be pulled in
499 a future batch after their dependencies complete).
501 Returns simple priority-sorted list when ledger is unavailable.
502 """
503 pending = self.get_pending()
504 if not pending:
505 return []
507 ledger = self._get_ledger()
508 if ledger is None or not self._task_map:
509 return pending # Fallback: simple priority order
511 # Build reverse map: ledger_task_id → Instruction
512 task_to_inst: Dict[str, 'Instruction'] = {}
513 unmapped: List['Instruction'] = []
514 for inst in pending:
515 ltid = self._task_map.get(inst.id)
516 if ltid and ltid in ledger.tasks:
517 task_to_inst[ltid] = inst
518 else:
519 unmapped.append(inst)
521 if not task_to_inst:
522 return pending # No mapped tasks, use simple ordering
524 # Collect executable tasks from ledger in dependency order
525 ordered: List['Instruction'] = []
526 seen: set = set()
528 # First: all parallel-ready tasks (can run concurrently)
529 try:
530 parallel = ledger.get_parallel_executable_tasks()
531 for task in parallel:
532 if task.task_id in task_to_inst and task.task_id not in seen:
533 ordered.append(task_to_inst[task.task_id])
534 seen.add(task.task_id)
535 except Exception as e:
536 logger.debug(f"Ledger parallel query failed: {e}")
538 # Then: sequential tasks in dependency order
539 try:
540 max_iter = len(task_to_inst) + 5 # safety bound
541 for _ in range(max_iter):
542 task = ledger.get_next_executable_task()
543 if task is None:
544 break
545 if task.task_id in task_to_inst and task.task_id not in seen:
546 ordered.append(task_to_inst[task.task_id])
547 seen.add(task.task_id)
548 elif task.task_id in seen:
549 # Already included, skip to avoid infinite loop
550 break
551 else:
552 break # Not an instruction queue task
553 except Exception as e:
554 logger.debug(f"Ledger sequential query failed: {e}")
556 # Append any mapped but not-yet-executable instructions at the end
557 # (their deps aren't met — they'll wait for next batch)
558 for ltid, inst in task_to_inst.items():
559 if ltid not in seen:
560 ordered.append(inst)
562 # Append unmapped instructions last
563 ordered.extend(unmapped)
565 logger.info(
566 f"Ledger-ordered batch: {len(ordered)} instructions "
567 f"({len(seen)} dependency-resolved, {len(unmapped)} unmapped)"
568 )
569 return ordered
571 def pull_batch(self, max_tokens: int = 8000,
572 max_instructions: int = 20) -> Optional[InstructionBatch]:
573 """Pull queued instructions into a consolidated batch.
575 Uses SmartLedger dependency graph to order instructions correctly:
576 - Prerequisites are satisfied before dependents
577 - Parallel-safe tasks are grouped together
578 - Sequential chains maintain proper order
580 Falls back to priority+tag ordering when ledger is unavailable.
582 Args:
583 max_tokens: Maximum estimated tokens for the batch prompt
584 max_instructions: Maximum number of instructions in one batch
586 Returns:
587 InstructionBatch if there are pending instructions, else None
588 """
589 with self._lock:
590 # Use ledger-aware ordering when available
591 pending = self._get_ledger_ordered_pending()
592 if not pending:
593 return None
595 # Select instructions that fit within token budget
596 selected = []
597 total_chars = 0
598 char_budget = max_tokens * 4 # ~4 chars per token
600 for inst in pending[:max_instructions]:
601 inst_chars = len(inst.text) + len(json.dumps(inst.context))
602 if total_chars + inst_chars > char_budget and selected:
603 break # Stop adding — budget exceeded
604 selected.append(inst)
605 total_chars += inst_chars
607 if not selected:
608 return None
610 # Generate batch ID
611 batch_id = hashlib.sha256(
612 f"batch:{self.user_id}:{time.time()}".encode()
613 ).hexdigest()[:12]
615 # Consolidate into a single prompt
616 prompt = self._consolidate(selected)
618 # Mark as batched
619 for inst in selected:
620 inst.status = InstructionStatus.BATCHED
621 inst.batch_id = batch_id
622 inst.updated_at = datetime.utcnow().isoformat()
624 self._save()
626 batch = InstructionBatch(batch_id, selected, prompt)
627 logger.info(
628 f"Batch [{batch_id}]: {len(selected)} instructions, "
629 f"~{batch.token_estimate} tokens"
630 )
631 return batch
633 def pull_execution_plan(self, max_tokens: int = 8000,
634 max_instructions: int = 20) -> Optional[ExecutionPlan]:
635 """Pull queued instructions as a dependency-aware execution plan.
637 Returns an ExecutionPlan with waves of instructions:
638 - Each wave is a group of independent instructions (dispatch in parallel)
639 - Waves are ordered by dependency (wave N+1 depends on wave N)
641 When SmartLedger is unavailable, all instructions go into a single wave
642 (effectively the same as pull_batch but wrapped in ExecutionPlan).
644 Args:
645 max_tokens: Maximum estimated tokens across all waves
646 max_instructions: Maximum total instructions
648 Returns:
649 ExecutionPlan with parallel waves, or None if queue empty
650 """
651 with self._lock:
652 pending = self.get_pending()
653 if not pending:
654 return None
656 # Budget filter
657 selected = []
658 total_chars = 0
659 char_budget = max_tokens * 4
660 for inst in pending[:max_instructions]:
661 inst_chars = len(inst.text) + len(json.dumps(inst.context))
662 if total_chars + inst_chars > char_budget and selected:
663 break
664 selected.append(inst)
665 total_chars += inst_chars
667 if not selected:
668 return None
670 batch_id = hashlib.sha256(
671 f"plan:{self.user_id}:{time.time()}".encode()
672 ).hexdigest()[:12]
674 # Build waves using ledger dependency graph
675 waves = self._build_waves(selected)
677 # Mark all as batched
678 for inst in selected:
679 inst.status = InstructionStatus.BATCHED
680 inst.batch_id = batch_id
681 inst.updated_at = datetime.utcnow().isoformat()
682 self._save()
684 plan = ExecutionPlan(waves, batch_id)
685 logger.info(
686 f"Execution plan [{batch_id}]: {plan.total_instructions} "
687 f"instructions in {len(waves)} waves"
688 )
689 return plan
691 def _build_waves(self, instructions: List[Instruction]) -> List[List[Instruction]]:
692 """Split instructions into dependency-ordered parallel waves.
694 Uses SmartLedger to determine which instructions are independent
695 (can run in same wave) vs dependent (must run in later waves).
697 Wave 0: all instructions with no prerequisites
698 Wave 1: instructions whose prerequisites are all in wave 0
699 Wave N: instructions whose prerequisites are all in waves < N
701 Falls back to single wave when ledger is unavailable.
702 """
703 ledger = self._get_ledger()
704 if ledger is None or not self._task_map:
705 # No ledger — everything in one wave
706 return [instructions]
708 # Build maps
709 inst_by_id: Dict[str, Instruction] = {i.id: i for i in instructions}
710 inst_by_task: Dict[str, Instruction] = {}
711 task_by_inst: Dict[str, str] = {}
712 unmapped: List[Instruction] = []
714 for inst in instructions:
715 ltid = self._task_map.get(inst.id)
716 if ltid and ltid in ledger.tasks:
717 inst_by_task[ltid] = inst
718 task_by_inst[inst.id] = ltid
719 else:
720 unmapped.append(inst)
722 if not inst_by_task:
723 return [instructions]
725 # Collect task IDs that are in our instruction set
726 our_task_ids = set(inst_by_task.keys())
728 # Build dependency graph restricted to our task set
729 # For each task, find which of OUR tasks it depends on
730 deps: Dict[str, set] = {} # task_id → set of task_ids it depends on
731 for tid in our_task_ids:
732 task = ledger.tasks.get(tid)
733 if task is None:
734 deps[tid] = set()
735 continue
736 task_deps = set()
737 # Check prerequisites
738 if hasattr(task, 'prerequisites') and task.prerequisites:
739 for prereq in task.prerequisites:
740 if prereq in our_task_ids:
741 task_deps.add(prereq)
742 # Check blocked_by
743 if hasattr(task, 'blocked_by') and task.blocked_by:
744 for blocker in task.blocked_by:
745 if blocker in our_task_ids:
746 task_deps.add(blocker)
747 # Check parent (child must wait for parent's other children)
748 if hasattr(task, 'parent_task_id') and task.parent_task_id:
749 if task.parent_task_id in our_task_ids:
750 task_deps.add(task.parent_task_id)
751 deps[tid] = task_deps
753 # Topological sort into waves (Kahn's algorithm by level)
754 waves: List[List[Instruction]] = []
755 placed: set = set()
756 remaining = set(our_task_ids)
758 max_iterations = len(our_task_ids) + 1
759 for _ in range(max_iterations):
760 if not remaining:
761 break
762 # Find tasks whose deps are all placed
763 wave_tasks = [
764 tid for tid in remaining
765 if deps[tid].issubset(placed)
766 ]
767 if not wave_tasks:
768 # Circular dependency — dump remaining into last wave
769 wave_tasks = list(remaining)
770 wave = [inst_by_task[tid] for tid in wave_tasks if tid in inst_by_task]
771 if wave:
772 waves.append(wave)
773 placed.update(wave_tasks)
774 remaining -= set(wave_tasks)
776 # Unmapped instructions go in the first wave (no known deps)
777 if unmapped:
778 if waves:
779 waves[0] = unmapped + waves[0]
780 else:
781 waves = [unmapped]
783 return waves if waves else [instructions]
785 def complete_instruction(self, instruction_id: str, result: Optional[str] = None):
786 """Mark a single instruction as done and notify ledger.
788 Used by parallel dispatch to complete individual instructions
789 (vs complete_batch which completes all at once).
790 """
791 with self._lock:
792 inst = self._instructions.get(instruction_id)
793 if not inst:
794 return
795 inst.status = InstructionStatus.DONE
796 inst.updated_at = datetime.utcnow().isoformat()
797 if result:
798 inst.result = result
799 # Notify ledger — unblocks dependents
800 ledger = self._get_ledger()
801 if ledger is not None:
802 ltid = self._task_map.get(instruction_id)
803 if ltid:
804 try:
805 ledger.complete_task_and_route(
806 ltid, outcome='success', result=result,
807 )
808 except Exception as e:
809 logger.debug(f"Ledger completion failed for [{ltid}]: {e}")
810 self._save()
812 def fail_instruction(self, instruction_id: str, error: str):
813 """Mark a single instruction as failed and return to queue."""
814 with self._lock:
815 inst = self._instructions.get(instruction_id)
816 if not inst:
817 return
818 inst.status = InstructionStatus.QUEUED
819 inst.batch_id = None
820 inst.error = error
821 inst.updated_at = datetime.utcnow().isoformat()
822 ledger = self._get_ledger()
823 if ledger is not None:
824 ltid = self._task_map.get(instruction_id)
825 if ltid:
826 try:
827 ledger.complete_task_and_route(
828 ltid, outcome='failure', result=error,
829 )
830 except Exception as e:
831 logger.debug(f"Ledger failure routing for [{ltid}]: {e}")
832 self._save()
834 def _consolidate(self, instructions: List[Instruction]) -> str:
835 """Consolidate multiple instructions into a single prompt.
837 Groups by tags, adds context, maintains priority order.
838 """
839 if len(instructions) == 1:
840 inst = instructions[0]
841 prompt = inst.text
842 if inst.context:
843 ctx_str = json.dumps(inst.context, indent=2)
844 prompt = f"Context:\n{ctx_str}\n\nInstruction:\n{inst.text}"
845 return prompt
847 # Multiple instructions → structured batch
848 lines = [
849 f"You have {len(instructions)} queued instructions to execute.",
850 "Process them in the order listed. Each instruction may depend on previous ones.",
851 "",
852 ]
854 # Group by tags
855 tagged: Dict[str, List[Instruction]] = {}
856 untagged: List[Instruction] = []
857 for inst in instructions:
858 if inst.tags:
859 key = ', '.join(sorted(inst.tags))
860 tagged.setdefault(key, []).append(inst)
861 else:
862 untagged.append(inst)
864 idx = 1
865 if tagged:
866 for tag_group, group_insts in tagged.items():
867 lines.append(f"## Group: {tag_group}")
868 for inst in group_insts:
869 lines.append(f"{idx}. [P{inst.priority}] {inst.text}")
870 if inst.context:
871 lines.append(f" Context: {json.dumps(inst.context)}")
872 idx += 1
873 lines.append("")
875 if untagged:
876 if tagged:
877 lines.append("## Other Instructions")
878 for inst in untagged:
879 lines.append(f"{idx}. [P{inst.priority}] {inst.text}")
880 if inst.context:
881 lines.append(f" Context: {json.dumps(inst.context)}")
882 idx += 1
884 return '\n'.join(lines)
886 def complete_batch(self, batch_id: str, result: Optional[str] = None):
887 """Mark all instructions in a batch as done.
889 Also calls ledger.complete_task_and_route() for each instruction
890 so the SmartLedger can unblock dependent tasks and activate
891 conditional/sequential follow-ups.
892 """
893 with self._lock:
894 ledger = self._get_ledger()
895 for inst in self._instructions.values():
896 if inst.batch_id == batch_id:
897 inst.status = InstructionStatus.DONE
898 inst.updated_at = datetime.utcnow().isoformat()
899 if result:
900 inst.result = result
901 # Notify ledger — unblocks dependents
902 if ledger is not None:
903 ltid = self._task_map.get(inst.id)
904 if ltid:
905 try:
906 ledger.complete_task_and_route(
907 ltid, outcome='success', result=result,
908 )
909 except Exception as e:
910 logger.debug(f"Ledger completion failed for [{ltid}]: {e}")
911 self._save()
913 def fail_batch(self, batch_id: str, error: str):
914 """Mark batch instructions as failed — they return to QUEUED for retry.
916 Also notifies the ledger so dependent tasks remain blocked
917 until the retry succeeds.
918 """
919 with self._lock:
920 ledger = self._get_ledger()
921 for inst in self._instructions.values():
922 if inst.batch_id == batch_id and inst.status == InstructionStatus.BATCHED:
923 inst.status = InstructionStatus.QUEUED
924 inst.batch_id = None
925 inst.error = error
926 inst.updated_at = datetime.utcnow().isoformat()
927 # Notify ledger of failure
928 if ledger is not None:
929 ltid = self._task_map.get(inst.id)
930 if ltid:
931 try:
932 ledger.complete_task_and_route(
933 ltid, outcome='failure', result=error,
934 )
935 except Exception as e:
936 logger.debug(f"Ledger failure routing failed for [{ltid}]: {e}")
937 self._save()
939 def stats(self) -> Dict:
940 """Queue statistics."""
941 with self._lock:
942 statuses = {}
943 for inst in self._instructions.values():
944 s = inst.status.value if isinstance(inst.status, InstructionStatus) else inst.status
945 statuses[s] = statuses.get(s, 0) + 1
946 return {
947 'user_id': self.user_id,
948 'total': len(self._instructions),
949 'by_status': statuses,
950 'pending': statuses.get('queued', 0),
951 }
953 def clear_done(self):
954 """Remove completed/cancelled instructions to prevent unbounded growth."""
955 with self._lock:
956 to_remove = [
957 iid for iid, inst in self._instructions.items()
958 if inst.status in (InstructionStatus.DONE, InstructionStatus.CANCELLED)
959 ]
960 for iid in to_remove:
961 del self._instructions[iid]
962 if to_remove:
963 self._save()
964 return len(to_remove)
967# ═══════════════════════════════════════════════════════════════════════
968# Singleton registry — one queue per user
969# ═══════════════════════════════════════════════════════════════════════
971_queues: Dict[str, InstructionQueue] = {}
972_queue_lock = threading.Lock()
975def get_queue(user_id: str) -> InstructionQueue:
976 """Get or create instruction queue for a user."""
977 with _queue_lock:
978 if user_id not in _queues:
979 _queues[user_id] = InstructionQueue(user_id)
980 return _queues[user_id]
983def enqueue_instruction(user_id: str, text: str, **kwargs) -> Instruction:
984 """Convenience: enqueue an instruction for a user."""
985 return get_queue(user_id).enqueue(text, **kwargs)
988def pull_user_batch(user_id: str, max_tokens: int = 8000) -> Optional[InstructionBatch]:
989 """Convenience: pull a batch for a user."""
990 return get_queue(user_id).pull_batch(max_tokens=max_tokens)
993def get_all_pending() -> Dict[str, List[Dict]]:
994 """Get pending instructions across all users."""
995 result = {}
996 # Scan queue directory for all users
997 if os.path.isdir(_QUEUE_DIR):
998 for fname in os.listdir(_QUEUE_DIR):
999 if fname.endswith('_queue.json'):
1000 uid = fname.replace('_queue.json', '')
1001 q = get_queue(uid)
1002 pending = q.get_pending()
1003 if pending:
1004 result[uid] = [i.to_dict() for i in pending]
1005 return result