Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 52 additions & 2 deletions include/ruby/fiber/scheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,67 @@
*/
#include "ruby/internal/config.h"

#include <errno.h>

#ifdef STDC_HEADERS
#include <stddef.h> /* size_t */
#endif

#include "ruby/ruby.h"
#include "ruby/internal/dllexport.h"
#include "ruby/internal/arithmetic.h"

RBIMPL_SYMBOL_EXPORT_BEGIN()

struct timeval;

/**
* Wrap a `ssize_t` and `int errno` into a single `VALUE`. This interface should
* be used to safely capture results from system calls like `read` and `write`.
*
* You should use `rb_fiber_scheduler_io_result_apply` to unpack the result of
* this value and update `int errno`.
*
* You should not directly try to interpret the result value as it is considered
* an opaque representation. However, the general representation is an integer
* in the range of `[-int errno, size_t size]`. Linux generally restricts the
* result of system calls like `read` and `write` to `<= 2^31` which means this
* will typically fit within a single FIXNUM.
*
* @param[in] result The result of the system call.
* @param[in] error The value of `errno`.
* @return A `VALUE` which contains the result and/or errno.
*/
static inline VALUE
rb_fiber_scheduler_io_result(ssize_t result, int error) {
if (result == -1) {
return RB_INT2NUM(-error);
} else {
return RB_SIZE2NUM(result);
}
}

/**
* Apply an io result to the local thread, returning the value of the orginal
* system call that created it and updating `int errno`.
*
* You should not directly try to interpret the result value as it is considered
* an opaque representation.
*
* @param[in] result The `VALUE` which contains an errno and/or result size.
* @post Updates `int errno` with the value if negative.
* @return The original result of the system call.
*/
static inline ssize_t
rb_fiber_scheduler_io_result_apply(VALUE result) {
if (RB_FIXNUM_P(result) && RB_NUM2INT(result) < 0) {
errno = -RB_NUM2INT(result);
return -1;
} else {
return RB_NUM2SIZE(result);
}
}

/**
* Queries the current scheduler of the current thread that is calling this
* function.
Expand Down Expand Up @@ -195,7 +245,7 @@ VALUE rb_fiber_scheduler_io_wait_writable(VALUE scheduler, VALUE io);
* @param[out] buffer Return buffer.
* @param[in] length Requested number of bytes to read.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_read`.
* @return otherwise What `scheduler.io_read` returns.
* @return otherwise What `scheduler.io_read` returns `[-errno, size]`.
*/
VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t length);

Expand All @@ -207,7 +257,7 @@ VALUE rb_fiber_scheduler_io_read(VALUE scheduler, VALUE io, VALUE buffer, size_t
* @param[in] buffer What to write.
* @param[in] length Number of bytes to write.
* @retval RUBY_Qundef `scheduler` doesn't have `#io_write`.
* @return otherwise What `scheduler.io_write` returns.
* @return otherwise What `scheduler.io_write` returns `[-errno, size]`.
*/
VALUE rb_fiber_scheduler_io_write(VALUE scheduler, VALUE io, VALUE buffer, size_t length);

Expand Down
121 changes: 56 additions & 65 deletions io.c
Original file line number Diff line number Diff line change
Expand Up @@ -1138,14 +1138,14 @@ rb_read_internal(rb_io_t *fptr, void *buf, size_t count)
{
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, fptr->self, buf, count, 1);
VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, fptr->self, buf, count, count);

if (result != Qundef) {
ssize_t length = RB_NUM2SSIZE(result);
ssize_t length = rb_fiber_scheduler_io_result_apply(result);

if (length < 0) rb_sys_fail_path(fptr->pathv);
if (length < 0) rb_sys_fail_path(fptr->pathv);

return length;
return length;
}
}

Expand All @@ -1165,14 +1165,10 @@ rb_write_internal(rb_io_t *fptr, const void *buf, size_t count)
{
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, buf, count, count);
VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, buf, count, 0);

if (result != Qundef) {
ssize_t length = RB_NUM2SSIZE(result);

if (length < 0) rb_sys_fail_path(fptr->pathv);

return length;
return rb_fiber_scheduler_io_result_apply(result);
}
}

Expand All @@ -1182,33 +1178,34 @@ rb_write_internal(rb_io_t *fptr, const void *buf, size_t count)
.capa = count
};

return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fptr->fd);
if (fptr->write_lock && rb_mutex_owned_p(fptr->write_lock))
return (ssize_t)rb_thread_call_without_gvl2(internal_write_func2, &iis, RUBY_UBF_IO, NULL);
else
return (ssize_t)rb_thread_io_blocking_region(internal_write_func, &iis, fptr->fd);
}

#ifdef HAVE_WRITEV
static ssize_t
rb_write_internal2(rb_io_t *fptr, const void *buf, size_t count)
rb_writev_internal(rb_io_t *fptr, const struct iovec *iov, int iovcnt)
{
struct io_internal_write_struct iis = {
.fd = fptr->fd,
.buf = buf,
.capa = count
};
VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
for (int i = 0; i < iovcnt; i += 1) {
VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, iov[i].iov_base, iov[i].iov_len, 0);

return (ssize_t)rb_thread_call_without_gvl2(internal_write_func2, &iis,
RUBY_UBF_IO, NULL);
}
if (result != Qundef) {
return rb_fiber_scheduler_io_result_apply(result);
}
}
}

#ifdef HAVE_WRITEV
static ssize_t
rb_writev_internal(int fd, const struct iovec *iov, int iovcnt)
{
struct io_internal_writev_struct iis = {
.fd = fd,
.fd = fptr->fd,
.iov = iov,
.iovcnt = iovcnt,
};

return (ssize_t)rb_thread_io_blocking_region(internal_writev_func, &iis, fd);
return (ssize_t)rb_thread_io_blocking_region(internal_writev_func, &iis, fptr->fd);
}
#endif

