diff --git a/.rubocop_todo/gitlab/bounded_contexts.yml b/.rubocop_todo/gitlab/bounded_contexts.yml index fc6a5fa1c546190a4c579e9229af17b04cc9f101..0af6b47256341a2199130a4f30d4270a17d7de4f 100644 --- a/.rubocop_todo/gitlab/bounded_contexts.yml +++ b/.rubocop_todo/gitlab/bounded_contexts.yml @@ -1916,6 +1916,7 @@ Gitlab/BoundedContexts: - 'app/workers/concerns/worker_attributes.rb' - 'app/workers/concerns/worker_context.rb' - 'app/workers/concurrency_limit/resume_worker.rb' + - 'app/workers/concurrency_limit/recovery_worker.rb' - 'app/workers/container_expiration_policies/cleanup_container_repository_worker.rb' - 'app/workers/container_expiration_policy_worker.rb' - 'app/workers/counters/cleanup_refresh_worker.rb' diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 6689dae91b4a5465f551698bb1af77d8f4907b11..cb1e310741e79f0ac6cc1dfab97a1f7af06fafe1 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -133,6 +133,16 @@ :idempotent: false :tags: [] :queue_namespace: :chaos +- :name: chaos:chaos_db_sleep + :worker_name: Chaos::DbSleepWorker + :feature_category: :not_owned + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 2 + :idempotent: false + :tags: [] + :queue_namespace: :chaos - :name: chaos:chaos_db_spin :worker_name: Chaos::DbSpinWorker :feature_category: :not_owned diff --git a/app/workers/chaos/db_sleep_worker.rb b/app/workers/chaos/db_sleep_worker.rb new file mode 100644 index 0000000000000000000000000000000000000000..55ad534d096f7e6847606301c8253b550887a11b --- /dev/null +++ b/app/workers/chaos/db_sleep_worker.rb @@ -0,0 +1,16 @@ +# frozen_string_literal: true + +module Chaos + class DbSleepWorker # rubocop:disable Scalability/IdempotentWorker + include ApplicationWorker + + data_consistency :always + + sidekiq_options retry: 3 + include ChaosQueue + + def perform(duration_s) + Gitlab::Chaos.db_sleep(duration_s) + end + end +end diff --git a/app/workers/concerns/application_worker.rb b/app/workers/concerns/application_worker.rb index fabb994a07a2270f8b1e28d6d5e4dac9eae18d74..9eaaae614fdb1d72c144b8063f5ec9e6531b3acc 100644 --- a/app/workers/concerns/application_worker.rb +++ b/app/workers/concerns/application_worker.rb @@ -254,7 +254,7 @@ def process_schedule_at_for_batch(schedule_at, index) end def set_default_concurrency_limit - concurrency_limit -> { 0 } + concurrency_limit -> { 3000 } end end end diff --git a/app/workers/concurrency_limit/recovery_worker.rb b/app/workers/concurrency_limit/recovery_worker.rb new file mode 100644 index 0000000000000000000000000000000000000000..a4718aae377ebf1818fffb26c98314ca5bb3ade9 --- /dev/null +++ b/app/workers/concurrency_limit/recovery_worker.rb @@ -0,0 +1,86 @@ +# frozen_string_literal: true + +module ConcurrencyLimit + class RecoveryWorker + include ApplicationWorker + include CronjobQueue # rubocop:disable Scalability/CronWorkerContext -- No context needed + + feature_category :scalability + data_consistency :sticky + idempotent! + + def perform(worker_name = nil) + if worker_name + recover!(worker_name) + else + recover_workers + end + end + + private + + def recover!(worker_name) + current = current_limit(worker_name) + max = max_limit(worker_name) + + # Don't exceed the max limit + new_limit = [recovery_strategy.concurrency_operator.call(current), max].min + + set_current_limit!(worker_name, limit: new_limit) + + Sidekiq.logger.info( + message: "Recovering concurrency limit for worker", + worker_name: worker_name, + previous_limit: current, + new_limit: new_limit, + max_limit: max + ) + + # TODO: Track that the limit was already recovered for the current minute + end + + def recover_workers + # Sidekiq.logger.info "recovery_worker unthrottled_workers #{unthrottled_workers}" + unthrottled_workers.each do |worker_name| + Sidekiq.logger.info "recovery_worker #{worker_name} current_limit #{current_limit(worker_name)} max_limit #{max_limit(worker_name)}" + next if current_limit(worker_name) >= max_limit(worker_name) + + recover!(worker_name) + end + end + + # Returns unthrottled workers in the previous minute + def unthrottled_workers + workers - throttled_workers + end + + def throttled_workers + Gitlab::SidekiqMiddleware::Throttling::ThrottlingTracker + .throttled_workers(time: Time.current - 1.minute) + end + + def workers + Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.workers.map(&:name) + end + + def recovery_strategy + Gitlab::SidekiqMiddleware::Throttling::Strategy::GradualRecovery + end + + def max_limit(worker_name) + Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker_name.safe_constantize) + end + + def current_limit(worker_name) + concurrency_limit_service.current_limit(worker_name) + end + + def set_current_limit!(worker_name, limit:) + concurrency_limit_service.set_current_limit!(worker_name, limit: limit) + end + + def concurrency_limit_service + Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService + end + end +end diff --git a/app/workers/concurrency_limit/resume_worker.rb b/app/workers/concurrency_limit/resume_worker.rb index 3e617c22cdbbb8ee8c02ab8515408033a4d45d20..5d5d64043ea2f3b750ada42fa5c8af691967bda3 100644 --- a/app/workers/concurrency_limit/resume_worker.rb +++ b/app/workers/concurrency_limit/resume_worker.rb @@ -40,7 +40,7 @@ def process_worker(worker_name) worker = worker_name.safe_constantize return unless worker - limit = ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker) + limit = current_limit queue_size = queue_size(worker) current = concurrent_worker_count(worker) @@ -85,5 +85,9 @@ def resume_processing!(worker, limit:) def workers Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.workers end + + def current_limit + Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.current_limit(worker_name) + end end end diff --git a/config/initializers/0_marginalia.rb b/config/initializers/0_marginalia.rb index af76e9f048d58679471164089bfd1e2c2a58dc58..0e74450d11e0dc362c944e41655d3077289925a0 100644 --- a/config/initializers/0_marginalia.rb +++ b/config/initializers/0_marginalia.rb @@ -12,7 +12,8 @@ # We only enable this in production because a number of tests do string # matching against the raw SQL, and prepending the comment prevents color # coding from working in the development log. -Marginalia::Comment.prepend_comment = true if Rails.env.production? +# Marginalia::Comment.prepend_comment = true if Rails.env.production? +Marginalia::Comment.prepend_comment = true Marginalia::Comment.components = [:application, :correlation_id, :jid, :endpoint_id, :db_config_database, :db_config_name, :console_hostname, :console_username] diff --git a/config/initializers/1_settings.rb b/config/initializers/1_settings.rb index c82a276b344072a5e553a4b88cc9d987514e57d5..2def3d7dc551828abed38b82b8ff31fca2b634ce 100644 --- a/config/initializers/1_settings.rb +++ b/config/initializers/1_settings.rb @@ -771,6 +771,9 @@ Settings.cron_jobs['import_placeholder_user_cleanup_worker'] ||= {} Settings.cron_jobs['import_placeholder_user_cleanup_worker']['cron'] ||= "0 0 * * *" Settings.cron_jobs['import_placeholder_user_cleanup_worker']['job_class'] = 'Import::PlaceholderUserCleanupWorker' +Settings.cron_jobs['concurrency_limit_recovery_worker'] ||= {} +Settings.cron_jobs['concurrency_limit_recovery_worker']['cron'] ||= "*/1 * * * *" +Settings.cron_jobs['concurrency_limit_recovery_worker']['job_class'] = "ConcurrencyLimit::RecoveryWorker" Gitlab.ee do Settings.cron_jobs['analytics_devops_adoption_create_all_snapshots_worker'] ||= {} diff --git a/config/initializers/sidekiq.rb b/config/initializers/sidekiq.rb index 09c4ee3702e7227688a88a84adeafd982ce97531..79b91b032d0335cadf94de2c15df37c39288fe20 100644 --- a/config/initializers/sidekiq.rb +++ b/config/initializers/sidekiq.rb @@ -139,3 +139,10 @@ def self.enabled? Sidekiq::Client.prepend Gitlab::SidekiqSharding::Validator::Client Sidekiq::RedisClientAdapter::CompatMethods.prepend Gitlab::SidekiqSharding::Validator Sidekiq::Job::Setter.prepend Gitlab::Patch::SidekiqJobSetter + +# Gitlab::BackgroundTask.new(Gitlab::SidekiqThrottling::ConcurrencyLimitRecoverer.new).start if Gitlab::Runtime.sidekiq? + +# rubocop:disable Cop/SidekiqApiUsage -- registering web UI doesn't need shard awareness +Sidekiq::Web.register Gitlab::SidekiqMiddleware::ConcurrencyLimit::Web::WebExtension +Sidekiq::Web.tabs["Concurrency Limit"] = "concurrency_limit" +# rubocop:enable Cop/SidekiqApiUsage diff --git a/lib/gitlab/chaos.rb b/lib/gitlab/chaos.rb index dcbf0d6000725e30536700ecc62ffc99c381a0f7..5fab6eb9e76242ccfdc26d7698888f4e843d6513 100644 --- a/lib/gitlab/chaos.rb +++ b/lib/gitlab/chaos.rb @@ -38,6 +38,10 @@ def self.db_spin(duration_s, interval_s) end end + def self.db_sleep(duration_s) + ApplicationRecord.connection.execute(ApplicationRecord.sanitize_sql_array(["SELECT PG_SLEEP(?)", duration_s])) + end + # sleep will sleep for the specified duration def self.sleep(duration_s) Kernel.sleep(duration_s) diff --git a/lib/gitlab/metrics/samplers/stat_activity_sampler.rb b/lib/gitlab/metrics/samplers/stat_activity_sampler.rb index 3dcbb116284c3c8a063abfc19d7be29ceef80899..24deaef3bbb6cc490265acc39568fcb5cc508a46 100644 --- a/lib/gitlab/metrics/samplers/stat_activity_sampler.rb +++ b/lib/gitlab/metrics/samplers/stat_activity_sampler.rb @@ -4,7 +4,8 @@ module Gitlab module Metrics module Samplers class StatActivitySampler < BaseSampler - DEFAULT_SAMPLING_INTERVAL_SECONDS = 60 + # TODO: Change back + DEFAULT_SAMPLING_INTERVAL_SECONDS = 10 def sample return unless ::Feature.enabled?(:sample_pg_stat_activity, Feature.current_pod, type: :ops) diff --git a/lib/gitlab/resource_usage_limiter.rb b/lib/gitlab/resource_usage_limiter.rb index 2f61385325f7c20fee8449222a1db38c992f1a6c..1a028f9ba6bf27124a91f6326da7c77e95262d1c 100644 --- a/lib/gitlab/resource_usage_limiter.rb +++ b/lib/gitlab/resource_usage_limiter.rb @@ -35,7 +35,7 @@ def throttled?(limit, peek: false) Gitlab::ApplicationRateLimiter.resource_usage_throttled?( limit.name, - resource_key: limit.resource_key, + resource_key: limit.resource_key.to_sym, scope: scope, threshold: limit.threshold, interval: limit.interval, diff --git a/lib/gitlab/sidekiq_limits.rb b/lib/gitlab/sidekiq_limits.rb index ea01242d7cc31a8f0843ae8132cd82970e19685c..89ee1ecfedf8d5aee2d9f78dad2554237c3c5ea6 100644 --- a/lib/gitlab/sidekiq_limits.rb +++ b/lib/gitlab/sidekiq_limits.rb @@ -45,8 +45,8 @@ module SidekiqLimits }, { selector: Gitlab::SidekiqConfig::WorkerMatcher.new("*"), - threshold: 1000, - interval: 60 + threshold: 5, + interval: 60 # TODO: Change back } ] }, diff --git a/lib/gitlab/sidekiq_middleware.rb b/lib/gitlab/sidekiq_middleware.rb index 814e0713ee9cd3a0a48cfa4c3247599596046a3e..d70952c11882b05b0f617c59a6aa5a2dc04e3333 100644 --- a/lib/gitlab/sidekiq_middleware.rb +++ b/lib/gitlab/sidekiq_middleware.rb @@ -80,6 +80,7 @@ def self.middlewares(metrics: true, arguments_logger: true, skip_jobs: true) # Any middlewares after DuplicateJobs::Server can return/intercept jobs. ::Gitlab::SidekiqMiddleware::DuplicateJobs::Server, ::Gitlab::SidekiqMiddleware::PauseControl::Server, + ::Gitlab::SidekiqMiddleware::Throttling::Server, ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::Server, skip_jobs ? ::Gitlab::SidekiqMiddleware::SkipJobs : nil, ::Gitlab::Database::LoadBalancing::SidekiqServerMiddleware, diff --git a/lib/gitlab/sidekiq_middleware/concurrency_limit/concurrency_limit_service.rb b/lib/gitlab/sidekiq_middleware/concurrency_limit/concurrency_limit_service.rb index a7abe5e876931443ae330ea6c3f61e59c4b14571..0513018333eb4004191fb61ef7e9d083ebaf14cd 100644 --- a/lib/gitlab/sidekiq_middleware/concurrency_limit/concurrency_limit_service.rb +++ b/lib/gitlab/sidekiq_middleware/concurrency_limit/concurrency_limit_service.rb @@ -12,10 +12,16 @@ class ConcurrencyLimitService delegate :track_execution_start, :track_execution_end, :cleanup_stale_trackers, :concurrent_worker_count, to: :@worker_execution_tracker + delegate :current_limit, :set_current_limit!, to: :@dynamic_limit_manager + + delegate :status, :status=, to: :@state_manager + def initialize(worker_name) @worker_name = worker_name @queue_manager = QueueManager.new(worker_name: worker_name, prefix: REDIS_KEY_PREFIX) @worker_execution_tracker = WorkerExecutionTracker.new(worker_name: worker_name, prefix: REDIS_KEY_PREFIX) + @dynamic_limit_manager = DynamicLimitManager.new(worker_name: worker_name, prefix: REDIS_KEY_PREFIX) + @state_manager = StateManager.new(worker_name: worker_name, prefix: REDIS_KEY_PREFIX) end class << self @@ -54,6 +60,27 @@ def track_execution_end(worker_name) def concurrent_worker_count(worker_name) new(worker_name).concurrent_worker_count end + + def current_limit(worker_name) + new(worker_name).current_limit + end + + def over_the_limit?(worker_name) + puts "#{worker_name} concurrent worker count: #{concurrent_worker_count(worker_name)} vs current_limit(worker_name): #{current_limit(worker_name)}" + concurrent_worker_count(worker_name) > current_limit(worker_name) + end + + def set_current_limit!(worker_name, limit:) + new(worker_name).set_current_limit!(limit) + end + + def status(worker_name) + new(worker_name).status + end + + def status=(worker_name, new_status) + new(worker_name).status = new_status + end end end end diff --git a/lib/gitlab/sidekiq_middleware/concurrency_limit/dynamic_limit_manager.rb b/lib/gitlab/sidekiq_middleware/concurrency_limit/dynamic_limit_manager.rb new file mode 100644 index 0000000000000000000000000000000000000000..b56d5a0fb1b0bc88c06a3d1ac992ecb47682e296 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/concurrency_limit/dynamic_limit_manager.rb @@ -0,0 +1,57 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + module ConcurrencyLimit + class DynamicLimitManager + # TODO: Decide TTL + TTL = 30.minutes + attr_reader :worker_class, :worker_name + + def initialize(worker_name:, prefix:) + @worker_name = worker_name + @prefix = prefix + end + + # Gets the current limit value in Redis. + # Falls back on the max limit value set in WorkersMap if no limit is set in Redis. + # Returns 0 if no limit is set. + # @return [Integer] The current limit value, or 0 if no limit is set + def current_limit + value = with_redis do |r| + value = r.get(current_limit_key) + value.to_i if value + end + + return value unless value.nil? + + max = limit_from_workers_map + set_current_limit!(max) + max + end + + # Sets the current limit value in Redis + # @param value [Integer] The new limit value to set + def set_current_limit!(value) + with_redis do |r| + r.set(current_limit_key, value.to_i, ex: TTL) + end + end + + private + + def limit_from_workers_map + WorkersMap.limit_for(worker: worker_name.safe_constantize) + end + + def with_redis(&block) + Gitlab::Redis::QueuesMetadata.with(&block) # rubocop:disable CodeReuse/ActiveRecord -- Not active record + end + + def current_limit_key + "#{@prefix}:{#{@worker_name.underscore}}:current_limit" + end + end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/concurrency_limit/middleware.rb b/lib/gitlab/sidekiq_middleware/concurrency_limit/middleware.rb index d6be27343bbe81201dfebebe3323c50d24a333bc..5851451495c986dba46692d09b375a4a5d6dfabf 100644 --- a/lib/gitlab/sidekiq_middleware/concurrency_limit/middleware.rb +++ b/lib/gitlab/sidekiq_middleware/concurrency_limit/middleware.rb @@ -54,11 +54,12 @@ def should_defer_schedule? def should_defer_perform? return false if Feature.disabled?(:sidekiq_concurrency_limit_middleware, Feature.current_request, type: :ops) + return false if concurrency_service.status(worker_class) == 'disabled' return false if resumed? return true if has_jobs_in_queue? - ::Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.over_the_limit?(worker: worker) + concurrency_service.over_the_limit?(worker_class) end def concurrency_service diff --git a/lib/gitlab/sidekiq_middleware/concurrency_limit/state_manager.rb b/lib/gitlab/sidekiq_middleware/concurrency_limit/state_manager.rb new file mode 100644 index 0000000000000000000000000000000000000000..3b053cf1f24ecbb1479e1e930c9d8499119e5d53 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/concurrency_limit/state_manager.rb @@ -0,0 +1,37 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + module ConcurrencyLimit + class StateManager + attr_reader :redis_key, :worker_name + + ALLOWED_STATUS = %w[enabled disabled].freeze + DEFAULT_STATUS = 'enabled' + + def initialize(worker_name:, prefix:) + @worker_name = worker_name + @redis_key = "#{prefix}:{#{worker_name.underscore}}:status" + end + + def status + with_redis { |redis| redis.get(@redis_key) } || DEFAULT_STATUS + end + + def status=(new_status) + unless ALLOWED_STATUS.include?(new_status) + raise ArgumentError, "Invalid status value. Must be one of: #{ALLOWED_STATUS.join(', ')}" + end + + with_redis { |redis| redis.set(@redis_key, new_status) } + end + + private + + def with_redis(&) + Gitlab::Redis::SharedState.with(&) # rubocop:disable CodeReuse/ActiveRecord -- Not active record + end + end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/concurrency_limit/web/views/index.erb b/lib/gitlab/sidekiq_middleware/concurrency_limit/web/views/index.erb new file mode 100644 index 0000000000000000000000000000000000000000..f4c326dfc8d6f82483ad06c0b9af05a23fa9a245 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/concurrency_limit/web/views/index.erb @@ -0,0 +1,65 @@ +
+
+

