librelist archives

« back to archive

[PATCH 1/2] inotify: use thread-local internal buffer

[PATCH 1/2] inotify: use thread-local internal buffer

From:
Eric Wong
Date:
2013-04-12 @ 22:25
This gives us thread-safety for the internal buffer.  While
we're at it, cache-align this buffer to avoid unnecessary
overhead when read() writes to it.
---
 ext/sleepy_penguin/epoll.c          | 17 +------
 ext/sleepy_penguin/init.c           | 19 ++++++++
 ext/sleepy_penguin/inotify.c        | 90 +++++++++++++++++++++----------------
 ext/sleepy_penguin/sleepy_penguin.h |  1 +
 test/test_inotify.rb                | 13 ------
 5 files changed, 73 insertions(+), 67 deletions(-)

diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c
index ed45fbd..7ce5dcb 100644
--- a/ext/sleepy_penguin/epoll.c
+++ b/ext/sleepy_penguin/epoll.c
@@ -5,11 +5,9 @@
 #include "missing_epoll.h"
 #include "missing_rb_thread_fd_close.h"
 #include "missing_rb_update_max_fd.h"
-#define L1_CACHE_LINE_MAX 128 /* largest I've seen (Pentium 4) */
 
 static ID id_for_fd;
 static VALUE cEpoll;
