Coverage for integrations / internal_comm / task_delegation_bridge.py: 0.0%

95 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Task Delegation Bridge - Integrates A2A delegation with SmartLedger 

3 

4This module bridges the gap between A2A delegation and task_ledger, 

5ensuring that delegated tasks are properly tracked with state management 

6and auto-resume capabilities. 

7 

8Key Features: 

9- Parent task goes BLOCKED while delegation is in progress 

10- Delegated task created in ledger with proper parent-child relationship 

11- Automatic resume of parent task when delegation completes 

12- Full audit trail of delegation lifecycle 

13- Nested task support for complex delegations 

14""" 

15 

16import logging 

17import json 

18from typing import Optional, Dict, Any, List 

19from datetime import datetime 

20import sys 

21import os 

22 

23# Add parent directory to path for imports 

24sys.path.insert(0, os.path.abspath(os.path.join(os.path.dirname(__file__), '../..'))) 

25 

26from agent_ledger import ( 

27 SmartLedger, Task, TaskType, TaskStatus, ExecutionMode 

28) 

29from integrations.internal_comm.internal_agent_communication import ( 

30 A2AContextExchange, skill_registry 

31) 

32 

33logger = logging.getLogger(__name__) 

34 

35 

36class TaskDelegationBridge: 

37 """ 

38 Bridges A2A delegation with SmartLedger for proper state management 

39 

40 Workflow: 

41 1. Agent A has task T1 in progress 

42 2. Agent A delegates subtask T2 to Agent B 

43 3. T1 → BLOCKED (waiting for delegation) 

44 4. T2 → Created as child of T1, assigned to Agent B 

45 5. Agent B processes T2 

46 6. When T2 → COMPLETED, T1 → Auto-resumes to IN_PROGRESS 

47 """ 

48 

49 def __init__(self, a2a_context: A2AContextExchange, ledger: SmartLedger): 

50 """ 

51 Initialize delegation bridge 

52 

53 Args: 

54 a2a_context: A2A context exchange for delegation 

55 ledger: SmartLedger for task tracking 

56 """ 

57 self.a2a_context = a2a_context 

58 self.ledger = ledger 

59 self.delegation_map = {} # delegation_id -> (parent_task_id, child_task_id) 

60 

61 def delegate_task_with_tracking( 

62 self, 

63 parent_task_id: str, 

64 from_agent: str, 

65 task_description: str, 

66 required_skills: List[str], 

67 context: Optional[Dict] = None 

68 ) -> Optional[str]: 

69 """ 

70 Delegate a task with full task_ledger integration 

71 

72 Args: 

73 parent_task_id: ID of the parent task (will be BLOCKED) 

74 from_agent: Agent delegating the task 

75 task_description: Description of task to delegate 

76 required_skills: Required skills for the task 

77 context: Optional context data 

78 

79 Returns: 

80 Delegation ID or None if delegation fails 

