blob: 16993ebe7e744424d960e33a6df7036d41a1cb84 [file] [log] [blame]
Leonid Baraz61437cb2021-02-26 20:43:061// Copyright 2020 The Chromium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style license that can be
3// found in the LICENSE file.
4
Leonid Barazf10cae82021-09-14 00:59:385#include "components/reporting/client/report_queue_impl.h"
Leonid Baraz61437cb2021-02-26 20:43:066
7#include <memory>
Leonid Barazf84aa6c2021-12-13 19:38:208#include <queue>
Leonid Baraz61437cb2021-02-26 20:43:069#include <string>
10#include <utility>
11
12#include "base/bind.h"
13#include "base/callback.h"
14#include "base/json/json_writer.h"
15#include "base/memory/ptr_util.h"
16#include "base/memory/ref_counted.h"
17#include "base/memory/scoped_refptr.h"
Leonid Barazb8c275352021-08-05 00:59:0918#include "base/notreached.h"
Leonid Baraz61437cb2021-02-26 20:43:0619#include "base/sequence_checker.h"
20#include "base/strings/strcat.h"
Patrick Monette643cdf62021-10-15 19:13:4221#include "base/task/bind_post_task.h"
Leonid Baraz61437cb2021-02-26 20:43:0622#include "base/task/task_traits.h"
23#include "base/task/thread_pool.h"
24#include "base/time/time.h"
25#include "base/values.h"
Leonid Barazcaac7c92021-03-04 17:34:0526#include "components/reporting/client/report_queue_configuration.h"
Josh Hilkee7a46992021-10-21 20:21:1927#include "components/reporting/proto/synced/record.pb.h"
28#include "components/reporting/proto/synced/record_constants.pb.h"
Leonid Baraz61437cb2021-02-26 20:43:0629#include "components/reporting/storage/storage_module_interface.h"
30#include "components/reporting/util/status.h"
31#include "components/reporting/util/status_macros.h"
32#include "components/reporting/util/statusor.h"
33
34namespace reporting {
35
Leonid Baraz4d49766a2021-10-16 23:50:4836void ReportQueueImpl::Create(
Leonid Baraz61437cb2021-02-26 20:43:0637 std::unique_ptr<ReportQueueConfiguration> config,
Leonid Baraz4d49766a2021-10-16 23:50:4838 scoped_refptr<StorageModuleInterface> storage,
39 base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)> cb) {
40 std::move(cb).Run(base::WrapUnique<ReportQueueImpl>(
41 new ReportQueueImpl(std::move(config), storage)));
Leonid Baraz61437cb2021-02-26 20:43:0642}
43
44ReportQueueImpl::~ReportQueueImpl() = default;
45
46ReportQueueImpl::ReportQueueImpl(
47 std::unique_ptr<ReportQueueConfiguration> config,
48 scoped_refptr<StorageModuleInterface> storage)
49 : config_(std::move(config)),
50 storage_(storage),
51 sequenced_task_runner_(
52 base::ThreadPool::CreateSequencedTaskRunner(base::TaskTraits())) {
53 DETACH_FROM_SEQUENCE(sequence_checker_);
54}
55
Leonid Baraz2aaffa32022-06-01 17:04:4556void ReportQueueImpl::AddRecord(std::string record,
Leonid Baraz61437cb2021-02-26 20:43:0657 Priority priority,
58 EnqueueCallback callback) const {
59 const Status status = config_->CheckPolicy();
60 if (!status.ok()) {
61 std::move(callback).Run(status);
62 return;
63 }
64
65 if (priority == Priority::UNDEFINED_PRIORITY) {
66 std::move(callback).Run(
67 Status(error::INVALID_ARGUMENT, "Priority must be defined"));
68 return;
69 }
70
71 sequenced_task_runner_->PostTask(
72 FROM_HERE, base::BindOnce(&ReportQueueImpl::SendRecordToStorage,
Leonid Baraz2aaffa32022-06-01 17:04:4573 base::Unretained(this), std::move(record),
Leonid Baraz61437cb2021-02-26 20:43:0674 priority, std::move(callback)));
75}
76
Leonid Baraz2aaffa32022-06-01 17:04:4577void ReportQueueImpl::SendRecordToStorage(std::string record_data,
Leonid Baraz61437cb2021-02-26 20:43:0678 Priority priority,
79 EnqueueCallback callback) const {
Leonid Baraz2aaffa32022-06-01 17:04:4580 storage_->AddRecord(priority, AugmentRecord(std::move(record_data)),
Leonid Baraz61437cb2021-02-26 20:43:0681 std::move(callback));
82}
83
Leonid Baraz2aaffa32022-06-01 17:04:4584Record ReportQueueImpl::AugmentRecord(std::string record_data) const {
Leonid Baraz61437cb2021-02-26 20:43:0685 Record record;
Leonid Baraz2aaffa32022-06-01 17:04:4586 *record.mutable_data() = std::move(record_data);
Leonid Baraz61437cb2021-02-26 20:43:0687 record.set_destination(config_->destination());
Vignesh Shenvi2fd13cd2021-09-23 00:53:0888
89 // record with no DM token is assumed to be associated with device DM token
90 if (!config_->dm_token().empty()) {
91 record.set_dm_token(config_->dm_token());
92 }
93
Leonid Baraz61437cb2021-02-26 20:43:0694 // Calculate timestamp in microseconds - to match Spanner expectations.
95 const int64_t time_since_epoch_us =
Leonid Baraz45b4bc92021-09-14 02:48:4796 base::Time::Now().ToJavaTime() * base::Time::kMicrosecondsPerMillisecond;
Leonid Baraz61437cb2021-02-26 20:43:0697 record.set_timestamp_us(time_since_epoch_us);
98 return record;
99}
100
Leonid Baraz45429512021-03-12 18:20:12101void ReportQueueImpl::Flush(Priority priority, FlushCallback callback) {
102 storage_->Flush(priority, std::move(callback));
103}
104
Leonid Barazb8c275352021-08-05 00:59:09105base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)>
106ReportQueueImpl::PrepareToAttachActualQueue() const {
107 NOTREACHED();
108 return base::BindOnce(
109 [](StatusOr<std::unique_ptr<ReportQueue>>) { NOTREACHED(); });
110}
111
112// static
113std::unique_ptr<SpeculativeReportQueueImpl, base::OnTaskRunnerDeleter>
114SpeculativeReportQueueImpl::Create() {
115 scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner =
116 base::ThreadPool::CreateSequencedTaskRunner(
117 {base::TaskPriority::BEST_EFFORT, base::MayBlock()});
118 return std::unique_ptr<SpeculativeReportQueueImpl, base::OnTaskRunnerDeleter>(
119 new SpeculativeReportQueueImpl(sequenced_task_runner),
120 base::OnTaskRunnerDeleter(sequenced_task_runner));
121}
122
123SpeculativeReportQueueImpl::SpeculativeReportQueueImpl(
124 scoped_refptr<base::SequencedTaskRunner> sequenced_task_runner)
125 : sequenced_task_runner_(sequenced_task_runner) {
126 DETACH_FROM_SEQUENCE(sequence_checker_);
127}
128
129SpeculativeReportQueueImpl::~SpeculativeReportQueueImpl() {
130 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
131}
132
133void SpeculativeReportQueueImpl::Flush(Priority priority,
134 FlushCallback callback) {
135 sequenced_task_runner_->PostTask(
136 FROM_HERE,
137 base::BindOnce(
138 [](Priority priority, FlushCallback callback,
139 base::WeakPtr<SpeculativeReportQueueImpl> self) {
140 if (!self) {
141 std::move(callback).Run(
142 Status(error::UNAVAILABLE, "Queue has been destructed"));
143 return;
144 }
145 DCHECK_CALLED_ON_VALID_SEQUENCE(self->sequence_checker_);
146 if (!self->report_queue_) {
147 std::move(callback).Run(Status(error::FAILED_PRECONDITION,
148 "ReportQueue is not ready yet."));
149 return;
150 }
151 self->report_queue_->Flush(priority, std::move(callback));
152 },
153 priority, std::move(callback), weak_ptr_factory_.GetWeakPtr()));
154}
155
Leonid Baraz2aaffa32022-06-01 17:04:45156void SpeculativeReportQueueImpl::AddRecord(std::string record,
Leonid Barazb8c275352021-08-05 00:59:09157 Priority priority,
158 EnqueueCallback callback) const {
159 sequenced_task_runner_->PostTask(
160 FROM_HERE,
161 base::BindOnce(&SpeculativeReportQueueImpl::MaybeEnqueueRecord,
Leonid Baraz2aaffa32022-06-01 17:04:45162 weak_ptr_factory_.GetWeakPtr(), std::move(record),
Leonid Barazb8c275352021-08-05 00:59:09163 priority, std::move(callback)));
164}
165
166void SpeculativeReportQueueImpl::MaybeEnqueueRecord(
Leonid Baraz2aaffa32022-06-01 17:04:45167 std::string record,
Leonid Barazb8c275352021-08-05 00:59:09168 Priority priority,
169 EnqueueCallback callback) const {
170 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
171 if (!report_queue_) {
172 // Queue is not ready yet, store the record in the memory
173 // queue.
174 pending_records_.emplace(record, priority);
175 std::move(callback).Run(Status::StatusOK());
176 return;
177 }
178 // Queue is ready. If memory queue is empty, just forward the
179 // record.
180 if (pending_records_.empty()) {
Leonid Baraz2aaffa32022-06-01 17:04:45181 report_queue_->Enqueue(std::move(record), priority, std::move(callback));
Leonid Barazb8c275352021-08-05 00:59:09182 return;
183 }
184 // If memory queue is not empty, attach the new record at the
185 // end and initiate enqueuing of everything from there.
Leonid Baraz2aaffa32022-06-01 17:04:45186 pending_records_.emplace(std::move(record), priority);
Leonid Barazb8c275352021-08-05 00:59:09187 EnqueuePendingRecords(std::move(callback));
188}
189
190void SpeculativeReportQueueImpl::EnqueuePendingRecords(
191 EnqueueCallback callback) const {
192 DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
Leonid Barazb8c275352021-08-05 00:59:09193 DCHECK(report_queue_);
Ahmed Nasrd6d02da72022-04-13 18:07:41194 if (pending_records_.empty()) {
195 std::move(callback).Run(Status::StatusOK());
196 return;
197 }
198
Leonid Barazb8c275352021-08-05 00:59:09199 std::string record(pending_records_.front().first);
200 Priority priority = pending_records_.front().second;
201 pending_records_.pop();
202 if (pending_records_.empty()) {
203 // Last of the pending records.
204 report_queue_->Enqueue(record, priority, std::move(callback));
205 return;
206 }
207 report_queue_->Enqueue(
208 record, priority,
209 base::BindPostTask(
210 sequenced_task_runner_,
211 base::BindOnce(
212 [](base::WeakPtr<const SpeculativeReportQueueImpl> self,
213 EnqueueCallback callback, Status status) {
214 if (!status.ok()) {
215 std::move(callback).Run(status);
216 return;
217 }
218 if (!self) {
219 std::move(callback).Run(
220 Status(error::UNAVAILABLE, "Queue has been destructed"));
221 return;
222 }
223 self->sequenced_task_runner_->PostTask(
224 FROM_HERE,
225 base::BindOnce(
226 &SpeculativeReportQueueImpl::EnqueuePendingRecords,
227 self, std::move(callback)));
228 },
229 weak_ptr_factory_.GetWeakPtr(), std::move(callback))));
230}
231
232base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)>
233SpeculativeReportQueueImpl::PrepareToAttachActualQueue() const {
234 return base::BindPostTask(
235 sequenced_task_runner_,
236 base::BindOnce(
237 [](base::WeakPtr<SpeculativeReportQueueImpl> speculative_queue,
238 StatusOr<std::unique_ptr<ReportQueue>> actual_queue_result) {
239 if (!speculative_queue) {
240 return; // Speculative queue was destructed in a meantime.
241 }
242 if (!actual_queue_result.ok()) {
243 return; // Actual queue creation failed.
244 }
245 // Set actual queue for the speculative queue to use
246 // (asynchronously).
247 speculative_queue->AttachActualQueue(
248 std::move(actual_queue_result.ValueOrDie()));
249 },
250 weak_ptr_factory_.GetWeakPtr()));
251}
252
253void SpeculativeReportQueueImpl::AttachActualQueue(
254 std::unique_ptr<ReportQueue> actual_queue) {
255 sequenced_task_runner_->PostTask(
256 FROM_HERE,
257 base::BindOnce(
258 [](base::WeakPtr<SpeculativeReportQueueImpl> self,
259 std::unique_ptr<ReportQueue> actual_queue) {
260 if (!self) {
261 return;
262 }
263 DCHECK_CALLED_ON_VALID_SEQUENCE(self->sequence_checker_);
264 if (self->report_queue_) {
265 // Already attached, do nothing.
266 return;
267 }
268 self->report_queue_ = std::move(actual_queue);
269 if (!self->pending_records_.empty()) {
270 self->EnqueuePendingRecords(
271 base::BindOnce([](Status enqueue_status) {
272 if (!enqueue_status.ok()) {
273 LOG(ERROR) << "Pending records failed to enqueue, status="
274 << enqueue_status;
275 }
276 }));
277 }
278 },
279 weak_ptr_factory_.GetWeakPtr(), std::move(actual_queue)));
280}
281
Leonid Baraz61437cb2021-02-26 20:43:06282} // namespace reporting