Skip to content

Commit 6b1ac3b

Browse files
authored
Support for RQ's Cron Scheduler (WIP) (#725)
* Created DjangoCronScheduler * Successfully create rqcron management command * Cleaned up test * Require RQ >= 2.6 * Typing fixes and bump minimum Django and Python requirement * Fix mypy * Fix mypy
1 parent ab44344 commit 6b1ac3b

File tree

7 files changed

+371
-2
lines changed

7 files changed

+371
-2
lines changed

django_rq/cron.py

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
import logging
2+
from typing import Any, Callable, Dict, Optional, Tuple
3+
4+
from rq.cron import CronScheduler
5+
6+
from .queues import get_connection
7+
8+
9+
class DjangoCronScheduler(CronScheduler):
10+
"""
11+
A Django-RQ bridge for RQ's CronScheduler that integrates with django_rq's
12+
queue configuration system.
13+
14+
Key differences from RQ's CronScheduler:
15+
- Can be initialized without a connection parameter
16+
- Connection is set dynamically when the first job is registered
17+
- Validates that all registered jobs use queues with the same Redis connection
18+
- Integrates with RQ_QUEUES configuration from Django settings
19+
"""
20+
21+
_connection_config: Optional[Dict[str, Any]]
22+
23+
def __init__(self, logging_level: int = logging.INFO, name: str = ""):
24+
"""
25+
Initialize DjangoCronScheduler without Redis connection.
26+
27+
Connection will be set when the first job is registered via register().
28+
29+
Args:
30+
logging_level: Logging level for the scheduler
31+
name: Optional name for the scheduler instance
32+
"""
33+
# Call parent __init__ with connection=None initially
34+
super().__init__(connection=None, logging_level=logging_level)
35+
36+
# Track our django_rq specific state
37+
self._connection_config = None
38+
39+
def _get_connection_config(self, queue_name: str) -> Dict[str, Any]:
40+
"""
41+
Extract Redis connection configuration for a queue to compare connections.
42+
43+
Args:
44+
queue_name: Name of the queue
45+
46+
Returns:
47+
Dictionary of connection parameters for comparison
48+
"""
49+
connection = get_connection(queue_name)
50+
kwargs = connection.connection_pool.connection_kwargs
51+
52+
# Only compare essential connection parameters that determine if
53+
# two connections are to the same Redis instance
54+
essential_params = ["host", "port", "db", "username", "password"]
55+
return {key: kwargs.get(key) for key in essential_params if key in kwargs}
56+
57+
def register(
58+
self,
59+
func: Callable[..., Any],
60+
queue_name: str,
61+
args: Optional[Tuple[Any, ...]] = None,
62+
kwargs: Optional[Dict[str, Any]] = None,
63+
interval: Optional[int] = None,
64+
cron: Optional[str] = None,
65+
timeout: Optional[int] = None,
66+
result_ttl: int = 500,
67+
ttl: Optional[int] = None,
68+
failure_ttl: Optional[int] = None,
69+
meta: Optional[Dict[str, Any]] = None,
70+
):
71+
"""
72+
Register a function to be run at regular intervals.
73+
74+
On first call, this sets the Redis connection for the scheduler.
75+
Subsequent calls validate that the queue uses the same Redis connection.
76+
77+
Args:
78+
func: Function to be scheduled
79+
queue_name: Name of the django_rq queue (must exist in RQ_QUEUES)
80+
args: Arguments to pass to the function
81+
kwargs: Keyword arguments to pass to the function
82+
interval: Interval in seconds (mutually exclusive with cron)
83+
cron: Cron expression (mutually exclusive with interval)
84+
timeout: Job timeout in seconds
85+
result_ttl: How long to keep job results
86+
ttl: Job time-to-live
87+
failure_ttl: How long to keep failed job info
88+
meta: Additional job metadata
89+
90+
Returns:
91+
CronJob instance
92+
93+
Raises:
94+
ValueError: If queue not found or uses different Redis connection
95+
"""
96+
# Get connection for this queue
97+
connection = get_connection(queue_name)
98+
current_config = self._get_connection_config(queue_name)
99+
100+
if self._connection_config:
101+
# Validate that this queue uses the same Redis connection
102+
if current_config != self._connection_config:
103+
raise ValueError(
104+
f"Queue '{queue_name}' uses a different Redis connection than previously "
105+
+ "registered queues. All jobs in a DjangoCronScheduler instance must use "
106+
+ "queues with the same Redis connection."
107+
)
108+
else:
109+
# First registration - set connection
110+
self.connection = connection
111+
self._connection_config = current_config
112+
113+
# Now call parent register method
114+
return super().register(
115+
func=func,
116+
queue_name=queue_name,
117+
args=args,
118+
kwargs=kwargs,
119+
interval=interval,
120+
cron=cron,
121+
timeout=timeout,
122+
result_ttl=result_ttl,
123+
ttl=ttl,
124+
failure_ttl=failure_ttl,
125+
meta=meta,
126+
)
Lines changed: 59 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
import logging
2+
import sys
3+
from typing import Any
4+
5+
from django.core.management.base import BaseCommand, CommandParser
6+
7+
from ...cron import DjangoCronScheduler
8+
9+
10+
class Command(BaseCommand):
11+
"""
12+
Starts the RQ cron scheduler with Django-RQ integration.
13+
14+
Example usage:
15+
python manage.py rqcron cron_config.py
16+
python manage.py rqcron myapp.cron_jobs --logging-level DEBUG
17+
"""
18+
19+
help = "Starts the RQ cron scheduler"
20+
21+
def add_arguments(self, parser: CommandParser) -> None:
22+
# Positional argument for config file/module
23+
parser.add_argument(
24+
'config_path',
25+
help='Path to cron configuration file or module path'
26+
)
27+
28+
# Optional logging level
29+
parser.add_argument(
30+
'--logging-level', '-l',
31+
choices=['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL'],
32+
default='INFO',
33+
help='Set logging level (default: INFO)'
34+
)
35+
36+
def handle(self, *args: Any, **options: Any) -> None:
37+
"""Main command handler."""
38+
config_path: str = options['config_path']
39+
logging_level: int = getattr(logging, options['logging_level'])
40+
41+
# Create Django cron scheduler
42+
scheduler = DjangoCronScheduler(logging_level=logging_level)
43+
44+
try:
45+
# Load configuration from file
46+
self.stdout.write(f'Loading cron configuration from {config_path}')
47+
scheduler.load_config_from_file(config_path)
48+
49+
# Start the scheduler
50+
job_count = len(scheduler.get_jobs())
51+
self.stdout.write(
52+
self.style.SUCCESS(f'Starting cron scheduler with {job_count} jobs...')
53+
)
54+
55+
scheduler.start()
56+
57+
except KeyboardInterrupt:
58+
self.stdout.write('\nShutting down cron scheduler...')
59+
sys.exit(0)

