From 6c4c6f88890882378351d10320a467637bb72ca0 Mon Sep 17 00:00:00 2001 From: Thorsten Claus Date: Sun, 19 Sep 2021 14:48:38 +0200 Subject: [PATCH] Migration Backend Part --- .gitignore | 1 + app/controllers/users_controller.rb | 17 ++++ app/models/account_migration.rb | 3 +- app/models/photo.rb | 52 +++++----- app/models/user.rb | 27 ++++-- app/services/import_service.rb | 96 +++++++++++++++++++ app/services/migration_service.rb | 16 ++++ app/uploaders/exported_photos.rb | 10 +- app/uploaders/exported_user.rb | 8 +- app/uploaders/secure_uploader.rb | 3 +- app/workers/import_user.rb | 36 +++++++ app/workers/process_photo.rb | 2 +- lib/archive_importer.rb | 6 +- lib/tasks/accounts.rake | 52 ++++------ .../receive_federation_messages_spec.rb | 9 -- spec/lib/archive_importer_spec.rb | 1 - spec/models/user_spec.rb | 2 +- 17 files changed, 257 insertions(+), 84 deletions(-) create mode 100644 app/services/import_service.rb create mode 100644 app/workers/import_user.rb diff --git a/.gitignore b/.gitignore index a91f73413..69a800c39 100644 --- a/.gitignore +++ b/.gitignore @@ -77,3 +77,4 @@ diaspora.iml # WebTranslateIt .wti +/__MACOSX/ diff --git a/app/controllers/users_controller.rb b/app/controllers/users_controller.rb index 726f46f2e..45bec9817 100644 --- a/app/controllers/users_controller.rb +++ b/app/controllers/users_controller.rb @@ -154,6 +154,8 @@ class UsersController < ApplicationController :post_default_public, :otp_required_for_login, :otp_secret, + :exported_photos_file, + :export, email_preferences: UserPreference::VALID_EMAIL_TYPES.map(&:to_sym) ) end @@ -172,6 +174,8 @@ class UsersController < ApplicationController change_post_default(user_data) elsif user_data[:color_theme] change_settings(user_data, "users.update.color_theme_changed", "users.update.color_theme_not_changed") + elsif user_data[:export] || user_data[:exported_photos_file] + upload_export_files(user_data) else change_settings(user_data) end @@ -235,6 +239,19 @@ class UsersController < ApplicationController end end + def upload_export_files(user_data) + logger.info "Start importing account" + @user.export = user_data[:export] if user_data[:export] + @user.exported_photos_file = user_data[:exported_photos_file] if user_data[:exported_photos_file] + if @user.save + flash.now[:notice] = "Your account migration has been scheduled" + else + flash.now[:error] = "Your account migration could not be scheduled for the following reason:"\ + " #{@user.errors.full_messages}" + end + Workers::ImportUser.perform_async(@user.id) + end + def change_settings(user_data, successful="users.update.settings_updated", error="users.update.settings_not_updated") if @user.update_attributes(user_data) flash.now[:notice] = t(successful) diff --git a/app/models/account_migration.rb b/app/models/account_migration.rb index 478179725..b971b881b 100644 --- a/app/models/account_migration.rb +++ b/app/models/account_migration.rb @@ -7,7 +7,7 @@ class AccountMigration < ApplicationRecord belongs_to :new_person, class_name: "Person" validates :old_person, uniqueness: true - validates :new_person, uniqueness: true + validates :new_person, presence: true after_create :lock_old_user! @@ -28,7 +28,6 @@ class AccountMigration < ApplicationRecord @sender ||= old_user || ephemeral_sender end - # executes a migration plan according to this AccountMigration object def perform! raise "already performed" if performed? diff --git a/app/models/photo.rb b/app/models/photo.rb index 9678db4ac..0a77df62a 100644 --- a/app/models/photo.rb +++ b/app/models/photo.rb @@ -22,7 +22,7 @@ class Photo < ApplicationRecord large: photo.url(:scaled_full), raw: photo.url } - }, :as => :sizes + }, as: :sizes t.add lambda { |photo| { height: photo.height, @@ -48,25 +48,25 @@ class Photo < ApplicationRecord before_destroy :ensure_user_picture after_destroy :clear_empty_status_message - after_commit :on => :create do - queue_processing_job if self.author.local? + after_commit on: :create do + queue_processing_job if author.local? end scope :on_statuses, ->(post_guids) { - where(:status_message_guid => post_guids) + where(status_message_guid: post_guids) } def clear_empty_status_message - if self.status_message && self.status_message.text_and_photos_blank? - self.status_message.destroy + if status_message&.text_and_photos_blank? + status_message.destroy else true end end def ownership_of_status_message - message = StatusMessage.find_by_guid(self.status_message_guid) + message = StatusMessage.find_by(guid: status_message_guid) return unless status_message_guid && message && diaspora_handle != message.diaspora_handle errors.add(:base, "Photo must have the same owner as status message") @@ -96,26 +96,22 @@ class Photo < ApplicationRecord end def update_remote_path - unless self.unprocessed_image.url.match(/^https?:\/\//) - remote_path = "#{AppConfig.pod_uri.to_s.chomp("/")}#{self.unprocessed_image.url}" - else - remote_path = self.unprocessed_image.url - end + remote_path = if unprocessed_image.url.match(%r{^https?://}) + unprocessed_image.url + else + "#{AppConfig.pod_uri.to_s.chomp('/')}#{unprocessed_image.url}" + end - name_start = remote_path.rindex '/' + name_start = remote_path.rindex "/" self.remote_photo_path = "#{remote_path.slice(0, name_start)}/" self.remote_photo_name = remote_path.slice(name_start + 1, remote_path.length) end - def url(name = nil) - if remote_photo_path - name = name.to_s + '_' if name + def url(name=nil) + if remote_photo_path.present? && remote_photo_name.present? + name = "#{name}_" if name image_url = remote_photo_path + name.to_s + remote_photo_name - if AppConfig.privacy.camo.proxy_remote_pod_images? - Diaspora::Camo.image_url(image_url) - else - image_url - end + camo_image_url(image_url) elsif processed? processed_image.url(name) else @@ -124,7 +120,7 @@ class Photo < ApplicationRecord end def ensure_user_picture - profiles = Profile.where(:image_url => url(:thumb_large)) + profiles = Profile.where(image_url: url(:thumb_large)) profiles.each { |profile| profile.image_url = nil profile.save @@ -132,7 +128,7 @@ class Photo < ApplicationRecord end def queue_processing_job - Workers::ProcessPhoto.perform_async(self.id) + Workers::ProcessPhoto.perform_async(id) end def self.visible(current_user, person, limit=:all, max_time=nil) @@ -143,4 +139,14 @@ class Photo < ApplicationRecord end photos.where(pending: false).order("created_at DESC") end + + private + + def camo_image_url(image_url) + if AppConfig.privacy.camo.proxy_remote_pod_images? + Diaspora::Camo.image_url(image_url) + else + image_url + end + end end diff --git a/app/models/user.rb b/app/models/user.rb index 788fe6aba..28813c604 100644 --- a/app/models/user.rb +++ b/app/models/user.rb @@ -325,9 +325,9 @@ class User < ApplicationRecord else update exporting: false end - rescue => error - logger.error "Unexpected error while exporting user '#{username}': #{error.class}: #{error.message}\n" \ - "#{error.backtrace.first(15).join("\n")}" + rescue StandardError => e + logger.error "Unexpected error while exporting data for '#{username}': #{e.class}: #{e.message}\n" \ + "#{e.backtrace.first(15).join("\n")}" update exporting: false end @@ -335,7 +335,7 @@ class User < ApplicationRecord ActiveSupport::Gzip.compress Diaspora::Exporter.new(self).execute end - ######### Photos export ################## + ######### Photo export ################## mount_uploader :exported_photos_file, ExportedPhotos def queue_export_photos @@ -345,9 +345,9 @@ class User < ApplicationRecord def perform_export_photos! PhotoExporter.new(self).perform - rescue => error - logger.error "Unexpected error while exporting photos for '#{username}': #{error.class}: #{error.message}\n" \ - "#{error.backtrace.first(15).join("\n")}" + rescue StandardError => e + logger.error "Unexpected error while exporting photos for '#{username}': #{e.class}: #{e.message}\n" \ + "#{e.backtrace.first(15).join("\n")}" update exporting_photos: false end @@ -403,13 +403,19 @@ class User < ApplicationRecord tag_followings.any? || profile[:image_url] end - ###Helpers############ - def self.build(opts = {}) + ### Helpers ############ + def self.build(opts={}) u = User.new(opts.except(:person, :id)) u.setup(opts) u end + def self.find_or_build(opts={}) + user = User.find_by(username: opts[:username]) + user ||= User.build(opts) + user + end + def setup(opts) self.username = opts[:username] self.email = opts[:email] @@ -417,10 +423,11 @@ class User < ApplicationRecord self.language ||= I18n.locale.to_s self.color_theme = opts[:color_theme] self.color_theme ||= AppConfig.settings.default_color_theme - self.valid? + valid? errors = self.errors errors.delete :person return if errors.size > 0 + self.set_person(Person.new((opts[:person] || {}).except(:id))) self.generate_keys self diff --git a/app/services/import_service.rb b/app/services/import_service.rb new file mode 100644 index 000000000..13ca0654a --- /dev/null +++ b/app/services/import_service.rb @@ -0,0 +1,96 @@ +# frozen_string_literal: true + +class ImportService + include Diaspora::Logging + + def import_by_user(user) + import_by_files(user.export.current_path, user.exported_photos_file.current_path, user.username) + end + + def import_by_files(path_to_profile, path_to_photos, username) + if path_to_profile.present? + logger.info "Import for profile #{username} at path #{path_to_profile} requested" + import_user_profile(path_to_profile, username) + end + + user = User.find_by(username: username) + raise ArgumentError, "Username #{username} should exist before uploading photos." if user.nil? + + if path_to_photos.present? + logger.info("Importing photos from import file for '#{username}' from #{path_to_photos}") + import_user_photos(user, path_to_photos) + end + remove_file_references(user) + end + + private + + def import_user_profile(path_to_profile, username) + raise ArgumentError, "Profile file not found at path: #{path_to_profile}" unless File.exist?(path_to_profile) + + service = MigrationService.new(path_to_profile, username) + logger.info "Start validating user profile #{username}" + service.validate + logger.info "Start importing user profile for '#{username}'" + service.perform! + logger.info "Successfully imported profile: #{username}" + rescue MigrationService::ArchiveValidationFailed => e + logger.error "Errors in the archive found: #{e.message}" + rescue MigrationService::MigrationAlreadyExists + logger.error "Migration record already exists for the user, can't continue" + rescue MigrationService::SelfMigrationNotAllowed + logger.error "You can't migrate onto your own account" + end + + def import_user_photos(user, path_to_photos) + raise ArgumentError, "Photos file not found at path: #{path_to_photos}" unless File.exist?(path_to_photos) + + uncompressed_photos_folder = unzip_photos_file(path_to_photos) + user.posts.find_in_batches do |posts| + import_photos_for_posts(posts, uncompressed_photos_folder) + end + FileUtils.rmdir(uncompressed_photos_folder) + end + + def import_photos_for_posts(posts, source_dir) + posts.each do |post| + post.photos.each do |photo| + uploaded_file = "#{source_dir}/#{photo.remote_photo_name}" + next unless File.exist?(uploaded_file) && photo.remote_photo_name.present? + + File.open(uploaded_file) do |file| + photo.random_string = File.basename(uploaded_file, ".*") + photo.unprocessed_image = file + photo.save(touch: false) + end + photo.queue_processing_job + end + end + end + + def unzip_photos_file(photo_file_path) + folder = create_folder(photo_file_path) + Zip::File.open(photo_file_path) do |zip_file| + zip_file.each do |file| + target_name = "#{folder}#{Pathname::SEPARATOR_LIST}#{file}" + zip_file.extract(file, target_name) unless File.exist?(target_name) + rescue Errno::ENOENT => e + logger.error e.to_s + end + end + folder + end + + def create_folder(compressed_file_name) + extension = File.extname(compressed_file_name) + folder = compressed_file_name.delete_suffix(extension) + FileUtils.mkdir(folder) unless File.exist?(folder) + folder + end + + def remove_file_references(user) + user.remove_exported_photos_file + user.remove_export + user.save + end +end diff --git a/app/services/migration_service.rb b/app/services/migration_service.rb index 68f891b1f..56c13a45b 100644 --- a/app/services/migration_service.rb +++ b/app/services/migration_service.rb @@ -10,9 +10,12 @@ class MigrationService end def validate + return unless archive_file_exists? + archive_validator.validate raise ArchiveValidationFailed, errors.join("\n") if errors.any? raise MigrationAlreadyExists if AccountMigration.where(old_person: old_person).any? + raise SelfMigrationNotAllowed if self_import? end def perform! @@ -23,6 +26,12 @@ class MigrationService remove_intermediate_file end + def self_import? + source_diaspora_id = archive_validator.archive_author_diaspora_id + target_diaspora_id = "#{new_user_name}#{User.diaspora_id_host}" + source_diaspora_id == target_diaspora_id + end + # when old person can't be resolved we still import data but we don't create&perform AccountMigration instance def only_import? old_person.nil? @@ -73,6 +82,10 @@ class MigrationService File.new(archive_path, "r") end + def archive_file_exists? + File.exist?(archive_path) + end + def zip_file? filetype = MIME::Types.type_for(archive_path).first.content_type filetype.eql?("application/zip") @@ -122,4 +135,7 @@ class MigrationService class MigrationAlreadyExists < RuntimeError end + + class SelfMigrationNotAllowed < RuntimeError + end end diff --git a/app/uploaders/exported_photos.rb b/app/uploaders/exported_photos.rb index b19c4edb1..724e65ef4 100644 --- a/app/uploaders/exported_photos.rb +++ b/app/uploaders/exported_photos.rb @@ -9,7 +9,15 @@ class ExportedPhotos < SecureUploader "uploads/users" end + def extension_allowlist + %w[zip] + end + def filename - "#{model.username}_photos_#{secure_token}.zip" if original_filename.present? + return if original_filename.blank? + + filename_parts = original_filename.split(".") + extensions = filename_parts.join(".") + "#{model.username}_photos_#{secure_token}.#{extensions}" if original_filename.present? end end diff --git a/app/uploaders/exported_user.rb b/app/uploaders/exported_user.rb index 3fc0172e6..2f707eb8c 100644 --- a/app/uploaders/exported_user.rb +++ b/app/uploaders/exported_user.rb @@ -10,10 +10,14 @@ class ExportedUser < SecureUploader end def extension_allowlist - %w[gz] + %w[gz zip json] end def filename - "#{model.username}_diaspora_data_#{secure_token}.json.gz" if original_filename.present? + return if original_filename.blank? + + filename_parts = original_filename.split(".") + extensions = filename_parts.join(".") + "#{model.username}_data_#{secure_token}.#{extensions}" end end diff --git a/app/uploaders/secure_uploader.rb b/app/uploaders/secure_uploader.rb index d5e62e77b..963c047c1 100644 --- a/app/uploaders/secure_uploader.rb +++ b/app/uploaders/secure_uploader.rb @@ -2,7 +2,8 @@ class SecureUploader < CarrierWave::Uploader::Base protected - def secure_token(bytes = 16) + + def secure_token(bytes=16) var = :"@#{mounted_as}_secure_token" model.instance_variable_get(var) or model.instance_variable_set(var, SecureRandom.urlsafe_base64(bytes)) end diff --git a/app/workers/import_user.rb b/app/workers/import_user.rb new file mode 100644 index 000000000..8fb1801b3 --- /dev/null +++ b/app/workers/import_user.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +module Workers + class ImportUser < Base + sidekiq_options queue: :low + + include Diaspora::Logging + + def perform(user_id) + if currently_running_exports >= AppConfig.settings.export_concurrency.to_i + + logger.info "Already the maximum number of parallel user imports running, " \ + "scheduling import for User:#{user_id} in 5 minutes." + self.class.perform_in(5.minutes + rand(30), user_id) + else + import_user(user_id) + end + end + + private + + def import_user(user_id) + user = User.find(user_id) + ImportService.new.import_by_user(user) + end + + def currently_running_exports + return 0 if AppConfig.environment.single_process_mode? + + Sidekiq::Workers.new.count do |process_id, thread_id, work| + !(Process.pid.to_s == process_id.split(":")[1] && Thread.current.object_id.to_s(36) == thread_id) && + work["payload"]["class"] == self.class.to_s + end + end + end +end diff --git a/app/workers/process_photo.rb b/app/workers/process_photo.rb index 41ef597d2..9b12bd8f4 100644 --- a/app/workers/process_photo.rb +++ b/app/workers/process_photo.rb @@ -16,9 +16,9 @@ module Workers return false if photo.processed? || unprocessed_image.path.try(:include?, ".gif") photo.processed_image.store!(unprocessed_image) - photo.save! rescue ActiveRecord::RecordNotFound # Deleted before the job was run + # Ignored end end end diff --git a/lib/archive_importer.rb b/lib/archive_importer.rb index 4b307aa88..1b0bab6cb 100644 --- a/lib/archive_importer.rb +++ b/lib/archive_importer.rb @@ -31,12 +31,14 @@ class ArchiveImporter username: attr[:username], password: attr[:password], password_confirmation: attr[:password], - getting_started: false, person: { profile_attributes: profile_attributes } ) - self.user = User.build(data) + self.user = User.find_or_build(data) + user.show_community_spotlight_in_stream = data.fetch(:show_community_spotlight_in_stream, true) + user.strip_exif = data.fetch(:strip_exif, true) + user.getting_started = false user.save! end diff --git a/lib/tasks/accounts.rake b/lib/tasks/accounts.rake index c33c528fd..24077ec3e 100644 --- a/lib/tasks/accounts.rake +++ b/lib/tasks/accounts.rake @@ -2,43 +2,33 @@ namespace :accounts do desc "Perform migration" - task :migration, %i[archive_path new_user_name] => :environment do |_t, args| - puts "Account migration is requested" - args = %i[archive_path new_user_name].map {|name| [name, args[name]] }.to_h + task :migration, %i[archive_path photos_path new_user_name] => :environment do |_t, args| + puts "Account migration is requested. You can import a profile or a photos archive or booth." + args = %i[archive_path photos_path new_user_name].map {|name| [name, args[name]] }.to_h process_arguments(args) - - begin - service = MigrationService.new(args[:archive_path], args[:new_user_name]) - service.validate - puts "Warnings:\n#{service.warnings.join("\n")}\n-----" if service.warnings.any? - if service.only_import? - puts "Warning: Archive owner is not fetchable. Proceeding with data import, but account migration record "\ - "won't be created" - end - print "Do you really want to execute the archive import? Note: this is irreversible! [y/N]: " - next unless $stdin.gets.strip.casecmp?("y") - - start_time = Time.now.getlocal - service.perform! - puts service.only_import? ? "Data import complete!" : "Data import and migration complete!" - puts "Migration took #{Time.now.getlocal - start_time} seconds" - rescue MigrationService::ArchiveValidationFailed => exception - puts "Errors in the archive found:\n#{exception.message}\n-----" - rescue MigrationService::MigrationAlreadyExists - puts "Migration record already exists for the user, can't continue" + start_time = Time.now.getlocal + if args[:new_user_name].present? && (args[:archive_path].present? || args[:photos_path].present?) + ImportService.new.import_by_files(args[:archive_path], args[:photos_path], args[:new_user_name]) + puts "\n Migration completed in #{Time.now.getlocal - start_time} seconds. (Photos might still be processed in)" + else + puts "Must set a user name and a archive file path or photos file path" end end def process_arguments(args) - if args[:archive_path].nil? - print "Enter the archive path: " - args[:archive_path] = $stdin.gets.strip - end - if args[:new_user_name].nil? - print "Enter the new user name: " - args[:new_user_name] = $stdin.gets.strip - end + args[:archive_path] = request_parameter(args[:archive_path], "Enter the archive (.json, .gz, .zip) path: ") + args[:photos_path] = request_parameter(args[:photos_path], "Enter the photos (.zip) path: ") + args[:new_user_name] = request_parameter(args[:new_user_name], "Enter the new user name: ") + puts "Archive path: #{args[:archive_path]}" + puts "Photos path: #{args[:photos_path]}" puts "New username: #{args[:new_user_name]}" end end + +def request_parameter(arg, text) + return arg unless arg.nil? + + print text + $stdin.gets.strip +end diff --git a/spec/integration/federation/receive_federation_messages_spec.rb b/spec/integration/federation/receive_federation_messages_spec.rb index 382e676ec..db919b397 100644 --- a/spec/integration/federation/receive_federation_messages_spec.rb +++ b/spec/integration/federation/receive_federation_messages_spec.rb @@ -76,15 +76,6 @@ describe "Receive federation messages feature" do }.to raise_error(ActiveRecord::RecordInvalid) end - it "doesn't accept second migration for the same new user profile" do - run_migration - expect { - sender = create_remote_user("example.org") - entity = create_account_migration_entity(sender.diaspora_handle, new_user) - post_message(generate_payload(entity, sender)) - }.to raise_error(ActiveRecord::RecordInvalid) - end - context "when our pod was left" do let(:sender) { FactoryBot.create(:user) } diff --git a/spec/lib/archive_importer_spec.rb b/spec/lib/archive_importer_spec.rb index 99858f147..6ee361067 100644 --- a/spec/lib/archive_importer_spec.rb +++ b/spec/lib/archive_importer_spec.rb @@ -147,7 +147,6 @@ describe ArchiveImporter do expect { archive_importer.create_user(username: "new_name", password: "123456") }.to change(User, :count).by(1) - expect(archive_importer.user.email).to eq("user@example.com") expect(archive_importer.user.strip_exif).to eq(false) expect(archive_importer.user.show_community_spotlight_in_stream).to eq(false) diff --git a/spec/models/user_spec.rb b/spec/models/user_spec.rb index 6784f4a92..268538209 100644 --- a/spec/models/user_spec.rb +++ b/spec/models/user_spec.rb @@ -1006,7 +1006,7 @@ describe User, type: :model do expect(user.export).to be_present expect(user.exported_at).to be_present expect(user.exporting).to be_falsey - expect(user.export.filename).to match(/.json/) + expect(user.export.filename).to match(/\.json\.gz$/) expect(ActiveSupport::Gzip.decompress(user.export.file.read)).to include user.username end