Skip to content

Commit 6cf4f40

Browse files
author
Omer Katz
authored
Raise proper error when replacing with an empty chain. (celery#6452)
Fixes celery#6451.
1 parent 8fee0bf commit 6cf4f40

File tree

3 files changed

+23
-5
lines changed

3 files changed

+23
-5
lines changed

celery/app/task.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88

99
from celery import current_app, group, states
1010
from celery._state import _task_stack
11-
from celery.canvas import signature
11+
from celery.canvas import _chain, signature
1212
from celery.exceptions import (Ignore, ImproperlyConfigured,
1313
MaxRetriesExceededError, Reject, Retry)
1414
from celery.local import class_property
@@ -882,6 +882,11 @@ def replace(self, sig):
882882
link=self.request.callbacks,
883883
link_error=self.request.errbacks,
884884
)
885+
elif isinstance(sig, _chain):
886+
if not sig.tasks:
887+
raise ImproperlyConfigured(
888+
"Cannot replace with an empty chain"
889+
)
885890

886891
if self.request.chain:
887892
# We need to freeze the new signature with the current task's ID to

t/integration/tasks.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,11 @@ def replace_with_chain_which_raises(self, *args, link_msg=None):
100100
return self.replace(c)
101101

102102

103+
@shared_task(bind=True)
104+
def replace_with_empty_chain(self, *_):
105+
return self.replace(chain())
106+
107+
103108
@shared_task(bind=True)
104109
def add_to_all(self, nums, val):
105110
"""Add the given value to all supplied numbers."""

t/integration/test_canvas.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66

77
from celery import chain, chord, group, signature
88
from celery.backends.base import BaseKeyValueStoreBackend
9-
from celery.exceptions import TimeoutError
9+
from celery.exceptions import ImproperlyConfigured, TimeoutError
1010
from celery.result import AsyncResult, GroupResult, ResultSet
1111

1212
from . import tasks
@@ -15,9 +15,10 @@
1515
add_to_all, add_to_all_to_chord, build_chain_inside_task,
1616
chord_error, collect_ids, delayed_sum,
1717
delayed_sum_with_soft_guard, fail, identity, ids,
18-
print_unicode, raise_error, redis_echo, retry_once,
19-
return_exception, return_priority, second_order_replace1,
20-
tsum, replace_with_chain, replace_with_chain_which_raises)
18+
print_unicode, raise_error, redis_echo,
19+
replace_with_chain, replace_with_chain_which_raises,
20+
replace_with_empty_chain, retry_once, return_exception,
21+
return_priority, second_order_replace1, tsum)
2122

2223
RETRYABLE_EXCEPTIONS = (OSError, ConnectionError, TimeoutError)
2324

@@ -584,6 +585,13 @@ def test_chain_with_eb_replaced_with_chain_with_eb(self, manager):
584585
assert redis_connection.blpop('redis-echo', min(1, TIMEOUT)) is None
585586
redis_connection.delete('redis-echo')
586587

588+
def test_replace_chain_with_empty_chain(self, manager):
589+
r = chain(identity.s(1), replace_with_empty_chain.s()).delay()
590+
591+
with pytest.raises(ImproperlyConfigured,
592+
match="Cannot replace with an empty chain"):
593+
r.get(timeout=TIMEOUT)
594+
587595

588596
class test_result_set:
589597

0 commit comments

Comments
 (0)