Coverage for integrations / channels / identity / agent_identity.py: 31.9%

141 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Agent Identity Management for HevolveBot Integration. 

3 

4This module provides AgentIdentity and IdentityManager for managing 

5bot identities across different channels. 

6""" 

7 

8from dataclasses import dataclass, field, asdict 

9from typing import Dict, List, Optional, Any 

10from datetime import datetime 

11import uuid 

12import json 

13import logging 

14 

15logger = logging.getLogger(__name__) 

16 

17 

18@dataclass 

19class AgentIdentity: 

20 """Represents an agent's identity configuration.""" 

21 

22 id: str = field(default_factory=lambda: str(uuid.uuid4())) 

23 name: str = "HevolveBot" 

24 description: str = "An intelligent assistant" 

25 avatar_url: Optional[str] = None 

26 emoji: str = "🤖" 

27 personality: Dict[str, Any] = field(default_factory=lambda: { 

28 "tone": "friendly", 

29 "formality": "casual", 

30 "verbosity": "balanced" 

31 }) 

32 capabilities: List[str] = field(default_factory=lambda: [ 

33 "conversation", 

34 "task_execution", 

35 "information_retrieval" 

36 ]) 

37 metadata: Dict[str, Any] = field(default_factory=dict) 

38 created_at: datetime = field(default_factory=datetime.utcnow) 

39 updated_at: datetime = field(default_factory=datetime.utcnow) 

40 

41 def to_dict(self) -> Dict[str, Any]: 

42 """Convert identity to dictionary.""" 

43 data = asdict(self) 

44 data['created_at'] = self.created_at.isoformat() 

45 data['updated_at'] = self.updated_at.isoformat() 

46 return data 

47 

48 @classmethod 

49 def from_dict(cls, data: Dict[str, Any]) -> 'AgentIdentity': 

50 """Create identity from dictionary.""" 

51 if 'created_at' in data and isinstance(data['created_at'], str): 

52 data['created_at'] = datetime.fromisoformat(data['created_at']) 

53 if 'updated_at' in data and isinstance(data['updated_at'], str): 

54 data['updated_at'] = datetime.fromisoformat(data['updated_at']) 

55 return cls(**data) 

56 

57 def update(self, **kwargs) -> 'AgentIdentity': 

58 """Update identity fields and return self.""" 

59 for key, value in kwargs.items(): 

60 if hasattr(self, key): 

61 setattr(self, key, value) 

62 self.updated_at = datetime.utcnow() 

63 return self 

64 

65 def has_capability(self, capability: str) -> bool: 

66 """Check if agent has a specific capability.""" 

67 return capability in self.capabilities 

68 

69 def add_capability(self, capability: str) -> None: 

70 """Add a capability to the agent.""" 

71 if capability not in self.capabilities: 

72 self.capabilities.append(capability) 

73 self.updated_at = datetime.utcnow() 

74 

75 def remove_capability(self, capability: str) -> bool: 

76 """Remove a capability from the agent.""" 

77 if capability in self.capabilities: 

78 self.capabilities.remove(capability) 

79 self.updated_at = datetime.utcnow() 

80 return True 

81 return False 

82 

83 def get_personality_trait(self, trait: str, default: Any = None) -> Any: 

84 """Get a specific personality trait.""" 

85 return self.personality.get(trait, default) 

86 

87 def set_personality_trait(self, trait: str, value: Any) -> None: 

88 """Set a specific personality trait.""" 

89 self.personality[trait] = value 

90 self.updated_at = datetime.utcnow() 

91 

92 

93class IdentityManager: 

94 """ 

95 Manages agent identities across channels. 

96 

97 Supports: 

98 - Default identity management 

99 - Per-channel identity customization 

100 - Identity persistence 

101 - Identity switching 

102 """ 

103 

104 def __init__(self, default_identity: Optional[AgentIdentity] = None): 

105 """ 

106 Initialize the identity manager. 

107 

108 Args: 

109 default_identity: Optional default identity to use 

110 """ 

111 self._default_identity = default_identity or AgentIdentity() 

112 self._channel_identities: Dict[str, AgentIdentity] = {} 

113 self._identity_store: Dict[str, AgentIdentity] = {} 

114 self._active_identity_id: str = self._default_identity.id 

115 

116 # Register default identity 

117 self._identity_store[self._default_identity.id] = self._default_identity 

118 

119 @property 

120 def default_identity(self) -> AgentIdentity: 

121 """Get the default identity.""" 

122 return self._default_identity 

123 

124 def get_identity(self, identity_id: Optional[str] = None) -> Optional[AgentIdentity]: 

125 """ 

126 Get an identity by ID. 

127 

128 Args: 

129 identity_id: Optional ID of the identity to retrieve. 

130 Returns active identity if None. 

131 

132 Returns: 

133 The requested identity or None if not found 

134 """ 

135 if identity_id is None: 

136 return self._identity_store.get(self._active_identity_id) 

137 return self._identity_store.get(identity_id) 

138 

139 def set_identity(self, identity: AgentIdentity) -> None: 

140 """ 

141 Register or update an identity. 

142 

143 Args: 

144 identity: The identity to register 

145 """ 

146 self._identity_store[identity.id] = identity 

147 logger.info(f"Identity registered: {identity.id} ({identity.name})") 

148 

149 def set_active_identity(self, identity_id: str) -> bool: 

150 """ 

151 Set the active identity. 

152 

153 Args: 

154 identity_id: ID of the identity to activate 

155 

156 Returns: 

157 True if successful, False if identity not found 

158 """ 

