Coverage for integrations / channels / response / typing.py: 35.3%

170 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2TypingManager - Manages typing indicators for chat channels. 

3 

4Provides start/stop/pulse methods and a typing() context manager 

5for indicating that the bot is actively processing/typing a response. 

6""" 

7 

8import asyncio 

9import threading 

10import time 

11from contextlib import contextmanager, asynccontextmanager 

12from typing import Optional, Callable, Any, Union 

13from dataclasses import dataclass, field 

14from enum import Enum 

15import logging 

16 

17logger = logging.getLogger(__name__) 

18 

19 

20class TypingState(Enum): 

21 """States for typing indicator.""" 

22 IDLE = "idle" 

23 TYPING = "typing" 

24 PULSING = "pulsing" 

25 

26 

27@dataclass 

28class TypingConfig: 

29 """Configuration for typing indicator behavior.""" 

30 pulse_interval: float = 5.0 # Seconds between pulses 

31 auto_stop_timeout: float = 30.0 # Auto-stop after this duration 

32 min_pulse_duration: float = 0.5 # Minimum duration for a pulse 

33 max_retries: int = 3 # Max retries on failure 

34 

35 

36@dataclass 

37class TypingContext: 

38 """Context information for a typing session.""" 

39 channel_id: str 

40 user_id: Optional[str] = None 

41 message_id: Optional[str] = None 

42 started_at: float = field(default_factory=time.time) 

43 pulse_count: int = 0 

44 state: TypingState = TypingState.IDLE 

45 

46 

47class TypingManager: 

48 """ 

49 Manages typing indicators for chat channels. 

50 

51 Supports both synchronous and asynchronous operations, 

52 with automatic pulse maintenance and timeout handling. 

53 """ 

54 

55 def __init__( 

56 self, 

57 send_typing: Optional[Callable[[str], Any]] = None, 

58 config: Optional[TypingConfig] = None 

59 ): 

60 """ 

61 Initialize the TypingManager. 

62 

63 Args: 

64 send_typing: Callback function to send typing indicator to channel. 

65 Takes channel_id as argument. 

66 config: Optional configuration for typing behavior. 

67 """ 

68 self._send_typing = send_typing 

69 self._config = config or TypingConfig() 

70 self._active_contexts: dict[str, TypingContext] = {} 

71 self._pulse_tasks: dict[str, Union[asyncio.Task, threading.Thread]] = {} 

72 self._lock = threading.Lock() 

73 self._async_lock: Optional[asyncio.Lock] = None 

74 self._stopped_channels: set[str] = set() 

75 

76 @property 

77 def config(self) -> TypingConfig: 

78 """Get the typing configuration.""" 

79 return self._config 

80 

81 def set_send_callback(self, callback: Callable[[str], Any]) -> None: 

82 """Set the callback function for sending typing indicators.""" 

83 self._send_typing = callback 

84 

85 def _get_async_lock(self) -> asyncio.Lock: 

86 """Get or create the async lock.""" 

87 if self._async_lock is None: 

88 self._async_lock = asyncio.Lock() 

89 return self._async_lock 

90 

91 def start(self, channel_id: str, user_id: Optional[str] = None, 

92 message_id: Optional[str] = None) -> TypingContext: 

93 """ 

94 Start a typing indicator for a channel. 

95 

96 Args: 

97 channel_id: The channel to show typing in. 

98 user_id: Optional user ID for context. 

99 message_id: Optional message ID being responded to. 

100 

101 Returns: 

102 TypingContext for the started session. 

103 """ 

104 with self._lock: 

105 # Stop existing if any 

106 if channel_id in self._active_contexts: 

107 self._stop_internal(channel_id) 

108 

109 # Remove from stopped channels if present 

110 self._stopped_channels.discard(channel_id) 

111 

112 context = TypingContext( 

113 channel_id=channel_id, 

114 user_id=user_id, 

115 message_id=message_id, 

116 state=TypingState.TYPING 

117 ) 

118 self._active_contexts[channel_id] = context 

119 

120 # Send initial typing indicator 

121 self._send_indicator(channel_id) 

122 

123 logger.debug(f"Started typing indicator for channel {channel_id}") 

124 return context 

125 

126 def stop(self, channel_id: str) -> bool: 

127 """ 

