Coverage for integrations / channels / response / typing.py: 35.3%
170 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2TypingManager - Manages typing indicators for chat channels.
4Provides start/stop/pulse methods and a typing() context manager
5for indicating that the bot is actively processing/typing a response.
6"""
8import asyncio
9import threading
10import time
11from contextlib import contextmanager, asynccontextmanager
12from typing import Optional, Callable, Any, Union
13from dataclasses import dataclass, field
14from enum import Enum
15import logging
17logger = logging.getLogger(__name__)
20class TypingState(Enum):
21 """States for typing indicator."""
22 IDLE = "idle"
23 TYPING = "typing"
24 PULSING = "pulsing"
27@dataclass
28class TypingConfig:
29 """Configuration for typing indicator behavior."""
30 pulse_interval: float = 5.0 # Seconds between pulses
31 auto_stop_timeout: float = 30.0 # Auto-stop after this duration
32 min_pulse_duration: float = 0.5 # Minimum duration for a pulse
33 max_retries: int = 3 # Max retries on failure
36@dataclass
37class TypingContext:
38 """Context information for a typing session."""
39 channel_id: str
40 user_id: Optional[str] = None
41 message_id: Optional[str] = None
42 started_at: float = field(default_factory=time.time)
43 pulse_count: int = 0
44 state: TypingState = TypingState.IDLE
47class TypingManager:
48 """
49 Manages typing indicators for chat channels.
51 Supports both synchronous and asynchronous operations,
52 with automatic pulse maintenance and timeout handling.
53 """
55 def __init__(
56 self,
57 send_typing: Optional[Callable[[str], Any]] = None,
58 config: Optional[TypingConfig] = None
59 ):
60 """
61 Initialize the TypingManager.
63 Args:
64 send_typing: Callback function to send typing indicator to channel.
65 Takes channel_id as argument.
66 config: Optional configuration for typing behavior.
67 """
68 self._send_typing = send_typing
69 self._config = config or TypingConfig()
70 self._active_contexts: dict[str, TypingContext] = {}
71 self._pulse_tasks: dict[str, Union[asyncio.Task, threading.Thread]] = {}
72 self._lock = threading.Lock()
73 self._async_lock: Optional[asyncio.Lock] = None
74 self._stopped_channels: set[str] = set()
76 @property
77 def config(self) -> TypingConfig:
78 """Get the typing configuration."""
79 return self._config
81 def set_send_callback(self, callback: Callable[[str], Any]) -> None:
82 """Set the callback function for sending typing indicators."""
83 self._send_typing = callback
85 def _get_async_lock(self) -> asyncio.Lock:
86 """Get or create the async lock."""
87 if self._async_lock is None:
88 self._async_lock = asyncio.Lock()
89 return self._async_lock
91 def start(self, channel_id: str, user_id: Optional[str] = None,
92 message_id: Optional[str] = None) -> TypingContext:
93 """
94 Start a typing indicator for a channel.
96 Args:
97 channel_id: The channel to show typing in.
98 user_id: Optional user ID for context.
99 message_id: Optional message ID being responded to.
101 Returns:
102 TypingContext for the started session.
103 """
104 with self._lock:
105 # Stop existing if any
106 if channel_id in self._active_contexts:
107 self._stop_internal(channel_id)
109 # Remove from stopped channels if present
110 self._stopped_channels.discard(channel_id)
112 context = TypingContext(
113 channel_id=channel_id,
114 user_id=user_id,
115 message_id=message_id,
116 state=TypingState.TYPING
117 )
118 self._active_contexts[channel_id] = context
120 # Send initial typing indicator
121 self._send_indicator(channel_id)
123 logger.debug(f"Started typing indicator for channel {channel_id}")
124 return context
126 def stop(self, channel_id: str) -> bool:
127 """
128 Stop the typing indicator for a channel.
130 Args:
131 channel_id: The channel to stop typing in.
133 Returns:
134 True if a typing session was stopped, False if none was active.
135 """
136 with self._lock:
137 return self._stop_internal(channel_id)
139 def _stop_internal(self, channel_id: str) -> bool:
140 """Internal stop without lock (must be called with lock held)."""
141 if channel_id not in self._active_contexts:
142 return False
144 context = self._active_contexts.pop(channel_id)
145 context.state = TypingState.IDLE
147 # Mark as stopped
148 self._stopped_channels.add(channel_id)
150 # Cancel pulse task if any
151 if channel_id in self._pulse_tasks:
152 task = self._pulse_tasks.pop(channel_id)
153 if isinstance(task, asyncio.Task):
154 task.cancel()
155 elif isinstance(task, threading.Thread):
156 # Thread will check stopped_channels and exit
157 pass
159 logger.debug(f"Stopped typing indicator for channel {channel_id}")
160 return True
162 def pulse(self, channel_id: str) -> bool:
163 """
164 Send a single typing pulse for a channel.
166 This refreshes the typing indicator without starting a new session.
168 Args:
169 channel_id: The channel to pulse typing in.
171 Returns:
172 True if pulse was sent, False if no active session.
173 """
174 with self._lock:
175 context = self._active_contexts.get(channel_id)
176 if context is None:
177 return False
179 context.pulse_count += 1
180 context.state = TypingState.PULSING
181 self._send_indicator(channel_id)
182 context.state = TypingState.TYPING
184 logger.debug(f"Pulsed typing indicator for channel {channel_id} "
185 f"(pulse #{context.pulse_count})")
186 return True
188 def _send_indicator(self, channel_id: str) -> None:
189 """Send the typing indicator via callback."""
190 if self._send_typing:
191 try:
192 self._send_typing(channel_id)
193 except Exception as e:
194 logger.warning(f"Failed to send typing indicator: {e}")
196 def is_typing(self, channel_id: str) -> bool:
197 """Check if typing is active for a channel."""
198 with self._lock:
199 return channel_id in self._active_contexts
201 def get_context(self, channel_id: str) -> Optional[TypingContext]:
202 """Get the typing context for a channel."""
203 with self._lock:
204 return self._active_contexts.get(channel_id)
206 def get_active_channels(self) -> list[str]:
207 """Get list of channels with active typing indicators."""
208 with self._lock:
209 return list(self._active_contexts.keys())
211 def stop_all(self) -> int:
212 """
213 Stop all active typing indicators.
215 Returns:
216 Number of indicators stopped.
217 """
218 with self._lock:
219 count = len(self._active_contexts)
220 channels = list(self._active_contexts.keys())
221 for channel_id in channels:
222 self._stop_internal(channel_id)
223 return count
225 @contextmanager
226 def typing(self, channel_id: str, user_id: Optional[str] = None,
227 message_id: Optional[str] = None, auto_pulse: bool = False):
228 """
229 Context manager for typing indicator.
231 Automatically starts typing on enter and stops on exit.
233 Args:
234 channel_id: The channel to show typing in.
235 user_id: Optional user ID for context.
236 message_id: Optional message ID being responded to.
237 auto_pulse: Whether to automatically pulse during the context.
239 Yields:
240 The TypingContext for the session.
242 Example:
243 with manager.typing("channel123") as ctx:
244 # Do processing...
245 pass
246 # Typing automatically stopped
247 """
248 context = self.start(channel_id, user_id, message_id)
249 pulse_thread = None
251 try:
252 if auto_pulse:
253 pulse_thread = self._start_pulse_thread(channel_id)
254 yield context
255 finally:
256 if pulse_thread:
257 # Thread will stop when channel is removed from active
258 pass
259 self.stop(channel_id)
261 def _start_pulse_thread(self, channel_id: str) -> threading.Thread:
262 """Start a background thread for auto-pulsing."""
263 def pulse_loop():
264 while True:
265 time.sleep(self._config.pulse_interval)
266 if channel_id in self._stopped_channels:
267 break
268 with self._lock:
269 if channel_id not in self._active_contexts:
270 break
271 if not self.pulse(channel_id):
272 break
274 thread = threading.Thread(target=pulse_loop, daemon=True)
275 thread.start()
276 with self._lock:
277 self._pulse_tasks[channel_id] = thread
278 return thread
280 # Async variants
281 async def start_async(self, channel_id: str, user_id: Optional[str] = None,
282 message_id: Optional[str] = None) -> TypingContext:
283 """Async version of start()."""
284 async with self._get_async_lock():
285 return self.start(channel_id, user_id, message_id)
287 async def stop_async(self, channel_id: str) -> bool:
288 """Async version of stop()."""
289 async with self._get_async_lock():
290 return self.stop(channel_id)
292 async def pulse_async(self, channel_id: str) -> bool:
293 """Async version of pulse()."""
294 async with self._get_async_lock():
295 return self.pulse(channel_id)
297 @asynccontextmanager
298 async def typing_async(self, channel_id: str, user_id: Optional[str] = None,
299 message_id: Optional[str] = None, auto_pulse: bool = False):
300 """
301 Async context manager for typing indicator.
303 Args:
304 channel_id: The channel to show typing in.
305 user_id: Optional user ID for context.
306 message_id: Optional message ID being responded to.
307 auto_pulse: Whether to automatically pulse during the context.
309 Yields:
310 The TypingContext for the session.
311 """
312 context = await self.start_async(channel_id, user_id, message_id)
313 pulse_task = None
315 try:
316 if auto_pulse:
317 pulse_task = asyncio.create_task(
318 self._pulse_loop_async(channel_id)
319 )
320 with self._lock:
321 self._pulse_tasks[channel_id] = pulse_task
322 yield context
323 finally:
324 if pulse_task:
325 pulse_task.cancel()
326 try:
327 await pulse_task
328 except asyncio.CancelledError:
329 pass
330 await self.stop_async(channel_id)
332 async def _pulse_loop_async(self, channel_id: str) -> None:
333 """Async pulse loop for auto-pulsing."""
334 try:
335 while True:
336 await asyncio.sleep(self._config.pulse_interval)
337 if not await self.pulse_async(channel_id):
338 break
339 except asyncio.CancelledError:
340 pass
342 def get_typing_duration(self, channel_id: str) -> Optional[float]:
343 """Get how long typing has been active for a channel."""
344 with self._lock:
345 context = self._active_contexts.get(channel_id)
346 if context:
347 return time.time() - context.started_at
348 return None
350 def get_stats(self) -> dict:
351 """Get statistics about typing indicators."""
352 with self._lock:
353 total_pulses = sum(
354 ctx.pulse_count for ctx in self._active_contexts.values()
355 )
356 return {
357 "active_count": len(self._active_contexts),
358 "total_pulses": total_pulses,
359 "channels": list(self._active_contexts.keys())
360 }