I am trying to build a per-user rate limiter for an aiogram bot using a middleware. So far I have created one class that holds the request data for a single user and one rate-limiter class that stores those per-user objects. My question is: which is better for performance and safety — acquiring the async lock separately for each operation, or performing all of these operations under a single async lock acquisition?
My rate_limiter.py:
import asyncio
import time
from collections import defaultdict
from dataclasses import dataclass
class RequestsData:
def __init__(
self,
seconds_interval: int = 3,
max_requests_per_day: int = 5,
):
self.seconds_interval: int = seconds_interval
self.max_requests_per_day: int = max_requests_per_day
self.day_len_seconds: int = 60 * 60 * 24
self.last_time: float | None = None
self.just_limit_exited: bool = True
self.total_requests: int = 0
self.total_requests_start_time: float | None = None
self._async_lock = asyncio.Lock()
@classmethod
def get_now_seconds(self) -> float:
return time.time()
async def reset_day_limit(self) -> bool:
async with self._async_lock:
# no requests start time
if self.total_requests_start_time is None:
return True
current_time_seconds = self.get_now_seconds()
# check and reset total requests start time as current time seconds
if current_time_seconds - self.total_requests_start_time > self.day_len_seconds:
self.total_requests_start_time = current_time_seconds
self.total_requests = 0
return True
return False
async def exited_day_limit(self) -> tuple[bool, bool]:
async with self._async_lock:
if self.total_requests >= self.max_requests_per_day-1:
# set and reuturn limit exited
if self.just_limit_exited:
self.just_limit_exited = False
# limit exited, not blocked
return (True, False)
return (False, False)
# return no exited, can make requests
return (False, True)
async def rate_limited(self) -> tuple[bool, bool]:
async with self._async_lock:
current_time_seconds = self.get_now_seconds()
# check times diff less than interval
if self.last_time is not None and current_time_seconds - self.last_time < self.seconds_interval:
# set and return limit exited
if self.just_limit_exited:
self.just_limit_exited = False
# limit exited and blocked
return (True, False)
# limit not exited and blocked
return (False, False)
# limit not exited and not blocked
return (False, True)
async def add_reuqest(self):
async with self._async_lock:
current_time_seconds = self.get_now_seconds()
self.last_time = current_time_seconds
self.just_limit_exited = True
self.total_requests += 1
class RateLimiter:
    """Keeps one ``RequestsData`` per user id and runs the limit checks on it."""

    def __init__(
        self,
        seconds_interval: int = 1,
        max_requests_per_day: int = 5,
    ):
        def new_entry() -> RequestsData:
            # Every user gets an independent tracker with the same settings.
            return RequestsData(seconds_interval, max_requests_per_day)

        # Entries are created lazily on the first request of each user id.
        self.requsts: defaultdict[int, RequestsData] = defaultdict(new_entry)

    async def on_request(self, uid: int) -> tuple[bool, bool]:
        """Run all limit checks for ``uid`` and record the request if allowed.

        Returns:
            ``(just_exited, can_make_requests)``: ``(False, True)`` when the
            request was accepted and recorded; otherwise the tuple produced
            by the first check that blocked it.
        """
        tracker = self.requsts[uid]

        # Expire the daily window first so the quota check sees fresh counters.
        await tracker.reset_day_limit()

        # Daily quota first, then the minimum interval; stop at the first block.
        for check in (tracker.exited_day_limit, tracker.rate_limited):
            notify, allowed = await check()
            if not allowed:
                return (notify, allowed)

        # Both checks passed: count the request and stamp its time.
        await tracker.add_reuqest()
        return (False, True)
I use this rate limiter in a middleware:
# Middleware that only lets private-chat events through and applies the
# per-user rate limiter before anything else runs.
# NOTE(review): `handler` is never invoked in the lines visible here. If the
# method really ends at the last line shown, the wrapped handler never runs;
# confirm whether the source continues (e.g. `return await handler(event, data)`).
class MainMiddleware(BaseMiddleware):
def __init__(self):
super().__init__()
# one shared RateLimiter, created with its default settings
self.rate_limiter = RateLimiter()
# Called for each incoming event; returns None on every path shown here.
async def __call__(
self,
handler: Callable[[TelegramObject, Dict[str, Any]], Awaitable[Any]],
event: Message | CallbackQuery,
data: Dict[str, Any]
):
# read the chat type: directly from a Message, otherwise from the
# message attached to the callback query
if isinstance(event, Message):
chat_type = event.chat.type
else:
# NOTE(review): presumably event.message can be None for some callback
# queries (e.g. inline messages) - confirm and guard if so
chat_type = event.message.chat.type
# drop everything that is not a private chat
if chat_type != ChatType.PRIVATE:
logger.info("chat is not private %s", event)
return
# NOTE(review): assumes event.from_user is always set - TODO confirm
user_id = event.from_user.id
lang = event.from_user.language_code
# run the limit checks for this user; the request is recorded when allowed
just_limit_exited, can_make_request = await self.rate_limiter.on_request(
user_id
)
# first blocked attempt: tell the user once that the limit was hit
if just_limit_exited:
text = get_text(texts.rate_limit_exited_text, lang)
await event.answer(text)
# blocked: stop processing this event
if not can_make_request:
return None