-static size_t l1_cache_line_size;
 
 static uint64_t now_ms(void)
 {
@@ -74,7 +72,7 @@ static struct ep_per_thread *ept_get(VALUE self, int maxevents)
 	       sizeof(struct epoll_event) * maxevents;
 
 	free(ept); /* free(NULL) is POSIX and works on glibc */
-	err = posix_memalign(&ptr, l1_cache_line_size, size);
+	err = posix_memalign(&ptr, rb_sp_l1_cache_line_size, size);
 	if (err) {
 		errno = err;
 		rb_memerror();
@@ -234,23 +232,10 @@ static VALUE event_flags(VALUE self, VALUE flags)
 	return UINT2NUM(rb_sp_get_uflags(self, flags));
 }
 
-static size_t l1_cache_line_size_detect(void)
-{
-#ifdef _SC_LEVEL1_DCACHE_LINESIZE
-	long tmp = sysconf(_SC_LEVEL1_DCACHE_LINESIZE);
-
-	if (tmp > 0 && tmp <= L1_CACHE_LINE_MAX)
-		return (size_t)tmp;
-#endif /* _SC_LEVEL1_DCACHE_LINESIZE */
-	return L1_CACHE_LINE_MAX;
-}
-
 void sleepy_penguin_init_epoll(void)
 {
 	VALUE mSleepyPenguin, cEpoll_IO;
 
-	l1_cache_line_size = l1_cache_line_size_detect();
-
 	/*
 	 * Document-module: SleepyPenguin
 	 *
diff --git a/ext/sleepy_penguin/init.c b/ext/sleepy_penguin/init.c
index eea0025..eb332bf 100644
--- a/ext/sleepy_penguin/init.c
+++ b/ext/sleepy_penguin/init.c
@@ -1,3 +1,9 @@
+#define _GNU_SOURCE
+#include <unistd.h>
+#include <sys/types.h>
+#define L1_CACHE_LINE_MAX 128 /* largest I've seen (Pentium 4) */
+size_t rb_sp_l1_cache_line_size;
+
 void sleepy_penguin_init_epoll(void);
 
 #ifdef HAVE_SYS_TIMERFD_H
@@ -24,8 +30,21 @@ void sleepy_penguin_init_signalfd(void);
 #  define sleepy_penguin_init_signalfd() for(;0;)
 #endif
 
+static size_t l1_cache_line_size_detect(void)
+{
+	size_t line_size = L1_CACHE_LINE_MAX; /* fallback when undetectable */
+#ifdef _SC_LEVEL1_DCACHE_LINESIZE
+	long detected = sysconf(_SC_LEVEL1_DCACHE_LINESIZE);
+	if (detected > 0 && detected <= L1_CACHE_LINE_MAX)
+		line_size = (size_t)detected; /* accept only values in range */
+#endif /* _SC_LEVEL1_DCACHE_LINESIZE */
+	return line_size;
+}
+
 void Init_sleepy_penguin_ext(void)
 {
+	rb_sp_l1_cache_line_size = l1_cache_line_size_detect();
+
 	sleepy_penguin_init_epoll();
 	sleepy_penguin_init_timerfd();
 	sleepy_penguin_init_eventfd();
diff --git a/ext/sleepy_penguin/inotify.c b/ext/sleepy_penguin/inotify.c
index 4c606a4..344145c 100644
--- a/ext/sleepy_penguin/inotify.c
+++ b/ext/sleepy_penguin/inotify.c
@@ -4,7 +4,12 @@
 #include <sys/ioctl.h>
 #include "missing_inotify.h"
 
-static ID id_inotify_buf, id_inotify_tmp, id_mask;
+struct inbuf {
+	size_t capa; /* size in bytes last allocated for ptr (0 if none) */
+	void *ptr; /* buffer that read() on the inotify fd fills */
+};
+
+static ID id_inotify_tmp, id_mask;
 static VALUE cEvent, checks;
 
 /*
@@ -36,7 +41,6 @@ static VALUE s_new(int argc, VALUE *argv, VALUE klass)
 
 	rv = INT2FIX(fd);
 	rv = rb_call_super(1, &rv);
-	rb_ivar_set(rv, id_inotify_buf, rb_str_new(0, 128));
 	rb_ivar_set(rv, id_inotify_tmp, rb_ary_new());
 
 	return rv;
@@ -133,15 +137,50 @@ static VALUE event_new(struct inotify_event *e)
 
 struct inread_args {
 	int fd;
-	struct inotify_event *ptr;
-	long len;
+	struct inbuf *inbuf;
 };
 
 static VALUE inread(void *ptr)
 {
 	struct inread_args *args = ptr;
 
-	return (VALUE)read(args->fd, args->ptr, args->len);
+	return (VALUE)read(args->fd, args->inbuf->ptr, args->inbuf->capa);
+}
+
+/* Ensure inbuf holds at least size bytes; old contents are discarded. */
+static void inbuf_grow(struct inbuf *inbuf, size_t size)
+{
+	void *ptr; /* allocate first so a failure never leaves a freed ptr */
+
+	if (inbuf->capa >= size)
+		return;
+	errno = posix_memalign(&ptr, rb_sp_l1_cache_line_size, size);
+	if (errno)
+		rb_memerror();
+	free(inbuf->ptr);
+	inbuf->ptr = ptr;
+	inbuf->capa = size;
+}
+
+/* Grow the read buffer to the pending byte count reported by FIONREAD. */
+static void resize_internal_buffer(struct inread_args *args)
+{
+	int newlen;
+
+	if (args->inbuf->capa > 0x10000)
+		rb_raise(rb_eRuntimeError, "path too long");
+
+	if (ioctl(args->fd, FIONREAD, &newlen) != 0)
+		rb_sys_fail("ioctl(inotify,FIONREAD)");
+
+	if (newlen == 0) /* race: some other thread grabbed the data */
+		return;
+	if (newlen < 0) /* raise only for bogus lengths, never after growing */
+		rb_raise(rb_eRuntimeError,
+			"ioctl(inotify,FIONREAD) returned negative length: %d",
+			newlen);
+
+	inbuf_grow(args->inbuf, (size_t)newlen);
 }
 
 /*
@@ -153,8 +192,9 @@ static VALUE inread(void *ptr)
  */
 static VALUE take(int argc, VALUE *argv, VALUE self)
 {
+	static __thread struct inbuf inbuf;
+
 	struct inread_args args;
-	VALUE buf;
 	VALUE tmp = rb_ivar_get(self, id_inotify_tmp);
 	struct inotify_event *e, *end;
 	ssize_t r;
@@ -166,10 +206,9 @@ static VALUE take(int argc, VALUE *argv, VALUE self)
 
 	rb_scan_args(argc, argv, "01", &nonblock);
 
+	inbuf_grow(&inbuf, 128);
 	args.fd = rb_sp_fileno(self);
-	buf = rb_ivar_get(self, id_inotify_buf);
-	args.len = RSTRING_LEN(buf);
-	args.ptr = (struct inotify_event *)RSTRING_PTR(buf);
+	args.inbuf = &inbuf;
 
 	if (RTEST(nonblock))
 		rb_sp_set_nonblock(args.fd);
@@ -181,15 +220,7 @@ static VALUE take(int argc, VALUE *argv, VALUE self)
 		    ||
 		    (r < 0 && errno == EINVAL) /* Linux >= 2.6.21 */
 		   ) {
-			/* resize internal buffer */
-			int newlen;
-			if (args.len > 0x10000)
-				rb_raise(rb_eRuntimeError, "path too long");
-			if (ioctl(args.fd, FIONREAD, &newlen) != 0)
-				rb_sys_fail("ioctl(inotify,FIONREAD)");
-			rb_str_resize(buf, newlen);
-			args.ptr = (struct inotify_event *)RSTRING_PTR(buf);
-			args.len = newlen;
+			resize_internal_buffer(&args);
 		} else if (r < 0) {
 			if (errno == EAGAIN && RTEST(nonblock))
 				return Qnil;
@@ -197,8 +228,9 @@ static VALUE take(int argc, VALUE *argv, VALUE self)
 				rb_sys_fail("read(inotify)");
 		} else {
 			/* buffer in userspace to minimize read() calls */
-			end = (struct inotify_event *)((char *)args.ptr + r);
-			for (e = args.ptr; e < end; ) {
+			end = (struct inotify_event *)
+					((char *)args.inbuf->ptr + r);
+			for (e = args.inbuf->ptr; e < end; ) {
 				VALUE event = event_new(e);
 				if (NIL_P(rv))
 					rv = event;
@@ -241,22 +273,6 @@ static VALUE events(VALUE self)
 
 /*
  * call-seq:
- *	inotify.dup	-> another Inotify object
- *
- * Duplicates an Inotify object, allowing it to be used in a blocking
- * fashion in another thread.  Ensures duplicated Inotify objects do
- * not share read buffers, but do share the userspace Array buffer.
- */
-static VALUE init_copy(VALUE dest, VALUE orig)
-{
-	rb_call_super(1, &orig); /* copy all other ivars as-is */
-	rb_ivar_set(dest, id_inotify_buf, rb_str_new(0, 128));
-
-	return dest;
-}
-
-/*
- * call-seq:
  *	ino.each { |event| ... } -> ino
  *
  * Yields each Inotify::Event received in a blocking fashion.
@@ -300,7 +316,6 @@ void sleepy_penguin_init_inotify(void)
 	cInotify = rb_define_class_under(mSleepyPenguin, "Inotify", rb_cIO);
 	rb_define_method(cInotify, "add_watch", add_watch, 2);
 	rb_define_method(cInotify, "rm_watch", rm_watch, 1);
-	rb_define_method(cInotify, "initialize_copy", init_copy, 1);
 	rb_define_method(cInotify, "take", take, -1);
 	rb_define_method(cInotify, "each", each, 0);
 
@@ -330,7 +345,6 @@ void sleepy_penguin_init_inotify(void)
 	cEvent = rb_define_class_under(cInotify, "Event", cEvent);
 	rb_define_method(cEvent, "events", events, 0);
 	rb_define_singleton_method(cInotify, "new", s_new, -1);
-	id_inotify_buf = rb_intern("@inotify_buf");
 	id_inotify_tmp = rb_intern("@inotify_tmp");
 	id_mask = rb_intern("mask");
 	checks = rb_ary_new();
diff --git a/ext/sleepy_penguin/sleepy_penguin.h b/ext/sleepy_penguin/sleepy_penguin.h
index bd4a4ca..599b319 100644
--- a/ext/sleepy_penguin/sleepy_penguin.h
+++ b/ext/sleepy_penguin/sleepy_penguin.h
@@ -12,6 +12,7 @@
 #include <assert.h>
 #include <unistd.h>
 
+extern size_t rb_sp_l1_cache_line_size;
 unsigned rb_sp_get_uflags(VALUE klass, VALUE flags);
 int rb_sp_get_flags(VALUE klass, VALUE flags);
 int rb_sp_io_closed(VALUE io);
diff --git a/test/test_inotify.rb b/test/test_inotify.rb
index dd2c7ad..b50a83b 100644
--- a/test/test_inotify.rb
+++ b/test/test_inotify.rb
@@ -30,19 +30,6 @@ def test_constants
     end
   end
 
-  def test_dup
-    a = Inotify.new
-    b = a.dup
-    assert a.fileno != b.fileno
-    abuf = a.instance_variable_get(:@inotify_buf)
-    bbuf = b.instance_variable_get(:@inotify_buf)
-    assert abuf.object_id != bbuf.object_id, "#{a.inspect} #{b.inspect}"
-
-    atmp = a.instance_variable_get(:@inotify_tmp)
-    btmp = b.instance_variable_get(:@inotify_tmp)
-    assert_equal atmp.object_id, btmp.object_id
-  end
-
   def test_new_nonblock
     ino = Inotify.new Inotify::NONBLOCK
     flags = ino.fcntl(Fcntl::F_GETFL) & Fcntl::O_NONBLOCK
-- 
1.8.2.1.366.ge2af9e3

[PATCH 2/2] inotify: thread-safe Inotify#take for rbx

From:
Eric Wong
Date:
2013-04-12 @ 22:25
Rubinius provides a Rubinius.synchronize helper for locking
objects which do not otherwise have locks.  We need to
synchronize Inotify#take access to prevent the internal array
from being clobbered.

This avoids unnecessary locking overhead on MRI which maintains
a GVL.
---
 lib/sleepy_penguin.rb | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/lib/sleepy_penguin.rb b/lib/sleepy_penguin.rb
index c13eb0c..1888e3b 100644
--- a/lib/sleepy_penguin.rb
+++ b/lib/sleepy_penguin.rb
@@ -6,3 +6,18 @@ module SleepyPenguin
 end
 require 'sleepy_penguin_ext'
 require 'sleepy_penguin/epoll'
+
+# :stopdoc:
+#
+# We need to serialize Inotify#take for Rubinius since that has no GVL
+# to protect the internal array
+if defined?(SleepyPenguin::Inotify) &&
+   defined?(Rubinius) && Rubinius.respond_to?(:synchronize)
+  class SleepyPenguin::Inotify
+    alias __take take # keep the original take reachable before undef
+    undef_method :take
+    def take(*args)
+      Rubinius.synchronize(@inotify_tmp) { __take(*args) } # lock on the array
+    end
+  end
+end
-- 
1.8.2.1.366.ge2af9e3