159 if identity_id in self._identity_store: 

160 self._active_identity_id = identity_id 

161 logger.info(f"Active identity set to: {identity_id}") 

162 return True 

163 logger.warning(f"Identity not found: {identity_id}") 

164 return False 

165 

166 def get_identity_for_channel(self, channel: str) -> AgentIdentity: 

167 """ 

168 Get the identity configured for a specific channel. 

169 

170 Args: 

171 channel: The channel identifier (e.g., 'discord', 'telegram', 'slack') 

172 

173 Returns: 

174 The channel-specific identity or the default identity 

175 """ 

176 if channel in self._channel_identities: 

177 return self._channel_identities[channel] 

178 return self._default_identity 

179 

180 def set_identity_for_channel(self, channel: str, identity: AgentIdentity) -> None: 

181 """ 

182 Set the identity for a specific channel. 

183 

184 Args: 

185 channel: The channel identifier 

186 identity: The identity to use for this channel 

187 """ 

188 self._channel_identities[channel] = identity 

189 # Also register in store if not already 

190 if identity.id not in self._identity_store: 

191 self._identity_store[identity.id] = identity 

192 logger.info(f"Identity set for channel {channel}: {identity.name}") 

193 

194 def remove_channel_identity(self, channel: str) -> bool: 

195 """ 

196 Remove channel-specific identity (will fall back to default). 

197 

198 Args: 

199 channel: The channel identifier 

200 

201 Returns: 

202 True if removed, False if no custom identity was set 

203 """ 

204 if channel in self._channel_identities: 

205 del self._channel_identities[channel] 

206 logger.info(f"Channel identity removed for: {channel}") 

207 return True 

208 return False 

209 

210 def list_identities(self) -> List[AgentIdentity]: 

211 """Get all registered identities.""" 

212 return list(self._identity_store.values()) 

213 

214 def list_channel_identities(self) -> Dict[str, AgentIdentity]: 

215 """Get all channel-specific identities.""" 

216 return dict(self._channel_identities) 

217 

218 def create_identity(self, **kwargs) -> AgentIdentity: 

219 """ 

220 Create and register a new identity. 

221 

222 Args: 

223 **kwargs: Identity attributes 

224 

225 Returns: 

226 The newly created identity 

227 """ 

228 identity = AgentIdentity(**kwargs) 

229 self.set_identity(identity) 

230 return identity 

231 

232 def delete_identity(self, identity_id: str) -> bool: 

233 """ 

234 Delete an identity. 

235 

236 Args: 

237 identity_id: ID of the identity to delete 

238 

239 Returns: 

240 True if deleted, False if not found or is default 

241 """ 

242 if identity_id == self._default_identity.id: 

243 logger.warning("Cannot delete default identity") 

244 return False 

245 

246 if identity_id in self._identity_store: 

247 # Remove from channel mappings 

248 channels_to_remove = [ 

249 ch for ch, ident in self._channel_identities.items() 

250 if ident.id == identity_id 

251 ] 

252 for channel in channels_to_remove: 

253 del self._channel_identities[channel] 

254 

255 # Remove from store 

256 del self._identity_store[identity_id] 

257 

258 # Reset active if needed 

259 if self._active_identity_id == identity_id: 

260 self._active_identity_id = self._default_identity.id 

261 

262 logger.info(f"Identity deleted: {identity_id}") 

263 return True 

264 

265 return False 

266 

267 def clone_identity(self, identity_id: str, new_name: Optional[str] = None) -> Optional[AgentIdentity]: 

268 """ 

269 Clone an existing identity. 

270 

271 Args: 

272 identity_id: ID of the identity to clone 

273 new_name: Optional new name for the clone 

274 

275 Returns: 

276 The cloned identity or None if source not found 

277 """ 

278 source = self.get_identity(identity_id) 

279 if source is None: 

280 return None 

281 

282 data = source.to_dict() 

283 data['id'] = str(uuid.uuid4()) 

284 data['name'] = new_name or f"{source.name} (Copy)" 

285 data['created_at'] = datetime.utcnow() 

286 data['updated_at'] = datetime.utcnow() 

287 

288 clone = AgentIdentity.from_dict(data) 

289 self.set_identity(clone) 

290 return clone 

291 

292 def export_identities(self) -> str: 

293 """Export all identities as JSON.""" 

294 data = { 

295 'default_identity_id': self._default_identity.id, 

296 'active_identity_id': self._active_identity_id, 

297 'identities': [ident.to_dict() for ident in self._identity_store.values()], 

298 'channel_mappings': { 

299 channel: ident.id 

300 for channel, ident in self._channel_identities.items() 

301 } 

302 } 

303 return json.dumps(data, indent=2) 

304 

305 def import_identities(self, json_data: str) -> int: 

306 """ 

307 Import identities from JSON. 

308 

309 Args: 

310 json_data: JSON string containing identity data 

311 

312 Returns: 

313 Number of identities imported 

314 """ 

315 data = json.loads(json_data) 

316 count = 0 

317 

318 for ident_data in data.get('identities', []): 

319 identity = AgentIdentity.from_dict(ident_data) 

320 self.set_identity(identity) 

321 count += 1 

322 

323 # Restore channel mappings 

324 for channel, ident_id in data.get('channel_mappings', {}).items(): 

325 if ident_id in self._identity_store: 

326 self._channel_identities[channel] = self._identity_store[ident_id] 

327 

328 return count