django_rq/tests/cron_config1.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
"""
2+
Cron configuration file #1 for testing the rqcron management command.
3+
Contains 2 jobs for the main test.
4+
"""
5+
from rq import cron
6+
from .fixtures import say_hello
7+
8+
9+
# Register a simple cron job that runs every minute
10+
cron.register(say_hello, 'default', args=('from cron config1',), cron='* * * * *')
11+
12+
# Register another job that runs every 5 seconds using interval
13+
cron.register(say_hello, 'default', args=('every 5 seconds config1',), interval=5)

django_rq/tests/cron_config2.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
"""
2+
Cron configuration file #2 for testing the rqcron management command.
3+
Contains different jobs for alternative test scenarios.
4+
"""
5+
from rq import cron
6+
from .fixtures import say_hello
7+
8+
9+
# Register a job with different arguments for testing
10+
cron.register(say_hello, 'default', args=('from cron config2',), cron='*/2 * * * *') # Every 2 minutes
11+
12+
# Register another job with a different interval
13+
cron.register(say_hello, 'default', args=('every 10 seconds config2',), interval=10)

django_rq/tests/fixtures.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,3 +35,8 @@ def access_self():
3535

3636
def failing_job():
3737
raise ValueError
38+
39+
40+
def say_hello(name='World'):
41+
"""Simple test function for cron job testing."""
42+
return f"Hello, {name}!"

