Coverage for integrations / social / rate_limiter.py: 97.0%
67 statements
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
« prev ^ index » next coverage.py v7.14.0, created at 2026-05-12 04:49 +0000
1"""
2HevolveSocial - Rate Limiter
3In-memory token bucket with Redis fallback.
4"""
5import os
6import time
7import threading
8from functools import wraps
9from flask import request, g, jsonify
12class TokenBucket:
13 """Thread-safe in-memory token bucket rate limiter."""
15 _CLEANUP_INTERVAL = 100 # Run cleanup every 100 calls
17 def __init__(self):
18 self._buckets = {} # key -> (tokens, last_refill)
19 self._lock = threading.Lock()
20 self._cleanup_counter = 0
22 def _get_key(self, user_id: str, action: str) -> str:
23 return f"{user_id}:{action}"
25 def check(self, user_id: str, action: str, max_tokens: int, refill_rate: float) -> bool:
26 """
27 Check if action is allowed. Returns True if allowed, False if rate-limited.
28 refill_rate = tokens added per second.
29 """
30 self._cleanup_counter += 1
31 if self._cleanup_counter >= self._CLEANUP_INTERVAL:
32 self._cleanup_counter = 0
33 self.cleanup()
35 key = self._get_key(user_id, action)
36 now = time.time()
38 with self._lock:
39 if key not in self._buckets:
40 self._buckets[key] = (max_tokens - 1, now)
41 return True
43 tokens, last_refill = self._buckets[key]
44 elapsed = now - last_refill
45 tokens = min(max_tokens, tokens + elapsed * refill_rate)
47 if tokens >= 1:
48 self._buckets[key] = (tokens - 1, now)
49 return True
50 else:
51 self._buckets[key] = (tokens, now)
52 return False
54 def cleanup(self, max_age: float = 3600):
55 """Remove stale entries older than max_age seconds."""
56 now = time.time()
57 with self._lock:
58 stale = [k for k, (_, t) in self._buckets.items() if now - t > max_age]
59 for k in stale:
60 del self._buckets[k]
63_limiter = TokenBucket()
65def _build_limits():
66 """Build rate limit config. Relaxed when SOCIAL_RATE_LIMIT_DISABLED=1 or FLASK_ENV=testing."""
67 disabled = os.environ.get('SOCIAL_RATE_LIMIT_DISABLED', '').strip() in ('1', 'true', 'yes')
68 testing = os.environ.get('FLASK_ENV', '').strip() in ('testing', 'test')
70 if disabled or testing:
71 _unlimited = {'max_tokens': 100000, 'refill_rate': 100000 / 60}
72 return {k: dict(_unlimited) for k in ('global', 'auth', 'register', 'post', 'comment', 'vote', 'search')}
74 # Production limits
75 return {
76 'global': {'max_tokens': 100, 'refill_rate': 100 / 60}, # 100 req/min
77 'auth': {'max_tokens': 5, 'refill_rate': 5 / 300}, # 5 attempts/5min
78 'register': {'max_tokens': 3, 'refill_rate': 3 / 3600}, # 3 registrations/hr
79 'post': {'max_tokens': 1, 'refill_rate': 1 / 1800}, # 1 post/30min
80 'comment': {'max_tokens': 50, 'refill_rate': 50 / 3600}, # 50 comments/hr
81 'vote': {'max_tokens': 60, 'refill_rate': 60 / 60}, # 60 votes/min
82 'search': {'max_tokens': 30, 'refill_rate': 30 / 60}, # 30 searches/min
83 }
86LIMITS = _build_limits()
89def rate_limit(action: str = 'global'):
90 """Decorator: rate-limits an endpoint. Requires g.user to be set."""
91 def decorator(f):
92 @wraps(f)
93 def decorated(*args, **kwargs):
94 user_id = getattr(g, 'user', None)
95 if user_id is None:
96 # Use IP as fallback for unauthenticated requests
97 user_id = request.remote_addr or 'anonymous'
98 else:
99 user_id = g.user.id
101 # Check global limit first
102 cfg = LIMITS.get('global', LIMITS['global'])
103 if not _limiter.check(str(user_id), 'global', cfg['max_tokens'], cfg['refill_rate']):
104 return jsonify({
105 'success': False,
106 'error': 'Rate limit exceeded. Try again later.'
107 }), 429
109 # Check action-specific limit
110 if action != 'global' and action in LIMITS:
111 cfg = LIMITS[action]
112 if not _limiter.check(str(user_id), action, cfg['max_tokens'], cfg['refill_rate']):
113 return jsonify({
114 'success': False,
115 'error': f'Rate limit exceeded for {action}. Try again later.'
116 }), 429
118 return f(*args, **kwargs)
119 return decorated
120 return decorator
123def get_limiter() -> TokenBucket:
124 return _limiter