Skip to content

Commit 657cdab

Browse files
committed
Allow duplicate jobs to be discarded
1 parent 098a004 commit 657cdab

File tree

8 files changed

+400
-3
lines changed

8 files changed

+400
-3
lines changed

app/models/solid_queue/job/concurrency_controls.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ module ConcurrencyControls
88
included do
99
has_one :blocked_execution
1010

11-
delegate :concurrency_limit, :concurrency_duration, to: :job_class
11+
delegate :concurrency_limit, :concurrency_duration, :concurrency_on_conflict, to: :job_class
1212

1313
before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? }
1414
end

app/models/solid_queue/job/executable.rb

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,12 @@ def prepare_for_execution
6767
def dispatch
6868
if acquire_concurrency_lock then ready
6969
else
70-
block
70+
case job_class.concurrency_on_conflict
71+
when :discard
72+
discard_on_conflict
73+
else
74+
block
75+
end
7176
end
7277
end
7378

@@ -104,6 +109,10 @@ def ready
104109
ReadyExecution.create_or_find_by!(job_id: id)
105110
end
106111

112+
def discard_on_conflict
113+
finished!
114+
end
115+
107116
def execution
108117
%w[ ready claimed failed ].reduce(nil) { |acc, status| acc || public_send("#{status}_execution") }
109118
end

lib/active_job/concurrency_controls.rb

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,16 @@ module ConcurrencyControls
1212

1313
class_attribute :concurrency_limit
1414
class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period
15+
class_attribute :concurrency_on_conflict, default: :block
1516
end
1617

