Coverage for integrations / internal_comm / internal_agent_communication.py: 71.4%
245 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2A2A (Agent-to-Agent) Protocol Module
4This module enables direct agent-to-agent communication, context exchange,
5and skill-based task delegation within the multi-agent system.
7Features:
8- Agent skill registry and discovery
9- Context sharing between agents
10- Task delegation to specialist agents
11- Inter-agent messaging
12- Collaborative execution tracking
13"""
15import json
16import logging
17import threading
18from typing import Dict, List, Any, Optional, Callable
19from datetime import datetime
20from collections import deque
21import uuid
23logger = logging.getLogger(__name__)
26class AgentSkill:
27 """Represents a skill that an agent possesses.
29 Tracks multiple dimensions for intelligent agent selection:
30 - proficiency: accuracy/quality (0.0-1.0)
31 - avg_latency_ms: average execution time in milliseconds
32 - avg_cost_spark: average cost per execution in Spark
33 - success_rate: derived from usage_count / success_count
34 """
36 def __init__(self, name: str, description: str, proficiency: float = 1.0,
37 avg_latency_ms: float = 0.0, avg_cost_spark: float = 0.0,
38 metadata: Optional[Dict] = None):
39 """
40 Initialize an agent skill
42 Args:
43 name: Skill identifier
44 description: Human-readable skill description
45 proficiency: Skill proficiency level (0.0 to 1.0)
46 avg_latency_ms: Average execution time in milliseconds
47 avg_cost_spark: Average cost per execution in Spark currency
48 metadata: Additional skill metadata
49 """
50 self.name = name
51 self.description = description
52 self.proficiency = max(0.0, min(1.0, proficiency))
53 self.avg_latency_ms = avg_latency_ms
54 self.avg_cost_spark = avg_cost_spark
55 self.metadata = metadata or {}
56 self.usage_count = 0
57 self.success_count = 0
58 self._total_latency_ms = 0.0
59 self._total_cost_spark = 0.0
61 def record_usage(self, success: bool = True, latency_ms: float = 0.0,
62 cost_spark: float = 0.0):
63 """Record skill usage with optional latency and cost tracking."""
64 self.usage_count += 1
65 if success:
66 self.success_count += 1
67 if latency_ms > 0:
68 self._total_latency_ms += latency_ms
69 self.avg_latency_ms = self._total_latency_ms / self.usage_count
70 if cost_spark > 0:
71 self._total_cost_spark += cost_spark
72 self.avg_cost_spark = self._total_cost_spark / self.usage_count
74 def get_success_rate(self) -> float:
75 """Calculate skill success rate"""
76 if self.usage_count == 0:
77 return 0.0
78 return self.success_count / self.usage_count
80 def to_dict(self) -> Dict[str, Any]:
81 """Convert skill to dictionary"""
82 return {
83 'name': self.name,
84 'description': self.description,
85 'proficiency': self.proficiency,
86 'avg_latency_ms': round(self.avg_latency_ms, 1),
87 'avg_cost_spark': round(self.avg_cost_spark, 2),
88 'usage_count': self.usage_count,
89 'success_rate': round(self.get_success_rate(), 3),
90 'metadata': self.metadata
91 }
94class AgentSkillRegistry:
95 """Registry for tracking agent skills and capabilities"""
97 def __init__(self):
98 """Initialize the skill registry"""
99 self.agents: Dict[str, Dict[str, AgentSkill]] = {} # agent_id -> {skill_name -> AgentSkill}
100 self.lock = threading.Lock()
102 def register_agent(self, agent_id: str, skills: List[Dict[str, Any]]):
103 """
104 Register an agent with its skills
106 Args:
107 agent_id: Unique agent identifier
108 skills: List of skill definitions
109 """
110 with self.lock:
111 if agent_id not in self.agents:
112 self.agents[agent_id] = {}
114 for skill_def in skills:
115 skill = AgentSkill(
116 name=skill_def.get('name'),
117 description=skill_def.get('description', ''),
118 proficiency=skill_def.get('proficiency', 1.0),
119 avg_latency_ms=skill_def.get('avg_latency_ms', 0.0),
120 avg_cost_spark=skill_def.get('avg_cost_spark', 0.0),
121 metadata=skill_def.get('metadata', {})
122 )
123 self.agents[agent_id][skill.name] = skill
125 logger.info(f"Registered agent {agent_id} with {len(skills)} skills")
127 def find_agents_with_skill(self, skill_name: str, min_proficiency: float = 0.0,
128 strategy: str = 'accuracy') -> List[tuple]:
129 """
130 Find all agents that have a specific skill, sorted by strategy.
132 Args:
133 skill_name: Name of the skill to find
134 min_proficiency: Minimum proficiency level required
135 strategy: Selection strategy:
136 - 'accuracy': highest proficiency (default, best quality)
137 - 'speed': lowest avg_latency_ms (fastest response)
138 - 'efficiency': highest success_rate with lowest cost
139 - 'balanced': weighted composite of all dimensions
141 Returns:
142 List of (agent_id, skill) tuples sorted by strategy
143 """
144 with self.lock:
145 matches = []
146 for agent_id, skills in self.agents.items():
147 if skill_name in skills:
148 skill = skills[skill_name]
149 if skill.proficiency >= min_proficiency:
150 matches.append((agent_id, skill))
152 if strategy == 'speed':
153 # Lowest latency first (0 = unknown, sort last)
154 matches.sort(key=lambda x: x[1].avg_latency_ms if x[1].avg_latency_ms > 0
155 else float('inf'))
156 elif strategy == 'efficiency':
157 # Highest success_rate / lowest cost
158 def efficiency_score(item):
159 s = item[1]
160 rate = s.get_success_rate() if s.usage_count > 0 else s.proficiency
161 cost_penalty = s.avg_cost_spark / 100.0 if s.avg_cost_spark > 0 else 0.0
162 return rate - cost_penalty
163 matches.sort(key=efficiency_score, reverse=True)
164 elif strategy == 'balanced':
165 # Weighted composite: 40% proficiency, 25% success_rate,
166 # 20% speed (inverse latency), 15% cost (inverse)
167 def balanced_score(item):
168 s = item[1]
169 prof = s.proficiency
170 rate = s.get_success_rate() if s.usage_count > 0 else prof
171 # Normalize latency: lower is better, cap at 60s
172 latency_norm = 1.0 - min(s.avg_latency_ms / 60000.0, 1.0) if s.avg_latency_ms > 0 else 0.5
173 # Normalize cost: lower is better, cap at 100 spark
174 cost_norm = 1.0 - min(s.avg_cost_spark / 100.0, 1.0) if s.avg_cost_spark > 0 else 0.5
175 return (0.40 * prof) + (0.25 * rate) + (0.20 * latency_norm) + (0.15 * cost_norm)
176 matches.sort(key=balanced_score, reverse=True)
177 else:
178 # Default: accuracy — highest proficiency first
179 matches.sort(key=lambda x: x[1].proficiency, reverse=True)
181 return matches
183 def get_agent_skills(self, agent_id: str) -> Dict[str, AgentSkill]:
184 """Get all skills for an agent"""
185 with self.lock:
186 return self.agents.get(agent_id, {})
188 def record_skill_usage(self, agent_id: str, skill_name: str, success: bool = True):
189 """Record usage of a skill by an agent"""
190 with self.lock:
191 if agent_id in self.agents and skill_name in self.agents[agent_id]:
192 self.agents[agent_id][skill_name].record_usage(success)
194 def get_best_agent_for_skill(self, skill_name: str,
195 strategy: str = 'accuracy') -> Optional[str]:
196 """
197 Get the best agent for a specific skill using the given strategy.
199 Args:
200 skill_name: Name of the skill
201 strategy: Selection strategy ('accuracy', 'speed', 'efficiency', 'balanced')
203 Returns:
204 Agent ID of the best agent, or None if no agent has the skill
205 """
206 matches = self.find_agents_with_skill(skill_name, strategy=strategy)
207 if matches:
208 return matches[0][0]
209 return None
212class A2AMessage:
213 """Represents a message between agents"""
215 def __init__(self, from_agent: str, to_agent: str, message_type: str, content: Any, metadata: Optional[Dict] = None):
216 """
217 Initialize an A2A message
219 Args:
220 from_agent: Sender agent ID
221 to_agent: Recipient agent ID
222 message_type: Type of message (request, response, broadcast, etc.)
223 content: Message content
224 metadata: Additional metadata
225 """
226 self.message_id = str(uuid.uuid4())
227 self.from_agent = from_agent
228 self.to_agent = to_agent
229 self.message_type = message_type
230 self.content = content
231 self.metadata = metadata or {}
232 self.timestamp = datetime.now()
233 self.status = 'pending'
235 def to_dict(self) -> Dict[str, Any]:
236 """Convert message to dictionary"""
237 return {
238 'message_id': self.message_id,
239 'from_agent': self.from_agent,
240 'to_agent': self.to_agent,
241 'message_type': self.message_type,
242 'content': self.content,
243 'metadata': self.metadata,
244 'timestamp': self.timestamp.isoformat(),
245 'status': self.status
246 }
249class A2AContextExchange:
250 """Manages context exchange and task delegation between agents"""
252 def __init__(self, skill_registry: AgentSkillRegistry):
253 """
254 Initialize A2A context exchange
256 Args:
257 skill_registry: Agent skill registry
258 """
259 self.skill_registry = skill_registry
260 self.message_queues: Dict[str, deque] = {} # agent_id -> message queue
261 self.shared_context: Dict[str, Any] = {} # Shared context across agents
262 self.delegations: Dict[str, Dict[str, Any]] = {} # delegation_id -> delegation info
263 self.lock = threading.Lock()
265 def register_agent(self, agent_id: str):
266 """Register an agent for A2A communication"""
267 with self.lock:
268 if agent_id not in self.message_queues:
269 self.message_queues[agent_id] = deque(maxlen=100)
270 logger.info(f"Registered agent {agent_id} for A2A communication")
272 def send_message(self, from_agent: str, to_agent: str, message_type: str, content: Any, metadata: Optional[Dict] = None):
273 """
274 Send a message from one agent to another
276 Args:
277 from_agent: Sender agent ID
278 to_agent: Recipient agent ID
279 message_type: Type of message
280 content: Message content
281 metadata: Additional metadata
283 Returns:
284 Message ID
285 """
286 # Security: Encrypt message content if A2ACrypto available
287 try:
288 from security.crypto import A2ACrypto
289 crypto = A2ACrypto()
290 if isinstance(content, str):
291 content = crypto.encrypt_message(content)
292 elif isinstance(content, dict):
293 content = crypto.encrypt_payload(content)
294 except ImportError:
295 pass # Send unencrypted (backward compat)
296 except Exception:
297 pass # Encryption failed, send unencrypted
299 message = A2AMessage(from_agent, to_agent, message_type, content, metadata)
301 with self.lock:
302 if to_agent not in self.message_queues:
303 self.register_agent(to_agent)
305 self.message_queues[to_agent].append(message)
306 logger.info(f"Message sent from {from_agent} to {to_agent}: {message_type}")
308 return message.message_id
310 def get_messages(self, agent_id: str, message_type: Optional[str] = None) -> List[A2AMessage]:
311 """
312 Get pending messages for an agent
314 Args:
315 agent_id: Agent ID
316 message_type: Optional filter by message type
318 Returns:
319 List of pending messages
320 """
321 with self.lock:
322 if agent_id not in self.message_queues:
323 return []
325 messages = list(self.message_queues[agent_id])
327 if message_type:
328 messages = [m for m in messages if m.message_type == message_type]
330 # Clear processed messages
331 self.message_queues[agent_id].clear()
333 return messages
335 def share_context(self, agent_id: str, context_key: str, context_value: Any):
336 """
337 Share context with other agents
339 Args:
340 agent_id: Agent sharing the context
341 context_key: Context key
342 context_value: Context value
343 """
344 with self.lock:
345 self.shared_context[context_key] = {
346 'value': context_value,
347 'shared_by': agent_id,
348 'timestamp': datetime.now().isoformat()
349 }
350 logger.info(f"Agent {agent_id} shared context: {context_key}")
352 def get_shared_context(self, context_key: str) -> Optional[Any]:
353 """
354 Get shared context
356 Args:
357 context_key: Context key
359 Returns:
360 Context value or None
361 """
362 with self.lock:
363 context = self.shared_context.get(context_key)
364 if context:
365 return context.get('value')
366 return None
368 def _score_agent(self, agent_skills: Dict[str, 'AgentSkill'],
369 required_skills: List[str], strategy: str) -> float:
370 """
371 Score an agent across all required skills using the given strategy.
373 Args:
374 agent_skills: Dict of skill_name -> AgentSkill for this agent
375 required_skills: Skills the agent must have
376 strategy: 'accuracy', 'speed', 'efficiency', or 'balanced'
378 Returns:
379 Composite score (higher is better)
380 """
381 scores = []
382 for skill_name in required_skills:
383 skill = agent_skills.get(skill_name)
384 if not skill:
385 return -1.0 # Missing required skill
387 if strategy == 'speed':
388 # Lower latency = higher score; unknown (0) gets middle score
389 if skill.avg_latency_ms > 0:
390 scores.append(1.0 - min(skill.avg_latency_ms / 60000.0, 1.0))
391 else:
392 scores.append(0.5)
393 elif strategy == 'efficiency':
394 rate = skill.get_success_rate() if skill.usage_count > 0 else skill.proficiency
395 cost_penalty = min(skill.avg_cost_spark / 100.0, 1.0) if skill.avg_cost_spark > 0 else 0.0
396 scores.append(rate - cost_penalty)
397 elif strategy == 'balanced':
398 prof = skill.proficiency
399 rate = skill.get_success_rate() if skill.usage_count > 0 else prof
400 latency_norm = (1.0 - min(skill.avg_latency_ms / 60000.0, 1.0)) if skill.avg_latency_ms > 0 else 0.5
401 cost_norm = (1.0 - min(skill.avg_cost_spark / 100.0, 1.0)) if skill.avg_cost_spark > 0 else 0.5
402 scores.append(0.40 * prof + 0.25 * rate + 0.20 * latency_norm + 0.15 * cost_norm)
403 else:
404 # accuracy (default)
405 scores.append(skill.proficiency)
407 return sum(scores) / len(scores) if scores else 0.0
409 def delegate_task(self, from_agent: str, task: str, required_skills: List[str],
410 context: Optional[Dict] = None,
411 strategy: str = 'accuracy') -> Optional[str]:
412 """
413 Delegate a task to the most suitable agent using the given strategy.
415 Args:
416 from_agent: Agent delegating the task
417 task: Task description
418 required_skills: List of required skills
419 context: Optional task context
420 strategy: Selection strategy ('accuracy', 'speed', 'efficiency', 'balanced')
422 Returns:
423 Delegation ID or None if no suitable agent found
424 """
425 if not required_skills:
426 logger.warning("No required skills specified for task delegation")
427 return None
429 # Find agents with all required skills
430 suitable_agents = None
431 for skill in required_skills:
432 agents_with_skill = set(
433 agent_id for agent_id, _
434 in self.skill_registry.find_agents_with_skill(skill, strategy=strategy)
435 )
436 if suitable_agents is None:
437 suitable_agents = agents_with_skill
438 else:
439 suitable_agents = suitable_agents.intersection(agents_with_skill)
441 if not suitable_agents:
442 # Fallback: semantic match against existing recipes + expert catalog
443 try:
444 from integrations.agentic_router import find_matching_agent
445 _match = find_matching_agent(task)
446 if _match and _match.get('agent_id'):
447 logger.info(f"Skill match failed, semantic match found: {_match['name']}")
448 delegation_id = str(uuid.uuid4())
449 with self.lock:
450 self.delegations[delegation_id] = {
451 'from_agent': from_agent,
452 'to_agent': _match['agent_id'],
453 'task': task,
454 'status': 'matched_existing',
455 'matched_via': 'semantic',
456 'matched_name': _match['name'],
457 }
458 return delegation_id
459 except Exception as _e:
460 logger.debug(f"Semantic fallback failed: {_e}")
461 logger.warning(f"No agents found with required skills: {required_skills}")
462 return None
464 # Score each agent using the strategy across all required skills
465 best_agent = None
466 best_score = -1.0
468 for agent_id in suitable_agents:
469 if agent_id == from_agent:
470 continue
472 agent_skills = self.skill_registry.get_agent_skills(agent_id)
473 score = self._score_agent(agent_skills, required_skills, strategy)
475 if score > best_score:
476 best_score = score
477 best_agent = agent_id
479 if not best_agent:
480 logger.warning("No suitable agent found for delegation")
481 return None
483 # Create delegation
484 delegation_id = str(uuid.uuid4())
486 with self.lock:
487 self.delegations[delegation_id] = {
488 'from_agent': from_agent,
489 'to_agent': best_agent,
490 'task': task,
491 'required_skills': required_skills,
492 'context': context or {},
493 'status': 'delegated',
494 'created_at': datetime.now().isoformat(),
495 'result': None
496 }
498 # Send delegation message
499 self.send_message(
500 from_agent=from_agent,
501 to_agent=best_agent,
502 message_type='task_delegation',
503 content=task,
504 metadata={
505 'delegation_id': delegation_id,
506 'required_skills': required_skills,
507 'context': context
508 }
509 )
511 logger.info(f"Task delegated from {from_agent} to {best_agent}: {delegation_id}")
512 return delegation_id
514 def complete_delegation(self, delegation_id: str, result: Any):
515 """
516 Mark a delegation as complete with result
518 Args:
519 delegation_id: Delegation ID
520 result: Delegation result
521 """
522 with self.lock:
523 if delegation_id in self.delegations:
524 self.delegations[delegation_id]['status'] = 'completed'
525 self.delegations[delegation_id]['result'] = result
526 self.delegations[delegation_id]['completed_at'] = datetime.now().isoformat()
528 # Send completion message back to delegator
529 delegation = self.delegations[delegation_id]
530 self.send_message(
531 from_agent=delegation['to_agent'],
532 to_agent=delegation['from_agent'],
533 message_type='delegation_complete',
534 content=result,
535 metadata={'delegation_id': delegation_id}
536 )
538 logger.info(f"Delegation completed: {delegation_id}")
540 def get_delegation_status(self, delegation_id: str) -> Optional[Dict[str, Any]]:
541 """Get delegation status"""
542 with self.lock:
543 return self.delegations.get(delegation_id)
546# Global instances
547skill_registry = AgentSkillRegistry()
548a2a_context = A2AContextExchange(skill_registry)
551def register_agent_with_skills(agent_id: str, skills: List[Dict[str, Any]]):
552 """
553 Register an agent with its skills
555 Args:
556 agent_id: Agent identifier
557 skills: List of skill definitions
558 """
559 skill_registry.register_agent(agent_id, skills)
560 a2a_context.register_agent(agent_id)
563def create_delegation_function(from_agent_id: str) -> Callable:
564 """
565 Create a delegation function for an agent
567 Args:
568 from_agent_id: Agent ID
570 Returns:
571 Delegation function
572 """
573 def delegate_to_specialist(task: str, required_skills: List[str], context: Optional[Dict] = None) -> str:
574 """
575 Delegate a task to a specialist agent
577 Args:
578 task: Task description
579 required_skills: Required skills for the task
580 context: Optional task context
582 Returns:
583 Delegation ID or error message
584 """
585 delegation_id = a2a_context.delegate_task(from_agent_id, task, required_skills, context)
587 if delegation_id:
588 return json.dumps({
589 'success': True,
590 'delegation_id': delegation_id,
591 'message': f'Task delegated successfully to specialist agent'
592 })
593 else:
594 return json.dumps({
595 'success': False,
596 'error': 'No suitable agent found with required skills',
597 'required_skills': required_skills
598 })
600 return delegate_to_specialist
603def create_context_sharing_function(agent_id: str) -> Callable:
604 """
605 Create a context sharing function for an agent
607 Args:
608 agent_id: Agent ID
610 Returns:
611 Context sharing function
612 """
613 def share_context_with_agents(context_key: str, context_value: Any) -> str:
614 """
615 Share context with other agents
617 Args:
618 context_key: Context identifier
619 context_value: Context value
621 Returns:
622 Success message
623 """
624 a2a_context.share_context(agent_id, context_key, context_value)
626 return json.dumps({
627 'success': True,
628 'message': f'Context "{context_key}" shared successfully',
629 'shared_by': agent_id
630 })
632 return share_context_with_agents
635def create_context_retrieval_function() -> Callable:
636 """
637 Create a context retrieval function
639 Returns:
640 Context retrieval function
641 """
642 def get_shared_context(context_key: str) -> str:
643 """
644 Retrieve shared context from other agents
646 Args:
647 context_key: Context identifier
649 Returns:
650 Context value or error
651 """
652 context = a2a_context.get_shared_context(context_key)
654 if context is not None:
655 return json.dumps({
656 'success': True,
657 'context_key': context_key,
658 'context_value': context
659 })
660 else:
661 return json.dumps({
662 'success': False,
663 'error': f'Context "{context_key}" not found'
664 })
666 return get_shared_context