Coverage for integrations / internal_comm / internal_agent_communication.py: 71.4%

245 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2A2A (Agent-to-Agent) Protocol Module 

3 

4This module enables direct agent-to-agent communication, context exchange, 

5and skill-based task delegation within the multi-agent system. 

6 

7Features: 

8- Agent skill registry and discovery 

9- Context sharing between agents 

10- Task delegation to specialist agents 

11- Inter-agent messaging 

12- Collaborative execution tracking 

13""" 

14 

15import json 

16import logging 

17import threading 

18from typing import Dict, List, Any, Optional, Callable 

19from datetime import datetime 

20from collections import deque 

21import uuid 

22 

23logger = logging.getLogger(__name__) 

24 

25 

26class AgentSkill: 

27 """Represents a skill that an agent possesses. 

28 

29 Tracks multiple dimensions for intelligent agent selection: 

30 - proficiency: accuracy/quality (0.0-1.0) 

31 - avg_latency_ms: average execution time in milliseconds 

32 - avg_cost_spark: average cost per execution in Spark 

33 - success_rate: derived from usage_count / success_count 

34 """ 

35 

36 def __init__(self, name: str, description: str, proficiency: float = 1.0, 

37 avg_latency_ms: float = 0.0, avg_cost_spark: float = 0.0, 

38 metadata: Optional[Dict] = None): 

39 """ 

40 Initialize an agent skill 

41 

42 Args: 

43 name: Skill identifier 

44 description: Human-readable skill description 

45 proficiency: Skill proficiency level (0.0 to 1.0) 

46 avg_latency_ms: Average execution time in milliseconds 

47 avg_cost_spark: Average cost per execution in Spark currency 

48 metadata: Additional skill metadata 

49 """ 

50 self.name = name 

51 self.description = description 

52 self.proficiency = max(0.0, min(1.0, proficiency)) 

53 self.avg_latency_ms = avg_latency_ms 

54 self.avg_cost_spark = avg_cost_spark 

55 self.metadata = metadata or {} 

56 self.usage_count = 0 

57 self.success_count = 0 

58 self._total_latency_ms = 0.0 

59 self._total_cost_spark = 0.0 

60 

61 def record_usage(self, success: bool = True, latency_ms: float = 0.0, 

62 cost_spark: float = 0.0): 

63 """Record skill usage with optional latency and cost tracking.""" 

64 self.usage_count += 1 

65 if success: 

66 self.success_count += 1 

67 if latency_ms > 0: 

68 self._total_latency_ms += latency_ms 

69 self.avg_latency_ms = self._total_latency_ms / self.usage_count 

70 if cost_spark > 0: 

71 self._total_cost_spark += cost_spark 

72 self.avg_cost_spark = self._total_cost_spark / self.usage_count 

73 

74 def get_success_rate(self) -> float: 

75 """Calculate skill success rate""" 

76 if self.usage_count == 0: 

77 return 0.0 

78 return self.success_count / self.usage_count 

79 

80 def to_dict(self) -> Dict[str, Any]: 

81 """Convert skill to dictionary""" 

82 return { 

83 'name': self.name, 

84 'description': self.description, 

85 'proficiency': self.proficiency, 

86 'avg_latency_ms': round(self.avg_latency_ms, 1), 

87 'avg_cost_spark': round(self.avg_cost_spark, 2), 

88 'usage_count': self.usage_count, 

89 'success_rate': round(self.get_success_rate(), 3), 

90 'metadata': self.metadata 

91 } 

92 

93 

94class AgentSkillRegistry: 

95 """Registry for tracking agent skills and capabilities""" 

96 

97 def __init__(self): 

98 """Initialize the skill registry""" 

99 self.agents: Dict[str, Dict[str, AgentSkill]] = {} # agent_id -> {skill_name -> AgentSkill} 

100 self.lock = threading.Lock() 

101 

102 def register_agent(self, agent_id: str, skills: List[Dict[str, Any]]): 

103 """ 

104 Register an agent with its skills 

105 

106 Args: 

