Refactor archive concurrency so the same logic can be reused
parent 1eb2c59cce
commit 96493b4a5c

6 changed files with 53 additions and 61 deletions
app/workers/archive_base.rb (new file)

@@ -0,0 +1,38 @@
+# frozen_string_literal: true
+
+#   Copyright (c) 2010-2011, Diaspora Inc.  This file is
+#   licensed under the Affero General Public License version 3 or later.  See
+#   the COPYRIGHT file.
+
+module Workers
+  class ArchiveBase < Base
+    sidekiq_options queue: :low
+
+    include Diaspora::Logging
+
+    def perform(*args)
+      if currently_running_archive_jobs >= AppConfig.settings.archive_jobs_concurrency.to_i
+        logger.info "Already the maximum number of parallel archive jobs running, " \
+                    "scheduling #{self.class}:#{args} in 5 minutes."
+        self.class.perform_in(5.minutes + rand(30), *args)
+      else
+        perform_archive_job(*args)
+      end
+    end
+
+    private
+
+    def perform_archive_job(_args)
+      raise NotImplementedError, "You must override perform_archive_job"
+    end
+
+    def currently_running_archive_jobs
+      return 0 if AppConfig.environment.single_process_mode?
+
+      Sidekiq::Workers.new.count do |process_id, thread_id, work|
+        !(Process.pid.to_s == process_id.split(":")[1] && Thread.current.object_id.to_s(36) == thread_id) &&
+          ArchiveBase.subclasses.map(&:to_s).include?(work["payload"]["class"])
+      end
+    end
+  end
+end

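The new base class owns the throttling: perform counts archive jobs already running across the Sidekiq cluster (skipping the current thread) and either runs the job or reschedules it for five minutes plus a little jitter, while subclasses only implement perform_archive_job. As a minimal sketch of the reuse this enables, a further worker could look like the following; the class name and body are hypothetical and not part of this commit:

module Workers
  # Hypothetical example: any job that should share the archive_jobs_concurrency
  # limit only needs to subclass ArchiveBase and implement perform_archive_job;
  # queueing, throttling and rescheduling are inherited.
  class PruneArchives < ArchiveBase
    private

    def perform_archive_job(user_id)
      # Real domain logic would go here. ArchiveBase#perform has already checked
      # that fewer than AppConfig.settings.archive_jobs_concurrency archive jobs
      # are running, and rescheduled this job otherwise.
      logger.info "pruning archive data for User:#{user_id}" # placeholder body
    end
  end
end

Because currently_running_archive_jobs matches running payloads against ArchiveBase.subclasses, such a worker would automatically share the same limit as the two hunks below, which strip ExportUser and ImportUser (presumably app/workers/export_user.rb and app/workers/import_user.rb, going by the class names) down to their domain logic.
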
@@ -5,39 +5,17 @@
 #   the COPYRIGHT file.
 
 module Workers
-  class ExportUser < Base
-    sidekiq_options queue: :low
-
-    include Diaspora::Logging
-
-    def perform(user_id)
-      if currently_running_exports >= AppConfig.settings.export_concurrency.to_i
-        logger.info "Already the maximum number of parallel user exports running, " \
-                    "scheduling export for User:#{user_id} in 5 minutes."
-        self.class.perform_in(5.minutes + rand(30), user_id)
-      else
-        export_user(user_id)
-      end
-    end
-
+  class ExportUser < ArchiveBase
     private
 
-    def export_user(user_id)
-      @user = User.find(user_id)
-      @user.perform_export!
+    def perform_archive_job(user_id)
+      user = User.find(user_id)
+      user.perform_export!
 
-      if @user.reload.export.present?
-        ExportMailer.export_complete_for(@user).deliver_now
+      if user.reload.export.present?
+        ExportMailer.export_complete_for(user).deliver_now
       else
-        ExportMailer.export_failure_for(@user).deliver_now
+        ExportMailer.export_failure_for(user).deliver_now
       end
     end
-
-    def currently_running_exports
-      return 0 if AppConfig.environment.single_process_mode?
-      Sidekiq::Workers.new.count do |process_id, thread_id, work|
-        !(Process.pid.to_s == process_id.split(":")[1] && Thread.current.object_id.to_s(36) == thread_id) &&
-          work["payload"]["class"] == self.class.to_s
-      end
-    end
   end

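With the concurrency plumbing gone, ExportUser is reduced to looking up the user, running the export, and mailing the result. Enqueueing is unchanged; a usage sketch from a Rails console, assuming these are ordinary Sidekiq workers (the perform_in call above implies the standard job API) and that user is any persisted User record:

# Run the export through the shared archive throttle; ArchiveBase#perform
# decides whether it starts now or is retried in about five minutes.
Workers::ExportUser.perform_async(user.id)

# Delayed scheduling works the same way.
Workers::ExportUser.perform_in(1.hour, user.id)
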
@@ -1,36 +1,12 @@
 # frozen_string_literal: true
 
 module Workers
-  class ImportUser < Base
-    sidekiq_options queue: :low
-
-    include Diaspora::Logging
-
-    def perform(user_id)
-      if currently_running_exports >= AppConfig.settings.export_concurrency.to_i
-
-        logger.info "Already the maximum number of parallel user imports running, " \
-                    "scheduling import for User:#{user_id} in 5 minutes."
-        self.class.perform_in(5.minutes + rand(30), user_id)
-      else
-        import_user(user_id)
-      end
-    end
-
+  class ImportUser < ArchiveBase
     private
 
-    def import_user(user_id)
+    def perform_archive_job(user_id)
       user = User.find(user_id)
       ImportService.new.import_by_user(user)
     end
-
-    def currently_running_exports
-      return 0 if AppConfig.environment.single_process_mode?
-
-      Sidekiq::Workers.new.count do |process_id, thread_id, work|
-        !(Process.pid.to_s == process_id.split(":")[1] && Thread.current.object_id.to_s(36) == thread_id) &&
-          work["payload"]["class"] == self.class.to_s
-      end
-    end
   end
 end

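One behavioural consequence worth noting: the base class compares work["payload"]["class"] against all ArchiveBase subclasses, so imports and exports now draw from a single archive_jobs_concurrency budget rather than one limit per class. For illustration, assuming both workers are loaded (as with eager loading in production):

# Rails console on a pod with this commit applied; the order is not guaranteed
# and only classes that have actually been loaded will appear.
Workers::ArchiveBase.subclasses.map(&:to_s)
# => ["Workers::ExportUser", "Workers::ImportUser"]

The remaining hunks rename the setting itself: the default value (export_concurrency becomes archive_jobs_concurrency, presumably in config/defaults.yml), the commented entry in the example pod configuration, and the ExportUser spec.
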
@@ -97,7 +97,7 @@ defaults:
     suggest_email:
     typhoeus_verbose: false
     typhoeus_concurrency: 20
-    export_concurrency: 1
+    archive_jobs_concurrency: 1
    username_blacklist:
      - 'admin'
      - 'administrator'

@@ -344,10 +344,10 @@
 ## of your Sidekiq workers.
 #typhoeus_concurrency = 20
 
-## Maximum number of parallel user data export jobs (default=1)
-## Be careful, exports of big/old profiles can use a lot of memory, running
-## many of them in parallel can be a problem for small servers.
-#export_concurrency = 1
+## Maximum number of parallel user data import/export jobs (default=1)
+## Be careful, imports and exports of big/old profiles can use a lot of memory,
+## running many of them in parallel can be a problem for small servers.
+#archive_jobs_concurrency = 1
 
 ## Welcome Message settings
 [configuration.settings.welcome_message]

@@ -26,7 +26,7 @@ describe Workers::ExportUser do
   context "concurrency" do
     before do
      AppConfig.environment.single_process_mode = false
-      AppConfig.settings.export_concurrency = 1
+      AppConfig.settings.archive_jobs_concurrency = 1
     end
 
     after :all do

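The spec's concurrency context now pins the renamed setting. As a hedged sketch, not part of this commit, of the kind of expectation this setup enables, one could stub Sidekiq's worker listing so the limit of one appears to be reached:

it "reschedules itself when the archive job limit is reached" do
  # Pretend one archive job is already busy somewhere in the cluster.
  allow_any_instance_of(Sidekiq::Workers).to receive(:count).and_return(1)

  expect(Workers::ExportUser).to receive(:perform_in)

  # With archive_jobs_concurrency = 1 (see the before block above), perform
  # takes the reschedule branch and never touches the database, so the id
  # passed here is arbitrary.
  Workers::ExportUser.new.perform(42)
end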