Coverage for integrations / channels / queue / concurrency.py: 76.4%

195 statements  

« prev     ^ index     » next       coverage.py v7.14.0, created at 2026-05-12 04:49 +0000

1""" 

2Concurrency Control System 

3 

4Limits concurrent message processing to prevent overload. 

5Ported from HevolveBot's src/config/agent-limits.ts. 

6 

7Features: 

8- Per-user limits 

9- Per-channel limits 

10- Per-chat limits 

11- Global limits 

12- Queue when limited option 

13""" 

14 

15from __future__ import annotations 

16 

17import asyncio 

18import logging 

19import threading 

20from dataclasses import dataclass, field 

21from datetime import datetime 

22from typing import Optional, Dict, Set, Tuple 

23 

24logger = logging.getLogger(__name__) 

25 

26 

27@dataclass 

28class ConcurrencyLimits: 

29 """Configuration for concurrency limits.""" 

30 max_per_user: int = 4 

31 max_per_channel: int = 20 

32 max_per_chat: int = 2 

33 max_global: int = 100 

34 queue_when_limited: bool = True 

35 timeout_seconds: int = 300 

36 

37 

38@dataclass 

39class ConcurrencyStats: 

40 """Statistics for concurrency controller.""" 

41 current_global: int = 0 

42 current_by_channel: Dict[str, int] = field(default_factory=dict) 

43 current_by_chat: Dict[str, int] = field(default_factory=dict) 

44 current_by_user: Dict[str, int] = field(default_factory=dict) 

45 total_acquired: int = 0 

46 total_rejected: int = 0 

47 total_queued: int = 0 

48 total_released: int = 0 

49 

50 

51@dataclass 

52class ConcurrencySlot: 

53 """Represents an acquired concurrency slot.""" 

54 slot_id: str 

55 channel: str 

56 chat_id: str 

57 user_id: str 

58 acquired_at: datetime = field(default_factory=datetime.now) 

59 

60 

61class ConcurrencyController: 

62 """ 

63 Controls concurrent message processing. 

64 

65 Ensures system doesn't get overwhelmed by limiting how many 

66 messages can be processed simultaneously. 

67 

68 Usage: 

69 limits = ConcurrencyLimits(max_per_user=4, max_global=100) 

70 controller = ConcurrencyController(limits) 

71 

72 # Try to acquire a slot 

73 if await controller.acquire("telegram", "chat123", "user456"): 

74 try: 

75 # Process message 

76 pass 

77 finally: 

78 controller.release("telegram", "chat123", "user456") 

79 """ 

80 

81 def __init__(self, limits: ConcurrencyLimits): 

82 self.limits = limits 

83 self._slots: Dict[str, ConcurrencySlot] = {} 

84 self._by_channel: Dict[str, Set[str]] = {} 

85 self._by_chat: Dict[str, Set[str]] = {} 

86 self._by_user: Dict[str, Set[str]] = {} 

87 self._lock = threading.Lock() 

88 self._stats = ConcurrencyStats() 

89 self._slot_counter = 0 

90 self._waiters: Dict[str, asyncio.Event] = {} 

91 

92 def _make_slot_id(self, channel: str, chat_id: str, user_id: str) -> str: 

93 """Generate unique slot ID.""" 

94 self._slot_counter += 1 

95 return f"{channel}:{chat_id}:{user_id}:{self._slot_counter}" 

96 

97 def _get_chat_key(self, channel: str, chat_id: str) -> str: 

98 """Get unique key for a chat.""" 

99 return f"{channel}:{chat_id}" 

100 

101 def _is_available_unlocked( 

102 self, 

103 channel: str, 

104 chat_id: str, 

105 user_id: str, 

106 ) -> bool: 

107 """ 

108 Check if a slot is available (internal, no lock). 

109 

110 Must be called with self._lock already held. 

111 """ 

112 # Check global limit 

113 if len(self._slots) >= self.limits.max_global: 

114 return False 

115 

116 # Check per-channel limit 

117 channel_slots = self._by_channel.get(channel, set()) 

118 if len(channel_slots) >= self.limits.max_per_channel: 

119 return False 

120 

121 # Check per-chat limit 

122 chat_key = self._get_chat_key(channel, chat_id) 

123 chat_slots = self._by_chat.get(chat_key, set()) 

124 if len(chat_slots) >= self.limits.max_per_chat: 

125 return False 

126 

127 # Check per-user limit 

128 user_slots = self._by_user.get(user_id, set()) 

129 if len(user_slots) >= self.limits.max_per_user: 

130 return False 

131 

132 return True 

