Skip to content

Commit fb89c11

Browse files
Rewrote schedules
Schedules are cleaned up a lot. They no longer know about actual task instances. Much less repetition and extra arguments allow you to specify more precise times.
1 parent 86134b8 commit fb89c11

File tree

4 files changed

+117
-75
lines changed

4 files changed

+117
-75
lines changed

periodically/backends.py

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from .models import ExecutionRecord
22
from datetime import datetime
33
from .signals import task_complete
4+
from .utils import get_scheduled_time
45
import logging
56
import sys
67

@@ -11,6 +12,7 @@
1112
class BaseBackend(object):
1213
"""
1314
Keeps a schedule of periodic tasks.
15+
1416
"""
1517
_schedules = []
1618

@@ -83,14 +85,12 @@ def _run_tasks(self, tasks=None, fake=None, force=False):
8385
return
8486

8587
# Run the task if it's due (or past due).
86-
if force or schedule.get_scheduled_time(task) <= datetime.now():
87-
previous_record = schedule.get_previous_record(task)
88+
if force or get_scheduled_time(task, schedule) <= datetime.now():
89+
previous_record = ExecutionRecord.objects.get_most_recent(task, schedule)
8890
fake_task = fake or fake is None and previous_record is None
8991

9092
# If we're forcing the task, use the previous scheduled
91-
# time. That way we don't put off any upcoming tasks whose
92-
# schedule's get_scheduled_time method relies on the time
93-
# of the previous execution.
93+
# time.
9494
scheduled_time = previous_record.scheduled_time if force and previous_record else None
9595

9696
if fake_task:
@@ -100,7 +100,8 @@ def _run_tasks(self, tasks=None, fake=None, force=False):
100100

101101
def check_timeout(self, task):
102102
from .settings import DEFAULT_TIMEOUT
103-
for record in ExecutionRecord.objects.filter(task_id=task.task_id, end_time__isnull=True):
103+
for record in ExecutionRecord.objects.filter(task_id=task.task_id,
104+
end_time__isnull=True):
104105
timeout = getattr(task, 'timeout', DEFAULT_TIMEOUT)
105106
running_time = datetime.now() - record.start_time
106107
if running_time > timeout:
@@ -124,7 +125,7 @@ def fake_task(self, task, schedule, scheduled_time=None):
124125
self.logger.info('Faking periodic task "%s"' % task.task_id)
125126

126127
if scheduled_time is None:
127-
scheduled_time = schedule.get_scheduled_time(task)
128+
scheduled_time = get_scheduled_time(task, schedule)
128129

