Enable asynchronous conversion in ReportQueue
Actually, conversion takes place on a level below: some of the
ReportQueue functionality expects synchronous behavior, especially in
tests.
Bug: b:233426237
Change-Id: I351a8f516cdeadd2d13b21ca0f332e54cf55efe5
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3674303
Reviewed-by: Vignesh Shenvi <[email protected]>
Commit-Queue: Leonid Baraz <[email protected]>
Cr-Commit-Position: refs/heads/main@{#1009894}
diff --git a/components/reporting/client/report_queue_impl.cc b/components/reporting/client/report_queue_impl.cc
index 16993eb..33240496 100644
--- a/components/reporting/client/report_queue_impl.cc
+++ b/components/reporting/client/report_queue_impl.cc
@@ -28,10 +28,49 @@
#include "components/reporting/proto/synced/record_constants.pb.h"
#include "components/reporting/storage/storage_module_interface.h"
#include "components/reporting/util/status.h"
-#include "components/reporting/util/status_macros.h"
#include "components/reporting/util/statusor.h"
namespace reporting {
+namespace {
+// Calls |record_producer|, checks the result and in case of success, forwards
+// it to the storage. In production code should be invoked asynchronously, on a
+// thread pool (no synchronization expected).
+void AddRecordToStorage(scoped_refptr<StorageModuleInterface> storage,
+ Priority priority,
+ std::string dm_token,
+ Destination destination,
+ ReportQueue::RecordProducer record_producer,
+ StorageModuleInterface::EnqueueCallback callback) {
+ // Generates record data.
+ auto record_result = std::move(record_producer).Run();
+ if (!record_result.ok()) {
+ std::move(callback).Run(record_result.status());
+ return;
+ }
+
+ // Augment data.
+ Record record;
+ *record.mutable_data() = std::move(record_result.ValueOrDie());
+ record.set_destination(destination);
+
+ // record with no DM token is assumed to be associated with device DM token
+ if (!dm_token.empty()) {
+ *record.mutable_dm_token() = std::move(dm_token);
+ }
+
+ // Calculate timestamp in microseconds - to match Spanner expectations.
+ const int64_t time_since_epoch_us =
+ base::Time::Now().ToJavaTime() * base::Time::kMicrosecondsPerMillisecond;
+ record.set_timestamp_us(time_since_epoch_us);
+ if (!record_result.ok()) {
+ std::move(callback).Run(record_result.status());
+ return;
+ }
+
+ // Add resulting Record to the storage.
+ storage->AddRecord(priority, std::move(record), std::move(callback));
+}
+} // namespace
void ReportQueueImpl::Create(
std::unique_ptr<ReportQueueConfiguration> config,
@@ -41,21 +80,16 @@
new ReportQueueImpl(std::move(config), storage)));
}
-ReportQueueImpl::~ReportQueueImpl() = default;
-
ReportQueueImpl::ReportQueueImpl(
std::unique_ptr<ReportQueueConfiguration> config,
scoped_refptr<StorageModuleInterface> storage)
- : config_(std::move(config)),
- storage_(storage),
- sequenced_task_runner_(
- base::ThreadPool::CreateSequencedTaskRunner(base::TaskTraits())) {
- DETACH_FROM_SEQUENCE(sequence_checker_);
-}
+ : config_(std::move(config)), storage_(storage) {}
-void ReportQueueImpl::AddRecord(std::string record,
- Priority priority,
- EnqueueCallback callback) const {
+ReportQueueImpl::~ReportQueueImpl() = default;
+
+void ReportQueueImpl::AddProducedRecord(RecordProducer record_producer,
+ Priority priority,
+ EnqueueCallback callback) const {
const Status status = config_->CheckPolicy();
if (!status.ok()) {
std::move(callback).Run(status);
@@ -68,34 +102,13 @@
return;
}
- sequenced_task_runner_->PostTask(
- FROM_HERE, base::BindOnce(&ReportQueueImpl::SendRecordToStorage,
- base::Unretained(this), std::move(record),
- priority, std::move(callback)));
-}
-
-void ReportQueueImpl::SendRecordToStorage(std::string record_data,
- Priority priority,
- EnqueueCallback callback) const {
- storage_->AddRecord(priority, AugmentRecord(std::move(record_data)),
- std::move(callback));
-}
-
-Record ReportQueueImpl::AugmentRecord(std::string record_data) const {
- Record record;
- *record.mutable_data() = std::move(record_data);
- record.set_destination(config_->destination());
-
- // record with no DM token is assumed to be associated with device DM token
- if (!config_->dm_token().empty()) {
- record.set_dm_token(config_->dm_token());
- }
-
- // Calculate timestamp in microseconds - to match Spanner expectations.
- const int64_t time_since_epoch_us =
- base::Time::Now().ToJavaTime() * base::Time::kMicrosecondsPerMillisecond;
- record.set_timestamp_us(time_since_epoch_us);
- return record;
+ // Execute |record_producer| on arbitrary thread, analyze the result and send
+ // it to the Storage, returning with the callback.
+ base::ThreadPool::PostTask(
+ FROM_HERE, {base::TaskPriority::BEST_EFFORT},
+ base::BindOnce(&AddRecordToStorage, storage_, priority,
+ config_->dm_token(), config_->destination(),
+ std::move(record_producer), std::move(callback)));
}
void ReportQueueImpl::Flush(Priority priority, FlushCallback callback) {
@@ -153,25 +166,41 @@
priority, std::move(callback), weak_ptr_factory_.GetWeakPtr()));
}
-void SpeculativeReportQueueImpl::AddRecord(std::string record,
- Priority priority,
- EnqueueCallback callback) const {
- sequenced_task_runner_->PostTask(
- FROM_HERE,
- base::BindOnce(&SpeculativeReportQueueImpl::MaybeEnqueueRecord,
- weak_ptr_factory_.GetWeakPtr(), std::move(record),
- priority, std::move(callback)));
+void SpeculativeReportQueueImpl::AddProducedRecord(
+ RecordProducer record_producer,
+ Priority priority,
+ EnqueueCallback callback) const {
+ // Invoke producer on a thread pool, then enqueue record on sequenced task
+ // runner.
+ base::ThreadPool::PostTask(
+ FROM_HERE, {},
+ base::BindOnce(
+ [](RecordProducer record_producer,
+ base::OnceCallback<void(StatusOr<std::string>)> callback) {
+ std::move(callback).Run(std::move(record_producer).Run());
+ },
+ std::move(record_producer),
+ base::BindPostTask(
+ sequenced_task_runner_,
+ base::BindOnce(&SpeculativeReportQueueImpl::MaybeEnqueueRecord,
+ weak_ptr_factory_.GetWeakPtr(), priority,
+ std::move(callback)))));
}
void SpeculativeReportQueueImpl::MaybeEnqueueRecord(
- std::string record,
Priority priority,
- EnqueueCallback callback) const {
+ EnqueueCallback callback,
+ StatusOr<std::string> record_or_error) const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
+ if (!record_or_error.ok()) {
+ std::move(callback).Run(record_or_error.status());
+ return;
+ }
+ auto record = std::move(record_or_error.ValueOrDie());
if (!report_queue_) {
// Queue is not ready yet, store the record in the memory
// queue.
- pending_records_.emplace(record, priority);
+ pending_records_.emplace(std::move(record), priority);
std::move(callback).Run(Status::StatusOK());
return;
}
@@ -201,11 +230,11 @@
pending_records_.pop();
if (pending_records_.empty()) {
// Last of the pending records.
- report_queue_->Enqueue(record, priority, std::move(callback));
+ report_queue_->Enqueue(std::move(record), priority, std::move(callback));
return;
}
report_queue_->Enqueue(
- record, priority,
+ std::move(record), priority,
base::BindPostTask(
sequenced_task_runner_,
base::BindOnce(