Coverage for integrations / channels / queue / concurrency.py: 76.4%
195 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2Concurrency Control System
4Limits concurrent message processing to prevent overload.
5Ported from HevolveBot's src/config/agent-limits.ts.
7Features:
8- Per-user limits
9- Per-channel limits
10- Per-chat limits
11- Global limits
12- Queue when limited option
13"""
15from __future__ import annotations
17import asyncio
18import logging
19import threading
20from dataclasses import dataclass, field
21from datetime import datetime
22from typing import Optional, Dict, Set, Tuple
24logger = logging.getLogger(__name__)
@dataclass
class ConcurrencyLimits:
    """Upper bounds enforced by the concurrency controller.

    Every ``max_*`` value caps the number of slots that may be held at the
    same time within that scope; the controller refuses a new slot once the
    current count has reached the cap.
    """

    # Slots a single user may hold at once.
    max_per_user: int = 4
    # Slots a single channel may hold at once.
    max_per_channel: int = 20
    # Slots a single chat (channel + chat id) may hold at once.
    max_per_chat: int = 2
    # Slots that may be held at once across everything.
    max_global: int = 100
    # "Queue when limited" option; NOTE(review): the controller code in this
    # file never reads this flag -- confirm where it is consumed.
    queue_when_limited: bool = True
    # Default, in seconds, both for how long an async acquire waits and for
    # how old a slot may get before stale cleanup removes it.
    timeout_seconds: int = 300
@dataclass
class ConcurrencyStats:
    """Snapshot of current slot usage plus lifetime counters."""

    # --- current usage -------------------------------------------------
    # Number of slots held right now.
    current_global: int = 0
    # Held-slot count keyed by channel name.
    current_by_channel: Dict[str, int] = field(default_factory=dict)
    # Held-slot count keyed by chat key.
    current_by_chat: Dict[str, int] = field(default_factory=dict)
    # Held-slot count keyed by user id.
    current_by_user: Dict[str, int] = field(default_factory=dict)

    # --- lifetime counters ---------------------------------------------
    # Slots successfully handed out.
    total_acquired: int = 0
    # Acquire attempts refused because a limit was reached.
    total_rejected: int = 0
    # Acquire calls that went on to wait for a slot.
    total_queued: int = 0
    # Slots given back.
    total_released: int = 0
@dataclass
class ConcurrencySlot:
    """Record of one concurrency slot that is currently held."""

    # Unique identifier of this slot.
    slot_id: str
    # Channel the slot was acquired for.
    channel: str
    # Chat the slot was acquired for.
    chat_id: str
    # User the slot was acquired for.
    user_id: str
    # Creation time of the record (naive ``datetime.now()``); stale-slot
    # cleanup measures a slot's age from this value.
    acquired_at: datetime = field(default_factory=datetime.now)
class ConcurrencyController:
    """
    Controls concurrent message processing.

    Ensures system doesn't get overwhelmed by limiting how many
    messages can be processed simultaneously. A request is admitted only
    while the global, per-channel, per-chat and per-user counts are all
    below their configured maximum.

    Usage:
        limits = ConcurrencyLimits(max_per_user=4, max_global=100)
        controller = ConcurrencyController(limits)

        # Try to acquire a slot
        slot_id = await controller.acquire("telegram", "chat123", "user456")
        if slot_id:
            try:
                # Process message
                pass
            finally:
                # The first positional parameter of release() is the slot
                # id, so hand back the id (or use keyword arguments).
                controller.release(slot_id)
    """

    def __init__(self, limits: "ConcurrencyLimits"):
        """
        Create a controller with no slots held.

        Args:
            limits: Limit configuration; the ``max_*`` fields and
                ``timeout_seconds`` are read on every call, so changes to
                the object take effect immediately.
        """
        self.limits = limits
        # slot_id -> slot record; insertion order is acquisition order.
        self._slots: Dict[str, ConcurrencySlot] = {}
        # Secondary indexes: scope key -> ids of the slots held in that scope.
        self._by_channel: Dict[str, Set[str]] = {}
        self._by_chat: Dict[str, Set[str]] = {}
        self._by_user: Dict[str, Set[str]] = {}
        # Guards every attribute above and below.
        self._lock = threading.Lock()
        self._stats = ConcurrencyStats()
        self._slot_counter = 0
        # One entry per coroutine blocked in acquire():
        # wake-up event -> (owning event loop, channel, chat_id, user_id).
        # Keyed per waiter so concurrent waiters never overwrite each other.
        self._waiters: Dict[
            asyncio.Event, Tuple[asyncio.AbstractEventLoop, str, str, str]
        ] = {}

    # ------------------------------------------------------------------
    # Internal helpers (all "*_unlocked" helpers require self._lock held)
    # ------------------------------------------------------------------

    def _make_slot_id(self, channel: str, chat_id: str, user_id: str) -> str:
        """Generate unique slot ID (a running counter makes it unique)."""
        self._slot_counter += 1
        return f"{channel}:{chat_id}:{user_id}:{self._slot_counter}"

    def _get_chat_key(self, channel: str, chat_id: str) -> str:
        """Get unique key for a chat."""
        return f"{channel}:{chat_id}"

    def _is_available_unlocked(
        self,
        channel: str,
        chat_id: str,
        user_id: str,
    ) -> bool:
        """
        Check if a slot is available (internal, no lock).

        Must be called with self._lock already held.
        """
        limits = self.limits
        # Check global limit
        if len(self._slots) >= limits.max_global:
            return False

        # Check per-channel limit
        if len(self._by_channel.get(channel, ())) >= limits.max_per_channel:
            return False

        # Check per-chat limit
        chat_key = self._get_chat_key(channel, chat_id)
        if len(self._by_chat.get(chat_key, ())) >= limits.max_per_chat:
            return False

        # Check per-user limit
        if len(self._by_user.get(user_id, ())) >= limits.max_per_user:
            return False

        return True

    def _acquire_unlocked(
        self,
        channel: str,
        chat_id: str,
        user_id: str,
    ) -> Optional[str]:
        """
        Reserve a slot if every limit allows it (internal, no lock).

        Counts a success in ``total_acquired``; a refusal is NOT counted
        here so that callers decide whether it is a rejection or a retry.

        Returns:
            The new slot ID, or None when a limit is reached.
        """
        if not self._is_available_unlocked(channel, chat_id, user_id):
            return None

        slot_id = self._make_slot_id(channel, chat_id, user_id)
        self._slots[slot_id] = ConcurrencySlot(
            slot_id=slot_id,
            channel=channel,
            chat_id=chat_id,
            user_id=user_id,
        )

        # Update indexes
        chat_key = self._get_chat_key(channel, chat_id)
        self._by_channel.setdefault(channel, set()).add(slot_id)
        self._by_chat.setdefault(chat_key, set()).add(slot_id)
        self._by_user.setdefault(user_id, set()).add(slot_id)

        self._stats.total_acquired += 1
        return slot_id

    @staticmethod
    def _unindex(index: Dict[str, Set[str]], key: str, slot_id: str) -> None:
        """Drop slot_id from index[key], deleting the entry once it is empty."""
        members = index.get(key)
        if members is None:
            return
        members.discard(slot_id)
        if not members:
            del index[key]

    def _remove_slot_unlocked(self, slot_id: str) -> bool:
        """
        Remove one slot and its index entries (internal, no lock).

        Does not wake waiters; the caller does that once after it has
        finished removing slots.

        Returns:
            True if the slot existed and was removed.
        """
        slot = self._slots.pop(slot_id, None)
        if slot is None:
            return False

        chat_key = self._get_chat_key(slot.channel, slot.chat_id)
        self._unindex(self._by_channel, slot.channel, slot_id)
        self._unindex(self._by_chat, chat_key, slot_id)
        self._unindex(self._by_user, slot.user_id, slot_id)

        self._stats.total_released += 1
        return True

    def _release_many_unlocked(self, slot_ids) -> int:
        """
        Remove several slots, then wake waiters once (internal, no lock).

        ``slot_ids`` must not be a live view of an index set, because the
        index sets are mutated while the slots are removed.

        Returns:
            Number of slots actually removed.
        """
        released = 0
        for slot_id in slot_ids:
            if self._remove_slot_unlocked(slot_id):
                released += 1
        if released:
            self._wake_waiters_unlocked()
        return released

    def _wake_waiters_unlocked(self) -> None:
        """
        Wake every waiter whose request fits the limits now (internal, no lock).

        Any freed slot can unblock a waiter in a different chat (when the
        user, channel or global limit was the blocker), so all waiters are
        examined. More waiters may be woken than slots exist; the losers
        simply go back to waiting.
        """
        for event, (loop, channel, chat_id, user_id) in self._waiters.items():
            if not self._is_available_unlocked(channel, chat_id, user_id):
                continue
            try:
                # asyncio primitives are not thread-safe and release() may
                # run on another thread, so the event is set on its own loop.
                loop.call_soon_threadsafe(event.set)
            except RuntimeError:
                # The waiter's loop is already closed; nothing left to wake.
                pass

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    def is_available(
        self,
        channel: str,
        chat_id: str,
        user_id: str,
    ) -> bool:
        """
        Check if a slot is available without acquiring.

        Args:
            channel: Channel name
            chat_id: Chat identifier
            user_id: User identifier

        Returns:
            True if slot would be available
        """
        with self._lock:
            return self._is_available_unlocked(channel, chat_id, user_id)

    def acquire_sync(
        self,
        channel: str,
        chat_id: str,
        user_id: str,
    ) -> Optional[str]:
        """
        Synchronously try to acquire a concurrency slot.

        A refusal is counted in ``total_rejected``.

        Args:
            channel: Channel name
            chat_id: Chat identifier
            user_id: User identifier

        Returns:
            Slot ID if acquired, None if not available
        """
        with self._lock:
            slot_id = self._acquire_unlocked(channel, chat_id, user_id)
            if slot_id is None:
                self._stats.total_rejected += 1
            return slot_id

    async def acquire(
        self,
        channel: str,
        chat_id: str,
        user_id: str,
        wait: bool = False,
        timeout: Optional[float] = None,
    ) -> Optional[str]:
        """
        Try to acquire a concurrency slot.

        The first attempt behaves like acquire_sync(), including the
        ``total_rejected`` count on failure. With ``wait=True`` the call is
        then counted in ``total_queued`` and retried every time released
        capacity makes the request admissible, until the timeout expires;
        those retries are not counted as rejections.

        Args:
            channel: Channel name
            chat_id: Chat identifier
            user_id: User identifier
            wait: Whether to wait for a slot if not available. Waiting is
                controlled by this argument only; ``limits.queue_when_limited``
                is not consulted here.
            timeout: Maximum wait time in seconds. None means
                ``limits.timeout_seconds``; 0 (or a negative value) means
                "do not block".

        Returns:
            Slot ID if acquired, None if not available or timed out
        """
        # Try immediate acquire
        slot_id = self.acquire_sync(channel, chat_id, user_id)
        if slot_id is not None or not wait:
            return slot_id

        # "is None" rather than "or": an explicit timeout of 0 must not be
        # replaced by the (much longer) default.
        wait_timeout = self.limits.timeout_seconds if timeout is None else timeout
        loop = asyncio.get_running_loop()
        deadline = loop.time() + wait_timeout
        event = asyncio.Event()

        with self._lock:
            self._waiters[event] = (loop, channel, chat_id, user_id)
            self._stats.total_queued += 1

        try:
            while True:
                with self._lock:
                    # Re-check while registered and under the lock, so a
                    # release that slipped in before registration is seen.
                    slot_id = self._acquire_unlocked(channel, chat_id, user_id)
                    if slot_id is not None:
                        return slot_id
                    # Arm for the next wake-up. A release after this point
                    # sets the event again, so no wake-up can be lost.
                    event.clear()

                remaining = deadline - loop.time()
                if remaining <= 0:
                    return None
                try:
                    await asyncio.wait_for(event.wait(), timeout=remaining)
                except asyncio.TimeoutError:
                    return None
        finally:
            # Also runs on cancellation, so no dead waiter is left behind.
            with self._lock:
                self._waiters.pop(event, None)

    def release(
        self,
        slot_id: Optional[str] = None,
        channel: Optional[str] = None,
        chat_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> bool:
        """
        Release a concurrency slot.

        Can release by slot_id or by channel/chat_id/user_id combination.
        The combination is used only when slot_id is missing or unknown and
        all three values are given; it releases the oldest matching slot.
        Note that the first positional parameter is slot_id, so the
        combination should be passed with keyword arguments.

        Args:
            slot_id: Slot ID to release
            channel: Channel name
            chat_id: Chat identifier
            user_id: User identifier

        Returns:
            True if released, False if not found
        """
        with self._lock:
            if not (slot_id and slot_id in self._slots):
                if not (channel and chat_id and user_id):
                    return False
                # Find slot by attributes; _slots iterates oldest first.
                slot_id = next(
                    (
                        sid
                        for sid, held in self._slots.items()
                        if held.channel == channel
                        and held.chat_id == chat_id
                        and held.user_id == user_id
                    ),
                    None,
                )
                if slot_id is None:
                    return False

            self._remove_slot_unlocked(slot_id)
            # Notify waiters
            self._wake_waiters_unlocked()
            return True

    def release_all_for_user(self, user_id: str) -> int:
        """Release all slots for a user and return how many were released."""
        with self._lock:
            return self._release_many_unlocked(
                list(self._by_user.get(user_id, ()))
            )

    def release_all_for_channel(self, channel: str) -> int:
        """Release all slots for a channel and return how many were released."""
        with self._lock:
            return self._release_many_unlocked(
                list(self._by_channel.get(channel, ()))
            )

    def release_all_for_chat(self, channel: str, chat_id: str) -> int:
        """Release all slots for a specific chat and return how many were released."""
        chat_key = self._get_chat_key(channel, chat_id)
        with self._lock:
            return self._release_many_unlocked(
                list(self._by_chat.get(chat_key, ()))
            )

    def get_usage(self) -> "ConcurrencyStats":
        """Get current concurrency usage statistics as an independent snapshot."""
        with self._lock:
            return ConcurrencyStats(
                current_global=len(self._slots),
                current_by_channel={k: len(v) for k, v in self._by_channel.items()},
                current_by_chat={k: len(v) for k, v in self._by_chat.items()},
                current_by_user={k: len(v) for k, v in self._by_user.items()},
                total_acquired=self._stats.total_acquired,
                total_rejected=self._stats.total_rejected,
                total_queued=self._stats.total_queued,
                total_released=self._stats.total_released,
            )

    def get_slot_count(
        self,
        channel: Optional[str] = None,
        chat_id: Optional[str] = None,
        user_id: Optional[str] = None,
    ) -> int:
        """
        Get current slot count with optional filtering.

        Exactly one filter is applied, in this order of precedence:
        user_id, then channel together with chat_id, then channel alone.
        Without any filter the global count is returned.
        """
        with self._lock:
            if user_id:
                return len(self._by_user.get(user_id, ()))
            if channel and chat_id:
                chat_key = self._get_chat_key(channel, chat_id)
                return len(self._by_chat.get(chat_key, ()))
            if channel:
                return len(self._by_channel.get(channel, ()))
            return len(self._slots)

    def cleanup_stale(self, max_age_seconds: Optional[int] = None) -> int:
        """
        Remove stale slots that have been held too long.

        A slot is stale when its age is strictly greater than the maximum.

        Args:
            max_age_seconds: Maximum slot age (defaults to timeout_seconds
                when None; an explicit 0 is honoured)

        Returns:
            Number of slots released
        """
        if max_age_seconds is None:
            max_age = self.limits.timeout_seconds
        else:
            max_age = max_age_seconds
        cutoff = datetime.now()

        with self._lock:
            stale_ids = [
                slot_id
                for slot_id, slot in self._slots.items()
                if (cutoff - slot.acquired_at).total_seconds() > max_age
            ]
            return self._release_many_unlocked(stale_ids)

    def clear(self) -> int:
        """
        Clear all slots.

        Lifetime counters are left untouched. Waiting acquirers are woken,
        because all capacity has just become free.

        Returns:
            Number of slots that were held before clearing
        """
        with self._lock:
            count = len(self._slots)
            self._slots.clear()
            self._by_channel.clear()
            self._by_chat.clear()
            self._by_user.clear()
            if count:
                self._wake_waiters_unlocked()
            return count