Merge pull request #87 from SuperTux88/fetch-only-once
Don't fetch the same entity twice in the same thread
This commit is contained in:
commit
181d11d2f5
2 changed files with 63 additions and 9 deletions
|
|
@ -7,15 +7,9 @@ module DiasporaFederation
|
||||||
# @param [Symbol, String] entity_type snake_case version of the entity class
|
# @param [Symbol, String] entity_type snake_case version of the entity class
|
||||||
# @param [String] guid guid of the entity to fetch
|
# @param [String] guid guid of the entity to fetch
|
||||||
def self.fetch_public(author, entity_type, guid)
|
def self.fetch_public(author, entity_type, guid)
|
||||||
url = DiasporaFederation.callbacks.trigger(
|
type = entity_name(entity_type).to_s
|
||||||
:fetch_person_url_to, author, "/fetch/#{entity_name(entity_type)}/#{guid}"
|
raise "Already fetching ..." if fetching[type].include?(guid)
|
||||||
)
|
fetch_from_url(author, type, guid)
|
||||||
response = HttpClient.get(url)
|
|
||||||
raise "Failed to fetch #{url}: #{response.status}" unless response.success?
|
|
||||||
|
|
||||||
magic_env_xml = Nokogiri::XML(response.body).root
|
|
||||||
magic_env = Salmon::MagicEnvelope.unenvelop(magic_env_xml)
|
|
||||||
Receiver::Public.new(magic_env).receive
|
|
||||||
rescue => e # rubocop:disable Lint/RescueWithoutErrorClass
|
rescue => e # rubocop:disable Lint/RescueWithoutErrorClass
|
||||||
raise NotFetchable, "Failed to fetch #{entity_type}:#{guid} from #{author}: #{e.class}: #{e.message}"
|
raise NotFetchable, "Failed to fetch #{entity_type}:#{guid} from #{author}: #{e.class}: #{e.message}"
|
||||||
end
|
end
|
||||||
|
|
@ -28,6 +22,23 @@ module DiasporaFederation
|
||||||
class_name.gsub(/(.)([A-Z])/, '\1_\2').downcase
|
class_name.gsub(/(.)([A-Z])/, '\1_\2').downcase
|
||||||
end
|
end
|
||||||
|
|
||||||
|
private_class_method def self.fetch_from_url(author, type, guid)
|
||||||
|
fetching[type] << guid
|
||||||
|
|
||||||
|
url = DiasporaFederation.callbacks.trigger(:fetch_person_url_to, author, "/fetch/#{type}/#{guid}")
|
||||||
|
response = HttpClient.get(url)
|
||||||
|
raise "Failed to fetch #{url}: #{response.status}" unless response.success?
|
||||||
|
|
||||||
|
Receiver.receive_public(response.body)
|
||||||
|
ensure
|
||||||
|
fetching[type].delete(guid)
|
||||||
|
end
|
||||||
|
|
||||||
|
# currently fetching entities in the same thread
|
||||||
|
private_class_method def self.fetching
|
||||||
|
Thread.current[:fetching_entities] ||= Hash.new {|h, k| h[k] = [] }
|
||||||
|
end
|
||||||
|
|
||||||
# Raised, if the entity is not fetchable
|
# Raised, if the entity is not fetchable
|
||||||
class NotFetchable < RuntimeError
|
class NotFetchable < RuntimeError
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -93,6 +93,49 @@ module DiasporaFederation
|
||||||
Federation::Fetcher.fetch_public(post.author, :post, post.guid)
|
Federation::Fetcher.fetch_public(post.author, :post, post.guid)
|
||||||
}.to raise_error Federation::Fetcher::NotFetchable
|
}.to raise_error Federation::Fetcher::NotFetchable
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it "detects a loop and breaks it" do
|
||||||
|
guid1 = Fabricate.sequence(:guid)
|
||||||
|
guid2 = Fabricate.sequence(:guid)
|
||||||
|
text1 = "Look at diaspora://#{alice.diaspora_id}/post/#{guid2}"
|
||||||
|
text2 = "LOL a loop at diaspora://#{alice.diaspora_id}/post/#{guid1}"
|
||||||
|
post1 = Fabricate(:status_message_entity, public: true, guid: guid1, text: text1, author: alice.diaspora_id)
|
||||||
|
post2 = Fabricate(:status_message_entity, public: true, guid: guid2, text: text2, author: alice.diaspora_id)
|
||||||
|
|
||||||
|
[post1, post2].each do |post|
|
||||||
|
post_magic_env = Salmon::MagicEnvelope.new(post, post.author).envelop(alice.private_key).to_xml
|
||||||
|
|
||||||
|
stub_request(:get, "https://example.org/fetch/post/#{post.guid}")
|
||||||
|
.to_return(status: 200, body: post_magic_env)
|
||||||
|
|
||||||
|
expect_callback(:fetch_person_url_to, post.author, "/fetch/post/#{post.guid}")
|
||||||
|
.and_return("https://example.org/fetch/post/#{post.guid}")
|
||||||
|
expect_callback(:fetch_related_entity, "Post", post.guid).and_return(nil)
|
||||||
|
expect_callback(:receive_entity, kind_of(DiasporaFederation::Entities::StatusMessage), post.author, nil)
|
||||||
|
end
|
||||||
|
|
||||||
|
expect_callback(:fetch_public_key, alice.diaspora_id).twice.and_return(alice.public_key)
|
||||||
|
|
||||||
|
Federation::Fetcher.fetch_public(post1.author, :post, post1.guid)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "allows to fetch the same entity in two different threads" do
|
||||||
|
stub_request(:get, "https://example.org/fetch/post/#{post.guid}")
|
||||||
|
.to_return(status: 200, body: lambda {|_|
|
||||||
|
sleep 0.1
|
||||||
|
post_magic_env
|
||||||
|
})
|
||||||
|
|
||||||
|
expect_callback(:fetch_person_url_to, post.author, "/fetch/post/#{post.guid}")
|
||||||
|
.twice.and_return("https://example.org/fetch/post/#{post.guid}")
|
||||||
|
expect_callback(:fetch_public_key, post.author).twice.and_return(alice.public_key)
|
||||||
|
expect_callback(:receive_entity, kind_of(DiasporaFederation::Entities::StatusMessage), post.author, nil).twice
|
||||||
|
|
||||||
|
threads = Array.new(2).map do
|
||||||
|
Thread.new { Federation::Fetcher.fetch_public(post.author, :post, post.guid) }
|
||||||
|
end
|
||||||
|
threads.each(&:join)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue