Coverage for integrations / internal_comm / task_delegation_bridge.py: 0.0%
95 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Task Delegation Bridge - Integrates A2A delegation with SmartLedger
4This module bridges the gap between A2A delegation and task_ledger,
5ensuring that delegated tasks are properly tracked with state management
6and auto-resume capabilities.
8Key Features:
9- Parent task goes BLOCKED while delegation is in progress
10- Delegated task created in ledger with proper parent-child relationship
11- Automatic resume of parent task when delegation completes
12- Full audit trail of delegation lifecycle
13- Nested task support for complex delegations
14"""
16import logging
17import json
18from typing import Optional, Dict, Any, List
19from datetime import datetime
20import sys
21import os
23# Add parent directory to path for imports
24sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..')))
26from agent_ledger import (
27 SmartLedger, Task, TaskType, TaskStatus, ExecutionMode
28)
29from integrations.internal_comm.internal_agent_communication import (
30 A2AContextExchange, skill_registry
31)
33logger = logging.getLogger(__name__)
class TaskDelegationBridge:
    """
    Bridges A2A delegation with SmartLedger for proper state management

    Workflow:
    1. Agent A has task T1 in progress
    2. Agent A delegates subtask T2 to Agent B
    3. T1 → BLOCKED (waiting for delegation)
    4. T2 → Created as child of T1, assigned to Agent B
    5. Agent B processes T2
    6. When T2 → COMPLETED, T1 → Auto-resumes to IN_PROGRESS
    """

    def __init__(self, a2a_context: "A2AContextExchange", ledger: "SmartLedger"):
        """
        Initialize delegation bridge

        Args:
            a2a_context: A2A context exchange for delegation
            ledger: SmartLedger for task tracking
        """
        self.a2a_context = a2a_context
        self.ledger = ledger
        # delegation_id -> dict with the keys 'parent_task_id', 'child_task_id',
        # 'from_agent', 'to_agent' and 'created_at' (ISO timestamp string).
        self.delegation_map: Dict[str, Dict[str, Any]] = {}

    def delegate_task_with_tracking(
        self,
        parent_task_id: str,
        from_agent: str,
        task_description: str,
        required_skills: List[str],
        context: Optional[Dict] = None
    ) -> Optional[str]:
        """
        Delegate a task with full task_ledger integration

        Args:
            parent_task_id: ID of the parent task (will be BLOCKED)
            from_agent: Agent delegating the task
            task_description: Description of task to delegate
            required_skills: Required skills for the task
            context: Optional context data

        Returns:
            Delegation ID, or None if the parent task is unknown, the A2A
            delegation fails, the delegation record cannot be found, or the
            child task cannot be created in the ledger.
        """
        # Step 1: Get parent task
        parent_task = self.ledger.get_task(parent_task_id)
        if not parent_task:
            logger.error(f"Parent task not found: {parent_task_id}")
            return None

        # Step 2: Delegate via A2A
        delegation_id = self.a2a_context.delegate_task(
            from_agent=from_agent,
            task=task_description,
            required_skills=required_skills,
            context=context
        )

        if not delegation_id:
            logger.error("A2A delegation failed - no suitable agent found")
            return None

        # Step 3: Get delegation info to find target agent
        # NOTE(review): from here on the A2A delegation already exists; the
        # early returns below leave it untracked in the ledger. Confirm
        # whether the A2A layer offers a way to cancel it.
        delegation_info = self.a2a_context.delegations.get(delegation_id)
        if not delegation_info:
            logger.error(f"Delegation info not found: {delegation_id}")
            return None

        to_agent = delegation_info['to_agent']

        # Step 4: Create child task in ledger with parent-child relationship
        child_task = self.ledger.create_parent_child_task(
            parent_task_id=parent_task_id,
            child_description=task_description,
            child_type=TaskType.AUTONOMOUS,
            context={
                'delegation_id': delegation_id,
                'delegated_by': from_agent,
                'delegated_to': to_agent,
                'required_skills': required_skills,
                'delegation_context': context or {}
            }
        )

        if not child_task:
            logger.error(f"Failed to create child task for delegation {delegation_id}")
            return None

        # Step 5: Block parent task (waiting for delegation)
        self.ledger.update_task_status(
            parent_task_id,
            TaskStatus.BLOCKED,
            f"Waiting for delegated task: {child_task.task_id}"
        )

        # Step 6: Map delegation to task IDs
        self.delegation_map[delegation_id] = {
            'parent_task_id': parent_task_id,
            'child_task_id': child_task.task_id,
            'from_agent': from_agent,
            'to_agent': to_agent,
            'created_at': datetime.now().isoformat()
        }

        logger.info(
            f"Task delegation tracked: {delegation_id}\n"
            f" Parent task {parent_task_id} → BLOCKED\n"
            f" Child task {child_task.task_id} → Created for {to_agent}"
        )

        return delegation_id

    @staticmethod
    def _serialize_result(result: Any) -> str:
        """Render a delegation result as text for the ledger status message.

        Dicts and lists are JSON-encoded when possible; anything else, or a
        container that json cannot encode, falls back to ``str(result)`` so
        that an unusual payload never prevents the status update.
        """
        if isinstance(result, (dict, list)):
            try:
                return json.dumps(result)
            except (TypeError, ValueError):
                # TypeError: non-JSON value inside; ValueError: circular reference.
                return str(result)
        return str(result)

    def complete_delegation_with_tracking(
        self,
        delegation_id: str,
        result: Any,
        success: bool = True
    ) -> bool:
        """
        Complete a delegation and update task states

        Args:
            delegation_id: Delegation ID
            result: Delegation result
            success: Whether delegation succeeded

        Returns:
            True if the delegation was known to this bridge and was processed,
            False if no mapping exists for ``delegation_id``.
        """
        # Step 1: Get delegation mapping
        if delegation_id not in self.delegation_map:
            logger.error(f"Delegation mapping not found: {delegation_id}")
            return False

        mapping = self.delegation_map[delegation_id]
        parent_task_id = mapping['parent_task_id']
        child_task_id = mapping['child_task_id']

        # Step 2: Complete delegation in A2A
        self.a2a_context.complete_delegation(delegation_id, result)

        # Step 3: Update child task status
        child_status = TaskStatus.COMPLETED if success else TaskStatus.FAILED
        self.ledger.update_task_status(
            child_task_id,
            child_status,
            self._serialize_result(result)
        )

        # Step 4: Auto-resume parent task (task_ledger should handle this automatically)
        # But let's explicitly trigger it to be safe
        parent_task = self.ledger.get_task(parent_task_id)
        parent_resumed = False
        if (
            parent_task
            and parent_task.status == TaskStatus.BLOCKED
            and self._all_dependencies_complete(parent_task_id)
        ):
            self.ledger.update_task_status(
                parent_task_id,
                TaskStatus.IN_PROGRESS,
                f"Resumed after delegation {delegation_id} completed"
            )
            parent_resumed = True
            logger.info(f"Parent task {parent_task_id} auto-resumed after delegation")

        # Report what actually happened to the parent instead of always
        # claiming it was resumed.
        if parent_resumed:
            parent_outcome = "Resumed"
        elif parent_task:
            parent_outcome = f"Not resumed (status: {parent_task.status.value})"
        else:
            parent_outcome = "Not found"

        logger.info(
            f"Delegation completed: {delegation_id}\n"
            f" Child task {child_task_id} → {child_status.value}\n"
            f" Parent task {parent_task_id} → {parent_outcome}"
        )

        return True

    def _all_dependencies_complete(self, task_id: str) -> bool:
        """Check if all dependencies of a task are complete

        Returns True when the task is missing or has no ``depends_on`` entries,
        and False as soon as one dependency is missing from the ledger or is
        not in a terminal state.
        """
        task = self.ledger.get_task(task_id)
        if not task or not task.depends_on:
            return True

        for dep_id in task.depends_on:
            dep_task = self.ledger.get_task(dep_id)
            if not dep_task or not TaskStatus.is_terminal_state(dep_task.status):
                return False

        return True

    def get_delegation_status(self, delegation_id: str) -> Optional[Dict[str, Any]]:
        """
        Get comprehensive status of a delegation

        Args:
            delegation_id: Delegation ID

        Returns:
            Dictionary with delegation status including task states, or None
            if this bridge has no mapping for ``delegation_id``. Task fields
            are None when the corresponding task is no longer in the ledger.
        """
        if delegation_id not in self.delegation_map:
            return None

        mapping = self.delegation_map[delegation_id]
        parent_task = self.ledger.get_task(mapping['parent_task_id'])
        child_task = self.ledger.get_task(mapping['child_task_id'])
        a2a_delegation = self.a2a_context.delegations.get(delegation_id)

        return {
            'delegation_id': delegation_id,
            'parent_task': {
                'task_id': parent_task.task_id if parent_task else None,
                'status': parent_task.status.value if parent_task else None,
                'description': parent_task.description if parent_task else None
            },
            'child_task': {
                'task_id': child_task.task_id if child_task else None,
                'status': child_task.status.value if child_task else None,
                'description': child_task.description if child_task else None
            },
            'delegation': a2a_delegation,
            'mapping': mapping
        }

    def list_active_delegations(self) -> List[Dict[str, Any]]:
        """Get all active delegations with their task states

        A delegation counts as active while its child task exists in the
        ledger and is not in a terminal state.
        """
        active = []

        for delegation_id, mapping in self.delegation_map.items():
            child_task = self.ledger.get_task(mapping['child_task_id'])

            # Only include if child task is not in terminal state
            if child_task and not TaskStatus.is_terminal_state(child_task.status):
                status = self.get_delegation_status(delegation_id)
                if status:
                    active.append(status)

        return active
def create_delegation_function_with_ledger(
    agent_name: str,
    ledger: SmartLedger,
    a2a_context: A2AContextExchange,
    current_task_id: Optional[str] = None
):
    """
    Create a delegation function that integrates with task_ledger

    Args:
        agent_name: Name of the agent
        ledger: SmartLedger instance
        a2a_context: A2A context exchange
        current_task_id: Current task ID (will be blocked during delegation)

    Returns:
        Delegation function for autogen. It takes ``task``,
        ``required_skills`` and an optional ``context`` and returns a JSON
        string describing the outcome.
    """
    # Imported here because the module's import block does not provide uuid,
    # which the "no current task" path below needs.
    import uuid

    # One bridge is shared by every call of the returned closure.
    bridge = TaskDelegationBridge(a2a_context, ledger)

    def delegate_with_tracking(
        task: str,
        required_skills: List[str],
        context: Optional[Dict] = None
    ) -> str:
        """
        Delegate a task to a specialist agent with full tracking

        Args:
            task: Task description
            required_skills: Required skills
            context: Optional context

        Returns:
            JSON result with delegation status
        """
        # If we have a current task, use it as parent
        parent_task_id = current_task_id

        # If no current task, create one
        if not parent_task_id:
            parent_task_id = f"task_delegation_{uuid.uuid4().hex[:12]}"
            parent_task = Task(
                task_id=parent_task_id,
                description=f"{agent_name} - delegating task",
                task_type=TaskType.AUTONOMOUS,
                context={'delegating_agent': agent_name}
            )
            ledger.add_task(parent_task)

        # Delegate with tracking
        delegation_id = bridge.delegate_task_with_tracking(
            parent_task_id=parent_task_id,
            from_agent=agent_name,
            task_description=task,
            required_skills=required_skills,
            context=context
        )

        if not delegation_id:
            # The bridge returns None for every failure mode; the message
            # below is the one callers have always received.
            return json.dumps({
                'success': False,
                'error': 'No suitable agent found for delegation'
            }, indent=2)

        status = bridge.get_delegation_status(delegation_id)
        return json.dumps({
            'success': True,
            'delegation_id': delegation_id,
            'message': 'Task delegated to specialist agent',
            'status': status
        }, indent=2)

    return delegate_with_tracking
# Convenience exports: the names this module exposes via `from ... import *`.
__all__ = [
    'TaskDelegationBridge',
    'create_delegation_function_with_ledger'
]