107 agent_id: Unique agent identifier 

108 skills: List of skill definitions 

109 """ 

110 with self.lock: 

111 if agent_id not in self.agents: 

112 self.agents[agent_id] = {} 

113 

114 for skill_def in skills: 

115 skill = AgentSkill( 

116 name=skill_def.get('name'), 

117 description=skill_def.get('description', ''), 

118 proficiency=skill_def.get('proficiency', 1.0), 

119 avg_latency_ms=skill_def.get('avg_latency_ms', 0.0), 

120 avg_cost_spark=skill_def.get('avg_cost_spark', 0.0), 

121 metadata=skill_def.get('metadata', {}) 

122 ) 

123 self.agents[agent_id][skill.name] = skill 

124 

125 logger.info(f"Registered agent {agent_id} with {len(skills)} skills") 

126 

127 def find_agents_with_skill(self, skill_name: str, min_proficiency: float = 0.0, 

128 strategy: str = 'accuracy') -> List[tuple]: 

129 """ 

130 Find all agents that have a specific skill, sorted by strategy. 

131 

132 Args: 

133 skill_name: Name of the skill to find 

134 min_proficiency: Minimum proficiency level required 

135 strategy: Selection strategy: 

136 - 'accuracy': highest proficiency (default, best quality) 

137 - 'speed': lowest avg_latency_ms (fastest response) 

138 - 'efficiency': highest success_rate with lowest cost 

139 - 'balanced': weighted composite of all dimensions 

140 

141 Returns: 

142 List of (agent_id, skill) tuples sorted by strategy 

143 """ 

144 with self.lock: 

145 matches = [] 

146 for agent_id, skills in self.agents.items(): 

147 if skill_name in skills: 

148 skill = skills[skill_name] 

149 if skill.proficiency >= min_proficiency: 

150 matches.append((agent_id, skill)) 

151 

152 if strategy == 'speed': 

153 # Lowest latency first (0 = unknown, sort last) 

154 matches.sort(key=lambda x: x[1].avg_latency_ms if x[1].avg_latency_ms > 0 

155 else float('inf')) 

156 elif strategy == 'efficiency': 

157 # Highest success_rate / lowest cost 

158 def efficiency_score(item): 

159 s = item[1] 

160 rate = s.get_success_rate() if s.usage_count > 0 else s.proficiency 

161 cost_penalty = s.avg_cost_spark / 100.0 if s.avg_cost_spark > 0 else 0.0 

162 return rate - cost_penalty 

163 matches.sort(key=efficiency_score, reverse=True) 

164 elif strategy == 'balanced': 

165 # Weighted composite: 40% proficiency, 25% success_rate, 

166 # 20% speed (inverse latency), 15% cost (inverse) 

167 def balanced_score(item): 

168 s = item[1] 

169 prof = s.proficiency 

170 rate = s.get_success_rate() if s.usage_count > 0 else prof 

171 # Normalize latency: lower is better, cap at 60s 

172 latency_norm = 1.0 - min(s.avg_latency_ms / 60000.0, 1.0) if s.avg_latency_ms > 0 else 0.5 

173 # Normalize cost: lower is better, cap at 100 spark 

174 cost_norm = 1.0 - min(s.avg_cost_spark / 100.0, 1.0) if s.avg_cost_spark > 0 else 0.5 

175 return (0.40 * prof) + (0.25 * rate) + (0.20 * latency_norm) + (0.15 * cost_norm) 

176 matches.sort(key=balanced_score, reverse=True) 

177 else: 

178 # Default: accuracy — highest proficiency first 

179 matches.sort(key=lambda x: x[1].proficiency, reverse=True) 

180 

181 return matches 

182 

183 def get_agent_skills(self, agent_id: str) -> Dict[str, AgentSkill]: 

184 """Get all skills for an agent""" 

185 with self.lock: 

186 return self.agents.get(agent_id, {}) 

187 

188 def record_skill_usage(self, agent_id: str, skill_name: str, success: bool = True): 

189 """Record usage of a skill by an agent""" 

190 with self.lock: 

191 if agent_id in self.agents and skill_name in self.agents[agent_id]: 

192 self.agents[agent_id][skill_name].record_usage(success) 

193 

194 def get_best_agent_for_skill(self, skill_name: str, 

195 strategy: str = 'accuracy') -> Optional[str]: 

196 """ 

