Coverage for integrations / channels / response / reactions.py: 31.6%
225 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2AckManager - Manages acknowledgment reactions for messages.
4Provides emoji reactions to acknowledge message receipt, processing status,
5completion, and errors.
6"""
8import asyncio
9import threading
10import time
11from typing import Optional, Callable, Any, List, Set
12from dataclasses import dataclass, field
13from enum import Enum
14import logging
16logger = logging.getLogger(__name__)
19class AckState(Enum):
20 """Acknowledgment states."""
21 NONE = "none"
22 RECEIVED = "received"
23 PROCESSING = "processing"
24 COMPLETE = "complete"
25 ERROR = "error"
28@dataclass
29class AckConfig:
30 """Configuration for acknowledgment reactions."""
31 received_emoji: str = "\u2705" # Check mark
32 processing_emoji: str = "\u23f3" # Hourglass
33 complete_emoji: str = "\u2714\ufe0f" # Heavy check mark
34 error_emoji: str = "\u274c" # Cross mark
35 thinking_emoji: str = "\U0001f914" # Thinking face
36 queued_emoji: str = "\U0001f4cb" # Clipboard
38 auto_remove_on_complete: bool = True
39 auto_remove_delay: float = 2.0 # Seconds to wait before auto-remove
40 remove_previous_on_transition: bool = True
43@dataclass
44class AckContext:
45 """Context for an acknowledgment session."""
46 message_id: str
47 channel_id: str
48 current_state: AckState = AckState.NONE
49 reactions_added: List[str] = field(default_factory=list)
50 created_at: float = field(default_factory=time.time)
51 last_updated: float = field(default_factory=time.time)
54class AckManager:
55 """
56 Manages acknowledgment reactions for messages.
58 Provides methods to add/remove emoji reactions indicating
59 message processing status.
60 """
62 def __init__(
63 self,
64 add_reaction: Optional[Callable[[str, str, str], Any]] = None,
65 remove_reaction: Optional[Callable[[str, str, str], Any]] = None,
66 config: Optional[AckConfig] = None
67 ):
68 """
69 Initialize the AckManager.
71 Args:
72 add_reaction: Callback to add reaction. Takes (channel_id, message_id, emoji).
73 remove_reaction: Callback to remove reaction. Takes (channel_id, message_id, emoji).
74 config: Optional configuration for ack behavior.
75 """
76 self._add_reaction = add_reaction
77 self._remove_reaction = remove_reaction
78 self._config = config or AckConfig()
79 self._contexts: dict[str, AckContext] = {} # keyed by message_id
80 self._lock = threading.Lock()
81 self._async_lock: Optional[asyncio.Lock] = None
83 @property
84 def config(self) -> AckConfig:
85 """Get the ack configuration."""
86 return self._config
88 @property
89 def received_emoji(self) -> str:
90 """Get the received emoji."""
91 return self._config.received_emoji
93 @property
94 def processing_emoji(self) -> str:
95 """Get the processing emoji."""
96 return self._config.processing_emoji
98 @property
99 def complete_emoji(self) -> str:
100 """Get the complete emoji."""
101 return self._config.complete_emoji
103 @property
104 def error_emoji(self) -> str:
105 """Get the error emoji."""
106 return self._config.error_emoji
108 def set_callbacks(
109 self,
110 add_reaction: Callable[[str, str, str], Any],
111 remove_reaction: Callable[[str, str, str], Any]
112 ) -> None:
113 """Set the callback functions for adding/removing reactions."""
114 self._add_reaction = add_reaction
115 self._remove_reaction = remove_reaction
117 def set_emojis(
118 self,
119 received: Optional[str] = None,
120 processing: Optional[str] = None,
121 complete: Optional[str] = None,
122 error: Optional[str] = None
123 ) -> None:
124 """Update emoji configuration."""
125 if received is not None:
126 self._config.received_emoji = received
127 if processing is not None:
128 self._config.processing_emoji = processing
129 if complete is not None:
130 self._config.complete_emoji = complete
131 if error is not None:
132 self._config.error_emoji = error
134 def _get_async_lock(self) -> asyncio.Lock:
135 """Get or create the async lock."""
136 if self._async_lock is None:
137 self._async_lock = asyncio.Lock()
138 return self._async_lock
140 def _get_or_create_context(self, message_id: str, channel_id: str) -> AckContext:
141 """Get existing context or create new one."""
142 if message_id not in self._contexts:
143 self._contexts[message_id] = AckContext(
144 message_id=message_id,
145 channel_id=channel_id
146 )
147 return self._contexts[message_id]
149 def _add_reaction_internal(self, channel_id: str, message_id: str, emoji: str) -> bool:
150 """Internal method to add a reaction."""
151 if self._add_reaction:
152 try:
153 self._add_reaction(channel_id, message_id, emoji)
154 return True
155 except Exception as e:
156 logger.warning(f"Failed to add reaction {emoji}: {e}")
157 return False
158 return False
160 def _remove_reaction_internal(self, channel_id: str, message_id: str, emoji: str) -> bool:
161 """Internal method to remove a reaction."""
162 if self._remove_reaction:
163 try:
164 self._remove_reaction(channel_id, message_id, emoji)
165 return True
166 except Exception as e:
167 logger.warning(f"Failed to remove reaction {emoji}: {e}")
168 return False
169 return False
171 def ack_received(self, channel_id: str, message_id: str) -> AckContext:
172 """
173 Acknowledge that a message was received.
175 Args:
176 channel_id: The channel containing the message.
177 message_id: The message to acknowledge.
179 Returns:
180 AckContext for the acknowledgment session.
181 """
182 with self._lock:
183 context = self._get_or_create_context(message_id, channel_id)
185 # Remove previous reactions if configured
186 if self._config.remove_previous_on_transition:
187 self._remove_all_reactions(context)
189 # Add received emoji
190 emoji = self._config.received_emoji
191 if self._add_reaction_internal(channel_id, message_id, emoji):
192 context.reactions_added.append(emoji)
194 context.current_state = AckState.RECEIVED
195 context.last_updated = time.time()
197 logger.debug(f"Acked received for message {message_id}")
198 return context
200 def ack_processing(self, channel_id: str, message_id: str) -> AckContext:
201 """
202 Acknowledge that a message is being processed.
204 Args:
205 channel_id: The channel containing the message.
206 message_id: The message being processed.
208 Returns:
209 AckContext for the acknowledgment session.
210 """
211 with self._lock:
212 context = self._get_or_create_context(message_id, channel_id)
214 # Remove previous reactions if configured
215 if self._config.remove_previous_on_transition:
216 self._remove_all_reactions(context)
218 # Add processing emoji
219 emoji = self._config.processing_emoji
220 if self._add_reaction_internal(channel_id, message_id, emoji):
221 context.reactions_added.append(emoji)
223 context.current_state = AckState.PROCESSING
224 context.last_updated = time.time()
226 logger.debug(f"Acked processing for message {message_id}")
227 return context
229 def ack_complete(self, channel_id: str, message_id: str,
230 auto_remove: Optional[bool] = None) -> AckContext:
231 """
232 Acknowledge that processing is complete.
234 Args:
235 channel_id: The channel containing the message.
236 message_id: The completed message.
237 auto_remove: Override auto-remove setting for this call.
239 Returns:
240 AckContext for the acknowledgment session.
241 """
242 with self._lock:
243 context = self._get_or_create_context(message_id, channel_id)
245 # Remove previous reactions if configured
246 if self._config.remove_previous_on_transition:
247 self._remove_all_reactions(context)
249 # Add complete emoji
250 emoji = self._config.complete_emoji
251 if self._add_reaction_internal(channel_id, message_id, emoji):
252 context.reactions_added.append(emoji)
254 context.current_state = AckState.COMPLETE
255 context.last_updated = time.time()
257 # Auto-remove if configured
258 should_remove = auto_remove if auto_remove is not None else self._config.auto_remove_on_complete
259 if should_remove:
260 self._schedule_removal(context)
262 logger.debug(f"Acked complete for message {message_id}")
263 return context
265 def ack_error(self, channel_id: str, message_id: str) -> AckContext:
266 """
267 Acknowledge that an error occurred.
269 Args:
270 channel_id: The channel containing the message.
271 message_id: The message that errored.
273 Returns:
274 AckContext for the acknowledgment session.
275 """
276 with self._lock:
277 context = self._get_or_create_context(message_id, channel_id)
279 # Remove previous reactions if configured
280 if self._config.remove_previous_on_transition:
281 self._remove_all_reactions(context)
283 # Add error emoji
284 emoji = self._config.error_emoji
285 if self._add_reaction_internal(channel_id, message_id, emoji):
286 context.reactions_added.append(emoji)
288 context.current_state = AckState.ERROR
289 context.last_updated = time.time()
291 logger.debug(f"Acked error for message {message_id}")
292 return context
294 def ack_queued(self, channel_id: str, message_id: str) -> AckContext:
295 """
296 Acknowledge that a message is queued for processing.
298 Args:
299 channel_id: The channel containing the message.
300 message_id: The queued message.
302 Returns:
303 AckContext for the acknowledgment session.
304 """
305 with self._lock:
306 context = self._get_or_create_context(message_id, channel_id)
308 # Add queued emoji
309 emoji = self._config.queued_emoji
310 if self._add_reaction_internal(channel_id, message_id, emoji):
311 context.reactions_added.append(emoji)
313 context.last_updated = time.time()
315 logger.debug(f"Acked queued for message {message_id}")
316 return context
318 def ack_thinking(self, channel_id: str, message_id: str) -> AckContext:
319 """
320 Show thinking indicator on a message.
322 Args:
323 channel_id: The channel containing the message.
324 message_id: The message being thought about.
326 Returns:
327 AckContext for the acknowledgment session.
328 """
329 with self._lock:
330 context = self._get_or_create_context(message_id, channel_id)
332 # Add thinking emoji
333 emoji = self._config.thinking_emoji
334 if self._add_reaction_internal(channel_id, message_id, emoji):
335 context.reactions_added.append(emoji)
337 context.last_updated = time.time()
339 logger.debug(f"Acked thinking for message {message_id}")
340 return context
342 def remove_acks(self, channel_id: str, message_id: str) -> int:
343 """
344 Remove all acknowledgment reactions from a message.
346 Args:
347 channel_id: The channel containing the message.
348 message_id: The message to clear reactions from.
350 Returns:
351 Number of reactions removed.
352 """
353 with self._lock:
354 context = self._contexts.get(message_id)
355 if not context:
356 return 0
358 count = self._remove_all_reactions(context)
359 context.current_state = AckState.NONE
361 # Clean up context
362 del self._contexts[message_id]
364 logger.debug(f"Removed {count} acks from message {message_id}")
365 return count
367 def _remove_all_reactions(self, context: AckContext) -> int:
368 """Remove all reactions from a context."""
369 count = 0
370 for emoji in list(context.reactions_added):
371 if self._remove_reaction_internal(context.channel_id, context.message_id, emoji):
372 count += 1
373 context.reactions_added.clear()
374 return count
376 def _schedule_removal(self, context: AckContext) -> None:
377 """Schedule automatic removal of reactions."""
378 def remove_after_delay():
379 time.sleep(self._config.auto_remove_delay)
380 self.remove_acks(context.channel_id, context.message_id)
382 thread = threading.Thread(target=remove_after_delay, daemon=True)
383 thread.start()
385 def get_state(self, message_id: str) -> AckState:
386 """Get the current acknowledgment state for a message."""
387 with self._lock:
388 context = self._contexts.get(message_id)
389 return context.current_state if context else AckState.NONE
391 def get_context(self, message_id: str) -> Optional[AckContext]:
392 """Get the acknowledgment context for a message."""
393 with self._lock:
394 return self._contexts.get(message_id)
396 def get_active_messages(self) -> List[str]:
397 """Get list of messages with active acknowledgments."""
398 with self._lock:
399 return list(self._contexts.keys())
401 def clear_all(self) -> int:
402 """
403 Clear all acknowledgments.
405 Returns:
406 Number of messages cleared.
407 """
408 with self._lock:
409 count = len(self._contexts)
410 for context in list(self._contexts.values()):
411 self._remove_all_reactions(context)
412 self._contexts.clear()
413 return count
415 # Async variants
416 async def ack_received_async(self, channel_id: str, message_id: str) -> AckContext:
417 """Async version of ack_received()."""
418 async with self._get_async_lock():
419 return self.ack_received(channel_id, message_id)
421 async def ack_processing_async(self, channel_id: str, message_id: str) -> AckContext:
422 """Async version of ack_processing()."""
423 async with self._get_async_lock():
424 return self.ack_processing(channel_id, message_id)
426 async def ack_complete_async(self, channel_id: str, message_id: str,
427 auto_remove: Optional[bool] = None) -> AckContext:
428 """Async version of ack_complete()."""
429 async with self._get_async_lock():
430 return self.ack_complete(channel_id, message_id, auto_remove)
432 async def ack_error_async(self, channel_id: str, message_id: str) -> AckContext:
433 """Async version of ack_error()."""
434 async with self._get_async_lock():
435 return self.ack_error(channel_id, message_id)
437 async def remove_acks_async(self, channel_id: str, message_id: str) -> int:
438 """Async version of remove_acks()."""
439 async with self._get_async_lock():
440 return self.remove_acks(channel_id, message_id)
442 def transition_state(self, channel_id: str, message_id: str,
443 new_state: AckState) -> AckContext:
444 """
445 Transition a message to a new acknowledgment state.
447 Args:
448 channel_id: The channel containing the message.
449 message_id: The message to transition.
450 new_state: The new state to transition to.
452 Returns:
453 Updated AckContext.
454 """
455 state_methods = {
456 AckState.RECEIVED: self.ack_received,
457 AckState.PROCESSING: self.ack_processing,
458 AckState.COMPLETE: self.ack_complete,
459 AckState.ERROR: self.ack_error,
460 AckState.NONE: lambda c, m: self.remove_acks(c, m) or self._get_or_create_context(m, c)
461 }
463 method = state_methods.get(new_state)
464 if method:
465 return method(channel_id, message_id)
466 raise ValueError(f"Unknown state: {new_state}")
468 def get_stats(self) -> dict:
469 """Get statistics about acknowledgments."""
470 with self._lock:
471 state_counts = {}
472 for context in self._contexts.values():
473 state = context.current_state.value
474 state_counts[state] = state_counts.get(state, 0) + 1
476 return {
477 "total_tracked": len(self._contexts),
478 "state_counts": state_counts,
479 "messages": list(self._contexts.keys())
480 }