diff --git a/app/models/jobs/http_post.rb b/app/models/jobs/http_post.rb new file mode 100644 index 000000000..6feb8c140 --- /dev/null +++ b/app/models/jobs/http_post.rb @@ -0,0 +1,12 @@ +module Jobs + class HttpPost + @queue = :http + def self.perform(url, body, tries_remaining) + begin + RestClient.post(url, body) + rescue + Resque.enqueue(self, url, body, tries_remaining -1) unless tries_remaining <= 1 + end + end + end +end diff --git a/app/models/user.rb b/app/models/user.rb index a82a0f9fc..fad092f69 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -13,8 +13,6 @@ class User plugin MongoMapper::Devise - QUEUE = MessageHandler.new - devise :invitable, :database_authenticatable, :registerable, :recoverable, :rememberable, :trackable, :validatable @@ -185,7 +183,7 @@ class User end def post_to_facebook(service, message) - Rails.logger.info("Sending a message: #{message} to Facebook") + Rails.logger.debug("event=post_to_service type=facebook sender_handle=#{self.diaspora_handle}") begin RestClient.post("https://graph.facebook.com/me/feed", :message => message, :access_token => service.access_token) rescue Exception => e @@ -193,7 +191,9 @@ class User end end - def post_to_twitter(service, message) + def post_to_twitter(service, message) + Rails.logger.debug("event=post_to_service type=twitter sender_handle=#{self.diaspora_handle}") + twitter_key = SERVICES['twitter']['consumer_key'] twitter_consumer_secret = SERVICES['twitter']['consumer_secret'] @@ -211,6 +211,12 @@ class User Twitter.update(message) end + def post_to_hub(post) + Rails.logger.debug("event=post_to_service type=pubsub sender_handle=#{self.diaspora_handle}") + + EventMachine::PubSubHubbub.new(APP_CONFIG[:pubsub_server]).publish self.public_url + end + def update_post(post, post_hash = {}) if self.owns? post post.update_attributes(post_hash) @@ -249,7 +255,7 @@ class User contacts = contacts | aspect.contacts } - push_to_hub(post) if post.respond_to?(:public) && post.public + post_to_hub(post) if post.respond_to?(:public) && post.public push_to_people(post, self.person_objects(target_contacts)) end @@ -266,20 +272,15 @@ class User # calling nil? performs a necessary evaluation. if person.owner_id Rails.logger.info("event=push_to_person route=local sender=#{self.diaspora_handle} recipient=#{person.diaspora_handle} payload_type=#{post.class}") - #Resque.enqueue(Jobs::Receive, person.owner_id, post.to_diaspora_xml, self.person.id) Jobs::Receive.perform(person.owner_id, post.to_diaspora_xml, self.person.id) else xml = salmon.xml_for person Rails.logger.info("event=push_to_person route=remote sender=#{self.diaspora_handle} recipient=#{person.diaspora_handle} payload_type=#{post.class}") - QUEUE.add_post_request(person.receive_url, xml) - QUEUE.process + MessageHandler.add_post_request(person.receive_url, xml) end end - def push_to_hub(post) - Rails.logger.debug("event=push_to_hub target=#{APP_CONFIG[:pubsub_server]} sender_url=#{self.public_url}") - QUEUE.add_hub_notification(APP_CONFIG[:pubsub_server], self.public_url) - end + def salmon(post) created_salmon = Salmon::SalmonSlap.create(self, post.to_diaspora_xml) diff --git a/config/application.rb b/config/application.rb index ff901fd5d..612df55c3 100644 --- a/config/application.rb +++ b/config/application.rb @@ -13,6 +13,7 @@ Bundler.require(:default, Rails.env) if defined?(Bundler) require File.expand_path('../../lib/mongo_mapper/bson_id', __FILE__) require File.expand_path('../../lib/log_overrider', __FILE__) +require File.expand_path('../../lib/message_handler', __FILE__) module Diaspora class Application < Rails::Application # Settings in config/environments/* take precedence over those specified here. diff --git a/lib/message_handler.rb b/lib/message_handler.rb index 101de70b2..9aa001fc0 100644 --- a/lib/message_handler.rb +++ b/lib/message_handler.rb @@ -2,73 +2,14 @@ # licensed under the Affero General Public License version 3 or later. See # the COPYRIGHT file. -class MessageHandler +module MessageHandler NUM_TRIES = 3 - TIMEOUT = 10 #seconds - REDIRECTS = 3 - def initialize - @queue = EM::Queue.new - end - - def clear_queue - @queue = EM::Queue.new - end - - def add_get_request(destinations) - [*destinations].each{ |dest| @queue.push(Message.new(:get, dest))} - end - - def add_post_request(destinations, body) + def self.add_post_request(destinations, body) b = CGI::escape( body ) - [*destinations].each{|dest| @queue.push(Message.new(:post, dest, :body => b))} - end - - def add_hub_notification(hub_url, feed_url) - @queue.push(Message.new(:hub_publish, hub_url, :body => feed_url)) - end - - def process - @queue.pop{ |query| - case query.type - when :post - http = EventMachine::HttpRequest.new(query.destination).post :timeout => TIMEOUT, :redirects => REDIRECTS, :body =>{:xml => query.body} - http.callback { process; process} - when :get - http = EventMachine::HttpRequest.new(query.destination).get :timeout => TIMEOUT, :redirects => REDIRECTS - http.callback {process} - when :hub_publish - http = EventMachine::PubSubHubbub.new(query.destination).publish query.body, :timeout => TIMEOUT, :redirects => REDIRECTS - http.callback {process} - else - raise "message is not a type I know!" - end - - http.errback { - Rails.logger.info(http.response) - Rails.logger.info("Failure from #{query.destination}, retrying...") - - query.try_count +=1 - @queue.push query unless query.try_count >= NUM_TRIES - process - } - } unless @queue.size == 0 - end - - def size - @queue.size - end - - class Message - attr_accessor :type, :destination, :body, :callback, :owner_url, :try_count - def initialize(type, dest, opts = {}) - @type = type - @owner_url = opts[:owner_url] - @destination = dest - @body = opts[:body] - @callback = opts[:callback] ||= lambda{ process; process } - @try_count = 0 + [*destinations].each do |dest| + Resque.enqueue(Jobs::HttpPost, dest, body, NUM_TRIES) end end end diff --git a/spec/helper_methods.rb b/spec/helper_methods.rb index 60c9c4c92..c4718aa1d 100644 --- a/spec/helper_methods.rb +++ b/spec/helper_methods.rb @@ -22,10 +22,6 @@ module HelperMethods user.stub!(:push_to_person) end - def message_queue - User::QUEUE - end - def connect_users(user1, aspect1, user2, aspect2) user1.send_contact_request_to(user2.person, aspect1) diff --git a/spec/lib/message_handler_spec.rb b/spec/lib/message_handler_spec.rb index 4cd0494c7..796ef3864 100644 --- a/spec/lib/message_handler_spec.rb +++ b/spec/lib/message_handler_spec.rb @@ -5,158 +5,23 @@ require 'spec_helper' describe MessageHandler do - - let(:handler) {MessageHandler.new()} let(:message_body) {"I want to pump you up"} let(:message_urls) {["http://www.google.com/", "http://yahoo.com/", "http://foo.com/"]} - let(:success) {FakeHttpRequest.new(:success)} - let(:failure) {FakeHttpRequest.new(:failure)} - - before do - handler.clear_queue - end - - after(:all) do - handler.clear_queue - end - - describe 'GET messages' do - describe 'creating a GET query' do - it 'should be able to add a GET query to the queue with required destinations' do - EventMachine.run{ - handler.add_get_request(message_urls) - handler.size.should == message_urls.size - EventMachine.stop - } - end - end - - describe 'processing a GET query' do - it 'should remove sucessful http requests from the queue' do - EventMachine::HttpRequest.stub!(:new).and_return(success) - - EventMachine.run { - handler.add_get_request("http://www.google.com/") - handler.size.should == 1 - handler.process - handler.size.should == 0 - EventMachine.stop - } - end - - it 'should only retry a bad request the correct number of times' do - failure.should_receive(:get).exactly(MessageHandler::NUM_TRIES).times.and_return(failure) - EventMachine::HttpRequest.stub!(:new).and_return(failure) - - EventMachine.run { - handler.add_get_request("http://asdfsdajfsdfbasdj.com/") - handler.size.should == 1 - handler.process - handler.size.should == 0 - - EventMachine.stop - } - end - end - end describe 'POST messages' do - it 'should be able to add a post message to the queue' do - EventMachine::HttpRequest.stub!(:new).and_return(success) - - EventMachine.run { - handler.size.should ==0 - handler.add_post_request(message_urls.first, message_body) - handler.size.should == 1 - - EventMachine.stop - } - end - - it 'should be able to insert many posts into the queue' do - EventMachine::HttpRequest.stub!(:new).and_return(success) - - EventMachine.run { - handler.size.should == 0 - handler.add_post_request(message_urls, message_body) - handler.size.should == message_urls.size - EventMachine.stop - } - end - - it 'should post a single message to a given URL' do - success.should_receive(:post).and_return(success) - EventMachine::HttpRequest.stub!(:new).and_return(success) - - EventMachine.run{ - handler.add_post_request(message_urls.first, message_body) - handler.size.should == 1 - handler.process - handler.size.should == 0 - - EventMachine.stop - - } - end - end - - describe "Hub publish" do before do - EventMachine::PubSubHubbub.stub(:new).and_return(:success) + @num_tries = MessageHandler::NUM_TRIES end - it 'should correctly queue up a pubsubhubbub publish request' do - destination = "http://identi.ca/hub/" - feed_location = "http://google.com/" - - EventMachine.run { - handler.add_hub_notification(destination, feed_location) - q = handler.instance_variable_get(:@queue) - - message = "" - q.pop{|m| message = m} - - message.destination.should == destination - message.body.should == feed_location - - EventMachine.stop - } - end - end - - describe "Mixed Queries" do - - it 'should process both POST and GET requests in the same queue' do - success.should_receive(:get).exactly(3).times.and_return(success) - success.should_receive(:post).exactly(3).times.and_return(success) - EventMachine::HttpRequest.stub!(:new).and_return(success) - - EventMachine.run{ - handler.add_post_request(message_urls, message_body) - handler.size.should == 3 - handler.add_get_request(message_urls) - handler.size.should == 6 - handler.process - timer = EventMachine::Timer.new(1) do - handler.size.should == 0 - EventMachine.stop - end - } + it 'enqueues a POST' do + Resque.should_receive(:enqueue).with(Jobs::HttpPost, message_urls.first, message_body, @num_tries) + MessageHandler.add_post_request(message_urls.first, message_body) end - it 'should be able to have seperate POST and GET have different callbacks' do - success.should_receive(:get).exactly(1).times.and_return(success) - success.should_receive(:post).exactly(1).times.and_return(success) - - EventMachine::HttpRequest.stub!(:new).and_return(success) - - EventMachine.run{ - handler.add_post_request(message_urls.first, message_body) - handler.add_get_request(message_urls.first) - handler.process - - EventMachine.stop - } - + it 'enqueues multiple POSTs' do + message_urls.each do |url| + Resque.should_receive(:enqueue).with(Jobs::HttpPost, url, message_body, @num_tries).once + end + MessageHandler.add_post_request(message_urls, message_body) end end end diff --git a/spec/models/comment_spec.rb b/spec/models/comment_spec.rb index d4b12ce95..8b46e8538 100644 --- a/spec/models/comment_spec.rb +++ b/spec/models/comment_spec.rb @@ -74,7 +74,7 @@ describe Comment do it 'should not send out comments when we have no people' do status = Factory.create(:status_message, :person => user.person) - User::QUEUE.should_not_receive(:add_post_request) + MessageHandler.should_not_receive(:add_post_request) user.comment "sup dog", :on => status end @@ -95,12 +95,12 @@ describe Comment do it "should send a user's comment on a person's post to that person" do - User::QUEUE.should_receive(:add_post_request).once + MessageHandler.should_receive(:add_post_request).once user.comment "yo", :on => @person_status end it 'should send a user comment on his own post to lots of people' do - User::QUEUE.should_receive(:add_post_request).once + MessageHandler.should_receive(:add_post_request).once user2.raw_visible_posts.count.should == 0 @@ -112,13 +112,13 @@ describe Comment do it 'should send a comment a person made on your post to all people' do comment = Comment.new(:person_id => @person.id, :diaspora_handle => @person.diaspora_handle, :text => "cats", :post => @user_status) - User::QUEUE.should_receive(:add_post_request).once + MessageHandler.should_receive(:add_post_request).once user.receive comment.to_diaspora_xml, @person end it 'should send a comment a user made on your post to all people' do comment = user2.comment( "balls", :on => @user_status) - User::QUEUE.should_receive(:add_post_request).once + MessageHandler.should_receive(:add_post_request).once user.receive comment.to_diaspora_xml, user2.person end @@ -128,13 +128,13 @@ describe Comment do end it 'should not send a comment a person made on his own post to anyone' do - User::QUEUE.should_not_receive(:add_post_request) + MessageHandler.should_not_receive(:add_post_request) comment = Comment.new(:person_id => @person.id, :diaspora_handle => @person.diaspora_handle, :text => "cats", :post => @person_status) user.receive comment.to_diaspora_xml, @person end it 'should not send a comment a person made on a person post to anyone' do - User::QUEUE.should_not_receive(:add_post_request) + MessageHandler.should_not_receive(:add_post_request) comment = Comment.new(:person_id => @person2.id, :diaspora_handle => @person.diaspora_handle, :text => "cats", :post => @person_status) user.receive comment.to_diaspora_xml, @person end diff --git a/spec/models/jobs/http_post_spec.rb b/spec/models/jobs/http_post_spec.rb new file mode 100644 index 000000000..633228d67 --- /dev/null +++ b/spec/models/jobs/http_post_spec.rb @@ -0,0 +1,17 @@ +require 'spec_helper' + +describe Jobs::HttpPost do + before do + @url = 'example.org/things/on/fire' + @body = 'California' + end + it 'POSTs to a given URL' do + RestClient.should_receive(:post).with(@url, @body).and_return(true) + Jobs::HttpPost.perform(@url, @body, 3) + end + it 'retries' do + RestClient.should_receive(:post).with(@url, @body).and_raise(SocketError) + Resque.should_receive(:enqueue).with(Jobs::HttpPost, @url, @body, 1).once + Jobs::HttpPost.perform(@url, @body, 2) + end +end diff --git a/spec/models/retraction_spec.rb b/spec/models/retraction_spec.rb index 178c1d3d0..265fd258e 100644 --- a/spec/models/retraction_spec.rb +++ b/spec/models/retraction_spec.rb @@ -23,7 +23,7 @@ describe Retraction do describe 'dispatching' do it 'should dispatch a message on delete' do Factory.create(:person) - User::QUEUE.should_receive :add_post_request + MessageHandler.should_receive :add_post_request post.destroy end end diff --git a/spec/models/user/posting_spec.rb b/spec/models/user/posting_spec.rb index e891bfbfa..26971940b 100644 --- a/spec/models/user/posting_spec.rb +++ b/spec/models/user/posting_spec.rb @@ -85,6 +85,18 @@ describe User do user.dispatch_post(status, :to => "all") end + it "posts to a pubsub hub if enabled" do + EventMachine::PubSubHubbub.should_receive(:new).and_return(FakeHttpRequest.new(:success)) + + destination = "http://identi.ca/hub/" + feed_location = "http://google.com/" + + EventMachine.run { + user.post_to_hub(feed_location) + EventMachine.stop + } + end + it "does not post to services if post is not public" do @status_opts[:public] = false status.save @@ -166,7 +178,7 @@ describe User do end it 'does not use the queue for local transfer' do - User::QUEUE.should_receive(:add_post_request).once + MessageHandler.should_receive(:add_post_request).once remote_person = user4.person remote_person.owner_id = nil diff --git a/spec/models/user/receive_spec.rb b/spec/models/user/receive_spec.rb index 225c593b4..b76c3b28f 100644 --- a/spec/models/user/receive_spec.rb +++ b/spec/models/user/receive_spec.rb @@ -86,23 +86,27 @@ describe User do aspect.reload end - it "should add a received post to the aspect and visible_posts array" do + it "adds a received post to the aspect and visible_posts array" do user.raw_visible_posts.include?(@status_message).should be_true aspect.posts.include?(@status_message).should be_true end - it 'should be removed on disconnecting' do + it 'removes posts upon disconnecting' do user.disconnect(user2.person) user.reload user.raw_visible_posts.should_not include @status_message end - it 'should be remove a post if the noone links to it' do + it 'deletes a post if the noone links to it' do person = user2.person - user2.delete - person.reload + person.owner_id = nil + person.save + @status_message.user_refs = 1 + @status_message.save - lambda {user.disconnect(person)}.should change(Post, :count).by(-1) + lambda { + user.disconnected_by(user2.person) + }.should change(Post, :count).by(-1) end it 'should keep track of user references for one person ' do