Coverage for integrations / channels / response / reactions.py: 31.6%

225 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2AckManager - Manages acknowledgment reactions for messages. 

3 

4Provides emoji reactions to acknowledge message receipt, processing status, 

5completion, and errors. 

6""" 

7 

8import asyncio 

9import threading 

10import time 

11from typing import Optional, Callable, Any, List, Set 

12from dataclasses import dataclass, field 

13from enum import Enum 

14import logging 

15 

16logger = logging.getLogger(__name__) 

17 

18 

19class AckState(Enum): 

20 """Acknowledgment states.""" 

21 NONE = "none" 

22 RECEIVED = "received" 

23 PROCESSING = "processing" 

24 COMPLETE = "complete" 

25 ERROR = "error" 

26 

27 

28@dataclass 

29class AckConfig: 

30 """Configuration for acknowledgment reactions.""" 

31 received_emoji: str = "\u2705" # Check mark 

32 processing_emoji: str = "\u23f3" # Hourglass 

33 complete_emoji: str = "\u2714\ufe0f" # Heavy check mark 

34 error_emoji: str = "\u274c" # Cross mark 

35 thinking_emoji: str = "\U0001f914" # Thinking face 

36 queued_emoji: str = "\U0001f4cb" # Clipboard 

37 

38 auto_remove_on_complete: bool = True 

39 auto_remove_delay: float = 2.0 # Seconds to wait before auto-remove 

40 remove_previous_on_transition: bool = True 

41 

42 

43@dataclass 

44class AckContext: 

45 """Context for an acknowledgment session.""" 

46 message_id: str 

47 channel_id: str 

48 current_state: AckState = AckState.NONE 

49 reactions_added: List[str] = field(default_factory=list) 

50 created_at: float = field(default_factory=time.time) 

51 last_updated: float = field(default_factory=time.time) 

52 

53 

54class AckManager: 

55 """ 

56 Manages acknowledgment reactions for messages. 

57 

58 Provides methods to add/remove emoji reactions indicating 

59 message processing status. 

60 """ 

61 

62 def __init__( 

63 self, 

64 add_reaction: Optional[Callable[[str, str, str], Any]] = None, 

65 remove_reaction: Optional[Callable[[str, str, str], Any]] = None, 

66 config: Optional[AckConfig] = None 

67 ): 

68 """ 

69 Initialize the AckManager. 

70 

71 Args: 

72 add_reaction: Callback to add reaction. Takes (channel_id, message_id, emoji). 

73 remove_reaction: Callback to remove reaction. Takes (channel_id, message_id, emoji). 

74 config: Optional configuration for ack behavior. 