81 """ 

82 # Step 1: Get parent task 

83 parent_task = self.ledger.get_task(parent_task_id) 

84 if not parent_task: 

85 logger.error(f"Parent task not found: {parent_task_id}") 

86 return None 

87 

88 # Step 2: Delegate via A2A 

89 delegation_id = self.a2a_context.delegate_task( 

90 from_agent=from_agent, 

91 task=task_description, 

92 required_skills=required_skills, 

93 context=context 

94 ) 

95 

96 if not delegation_id: 

97 logger.error("A2A delegation failed - no suitable agent found") 

98 return None 

99 

100 # Step 3: Get delegation info to find target agent 

101 delegation_info = self.a2a_context.delegations.get(delegation_id) 

102 if not delegation_info: 

103 logger.error(f"Delegation info not found: {delegation_id}") 

104 return None 

105 

106 to_agent = delegation_info['to_agent'] 

107 

108 # Step 4: Create child task in ledger with parent-child relationship 

109 child_task = self.ledger.create_parent_child_task( 

110 parent_task_id=parent_task_id, 

111 child_description=task_description, 

112 child_type=TaskType.AUTONOMOUS, 

113 context={ 

114 'delegation_id': delegation_id, 

115 'delegated_by': from_agent, 

116 'delegated_to': to_agent, 

117 'required_skills': required_skills, 

118 'delegation_context': context or {} 

119 } 

120 ) 

121 

122 if not child_task: 

123 logger.error(f"Failed to create child task for delegation {delegation_id}") 

124 return None 

125 

126 # Step 6: Block parent task (waiting for delegation) 

127 self.ledger.update_task_status( 

128 parent_task_id, 

129 TaskStatus.BLOCKED, 

130 f"Waiting for delegated task: {child_task.task_id}" 

131 ) 

132 

133 # Step 7: Map delegation to task IDs 

134 self.delegation_map[delegation_id] = { 

135 'parent_task_id': parent_task_id, 

136 'child_task_id': child_task.task_id, 

137 'from_agent': from_agent, 

138 'to_agent': to_agent, 

139 'created_at': datetime.now().isoformat() 

140 } 

141 

142 logger.info( 

143 f"Task delegation tracked: {delegation_id}\n" 

144 f" Parent task {parent_task_id} → BLOCKED\n" 

145 f" Child task {child_task.task_id} → Created for {to_agent}" 

146 ) 

147 

148 return delegation_id 

149 

150 def complete_delegation_with_tracking( 

151 self, 

152 delegation_id: str, 

153 result: Any, 

154 success: bool = True 

155 ) -> bool: 

156 """ 

157 Complete a delegation and update task states 

158 

159 Args: 

160 delegation_id: Delegation ID 

161 result: Delegation result 

162 success: Whether delegation succeeded 

163 

164 Returns: 

165 True if completion successful 

166 """ 

167 # Step 1: Get delegation mapping 

168 if delegation_id not in self.delegation_map: 

169 logger.error(f"Delegation mapping not found: {delegation_id}") 

170 return False 

171 

172 mapping = self.delegation_map[delegation_id] 

173 parent_task_id = mapping['parent_task_id'] 

174 child_task_id = mapping['child_task_id'] 

175 

176 # Step 2: Complete delegation in A2A 

177 self.a2a_context.complete_delegation(delegation_id, result) 

178 

179 # Step 3: Update child task status 

180 child_status = TaskStatus.COMPLETED if success else TaskStatus.FAILED 

181 self.ledger.update_task_status( 

182 child_task_id, 

183 child_status, 

184 json.dumps(result) if isinstance(result, (dict, list)) else str(result) 

185 ) 

186 

187 # Step 4: Auto-resume parent task (task_ledger should handle this automatically) 

188 # But let's explicitly trigger it to be safe 

189 parent_task = self.ledger.get_task(parent_task_id) 

190 if parent_task and parent_task.status == TaskStatus.BLOCKED: 

191 # Check if all dependencies are complete 

192 if self._all_dependencies_complete(parent_task_id): 

193 self.ledger.update_task_status( 

194 parent_task_id, 

195 TaskStatus.IN_PROGRESS, 

196 f"Resumed after delegation {delegation_id} completed" 

197 ) 

198 logger.info(f"Parent task {parent_task_id} auto-resumed after delegation") 

199 

200 logger.info( 

201 f"Delegation completed: {delegation_id}\n" 

202 f" Child task {child_task_id} → {child_status.value}\n" 

203 f" Parent task {parent_task_id} → Resumed" 

204 ) 

205 

206 return True 

207 

208 def _all_dependencies_complete(self, task_id: str) -> bool: 

209 """Check if all dependencies of a task are complete""" 

210 task = self.ledger.get_task(task_id) 

211 if not task or not task.depends_on: 

212 return True 

213 

214 for dep_id in task.depends_on: 

215 dep_task = self.ledger.get_task(dep_id) 

216 if not dep_task or not TaskStatus.is_terminal_state(dep_task.status): 

217 return False 

218 

219 return True 

220 

221 def get_delegation_status(self, delegation_id: str) -> Optional[Dict[str, Any]]: 

222 """ 

223 Get comprehensive status of a delegation 

224 

225 Args: 

226 delegation_id: Delegation ID 

