Migration Backend Part

This commit is contained in:
Thorsten Claus 2021-09-19 14:48:38 +02:00 committed by Benjamin Neff
parent ced6905cbc
commit 6c4c6f8889
No known key found for this signature in database
GPG key ID: 971464C3F1A90194
17 changed files with 257 additions and 84 deletions

1
.gitignore vendored
View file

@ -77,3 +77,4 @@ diaspora.iml
# WebTranslateIt
.wti
/__MACOSX/

View file

@ -154,6 +154,8 @@ class UsersController < ApplicationController
:post_default_public,
:otp_required_for_login,
:otp_secret,
:exported_photos_file,
:export,
email_preferences: UserPreference::VALID_EMAIL_TYPES.map(&:to_sym)
)
end
@ -172,6 +174,8 @@ class UsersController < ApplicationController
change_post_default(user_data)
elsif user_data[:color_theme]
change_settings(user_data, "users.update.color_theme_changed", "users.update.color_theme_not_changed")
elsif user_data[:export] || user_data[:exported_photos_file]
upload_export_files(user_data)
else
change_settings(user_data)
end
@ -235,6 +239,19 @@ class UsersController < ApplicationController
end
end
def upload_export_files(user_data)
logger.info "Start importing account"
@user.export = user_data[:export] if user_data[:export]
@user.exported_photos_file = user_data[:exported_photos_file] if user_data[:exported_photos_file]
if @user.save
flash.now[:notice] = "Your account migration has been scheduled"
else
flash.now[:error] = "Your account migration could not be scheduled for the following reason:"\
" #{@user.errors.full_messages}"
end
Workers::ImportUser.perform_async(@user.id)
end
def change_settings(user_data, successful="users.update.settings_updated", error="users.update.settings_not_updated")
if @user.update_attributes(user_data)
flash.now[:notice] = t(successful)

View file

@ -7,7 +7,7 @@ class AccountMigration < ApplicationRecord
belongs_to :new_person, class_name: "Person"
validates :old_person, uniqueness: true
validates :new_person, uniqueness: true
validates :new_person, presence: true
after_create :lock_old_user!
@ -28,7 +28,6 @@ class AccountMigration < ApplicationRecord
@sender ||= old_user || ephemeral_sender
end
# executes a migration plan according to this AccountMigration object
def perform!
raise "already performed" if performed?

View file

