Coverage for integrations / channels / commands / builtin.py: 38.2%
414 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Built-in Commands
4Implements 20+ built-in commands for the bot as specified in the HevolveBot integration plan.
5Provides user commands, group commands, and admin commands.
7User Commands:
8- /help [command] - Show help for command or list all
9- /start - Start interaction (welcome message)
10- /stop - Stop receiving messages
11- /status - Show bot and session status
12- /pair <code> - Pair account with code
13- /unpair - Remove account pairing
14- /clear - Clear conversation history
15- /history [n] - Show last n messages
16- /model [name] - Show or set current model
17- /language [lang] - Set preferred language
18- /timezone [tz] - Set timezone
19- /feedback <text> - Send feedback
21Group Commands:
22- /mention <on|off|reply> - Set mention mode for group
23- /quiet - Disable bot in group temporarily
24- /resume - Resume bot in group
26Admin Commands:
27- /broadcast <message> - Send to all users
28- /stats - Show usage statistics
29- /users - List active users
30- /ban <user> - Block user
31- /unban <user> - Unblock user
32- /config get <key> - Get config value
33- /config set <key> <val> - Set config value
34- /reload - Reload configuration
35- /debug <on|off> - Toggle debug mode
36"""
38from dataclasses import dataclass, field
39from datetime import datetime
40from enum import Enum
41from typing import (
42 Any,
43 Callable,
44 Dict,
45 List,
46 Optional,
47 Set,
48 Tuple,
49 Union,
50)
51import asyncio
52import logging
53import os
55from .registry import (
56 CommandRegistry,
57 CommandDefinition,
58 CommandScope,
59 CommandCategory,
60 get_command_registry,
61)
62from .detection import CommandDetector, DetectedCommand, get_command_detector
63from .arguments import (
64 ArgumentParser,
65 ArgumentDefinition,
66 ArgumentType,
67 ArgumentChoice,
68 ParseResult,
69 validate_range,
70 validate_length,
71)
72from .mention_gating import MentionGate, MentionMode, get_mention_gate
74logger = logging.getLogger(__name__)
77# ==============================================================================
78# Command Context and Result
79# ==============================================================================
81@dataclass
82class CommandContext:
83 """
84 Context passed to command handlers.
86 Attributes:
87 channel: Channel name (telegram, discord, etc.)
88 chat_id: Chat/group ID
89 sender_id: Sender's ID
90 sender_name: Sender's display name
91 user_id: Paired agent user ID (if paired)
92 prompt_id: Paired agent prompt ID (if paired)
93 is_admin: Whether sender has admin privileges
94 is_group: Whether this is a group chat
95 raw_args: Raw argument string
96 parsed_args: Parsed arguments dict
97 session: Session object (if available)
98 message_id: Original message ID
99 timestamp: Message timestamp
100 metadata: Additional context metadata
101 """
102 channel: str
103 chat_id: str
104 sender_id: str
105 sender_name: Optional[str] = None
106 user_id: Optional[int] = None
107 prompt_id: Optional[int] = None
108 is_admin: bool = False
109 is_group: bool = False
110 raw_args: Optional[str] = None
111 parsed_args: Dict[str, Any] = field(default_factory=dict)
112 session: Optional[Any] = None
113 message_id: Optional[str] = None
114 timestamp: datetime = field(default_factory=datetime.now)
115 metadata: Dict[str, Any] = field(default_factory=dict)
117 @property
118 def is_paired(self) -> bool:
119 """Check if user is paired."""
120 return self.user_id is not None and self.prompt_id is not None
123@dataclass
124class CommandResult:
125 """
126 Result from executing a command.
128 Attributes:
129 success: Whether command succeeded
130 message: Response message to send
131 data: Additional data from command
132 error: Error message if failed
133 silent: Whether to suppress response
134 metadata: Additional result metadata
135 """
136 success: bool
137 message: Optional[str] = None
138 data: Dict[str, Any] = field(default_factory=dict)
139 error: Optional[str] = None
140 silent: bool = False
141 metadata: Dict[str, Any] = field(default_factory=dict)
143 @classmethod
144 def ok(cls, message: str = None, **data) -> "CommandResult":
145 """Create a successful result."""
146 return cls(success=True, message=message, data=data)
148 @classmethod
149 def fail(cls, error: str, **data) -> "CommandResult":
150 """Create a failed result."""
151 return cls(success=False, error=error, data=data)
153 @classmethod
154 def silent_ok(cls, **data) -> "CommandResult":
155 """Create a silent successful result."""
156 return cls(success=True, silent=True, data=data)
159# ==============================================================================
160# Command Handler Type
161# ==============================================================================
163# Type for command handlers - can be sync or async
164CommandHandler = Callable[[CommandContext], Any] # Returns CommandResult or coroutine
167# ==============================================================================
168# Built-in Commands Class
169# ==============================================================================
171class BuiltinCommands:
172 """
173 Provides all built-in commands for the bot.
175 Registers and manages user commands, group commands, and admin commands.
177 Usage:
178 builtin = BuiltinCommands()
179 builtin.register_all()
181 # Or with custom registry:
182 registry = CommandRegistry()
183 builtin = BuiltinCommands(registry=registry)
184 builtin.register_all()
185 """
187 def __init__(
188 self,
189 registry: Optional[CommandRegistry] = None,
190 session_manager: Optional[Any] = None,
191 pairing_manager: Optional[Any] = None,
192 mention_gate: Optional[MentionGate] = None,
193 config_store: Optional[Dict[str, Any]] = None,
194 admin_ids: Optional[Set[str]] = None,
195 bot_name: str = "Bot",
196 bot_version: str = "1.0.0",
197 ):
198 """
199 Initialize built-in commands.
201 Args:
202 registry: Command registry to use
203 session_manager: Session manager for history/state
204 pairing_manager: Pairing manager for auth
205 mention_gate: Mention gate for group mode
206 config_store: Configuration store
207 admin_ids: Set of admin user IDs
208 bot_name: Bot display name
209 bot_version: Bot version string
210 """
211 self.registry = registry or get_command_registry()
212 self.session_manager = session_manager
213 self.pairing_manager = pairing_manager
214 self.mention_gate = mention_gate or get_mention_gate()
215 self.config_store = config_store or {}
216 self.admin_ids = admin_ids or set()
217 self.bot_name = bot_name
218 self.bot_version = bot_version
220 # State
221 self._debug_mode = False
222 self._stopped_users: Set[Tuple[str, str]] = set() # (channel, sender_id)
223 self._banned_users: Set[str] = set()
224 self._feedback_handler: Optional[Callable] = None
225 self._broadcast_handler: Optional[Callable] = None
226 self._model_store: Dict[Tuple[str, str], str] = {} # (channel, sender_id) -> model
227 self._language_store: Dict[Tuple[str, str], str] = {}
228 self._timezone_store: Dict[Tuple[str, str], str] = {}
230 # Command parsers
231 self._parsers: Dict[str, ArgumentParser] = {}
232 self._setup_parsers()
234 def _setup_parsers(self) -> None:
235 """Set up argument parsers for commands."""
236 # /help [command]
237 self._parsers["help"] = ArgumentParser([
238 ArgumentDefinition(
239 name="command",
240 description="Command to get help for",
241 arg_type=ArgumentType.STRING,
242 required=False,
243 ),
244 ])
246 # /pair <code>
247 self._parsers["pair"] = ArgumentParser([
248 ArgumentDefinition(
249 name="code",
250 description="Pairing code",
251 arg_type=ArgumentType.STRING,
252 required=True,
253 ),
254 ])
256 # /history [n]
257 self._parsers["history"] = ArgumentParser([
258 ArgumentDefinition(
259 name="count",
260 description="Number of messages to show",
261 arg_type=ArgumentType.INTEGER,
262 required=False,
263 default=10,
264 validator=validate_range(1, 100),
265 ),
266 ])
268 # /model [name]
269 self._parsers["model"] = ArgumentParser([
270 ArgumentDefinition(
271 name="name",
272 description="Model name to set",
273 arg_type=ArgumentType.STRING,
274 required=False,
275 ),
276 ])
278 # /language [lang]
279 self._parsers["language"] = ArgumentParser([
280 ArgumentDefinition(
281 name="language",
282 description="Language code (e.g., en, es, fr)",
283 arg_type=ArgumentType.STRING,
284 required=False,
285 ),
286 ])
288 # /timezone [tz]
289 self._parsers["timezone"] = ArgumentParser([
290 ArgumentDefinition(
291 name="timezone",
292 description="Timezone (e.g., UTC, America/New_York)",
293 arg_type=ArgumentType.STRING,
294 required=False,
295 ),
296 ])
298 # /feedback <text>
299 self._parsers["feedback"] = ArgumentParser([
300 ArgumentDefinition(
301 name="text",
302 description="Feedback text",
303 arg_type=ArgumentType.STRING,
304 required=True,
305 capture_remaining=True,
306 validator=validate_length(min_len=5, max_len=1000),
307 ),
308 ])
310 # /mention <mode>
311 self._parsers["mention"] = ArgumentParser([
312 ArgumentDefinition(
313 name="mode",
314 description="Mention mode",
315 arg_type=ArgumentType.CHOICE,
316 required=True,
317 choices=[
318 ArgumentChoice("on", "Always respond"),
319 ArgumentChoice("off", "Mention required"),
320 ArgumentChoice("reply", "Reply only"),
321 ],
322 ),
323 ])
325 # /broadcast <message>
326 self._parsers["broadcast"] = ArgumentParser([
327 ArgumentDefinition(
328 name="message",
329 description="Message to broadcast",
330 arg_type=ArgumentType.STRING,
331 required=True,
332 capture_remaining=True,
333 ),
334 ])
336 # /ban <user>
337 self._parsers["ban"] = ArgumentParser([
338 ArgumentDefinition(
339 name="user",
340 description="User ID to ban",
341 arg_type=ArgumentType.STRING,
342 required=True,
343 ),
344 ])
346 # /unban <user>
347 self._parsers["unban"] = ArgumentParser([
348 ArgumentDefinition(
349 name="user",
350 description="User ID to unban",
351 arg_type=ArgumentType.STRING,
352 required=True,
353 ),
354 ])
356 # /config get <key> or /config set <key> <value>
357 self._parsers["config"] = ArgumentParser([
358 ArgumentDefinition(
359 name="action",
360 description="Action (get or set)",
361 arg_type=ArgumentType.CHOICE,
362 required=True,
363 choices=[
364 ArgumentChoice("get", "Get config value"),
365 ArgumentChoice("set", "Set config value"),
366 ],
367 ),
368 ArgumentDefinition(
369 name="key",
370 description="Config key",
371 arg_type=ArgumentType.STRING,
372 required=True,
373 ),
374 ArgumentDefinition(
375 name="value",
376 description="Value to set (for set action)",
377 arg_type=ArgumentType.STRING,
378 required=False,
379 capture_remaining=True,
380 ),
381 ])
383 # /debug <on|off>
384 self._parsers["debug"] = ArgumentParser([
385 ArgumentDefinition(
386 name="state",
387 description="Debug state",
388 arg_type=ArgumentType.BOOLEAN,
389 required=True,
390 ),
391 ])
393 def register_all(self) -> None:
394 """Register all built-in commands."""
395 self._register_user_commands()
396 self._register_group_commands()
397 self._register_admin_commands()
398 logger.info(f"Registered {len(self.registry)} built-in commands")
400 def _register_user_commands(self) -> None:
401 """Register user commands."""
402 # /help
403 self.registry.register(CommandDefinition(
404 key="help",
405 description="Show help for a command or list all commands",
406 handler=self.cmd_help,
407 aliases=["/help", "/h", "/?"],
408 category=CommandCategory.STATUS,
409 accepts_args=True,
410 ))
412 # /start
413 self.registry.register(CommandDefinition(
414 key="start",
415 description="Start interaction with the bot",
416 handler=self.cmd_start,
417 aliases=["/start"],
418 category=CommandCategory.SESSION,
419 ))
421 # /stop
422 self.registry.register(CommandDefinition(
423 key="stop",
424 description="Stop receiving messages from the bot",
425 handler=self.cmd_stop,
426 aliases=["/stop"],
427 category=CommandCategory.SESSION,
428 ))
430 # /status
431 self.registry.register(CommandDefinition(
432 key="status",
433 description="Show bot and session status",
434 handler=self.cmd_status,
435 aliases=["/status", "/info"],
436 category=CommandCategory.STATUS,
437 ))
439 # /pair
440 self.registry.register(CommandDefinition(
441 key="pair",
442 description="Pair your account with a pairing code",
443 handler=self.cmd_pair,
444 aliases=["/pair", "/link"],
445 category=CommandCategory.SESSION,
446 accepts_args=True,
447 ))
449 # /unpair
450 self.registry.register(CommandDefinition(
451 key="unpair",
452 description="Remove account pairing",
453 handler=self.cmd_unpair,
454 aliases=["/unpair", "/unlink"],
455 category=CommandCategory.SESSION,
456 ))
458 # /clear
459 self.registry.register(CommandDefinition(
460 key="clear",
461 description="Clear conversation history",
462 handler=self.cmd_clear,
463 aliases=["/clear", "/reset"],
464 category=CommandCategory.SESSION,
465 ))
467 # /history
468 self.registry.register(CommandDefinition(
469 key="history",
470 description="Show last n messages from history",
471 handler=self.cmd_history,
472 aliases=["/history", "/hist"],
473 category=CommandCategory.SESSION,
474 accepts_args=True,
475 ))
477 # /model
478 self.registry.register(CommandDefinition(
479 key="model",
480 description="Show or set current AI model",
481 handler=self.cmd_model,
482 aliases=["/model"],
483 category=CommandCategory.OPTIONS,
484 accepts_args=True,
485 ))
487 # /language
488 self.registry.register(CommandDefinition(
489 key="language",
490 description="Set preferred language",
491 handler=self.cmd_language,
492 aliases=["/language", "/lang"],
493 category=CommandCategory.OPTIONS,
494 accepts_args=True,
495 ))
497 # /timezone
498 self.registry.register(CommandDefinition(
499 key="timezone",
500 description="Set your timezone",
501 handler=self.cmd_timezone,
502 aliases=["/timezone", "/tz"],
503 category=CommandCategory.OPTIONS,
504 accepts_args=True,
505 ))
507 # /feedback
508 self.registry.register(CommandDefinition(
509 key="feedback",
510 description="Send feedback to the bot developers",
511 handler=self.cmd_feedback,
512 aliases=["/feedback", "/fb"],
513 category=CommandCategory.STATUS,
514 accepts_args=True,
515 ))
517 def _register_group_commands(self) -> None:
518 """Register group commands."""
519 # /mention
520 self.registry.register(CommandDefinition(
521 key="mention",
522 description="Set mention mode for group (on/off/reply)",
523 handler=self.cmd_mention,
524 aliases=["/mention"],
525 category=CommandCategory.MANAGEMENT,
526 accepts_args=True,
527 ))
529 # /quiet
530 self.registry.register(CommandDefinition(
531 key="quiet",
532 description="Disable bot in group temporarily",
533 handler=self.cmd_quiet,
534 aliases=["/quiet", "/silence", "/mute"],
535 category=CommandCategory.MANAGEMENT,
536 ))
538 # /resume
539 self.registry.register(CommandDefinition(
540 key="resume",
541 description="Resume bot in group after quiet",
542 handler=self.cmd_resume,
543 aliases=["/resume", "/unmute"],
544 category=CommandCategory.MANAGEMENT,
545 ))
547 def _register_admin_commands(self) -> None:
548 """Register admin commands."""
549 # /broadcast
550 self.registry.register(CommandDefinition(
551 key="broadcast",
552 description="Send message to all users (admin only)",
553 handler=self.cmd_broadcast,
554 aliases=["/broadcast", "/announce"],
555 category=CommandCategory.MANAGEMENT,
556 accepts_args=True,
557 metadata={"require_admin": True},
558 ))
560 # /stats
561 self.registry.register(CommandDefinition(
562 key="stats",
563 description="Show usage statistics (admin only)",
564 handler=self.cmd_stats,
565 aliases=["/stats", "/statistics"],
566 category=CommandCategory.STATUS,
567 metadata={"require_admin": True},
568 ))
570 # /users
571 self.registry.register(CommandDefinition(
572 key="users",
573 description="List active users (admin only)",
574 handler=self.cmd_users,
575 aliases=["/users"],
576 category=CommandCategory.STATUS,
577 metadata={"require_admin": True},
578 ))
580 # /ban
581 self.registry.register(CommandDefinition(
582 key="ban",
583 description="Block a user (admin only)",
584 handler=self.cmd_ban,
585 aliases=["/ban", "/block"],
586 category=CommandCategory.MANAGEMENT,
587 accepts_args=True,
588 metadata={"require_admin": True},
589 ))
591 # /unban
592 self.registry.register(CommandDefinition(
593 key="unban",
594 description="Unblock a user (admin only)",
595 handler=self.cmd_unban,
596 aliases=["/unban", "/unblock"],
597 category=CommandCategory.MANAGEMENT,
598 accepts_args=True,
599 metadata={"require_admin": True},
600 ))
602 # /config
603 self.registry.register(CommandDefinition(
604 key="config",
605 description="Get or set configuration values (admin only)",
606 handler=self.cmd_config,
607 aliases=["/config", "/cfg"],
608 category=CommandCategory.MANAGEMENT,
609 accepts_args=True,
610 metadata={"require_admin": True},
611 ))
613 # /reload
614 self.registry.register(CommandDefinition(
615 key="reload",
616 description="Reload configuration (admin only)",
617 handler=self.cmd_reload,
618 aliases=["/reload"],
619 category=CommandCategory.MANAGEMENT,
620 metadata={"require_admin": True},
621 ))
623 # /debug
624 self.registry.register(CommandDefinition(
625 key="debug",
626 description="Toggle debug mode (admin only)",
627 handler=self.cmd_debug,
628 aliases=["/debug"],
629 category=CommandCategory.MANAGEMENT,
630 accepts_args=True,
631 hidden=True,
632 metadata={"require_admin": True},
633 ))
635 # ==========================================================================
636 # Command Execution
637 # ==========================================================================
639 async def execute(
640 self,
641 command_name: str,
642 context: CommandContext,
643 ) -> CommandResult:
644 """
645 Execute a command.
647 Args:
648 command_name: Command key or alias
649 context: Command context
651 Returns:
652 CommandResult from the handler
653 """
654 # Get command
655 command = self.registry.get(command_name)
656 if not command:
657 command = self.registry.get_by_alias(command_name)
659 if not command:
660 return CommandResult.fail(f"Unknown command: {command_name}")
662 if not command.enabled:
663 return CommandResult.fail(f"Command is disabled: {command_name}")
665 # Check admin requirement
666 if command.metadata.get("require_admin", False):
667 if not self._is_admin(context):
668 return CommandResult.fail("This command requires admin privileges")
670 # Check if user is banned
671 if self._is_banned(context.sender_id):
672 return CommandResult.fail("You are blocked from using this bot")
674 # Check if user has stopped
675 if self._is_stopped(context.channel, context.sender_id):
676 if command.key not in ("start", "resume"):
677 return CommandResult.silent_ok()
679 # Parse arguments if command accepts them
680 if command.accepts_args and command.key in self._parsers:
681 parser = self._parsers[command.key]
682 parse_result = parser.parse(context.raw_args)
683 if not parse_result.success:
684 usage = parser.format_usage(command.key)
685 return CommandResult.fail(
686 f"Invalid arguments: {', '.join(parse_result.errors)}\n"
687 f"Usage: {usage}"
688 )
689 context.parsed_args = parse_result.args
691 # Execute handler
692 try:
693 if command.handler is None:
694 return CommandResult.fail(f"No handler for command: {command_name}")
696 result = command.handler(context)
697 if asyncio.iscoroutine(result):
698 result = await result
700 return result
702 except Exception as e:
703 logger.exception(f"Error executing command {command_name}: {e}")
704 return CommandResult.fail(f"Error: {str(e)}")
706 def _is_admin(self, context: CommandContext) -> bool:
707 """Check if context user is an admin."""
708 if context.is_admin:
709 return True
710 return context.sender_id in self.admin_ids
712 def _is_banned(self, sender_id: str) -> bool:
713 """Check if user is banned."""
714 return sender_id in self._banned_users
716 def _is_stopped(self, channel: str, sender_id: str) -> bool:
717 """Check if user has stopped the bot."""
718 return (channel, sender_id) in self._stopped_users
720 # ==========================================================================
721 # User Command Handlers
722 # ==========================================================================
724 def cmd_help(self, ctx: CommandContext) -> CommandResult:
725 """Handle /help command."""
726 command_name = ctx.parsed_args.get("command")
728 if command_name:
729 # Help for specific command
730 cmd = self.registry.get(command_name)
731 if not cmd:
732 cmd = self.registry.get_by_alias(command_name)
734 if not cmd:
735 return CommandResult.fail(f"Unknown command: {command_name}")
737 # Build detailed help
738 lines = [
739 f"**{cmd.primary_alias}** - {cmd.description}",
740 "",
741 ]
743 if cmd.aliases:
744 lines.append(f"Aliases: {', '.join(cmd.aliases)}")
746 if cmd.key in self._parsers:
747 lines.append("")
748 lines.append("Arguments:")
749 lines.append(self._parsers[cmd.key].format_help())
751 if cmd.metadata.get("require_admin"):
752 lines.append("")
753 lines.append("(Admin only)")
755 return CommandResult.ok("\n".join(lines))
757 else:
758 # List all commands
759 commands = self.registry.list_commands(include_hidden=False)
761 # Group by category
762 by_category: Dict[CommandCategory, List[CommandDefinition]] = {}
763 for cmd in commands:
764 if cmd.metadata.get("require_admin") and not self._is_admin(ctx):
765 continue
766 if cmd.category not in by_category:
767 by_category[cmd.category] = []
768 by_category[cmd.category].append(cmd)
770 lines = [f"**{self.bot_name} Commands**", ""]
772 for category in CommandCategory:
773 if category not in by_category:
774 continue
775 cmds = by_category[category]
776 lines.append(f"**{category.value.title()}:**")
777 for cmd in cmds:
778 lines.append(f" {cmd.primary_alias} - {cmd.description}")
779 lines.append("")
781 lines.append("Use /help <command> for detailed help.")
783 return CommandResult.ok("\n".join(lines))
785 def cmd_start(self, ctx: CommandContext) -> CommandResult:
786 """Handle /start command."""
787 # Remove from stopped users
788 key = (ctx.channel, ctx.sender_id)
789 if key in self._stopped_users:
790 self._stopped_users.discard(key)
792 name = ctx.sender_name or "there"
793 message = (
794 f"Hello {name}! Welcome to {self.bot_name}.\n\n"
795 f"I'm here to help you. Send me a message to get started.\n\n"
796 f"Use /help to see available commands."
797 )
799 return CommandResult.ok(message)
801 def cmd_stop(self, ctx: CommandContext) -> CommandResult:
802 """Handle /stop command."""
803 key = (ctx.channel, ctx.sender_id)
804 self._stopped_users.add(key)
806 return CommandResult.ok(
807 f"You've stopped {self.bot_name}. "
808 f"Use /start to receive messages again."
809 )
811 def cmd_status(self, ctx: CommandContext) -> CommandResult:
812 """Handle /status command."""
813 lines = [
814 f"**{self.bot_name} Status**",
815 "",
816 f"Version: {self.bot_version}",
817 f"Channel: {ctx.channel}",
818 f"Debug Mode: {'On' if self._debug_mode else 'Off'}",
819 ]
821 if ctx.is_paired:
822 lines.append(f"Paired: Yes (User ID: {ctx.user_id})")
823 else:
824 lines.append("Paired: No")
826 if ctx.session and hasattr(ctx.session, "message_count"):
827 lines.append(f"Messages in session: {ctx.session.message_count}")
829 # Show user preferences
830 user_key = (ctx.channel, ctx.sender_id)
831 if user_key in self._model_store:
832 lines.append(f"Model: {self._model_store[user_key]}")
833 if user_key in self._language_store:
834 lines.append(f"Language: {self._language_store[user_key]}")
835 if user_key in self._timezone_store:
836 lines.append(f"Timezone: {self._timezone_store[user_key]}")
838 return CommandResult.ok("\n".join(lines))
840 def cmd_pair(self, ctx: CommandContext) -> CommandResult:
841 """Handle /pair command."""
842 code = ctx.parsed_args.get("code", "")
844 if not code:
845 return CommandResult.fail(
846 "Please provide a pairing code.\n"
847 "Usage: /pair <code>"
848 )
850 if not self.pairing_manager:
851 return CommandResult.fail(
852 "Pairing is not configured. Please contact an administrator."
853 )
855 session = self.pairing_manager.verify_pairing(
856 channel=ctx.channel,
857 sender_id=ctx.sender_id,
858 code=code,
859 )
861 if session:
862 return CommandResult.ok(
863 f"Pairing successful! Your account is now linked.\n"
864 f"User ID: {session.user_id}",
865 user_id=session.user_id,
866 prompt_id=session.prompt_id,
867 )
868 else:
869 return CommandResult.fail(
870 "Invalid or expired pairing code. Please get a new code and try again."
871 )
873 def cmd_unpair(self, ctx: CommandContext) -> CommandResult:
874 """Handle /unpair command."""
875 if not self.pairing_manager:
876 return CommandResult.fail("Pairing is not configured.")
878 if not ctx.is_paired:
879 return CommandResult.fail("You are not paired.")
881 success = self.pairing_manager.unpair(ctx.channel, ctx.sender_id)
883 if success:
884 return CommandResult.ok(
885 "Your account has been unpaired. "
886 "You'll need to pair again to continue using the bot."
887 )
888 else:
889 return CommandResult.fail("Failed to unpair account.")
891 def cmd_clear(self, ctx: CommandContext) -> CommandResult:
892 """Handle /clear command."""
893 if ctx.session and hasattr(ctx.session, "clear_history"):
894 ctx.session.clear_history()
895 return CommandResult.ok("Conversation history cleared.")
897 if self.session_manager:
898 session = self.session_manager.get_session(
899 ctx.channel, ctx.sender_id, create=False
900 )
901 if session:
902 session.clear_history()
903 return CommandResult.ok("Conversation history cleared.")
905 return CommandResult.ok("No history to clear.")
907 def cmd_history(self, ctx: CommandContext) -> CommandResult:
908 """Handle /history command."""
909 count = ctx.parsed_args.get("count", 10)
911 session = ctx.session
912 if not session and self.session_manager:
913 session = self.session_manager.get_session(
914 ctx.channel, ctx.sender_id, create=False
915 )
917 if not session or not hasattr(session, "messages"):
918 return CommandResult.ok("No conversation history.")
920 messages = session.messages[-count:]
921 if not messages:
922 return CommandResult.ok("No messages in history.")
924 lines = [f"**Last {len(messages)} messages:**", ""]
925 for msg in messages:
926 role = "You" if msg.role == "user" else self.bot_name
927 content = msg.content[:100] + "..." if len(msg.content) > 100 else msg.content
928 lines.append(f"**{role}:** {content}")
930 return CommandResult.ok("\n".join(lines))
932 def cmd_model(self, ctx: CommandContext) -> CommandResult:
933 """Handle /model command."""
934 name = ctx.parsed_args.get("name")
935 user_key = (ctx.channel, ctx.sender_id)
937 if not name:
938 # Show current model
939 current = self._model_store.get(user_key, "default")
940 return CommandResult.ok(f"Current model: {current}")
942 # Set model
943 self._model_store[user_key] = name
944 return CommandResult.ok(f"Model set to: {name}")
946 def cmd_language(self, ctx: CommandContext) -> CommandResult:
947 """Handle /language command."""
948 lang = ctx.parsed_args.get("language")
949 user_key = (ctx.channel, ctx.sender_id)
951 if not lang:
952 current = self._language_store.get(user_key, "en")
953 return CommandResult.ok(f"Current language: {current}")
955 self._language_store[user_key] = lang
956 return CommandResult.ok(f"Language set to: {lang}")
958 def cmd_timezone(self, ctx: CommandContext) -> CommandResult:
959 """Handle /timezone command."""
960 tz = ctx.parsed_args.get("timezone")
961 user_key = (ctx.channel, ctx.sender_id)
963 if not tz:
964 current = self._timezone_store.get(user_key, "UTC")
965 return CommandResult.ok(f"Current timezone: {current}")
967 self._timezone_store[user_key] = tz
968 return CommandResult.ok(f"Timezone set to: {tz}")
970 def cmd_feedback(self, ctx: CommandContext) -> CommandResult:
971 """Handle /feedback command."""
972 text = ctx.parsed_args.get("text")
974 if not text:
975 return CommandResult.fail(
976 "Please provide feedback text.\n"
977 "Usage: /feedback <your feedback>"
978 )
980 # Call feedback handler if set
981 if self._feedback_handler:
982 try:
983 self._feedback_handler(ctx.sender_id, ctx.channel, text)
984 except Exception as e:
985 logger.error(f"Feedback handler error: {e}")
987 logger.info(f"Feedback from {ctx.sender_id}: {text}")
989 return CommandResult.ok(
990 "Thank you for your feedback! We appreciate you taking the time to help us improve."
991 )
993 # ==========================================================================
994 # Group Command Handlers
995 # ==========================================================================
997 def cmd_mention(self, ctx: CommandContext) -> CommandResult:
998 """Handle /mention command."""
999 if not ctx.is_group:
1000 return CommandResult.fail("This command only works in groups.")
1002 mode_str = ctx.parsed_args.get("mode", "").lower()
1004 mode_map = {
1005 "on": MentionMode.ALWAYS,
1006 "off": MentionMode.MENTION,
1007 "reply": MentionMode.COMMANDS_ONLY,
1008 }
1010 if mode_str not in mode_map:
1011 return CommandResult.fail(
1012 "Invalid mode. Use: on, off, or reply\n"
1013 "- on: Always respond\n"
1014 "- off: Only respond when mentioned\n"
1015 "- reply: Only respond to commands"
1016 )
1018 mode = mode_map[mode_str]
1019 self.mention_gate.set_mode(ctx.chat_id, mode)
1021 return CommandResult.ok(f"Mention mode set to: {mode_str}")
1023 def cmd_quiet(self, ctx: CommandContext) -> CommandResult:
1024 """Handle /quiet command."""
1025 if not ctx.is_group:
1026 return CommandResult.fail("This command only works in groups.")
1028 self.mention_gate.quiet(ctx.chat_id)
1030 return CommandResult.ok(
1031 f"{self.bot_name} is now quiet in this group. "
1032 f"Use /resume to re-enable."
1033 )
1035 def cmd_resume(self, ctx: CommandContext) -> CommandResult:
1036 """Handle /resume command."""
1037 if not ctx.is_group:
1038 return CommandResult.fail("This command only works in groups.")
1040 self.mention_gate.resume(ctx.chat_id)
1042 return CommandResult.ok(f"{self.bot_name} has resumed in this group.")
1044 # ==========================================================================
1045 # Admin Command Handlers
1046 # ==========================================================================
1048 async def cmd_broadcast(self, ctx: CommandContext) -> CommandResult:
1049 """Handle /broadcast command."""
1050 message = ctx.parsed_args.get("message")
1052 if not message:
1053 return CommandResult.fail(
1054 "Please provide a message to broadcast.\n"
1055 "Usage: /broadcast <message>"
1056 )
1058 if self._broadcast_handler:
1059 try:
1060 count = await self._broadcast_handler(message)
1061 return CommandResult.ok(f"Broadcast sent to {count} users.")
1062 except Exception as e:
1063 return CommandResult.fail(f"Broadcast failed: {e}")
1065 return CommandResult.ok(
1066 "Broadcast queued. (No broadcast handler configured)"
1067 )
1069 def cmd_stats(self, ctx: CommandContext) -> CommandResult:
1070 """Handle /stats command."""
1071 lines = [
1072 f"**{self.bot_name} Statistics**",
1073 "",
1074 ]
1076 # Session stats
1077 if self.session_manager:
1078 total = self.session_manager.get_session_count()
1079 lines.append(f"Active sessions: {total}")
1081 # Command stats from registry
1082 lines.append(f"Registered commands: {len(self.registry)}")
1084 # Other stats
1085 lines.append(f"Banned users: {len(self._banned_users)}")
1086 lines.append(f"Stopped users: {len(self._stopped_users)}")
1087 lines.append(f"Debug mode: {'On' if self._debug_mode else 'Off'}")
1089 return CommandResult.ok("\n".join(lines))
1091 def cmd_users(self, ctx: CommandContext) -> CommandResult:
1092 """Handle /users command."""
1093 if not self.session_manager:
1094 return CommandResult.ok("No session manager configured.")
1096 sessions = self.session_manager.list_sessions()[:20] # Limit to 20
1098 if not sessions:
1099 return CommandResult.ok("No active users.")
1101 lines = ["**Active Users:**", ""]
1102 for session in sessions:
1103 paired = "Paired" if session.user_id else "Not paired"
1104 lines.append(f"- {session.channel}:{session.sender_id} ({paired})")
1106 if len(sessions) == 20:
1107 lines.append("... (showing first 20)")
1109 return CommandResult.ok("\n".join(lines))
1111 def cmd_ban(self, ctx: CommandContext) -> CommandResult:
1112 """Handle /ban command."""
1113 user = ctx.parsed_args.get("user")
1115 if not user:
1116 return CommandResult.fail(
1117 "Please specify a user to ban.\n"
1118 "Usage: /ban <user_id>"
1119 )
1121 if user in self._banned_users:
1122 return CommandResult.fail(f"User {user} is already banned.")
1124 self._banned_users.add(user)
1125 logger.info(f"Admin {ctx.sender_id} banned user {user}")
1127 return CommandResult.ok(f"User {user} has been banned.")
1129 def cmd_unban(self, ctx: CommandContext) -> CommandResult:
1130 """Handle /unban command."""
1131 user = ctx.parsed_args.get("user")
1133 if not user:
1134 return CommandResult.fail(
1135 "Please specify a user to unban.\n"
1136 "Usage: /unban <user_id>"
1137 )
1139 if user not in self._banned_users:
1140 return CommandResult.fail(f"User {user} is not banned.")
1142 self._banned_users.discard(user)
1143 logger.info(f"Admin {ctx.sender_id} unbanned user {user}")
1145 return CommandResult.ok(f"User {user} has been unbanned.")
1147 def cmd_config(self, ctx: CommandContext) -> CommandResult:
1148 """Handle /config command."""
1149 action = ctx.parsed_args.get("action")
1150 key = ctx.parsed_args.get("key")
1151 value = ctx.parsed_args.get("value")
1153 if action == "get":
1154 if key not in self.config_store:
1155 return CommandResult.fail(f"Config key not found: {key}")
1156 return CommandResult.ok(f"{key} = {self.config_store[key]}")
1158 elif action == "set":
1159 if not value:
1160 return CommandResult.fail(
1161 "Please provide a value.\n"
1162 "Usage: /config set <key> <value>"
1163 )
1164 self.config_store[key] = value
1165 logger.info(f"Admin {ctx.sender_id} set config {key}={value}")
1166 return CommandResult.ok(f"Set {key} = {value}")
1168 return CommandResult.fail("Invalid action. Use 'get' or 'set'.")
1170 def cmd_reload(self, ctx: CommandContext) -> CommandResult:
1171 """Handle /reload command."""
1172 # Placeholder for configuration reload
1173 logger.info(f"Admin {ctx.sender_id} triggered reload")
1175 return CommandResult.ok(
1176 "Configuration reloaded.\n"
1177 "(Note: Full reload requires restart for some settings)"
1178 )
1180 def cmd_debug(self, ctx: CommandContext) -> CommandResult:
1181 """Handle /debug command."""
1182 state = ctx.parsed_args.get("state")
1184 if state is None:
1185 return CommandResult.ok(f"Debug mode: {'On' if self._debug_mode else 'Off'}")
1187 self._debug_mode = state
1188 logger.info(f"Admin {ctx.sender_id} set debug mode to {state}")
1190 return CommandResult.ok(f"Debug mode: {'On' if state else 'Off'}")
1192 # ==========================================================================
1193 # Configuration Methods
1194 # ==========================================================================
1196 def set_feedback_handler(self, handler: Callable[[str, str, str], None]) -> None:
1197 """
1198 Set handler for feedback submissions.
1200 Args:
1201 handler: Function(sender_id, channel, text)
1202 """
1203 self._feedback_handler = handler
1205 def set_broadcast_handler(self, handler: Callable[[str], int]) -> None:
1206 """
1207 Set handler for broadcast messages.
1209 Args:
1210 handler: Async function(message) -> count
1211 """
1212 self._broadcast_handler = handler
1214 def add_admin(self, user_id: str) -> None:
1215 """Add an admin user."""
1216 self.admin_ids.add(user_id)
1218 def remove_admin(self, user_id: str) -> None:
1219 """Remove an admin user."""
1220 self.admin_ids.discard(user_id)
1222 def get_user_model(self, channel: str, sender_id: str) -> Optional[str]:
1223 """Get user's preferred model."""
1224 return self._model_store.get((channel, sender_id))
1226 def get_user_language(self, channel: str, sender_id: str) -> Optional[str]:
1227 """Get user's preferred language."""
1228 return self._language_store.get((channel, sender_id))
1230 def get_user_timezone(self, channel: str, sender_id: str) -> Optional[str]:
1231 """Get user's timezone."""
1232 return self._timezone_store.get((channel, sender_id))
1235# ==============================================================================
1236# Global Instance
1237# ==============================================================================
1239_builtin_commands: Optional[BuiltinCommands] = None
1242def get_builtin_commands() -> BuiltinCommands:
1243 """Get or create the global built-in commands instance."""
1244 global _builtin_commands
1245 if _builtin_commands is None:
1246 _builtin_commands = BuiltinCommands()
1247 return _builtin_commands
1250def reset_builtin_commands() -> None:
1251 """Reset the global built-in commands instance."""
1252 global _builtin_commands
1253 _builtin_commands = None
1256def register_builtin_commands(
1257 registry: Optional[CommandRegistry] = None,
1258 **kwargs,
1259) -> BuiltinCommands:
1260 """
1261 Register all built-in commands.
1263 Args:
1264 registry: Optional custom registry
1265 **kwargs: Additional arguments for BuiltinCommands
1267 Returns:
1268 BuiltinCommands instance
1269 """
1270 global _builtin_commands
1271 _builtin_commands = BuiltinCommands(registry=registry, **kwargs)
1272 _builtin_commands.register_all()
1273 return _builtin_commands