remove HttpMulti and HydraWrapper
This commit is contained in:
parent
ae05d4e928
commit
51aca4506f
9 changed files with 0 additions and 409 deletions
|
|
@ -1,43 +0,0 @@
|
|||
# Copyright (c) 2010-2011, Diaspora Inc. This file is
|
||||
# licensed under the Affero General Public License version 3 or later. See
|
||||
# the COPYRIGHT file.
|
||||
|
||||
module Workers
|
||||
class HttpMulti < Base
|
||||
sidekiq_options queue: :http
|
||||
|
||||
MAX_RETRIES = 3
|
||||
ABANDON_ON_CODES=[:peer_failed_verification, # Certificate does not match URL
|
||||
:ssl_connect_error, # Problem negotiating ssl version or Cert couldn't be verified (often self-signed)
|
||||
:ssl_cacert, # Expired SSL cert
|
||||
]
|
||||
def perform(user_id, encoded_object_xml, person_ids, dispatcher_class_as_string, retry_count=0)
|
||||
user = User.find(user_id)
|
||||
people = Person.where(:id => person_ids)
|
||||
|
||||
dispatcher = dispatcher_class_as_string.constantize
|
||||
hydra = HydraWrapper.new(user, people, encoded_object_xml, dispatcher)
|
||||
|
||||
hydra.enqueue_batch
|
||||
|
||||
hydra.keep_for_retry_if do |response|
|
||||
!ABANDON_ON_CODES.include?(response.return_code)
|
||||
end
|
||||
|
||||
hydra.run
|
||||
|
||||
|
||||
unless hydra.people_to_retry.empty?
|
||||
if retry_count < MAX_RETRIES
|
||||
Workers::HttpMulti.perform_in(1.hour, user_id, encoded_object_xml, hydra.people_to_retry, dispatcher_class_as_string, retry_count + 1)
|
||||
else
|
||||
logger.info "event=http_multi_abandon sender_id=#{user_id} failed_recipient_ids='[#{person_ids.join(', ')}]'"
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -14,7 +14,6 @@ require 'diaspora'
|
|||
require 'direction_detector'
|
||||
require 'email_inviter'
|
||||
require 'evil_query'
|
||||
require 'hydra_wrapper'
|
||||
require 'postzord'
|
||||
require 'publisher'
|
||||
require 'pubsubhubbub'
|
||||
|
|
|
|||
|
|
@ -1,100 +0,0 @@
|
|||
# Copyright (c) 2010-2011, Diaspora Inc. This file is
|
||||
# licensed under the Affero General Public License version 3 or later. See
|
||||
# the COPYRIGHT file.
|
||||
|
||||
class HydraWrapper
|
||||
include Diaspora::Logging
|
||||
|
||||
OPTS = {
|
||||
maxredirs: 3,
|
||||
timeout: 25,
|
||||
method: :post,
|
||||
verbose: AppConfig.settings.typhoeus_verbose?,
|
||||
cainfo: AppConfig.environment.certificate_authorities.get,
|
||||
headers: {
|
||||
'Expect' => '',
|
||||
'Transfer-Encoding' => '',
|
||||
'User-Agent' => "Diaspora #{AppConfig.version_string}"
|
||||
}
|
||||
}
|
||||
|
||||
attr_reader :people_to_retry , :user, :encoded_object_xml
|
||||
attr_accessor :dispatcher_class, :people
|
||||
delegate :run, to: :hydra
|
||||
|
||||
def initialize user, people, encoded_object_xml, dispatcher_class
|
||||
@user = user
|
||||
@people_to_retry = []
|
||||
@people = people
|
||||
@dispatcher_class = dispatcher_class
|
||||
@encoded_object_xml = encoded_object_xml
|
||||
@keep_for_retry_proc = Proc.new do |response|
|
||||
true
|
||||
end
|
||||
end
|
||||
|
||||
# Inserts jobs for all @people
|
||||
def enqueue_batch
|
||||
grouped_people.each do |receive_url, people_for_receive_url|
|
||||
if xml = xml_factory.xml_for(people_for_receive_url.first)
|
||||
insert_job(receive_url, xml, people_for_receive_url)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
# This method can be used to tell the hydra whether or not to
|
||||
# retry a request that it made which failed.
|
||||
# @yieldparam response [Typhoeus::Response] The response object for the failed request.
|
||||
# @yieldreturn [Boolean] Whether the request whose response was passed to the block should be retried.
|
||||
def keep_for_retry_if &block
|
||||
@keep_for_retry_proc = block
|
||||
end
|
||||
|
||||
private
|
||||
|
||||
def hydra
|
||||
@hydra ||= Typhoeus::Hydra.new(max_concurrency: AppConfig.settings.typhoeus_concurrency.to_i)
|
||||
end
|
||||
|
||||
# @return [Salmon]
|
||||
def xml_factory
|
||||
@xml_factory ||= @dispatcher_class.salmon @user, Base64.decode64(@encoded_object_xml)
|
||||
end
|
||||
|
||||
# Group people on their receiving_urls
|
||||
# @return [Hash] People grouped by receive_url ([String] => [Array<Person>])
|
||||
def grouped_people
|
||||
@people.group_by { |person|
|
||||
@dispatcher_class.receive_url_for person
|
||||
}
|
||||
end
|
||||
|
||||
# Prepares and inserts job into the hydra queue
|
||||
# @param url [String]
|
||||
# @param xml [String]
|
||||
# @params people [Array<Person>]
|
||||
def insert_job url, xml, people
|
||||
request = Typhoeus::Request.new url, OPTS.merge(body: {xml: CGI.escape(xml)})
|
||||
prepare_request request, people
|
||||
hydra.queue request
|
||||
end
|
||||
|
||||
# @param request [Typhoeus::Request]
|
||||
# @param person [Person]
|
||||
def prepare_request request, people_for_receive_url
|
||||
request.on_complete do |response|
|
||||
# Save the reference to the pod to the database if not already present
|
||||
Pod.find_or_create_by(url: response.effective_url)
|
||||
|
||||
unless response.success?
|
||||
logger.warn "event=http_multi_fail sender_id=#{@user.id} url=#{response.effective_url} " \
|
||||
"return_code=#{response.return_code} response_code=#{response.response_code}"
|
||||
|
||||
if @keep_for_retry_proc.call(response)
|
||||
@people_to_retry += people_for_receive_url.map(&:id)
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
@ -94,12 +94,6 @@ class Postzord::Dispatcher
|
|||
# @param remote_people [Array<Person>] Recipients of the post on other pods
|
||||
# @return [void]
|
||||
def queue_remote_delivery_job(remote_people)
|
||||
Workers::HttpMulti.perform_async(
|
||||
@sender.id,
|
||||
Base64.strict_encode64(@object.to_diaspora_xml),
|
||||
remote_people.map{|p| p.id},
|
||||
self.class.to_s
|
||||
)
|
||||
end
|
||||
|
||||
# @param people [Array<Person>] Recipients of the post
|
||||
|
|
|
|||
|
|
@ -1,77 +0,0 @@
|
|||
# Copyright (c) 2010-2011, Diaspora Inc. This file is
|
||||
# licensed under the Affero General Public License version 3 or later. See
|
||||
# the COPYRIGHT file.
|
||||
|
||||
require 'spec_helper'
|
||||
|
||||
describe HydraWrapper do
|
||||
before do
|
||||
@people = ["person", "person2", "person3"]
|
||||
@wrapper = HydraWrapper.new double, @people, "<encoded_xml>", double
|
||||
end
|
||||
|
||||
describe 'initialize' do
|
||||
it 'it sets the proper instance variables' do
|
||||
user = "user"
|
||||
encoded_object_xml = "encoded xml"
|
||||
dispatcher_class = "Postzord::Dispatcher::Private"
|
||||
|
||||
wrapper = HydraWrapper.new user, @people, encoded_object_xml, dispatcher_class
|
||||
expect(wrapper.user).to eq(user)
|
||||
expect(wrapper.people).to eq(@people)
|
||||
expect(wrapper.encoded_object_xml).to eq(encoded_object_xml)
|
||||
end
|
||||
end
|
||||
|
||||
describe '#run' do
|
||||
it 'delegates #run to the @hydra' do
|
||||
hydra = double.as_null_object
|
||||
@wrapper.instance_variable_set :@hydra, hydra
|
||||
expect(hydra).to receive :run
|
||||
@wrapper.run
|
||||
end
|
||||
end
|
||||
|
||||
describe '#xml_factory' do
|
||||
it 'calls the salmon method on the dispatcher class (and memoizes)' do
|
||||
allow(Base64).to receive(:decode64).and_return "#{@wrapper.encoded_object_xml} encoded"
|
||||
decoded = Base64.decode64 @wrapper.encoded_object_xml
|
||||
expect(@wrapper.dispatcher_class).to receive(:salmon).with(@wrapper.user, decoded).once.and_return true
|
||||
@wrapper.send :xml_factory
|
||||
@wrapper.send :xml_factory
|
||||
end
|
||||
end
|
||||
|
||||
describe '#grouped_people' do
|
||||
it 'groups people given their receive_urls' do
|
||||
expect(@wrapper.dispatcher_class).to receive(:receive_url_for).and_return "foo.com", "bar.com", "bar.com"
|
||||
|
||||
expect(@wrapper.send(:grouped_people)).to eq({"foo.com" => [@people[0]], "bar.com" => @people[1,2]})
|
||||
end
|
||||
end
|
||||
|
||||
describe '#enqueue_batch' do
|
||||
it 'calls #grouped_people' do
|
||||
expect(@wrapper).to receive(:grouped_people).and_return []
|
||||
@wrapper.enqueue_batch
|
||||
end
|
||||
|
||||
it 'inserts a job for every group of people' do
|
||||
allow(Base64).to receive(:decode64)
|
||||
@wrapper.dispatcher_class = double salmon: double(xml_for: "<XML>")
|
||||
allow(@wrapper).to receive(:grouped_people).and_return('https://foo.com' => @wrapper.people)
|
||||
expect(@wrapper.people).to receive(:first).once
|
||||
expect(@wrapper).to receive(:insert_job).with('https://foo.com', "<XML>", @wrapper.people).once
|
||||
@wrapper.enqueue_batch
|
||||
end
|
||||
|
||||
it 'does not insert a job for a person whos xml returns false' do
|
||||
allow(Base64).to receive(:decode64)
|
||||
allow(@wrapper).to receive(:grouped_people).and_return('https://foo.com' => [double])
|
||||
@wrapper.dispatcher_class = double salmon: double(xml_for: false)
|
||||
expect(@wrapper).not_to receive :insert_job
|
||||
@wrapper.enqueue_batch
|
||||
end
|
||||
|
||||
end
|
||||
end
|
||||
|
|
@ -184,24 +184,6 @@ describe Postzord::Dispatcher do
|
|||
end
|
||||
end
|
||||
|
||||
describe '#deliver_to_remote' do
|
||||
before do
|
||||
@remote_people = []
|
||||
@remote_people << alice.person
|
||||
@mailman = Postzord::Dispatcher.build(alice, @sm)
|
||||
@hydra = double()
|
||||
allow(Typhoeus::Hydra).to receive(:new).and_return(@hydra)
|
||||
end
|
||||
|
||||
it 'should queue an HttpMultiJob for the remote people' do
|
||||
allow_any_instance_of(Postzord::Dispatcher::Public).to receive(:deliver_to_remote).and_call_original
|
||||
expect(Workers::HttpMulti).to receive(:perform_async).with(alice.id, anything, @remote_people.map{|p| p.id}, anything).once
|
||||
@mailman.send(:deliver_to_remote, @remote_people)
|
||||
|
||||
allow(Postzord::Dispatcher::Public).to receive(:deliver_to_remote)
|
||||
end
|
||||
end
|
||||
|
||||
describe '#deliver_to_local' do
|
||||
before do
|
||||
@mailman = Postzord::Dispatcher.build(alice, @sm)
|
||||
|
|
@ -284,7 +266,6 @@ describe Postzord::Dispatcher do
|
|||
mailman = Postzord::Dispatcher.build(alice, FactoryGirl.create(:status_message), :url => "http://joindiaspora.com/p/123", :services => [@s1])
|
||||
|
||||
allow(Workers::PublishToHub).to receive(:perform_async).with(anything)
|
||||
allow(Workers::HttpMulti).to receive(:perform_async).with(anything, anything, anything)
|
||||
expect(Workers::PostToService).to receive(:perform_async).with(@s1.id, anything, anything)
|
||||
mailman.post
|
||||
end
|
||||
|
|
|
|||
|
|
@ -105,7 +105,6 @@ RSpec.configure do |config|
|
|||
config.before(:each) do
|
||||
I18n.locale = :en
|
||||
stub_request(:post, "https://pubsubhubbub.appspot.com/")
|
||||
disable_typhoeus
|
||||
$process_queue = false
|
||||
allow(Workers::SendPublic).to receive(:perform_async)
|
||||
allow(Workers::SendPrivate).to receive(:perform_async)
|
||||
|
|
|
|||
|
|
@ -1,22 +0,0 @@
|
|||
class FakeHydra
|
||||
def queue(*args); end
|
||||
def run; end
|
||||
end
|
||||
|
||||
class FakeHydraRequest
|
||||
def initialize(*args); end
|
||||
def on_complete; end
|
||||
end
|
||||
|
||||
def disable_typhoeus
|
||||
silence_warnings do
|
||||
Workers::HttpMulti.const_set('Hydra', FakeHydra)
|
||||
Workers::HttpMulti.const_set('Request', FakeHydraRequest)
|
||||
end
|
||||
end
|
||||
def enable_typhoeus
|
||||
silence_warnings do
|
||||
Workers::HttpMulti.const_set('Hydra', Typhoeus::Hydra)
|
||||
Workers::HttpMulti.const_set('Request', Typhoeus::Request)
|
||||
end
|
||||
end
|
||||
|
|
@ -1,140 +0,0 @@
|
|||
require 'spec_helper'
|
||||
|
||||
describe Workers::HttpMulti do
|
||||
before :all do
|
||||
WebMock.disable_net_connect! allow_localhost: true
|
||||
WebMock::HttpLibAdapters::TyphoeusAdapter.disable!
|
||||
enable_typhoeus
|
||||
end
|
||||
after :all do
|
||||
disable_typhoeus
|
||||
WebMock.disable_net_connect!
|
||||
end
|
||||
|
||||
before do
|
||||
@people = [FactoryGirl.create(:person), FactoryGirl.create(:person)]
|
||||
@post_xml = Base64.encode64 "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAH"
|
||||
|
||||
@hydra = Typhoeus::Hydra.new
|
||||
allow(Typhoeus::Hydra).to receive(:new).and_return(@hydra)
|
||||
@salmon = Salmon::EncryptedSlap.create_by_user_and_activity bob, Base64.decode64(@post_xml)
|
||||
allow(Salmon::EncryptedSlap).to receive(:create_by_user_and_activity).and_return @salmon
|
||||
@body = "encrypted things"
|
||||
allow(@salmon).to receive(:xml_for).and_return @body
|
||||
|
||||
@response = Typhoeus::Response.new(
|
||||
code: 200,
|
||||
body: "",
|
||||
time: 0.2,
|
||||
effective_url: 'http://foobar.com',
|
||||
return_code: :ok
|
||||
)
|
||||
@failed_response = Typhoeus::Response.new(
|
||||
code: 504,
|
||||
body: "",
|
||||
time: 0.2,
|
||||
effective_url: 'http://foobar.com',
|
||||
return_code: :ok
|
||||
)
|
||||
@ssl_error_response = Typhoeus::Response.new(
|
||||
code: 0,
|
||||
body: "",
|
||||
time: 0.2,
|
||||
effective_url: 'http://foobar.com',
|
||||
return_code: :ssl_connect_error
|
||||
)
|
||||
@unable_to_resolve_response = Typhoeus::Response.new(
|
||||
code: 0,
|
||||
body: "",
|
||||
time: 0.2,
|
||||
effective_url: 'http://foobar.com',
|
||||
return_code: :couldnt_resolve_host
|
||||
)
|
||||
end
|
||||
|
||||
it 'POSTs to more than one person' do
|
||||
@people.each do |person|
|
||||
Typhoeus.stub(person.receive_url).and_return @response
|
||||
end
|
||||
|
||||
expect(@hydra).to receive(:queue).twice
|
||||
expect(@hydra).to receive(:run).once
|
||||
|
||||
Workers::HttpMulti.new.perform bob.id, @post_xml, @people.map(&:id), "Postzord::Dispatcher::Private"
|
||||
end
|
||||
|
||||
it 'retries' do
|
||||
person = @people.first
|
||||
|
||||
Typhoeus.stub(person.receive_url).and_return @failed_response
|
||||
|
||||
expect(Workers::HttpMulti).to receive(:perform_in).with(1.hour, bob.id, @post_xml, [person.id], anything, 1).once
|
||||
Workers::HttpMulti.new.perform bob.id, @post_xml, [person.id], "Postzord::Dispatcher::Private"
|
||||
end
|
||||
|
||||
it 'retries if it could not resolve the server' do
|
||||
person = @people.first
|
||||
|
||||
Typhoeus.stub(person.receive_url).and_return @unable_to_resolve_response
|
||||
|
||||
expect(Workers::HttpMulti).to receive(:perform_in).with(1.hour, bob.id, @post_xml, [person.id], anything, 1).once
|
||||
Workers::HttpMulti.new.perform bob.id, @post_xml, [person.id], "Postzord::Dispatcher::Private"
|
||||
end
|
||||
|
||||
it 'does not retry on an SSL error' do
|
||||
person = @people.first
|
||||
|
||||
Typhoeus.stub(person.receive_url).and_return @ssl_error_response
|
||||
|
||||
expect(Workers::HttpMulti).not_to receive(:perform_in)
|
||||
Workers::HttpMulti.new.perform bob.id, @post_xml, [person.id], "Postzord::Dispatcher::Private"
|
||||
end
|
||||
|
||||
it 'max retries' do
|
||||
person = @people.first
|
||||
|
||||
Typhoeus.stub(person.receive_url).and_return @failed_response
|
||||
|
||||
expect(Workers::HttpMulti).not_to receive :perform_in
|
||||
Workers::HttpMulti.new.perform bob.id, @post_xml, [person.id], "Postzord::Dispatcher::Private", 3
|
||||
end
|
||||
|
||||
it 'generates encrypted xml for people' do
|
||||
person = @people.first
|
||||
|
||||
Typhoeus.stub(person.receive_url).and_return @response
|
||||
expect(@salmon).to receive(:xml_for).and_return @body
|
||||
|
||||
Workers::HttpMulti.new.perform bob.id, @post_xml, [person.id], "Postzord::Dispatcher::Private"
|
||||
end
|
||||
|
||||
it "updates http users who have moved to https" do
|
||||
person = @people.first
|
||||
|
||||
response = Typhoeus::Response.new(
|
||||
code: 301,
|
||||
effective_url: "https://example.net",
|
||||
response_headers: "Location: #{person.receive_url.sub('http://', 'https://')}",
|
||||
body: "",
|
||||
time: 0.2
|
||||
)
|
||||
Typhoeus.stub(person.receive_url).and_return response
|
||||
|
||||
Workers::HttpMulti.new.perform bob.id, @post_xml, [person.id], "Postzord::Dispatcher::Private"
|
||||
expect(Person.find(person.id).url).to eq("https://example.net/")
|
||||
end
|
||||
|
||||
it 'only sends to users with valid RSA keys' do
|
||||
person = @people.first
|
||||
person.serialized_public_key = "-----BEGIN RSA PUBLIC KEY-----\nPsych!\n-----END RSA PUBLIC KEY-----"
|
||||
person.save
|
||||
|
||||
allow(@salmon).to receive(:xml_for).and_call_original
|
||||
|
||||
Typhoeus.stub(person.receive_url).and_return @response
|
||||
Typhoeus.stub(@people[1].receive_url).and_return @response
|
||||
|
||||
expect(@hydra).to receive(:queue).once
|
||||
Workers::HttpMulti.new.perform bob.id, @post_xml, @people.map(&:id), "Postzord::Dispatcher::Private"
|
||||
end
|
||||
end
|
||||
Loading…
Reference in a new issue