Schedule a connection-check when receiving a message from an offline pod
closes #7158
This commit is contained in:
parent
08282cea01
commit
57c0330535
9 changed files with 117 additions and 17 deletions
|
|
@ -61,6 +61,10 @@ class Pod < ActiveRecord::Base
|
||||||
def check_all!
|
def check_all!
|
||||||
Pod.find_in_batches(batch_size: 20) {|batch| batch.each(&:test_connection!) }
|
Pod.find_in_batches(batch_size: 20) {|batch| batch.each(&:test_connection!) }
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def check_scheduled!
|
||||||
|
Pod.where(scheduled_check: true).find_each(&:test_connection!)
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
def offline?
|
def offline?
|
||||||
|
|
@ -76,6 +80,10 @@ class Pod < ActiveRecord::Base
|
||||||
"#{id}:#{host}"
|
"#{id}:#{host}"
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def schedule_check_if_needed
|
||||||
|
update_column(:scheduled_check, true) if offline? && !scheduled_check
|
||||||
|
end
|
||||||
|
|
||||||
def test_connection!
|
def test_connection!
|
||||||
result = ConnectionTester.check uri.to_s
|
result = ConnectionTester.check uri.to_s
|
||||||
logger.debug "tested pod: '#{uri}' - #{result.inspect}"
|
logger.debug "tested pod: '#{uri}' - #{result.inspect}"
|
||||||
|
|
@ -108,6 +116,7 @@ class Pod < ActiveRecord::Base
|
||||||
|
|
||||||
attributes_from_result(result)
|
attributes_from_result(result)
|
||||||
touch(:checked_at)
|
touch(:checked_at)
|
||||||
|
self.scheduled_check = false
|
||||||
|
|
||||||
save
|
save
|
||||||
end
|
end
|
||||||
|
|
|
||||||
9
app/workers/recheck_scheduled_pods.rb
Normal file
9
app/workers/recheck_scheduled_pods.rb
Normal file
|
|
@ -0,0 +1,9 @@
|
||||||
|
module Workers
|
||||||
|
class RecheckScheduledPods < Base
|
||||||
|
sidekiq_options queue: :low
|
||||||
|
|
||||||
|
def perform
|
||||||
|
Pod.check_scheduled!
|
||||||
|
end
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
@ -93,7 +93,9 @@ DiasporaFederation.configure do |config|
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
on :receive_entity do |entity, _sender, recipient_id|
|
on :receive_entity do |entity, sender, recipient_id|
|
||||||
|
Person.by_account_identifier(sender).pod.try(:schedule_check_if_needed)
|
||||||
|
|
||||||
case entity
|
case entity
|
||||||
when DiasporaFederation::Entities::AccountDeletion
|
when DiasporaFederation::Entities::AccountDeletion
|
||||||
Diaspora::Federation::Receive.account_deletion(entity)
|
Diaspora::Federation::Receive.account_deletion(entity)
|
||||||
|
|
|
||||||
|
|
@ -9,3 +9,7 @@ queue_users_for_removal:
|
||||||
recurring_pod_check:
|
recurring_pod_check:
|
||||||
cron: "0 0 * * *"
|
cron: "0 0 * * *"
|
||||||
class: "Workers::RecurringPodCheck"
|
class: "Workers::RecurringPodCheck"
|
||||||
|
|
||||||
|
recheck_scheduled_pods:
|
||||||
|
cron: "*/30 * * * *"
|
||||||
|
class: "Workers::RecheckScheduledPods"
|
||||||
|
|
|
||||||
5
db/migrate/20161024231443_add_scheduled_check_to_pod.rb
Normal file
5
db/migrate/20161024231443_add_scheduled_check_to_pod.rb
Normal file
|
|
@ -0,0 +1,5 @@
|
||||||
|
class AddScheduledCheckToPod < ActiveRecord::Migration
|
||||||
|
def change
|
||||||
|
add_column :pods, :scheduled_check, :boolean, default: false, null: false
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
@ -11,7 +11,7 @@
|
||||||
#
|
#
|
||||||
# It's strongly recommended that you check this file into your version control system.
|
# It's strongly recommended that you check this file into your version control system.
|
||||||
|
|
||||||
ActiveRecord::Schema.define(version: 20161015174300) do
|
ActiveRecord::Schema.define(version: 20161024231443) do
|
||||||
|
|
||||||
create_table "account_deletions", force: :cascade do |t|
|
create_table "account_deletions", force: :cascade do |t|
|
||||||
t.string "diaspora_handle", limit: 255
|
t.string "diaspora_handle", limit: 255
|
||||||
|
|
@ -370,6 +370,7 @@ ActiveRecord::Schema.define(version: 20161015174300) do
|
||||||
t.string "error", limit: 255
|
t.string "error", limit: 255
|
||||||
t.integer "port", limit: 4
|
t.integer "port", limit: 4
|
||||||
t.boolean "blocked", default: false
|
t.boolean "blocked", default: false
|
||||||
|
t.boolean "scheduled_check", default: false, null: false
|
||||||
end
|
end
|
||||||
|
|
||||||
add_index "pods", ["checked_at"], name: "index_pods_on_checked_at", using: :btree
|
add_index "pods", ["checked_at"], name: "index_pods_on_checked_at", using: :btree
|
||||||
|
|
|
||||||
|
|
@ -338,7 +338,7 @@ describe "diaspora federation callbacks" do
|
||||||
|
|
||||||
describe ":receive_entity" do
|
describe ":receive_entity" do
|
||||||
it "receives an AccountDeletion" do
|
it "receives an AccountDeletion" do
|
||||||
account_deletion = FactoryGirl.build(:account_deletion_entity)
|
account_deletion = FactoryGirl.build(:account_deletion_entity, author: remote_person.diaspora_handle)
|
||||||
|
|
||||||
expect(Diaspora::Federation::Receive).to receive(:account_deletion).with(account_deletion)
|
expect(Diaspora::Federation::Receive).to receive(:account_deletion).with(account_deletion)
|
||||||
expect(Workers::ReceiveLocal).not_to receive(:perform_async)
|
expect(Workers::ReceiveLocal).not_to receive(:perform_async)
|
||||||
|
|
@ -347,7 +347,7 @@ describe "diaspora federation callbacks" do
|
||||||
end
|
end
|
||||||
|
|
||||||
it "receives a Retraction" do
|
it "receives a Retraction" do
|
||||||
retraction = FactoryGirl.build(:retraction_entity)
|
retraction = FactoryGirl.build(:retraction_entity, author: remote_person.diaspora_handle)
|
||||||
|
|
||||||
expect(Diaspora::Federation::Receive).to receive(:retraction).with(retraction, 42)
|
expect(Diaspora::Federation::Receive).to receive(:retraction).with(retraction, 42)
|
||||||
expect(Workers::ReceiveLocal).not_to receive(:perform_async)
|
expect(Workers::ReceiveLocal).not_to receive(:perform_async)
|
||||||
|
|
@ -356,7 +356,7 @@ describe "diaspora federation callbacks" do
|
||||||
end
|
end
|
||||||
|
|
||||||
it "receives a entity" do
|
it "receives a entity" do
|
||||||
received = FactoryGirl.build(:status_message_entity)
|
received = FactoryGirl.build(:status_message_entity, author: remote_person.diaspora_handle)
|
||||||
persisted = FactoryGirl.create(:status_message)
|
persisted = FactoryGirl.create(:status_message)
|
||||||
|
|
||||||
expect(Diaspora::Federation::Receive).to receive(:perform).with(received).and_return(persisted)
|
expect(Diaspora::Federation::Receive).to receive(:perform).with(received).and_return(persisted)
|
||||||
|
|
@ -365,8 +365,20 @@ describe "diaspora federation callbacks" do
|
||||||
DiasporaFederation.callbacks.trigger(:receive_entity, received, received.author, nil)
|
DiasporaFederation.callbacks.trigger(:receive_entity, received, received.author, nil)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it "calls schedule_check_if_needed on the senders pod" do
|
||||||
|
received = FactoryGirl.build(:status_message_entity, author: remote_person.diaspora_handle)
|
||||||
|
persisted = FactoryGirl.create(:status_message)
|
||||||
|
|
||||||
|
expect(Person).to receive(:by_account_identifier).with(received.author).and_return(remote_person)
|
||||||
|
expect(remote_person.pod).to receive(:schedule_check_if_needed)
|
||||||
|
expect(Diaspora::Federation::Receive).to receive(:perform).with(received).and_return(persisted)
|
||||||
|
expect(Workers::ReceiveLocal).to receive(:perform_async).with(persisted.class.to_s, persisted.id, [])
|
||||||
|
|
||||||
|
DiasporaFederation.callbacks.trigger(:receive_entity, received, received.author, nil)
|
||||||
|
end
|
||||||
|
|
||||||
it "receives a entity for a recipient" do
|
it "receives a entity for a recipient" do
|
||||||
received = FactoryGirl.build(:status_message_entity)
|
received = FactoryGirl.build(:status_message_entity, author: remote_person.diaspora_handle)
|
||||||
persisted = FactoryGirl.create(:status_message)
|
persisted = FactoryGirl.create(:status_message)
|
||||||
|
|
||||||
expect(Diaspora::Federation::Receive).to receive(:perform).with(received).and_return(persisted)
|
expect(Diaspora::Federation::Receive).to receive(:perform).with(received).and_return(persisted)
|
||||||
|
|
@ -376,7 +388,7 @@ describe "diaspora federation callbacks" do
|
||||||
end
|
end
|
||||||
|
|
||||||
it "does not trigger a ReceiveLocal job if Receive.perform returned nil" do
|
it "does not trigger a ReceiveLocal job if Receive.perform returned nil" do
|
||||||
received = FactoryGirl.build(:status_message_entity)
|
received = FactoryGirl.build(:status_message_entity, author: remote_person.diaspora_handle)
|
||||||
|
|
||||||
expect(Diaspora::Federation::Receive).to receive(:perform).with(received).and_return(nil)
|
expect(Diaspora::Federation::Receive).to receive(:perform).with(received).and_return(nil)
|
||||||
expect(Workers::ReceiveLocal).not_to receive(:perform_async)
|
expect(Workers::ReceiveLocal).not_to receive(:perform_async)
|
||||||
|
|
|
||||||
|
|
@ -82,6 +82,16 @@ describe Pod, type: :model do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe ".check_scheduled!" do
|
||||||
|
it "calls #test_connection! on all scheduled pods" do
|
||||||
|
(0..4).map { FactoryGirl.create(:pod) }
|
||||||
|
FactoryGirl.create(:pod, scheduled_check: true)
|
||||||
|
|
||||||
|
expect_any_instance_of(Pod).to receive(:test_connection!)
|
||||||
|
Pod.check_scheduled!
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
describe "#active?" do
|
describe "#active?" do
|
||||||
it "returns true for an unchecked pod" do
|
it "returns true for an unchecked pod" do
|
||||||
pod = FactoryGirl.create(:pod)
|
pod = FactoryGirl.create(:pod)
|
||||||
|
|
@ -104,6 +114,32 @@ describe Pod, type: :model do
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
describe "#schedule_check_if_needed" do
|
||||||
|
it "schedules the pod for the next check if it is offline" do
|
||||||
|
pod = FactoryGirl.create(:pod, status: :net_failed)
|
||||||
|
pod.schedule_check_if_needed
|
||||||
|
expect(pod.scheduled_check).to be_truthy
|
||||||
|
end
|
||||||
|
|
||||||
|
it "does nothing if the pod unchecked" do
|
||||||
|
pod = FactoryGirl.create(:pod)
|
||||||
|
pod.schedule_check_if_needed
|
||||||
|
expect(pod.scheduled_check).to be_falsey
|
||||||
|
end
|
||||||
|
|
||||||
|
it "does nothing if the pod is online" do
|
||||||
|
pod = FactoryGirl.create(:pod, status: :no_errors)
|
||||||
|
pod.schedule_check_if_needed
|
||||||
|
expect(pod.scheduled_check).to be_falsey
|
||||||
|
end
|
||||||
|
|
||||||
|
it "does nothing if the pod is scheduled for the next check" do
|
||||||
|
pod = FactoryGirl.create(:pod, status: :no_errors, scheduled_check: true)
|
||||||
|
expect(pod).not_to receive(:update_column)
|
||||||
|
pod.schedule_check_if_needed
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
describe "#test_connection!" do
|
describe "#test_connection!" do
|
||||||
before do
|
before do
|
||||||
@pod = FactoryGirl.create(:pod)
|
@pod = FactoryGirl.create(:pod)
|
||||||
|
|
@ -127,6 +163,16 @@ describe Pod, type: :model do
|
||||||
expect(@pod.checked_at).to be_within(1.second).of Time.zone.now
|
expect(@pod.checked_at).to be_within(1.second).of Time.zone.now
|
||||||
end
|
end
|
||||||
|
|
||||||
|
it "resets the scheduled_check flag" do
|
||||||
|
allow(@result).to receive(:error)
|
||||||
|
allow(@result).to receive(:error?)
|
||||||
|
@pod.update_column(:scheduled_check, true)
|
||||||
|
|
||||||
|
@pod.test_connection!
|
||||||
|
|
||||||
|
expect(@pod.scheduled_check).to be_falsey
|
||||||
|
end
|
||||||
|
|
||||||
it "handles a failed check" do
|
it "handles a failed check" do
|
||||||
expect(@result).to receive(:error?).at_least(:once) { true }
|
expect(@result).to receive(:error?).at_least(:once) { true }
|
||||||
expect(@result).to receive(:error).at_least(:once) { ConnectionTester::NetFailure.new }
|
expect(@result).to receive(:error).at_least(:once) { ConnectionTester::NetFailure.new }
|
||||||
|
|
|
||||||
12
spec/workers/recheck_offline_pods_spec.rb
Normal file
12
spec/workers/recheck_offline_pods_spec.rb
Normal file
|
|
@ -0,0 +1,12 @@
|
||||||
|
|
||||||
|
require "spec_helper"
|
||||||
|
|
||||||
|
describe Workers::RecheckScheduledPods do
|
||||||
|
it "performs a connection test on all scheduled pods" do
|
||||||
|
(0..4).map { FactoryGirl.create(:pod) }
|
||||||
|
FactoryGirl.create(:pod, scheduled_check: true)
|
||||||
|
|
||||||
|
expect_any_instance_of(Pod).to receive(:test_connection!)
|
||||||
|
Workers::RecheckScheduledPods.new.perform
|
||||||
|
end
|
||||||
|
end
|
||||||
Loading…
Reference in a new issue