Coverage for integrations / channels / automation / triggers.py: 67.6%

213 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Trigger Manager for HevolveBot Integration. 

3 

4Provides event-based trigger registration and evaluation. 

5""" 

6 

7import re 

8import secrets 

9from dataclasses import dataclass, field 

10from datetime import datetime 

11from enum import Enum 

12from typing import Any, Callable, Dict, List, Optional, Pattern, Union 

13 

14 

15class TriggerType(Enum): 

16 """Types of triggers that can be registered.""" 

17 MESSAGE_RECEIVED = "message_received" 

18 USER_JOINED = "user_joined" 

19 USER_LEFT = "user_left" 

20 REACTION_ADDED = "reaction_added" 

21 FILE_SHARED = "file_shared" 

22 MENTION = "mention" 

23 KEYWORD = "keyword" 

24 REGEX = "regex" 

25 SCHEDULE = "schedule" 

26 WEBHOOK = "webhook" 

27 VISUAL_MATCH = "visual_match" 

28 SCREEN_MATCH = "screen_match" 

29 

30 

31class TriggerPriority(Enum): 

32 """Priority levels for trigger execution.""" 

33 LOW = 1 

34 NORMAL = 5 

35 HIGH = 10 

36 CRITICAL = 100 

37 

38 

39@dataclass 

40class TriggerCondition: 

41 """A condition that must be met for a trigger to fire.""" 

42 field: str 

43 operator: str # 'eq', 'ne', 'gt', 'lt', 'gte', 'lte', 'contains', 'startswith', 'endswith', 'matches' 

44 value: Any 

45 

46 def evaluate(self, data: Dict[str, Any]) -> bool: 

47 """ 

48 Evaluate this condition against event data. 

49 

50 Args: 

51 data: The event data to check 

52 

53 Returns: 

54 True if condition is met 

55 """ 

56 actual = data.get(self.field) 

57 

58 if actual is None: 

59 return False 

60 

61 if self.operator == "eq": 

62 return actual == self.value 

63 elif self.operator == "ne": 

64 return actual != self.value 

65 elif self.operator == "gt": 

66 return actual > self.value 

67 elif self.operator == "lt": 

68 return actual < self.value 

69 elif self.operator == "gte": 

70 return actual >= self.value 

71 elif self.operator == "lte": 

72 return actual <= self.value 

73 elif self.operator == "contains": 

74 return self.value in str(actual) 

75 elif self.operator == "startswith": 

76 return str(actual).startswith(self.value) 

77 elif self.operator == "endswith": 

78 return str(actual).endswith(self.value) 

79 elif self.operator == "matches": 

80 return bool(re.match(self.value, str(actual))) 

81 else: 

82 return False 

83 

84 

85@dataclass 

86class Trigger: 

87 """A registered trigger.""" 

88 id: str 

89 name: str 

90 trigger_type: TriggerType 

91 callback: Callable[[Dict[str, Any]], Any] 

92 enabled: bool = True 

93 priority: TriggerPriority = TriggerPriority.NORMAL 

94 conditions: List[TriggerCondition] = field(default_factory=list) 

95 keywords: List[str] = field(default_factory=list) 

96 pattern: Optional[str] = None 

97 compiled_pattern: Optional[Pattern] = None 

98 channel_filter: Optional[List[str]] = None 

99 user_filter: Optional[List[str]] = None 

100 cooldown_seconds: int = 0 

101 last_triggered: Optional[datetime] = None 

102 trigger_count: int = 0 

103 max_triggers: Optional[int] = None 

104 created_at: datetime = field(default_factory=datetime.now) 

105 metadata: Dict[str, Any] = field(default_factory=dict) 

106 

107 

108@dataclass 

109class TriggerResult: 

110 """Result of a trigger evaluation.""" 

111 trigger_id: str 

112 trigger_name: str 

113 triggered: bool 

114 callback_result: Any = None 

115 error: Optional[str] = None 

116 execution_time_ms: float = 0.0 

117 

118 

119class TriggerManager: 

120 """ 

121 Manages event-based triggers. 

122 

123 Features: 

124 - Register triggers for various event types 

125 - Support for keyword and regex matching 

126 - Conditional trigger evaluation 

127 - Priority-based execution order 

128 - Cooldown support 

129 - Channel and user filtering 