133 

134 def is_available( 

135 self, 

136 channel: str, 

137 chat_id: str, 

138 user_id: str, 

139 ) -> bool: 

140 """ 

141 Check if a slot is available without acquiring. 

142 

143 Args: 

144 channel: Channel name 

145 chat_id: Chat identifier 

146 user_id: User identifier 

147 

148 Returns: 

149 True if slot would be available 

150 """ 

151 with self._lock: 

152 return self._is_available_unlocked(channel, chat_id, user_id) 

153 

154 def acquire_sync( 

155 self, 

156 channel: str, 

157 chat_id: str, 

158 user_id: str, 

159 ) -> Optional[str]: 

160 """ 

161 Synchronously try to acquire a concurrency slot. 

162 

163 Args: 

164 channel: Channel name 

165 chat_id: Chat identifier 

166 user_id: User identifier 

167 

168 Returns: 

169 Slot ID if acquired, None if not available 

170 """ 

171 with self._lock: 

172 if not self._is_available_unlocked(channel, chat_id, user_id): 

173 self._stats.total_rejected += 1 

174 return None 

175 

176 # Acquire slot 

177 slot_id = self._make_slot_id(channel, chat_id, user_id) 

178 slot = ConcurrencySlot( 

179 slot_id=slot_id, 

180 channel=channel, 

181 chat_id=chat_id, 

182 user_id=user_id, 

183 ) 

184 

185 self._slots[slot_id] = slot 

186 

187 # Update indexes 

188 if channel not in self._by_channel: 

189 self._by_channel[channel] = set() 

190 self._by_channel[channel].add(slot_id) 

191 

192 chat_key = self._get_chat_key(channel, chat_id) 

193 if chat_key not in self._by_chat: 

194 self._by_chat[chat_key] = set() 

195 self._by_chat[chat_key].add(slot_id) 

196 

197 if user_id not in self._by_user: 

198 self._by_user[user_id] = set() 

199 self._by_user[user_id].add(slot_id) 

200 

201 self._stats.total_acquired += 1 

202 return slot_id 

203 

204 async def acquire( 

205 self, 

206 channel: str, 

207 chat_id: str, 

208 user_id: str, 

209 wait: bool = False, 

210 timeout: Optional[float] = None, 

211 ) -> Optional[str]: 

212 """ 

213 Try to acquire a concurrency slot. 

214 

215 Args: 

216 channel: Channel name 

217 chat_id: Chat identifier 

218 user_id: User identifier 

219 wait: Whether to wait for a slot if not available 

220 timeout: Maximum wait time in seconds 

221 

222 Returns: 

223 Slot ID if acquired, None if not available or timed out 

224 """ 

225 # Try immediate acquire 

226 slot_id = self.acquire_sync(channel, chat_id, user_id) 

227 if slot_id: 

228 return slot_id 

229 

230 if not wait: 

231 return None 

232 

233 # Wait for availability 

234 wait_key = self._get_chat_key(channel, chat_id) 

235 event = asyncio.Event() 

236 

237 with self._lock: 

238 self._waiters[wait_key] = event 

239 self._stats.total_queued += 1 

240 

241 try: 

242 wait_timeout = timeout or self.limits.timeout_seconds 

243 await asyncio.wait_for(event.wait(), timeout=wait_timeout) 

244 return self.acquire_sync(channel, chat_id, user_id) 

245 except asyncio.TimeoutError: 

246 return None 

247 finally: 

248 with self._lock: 

249 self._waiters.pop(wait_key, None) 

250 

251 def release( 

252 self, 

253 slot_id: Optional[str] = None, 

254 channel: Optional[str] = None, 

255 chat_id: Optional[str] = None, 

256 user_id: Optional[str] = None, 

257 ) -> bool: 

258 """ 

259 Release a concurrency slot. 

260 

261 Can release by slot_id or by channel/chat_id/user_id combination. 

262 

263 Args: 

264 slot_id: Slot ID to release 

265 channel: Channel name 

266 chat_id: Chat identifier 

267 user_id: User identifier 

268 

269 Returns: 

270 True if released, False if not found 

271 """ 

272 with self._lock: 

273 # Find slot to release 

274 if slot_id and slot_id in self._slots: 

275 slot = self._slots.pop(slot_id) 

276 elif channel and chat_id and user_id: 

277 # Find slot by attributes 

278 for sid, s in list(self._slots.items()): 

279 if s.channel == channel and s.chat_id == chat_id and s.user_id == user_id: 

280 slot = self._slots.pop(sid) 

281 slot_id = sid 

282 break 

283 else: 

