librelist archives

« back to archive

[PATCH 0/17] kqueue and epoll fixes

[PATCH 0/17] kqueue and epoll fixes

From:
Eric Wong
Date:
2013-04-30 @ 02:39
I'm slowly fleshing out kqueue support and also fixing some
timing-related bugs in the test suite.  kqueue support is
tested on FreeBSD 9.1.

The kqueue code should also be compatible with Rubinius
(tested against the stable libkqueue branch under Linux).

(diffstat against "master")

 ext/sleepy_penguin/epoll.c          |  15 +-
 ext/sleepy_penguin/extconf.rb       |   6 +-
 ext/sleepy_penguin/init.c           |  11 +
 ext/sleepy_penguin/kqueue.c         | 643 ++++++++++++++++++++++++++++++++++++
 ext/sleepy_penguin/sleepy_penguin.h |  12 +
 ext/sleepy_penguin/value2timespec.h |   2 +-
 lib/sleepy_penguin.rb               |   1 -
 lib/sleepy_penguin/epoll.rb         |  53 +--
 lib/sleepy_penguin/kevent.rb        |   3 +
 lib/sleepy_penguin/kqueue.rb        | 115 +++++++
 lib/sleepy_penguin/kqueue/io.rb     |  30 ++
 pkg.mk                              |   2 +-
 test/test_epoll.rb                  |  89 +++--
 test/test_epoll_io.rb               |   3 +-
 test/test_epoll_optimizations.rb    |   1 -
 test/test_inotify.rb                |   2 +-
 test/test_kqueue.rb                 |  75 +++++
 test/test_kqueue_io.rb              | 107 ++++++
 test/test_timerfd.rb                |   4 +-
 19 files changed, 1078 insertions(+), 96 deletions(-)

Eric Wong (17):
      test_epoll: remove assert_nothing_raised
      test: remove Rubinius-specific checks and skips
      test_epoll: avoid sleeping inside a signal handler
      fork-safe "to_io" in high-level epoll/kqueue
      test_kqueue: join thread after test
      test_kqueue_io: test for multiple event return
      test_timerfd: relax timing-sensitive test
      kqueue: set zero timeout if not retrieving events
      test_epoll: workaround MRI 1.8 threading bug
      test_kqueue_io: join thread in test when done using
      test_kqueue: only test if IO#autoclose= exists
      kqueue/io: fix MRI 1.8 support code for event retrieval
      kqueue: workaround lack of RSTRUCT* macros on Rubinius
      test_epoll: join thread before return from test
      test_epoll: increase delay between signal spamming
      epoll: clear FD marks snapshot before returning
      test_epoll: workaround race condition in test_close

[PATCH 01/17] test_epoll: remove assert_nothing_raised

From:
Eric Wong
Date:
2013-04-30 @ 02:39
assert_nothing_raised hides backtraces on real errors,
so we'll stop using it now.
---
 test/test_epoll.rb | 60 ++++++++++++++++++++++--------------------------------
 1 file changed, 24 insertions(+), 36 deletions(-)

diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index 7c648ff..b2c5c48 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -122,11 +122,9 @@ def test_edge_accept
   end
 
   def teardown
-    assert_nothing_raised do
-      @rd.close unless @rd.closed?
-      @wr.close unless @wr.closed?
-      @ep.close unless @ep.closed?
-    end
+    @rd.close unless @rd.closed?
+    @wr.close unless @wr.closed?
+    @ep.close unless @ep.closed?
   end
 
   def test_max_events_big
@@ -170,11 +168,9 @@ def test_signal_safe_wait_forever
       exit!(0)
     end
     time[:START_WAIT] = Time.now
-    assert_nothing_raised do
-      @ep.wait do |flags, obj|
-        tmp << [ flags, obj ]
-        time[:EP] = Time.now
-      end
+    @ep.wait do |flags, obj|
+      tmp << [ flags, obj ]
+      time[:EP] = Time.now
     end
     assert_equal([[Epoll::IN, @rd]], tmp)
     _, status = Process.waitpid2(pid)
@@ -222,12 +218,10 @@ def test_hup
 
   def test_multiple
     r, w = IO.pipe
-    assert_nothing_raised do
-      @ep.add r, Epoll::IN
-      @ep.add @rd, Epoll::IN
-      @ep.add w, Epoll::OUT
-      @ep.add @wr, Epoll::OUT
-    end
+    @ep.add r, Epoll::IN
+    @ep.add @rd, Epoll::IN
+    @ep.add w, Epoll::OUT
+    @ep.add @wr, Epoll::OUT
     tmp = []
     @ep.wait { |flags, obj| tmp << [ flags, obj ] }
     assert_equal 2, tmp.size
@@ -238,16 +232,14 @@ def test_multiple
   end
 
   def test_gc
-    assert_nothing_raised { 4096.times { Epoll.new } }
+    4096.times { Epoll.new }
     assert ! @ep.closed?
   end unless RBX
 
   def test_gc_to_io
