
Commit bd2b655

Tidy up after merging two branches implementing the same feature
Brought together #523 and #586, and took some parts from each. Clean and enhance the README as well.
1 parent d8e0bd1 commit bd2b655

5 files changed, +22 -52 lines

README.md

Lines changed: 7 additions & 33 deletions
@@ -428,25 +428,25 @@ In the case of recurring tasks, if such error is raised when enqueuing the job c
 
 ## Concurrency controls
 
-Solid Queue extends Active Job with concurrency controls, that allows you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or after the set expiry time (concurrency limit's _duration_) elapses. Jobs can be configured to either be discarded or blocked.
+Solid Queue extends Active Job with concurrency controls that allow you to limit how many jobs of a certain type or with certain arguments can run at the same time. When limited in this way, by default, jobs will be blocked from running, and they'll stay blocked until another job finishes and unblocks them, or until the set expiry time (the concurrency limit's _duration_) elapses. Alternatively, jobs can be configured to be discarded instead of blocked. This means that if a job with certain arguments has already been enqueued, other jobs with the same characteristics (in the same concurrency _class_) won't be enqueued; they'll silently complete.
 
 ```ruby
 class MyJob < ApplicationJob
-  limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group, on_conflict: conflict_behaviour
+  limits_concurrency to: max_concurrent_executions, key: ->(arg1, arg2, **) { ... }, duration: max_interval_to_guarantee_concurrency_limit, group: concurrency_group, on_conflict: on_conflict_behaviour
 
   # ...
 ```
 - `key` is the only required parameter, and it can be a symbol, a string or a proc that receives the job arguments as parameters and will be used to identify the jobs that need to be limited together. If the proc returns an Active Record record, the key will be built from its class name and `id`.
 - `to` is `1` by default.
 - `duration` is set to `SolidQueue.default_concurrency_control_period` by default, which itself defaults to `3 minutes`, but that you can configure as well.
 - `group` is used to control the concurrency of different job classes together. It defaults to the job class name.
-- `on_conflict` controls behaviour when enqueuing a job which is above the max concurrent executions for your configuration.
-  - (default) `:block`; the job is blocked and is dispatched until another job completes and unblocks it
-  - `:discard`; the job is discarded
+- `on_conflict` controls behaviour when enqueuing a job that conflicts with the configured concurrency limits. It can be set to one of the following:
+  - (default) `:block`: the job is blocked and is dispatched when another job completes and unblocks it, or when the `duration` expires.
+  - `:discard`: the job is discarded (it silently finishes). When you choose this option, bear in mind that if a job runs and fails to remove the concurrency lock (or _semaphore_, read below to know more about this), all jobs conflicting with it will be discarded until the interval defined by `duration` has elapsed.
 
 When a job includes these controls, we'll ensure that, at most, the number of jobs (indicated as `to`) that yield the same `key` will be performed concurrently, and this guarantee will last for `duration` for each job enqueued. Note that there's no guarantee about _the order of execution_, only about jobs being performed at the same time (overlapping).
 
-The concurrency limits use the concept of semaphores when enqueuing, and work as follows: when a job is enqueued, we check if it specifies concurrency controls. If it does, we check the semaphore for the computed concurrency key. If the semaphore is open, we claim it and we set the job as _ready_. Ready means it can be picked up by workers for execution. When the job finishes executing (be it successfully or unsuccessfully, resulting in a failed execution), we signal the semaphore and try to unblock the next job with the same key, if any. Unblocking the next job doesn't mean running that job right away, but moving it from _blocked_ to _ready_. Since something can happen that prevents the first job from releasing the semaphore and unblocking the next job (for example, someone pulling a plug in the machine where the worker is running), we have the `duration` as a failsafe. Jobs that have been blocked for more than duration are candidates to be released, but only as many of them as the concurrency rules allow, as each one would need to go through the semaphore dance check. This means that the `duration` is not really about the job that's enqueued or being run, it's about the jobs that are blocked waiting. It's important to note that after one or more candidate jobs are unblocked (either because a job finishes or because `duration` expires and a semaphore is released), the `duration` timer for the still blocked jobs is reset. This happens indirectly via the expiration time of the semaphore, which is updated.
+The concurrency limits use the concept of semaphores when enqueuing, and work as follows: when a job is enqueued, we check if it specifies concurrency controls. If it does, we check the semaphore for the computed concurrency key. If the semaphore is open, we claim it and we set the job as _ready_. Ready means it can be picked up by workers for execution. When the job finishes executing (be it successfully or unsuccessfully, resulting in a failed execution), we signal the semaphore and try to unblock the next job with the same key, if any. Unblocking the next job doesn't mean running that job right away, but moving it from _blocked_ to _ready_. If you're using the `:discard` behaviour for `on_conflict`, jobs enqueued while the semaphore is closed will be discarded. Since something can happen that prevents the first job from releasing the semaphore and unblocking the next job (for example, someone pulling a plug in the machine where the worker is running), we have the `duration` as a failsafe. Jobs that have been blocked for more than `duration` are candidates to be released, but only as many of them as the concurrency rules allow, as each one would need to go through the semaphore dance check. This means that the `duration` is not really about the job that's enqueued or being run, it's about the jobs that are blocked waiting. It's important to note that after one or more candidate jobs are unblocked (either because a job finishes or because `duration` expires and a semaphore is released), the `duration` timer for the still blocked jobs is reset. This happens indirectly via the expiration time of the semaphore, which is updated. In a similar way, when using `:discard` as the behaviour to handle conflicts, you might have jobs discarded for up to the `duration` interval if something happens and a running job fails to release the semaphore.
 
 
 For example:
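
A quick illustration, not part of this commit, of how the options documented in the added lines above can be combined; the job class, its arguments and the chosen `duration` are made up for the example:

```ruby
class DeliverWebhookJob < ApplicationJob
  # Allow a single delivery per account at a time. Conflicting jobs are
  # silently discarded rather than blocked, and the semaphore expires after
  # 5 minutes if a job never gets to release it.
  limits_concurrency key: ->(account, _payload) { account },
                     to: 1,
                     duration: 5.minutes,
                     on_conflict: :discard

  def perform(account, payload)
    # deliver the webhook for this account...
  end
end
```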
@@ -485,31 +485,6 @@ Jobs are unblocked in order of priority but queue order is not taken into accoun
 
 Finally, failed jobs that are automatically or manually retried work in the same way as new jobs that get enqueued: they get in the queue for getting an open semaphore, and whenever they get it, they'll be run. It doesn't matter if they had already gotten an open semaphore in the past.
 
-### Discarding conflicting jobs
-
-When configuring `on_conflict` with `:discard`, jobs enqueued above the concurrent execution limit are discarded and failed to be enqueued.
-
-```ruby
-class ConcurrentJob < ApplicationJob
-  limits_concurrency key: ->(record) { record }, on_conflict: :discard
-
-  def perform(user); end
-end
-
-enqueued_job = ConcurrentJob.perform_later(record)
-# => instance of ConcurrentJob
-enqueued_job.successfully_enqueued?
-# => true
-
-second_enqueued_job = ConcurrentJob.perform_later(record) do |job|
-  job.successfully_enqueued?
-  # => false
-end
-
-second_enqueued_job
-# => false
-```
-
 ### Performance considerations
 
 Concurrency controls introduce significant overhead (blocked executions need to be created and promoted to ready, semaphores need to be created and updated) so you should consider carefully whether you need them. For throttling purposes, where you plan to have `limit` significantly larger than 1, I'd encourage relying on a limited number of workers per queue instead. For example:
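
As an aside, not part of this commit: the paragraph above (and the hunk that follows) suggests throttling via a dedicated queue with a limited number of workers rather than via concurrency controls. One way to route jobs there at enqueue time is the block form of `queue_as`; the job class and the `bulk?` predicate below are made up:

```ruby
class ImportJob < ApplicationJob
  # Send large imports to a queue served by a small, dedicated pool of
  # workers, which throttles them without any concurrency controls.
  queue_as do
    import = arguments.first
    import.bulk? ? :throttled_imports : :default
  end

  def perform(import)
    # ...
  end
end
```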
@@ -533,9 +508,8 @@ production:
 
 Or something similar to that depending on your setup. You can also assign a different queue to a job on the moment of enqueuing so you can decide whether to enqueue a job in the throttled queue or another queue depending on the arguments, or pass a block to `queue_as` as explained [here](https://guides.rubyonrails.org/active_job_basics.html#queues).
 
-### Discarding concurrent jobs
-
 
+In addition, mixing concurrency controls with bulk enqueuing (Active Job's `perform_all_later`) is not a good idea because concurrency-controlled jobs need to be enqueued one by one to ensure concurrency limits are respected, so you lose all the benefits of bulk enqueuing.
 
 ## Failed jobs and retries
 
app/models/solid_queue/job.rb

Lines changed: 1 addition & 1 deletion
@@ -2,7 +2,7 @@
 
 module SolidQueue
   class Job < Record
-    class EnqueueError < ActiveJob::EnqueueError; end
+    class EnqueueError < StandardError; end
 
     include Executable, Clearable, Recurrable
 
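Note on the change above, not part of the diff: since `EnqueueError` now inherits from `StandardError` instead of `ActiveJob::EnqueueError`, code that rescued `ActiveJob::EnqueueError` specifically will no longer catch it. A minimal sketch, assuming the error propagates to the caller of `perform_later` as described earlier in the README:

```ruby
begin
  MyJob.perform_later(record)
rescue SolidQueue::Job::EnqueueError => error
  # EnqueueError < StandardError, so a plain rescue like this still works,
  # while `rescue ActiveJob::EnqueueError` would no longer match.
  Rails.logger.warn("Failed to enqueue MyJob: #{error.message}")
end
```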
app/models/solid_queue/job/concurrency_controls.rb

Lines changed: 11 additions & 3 deletions
@@ -8,7 +8,7 @@ module ConcurrencyControls
       included do
         has_one :blocked_execution
 
-        delegate :concurrency_limit, :concurrency_duration, :concurrency_on_conflict, to: :job_class
+        delegate :concurrency_limit, :concurrency_duration, to: :job_class
 
         before_destroy :unblock_next_blocked_job, if: -> { concurrency_limited? && ready? }
       end
@@ -34,8 +34,8 @@ def blocked?
       end
 
       private
-        def discard_concurrent?
-          concurrency_on_conflict == :discard
+        def concurrency_on_conflict
+          job_class.concurrency_on_conflict.to_s.inquiry
         end
 
         def acquire_concurrency_lock
@@ -50,6 +50,14 @@ def release_concurrency_lock
           Semaphore.signal(self)
         end
 
+        def handle_concurrency_conflict
+          if concurrency_on_conflict.discard?
+            finished!
+          else
+            block
+          end
+        end
+
         def block
           BlockedExecution.create_or_find_by!(job_id: id)
         end
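
The `discard_concurrent?` predicate is replaced here by a `concurrency_on_conflict` reader that leans on ActiveSupport's `String#inquiry`, so `handle_concurrency_conflict` can ask `.discard?` directly. A minimal sketch of that behaviour outside the gem:

```ruby
require "active_support/core_ext/string/inquiry"

on_conflict = :discard.to_s.inquiry # => an ActiveSupport::StringInquirer
on_conflict.discard?                # => true
on_conflict.block?                  # => false

# Anything other than "discard" answers false to discard?, so the else
# branch of handle_concurrency_conflict (block) is taken.
:block.to_s.inquiry.discard?        # => false
```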

app/models/solid_queue/job/executable.rb

Lines changed: 1 addition & 13 deletions
@@ -74,16 +74,8 @@ def prepare_for_execution
 
       def dispatch
         if acquire_concurrency_lock then ready
-        elsif discard_concurrent?
-          discard
-          raise EnqueueError.new("Dispatched job discarded due to concurrent configuration.")
         else
-          case job_class.concurrency_on_conflict
-          when :discard
-            discard_on_conflict
-          else
-            block
-          end
+          handle_concurrency_conflict
         end
       end
 
@@ -120,10 +112,6 @@ def ready
         ReadyExecution.create_or_find_by!(job_id: id)
       end
 
-      def discard_on_conflict
-        finished!
-      end
-
       def execution
         %w[ ready claimed failed ].reduce(nil) { |acc, status| acc || public_send("#{status}_execution") }
       end
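
Read together with `handle_concurrency_conflict` from the previous file, the enqueue path collapses into a single branch. The following is a paraphrase of the hunks above with explanatory comments, not a verbatim copy of the resulting files:

```ruby
def dispatch
  if acquire_concurrency_lock
    ready                         # semaphore open: create a ReadyExecution for workers to pick up
  else
    handle_concurrency_conflict   # semaphore closed: block or discard, depending on on_conflict
  end
end

# Defined in Job::ConcurrencyControls (see the hunk above):
def handle_concurrency_conflict
  if concurrency_on_conflict.discard?
    finished!                     # :discard marks the job finished without running it
  else
    block                         # :block creates a BlockedExecution that waits on the semaphore
  end
end
```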

lib/active_job/concurrency_controls.rb

Lines changed: 2 additions & 2 deletions
@@ -5,13 +5,13 @@ module ConcurrencyControls
     extend ActiveSupport::Concern
 
     DEFAULT_CONCURRENCY_GROUP = ->(*) { self.class.name }
+    CONCURRENCY_ON_CONFLICT_BEHAVIOUR = %i[ block discard ]
 
     included do
       class_attribute :concurrency_key, instance_accessor: false
       class_attribute :concurrency_group, default: DEFAULT_CONCURRENCY_GROUP, instance_accessor: false
 
       class_attribute :concurrency_limit
-      class_attribute :concurrency_on_conflict
       class_attribute :concurrency_duration, default: SolidQueue.default_concurrency_control_period
       class_attribute :concurrency_on_conflict, default: :block
     end
@@ -22,7 +22,7 @@ def limits_concurrency(key:, to: 1, group: DEFAULT_CONCURRENCY_GROUP, duration:
         self.concurrency_limit = to
         self.concurrency_group = group
         self.concurrency_duration = duration
-        self.concurrency_on_conflict = on_conflict
+        self.concurrency_on_conflict = on_conflict.presence_in(CONCURRENCY_ON_CONFLICT_BEHAVIOUR) || :block
       end
     end
 
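`presence_in` comes from ActiveSupport's Object extensions and returns the receiver only when it's included in the given collection, so an unrecognised `on_conflict` value quietly falls back to `:block`. A small sketch outside the gem:

```ruby
require "active_support/core_ext/object/inclusion"

BEHAVIOURS = %i[ block discard ]

:discard.presence_in(BEHAVIOURS) || :block # => :discard
:explode.presence_in(BEHAVIOURS) || :block # => :block
nil.presence_in(BEHAVIOURS)      || :block # => :block
```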