Skip to content

Commit 73ea354

Browse files
committed
Tests passing
1 parent 632d80f commit 73ea354

File tree

6 files changed

+207
-80
lines changed

6 files changed

+207
-80
lines changed

celery/app/task.py

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -778,6 +778,21 @@ def update_state(self, task_id=None, state=None, meta=None):
778778
task_id = self.request.id
779779
self.backend.store_result(task_id, meta, state)
780780

781+
def on_success(self, retval, task_id, args, kwargs):
782+
"""Success handler.
783+
784+
Run by the worker if the task executes successfully.
785+
786+
:param retval: The return value of the task.
787+
:param task_id: Unique id of the executed task.
788+
:param args: Original arguments for the executed task.
789+
:param kwargs: Original keyword arguments for the executed task.
790+
791+
The return value of this handler is ignored.
792+
793+
"""
794+
pass
795+
781796
def on_retry(self, exc, task_id, args, kwargs, einfo):
782797
"""Retry handler.
783798
@@ -815,6 +830,24 @@ def on_failure(self, exc, task_id, args, kwargs, einfo):
815830
"""
816831
pass
817832

833+
def after_return(self, status, retval, task_id, args, kwargs, einfo):
834+
"""Handler called after the task returns.
835+
836+
:param status: Current task state.
837+
:param retval: Task return value/exception.
838+
:param task_id: Unique id of the task.
839+
:param args: Original arguments for the task that failed.
840+
:param kwargs: Original keyword arguments for the task
841+
that failed.
842+
843+
:keyword einfo: :class:`~celery.datastructures.ExceptionInfo`
844+
instance, containing the traceback (if any).
845+
846+
The return value of this handler is ignored.
847+
848+
"""
849+
pass
850+
818851
def send_error_email(self, context, exc, **kwargs):
819852
if self.send_error_emails and not self.disable_error_emails:
820853
self.ErrorMail(self, **kwargs).send(context, exc)

celery/task/trace.py

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -65,10 +65,10 @@ def mro_lookup(cls, attr, stop=()):
6565
return node
6666

6767

