librelist archives

« back to archive

[PATCH] epoll: use per-thread data structure for concurrent Epoll#wait

[PATCH] epoll: use per-thread data structure for concurrent Epoll#wait

From:
Eric Wong
Date:
2012-03-22 @ 08:57
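With this change each thread calling Epoll#wait gets its own event
buffer, so multiple threads can park on Epoll#wait at the same time
(without holding the GVL).  A single one-shot notification wakes up a
single thread; a notification on a different IO object wakes up another
thread.

The event buffer no longer belongs to the Epoll object and no longer
grows automatically: Epoll#wait called without maxevents now retrieves
at most 64 events per call.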

This enables the same multi-threaded, EPOLLONESHOT-based design that
cmogstored uses:

  http://bogomips.org/cmogstored/queues.txt
---

  Pushed to git://bogomips.org/sleepy_penguin.git

 ext/sleepy_penguin/epoll.c       |  139 +++++++++++++++++++++++++-------------
 ext/sleepy_penguin/epoll_green.h |   39 +++++------
 test/test_epoll.rb               |   48 ++++++++++++-
 3 files changed, 160 insertions(+), 66 deletions(-)

diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c
index 31c72e6..fa8edf0 100644
--- a/ext/sleepy_penguin/epoll.c
+++ b/ext/sleepy_penguin/epoll.c
@@ -12,6 +12,7 @@
 #include "missing_rb_update_max_fd.h"
 #define EP_RECREATE (-2)
 
+static pthread_key_t epoll_key; /* per-thread struct ep_per_thread (see ept_get) */
 static st_table *active;
 static const int step = 64; /* unlikely to grow unless you're huge */
 static VALUE cEpoll_IO;
@@ -36,18 +37,97 @@ static VALUE unpack_event_data(struct epoll_event *event)
 	return (VALUE)event->data.ptr;
 }
 
+/*
+ * C99 (and GCC >= 3) get a flexible array member, GCC 2.x a zero-length
+ * array, and any other compiler a one-element array, which merely makes
+ * ept_get() over-allocate by one event instead of failing to compile.
+ */
+#if defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L)
+# define FLEX_ARRAY
+#elif defined(__GNUC__)
+# if (__GNUC__ >= 3)
+#  define FLEX_ARRAY
+# else
+#  define FLEX_ARRAY 0
+# endif
+#else
+# define FLEX_ARRAY 1
+#endif
+
 struct rb_epoll {
 	int fd;
-	int timeout;
-	int maxevents;
-	int capa;
-	struct epoll_event *events;
 	VALUE io;
 	VALUE marks;
 	VALUE flag_cache;
 	int flags;
 };
 
+/* per-thread Epoll#wait state, stored under epoll_key by ept_get() */
+struct ep_per_thread {
+	struct rb_epoll *ep;
+	int timeout;
+	int maxevents;
+	int capa; /* slots allocated in events[] */
+	struct epoll_event events[FLEX_ARRAY];
+};
+
+/*
+ * Returns the calling thread's ep_per_thread, (re)allocated so that
+ * events[] holds at least +maxevents+ entries.  The buffer is kept in
+ * thread-specific data under epoll_key, whose destructor (free) releases
+ * it when the thread exits.
+ *
+ * NOTE(review): epwait_result() yields to Ruby while walking ept->events;
+ * a nested Epoll#wait from that block on the same thread reuses (and may
+ * free) this buffer underneath the outer call.  Confirm nesting is
+ * unsupported, or give nested calls a buffer of their own.
+ */
+static struct ep_per_thread *ept_get(int maxevents)
+{
+	struct ep_per_thread *ept = pthread_getspecific(epoll_key);
+	struct ep_per_thread *fresh;
+	/*
+	 * epoll_wait() fails with EINVAL for maxevents <= 0; keep passing the
+	 * caller's value through so that error still surfaces, but never size
+	 * the buffer from it (a negative int would wrap the size_t math below).
+	 */
+	int capa = maxevents > 0 ? maxevents : 1;
+	int err;
+	size_t size;
+
+	if (ept && ept->capa >= capa)
+		goto out;
+
+	/* refuse sizes that would overflow size_t (possible on 32-bit) */
+	if ((size_t)capa > ((size_t)-1 - sizeof(struct ep_per_thread)) /
+	                   sizeof(struct epoll_event))
+		rb_memerror();
+	size = sizeof(struct ep_per_thread) +
+	       sizeof(struct epoll_event) * (size_t)capa;
+
+	/*
+	 * Allocate and register the new buffer before releasing the old one,
+	 * so a failure never leaves epoll_key pointing at freed memory (which
+	 * the next call would reuse and the key destructor would free again).
+	 */
+	fresh = malloc(size);
+	if (fresh == NULL)
+		rb_memerror();
+	err = pthread_setspecific(epoll_key, fresh);
+	if (err != 0) {
+		free(fresh);
+		errno = err;
+		rb_sys_fail("pthread_setspecific");
+	}
+	free(ept); /* free(NULL) is a no-op */
+	ept = fresh;
+	ept->capa = capa;
+out:
+	ept->maxevents = maxevents;
+
+	return ept;
+}
+
 static struct rb_epoll *ep_get(VALUE self)
 {
 	struct rb_epoll *ep;
@@ -70,7 +113,6 @@ static void gcfree(void *ptr)
 {
 	struct rb_epoll *ep = ptr;
 
-	xfree(ep->events);
 	if (ep->fd >= 0) {
 		st_data_t key = ep->fd;
 		st_delete(active, &key, NULL);
@@ -95,9 +137,7 @@ static VALUE alloc(VALUE klass)
 	ep->io = Qnil;
 	ep->marks = Qnil;
 	ep->flag_cache = Qnil;
-	ep->capa = step;
 	ep->flags = 0;
-	ep->events = xmalloc(sizeof(struct epoll_event) * ep->capa);
 
 	return self;
 }
@@ -296,10 +336,10 @@ out:
 	return io;
 }
 
-static VALUE epwait_result(struct rb_epoll *ep, int n)
+static VALUE epwait_result(struct ep_per_thread *ept, int n)
 {
 	int i;
-	struct epoll_event *epoll_event = ep->events;
+	struct epoll_event *epoll_event = ept->events;
 	VALUE obj_events, obj;
 
 	if (n == -1)
@@ -311,50 +351,44 @@ static VALUE epwait_result(struct rb_epoll *ep, int n)
 		rb_yield_values(2, obj_events, obj);
 	}
 
-	/* grow our event buffer for the next epoll_wait call */
-	if (n == ep->capa) {
-		xfree(ep->events);
-		ep->capa += step;
-		ep->events = xmalloc(sizeof(struct epoll_event) * ep->capa);
-	}
-
 	return INT2NUM(n);
 }
 
-static int epoll_resume_p(uint64_t expire_at, struct rb_epoll *ep)
+static int epoll_resume_p(uint64_t expire_at, struct ep_per_thread *ept) /* nonzero: retry the wait */
 {
 	uint64_t now;
 
-	ep_fd_check(ep);
+	ep_fd_check(ept->ep);
 
 	if (errno != EINTR)
 		return 0;
-	if (ep->timeout < 0)
+	if (ept->timeout < 0) /* infinite wait: retry with the same timeout */
 		return 1;
 	now = now_ms();
-	ep->timeout = now > expire_at ? 0 : (int)(expire_at - now);
+	ept->timeout = now > expire_at ? 0 : (int)(expire_at - now); /* time left, clamped at 0 */
 	return 1;
 }
 
 #if defined(HAVE_RB_THREAD_BLOCKING_REGION)
 static VALUE nogvl_wait(void *args)
 {
-	struct rb_epoll *ep = args;
-	int n = epoll_wait(ep->fd, ep->events, ep->maxevents, ep->timeout);
+	struct ep_per_thread *ept = args; /* passed by real_epwait() via rb_sp_fd_region() */
+	int fd = ept->ep->fd;
+	int n = epoll_wait(fd, ept->events, ept->maxevents, ept->timeout);
 
 	return (VALUE)n;
 }
 
-static VALUE real_epwait(struct rb_epoll *ep)
+static VALUE real_epwait(struct ep_per_thread *ept)
 {
 	int n;
-	uint64_t expire_at = ep->timeout > 0 ? now_ms() + ep->timeout : 0;
+	uint64_t expire_at = ept->timeout > 0 ? now_ms() + ept->timeout : 0; /* now_ms() deadline; 0 when timeout <= 0 */
 
-	do
-		n = (int)rb_sp_fd_region(nogvl_wait, ep, ep->fd);
-	while (n == -1 && epoll_resume_p(expire_at, ep));
+	do {
+		n = (int)rb_sp_fd_region(nogvl_wait, ept, ept->ep->fd);
+	} while (n == -1 && epoll_resume_p(expire_at, ept)); /* retried only on EINTR */
 
-	return epwait_result(ep, n);
+	return epwait_result(ept, n);
 }
 #else /* 1.8 Green thread compatible code */
 #  include "epoll_green.h"
@@ -374,20 +408,25 @@ static VALUE epwait(int argc, VALUE *argv, VALUE self)
 {
 	VALUE timeout, maxevents;
 	struct rb_epoll *ep = ep_get(self);
+	struct ep_per_thread *ept;
+	int nr_events, timeout_ms;
 
 	ep_check(ep);
 	rb_need_block();
 	rb_scan_args(argc, argv, "02", &maxevents, &timeout);
-	ep->timeout = NIL_P(timeout) ? -1 : NUM2INT(timeout);
-	ep->maxevents = NIL_P(maxevents) ? ep->capa : NUM2INT(maxevents);
+
+	/*
+	 * Convert both arguments before fetching the per-thread buffer:
+	 * NUM2INT() may call back into Ruby (#to_int), and an Epoll#wait
+	 * reached from there on this thread could reallocate the buffer.
+	 */
+	nr_events = NIL_P(maxevents) ? step : NUM2INT(maxevents);
+	timeout_ms = NIL_P(timeout) ? -1 : NUM2INT(timeout);
+	ept = ept_get(nr_events);
+	ept->timeout = timeout_ms;
+	ept->ep = ep;
 
-	if (ep->maxevents > ep->capa) {
-		xfree(ep->events);
-		ep->capa = ep->maxevents;
-		ep->events = xmalloc(sizeof(struct epoll_event) * ep->capa);
-	}
-
-	return real_epwait(ep);
+	return real_epwait(ept);
 }
 
 /*
@@ -526,8 +556,7 @@ static VALUE init_copy(VALUE copy, VALUE orig)
 	struct rb_epoll *a = ep_get(orig);
 	struct rb_epoll *b = ep_get(copy);
 
-	assert(a->events && b->events && a->events != b->events &&
-	       NIL_P(b->io) && "Ruby broken?");
+	assert(NIL_P(b->io) && "Ruby broken?");
 
 	ep_check(a);
 	assert(NIL_P(b->marks) && "mark array not nil");
@@ -632,9 +661,52 @@ static void atfork_child(void)
 	st_free_table(old);
 }
 
+/*
+ * Error recorded by epoll_once() for the caller to raise.  Unwinding
+ * (longjmp) out of a pthread_once() init routine is not covered by POSIX
+ * and may leave the once-control stuck "in progress", so nothing in there
+ * raises.  A nonzero epoll_once_err with a NULL epoll_once_what means
+ * pthread_atfork() kept failing and is reported as out-of-memory.
+ */
+static int epoll_once_err;
+static const char *epoll_once_what;
+
+static void epoll_once(void)
+{
+	int err = pthread_key_create(&epoll_key, free);
+
+	if (err) {
+		epoll_once_err = err;
+		epoll_once_what = "pthread_key_create";
+		return;
+	}
+
+	active = st_init_numtable();
+
+	if (pthread_atfork(NULL, NULL, atfork_child) != 0) {
+		rb_gc();
+		if (pthread_atfork(NULL, NULL, atfork_child) != 0)
+			epoll_once_err = ENOMEM;
+	}
+}
+
 void sleepy_penguin_init_epoll(void)
 {
 	VALUE mSleepyPenguin, cEpoll;
+	/* must be static: POSIX leaves an automatic once-control undefined */
+	static pthread_once_t once = PTHREAD_ONCE_INIT;
+	int err = pthread_once(&once, epoll_once);
+
+	if (err) {
+		errno = err;
+		rb_sys_fail("pthread_once(.., epoll_once)");
+	}
+	if (epoll_once_err) {
+		if (epoll_once_what == NULL)
+			rb_memerror();
+		errno = epoll_once_err;
+		rb_sys_fail(epoll_once_what);
+	}
 
 	/*
 	 * Document-module: SleepyPenguin
@@ -732,11 +786,4 @@ void sleepy_penguin_init_epoll(void)
 	rb_define_const(cEpoll, "ONESHOT", UINT2NUM(EPOLLONESHOT));
 
 	id_for_fd = rb_intern("for_fd");
-	active = st_init_numtable();
-
-	if (pthread_atfork(NULL, NULL, atfork_child) != 0) {
-		rb_gc();
-		if (pthread_atfork(NULL, NULL, atfork_child) != 0)
-			rb_memerror();
-	}
 }
diff --git a/ext/sleepy_penguin/epoll_green.h b/ext/sleepy_penguin/epoll_green.h
index ef36490..276a545 100644
--- a/ext/sleepy_penguin/epoll_green.h
+++ b/ext/sleepy_penguin/epoll_green.h
@@ -20,49 +20,50 @@ do { \
 } while (0)
 #endif
 
-static int safe_epoll_wait(struct rb_epoll *ep)
+static int safe_epoll_wait(struct ep_per_thread *ept) /* one non-blocking epoll_wait() into ept->events */
 {
 	int n;
 
 	do {
 		TRAP_BEG;
-		n = epoll_wait(ep->fd, ep->events, ep->maxevents, 0);
+		n = epoll_wait(ept->ep->fd, ept->events, ept->maxevents, 0); /* timeout 0: never blocks */
 		TRAP_END;
-	} while (n == -1 && errno == EINTR && ep_fd_check(ep));
+	} while (n == -1 && errno == EINTR && ep_fd_check(ept->ep)); /* retry on EINTR */
 
 	return n;
 }
 
-static int epwait_forever(struct rb_epoll *ep)
+static int epwait_forever(struct ep_per_thread *ept)
 {
 	int n;
 
 	do {
-		(void)rb_io_wait_readable(ep->fd);
-		n = safe_epoll_wait(ep);
+		(void)rb_io_wait_readable(ept->ep->fd); /* wait until the epoll fd is readable */
+		n = safe_epoll_wait(ept); /* 0 events: go back to waiting */
 	} while (n == 0);
 
 	return n;
 }
 
-static int epwait_timed(struct rb_epoll *ep)
+static int epwait_timed(struct ep_per_thread *ept)
 {
 	struct timeval tv;
 
-	tv.tv_sec = ep->timeout / 1000;
-	tv.tv_usec = (ep->timeout % 1000) * 1000;
+	tv.tv_sec = ept->timeout / 1000;
+	tv.tv_usec = (ept->timeout % 1000) * 1000;
 
 	for (;;) {
 		struct timeval t0, now, diff;
 		int n;
+		int fd = ept->ep->fd;
 		fd_set rfds;
 
 		FD_ZERO(&rfds);
-		FD_SET(ep->fd, &rfds);
+		FD_SET(fd, &rfds);
 
 		gettimeofday(&t0, NULL);
-		(void)rb_thread_select(ep->fd + 1, &rfds, NULL, NULL, &tv);
-		n = safe_epoll_wait(ep);
+		(void)rb_thread_select(fd + 1, &rfds, NULL, NULL, &tv);
+		n = safe_epoll_wait(ept);
 		if (n != 0)
 			return n;
 
@@ -79,16 +80,16 @@ static int epwait_timed(struct rb_epoll *ep)
 	return -1;
 }
 
-static VALUE real_epwait(struct rb_epoll *ep)
+static VALUE real_epwait(struct ep_per_thread *ept) /* NOTE(review): ept comes from pthread TSD, i.e. per *native* thread; confirm 1.8 green threads cannot interleave waits on it */
 {
 	int n;
 
-	if (ep->timeout == -1)
-		n = epwait_forever(ep);
-	else if (ep->timeout == 0)
-		n = safe_epoll_wait(ep);
+	if (ept->timeout < 0) /* like epoll_wait(2): any negative timeout waits forever */
+		n = epwait_forever(ept);
+	else if (ept->timeout == 0) /* poll once, never block */
+		n = safe_epoll_wait(ept);
 	else
-		n = epwait_timed(ep);
+		n = epwait_timed(ept);
 
-	return epwait_result(ep, n);
+	return epwait_result(ept, n);
 }
diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index c96a733..7633d94 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -1,7 +1,9 @@
 require 'test/unit'
 require 'fcntl'
 require 'socket'
+require 'thread'
 $-w = true
+Thread.abort_on_exception = true
 
 require 'sleepy_penguin'
 
@@ -439,7 +441,7 @@ class TestEpoll < Test::Unit::TestCase
   def test_epoll_wait_signal_torture
     usr1 = 0
     empty = 0
-    nr = 1000
+    nr = 100
     @ep.add(@rd, Epoll::IN)
     tmp = []
     trap(:USR1) { usr1 += 1 }
@@ -461,4 +463,48 @@ class TestEpoll < Test::Unit::TestCase
     ensure
       trap(:USR1, "DEFAULT")
   end if ENV["STRESS"].to_i != 0
+
+  def test_wait_one_event_per_thread
+    thr = []
+    pipes = {}
+    lock = Mutex.new
+    maxevents = 1 # each wait call may return at most one event
+    ok = []
+    nr = 10
+    nr.times do
+      r, w = IO.pipe
+      pipes[r] = w
+      @ep.add(r, Epoll::IN | Epoll::ET | Epoll::ONESHOT) # each pipe reports readiness once
+
+      t = Thread.new do
+        sleep 2 # normally woken early by t.run below
+        events = 0
+        @ep.wait(maxevents) do |_,obj|
+          assert pipes.include?(obj), "#{obj.inspect} is unknown"
+          lock.synchronize { ok << obj }
+          events += 1
+        end
+        events
+      end
+      thr << t
+    end
+    pipes.each_value { |w| w.syswrite '.' } # make every pipe readable
+    thr.each do |t|
+      begin
+        t.run
+      rescue ThreadError # thread already finished; nothing to wake
+      end
+    end
+
+    thr.each { |t| assert_equal 1, t.value } # every thread got exactly one event
+    assert_equal nr, ok.size, ok.inspect
+    assert_equal ok.size, ok.uniq.size, ok.inspect # no pipe delivered twice
+    assert_equal ok.map { |io| io.fileno }.sort,
+                 pipes.keys.map { |io| io.fileno }.sort
+  ensure
+    pipes.each do |r,w|
+      r.close
+      w.close
+    end
+  end
 end
-- 
Eric Wong