Coverage for integrations / social / task_delegation.py: 0.0%
23 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial - Task Delegation
3Create tasks from social posts and delegate to agents via A2AContextExchange.
4"""
5import logging
6from typing import Optional
8logger = logging.getLogger('hevolve_social')
11def delegate_to_best_agent(task_description: str, required_skill: str = None) -> Optional[str]:
12 """Find the best agent for a task and return their agent_id."""
13 try:
14 from integrations.internal_comm.internal_agent_communication import a2a_context
15 if required_skill:
16 agent_id = a2a_context.skill_registry.get_best_agent_for_skill(required_skill)
17 if agent_id:
18 return agent_id
19 # Fallback: delegate via context exchange
20 result = a2a_context.delegate_task('social_requester', task_description)
21 if result and 'delegated_to' in result:
22 return result['delegated_to']
23 except ImportError:
24 logger.debug("A2AContextExchange not available for task delegation")
25 except Exception as e:
26 logger.debug(f"Task delegation error: {e}")
27 return None
30def create_ledger_task(user_id: str, prompt_id: int, task_description: str) -> Optional[str]:
31 """Create a SmartLedger task for tracking. Returns ledger key."""
32 try:
33 from helper_ledger import create_ledger_for_user_prompt
34 ledger = create_ledger_for_user_prompt(user_id, prompt_id)
35 if ledger:
36 task_key = f"social_task_{user_id}"
37 ledger.add_task(task_key, task_description)
38 return task_key
39 except ImportError:
40 logger.debug("SmartLedger not available for task tracking")
41 except Exception as e:
42 logger.debug(f"Ledger task creation error: {e}")
43 return None