From 7596a49b469604a4ba4f924e7ece5c07b4333c8d Mon Sep 17 00:00:00 2001 From: Benjamin Neff Date: Sun, 17 Jan 2016 01:40:21 +0100 Subject: [PATCH] add new send workers --- app/workers/send_base.rb | 29 ++++++++++++++++++++ app/workers/send_private.rb | 13 +++++++++ app/workers/send_public.rb | 13 +++++++++ spec/workers/send_base_spec.rb | 20 ++++++++++++++ spec/workers/send_private_spec.rb | 44 +++++++++++++++++++++++++++++++ spec/workers/send_public_spec.rb | 41 ++++++++++++++++++++++++++++ 6 files changed, 160 insertions(+) create mode 100644 app/workers/send_base.rb create mode 100644 app/workers/send_private.rb create mode 100644 app/workers/send_public.rb create mode 100644 spec/workers/send_base_spec.rb create mode 100644 spec/workers/send_private_spec.rb create mode 100644 spec/workers/send_public_spec.rb diff --git a/app/workers/send_base.rb b/app/workers/send_base.rb new file mode 100644 index 000000000..f7cce44a3 --- /dev/null +++ b/app/workers/send_base.rb @@ -0,0 +1,29 @@ +module Workers + class SendBase < Base + sidekiq_options queue: :http, retry: 0 + + MAX_RETRIES = AppConfig.environment.sidekiq.retry.get.to_i + + protected + + def schedule_retry(retry_count, sender_id, obj_str, failed_urls) + if retry_count < MAX_RETRIES + yield(seconds_to_delay(retry_count), retry_count) + else + logger.warn "status=abandon sender=#{sender_id} obj=#{obj_str} failed_urls='[#{failed_urls.join(', ')}]'" + raise MaxRetriesReached + end + end + + private + + # based on Sidekiq::Middleware::Server::RetryJobs#seconds_to_delay + def seconds_to_delay(count) + ((count + 3)**4) + (rand(30) * (count + 1)) + end + + # send job to the dead job queue + class MaxRetriesReached < RuntimeError + end + end +end diff --git a/app/workers/send_private.rb b/app/workers/send_private.rb new file mode 100644 index 000000000..8f87d6b51 --- /dev/null +++ b/app/workers/send_private.rb @@ -0,0 +1,13 @@ +module Workers + class SendPrivate < SendBase + def perform(sender_id, obj_str, targets, retry_count=0) + targets_to_retry = DiasporaFederation::Federation::Sender.private(sender_id, obj_str, targets) + + return if targets_to_retry.empty? + + schedule_retry(retry_count + 1, sender_id, obj_str, targets_to_retry.keys) do |delay, new_retry_count| + Workers::SendPrivate.perform_in(delay, sender_id, obj_str, targets_to_retry, new_retry_count) + end + end + end +end diff --git a/app/workers/send_public.rb b/app/workers/send_public.rb new file mode 100644 index 000000000..5022eac6f --- /dev/null +++ b/app/workers/send_public.rb @@ -0,0 +1,13 @@ +module Workers + class SendPublic < SendBase + def perform(sender_id, obj_str, urls, xml, retry_count=0) + urls_to_retry = DiasporaFederation::Federation::Sender.public(sender_id, obj_str, urls, xml) + + return if urls_to_retry.empty? + + schedule_retry(retry_count + 1, sender_id, obj_str, urls_to_retry) do |delay, new_retry_count| + Workers::SendPublic.perform_in(delay, sender_id, obj_str, urls_to_retry, xml, new_retry_count) + end + end + end +end diff --git a/spec/workers/send_base_spec.rb b/spec/workers/send_base_spec.rb new file mode 100644 index 000000000..6515faf90 --- /dev/null +++ b/spec/workers/send_base_spec.rb @@ -0,0 +1,20 @@ +require "spec_helper" + +describe Workers::SendBase do + it "retries first time after at least 256 seconds" do + retry_delay = Workers::SendBase.new.send(:seconds_to_delay, 1) + expect(retry_delay).to be >= 256 + expect(retry_delay).to be < 316 + end + + it "increases the interval for each retry" do + expect(Workers::SendBase.new.send(:seconds_to_delay, 2)).to be >= 625 + expect(Workers::SendBase.new.send(:seconds_to_delay, 3)).to be >= 1_296 + expect(Workers::SendBase.new.send(:seconds_to_delay, 4)).to be >= 2_401 + expect(Workers::SendBase.new.send(:seconds_to_delay, 5)).to be >= 4_096 + expect(Workers::SendBase.new.send(:seconds_to_delay, 6)).to be >= 6_561 + expect(Workers::SendBase.new.send(:seconds_to_delay, 7)).to be >= 10_000 + expect(Workers::SendBase.new.send(:seconds_to_delay, 8)).to be >= 14_641 + expect(Workers::SendBase.new.send(:seconds_to_delay, 9)).to be >= 20_736 + end +end diff --git a/spec/workers/send_private_spec.rb b/spec/workers/send_private_spec.rb new file mode 100644 index 000000000..947b6c8ee --- /dev/null +++ b/spec/workers/send_private_spec.rb @@ -0,0 +1,44 @@ +require "spec_helper" + +describe Workers::SendPrivate do + let(:sender_id) { "any_user@example.org" } + let(:obj_str) { "status_message@guid" } + let(:targets) { + { + "https://example.org/receive/user/guid" => "post", + "https://example.com/receive/user/guid" => "post2" + } + } + let(:failing_targets) { {"https://example.org/receive/user/guid" => "post"} } + + it "succeeds if all urls were successful" do + expect(DiasporaFederation::Federation::Sender).to receive(:private).with( + sender_id, obj_str, targets + ).and_return({}) + expect(Workers::SendPrivate).not_to receive(:perform_in) + + Workers::SendPrivate.new.perform(sender_id, obj_str, targets) + end + + it "retries failing urls" do + expect(DiasporaFederation::Federation::Sender).to receive(:private).with( + sender_id, obj_str, targets + ).and_return(failing_targets) + expect(Workers::SendPrivate).to receive(:perform_in).with( + kind_of(Fixnum), sender_id, obj_str, failing_targets, 1 + ) + + Workers::SendPrivate.new.perform(sender_id, obj_str, targets) + end + + it "does not retry failing urls if max retries is reached" do + expect(DiasporaFederation::Federation::Sender).to receive(:private).with( + sender_id, obj_str, targets + ).and_return(failing_targets) + expect(Workers::SendPrivate).not_to receive(:perform_in) + + expect { + Workers::SendPrivate.new.perform(sender_id, obj_str, targets, 9) + }.to raise_error Workers::SendBase::MaxRetriesReached + end +end diff --git a/spec/workers/send_public_spec.rb b/spec/workers/send_public_spec.rb new file mode 100644 index 000000000..2bb7eb9c4 --- /dev/null +++ b/spec/workers/send_public_spec.rb @@ -0,0 +1,41 @@ +require "spec_helper" + +describe Workers::SendPublic do + let(:sender_id) { "any_user@example.org" } + let(:obj_str) { "status_message@guid" } + let(:urls) { ["https://example.org/receive/public", "https://example.com/receive/public"] } + let(:xml) { "post" } + + it "succeeds if all urls were successful" do + expect(DiasporaFederation::Federation::Sender).to receive(:public).with( + sender_id, obj_str, urls, xml + ).and_return([]) + expect(Workers::SendPublic).not_to receive(:perform_in) + + Workers::SendPublic.new.perform(sender_id, obj_str, urls, xml) + end + + it "retries failing urls" do + failing_urls = [urls.at(0)] + expect(DiasporaFederation::Federation::Sender).to receive(:public).with( + sender_id, obj_str, urls, xml + ).and_return(failing_urls) + expect(Workers::SendPublic).to receive(:perform_in).with( + kind_of(Fixnum), sender_id, obj_str, failing_urls, xml, 1 + ) + + Workers::SendPublic.new.perform(sender_id, obj_str, urls, xml) + end + + it "does not retry failing urls if max retries is reached" do + failing_urls = [urls.at(0)] + expect(DiasporaFederation::Federation::Sender).to receive(:public).with( + sender_id, obj_str, urls, xml + ).and_return(failing_urls) + expect(Workers::SendPublic).not_to receive(:perform_in) + + expect { + Workers::SendPublic.new.perform(sender_id, obj_str, urls, xml, 9) + }.to raise_error Workers::SendBase::MaxRetriesReached + end +end