create new receive workers

This commit is contained in:
Benjamin Neff 2016-04-04 00:56:45 +02:00
parent 9021268e7a
commit e9f53265c9
10 changed files with 153 additions and 79 deletions

View file

@ -0,0 +1,36 @@
module Workers
class ReceiveBase < Base
sidekiq_options queue: :receive
include Diaspora::Logging
# don't retry for errors that will fail again
def filter_errors_for_retry
yield
rescue DiasporaFederation::Entity::ValidationError,
DiasporaFederation::Entity::InvalidRootNode,
DiasporaFederation::Entity::InvalidEntityName,
DiasporaFederation::Entity::UnknownEntity,
DiasporaFederation::Entities::Relayable::SignatureVerificationFailed,
DiasporaFederation::Federation::Receiver::InvalidSender,
DiasporaFederation::Federation::Receiver::NotPublic,
DiasporaFederation::Salmon::SenderKeyNotFound,
DiasporaFederation::Salmon::InvalidEnvelope,
DiasporaFederation::Salmon::InvalidSignature,
DiasporaFederation::Salmon::InvalidAlgorithm,
DiasporaFederation::Salmon::InvalidEncoding,
# TODO: deprecated
DiasporaFederation::Salmon::MissingMagicEnvelope,
DiasporaFederation::Salmon::MissingAuthor,
DiasporaFederation::Salmon::MissingHeader,
DiasporaFederation::Salmon::InvalidHeader => e
logger.warn "don't retry for error: #{e.class}"
rescue ActiveRecord::RecordInvalid => e
logger.warn "failed to save received object: #{e.record.errors.full_messages}"
raise e unless [
"already been taken",
"is ignored by the post author"
].any? {|reason| e.message.include? reason }
end
end
end

View file

@ -1,19 +0,0 @@
# Copyright (c) 2010-2011, Diaspora Inc. This file is
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
module Workers
class ReceiveEncryptedSalmon < Base
sidekiq_options queue: :receive_salmon
def perform(user_id, xml)
suppress_annoying_errors do
user = User.find(user_id)
zord = Postzord::Receiver::Private.new(user, :salmon_xml => xml)
zord.perform!
end
end
end
end

View file

@ -0,0 +1,15 @@
# Copyright (c) 2010-2011, Diaspora Inc. This file is
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
module Workers
class ReceivePrivate < ReceiveBase
def perform(user_id, data, legacy)
filter_errors_for_retry do
user_private_key = User.where(id: user_id).pluck(:serialized_private_key).first
rsa_key = OpenSSL::PKey::RSA.new(user_private_key)
DiasporaFederation::Federation::Receiver.receive_private(data, rsa_key, user_id, legacy)
end
end
end
end

View file

@ -0,0 +1,13 @@
# Copyright (c) 2010-2011, Diaspora Inc. This file is
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
module Workers
class ReceivePublic < ReceiveBase
def perform(data, legacy=false)
filter_errors_for_retry do
DiasporaFederation::Federation::Receiver.receive_public(data, legacy)
end
end
end
end

View file

@ -1,16 +0,0 @@
# Copyright (c) 2010-2011, Diaspora Inc. This file is
# licensed under the Affero General Public License version 3 or later. See
# the COPYRIGHT file.
module Workers
class ReceiveUnencryptedSalmon < Base
sidekiq_options queue: :receive
def perform(xml)
suppress_annoying_errors do
receiver = Postzord::Receiver::Public.new(xml)
receiver.perform!
end
end
end
end

View file

@ -65,12 +65,12 @@ DiasporaFederation.configure do |config|
on :fetch_private_key do |diaspora_id|
key = Person.where(diaspora_handle: diaspora_id).joins(:owner).pluck(:serialized_private_key).first
OpenSSL::PKey::RSA.new key unless key.nil?
OpenSSL::PKey::RSA.new(key) unless key.nil?
end
on :fetch_public_key do |diaspora_id|
key = Person.find_or_fetch_by_identifier(diaspora_id).serialized_public_key
OpenSSL::PKey::RSA.new key unless key.nil?
OpenSSL::PKey::RSA.new(key) unless key.nil?
end
on :fetch_related_entity do |entity_type, guid|
@ -78,18 +78,15 @@ DiasporaFederation.configure do |config|
Diaspora::Federation::Entities.related_entity(entity) if entity
end
on :queue_public_receive do |xml|
Workers::ReceiveUnencryptedSalmon.perform_async(xml)
on :queue_public_receive do |xml, legacy=false|
Workers::ReceivePublic.perform_async(xml, legacy)
end
on :queue_private_receive do |guid, xml|
on :queue_private_receive do |guid, xml, legacy=false|
person = Person.find_by_guid(guid)
if person.nil? || person.owner_id.nil?
false
else
Workers::ReceiveEncryptedSalmon.perform_async(person.owner.id, xml)
true
(person.present? && person.owner_id.present?).tap do |user_found|
Workers::ReceivePrivate.perform_async(person.owner.id, xml, legacy) if user_found
end
end

View file