130 """ 

131 

132 def __init__(self): 

133 """Initialize the TriggerManager.""" 

134 self._triggers: Dict[str, Trigger] = {} 

135 self._type_index: Dict[TriggerType, List[str]] = {t: [] for t in TriggerType} 

136 

137 def register( 

138 self, 

139 trigger_type: TriggerType, 

140 callback: Callable[[Dict[str, Any]], Any], 

141 name: Optional[str] = None, 

142 trigger_id: Optional[str] = None, 

143 priority: TriggerPriority = TriggerPriority.NORMAL, 

144 conditions: Optional[List[TriggerCondition]] = None, 

145 keywords: Optional[List[str]] = None, 

146 pattern: Optional[str] = None, 

147 channel_filter: Optional[List[str]] = None, 

148 user_filter: Optional[List[str]] = None, 

149 cooldown_seconds: int = 0, 

150 max_triggers: Optional[int] = None, 

151 metadata: Optional[Dict[str, Any]] = None 

152 ) -> Trigger: 

153 """ 

154 Register a new trigger. 

155 

156 Args: 

157 trigger_type: The type of event to trigger on 

158 callback: Function to call when trigger fires 

159 name: Optional trigger name 

160 trigger_id: Optional custom ID 

161 priority: Execution priority 

162 conditions: Optional list of conditions 

163 keywords: Keywords to match (for KEYWORD type) 

164 pattern: Regex pattern (for REGEX type) 

165 channel_filter: Optional list of channel IDs to match 

166 user_filter: Optional list of user IDs to match 

167 cooldown_seconds: Minimum seconds between triggers 

168 max_triggers: Maximum number of times to trigger 

169 metadata: Optional metadata 

170 

171 Returns: 

172 The created Trigger 

173 

174 Raises: 

175 ValueError: If required parameters are missing 

176 """ 

177 trigger_id = trigger_id or f"trg_{secrets.token_hex(6)}" 

178 name = name or f"{trigger_type.value} trigger" 

179 

180 if trigger_id in self._triggers: 

181 raise ValueError(f"Trigger with ID '{trigger_id}' already exists") 

182 

183 # Validate type-specific requirements 

184 if trigger_type == TriggerType.KEYWORD and not keywords: 

185 raise ValueError("KEYWORD trigger requires keywords list") 

186 

187 if trigger_type == TriggerType.REGEX and not pattern: 

188 raise ValueError("REGEX trigger requires pattern") 

189 

190 # Compile regex pattern if provided 

191 compiled_pattern = None 

192 if pattern: 

193 try: 

194 compiled_pattern = re.compile(pattern, re.IGNORECASE) 

195 except re.error as e: 

196 raise ValueError(f"Invalid regex pattern: {e}") 

197 

198 trigger = Trigger( 

199 id=trigger_id, 

200 name=name, 

201 trigger_type=trigger_type, 

202 callback=callback, 

203 priority=priority, 

204 conditions=conditions or [], 

205 keywords=keywords or [], 

206 pattern=pattern, 

207 compiled_pattern=compiled_pattern, 

208 channel_filter=channel_filter, 

209 user_filter=user_filter, 

210 cooldown_seconds=cooldown_seconds, 

211 max_triggers=max_triggers, 

212 metadata=metadata or {} 

213 ) 

214 

215 self._triggers[trigger_id] = trigger 

216 self._type_index[trigger_type].append(trigger_id) 

217 

218 return trigger 

219 

220 def unregister(self, trigger_id: str) -> bool: 

221 """ 

222 Unregister a trigger. 

223 

224 Args: 

225 trigger_id: The trigger ID to remove 

226 

227 Returns: 

228 True if removed, False if not found 

229 """ 

230 if trigger_id in self._triggers: 

231 trigger = self._triggers[trigger_id] 

232 self._type_index[trigger.trigger_type].remove(trigger_id) 

233 del self._triggers[trigger_id] 

234 return True 

235 return False 

236 

237 def enable(self, trigger_id: str) -> bool: 

238 """ 

239 Enable a trigger. 

240 

241 Args: 

242 trigger_id: The trigger ID 

243 

244 Returns: 

245 True if enabled, False if not found 

