[PATCH] tevent and threads - infrastructure improvements - version #2

Jeremy Allison jra at samba.org
Fri Jul 24 17:16:15 UTC 2015


On Thu, Jul 23, 2015 at 04:50:37PM -0700, Jeremy Allison wrote:
> 
> FYI. I now have a working implementation of this
> API - passes valgrind memcheck and drd !
> 
> Hurrah for me :-).
> 
> Will post an updated patch once I've finished
> updating the tutorial.

Here it is. Passes valgrind --tool=drd and
valgrind --tool=memcheck.

Metze please let me know if this is what
you had in mind.

Everyone else just review :-).

Cheers,

	Jeremy.
-------------- next part --------------
From 0e63c14834b0204b67abf75afdbf4bf5c59e27df Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Thu, 23 Jul 2015 15:23:50 -0700
Subject: [PATCH 1/4] lib: tevent: Initial checkin of threaded tevent context
 calling code.

Adds 2 new functions:

struct tevent_thread_proxy *tevent_thread_proxy_create(
                struct tevent_context *dest_ev_ctx);

void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
		struct tevent_immediate **pp_im,
		tevent_immediate_handler_t handler,
		void *pp_private_data);

Brief doc included. Tests, docs and tutorial to follow.

Signed-off-by: Jeremy Allison <jra at samba.org>
---
 lib/tevent/tevent.h         |  48 ++++++
 lib/tevent/tevent_threads.c | 365 ++++++++++++++++++++++++++++++++++++++++++++
 lib/tevent/wscript          |   4 +-
 3 files changed, 415 insertions(+), 2 deletions(-)
 create mode 100644 lib/tevent/tevent_threads.c

diff --git a/lib/tevent/tevent.h b/lib/tevent/tevent.h
index b6c39d1..5c739d5 100644
--- a/lib/tevent/tevent.h
+++ b/lib/tevent/tevent.h
@@ -39,6 +39,7 @@ struct tevent_fd;
 struct tevent_timer;
 struct tevent_immediate;
 struct tevent_signal;
+struct tevent_thread_proxy;
 
 /**
  * @defgroup tevent The tevent API
@@ -1698,6 +1699,53 @@ typedef int (*tevent_nesting_hook)(struct tevent_context *ev,
 				   bool begin,
 				   void *stack_ptr,
 				   const char *location);
+
+/**
+ * @brief Create a tevent_thread_proxy for message passing between threads.
+ *
+ * The tevent_context must have been allocated on the NULL
+ * talloc context, and talloc_disable_null_tracking() must
+ * have been called.
+ *
+ * @param[in]  dest_ev_ctx      The tevent_context to receive events.
+ *
+ * @see tevent_thread_proxy_schedule()
+ */
+struct tevent_thread_proxy *tevent_thread_proxy_create(
+                struct tevent_context *dest_ev_ctx);
+
+/**
+ * @brief Schedule an immediate event on an event context from another thread.
+ *
+ * Causes dest_ev_ctx, being run by another thread, to receive an
+ * immediate event calling the handler with the *pp_private parameter.
+ *
+ * *pp_im must be a pointer to an immediate event talloced on a context owned
+ * by the calling thread, or the NULL context. Ownership will
+ * be transferred to the tevent library and *pp_im will be returned as NULL.
+ *
+ * *pp_private_data must be a talloced area of memory with no destructors.
+ * Ownership of this memory will be transferred to the tevent library and
+ * *pp_private_data will be set to NULL on successful completion of
+ * the call. Set pp_private to NULL if no parameter transfer
+ * needed (a pure callback). This is an asynchronous request, caller
+ * does not wait for callback to be completed before returning.
+ *
+ * @param[in]  tp               The tevent_thread_proxy to use.
+ *
+ * @param[in]  pp_im            Pointer to immediate event pointer.
+ *
+ * @param[in]  handler          The function that will be called.
+ *
+ * @param[in]  pp_private_data  The talloced memory to transfer.
+ *
+ * @see tevent_thread_proxy_create()
+ */
+void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
+				struct tevent_immediate **pp_im,
+				tevent_immediate_handler_t handler,
+				void *pp_private_data);
+
 #ifdef TEVENT_DEPRECATED
 #ifndef _DEPRECATED_
 #if (__GNUC__ >= 3) && (__GNUC_MINOR__ >= 1 )
