Merge pull request #6950 from cmrd-senya/consolidate-queues

Consolidate queues
This commit is contained in:
Jonne Haß 2016-08-10 11:46:09 +02:00 committed by GitHub
commit 90abad0152
36 changed files with 73 additions and 46 deletions

View file

@ -59,6 +59,24 @@ wiki](https://wiki.diasporafoundation.org/Integration/Chat#Vines_to_Prosody)
for more information on how to migrate to Prosody if you've been using Vines for more information on how to migrate to Prosody if you've been using Vines
before. before.
## Sidekiq queue changes
We've decreased the amount of sidekiq queues from 13 to 5 in PR [#6950](https://github.com/diaspora/diaspora/pull/6950).
The new queues are organized according to priority for the jobs they will process. When upgrading please make sure to
empty the sidekiq queues before shutting down the server for an update.
If you run your sidekiq with a custom queue configuration, please make sure to update that for the new queues.
The new queues are: `urgent, high, medium, low, default`.
When you upgrade to the new version, some jobs may persist in the old queues. To ensure that jobs to be processed, launch
job processing for old queues using command:
```
rake migrations:run_legacy_queues
```
The command will report queues that still have jobs and launch sidekiq process for that queues.
## Refactor ## Refactor
* Improve bookmarklet [#5904](https://github.com/diaspora/diaspora/pull/5904) * Improve bookmarklet [#5904](https://github.com/diaspora/diaspora/pull/5904)
* Update listen configuration to listen on unix sockets by default [#5974](https://github.com/diaspora/diaspora/pull/5974) * Update listen configuration to listen on unix sockets by default [#5974](https://github.com/diaspora/diaspora/pull/5974)
@ -118,6 +136,7 @@ before.
* Extract relayable signatures into their own tables [#6932](https://github.com/diaspora/diaspora/pull/6932) * Extract relayable signatures into their own tables [#6932](https://github.com/diaspora/diaspora/pull/6932)
* Remove outdated columns from posts table [#6940](https://github.com/diaspora/diaspora/pull/6940) * Remove outdated columns from posts table [#6940](https://github.com/diaspora/diaspora/pull/6940)
* Remove some unused routes [#6781](https://github.com/diaspora/diaspora/pull/6781) * Remove some unused routes [#6781](https://github.com/diaspora/diaspora/pull/6781)
* Consolidate sidekiq queues [#6950](https://github.com/diaspora/diaspora/pull/6950)
## Bug fixes ## Bug fixes
* Destroy Participation when removing interactions with a post [#5852](https://github.com/diaspora/diaspora/pull/5852) * Destroy Participation when removing interactions with a post [#5852](https://github.com/diaspora/diaspora/pull/5852)

View file

@ -1,6 +1,6 @@
module Workers module Workers
class CleanCachedFiles < Base class CleanCachedFiles < Base
sidekiq_options queue: :maintenance sidekiq_options queue: :low
def perform def perform
CarrierWave.clean_cached_files! CarrierWave.clean_cached_files!

View file

@ -4,7 +4,7 @@
module Workers module Workers
class DeferredDispatch < Base class DeferredDispatch < Base
sidekiq_options queue: :dispatch sidekiq_options queue: :high
def perform(user_id, object_class_name, object_id, opts) def perform(user_id, object_class_name, object_id, opts)
user = User.find(user_id) user = User.find(user_id)

View file

@ -4,7 +4,7 @@
module Workers module Workers
class DeferredRetraction < Base class DeferredRetraction < Base
sidekiq_options queue: :dispatch sidekiq_options queue: :high
def perform(user_id, retraction_data, recipient_ids, opts) def perform(user_id, retraction_data, recipient_ids, opts)
user = User.find(user_id) user = User.find(user_id)

View file

@ -5,7 +5,7 @@
module Workers module Workers
class DeleteAccount < Base class DeleteAccount < Base
sidekiq_options queue: :delete_account sidekiq_options queue: :low
def perform(account_deletion_id) def perform(account_deletion_id)
account_deletion = AccountDeletion.find(account_deletion_id) account_deletion = AccountDeletion.find(account_deletion_id)

View file

@ -4,7 +4,7 @@
# #
module Workers module Workers
class DeletePostFromService < Base class DeletePostFromService < Base
sidekiq_options queue: :http_service sidekiq_options queue: :high
def perform(service_id, opts) def perform(service_id, opts)
service = Service.find_by_id(service_id) service = Service.find_by_id(service_id)

View file

@ -5,7 +5,7 @@
module Workers module Workers
class ExportPhotos < Base class ExportPhotos < Base
sidekiq_options queue: :export sidekiq_options queue: :low
def perform(user_id) def perform(user_id)
@user = User.find(user_id) @user = User.find(user_id)

View file

@ -5,7 +5,7 @@
module Workers module Workers
class ExportUser < Base class ExportUser < Base
sidekiq_options queue: :export sidekiq_options queue: :low
def perform(user_id) def perform(user_id)
@user = User.find(user_id) @user = User.find(user_id)

View file

@ -5,7 +5,7 @@
module Workers module Workers
class FetchProfilePhoto < Base class FetchProfilePhoto < Base
sidekiq_options queue: :photos sidekiq_options queue: :medium
def perform(user_id, service_id, fallback_image_url = nil) def perform(user_id, service_id, fallback_image_url = nil)
service = Service.find(service_id) service = Service.find(service_id)

View file

@ -4,7 +4,7 @@
module Workers module Workers
class FetchPublicPosts < Base class FetchPublicPosts < Base
sidekiq_options queue: :http_service sidekiq_options queue: :medium
def perform(diaspora_id) def perform(diaspora_id)
Diaspora::Fetcher::Public.new.fetch!(diaspora_id) Diaspora::Fetcher::Public.new.fetch!(diaspora_id)

View file

@ -4,7 +4,7 @@
module Workers module Workers
class FetchWebfinger < Base class FetchWebfinger < Base
sidekiq_options queue: :socket_webfinger sidekiq_options queue: :urgent
def perform(account) def perform(account)
person = Person.find_or_fetch_by_identifier(account) person = Person.find_or_fetch_by_identifier(account)

View file

@ -5,7 +5,7 @@
module Workers module Workers
class GatherOEmbedData < Base class GatherOEmbedData < Base
sidekiq_options queue: :http_service sidekiq_options queue: :medium
def perform(post_id, url, retry_count=1) def perform(post_id, url, retry_count=1)
post = Post.find(post_id) post = Post.find(post_id)

View file

@ -5,7 +5,7 @@
module Workers module Workers
class GatherOpenGraphData < Base class GatherOpenGraphData < Base
sidekiq_options queue: :http_service sidekiq_options queue: :medium
def perform(post_id, url, retry_count=1) def perform(post_id, url, retry_count=1)
post = Post.find(post_id) post = Post.find(post_id)

View file

@ -1,7 +1,7 @@
module Workers module Workers
module Mail module Mail
class AlsoCommented < Base class AlsoCommented < Base
sidekiq_options queue: :mail sidekiq_options queue: :low
def perform(recipient_id, sender_id, comment_id) def perform(recipient_id, sender_id, comment_id)
if email = Notifier.also_commented(recipient_id, sender_id, comment_id) if email = Notifier.also_commented(recipient_id, sender_id, comment_id)

View file

@ -1,7 +1,7 @@
module Workers module Workers
module Mail module Mail
class CommentOnPost < Base class CommentOnPost < Base
sidekiq_options queue: :mail sidekiq_options queue: :low
def perform(recipient_id, sender_id, comment_id) def perform(recipient_id, sender_id, comment_id)
Notifier.comment_on_post(recipient_id, sender_id, comment_id).deliver_now Notifier.comment_on_post(recipient_id, sender_id, comment_id).deliver_now

View file

@ -1,7 +1,7 @@
module Workers module Workers
module Mail module Mail
class ConfirmEmail < Base class ConfirmEmail < Base
sidekiq_options queue: :mail sidekiq_options queue: :low
def perform(user_id) def perform(user_id)
Notifier.confirm_email(user_id).deliver_now Notifier.confirm_email(user_id).deliver_now

View file

@ -5,7 +5,7 @@
module Workers module Workers
module Mail module Mail
class InviteEmail < Base class InviteEmail < Base
sidekiq_options queue: :mail sidekiq_options queue: :low
def perform(emails, inviter_id, options={}) def perform(emails, inviter_id, options={})
EmailInviter.new(emails, User.find(inviter_id), options).send! EmailInviter.new(emails, User.find(inviter_id), options).send!

View file

@ -1,7 +1,7 @@
module Workers module Workers
module Mail module Mail
class Liked < Base class Liked < Base
sidekiq_options queue: :mail sidekiq_options queue: :low
def perform(recipient_id, sender_id, like_id) def perform(recipient_id, sender_id, like_id)
Notifier.liked(recipient_id, sender_id, like_id).deliver_now Notifier.liked(recipient_id, sender_id, like_id).deliver_now

View file

@ -6,7 +6,7 @@
module Workers module Workers
module Mail module Mail
class Mentioned < Base class Mentioned < Base
sidekiq_options queue: :mail sidekiq_options queue: :low
def perform(recipient_id, actor_id, target_id) def perform(recipient_id, actor_id, target_id)
Notifier.mentioned( recipient_id, actor_id, target_id).deliver_now Notifier.mentioned( recipient_id, actor_id, target_id).deliver_now

View file

@ -6,7 +6,7 @@
module Workers module Workers
module Mail module Mail
class PrivateMessage < Base class PrivateMessage < Base
sidekiq_options queue: :mail sidekiq_options queue: :low
def perform(recipient_id, actor_id, target_id) def perform(recipient_id, actor_id, target_id)
Notifier.private_message( recipient_id, actor_id, target_id).deliver_now Notifier.private_message( recipient_id, actor_id, target_id).deliver_now

View file

@ -1,7 +1,7 @@
module Workers module Workers
module Mail module Mail
class ReportWorker < Base class ReportWorker < Base
sidekiq_options queue: :mail sidekiq_options queue: :low
def perform(report_id) def perform(report_id)
ReportMailer.new_report(report_id).each(&:deliver_now) ReportMailer.new_report(report_id).each(&:deliver_now)

View file

@ -1,7 +1,7 @@
module Workers module Workers
module Mail module Mail
class Reshared < Base class Reshared < Base
sidekiq_options queue: :mail sidekiq_options queue: :low
def perform(recipient_id, sender_id, reshare_id) def perform(recipient_id, sender_id, reshare_id)
Notifier.reshared(recipient_id, sender_id, reshare_id).deliver_now Notifier.reshared(recipient_id, sender_id, reshare_id).deliver_now

View file

@ -6,7 +6,7 @@
module Workers module Workers
module Mail module Mail
class StartedSharing < Base class StartedSharing < Base
sidekiq_options queue: :mail sidekiq_options queue: :low
def perform(recipient_id, sender_id, target_id) def perform(recipient_id, sender_id, target_id)
Notifier.started_sharing(recipient_id, sender_id).deliver_now Notifier.started_sharing(recipient_id, sender_id).deliver_now

View file

@ -4,7 +4,7 @@
# #
module Workers module Workers
class PostToService < Base class PostToService < Base
sidekiq_options queue: :http_service sidekiq_options queue: :medium
def perform(service_id, post_id, url) def perform(service_id, post_id, url)
service = Service.find_by_id(service_id) service = Service.find_by_id(service_id)

View file

@ -5,7 +5,7 @@
module Workers module Workers
class ProcessPhoto < Base class ProcessPhoto < Base
sidekiq_options queue: :photos sidekiq_options queue: :low
def perform(id) def perform(id)
photo = Photo.find(id) photo = Photo.find(id)

View file

@ -4,7 +4,7 @@
module Workers module Workers
class PublishToHub < Base class PublishToHub < Base
sidekiq_options queue: :http_service sidekiq_options queue: :medium
def perform(sender_atom_url) def perform(sender_atom_url)
Pubsubhubbub.new(AppConfig.environment.pubsub_server.get).publish(sender_atom_url) Pubsubhubbub.new(AppConfig.environment.pubsub_server.get).publish(sender_atom_url)

View file

@ -4,7 +4,7 @@
module Workers module Workers
class QueueUsersForRemoval < Base class QueueUsersForRemoval < Base
sidekiq_options queue: :maintenance sidekiq_options queue: :low
def perform def perform
# Queue users for removal due to inactivity # Queue users for removal due to inactivity

View file

@ -1,6 +1,6 @@
module Workers module Workers
class ReceiveBase < Base class ReceiveBase < Base
sidekiq_options queue: :receive sidekiq_options queue: :urgent
include Diaspora::Logging include Diaspora::Logging

View file

@ -1,6 +1,6 @@
module Workers module Workers
class ReceiveLocal < Base class ReceiveLocal < Base
sidekiq_options queue: :receive_local sidekiq_options queue: :high
def perform(object_class_string, object_id, recipient_user_ids) def perform(object_class_string, object_id, recipient_user_ids)
object = object_class_string.constantize.find(object_id) object = object_class_string.constantize.find(object_id)

View file

@ -1,6 +1,6 @@
module Workers module Workers
class RecurringPodCheck < Base class RecurringPodCheck < Base
sidekiq_options queue: :maintenance sidekiq_options queue: :low
def perform def perform
Pod.check_all! Pod.check_all!

View file

@ -4,7 +4,7 @@
module Workers module Workers
class RemoveOldUser < Base class RemoveOldUser < Base
sidekiq_options queue: :maintenance sidekiq_options queue: :low
def safe_remove_after def safe_remove_after
# extra safety time to compare in addition to remove_after # extra safety time to compare in addition to remove_after
@ -24,4 +24,4 @@ module Workers
end end
end end
end end
end end

View file

@ -5,7 +5,7 @@
module Workers module Workers
class ResendInvitation < Base class ResendInvitation < Base
sidekiq_options queue: :mail sidekiq_options queue: :low
def perform(invitation_id) def perform(invitation_id)
inv = Invitation.find(invitation_id) inv = Invitation.find(invitation_id)

View file

@ -1,6 +1,6 @@
module Workers module Workers
class ResetPassword < Base class ResetPassword < Base
sidekiq_options queue: :mail sidekiq_options queue: :urgent
def perform(user_id) def perform(user_id)
User.find(user_id).send_reset_password_instructions! User.find(user_id).send_reset_password_instructions!

View file

@ -1,6 +1,6 @@
module Workers module Workers
class SendBase < Base class SendBase < Base
sidekiq_options queue: :http, retry: 0 sidekiq_options queue: :medium, retry: 0
MAX_RETRIES = AppConfig.environment.sidekiq.retry.get.to_i MAX_RETRIES = AppConfig.environment.sidekiq.retry.get.to_i

View file

@ -8,16 +8,8 @@
:dead_max_jobs: <%= AppConfig.environment.sidekiq.dead_jobs_limit.to_i %> :dead_max_jobs: <%= AppConfig.environment.sidekiq.dead_jobs_limit.to_i %>
:dead_timeout_in_seconds: <%= AppConfig.environment.sidekiq.dead_jobs_timeout.to_i %> :dead_timeout_in_seconds: <%= AppConfig.environment.sidekiq.dead_jobs_timeout.to_i %>
:queues: :queues:
- socket_webfinger - urgent
- photos - high
- http_service - medium
- dispatch - low
- mail
- delete_account
- receive_local
- receive
- receive_salmon
- http
- export
- maintenance
- default - default

View file

@ -127,4 +127,20 @@ namespace :migrations do
tag.destroy tag.destroy
end end
end end
LEGACY_QUEUES = %w(
maintenance dispatch delete_account http http_service export photos socket_webfinger mail receive_local receive
).freeze
desc "Run sidekiq with old queues so it can finish deferred jobs"
task :run_legacy_queues do
queues_with_jobs = LEGACY_QUEUES.select {|queue| Sidekiq::Queue.new(queue).size > 0 }
if queues_with_jobs.empty?
puts "No jobs in legacy queues!"
else
puts "Launching sidekiq with queues: #{queues_with_jobs.join(', ')}"
queus_cli = queues_with_jobs.map {|queue| "-q #{queue}" }.join(" ")
system "bundle exec sidekiq #{queus_cli} -e #{Rails.env}"
end
end
end end