From 8189c1730c65273637d3ac268d66b228c9d38950 Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Thu, 4 Sep 2025 12:49:41 +0200 Subject: [PATCH 1/7] Retry job if runner does not acknowledge it in a timely fashion Add support 2-phase commit of pending jobs in `PUT /jobs/:id` Related to https://gitlab.com/gitlab-org/gitlab/-/issues/341293 Changelog: added # Conflicts: # doc/api/openapi/openapi_v2.yaml # lib/api/ci/runner.rb --- app/assets/javascripts/editor/schema/ci.json | 4 + app/finders/ci/auth_job_finder.rb | 2 + app/models/ci/build.rb | 55 +++ app/models/concerns/enums/ci/commit_status.rb | 1 + app/presenters/commit_status_presenter.rb | 3 +- app/services/ci/register_job_service.rb | 57 ++- app/services/ci/retry_waiting_job_service.rb | 48 ++ app/services/ci/update_build_queue_service.rb | 4 +- app/services/ci/update_build_state_service.rb | 37 ++ app/workers/all_queues.yml | 10 + .../ci/retry_stuck_waiting_job_worker.rb | 29 ++ .../allow_runner_job_acknowledgement.yml | 10 + config/sidekiq_queues.yml | 2 + doc/api/graphql/reference/_index.md | 1 + doc/api/openapi/openapi_v2.yaml | 4 +- doc/development/cicd/_index.md | 86 +++- doc/development/cicd/two_phase_job_commit.md | 169 +++++++ lib/api/ci/runner.rb | 9 +- lib/gitlab/ci/build/auto_retry.rb | 3 +- lib/gitlab/ci/queue/metrics.rb | 3 + lib/gitlab/ci/status/build/failed.rb | 1 + spec/factories/ci/builds.rb | 24 + spec/factories/ci/runner_machines.rb | 4 + spec/lib/gitlab/ci/build/auto_retry_spec.rb | 3 + spec/models/ci/build_two_phase_commit_spec.rb | 422 +++++++++++++++++ spec/models/ci/runner_manager_spec.rb | 98 ++++ spec/models/commit_status_spec.rb | 1 + ...unner_job_confirmation_integration_spec.rb | 324 ++++++++++++++ .../api/ci/runner_job_confirmation_spec.rb | 304 +++++++++++++ .../ci/cancel_pipeline_service_spec.rb | 2 +- ...ister_job_service_two_phase_commit_spec.rb | 423 ++++++++++++++++++ .../ci/retry_waiting_job_service_spec.rb | 211 +++++++++ .../ci/update_build_queue_service_spec.rb | 6 + .../ci/update_build_state_service_spec.rb | 275 +++++++++++- .../ci/retry_stuck_waiting_job_worker_spec.rb | 213 +++++++++ 35 files changed, 2826 insertions(+), 22 deletions(-) create mode 100644 app/services/ci/retry_waiting_job_service.rb create mode 100644 app/workers/ci/retry_stuck_waiting_job_worker.rb create mode 100644 config/feature_flags/gitlab_com_derisk/allow_runner_job_acknowledgement.yml create mode 100644 doc/development/cicd/two_phase_job_commit.md create mode 100644 spec/models/ci/build_two_phase_commit_spec.rb create mode 100644 spec/requests/api/ci/runner_job_confirmation_integration_spec.rb create mode 100644 spec/requests/api/ci/runner_job_confirmation_spec.rb create mode 100644 spec/services/ci/register_job_service_two_phase_commit_spec.rb create mode 100644 spec/services/ci/retry_waiting_job_service_spec.rb create mode 100644 spec/workers/ci/retry_stuck_waiting_job_worker_spec.rb diff --git a/app/assets/javascripts/editor/schema/ci.json b/app/assets/javascripts/editor/schema/ci.json index 4b18e88db5450e..7827b6a83707c2 100644 --- a/app/assets/javascripts/editor/schema/ci.json +++ b/app/assets/javascripts/editor/schema/ci.json @@ -1981,6 +1981,10 @@ { "const": "data_integrity_failure", "description": "Retry if there is an unknown job problem." + }, + { + "const": "runner_provisioning_timeout", + "description": "Retry if the runner manager did not provision a runner to pick up the job in time." 
} ] }, diff --git a/app/finders/ci/auth_job_finder.rb b/app/finders/ci/auth_job_finder.rb index 50a6a47b70c49c..9351f57943497a 100644 --- a/app/finders/ci/auth_job_finder.rb +++ b/app/finders/ci/auth_job_finder.rb @@ -60,6 +60,8 @@ def validate_job!(job) end def validate_executing_job!(job) + return if job.waiting_for_runner_ack? + raise NotRunningJobError, 'Job is not running' unless Ci::HasStatus::EXECUTING_STATUSES.include?(job.status) end diff --git a/app/models/ci/build.rb b/app/models/ci/build.rb index 952eca00bcce58..f779721c4846e4 100644 --- a/app/models/ci/build.rb +++ b/app/models/ci/build.rb @@ -19,6 +19,11 @@ class Build < Ci::Processable self.allow_legacy_sti_class = true + # The `RUNNER_ACK_QUEUE_EXPIRY_TIME` indicates the longest interval that GitLab will wait for a ping from a Runner + # supporting 2-phase commit to either continue waiting (status=pending) or accept (status=running) a job that is + # pending for that runner. + RUNNER_ACK_QUEUE_EXPIRY_TIME = 2.minutes + belongs_to :project, inverse_of: :builds belongs_to :runner belongs_to :erased_by, class_name: 'User' @@ -1236,6 +1241,47 @@ def token super end + # + # Support for two-phase runner job acceptance acknowledgement + # + def waiting_for_runner_ack? + pending? && runner_id.present? && runner_manager_id_waiting_for_ack.present? + end + + # Create a Redis cache entry containing the runner manager id on which we're waiting on + # for acknowledgement (job accepted or job declined) + def set_waiting_for_runner_ack(runner_manager_id) + return unless runner_manager_id.present? + + with_redis do |redis| + # Store runner manager ID for this job, only if key does not yet exist + redis.set(runner_build_ack_queue_key, runner_manager_id, ex: RUNNER_ACK_QUEUE_EXPIRY_TIME, nx: true) + end + end + + # Update the ttl for the Redis cache entry containing the runner manager id on which we're waiting on + # for acknowledgement (job accepted or job declined) + def heartbeat_runner_ack_wait(runner_manager_id) + return unless runner_manager_id.present? + + with_redis do |redis| + # Update TTL, only if key already exists + redis.set(runner_build_ack_queue_key, runner_manager_id, ex: RUNNER_ACK_QUEUE_EXPIRY_TIME, nx: false) + end + end + + # Remove the Redis cache entry containing the runner manager id on which we're waiting on + # for acknowledgement (job accepted or job declined) + def cancel_wait_for_runner_ack + with_redis do |redis| + redis.del(runner_build_ack_queue_key) + end + end + + def runner_manager_id_waiting_for_ack + with_redis { |redis| redis.get(runner_build_ack_queue_key)&.to_i } + end + protected def run_status_commit_hooks! 
@@ -1394,6 +1440,15 @@ def partition_id_prefix_in_16_bit_encode def prefix_and_partition_for_token TOKEN_PREFIX + partition_id_prefix_in_16_bit_encode end + + def with_redis(&block) + # Use SharedState to avoid cache evictions + Gitlab::Redis::SharedState.with(&block) + end + + def runner_build_ack_queue_key + "runner:build_ack_queue:#{token}" + end end end diff --git a/app/models/concerns/enums/ci/commit_status.rb b/app/models/concerns/enums/ci/commit_status.rb index 45acd5595521c5..bb7a2e426cf67d 100644 --- a/app/models/concerns/enums/ci/commit_status.rb +++ b/app/models/concerns/enums/ci/commit_status.rb @@ -30,6 +30,7 @@ def self.failure_reasons environment_creation_failure: 21, deployment_rejected: 22, failed_outdated_deployment_job: 23, + runner_provisioning_timeout: 24, protected_environment_failure: 1_000, insufficient_bridge_permissions: 1_001, downstream_bridge_project_not_found: 1_002, diff --git a/app/presenters/commit_status_presenter.rb b/app/presenters/commit_status_presenter.rb index 7ccd4f1a11c839..c753e82a80d7d5 100644 --- a/app/presenters/commit_status_presenter.rb +++ b/app/presenters/commit_status_presenter.rb @@ -38,7 +38,8 @@ class CommitStatusPresenter < Gitlab::View::Presenter::Delegated deployment_rejected: 'This deployment job was rejected.', ip_restriction_failure: "This job could not be executed because group IP address restrictions are enabled, and the runner's IP address is not in the allowed range.", failed_outdated_deployment_job: 'The deployment job is older than the latest deployment, and therefore failed.', - reached_downstream_pipeline_trigger_rate_limit: 'Too many downstream pipelines triggered in the last minute. Try again later.' + reached_downstream_pipeline_trigger_rate_limit: 'Too many downstream pipelines triggered in the last minute. Try again later.', + runner_provisioning_timeout: 'The runner manager did not provision a runner to pick up the job in time. The job will be retried automatically.' }.freeze private_constant :CALLOUT_FAILURE_MESSAGES diff --git a/app/services/ci/register_job_service.rb b/app/services/ci/register_job_service.rb index f47b53d9886996..80499dff8d2f7d 100644 --- a/app/services/ci/register_job_service.rb +++ b/app/services/ci/register_job_service.rb @@ -287,11 +287,9 @@ def assign_runner!(build, params) else @metrics.increment_queue_operation(:runner_pre_assign_checks_success) - @logger.instrument(:assign_runner_run) do - build.run! - end + return assign_job_to_waiting_state(build, runner_manager) if runner_supports_job_acknowledgment?(build, params) - build.runner_manager = runner_manager if runner_manager + assign_job_to_running_state(build, runner_manager) end !failure_reason @@ -365,6 +363,57 @@ def pre_assign_runner_checks user_blocked: ->(build, _) { build.user&.blocked? } } end + + def runner_supports_job_acknowledgment?(build, params) + return if Feature.disabled?(:allow_runner_job_acknowledgement, build.project.root_namespace) + + !!params.dig(:info, :features, :two_phase_job_commit) + end + + def assign_job_to_waiting_state(build, runner_manager) + # The runner supports two-phase commit. Let's remove the build from the `ci_pending_builds` table so that it + # won't be assigned to other runners, while we wait for the runner to accept or decline the job. + # In the meantime, the build/runner manager association will live in Redis. 
+ success = false + @logger.instrument(:assign_runner_waiting) do + # Add pending job to Redis + build.set_waiting_for_runner_ack(runner_manager.id) + + # Save job and remove pending job from db queue + Ci::Build.transaction do + build.save! + + Ci::UpdateBuildQueueService.new.remove!(build) + end + + Ci::RetryStuckWaitingJobWorker.perform_in(Ci::Build::RUNNER_ACK_QUEUE_EXPIRY_TIME, build.id) + + success = true + rescue ActiveRecord::ActiveRecordError + # If we didn't manage to remove pending job, let's roll back the Redis change + build.cancel_wait_for_runner_ack + rescue Redis::BaseError + break + end + + @metrics.increment_queue_operation(:runner_assigned_waiting) if success + + success + end + + def assign_job_to_running_state(build, runner_manager) + # The runner does not support two-phase commit. Let's move the job to `running` state immediately. + @logger.instrument(:assign_runner_run) do + build.run! + end + + # The runner_manager join record is created immediately, since it is marked as `autosave: true` to avoid race + # conditions when one runner manager is assigned the job, and others are competing for the same job, + # which would cause duplicate key constraint failures. + # By only assigning the runner manager once the job starts running, we avoid the problem. + build.runner_manager = runner_manager if runner_manager + @metrics.increment_queue_operation(:runner_assigned_run) + end end end diff --git a/app/services/ci/retry_waiting_job_service.rb b/app/services/ci/retry_waiting_job_service.rb new file mode 100644 index 00000000000000..a4fad3e31e08a7 --- /dev/null +++ b/app/services/ci/retry_waiting_job_service.rb @@ -0,0 +1,48 @@ +# frozen_string_literal: true + +module Ci + class RetryWaitingJobService + attr_reader :build, :metrics + + def initialize(build, metrics = ::Gitlab::Ci::Queue::Metrics) + @build = build + @metrics = metrics + end + + def execute + metrics.increment_queue_operation(:runner_queue_timeout) + + return job_not_waiting_error unless build_waiting? + return job_not_finished_waiting_error unless can_drop_and_retry? + + # build.drop! will cause build to be retried automatically, if the retry count is below the limit + build.drop!(:runner_provisioning_timeout) + + return job_not_auto_retryable_error unless build.retried? + + ServiceResponse.success + end + + private + + def build_waiting? + build&.pending? && build.runner_id.present? + end + + def can_drop_and_retry? + build.pending? && !build.waiting_for_runner_ack? 
+ end + + def job_not_waiting_error + ServiceResponse.error(message: 'Job is not in waiting state', payload: { reason: :not_in_waiting_state }) + end + + def job_not_finished_waiting_error + ServiceResponse.error(message: 'Job is not finished waiting', payload: { reason: :not_finished_waiting }) + end + + def job_not_auto_retryable_error + ServiceResponse.error(message: 'Job is not auto-retryable', payload: { job: build, reason: :not_auto_retryable }) + end + end +end diff --git a/app/services/ci/update_build_queue_service.rb b/app/services/ci/update_build_queue_service.rb index cfe841bd13fc10..8345d883caf79a 100644 --- a/app/services/ci/update_build_queue_service.rb +++ b/app/services/ci/update_build_queue_service.rb @@ -33,7 +33,9 @@ def push(build, transition) def pop(build, transition) raise InvalidQueueTransition unless transition.from == 'pending' - transition.within_transaction { remove!(build) } + transition.within_transaction do + remove!(build).tap { build.cancel_wait_for_runner_ack } + end end ## diff --git a/app/services/ci/update_build_state_service.rb b/app/services/ci/update_build_state_service.rb index 10dd5d7f4758cb..5a59dadf8f022b 100644 --- a/app/services/ci/update_build_state_service.rb +++ b/app/services/ci/update_build_state_service.rb @@ -19,6 +19,12 @@ def initialize(build, params, metrics = ::Gitlab::Ci::Trace::Metrics.new) end def execute + # Handle two-phase commit workflow for jobs waiting for runner acknowledgment + if build.waiting_for_runner_ack? + result = handle_runner_ack_workflow + return result unless result.status == 100 # Continue processing if returned HTTP 100 Continue + end + unless accept_available? return update_build_state! end @@ -113,6 +119,37 @@ def update_build_state! end end + def handle_runner_ack_workflow + case build_state + when 'pending' + if build.runner_manager.present? + return Result.new(status: 409) # Conflict: Job is already assigned to a runner + end + + # Keep-alive signal during runner preparation + # The runner is still preparing and confirming it can handle the job + build.heartbeat_runner_ack_wait(build.runner_manager_id_waiting_for_ack) + + Result.new(status: 200) + when 'running' + # Runner is ready to start execution and has accepted the job - transition job to running state + runner_manager = ::Ci::RunnerManager.find_by_id(build.runner_manager_id_waiting_for_ack) + + if runner_manager.nil? + return Result.new(status: 409) # Conflict: Job is not in pending state or not assigned to a runner + end + + # Transition job to running state and assign the runner manager + build.run! + build.runner_manager = runner_manager + + Result.new(status: 100) # Continue processing with update_build_state! + else + # Invalid state for two-phase commit workflow + Result.new(status: 400) + end + end + def discard_build_trace! 
metrics.increment_trace_operation(operation: :discarded) end end diff --git a/app/workers/all_queues.yml b/app/workers/all_queues.yml index 166fa1c4e162c4..d2eea0e23b0969 100644 --- a/app/workers/all_queues.yml +++ b/app/workers/all_queues.yml @@ -3518,6 +3518,16 @@ :idempotent: true :tags: [] :queue_namespace: +- :name: ci_retry_stuck_waiting_job + :worker_name: Ci::RetryStuckWaitingJobWorker + :feature_category: :continuous_integration + :has_external_dependencies: false + :urgency: :low + :resource_boundary: :unknown + :weight: 1 + :idempotent: true + :tags: [] + :queue_namespace: - :name: ci_runners_process_runner_version_update :worker_name: Ci::Runners::ProcessRunnerVersionUpdateWorker :feature_category: :fleet_visibility diff --git a/app/workers/ci/retry_stuck_waiting_job_worker.rb b/app/workers/ci/retry_stuck_waiting_job_worker.rb new file mode 100644 index 00000000000000..1c9e6abf7dc39d --- /dev/null +++ b/app/workers/ci/retry_stuck_waiting_job_worker.rb @@ -0,0 +1,29 @@ +# frozen_string_literal: true + +module Ci + class RetryStuckWaitingJobWorker + include ApplicationWorker + + # Do not execute (in fact, don't even enqueue) another instance of + # this Worker with the same args. + deduplicate :until_executed, including_scheduled: true + data_consistency :sticky + idempotent! + + feature_category :continuous_integration + + RETRY_TIMEOUT = 30 + + def perform(build_id) + build = Ci::Build.find_by_id(build_id) + + RetryWaitingJobService.new(build).execute.tap do |result| + if result.error? && result.payload[:reason] == :not_finished_waiting + # If job is still waiting for runner ack (meaning runner is taking longer than expected to provision, + # but still actively sending a heartbeat), then let's reschedule this job. + self.class.perform_in(RETRY_TIMEOUT, build_id) + end + end + end + end +end diff --git a/config/feature_flags/gitlab_com_derisk/allow_runner_job_acknowledgement.yml b/config/feature_flags/gitlab_com_derisk/allow_runner_job_acknowledgement.yml new file mode 100644 index 00000000000000..0027adddb98239 --- /dev/null +++ b/config/feature_flags/gitlab_com_derisk/allow_runner_job_acknowledgement.yml @@ -0,0 +1,10 @@ +--- +name: allow_runner_job_acknowledgement +description: Allow GitLab Runner to acknowledge acceptance of a CI job +feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/464048 +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/204265 +rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/568905 +milestone: '18.5' +group: group::runner +type: gitlab_com_derisk +default_enabled: false diff --git a/config/sidekiq_queues.yml b/config/sidekiq_queues.yml index a9ed99b46c1bbf..0adec81f615adf 100644 --- a/config/sidekiq_queues.yml +++ b/config/sidekiq_queues.yml @@ -231,6 +231,8 @@ - 1 - - ci_parse_secure_file_metadata - 1 +- - ci_retry_stuck_waiting_job + - 1 - - ci_runners_export_usage_csv - 1 - - ci_runners_process_runner_version_update diff --git a/doc/api/graphql/reference/_index.md b/doc/api/graphql/reference/_index.md index 52aa14e05f7757..e71dd9ed9528f4 100644 --- a/doc/api/graphql/reference/_index.md +++ b/doc/api/graphql/reference/_index.md @@ -47290,6 +47290,7 @@ Available input types. | `REACHED_DOWNSTREAM_PIPELINE_TRIGGER_RATE_LIMIT` | A job that failed due to reached downstream pipeline trigger rate limit. | | `REACHED_MAX_DESCENDANT_PIPELINES_DEPTH` | A job that failed due to reached max descendant pipelines depth. | | `REACHED_MAX_PIPELINE_HIERARCHY_SIZE` | A job that failed due to reached max pipeline hierarchy size.
| +| `RUNNER_PROVISIONING_TIMEOUT` | A job that failed due to runner provisioning timeout. | | `RUNNER_SYSTEM_FAILURE` | A job that failed due to runner system failure. | | `RUNNER_UNSUPPORTED` | A job that failed due to runner unsupported. | | `SCHEDULER_FAILURE` | A job that failed due to scheduler failure. | diff --git a/doc/api/openapi/openapi_v2.yaml b/doc/api/openapi/openapi_v2.yaml index 45020cb3726003..1463e59c9c9c7d 100644 --- a/doc/api/openapi/openapi_v2.yaml +++ b/doc/api/openapi/openapi_v2.yaml @@ -40731,6 +40731,8 @@ paths: description: Unknown parameters '403': description: Forbidden + '409': + description: Conflict tags: - jobs operationId: putApiV4JobsId @@ -67543,7 +67545,7 @@ definitions: description: Job token state: type: string - description: 'Job''s status: success, failed' + description: 'Job''s status: pending, running, success, failed' checksum: type: string description: Job's trace CRC32 checksum diff --git a/doc/development/cicd/_index.md b/doc/development/cicd/_index.md index e934c6c97399b2..2b91d6d3367d9d 100644 --- a/doc/development/cicd/_index.md +++ b/doc/development/cicd/_index.md @@ -60,7 +60,9 @@ Development guides that are specific to CI/CD are listed here: - [The CI configuration guide](configuration.md) - [The CI schema guide](schema.md) - If you are making a change to core CI/CD process such as linting or pipeline creation, refer to the - [CI/CD testing guide](testing.md) + [CI/CD testing guide](testing.md) +- If you are working with runner job execution workflows, see the [Two-phase commit for CI/CD jobs](two_phase_job_commit.md) + guide. See the [CI/CD YAML reference documentation guide](cicd_reference_documentation_guide.md) to learn how to update the [CI/CD YAML syntax reference page](../../ci/yaml/_index.md). @@ -138,11 +140,19 @@ connected to the GitLab instance. These can be instance runners, group runners, The communication between runners and the Rails server occurs through a set of API endpoints, grouped as the `Runner API Gateway`. -We can register, delete, and verify runners, which also causes read/write queries to the database. After a runner is connected, -it keeps asking for the next job to execute. This invokes the [`RegisterJobService`](https://gitlab.com/gitlab-org/gitlab/-/blob/master/app/services/ci/register_job_service.rb) -which picks the next job and assigns it to the runner. At this point the job transitions to a -`running` state, which again triggers `ProcessPipelineService` due to the status change. -For more details read [Job scheduling](#job-scheduling)). +We can register, delete, and verify runners, which also causes read/write queries to the database. After a runner is +connected, it keeps asking for the next job to execute. +This invokes the [`RegisterJobService`](https://gitlab.com/gitlab-org/gitlab/-/blob/master/app/services/ci/register_job_service.rb) +which picks the next job and assigns it to the runner. + +**Job state transitions depend on runner capabilities:** + +- **If the runner supports `two_phase_job_commit` feature**: The job remains in `pending` state while the runner prepares, + then transitions to `running` when the runner signals readiness through `PUT /jobs/:id`. +- **Otherwise**: The job transitions to a `running` state, which again triggers + `ProcessPipelineService` due to the status change. + +For more details, see [Job scheduling](#job-scheduling) and [Two-phase commit for CI/CD jobs](two_phase_job_commit.md). 
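+
+The sketch below condenses this branching as implemented in `Ci::RegisterJobService` (see
+`assign_job_to_waiting_state` and `assign_job_to_running_state` in this change). It is illustrative only and omits
+the feature-flag check, transaction handling, metrics, and instrumentation; the method name `assign_build` is a
+simplified stand-in for the real service methods:
+
+```ruby
+# Simplified sketch: hand a matched build to the runner manager that requested it.
+def assign_build(build, runner_manager, params)
+  if params.dig(:info, :features, :two_phase_job_commit)
+    # Two-phase commit: keep the job `pending`, remember the runner manager in Redis,
+    # and remove the row from `ci_pending_builds` so other runners cannot pick it up.
+    build.set_waiting_for_runner_ack(runner_manager.id)
+    Ci::UpdateBuildQueueService.new.remove!(build)
+  else
+    # Legacy runners: transition the job to `running` immediately.
+    build.run!
+    build.runner_manager = runner_manager
+  end
+end
+```
+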
While a job is being executed, the runner sends logs back to the server as well any possible artifacts that must be stored. Also, a job may depend on artifacts from previous jobs to run. In this @@ -178,6 +188,27 @@ A job with the `created` state isn't seen by the runner yet. To make it possible When the runner is connected, it requests the next `pending` job to run by polling the server continuously. +### Two-phase commit workflow + +Since 18.5, GitLab supports a two-phase commit workflow for job execution that improves timing accuracy and reliability. +This feature allows runners to: + +1. **Request and receive a job** while keeping it in `pending` state during preparation. +1. **Signal readiness** to transition the job to `running` state when actually ready to execute. + +For runners that support the two-phase commit feature (indicated by `two_phase_job_commit: true` in their capabilities), +the job assignment process works as follows: + +- **Phase 1**: Runner requests a job → job assigned to runner but remains `pending`, removed from `ci_pending_builds` + and tracked in Redis cache. +- **Phase 2**: Runner completes preparation → signals acceptance → job transitions to `running` state. + +For legacy runners, the traditional workflow continues unchanged: job assignment immediately transitions the job to +`running` state. + +This two-phase approach ensures that only actual execution time is counted, improving compute minute accuracy and +reducing issues with jobs getting stuck during runner preparation. + {{< alert type="note" >}} API endpoints used by the runner to interact with GitLab are defined in [`lib/api/ci/runner.rb`](https://gitlab.com/gitlab-org/gitlab/-/blob/master/lib/api/ci/runner.rb) @@ -206,9 +237,30 @@ This API endpoint runs [`Ci::RegisterJobService`](https://gitlab.com/gitlab-org/ 1. Assigns it to the runner 1. Presents it to the runner via the API response +#### Job status updates + +Runners communicate job status changes using the `PUT /api/v4/jobs/:id` endpoint. This endpoint supports different +workflows depending on runner capabilities: + +**For runners with two-phase commit support:** + +- `state=pending`: Keep-alive signal during preparation (returns `200 OK`). +- `state=running`: Runner ready to start execution (transitions job to running, returns `200 OK`). +- Other states: Regular job completion (`success`, `failed`, etc.) + +**Error responses:** + +- `409 Conflict`: Job not in expected state for the requested transition. +- `400 Bad Request`: Invalid parameters or state. + +**For legacy runners:** + +- Job status updates work as before, with immediate state transitions. + ### `Ci::RegisterJobService` -There are 3 top level queries that this service uses to gather the majority of the jobs and they are selected based on the level where the runner is registered to: +This service uses 3 top level queries to gather the majority of the jobs and they are selected based on the level +where the runner is registered to: - Select jobs for instance runner (instance-wide) - Uses a fair scheduling algorithm which prioritizes projects with fewer running builds @@ -230,6 +282,26 @@ At this point we loop through remaining `pending` jobs and we try to assign the As we increase the number of runners in the pool we also increase the chances of conflicts which would arise if assigning the same job to different runners. To prevent that we gracefully rescue conflict errors and assign the next job in the list. 
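+
+Conceptually, that retry-on-conflict loop looks like the sketch below. This is illustrative only: the real loop
+lives in `Ci::RegisterJobService`, and the helper name `assign_runner` and the exact exception class rescued
+(shown here as the standard optimistic-locking error, `ActiveRecord::StaleObjectError`) are simplifying
+assumptions rather than a verbatim copy of the code:
+
+```ruby
+# Simplified sketch: try each pending build until one is successfully assigned.
+pending_builds.each do |pending_build|
+  break if assign_runner(pending_build, params) # stop at the first successful assignment
+rescue ActiveRecord::StaleObjectError
+  # Another runner won the optimistic lock on this build; try the next one.
+  next
+end
+```
+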
+#### Job assignment behavior + +The service handles job assignment differently based on runner capabilities: + +**For runners supporting two-phase commit** (indicated by `two_phase_job_commit: true` in runner features): + +1. Job is assigned to the runner but remains in `pending` state. +1. Job is removed from `ci_pending_builds` table to prevent assignment to other runners. +1. Job association with runner manager is stored in Redis cache. +1. Runner can send keep-alive signals and eventually transition the job to `running`. + +**For legacy runners**: + +1. Job is assigned to the runner and immediately transitions to `running` state. +1. Job is moved from `ci_pending_builds` to `ci_running_builds`. +1. Traditional workflow continues unchanged. + +This dual approach ensures backward compatibility while enabling improved timing accuracy for runners that support the +two-phase job commit feature. + ### Dropping stuck builds There are two ways of marking builds as "stuck" and drop them. diff --git a/doc/development/cicd/two_phase_job_commit.md b/doc/development/cicd/two_phase_job_commit.md new file mode 100644 index 00000000000000..90363c164039d1 --- /dev/null +++ b/doc/development/cicd/two_phase_job_commit.md @@ -0,0 +1,169 @@ +--- +stage: Verify +group: Pipeline Execution +info: Any user with at least the Maintainer role can merge updates to this content. For details, see https://docs.gitlab.com/development/development_processes/#development-guidelines-review. +title: Two-phase commit for CI/CD jobs +--- + +The two-phase commit feature introduces a new workflow for GitLab Runner job execution that addresses timing accuracy and reliability issues in the current job assignment process. + +## Problem statement + +In the current workflow, when a GitLab Runner requests a job, the job is immediately transitioned from `pending` to `running` state, even though the runner may still be performing preparation tasks such as: + +- Finding or provisioning execution capacity +- Setting up the execution environment +- Downloading dependencies + +This immediate transition can lead to: + +1. **Misleading job timing**: Preparation time is counted as execution time +1. **Compute minute accuracy issues**: Preparation time may be counted toward compute minutes +1. **Job assignment reliability**: Jobs assigned to runners that go offline during preparation remain stuck until + timeout + +## Solution: Two-phase commit + +The two-phase commit feature introduces a new workflow where: + +1. **Phase 1 - Job Assignment**: Runner requests and receives a job, but the job remains in `pending` state +1. **Phase 2 - Job Acceptance**: Runner completes preparation and explicitly signals readiness to start execution + +### Workflow comparison + +#### Current workflow (legacy) + +1. Job created → pending state, added to `ci_pending_builds` +1. Runner requests job → job assigned to runner, transitioned to running, moved from `ci_pending_builds` to `ci_running_builds` +1. Runner executes job + +#### New workflow (two-phase commit) + +1. Job created → pending state, added to `ci_pending_builds` +1. Runner requests job → job assigned to runner, remains pending, removed from `ci_pending_builds` and added to Redis cache +1. Runner performs preparation tasks +1. Runner sends keep-alive signals (optional) and GitLab checks against Redis cache +1. Runner signals acceptance → job transitioned to running (entry created in `ci_running_builds`) +1. 
Runner executes job + +## Implementation details + +### Runner feature detection + +Runners that support two-phase commit must include the feature in their capabilities: + +```json +{ + "info": { + "features": { + "two_phase_job_commit": true + } + } +} +``` + +### API endpoints + +#### Job request (modified) + +- **Endpoint**: `POST /api/v4/jobs/request` +- **Behavior**: + - For runners with `two_phase_job_commit`: Job assigned but remains `pending` + - For legacy runners: Job assigned and transitioned to `running` (unchanged) + +#### Job status update (modified) + +- **Endpoint**: `PUT /api/v4/jobs/:id` +- **Parameters**: + - `token`: Job authentication token + - `state`: Job state - for two-phase commit workflow, supports `pending` (keep-alive) and `running` (ready to start) +- **Behavior**: + - `state=pending`: Keep-alive signal during preparation (returns `200 OK`) + - `state=running`: Runner ready to start execution (transitions job to running) + - Other states: Regular job completion (success, failed, etc.) +- **Responses**: + - `200 OK`: Status updated successfully + - `409 Conflict`: Job not in expected state for running transition + - `400 Bad Request`: Invalid parameters + +### Database changes + +No database schema changes are required. The implementation uses existing fields and state transitions. + +### State management + +#### Job states + +- **pending**: Job is waiting for runner or runner is preparing +- **running**: Job is actively being executed by runner +- **Other states**: Unchanged + +#### Queue management + +- Jobs are removed from `ci_pending_builds` when assigned to a runner (both workflows) +- For two-phase commit: Job remains `pending` until runner acceptance or a timeout +- For legacy: Job transitions to `running` immediately + +## Backward compatibility + +The feature is fully backward compatible: + +- Legacy runners continue to work with the existing workflow +- New runners can opt-in to two-phase commit by declaring the feature +- No changes required for existing GitLab installations + +### GitLab configuration + +No configuration changes are required. The feature is automatically available when runners declare support for it. + +## Monitoring and observability + +### Metrics + +- Existing job timing metrics remain unchanged +- New metrics may be added for preparation time tracking +- Runner heartbeat metrics continue to work + +### Logging + +- Job assignment events are logged with two-phase commit context +- Runner provisioning status updates are logged +- Existing job execution logs are unchanged + +## Testing + +The feature includes comprehensive test coverage: + +- Unit tests for service logic +- API endpoint tests +- Integration tests for full workflow +- Backward compatibility tests + +## Future enhancements + +While not implemented in the initial version, the two-phase commit foundation enables: + +1. **Job Declination**: Runners could decline jobs they cannot execute +1. **Timeout Handling**: Automatic job reassignment if runners don't accept within a timeout +1. **Smart Routing**: Router daemons could use the two-phase commit for more efficient job distribution +1. **Preparation Time Tracking**: Separate tracking of preparation vs. 
execution time + +## Security considerations + +- Job tokens remain valid during the preparation phase +- Runner authentication is required for all provisioning status updates +- No additional security risks are introduced + +## Performance impact + +- Minimal performance impact on existing workflows +- Slight reduction in database load due to fewer state transitions for two-phase commit jobs +- No impact on job execution performance + +## Migration path + +1. **Phase 1**: Deploy GitLab with two-phase commit support (backward compatible) +1. **Phase 2**: Update runners to support two-phase commit feature +1. **Phase 3**: Monitor and optimize based on usage patterns + +No forced migration is required - runners can adopt the feature at their own pace. diff --git a/lib/api/ci/runner.rb b/lib/api/ci/runner.rb index ce7c3f3bd8ee22..a64c00c979aa3e 100644 --- a/lib/api/ci/runner.rb +++ b/lib/api/ci/runner.rb @@ -230,12 +230,13 @@ class Runner < ::API::Base http_codes [[200, 'Job was updated'], [202, 'Update accepted'], [400, 'Unknown parameters'], - [403, 'Forbidden']] + [403, 'Forbidden'], + [409, 'Conflict']] end params do requires :token, type: String, desc: 'Job token' requires :id, type: Integer, desc: "Job's ID" - optional :state, type: String, desc: "Job's status: success, failed" + optional :state, type: String, desc: "Job's status: pending, running, success, failed" optional :checksum, type: String, desc: "Job's trace CRC32 checksum" optional :failure_reason, type: String, desc: "Job's failure_reason" optional :output, type: Hash, desc: 'Build log state' do @@ -249,8 +250,8 @@ class Runner < ::API::Base Gitlab::Metrics.add_event(:update_build) - service = ::Ci::UpdateBuildStateService - .new(job, declared_params(include_missing: false)) + # Handle job state updates through the service (including two-phase commit workflow) + service = ::Ci::UpdateBuildStateService.new(job, declared_params(include_missing: false)) service.execute.then do |result| track_ci_minutes_usage!(job) diff --git a/lib/gitlab/ci/build/auto_retry.rb b/lib/gitlab/ci/build/auto_retry.rb index 5420cb499f4477..6e3311ce34a879 100644 --- a/lib/gitlab/ci/build/auto_retry.rb +++ b/lib/gitlab/ci/build/auto_retry.rb @@ -4,7 +4,8 @@ class Gitlab::Ci::Build::AutoRetry include Gitlab::Utils::StrongMemoize DEFAULT_RETRIES = { - scheduler_failure: 2 + scheduler_failure: 2, + runner_provisioning_timeout: 2 }.freeze RETRY_OVERRIDES = { diff --git a/lib/gitlab/ci/queue/metrics.rb b/lib/gitlab/ci/queue/metrics.rb index 1762b06fa5c2df..92033074952014 100644 --- a/lib/gitlab/ci/queue/metrics.rb +++ b/lib/gitlab/ci/queue/metrics.rb @@ -33,6 +33,9 @@ class Metrics :runner_pre_assign_checks_failed, :runner_pre_assign_checks_success, :runner_queue_tick, + :runner_queue_timeout, + :runner_assigned_waiting, + :runner_assigned_run, :shared_runner_build_new, :shared_runner_build_done ].to_set.freeze diff --git a/lib/gitlab/ci/status/build/failed.rb b/lib/gitlab/ci/status/build/failed.rb index 55c989d78914df..a51e8445edcfc9 100644 --- a/lib/gitlab/ci/status/build/failed.rb +++ b/lib/gitlab/ci/status/build/failed.rb @@ -43,6 +43,7 @@ class Failed < Status::Extended deployment_rejected: 'deployment rejected', ip_restriction_failure: 'IP address restriction failure', failed_outdated_deployment_job: 'failed outdated deployment job', + runner_provisioning_timeout: 'runner provisioning timeout', reached_downstream_pipeline_trigger_rate_limit: 'Too many downstream pipelines triggered in the last minute. Try again later.' 
}.freeze # rubocop: enable Layout/LineLength diff --git a/spec/factories/ci/builds.rb b/spec/factories/ci/builds.rb index 3eb116234a48c8..514d2e71c5c120 100644 --- a/spec/factories/ci/builds.rb +++ b/spec/factories/ci/builds.rb @@ -65,6 +65,30 @@ end end + trait :waiting_for_runner_ack do + pending + + transient do + ack_runner_manager { nil } + end + + after(:create) do |build, evaluator| + # Use provided runner_manager or create one + runner_manager = + evaluator.ack_runner_manager || begin + # Ensure build has a runner + runner = build.runner || create(:ci_runner, :with_runner_manager) + build.update!(runner: runner) unless build.runner + + # Create a runner_manager for the runner, if needed + runner.runner_managers.first || create(:ci_runner_machine, runner: runner) + end + + # Set waiting for runner ack + build.set_waiting_for_runner_ack(runner_manager.id) + end + end + trait :with_build_name do after(:create) do |build, _| create(:ci_build_name, build: build) diff --git a/spec/factories/ci/runner_machines.rb b/spec/factories/ci/runner_machines.rb index 2ac737a327ab65..674593569f7b59 100644 --- a/spec/factories/ci/runner_machines.rb +++ b/spec/factories/ci/runner_machines.rb @@ -49,5 +49,9 @@ trait :cancel_gracefully_feature do runtime_features { { 'cancel_gracefully' => true } } end + + trait :two_phase_job_commit_feature do + runtime_features { { 'two_phase_job_commit' => true } } + end end end diff --git a/spec/lib/gitlab/ci/build/auto_retry_spec.rb b/spec/lib/gitlab/ci/build/auto_retry_spec.rb index b70c496fad9242..1edd49fb73e412 100644 --- a/spec/lib/gitlab/ci/build/auto_retry_spec.rb +++ b/spec/lib/gitlab/ci/build/auto_retry_spec.rb @@ -23,6 +23,8 @@ "not matching with always" | 0 | { when: %w[always], max: 2 } | :api_failure | nil | true "not matching reason" | 0 | { when: %w[script_error], max: 2 } | :api_failure | nil | false "scheduler failure override" | 1 | { when: %w[scheduler_failure], max: 1 } | :scheduler_failure | nil | false + "runner provisioning timeout override" | 1 | { when: %w[runner_provisioning_timeout], max: 1 } | + :runner_provisioning_timeout | nil | false # retry:exit_codes "matching exit code" | 0 | { exit_codes: [255, 137], max: 2 } | nil | 137 | true "matching exit code simple" | 0 | { exit_codes: [255], max: 2 } | nil | 255 | true @@ -37,6 +39,7 @@ "not matching EC & FR" | 0 | { exit_codes: [1], when: %w[script_failure], max: 2 } | :api_failure | 137 | false # other "default for scheduler failure" | 1 | {} | :scheduler_failure | nil | true + "default for runner provisioning timeout" | 1 | {} | :runner_provisioning_timeout | nil | true "quota is exceeded" | 0 | { max: 2 } | :ci_quota_exceeded | nil | false "no matching runner" | 0 | { max: 2 } | :no_matching_runner | nil | false "missing dependencies" | 0 | { max: 2 } | :missing_dependency_failure | nil | false diff --git a/spec/models/ci/build_two_phase_commit_spec.rb b/spec/models/ci/build_two_phase_commit_spec.rb new file mode 100644 index 00000000000000..3d0106bec2960b --- /dev/null +++ b/spec/models/ci/build_two_phase_commit_spec.rb @@ -0,0 +1,422 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Ci::Build, 'two_phase_job_commit runner feature support', :clean_gitlab_redis_cache, + feature_category: :continuous_integration do + let_it_be(:runner, freeze: true) { create(:ci_runner) } + let_it_be(:runner_manager, freeze: true) { create(:ci_runner_machine, runner: runner) } + let_it_be(:project, freeze: true) { create(:project, :repository) } + let_it_be(:pipeline, freeze: 
true) { create(:ci_pipeline, project: project) } + let_it_be(:redis_klass) { Gitlab::Redis::SharedState } + + let(:build) { create(:ci_build, pipeline: pipeline) } + + describe '#waiting_for_runner_ack?' do + subject { build.waiting_for_runner_ack? } + + context 'when build is not pending' do + let(:build) { create(:ci_build, :running, pipeline: pipeline, runner: runner) } + + it { is_expected.to be false } + end + + context 'when build is pending but has no runner' do + let(:build) { create(:ci_build, :pending, pipeline: pipeline) } + + it { is_expected.to be false } + end + + context 'when build is pending with runner but no runner manager waiting for ack' do + let(:build) { create(:ci_build, :pending, pipeline: pipeline, runner: runner) } + + it { is_expected.to be false } + end + + context 'when build is pending with runner and runner manager waiting for ack' do + let(:build) { create(:ci_build, :waiting_for_runner_ack, pipeline: pipeline, runner: runner) } + + it { is_expected.to be true } + end + + context 'when build is in different states' do + %i[created preparing manual scheduled success failed canceled skipped].each do |status| + context "when build is #{status}" do + let(:build) { create(:ci_build, status, pipeline: pipeline, runner: runner) } + + before do + build.set_waiting_for_runner_ack(runner_manager.id) + end + + it { is_expected.to be false } + end + end + end + + context 'when allow_runner_job_acknowledgement feature flag is disabled' do + before do + stub_feature_flags(allow_runner_job_acknowledgement: false) + end + + context 'when build is pending with runner and runner manager waiting for ack' do + let(:build) { create(:ci_build, :waiting_for_runner_ack, pipeline: pipeline, runner: runner) } + + it 'returns true because Redis entry exists (edge case fix)' do + is_expected.to be true + end + end + + context 'when build is pending with runner but no runner manager waiting for ack' do + let(:build) { create(:ci_build, :pending, pipeline: pipeline, runner: runner) } + + it { is_expected.to be false } + end + end + end + + describe '#set_waiting_for_runner_ack' do + let(:runner_manager_id) { 123 } + + it 'stores the runner manager ID in Redis with expiry' do + build.set_waiting_for_runner_ack(runner_manager_id) + + # Verify the value is stored + expect(build.runner_manager_id_waiting_for_ack).to eq(runner_manager_id) + + # Verify the key has an expiry time set + ttl = runner_build_ack_queue_key_ttl + expect(ttl).to be > 0 + expect(ttl).to be <= described_class::RUNNER_ACK_QUEUE_EXPIRY_TIME + end + + it 'uses the correct expiry time' do + expect(described_class::RUNNER_ACK_QUEUE_EXPIRY_TIME).to eq(2.minutes) + end + end + + describe '#cancel_wait_for_runner_ack' do + let(:runner_manager_id) { 123 } + + before do + build.set_waiting_for_runner_ack(runner_manager_id) + end + + it 'removes the runner manager ID from Redis' do + expect do + build.cancel_wait_for_runner_ack + end.to change { build.runner_manager_id_waiting_for_ack }.from(runner_manager_id).to(nil) + end + end + + describe '#runner_manager_id_waiting_for_ack' do + subject(:runner_manager_id_waiting_for_ack) { build.runner_manager_id_waiting_for_ack } + + context 'when no runner manager is waiting for ack' do + it { is_expected.to be_nil } + end + + context 'when a runner manager is waiting for ack' do + let(:runner_manager_id) { 456 } + + before do + build.set_waiting_for_runner_ack(runner_manager_id) + end + + it { is_expected.to eq(runner_manager_id) } + end + + context 'when the Redis key has expired' do + 
let(:runner_manager_id) { 789 } + + before do + build.set_waiting_for_runner_ack(runner_manager_id) + + # Simulate expiry by manually deleting the key + redis_klass.with { |redis| redis.del(runner_build_ack_queue_key) } + end + + it { is_expected.to be_nil } + end + + context 'when allow_runner_job_acknowledgement feature flag is disabled' do + before do + stub_feature_flags(allow_runner_job_acknowledgement: false) + end + + context 'when no runner manager is waiting for ack' do + it { is_expected.to be_nil } + end + + context 'when a runner manager is waiting for ack' do + let(:runner_manager_id) { 456 } + + before do + build.set_waiting_for_runner_ack(runner_manager_id) + end + + it 'returns the value regardless of feature flag state' do + # Verify Redis has the value + expect(redis_klass.with { |redis| redis.get(runner_build_ack_queue_key)&.to_i }) + .to eq(runner_manager_id) + + expect(runner_manager_id_waiting_for_ack).to eq(runner_manager_id) + end + end + end + end + + describe 'state transition from pending to running' do + context 'when build is waiting for runner ack' do + let(:build) { create(:ci_build, :waiting_for_runner_ack, pipeline: pipeline, runner: runner) } + + it 'resets waiting for runner ack on transition to running' do + expect(build).to receive(:cancel_wait_for_runner_ack).and_call_original + + expect { build.run! }.to change { build.runner_manager_id_waiting_for_ack }.to(nil) + end + end + + context 'when build is not waiting for runner ack' do + let(:build) { create(:ci_build, :pending, pipeline: pipeline, runner: runner) } + + it 'still calls reset_waiting_for_runner_ack' do + expect(build).to receive(:cancel_wait_for_runner_ack).and_call_original + + build.run! + end + end + end + + describe 'integration with Redis' do + let(:runner_manager_id) { 999 } + + it 'can set, get, and delete values from Redis' do + # Initially no value + expect(build.runner_manager_id_waiting_for_ack).to be_nil + + # Set a value + build.set_waiting_for_runner_ack(runner_manager_id) + expect(build.runner_manager_id_waiting_for_ack).to eq(runner_manager_id) + + # Reset the value + build.cancel_wait_for_runner_ack + expect(build.runner_manager_id_waiting_for_ack).to be_nil + end + + it 'handles string to integer conversion correctly' do + build.set_waiting_for_runner_ack(runner_manager_id) + + # Verify that the value is stored as string but returned as integer + redis_klass.with do |redis| + stored_value = redis.get(runner_build_ack_queue_key) + expect(stored_value).to eq(runner_manager_id.to_s) + end + + expect(build.runner_manager_id_waiting_for_ack).to eq(runner_manager_id) + end + + it 'handles nil and zero values correctly' do + expect(build.runner_manager_id_waiting_for_ack).to be_nil + + # Test with nil (should not set anything) + build.set_waiting_for_runner_ack(nil) + expect(build.runner_manager_id_waiting_for_ack).to be_nil + + # Test with zero + build.set_waiting_for_runner_ack(0) + expect(build.runner_manager_id_waiting_for_ack).to be_zero + end + + it 'does not overwrite existing values' do + build.set_waiting_for_runner_ack(100) + expect(build.runner_manager_id_waiting_for_ack).to eq(100) + + expect { build.set_waiting_for_runner_ack(200) } + .not_to change { build.runner_manager_id_waiting_for_ack }.from(100) + end + end + + describe '#supported_runner?' 
do + subject(:supported_runner) { build.supported_runner?(features) } + + context 'when runner supports two_phase_job_commit' do + let(:features) { { two_phase_job_commit: true } } + + it 'returns true for runners with two_phase_job_commit feature' do + is_expected.to be true + end + end + + context 'when runner does not support two_phase_job_commit' do + let(:features) { { other_feature: true } } + + it 'returns true for runners without two_phase_job_commit feature' do + # two_phase_job_commit is not a required feature, so builds should work + # with both old and new runners + is_expected.to be true + end + end + + context 'when features is nil' do + let(:features) { nil } + + it 'returns true for legacy runners' do + is_expected.to be true + end + end + + context 'when features is empty' do + let(:features) { {} } + + it 'returns true for runners with no features' do + is_expected.to be true + end + end + + context 'with specific runner feature requirements' do + # This test ensures that two_phase_job_commit doesn't interfere with + # existing runner feature requirements + let(:build) do + create(:ci_build, pipeline: pipeline, options: { + artifacts: { + reports: { + junit: 'test-results.xml' + } + } + }) + end + + context 'when runner supports both required features and two_phase_job_commit' do + let(:features) do + { + upload_multiple_artifacts: true, + two_phase_job_commit: true + } + end + + it 'returns true' do + is_expected.to be true + end + end + + context 'when runner supports two_phase_job_commit but not required features' do + let(:features) do + { + two_phase_job_commit: true + # missing upload_multiple_artifacts + } + end + + it 'returns false due to missing required feature' do + is_expected.to be false + end + end + end + end + + describe 'RUNNER_ACK_QUEUE_EXPIRY_TIME constant' do + it 'is set to 2 minutes' do + expect(described_class::RUNNER_ACK_QUEUE_EXPIRY_TIME).to eq(2.minutes) + end + end + + describe 'edge cases' do + context 'when Redis is unavailable' do + let(:runner_manager_id) { 123 } + + before do + allow(redis_klass).to receive(:with).and_raise(Redis::CannotConnectError) + end + + it 'raises error from set_waiting_for_runner_ack' do + expect { build.set_waiting_for_runner_ack(runner_manager_id) }.to raise_error(Redis::CannotConnectError) + end + + it 'raises error from runner_manager_id_waiting_for_ack' do + expect { build.cancel_wait_for_runner_ack }.to raise_error(Redis::CannotConnectError) + end + + it 'raises error from runner_manager_id_waiting_for_ack' do + expect { build.runner_manager_id_waiting_for_ack }.to raise_error(Redis::CannotConnectError) + end + end + end + + describe '#heartbeat_runner_ack_wait' do + let(:build) { create(:ci_build, :pending, pipeline: pipeline) } + + subject(:heartbeat_runner_ack_wait) { build.heartbeat_runner_ack_wait(runner_manager_id) } + + context 'when runner_manager_id is present' do + let(:runner_manager_id) { 123 } + + before do + redis_klass.with do |redis| + redis.set(runner_build_ack_queue_key, runner_manager_id, + ex: described_class::RUNNER_ACK_QUEUE_EXPIRY_TIME - 10, nx: true) + end + end + + it 'updates the Redis cache entry with new TTL' do + redis_klass.with do |redis| + expect(redis).to receive(:set) + .with(runner_build_ack_queue_key, runner_manager_id, + ex: described_class::RUNNER_ACK_QUEUE_EXPIRY_TIME, nx: false) + .and_call_original + end + + expect do + heartbeat_runner_ack_wait + end + .to change { runner_build_ack_queue_key_ttl }.by_at_least(10) + .and not_change { 
build.runner_manager_id_waiting_for_ack }.from(runner_manager_id) + end + + context 'when Redis operation fails' do + before do + redis_klass.with do |redis| + allow(redis).to receive(:set).and_raise(Redis::BaseError, 'Connection failed') + end + end + + it 'raises error' do + expect { heartbeat_runner_ack_wait }.to raise_error(Redis::BaseError) + end + end + end + + context 'when runner_manager_id is nil' do + let(:runner_manager_id) { nil } + + it 'does not update Redis cache' do + redis_klass.with do |redis| + expect(redis).not_to receive(:set) + end + + heartbeat_runner_ack_wait + end + end + + context 'when runner_manager_id is empty string' do + let(:runner_manager_id) { '' } + + it 'does not update Redis cache' do + redis_klass.with do |redis| + expect(redis).not_to receive(:set) + end + + heartbeat_runner_ack_wait + end + end + end + + private + + def runner_build_ack_queue_key + build.send(:runner_build_ack_queue_key) + end + + def runner_build_ack_queue_key_ttl + redis_klass.with { |redis| redis.ttl(runner_build_ack_queue_key) } + end +end diff --git a/spec/models/ci/runner_manager_spec.rb b/spec/models/ci/runner_manager_spec.rb index 0d19dc3034615d..0235ff0005853f 100644 --- a/spec/models/ci/runner_manager_spec.rb +++ b/spec/models/ci/runner_manager_spec.rb @@ -765,4 +765,102 @@ def does_db_update it { is_expected.to be true } end end + + describe 'two_phase_job_commit runtime feature' do + let_it_be(:runner, freeze: true) { create(:ci_runner) } + let_it_be_with_reload(:runner_manager) { create(:ci_runner_machine, runner: runner) } + + describe 'runtime_features validation' do + it 'accepts two_phase_job_commit as a valid runtime feature' do + runner_manager.runtime_features = { two_phase_job_commit: true } + + expect(runner_manager).to be_valid + end + + it 'accepts two_phase_job_commit set to false' do + runner_manager.runtime_features = { two_phase_job_commit: false } + + expect(runner_manager).to be_valid + end + + it 'accepts multiple runtime features including two_phase_job_commit' do + runner_manager.runtime_features = { + two_phase_job_commit: true, + cancel_gracefully: true, + other_feature: false + } + + expect(runner_manager).to be_valid + end + + it 'rejects non-boolean values for two_phase_job_commit' do + runner_manager.runtime_features = { two_phase_job_commit: 'yes' } + + expect(runner_manager).not_to be_valid + expect(runner_manager.errors[:runtime_features]).to be_present + end + end + + describe 'heartbeat with two_phase_job_commit feature' do + it 'updates runtime_features with two_phase_job_commit' do + values = { + version: '16.0.0', + runtime_features: { two_phase_job_commit: true } + } + + expect { runner_manager.heartbeat(values) } + .to change { runner_manager.runtime_features } + .to('two_phase_job_commit' => true) + end + + it 'preserves other runtime features when updating two_phase_job_commit' do + runner_manager.update!(runtime_features: { cancel_gracefully: true }) + + values = { + version: '16.0.0', + runtime_features: { + cancel_gracefully: true, + two_phase_job_commit: true + } + } + + runner_manager.heartbeat(values) + + expect(runner_manager.runtime_features).to eq( + 'cancel_gracefully' => true, + 'two_phase_job_commit' => true + ) + end + end + + describe 'querying runners with two_phase_job_commit support' do + let_it_be(:runner_with_feature) do + create(:ci_runner_machine, :two_phase_job_commit_feature, runner: runner) + end + + let_it_be(:runner_without_feature) do + create(:ci_runner_machine, runner: create(:ci_runner), 
runtime_features: { other_feature: true }) + end + + let_it_be(:legacy_runner) do + create(:ci_runner_machine, runner: create(:ci_runner), runtime_features: {}) + end + + it 'can find runners with two_phase_job_commit support' do + runners_with_feature = described_class.where( + "runtime_features ->> 'two_phase_job_commit' = 'true'" + ) + + expect(runners_with_feature).to contain_exactly(runner_with_feature) + end + + it 'can find runners without two_phase_job_commit support' do + runners_without_feature = described_class.where( + "runtime_features ->> 'two_phase_job_commit' IS NULL OR runtime_features ->> 'two_phase_job_commit' = 'false'" + ) + + expect(runners_without_feature).to contain_exactly(runner_manager, runner_without_feature, legacy_runner) + end + end + end end diff --git a/spec/models/commit_status_spec.rb b/spec/models/commit_status_spec.rb index 7f964a4288d6b2..d466146e4c96d9 100644 --- a/spec/models/commit_status_spec.rb +++ b/spec/models/commit_status_spec.rb @@ -964,6 +964,7 @@ def create_status(**opts) :api_failure | true :stuck_or_timeout_failure | true :runner_system_failure | true + :runner_provisioning_timeout | true end with_them do diff --git a/spec/requests/api/ci/runner_job_confirmation_integration_spec.rb b/spec/requests/api/ci/runner_job_confirmation_integration_spec.rb new file mode 100644 index 00000000000000..c4fd5dfce56405 --- /dev/null +++ b/spec/requests/api/ci/runner_job_confirmation_integration_spec.rb @@ -0,0 +1,324 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe 'Job confirmation integration', :freeze_time, :clean_gitlab_redis_cache, feature_category: :continuous_integration do + let_it_be(:project, freeze: true) { create(:project) } + let_it_be(:pipeline, freeze: true) { create(:ci_pipeline, project: project) } + let_it_be(:runner, freeze: true) { create(:ci_runner, :project, projects: [project]) } + let_it_be(:runner_manager, freeze: true) { create(:ci_runner_machine, runner: runner, system_xid: 'abc') } + + let!(:build) { create(:ci_build, :pending, :queued, pipeline: pipeline, created_at: 10.seconds.ago) } + + describe 'Full job confirmation workflow' do + shared_examples 'the legacy workflow (direct transition to running)' do + it 'follows the legacy workflow (direct transition to running)' do + travel 2.seconds + + # Step 1: Runner requests a job + post '/api/v4/jobs/request', params: runner_params + + expect(response).to have_gitlab_http_status(:created) + job_response = Gitlab::Json.parse(response.body) + job_id = job_response['id'] + job_token = job_response['token'] + + # Verify job transitioned directly to running (feature flag disabled) + job = Ci::Build.find(job_id) + expect(job).to be_running + expect(job).not_to be_waiting_for_runner_ack + expect(job.runner_id).to eq(runner.id) + expect(job.runner_manager).to eq(runner_manager) + expect(job.started_at).to be_present + expect(job.started_at - job.queued_at).to eq 2.seconds + + # Verify job is removed from pending builds queue + expect(Ci::PendingBuild.where(build: job)).to be_empty + + # Verify running build tracking entry was created + running_build = Ci::RunningBuild.find_by(build: job) + expect(running_build).to be_present + + # Step 2: Runner can update job status normally + travel 10.seconds + + put "/api/v4/jobs/#{job_id}", params: { + token: job_token, + state: 'success' + } + + expect(response).to have_gitlab_http_status(:ok) + + job.reload + expect(job).to be_success + expect(job.started_at - job.queued_at).to eq 2.seconds + 
expect(job.finished_at).to be_present + expect(job.duration).to eq 10.seconds + end + + it 'rejects two-phase commit workflow attempts' do + # Step 1: Runner requests a job + post '/api/v4/jobs/request', params: runner_params + + expect(response).to have_gitlab_http_status(:created) + job_response = Gitlab::Json.parse(response.body) + job_id = job_response['id'] + job_token = job_response['token'] + + # Job should be running already (feature flag disabled) + job = Ci::Build.find(job_id) + expect(job).to be_running + + # Step 2: Attempt to send keep-alive signals (should be rejected) + put "/api/v4/jobs/#{job_id}", params: { + token: job_token, + state: 'pending' + } + + # Should return 400 because job is already running + expect(response).to have_gitlab_http_status(:bad_request) + end + end + + context 'with runner supporting two_phase_job_commit' do + let(:runner_params) do + { + token: runner.token, + system_id: runner_manager.system_xid, + info: { + features: { + two_phase_job_commit: true + } + } + } + end + + it 'follows the complete job confirmation workflow' do + # Step 1: Runner requests a job + post '/api/v4/jobs/request', params: runner_params + + expect(response).to have_gitlab_http_status(:created) + job_response = Gitlab::Json.parse(response.body) + job_id = job_response['id'] + job_token = job_response['token'] + + # Verify job is assigned to runner but still pending + job = Ci::Build.find(job_id) + expect(job).to be_pending + expect(job).to be_waiting_for_runner_ack + expect(job.runner_id).to eq(runner.id) + expect(job.runner_manager).to be_nil + expect(job.runner_manager_id_waiting_for_ack).to eq(runner_manager.id) + expect(job.queued_at - job.created_at).to eq 10.seconds + + # Verify job is removed from pending builds queue + expect(Ci::PendingBuild.where(build: job)).to be_empty + + # Step 2: Runner sends keep-alive signals using PUT /jobs/:id with state=pending + 3.times do + travel 2.seconds + + put "/api/v4/jobs/#{job_id}", params: { + token: job_token, + state: 'pending' + } + + expect(response).to have_gitlab_http_status(:ok) + expect(response.body).to eq('200') + + # Job should still be pending + job.reload + expect(job).to be_pending + expect(job.runner_manager_id_waiting_for_ack).to eq(runner_manager.id) + end + + # Step 3: Runner accepts the job using PUT /jobs/:id with state=running + put "/api/v4/jobs/#{job_id}", params: { + token: job_token, + state: 'running' + } + + expect(response).to have_gitlab_http_status(:ok) + expect(response.body).to eq('200') + + # Verify job transitioned to running + job.reload + expect(job).to be_running + expect(job).not_to be_waiting_for_runner_ack + expect(job.started_at).to be_present + expect(job.started_at - job.queued_at).to eq 6.seconds + expect(job.runner_manager_id_waiting_for_ack).to be_nil + + # Verify running build tracking entry was created + running_build = Ci::RunningBuild.find_by(build: job) + expect(running_build).to be_present + expect(running_build.runner_id).to eq(runner.id) + + # Step 4: Runner can now update job status normally using PUT /jobs/:id + travel 5.minutes + + put "/api/v4/jobs/#{job_id}", params: { + token: job_token, + state: 'success' + } + + expect(response).to have_gitlab_http_status(:ok) + + job.reload + expect(job).to be_success + expect(job.finished_at).to be_present + expect(job.duration).to eq 5.minutes + end + + it 'prevents other runners from picking up assigned job' do + # Step 1: First runner requests a job + post '/api/v4/jobs/request', params: runner_params + + expect(response).to 
have_gitlab_http_status(:created) + job_response = Gitlab::Json.parse(response.body) + job_id = job_response['id'] + + # Verify job is assigned and removed from queue + job = Ci::Build.find(job_id) + expect(job).to be_pending + expect(job.runner_id).to eq(runner.id) + expect(Ci::PendingBuild.where(build: job)).to be_empty + + # Step 2: Second runner tries to request a job + other_runner = create(:ci_runner, :project, projects: [project]) + post '/api/v4/jobs/request', params: { + token: other_runner.token, + info: { features: { two_phase_job_commit: true } } + } + + # Should not get the already assigned job + expect(response).to have_gitlab_http_status(:no_content) + end + + context 'when allow_runner_job_acknowledgement feature flag is disabled' do + before do + stub_feature_flags(allow_runner_job_acknowledgement: false) + end + + it_behaves_like 'the legacy workflow (direct transition to running)' + end + end + + context 'with feature flag toggle during two-phase commit' do + context 'when feature flag is disabled after job pickup' do + it 'completes acknowledgment flow despite FF being disabled' do + # Step 1: Runner picks up job with two-phase commit + post api('/jobs/request'), params: { + token: runner.token, + info: { features: { two_phase_job_commit: true } } + } + + expect(response).to have_gitlab_http_status(:created) + expect(json_response['id']).to eq(build.id) + + # Verify job is in waiting state + build.reload + expect(build).to be_pending + expect(build).to be_waiting_for_runner_ack + + # Step 2: Disable feature flag mid-flight + stub_feature_flags(allow_runner_job_acknowledgement: false) + + # Step 3: Runner sends acknowledgment + put api("/jobs/#{build.id}"), params: { + token: build.token, + state: 'running' + } + + # Should still process the acknowledgment + expect(response).to have_gitlab_http_status(:ok) + expect(build.reload).to be_running + + # Verify Redis state is cleaned up + ::Gitlab::Redis::SharedState.with do |redis| + expect(redis.exists(runner_build_ack_queue_key)).to be_zero + end + end + + context 'with multiple runners' do + let(:runner2) { create(:ci_runner, :project, projects: [project]) } + + it 'prevents race conditions when FF toggled' do + # Runner 1 picks up job with FF enabled + post api('/jobs/request'), params: { + token: runner.token, + info: { features: { two_phase_job_commit: true } } + } + expect(response).to have_gitlab_http_status(:created) + + # Disable FF + stub_feature_flags(allow_runner_job_acknowledgement: false) + + # Runner 2 should not see the job + post api('/jobs/request'), params: { + token: runner2.token + } + expect(response).to have_gitlab_http_status(:no_content) + + # Runner 1 acknowledges + put api("/jobs/#{build.id}"), params: { + token: build.token, + state: 'running' + } + expect(response).to have_gitlab_http_status(:ok) + + # Job should be running with runner 1 + expect(build.reload.runner).to eq(runner) + end + end + end + + context 'when feature flag is re-enabled during acknowledgment' do + it 'maintains consistent behavior' do + # Start with FF disabled + stub_feature_flags(allow_runner_job_acknowledgement: false) + + # Pick up job normally (no two-phase commit) + post api('/jobs/request'), params: { + token: runner.token + } + expect(response).to have_gitlab_http_status(:created) + expect(build.reload).to be_running + + # Enable FF + stub_feature_flags(allow_runner_job_acknowledgement: true) + + # Status update should work normally + put api("/jobs/#{build.id}"), params: { + token: build.token, + state: 'running', 
+ checksum: 'abc123' + } + expect(response).to have_gitlab_http_status(:ok) + end + end + + private + + def runner_build_ack_queue_key + build.send(:runner_build_ack_queue_key) + end + end + + context 'with legacy runner (no two_phase_job_commit support)' do + let(:runner_params) do + { + token: runner.token, + system_id: runner_manager.system_xid, + info: { + features: { + other_feature: true + } + } + } + end + + it_behaves_like 'the legacy workflow (direct transition to running)' + end + end +end diff --git a/spec/requests/api/ci/runner_job_confirmation_spec.rb b/spec/requests/api/ci/runner_job_confirmation_spec.rb new file mode 100644 index 00000000000000..b9c1f987326816 --- /dev/null +++ b/spec/requests/api/ci/runner_job_confirmation_spec.rb @@ -0,0 +1,304 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe API::Ci::Runner, 'PUT /jobs/:id with job confirmation', feature_category: :continuous_integration do + let_it_be(:project, freeze: true) { create(:project, :repository) } + let_it_be(:pipeline, freeze: true) { create(:ci_pipeline, project: project) } + let_it_be(:runner, freeze: true) { create(:ci_runner, :project, projects: [project]) } + + let(:job_id) { job.id } + let(:job_token) { job.token } + + describe 'PUT /api/v4/jobs/:id', :clean_gitlab_redis_cache do + let(:api_url) { "/api/v4/jobs/#{job_id}" } + let(:params) { { token: job_token, state: state } } + + subject(:perform_request) do + put api_url, params: params + end + + context 'when job is in pending state and assigned to runner' do + context 'without two-phase commit' do + let_it_be_with_refind(:runner_manager) { create(:ci_runner_machine, runner: runner, system_xid: 'abc') } + + let!(:job) do + create(:ci_build, :pending, :queued, pipeline: pipeline, runner: runner, runner_manager: runner_manager) + end + + context 'when state is pending' do + let(:state) { 'pending' } + + it 'returns 403 Forbidden' do + perform_request + + expect(response).to have_gitlab_http_status(:forbidden) + expect(json_response['message']).to eq('403 Forbidden - Job is not processing on runner') + end + + it 'does not change job status' do + expect { perform_request } + .not_to change { job.reload.status } + + expect(job).to be_pending + end + + it 'does not update runner heartbeat', :clean_gitlab_redis_cache do + expect { perform_request } + .to not_change { runner.reload.contacted_at } + .and not_change { runner_manager.reload.contacted_at } + end + end + end + + context 'with two-phase commit', :aggregate_failures do + let_it_be_with_refind(:runner_manager) do + create(:ci_runner_machine, :two_phase_job_commit_feature, runner: runner, system_xid: 'def') + end + + let!(:job) { create(:ci_build, :waiting_for_runner_ack, pipeline: pipeline, runner: runner) } + + context 'when state is pending' do + let(:state) { 'pending' } + + it 'returns 200 OK' do + perform_request + + expect(response).to have_gitlab_http_status(:ok) + expect(response.body).to eq('200') + end + + it 'does not change job status' do + expect { perform_request } + .not_to change { job.reload.status } + + expect(job).to be_pending + end + + it 'updates runner heartbeat only' do + expect { perform_request } + .to change { runner.reload.contacted_at } + .and not_change { runner_manager.reload.contacted_at } + end + end + + context 'when state is running' do + let(:state) { 'running' } + + it 'transitions job to running state' do + expect(job.runner_id).to eq(runner.id) + + expect { perform_request } + .to change { job.reload.status 
}.from('pending').to('running') + .and change { job.reload.started_at } + .and change { job.reload.runner_manager }.from(nil).to(runner_manager) + + expect(response).to have_gitlab_http_status(:ok) + expect(response.body).to eq('200') + end + + it 'creates running build tracking entry' do + expect { perform_request }.to change { Ci::RunningBuild.count }.by(1) + + running_build = Ci::RunningBuild.last + expect(running_build.build_id).to eq(job.id) + expect(running_build.runner_id).to eq(runner.id) + end + + it 'sets started_at timestamp', :freeze_time do + perform_request + + expect(job.reload.started_at).to eq(Time.current) + end + + context 'when job has moved to running state' do + before do + job.run! + job.runner_manager = runner_manager + end + + it 'returns 200 OK' do + perform_request + + expect(response).to have_gitlab_http_status(:ok) + end + end + + context 'when job is not assigned to a runner' do + let!(:job) { create(:ci_build, :pending, pipeline: pipeline, runner: nil) } + + it 'returns 403 Forbidden' do + perform_request + + expect(response).to have_gitlab_http_status(:forbidden) + expect(json_response['message']).to eq('403 Forbidden - Job is not processing on runner') + end + end + end + + context 'when state is regular job completion state' do + let(:state) { 'success' } + + it 'returns 400 Bad Request' do + perform_request + + expect(response).to have_gitlab_http_status(:bad_request) + expect(job.reload).to be_pending + end + end + end + + context 'when allow_runner_job_acknowledgement feature flag is disabled' do + before do + stub_feature_flags(allow_runner_job_acknowledgement: false) + end + + # Even though the FF is disabled, we should still process waiting jobs regardless, + # so that they don't become zombies + context 'with two-phase commit setup', :aggregate_failures do + let_it_be_with_refind(:runner_manager) do + create(:ci_runner_machine, :two_phase_job_commit_feature, runner: runner, system_xid: 'def') + end + + let!(:job) { create(:ci_build, :waiting_for_runner_ack, pipeline: pipeline, runner: runner) } + + context 'when state is pending' do + let(:state) { 'pending' } + + it 'returns 200 OK' do + perform_request + + expect(response).to have_gitlab_http_status(:ok) + expect(response.body).to eq('200') + end + + it 'does not change job status' do + expect { perform_request }.not_to change { job.reload.status } + + expect(job).to be_pending + end + + it 'updates runner heartbeat only' do + expect { perform_request } + .to change { runner.reload.contacted_at } + .and not_change { runner_manager.reload.contacted_at } + end + end + + context 'when state is running' do + let(:state) { 'running' } + + it 'transitions job to running state' do + expect(job.runner_id).to eq(runner.id) + + expect { perform_request } + .to change { job.reload.status }.from('pending').to('running') + .and change { job.reload.started_at } + .and change { job.reload.runner_manager }.from(nil).to(runner_manager) + + expect(response).to have_gitlab_http_status(:ok) + expect(response.body).to eq('200') + end + + it 'creates running build tracking entry' do + expect { perform_request }.to change { Ci::RunningBuild.count }.by(1) + + running_build = Ci::RunningBuild.last + expect(running_build.build_id).to eq(job.id) + expect(running_build.runner_id).to eq(runner.id) + end + + it 'sets started_at timestamp', :freeze_time do + perform_request + + expect(job.reload.started_at).to eq(Time.current) + end + + context 'when job has moved to running state' do + before do + job.run! 
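+              # Simulates a job that was already accepted; a repeated state=running acknowledgement should still return 200 OK.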
+ job.runner_manager = runner_manager + end + + it 'returns 200 OK' do + perform_request + + expect(response).to have_gitlab_http_status(:ok) + end + end + + context 'when job is not assigned to a runner' do + let!(:job) { create(:ci_build, :pending, pipeline: pipeline, runner: nil) } + + it 'returns 403 Forbidden' do + perform_request + + expect(response).to have_gitlab_http_status(:forbidden) + expect(json_response['message']).to eq('403 Forbidden - Job is not processing on runner') + end + end + end + end + + context 'without two-phase commit' do + let_it_be_with_refind(:runner_manager) { create(:ci_runner_machine, runner: runner, system_xid: 'abc') } + + let!(:job) do + create(:ci_build, :pending, :queued, pipeline: pipeline, runner: runner, runner_manager: runner_manager) + end + + context 'when state is pending' do + let(:state) { 'pending' } + + it 'returns 403 Forbidden' do + perform_request + + expect(response).to have_gitlab_http_status(:forbidden) + expect(json_response['message']).to eq('403 Forbidden - Job is not processing on runner') + end + end + end + end + end + + context 'when token is invalid' do + let(:params) { { token: '*************', state: 'pending' } } + let!(:job) do + create(:ci_build, :pending, pipeline: pipeline, runner: runner, runner_manager: nil) + end + + it 'returns 403 Forbidden' do + perform_request + + expect(response).to have_gitlab_http_status(:forbidden) + end + end + + context 'when job does not exist' do + let(:job_id) { non_existing_record_id } + let(:state) { 'pending' } + let!(:job) do + create(:ci_build, :pending, pipeline: pipeline, runner: runner, runner_manager: nil) + end + + it 'returns 403 Forbidden' do + perform_request + + expect(response).to have_gitlab_http_status(:forbidden) + end + end + + context 'when token is missing' do + let(:params) { { state: 'pending' } } + let!(:job) do + create(:ci_build, :pending, pipeline: pipeline, runner: runner, runner_manager: nil) + end + + it 'returns 400 Bad Request' do + perform_request + + expect(response).to have_gitlab_http_status(:bad_request) + end + end + end +end diff --git a/spec/services/ci/cancel_pipeline_service_spec.rb b/spec/services/ci/cancel_pipeline_service_spec.rb index 64378cc1d8b723..ad5d7792beea53 100644 --- a/spec/services/ci/cancel_pipeline_service_spec.rb +++ b/spec/services/ci/cancel_pipeline_service_spec.rb @@ -229,7 +229,7 @@ described_class.new(pipeline: pipeline2, current_user: current_user).force_execute end - extra_update_queries = 5 # transition ... => :canceled, queue pop + extra_update_queries = 7 # transition ... 
=> :canceled, queue pop extra_generic_commit_status_validation_queries = 2 # name_uniqueness_across_types expect(control2.count) diff --git a/spec/services/ci/register_job_service_two_phase_commit_spec.rb b/spec/services/ci/register_job_service_two_phase_commit_spec.rb new file mode 100644 index 00000000000000..a61ed5567e34e3 --- /dev/null +++ b/spec/services/ci/register_job_service_two_phase_commit_spec.rb @@ -0,0 +1,423 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Ci::RegisterJobService, 'two-phase commit feature', feature_category: :continuous_integration do + let_it_be(:project, freeze: true) { create(:project, :repository) } + let_it_be(:pipeline, freeze: true) { create(:ci_pipeline, project: project) } + let_it_be(:runner, freeze: true) { create(:ci_runner, :project, projects: [project]) } + let_it_be(:runner_manager, freeze: true) { create(:ci_runner_machine, runner: runner) } + + let(:service) { described_class.new(runner, runner_manager) } + + describe '#execute', :aggregate_failures do + let!(:build) { create(:ci_build, :pending, :queued, pipeline: pipeline) } + + subject(:execute) { service.execute(runner_params) } + + shared_examples 'the legacy workflow (direct transition to running)' do + it 'transitions job directly to running state (legacy behavior)' do + expect(build).to be_pending + expect(build.runner_id).to be_nil + + result = execute + + expect(result).to be_valid + expect(result.build).to eq(build) + + build.reload + expect(build).to be_running + expect(build.runner_id).to eq(runner.id) + expect(build.runner_manager).to eq(runner_manager) + end + + it 'removes job from pending builds queue' do + expect { execute }.to change { Ci::PendingBuild.where(build: build).count }.from(1).to(0) + end + + it 'creates running build tracking entry' do + expect { execute }.to change { Ci::RunningBuild.count }.by(1) + end + + it 'does not set waiting for runner ack' do + expect(execute).to be_valid + expect(build.reload).not_to be_waiting_for_runner_ack + end + + context 'when logger is enabled' do + before do + stub_const('Ci::RegisterJobService::Logger::MAX_DURATION', 0) + end + + it 'logs the instrumentation' do + expect(Gitlab::AppJsonLogger).to receive(:info).once.with( + hash_including( + class: 'Ci::RegisterJobService::Logger', + message: 'RegisterJobService exceeded maximum duration', + runner_id: runner.id, + runner_type: runner.runner_type, + assign_runner_run_duration_s: { count: 1, max: anything, sum: anything } + ) + ) + + execute + end + end + end + + context 'when runner supports two_phase_job_commit feature' do + let(:runner_params) do + { + info: { + features: { + two_phase_job_commit: true + } + } + } + end + + it 'assigns runner but keeps job in pending state' do + expect(build).to be_pending + expect(build.runner_id).to be_nil + + expect(::Ci::RetryStuckWaitingJobWorker).to receive(:perform_in) + .with(Ci::Build::RUNNER_ACK_QUEUE_EXPIRY_TIME, build.id) + + result = execute + + expect(result).to be_valid + expect(result.build).to eq(build) + + build.reload + expect(build).to be_pending + expect(build.runner_id).to eq(runner.id) + expect(build.runner_manager).to be_nil + end + + it 'removes job from pending builds queue' do + expect { execute }.to change { Ci::PendingBuild.where(build: build).count }.from(1).to(0) + end + + it 'does not create running build tracking entry' do + expect { execute }.not_to change { Ci::RunningBuild.count } + end + + context 'when logger is enabled' do + before do + 
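+          # Zeroing MAX_DURATION makes every call exceed the logging threshold, so the instrumentation payload asserted below is always emitted.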
stub_const('Ci::RegisterJobService::Logger::MAX_DURATION', 0) + end + + it 'logs the instrumentation' do + expect(Gitlab::AppJsonLogger).to receive(:info).once.with( + hash_including( + class: 'Ci::RegisterJobService::Logger', + message: 'RegisterJobService exceeded maximum duration', + runner_id: runner.id, + runner_type: runner.runner_type, + total_duration_s: anything, + process_queue_duration_s: anything, + retrieve_queue_duration_s: anything, + process_build_duration_s: { count: 1, max: anything, sum: anything }, + process_build_runner_matched_duration_s: { count: 1, max: anything, sum: anything }, + process_build_present_build_duration_s: { count: 1, max: anything, sum: anything }, + present_build_logs_duration_s: { count: 1, max: anything, sum: anything }, + present_build_response_json_duration_s: { count: 1, max: anything, sum: anything }, + process_build_assign_runner_duration_s: { count: 1, max: anything, sum: anything }, + assign_runner_waiting_duration_s: { count: 1, max: anything, sum: anything } + ) + ) + + execute + end + end + + context 'when allow_runner_job_acknowledgement feature flag is disabled' do + before do + stub_feature_flags(allow_runner_job_acknowledgement: false) + end + + it_behaves_like 'the legacy workflow (direct transition to running)' # despite two_phase_job_commit support + end + + context 'when operations fail during two-phase commit assignment' do + let(:redis_klass) { Gitlab::Redis::SharedState } + + context 'when Redis fails' do + before do + redis_klass.with do |redis| + allow(redis).to receive(:set).and_call_original + allow(redis).to receive(:set) + .with(runner_build_ack_queue_key, runner_manager.id, anything) + .and_raise(Redis::CannotConnectError) + end + end + + it 'rolls back runner assignment and Redis state' do + expect(Ci::RetryStuckWaitingJobWorker).not_to receive(:perform_in) + allow_next_instance_of(Gitlab::Ci::Queue::Metrics) do |metrics| + allow(metrics).to receive(:increment_queue_operation) + expect(metrics).not_to receive(:increment_queue_operation).with(:runner_assigned_waiting) + end + + expect { execute } + .to not_change { build.reload.queuing_entry }.from(an_instance_of(Ci::PendingBuild)) + .and not_change { build.reload.runner_id }.from(nil) + .and not_change { build.reload.status }.from('pending') + end + end + + context 'when build save fails' do + before do + allow_next_found_instance_of(Ci::Build) do |build| + allow(build).to receive(:save!).and_raise(ActiveRecord::RecordInvalid) + end + end + + it 'rolls back runner assignment and Redis state' do + expect(Ci::RetryStuckWaitingJobWorker).not_to receive(:perform_in) + allow_next_instance_of(Gitlab::Ci::Queue::Metrics) do |metrics| + allow(metrics).to receive(:increment_queue_operation) + expect(metrics).not_to receive(:increment_queue_operation).with(:runner_assigned_waiting) + end + + expect { execute } + .to not_change { build.reload.queuing_entry }.from(an_instance_of(Ci::PendingBuild)) + .and not_change { build.reload.runner_id }.from(nil) + .and not_change { build.reload.status }.from('pending') + .and not_change { build.reload.waiting_for_runner_ack? 
}.from(false) + .and not_change { redis_ack_pending_key_count }.from(0) + end + end + + context 'when queue removal fails' do + it 'rolls back runner assignment and Redis state' do + expect(Ci::RetryStuckWaitingJobWorker).not_to receive(:perform_in) + allow_next_instance_of(Gitlab::Ci::Queue::Metrics) do |metrics| + allow(metrics).to receive(:increment_queue_operation) + expect(metrics).not_to receive(:increment_queue_operation).with(:runner_assigned_waiting) + end + allow_next_instance_of(Ci::UpdateBuildQueueService) do |service| + expect(service).to receive(:remove!).with(build).and_raise(ActiveRecord::StatementInvalid) + end + + expect { execute } + .to not_change { build.reload.queuing_entry }.from(an_instance_of(Ci::PendingBuild)) + .and not_change { build.reload.runner_id }.from(nil) + .and not_change { build.reload.status }.from('pending') + .and not_change { build.reload.waiting_for_runner_ack? }.from(false) + .and not_change { redis_ack_pending_key_count }.from(0) + end + end + end + + private + + def runner_build_ack_queue_key + build.send(:runner_build_ack_queue_key) + end + + def redis_ack_pending_key_count + redis_klass.with do |redis| + redis.exists(runner_build_ack_queue_key) + end + end + end + + context 'when runner does not support two_phase_job_commit feature' do + let(:runner_params) do + { + info: { + features: { + other_feature: true + } + } + } + end + + it_behaves_like 'the legacy workflow (direct transition to running)' + end + + context 'when runner has no features specified' do + let(:runner_params) { { info: {} } } + + it_behaves_like 'the legacy workflow (direct transition to running)' + end + + context 'when two_phase_job_commit feature is explicitly disabled' do + let(:runner_params) do + { + info: { + features: { + two_phase_job_commit: false + } + } + } + end + + it_behaves_like 'the legacy workflow (direct transition to running)' + + context 'when allow_runner_job_acknowledgement feature flag is disabled' do + before do + stub_feature_flags(allow_runner_job_acknowledgement: false) + end + + it_behaves_like 'the legacy workflow (direct transition to running)' + end + end + end + + describe '#runner_supports_job_acknowledgment?' 
do + let(:build) { create(:ci_build, :pending, :queued, pipeline: pipeline) } + + subject { service.send(:runner_supports_job_acknowledgment?, build, params) } + + context 'when two_phase_job_commit feature is true' do + let(:params) { { info: { features: { two_phase_job_commit: true } } } } + + it { is_expected.to be true } + + context 'when allow_runner_job_acknowledgement feature flag is disabled' do + before do + stub_feature_flags(allow_runner_job_acknowledgement: false) + end + + it { is_expected.to be_falsey } + end + end + + context 'when two_phase_job_commit feature is false' do + let(:params) { { info: { features: { two_phase_job_commit: false } } } } + + it { is_expected.to be false } + end + + context 'when two_phase_job_commit feature is not specified' do + let(:params) { { info: { features: {} } } } + + it { is_expected.to be false } + end + + context 'when features are not specified' do + let(:params) { { info: {} } } + + it { is_expected.to be false } + end + + context 'when info is not specified' do + let(:params) { {} } + + it { is_expected.to be false } + end + end + + describe 'runner job acknowledgment support' do + let!(:pending_job) { create(:ci_build, :pending, :queued, pipeline: pipeline) } + + subject(:execute) { service.execute(params) } + + context 'when runner supports two-phase job commit' do + let(:params) do + { + info: { + features: { + two_phase_job_commit: true + } + } + } + end + + it 'assigns job to waiting state' do + expect(Ci::RetryStuckWaitingJobWorker).to receive(:perform_in) + .with(Ci::Build::RUNNER_ACK_QUEUE_EXPIRY_TIME, pending_job.id) + + result = execute + + expect(result).to be_valid + expect(result.build).to eq(pending_job) + expect(pending_job.reload).to be_pending + expect(pending_job.queuing_entry).to be_nil + expect(pending_job).to be_waiting_for_runner_ack + expect(pending_job.runner_manager_id_waiting_for_ack).to eq(runner_manager.id) + end + + it 'increments runner_assigned_waiting metric' do + expect_next_instance_of(Gitlab::Ci::Queue::Metrics) do |metrics| + allow(metrics).to receive(:increment_queue_operation) + expect(metrics).to receive(:increment_queue_operation).with(:runner_assigned_waiting) + end + + execute + end + + context 'when allow_runner_job_acknowledgement feature flag is disabled' do + before do + stub_feature_flags(allow_runner_job_acknowledgement: false) + end + + it 'assigns job to running state immediately' do + result = execute + + expect(result).to be_valid + expect(result.build).to eq(pending_job) + expect(pending_job.reload).to be_running + expect(pending_job).not_to be_waiting_for_runner_ack + expect(pending_job.runner_manager).to eq(runner_manager) + end + + it 'increments runner_assigned_run metric' do + expect_next_instance_of(Gitlab::Ci::Queue::Metrics) do |metrics| + allow(metrics).to receive(:increment_queue_operation) + expect(metrics).to receive(:increment_queue_operation).with(:runner_assigned_run) + end + + execute + end + end + end + + context 'when runner does not support two-phase job commit' do + let(:params) do + { + info: { + features: { + upload_multiple_artifacts: true + } + } + } + end + + it 'assigns job to running state immediately' do + result = execute + + expect(result).to be_valid + expect(result.build).to eq(pending_job) + expect(pending_job.reload).to be_running + expect(pending_job).not_to be_waiting_for_runner_ack + expect(pending_job.runner_manager).to eq(runner_manager) + end + + it 'increments runner_assigned_run metric' do + 
expect_next_instance_of(Gitlab::Ci::Queue::Metrics) do |metrics| + allow(metrics).to receive(:increment_queue_operation) + expect(metrics).to receive(:increment_queue_operation).with(:runner_assigned_run) + end + + execute + end + end + + context 'when params are missing info section' do + let(:params) { {} } + + it 'assigns job to running state immediately' do + result = execute + + expect(result).to be_valid + expect(result.build).to eq(pending_job) + expect(pending_job.reload).to be_running + expect(pending_job).not_to be_waiting_for_runner_ack + expect(pending_job.runner_manager).to eq(runner_manager) + end + end + end +end diff --git a/spec/services/ci/retry_waiting_job_service_spec.rb b/spec/services/ci/retry_waiting_job_service_spec.rb new file mode 100644 index 00000000000000..2104826a353a60 --- /dev/null +++ b/spec/services/ci/retry_waiting_job_service_spec.rb @@ -0,0 +1,211 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Ci::RetryWaitingJobService, :clean_gitlab_redis_shared_state, feature_category: :continuous_integration do + let_it_be(:user, freeze: true) { create(:user) } + let_it_be(:project, freeze: true) { create(:project, maintainers: [user]) } + let_it_be(:pipeline, freeze: true) { create(:ci_pipeline, project: project, user: user) } + let_it_be(:runner, freeze: true) { create(:ci_runner, :with_runner_manager) } + + let(:runner_manager_id) { runner.runner_managers.sole.id } + let(:redis_klass) { Gitlab::Redis::SharedState } + let(:metrics) { instance_double(Gitlab::Ci::Queue::Metrics) } + let(:service) { described_class.new(build, metrics) } + + describe '#execute' do + subject(:execute) { service.execute } + + before do + allow(metrics).to receive(:increment_queue_operation) + end + + shared_examples 'job is not in waiting state' do + it 'returns error response' do + expect(execute).to be_error + expect(execute.message).to eq('Job is not in waiting state') + expect(execute.payload[:reason]).to eq(:not_in_waiting_state) + end + + it 'increments runner_queue_timeout metric' do + expect(metrics).to receive(:increment_queue_operation).with(:runner_queue_timeout) + + execute + end + + it 'does not call RetryJobService' do + expect(Ci::RetryJobService).not_to receive(:new) + + execute + end + end + + context 'when build is nil' do + let(:build) { nil } + + it_behaves_like 'job is not in waiting state' + end + + context 'when build is not pending' do + let_it_be(:build, freeze: true) { create(:ci_build, :running, pipeline: pipeline, user: user) } + + it_behaves_like 'job is not in waiting state' + end + + context 'when build is retryable' do + let_it_be(:build, freeze: true) { create(:ci_build, :retryable, pipeline: pipeline, user: user) } + + it_behaves_like 'job is not in waiting state' + end + + context 'when build is waiting for runner ack' do + let_it_be_with_refind(:build) do + create(:ci_build, :waiting_for_runner_ack, pipeline: pipeline, runner: runner, user: user) + end + + it 'increments runner_queue_timeout metric' do + expect(metrics).to receive(:increment_queue_operation).with(:runner_queue_timeout) + + execute + end + + it 'drops the build with runner_provisioning_timeout reason' do + expect { execute }.to change { build.reload.status }.from('pending').to('failed') + expect(build.failure_reason).to eq('runner_provisioning_timeout') + end + + context 'when build is not retryable' do + context 'with build already failed' do + before do + build.drop! 
+ end + + it_behaves_like 'job is not in waiting state' + end + + context 'with build redis ttl not yet elapsed' do + before do + build.set_waiting_for_runner_ack(runner_manager_id) + end + + it 'returns error response' do + expect(execute).to be_error + expect(execute.message).to eq('Job is not finished waiting') + expect(execute.payload[:reason]).to eq(:not_finished_waiting) + end + + it 'does not call RetryJobService' do + expect(Ci::RetryJobService).not_to receive(:new) + + execute + end + + it 'does not drop the build' do + expect { execute }.not_to change { build.reload.status }.from('pending') + end + end + end + + context 'when RetryJobService fails' do + before do + allow(build).to receive_messages(retryable?: true, retried?: false) + + allow_next_instance_of(Ci::RetryJobService) do |retry_service| + allow(retry_service).to receive(:execute).and_return(ServiceResponse.error(message: 'Retry failed')) + end + end + + it 'returns :not_auto_retryable error' do + result = execute + + expect(result).to be_error + expect(result.message).to eq('Job is not auto-retryable') + expect(execute.payload[:reason]).to eq(:not_auto_retryable) + end + + it 'still drops the build' do + expect { execute }.to change { build.reload.status }.from('pending').to('failed') + expect(build.failure_reason).to eq('runner_provisioning_timeout') + end + end + end + + describe 'integration with real RetryJobService', :aggregate_failures do + let!(:build) { create(:ci_build, :waiting_for_runner_ack, pipeline: pipeline, user: user, runner: runner) } + + before do + expire_redis_ttl(runner_build_ack_queue_key) + end + + specify { expect(build).not_to be_retryable } + + shared_examples 'job is retried with provisioning timeout' do + it 'increments runner_queue_timeout metric' do + expect(metrics).to receive(:increment_queue_operation).with(:runner_queue_timeout) + + execute + end + + it 'drops the job with runner_provisioning_timeout reason and retries it' do + expect { execute } + .to change { build.reload.status }.from('pending').to('failed') + .and change { Ci::Build.count }.by(1) + + expect(build.reload).to be_failed + expect(build).to be_retried + expect(build.failure_reason).to eq('runner_provisioning_timeout') + + new_build = Ci::Build.id_not_in(build.id).last + expect(new_build).to be_pending + expect(new_build.name).to eq(build.name) + expect(new_build.pipeline).to eq(pipeline) + end + end + + it_behaves_like 'job is retried with provisioning timeout' + + it 'retries the job using RetryJobService' do + expect(Ci::RetryJobService).to receive(:new).and_call_original + + execute + end + + context 'when auto_retry_allowed? 
is false' do + before do + allow(build).to receive(:auto_retry_allowed?).and_return(false) + end + + it 'returns error response' do + expect(execute).to be_error + expect(execute.message).to eq('Job is not auto-retryable') + expect(execute.payload[:reason]).to eq(:not_auto_retryable) + end + end + + context 'when retry count exceeds limit' do + before do + allow(build).to receive(:retries_count) + .and_return(Gitlab::Ci::Build::AutoRetry::DEFAULT_RETRIES[:runner_provisioning_timeout]) + end + + it 'returns error response' do + expect(execute).to be_error + expect(execute.message).to eq('Job is not auto-retryable') + expect(execute.payload[:reason]).to eq(:not_auto_retryable) + end + end + end + end + + private + + def runner_build_ack_queue_key + build.send(:runner_build_ack_queue_key) + end + + def expire_redis_ttl(cache_key) + redis_klass.with do |redis| + redis.del(cache_key) + end + end +end diff --git a/spec/services/ci/update_build_queue_service_spec.rb b/spec/services/ci/update_build_queue_service_spec.rb index 141caac7e05ffb..d319b9b8b2bfa1 100644 --- a/spec/services/ci/update_build_queue_service_spec.rb +++ b/spec/services/ci/update_build_queue_service_spec.rb @@ -76,6 +76,12 @@ expect(dequeued).to eq build.id end + it 'calls cancel_wait_for_runner_ack on build' do + expect(build).to receive(:cancel_wait_for_runner_ack) + + subject.pop(build, transition) + end + it 'increments queue pop metric' do metrics = spy('metrics') diff --git a/spec/services/ci/update_build_state_service_spec.rb b/spec/services/ci/update_build_state_service_spec.rb index b8defe3681288e..437ad7975d5717 100644 --- a/spec/services/ci/update_build_state_service_spec.rb +++ b/spec/services/ci/update_build_state_service_spec.rb @@ -3,8 +3,8 @@ require 'spec_helper' RSpec.describe Ci::UpdateBuildStateService, '#execute', feature_category: :continuous_integration do - let_it_be(:project) { create(:project) } - let_it_be(:pipeline) { create(:ci_pipeline, project: project) } + let_it_be(:project, freeze: true) { create(:project) } + let_it_be(:pipeline, freeze: true) { create(:ci_pipeline, project: project) } let(:build) { create(:ci_build, :running, pipeline: pipeline) } let(:metrics) { spy('metrics') } @@ -462,4 +462,275 @@ def execute_with_stubbed_metrics! 
.new(build, params, metrics) .execute end + + public + + describe 'runner acknowledgment workflow' do + let_it_be(:runner) { create(:ci_runner) } + let_it_be(:runner_manager) { create(:ci_runner_machine, runner: runner, system_xid: 'abc') } + + let(:redis_klass) { Gitlab::Redis::SharedState } + + context 'when build is waiting for runner acknowledgment', :clean_gitlab_redis_cache do + let(:build) do + create(:ci_build, :waiting_for_runner_ack, pipeline: pipeline, runner: runner, + ack_runner_manager: runner_manager) + end + + context 'when state is pending' do + let(:params) { { state: 'pending' } } + + context 'when build is not assigned to a runner manager' do + specify 'should not have runner manager assigned' do + expect(build.runner_manager).to be_nil + end + + it 'returns 200 OK status for keep-alive signal' do + result = execute + + expect(result.status).to eq 200 + expect(result.backoff).to be_nil + end + + it 'does not assign runner manager' do + expect { execute }.to not_change { build.reload.runner_manager }.from(nil) + end + + it 'updates runner manager heartbeat' do + expect(build).to receive(:heartbeat_runner_ack_wait).with(runner_manager.id) + + execute + end + + it 'resets ttl', :freeze_time do + service.execute + + consume_redis_ttl(runner_build_ack_queue_key) + + # Redis key TTL should increase by at least 1 second + expect { execute }.to change { redis_ttl(runner_build_ack_queue_key) }.by_at_least(1) + end + + it 'does not change build state' do + expect { execute }.not_to change { build.reload.status } + end + end + + context 'when build is already assigned to a runner manager (race condition)' do + before do + allow(build).to receive(:runner_manager).and_return(runner_manager) + end + + it 'returns 409 Conflict status' do + result = execute + + expect(result.status).to eq 409 + expect(result.backoff).to be_nil + end + + it 'does not change build state' do + expect { execute }.not_to change { build.reload.status } + end + + it 'does not reset ttl', :freeze_time do + service.execute + + consume_redis_ttl(runner_build_ack_queue_key) + + expect { execute }.not_to change { redis_ttl(runner_build_ack_queue_key) } + end + end + end + + context 'when state is running' do + let(:params) { { state: 'running' } } + + context 'when runner manager exists' do + it 'transitions job to running state and returns 200 OK' do + expect(build).to receive(:run!) + + result = execute + + expect(result.status).to eq 200 + expect(result.backoff).to be_nil + end + + it 'assigns the runner manager to the build' do + allow(build).to receive(:run!) + + expect { execute }.to change { build.reload.runner_manager }.from(nil).to(runner_manager) + end + end + + context 'when runner manager does not exist' do + let(:build) do + create(:ci_build, :pending, pipeline: pipeline, runner: runner).tap do |b| + b.set_waiting_for_runner_ack(non_existing_record_id) + end + end + + it 'returns 409 Conflict status' do + expect(execute.status).to eq 409 + expect(execute.backoff).to be_nil + end + + it 'does not change build state or runner manager' do + expect { execute }.to not_change { build.reload.status } + .and not_change { build.reload.runner_manager } + end + + it 'does not change ttl' do + expect { execute }.not_to change { redis_ttl(runner_build_ack_queue_key) } + end + + it 'does not transition build to running' do + expect(build).not_to receive(:run!) 
+ + execute + end + end + end + + context 'when state is invalid for two-phase commit workflow' do + %w[success failed].each do |state| + context "when state is #{state}" do + let(:params) { { state: state } } + + it 'returns 400 Bad Request status' do + result = execute + + expect(result.status).to eq 400 + expect(result.backoff).to be_nil + end + + it 'does not change build state' do + expect { execute }.not_to change { build.reload.status } + end + + it 'does not change ttl' do + expect { execute }.not_to change { redis_ttl(runner_build_ack_queue_key) } + end + end + end + end + + context 'when handling edge cases in runner ack workflow' do + context 'when build state is empty' do + let(:params) { { state: '' } } + + it 'returns 400 Bad Request status' do + result = execute + + expect(result.status).to eq 400 + expect(result.backoff).to be_nil + end + end + + context 'when build state is nil' do + let(:params) { {} } + + it 'returns 400 Bad Request status' do + result = execute + + expect(result.status).to eq 400 + expect(result.backoff).to be_nil + end + end + end + end + + context 'when build is not waiting for runner acknowledgment' do + let(:build) { create(:ci_build, :running, runner: runner, runner_manager: runner_manager, pipeline: pipeline) } + let(:params) { { state: 'success' } } + + it 'skips runner ack workflow and proceeds with normal processing' do + expect(build).not_to receive(:heartbeat_runner_ack_wait) + expect(build).not_to receive(:run!) + expect(build).to receive(:success!) + expect(service).to receive(:accept_available?).and_call_original + + expect(execute.status).to eq 200 + end + end + + context 'when allow_runner_job_acknowledgement feature flag is disabled' do + before do + stub_feature_flags(allow_runner_job_acknowledgement: false) + end + + context 'when build would normally be waiting for runner acknowledgment', :clean_gitlab_redis_cache do + let(:build) do + create(:ci_build, :waiting_for_runner_ack, pipeline: pipeline, runner: runner, + ack_runner_manager: runner_manager) + end + + context 'when state is pending' do + let(:params) { { state: 'pending' } } + + it 'returns 200 OK despite feature flag' do + result = execute + + expect(result.status).to eq 200 + expect(result.backoff).to be_nil + end + + it 'does not change build state' do + expect { execute }.not_to change { build.reload.status } + end + + it 'updates runner manager heartbeat' do + expect(build).to receive(:heartbeat_runner_ack_wait).with(runner_manager.id) + + execute + end + end + + context 'when state is running' do + let(:params) { { state: 'running' } } + + it 'returns 200 OK despite feature flag' do + result = execute + + expect(result.status).to eq 200 + expect(result.backoff).to be_nil + end + + it 'changes build state to running' do + expect { execute }.to change { build.reload.status }.from('pending').to('running') + end + end + end + + context 'when build is not waiting for runner acknowledgment' do + let(:build) { create(:ci_build, :running, runner: runner, runner_manager: runner_manager, pipeline: pipeline) } + let(:params) { { state: 'success' } } + + it 'proceeds with normal processing' do + expect(build).not_to receive(:heartbeat_runner_ack_wait) + expect(build).to receive(:success!) 
+ expect(service).to receive(:accept_available?).and_call_original + + expect(execute.status).to eq 200 + end + end + end + + private + + def runner_build_ack_queue_key + build.send(:runner_build_ack_queue_key) + end + + def redis_ttl(cache_key) + redis_klass.with do |redis| + redis.ttl(cache_key) + end + end + + def consume_redis_ttl(cache_key) + redis_klass.with do |redis| + redis.set(cache_key, runner_manager.id, ex: Ci::Build::RUNNER_ACK_QUEUE_EXPIRY_TIME - 1, nx: false) + end + end + end end diff --git a/spec/workers/ci/retry_stuck_waiting_job_worker_spec.rb b/spec/workers/ci/retry_stuck_waiting_job_worker_spec.rb new file mode 100644 index 00000000000000..b7321493d0bf9f --- /dev/null +++ b/spec/workers/ci/retry_stuck_waiting_job_worker_spec.rb @@ -0,0 +1,213 @@ +# frozen_string_literal: true + +require 'spec_helper' + +RSpec.describe Ci::RetryStuckWaitingJobWorker, :clean_gitlab_redis_shared_state, feature_category: :continuous_integration do + let_it_be(:user, freeze: true) { create(:user) } + let_it_be(:project, freeze: true) { create(:project, maintainers: [user]) } + let_it_be(:runner, freeze: true) { create(:ci_runner, :with_runner_manager) } + let_it_be(:pipeline, freeze: true) { create(:ci_pipeline, project: project) } + + let(:redis_klass) { Gitlab::Redis::SharedState } + let(:worker) { described_class.new } + + it_behaves_like 'an idempotent worker' do + let_it_be(:build, freeze: true) do + create(:ci_build, :waiting_for_runner_ack, project: project, pipeline: pipeline, runner: runner, user: user) + end + + let(:job_args) { [build.id] } + + before do + allow_next_instances_of(Ci::RetryWaitingJobService, build) do |service| + allow(service).to receive(:execute).and_return(ServiceResponse.success) + end + + allow(Ci::Build).to receive(:find_by_id).with(build.id).and_return(build) + end + end + + describe '#perform' do + let_it_be(:build, freeze: true) do + create(:ci_build, :waiting_for_runner_ack, project: project, pipeline: pipeline, runner: runner, user: user) + end + + subject(:perform) { worker.perform(build_id) } + + context 'when build exists' do + let(:build_id) { build.id } + + before do + allow(Ci::Build).to receive(:find_by_id).with(build_id).and_return(build) + end + + context 'and is waiting for runner ack' do + before do + allow(build).to receive(:waiting_for_runner_ack?).and_return(true) + end + + context 'when runner is still actively heartbeating' do + before do + allow_next_instances_of(Ci::RetryWaitingJobService, build) do |service| + allow(service).to receive(:execute) do + ServiceResponse.error( + message: 'Job is not finished waiting', payload: { job: build, reason: :not_finished_waiting }) + end + end + end + + it 'calls RetryWaitingJobService but does not drop build' do + expect_next_instance_of(Ci::RetryWaitingJobService, build) do |service| + expect(service).to receive(:execute) do + ServiceResponse.error( + message: 'Job is not finished waiting', payload: { job: build, reason: :not_finished_waiting }) + end + end + + perform + end + + it 'reschedules itself for RETRY_TIMEOUT seconds' do + expect(described_class).to receive(:perform_in).with(described_class::RETRY_TIMEOUT) + + perform + end + + it 'consistently reschedules with the same TTL value' do + expect(described_class).to receive(:perform_in).with(described_class::RETRY_TIMEOUT).twice + + # First execution + worker.perform(build_id) + + # Second execution (simulating the rescheduled job running) + worker.perform(build_id) + end + end + + context 'when runner is not actively heartbeating' do + 
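+        # Simulates the acknowledgement key having expired: the retry service reports success, so the worker should not reschedule itself.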
before do + allow_next_instance_of(Ci::RetryWaitingJobService, build) do |service| + allow(service).to(receive(:execute)).and_return(ServiceResponse.success) + end + end + + it 'does not reschedule itself' do + expect(described_class).not_to receive(:perform_in) + + perform + end + end + end + + context 'and is not waiting for runner ack' do + before do + allow(build).to receive(:waiting_for_runner_ack?).and_return(false) + + allow_next_instance_of(Ci::RetryWaitingJobService, build) do |service| + allow(service).to receive(:execute).and_return(ServiceResponse.success) + end + end + + it 'does not reschedule itself' do + expect(described_class).not_to receive(:perform_in) + + perform + end + + it 'calls RetryWaitingJobService nonetheless' do + expect_next_instance_of(Ci::RetryWaitingJobService, build) do |service| + allow(service).to receive(:execute).and_return(ServiceResponse.success) + end + + perform + end + end + end + + context 'when build does not exist' do + let(:build_id) { non_existing_record_id } + + before do + allow_next_instances_of(Ci::RetryWaitingJobService, nil) do |service| + allow(service).to receive(:execute).and_return(ServiceResponse.error(message: 'Job is not in waiting state')) + end + end + + it 'does not reschedule itself' do + expect(described_class).not_to receive(:perform_in) + + perform + end + + it 'calls RetryWaitingJobService with nil build' do + expect_next_instances_of(Ci::RetryWaitingJobService, nil) do |service| + expect(service).to receive(:execute).and_return(ServiceResponse.error(message: 'Job is not in waiting state')) + end + + expect { perform }.not_to raise_error + end + end + end + + describe 'integration with RetryWaitingJobService' do + let_it_be_with_refind(:build) do + create(:ci_build, :waiting_for_runner_ack, project: project, pipeline: pipeline, runner: runner, user: user) + end + + let(:build_id) { build.id } + + before do + allow(Ci::Build).to receive(:find_by_id).with(build_id).and_return(build) + end + + context 'when runner is still actively heartbeating' do + before do + allow(build).to receive(:waiting_for_runner_ack?).and_return(true) + end + + it 'drops build and does not retry it' do + result = nil + expect { result = worker.perform(build_id) } + .to not_change { Ci::Build.all.count }.from(1) + .and not_change { build.retries_count } + + expect(build.reload).to be_pending + expect(result).to be_error + end + end + + context 'when runner is not actively heartbeating' do + before do + allow(build).to receive(:waiting_for_runner_ack?).and_return(false) + end + + it 'drops and retries build' do + result = nil + expect { result = worker.perform(build_id) } + .to change { Ci::Build.all.count }.by(1) + .and change { build.retries_count }.by(1) + + expect(build.reload).to be_failed + expect(build.failure_reason).to eq('runner_provisioning_timeout') + expect(result).to be_success + end + + context 'when retry count exceeds limit' do + before do + allow(build).to receive(:retries_count) + .and_return(Gitlab::Ci::Build::AutoRetry::DEFAULT_RETRIES[:runner_provisioning_timeout]) + end + + it 'drops build and does not retry it' do + result = nil + expect { result = worker.perform(build_id) } + .to not_change { Ci::Build.all.count }.from(1) + .and not_change { build.retries_count } + + expect(build.reload).to be_failed + expect(result).to be_error + end + end + end + end +end -- GitLab From 81fc298c31d0675e9477524f4db1351d1dcab6ea Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Tue, 23 Sep 2025 15:17:24 +0200 Subject: [PATCH 2/7] Fix Redis 
call --- app/models/ci/build.rb | 2 +- spec/models/ci/build_two_phase_commit_spec.rb | 16 ++++++++++++---- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/app/models/ci/build.rb b/app/models/ci/build.rb index f779721c4846e4..e6d31d9461c77a 100644 --- a/app/models/ci/build.rb +++ b/app/models/ci/build.rb @@ -1266,7 +1266,7 @@ def heartbeat_runner_ack_wait(runner_manager_id) with_redis do |redis| # Update TTL, only if key already exists - redis.set(runner_build_ack_queue_key, runner_manager_id, ex: RUNNER_ACK_QUEUE_EXPIRY_TIME, nx: false) + redis.set(runner_build_ack_queue_key, runner_manager_id, ex: RUNNER_ACK_QUEUE_EXPIRY_TIME, xx: true) end end diff --git a/spec/models/ci/build_two_phase_commit_spec.rb b/spec/models/ci/build_two_phase_commit_spec.rb index 3d0106bec2960b..f0bb0475680553 100644 --- a/spec/models/ci/build_two_phase_commit_spec.rb +++ b/spec/models/ci/build_two_phase_commit_spec.rb @@ -347,6 +347,16 @@ subject(:heartbeat_runner_ack_wait) { build.heartbeat_runner_ack_wait(runner_manager_id) } + context 'when runner_manager_id does not exist' do + let(:runner_manager_id) { non_existing_record_id } + + it 'does not create new Redis cache entry' do + expect { heartbeat_runner_ack_wait } + .to not_change { runner_build_ack_queue_key_ttl }.from(-2) + .and not_change { build.runner_manager_id_waiting_for_ack }.from(nil) + end + end + context 'when runner_manager_id is present' do let(:runner_manager_id) { 123 } @@ -361,13 +371,11 @@ redis_klass.with do |redis| expect(redis).to receive(:set) .with(runner_build_ack_queue_key, runner_manager_id, - ex: described_class::RUNNER_ACK_QUEUE_EXPIRY_TIME, nx: false) + ex: described_class::RUNNER_ACK_QUEUE_EXPIRY_TIME, xx: true) .and_call_original end - expect do - heartbeat_runner_ack_wait - end + expect { heartbeat_runner_ack_wait } .to change { runner_build_ack_queue_key_ttl }.by_at_least(10) .and not_change { build.runner_manager_id_waiting_for_ack }.from(runner_manager_id) end -- GitLab From 7d0ab798d1d08781de61ca77fa01400bdfd5e582 Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Tue, 23 Sep 2025 15:27:44 +0200 Subject: [PATCH 3/7] Return 400 Bad Request when build is not in expected state --- app/services/ci/update_build_state_service.rb | 2 +- spec/services/ci/update_build_state_service_spec.rb | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/app/services/ci/update_build_state_service.rb b/app/services/ci/update_build_state_service.rb index 5a59dadf8f022b..960c022f7c94aa 100644 --- a/app/services/ci/update_build_state_service.rb +++ b/app/services/ci/update_build_state_service.rb @@ -136,7 +136,7 @@ def handle_runner_ack_workflow runner_manager = ::Ci::RunnerManager.find_by_id(build.runner_manager_id_waiting_for_ack) if runner_manager.nil? - return Result.new(status: 409) # Conflict: Job is not in pending state or not assigned to a runner + return Result.new(status: 400) # Bad request: Job is not in pending state or not assigned to a runner end # Transition job to running state and assign the runner manager diff --git a/spec/services/ci/update_build_state_service_spec.rb b/spec/services/ci/update_build_state_service_spec.rb index 437ad7975d5717..32e5708e426739 100644 --- a/spec/services/ci/update_build_state_service_spec.rb +++ b/spec/services/ci/update_build_state_service_spec.rb @@ -569,8 +569,8 @@ def execute_with_stubbed_metrics! 
end end - it 'returns 409 Conflict status' do - expect(execute.status).to eq 409 + it 'returns 400 Bad Request status' do + expect(execute.status).to eq 400 expect(execute.backoff).to be_nil end -- GitLab From 764b672a2401731705430edfda452559ba1d6919 Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Tue, 23 Sep 2025 15:28:09 +0200 Subject: [PATCH 4/7] Pass build_id when rescheduling worker --- app/workers/ci/retry_stuck_waiting_job_worker.rb | 2 +- spec/workers/ci/retry_stuck_waiting_job_worker_spec.rb | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/app/workers/ci/retry_stuck_waiting_job_worker.rb b/app/workers/ci/retry_stuck_waiting_job_worker.rb index 1c9e6abf7dc39d..da354cda9f797c 100644 --- a/app/workers/ci/retry_stuck_waiting_job_worker.rb +++ b/app/workers/ci/retry_stuck_waiting_job_worker.rb @@ -21,7 +21,7 @@ def perform(build_id) if result.error? && result.payload[:reason] == :not_finished_waiting # If job is still waiting for runner ack (meaning runner is taking longer than expected to provision, # but still actively sending a heartbeat), then let's reschedule this job. - self.class.perform_in(RETRY_TIMEOUT) + self.class.perform_in(RETRY_TIMEOUT, build_id) end end end diff --git a/spec/workers/ci/retry_stuck_waiting_job_worker_spec.rb b/spec/workers/ci/retry_stuck_waiting_job_worker_spec.rb index b7321493d0bf9f..53969a71ac0a82 100644 --- a/spec/workers/ci/retry_stuck_waiting_job_worker_spec.rb +++ b/spec/workers/ci/retry_stuck_waiting_job_worker_spec.rb @@ -68,13 +68,13 @@ end it 'reschedules itself for RETRY_TIMEOUT seconds' do - expect(described_class).to receive(:perform_in).with(described_class::RETRY_TIMEOUT) + expect(described_class).to receive(:perform_in).with(described_class::RETRY_TIMEOUT, build_id) perform end it 'consistently reschedules with the same TTL value' do - expect(described_class).to receive(:perform_in).with(described_class::RETRY_TIMEOUT).twice + expect(described_class).to receive(:perform_in).with(described_class::RETRY_TIMEOUT, build_id).twice # First execution worker.perform(build_id) -- GitLab From deeec644a7ffa6dcd2e41e705ca05ca909148bd0 Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Tue, 23 Sep 2025 16:07:39 +0200 Subject: [PATCH 5/7] Update FF group and MR URL --- .../gitlab_com_derisk/allow_runner_job_acknowledgement.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/config/feature_flags/gitlab_com_derisk/allow_runner_job_acknowledgement.yml b/config/feature_flags/gitlab_com_derisk/allow_runner_job_acknowledgement.yml index 0027adddb98239..adb123bc13173f 100644 --- a/config/feature_flags/gitlab_com_derisk/allow_runner_job_acknowledgement.yml +++ b/config/feature_flags/gitlab_com_derisk/allow_runner_job_acknowledgement.yml @@ -2,9 +2,9 @@ name: allow_runner_job_acknowledgement description: Allow GitLab Runner to acknowledge acceptance of a CI job feature_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/464048 -introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/204265 +introduced_by_url: https://gitlab.com/gitlab-org/gitlab/-/merge_requests/206034 rollout_issue_url: https://gitlab.com/gitlab-org/gitlab/-/issues/568905 milestone: '18.5' -group: group::runner +group: group::ci platform type: gitlab_com_derisk default_enabled: false -- GitLab From 2bb2122c4c240d02782e4a5630d651fae1e8369a Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Tue, 23 Sep 2025 16:08:09 +0200 Subject: [PATCH 6/7] Ensure heartbeat is only allowed from initial runner manager --- 
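Together with the earlier switch from `nx: false` to `xx: true` in `heartbeat_runner_ack_wait`, this guard means a heartbeat can only extend a wait that still exists in Redis and that was registered by the same runner manager. A minimal standalone sketch of the SET semantics involved (assumes a local Redis and the `redis` gem; the key name and manager ids are illustrative):

# Sketch only: why the heartbeat uses SET ... XX plus a manager-id check.
require 'redis'

redis = Redis.new
key = 'runner:build_ack_queue:42' # illustrative; one key per build

# Registration (set_waiting_for_runner_ack) uses NX, so only the first
# runner manager can open the wait.
redis.set(key, 101, ex: 120, nx: true)  # => true,  manager 101 owns the wait
redis.set(key, 202, ex: 120, nx: true)  # => false, manager 202 cannot take it over

# Heartbeat (heartbeat_runner_ack_wait) now refreshes the TTL only when the
# key still exists (XX) and the caller matches the stored manager id.
caller_id = 101
redis.get(key).to_i == caller_id && redis.set(key, caller_id, ex: 120, xx: true) # => true

# Once the wait has been cancelled or has expired, XX cannot resurrect it;
# the previous nx: false call behaved like a plain SET and could.
redis.del(key)
redis.set(key, 101, ex: 120, xx: true)  # => false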
app/models/ci/build.rb | 2 +- spec/models/ci/build_two_phase_commit_spec.rb | 14 ++++++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) diff --git a/app/models/ci/build.rb b/app/models/ci/build.rb index e6d31d9461c77a..29d1e56e897b4c 100644 --- a/app/models/ci/build.rb +++ b/app/models/ci/build.rb @@ -1262,7 +1262,7 @@ def set_waiting_for_runner_ack(runner_manager_id) # Update the ttl for the Redis cache entry containing the runner manager id on which we're waiting on # for acknowledgement (job accepted or job declined) def heartbeat_runner_ack_wait(runner_manager_id) - return unless runner_manager_id.present? + return unless runner_manager_id.present? && runner_manager_id == runner_manager_id_waiting_for_ack with_redis do |redis| # Update TTL, only if key already exists diff --git a/spec/models/ci/build_two_phase_commit_spec.rb b/spec/models/ci/build_two_phase_commit_spec.rb index f0bb0475680553..7d2c6bc878dc7a 100644 --- a/spec/models/ci/build_two_phase_commit_spec.rb +++ b/spec/models/ci/build_two_phase_commit_spec.rb @@ -354,6 +354,8 @@ expect { heartbeat_runner_ack_wait } .to not_change { runner_build_ack_queue_key_ttl }.from(-2) .and not_change { build.runner_manager_id_waiting_for_ack }.from(nil) + + expect(heartbeat_runner_ack_wait).to be_falsey end end @@ -378,6 +380,18 @@ expect { heartbeat_runner_ack_wait } .to change { runner_build_ack_queue_key_ttl }.by_at_least(10) .and not_change { build.runner_manager_id_waiting_for_ack }.from(runner_manager_id) + + expect(heartbeat_runner_ack_wait).to be_truthy + end + + context 'and runner_manager_id does not match existing cache entry' do + it 'does not create new Redis cache entry and returns false' do + expect { build.heartbeat_runner_ack_wait(non_existing_record_id) } + .to not_change { runner_build_ack_queue_key_ttl > 0 }.from(true) + .and not_change { build.runner_manager_id_waiting_for_ack }.from(runner_manager_id) + + expect(build.heartbeat_runner_ack_wait(non_existing_record_id)).to be_falsey + end end context 'when Redis operation fails' do -- GitLab From 87ba032b2220bdbb6e8ad9d8311b49f651cf6dd1 Mon Sep 17 00:00:00 2001 From: Pedro Pombeiro Date: Fri, 26 Sep 2025 21:26:08 +0200 Subject: [PATCH 7/7] Address MR review comments --- app/models/ci/build.rb | 2 +- app/services/ci/update_build_state_service.rb | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/app/models/ci/build.rb b/app/models/ci/build.rb index 29d1e56e897b4c..7976a337154c6b 100644 --- a/app/models/ci/build.rb +++ b/app/models/ci/build.rb @@ -1447,7 +1447,7 @@ def with_redis(&block) end def runner_build_ack_queue_key - "runner:build_ack_queue:#{token}" + "runner:build_ack_queue:#{id}" end end end diff --git a/app/services/ci/update_build_state_service.rb b/app/services/ci/update_build_state_service.rb index 960c022f7c94aa..2f983c568c34ef 100644 --- a/app/services/ci/update_build_state_service.rb +++ b/app/services/ci/update_build_state_service.rb @@ -22,7 +22,7 @@ def execute # Handle two-phase commit workflow for jobs waiting for runner acknowledgment if build.waiting_for_runner_ack? result = handle_runner_ack_workflow - return result unless result.status == 100 # Continue processing if returned HTTP 100 Continue + return result unless result.nil? # Continue processing if nil returned end unless accept_available? @@ -143,7 +143,7 @@ def handle_runner_ack_workflow build.run! build.runner_manager = runner_manager - Result.new(status: 100) # Continue processing with update_build_state! + nil # Continue processing with update_build_state! 
     else
       # Invalid state for two-phase commit workflow
       Result.new(status: 400)
-- 
GitLab
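Taken together, the series implements the handshake that the integration specs above exercise end to end. A hypothetical runner-side sketch of that sequence (the endpoints, parameters and states are taken from the specs; the host, tokens and plain Net::HTTP plumbing are illustrative only):

# Sketch of the two-phase job commit handshake from the runner's point of view.
require 'net/http'
require 'json'
require 'uri'

BASE = URI('https://gitlab.example.com/api/v4') # placeholder instance URL

def request_json(verb, path, payload)
  uri = URI("#{BASE}#{path}")
  req = verb.new(uri, 'Content-Type' => 'application/json')
  req.body = payload.to_json
  Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == 'https') { |http| http.request(req) }
end

# 1. Ask for a job, advertising two-phase commit support.
res = request_json(Net::HTTP::Post, '/jobs/request',
  token: 'RUNNER_TOKEN', system_id: 'abc',
  info: { features: { two_phase_job_commit: true } })
exit if res.code == '204' # no job available

job = JSON.parse(res.body) # 201 Created: the job stays pending, assigned to this runner

# 2. While provisioning capacity, send keep-alives so the acknowledgement TTL is refreshed.
3.times do
  request_json(Net::HTTP::Put, "/jobs/#{job['id']}", token: job['token'], state: 'pending')
  sleep 2
end

# 3. Accept the job; GitLab transitions it to running and records the runner manager.
request_json(Net::HTTP::Put, "/jobs/#{job['id']}", token: job['token'], state: 'running')

# 4. From here on, normal job updates apply, for example on completion:
request_json(Net::HTTP::Put, "/jobs/#{job['id']}", token: job['token'], state: 'success')

If the runner never reaches step 3 before the acknowledgement key expires, Ci::RetryStuckWaitingJobWorker drops the job with the runner_provisioning_timeout failure reason and auto-retries it, as covered by the worker and service specs above.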