Skip to content

Commit 127fb65

Browse files
committed
Canvas should not use current_app, instead take the app from the first task. Closes celery#1249
1 parent 978db4c commit 127fb65

File tree

1 file changed

+23
-17
lines changed

1 file changed

+23
-17
lines changed

celery/canvas.py

Lines changed: 23 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
from kombu.utils import cached_property, fxrange, kwdict, reprcall, uuid
2020

2121
from celery import current_app
22-
from celery.local import Proxy
2322
from celery.utils.compat import chain_from_iterable
2423
from celery.result import AsyncResult, GroupResult
2524
from celery.utils.functional import (
@@ -28,8 +27,6 @@
2827
)
2928
from celery.utils.text import truncate
3029

31-
Chord = Proxy(lambda: current_app.tasks['celery.chord'])
32-
3330

3431
class _getitem_property(object):
3532
"""Attribute -> dict key descriptor.
@@ -278,7 +275,8 @@ def __init__(self, *tasks, **options):
278275
self.subtask_type = 'chain'
279276

280277
def __call__(self, *args, **kwargs):
281-
return self.apply_async(args, kwargs)
278+
if self.tasks:
279+
return self.apply_async(args, kwargs)
282280

283281
@classmethod
284282
def from_dict(self, d):
@@ -288,6 +286,10 @@ def from_dict(self, d):
288286
tasks[0]['args'] = d['args'] + tasks[0]['args']
289287
return chain(*d['kwargs']['tasks'], **kwdict(d['options']))
290288

289+
@property
290+
def type(self):
291+
return self._type or self.tasks[0].type
292+
291293
def __repr__(self):
292294
return ' | '.join(map(repr, self.tasks))
293295
Signature.register_type(chain)
@@ -394,10 +396,16 @@ def from_dict(self, d):
394396
return group(tasks, **kwdict(d['options']))
395397

396398
def __call__(self, *partial_args, **options):
397-
tasks, result, gid, args = self.type.prepare(
398-
options, map(Signature.clone, self.tasks), partial_args,
399-
)
400-
return self.type(tasks, result, gid, args)
399+
tasks = [task.clone() for task in self.tasks]
400+
if not tasks:
401+
return
402+
# taking the app from the first task in the list,
403+
# there may be a better solution to this, e.g.
404+
# consolidate tasks with the same app and apply them in
405+
# batches.
406+
type = tasks[0].type.app.tasks[self['task']]
407+
tasks, result, gid, args = type.prepare(options, tasks, partial_args)
408+
return type(*type.prepare(options, tasks, partial_args))
401409

402410
def _freeze(self, _id=None):
403411
opts = self.options
@@ -428,7 +436,6 @@ def __repr__(self):
428436

429437

430438
class chord(Signature):
431-
Chord = Chord
432439

433440
def __init__(self, header, body=None, task='celery.chord',
434441
args=(), kwargs={}, **options):
@@ -450,8 +457,12 @@ def _unpack_args(header=None, body=None, **kwargs):
450457
# than manually popping things off.
451458
return (header, body), kwargs
452459

460+
@property
461+
def type(self):
462+
return self._type or self.tasks[0].type
463+
453464
def __call__(self, body=None, **kwargs):
454-
_chord = self.Chord
465+
_chord = self.type.app.tasks['celery.chord']
455466
body = (body or self.kwargs['body']).clone()
456467
kwargs = dict(self.kwargs, body=body, **kwargs)
457468
if _chord.app.conf.CELERY_ALWAYS_EAGER:
@@ -481,13 +492,8 @@ def __repr__(self):
481492
return self.body.reprcall(self.tasks)
482493
return '<chord without body: %r>' % (self.tasks, )
483494

484-
@property
485-
def tasks(self):
486-
return self.kwargs['header']
487-
488-
@property
489-
def body(self):
490-
return self.kwargs.get('body')
495+
tasks = _getitem_property('kwargs.header')
496+
body = _getitem_property('kwargs.body')
491497
Signature.register_type(chord)
492498

493499

0 commit comments

Comments
 (0)