[ThreadPool]: Implement RunIntent.
This CL is refactor-only:
- Adds TaskSource::WillRunTask() and TaskSource::GetMaxConcurrency()
to control max concurrency of a TaskSource (Sequence is always 1).
- Returns RegisteredTaskSourceAndRunIntent from GetWork().
- Refactor TaskSource::DidProcessTask(bool was_run) to highlight that the
task might not have run.
Currently, only concurrency == 1 is supported, so DCHECK that RunIntent is valid.
This will change in a follow-up:
https://chromium-review.googlesource.com/c/chromium/src/+/1582427
Bug: 839091
Change-Id: Ib81966effd0467c784176875f5695e50cf345365
Reviewed-on: https://chromium-review.googlesource.com/c/chromium/src/+/1655389
Commit-Queue: Etienne Pierre-Doray <[email protected]>
Reviewed-by: François Doray <[email protected]>
Reviewed-by: Gabriel Charette <[email protected]>
Cr-Commit-Position: refs/heads/master@{#675386}
diff --git a/base/task/thread_pool/task_source.h b/base/task/thread_pool/task_source.h
index fea31657..b46678d 100644
--- a/base/task/thread_pool/task_source.h
+++ b/base/task/thread_pool/task_source.h
@@ -40,10 +40,13 @@
// A TaskSource is a virtual class that provides a series of Tasks that must be
// executed.
//
-// In order to execute a task from this TaskSource, TakeTask() can be called to
-// access the next Task, and DidRunTask() must be called after the task executed
-// and before accessing any subsequent Tasks. This ensure that the number of
-// workers concurrently running tasks never go over the intended concurrency.
+// In order to execute a task from this TaskSource, a worker should first make
+// sure that a task can run with WillRunTask(). TakeTask() can then be called to
+// access the next Task, and DidProcessTask() must be called after the task
+// executed. Many overlapping chains of WillRunTask(), TakeTask(), run and
+// DidProcessTask() can run concurrently, as permitted by WillRunTask(). This
+// ensures that the number of workers concurrently running tasks never goes over
+// the intended concurrency.
//
// In comments below, an "empty TaskSource" is a TaskSource with no Task.
//
@@ -55,11 +58,54 @@
// running it (and taking Tasks from it as a result). A dangling reference cycle
// would only occur should they release their reference to it while it's not
// empty. In other words, it is only correct for them to release it when
-// DidRunTask() returns false.
+// DidProcessTask() returns false.
//
// This class is thread-safe.
class BASE_EXPORT TaskSource : public RefCountedThreadSafe<TaskSource> {
public:
+ // Indicates whether a TaskSource may run any additional tasks.
+ enum class ConcurrencyStatus {
+ kSaturated, // The maximum intended concurrency was reached.
+ kPartial, // Additional tasks may concurrently run.
+ };
+
+ // Result of WillRunTask(). A single task associated with a RunIntent may be
+ // accessed with TakeTask() and run iff this evaluates to true.
+ class BASE_EXPORT RunIntent {
+ public:
+ RunIntent() = default;
+ RunIntent(RunIntent&&) noexcept;
+ ~RunIntent();
+
+ RunIntent& operator=(RunIntent&&);
+
+ operator bool() const { return !!task_source_; }
+
+ // Returns true iff the TaskSource from which this RunIntent was obtained
+ // may not run any additional tasks beyond this RunIntent as it has reached
+ // its maximum concurrency. This indicates that the TaskSource no longer
+ // needs to be queued.
+ bool IsSaturated() const {
+ return concurrency_status_ == ConcurrencyStatus::kSaturated;
+ }
+
+ const TaskSource* task_source() const { return task_source_; }
+
+ private:
+ friend class TaskSource;
+
+ RunIntent(const TaskSource* task_source,
+ ConcurrencyStatus concurrency_status);
+
+ void Release() {
+ DCHECK(task_source_);
+ task_source_ = nullptr;
+ }
+
+ const TaskSource* task_source_ = nullptr;
+ ConcurrencyStatus concurrency_status_ = ConcurrencyStatus::kSaturated;
+ };
+
// A Transaction can perform multiple operations atomically on a
// TaskSource. While a Transaction is alive, it is guaranteed that nothing
// else will access the TaskSource; the TaskSource's lock is held for the
@@ -69,6 +115,8 @@
Transaction(Transaction&& other);
~Transaction();
+ operator bool() const { return !!task_source_; }
+
// Returns the next task to run from this TaskSource. This should be called
// only if NeedsWorker returns true. Cannot be called on an empty
// TaskSource.
@@ -77,12 +125,12 @@
// Optional<Task> is never nullptr. An Optional is used in preparation for
// the merge between ThreadPool and TaskQueueManager (in Blink).
// https://crbug.com/783309
- Optional<Task> TakeTask();
+ Optional<Task> TakeTask(RunIntent intent);
- // Must be called once the task was executed. Cannot be called on an empty
- // TaskSource. Returns true if the TaskSource should be queued after this
- // operation.
- bool DidRunTask();
+ // Must be called once the task was run or skipped. |was_run| should be true
+ // if the task executed. Cannot be called on an empty TaskSource. Returns
+ // true if the TaskSource should be queued after this operation.
+ bool DidProcessTask(bool was_run);
// Returns a SequenceSortKey representing the priority of the TaskSource.
// Cannot be called on an empty TaskSource.
@@ -125,6 +173,19 @@
virtual ExecutionEnvironment GetExecutionEnvironment() = 0;
+ // Informs this TaskSource that an additional Task could be run. Returns a
+ // RunIntent that evaluates to true if this operation is allowed (TakeTask()
+ // can be called), or false otherwise. This function is not thread safe and
+ // must be externally synchronized (e.g. by the lock of the PriorityQueue
+ // holding the TaskSource).
+ virtual RunIntent WillRunTask() = 0;
+
+ // Returns the maximum number of tasks from this TaskSource that can run
+ // concurrently. The concurrency is generally controlled through
+ // WillRunTask(), but calling this directly is useful to determine the right
+ // number of workers beforehand.
+ virtual size_t GetMaxConcurrency() const = 0;
+
// Support for IntrusiveHeap.
void SetHeapHandle(const HeapHandle& handle);
void ClearHeapHandle();
@@ -137,7 +198,7 @@
}
// A reference to TaskRunner is only retained between PushTask() and when
- // DidRunTask() returns false, guaranteeing it is safe to dereference this
+ // DidProcessTask() returns false, guaranteeing it is safe to dereference this
// pointer. Otherwise, the caller should guarantee such TaskRunner still
// exists before dereferencing.
TaskRunner* task_runner() const { return task_runner_; }
@@ -149,9 +210,10 @@
virtual Optional<Task> TakeTask() = 0;
- // Returns true if the TaskSource should be queued after this
- // operation.
- virtual bool DidRunTask() = 0;
+ // Informs this TaskSource that a task was processed. |was_run| indicates
+ // whether the task was allowed to run or not. Returns true if the TaskSource
+ // should be queued after this operation.
+ virtual bool DidProcessTask(bool was_run) = 0;
virtual SequenceSortKey GetSortKey() const = 0;
@@ -160,6 +222,10 @@
// Sets TaskSource priority to |priority|.
void UpdatePriority(TaskPriority priority);
+ // Constructs and returns a RunIntent, where |concurrency_status| indicates
+ // whether the TaskSource has reached its maximum concurrency.
+ RunIntent MakeRunIntent(ConcurrencyStatus concurrency_status) const;
+
// The TaskTraits of all Tasks in the TaskSource.
TaskTraits traits_;
@@ -176,7 +242,7 @@
// A pointer to the TaskRunner that posts to this TaskSource, if any. The
// derived class is responsible for calling AddRef() when a TaskSource from
// which no Task is executing becomes non-empty and Release() when
- // DidRunTask() returns false.
+ // DidProcessTask() returns false.
TaskRunner* task_runner_;
TaskSourceExecutionMode execution_mode_;
@@ -190,7 +256,7 @@
public:
RegisteredTaskSource();
RegisteredTaskSource(std::nullptr_t);
- RegisteredTaskSource(RegisteredTaskSource&& other);
+ RegisteredTaskSource(RegisteredTaskSource&& other) noexcept;
~RegisteredTaskSource();
RegisteredTaskSource& operator=(RegisteredTaskSource&& other);
@@ -208,7 +274,6 @@
private:
friend class TaskTracker;
-
RegisteredTaskSource(scoped_refptr<TaskSource> task_source,
TaskTracker* task_tracker);
@@ -218,32 +283,58 @@
DISALLOW_COPY_AND_ASSIGN(RegisteredTaskSource);
};
-template <class T>
-struct BASE_EXPORT BasicTaskSourceAndTransaction {
- T task_source;
- TaskSource::Transaction transaction;
-
- static BasicTaskSourceAndTransaction FromTaskSource(T task_source) {
- auto transaction = task_source->BeginTransaction();
- return BasicTaskSourceAndTransaction(std::move(task_source),
- std::move(transaction));
+// Base implementation for TransactionWith[Owned/Registered]TaskSource (with
+// Transaction as the decorator) and RunIntentWithRegisteredTaskSource (with
+// RunIntent as the decorator).
+template <class Decorator, class T>
+class BASE_EXPORT DecoratorWithTaskSource : public Decorator {
+ public:
+ DecoratorWithTaskSource() = default;
+ DecoratorWithTaskSource(std::nullptr_t) : DecoratorWithTaskSource() {}
+ DecoratorWithTaskSource(T task_source_in, Decorator decorator)
+ : Decorator(std::move(decorator)),
+ task_source_(std::move(task_source_in)) {
+ DCHECK_EQ(task_source_.get(), this->task_source());
}
+ DecoratorWithTaskSource(DecoratorWithTaskSource&& other) = default;
+ ~DecoratorWithTaskSource() = default;
- BasicTaskSourceAndTransaction(T task_source_in,
- TaskSource::Transaction transaction_in)
- : task_source(std::move(task_source_in)),
- transaction(std::move(transaction_in)) {}
- BasicTaskSourceAndTransaction(BasicTaskSourceAndTransaction&& other) =
- default;
- ~BasicTaskSourceAndTransaction() = default;
+ DecoratorWithTaskSource& operator=(DecoratorWithTaskSource&&) = default;
- DISALLOW_COPY_AND_ASSIGN(BasicTaskSourceAndTransaction);
+ T take_task_source() { return std::move(task_source_); }
+
+ protected:
+ T task_source_;
+
+ DISALLOW_COPY_AND_ASSIGN(DecoratorWithTaskSource);
};
-using TaskSourceAndTransaction =
- BasicTaskSourceAndTransaction<scoped_refptr<TaskSource>>;
-using RegisteredTaskSourceAndTransaction =
- BasicTaskSourceAndTransaction<RegisteredTaskSource>;
+// A RunIntent with an additional RegisteredTaskSource member.
+using RunIntentWithRegisteredTaskSource =
+ DecoratorWithTaskSource<TaskSource::RunIntent, RegisteredTaskSource>;
+
+template <class T>
+struct BASE_EXPORT BasicTransactionWithTaskSource
+ : public DecoratorWithTaskSource<TaskSource::Transaction, T> {
+ using DecoratorWithTaskSource<TaskSource::Transaction,
+ T>::DecoratorWithTaskSource;
+
+ static BasicTransactionWithTaskSource FromTaskSource(T task_source) {
+ auto transaction = task_source->BeginTransaction();
+ return BasicTransactionWithTaskSource(std::move(task_source),
+ std::move(transaction));
+ }
+};
+
+// A Transaction with an additional scoped_refptr<TaskSource> member. Useful to
+// carry ownership of a TaskSource with an associated Transaction.
+using TransactionWithOwnedTaskSource =
+ BasicTransactionWithTaskSource<scoped_refptr<TaskSource>>;
+
+// A Transaction with an additional RegisteredTaskSource member. Useful to carry
+// a RegisteredTaskSource with an associated Transaction.
+using TransactionWithRegisteredTaskSource =
+ BasicTransactionWithTaskSource<RegisteredTaskSource>;
} // namespace internal
} // namespace base