Coverage for integrations / channels / identity / agent_identity.py: 31.9%
141 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
"""
Agent Identity Management for HevolveBot Integration.

This module provides AgentIdentity and IdentityManager for managing
bot identities across different channels.
"""
import json
import logging
import uuid
from dataclasses import asdict, dataclass, field, fields
from datetime import datetime, timezone
from typing import Any, Dict, List, Optional
# Module-level logger named after this module; IdentityManager reports
# identity registration, activation and deletion events through it.
logger = logging.getLogger(__name__)
def _utc_now() -> datetime:
    """Return the current UTC time as a naive ``datetime``.

    ``datetime.utcnow()`` is deprecated since Python 3.12. This helper yields
    the same naive-UTC value, so timestamps keep the exact shape (no
    ``+00:00`` offset) that ``to_dict``/``from_dict`` already exchange.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)


@dataclass
class AgentIdentity:
    """Represents an agent's identity configuration.

    All fields have defaults, so ``AgentIdentity()`` yields a usable identity
    with a freshly generated UUID4 string as its ``id``.
    """

    # Unique identifier; a random UUID4 string unless supplied by the caller.
    id: str = field(default_factory=lambda: str(uuid.uuid4()))
    # Display name and free-text description of the agent.
    name: str = "HevolveBot"
    description: str = "An intelligent assistant"
    # Optional avatar location and a short emoji representation.
    avatar_url: Optional[str] = None
    emoji: str = "🤖"
    # Free-form trait -> value mapping (see get/set_personality_trait).
    personality: Dict[str, Any] = field(default_factory=lambda: {
        "tone": "friendly",
        "formality": "casual",
        "verbosity": "balanced"
    })
    # Capability names (see has/add/remove_capability).
    capabilities: List[str] = field(default_factory=lambda: [
        "conversation",
        "task_execution",
        "information_retrieval"
    ])
    # Arbitrary extra data attached to the identity.
    metadata: Dict[str, Any] = field(default_factory=dict)
    # Naive UTC timestamps; ``updated_at`` is refreshed by every mutator below.
    created_at: datetime = field(default_factory=_utc_now)
    updated_at: datetime = field(default_factory=_utc_now)

    def to_dict(self) -> Dict[str, Any]:
        """Convert identity to a dictionary.

        Returns:
            A new dictionary of all fields (nested containers are copied by
            ``asdict``) with ``created_at``/``updated_at`` rendered as ISO 8601
            strings.
        """
        data = asdict(self)
        data['created_at'] = self.created_at.isoformat()
        data['updated_at'] = self.updated_at.isoformat()
        return data

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> 'AgentIdentity':
        """Create identity from dictionary.

        Args:
            data: Field name -> value mapping. ``created_at``/``updated_at``
                may be ISO 8601 strings or ``datetime`` objects.

        Returns:
            A new identity built from ``data``.

        Raises:
            TypeError: If ``data`` contains a key that is not a field.
            ValueError: If a timestamp string is not valid ISO 8601.
        """
        # Work on a shallow copy so parsing the timestamps does not rewrite
        # the caller's dictionary in place.
        payload = dict(data)
        for key in ('created_at', 'updated_at'):
            value = payload.get(key)
            if isinstance(value, str):
                payload[key] = datetime.fromisoformat(value)
        return cls(**payload)

    def update(self, **kwargs) -> 'AgentIdentity':
        """Update identity fields and return self.

        Args:
            **kwargs: Field name -> new value. Keys that are not dataclass
                fields are ignored.

        Returns:
            This identity, to allow chaining.
        """
        # Only real fields may be assigned. A plain ``hasattr`` check would
        # also match method names and let ``update(to_dict=...)`` clobber them.
        field_names = {f.name for f in fields(self)}
        for key, value in kwargs.items():
            if key in field_names:
                setattr(self, key, value)
        # Refreshed last, so an ``updated_at`` passed in kwargs is overridden.
        self.updated_at = _utc_now()
        return self

    def has_capability(self, capability: str) -> bool:
        """Check if agent has a specific capability."""
        return capability in self.capabilities

    def add_capability(self, capability: str) -> None:
        """Add a capability to the agent (no-op if already present)."""
        if capability not in self.capabilities:
            self.capabilities.append(capability)
            self.updated_at = _utc_now()

    def remove_capability(self, capability: str) -> bool:
        """Remove a capability from the agent.

        Returns:
            True if the capability was present and removed, False otherwise.
        """
        if capability in self.capabilities:
            self.capabilities.remove(capability)
            self.updated_at = _utc_now()
            return True
        return False

    def get_personality_trait(self, trait: str, default: Any = None) -> Any:
        """Get a specific personality trait, or ``default`` if it is unset."""
        return self.personality.get(trait, default)

    def set_personality_trait(self, trait: str, value: Any) -> None:
        """Set a specific personality trait."""
        self.personality[trait] = value
        self.updated_at = _utc_now()
class IdentityManager:
    """
    Manages agent identities across channels.

    Supports:
    - Default identity management
    - Per-channel identity customization
    - Identity persistence
    - Identity switching

    State is held in three places: a store of every registered identity
    keyed by id, a channel -> identity mapping, and the id of the currently
    active identity (initially the default identity).
    """

    def __init__(self, default_identity: Optional[AgentIdentity] = None):
        """
        Initialize the identity manager.

        Args:
            default_identity: Optional default identity to use; a fresh
                ``AgentIdentity()`` is created when omitted.
        """
        self._default_identity = default_identity or AgentIdentity()
        self._channel_identities: Dict[str, AgentIdentity] = {}
        self._identity_store: Dict[str, AgentIdentity] = {}
        self._active_identity_id: str = self._default_identity.id

        # Register default identity
        self._identity_store[self._default_identity.id] = self._default_identity

    @property
    def default_identity(self) -> AgentIdentity:
        """Get the default identity."""
        return self._default_identity

    def get_identity(self, identity_id: Optional[str] = None) -> Optional[AgentIdentity]:
        """
        Get an identity by ID.

        Args:
            identity_id: Optional ID of the identity to retrieve.
                Returns active identity if None.

        Returns:
            The requested identity or None if not found
        """
        if identity_id is None:
            identity_id = self._active_identity_id
        return self._identity_store.get(identity_id)

    def set_identity(self, identity: AgentIdentity) -> None:
        """
        Register or update an identity.

        When an identity with the same id is already registered, the new
        object replaces it everywhere it is referenced: in the store, as the
        default identity, and in any channel mapping.

        Args:
            identity: The identity to register
        """
        self._identity_store[identity.id] = identity

        # The default identity and channel mappings hold direct object
        # references. Re-point them too, otherwise they would keep serving
        # the stale object after an update.
        if identity.id == self._default_identity.id:
            self._default_identity = identity
        stale_channels = [
            channel for channel, mapped in self._channel_identities.items()
            if mapped.id == identity.id
        ]
        for channel in stale_channels:
            self._channel_identities[channel] = identity

        logger.info("Identity registered: %s (%s)", identity.id, identity.name)

    def set_active_identity(self, identity_id: str) -> bool:
        """
        Set the active identity.

        Args:
            identity_id: ID of the identity to activate

        Returns:
            True if successful, False if identity not found
        """
        if identity_id not in self._identity_store:
            logger.warning("Identity not found: %s", identity_id)
            return False
        self._active_identity_id = identity_id
        logger.info("Active identity set to: %s", identity_id)
        return True

    def get_identity_for_channel(self, channel: str) -> AgentIdentity:
        """
        Get the identity configured for a specific channel.

        Args:
            channel: The channel identifier (e.g., 'discord', 'telegram', 'slack')

        Returns:
            The channel-specific identity or the default identity
        """
        return self._channel_identities.get(channel, self._default_identity)

    def set_identity_for_channel(self, channel: str, identity: AgentIdentity) -> None:
        """
        Set the identity for a specific channel.

        Args:
            channel: The channel identifier
            identity: The identity to use for this channel
        """
        self._channel_identities[channel] = identity
        # Also register in store if not already
        if identity.id not in self._identity_store:
            self._identity_store[identity.id] = identity
        logger.info("Identity set for channel %s: %s", channel, identity.name)

    def remove_channel_identity(self, channel: str) -> bool:
        """
        Remove channel-specific identity (will fall back to default).

        Args:
            channel: The channel identifier

        Returns:
            True if removed, False if no custom identity was set
        """
        if channel not in self._channel_identities:
            return False
        del self._channel_identities[channel]
        logger.info("Channel identity removed for: %s", channel)
        return True

    def list_identities(self) -> List[AgentIdentity]:
        """Get all registered identities (as a new list)."""
        return list(self._identity_store.values())

    def list_channel_identities(self) -> Dict[str, AgentIdentity]:
        """Get all channel-specific identities (as a new dict)."""
        return dict(self._channel_identities)

    def create_identity(self, **kwargs) -> AgentIdentity:
        """
        Create and register a new identity.

        Args:
            **kwargs: Identity attributes

        Returns:
            The newly created identity
        """
        identity = AgentIdentity(**kwargs)
        self.set_identity(identity)
        return identity

    def delete_identity(self, identity_id: str) -> bool:
        """
        Delete an identity.

        Channel mappings that use the identity are removed, and the active
        identity falls back to the default if the deleted one was active.

        Args:
            identity_id: ID of the identity to delete

        Returns:
            True if deleted, False if not found or is default
        """
        if identity_id == self._default_identity.id:
            logger.warning("Cannot delete default identity")
            return False

        if identity_id not in self._identity_store:
            return False

        # Remove from channel mappings (collect first: the dict cannot change
        # size while it is being iterated).
        channels_to_remove = [
            ch for ch, ident in self._channel_identities.items()
            if ident.id == identity_id
        ]
        for channel in channels_to_remove:
            del self._channel_identities[channel]

        # Remove from store
        del self._identity_store[identity_id]

        # Reset active if needed
        if self._active_identity_id == identity_id:
            self._active_identity_id = self._default_identity.id

        logger.info("Identity deleted: %s", identity_id)
        return True

    def clone_identity(self, identity_id: str, new_name: Optional[str] = None) -> Optional[AgentIdentity]:
        """
        Clone an existing identity.

        The clone gets a new UUID, fresh timestamps, and is registered in
        the store.

        Args:
            identity_id: ID of the identity to clone
            new_name: Optional new name for the clone; defaults to
                "<source name> (Copy)"

        Returns:
            The cloned identity or None if source not found
        """
        source = self.get_identity(identity_id)
        if source is None:
            return None

        # One naive-UTC timestamp for both fields, so a new clone starts with
        # created_at == updated_at. ``datetime.utcnow()`` is deprecated.
        now = datetime.now(timezone.utc).replace(tzinfo=None)

        data = source.to_dict()
        data['id'] = str(uuid.uuid4())
        data['name'] = new_name or f"{source.name} (Copy)"
        data['created_at'] = now
        data['updated_at'] = now

        clone = AgentIdentity.from_dict(data)
        self.set_identity(clone)
        return clone

    def export_identities(self) -> str:
        """
        Export all identities as JSON.

        Returns:
            A JSON document with the keys ``default_identity_id``,
            ``active_identity_id``, ``identities`` (one dict per identity)
            and ``channel_mappings`` (channel -> identity id).
        """
        data = {
            'default_identity_id': self._default_identity.id,
            'active_identity_id': self._active_identity_id,
            'identities': [ident.to_dict() for ident in self._identity_store.values()],
            'channel_mappings': {
                channel: ident.id
                for channel, ident in self._channel_identities.items()
            }
        }
        return json.dumps(data, indent=2)

    def import_identities(self, json_data: str) -> int:
        """
        Import identities from JSON.

        Every entry is parsed before anything is registered, so a malformed
        entry raises without leaving a partially imported state.

        NOTE(review): ``default_identity_id`` and ``active_identity_id`` in
        the document are not read here; the manager's current default and
        active selection are kept. Confirm whether they should be restored.

        Args:
            json_data: JSON string containing identity data

        Returns:
            Number of identities imported
        """
        data = json.loads(json_data)

        # ``or`` also covers an explicit JSON null for either section.
        parsed = [
            AgentIdentity.from_dict(ident_data)
            for ident_data in (data.get('identities') or [])
        ]
        for identity in parsed:
            self.set_identity(identity)

        # Restore channel mappings; ids that are not registered are skipped.
        for channel, ident_id in (data.get('channel_mappings') or {}).items():
            if ident_id in self._identity_store:
                self._channel_identities[channel] = self._identity_store[ident_id]

        return len(parsed)