Skip to content

Commit 267ede3

Browse files
author
Ask Solem
committed
celery.datastructures.TokenBucket: Generic Token Bucket algorithm
1 parent 76bcc62 commit 267ede3

File tree

3 files changed

+63
-50
lines changed

3 files changed

+63
-50
lines changed

celery/datastructures.py

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -265,3 +265,55 @@ def __setitem__(self, key, value):
265265
while len(self) >= self.limit:
266266
self.popitem(last=False)
267267
super(LocalCache, self).__setitem__(key, value)
268+
269+
270+
class TokenBucket(object):
271+
"""Token Bucket Algorithm.
272+
273+
See http://en.wikipedia.org/wiki/Token_Bucket
274+
Most of this code was stolen from an entry in the ASPN Python Cookbook:
275+
http://code.activestate.com/recipes/511490/
276+
277+
:param fill_rate: see :attr:`fill_rate`.
278+
:keyword capacity: see :attr:`capacity`.
279+
280+
.. attribute:: fill_rate
281+
282+
The rate in tokens/second that the bucket will be refilled.
283+
284+
.. attribute:: capacity
285+
286+
Maximum number of tokens in the bucket. Default is ``1``.
287+
288+
.. attribute:: timestamp
289+
290+
Timestamp of the last time a token was taken out of the bucket.
291+
292+
"""
293+
294+
def __init__(self, fill_rate, capacity=1):
295+
self.capacity = float(capacity)
296+
self._tokens = capacity
297+
self.fill_rate = float(fill_rate)
298+
self.timestamp = time.time()
299+
300+
def can_consume(self, tokens=1):
301+
if tokens <= self._get_tokens():
302+
self._tokens -= tokens
303+
return True
304+
return False
305+
306+
def expected_time(self, tokens=1):
307+
"""Returns the expected time in seconds when a new token should be
308+
available. *Note: consumes a token from the bucket*"""
309+
_tokens = self._get_tokens()
310+
tokens = max(tokens, _tokens)
311+
return (tokens - _tokens) / self.fill_rate
312+
313+
def _get_tokens(self):
314+
if self._tokens < self.capacity:
315+
now = time.time()
316+
delta = self.fill_rate * (now - self.timestamp)
317+
self._tokens = min(self.capacity, self._tokens + delta)
318+
self.timestamp = now
319+
return self._tokens

celery/tests/test_buckets.py

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -183,13 +183,14 @@ def test_auto_add_on_missing(self):
183183
@skip_if_disabled
184184
def test_has_rate_limits(self):
185185
b = buckets.TaskBucket(task_registry=self.registry)
186-
self.assertEqual(b.buckets[TaskA.name].fill_rate, 10)
186+
self.assertEqual(b.buckets[TaskA.name]._bucket.fill_rate, 10)
187187
self.assertIsInstance(b.buckets[TaskB.name], buckets.Queue)
188-
self.assertEqual(b.buckets[TaskC.name].fill_rate, 1)
188+
self.assertEqual(b.buckets[TaskC.name]._bucket.fill_rate, 1)
189189
self.registry.register(TaskD)
190190
b.init_with_registry()
191191
try:
192-
self.assertEqual(b.buckets[TaskD.name].fill_rate, 1000 / 60.0)
192+
self.assertEqual(b.buckets[TaskD.name]._bucket.fill_rate,
193+
1000 / 60.0)
193194
finally:
194195
self.registry.unregister(TaskD)
195196

@@ -284,10 +285,6 @@ def test_items(self):
284285

285286
class test_FastQueue(unittest.TestCase):
286287

287-
def test_can_consume(self):
288-
x = buckets.FastQueue()
289-
self.assertTrue(x.can_consume())
290-
291288
def test_items(self):
292289
x = buckets.FastQueue()
293290
x.put(10)

celery/worker/buckets.py

Lines changed: 7 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from collections import deque
44
from Queue import Queue, Empty as QueueEmpty
55

6+
from celery.datastructures import TokenBucket
67
from celery.utils import all
78
from celery.utils import timeutils
89
from celery.utils.compat import izip_longest, chain_from_iterable
@@ -194,9 +195,6 @@ def clear(self):
194195
def expected_time(self, tokens=1):
195196
return 0
196197

197-
def can_consume(self, tokens=1):
198-
return True
199-
200198
def wait(self, block=True):
201199
return self.get(block=block)
202200

@@ -210,36 +208,19 @@ class TokenBucketQueue(object):
210208
211209
This uses the token bucket algorithm to rate limit the queue on get
212210
operations.
213-
See http://en.wikipedia.org/wiki/Token_Bucket
214-
Most of this code was stolen from an entry in the ASPN Python Cookbook:
215-
http://code.activestate.com/recipes/511490/
216-
217-
:param fill_rate: see :attr:`fill_rate`.
218-
:keyword capacity: see :attr:`capacity`.
219-
220-
.. attribute:: fill_rate
221-
222-
The rate in tokens/second that the bucket will be refilled.
223211
224-
.. attribute:: capacity
225-
226-
Maximum number of tokens in the bucket. Default is ``1``.
227-
228-
.. attribute:: timestamp
229-
230-
Timestamp of the last time a token was taken out of the bucket.
212+
:param fill_rate: The rate in tokens/second that the bucket will
213+
be refilled.
214+
:keyword capacity: Maximum number of tokens in the bucket. Default is 1.
231215
232216
"""
233217
RateLimitExceeded = RateLimitExceeded
234218

235219
def __init__(self, fill_rate, queue=None, capacity=1):
236-
self.capacity = float(capacity)
237-
self._tokens = self.capacity
220+
self._bucket = TokenBucket(fill_rate, capacity)
238221
self.queue = queue
239222
if not self.queue:
240223
self.queue = Queue()
241-
self.fill_rate = float(fill_rate)
242-
self.timestamp = time.time()
243224

244225
def put(self, item, block=True):
245226
"""Put an item into the queue.
@@ -271,7 +252,7 @@ def get(self, block=True):
271252
"""
272253
get = block and self.queue.get or self.queue.get_nowait
273254

274-
if not self.can_consume(1):
255+
if not self._bucket.can_consume(1):
275256
raise RateLimitExceeded()
276257

277258
return get()
@@ -311,27 +292,10 @@ def wait(self, block=False):
311292
return self.get(block=block)
312293
time.sleep(remaining)
313294

314-
def can_consume(self, tokens=1):
315-
"""Consume tokens from the bucket. Returns True if there were
316-
sufficient tokens otherwise False."""
317-
if tokens <= self._get_tokens():
318-
self._tokens -= tokens
319-
return True
320-
return False
321-
322295
def expected_time(self, tokens=1):
323296
"""Returns the expected time in seconds when a new token should be
324297
available."""
325-
tokens = max(tokens, self._get_tokens())
326-
return (tokens - self._get_tokens()) / self.fill_rate
327-
328-
def _get_tokens(self):
329-
if self._tokens < self.capacity:
330-
now = time.time()
331-
delta = self.fill_rate * (now - self.timestamp)
332-
self._tokens = min(self.capacity, self._tokens + delta)
333-
self.timestamp = now
334-
return self._tokens
298+
return self._bucket.expected_time(tokens)
335299

336300
@property
337301
def items(self):

0 commit comments

Comments
 (0)