Skip to content

Commit 40d0500

Browse files
author
Ask Solem
committed
celerybeat: Added Interface required for the Django database scheduler to work.
1 parent 1ab0847 commit 40d0500

File tree

3 files changed

+49
-13
lines changed

3 files changed

+49
-13
lines changed

celery/beat.py

Lines changed: 28 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
from celery.execute import send_task
1717
from celery.schedules import schedule
1818
from celery.messaging import establish_connection
19+
from celery.utils import instantiate
1920
from celery.utils.info import humanize_seconds
2021

2122

@@ -84,6 +85,12 @@ def is_due(self):
8485
"""See :meth:`celery.task.base.PeriodicTask.is_due`."""
8586
return self.schedule.is_due(self.last_run_at)
8687

88+
def __repr__(self):
89+
return "<Entry: %s(*%s, **%s) {%s}>" % (self.name,
90+
self.args,
91+
self.kwargs,
92+
self.schedule)
93+
8794

8895
class Scheduler(UserDict):
8996
"""Scheduler for periodic tasks.
@@ -105,6 +112,7 @@ class Scheduler(UserDict):
105112
Maximum time to sleep between re-checking the schedule.
106113
107114
"""
115+
Entry = ScheduleEntry
108116

109117
def __init__(self, schedule=None, logger=None,
110118
max_interval=None):
@@ -117,6 +125,9 @@ def __init__(self, schedule=None, logger=None,
117125
self.cleanup()
118126
self.setup_schedule()
119127

128+
def iterentries(self):
129+
return self.schedule.itervalues()
130+
120131
def tick(self):
121132
"""Run a tick, that is one iteration of the scheduler.
122133
Executes all due tasks."""
@@ -126,7 +137,7 @@ def tick(self):
126137
remaining_times = []
127138
connection = establish_connection()
128139
try:
129-
for entry in self.schedule.values():
140+
for entry in self.iterentries():
130141
is_due, next_time_to_run = entry.is_due()
131142
if is_due:
132143
debug("Scheduler: Sending due task %s" % entry.name)
@@ -145,12 +156,17 @@ def tick(self):
145156

146157
return min(remaining_times + [self.max_interval])
147158

148-
def apply_async(self, entry, **kwargs):
159+
def reserve(self, entry):
160+
new_entry = self.schedule[entry.name] = entry.next()
161+
return new_entry
149162

163+
def apply_async(self, entry, **kwargs):
150164
# Update timestamps and run counts before we actually execute,
151165
# so we have that done if an exception is raised (doesn't schedule
152166
# forever.)
153-
entry = self.schedule[entry.name] = entry.next()
167+
entry = self.reserve(entry)
168+
169+
print("APPLYING: %s" % (entry, ))
154170

155171
try:
156172
result = send_task(entry.name, entry.args, entry.kwargs,
@@ -168,15 +184,15 @@ def maybe_schedule(self, s, relative=False):
168184
return s
169185

170186
def setup_schedule(self):
171-
self.schedule = self.dict_to_entries(conf.CELERYBEAT_SCHEDULE)
187+
self.data = self.dict_to_entries(conf.CELERYBEAT_SCHEDULE)
172188

173189
def dict_to_entries(self, dict_):
174190
entries = {}
175191
for name, entry in dict_.items():
176192
relative = entry.pop("relative", None)
177193
entry["schedule"] = self.maybe_schedule(entry["schedule"],
178194
relative)
179-
entries[name] = ScheduleEntry(**entry)
195+
entries[name] = self.Entry(**entry)
180196
return entries
181197

182198
def cleanup(self):
@@ -194,9 +210,11 @@ class ClockService(object):
194210
def __init__(self, logger=None,
195211
max_interval=conf.CELERYBEAT_MAX_LOOP_INTERVAL,
196212
schedule=conf.CELERYBEAT_SCHEDULE,
197-
schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME):
213+
schedule_filename=conf.CELERYBEAT_SCHEDULE_FILENAME,
214+
scheduler_cls=None):
198215
self.logger = logger or log.get_default_logger()
199216
self.max_interval = max_interval
217+
self.scheduler_cls = scheduler_cls or self.scheduler_cls
200218
self._shutdown = threading.Event()
201219
self._stopped = threading.Event()
202220
self.schedule = schedule
@@ -239,9 +257,10 @@ def stop(self, wait=False):
239257
@property
240258
def scheduler(self):
241259
if self._scheduler is None:
242-
self._scheduler = self.scheduler_cls(schedule=self.schedule,
243-
logger=self.logger,
244-
max_interval=self.max_interval)
260+
self._scheduler = instantiate(self.scheduler_cls,
261+
schedule=self.schedule,
262+
logger=self.logger,
263+
max_interval=self.max_interval)
245264
return self._scheduler
246265

247266

celery/bin/celerybeat.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@
88
Path to the schedule database. Defaults to ``celerybeat-schedule``.
99
The extension ".db" will be appended to the filename.
1010
11+
.. cmdoption:: -S, --scheduler
12+
13+
Scheduler class to use. Default is celery.beat.Scheduler
14+
1115
.. cmdoption:: -f, --logfile
1216
1317
Path to log file. If no logfile is specified, ``stderr`` is used.
@@ -43,6 +47,13 @@
4347
help="Path to the schedule database. The extension \
4448
'.db' will be appended to the filename. Default: %s" % (
4549
conf.CELERYBEAT_SCHEDULE_FILENAME)),
50+
optparse.make_option('--max-interval',
51+
default=3600, type="int", dest="max_interval",
52+
help="Maximum time to sleep between rechecking the schedule."),
53+
optparse.make_option('-S', '--scheduler',
54+
default=None,
55+
action="store", dest="scheduler_cls",
56+
help="Scheduler class. Default is celery.beat.Scheduler"),
4657
optparse.make_option('-f', '--logfile', default=conf.CELERYBEAT_LOG_FILE,
4758
action="store", dest="logfile",
4859
help="Path to log file."),
@@ -58,13 +69,17 @@ class Beat(object):
5869

5970
def __init__(self, loglevel=conf.CELERYBEAT_LOG_LEVEL,
6071
logfile=conf.CELERYBEAT_LOG_FILE,
61-
schedule=conf.CELERYBEAT_SCHEDULE_FILENAME, **kwargs):
72+
schedule=conf.CELERYBEAT_SCHEDULE_FILENAME,
73+
max_interval=None,
74+
scheduler_cls=None, **kwargs):
6275
"""Starts the celerybeat task scheduler."""
6376

6477
self.loglevel = loglevel
6578
self.logfile = logfile
6679
self.schedule = schedule
67-
# Setup logging
80+
self.scheduler_cls = scheduler_cls
81+
self.max_interval = max_interval
82+
6883
if not isinstance(self.loglevel, int):
6984
self.loglevel = conf.LOG_LEVELS[self.loglevel.upper()]
7085

@@ -79,7 +94,9 @@ def run(self):
7994
def start_scheduler(self):
8095
from celery.log import setup_logger
8196
logger = setup_logger(self.loglevel, self.logfile, name="celery.beat")
82-
beat = self.ClockService(logger,
97+
beat = self.ClockService(logger=logger,
98+
max_interval=self.max_interval,
99+
scheduler_cls=self.scheduler_cls,
83100
schedule_filename=self.schedule)
84101

85102
try:

celery/task/base.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919

2020
PERIODIC_DEPRECATION_TEXT = """\
2121
Periodic task classes has been deprecated and will be removed
22-
in celery v1.6.
22+
in celery v3.0.
2323
2424
Please use the CELERYBEAT_SCHEDULE setting instead:
2525

0 commit comments

Comments
 (0)