1- import time
2- from asyncio import Event , sleep , TimeoutError
3- from collections import deque
1+ from asyncio import Event , TimeoutError , sleep
42from contextlib import asynccontextmanager
53
64import aiohttp
75from aiohttp .client_exceptions import (
86 ClientConnectionError ,
97 ClientPayloadError ,
108 ClientResponseError ,
11- ServerTimeoutError ,
129)
1310
11+ from .leaky_bucket import LeakyBucketLimiter
1412from .logger import logger
1513from .utils import _url_valid
1614
17- BITRIX_POOL_SIZE = 50
18- BITRIX_RPS = 2.0
1915BITRIX_MAX_BATCH_SIZE = 50
2016BITRIX_MAX_CONCURRENT_REQUESTS = 50
2117
18+ BITRIX_MAX_REQUEST_RUNNING_TIME = 480
19+ BITRIX_MEASUREMENT_PERIOD = 10 * 60
20+
2221MAX_RETRIES = 10
2322
2423RESTORE_CONNECTIONS_FACTOR = 1.3 # скорость восстановления количества запросов
@@ -33,6 +32,14 @@ class ServerError(Exception):
3332 pass
3433
3534
35+ RETRIED_ERRORS = (
36+ ClientPayloadError ,
37+ ClientConnectionError ,
38+ ServerError ,
39+ TimeoutError ,
40+ )
41+
42+
3643class ServerRequestHandler :
3744 """
3845 Используется для контроля скорости доступа к серверам Битрикс.
@@ -48,19 +55,13 @@ def __init__(self, webhook, respect_velocity_policy, client):
4855 self .webhook = self .standardize_webhook (webhook )
4956 self .respect_velocity_policy = respect_velocity_policy
5057
51- self .requests_per_second = BITRIX_RPS
52- self .pool_size = BITRIX_POOL_SIZE
53-
5458 self .active_runs = 0
5559
5660 # если пользователь при инициализации передал клиента со своими настройками,
5761 # то будем использовать его клиента
5862 self .client_provided_by_user = bool (client )
5963 self .session = client
6064
61- # rr - requests register - список отправленных запросов к серверу
62- self .rr = deque ()
63-
6465 # лимит количества одновременных запросов,
6566 # установленный конструктором или пользователем
6667 self .mcr_max = BITRIX_MAX_CONCURRENT_REQUESTS
@@ -76,6 +77,9 @@ def __init__(self, webhook, respect_velocity_policy, client):
7677 # если отрицательное - количество последовательно полученных ошибок
7778 self .successive_results = 0
7879
80+ # rate limiters by method
81+ self .limiters = {} # dict[str, LeakyBucketLimiter]
82+
7983 @staticmethod
8084 def standardize_webhook (webhook ):
8185 """Приводит `webhook` к стандартному виду."""
@@ -120,36 +124,37 @@ async def handle_sessions(self):
120124 if not self .active_runs and self .session and not self .session .closed :
121125 await self .session .close ()
122126
123- async def single_request (self , method , params = None ) -> dict :
127+ async def single_request (self , method : str , params = None ) -> dict :
124128 """Делает единичный запрос к серверу,
125129 с повторными попытками при необходимости."""
126130
127131 while True :
128132
129133 try :
130- result = await self .request_attempt (method , params )
134+ result = await self .request_attempt (method . strip (). lower () , params )
131135 self .success ()
132136 return result
133137
134- except (
135- ClientPayloadError ,
136- ClientConnectionError ,
137- ServerError ,
138- TimeoutError ,
139- ) as err :
138+ except RETRIED_ERRORS as err : # all other exceptions will propagate
140139 self .failure (err )
141140
142141 async def request_attempt (self , method , params = None ) -> dict :
143142 """Делает попытку запроса к серверу, ожидая при необходимости."""
144143
145144 try :
146- async with self .acquire ():
145+ async with self .acquire (method ):
147146 logger .debug (f"Requesting {{'method': { method } , 'params': { params } }}" )
147+
148148 async with self .session .post (
149149 url = self .webhook + method , json = params
150150 ) as response :
151151 json = await response .json (encoding = "utf-8" )
152+
152153 logger .debug ("Response: %s" , json )
154+
155+ request_run_time = json ["time" ]["operating" ]
156+ self .limiters [method ].register (request_run_time )
157+
153158 return json
154159
155160 except ClientResponseError as error :
@@ -175,15 +180,21 @@ def failure(self, err: Exception):
175180 ) from err
176181
177182 @asynccontextmanager
178- async def acquire (self ):
183+ async def acquire (self , method : str ):
179184 """Ожидает, пока не станет безопасно делать запрос к серверу."""
180185
181186 await self .autothrottle ()
182187
183188 async with self .limit_concurrent_requests ():
184189 if self .respect_velocity_policy :
185- async with self .limit_request_velocity ():
190+ if method not in self .limiters :
191+ self .limiters [method ] = LeakyBucketLimiter (
192+ BITRIX_MAX_REQUEST_RUNNING_TIME , BITRIX_MEASUREMENT_PERIOD
193+ )
194+
195+ async with self .limiters [method ].acquire ():
186196 yield
197+
187198 else :
188199 yield
189200
@@ -220,7 +231,7 @@ async def autothrottle(self):
220231
221232 @asynccontextmanager
222233 async def limit_concurrent_requests (self ):
223- """Не позволяет оновременно выполнять
234+ """Не позволяет одновременно выполнять
224235 более `self.mcr_cur_limit` запросов."""
225236
226237 while self .concurrent_requests > self .mcr_cur_limit :
@@ -235,30 +246,3 @@ async def limit_concurrent_requests(self):
235246 finally :
236247 self .concurrent_requests -= 1
237248 self .request_complete .set ()
238-
239- @asynccontextmanager
240- async def limit_request_velocity (self ):
241- """Ограничивает скорость запросов к серверу."""
242-
243- # если пул заполнен, ждать
244- while len (self .rr ) >= self .pool_size :
245- time_from_last_request = time .monotonic () - self .rr [0 ]
246- time_to_wait = 1 / self .requests_per_second - time_from_last_request
247- if time_to_wait > 0 :
248- await sleep (time_to_wait )
249- else :
250- break
251-
252- # зарегистрировать запрос в очереди
253- start_time = time .monotonic ()
254- self .rr .appendleft (start_time )
255-
256- # отдать управление
257- try :
258- yield
259-
260- # подчистить пул
261- finally :
262- trim_time = start_time - self .pool_size / self .requests_per_second
263- while self .rr [- 1 ] < trim_time :
264- self .rr .pop ()
0 commit comments