diff --git a/app/models/bulk_imports/entity.rb b/app/models/bulk_imports/entity.rb index 437118c36e844282c071af08ec58614d131f56c1..a075c2f7e4ff72e34e362ac5f605dcd88afa0cce 100644 --- a/app/models/bulk_imports/entity.rb +++ b/app/models/bulk_imports/entity.rb @@ -124,6 +124,10 @@ def pluralized_name entity_type.pluralize end + def portable_class + entity_type.classify.constantize + end + def base_resource_url_path "/#{pluralized_name}/#{encoded_source_full_path}" end diff --git a/app/workers/bulk_imports/pipeline_batch_worker.rb b/app/workers/bulk_imports/pipeline_batch_worker.rb index 55936e85d48321a8e1810857e2deea700e3d8ddb..1485275e6162c476e186a041441d7d7d24a0cdd3 100644 --- a/app/workers/bulk_imports/pipeline_batch_worker.rb +++ b/app/workers/bulk_imports/pipeline_batch_worker.rb @@ -5,6 +5,8 @@ class PipelineBatchWorker include ApplicationWorker include ExclusiveLeaseGuard + DEFER_ON_HEALTH_DELAY = 5.minutes + data_consistency :always # rubocop:disable SidekiqLoadBalancing/WorkerDataConsistency feature_category :importers sidekiq_options dead: false, retry: 3 @@ -16,6 +18,26 @@ class PipelineBatchWorker new.perform_failure(msg['args'].first, exception) end + defer_on_database_health_signal(:gitlab_main, [], DEFER_ON_HEALTH_DELAY) do |job_args, schema, tables| + batch = ::BulkImports::BatchTracker.find(job_args.first) + pipeline_tracker = batch.tracker + pipeline_schema = ::BulkImports::PipelineSchemaInfo.new( + pipeline_tracker.pipeline_class, + pipeline_tracker.entity.portable_class + ) + + if pipeline_schema.db_schema && pipeline_schema.db_table + schema = pipeline_schema.db_schema + tables = [pipeline_schema.db_table] + end + + [schema, tables] + end + + def self.defer_on_database_health_signal? + Feature.enabled?(:bulk_import_deferred_workers) + end + def perform(batch_id) @batch = ::BulkImports::BatchTracker.find(batch_id) diff --git a/app/workers/bulk_imports/pipeline_worker.rb b/app/workers/bulk_imports/pipeline_worker.rb index 4b1df9c85a69ae119d4f8c9c69ae13d03eef7359..2c1d28b33c52152a7f367101cfa6422a0836801d 100644 --- a/app/workers/bulk_imports/pipeline_worker.rb +++ b/app/workers/bulk_imports/pipeline_worker.rb @@ -7,6 +7,8 @@ class PipelineWorker FILE_EXTRACTION_PIPELINE_PERFORM_DELAY = 10.seconds + DEFER_ON_HEALTH_DELAY = 5.minutes + data_consistency :always feature_category :importers sidekiq_options dead: false, retry: 3 @@ -21,6 +23,25 @@ class PipelineWorker new.perform_failure(msg['args'][0], msg['args'][2], exception) end + defer_on_database_health_signal(:gitlab_main, [], DEFER_ON_HEALTH_DELAY) do |job_args, schema, tables| + pipeline_tracker = ::BulkImports::Tracker.find(job_args.first) + pipeline_schema = ::BulkImports::PipelineSchemaInfo.new( + pipeline_tracker.pipeline_class, + pipeline_tracker.entity.portable_class + ) + + if pipeline_schema.db_schema && pipeline_schema.db_table + schema = pipeline_schema.db_schema + tables = [pipeline_schema.db_table] + end + + [schema, tables] + end + + def self.defer_on_database_health_signal? + Feature.enabled?(:bulk_import_deferred_workers) + end + # Keep _stage parameter for backwards compatibility. def perform(pipeline_tracker_id, _stage, entity_id) @entity = ::BulkImports::Entity.find(entity_id) diff --git a/app/workers/concerns/worker_attributes.rb b/app/workers/concerns/worker_attributes.rb index cb09aaf1a6a6cc66319ed9fea99d06fd5b445361..28c82a5a38ea6f99c1ee90bf935a02f37f4c5154 100644 --- a/app/workers/concerns/worker_attributes.rb +++ b/app/workers/concerns/worker_attributes.rb @@ -201,10 +201,10 @@ def big_payload? !!get_class_attribute(:big_payload) end - def defer_on_database_health_signal(gitlab_schema, tables = [], delay_by = DEFAULT_DEFER_DELAY) + def defer_on_database_health_signal(gitlab_schema, tables = [], delay_by = DEFAULT_DEFER_DELAY, &block) set_class_attribute( :database_health_check_attrs, - { gitlab_schema: gitlab_schema, tables: tables, delay_by: delay_by } + { gitlab_schema: gitlab_schema, tables: tables, delay_by: delay_by, block: block } ) end diff --git a/config/feature_flags/development/bulk_import_deferred_workers.yml b/config/feature_flags/development/bulk_import_deferred_workers.yml new file mode 100644 index 0000000000000000000000000000000000000000..1b6a022099ccd63d9723d400fe29974db7bacf07 --- /dev/null +++ b/config/feature_flags/development/bulk_import_deferred_workers.yml @@ -0,0 +1,8 @@ +--- +name: bulk_import_deferred_workers +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/136137 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/431032 +milestone: '16.6' +type: development +group: group::import and integrate +default_enabled: false diff --git a/lib/bulk_imports/pipeline_schema_info.rb b/lib/bulk_imports/pipeline_schema_info.rb new file mode 100644 index 0000000000000000000000000000000000000000..df35a3569d643df2aa0f3e869dcb9e44f4000c77 --- /dev/null +++ b/lib/bulk_imports/pipeline_schema_info.rb @@ -0,0 +1,36 @@ +# frozen_string_literal: true + +module BulkImports + class PipelineSchemaInfo + def initialize(pipeline_class, portable_class) + @pipeline_class = pipeline_class + @portable_class = portable_class + end + + def db_schema + return unless relation + return unless association + + Gitlab::Database::GitlabSchema.tables_to_schema[association.table_name] + end + + def db_table + return unless relation + return unless association + + association.table_name + end + + private + + attr_reader :pipeline_class, :portable_class + + def relation + @relation ||= pipeline_class.try(:relation) + end + + def association + @association ||= portable_class.reflect_on_association(relation) + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/skip_jobs.rb b/lib/gitlab/sidekiq_middleware/skip_jobs.rb index 34ad843e8ee59772e60b67dfa70c4c0a60c71b08..56b150116a33619bde5ba9cdcd0ae95096970968 100644 --- a/lib/gitlab/sidekiq_middleware/skip_jobs.rb +++ b/lib/gitlab/sidekiq_middleware/skip_jobs.rb @@ -80,13 +80,20 @@ def defer_job_by_database_health_signal?(job, worker_class) end health_check_attrs = worker_class.database_health_check_attrs - job_base_model = Gitlab::Database.schemas_to_base_models[health_check_attrs[:gitlab_schema]].first + + tables, schema = health_check_attrs.values_at(:tables, :gitlab_schema) + + if health_check_attrs[:block].respond_to?(:call) + schema, tables = health_check_attrs[:block].call(job['args'], schema, tables) + end + + job_base_model = Gitlab::Database.schemas_to_base_models[schema].first health_context = Gitlab::Database::HealthStatus::Context.new( DatabaseHealthStatusChecker.new(job['jid'], worker_class.name), job_base_model.connection, - health_check_attrs[:tables], - health_check_attrs[:gitlab_schema] + tables, + schema ) Gitlab::Database::HealthStatus.evaluate(health_context).any?(&:stop?) diff --git a/spec/lib/bulk_imports/pipeline_schema_info_spec.rb b/spec/lib/bulk_imports/pipeline_schema_info_spec.rb new file mode 100644 index 0000000000000000000000000000000000000000..45dd92ca26da99f6e6e58286c8b274e923321c0a --- /dev/null +++ b/spec/lib/bulk_imports/pipeline_schema_info_spec.rb @@ -0,0 +1,60 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe BulkImports::PipelineSchemaInfo, feature_category: :importers do + let(:entity) { build(:bulk_import_entity, :project_entity) } + let(:tracker) { build(:bulk_import_tracker, entity: entity, pipeline_name: pipeline_name) } + + let(:pipeline_name) { BulkImports::Common::Pipelines::LabelsPipeline.to_s } + + subject { described_class.new(tracker.pipeline_class, tracker.entity.portable_class) } + + describe '#db_schema' do + context 'when pipeline defines a relation name which is an association' do + it 'returns the schema name of the table used by the association' do + expect(subject.db_schema).to eq(:gitlab_main_cell) + end + end + + context 'when pipeline does not define a relation name' do + let(:pipeline_name) { BulkImports::Common::Pipelines::EntityFinisher.to_s } + + it 'returns nil' do + expect(subject.db_schema).to eq(nil) + end + end + + context 'when pipeline relation name is not an association' do + let(:pipeline_name) { BulkImports::Projects::Pipelines::CommitNotesPipeline.to_s } + + it 'returns nil' do + expect(subject.db_schema).to eq(nil) + end + end + end + + describe '#db_table' do + context 'when pipeline defines a relation name which is an association' do + it 'returns the name of the table used by the association' do + expect(subject.db_table).to eq('labels') + end + end + + context 'when pipeline does not define a relation name' do + let(:pipeline_name) { BulkImports::Common::Pipelines::EntityFinisher.to_s } + + it 'returns nil' do + expect(subject.db_table).to eq(nil) + end + end + + context 'when pipeline relation name is not an association' do + let(:pipeline_name) { BulkImports::Projects::Pipelines::CommitNotesPipeline.to_s } + + it 'returns nil' do + expect(subject.db_table).to eq(nil) + end + end + end +end diff --git a/spec/lib/gitlab/sidekiq_middleware/skip_jobs_spec.rb b/spec/lib/gitlab/sidekiq_middleware/skip_jobs_spec.rb index 2fa0e44d44f15fe4dfcb1d05c0b94c47ff6eca3d..6df77c350e239ec77cc9df76b48aabfa3cd13da3 100644 --- a/spec/lib/gitlab/sidekiq_middleware/skip_jobs_spec.rb +++ b/spec/lib/gitlab/sidekiq_middleware/skip_jobs_spec.rb @@ -185,6 +185,21 @@ def self.name TestWorker.perform_async(*job['args']) end end + + context 'when a block is provided' do + before do + TestWorker.defer_on_database_health_signal(*health_signal_attrs.values) do + [:gitlab_ci, [:ci_pipelines]] + end + end + + it 'uses the lazy evaluated schema and tables returned by the block' do + expect(Gitlab::Database::HealthStatus::Context).to receive(:new) + .with(anything, anything, [:ci_pipelines], :gitlab_ci).and_call_original + + expect { |b| subject.call(TestWorker.new, job, queue, &b) }.to yield_control + end + end end end end diff --git a/spec/models/bulk_imports/entity_spec.rb b/spec/models/bulk_imports/entity_spec.rb index 3e98ba0973e8b003a15d452a7411fa9a6bb0a0b3..b822786579b671b7bee8709aead68bb9cdf6af0d 100644 --- a/spec/models/bulk_imports/entity_spec.rb +++ b/spec/models/bulk_imports/entity_spec.rb @@ -248,6 +248,24 @@ end end + describe '#portable_class' do + context 'when entity is group' do + it 'returns Group class' do + entity = build(:bulk_import_entity, :group_entity) + + expect(entity.portable_class).to eq(Group) + end + end + + context 'when entity is project' do + it 'returns Project class' do + entity = build(:bulk_import_entity, :project_entity) + + expect(entity.portable_class).to eq(Project) + end + end + end + describe '#export_relations_url_path' do context 'when entity is group' do it 'returns group export relations url' do diff --git a/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb b/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb index 9ac297ae75782ae376c8428710d7e45a4f4d6549..c459c17b1bcaead8ab2ecdd4f59cadc508d58885 100644 --- a/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb +++ b/spec/workers/bulk_imports/pipeline_batch_worker_spec.rb @@ -13,6 +13,10 @@ def initialize(context) @context = context end + def self.relation + 'labels' + end + def run @context.tracker.finish! end @@ -202,4 +206,55 @@ def self.file_extraction_pipeline? expect(batch.reload).to be_failed end end + + context 'with stop signal from database health check' do + around do |example| + with_sidekiq_server_middleware do |chain| + chain.add Gitlab::SidekiqMiddleware::SkipJobs + Sidekiq::Testing.inline! { example.run } + end + end + + before do + stub_feature_flags("drop_sidekiq_jobs_#{described_class.name}": false) + + stop_signal = instance_double("Gitlab::Database::HealthStatus::Signals::Stop", stop?: true) + allow(Gitlab::Database::HealthStatus).to receive(:evaluate).and_return([stop_signal]) + end + + it 'defers the job by set time' do + expect_next_instance_of(described_class) do |worker| + expect(worker).not_to receive(:perform).with(batch.id) + end + + expect(described_class).to receive(:perform_in).with(described_class::DEFER_ON_HEALTH_DELAY, batch.id) + + described_class.perform_async(batch.id) + end + + it 'lazy evaluates schema and tables', :aggregate_failures do + block = described_class.database_health_check_attrs[:block] + + job_args = [batch.id] + + schema, table = block.call([job_args]) + + expect(schema).to eq(:gitlab_main_cell) + expect(table).to eq(['labels']) + end + + context 'when `bulk_import_deferred_workers` feature flag is disabled' do + it 'does not defer job execution' do + stub_feature_flags(bulk_import_deferred_workers: false) + + expect_next_instance_of(described_class) do |worker| + expect(worker).to receive(:perform).with(batch.id) + end + + expect(described_class).not_to receive(:perform_in) + + described_class.perform_async(batch.id) + end + end + end end diff --git a/spec/workers/bulk_imports/pipeline_worker_spec.rb b/spec/workers/bulk_imports/pipeline_worker_spec.rb index 6ea7334f6a67b07ae3f4f1049a5412b8e1f75c02..d99b3e9de732b9d7f528dcb014304f1fde3b584c 100644 --- a/spec/workers/bulk_imports/pipeline_worker_spec.rb +++ b/spec/workers/bulk_imports/pipeline_worker_spec.rb @@ -9,6 +9,10 @@ def initialize(_); end def run; end + def self.relation + 'labels' + end + def self.file_extraction_pipeline? false end @@ -155,6 +159,62 @@ def self.file_extraction_pipeline? end end + context 'with stop signal from database health check' do + around do |example| + with_sidekiq_server_middleware do |chain| + chain.add Gitlab::SidekiqMiddleware::SkipJobs + Sidekiq::Testing.inline! { example.run } + end + end + + before do + stub_feature_flags("drop_sidekiq_jobs_#{described_class.name}": false) + + stop_signal = instance_double("Gitlab::Database::HealthStatus::Signals::Stop", stop?: true) + allow(Gitlab::Database::HealthStatus).to receive(:evaluate).and_return([stop_signal]) + end + + it 'defers the job by set time' do + expect_next_instance_of(described_class) do |worker| + expect(worker).not_to receive(:perform).with(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + end + + expect(described_class).to receive(:perform_in).with( + described_class::DEFER_ON_HEALTH_DELAY, + pipeline_tracker.id, + pipeline_tracker.stage, + entity.id + ) + + described_class.perform_async(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + end + + it 'lazy evaluates schema and tables', :aggregate_failures do + block = described_class.database_health_check_attrs[:block] + + job_args = [pipeline_tracker.id, pipeline_tracker.stage, entity.id] + + schema, table = block.call([job_args]) + + expect(schema).to eq(:gitlab_main_cell) + expect(table).to eq(['labels']) + end + + context 'when `bulk_import_deferred_workers` feature flag is disabled' do + it 'does not defer job execution' do + stub_feature_flags(bulk_import_deferred_workers: false) + + expect_next_instance_of(described_class) do |worker| + expect(worker).to receive(:perform).with(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + end + + expect(described_class).not_to receive(:perform_in) + + described_class.perform_async(pipeline_tracker.id, pipeline_tracker.stage, entity.id) + end + end + end + context 'when pipeline is finished' do let(:pipeline_tracker) do create( diff --git a/spec/workers/ci/pipeline_success_unlock_artifacts_worker_spec.rb b/spec/workers/ci/pipeline_success_unlock_artifacts_worker_spec.rb index 60a34fdab53554d9504a36910f6e2fdb6a09091a..d5f3c2b92fb095c96ed1be3d603d4f796a7f37e3 100644 --- a/spec/workers/ci/pipeline_success_unlock_artifacts_worker_spec.rb +++ b/spec/workers/ci/pipeline_success_unlock_artifacts_worker_spec.rb @@ -75,7 +75,8 @@ expect(described_class.database_health_check_attrs).to eq( gitlab_schema: :gitlab_ci, delay_by: described_class::DEFAULT_DEFER_DELAY, - tables: [:ci_job_artifacts] + tables: [:ci_job_artifacts], + block: nil ) end end diff --git a/spec/workers/concerns/worker_attributes_spec.rb b/spec/workers/concerns/worker_attributes_spec.rb index 90c07a9c9590fce1b50652b5a0718f7b2d84eb92..767a55162fbf3a1757090b1ab4b907e1a3d0a565 100644 --- a/spec/workers/concerns/worker_attributes_spec.rb +++ b/spec/workers/concerns/worker_attributes_spec.rb @@ -37,7 +37,7 @@ def self.name :worker_has_external_dependencies? | :worker_has_external_dependencies! | false | [] | true :idempotent? | :idempotent! | false | [] | true :big_payload? | :big_payload! | false | [] | true - :database_health_check_attrs | :defer_on_database_health_signal | nil | [:gitlab_main, [:users], 1.minute] | { gitlab_schema: :gitlab_main, tables: [:users], delay_by: 1.minute } + :database_health_check_attrs | :defer_on_database_health_signal | nil | [:gitlab_main, [:users], 1.minute] | { gitlab_schema: :gitlab_main, tables: [:users], delay_by: 1.minute, block: nil } end # rubocop: enable Layout/LineLength