blob: 3b69da2d1921f43232083bcc912d64b1d652b0f6 [file] [log] [blame]
// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
Leonid Barazf10cae82021-09-14 00:59:385#include "components/reporting/client/report_queue_impl.h"
Leonid Baraz61437cb2021-02-26 20:43:066
7#include <memory>
Arthur Sonzognic571efb2024-01-26 20:26:188#include <optional>
Leonid Barazf84aa6c2021-12-13 19:38:209#include <queue>
Leonid Baraz61437cb2021-02-26 20:43:0610#include <string>
11#include <utility>
12
Ahmed Nasrf21e8432022-07-30 00:48:3713#include "base/check.h"
Vignesh Shenvi17227fa2023-11-17 19:07:2814#include "base/check_op.h"
Avi Drissman12be0312023-01-11 09:16:0915#include "base/functional/bind.h"
16#include "base/functional/callback.h"
Vignesh Shenvi17227fa2023-11-17 19:07:2817#include "base/location.h"
Hong Xu353b73552023-10-27 19:43:5018#include "base/logging.h"
Leonid Baraz61437cb2021-02-26 20:43:0619#include "base/memory/ptr_util.h"
Leonid Baraz61437cb2021-02-26 20:43:0620#include "base/memory/scoped_refptr.h"
Vignesh Shenvi17227fa2023-11-17 19:07:2821#include "base/memory/weak_ptr.h"
Hong Xue84060512023-01-31 10:26:1822#include "base/metrics/histogram_functions.h"
Leonid Barazb8c275352021-08-05 00:59:0923#include "base/notreached.h"
Leonid Baraz61437cb2021-02-26 20:43:0624#include "base/sequence_checker.h"
Hong Xue84060512023-01-31 10:26:1825#include "base/strings/strcat.h"
26#include "base/strings/string_number_conversions.h"
Patrick Monette643cdf62021-10-15 19:13:4227#include "base/task/bind_post_task.h"
Sean Mahere672a662023-01-09 21:42:2828#include "base/task/sequenced_task_runner.h"
Leonid Baraz61437cb2021-02-26 20:43:0629#include "base/task/task_traits.h"
30#include "base/task/thread_pool.h"
31#include "base/time/time.h"
Vignesh Shenvi17227fa2023-11-17 19:07:2832#include "base/types/expected.h"
Leonid Barazcaac7c92021-03-04 17:34:0533#include "components/reporting/client/report_queue_configuration.h"
Josh Hilke19210322023-09-19 19:26:3534#include "components/reporting/proto/synced/metric_data.pb.h"
Josh Hilkee7a46992021-10-21 20:21:1935#include "components/reporting/proto/synced/record.pb.h"
36#include "components/reporting/proto/synced/record_constants.pb.h"
Leonid Baraz61437cb2021-02-26 20:43:0637#include "components/reporting/storage/storage_module_interface.h"
Josh Hilke1c36a732024-05-22 22:38:2038#include "components/reporting/util/reporting_errors.h"
Leonid Baraz61437cb2021-02-26 20:43:0639#include "components/reporting/util/status.h"
Leonid Baraz61437cb2021-02-26 20:43:0640#include "components/reporting/util/statusor.h"
41
42namespace reporting {
Leonid Barazf8a9daf2022-06-02 01:09:3543namespace {
Leonid Baraz38bc6e222022-09-29 22:49:2944
// 2122-01-01T00:00:00Z expressed as microseconds elapsed since the Unix epoch
// (1970-01-01T00:00:00Z): 4'796'668'800 seconds times 1'000'000 us per second.
constexpr int64_t kTime2122 = int64_t{4'796'668'800} * int64_t{1'000'000};
48
Josh Hilke19210322023-09-19 19:26:3549// Returns true if record is allowed to go to `destination`. Returns false
50// otherwise.
51static bool RecordMayGoToDestination(const std::string& record_data,
52 Destination destination) {
53 // All records sent to destination *_METRIC must be MetricData
54 // protos due to the way the server is implemented.
55 if (destination == Destination::EVENT_METRIC ||
56 destination == Destination::TELEMETRY_METRIC ||
57 destination == Destination::INFO_METRIC) {
58 MetricData metric_data;
59 const bool is_metric_data =
60 metric_data.ParseFromString(record_data) &&
61 (metric_data.has_event_data() || metric_data.has_telemetry_data() ||
62 metric_data.has_info_data());
63 LOG_IF(ERROR, !is_metric_data)
64 << "Only MetricData records may be enqueued with destinations: "
65 "EVENT_METRIC, TELEMETRY_METRIC, or INFO_METRIC";
66 return is_metric_data;
67 }
68 return true;
69}
70
Josh Hilkefff491372024-05-09 22:15:5571StatusOr<Record> ProduceRecord(std::string dm_token,
72 Destination destination,
73 int64_t reserved_space,
74 std::optional<SourceInfo> source_info,
75 ReportQueue::RecordProducer record_producer) {
Leonid Baraz8e2830e2022-06-02 20:26:5076 // Generate record data.
Tom Sepez709fd2932024-09-11 02:15:2277 auto record_result = std::move(record_producer).Run();
Hong Xu5b50cfa2023-10-26 21:28:3778 if (!record_result.has_value()) {
Josh Hilkefff491372024-05-09 22:15:5579 return base::unexpected(record_result.error());
Leonid Barazf8a9daf2022-06-02 01:09:3580 }
81
Hong Xu92482af2023-10-25 21:17:5582 CHECK(RecordMayGoToDestination(record_result.value(), destination));
Josh Hilke19210322023-09-19 19:26:3583
Leonid Barazf8a9daf2022-06-02 01:09:3584 // Augment data.
85 Record record;
Hong Xu92482af2023-10-25 21:17:5586 *record.mutable_data() = std::move(record_result.value());
Leonid Barazf8a9daf2022-06-02 01:09:3587 record.set_destination(destination);
Leonid Barazb4bdd5d2022-11-10 02:08:3888 if (reserved_space > 0L) {
89 record.set_reserved_space(reserved_space);
90 }
Leonid Barazf8a9daf2022-06-02 01:09:3591
Leonid Baraz9975c2642023-02-24 22:09:3892 // Additional record augmentation for keeping local record copy.
93 // Note: that must be done before calling `storage->AddRecord` below,
94 // because later the handler might call it with no need to set this flag.
95 switch (destination) {
96 case LOG_UPLOAD:
97 // It would be better to base the decision on `upload_settings` presence
98 // in the event, but that would require protobuf reflecion, that is not
99 // included in Chromium build. So instead we just use `destination`.
100 record.set_needs_local_unencrypted_copy(true);
101 break;
102 default: // Do nothing.
103 break;
104 }
105
Josh Hilke19210322023-09-19 19:26:35106 // |record| with no DM token is assumed to be associated with device DM
107 // token.
Leonid Barazf8a9daf2022-06-02 01:09:35108 if (!dm_token.empty()) {
109 *record.mutable_dm_token() = std::move(dm_token);
110 }
111
Vignesh Shenvi459a6a622023-07-12 00:23:16112 // Augment source info if available.
113 if (source_info.has_value()) {
114 *record.mutable_source_info() = std::move(source_info.value());
115 }
116
Leonid Barazf8a9daf2022-06-02 01:09:35117 // Calculate timestamp in microseconds - to match Spanner expectations.
118 const int64_t time_since_epoch_us =
Peter Kasting08b91b42023-10-21 03:46:09119 base::Time::Now().InMillisecondsSinceUnixEpoch() *
120 base::Time::kMicrosecondsPerMillisecond;
Hong Xue84060512023-01-31 10:26:18121 if (time_since_epoch_us > kTime2122) {
122 // Unusual timestamp. Reject the record even though the record is good
123 // otherwise, because we can't obtain a reasonable timestamp. We have this
124 // code block here because server very occasionally detects very large
Josh Hilke19210322023-09-19 19:26:35125 // timestamps. The reason could come from occasional irregular system
126 // time. Filtering out irregular timestamps here should address the
127 // problem without leaving timestamp-related bugs in the ERP undiscovered
128 // (should there be any).
Hong Xue84060512023-01-31 10:26:18129 base::UmaHistogramBoolean("Browser.ERP.UnusualEnqueueTimestamp", true);
Josh Hilkefff491372024-05-09 22:15:55130 return base::unexpected(Status(
Hong Xue84060512023-01-31 10:26:18131 error::FAILED_PRECONDITION,
132 base::StrCat(
133 {"Abnormal system timestamp obtained. Microseconds since epoch: ",
134 base::NumberToString(time_since_epoch_us)})));
Josh Hilkefff491372024-05-09 22:15:55135 }
Josh Hilke67441be2024-05-13 18:17:00136 record.set_timestamp_us(time_since_epoch_us);
Josh Hilkefff491372024-05-09 22:15:55137 return std::move(record);
138}
139
140// Calls |record_producer|, checks the result and in case of success, forwards
141// it to the storage. In production code should be invoked asynchronously, on
142// a thread pool (no synchronization expected).
143void AddRecordToStorage(scoped_refptr<StorageModuleInterface> storage,
144 Priority priority,
145 WrappedRateLimiter::AsyncAcquireCb acquire_cb,
146 std::string dm_token,
147 Destination destination,
148 int64_t reserved_space,
149 std::optional<SourceInfo> source_info,
150 ReportQueue::RecordProducer record_producer,
151 StorageModuleInterface::EnqueueCallback callback) {
152 auto record_result =
153 ProduceRecord(dm_token, destination, reserved_space, source_info,
154 std::move(record_producer));
155
156 if (!record_result.has_value()) {
157 std::move(callback).Run(record_result.error());
Hong Xue84060512023-01-31 10:26:18158 return;
159 }
Leonid Baraz135ad9052023-05-19 15:20:26160
Josh Hilkefff491372024-05-09 22:15:55161 const auto record_size = record_result.value().ByteSizeLong();
Leonid Baraz135ad9052023-05-19 15:20:26162
163 // Prepare `Storage::AddRecord` as a callback.
Josh Hilkefff491372024-05-09 22:15:55164 auto add_record_cb =
165 base::BindOnce(&StorageModuleInterface::AddRecord, storage, priority,
166 std::move(record_result.value()));
Leonid Baraz135ad9052023-05-19 15:20:26167
168 // Rate-limit event, if required.
169 if (!acquire_cb) {
170 // No rate limiter, just add resulting Record to the storage.
171 std::move(add_record_cb).Run(std::move(callback));
Leonid Barazf8a9daf2022-06-02 01:09:35172 return;
173 }
174
Leonid Baraz135ad9052023-05-19 15:20:26175 // Add Record only if rate limiter approves.
176 acquire_cb.Run(
177 record_size,
178 base::BindOnce(
179 [](size_t record_size,
180 base::OnceCallback<void(StorageModuleInterface::EnqueueCallback
181 callback)> add_record_cb,
182 StorageModuleInterface::EnqueueCallback callback, bool acquired) {
183 if (!acquired) {
184 std::move(callback).Run(
185 Status(error::OUT_OF_RANGE,
186 base::StrCat({"Event size ",
187 base::NumberToString(record_size),
188 " rejected by rate limiter"})));
189 return;
190 }
191 // Add resulting Record to the storage.
192 std::move(add_record_cb).Run(std::move(callback));
193 },
194 record_size, std::move(add_record_cb), std::move(callback)));
Leonid Barazf8a9daf2022-06-02 01:09:35195}
196} // namespace
Leonid Baraz61437cb2021-02-26 20:43:06197
Leonid Baraz4d49766a2021-10-16 23:50:48198void ReportQueueImpl::Create(
Leonid Baraz61437cb2021-02-26 20:43:06199 std::unique_ptr<ReportQueueConfiguration> config,
Leonid Baraz4d49766a2021-10-16 23:50:48200 scoped_refptr<StorageModuleInterface> storage,
201 base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)> cb) {
202 std::move(cb).Run(base::WrapUnique<ReportQueueImpl>(
203 new ReportQueueImpl(std::move(config), storage)));
Leonid Baraz61437cb2021-02-26 20:43:06204}
205
Leonid Baraz61437cb2021-02-26 20:43:06206ReportQueueImpl::ReportQueueImpl(
207 std::unique_ptr<ReportQueueConfiguration> config,
208 scoped_refptr<StorageModuleInterface> storage)
Vignesh Shenvi17227fa2023-11-17 19:07:28209 : config_(std::move(config)), storage_(storage) {
210 CHECK(config_);
211}
Leonid Baraz61437cb2021-02-26 20:43:06212
Leonid Barazf8a9daf2022-06-02 01:09:35213ReportQueueImpl::~ReportQueueImpl() = default;
214
215void ReportQueueImpl::AddProducedRecord(RecordProducer record_producer,
216 Priority priority,
217 EnqueueCallback callback) const {
Leonid Baraz61437cb2021-02-26 20:43:06218 const Status status = config_->CheckPolicy();
219 if (!status.ok()) {
220 std::move(callback).Run(status);
221 return;
222 }
223
224 if (priority == Priority::UNDEFINED_PRIORITY) {
225 std::move(callback).Run(
226 Status(error::INVALID_ARGUMENT, "Priority must be defined"));
227 return;
228 }
229
Leonid Barazf8a9daf2022-06-02 01:09:35230 // Execute |record_producer| on arbitrary thread, analyze the result and send
231 // it to the Storage, returning with the callback.
232 base::ThreadPool::PostTask(
233 FROM_HERE, {base::TaskPriority::BEST_EFFORT},
234 base::BindOnce(&AddRecordToStorage, storage_, priority,
Leonid Baraz135ad9052023-05-19 15:20:26235 config_->is_event_allowed_cb(), config_->dm_token(),
236 config_->destination(), config_->reserved_space(),
Vignesh Shenvi459a6a622023-07-12 00:23:16237 config_->source_info(), std::move(record_producer),
Irem Uguz02a125792024-01-15 09:28:58238 // EnqueueCallback must be run on the current thread, we
239 // need to bind to make sure it's posted to correct thread
240 // from the ThreadPool.
241 base::BindPostTaskToCurrentDefault(std::move(callback))));
Leonid Baraz61437cb2021-02-26 20:43:06242}
243
Leonid Baraz45429512021-03-12 18:20:12244void ReportQueueImpl::Flush(Priority priority, FlushCallback callback) {
245 storage_->Flush(priority, std::move(callback));
246}
247
Leonid Barazb8c275352021-08-05 00:59:09248base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)>
249ReportQueueImpl::PrepareToAttachActualQueue() const {
Peter Boström77d21352024-11-13 22:26:11250 NOTREACHED();
Leonid Barazb8c275352021-08-05 00:59:09251}
252
Vignesh Shenvi17227fa2023-11-17 19:07:28253Destination ReportQueueImpl::GetDestination() const {
254 return config_->destination();
255}
256
Leonid Baraz8e2830e2022-06-02 20:26:50257// Implementation of SpeculativeReportQueueImpl::PendingRecordProducer
258
259SpeculativeReportQueueImpl::PendingRecordProducer::PendingRecordProducer(
260 RecordProducer producer,
Leonid Baraz38bc6e222022-09-29 22:49:29261 EnqueueCallback callback,
Leonid Baraz8e2830e2022-06-02 20:26:50262 Priority priority)
Leonid Baraz38bc6e222022-09-29 22:49:29263 : record_producer(std::move(producer)),
264 record_callback(std::move(callback)),
265 record_priority(priority) {}
Leonid Baraz8e2830e2022-06-02 20:26:50266
267SpeculativeReportQueueImpl::PendingRecordProducer::PendingRecordProducer(
268 PendingRecordProducer&& other)
269 : record_producer(std::move(other.record_producer)),
Leonid Baraz38bc6e222022-09-29 22:49:29270 record_callback(std::move(other.record_callback)),
Leonid Baraz8e2830e2022-06-02 20:26:50271 record_priority(other.record_priority) {}
272
273SpeculativeReportQueueImpl::PendingRecordProducer::~PendingRecordProducer() =
274 default;
275
276SpeculativeReportQueueImpl::PendingRecordProducer&
277SpeculativeReportQueueImpl::PendingRecordProducer::operator=(
278 PendingRecordProducer&& other) {
279 record_producer = std::move(other.record_producer);
Leonid Baraz38bc6e222022-09-29 22:49:29280 record_callback = std::move(other.record_callback);
Leonid Baraz8e2830e2022-06-02 20:26:50281 record_priority = other.record_priority;
282 return *this;
283}
284
Leonid Barazb8c275352021-08-05 00:59:09285// static
286std::unique_ptr<SpeculativeReportQueueImpl, base::OnTaskRunnerDeleter>
Vignesh Shenvi17227fa2023-11-17 19:07:28287SpeculativeReportQueueImpl::Create(
288 const SpeculativeConfigSettings& config_settings) {
Leonid Barazb8c275352021-08-05 00:59:09289 scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner =
290 base::ThreadPool::CreateSequencedTaskRunner(
291 {base::TaskPriority::BEST_EFFORT, base::MayBlock()});
292 return std::unique_ptr<SpeculativeReportQueueImpl, base::OnTaskRunnerDeleter>(
Vignesh Shenvi17227fa2023-11-17 19:07:28293 new SpeculativeReportQueueImpl(config_settings, sequenced_task_runner),
Leonid Barazb8c275352021-08-05 00:59:09294 base::OnTaskRunnerDeleter(sequenced_task_runner));
295}
296
297SpeculativeReportQueueImpl::SpeculativeReportQueueImpl(
Vignesh Shenvi17227fa2023-11-17 19:07:28298 const SpeculativeConfigSettings& config_settings,
Leonid Barazb8c275352021-08-05 00:59:09299 scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner)
Vignesh Shenvi17227fa2023-11-17 19:07:28300 : sequenced_task_runner_(sequenced_task_runner),
301 config_settings_(config_settings) {
Leonid Barazb8c275352021-08-05 00:59:09302 DETACH_FROM_SEQUENCE(sequence_checker_);
303}
304
305SpeculativeReportQueueImpl::~SpeculativeReportQueueImpl() {
306 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
Leonid Baraz4a70e5b2022-09-30 20:12:04307 PurgePendingProducers(
308 Status(error::DATA_LOSS, "The queue is being destructed"));
Leonid Barazb8c275352021-08-05 00:59:09309}
310
311void SpeculativeReportQueueImpl::Flush(Priority priority,
312 FlushCallback callback) {
313 sequenced_task_runner_->PostTask(
314 FROM_HERE,
315 base::BindOnce(
316 [](Priority priority, FlushCallback callback,
317 base::WeakPtr<SpeculativeReportQueueImpl> self) {
318 if (!self) {
319 std::move(callback).Run(
320 Status(error::UNAVAILABLE, "Queue has been destructed"));
Josh Hilke1c36a732024-05-22 22:38:20321 base::UmaHistogramEnumeration(
322 reporting::kUmaUnavailableErrorReason,
323 UnavailableErrorReason::REPORT_QUEUE_DESTRUCTED,
324 UnavailableErrorReason::MAX_VALUE);
Leonid Barazb8c275352021-08-05 00:59:09325 return;
326 }
327 DCHECK_CALLED_ON_VALID_SEQUENCE(self->sequence_checker_);
Leonid Baraz4a70e5b2022-09-30 20:12:04328 if (!self->actual_report_queue_.has_value()) {
Leonid Barazb8c275352021-08-05 00:59:09329 std::move(callback).Run(Status(error::FAILED_PRECONDITION,
330 "ReportQueue is not ready yet."));
331 return;
332 }
Ahmed Nasrf21e8432022-07-30 00:48:37333 const std::unique_ptr<ReportQueue>& report_queue =
Leonid Baraz4a70e5b2022-09-30 20:12:04334 self->actual_report_queue_.value();
Ahmed Nasrf21e8432022-07-30 00:48:37335 report_queue->Flush(priority, std::move(callback));
Leonid Barazb8c275352021-08-05 00:59:09336 },
337 priority, std::move(callback), weak_ptr_factory_.GetWeakPtr()));
338}
339
Leonid Barazf8a9daf2022-06-02 01:09:35340void SpeculativeReportQueueImpl::AddProducedRecord(
341 RecordProducer record_producer,
342 Priority priority,
343 EnqueueCallback callback) const {
344 // Invoke producer on a thread pool, then enqueue record on sequenced task
345 // runner.
Leonid Baraz8e2830e2022-06-02 20:26:50346 sequenced_task_runner_->PostTask(
347 FROM_HERE,
348 base::BindOnce(&SpeculativeReportQueueImpl::MaybeEnqueueRecordProducer,
349 weak_ptr_factory_.GetWeakPtr(), priority,
Sergey Poromov6b9d2482023-05-04 15:47:03350 base::BindPostTaskToCurrentDefault(std::move(callback)),
351 std::move(record_producer)));
Leonid Barazb8c275352021-08-05 00:59:09352}
353
Leonid Baraz8e2830e2022-06-02 20:26:50354void SpeculativeReportQueueImpl::MaybeEnqueueRecordProducer(
Leonid Barazb8c275352021-08-05 00:59:09355 Priority priority,
Leonid Barazf8a9daf2022-06-02 01:09:35356 EnqueueCallback callback,
Leonid Baraz8e2830e2022-06-02 20:26:50357 RecordProducer record_producer) const {
Leonid Barazb8c275352021-08-05 00:59:09358 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
Leonid Baraz4a70e5b2022-09-30 20:12:04359 if (!actual_report_queue_.has_value()) {
Ahmed Nasrf21e8432022-07-30 00:48:37360 // Queue is not ready yet, store the record in the memory queue.
Leonid Baraz38bc6e222022-09-29 22:49:29361 pending_record_producers_.emplace(std::move(record_producer),
362 std::move(callback), priority);
Leonid Barazb8c275352021-08-05 00:59:09363 return;
364 }
Ahmed Nasrf21e8432022-07-30 00:48:37365 // Queue is ready. If memory queue is empty, just forward the record.
Leonid Baraz8e2830e2022-06-02 20:26:50366 if (pending_record_producers_.empty()) {
Ahmed Nasrf21e8432022-07-30 00:48:37367 const std::unique_ptr<ReportQueue>& report_queue =
Leonid Baraz4a70e5b2022-09-30 20:12:04368 actual_report_queue_.value();
Ahmed Nasrf21e8432022-07-30 00:48:37369 report_queue->AddProducedRecord(std::move(record_producer), priority,
370 std::move(callback));
Leonid Barazb8c275352021-08-05 00:59:09371 return;
372 }
373 // If memory queue is not empty, attach the new record at the
374 // end and initiate enqueuing of everything from there.
Leonid Baraz38bc6e222022-09-29 22:49:29375 pending_record_producers_.emplace(std::move(record_producer),
376 std::move(callback), priority);
377 EnqueuePendingRecordProducers();
Leonid Barazb8c275352021-08-05 00:59:09378}
379
Leonid Baraz38bc6e222022-09-29 22:49:29380void SpeculativeReportQueueImpl::EnqueuePendingRecordProducers() const {
Leonid Barazb8c275352021-08-05 00:59:09381 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
Leonid Baraz48447f142023-07-31 19:46:21382 CHECK(actual_report_queue_.has_value());
Leonid Baraz8e2830e2022-06-02 20:26:50383 if (pending_record_producers_.empty()) {
Ahmed Nasrd6d02da72022-04-13 18:07:41384 return;
385 }
Ahmed Nasrf21e8432022-07-30 00:48:37386 const std::unique_ptr<ReportQueue>& report_queue =
Leonid Baraz4a70e5b2022-09-30 20:12:04387 actual_report_queue_.value();
Leonid Baraz8e2830e2022-06-02 20:26:50388 auto head = std::move(pending_record_producers_.front());
389 pending_record_producers_.pop();
390 if (pending_record_producers_.empty()) {
Leonid Barazb8c275352021-08-05 00:59:09391 // Last of the pending records.
Ahmed Nasrf21e8432022-07-30 00:48:37392 report_queue->AddProducedRecord(std::move(head.record_producer),
Leonid Baraz38bc6e222022-09-29 22:49:29393 head.record_priority,
394 std::move(head.record_callback));
Leonid Barazb8c275352021-08-05 00:59:09395 return;
396 }
Ahmed Nasrf21e8432022-07-30 00:48:37397 report_queue->AddProducedRecord(
Leonid Baraz8e2830e2022-06-02 20:26:50398 std::move(head.record_producer), head.record_priority,
Leonid Barazb8c275352021-08-05 00:59:09399 base::BindPostTask(
400 sequenced_task_runner_,
401 base::BindOnce(
402 [](base::WeakPtr<const SpeculativeReportQueueImpl> self,
403 EnqueueCallback callback, Status status) {
404 if (!status.ok()) {
405 std::move(callback).Run(status);
406 return;
407 }
408 if (!self) {
409 std::move(callback).Run(
410 Status(error::UNAVAILABLE, "Queue has been destructed"));
Josh Hilke1c36a732024-05-22 22:38:20411 base::UmaHistogramEnumeration(
412 reporting::kUmaUnavailableErrorReason,
413 UnavailableErrorReason::REPORT_QUEUE_DESTRUCTED,
414 UnavailableErrorReason::MAX_VALUE);
Leonid Barazb8c275352021-08-05 00:59:09415 return;
416 }
Leonid Baraz38bc6e222022-09-29 22:49:29417 std::move(callback).Run(status);
418 self->EnqueuePendingRecordProducers();
Leonid Barazb8c275352021-08-05 00:59:09419 },
Leonid Baraz38bc6e222022-09-29 22:49:29420 weak_ptr_factory_.GetWeakPtr(),
421 std::move(head.record_callback))));
Leonid Barazb8c275352021-08-05 00:59:09422}
423
424base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)>
425SpeculativeReportQueueImpl::PrepareToAttachActualQueue() const {
426 return base::BindPostTask(
427 sequenced_task_runner_,
Leonid Baraz081325392022-11-21 09:13:29428 base::BindOnce(&SpeculativeReportQueueImpl::AttachActualQueue,
429 weak_ptr_factory_.GetMutableWeakPtr()));
Leonid Barazb8c275352021-08-05 00:59:09430}
431
Vignesh Shenvi17227fa2023-11-17 19:07:28432Destination SpeculativeReportQueueImpl::GetDestination() const {
433 return config_settings_.destination;
434}
435
Leonid Barazb8c275352021-08-05 00:59:09436void SpeculativeReportQueueImpl::AttachActualQueue(
Ahmed Nasrf21e8432022-07-30 00:48:37437 StatusOr<std::unique_ptr<ReportQueue>> status_or_actual_queue) {
Leonid Baraz081325392022-11-21 09:13:29438 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
439 if (actual_report_queue_.has_value()) {
440 // Already attached, do nothing.
441 return;
442 }
Hong Xu5b50cfa2023-10-26 21:28:37443 if (!status_or_actual_queue.has_value()) {
Leonid Baraz081325392022-11-21 09:13:29444 // Failed to create actual queue.
445 // Flush all pending records with this status.
Hong Xu547bcda2023-10-27 04:28:01446 PurgePendingProducers(status_or_actual_queue.error());
Leonid Baraz081325392022-11-21 09:13:29447 return;
448 }
449 // Actual report queue succeeded, store it (never to change later).
Vignesh Shenvi17227fa2023-11-17 19:07:28450 CHECK_EQ(config_settings_.destination,
451 status_or_actual_queue.value()->GetDestination());
Hong Xu92482af2023-10-25 21:17:55452 actual_report_queue_ = std::move(status_or_actual_queue.value());
Leonid Baraz081325392022-11-21 09:13:29453 EnqueuePendingRecordProducers();
Leonid Barazb8c275352021-08-05 00:59:09454}
Leonid Baraz4a70e5b2022-09-30 20:12:04455
456void SpeculativeReportQueueImpl::PurgePendingProducers(Status status) const {
457 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
458 while (!pending_record_producers_.empty()) {
459 auto head = std::move(pending_record_producers_.front());
460 pending_record_producers_.pop();
Josh Hilke990e0752024-05-28 20:58:35461 base::UmaHistogramEnumeration(
462 reporting::kUmaDataLossErrorReason,
463 DataLossErrorReason::
464 SPECULATIVE_REPORT_QUEUE_DESTRUCTED_BEFORE_RECORDS_ENQUEUED,
465 DataLossErrorReason::MAX_VALUE);
Leonid Baraz4a70e5b2022-09-30 20:12:04466 std::move(head.record_callback).Run(status);
467 }
468}
Leonid Baraz61437cb2021-02-26 20:43:06469} // namespace reporting