<%= t('Concurrency Limit') %>

+
+ +
+ +
+
+ + + + + + + + + + + + <% @worker_names.each do |name| %> + + <% + status = Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.status(name) + style = "#{status == 'disabled' ? "background: #ecc; color: #585454;": ""}" + current_limit = Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.current_limit(name) + max_limit = Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: name.safe_constantize) + %> + + + + + + + + + + + + <% end %> + +
<%= t('Status') %><%= t('Name') %><%= t('Current Limit') %><%= t('Max Limit') %><%= t('Actions')%>
<%= status %> <%= name %> + + <%= max_limit %> + + +
diff --git a/lib/gitlab/sidekiq_middleware/concurrency_limit/web/web_extension.rb b/lib/gitlab/sidekiq_middleware/concurrency_limit/web/web_extension.rb new file mode 100644 index 0000000000000000000000000000000000000000..8ce1309eacfc462545b69f5e9e930093dc1bbcf2 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/concurrency_limit/web/web_extension.rb @@ -0,0 +1,55 @@ +# frozen_string_literal: true +# +module Gitlab + module SidekiqMiddleware + module ConcurrencyLimit + module Web + module WebExtension + def self.registered(app) + # Index page of concurrency limit settings. + app.get '/concurrency_limit' do + @worker_names = Gitlab::SidekiqConfig.workers.map { |w| w.klass.name } + .filter(&:present?) + .sort + x = params[:substr] + @worker_names.filter! { |worker_name| worker_name&.downcase&.include?(x.downcase) } if x && x != "" + + view_path = File.join(File.expand_path(__dir__), "views") + + render(:erb, File.read(File.join(view_path, "index.erb"))) + end + + # Set current limit for workers. + app.post '/concurrency_limit/:name/current_limit' do + new_limit = params['new_limit'] + worker_name = route_params[:name] + + Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.set_current_limit!( + worker_name, + limit: new_limit + ) + + redirect_with_query "#{root_path}concurrency_limit" + end + + # Disable concurrency limit + app.post '/concurrency_limit/:name/disable' do + worker_name = route_params[:name] + + Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.new(worker_name).status = 'disabled' + redirect_with_query "#{root_path}concurrency_limit" + end + + # Enable concurrency limit + app.post '/concurrency_limit/:name/enable' do + worker_name = route_params[:name] + + Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService.new(worker_name).status = 'enabled' + redirect_with_query "#{root_path}concurrency_limit" + end + end + end + end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/server_metrics.rb b/lib/gitlab/sidekiq_middleware/server_metrics.rb index b15594623c59a363ebfc9aa01a6a366a845f9ee7..9e500f450c749cb9b700dcf1af4a1b20038225f3 100644 --- a/lib/gitlab/sidekiq_middleware/server_metrics.rb +++ b/lib/gitlab/sidekiq_middleware/server_metrics.rb @@ -193,6 +193,7 @@ def record_resource_usage_counters @metrics[:sidekiq_jobs_gitaly_seconds_sum].increment(labels, get_gitaly_time(instrumentation)) @metrics[:sidekiq_redis_requests_duration_seconds_sum].increment(labels, get_redis_time(instrumentation)) @metrics[:sidekiq_elasticsearch_requests_duration_seconds_sum].increment(labels, get_elasticsearch_time(instrumentation)) + # TODO: Record primary DB usage, same as in logs json.db_main_duration_s end end diff --git a/lib/gitlab/sidekiq_middleware/throttling/decider.rb b/lib/gitlab/sidekiq_middleware/throttling/decider.rb new file mode 100644 index 0000000000000000000000000000000000000000..03c5f8459001b0ed6a3e675cb3d22410519fecac --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/throttling/decider.rb @@ -0,0 +1,71 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + module Throttling + class Decider + Decision = Struct.new(:needs_throttle, :strategy) + + def initialize(worker_name) + @worker_name = worker_name + end + + # Returns Decision on how to throttle the worker + # + # @return Decision + def execute + # TODO: Add logging/metrics + # return Decision.new(false, Strategy::None) unless spike_detected? + return Decision.new(false, Strategy::None) unless db_duration_exceeded_quota? + + return Decision.new(true, Strategy::HardThrottle) if dominant_in_pg_stat_activity? + + Decision.new(true, Strategy::SoftThrottle) + end + + private + + attr_reader :worker_name + + def db_duration_exceeded_quota? + Gitlab::ResourceUsageLimiter.new(worker_name: worker_name).exceeded_limits? + end + + def dominant_in_pg_stat_activity? + Gitlab::Database::LoadBalancing.base_models.each do |load_balancing_base_model| + next unless dominant_by_db?(load_balancing_base_model) + + Sidekiq.logger.info "pg_stat_activity: dominant worker #{worker_name} db_connection #{db_connection_name(load_balancing_base_model)}" + return true + end + + false + end + + def dominant_by_db?(load_balancing_base_model) + by_db = Gitlab::Database::StatActivity + .new(db_connection_name(load_balancing_base_model)) + .non_idle_connections_by_db(4) + + Sidekiq.logger.info "dominant_by_db? #{worker_name} #{by_db}" + by_db.each_value do |count_aggregates| + next unless count_aggregates.has_key?(worker_name) + + return true if count_aggregates[worker_name] == count_aggregates.values.max + end + + false + end + + def spike_detected? + # TODO: EWMA implementation + false + end + + def db_connection_name(base_model) + base_model.connection.load_balancer.name + end + end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/throttling/middleware.rb b/lib/gitlab/sidekiq_middleware/throttling/middleware.rb new file mode 100644 index 0000000000000000000000000000000000000000..018f6332e39080a5dd56874f7097da0ef37f3773 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/throttling/middleware.rb @@ -0,0 +1,74 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + module Throttling + class Middleware + def initialize(worker, job) + @job = job + + @worker_class = worker.is_a?(Class) ? worker : worker.class + @worker_name = worker_class.name + end + + def perform + # TODO: Use exclusive lock with a short few seconds TTL maybe? + Sidekiq.logger.info "already_throttled #{worker_name} #{already_throttled?}" + return yield if already_throttled? + + decision = throttling_decision + return yield unless decision.needs_throttle + + throttle!(decision.strategy) + yield + end + + private + + attr_reader :worker_class, :worker_name, :decisions + + def throttling_decision + Gitlab::SidekiqMiddleware::Throttling::Decider.new(worker_name).execute + end + + def throttle!(strategy) + Sidekiq.logger.info( + class: worker_name, + throttling_decision: strategy.name, + message: "#{worker_name} is being throttled with strategy #{strategy.name}" + ) + + new_limit = strategy.concurrency_operator.call(current_limit) + + set_current_limit!(limit: new_limit) + + Gitlab::SidekiqMiddleware::Throttling::ThrottlingTracker.record(worker_name) + Sidekiq.logger.info "Throttled #{worker_name} to #{new_limit}" + end + + def current_limit + concurrency_limit_service.current_limit(worker_name) + end + + def set_current_limit!(limit:) + concurrency_limit_service.set_current_limit!(worker_name, limit: limit) + end + + def concurrency_limit_service + Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService + end + + def already_throttled? + Gitlab::SidekiqMiddleware::Throttling::ThrottlingTracker.throttled?(worker_name) + end + + def throttle_worker_class! + decisions.each do |decision| + ::Gitlab::SidekiqMiddleware::Throttling::ThrottlingService.new( + decision.worker_class, decision.strategy).execute + end + end + end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/throttling/server.rb b/lib/gitlab/sidekiq_middleware/throttling/server.rb new file mode 100644 index 0000000000000000000000000000000000000000..db8bdc6c59b5e075781ab1e6efba8910da509cc4 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/throttling/server.rb @@ -0,0 +1,13 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + module Throttling + class Server + def call(worker_class, job, _queue, &block) + ::Gitlab::SidekiqMiddleware::Throttling::Middleware.new(worker_class, job).perform(&block) + end + end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/throttling/strategy.rb b/lib/gitlab/sidekiq_middleware/throttling/strategy.rb new file mode 100644 index 0000000000000000000000000000000000000000..07acd7b694577220eecb7bb2c090524315e0509a --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/throttling/strategy.rb @@ -0,0 +1,19 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + module Throttling + module Strategy + StrategyStruct = Struct.new(:name, :concurrency_operator) + + SoftThrottle = StrategyStruct.new("SoftThrottle", ->(limit) { [(0.8 * limit).ceil, 1].max }) + HardThrottle = StrategyStruct.new("HardThrottle", ->(limit) { [(0.5 * limit).ceil, 1].max }) + + GradualRecovery = StrategyStruct.new("GradualRecovery", ->(limit) { [limit + 1, (limit * 1.1).ceil].max }) + + # no-op + None = StrategyStruct.new("None", nil) + end + end + end +end diff --git a/lib/gitlab/sidekiq_middleware/throttling/throttling_tracker.rb b/lib/gitlab/sidekiq_middleware/throttling/throttling_tracker.rb new file mode 100644 index 0000000000000000000000000000000000000000..b55ef96fe240d4ef77879b5166b9740d8171f320 --- /dev/null +++ b/lib/gitlab/sidekiq_middleware/throttling/throttling_tracker.rb @@ -0,0 +1,57 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqMiddleware + module Throttling + class ThrottlingTracker + INTERVAL_SECONDS = 60 + TTL = 2.minutes + + class << self + def throttled_workers(time:) + with_redis do |redis| + redis.smembers(lookup_key(period_key(time))) + end + end + + def record(worker_name) + with_redis do |redis| + redis.pipelined do |pipeline| + pipeline.set(cache_key(worker_name, period_key), "true", ex: TTL) + pipeline.sadd(lookup_key(period_key), worker_name) + pipeline.expire(lookup_key(period_key), TTL) + end + end + end + + # Returns whether the worker was already throttled within the current minute + def throttled?(worker_name) + with_redis do |redis| + redis.exists?(cache_key(worker_name, period_key)) + end + end + + private + + attr_reader :worker_name + + def period_key(time = Time.current) + time.to_i.divmod(INTERVAL_SECONDS).first + end + + def cache_key(worker_name, period_key) + "sidekiq:throttling:worker:{#{worker_name}}:#{period_key}:throttled" + end + + def lookup_key(period_key) + "sidekiq:throttling:worker:lookup:#{period_key}:throttled" + end + + def with_redis(&block) + Gitlab::Redis::QueuesMetadata.with(&block) + end + end + end + end + end +end diff --git a/lib/gitlab/sidekiq_throttling/concurrency_limit_recoverer.rb b/lib/gitlab/sidekiq_throttling/concurrency_limit_recoverer.rb new file mode 100644 index 0000000000000000000000000000000000000000..70261a829ed370bba5ccc2b65bbfd542b22c196a --- /dev/null +++ b/lib/gitlab/sidekiq_throttling/concurrency_limit_recoverer.rb @@ -0,0 +1,99 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqThrottling + class ConcurrencyLimitRecoverer + include ExclusiveLeaseGuard + + INTERVAL = 1.minute + + def initialize + @alive = true + end + + def call + while @alive + sleep(sleep_interval) + + try_obtain_lease do + unthrottled_workers.each do |worker| + Sidekiq.logger.info "recoverer worker #{worker} #{worker.class}" + next if current_limit(worker) >= max_limit(worker) + + recover!(worker) + end + end + + stop + end + end + + def stop + @alive = false + end + + def lease_timeout + 1.minute + end + + private + + def sleep_interval + (INTERVAL - 10..INTERVAL + 10).step(0.1).to_a.sample + end + + # Returns unthrottled workers in the previous minute + def unthrottled_workers + workers - throttled_workers + end + + def throttled_workers + Gitlab::SidekiqMiddleware::Throttling::ThrottlingTracker + .throttled_workers(time: Time.current - 1.minute) + end + + def workers + Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.workers.map(&:name) + end + + def current_limit(worker) + concurrency_limit_service.current_limit(worker) + end + + def max_limit(worker) + Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.limit_for(worker: worker) + end + + # Gradually recover the concurrency limit for a worker + def recover!(worker) + current = current_limit(worker) + max = max_limit(worker) + + # Don't exceed the max limit + new_limit = [recovery_strategy.call(current), max].min + + set_current_limit!(worker, limit: new_limit) + + Gitlab::AppLogger.info( + message: "Recovering concurrency limit for worker", + worker_name: worker.name, + previous_limit: current, + new_limit: new_limit, + max_limit: max + ) + end + + def recovery_strategy + Gitlab::SidekiqMiddleware::Throttling::Strategy::GradualRecovery + end + + def set_current_limit!(worker, limit:) + concurrency_limit_service.set_current_limit!(worker, limit: limit) + end + + def concurrency_limit_service + Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService + end + end + end +end diff --git a/lib/gitlab/sidekiq_throttling/concurrency_limit_throttler.rb b/lib/gitlab/sidekiq_throttling/concurrency_limit_throttler.rb new file mode 100644 index 0000000000000000000000000000000000000000..a627ec9bfcd0b3a3ee416d6068b01926d9922794 --- /dev/null +++ b/lib/gitlab/sidekiq_throttling/concurrency_limit_throttler.rb @@ -0,0 +1,73 @@ +# frozen_string_literal: true + +module Gitlab + module SidekiqThrottling + class ConcurrencyLimitThrottler + include ExclusiveLeaseGuard + + def initialize + @alive = true + end + + def call + while @alive + sleep(1.minute) + + try_obtain_lease do + workers.each do |worker| + decision = throttling_decision(worker) + + next unless decision.needs_throttle + + throttle!(worker, strategy) + end + end + + stop + end + end + + def stop + @alive = false + end + + def lease_timeout + 1.minute + end + + private + + def throttling_decision(worker) + Gitlab::SidekiqMiddleware::Throttling::Decider.new(worker.name).execute + end + + def throttle!(worker, strategy) + current_limit = concurrency_limit_service.current_limit(worker) + new_limit = strategy.call(current_limit) + + # TODO: Add logging/metrics + set_current_limit!(worker, limit: new_limit) + end + + def current_limit(worker) + concurrency_limit_service.current_limit(worker) + end + + def set_current_limit!(worker, limit:) + concurrency_limit_service.set_current_limit!(worker, limit: limit) + end + + def strategies + Gitlab::SidekiqMiddleware::Throttling::Strategy + end + + def workers + Gitlab::SidekiqMiddleware::ConcurrencyLimit::WorkersMap.workers + end + + def concurrency_limit_service + Gitlab::SidekiqMiddleware::ConcurrencyLimit::ConcurrencyLimitService + end + end + end +end