diff options
Diffstat (limited to 'thread_pthread.c')
-rw-r--r-- | thread_pthread.c | 251 |
1 files changed, 143 insertions, 108 deletions
diff --git a/thread_pthread.c b/thread_pthread.c index 1ec460940a..f9352bbb56 100644 --- a/thread_pthread.c +++ b/thread_pthread.c @@ -374,40 +374,47 @@ ractor_sched_dump_(const char *file, int line, rb_vm_t *vm) #define thread_sched_unlock(a, b) thread_sched_unlock_(a, b, __FILE__, __LINE__) static void -thread_sched_lock_(struct rb_thread_sched *sched, rb_thread_t *th, const char *file, int line) +thread_sched_set_locked(struct rb_thread_sched *sched, rb_thread_t *th) { - rb_native_mutex_lock(&sched->lock_); - -#if VM_CHECK_MODE - RUBY_DEBUG_LOG2(file, line, "th:%u prev_owner:%u", rb_th_serial(th), rb_th_serial(sched->lock_owner)); +#if VM_CHECK_MODE > 0 VM_ASSERT(sched->lock_owner == NULL); + sched->lock_owner = th; -#else - RUBY_DEBUG_LOG2(file, line, "th:%u", rb_th_serial(th)); #endif } static void -thread_sched_unlock_(struct rb_thread_sched *sched, rb_thread_t *th, const char *file, int line) +thread_sched_set_unlocked(struct rb_thread_sched *sched, rb_thread_t *th) { - RUBY_DEBUG_LOG2(file, line, "th:%u", rb_th_serial(th)); - -#if VM_CHECK_MODE +#if VM_CHECK_MODE > 0 VM_ASSERT(sched->lock_owner == th); + sched->lock_owner = NULL; #endif - - rb_native_mutex_unlock(&sched->lock_); } static void -thread_sched_set_lock_owner(struct rb_thread_sched *sched, rb_thread_t *th) +thread_sched_lock_(struct rb_thread_sched *sched, rb_thread_t *th, const char *file, int line) { - RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); + rb_native_mutex_lock(&sched->lock_); -#if VM_CHECK_MODE > 0 - sched->lock_owner = th; +#if VM_CHECK_MODE + RUBY_DEBUG_LOG2(file, line, "r:%d th:%u", th ? (int)rb_ractor_id(th->ractor) : -1, rb_th_serial(th)); +#else + RUBY_DEBUG_LOG2(file, line, "th:%u", rb_th_serial(th)); #endif + + thread_sched_set_locked(sched, th); +} + +static void +thread_sched_unlock_(struct rb_thread_sched *sched, rb_thread_t *th, const char *file, int line) +{ + RUBY_DEBUG_LOG2(file, line, "th:%u", rb_th_serial(th)); + + thread_sched_set_unlocked(sched, th); + + rb_native_mutex_unlock(&sched->lock_); } static void @@ -542,7 +549,6 @@ ractor_sched_timeslice_threads_contain_p(rb_vm_t *vm, rb_thread_t *th) } static void ractor_sched_barrier_join_signal_locked(rb_vm_t *vm); -static void ractor_sched_barrier_join_wait_locked(rb_vm_t *vm, rb_thread_t *th); // setup timeslice signals by the timer thread. static void @@ -585,11 +591,10 @@ thread_sched_setup_running_threads(struct rb_thread_sched *sched, rb_ractor_t *c } if (add_th) { - while (UNLIKELY(vm->ractor.sched.barrier_waiting)) { - RUBY_DEBUG_LOG("barrier-wait"); - - ractor_sched_barrier_join_signal_locked(vm); - ractor_sched_barrier_join_wait_locked(vm, add_th); + if (vm->ractor.sched.barrier_waiting) { + // TODO: GC barrier check? + RUBY_DEBUG_LOG("barrier_waiting"); + RUBY_VM_SET_VM_BARRIER_INTERRUPT(add_th->ec); } VM_ASSERT(!ractor_sched_running_threads_contain_p(vm, add_th)); @@ -598,7 +603,6 @@ thread_sched_setup_running_threads(struct rb_thread_sched *sched, rb_ractor_t *c ccan_list_add(&vm->ractor.sched.running_threads, &add_th->sched.node.running_threads); vm->ractor.sched.running_cnt++; sched->is_running = true; - VM_ASSERT(!vm->ractor.sched.barrier_waiting); } if (add_timeslice_th) { @@ -622,19 +626,6 @@ thread_sched_setup_running_threads(struct rb_thread_sched *sched, rb_ractor_t *c } ractor_sched_unlock(vm, cr); - if (add_th && !del_th && UNLIKELY(vm->ractor.sync.lock_owner != NULL)) { - // it can be after barrier synchronization by another ractor - rb_thread_t *lock_owner = NULL; -#if VM_CHECK_MODE - lock_owner = sched->lock_owner; -#endif - thread_sched_unlock(sched, lock_owner); - { - RB_VM_LOCKING(); - } - thread_sched_lock(sched, lock_owner); - } - //RUBY_DEBUG_LOG("+:%u -:%u +ts:%u -ts:%u run:%u->%u", // rb_th_serial(add_th), rb_th_serial(del_th), // rb_th_serial(add_timeslice_th), rb_th_serial(del_timeslice_th), @@ -753,7 +744,8 @@ thread_sched_enq(struct rb_thread_sched *sched, rb_thread_t *ready_th) } } else { - VM_ASSERT(!ractor_sched_timeslice_threads_contain_p(ready_th->vm, sched->running)); + // ractor_sched lock is needed + // VM_ASSERT(!ractor_sched_timeslice_threads_contain_p(ready_th->vm, sched->running)); } ccan_list_add_tail(&sched->readyq, &ready_th->sched.node.readyq); @@ -849,12 +841,12 @@ thread_sched_wait_running_turn(struct rb_thread_sched *sched, rb_thread_t *th, b if (th_has_dedicated_nt(th)) { RUBY_DEBUG_LOG("(nt) sleep th:%u running:%u", rb_th_serial(th), rb_th_serial(sched->running)); - thread_sched_set_lock_owner(sched, NULL); + thread_sched_set_unlocked(sched, th); { RUBY_DEBUG_LOG("nt:%d cond:%p", th->nt->serial, &th->nt->cond.readyq); rb_native_cond_wait(&th->nt->cond.readyq, &sched->lock_); } - thread_sched_set_lock_owner(sched, th); + thread_sched_set_locked(sched, th); RUBY_DEBUG_LOG("(nt) wakeup %s", sched->running == th ? "success" : "failed"); if (th == sched->running) { @@ -870,12 +862,12 @@ thread_sched_wait_running_turn(struct rb_thread_sched *sched, rb_thread_t *th, b RUBY_DEBUG_LOG("th:%u->%u (direct)", rb_th_serial(th), rb_th_serial(next_th)); - thread_sched_set_lock_owner(sched, NULL); + thread_sched_set_unlocked(sched, th); { rb_ractor_set_current_ec(th->ractor, NULL); thread_sched_switch(th, next_th); } - thread_sched_set_lock_owner(sched, th); + thread_sched_set_locked(sched, th); } else { // search another ready ractor @@ -884,12 +876,12 @@ thread_sched_wait_running_turn(struct rb_thread_sched *sched, rb_thread_t *th, b RUBY_DEBUG_LOG("th:%u->%u (ractor scheduling)", rb_th_serial(th), rb_th_serial(next_th)); - thread_sched_set_lock_owner(sched, NULL); + thread_sched_set_unlocked(sched, th); { rb_ractor_set_current_ec(th->ractor, NULL); coroutine_transfer0(th->sched.context, nt->nt_context, false); } - thread_sched_set_lock_owner(sched, th); + thread_sched_set_locked(sched, th); } VM_ASSERT(rb_current_ec_noinline() == th->ec); @@ -1041,15 +1033,45 @@ thread_sched_to_waiting(struct rb_thread_sched *sched, rb_thread_t *th) } // mini utility func -static void -setup_ubf(rb_thread_t *th, rb_unblock_function_t *func, void *arg) +// return true if any there are any interrupts +static bool +ubf_set(rb_thread_t *th, rb_unblock_function_t *func, void *arg) { + VM_ASSERT(func != NULL); + + retry: + if (RUBY_VM_INTERRUPTED(th->ec)) { + RUBY_DEBUG_LOG("interrupted:0x%x", th->ec->interrupt_flag); + return true; + } + rb_native_mutex_lock(&th->interrupt_lock); { + if (!th->ec->raised_flag && RUBY_VM_INTERRUPTED(th->ec)) { + rb_native_mutex_unlock(&th->interrupt_lock); + goto retry; + } + + VM_ASSERT(th->unblock.func == NULL); th->unblock.func = func; th->unblock.arg = arg; } rb_native_mutex_unlock(&th->interrupt_lock); + + return false; +} + +static void +ubf_clear(rb_thread_t *th) +{ + if (th->unblock.func) { + rb_native_mutex_lock(&th->interrupt_lock); + { + th->unblock.func = NULL; + th->unblock.arg = NULL; + } + rb_native_mutex_unlock(&th->interrupt_lock); + } } static void @@ -1085,7 +1107,10 @@ thread_sched_to_waiting_until_wakeup(struct rb_thread_sched *sched, rb_thread_t RUBY_DEBUG_LOG("th:%u", rb_th_serial(th)); RB_VM_SAVE_MACHINE_CONTEXT(th); - setup_ubf(th, ubf_waiting, (void *)th); + + if (ubf_set(th, ubf_waiting, (void *)th)) { + return; + } RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); @@ -1102,7 +1127,7 @@ thread_sched_to_waiting_until_wakeup(struct rb_thread_sched *sched, rb_thread_t } thread_sched_unlock(sched, th); - setup_ubf(th, NULL, NULL); + ubf_clear(th); } // run another thread in the ready queue. @@ -1311,66 +1336,59 @@ void rb_ractor_unlock_self(rb_ractor_t *r); // The current thread for a ractor is put to "sleep" (descheduled in the STOPPED_FOREVER state) waiting for // a ractor action to wake it up. See docs for `ractor_sched_sleep_with_cleanup` for more info. void -rb_ractor_sched_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf_schedule_ractor_th) +rb_ractor_sched_wait(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf, void *ubf_arg) { // ractor lock of cr is acquired - // r is sleeping status + + RUBY_DEBUG_LOG("start%s", ""); + rb_thread_t * volatile th = rb_ec_thread_ptr(ec); struct rb_thread_sched *sched = TH_SCHED(th); - struct ccan_list_node *waitn = &th->ractor_waiting.waiting_node; - VM_ASSERT(waitn->next == waitn->prev && waitn->next == waitn); // it should be unlinked - ccan_list_add(&cr->sync.wait.waiting_threads, waitn); - setup_ubf(th, ubf_schedule_ractor_th, (void *)ec); + if (ubf_set(th, ubf, ubf_arg)) { + // interrupted + return; + } thread_sched_lock(sched, th); { + // setup sleep + bool can_direct_transfer = !th_has_dedicated_nt(th); + RB_VM_SAVE_MACHINE_CONTEXT(th); + th->status = THREAD_STOPPED_FOREVER; + RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); + thread_sched_wakeup_next_thread(sched, th, can_direct_transfer); + rb_ractor_unlock_self(cr); { - if (RUBY_VM_INTERRUPTED(th->ec)) { - RUBY_DEBUG_LOG("interrupted"); - } - else if (th->ractor_waiting.wakeup_status != wakeup_none) { - RUBY_DEBUG_LOG("awaken:%d", (int)th->ractor_waiting.wakeup_status); - } - else { - // sleep - RB_VM_SAVE_MACHINE_CONTEXT(th); - th->status = THREAD_STOPPED_FOREVER; - - RB_INTERNAL_THREAD_HOOK(RUBY_INTERNAL_THREAD_EVENT_SUSPENDED, th); - - bool can_direct_transfer = !th_has_dedicated_nt(th); - thread_sched_wakeup_next_thread(sched, th, can_direct_transfer); - thread_sched_wait_running_turn(sched, th, can_direct_transfer); - th->status = THREAD_RUNNABLE; - // wakeup - } + // sleep + thread_sched_wait_running_turn(sched, th, can_direct_transfer); + th->status = THREAD_RUNNABLE; } + rb_ractor_lock_self(cr); } thread_sched_unlock(sched, th); - setup_ubf(th, NULL, NULL); + ubf_clear(th); - rb_ractor_lock_self(cr); - ccan_list_del_init(waitn); + RUBY_DEBUG_LOG("end%s", ""); } void -rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th) +rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *r_th) { - // ractor lock of r is acquired - struct rb_thread_sched *sched = TH_SCHED(th); + // ractor lock of r is NOT acquired + struct rb_thread_sched *sched = TH_SCHED(r_th); - VM_ASSERT(th->ractor_waiting.wakeup_status != 0); + RUBY_DEBUG_LOG("r:%u th:%d", (unsigned int)rb_ractor_id(r), r_th->serial); - thread_sched_lock(sched, th); + thread_sched_lock(sched, r_th); { - if (th->status == THREAD_STOPPED_FOREVER) { - thread_sched_to_ready_common(sched, th, true, false); + if (r_th->status == THREAD_STOPPED_FOREVER) { + thread_sched_to_ready_common(sched, r_th, true, false); } } - thread_sched_unlock(sched, th); + thread_sched_unlock(sched, r_th); } static bool @@ -1378,6 +1396,7 @@ ractor_sched_barrier_completed_p(rb_vm_t *vm) { RUBY_DEBUG_LOG("run:%u wait:%u", vm->ractor.sched.running_cnt, vm->ractor.sched.barrier_waiting_cnt); VM_ASSERT(vm->ractor.sched.running_cnt - 1 >= vm->ractor.sched.barrier_waiting_cnt); + return (vm->ractor.sched.running_cnt - vm->ractor.sched.barrier_waiting_cnt) == 1; } @@ -1388,6 +1407,8 @@ rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr) VM_ASSERT(vm->ractor.sync.lock_owner == cr); // VM is locked VM_ASSERT(!vm->ractor.sched.barrier_waiting); VM_ASSERT(vm->ractor.sched.barrier_waiting_cnt == 0); + VM_ASSERT(vm->ractor.sched.barrier_ractor == NULL); + VM_ASSERT(vm->ractor.sched.barrier_lock_rec == 0); RUBY_DEBUG_LOG("start serial:%u", vm->ractor.sched.barrier_serial); @@ -1396,46 +1417,60 @@ rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr) ractor_sched_lock(vm, cr); { vm->ractor.sched.barrier_waiting = true; + vm->ractor.sched.barrier_ractor = cr; + vm->ractor.sched.barrier_lock_rec = vm->ractor.sync.lock_rec; // release VM lock lock_rec = vm->ractor.sync.lock_rec; vm->ractor.sync.lock_rec = 0; vm->ractor.sync.lock_owner = NULL; rb_native_mutex_unlock(&vm->ractor.sync.lock); - { - // interrupts all running threads - rb_thread_t *ith; - ccan_list_for_each(&vm->ractor.sched.running_threads, ith, sched.node.running_threads) { - if (ith->ractor != cr) { - RUBY_DEBUG_LOG("barrier int:%u", rb_th_serial(ith)); - RUBY_VM_SET_VM_BARRIER_INTERRUPT(ith->ec); - } - } - // wait for other ractors - while (!ractor_sched_barrier_completed_p(vm)) { - ractor_sched_set_unlocked(vm, cr); - rb_native_cond_wait(&vm->ractor.sched.barrier_complete_cond, &vm->ractor.sched.lock); - ractor_sched_set_locked(vm, cr); + // interrupts all running threads + rb_thread_t *ith; + ccan_list_for_each(&vm->ractor.sched.running_threads, ith, sched.node.running_threads) { + if (ith->ractor != cr) { + RUBY_DEBUG_LOG("barrier request to th:%u", rb_th_serial(ith)); + RUBY_VM_SET_VM_BARRIER_INTERRUPT(ith->ec); } } - } - ractor_sched_unlock(vm, cr); - // acquire VM lock - rb_native_mutex_lock(&vm->ractor.sync.lock); - vm->ractor.sync.lock_rec = lock_rec; - vm->ractor.sync.lock_owner = cr; + // wait for other ractors + while (!ractor_sched_barrier_completed_p(vm)) { + ractor_sched_set_unlocked(vm, cr); + rb_native_cond_wait(&vm->ractor.sched.barrier_complete_cond, &vm->ractor.sched.lock); + ractor_sched_set_locked(vm, cr); + } - RUBY_DEBUG_LOG("completed seirial:%u", vm->ractor.sched.barrier_serial); + RUBY_DEBUG_LOG("completed seirial:%u", vm->ractor.sched.barrier_serial); - ractor_sched_lock(vm, cr); - { - vm->ractor.sched.barrier_waiting = false; + // no other ractors are there vm->ractor.sched.barrier_serial++; vm->ractor.sched.barrier_waiting_cnt = 0; rb_native_cond_broadcast(&vm->ractor.sched.barrier_release_cond); + + // acquire VM lock + rb_native_mutex_lock(&vm->ractor.sync.lock); + vm->ractor.sync.lock_rec = lock_rec; + vm->ractor.sync.lock_owner = cr; } + + // do not release ractor_sched_lock and threre is no newly added (resumed) thread + // thread_sched_setup_running_threads +} + +// called from vm_lock_leave if the vm_lock used for barrierred +void +rb_ractor_sched_barrier_end(rb_vm_t *vm, rb_ractor_t *cr) +{ + RUBY_DEBUG_LOG("serial:%u", (unsigned int)vm->ractor.sched.barrier_serial - 1); + VM_ASSERT(vm->ractor.sched.barrier_waiting); + VM_ASSERT(vm->ractor.sched.barrier_ractor); + VM_ASSERT(vm->ractor.sched.barrier_lock_rec > 0); + + vm->ractor.sched.barrier_waiting = false; + vm->ractor.sched.barrier_ractor = NULL; + vm->ractor.sched.barrier_lock_rec = 0; ractor_sched_unlock(vm, cr); } |