129130
# Create the log for this execution.
130131
log = ExecutionRecord.objects.create(
@@ -150,7 +151,7 @@ def run_task(self, task, schedule, scheduled_time=None):
150151

151152

152153
if scheduled_time is None:
153-
scheduled_time = schedule.get_scheduled_time(task)
154+
scheduled_time = get_scheduled_time(task, schedule)
154155

155156
# Create the log for this execution.
156157
log = ExecutionRecord.objects.create(
@@ -195,11 +196,14 @@ def complete_task(self, task, success=True, extra=None):
195196
Marks a task as complete and performs other post-completion tasks. The
196197
<code>extra</code> argument is a dictionary of values to be passed to
197198
<code>Logger.log()</code> as keyword args.
199+
198200
"""
199201
if extra is not None:
200202
self.logger.log(**extra)
201203

202-
record = ExecutionRecord.objects.filter(task_id=task.task_id, end_time=None).order_by('-start_time')[0]
204+
record = ExecutionRecord.objects \
205+
.filter(task_id=task.task_id, end_time=None) \
206+
.order_by('-start_time')[0]
203207
record.end_time = datetime.now()
204208
record.completed_successfully = success
205209
record.save()
@@ -211,5 +215,6 @@ class DefaultBackend(BaseBackend):
211215
"""
212216
A backend that only runs tasks when explicitly told to (i.e. when its
213217
`run_scheduled_tasks()` method is invoked).
218+
214219
"""
215220
pass

periodically/models.py

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,16 @@
11
from django.db import models
22

33

4+
class ExecutionRecordManager(models.Manager):
5+
def get_most_recent(self, task=None, schedule=None):
6+
qs = self.get_query_set().order_by('-start_time').all()
7+
if task:
8+
qs = qs.filter(task_id=task.task_id)
9+
if schedule:
10+
qs = qs.filter(schedule_id=schedule.schedule_id)
11+
return qs[0] if qs else None
12+
13+
414
class ExecutionRecord(models.Model):
515
task_id = models.CharField(max_length=255)
616
schedule_id = models.BigIntegerField()
@@ -9,6 +19,8 @@ class ExecutionRecord(models.Model):
919
end_time = models.DateTimeField(blank=True, null=True)
1020
completed_successfully = models.BooleanField(default=False)
1121
is_fake = models.BooleanField(default=False)
12-
22+
23+
objects = ExecutionRecordManager()
24+
1325
def __unicode__(self):
1426
return '%s @ %s' % (self.task_id, self.start_time)

periodically/schedules.py

Lines changed: 74 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -1,106 +1,115 @@
1+
import time as _time
12
from datetime import datetime, date, time, timedelta
2-
from .models import ExecutionRecord
33

44

55
class Schedule(object):
6-
7-
def get_previous_record(self, task):
8-
record_list = ExecutionRecord.objects \
9-
.filter(task_id=task.task_id, schedule_id=self.__hash__()) \
10-
.order_by('-start_time')
11-
if record_list:
12-
return record_list[0]
13-
else:
14-
return None
15-
166
def __eq__(self, other):
17-
return self.__dict__ == other.__dict__
7+
other_id = getattr(other, 'schedule_id', None)
8+
return self.schedule_id == other_id
189

1910
def __ne__(self, other):
2011
return not self.__eq__(other)
2112

13+
def __hash__(self):
14+
return self.schedule_id
15+
16+
17+
class BaseSchedule(object):
18+
"""A base class for the built-in schedules. Don't use this yourself.
19+
20+
"""
2221

23-
class PeriodicSchedule(Schedule):
22+
@property
23+
def schedule_id(self):
24+
class_name = '%s.%s' % (self.__class__.__module__,
25+
self.__class__.__name__)
26+
time_args = tuple([getattr(self, name) for name in self._time_attrs])
27+
return hash((class_name, time_args))
2428

25-
def get_scheduled_time(self, task):
26-
previous_record = self.get_previous_record(task)
27-
if previous_record:
28-
next_run_time = previous_record.scheduled_time \
29-
+ self.repeat_interval
29+
def time_before(self, time):
30+
kwargs = dict((k, getattr(self, k)) for k in self._time_attrs)
31+
t = time.replace(**kwargs)
32+
if t > time:
33+
previous_time = t - self.repeat_interval
3034
else:
31-
next_run_time = datetime.now()
32-
return next_run_time
35+
previous_time = t
36+
return previous_time
3337

34-
def __hash__(self):
35-
class_name = '%s.%s' % (self.__module__, self.__class__)
36-
return hash((class_name, self.repeat_interval))
38+
def time_after(self, time):
39+
return self.time_before(time) + self.repeat_interval
3740

3841

39-
class Daily(Schedule):
42+
class Hourly(BaseSchedule):
43+
repeat_interval = timedelta(hours=1)
44+
_time_attrs = ['minute', 'second', 'microsecond']
4045

41-
def __init__(self, hour=0, minute=0, second=0, microsecond=0):
42-
self.hour = hour
46+
def __init__(self, minute=0, second=0, microsecond=0):
47+
super(Hourly, self).__init__()
4348
self.minute = minute
4449
self.second = second
4550
self.microsecond = microsecond
4651

47-
def __hash__(self):
48-
class_name = '%s.%s' % (self.__module__, self.__class__)
49-
return hash((class_name, self.hour, self.minute, self.second,
50-
self.microsecond))
51-
52-
53-
def get_scheduled_time(self, task):
54-
previous_record = self.get_previous_record(task)
55-
today = date.today()
56-
t = time(self.hour, self.minute, self.second, self.microsecond)
57-
now = datetime.now()
58-
59-
if previous_record:
60-
previous_run_date = \
61-
datetime.date(previous_record.scheduled_time)
62-
scheduled_date = previous_run_date + timedelta(days=1)
63-
scheduled_time = datetime.combine(scheduled_date, t)
64-
use_last_skipped_time = scheduled_time < now
65-
else:
66-
use_last_skipped_time = True
67-
68-
# If the next scheduled time already happened, just use the last
69-
# skipped time. Otherwise, we'd end up scheduling an execution for
70-
# each missed day.
71-
if use_last_skipped_time:
72-
todays_run_time = datetime.combine(today, t)
73-
if todays_run_time < now:
74-
scheduled_time = todays_run_time
75-
else:
76-
scheduled_time = todays_run_time - timedelta(days=1)
52+
def __unicode__(self):
53+
return 'Hourly @ %s' % time(minute=self.minute, second=self.second,
54+
microsecond=self.microsecond)
7755

78-
return scheduled_time
7956

57+
class Daily(BaseSchedule):
58+
repeat_interval = timedelta(days=1)
59+
_time_attrs = ['hour', 'minute', 'second', 'microsecond']
8060

81-
class Hourly(PeriodicSchedule):
82-
repeat_interval = timedelta(hours=1)
61+
def __init__(self, hour=0, minute=0, second=0, microsecond=0):
62+
super(Daily, self).__init__()
63+
self.hour = hour
64+
self.minute = minute
65+
self.second = second
66+
self.microsecond = microsecond
8367

8468
def __unicode__(self):
85-
return 'Hourly'
69+
return 'Daily @ %s' % time(hour=self.hour, minute=self.minute,
70+
second=self.second, microsecond=self.microsecond)
71+
8672

73+
class Weekly(BaseSchedule):
74+
repeat_interval = timedelta(days=7)
75+
_time_attrs = ['day', 'hour', 'minute', 'second', 'microsecond']
8776

88-
class Weekly(PeriodicSchedule):
89-
repeat_interval = timedelta(weeks=1)
77+
def __init__(self, day=0, hour=0, minute=0, second=0, microsecond=0):
78+
super(Weekly, self).__init__()
79+
self.day = day
80+
self.hour = hour
81+
self.minute = minute
82+
self.second = second
83+
self.microsecond = microsecond
9084

9185
def __unicode__(self):
92-
return 'Weekly'
86+
day_name = _time.strftime('%A', _time.strptime(str(self.day), '%w'))
87+
return '%s @ %s' % (day_name, time(hour=self.hour, minute=self.minute,
88+
second=self.second, microsecond=self.microsecond))
9389

9490

95-
class Every(PeriodicSchedule):
91+
class Every(BaseSchedule):
92+
_time_attrs = ['repeat_interval']
9693

9794
def __init__(self, interval=None, days=0, seconds=0, microseconds=0,
98-
milliseconds=0, minutes=0, hours=0, weeks=0):
95+
milliseconds=0, minutes=0, hours=0, weeks=0,
96+
starting_at=datetime(1970, 1, 1)):
9997
super(Every, self).__init__()
10098
if interval is None:
10199
interval = timedelta(days, seconds, microseconds,
102100
milliseconds, minutes, hours, weeks)
103101
self.repeat_interval = interval
102+
self.starting_at = starting_at
103+
104+
def time_before(self, time):
105+
# Uses "perfect time" (no leap years, etc.)
106+
interval = self.repeat_interval.microseconds / 1000 \
107+
+ self.repeat_interval.total_seconds() * 1000
108+
time_since_start = time - self.starting_at
109+
ms_since_start = time_since_start.microseconds / 1000 \
110+
+ time_since_start.total_seconds() * 1000
111+
time = interval * int(ms_since_start / interval)
112+
return datetime.fromtimestamp(time / 1000)
104113

105114
def __unicode__(self):
106115
return 'Every %s' % self.repeat_interval

periodically/utils.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
1+
from datetime import datetime
12
from django.utils import importlib
23
from . import settings
4+
from .models import ExecutionRecord
35

46

57
class InvalidBackendAliasError(Exception):
@@ -12,6 +14,20 @@ def __init__(self, backend):
1214
Exception.__init__(self, 'Could not find backend %s' % backend)
1315

1416

17+
def get_scheduled_time(task, schedule, now=None):
18+
previous_record = ExecutionRecord.objects.get_most_recent(task, schedule)
19+
if not now:
20+
now = datetime.now()
21+
previous_scheduled_time = schedule.time_before(now)
22+
if not previous_record:
23+
scheduled_time = previous_scheduled_time
24+
elif previous_record.scheduled_time < previous_scheduled_time:
25+
scheduled_time = previous_scheduled_time
26+
else:
27+
scheduled_time = schedule.time_after(now)
28+
return scheduled_time
29+
30+
1531
def get_scheduler_backend_class(backend_alias=None):
1632
"""
1733
Accepts a scheduler alias and returns the corresponding backend class.

0 commit comments

Comments
 (0)