Skip to content

Commit f508afb

Browse files
author
Ask Solem
committed
Periodic Tasks deprecated in favor of CELERYBEAT_SCHEDULE setting.
Example:: CELERYBEAT_SCHEDULE = { "add.often": dict(name="tasks.add", schedule=timedelta(seconds=10), args=(), kwargs={}, options={}), }
1 parent 0ced98a commit f508afb

File tree

5 files changed

+281
-265
lines changed

5 files changed

+281
-265
lines changed

celery/beat.py

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@
77
import shelve
88
import threading
99
import multiprocessing
10-
from datetime import datetime
10+
from datetime import datetime, timedelta
1111
from UserDict import UserDict
1212

1313
from celery import log
1414
from celery import conf
1515
from celery import platform
1616
from celery.execute import send_task
17+
from celery.schedules import schedule
1718
from celery.messaging import establish_connection
1819
from celery.utils.info import humanize_seconds
1920

@@ -159,15 +160,24 @@ def apply_async(self, entry, **kwargs):
159160
entry.name, exc))
160161
return result
161162

163+
def maybe_schedule(self, s, relative=False):
164+
if isinstance(s, int):
165+
return timedelta(seconds=s)
166+
if isinstance(s, timedelta):
167+
return schedule(s, relative)
168+
return s
169+
162170
def setup_schedule(self):
163-
from datetime import timedelta
164-
from celery.task.schedules import schedule
165-
self.schedule["add.often"] = ScheduleEntry("tasks.add",
166-
schedule(timedelta(seconds=5)),
167-
args=(4, 4))
168-
self.schedule["sleep.often"] = ScheduleEntry("tasks.sleeptask",
169-
schedule(timedelta(minutes=1)),
170-
args=(2, ))
171+
self.schedule = self.dict_to_entries(conf.CELERYBEAT_SCHEDULE)
172+
173+
def dict_to_entries(self, dict_):
174+
entries = {}
175+
for name, entry in dict_.items():
176+
relative = entry.pop("relative", None)
177+
entry["schedule"] = self.maybe_schedule(entry["schedule"],
178+
relative)
179+
entries[name] = ScheduleEntry(**entry)
180+
return entries
171181

172182
def cleanup(self):
173183
pass
@@ -183,25 +193,23 @@ class ClockService(object):
183193

184194
def __init__(self, logger=None,
185195
max_interval=conf.CELERYBEAT_MAX_LOOP_INTERVAL,
196+
schedule=conf.CELERYBEAT_SCHEDULE,
186197
schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME):
187198
self.logger = logger or log.get_default_logger()
188199
self.max_interval = max_interval
189-
self.schedule_filename = schedule_filename
190200
self._shutdown = threading.Event()
191201
self._stopped = threading.Event()
192-
self._schedule = None
202+
self.schedule = schedule
193203
self._scheduler = None
194-
self._in_sync = False
195204
silence = self.max_interval < 60 and 10 or 1
196205
self.debug = log.SilenceRepeated(self.logger.debug,
197206
max_iterations=silence)
198207

199208
def start(self, embedded_process=False):
200-
self.logger.info("ClockService: Starting...")
201-
self.logger.debug("ClockService: "
202-
"Ticking with max interval->%s, schedule->%s" % (
203-
humanize_seconds(self.max_interval),
204-
self.schedule_filename))
209+
self.logger.info("Celerybeat: Starting...")
210+
self.logger.debug("Celerybeat: "
211+
"Ticking with max interval->%s" % (
212+
humanize_seconds(self.max_interval)))
205213