diff --git a/lib/tevent/tevent_threads.c b/lib/tevent/tevent_threads.c
new file mode 100644
index 0000000..7db8f47
--- /dev/null
+++ b/lib/tevent/tevent_threads.c
@@ -0,0 +1,365 @@
+/*
+   tevent event library.
+
+   Copyright (C) Jeremy Allison 2015
+
+     ** NOTE! The following LGPL license applies to the tevent
+     ** library. This does NOT imply that all of Samba is released
+     ** under the LGPL
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 3 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "replace.h"
+#include "system/filesys.h"
+#include "talloc.h"
+#include "tevent.h"
+#include "tevent_internal.h"
+#include "tevent_util.h"
+
+#if defined(HAVE_PTHREAD)
+#include <pthread.h>
+
+struct tevent_immediate_list {
+	struct tevent_immediate_list *next, *prev;
+	tevent_immediate_handler_t handler;
+	struct tevent_immediate *im;
+	void *private_ptr;
+};
+
+struct tevent_thread_proxy {
+	pthread_mutex_t mutex;
+	struct tevent_context *dest_ev_ctx;
+	int read_fd;
+	int write_fd;
+	struct tevent_fd *pipe_read_fde;
+	/* Pending events list. */
+	struct tevent_immediate_list *im_list;
+	/* Completed events list. */
+	struct tevent_immediate_list *tofree_im_list;
+	struct tevent_immediate *free_im;
+};
+
+static void free_im_list(struct tevent_immediate_list **pp_list_head)
+{
+	struct tevent_immediate_list *im_entry;
+	struct tevent_immediate_list *im_next;
+
+	for(im_entry = *pp_list_head; im_entry; im_entry = im_next) {
+		im_next = im_entry->next;
+		DLIST_REMOVE(*pp_list_head, im_entry);
+		TALLOC_FREE(im_entry);
+	}
+}
+
+static void free_list_handler(struct tevent_context *ev,
+				struct tevent_immediate *im,
+				void *private_ptr)
+{
+	struct tevent_thread_proxy *tp =
+		(struct tevent_thread_proxy *)private_ptr;
+	int ret;
+
+	ret = pthread_mutex_lock(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+		return;
+	}
+
+	free_im_list(&tp->tofree_im_list);
+
+	ret = pthread_mutex_unlock(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+		return;
+	}
+}
+
+static void schedule_immediate_functions(struct tevent_thread_proxy *tp)
+{
+	struct tevent_immediate_list *im_entry;
+	struct tevent_immediate_list *im_next;
+
+	for(im_entry = tp->im_list; im_entry; im_entry = im_next) {
+		im_next = im_entry->next;
+		DLIST_REMOVE(tp->im_list, im_entry);
+
+		tevent_schedule_immediate(im_entry->im,
+					tp->dest_ev_ctx,
+					im_entry->handler,
+					im_entry->private_ptr);
+
+		/* Move from pending list to free list. */
+		DLIST_ADD(tp->tofree_im_list, im_entry);
+	}
+	if (tp->tofree_im_list != NULL) {
+		/*
+		 * Once the current immediate events
+		 * are processed, we need to reshedule
+		 * ourselves to free them. This works
+		 * as tevent_schedule_immediate()
+		 * always adds events to the *END* of
+		 * the immediate events list.
+		 */
+		tevent_schedule_immediate(tp->free_im,
+					tp->dest_ev_ctx,
+					free_list_handler,
+					tp);
+	}
+}
+
+static void pipe_read_handler(struct tevent_context *ev,
+				struct tevent_fd *fde,
+				uint16_t flags,
+				void *private_ptr)
+{
+	struct tevent_thread_proxy *tp =
+		(struct tevent_thread_proxy *)private_ptr;
+	ssize_t len = 64;
+	int ret;
+
+	ret = pthread_mutex_lock(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+		return;
+	}
+
+	/*
+	 * Clear out all data in the pipe. We
+	 * don't really care if this returns -1.
+	 */
+	while (len == 64) {
+		char buf[64];
+		len = read(tp->read_fd, buf, 64);
+	};
+
+	schedule_immediate_functions(tp);
+
+	ret = pthread_mutex_unlock(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+		return;
+	}
+}
+
+static int tevent_thread_proxy_destructor(struct tevent_thread_proxy *tp)
+{
+	int ret;
+
+	ret = pthread_mutex_lock(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+		return 0;
+	}
+
+	TALLOC_FREE(tp->pipe_read_fde);
+
+	if (tp->read_fd != -1) {
+		(void)close(tp->read_fd);
+		tp->read_fd = -1;
+	}
+	if (tp->write_fd != -1) {
+		(void)close(tp->write_fd);
+		tp->write_fd = -1;
+	}
+
+	/* Hmmm. It's probably an error if we get here with
+	   any non-NULL immediate entries.. */
+
+	free_im_list(&tp->im_list);
+	free_im_list(&tp->tofree_im_list);
+
+	TALLOC_FREE(tp->free_im);
+
+	ret = pthread_mutex_unlock(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+		return 0;
+	}
+
+	ret = pthread_mutex_destroy(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+		return 0;
+	}
+
+	return 0;
+}
+
+/*
+ * Create a struct that can be passed to other threads
+ * to allow them to signal the struct tevent_context *
+ * passed in.
+ */
+
+struct tevent_thread_proxy *tevent_thread_proxy_create(
+		struct tevent_context *dest_ev_ctx)
+{
+	int ret;
+	int pipefds[2];
+	struct tevent_thread_proxy *tp = talloc_zero(dest_ev_ctx,
+					struct tevent_thread_proxy);
+	if (tp == NULL) {
+		return NULL;
+	}
+
+	ret = pthread_mutex_init(&tp->mutex, NULL);
+	if (ret != 0) {
+		goto fail;
+	}
+
+	tp->dest_ev_ctx = dest_ev_ctx;
+	tp->read_fd = -1;
+	tp->write_fd = -1;
+
+	talloc_set_destructor(tp, tevent_thread_proxy_destructor);
+
+	ret = pipe(pipefds);
+	if (ret == -1) {
+		goto fail;
+	}
+
+	tp->read_fd = pipefds[0];
+	tp->write_fd = pipefds[1];
+
+	ret = ev_set_blocking(pipefds[0], false);
+	if (ret != 0) {
+		goto fail;
+	}
+	ret = ev_set_blocking(pipefds[1], false);
+	if (ret != 0) {
+		goto fail;
+	}
+	if (!ev_set_close_on_exec(pipefds[0])) {
+		goto fail;
+	}
+	if (!ev_set_close_on_exec(pipefds[1])) {
+		goto fail;
+	}
+
+	tp->pipe_read_fde = tevent_add_fd(dest_ev_ctx,
+				tp,
+				tp->read_fd,
+				TEVENT_FD_READ,
+				pipe_read_handler,
+				tp);
+
+	/*
+	 * Create an immediate event to free
+	 * completed lists.
+	 */
+	tp->free_im = tevent_create_immediate(tp);
+	if (tp->free_im == NULL) {
+		goto fail;
+	}
+
+	return tp;
+
+  fail:
+
+	TALLOC_FREE(tp);
+	return NULL;
+}
+
+/*
+ * This function schedules an immediate event to be called with argument
+ * *pp_private in the thread context of dest_ev_ctx. Caller doesn't
+ * wait for activation to take place, this is simply fire-and-forget.
+ *
+ * pp_im must be a pointer to an immediate event talloced on
+ * a context owned by the calling thread, or the NULL context.
+ * Ownership of *pp_im will be transfered to the tevent library.
+ *
+ * pp_private can be null, or contents of *pp_private must be
+ * talloc'ed memory on a context owned by the calling thread
+ * or the NULL context. If non-null, ownership of *pp_private will
+ * be transfered to the tevent library.
+ *
+ * If you want to return a message, have the destination use the
+ * same function call to send back to the caller.
+ */
+
+
+void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
+				struct tevent_immediate **pp_im,
+				tevent_immediate_handler_t handler,
+				void *pp_private_data)
+{
+	struct tevent_immediate_list *im_entry;
+	int ret;
+	char c;
+
+	ret = pthread_mutex_lock(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+		return;
+	}
+
+	if (tp->write_fd == -1) {
+		/* In the process of being destroyed. Ignore. */
+		goto end;
+	}
+
+	/* Create a new immediate_list entry. MUST BE ON THE NULL CONTEXT */
+	im_entry = talloc_zero(NULL, struct tevent_immediate_list);
+	if (im_entry == NULL) {
+		goto end;
+	}
+
+	im_entry->handler = handler;
+	im_entry->im = talloc_move(im_entry, pp_im);
+
+	if (pp_private_data != NULL) {
+		void **pptr = (void **)pp_private_data;
+		im_entry->private_ptr = talloc_move(im_entry, pptr);
+	}
+
+	DLIST_ADD(tp->im_list, im_entry);
+
+	/* And notify the dest_ev_ctx to wake up. */
+	c = '\0';
+	(void)write(tp->write_fd, &c, 1);
+
+  end:
+
+	ret = pthread_mutex_unlock(&tp->mutex);
+	if (ret != 0) {
+		abort();
+		/* Notreached. */
+	}
+}
+#else
+/* !HAVE_PTHREAD */
+struct tevent_thread_proxy *tevent_thread_proxy_create(
+		struct tevent_context *dest_ev_ctx)
+{
+	return NULL;
+}
+
+void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
+				struct tevent_immediate **pp_im,
+				tevent_immediate_handler_t handler,
+				void *pp_private_data)
+{
+	;
+}
+#endif
diff --git a/lib/tevent/wscript b/lib/tevent/wscript
index 827094c..4c5fe0c 100755
--- a/lib/tevent/wscript
+++ b/lib/tevent/wscript
@@ -1,7 +1,7 @@
 #!/usr/bin/env python
 
 APPNAME = 'tevent'