128 Stop the typing indicator for a channel. 

129 

130 Args: 

131 channel_id: The channel to stop typing in. 

132 

133 Returns: 

134 True if a typing session was stopped, False if none was active. 

135 """ 

136 with self._lock: 

137 return self._stop_internal(channel_id) 

138 

139 def _stop_internal(self, channel_id: str) -> bool: 

140 """Internal stop without lock (must be called with lock held).""" 

141 if channel_id not in self._active_contexts: 

142 return False 

143 

144 context = self._active_contexts.pop(channel_id) 

145 context.state = TypingState.IDLE 

146 

147 # Mark as stopped 

148 self._stopped_channels.add(channel_id) 

149 

150 # Cancel pulse task if any 

151 if channel_id in self._pulse_tasks: 

152 task = self._pulse_tasks.pop(channel_id) 

153 if isinstance(task, asyncio.Task): 

154 task.cancel() 

155 elif isinstance(task, threading.Thread): 

156 # Thread will check stopped_channels and exit 

157 pass 

158 

159 logger.debug(f"Stopped typing indicator for channel {channel_id}") 

160 return True 

161 

162 def pulse(self, channel_id: str) -> bool: 

163 """ 

164 Send a single typing pulse for a channel. 

165 

166 This refreshes the typing indicator without starting a new session. 

167 

168 Args: 

169 channel_id: The channel to pulse typing in. 

170 

171 Returns: 

172 True if pulse was sent, False if no active session. 

173 """ 

174 with self._lock: 

175 context = self._active_contexts.get(channel_id) 

176 if context is None: 

177 return False 

178 

179 context.pulse_count += 1 

180 context.state = TypingState.PULSING 

181 self._send_indicator(channel_id) 

182 context.state = TypingState.TYPING 

183 

184 logger.debug(f"Pulsed typing indicator for channel {channel_id} " 

185 f"(pulse #{context.pulse_count})") 

186 return True 

187 

188 def _send_indicator(self, channel_id: str) -> None: 

189 """Send the typing indicator via callback.""" 

190 if self._send_typing: 

191 try: 

192 self._send_typing(channel_id) 

193 except Exception as e: 

194 logger.warning(f"Failed to send typing indicator: {e}") 

195 

196 def is_typing(self, channel_id: str) -> bool: 

197 """Check if typing is active for a channel.""" 

198 with self._lock: 

199 return channel_id in self._active_contexts 

200 

201 def get_context(self, channel_id: str) -> Optional[TypingContext]: 

202 """Get the typing context for a channel.""" 

203 with self._lock: 

204 return self._active_contexts.get(channel_id) 

205 

206 def get_active_channels(self) -> list[str]: 

207 """Get list of channels with active typing indicators.""" 

208 with self._lock: 

209 return list(self._active_contexts.keys()) 

210 

211 def stop_all(self) -> int: 

212 """ 

213 Stop all active typing indicators. 

214 

215 Returns: 

216 Number of indicators stopped. 

217 """ 

218 with self._lock: 

219 count = len(self._active_contexts) 

220 channels = list(self._active_contexts.keys()) 

221 for channel_id in channels: 

222 self._stop_internal(channel_id) 

223 return count 

224 

225 @contextmanager 

226 def typing(self, channel_id: str, user_id: Optional[str] = None, 

227 message_id: Optional[str] = None, auto_pulse: bool = False): 

228 """ 

229 Context manager for typing indicator. 

230 

231 Automatically starts typing on enter and stops on exit. 

232 

233 Args: 

234 channel_id: The channel to show typing in. 

235 user_id: Optional user ID for context. 

236 message_id: Optional message ID being responded to. 

237 auto_pulse: Whether to automatically pulse during the context. 

238 

239 Yields: 

240 The TypingContext for the session. 

241 

242 Example: 