246 """ 

247 if trigger_id in self._triggers: 

248 self._triggers[trigger_id].enabled = True 

249 return True 

250 return False 

251 

252 def disable(self, trigger_id: str) -> bool: 

253 """ 

254 Disable a trigger. 

255 

256 Args: 

257 trigger_id: The trigger ID 

258 

259 Returns: 

260 True if disabled, False if not found 

261 """ 

262 if trigger_id in self._triggers: 

263 self._triggers[trigger_id].enabled = False 

264 return True 

265 return False 

266 

267 def get_trigger(self, trigger_id: str) -> Optional[Trigger]: 

268 """ 

269 Get a trigger by ID. 

270 

271 Args: 

272 trigger_id: The trigger ID 

273 

274 Returns: 

275 The trigger or None 

276 """ 

277 return self._triggers.get(trigger_id) 

278 

279 def list_triggers( 

280 self, 

281 trigger_type: Optional[TriggerType] = None, 

282 enabled_only: bool = False 

283 ) -> List[Trigger]: 

284 """ 

285 List registered triggers. 

286 

287 Args: 

288 trigger_type: Optional filter by type 

289 enabled_only: Only return enabled triggers 

290 

291 Returns: 

292 List of matching triggers 

293 """ 

294 if trigger_type: 

295 trigger_ids = self._type_index.get(trigger_type, []) 

296 triggers = [self._triggers[tid] for tid in trigger_ids] 

297 else: 

298 triggers = list(self._triggers.values()) 

299 

300 if enabled_only: 

301 triggers = [t for t in triggers if t.enabled] 

302 

303 # Sort by priority (highest first) 

304 triggers.sort(key=lambda t: t.priority.value, reverse=True) 

305 

306 return triggers 

307 

308 def evaluate( 

309 self, 

310 event_type: TriggerType, 

311 event_data: Dict[str, Any], 

312 stop_on_first: bool = False 

313 ) -> List[TriggerResult]: 

314 """ 

315 Evaluate triggers for an event. 

316 

317 Args: 

318 event_type: The type of event 

319 event_data: The event data 

320 stop_on_first: Stop after first successful trigger 

321 

322 Returns: 

323 List of trigger results 

324 """ 

325 import time 

326 

327 results = [] 

328 

329 # Get triggers for this event type, sorted by priority 

330 triggers = self.list_triggers(trigger_type=event_type, enabled_only=True) 

331 

332 for trigger in triggers: 

333 # Check if trigger should fire 

334 should_fire, reason = self._should_fire(trigger, event_data) 

335 

336 if not should_fire: 

337 continue 

338 

339 # Execute the trigger 

340 start_time = time.time() 

341 result = TriggerResult( 

342 trigger_id=trigger.id, 

343 trigger_name=trigger.name, 

344 triggered=True 

345 ) 

346 

347 try: 

348 result.callback_result = trigger.callback(event_data) 

349 trigger.last_triggered = datetime.now() 

350 trigger.trigger_count += 1 

351 

352 # Check if max triggers reached 

353 if trigger.max_triggers and trigger.trigger_count >= trigger.max_triggers: 

354 trigger.enabled = False 

355 

356 except Exception as e: 

357 result.error = str(e) 

358 

359 result.execution_time_ms = (time.time() - start_time) * 1000 

360 results.append(result) 

361 

362 if stop_on_first and result.callback_result is not None: 

363 break 

364 

365 return results 

366 

367 def _should_fire( 

368 self, 

369 trigger: Trigger, 

370 event_data: Dict[str, Any] 

371 ) -> tuple[bool, str]: 

372 """ 

373 Check if a trigger should fire for given event data. 

374 

375 Returns: 

376 Tuple of (should_fire, reason) 

