blob: d39544cfdd21c4758c92ece8c8ecfcc665b63faf [file] [log] [blame]
Leonid Baraz61437cb2021-02-26 20:43:061// Copyright 2020 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
Leonid Barazf10cae82021-09-14 00:59:385#include "components/reporting/client/report_queue_impl.h"
Leonid Baraz61437cb2021-02-26 20:43:066
7#include <memory>
Leonid Barazf84aa6c2021-12-13 19:38:208#include <queue>
Leonid Baraz61437cb2021-02-26 20:43:069#include <string>
10#include <utility>
11
12#include "base/bind.h"
13#include "base/callback.h"
Ahmed Nasrf21e8432022-07-30 00:48:3714#include "base/check.h"
Leonid Baraz61437cb2021-02-26 20:43:0615#include "base/memory/ptr_util.h"
Leonid Baraz61437cb2021-02-26 20:43:0616#include "base/memory/scoped_refptr.h"
Leonid Barazb8c275352021-08-05 00:59:0917#include "base/notreached.h"
Leonid Baraz61437cb2021-02-26 20:43:0618#include "base/sequence_checker.h"
Patrick Monette643cdf62021-10-15 19:13:4219#include "base/task/bind_post_task.h"
Leonid Baraz61437cb2021-02-26 20:43:0620#include "base/task/task_traits.h"
21#include "base/task/thread_pool.h"
22#include "base/time/time.h"
Leonid Barazcaac7c92021-03-04 17:34:0523#include "components/reporting/client/report_queue_configuration.h"
Josh Hilkee7a46992021-10-21 20:21:1924#include "components/reporting/proto/synced/record.pb.h"
25#include "components/reporting/proto/synced/record_constants.pb.h"
Leonid Baraz61437cb2021-02-26 20:43:0626#include "components/reporting/storage/storage_module_interface.h"
27#include "components/reporting/util/status.h"
Leonid Baraz61437cb2021-02-26 20:43:0628#include "components/reporting/util/statusor.h"
29
30namespace reporting {
Leonid Barazf8a9daf2022-06-02 01:09:3531namespace {
32// Calls |record_producer|, checks the result and in case of success, forwards
33// it to the storage. In production code should be invoked asynchronously, on a
34// thread pool (no synchronization expected).
35void AddRecordToStorage(scoped_refptr<StorageModuleInterface> storage,
36 Priority priority,
37 std::string dm_token,
38 Destination destination,
39 ReportQueue::RecordProducer record_producer,
40 StorageModuleInterface::EnqueueCallback callback) {
Leonid Baraz8e2830e2022-06-02 20:26:5041 // Generate record data.
Leonid Barazf8a9daf2022-06-02 01:09:3542 auto record_result = std::move(record_producer).Run();
43 if (!record_result.ok()) {
44 std::move(callback).Run(record_result.status());
45 return;
46 }
47
48 // Augment data.
49 Record record;
50 *record.mutable_data() = std::move(record_result.ValueOrDie());
51 record.set_destination(destination);
52
Leonid Baraz8e2830e2022-06-02 20:26:5053 // |record| with no DM token is assumed to be associated with device DM token
Leonid Barazf8a9daf2022-06-02 01:09:3554 if (!dm_token.empty()) {
55 *record.mutable_dm_token() = std::move(dm_token);
56 }
57
58 // Calculate timestamp in microseconds - to match Spanner expectations.
59 const int64_t time_since_epoch_us =
60 base::Time::Now().ToJavaTime() * base::Time::kMicrosecondsPerMillisecond;
61 record.set_timestamp_us(time_since_epoch_us);
62 if (!record_result.ok()) {
63 std::move(callback).Run(record_result.status());
64 return;
65 }
66
67 // Add resulting Record to the storage.
68 storage->AddRecord(priority, std::move(record), std::move(callback));
69}
70} // namespace
Leonid Baraz61437cb2021-02-26 20:43:0671
Leonid Baraz4d49766a2021-10-16 23:50:4872void ReportQueueImpl::Create(
Leonid Baraz61437cb2021-02-26 20:43:0673 std::unique_ptr<ReportQueueConfiguration> config,
Leonid Baraz4d49766a2021-10-16 23:50:4874 scoped_refptr<StorageModuleInterface> storage,
75 base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)> cb) {
76 std::move(cb).Run(base::WrapUnique<ReportQueueImpl>(
77 new ReportQueueImpl(std::move(config), storage)));
Leonid Baraz61437cb2021-02-26 20:43:0678}
79
Leonid Baraz61437cb2021-02-26 20:43:0680ReportQueueImpl::ReportQueueImpl(
81 std::unique_ptr<ReportQueueConfiguration> config,
82 scoped_refptr<StorageModuleInterface> storage)
Leonid Barazf8a9daf2022-06-02 01:09:3583 : config_(std::move(config)), storage_(storage) {}
Leonid Baraz61437cb2021-02-26 20:43:0684
Leonid Barazf8a9daf2022-06-02 01:09:3585ReportQueueImpl::~ReportQueueImpl() = default;
86
87void ReportQueueImpl::AddProducedRecord(RecordProducer record_producer,
88 Priority priority,
89 EnqueueCallback callback) const {
Leonid Baraz61437cb2021-02-26 20:43:0690 const Status status = config_->CheckPolicy();
91 if (!status.ok()) {
92 std::move(callback).Run(status);
93 return;
94 }
95
96 if (priority == Priority::UNDEFINED_PRIORITY) {
97 std::move(callback).Run(
98 Status(error::INVALID_ARGUMENT, "Priority must be defined"));
99 return;
100 }
101
Leonid Barazf8a9daf2022-06-02 01:09:35102 // Execute |record_producer| on arbitrary thread, analyze the result and send
103 // it to the Storage, returning with the callback.
104 base::ThreadPool::PostTask(
105 FROM_HERE, {base::TaskPriority::BEST_EFFORT},
106 base::BindOnce(&AddRecordToStorage, storage_, priority,
107 config_->dm_token(), config_->destination(),
108 std::move(record_producer), std::move(callback)));
Leonid Baraz61437cb2021-02-26 20:43:06109}
110
Leonid Baraz45429512021-03-12 18:20:12111void ReportQueueImpl::Flush(Priority priority, FlushCallback callback) {
112 storage_->Flush(priority, std::move(callback));
113}
114
Leonid Barazb8c275352021-08-05 00:59:09115base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)>
116ReportQueueImpl::PrepareToAttachActualQueue() const {
117 NOTREACHED();
118 return base::BindOnce(
119 [](StatusOr<std::unique_ptr<ReportQueue>>) { NOTREACHED(); });
120}
121
Leonid Baraz8e2830e2022-06-02 20:26:50122// Implementation of SpeculativeReportQueueImpl::PendingRecordProducer
123
124SpeculativeReportQueueImpl::PendingRecordProducer::PendingRecordProducer(
125 RecordProducer producer,
126 Priority priority)
127 : record_producer(std::move(producer)), record_priority(priority) {}
128
129SpeculativeReportQueueImpl::PendingRecordProducer::PendingRecordProducer(
130 PendingRecordProducer&& other)
131 : record_producer(std::move(other.record_producer)),
132 record_priority(other.record_priority) {}
133
134SpeculativeReportQueueImpl::PendingRecordProducer::~PendingRecordProducer() =
135 default;
136
137SpeculativeReportQueueImpl::PendingRecordProducer&
138SpeculativeReportQueueImpl::PendingRecordProducer::operator=(
139 PendingRecordProducer&& other) {
140 record_producer = std::move(other.record_producer);
141 record_priority = other.record_priority;
142 return *this;
143}
144
Leonid Barazb8c275352021-08-05 00:59:09145// static
146std::unique_ptr<SpeculativeReportQueueImpl, base::OnTaskRunnerDeleter>
147SpeculativeReportQueueImpl::Create() {
148 scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner =
149 base::ThreadPool::CreateSequencedTaskRunner(
150 {base::TaskPriority::BEST_EFFORT, base::MayBlock()});
151 return std::unique_ptr<SpeculativeReportQueueImpl, base::OnTaskRunnerDeleter>(
152 new SpeculativeReportQueueImpl(sequenced_task_runner),
153 base::OnTaskRunnerDeleter(sequenced_task_runner));
154}
155
156SpeculativeReportQueueImpl::SpeculativeReportQueueImpl(
157 scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner)
158 : sequenced_task_runner_(sequenced_task_runner) {
159 DETACH_FROM_SEQUENCE(sequence_checker_);
160}
161
162SpeculativeReportQueueImpl::~SpeculativeReportQueueImpl() {
163 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
164}
165
166void SpeculativeReportQueueImpl::Flush(Priority priority,
167 FlushCallback callback) {
168 sequenced_task_runner_->PostTask(
169 FROM_HERE,
170 base::BindOnce(
171 [](Priority priority, FlushCallback callback,
172 base::WeakPtr<SpeculativeReportQueueImpl> self) {
173 if (!self) {
174 std::move(callback).Run(
175 Status(error::UNAVAILABLE, "Queue has been destructed"));
176 return;
177 }
178 DCHECK_CALLED_ON_VALID_SEQUENCE(self->sequence_checker_);
Ahmed Nasrf21e8432022-07-30 00:48:37179 if (!self->status_or_report_queue_.has_value()) {
Leonid Barazb8c275352021-08-05 00:59:09180 std::move(callback).Run(Status(error::FAILED_PRECONDITION,
181 "ReportQueue is not ready yet."));
182 return;
183 }
Ahmed Nasrf21e8432022-07-30 00:48:37184 if (!self->status_or_report_queue_->ok()) {
185 std::move(callback).Run(self->status_or_report_queue_->status());
186 return;
187 }
188 const std::unique_ptr<ReportQueue>& report_queue =
189 self->status_or_report_queue_->ValueOrDie();
190 report_queue->Flush(priority, std::move(callback));
Leonid Barazb8c275352021-08-05 00:59:09191 },
192 priority, std::move(callback), weak_ptr_factory_.GetWeakPtr()));
193}
194
Leonid Barazf8a9daf2022-06-02 01:09:35195void SpeculativeReportQueueImpl::AddProducedRecord(
196 RecordProducer record_producer,
197 Priority priority,
198 EnqueueCallback callback) const {
199 // Invoke producer on a thread pool, then enqueue record on sequenced task
200 // runner.
Leonid Baraz8e2830e2022-06-02 20:26:50201 sequenced_task_runner_->PostTask(
202 FROM_HERE,
203 base::BindOnce(&SpeculativeReportQueueImpl::MaybeEnqueueRecordProducer,
204 weak_ptr_factory_.GetWeakPtr(), priority,
205 std::move(callback), std::move(record_producer)));
Leonid Barazb8c275352021-08-05 00:59:09206}
207
Leonid Baraz8e2830e2022-06-02 20:26:50208void SpeculativeReportQueueImpl::MaybeEnqueueRecordProducer(
Leonid Barazb8c275352021-08-05 00:59:09209 Priority priority,
Leonid Barazf8a9daf2022-06-02 01:09:35210 EnqueueCallback callback,
Leonid Baraz8e2830e2022-06-02 20:26:50211 RecordProducer record_producer) const {
Leonid Barazb8c275352021-08-05 00:59:09212 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
Ahmed Nasrf21e8432022-07-30 00:48:37213 if (!status_or_report_queue_.has_value()) {
214 // Queue is not ready yet, store the record in the memory queue.
Leonid Baraz8e2830e2022-06-02 20:26:50215 pending_record_producers_.emplace(std::move(record_producer), priority);
Leonid Barazb8c275352021-08-05 00:59:09216 std::move(callback).Run(Status::StatusOK());
217 return;
218 }
Ahmed Nasrf21e8432022-07-30 00:48:37219 if (!status_or_report_queue_->ok()) {
220 // Queue creation failed.
221 std::move(callback).Run(status_or_report_queue_->status());
222 return;
223 }
224 // Queue is ready. If memory queue is empty, just forward the record.
Leonid Baraz8e2830e2022-06-02 20:26:50225 if (pending_record_producers_.empty()) {
Ahmed Nasrf21e8432022-07-30 00:48:37226 const std::unique_ptr<ReportQueue>& report_queue =
227 status_or_report_queue_->ValueOrDie();
228 report_queue->AddProducedRecord(std::move(record_producer), priority,
229 std::move(callback));
Leonid Barazb8c275352021-08-05 00:59:09230 return;
231 }
232 // If memory queue is not empty, attach the new record at the
233 // end and initiate enqueuing of everything from there.
Leonid Baraz8e2830e2022-06-02 20:26:50234 pending_record_producers_.emplace(std::move(record_producer), priority);
235 EnqueuePendingRecordProducers(std::move(callback));
Leonid Barazb8c275352021-08-05 00:59:09236}
237
Leonid Baraz8e2830e2022-06-02 20:26:50238void SpeculativeReportQueueImpl::EnqueuePendingRecordProducers(
Leonid Barazb8c275352021-08-05 00:59:09239 EnqueueCallback callback) const {
240 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
Ahmed Nasrf21e8432022-07-30 00:48:37241 DCHECK(status_or_report_queue_.has_value());
242 DCHECK(status_or_report_queue_->ok());
Leonid Baraz8e2830e2022-06-02 20:26:50243 if (pending_record_producers_.empty()) {
Ahmed Nasrd6d02da72022-04-13 18:07:41244 std::move(callback).Run(Status::StatusOK());
245 return;
246 }
Ahmed Nasrf21e8432022-07-30 00:48:37247 const std::unique_ptr<ReportQueue>& report_queue =
248 status_or_report_queue_->ValueOrDie();
Ahmed Nasrd6d02da72022-04-13 18:07:41249
Leonid Baraz8e2830e2022-06-02 20:26:50250 auto head = std::move(pending_record_producers_.front());
251 pending_record_producers_.pop();
252 if (pending_record_producers_.empty()) {
Leonid Barazb8c275352021-08-05 00:59:09253 // Last of the pending records.
Ahmed Nasrf21e8432022-07-30 00:48:37254 report_queue->AddProducedRecord(std::move(head.record_producer),
255 head.record_priority, std::move(callback));
Leonid Barazb8c275352021-08-05 00:59:09256 return;
257 }
Ahmed Nasrf21e8432022-07-30 00:48:37258 report_queue->AddProducedRecord(
Leonid Baraz8e2830e2022-06-02 20:26:50259 std::move(head.record_producer), head.record_priority,
Leonid Barazb8c275352021-08-05 00:59:09260 base::BindPostTask(
261 sequenced_task_runner_,
262 base::BindOnce(
263 [](base::WeakPtr<const SpeculativeReportQueueImpl> self,
264 EnqueueCallback callback, Status status) {
265 if (!status.ok()) {
266 std::move(callback).Run(status);
267 return;
268 }
269 if (!self) {
270 std::move(callback).Run(
271 Status(error::UNAVAILABLE, "Queue has been destructed"));
272 return;
273 }
274 self->sequenced_task_runner_->PostTask(
Leonid Baraz8e2830e2022-06-02 20:26:50275 FROM_HERE, base::BindOnce(&SpeculativeReportQueueImpl::
276 EnqueuePendingRecordProducers,
277 self, std::move(callback)));
Leonid Barazb8c275352021-08-05 00:59:09278 },
279 weak_ptr_factory_.GetWeakPtr(), std::move(callback))));
280}
281
282base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)>
283SpeculativeReportQueueImpl::PrepareToAttachActualQueue() const {
284 return base::BindPostTask(
285 sequenced_task_runner_,
286 base::BindOnce(
287 [](base::WeakPtr<SpeculativeReportQueueImpl> speculative_queue,
288 StatusOr<std::unique_ptr<ReportQueue>> actual_queue_result) {
289 if (!speculative_queue) {
290 return; // Speculative queue was destructed in a meantime.
291 }
Leonid Barazb8c275352021-08-05 00:59:09292 // Set actual queue for the speculative queue to use
293 // (asynchronously).
294 speculative_queue->AttachActualQueue(
Ahmed Nasrf21e8432022-07-30 00:48:37295 std::move(std::move(actual_queue_result)));
Leonid Barazb8c275352021-08-05 00:59:09296 },
297 weak_ptr_factory_.GetWeakPtr()));
298}
299
300void SpeculativeReportQueueImpl::AttachActualQueue(
Ahmed Nasrf21e8432022-07-30 00:48:37301 StatusOr<std::unique_ptr<ReportQueue>> status_or_actual_queue) {
Leonid Barazb8c275352021-08-05 00:59:09302 sequenced_task_runner_->PostTask(
303 FROM_HERE,
304 base::BindOnce(
305 [](base::WeakPtr<SpeculativeReportQueueImpl> self,
Ahmed Nasrf21e8432022-07-30 00:48:37306 StatusOr<std::unique_ptr<ReportQueue>> status_or_actual_queue) {
Leonid Barazb8c275352021-08-05 00:59:09307 if (!self) {
308 return;
309 }
310 DCHECK_CALLED_ON_VALID_SEQUENCE(self->sequence_checker_);
Ahmed Nasrf21e8432022-07-30 00:48:37311 if (self->status_or_report_queue_.has_value()) {
Leonid Barazb8c275352021-08-05 00:59:09312 // Already attached, do nothing.
313 return;
314 }
Ahmed Nasrf21e8432022-07-30 00:48:37315 self->status_or_report_queue_ = std::move(status_or_actual_queue);
316 // TODO(b/239583016): remove the ok status check once the enqueue
317 // callbacks are stored along with the records in a pending queue
318 // instead of only the records, and run the callbacks with the
319 // failure status if creation failed.
320 if (self->status_or_report_queue_->ok() &&
321 !self->pending_record_producers_.empty()) {
Leonid Baraz8e2830e2022-06-02 20:26:50322 self->EnqueuePendingRecordProducers(
Leonid Barazb8c275352021-08-05 00:59:09323 base::BindOnce([](Status enqueue_status) {
324 if (!enqueue_status.ok()) {
325 LOG(ERROR) << "Pending records failed to enqueue, status="
326 << enqueue_status;
327 }
328 }));
329 }
330 },
Ahmed Nasrf21e8432022-07-30 00:48:37331 weak_ptr_factory_.GetWeakPtr(), std::move(status_or_actual_queue)));
Leonid Barazb8c275352021-08-05 00:59:09332}
333
Leonid Baraz61437cb2021-02-26 20:43:06334} // namespace reporting