Merge pull request #3482 from Raven24/public-fetcher
first working version of a 'post fetcher' for remote accounts
This commit is contained in:
commit
fb9da740bb
8 changed files with 419 additions and 3 deletions
15
app/models/jobs/fetch_public_posts.rb
Normal file
15
app/models/jobs/fetch_public_posts.rb
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
# Copyright (c) 2010-2012, Diaspora Inc. This file is
|
||||
# licensed under the Affero General Public License version 3 or later. See
|
||||
# the COPYRIGHT file.
|
||||
|
||||
module Jobs
  # Background job that pulls in the public posts of one account.
  # It is placed on the :http_service queue and hands the actual work
  # over to PublicFetcher.
  class FetchPublicPosts < Base
    @queue = :http_service

    # @param diaspora_id the account identifier that is passed on to
    #   PublicFetcher#fetch!
    def self.perform(diaspora_id)
      # the fetcher is loaded on demand, right before it is used
      fetcher_path = Rails.root.join('lib','diaspora','fetcher','public')
      require fetcher_path

      fetcher = PublicFetcher.new
      fetcher.fetch!(diaspora_id)
    end
  end
end
|
||||
|
|
@ -1,4 +1,4 @@
|
|||
# Copyright (c) 2010-2011, Diaspora Inc. This file is
|
||||
# Copyright (c) 2010-2012, Diaspora Inc. This file is
|
||||
# licensed under the Affero General Public License version 3 or later. See
|
||||
# the COPYRIGHT file.
|
||||
|
||||
|
|
@ -7,7 +7,10 @@ module Jobs
|
|||
@queue = :socket_webfinger
|
||||
|
||||
# Looks up the given account via Webfinger and, when a person was found,
# schedules a follow-up job that fetches a few of their public posts.
#
# The Webfinger lookup is performed exactly once; its result is reused for
# the scheduling decision.
#
# @param account the account identifier handed to Webfinger.new
def self.perform(account)
  person = Webfinger.new(account).fetch

  # also, schedule to fetch a few public posts from that person
  Resque.enqueue(Jobs::FetchPublicPosts, person.diaspora_handle) unless person.nil?
end
|
||||
end
|
||||
end
|
||||
|
|
|
|||
5
db/migrate/20120803143552_add_fetch_status_to_people.rb
Normal file
5
db/migrate/20120803143552_add_fetch_status_to_people.rb
Normal file
|
|
@ -0,0 +1,5 @@
|
|||
# Adds an integer `fetch_status` column, defaulting to 0, to the `people`
# table.
class AddFetchStatusToPeople < ActiveRecord::Migration
  def change
    add_column(:people, :fetch_status, :integer, :default => 0)
  end