django_rq/tests/test_cron.py

Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
import tempfile
2+
import os
3+
from contextlib import suppress
4+
from io import StringIO
5+
from unittest import TestCase
6+
from unittest.mock import patch
7+
8+
from django.core.management import call_command
9+
from django.core.management.base import CommandError
10+
from rq.cron import CronJob
11+
12+
from ..cron import DjangoCronScheduler
13+
from ..management.commands.rqcron import Command as RqcronCommand
14+
from .fixtures import say_hello
15+
16+
17+
class CronTest(TestCase):
18+
19+
def test_django_cron_scheduler_init(self):
20+
"""Test DjangoCronScheduler can be initialized without connection."""
21+
scheduler = DjangoCronScheduler()
22+
23+
# Should not have connection until first register() call
24+
self.assertIsNone(scheduler.connection)
25+
self.assertIsNone(scheduler._connection_config)
26+
self.assertEqual(scheduler._cron_jobs, [])
27+
28+
def test_first_register_initializes_connection(self):
29+
"""Test that first register() call initializes the scheduler with queue's connection."""
30+
scheduler = DjangoCronScheduler()
31+
32+
# Register a job with cron expression (run every minute)
33+
cron_job = scheduler.register(say_hello, 'default', cron='* * * * *')
34+
35+
# Should now have connection set
36+
self.assertIsNotNone(scheduler.connection)
37+
self.assertIsNotNone(scheduler._connection_config)
38+
self.assertIsInstance(cron_job, CronJob)
39+
self.assertEqual(len(scheduler.get_jobs()), 1)
40+
41+
# Verify cron expression is set correctly
42+
self.assertEqual(cron_job.cron, '* * * * *')
43+
self.assertIsNone(cron_job.interval)
44+
self.assertIsNotNone(cron_job.next_run_time)
45+
46+
def test_connection_validation(self):
47+
"""Test connection validation for same, compatible, and incompatible queues."""
48+
# Start with test3 queue (localhost:6379, DB=1)
49+
scheduler = DjangoCronScheduler()
50+
51+
# Same queue multiple times should work
52+
job1 = scheduler.register(say_hello, 'test3', interval=60)
53+
job2 = scheduler.register(say_hello, 'test3', interval=120)
54+
55+
self.assertEqual(len(scheduler.get_jobs()), 2)
56+
self.assertEqual(job1.queue_name, 'test3')
57+
self.assertEqual(job2.queue_name, 'test3')
58+
59+
# Compatible queues (same Redis connection) should work
60+
# Both 'test3' and 'async' use localhost:6379 with DB=1
61+
job3 = scheduler.register(say_hello, 'async', interval=180)
62+
self.assertEqual(len(scheduler.get_jobs()), 3)
63+
self.assertEqual(job3.queue_name, 'async')
64+
65+
# Queues having different Redis connections should fail
66+
# 'default' uses DB=0 while test3/async use DB=1
67+
with self.assertRaises(ValueError):
68+
scheduler.register(say_hello, 'default', interval=240)
69+
70+
# Undefined queue_name should fail
71+
scheduler = DjangoCronScheduler()
72+
with self.assertRaises(KeyError):
73+
scheduler.register(say_hello, 'nonexistent_queue', interval=300)
74+
75+
76+
class CronCommandTest(TestCase):
77+
78+
@patch('django_rq.cron.DjangoCronScheduler.start')
79+
def test_rqcron_command(self, mock_start):
80+
"""Test rqcron command execution: success and import errors from load_config_from_file."""
81+
mock_start.return_value = None
82+
83+
# Test 1: Successful execution
84+
out = StringIO()
85+
config_path = 'django_rq.tests.cron_config1'
86+
87+
call_command('rqcron', config_path, stdout=out)
88+
89+
output = out.getvalue()
90+
self.assertIn(f'Loading cron configuration from {config_path}', output)
91+
self.assertIn('Starting cron scheduler with 2 jobs...', output)
92+
mock_start.assert_called_once()
93+
94+
# Test 2: File not found - should raise ImportError from RQ
95+
with self.assertRaises(ImportError) as cm:
96+
call_command('rqcron', 'nonexistent_file.py')
97+
98+
self.assertIn("No module named 'nonexistent_file'", str(cm.exception))
99+
100+
# Test 3: Import error
101+
with self.assertRaises(ImportError) as cm:
102+
call_command('rqcron', 'nonexistent.module.path')
103+
104+
self.assertIn("No module named 'nonexistent'", str(cm.exception))
105+
106+
@patch('django_rq.cron.DjangoCronScheduler.start')
107+
@patch('django_rq.cron.DjangoCronScheduler.load_config_from_file')
108+
def test_rqcron_command_exceptions(self, mock_load_config, mock_start):
109+
"""Test rqcron command exception handling."""
110+
mock_load_config.return_value = None
111+
112+
# Test KeyboardInterrupt handling
113+
mock_start.side_effect = KeyboardInterrupt()
114+
with self.assertRaises(SystemExit):
115+
call_command('rqcron', 'django_rq.tests.cron_config2')
116+
117+
# Test general exception handling - should bubble up as raw exception
118+
mock_load_config.side_effect = Exception("Test error")
119+
with self.assertRaises(Exception) as cm:
120+
call_command('rqcron', 'django_rq.tests.cron_config2')
121+
122+
self.assertEqual(str(cm.exception), "Test error")
123+
124+
def test_rqcron_command_successful_run(self):
125+
"""Test successful rqcron command execution without mocking."""
126+
out = StringIO()
127+
config_path = 'django_rq.tests.cron_config1'
128+
129+
# Use a very short timeout to test actual execution
130+
import signal
131+
132+
def timeout_handler(signum, frame):
133+
raise KeyboardInterrupt()
134+
135+
# Set up a timeout to stop the command after a short time
136+
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
137+
signal.alarm(1) # Stop after 1 second
138+
139+
try:
140+
# The command will be interrupted and may or may not raise SystemExit depending on Django version
141+
with suppress(SystemExit):
142+
call_command('rqcron', config_path, stdout=out)
143+
finally:
144+
signal.alarm(0) # Cancel the alarm
145+
signal.signal(signal.SIGALRM, old_handler)
146+
147+
output = out.getvalue()
148+
self.assertIn(f'Loading cron configuration from {config_path}', output)
149+
self.assertIn('Starting cron scheduler with 2 jobs...', output)

pyproject.toml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ readme = "README.rst"
1010
license = "MIT"
1111
license-files = ["LICENSE.txt"]
1212
authors = [{ name = "Selwin Ong", email = "[email protected]" }]
13-
requires-python = ">=3.8"
14-
dependencies = ["django>=3.2", "rq>=2", "redis>=3.5"]
13+
requires-python = ">=3.9"
14+
dependencies = ["django>=4.2", "rq>=2.6", "redis>=3.5"]
1515
classifiers = [
1616
"Development Status :: 5 - Production/Stable",
1717
"Environment :: Web Environment",
@@ -81,6 +81,10 @@ ignore_missing_imports = true
8181
module = "rq_scheduler.*"
8282
ignore_missing_imports = true
8383

84+
[[tool.mypy.overrides]]
85+
module = "rq.cron"
86+
ignore_missing_imports = true
87+
8488
[[tool.mypy.overrides]]
8589
module = "sentry_sdk.*"
8690
ignore_missing_imports = true

0 commit comments

Comments
 (0)