Telemetry: Introducing MetricReportQueue class

This class serves as a wrapper of ReportQueue, enable periodic
flush based on policy.

Bug: b:177847653
Change-Id: I40c05e15c5353a3ee3850c549cb5d1d63f9122a8
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/3247824
Commit-Queue: Ahmed Nasr <[email protected]>
Reviewed-by: Leonid Baraz <[email protected]>
Cr-Commit-Position: refs/heads/main@{#936642}
diff --git a/components/reporting/metrics/BUILD.gn b/components/reporting/metrics/BUILD.gn
index a055b77..55066a64 100644
--- a/components/reporting/metrics/BUILD.gn
+++ b/components/reporting/metrics/BUILD.gn
@@ -6,6 +6,8 @@
   sources = [
     "metric_rate_controller.cc",
     "metric_rate_controller.h",
+    "metric_report_queue.cc",
+    "metric_report_queue.h",
     "metric_reporting_controller.cc",
     "metric_reporting_controller.h",
     "reporting_settings.h",
@@ -13,7 +15,10 @@
   ]
   deps = [
     "//base",
+    "//components/reporting/client:report_queue",
     "//components/reporting/proto:metric_data_proto",
+    "//components/reporting/proto:record_constants",
+    "//components/reporting/util:status",
   ]
 }
 
@@ -33,6 +38,7 @@
   testonly = true
   sources = [
     "metric_rate_controller_unittest.cc",
+    "metric_report_queue_unittest.cc",
     "metric_reporting_controller_unittest.cc",
   ]
   deps = [
@@ -40,6 +46,11 @@
     ":test_support",
     "//base",
     "//base/test:test_support",
+    "//components/reporting/client:test_support",
+    "//components/reporting/proto:metric_data_proto",
+    "//components/reporting/proto:record_constants",
+    "//components/reporting/util:status",
+    "//testing/gmock",
     "//testing/gtest",
   ]
 }
diff --git a/components/reporting/metrics/metric_report_queue.cc b/components/reporting/metrics/metric_report_queue.cc
new file mode 100644
index 0000000..b6e0787
--- /dev/null
+++ b/components/reporting/metrics/metric_report_queue.cc
@@ -0,0 +1,51 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "components/reporting/metrics/metric_report_queue.h"
+
+#include "base/bind.h"
+#include "base/logging.h"
+#include "components/reporting/metrics/metric_rate_controller.h"
+#include "components/reporting/metrics/reporting_settings.h"
+#include "components/reporting/proto/metric_data.pb.h"
+#include "components/reporting/util/status.h"
+
+namespace reporting {
+
+MetricReportQueue::MetricReportQueue(
+    std::unique_ptr<ReportQueue, base::OnTaskRunnerDeleter> report_queue,
+    Priority priority)
+    : report_queue_(std::move(report_queue)), priority_(priority) {}
+
+MetricReportQueue::MetricReportQueue(
+    std::unique_ptr<ReportQueue, base::OnTaskRunnerDeleter> report_queue,
+    Priority priority,
+    ReportingSettings* reporting_settings,
+    const std::string& rate_setting_path,
+    base::TimeDelta default_rate,
+    int rate_unit_to_ms)
+    : MetricReportQueue(std::move(report_queue), priority) {
+  rate_controller_ = std::make_unique<MetricRateController>(
+      base::BindRepeating(&MetricReportQueue::Flush, base::Unretained(this)),
+      reporting_settings, rate_setting_path, default_rate, rate_unit_to_ms);
+  rate_controller_->Start();
+}
+
+MetricReportQueue::~MetricReportQueue() = default;
+
+void MetricReportQueue::Enqueue(const MetricData& metric_data,
+                                ReportQueue::EnqueueCallback callback) {
+  report_queue_->Enqueue(&metric_data, priority_, std::move(callback));
+}
+
+void MetricReportQueue::Flush() {
+  report_queue_->Flush(
+      priority_, base::BindOnce([](Status status) {
+        if (!status.ok()) {
+          DVLOG(1) << "Could not upload metric data records because of: "
+                   << status;
+        }
+      }));
+}
+}  // namespace reporting
diff --git a/components/reporting/metrics/metric_report_queue.h b/components/reporting/metrics/metric_report_queue.h
new file mode 100644
index 0000000..b279fc8f
--- /dev/null
+++ b/components/reporting/metrics/metric_report_queue.h
@@ -0,0 +1,61 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#ifndef COMPONENTS_REPORTING_METRICS_METRIC_REPORT_QUEUE_H_
+#define COMPONENTS_REPORTING_METRICS_METRIC_REPORT_QUEUE_H_
+
+#include <memory>
+#include <string>
+
+#include "base/task/sequenced_task_runner.h"
+#include "base/time/time.h"
+#include "components/reporting/client/report_queue.h"
+#include "components/reporting/proto/synced/record_constants.pb.h"
+
+namespace reporting {
+
+class MetricData;
+class MetricRateController;
+class ReportingSettings;
+
+// Simple wrapper for `::reporting::ReportQueue` that can be set for periodic
+// upload.
+class MetricReportQueue {
+ public:
+  MetricReportQueue(
+      std::unique_ptr<ReportQueue, base::OnTaskRunnerDeleter> report_queue,
+      Priority priority);
+
+  // Constructor used if the reporter is needed to upload records periodically.
+  MetricReportQueue(
+      std::unique_ptr<ReportQueue, base::OnTaskRunnerDeleter> report_queue,
+      Priority priority,
+      ReportingSettings* reporting_settings,
+      const std::string& rate_setting_path,
+      base::TimeDelta default_rate,
+      int rate_unit_to_ms = 1);
+
+  MetricReportQueue() = delete;
+  MetricReportQueue(const MetricReportQueue& other) = delete;
+  MetricReportQueue& operator=(const MetricReportQueue& other) = delete;
+
+  virtual ~MetricReportQueue();
+
+  // Enqueue the metric data.
+  virtual void Enqueue(const MetricData& metric_data,
+                       ReportQueue::EnqueueCallback callback);
+
+  // Initiate upload of records with `priority_`.
+  virtual void Flush();
+
+ private:
+  const std::unique_ptr<ReportQueue, base::OnTaskRunnerDeleter> report_queue_;
+
+  const Priority priority_;
+
+  std::unique_ptr<MetricRateController> rate_controller_;
+};
+}  // namespace reporting
+
+#endif  // COMPONENTS_REPORTING_METRICS_METRIC_REPORT_QUEUE_H_
diff --git a/components/reporting/metrics/metric_report_queue_unittest.cc b/components/reporting/metrics/metric_report_queue_unittest.cc
new file mode 100644
index 0000000..1c9bf35f
--- /dev/null
+++ b/components/reporting/metrics/metric_report_queue_unittest.cc
@@ -0,0 +1,155 @@
+// Copyright 2021 The Chromium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style license that can be
+// found in the LICENSE file.
+
+#include "components/reporting/metrics/metric_report_queue.h"
+
+#include <memory>
+#include <string>
+
+#include "base/strings/string_piece_forward.h"
+#include "base/task/sequenced_task_runner.h"
+#include "base/task/thread_pool.h"
+#include "base/test/bind.h"
+#include "base/test/task_environment.h"
+#include "base/time/time.h"
+#include "components/reporting/client/mock_report_queue.h"
+#include "components/reporting/metrics/fake_reporting_settings.h"
+#include "components/reporting/proto/metric_data.pb.h"
+#include "components/reporting/proto/synced/record_constants.pb.h"
+#include "components/reporting/util/status.h"
+#include "testing/gmock/include/gmock/gmock.h"
+#include "testing/gtest/include/gtest/gtest.h"
+
+namespace reporting {
+
+using ::testing::_;
+
+class MetricReportQueueTest : public ::testing::Test {
+ public:
+  void SetUp() override {
+    priority_ = Priority::SLOW_BATCH;
+    settings_ = std::make_unique<FakeReportingSettings>();
+  }
+
+ protected:
+  const std::string kRateSettingPath = "rate_path";
+
+  std::unique_ptr<FakeReportingSettings> settings_;
+
+  Priority priority_;
+
+  base::test::TaskEnvironment task_environment_{
+      base::test::TaskEnvironment::TimeSource::MOCK_TIME};
+};
+
+TEST_F(MetricReportQueueTest, ManualFlush) {
+  auto mock_queue =
+      std::unique_ptr<::reporting::MockReportQueue, base::OnTaskRunnerDeleter>(
+          new testing::StrictMock<::reporting::MockReportQueue>(),
+          base::OnTaskRunnerDeleter(
+              base::ThreadPool::CreateSequencedTaskRunner({})));
+  auto* mock_queue_ptr = mock_queue.get();
+  MetricData record;
+  record.set_timestamp_ms(123456);
+
+  MetricReportQueue metric_report_queue(std::move(mock_queue), priority_);
+
+  EXPECT_CALL(*mock_queue_ptr, AddRecord(_, _, _))
+      .WillOnce([&record, this](base::StringPiece record_string,
+                                Priority actual_priority,
+                                ReportQueue::EnqueueCallback cb) {
+        std::move(cb).Run(Status());
+        MetricData actual_record;
+
+        EXPECT_TRUE(actual_record.ParseFromArray(record_string.data(),
+                                                 record_string.size()));
+        EXPECT_EQ(actual_record.timestamp_ms(), record.timestamp_ms());
+        EXPECT_EQ(actual_priority, priority_);
+      });
+  bool callback_called = false;
+  metric_report_queue.Enqueue(
+      record, base::BindLambdaForTesting(
+                  [&callback_called](Status) { callback_called = true; }));
+  EXPECT_TRUE(callback_called);
+
+  EXPECT_CALL(*mock_queue_ptr, Flush(priority_, _)).Times(1);
+  metric_report_queue.Flush();
+}
+
+TEST_F(MetricReportQueueTest, RateControlledFlush_TimeNotElapsed) {
+  constexpr int rate_ms = 10000;
+  settings_->SetInteger(kRateSettingPath, rate_ms);
+  auto mock_queue =
+      std::unique_ptr<::reporting::MockReportQueue, base::OnTaskRunnerDeleter>(
+          new testing::StrictMock<::reporting::MockReportQueue>(),
+          base::OnTaskRunnerDeleter(
+              base::ThreadPool::CreateSequencedTaskRunner({})));
+  auto* mock_queue_ptr = mock_queue.get();
+  MetricData record;
+  record.set_timestamp_ms(123456);
+
+  MetricReportQueue metric_report_queue(std::move(mock_queue), priority_,
+                                        settings_.get(), kRateSettingPath,
+                                        /*default_rate=*/base::Milliseconds(1));
+
+  EXPECT_CALL(*mock_queue_ptr, AddRecord(_, _, _))
+      .WillOnce([&record, this](base::StringPiece record_string,
+                                Priority actual_priority,
+                                ReportQueue::EnqueueCallback cb) {
+        std::move(cb).Run(Status());
+        MetricData actual_record;
+
+        EXPECT_TRUE(actual_record.ParseFromArray(record_string.data(),
+                                                 record_string.size()));
+        EXPECT_EQ(actual_record.timestamp_ms(), record.timestamp_ms());
+        EXPECT_EQ(actual_priority, priority_);
+      });
+  bool callback_called = false;
+  metric_report_queue.Enqueue(
+      record, base::BindLambdaForTesting(
+                  [&callback_called](Status) { callback_called = true; }));
+  EXPECT_TRUE(callback_called);
+
+  EXPECT_CALL(*mock_queue_ptr, Flush).Times(0);
+  task_environment_.FastForwardBy(base::Milliseconds(rate_ms - 1));
+}
+
+TEST_F(MetricReportQueueTest, RateControlledFlush_TimeElapsed) {
+  constexpr int rate_ms = 10000;
+  settings_->SetInteger(kRateSettingPath, rate_ms);
+  auto mock_queue =
+      std::unique_ptr<::reporting::MockReportQueue, base::OnTaskRunnerDeleter>(
+          new testing::StrictMock<::reporting::MockReportQueue>(),
+          base::OnTaskRunnerDeleter(
+              base::ThreadPool::CreateSequencedTaskRunner({})));
+  auto* mock_queue_ptr = mock_queue.get();
+  MetricData record;
+  record.set_timestamp_ms(123456);
+
+  MetricReportQueue metric_report_queue(std::move(mock_queue), priority_,
+                                        settings_.get(), kRateSettingPath,
+                                        /*default_rate=*/base::Milliseconds(1));
+
+  EXPECT_CALL(*mock_queue_ptr, AddRecord(_, _, _))
+      .WillOnce([&record, this](base::StringPiece record_string,
+                                Priority actual_priority,
+                                ReportQueue::EnqueueCallback cb) {
+        std::move(cb).Run(Status());
+        MetricData actual_record;
+
+        EXPECT_TRUE(actual_record.ParseFromArray(record_string.data(),
+                                                 record_string.size()));
+        EXPECT_EQ(actual_record.timestamp_ms(), record.timestamp_ms());
+        EXPECT_EQ(actual_priority, priority_);
+      });
+  bool callback_called = false;
+  metric_report_queue.Enqueue(
+      record, base::BindLambdaForTesting(
+                  [&callback_called](Status) { callback_called = true; }));
+  EXPECT_TRUE(callback_called);
+
+  EXPECT_CALL(*mock_queue_ptr, Flush(priority_, _)).Times(1);
+  task_environment_.FastForwardBy(base::Milliseconds(rate_ms));
+}
+}  // namespace reporting