blob: 60f73ba6381fdcb2fd4b5060cabc42bff490e315 [file] [log] [blame]
// Copyright 2020 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
4
Leonid Barazf10cae82021-09-14 00:59:385#include "components/reporting/client/report_queue_impl.h"
Leonid Baraz61437cb2021-02-26 20:43:066
7#include <memory>
Leonid Barazf84aa6c2021-12-13 19:38:208#include <queue>
Leonid Baraz61437cb2021-02-26 20:43:069#include <string>
10#include <utility>
11
12#include "base/bind.h"
13#include "base/callback.h"
Ahmed Nasrf21e8432022-07-30 00:48:3714#include "base/check.h"
Leonid Baraz61437cb2021-02-26 20:43:0615#include "base/memory/ptr_util.h"
Leonid Baraz61437cb2021-02-26 20:43:0616#include "base/memory/scoped_refptr.h"
Leonid Barazb8c275352021-08-05 00:59:0917#include "base/notreached.h"
Leonid Baraz61437cb2021-02-26 20:43:0618#include "base/sequence_checker.h"
Patrick Monette643cdf62021-10-15 19:13:4219#include "base/task/bind_post_task.h"
Leonid Baraz61437cb2021-02-26 20:43:0620#include "base/task/task_traits.h"
21#include "base/task/thread_pool.h"
22#include "base/time/time.h"
Leonid Barazcaac7c92021-03-04 17:34:0523#include "components/reporting/client/report_queue_configuration.h"
Josh Hilkee7a46992021-10-21 20:21:1924#include "components/reporting/proto/synced/record.pb.h"
25#include "components/reporting/proto/synced/record_constants.pb.h"
Leonid Baraz61437cb2021-02-26 20:43:0626#include "components/reporting/storage/storage_module_interface.h"
27#include "components/reporting/util/status.h"
Leonid Baraz61437cb2021-02-26 20:43:0628#include "components/reporting/util/statusor.h"
29
30namespace reporting {
Leonid Barazf8a9daf2022-06-02 01:09:3531namespace {
32// Calls |record_producer|, checks the result and in case of success, forwards
33// it to the storage. In production code should be invoked asynchronously, on a
34// thread pool (no synchronization expected).
35void AddRecordToStorage(scoped_refptr<StorageModuleInterface> storage,
36 Priority priority,
37 std::string dm_token,
38 Destination destination,
39 ReportQueue::RecordProducer record_producer,
40 StorageModuleInterface::EnqueueCallback callback) {
Leonid Baraz8e2830e2022-06-02 20:26:5041 // Generate record data.
Leonid Barazf8a9daf2022-06-02 01:09:3542 auto record_result = std::move(record_producer).Run();
43 if (!record_result.ok()) {
44 std::move(callback).Run(record_result.status());
45 return;
46 }
47
48 // Augment data.
49 Record record;
50 *record.mutable_data() = std::move(record_result.ValueOrDie());
51 record.set_destination(destination);
52
Leonid Baraz8e2830e2022-06-02 20:26:5053 // |record| with no DM token is assumed to be associated with device DM token
Leonid Barazf8a9daf2022-06-02 01:09:3554 if (!dm_token.empty()) {
55 *record.mutable_dm_token() = std::move(dm_token);
56 }
57
58 // Calculate timestamp in microseconds - to match Spanner expectations.
59 const int64_t time_since_epoch_us =
60 base::Time::Now().ToJavaTime() * base::Time::kMicrosecondsPerMillisecond;
61 record.set_timestamp_us(time_since_epoch_us);
62 if (!record_result.ok()) {
63 std::move(callback).Run(record_result.status());
64 return;
65 }
66
67 // Add resulting Record to the storage.
68 storage->AddRecord(priority, std::move(record), std::move(callback));
69}
70} // namespace
Leonid Baraz61437cb2021-02-26 20:43:0671
Leonid Baraz4d49766a2021-10-16 23:50:4872void ReportQueueImpl::Create(
Leonid Baraz61437cb2021-02-26 20:43:0673 std::unique_ptr<ReportQueueConfiguration> config,
Leonid Baraz4d49766a2021-10-16 23:50:4874 scoped_refptr<StorageModuleInterface> storage,
75 base::OnceCallback<void(StatusOr<std::unique_ptr<ReportQueue>>)> cb) {
76 std::move(cb).Run(base::WrapUnique<ReportQueueImpl>(
77 new ReportQueueImpl(std::move(config), storage)));
Leonid Baraz61437cb2021-02-26 20:43:0678}
79
Leonid Baraz61437cb2021-02-26 20:43:0680ReportQueueImpl::ReportQueueImpl(
81 std::unique_ptr<ReportQueueConfiguration> config,
82 scoped_refptr<StorageModuleInterface> storage)
Leonid Barazf8a9daf2022-06-02 01:09:3583 : config_(std::move(config)), storage_(storage) {}
Leonid Baraz61437cb2021-02-26 20:43:0684
Leonid Barazf8a9daf2022-06-02 01:09:3585ReportQueueImpl::~ReportQueueImpl() = default;
86
87void ReportQueueImpl::AddProducedRecord(RecordProducer record_producer,
88 Priority priority,
89 EnqueueCallback callback) const {
Leonid Baraz61437cb2021-02-26 20:43:0690 const Status status = config_->CheckPolicy();
91 if (!status.ok()) {
92 std::move(callback).Run(status);
93 return;
94 }
95
96 if (priority == Priority::UNDEFINED_PRIORITY) {
97 std::move(callback).Run(