377 """ 

378 # Check cooldown 

379 if trigger.cooldown_seconds > 0 and trigger.last_triggered: 

380 elapsed = (datetime.now() - trigger.last_triggered).total_seconds() 

381 if elapsed < trigger.cooldown_seconds: 

382 return False, "cooldown" 

383 

384 # Check max triggers 

385 if trigger.max_triggers and trigger.trigger_count >= trigger.max_triggers: 

386 return False, "max_triggers_reached" 

387 

388 # Check channel filter 

389 if trigger.channel_filter: 

390 channel = event_data.get("channel_id") or event_data.get("channel") 

391 if channel not in trigger.channel_filter: 

392 return False, "channel_filter" 

393 

394 # Check user filter 

395 if trigger.user_filter: 

396 user = event_data.get("user_id") or event_data.get("user") 

397 if user not in trigger.user_filter: 

398 return False, "user_filter" 

399 

400 # Check conditions 

401 for condition in trigger.conditions: 

402 if not condition.evaluate(event_data): 

403 return False, f"condition_{condition.field}" 

404 

405 # Type-specific checks 

406 if trigger.trigger_type == TriggerType.KEYWORD: 

407 message = str(event_data.get("message", "") or event_data.get("text", "")) 

408 if not any(kw.lower() in message.lower() for kw in trigger.keywords): 

409 return False, "keyword_not_found" 

410 

411 elif trigger.trigger_type == TriggerType.REGEX: 

412 message = str(event_data.get("message", "") or event_data.get("text", "")) 

413 if trigger.compiled_pattern and not trigger.compiled_pattern.search(message): 

414 return False, "pattern_not_matched" 

415 

416 elif trigger.trigger_type == TriggerType.MENTION: 

417 mentions = event_data.get("mentions", []) 

418 mentioned_user = event_data.get("mentioned_user") 

419 if mentioned_user and mentioned_user not in mentions: 

420 return False, "not_mentioned" 

421 

422 elif trigger.trigger_type in (TriggerType.VISUAL_MATCH, TriggerType.SCREEN_MATCH): 

423 description = str(event_data.get("description", "")) 

424 if trigger.keywords and not any(kw.lower() in description.lower() for kw in trigger.keywords): 

425 return False, "visual_keyword_not_found" 

426 if trigger.compiled_pattern and not trigger.compiled_pattern.search(description): 

427 return False, "visual_pattern_not_matched" 

428 

429 return True, "ok" 

430 

431 def evaluate_message( 

432 self, 

433 message: str, 

434 channel_id: Optional[str] = None, 

435 user_id: Optional[str] = None, 

436 extra_data: Optional[Dict[str, Any]] = None 

437 ) -> List[TriggerResult]: 

438 """ 

439 Convenience method to evaluate a message against all applicable triggers. 

440 

441 Args: 

442 message: The message text 

443 channel_id: Optional channel ID 

444 user_id: Optional user ID 

445 extra_data: Optional additional event data 

446 

447 Returns: 

448 List of trigger results 

449 """ 

450 event_data = { 

451 "message": message, 

452 "text": message, 

453 "channel_id": channel_id, 

454 "user_id": user_id, 

455 **(extra_data or {}) 

456 } 

457 

458 results = [] 

459 

460 # Evaluate MESSAGE_RECEIVED triggers 

461 results.extend(self.evaluate(TriggerType.MESSAGE_RECEIVED, event_data)) 

462 

463 # Evaluate KEYWORD triggers 

464 results.extend(self.evaluate(TriggerType.KEYWORD, event_data)) 

465 

466 # Evaluate REGEX triggers 

467 results.extend(self.evaluate(TriggerType.REGEX, event_data)) 

468 

469 return results 

470 

471 def reset_trigger(self, trigger_id: str) -> bool: 

472 """ 

473 Reset a trigger's state (count, last_triggered, re-enable). 

474 

475 Args: 

476 trigger_id: The trigger ID 

477 

478 Returns: 

479 True if reset, False if not found 

480 """ 

481 if trigger_id in self._triggers: 

482 trigger = self._triggers[trigger_id] 

483 trigger.trigger_count = 0 

484 trigger.last_triggered = None 

485 trigger.enabled = True 

486 return True 

487 return False 

488 

489 def get_stats(self) -> Dict[str, Any]: 

490 """ 

491 Get statistics about registered triggers. 

492 

493 Returns: 

494 Dictionary with trigger statistics 

495 """ 

496 total = len(self._triggers) 

497 enabled = sum(1 for t in self._triggers.values() if t.enabled) 

498 by_type = { 

499 t.value: len(ids) for t, ids in self._type_index.items() 

500 } 

501 total_triggers = sum(t.trigger_count for t in self._triggers.values()) 

502 

503 return { 

504 "total_triggers": total, 

505 "enabled_triggers": enabled, 

506 "disabled_triggers": total - enabled, 

507 "by_type": by_type, 

508 "total_executions": total_triggers 

509 }