Limit the number of parallel exports that are allowed to run
closes #7629
This commit is contained in:
parent
b8fb4b6251
commit
fd36517dee
5 changed files with 93 additions and 1 deletions
|
|
@ -13,6 +13,7 @@
|
||||||
* Enable frozen string literals [#7595](https://github.com/diaspora/diaspora/pull/7595)
|
* Enable frozen string literals [#7595](https://github.com/diaspora/diaspora/pull/7595)
|
||||||
* Remove `rails_admin_histories` table [#7597](https://github.com/diaspora/diaspora/pull/7597)
|
* Remove `rails_admin_histories` table [#7597](https://github.com/diaspora/diaspora/pull/7597)
|
||||||
* Optimize memory usage on profile export [#7627](https://github.com/diaspora/diaspora/pull/7627)
|
* Optimize memory usage on profile export [#7627](https://github.com/diaspora/diaspora/pull/7627)
|
||||||
|
* Limit the number of parallel exports [#7629](https://github.com/diaspora/diaspora/pull/7629)
|
||||||
|
|
||||||
## Bug fixes
|
## Bug fixes
|
||||||
* Fix displaying polls with long answers [#7579](https://github.com/diaspora/diaspora/pull/7579)
|
* Fix displaying polls with long answers [#7579](https://github.com/diaspora/diaspora/pull/7579)
|
||||||
|
|
|
||||||
|
|
@ -4,12 +4,25 @@
|
||||||
# licensed under the Affero General Public License version 3 or later. See
|
# licensed under the Affero General Public License version 3 or later. See
|
||||||
# the COPYRIGHT file.
|
# the COPYRIGHT file.
|
||||||
|
|
||||||
|
|
||||||
module Workers
|
module Workers
|
||||||
class ExportUser < Base
|
class ExportUser < Base
|
||||||
sidekiq_options queue: :low
|
sidekiq_options queue: :low
|
||||||
|
|
||||||
|
include Diaspora::Logging
|
||||||
|
|
||||||
def perform(user_id)
|
def perform(user_id)
|
||||||
|
if currently_running_exports >= AppConfig.settings.export_concurrency.to_i
|
||||||
|
logger.info "Already the maximum number of parallel user exports running, " \
|
||||||
|
"scheduling export for User:#{user_id} in 5 minutes."
|
||||||
|
self.class.perform_in(5.minutes + rand(30), user_id)
|
||||||
|
else
|
||||||
|
export_user(user_id)
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
private
|
||||||
|
|
||||||
|
def export_user(user_id)
|
||||||
@user = User.find(user_id)
|
@user = User.find(user_id)
|
||||||
@user.perform_export!
|
@user.perform_export!
|
||||||
|
|
||||||
|
|
@ -19,5 +32,13 @@ module Workers
|
||||||
ExportMailer.export_failure_for(@user).deliver_now
|
ExportMailer.export_failure_for(@user).deliver_now
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
def currently_running_exports
|
||||||
|
return 0 if AppConfig.environment.single_process_mode?
|
||||||
|
Sidekiq::Workers.new.count do |process_id, thread_id, work|
|
||||||
|
!(Process.pid.to_s == process_id.split(":")[1] && Thread.current.object_id.to_s(36) == thread_id) &&
|
||||||
|
work["payload"]["class"] == self.class.to_s
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
|
||||||
|
|
@ -111,6 +111,7 @@ defaults:
|
||||||
suggest_email:
|
suggest_email:
|
||||||
typhoeus_verbose: false
|
typhoeus_verbose: false
|
||||||
typhoeus_concurrency: 20
|
typhoeus_concurrency: 20
|
||||||
|
export_concurrency: 1
|
||||||
username_blacklist:
|
username_blacklist:
|
||||||
- 'admin'
|
- 'admin'
|
||||||
- 'administrator'
|
- 'administrator'
|
||||||
|
|
|
||||||
|
|
@ -455,6 +455,11 @@ configuration: ## Section
|
||||||
## of your Sidekiq workers.
|
## of your Sidekiq workers.
|
||||||
#typhoeus_concurrency: 20
|
#typhoeus_concurrency: 20
|
||||||
|
|
||||||
|
## Maximum number of parallel user data export jobs (default=1)
|
||||||
|
## Be careful, exports of big/old profiles can use a lot of memory, running
|
||||||
|
## many of them in parallel can be a problem for small servers.
|
||||||
|
#export_concurrency: 1
|
||||||
|
|
||||||
## Captcha settings
|
## Captcha settings
|
||||||
captcha: ## Section
|
captcha: ## Section
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -22,4 +22,68 @@ describe Workers::ExportUser do
|
||||||
expect(ExportMailer).to receive(:export_failure_for).with(alice).and_call_original
|
expect(ExportMailer).to receive(:export_failure_for).with(alice).and_call_original
|
||||||
Workers::ExportUser.new.perform(alice.id)
|
Workers::ExportUser.new.perform(alice.id)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
context "concurrency" do
|
||||||
|
before do
|
||||||
|
AppConfig.environment.single_process_mode = false
|
||||||
|
AppConfig.settings.export_concurrency = 1
|
||||||
|
end
|
||||||
|
|
||||||
|
after :all do
|
||||||
|
AppConfig.environment.single_process_mode = true
|
||||||
|
end
|
||||||
|
|
||||||
|
let(:pid) { "#{Socket.gethostname}:#{Process.pid}:#{SecureRandom.hex(6)}" }
|
||||||
|
|
||||||
|
it "schedules a job for later when already another parallel export job is running" do
|
||||||
|
expect(Sidekiq::Workers).to receive(:new).and_return(
|
||||||
|
[[pid, SecureRandom.hex(4), {"payload" => {"class" => "Workers::ExportUser"}}]]
|
||||||
|
)
|
||||||
|
|
||||||
|
expect(Workers::ExportUser).to receive(:perform_in).with(kind_of(Integer), alice.id)
|
||||||
|
expect(alice).not_to receive(:perform_export!)
|
||||||
|
|
||||||
|
Workers::ExportUser.new.perform(alice.id)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "runs the export when the own running job" do
|
||||||
|
expect(Sidekiq::Workers).to receive(:new).and_return(
|
||||||
|
[[pid, Thread.current.object_id.to_s(36), {"payload" => {"class" => "Workers::ExportUser"}}]]
|
||||||
|
)
|
||||||
|
|
||||||
|
expect(Workers::ExportUser).not_to receive(:perform_in).with(kind_of(Integer), alice.id)
|
||||||
|
expect(alice).to receive(:perform_export!)
|
||||||
|
|
||||||
|
Workers::ExportUser.new.perform(alice.id)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "runs the export when no other job is running" do
|
||||||
|
expect(Sidekiq::Workers).to receive(:new).and_return([])
|
||||||
|
|
||||||
|
expect(Workers::ExportUser).not_to receive(:perform_in).with(kind_of(Integer), alice.id)
|
||||||
|
expect(alice).to receive(:perform_export!)
|
||||||
|
|
||||||
|
Workers::ExportUser.new.perform(alice.id)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "runs the export when some other job is running" do
|
||||||
|
expect(Sidekiq::Workers).to receive(:new).and_return(
|
||||||
|
[[pid, SecureRandom.hex(4), {"payload" => {"class" => "Workers::OtherJob"}}]]
|
||||||
|
)
|
||||||
|
|
||||||
|
expect(Workers::ExportUser).not_to receive(:perform_in).with(kind_of(Integer), alice.id)
|
||||||
|
expect(alice).to receive(:perform_export!)
|
||||||
|
|
||||||
|
Workers::ExportUser.new.perform(alice.id)
|
||||||
|
end
|
||||||
|
|
||||||
|
it "runs the export when diaspora is in single process mode" do
|
||||||
|
AppConfig.environment.single_process_mode = true
|
||||||
|
expect(Sidekiq::Workers).not_to receive(:new)
|
||||||
|
expect(Workers::ExportUser).not_to receive(:perform_in).with(kind_of(Integer), alice.id)
|
||||||
|
expect(alice).to receive(:perform_export!)
|
||||||
|
|
||||||
|
Workers::ExportUser.new.perform(alice.id)
|
||||||
|
end
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
|
||||||
Loading…
Reference in a new issue