Retry federation if remote pod is down
This commit is contained in:
parent
db3e411978
commit
bcbc86e502
8 changed files with 58 additions and 35 deletions
2
Gemfile
2
Gemfile
|
|
@ -71,6 +71,8 @@ gem 'ruby-oembed'
|
||||||
gem 'resque', '1.10.0'
|
gem 'resque', '1.10.0'
|
||||||
gem 'resque-ensure-connected'
|
gem 'resque-ensure-connected'
|
||||||
gem 'resque-timeout', '1.0.0'
|
gem 'resque-timeout', '1.0.0'
|
||||||
|
gem 'resque-scheduler'
|
||||||
|
gem 'resque-retry'
|
||||||
gem 'SystemTimer', '1.2.1', :platforms => :ruby_18
|
gem 'SystemTimer', '1.2.1', :platforms => :ruby_18
|
||||||
|
|
||||||
# reporting
|
# reporting
|
||||||
|
|
|
||||||
11
Gemfile.lock
11
Gemfile.lock
|
|
@ -381,6 +381,13 @@ GEM
|
||||||
resque-ensure-connected (0.1.0)
|
resque-ensure-connected (0.1.0)
|
||||||
activerecord (>= 2.3.5)
|
activerecord (>= 2.3.5)
|
||||||
resque (~> 1.10.0)
|
resque (~> 1.10.0)
|
||||||
|
resque-retry (0.1.0)
|
||||||
|
resque (>= 1.8.0)
|
||||||
|
resque-scheduler (>= 1.8.0)
|
||||||
|
resque-scheduler (1.9.9)
|
||||||
|
redis (>= 2.0.1)
|
||||||
|
resque (>= 1.8.0)
|
||||||
|
rufus-scheduler
|
||||||
resque-timeout (1.0.0)
|
resque-timeout (1.0.0)
|
||||||
resque (~> 1.0)
|
resque (~> 1.0)
|
||||||
rest-client (1.6.1)
|
rest-client (1.6.1)
|
||||||
|
|
@ -422,6 +429,8 @@ GEM
|
||||||
archive-tar-minitar (>= 0.5.2)
|
archive-tar-minitar (>= 0.5.2)
|
||||||
rubyntlm (0.1.1)
|
rubyntlm (0.1.1)
|
||||||
rubyzip (0.9.4)
|
rubyzip (0.9.4)
|
||||||
|
rufus-scheduler (2.0.11)
|
||||||
|
tzinfo (>= 0.3.23)
|
||||||
sass (3.1.7)
|
sass (3.1.7)
|
||||||
selenium-webdriver (2.7.0)
|
selenium-webdriver (2.7.0)
|
||||||
childprocess (>= 0.2.1)
|
childprocess (>= 0.2.1)
|
||||||
|
|
@ -533,6 +542,8 @@ DEPENDENCIES
|
||||||
redcarpet (= 2.0.0b5)
|
redcarpet (= 2.0.0b5)
|
||||||
resque (= 1.10.0)
|
resque (= 1.10.0)
|
||||||
resque-ensure-connected
|
resque-ensure-connected
|
||||||
|
resque-retry
|
||||||
|
resque-scheduler
|
||||||
resque-timeout (= 1.0.0)
|
resque-timeout (= 1.0.0)
|
||||||
rest-client (= 1.6.1)
|
rest-client (= 1.6.1)
|
||||||
roxml!
|
roxml!
|
||||||
|
|
|
||||||
|
|
@ -3,16 +3,30 @@
|
||||||
# the COPYRIGHT file.
|
# the COPYRIGHT file.
|
||||||
|
|
||||||
require 'uri'
|
require 'uri'
|
||||||
|
require 'resque-retry'
|
||||||
require File.join(Rails.root, 'lib/hydra_wrapper')
|
require File.join(Rails.root, 'lib/hydra_wrapper')
|
||||||
|
|
||||||
module Jobs
|
module Jobs
|
||||||
class HttpMulti < Base
|
class HttpMulti < Base
|
||||||
|
extend Resque::Plugins::ExponentialBackoff
|
||||||
|
|
||||||
@queue = :http
|
@queue = :http
|
||||||
|
@backoff_strategy = [10.seconds,
|
||||||
|
1.minute,
|
||||||
|
10.minutes,
|
||||||
|
1.hour,
|
||||||
|
3.hours,
|
||||||
|
6.hours,
|
||||||
|
12.hours,
|
||||||
|
1.day,
|
||||||
|
2.days]
|
||||||
|
|
||||||
MAX_RETRIES = 3
|
def self.args_for_retry(user_id, encoded_object_xml, person_ids, dispatcher_class_as_string)
|
||||||
|
[user_id, encoded_object_xml, @failed_people, dispatcher_class_as_string]
|
||||||
|
end
|
||||||
|
|
||||||
def self.perform(user_id, encoded_object_xml, person_ids, dispatcher_class_as_string, retry_count=0)
|
|
||||||
|
def self.perform(user_id, encoded_object_xml, person_ids, dispatcher_class_as_string)
|
||||||
user = User.find(user_id)
|
user = User.find(user_id)
|
||||||
people = Person.where(:id => person_ids)
|
people = Person.where(:id => person_ids)
|
||||||
|
|
||||||
|
|
@ -22,17 +36,17 @@ module Jobs
|
||||||
hydra.enqueue_batch
|
hydra.enqueue_batch
|
||||||
hydra.run
|
hydra.run
|
||||||
|
|
||||||
unless hydra.failed_people.empty?
|
@failed_people = hydra.failed_people
|
||||||
if retry_count < MAX_RETRIES
|
|
||||||
Resque.enqueue(Jobs::HttpMulti, user_id, encoded_object_xml, hydra.failed_people, dispatcher_class_as_string, retry_count + 1 )
|
if not @failed_people.empty?
|
||||||
|
if self.retry_limit_reached?
|
||||||
|
msg = "event=http_multi_abandon sender_id=#{user_id} failed_recipient_ids='[#{@failed_people.join(', ')}]'"
|
||||||
|
Rails.logger.info(msg)
|
||||||
else
|
else
|
||||||
Rails.logger.info("event=http_multi_abandon sender_id=#{user_id} failed_recipient_ids='[#{person_ids.join(', ')}] '")
|
raise 'retry'
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,4 +1,6 @@
|
||||||
require 'resque'
|
require 'resque'
|
||||||
|
require 'resque_scheduler'
|
||||||
|
require 'resque/scheduler'
|
||||||
|
|
||||||
Resque::Plugins::Timeout.timeout = 300
|
Resque::Plugins::Timeout.timeout = 300
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,11 +22,14 @@ module ResqueJobLogging
|
||||||
backtrace = application_trace(error)
|
backtrace = application_trace(error)
|
||||||
log_string << "app_backtrace='#{backtrace.join(";")}' "
|
log_string << "app_backtrace='#{backtrace.join(";")}' "
|
||||||
notify_hoptoad(error, args) if AppConfig[:hoptoad_api_key].present?
|
notify_hoptoad(error, args) if AppConfig[:hoptoad_api_key].present?
|
||||||
|
|
||||||
|
do_log = !self.respond_to?('retry_limit_reached?') || self.retry_limit_reached?
|
||||||
else
|
else
|
||||||
log_string += "status=complete "
|
log_string += "status=complete "
|
||||||
|
do_log = true
|
||||||
end
|
end
|
||||||
|
|
||||||
Rails.logger.info(log_string)
|
Rails.logger.info(log_string) if do_log
|
||||||
raise error if error
|
raise error if error
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,12 @@
|
||||||
require 'resque/tasks'
|
require 'resque/tasks'
|
||||||
|
require 'resque_scheduler/tasks'
|
||||||
|
|
||||||
task "resque:setup" do
|
task "resque:setup" do
|
||||||
require File.join(File.dirname(__FILE__), '..', '..', 'config', 'environment')
|
require File.join(File.dirname(__FILE__), '..', '..', 'config', 'environment')
|
||||||
Rails.logger.info("event=resque_setup rails_env=#{Rails.env}")
|
Rails.logger.info("event=resque_setup rails_env=#{Rails.env}")
|
||||||
|
|
||||||
|
require 'resque_scheduler'
|
||||||
|
require 'resque/scheduler'
|
||||||
end
|
end
|
||||||
|
|
||||||
desc "Alias for resque:work (To run workers on Heroku)"
|
desc "Alias for resque:work (To run workers on Heroku)"
|
||||||
|
|
|
||||||
|
|
@ -31,28 +31,6 @@ describe Jobs::HttpMulti do
|
||||||
Jobs::HttpMulti.perform(bob.id, @post_xml, people_ids, "Postzord::Dispatcher::Private")
|
Jobs::HttpMulti.perform(bob.id, @post_xml, people_ids, "Postzord::Dispatcher::Private")
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'retries' do
|
|
||||||
person = @people[0]
|
|
||||||
|
|
||||||
@hydra.stub(:post, person.receive_url).and_return(@failed_response)
|
|
||||||
|
|
||||||
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
|
||||||
|
|
||||||
Resque.should_receive(:enqueue).with(Jobs::HttpMulti, bob.id, @post_xml, [person.id], anything, 1).once
|
|
||||||
Jobs::HttpMulti.perform(bob.id, @post_xml, [person.id], "Postzord::Dispatcher::Private")
|
|
||||||
end
|
|
||||||
|
|
||||||
it 'max retries' do
|
|
||||||
person = @people[0]
|
|
||||||
|
|
||||||
@hydra.stub(:post, person.receive_url).and_return(@failed_response)
|
|
||||||
|
|
||||||
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
|
||||||
|
|
||||||
Resque.should_not_receive(:enqueue)
|
|
||||||
Jobs::HttpMulti.perform(bob.id, @post_xml, [person.id], "Postzord::Dispatcher::Private", 3)
|
|
||||||
end
|
|
||||||
|
|
||||||
it 'generates encrypted xml for people' do
|
it 'generates encrypted xml for people' do
|
||||||
person = @people[0]
|
person = @people[0]
|
||||||
|
|
||||||
|
|
@ -76,7 +54,12 @@ describe Jobs::HttpMulti do
|
||||||
|
|
||||||
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
||||||
|
|
||||||
|
begin
|
||||||
Jobs::HttpMulti.perform(bob.id, @post_xml, [person.id], "Postzord::Dispatcher::Private")
|
Jobs::HttpMulti.perform(bob.id, @post_xml, [person.id], "Postzord::Dispatcher::Private")
|
||||||
|
rescue RuntimeError => e
|
||||||
|
e.message == 'retry'
|
||||||
|
end
|
||||||
|
|
||||||
person.reload
|
person.reload
|
||||||
person.url.should == "https://remote.net/"
|
person.url.should == "https://remote.net/"
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -1,7 +1,11 @@
|
||||||
module Resque
|
module Resque
|
||||||
def enqueue(klass, *args)
|
def enqueue(klass, *args)
|
||||||
if $process_queue
|
if $process_queue
|
||||||
|
begin
|
||||||
klass.send(:perform, *args)
|
klass.send(:perform, *args)
|
||||||
|
rescue RuntimeError => e
|
||||||
|
e.message == 'retry'
|
||||||
|
end
|
||||||
else
|
else
|
||||||
true
|
true
|
||||||
end
|
end
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue