Skip to content

Commit 07c589f

Browse files
author
Ask Solem
committed
Added the ability to set an expiry date and time for tasks.
Example: # Task expires after one minute from now. task.apply_async(args, kwargs, expires=datetime.now() + timedelta(minutes=1)
1 parent 03bb80a commit 07c589f

File tree

7 files changed

+68
-26
lines changed

7 files changed

+68
-26
lines changed

celery/execute/__init__.py

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
@with_connection
1818
def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
1919
task_id=None, publisher=None, connection=None, connect_timeout=None,
20-
router=None, **options):
20+
router=None, expires=None, **options):
2121
"""Run a task asynchronously by the celery daemon(s).
2222
2323
:param task: The :class:`~celery.task.base.Task` to run.
@@ -33,9 +33,13 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
3333
the ``immediate`` setting, they are unrelated).
3434
3535
:keyword eta: A :class:`~datetime.datetime` object that describes the
36-
absolute time when the task should execute. May not be specified
37-
if ``countdown`` is also supplied. (Do not confuse this with the
38-
``immediate`` setting, they are unrelated).
36+
absolute time and date of when the task should execute. May not be
37+
specified if ``countdown`` is also supplied. (Do not confuse this
38+
with the ``immediate`` setting, they are unrelated).
39+
40+
:keyword expires: A :class:`~datetime.datetime` object that describes
41+
the absolute time and date of when the task should expire.
42+
The task will not be executed after the expiration time.
3943
4044
:keyword connection: Re-use existing broker connection instead
4145
of establishing a new one. The ``connect_timeout`` argument is
@@ -96,7 +100,8 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
96100
exchange_type=exchange_type)
97101
try:
98102
task_id = publish.delay_task(task.name, args, kwargs, task_id=task_id,
99-
countdown=countdown, eta=eta, **options)
103+
countdown=countdown, eta=eta,
104+
expires=expires, **options)
100105
finally:
101106
publisher or publish.close()
102107

@@ -106,7 +111,7 @@ def apply_async(task, args=None, kwargs=None, countdown=None, eta=None,
106111
@with_connection
107112
def send_task(name, args=None, kwargs=None, countdown=None, eta=None,
108113
task_id=None, publisher=None, connection=None, connect_timeout=None,
109-
result_cls=AsyncResult, **options):
114+
result_cls=AsyncResult, expires=None, **options):
110115
"""Send task by name.
111116
112117
Useful if you don't have access to the :class:`~celery.task.base.Task`
@@ -124,7 +129,8 @@ def send_task(name, args=None, kwargs=None, countdown=None, eta=None,
124129
exchange_type=exchange_type)
125130
try:
126131
task_id = publish.delay_task(name, args, kwargs, task_id=task_id,
127-
countdown=countdown, eta=eta, **options)
132+
countdown=countdown, eta=eta,
133+
expires=expires, **options)
128134
finally:
129135
publisher or publish.close()
130136

celery/messaging.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ def declare(self):
5252
_exchanges_declared.add(self.exchange)
5353

5454
def delay_task(self, task_name, task_args=None, task_kwargs=None,
55-
countdown=None, eta=None, task_id=None, taskset_id=None, **kwargs):
55+
countdown=None, eta=None, task_id=None, taskset_id=None,
56+
expires=None, **kwargs):
5657
"""Delay task for execution by the celery nodes."""
5758

5859
task_id = task_id or gen_unique_id()
@@ -73,6 +74,7 @@ def delay_task(self, task_name, task_args=None, task_kwargs=None,
7374
"kwargs": task_kwargs or {},
7475
"retries": kwargs.get("retries", 0),
7576
"eta": eta and eta.isoformat(),
77+
"expires": expires and expires.isoformat(),
7678
}
7779

7880
if taskset_id:

celery/tests/test_task.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,14 +10,13 @@
1010
from celery import messaging
1111
from celery.task.schedules import crontab, crontab_parser
1212
from celery.utils import timeutils
13-
from celery.utils import gen_unique_id
13+
from celery.utils import gen_unique_id, parse_iso8601
1414
from celery.utils.functional import wraps
1515
from celery.result import EagerResult
1616
from celery.execute import send_task
1717
from celery.backends import default_backend
1818
from celery.decorators import task as task_dec
1919
from celery.exceptions import RetryTaskError
20-
from celery.worker.listener import parse_iso8601
2120

2221
from celery.tests.utils import with_eager_tasks
2322

celery/utils/__init__.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,13 @@
77
except ImportError:
88
ctypes = None
99
import importlib
10+
from datetime import datetime
1011
from uuid import UUID, uuid4, _uuid_generate_random
1112
from inspect import getargspec
1213
from itertools import islice
1314

1415
from carrot.utils import rpartition
16+
from dateutil.parser import parse as parse_iso8601
1517

1618
from celery.utils.compat import all, any, defaultdict
1719
from celery.utils.timeutils import timedelta_seconds # was here before
@@ -98,6 +100,15 @@ def noop(*args, **kwargs):
98100
pass
99101

100102

103+
def maybe_iso8601(dt):
104+
"""``Either datetime | str -> datetime or None -> None``"""
105+
if not dt:
106+
return
107+
if isinstance(dt, datetime):
108+
return dt
109+
return parse_iso8601(dt)
110+
111+
101112
def kwdict(kwargs):
102113
"""Make sure keyword arguments are not in unicode.
103114

celery/worker/job.py

Lines changed: 24 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,16 @@
33
import socket
44
import warnings
55

6+
from datetime import datetime
7+
68
from celery import conf
79
from celery import log
810
from celery import platform
911
from celery.datastructures import ExceptionInfo
1012
from celery.execute.trace import TaskTrace
1113
from celery.loaders import current_loader
1214
from celery.registry import tasks
13-
from celery.utils import noop, kwdict, fun_takes_kwargs
15+
from celery.utils import noop, kwdict, fun_takes_kwargs, maybe_iso8601
1416
from celery.utils.compat import any
1517
from celery.utils.mail import mail_admins
1618
from celery.worker import state
@@ -208,12 +210,14 @@ class TaskRequest(object):
208210
def __init__(self, task_name, task_id, args, kwargs,
209211
on_ack=noop, retries=0, delivery_info=None, hostname=None,
210212
email_subject=None, email_body=None, logger=None,
211-
eventer=None, **opts):
213+
eventer=None, eta=None, expires=None, **opts):
212214
self.task_name = task_name
213215
self.task_id = task_id
214216
self.retries = retries
215217
self.args = args
216218
self.kwargs = kwargs
219+
self.eta = eta
220+
self.expires = expires
217221
self.on_ack = on_ack
218222
self.delivery_info = delivery_info or {}
219223
self.hostname = hostname or socket.gethostname()
@@ -224,9 +228,16 @@ def __init__(self, task_name, task_id, args, kwargs,
224228

225229
self.task = tasks[self.task_name]
226230

231+
def maybe_expire(self):
232+
if self.expires and datetime.now() > self.expires:
233+
state.revoked.add(self.task_id)
234+
self.task.backend.mark_as_revoked(self.task_id)
235+
227236
def revoked(self):
228237
if self._already_revoked:
229238
return True
239+
if self.expires:
240+
self.maybe_expire()
230241
if self.task_id in state.revoked:
231242
self.logger.warn("Skipping revoked task: %s[%s]" % (
232243
self.task_name, self.task_id))
@@ -253,6 +264,8 @@ def from_message(cls, message, message_data, logger=None, eventer=None,
253264
args = message_data["args"]
254265
kwargs = message_data["kwargs"]
255266
retries = message_data.get("retries", 0)
267+
eta = maybe_iso8601(message_data.get("eta"))
268+
expires = maybe_iso8601(message_data.get("expires"))
256269

257270
_delivery_info = getattr(message, "delivery_info", {})
258271
delivery_info = dict((key, _delivery_info.get(key))
@@ -265,7 +278,8 @@ def from_message(cls, message, message_data, logger=None, eventer=None,
265278
return cls(task_name, task_id, args, kwdict(kwargs),
266279
retries=retries, on_ack=message.ack,
267280
delivery_info=delivery_info, logger=logger,
268-
eventer=eventer, hostname=hostname)
281+
eventer=eventer, hostname=hostname,
282+
eta=eta, expires=expires)
269283

270284
def extend_with_default_kwargs(self, loglevel, logfile):
271285
"""Extend the tasks keyword arguments with standard task arguments.
@@ -445,3 +459,10 @@ def info(self, safe=False):
445459
"time_start": self.time_start,
446460
"acknowledged": self.acknowledged,
447461
"delivery_info": self.delivery_info}
462+
463+
def shortinfo(self):
464+
return "%s[%s]%s%s" % (
465+
self.task_name,
466+
self.task_id,
467+
self.eta and " eta:[%s]" % (self.eta, ),
468+
self.expires and " expires:[%s]" % (self.expires, ))

