made http calls synchronous. messagehandler enqueues POSTs into resque.

This commit is contained in:
danielvincent 2010-12-03 15:25:00 -08:00
parent f4fa3191bc
commit a7823485c7
11 changed files with 87 additions and 238 deletions

View file

@ -0,0 +1,12 @@
module Jobs
class HttpPost
@queue = :http
def self.perform(url, body, tries_remaining)
begin
RestClient.post(url, body)
rescue
Resque.enqueue(self, url, body, tries_remaining -1) unless tries_remaining <= 1
end
end
end
end

View file

@ -13,8 +13,6 @@ class User
plugin MongoMapper::Devise
QUEUE = MessageHandler.new
devise :invitable, :database_authenticatable, :registerable,
:recoverable, :rememberable, :trackable, :validatable
@ -185,7 +183,7 @@ class User
end
def post_to_facebook(service, message)
Rails.logger.info("Sending a message: #{message} to Facebook")
Rails.logger.debug("event=post_to_service type=facebook sender_handle=#{self.diaspora_handle}")
begin
RestClient.post("https://graph.facebook.com/me/feed", :message => message, :access_token => service.access_token)
rescue Exception => e
@ -193,7 +191,9 @@ class User
end
end
def post_to_twitter(service, message)
def post_to_twitter(service, message)
Rails.logger.debug("event=post_to_service type=twitter sender_handle=#{self.diaspora_handle}")
twitter_key = SERVICES['twitter']['consumer_key']
twitter_consumer_secret = SERVICES['twitter']['consumer_secret']
@ -211,6 +211,12 @@ class User
Twitter.update(message)
end
def post_to_hub(post)
Rails.logger.debug("event=post_to_service type=pubsub sender_handle=#{self.diaspora_handle}")
EventMachine::PubSubHubbub.new(APP_CONFIG[:pubsub_server]).publish self.public_url
end
def update_post(post, post_hash = {})
if self.owns? post
post.update_attributes(post_hash)
@ -249,7 +255,7 @@ class User
contacts = contacts | aspect.contacts
}
push_to_hub(post) if post.respond_to?(:public) && post.public
post_to_hub(post) if post.respond_to?(:public) && post.public
push_to_people(post, self.person_objects(target_contacts))
end
@ -266,20 +272,15 @@ class User
# calling nil? performs a necessary evaluation.
if person.owner_id
Rails.logger.info("event=push_to_person route=local sender=#{self.diaspora_handle} recipient=#{person.diaspora_handle} payload_type=#{post.class}")
#Resque.enqueue(Jobs::Receive, person.owner_id, post.to_diaspora_xml, self.person.id)
Jobs::Receive.perform(person.owner_id, post.to_diaspora_xml, self.person.id)
else
xml = salmon.xml_for person
Rails.logger.info("event=push_to_person route=remote sender=#{self.diaspora_handle} recipient=#{person.diaspora_handle} payload_type=#{post.class}")
QUEUE.add_post_request(person.receive_url, xml)
QUEUE.process
MessageHandler.add_post_request(person.receive_url, xml)
end
end
def push_to_hub(post)
Rails.logger.debug("event=push_to_hub target=#{APP_CONFIG[:pubsub_server]} sender_url=#{self.public_url}")
QUEUE.add_hub_notification(APP_CONFIG[:pubsub_server], self.public_url)
end
def salmon(post)
created_salmon = Salmon::SalmonSlap.create(self, post.to_diaspora_xml)

View file

@ -13,6 +13,7 @@ Bundler.require(:default, Rails.env) if defined?(Bundler)
require File.expand_path('../../lib/mongo_mapper/bson_id', __FILE__)
require File.expand_path('../../lib/log_overrider', __FILE__)
require File.expand_path('../../lib/message_handler', __FILE__)
module Diaspora
class Application < Rails::Application
# Settings in config/environments/* take precedence over those specified here.

View file

@ -2,73 +2,14 @@
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
class MessageHandler
module MessageHandler
NUM_TRIES = 3
TIMEOUT = 10 #seconds
REDIRECTS = 3
def initialize
@queue = EM::Queue.new
end
def clear_queue
@queue = EM::Queue.new
end
def add_get_request(destinations)
[*destinations].each{ |dest| @queue.push(Message.new(:get, dest))}
end
def add_post_request(destinations, body)
def self.add_post_request(destinations, body)
b = CGI::escape( body )
[*destinations].each{|dest| @queue.push(Message.new(:post, dest, :body => b))}
end
def add_hub_notification(hub_url, feed_url)
@queue.push(Message.new(:hub_publish, hub_url, :body => feed_url))
end
def process
@queue.pop{ |query|
case query.type
when :post
http = EventMachine::HttpRequest.new(query.destination).post :timeout => TIMEOUT, :redirects => REDIRECTS, :body =>{:xml => query.body}
http.callback { process; process}
when :get
http = EventMachine::HttpRequest.new(query.destination).get :timeout => TIMEOUT, :redirects => REDIRECTS
http.callback {process}
when :hub_publish
http = EventMachine::PubSubHubbub.new(query.destination).publish query.body, :timeout => TIMEOUT, :redirects => REDIRECTS
http.callback {process}
else
raise "message is not a type I know!"
end
http.errback {
Rails.logger.info(http.response)
Rails.logger.info("Failure from #{query.destination}, retrying...")
query.try_count +=1
@queue.push query unless query.try_count >= NUM_TRIES
process
}
} unless @queue.size == 0
end
def size
@queue.size
end
class Message
attr_accessor :type, :destination, :body, :callback, :owner_url, :try_count
def initialize(type, dest, opts = {})
@type = type
@owner_url = opts[:owner_url]
@destination = dest
@body = opts[:body]
@callback = opts[:callback] ||= lambda{ process; process }
@try_count = 0
[*destinations].each do |dest|
Resque.enqueue(Jobs::HttpPost, dest, body, NUM_TRIES)
end
end
end

View file

@ -22,10 +22,6 @@ module HelperMethods
user.stub!(:push_to_person)
end
def message_queue
User::QUEUE
end
def connect_users(user1, aspect1, user2, aspect2)
user1.send_contact_request_to(user2.person, aspect1)

View file

