From d852144f3ca3f3cca868a24246beec1be658b24e Mon Sep 17 00:00:00 2001 From: Benjamin Neff Date: Tue, 12 Jan 2016 02:33:41 +0100 Subject: [PATCH] add send functionality --- lib/diaspora_federation.rb | 1 + lib/diaspora_federation/federation.rb | 1 + lib/diaspora_federation/federation/sender.rb | 33 +++++++ .../federation/sender/hydra_wrapper.rb | 88 +++++++++++++++++++ .../initializers/diaspora_federation.rb | 3 + 5 files changed, 126 insertions(+) create mode 100644 lib/diaspora_federation/federation/sender.rb create mode 100644 lib/diaspora_federation/federation/sender/hydra_wrapper.rb diff --git a/lib/diaspora_federation.rb b/lib/diaspora_federation.rb index 378c27c..396c81f 100644 --- a/lib/diaspora_federation.rb +++ b/lib/diaspora_federation.rb @@ -31,6 +31,7 @@ module DiasporaFederation queue_public_receive queue_private_receive save_entity_after_receive + update_pod ) # defaults diff --git a/lib/diaspora_federation/federation.rb b/lib/diaspora_federation/federation.rb index f18c752..668ae1f 100644 --- a/lib/diaspora_federation/federation.rb +++ b/lib/diaspora_federation/federation.rb @@ -6,3 +6,4 @@ end require "diaspora_federation/federation/exceptions" require "diaspora_federation/federation/receiver" +require "diaspora_federation/federation/sender" diff --git a/lib/diaspora_federation/federation/sender.rb b/lib/diaspora_federation/federation/sender.rb new file mode 100644 index 0000000..a392122 --- /dev/null +++ b/lib/diaspora_federation/federation/sender.rb @@ -0,0 +1,33 @@ +module DiasporaFederation + module Federation + # Federation logic to send messages to other pods + module Sender + # Send a public message to all urls + # + # @param [String] sender_id sender diaspora-ID + # @param [String] guid guid of the object to send (can be nil if the object has no guid) + # @param [Array] urls receive-urls from pods + # @param [String] xml salmon-xml + # @return [Array] url to retry + def self.public(sender_id, guid, urls, xml) + hydra = HydraWrapper.new(sender_id, guid) + urls.each {|url| hydra.insert_job(url, xml) } + hydra.send + end + + # Send a private message to receive-urls + # + # @param [String] sender_id sender diaspora-ID + # @param [String] guid guid of the object to send (can be nil if the object has no guid) + # @param [Hash] targets Hash with receive-urls (key) of peoples with encrypted salmon-xml for them (value) + # @return [Hash] targets to retry + def self.private(sender_id, guid, targets) + hydra = HydraWrapper.new(sender_id, guid) + targets.each {|url, xml| hydra.insert_job(url, xml) } + Hash[hydra.send.map {|url| [url, targets[url]] }] + end + end + end +end + +require "diaspora_federation/federation/sender/hydra_wrapper" diff --git a/lib/diaspora_federation/federation/sender/hydra_wrapper.rb b/lib/diaspora_federation/federation/sender/hydra_wrapper.rb new file mode 100644 index 0000000..f4d9d23 --- /dev/null +++ b/lib/diaspora_federation/federation/sender/hydra_wrapper.rb @@ -0,0 +1,88 @@ +module DiasporaFederation + module Federation + module Sender + # A wrapper for [Typhoeus::Hydra] + # + # Uses parallel http requests to send out the salmon-messages + class HydraWrapper + include Logging + + # Hydra default opts + # @return [Hash] hydra opts + def self.hydra_opts + @hydra_opts ||= { + maxredirs: DiasporaFederation.http_redirect_limit, + timeout: DiasporaFederation.http_timeout, + method: :post, + verbose: DiasporaFederation.http_verbose, + cainfo: DiasporaFederation.certificate_authorities, + headers: { + "Expect" => "", + "Transfer-Encoding" => "", + "User-Agent" => DiasporaFederation.http_user_agent + } + } + end + + # Create a new instance for a message + # + # @param [String] sender_id sender diaspora-ID + # @param [String] guid guid of the object to send (can be nil if the object has no guid) + def initialize(sender_id, guid) + @sender_id = sender_id + @guid = guid + @urls_to_retry = [] + end + + # Prepares and inserts job into the hydra queue + # @param [String] url the receive-url for the xml + # @param [String] xml xml salmon message + def insert_job(url, xml) + request = Typhoeus::Request.new(url, HydraWrapper.hydra_opts.merge(body: {xml: xml})) + prepare_request(request) + hydra.queue(request) + end + + # Sends all queued messages + # @return [Array] urls to retry + def send + hydra.run + @urls_to_retry + end + + private + + # @return [Typhoeus::Hydra] hydra + def hydra + @hydra ||= Typhoeus::Hydra.new(max_concurrency: DiasporaFederation.http_concurrency) + end + + # Logic for after complete + # @param [Typhoeus::Request] request + def prepare_request(request) + request.on_complete do |response| + success = response.success? + DiasporaFederation.callbacks.trigger(:update_pod, pod_url(response.effective_url), success) + + log_line = "success=#{success} sender=#{@sender_id} guid=#{@guid} url=#{response.effective_url} " \ + "message=#{response.return_code} code=#{response.response_code} time=#{response.total_time}" + if success + logger.info(log_line) + else + logger.warn(log_line) + + @urls_to_retry << request.url + end + end + end + + # Get the pod root-url from the send-url + # @param [String] url + # @return [String] pod root-url + def pod_url(url) + URI.parse(url).tap {|uri| uri.path = "/" }.to_s + end + end + end + end +end diff --git a/test/dummy/config/initializers/diaspora_federation.rb b/test/dummy/config/initializers/diaspora_federation.rb index b0bd2d7..082d0de 100644 --- a/test/dummy/config/initializers/diaspora_federation.rb +++ b/test/dummy/config/initializers/diaspora_federation.rb @@ -97,5 +97,8 @@ DiasporaFederation.configure do |config| on :save_entity_after_receive do end + + on :update_pod do + end end end