-VERSION = '0.9.25'
+VERSION = '0.9.26'
 
 blddir = 'bin'
 
@@ -83,7 +83,7 @@ def build(bld):
 
     SRC = '''tevent.c tevent_debug.c tevent_fd.c tevent_immediate.c
              tevent_queue.c tevent_req.c tevent_select.c
-             tevent_poll.c
+             tevent_poll.c tevent_threads.c
              tevent_signal.c tevent_standard.c tevent_timed.c tevent_util.c tevent_wakeup.c'''
 
     if bld.CONFIG_SET('HAVE_EPOLL'):
-- 
2.5.0.rc2.392.g76e840b


From 3308d03acf4277ab7a3e75de0abaafa4ebf21b49 Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Fri, 24 Jul 2015 08:50:31 -0700
Subject: [PATCH 2/4] lib: tevent: Initial test of tevent threaded context
 code.

Signed-off-by: Jeremy Allison <jra at samba.org>
---
 lib/tevent/testsuite.c | 124 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 124 insertions(+)

diff --git a/lib/tevent/testsuite.c b/lib/tevent/testsuite.c
index c63c878..bf769e0 100644
--- a/lib/tevent/testsuite.c
+++ b/lib/tevent/testsuite.c
@@ -808,6 +808,126 @@ static bool test_event_context_threaded(struct torture_context *test,
 	return true;
 }
 
