Stop exporting symbols for MJIT
[ruby.git] / ractor.c
blob49e4f457c229ce8ce5376d25060a463615611e07
1 // Ractor implementation
3 #include "ruby/ruby.h"
4 #include "ruby/thread.h"
5 #include "ruby/ractor.h"
6 #include "ruby/thread_native.h"
7 #include "vm_core.h"
8 #include "eval_intern.h"
9 #include "vm_sync.h"
10 #include "ractor_core.h"
11 #include "internal/complex.h"
12 #include "internal/error.h"
13 #include "internal/gc.h"
14 #include "internal/hash.h"
15 #include "internal/rational.h"
16 #include "internal/struct.h"
17 #include "internal/thread.h"
18 #include "variable.h"
19 #include "transient_heap.h"
20 #include "yjit.h"
21 #include "mjit.h"
// Ractor class and its error classes. The first two are referenced from
// other files (no static); the rest are file-local.
VALUE rb_cRactor;
static VALUE rb_cRactorSelector;

VALUE rb_eRactorUnsafeError;       // raised when unsafe C APIs are called from non-main ractors
VALUE rb_eRactorIsolationError;    // raised when isolation of a Proc/block is violated
static VALUE rb_eRactorError;           // generic Ractor error (base class)
static VALUE rb_eRactorRemoteError;     // wraps an exception raised in another ractor
static VALUE rb_eRactorMovedError;      // raised when touching a moved object
static VALUE rb_eRactorClosedError;     // raised on send/take via a closed port
static VALUE rb_cRactorMovedObject;     // hidden class for moved-object husks