197 Get the best agent for a specific skill using the given strategy. 

198 

199 Args: 

200 skill_name: Name of the skill 

201 strategy: Selection strategy ('accuracy', 'speed', 'efficiency', 'balanced') 

202 

203 Returns: 

204 Agent ID of the best agent, or None if no agent has the skill 

205 """ 

206 matches = self.find_agents_with_skill(skill_name, strategy=strategy) 

207 if matches: 

208 return matches[0][0] 

209 return None 

210 

211 

212class A2AMessage: 

213 """Represents a message between agents""" 

214 

215 def __init__(self, from_agent: str, to_agent: str, message_type: str, content: Any, metadata: Optional[Dict] = None): 

216 """ 

217 Initialize an A2A message 

218 

219 Args: 

220 from_agent: Sender agent ID 

221 to_agent: Recipient agent ID 

222 message_type: Type of message (request, response, broadcast, etc.) 

223 content: Message content 

224 metadata: Additional metadata 

225 """ 

226 self.message_id = str(uuid.uuid4()) 

227 self.from_agent = from_agent 

228 self.to_agent = to_agent 

229 self.message_type = message_type 

230 self.content = content 

231 self.metadata = metadata or {} 

232 self.timestamp = datetime.now() 

233 self.status = 'pending' 

234 

235 def to_dict(self) -> Dict[str, Any]: 

236 """Convert message to dictionary""" 

237 return { 

238 'message_id': self.message_id, 

239 'from_agent': self.from_agent, 

240 'to_agent': self.to_agent, 

241 'message_type': self.message_type, 

242 'content': self.content, 

243 'metadata': self.metadata, 

244 'timestamp': self.timestamp.isoformat(), 

245 'status': self.status 

246 } 

247 

248 

249class A2AContextExchange: 

250 """Manages context exchange and task delegation between agents""" 

251 

252 def __init__(self, skill_registry: AgentSkillRegistry): 

253 """ 

254 Initialize A2A context exchange 

255 

256 Args: 

257 skill_registry: Agent skill registry 

258 """ 

259 self.skill_registry = skill_registry 

260 self.message_queues: Dict[str, deque] = {} # agent_id -> message queue 

261 self.shared_context: Dict[str, Any] = {} # Shared context across agents 

262 self.delegations: Dict[str, Dict[str, Any]] = {} # delegation_id -> delegation info 

263 self.lock = threading.Lock() 

264 

265 def register_agent(self, agent_id: str): 

266 """Register an agent for A2A communication""" 

267 with self.lock: 

268 if agent_id not in self.message_queues: 

269 self.message_queues[agent_id] = deque(maxlen=100) 

270 logger.info(f"Registered agent {agent_id} for A2A communication") 

271 

272 def send_message(self, from_agent: str, to_agent: str, message_type: str, content: Any, metadata: Optional[Dict] = None): 

273 """ 

274 Send a message from one agent to another 

275 

276 Args: 

277 from_agent: Sender agent ID 

278 to_agent: Recipient agent ID 

279 message_type: Type of message 

280 content: Message content 

281 metadata: Additional metadata 

282 

283 Returns: 

284 Message ID 

285 """ 

286 # Security: Encrypt message content if A2ACrypto available 

287 try: 

288 from security.crypto import A2ACrypto 

289 crypto = A2ACrypto() 

290 if isinstance(content, str): 

291 content = crypto.encrypt_message(content) 

292 elif isinstance(content, dict): 

293 content = crypto.encrypt_payload(content) 

294 except ImportError: 

295 pass # Send unencrypted (backward compat) 

296 except Exception: 

297 pass # Encryption failed, send unencrypted 

298 

299 message = A2AMessage(from_agent, to_agent, message_type, content, metadata) 

300 

301 with self.lock: 

302 if to_agent not in self.message_queues: 

303 self.register_agent(to_agent) 

304 

305 self.message_queues[to_agent].append(message) 

306 logger.info(f"Message sent from {from_agent} to {to_agent}: {message_type}") 

307 

308 return message.message_id 

309 

310 def get_messages(self, agent_id: str, message_type: Optional[str] = None) -> List[A2AMessage]: 

311 """ 