75 """ 

76 self._add_reaction = add_reaction 

77 self._remove_reaction = remove_reaction 

78 self._config = config or AckConfig() 

79 self._contexts: dict[str, AckContext] = {} # keyed by message_id 

80 self._lock = threading.Lock() 

81 self._async_lock: Optional[asyncio.Lock] = None 

82 

83 @property 

84 def config(self) -> AckConfig: 

85 """Get the ack configuration.""" 

86 return self._config 

87 

88 @property 

89 def received_emoji(self) -> str: 

90 """Get the received emoji.""" 

91 return self._config.received_emoji 

92 

93 @property 

94 def processing_emoji(self) -> str: 

95 """Get the processing emoji.""" 

96 return self._config.processing_emoji 

97 

98 @property 

99 def complete_emoji(self) -> str: 

100 """Get the complete emoji.""" 

101 return self._config.complete_emoji 

102 

103 @property 

104 def error_emoji(self) -> str: 

105 """Get the error emoji.""" 

106 return self._config.error_emoji 

107 

108 def set_callbacks( 

109 self, 

110 add_reaction: Callable[[str, str, str], Any], 

111 remove_reaction: Callable[[str, str, str], Any] 

112 ) -> None: 

113 """Set the callback functions for adding/removing reactions.""" 

114 self._add_reaction = add_reaction 

115 self._remove_reaction = remove_reaction 

116 

117 def set_emojis( 

118 self, 

119 received: Optional[str] = None, 

120 processing: Optional[str] = None, 

121 complete: Optional[str] = None, 

122 error: Optional[str] = None 

123 ) -> None: 

124 """Update emoji configuration.""" 

125 if received is not None: 

126 self._config.received_emoji = received 

127 if processing is not None: 

128 self._config.processing_emoji = processing 

129 if complete is not None: 

130 self._config.complete_emoji = complete 

131 if error is not None: 

132 self._config.error_emoji = error 

133 

134 def _get_async_lock(self) -> asyncio.Lock: 

135 """Get or create the async lock.""" 

136 if self._async_lock is None: 

137 self._async_lock = asyncio.Lock() 

138 return self._async_lock 

139 

140 def _get_or_create_context(self, message_id: str, channel_id: str) -> AckContext: 

141 """Get existing context or create new one.""" 

142 if message_id not in self._contexts: 

143 self._contexts[message_id] = AckContext( 

144 message_id=message_id, 

145 channel_id=channel_id 

146 ) 

147 return self._contexts[message_id] 

148 

149 def _add_reaction_internal(self, channel_id: str, message_id: str, emoji: str) -> bool: 

150 """Internal method to add a reaction.""" 

151 if self._add_reaction: 

152 try: 

153 self._add_reaction(channel_id, message_id, emoji) 

154 return True 

155 except Exception as e: 

156 logger.warning(f"Failed to add reaction {emoji}: {e}") 

157 return False 

158 return False 

159 

160 def _remove_reaction_internal(self, channel_id: str, message_id: str, emoji: str) -> bool: 

161 """Internal method to remove a reaction.""" 

162 if self._remove_reaction: 

163 try: 

164 self._remove_reaction(channel_id, message_id, emoji) 

165 return True 

166 except Exception as e: 

167 logger.warning(f"Failed to remove reaction {emoji}: {e}") 

168 return False 

169 return False 

170 

171 def ack_received(self, channel_id: str, message_id: str) -> AckContext: 

172 """ 

173 Acknowledge that a message was received. 

174 

175 Args: 

176 channel_id: The channel containing the message. 

177 message_id: The message to acknowledge. 

178 

179 Returns: 

180 AckContext for the acknowledgment session. 

181 """ 

182 with self._lock: 

183 context = self._get_or_create_context(message_id, channel_id) 

184 

185 # Remove previous reactions if configured 

186 if self._config.remove_previous_on_transition: 

187 self._remove_all_reactions(context) 

188 

189 # Add received emoji 

190 emoji = self._config.received_emoji 

191 if self._add_reaction_internal(channel_id, message_id, emoji): 

192 context.reactions_added.append(emoji) 

193 

194 context.current_state = AckState.RECEIVED 

195 context.last_updated = time.time() 

196 

197 logger.debug(f"Acked received for message {message_id}") 

198 return context 

199 

200 def ack_processing(self, channel_id: str, message_id: str) -> AckContext: 

201 """ 

202 Acknowledge that a message is being processed. 

203 

204 Args: 

205 channel_id: The channel containing the message. 

206 message_id: The message being processed. 

207 

208 Returns: 

209 AckContext for the acknowledgment session. 

210 """ 

211 with self._lock: 

212 context = self._get_or_create_context(message_id, channel_id) 

213 

214 # Remove previous reactions if configured 

215 if self._config.remove_previous_on_transition: 

216 self._remove_all_reactions(context) 

217 

218 # Add processing emoji 

219 emoji = self._config.processing_emoji 

220 if self._add_reaction_internal(channel_id, message_id, emoji): 

221 context.reactions_added.append(emoji) 

222 

223 context.current_state = AckState.PROCESSING 

224 context.last_updated = time.time() 

225 

226 logger.debug(f"Acked processing for message {message_id}") 

227 return context 

228 

229 def ack_complete(self, channel_id: str, message_id: str, 

230 auto_remove: Optional[bool] = None) -> AckContext: 

231 """ 

232 Acknowledge that processing is complete. 

233 

234 Args: 

235 channel_id: The channel containing the message. 

236 message_id: The completed message. 

237 auto_remove: Override auto-remove setting for this call. 

238 

239 Returns: 

240 AckContext for the acknowledgment session. 