-    assert_nothing_raised do
-      4096.times do
-        ep = Epoll.new
-        assert_kind_of IO, ep.to_io
-      end
+    4096.times do
+      ep = Epoll.new
+      assert_kind_of IO, ep.to_io
     end
     assert ! @ep.closed?
   end unless RBX
@@ -259,7 +251,7 @@ def test_clone
     clone.add @wr, Epoll::OUT
     @ep.wait(nil, 0) { |flags, obj| tmp << [ flags, obj ] }
     assert_equal([[Epoll::OUT, @wr]], tmp)
-    assert_nothing_raised { clone.close }
+    clone.close
   end
 
   def test_dup
@@ -269,16 +261,14 @@ def test_dup
     clone.add @wr, Epoll::OUT
     @ep.wait(nil, 0) { |flags, obj| tmp << [ flags, obj ] }
     assert_equal([[Epoll::OUT, @wr]], tmp)
-    assert_nothing_raised { clone.close }
+    clone.close
   end
 
   def test_set_idempotency
-    assert_nothing_raised do
-      @ep.set @rd, Epoll::IN
-      @ep.set @rd, Epoll::IN
-      @ep.set @wr, Epoll::OUT
-      @ep.set @wr, Epoll::OUT
-    end
+    @ep.set @rd, Epoll::IN
+    @ep.set @rd, Epoll::IN
+    @ep.set @wr, Epoll::OUT
+    @ep.set @wr, Epoll::OUT
   end
 
   def test_wait_timeout
@@ -290,10 +280,8 @@ def test_wait_timeout
 
   def test_del
     assert_raises(Errno::ENOENT) { @ep.del(@rd) }
-    assert_nothing_raised do
-      @ep.add(@rd, Epoll::IN)
-      @ep.del(@rd)
-    end
+    @ep.add(@rd, Epoll::IN)
+    @ep.del(@rd)
   end
 
   def test_wait_read
@@ -368,7 +356,7 @@ def test_new
   def test_delete
     assert_nil @ep.delete(@rd)
     assert_nil @ep.delete(@wr)
-    assert_nothing_raised { @ep.add @rd, Epoll::IN }
+    @ep.add @rd, Epoll::IN
     assert_equal @rd, @ep.delete(@rd)
     assert_nil @ep.delete(@rd)
   end
@@ -471,7 +459,7 @@ def test_epoll_wait_signal_torture
       exit!(0)
     end
     while tmp.empty?
-      assert_nothing_raised { @ep.wait(nil, 100) { |flags,obj| tmp << obj } }
+      @ep.wait(nil, 100) { |flags,obj| tmp << obj }
       empty += 1
     end
     _, status = Process.waitpid2(pid)
-- 
1.8.2.1.367.gc875ca7

[PATCH 03/17] test_epoll: avoid sleeping inside a signal handler

From:
Eric Wong
Date:
2013-04-30 @ 02:39
It's generally unsafe to sleep inside a signal handler, and
doing so seems to cause intermittent test failures.
---
 test/test_epoll.rb | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index 163a32c..c6cc198 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -154,10 +154,13 @@ def test_max_events_small
 
   def test_signal_safe_wait_forever
     time = {}
+    thr = nil
     trap(:USR1) do
       time[:USR1] = Time.now
-      sleep 0.5
-      @wr.write '.'
+      thr = Thread.new do
+        sleep 0.5
+        @wr.syswrite '.'
+      end
     end
     @ep.add @rd, Epoll::IN
     tmp = []
@@ -178,6 +181,8 @@ def test_signal_safe_wait_forever
     assert_in_delta(0.5, usr1_delay, 0.1, "usr1_delay=#{usr1_delay}")
     ep_delay = time[:EP] - time[:USR1]
     assert_in_delta(0.5, ep_delay, 0.1, "ep1_delay=#{ep_delay}")
+    assert_kind_of Thread, thr
+    thr.join
     ensure
       trap(:USR1, 'DEFAULT')
   end
-- 
1.8.2.1.367.gc875ca7

[PATCH 12/17] kqueue/io: fix MRI 1.8 support code for event retrieval

From:
Eric Wong
Date:
2013-04-30 @ 02:39
First off, the timeout is not handled properly when timing out,
resulting in an infinite loop.

Secondly, arguments were not passed to the yielded block correctly.

Finally, the return value of kevent was not returned correctly to
the caller.
---
 lib/sleepy_penguin/kqueue/io.rb | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/lib/sleepy_penguin/kqueue/io.rb b/lib/sleepy_penguin/kqueue/io.rb
index 1e5809d..15502d4 100644
--- a/lib/sleepy_penguin/kqueue/io.rb
+++ b/lib/sleepy_penguin/kqueue/io.rb
@@ -15,11 +15,12 @@ def kevent(changelist = nil, nevents = nil, timeout = nil)
       expire_at = timeout ? Time.now + timeout : nil
       begin
         IO.select([self], nil, nil, timeout)