68-
def defines_custom_call(task):
68+
def task_has_custom(task, attr):
6969
"""Returns true if the task or one of its bases
70-
defines __call__ (excluding the one in BaseTask)."""
71-
return mro_lookup(task.__class__, "__call__", stop=(BaseTask, object))
70+
defines ``attr`` (excluding the one in BaseTask)."""
71+
return mro_lookup(task.__class__, attr, stop=(BaseTask, object))
7272

7373

7474
class TraceInfo(object):
@@ -157,7 +157,7 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
157157
# If the task doesn't define a custom __call__ method
158158
# we optimize it away by simply calling the run method directly,
159159
# saving the extra method call and a line less in the stack trace.
160-
fun = task if defines_custom_call(task) else task.run
160+
fun = task if task_has_custom(task, "__call__") else task.run
161161

162162
loader = loader or current_app.loader
163163
backend = task.backend
@@ -170,8 +170,12 @@ def build_tracer(name, task, loader=None, hostname=None, store_errors=True,
170170
loader_task_init = loader.on_task_init
171171
loader_cleanup = loader.on_process_cleanup
172172

173-
task_on_success = getattr(task, "on_success", None)
174-
task_after_return = getattr(task, "after_return", None)
173+
task_on_success = None
174+
task_after_return = None
175+
if task_has_custom(task, "on_success"):
176+
task_on_success = task.on_success
177+
if task_has_custom(task, "after_return"):
178+
task_after_return = task.after_return
175179

176180
store_result = backend.store_result
177181
backend_cleanup = backend.process_cleanup

celery/tests/bin/test_celeryd.py

Lines changed: 128 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from celery import current_app
2020
from celery.apps import worker as cd
2121
from celery.bin.celeryd import WorkerCommand, main as celeryd_main
22-
from celery.exceptions import ImproperlyConfigured
22+
from celery.exceptions import ImproperlyConfigured, SystemTerminate
2323
from celery.utils.log import ensure_process_aware_logger
2424
from celery.worker import state
2525

@@ -32,12 +32,17 @@ def disable_stdouts(fun):
3232

3333
@wraps(fun)
3434
def disable(*args, **kwargs):
35-
sys.stdout, sys.stderr = WhateverIO(), WhateverIO()
35+
prev_out, prev_err = sys.stdout, sys.stderr
36+
prev_rout, prev_rerr = sys.__stdout__, sys.__stderr__
37+
sys.stdout = sys.__stdout__ = WhateverIO()
38+
sys.stderr = sys.__stderr__ = WhateverIO()
3639
try:
3740
return fun(*args, **kwargs)
3841
finally:
39-
sys.stdout = sys.__stdout__
40-
sys.stderr = sys.__stderr__
42+
sys.stdout = prev_out
43+
sys.stderr = prev_err
44+
sys.__stdout__ = prev_rout
45+
sys.__stderr__ = prev_rerr
4146

4247
return disable
4348

@@ -58,6 +63,9 @@ class Worker(cd.Worker):
5863
class test_Worker(AppCase):
5964
Worker = Worker
6065

66+
def teardown(self):
67+
self.app.conf.CELERY_INCLUDE = ()
68+
6169
@disable_stdouts
6270
def test_queues_string(self):
6371
celery = Celery(set_as_current=False)
@@ -402,19 +410,33 @@ class Signals(platforms.Signals):
402410
def __setitem__(self, sig, handler):
403411
next_handlers[sig] = handler
404412

405-
p, platforms.signals = platforms.signals, Signals()
406-
try:
407-
handlers["SIGINT"]("SIGINT", object())
408-
self.assertTrue(state.should_stop)
409-
finally:
410-
platforms.signals = p
411-
state.should_stop = False
413+
with patch("celery.apps.worker.active_thread_count") as c:
414+
c.return_value = 3
415+
p, platforms.signals = platforms.signals, Signals()
416+
try:
417+
handlers["SIGINT"]("SIGINT", object())
418+
self.assertTrue(state.should_stop)
419+
finally:
420+
platforms.signals = p
421+
state.should_stop = False
412422

413-
try:
414-
next_handlers["SIGINT"]("SIGINT", object())
415-
self.assertTrue(state.should_terminate)
416-
finally:
417-
state.should_terminate = False
423+
try:
424+
next_handlers["SIGINT"]("SIGINT", object())
425+
self.assertTrue(state.should_terminate)
426+
finally:
427+
state.should_terminate = False
428+
429+
with patch("celery.apps.worker.active_thread_count") as c:
430+
c.return_value = 1
431+
p, platforms.signals = platforms.signals, Signals()
432+
try:
433+
with self.assertRaises(SystemExit):
434+
handlers["SIGINT"]("SIGINT", object())
435+
finally:
436+
platforms.signals = p
437+
438+
with self.assertRaises(SystemTerminate):
439+
next_handlers["SIGINT"]("SIGINT", object())
418440

419441
@disable_stdouts
420442
def test_worker_int_handler_only_stop_MainProcess(self):
@@ -424,14 +446,27 @@ def test_worker_int_handler_only_stop_MainProcess(self):
424446
raise SkipTest("only relevant for multiprocessing")
425447
process = current_process()
426448
name, process.name = process.name, "OtherProcess"
427-
try:
428-
worker = self._Worker()
429-
handlers = self.psig(cd.install_worker_int_handler, worker)
430-
handlers["SIGINT"]("SIGINT", object())
431-
self.assertTrue(state.should_stop)
432-
finally:
433-
process.name = name
434-
state.should_stop = False
449+
with patch("celery.apps.worker.active_thread_count") as c:
450+
c.return_value = 3
451+
try:
452+
worker = self._Worker()
453+
handlers = self.psig(cd.install_worker_int_handler, worker)
454+
handlers["SIGINT"]("SIGINT", object())
455+
self.assertTrue(state.should_stop)
456+
finally:
457+
process.name = name
458+
state.should_stop = False
459+
460+
with patch("celery.apps.worker.active_thread_count") as c:
461+
c.return_value = 1
462+
try:
463+
worker = self._Worker()
464+
handlers = self.psig(cd.install_worker_int_handler, worker)
465+
with self.assertRaises(SystemExit):
466+
handlers["SIGINT"]("SIGINT", object())
467+
finally:
468+
process.name = name
469+
state.should_stop = False
435470

436471
@disable_stdouts
437472
def test_install_HUP_not_supported_handler(self):
@@ -448,25 +483,49 @@ def test_worker_term_hard_handler_only_stop_MainProcess(self):
448483
process = current_process()
449484
name, process.name = process.name, "OtherProcess"
450485
try:
486+
with patch("celery.apps.worker.active_thread_count") as c:
487+
c.return_value = 3
488+
worker = self._Worker()
489+
handlers = self.psig(
490+
cd.install_worker_term_hard_handler, worker)
491+
try:
492+
handlers["SIGQUIT"]("SIGQUIT", object())
493+
self.assertTrue(state.should_terminate)
494+
finally:
495+
state.should_terminate = False
496+
with patch("celery.apps.worker.active_thread_count") as c:
497+
c.return_value = 1
498+
worker = self._Worker()
499+
handlers = self.psig(
500+
cd.install_worker_term_hard_handler, worker)
501+
with self.assertRaises(SystemTerminate):
502+
handlers["SIGQUIT"]("SIGQUIT", object())
503+
finally:
504+
process.name = name
505+
506+
@disable_stdouts
507+
def test_worker_term_handler_when_threads(self):
508+
with patch("celery.apps.worker.active_thread_count") as c:
509+
c.return_value = 3
451510
worker = self._Worker()
452-
handlers = self.psig(cd.install_worker_term_hard_handler, worker)
511+
handlers = self.psig(cd.install_worker_term_handler, worker)
453512
try:
454-
handlers["SIGQUIT"]("SIGQUIT", object())
455-
self.assertTrue(state.should_terminate)
513+
handlers["SIGTERM"]("SIGTERM", object())
514+
self.assertTrue(state.should_stop)
456515
finally:
457-
state.should_terminate = False
458-
finally:
459-
process.name = name
516+
state.should_stop = False
460517

461518
@disable_stdouts
462-
def test_worker_term_handler(self):
463-
worker = self._Worker()
464-
handlers = self.psig(cd.install_worker_term_handler, worker)
465-
try:
466-
handlers["SIGTERM"]("SIGTERM", object())
467-
self.assertTrue(state.should_stop)
468-
finally:
469-
state.should_stop = False
519+
def test_worker_term_handler_when_single_thread(self):
520+
with patch("celery.apps.worker.active_thread_count") as c:
521+
c.return_value = 1
522+
worker = self._Worker()
523+
handlers = self.psig(cd.install_worker_term_handler, worker)
524+
try:
525+
with self.assertRaises(SystemExit):
526+
handlers["SIGTERM"]("SIGTERM", object())
527+
finally:
528+
state.should_stop = False
470529

471530
@patch("sys.__stderr__")
472531
def test_worker_cry_handler(self, stderr):
@@ -490,10 +549,18 @@ def test_worker_term_handler_only_stop_MainProcess(self):
490549
process = current_process()
491550
name, process.name = process.name, "OtherProcess"
492551
try:
493-
worker = self._Worker()
494-
handlers = self.psig(cd.install_worker_term_handler, worker)
495-
handlers["SIGTERM"]("SIGTERM", object())
496-
self.assertTrue(state.should_stop)
552+
with patch("celery.apps.worker.active_thread_count") as c:
553+
c.return_value = 3
554+
worker = self._Worker()
555+
handlers = self.psig(cd.install_worker_term_handler, worker)
556+
handlers["SIGTERM"]("SIGTERM", object())
557+
self.assertTrue(state.should_stop)
558+
with patch("celery.apps.worker.active_thread_count") as c:
559+
c.return_value = 1
560+
worker = self._Worker()
561+
handlers = self.psig(cd.install_worker_term_handler, worker)
562+
with self.assertRaises(SystemExit):
563+
handlers["SIGTERM"]("SIGTERM", object())
497564
finally:
498565
process.name = name
499566
state.should_stop = False
@@ -521,11 +588,22 @@ def _execv(*args):
521588
state.should_stop = False
522589

523590
@disable_stdouts
524-
def test_worker_term_hard_handler(self):
525-
worker = self._Worker()
526-
handlers = self.psig(cd.install_worker_term_hard_handler, worker)
527-
try:
528-
handlers["SIGQUIT"]("SIGQUIT", object())
529-
self.assertTrue(state.should_terminate)
530-
finally:
531-
state.should_terminate = False
591+
def test_worker_term_hard_handler_when_threaded(self):
592+
with patch("celery.apps.worker.active_thread_count") as c:
593+
c.return_value = 3
594+
worker = self._Worker()
595+
handlers = self.psig(cd.install_worker_term_hard_handler, worker)
596+
try:
597+
handlers["SIGQUIT"]("SIGQUIT", object())
598+
self.assertTrue(state.should_terminate)
599+
finally:
600+
state.should_terminate = False
601+
602+
@disable_stdouts
603+
def test_worker_term_hard_handler_when_single_threaded(self):
604+
with patch("celery.apps.worker.active_thread_count") as c:
605+
c.return_value = 1
606+
worker = self._Worker()
607+
handlers = self.psig(cd.install_worker_term_hard_handler, worker)
608+
with self.assertRaises(SystemTerminate):
609+
handlers["SIGQUIT"]("SIGQUIT", object())

celery/tests/utilities/test_timer2.py

Lines changed: 27 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -68,12 +68,12 @@ class test_Timer(Case):
6868
@skip_if_quick
6969
def test_enter_after(self):
7070
t = timer2.Timer()
71-
done = [False]
71+
try:
72+
done = [False]
7273

73-
def set_done():
74-
done[0] = True
74+
def set_done():
75+
done[0] = True
7576

76-
try:
7777
t.apply_after(300, set_done)
7878
while not done[0]:
7979
time.sleep(0.1)
@@ -88,25 +88,29 @@ def test_exit_after(self):
8888

8989
def test_apply_interval(self):
9090
t = timer2.Timer()
91-
t.schedule.enter_after = Mock()
92-
93-
myfun = Mock()
94-
t.apply_interval(30, myfun)
95-
96-
self.assertEqual(t.schedule.enter_after.call_count, 1)
97-
args1, _ = t.schedule.enter_after.call_args_list[0]
98-
msec1, tref1, _ = args1
99-
self.assertEqual(msec1, 30)
100-
tref1()
101-
102-
self.assertEqual(t.schedule.enter_after.call_count, 2)
103-
args2, _ = t.schedule.enter_after.call_args_list[1]
104-
msec2, tref2, _ = args2
105-
self.assertEqual(msec2, 30)
106-
tref2.cancelled = True
107-
tref2()
108-
109-
self.assertEqual(t.schedule.enter_after.call_count, 2)
91+
try:
92+
t.schedule.enter_after = Mock()
93+
94+
myfun = Mock()
95+
myfun.__name__ = "myfun"
96+
t.apply_interval(30, myfun)
97+
98+
self.assertEqual(t.schedule.enter_after.call_count, 1)
99+
args1, _ = t.schedule.enter_after.call_args_list[0]
100+
msec1, tref1, _ = args1
101+
self.assertEqual(msec1, 30)
102+
tref1()
103+
104+
self.assertEqual(t.schedule.enter_after.call_count, 2)
105+
args2, _ = t.schedule.enter_after.call_args_list[1]
106+
msec2, tref2, _ = args2
107+
self.assertEqual(msec2, 30)
108+
tref2.cancelled = True
109+
tref2()
110+
111+
self.assertEqual(t.schedule.enter_after.call_count, 2)
112+
finally:
113+
t.stop()
110114

111115
@patch("celery.utils.timer2.logger")
112116
def test_apply_entry_error_handled(self, logger):

0 commit comments

Comments
 (0)