Coverage for integrations / channels / response / streaming.py: 43.1%
297 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2StreamingResponse - Manages streaming LLM responses to chat platforms.
4Provides support for streaming responses with message editing for platforms
5that support it (Discord, Telegram, Slack), with fallback for those that don't.
6Designed to work properly in Docker/containerized environments.
7"""
9import asyncio
10import threading
11import time
12from contextlib import asynccontextmanager
13from typing import Optional, Callable, Any, AsyncGenerator, Dict, List, Union
14from dataclasses import dataclass, field
15from enum import Enum
16import logging
18logger = logging.getLogger(__name__)
class StreamState(Enum):
    """Lifecycle states for a streaming response session."""
    IDLE = "idle"              # Context created, streaming not started
    STREAMING = "streaming"    # Actively consuming chunks from the generator
    UPDATING = "updating"      # Momentarily editing the platform message
    FINALIZING = "finalizing"  # NOTE(review): never assigned in this module -- confirm still needed
    COMPLETED = "completed"    # Stream finished and final content delivered
    ERROR = "error"            # Stream aborted by an exception (see StreamContext.error)
    CANCELLED = "cancelled"    # Stream cancelled explicitly or by timeout
class PlatformCapability(Enum):
    """Platform message editing capabilities."""
    FULL_EDIT = "full_edit"      # Can edit sent messages in place (Discord, Telegram, Slack)
    APPEND_ONLY = "append_only"  # Can only append new messages, never edit
    NO_EDIT = "no_edit"          # No edit support; fall back to one final message
@dataclass
class StreamConfig:
    """Configuration for streaming behavior.

    All durations are in seconds. Field order is part of the public
    interface (positional construction), so do not reorder.
    """
    update_interval: float = 0.5  # Minimum seconds between message edits
    min_chunk_size: int = 10  # Minimum pending characters before an edit
    max_chunk_size: int = 500  # Pending size that forces an edit regardless of interval
    buffer_size: int = 4096  # Buffer size for stream reading (not referenced in this module -- presumably for channel adapters; confirm)
    show_typing_during_stream: bool = True  # Drive the TypingManager while streaming
    progress_indicator: str = "..."  # Placeholder/suffix text shown during streaming
    error_indicator: str = "[Error]"  # Text appended/sent when the stream fails
    timeout: float = 300.0  # Max stream duration in seconds before cancellation
    max_retries: int = 3  # Retries on update failure
    retry_delay: float = 1.0  # Delay between retries
    # Docker/container-specific settings
    websocket_ping_interval: float = 20.0  # Keep-alive for WebSocket (not used in this module -- confirm consumer)
    connection_timeout: float = 30.0  # Connection establishment timeout (not used in this module -- confirm consumer)
@dataclass
class StreamContext:
    """Context and result object for a single streaming session."""
    channel: str  # Platform name, e.g. "telegram"
    chat_id: str  # Chat/conversation ID on that platform
    message_id: Optional[str] = None  # ID of the bot's placeholder message being edited
    initial_message_id: Optional[str] = None  # Original user message
    content_buffer: str = ""  # Full content accumulated so far
    chunk_count: int = 0  # Number of chunks received from the generator
    bytes_streamed: int = 0  # Total UTF-8 bytes received
    started_at: float = field(default_factory=time.time)  # Wall-clock start time
    last_update_at: float = field(default_factory=time.time)  # Time of the last message edit
    state: StreamState = StreamState.IDLE  # Current lifecycle state
    error: Optional[str] = None  # Error description when state == ERROR
    platform_capability: PlatformCapability = PlatformCapability.FULL_EDIT  # Edit support of the channel
    metadata: Dict[str, Any] = field(default_factory=dict)  # Caller-supplied extra data
@dataclass
class ProgressIndicator:
    """Animated progress-indicator state for in-flight streams.

    Cycles through `frames`, returning one frame per call to next_frame().
    """
    enabled: bool = True
    style: str = "dots"  # dots, spinner, percentage, none
    frames: List[str] = field(default_factory=lambda: [".", "..", "..."])
    current_frame: int = 0

    def next_frame(self) -> str:
        """Return the current animation frame and advance the cycle.

        Yields the empty string when the indicator is disabled or styled
        as "none", so callers can append the result unconditionally.
        """
        if self.style == "none" or not self.enabled:
            return ""
        index = self.current_frame
        self.current_frame = (index + 1) % len(self.frames)
        return self.frames[index]
# Platform capability mappings.
# Channels missing from this table default to NO_EDIT (see
# StreamingResponse._get_platform_capability and create_streaming_response),
# so only edit-capable platforms strictly need an entry.
PLATFORM_CAPABILITIES: Dict[str, PlatformCapability] = {
    "discord": PlatformCapability.FULL_EDIT,
    "telegram": PlatformCapability.FULL_EDIT,
    "slack": PlatformCapability.FULL_EDIT,
    "whatsapp": PlatformCapability.NO_EDIT,
    "sms": PlatformCapability.NO_EDIT,
    "matrix": PlatformCapability.FULL_EDIT,
    "teams": PlatformCapability.FULL_EDIT,
    "line": PlatformCapability.NO_EDIT,
    "web": PlatformCapability.FULL_EDIT,
    "api": PlatformCapability.APPEND_ONLY,
}
class StreamingResponse:
    """
    Manages streaming LLM responses to chat platforms.

    Supports:
    - Streaming with message editing for supported platforms
    - Chunked updates with configurable interval
    - Fallback for platforms without edit support
    - Progress indicators during streaming
    - Error handling for stream interruptions
    - Integration with TypingManager for typing indicators
    - Docker/container-friendly WebSocket handling
    """

    def __init__(
        self,
        send_message: Optional[Callable[[str, str], Any]] = None,
        edit_message: Optional[Callable[[str, str], Any]] = None,
        typing_manager: Optional[Any] = None,
        config: Optional[StreamConfig] = None
    ):
        """
        Initialize the StreamingResponse manager.

        Args:
            send_message: Callback to send message. Takes (chat_id, text) -> message_id.
            edit_message: Callback to edit message. Takes (message_id, text).
                NOTE(review): this was previously annotated/documented as a
                3-argument (chat_id, message_id, text) callable, but
                update_message() only ever invokes it with (message_id, text);
                annotation and docs corrected to match the actual call site.
            typing_manager: Optional TypingManager instance for showing typing.
            config: Optional configuration for streaming behavior.
        """
        self._send_message = send_message
        self._edit_message = edit_message
        self._typing_manager = typing_manager
        self._config = config or StreamConfig()
        # Active sessions keyed by "<channel>:<chat_id>" (see _create_stream_key).
        self._active_streams: Dict[str, StreamContext] = {}
        # NOTE(review): _active_streams is read under this threading.Lock in the
        # sync accessors but mutated under the asyncio.Lock in the async paths;
        # the two locks do not exclude each other. Safe only if all access
        # happens on a single event-loop thread -- confirm.
        self._lock = threading.Lock()
        self._async_lock: Optional[asyncio.Lock] = None
        # Stream keys flagged for cooperative cancellation; checked per chunk.
        self._cancelled_streams: set = set()

    @property
    def config(self) -> StreamConfig:
        """Get the streaming configuration."""
        return self._config

    def set_callbacks(
        self,
        send_message: Callable[[str, str], Any],
        edit_message: Optional[Callable[[str, str], Any]] = None
    ) -> None:
        """Set the callback functions for sending/editing messages.

        Args:
            send_message: Takes (chat_id, text) and returns a message id.
            edit_message: Takes (message_id, text); see __init__ for details.
        """
        self._send_message = send_message
        self._edit_message = edit_message

    def set_typing_manager(self, typing_manager: Any) -> None:
        """Set the typing manager for showing typing during stream."""
        self._typing_manager = typing_manager

    def _get_async_lock(self) -> asyncio.Lock:
        """Get or create the async lock.

        Created lazily so the lock is bound to the event loop that actually
        runs the streams (the manager may be constructed before a loop exists).
        """
        if self._async_lock is None:
            self._async_lock = asyncio.Lock()
        return self._async_lock

    def _get_platform_capability(self, channel: str) -> PlatformCapability:
        """Get the capability for a platform; unknown channels get NO_EDIT."""
        return PLATFORM_CAPABILITIES.get(
            channel.lower(),
            PlatformCapability.NO_EDIT
        )

    def _create_stream_key(self, channel: str, chat_id: str) -> str:
        """Create a unique key for a stream context."""
        return f"{channel}:{chat_id}"

    async def stream(
        self,
        channel: str,
        chat_id: str,
        generator: AsyncGenerator[str, None],
        initial_message_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> StreamContext:
        """
        Stream content from an async generator to a chat.

        Sends a placeholder message on edit-capable platforms, edits it with
        accumulated content as chunks arrive, then finalizes with the full
        content. Cancellation (explicit or via timeout) is cooperative and
        takes effect between chunks.

        Args:
            channel: The channel name (telegram, discord, etc.).
            chat_id: The chat/conversation ID.
            generator: Async generator yielding content chunks.
            initial_message_id: Optional message ID being responded to.
            metadata: Optional metadata to attach to context.

        Returns:
            StreamContext with final state and content.
        """
        stream_key = self._create_stream_key(channel, chat_id)

        # Cancel any existing stream for this chat
        await self._cancel_existing_stream(stream_key)

        # Create stream context
        context = StreamContext(
            channel=channel,
            chat_id=chat_id,
            initial_message_id=initial_message_id,
            state=StreamState.STREAMING,
            platform_capability=self._get_platform_capability(channel),
            metadata=metadata or {}
        )

        async with self._get_async_lock():
            self._active_streams[stream_key] = context
            # NOTE(review): discarding the cancel flag here can race with the
            # _cancel_existing_stream call above -- an old stream that has not
            # yet checked the flag may miss its cancellation signal and keep
            # running alongside this one. Confirm intended.
            self._cancelled_streams.discard(stream_key)

        try:
            # Start typing indicator if available
            if self._typing_manager and self._config.show_typing_during_stream:
                await self._start_typing(channel, chat_id)

            # Send initial placeholder message for platforms with edit support
            if context.platform_capability == PlatformCapability.FULL_EDIT:
                context.message_id = await self._send_initial_message(
                    chat_id,
                    self._config.progress_indicator
                )

            # Process the stream
            await self._process_stream(context, generator, stream_key)

            # Finalize
            if context.state != StreamState.CANCELLED:
                await self.finalize(context.message_id, context.content_buffer)
                context.state = StreamState.COMPLETED

        except asyncio.CancelledError:
            context.state = StreamState.CANCELLED
            logger.info(f"Stream cancelled for {stream_key}")
        except Exception as e:
            context.state = StreamState.ERROR
            context.error = str(e)
            logger.error(f"Stream error for {stream_key}: {e}")
            await self._handle_stream_error(context, e)
        finally:
            # Stop typing indicator
            if self._typing_manager and self._config.show_typing_during_stream:
                await self._stop_typing(channel, chat_id)

            # Clean up
            async with self._get_async_lock():
                self._active_streams.pop(stream_key, None)

        return context

    async def _process_stream(
        self,
        context: StreamContext,
        generator: AsyncGenerator[str, None],
        stream_key: str
    ) -> None:
        """Process chunks from the generator.

        Accumulates chunks into context.content_buffer and periodically edits
        the placeholder message: an edit happens once update_interval has
        elapsed AND min_chunk_size characters are pending, or immediately when
        max_chunk_size is pending. A watcher task flags the stream for
        cancellation after config.timeout seconds.
        """
        last_update_time = time.time()
        pending_content = ""  # Characters accumulated since the last edit
        progress = ProgressIndicator()
        timeout_task = asyncio.create_task(
            self._stream_timeout_watcher(stream_key, self._config.timeout)
        )

        try:
            async for chunk in generator:
                # Check for cancellation (cooperative: only honored between
                # chunks, so a stalled generator is not interrupted here).
                if stream_key in self._cancelled_streams:
                    context.state = StreamState.CANCELLED
                    break

                # Accumulate content
                context.content_buffer += chunk
                pending_content += chunk
                context.chunk_count += 1
                context.bytes_streamed += len(chunk.encode('utf-8'))

                # Check if we should update
                current_time = time.time()
                time_elapsed = current_time - last_update_time
                content_size = len(pending_content)

                should_update = (
                    time_elapsed >= self._config.update_interval and
                    content_size >= self._config.min_chunk_size
                ) or content_size >= self._config.max_chunk_size

                if should_update and context.message_id:
                    context.state = StreamState.UPDATING

                    # Add progress indicator
                    display_content = context.content_buffer
                    if progress.enabled:
                        display_content += progress.next_frame()

                    await self.update_message(
                        context.message_id,
                        display_content
                    )

                    last_update_time = current_time
                    pending_content = ""
                    context.last_update_at = current_time
                    context.state = StreamState.STREAMING

                    # Pulse typing if needed
                    if self._typing_manager:
                        await self._pulse_typing(context.channel, context.chat_id)

        finally:
            # Always retire the watcher so it cannot flag a finished stream.
            timeout_task.cancel()
            try:
                await timeout_task
            except asyncio.CancelledError:
                pass

    async def _stream_timeout_watcher(
        self,
        stream_key: str,
        timeout: float
    ) -> None:
        """Watch for stream timeout and cancel if exceeded.

        Sleeps for the full timeout, then flags the stream for cooperative
        cancellation; _process_stream cancels this task on normal completion.
        """
        await asyncio.sleep(timeout)
        logger.warning(f"Stream timeout reached for {stream_key}")
        self._cancelled_streams.add(stream_key)

    async def _cancel_existing_stream(self, stream_key: str) -> None:
        """Cancel an existing stream for the same chat.

        Only sets the cooperative-cancellation flag; it does not wait for the
        old stream to actually observe it and exit.
        """
        async with self._get_async_lock():
            if stream_key in self._active_streams:
                self._cancelled_streams.add(stream_key)
                logger.info(f"Cancelled existing stream for {stream_key}")

    async def update_message(
        self,
        message_id: str,
        content: str
    ) -> None:
        """
        Update an existing message with new content.

        Invokes the edit_message callback with (message_id, content), retrying
        up to config.max_retries times with config.retry_delay between
        attempts. No-op if no edit callback is configured.

        Args:
            message_id: The message ID to update.
            content: The new content to set.

        Raises:
            Exception: re-raises the callback's last error once retries are
                exhausted.
        """
        if not self._edit_message:
            logger.debug("No edit_message callback set, skipping update")
            return

        retries = 0
        while retries < self._config.max_retries:
            try:
                # Callback may be sync or async; await only when needed.
                result = self._edit_message(message_id, content)
                if asyncio.iscoroutine(result):
                    await result
                return
            except Exception as e:
                retries += 1
                if retries >= self._config.max_retries:
                    logger.error(f"Failed to update message after {retries} retries: {e}")
                    raise
                logger.warning(f"Update retry {retries}/{self._config.max_retries}: {e}")
                await asyncio.sleep(self._config.retry_delay)

    async def finalize(
        self,
        message_id: Optional[str],
        final_content: str
    ) -> None:
        """
        Finalize a streaming message with the complete content.

        Args:
            message_id: The message ID to finalize (or None for new message).
            final_content: The final complete content.
        """
        if message_id and self._edit_message:
            # Update existing message with final content (no progress indicator)
            await self.update_message(message_id, final_content)
        elif not message_id and self._send_message:
            # Send as new message if no message_id.
            # NOTE(review): passes chat_id=None to the send callback -- relies
            # on the callback tolerating a None chat id; confirm with channel
            # adapters.
            await self._send_initial_message(None, final_content)

    async def _send_initial_message(
        self,
        chat_id: Optional[str],
        content: str
    ) -> Optional[str]:
        """Send the initial placeholder message.

        Returns the new message id (string), extracted from a dict result
        under 'message_id'/'id' if needed, or None on failure. Send failures
        are logged and swallowed so streaming can continue without editing.
        """
        if not self._send_message:
            return None

        try:
            result = self._send_message(chat_id, content)
            if asyncio.iscoroutine(result):
                result = await result
            # Expect result to be message_id or dict with message_id
            if isinstance(result, str):
                return result
            elif isinstance(result, dict):
                return result.get('message_id') or result.get('id')
            return str(result) if result else None
        except Exception as e:
            logger.error(f"Failed to send initial message: {e}")
            return None

    async def _handle_stream_error(
        self,
        context: StreamContext,
        error: Exception
    ) -> None:
        """Handle stream errors by sending error indicator.

        Appends the error indicator to whatever content was streamed, or shows
        the indicator plus the error text when nothing was streamed yet.
        """
        error_content = context.content_buffer
        if error_content:
            error_content += f"\n\n{self._config.error_indicator}"
        else:
            error_content = f"{self._config.error_indicator} {str(error)}"

        if context.message_id:
            try:
                await self.update_message(context.message_id, error_content)
            except Exception as e:
                logger.error(f"Failed to send error indicator: {e}")

    async def _start_typing(self, channel: str, chat_id: str) -> None:
        """Start typing indicator via TypingManager (best-effort).

        Prefers an async API when the manager exposes one; failures are
        logged at debug level and never interrupt the stream.
        """
        if self._typing_manager:
            try:
                if hasattr(self._typing_manager, 'start_async'):
                    await self._typing_manager.start_async(chat_id)
                else:
                    self._typing_manager.start(chat_id)
            except Exception as e:
                logger.debug(f"Failed to start typing: {e}")

    async def _stop_typing(self, channel: str, chat_id: str) -> None:
        """Stop typing indicator via TypingManager (best-effort, see _start_typing)."""
        if self._typing_manager:
            try:
                if hasattr(self._typing_manager, 'stop_async'):
                    await self._typing_manager.stop_async(chat_id)
                else:
                    self._typing_manager.stop(chat_id)
            except Exception as e:
                logger.debug(f"Failed to stop typing: {e}")

    async def _pulse_typing(self, channel: str, chat_id: str) -> None:
        """Pulse typing indicator to keep it active (best-effort, see _start_typing)."""
        if self._typing_manager:
            try:
                if hasattr(self._typing_manager, 'pulse_async'):
                    await self._typing_manager.pulse_async(chat_id)
                else:
                    self._typing_manager.pulse(chat_id)
            except Exception as e:
                logger.debug(f"Failed to pulse typing: {e}")

    def cancel_stream(self, channel: str, chat_id: str) -> bool:
        """
        Cancel an active stream.

        Cancellation is cooperative: the stream exits at its next chunk
        boundary, not immediately.

        Args:
            channel: The channel name.
            chat_id: The chat ID.

        Returns:
            True if a stream was cancelled, False if none was active.
        """
        stream_key = self._create_stream_key(channel, chat_id)
        with self._lock:
            if stream_key in self._active_streams:
                self._cancelled_streams.add(stream_key)
                return True
        return False

    def is_streaming(self, channel: str, chat_id: str) -> bool:
        """Check if streaming is active for a chat."""
        stream_key = self._create_stream_key(channel, chat_id)
        with self._lock:
            return stream_key in self._active_streams

    def get_context(self, channel: str, chat_id: str) -> Optional[StreamContext]:
        """Get the streaming context for a chat, or None if not streaming."""
        stream_key = self._create_stream_key(channel, chat_id)
        with self._lock:
            return self._active_streams.get(stream_key)

    def get_active_streams(self) -> List[str]:
        """Get list of active stream keys ("<channel>:<chat_id>")."""
        with self._lock:
            return list(self._active_streams.keys())

    async def cancel_all(self) -> int:
        """
        Cancel all active streams (cooperatively; see cancel_stream).

        Returns:
            Number of streams cancelled.
        """
        async with self._get_async_lock():
            count = len(self._active_streams)
            for stream_key in self._active_streams.keys():
                self._cancelled_streams.add(stream_key)
            return count

    @asynccontextmanager
    async def streaming_context(
        self,
        channel: str,
        chat_id: str,
        initial_message_id: Optional[str] = None
    ):
        """
        Async context manager for streaming sessions.

        Handles placeholder creation, typing indicators, finalization, error
        reporting and cleanup, while the caller drives the content updates.
        NOTE(review): unlike stream(), this does not cancel a pre-existing
        stream for the same chat and installs no timeout watcher -- confirm
        that is intentional.

        Usage:
            async with streaming.streaming_context("telegram", "123") as ctx:
                async for chunk in llm_response:
                    ctx.content_buffer += chunk
                    await streaming.update_message(ctx.message_id, ctx.content_buffer)

        Args:
            channel: The channel name.
            chat_id: The chat ID.
            initial_message_id: Optional message being responded to.

        Yields:
            StreamContext for the session.
        """
        stream_key = self._create_stream_key(channel, chat_id)

        context = StreamContext(
            channel=channel,
            chat_id=chat_id,
            initial_message_id=initial_message_id,
            state=StreamState.STREAMING,
            platform_capability=self._get_platform_capability(channel)
        )

        async with self._get_async_lock():
            self._active_streams[stream_key] = context

        try:
            # Start typing
            if self._typing_manager and self._config.show_typing_during_stream:
                await self._start_typing(channel, chat_id)

            # Send initial message for platforms with edit support
            if context.platform_capability == PlatformCapability.FULL_EDIT:
                context.message_id = await self._send_initial_message(
                    chat_id,
                    self._config.progress_indicator
                )

            yield context

            # Finalize
            if context.state != StreamState.CANCELLED and context.content_buffer:
                await self.finalize(context.message_id, context.content_buffer)
                context.state = StreamState.COMPLETED

        except Exception as e:
            context.state = StreamState.ERROR
            context.error = str(e)
            await self._handle_stream_error(context, e)
            raise
        finally:
            # Stop typing
            if self._typing_manager and self._config.show_typing_during_stream:
                await self._stop_typing(channel, chat_id)

            # Clean up
            async with self._get_async_lock():
                self._active_streams.pop(stream_key, None)

    def get_stats(self) -> Dict[str, Any]:
        """Get statistics about currently active streaming responses.

        Totals cover active streams only; completed streams are removed from
        the registry and no longer counted.
        """
        with self._lock:
            total_bytes = sum(
                ctx.bytes_streamed for ctx in self._active_streams.values()
            )
            total_chunks = sum(
                ctx.chunk_count for ctx in self._active_streams.values()
            )
            return {
                "active_streams": len(self._active_streams),
                "total_bytes_streamed": total_bytes,
                "total_chunks": total_chunks,
                "stream_keys": list(self._active_streams.keys())
            }
class FallbackStreamingResponse:
    """
    Fallback streaming handler for platforms without message editing.

    Collects all content and sends as a single message at the end.
    Typing-indicator calls are best-effort, matching StreamingResponse:
    a failing TypingManager can no longer abort the stream (unguarded
    `start`) or raise out of the `finally` block and mask the real stream
    outcome (unguarded `stop`).
    """

    def __init__(
        self,
        send_message: Callable[[str, str], Any],
        typing_manager: Optional[Any] = None,
        config: Optional[StreamConfig] = None
    ):
        """
        Initialize the fallback streaming handler.

        Args:
            send_message: Callback to send message. Takes (chat_id, text).
            typing_manager: Optional TypingManager instance.
            config: Optional configuration.
        """
        self._send_message = send_message
        self._typing_manager = typing_manager
        self._config = config or StreamConfig()

    async def _typing(self, action: str, chat_id: str) -> None:
        """Best-effort typing-indicator call; action is 'start', 'pulse' or 'stop'.

        Prefers the manager's async variant ('<action>_async') when present.
        Failures are logged at debug level and never propagate, consistent
        with StreamingResponse._start_typing/_stop_typing/_pulse_typing.
        """
        if not self._typing_manager:
            return
        try:
            async_method = getattr(self._typing_manager, f"{action}_async", None)
            if async_method is not None:
                await async_method(chat_id)
            else:
                getattr(self._typing_manager, action)(chat_id)
        except Exception as e:
            logger.debug(f"Failed to {action} typing: {e}")

    async def stream(
        self,
        channel: str,
        chat_id: str,
        generator: AsyncGenerator[str, None],
        initial_message_id: Optional[str] = None
    ) -> StreamContext:
        """
        Collect stream content and send as single message.

        Args:
            channel: The channel name.
            chat_id: The chat ID.
            generator: Async generator yielding content chunks.
            initial_message_id: Optional message being responded to.

        Returns:
            StreamContext with collected content.
        """
        context = StreamContext(
            channel=channel,
            chat_id=chat_id,
            initial_message_id=initial_message_id,
            state=StreamState.STREAMING,
            platform_capability=PlatformCapability.NO_EDIT
        )

        try:
            # Start typing (best-effort; a failure must not abort the stream)
            await self._typing('start', chat_id)

            # Collect all content
            async for chunk in generator:
                context.content_buffer += chunk
                context.chunk_count += 1
                context.bytes_streamed += len(chunk.encode('utf-8'))

                # Pulse typing periodically to keep the indicator alive
                if context.chunk_count % 10 == 0:
                    await self._typing('pulse', chat_id)

            # Send final message; callback may be sync or async
            if context.content_buffer:
                result = self._send_message(chat_id, context.content_buffer)
                if asyncio.iscoroutine(result):
                    result = await result
                if isinstance(result, str):
                    context.message_id = result
                elif isinstance(result, dict):
                    context.message_id = result.get('message_id') or result.get('id')

            context.state = StreamState.COMPLETED

        except Exception as e:
            context.state = StreamState.ERROR
            context.error = str(e)
            logger.error(f"Fallback stream error: {e}")
        finally:
            # Stop typing -- guarded so a failing stop cannot raise out of the
            # finally block and replace the real stream outcome.
            await self._typing('stop', chat_id)

        return context
def create_streaming_response(
    channel: str,
    send_message: Callable[[str, str], Any],
    edit_message: Optional[Callable[[str, str, str], Any]] = None,
    typing_manager: Optional[Any] = None,
    config: Optional[StreamConfig] = None
) -> Union[StreamingResponse, FallbackStreamingResponse]:
    """
    Factory selecting the right streaming handler for a platform.

    Channels with full edit support (per PLATFORM_CAPABILITIES) for which an
    edit callback was supplied get a StreamingResponse; every other case falls
    back to the collect-and-send-once FallbackStreamingResponse. Unknown
    channels are treated as NO_EDIT.

    Args:
        channel: The channel name.
        send_message: Callback to send message.
        edit_message: Optional callback to edit message.
        typing_manager: Optional TypingManager instance.
        config: Optional configuration.

    Returns:
        StreamingResponse or FallbackStreamingResponse based on platform capability.
    """
    capability = PLATFORM_CAPABILITIES.get(channel.lower(), PlatformCapability.NO_EDIT)
    supports_editing = capability is PlatformCapability.FULL_EDIT and bool(edit_message)

    if not supports_editing:
        return FallbackStreamingResponse(
            send_message=send_message,
            typing_manager=typing_manager,
            config=config
        )

    return StreamingResponse(
        send_message=send_message,
        edit_message=edit_message,
        typing_manager=typing_manager,
        config=config
    )