227 

228 Returns: 

229 Dictionary with delegation status including task states 

230 """ 

231 if delegation_id not in self.delegation_map: 

232 return None 

233 

234 mapping = self.delegation_map[delegation_id] 

235 parent_task = self.ledger.get_task(mapping['parent_task_id']) 

236 child_task = self.ledger.get_task(mapping['child_task_id']) 

237 a2a_delegation = self.a2a_context.delegations.get(delegation_id) 

238 

239 return { 

240 'delegation_id': delegation_id, 

241 'parent_task': { 

242 'task_id': parent_task.task_id if parent_task else None, 

243 'status': parent_task.status.value if parent_task else None, 

244 'description': parent_task.description if parent_task else None 

245 }, 

246 'child_task': { 

247 'task_id': child_task.task_id if child_task else None, 

248 'status': child_task.status.value if child_task else None, 

249 'description': child_task.description if child_task else None 

250 }, 

251 'delegation': a2a_delegation, 

252 'mapping': mapping 

253 } 

254 

255 def list_active_delegations(self) -> List[Dict[str, Any]]: 

256 """Get all active delegations with their task states""" 

257 active = [] 

258 

259 for delegation_id, mapping in self.delegation_map.items(): 

260 child_task = self.ledger.get_task(mapping['child_task_id']) 

261 

262 # Only include if child task is not in terminal state 

263 if child_task and not TaskStatus.is_terminal_state(child_task.status): 

264 status = self.get_delegation_status(delegation_id) 

265 if status: 

266 active.append(status) 

267 

268 return active 

269 

270 

271def create_delegation_function_with_ledger( 

272 agent_name: str, 

273 ledger: SmartLedger, 

274 a2a_context: A2AContextExchange, 

275 current_task_id: Optional[str] = None 

276): 

277 """ 

278 Create a delegation function that integrates with task_ledger 

279 

280 Args: 

281 agent_name: Name of the agent 

282 ledger: SmartLedger instance 

283 a2a_context: A2A context exchange 

284 current_task_id: Current task ID (will be blocked during delegation) 

285 

286 Returns: 

287 Delegation function for autogen 

288 """ 

289 bridge = TaskDelegationBridge(a2a_context, ledger) 

290 

291 def delegate_with_tracking( 

292 task: str, 

293 required_skills: List[str], 

294 context: Optional[Dict] = None 

295 ) -> str: 

296 """ 

297 Delegate a task to a specialist agent with full tracking 

298 

299 Args: 

300 task: Task description 

301 required_skills: Required skills 

302 context: Optional context 

303 

304 Returns: 

305 JSON result with delegation status 

306 """ 

307 # If we have a current task, use it as parent 

308 parent_task_id = current_task_id 

309 

310 # If no current task, create one 

311 if not parent_task_id: 

312 parent_task_id = f"task_delegation_{uuid.uuid4().hex[:12]}" 

313 parent_task = Task( 

314 task_id=parent_task_id, 

315 description=f"{agent_name} - delegating task", 

316 task_type=TaskType.AUTONOMOUS, 

317 context={'delegating_agent': agent_name} 

318 ) 

319 ledger.add_task(parent_task) 

320 

321 # Delegate with tracking 

322 delegation_id = bridge.delegate_task_with_tracking( 

323 parent_task_id=parent_task_id, 

324 from_agent=agent_name, 

325 task_description=task, 

326 required_skills=required_skills, 

327 context=context 

328 ) 

329 

330 if delegation_id: 

331 status = bridge.get_delegation_status(delegation_id) 

332 return json.dumps({ 

333 'success': True, 

334 'delegation_id': delegation_id, 

335 'message': f'Task delegated to specialist agent', 

336 'status': status 

337 }, indent=2) 

338 else: 

339 return json.dumps({ 

340 'success': False, 

341 'error': 'No suitable agent found for delegation' 

342 }, indent=2) 

343 

344 return delegate_with_tracking 

345 

346 

347# Convenience exports 

348__all__ = [ 

349 'TaskDelegationBridge', 

350 'create_delegation_function_with_ledger' 

351]