284 return False 

285 else: 

286 return False 

287 

288 # Update indexes 

289 if slot.channel in self._by_channel: 

290 self._by_channel[slot.channel].discard(slot_id) 

291 if not self._by_channel[slot.channel]: 

292 del self._by_channel[slot.channel] 

293 

294 chat_key = self._get_chat_key(slot.channel, slot.chat_id) 

295 if chat_key in self._by_chat: 

296 self._by_chat[chat_key].discard(slot_id) 

297 if not self._by_chat[chat_key]: 

298 del self._by_chat[chat_key] 

299 

300 if slot.user_id in self._by_user: 

301 self._by_user[slot.user_id].discard(slot_id) 

302 if not self._by_user[slot.user_id]: 

303 del self._by_user[slot.user_id] 

304 

305 self._stats.total_released += 1 

306 

307 # Notify waiters 

308 if chat_key in self._waiters: 

309 self._waiters[chat_key].set() 

310 

311 return True 

312 

313 def release_all_for_user(self, user_id: str) -> int: 

314 """Release all slots for a user.""" 

315 with self._lock: 

316 slot_ids = list(self._by_user.get(user_id, set())) 

317 

318 released = 0 

319 for slot_id in slot_ids: 

320 if self.release(slot_id=slot_id): 

321 released += 1 

322 return released 

323 

324 def release_all_for_channel(self, channel: str) -> int: 

325 """Release all slots for a channel.""" 

326 with self._lock: 

327 slot_ids = list(self._by_channel.get(channel, set())) 

328 

329 released = 0 

330 for slot_id in slot_ids: 

331 if self.release(slot_id=slot_id): 

332 released += 1 

333 return released 

334 

335 def release_all_for_chat(self, channel: str, chat_id: str) -> int: 

336 """Release all slots for a specific chat.""" 

337 chat_key = self._get_chat_key(channel, chat_id) 

338 with self._lock: 

339 slot_ids = list(self._by_chat.get(chat_key, set())) 

340 

341 released = 0 

342 for slot_id in slot_ids: 

343 if self.release(slot_id=slot_id): 

344 released += 1 

345 return released 

346 

347 def get_usage(self) -> ConcurrencyStats: 

348 """Get current concurrency usage statistics.""" 

349 with self._lock: 

350 stats = ConcurrencyStats( 

351 current_global=len(self._slots), 

352 current_by_channel={k: len(v) for k, v in self._by_channel.items()}, 

353 current_by_chat={k: len(v) for k, v in self._by_chat.items()}, 

354 current_by_user={k: len(v) for k, v in self._by_user.items()}, 

355 total_acquired=self._stats.total_acquired, 

356 total_rejected=self._stats.total_rejected, 

357 total_queued=self._stats.total_queued, 

358 total_released=self._stats.total_released, 

359 ) 

360 return stats 

361 

362 def get_slot_count( 

363 self, 

364 channel: Optional[str] = None, 

365 chat_id: Optional[str] = None, 

366 user_id: Optional[str] = None, 

367 ) -> int: 

368 """Get current slot count with optional filtering.""" 

369 with self._lock: 

370 if user_id: 

371 return len(self._by_user.get(user_id, set())) 

372 if channel and chat_id: 

373 chat_key = self._get_chat_key(channel, chat_id) 

374 return len(self._by_chat.get(chat_key, set())) 

375 if channel: 

376 return len(self._by_channel.get(channel, set())) 

377 return len(self._slots) 

378 

379 def cleanup_stale(self, max_age_seconds: Optional[int] = None) -> int: 

380 """ 

381 Remove stale slots that have been held too long. 

382 

383 Args: 

384 max_age_seconds: Maximum slot age (defaults to timeout_seconds) 

385 

386 Returns: 

387 Number of slots released 

388 """ 

389 max_age = max_age_seconds or self.limits.timeout_seconds 

390 cutoff = datetime.now() 

391 

392 with self._lock: 

393 stale_ids = [] 

394 for slot_id, slot in self._slots.items(): 

395 age = (cutoff - slot.acquired_at).total_seconds() 

396 if age > max_age: 

397 stale_ids.append(slot_id) 

398 

399 released = 0 

400 for slot_id in stale_ids: 

401 if self.release(slot_id=slot_id): 

402 released += 1 

403 

404 return released 

405 

406 def clear(self) -> int: 

407 """Clear all slots.""" 

408 with self._lock: 

409 count = len(self._slots) 

410 self._slots.clear() 

411 self._by_channel.clear() 

412 self._by_chat.clear() 

413 self._by_user.clear() 

414 return count