diff options
author | Koichi Sasada <[email protected]> | 2025-05-27 03:58:04 +0900 |
---|---|---|
committer | Koichi Sasada <[email protected]> | 2025-05-31 04:01:33 +0900 |
commit | ef2bb61018cd9ccb5b61a3d91911e04a773da4a7 (patch) | |
tree | fcf3685efc9d3efaa1a66236ad17d2a72b7c5144 /thread_win32.c | |
parent | d2a1ad00cbba41e22c11abf2948c23cd8d68f565 (diff) |
`Ractor::Port`
* Added `Ractor::Port`
* `Ractor::Port#receive` (support multi-threads)
* `Rcator::Port#close`
* `Ractor::Port#closed?`
* Added some methods
* `Ractor#join`
* `Ractor#value`
* `Ractor#monitor`
* `Ractor#unmonitor`
* Removed some methods
* `Ractor#take`
* `Ractor.yield`
* Change the spec
* `Racotr.select`
You can wait for multiple sequences of messages with `Ractor::Port`.
```ruby
ports = 3.times.map{ Ractor::Port.new }
ports.map.with_index do |port, ri|
Ractor.new port,ri do |port, ri|
3.times{|i| port << "r#{ri}-#{i}"}
end
end
p ports.each{|port| pp 3.times.map{port.receive}}
```
In this example, we use 3 ports, and 3 Ractors send messages to them respectively.
We can receive a series of messages from each port.
You can use `Ractor#value` to get the last value of a Ractor's block:
```ruby
result = Ractor.new do
heavy_task()
end.value
```
You can wait for the termination of a Ractor with `Ractor#join` like this:
```ruby
Ractor.new do
some_task()
end.join
```
`#value` and `#join` are similar to `Thread#value` and `Thread#join`.
To implement `#join`, `Ractor#monitor` (and `Ractor#unmonitor`) is introduced.
This commit changes `Ractor.select()` method.
It now only accepts ports or Ractors, and returns when a port receives a message or a Ractor terminates.
We removes `Ractor.yield` and `Ractor#take` because:
* `Ractor::Port` supports most of similar use cases in a simpler manner.
* Removing them significantly simplifies the code.
We also change the internal thread scheduler code (thread_pthread.c):
* During barrier synchronization, we keep the `ractor_sched` lock to avoid deadlocks.
This lock is released by `rb_ractor_sched_barrier_end()`
which is called at the end of operations that require the barrier.
* fix potential deadlock issues by checking interrupts just before setting UBF.
https://bugs.ruby-lang.org/issues/21262
Notes
Notes:
Merged: https://github.com/ruby/ruby/pull/13445
Diffstat (limited to 'thread_win32.c')
-rw-r--r-- | thread_win32.c | 14 |
1 files changed, 5 insertions, 9 deletions
diff --git a/thread_win32.c b/thread_win32.c index c656d79a1a..ed8a99dd88 100644 --- a/thread_win32.c +++ b/thread_win32.c @@ -922,6 +922,7 @@ vm_barrier_finish_p(rb_vm_t *vm) vm->ractor.blocking_cnt); VM_ASSERT(vm->ractor.blocking_cnt <= vm->ractor.cnt); + return vm->ractor.blocking_cnt == vm->ractor.cnt; } @@ -947,7 +948,7 @@ rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr) // wait while (!vm_barrier_finish_p(vm)) { - rb_vm_cond_wait(vm, &vm->ractor.sync.barrier_cond); + rb_vm_cond_wait(vm, &vm->ractor.sync.barrier_complete_cond); } RUBY_DEBUG_LOG("cnt:%u barrier success", vm->ractor.sync.barrier_cnt); @@ -957,9 +958,7 @@ rb_ractor_sched_barrier_start(rb_vm_t *vm, rb_ractor_t *cr) vm->ractor.sync.barrier_waiting = false; vm->ractor.sync.barrier_cnt++; - ccan_list_for_each(&vm->ractor.set, r, vmlr_node) { - rb_native_cond_signal(&r->barrier_wait_cond); - } + rb_native_cond_broadcast(&vm->ractor.sync.barrier_release_cond); } void @@ -983,7 +982,7 @@ rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr) if (vm_barrier_finish_p(vm)) { RUBY_DEBUG_LOG("wakeup barrier owner"); - rb_native_cond_signal(&vm->ractor.sync.barrier_cond); + rb_native_cond_signal(&vm->ractor.sync.barrier_complete_cond); } else { RUBY_DEBUG_LOG("wait for barrier finish"); @@ -991,10 +990,7 @@ rb_ractor_sched_barrier_join(rb_vm_t *vm, rb_ractor_t *cr) // wait for restart while (barrier_cnt == vm->ractor.sync.barrier_cnt) { - vm->ractor.sync.lock_owner = NULL; - rb_native_cond_wait(&cr->barrier_wait_cond, &vm->ractor.sync.lock); - VM_ASSERT(vm->ractor.sync.lock_owner == NULL); - vm->ractor.sync.lock_owner = cr; + rb_vm_cond_wait(vm, &vm->ractor.sync.barrier_release_cond); } RUBY_DEBUG_LOG("barrier is released. Acquire vm_lock"); |