diff options
-rw-r--r-- | scheduler.c | 34 | ||||
-rw-r--r-- | thread.c | 35 |
2 files changed, 55 insertions, 14 deletions
diff --git a/scheduler.c b/scheduler.c index 4267cb094f..9f68feef9d 100644 --- a/scheduler.c +++ b/scheduler.c @@ -170,6 +170,10 @@ verify_interface(VALUE scheduler) if (!rb_respond_to(scheduler, id_io_wait)) { rb_raise(rb_eArgError, "Scheduler must implement #io_wait"); } + + if (!rb_respond_to(scheduler, id_fiber_interrupt)) { + rb_warn("Scheduler should implement #fiber_interrupt"); + } } static VALUE @@ -458,7 +462,11 @@ rb_fiber_scheduler_io_wait(VALUE scheduler, VALUE io, VALUE events, VALUE timeou scheduler, io, events, timeout }; - return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments); + if (rb_respond_to(scheduler, id_fiber_interrupt)) { + return rb_thread_io_blocking_operation(io, fiber_scheduler_io_wait, (VALUE)&arguments); + } else { + return fiber_scheduler_io_wait((VALUE)&arguments); + } } VALUE @@ -546,7 +554,11 @@ rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t lengt scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset) }; - return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments); + if (rb_respond_to(scheduler, id_fiber_interrupt)) { + return rb_thread_io_blocking_operation(io, fiber_scheduler_io_read, (VALUE)&arguments); + } else { + return fiber_scheduler_io_read((VALUE)&arguments); + } } /* @@ -581,7 +593,11 @@ rb_fiber_scheduler_io_pread(VALUE scheduler, VALUE io, rb_off_t from, VALUE buff scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset) }; - return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments); + if (rb_respond_to(scheduler, id_fiber_interrupt)) { + return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pread, (VALUE)&arguments); + } else { + return fiber_scheduler_io_pread((VALUE)&arguments); + } } /* @@ -630,7 +646,11 @@ rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t leng scheduler, io, buffer, SIZET2NUM(length), SIZET2NUM(offset) }; - return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments); + if (rb_respond_to(scheduler, id_fiber_interrupt)) { + return rb_thread_io_blocking_operation(io, fiber_scheduler_io_write, (VALUE)&arguments); + } else { + return fiber_scheduler_io_write((VALUE)&arguments); + } } /* @@ -666,7 +686,11 @@ rb_fiber_scheduler_io_pwrite(VALUE scheduler, VALUE io, rb_off_t from, VALUE buf scheduler, io, buffer, OFFT2NUM(from), SIZET2NUM(length), SIZET2NUM(offset) }; - return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments); + if (rb_respond_to(scheduler, id_fiber_interrupt)) { + return rb_thread_io_blocking_operation(io, fiber_scheduler_io_pwrite, (VALUE)&arguments); + } else { + return fiber_scheduler_io_pwrite((VALUE)&arguments); + } } VALUE @@ -1721,6 +1721,12 @@ rb_io_blocking_operation_enter(struct rb_io *io, struct rb_io_blocking_operation ccan_list_add(rb_io_blocking_operations(io), &blocking_operation->list); } +static void +rb_io_blocking_operation_pop(struct rb_io *io, struct rb_io_blocking_operation *blocking_operation) +{ + ccan_list_del(&blocking_operation->list); +} + struct io_blocking_operation_arguments { struct rb_io *io; struct rb_io_blocking_operation *blocking_operation; @@ -1732,7 +1738,7 @@ io_blocking_operation_exit(VALUE _arguments) struct io_blocking_operation_arguments *arguments = (void*)_arguments; struct rb_io_blocking_operation *blocking_operation = arguments->blocking_operation; - ccan_list_del(&blocking_operation->list); + rb_io_blocking_operation_pop(arguments->io, blocking_operation); rb_io_t *io = arguments->io; rb_thread_t *thread = io->closing_ec->thread_ptr; @@ -1763,6 +1769,9 @@ rb_io_blocking_operation_exit(struct rb_io *io, struct rb_io_blocking_operation { VALUE wakeup_mutex = io->wakeup_mutex; + // Indicate that the blocking operation is no longer active: + blocking_operation->ec = NULL; + if (RB_TEST(wakeup_mutex)) { struct io_blocking_operation_arguments arguments = { .io = io, @@ -1772,7 +1781,8 @@ rb_io_blocking_operation_exit(struct rb_io *io, struct rb_io_blocking_operation rb_mutex_synchronize(wakeup_mutex, io_blocking_operation_exit, (VALUE)&arguments); } else { - ccan_list_del(&blocking_operation->list); + // If there's no wakeup_mutex, we can safely remove the operation directly: + rb_io_blocking_operation_pop(io, blocking_operation); } } @@ -1809,7 +1819,7 @@ rb_thread_io_blocking_operation(VALUE self, VALUE(*function)(VALUE), VALUE argum struct rb_io_blocking_operation blocking_operation = { .ec = ec, }; - ccan_list_add(&io->blocking_operations, &blocking_operation.list); + rb_io_blocking_operation_enter(io, &blocking_operation); struct io_blocking_operation_arguments io_blocking_operation_arguments = { .io = io, @@ -2765,13 +2775,20 @@ thread_io_close_notify_all(VALUE _io) ccan_list_for_each(rb_io_blocking_operations(io), blocking_operation, list) { rb_execution_context_t *ec = blocking_operation->ec; - rb_thread_t *thread = ec->thread_ptr; + // If the operation is in progress, we need to interrupt it: + if (ec) { + rb_thread_t *thread = ec->thread_ptr; - if (thread->scheduler != Qnil) { - rb_fiber_scheduler_fiber_interrupt(thread->scheduler, rb_fiberptr_self(ec->fiber_ptr), error); - } else { - rb_threadptr_pending_interrupt_enque(thread, error); - rb_threadptr_interrupt(thread); + VALUE result = RUBY_Qundef; + if (thread->scheduler != Qnil) { + result = rb_fiber_scheduler_fiber_interrupt(thread->scheduler, rb_fiberptr_self(ec->fiber_ptr), error); + } + + if (result == RUBY_Qundef) { + // If the thread is not the current thread, we need to enqueue an error: + rb_threadptr_pending_interrupt_enque(thread, error); + rb_threadptr_interrupt(thread); + } } count += 1; |