celery/worker/listener.py

Lines changed: 8 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -80,11 +80,10 @@
8080

8181
from datetime import datetime
8282

83-
from dateutil.parser import parse as parse_iso8601
8483
from carrot.connection import AMQPConnectionException
8584

8685
from celery import conf
87-
from celery.utils import noop, retry_over_time
86+
from celery.utils import noop, retry_over_time, maybe_iso8601
8887
from celery.worker.job import TaskRequest, InvalidTaskError
8988
from celery.worker.control import ControlDispatch
9089
from celery.worker.heartbeat import Heart
@@ -249,7 +248,7 @@ def consume_messages(self):
249248
self.qos.update()
250249
wait_for_message()
251250

252-
def on_task(self, task, eta=None):
251+
def on_task(self, task):
253252
"""Handle received task.
254253
255254
If the task has an ``eta`` we enter it into the ETA schedule,
@@ -260,21 +259,17 @@ def on_task(self, task, eta=None):
260259
if task.revoked():
261260
return
262261

262+
self.logger.info("Got task from broker: %s" % (task.shortinfo(), ))
263+
263264
self.event_dispatcher.send("task-received", uuid=task.task_id,
264265
name=task.task_name, args=repr(task.args),
265-
kwargs=repr(task.kwargs), retries=task.retries, eta=eta)
266+
kwargs=repr(task.kwargs), retries=task.retries, eta=task.eta)
266267

267-
if eta:
268-
if not isinstance(eta, datetime):
269-
eta = parse_iso8601(eta)
268+
if task.eta:
270269
self.qos.increment()
271-
self.logger.info("Got task from broker: %s[%s] eta:[%s]" % (
272-
task.task_name, task.task_id, eta))
273-
self.eta_schedule.enter(task, eta=eta,
270+
self.eta_schedule.enter(task, eta=task.eta,
274271
callback=self.qos.decrement_eventually)
275272
else:
276-
self.logger.info("Got task from broker: %s[%s]" % (
277-
task.task_name, task.task_id))
278273
self.ready_queue.put(task)
279274

280275
def on_control(self, control):
@@ -300,7 +295,7 @@ def receive_message(self, message_data, message):
300295
str(exc), message_data))
301296
message.ack()
302297
else:
303-
self.on_task(task, eta=message_data.get("eta"))
298+
self.on_task(task)
304299
return
305300

306301
# Handle control command

docs/internals/protocol.rst

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,14 @@ Message format
4242
format. If not provided the message is not scheduled, but will be
4343
executed asap.
4444

45+
* expires (introduced after v2.0.2)
46+
``string`` (ISO 8601)
47+
48+
Expiration date. This is the date and time in ISO 8601 format.
49+
If not provided the message will never expire. The message
50+
will be expired when the message is received and the expiration date
51+
has been exceeded.
52+
4553
Example message
4654
===============
4755

0 commit comments

Comments
 (0)