Skip to content

Commit d0db7d4

Browse files
stovenatorAsk Solem
authored andcommitted
Support exchange_type argument to apply_async(), defaults to Task.exchange_type
1 parent 273f1e4 commit d0db7d4

File tree

2 files changed

+16
-3
lines changed

2 files changed

+16
-3
lines changed

celery/execute/__init__.py

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,10 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
4040
:keyword exchange: The named exchange to send the task to. Defaults to
4141
:attr:`celery.task.base.Task.exchange`.
4242
43+
:keyword exchange_type: The exchange type to initalize the exchange as
44+
if not already declared.
45+
Defaults to :attr:`celery.task.base.Task.exchange_type`.
46+
4347
:keyword immediate: Request immediate delivery. Will raise an exception
4448
if the task cannot be routed to a worker immediately.
4549
(Do not confuse this parameter with the ``countdown`` and ``eta``
@@ -72,8 +76,10 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
7276
task = tasks[task.name] # get instance from registry
7377
options = dict(extract_exec_options(task), **options)
7478
exchange = options.get("exchange")
79+
exchange_type = options.get("exchange_type")
7580

76-
publish = publisher or task.get_publisher(connection, exchange=exchange)
81+
publish = publisher or task.get_publisher(connection, exchange=exchange,
82+
exchange_type=exchange_type)
7783
try:
7884
task_id = publish.delay_task(task.name, args, kwargs, task_id=task_id,
7985
countdown=countdown, eta=eta, **options)
@@ -89,7 +95,10 @@ def send_task(name, args=None, kwargs=None, countdown=None, eta=None,
8995
result_cls=AsyncResult, **options):
9096

9197
exchange = options.get("exchange")
92-
publish = publisher or TaskPublisher(connection, exchange=exchange)
98+
exchange_type = options.get("exchange_type")
99+
100+
publish = publisher or TaskPublisher(connection, exchange=exchange,
101+
exchange_type=exchange_type)
93102
try:
94103
task_id = publish.delay_task(name, args, kwargs, task_id=task_id,
95104
countdown=countdown, eta=eta, **options)

celery/task/base.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -165,6 +165,7 @@ class Task(object):
165165
rate_limit = conf.DEFAULT_RATE_LIMIT
166166
rate_limit_queue_type = Queue
167167
backend = default_backend
168+
exchange_type = conf.DEFAULT_EXCHANGE_TYPE
168169

169170
MaxRetriesExceededError = MaxRetriesExceededError
170171

@@ -208,7 +209,7 @@ def establish_connection(self,
208209

209210
@classmethod
210211
def get_publisher(self, connection=None, exchange=None,
211-
connect_timeout=conf.BROKER_CONNECTION_TIMEOUT):
212+
connect_timeout=conf.BROKER_CONNECTION_TIMEOUT,exchange_type=None):
212213
"""Get a celery task message publisher.
213214
214215
:rtype: :class:`celery.messaging.TaskPublisher`.
@@ -223,9 +224,12 @@ def get_publisher(self, connection=None, exchange=None,
223224
"""
224225
if exchange is None:
225226
exchange = self.exchange
227+
if exchange_type is None:
228+
exchange_type = self.exchange_type
226229
connection = connection or self.establish_connection(connect_timeout)
227230
return TaskPublisher(connection=connection,
228231
exchange=exchange,
232+
exchange_type=exchange_type,
229233
routing_key=self.routing_key)
230234

231235
@classmethod

0 commit comments

Comments
 (0)