@ -22,7 +22,7 @@ class Photo < ApplicationRecord
large: photo.url(:scaled_full),
raw: photo.url
}
}, :as => :sizes
}, as: :sizes
t.add lambda { |photo|
{
height: photo.height,
@ -48,25 +48,25 @@ class Photo < ApplicationRecord
before_destroy :ensure_user_picture
after_destroy :clear_empty_status_message
after_commit :on => :create do
queue_processing_job if self.author.local?
after_commit on: :create do
queue_processing_job if author.local?
end
scope :on_statuses, ->(post_guids) {
where(:status_message_guid => post_guids)
where(status_message_guid: post_guids)
}
def clear_empty_status_message
if self.status_message && self.status_message.text_and_photos_blank?
self.status_message.destroy
if status_message&.text_and_photos_blank?
status_message.destroy
else
true
end
end
def ownership_of_status_message
message = StatusMessage.find_by_guid(self.status_message_guid)
message = StatusMessage.find_by(guid: status_message_guid)
return unless status_message_guid && message && diaspora_handle != message.diaspora_handle
errors.add(:base, "Photo must have the same owner as status message")
@ -96,26 +96,22 @@ class Photo < ApplicationRecord
end
def update_remote_path
unless self.unprocessed_image.url.match(/^https?:\/\//)
remote_path = "#{AppConfig.pod_uri.to_s.chomp("/")}#{self.unprocessed_image.url}"
else
remote_path = self.unprocessed_image.url
end
remote_path = if unprocessed_image.url.match(%r{^https?://})
unprocessed_image.url
else
"#{AppConfig.pod_uri.to_s.chomp('/')}#{unprocessed_image.url}"
end
name_start = remote_path.rindex '/'
name_start = remote_path.rindex "/"
self.remote_photo_path = "#{remote_path.slice(0, name_start)}/"
self.remote_photo_name = remote_path.slice(name_start + 1, remote_path.length)
end
def url(name = nil)
if remote_photo_path
name = name.to_s + '_' if name
def url(name=nil)
if remote_photo_path.present? && remote_photo_name.present?
name = "#{name}_" if name
image_url = remote_photo_path + name.to_s + remote_photo_name
if AppConfig.privacy.camo.proxy_remote_pod_images?
Diaspora::Camo.image_url(image_url)
else
image_url
end
camo_image_url(image_url)
elsif processed?
processed_image.url(name)
else
@ -124,7 +120,7 @@ class Photo < ApplicationRecord
end
def ensure_user_picture
profiles = Profile.where(:image_url => url(:thumb_large))
profiles = Profile.where(image_url: url(:thumb_large))
profiles.each { |profile|
profile.image_url = nil
profile.save
@ -132,7 +128,7 @@ class Photo < ApplicationRecord
end
def queue_processing_job
Workers::ProcessPhoto.perform_async(self.id)
Workers::ProcessPhoto.perform_async(id)
end
def self.visible(current_user, person, limit=:all, max_time=nil)
@ -143,4 +139,14 @@ class Photo < ApplicationRecord
end
photos.where(pending: false).order("created_at DESC")
end
private
def camo_image_url(image_url)
if AppConfig.privacy.camo.proxy_remote_pod_images?
Diaspora::Camo.image_url(image_url)
else
image_url
end
end
end

View file

@ -325,9 +325,9 @@ class User < ApplicationRecord
else
update exporting: false
end
rescue => error
logger.error "Unexpected error while exporting user '#{username}': #{error.class}: #{error.message}\n" \
"#{error.backtrace.first(15).join("\n")}"
rescue StandardError => e
logger.error "Unexpected error while exporting data for '#{username}': #{e.class}: #{e.message}\n" \
"#{e.backtrace.first(15).join("\n")}"
update exporting: false
end
@ -335,7 +335,7 @@ class User < ApplicationRecord
ActiveSupport::Gzip.compress Diaspora::Exporter.new(self).execute
end
######### Photos export ##################
######### Photo export ##################
mount_uploader :exported_photos_file, ExportedPhotos
def queue_export_photos
@ -345,9 +345,9 @@ class User < ApplicationRecord
def perform_export_photos!
PhotoExporter.new(self).perform
rescue => error
logger.error "Unexpected error while exporting photos for '#{username}': #{error.class}: #{error.message}\n" \
"#{error.backtrace.first(15).join("\n")}"
rescue StandardError => e
logger.error "Unexpected error while exporting photos for '#{username}': #{e.class}: #{e.message}\n" \
"#{e.backtrace.first(15).join("\n")}"
update exporting_photos: false
end
@ -403,13 +403,19 @@ class User < ApplicationRecord
tag_followings.any? || profile[:image_url]
end
###Helpers############
def self.build(opts = {})
### Helpers ############
def self.build(opts={})
u = User.new(opts.except(:person, :id))
u.setup(opts)
u
end
def self.find_or_build(opts={})
user = User.find_by(username: opts[:username])
user ||= User.build(opts)
user
end
def setup(opts)
self.username = opts[:username]
self.email = opts[:email]
@ -417,10 +423,11 @@ class User < ApplicationRecord
self.language ||= I18n.locale.to_s
self.color_theme = opts[:color_theme]
self.color_theme ||= AppConfig.settings.default_color_theme
self.valid?
valid?
errors = self.errors
errors.delete :person
return if errors.size > 0
self.set_person(Person.new((opts[:person] || {}).except(:id)))
self.generate_keys
self

View file

@ -0,0 +1,96 @@
# frozen_string_literal: true
class ImportService
include Diaspora::Logging
def import_by_user(user)
import_by_files(user.export.current_path, user.exported_photos_file.current_path, user.username)
end
def import_by_files(path_to_profile, path_to_photos, username)
if path_to_profile.present?
logger.info "Import for profile #{username} at path #{path_to_profile} requested"
import_user_profile(path_to_profile, username)
end
user = User.find_by(username: username)
raise ArgumentError, "Username #{username} should exist before uploading photos." if user.nil?
if path_to_photos.present?
logger.info("Importing photos from import file for '#{username}' from #{path_to_photos}")
import_user_photos(user, path_to_photos)
end
remove_file_references(user)
end
private
def import_user_profile(path_to_profile, username)
raise ArgumentError, "Profile file not found at path: #{path_to_profile}" unless File.exist?(path_to_profile)
service = MigrationService.new(path_to_profile, username)
logger.info "Start validating user profile #{username}"
service.validate
logger.info "Start importing user profile for '#{username}'"
service.perform!
logger.info "Successfully imported profile: #{username}"
rescue MigrationService::ArchiveValidationFailed => e
logger.error "Errors in the archive found: #{e.message}"
rescue MigrationService::MigrationAlreadyExists
logger.error "Migration record already exists for the user, can't continue"
rescue MigrationService::SelfMigrationNotAllowed
logger.error "You can't migrate onto your own account"
end
def import_user_photos(user, path_to_photos)
raise ArgumentError, "Photos file not found at path: #{path_to_photos}" unless File.exist?(path_to_photos)
uncompressed_photos_folder = unzip_photos_file(path_to_photos)
user.posts.find_in_batches do |posts|
import_photos_for_posts(posts, uncompressed_photos_folder)
end
FileUtils.rmdir(uncompressed_photos_folder)
end
def import_photos_for_posts(posts, source_dir)
posts.each do |post|
post.photos.each do |photo|
uploaded_file = "#{source_dir}/#{photo.remote_photo_name}"
next unless File.exist?(uploaded_file) && photo.remote_photo_name.present?
File.open(uploaded_file) do |file|
photo.random_string = File.basename(uploaded_file, ".*")
photo.unprocessed_image = file
photo.save(touch: false)
end
photo.queue_processing_job
end
end
end
def unzip_photos_file(photo_file_path)
folder = create_folder(photo_file_path)
Zip::File.open(photo_file_path) do |zip_file|
zip_file.each do |file|
target_name = "#{folder}#{Pathname::SEPARATOR_LIST}#{file}"
zip_file.extract(file, target_name) unless File.exist?(target_name)
rescue Errno::ENOENT => e
logger.error e.to_s
end
end
folder
end
def create_folder(compressed_file_name)
extension = File.extname(compressed_file_name)
folder = compressed_file_name.delete_suffix(extension)
FileUtils.mkdir(folder) unless File.exist?(folder)
folder
end
def remove_file_references(user)
user.remove_exported_photos_file
user.remove_export
user.save
end
end

View file

@ -10,9 +10,12 @@ class MigrationService
end
def validate
return unless archive_file_exists?
archive_validator.validate
raise ArchiveValidationFailed, errors.join("\n") if errors.any?
raise MigrationAlreadyExists if AccountMigration.where(old_person: old_person).any?
raise SelfMigrationNotAllowed if self_import?
end
def perform!
@ -23,6 +26,12 @@ class MigrationService
remove_intermediate_file
end
def self_import?
source_diaspora_id = archive_validator.archive_author_diaspora_id
target_diaspora_id = "#{new_user_name}#{User.diaspora_id_host}"
source_diaspora_id == target_diaspora_id
end
# when old person can't be resolved we still import data but we don't create&perform AccountMigration instance
def only_import?
old_person.nil?
@ -73,6 +82,10 @@ class MigrationService
File.new(archive_path, "r")
end
def archive_file_exists?
File.exist?(archive_path)
end
def zip_file?
filetype = MIME::Types.type_for(archive_path).first.content_type
filetype.eql?("application/zip")
@ -122,4 +135,7 @@ class MigrationService
class MigrationAlreadyExists < RuntimeError
end
class SelfMigrationNotAllowed < RuntimeError
end
end

View file

@ -9,7 +9,15 @@ class ExportedPhotos < SecureUploader
"uploads/users"
end
def extension_allowlist
%w[zip]
end
def filename
"#{model.username}_photos_#{secure_token}.zip" if original_filename.present?
return if original_filename.blank?
filename_parts = original_filename.split(".")
extensions = filename_parts.join(".")
"#{model.username}_photos_#{secure_token}.#{extensions}" if original_filename.present?
end
end

View file

@ -10,10 +10,14 @@ class ExportedUser < SecureUploader
end
def extension_allowlist
%w[gz]
%w[gz zip json]
end
def filename
"#{model.username}_diaspora_data_#{secure_token}.json.gz" if original_filename.present?
return if original_filename.blank?
filename_parts = original_filename.split(".")
extensions = filename_parts.join(".")
"#{model.username}_data_#{secure_token}.#{extensions}"
end
end

View file

@ -2,7 +2,8 @@
class SecureUploader < CarrierWave::Uploader::Base
protected
def secure_token(bytes = 16)
def secure_token(bytes=16)
var = :"@#{mounted_as}_secure_token"
model.instance_variable_get(var) or model.instance_variable_set(var, SecureRandom.urlsafe_base64(bytes))
end

View file

@ -0,0 +1,36 @@
# frozen_string_literal: true
module Workers
class ImportUser < Base
sidekiq_options queue: :low
include Diaspora::Logging
def perform(user_id)
if currently_running_exports >= AppConfig.settings.export_concurrency.to_i
logger.info "Already the maximum number of parallel user imports running, " \
"scheduling import for User:#{user_id} in 5 minutes."
self.class.perform_in(5.minutes + rand(30), user_id)
else
import_user(user_id)
end
end
private
def import_user(user_id)
user = User.find(user_id)
ImportService.new.import_by_user(user)
end
def currently_running_exports
return 0 if AppConfig.environment.single_process_mode?
Sidekiq::Workers.new.count do |process_id, thread_id, work|
!(Process.pid.to_s == process_id.split(":")[1] && Thread.current.object_id.to_s(36) == thread_id) &&
work["payload"]["class"] == self.class.to_s
end
end
end
end

View file

@ -16,9 +16,9 @@ module Workers
return false if photo.processed? || unprocessed_image.path.try(:include?, ".gif")
photo.processed_image.store!(unprocessed_image)
photo.save!
rescue ActiveRecord::RecordNotFound # Deleted before the job was run
# Ignored
end
end
end

View file

@ -31,12 +31,14 @@ class ArchiveImporter
username: attr[:username],
password: attr[:password],
password_confirmation: attr[:password],
getting_started: false,
person: {
profile_attributes: profile_attributes
}
)
self.user = User.build(data)
self.user = User.find_or_build(data)
user.show_community_spotlight_in_stream = data.fetch(:show_community_spotlight_in_stream, true)
user.strip_exif = data.fetch(:strip_exif, true)
user.getting_started = false
user.save!
end

View file

@ -2,43 +2,33 @@
namespace :accounts do
desc "Perform migration"
task :migration, %i[archive_path new_user_name] => :environment do |_t, args|
puts "Account migration is requested"
args = %i[archive_path new_user_name].map {|name| [name, args[name]] }.to_h
task :migration, %i[archive_path photos_path new_user_name] => :environment do |_t, args|
puts "Account migration is requested. You can import a profile or a photos archive or booth."
args = %i[archive_path photos_path new_user_name].map {|name| [name, args[name]] }.to_h
process_arguments(args)
begin
service = MigrationService.new(args[:archive_path], args[:new_user_name])
service.validate
puts "Warnings:\n#{service.warnings.join("\n")}\n-----" if service.warnings.any?
if service.only_import?
puts "Warning: Archive owner is not fetchable. Proceeding with data import, but account migration record "\
"won't be created"
end
print "Do you really want to execute the archive import? Note: this is irreversible! [y/N]: "
next unless $stdin.gets.strip.casecmp?("y")
start_time = Time.now.getlocal
service.perform!
puts service.only_import? ? "Data import complete!" : "Data import and migration complete!"
puts "Migration took #{Time.now.getlocal - start_time} seconds"
rescue MigrationService::ArchiveValidationFailed => exception
puts "Errors in the archive found:\n#{exception.message}\n-----"
rescue MigrationService::MigrationAlreadyExists
puts "Migration record already exists for the user, can't continue"
start_time = Time.now.getlocal
if args[:new_user_name].present? && (args[:archive_path].present? || args[:photos_path].present?)
ImportService.new.import_by_files(args[:archive_path], args[:photos_path], args[:new_user_name])
puts "\n Migration completed in #{Time.now.getlocal - start_time} seconds. (Photos might still be processed in)"
else
puts "Must set a user name and a archive file path or photos file path"
end
end
def process_arguments(args)
if args[:archive_path].nil?
print "Enter the archive path: "
args[:archive_path] = $stdin.gets.strip
end
if args[:new_user_name].nil?
print "Enter the new user name: "
args[:new_user_name] = $stdin.gets.strip
end
args[:archive_path] = request_parameter(args[:archive_path], "Enter the archive (.json, .gz, .zip) path: ")
args[:photos_path] = request_parameter(args[:photos_path], "Enter the photos (.zip) path: ")
args[:new_user_name] = request_parameter(args[:new_user_name], "Enter the new user name: ")
puts "Archive path: #{args[:archive_path]}"
puts "Photos path: #{args[:photos_path]}"
puts "New username: #{args[:new_user_name]}"
end
end
def request_parameter(arg, text)
return arg unless arg.nil?
print text
$stdin.gets.strip
end

View file

@ -76,15 +76,6 @@ describe "Receive federation messages feature" do
}.to raise_error(ActiveRecord::RecordInvalid)
end
it "doesn't accept second migration for the same new user profile" do
run_migration
expect {
sender = create_remote_user("example.org")
entity = create_account_migration_entity(sender.diaspora_handle, new_user)
post_message(generate_payload(entity, sender))
}.to raise_error(ActiveRecord::RecordInvalid)
end
context "when our pod was left" do
let(:sender) { FactoryBot.create(:user) }

View file

@ -147,7 +147,6 @@ describe ArchiveImporter do
expect {
archive_importer.create_user(username: "new_name", password: "123456")
}.to change(User, :count).by(1)
expect(archive_importer.user.email).to eq("user@example.com")
expect(archive_importer.user.strip_exif).to eq(false)
expect(archive_importer.user.show_community_spotlight_in_stream).to eq(false)

View file

@ -1006,7 +1006,7 @@ describe User, type: :model do
expect(user.export).to be_present
expect(user.exported_at).to be_present
expect(user.exporting).to be_falsey
expect(user.export.filename).to match(/.json/)
expect(user.export.filename).to match(/\.json\.gz$/)
expect(ActiveSupport::Gzip.decompress(user.export.file.read)).to include user.username
end