blob: cdd5e2ce5b561dc042362e51c951743e2d38e269 [file] [log] [blame]
// Copyright 2016 The Chromium Authors
// Use of this source code is governed by a BSD-style license that can be
// found in the LICENSE file.
#include "base/task/thread_pool/worker_thread.h"
#include <stddef.h>
#include <algorithm>
#include <atomic>
#include <optional>
#include <utility>
#include "base/check_op.h"
#include "base/compiler_specific.h"
#include "base/debug/alias.h"
#include "base/functional/callback_helpers.h"
#include "base/synchronization/waitable_event.h"
#include "base/task/thread_pool/environment_config.h"
#include "base/task/thread_pool/worker_thread_observer.h"
#include "base/threading/hang_watcher.h"
#include "base/time/time.h"
#include "base/time/time_override.h"
#include "base/trace_event/base_tracing.h"
#include "build/build_config.h"
#include "partition_alloc/buildflags.h"
#if PA_BUILDFLAG(USE_PARTITION_ALLOC)
#include "partition_alloc/partition_alloc_config.h" // nogncheck
#endif
#if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
#include "base/files/file_descriptor_watcher_posix.h"
#endif
#if BUILDFLAG(IS_APPLE)
#include "base/apple/scoped_nsautorelease_pool.h"
#endif
#if PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
PA_CONFIG(THREAD_CACHE_SUPPORTED)
#include "partition_alloc/thread_cache.h" // nogncheck
#endif
namespace base::internal {
constexpr TimeDelta WorkerThread::Delegate::kPurgeThreadCacheIdleDelay;
WorkerThread::ThreadLabel WorkerThread::Delegate::GetThreadLabel() const {
return WorkerThread::ThreadLabel::POOLED;
}
bool WorkerThread::Delegate::TimedWait(TimeDelta timeout) {
return wake_up_event_.TimedWait(timeout);
}
void WorkerThread::Delegate::WaitForWork() {
const TimeDelta sleep_duration_before_worker_reclaim = GetSleepTimeout();
// When a thread goes to sleep, the memory retained by its thread cache is
// trapped there for as long as the thread sleeps. To prevent that, we can
// either purge the thread cache right before going to sleep, or after some
// delay.
//
// Purging the thread cache incurs a cost on the next task, since its thread
// cache will be empty and allocation performance initially lower. As a lot of
// sleeps are very short, do not purge all the time (this would also make
// sleep / wakeups cycles more costly).
//
// Instead, sleep for min(timeout, 1s). If the wait times out then purge at
// that point, and go to sleep for the remaining of the time. This ensures
// that we do no work for short sleeps, and that threads do not get awaken
// many times.
#if PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
PA_CONFIG(THREAD_CACHE_SUPPORTED)
const TimeDelta sleep_duration_before_purge =
GetSleepDurationBeforePurge(base::TimeTicks::Now());
const bool was_signaled = TimedWait(std::min(
sleep_duration_before_purge, sleep_duration_before_worker_reclaim));
// Timed out.
if (!was_signaled) {
partition_alloc::ThreadCache::PurgeCurrentThread();
// The thread woke up to purge before its standard reclaim time. Sleep for
// what's remaining until then.
if (sleep_duration_before_worker_reclaim > sleep_duration_before_purge) {
TimedWait(sleep_duration_before_worker_reclaim.is_max()
? TimeDelta::Max()
: sleep_duration_before_worker_reclaim -
sleep_duration_before_purge);
}
}
#else
TimedWait(sleep_duration_before_worker_reclaim);
#endif // PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
// PA_CONFIG(THREAD_CACHE_SUPPORTED)
}
#if PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) && \
PA_CONFIG(THREAD_CACHE_SUPPORTED)
TimeDelta WorkerThread::Delegate::GetSleepDurationBeforePurge(TimeTicks now) {
base::TimeDelta sleep_duration_before_purge = kPurgeThreadCacheIdleDelay;
// Use the first time a worker goes to sleep in this process as an
// approximation of the process creation time.
static const TimeTicks first_sleep_time = now;
const TimeTicks first_sleep_time_to_use =
!first_sleep_time_for_testing_.is_null() ? first_sleep_time_for_testing_
: first_sleep_time;
const base::TimeTicks first_wake_time =
first_sleep_time_to_use + kFirstSleepDurationBeforePurge;
// A sleep that occurs within `kFirstSleepDurationBeforePurge` of the
// first sleep lasts at least `kFirstSleepDurationBeforePurge`.
if (now <= first_wake_time) {
// Avoid sleeping for less than `sleep_duration_before_purge` since that is
// the shortest expected duration to wait for a purge.
sleep_duration_before_purge =
std::max(kFirstSleepDurationBeforePurge, sleep_duration_before_purge);
}
// Align wakeups for purges to reduce the chances of taking the CPU out of
// sleep multiple times for these operations. This can happen if many workers
// in the same process scheduled wakeups. This can create a situation where
// any one worker wakes every `kPurgeThreadCacheIdleDelay` / N where N is the
// number of workers.
const TimeTicks snapped_purge_time =
(now + sleep_duration_before_purge)
.SnappedToNextTick(TimeTicks(), kPurgeThreadCacheIdleDelay);
return snapped_purge_time - now;
}
#endif // PA_BUILDFLAG(USE_PARTITION_ALLOC_AS_MALLOC) &&
// PA_CONFIG(THREAD_CACHE_SUPPORTED)
WorkerThread::WorkerThread(ThreadType thread_type_hint,
std::unique_ptr<Delegate> delegate,
TrackedRef<TaskTracker> task_tracker,
size_t sequence_num,
const CheckedLock* predecessor_lock,
void* flow_terminator)
: thread_lock_(predecessor_lock),
task_tracker_(std::move(task_tracker)),
thread_type_hint_(thread_type_hint),
current_thread_type_(GetDesiredThreadType()),
sequence_num_(sequence_num),
flow_terminator_(flow_terminator == nullptr
? reinterpret_cast<intptr_t>(this)
: reinterpret_cast<intptr_t>(flow_terminator)),
delegate_(std::move(delegate)) {
DCHECK(task_tracker_);
DCHECK(CanUseBackgroundThreadTypeForWorkerThread() ||
thread_type_hint_ != ThreadType::kBackground);
DCHECK(CanUseUtilityThreadTypeForWorkerThread() ||
thread_type_hint != ThreadType::kUtility);
DCHECK(delegate_);
delegate_->wake_up_event_.declare_only_used_while_idle();
}
bool WorkerThread::Start(
scoped_refptr<SingleThreadTaskRunner> io_thread_task_runner,
WorkerThreadObserver* worker_thread_observer) {
CheckedLock::AssertNoLockHeldOnCurrentThread();
CheckedAutoLock auto_lock(thread_lock_);
DCHECK(thread_handle_.is_null());
#if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
DCHECK(io_thread_task_runner);
io_thread_task_runner_ = std::move(io_thread_task_runner);
#endif
if (should_exit_.IsSet() || join_called_for_testing_.IsSet()) {
return true;
}
DCHECK(!worker_thread_observer_);
worker_thread_observer_ = worker_thread_observer;
self_ = this;
constexpr size_t kDefaultStackSize = 0;
PlatformThread::CreateWithType(kDefaultStackSize, this, &thread_handle_,
current_thread_type_);
if (thread_handle_.is_null()) {
self_ = nullptr;
return false;
}
return true;
}
void WorkerThread::Destroy() {
CheckedAutoLock auto_lock(thread_lock_);
// If |thread_handle_| wasn't joined, detach it.
if (!thread_handle_.is_null()) {
PlatformThread::Detach(thread_handle_);
}
}
bool WorkerThread::ThreadAliveForTesting() const {
CheckedAutoLock auto_lock(thread_lock_);
return !thread_handle_.is_null();
}
void WorkerThread::JoinForTesting() {
DCHECK(!join_called_for_testing_.IsSet());
join_called_for_testing_.Set();
delegate_->wake_up_event_.Signal();
PlatformThreadHandle thread_handle;
{
CheckedAutoLock auto_lock(thread_lock_);
if (thread_handle_.is_null()) {
return;
}
thread_handle = thread_handle_;
// Reset |thread_handle_| so it isn't joined by the destructor.
thread_handle_ = PlatformThreadHandle();
}
PlatformThread::Join(thread_handle);
}
void WorkerThread::Cleanup() {
DCHECK(!should_exit_.IsSet());
should_exit_.Set();
delegate_->wake_up_event_.Signal();
}
void WorkerThread::WakeUp() {
// Signalling an event can deschedule the current thread. Since being
// descheduled while holding a lock is undesirable (https://crbug.com/890978),
// assert that no lock is held by the current thread.
CheckedLock::AssertNoLockHeldOnCurrentThread();
// Calling WakeUp() after Cleanup() or Join() is wrong because the
// WorkerThread cannot run more tasks.
DCHECK(!join_called_for_testing_.IsSet());
DCHECK(!should_exit_.IsSet());
TRACE_EVENT_INSTANT("wakeup.flow", "WorkerThread::WakeUp",
perfetto::Flow::FromPointer(this));
delegate_->wake_up_event_.Signal();
}
WorkerThread::Delegate* WorkerThread::delegate() {
return delegate_.get();
}
WorkerThread::~WorkerThread() {
Destroy();
}
void WorkerThread::MaybeUpdateThreadType() {
UpdateThreadType(GetDesiredThreadType());
}
void WorkerThread::BeginUnusedPeriod() {
CheckedAutoLock auto_lock(thread_lock_);
DCHECK(last_used_time_.is_null());
last_used_time_ = subtle::TimeTicksNowIgnoringOverride();
}
void WorkerThread::EndUnusedPeriod() {
CheckedAutoLock auto_lock(thread_lock_);
DCHECK(!last_used_time_.is_null());
last_used_time_ = TimeTicks();
}
TimeTicks WorkerThread::GetLastUsedTime() const {
CheckedAutoLock auto_lock(thread_lock_);
return last_used_time_;
}
bool WorkerThread::ShouldExit() const {
// The ordering of the checks is important below. This WorkerThread may be
// released and outlive |task_tracker_| in unit tests. However, when the
// WorkerThread is released, |should_exit_| will be set, so check that
// first.
return should_exit_.IsSet() || join_called_for_testing_.IsSet() ||
task_tracker_->IsShutdownComplete();
}
ThreadType WorkerThread::GetDesiredThreadType() const {
// To avoid shutdown hangs, disallow a type below kNormal during shutdown
if (task_tracker_->HasShutdownStarted()) {
return ThreadType::kDefault;
}
return thread_type_hint_;
}
void WorkerThread::UpdateThreadType(ThreadType desired_thread_type) {
if (desired_thread_type == current_thread_type_) {
return;
}
PlatformThread::SetCurrentThreadType(desired_thread_type);
current_thread_type_ = desired_thread_type;
}
void WorkerThread::ThreadMain() {
#if (BUILDFLAG(IS_POSIX) && !BUILDFLAG(IS_NACL)) || BUILDFLAG(IS_FUCHSIA)
DCHECK(io_thread_task_runner_);
FileDescriptorWatcher file_descriptor_watcher(io_thread_task_runner_);
#endif
if (thread_type_hint_ == ThreadType::kBackground) {
switch (delegate()->GetThreadLabel()) {
case ThreadLabel::POOLED:
RunBackgroundPooledWorker();
return;
case ThreadLabel::SHARED:
RunBackgroundSharedWorker();
return;
case ThreadLabel::DEDICATED:
RunBackgroundDedicatedWorker();
return;
#if BUILDFLAG(IS_WIN)
case ThreadLabel::SHARED_COM:
RunBackgroundSharedCOMWorker();
return;
case ThreadLabel::DEDICATED_COM:
RunBackgroundDedicatedCOMWorker();
return;
#endif // BUILDFLAG(IS_WIN)
}
}
switch (delegate()->GetThreadLabel()) {
case ThreadLabel::POOLED:
RunPooledWorker();
return;
case ThreadLabel::SHARED:
RunSharedWorker();
return;
case ThreadLabel::DEDICATED:
RunDedicatedWorker();
return;
#if BUILDFLAG(IS_WIN)
case ThreadLabel::SHARED_COM:
RunSharedCOMWorker();
return;
case ThreadLabel::DEDICATED_COM:
RunDedicatedCOMWorker();
return;
#endif // BUILDFLAG(IS_WIN)
}
}
NOINLINE void WorkerThread::RunPooledWorker() {
RunWorker();
NO_CODE_FOLDING();
}
NOINLINE void WorkerThread::RunBackgroundPooledWorker() {
RunWorker();
NO_CODE_FOLDING();
}
NOINLINE void WorkerThread::RunSharedWorker() {
RunWorker();
NO_CODE_FOLDING();
}
NOINLINE void WorkerThread::RunBackgroundSharedWorker() {
RunWorker();
NO_CODE_FOLDING();
}
NOINLINE void WorkerThread::RunDedicatedWorker() {
RunWorker();
NO_CODE_FOLDING();
}
NOINLINE void WorkerThread::RunBackgroundDedicatedWorker() {
RunWorker();
NO_CODE_FOLDING();
}
#if BUILDFLAG(IS_WIN)
NOINLINE void WorkerThread::RunSharedCOMWorker() {
RunWorker();
NO_CODE_FOLDING();
}
NOINLINE void WorkerThread::RunBackgroundSharedCOMWorker() {
RunWorker();
NO_CODE_FOLDING();
}
NOINLINE void WorkerThread::RunDedicatedCOMWorker() {
RunWorker();
NO_CODE_FOLDING();
}
NOINLINE void WorkerThread::RunBackgroundDedicatedCOMWorker() {
RunWorker();
NO_CODE_FOLDING();
}
#endif // BUILDFLAG(IS_WIN)
void WorkerThread::RunWorker() {
DCHECK_EQ(self_, this);
TRACE_EVENT_INSTANT0("base", "WorkerThread born", TRACE_EVENT_SCOPE_THREAD);
TRACE_EVENT_BEGIN0("base", "WorkerThread active");
if (worker_thread_observer_) {
worker_thread_observer_->OnWorkerThreadMainEntry();
}
delegate()->OnMainEntry(this);
// Background threads can take an arbitrary amount of time to complete, do not
// watch them for hangs. Ignore priority boosting for now.
const bool watch_for_hangs =
base::HangWatcher::IsThreadPoolHangWatchingEnabled() &&
GetDesiredThreadType() != ThreadType::kBackground;
// If this process has a HangWatcher register this thread for watching.
base::ScopedClosureRunner unregister_for_hang_watching;
if (watch_for_hangs) {
unregister_for_hang_watching = base::HangWatcher::RegisterThread(
base::HangWatcher::ThreadType::kThreadPoolThread);
}
while (!ShouldExit()) {
#if BUILDFLAG(IS_APPLE)
apple::ScopedNSAutoreleasePool autorelease_pool;
#endif
std::optional<WatchHangsInScope> hang_watch_scope;
TRACE_EVENT_END0("base", "WorkerThread active");
hang_watch_scope.reset();
delegate()->WaitForWork();
TRACE_EVENT_BEGIN("base", "WorkerThread active",
perfetto::TerminatingFlow::FromPointer(
reinterpret_cast<void*>(flow_terminator_)));
// Don't GetWork() in the case where we woke up for Cleanup().
if (ShouldExit()) {
break;
}
if (watch_for_hangs) {
hang_watch_scope.emplace();
}
// Thread type needs to be updated before GetWork.
UpdateThreadType(GetDesiredThreadType());
// Get the task source containing the first task to execute.
RegisteredTaskSource task_source = delegate()->GetWork(this);
// If acquiring work failed and the worker's still alive,
// record that this is an unnecessary wakeup.
if (!task_source && !ShouldExit()) {
delegate()->RecordUnnecessaryWakeup();
}
while (task_source) {
// Alias pointer for investigation of memory corruption. crbug.com/1218384
TaskSource* task_source_before_run = task_source.get();
base::debug::Alias(&task_source_before_run);
task_source = task_tracker_->RunAndPopNextTask(std::move(task_source));
// Alias pointer for investigation of memory corruption. crbug.com/1218384
TaskSource* task_source_before_move = task_source.get();
base::debug::Alias(&task_source_before_move);
// We emplace the hang_watch_scope here so that each hang watch scope
// covers one GetWork (or SwapProcessedTask) as well as one
// RunAndPopNextTask.
if (watch_for_hangs) {
hang_watch_scope.emplace();
}
RegisteredTaskSource new_task_source =
delegate()->SwapProcessedTask(std::move(task_source), this);
UpdateThreadType(GetDesiredThreadType());
// Check that task_source is always cleared, to help investigation of
// memory corruption where task_source is non-null after being moved.
// crbug.com/1218384
CHECK(!task_source); // NOLINT(bugprone-use-after-move)
task_source = std::move(new_task_source);
}
}
// Important: It is unsafe to access unowned state (e.g. |task_tracker_|)
// after invoking OnMainExit().
delegate()->OnMainExit(this);
if (worker_thread_observer_) {
worker_thread_observer_->OnWorkerThreadMainExit();
}
// Release the self-reference to |this|. This can result in deleting |this|
// and as such no more member accesses should be made after this point.
self_ = nullptr;
TRACE_EVENT_END0("base", "WorkerThread active");
TRACE_EVENT_INSTANT0("base", "WorkerThread dead", TRACE_EVENT_SCOPE_THREAD);
}
} // namespace base::internal