[PATCH] tevent and threads - infrastructure improvements - version #2
Jeremy Allison
jra at samba.org
Tue Sep 15 17:26:56 UTC 2015
On Tue, Sep 15, 2015 at 09:46:30AM -0700, Jeremy Allison wrote:
>
> Yes, that's a good idea. I'll update the docs to add this and
> post a new patchset so we have something complete to discuss
> next week at SDC.
New patchset with expanded comments and expanded docs explaining
the issue.
Cheers,
Jeremy.
-------------- next part --------------
From 1386ba1464bc9ccd2b2925e179580b28dcecbad3 Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Thu, 23 Jul 2015 15:23:50 -0700
Subject: [PATCH 1/4] lib: tevent: Initial checkin of threaded tevent context
calling code.
Adds 2 new functions:
struct tevent_thread_proxy *tevent_thread_proxy_create(
struct tevent_context *dest_ev_ctx);
void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
struct tevent_immediate **pp_im,
tevent_immediate_handler_t handler,
void *pp_private_data);
Brief doc included. Tests, docs and tutorial to follow.
Signed-off-by: Jeremy Allison <jra at samba.org>
Signed-off-by: Ralph Boehme <slow at samba.org>
---
lib/tevent/tevent.h | 48 ++++++
lib/tevent/tevent_threads.c | 365 ++++++++++++++++++++++++++++++++++++++++++++
lib/tevent/wscript | 4 +-
3 files changed, 415 insertions(+), 2 deletions(-)
create mode 100644 lib/tevent/tevent_threads.c
diff --git a/lib/tevent/tevent.h b/lib/tevent/tevent.h
index b6c39d1..5c739d5 100644
--- a/lib/tevent/tevent.h
+++ b/lib/tevent/tevent.h
@@ -39,6 +39,7 @@ struct tevent_fd;
struct tevent_timer;
struct tevent_immediate;
struct tevent_signal;
+struct tevent_thread_proxy;
/**
* @defgroup tevent The tevent API
@@ -1698,6 +1699,53 @@ typedef int (*tevent_nesting_hook)(struct tevent_context *ev,
bool begin,
void *stack_ptr,
const char *location);
+
+/**
+ * @brief Create a tevent_thread_proxy for message passing between threads.
+ *
+ * The tevent_context must have been allocated on the NULL
+ * talloc context, and talloc_disable_null_tracking() must
+ * have been called.
+ *
+ * @param[in] dest_ev_ctx The tevent_context to receive events.
+ *
+ * @see tevent_thread_proxy_schedule()
+ */
+struct tevent_thread_proxy *tevent_thread_proxy_create(
+ struct tevent_context *dest_ev_ctx);
+
+/**
+ * @brief Schedule an immediate event on an event context from another thread.
+ *
+ * Causes dest_ev_ctx, being run by another thread, to receive an
+ * immediate event calling the handler with the *pp_private_data parameter.
+ *
+ * *pp_im must be a pointer to an immediate event talloced on a context owned
+ * by the calling thread, or the NULL context. Ownership will
+ * be transferred to the tevent library and *pp_im will be returned as NULL.
+ *
+ * *pp_private_data must be a talloced area of memory with no destructors.
+ * Ownership of this memory will be transferred to the tevent library and
+ * *pp_private_data will be set to NULL on successful completion of
+ * the call. Set pp_private_data to NULL if no parameter transfer is
+ * needed (a pure callback). This is an asynchronous request; the caller
+ * does not wait for the callback to be completed before returning.
+ *
+ * @param[in] tp The tevent_thread_proxy to use.
+ *
+ * @param[in] pp_im Pointer to immediate event pointer.
+ *
+ * @param[in] handler The function that will be called.
+ *
+ * @param[in] pp_private_data The talloced memory to transfer.
+ *
+ * @see tevent_thread_proxy_create()
+ */
+void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
+ struct tevent_immediate **pp_im,
+ tevent_immediate_handler_t handler,
+ void *pp_private_data);
+
#ifdef TEVENT_DEPRECATED
#ifndef _DEPRECATED_
#if (__GNUC__ >= 3) && (__GNUC_MINOR__ >= 1 )
diff --git a/lib/tevent/tevent_threads.c b/lib/tevent/tevent_threads.c
new file mode 100644
index 0000000..78e4fe2
--- /dev/null
+++ b/lib/tevent/tevent_threads.c
@@ -0,0 +1,365 @@
+/*
+ tevent event library.
+
+ Copyright (C) Jeremy Allison 2015
+
+ ** NOTE! The following LGPL license applies to the tevent
+ ** library. This does NOT imply that all of Samba is released
+ ** under the LGPL
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 3 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "replace.h"
+#include "system/filesys.h"
+#include "talloc.h"
+#include "tevent.h"
+#include "tevent_internal.h"
+#include "tevent_util.h"
+
+#if defined(HAVE_PTHREAD)
+#include <pthread.h>
+
+struct tevent_immediate_list {
+ struct tevent_immediate_list *next, *prev;
+ tevent_immediate_handler_t handler;
+ struct tevent_immediate *im;
+ void *private_ptr;
+};
+
+struct tevent_thread_proxy {
+ pthread_mutex_t mutex;
+ struct tevent_context *dest_ev_ctx;
+ int read_fd;
+ int write_fd;
+ struct tevent_fd *pipe_read_fde;
+ /* Pending events list. */
+ struct tevent_immediate_list *im_list;
+ /* Completed events list. */
+ struct tevent_immediate_list *tofree_im_list;
+ struct tevent_immediate *free_im;
+};
+
+static void free_im_list(struct tevent_immediate_list **pp_list_head)
+{
+ struct tevent_immediate_list *im_entry;
+ struct tevent_immediate_list *im_next;
+
+ for(im_entry = *pp_list_head; im_entry; im_entry = im_next) {
+ im_next = im_entry->next;
+ DLIST_REMOVE(*pp_list_head, im_entry);
+ TALLOC_FREE(im_entry);
+ }
+}
+
+static void free_list_handler(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_ptr)
+{
+ struct tevent_thread_proxy *tp =
+ talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
+ int ret;
+
+ ret = pthread_mutex_lock(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ return;
+ }
+
+ free_im_list(&tp->tofree_im_list);
+
+ ret = pthread_mutex_unlock(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ return;
+ }
+}
+
+static void schedule_immediate_functions(struct tevent_thread_proxy *tp)
+{
+ struct tevent_immediate_list *im_entry;
+ struct tevent_immediate_list *im_next;
+
+ for(im_entry = tp->im_list; im_entry; im_entry = im_next) {
+ im_next = im_entry->next;
+ DLIST_REMOVE(tp->im_list, im_entry);
+
+ tevent_schedule_immediate(im_entry->im,
+ tp->dest_ev_ctx,
+ im_entry->handler,
+ im_entry->private_ptr);
+
+ /* Move from pending list to free list. */
+ DLIST_ADD(tp->tofree_im_list, im_entry);
+ }
+ if (tp->tofree_im_list != NULL) {
+ /*
+ * Once the current immediate events
+ * are processed, we need to reschedule
+ * ourselves to free them. This works
+ * as tevent_schedule_immediate()
+ * always adds events to the *END* of
+ * the immediate events list.
+ */
+ tevent_schedule_immediate(tp->free_im,
+ tp->dest_ev_ctx,
+ free_list_handler,
+ tp);
+ }
+}
+
+static void pipe_read_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags,
+ void *private_ptr)
+{
+ struct tevent_thread_proxy *tp =
+ talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
+ ssize_t len = 64;
+ int ret;
+
+ ret = pthread_mutex_lock(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ return;
+ }
+
+ /*
+ * Clear out all data in the pipe. We
+ * don't really care if this returns -1.
+ */
+ while (len == 64) {
+ char buf[64];
+ len = read(tp->read_fd, buf, 64);
+ };
+
+ schedule_immediate_functions(tp);
+
+ ret = pthread_mutex_unlock(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ return;
+ }
+}
+
+static int tevent_thread_proxy_destructor(struct tevent_thread_proxy *tp)
+{
+ int ret;
+
+ ret = pthread_mutex_lock(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ return 0;
+ }
+
+ TALLOC_FREE(tp->pipe_read_fde);
+
+ if (tp->read_fd != -1) {
+ (void)close(tp->read_fd);
+ tp->read_fd = -1;
+ }
+ if (tp->write_fd != -1) {
+ (void)close(tp->write_fd);
+ tp->write_fd = -1;
+ }
+
+ /* Hmmm. It's probably an error if we get here with
+ any non-NULL immediate entries.. */
+
+ free_im_list(&tp->im_list);
+ free_im_list(&tp->tofree_im_list);
+
+ TALLOC_FREE(tp->free_im);
+
+ ret = pthread_mutex_unlock(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ return 0;
+ }
+
+ ret = pthread_mutex_destroy(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ return 0;
+ }
+
+ return 0;
+}
+
+/*
+ * Create a struct that can be passed to other threads
+ * to allow them to signal the struct tevent_context *
+ * passed in.
+ *
+ * The proxy is allocated as a talloc child of dest_ev_ctx. It owns
+ * a non-blocking, close-on-exec pipe whose read end is registered
+ * with dest_ev_ctx, plus an immediate event that is used to free
+ * completed list entries.
+ *
+ * Returns the new proxy, or NULL if any allocation or setup
+ * step fails.
+ */
+
+struct tevent_thread_proxy *tevent_thread_proxy_create(
+		struct tevent_context *dest_ev_ctx)
+{
+	int ret;
+	int pipefds[2];
+	struct tevent_thread_proxy *tp = talloc_zero(dest_ev_ctx,
+					struct tevent_thread_proxy);
+	if (tp == NULL) {
+		return NULL;
+	}
+
+	ret = pthread_mutex_init(&tp->mutex, NULL);
+	if (ret != 0) {
+		/* No destructor installed yet, so just free the memory. */
+		goto fail;
+	}
+
+	tp->dest_ev_ctx = dest_ev_ctx;
+	tp->read_fd = -1;
+	tp->write_fd = -1;
+
+	/*
+	 * From here on the destructor closes any open pipe fds
+	 * and destroys the mutex when tp is freed.
+	 */
+	talloc_set_destructor(tp, tevent_thread_proxy_destructor);
+
+	ret = pipe(pipefds);
+	if (ret == -1) {
+		goto fail;
+	}
+
+	tp->read_fd = pipefds[0];
+	tp->write_fd = pipefds[1];
+
+	ret = ev_set_blocking(pipefds[0], false);
+	if (ret != 0) {
+		goto fail;
+	}
+	ret = ev_set_blocking(pipefds[1], false);
+	if (ret != 0) {
+		goto fail;
+	}
+	if (!ev_set_close_on_exec(pipefds[0])) {
+		goto fail;
+	}
+	if (!ev_set_close_on_exec(pipefds[1])) {
+		goto fail;
+	}
+
+	tp->pipe_read_fde = tevent_add_fd(dest_ev_ctx,
+				tp,
+				tp->read_fd,
+				TEVENT_FD_READ,
+				pipe_read_handler,
+				tp);
+	if (tp->pipe_read_fde == NULL) {
+		/*
+		 * Without the fd event the pipe is never read, so
+		 * no scheduled immediate would ever be delivered.
+		 */
+		goto fail;
+	}
+
+	/*
+	 * Create an immediate event to free
+	 * completed lists.
+	 */
+	tp->free_im = tevent_create_immediate(tp);
+	if (tp->free_im == NULL) {
+		goto fail;
+	}
+
+	return tp;
+
+  fail:
+
+	TALLOC_FREE(tp);
+	return NULL;
+}
+
+/*
+ * This function schedules an immediate event to be called with argument
+ * *pp_private_data in the thread context of dest_ev_ctx. Caller doesn't
+ * wait for activation to take place, this is simply fire-and-forget.
+ *
+ * pp_im must be a pointer to an immediate event talloced on
+ * a context owned by the calling thread, or the NULL context.
+ * Ownership of *pp_im will be transferred to the tevent library.
+ *
+ * pp_private_data can be NULL, or contents of *pp_private_data must be
+ * talloc'ed memory on a context owned by the calling thread
+ * or the NULL context. If non-NULL, ownership of *pp_private_data will
+ * be transferred to the tevent library.
+ *
+ * If you want to return a message, have the destination use the
+ * same function call to send back to the caller.
+ */
+
+
+void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
+ struct tevent_immediate **pp_im,
+ tevent_immediate_handler_t handler,
+ void *pp_private_data)
+{
+ struct tevent_immediate_list *im_entry;
+ int ret;
+ char c;
+
+ ret = pthread_mutex_lock(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ return;
+ }
+
+ if (tp->write_fd == -1) {
+ /* In the process of being destroyed. Ignore. */
+ goto end;
+ }
+
+ /* Create a new immediate_list entry. MUST BE ON THE NULL CONTEXT */
+ im_entry = talloc_zero(NULL, struct tevent_immediate_list);
+ if (im_entry == NULL) {
+ goto end;
+ }
+
+ im_entry->handler = handler;
+ im_entry->im = talloc_move(im_entry, pp_im);
+
+ if (pp_private_data != NULL) {
+ void **pptr = (void **)pp_private_data;
+ im_entry->private_ptr = talloc_move(im_entry, pptr);
+ }
+
+ DLIST_ADD(tp->im_list, im_entry);
+
+ /* And notify the dest_ev_ctx to wake up. */
+ c = '\0';
+ (void)write(tp->write_fd, &c, 1);
+
+ end:
+
+ ret = pthread_mutex_unlock(&tp->mutex);
+ if (ret != 0) {
+ abort();
+ /* Notreached. */
+ }
+}
+#else
+/* !HAVE_PTHREAD */
+struct tevent_thread_proxy *tevent_thread_proxy_create(
+ struct tevent_context *dest_ev_ctx)
+{
+ return NULL;
+}
+
+void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
+ struct tevent_immediate **pp_im,
+ tevent_immediate_handler_t handler,
+ void *pp_private_data)
+{
+ ;
+}
+#endif
diff --git a/lib/tevent/wscript b/lib/tevent/wscript
index 827094c..4c5fe0c 100755
--- a/lib/tevent/wscript
+++ b/lib/tevent/wscript
@@ -1,7 +1,7 @@
#!/usr/bin/env python
APPNAME = 'tevent'
-VERSION = '0.9.25'
+VERSION = '0.9.26'
blddir = 'bin'
@@ -83,7 +83,7 @@ def build(bld):
SRC = '''tevent.c tevent_debug.c tevent_fd.c tevent_immediate.c
tevent_queue.c tevent_req.c tevent_select.c
- tevent_poll.c
+ tevent_poll.c tevent_threads.c
tevent_signal.c tevent_standard.c tevent_timed.c tevent_util.c tevent_wakeup.c'''
if bld.CONFIG_SET('HAVE_EPOLL'):
--
2.6.0.rc0.131.gf624c3d
From 724a9fb08689b5dd9f1c48bb87ddf0954257c803 Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Fri, 24 Jul 2015 08:50:31 -0700
Subject: [PATCH 2/4] lib: tevent: Initial test of tevent threaded context
code.
Signed-off-by: Jeremy Allison <jra at samba.org>
Signed-off-by: Ralph Boehme <slow at samba.org>
---
lib/tevent/testsuite.c | 125 +++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 125 insertions(+)
diff --git a/lib/tevent/testsuite.c b/lib/tevent/testsuite.c
index c63c878..f8f65f8 100644
--- a/lib/tevent/testsuite.c
+++ b/lib/tevent/testsuite.c
@@ -808,6 +808,127 @@ static bool test_event_context_threaded(struct torture_context *test,
return true;
}
+#define NUM_TEVENT_THREADS 100
+
+/* Ugly, but needed for torture_comment... */
+static struct torture_context *thread_test_ctx;
+static pthread_t thread_map[NUM_TEVENT_THREADS];
+static unsigned thread_counter;
+
+/* Called in master thread context */
+static void callback_nowait(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_ptr)
+{
+ pthread_t *thread_id_ptr =
+ talloc_get_type_abort(private_ptr, pthread_t);
+ unsigned i;
+
+ for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+ if (pthread_equal(*thread_id_ptr,
+ thread_map[i])) {
+ break;
+ }
+ }
+ torture_comment(thread_test_ctx,
+ "Callback %u from thread %u\n",
+ thread_counter,
+ i);
+ thread_counter++;
+}
+
+/*
+ * Blast the master tevent_context with a callback, no waiting.
+ *
+ * Thread entry point. private_ptr is the master's
+ * struct tevent_thread_proxy. Allocates an immediate event and a
+ * pthread_t holding this thread's id on the NULL context, and hands
+ * both to the master via tevent_thread_proxy_schedule().
+ * Always returns NULL.
+ */
+static void *thread_fn_nowait(void *private_ptr)
+{
+	struct tevent_thread_proxy *master_tp =
+		talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
+	struct tevent_immediate *im;
+	pthread_t *thread_id_ptr;
+
+	im = tevent_create_immediate(NULL);
+	if (im == NULL) {
+		return NULL;
+	}
+	thread_id_ptr = talloc(NULL, pthread_t);
+	if (thread_id_ptr == NULL) {
+		/* Don't leak the immediate event on the error path. */
+		talloc_free(im);
+		return NULL;
+	}
+	*thread_id_ptr = pthread_self();
+
+	tevent_thread_proxy_schedule(master_tp,
+				     &im,
+				     callback_nowait,
+				     &thread_id_ptr);
+
+	/*
+	 * On success both pointers have been moved and set to NULL.
+	 * If the proxy did not take ownership, we still own them
+	 * and must free them ourselves.
+	 */
+	if (im != NULL) {
+		talloc_free(im);
+	}
+	if (thread_id_ptr != NULL) {
+		talloc_free(thread_id_ptr);
+	}
+	return NULL;
+}
+
+static void timeout_fn(struct tevent_context *ev,
+ struct tevent_timer *te,
+ struct timeval tv, void *p)
+{
+ thread_counter = NUM_TEVENT_THREADS * 10;
+}
+
+static bool test_multi_tevent_threaded(struct torture_context *test,
+ const void *test_data)
+{
+ unsigned i;
+ struct tevent_context *master_ev;
+ struct tevent_thread_proxy *tp;
+
+ talloc_disable_null_tracking();
+
+ /* Ugly global stuff. */
+ thread_test_ctx = test;
+ thread_counter = 0;
+
+ master_ev = tevent_context_init(NULL);
+ if (master_ev == NULL) {
+ return false;
+ }
+ tevent_set_debug_stderr(master_ev);
+
+ tp = tevent_thread_proxy_create(master_ev);
+ if (tp == NULL) {
+ torture_fail(test,
+ talloc_asprintf(test,
+ "tevent_thread_proxy_create failed\n"));
+ talloc_free(master_ev);
+ return false;
+ }
+
+ for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+ int ret = pthread_create(&thread_map[i],
+ NULL,
+ thread_fn_nowait,
+ tp);
+ if (ret != 0) {
+ torture_fail(test,
+ talloc_asprintf(test,
+ "Failed to create thread %i, %d\n",
+ i, ret));
+ return false;
+ }
+ }
+
+ /* Ensure we don't wait more than 10 seconds. */
+ tevent_add_timer(master_ev,
+ master_ev,
+ timeval_current_ofs(10,0),
+ timeout_fn,
+ NULL);
+
+ while (thread_counter < NUM_TEVENT_THREADS) {
+ int ret = tevent_loop_once(master_ev);
+ torture_assert(test, ret == 0, "tevent_loop_once failed");
+ }
+
+ torture_assert(test, thread_counter == NUM_TEVENT_THREADS,
+ "thread_counter fail\n");
+
+ talloc_free(master_ev);
+ return true;
+}
#endif
struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
@@ -841,6 +962,10 @@ struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
torture_suite_add_simple_tcase_const(suite, "threaded_poll_mt",
test_event_context_threaded,
NULL);
+
+ torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded",
+ test_multi_tevent_threaded,
+ NULL);
#endif
return suite;
--
2.6.0.rc0.131.gf624c3d
From f777e5caa3eba2ac43dc9439c72afaa3a4005d96 Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Fri, 24 Jul 2015 09:27:21 -0700
Subject: [PATCH 3/4] lib: tevent: tests: Add a second thread test that does
request/reply.
Both tests run cleanly with valgrind --tool=drd and
valgrind --tool=helgrind
Signed-off-by: Jeremy Allison <jra at samba.org>
Signed-off-by: Ralph Boehme <slow at samba.org>
---
lib/tevent/testsuite.c | 205 +++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 205 insertions(+)
diff --git a/lib/tevent/testsuite.c b/lib/tevent/testsuite.c
index f8f65f8..bcd27fd 100644
--- a/lib/tevent/testsuite.c
+++ b/lib/tevent/testsuite.c
@@ -929,6 +929,206 @@ static bool test_multi_tevent_threaded(struct torture_context *test,
talloc_free(master_ev);
return true;
}
+
+struct reply_state {
+ struct tevent_thread_proxy *reply_tp;
+ pthread_t thread_id;
+ int *p_finished;
+};
+
+static void thread_timeout_fn(struct tevent_context *ev,
+ struct tevent_timer *te,
+ struct timeval tv, void *p)
+{
+ int *p_finished = (int *)p;
+
+ *p_finished = 2;
+}
+
+/* Called in child-thread context */
+static void thread_callback(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_ptr)
+{
+ struct reply_state *rsp =
+ talloc_get_type_abort(private_ptr, struct reply_state);
+
+ talloc_steal(ev, rsp);
+ *rsp->p_finished = 1;
+}
+
+/* Called in master thread context */
+static void master_callback(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_ptr)
+{
+ struct reply_state *rsp =
+ talloc_get_type_abort(private_ptr, struct reply_state);
+ unsigned i;
+
+ talloc_steal(ev, rsp);
+
+ for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+ if (pthread_equal(rsp->thread_id,
+ thread_map[i])) {
+ break;
+ }
+ }
+ torture_comment(thread_test_ctx,
+ "Callback %u from thread %u\n",
+ thread_counter,
+ i);
+ /* Now reply to the thread ! */
+ tevent_thread_proxy_schedule(rsp->reply_tp,
+ &im,
+ thread_callback,
+ &rsp);
+
+ thread_counter++;
+}
+
+static void *thread_fn_1(void *private_ptr)
+{
+ struct tevent_thread_proxy *master_tp =
+ talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
+ struct tevent_thread_proxy *tp;
+ struct tevent_immediate *im;
+ struct tevent_context *ev;
+ struct reply_state *rsp;
+ int finished = 0;
+ int ret;
+
+ ev = tevent_context_init(NULL);
+ if (ev == NULL) {
+ return NULL;
+ }
+
+ tp = tevent_thread_proxy_create(ev);
+ if (tp == NULL) {
+ talloc_free(ev);
+ return NULL;
+ }
+
+ im = tevent_create_immediate(ev);
+ if (im == NULL) {
+ talloc_free(ev);
+ return NULL;
+ }
+
+ rsp = talloc(ev, struct reply_state);
+ if (rsp == NULL) {
+ talloc_free(ev);
+ return NULL;
+ }
+
+ rsp->thread_id = pthread_self();
+ rsp->reply_tp = tp;
+ rsp->p_finished = &finished;
+
+ /* Introduce a little randomness into the mix.. */
+ usleep(random() % 7000);
+
+ tevent_thread_proxy_schedule(master_tp,
+ &im,
+ master_callback,
+ &rsp);
+
+ /* Ensure we don't wait more than 10 seconds. */
+ tevent_add_timer(ev,
+ ev,
+ timeval_current_ofs(10,0),
+ thread_timeout_fn,
+ &finished);
+
+ while (finished == 0) {
+ ret = tevent_loop_once(ev);
+ assert(ret == 0);
+ }
+
+ if (finished > 1) {
+ /* Timeout ! */
+ abort();
+ }
+
+ /*
+ * NB. We should talloc_free(ev) here, but if we do
+ * we currently get hit by helgrind Fix #323432
+ * "When calling pthread_cond_destroy or pthread_mutex_destroy
+ * with initializers as argument Helgrind (incorrectly) reports errors."
+ *
+ * http://valgrind.10908.n7.nabble.com/Helgrind-3-9-0-false-positive-
+ * with-pthread-mutex-destroy-td47757.html
+ *
+ * Helgrind doesn't understand that the request/reply
+ * messages provide synchronization between the lock/unlock
+ * in tevent_thread_proxy_schedule(), and the pthread_mutex_destroy()
+ * when the struct tevent_thread_proxy object is talloc_free'd.
+ *
+ * As a work-around for now return ev for the parent thread to free.
+ */
+ return ev;
+}
+
+static bool test_multi_tevent_threaded_1(struct torture_context *test,
+ const void *test_data)
+{
+ unsigned i;
+ struct tevent_context *master_ev;
+ struct tevent_thread_proxy *master_tp;
+ int ret;
+
+ talloc_disable_null_tracking();
+
+ /* Ugly global stuff. */
+ thread_test_ctx = test;
+ thread_counter = 0;
+
+ master_ev = tevent_context_init(NULL);
+ if (master_ev == NULL) {
+ return false;
+ }
+ tevent_set_debug_stderr(master_ev);
+
+ master_tp = tevent_thread_proxy_create(master_ev);
+ if (master_tp == NULL) {
+ torture_fail(test,
+ talloc_asprintf(test,
+ "tevent_thread_proxy_create failed\n"));
+ talloc_free(master_ev);
+ return false;
+ }
+
+ for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+ ret = pthread_create(&thread_map[i],
+ NULL,
+ thread_fn_1,
+ master_tp);
+ if (ret != 0) {
+ torture_fail(test,
+ talloc_asprintf(test,
+ "Failed to create thread %i, %d\n",
+ i, ret));
+ return false;
+ }
+ }
+
+ while (thread_counter < NUM_TEVENT_THREADS) {
+ ret = tevent_loop_once(master_ev);
+ torture_assert(test, ret == 0, "tevent_loop_once failed");
+ }
+
+ /* Wait for all the threads to finish - join 'em. */
+ for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+ void *retval;
+ ret = pthread_join(thread_map[i], &retval);
+ torture_assert(test, ret == 0, "pthread_join failed");
+ /* Free the child thread event context. */
+ talloc_free(retval);
+ }
+
+ talloc_free(master_ev);
+ return true;
+}
#endif
struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
@@ -966,6 +1166,11 @@ struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded",
test_multi_tevent_threaded,
NULL);
+
+ torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_1",
+ test_multi_tevent_threaded_1,
+ NULL);
+
#endif
return suite;
--
2.6.0.rc0.131.gf624c3d
From 735e9534ad1da712a9460ad97837d0252c044920 Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Wed, 22 Jul 2015 11:52:06 -0700
Subject: [PATCH 4/4] lib: tevent: docs: Add tutorial on thread usage.
Signed-off-by: Jeremy Allison <jra at samba.org>
Signed-off-by: Ralph Boehme <slow at samba.org>
---
lib/tevent/doc/tevent_thread.dox | 322 +++++++++++++++++++++++++++++++++++++
lib/tevent/doc/tevent_tutorial.dox | 2 +
2 files changed, 324 insertions(+)
create mode 100644 lib/tevent/doc/tevent_thread.dox
diff --git a/lib/tevent/doc/tevent_thread.dox b/lib/tevent/doc/tevent_thread.dox
new file mode 100644
index 0000000..8bf181c
--- /dev/null
+++ b/lib/tevent/doc/tevent_thread.dox
@@ -0,0 +1,322 @@
+/**
+@page tevent_thread Chapter 6: Tevent with threads
+
+@section context Tevent with threads
+
+In order to use tevent with threads, you must first understand
+how to use the talloc library in threaded programs. For more
+information about working with talloc, please visit <a
+href="http://talloc.samba.org/">talloc website</a> where tutorial and
+documentation are located.
+
+If a tevent context structure is talloced from a NULL, thread-safe talloc
+context, then it can be safe to use in a threaded program. The function
+<code>talloc_disable_null_tracking()</code> <b>must</b> be called from the initial
+program thread before any talloc calls are made to ensure talloc is thread-safe.
+
+Each thread must create its own tevent context structure by calling
+<code>tevent_context_init(NULL)</code>, and no talloc memory contexts
+can be shared between threads.
+
+Separate threads using tevent in this way can communicate
+by writing data into file descriptors that are being monitored
+by a tevent context on another thread. For example (simplified
+with no error handling):
+
+@code
+Main thread:
+
+main()
+{
+ talloc_disable_null_tracking();
+
+ struct tevent_context *master_ev = tevent_context_init(NULL);
+ void *mem_ctx = talloc_new(master_ev);
+
+ // Create file descriptor to monitor.
+ int pipefds[2];
+
+ pipe(pipefds);
+
+ struct tevent_fd *fde = tevent_add_fd(master_ev,
+ mem_ctx,
+ pipefds[0], // read side of pipe
+ TEVENT_FD_READ,
+ pipe_read_handler, // callback function
+ private_data_pointer);
+
+ // Create sub thread, pass pipefds[1] write side of pipe to it.
+ // The thread creation code is not shown here.
+
+ // Process events.
+ tevent_loop_wait(master_ev);
+
+ // Cleanup if loop exits.
+ talloc_free(master_ev);
+}
+
+@endcode
+
+When the subthread writes to pipefds[1], the function
+<code>pipe_read_handler()</code> will be called in the main thread.
+
+@subsection more_sophisticated_use More sophisticated use
+
+A popular way to use an event library within threaded programs
+is to allow a sub-thread to asynchronously schedule a tevent_immediate
+function call from the event loop of another thread. This can be built
+out of the basic functions and isolation mechanisms of tevent,
+but tevent also comes with some utility functions that make
+this easier, so long as you understand the limitations that
+using threads with talloc and tevent impose.
+
+To allow a tevent context to receive an asynchronous tevent_immediate
+function callback from another thread, create a struct tevent_thread_proxy *
+by calling @code
+
+struct tevent_thread_proxy *tevent_thread_proxy_create(
+ struct tevent_context *dest_ev_ctx);
+
+@endcode
+
+This function allocates the internal data structures to
+allow asynchronous callbacks as a talloc child of the
+struct tevent_context *, and returns a struct tevent_thread_proxy *
+that can be passed to another thread.
+
+When you have finished receiving asynchronous callbacks, simply
+talloc_free the struct tevent_thread_proxy *, or talloc_free
+the struct tevent_context *, which will deallocate the resources
+used.
+
+To schedule an asynchronous tevent_immediate function call from one
+thread on the tevent loop of another thread, use
+@code
+
+void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
+ struct tevent_immediate **pp_im,
+ tevent_immediate_handler_t handler,
+ void *pp_private_data);
+
+@endcode
+
+This function causes the function <code>handler()</code>
+to be invoked as a tevent_immediate callback from the event loop
+of the thread that created the struct tevent_thread_proxy *
+(so the owning <code>struct tevent_context *</code> should be
+long-lived and not in the process of being torn down).
+
+The <code>struct tevent_thread_proxy</code> object being
+used here is a child of the event context of the target
+thread. So external synchronization mechanisms must be
+used to ensure that the target object still exists
+at the time of the <code>tevent_thread_proxy_schedule()</code>
+call. In the example below, the request/response nature
+of the communication ensures this.
+
+The <code>struct tevent_immediate **pp_im</code> passed into this function
+should be a struct tevent_immediate * allocated on a talloc context
+local to this thread, and will be reparented via talloc_move
+to be owned by <code>struct tevent_thread_proxy *tp</code>.
+<code>*pp_im</code> will be set to NULL on successful scheduling
+of the tevent_immediate call.
+
+<code>handler()</code> will be called as a normal tevent_immediate
+callback from the <code>struct tevent_context *</code> of the destination
+event loop that created the <code>struct tevent_thread_proxy *</code>
+
+Returning from this function does not mean that the <code>handler</code>
+has been invoked, merely that it has been scheduled to be called in the
+destination event loop.
+
+Because the calling thread does not wait for the
+callback to be scheduled and run on the destination
+thread, this is a fire-and-forget call. If you wish
+confirmation of the <code>handler()</code> being
+successfully invoked, you must ensure it replies to the
+caller in some way.
+
+Because of the asynchronous nature of this call, the
+parameter passed to the destination thread is subject to some
+restrictions. If you don't need parameters, merely pass
+<code>NULL</code> as the value of
+<code>pp_private_data</code>.
+
+If you wish to pass a pointer to data between the threads,
+it <b>MUST</b> be a pointer to a talloced pointer, which is
+not part of a talloc-pool, and it must not have a destructor
+attached. The ownership of the memory pointed to will
+be passed from the calling thread to the tevent library,
+and if the receiving thread does not talloc-reparent
+it to one of its own contexts, it will be freed after the
+<code>handler</code> has returned.
+
+On success, <code>*pp_private_data</code> will be <code>NULL</code>
+to signify the talloc memory ownership has been moved.
+
+In practice for message passing between threads in
+event loops these restrictions are not very onerous.
+
+The easiest way to do a request-reply pair between
+tevent loops on different threads is to pass the
+parameter block of memory back and forth using
+a reply <code>tevent_thread_proxy_schedule()</code>
+call.
+
+Here is an example (without error checking for
+simplicity):
+
+@code
+------------------------------------------------
+// Master thread.
+
+main()
+{
+ // Make talloc thread-safe.
+
+ talloc_disable_null_tracking();
+
+ // Create the master event context.
+
+ struct tevent_context *master_ev = tevent_context_init(NULL);
+
+ // Create the master thread proxy to allow it to receive
+ // async callbacks from other threads.
+
+ struct tevent_thread_proxy *master_tp =
+ tevent_thread_proxy_create(master_ev);
+
+ // Create sub-threads, passing master_tp in
+ // some way to them.
+ // This code not shown..
+
+ // Process events.
+ // Function master_callback() below
+ // will be invoked on this thread on
+ // master_ev event context.
+
+ tevent_loop_wait(master_ev);
+
+ // Cleanup if loop exits.
+
+ talloc_free(master_ev);
+}
+
+// Data passed between threads.
+struct reply_state {
+ struct tevent_thread_proxy *reply_tp;
+ pthread_t thread_id;
+ bool *p_finished;
+};
+
+// Callback Called in child thread context.
+
+static void thread_callback(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_ptr)
+{
+ // Move the ownership of what private_ptr
+ // points to from the tevent library back to this thread.
+
+ struct reply_state *rsp =
+ talloc_get_type_abort(private_ptr, struct reply_state);
+
+ talloc_steal(ev, rsp);
+
+ *rsp->p_finished = true;
+
+ // im will be talloc_freed on return from this call.
+ // but rsp will not.
+}
+
+// Callback Called in master thread context.
+
+static void master_callback(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_ptr)
+{
+ // Move the ownership of what private_ptr
+ // points to from the tevent library to this thread.
+
+ struct reply_state *rsp =
+ talloc_get_type_abort(private_ptr, struct reply_state);
+
+ talloc_steal(ev, rsp);
+
+ printf("Callback from thread %s\n", thread_id_to_string(rsp->thread_id));
+
+ /* Now reply to the thread ! */
+ tevent_thread_proxy_schedule(rsp->reply_tp,
+ &im,
+ thread_callback,
+ &rsp);
+
+ // Note - rsp and im are now NULL as the tevent library
+ // owns the memory.
+}
+
+// Child thread.
+
+static void *thread_fn(void *private_ptr)
+{
+ struct tevent_thread_proxy *master_tp =
+ talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
+ bool finished = false;
+ int ret;
+
+ // Create our own event context.
+
+ struct tevent_context *ev = tevent_context_init(NULL);
+
+ // Create the local thread proxy to allow us to receive
+ // async callbacks from other threads.
+
+ struct tevent_thread_proxy *local_tp =
+ tevent_thread_proxy_create(ev);
+
+ // Setup the data to send.
+
+ struct reply_state *rsp = talloc(ev, struct reply_state);
+
+ rsp->reply_tp = local_tp;
+ rsp->thread_id = pthread_self();
+ rsp->p_finished = &finished;
+
+ // Create the immediate event to use.
+
+ struct tevent_immediate *im = tevent_create_immediate(ev);
+
+ // Call the master thread.
+
+ tevent_thread_proxy_schedule(master_tp,
+ &im,
+ master_callback,
+ &rsp);
+
+ // Note - rsp and im are now NULL as the tevent library
+ // owns the memory.
+
+ // Wait for the reply.
+
+ while (!finished) {
+ tevent_loop_once(ev);
+ }
+
+ // Cleanup.
+
+ talloc_free(ev);
+ return NULL;
+}
+
+@endcode
+
+Note this doesn't have to be a master-subthread communication.
+Any thread that has access to the <code>struct tevent_thread_proxy *</code>
+pointer of another thread that has called <code>tevent_thread_proxy_create()
+</code> can send an async tevent_immediate request.
+
+But remember the caveat that external synchronization must be used
+to ensure the target <code>struct tevent_thread_proxy *</code> object
+exists at the time of the <code>tevent_thread_proxy_schedule()</code>
+call or unreproducible crashes will result.
+*/
diff --git a/lib/tevent/doc/tevent_tutorial.dox b/lib/tevent/doc/tevent_tutorial.dox
index 9f01fa1..207a244 100644
--- a/lib/tevent/doc/tevent_tutorial.dox
+++ b/lib/tevent/doc/tevent_tutorial.dox
@@ -17,4 +17,6 @@ Tutorial describing working with tevent library.
@subpage tevent_queue
+@subpage tevent_thread
+
*/
--
2.6.0.rc0.131.gf624c3d
More information about the samba-technical
mailing list