DG MS; fixed tests; explicitly send in dispatcher in httpmulti
This commit is contained in:
parent
60ace5c297
commit
fab6f9ccd9
9 changed files with 72 additions and 125 deletions
|
|
@ -3,20 +3,22 @@
|
||||||
# the COPYRIGHT file.
|
# the COPYRIGHT file.
|
||||||
|
|
||||||
require 'uri'
|
require 'uri'
|
||||||
|
require File.join(Rails.root, 'lib/hydra_wrapper')
|
||||||
|
|
||||||
module Job
|
module Job
|
||||||
class HttpMulti < Base
|
class HttpMulti < Base
|
||||||
|
|
||||||
@queue = :http
|
@queue = :http
|
||||||
|
|
||||||
MAX_RETRIES = 3
|
MAX_RETRIES = 3
|
||||||
|
|
||||||
def self.perform(user_id, encoded_object_xml, person_ids, retry_count=0)
|
def self.perform(user_id, encoded_object_xml, person_ids, dispatcher_class_as_string, retry_count=0)
|
||||||
return true if user_id == '91842' #NOTE 09/08/11 blocking diapsorahqposts
|
return true if user_id == '91842' #NOTE 09/08/11 blocking diapsorahqposts
|
||||||
|
|
||||||
user = User.find(user_id)
|
user = User.find(user_id)
|
||||||
people = Person.where(:id => person_ids)
|
people = Person.where(:id => person_ids)
|
||||||
|
|
||||||
dispatcher = Postzord::Dispatcher::Private
|
dispatcher = dispatcher_class_as_string.constantize
|
||||||
hydra = HydraWrapper.new(user, people, encoded_object_xml, dispatcher)
|
hydra = HydraWrapper.new(user, people, encoded_object_xml, dispatcher)
|
||||||
|
|
||||||
hydra.enqueue_batch
|
hydra.enqueue_batch
|
||||||
|
|
@ -24,7 +26,7 @@ module Job
|
||||||
|
|
||||||
unless hydra.failed_people.empty?
|
unless hydra.failed_people.empty?
|
||||||
if retry_count < MAX_RETRIES
|
if retry_count < MAX_RETRIES
|
||||||
Resque.enqueue(Job::HttpMulti, user_id, encoded_object_xml, hydra.failed_people, retry_count + 1 )
|
Resque.enqueue(Job::HttpMulti, user_id, encoded_object_xml, hydra.failed_people, dispatcher_class_as_string, retry_count + 1 )
|
||||||
else
|
else
|
||||||
Rails.logger.info("event=http_multi_abandon sender_id=#{user_id} failed_recipient_ids='[#{person_ids.join(', ')}] '")
|
Rails.logger.info("event=http_multi_abandon sender_id=#{user_id} failed_recipient_ids='[#{person_ids.join(', ')}] '")
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -5,22 +5,35 @@
|
||||||
|
|
||||||
class Postzord::Dispatcher
|
class Postzord::Dispatcher
|
||||||
require File.join(Rails.root, 'lib/postzord/dispatcher/private')
|
require File.join(Rails.root, 'lib/postzord/dispatcher/private')
|
||||||
#require File.join(Rails.root, 'lib/postzord/dispatcher/public')
|
require File.join(Rails.root, 'lib/postzord/dispatcher/public')
|
||||||
|
|
||||||
attr_reader :sender, :object, :xml, :subscribers
|
attr_reader :sender, :object, :xml, :subscribers
|
||||||
|
|
||||||
|
# @return [Postzord::Dispatcher] Public or private dispatcher depending on the object's intended audience
|
||||||
def self.build(user, object, opts={})
|
def self.build(user, object, opts={})
|
||||||
unless object.respond_to? :to_diaspora_xml
|
unless object.respond_to? :to_diaspora_xml
|
||||||
raise 'this object does not respond_to? to_diaspora xml. try including Diaspora::Webhooks into your object'
|
raise 'this object does not respond_to? to_diaspora xml. try including Diaspora::Webhooks into your object'
|
||||||
end
|
end
|
||||||
|
|
||||||
#if object.respond_to?(:public) && object.public?
|
#if self.object_should_be_processed_as_public?(object)
|
||||||
# Postzord::Dispatcher::Public.new(user, object, opts)
|
# Postzord::Dispatcher::Public.new(user, object, opts)
|
||||||
#else
|
#else
|
||||||
Postzord::Dispatcher::Private.new(user, object, opts)
|
Postzord::Dispatcher::Private.new(user, object, opts)
|
||||||
#end
|
#end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# @param object [Object]
|
||||||
|
# @return [Boolean]
|
||||||
|
def self.object_should_be_processed_as_public?(object)
|
||||||
|
if object.respond_to?(:public) && object.public?
|
||||||
|
true
|
||||||
|
elsif object.respond_to?(:relayable?) && object.parent.public?
|
||||||
|
true
|
||||||
|
else
|
||||||
|
false
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
# @return [Object]
|
# @return [Object]
|
||||||
def post(opts={})
|
def post(opts={})
|
||||||
self.post_to_subscribers if @subscribers.present?
|
self.post_to_subscribers if @subscribers.present?
|
||||||
|
|
@ -141,5 +154,12 @@ class Postzord::Dispatcher
|
||||||
@object.socket_to_user(user)
|
@object.socket_to_user(user)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# Enqueues a job in Resque
|
||||||
|
# @param remote_people [Array<Person>] Recipients of the post on other pods
|
||||||
|
# @return [void]
|
||||||
|
def queue_remote_delivery_job(remote_people)
|
||||||
|
Resque.enqueue(Job::HttpMulti, @sender.id, Base64.encode64(@object.to_diaspora_xml), remote_people.map{|p| p.id}, self.class.to_s)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -28,11 +28,4 @@ class Postzord::Dispatcher::Private < Postzord::Dispatcher
|
||||||
def self.receive_url_for(person)
|
def self.receive_url_for(person)
|
||||||
person.receive_url
|
person.receive_url
|
||||||
end
|
end
|
||||||
|
|
||||||
# Enqueues a job in Resque
|
|
||||||
# @param remote_people [Array<Person>] Recipients of the post on other pods
|
|
||||||
# @return [void]
|
|
||||||
def queue_remote_delivery_job(remote_people)
|
|
||||||
Resque.enqueue(Job::HttpMulti, @sender.id, Base64.encode64(@object.to_diaspora_xml), remote_people.map{|p| p.id})
|
|
||||||
end
|
|
||||||
end
|
end
|
||||||
|
|
|
||||||
31
lib/postzord/dispatcher/public.rb
Normal file
31
lib/postzord/dispatcher/public.rb
Normal file
|
|
@ -0,0 +1,31 @@
|
||||||
|
# Copyright (c) 2010-2011, Diaspora Inc. This file is
|
||||||
|
# licensed under the Affero General Public License version 3 or later. See
|
||||||
|
# the COPYRIGHT file.
|
||||||
|
|
||||||
|
class Postzord::Dispatcher::Public < Postzord::Dispatcher
|
||||||
|
|
||||||
|
# @param user [User] User dispatching the object in question
|
||||||
|
# @param object [Object] The object to be sent to other Diaspora installations
|
||||||
|
# @opt additional_subscribers [Array<Person>] Additional subscribers
|
||||||
|
def initialize(user, object, opts={})
|
||||||
|
@sender = user
|
||||||
|
@object = object
|
||||||
|
@xml = @object.to_diaspora_xml
|
||||||
|
|
||||||
|
additional_subscribers = opts[:additional_subscribers] || []
|
||||||
|
@subscribers = subscribers_from_object | [*additional_subscribers]
|
||||||
|
end
|
||||||
|
|
||||||
|
# @param user [User]
|
||||||
|
# @param activity [String]
|
||||||
|
# @return [Salmon::EncryptedSlap]
|
||||||
|
def self.salmon(user, activity)
|
||||||
|
Salmon::Slap.create_by_user_and_activity(user, activity)
|
||||||
|
end
|
||||||
|
|
||||||
|
# @param person [Person]
|
||||||
|
# @return [String]
|
||||||
|
def self.receive_url_for(person)
|
||||||
|
person.url + 'receive/public'
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
@ -1,3 +1,7 @@
|
||||||
|
# Copyright (c) 2010-2011, Diaspora Inc. This file is
|
||||||
|
# licensed under the Affero General Public License version 3 or later. See
|
||||||
|
# the COPYRIGHT file.
|
||||||
|
|
||||||
require 'aspect_stream'
|
require 'aspect_stream'
|
||||||
|
|
||||||
describe AspectStream do
|
describe AspectStream do
|
||||||
|
|
|
||||||
|
|
@ -2,110 +2,9 @@
|
||||||
# licensed under the Affero General Public License version 3 or later. See
|
# licensed under the Affero General Public License version 3 or later. See
|
||||||
# the COPYRIGHT file.
|
# the COPYRIGHT file.
|
||||||
|
|
||||||
|
require 'hydra_wrapper'
|
||||||
require 'lib/hydra_wrapper'
|
|
||||||
|
|
||||||
describe HydraWrapper do
|
describe HydraWrapper do
|
||||||
|
|
||||||
context 'intergration' do
|
|
||||||
pending
|
|
||||||
before :all do
|
|
||||||
enable_typhoeus
|
|
||||||
end
|
|
||||||
|
|
||||||
after :all do
|
|
||||||
disable_typhoeus
|
|
||||||
end
|
|
||||||
|
|
||||||
before do
|
|
||||||
@people = [Factory(:person), Factory(:person)]
|
|
||||||
@post_xml = Base64.encode64("AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAH")
|
|
||||||
|
|
||||||
@hydra = Typhoeus::Hydra.new
|
|
||||||
@response = Typhoeus::Response.new(:code => 200, :headers => "", :body => "", :time => 0.2, :effective_url => 'http://foobar.com')
|
|
||||||
@failed_response = Typhoeus::Response.new(:code => 504, :headers => "", :body => "", :time => 0.2, :effective_url => 'http://foobar.com')
|
|
||||||
end
|
|
||||||
|
|
||||||
it 'POSTs to more than one person' do
|
|
||||||
@people.each do |person|
|
|
||||||
@hydra.stub(:post, person.receive_url).and_return(@response)
|
|
||||||
end
|
|
||||||
|
|
||||||
@hydra.should_receive(:queue).twice
|
|
||||||
@hydra.should_receive(:run).once
|
|
||||||
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
|
||||||
|
|
||||||
people_ids = @people.map{ |p| p.id }
|
|
||||||
Job::HttpMulti.perform(bob.id, @post_xml, people_ids)
|
|
||||||
end
|
|
||||||
|
|
||||||
it 'retries' do
|
|
||||||
person = @people[0]
|
|
||||||
|
|
||||||
@hydra.stub(:post, person.receive_url).and_return(@failed_response)
|
|
||||||
|
|
||||||
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
|
||||||
|
|
||||||
Resque.should_receive(:enqueue).with(Job::HttpMulti, bob.id, @post_xml, [person.id], 1).once
|
|
||||||
Job::HttpMulti.perform(bob.id, @post_xml, [person.id])
|
|
||||||
end
|
|
||||||
|
|
||||||
it 'max retries' do
|
|
||||||
person = @people[0]
|
|
||||||
|
|
||||||
@hydra.stub(:post, person.receive_url).and_return(@failed_response)
|
|
||||||
|
|
||||||
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
|
||||||
|
|
||||||
Resque.should_not_receive(:enqueue)
|
|
||||||
Job::HttpMulti.perform(bob.id, @post_xml, [person.id], 3)
|
|
||||||
end
|
|
||||||
|
|
||||||
it 'generates encrypted xml for people' do
|
|
||||||
person = @people[0]
|
|
||||||
|
|
||||||
@hydra.stub(:post, person.receive_url).and_return(@response)
|
|
||||||
|
|
||||||
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
|
||||||
|
|
||||||
salmon = Salmon::EncryptedSlap.create_by_user_and_activity(bob, Base64.decode64(@post_xml))
|
|
||||||
Salmon::EncryptedSlap.stub(:create_by_user_and_activity).and_return(salmon)
|
|
||||||
salmon.should_receive(:xml_for).and_return("encrypted things")
|
|
||||||
|
|
||||||
Job::HttpMulti.perform(bob.id, @post_xml, [person.id])
|
|
||||||
end
|
|
||||||
|
|
||||||
it 'updates http users who have moved to https' do
|
|
||||||
person = @people.first
|
|
||||||
person.url = 'http://remote.net/'
|
|
||||||
person.save
|
|
||||||
response = Typhoeus::Response.new(:code => 301,:effective_url => 'https://foobar.com', :headers_hash => {"Location" => person.receive_url.sub('http://', 'https://')}, :body => "", :time => 0.2)
|
|
||||||
@hydra.stub(:post, person.receive_url).and_return(response)
|
|
||||||
|
|
||||||
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
|
||||||
|
|
||||||
Job::HttpMulti.perform(bob.id, @post_xml, [person.id])
|
|
||||||
person.reload
|
|
||||||
person.url.should == "https://remote.net/"
|
|
||||||
end
|
|
||||||
|
|
||||||
it 'only sends to users with valid RSA keys' do
|
|
||||||
person = @people[0]
|
|
||||||
person.serialized_public_key = "-----BEGIN RSA PUBLIC KEY-----\nPsych!\n-----END RSA PUBLIC KEY-----"
|
|
||||||
person.save
|
|
||||||
|
|
||||||
@hydra.stub(:post, @people[0].receive_url).and_return(@response)
|
|
||||||
@hydra.stub(:post, @people[1].receive_url).and_return(@response)
|
|
||||||
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
|
||||||
|
|
||||||
@hydra.should_receive(:queue).once
|
|
||||||
Job::HttpMulti.perform(bob.id, @post_xml, [@people[0].id, @people[1].id])
|
|
||||||
end
|
|
||||||
end
|
|
||||||
|
|
||||||
|
|
||||||
###############
|
|
||||||
|
|
||||||
before do
|
before do
|
||||||
@wrapper = HydraWrapper.new(stub, [stub, stub, stub], stub, stub)
|
@wrapper = HydraWrapper.new(stub, [stub, stub, stub], stub, stub)
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -3,7 +3,6 @@
|
||||||
# the COPYRIGHT file.
|
# the COPYRIGHT file.
|
||||||
|
|
||||||
require 'spec_helper'
|
require 'spec_helper'
|
||||||
|
|
||||||
require File.join(Rails.root, 'lib/postzord/dispatcher/private')
|
require File.join(Rails.root, 'lib/postzord/dispatcher/private')
|
||||||
|
|
||||||
describe Postzord::Dispatcher::Private do
|
describe Postzord::Dispatcher::Private do
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,6 @@ describe Postzord::Dispatcher do
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'raises and gives you a helpful message if the object can not federate' do
|
it 'raises and gives you a helpful message if the object can not federate' do
|
||||||
pending "put this in the base class!"
|
|
||||||
expect {
|
expect {
|
||||||
Postzord::Dispatcher.build(alice, [])
|
Postzord::Dispatcher.build(alice, [])
|
||||||
}.should raise_error /Diaspora::Webhooks/
|
}.should raise_error /Diaspora::Webhooks/
|
||||||
|
|
@ -231,7 +230,7 @@ describe Postzord::Dispatcher do
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'should queue an HttpPost job for each remote person' do
|
it 'should queue an HttpPost job for each remote person' do
|
||||||
Resque.should_receive(:enqueue).with(Job::HttpMulti, alice.id, anything, @remote_people.map{|p| p.id}).once
|
Resque.should_receive(:enqueue).with(Job::HttpMulti, alice.id, anything, @remote_people.map{|p| p.id}, anything).once
|
||||||
@mailman.send(:deliver_to_remote, @remote_people)
|
@mailman.send(:deliver_to_remote, @remote_people)
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
@ -28,7 +28,7 @@ describe Job::HttpMulti do
|
||||||
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
||||||
|
|
||||||
people_ids = @people.map{ |p| p.id }
|
people_ids = @people.map{ |p| p.id }
|
||||||
Job::HttpMulti.perform(bob.id, @post_xml, people_ids)
|
Job::HttpMulti.perform(bob.id, @post_xml, people_ids, "Postzord::Dispatcher::Private")
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'retries' do
|
it 'retries' do
|
||||||
|
|
@ -38,8 +38,8 @@ describe Job::HttpMulti do
|
||||||
|
|
||||||
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
||||||
|
|
||||||
Resque.should_receive(:enqueue).with(Job::HttpMulti, bob.id, @post_xml, [person.id], 1).once
|
Resque.should_receive(:enqueue).with(Job::HttpMulti, bob.id, @post_xml, [person.id], anything, 1).once
|
||||||
Job::HttpMulti.perform(bob.id, @post_xml, [person.id])
|
Job::HttpMulti.perform(bob.id, @post_xml, [person.id], "Postzord::Dispatcher::Private")
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'max retries' do
|
it 'max retries' do
|
||||||
|
|
@ -50,7 +50,7 @@ describe Job::HttpMulti do
|
||||||
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
||||||
|
|
||||||
Resque.should_not_receive(:enqueue)
|
Resque.should_not_receive(:enqueue)
|
||||||
Job::HttpMulti.perform(bob.id, @post_xml, [person.id], 3)
|
Job::HttpMulti.perform(bob.id, @post_xml, [person.id], "Postzord::Dispatcher::Private", 3)
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'generates encrypted xml for people' do
|
it 'generates encrypted xml for people' do
|
||||||
|
|
@ -64,7 +64,7 @@ describe Job::HttpMulti do
|
||||||
Salmon::EncryptedSlap.stub(:create_by_user_and_activity).and_return(salmon)
|
Salmon::EncryptedSlap.stub(:create_by_user_and_activity).and_return(salmon)
|
||||||
salmon.should_receive(:xml_for).and_return("encrypted things")
|
salmon.should_receive(:xml_for).and_return("encrypted things")
|
||||||
|
|
||||||
Job::HttpMulti.perform(bob.id, @post_xml, [person.id])
|
Job::HttpMulti.perform(bob.id, @post_xml, [person.id], "Postzord::Dispatcher::Private")
|
||||||
end
|
end
|
||||||
|
|
||||||
it 'updates http users who have moved to https' do
|
it 'updates http users who have moved to https' do
|
||||||
|
|
@ -76,7 +76,7 @@ describe Job::HttpMulti do
|
||||||
|
|
||||||
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
||||||
|
|
||||||
Job::HttpMulti.perform(bob.id, @post_xml, [person.id])
|
Job::HttpMulti.perform(bob.id, @post_xml, [person.id], "Postzord::Dispatcher::Private")
|
||||||
person.reload
|
person.reload
|
||||||
person.url.should == "https://remote.net/"
|
person.url.should == "https://remote.net/"
|
||||||
end
|
end
|
||||||
|
|
@ -91,6 +91,6 @@ describe Job::HttpMulti do
|
||||||
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
Typhoeus::Hydra.stub!(:new).and_return(@hydra)
|
||||||
|
|
||||||
@hydra.should_receive(:queue).once
|
@hydra.should_receive(:queue).once
|
||||||
Job::HttpMulti.perform(bob.id, @post_xml, [@people[0].id, @people[1].id])
|
Job::HttpMulti.perform(bob.id, @post_xml, [@people[0].id, @people[1].id], "Postzord::Dispatcher::Private")
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue