Skip to content

Commit 41b1b8f

Browse files
author
Ask Solem
committed
Merge branch 'timer2'
2 parents 8f93062 + f6ec0e3 commit 41b1b8f

File tree

12 files changed

+61
-359
lines changed

12 files changed

+61
-359
lines changed

celery/conf.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
"CELERYD_POOL_PUTLOCKS": True,
5454
"CELERYD_POOL": "celery.concurrency.processes.TaskPool",
5555
"CELERYD_MEDIATOR": "celery.worker.controllers.Mediator",
56-
"CELERYD_ETA_SCHEDULER": "celery.worker.controllers.ScheduleController",
56+
"CELERYD_ETA_SCHEDULER": "timer2.Timer",
5757
"CELERYD_LISTENER": "celery.worker.listener.CarrotListener",
5858
"CELERYD_CONCURRENCY": 0, # defaults to cpu count
5959
"CELERYD_PREFETCH_MULTIPLIER": 4,

celery/tests/test_worker.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
from carrot.backends.base import BaseMessage
99
from carrot.connection import BrokerConnection
10+
from timer2 import Timer
1011

1112
from celery import conf
1213
from celery.decorators import task as task_dec
@@ -17,7 +18,6 @@
1718
from celery.worker.buckets import FastQueue
1819
from celery.worker.job import TaskRequest
1920
from celery.worker.listener import CarrotListener, QoS, RUN
20-
from celery.worker.scheduler import Scheduler
2121

2222
from celery.tests.compat import catch_warnings
2323
from celery.tests.utils import execute_context
@@ -160,7 +160,7 @@ class test_CarrotListener(unittest.TestCase):
160160

161161
def setUp(self):
162162
self.ready_queue = FastQueue()
163-
self.eta_schedule = Scheduler(self.ready_queue)
163+
self.eta_schedule = Timer()
164164
self.logger = get_logger()
165165
self.logger.setLevel(0)
166166

@@ -336,7 +336,7 @@ def qos(self, **kwargs):
336336
items = [entry[2] for entry in self.eta_schedule.queue]
337337
found = 0
338338
for item in items:
339-
if item.task_name == foo_task.name:
339+
if item.args[0].task_name == foo_task.name:
340340
found = True
341341
self.assertTrue(found)
342342
self.assertTrue(l.task_consumer.prefetch_count_incremented)
@@ -388,10 +388,10 @@ def test_receieve_message_eta(self):
388388
l.receive_message(m.decode(), m)
389389

390390
in_hold = self.eta_schedule.queue[0]
391-
self.assertEqual(len(in_hold), 4)
392-
eta, priority, task, on_accept = in_hold
391+
self.assertEqual(len(in_hold), 3)
392+
eta, priority, entry = in_hold
393+
task = entry.args[0]
393394
self.assertIsInstance(task, TaskRequest)
394-
self.assertTrue(callable(on_accept))
395395
self.assertEqual(task.task_name, foo_task.name)
396396
self.assertEqual(task.execute(), 2 * 4 * 8)
397397
self.assertRaises(Empty, self.ready_queue.get_nowait)
@@ -466,7 +466,7 @@ def test_with_rate_limits_disabled(self):
466466

467467
def test_attrs(self):
468468
worker = self.worker
469-
self.assertIsInstance(worker.eta_schedule, Scheduler)
469+
self.assertIsInstance(worker.scheduler, Timer)
470470
self.assertTrue(worker.scheduler)
471471
self.assertTrue(worker.pool)
472472
self.assertTrue(worker.listener)

celery/tests/test_worker_control.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
import socket
22
import unittest2 as unittest
33

4+
from timer2 import Timer
5+
46
from celery import conf
57
from celery.decorators import task
68
from celery.registry import tasks
@@ -10,7 +12,6 @@
1012
from celery.worker.buckets import FastQueue
1113
from celery.worker.job import TaskRequest
1214
from celery.worker.state import revoked
13-
from celery.worker.scheduler import Scheduler
1415

1516
hostname = socket.gethostname()
1617

@@ -44,7 +45,7 @@ def __init__(self):
4445
task_id=gen_unique_id(),
4546
args=(2, 2),
4647
kwargs={}))
47-
self.eta_schedule = Scheduler(self.ready_queue)
48+
self.eta_schedule = Timer()
4849
self.event_dispatcher = Dispatcher()
4950

5051

@@ -81,7 +82,8 @@ def test_dump_schedule(self):
8182
listener = Listener()
8283
panel = self.create_panel(listener=listener)
8384
self.assertFalse(panel.execute("dump_schedule"))
84-
listener.eta_schedule.enter("foo", eta=100)
85+
import operator
86+
listener.eta_schedule.schedule.enter(100, operator.add, (2, 2))
8587
self.assertTrue(panel.execute("dump_schedule"))
8688

8789
def test_dump_reserved(self):
Lines changed: 6 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
1-
import time
21
import unittest2 as unittest
2+
33
from Queue import Queue
44

55
from celery.utils import gen_unique_id
66
from celery.worker.controllers import Mediator
7-
from celery.worker.controllers import BackgroundThread, ScheduleController
87
from celery.worker.state import revoked as revoked_tasks
98

109

@@ -26,35 +25,7 @@ def revoked(self):
2625
return False
2726

2827

29-
class MyBackgroundThread(BackgroundThread):
30-
31-
def on_iteration(self):
32-
time.sleep(1)
33-
34-
35-
class TestBackgroundThread(unittest.TestCase):
36-
37-
def test_on_iteration(self):
38-
self.assertRaises(NotImplementedError,
39-
BackgroundThread().on_iteration)
40-
41-
def test_run(self):
42-
t = MyBackgroundThread()
43-
t._shutdown.set()
44-
t.run()
45-
self.assertTrue(t._stopped.isSet())
46-
47-
def test_start_stop(self):
48-
t = MyBackgroundThread()
49-
t.start()
50-
self.assertFalse(t._shutdown.isSet())
51-
self.assertFalse(t._stopped.isSet())
52-
t.stop()
53-
self.assertTrue(t._shutdown.isSet())
54-
self.assertTrue(t._stopped.isSet())
55-
56-
57-
class TestMediator(unittest.TestCase):
28+
class test_Mediator(unittest.TestCase):
5829

5930
def test_mediator_start__stop(self):
6031
ready_queue = Queue()
@@ -67,7 +38,7 @@ def test_mediator_start__stop(self):
6738
self.assertTrue(m._shutdown.isSet())
6839
self.assertTrue(m._stopped.isSet())
6940

70-
def test_mediator_on_iteration(self):
41+
def test_mediator_move(self):
7142
ready_queue = Queue()
7243
got = {}
7344

@@ -77,11 +48,11 @@ def mycallback(value):
7748
m = Mediator(ready_queue, mycallback)
7849
ready_queue.put(MockTask("George Costanza"))
7950

80-
m.on_iteration()
51+
m.move()
8152

8253
self.assertEqual(got["value"], "George Costanza")
8354

84-
def test_mediator_on_iteration_revoked(self):
55+
def test_mediator_move_revoked(self):
8556
ready_queue = Queue()
8657
got = {}
8758

@@ -94,30 +65,7 @@ def mycallback(value):
9465
revoked_tasks.add(t.task_id)
9566
ready_queue.put(t)
9667

97-
m.on_iteration()
68+
m.move()
9869

9970
self.assertNotIn("value", got)
10071
self.assertTrue(t.acked)
101-
102-
103-
class TestScheduleController(unittest.TestCase):
104-
105-
def test_on_iteration(self):
106-
times = range(10) + [None]
107-
c = ScheduleController(times)
108-
109-
import time
110-
slept = [None]
111-
112-
def _sleep(count):
113-
slept[0] = count
114-
115-
old_sleep = time.sleep
116-
time.sleep = _sleep
117-
try:
118-
for i in times:
119-
c.on_iteration()
120-
res = i or 1
121-
self.assertEqual(slept[0], res)
122-
finally:
123-
time.sleep = old_sleep

celery/tests/test_worker_scheduler.py

Lines changed: 0 additions & 97 deletions
This file was deleted.

celery/worker/__init__.py

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88
import traceback
99
from multiprocessing.util import Finalize
1010

11+
from timer2 import Timer
12+
1113
from celery import conf
14+
from celery import log
1215
from celery import registry
1316
from celery import platform
1417
from celery import signals
@@ -18,7 +21,6 @@
1821

1922
from celery.worker import state
2023
from celery.worker.buckets import TaskBucket, FastQueue
21-
from celery.worker.scheduler import Scheduler
2224

2325
RUN = 0x1
2426
CLOSE = 0x2
@@ -141,6 +143,8 @@ def __init__(self, concurrency=None, logfile=None, loglevel=None,
141143
self.task_soft_time_limit = task_soft_time_limit
142144
self.max_tasks_per_child = max_tasks_per_child
143145
self.pool_putlocks = pool_putlocks
146+
self.timer_debug = log.SilenceRepeated(self.logger.debug,
147+
max_iterations=10)
144148
self.db = db
145149
self._finalize = Finalize(self, self.stop, exitpriority=1)
146150

@@ -153,7 +157,6 @@ def __init__(self, concurrency=None, logfile=None, loglevel=None,
153157
self.ready_queue = FastQueue()
154158
else:
155159
self.ready_queue = TaskBucket(task_registry=registry.tasks)
156-
self.eta_schedule = Scheduler(self.ready_queue, logger=self.logger)
157160

158161
self.logger.debug("Instantiating thread components...")
159162

@@ -169,7 +172,9 @@ def __init__(self, concurrency=None, logfile=None, loglevel=None,
169172
callback=self.process_task,
170173
logger=self.logger)
171174
self.scheduler = instantiate(eta_scheduler_cls,
172-
self.eta_schedule, logger=self.logger)
175+
precision=conf.CELERYD_ETA_SCHEDULER_PRECISION,
176+
on_error=self.on_timer_error,
177+
on_tick=self.on_timer_tick)
173178

174179
self.clockservice = None
175180
if self.embed_clockservice:
@@ -179,7 +184,7 @@ def __init__(self, concurrency=None, logfile=None, loglevel=None,
179184
prefetch_count = self.concurrency * conf.CELERYD_PREFETCH_MULTIPLIER
180185
self.listener = instantiate(listener_cls,
181186
self.ready_queue,
182-
self.eta_schedule,
187+
self.scheduler,
183188
logger=self.logger,
184189
hostname=self.hostname,
185190
send_events=self.send_events,
@@ -247,3 +252,11 @@ def _shutdown(self, warm=True):
247252

248253
self.listener.close_connection()
249254
self._state = TERMINATE
255+
256+
def on_timer_error(self, exc_info):
257+
_, exc, _ = exc_info
258+
self.logger.error("Timer error: %r" % (exc, ))
259+
260+
def on_timer_tick(self, delay):
261+
self.timer_debug("Scheduler wake-up! Next eta %s secs." % delay)
262+

celery/worker/control/builtins.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ def rate_limit(panel, task_name, rate_limit, **kwargs):
9999

100100
@Panel.register
101101
def dump_schedule(panel, safe=False, **kwargs):
102-
schedule = panel.listener.eta_schedule
102+
schedule = panel.listener.eta_schedule.schedule
103103
if not schedule.queue:
104104
panel.logger.info("--Empty schedule--")
105105
return []

0 commit comments

Comments
 (0)