retry receive share-visibility when failed while receiving parallel
refactoring:
- remove unused return-values (were used for caching, which was removed again)
- remove transaction (doesn't help here, added in 2615126)
closes #6068
This commit is contained in:
parent
010afa1019
commit
aa2297a8c0
8 changed files with 117 additions and 99 deletions
|
|
@ -26,6 +26,7 @@
|
|||
* Remove deprecated Facebook permissions [#6019](https://github.com/diaspora/diaspora/pull/6019)
|
||||
* Make used post title lengths more consistent [#6022](https://github.com/diaspora/diaspora/pull/6022)
|
||||
* Improved logging source [#6041](https://github.com/diaspora/diaspora/pull/6041)
|
||||
* Gracefully handle duplicate entry while receiving share-visibility in parallel [#6068](https://github.com/diaspora/diaspora/pull/6068)
|
||||
|
||||
## Bug fixes
|
||||
* Disable auto follow back on aspect deletion [#5846](https://github.com/diaspora/diaspora/pull/5846)
|
||||
|
|
|
|||
|
|
@ -2,21 +2,19 @@
|
|||
# licensed under the Affero General Public License version 3 or later. See
|
||||
# the COPYRIGHT file.
|
||||
|
||||
#this module attempts to be what you need to mix into
|
||||
# this module attempts to be what you need to mix into
|
||||
# base level federation objects that are not relayable, and not persistable
|
||||
#assumes there is an author, author_id, id,
|
||||
# assumes there is an author, author_id, id,
|
||||
module Diaspora
|
||||
module Federated
|
||||
module Shareable
|
||||
|
||||
def self.included(model)
|
||||
model.instance_eval do
|
||||
#we are order dependant so you don't have to be!
|
||||
# we are order dependant so you don't have to be!
|
||||
include Diaspora::Federated::Base
|
||||
include Diaspora::Federated::Shareable::InstanceMethods
|
||||
include Diaspora::Guid
|
||||
|
||||
|
||||
xml_attr :diaspora_handle
|
||||
xml_attr :public
|
||||
xml_attr :created_at
|
||||
|
|
@ -26,11 +24,11 @@ module Diaspora
|
|||
module InstanceMethods
|
||||
include Diaspora::Logging
|
||||
def diaspora_handle
|
||||
read_attribute(:diaspora_handle) || self.author.diaspora_handle
|
||||
read_attribute(:diaspora_handle) || author.diaspora_handle
|
||||
end
|
||||
|
||||
def diaspora_handle=(author_handle)
|
||||
self.author = Person.where(:diaspora_handle => author_handle).first
|
||||
self.author = Person.where(diaspora_handle: author_handle).first
|
||||
write_attribute(:diaspora_handle, author_handle)
|
||||
end
|
||||
|
||||
|
|
@ -38,25 +36,11 @@ module Diaspora
|
|||
# @param [Person] person The person who dispatched this shareable to the
|
||||
# @return [void]
|
||||
def receive(user, person)
|
||||
#exists locally, but you dont know about it
|
||||
#does not exsist locally, and you dont know about it
|
||||
#exists_locally?
|
||||
#you know about it, and it is mutable
|
||||
#you know about it, and it is not mutable
|
||||
self.class.transaction do
|
||||
local_shareable = persisted_shareable
|
||||
|
||||
if local_shareable && verify_persisted_shareable(local_shareable)
|
||||
self.receive_persisted(user, person, local_shareable)
|
||||
|
||||
elsif !local_shareable
|
||||
self.receive_non_persisted(user, person)
|
||||
|
||||
if local_shareable
|
||||
receive_persisted(user, person, local_shareable) if verify_persisted_shareable(local_shareable)
|
||||
else
|
||||
logger.warn "event=receive payload_type=#{self.class} update=true status=abort " \
|
||||
"sender=#{diaspora_handle} reason='update not from shareable owner' existing_shareable=#{id}"
|
||||
false
|
||||
end
|
||||
receive_non_persisted(user, person)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
@ -68,53 +52,67 @@ module Diaspora
|
|||
if self.public?
|
||||
user.contact_people
|
||||
else
|
||||
user.people_in_aspects(user.aspects_with_shareable(self.class, self.id))
|
||||
user.people_in_aspects(user.aspects_with_shareable(self.class, id))
|
||||
end
|
||||
end
|
||||
|
||||
protected
|
||||
|
||||
# @return [Shareable,void]
|
||||
def persisted_shareable
|
||||
self.class.where(:guid => self.guid).first
|
||||
self.class.where(guid: guid).first
|
||||
end
|
||||
|
||||
# @return [Boolean]
|
||||
def verify_persisted_shareable(persisted_shareable)
|
||||
persisted_shareable.author_id == self.author_id
|
||||
end
|
||||
|
||||
def receive_persisted(user, person, local_shareable)
|
||||
known_shareable = user.find_visible_shareable_by_id(self.class.base_class, self.guid, :key => :guid)
|
||||
if known_shareable
|
||||
if known_shareable.mutable?
|
||||
known_shareable.update_attributes(self.attributes.except("id"))
|
||||
true
|
||||
else
|
||||
return true if persisted_shareable.author_id == author_id
|
||||
logger.warn "event=receive payload_type=#{self.class} update=true status=abort " \
|
||||
"sender=#{diaspora_handle} reason=immutable existing_shareable=#{known_shareable.id}"
|
||||
"sender=#{diaspora_handle} reason='update not from shareable owner' guid=#{guid}"
|
||||
false
|
||||
end
|
||||
|
||||
def receive_persisted(user, person, shareable)
|
||||
known_shareable = user.find_visible_shareable_by_id(self.class.base_class, guid, key: :guid)
|
||||
if known_shareable
|
||||
update_existing_sharable(known_shareable)
|
||||
else
|
||||
user.contact_for(person).receive_shareable(local_shareable)
|
||||
user.notify_if_mentioned(local_shareable)
|
||||
logger.info "event=receive payload_type=#{self.class} update=true status=complete " \
|
||||
"sender=#{diaspora_handle} existing_shareable=#{local_shareable.id}"
|
||||
true
|
||||
receive_shareable_visibility(user, person, shareable)
|
||||
end
|
||||
end
|
||||
|
||||
def update_existing_sharable(shareable)
|
||||
if shareable.mutable?
|
||||
shareable.update_attributes(attributes.except("id"))
|
||||
logger.info "event=receive payload_type=#{self.class} update=true status=complete " \
|
||||
"sender=#{diaspora_handle} guid=#{shareable.guid}"
|
||||
else
|
||||
logger.warn "event=receive payload_type=#{self.class} update=true status=abort " \
|
||||
"sender=#{diaspora_handle} reason=immutable guid=#{shareable.guid}"
|
||||
end
|
||||
end
|
||||
|
||||
def receive_shareable_visibility(user, person, shareable)
|
||||
user.contact_for(person).receive_shareable(shareable)
|
||||
user.notify_if_mentioned(shareable)
|
||||
logger.info "event=receive payload_type=#{self.class} status=complete " \
|
||||
"sender=#{diaspora_handle} receiver=#{person.diaspora_handle} guid=#{shareable.guid}"
|
||||
end
|
||||
|
||||
def receive_non_persisted(user, person)
|
||||
if self.save
|
||||
user.contact_for(person).receive_shareable(self)
|
||||
user.notify_if_mentioned(self)
|
||||
logger.info "event=receive payload_type=#{self.class} update=false status=complete " \
|
||||
"sender=#{diaspora_handle}"
|
||||
true
|
||||
if save
|
||||
logger.info "event=receive payload_type=#{self.class} status=complete sender=#{diaspora_handle} " \
|
||||
"guid=#{guid}"
|
||||
receive_shareable_visibility(user, person, self)
|
||||
else
|
||||
logger.warn "event=receive payload_type=#{self.class} update=false status=abort " \
|
||||
"sender=#{diaspora_handle} reason=#{errors.full_messages}"
|
||||
false
|
||||
logger.warn "event=receive payload_type=#{self.class} status=abort sender=#{diaspora_handle} " \
|
||||
"reason=#{errors.full_messages} guid=#{guid}"
|
||||
end
|
||||
rescue ActiveRecord::RecordNotUnique => e
|
||||
# this happens, when two share-visibilities are received parallel. Retry again with local shareable.
|
||||
logger.info "event=receive payload_type=#{self.class} status=retry sender=#{diaspora_handle} guid=#{guid}"
|
||||
local_shareable = persisted_shareable
|
||||
raise e unless local_shareable
|
||||
receive_shareable_visibility(user, person, local_shareable) if verify_persisted_shareable(local_shareable)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
|||
|
|
@ -26,17 +26,15 @@ class Postzord::Receiver::LocalBatch < Postzord::Receiver
|
|||
notify_users
|
||||
|
||||
logger.info "receiving local batch completed for #{@object.inspect}"
|
||||
true
|
||||
end
|
||||
|
||||
# NOTE(copied over from receiver public)
|
||||
# @return [Object]
|
||||
# @return [void]
|
||||
def receive_relayable
|
||||
if @object.parent_author.local?
|
||||
# receive relayable object only for the owner of the parent object
|
||||
@object.receive(@object.parent_author.owner)
|
||||
end
|
||||
@object
|
||||
end
|
||||
|
||||
# Batch import post visibilities for the recipients of the given @object
|
||||
|
|
|
|||
|
|
@ -21,7 +21,6 @@ class Postzord::Receiver::Private < Postzord::Receiver
|
|||
else
|
||||
logger.error "event=receive status=abort reason='not_verified for key' " \
|
||||
"recipient=#{@user.diaspora_handle} sender=#{@salmon.author_id}"
|
||||
false
|
||||
end
|
||||
rescue => e
|
||||
logger.error "failed to receive #{@object.class} from sender:#{@sender.id} for user:#{@user.id}: #{e.message}\n" \
|
||||
|
|
@ -40,14 +39,13 @@ class Postzord::Receiver::Private < Postzord::Receiver
|
|||
receive_object
|
||||
end
|
||||
|
||||
# @return [Object]
|
||||
# @return [void]
|
||||
def receive_object
|
||||
obj = @object.receive(@user, @author)
|
||||
Notification.notify(@user, obj, @author) if obj.respond_to?(:notification_type)
|
||||
logger.info "user:#{@user.id} successfully received #{@object.class} from person #{@sender.guid}" \
|
||||
"#{": #{@object.guid}" if @object.respond_to?(:guid)}"
|
||||
logger.debug "received: #{@object.inspect}"
|
||||
obj
|
||||
end
|
||||
|
||||
protected
|
||||
|
|
|
|||
|
|
@ -20,10 +20,10 @@ class Postzord::Receiver::Public < Postzord::Receiver
|
|||
|
||||
# @return [void]
|
||||
def receive!
|
||||
return false unless verified_signature?
|
||||
return unless verified_signature?
|
||||
# return false unless account_deletion_is_from_author
|
||||
|
||||
return false unless save_object
|
||||
return unless save_object
|
||||
|
||||
logger.info "received a #{@object.inspect}"
|
||||
if @object.is_a?(SignedRetraction) # feels like a hack
|
||||
|
|
@ -37,11 +37,10 @@ class Postzord::Receiver::Public < Postzord::Receiver
|
|||
#nothing
|
||||
else
|
||||
Workers::ReceiveLocalBatch.perform_async(@object.class.to_s, @object.id, self.recipient_user_ids)
|
||||
true
|
||||
end
|
||||
end
|
||||
|
||||
# @return [Object]
|
||||
# @return [void]
|
||||
def receive_relayable
|
||||
if @object.parent_author.local?
|
||||
# receive relayable object only for the owner of the parent object
|
||||
|
|
@ -50,7 +49,6 @@ class Postzord::Receiver::Public < Postzord::Receiver
|
|||
# notify everyone who can see the parent object
|
||||
receiver = Postzord::Receiver::LocalBatch.new(@object, self.recipient_user_ids)
|
||||
receiver.notify_users
|
||||
@object
|
||||
end
|
||||
|
||||
# @return [Object]
|
||||
|
|
|
|||
|
|
@ -43,22 +43,17 @@ describe Postzord::Receiver::Private do
|
|||
@salmon = @zord.instance_variable_get(:@salmon)
|
||||
end
|
||||
|
||||
context 'returns false' do
|
||||
it 'if the salmon author does not exist' do
|
||||
context "does not parse and receive" do
|
||||
it "if the salmon author does not exist" do
|
||||
@zord.instance_variable_set(:@sender, nil)
|
||||
expect(@zord.receive!).to eq(false)
|
||||
end
|
||||
|
||||
it 'if the author does not match the signature' do
|
||||
@zord.instance_variable_set(:@sender, FactoryGirl.create(:person))
|
||||
expect(@zord.receive!).to eq(false)
|
||||
end
|
||||
end
|
||||
|
||||
context 'returns the sent object' do
|
||||
it 'returns the received object on success' do
|
||||
expect(@zord).not_to receive(:parse_and_receive)
|
||||
@zord.receive!
|
||||
end
|
||||
|
||||
it "if the author does not match the signature" do
|
||||
@zord.instance_variable_set(:@sender, FactoryGirl.create(:person))
|
||||
expect(@zord).not_to receive(:parse_and_receive)
|
||||
@zord.receive!
|
||||
expect(@zord.instance_variable_get(:@object)).to respond_to(:to_diaspora_xml)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
|
|||
|
|
@ -45,9 +45,10 @@ describe Postzord::Receiver::Public do
|
|||
@receiver.perform!
|
||||
end
|
||||
|
||||
it 'returns false if signature is not verified' do
|
||||
it "does not save the object if signature is not verified" do
|
||||
expect(@receiver).to receive(:verified_signature?).and_return(false)
|
||||
expect(@receiver.perform!).to be false
|
||||
expect(@receiver).not_to receive(:save_object)
|
||||
@receiver.perform!
|
||||
end
|
||||
|
||||
context 'if signature is valid' do
|
||||
|
|
|
|||
|
|
@ -229,18 +229,33 @@ describe Post, :type => :model do
|
|||
end
|
||||
|
||||
describe "#receive" do
|
||||
it 'returns false if the post does not verify' do
|
||||
@post = FactoryGirl.create(:status_message, :author => bob.person)
|
||||
expect(@post).to receive(:verify_persisted_shareable).and_return(false)
|
||||
expect(@post.receive(bob, eve.person)).to eq(false)
|
||||
it "does not receive if the post does not verify" do
|
||||
@post = FactoryGirl.create(:status_message, author: bob.person)
|
||||
@known_post = FactoryGirl.create(:status_message, author: eve.person)
|
||||
allow(@post).to receive(:persisted_shareable).and_return(@known_post)
|
||||
expect(@post).not_to receive(:receive_persisted)
|
||||
@post.receive(bob, eve.person)
|
||||
end
|
||||
|
||||
it "receives an update if the post is known" do
|
||||
@post = FactoryGirl.create(:status_message, author: bob.person)
|
||||
expect(@post).to receive(:receive_persisted)
|
||||
@post.receive(bob, eve.person)
|
||||
end
|
||||
|
||||
it "receives a new post if the post is unknown" do
|
||||
@post = FactoryGirl.create(:status_message, author: bob.person)
|
||||
allow(@post).to receive(:persisted_shareable).and_return(nil)
|
||||
expect(@post).to receive(:receive_non_persisted)
|
||||
@post.receive(bob, eve.person)
|
||||
end
|
||||
end
|
||||
|
||||
describe "#receive_persisted" do
|
||||
before do
|
||||
@post = FactoryGirl.create(:status_message, :author => bob.person)
|
||||
@post = FactoryGirl.create(:status_message, author: bob.person)
|
||||
@known_post = Post.new
|
||||
allow(bob).to receive(:contact_for).with(eve.person).and_return(double(:receive_shareable => true))
|
||||
allow(bob).to receive(:contact_for).with(eve.person).and_return(double(receive_shareable: true))
|
||||
end
|
||||
|
||||
context "user knows about the post" do
|
||||
|
|
@ -248,16 +263,16 @@ describe Post, :type => :model do
|
|||
allow(bob).to receive(:find_visible_shareable_by_id).and_return(@known_post)
|
||||
end
|
||||
|
||||
it 'updates attributes only if mutable' do
|
||||
it "updates attributes only if mutable" do
|
||||
allow(@known_post).to receive(:mutable?).and_return(true)
|
||||
expect(@known_post).to receive(:update_attributes)
|
||||
expect(@post.send(:receive_persisted, bob, eve.person, @known_post)).to eq(true)
|
||||
end
|
||||
|
||||
it 'returns false if trying to update a non-mutable object' do
|
||||
it "does not update attributes if trying to update a non-mutable object" do
|
||||
allow(@known_post).to receive(:mutable?).and_return(false)
|
||||
expect(@known_post).not_to receive(:update_attributes)
|
||||
expect(@post.send(:receive_persisted, bob, eve.person, @known_post)).to eq(false)
|
||||
@post.send(:receive_persisted, bob, eve.person, @known_post)
|
||||
end
|
||||
end
|
||||
|
||||
|
|
@ -271,8 +286,8 @@ describe Post, :type => :model do
|
|||
expect(@post.send(:receive_persisted, bob, eve.person, @known_post)).to eq(true)
|
||||
end
|
||||
|
||||
it 'notifies the user if they are mentioned' do
|
||||
allow(bob).to receive(:contact_for).with(eve.person).and_return(double(:receive_shareable => true))
|
||||
it "notifies the user if they are mentioned" do
|
||||
allow(bob).to receive(:contact_for).with(eve.person).and_return(double(receive_shareable: true))
|
||||
expect(bob).to receive(:notify_if_mentioned).and_return(true)
|
||||
|
||||
expect(@post.send(:receive_persisted, bob, eve.person, @known_post)).to eq(true)
|
||||
|
|
@ -280,29 +295,43 @@ describe Post, :type => :model do
|
|||
end
|
||||
end
|
||||
|
||||
describe '#receive_non_persisted' do
|
||||
describe "#receive_non_persisted" do
|
||||
context "the user does not know about the post" do
|
||||
before do
|
||||
@post = FactoryGirl.create(:status_message, :author => bob.person)
|
||||
@post = FactoryGirl.create(:status_message, author: bob.person)
|
||||
allow(bob).to receive(:find_visible_shareable_by_id).and_return(nil)
|
||||
allow(bob).to receive(:notify_if_mentioned).and_return(true)
|
||||
end
|
||||
|
||||
it "it receives the post from the contact of the author" do
|
||||
expect(bob).to receive(:contact_for).with(eve.person).and_return(double(:receive_shareable => true))
|
||||
expect(bob).to receive(:contact_for).with(eve.person).and_return(double(receive_shareable: true))
|
||||
expect(@post.send(:receive_non_persisted, bob, eve.person)).to eq(true)
|
||||
end
|
||||
|
||||
it 'notifies the user if they are mentioned' do
|
||||
allow(bob).to receive(:contact_for).with(eve.person).and_return(double(:receive_shareable => true))
|
||||
it "notifies the user if they are mentioned" do
|
||||
allow(bob).to receive(:contact_for).with(eve.person).and_return(double(receive_shareable: true))
|
||||
expect(bob).to receive(:notify_if_mentioned).and_return(true)
|
||||
|
||||
expect(@post.send(:receive_non_persisted, bob, eve.person)).to eq(true)
|
||||
end
|
||||
|
||||
it 'returns false if the post does not save' do
|
||||
it "does not create shareable visibility if the post does not save" do
|
||||
allow(@post).to receive(:save).and_return(false)
|
||||
expect(@post.send(:receive_non_persisted, bob, eve.person)).to eq(false)
|
||||
expect(@post).not_to receive(:receive_shareable_visibility)
|
||||
@post.send(:receive_non_persisted, bob, eve.person)
|
||||
end
|
||||
|
||||
it "retries if saving fails with RecordNotUnique error" do
|
||||
allow(@post).to receive(:save).and_raise(ActiveRecord::RecordNotUnique.new("Duplicate entry ..."))
|
||||
expect(bob).to receive(:contact_for).with(eve.person).and_return(double(receive_shareable: true))
|
||||
expect(@post.send(:receive_non_persisted, bob, eve.person)).to eq(true)
|
||||
end
|
||||
|
||||
it "retries if saving fails with RecordNotUnique error and raise again if no persisted shareable found" do
|
||||
allow(@post).to receive(:save).and_raise(ActiveRecord::RecordNotUnique.new("Duplicate entry ..."))
|
||||
allow(@post).to receive(:persisted_shareable).and_return(nil)
|
||||
expect(bob).not_to receive(:contact_for).with(eve.person)
|
||||
expect { @post.send(:receive_non_persisted, bob, eve.person) }.to raise_error(ActiveRecord::RecordNotUnique)
|
||||
end
|
||||
end
|
||||
end
|
||||
|
|
|
|||
Loading…
Reference in a new issue