@ -5,158 +5,23 @@
require 'spec_helper'
describe MessageHandler do
let(:handler) {MessageHandler.new()}
let(:message_body) {"I want to pump you up"}
let(:message_urls) {["http://www.google.com/", "http://yahoo.com/", "http://foo.com/"]}
let(:success) {FakeHttpRequest.new(:success)}
let(:failure) {FakeHttpRequest.new(:failure)}
before do
handler.clear_queue
end
after(:all) do
handler.clear_queue
end
describe 'GET messages' do
describe 'creating a GET query' do
it 'should be able to add a GET query to the queue with required destinations' do
EventMachine.run{
handler.add_get_request(message_urls)
handler.size.should == message_urls.size
EventMachine.stop
}
end
end
describe 'processing a GET query' do
it 'should remove sucessful http requests from the queue' do
EventMachine::HttpRequest.stub!(:new).and_return(success)
EventMachine.run {
handler.add_get_request("http://www.google.com/")
handler.size.should == 1
handler.process
handler.size.should == 0
EventMachine.stop
}
end
it 'should only retry a bad request the correct number of times' do
failure.should_receive(:get).exactly(MessageHandler::NUM_TRIES).times.and_return(failure)
EventMachine::HttpRequest.stub!(:new).and_return(failure)
EventMachine.run {
handler.add_get_request("http://asdfsdajfsdfbasdj.com/")
handler.size.should == 1
handler.process
handler.size.should == 0
EventMachine.stop
}
end
end
end
describe 'POST messages' do
it 'should be able to add a post message to the queue' do
EventMachine::HttpRequest.stub!(:new).and_return(success)
EventMachine.run {
handler.size.should ==0
handler.add_post_request(message_urls.first, message_body)
handler.size.should == 1
EventMachine.stop
}
end
it 'should be able to insert many posts into the queue' do
EventMachine::HttpRequest.stub!(:new).and_return(success)
EventMachine.run {
handler.size.should == 0
handler.add_post_request(message_urls, message_body)
handler.size.should == message_urls.size
EventMachine.stop
}
end
it 'should post a single message to a given URL' do
success.should_receive(:post).and_return(success)
EventMachine::HttpRequest.stub!(:new).and_return(success)
EventMachine.run{
handler.add_post_request(message_urls.first, message_body)
handler.size.should == 1
handler.process
handler.size.should == 0
EventMachine.stop
}
end
end
describe "Hub publish" do
before do
EventMachine::PubSubHubbub.stub(:new).and_return(:success)
@num_tries = MessageHandler::NUM_TRIES
end
it 'should correctly queue up a pubsubhubbub publish request' do
destination = "http://identi.ca/hub/"
feed_location = "http://google.com/"
EventMachine.run {
handler.add_hub_notification(destination, feed_location)
q = handler.instance_variable_get(:@queue)
message = ""
q.pop{|m| message = m}
message.destination.should == destination
message.body.should == feed_location
EventMachine.stop
}
end
end
describe "Mixed Queries" do
it 'should process both POST and GET requests in the same queue' do
success.should_receive(:get).exactly(3).times.and_return(success)
success.should_receive(:post).exactly(3).times.and_return(success)
EventMachine::HttpRequest.stub!(:new).and_return(success)
EventMachine.run{
handler.add_post_request(message_urls, message_body)
handler.size.should == 3
handler.add_get_request(message_urls)
handler.size.should == 6
handler.process
timer = EventMachine::Timer.new(1) do
handler.size.should == 0
EventMachine.stop
end
}
it 'enqueues a POST' do
Resque.should_receive(:enqueue).with(Jobs::HttpPost, message_urls.first, message_body, @num_tries)
MessageHandler.add_post_request(message_urls.first, message_body)
end
it 'should be able to have seperate POST and GET have different callbacks' do
success.should_receive(:get).exactly(1).times.and_return(success)
success.should_receive(:post).exactly(1).times.and_return(success)
EventMachine::HttpRequest.stub!(:new).and_return(success)
EventMachine.run{
handler.add_post_request(message_urls.first, message_body)
handler.add_get_request(message_urls.first)
handler.process
EventMachine.stop
}
it 'enqueues multiple POSTs' do
message_urls.each do |url|
Resque.should_receive(:enqueue).with(Jobs::HttpPost, url, message_body, @num_tries).once
end
MessageHandler.add_post_request(message_urls, message_body)
end
end
end

View file

