blob: 9f564c8b272b78fc1678be9b49ce3eabb53b0372 [file] [log] [blame]
// Copyright 2020 The Chromium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
Leonid Barazf10cae82021-09-14 00:59:385#include "components/reporting/client/report_queue_impl.h"
Leonid Baraz61437cb2021-02-26 20:43:066
7#include <memory>
Leonid Barazf84aa6c2021-12-13 19:38:208#include <queue>
Leonid Baraz61437cb2021-02-26 20:43:069#include <string>
10#include <utility>
11
12#include "base/bind.h"
13#include "base/callback.h"
14#include "base/json/json_writer.h"
15#include "base/memory/ptr_util.h"
16#include "base/memory/ref_counted.h"
17#include "base/memory/scoped_refptr.h"
Leonid Barazb8c275352021-08-05 00:59:0918#include "base/notreached.h"
Leonid Baraz61437cb2021-02-26 20:43:0619#include "base/sequence_checker.h"
20#include "base/strings/strcat.h"
Patrick Monette643cdf62021-10-15 19:13:4221#include "base/task/bind_post_task.h"
Leonid Baraz61437cb2021-02-26 20:43:0622#include "base/task/task_traits.h"
23#include "base/task/thread_pool.h"
24#include "base/time/time.h"
25#include "base/values.h"
Leonid Barazcaac7c92021-03-04 17:34:0526#include "components/reporting/client/report_queue_configuration.h"
Josh Hilkee7a46992021-10-21 20:21:1927#include "components/reporting/proto/synced/record.pb.h"
28#include "components/reporting/proto/synced/record_constants.pb.h"
Leonid Baraz61437cb2021-02-26 20:43:0629#include "components/reporting/storage/storage_module_interface.h"
30#include "components/reporting/util/status.h"
Leonid Baraz61437cb2021-02-26 20:43:0631#include "components/reporting/util/statusor.h"
32
33namespace reporting {
Leonid Barazf8a9daf2022-06-02 01:09:3534namespace {
35// Calls |record_producer|, checks the result and in case of success, forwards
36// it to the storage. In production code should be invoked asynchronously, on a
37// thread pool (no synchronization expected).
38void AddRecordToStorage(scoped_refptr<StorageModuleInterface> storage,
39 Priority priority,
40 std::string dm_token,
41 Destination destination,
42 ReportQueue::RecordProducer record_producer,
43 StorageModuleInterface::EnqueueCallback callback) {
Leonid Baraz8e2830e2022-06-02 20:26:5044 // Generate record data.
Leonid Barazf8a9daf2022-06-02 01:09:3545 auto record_result = std::move(record_producer).Run();
46 if (!record_result.ok()) {
47 std::move(callback).Run(record_result.status());
48 return;
49 }
50
51 // Augment data.
52 Record record;
53 *record.mutable_data() = std::move(record_result.ValueOrDie());
54 record.set_destination(destination);
55
Leonid Baraz8e2830e2022-06-02 20:26:5056 // |record| with no DM token is assumed to be associated with device DM token
Leonid Barazf8a9daf2022-06-02 01:09:3557 if (!dm_token.empty()) {
58 *record.mutable_dm_token() = std::move(dm_token);
59 }
60
61 // Calculate timestamp in microseconds - to match Spanner expectations.
62 const int64_t time_since_epoch_us =
63 base::Time::Now().ToJavaTime() * base::Time::kMicrosecondsPerMillisecond;
64 record.set_timestamp_us(time_since_epoch_us);
65 if (!record_result.ok()) {
66 std::move(callback).Run(record_result.status());
67 return;
68 }
69
70 // Add resulting Record to the storage.
71 storage->AddRecord(priority, std::move(record), std::move(callback));
72}
73} // namespace
Leonid Baraz61437cb2021-02-26 20:43:0674
Leonid Baraz4d49766a2021-10-16 23:50:4875void ReportQueueImpl::Create(
Leonid Baraz61437cb2021-02-26 20:43:0676 std::unique_ptr<ReportQueueConfiguration> config,
Leonid Baraz4d49766a2021-10-16 23:50:4877 scoped_refptr<StorageModuleInterface> storage,
78 base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)> cb) {
79 std::move(cb).Run(base::WrapUnique<ReportQueueImpl>(
80 new ReportQueueImpl(std::move(config), storage)));
Leonid Baraz61437cb2021-02-26 20:43:0681}
82
Leonid Baraz61437cb2021-02-26 20:43:0683ReportQueueImpl::ReportQueueImpl(
84 std::unique_ptr<ReportQueueConfiguration> config,
85 scoped_refptr<StorageModuleInterface> storage)
Leonid Barazf8a9daf2022-06-02 01:09:3586 : config_(std::move(config)), storage_(storage) {}
Leonid Baraz61437cb2021-02-26 20:43:0687
Leonid Barazf8a9daf2022-06-02 01:09:3588ReportQueueImpl::~ReportQueueImpl() = default;
89
90void ReportQueueImpl::AddProducedRecord(RecordProducer record_producer,
91 Priority priority,
92 EnqueueCallback callback) const {
Leonid Baraz61437cb2021-02-26 20:43:0693 const Status status = config_->CheckPolicy();
94 if (!status.ok()) {
95 std::move(callback).Run(status);
96 return;
97 }
98
99 if (priority == Priority::UNDEFINED_PRIORITY) {
100 std::move(callback).Run(
101 Status(error::INVALID_ARGUMENT, "Priority must be defined"));
102 return;
103 }
104
Leonid Barazf8a9daf2022-06-02 01:09:35105 // Execute |record_producer| on arbitrary thread, analyze the result and send
106 // it to the Storage, returning with the callback.
107 base::ThreadPool::PostTask(
108 FROM_HERE, {base::TaskPriority::BEST_EFFORT},
109 base::BindOnce(&AddRecordToStorage, storage_, priority,
110 config_->dm_token(), config_->destination(),
111 std::move(record_producer), std::move(callback)));
Leonid Baraz61437cb2021-02-26 20:43:06112}
113
Leonid Baraz45429512021-03-12 18:20:12114void ReportQueueImpl::Flush(Priority priority, FlushCallback callback) {
115 storage_->Flush(priority, std::move(callback));
116}
117
Leonid Barazb8c275352021-08-05 00:59:09118base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)>
119ReportQueueImpl::PrepareToAttachActualQueue() const {
120 NOTREACHED();
121 return base::BindOnce(
122 [](StatusOr<std::unique_ptr<ReportQueue>>) { NOTREACHED(); });
123}
124
Leonid Baraz8e2830e2022-06-02 20:26:50125// Implementation of SpeculativeReportQueueImpl::PendingRecordProducer
126
127SpeculativeReportQueueImpl::PendingRecordProducer::PendingRecordProducer(
128 RecordProducer producer,
129 Priority priority)
130 : record_producer(std::move(producer)), record_priority(priority) {}
131
132SpeculativeReportQueueImpl::PendingRecordProducer::PendingRecordProducer(
133 PendingRecordProducer&& other)
134 : record_producer(std::move(other.record_producer)),
135 record_priority(other.record_priority) {}
136
137SpeculativeReportQueueImpl::PendingRecordProducer::~PendingRecordProducer() =
138 default;
139
140SpeculativeReportQueueImpl::PendingRecordProducer&
141SpeculativeReportQueueImpl::PendingRecordProducer::operator=(
142 PendingRecordProducer&& other) {
143 record_producer = std::move(other.record_producer);
144 record_priority = other.record_priority;
145 return *this;
146}
147
Leonid Barazb8c275352021-08-05 00:59:09148// static
149std::unique_ptr<SpeculativeReportQueueImpl, base::OnTaskRunnerDeleter>
150SpeculativeReportQueueImpl::Create() {
151 scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner =
152 base::ThreadPool::CreateSequencedTaskRunner(
153 {base::TaskPriority::BEST_EFFORT, base::MayBlock()});
154 return std::unique_ptr<SpeculativeReportQueueImpl, base::OnTaskRunnerDeleter>(
155 new SpeculativeReportQueueImpl(sequenced_task_runner),
156 base::OnTaskRunnerDeleter(sequenced_task_runner));
157}
158
159SpeculativeReportQueueImpl::SpeculativeReportQueueImpl(
160 scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner)
161 : sequenced_task_runner_(sequenced_task_runner) {
162 DETACH_FROM_SEQUENCE(sequence_checker_);
163}
164
165SpeculativeReportQueueImpl::~SpeculativeReportQueueImpl() {
166 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
167}
168
169void SpeculativeReportQueueImpl::Flush(Priority priority,
170 FlushCallback callback) {
171 sequenced_task_runner_->PostTask(
172 FROM_HERE,
173 base::BindOnce(
174 [](Priority priority, FlushCallback callback,
175 base::WeakPtr<SpeculativeReportQueueImpl> self) {
176 if (!self) {
177 std::move(callback).Run(
178 Status(error::UNAVAILABLE, "Queue has been destructed"));
179 return;
180 }
181 DCHECK_CALLED_ON_VALID_SEQUENCE(self->sequence_checker_);
182 if (!self->report_queue_) {
183 std::move(callback).Run(Status(error::FAILED_PRECONDITION,
184 "ReportQueue is not ready yet."));
185 return;
186 }
187 self->report_queue_->Flush(priority, std::move(callback));
188 },
189 priority, std::move(callback), weak_ptr_factory_.GetWeakPtr()));
190}
191
Leonid Barazf8a9daf2022-06-02 01:09:35192void SpeculativeReportQueueImpl::AddProducedRecord(
193 RecordProducer record_producer,
194 Priority priority,
195 EnqueueCallback callback) const {
196 // Invoke producer on a thread pool, then enqueue record on sequenced task
197 // runner.
Leonid Baraz8e2830e2022-06-02 20:26:50198 sequenced_task_runner_->PostTask(
199 FROM_HERE,
200 base::BindOnce(&SpeculativeReportQueueImpl::MaybeEnqueueRecordProducer,
201 weak_ptr_factory_.GetWeakPtr(), priority,
202 std::move(callback), std::move(record_producer)));
Leonid Barazb8c275352021-08-05 00:59:09203}
204
Leonid Baraz8e2830e2022-06-02 20:26:50205void SpeculativeReportQueueImpl::MaybeEnqueueRecordProducer(
Leonid Barazb8c275352021-08-05 00:59:09206 Priority priority,
Leonid Barazf8a9daf2022-06-02 01:09:35207 EnqueueCallback callback,
Leonid Baraz8e2830e2022-06-02 20:26:50208 RecordProducer record_producer) const {
Leonid Barazb8c275352021-08-05 00:59:09209 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
210 if (!report_queue_) {
211 // Queue is not ready yet, store the record in the memory
212 // queue.
Leonid Baraz8e2830e2022-06-02 20:26:50213 pending_record_producers_.emplace(std::move(record_producer), priority);
Leonid Barazb8c275352021-08-05 00:59:09214 std::move(callback).Run(Status::StatusOK());
215 return;
216 }
217 // Queue is ready. If memory queue is empty, just forward the
218 // record.
Leonid Baraz8e2830e2022-06-02 20:26:50219 if (pending_record_producers_.empty()) {
220 report_queue_->AddProducedRecord(std::move(record_producer), priority,
221 std::move(callback));
Leonid Barazb8c275352021-08-05 00:59:09222 return;
223 }
224 // If memory queue is not empty, attach the new record at the
225 // end and initiate enqueuing of everything from there.
Leonid Baraz8e2830e2022-06-02 20:26:50226 pending_record_producers_.emplace(std::move(record_producer), priority);
227 EnqueuePendingRecordProducers(std::move(callback));
Leonid Barazb8c275352021-08-05 00:59:09228}
229
Leonid Baraz8e2830e2022-06-02 20:26:50230void SpeculativeReportQueueImpl::EnqueuePendingRecordProducers(
Leonid Barazb8c275352021-08-05 00:59:09231 EnqueueCallback callback) const {
232 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
Leonid Barazb8c275352021-08-05 00:59:09233 DCHECK(report_queue_);
Leonid Baraz8e2830e2022-06-02 20:26:50234 if (pending_record_producers_.empty()) {
Ahmed Nasrd6d02da72022-04-13 18:07:41235 std::move(callback).Run(Status::StatusOK());
236 return;
237 }
238
Leonid Baraz8e2830e2022-06-02 20:26:50239 auto head = std::move(pending_record_producers_.front());
240 pending_record_producers_.pop();
241 if (pending_record_producers_.empty()) {
Leonid Barazb8c275352021-08-05 00:59:09242 // Last of the pending records.
Leonid Baraz8e2830e2022-06-02 20:26:50243 report_queue_->AddProducedRecord(std::move(head.record_producer),
244 head.record_priority, std::move(callback));
Leonid Barazb8c275352021-08-05 00:59:09245 return;
246 }
Leonid Baraz8e2830e2022-06-02 20:26:50247 report_queue_->AddProducedRecord(
248 std::move(head.record_producer), head.record_priority,
Leonid Barazb8c275352021-08-05 00:59:09249 base::BindPostTask(
250 sequenced_task_runner_,
251 base::BindOnce(
252 [](base::WeakPtr<const SpeculativeReportQueueImpl> self,
253 EnqueueCallback callback, Status status) {
254 if (!status.ok()) {
255 std::move(callback).Run(status);
256 return;
257 }
258 if (!self) {
259 std::move(callback).Run(
260 Status(error::UNAVAILABLE, "Queue has been destructed"));
261 return;
262 }
263 self->sequenced_task_runner_->PostTask(
Leonid Baraz8e2830e2022-06-02 20:26:50264 FROM_HERE, base::BindOnce(&SpeculativeReportQueueImpl::
265 EnqueuePendingRecordProducers,
266 self, std::move(callback)));
Leonid Barazb8c275352021-08-05 00:59:09267 },
268 weak_ptr_factory_.GetWeakPtr(), std::move(callback))));
269}
270
271base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)>
272SpeculativeReportQueueImpl::PrepareToAttachActualQueue() const {
273 return base::BindPostTask(
274 sequenced_task_runner_,
275 base::BindOnce(
276 [](base::WeakPtr<SpeculativeReportQueueImpl> speculative_queue,
277 StatusOr<std::unique_ptr<ReportQueue>> actual_queue_result) {
278 if (!speculative_queue) {
279 return; // Speculative queue was destructed in a meantime.
280 }
281 if (!actual_queue_result.ok()) {
282 return; // Actual queue creation failed.
283 }
284 // Set actual queue for the speculative queue to use
285 // (asynchronously).
286 speculative_queue->AttachActualQueue(
287 std::move(actual_queue_result.ValueOrDie()));
288 },
289 weak_ptr_factory_.GetWeakPtr()));
290}
291
292void SpeculativeReportQueueImpl::AttachActualQueue(
293 std::unique_ptr<ReportQueue> actual_queue) {
294 sequenced_task_runner_->PostTask(
295 FROM_HERE,
296 base::BindOnce(
297 [](base::WeakPtr<SpeculativeReportQueueImpl> self,
298 std::unique_ptr<ReportQueue> actual_queue) {
299 if (!self) {
300 return;
301 }
302 DCHECK_CALLED_ON_VALID_SEQUENCE(self->sequence_checker_);
303 if (self->report_queue_) {
304 // Already attached, do nothing.
305 return;
306 }
307 self->report_queue_ = std::move(actual_queue);
Leonid Baraz8e2830e2022-06-02 20:26:50308 if (!self->pending_record_producers_.empty()) {
309 self->EnqueuePendingRecordProducers(
Leonid Barazb8c275352021-08-05 00:59:09310 base::BindOnce([](Status enqueue_status) {
311 if (!enqueue_status.ok()) {
312 LOG(ERROR) << "Pending records failed to enqueue, status="
313 << enqueue_status;
314 }
315 }));
316 }
317 },
318 weak_ptr_factory_.GetWeakPtr(), std::move(actual_queue)));
319}
320
Leonid Baraz61437cb2021-02-26 20:43:06321} // namespace reporting