-        n = __kevent(changelist, nevents, 0) do |a,b,c,d,e,f|
-          yield a, b, c, d, e
+        n = __kevent(changelist, nevents, 0) do |*args|
+          yield(*args)
         end
-      end while n == 0 &&
+      end while n == 0 && timeout != 0 &&
                 (expire_at == nil || timeout = __update_timeout(expire_at))
+      n
     else
       # nevents should be zero or nil here
       __kevent(changelist, nevents, 0)
-- 
1.8.2.1.367.gc875ca7

[PATCH 11/17] test_kqueue: only test if IO#autoclose= exists

From:
Eric Wong
Date:
2013-04-30 @ 02:39
The high-level kqueue class is not usable without IO#autoclose=.
---
 test/test_kqueue.rb | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/test/test_kqueue.rb b/test/test_kqueue.rb
index fb53a1b..f6f21e7 100644
--- a/test/test_kqueue.rb
+++ b/test/test_kqueue.rb
@@ -71,4 +71,5 @@ def test_usable_after_fork
   ensure
     kq.close
   end
-end if defined?(SleepyPenguin::Kqueue)
+end if defined?(SleepyPenguin::Kqueue) &&
+       IO.instance_methods.include?(:autoclose=)
-- 
1.8.2.1.367.gc875ca7

[PATCH 17/17] test_epoll: workaround race condition in test_close

From:
Eric Wong
Date:
2013-04-30 @ 02:39
Wait longer before killing the epoll_wait thread, as we may not
have entered epoll_wait inside that thread before we send
Thread#kill to it.  This caused intermittent IOError as the
thread detected the Epoll::IO object was already closed, before
the snapshot (to prevent GC) could be made.
---
 test/test_epoll.rb | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index bdb550b..4b4437a 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -197,7 +197,8 @@ def test_close
     thr = Thread.new { @ep.wait { |flags, obj| tmp << [ flags, obj ] } }
     @rd.close
     @wr.close
-    assert_nil thr.join(0.01)
+    Thread.pass
+    assert_nil thr.join(0.25)
     assert thr.alive?
     thr.kill
     assert tmp.empty?
-- 
1.8.2.1.367.gc875ca7

[PATCH 04/17] fork-safe "to_io" in high-level epoll/kqueue

From:
Eric Wong
Date:
2013-04-30 @ 02:39
We need to validate the underlying IO object before using
it in a forked child.
---
 lib/sleepy_penguin/epoll.rb  | 51 ++++++++++++++++++++++++--------------------
 lib/sleepy_penguin/kqueue.rb | 39 ++++++++++++++++++---------------
 2 files changed, 50 insertions(+), 40 deletions(-)

diff --git a/lib/sleepy_penguin/epoll.rb b/lib/sleepy_penguin/epoll.rb
index 8d78e46..f29189a 100644
--- a/lib/sleepy_penguin/epoll.rb
+++ b/lib/sleepy_penguin/epoll.rb
@@ -1,45 +1,50 @@
 require 'thread'
 class SleepyPenguin::Epoll
 
-  # Epoll objects may be watched by IO.select and similar methods
-  attr_reader :to_io
-
   # call-seq:
   #     SleepyPenguin::Epoll.new([flags]) -> Epoll object
   #
   # Creates a new Epoll object with an optional +flags+ argument.
   # +flags+ may currently be +:CLOEXEC+ or +0+ (or +nil+).
   def initialize(create_flags = nil)
-    @to_io = SleepyPenguin::Epoll::IO.new(create_flags)
+    @io = SleepyPenguin::Epoll::IO.new(create_flags)
     @mtx = Mutex.new
     @events = []
     @marks = []
     @pid = $$
     @create_flags = create_flags
-    @copies = { @to_io => self }
+    @copies = { @io => self }
   end
 
   def __ep_reinit # :nodoc:
     @events.clear
     @marks.clear
-    @to_io = SleepyPenguin::Epoll::IO.new(@create_flags)
+    @io = SleepyPenguin::Epoll::IO.new(@create_flags)
   end
 
   # auto-reinitialize the Epoll object after forking
   def __ep_check # :nodoc:
     return if @pid == $$
-    return if @to_io.closed?
+    return if @io.closed?
     objects = @copies.values
     @copies.each_key { |epio| epio.close }
     @copies.clear
     __ep_reinit
     objects.each do |obj|
-      io_dup = @to_io.dup
+      io_dup = @io.dup
       @copies[io_dup] = obj
     end
     @pid = $$
   end
 
+  # Epoll objects may be watched by IO.select and similar methods
+  def to_io
+    @mtx.synchronize do
+      __ep_check
+      @io
+    end
+  end
+
   # Calls epoll_wait(2) and yields Integer +events+ and IO objects watched
   # for.  +maxevents+ is the maximum number of events to process at once,
   # lower numbers may prevent starvation when used by epoll_wait in multiple
@@ -59,7 +64,7 @@ def wait(maxevents = 64, timeout = nil)
     # we keep a snapshot of @marks around in case another thread closes
     # the IO while it is being transferred to userspace.  We release mtx
     # so another thread may add events to us while we're sleeping.