@ -74,7 +74,7 @@ describe Comment do
it 'should not send out comments when we have no people' do
status = Factory.create(:status_message, :person => user.person)
User::QUEUE.should_not_receive(:add_post_request)
MessageHandler.should_not_receive(:add_post_request)
user.comment "sup dog", :on => status
end
@ -95,12 +95,12 @@ describe Comment do
it "should send a user's comment on a person's post to that person" do
User::QUEUE.should_receive(:add_post_request).once
MessageHandler.should_receive(:add_post_request).once
user.comment "yo", :on => @person_status
end
it 'should send a user comment on his own post to lots of people' do
User::QUEUE.should_receive(:add_post_request).once
MessageHandler.should_receive(:add_post_request).once
user2.raw_visible_posts.count.should == 0
@ -112,13 +112,13 @@ describe Comment do
it 'should send a comment a person made on your post to all people' do
comment = Comment.new(:person_id => @person.id, :diaspora_handle => @person.diaspora_handle, :text => "cats", :post => @user_status)
User::QUEUE.should_receive(:add_post_request).once
MessageHandler.should_receive(:add_post_request).once
user.receive comment.to_diaspora_xml, @person
end
it 'should send a comment a user made on your post to all people' do
comment = user2.comment( "balls", :on => @user_status)
User::QUEUE.should_receive(:add_post_request).once
MessageHandler.should_receive(:add_post_request).once
user.receive comment.to_diaspora_xml, user2.person
end
@ -128,13 +128,13 @@ describe Comment do
end
it 'should not send a comment a person made on his own post to anyone' do
User::QUEUE.should_not_receive(:add_post_request)
MessageHandler.should_not_receive(:add_post_request)
comment = Comment.new(:person_id => @person.id, :diaspora_handle => @person.diaspora_handle, :text => "cats", :post => @person_status)
user.receive comment.to_diaspora_xml, @person
end
it 'should not send a comment a person made on a person post to anyone' do
User::QUEUE.should_not_receive(:add_post_request)
MessageHandler.should_not_receive(:add_post_request)
comment = Comment.new(:person_id => @person2.id, :diaspora_handle => @person.diaspora_handle, :text => "cats", :post => @person_status)
user.receive comment.to_diaspora_xml, @person
end

View file

@ -0,0 +1,17 @@
require 'spec_helper'
describe Jobs::HttpPost do
before do
@url = 'example.org/things/on/fire'
@body = '<xml>California</xml>'
end
it 'POSTs to a given URL' do
RestClient.should_receive(:post).with(@url, @body).and_return(true)
Jobs::HttpPost.perform(@url, @body, 3)
end
it 'retries' do
RestClient.should_receive(:post).with(@url, @body).and_raise(SocketError)
Resque.should_receive(:enqueue).with(Jobs::HttpPost, @url, @body, 1).once
Jobs::HttpPost.perform(@url, @body, 2)
end
end

View file

@ -23,7 +23,7 @@ describe Retraction do
describe 'dispatching' do
it 'should dispatch a message on delete' do
Factory.create(:person)
User::QUEUE.should_receive :add_post_request
MessageHandler.should_receive :add_post_request
post.destroy
end
end

View file

@ -85,6 +85,18 @@ describe User do
user.dispatch_post(status, :to => "all")
end
it "posts to a pubsub hub if enabled" do
EventMachine::PubSubHubbub.should_receive(:new).and_return(FakeHttpRequest.new(:success))
destination = "http://identi.ca/hub/"
feed_location = "http://google.com/"
EventMachine.run {
user.post_to_hub(feed_location)
EventMachine.stop
}
end
it "does not post to services if post is not public" do
@status_opts[:public] = false
status.save
@ -166,7 +178,7 @@ describe User do
end
it 'does not use the queue for local transfer' do
User::QUEUE.should_receive(:add_post_request).once
MessageHandler.should_receive(:add_post_request).once
remote_person = user4.person
remote_person.owner_id = nil

View file

@ -86,23 +86,27 @@ describe User do
aspect.reload
end
it "should add a received post to the aspect and visible_posts array" do
it "adds a received post to the aspect and visible_posts array" do
user.raw_visible_posts.include?(@status_message).should be_true
aspect.posts.include?(@status_message).should be_true
end
it 'should be removed on disconnecting' do
it 'removes posts upon disconnecting' do
user.disconnect(user2.person)
user.reload
user.raw_visible_posts.should_not include @status_message
end
it 'should be remove a post if the noone links to it' do
it 'deletes a post if the noone links to it' do
person = user2.person
user2.delete
person.reload
person.owner_id = nil
person.save
@status_message.user_refs = 1
@status_message.save
lambda {user.disconnect(person)}.should change(Post, :count).by(-1)
lambda {
user.disconnected_by(user2.person)
}.should change(Post, :count).by(-1)
end
it 'should keep track of user references for one person ' do