librelist archives

« back to archive

[PATCH] refactor and fix leak from thread-local storage use

[PATCH] refactor and fix leak from thread-local storage use

From:
Eric Wong
Date:
2013-12-27 @ 23:08
Storing pointers to heap-allocated memory in __thread variables is not
feasible for a library, since __thread provides no automatic resource
de-allocation when a thread exits.

This oversight caused the rare applications which use short-lived
threads to call epoll_wait, kevent, or inotify read to leak memory
over time.  So we refactor everything to use the pthread_*
thread-specific data APIs instead; the key's destructor frees each
thread's buffer when that thread exits.

While we're at it, we use a single, common buffer for inotify, epoll,
and kevent (safe as long as a thread never needs two of them at once),
so we only consume one key and avoid running into the PTHREAD_KEYS_MAX
limit.

These leaks only affected sleepy_penguin v3.2.0 and later, and
only applications which use short-lived threads to call epoll_wait,
kevent and inotify read.
---
 ext/sleepy_penguin/epoll.c          | 17 ++----------
 ext/sleepy_penguin/init.c           | 55 +++++++++++++++++++++++++++++++++++++
 ext/sleepy_penguin/inotify.c        | 44 ++++++++---------------------
 ext/sleepy_penguin/kqueue.c         | 17 ++----------
 ext/sleepy_penguin/sleepy_penguin.h |  1 +
 5 files changed, 72 insertions(+), 62 deletions(-)

diff --git a/ext/sleepy_penguin/epoll.c b/ext/sleepy_penguin/epoll.c
index 5e5cb20..423ed69 100644
--- a/ext/sleepy_penguin/epoll.c
+++ b/ext/sleepy_penguin/epoll.c
@@ -1,7 +1,6 @@
 #include "sleepy_penguin.h"
 #ifdef HAVE_SYS_EPOLL_H
 #include <sys/epoll.h>
-#include <unistd.h>
 #include <time.h>
 #include "missing_clock_gettime.h"
 #include "missing_epoll.h"
@@ -52,10 +51,8 @@ static int ep_fd_check(struct ep_per_thread *ept)
 
 static struct ep_per_thread *ept_get(VALUE self, int maxevents)
 {
-	static __thread struct ep_per_thread *ept;
+	struct ep_per_thread *ept;
 	size_t size;
-	int err;
-	void *ptr;
 
 	/* error check here to prevent OOM from posix_memalign */
 	if (maxevents <= 0) {
@@ -63,21 +60,11 @@ static struct ep_per_thread *ept_get(VALUE self, int maxevents)
 		rb_sys_fail("epoll_wait maxevents <= 0");
 	}
 
-	if (ept && ept->capa >= maxevents)
-		goto out;
-
 	size = sizeof(struct ep_per_thread) +
 	       sizeof(struct epoll_event) * maxevents;
 
-	free(ept); /* free(NULL) is POSIX and works on glibc */
-	err = posix_memalign(&ptr, rb_sp_l1_cache_line_size, size);
-	if (err) {
-		errno = err;
-		rb_memerror();
-	}
-	ept = ptr;
+	ept = rb_sp_gettlsbuf(&size);
 	ept->capa = maxevents;
-out:
 	ept->maxevents = maxevents;
 	ept->io = self;
 	ept->fd = rb_sp_fileno(ept->io);
diff --git a/ext/sleepy_penguin/init.c b/ext/sleepy_penguin/init.c
index 90eddbd..776d6e0 100644
--- a/ext/sleepy_penguin/init.c
+++ b/ext/sleepy_penguin/init.c
@@ -4,10 +4,17 @@
 #endif
 
 #include <unistd.h>
+#include <pthread.h>
 #include <sys/types.h>
 #include "git_version.h"
+#include "sleepy_penguin.h"
 #define L1_CACHE_LINE_MAX 128 /* largest I've seen (Pentium 4) */
 size_t rb_sp_l1_cache_line_size;
+static pthread_key_t rb_sp_key;
+struct rb_sp_tlsbuf { /* per-thread buffer stored under rb_sp_key */
+	size_t capa; /* usable bytes in ptr[] */
+	unsigned char ptr[FLEX_ARRAY]; /* NOTE(review): not cache-aligned */
+};
 
 #ifdef HAVE_SYS_EVENT_H
 void sleepy_penguin_init_kqueue(void);
@@ -56,9 +63,57 @@ static size_t l1_cache_line_size_detect(void)
 	return L1_CACHE_LINE_MAX;
 }
 