+#define NUM_TEVENT_THREADS 100
+
+/* Ugly, but needed for torture_comment... */
+static struct torture_context *thread_test_ctx;
+static pthread_t thread_map[NUM_TEVENT_THREADS];
+static unsigned thread_counter;
+
+/* Called in master thread context */
+static void callback_nowait(struct tevent_context *ev,
+				struct tevent_immediate *im,
+				void *private_ptr)
+{
+	pthread_t *thread_id_ptr = (pthread_t *)private_ptr;
+	unsigned i;
+
+	for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+		if (pthread_equal(*thread_id_ptr,
+				thread_map[i])) {
+			break;
+		}
+	}
+	torture_comment(thread_test_ctx,
+			"Callback %u from thread %u\n",
+			thread_counter,
+			i);
+	thread_counter++;
+}
+
+/* Blast the master tevent_context with a callback, no waiting. */
+static void *thread_fn_nowait(void *private_ptr)
+{
+	struct tevent_thread_proxy *master_tp =
+			(struct tevent_thread_proxy *)private_ptr;
+	struct tevent_immediate *im;
+	pthread_t *thread_id_ptr;
+
+	im = tevent_create_immediate(NULL);
+	if (im == NULL) {
+		return NULL;
+	}
+	thread_id_ptr = talloc(NULL, pthread_t);
+	if (thread_id_ptr == NULL) {
+		return NULL;
+	}
+	*thread_id_ptr = pthread_self();
+
+	tevent_thread_proxy_schedule(master_tp,
+				&im,
+				callback_nowait,
+				&thread_id_ptr);
+	return NULL;
+}
+
+static void timeout_fn(struct tevent_context *ev,
+			struct tevent_timer *te,
+			struct timeval tv, void *p)
+{
+	thread_counter = NUM_TEVENT_THREADS * 10;
+}
+
+static bool test_multi_tevent_threaded(struct torture_context *test,
+					const void *test_data)
+{
+	unsigned i;
+	struct tevent_context *master_ev;
+	struct tevent_thread_proxy *tp;
+
+	talloc_disable_null_tracking();
+
+	/* Ugly global stuff. */
+	thread_test_ctx = test;
+	thread_counter = 0;
+
+	master_ev = tevent_context_init(NULL);
+	if (master_ev == NULL) {
+		return false;
+	}
+	tevent_set_debug_stderr(master_ev);
+
+	tp = tevent_thread_proxy_create(master_ev);
+	if (tp == NULL) {
+		torture_fail(test,
+			talloc_asprintf(test,
+				"tevent_thread_proxy_create failed\n"));
+		talloc_free(master_ev);
+		return false;
+	}
+
+	for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+		int ret = pthread_create(&thread_map[i],
+				NULL,
+				thread_fn_nowait,
+				tp);
+		if (ret != 0) {
+			torture_fail(test,
+				talloc_asprintf(test,
+					"Failed to create thread %i, %d\n",
+					i, ret));
+			return false;
+		}
+	}
+
+	/* Ensure we don't wait more than 10 seconds. */
+	tevent_add_timer(master_ev,
+			master_ev,
+			timeval_current_ofs(10,0),
+			timeout_fn,
+			NULL);
+
+	while (thread_counter < NUM_TEVENT_THREADS) {
+		int ret = tevent_loop_once(master_ev);
+		torture_assert(test, ret == 0, "tevent_loop_once failed");
+	}
+
+	torture_assert(test, thread_counter == NUM_TEVENT_THREADS,
+		"thread_counter fail\n");
+
+	talloc_free(master_ev);
+	return true;
+}
 #endif
 
 struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