312 Get pending messages for an agent 

313 

314 Args: 

315 agent_id: Agent ID 

316 message_type: Optional filter by message type 

317 

318 Returns: 

319 List of pending messages 

320 """ 

321 with self.lock: 

322 if agent_id not in self.message_queues: 

323 return [] 

324 

325 messages = list(self.message_queues[agent_id]) 

326 

327 if message_type: 

328 messages = [m for m in messages if m.message_type == message_type] 

329 

330 # Clear processed messages 

331 self.message_queues[agent_id].clear() 

332 

333 return messages 

334 

335 def share_context(self, agent_id: str, context_key: str, context_value: Any): 

336 """ 

337 Share context with other agents 

338 

339 Args: 

340 agent_id: Agent sharing the context 

341 context_key: Context key 

342 context_value: Context value 

343 """ 

344 with self.lock: 

345 self.shared_context[context_key] = { 

346 'value': context_value, 

347 'shared_by': agent_id, 

348 'timestamp': datetime.now().isoformat() 

349 } 

350 logger.info(f"Agent {agent_id} shared context: {context_key}") 

351 

352 def get_shared_context(self, context_key: str) -> Optional[Any]: 

353 """ 

354 Get shared context 

355 

356 Args: 

357 context_key: Context key 

358 

359 Returns: 

360 Context value or None 

361 """ 

362 with self.lock: 

363 context = self.shared_context.get(context_key) 

364 if context: 

365 return context.get('value') 

366 return None 

367 

368 def _score_agent(self, agent_skills: Dict[str, 'AgentSkill'], 

369 required_skills: List[str], strategy: str) -> float: 

370 """ 

371 Score an agent across all required skills using the given strategy. 

372 

373 Args: 

374 agent_skills: Dict of skill_name -> AgentSkill for this agent 

375 required_skills: Skills the agent must have 

376 strategy: 'accuracy', 'speed', 'efficiency', or 'balanced' 

377 

378 Returns: 

379 Composite score (higher is better) 

380 """ 

381 scores = [] 

382 for skill_name in required_skills: 

383 skill = agent_skills.get(skill_name) 

384 if not skill: 

385 return -1.0 # Missing required skill 

386 

387 if strategy == 'speed': 

388 # Lower latency = higher score; unknown (0) gets middle score 

389 if skill.avg_latency_ms > 0: 

390 scores.append(1.0 - min(skill.avg_latency_ms / 60000.0, 1.0)) 

391 else: 

392 scores.append(0.5) 

393 elif strategy == 'efficiency': 

394 rate = skill.get_success_rate() if skill.usage_count > 0 else skill.proficiency 

395 cost_penalty = min(skill.avg_cost_spark / 100.0, 1.0) if skill.avg_cost_spark > 0 else 0.0 

396 scores.append(rate - cost_penalty) 

397 elif strategy == 'balanced': 

398 prof = skill.proficiency 

399 rate = skill.get_success_rate() if skill.usage_count > 0 else prof 

400 latency_norm = (1.0 - min(skill.avg_latency_ms / 60000.0, 1.0)) if skill.avg_latency_ms > 0 else 0.5 

401 cost_norm = (1.0 - min(skill.avg_cost_spark / 100.0, 1.0)) if skill.avg_cost_spark > 0 else 0.5 

402 scores.append(0.40 * prof + 0.25 * rate + 0.20 * latency_norm + 0.15 * cost_norm) 

403 else: 

404 # accuracy (default) 

405 scores.append(skill.proficiency) 

406 

407 return sum(scores) / len(scores) if scores else 0.0 

408 

409 def delegate_task(self, from_agent: str, task: str, required_skills: List[str], 

410 context: Optional[Dict] = None, 

411 strategy: str = 'accuracy') -> Optional[str]: 

412 """ 