+static void sp_once(void) /* pthread_once routine: creates rb_sp_key */
+{
+	int err = pthread_key_create(&rb_sp_key, free); /* free() = dtor */
+	/* NOTE(review): rb_sys_fail longjmps out of pthread_once (unsafe) */
+	if (err) {
+		errno = err;
+		rb_sys_fail( "pthread_key_create");
+	}
+}
+
+/* returns this thread's buffer of >= *size bytes; *size := capacity */
+void *rb_sp_gettlsbuf(size_t *size)
+{
+	struct rb_sp_tlsbuf *old = pthread_getspecific(rb_sp_key);
+	struct rb_sp_tlsbuf *buf;
+	void *ptr;
+	int err;
+
+	if (old && old->capa >= *size) {
+		*size = old->capa;
+		return old->ptr;
+	}
+	/* register the new buffer before freeing old: raising longjmps */
+	err = posix_memalign(&ptr, rb_sp_l1_cache_line_size,
+			     *size + sizeof(struct rb_sp_tlsbuf));
+	if (err) {
+		errno = err;
+		rb_memerror(); /* fatal; old stays registered and valid */
+	}
+	buf = ptr;
+	buf->capa = *size;
+	err = pthread_setspecific(rb_sp_key, buf);
+	if (err != 0) {
+		free(buf); /* the key still owns old */
+		errno = err;
+		rb_sys_fail("BUG: pthread_setspecific");
+	}
+	free(old); /* the key no longer references it */
+	return buf->ptr;
+}
+
 void Init_sleepy_penguin_ext(void)
 {
 	VALUE mSleepyPenguin;
+	static pthread_once_t once = PTHREAD_ONCE_INIT;
+	int err = pthread_once(&once, sp_once);
+
+	if (err) {
+		errno = err;
+		rb_sys_fail("pthread_once");
+	}
 
 	rb_sp_l1_cache_line_size = l1_cache_line_size_detect();
 
diff --git a/ext/sleepy_penguin/inotify.c b/ext/sleepy_penguin/inotify.c
index 5771030..1f2d4ac 100644
--- a/ext/sleepy_penguin/inotify.c
+++ b/ext/sleepy_penguin/inotify.c
@@ -4,11 +4,6 @@
 #include <sys/ioctl.h>
 #include "missing_inotify.h"
 
-struct inbuf {
-	size_t capa;
-	void *ptr;
-};
-
 static ID id_inotify_tmp, id_mask;
 static VALUE cEvent, checks;
 
@@ -137,43 +132,31 @@ static VALUE event_new(struct inotify_event *e)
 
 struct inread_args {
 	int fd;
-	struct inbuf *inbuf;
+	size_t size;
+	void *buf;
 };
 
 static VALUE inread(void *ptr)
 {
 	struct inread_args *args = ptr;
 
-	return (VALUE)read(args->fd, args->inbuf->ptr, args->inbuf->capa);
-}
-
-static void inbuf_grow(struct inbuf *inbuf, size_t size)
-{
-	int err;
-
-	if (inbuf->capa >= size)
-		return;
-	free(inbuf->ptr);
-	err = posix_memalign(&inbuf->ptr, rb_sp_l1_cache_line_size, size);
-	if (err) {
-		errno = err;
-		rb_memerror();
-	}
-	inbuf->capa = size;
+	return (VALUE)read(args->fd, args->buf, args->size);
 }
 
 static void resize_internal_buffer(struct inread_args *args)
 {
 	int newlen;
 
-	if (args->inbuf->capa > 0x10000)
+	if (args->size > 0x10000)
 		rb_raise(rb_eRuntimeError, "path too long");
 
 	if (ioctl(args->fd, FIONREAD, &newlen) != 0)
 		rb_sys_fail("ioctl(inotify,FIONREAD)");
 
-	if (newlen > 0)
-		inbuf_grow(args->inbuf, (size_t)newlen);
+	if (newlen > 0) {
+		args->size = (size_t)newlen;
+		args->buf = rb_sp_gettlsbuf(&args->size);
+	}
 
 	if (newlen == 0) /* race: some other thread grabbed the data */
 		return;
@@ -192,8 +175,6 @@ static void resize_internal_buffer(struct inread_args *args)
  */
 static VALUE take(int argc, VALUE *argv, VALUE self)
 {
-	static __thread struct inbuf inbuf;
-
 	struct inread_args args;
 	VALUE tmp = rb_ivar_get(self, id_inotify_tmp);
 	struct inotify_event *e, *end;
@@ -206,9 +187,9 @@ static VALUE take(int argc, VALUE *argv, VALUE self)
 
 	rb_scan_args(argc, argv, "01", &nonblock);
 
-	inbuf_grow(&inbuf, 128);
 	args.fd = rb_sp_fileno(self);
-	args.inbuf = &inbuf;
+	args.size = 128;
+	args.buf = rb_sp_gettlsbuf(&args.size);
 
 	if (RTEST(nonblock))
 		rb_sp_set_nonblock(args.fd);
@@ -228,9 +209,8 @@ static VALUE take(int argc, VALUE *argv, VALUE self)
 				rb_sys_fail("read(inotify)");
 		} else {
 			/* buffer in userspace to minimize read() calls */
-			end = (struct inotify_event *)
-					((char *)args.inbuf->ptr + r);
-			for (e = args.inbuf->ptr; e < end; ) {
+			end = (struct inotify_event *)((char *)args.buf + r);
+			for (e = args.buf; e < end; ) {
 				VALUE event = event_new(e);
 				if (NIL_P(rv))
 					rv = event;
diff --git a/ext/sleepy_penguin/kqueue.c b/ext/sleepy_penguin/kqueue.c
index 83f3623..22e20f1 100644
--- a/ext/sleepy_penguin/kqueue.c
+++ b/ext/sleepy_penguin/kqueue.c
@@ -74,10 +74,8 @@ static int kq_fd_check(struct kq_per_thread *kpt)
 
 static struct kq_per_thread *kpt_get(VALUE self, int nchanges, int nevents)
 {
-	static __thread struct kq_per_thread *kpt;
+	struct kq_per_thread *kpt;
 	size_t size;
-	void *ptr;
-	int err;
 	int max = nchanges > nevents ? nchanges : nevents;
 
 	/* error check here to prevent OOM from posix_memalign */
@@ -86,20 +84,9 @@ static struct kq_per_thread *kpt_get(VALUE self, int nchanges, int nevents)
 		rb_sys_fail("kevent got negative events < 0");
 	}
 
-	if (kpt && kpt->capa >= max)
-		goto out;
-
 	size = sizeof(struct kq_per_thread) + sizeof(struct kevent) * max;
-
-	free(kpt); /* free(NULL) is POSIX */
-	err = posix_memalign(&ptr, rb_sp_l1_cache_line_size, size);
-	if (err) {
-		errno = err;
-		rb_memerror();
-	}
-	kpt = ptr;
+	kpt = rb_sp_gettlsbuf(&size);
 	kpt->capa = max;
-out:
 	kpt->nchanges = nchanges;
 	kpt->nevents = nevents;
 	kpt->io = self;
diff --git a/ext/sleepy_penguin/sleepy_penguin.h b/ext/sleepy_penguin/sleepy_penguin.h
index 4ed0663..522ac0a 100644
--- a/ext/sleepy_penguin/sleepy_penguin.h
+++ b/ext/sleepy_penguin/sleepy_penguin.h
@@ -77,6 +77,7 @@ static inline VALUE fake_blocking_region(VALUE (*fn)(void *), void *data)
 
 typedef int rb_sp_waitfn(int fd);
 int rb_sp_wait(rb_sp_waitfn waiter, VALUE obj, int *fd);
+void *rb_sp_gettlsbuf(size_t *size);
 
 /* Flexible array elements are standard in C99 */
 #if defined(__STDC_VERSION__) && (__STDC_VERSION__ >= 199901L)
-- 
EW