206214
if embedded_process:
207215
platform.set_process_title("celerybeat")
@@ -212,7 +220,7 @@ def start(self, embedded_process=False):
212220
if self._shutdown.isSet():
213221
break
214222
interval = self.scheduler.tick()
215-
self.debug("ClockService: Waking up %s." % (
223+
self.debug("Celerybeat: Waking up %s." % (
216224
humanize_seconds(interval, prefix="in ")))
217225
time.sleep(interval)
218226
except (KeyboardInterrupt, SystemExit):
@@ -221,25 +229,13 @@ def start(self, embedded_process=False):
221229
self.sync()
222230

223231
def sync(self):
224-
if self._schedule is not None and not self._in_sync:
225-
self.logger.debug("ClockService: Syncing schedule to disk...")
226-
self._schedule.sync()
227-
self._schedule.close()
228-
self._in_sync = True
229-
self._stopped.set()
232+
self._stopped.set()
230233

231234
def stop(self, wait=False):
232-
self.logger.info("ClockService: Shutting down...")
235+
self.logger.info("Celerybeat: Shutting down...")
233236
self._shutdown.set()
234237
wait and self._stopped.wait() # block until shutdown done.
235238

236-
@property
237-
def schedule(self):
238-
if self._schedule is None:
239-
filename = self.schedule_filename
240-
self._schedule = self.open_schedule(filename=filename)
241-
return self._schedule
242-
243239
@property
244240
def scheduler(self):
245241
if self._scheduler is None:

celery/conf.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
"CELERYD_LOG_COLOR": False,
5757
"CELERYD_LOG_LEVEL": "WARN",
5858
"CELERYD_LOG_FILE": None, # stderr
59+
"CELERYBEAT_SCHEDULE": {},
5960
"CELERYBEAT_SCHEDULE_FILENAME": "celerybeat-schedule",
6061
"CELERYBEAT_MAX_LOOP_INTERVAL": 5 * 60, # five minutes.
6162
"CELERYBEAT_LOG_LEVEL": "INFO",
@@ -196,6 +197,7 @@ def _get(name, default=None, compat=None):
196197
# :--- Celery Beat <- -- --- - ----- -- #
197198
CELERYBEAT_LOG_LEVEL = _get("CELERYBEAT_LOG_LEVEL")
198199
CELERYBEAT_LOG_FILE = _get("CELERYBEAT_LOG_FILE")
200+
CELERYBEAT_SCHEDULE = _get("CELERYBEAT_SCHEDULE")
199201
CELERYBEAT_SCHEDULE_FILENAME = _get("CELERYBEAT_SCHEDULE_FILENAME")
200202
CELERYBEAT_MAX_LOOP_INTERVAL = _get("CELERYBEAT_MAX_LOOP_INTERVAL")
201203

celery/schedules.py

Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
from datetime import datetime
2+
from pyparsing import Word, Literal, ZeroOrMore, Optional, Group, StringEnd, alphas
3+
4+
from celery.utils import is_iterable
5+
from celery.utils.timeutils import timedelta_seconds, weekday, remaining
6+
7+
__all__ = ["schedule", "crontab"]
8+
9+
10+
class schedule(object):
11+
relative = False
12+
13+
def __init__(self, run_every=None, relative=False):
14+
self.run_every = run_every
15+
self.relative = relative
16+
17+
def remaining_estimate(self, last_run_at):
18+
"""Returns when the periodic task should run next as a timedelta."""
19+
return remaining(last_run_at, self.run_every, relative=self.relative)
20+
21+
def is_due(self, last_run_at):
22+
"""Returns tuple of two items ``(is_due, next_time_to_run)``,
23+
where next time to run is in seconds.
24+
25+
See :meth:`celery.task.base.PeriodicTask.is_due` for more information.
26+
27+
"""
28+
rem_delta = self.remaining_estimate(last_run_at)
29+
rem = timedelta_seconds(rem_delta)
30+
if rem == 0:
31+
return True, timedelta_seconds(self.run_every)
32+
return False, rem
33+
34+
35+
class crontab_parser(object):
36+
"""Parser for crontab expressions. Any expression of the form 'groups' (see
37+
BNF grammar below) is accepted and expanded to a set of numbers. These
38+
numbers represent the units of time that the crontab needs to run on::
39+
40+
digit :: '0'..'9'
41+
dow :: 'a'..'z'
42+
number :: digit+ | dow+
43+
steps :: number
44+
range :: number ( '-' number ) ?
45+
numspec :: '*' | range
46+
expr :: numspec ( '/' steps ) ?
47+
groups :: expr ( ',' expr ) *
48+
49+
The parser is a general purpose one, useful for parsing hours, minutes and
50+
day_of_week expressions. Example usage::
51+
52+
minutes = crontab_parser(60).parse("*/15") # yields [0,15,30,45]
53+
hours = crontab_parser(24).parse("*/4") # yields [0,4,8,12,16,20]
54+
day_of_week = crontab_parser(7).parse("*") # yields [0,1,2,3,4,5,6]
55+
56+
"""
57+
58+
def __init__(self, max_=60):
59+
# define the grammar structure
60+
digits = "0123456789"
61+
star = Literal('*')
62+
number = Word(digits) | Word(alphas)
63+
steps = number
64+
range_ = number + Optional(Literal('-') + number)
65+
numspec = star | range_
66+
expr = Group(numspec) + Optional(Literal('/') + steps)
67+
extra_groups = ZeroOrMore(Literal(',') + expr)
68+
groups = expr + extra_groups + StringEnd()
69+
70+
# define parse actions
71+
star.setParseAction(self._expand_star)
72+
number.setParseAction(self._expand_number)
73+
range_.setParseAction(self._expand_range)
74+
expr.setParseAction(self._filter_steps)
75+
extra_groups.setParseAction(self._ignore_comma)
76+
groups.setParseAction(self._join_to_set)
77+
78+
self.max_ = max_
79+
self.parser = groups
80+
81+
@staticmethod
82+
def _expand_number(toks):
83+
try:
84+
i = int(toks[0])
85+
except ValueError:
86+
try:
87+
i = weekday(toks[0])
88+
except KeyError:
89+
raise ValueError("Invalid weekday literal '%s'." % toks[0])
90+
return [i]
91+
92+
@staticmethod
93+
def _expand_range(toks):
94+
if len(toks) > 1:
95+
return range(toks[0], int(toks[2])+1)
96+
else:
97+
return toks[0]
98+
99+
def _expand_star(self, toks):
100+
return range(self.max_)
101+
102+
@staticmethod
103+
def _filter_steps(toks):
104+
numbers = toks[0]
105+
if len(toks) > 1:
106+
steps = toks[2]
107+
return [n for n in numbers if n % steps == 0]
108+
else:
109+
return numbers
110+
111+
@staticmethod
112+
def _ignore_comma(toks):
113+
return filter(lambda x: x != ',', toks)
114+
115+
@staticmethod
116+
def _join_to_set(toks):
117+
return set(toks.asList())
118+
119+
def parse(self, cronspec):
120+
return self.parser.parseString(cronspec).pop()
121+
122+
123+
class crontab(schedule):
124+
"""A crontab can be used as the ``run_every`` value of a
125+
:class:`PeriodicTask` to add cron-like scheduling.
126+
127+
Like a :manpage:`cron` job, you can specify units of time of when
128+
you would like the task to execute. It is a reasonably complete
129+
implementation of cron's features, so it should provide a fair
130+
degree of scheduling needs.
131+
132+
You can specify a minute, an hour, and/or a day of the week in any
133+
of the following formats:
134+
135+
.. attribute:: minute
136+
137+
- A (list of) integers from 0-59 that represent the minutes of
138+
an hour of when execution should occur; or
139+
- A string representing a crontab pattern. This may get pretty
140+
advanced, like `minute="*/15"` (for every quarter) or
141+
`minute="1,13,30-45,50-59/2"`.
142+
143+
.. attribute:: hour
144+
145+
- A (list of) integers from 0-23 that represent the hours of
146+
a day of when execution should occur; or
147+
- A string representing a crontab pattern. This may get pretty
148+
advanced, like `hour="*/3"` (for every three hours) or
149+
`hour="0,8-17/2"` (at midnight, and every two hours during
150+
office hours).
151+
152+
.. attribute:: day_of_week
153+
154+
- A (list of) integers from 0-6, where Sunday = 0 and Saturday =
155+
6, that represent the days of a week that execution should
156+
occur.
157+
- A string representing a crontab pattern. This may get pretty
158+
advanced, like `day_of_week="mon-fri"` (for weekdays only).
159+
(Beware that `day_of_week="*/2"` does not literally mean
160+
"every two days", but "every day that is divisible by two"!)
161+
162+
"""
163+
164+
@staticmethod
165+
def _expand_cronspec(cronspec, max_):
166+
"""Takes the given cronspec argument in one of the forms::
167+
168+
int (like 7)
169+
basestring (like '3-5,*/15', '*', or 'monday')
170+
set (like set([0,15,30,45]))
171+
list (like [8-17])
172+
173+
And convert it to an (expanded) set representing all time unit
174+
values on which the crontab triggers. Only in case of the base
175+
type being 'basestring', parsing occurs. (It is fast and
176+
happens only once for each crontab instance, so there is no
177+
significant performance overhead involved.)
178+
179+
For the other base types, merely Python type conversions happen.
180+
181+
The argument `max_` is needed to determine the expansion of '*'.
182+
183+
"""
184+
if isinstance(cronspec, int):
185+
result = set([cronspec])
186+
elif isinstance(cronspec, basestring):
187+
result = crontab_parser(max_).parse(cronspec)
188+
elif isinstance(cronspec, set):
189+
result = cronspec
190+
elif is_iterable(cronspec):
191+
result = set(cronspec)
192+
else:
193+
raise TypeError("Argument cronspec needs to be of any of the " + \
194+
"following types: int, basestring, or an iterable type. " + \
195+
"'%s' was given." % type(cronspec))
196+
197+
# assure the result does not exceed the max
198+
for number in result:
199+
if number >= max_:
200+
raise ValueError("Invalid crontab pattern. Valid " + \
201+
"range is 0-%d. '%d' was found." % (max_, number))
202+
203+
return result
204+
205+
def __init__(self, minute='*', hour='*', day_of_week='*',
206+
nowfun=datetime.now):
207+
self.hour = self._expand_cronspec(hour, 24)
208+
self.minute = self._expand_cronspec(minute, 60)
209+
self.day_of_week = self._expand_cronspec(day_of_week, 7)
210+
self.nowfun = nowfun
211+
212+
def remaining_estimate(self, last_run_at):
213+
# remaining_estimate controls the frequency of scheduler
214+
# ticks. The scheduler needs to wake up every second in this case.
215+
return 1
216+
217+
def is_due(self, last_run_at):
218+
now = self.nowfun()
219+
last = now - last_run_at
220+
due, when = False, 1
221+
if last.days > 0 or last.seconds > 60:
222+
due = now.isoweekday() % 7 in self.day_of_week and \
223+
now.hour in self.hour and \
224+
now.minute in self.minute
225+
return due, when

0 commit comments

Comments
 (0)