Coverage for integrations / channels / automation / triggers.py: 67.6%
213 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Trigger Manager for HevolveBot Integration.
4Provides event-based trigger registration and evaluation.
5"""
7import re
8import secrets
9from dataclasses import dataclass, field
10from datetime import datetime
11from enum import Enum
12from typing import Any, Callable, Dict, List, Optional, Pattern, Union
15class TriggerType(Enum):
16 """Types of triggers that can be registered."""
17 MESSAGE_RECEIVED = "message_received"
18 USER_JOINED = "user_joined"
19 USER_LEFT = "user_left"
20 REACTION_ADDED = "reaction_added"
21 FILE_SHARED = "file_shared"
22 MENTION = "mention"
23 KEYWORD = "keyword"
24 REGEX = "regex"
25 SCHEDULE = "schedule"
26 WEBHOOK = "webhook"
27 VISUAL_MATCH = "visual_match"
28 SCREEN_MATCH = "screen_match"
31class TriggerPriority(Enum):
32 """Priority levels for trigger execution."""
33 LOW = 1
34 NORMAL = 5
35 HIGH = 10
36 CRITICAL = 100
39@dataclass
40class TriggerCondition:
41 """A condition that must be met for a trigger to fire."""
42 field: str
43 operator: str # 'eq', 'ne', 'gt', 'lt', 'gte', 'lte', 'contains', 'startswith', 'endswith', 'matches'
44 value: Any
46 def evaluate(self, data: Dict[str, Any]) -> bool:
47 """
48 Evaluate this condition against event data.
50 Args:
51 data: The event data to check
53 Returns:
54 True if condition is met
55 """
56 actual = data.get(self.field)
58 if actual is None:
59 return False
61 if self.operator == "eq":
62 return actual == self.value
63 elif self.operator == "ne":
64 return actual != self.value
65 elif self.operator == "gt":
66 return actual > self.value
67 elif self.operator == "lt":
68 return actual < self.value
69 elif self.operator == "gte":
70 return actual >= self.value
71 elif self.operator == "lte":
72 return actual <= self.value
73 elif self.operator == "contains":
74 return self.value in str(actual)
75 elif self.operator == "startswith":
76 return str(actual).startswith(self.value)
77 elif self.operator == "endswith":
78 return str(actual).endswith(self.value)
79 elif self.operator == "matches":
80 return bool(re.match(self.value, str(actual)))
81 else:
82 return False
85@dataclass
86class Trigger:
87 """A registered trigger."""
88 id: str
89 name: str
90 trigger_type: TriggerType
91 callback: Callable[[Dict[str, Any]], Any]
92 enabled: bool = True
93 priority: TriggerPriority = TriggerPriority.NORMAL
94 conditions: List[TriggerCondition] = field(default_factory=list)
95 keywords: List[str] = field(default_factory=list)
96 pattern: Optional[str] = None
97 compiled_pattern: Optional[Pattern] = None
98 channel_filter: Optional[List[str]] = None
99 user_filter: Optional[List[str]] = None
100 cooldown_seconds: int = 0
101 last_triggered: Optional[datetime] = None
102 trigger_count: int = 0
103 max_triggers: Optional[int] = None
104 created_at: datetime = field(default_factory=datetime.now)
105 metadata: Dict[str, Any] = field(default_factory=dict)
108@dataclass
109class TriggerResult:
110 """Result of a trigger evaluation."""
111 trigger_id: str
112 trigger_name: str
113 triggered: bool
114 callback_result: Any = None
115 error: Optional[str] = None
116 execution_time_ms: float = 0.0
119class TriggerManager:
120 """
121 Manages event-based triggers.
123 Features:
124 - Register triggers for various event types
125 - Support for keyword and regex matching
126 - Conditional trigger evaluation
127 - Priority-based execution order
128 - Cooldown support
129 - Channel and user filtering
130 """
132 def __init__(self):
133 """Initialize the TriggerManager."""
134 self._triggers: Dict[str, Trigger] = {}
135 self._type_index: Dict[TriggerType, List[str]] = {t: [] for t in TriggerType}
137 def register(
138 self,
139 trigger_type: TriggerType,
140 callback: Callable[[Dict[str, Any]], Any],
141 name: Optional[str] = None,
142 trigger_id: Optional[str] = None,
143 priority: TriggerPriority = TriggerPriority.NORMAL,
144 conditions: Optional[List[TriggerCondition]] = None,
145 keywords: Optional[List[str]] = None,
146 pattern: Optional[str] = None,
147 channel_filter: Optional[List[str]] = None,
148 user_filter: Optional[List[str]] = None,
149 cooldown_seconds: int = 0,
150 max_triggers: Optional[int] = None,
151 metadata: Optional[Dict[str, Any]] = None
152 ) -> Trigger:
153 """
154 Register a new trigger.
156 Args:
157 trigger_type: The type of event to trigger on
158 callback: Function to call when trigger fires
159 name: Optional trigger name
160 trigger_id: Optional custom ID
161 priority: Execution priority
162 conditions: Optional list of conditions
163 keywords: Keywords to match (for KEYWORD type)
164 pattern: Regex pattern (for REGEX type)
165 channel_filter: Optional list of channel IDs to match
166 user_filter: Optional list of user IDs to match
167 cooldown_seconds: Minimum seconds between triggers
168 max_triggers: Maximum number of times to trigger
169 metadata: Optional metadata
171 Returns:
172 The created Trigger
174 Raises:
175 ValueError: If required parameters are missing
176 """
177 trigger_id = trigger_id or f"trg_{secrets.token_hex(6)}"
178 name = name or f"{trigger_type.value} trigger"
180 if trigger_id in self._triggers:
181 raise ValueError(f"Trigger with ID '{trigger_id}' already exists")
183 # Validate type-specific requirements
184 if trigger_type == TriggerType.KEYWORD and not keywords:
185 raise ValueError("KEYWORD trigger requires keywords list")
187 if trigger_type == TriggerType.REGEX and not pattern:
188 raise ValueError("REGEX trigger requires pattern")
190 # Compile regex pattern if provided
191 compiled_pattern = None
192 if pattern:
193 try:
194 compiled_pattern = re.compile(pattern, re.IGNORECASE)
195 except re.error as e:
196 raise ValueError(f"Invalid regex pattern: {e}")
198 trigger = Trigger(
199 id=trigger_id,
200 name=name,
201 trigger_type=trigger_type,
202 callback=callback,
203 priority=priority,
204 conditions=conditions or [],
205 keywords=keywords or [],
206 pattern=pattern,
207 compiled_pattern=compiled_pattern,
208 channel_filter=channel_filter,
209 user_filter=user_filter,
210 cooldown_seconds=cooldown_seconds,
211 max_triggers=max_triggers,
212 metadata=metadata or {}
213 )
215 self._triggers[trigger_id] = trigger
216 self._type_index[trigger_type].append(trigger_id)
218 return trigger
220 def unregister(self, trigger_id: str) -> bool:
221 """
222 Unregister a trigger.
224 Args:
225 trigger_id: The trigger ID to remove
227 Returns:
228 True if removed, False if not found
229 """
230 if trigger_id in self._triggers:
231 trigger = self._triggers[trigger_id]
232 self._type_index[trigger.trigger_type].remove(trigger_id)
233 del self._triggers[trigger_id]
234 return True
235 return False
237 def enable(self, trigger_id: str) -> bool:
238 """
239 Enable a trigger.
241 Args:
242 trigger_id: The trigger ID
244 Returns:
245 True if enabled, False if not found
246 """
247 if trigger_id in self._triggers:
248 self._triggers[trigger_id].enabled = True
249 return True
250 return False
252 def disable(self, trigger_id: str) -> bool:
253 """
254 Disable a trigger.
256 Args:
257 trigger_id: The trigger ID
259 Returns:
260 True if disabled, False if not found
261 """
262 if trigger_id in self._triggers:
263 self._triggers[trigger_id].enabled = False
264 return True
265 return False
267 def get_trigger(self, trigger_id: str) -> Optional[Trigger]:
268 """
269 Get a trigger by ID.
271 Args:
272 trigger_id: The trigger ID
274 Returns:
275 The trigger or None
276 """
277 return self._triggers.get(trigger_id)
279 def list_triggers(
280 self,
281 trigger_type: Optional[TriggerType] = None,
282 enabled_only: bool = False
283 ) -> List[Trigger]:
284 """
285 List registered triggers.
287 Args:
288 trigger_type: Optional filter by type
289 enabled_only: Only return enabled triggers
291 Returns:
292 List of matching triggers
293 """
294 if trigger_type:
295 trigger_ids = self._type_index.get(trigger_type, [])
296 triggers = [self._triggers[tid] for tid in trigger_ids]
297 else:
298 triggers = list(self._triggers.values())
300 if enabled_only:
301 triggers = [t for t in triggers if t.enabled]
303 # Sort by priority (highest first)
304 triggers.sort(key=lambda t: t.priority.value, reverse=True)
306 return triggers
308 def evaluate(
309 self,
310 event_type: TriggerType,
311 event_data: Dict[str, Any],
312 stop_on_first: bool = False
313 ) -> List[TriggerResult]:
314 """
315 Evaluate triggers for an event.
317 Args:
318 event_type: The type of event
319 event_data: The event data
320 stop_on_first: Stop after first successful trigger
322 Returns:
323 List of trigger results
324 """
325 import time
327 results = []
329 # Get triggers for this event type, sorted by priority
330 triggers = self.list_triggers(trigger_type=event_type, enabled_only=True)
332 for trigger in triggers:
333 # Check if trigger should fire
334 should_fire, reason = self._should_fire(trigger, event_data)
336 if not should_fire:
337 continue
339 # Execute the trigger
340 start_time = time.time()
341 result = TriggerResult(
342 trigger_id=trigger.id,
343 trigger_name=trigger.name,
344 triggered=True
345 )
347 try:
348 result.callback_result = trigger.callback(event_data)
349 trigger.last_triggered = datetime.now()
350 trigger.trigger_count += 1
352 # Check if max triggers reached
353 if trigger.max_triggers and trigger.trigger_count >= trigger.max_triggers:
354 trigger.enabled = False
356 except Exception as e:
357 result.error = str(e)
359 result.execution_time_ms = (time.time() - start_time) * 1000
360 results.append(result)
362 if stop_on_first and result.callback_result is not None:
363 break
365 return results
367 def _should_fire(
368 self,
369 trigger: Trigger,
370 event_data: Dict[str, Any]
371 ) -> tuple[bool, str]:
372 """
373 Check if a trigger should fire for given event data.
375 Returns:
376 Tuple of (should_fire, reason)
377 """
378 # Check cooldown
379 if trigger.cooldown_seconds > 0 and trigger.last_triggered:
380 elapsed = (datetime.now() - trigger.last_triggered).total_seconds()
381 if elapsed < trigger.cooldown_seconds:
382 return False, "cooldown"
384 # Check max triggers
385 if trigger.max_triggers and trigger.trigger_count >= trigger.max_triggers:
386 return False, "max_triggers_reached"
388 # Check channel filter
389 if trigger.channel_filter:
390 channel = event_data.get("channel_id") or event_data.get("channel")
391 if channel not in trigger.channel_filter:
392 return False, "channel_filter"
394 # Check user filter
395 if trigger.user_filter:
396 user = event_data.get("user_id") or event_data.get("user")
397 if user not in trigger.user_filter:
398 return False, "user_filter"
400 # Check conditions
401 for condition in trigger.conditions:
402 if not condition.evaluate(event_data):
403 return False, f"condition_{condition.field}"
405 # Type-specific checks
406 if trigger.trigger_type == TriggerType.KEYWORD:
407 message = str(event_data.get("message", "") or event_data.get("text", ""))
408 if not any(kw.lower() in message.lower() for kw in trigger.keywords):
409 return False, "keyword_not_found"
411 elif trigger.trigger_type == TriggerType.REGEX:
412 message = str(event_data.get("message", "") or event_data.get("text", ""))
413 if trigger.compiled_pattern and not trigger.compiled_pattern.search(message):
414 return False, "pattern_not_matched"
416 elif trigger.trigger_type == TriggerType.MENTION:
417 mentions = event_data.get("mentions", [])
418 mentioned_user = event_data.get("mentioned_user")
419 if mentioned_user and mentioned_user not in mentions:
420 return False, "not_mentioned"
422 elif trigger.trigger_type in (TriggerType.VISUAL_MATCH, TriggerType.SCREEN_MATCH):
423 description = str(event_data.get("description", ""))
424 if trigger.keywords and not any(kw.lower() in description.lower() for kw in trigger.keywords):
425 return False, "visual_keyword_not_found"
426 if trigger.compiled_pattern and not trigger.compiled_pattern.search(description):
427 return False, "visual_pattern_not_matched"
429 return True, "ok"
431 def evaluate_message(
432 self,
433 message: str,
434 channel_id: Optional[str] = None,
435 user_id: Optional[str] = None,
436 extra_data: Optional[Dict[str, Any]] = None
437 ) -> List[TriggerResult]:
438 """
439 Convenience method to evaluate a message against all applicable triggers.
441 Args:
442 message: The message text
443 channel_id: Optional channel ID
444 user_id: Optional user ID
445 extra_data: Optional additional event data
447 Returns:
448 List of trigger results
449 """
450 event_data = {
451 "message": message,
452 "text": message,
453 "channel_id": channel_id,
454 "user_id": user_id,
455 **(extra_data or {})
456 }
458 results = []
460 # Evaluate MESSAGE_RECEIVED triggers
461 results.extend(self.evaluate(TriggerType.MESSAGE_RECEIVED, event_data))
463 # Evaluate KEYWORD triggers
464 results.extend(self.evaluate(TriggerType.KEYWORD, event_data))
466 # Evaluate REGEX triggers
467 results.extend(self.evaluate(TriggerType.REGEX, event_data))
469 return results
471 def reset_trigger(self, trigger_id: str) -> bool:
472 """
473 Reset a trigger's state (count, last_triggered, re-enable).
475 Args:
476 trigger_id: The trigger ID
478 Returns:
479 True if reset, False if not found
480 """
481 if trigger_id in self._triggers:
482 trigger = self._triggers[trigger_id]
483 trigger.trigger_count = 0
484 trigger.last_triggered = None
485 trigger.enabled = True
486 return True
487 return False
489 def get_stats(self) -> Dict[str, Any]:
490 """
491 Get statistics about registered triggers.
493 Returns:
494 Dictionary with trigger statistics
495 """
496 total = len(self._triggers)
497 enabled = sum(1 for t in self._triggers.values() if t.enabled)
498 by_type = {
499 t.value: len(ids) for t, ids in self._type_index.items()
500 }
501 total_triggers = sum(t.trigger_count for t in self._triggers.values())
503 return {
504 "total_triggers": total,
505 "enabled_triggers": enabled,
506 "disabled_triggers": total - enabled,
507 "by_type": by_type,
508 "total_executions": total_triggers
509 }