@@ -841,6 +961,10 @@ struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
 	torture_suite_add_simple_tcase_const(suite, "threaded_poll_mt",
 					     test_event_context_threaded,
 					     NULL);
+
+	torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded",
+					     test_multi_tevent_threaded,
+					     NULL);
 #endif
 
 	return suite;
-- 
2.5.0.rc2.392.g76e840b


From b4c40ad42562dc8c1b1c312d195d8caaad14bfd1 Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Fri, 24 Jul 2015 09:27:21 -0700
Subject: [PATCH 3/4] lib: tevent: tests: Add a second thread test that does
 request/reply.

Both tests run cleanly with valgrind --tool=drd.

Signed-off-by: Jeremy Allison <jra at samba.org>
---
 lib/tevent/testsuite.c | 185 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 185 insertions(+)

diff --git a/lib/tevent/testsuite.c b/lib/tevent/testsuite.c
index bf769e0..a43468d 100644
--- a/lib/tevent/testsuite.c
+++ b/lib/tevent/testsuite.c
@@ -928,6 +928,186 @@ static bool test_multi_tevent_threaded(struct torture_context *test,
 	talloc_free(master_ev);
 	return true;
 }
+
+struct reply_state {
+	struct tevent_thread_proxy *reply_tp;
+	pthread_t thread_id;
+	int *p_finished;
+};
+
+static void thread_timeout_fn(struct tevent_context *ev,
+			struct tevent_timer *te,
+			struct timeval tv, void *p)
+{
+	int *p_finished = (int *)p;
+
+	*p_finished = 2;
+}
+
+/* Called in child-thread context */
+static void thread_callback(struct tevent_context *ev,
+				struct tevent_immediate *im,
+				void *private_ptr)
+{
+	struct reply_state *rsp =
+		(struct reply_state *)talloc_move(ev, &private_ptr);
+	*rsp->p_finished = 1;
+}
+
+/* Called in master thread context */
+static void master_callback(struct tevent_context *ev,
+				struct tevent_immediate *im,
+				void *private_ptr)
+{
+	struct reply_state *rsp =
+		(struct reply_state *)talloc_move(ev, &private_ptr);
+	unsigned i;
+
+	for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+		if (pthread_equal(rsp->thread_id,
+				thread_map[i])) {
+			break;
+		}
+	}
+	torture_comment(thread_test_ctx,
+			"Callback %u from thread %u\n",
+			thread_counter,
+			i);
+	thread_counter++;
+
+	/* Now reply to the thread ! */
+	tevent_thread_proxy_schedule(rsp->reply_tp,
+				&im,
+				thread_callback,
+				&rsp);
+}
+
+static void *thread_fn_1(void *private_ptr)
+{
+	struct tevent_thread_proxy *master_tp =
+			(struct tevent_thread_proxy *)private_ptr;
+	struct tevent_thread_proxy *tp;
+	struct tevent_immediate *im;
+	struct tevent_context *ev;
+	struct reply_state *rsp;
+	int finished = 0;
+	int ret;
+
+	ev = tevent_context_init(NULL);
+	if (ev == NULL) {
+		return NULL;
+	}
+
+	tp = tevent_thread_proxy_create(ev);
+	if (tp == NULL) {
+		talloc_free(ev);
+		return NULL;
+	}
+
+	im = tevent_create_immediate(ev);
+	if (im == NULL) {
+		talloc_free(ev);
+		return NULL;
+	}
+
+	rsp = talloc(ev, struct reply_state);
+	if (rsp == NULL) {
+		talloc_free(ev);
+		return NULL;
+	}
+
+	rsp->thread_id = pthread_self();
+	rsp->reply_tp = tp;
+	rsp->p_finished = &finished;
+
+	/* Introduce a little randomness into the mix.. */
+	usleep(random() % 7000);
+
+	tevent_thread_proxy_schedule(master_tp,
+				&im,
+				master_callback,
+				&rsp);
+
+	/* Ensure we don't wait more than 10 seconds. */
+	tevent_add_timer(ev,
+			ev,
+			timeval_current_ofs(10,0),
+			thread_timeout_fn,
+			&finished);
+
+	while (finished == 0) {
+		ret = tevent_loop_once(ev);
+		assert(ret == 0);
+	}
+
+	if (finished > 1) {
+		/* Timeout ! */
+		abort();
+	}
+
+	talloc_free(ev);
+	return master_tp;
+}
+
+static bool test_multi_tevent_threaded_1(struct torture_context *test,
+					const void *test_data)
+{
+	unsigned i;
+	struct tevent_context *master_ev;
+	struct tevent_thread_proxy *master_tp;
+	int ret;
+
+	talloc_disable_null_tracking();
+
+	/* Ugly global stuff. */
+	thread_test_ctx = test;
+	thread_counter = 0;
+
+	master_ev = tevent_context_init(NULL);
+	if (master_ev == NULL) {
+		return false;
+	}
+	tevent_set_debug_stderr(master_ev);
+
+	master_tp = tevent_thread_proxy_create(master_ev);
+	if (master_tp == NULL) {
+		torture_fail(test,
+			talloc_asprintf(test,
+				"tevent_thread_proxy_create failed\n"));
+		talloc_free(master_ev);
+		return false;
+	}
+
+	for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+		ret = pthread_create(&thread_map[i],
+				NULL,
+				thread_fn_1,
+				master_tp);
+		if (ret != 0) {
+			torture_fail(test,
+				talloc_asprintf(test,
+					"Failed to create thread %i, %d\n",
+					i, ret));
+				return false;
+		}
+	}
+
+	while (thread_counter < NUM_TEVENT_THREADS) {
+		ret = tevent_loop_once(master_ev);
+		torture_assert(test, ret == 0, "tevent_loop_once failed");
+	}
+
+	/* Wait for all the threads to finish - join 'em. */
+	for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+		void *retval;
+		ret = pthread_join(thread_map[i], &retval);
+		torture_assert(test, ret == 0, "pthread_join failed");
+		torture_assert(test, retval == master_tp, "thread failed");
+	}
+
+	talloc_free(master_ev);
+	return true;
+}
 #endif
 
 struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