@ -284,36 +284,36 @@ describe "diaspora federation callbacks" do
end
describe ":queue_public_receive" do
it "enqueues a ReceiveUnencryptedSalmon job" do
xml = "<diaspora/>"
expect(Workers::ReceiveUnencryptedSalmon).to receive(:perform_async).with(xml)
it "enqueues a ReceivePublic job" do
data = "<diaspora/>"
expect(Workers::ReceivePublic).to receive(:perform_async).with(data, true)
DiasporaFederation.callbacks.trigger(:queue_public_receive, xml)
DiasporaFederation.callbacks.trigger(:queue_public_receive, data, true)
end
end
describe ":queue_private_receive" do
let(:xml) { "<diaspora/>" }
let(:data) { "<diaspora/>" }
it "returns true if the user is found" do
result = DiasporaFederation.callbacks.trigger(:queue_private_receive, alice.person.guid, xml)
result = DiasporaFederation.callbacks.trigger(:queue_private_receive, alice.person.guid, data)
expect(result).to be_truthy
end
it "enqueues a ReceiveEncryptedSalmon job" do
expect(Workers::ReceiveEncryptedSalmon).to receive(:perform_async).with(alice.id, xml)
it "enqueues a ReceivePrivate job" do
expect(Workers::ReceivePrivate).to receive(:perform_async).with(alice.id, data, true)
DiasporaFederation.callbacks.trigger(:queue_private_receive, alice.person.guid, xml)
DiasporaFederation.callbacks.trigger(:queue_private_receive, alice.person.guid, data, true)
end
it "returns false if the no user is found" do
person = FactoryGirl.create(:person)
result = DiasporaFederation.callbacks.trigger(:queue_private_receive, person.guid, xml)
result = DiasporaFederation.callbacks.trigger(:queue_private_receive, person.guid, data, true)
expect(result).to be_falsey
end
it "returns false if the no person is found" do
result = DiasporaFederation.callbacks.trigger(:queue_private_receive, "2398rq3948yftn", xml)
result = DiasporaFederation.callbacks.trigger(:queue_private_receive, "2398rq3948yftn", data, true)
expect(result).to be_falsey
end
end

View file

@ -0,0 +1,40 @@
require "spec_helper"
describe Workers::ReceivePrivate do
let(:data) { "<xml></xml>" }
it "calls receive_private of federation gem" do
rsa_key = double
expect(OpenSSL::PKey::RSA).to receive(:new).with(alice.serialized_private_key).and_return(rsa_key)
expect(DiasporaFederation::Federation::Receiver).to receive(:receive_private).with(data, rsa_key, alice.id, true)
Workers::ReceivePrivate.new.perform(alice.id, data, true)
end
it "filters errors that would also fail on second try" do
rsa_key = double
expect(OpenSSL::PKey::RSA).to receive(:new).with(alice.serialized_private_key).and_return(rsa_key)
expect(DiasporaFederation::Federation::Receiver).to receive(:receive_private).with(
data, rsa_key, alice.id, false
).and_raise(DiasporaFederation::Salmon::InvalidSignature)
expect {
Workers::ReceivePrivate.new.perform(alice.id, data, false)
}.not_to raise_error
end
it "does not filter errors that would succeed on second try" do
rsa_key = double
expect(OpenSSL::PKey::RSA).to receive(:new).with(alice.serialized_private_key).and_return(rsa_key)
expect(DiasporaFederation::Federation::Receiver).to receive(:receive_private).with(
data, rsa_key, alice.id, false
).and_raise(DiasporaFederation::Federation::Fetcher::NotFetchable)
expect {
Workers::ReceivePrivate.new.perform(alice.id, data, false)
}.to raise_error DiasporaFederation::Federation::Fetcher::NotFetchable
end
end

View file

@ -0,0 +1,31 @@
require "spec_helper"
describe Workers::ReceivePublic do
let(:data) { "<xml></xml>" }
it "calls receive_public of federation gem" do
expect(DiasporaFederation::Federation::Receiver).to receive(:receive_public).with(data, true)
Workers::ReceivePublic.new.perform(data, true)
end
it "filters errors that would also fail on second try" do
expect(DiasporaFederation::Federation::Receiver).to receive(:receive_public).with(
data, false
).and_raise(DiasporaFederation::Salmon::InvalidSignature)
expect {
Workers::ReceivePublic.new.perform(data, false)
}.not_to raise_error
end
it "does not filter errors that would succeed on second try" do
expect(DiasporaFederation::Federation::Receiver).to receive(:receive_public).with(
data, false
).and_raise(DiasporaFederation::Federation::Fetcher::NotFetchable)
expect {
Workers::ReceivePublic.new.perform(data, false)
}.to raise_error DiasporaFederation::Federation::Fetcher::NotFetchable
end
end

View file

@ -1,23 +0,0 @@
require 'spec_helper'
describe Workers::ReceiveEncryptedSalmon do
before do
@user = alice
@xml = '<xml></xml>'
allow(User).to receive(:find){ |id|
if id == @user.id
@user
else
nil
end
}
end
it 'calls receive_salmon' do
zord = double
expect(zord).to receive(:perform!)
expect(Postzord::Receiver::Private).to receive(:new).with(@user, hash_including(:salmon_xml => @xml)).and_return(zord)
Workers::ReceiveEncryptedSalmon.new.perform(@user.id, @xml)
end
end