end
|
||||
|
|
@ -11,7 +11,7 @@
|
|||
#
|
||||
# It's strongly recommended to check this file into your version control system.
|
||||
|
||||
ActiveRecord::Schema.define(:version => 20120521191429) do
|
||||
ActiveRecord::Schema.define(:version => 20120803143552) do
|
||||
|
||||
create_table "account_deletions", :force => true do |t|
|
||||
t.string "diaspora_handle"
|
||||
|
|
@ -229,6 +229,7 @@ ActiveRecord::Schema.define(:version => 20120521191429) do
|
|||
t.datetime "created_at", :null => false
|
||||
t.datetime "updated_at", :null => false
|
||||
t.boolean "closed_account", :default => false
|
||||
t.integer "fetch_status", :default => 0
|
||||
end
|
||||
|
||||
add_index "people", ["diaspora_handle"], :name => "index_people_on_diaspora_handle", :unique => true
|
||||
|
|
|
|||
176
lib/diaspora/fetcher/public.rb
Normal file
176
lib/diaspora/fetcher/public.rb
Normal file
|
|
@ -0,0 +1,176 @@
|
|||
# Copyright (c) 2010-2012, Diaspora Inc. This file is
|
||||
# licensed under the Affero General Public License version 3 or later. See
|
||||
# the COPYRIGHT file.
|
||||
|
||||
# Fetches the public posts of a remote person from that person's server and
# stores the ones that pass validation as StatusMessages. The progress is
# recorded in the person's `fetch_status` using the Status_* constants.
class PublicFetcher

  # various states that can be assigned to a person to describe where
  # in the process of fetching their public posts we're currently at
  Status_Initial = 0
  Status_Running = 1
  Status_Fetched = 2
  Status_Processed = 3
  Status_Done = 4
  Status_Failed = 5
  Status_Unfetchable = 6

  # perform all actions necessary to fetch the public posts of a person
  # with the given diaspora_id
  #
  # On any error during retrieval or processing the person is marked as
  # Status_Failed and the error is re-raised; otherwise the person ends up
  # as Status_Done. Nothing happens if the person does not qualify.
  #
  # @param diaspora_id the account identifier used to look up the person
  # @raise [ActiveRecord::RecordNotFound] if no such person is known
  def fetch!(diaspora_id)
    @person = Person.by_account_identifier diaspora_id
    return unless qualifies_for_fetching?

    begin
      retrieve_and_process_posts
    rescue
      set_fetch_status PublicFetcher::Status_Failed
      raise
    end

    set_fetch_status PublicFetcher::Status_Done
  end

  private

  # checks, that public posts for the person can be fetched,
  # if it is reasonable to do so, and that they have not been fetched already
  #
  # @return [Boolean] true only for a remote person in Status_Initial
  # @raise [ActiveRecord::RecordNotFound] if @person is not present
  def qualifies_for_fetching?
    raise ActiveRecord::RecordNotFound unless @person.present?
    return false if @person.fetch_status == PublicFetcher::Status_Unfetchable

    # local users don't need to be fetched
    if @person.local?
      set_fetch_status PublicFetcher::Status_Unfetchable
      return false
    end

    # this record is already being worked on (or has been worked on)
    return false if @person.fetch_status > PublicFetcher::Status_Initial

    # ok, let's go
    @person.remote? &&
      @person.fetch_status == PublicFetcher::Status_Initial
  end

  # call the methods to fetch and process the public posts for the person
  # does some error logging, in case of an exception; the exception itself
  # is passed on to the caller unchanged
  def retrieve_and_process_posts
    begin
      retrieve_posts
    rescue
      FEDERATION_LOGGER.error "unable to retrieve public posts for #{@person.diaspora_handle}"
      raise
    end

    begin
      process_posts
    rescue
      FEDERATION_LOGGER.error "unable to process public posts for #{@person.diaspora_handle}"
      raise
    end
  end

  # fetch the public posts of the person from their server and save the
  # JSON response to `@data`
  #
  # Sets Status_Running before the request and Status_Fetched afterwards.
  # NOTE(review): the HTTP status of the response is not inspected here;
  # confirm whether non-success responses should be rejected explicitly.
  def retrieve_posts
    set_fetch_status PublicFetcher::Status_Running

    FEDERATION_LOGGER.info "fetching public posts for #{@person.diaspora_handle}"

    conn = Faraday.new(:url => @person.url) do |c|
      c.request :json
      c.response :json
      c.adapter :net_http
    end
    conn.headers[:user_agent] = 'diaspora-fetcher'
    conn.headers[:accept] = 'application/json'

    resp = conn.get "/people/#{@person.guid}"

    # only the beginning of the body is logged, to keep the log readable
    FEDERATION_LOGGER.debug resp.body.to_s[0..250]

    @data = resp.body
    set_fetch_status PublicFetcher::Status_Fetched
  end

  # process the public posts that were previously fetched with `retrieve_posts`
  # adds posts, which pass some basic sanity-checking
  #
  # Posts that fail validation are skipped; posts that cannot be saved are
  # logged and skipped as well. Sets Status_Processed when finished.
  #
  # @raise [TypeError] if `@data` is not a list of posts (e.g. the remote
  #   side answered with an error object instead)
  # @see validate
  def process_posts
    unless @data.is_a?(Array)
      raise TypeError, "expected a list of public posts, but got #{@data.class}"
    end

    # looked up once, instead of twice for every single post
    utc = ActiveSupport::TimeZone.new('UTC')

    @data.each do |post|
      next unless validate(post)

      FEDERATION_LOGGER.info "saving fetched post (#{post['guid']}) to database"

      FEDERATION_LOGGER.debug post.to_s[0..250]

      entry = StatusMessage.diaspora_initialize(
        :author => @person,
        :public => true,
        :guid => post['guid'],
        :text => post['text'],
        :provider_display_name => post['provider_display_name'],
        :created_at => parse_timestamp(utc, post['created_at']),
        :interacted_at => parse_timestamp(utc, post['interacted_at']),
        :frame_name => post['frame_name']
      )

      # don't let a rejected post go unnoticed
      unless entry.save
        FEDERATION_LOGGER.warn "the post (#{post['guid']}) could not be saved"
      end
    end
    set_fetch_status PublicFetcher::Status_Processed
  end

  # parse a timestamp string with the given time zone object
  #
  # @param zone the object whose `parse` method is used
  # @param value the timestamp as found in the fetched post, may be nil
  # @return the parsed time, or nil if the post carried no such timestamp
  def parse_timestamp(zone, value)
    return nil if value.nil?

    zone.parse(value)
  end

  # set and save the fetch status for the current person
  # does nothing if there is no current person
  #
  # @param status one of the Status_* constants
  def set_fetch_status(status)
    return if @person.nil?

    @person.fetch_status = status
    @person.save
  end

  # perform various validations to make sure the post can be saved without
  # troubles; entries that are not JSON objects are rejected right away
  #
  # @return [Boolean] true if the post passed all checks
  # @see check_existing
  # @see check_author
  # @see check_public
  # @see check_type
  def validate(post)
    unless post.is_a?(Hash)
      FEDERATION_LOGGER.warn "ignoring an entry that is not a post (#{post.class})"
      return false
    end

    check_existing(post) && check_author(post) && check_public(post) && check_type(post)
  end

  # hopefully there is no post with the same guid somewhere already...
  #
  # @return [Boolean] true if no post with the guid exists yet
  def check_existing(post)
    new_post = (Post.find_by_guid(post['guid']).blank?)

    FEDERATION_LOGGER.warn "a post with that guid (#{post['guid']}) already exists" unless new_post

    new_post
  end

  # checks if the author of the given post is actually from the person
  # we're currently processing
  #
  # A post without an author object counts as a mismatch, instead of
  # raising and thereby aborting the whole batch.
  #
  # @return [Boolean] true if the author guid equals the person's guid
  def check_author(post)
    author = post['author']
    guid = author.is_a?(Hash) ? author['guid'] : nil
    equal = (guid == @person.guid)

    FEDERATION_LOGGER.warn "the author (#{guid}) does not match the person currently being processed (#{@person.guid})" unless equal

    equal
  end

  # returns whether the given post is public
  #
  # @return [Boolean] true only if the 'public' attribute is exactly `true`
  def check_public(post)
    ispublic = (post['public'] == true)

    FEDERATION_LOGGER.warn "the post (#{post['guid']}) is not public, this is not intended..." unless ispublic

    ispublic
  end

  # see, if the type of the given post is something we can handle
  #
  # @return [Boolean] true only for posts of type "StatusMessage"
  def check_type(post)
    type_ok = (post['post_type'] == "StatusMessage")

    FEDERATION_LOGGER.warn "the post (#{post['guid']}) has a type, which cannot be handled (#{post['post_type']})" unless type_ok

    type_ok
  end
