librelist archives

« back to archive

[PATCH] add kgio_syssend method to wrap send(2)

[PATCH] add kgio_syssend method to wrap send(2)

From:
Eric Wong
Date:
2014-02-04 @ 01:42
This behaves like kgio_trywrite on GNU/Linux, but allows extra flags
to be specified.  The main purpose of this is to support use of the
MSG_MORE flag on GNU/Linux.
---
 ext/kgio/write.c     | 55 ++++++++++++++++++++++++++++++++++++++++++++++++++++
 test/test_syssend.rb | 43 ++++++++++++++++++++++++++++++++++++++++
 2 files changed, 98 insertions(+)
 create mode 100644 test/test_syssend.rb

diff --git a/ext/kgio/write.c b/ext/kgio/write.c
index 53b5b54..d118fd0 100644
--- a/ext/kgio/write.c
+++ b/ext/kgio/write.c
@@ -15,6 +15,7 @@ struct wr_args {
 	const char *ptr;
 	long len;
 	int fd;
+	int flags;
 };
 
 static void prepare_write(struct wr_args *a, VALUE io, VALUE str)
@@ -159,6 +160,56 @@ static VALUE kgio_trysend(VALUE io, VALUE str)
 #  define kgio_trysend kgio_trywrite
 #endif /* ! USE_MSG_DONTWAIT */
 
+#ifdef HAVE_RB_THREAD_IO_BLOCKING_REGION
+#  include "blocking_io_region.h"
+#ifdef MSG_DONTWAIT /* Linux only */
+#  define MY_MSG_DONTWAIT (MSG_DONTWAIT)
+#else
+#  define MY_MSG_DONTWAIT (0)
+#endif
+
+static VALUE nogvl_send(void *ptr)
+{
+	struct wr_args *a = ptr;
+	/* send(2) result is returned as VALUE; the caller casts it back to long */
+	return (VALUE)send(a->fd, a->ptr, a->len, a->flags);
+}
+/*
+ * call-seq:
+ *
+ *	io.kgio_syssend(str, flags) -> nil, String or :wait_writable
+ *
+ * Sends +str+ using send(2), passing the Integer +flags+ through to
+ * the system call (e.g. Socket::MSG_MORE or Socket::MSG_DONTWAIT).
+ *
+ * Returns nil if the write was completed in full, a String containing
+ * the unwritten portion if EAGAIN was encountered after a partial
+ * write, or :wait_writable if EAGAIN is encountered and nothing
+ * was written.
+ *
+ * This method is only available on Ruby 1.9.3 or later.
+ */
+static VALUE kgio_syssend(VALUE io, VALUE str, VALUE flags)
+{
+	struct wr_args a;
+	long n;
+
+	a.flags = NUM2INT(flags);
+	prepare_write(&a, io, str);
+	if (a.flags & MY_MSG_DONTWAIT) {
+		do {
+			n = (long)send(a.fd, a.ptr, a.len, a.flags);
+		} while (write_check(&a, n, "send", 0) != 0);
+	} else {
+		do {
+			n = (long)rb_thread_io_blocking_region(
+						nogvl_send, &a, a.fd);
+		} while (write_check(&a, n, "send", 0) != 0);
+	}
+	return a.buf;
+}
+#endif /* HAVE_RB_THREAD_IO_BLOCKING_REGION */
+
 /*
  * call-seq:
  *
@@ -209,4 +260,8 @@ void init_kgio_write(void)
 	mSocketMethods = rb_define_module_under(mKgio, "SocketMethods");
 	rb_define_method(mSocketMethods, "kgio_write", kgio_send, 1);
 	rb_define_method(mSocketMethods, "kgio_trywrite", kgio_trysend, 1);
+
+#if defined(USE_MSG_DONTWAIT) && defined(HAVE_RB_THREAD_IO_BLOCKING_REGION)
+	rb_define_method(mSocketMethods, "kgio_syssend", kgio_syssend, 2);
+#endif /* kgio_syssend is only compiled with the blocking-region API */
 }
diff --git a/test/test_syssend.rb b/test/test_syssend.rb
new file mode 100644
index 0000000..5089ce3
--- /dev/null
+++ b/test/test_syssend.rb
@@ -0,0 +1,43 @@
+require 'test/unit'
+require 'kgio'
+
+class TestKgioSyssend < Test::Unit::TestCase
+  def setup
+    @host = ENV["TEST_HOST"] || '127.0.0.1'
+  end
+
+  def test_syssend
+    srv = Kgio::TCPServer.new(@host, 0)
+    port = srv.addr[1]
+    client = TCPSocket.new(@host, port)
+    acc = srv.kgio_accept
+    th = Thread.new { client.readpartial(4) }
+    sleep(0.05)
+    assert_nil acc.kgio_syssend("HI", Socket::MSG_DONTWAIT | Socket::MSG_MORE)
+    assert_nil acc.kgio_syssend("HI", Socket::MSG_DONTWAIT)
+    assert_equal "HIHI", th.value
+
+    buf = "*" * 123
+    res = []
+    case rv = acc.kgio_syssend(buf, Socket::MSG_DONTWAIT)
+    when nil
+    when String
+      res << rv
+    when Symbol
+      res << rv
+      break
+    end while true
+    assert_equal :wait_writable, res.last
+    if res.size > 1
+      assert_kind_of String, res[-2]
+    else
+      warn "res too small"
+    end
+
+    # blocking: with flags=0 and a full socket buffer, the sender must stall
+    th = Thread.new { loop { acc.kgio_syssend("ZZZZ", 0) } }
+    assert_nil th.join(0.1)
+  ensure
+    [ srv, acc, client ].each { |io| io.close if io }
+  end
+end if RUBY_PLATFORM =~ /linux/
-- 
Eric Wong