Commit bf635be

Adds batch processing support
This builds on work I did for Sidekiq. There I had to put a database in front of Sidekiq to make it behave the way I wanted: no duplicates are processed, and follow-up work runs once everything else has finished.
1 parent 37ae2ce commit bf635be

10 files changed: +771 -3 lines changed

README.md

Lines changed: 69 additions & 1 deletion
@@ -2,7 +2,7 @@

 Solid Queue is a DB-based queuing backend for [Active Job](https://edgeguides.rubyonrails.org/active_job_basics.html), designed with simplicity and performance in mind.

-Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, recurring jobs, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`).
+Besides regular job enqueuing and processing, Solid Queue supports delayed jobs, concurrency controls, batch processing, recurring jobs, pausing queues, numeric priorities per job, priorities by queue order, and bulk enqueuing (`enqueue_all` for Active Job's `perform_all_later`).

 Solid Queue can be used with SQL databases such as MySQL, PostgreSQL or SQLite, and it leverages the `FOR UPDATE SKIP LOCKED` clause, if available, to avoid blocking and waiting on locks when polling jobs. It relies on Active Job for retries, discarding, error handling, serialization, or delays, and it's compatible with Ruby on Rails's multi-threading.

@@ -505,6 +505,74 @@ production:

 Or something similar to that depending on your setup. You can also assign a different queue to a job on the moment of enqueuing so you can decide whether to enqueue a job in the throttled queue or another queue depending on the arguments, or pass a block to `queue_as` as explained [here](https://guides.rubyonrails.org/active_job_basics.html#queues).

+## Batch processing
+
+Solid Queue supports grouping jobs into batches, allowing you to track their collective progress, run callbacks when all jobs complete, and manage complex workflows. This is useful for processing large datasets in parallel, importing files, or any scenario where you need to coordinate multiple jobs.
+
+To create a batch, use `perform_batch_later`:
+
+```ruby
+# Simple batch
+batch = MyJob.perform_batch_later([
+  { user_id: 1, action: "update" },
+  { user_id: 2, action: "update" },
+  { user_id: 3, action: "update" }
+])
+
+puts batch.batch_id # => "550e8400-e29b-41d4-a716..."
+puts batch.total_jobs # => 3
+```
+
+You can specify callbacks to run when the batch completes:
+
+```ruby
+batch = DataImportJob.perform_batch_later(
+  import_rows,
+  on_success: { job: ImportSuccessJob, args: { email: "[email protected]" } },
+  on_failure: { job: ImportFailureJob, args: { email: "[email protected]" } },
+  on_complete: { job: ImportCompleteJob },
+  metadata: { source: "api", imported_by: current_user.id }
+)
+```
+
+- `on_success`: Runs when the batch finishes and every job completed successfully
+- `on_failure`: Runs when the batch finishes and at least one job failed
+- `on_complete`: Always runs when the batch finishes
+
+Jobs can check if they're part of a batch:
+
+```ruby
+class DataImportJob < ApplicationJob
+  def perform(row_data)
+    if in_batch?
+      Rails.logger.info "Processing row as part of batch #{batch_id}"
+      Rails.logger.info "Batch progress: #{batch_progress}%"
+    end
+
+    # Process the row...
+  end
+end
+```
+
+You can query and monitor batches:
+
+```ruby
+# Find a batch
+batch = SolidQueue::Batch.find_by(batch_id: batch_id)
+
+# Check progress
+batch.pending_jobs # => 10
+batch.completed_jobs # => 85
+batch.failed_jobs # => 5
+batch.progress_percentage # => 90.0
+batch.finished? # => false
+
+# Query batches by status
+SolidQueue::Batch.pending
+SolidQueue::Batch.completed
+SolidQueue::Batch.failed
+```
+
 ## Failed jobs and retries

 Solid Queue doesn't include any automatic retry mechanism, it [relies on Active Job for this](https://edgeguides.rubyonrails.org/active_job_basics.html#retrying-or-discarding-failed-jobs). Jobs that fail will be kept in the system, and a _failed execution_ (a record in the `solid_queue_failed_executions` table) will be created for these. The job will stay there until manually discarded or re-enqueued. You can do this in a console as:
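
The README section on batch callbacks does not show what a callback job receives. Going by `enqueue_callback` in `app/models/solid_queue/batch.rb` from this commit, a callback appears to be enqueued with a `batch_id:` keyword plus the configured `args`, with their keys symbolized. The following is a minimal plain-Ruby sketch of that argument handling, not the actual implementation: `callback_kwargs` and `ImportSuccessCallback` are hypothetical names used only for illustration, the email address is a placeholder, and `transform_keys(&:to_sym)` stands in for Active Support's `symbolize_keys`.

```ruby
# Plain-Ruby sketch (no Rails required). `callback_kwargs` is a hypothetical
# helper that mirrors how Batch#enqueue_callback builds keyword arguments.
def callback_kwargs(batch_id, job_args)
  # Args stored as JSON come back with string keys, so convert them to symbols.
  { batch_id: batch_id }.merge((job_args || {}).transform_keys(&:to_sym))
end

# Hypothetical stand-in for a callback job such as ImportSuccessJob.
class ImportSuccessCallback
  def perform(batch_id:, email:)
    "batch #{batch_id} succeeded, notifying #{email}"
  end
end

kwargs = callback_kwargs("abc-123", { "email" => "ops@example.com" })
puts ImportSuccessCallback.new.perform(**kwargs)
```

Under this reading, a callback job's `perform` should accept `batch_id:` as a keyword argument in addition to whatever keys were passed in `args`. A callback configured without `args` (like `on_complete` in the README example) would receive only `batch_id:`.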
Lines changed: 22 additions & 0 deletions
@@ -0,0 +1,22 @@
+# frozen_string_literal: true
+
+module SolidQueue
+  class BatchUpdateJob < ActiveJob::Base
+    queue_as :default
+
+    discard_on ActiveRecord::RecordNotFound
+
+    def perform(batch_id, job_id)
+      batch = Batch.find_by!(batch_id: batch_id)
+      job = Job.find_by!(id: job_id)
+
+      # Only process if the job is actually finished and belongs to this batch
+      return unless job.finished? && job.batch_id == batch_id
+
+      batch.job_finished!(job)
+    rescue => e
+      Rails.logger.error "[SolidQueue] BatchUpdateJob failed for batch #{batch_id}, job #{job_id}: #{e.message}"
+      raise
+    end
+  end
+end

app/models/solid_queue/batch.rb

Lines changed: 152 additions & 0 deletions
@@ -0,0 +1,152 @@
+# frozen_string_literal: true
+
+module SolidQueue
+  class Batch < Record
+    serialize :on_complete_job_args, coder: JSON
+    serialize :on_success_job_args, coder: JSON
+    serialize :on_failure_job_args, coder: JSON
+    serialize :metadata, coder: JSON
+
+    STATUSES = %w[pending processing completed failed]
+
+    validates :batch_id, uniqueness: true
+    validates :status, inclusion: { in: STATUSES }
+
+    has_many :jobs, foreign_key: :batch_id, primary_key: :batch_id, dependent: :nullify
+
+    scope :pending, -> { where(status: "pending") }
+    scope :processing, -> { where(status: "processing") }
+    scope :completed, -> { where(status: "completed") }
+    scope :failed, -> { where(status: "failed") }
+    scope :finished, -> { where(status: %w[completed failed]) }
+    scope :unfinished, -> { where(status: %w[pending processing]) }
+
+    before_create :set_batch_id
+
+    class << self
+      def enqueue(job_instances, on_complete: nil, on_success: nil, on_failure: nil, metadata: {})
+        return 0 if job_instances.empty?
+
+        batch = create!(
+          on_complete_job_class: on_complete&.dig(:job)&.to_s,
+          on_complete_job_args: on_complete&.dig(:args),
+          on_success_job_class: on_success&.dig(:job)&.to_s,
+          on_success_job_args: on_success&.dig(:args),
+          on_failure_job_class: on_failure&.dig(:job)&.to_s,
+          on_failure_job_args: on_failure&.dig(:args),
+          metadata: metadata,
+          total_jobs: job_instances.size,
+          pending_jobs: job_instances.size
+        )
+
+        # Add batch_id to each job
+        job_instances.each do |job|
+          job.batch_id = batch.batch_id
+        end
+
+        # Use SolidQueue's bulk enqueue
+        enqueued_count = SolidQueue::Job.enqueue_all(job_instances)
+
+        # Update pending count if some jobs failed to enqueue
+        if enqueued_count < job_instances.size
+          batch.update!(pending_jobs: enqueued_count)
+        end
+
+        batch
+      end
+    end
+
+    def add_jobs(job_instances)
+      return 0 if job_instances.empty? || finished?
+
+      job_instances.each do |job|
+        job.batch_id = batch_id
+      end
+
+      enqueued_count = SolidQueue::Job.enqueue_all(job_instances)
+
+      increment!(:total_jobs, job_instances.size)
+      increment!(:pending_jobs, enqueued_count)
+
+      enqueued_count
+    end
+
+    def job_finished!(job)
+      return if finished?
+
+      transaction do
+        if job.failed_execution.present?
+          increment!(:failed_jobs)
+        else
+          increment!(:completed_jobs)
+        end
+
+        decrement!(:pending_jobs)
+
+        check_completion!
+      end
+    end
+
+    def check_completion!
+      return if finished?
+
+      if pending_jobs <= 0
+        if failed_jobs > 0
+          mark_as_failed!
+        else
+          mark_as_completed!
+        end
+      elsif status == "pending"
+        update!(status: "processing")
+      end
+    end
+
+    def finished?
+      status.in?(%w[completed failed])
+    end
+
+    def processing?
+      status == "processing"
+    end
+
+    def pending?
+      status == "pending"
+    end
+
+    def progress_percentage
+      return 0 if total_jobs == 0
+      ((completed_jobs + failed_jobs) * 100.0 / total_jobs).round(2)
+    end
+
+    private
+      def set_batch_id
+        self.batch_id ||= SecureRandom.uuid
+      end
+
+      def mark_as_completed!
+        update!(status: "completed", completed_at: Time.current)
+        enqueue_callback(:on_success)
+        enqueue_callback(:on_complete)
+      end
+
+      def mark_as_failed!
+        update!(status: "failed", completed_at: Time.current)
+        enqueue_callback(:on_failure)
+        enqueue_callback(:on_complete)
+      end
+
+      def enqueue_callback(callback_type)
+        job_class = public_send("#{callback_type}_job_class")
+        job_args = public_send("#{callback_type}_job_args")
+
+        return unless job_class.present?
+
+        job_class.constantize.perform_later(
+          batch_id: batch_id,
+          **(job_args || {}).symbolize_keys
+        )
+      rescue => e
+        Rails.logger.error "[SolidQueue] Failed to enqueue #{callback_type} callback for batch #{batch_id}: #{e.message}"
+      end
+  end
+end
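
To make the counter and status transitions in `job_finished!`, `check_completion!` and `progress_percentage` easier to follow, here is a minimal in-memory sketch of the same logic. It is an illustration under stated assumptions, not the model itself: `BatchSketch` is a hypothetical class, it has no database, transactions or callbacks, and a `failed:` flag replaces the `job.failed_execution.present?` check.

```ruby
# Plain-Ruby model of the counters kept by SolidQueue::Batch (no database).
# BatchSketch is a hypothetical name used only for this illustration.
class BatchSketch
  attr_reader :status, :pending_jobs, :completed_jobs, :failed_jobs, :total_jobs

  def initialize(total_jobs)
    @total_jobs = total_jobs
    @pending_jobs = total_jobs
    @completed_jobs = 0
    @failed_jobs = 0
    @status = "pending"
  end

  def finished?
    %w[completed failed].include?(status)
  end

  # Mirrors job_finished!: bump one counter, decrement pending, re-check status.
  def job_finished!(failed: false)
    return if finished?

    if failed
      @failed_jobs += 1
    else
      @completed_jobs += 1
    end
    @pending_jobs -= 1
    check_completion!
  end

  def progress_percentage
    return 0 if total_jobs.zero?

    ((completed_jobs + failed_jobs) * 100.0 / total_jobs).round(2)
  end

  private

  # Mirrors check_completion!: the batch only finishes once nothing is pending.
  def check_completion!
    if pending_jobs <= 0
      @status = failed_jobs.positive? ? "failed" : "completed"
    elsif status == "pending"
      @status = "processing"
    end
  end
end

batch = BatchSketch.new(4)
batch.job_finished!
puts batch.status               # processing
puts batch.progress_percentage  # 25.0
batch.job_finished!(failed: true)
2.times { batch.job_finished! }
puts batch.status               # failed
puts batch.progress_percentage  # 100.0
```

The sketch shows that a single failed job does not end the batch early: the status only becomes `failed` after the last pending job reports in, which is also when the failure and completion callbacks would be enqueued in the real model.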

app/models/solid_queue/job.rb

Lines changed: 3 additions & 2 deletions
@@ -4,7 +4,7 @@ module SolidQueue
   class Job < Record
     class EnqueueError < StandardError; end

-    include Executable, Clearable, Recurrable
+    include Executable, Clearable, Recurrable, Batchable

     serialize :arguments, coder: JSON

@@ -60,7 +60,8 @@ def attributes_from_active_job(active_job)
             scheduled_at: active_job.scheduled_at,
             class_name: active_job.class.name,
             arguments: active_job.serialize,
-            concurrency_key: active_job.concurrency_key
+            concurrency_key: active_job.concurrency_key,
+            batch_id: active_job.respond_to?(:batch_id) ? active_job.batch_id : nil
           }
         end
       end
Lines changed: 67 additions & 0 deletions
@@ -0,0 +1,67 @@
+# frozen_string_literal: true
+
+module SolidQueue
+  class Job
+    module Batchable
+      extend ActiveSupport::Concern
+
+      included do
+        belongs_to :batch, foreign_key: :batch_id, primary_key: :batch_id, optional: true, class_name: "SolidQueue::Batch"
+
+        scope :in_batch, ->(batch_id) { where(batch_id: batch_id) }
+        scope :without_batch, -> { where(batch_id: nil) }
+        scope :batch_pending, -> { where.not(batch_id: nil).where(finished_at: nil) }
+        scope :batch_finished, -> { where.not(batch_id: nil).where.not(finished_at: nil) }
+
+        after_update :notify_batch_if_finished, if: :batch_id?
+      end
+
+      class_methods do
+        def enqueue_batch(active_jobs, **batch_options)
+          return 0 if active_jobs.empty?
+
+          Batch.enqueue(active_jobs, **batch_options)
+        end
+
+        def create_all_from_active_jobs_with_batch(active_jobs, batch_id = nil)
+          if batch_id.present?
+            job_rows = active_jobs.map do |job|
+              attributes_from_active_job(job).merge(batch_id: batch_id)
+            end
+            insert_all(job_rows)
+            where(active_job_id: active_jobs.map(&:job_id))
+          else
+            create_all_from_active_jobs_without_batch(active_jobs)
+          end
+        end
+      end
+
+      def in_batch?
+        batch_id.present?
+      end
+
+      def batch_siblings
+        return Job.none unless in_batch?
+
+        self.class.in_batch(batch_id).where.not(id: id)
+      end
+
+      def batch_position
+        return nil unless in_batch?
+
+        batch.jobs.where("id <= ?", id).count
+      end
+
+      private
+        def notify_batch_if_finished
+          return unless saved_change_to_finished_at? && finished_at.present?
+          return unless batch.present?
+
+          # Use perform_later to avoid holding locks
+          BatchUpdateJob.perform_later(batch_id, id)
+        rescue => e
+          Rails.logger.error "[SolidQueue] Failed to notify batch #{batch_id} about job #{id} completion: #{e.message}"
+        end
+    end
+  end
+end
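
The `batch_position` and `batch_siblings` helpers are defined purely in terms of job ids within one batch. As a rough illustration of what those queries return, here is a plain-Ruby sketch over an in-memory array. `SketchJob`, `JOBS` and the sample ids are hypothetical and stand in for rows of the jobs table; the real methods run SQL through Active Record.

```ruby
# Plain-Ruby sketch (no database). SketchJob is a hypothetical stand-in for
# SolidQueue::Job with only the two columns the helpers rely on.
SketchJob = Struct.new(:id, :batch_id)

JOBS = [
  SketchJob.new(10, "batch-a"),
  SketchJob.new(11, nil),
  SketchJob.new(12, "batch-a"),
  SketchJob.new(15, "batch-a")
].freeze

# Counterpart of batch.jobs.where("id <= ?", id).count
def batch_position(job, jobs)
  return nil if job.batch_id.nil?

  jobs.count { |other| other.batch_id == job.batch_id && other.id <= job.id }
end

# Counterpart of self.class.in_batch(batch_id).where.not(id: id)
def batch_siblings(job, jobs)
  return [] if job.batch_id.nil?

  jobs.select { |other| other.batch_id == job.batch_id && other.id != job.id }
end

puts batch_position(JOBS[2], JOBS)                    # 2
puts batch_siblings(JOBS[2], JOBS).map(&:id).inspect  # [10, 15]
p batch_position(JOBS[1], JOBS)                       # nil
```

The position is 1-based and follows id order, so it reflects insertion order only as far as ids are assigned in the order the jobs were enqueued.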