-    @to_io.epoll_wait(maxevents, timeout) { |events, io| yield(events, io) }
+    @io.epoll_wait(maxevents, timeout) { |events, io| yield(events, io) }
   ensure
     # hopefully Ruby does not optimize this array away...
     snapshot[0]
@@ -72,7 +77,7 @@ def add(io, events)
     events = __event_flags(events)
     @mtx.synchronize do
       __ep_check
-      @to_io.epoll_ctl(CTL_ADD, io, events)
+      @io.epoll_ctl(CTL_ADD, io, events)
       @events[fd] = events
       @marks[fd] = io
     end
@@ -87,7 +92,7 @@ def del(io)
     fd = io.to_io.fileno
     @mtx.synchronize do
       __ep_check
-      @to_io.epoll_ctl(CTL_DEL, io, 0)
+      @io.epoll_ctl(CTL_DEL, io, 0)
       @events[fd] = @marks[fd] = nil
     end
     0
@@ -110,7 +115,7 @@ def delete(io)
       __ep_check
       cur_io = @marks[fd]
       return if nil == cur_io || cur_io.to_io.closed?
-      @to_io.epoll_ctl(CTL_DEL, io, 0)
+      @io.epoll_ctl(CTL_DEL, io, 0)
       @events[fd] = @marks[fd] = nil
     end
     io
@@ -127,7 +132,7 @@ def mod(io, events)
     fd = io.to_io.fileno
     @mtx.synchronize do
       __ep_check
-      @to_io.epoll_ctl(CTL_MOD, io, events)
+      @io.epoll_ctl(CTL_MOD, io, events)
       @marks[fd] = io # may be a different object with same fd/file
       @events[fd] = events
     end
@@ -159,18 +164,18 @@ def set(io, events)
         cur_events = @events[fd]
         return 0 if (cur_events & ONESHOT) == 0 && cur_events == events
         begin
-          @to_io.epoll_ctl(CTL_MOD, io, events)
+          @io.epoll_ctl(CTL_MOD, io, events)
         rescue Errno::ENOENT
           warn "epoll event cache failed (mod -> add)"
-          @to_io.epoll_ctl(CTL_ADD, io, events)
+          @io.epoll_ctl(CTL_ADD, io, events)
           @marks[fd] = io
         end
       else
         begin
-          @to_io.epoll_ctl(CTL_ADD, io, events)
+          @io.epoll_ctl(CTL_ADD, io, events)
         rescue Errno::EEXIST
           warn "epoll event cache failed (add -> mod)"
-          @to_io.epoll_ctl(CTL_MOD, io, events)
+          @io.epoll_ctl(CTL_MOD, io, events)
         end
         @marks[fd] = io
       end
@@ -186,8 +191,8 @@ def set(io, events)
   # Raises IOError if object is already closed.
   def close
     @mtx.synchronize do
-      @copies.delete(@to_io)
-      @to_io.close
+      @copies.delete(@io)
+      @io.close
     end
   end
 
@@ -197,7 +202,7 @@ def close
   # Returns whether or not an Epoll object is closed.
   def closed?
     @mtx.synchronize do
-      @to_io.closed?
+      @io.closed?
     end
   end
 
@@ -254,9 +259,9 @@ def initialize_copy(src) # :nodoc:
     @mtx.synchronize do
       __ep_check
       rv = super
-      unless @to_io.closed?
-        @to_io = @to_io.dup
-        @copies[@to_io] = self
+      unless @io.closed?
+        @io = @io.dup
+        @copies[@io] = self
       end
       rv
     end
diff --git a/lib/sleepy_penguin/kqueue.rb b/lib/sleepy_penguin/kqueue.rb
index fbbde8a..1eeb641 100644
--- a/lib/sleepy_penguin/kqueue.rb
+++ b/lib/sleepy_penguin/kqueue.rb
@@ -8,23 +8,28 @@
 # Events registered to a Kqueue object cannot be shared across fork
 # due to the underlying implementation of kqueue in *BSDs.
 class SleepyPenguin::Kqueue
-  # Kqueue objects may be watched by IO.select and similar methods
-  attr_reader :to_io
-
   def initialize
-    @to_io = SleepyPenguin::Kqueue::IO.new
+    @io = SleepyPenguin::Kqueue::IO.new
     @mtx = Mutex.new
     @pid = $$
-    @copies = { @to_io => self }
+    @copies = { @io => self }
+  end
+
+  # Kqueue objects may be watched by IO.select and similar methods
+  def to_io
+    @mtx.synchronize do
+      __kq_check
+      @io
+    end
   end
 
   def __kq_reinit # :nodoc:
-    @to_io = SleepyPenguin::Kqueue::IO.new
+    @io = SleepyPenguin::Kqueue::IO.new
   end
 
   def __kq_check # :nodoc:
-    return if @pid == $$ || @to_io.closed?
-    unless @to_io.respond_to?(:autoclose=)
+    return if @pid == $$ || @io.closed?
+    unless @io.respond_to?(:autoclose=)
       raise RuntimeError,
        "Kqueue is not safe to use without IO#autoclose=, upgrade to Ruby 1.9+"
     end
@@ -35,7 +40,7 @@ def __kq_check # :nodoc:
     @copies.clear
     __kq_reinit
     objects.each do |obj|
-      io_dup = @to_io.dup
+      io_dup = @io.dup
       @copies[io_dup] = obj
     end
     @pid = $$
@@ -61,7 +66,7 @@ def kevent(changelist = nil, *args)
     end
 
     if block_given?
-      n = @to_io.kevent(changelist, *args) do |ident,filter,flags,
+      n = @io.kevent(changelist, *args) do |ident,filter,flags,
                                                fflags,data,udata|
         # This may raise and cause events to be lost,
         # that's the users' fault/problem
@@ -70,7 +75,7 @@ def kevent(changelist = nil, *args)
                                         fflags, data, udata)
       end
     else
-      n = @to_io.kevent(changelist, *args)
+      n = @io.kevent(changelist, *args)
     end
   end
 
@@ -78,9 +83,9 @@ def initialize_copy(src) # :nodoc:
     @mtx.synchronize do
       __kq_check
       rv = super
-      unless @to_io.closed?
-        @to_io = @to_io.dup
-        @copies[@to_io] = self
+      unless @io.closed?
+        @io = @io.dup
+        @copies[@io] = self
       end
       rv
     end
@@ -93,8 +98,8 @@ def initialize_copy(src) # :nodoc:
   # Raises IOError if object is already closed.
   def close
     @mtx.synchronize do
-      @copies.delete(@to_io)
-      @to_io.close
+      @copies.delete(@io)
+      @io.close
     end
   end
 
@@ -104,7 +109,7 @@ def close
   # Returns whether or not an Kqueue object is closed.
   def closed?
     @mtx.synchronize do
-      @to_io.closed?
+      @io.closed?
     end
   end
 end
-- 
1.8.2.1.367.gc875ca7

[PATCH 02/17] test: remove Rubinius-specific checks and skips

From:
Eric Wong
Date:
2013-04-30 @ 02:39
These skips no longer seem needed.  Removed the GC tests since
they were unreliable (even on MRI), anyways.
---
 test/test_epoll.rb               | 16 +---------------
 test/test_epoll_io.rb            |  1 -
 test/test_epoll_optimizations.rb |  1 -
 3 files changed, 1 insertion(+), 17 deletions(-)

diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index b2c5c48..163a32c 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -9,7 +9,6 @@
 
 class TestEpoll < Test::Unit::TestCase
   include SleepyPenguin
-  RBX = defined?(RUBY_ENGINE) && (RUBY_ENGINE == 'rbx')
 
   def setup
     @rd, @wr = IO.pipe
@@ -181,7 +180,7 @@ def test_signal_safe_wait_forever
     assert_in_delta(0.5, ep_delay, 0.1, "ep1_delay=#{ep_delay}")
     ensure
       trap(:USR1, 'DEFAULT')
-  end unless RBX
+  end
 
   def test_close
     @ep.add @rd, Epoll::IN
@@ -231,19 +230,6 @@ def test_multiple
     assert ios.include?(w)
   end
 
-  def test_gc
-    4096.times { Epoll.new }
-    assert ! @ep.closed?
-  end unless RBX
-
-  def test_gc_to_io
-    4096.times do
-      ep = Epoll.new
-      assert_kind_of IO, ep.to_io
-    end
-    assert ! @ep.closed?
-  end unless RBX
-
   def test_clone
     tmp = []
     clone = @ep.clone
diff --git a/test/test_epoll_io.rb b/test/test_epoll_io.rb
index a899e0d..daaa229 100644
--- a/test/test_epoll_io.rb
+++ b/test/test_epoll_io.rb
@@ -8,7 +8,6 @@
 
 class TestEpollIO < Test::Unit::TestCase
   include SleepyPenguin
-  RBX = defined?(RUBY_ENGINE) && (RUBY_ENGINE == 'rbx')
 
   def setup
     @rd, @wr = IO.pipe
diff --git a/test/test_epoll_optimizations.rb b/test/test_epoll_optimizations.rb
index f5970fd..c03b9d6 100644
--- a/test/test_epoll_optimizations.rb
+++ b/test/test_epoll_optimizations.rb
@@ -9,7 +9,6 @@
 
 class TestEpollOptimizations < Test::Unit::TestCase
   include SleepyPenguin
-  RBX = defined?(RUBY_ENGINE) && (RUBY_ENGINE == 'rbx')
   IO_PURGATORY = []
 
   def setup
-- 
1.8.2.1.367.gc875ca7

[PATCH 13/17] kqueue: workaround lack of RSTRUCT* macros on Rubinius

From:
Eric Wong
Date:
2013-04-30 @ 02:39
Rubinius will not support RSTRUCT* macros, so converting the
structs to arrays is the least intrusive way to adapt our
code.

ref: https://github.com/rubinius/rubinius/issues/494
---
 ext/sleepy_penguin/kqueue.c | 70 ++++++++++++++++++++++++++++++++-------------
 test/test_kqueue_io.rb      |  6 ++++
 2 files changed, 56 insertions(+), 20 deletions(-)

diff --git a/ext/sleepy_penguin/kqueue.c b/ext/sleepy_penguin/kqueue.c
index 78a13c3..155204b 100644
--- a/ext/sleepy_penguin/kqueue.c
+++ b/ext/sleepy_penguin/kqueue.c
@@ -24,6 +24,18 @@
 #  define NUM2USHORT(n) (short)NUM2UINT(n)
 #endif
 
+/*
+ * Rubinius does not support RSTRUCT_* in the C API:
+ * ref: https://github.com/rubinius/rubinius/issues/494
+ */
+#if defined(RUBINIUS)
+#  define RBX_STRUCT (1)
+#  define RSTRUCT_LEN(s) 0, rb_bug("RSTRUCT_LEN attempted in Rubinius")
+#  define RSTRUCT_PTR(s) NULL, rb_bug("RSTRUCT_PTR attempted in Rubinius")
+#else
+#  define RBX_STRUCT (0)
+#endif
+
 static const long NANO_PER_SEC = 1000000000;
 static ID id_for_fd;
 static VALUE mEv, mEvFilt, mNote, mVQ;
@@ -233,33 +245,42 @@ static void event_set(struct kevent *event, VALUE *chg)
 	EV_SET(event, ident, filter, flags, fflags, data, udata);
 }
 
+/* sets ptr and len */
+static void unpack_event(VALUE **ptr, VALUE *len, VALUE *event)
+{
+	switch (TYPE(*event)) {
+	case T_STRUCT:
+		if (RBX_STRUCT) {
+			*event = rb_funcall(*event, rb_intern("to_a"), 0, 0);
+			/* fall-through to T_ARRAY */
+		} else {
+			*len = RSTRUCT_LEN(*event);
+			*ptr = RSTRUCT_PTR(*event);
+			return;
+		}
+	case T_ARRAY:
+		*len = RARRAY_LEN(*event);
+		*ptr = RARRAY_PTR(*event);
+		return;
+	default:
+		rb_raise(rb_eTypeError, "unsupported type in changelist");
+	}
+}
+
 static void ary2eventlist(struct kevent *events, VALUE changelist)
 {
 	VALUE *chg = RARRAY_PTR(changelist);
 	long i = RARRAY_LEN(changelist);
+	VALUE event;
 
 	for (; --i >= 0; chg++) {
 		VALUE clen;
 		VALUE *cptr;
 
-		switch (TYPE(*chg)) {
-		case T_STRUCT:
-			clen = RSTRUCT_LEN(*chg);
-			cptr = RSTRUCT_PTR(*chg);
-			break;
-		case T_ARRAY:
-			clen = RARRAY_LEN(*chg);
-			cptr = RARRAY_PTR(*chg);
-			break;
-		default:
-			rb_raise(rb_eTypeError,
-				 "unsupported type in changelist");
-		}
-		if (clen != 6) {
-			fprintf(stderr, "clen: %ld\n", clen);
-			rb_p(*chg);
+		event = *chg;
+		unpack_event(&cptr, &clen, &event);
+		if (clen != 6)
 			goto out_list;
-		}
 		event_set(events++, cptr);
 	}
 	return;
@@ -273,14 +294,23 @@ out_list:
  */
 static void changelist_prepare(struct kevent *events, VALUE changelist)
 {
+	VALUE *cptr;
+	VALUE clen;
+	VALUE event;
+
 	switch (TYPE(changelist)) {
 	case T_ARRAY:
 		ary2eventlist(events, changelist);
-		break;
+		return;
 	case T_STRUCT:
-		if (RSTRUCT_LEN(changelist) != 6)
+		event = changelist;
+		unpack_event(&cptr, &clen, &event);
+		if (clen != 6)
 			rb_raise(rb_eTypeError, "event is not a Kevent struct");
-		event_set(events, RSTRUCT_PTR(changelist));
+		event_set(events, cptr);
+		return;
+	default:
+		rb_bug("changelist_prepare not type filtered by sp_kevent");
 	}
 }
 
diff --git a/test/test_kqueue_io.rb b/test/test_kqueue_io.rb
index 904a1cc..076c9f0 100644
--- a/test/test_kqueue_io.rb
+++ b/test/test_kqueue_io.rb
@@ -16,6 +16,12 @@ def teardown
     end
   end
 
+  def test_bad_type
+    kq = Kqueue::IO.new
+    @to_close << kq
+    assert_raises(TypeError) { kq.kevent("HI") }
+  end
+
   def test_multi_event
     kq = Kqueue::IO.new
     @to_close << kq
-- 
1.8.2.1.367.gc875ca7

[PATCH 09/17] test_epoll: workaround MRI 1.8 threading bug

From:
Eric Wong
Date:
2013-04-30 @ 02:39
Threads do not seem safe to start inside signal handlers on
Matz Ruby 1.8.
---
 test/test_epoll.rb | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index c6cc198..5bf332f 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -153,14 +153,17 @@ def test_max_events_small
   end
 
   def test_signal_safe_wait_forever
+    sigpipe = IO.pipe
     time = {}
-    thr = nil
+    thr = Thread.new do
+      IO.select([sigpipe[0]]) # wait for USR1
+      sigpipe[0].read(1)
+      sleep 0.5
+      @wr.syswrite '.'
+    end
     trap(:USR1) do
       time[:USR1] = Time.now
-      thr = Thread.new do
-        sleep 0.5
-        @wr.syswrite '.'
-      end
+      sigpipe[1].syswrite('.') # wake up thr
     end
     @ep.add @rd, Epoll::IN
     tmp = []
@@ -184,6 +187,7 @@ def test_signal_safe_wait_forever
     assert_kind_of Thread, thr
     thr.join
     ensure
+      sigpipe.each { |io| io.close }
       trap(:USR1, 'DEFAULT')
   end
 
-- 
1.8.2.1.367.gc875ca7

[PATCH 05/17] test_kqueue: join thread after test

From:
Eric Wong
Date:
2013-04-30 @ 02:39
It's good to clean up after ourselves.
---
 test/test_kqueue.rb | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)