1718
class_methods do
18-
def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period)
19+
def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration: SolidQueue.default_concurrency_control_period, on_conflict: :block)
1920
self.concurrency_key = key
2021
self.concurrency_limit = to
2122
self.concurrency_group = group
2223
self.concurrency_duration = duration
24+
self.concurrency_on_conflict = on_conflict
2325
end
2426
end
2527

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
class DiscardOnConflictJob < ApplicationJob
2+
limits_concurrency to: 1, key: ->(value) { value }, on_conflict: :discard
3+
4+
def perform(value)
5+
Rails.logger.info "Performing DiscardOnConflictJob with value: #{value}"
6+
end
7+
end
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
class LimitedDiscardJob < ApplicationJob
2+
limits_concurrency to: 2, key: ->(group, id) { group }, on_conflict: :discard
3+
4+
def perform(group, id)
5+
Rails.logger.info "Performing LimitedDiscardJob with group: #{group}, id: #{id}"
6+
sleep 0.1
7+
end
8+
end
Lines changed: 137 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,137 @@
1+
# frozen_string_literal: true
2+
3+
require "test_helper"
4+
5+
class ConcurrencyDiscardTest < ActiveSupport::TestCase
6+
setup do
7+
@job_result = JobResult.create!(queue_name: "default", status: "test")
8+
end
9+
10+
test "discard jobs when concurrency limit is reached with on_conflict: :discard" do
11+
# Enqueue first job - should be executed
12+
job1 = DiscardOnConflictJob.perform_later(@job_result.id)
13+
14+
# Enqueue second job - should be discarded due to concurrency limit
15+
job2 = DiscardOnConflictJob.perform_later(@job_result.id)
16+
17+
# Enqueue third job - should also be discarded
18+
job3 = DiscardOnConflictJob.perform_later(@job_result.id)
19+
20+
# Check that first job was ready
21+
solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id)
22+
assert solid_job1.ready?
23+
assert solid_job1.ready_execution.present?
24+
25+
# Check that second and third jobs were discarded
26+
solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id)
27+
assert solid_job2.finished?
28+
assert_nil solid_job2.ready_execution
29+
assert_nil solid_job2.blocked_execution
30+
31+
solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id)
32+
assert solid_job3.finished?
33+
assert_nil solid_job3.ready_execution
34+
assert_nil solid_job3.blocked_execution
35+
end
36+
37+
test "block jobs when concurrency limit is reached without on_conflict option" do
38+
# Using SequentialUpdateResultJob which has default blocking behavior
39+
# Enqueue first job - should be executed
40+
job1 = SequentialUpdateResultJob.perform_later(@job_result, name: "A")
41+
42+
# Enqueue second job - should be blocked due to concurrency limit
43+
job2 = SequentialUpdateResultJob.perform_later(@job_result, name: "B")
44+
45+
# Check that second job is blocked
46+
solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id)
47+
assert solid_job2.blocked?
48+
assert solid_job2.blocked_execution.present?
49+
end
50+
51+
test "respect concurrency limit with discard option" do
52+
# Enqueue jobs with limit of 2
53+
job1 = LimitedDiscardJob.perform_later("group1", 1)
54+
job2 = LimitedDiscardJob.perform_later("group1", 2)
55+
job3 = LimitedDiscardJob.perform_later("group1", 3) # Should be discarded
56+
job4 = LimitedDiscardJob.perform_later("group1", 4) # Should be discarded
57+
58+
# Check that first two jobs are ready
59+
solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id)
60+
solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id)
61+
assert solid_job1.ready?
62+
assert solid_job2.ready?
63+
64+
# Check that third and fourth jobs are discarded
65+
solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id)
66+
solid_job4 = SolidQueue::Job.find_by(active_job_id: job4.job_id)
67+
assert solid_job3.finished?
68+
assert solid_job4.finished?
69+
assert_nil solid_job3.ready_execution
70+
assert_nil solid_job4.ready_execution
71+
end
72+
73+
test "discard option works with different concurrency keys" do
74+
# These should not conflict because they have different keys
75+
job1 = DiscardOnConflictJob.perform_later("key1")
76+
job2 = DiscardOnConflictJob.perform_later("key2")
77+
job3 = DiscardOnConflictJob.perform_later("key1") # Should be discarded
78+
79+
# Check that first two jobs are ready (different keys)
80+
solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id)
81+
solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id)
82+
assert solid_job1.ready?
83+
assert solid_job2.ready?
84+
85+
# Check that third job is discarded (same key as first)
86+
solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id)
87+
assert solid_job3.finished?
88+
assert_nil solid_job3.ready_execution
89+
end
90+
91+
test "discarded jobs do not unblock other jobs" do
92+
# Enqueue a job that will be executed
93+
job1 = DiscardOnConflictJob.perform_later(@job_result.id)
94+
95+
# Enqueue a job that will be discarded
96+
job2 = DiscardOnConflictJob.perform_later(@job_result.id)
97+
98+
# The first job should be ready
99+
solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id)
100+
assert solid_job1.ready?
101+
102+
# The second job should be discarded immediately
103+
solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id)
104+
assert solid_job2.finished?
105+
106+
# Complete the first job and release its lock
107+
solid_job1.unblock_next_blocked_job
108+
solid_job1.finished!
109+
110+
# Enqueue another job - it should be ready since the lock is released
111+
job3 = DiscardOnConflictJob.perform_later(@job_result.id)
112+
solid_job3 = SolidQueue::Job.find_by(active_job_id: job3.job_id)
113+
assert solid_job3.ready?
114+
end
115+
116+
test "discarded jobs are marked as finished without execution" do
117+
# Enqueue a job that will be ready
118+
job1 = DiscardOnConflictJob.perform_later("test_key")
119+
120+
# Enqueue a job that will be discarded
121+
job2 = DiscardOnConflictJob.perform_later("test_key")
122+
123+
solid_job1 = SolidQueue::Job.find_by(active_job_id: job1.job_id)
124+
solid_job2 = SolidQueue::Job.find_by(active_job_id: job2.job_id)
125+
126+
# First job should be ready
127+
assert solid_job1.ready?
128+
assert solid_job1.ready_execution.present?
129+
130+
# Second job should be finished without any execution
131+
assert solid_job2.finished?
132+
assert_nil solid_job2.ready_execution
133+
assert_nil solid_job2.claimed_execution
134+
assert_nil solid_job2.failed_execution
135+
assert_nil solid_job2.blocked_execution
136+
end
137+
end

test/models/solid_queue/job_test.rb

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,13 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
1818
limits_concurrency key: ->(job_result, **) { job_result }, group: "MyGroup"
1919
end
2020