413 Delegate a task to the most suitable agent using the given strategy. 

414 

415 Args: 

416 from_agent: Agent delegating the task 

417 task: Task description 

418 required_skills: List of required skills 

419 context: Optional task context 

420 strategy: Selection strategy ('accuracy', 'speed', 'efficiency', 'balanced') 

421 

422 Returns: 

423 Delegation ID or None if no suitable agent found 

424 """ 

425 if not required_skills: 

426 logger.warning("No required skills specified for task delegation") 

427 return None 

428 

429 # Find agents with all required skills 

430 suitable_agents = None 

431 for skill in required_skills: 

432 agents_with_skill = set( 

433 agent_id for agent_id, _ 

434 in self.skill_registry.find_agents_with_skill(skill, strategy=strategy) 

435 ) 

436 if suitable_agents is None: 

437 suitable_agents = agents_with_skill 

438 else: 

439 suitable_agents = suitable_agents.intersection(agents_with_skill) 

440 

441 if not suitable_agents: 

442 # Fallback: semantic match against existing recipes + expert catalog 

443 try: 

444 from integrations.agentic_router import find_matching_agent 

445 _match = find_matching_agent(task) 

446 if _match and _match.get('agent_id'): 

447 logger.info(f"Skill match failed, semantic match found: {_match['name']}") 

448 delegation_id = str(uuid.uuid4()) 

449 with self.lock: 

450 self.delegations[delegation_id] = { 

451 'from_agent': from_agent, 

452 'to_agent': _match['agent_id'], 

453 'task': task, 

454 'status': 'matched_existing', 

455 'matched_via': 'semantic', 

456 'matched_name': _match['name'], 

457 } 

458 return delegation_id 

459 except Exception as _e: 

460 logger.debug(f"Semantic fallback failed: {_e}") 

461 logger.warning(f"No agents found with required skills: {required_skills}") 

462 return None 

463 

464 # Score each agent using the strategy across all required skills 

465 best_agent = None 

466 best_score = -1.0 

467 

468 for agent_id in suitable_agents: 

469 if agent_id == from_agent: 

470 continue 

471 

472 agent_skills = self.skill_registry.get_agent_skills(agent_id) 

473 score = self._score_agent(agent_skills, required_skills, strategy) 

474 

475 if score > best_score: 

476 best_score = score 

477 best_agent = agent_id 

478 

479 if not best_agent: 

480 logger.warning("No suitable agent found for delegation") 

481 return None 

482 

483 # Create delegation 

484 delegation_id = str(uuid.uuid4()) 

485 

486 with self.lock: 

487 self.delegations[delegation_id] = { 

488 'from_agent': from_agent, 

489 'to_agent': best_agent, 

490 'task': task, 

491 'required_skills': required_skills, 

492 'context': context or {}, 

493 'status': 'delegated', 

494 'created_at': datetime.now().isoformat(), 

495 'result': None 

496 } 

497 

498 # Send delegation message 

499 self.send_message( 

500 from_agent=from_agent, 

501 to_agent=best_agent, 

502 message_type='task_delegation', 

503 content=task, 

504 metadata={ 

505 'delegation_id': delegation_id, 

506 'required_skills': required_skills, 

507 'context': context 

508 } 

509 ) 

510 

511 logger.info(f"Task delegated from {from_agent} to {best_agent}: {delegation_id}") 

512 return delegation_id 

513 

514 def complete_delegation(self, delegation_id: str, result: Any): 

515 """ 

516 Mark a delegation as complete with result 

517 

518 Args: 

519 delegation_id: Delegation ID 

520 result: Delegation result 

