diff --git a/lib/diaspora_federation/federation/fetcher.rb b/lib/diaspora_federation/federation/fetcher.rb index 26b8a26..089e51e 100644 --- a/lib/diaspora_federation/federation/fetcher.rb +++ b/lib/diaspora_federation/federation/fetcher.rb @@ -7,15 +7,9 @@ module DiasporaFederation # @param [Symbol, String] entity_type snake_case version of the entity class # @param [String] guid guid of the entity to fetch def self.fetch_public(author, entity_type, guid) - url = DiasporaFederation.callbacks.trigger( - :fetch_person_url_to, author, "/fetch/#{entity_name(entity_type)}/#{guid}" - ) - response = HttpClient.get(url) - raise "Failed to fetch #{url}: #{response.status}" unless response.success? - - magic_env_xml = Nokogiri::XML(response.body).root - magic_env = Salmon::MagicEnvelope.unenvelop(magic_env_xml) - Receiver::Public.new(magic_env).receive + type = entity_name(entity_type).to_s + raise "Already fetching ..." if fetching[type].include?(guid) + fetch_from_url(author, type, guid) rescue => e # rubocop:disable Lint/RescueWithoutErrorClass raise NotFetchable, "Failed to fetch #{entity_type}:#{guid} from #{author}: #{e.class}: #{e.message}" end @@ -28,6 +22,23 @@ module DiasporaFederation class_name.gsub(/(.)([A-Z])/, '\1_\2').downcase end + private_class_method def self.fetch_from_url(author, type, guid) + fetching[type] << guid + + url = DiasporaFederation.callbacks.trigger(:fetch_person_url_to, author, "/fetch/#{type}/#{guid}") + response = HttpClient.get(url) + raise "Failed to fetch #{url}: #{response.status}" unless response.success? + + Receiver.receive_public(response.body) + ensure + fetching[type].delete(guid) + end + + # currently fetching entities in the same thread + private_class_method def self.fetching + Thread.current[:fetching_entities] ||= Hash.new {|h, k| h[k] = [] } + end + # Raised, if the entity is not fetchable class NotFetchable < RuntimeError end diff --git a/spec/lib/diaspora_federation/federation/fetcher_spec.rb b/spec/lib/diaspora_federation/federation/fetcher_spec.rb index 22898f5..51997eb 100644 --- a/spec/lib/diaspora_federation/federation/fetcher_spec.rb +++ b/spec/lib/diaspora_federation/federation/fetcher_spec.rb @@ -93,6 +93,49 @@ module DiasporaFederation Federation::Fetcher.fetch_public(post.author, :post, post.guid) }.to raise_error Federation::Fetcher::NotFetchable end + + it "detects a loop and breaks it" do + guid1 = Fabricate.sequence(:guid) + guid2 = Fabricate.sequence(:guid) + text1 = "Look at diaspora://#{alice.diaspora_id}/post/#{guid2}" + text2 = "LOL a loop at diaspora://#{alice.diaspora_id}/post/#{guid1}" + post1 = Fabricate(:status_message_entity, public: true, guid: guid1, text: text1, author: alice.diaspora_id) + post2 = Fabricate(:status_message_entity, public: true, guid: guid2, text: text2, author: alice.diaspora_id) + + [post1, post2].each do |post| + post_magic_env = Salmon::MagicEnvelope.new(post, post.author).envelop(alice.private_key).to_xml + + stub_request(:get, "https://example.org/fetch/post/#{post.guid}") + .to_return(status: 200, body: post_magic_env) + + expect_callback(:fetch_person_url_to, post.author, "/fetch/post/#{post.guid}") + .and_return("https://example.org/fetch/post/#{post.guid}") + expect_callback(:fetch_related_entity, "Post", post.guid).and_return(nil) + expect_callback(:receive_entity, kind_of(DiasporaFederation::Entities::StatusMessage), post.author, nil) + end + + expect_callback(:fetch_public_key, alice.diaspora_id).twice.and_return(alice.public_key) + + Federation::Fetcher.fetch_public(post1.author, :post, post1.guid) + end + + it "allows to fetch the same entity in two different threads" do + stub_request(:get, "https://example.org/fetch/post/#{post.guid}") + .to_return(status: 200, body: lambda {|_| + sleep 0.1 + post_magic_env + }) + + expect_callback(:fetch_person_url_to, post.author, "/fetch/post/#{post.guid}") + .twice.and_return("https://example.org/fetch/post/#{post.guid}") + expect_callback(:fetch_public_key, post.author).twice.and_return(alice.public_key) + expect_callback(:receive_entity, kind_of(DiasporaFederation::Entities::StatusMessage), post.author, nil).twice + + threads = Array.new(2).map do + Thread.new { Federation::Fetcher.fetch_public(post.author, :post, post.guid) } + end + threads.each(&:join) + end end end end