static void vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line);
36 // Ractor locking
38 static void
39 ASSERT_ractor_unlocking(rb_ractor_t *r)
41 #if RACTOR_CHECK_MODE > 0
42 // GET_EC is NULL in an MJIT worker
43 if (rb_current_execution_context(false) != NULL && r->sync.locked_by == rb_ractor_self(GET_RACTOR())) {
44 rb_bug("recursive ractor locking");
46 #endif
49 static void
50 ASSERT_ractor_locking(rb_ractor_t *r)
52 #if RACTOR_CHECK_MODE > 0
53 // GET_EC is NULL in an MJIT worker
54 if (rb_current_execution_context(false) != NULL && r->sync.locked_by != rb_ractor_self(GET_RACTOR())) {
55 rp(r->sync.locked_by);
56 rb_bug("ractor lock is not acquired.");
58 #endif
61 static void
62 ractor_lock(rb_ractor_t *r, const char *file, int line)
64 RUBY_DEBUG_LOG2(file, line, "locking r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : "");
66 ASSERT_ractor_unlocking(r);
67 rb_native_mutex_lock(&r->sync.lock);
69 #if RACTOR_CHECK_MODE > 0
70 if (rb_current_execution_context(false) != NULL) { // GET_EC is NULL in an MJIT worker
71 r->sync.locked_by = rb_ractor_self(GET_RACTOR());
73 #endif
75 RUBY_DEBUG_LOG2(file, line, "locked r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : "");
78 static void
79 ractor_lock_self(rb_ractor_t *cr, const char *file, int line)
81 VM_ASSERT(cr == GET_RACTOR());
82 #if RACTOR_CHECK_MODE > 0
83 VM_ASSERT(cr->sync.locked_by != cr->pub.self);
84 #endif
85 ractor_lock(cr, file, line);
88 static void
89 ractor_unlock(rb_ractor_t *r, const char *file, int line)
91 ASSERT_ractor_locking(r);
92 #if RACTOR_CHECK_MODE > 0
93 r->sync.locked_by = Qnil;
94 #endif
95 rb_native_mutex_unlock(&r->sync.lock);
97 RUBY_DEBUG_LOG2(file, line, "r:%u%s", r->pub.id, GET_RACTOR() == r ? " (self)" : "");
100 static void
101 ractor_unlock_self(rb_ractor_t *cr, const char *file, int line)
103 VM_ASSERT(cr == GET_RACTOR());
104 #if RACTOR_CHECK_MODE > 0
105 VM_ASSERT(cr->sync.locked_by == cr->pub.self);
106 #endif
107 ractor_unlock(cr, file, line);
// Convenience wrappers that capture the call site for debug logging.
#define RACTOR_LOCK(r) ractor_lock(r, __FILE__, __LINE__)
#define RACTOR_UNLOCK(r) ractor_unlock(r, __FILE__, __LINE__)
#define RACTOR_LOCK_SELF(r) ractor_lock_self(r, __FILE__, __LINE__)
#define RACTOR_UNLOCK_SELF(r) ractor_unlock_self(r, __FILE__, __LINE__)
115 static void
116 ractor_cond_wait(rb_ractor_t *r)
118 #if RACTOR_CHECK_MODE > 0
119 VALUE locked_by = r->sync.locked_by;
120 r->sync.locked_by = Qnil;
121 #endif
122 rb_native_cond_wait(&r->sync.cond, &r->sync.lock);
124 #if RACTOR_CHECK_MODE > 0
125 r->sync.locked_by = locked_by;
126 #endif
129 // Ractor status
131 static const char *
132 ractor_status_str(enum ractor_status status)
134 switch (status) {
135 case ractor_created: return "created";
136 case ractor_running: return "running";
137 case ractor_blocking: return "blocking";
138 case ractor_terminated: return "terminated";
140 rb_bug("unreachable");
143 static void
144 ractor_status_set(rb_ractor_t *r, enum ractor_status status)
146 RUBY_DEBUG_LOG("r:%u [%s]->[%s]", r->pub.id, ractor_status_str(r->status_), ractor_status_str(status));
148 // check 1
149 if (r->status_ != ractor_created) {
150 VM_ASSERT(r == GET_RACTOR()); // only self-modification is allowed.
151 ASSERT_vm_locking();
154 // check2: transition check. assume it will be vanished on non-debug build.
155 switch (r->status_) {
156 case ractor_created:
157 VM_ASSERT(status == ractor_blocking);
158 break;
159 case ractor_running:
160 VM_ASSERT(status == ractor_blocking||
161 status == ractor_terminated);
162 break;
163 case ractor_blocking:
164 VM_ASSERT(status == ractor_running);
165 break;
166 case ractor_terminated:
167 rb_bug("unreachable");
168 break;
171 r->status_ = status;
174 static bool
175 ractor_status_p(rb_ractor_t *r, enum ractor_status status)
177 return rb_ractor_status_p(r, status);
180 // Ractor data/mark/free
182 static struct rb_ractor_basket *ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i);
183 static void ractor_local_storage_mark(rb_ractor_t *r);
184 static void ractor_local_storage_free(rb_ractor_t *r);
186 static void
187 ractor_queue_mark(struct rb_ractor_queue *rq)
189 for (int i=0; i<rq->cnt; i++) {
190 struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i);
191 rb_gc_mark(b->sender);
193 switch (b->type.e) {
194 case basket_type_yielding:
195 case basket_type_take_basket:
196 case basket_type_deleted:
197 case basket_type_reserved:
198 // ignore
199 break;
200 default:
201 rb_gc_mark(b->p.send.v);
206 static void
207 ractor_mark(void *ptr)
209 rb_ractor_t *r = (rb_ractor_t *)ptr;
211 ractor_queue_mark(&r->sync.recv_queue);
212 ractor_queue_mark(&r->sync.takers_queue);
214 rb_gc_mark(r->receiving_mutex);
216 rb_gc_mark(r->loc);
217 rb_gc_mark(r->name);
218 rb_gc_mark(r->r_stdin);
219 rb_gc_mark(r->r_stdout);
220 rb_gc_mark(r->r_stderr);
221 rb_hook_list_mark(&r->pub.hooks);
223 if (r->threads.cnt > 0) {
224 rb_thread_t *th = 0;
225 ccan_list_for_each(&r->threads.set, th, lt_node) {
226 VM_ASSERT(th != NULL);
227 rb_gc_mark(th->self);
231 ractor_local_storage_mark(r);
234 static void
235 ractor_queue_free(struct rb_ractor_queue *rq)
237 free(rq->baskets);
240 static void
241 ractor_free(void *ptr)
243 rb_ractor_t *r = (rb_ractor_t *)ptr;
244 RUBY_DEBUG_LOG("free r:%d", rb_ractor_id(r));
245 rb_native_mutex_destroy(&r->sync.lock);
246 rb_native_cond_destroy(&r->sync.cond);
247 ractor_queue_free(&r->sync.recv_queue);
248 ractor_queue_free(&r->sync.takers_queue);
249 ractor_local_storage_free(r);
250 rb_hook_list_free(&r->pub.hooks);
251 ruby_xfree(r);
254 static size_t
255 ractor_queue_memsize(const struct rb_ractor_queue *rq)
257 return sizeof(struct rb_ractor_basket) * rq->size;
260 static size_t
261 ractor_memsize(const void *ptr)
263 rb_ractor_t *r = (rb_ractor_t *)ptr;
265 // TODO: more correct?
266 return sizeof(rb_ractor_t) +
267 ractor_queue_memsize(&r->sync.recv_queue) +
268 ractor_queue_memsize(&r->sync.takers_queue);
271 static const rb_data_type_t ractor_data_type = {
272 "ractor",
274 ractor_mark,
275 ractor_free,
276 ractor_memsize,
277 NULL, // update
279 0, 0, RUBY_TYPED_FREE_IMMEDIATELY /* | RUBY_TYPED_WB_PROTECTED */
282 bool
283 rb_ractor_p(VALUE gv)
285 if (rb_typeddata_is_kind_of(gv, &ractor_data_type)) {
286 return true;
288 else {
289 return false;
293 static inline rb_ractor_t *
294 RACTOR_PTR(VALUE self)
296 VM_ASSERT(rb_ractor_p(self));
297 rb_ractor_t *r = DATA_PTR(self);
298 return r;
static rb_atomic_t ractor_last_id; // monotonically increasing id source for new ractors
#if RACTOR_CHECK_MODE > 0
// Id of the ractor running on this thread; before GET_THREAD()->ractor is
// wired up, only the main ractor (id 1) can be executing.
uint32_t
rb_ractor_current_id(void)
{
    if (GET_THREAD()->ractor == NULL) {
        return 1; // main ractor
    }
    else {
        return rb_ractor_id(GET_RACTOR());
    }
}
#endif
316 // Ractor queue
318 static void
319 ractor_queue_setup(struct rb_ractor_queue *rq)
321 rq->size = 2;
322 rq->cnt = 0;
323 rq->start = 0;
324 rq->baskets = malloc(sizeof(struct rb_ractor_basket) * rq->size);
327 static struct rb_ractor_basket *
328 ractor_queue_head(rb_ractor_t *r, struct rb_ractor_queue *rq)
330 if (r != NULL) ASSERT_ractor_locking(r);
331 return &rq->baskets[rq->start];
334 static struct rb_ractor_basket *
335 ractor_queue_at(rb_ractor_t *r, struct rb_ractor_queue *rq, int i)
337 if (r != NULL) ASSERT_ractor_locking(r);
338 return &rq->baskets[(rq->start + i) % rq->size];
341 static void
342 ractor_queue_advance(rb_ractor_t *r, struct rb_ractor_queue *rq)
344 ASSERT_ractor_locking(r);
346 if (rq->reserved_cnt == 0) {
347 rq->cnt--;
348 rq->start = (rq->start + 1) % rq->size;
349 rq->serial++;
351 else {
352 ractor_queue_at(r, rq, 0)->type.e = basket_type_deleted;
356 static bool
357 ractor_queue_skip_p(rb_ractor_t *r, struct rb_ractor_queue *rq, int i)
359 struct rb_ractor_basket *b = ractor_queue_at(r, rq, i);
360 return basket_type_p(b, basket_type_deleted) ||
361 basket_type_p(b, basket_type_reserved);
364 static void
365 ractor_queue_compact(rb_ractor_t *r, struct rb_ractor_queue *rq)
367 ASSERT_ractor_locking(r);
369 while (rq->cnt > 0 && basket_type_p(ractor_queue_at(r, rq, 0), basket_type_deleted)) {
370 ractor_queue_advance(r, rq);
374 static bool
375 ractor_queue_empty_p(rb_ractor_t *r, struct rb_ractor_queue *rq)
377 ASSERT_ractor_locking(r);
379 if (rq->cnt == 0) {
380 return true;
383 ractor_queue_compact(r, rq);
385 for (int i=0; i<rq->cnt; i++) {
386 if (!ractor_queue_skip_p(r, rq, i)) {
387 return false;
391 return true;
394 static bool
395 ractor_queue_deq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
397 ASSERT_ractor_locking(r);
399 for (int i=0; i<rq->cnt; i++) {
400 if (!ractor_queue_skip_p(r, rq, i)) {
401 struct rb_ractor_basket *b = ractor_queue_at(r, rq, i);
402 *basket = *b;
404 // remove from queue
405 b->type.e = basket_type_deleted;
406 ractor_queue_compact(r, rq);
407 return true;
411 return false;
414 static void
415 ractor_queue_enq(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
417 ASSERT_ractor_locking(r);
419 if (rq->size <= rq->cnt) {
420 rq->baskets = realloc(rq->baskets, sizeof(struct rb_ractor_basket) * rq->size * 2);
421 for (int i=rq->size - rq->start; i<rq->cnt; i++) {
422 rq->baskets[i + rq->start] = rq->baskets[i + rq->start - rq->size];
424 rq->size *= 2;
426 rq->baskets[(rq->start + rq->cnt++) % rq->size] = *basket;
427 // fprintf(stderr, "%s %p->cnt:%d\n", RUBY_FUNCTION_NAME_STRING, (void *)rq, rq->cnt);
430 static void
431 ractor_queue_delete(rb_ractor_t *r, struct rb_ractor_queue *rq, struct rb_ractor_basket *basket)
433 basket->type.e = basket_type_deleted;
436 // Ractor basket
438 static VALUE ractor_reset_belonging(VALUE obj); // in this file
440 static VALUE
441 ractor_basket_value(struct rb_ractor_basket *b)
443 switch (b->type.e) {
444 case basket_type_ref:
445 break;
446 case basket_type_copy:
447 case basket_type_move:
448 case basket_type_will:
449 b->type.e = basket_type_ref;
450 b->p.send.v = ractor_reset_belonging(b->p.send.v);
451 break;
452 default:
453 rb_bug("unreachable");
456 return b->p.send.v;
459 static VALUE
460 ractor_basket_accept(struct rb_ractor_basket *b)
462 VALUE v = ractor_basket_value(b);
464 if (b->p.send.exception) {
465 VALUE cause = v;
466 VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor.");
467 rb_ivar_set(err, rb_intern("@ractor"), b->sender);
468 rb_ec_setup_exception(NULL, err, cause);
469 rb_exc_raise(err);
472 return v;
475 // Ractor synchronizations
477 #if USE_RUBY_DEBUG_LOG
478 static const char *
479 wait_status_str(enum rb_ractor_wait_status wait_status)
481 switch ((int)wait_status) {
482 case wait_none: return "none";
483 case wait_receiving: return "receiving";
484 case wait_taking: return "taking";
485 case wait_yielding: return "yielding";
486 case wait_receiving|wait_taking: return "receiving|taking";
487 case wait_receiving|wait_yielding: return "receiving|yielding";
488 case wait_taking|wait_yielding: return "taking|yielding";
489 case wait_receiving|wait_taking|wait_yielding: return "receiving|taking|yielding";
491 rb_bug("unreachable");
494 static const char *
495 wakeup_status_str(enum rb_ractor_wakeup_status wakeup_status)
497 switch (wakeup_status) {
498 case wakeup_none: return "none";
499 case wakeup_by_send: return "by_send";
500 case wakeup_by_yield: return "by_yield";
501 case wakeup_by_take: return "by_take";
502 case wakeup_by_close: return "by_close";
503 case wakeup_by_interrupt: return "by_interrupt";
504 case wakeup_by_retry: return "by_retry";
506 rb_bug("unreachable");
509 static const char *
510 basket_type_name(enum rb_ractor_basket_type type)
512 switch (type) {
513 case basket_type_none: return "none";
514 case basket_type_ref: return "ref";
515 case basket_type_copy: return "copy";
516 case basket_type_move: return "move";
517 case basket_type_will: return "will";
518 case basket_type_deleted: return "deleted";
519 case basket_type_reserved: return "reserved";
520 case basket_type_take_basket: return "take_basket";
521 case basket_type_yielding: return "yielding";
523 VM_ASSERT(0);
524 return NULL;
526 #endif // USE_RUBY_DEBUG_LOG
528 static bool
529 ractor_sleeping_by(const rb_ractor_t *r, enum rb_ractor_wait_status wait_status)
531 return (r->sync.wait.status & wait_status) && r->sync.wait.wakeup_status == wakeup_none;
534 static bool
535 ractor_wakeup(rb_ractor_t *r, enum rb_ractor_wait_status wait_status, enum rb_ractor_wakeup_status wakeup_status)
537 ASSERT_ractor_locking(r);
539 RUBY_DEBUG_LOG("r:%u wait_by:%s -> wait:%s wakeup:%s",
540 rb_ractor_id(r),
541 wait_status_str(r->sync.wait.status),
542 wait_status_str(wait_status),
543 wakeup_status_str(wakeup_status));
545 if (ractor_sleeping_by(r, wait_status)) {
546 r->sync.wait.wakeup_status = wakeup_status;
547 rb_native_cond_broadcast(&r->sync.cond);
548 return true;
550 else {
551 return false;
555 static void *
556 ractor_sleep_wo_gvl(void *ptr)
558 rb_ractor_t *cr = ptr;
559 RACTOR_LOCK_SELF(cr);
561 VM_ASSERT(cr->sync.wait.status != wait_none);
562 if (cr->sync.wait.wakeup_status == wakeup_none) {
563 ractor_cond_wait(cr);
565 cr->sync.wait.status = wait_none;
567 RACTOR_UNLOCK_SELF(cr);
568 return NULL;
571 static void
572 ractor_sleep_interrupt(void *ptr)
574 rb_ractor_t *r = ptr;
576 RACTOR_LOCK(r);
578 ractor_wakeup(r, wait_receiving | wait_taking | wait_yielding, wakeup_by_interrupt);
580 RACTOR_UNLOCK(r);
583 typedef void (*ractor_sleep_cleanup_function)(rb_ractor_t *cr, void *p);
585 static enum rb_ractor_wakeup_status
586 ractor_sleep_with_cleanup(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status,
587 ractor_sleep_cleanup_function cf_func, void *cf_data)
589 enum rb_ractor_wakeup_status wakeup_status;
590 VM_ASSERT(GET_RACTOR() == cr);
592 // TODO: multi-threads
593 VM_ASSERT(cr->sync.wait.status == wait_none);
594 VM_ASSERT(wait_status != wait_none);
595 cr->sync.wait.status = wait_status;
596 cr->sync.wait.wakeup_status = wakeup_none;
598 // fprintf(stderr, "%s r:%p status:%s, wakeup_status:%s\n", RUBY_FUNCTION_NAME_STRING, (void *)cr,
599 // wait_status_str(cr->sync.wait.status), wakeup_status_str(cr->sync.wait.wakeup_status));
601 RUBY_DEBUG_LOG("sleep by %s", wait_status_str(wait_status));
603 RACTOR_UNLOCK(cr);
605 rb_nogvl(ractor_sleep_wo_gvl, cr,
606 ractor_sleep_interrupt, cr,
607 RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL);
609 RACTOR_LOCK(cr);
611 // rb_nogvl() can be canceled by interrupts
612 if (cr->sync.wait.status != wait_none) {
613 cr->sync.wait.status = wait_none;
614 cr->sync.wait.wakeup_status = wakeup_by_interrupt;
616 RACTOR_UNLOCK(cr);
618 if (cf_func) {
619 int state;
620 EC_PUSH_TAG(ec);
621 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
622 rb_thread_check_ints();
624 EC_POP_TAG();
626 if (state) {
627 (*cf_func)(cr, cf_data);
628 EC_JUMP_TAG(ec, state);
631 else {
632 rb_thread_check_ints();
635 RACTOR_LOCK(cr); // reachable?
638 // TODO: multi-thread
639 wakeup_status = cr->sync.wait.wakeup_status;
640 cr->sync.wait.wakeup_status = wakeup_none;
642 RUBY_DEBUG_LOG("wakeup %s", wakeup_status_str(wakeup_status));
644 return wakeup_status;
647 static enum rb_ractor_wakeup_status
648 ractor_sleep(rb_execution_context_t *ec, rb_ractor_t *cr, enum rb_ractor_wait_status wait_status)
650 return ractor_sleep_with_cleanup(ec, cr, wait_status, NULL, NULL);
653 // Ractor.receive
655 static void
656 ractor_recursive_receive_if(rb_ractor_t *r)
658 if (r->receiving_mutex && rb_mutex_owned_p(r->receiving_mutex)) {
659 rb_raise(rb_eRactorError, "can not call receive/receive_if recursively");
663 static VALUE
664 ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq)
666 struct rb_ractor_basket basket;
667 ractor_recursive_receive_if(cr);
668 bool received = false;
670 RACTOR_LOCK_SELF(cr);
672 RUBY_DEBUG_LOG("rq->cnt:%d", rq->cnt);
673 received = ractor_queue_deq(cr, rq, &basket);
675 RACTOR_UNLOCK_SELF(cr);
677 if (!received) {
678 if (cr->sync.incoming_port_closed) {
679 rb_raise(rb_eRactorClosedError, "The incoming port is already closed");
681 return Qundef;
683 else {
684 return ractor_basket_accept(&basket);
688 static void
689 ractor_wait_receive(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *rq)
691 VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
692 ractor_recursive_receive_if(cr);
694 RACTOR_LOCK(cr);
696 while (ractor_queue_empty_p(cr, rq)) {
697 ractor_sleep(ec, cr, wait_receiving);
700 RACTOR_UNLOCK(cr);
703 static VALUE
704 ractor_receive(rb_execution_context_t *ec, rb_ractor_t *cr)
706 VM_ASSERT(cr == rb_ec_ractor_ptr(ec));
707 VALUE v;
708 struct rb_ractor_queue *rq = &cr->sync.recv_queue;
710 while (UNDEF_P(v = ractor_try_receive(ec, cr, rq))) {
711 ractor_wait_receive(ec, cr, rq);
714 return v;
#if 0
// Debug helper (normally compiled out): dump a queue's baskets to stderr.
// Fixes vs. the original dead code so it actually compiles when enabled:
// missing ')' on the basket_type_p condition, `b->type` (a union) passed
// where the enum `b->type.e` is expected, and the nonexistent member
// `b->v` (payload lives at `b->p.send.v`).
static void
rq_dump(struct rb_ractor_queue *rq)
{
    bool bug = false;
    for (int i=0; i<rq->cnt; i++) {
        struct rb_ractor_basket *b = ractor_queue_at(NULL, rq, i);
        fprintf(stderr, "%d (start:%d) type:%s %p %s\n", i, rq->start, basket_type_name(b->type.e),
                (void *)b, RSTRING_PTR(RARRAY_AREF(b->p.send.v, 1)));
        if (basket_type_p(b, basket_type_reserved)) bug = true;
    }
    if (bug) rb_bug("!!");
}
#endif
732 struct receive_block_data {
733 rb_ractor_t *cr;
734 struct rb_ractor_queue *rq;
735 VALUE v;
736 int index;
737 bool success;
740 static void
741 ractor_receive_if_lock(rb_ractor_t *cr)
743 VALUE m = cr->receiving_mutex;
744 if (m == Qfalse) {
745 m = cr->receiving_mutex = rb_mutex_new();
747 rb_mutex_lock(m);
750 static VALUE
751 receive_if_body(VALUE ptr)
753 struct receive_block_data *data = (struct receive_block_data *)ptr;
755 ractor_receive_if_lock(data->cr);
756 VALUE block_result = rb_yield(data->v);
757 rb_ractor_t *cr = data->cr;
759 RACTOR_LOCK_SELF(cr);
761 struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index);
762 VM_ASSERT(basket_type_p(b, basket_type_reserved));
763 data->rq->reserved_cnt--;
765 if (RTEST(block_result)) {
766 ractor_queue_delete(cr, data->rq, b);
767 ractor_queue_compact(cr, data->rq);
769 else {
770 b->type.e = basket_type_ref;
773 RACTOR_UNLOCK_SELF(cr);
775 data->success = true;
777 if (RTEST(block_result)) {
778 return data->v;
780 else {
781 return Qundef;
785 static VALUE
786 receive_if_ensure(VALUE v)
788 struct receive_block_data *data = (struct receive_block_data *)v;
789 rb_ractor_t *cr = data->cr;
791 if (!data->success) {
792 RACTOR_LOCK_SELF(cr);
794 struct rb_ractor_basket *b = ractor_queue_at(cr, data->rq, data->index);
795 VM_ASSERT(basket_type_p(b, basket_type_reserved));
796 b->type.e = basket_type_deleted;
797 data->rq->reserved_cnt--;
799 RACTOR_UNLOCK_SELF(cr);
802 rb_mutex_unlock(cr->receiving_mutex);
803 return Qnil;
806 static VALUE
807 ractor_receive_if(rb_execution_context_t *ec, VALUE crv, VALUE b)
809 if (!RTEST(b)) rb_raise(rb_eArgError, "no block given");
811 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
812 unsigned int serial = (unsigned int)-1;
813 int index = 0;
814 struct rb_ractor_queue *rq = &cr->sync.recv_queue;
816 while (1) {
817 VALUE v = Qundef;
819 ractor_wait_receive(ec, cr, rq);
821 RACTOR_LOCK_SELF(cr);
823 if (serial != rq->serial) {
824 serial = rq->serial;
825 index = 0;
828 // check newer version
829 for (int i=index; i<rq->cnt; i++) {
830 if (!ractor_queue_skip_p(cr, rq, i)) {
831 struct rb_ractor_basket *b = ractor_queue_at(cr, rq, i);
832 v = ractor_basket_value(b);
833 b->type.e = basket_type_reserved;
834 rq->reserved_cnt++;
835 index = i;
836 break;
840 RACTOR_UNLOCK_SELF(cr);
842 if (!UNDEF_P(v)) {
843 struct receive_block_data data = {
844 .cr = cr,
845 .rq = rq,
846 .v = v,
847 .index = index,
848 .success = false,
851 VALUE result = rb_ensure(receive_if_body, (VALUE)&data,
852 receive_if_ensure, (VALUE)&data);
854 if (!UNDEF_P(result)) return result;
855 index++;
858 RUBY_VM_CHECK_INTS(ec);
862 static void
863 ractor_send_basket(rb_execution_context_t *ec, rb_ractor_t *r, struct rb_ractor_basket *b)
865 bool closed = false;
867 RACTOR_LOCK(r);
869 if (r->sync.incoming_port_closed) {
870 closed = true;
872 else {
873 ractor_queue_enq(r, &r->sync.recv_queue, b);
874 ractor_wakeup(r, wait_receiving, wakeup_by_send);
877 RACTOR_UNLOCK(r);
879 if (closed) {
880 rb_raise(rb_eRactorClosedError, "The incoming-port is already closed");
884 // Ractor#send
886 static VALUE ractor_move(VALUE obj); // in this file
887 static VALUE ractor_copy(VALUE obj); // in this file
889 static void
890 ractor_basket_prepare_contents(VALUE obj, VALUE move, VALUE *pobj, enum rb_ractor_basket_type *ptype)
892 VALUE v;
893 enum rb_ractor_basket_type type;
895 if (rb_ractor_shareable_p(obj)) {
896 type = basket_type_ref;
897 v = obj;
899 else if (!RTEST(move)) {
900 v = ractor_copy(obj);
901 type = basket_type_copy;
903 else {
904 type = basket_type_move;
905 v = ractor_move(obj);
908 *pobj = v;
909 *ptype = type;
912 static void
913 ractor_basket_fill_(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, bool exc)
915 VM_ASSERT(cr == GET_RACTOR());
917 basket->sender = cr->pub.self;
918 basket->p.send.exception = exc;
919 basket->p.send.v = obj;
922 static void
923 ractor_basket_fill(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, VALUE move, bool exc)
925 VALUE v;
926 enum rb_ractor_basket_type type;
927 ractor_basket_prepare_contents(obj, move, &v, &type);
928 ractor_basket_fill_(cr, basket, v, exc);
929 basket->type.e = type;
932 static void
933 ractor_basket_fill_will(rb_ractor_t *cr, struct rb_ractor_basket *basket, VALUE obj, bool exc)
935 ractor_basket_fill_(cr, basket, obj, exc);
936 basket->type.e = basket_type_will;
939 static VALUE
940 ractor_send(rb_execution_context_t *ec, rb_ractor_t *r, VALUE obj, VALUE move)
942 struct rb_ractor_basket basket;
943 // TODO: Ractor local GC
944 ractor_basket_fill(rb_ec_ractor_ptr(ec), &basket, obj, move, false);
945 ractor_send_basket(ec, r, &basket);
946 return r->pub.self;
949 // Ractor#take
951 static bool
952 ractor_take_has_will(rb_ractor_t *r)
954 ASSERT_ractor_locking(r);
956 return basket_type_p(&r->sync.will_basket, basket_type_will);
959 static bool
960 ractor_take_will(rb_ractor_t *r, struct rb_ractor_basket *b)
962 ASSERT_ractor_locking(r);
964 if (ractor_take_has_will(r)) {
965 *b = r->sync.will_basket;
966 r->sync.will_basket.type.e = basket_type_none;
967 return true;
969 else {
970 VM_ASSERT(basket_type_p(&r->sync.will_basket, basket_type_none));
971 return false;
975 static bool
976 ractor_take_will_lock(rb_ractor_t *r, struct rb_ractor_basket *b)
978 ASSERT_ractor_unlocking(r);
979 bool taken;
981 RACTOR_LOCK(r);
983 taken = ractor_take_will(r, b);
985 RACTOR_UNLOCK(r);
987 return taken;
990 static bool
991 ractor_register_take(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket,
992 bool is_take, struct rb_ractor_selector_take_config *config, bool ignore_error)
994 struct rb_ractor_basket b = {
995 .type.e = basket_type_take_basket,
996 .sender = cr->pub.self,
997 .p = {
998 .take = {
999 .basket = take_basket,
1000 .config = config,
1004 bool closed = false;
1006 RACTOR_LOCK(r);
1008 if (is_take && ractor_take_will(r, take_basket)) {
1009 RUBY_DEBUG_LOG("take over a will of r:%d", rb_ractor_id(r));
1011 else if (!is_take && ractor_take_has_will(r)) {
1012 RUBY_DEBUG_LOG("has_will");
1013 VM_ASSERT(config != NULL);
1014 config->closed = true;
1016 else if (r->sync.outgoing_port_closed) {
1017 closed = true;
1019 else {
1020 RUBY_DEBUG_LOG("register in r:%d", rb_ractor_id(r));
1021 ractor_queue_enq(r, &r->sync.takers_queue, &b);
1023 if (basket_none_p(take_basket)) {
1024 ractor_wakeup(r, wait_yielding, wakeup_by_take);
1028 RACTOR_UNLOCK(r);
1030 if (closed) {
1031 if (!ignore_error) rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
1032 return false;
1034 else {
1035 return true;
1039 static bool
1040 ractor_deregister_take(rb_ractor_t *r, struct rb_ractor_basket *take_basket)
1042 struct rb_ractor_queue *ts = &r->sync.takers_queue;
1043 bool deleted = false;
1045 RACTOR_LOCK(r);
1047 if (r->sync.outgoing_port_closed) {
1048 // ok
1050 else {
1051 for (int i=0; i<ts->cnt; i++) {
1052 struct rb_ractor_basket *b = ractor_queue_at(r, ts, i);
1053 if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == take_basket) {
1054 ractor_queue_delete(r, ts, b);
1055 deleted = true;
1058 if (deleted) {
1059 ractor_queue_compact(r, ts);
1063 RACTOR_UNLOCK(r);
1065 return deleted;
1068 static VALUE
1069 ractor_try_take(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket)
1071 bool taken;
1073 RACTOR_LOCK_SELF(cr);
1075 if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) {
1076 taken = false;
1078 else {
1079 taken = true;
1082 RACTOR_UNLOCK_SELF(cr);
1084 if (taken) {
1085 RUBY_DEBUG_LOG("taken");
1086 if (basket_type_p(take_basket, basket_type_deleted)) {
1087 VM_ASSERT(r->sync.outgoing_port_closed);
1088 rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
1090 return ractor_basket_accept(take_basket);
1092 else {
1093 RUBY_DEBUG_LOG("not taken");
1094 return Qundef;
#if VM_CHECK_MODE > 0
// Debug-only: is tb still registered in r's takers queue?
static bool
ractor_check_specific_take_basket_lock(rb_ractor_t *r, struct rb_ractor_basket *tb)
{
    bool ret = false;
    struct rb_ractor_queue *ts = &r->sync.takers_queue;

    RACTOR_LOCK(r);
    {
        for (int i=0; i<ts->cnt; i++) {
            struct rb_ractor_basket *b = ractor_queue_at(r, ts, i);
            if (basket_type_p(b, basket_type_take_basket) && b->p.take.basket == tb) {
                ret = true;
                break;
            }
        }
    }
    RACTOR_UNLOCK(r);

    return ret;
}
#endif
1122 static void
1123 ractor_take_cleanup(rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *tb)
1125 retry:
1126 if (basket_none_p(tb)) { // not yielded yet
1127 if (!ractor_deregister_take(r, tb)) {
1128 // not in r's takers queue
1129 rb_thread_sleep(0);
1130 goto retry;
1133 else {
1134 VM_ASSERT(!ractor_check_specific_take_basket_lock(r, tb));
1138 struct take_wait_take_cleanup_data {
1139 rb_ractor_t *r;
1140 struct rb_ractor_basket *tb;
1143 static void
1144 ractor_wait_take_cleanup(rb_ractor_t *cr, void *ptr)
1146 struct take_wait_take_cleanup_data *data = (struct take_wait_take_cleanup_data *)ptr;
1147 ractor_take_cleanup(cr, data->r, data->tb);
1150 static void
1151 ractor_wait_take(rb_execution_context_t *ec, rb_ractor_t *cr, rb_ractor_t *r, struct rb_ractor_basket *take_basket)
1153 struct take_wait_take_cleanup_data data = {
1154 .r = r,
1155 .tb = take_basket,
1158 RACTOR_LOCK_SELF(cr);
1160 if (basket_none_p(take_basket) || basket_type_p(take_basket, basket_type_yielding)) {
1161 ractor_sleep_with_cleanup(ec, cr, wait_taking, ractor_wait_take_cleanup, &data);
1164 RACTOR_UNLOCK_SELF(cr);
1167 static VALUE
1168 ractor_take(rb_execution_context_t *ec, rb_ractor_t *r)
1170 RUBY_DEBUG_LOG("from r:%u", rb_ractor_id(r));
1171 VALUE v;
1172 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
1174 struct rb_ractor_basket take_basket = {
1175 .type.e = basket_type_none,
1176 .sender = 0,
1179 ractor_register_take(cr, r, &take_basket, true, NULL, false);
1181 while (UNDEF_P(v = ractor_try_take(cr, r, &take_basket))) {
1182 ractor_wait_take(ec, cr, r, &take_basket);
1185 VM_ASSERT(!basket_none_p(&take_basket));
1186 VM_ASSERT(!ractor_check_specific_take_basket_lock(r, &take_basket));
1188 return v;
1191 // Ractor.yield
1193 static bool
1194 ractor_check_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs)
1196 ASSERT_ractor_locking(cr);
1198 for (int i=0; i<rs->cnt; i++) {
1199 struct rb_ractor_basket *b = ractor_queue_at(cr, rs, i);
1200 if (basket_type_p(b, basket_type_take_basket) &&
1201 basket_none_p(b->p.take.basket)) {
1202 return true;
1206 return false;
1209 static bool
1210 ractor_deq_take_basket(rb_ractor_t *cr, struct rb_ractor_queue *rs, struct rb_ractor_basket *b)
1212 ASSERT_ractor_unlocking(cr);
1213 struct rb_ractor_basket *first_tb = NULL;
1214 bool found = false;
1216 RACTOR_LOCK_SELF(cr);
1218 while (ractor_queue_deq(cr, rs, b)) {
1219 if (basket_type_p(b, basket_type_take_basket)) {
1220 struct rb_ractor_basket *tb = b->p.take.basket;
1222 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) {
1223 found = true;
1224 break;
1226 else {
1227 ractor_queue_enq(cr, rs, b);
1228 if (first_tb == NULL) first_tb = tb;
1229 struct rb_ractor_basket *head = ractor_queue_head(cr, rs);
1230 VM_ASSERT(head != NULL);
1231 if (basket_type_p(head, basket_type_take_basket) && head->p.take.basket == first_tb) {
1232 break; // loop detected
1236 else {
1237 VM_ASSERT(basket_none_p(b));
1241 if (found && b->p.take.config && !b->p.take.config->oneshot) {
1242 ractor_queue_enq(cr, rs, b);
1245 RACTOR_UNLOCK_SELF(cr);
1247 return found;
1250 static bool
1251 ractor_try_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts, VALUE obj, VALUE move, bool exc, bool is_will)
1253 ASSERT_ractor_unlocking(cr);
1255 struct rb_ractor_basket b;
1257 if (ractor_deq_take_basket(cr, ts, &b)) {
1258 VM_ASSERT(basket_type_p(&b, basket_type_take_basket));
1259 VM_ASSERT(basket_type_p(b.p.take.basket, basket_type_yielding));
1261 rb_ractor_t *tr = RACTOR_PTR(b.sender);
1262 struct rb_ractor_basket *tb = b.p.take.basket;
1263 enum rb_ractor_basket_type type;
1265 RUBY_DEBUG_LOG("basket from r:%u", rb_ractor_id(tr));
1267 if (is_will) {
1268 type = basket_type_will;
1270 else {
1271 int state;
1273 // begin
1274 EC_PUSH_TAG(ec);
1275 if ((state = EC_EXEC_TAG()) == TAG_NONE) {
1276 // TODO: Ractor local GC
1277 ractor_basket_prepare_contents(obj, move, &obj, &type);
1279 EC_POP_TAG();
1280 // rescue
1281 if (state) {
1282 RACTOR_LOCK_SELF(cr);
1284 b.p.take.basket->type.e = basket_type_none;
1285 ractor_queue_enq(cr, ts, &b);
1287 RACTOR_UNLOCK_SELF(cr);
1288 EC_JUMP_TAG(ec, state);
1292 RACTOR_LOCK(tr);
1294 VM_ASSERT(basket_type_p(tb, basket_type_yielding));
1295 // fill atomic
1296 RUBY_DEBUG_LOG("fill %sbasket from r:%u", is_will ? "will " : "", rb_ractor_id(tr));
1297 ractor_basket_fill_(cr, tb, obj, exc);
1298 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, type) != basket_type_yielding) {
1299 rb_bug("unreachable");
1301 ractor_wakeup(tr, wait_taking, wakeup_by_yield);
1303 RACTOR_UNLOCK(tr);
1305 return true;
1307 else {
1308 RUBY_DEBUG_LOG("no take basket");
1309 return false;
1313 static void
1314 ractor_wait_yield(rb_execution_context_t *ec, rb_ractor_t *cr, struct rb_ractor_queue *ts)
1316 RACTOR_LOCK_SELF(cr);
1318 while (!ractor_check_take_basket(cr, ts)) {
1319 ractor_sleep(ec, cr, wait_yielding);
1322 RACTOR_UNLOCK_SELF(cr);
1325 static VALUE
1326 ractor_yield(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE obj, VALUE move)
1328 struct rb_ractor_queue *ts = &cr->sync.takers_queue;
1330 while (!ractor_try_yield(ec, cr, ts, obj, move, false, false)) {
1331 ractor_wait_yield(ec, cr, ts);
1334 return Qnil;
1337 // Ractor::Selector
// Internal state of a Ractor::Selector object.
1339 struct rb_ractor_selector {
1340 rb_ractor_t *r;
1341 struct rb_ractor_basket take_basket;
1342 st_table *take_ractors; // rb_ractor_t * => (struct rb_ractor_selector_take_config *)
// st_foreach callback: GC-mark each registered take-target ractor.
1345 static int
1346 ractor_selector_mark_ractors_i(st_data_t key, st_data_t value, st_data_t data)
1348 const rb_ractor_t *r = (rb_ractor_t *)key;
1349 rb_gc_mark(r->pub.self);
1350 return ST_CONTINUE;
// GC mark function for Ractor::Selector: marks target ractors and,
// when the shared take_basket currently holds a value, its contents.
1353 static void
1354 ractor_selector_mark(void *ptr)
1356 struct rb_ractor_selector *s = ptr;
1358 if (s->take_ractors) {
1359 st_foreach(s->take_ractors, ractor_selector_mark_ractors_i, 0);
1362 switch (s->take_basket.type.e) {
1363 case basket_type_ref:
1364 case basket_type_copy:
1365 case basket_type_move:
1366 case basket_type_will:
1367 rb_gc_mark(s->take_basket.sender);
1368 rb_gc_mark(s->take_basket.p.send.v);
1369 break;
1370 default:
1371 break;
// st_foreach callback used on free: deregister the selector's basket
// from each still-open target and free the per-target config
// (config was malloc'ed in ractor_selector_add, so plain free()).
1375 static int
1376 ractor_selector_release_i(st_data_t key, st_data_t val, st_data_t data)
1378 struct rb_ractor_selector *s = (struct rb_ractor_selector *)data;
1379 struct rb_ractor_selector_take_config *config = (struct rb_ractor_selector_take_config *)val;
1381 if (!config->closed) {
1382 ractor_deregister_take((rb_ractor_t *)key, &s->take_basket);
1384 free(config);
1385 return ST_CONTINUE;
// GC free function: release all registrations, the table, the struct.
1388 static void
1389 ractor_selector_free(void *ptr)
1391 struct rb_ractor_selector *s = ptr;
1392 st_foreach(s->take_ractors, ractor_selector_release_i, (st_data_t)s);
1393 st_free_table(s->take_ractors);
1394 ruby_xfree(ptr);
// GC memsize function: struct + table + per-entry config structs.
1397 static size_t
1398 ractor_selector_memsize(const void *ptr)
1400 const struct rb_ractor_selector *s = ptr;
1401 return sizeof(struct rb_ractor_selector) +
1402 st_memsize(s->take_ractors) +
1403 s->take_ractors->num_entries * sizeof(struct rb_ractor_selector_take_config);
1406 static const rb_data_type_t ractor_selector_data_type = {
1407 "ractor/selector",
1409 ractor_selector_mark,
1410 ractor_selector_free,
1411 ractor_selector_memsize,
1412 NULL, // update
1414 0, 0, RUBY_TYPED_FREE_IMMEDIATELY,
// Fetch the selector struct from a Ractor::Selector VALUE;
// type is only asserted in debug (RACTOR_CHECK_MODE) builds.
1417 static struct rb_ractor_selector *
1418 RACTOR_SELECTOR_PTR(VALUE selv)
1420 VM_ASSERT(rb_typeddata_is_kind_of(selv, &ractor_selector_data_type));
1422 return (struct rb_ractor_selector *)DATA_PTR(selv);
1425 // Ractor::Selector.new
1427 static VALUE
1428 ractor_selector_create(VALUE crv)
1430 struct rb_ractor_selector *s;
1431 VALUE selv = TypedData_Make_Struct(rb_cRactorSelector, struct rb_ractor_selector, &ractor_selector_data_type, s);
1432 s->take_basket.type.e = basket_type_reserved;
1433 s->take_ractors = st_init_numtable(); // ractor (ptr) -> take_config
1434 return selv;
1437 // Ractor::Selector#add(r)
1439 static VALUE
1440 ractor_selector_add(rb_execution_context_t *ec, VALUE selv, VALUE rv)
1442 if (!rb_ractor_p(rv)) {
1443 rb_raise(rb_eArgError, "Not a ractor object");
1446 rb_ractor_t *r = RACTOR_PTR(rv);
1447 struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
1449 if (st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
1450 rb_raise(rb_eArgError, "already added");
1453 struct rb_ractor_selector_take_config *config = malloc(sizeof(struct rb_ractor_selector_take_config));
1454 VM_ASSERT(config != NULL);
1455 config->closed = false;
1456 config->oneshot = false;
1458 if (ractor_register_take(rb_ec_ractor_ptr(ec), r, &s->take_basket, false, config, true)) {
1459 st_insert(s->take_ractors, (st_data_t)r, (st_data_t)config);
1462 return rv;
1465 // Ractor::Selector#remove(r)
// Ractor::Selector#remove(r): deregister the selector's take-basket
// from r and drop (and free) its config entry. Raises ArgumentError
// if rv is not a Ractor or was never added. Returns rv.
1467 static VALUE
1468 ractor_selector_remove(rb_execution_context_t *ec, VALUE selv, VALUE rv)
1470 if (!rb_ractor_p(rv)) {
1471 rb_raise(rb_eArgError, "Not a ractor object");
1474 rb_ractor_t *r = RACTOR_PTR(rv);
1475 struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
1477 RUBY_DEBUG_LOG("r:%u", rb_ractor_id(r));
1479 if (!st_lookup(s->take_ractors, (st_data_t)r, NULL)) {
1480 rb_raise(rb_eArgError, "not added yet");
1483 ractor_deregister_take(r, &s->take_basket);
1484 struct rb_ractor_selector_take_config *config;
1485 st_delete(s->take_ractors, (st_data_t *)&r, (st_data_t *)&config);
1486 free(config);
1488 return rv;
1491 // Ractor::Selector#clear
// Carries (selv, ec) into the st_foreach callback below.
1493 struct ractor_selector_clear_data {
1494 VALUE selv;
1495 rb_execution_context_t *ec;
// st_foreach callback: remove one registered target via #remove.
1498 static int
1499 ractor_selector_clear_i(st_data_t key, st_data_t val, st_data_t data)
1501 struct ractor_selector_clear_data *ptr = (struct ractor_selector_clear_data *)data;
1502 rb_ractor_t *r = (rb_ractor_t *)key;
1503 ractor_selector_remove(ptr->ec, ptr->selv, r->pub.self);
1504 return ST_CONTINUE;
// Ractor::Selector#clear: remove every registered target. Returns self.
1507 static VALUE
1508 ractor_selector_clear(rb_execution_context_t *ec, VALUE selv)
1510 struct ractor_selector_clear_data data = {
1511 .selv = selv,
1512 .ec = ec,
1514 struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
1516 st_foreach(s->take_ractors, ractor_selector_clear_i, (st_data_t)&data);
1517 st_clear(s->take_ractors);
1518 return selv;
// Ractor::Selector#empty?: true when no take targets are registered.
1521 static VALUE
1522 ractor_selector_empty_p(rb_execution_context_t *ec, VALUE selv)
1524 struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
1525 return s->take_ractors->num_entries == 0 ? Qtrue : Qfalse;
// st_foreach callback run once per target ractor before sleeping in
// Selector#wait. Tries to claim the shared take-basket (tb, passed as
// dat) via CAS from basket_type_none: a ractor's will or a closed
// outgoing port satisfies the wait immediately (ST_STOP); otherwise
// the target is woken so it can fill the basket later (ST_CONTINUE).
1528 static int
1529 ractor_selector_wait_i(st_data_t key, st_data_t val, st_data_t dat)
1531 rb_ractor_t *r = (rb_ractor_t *)key;
1532 struct rb_ractor_basket *tb = (struct rb_ractor_basket *)dat;
1533 int ret;
1535 if (!basket_none_p(tb)) {
1536 RUBY_DEBUG_LOG("already taken:%s", basket_type_name(tb->type.e));
1537 return ST_STOP;
1540 RACTOR_LOCK(r);
1542 if (basket_type_p(&r->sync.will_basket, basket_type_will)) {
1543 RUBY_DEBUG_LOG("r:%u has will", rb_ractor_id(r));
1545 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_will) == basket_type_none) {
1546 ractor_take_will(r, tb);
1547 ret = ST_STOP;
1549 else {
1550 RUBY_DEBUG_LOG("has will, but already taken (%s)", basket_type_name(tb->type.e));
1551 ret = ST_CONTINUE;
1554 else if (r->sync.outgoing_port_closed) {
1555 RUBY_DEBUG_LOG("r:%u is closed", rb_ractor_id(r));
1557 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_deleted) == basket_type_none) {
1558 tb->sender = r->pub.self;
1559 ret = ST_STOP;
1561 else {
1562 RUBY_DEBUG_LOG("closed, but already taken (%s)", basket_type_name(tb->type.e));
1563 ret = ST_CONTINUE;
1566 else {
1567 RUBY_DEBUG_LOG("wakeup r:%u", rb_ractor_id(r));
1568 ractor_wakeup(r, wait_yielding, wakeup_by_take);
1569 ret = ST_CONTINUE;
1572 RACTOR_UNLOCK(r);
1574 return ret;
1577 // Ractor::Selector#wait
// Interruption cleanup for the sleep in Selector#wait: waits out any
// in-flight yield (basket_type_yielding) and resets the shared basket.
1579 static void
1580 ractor_selector_wait_cleaup(rb_ractor_t *cr, void *ptr)
1582 struct rb_ractor_basket *tb = (struct rb_ractor_basket *)ptr;
1584 RACTOR_LOCK_SELF(cr);
1586 while (basket_type_p(tb, basket_type_yielding)) rb_thread_sleep(0);
1587 // if tb->type is not none, the take succeeded, but an interruption discards it, unfortunately.
1588 tb->type.e = basket_type_reserved;
1590 RACTOR_UNLOCK_SELF(cr);
// Core of Ractor::Selector#wait: waits until one of (a) a message can
// be received (do_receive), (b) yield_value can be yielded (do_yield),
// or (c) a registered target fills the selector's shared take-basket.
// Returns [ractor-or-symbol, value]: ret_r is :receive / :yield or the
// sender ractor for a take. Retries from the top when a claimed basket
// turns out to be deleted/none.
1593 static VALUE
1594 ractor_selector_wait(rb_execution_context_t *ec, VALUE selv, VALUE do_receivev, VALUE do_yieldv, VALUE yield_value, VALUE move)
1596 struct rb_ractor_selector *s = RACTOR_SELECTOR_PTR(selv);
1597 struct rb_ractor_basket *tb = &s->take_basket;
1598 struct rb_ractor_basket taken_basket;
1599 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
1600 bool do_receive = !!RTEST(do_receivev);
1601 bool do_yield = !!RTEST(do_yieldv);
1602 VALUE ret_v, ret_r;
1603 enum rb_ractor_wait_status wait_status;
1604 struct rb_ractor_queue *rq = &cr->sync.recv_queue;
1605 struct rb_ractor_queue *ts = &cr->sync.takers_queue;
1607 RUBY_DEBUG_LOG("start");
1609 retry:
1610 RUBY_DEBUG_LOG("takers:%ld", s->take_ractors->num_entries);
1612 // setup wait_status
1613 wait_status = wait_none;
1614 if (s->take_ractors->num_entries > 0) wait_status |= wait_taking;
1615 if (do_receive) wait_status |= wait_receiving;
1616 if (do_yield) wait_status |= wait_yielding;
1618 RUBY_DEBUG_LOG("wait:%s", wait_status_str(wait_status));
1620 if (wait_status == wait_none) {
1621 rb_raise(rb_eRactorError, "no taking ractors");
1624 // check recv_queue
1625 if (do_receive && (ret_v = ractor_try_receive(ec, cr, rq)) != Qundef) {
1626 ret_r = ID2SYM(rb_intern("receive"));
1627 goto success;
1630 // check takers
1631 if (do_yield && ractor_try_yield(ec, cr, ts, yield_value, move, false, false)) {
1632 ret_v = Qnil;
1633 ret_r = ID2SYM(rb_intern("yield"));
1634 goto success;
1637 // check take_basket
1638 VM_ASSERT(basket_type_p(&s->take_basket, basket_type_reserved));
1639 s->take_basket.type.e = basket_type_none;
1640 // kick all take target ractors
1641 st_foreach(s->take_ractors, ractor_selector_wait_i, (st_data_t)tb);
1643 RACTOR_LOCK_SELF(cr);
1645 retry_waiting:
1646 while (1) {
1647 if (!basket_none_p(tb)) {
1648 RUBY_DEBUG_LOG("taken:%s from r:%u", basket_type_name(tb->type.e),
1649 tb->sender ? rb_ractor_id(RACTOR_PTR(tb->sender)) : 0);
1650 break;
1652 if (do_receive && !ractor_queue_empty_p(cr, rq)) {
1653 RUBY_DEBUG_LOG("can receive (%d)", rq->cnt);
1654 break;
1656 if (do_yield && ractor_check_take_basket(cr, ts)) {
1657 RUBY_DEBUG_LOG("can yield");
1658 break;
1661 ractor_sleep_with_cleanup(ec, cr, wait_status, ractor_selector_wait_cleaup, tb);
1664 taken_basket = *tb;
1666 // ensure
1667 // tb->type.e = basket_type_reserved # do it atomic in the following code
// Reclaim the shared basket: if a yield is mid-flight (yielding) or the
// CAS back to reserved loses a race, spin (yielding the timeslice) and
// re-enter the wait loop.
1668 if (taken_basket.type.e == basket_type_yielding ||
1669 RUBY_ATOMIC_CAS(tb->type.atomic, taken_basket.type.e, basket_type_reserved) != taken_basket.type.e) {
1671 if (basket_type_p(tb, basket_type_yielding)) {
1672 RACTOR_UNLOCK_SELF(cr);
1674 rb_thread_sleep(0);
1676 RACTOR_LOCK_SELF(cr);
1678 goto retry_waiting;
1681 RACTOR_UNLOCK_SELF(cr);
1683 // check the taken result
1684 switch (taken_basket.type.e) {
1685 case basket_type_none:
1686 VM_ASSERT(do_receive || do_yield);
1687 goto retry;
1688 case basket_type_yielding:
1689 rb_bug("unreachable");
1690 case basket_type_deleted: {
1691 ractor_selector_remove(ec, selv, taken_basket.sender);
1693 rb_ractor_t *r = RACTOR_PTR(taken_basket.sender);
1694 if (ractor_take_will_lock(r, &taken_basket)) {
1695 RUBY_DEBUG_LOG("has_will");
1697 else {
1698 RUBY_DEBUG_LOG("no will");
1699 // rb_raise(rb_eRactorClosedError, "The outgoing-port is already closed");
1700 // remove and retry wait
1701 goto retry;
1703 break;
1705 case basket_type_will:
1706 // no more messages
1707 ractor_selector_remove(ec, selv, taken_basket.sender);
1708 break;
1709 default:
1710 break;
1713 RUBY_DEBUG_LOG("taken_basket:%s", basket_type_name(taken_basket.type.e));
1715 ret_v = ractor_basket_accept(&taken_basket);
1716 ret_r = taken_basket.sender;
1717 success:
1718 return rb_ary_new_from_args(2, ret_r, ret_v);
1721 // Ractor#close_incoming
// Ractor#close_incoming: close r's incoming port and wake any thread
// blocked in receive. Returns the previous closed state (Qtrue/Qfalse).
1723 static VALUE
1724 ractor_close_incoming(rb_execution_context_t *ec, rb_ractor_t *r)
1726 VALUE prev;
1728 RACTOR_LOCK(r);
1730 if (!r->sync.incoming_port_closed) {
1731 prev = Qfalse;
1732 r->sync.incoming_port_closed = true;
1733 if (ractor_wakeup(r, wait_receiving, wakeup_by_close)) {
1734 VM_ASSERT(ractor_queue_empty_p(r, &r->sync.recv_queue));
1735 RUBY_DEBUG_LOG("cancel receiving");
1738 else {
1739 prev = Qtrue;
1742 RACTOR_UNLOCK(r);
1743 return prev;
1746 // Ractor#close_outgoing
// Close r's outgoing port: mark every queued take-basket deleted (via a
// none->yielding->deleted CAS pair so concurrent fills can't race), wake
// each taker and any thread blocked yielding. Returns previous state.
1748 static VALUE
1749 ractor_close_outgoing(rb_execution_context_t *ec, rb_ractor_t *r)
1751 VALUE prev;
1753 RACTOR_LOCK(r);
1755 struct rb_ractor_queue *ts = &r->sync.takers_queue;
1756 rb_ractor_t *tr;
1757 struct rb_ractor_basket b;
1759 if (!r->sync.outgoing_port_closed) {
1760 prev = Qfalse;
1761 r->sync.outgoing_port_closed = true;
1763 else {
1764 VM_ASSERT(ractor_queue_empty_p(r, ts));
1765 prev = Qtrue;
1768 // wakeup all taking ractors
1769 while (ractor_queue_deq(r, ts, &b)) {
1770 if (basket_type_p(&b, basket_type_take_basket)) {
1771 tr = RACTOR_PTR(b.sender);
1772 struct rb_ractor_basket *tb = b.p.take.basket;
1774 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_none, basket_type_yielding) == basket_type_none) {
1775 b.p.take.basket->sender = r->pub.self;
1776 if (RUBY_ATOMIC_CAS(tb->type.atomic, basket_type_yielding, basket_type_deleted) != basket_type_yielding) {
1777 rb_bug("unreachable");
1779 RUBY_DEBUG_LOG("set delete for r:%u", rb_ractor_id(RACTOR_PTR(b.sender)));
1782 if (b.p.take.config) {
1783 b.p.take.config->closed = true;
1786 // TODO: deadlock-able?
1787 RACTOR_LOCK(tr);
1789 ractor_wakeup(tr, wait_taking, wakeup_by_close);
1791 RACTOR_UNLOCK(tr);
1795 // raising yielding Ractor
1796 ractor_wakeup(r, wait_yielding, wakeup_by_close);
1798 VM_ASSERT(ractor_queue_empty_p(r, ts));
1800 RACTOR_UNLOCK(r);
1801 return prev;
1804 // creation/termination
1806 static uint32_t
1807 ractor_next_id(void)
1809 uint32_t id;
1811 id = (uint32_t)(RUBY_ATOMIC_FETCH_ADD(ractor_last_id, 1) + 1);
1813 return id;
// Append r to the VM's ractor set and bump the count. Caller holds the
// VM lock unless we are still in single-ractor mode.
1816 static void
1817 vm_insert_ractor0(rb_vm_t *vm, rb_ractor_t *r, bool single_ractor_mode)
1819 RUBY_DEBUG_LOG("r:%u ractor.cnt:%u++", r->pub.id, vm->ractor.cnt);
1820 VM_ASSERT(single_ractor_mode || RB_VM_LOCKED_P());
1822 ccan_list_add_tail(&vm->ractor.set, &r->vmlr_node);
1823 vm->ractor.cnt++;
// Leave single-ractor mode: full GC + transient heap evacuation first
// (while still single-threaded), preserving the user's GC enable state.
1826 static void
1827 cancel_single_ractor_mode(void)
1829 // enable multi-ractor mode
1830 RUBY_DEBUG_LOG("enable multi-ractor mode");
1832 VALUE was_disabled = rb_gc_enable();
1834 rb_gc_start();
1835 rb_transient_heap_evacuate();
1837 if (was_disabled) {
1838 rb_gc_disable();
1841 ruby_single_main_ractor = NULL;
// Register a freshly created ractor with the VM. The first ractor ever
// inserted becomes the main ractor; the second insertion cancels
// single-ractor mode before registering.
1844 static void
1845 vm_insert_ractor(rb_vm_t *vm, rb_ractor_t *r)
1847 VM_ASSERT(ractor_status_p(r, ractor_created));
1849 if (rb_multi_ractor_p()) {
1850 RB_VM_LOCK();
1852 vm_insert_ractor0(vm, r, false);
1853 vm_ractor_blocking_cnt_inc(vm, r, __FILE__, __LINE__);
1855 RB_VM_UNLOCK();
1857 else {
1858 if (vm->ractor.cnt == 0) {
1859 // main ractor
1860 vm_insert_ractor0(vm, r, true);
1861 ractor_status_set(r, ractor_blocking);
1862 ractor_status_set(r, ractor_running);
1864 else {
1865 cancel_single_ractor_mode();
1866 vm_insert_ractor0(vm, r, true);
1867 vm_ractor_blocking_cnt_inc(vm, r, __FILE__, __LINE__);
// Unregister a terminating ractor (never the last one) from the VM,
// signalling rb_ractor_terminate_all when only the main ractor remains.
1872 static void
1873 vm_remove_ractor(rb_vm_t *vm, rb_ractor_t *cr)
1875 VM_ASSERT(ractor_status_p(cr, ractor_running));
1876 VM_ASSERT(vm->ractor.cnt > 1);
1877 VM_ASSERT(cr->threads.cnt == 1);
1879 RB_VM_LOCK();
1881 RUBY_DEBUG_LOG("ractor.cnt:%u-- terminate_waiting:%d",
1882 vm->ractor.cnt, vm->ractor.sync.terminate_waiting);
1884 VM_ASSERT(vm->ractor.cnt > 0);
1885 ccan_list_del(&cr->vmlr_node);
1887 if (vm->ractor.cnt <= 2 && vm->ractor.sync.terminate_waiting) {
1888 rb_native_cond_signal(&vm->ractor.sync.terminate_cond);
1890 vm->ractor.cnt--;
1892 /* Clear the cached freelist to prevent a memory leak. */
1893 rb_gc_ractor_newobj_cache_clear(&cr->newobj_cache);
1895 ractor_status_set(cr, ractor_terminated);
1897 RB_VM_UNLOCK();
// Allocate a Ractor object (GC-managed); shareable by definition.
1901 static VALUE
1902 ractor_alloc(VALUE klass)
1903 rb_ractor_t *r;
1904 VALUE rv = TypedData_Make_Struct(klass, rb_ractor_t, &ractor_data_type, r);
1905 FL_SET_RAW(rv, RUBY_FL_SHAREABLE);
1906 r->pub.self = rv;
1907 VM_ASSERT(ractor_status_p(r, ractor_created));
1908 return rv;
// Allocate the main ractor at boot, before the GC/heap exist, so it
// uses raw mimmalloc and aborts the process on failure.
1911 rb_ractor_t *
1912 rb_ractor_main_alloc(void)
1914 rb_ractor_t *r = ruby_mimmalloc(sizeof(rb_ractor_t));
1915 if (r == NULL) {
1916 fprintf(stderr, "[FATAL] failed to allocate memory for main ractor\n");
1917 exit(EXIT_FAILURE);
1919 MEMZERO(r, rb_ractor_t, 1);
1920 r->pub.id = ++ractor_last_id;
1921 r->loc = Qnil;
1922 r->name = Qnil;
1923 r->pub.self = Qnil;
1924 ruby_single_main_ractor = r;
1926 return r;
1929 #if defined(HAVE_WORKING_FORK)
// After fork only the calling thread survives: reset the child's VM to
// single-ractor mode with th's ractor as the (only) main ractor.
1930 void
1931 rb_ractor_atfork(rb_vm_t *vm, rb_thread_t *th)
1933 // initialize as a main ractor
1934 vm->ractor.cnt = 0;
1935 vm->ractor.blocking_cnt = 0;
1936 ruby_single_main_ractor = th->ractor;
1937 th->ractor->status_ = ractor_created;
1939 rb_ractor_living_threads_init(th->ractor);
1940 rb_ractor_living_threads_insert(th->ractor, th);
1942 VM_ASSERT(vm->ractor.blocking_cnt == 0);
1943 VM_ASSERT(vm->ractor.cnt == 1);
1945 #endif
1947 void rb_thread_sched_init(struct rb_thread_sched *);
1949 void
1950 rb_ractor_living_threads_init(rb_ractor_t *r)
1952 ccan_list_head_init(&r->threads.set);
1953 r->threads.cnt = 0;
1954 r->threads.blocking_cnt = 0;
// Initialize a ractor's synchronization state, thread scheduler and
// (optional) name/location. The name must be ASCII-compatible and is
// frozen so it can be read from other ractors.
1957 static void
1958 ractor_init(rb_ractor_t *r, VALUE name, VALUE loc)
1960 ractor_queue_setup(&r->sync.recv_queue);
1961 ractor_queue_setup(&r->sync.takers_queue);
1962 rb_native_mutex_initialize(&r->sync.lock);
1963 rb_native_cond_initialize(&r->sync.cond);
1964 rb_native_cond_initialize(&r->barrier_wait_cond);
1966 // thread management
1967 rb_thread_sched_init(&r->threads.sched);
1968 rb_ractor_living_threads_init(r);
1970 // naming
1971 if (!NIL_P(name)) {
1972 rb_encoding *enc;
1973 StringValueCStr(name);
1974 enc = rb_enc_get(name);
1975 if (!rb_enc_asciicompat(enc)) {
1976 rb_raise(rb_eArgError, "ASCII incompatible encoding (%s)",
1977 rb_enc_name(enc));
1979 name = rb_str_new_frozen(name);
1981 r->name = name;
1982 r->loc = loc;
// Finish main-ractor setup once the object space exists: wrap the
// boot-allocated struct in a Ractor VALUE and attach the main thread.
1985 void
1986 rb_ractor_main_setup(rb_vm_t *vm, rb_ractor_t *r, rb_thread_t *th)
1988 r->pub.self = TypedData_Wrap_Struct(rb_cRactor, &ractor_data_type, r);
1989 FL_SET_RAW(r->pub.self, RUBY_FL_SHAREABLE);
1990 ractor_init(r, Qnil, Qnil);
1991 r->threads.main = th;
1992 rb_ractor_living_threads_insert(r, th);
// Ractor.new: allocate and initialize a new ractor, inherit the
// creator's verbose/debug flags, notify the JITs, then spawn its
// first thread running `block` with `args`.
1995 static VALUE
1996 ractor_create(rb_execution_context_t *ec, VALUE self, VALUE loc, VALUE name, VALUE args, VALUE block)
1998 VALUE rv = ractor_alloc(self);
1999 rb_ractor_t *r = RACTOR_PTR(rv);
2000 ractor_init(r, name, loc);
2002 // can block here
2003 r->pub.id = ractor_next_id();
2004 RUBY_DEBUG_LOG("r:%u", r->pub.id);
2006 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
2007 r->verbose = cr->verbose;
2008 r->debug = cr->debug;
2010 rb_yjit_before_ractor_spawn();
2011 rb_mjit_before_ractor_spawn();
2012 rb_thread_create_ractor(r, args, block);
2014 RB_GC_GUARD(rv);
2015 return rv;
// At ractor exit, hand the result (or exception) to a waiting taker;
// if nobody is taking, store it as the ractor's "will" for a later
// #take. Retries on the rare race where a taker appears between the
// failed try_yield and acquiring the lock.
2018 static void
2019 ractor_yield_atexit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE v, bool exc)
2021 if (cr->sync.outgoing_port_closed) {
2022 return;
2025 ASSERT_ractor_unlocking(cr);
2027 struct rb_ractor_queue *ts = &cr->sync.takers_queue;
2029 retry:
2030 if (ractor_try_yield(ec, cr, ts, v, Qfalse, exc, true)) {
2031 // OK.
2033 else {
2034 bool retry = false;
2035 RACTOR_LOCK(cr);
2037 if (!ractor_check_take_basket(cr, ts)) {
2038 VM_ASSERT(cr->sync.wait.status == wait_none);
2039 RUBY_DEBUG_LOG("leave a will");
2040 ractor_basket_fill_will(cr, &cr->sync.will_basket, v, exc);
2042 else {
2043 RUBY_DEBUG_LOG("rare timing!");
2044 retry = true; // another ractor is waiting for the yield.
2047 RACTOR_UNLOCK(cr);
2049 if (retry) goto retry;
2053 void
2054 rb_ractor_atexit(rb_execution_context_t *ec, VALUE result)
2056 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
2057 ractor_yield_atexit(ec, cr, result, false);
2060 void
2061 rb_ractor_atexit_exception(rb_execution_context_t *ec)
2063 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
2064 ractor_yield_atexit(ec, cr, ec->errinfo, true);
// Tear down the current ractor: close both ports, then clear
// threads.main under the VM lock so that
// rb_ractor_terminate_interrupt_main_thread() cannot race with it.
2067 void
2068 rb_ractor_teardown(rb_execution_context_t *ec)
2070 rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
2071 ractor_close_incoming(ec, cr);
2072 ractor_close_outgoing(ec, cr);
2074 // sync with rb_ractor_terminate_interrupt_main_thread()
2075 RB_VM_LOCK_ENTER();
2077 VM_ASSERT(cr->threads.main != NULL);
2078 cr->threads.main = NULL;
2080 RB_VM_LOCK_LEAVE();
2083 void
2084 rb_ractor_receive_parameters(rb_execution_context_t *ec, rb_ractor_t *r, int len, VALUE *ptr)
2086 for (int i=0; i<len; i++) {
2087 ptr[i] = ractor_receive(ec, r);
2091 void
2092 rb_ractor_send_parameters(rb_execution_context_t *ec, rb_ractor_t *r, VALUE args)
2094 int len = RARRAY_LENINT(args);
2095 for (int i=0; i<len; i++) {
2096 ractor_send(ec, r, RARRAY_AREF(args, i), false);
2100 bool
2101 rb_ractor_main_p_(void)
2103 VM_ASSERT(rb_multi_ractor_p());
2104 rb_execution_context_t *ec = GET_EC();
2105 return rb_ec_ractor_ptr(ec) == rb_ec_vm_ptr(ec)->ractor.main_ractor;
2108 bool
2109 rb_obj_is_main_ractor(VALUE gv)
2111 if (!rb_ractor_p(gv)) return false;
2112 rb_ractor_t *r = DATA_PTR(gv);
2113 return r == GET_VM()->ractor.main_ractor;
// Number of living threads in r. NOTE(review): the return-type line of
// this definition is not visible in this extraction — confirm against
// the full file before relying on the signature.
2117 rb_ractor_living_thread_num(const rb_ractor_t *r)
2119 return r->threads.cnt;
// Build an Array of r's runnable/stopped threads. Thread selves are
// snapshotted into a stack buffer under r's lock, then pushed into a
// new Array after unlocking (rb_ary_push may trigger GC or switch).
2122 VALUE
2123 rb_ractor_thread_list(rb_ractor_t *r)
2125 rb_thread_t *th = 0;
2126 VALUE *ts;
2127 int ts_cnt;
2129 RACTOR_LOCK(r);
2131 ts = ALLOCA_N(VALUE, r->threads.cnt);
2132 ts_cnt = 0;
2134 ccan_list_for_each(&r->threads.set, th, lt_node) {
2135 switch (th->status) {
2136 case THREAD_RUNNABLE:
2137 case THREAD_STOPPED:
2138 case THREAD_STOPPED_FOREVER:
2139 ts[ts_cnt++] = th->self;
2140 default:
2141 break;
2145 RACTOR_UNLOCK(r);
2147 VALUE ary = rb_ary_new();
2148 for (int i=0; i<ts_cnt; i++) {
2149 rb_ary_push(ary, ts[i]);
2152 return ary;
// Add th to r's living-thread list; the first thread also registers
// the ractor itself with the VM (outside r's lock — vm_insert_ractor
// takes the VM lock).
2155 void
2156 rb_ractor_living_threads_insert(rb_ractor_t *r, rb_thread_t *th)
2158 VM_ASSERT(th != NULL);
2160 RACTOR_LOCK(r);
2162 RUBY_DEBUG_LOG("r(%d)->threads.cnt:%d++", r->pub.id, r->threads.cnt);
2163 ccan_list_add_tail(&r->threads.set, &th->lt_node);
2164 r->threads.cnt++;
2166 RACTOR_UNLOCK(r);
2168 // first thread for a ractor
2169 if (r->threads.cnt == 1) {
2170 VM_ASSERT(ractor_status_p(r, ractor_created));
2171 vm_insert_ractor(th->vm, r);
// Mark r blocking and bump the VM-wide blocking count. Caller must
// already satisfy the locking precondition of the call site.
2175 static void
2176 vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *r, const char *file, int line)
2178 ractor_status_set(r, ractor_blocking);
2180 RUBY_DEBUG_LOG2(file, line, "vm->ractor.blocking_cnt:%d++", vm->ractor.blocking_cnt);
2181 vm->ractor.blocking_cnt++;
2182 VM_ASSERT(vm->ractor.blocking_cnt <= vm->ractor.cnt);
// Public wrapper: requires the VM lock and must run on cr itself.
2185 void
2186 rb_vm_ractor_blocking_cnt_inc(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line)
2188 ASSERT_vm_locking();
2189 VM_ASSERT(GET_RACTOR() == cr);
2190 vm_ractor_blocking_cnt_inc(vm, cr, file, line);
// Reverse of the above: cr leaves the blocking state and becomes
// running again. Requires the VM lock; must run on cr itself.
2193 void
2194 rb_vm_ractor_blocking_cnt_dec(rb_vm_t *vm, rb_ractor_t *cr, const char *file, int line)
2196 ASSERT_vm_locking();
2197 VM_ASSERT(GET_RACTOR() == cr);
2199 RUBY_DEBUG_LOG2(file, line, "vm->ractor.blocking_cnt:%d--", vm->ractor.blocking_cnt);
2200 VM_ASSERT(vm->ractor.blocking_cnt > 0);
2201 vm->ractor.blocking_cnt--;
2203 ractor_status_set(cr, ractor_running);
// If, after this change, every remaining thread of cr will be blocking,
// flip the whole ractor to the blocking state (under the VM lock).
// remained_thread_cnt == 0 means the ractor is terminating instead.
2206 static void
2207 ractor_check_blocking(rb_ractor_t *cr, unsigned int remained_thread_cnt, const char *file, int line)
2209 VM_ASSERT(cr == GET_RACTOR());
2211 RUBY_DEBUG_LOG2(file, line,
2212 "cr->threads.cnt:%u cr->threads.blocking_cnt:%u vm->ractor.cnt:%u vm->ractor.blocking_cnt:%u",
2213 cr->threads.cnt, cr->threads.blocking_cnt,
2214 GET_VM()->ractor.cnt, GET_VM()->ractor.blocking_cnt);
2216 VM_ASSERT(cr->threads.cnt >= cr->threads.blocking_cnt + 1);
2218 if (remained_thread_cnt > 0 &&
2219 // will be block
2220 cr->threads.cnt == cr->threads.blocking_cnt + 1) {
2221 // change ractor status: running -> blocking
2222 rb_vm_t *vm = GET_VM();
2223 ASSERT_vm_unlocking();
2225 RB_VM_LOCK();
2227 rb_vm_ractor_blocking_cnt_inc(vm, cr, file, line);
2229 RB_VM_UNLOCK();
// Remove th from cr's thread list; the last thread removes the whole
// ractor from the VM instead.
2233 void
2234 rb_ractor_living_threads_remove(rb_ractor_t *cr, rb_thread_t *th)
2236 VM_ASSERT(cr == GET_RACTOR());
2237 RUBY_DEBUG_LOG("r->threads.cnt:%d--", cr->threads.cnt);
2238 ractor_check_blocking(cr, cr->threads.cnt - 1, __FILE__, __LINE__);
2240 if (cr->threads.cnt == 1) {
2241 vm_remove_ractor(th->vm, cr);
2243 else {
2244 RACTOR_LOCK(cr);
2246 ccan_list_del(&th->lt_node);
2247 cr->threads.cnt--;
2249 RACTOR_UNLOCK(cr);
// A thread of cr is entering a blocking region; may flip the ractor
// to blocking state when it is the last runnable thread.
2253 void
2254 rb_ractor_blocking_threads_inc(rb_ractor_t *cr, const char *file, int line)
2256 RUBY_DEBUG_LOG2(file, line, "cr->threads.blocking_cnt:%d++", cr->threads.blocking_cnt);
2258 VM_ASSERT(cr->threads.cnt > 0);
2259 VM_ASSERT(cr == GET_RACTOR());
2261 ractor_check_blocking(cr, cr->threads.cnt, __FILE__, __LINE__);
2262 cr->threads.blocking_cnt++;
// A thread of cr is leaving a blocking region; when the ractor as a
// whole was counted blocking, take it back to running first.
2265 void
2266 rb_ractor_blocking_threads_dec(rb_ractor_t *cr, const char *file, int line)
2268 RUBY_DEBUG_LOG2(file, line,
2269 "r->threads.blocking_cnt:%d--, r->threads.cnt:%u",
2270 cr->threads.blocking_cnt, cr->threads.cnt);
2272 VM_ASSERT(cr == GET_RACTOR());
2274 if (cr->threads.cnt == cr->threads.blocking_cnt) {
2275 rb_vm_t *vm = GET_VM();
2277 RB_VM_LOCK_ENTER();
2279 rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
2281 RB_VM_LOCK_LEAVE();
2284 cr->threads.blocking_cnt--;
// Ask another ractor's currently running thread to stop at the VM
// barrier. Requires the VM lock; never called on the current ractor.
2287 void
2288 rb_ractor_vm_barrier_interrupt_running_thread(rb_ractor_t *r)
2290 VM_ASSERT(r != GET_RACTOR());
2291 ASSERT_ractor_unlocking(r);
2292 ASSERT_vm_locking();
2294 RACTOR_LOCK(r);
2296 if (ractor_status_p(r, ractor_running)) {
2297 rb_execution_context_t *ec = r->threads.running_ec;
2298 if (ec) {
2299 RUBY_VM_SET_VM_BARRIER_INTERRUPT(ec);
2303 RACTOR_UNLOCK(r);
// Post a terminate interrupt to another ractor's main thread (if it
// is still alive). threads.main is cleared in rb_ractor_teardown under
// the same VM lock, so the NULL check here is race-free.
2306 void
2307 rb_ractor_terminate_interrupt_main_thread(rb_ractor_t *r)
2309 VM_ASSERT(r != GET_RACTOR());
2310 ASSERT_ractor_unlocking(r);
2311 ASSERT_vm_locking();
2313 rb_thread_t *main_th = r->threads.main;
2314 if (main_th) {
2315 if (main_th->status != THREAD_KILLED) {
2316 RUBY_VM_SET_TERMINATE_INTERRUPT(main_th->ec);
2317 rb_threadptr_interrupt(main_th);
2319 else {
2320 RUBY_DEBUG_LOG("killed (%p)", (void *)main_th);
2325 void rb_thread_terminate_all(rb_thread_t *th); // thread.c
// Send the terminate interrupt to every ractor except the main one.
2327 static void
2328 ractor_terminal_interrupt_all(rb_vm_t *vm)
2330 if (vm->ractor.cnt > 1) {
2331 // send terminate notification to all ractors
2332 rb_ractor_t *r = 0;
2333 ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
2334 if (r != vm->ractor.main_ractor) {
2335 rb_ractor_terminate_interrupt_main_thread(r);
// Shut down every ractor except the main one. Only the main ractor's
// main thread may call this. Interrupts are re-sent every second until
// all other ractors have unregistered (vm_remove_ractor signals
// terminate_cond when the count drops to the main ractor alone).
2341 void
2342 rb_ractor_terminate_all(void)
2344 rb_vm_t *vm = GET_VM();
2345 rb_ractor_t *cr = vm->ractor.main_ractor;
2347 VM_ASSERT(cr == GET_RACTOR()); // only main-ractor's main-thread should kick it.
2349 if (vm->ractor.cnt > 1) {
2350 RB_VM_LOCK();
2351 ractor_terminal_interrupt_all(vm); // kill all ractors
2352 RB_VM_UNLOCK();
2354 rb_thread_terminate_all(GET_THREAD()); // kill other threads in main-ractor and wait
2356 RB_VM_LOCK();
2358 while (vm->ractor.cnt > 1) {
2359 RUBY_DEBUG_LOG("terminate_waiting:%d", vm->ractor.sync.terminate_waiting);
2360 vm->ractor.sync.terminate_waiting = true;
2362 // wait for 1sec
2363 rb_vm_ractor_blocking_cnt_inc(vm, cr, __FILE__, __LINE__);
2364 rb_vm_cond_timedwait(vm, &vm->ractor.sync.terminate_cond, 1000 /* ms */);
2365 rb_vm_ractor_blocking_cnt_dec(vm, cr, __FILE__, __LINE__);
2367 ractor_terminal_interrupt_all(vm);
2370 RB_VM_UNLOCK();
2373 rb_execution_context_t *
2374 rb_vm_main_ractor_ec(rb_vm_t *vm)
2376 return vm->ractor.main_ractor->threads.running_ec;
// Shared method body for every method of Ractor::MovedObject:
// any access to a moved object raises Ractor::MovedError.
2379 static VALUE
2380 ractor_moved_missing(int argc, VALUE *argv, VALUE self)
2382 rb_raise(rb_eRactorMovedError, "can not send any methods to a moved object");
2386 * Document-class: Ractor::ClosedError
2388 * Raised when an attempt is made to send a message to a closed port,
2389 * or to retrieve a message from a closed and empty port.
2390 * Ports may be closed explicitly with Ractor#close_outgoing/close_incoming
2391 * and are closed implicitly when a Ractor terminates.
2393 * r = Ractor.new { sleep(500) }
2394 * r.close_outgoing
2395 * r.take # Ractor::ClosedError
2397 * ClosedError is a descendant of StopIteration, so the closing of the ractor will break
2398 * the loops without propagating the error:
2400 * r = Ractor.new do
2401 * loop do
2402 * msg = receive # raises ClosedError and loop traps it
2403 * puts "Received: #{msg}"
2404 * end
2405 * puts "loop exited"
2406 * end
2408 * 3.times{|i| r << i}
2409 * r.close_incoming
2410 * r.take
2411 * puts "Continue successfully"
2413 * This will print:
2415 * Received: 0
2416 * Received: 1
2417 * Received: 2
2418 * loop exited
2419 * Continue successfully
2423 * Document-class: Ractor::RemoteError
2425 * Raised on attempt to Ractor#take if there was an uncaught exception in the Ractor.
2426 * Its +cause+ will contain the original exception, and +ractor+ is the original ractor
2427 * it was raised in.
2429 * r = Ractor.new { raise "Something weird happened" }
2431 * begin
2432 * r.take
2433 * rescue => e
2434 * p e # => #<Ractor::RemoteError: thrown by remote Ractor.>
2435 * p e.ractor == r # => true
2436 * p e.cause # => #<RuntimeError: Something weird happened>
2437 * end
2442 * Document-class: Ractor::MovedError
2444 * Raised on an attempt to access an object which was moved in Ractor#send or Ractor.yield.
2446 * r = Ractor.new { sleep }
2448 * ary = [1, 2, 3]
2449 * r.send(ary, move: true)
2450 * ary.inspect
2451 * # Ractor::MovedError (can not send any methods to a moved object)
2456 * Document-class: Ractor::MovedObject
2458 * A special object which replaces any value that was moved to another ractor in Ractor#send
2459 * or Ractor.yield. Any attempt to access the object results in Ractor::MovedError.
2461 * r = Ractor.new { receive }
2463 * ary = [1, 2, 3]
2464 * r.send(ary, move: true)
2465 * p Ractor::MovedObject === ary
2466 * # => true
2467 * ary.inspect
2468 * # Ractor::MovedError (can not send any methods to a moved object)
2471 // Main docs are in ractor.rb, but without this clause there are weird artifacts
2472 // in their rendering.
2474 * Document-class: Ractor
// Define Ractor, its error classes, Ractor::MovedObject (whose every
// method — including the BasicObject core set — raises MovedError),
// and Ractor::Selector. Ruby-level methods live in ractor.rb.
2478 void
2479 Init_Ractor(void)
2481 rb_cRactor = rb_define_class("Ractor", rb_cObject);
2482 rb_undef_alloc_func(rb_cRactor);
2484 rb_eRactorError = rb_define_class_under(rb_cRactor, "Error", rb_eRuntimeError);
2485 rb_eRactorIsolationError = rb_define_class_under(rb_cRactor, "IsolationError", rb_eRactorError);
2486 rb_eRactorRemoteError = rb_define_class_under(rb_cRactor, "RemoteError", rb_eRactorError);
2487 rb_eRactorMovedError = rb_define_class_under(rb_cRactor, "MovedError", rb_eRactorError);
2488 rb_eRactorClosedError = rb_define_class_under(rb_cRactor, "ClosedError", rb_eStopIteration);
2489 rb_eRactorUnsafeError = rb_define_class_under(rb_cRactor, "UnsafeError", rb_eRactorError);
2491 rb_cRactorMovedObject = rb_define_class_under(rb_cRactor, "MovedObject", rb_cBasicObject);
2492 rb_undef_alloc_func(rb_cRactorMovedObject);
2493 rb_define_method(rb_cRactorMovedObject, "method_missing", ractor_moved_missing, -1);
2495 // override methods defined in BasicObject
2496 rb_define_method(rb_cRactorMovedObject, "__send__", ractor_moved_missing, -1);
2497 rb_define_method(rb_cRactorMovedObject, "!", ractor_moved_missing, -1);
2498 rb_define_method(rb_cRactorMovedObject, "==", ractor_moved_missing, -1);
2499 rb_define_method(rb_cRactorMovedObject, "!=", ractor_moved_missing, -1);
2500 rb_define_method(rb_cRactorMovedObject, "__id__", ractor_moved_missing, -1);
2501 rb_define_method(rb_cRactorMovedObject, "equal?", ractor_moved_missing, -1);
2502 rb_define_method(rb_cRactorMovedObject, "instance_eval", ractor_moved_missing, -1);
2503 rb_define_method(rb_cRactorMovedObject, "instance_exec", ractor_moved_missing, -1);
2505 rb_cRactorSelector = rb_define_class_under(rb_cRactor, "Selector", rb_cObject);
2506 rb_undef_alloc_func(rb_cRactorSelector);
// Debugging aid: print every non-main ractor's id and status to stderr.
2509 void
2510 rb_ractor_dump(void)
2512 rb_vm_t *vm = GET_VM();
2513 rb_ractor_t *r = 0;
2515 ccan_list_for_each(&vm->ractor.set, r, vmlr_node) {
2516 if (r != vm->ractor.main_ractor) {
2517 fprintf(stderr, "r:%u (%s)\n", r->pub.id, ractor_status_str(r->status_));
2522 VALUE
2523 rb_ractor_stdin(void)
2525 if (rb_ractor_main_p()) {
2526 return rb_stdin;
2528 else {
2529 rb_ractor_t *cr = GET_RACTOR();
2530 return cr->r_stdin;
2534 VALUE
2535 rb_ractor_stdout(void)
2537 if (rb_ractor_main_p()) {
2538 return rb_stdout;
2540 else {
2541 rb_ractor_t *cr = GET_RACTOR();
2542 return cr->r_stdout;
2546 VALUE
2547 rb_ractor_stderr(void)
2549 if (rb_ractor_main_p()) {
2550 return rb_stderr;
2552 else {
2553 rb_ractor_t *cr = GET_RACTOR();
2554 return cr->r_stderr;
2558 void
2559 rb_ractor_stdin_set(VALUE in)
2561 if (rb_ractor_main_p()) {
2562 rb_stdin = in;
2564 else {
2565 rb_ractor_t *cr = GET_RACTOR();
2566 RB_OBJ_WRITE(cr->pub.self, &cr->r_stdin, in);
2570 void
2571 rb_ractor_stdout_set(VALUE out)
2573 if (rb_ractor_main_p()) {
2574 rb_stdout = out;
2576 else {
2577 rb_ractor_t *cr = GET_RACTOR();
2578 RB_OBJ_WRITE(cr->pub.self, &cr->r_stdout, out);
2582 void
2583 rb_ractor_stderr_set(VALUE err)
2585 if (rb_ractor_main_p()) {
2586 rb_stderr = err;
2588 else {
2589 rb_ractor_t *cr = GET_RACTOR();
2590 RB_OBJ_WRITE(cr->pub.self, &cr->r_stderr, err);
// Per-ractor event hook list (used by TracePoint and similar).
2594 rb_hook_list_t *
2595 rb_ractor_hooks(rb_ractor_t *cr)
2597 return &cr->pub.hooks;
/// traverse function

// Iterator result meanings (see enum below):
// 2: stop search
// 1: skip child
// 0: continue

enum obj_traverse_iterator_result {
    traverse_cont,
    traverse_skip,
    traverse_stop,
};

// enter_func runs before visiting an object's children, leave_func after;
// final_func (optional) runs over every recorded object when the walk ends.
typedef enum obj_traverse_iterator_result (*rb_obj_traverse_enter_func)(VALUE obj);
typedef enum obj_traverse_iterator_result (*rb_obj_traverse_leave_func)(VALUE obj);
typedef enum obj_traverse_iterator_result (*rb_obj_traverse_final_func)(VALUE obj);

static enum obj_traverse_iterator_result null_leave(VALUE obj);

struct obj_traverse_data {
    rb_obj_traverse_enter_func enter_func;
    rb_obj_traverse_leave_func leave_func;

    // rec records already-visited objects (identity hash);
    // rec_hash keeps the backing Hash alive for the GC.
    st_table *rec;
    VALUE rec_hash;
};

// Context passed through st/hash foreach callbacks, which only take an
// opaque pointer; `stop` propagates "stop search" back to the caller.
struct obj_traverse_callback_data {
    bool stop;
    struct obj_traverse_data *data;
};

static int obj_traverse_i(VALUE obj, struct obj_traverse_data *data);
2634 static int
2635 obj_hash_traverse_i(VALUE key, VALUE val, VALUE ptr)
2637 struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr;
2639 if (obj_traverse_i(key, d->data)) {
2640 d->stop = true;
2641 return ST_STOP;
2644 if (obj_traverse_i(val, d->data)) {
2645 d->stop = true;
2646 return ST_STOP;
2649 return ST_CONTINUE;
2652 static enum rb_id_table_iterator_result
2653 obj_hash_iv_traverse_i(VALUE val, void *ptr)
2655 struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr;
2657 if (obj_traverse_i(val, d->data)) {
2658 d->stop = true;
2659 return ID_TABLE_STOP;
2662 return ID_TABLE_CONTINUE;
2665 static void
2666 obj_traverse_reachable_i(VALUE obj, void *ptr)
2668 struct obj_traverse_callback_data *d = (struct obj_traverse_callback_data *)ptr;
2670 if (obj_traverse_i(obj, d->data)) {
2671 d->stop = true;
// Lazily create the visited-object table.  An identity Hash is used so
// rec_hash keeps the st_table GC-reachable for the duration of the walk.
static struct st_table *
obj_traverse_rec(struct obj_traverse_data *data)
{
    if (UNLIKELY(!data->rec)) {
        data->rec_hash = rb_ident_hash_new();
        data->rec = RHASH_ST_TABLE(data->rec_hash);
    }
    return data->rec;
}
// Core recursive walk used by rb_obj_traverse.
// Returns 1 to stop the whole search, 0 to continue.
// Visits obj (enter_func), then its generic ivars and type-specific
// children, then calls leave_func.  Already-visited objects are skipped
// via the rec table, so shared/cyclic graphs terminate.
static int
obj_traverse_i(VALUE obj, struct obj_traverse_data *data)
{
    // immediates (Fixnum, Symbol, nil, ...) have no children
    if (RB_SPECIAL_CONST_P(obj)) return 0;

    switch (data->enter_func(obj)) {
      case traverse_cont: break;
      case traverse_skip: return 0; // skip children
      case traverse_stop: return 1; // stop search
    }

    // st_insert returns non-zero if the key was already present
    if (UNLIKELY(st_insert(obj_traverse_rec(data), obj, 1))) {
        // already traversed
        return 0;
    }

    // generic (externally stored) instance variables
    if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) {
        struct gen_ivtbl *ivtbl;
        rb_ivar_generic_ivtbl_lookup(obj, &ivtbl);
        for (uint32_t i = 0; i < ivtbl->numiv; i++) {
            VALUE val = ivtbl->ivptr[i];
            if (!UNDEF_P(val) && obj_traverse_i(val, data)) return 1;
        }
    }

    switch (BUILTIN_TYPE(obj)) {
      // no child node
      case T_STRING:
      case T_FLOAT:
      case T_BIGNUM:
      case T_REGEXP:
      case T_FILE:
      case T_SYMBOL:
      case T_MATCH:
        break;

      case T_OBJECT:
        {
            if (rb_shape_obj_too_complex(obj)) {
                // ivars stored in an id table instead of an inline array
                struct obj_traverse_callback_data d = {
                    .stop = false,
                    .data = data,
                };
                rb_id_table_foreach_values(ROBJECT_IV_HASH(obj), obj_hash_iv_traverse_i, &d);
                if (d.stop) return 1;
            }
            else {
                uint32_t len = ROBJECT_IV_COUNT(obj);
                VALUE *ptr = ROBJECT_IVPTR(obj);

                for (uint32_t i=0; i<len; i++) {
                    VALUE val = ptr[i];
                    if (!UNDEF_P(val) && obj_traverse_i(val, data)) return 1;
                }
            }
        }
        break;

      case T_ARRAY:
        {
            for (int i = 0; i < RARRAY_LENINT(obj); i++) {
                VALUE e = rb_ary_entry(obj, i);
                if (obj_traverse_i(e, data)) return 1;
            }
        }
        break;

      case T_HASH:
        {
            // the default value is a child too
            if (obj_traverse_i(RHASH_IFNONE(obj), data)) return 1;

            struct obj_traverse_callback_data d = {
                .stop = false,
                .data = data,
            };
            rb_hash_foreach(obj, obj_hash_traverse_i, (VALUE)&d);
            if (d.stop) return 1;
        }
        break;

      case T_STRUCT:
        {
            long len = RSTRUCT_LEN(obj);
            const VALUE *ptr = RSTRUCT_CONST_PTR(obj);

            for (long i=0; i<len; i++) {
                if (obj_traverse_i(ptr[i], data)) return 1;
            }
        }
        break;

      case T_RATIONAL:
        if (obj_traverse_i(RRATIONAL(obj)->num, data)) return 1;
        if (obj_traverse_i(RRATIONAL(obj)->den, data)) return 1;
        break;
      case T_COMPLEX:
        if (obj_traverse_i(RCOMPLEX(obj)->real, data)) return 1;
        if (obj_traverse_i(RCOMPLEX(obj)->imag, data)) return 1;
        break;

      case T_DATA:
      case T_IMEMO:
        {
            // children are only known to the GC; ask objspace, holding
            // the VM lock (no barrier) while it enumerates
            struct obj_traverse_callback_data d = {
                .stop = false,
                .data = data,
            };
            RB_VM_LOCK_ENTER_NO_BARRIER();
            {
                rb_objspace_reachable_objects_from(obj, obj_traverse_reachable_i, &d);
            }
            RB_VM_LOCK_LEAVE_NO_BARRIER();
            if (d.stop) return 1;
        }
        break;

      // unreachable: classes/modules are shareable, so enter_func is
      // expected to skip them before reaching here
      case T_CLASS:
      case T_MODULE:
      case T_ICLASS:
      default:
        rp(obj);
        rb_bug("unreachable");
    }

    if (data->leave_func(obj) == traverse_stop) {
        return 1;
    }
    else {
        return 0;
    }
}
// State for the final pass over all recorded (visited) objects.
struct rb_obj_traverse_final_data {
    rb_obj_traverse_final_func final_func;
    int stopped; // set to 1 if final_func asked to stop
};

// st_foreach callback: apply final_func to each visited object.
static int
obj_traverse_final_i(st_data_t key, st_data_t val, st_data_t arg)
{
    struct rb_obj_traverse_final_data *data = (void *)arg;
    if (data->final_func(key)) {
        data->stopped = 1;
        return ST_STOP;
    }
    return ST_CONTINUE;
}
// Walk obj's object graph with enter_func/leave_func, then (optionally)
// run final_func over every object that was visited.
// Returns 0 if the whole graph was traversed, 1 if the walk was stopped.
static int
rb_obj_traverse(VALUE obj,
                rb_obj_traverse_enter_func enter_func,
                rb_obj_traverse_leave_func leave_func,
                rb_obj_traverse_final_func final_func)
{
    struct obj_traverse_data data = {
        .enter_func = enter_func,
        .leave_func = leave_func,
        .rec = NULL, // created lazily; stays NULL if obj is an immediate
    };

    if (obj_traverse_i(obj, &data)) return 1;

    if (final_func && data.rec) {
        struct rb_obj_traverse_final_data f = {final_func, 0};
        st_foreach(data.rec, obj_traverse_final_i, (st_data_t)&f);
        return f.stopped;
    }
    return 0;
}
// Can obj become shareable just by being frozen?
// Non-T_DATA objects: yes.  TypedData: only if the type opted in with
// RUBY_TYPED_FROZEN_SHAREABLE.  As a special case, a Proc is made
// shareable in place (rb_proc_ractor_make_shareable) and *made_shareable
// is set so the caller knows no further freezing is needed.
// Untyped T_DATA always answers false.
static int
frozen_shareable_p(VALUE obj, bool *made_shareable)
{
    if (!RB_TYPE_P(obj, T_DATA)) {
        return true;
    }
    else if (RTYPEDDATA_P(obj)) {
        const rb_data_type_t *type = RTYPEDDATA_TYPE(obj);
        if (type->flags & RUBY_TYPED_FROZEN_SHAREABLE) {
            return true;
        }
        else if (made_shareable && rb_obj_is_proc(obj)) {
            // special path to make shareable Proc.
            rb_proc_ractor_make_shareable(obj);
            *made_shareable = true;
            VM_ASSERT(RB_OBJ_SHAREABLE_P(obj));
            return false;
        }
    }

    return false;
}
// enter_func for Ractor.make_shareable: freeze obj (via #freeze, so a
// user-defined freeze is honored) and decide whether its children still
// need to be visited.  Raises Ractor::Error if obj cannot be made
// shareable or if #freeze failed to actually freeze it.
static enum obj_traverse_iterator_result
make_shareable_check_shareable(VALUE obj)
{
    VM_ASSERT(!SPECIAL_CONST_P(obj));
    bool made_shareable = false;

    if (rb_ractor_shareable_p(obj)) {
        return traverse_skip; // already shareable; children are too
    }
    else if (!frozen_shareable_p(obj, &made_shareable)) {
        if (made_shareable) {
            // Proc was made shareable in place
            return traverse_skip;
        }
        else {
            rb_raise(rb_eRactorError, "can not make shareable object for %"PRIsVALUE, obj);
        }
    }

    if (!RB_OBJ_FROZEN_RAW(obj)) {
        rb_funcall(obj, idFreeze, 0);

        if (UNLIKELY(!RB_OBJ_FROZEN_RAW(obj))) {
            rb_raise(rb_eRactorError, "#freeze does not freeze object correctly");
        }

        // freezing may have made it shareable as a side effect
        if (RB_OBJ_SHAREABLE_P(obj)) {
            return traverse_skip;
        }
    }

    return traverse_cont; // descend into children, mark on leave
}
// Mark obj as shareable by setting RUBY_FL_SHAREABLE; used as a
// leave/final callback once all children are known shareable.
static enum obj_traverse_iterator_result
mark_shareable(VALUE obj)
{
    FL_SET_RAW(obj, RUBY_FL_SHAREABLE);
    return traverse_cont;
}
// Ractor.make_shareable(obj): deep-freeze obj's graph and flag every
// visited object shareable.  Returns obj.
VALUE
rb_ractor_make_shareable(VALUE obj)
{
    rb_obj_traverse(obj,
                    make_shareable_check_shareable,
                    null_leave, mark_shareable);
    return obj;
}
// Ractor.make_shareable(obj, copy: true): deep-copy obj first, then make
// the copy shareable, leaving the original untouched.  Returns the copy.
VALUE
rb_ractor_make_shareable_copy(VALUE obj)
{
    VALUE copy = ractor_copy(obj);
    rb_obj_traverse(copy,
                    make_shareable_check_shareable,
                    null_leave, mark_shareable);
    return copy;
}
// Raise Ractor::IsolationError unless obj is shareable; `name` describes
// the assignment target for the error message.  Returns obj on success.
VALUE
rb_ractor_ensure_shareable(VALUE obj, VALUE name)
{
    if (!rb_ractor_shareable_p(obj)) {
        VALUE message = rb_sprintf("cannot assign unshareable object to %"PRIsVALUE,
                                   name);
        rb_exc_raise(rb_exc_new_str(rb_eRactorIsolationError, message));
    }
    return obj;
}
2950 void
2951 rb_ractor_ensure_main_ractor(const char *msg)
2953 if (!rb_ractor_main_p()) {
2954 rb_raise(rb_eRactorIsolationError, "%s", msg);
// enter_func for Ractor.shareable?: skip objects already flagged,
// treat classes/modules as shareable (flagging them on the spot — see
// TODO), descend into frozen objects whose type allows frozen-sharing,
// and fail (stop) for everything else.
static enum obj_traverse_iterator_result
shareable_p_enter(VALUE obj)
{
    if (RB_OBJ_SHAREABLE_P(obj)) {
        return traverse_skip;
    }
    else if (RB_TYPE_P(obj, T_CLASS)  ||
             RB_TYPE_P(obj, T_MODULE) ||
             RB_TYPE_P(obj, T_ICLASS)) {
        // TODO: remove it
        mark_shareable(obj);
        return traverse_skip;
    }
    else if (RB_OBJ_FROZEN_RAW(obj) &&
             frozen_shareable_p(obj, NULL)) {
        return traverse_cont;
    }

    return traverse_stop; // fail
}
2979 bool
2980 rb_ractor_shareable_p_continue(VALUE obj)
2982 if (rb_obj_traverse(obj,
2983 shareable_p_enter, null_leave,
2984 mark_shareable)) {
2985 return false;
2987 else {
2988 return true;
#if RACTOR_CHECK_MODE > 0
// Debug-mode enter_func: re-stamp each unshareable object with the
// current ractor's id so belonging checks pass after a move/copy.
static enum obj_traverse_iterator_result
reset_belonging_enter(VALUE obj)
{
    if (rb_ractor_shareable_p(obj)) {
        return traverse_skip; // shareable objects belong to no ractor
    }
    else {
        rb_ractor_setup_belonging(obj);
        return traverse_cont;
    }
}
#endif
// No-op leave_func used when only the enter phase matters.
static enum obj_traverse_iterator_result
null_leave(VALUE obj)
{
    return traverse_cont;
}
// Re-stamp the whole graph's ractor-belonging info (RACTOR_CHECK_MODE
// builds only; otherwise a no-op).  Returns obj.
static VALUE
ractor_reset_belonging(VALUE obj)
{
#if RACTOR_CHECK_MODE > 0
    rb_obj_traverse(obj, reset_belonging_enter, null_leave, NULL);
#endif
    return obj;
}
/// traverse and replace function

// Iterator results as above:
// 2: stop search
// 1: skip child
// 0: continue

struct obj_traverse_replace_data;
static int obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data);
typedef enum obj_traverse_iterator_result (*rb_obj_traverse_replace_enter_func)(VALUE obj, struct obj_traverse_replace_data *data);
typedef enum obj_traverse_iterator_result (*rb_obj_traverse_replace_leave_func)(VALUE obj, struct obj_traverse_replace_data *data);

struct obj_traverse_replace_data {
    rb_obj_traverse_replace_enter_func enter_func;
    rb_obj_traverse_replace_leave_func leave_func;

    // rec maps original object -> its replacement; rec_hash keeps the
    // backing identity Hash GC-reachable.
    st_table *rec;
    VALUE rec_hash;

    // replacement is the "return value" slot of obj_traverse_replace_i:
    // enter_func stores the substitute object here.
    VALUE replacement;
    bool move; // true for Ractor move semantics, false for copy
};

// Callback context; `src` is the container receiving replaced children
// (needed for the RB_OBJ_WRITTEN write barrier).
struct obj_traverse_replace_callback_data {
    bool stop;
    VALUE src;
    struct obj_traverse_replace_data *data;
};
// st "foreach" half of rb_hash_stlike_foreach_with_replace: always ask
// for the replace callback to run on this entry.
static int
obj_hash_traverse_replace_foreach_i(st_data_t key, st_data_t value, st_data_t argp, int error)
{
    return ST_REPLACE;
}

// st "replace" half: recurse into key and value, writing the replacement
// back in place and notifying the GC (RB_OBJ_WRITTEN) when it changed.
static int
obj_hash_traverse_replace_i(st_data_t *key, st_data_t *val, st_data_t ptr, int exists)
{
    struct obj_traverse_replace_callback_data *d = (struct obj_traverse_replace_callback_data *)ptr;
    struct obj_traverse_replace_data *data = d->data;

    if (obj_traverse_replace_i(*key, data)) {
        d->stop = true;
        return ST_STOP;
    }
    else if (*key != data->replacement) {
        VALUE v = *key = data->replacement;
        RB_OBJ_WRITTEN(d->src, Qundef, v);
    }

    if (obj_traverse_replace_i(*val, data)) {
        d->stop = true;
        return ST_STOP;
    }
    else if (*val != data->replacement) {
        VALUE v = *val = data->replacement;
        RB_OBJ_WRITTEN(d->src, Qundef, v);
    }

    return ST_CONTINUE;
}
// id-table "foreach" half for too-complex object ivars: always request
// the replace callback.
static enum rb_id_table_iterator_result
obj_iv_hash_traverse_replace_foreach_i(VALUE val, void *data)
{
    return ID_TABLE_REPLACE;
}

// id-table "replace" half: recurse into the ivar value and write the
// replacement back with a GC write-barrier notification.
static enum rb_id_table_iterator_result
obj_iv_hash_traverse_replace_i(VALUE *val, void *ptr, int exists)
{
    struct obj_traverse_replace_callback_data *d = (struct obj_traverse_replace_callback_data *)ptr;
    struct obj_traverse_replace_data *data = d->data;

    if (obj_traverse_replace_i(*val, data)) {
        d->stop = true;
        return ID_TABLE_STOP;
    }
    else if (*val != data->replacement) {
        VALUE v = *val = data->replacement;
        RB_OBJ_WRITTEN(d->src, Qundef, v);
    }

    return ID_TABLE_CONTINUE;
}
// Lazily create the original->replacement map (identity Hash backed,
// so rec_hash keeps it GC-reachable).
static struct st_table *
obj_traverse_replace_rec(struct obj_traverse_replace_data *data)
{
    if (UNLIKELY(!data->rec)) {
        data->rec_hash = rb_ident_hash_new();
        data->rec = RHASH_ST_TABLE(data->rec_hash);
    }
    return data->rec;
}
#if USE_TRANSIENT_HEAP
// Evacuation hooks: before moving an object between ractors its buffers
// must leave the (ractor-local) transient heap.  Defined in the
// respective core files.
void rb_ary_transient_heap_evacuate(VALUE ary, int promote);
void rb_obj_transient_heap_evacuate(VALUE obj, int promote);
void rb_hash_transient_heap_evacuate(VALUE hash, int promote);
void rb_struct_transient_heap_evacuate(VALUE st, int promote);
#endif
3124 static void
3125 obj_refer_only_shareables_p_i(VALUE obj, void *ptr)
3127 int *pcnt = (int *)ptr;
3129 if (!rb_ractor_shareable_p(obj)) {
3130 *pcnt++;
// Return non-zero (true) iff every object directly reachable from obj is
// shareable.  Used to decide whether a T_DATA object may be copied as-is.
// Holds the VM lock (no barrier) while objspace enumerates references.
static int
obj_refer_only_shareables_p(VALUE obj)
{
    int cnt = 0; // incremented once per unshareable reference
    RB_VM_LOCK_ENTER_NO_BARRIER();
    {
        rb_objspace_reachable_objects_from(obj, obj_refer_only_shareables_p_i, &cnt);
    }
    RB_VM_LOCK_LEAVE_NO_BARRIER();
    return cnt == 0;
}
// Core recursive walk for move/copy: visit obj, obtain its replacement
// from enter_func (left in data->replacement), then rewrite all child
// references inside the replacement to point at replaced children.
// Returns 1 to abort the whole operation, 0 on success.
static int
obj_traverse_replace_i(VALUE obj, struct obj_traverse_replace_data *data)
{
    st_data_t replacement;

    // immediates are shareable and replace themselves
    if (RB_SPECIAL_CONST_P(obj)) {
        data->replacement = obj;
        return 0;
    }

    switch (data->enter_func(obj, data)) {
      case traverse_cont: break;
      case traverse_skip: return 0; // skip children
      case traverse_stop: return 1; // stop search
    }

    replacement = (st_data_t)data->replacement;

    // reuse an existing replacement so shared/cyclic structure is kept
    if (UNLIKELY(st_lookup(obj_traverse_replace_rec(data), (st_data_t)obj, &replacement))) {
        data->replacement = (VALUE)replacement;
        return 0;
    }
    else {
        st_insert(obj_traverse_replace_rec(data), (st_data_t)obj, replacement);
    }

    // copy: rewrite children inside the fresh clone.
    // move: rewrite children in place; contents migrate at leave_func.
    if (!data->move) {
        obj = replacement;
    }

// Recurse into child slot `v`; if its replacement differs, store it back
// through the GC write barrier.  Early-returns 1 on abort.
#define CHECK_AND_REPLACE(v) do { \
    VALUE _val = (v); \
    if (obj_traverse_replace_i(_val, data)) { return 1; } \
    else if (data->replacement != _val)     { RB_OBJ_WRITE(obj, &v, data->replacement); } \
} while (0)

    // generic (externally stored) instance variables
    if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) {
        struct gen_ivtbl *ivtbl;
        rb_ivar_generic_ivtbl_lookup(obj, &ivtbl);
        for (uint32_t i = 0; i < ivtbl->numiv; i++) {
            if (!UNDEF_P(ivtbl->ivptr[i])) {
                CHECK_AND_REPLACE(ivtbl->ivptr[i]);
            }
        }
    }

    switch (BUILTIN_TYPE(obj)) {
      // no child node
      case T_FLOAT:
      case T_BIGNUM:
      case T_REGEXP:
      case T_FILE:
      case T_SYMBOL:
      case T_MATCH:
        break;
      case T_STRING:
        // detach from any shared string buffer before handing it over
        rb_str_make_independent(obj);
        break;

      case T_OBJECT:
        {
            if (rb_shape_obj_too_complex(obj)) {
                // ivars live in an id table
                struct rb_id_table * table = ROBJECT_IV_HASH(obj);
                struct obj_traverse_replace_callback_data d = {
                    .stop = false,
                    .data = data,
                    .src = obj,
                };
                rb_id_table_foreach_values_with_replace(table,
                                                        obj_iv_hash_traverse_replace_foreach_i,
                                                        obj_iv_hash_traverse_replace_i,
                                                        (void *)&d);
            }
            else {
#if USE_TRANSIENT_HEAP
                if (data->move) rb_obj_transient_heap_evacuate(obj, TRUE);
#endif
                uint32_t len = ROBJECT_IV_COUNT(obj);
                VALUE *ptr = ROBJECT_IVPTR(obj);

                for (uint32_t i=0; i<len; i++) {
                    if (!UNDEF_P(ptr[i])) {
                        CHECK_AND_REPLACE(ptr[i]);
                    }
                }
            }
        }
        break;

      case T_ARRAY:
        {
            rb_ary_cancel_sharing(obj);
#if USE_TRANSIENT_HEAP
            if (data->move) rb_ary_transient_heap_evacuate(obj, TRUE);
#endif
            for (int i = 0; i < RARRAY_LENINT(obj); i++) {
                VALUE e = rb_ary_entry(obj, i);

                if (obj_traverse_replace_i(e, data)) {
                    return 1;
                }
                else if (e != data->replacement) {
                    RARRAY_ASET(obj, i, data->replacement);
                }
            }
            RB_GC_GUARD(obj);
        }
        break;

      case T_HASH:
        {
#if USE_TRANSIENT_HEAP
            if (data->move) rb_hash_transient_heap_evacuate(obj, TRUE);
#endif
            struct obj_traverse_replace_callback_data d = {
                .stop = false,
                .data = data,
                .src = obj,
            };
            rb_hash_stlike_foreach_with_replace(obj,
                                                obj_hash_traverse_replace_foreach_i,
                                                obj_hash_traverse_replace_i,
                                                (VALUE)&d);
            if (d.stop) return 1;
            // TODO: rehash here?

            VALUE ifnone = RHASH_IFNONE(obj);
            if (obj_traverse_replace_i(ifnone, data)) {
                return 1;
            }
            else if (ifnone != data->replacement) {
                RHASH_SET_IFNONE(obj, data->replacement);
            }
        }
        break;

      case T_STRUCT:
        {
#if USE_TRANSIENT_HEAP
            if (data->move) rb_struct_transient_heap_evacuate(obj, TRUE);
#endif
            long len = RSTRUCT_LEN(obj);
            const VALUE *ptr = RSTRUCT_CONST_PTR(obj);

            for (long i=0; i<len; i++) {
                CHECK_AND_REPLACE(ptr[i]);
            }
        }
        break;

      case T_RATIONAL:
        CHECK_AND_REPLACE(RRATIONAL(obj)->num);
        CHECK_AND_REPLACE(RRATIONAL(obj)->den);
        break;
      case T_COMPLEX:
        CHECK_AND_REPLACE(RCOMPLEX(obj)->real);
        CHECK_AND_REPLACE(RCOMPLEX(obj)->imag);
        break;

      case T_DATA:
        // copying is allowed only when the payload references nothing
        // unshareable; moving T_DATA is never supported
        if (!data->move && obj_refer_only_shareables_p(obj)) {
            break;
        }
        else {
            rb_raise(rb_eRactorError, "can not %s %"PRIsVALUE" object.",
                     data->move ? "move" : "copy", rb_class_of(obj));
        }

      case T_IMEMO:
        // not supported yet
        return 1;

      // unreachable: classes/modules are shareable and skipped earlier
      case T_CLASS:
      case T_MODULE:
      case T_ICLASS:
      default:
        rp(obj);
        rb_bug("unreachable");
    }

    // restore this object's replacement (recursion clobbered the slot)
    data->replacement = (VALUE)replacement;

    if (data->leave_func(obj, data) == traverse_stop) {
        return 1;
    }
    else {
        return 0;
    }
}
// Drive a move (move=true) or copy (move=false) over obj's graph.
// Returns the replacement for obj, or Qundef if the traversal stopped
// (i.e. some object could not be moved/copied).
static VALUE
rb_obj_traverse_replace(VALUE obj,
                        rb_obj_traverse_replace_enter_func enter_func,
                        rb_obj_traverse_replace_leave_func leave_func,
                        bool move)
{
    struct obj_traverse_replace_data data = {
        .enter_func = enter_func,
        .leave_func = leave_func,
        .rec = NULL,
        .replacement = Qundef,
        .move = move,
    };

    if (obj_traverse_replace_i(obj, &data)) {
        return Qundef; // aborted
    }
    else {
        return data.replacement;
    }
}
// Minimal view of a heap slot, used to transplant an object's contents
// verbatim during a move.  Must mirror the layout of RBasic + 3 words.
struct RVALUE {
    VALUE flags;
    VALUE klass;
    VALUE v1;
    VALUE v2;
    VALUE v3;
};

// All per-type user flag bits; these travel with the payload on move,
// while the GC/internal flag bits stay with each slot.
static const VALUE fl_users = FL_USER1  | FL_USER2  | FL_USER3  |
                              FL_USER4  | FL_USER5  | FL_USER6  | FL_USER7  |
                              FL_USER8  | FL_USER9  | FL_USER10 | FL_USER11 |
                              FL_USER12 | FL_USER13 | FL_USER14 | FL_USER15 |
                              FL_USER16 | FL_USER17 | FL_USER18 | FL_USER19;
// Invalidate a moved source object: reclass it as Ractor::MovedObject
// (whose method_missing raises), wipe its payload words, and clear the
// user flag bits so stale type data cannot be misinterpreted.
static void
ractor_moved_bang(VALUE obj)
{
    // invalidate src object
    struct RVALUE *rv = (void *)obj;

    rv->klass = rb_cRactorMovedObject;
    rv->v1 = 0;
    rv->v2 = 0;
    rv->v3 = 0;
    rv->flags = rv->flags & ~fl_users;

    // TODO: record moved location
}
// enter_func for move: shareable objects pass through unchanged;
// everything else gets a fresh empty slot of the same class, to be
// filled in by move_leave after the children are processed.
static enum obj_traverse_iterator_result
move_enter(VALUE obj, struct obj_traverse_replace_data *data)
{
    if (rb_ractor_shareable_p(obj)) {
        data->replacement = obj;
        return traverse_skip;
    }
    else {
        data->replacement = rb_obj_alloc(RBASIC_CLASS(obj));
        return traverse_cont;
    }
}
void rb_replace_generic_ivar(VALUE clone, VALUE obj); // variable.c

// leave_func for move: transplant the source slot's payload (user flag
// bits + the three data words) into the replacement slot allocated by
// move_enter, migrate generic ivars, then invalidate the source.
static enum obj_traverse_iterator_result
move_leave(VALUE obj, struct obj_traverse_replace_data *data)
{
    VALUE v = data->replacement;
    struct RVALUE *dst = (struct RVALUE *)v;
    struct RVALUE *src = (struct RVALUE *)obj;

    // keep dst's own GC/internal flags, take src's type-specific flags
    dst->flags = (dst->flags & ~fl_users) | (src->flags & fl_users);

    dst->v1 = src->v1;
    dst->v2 = src->v2;
    dst->v3 = src->v3;

    if (UNLIKELY(FL_TEST_RAW(obj, FL_EXIVAR))) {
        rb_replace_generic_ivar(v, obj);
    }

    // TODO: generic_ivar

    ractor_moved_bang(obj);
    return traverse_cont;
}
3430 static VALUE
3431 ractor_move(VALUE obj)
3433 VALUE val = rb_obj_traverse_replace(obj, move_enter, move_leave, true);
3434 if (!UNDEF_P(val)) {
3435 return val;
3437 else {
3438 rb_raise(rb_eRactorError, "can not move the object");
// enter_func for copy: shareable objects are reused as-is; everything
// else is cloned, and the clone's child references are rewritten later.
static enum obj_traverse_iterator_result
copy_enter(VALUE obj, struct obj_traverse_replace_data *data)
{
    if (rb_ractor_shareable_p(obj)) {
        data->replacement = obj;
        return traverse_skip;
    }
    else {
        data->replacement = rb_obj_clone(obj);
        return traverse_cont;
    }
}
// leave_func for copy: nothing to finalize, clones are complete already.
static enum obj_traverse_iterator_result
copy_leave(VALUE obj, struct obj_traverse_replace_data *data)
{
    return traverse_cont;
}
3461 static VALUE
3462 ractor_copy(VALUE obj)
3464 VALUE val = rb_obj_traverse_replace(obj, copy_enter, copy_leave, false);
3465 if (!UNDEF_P(val)) {
3466 return val;
3468 else {
3469 rb_raise(rb_eRactorError, "can not copy the object");
// Ractor local storage

struct rb_ractor_local_key_struct {
    const struct rb_ractor_local_storage_type *type; // mark/free hooks
    void *main_cache; // main-Ractor fast path; Qundef means "unset"
};

// Keys deleted via rb_ractor_local_storage_delkey are parked here and
// purged from each ractor's table during GC marking (see
// ractor_local_storage_mark / rb_ractor_finish_marking).
static struct freed_ractor_local_keys_struct {
    int cnt;
    int capa;
    rb_ractor_local_key_t *keys;
} freed_ractor_local_keys;
// st_foreach callback: invoke the key's mark hook (if any) on the value.
static int
ractor_local_storage_mark_i(st_data_t key, st_data_t val, st_data_t dmy)
{
    struct rb_ractor_local_key_struct *k = (struct rb_ractor_local_key_struct *)key;
    if (k->type->mark) (*k->type->mark)((void *)val);
    return ST_CONTINUE;
}

// rb_id_table_foreach callback: mark each symbol-keyed local value.
static enum rb_id_table_iterator_result
idkey_local_storage_mark_i(ID id, VALUE val, void *dmy)
{
    rb_gc_mark(val);
    return ID_TABLE_CONTINUE;
}
// GC mark hook for a ractor's local storage.  Also piggybacks cleanup:
// entries whose keys were globally deleted (freed_ractor_local_keys) are
// removed here and their free hooks run, so every ractor drops them
// during the next GC cycle.
static void
ractor_local_storage_mark(rb_ractor_t *r)
{
    if (r->local_storage) {
        st_foreach(r->local_storage, ractor_local_storage_mark_i, 0);

        for (int i=0; i<freed_ractor_local_keys.cnt; i++) {
            rb_ractor_local_key_t key = freed_ractor_local_keys.keys[i];
            st_data_t val, k = (st_data_t)key;
            if (st_delete(r->local_storage, &k, &val) &&
                (key = (rb_ractor_local_key_t)k)->type->free) {
                (*key->type->free)((void *)val);
            }
        }
    }

    if (r->idkey_local_storage) {
        rb_id_table_foreach(r->idkey_local_storage, idkey_local_storage_mark_i, NULL);
    }
}
// st_foreach callback: invoke the key's free hook (if any) on the value.
static int
ractor_local_storage_free_i(st_data_t key, st_data_t val, st_data_t dmy)
{
    struct rb_ractor_local_key_struct *k = (struct rb_ractor_local_key_struct *)key;
    if (k->type->free) (*k->type->free)((void *)val);
    return ST_CONTINUE;
}

// Destroy a ractor's local storage tables (called when the ractor dies),
// running each entry's free hook first.
static void
ractor_local_storage_free(rb_ractor_t *r)
{
    if (r->local_storage) {
        st_foreach(r->local_storage, ractor_local_storage_free_i, 0);
        st_free_table(r->local_storage);
    }

    if (r->idkey_local_storage) {
        rb_id_table_free(r->idkey_local_storage);
    }
}
// mark hook for VALUE-typed local storage: the stored pointer IS a VALUE.
static void
rb_ractor_local_storage_value_mark(void *ptr)
{
    rb_gc_mark((VALUE)ptr);
}

// No mark, no free: raw pointers the caller manages itself.
static const struct rb_ractor_local_storage_type ractor_local_storage_type_null = {
    NULL,
    NULL,
};

// No mark, freed with ruby_xfree: malloc'ed payloads owned by storage.
const struct rb_ractor_local_storage_type rb_ractor_local_storage_type_free = {
    NULL,
    ruby_xfree,
};

// GC-marked, not freed: Ruby VALUEs stored as pointers.
static const struct rb_ractor_local_storage_type ractor_local_storage_type_value = {
    rb_ractor_local_storage_value_mark,
    NULL,
};
// Allocate a new ractor-local storage key with the given mark/free
// hooks (NULL type means no hooks).  main_cache starts as Qundef,
// meaning "no value cached for the main Ractor yet".
rb_ractor_local_key_t
rb_ractor_local_storage_ptr_newkey(const struct rb_ractor_local_storage_type *type)
{
    rb_ractor_local_key_t key = ALLOC(struct rb_ractor_local_key_struct);
    key->type = type ? type : &ractor_local_storage_type_null;
    key->main_cache = (void *)Qundef;
    return key;
}
// Convenience: new key for storing GC-marked Ruby VALUEs.
rb_ractor_local_key_t
rb_ractor_local_storage_value_newkey(void)
{
    return rb_ractor_local_storage_ptr_newkey(&ractor_local_storage_type_value);
}
// Schedule a key for deletion.  The key is appended (under the VM lock)
// to freed_ractor_local_keys; each ractor purges its entry during GC
// marking and the key itself is xfree'd in rb_ractor_finish_marking.
void
rb_ractor_local_storage_delkey(rb_ractor_local_key_t key)
{
    RB_VM_LOCK_ENTER();
    {
        if (freed_ractor_local_keys.cnt == freed_ractor_local_keys.capa) {
            // grow geometrically, starting at 4
            freed_ractor_local_keys.capa = freed_ractor_local_keys.capa ? freed_ractor_local_keys.capa * 2 : 4;
            REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, freed_ractor_local_keys.capa);
        }
        freed_ractor_local_keys.keys[freed_ractor_local_keys.cnt++] = key;
    }
    RB_VM_LOCK_LEAVE();
}
// Look up `key` in the current ractor's local storage.  On the main
// Ractor a per-key cache (main_cache) is consulted instead of the table;
// Qundef in the cache means "unset".  Returns true and stores the value
// in *pret on a hit, false otherwise.
static bool
ractor_local_ref(rb_ractor_local_key_t key, void **pret)
{
    if (rb_ractor_main_p()) {
        if (!UNDEF_P((VALUE)key->main_cache)) {
            *pret = key->main_cache;
            return true;
        }
        else {
            return false;
        }
    }
    else {
        rb_ractor_t *cr = GET_RACTOR();

        if (cr->local_storage && st_lookup(cr->local_storage, (st_data_t)key, (st_data_t *)pret)) {
            return true;
        }
        else {
            return false;
        }
    }
}
// Store ptr under key in the current ractor's local storage, creating
// the table on first use.  On the main Ractor the per-key cache is
// updated too so ractor_local_ref stays consistent.
static void
ractor_local_set(rb_ractor_local_key_t key, void *ptr)
{
    rb_ractor_t *cr = GET_RACTOR();

    if (cr->local_storage == NULL) {
        cr->local_storage = st_init_numtable();
    }

    st_insert(cr->local_storage, (st_data_t)key, (st_data_t)ptr);

    if (rb_ractor_main_p()) {
        key->main_cache = ptr;
    }
}
3633 VALUE
3634 rb_ractor_local_storage_value(rb_ractor_local_key_t key)
3636 void *val;
3637 if (ractor_local_ref(key, &val)) {
3638 return (VALUE)val;
3640 else {
3641 return Qnil;
3645 bool
3646 rb_ractor_local_storage_value_lookup(rb_ractor_local_key_t key, VALUE *val)
3648 if (ractor_local_ref(key, (void **)val)) {
3649 return true;
3651 else {
3652 return false;
// Store a VALUE under key in the current ractor's local storage.
void
rb_ractor_local_storage_value_set(rb_ractor_local_key_t key, VALUE val)
{
    ractor_local_set(key, (void *)val);
}
3662 void *
3663 rb_ractor_local_storage_ptr(rb_ractor_local_key_t key)
3665 void *ret;
3666 if (ractor_local_ref(key, &ret)) {
3667 return ret;
3669 else {
3670 return NULL;
// Store a raw pointer under key in the current ractor's local storage.
void
rb_ractor_local_storage_ptr_set(rb_ractor_local_key_t key, void *ptr)
{
    ractor_local_set(key, ptr);
}
#define DEFAULT_KEYS_CAPA 0x10

// Called after GC marking: every ractor has purged its entries for the
// deleted keys (see ractor_local_storage_mark), so the key structs can
// now be freed.  The pending array is shrunk back to a default capacity.
void
rb_ractor_finish_marking(void)
{
    for (int i=0; i<freed_ractor_local_keys.cnt; i++) {
        ruby_xfree(freed_ractor_local_keys.keys[i]);
    }
    freed_ractor_local_keys.cnt = 0;
    if (freed_ractor_local_keys.capa > DEFAULT_KEYS_CAPA) {
        freed_ractor_local_keys.capa = DEFAULT_KEYS_CAPA;
        REALLOC_N(freed_ractor_local_keys.keys, rb_ractor_local_key_t, DEFAULT_KEYS_CAPA);
    }
}
// Ractor#[] implementation: symbol-keyed per-ractor storage.
// Returns Qnil when sym is an unknown symbol or the key is unset
// (rb_check_id returns 0 for symbols without a registered ID).
static VALUE
ractor_local_value(rb_execution_context_t *ec, VALUE self, VALUE sym)
{
    rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
    ID id = rb_check_id(&sym);
    struct rb_id_table *tbl = cr->idkey_local_storage;
    VALUE val;

    if (id && tbl && rb_id_table_lookup(tbl, id, &val)) {
        return val;
    }
    else {
        return Qnil;
    }
}
// Ractor#[]= implementation: store val under sym in the current
// ractor's symbol-keyed storage, creating the id table on first use.
// Returns val.
static VALUE
ractor_local_value_set(rb_execution_context_t *ec, VALUE self, VALUE sym, VALUE val)
{
    rb_ractor_t *cr = rb_ec_ractor_ptr(ec);
    ID id = SYM2ID(rb_to_symbol(sym));
    struct rb_id_table *tbl = cr->idkey_local_storage;

    if (tbl == NULL) {
        tbl = cr->idkey_local_storage = rb_id_table_create(2);
    }
    rb_id_table_insert(tbl, id, val);
    return val;
}
3725 #include "ractor.rbinc"