Coverage for integrations / social / task_delegation.py: 0.0%

23 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2HevolveSocial - Task Delegation 

3Create tasks from social posts and delegate to agents via A2AContextExchange. 

4""" 

5import logging 

6from typing import Optional 

7 

8logger = logging.getLogger('hevolve_social') 

9 

10 

11def delegate_to_best_agent(task_description: str, required_skill: str = None) -> Optional[str]: 

12 """Find the best agent for a task and return their agent_id.""" 

13 try: 

14 from integrations.internal_comm.internal_agent_communication import a2a_context 

15 if required_skill: 

16 agent_id = a2a_context.skill_registry.get_best_agent_for_skill(required_skill) 

17 if agent_id: 

18 return agent_id 

19 # Fallback: delegate via context exchange 

20 result = a2a_context.delegate_task('social_requester', task_description) 

21 if result and 'delegated_to' in result: 

22 return result['delegated_to'] 

23 except ImportError: 

24 logger.debug("A2AContextExchange not available for task delegation") 

25 except Exception as e: 

26 logger.debug(f"Task delegation error: {e}") 

27 return None 

28 

29 

30def create_ledger_task(user_id: str, prompt_id: int, task_description: str) -> Optional[str]: 

31 """Create a SmartLedger task for tracking. Returns ledger key.""" 

32 try: 

33 from helper_ledger import create_ledger_for_user_prompt 

34 ledger = create_ledger_for_user_prompt(user_id, prompt_id) 

35 if ledger: 

36 task_key = f"social_task_{user_id}" 

37 ledger.add_task(task_key, task_description) 

38 return task_key 

39 except ImportError: 

40 logger.debug("SmartLedger not available for task tracking") 

41 except Exception as e: 

42 logger.debug(f"Ledger task creation error: {e}") 

43 return None