[PATCH] tevent and threads - infrastructure improvements - version #2

Jeremy Allison jra at samba.org
Mon Sep 14 19:09:30 UTC 2015


On Mon, Sep 14, 2015 at 07:26:56PM +0200, Ralph Böhme wrote:
> On Mon, Sep 14, 2015 at 09:23:08AM -0700, Jeremy Allison wrote:
> > On Sun, Sep 13, 2015 at 02:34:11PM +0200, Ralph Böhme wrote:
> > > Hi Jeremy,
> > > 
> > > On Fri, Jul 24, 2015 at 10:16:15AM -0700, Jeremy Allison wrote:
> > > > On Thu, Jul 23, 2015 at 04:50:37PM -0700, Jeremy Allison wrote:
> > > > > 
> > > > > FYI. I now have a working implementation of this
> > > > > API - passes valgrind memcheck and drd !
> > > > > 
> > > > > Hurrah for me :-).
> > > > > 
> > > > > Will post an updated patch once I've finished
> > > > > updating the tutorial.
> > > > 
> > > > Here it is. Passes valgrind --tool=drd and
> > > > valgrind --tool=memcheck.
> > > > 
> > > > Metze please let me know if this is what
> > > > you had in mind.
> > > > 
> > > > Everyone else just review :-).
> > > 
> > > a few minor issues:
> > > 
> > > * always use talloc_get_type_abort() where possible
> > 
> > Looks good to me. Do you want me to squash these
> > into the main patch and add your signed-off-by ?
> > 
> > > * reverse the order of signalling and unlocking in
> > >   tevent_thread_proxy_schedule()
> > 
> > LGTM. Again, squash and signed-off ?
> > 
> > Also, are you getting to a 'Reviewed-by' ? :-).
> 
> yes on both, feel free to squash and add my signed-off and
> reviewed-by.

Unfortunately - remember:

http://bholley.net/blog/2015/must-be-this-tall-to-write-multi-threaded-code.html

:-).

The optimization change you added:

Reverse order of unlocking and notifying in
tevent_thread_proxy_schedule(): unlocking before notifying avoids a
potential wakeup of the signalled thread with the mutex still locked.

introduces valgrind --tool=drd and --tool=helgrind errors
that are not present with the original code path.

It's OK to wake up the signalled thread with the mutex still locked,
it'll just wait on the mutex until it gets unlocked. No big deal :-).

The reason the mutex must be held until after the signalling is
done is that, as in the second test, there may be a destructor
waiting to remove the proxy object once the mutex gets released,
and if that happens you get conflicting write errors.
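
To make the two orderings concrete, here's a minimal sketch
(illustrative only - the schedule_safe()/schedule_racy() names are
made up; the fields are those of struct tevent_thread_proxy from the
patch below, and the real code lives in
tevent_thread_proxy_schedule()):

/* Safe: notify while still holding the mutex. */
static void schedule_safe(struct tevent_thread_proxy *tp)
{
	char c = '\0';

	pthread_mutex_lock(&tp->mutex);
	/* ... queue the immediate on tp->im_list ... */
	(void)write(tp->write_fd, &c, 1); /* wake receiver under the lock */
	pthread_mutex_unlock(&tp->mutex);
	/* Receiver may wake and briefly block on the mutex. No big deal. */
}

/* Racy: unlock first, then notify. */
static void schedule_racy(struct tevent_thread_proxy *tp)
{
	char c = '\0';

	pthread_mutex_lock(&tp->mutex);
	/* ... queue the immediate on tp->im_list ... */
	pthread_mutex_unlock(&tp->mutex);
	/*
	 * Window: a destructor blocked on the mutex can now run and
	 * free tp, so the write below touches freed memory - these
	 * are the conflicting-write errors drd and helgrind report.
	 */
	(void)write(tp->write_fd, &c, 1);
}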

Another proof of the old adage that premature optimization is
the root of all evil :-).

Everything else was OK though - squashed into the patch !

Updated patchset with your signed-off-by's attached.

Let me know if this is OK.

Cheers,

Jeremy.
-------------- next part --------------
From 820189e48db8dff89aeedfc6d9309a57a0d8427d Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Thu, 23 Jul 2015 15:23:50 -0700
Subject: [PATCH 1/4] lib: tevent: Initial checkin of threaded tevent context
 calling code.

Adds 2 new functions:

struct tevent_thread_proxy *tevent_thread_proxy_create(
                struct tevent_context *dest_ev_ctx);

void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
		struct tevent_immediate **pp_im,
		tevent_immediate_handler_t handler,
		void *pp_private_data);

Brief doc included. Tests, docs and tutorial to follow.
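
Typical calling pattern (sketch only, not part of this patch;
struct my_state and my_handler are made-up names):

  /* Owning thread: */
  struct tevent_thread_proxy *tp = tevent_thread_proxy_create(master_ev);

  /* Any other thread: */
  struct tevent_immediate *im = tevent_create_immediate(NULL);
  struct my_state *state = talloc_zero(NULL, struct my_state);

  tevent_thread_proxy_schedule(tp, &im, my_handler, &state);
  /* On return im == NULL and state == NULL; tevent owns both. */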

Signed-off-by: Jeremy Allison <jra at samba.org>
Signed-off-by: Ralph Boehme <slow at samba.org>
---
 lib/tevent/tevent.h         |  48 ++++++
 lib/tevent/tevent_threads.c | 377 ++++++++++++++++++++++++++++++++++++++++++++
 lib/tevent/wscript          |   4 +-
 3 files changed, 427 insertions(+), 2 deletions(-)
 create mode 100644 lib/tevent/tevent_threads.c

diff --git a/lib/tevent/tevent.h b/lib/tevent/tevent.h
index b6c39d1..5c739d5 100644
--- a/lib/tevent/tevent.h
+++ b/lib/tevent/tevent.h
@@ -39,6 +39,7 @@ struct tevent_fd;
 struct tevent_timer;
 struct tevent_immediate;
 struct tevent_signal;
+struct tevent_thread_proxy;
 
 /**
  * @defgroup tevent The tevent API
@@ -1698,6 +1699,53 @@ typedef int (*tevent_nesting_hook)(struct tevent_context *ev,
 				   bool begin,
 				   void *stack_ptr,
 				   const char *location);
+
+/**
+ * @brief Create a tevent_thread_proxy for message passing between threads.
+ *
+ * The tevent_context must have been allocated on the NULL
+ * talloc context, and talloc_disable_null_tracking() must
+ * have been called.
+ *
+ * @param[in]  dest_ev_ctx      The tevent_context to receive events.
+ *
+ * @see tevent_thread_proxy_schedule()
+ */
+struct tevent_thread_proxy *tevent_thread_proxy_create(
+                struct tevent_context *dest_ev_ctx);
+
+/**
+ * @brief Schedule an immediate event on an event context from another thread.
+ *
+ * Causes dest_ev_ctx, being run by another thread, to receive an
+ * immediate event calling the handler with the *pp_private_data parameter.
+ *
+ * *pp_im must be a pointer to an immediate event talloced on a context owned
+ * by the calling thread, or the NULL context. Ownership will
+ * be transferred to the tevent library and *pp_im will be returned as NULL.
+ *
+ * *pp_private_data must be a talloced area of memory with no destructors.
+ * Ownership of this memory will be transferred to the tevent library and
+ * *pp_private_data will be set to NULL on successful completion of
+ * the call. Set pp_private_data to NULL if no parameter transfer is
+ * needed (a pure callback). This is an asynchronous request; the caller
+ * does not wait for the callback to complete before returning.
+ *
+ * @param[in]  tp               The tevent_thread_proxy to use.
+ *
+ * @param[in]  pp_im            Pointer to immediate event pointer.
+ *
+ * @param[in]  handler          The function that will be called.
+ *
+ * @param[in]  pp_private_data  The talloced memory to transfer.
+ *
+ * @see tevent_thread_proxy_create()
+ */
+void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
+				struct tevent_immediate **pp_im,
+				tevent_immediate_handler_t handler,
+				void *pp_private_data);
+
 #ifdef TEVENT_DEPRECATED
 #ifndef _DEPRECATED_
 #if (__GNUC__ >= 3) && (__GNUC_MINOR__ >= 1 )
diff --git a/lib/tevent/tevent_threads.c b/lib/tevent/tevent_threads.c
new file mode 100644
index 0000000..2b046c2
--- /dev/null
+++ b/lib/tevent/tevent_threads.c
@@ -0,0 +1,377 @@
+/*
+   tevent event library.
+
+   Copyright (C) Jeremy Allison 2015
+
+     ** NOTE! The following LGPL license applies to the tevent
+     ** library. This does NOT imply that all of Samba is released
+     ** under the LGPL
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 3 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "replace.h"
+#include "system/filesys.h"
+#include "talloc.h"
+#include "tevent.h"
+#include "tevent_internal.h"
+#include "tevent_util.h"
+
+#if defined(HAVE_PTHREAD)
+#include <pthread.h>
+
+struct tevent_immediate_list {
+	struct tevent_immediate_list *next, *prev;
+	tevent_immediate_handler_t handler;
+	struct tevent_immediate *im;
+	void *private_ptr;
+};
+
+struct tevent_thread_proxy {
+	pthread_mutex_t mutex;
+	struct tevent_context *dest_ev_ctx;
+	int read_fd;
+	int write_fd;
+	struct tevent_fd *pipe_read_fde;
+	/* Pending events list. */
+	struct tevent_immediate_list *im_list;
+	/* Completed events list. */
+	struct tevent_immediate_list *tofree_im_list;
+	struct tevent_immediate *free_im;
+};
+
+static void free_im_list(struct tevent_immediate_list **pp_list_head)
+{
+	struct tevent_immediate_list *im_entry;
+	struct tevent_immediate_list *im_next;
+
+	for(im_entry = *pp_list_head; im_entry; im_entry = im_next) {
+		im_next = im_entry->next;
+		DLIST_REMOVE(*pp_list_head, im_entry);
+		TALLOC_FREE(im_entry);
+	}
+}
+
+static void free_list_handler(struct tevent_context *ev,
+				struct tevent_immediate *im,
+				void *private_ptr)
+{
+	struct tevent_thread_proxy *tp =
+		talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
+	int ret;
+
+	ret = pthread_mutex_lock(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+		return;
+	}
+
+	free_im_list(&tp->tofree_im_list);
+
+	ret = pthread_mutex_unlock(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+		return;
+	}
+}
+
+static void schedule_immediate_functions(struct tevent_thread_proxy *tp)
+{
+	struct tevent_immediate_list *im_entry;
+	struct tevent_immediate_list *im_next;
+
+	for(im_entry = tp->im_list; im_entry; im_entry = im_next) {
+		im_next = im_entry->next;
+		DLIST_REMOVE(tp->im_list, im_entry);
+
+		tevent_schedule_immediate(im_entry->im,
+					tp->dest_ev_ctx,
+					im_entry->handler,
+					im_entry->private_ptr);
+
+		/* Move from pending list to free list. */
+		DLIST_ADD(tp->tofree_im_list, im_entry);
+	}
+	if (tp->tofree_im_list != NULL) {
+		/*
+		 * Once the current immediate events
+		 * are processed, we need to reschedule
+		 * ourselves to free them. This works
+		 * as tevent_schedule_immediate()
+		 * always adds events to the *END* of
+		 * the immediate events list.
+		 */
+		tevent_schedule_immediate(tp->free_im,
+					tp->dest_ev_ctx,
+					free_list_handler,
+					tp);
+	}
+}
+
+static void pipe_read_handler(struct tevent_context *ev,
+				struct tevent_fd *fde,
+				uint16_t flags,
+				void *private_ptr)
+{
+	struct tevent_thread_proxy *tp =
+		talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
+	ssize_t len = 64;
+	int ret;
+
+	ret = pthread_mutex_lock(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+		return;
+	}
+
+	/*
+	 * Clear out all data in the pipe. We
+	 * don't really care if this returns -1.
+	 */
+	while (len == 64) {
+		char buf[64];
+		len = read(tp->read_fd, buf, 64);
+	};
+
+	schedule_immediate_functions(tp);
+
+	ret = pthread_mutex_unlock(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+		return;
+	}
+}
+
+static int tevent_thread_proxy_destructor(struct tevent_thread_proxy *tp)
+{
+	int ret;
+
+	ret = pthread_mutex_lock(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+		return 0;
+	}
+
+	TALLOC_FREE(tp->pipe_read_fde);
+
+	if (tp->read_fd != -1) {
+		(void)close(tp->read_fd);
+		tp->read_fd = -1;
+	}
+	if (tp->write_fd != -1) {
+		(void)close(tp->write_fd);
+		tp->write_fd = -1;
+	}
+
+	/* Hmmm. It's probably an error if we get here with
+	   any non-NULL immediate entries.. */
+
+	free_im_list(&tp->im_list);
+	free_im_list(&tp->tofree_im_list);
+
+	TALLOC_FREE(tp->free_im);
+
+	ret = pthread_mutex_unlock(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+		return 0;
+	}
+
+	ret = pthread_mutex_destroy(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+		return 0;
+	}
+
+	return 0;
+}
+
+/*
+ * Create a struct that can be passed to other threads
+ * to allow them to signal the struct tevent_context *
+ * passed in.
+ */
+
+struct tevent_thread_proxy *tevent_thread_proxy_create(
+		struct tevent_context *dest_ev_ctx)
+{
+	int ret;
+	int pipefds[2];
+	struct tevent_thread_proxy *tp = talloc_zero(dest_ev_ctx,
+					struct tevent_thread_proxy);
+	if (tp == NULL) {
+		return NULL;
+	}
+
+	ret = pthread_mutex_init(&tp->mutex, NULL);
+	if (ret != 0) {
+		goto fail;
+	}
+
+	tp->dest_ev_ctx = dest_ev_ctx;
+	tp->read_fd = -1;
+	tp->write_fd = -1;
+
+	talloc_set_destructor(tp, tevent_thread_proxy_destructor);
+
+	ret = pipe(pipefds);
+	if (ret == -1) {
+		goto fail;
+	}
+
+	tp->read_fd = pipefds[0];
+	tp->write_fd = pipefds[1];
+
+	ret = ev_set_blocking(pipefds[0], false);
+	if (ret != 0) {
+		goto fail;
+	}
+	ret = ev_set_blocking(pipefds[1], false);
+	if (ret != 0) {
+		goto fail;
+	}
+	if (!ev_set_close_on_exec(pipefds[0])) {
+		goto fail;
+	}
+	if (!ev_set_close_on_exec(pipefds[1])) {
+		goto fail;
+	}
+
+	tp->pipe_read_fde = tevent_add_fd(dest_ev_ctx,
+				tp,
+				tp->read_fd,
+				TEVENT_FD_READ,
+				pipe_read_handler,
+				tp);
+	if (tp->pipe_read_fde == NULL) {
+		goto fail;
+	}
+
+	/*
+	 * Create an immediate event to free
+	 * completed lists.
+	 */
+	tp->free_im = tevent_create_immediate(tp);
+	if (tp->free_im == NULL) {
+		goto fail;
+	}
+
+	return tp;
+
+  fail:
+
+	TALLOC_FREE(tp);
+	return NULL;
+}
+
+/*
+ * This function schedules an immediate event to be called with argument
+ * *pp_private_data in the thread context of dest_ev_ctx. The caller doesn't
+ * wait for activation to take place, this is simply fire-and-forget.
+ *
+ * pp_im must be a pointer to an immediate event talloced on
+ * a context owned by the calling thread, or the NULL context.
+ * Ownership of *pp_im will be transferred to the tevent library.
+ *
+ * pp_private_data can be NULL, or the contents of *pp_private_data must
+ * be talloc'ed memory on a context owned by the calling thread
+ * or the NULL context. If non-NULL, ownership of *pp_private_data will
+ * be transferred to the tevent library.
+ *
+ * If you want to return a message, have the destination use the
+ * same function call to send back to the caller.
+ */
+
+
+void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
+				struct tevent_immediate **pp_im,
+				tevent_immediate_handler_t handler,
+				void *pp_private_data)
+{
+	struct tevent_immediate_list *im_entry;
+	int ret;
+	char c;
+
+	ret = pthread_mutex_lock(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+		return;
+	}
+
+	if (tp->write_fd == -1) {
+		/* In the process of being destroyed. Ignore. */
+		goto end;
+	}
+
+	/* Create a new immediate_list entry. MUST BE ON THE NULL CONTEXT */
+	im_entry = talloc_zero(NULL, struct tevent_immediate_list);
+	if (im_entry == NULL) {
+		goto end;
+	}
+
+	im_entry->handler = handler;
+	im_entry->im = talloc_move(im_entry, pp_im);
+
+	if (pp_private_data != NULL) {
+		void **pptr = (void **)pp_private_data;
+		im_entry->private_ptr = talloc_move(im_entry, pptr);
+	}
+
+	DLIST_ADD(tp->im_list, im_entry);
+
+	/*
+	 * The mutex must stay held across the signalling write.
+	 * Waking the signalled thread with the mutex still held is
+	 * harmless (it just waits on the mutex), but unlocking first
+	 * opens a window in which a destructor on the other thread
+	 * can take the mutex and free the proxy, making the write
+	 * below a use-after-free.
+	 */
+
+	/* And notify the dest_ev_ctx to wake up. */
+	c = '\0';
+	(void)write(tp->write_fd, &c, 1);
+
+  end:
+
+	ret = pthread_mutex_unlock(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+	}
+}
+#else
+/* !HAVE_PTHREAD */
+struct tevent_thread_proxy *tevent_thread_proxy_create(
+		struct tevent_context *dest_ev_ctx)
+{
+	return NULL;
+}
+
+void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
+				struct tevent_immediate **pp_im,
+				tevent_immediate_handler_t handler,
+				void *pp_private_data)
+{
+	;
+}
+#endif
diff --git a/lib/tevent/wscript b/lib/tevent/wscript
index 827094c..4c5fe0c 100755
--- a/lib/tevent/wscript
+++ b/lib/tevent/wscript
@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 
 APPNAME = 'tevent'
-VERSION = '0.9.25'
+VERSION = '0.9.26'
 
 blddir = 'bin'
 
@@ -83,7 +83,7 @@ def build(bld):
 
     SRC = '''tevent.c tevent_debug.c tevent_fd.c tevent_immediate.c
              tevent_queue.c tevent_req.c tevent_select.c
-             tevent_poll.c
+             tevent_poll.c tevent_threads.c
              tevent_signal.c tevent_standard.c tevent_timed.c tevent_util.c tevent_wakeup.c'''
 
     if bld.CONFIG_SET('HAVE_EPOLL'):
-- 
2.6.0.rc0.131.gf624c3d


From b721cb718c764a745c633d91deddf504ab560ebb Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Fri, 24 Jul 2015 08:50:31 -0700
Subject: [PATCH 2/4] lib: tevent: Initial test of tevent threaded context
 code.

Signed-off-by: Jeremy Allison <jra at samba.org>
Signed-off-by: Ralph Boehme <slow at samba.org>
---
 lib/tevent/testsuite.c | 125 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 125 insertions(+)

diff --git a/lib/tevent/testsuite.c b/lib/tevent/testsuite.c
index c63c878..f8f65f8 100644
--- a/lib/tevent/testsuite.c
+++ b/lib/tevent/testsuite.c
@@ -808,6 +808,127 @@ static bool test_event_context_threaded(struct torture_context *test,
 	return true;
 }
 
+#define NUM_TEVENT_THREADS 100
+
+/* Ugly, but needed for torture_comment... */
+static struct torture_context *thread_test_ctx;
+static pthread_t thread_map[NUM_TEVENT_THREADS];
+static unsigned thread_counter;
+
+/* Called in master thread context */
+static void callback_nowait(struct tevent_context *ev,
+				struct tevent_immediate *im,
+				void *private_ptr)
+{
+	pthread_t *thread_id_ptr =
+		talloc_get_type_abort(private_ptr, pthread_t);
+	unsigned i;
+
+	for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+		if (pthread_equal(*thread_id_ptr,
+				thread_map[i])) {
+			break;
+		}
+	}
+	torture_comment(thread_test_ctx,
+			"Callback %u from thread %u\n",
+			thread_counter,
+			i);
+	thread_counter++;
+}
+
+/* Blast the master tevent_context with a callback, no waiting. */
+static void *thread_fn_nowait(void *private_ptr)
+{
+	struct tevent_thread_proxy *master_tp =
+		talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
+	struct tevent_immediate *im;
+	pthread_t *thread_id_ptr;
+
+	im = tevent_create_immediate(NULL);
+	if (im == NULL) {
+		return NULL;
+	}
+	thread_id_ptr = talloc(NULL, pthread_t);
+	if (thread_id_ptr == NULL) {
+		return NULL;
+	}
+	*thread_id_ptr = pthread_self();
+
+	tevent_thread_proxy_schedule(master_tp,
+				&im,
+				callback_nowait,
+				&thread_id_ptr);
+	return NULL;
+}
+
+static void timeout_fn(struct tevent_context *ev,
+			struct tevent_timer *te,
+			struct timeval tv, void *p)
+{
+	thread_counter = NUM_TEVENT_THREADS * 10;
+}
+
+static bool test_multi_tevent_threaded(struct torture_context *test,
+					const void *test_data)
+{
+	unsigned i;
+	struct tevent_context *master_ev;
+	struct tevent_thread_proxy *tp;
+
+	talloc_disable_null_tracking();
+
+	/* Ugly global stuff. */
+	thread_test_ctx = test;
+	thread_counter = 0;
+
+	master_ev = tevent_context_init(NULL);
+	if (master_ev == NULL) {
+		return false;
+	}
+	tevent_set_debug_stderr(master_ev);
+
+	tp = tevent_thread_proxy_create(master_ev);
+	if (tp == NULL) {
+		torture_fail(test,
+			talloc_asprintf(test,
+				"tevent_thread_proxy_create failed\n"));
+		talloc_free(master_ev);
+		return false;
+	}
+
+	for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+		int ret = pthread_create(&thread_map[i],
+				NULL,
+				thread_fn_nowait,
+				tp);
+		if (ret != 0) {
+			torture_fail(test,
+				talloc_asprintf(test,
+					"Failed to create thread %i, %d\n",
+					i, ret));
+			return false;
+		}
+	}
+
+	/* Ensure we don't wait more than 10 seconds. */
+	tevent_add_timer(master_ev,
+			master_ev,
+			timeval_current_ofs(10,0),
+			timeout_fn,
+			NULL);
+
+	while (thread_counter < NUM_TEVENT_THREADS) {
+		int ret = tevent_loop_once(master_ev);
+		torture_assert(test, ret == 0, "tevent_loop_once failed");
+	}
+
+	torture_assert(test, thread_counter == NUM_TEVENT_THREADS,
+		"thread_counter fail\n");
+
+	talloc_free(master_ev);
+	return true;
+}
 #endif
 
 struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
@@ -841,6 +962,10 @@ struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
 	torture_suite_add_simple_tcase_const(suite, "threaded_poll_mt",
 					     test_event_context_threaded,
 					     NULL);
+
+	torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded",
+					     test_multi_tevent_threaded,
+					     NULL);
 #endif
 
 	return suite;
-- 
2.6.0.rc0.131.gf624c3d


From 52a6693917e822dc6d13586461bdc656e5c0f81c Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Fri, 24 Jul 2015 09:27:21 -0700
Subject: [PATCH 3/4] lib: tevent: tests: Add a second thread test that does
 request/reply.

Both tests run cleanly with valgrind --tool=drd.

Signed-off-by: Jeremy Allison <jra at samba.org>
Signed-off-by: Ralph Boehme <slow at samba.org>
---
 lib/tevent/testsuite.c | 189 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 189 insertions(+)

diff --git a/lib/tevent/testsuite.c b/lib/tevent/testsuite.c
index f8f65f8..1937f90 100644
--- a/lib/tevent/testsuite.c
+++ b/lib/tevent/testsuite.c
@@ -929,6 +929,190 @@ static bool test_multi_tevent_threaded(struct torture_context *test,
 	talloc_free(master_ev);
 	return true;
 }
+
+struct reply_state {
+	struct tevent_thread_proxy *reply_tp;
+	pthread_t thread_id;
+	int *p_finished;
+};
+
+static void thread_timeout_fn(struct tevent_context *ev,
+			struct tevent_timer *te,
+			struct timeval tv, void *p)
+{
+	int *p_finished = (int *)p;
+
+	*p_finished = 2;
+}
+
+/* Called in child-thread context */
+static void thread_callback(struct tevent_context *ev,
+				struct tevent_immediate *im,
+				void *private_ptr)
+{
+	struct reply_state *rsp =
+		talloc_get_type_abort(private_ptr, struct reply_state);
+
+	talloc_steal(ev, rsp);
+	*rsp->p_finished = 1;
+}
+
+/* Called in master thread context */
+static void master_callback(struct tevent_context *ev,
+				struct tevent_immediate *im,
+				void *private_ptr)
+{
+	struct reply_state *rsp =
+		talloc_get_type_abort(private_ptr, struct reply_state);
+	unsigned i;
+
+	talloc_steal(ev, rsp);
+
+	for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+		if (pthread_equal(rsp->thread_id,
+				thread_map[i])) {
+			break;
+		}
+	}
+	torture_comment(thread_test_ctx,
+			"Callback %u from thread %u\n",
+			thread_counter,
+			i);
+	thread_counter++;
+
+	/* Now reply to the thread ! */
+	tevent_thread_proxy_schedule(rsp->reply_tp,
+				&im,
+				thread_callback,
+				&rsp);
+}
+
+static void *thread_fn_1(void *private_ptr)
+{
+	struct tevent_thread_proxy *master_tp =
+		talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
+	struct tevent_thread_proxy *tp;
+	struct tevent_immediate *im;
+	struct tevent_context *ev;
+	struct reply_state *rsp;
+	int finished = 0;
+	int ret;
+
+	ev = tevent_context_init(NULL);
+	if (ev == NULL) {
+		return NULL;
+	}
+
+	tp = tevent_thread_proxy_create(ev);
+	if (tp == NULL) {
+		talloc_free(ev);
+		return NULL;
+	}
+
+	im = tevent_create_immediate(ev);
+	if (im == NULL) {
+		talloc_free(ev);
+		return NULL;
+	}
+
+	rsp = talloc(ev, struct reply_state);
+	if (rsp == NULL) {
+		talloc_free(ev);
+		return NULL;
+	}
+
+	rsp->thread_id = pthread_self();
+	rsp->reply_tp = tp;
+	rsp->p_finished = &finished;
+
+	/* Introduce a little randomness into the mix.. */
+	usleep(random() % 7000);
+
+	tevent_thread_proxy_schedule(master_tp,
+				&im,
+				master_callback,
+				&rsp);
+
+	/* Ensure we don't wait more than 10 seconds. */
+	tevent_add_timer(ev,
+			ev,
+			timeval_current_ofs(10,0),
+			thread_timeout_fn,
+			&finished);
+
+	while (finished == 0) {
+		ret = tevent_loop_once(ev);
+		assert(ret == 0);
+	}
+
+	if (finished > 1) {
+		/* Timeout ! */
+		abort();
+	}
+
+	talloc_free(ev);
+	return master_tp;
+}
+
+static bool test_multi_tevent_threaded_1(struct torture_context *test,
+					const void *test_data)
+{
+	unsigned i;
+	struct tevent_context *master_ev;
+	struct tevent_thread_proxy *master_tp;
+	int ret;
+
+	talloc_disable_null_tracking();
+
+	/* Ugly global stuff. */
+	thread_test_ctx = test;
+	thread_counter = 0;
+
+	master_ev = tevent_context_init(NULL);
+	if (master_ev == NULL) {
+		return false;
+	}
+	tevent_set_debug_stderr(master_ev);
+
+	master_tp = tevent_thread_proxy_create(master_ev);
+	if (master_tp == NULL) {
+		torture_fail(test,
+			talloc_asprintf(test,
+				"tevent_thread_proxy_create failed\n"));
+		talloc_free(master_ev);
+		return false;
+	}
+
+	for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+		ret = pthread_create(&thread_map[i],
+				NULL,
+				thread_fn_1,
+				master_tp);
+		if (ret != 0) {
+			torture_fail(test,
+				talloc_asprintf(test,
+					"Failed to create thread %i, %d\n",
+					i, ret));
+			return false;
+		}
+	}
+
+	while (thread_counter < NUM_TEVENT_THREADS) {
+		ret = tevent_loop_once(master_ev);
+		torture_assert(test, ret == 0, "tevent_loop_once failed");
+	}
+
+	/* Wait for all the threads to finish - join 'em. */
+	for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+		void *retval;
+		ret = pthread_join(thread_map[i], &retval);
+		torture_assert(test, ret == 0, "pthread_join failed");
+		torture_assert(test, retval == master_tp, "thread failed");
+	}
+
+	talloc_free(master_ev);
+	return true;
+}
 #endif
 
 struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
@@ -966,6 +1150,11 @@ struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
 	torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded",
 					     test_multi_tevent_threaded,
 					     NULL);
+
+	torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_1",
+					     test_multi_tevent_threaded_1,
+					     NULL);
+
 #endif
 
 	return suite;
-- 
2.6.0.rc0.131.gf624c3d


From da0d8c09d47c8276587a97d25ebaa9c2d4b9181c Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Wed, 22 Jul 2015 11:52:06 -0700
Subject: [PATCH 4/4] lib: tevent: docs: Add tutorial on thread usage.

Signed-off-by: Jeremy Allison <jra at samba.org>
Signed-off-by: Ralph Boehme <slow at samba.org>
---
 lib/tevent/doc/tevent_thread.dox   | 309 +++++++++++++++++++++++++++++++++++++
 lib/tevent/doc/tevent_tutorial.dox |   2 +
 2 files changed, 311 insertions(+)
 create mode 100644 lib/tevent/doc/tevent_thread.dox

diff --git a/lib/tevent/doc/tevent_thread.dox b/lib/tevent/doc/tevent_thread.dox
new file mode 100644
index 0000000..c1c2f70
--- /dev/null
+++ b/lib/tevent/doc/tevent_thread.dox
@@ -0,0 +1,309 @@
+/**
+@page tevent_thread Chapter 6: Tevent with threads
+
+@section context Tevent with threads
+
+In order to use tevent with threads, you must first understand
+how to use the talloc library in threaded programs. For more
+information about working with talloc, please visit <a
+href="http://talloc.samba.org/">talloc website</a> where tutorial and
+documentation are located.
+
+If a tevent context structure is talloced from a NULL, thread-safe talloc
+context, then it can be safe to use in a threaded program. The function
+<code>talloc_disable_null_tracking()</code> <b>must</b> be called from the initial
+program thread before any talloc calls are made to ensure talloc is thread-safe.
+
+Each thread must create its own tevent context structure by calling
+<code>tevent_context_init(NULL)</code>, and no talloc memory contexts
+may be shared between threads.
+
+Separate threads using tevent in this way can communicate
+by writing data into file descriptors that are being monitored
+by a tevent context on another thread. For example (simplified
+with no error handling):
+
+@code
+Main thread:
+
+main()
+{
+	talloc_disable_null_tracking();
+
+	struct tevent_context *master_ev = tevent_context_init(NULL);
+	void *mem_ctx = talloc_new(master_ev);
+
+	// Create file descriptor to monitor.
+	int pipefds[2];
+
+	pipe(pipefds);
+
+	struct tevent_fd *fde = tevent_add_fd(master_ev,
+				mem_ctx,
+				pipefds[0], // read side of pipe
+				TEVENT_FD_READ,
+				pipe_read_handler, // callback function
+				private_data_pointer);
+
+	// Create sub thread, pass pipefds[1] write side of pipe to it.
+	// The above code not shown here..
+
+	// Process events.
+	tevent_loop_wait(master_ev);
+
+	// Cleanup if loop exits.
+	talloc_free(master_ev);
+}
+
+@endcode
+
+When the subthread writes to pipefds[1], the function
+<code>pipe_read_handler()</code> will be called in the main thread.
+
+@subsection subsec_threads More sophisticated use
+
+A popular way to use an event library within threaded programs
+is to allow a sub-thread to asynchronously schedule a tevent_immediate
+function call from the event loop of another thread. This can be built
+out of the basic functions and isolation mechanisms of tevent,
+but tevent also comes with some utility functions that make
+this easier, so long as you understand the limitations that
+using threads with talloc and tevent impose.
+
+To allow a tevent context to receive an asynchronous tevent_immediate
+function callback from another thread, create a struct tevent_thread_proxy *
+by calling @code
+
+struct tevent_thread_proxy *tevent_thread_proxy_create(
+                struct tevent_context *dest_ev_ctx);
+
+@endcode
+
+This function allocates the internal data structures to
+allow asynchronous callbacks as a talloc child of the
+struct tevent_context *, and returns a struct tevent_thread_proxy *
+that can be passed to another thread.
+
+When you have finished receiving asynchronous callbacks, simply
+talloc_free the struct tevent_thread_proxy *, or talloc_free
+the struct tevent_context *, which will deallocate the resources
+used.
+
+To schedule an asynchronous tevent_immediate function call from one
+thread on the tevent loop of another thread, use
+@code
+
+void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
+                                struct tevent_immediate **pp_im,
+                                tevent_immediate_handler_t handler,
+                                void *pp_private_data);
+
+@endcode
+
+This function causes the function <code>handler()</code>
+to be invoked as a tevent_immediate callback from the event loop
+of the thread that created the struct tevent_thread_proxy *
+(so the owning <code>struct tevent_context *</code> should be
+long-lived and not in the process of being torn down).
+
+The <code>struct tevent_immediate **pp_im</code> passed into this function
+should be a struct tevent_immediate * allocated on a talloc context
+local to this thread, and will be reparented via talloc_move
+to be owned by <code>struct tevent_thread_proxy *tp</code>.
+<code>*pp_im</code> will be set to NULL on successful scheduling
+of the tevent_immediate call.
+
+<code>handler()</code> will be called as a normal tevent_immediate
+callback from the <code>struct tevent_context *</code> of the destination
+event loop that created the <code>struct tevent_thread_proxy *</code>
+
+Returning from this function does not mean that the <code>handler</code>
+has been invoked, merely that it has been scheduled to be called in the
+destination event loop.
+
+Because the calling thread does not wait for the
+callback to be scheduled and run on the destination
+thread, this is a fire-and-forget call. If you wish
+confirmation of the <code>handler()</code> being
+successfully invoked, you must ensure it replies to the
+caller in some way.
+
+Because of the asynchronous nature of this call, the
+parameter passed to the destination thread has some
+restrictions. If you don't need parameters, merely pass
+<code>NULL</code> as the value of
+<code>pp_private_data</code>.
+
+If you wish to pass a pointer to data between the threads,
+it <b>MUST</b> be a pointer to a talloced pointer, which is
+not part of a talloc-pool, and it must not have a destructor
+attached. The ownership of the memory pointed to will
+be passed from the calling thread to the tevent library,
+and if the receiving thread does not talloc-reparent
+it to its own contexts, it will be freed once the
+<code>handler</code> is called.
+
+On success, <code>*pp_private_data</code> will be <code>NULL</code>
+to signify that the talloc memory ownership has been moved.
+
+In practice, for message passing between threads in
+event loops, these restrictions are not very onerous.
+
+The easiest way to do a request-reply pair between
+tevent loops on different threads is to pass the
+parameter block of memory back and forth using
+a reply <code>tevent_thread_proxy_schedule()</code>
+call.
+
+Here is an example (without error checking for
+simplicity):
+
+@code
+------------------------------------------------
+// Master thread.
+
+main()
+{
+	// Make talloc thread-safe.
+
+	talloc_disable_null_tracking();
+
+	// Create the master event context.
+
+	struct tevent_context *master_ev = tevent_context_init(NULL);
+
+	// Create the master thread proxy to allow it to receive
+	// async callbacks from other threads.
+
+	struct tevent_thread_proxy *master_tp =
+			tevent_thread_proxy_create(master_ev);
+
+	// Create sub-threads, passing master_tp in
+	// some way to them.
+	// This code not shown..
+
+	// Process events.
+	// Function master_callback() below
+	// will be invoked on this thread on
+	// master_ev event context.
+
+	tevent_loop_wait(master_ev);
+
+	// Cleanup if loop exits.
+
+	talloc_free(master_ev);
+}
+
+// Data passed between threads.
+struct reply_state {
+	struct tevent_thread_proxy *reply_tp;
+	pthread_t thread_id;
+	bool *p_finished;
+};
+
+// Callback Called in child thread context.
+
+static void thread_callback(struct tevent_context *ev,
+                                struct tevent_immediate *im,
+                                void *private_ptr)
+{
+	// Move the ownership of what private_ptr
+	// points to from the tevent library back to this thread.
+
+	struct reply_state *rsp =
+		talloc_get_type_abort(private_ptr, struct reply_state);
+
+	talloc_steal(ev, rsp);
+
+	*rsp->p_finished = true;
+
+	// im will be talloc_freed on return from this call.
+	// but rsp will not.
+}
+
+// Callback Called in master thread context.
+
+static void master_callback(struct tevent_context *ev,
+                                struct tevent_immediate *im,
+                                void *private_ptr)
+{
+	// Move the ownership of what private_ptr
+	// points to from the tevent library to this thread.
+
+	struct reply_state *rsp =
+		talloc_get_type_abort(private_ptr, struct reply_state);
+
+	talloc_steal(ev, rsp);
+
+	printf("Callback from thread %s\n", thread_id_to_string(rsp->thread_id));
+
+	/* Now reply to the thread ! */
+        tevent_thread_proxy_schedule(rsp->reply_tp,
+                                &im,
+                                thread_callback,
+                                &rsp);
+
+	// Note - rsp and im are now NULL as the tevent library
+	// owns the memory.
+}
+
+// Child thread.
+
+static void *thread_fn(void *private_ptr)
+{
+	struct tevent_thread_proxy *master_tp =
+		talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
+	bool finished = false;
+	int ret;
+
+	// Create our own event context.
+
+	struct tevent_context *ev = tevent_context_init(NULL);
+
+	// Create the local thread proxy to allow us to receive
+	// async callbacks from other threads.
+
+	struct tevent_thread_proxy *local_tp =
+			tevent_thread_proxy_create(ev);
+
+	// Setup the data to send.
+
+	struct reply_state *rsp = talloc(ev, struct reply_state);
+
+	rsp->reply_tp = local_tp;
+	rsp->thread_id = pthread_self();
+	rsp->p_finished = &finished;
+
+	// Create the immediate event to use.
+
+	struct tevent_immediate *im = tevent_create_immediate(ev);
+
+	// Call the master thread.
+
+	tevent_thread_proxy_schedule(master_tp,
+				&im,
+				master_callback,
+				&rsp);
+
+	// Note - rsp and im are now NULL as the tevent library
+	// owns the memory.
+
+	// Wait for the reply.
+
+	while (!finished) {
+		tevent_loop_once(ev);
+	}
+
+	// Cleanup.
+
+	talloc_free(ev);
+	return NULL;
+}
+
+@endcode
+
+Note this doesn't have to be master-to-subthread communication.
+Any thread that has access to the <code>struct tevent_thread_proxy *</code>
+pointer of another thread that has called <code>tevent_thread_proxy_create()
+</code> can send an async tevent_immediate request.
+*/
diff --git a/lib/tevent/doc/tevent_tutorial.dox b/lib/tevent/doc/tevent_tutorial.dox
index 9f01fa1..207a244 100644
--- a/lib/tevent/doc/tevent_tutorial.dox
+++ b/lib/tevent/doc/tevent_tutorial.dox
@@ -17,4 +17,6 @@ Tutorial describing working with tevent library.
 
 @subpage tevent_queue
 
+@subpage tevent_thread
+
 */
-- 
2.6.0.rc0.131.gf624c3d