241 """ 

242 with self._lock: 

243 context = self._get_or_create_context(message_id, channel_id) 

244 

245 # Remove previous reactions if configured 

246 if self._config.remove_previous_on_transition: 

247 self._remove_all_reactions(context) 

248 

249 # Add complete emoji 

250 emoji = self._config.complete_emoji 

251 if self._add_reaction_internal(channel_id, message_id, emoji): 

252 context.reactions_added.append(emoji) 

253 

254 context.current_state = AckState.COMPLETE 

255 context.last_updated = time.time() 

256 

257 # Auto-remove if configured 

258 should_remove = auto_remove if auto_remove is not None else self._config.auto_remove_on_complete 

259 if should_remove: 

260 self._schedule_removal(context) 

261 

262 logger.debug(f"Acked complete for message {message_id}") 

263 return context 

264 

265 def ack_error(self, channel_id: str, message_id: str) -> AckContext: 

266 """ 

267 Acknowledge that an error occurred. 

268 

269 Args: 

270 channel_id: The channel containing the message. 

271 message_id: The message that errored. 

272 

273 Returns: 

274 AckContext for the acknowledgment session. 

275 """ 

276 with self._lock: 

277 context = self._get_or_create_context(message_id, channel_id) 

278 

279 # Remove previous reactions if configured 

280 if self._config.remove_previous_on_transition: 

281 self._remove_all_reactions(context) 

282 

283 # Add error emoji 

284 emoji = self._config.error_emoji 

285 if self._add_reaction_internal(channel_id, message_id, emoji): 

286 context.reactions_added.append(emoji) 

287 

288 context.current_state = AckState.ERROR 

289 context.last_updated = time.time() 

290 

291 logger.debug(f"Acked error for message {message_id}") 

292 return context 

293 

294 def ack_queued(self, channel_id: str, message_id: str) -> AckContext: 

295 """ 

296 Acknowledge that a message is queued for processing. 

297 

298 Args: 

299 channel_id: The channel containing the message. 

300 message_id: The queued message. 

301 

302 Returns: 

303 AckContext for the acknowledgment session. 

304 """ 

305 with self._lock: 

306 context = self._get_or_create_context(message_id, channel_id) 

307 

308 # Add queued emoji 

309 emoji = self._config.queued_emoji 

310 if self._add_reaction_internal(channel_id, message_id, emoji): 

311 context.reactions_added.append(emoji) 

312 

313 context.last_updated = time.time() 

314 

315 logger.debug(f"Acked queued for message {message_id}") 

316 return context 

317 

318 def ack_thinking(self, channel_id: str, message_id: str) -> AckContext: 

319 """ 

320 Show thinking indicator on a message. 

321 

322 Args: 

323 channel_id: The channel containing the message. 

324 message_id: The message being thought about. 

325 

326 Returns: 

327 AckContext for the acknowledgment session. 

328 """ 

329 with self._lock: 

330 context = self._get_or_create_context(message_id, channel_id) 

331 

332 # Add thinking emoji 

333 emoji = self._config.thinking_emoji 

334 if self._add_reaction_internal(channel_id, message_id, emoji): 

335 context.reactions_added.append(emoji) 

336 

337 context.last_updated = time.time() 

338 

339 logger.debug(f"Acked thinking for message {message_id}") 

340 return context 

341 

342 def remove_acks(self, channel_id: str, message_id: str) -> int: 

343 """ 

344 Remove all acknowledgment reactions from a message. 

345 

346 Args: 

347 channel_id: The channel containing the message. 

348 message_id: The message to clear reactions from. 

349 

350 Returns: 

351 Number of reactions removed. 