end
|
||||
|
|
@ -1,3 +1,7 @@
|
|||
# Copyright (c) 2010-2012, Diaspora Inc. This file is
|
||||
# licensed under the Affero General Public License version 3 or later. See
|
||||
# the COPYRIGHT file.
|
||||
|
||||
require Rails.root.join('lib', 'hcard')
|
||||
require Rails.root.join('lib', 'webfinger_profile')
|
||||
|
||||
|
|
|
|||
1
spec/fixtures/public_posts.json
vendored
Normal file
1
spec/fixtures/public_posts.json
vendored
Normal file
File diff suppressed because one or more lines are too long
211
spec/lib/diaspora/fetcher/public_spec.rb
Normal file
211
spec/lib/diaspora/fetcher/public_spec.rb
Normal file
|
|
@ -0,0 +1,211 @@
|
|||
# Copyright (c) 2010-2012, Diaspora Inc. This file is
|
||||
# licensed under the Affero General Public License version 3 or later. See
|
||||
# the COPYRIGHT file.
|
||||
|
||||
require Rails.root.join('lib','diaspora','fetcher','public')
|
||||
require 'spec_helper'
|
||||
|
||||
# Tests fetching public posts of a person on a remote server
|
||||
# Exercises PublicFetcher against a stubbed remote server: retrieving the
# JSON, turning it into posts, and each of the private validation helpers.
describe PublicFetcher do
  before do

    # the fixture is taken from an actual json request.
    # it contains 10 StatusMessages and 5 Reshares, all of them public
    # the guid of the person is "7445f9a0a6c28ebb"
    # (read in one go, so that no file handle is left open)
    @fixture = File.read(Rails.root.join('spec', 'fixtures', 'public_posts.json'))
    @fetcher = PublicFetcher.new
    @person = Factory(:person, {:guid => "7445f9a0a6c28ebb",
                                :url => "https://remote-testpod.net",
                                :diaspora_handle => "testuser@remote-testpod.net"})

    # the dot of the host name is escaped, so that it only matches a literal dot
    stub_request(:get, /remote-testpod\.net\/people\/.*/)
      .with(:headers => {'Accept'=>'application/json'})
      .to_return(:body => @fixture)
  end

  describe "#retrieve_posts" do
    before do
      # instance_eval changes `self`, so the person is handed in via a local
      person = @person
      @fetcher.instance_eval {
        @person = person
        retrieve_posts
      }
    end

    it "sets the operation status on the person" do
      @person.reload
      @person.fetch_status.should_not eql(PublicFetcher::Status_Initial)
      @person.fetch_status.should eql(PublicFetcher::Status_Fetched)
    end

    it "sets the @data variable to the parsed JSON data" do
      data = @fetcher.instance_eval {
        @data
      }
      data.should_not be_nil
      data.size.should eql JSON.parse(@fixture).size
    end
  end

  describe "#process_posts" do
    before do
      person = @person
      data = JSON.parse(@fixture)

      @fetcher.instance_eval {
        @person = person
        @data = data
      }
    end

    it 'creates 10 new posts in the database' do
      before_count = Post.count
      @fetcher.instance_eval {
        process_posts
      }
      after_count = Post.count
      after_count.should eql(before_count + 10)
    end

    it 'sets the operation status on the person' do
      @fetcher.instance_eval {
        process_posts
      }

      @person.reload
      @person.fetch_status.should_not eql(PublicFetcher::Status_Initial)
      @person.fetch_status.should eql(PublicFetcher::Status_Processed)
    end
  end

  context "private methods" do
    let(:public_fetcher) { PublicFetcher.new }

    describe '#qualifies_for_fetching?' do
      it "raises an error if the person doesn't exist" do
        lambda {
          public_fetcher.instance_eval {
            @person = Person.by_account_identifier "someone@unknown.com"
            qualifies_for_fetching?
          }
        }.should raise_error ActiveRecord::RecordNotFound
      end

      it 'returns false if the person is unfetchable' do
        public_fetcher.instance_eval {
          @person = Factory(:person, {:fetch_status => PublicFetcher::Status_Unfetchable})
          qualifies_for_fetching?
        }.should be_false
      end

      it 'returns false and sets the person unfetchable for a local account' do
        user = Factory(:user)
        public_fetcher.instance_eval {
          @person = user.person
          qualifies_for_fetching?
        }.should be_false
        user.person.fetch_status.should eql PublicFetcher::Status_Unfetchable
      end

      it 'returns false if the person is processing already (or has been processed)' do
        person = Factory(:person)
        person.fetch_status = PublicFetcher::Status_Fetched
        person.save
        public_fetcher.instance_eval {
          @person = person
          qualifies_for_fetching?
        }.should be_false
      end

      it "returns true, if the user is remote and hasn't been fetched" do
        person = Factory(:person, {:diaspora_handle => 'neo@theone.net'})
        public_fetcher.instance_eval {
          @person = person
          qualifies_for_fetching?
        }.should be_true
      end
    end

    describe '#set_fetch_status' do
      it 'sets the current status of fetching on the person' do
        person = @person
        public_fetcher.instance_eval {
          @person = person
          set_fetch_status PublicFetcher::Status_Unfetchable
        }
        @person.fetch_status.should eql PublicFetcher::Status_Unfetchable

        public_fetcher.instance_eval {
          set_fetch_status PublicFetcher::Status_Initial
        }
        @person.fetch_status.should eql PublicFetcher::Status_Initial
      end
    end

    describe '#validate' do
      it "calls all validation helper methods" do
        public_fetcher.should_receive(:check_existing).and_return(true)
        public_fetcher.should_receive(:check_author).and_return(true)
        public_fetcher.should_receive(:check_public).and_return(true)
        public_fetcher.should_receive(:check_type).and_return(true)

        public_fetcher.instance_eval { validate({}) }.should be_true
      end
    end

    describe '#check_existing' do
      it 'returns false if a post with the same guid exists' do
        post = {'guid' => Factory(:status_message).guid}
        public_fetcher.instance_eval { check_existing post }.should be_false
      end

      it 'returns true if the guid cannot be found' do
        post = {'guid' => SecureRandom.hex(8)}
        public_fetcher.instance_eval { check_existing post }.should be_true
      end
    end

    describe '#check_author' do
      let!(:some_person) { Factory(:person) }

      before do
        person = some_person
        public_fetcher.instance_eval { @person = person }
      end

      it "returns false if the person doesn't match" do
        post = { 'author' => { 'guid' => SecureRandom.hex(8) } }
        public_fetcher.instance_eval { check_author post }.should be_false
      end

      it "returns true if the persons match" do
        post = { 'author' => { 'guid' => some_person.guid } }
        public_fetcher.instance_eval { check_author post }.should be_true
      end
    end

    describe '#check_public' do
      it "returns false if the post is not public" do
        post = {'public' => false}
        public_fetcher.instance_eval { check_public post }.should be_false
      end

      it "returns true if the post is public" do
        post = {'public' => true}
        public_fetcher.instance_eval { check_public post }.should be_true
      end
    end

    describe '#check_type' do
      it "returns false if the type is anything other that 'StatusMessage'" do
        post = {'post_type'=>'Reshare'}
        public_fetcher.instance_eval { check_type post }.should be_false
      end

      it "returns true if the type is 'StatusMessage'" do
        post = {'post_type'=>'StatusMessage'}
        public_fetcher.instance_eval { check_type post }.should be_true
      end
    end
  end
end
|
||||
Loading…
Reference in a new issue