diff --git a/Changelog.md b/Changelog.md index 772876799..4d0a518a5 100644 --- a/Changelog.md +++ b/Changelog.md @@ -13,6 +13,7 @@ * Enable frozen string literals [#7595](https://github.com/diaspora/diaspora/pull/7595) * Remove `rails_admin_histories` table [#7597](https://github.com/diaspora/diaspora/pull/7597) * Optimize memory usage on profile export [#7627](https://github.com/diaspora/diaspora/pull/7627) +* Limit the number of parallel exports [#7629](https://github.com/diaspora/diaspora/pull/7629) ## Bug fixes * Fix displaying polls with long answers [#7579](https://github.com/diaspora/diaspora/pull/7579) diff --git a/app/workers/export_user.rb b/app/workers/export_user.rb index b73535d17..247b8a6e3 100644 --- a/app/workers/export_user.rb +++ b/app/workers/export_user.rb @@ -4,12 +4,25 @@ # licensed under the Affero General Public License version 3 or later. See # the COPYRIGHT file. - module Workers class ExportUser < Base sidekiq_options queue: :low + include Diaspora::Logging + def perform(user_id) + if currently_running_exports >= AppConfig.settings.export_concurrency.to_i + logger.info "Already the maximum number of parallel user exports running, " \ + "scheduling export for User:#{user_id} in 5 minutes." + self.class.perform_in(5.minutes + rand(30), user_id) + else + export_user(user_id) + end + end + + private + + def export_user(user_id) @user = User.find(user_id) @user.perform_export! @@ -19,5 +32,13 @@ module Workers ExportMailer.export_failure_for(@user).deliver_now end end + + def currently_running_exports + return 0 if AppConfig.environment.single_process_mode? + Sidekiq::Workers.new.count do |process_id, thread_id, work| + !(Process.pid.to_s == process_id.split(":")[1] && Thread.current.object_id.to_s(36) == thread_id) && + work["payload"]["class"] == self.class.to_s + end + end end end diff --git a/config/defaults.yml b/config/defaults.yml index c28ef2696..38277b295 100644 --- a/config/defaults.yml +++ b/config/defaults.yml @@ -111,6 +111,7 @@ defaults: suggest_email: typhoeus_verbose: false typhoeus_concurrency: 20 + export_concurrency: 1 username_blacklist: - 'admin' - 'administrator' diff --git a/config/diaspora.yml.example b/config/diaspora.yml.example index b2573625d..d52378c29 100644 --- a/config/diaspora.yml.example +++ b/config/diaspora.yml.example @@ -455,6 +455,11 @@ configuration: ## Section ## of your Sidekiq workers. #typhoeus_concurrency: 20 + ## Maximum number of parallel user data export jobs (default=1) + ## Be careful, exports of big/old profiles can use a lot of memory, running + ## many of them in parallel can be a problem for small servers. + #export_concurrency: 1 + ## Captcha settings captcha: ## Section diff --git a/spec/workers/export_user_spec.rb b/spec/workers/export_user_spec.rb index 72b0c341f..e72410149 100644 --- a/spec/workers/export_user_spec.rb +++ b/spec/workers/export_user_spec.rb @@ -22,4 +22,68 @@ describe Workers::ExportUser do expect(ExportMailer).to receive(:export_failure_for).with(alice).and_call_original Workers::ExportUser.new.perform(alice.id) end + + context "concurrency" do + before do + AppConfig.environment.single_process_mode = false + AppConfig.settings.export_concurrency = 1 + end + + after :all do + AppConfig.environment.single_process_mode = true + end + + let(:pid) { "#{Socket.gethostname}:#{Process.pid}:#{SecureRandom.hex(6)}" } + + it "schedules a job for later when already another parallel export job is running" do + expect(Sidekiq::Workers).to receive(:new).and_return( + [[pid, SecureRandom.hex(4), {"payload" => {"class" => "Workers::ExportUser"}}]] + ) + + expect(Workers::ExportUser).to receive(:perform_in).with(kind_of(Integer), alice.id) + expect(alice).not_to receive(:perform_export!) + + Workers::ExportUser.new.perform(alice.id) + end + + it "runs the export when the own running job" do + expect(Sidekiq::Workers).to receive(:new).and_return( + [[pid, Thread.current.object_id.to_s(36), {"payload" => {"class" => "Workers::ExportUser"}}]] + ) + + expect(Workers::ExportUser).not_to receive(:perform_in).with(kind_of(Integer), alice.id) + expect(alice).to receive(:perform_export!) + + Workers::ExportUser.new.perform(alice.id) + end + + it "runs the export when no other job is running" do + expect(Sidekiq::Workers).to receive(:new).and_return([]) + + expect(Workers::ExportUser).not_to receive(:perform_in).with(kind_of(Integer), alice.id) + expect(alice).to receive(:perform_export!) + + Workers::ExportUser.new.perform(alice.id) + end + + it "runs the export when some other job is running" do + expect(Sidekiq::Workers).to receive(:new).and_return( + [[pid, SecureRandom.hex(4), {"payload" => {"class" => "Workers::OtherJob"}}]] + ) + + expect(Workers::ExportUser).not_to receive(:perform_in).with(kind_of(Integer), alice.id) + expect(alice).to receive(:perform_export!) + + Workers::ExportUser.new.perform(alice.id) + end + + it "runs the export when diaspora is in single process mode" do + AppConfig.environment.single_process_mode = true + expect(Sidekiq::Workers).not_to receive(:new) + expect(Workers::ExportUser).not_to receive(:perform_in).with(kind_of(Integer), alice.id) + expect(alice).to receive(:perform_export!) + + Workers::ExportUser.new.perform(alice.id) + end + end end