Store the actual queue creation status in SpeculativeReportQueue.
This allows replying with a failure status to enqueue requests in case
the actual queue creation failed.
In a follow-up CL, the enqueue request callbacks will be deferred
until the actual queue is created or its creation has failed.
Bug: b:239583016
Change-Id: If123a8a66a4526063f30d76329bb605f3f724db7
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3794505
Commit-Queue: Ahmed Nasr <[email protected]>
Reviewed-by: Vignesh Shenvi <[email protected]>
Reviewed-by: Leonid Baraz <[email protected]>
Cr-Commit-Position: refs/heads/main@{#1030005}
diff --git a/components/reporting/client/report_queue_impl.cc b/components/reporting/client/report_queue_impl.cc
index 9f564c8..d39544c 100644
--- a/components/reporting/client/report_queue_impl.cc
+++ b/components/reporting/client/report_queue_impl.cc
@@ -11,18 +11,15 @@
#include "base/bind.h"
#include "base/callback.h"
-#include "base/json/json_writer.h"
+#include "base/check.h"
#include "base/memory/ptr_util.h"
-#include "base/memory/ref_counted.h"
#include "base/memory/scoped_refptr.h"
#include "base/notreached.h"
#include "base/sequence_checker.h"
-#include "base/strings/strcat.h"
#include "base/task/bind_post_task.h"
#include "base/task/task_traits.h"
#include "base/task/thread_pool.h"
#include "base/time/time.h"
-#include "base/values.h"
#include "components/reporting/client/report_queue_configuration.h"
#include "components/reporting/proto/synced/record.pb.h"
#include "components/reporting/proto/synced/record_constants.pb.h"
@@ -179,12 +176,18 @@
return;
}
DCHECK_CALLED_ON_VALID_SEQUENCE(self->sequence_checker_);
- if (!self->report_queue_) {
+ if (!self->status_or_report_queue_.has_value()) {
std::move(callback).Run(Status(error::FAILED_PRECONDITION,
"ReportQueue is not ready yet."));
return;
}
- self->report_queue_->Flush(priority, std::move(callback));
+ if (!self->status_or_report_queue_->ok()) {
+ std::move(callback).Run(self->status_or_report_queue_->status());
+ return;
+ }
+ const std::unique_ptr<ReportQueue>& report_queue =
+ self->status_or_report_queue_->ValueOrDie();
+ report_queue->Flush(priority, std::move(callback));
},
priority, std::move(callback), weak_ptr_factory_.GetWeakPtr()));
}
@@ -207,18 +210,23 @@
EnqueueCallback callback,
RecordProducer record_producer) const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
- if (!report_queue_) {
- // Queue is not ready yet, store the record in the memory
- // queue.
+ if (!status_or_report_queue_.has_value()) {
+ // Queue is not ready yet, store the record in the memory queue.
pending_record_producers_.emplace(std::move(record_producer), priority);
std::move(callback).Run(Status::StatusOK());
return;
}
- // Queue is ready. If memory queue is empty, just forward the
- // record.
+ if (!status_or_report_queue_->ok()) {
+ // Queue creation failed.
+ std::move(callback).Run(status_or_report_queue_->status());
+ return;
+ }
+ // Queue is ready. If memory queue is empty, just forward the record.
if (pending_record_producers_.empty()) {
- report_queue_->AddProducedRecord(std::move(record_producer), priority,
- std::move(callback));
+ const std::unique_ptr<ReportQueue>& report_queue =
+ status_or_report_queue_->ValueOrDie();
+ report_queue->AddProducedRecord(std::move(record_producer), priority,
+ std::move(callback));
return;
}
// If memory queue is not empty, attach the new record at the
@@ -230,21 +238,24 @@
void SpeculativeReportQueueImpl::EnqueuePendingRecordProducers(
EnqueueCallback callback) const {
DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
- DCHECK(report_queue_);
+ DCHECK(status_or_report_queue_.has_value());
+ DCHECK(status_or_report_queue_->ok());
if (pending_record_producers_.empty()) {
std::move(callback).Run(Status::StatusOK());
return;
}
+ const std::unique_ptr<ReportQueue>& report_queue =
+ status_or_report_queue_->ValueOrDie();
auto head = std::move(pending_record_producers_.front());
pending_record_producers_.pop();
if (pending_record_producers_.empty()) {
// Last of the pending records.
- report_queue_->AddProducedRecord(std::move(head.record_producer),
- head.record_priority, std::move(callback));
+ report_queue->AddProducedRecord(std::move(head.record_producer),
+ head.record_priority, std::move(callback));
return;
}
- report_queue_->AddProducedRecord(
+ report_queue->AddProducedRecord(
std::move(head.record_producer), head.record_priority,
base::BindPostTask(
sequenced_task_runner_,
@@ -278,34 +289,36 @@
if (!speculative_queue) {
return; // Speculative queue was destructed in a meantime.
}
- if (!actual_queue_result.ok()) {
- return; // Actual queue creation failed.
- }
// Set actual queue for the speculative queue to use
// (asynchronously).
speculative_queue->AttachActualQueue(
- std::move(actual_queue_result.ValueOrDie()));
+ std::move(std::move(actual_queue_result)));
},
weak_ptr_factory_.GetWeakPtr()));
}
void SpeculativeReportQueueImpl::AttachActualQueue(
- std::unique_ptr<ReportQueue> actual_queue) {
+ StatusOr<std::unique_ptr<ReportQueue>> status_or_actual_queue) {
sequenced_task_runner_->PostTask(
FROM_HERE,
base::BindOnce(
[](base::WeakPtr<SpeculativeReportQueueImpl> self,
- std::unique_ptr<ReportQueue> actual_queue) {
+ StatusOr<std::unique_ptr<ReportQueue>> status_or_actual_queue) {
if (!self) {
return;
}
DCHECK_CALLED_ON_VALID_SEQUENCE(self->sequence_checker_);
- if (self->report_queue_) {
+ if (self->status_or_report_queue_.has_value()) {
// Already attached, do nothing.
return;
}
- self->report_queue_ = std::move(actual_queue);
- if (!self->pending_record_producers_.empty()) {
+ self->status_or_report_queue_ = std::move(status_or_actual_queue);
+ // TODO(b/239583016): remove the ok status check once the enqueue
+ // callbacks are stored along with the records in a pending queue
+ // instead of only the records, and run the callbacks with the
+ // failure status if creation failed.
+ if (self->status_or_report_queue_->ok() &&
+ !self->pending_record_producers_.empty()) {
self->EnqueuePendingRecordProducers(
base::BindOnce([](Status enqueue_status) {
if (!enqueue_status.ok()) {
@@ -315,7 +328,7 @@
}));
}
},
- weak_ptr_factory_.GetWeakPtr(), std::move(actual_queue)));
+ weak_ptr_factory_.GetWeakPtr(), std::move(status_or_actual_queue)));
}
} // namespace reporting