diff --git a/test/test_kqueue.rb b/test/test_kqueue.rb
index 408783e..fb53a1b 100644
--- a/test/test_kqueue.rb
+++ b/test/test_kqueue.rb
@@ -34,6 +34,7 @@ def test_kqueue
     end
     assert_equal 0, events.size
     assert_equal 0, n
+    thr.join
 
     # synchronous add
     events = []
@@ -52,4 +53,22 @@ def test_kqueue
     rd.close if rd
     wr.close if wr
   end
+
+  def test_usable_after_fork
+    kq = Kqueue.new
+    pid = fork do
+      begin
+        ok = false
+        assert_equal(0, kq.kevent(nil, 1, 0.1) { exit!(false) })
+        ok = true
+      ensure
+        exit!(ok)
+      end
+    end
+    assert_equal(0, kq.kevent(nil, 1, 0.1) { exit!(false) })
+    _, status = Process.waitpid2(pid)
+    assert status.success?, status.inspect
+  ensure
+    kq.close
+  end
 end if defined?(SleepyPenguin::Kqueue)
-- 
1.8.2.1.367.gc875ca7

[PATCH 16/17] epoll: clear FD marks snapshot before returning

From:
Eric Wong
Date:
2013-04-30 @ 02:39
This allows the heap to reclaim memory sooner (than waiting for
GC), lowering memory usage and perhaps speeding up future
allocations.
---
 lib/sleepy_penguin/epoll.rb | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/sleepy_penguin/epoll.rb b/lib/sleepy_penguin/epoll.rb
index f29189a..637db8d 100644
--- a/lib/sleepy_penguin/epoll.rb
+++ b/lib/sleepy_penguin/epoll.rb
@@ -67,7 +67,7 @@ def wait(maxevents = 64, timeout = nil)
     @io.epoll_wait(maxevents, timeout) { |events, io| yield(events, io) }
   ensure
     # hopefully Ruby does not optimize this array away...
-    snapshot[0]
+    snapshot.clear
   end
 
   # Starts watching a given +io+ object with +events+ which may be an Integer
-- 
1.8.2.1.367.gc875ca7

[PATCH 06/17] test_kqueue_io: test for multiple event return

From:
Eric Wong
Date:
2013-04-30 @ 02:39
This is not _my_ common use case, but some people may
want to fetch multiple events at once.
---
 test/test_kqueue_io.rb | 47 +++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 47 insertions(+)

diff --git a/test/test_kqueue_io.rb b/test/test_kqueue_io.rb
index ea18767..dc5f8ea 100644
--- a/test/test_kqueue_io.rb
+++ b/test/test_kqueue_io.rb
@@ -6,6 +6,53 @@
 class TestKqueueIO < Test::Unit::TestCase
   include SleepyPenguin
 
+  def setup
+    @to_close = []
+  end
+
+  def teardown
+    @to_close.each do |io|
+      io.close unless io.closed?
+    end
+  end
+
+  def test_multi_event
+    kq = Kqueue::IO.new
+    @to_close << kq
+    list = []
+    pipes = [ IO.pipe, IO.pipe, IO.pipe, IO.pipe ]
+    pipes.each do |(r,w)|
+      @to_close << r
+      @to_close << w
+      list << Kevent[r.fileno, EvFilt::READ, Ev::ADD|Ev::ONESHOT, 0, 0, r]
+    end
+    kq.kevent(list)
+
+    pipes.each do |(_,w)|
+      w.syswrite('.')
+    end
+    received = []
+    seen = {}
+    kq.kevent(nil, 1) do |*args|
+      received << args
+      assert_equal 6, args.size
+      assert_kind_of IO, args[5]
+      assert_nil seen[args[5]]
+      seen[args[5]] = true
+    end
+
+    assert_equal 1, received.size
+
+    kq.kevent(nil, 666) do |*args|
+      received << args
+      assert_equal 6, args.size
+      assert_kind_of IO, args[5]
+      assert_nil seen[args[5]]
+      seen[args[5]] = true
+    end
+    assert_equal 4, received.size
+  end
+
   def test_xthread
     kq = Kqueue::IO.new
     assert_kind_of IO, kq
