Skip to content

Commit 01ba385

Browse files
daviddavisOmer Katz
authored andcommitted
Adding ability to retry signal receiver after raised exception (celery#4192)
1 parent 215376d commit 01ba385

File tree

3 files changed

+76
-2
lines changed

3 files changed

+76
-2
lines changed

CONTRIBUTORS.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -243,5 +243,6 @@ Samuel Dion-Girardeau, 2017/05/29
243243
Aydin Sen, 2017/06/14
244244
Preston Moore, 2017/06/18
245245
Nicolas Mota, 2017/08/10
246+
David Davis, 2017/08/11
246247
Martial Pageau, 2017/08/16
247248
Sammie S. Taunton, 2017/08/17

celery/utils/dispatch/signal.py

Lines changed: 46 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,10 +5,12 @@
55
import threading
66
import weakref
77
import warnings
8+
from kombu.utils.functional import retry_over_time
89
from celery.exceptions import CDeprecationWarning
910
from celery.five import python_2_unicode_compatible, range, text_t
1011
from celery.local import PromiseProxy, Proxy
1112
from celery.utils.functional import fun_accepts_kwargs
13+
from celery.utils.time import humanize_seconds
1214
from celery.utils.log import get_logger
1315
try:
1416
from weakref import WeakMethod
@@ -36,6 +38,10 @@ def _make_id(target): # pragma: no cover
3638

3739
NO_RECEIVERS = object()
3840

41+
RECEIVER_RETRY_ERROR = """\
42+
Could not process signal receiver %(receiver)s. Retrying %(when)s...\
43+
"""
44+
3945

4046
@python_2_unicode_compatible
4147
class Signal(object): # pragma: no cover
@@ -103,12 +109,49 @@ def connect(self, *args, **kwargs):
103109
dispatch_uid (Hashable): An identifier used to uniquely identify a
104110
particular instance of a receiver. This will usually be a
105111
string, though it may be anything hashable.
112+
113+
retry (bool): If the signal receiver raises an exception
114+
(e.g. ConnectionError), the receiver will be retried until it
115+
runs successfully. A strong ref to the receiver will be stored
116+
and the `weak` option will be ignored.
106117
"""
107-
def _handle_options(sender=None, weak=True, dispatch_uid=None):
118+
def _handle_options(sender=None, weak=True, dispatch_uid=None,
119+
retry=False):
108120

109121
def _connect_signal(fun):
110-
self._connect_signal(fun, sender, weak, dispatch_uid)
122+
123+
options = {'dispatch_uid': dispatch_uid,
124+
'weak': weak}
125+
126+
def _retry_receiver(retry_fun):
127+
128+
def _try_receiver_over_time(*args, **kwargs):
129+
def on_error(exc, intervals, retries):
130+
interval = next(intervals)
131+
err_msg = RECEIVER_RETRY_ERROR % \
132+
{'receiver': retry_fun,
133+
'when': humanize_seconds(interval, 'in', ' ')}
134+
logger.error(err_msg)
135+
return interval
136+
137+
return retry_over_time(retry_fun, Exception, args,
138+
kwargs, on_error)
139+
140+
return _try_receiver_over_time
141+
142+
if retry:
143+
options['weak'] = False
144+
if not dispatch_uid:
145+
# if there's no dispatch_uid then we need to set the
146+
# dispatch uid to the original func id so we can look
147+
# it up later with the original func id
148+
options['dispatch_uid'] = _make_id(fun)
149+
fun = _retry_receiver(fun)
150+
151+
self._connect_signal(fun, sender, options['weak'],
152+
options['dispatch_uid'])
111153
return fun
154+
112155
return _connect_signal
113156

114157
if args and callable(args[0]):
@@ -158,6 +201,7 @@ def _connect_signal(self, receiver, sender, weak, dispatch_uid):
158201
else:
159202
self.receivers.append((lookup_key, receiver))
160203
self.sender_receivers_cache.clear()
204+
161205
return receiver
162206

163207
def disconnect(self, receiver=None, sender=None, weak=None,

t/unit/utils/test_dispatcher.py

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,3 +143,32 @@ def test_disconnection(self):
143143
finally:
144144
a_signal.disconnect(receiver_3)
145145
self._testIsClean(a_signal)
146+
147+
def test_retry(self):
148+
149+
class non_local:
150+
counter = 1
151+
152+
def succeeds_eventually(val, **kwargs):
153+
non_local.counter += 1
154+
if non_local.counter < 3:
155+
raise ValueError('this')
156+
157+
return val
158+
159+
a_signal.connect(succeeds_eventually, sender=self, retry=True)
160+
try:
161+
result = a_signal.send(sender=self, val='test')
162+
assert non_local.counter == 3
163+
assert result[0][1] == 'test'
164+
finally:
165+
a_signal.disconnect(succeeds_eventually, sender=self)
166+
self._testIsClean(a_signal)
167+
168+
def test_retry_with_dispatch_uid(self):
169+
uid = 'abc123'
170+
a_signal.connect(receiver_1_arg, sender=self, retry=True,
171+
dispatch_uid=uid)
172+
assert a_signal.receivers[0][0][0] == uid
173+
a_signal.disconnect(receiver_1_arg, sender=self, dispatch_uid=uid)
174+
self._testIsClean(a_signal)

0 commit comments

Comments
 (0)