// this file is included by ractor.c struct ractor_port { rb_ractor_t *r; st_data_t id_; }; static st_data_t ractor_port_id(const struct ractor_port *rp) { return rp->id_; } static VALUE rb_cRactorPort; static VALUE ractor_receive(rb_execution_context_t *ec, const struct ractor_port *rp); static VALUE ractor_send(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move); static VALUE ractor_try_send(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move); static void ractor_add_port(rb_ractor_t *r, st_data_t id); static void ractor_port_mark(void *ptr) { const struct ractor_port *rp = (struct ractor_port *)ptr; if (rp->r) { rb_gc_mark(rp->r->pub.self); } } static void ractor_port_free(void *ptr) { xfree(ptr); } static size_t ractor_port_memsize(const void *ptr) { return sizeof(struct ractor_port); } static const rb_data_type_t ractor_port_data_type = { "ractor/port", { ractor_port_mark, ractor_port_free, ractor_port_memsize, NULL, // update }, 0, 0, RUBY_TYPED_FREE_IMMEDIATELY | RUBY_TYPED_WB_PROTECTED, }; static st_data_t ractor_genid_for_port(rb_ractor_t *cr) { // TODO: enough? return cr->sync.next_port_id++; } static struct ractor_port * RACTOR_PORT_PTR(VALUE self) { VM_ASSERT(rb_typeddata_is_kind_of(self, &ractor_port_data_type)); struct ractor_port *rp = DATA_PTR(self); return rp; } static VALUE ractor_port_alloc(VALUE klass) { struct ractor_port *rp; VALUE rpv = TypedData_Make_Struct(klass, struct ractor_port, &ractor_port_data_type, rp); return rpv; } static VALUE ractor_port_init(VALUE rpv, rb_ractor_t *r) { struct ractor_port *rp = RACTOR_PORT_PTR(rpv); rp->r = r; RB_OBJ_WRITTEN(rpv, Qundef, r->pub.self); rp->id_ = ractor_genid_for_port(r); ractor_add_port(r, ractor_port_id(rp)); rb_obj_freeze(rpv); return rpv; } static VALUE ractor_port_initialzie(VALUE self) { return ractor_port_init(self, GET_RACTOR()); } static VALUE ractor_port_initialzie_copy(VALUE self, VALUE orig) { struct ractor_port *dst = RACTOR_PORT_PTR(self); struct ractor_port *src = RACTOR_PORT_PTR(orig); dst->r = src->r; RB_OBJ_WRITTEN(self, Qundef, dst->r->pub.self); dst->id_ = ractor_port_id(src); return self; } static VALUE ractor_port_new(rb_ractor_t *r) { VALUE rpv = ractor_port_alloc(rb_cRactorPort); ractor_port_init(rpv, r); return rpv; } static bool ractor_port_p(VALUE self) { return rb_typeddata_is_kind_of(self, &ractor_port_data_type); } static VALUE ractor_port_receive(rb_execution_context_t *ec, VALUE self) { const struct ractor_port *rp = RACTOR_PORT_PTR(self); if (rp->r != rb_ec_ractor_ptr(ec)) { rb_raise(rb_eRactorError, "only allowed from the creator Ractor of this port"); } return ractor_receive(ec, rp); } static VALUE ractor_port_send(rb_execution_context_t *ec, VALUE self, VALUE obj, VALUE move) { const struct ractor_port *rp = RACTOR_PORT_PTR(self); ractor_send(ec, rp, obj, RTEST(move)); return self; } static bool ractor_closed_port_p(rb_execution_context_t *ec, rb_ractor_t *r, const struct ractor_port *rp); static bool ractor_close_port(rb_execution_context_t *ec, rb_ractor_t *r, const struct ractor_port *rp); static VALUE ractor_port_closed_p(rb_execution_context_t *ec, VALUE self) { const struct ractor_port *rp = RACTOR_PORT_PTR(self); if (ractor_closed_port_p(ec, rp->r, rp)) { return Qtrue; } else { return Qfalse; } } static VALUE ractor_port_close(rb_execution_context_t *ec, VALUE self) { const struct ractor_port *rp = RACTOR_PORT_PTR(self); rb_ractor_t *cr = rb_ec_ractor_ptr(ec); if (cr != rp->r) { rb_raise(rb_eRactorError, "closing port by other ractors is not allowed"); } ractor_close_port(ec, cr, rp); return self; } // ractor-internal // ractor-internal - ractor_basket enum ractor_basket_type { // basket is empty basket_type_none, // value is available basket_type_ref, basket_type_copy, basket_type_move, }; struct ractor_basket { enum ractor_basket_type type; VALUE sender; st_data_t port_id; struct { VALUE v; bool exception; } p; // payload struct ccan_list_node node; }; #if 0 static inline bool ractor_basket_type_p(const struct ractor_basket *b, enum ractor_basket_type type) { return b->type == type; } static inline bool ractor_basket_none_p(const struct ractor_basket *b) { return ractor_basket_type_p(b, basket_type_none); } #endif static void ractor_basket_mark(const struct ractor_basket *b) { rb_gc_mark(b->p.v); } static void ractor_basket_free(struct ractor_basket *b) { xfree(b); } static struct ractor_basket * ractor_basket_alloc(void) { struct ractor_basket *b = ALLOC(struct ractor_basket); return b; } // ractor-internal - ractor_queue struct ractor_queue { struct ccan_list_head set; bool closed; }; static void ractor_queue_init(struct ractor_queue *rq) { ccan_list_head_init(&rq->set); rq->closed = false; } static struct ractor_queue * ractor_queue_new(void) { struct ractor_queue *rq = ALLOC(struct ractor_queue); ractor_queue_init(rq); return rq; } static void ractor_queue_mark(const struct ractor_queue *rq) { const struct ractor_basket *b; ccan_list_for_each(&rq->set, b, node) { ractor_basket_mark(b); } } static void ractor_queue_free(struct ractor_queue *rq) { struct ractor_basket *b, *nxt; ccan_list_for_each_safe(&rq->set, b, nxt, node) { ccan_list_del_init(&b->node); ractor_basket_free(b); } VM_ASSERT(ccan_list_empty(&rq->set)); xfree(rq); } RBIMPL_ATTR_MAYBE_UNUSED() static size_t ractor_queue_size(const struct ractor_queue *rq) { size_t size = 0; const struct ractor_basket *b; ccan_list_for_each(&rq->set, b, node) { size++; } return size; } static void ractor_queue_close(struct ractor_queue *rq) { rq->closed = true; } static void ractor_queue_move(struct ractor_queue *dst_rq, struct ractor_queue *src_rq) { struct ccan_list_head *src = &src_rq->set; struct ccan_list_head *dst = &dst_rq->set; dst->n.next = src->n.next; dst->n.prev = src->n.prev; dst->n.next->prev = &dst->n; dst->n.prev->next = &dst->n; ccan_list_head_init(src); } #if 0 static struct ractor_basket * ractor_queue_head(rb_ractor_t *r, struct ractor_queue *rq) { return ccan_list_top(&rq->set, struct ractor_basket, node); } #endif static bool ractor_queue_empty_p(rb_ractor_t *r, const struct ractor_queue *rq) { return ccan_list_empty(&rq->set); } static struct ractor_basket * ractor_queue_deq(rb_ractor_t *r, struct ractor_queue *rq) { VM_ASSERT(GET_RACTOR() == r); return ccan_list_pop(&rq->set, struct ractor_basket, node); } static void ractor_queue_enq(rb_ractor_t *r, struct ractor_queue *rq, struct ractor_basket *basket) { ccan_list_add_tail(&rq->set, &basket->node); } #if 0 static void rq_dump(const struct ractor_queue *rq) { int i=0; struct ractor_basket *b; ccan_list_for_each(&rq->set, b, node) { fprintf(stderr, "%d type:%s %p\n", i, basket_type_name(b->type), (void *)b); i++; } } #endif static void ractor_delete_port(rb_ractor_t *cr, st_data_t id, bool locked); static struct ractor_queue * ractor_get_queue(rb_ractor_t *cr, st_data_t id, bool locked) { VM_ASSERT(cr == GET_RACTOR()); struct ractor_queue *rq; if (cr->sync.ports && st_lookup(cr->sync.ports, id, (st_data_t *)&rq)) { if (rq->closed && ractor_queue_empty_p(cr, rq)) { ractor_delete_port(cr, id, locked); return NULL; } else { return rq; } } else { return NULL; } } // ractor-internal - ports static void ractor_add_port(rb_ractor_t *r, st_data_t id) { struct ractor_queue *rq = ractor_queue_new(); ASSERT_ractor_unlocking(r); RUBY_DEBUG_LOG("id:%u", (unsigned int)id); RACTOR_LOCK(r); { // memo: can cause GC, but GC doesn't use ractor locking. st_insert(r->sync.ports, id, (st_data_t)rq); } RACTOR_UNLOCK(r); } static void ractor_delete_port_locked(rb_ractor_t *cr, st_data_t id) { ASSERT_ractor_locking(cr); RUBY_DEBUG_LOG("id:%u", (unsigned int)id); struct ractor_queue *rq; if (st_delete(cr->sync.ports, &id, (st_data_t *)&rq)) { ractor_queue_free(rq); } else { VM_ASSERT(0); } } static void ractor_delete_port(rb_ractor_t *cr, st_data_t id, bool locked) { if (locked) { ractor_delete_port_locked(cr, id); } else { RACTOR_LOCK_SELF(cr); { ractor_delete_port_locked(cr, id); } RACTOR_UNLOCK_SELF(cr); } } static const struct ractor_port * ractor_default_port(rb_ractor_t *r) { return RACTOR_PORT_PTR(r->sync.default_port_value); } static VALUE ractor_default_port_value(rb_ractor_t *r) { return r->sync.default_port_value; } static bool ractor_closed_port_p(rb_execution_context_t *ec, rb_ractor_t *r, const struct ractor_port *rp) { VM_ASSERT(rb_ec_ractor_ptr(ec) == rp->r ? 1 : (ASSERT_ractor_locking(rp->r), 1)); const struct ractor_queue *rq; if (rp->r->sync.ports && st_lookup(rp->r->sync.ports, ractor_port_id(rp), (st_data_t *)&rq)) { return rq->closed; } else { return true; } } static void ractor_deliver_incoming_messages(rb_execution_context_t *ec, rb_ractor_t *cr); static bool ractor_queue_empty_p(rb_ractor_t *r, const struct ractor_queue *rq); static bool ractor_close_port(rb_execution_context_t *ec, rb_ractor_t *cr, const struct ractor_port *rp) { VM_ASSERT(cr == rp->r); struct ractor_queue *rq = NULL; RACTOR_LOCK_SELF(cr); { ractor_deliver_incoming_messages(ec, cr); // check incoming messages if (st_lookup(rp->r->sync.ports, ractor_port_id(rp), (st_data_t *)&rq)) { ractor_queue_close(rq); if (ractor_queue_empty_p(cr, rq)) { // delete from the table ractor_delete_port(cr, ractor_port_id(rp), true); } // TODO: free rq } } RACTOR_UNLOCK_SELF(cr); return rq != NULL; } static int ractor_free_all_ports_i(st_data_t port_id, st_data_t val, st_data_t dat) { struct ractor_queue *rq = (struct ractor_queue *)val; // rb_ractor_t *cr = (rb_ractor_t *)dat; ractor_queue_free(rq); return ST_CONTINUE; } static void ractor_free_all_ports(rb_ractor_t *cr) { if (cr->sync.ports) { st_foreach(cr->sync.ports, ractor_free_all_ports_i, (st_data_t)cr); st_free_table(cr->sync.ports); cr->sync.ports = NULL; } if (cr->sync.recv_queue) { ractor_queue_free(cr->sync.recv_queue); cr->sync.recv_queue = NULL; } } static void ractor_sync_terminate_atfork(rb_vm_t *vm, rb_ractor_t *r) { ractor_free_all_ports(r); r->sync.legacy = Qnil; } // Ractor#monitor struct ractor_monitor { struct ractor_port port; struct ccan_list_node node; }; static void ractor_mark_monitors(rb_ractor_t *r) { const struct ractor_monitor *rm; ccan_list_for_each(&r->sync.monitors, rm, node) { rb_gc_mark(rm->port.r->pub.self); } } static VALUE ractor_exit_token(bool exc) { if (exc) { RUBY_DEBUG_LOG("aborted"); return ID2SYM(idAborted); } else { RUBY_DEBUG_LOG("exited"); return ID2SYM(idExited); } } static VALUE ractor_monitor(rb_execution_context_t *ec, VALUE self, VALUE port) { rb_ractor_t *r = RACTOR_PTR(self); bool terminated = false; const struct ractor_port *rp = RACTOR_PORT_PTR(port); struct ractor_monitor *rm = ALLOC(struct ractor_monitor); rm->port = *rp; // copy port information RACTOR_LOCK(r); { if (UNDEF_P(r->sync.legacy)) { // not terminated RUBY_DEBUG_LOG("OK/r:%u -> port:%u@r%u", (unsigned int)rb_ractor_id(r), (unsigned int)ractor_port_id(&rm->port), (unsigned int)rb_ractor_id(rm->port.r)); ccan_list_add_tail(&r->sync.monitors, &rm->node); } else { RUBY_DEBUG_LOG("NG/r:%u -> port:%u@r%u", (unsigned int)rb_ractor_id(r), (unsigned int)ractor_port_id(&rm->port), (unsigned int)rb_ractor_id(rm->port.r)); terminated = true; } } RACTOR_UNLOCK(r); if (terminated) { xfree(rm); ractor_port_send(ec, port, ractor_exit_token(r->sync.legacy_exc), Qfalse); return Qfalse; } else { return Qtrue; } } static VALUE ractor_unmonitor(rb_execution_context_t *ec, VALUE self, VALUE port) { rb_ractor_t *r = RACTOR_PTR(self); const struct ractor_port *rp = RACTOR_PORT_PTR(port); RACTOR_LOCK(r); { if (UNDEF_P(r->sync.legacy)) { // not terminated struct ractor_monitor *rm, *nxt; ccan_list_for_each_safe(&r->sync.monitors, rm, nxt, node) { if (ractor_port_id(&rm->port) == ractor_port_id(rp)) { RUBY_DEBUG_LOG("r:%u -> port:%u@r%u", (unsigned int)rb_ractor_id(r), (unsigned int)ractor_port_id(&rm->port), (unsigned int)rb_ractor_id(rm->port.r)); ccan_list_del(&rm->node); xfree(rm); } } } } RACTOR_UNLOCK(r); return self; } static void ractor_notify_exit(rb_execution_context_t *ec, rb_ractor_t *cr, VALUE legacy, bool exc) { RUBY_DEBUG_LOG("exc:%d", exc); VM_ASSERT(!UNDEF_P(legacy)); VM_ASSERT(cr->sync.legacy == Qundef); RACTOR_LOCK_SELF(cr); { ractor_free_all_ports(cr); cr->sync.legacy = legacy; cr->sync.legacy_exc = exc; } RACTOR_UNLOCK_SELF(cr); // send token VALUE token = ractor_exit_token(exc); struct ractor_monitor *rm, *nxt; ccan_list_for_each_safe(&cr->sync.monitors, rm, nxt, node) { RUBY_DEBUG_LOG("port:%u@r%u", (unsigned int)ractor_port_id(&rm->port), (unsigned int)rb_ractor_id(rm->port.r)); ractor_try_send(ec, &rm->port, token, false); ccan_list_del(&rm->node); xfree(rm); } VM_ASSERT(ccan_list_empty(&cr->sync.monitors)); } // ractor-internal - initialize, mark, free, memsize static int ractor_mark_ports_i(st_data_t key, st_data_t val, st_data_t data) { // id -> ractor_queue const struct ractor_queue *rq = (struct ractor_queue *)val; ractor_queue_mark(rq); return ST_CONTINUE; } static void ractor_sync_mark(rb_ractor_t *r) { rb_gc_mark(r->sync.default_port_value); if (r->sync.ports) { ractor_queue_mark(r->sync.recv_queue); st_foreach(r->sync.ports, ractor_mark_ports_i, 0); } ractor_mark_monitors(r); } static int ractor_sync_free_ports_i(st_data_t _key, st_data_t val, st_data_t _args) { struct ractor_queue *queue = (struct ractor_queue *)val; ractor_queue_free(queue); return ST_CONTINUE; } static void ractor_sync_free(rb_ractor_t *r) { if (r->sync.recv_queue) { ractor_queue_free(r->sync.recv_queue); } // maybe NULL if (r->sync.ports) { st_foreach(r->sync.ports, ractor_sync_free_ports_i, 0); st_free_table(r->sync.ports); r->sync.ports = NULL; } } static size_t ractor_sync_memsize(const rb_ractor_t *r) { return st_table_size(r->sync.ports); } static void ractor_sync_init(rb_ractor_t *r) { // lock rb_native_mutex_initialize(&r->sync.lock); // monitors ccan_list_head_init(&r->sync.monitors); // waiters ccan_list_head_init(&r->sync.waiters); // receiving queue r->sync.recv_queue = ractor_queue_new(); // ports r->sync.ports = st_init_numtable(); r->sync.default_port_value = ractor_port_new(r); FL_SET_RAW(r->sync.default_port_value, RUBY_FL_SHAREABLE); // only default ports are shareable // legacy r->sync.legacy = Qundef; #ifndef RUBY_THREAD_PTHREAD_H rb_native_cond_initialize(&r->sync.wakeup_cond); #endif } // Ractor#value static rb_ractor_t * ractor_set_successor_once(rb_ractor_t *r, rb_ractor_t *cr) { if (r->sync.successor == NULL) { RACTOR_LOCK(r); { if (r->sync.successor != NULL) { // already `value`ed } else { r->sync.successor = cr; } } RACTOR_UNLOCK(r); } VM_ASSERT(r->sync.successor != NULL); return r->sync.successor; } static VALUE ractor_reset_belonging(VALUE obj); static VALUE ractor_make_remote_exception(VALUE cause, VALUE sender) { VALUE err = rb_exc_new_cstr(rb_eRactorRemoteError, "thrown by remote Ractor."); rb_ivar_set(err, rb_intern("@ractor"), sender); rb_ec_setup_exception(NULL, err, cause); return err; } static VALUE ractor_value(rb_execution_context_t *ec, VALUE self) { rb_ractor_t *cr = rb_ec_ractor_ptr(ec); rb_ractor_t *r = RACTOR_PTR(self); rb_ractor_t *sr = ractor_set_successor_once(r, cr); if (sr == cr) { ractor_reset_belonging(r->sync.legacy); if (r->sync.legacy_exc) { rb_exc_raise(ractor_make_remote_exception(r->sync.legacy, self)); } return r->sync.legacy; } else { rb_raise(rb_eRactorError, "Only the successor ractor can take a value"); } } static VALUE ractor_move(VALUE obj); // in this file static VALUE ractor_copy(VALUE obj); // in this file static VALUE ractor_prepare_payload(rb_execution_context_t *ec, VALUE obj, enum ractor_basket_type *ptype) { switch (*ptype) { case basket_type_ref: return obj; case basket_type_move: return ractor_move(obj); default: if (rb_ractor_shareable_p(obj)) { *ptype = basket_type_ref; return obj; } else { *ptype = basket_type_copy; return ractor_copy(obj); } } } static struct ractor_basket * ractor_basket_new(rb_execution_context_t *ec, VALUE obj, enum ractor_basket_type type, bool exc) { VALUE v = ractor_prepare_payload(ec, obj, &type); struct ractor_basket *b = ractor_basket_alloc(); b->type = type; b->p.v = v; b->p.exception = exc; return b; } static VALUE ractor_basket_value(struct ractor_basket *b) { switch (b->type) { case basket_type_ref: break; case basket_type_copy: case basket_type_move: ractor_reset_belonging(b->p.v); break; default: VM_ASSERT(0); // unreachable } VM_ASSERT(!RB_TYPE_P(b->p.v, T_NONE)); return b->p.v; } static VALUE ractor_basket_accept(struct ractor_basket *b) { VALUE v = ractor_basket_value(b); if (b->p.exception) { VALUE err = ractor_make_remote_exception(v, b->sender); ractor_basket_free(b); rb_exc_raise(err); } ractor_basket_free(b); return v; } // Ractor blocking by receive enum ractor_wakeup_status { wakeup_none, wakeup_by_send, wakeup_by_interrupt, // wakeup_by_close, }; struct ractor_waiter { enum ractor_wakeup_status wakeup_status; rb_thread_t *th; struct ccan_list_node node; }; #if VM_CHECK_MODE > 0 static bool ractor_waiter_included(rb_ractor_t *cr, rb_thread_t *th) { ASSERT_ractor_locking(cr); struct ractor_waiter *w; ccan_list_for_each(&cr->sync.waiters, w, node) { if (w->th == th) { return true; } } return false; } #endif #if USE_RUBY_DEBUG_LOG static const char * wakeup_status_str(enum ractor_wakeup_status wakeup_status) { switch (wakeup_status) { case wakeup_none: return "none"; case wakeup_by_send: return "by_send"; case wakeup_by_interrupt: return "by_interrupt"; // case wakeup_by_close: return "by_close"; } rb_bug("unreachable"); } static const char * basket_type_name(enum ractor_basket_type type) { switch (type) { case basket_type_none: return "none"; case basket_type_ref: return "ref"; case basket_type_copy: return "copy"; case basket_type_move: return "move"; } VM_ASSERT(0); return NULL; } #endif // USE_RUBY_DEBUG_LOG #ifdef RUBY_THREAD_PTHREAD_H // #else // win32 static void ractor_cond_wait(rb_ractor_t *r) { #if RACTOR_CHECK_MODE > 0 VALUE locked_by = r->sync.locked_by; r->sync.locked_by = Qnil; #endif rb_native_cond_wait(&r->sync.wakeup_cond, &r->sync.lock); #if RACTOR_CHECK_MODE > 0 r->sync.locked_by = locked_by; #endif } static void * ractor_wait_no_gvl(void *ptr) { struct ractor_waiter *waiter = (struct ractor_waiter *)ptr; rb_ractor_t *cr = waiter->th->ractor; RACTOR_LOCK_SELF(cr); { if (waiter->wakeup_status == wakeup_none) { ractor_cond_wait(cr); } } RACTOR_UNLOCK_SELF(cr); return NULL; } static void rb_ractor_sched_wait(rb_execution_context_t *ec, rb_ractor_t *cr, rb_unblock_function_t *ubf, void *ptr) { struct ractor_waiter *waiter = (struct ractor_waiter *)ptr; RACTOR_UNLOCK(cr); { rb_nogvl(ractor_wait_no_gvl, waiter, ubf, waiter, RB_NOGVL_UBF_ASYNC_SAFE | RB_NOGVL_INTR_FAIL); } RACTOR_LOCK(cr); } static void rb_ractor_sched_wakeup(rb_ractor_t *r, rb_thread_t *th) { // ractor lock is acquired rb_native_cond_broadcast(&r->sync.wakeup_cond); } #endif static bool ractor_wakeup_all(rb_ractor_t *r, enum ractor_wakeup_status wakeup_status) { ASSERT_ractor_unlocking(r); RUBY_DEBUG_LOG("r:%u wakeup:%s", rb_ractor_id(r), wakeup_status_str(wakeup_status)); bool wakeup_p = false; RACTOR_LOCK(r); while (1) { struct ractor_waiter *waiter = ccan_list_pop(&r->sync.waiters, struct ractor_waiter, node); if (waiter) { VM_ASSERT(waiter->wakeup_status == wakeup_none); waiter->wakeup_status = wakeup_status; rb_ractor_sched_wakeup(r, waiter->th); wakeup_p = true; } else { break; } } RACTOR_UNLOCK(r); return wakeup_p; } static void ubf_ractor_wait(void *ptr) { struct ractor_waiter *waiter = (struct ractor_waiter *)ptr; rb_thread_t *th = waiter->th; rb_ractor_t *r = th->ractor; // clear ubf and nobody can kick UBF th->unblock.func = NULL; th->unblock.arg = NULL; rb_native_mutex_unlock(&th->interrupt_lock); { RACTOR_LOCK(r); { if (waiter->wakeup_status == wakeup_none) { RUBY_DEBUG_LOG("waiter:%p", (void *)waiter); waiter->wakeup_status = wakeup_by_interrupt; ccan_list_del(&waiter->node); rb_ractor_sched_wakeup(r, waiter->th); } } RACTOR_UNLOCK(r); } rb_native_mutex_lock(&th->interrupt_lock); } static enum ractor_wakeup_status ractor_wait(rb_execution_context_t *ec, rb_ractor_t *cr) { rb_thread_t *th = rb_ec_thread_ptr(ec); struct ractor_waiter waiter = { .wakeup_status = wakeup_none, .th = th, }; RUBY_DEBUG_LOG("wait%s", ""); ASSERT_ractor_locking(cr); VM_ASSERT(GET_RACTOR() == cr); VM_ASSERT(!ractor_waiter_included(cr, th)); ccan_list_add_tail(&cr->sync.waiters, &waiter.node); // resume another ready thread and wait for an event rb_ractor_sched_wait(ec, cr, ubf_ractor_wait, &waiter); if (waiter.wakeup_status == wakeup_none) { ccan_list_del(&waiter.node); } RUBY_DEBUG_LOG("wakeup_status:%s", wakeup_status_str(waiter.wakeup_status)); RACTOR_UNLOCK_SELF(cr); { rb_ec_check_ints(ec); } RACTOR_LOCK_SELF(cr); VM_ASSERT(!ractor_waiter_included(cr, th)); return waiter.wakeup_status; } static void ractor_deliver_incoming_messages(rb_execution_context_t *ec, rb_ractor_t *cr) { ASSERT_ractor_locking(cr); struct ractor_queue *recv_q = cr->sync.recv_queue; struct ractor_basket *b; while ((b = ractor_queue_deq(cr, recv_q)) != NULL) { ractor_queue_enq(cr, ractor_get_queue(cr, b->port_id, true), b); } } static bool ractor_check_received(rb_ractor_t *cr, struct ractor_queue *messages) { struct ractor_queue *received_queue = cr->sync.recv_queue; bool received = false; ASSERT_ractor_locking(cr); if (ractor_queue_empty_p(cr, received_queue)) { RUBY_DEBUG_LOG("empty"); } else { received = true; // messages <- incoming ractor_queue_init(messages); ractor_queue_move(messages, received_queue); } VM_ASSERT(ractor_queue_empty_p(cr, received_queue)); RUBY_DEBUG_LOG("received:%d", received); return received; } static void ractor_wait_receive(rb_execution_context_t *ec, rb_ractor_t *cr) { struct ractor_queue messages; bool deliverred = false; RACTOR_LOCK_SELF(cr); { if (ractor_check_received(cr, &messages)) { deliverred = true; } else { ractor_wait(ec, cr); } } RACTOR_UNLOCK_SELF(cr); if (deliverred) { VM_ASSERT(!ractor_queue_empty_p(cr, &messages)); struct ractor_basket *b; while ((b = ractor_queue_deq(cr, &messages)) != NULL) { ractor_queue_enq(cr, ractor_get_queue(cr, b->port_id, false), b); } } } static VALUE ractor_try_receive(rb_execution_context_t *ec, rb_ractor_t *cr, const struct ractor_port *rp) { struct ractor_queue *rq = ractor_get_queue(cr, ractor_port_id(rp), false); if (rq == NULL) { rb_raise(rb_eRactorClosedError, "The port was already closed"); } struct ractor_basket *b = ractor_queue_deq(cr, rq); if (rq->closed && ractor_queue_empty_p(cr, rq)) { ractor_delete_port(cr, ractor_port_id(rp), false); } if (b) { return ractor_basket_accept(b); } else { return Qundef; } } static VALUE ractor_receive(rb_execution_context_t *ec, const struct ractor_port *rp) { rb_ractor_t *cr = rb_ec_ractor_ptr(ec); VM_ASSERT(cr == rp->r); RUBY_DEBUG_LOG("port:%u", (unsigned int)ractor_port_id(rp)); while (1) { VALUE v = ractor_try_receive(ec, cr, rp); if (v != Qundef) { return v; } else { ractor_wait_receive(ec, cr); } } } // Ractor#send static void ractor_send_basket(rb_execution_context_t *ec, const struct ractor_port *rp, struct ractor_basket *b, bool raise_on_error) { bool closed = false; RUBY_DEBUG_LOG("port:%u@r%u b:%s v:%p", (unsigned int)ractor_port_id(rp), rb_ractor_id(rp->r), basket_type_name(b->type), (void *)b->p.v); RACTOR_LOCK(rp->r); { if (ractor_closed_port_p(ec, rp->r, rp)) { closed = true; } else { b->port_id = ractor_port_id(rp); ractor_queue_enq(rp->r, rp->r->sync.recv_queue, b); } } RACTOR_UNLOCK(rp->r); // NOTE: ref r -> b->p.v is created, but Ractor is unprotected object, so no problem on that. if (!closed) { ractor_wakeup_all(rp->r, wakeup_by_send); } else { RUBY_DEBUG_LOG("closed:%u@r%u", (unsigned int)ractor_port_id(rp), rb_ractor_id(rp->r)); if (raise_on_error) { ractor_basket_free(b); rb_raise(rb_eRactorClosedError, "The port was already closed"); } } } static VALUE ractor_send0(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move, bool raise_on_error) { struct ractor_basket *b = ractor_basket_new(ec, obj, RTEST(move) ? basket_type_move : basket_type_none, false); ractor_send_basket(ec, rp, b, raise_on_error); RB_GC_GUARD(obj); return rp->r->pub.self; } static VALUE ractor_send(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move) { return ractor_send0(ec, rp, obj, move, true); } static VALUE ractor_try_send(rb_execution_context_t *ec, const struct ractor_port *rp, VALUE obj, VALUE move) { return ractor_send0(ec, rp, obj, move, false); } // Ractor::Selector struct ractor_selector { rb_ractor_t *r; struct st_table *ports; // rpv -> rp }; static int ractor_selector_mark_i(st_data_t key, st_data_t val, st_data_t dmy) { rb_gc_mark((VALUE)key); // rpv return ST_CONTINUE; } static void ractor_selector_mark(void *ptr) { struct ractor_selector *s = ptr; if (s->ports) { st_foreach(s->ports, ractor_selector_mark_i, 0); } } static void ractor_selector_free(void *ptr) { struct ractor_selector *s = ptr; st_free_table(s->ports); ruby_xfree(ptr); } static size_t ractor_selector_memsize(const void *ptr) { const struct ractor_selector *s = ptr; return sizeof(struct ractor_selector) + st_memsize(s->ports); } static const rb_data_type_t ractor_selector_data_type = { "ractor/selector", { ractor_selector_mark, ractor_selector_free, ractor_selector_memsize, NULL, // update }, 0, 0, RUBY_TYPED_FREE_IMMEDIATELY, }; static struct ractor_selector * RACTOR_SELECTOR_PTR(VALUE selv) { VM_ASSERT(rb_typeddata_is_kind_of(selv, &ractor_selector_data_type)); return (struct ractor_selector *)DATA_PTR(selv); } // Ractor::Selector.new static VALUE ractor_selector_create(VALUE klass) { struct ractor_selector *s; VALUE selv = TypedData_Make_Struct(klass, struct ractor_selector, &ractor_selector_data_type, s); s->ports = st_init_numtable(); // TODO return selv; } // Ractor::Selector#add(r) /* * call-seq: * add(ractor) -> ractor * * Adds _ractor_ to +self+. Raises an exception if _ractor_ is already added. * Returns _ractor_. */ static VALUE ractor_selector_add(VALUE selv, VALUE rpv) { if (!ractor_port_p(rpv)) { rb_raise(rb_eArgError, "Not a Ractor::Port object"); } struct ractor_selector *s = RACTOR_SELECTOR_PTR(selv); const struct ractor_port *rp = RACTOR_PORT_PTR(rpv); if (st_lookup(s->ports, (st_data_t)rpv, NULL)) { rb_raise(rb_eArgError, "already added"); } st_insert(s->ports, (st_data_t)rpv, (st_data_t)rp); return selv; } // Ractor::Selector#remove(r) /* call-seq: * remove(ractor) -> ractor * * Removes _ractor_ from +self+. Raises an exception if _ractor_ is not added. * Returns the removed _ractor_. */ static VALUE ractor_selector_remove(VALUE selv, VALUE rpv) { if (!ractor_port_p(rpv)) { rb_raise(rb_eArgError, "Not a Ractor::Port object"); } struct ractor_selector *s = RACTOR_SELECTOR_PTR(selv); if (!st_lookup(s->ports, (st_data_t)rpv, NULL)) { rb_raise(rb_eArgError, "not added yet"); } st_delete(s->ports, (st_data_t *)&rpv, NULL); return selv; } // Ractor::Selector#clear /* * call-seq: * clear -> self * * Removes all ractors from +self+. Raises +self+. */ static VALUE ractor_selector_clear(VALUE selv) { struct ractor_selector *s = RACTOR_SELECTOR_PTR(selv); st_clear(s->ports); return selv; } /* * call-seq: * empty? -> true or false * * Returns +true+ if no ractor is added. */ static VALUE ractor_selector_empty_p(VALUE selv) { struct ractor_selector *s = RACTOR_SELECTOR_PTR(selv); return s->ports->num_entries == 0 ? Qtrue : Qfalse; } // Ractor::Selector#wait struct ractor_selector_wait_data { rb_ractor_t *cr; rb_execution_context_t *ec; bool found; VALUE v; VALUE rpv; }; static int ractor_selector_wait_i(st_data_t key, st_data_t val, st_data_t data) { struct ractor_selector_wait_data *p = (struct ractor_selector_wait_data *)data; const struct ractor_port *rp = (const struct ractor_port *)val; VALUE v = ractor_try_receive(p->ec, p->cr, rp); if (v != Qundef) { p->found = true; p->v = v; p->rpv = (VALUE)key; return ST_STOP; } else { return ST_CONTINUE; } } static VALUE ractor_selector__wait(rb_execution_context_t *ec, VALUE selector) { rb_ractor_t *cr = rb_ec_ractor_ptr(ec); struct ractor_selector *s = RACTOR_SELECTOR_PTR(selector); struct ractor_selector_wait_data data = { .ec = ec, .cr = cr, .found = false, }; while (1) { st_foreach(s->ports, ractor_selector_wait_i, (st_data_t)&data); if (data.found) { return rb_ary_new_from_args(2, data.rpv, data.v); } ractor_wait_receive(ec, cr); } } /* * call-seq: * wait(receive: false, yield_value: undef, move: false) -> [ractor, value] * * Waits until any ractor in _selector_ can be active. */ static VALUE ractor_selector_wait(VALUE selector) { return ractor_selector__wait(GET_EC(), selector); } static VALUE ractor_selector_new(int argc, VALUE *ractors, VALUE klass) { VALUE selector = ractor_selector_create(klass); for (int i=0; i