-- 
1.8.2.1.367.gc875ca7

[PATCH 10/17] test_kqueue_io: join thread in test when done using

From:
Eric Wong
Date:
2013-04-30 @ 02:39
Due to strange scheduling, the ensure clause could fire while
the thread was still inside wr.syswrite even though the main
thread received the event and exited the method.
---
 test/test_kqueue_io.rb | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/test/test_kqueue_io.rb b/test/test_kqueue_io.rb
index dc5f8ea..904a1cc 100644
--- a/test/test_kqueue_io.rb
+++ b/test/test_kqueue_io.rb
@@ -72,6 +72,8 @@ def test_xthread
     assert_equal EvFilt::READ, events[0][1]
     assert_equal 1, n
 
+    thr.join
+
     # we should be drained
     events = []
     n = kq.kevent(nil, 1, 0) do |ident,filter,flags,fflags,data,udata|
-- 
1.8.2.1.367.gc875ca7

[PATCH 15/17] test_epoll: increase delay between signal spamming

From:
Eric Wong
Date:
2013-04-30 @ 02:39
Frequently sending signals can lead to high memory usage and
slowdowns on some Ruby + malloc implementations.
---
 test/test_epoll.rb | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index af70fa2..bdb550b 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -450,7 +450,7 @@ def test_epoll_wait_signal_torture
       trap(:USR1, "DEFAULT")
       sleep 0.1
       ppid = Process.ppid
-      nr.times { Process.kill(:USR1, ppid); sleep 0.01 }
+      nr.times { Process.kill(:USR1, ppid); sleep 0.05 }
       @wr.syswrite('.')
       exit!(0)
     end
-- 
1.8.2.1.367.gc875ca7

[PATCH 07/17] test_timerfd: relax timing-sensitive test

From:
Eric Wong
Date:
2013-04-30 @ 02:39
This test failed on overloaded systems (and may still fail).
Unfortunately, timers are hard to test as system latency
must be taken into account.
---
 test/test_timerfd.rb | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/test/test_timerfd.rb b/test/test_timerfd.rb
index e8dc321..81aa6d3 100644
--- a/test/test_timerfd.rb
+++ b/test/test_timerfd.rb
@@ -65,9 +65,9 @@ def test_gettime
 
   def test_expirations_nonblock
     tfd = TimerFD.new(:MONOTONIC)
-    assert_equal([0, 0], tfd.settime(0, 0, 0.01))
+    assert_equal([0, 0], tfd.settime(0, 0, 0.05))
     assert_nil tfd.expirations(true)
-    sleep 0.01
+    sleep 0.05
     assert_equal 1, tfd.expirations
   end
 end if defined?(SleepyPenguin::TimerFD)
-- 
1.8.2.1.367.gc875ca7

[PATCH 08/17] kqueue: set zero timeout if not retrieving events

From:
Eric Wong
Date:
2013-04-30 @ 02:39
Having a timeout does not make sense if not retrieving events,
so avoid potentially triggering bugs or strange behavior between
different kqueue implementations.
---
 ext/sleepy_penguin/kqueue.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/ext/sleepy_penguin/kqueue.c b/ext/sleepy_penguin/kqueue.c
index 8e33592..78a13c3 100644
--- a/ext/sleepy_penguin/kqueue.c
+++ b/ext/sleepy_penguin/kqueue.c
@@ -317,6 +317,7 @@ static VALUE sp_kevent(int argc, VALUE *argv, VALUE self)
 			rb_raise(rb_eArgError,
 				"nevents specified but block not given");
 		nevents = 0;
+		timeout = INT2FIX(0);
 	}
 
 	kpt = kpt_get(self, nchanges, nevents);
-- 
1.8.2.1.367.gc875ca7

[PATCH 14/17] test_epoll: join thread before return from test

From:
Eric Wong
Date:
2013-04-30 @ 02:39
We want to avoid closing the descriptor while the thread
is running.
---
 test/test_epoll.rb | 1 +
 1 file changed, 1 insertion(+)

diff --git a/test/test_epoll.rb b/test/test_epoll.rb
index 5bf332f..af70fa2 100644
--- a/test/test_epoll.rb
+++ b/test/test_epoll.rb
@@ -201,6 +201,7 @@ def test_close
     assert thr.alive?
     thr.kill
     assert tmp.empty?
+    thr.join
   end
 
   def test_rdhup
-- 
1.8.2.1.367.gc875ca7