352 """ 

353 with self._lock: 

354 context = self._contexts.get(message_id) 

355 if not context: 

356 return 0 

357 

358 count = self._remove_all_reactions(context) 

359 context.current_state = AckState.NONE 

360 

361 # Clean up context 

362 del self._contexts[message_id] 

363 

364 logger.debug(f"Removed {count} acks from message {message_id}") 

365 return count 

366 

367 def _remove_all_reactions(self, context: AckContext) -> int: 

368 """Remove all reactions from a context.""" 

369 count = 0 

370 for emoji in list(context.reactions_added): 

371 if self._remove_reaction_internal(context.channel_id, context.message_id, emoji): 

372 count += 1 

373 context.reactions_added.clear() 

374 return count 

375 

376 def _schedule_removal(self, context: AckContext) -> None: 

377 """Schedule automatic removal of reactions.""" 

378 def remove_after_delay(): 

379 time.sleep(self._config.auto_remove_delay) 

380 self.remove_acks(context.channel_id, context.message_id) 

381 

382 thread = threading.Thread(target=remove_after_delay, daemon=True) 

383 thread.start() 

384 

385 def get_state(self, message_id: str) -> AckState: 

386 """Get the current acknowledgment state for a message.""" 

387 with self._lock: 

388 context = self._contexts.get(message_id) 

389 return context.current_state if context else AckState.NONE 

390 

391 def get_context(self, message_id: str) -> Optional[AckContext]: 

392 """Get the acknowledgment context for a message.""" 

393 with self._lock: 

394 return self._contexts.get(message_id) 

395 

396 def get_active_messages(self) -> List[str]: 

397 """Get list of messages with active acknowledgments.""" 

398 with self._lock: 

399 return list(self._contexts.keys()) 

400 

401 def clear_all(self) -> int: 

402 """ 

403 Clear all acknowledgments. 

404 

405 Returns: 

406 Number of messages cleared. 

407 """ 

408 with self._lock: 

409 count = len(self._contexts) 

410 for context in list(self._contexts.values()): 

411 self._remove_all_reactions(context) 

412 self._contexts.clear() 

413 return count 

414 

415 # Async variants 

416 async def ack_received_async(self, channel_id: str, message_id: str) -> AckContext: 

417 """Async version of ack_received().""" 

418 async with self._get_async_lock(): 

419 return self.ack_received(channel_id, message_id) 

420 

421 async def ack_processing_async(self, channel_id: str, message_id: str) -> AckContext: 

422 """Async version of ack_processing().""" 

423 async with self._get_async_lock(): 

424 return self.ack_processing(channel_id, message_id) 

425 

426 async def ack_complete_async(self, channel_id: str, message_id: str, 

427 auto_remove: Optional[bool] = None) -> AckContext: 

428 """Async version of ack_complete().""" 

429 async with self._get_async_lock(): 

430 return self.ack_complete(channel_id, message_id, auto_remove) 

431 

432 async def ack_error_async(self, channel_id: str, message_id: str) -> AckContext: 

433 """Async version of ack_error().""" 

434 async with self._get_async_lock(): 

435 return self.ack_error(channel_id, message_id) 

436 

437 async def remove_acks_async(self, channel_id: str, message_id: str) -> int: 

438 """Async version of remove_acks().""" 

439 async with self._get_async_lock(): 

440 return self.remove_acks(channel_id, message_id) 

441 

442 def transition_state(self, channel_id: str, message_id: str, 

443 new_state: AckState) -> AckContext: 

444 """ 

445 Transition a message to a new acknowledgment state. 

446 

447 Args: 

448 channel_id: The channel containing the message. 

449 message_id: The message to transition. 

450 new_state: The new state to transition to. 

451 

452 Returns: 

453 Updated AckContext. 

454 """ 

455 state_methods = { 

456 AckState.RECEIVED: self.ack_received, 

457 AckState.PROCESSING: self.ack_processing, 

458 AckState.COMPLETE: self.ack_complete, 

459 AckState.ERROR: self.ack_error, 

460 AckState.NONE: lambda c, m: self.remove_acks(c, m) or self._get_or_create_context(m, c) 

461 } 

462 

463 method = state_methods.get(new_state) 

464 if method: 

465 return method(channel_id, message_id) 

466 raise ValueError(f"Unknown state: {new_state}") 

467 

468 def get_stats(self) -> dict: 

469 """Get statistics about acknowledgments.""" 

470 with self._lock: 

471 state_counts = {} 

472 for context in self._contexts.values(): 

473 state = context.current_state.value 

474 state_counts[state] = state_counts.get(state, 0) + 1 

475 

476 return { 

477 "total_tracked": len(self._contexts), 

478 "state_counts": state_counts, 

479 "messages": list(self._contexts.keys()) 

480 }