Merge pull request #6972 from jhass/queue_migration
Add rake task to move jobs from any legacy queue to the default queue
This commit is contained in:
commit
3942dca08f
2 changed files with 28 additions and 15 deletions
|
|
@ -69,12 +69,15 @@ If you run your sidekiq with a custom queue configuration, please make sure to u
|
|||
|
||||
The new queues are: `urgent, high, medium, low, default`.
|
||||
|
||||
When you upgrade to the new version, some jobs may persist in the old queues. To ensure that jobs to be processed, launch
|
||||
job processing for old queues using command:
|
||||
When you upgrade to the new version, some jobs may persist in the old queues. To move them to the default queue,
|
||||
so they're processed, run:
|
||||
|
||||
```
|
||||
bin/rake migrations:run_legacy_queues
|
||||
bin/rake migrations:legacy_queues
|
||||
```
|
||||
|
||||
Note that this will retry all dead jobs, if you want to prevent that empty the dead queue first.
|
||||
|
||||
The command will report queues that still have jobs and launch sidekiq process for that queues.
|
||||
|
||||
## Refactor
|
||||
|
|
|
|||
|
|
@ -128,19 +128,29 @@ namespace :migrations do
|
|||
end
|
||||
end
|
||||
|
||||
LEGACY_QUEUES = %w(
|
||||
maintenance dispatch delete_account http http_service export photos socket_webfinger mail receive_local receive
|
||||
).freeze
|
||||
CURRENT_QUEUES = %w(urgent high medium low default).freeze
|
||||
|
||||
desc "Run sidekiq with old queues so it can finish deferred jobs"
|
||||
task :run_legacy_queues do
|
||||
queues_with_jobs = LEGACY_QUEUES.select {|queue| Sidekiq::Queue.new(queue).size > 0 }
|
||||
if queues_with_jobs.empty?
|
||||
puts "No jobs in legacy queues!"
|
||||
else
|
||||
puts "Launching sidekiq with queues: #{queues_with_jobs.join(', ')}"
|
||||
queus_cli = queues_with_jobs.map {|queue| "-q #{queue}" }.join(" ")
|
||||
system "bundle exec sidekiq #{queus_cli} -e #{Rails.env}"
|
||||
desc "Migrate sidekiq jobs, retries, scheduled and dead jobs from any legacy queue to "\
|
||||
"the default queue (retries all dead jobs)"
|
||||
task :legacy_queues do
|
||||
# Push all retries, scheduled and dead jobs to their queues
|
||||
Sidekiq::RetrySet.new.retry_all
|
||||
Sidekiq::DeadSet.new.retry_all
|
||||
Sidekiq::ScheduledSet.new.reject {|job| CURRENT_QUEUES.include? job.queue }.each(&:add_to_queue)
|
||||
|
||||
# Move all jobs from legacy queues to the default queue
|
||||
Sidekiq::Queue.all.each do |queue|
|
||||
next if CURRENT_QUEUES.include? queue.name
|
||||
|
||||
puts "Migrating #{queue.size} jobs from #{queue.name} to default..."
|
||||
queue.each do |job|
|
||||
job.item["queue"] = "default"
|
||||
Sidekiq::Client.push(job.item)
|
||||
job.delete
|
||||
end
|
||||
|
||||
# Delete the queue
|
||||
queue.clear
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
|||
Loading…
Reference in a new issue