243 with manager.typing("channel123") as ctx: 

244 # Do processing... 

245 pass 

246 # Typing automatically stopped 

247 """ 

248 context = self.start(channel_id, user_id, message_id) 

249 pulse_thread = None 

250 

251 try: 

252 if auto_pulse: 

253 pulse_thread = self._start_pulse_thread(channel_id) 

254 yield context 

255 finally: 

256 if pulse_thread: 

257 # Thread will stop when channel is removed from active 

258 pass 

259 self.stop(channel_id) 

260 

261 def _start_pulse_thread(self, channel_id: str) -> threading.Thread: 

262 """Start a background thread for auto-pulsing.""" 

263 def pulse_loop(): 

264 while True: 

265 time.sleep(self._config.pulse_interval) 

266 if channel_id in self._stopped_channels: 

267 break 

268 with self._lock: 

269 if channel_id not in self._active_contexts: 

270 break 

271 if not self.pulse(channel_id): 

272 break 

273 

274 thread = threading.Thread(target=pulse_loop, daemon=True) 

275 thread.start() 

276 with self._lock: 

277 self._pulse_tasks[channel_id] = thread 

278 return thread 

279 

280 # Async variants 

281 async def start_async(self, channel_id: str, user_id: Optional[str] = None, 

282 message_id: Optional[str] = None) -> TypingContext: 

283 """Async version of start().""" 

284 async with self._get_async_lock(): 

285 return self.start(channel_id, user_id, message_id) 

286 

287 async def stop_async(self, channel_id: str) -> bool: 

288 """Async version of stop().""" 

289 async with self._get_async_lock(): 

290 return self.stop(channel_id) 

291 

292 async def pulse_async(self, channel_id: str) -> bool: 

293 """Async version of pulse().""" 

294 async with self._get_async_lock(): 

295 return self.pulse(channel_id) 

296 

297 @asynccontextmanager 

298 async def typing_async(self, channel_id: str, user_id: Optional[str] = None, 

299 message_id: Optional[str] = None, auto_pulse: bool = False): 

300 """ 

301 Async context manager for typing indicator. 

302 

303 Args: 

304 channel_id: The channel to show typing in. 

305 user_id: Optional user ID for context. 

306 message_id: Optional message ID being responded to. 

307 auto_pulse: Whether to automatically pulse during the context. 

308 

309 Yields: 

310 The TypingContext for the session. 

311 """ 

312 context = await self.start_async(channel_id, user_id, message_id) 

313 pulse_task = None 

314 

315 try: 

316 if auto_pulse: 

317 pulse_task = asyncio.create_task( 

318 self._pulse_loop_async(channel_id) 

319 ) 

320 with self._lock: 

321 self._pulse_tasks[channel_id] = pulse_task 

322 yield context 

323 finally: 

324 if pulse_task: 

325 pulse_task.cancel() 

326 try: 

327 await pulse_task 

328 except asyncio.CancelledError: 

329 pass 

330 await self.stop_async(channel_id) 

331 

332 async def _pulse_loop_async(self, channel_id: str) -> None: 

333 """Async pulse loop for auto-pulsing.""" 

334 try: 

335 while True: 

336 await asyncio.sleep(self._config.pulse_interval) 

337 if not await self.pulse_async(channel_id): 

338 break 

339 except asyncio.CancelledError: 

340 pass 

341 

342 def get_typing_duration(self, channel_id: str) -> Optional[float]: 

343 """Get how long typing has been active for a channel.""" 

344 with self._lock: 

345 context = self._active_contexts.get(channel_id) 

346 if context: 

347 return time.time() - context.started_at 

348 return None 

349 

350 def get_stats(self) -> dict: 

351 """Get statistics about typing indicators.""" 

352 with self._lock: 

353 total_pulses = sum( 

354 ctx.pulse_count for ctx in self._active_contexts.values() 

355 ) 

356 return { 

357 "active_count": len(self._active_contexts), 

358 "total_pulses": total_pulses, 

359 "channels": list(self._active_contexts.keys()) 

360 }