521 """ 

522 with self.lock: 

523 if delegation_id in self.delegations: 

524 self.delegations[delegation_id]['status'] = 'completed' 

525 self.delegations[delegation_id]['result'] = result 

526 self.delegations[delegation_id]['completed_at'] = datetime.now().isoformat() 

527 

528 # Send completion message back to delegator 

529 delegation = self.delegations[delegation_id] 

530 self.send_message( 

531 from_agent=delegation['to_agent'], 

532 to_agent=delegation['from_agent'], 

533 message_type='delegation_complete', 

534 content=result, 

535 metadata={'delegation_id': delegation_id} 

536 ) 

537 

538 logger.info(f"Delegation completed: {delegation_id}") 

539 

540 def get_delegation_status(self, delegation_id: str) -> Optional[Dict[str, Any]]: 

541 """Get delegation status""" 

542 with self.lock: 

543 return self.delegations.get(delegation_id) 

544 

545 

546# Global instances 

547skill_registry = AgentSkillRegistry() 

548a2a_context = A2AContextExchange(skill_registry) 

549 

550 

551def register_agent_with_skills(agent_id: str, skills: List[Dict[str, Any]]): 

552 """ 

553 Register an agent with its skills 

554 

555 Args: 

556 agent_id: Agent identifier 

557 skills: List of skill definitions 

558 """ 

559 skill_registry.register_agent(agent_id, skills) 

560 a2a_context.register_agent(agent_id) 

561 

562 

563def create_delegation_function(from_agent_id: str) -> Callable: 

564 """ 

565 Create a delegation function for an agent 

566 

567 Args: 

568 from_agent_id: Agent ID 

569 

570 Returns: 

571 Delegation function 

572 """ 

573 def delegate_to_specialist(task: str, required_skills: List[str], context: Optional[Dict] = None) -> str: 

574 """ 

575 Delegate a task to a specialist agent 

576 

577 Args: 

578 task: Task description 

579 required_skills: Required skills for the task 

580 context: Optional task context 

581 

582 Returns: 

583 Delegation ID or error message 

584 """ 

585 delegation_id = a2a_context.delegate_task(from_agent_id, task, required_skills, context) 

586 

587 if delegation_id: 

588 return json.dumps({ 

589 'success': True, 

590 'delegation_id': delegation_id, 

591 'message': f'Task delegated successfully to specialist agent' 

592 }) 

593 else: 

594 return json.dumps({ 

595 'success': False, 

596 'error': 'No suitable agent found with required skills', 

597 'required_skills': required_skills 

598 }) 

599 

600 return delegate_to_specialist 

601 

602 

603def create_context_sharing_function(agent_id: str) -> Callable: 

604 """ 

605 Create a context sharing function for an agent 

606 

607 Args: 

608 agent_id: Agent ID 

609 

610 Returns: 

611 Context sharing function 

612 """ 

613 def share_context_with_agents(context_key: str, context_value: Any) -> str: 

614 """ 

615 Share context with other agents 

616 

617 Args: 

618 context_key: Context identifier 

619 context_value: Context value 

620 

621 Returns: 

622 Success message 

623 """ 

624 a2a_context.share_context(agent_id, context_key, context_value) 

625 

626 return json.dumps({ 

627 'success': True, 

628 'message': f'Context "{context_key}" shared successfully', 

629 'shared_by': agent_id 

630 }) 

631 

632 return share_context_with_agents 

633 

634 

635def create_context_retrieval_function() -> Callable: 

636 """ 

637 Create a context retrieval function 

638 

639 Returns: 

640 Context retrieval function 

641 """ 

642 def get_shared_context(context_key: str) -> str: 

643 """ 

644 Retrieve shared context from other agents 

645 

646 Args: 

647 context_key: Context identifier 

648 

649 Returns: 

650 Context value or error 

651 """ 

652 context = a2a_context.get_shared_context(context_key) 

653 

654 if context is not None: 

655 return json.dumps({ 

656 'success': True, 

657 'context_key': context_key, 

658 'context_value': context 

659 }) 

660 else: 

661 return json.dumps({ 

662 'success': False, 

663 'error': f'Context "{context_key}" not found' 

664 }) 

665 

666 return get_shared_context