21+
class NonOverlappingDiscardJob < ApplicationJob
22+
limits_concurrency key: ->(job_result, **) { job_result }, on_conflict: :discard
23+
24+
def perform(job_result)
25+
end
26+
end
27+
2128
setup do
2229
@result = JobResult.create!(queue_name: "default")
2330
end
@@ -45,6 +52,82 @@ class NonOverlappingGroupedJob2 < NonOverlappingJob
4552
assert_equal 8, execution.priority
4653
end
4754

55+
test "enqueue jobs with on_conflict discard" do
56+
# First job should be ready
57+
active_job1 = NonOverlappingDiscardJob.new(@result)
58+
assert_ready do
59+
SolidQueue::Job.enqueue(active_job1)
60+
end
61+
job1 = SolidQueue::Job.find_by(active_job_id: active_job1.job_id)
62+
assert job1.ready?
63+
64+
# Second job should be discarded (finished without execution)
65+
active_job2 = NonOverlappingDiscardJob.new(@result)
66+
assert_no_difference -> { SolidQueue::ReadyExecution.count } do
67+
assert_no_difference -> { SolidQueue::BlockedExecution.count } do
68+
SolidQueue::Job.enqueue(active_job2)
69+
end
70+
end
71+
job2 = SolidQueue::Job.find_by(active_job_id: active_job2.job_id)
72+
73+
assert job2.finished?
74+
assert_nil job2.ready_execution
75+
assert_nil job2.blocked_execution
76+
assert_nil job2.claimed_execution
77+
assert_nil job2.failed_execution
78+
79+
# Third job with same key should also be discarded
80+
active_job3 = NonOverlappingDiscardJob.new(@result)
81+
assert_no_difference -> { SolidQueue::ReadyExecution.count } do
82+
SolidQueue::Job.enqueue(active_job3)
83+
end
84+
job3 = SolidQueue::Job.find_by(active_job_id: active_job3.job_id)
85+
86+
assert job3.finished?
87+
end
88+
89+
test "compare blocking vs discard behavior" do
90+
# Test default blocking behavior
91+
blocking_job1 = NonOverlappingJob.new(@result)
92+
assert_ready do
93+
SolidQueue::Job.enqueue(blocking_job1)
94+
end
95+
job1 = SolidQueue::Job.find_by(active_job_id: blocking_job1.job_id)
96+
assert job1.ready?
97+
98+
# Second job should be blocked (not discarded)
99+
blocking_job2 = NonOverlappingJob.new(@result)
100+
assert_difference -> { SolidQueue::BlockedExecution.count }, +1 do
101+
SolidQueue::Job.enqueue(blocking_job2)
102+
end
103+
job2 = SolidQueue::Job.find_by(active_job_id: blocking_job2.job_id)
104+
assert job2.blocked?
105+
assert job2.blocked_execution.present?
106+
assert_not job2.finished?
107+
108+
# Clean up for discard test
109+
SolidQueue::Job.destroy_all
110+
SolidQueue::Semaphore.destroy_all
111+
112+
# Test discard behavior
113+
discard_job1 = NonOverlappingDiscardJob.new(@result)
114+
assert_ready do
115+
SolidQueue::Job.enqueue(discard_job1)
116+
end
117+
job3 = SolidQueue::Job.find_by(active_job_id: discard_job1.job_id)
118+
assert job3.ready?
119+
120+
# Second job should be discarded (not blocked)
121+
discard_job2 = NonOverlappingDiscardJob.new(@result)
122+
assert_no_difference -> { SolidQueue::BlockedExecution.count } do
123+
SolidQueue::Job.enqueue(discard_job2)
124+
end
125+
job4 = SolidQueue::Job.find_by(active_job_id: discard_job2.job_id)
126+
assert job4.finished?
127+
assert_nil job4.blocked_execution
128+
assert_nil job4.ready_execution
129+
end
130+
48131
test "enqueue active job to be scheduled in the future" do
49132
active_job = AddToBufferJob.new(1).set(priority: 8, queue: "test")
50133

0 commit comments

Comments
 (0)