Coverage for integrations / channels / response / streaming.py: 43.1%

297 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2StreamingResponse - Manages streaming LLM responses to chat platforms. 

3 

4Provides support for streaming responses with message editing for platforms 

5that support it (Discord, Telegram, Slack), with fallback for those that don't. 

6Designed to work properly in Docker/containerized environments. 

7""" 

8 

9import asyncio 

10import threading 

11import time 

12from contextlib import asynccontextmanager 

13from typing import Optional, Callable, Any, AsyncGenerator, Dict, List, Union 

14from dataclasses import dataclass, field 

15from enum import Enum 

16import logging 

17 

18logger = logging.getLogger(__name__) 

19 

20 

class StreamState(Enum):
    """Lifecycle phases a streaming response can be in."""

    # Not started yet (the dataclass default for a fresh context).
    IDLE = "idle"
    # Chunks are being consumed from the generator.
    STREAMING = "streaming"
    # An in-place edit of the platform message is in flight.
    UPDATING = "updating"
    FINALIZING = "finalizing"
    # Terminal states.
    COMPLETED = "completed"
    ERROR = "error"
    CANCELLED = "cancelled"

30 

31 

class PlatformCapability(Enum):
    """How much message editing a chat platform allows."""

    # Sent messages can be edited in place (Discord, Telegram, Slack).
    FULL_EDIT = "full_edit"
    # Only new messages can be appended.
    APPEND_ONLY = "append_only"
    # No edit support; fall back to a single message.
    NO_EDIT = "no_edit"

37 

38 

@dataclass
class StreamConfig:
    """Tunable settings that control how a response is streamed."""

    # --- Update cadence -------------------------------------------------
    update_interval: float = 0.5  # seconds between message updates
    min_chunk_size: int = 10  # minimum pending characters before an update
    max_chunk_size: int = 500  # pending characters that force an update
    buffer_size: int = 4096  # buffer size for stream reading

    # --- Presentation ---------------------------------------------------
    show_typing_during_stream: bool = True
    progress_indicator: str = "..."  # appended during streaming
    error_indicator: str = "[Error]"

    # --- Limits and retries ---------------------------------------------
    timeout: float = 300.0  # maximum stream duration in seconds
    max_retries: int = 3  # retries on update failure
    retry_delay: float = 1.0  # seconds to wait between retries

    # --- Docker/container-specific settings -----------------------------
    websocket_ping_interval: float = 20.0  # keep-alive for WebSocket
    connection_timeout: float = 30.0  # connection establishment timeout

55 

56 

@dataclass
class StreamContext:
    """Mutable per-session state for one streamed response."""

    # --- Where the response is going ------------------------------------
    channel: str
    chat_id: str
    message_id: Optional[str] = None  # platform message being edited, if any
    initial_message_id: Optional[str] = None  # original user message

    # --- What has been streamed so far ----------------------------------
    content_buffer: str = ""
    chunk_count: int = 0
    bytes_streamed: int = 0

    # --- Timing (wall-clock seconds from time.time) ---------------------
    started_at: float = field(default_factory=time.time)
    last_update_at: float = field(default_factory=time.time)

    # --- Outcome --------------------------------------------------------
    state: StreamState = StreamState.IDLE
    error: Optional[str] = None

    # --- Extras ---------------------------------------------------------
    platform_capability: PlatformCapability = PlatformCapability.FULL_EDIT
    metadata: Dict[str, Any] = field(default_factory=dict)

73 

74 

@dataclass
class ProgressIndicator:
    """Cycling text animation shown while a response is still streaming."""

    enabled: bool = True
    style: str = "dots"  # dots, spinner, percentage, none
    frames: List[str] = field(default_factory=lambda: [".", "..", "..."])
    current_frame: int = 0  # index of the frame the next call will return

    def next_frame(self) -> str:
        """
        Get the next animation frame and advance the cursor.

        Returns:
            The current frame, or an empty string when the indicator is
            disabled, its style is "none", or it has no frames.
        """
        if not self.enabled or self.style == "none" or not self.frames:
            return ""
        # Wrap the index so an out-of-range ``current_frame`` (for example
        # after ``frames`` was replaced with a shorter list) cannot raise.
        total = len(self.frames)
        index = self.current_frame % total
        self.current_frame = (index + 1) % total
        return self.frames[index]

90 

91 

# Editing capability of each known platform, keyed by lower-case channel name.
PLATFORM_CAPABILITIES: Dict[str, PlatformCapability] = dict(
    discord=PlatformCapability.FULL_EDIT,
    telegram=PlatformCapability.FULL_EDIT,
    slack=PlatformCapability.FULL_EDIT,
    whatsapp=PlatformCapability.NO_EDIT,
    sms=PlatformCapability.NO_EDIT,
    matrix=PlatformCapability.FULL_EDIT,
    teams=PlatformCapability.FULL_EDIT,
    line=PlatformCapability.NO_EDIT,
    web=PlatformCapability.FULL_EDIT,
    api=PlatformCapability.APPEND_ONLY,
)

105 

106 

class StreamingResponse:
    """
    Manages streaming LLM responses to chat platforms.

    Supports:
    - Streaming with message editing for supported platforms
    - Chunked updates with configurable interval
    - Fallback for platforms without edit support
    - Progress indicators during streaming
    - Error handling for stream interruptions
    - Integration with TypingManager for typing indicators
    - Docker/container-friendly WebSocket handling

    One context is tracked per ``channel:chat_id`` key. Starting a new
    stream for a key supersedes the stream previously registered under it.
    """

    def __init__(
        self,
        send_message: Optional[Callable[[str, str], Any]] = None,
        edit_message: Optional[Callable[[str, str, str], Any]] = None,
        typing_manager: Optional[Any] = None,
        config: Optional[StreamConfig] = None
    ):
        """
        Initialize the StreamingResponse manager.

        Args:
            send_message: Callback to send message. Takes (chat_id, text) -> message_id.
            edit_message: Callback to edit message. Takes (chat_id, message_id, text).
            typing_manager: Optional TypingManager instance for showing typing.
            config: Optional configuration for streaming behavior.
        """
        self._send_message = send_message
        self._edit_message = edit_message
        self._typing_manager = typing_manager
        self._config = config or StreamConfig()
        self._active_streams: Dict[str, StreamContext] = {}
        # Guards _active_streams and _cancelled_streams. It is only ever held
        # for short, non-awaiting sections so it is safe inside coroutines.
        self._lock = threading.Lock()
        self._async_lock: Optional[asyncio.Lock] = None
        self._cancelled_streams: set = set()

    @property
    def config(self) -> StreamConfig:
        """Get the streaming configuration."""
        return self._config

    def set_callbacks(
        self,
        send_message: Callable[[str, str], Any],
        edit_message: Optional[Callable[[str, str, str], Any]] = None
    ) -> None:
        """Set the callback functions for sending/editing messages."""
        self._send_message = send_message
        self._edit_message = edit_message

    def set_typing_manager(self, typing_manager: Any) -> None:
        """Set the typing manager for showing typing during stream."""
        self._typing_manager = typing_manager

    def _get_async_lock(self) -> asyncio.Lock:
        """Get or create the async lock (created lazily on first use)."""
        if self._async_lock is None:
            self._async_lock = asyncio.Lock()
        return self._async_lock

    def _get_platform_capability(self, channel: str) -> PlatformCapability:
        """Get the capability for a platform; unknown platforms are NO_EDIT."""
        return PLATFORM_CAPABILITIES.get(
            channel.lower(),
            PlatformCapability.NO_EDIT
        )

    def _create_stream_key(self, channel: str, chat_id: str) -> str:
        """Create a unique key for a stream context."""
        return f"{channel}:{chat_id}"

    async def _register_stream(self, stream_key: str, context: StreamContext) -> None:
        """Make *context* the active stream for *stream_key*.

        Any cancel flag left for the key is cleared so it cannot hit the new
        stream. A stream previously registered under the key notices that it
        was replaced through ``_is_cancelled``.
        """
        async with self._get_async_lock():
            with self._lock:
                self._active_streams[stream_key] = context
                self._cancelled_streams.discard(stream_key)

    async def _release_stream(self, stream_key: str, context: StreamContext) -> None:
        """Remove *context* from the registry if it still owns *stream_key*.

        A superseded stream must not remove (or clear the cancel flag of) the
        stream that replaced it.
        """
        async with self._get_async_lock():
            with self._lock:
                if self._active_streams.get(stream_key) is context:
                    del self._active_streams[stream_key]
                    self._cancelled_streams.discard(stream_key)

    def _is_cancelled(self, stream_key: str, context: StreamContext) -> bool:
        """Return True if *context* was cancelled, timed out or superseded."""
        with self._lock:
            if stream_key in self._cancelled_streams:
                return True
            registered = self._active_streams.get(stream_key)
        # An unregistered context (registered is None) is not treated as
        # superseded; only a *different* registered context counts.
        return registered is not None and registered is not context

    def _find_chat_id(self, message_id: str) -> Optional[str]:
        """Look up the chat of the active stream that owns *message_id*."""
        with self._lock:
            for ctx in self._active_streams.values():
                if ctx.message_id == message_id:
                    return ctx.chat_id
        return None

    async def stream(
        self,
        channel: str,
        chat_id: str,
        generator: AsyncGenerator[str, None],
        initial_message_id: Optional[str] = None,
        metadata: Optional[Dict[str, Any]] = None
    ) -> StreamContext:
        """
        Stream content from an async generator to a chat.

        Errors and cancellation are recorded on the returned context
        (``state`` / ``error``) instead of being raised.

        Args:
            channel: The channel name (telegram, discord, etc.).
            chat_id: The chat/conversation ID.
            generator: Async generator yielding content chunks.
            initial_message_id: Optional message ID being responded to.
            metadata: Optional metadata to attach to context.

        Returns:
            StreamContext with final state and content.
        """
        stream_key = self._create_stream_key(channel, chat_id)

        # Cancel any existing stream for this chat
        await self._cancel_existing_stream(stream_key)

        context = StreamContext(
            channel=channel,
            chat_id=chat_id,
            initial_message_id=initial_message_id,
            state=StreamState.STREAMING,
            platform_capability=self._get_platform_capability(channel),
            metadata=metadata or {}
        )
        await self._register_stream(stream_key, context)

        show_typing = bool(
            self._typing_manager and self._config.show_typing_during_stream
        )

        try:
            if show_typing:
                await self._start_typing(channel, chat_id)

            # Send initial placeholder message for platforms with edit support
            if context.platform_capability == PlatformCapability.FULL_EDIT:
                context.message_id = await self._send_initial_message(
                    chat_id,
                    self._config.progress_indicator
                )

            await self._process_stream(context, generator, stream_key)

            if context.state != StreamState.CANCELLED:
                await self.finalize(
                    context.message_id,
                    context.content_buffer,
                    chat_id=chat_id
                )
                context.state = StreamState.COMPLETED

        except asyncio.CancelledError:
            context.state = StreamState.CANCELLED
            logger.info("Stream cancelled for %s", stream_key)
        except Exception as e:
            context.state = StreamState.ERROR
            context.error = str(e)
            logger.error("Stream error for %s: %s", stream_key, e)
            await self._handle_stream_error(context, e)
        finally:
            if self._typing_manager and self._config.show_typing_during_stream:
                await self._stop_typing(channel, chat_id)
            await self._release_stream(stream_key, context)

        return context

    async def _process_stream(
        self,
        context: StreamContext,
        generator: AsyncGenerator[str, None],
        stream_key: str
    ) -> None:
        """Consume chunks from the generator and push throttled updates.

        Cancellation (explicit, timeout or supersession) is checked once per
        received chunk, so it takes effect when the next chunk arrives.
        """
        last_update_time = time.time()
        pending_size = 0  # characters received since the last pushed update
        progress = ProgressIndicator()
        timeout_task = asyncio.create_task(
            self._stream_timeout_watcher(stream_key, self._config.timeout, context)
        )

        try:
            async for chunk in generator:
                if self._is_cancelled(stream_key, context):
                    context.state = StreamState.CANCELLED
                    break

                # Accumulate content
                context.content_buffer += chunk
                pending_size += len(chunk)
                context.chunk_count += 1
                context.bytes_streamed += len(chunk.encode('utf-8'))

                # Update when enough time AND content has accumulated, or
                # unconditionally once the pending content gets too large.
                current_time = time.time()
                time_elapsed = current_time - last_update_time
                should_update = (
                    time_elapsed >= self._config.update_interval and
                    pending_size >= self._config.min_chunk_size
                ) or pending_size >= self._config.max_chunk_size

                if should_update and context.message_id:
                    context.state = StreamState.UPDATING

                    display_content = context.content_buffer
                    if progress.enabled:
                        display_content += progress.next_frame()

                    await self.update_message(
                        context.message_id,
                        display_content,
                        chat_id=context.chat_id
                    )

                    last_update_time = current_time
                    pending_size = 0
                    context.last_update_at = current_time
                    context.state = StreamState.STREAMING

                # Keep the typing indicator alive, honouring the same config
                # switch that gates starting and stopping it.
                if self._typing_manager and self._config.show_typing_during_stream:
                    await self._pulse_typing(context.channel, context.chat_id)

        finally:
            timeout_task.cancel()
            try:
                await timeout_task
            except asyncio.CancelledError:
                pass

    async def _stream_timeout_watcher(
        self,
        stream_key: str,
        timeout: float,
        context: Optional[StreamContext] = None
    ) -> None:
        """Flag the stream as cancelled once *timeout* seconds have passed.

        Args:
            stream_key: Key of the stream to flag.
            timeout: Seconds to wait before flagging.
            context: Optional owning context. When given and another context
                has since been registered under the key, nothing is flagged
                so a stale watcher cannot cancel the replacement stream.
        """
        await asyncio.sleep(timeout)
        with self._lock:
            registered = self._active_streams.get(stream_key)
            if (
                context is not None
                and registered is not None
                and registered is not context
            ):
                return
            self._cancelled_streams.add(stream_key)
        logger.warning("Stream timeout reached for %s", stream_key)

    async def _cancel_existing_stream(self, stream_key: str) -> None:
        """Cancel an existing stream for the same chat."""
        async with self._get_async_lock():
            with self._lock:
                exists = stream_key in self._active_streams
                if exists:
                    self._cancelled_streams.add(stream_key)
        if exists:
            logger.info("Cancelled existing stream for %s", stream_key)

    async def update_message(
        self,
        message_id: str,
        content: str,
        chat_id: Optional[str] = None
    ) -> None:
        """
        Update an existing message with new content.

        The edit callback is invoked as ``(chat_id, message_id, content)``.
        When ``chat_id`` is not passed it is looked up from the active stream
        that owns ``message_id``; if it still cannot be determined, the
        callback is invoked in the legacy ``(message_id, content)`` form.

        Args:
            message_id: The message ID to update.
            content: The new content to set.
            chat_id: Optional chat the message belongs to.

        Raises:
            Exception: Whatever the edit callback raised on the last attempt.
        """
        if not self._edit_message:
            logger.debug("No edit_message callback set, skipping update")
            return

        if chat_id is None:
            chat_id = self._find_chat_id(message_id)
        if chat_id is None:
            call_args = (message_id, content)
        else:
            call_args = (chat_id, message_id, content)

        # Always try at least once, even if max_retries is configured <= 0.
        max_attempts = max(1, self._config.max_retries)
        for attempt in range(1, max_attempts + 1):
            try:
                result = self._edit_message(*call_args)
                if asyncio.iscoroutine(result):
                    await result
                return
            except Exception as e:
                if attempt >= max_attempts:
                    logger.error(
                        "Failed to update message after %s retries: %s", attempt, e
                    )
                    raise
                logger.warning(
                    "Update retry %s/%s: %s", attempt, self._config.max_retries, e
                )
                await asyncio.sleep(self._config.retry_delay)

    async def finalize(
        self,
        message_id: Optional[str],
        final_content: str,
        chat_id: Optional[str] = None
    ) -> None:
        """
        Finalize a streaming message with the complete content.

        Args:
            message_id: The message ID to finalize (or None for new message).
            final_content: The final complete content.
            chat_id: Chat to deliver to; required for a new message to reach
                the right conversation.
        """
        if message_id and self._edit_message:
            # Update existing message with final content (no progress indicator)
            await self.update_message(message_id, final_content, chat_id=chat_id)
        elif self._send_message:
            # Nothing can be edited (no message yet, or no edit callback), so
            # deliver the final content as a new message.
            await self._send_initial_message(chat_id, final_content)

    async def _send_initial_message(
        self,
        chat_id: Optional[str],
        content: str
    ) -> Optional[str]:
        """Send a message and return its ID.

        Send failures are logged and reported as ``None`` rather than raised.
        """
        if not self._send_message:
            return None

        try:
            result = self._send_message(chat_id, content)
            if asyncio.iscoroutine(result):
                result = await result
            # Expect result to be message_id or dict with message_id
            if isinstance(result, str):
                return result
            elif isinstance(result, dict):
                return result.get('message_id') or result.get('id')
            return str(result) if result else None
        except Exception as e:
            logger.error("Failed to send initial message: %s", e)
            return None

    async def _handle_stream_error(
        self,
        context: StreamContext,
        error: Exception
    ) -> None:
        """Show the error indicator on the streamed message, if there is one."""
        if context.content_buffer:
            error_content = (
                f"{context.content_buffer}\n\n{self._config.error_indicator}"
            )
        else:
            error_content = f"{self._config.error_indicator} {str(error)}"

        if context.message_id:
            try:
                await self.update_message(
                    context.message_id,
                    error_content,
                    chat_id=context.chat_id
                )
            except Exception as e:
                logger.error("Failed to send error indicator: %s", e)

    async def _typing_action(self, action: str, chat_id: str) -> None:
        """Call ``<action>_async`` or ``<action>`` on the typing manager.

        Typing is best-effort: failures are logged at debug level only.
        """
        manager = self._typing_manager
        if not manager:
            return
        try:
            async_name = f"{action}_async"
            if hasattr(manager, async_name):
                await getattr(manager, async_name)(chat_id)
            else:
                getattr(manager, action)(chat_id)
        except Exception as e:
            logger.debug("Failed to %s typing: %s", action, e)

    async def _start_typing(self, channel: str, chat_id: str) -> None:
        """Start typing indicator via TypingManager."""
        await self._typing_action("start", chat_id)

    async def _stop_typing(self, channel: str, chat_id: str) -> None:
        """Stop typing indicator via TypingManager."""
        await self._typing_action("stop", chat_id)

    async def _pulse_typing(self, channel: str, chat_id: str) -> None:
        """Pulse typing indicator to keep it active."""
        await self._typing_action("pulse", chat_id)

    def cancel_stream(self, channel: str, chat_id: str) -> bool:
        """
        Cancel an active stream.

        Args:
            channel: The channel name.
            chat_id: The chat ID.

        Returns:
            True if a stream was cancelled, False if none was active.
        """
        stream_key = self._create_stream_key(channel, chat_id)
        with self._lock:
            if stream_key in self._active_streams:
                self._cancelled_streams.add(stream_key)
                return True
            return False

    def is_streaming(self, channel: str, chat_id: str) -> bool:
        """Check if streaming is active for a chat."""
        stream_key = self._create_stream_key(channel, chat_id)
        with self._lock:
            return stream_key in self._active_streams

    def get_context(self, channel: str, chat_id: str) -> Optional[StreamContext]:
        """Get the streaming context for a chat."""
        stream_key = self._create_stream_key(channel, chat_id)
        with self._lock:
            return self._active_streams.get(stream_key)

    def get_active_streams(self) -> List[str]:
        """Get list of active stream keys."""
        with self._lock:
            return list(self._active_streams)

    async def cancel_all(self) -> int:
        """
        Cancel all active streams.

        Returns:
            Number of streams cancelled.
        """
        async with self._get_async_lock():
            with self._lock:
                keys = list(self._active_streams)
                self._cancelled_streams.update(keys)
            return len(keys)

    @asynccontextmanager
    async def streaming_context(
        self,
        channel: str,
        chat_id: str,
        initial_message_id: Optional[str] = None
    ):
        """
        Async context manager for streaming sessions.

        Usage:
            async with streaming.streaming_context("telegram", "123") as ctx:
                async for chunk in llm_response:
                    ctx.content_buffer += chunk
                    await streaming.update_message(ctx.message_id, ctx.content_buffer)

        Args:
            channel: The channel name.
            chat_id: The chat ID.
            initial_message_id: Optional message being responded to.

        Yields:
            StreamContext for the session.

        Raises:
            Exception: Re-raises any exception from the managed block after
                recording it on the context.
        """
        stream_key = self._create_stream_key(channel, chat_id)

        context = StreamContext(
            channel=channel,
            chat_id=chat_id,
            initial_message_id=initial_message_id,
            state=StreamState.STREAMING,
            platform_capability=self._get_platform_capability(channel)
        )
        await self._register_stream(stream_key, context)

        try:
            if self._typing_manager and self._config.show_typing_during_stream:
                await self._start_typing(channel, chat_id)

            # Send initial message for platforms with edit support
            if context.platform_capability == PlatformCapability.FULL_EDIT:
                context.message_id = await self._send_initial_message(
                    chat_id,
                    self._config.progress_indicator
                )

            yield context

            if context.state != StreamState.CANCELLED and context.content_buffer:
                await self.finalize(
                    context.message_id,
                    context.content_buffer,
                    chat_id=chat_id
                )
                context.state = StreamState.COMPLETED

        except Exception as e:
            context.state = StreamState.ERROR
            context.error = str(e)
            await self._handle_stream_error(context, e)
            raise
        finally:
            if self._typing_manager and self._config.show_typing_during_stream:
                await self._stop_typing(channel, chat_id)
            await self._release_stream(stream_key, context)

    def get_stats(self) -> Dict[str, Any]:
        """Get statistics about the currently active streams."""
        with self._lock:
            contexts = list(self._active_streams.values())
            return {
                "active_streams": len(self._active_streams),
                "total_bytes_streamed": sum(ctx.bytes_streamed for ctx in contexts),
                "total_chunks": sum(ctx.chunk_count for ctx in contexts),
                "stream_keys": list(self._active_streams)
            }

600 

601 

class FallbackStreamingResponse:
    """
    Fallback streaming handler for platforms without message editing.

    Collects all content and sends as a single message at the end.
    """

    def __init__(
        self,
        send_message: Callable[[str, str], Any],
        typing_manager: Optional[Any] = None,
        config: Optional[StreamConfig] = None
    ):
        """
        Initialize the fallback streaming handler.

        Args:
            send_message: Callback to send message. Takes (chat_id, text).
            typing_manager: Optional TypingManager instance.
            config: Optional configuration.
        """
        self._send_message = send_message
        self._typing_manager = typing_manager
        self._config = config or StreamConfig()

    async def _typing_action(self, action: str, chat_id: str) -> None:
        """Call ``<action>_async`` or ``<action>`` on the typing manager.

        The typing indicator is cosmetic, so a failure here is logged at
        debug level and must never abort the stream or lose the response.
        """
        manager = self._typing_manager
        if not manager:
            return
        try:
            async_name = f"{action}_async"
            if hasattr(manager, async_name):
                await getattr(manager, async_name)(chat_id)
            else:
                getattr(manager, action)(chat_id)
        except Exception as e:
            logger.debug("Failed to %s typing: %s", action, e)

    async def stream(
        self,
        channel: str,
        chat_id: str,
        generator: AsyncGenerator[str, None],
        initial_message_id: Optional[str] = None
    ) -> StreamContext:
        """
        Collect stream content and send as single message.

        Generator and send failures are recorded on the returned context
        (``state`` / ``error``) rather than raised.

        Args:
            channel: The channel name.
            chat_id: The chat ID.
            generator: Async generator yielding content chunks.
            initial_message_id: Optional message being responded to.

        Returns:
            StreamContext with collected content.
        """
        context = StreamContext(
            channel=channel,
            chat_id=chat_id,
            initial_message_id=initial_message_id,
            state=StreamState.STREAMING,
            platform_capability=PlatformCapability.NO_EDIT
        )

        try:
            await self._typing_action("start", chat_id)

            # Collect all content
            async for chunk in generator:
                context.content_buffer += chunk
                context.chunk_count += 1
                context.bytes_streamed += len(chunk.encode('utf-8'))

                # Pulse typing every tenth chunk to keep the indicator alive
                if context.chunk_count % 10 == 0:
                    await self._typing_action("pulse", chat_id)

            # Send final message (nothing is sent for an empty response)
            if context.content_buffer:
                result = self._send_message(chat_id, context.content_buffer)
                if asyncio.iscoroutine(result):
                    result = await result
                if isinstance(result, str):
                    context.message_id = result
                elif isinstance(result, dict):
                    context.message_id = result.get('message_id') or result.get('id')

            context.state = StreamState.COMPLETED

        except Exception as e:
            context.state = StreamState.ERROR
            context.error = str(e)
            logger.error("Fallback stream error: %s", e)
        finally:
            await self._typing_action("stop", chat_id)

        return context

700 

701 

def create_streaming_response(
    channel: str,
    send_message: Callable[[str, str], Any],
    edit_message: Optional[Callable[[str, str, str], Any]] = None,
    typing_manager: Optional[Any] = None,
    config: Optional[StreamConfig] = None
) -> Union[StreamingResponse, FallbackStreamingResponse]:
    """
    Build the streaming handler that suits a channel.

    Args:
        channel: The channel name (looked up case-insensitively).
        send_message: Callback to send message.
        edit_message: Optional callback to edit message.
        typing_manager: Optional TypingManager instance.
        config: Optional configuration.

    Returns:
        A StreamingResponse when the platform supports full editing and an
        edit callback was supplied; otherwise a FallbackStreamingResponse.
    """
    # Platforms missing from the table are treated as having no edit support.
    platform_support = PLATFORM_CAPABILITIES.get(
        channel.lower(), PlatformCapability.NO_EDIT
    )

    # Guard clause: anything that cannot be edited live gets the fallback.
    if platform_support != PlatformCapability.FULL_EDIT or not edit_message:
        return FallbackStreamingResponse(
            send_message=send_message,
            typing_manager=typing_manager,
            config=config
        )

    return StreamingResponse(
        send_message=send_message,
        edit_message=edit_message,
        typing_manager=typing_manager,
        config=config
    )