@@ -965,6 +1145,11 @@ struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
 	torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded",
 					     test_multi_tevent_threaded,
 					     NULL);
+
+	torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_1",
+					     test_multi_tevent_threaded_1,
+					     NULL);
+
 #endif
 
 	return suite;
-- 
2.5.0.rc2.392.g76e840b


From a305404821f0625e404f0d3bbeda57b8755725b6 Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Wed, 22 Jul 2015 11:52:06 -0700
Subject: [PATCH 4/4] lib: tevent: docs: Add tutorial on thread usage.

Signed-off-by: Jeremy Allison <jra at samba.org>
---
 lib/tevent/doc/tevent_thread.dox   | 304 +++++++++++++++++++++++++++++++++++++
 lib/tevent/doc/tevent_tutorial.dox |   2 +
 2 files changed, 306 insertions(+)
 create mode 100644 lib/tevent/doc/tevent_thread.dox

diff --git a/lib/tevent/doc/tevent_thread.dox b/lib/tevent/doc/tevent_thread.dox
new file mode 100644
index 0000000..91d3f21
--- /dev/null
+++ b/lib/tevent/doc/tevent_thread.dox
@@ -0,0 +1,304 @@
+/**
+ at page tevent_context Chapter 6: Tevent with threads
+
+ at section context Tevent with threads
+
+In order to use tevent with threads, you must first understand
+how to use the talloc library in threaded programs. For more
+information about working with talloc, please visit <a
+href="http://talloc.samba.org/">talloc website</a> where tutorial and
+documentation are located.
+
+If a tevent context structure is talloced from a NULL, thread-safe talloc
+context, then it can be safe to use in a threaded program. The function
+<code>talloc_disable_null_tracking()</code> <b>must</b> be called from the initial
+program thread before any talloc calls are made to ensure talloc is thread-safe.
+
+Each thread must create it's own tevent context structure as follows
+<code>tevent_context_init(NULL)</code> and no talloc memory contexts
+can be shared between threads.
+
+Separate threads using tevent in this way can communicate
+by writing data into file descriptors that are being monitored
+by a tevent context on another thread. For example (simplified
+with no error handling):
+
+ at code
+Main thread:
+
+main()
+{
+	talloc_disable_null_tracking();
+
+	struct tevent_context *master_ev = tevent_context_init(NULL);
+	void *mem_ctx = talloc_new(master_ev);
+
+	// Create file descriptor to monitor.
+	int pipefds[2];
+
+	pipe(pipefds);
+
+	struct tevent_fd *fde = tevent_add_fd(master_ev,
+				mem_ctx,
+				pipefds[0], // read side of pipe
+				TEVENT_FD_READ,
+				pipe_read_handler, // callback function
+				private_data_pointer);
+
+	// Create sub thread, pass pipefds[1] write side of pipe to it.
+	// The above code not shown here..
+
+	// Process events.
+	tevent_loop_wait(master_ev);
+
+	// Cleanup if loop exits.
+	talloc_free(master_ev);
+}
+
+ at endcode
+
+When the subthread writes to pipefds[1], the function
+<code>pipe_read_handler()</code> will be called in the main thread.
+
+ at subsection More sophisticated use
+
+A popular way to use an event library within threaded programs
+is to allow a sub-thread to asynchronously schedule a tevent_immediate
+function call from the event loop of another thread. This can be built
+out of the basic functions and isolation mechanisms of tevent,
+but tevent also comes with some utility functions that make
+this easier, so long as you understand the limitations that
+using threads with talloc and tevent impose.
+
+To allow a tevent context to receive an asynchronous tevent_immediate
+function callback from another thread, create a struct tevent_thread_proxy *
+by calling @code
+
+struct tevent_thread_proxy *tevent_thread_proxy_create(
+                struct tevent_context *dest_ev_ctx);
+
+ at endcode
+
+This function allocates the internal data structures to
+allow asynchronous callbacks as a talloc child of the
+struct tevent_context *, and returns a struct tevent_thread_proxy *
+that can be passed to another thread.
+
+When you have finished receiving asynchronous callbacks, simply
+talloc_free the struct tevent_thread_proxy *, or talloc_free
+the struct tevent_context *, which will deallocate the resources
+used.
+
+To schedule an asynchronous tevent_immediate function call from one
+thread on the tevent loop of another thread, use
+ at code
+
+void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
+                                struct tevent_immediate **pp_im,
+                                tevent_immediate_handler_t handler,
+                                void **pp_private_data);
+
+ at endcode
+
+This function causes the function <code>handler()</code>
+to be invoked as a tevent_immediate callback from the event loop
+of the thread that created the struct tevent_thread_proxy *
+(so the owning <code>struct tevent_context *</code> should be
+long-lived and not in the process of being torn down).
+
+The <code>struct tevent_immediate **pp_im</code> passed into this function
+should be a struct tevent_immediate * allocated on a talloc context
+local to this thread, and will be reparented via talloc_move
+to be owned by <code>struct tevent_thread_proxy *tp</code>.
+<code>*pp_im</code> will be set to NULL on successful scheduling
+of the tevent_immediate call.
+
+<code>handler()</code> will be called as a normal tevent_immediate
+callback from the <code>struct tevent_context *</code> of the destination
+event loop that created the <code>struct tevent_thread_proxy *</code>
+
+Returning from this functions does not mean that the <code>handler</code>
+has been invoked, merely that it has been scheduled to be called in the
+destination event loop.
+
+Because the calling thread does not wait for the
+callback to be scheduled and run on the destination
+thread, this is a fire-and-forget call. If you wish
+confirmation of the <code>handler()</code> being
+successfully invoked, you must ensure it replies to the
+caller in some way.
+
+Because of asynchronous nature of this call, the nature
+of the parameter passed to the destination thread has some
+restructions. If you don't need parameters, merely pass
+<code>NULL</code> as the value of
+<code>void **pp_private_data</code>.
+
+If you wish to pass a pointer to data between the threads,
+it <b>MUST</b> be a pointer to a talloced pointer, which is
+not part of a talloc-pool, and it must not have a destructor
+attached. The ownership of the memory pointed to will
+be passed from the calling thread to the tevent library,
+and if the receiving thread does not talloc-reparent
+it to its own contexts, it will be freed once the
+<code>handler</code> is called.
+
+On success, <code>*pp_private</code> will be <code>NULL</code>
+to signify the talloc memory ownership has been moved.
+
+In practice for message passing between threads in
+event loops these restrictions are not very onerous.
+
+The easiest way to to a request-reply pair between
+tevent loops on different threads is to pass the
+parameter block of memory back and forth using
+a reply <code>tevent_thread_proxy_schedule()</code>
+call.
+
+Here is an example (without error checking for
+simplicity):
+
+ at code
+------------------------------------------------
+// Master thread.
+
+main()
+{
+	// Make talloc thread-safe.
+
+	talloc_disable_null_tracking();
+
+	// Create the master event context.
+
+	struct tevent_context *master_ev = tevent_context_init(NULL);
+
+	// Create the master thread proxy to allow it to receive
+	// async callbacks from other threads.
+
+	struct tevent_thread_proxy *master_tp =
+			tevent_thread_proxy_create(master_ev);
+
+	// Create sub-threads, passing master_tp in
+	// some way to them.
+	// This code not shown..
+
+	// Process events.
+	// Function master_callback() below
+	// will be invoked on this thread on
+	// master_ev event context.
+
+	tevent_loop_wait(master_ev);
+
+	// Cleanup if loop exits.
+
+	talloc_free(master_ev);
+}
+
+// Data passed between threads.
+struct reply_state {
+	struct tevent_thread_proxy *reply_tp;
+	pthread_t thread_id;
+	bool *p_finished;
+};
+
+// Callback Called in child thread context.
+
+static void thread_callback(struct tevent_context *ev,
+                                struct tevent_immediate *im,
+                                void *private_ptr)
+{
+	// Move the ownership of what private_ptr
+	// points to from the tevent library back to this thread.
+
+	struct reply_state *rsp =
+		(struct reply_state *)talloc_move(ev, &private_ptr);
+	*rsp->p_finished = true;
+
+	// im will be talloc_freed on return from this call.
+	// but rsp will not.
+}
+
+// Callback Called in master thread context.
+
+static void master_callback(struct tevent_context *ev,
+                                struct tevent_immediate *im,
+                                void *private_ptr)
+{
+	// Move the ownership of what private_ptr
+	// points to from the tevent library to this thread.
+
+	struct reply_state *rsp =
+		(struct reply_state *)talloc_move(ev, &private_ptr);
+	}
+
+	printf("Callback from thread %s\n", thread_id_to_string(rsp->thread_id));
+
+	/* Now reply to the thread ! */
+        tevent_thread_proxy_schedule(rsp->reply_tp,
+                                &im,
+                                thread_callback,
+                                &rsp);
+
+	// Note - rsp and im are now NULL as the tevent library
+	// owns the memory.
+}
+
+// Child thread.
+
+static void *thread_fn(void *private_ptr)
+{
+	struct tevent_thread_proxy *master_tp = (struct tevent_thread_proxy *)private_ptr;
+	bool finished = false;
+	int ret;
+
+	// Create our own event context.
+
+	struct tevent_context *ev = tevent_context_init(NULL);
+
+	// Create the local thread proxy to allow us to receive
+	// async callbacks from other threads.
+
+	struct tevent_thread_proxy *local_tp =
+			tevent_thread_proxy_create(master_ev);
+
+	// Setup the data to send.
+
+	struct reply_state *rsp = talloc(ev, struct reply_state);
+
+	rsp->reply_tp = local_tp;
+	rsp->thread_id = pthread_self();
+	rsp->p_finished = &finished;
+
+	// Create the immediate event to use.
+
+	struct tevent_immediate *im = tevent_create_immediate(ev);
+
+	// Call the master thread.
+
+	tevent_thread_proxy_schedule(master_tp,
+				&im,
+				master_callback,
+				&rsp);
+
+	// Note - rsp and im are now NULL as the tevent library
+	// owns the memory.
+
+	// Wait for the reply.
+
+	while (!finished) {
+		tevent_loop_once(ev);
+	}
+
+	// Cleanup.
+
+	talloc_free(ev);
+	return NULL;
+}
+
+ at endcode
+
+Note this doesn't have to be a master-subthread communication.
+Any thread that has access to the <code>struct tevent_thread_proxy *</code>
+pointer of another thread that has called <code>tevent_thread_proxy_create()
+</code> can send an async tevent_immediate request.
+*/
diff --git a/lib/tevent/doc/tevent_tutorial.dox b/lib/tevent/doc/tevent_tutorial.dox
index 9f01fa1..207a244 100644
--- a/lib/tevent/doc/tevent_tutorial.dox
+++ b/lib/tevent/doc/tevent_tutorial.dox
@@ -17,4 +17,6 @@ Tutorial describing working with tevent library.
 
 @subpage tevent_queue
 
+ at subpage tevent_thread
+
 */
-- 
2.5.0.rc2.392.g76e840b



More information about the samba-technical mailing list