summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamuel Williams <[email protected]>2025-06-14 12:32:51 +0900
committerGitHub <[email protected]>2025-06-14 12:32:51 +0900
commit68625a23d6deeb2e4c498d4bccc36d616608e05f (patch)
treece9859b8706304a195692326680bd153997a32cb
parentc45c600e2237affa8ba62ea5b290e29a1045d483 (diff)
Fix blocking operation cancellation. (#13614)
Expose `rb_thread_resolve_unblock_function` internally.
Notes
Notes: Merged-By: ioquatix <[email protected]>
-rw-r--r--internal/thread.h2
-rw-r--r--scheduler.c29
-rw-r--r--thread.c31
3 files changed, 49 insertions, 13 deletions
diff --git a/internal/thread.h b/internal/thread.h
index 00fcbfc560..928126c3b0 100644
--- a/internal/thread.h
+++ b/internal/thread.h
@@ -83,6 +83,8 @@ RUBY_SYMBOL_EXPORT_END
int rb_threadptr_execute_interrupts(struct rb_thread_struct *th, int blocking_timing);
bool rb_thread_mn_schedulable(VALUE thread);
+bool rb_thread_resolve_unblock_function(rb_unblock_function_t **unblock_function, void **data2, struct rb_thread_struct *thread);
+
// interrupt exec
typedef VALUE (rb_interrupt_exec_func_t)(void *data);
diff --git a/scheduler.c b/scheduler.c
index 11faca01d3..83b9681cc3 100644
--- a/scheduler.c
+++ b/scheduler.c
@@ -63,8 +63,10 @@ typedef enum {
struct rb_fiber_scheduler_blocking_operation {
void *(*function)(void *);
void *data;
+
rb_unblock_function_t *unblock_function;
void *data2;
+
int flags;
struct rb_fiber_scheduler_blocking_operation_state *state;
@@ -208,7 +210,10 @@ rb_fiber_scheduler_blocking_operation_execute(rb_fiber_scheduler_blocking_operat
return -1; // Invalid blocking operation
}
- // Atomically check if we can transition from QUEUED to EXECUTING
+ // Resolve sentinel values for unblock_function and data2:
+ rb_thread_resolve_unblock_function(&blocking_operation->unblock_function, &blocking_operation->data2, GET_THREAD());
+
+ // Atomically check if we can transition from QUEUED to EXECUTING
rb_atomic_t expected = RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED;
if (RUBY_ATOMIC_CAS(blocking_operation->status, expected, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING) != expected) {
// Already cancelled or in wrong state
@@ -1124,25 +1129,33 @@ rb_fiber_scheduler_blocking_operation_cancel(rb_fiber_scheduler_blocking_operati
rb_atomic_t current_state = RUBY_ATOMIC_LOAD(blocking_operation->status);
- switch (current_state) {
+ switch (current_state) {
case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_QUEUED:
- // Work hasn't started - just mark as cancelled
+ // Work hasn't started - just mark as cancelled:
if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) == current_state) {
- return 0; // Successfully cancelled before execution
+ // Successfully cancelled before execution:
+ return 0;
}
// Fall through if state changed between load and CAS
case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_EXECUTING:
// Work is running - mark cancelled AND call unblock function
- RUBY_ATOMIC_SET(blocking_operation->status, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED);
- if (blocking_operation->unblock_function) {
+ if (RUBY_ATOMIC_CAS(blocking_operation->status, current_state, RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED) != current_state) {
+ // State changed between load and CAS - operation may have completed:
+ return 0;
+ }
+ // Otherwise, we successfully marked it as cancelled, so we can call the unblock function:
+ rb_unblock_function_t *unblock_function = blocking_operation->unblock_function;
+ if (unblock_function) {
+ RUBY_ASSERT(unblock_function != (rb_unblock_function_t *)-1 && "unblock_function is still sentinel value -1, should have been resolved earlier");
blocking_operation->unblock_function(blocking_operation->data2);
}
- return 1; // Cancelled during execution (unblock function called)
+ // Cancelled during execution (unblock function called):
+ return 1;
case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_COMPLETED:
case RB_FIBER_SCHEDULER_BLOCKING_OPERATION_STATUS_CANCELLED:
- // Already finished or cancelled
+ // Already finished or cancelled:
return 0;
}
diff --git a/thread.c b/thread.c
index 232c677382..41bd6c9ec6 100644
--- a/thread.c
+++ b/thread.c
@@ -1540,6 +1540,29 @@ blocking_region_end(rb_thread_t *th, struct rb_blocking_region_buffer *region)
#endif
}
+/*
+ * Resolve sentinel unblock function values to their actual function pointers
+ * and appropriate data2 values. This centralizes the logic for handling
+ * RUBY_UBF_IO and RUBY_UBF_PROCESS sentinel values.
+ *
+ * @param unblock_function Pointer to unblock function pointer (modified in place)
+ * @param data2 Pointer to data2 pointer (modified in place)
+ * @param thread Thread context for resolving data2 when needed
+ * @return true if sentinel values were resolved, false otherwise
+ */
+bool
+rb_thread_resolve_unblock_function(rb_unblock_function_t **unblock_function, void **data2, struct rb_thread_struct *thread)
+{
+ rb_unblock_function_t *ubf = *unblock_function;
+
+ if ((ubf == RUBY_UBF_IO) || (ubf == RUBY_UBF_PROCESS)) {
+ *unblock_function = ubf_select;
+ *data2 = thread;
+ return true;
+ }
+ return false;
+}
+
void *
rb_nogvl(void *(*func)(void *), void *data1,
rb_unblock_function_t *ubf, void *data2,
@@ -1566,11 +1589,9 @@ rb_nogvl(void *(*func)(void *), void *data1,
bool is_main_thread = vm->ractor.main_thread == th;
int saved_errno = 0;
- if ((ubf == RUBY_UBF_IO) || (ubf == RUBY_UBF_PROCESS)) {
- ubf = ubf_select;
- data2 = th;
- }
- else if (ubf && rb_ractor_living_thread_num(th->ractor) == 1 && is_main_thread) {
+ rb_thread_resolve_unblock_function(&ubf, &data2, th);
+
+ if (ubf && rb_ractor_living_thread_num(th->ractor) == 1 && is_main_thread) {
if (flags & RB_NOGVL_UBF_ASYNC_SAFE) {
vm->ubf_async_safe = 1;
}