Expand Down Expand Up @@ -1592,7 +1589,7 @@ io_binwrite_string(VALUE arg)
iov[1].iov_base = (char *)p->ptr;
iov[1].iov_len = p->length;

r = rb_writev_internal(fptr->fd, iov, 2);
r = rb_writev_internal(fptr, iov, 2);

if (r < 0)
return r;
Expand Down Expand Up @@ -1654,56 +1651,49 @@ io_binwrite(VALUE str, const char *ptr, long len, rb_io_t *fptr, int nosync)

if ((n = len) <= 0) return n;

VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_write_memory(scheduler, fptr->self, ptr, len, len);

if (result != Qundef) {
ssize_t length = RB_NUM2SSIZE(result);

if (length < 0) rb_sys_fail_path(fptr->pathv);

return length;
}
}

if (fptr->wbuf.ptr == NULL && !(!nosync && (fptr->mode & FMODE_SYNC))) {
fptr->wbuf.off = 0;
fptr->wbuf.len = 0;
fptr->wbuf.capa = IO_WBUF_CAPA_MIN;
fptr->wbuf.ptr = ALLOC_N(char, fptr->wbuf.capa);
fptr->write_lock = rb_mutex_new();
rb_mutex_allow_trap(fptr->write_lock, 1);
rb_mutex_allow_trap(fptr->write_lock, 1);
}

if ((!nosync && (fptr->mode & (FMODE_SYNC|FMODE_TTY))) ||
(fptr->wbuf.ptr && fptr->wbuf.capa <= fptr->wbuf.len + len)) {
struct binwrite_arg arg;
struct binwrite_arg arg;

arg.fptr = fptr;
arg.str = str;
arg.fptr = fptr;
arg.str = str;
retry:
arg.ptr = ptr + offset;
arg.length = n;
if (fptr->write_lock) {
arg.ptr = ptr + offset;
arg.length = n;

if (fptr->write_lock) {
r = rb_mutex_synchronize(fptr->write_lock, io_binwrite_string, (VALUE)&arg);
}
else {
r = io_binwrite_string((VALUE)&arg);
}
/* xxx: other threads may modify given string. */
}
else {
r = io_binwrite_string((VALUE)&arg);
}

/* xxx: other threads may modify given string. */
if (r == n) return len;
if (0 <= r) {
offset += r;
n -= r;
errno = EAGAIN;
}
if (r == -2L)
return -1L;
}

if (r == -2L)
return -1L;
if (rb_io_maybe_wait_writable(errno, fptr->self, Qnil)) {
rb_io_check_closed(fptr);
if (offset < len)
goto retry;

if (offset < len)
goto retry;
}

return -1L;
}

Expand All @@ -1712,8 +1702,10 @@ io_binwrite(VALUE str, const char *ptr, long len, rb_io_t *fptr, int nosync)
MEMMOVE(fptr->wbuf.ptr, fptr->wbuf.ptr+fptr->wbuf.off, char, fptr->wbuf.len);
fptr->wbuf.off = 0;
}

MEMMOVE(fptr->wbuf.ptr+fptr->wbuf.off+fptr->wbuf.len, ptr+offset, char, len);
fptr->wbuf.len += (int)len;

return len;
}

Expand Down Expand Up @@ -1853,7 +1845,7 @@ static VALUE
call_writev_internal(VALUE arg)
{
struct binwritev_arg *p = (struct binwritev_arg *)arg;
return rb_writev_internal(p->fptr->fd, p->iov, p->iovcnt);
return rb_writev_internal(p->fptr, p->iov, p->iovcnt);
}

static long
Expand Down Expand Up @@ -1906,7 +1898,7 @@ io_binwritev(struct iovec *iov, int iovcnt, rb_io_t *fptr)
r = rb_mutex_synchronize(fptr->write_lock, call_writev_internal, (VALUE)&arg);
}
else {
r = rb_writev_internal(fptr->fd, iov, iovcnt);
r = rb_writev_internal(fptr, iov, iovcnt);
}

if (r >= 0) {
Expand Down Expand Up @@ -2330,6 +2322,7 @@ fptr_wait_readable(rb_io_t *fptr)

if (ret)
rb_io_check_closed(fptr);

return ret;
}

Expand Down Expand Up @@ -3063,10 +3056,11 @@ read_internal_call(VALUE arg)

VALUE scheduler = rb_fiber_scheduler_current();
if (scheduler != Qnil) {
VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, iis->fptr->self, iis->buf, iis->capa, 1);
VALUE result = rb_fiber_scheduler_io_read_memory(scheduler, iis->fptr->self, iis->buf, iis->capa, 0);

if (result != Qundef) {
return (VALUE)RB_NUM2SSIZE(result);
// This is actually returned as a pseudo-VALUE and later cast to a long:
return (VALUE)rb_fiber_scheduler_io_result_apply(result);
}
}

Expand Down Expand Up @@ -4761,10 +4755,7 @@ finish_writeconv(rb_io_t *fptr, int noalloc)
res = rb_econv_convert(fptr->writeconv, NULL, NULL, &dp, de, 0);
while (dp-ds) {
retry:
if (fptr->write_lock && rb_mutex_owned_p(fptr->write_lock))
r = rb_write_internal2(fptr, ds, dp-ds);
else
r = rb_write_internal(fptr, ds, dp-ds);
r = rb_write_internal(fptr, ds, dp-ds);
if (r == dp-ds)
break;
if (0 <= r) {
Expand Down
Loading