From 644767d0276b4299e958c583a2af91e0cd8fa5c4 Mon Sep 17 00:00:00 2001 From: Jeremy Allison Date: Tue, 21 Jul 2015 12:37:08 -0700 Subject: [PATCH 1/4] lib: tevent: Initial checkin of threaded tevent context calling code. Adds 3 new functions: int tevent_threaded_context_register(struct tevent_context *ev); int tevent_threaded_context_unregister(struct tevent_context *ev); int tevent_threaded_async_send(struct tevent_context *dest_ev_ctx, void (*callback_fn)(struct tevent_context *, void *), void **pp_private); Brief doc included. Full docs and tutorial to follow. Signed-off-by: Jeremy Allison --- lib/tevent/tevent.h | 54 ++++++ lib/tevent/tevent_threads.c | 402 ++++++++++++++++++++++++++++++++++++++++++++ lib/tevent/wscript | 4 +- 3 files changed, 458 insertions(+), 2 deletions(-) create mode 100644 lib/tevent/tevent_threads.c diff --git a/lib/tevent/tevent.h b/lib/tevent/tevent.h index b6c39d1..7c8fae6 100644 --- a/lib/tevent/tevent.h +++ b/lib/tevent/tevent.h @@ -1698,6 +1698,60 @@ typedef int (*tevent_nesting_hook)(struct tevent_context *ev, bool begin, void *stack_ptr, const char *location); + +/** + * @brief Register a tevent_context for message passing between threads. + * + * The tevent_context must have been allocated on the NULL + * talloc context, and talloc_disable_null_tracking() must + * have been called. + * + * @param[in] ev The tevent_context to register. + * + * @see tevent_threaded_context_unregister() + * @see tevent_threaded_async_call() + */ +int tevent_threaded_context_register(struct tevent_context *ev); + +/** + * @brief Unregister a tevent_context for message passing between threads. + * + * If there are any pending uncalled callbacks on this context, + * they will be called before this function returns. + * + * @param[in] ev The tevent_context to unregister. + * + * @see tevent_threaded_context_register() + * @see tevent_threaded_async_call() + */ +int tevent_threaded_context_unregister(struct tevent_context *ev); + +/** + * @brief Call a function from an event context on another thread. + * + * Causes dest_ev_ctx, being run by another thread, to call the + * function callback_fn with the *pp_private parameter. pp_parameter + * must be a talloced area of memory with no destructors. Ownership + * of this memory will be transferred to the tevent library and + * *pp_private will be set to NULL on successful completion of + * the call. Set pp_private to NULL if no parameter transfer + * needed (a pure callback). Callback is asynchronous, caller + * does not wait for callback to be completed before returning. + * + * @param[in] dest_ev_ctx The tevent_context to call. + * + * @param[in] callback_fn The function that will be called. + * + * @param[in] pp_private The talloced memory to transfer. + * + * @see tevent_threaded_context_register() + * @see tevent_threaded_context_unregister() + */ +int tevent_threaded_async_call(struct tevent_context *dest_ev_ctx, + void (*callback_fn)(struct tevent_context *, + void *), + void **pp_private); + #ifdef TEVENT_DEPRECATED #ifndef _DEPRECATED_ #if (__GNUC__ >= 3) && (__GNUC_MINOR__ >= 1 ) diff --git a/lib/tevent/tevent_threads.c b/lib/tevent/tevent_threads.c new file mode 100644 index 0000000..093b243 --- /dev/null +++ b/lib/tevent/tevent_threads.c @@ -0,0 +1,402 @@ +/* + tevent event library. + + Copyright (C) Jeremy Allison 2015 + + ** NOTE! The following LGPL license applies to the tevent + ** library. This does NOT imply that all of Samba is released + ** under the LGPL + + This library is free software; you can redistribute it and/or + modify it under the terms of the GNU Lesser General Public + License as published by the Free Software Foundation; either + version 3 of the License, or (at your option) any later version. + + This library is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + Lesser General Public License for more details. + + You should have received a copy of the GNU Lesser General Public + License along with this library; if not, see . +*/ + +#include "replace.h" +#include "system/filesys.h" +#include "talloc.h" +#include "tevent.h" +#include "tevent_internal.h" +#include "tevent_util.h" + +#if defined(HAVE_PTHREAD) +#include + +struct tevent_callback_list { + struct tevent_callback_list *next, *prev; + void (*callback_fn)(struct tevent_context *ev_ctx, void *); + void *private_ptr; +}; + +struct tevent_context_list { + struct tevent_context_list *next, *prev; + struct tevent_context *ev_ctx; + int read_fd; + int write_fd; + struct tevent_fd *pipe_read_fde; + struct tevent_callback_list *cb_list; +}; + +static struct tevent_mt_list { + pthread_mutex_t mutex; + struct tevent_context_list *ev_ctx_list; +} global_event_mt_list = { + .mutex = PTHREAD_MUTEX_INITIALIZER, +}; + +static void call_callback_functions(struct tevent_context_list *entry) +{ + struct tevent_callback_list *cb_entry; + struct tevent_callback_list *cb_next; + + for(cb_entry = entry->cb_list; cb_entry; cb_entry = cb_next) { + int ret; + + cb_next = cb_entry->next; + DLIST_REMOVE(entry->cb_list, cb_entry); + + ret = pthread_mutex_unlock(&global_event_mt_list.mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return; + } + + /* callback fn called with mutex unlocked. */ + cb_entry->callback_fn(entry->ev_ctx, cb_entry->private_ptr); + + ret = pthread_mutex_lock(&global_event_mt_list.mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return; + } + TALLOC_FREE(cb_entry); + } +} + +static void pipe_read_handler(struct tevent_context *ev, + struct tevent_fd *fde, + uint16_t flags, + void *private_ptr) +{ + struct tevent_context_list *entry = + (struct tevent_context_list *)private_ptr; + ssize_t len = 64; + int ret; + + ret = pthread_mutex_lock(&global_event_mt_list.mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return; + } + + /* Clear out all data in the pipe. */ + while (len == 64) { + char buf[64]; + len = read(entry->read_fd, buf, 64); + }; + + call_callback_functions(entry); + + ret = pthread_mutex_unlock(&global_event_mt_list.mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return; + } +} + +static int tevent_threaded_context_destructor(struct tevent_context_list *entry) +{ + struct tevent_callback_list *cb_entry; + struct tevent_callback_list *cb_next; + int ret; + + ret = pthread_mutex_lock(&global_event_mt_list.mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return 0; + } + + DLIST_REMOVE(global_event_mt_list.ev_ctx_list, entry); + + TALLOC_FREE(entry->pipe_read_fde); + + if (entry->read_fd != -1) { + (void)close(entry->read_fd); + entry->read_fd = -1; + } + if (entry->write_fd != -1) { + (void)close(entry->write_fd); + entry->write_fd = -1; + } + + /* Hmmm. It's probably an error if we get here with + any non-NULL callback data.. */ + + for(cb_entry = entry->cb_list; cb_entry; cb_entry = cb_next) { + /* We must free these by hand as they're + allocated on the NULL context, not on entry. */ + cb_next = cb_entry->next; + DLIST_REMOVE(entry->cb_list, cb_entry); + TALLOC_FREE(cb_entry); + } + + ret = pthread_mutex_unlock(&global_event_mt_list.mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return 0; + } + + return 0; +} + +int tevent_threaded_context_register(struct tevent_context *ev) +{ + struct tevent_context_list *le = NULL; + struct tevent_context_list *entry = NULL; + int pipefds[2]; + int ret; + + le = talloc_zero(ev, struct tevent_context_list); + if (le == NULL) { + return ENOMEM; + } + le->ev_ctx = ev; + le->read_fd = -1; + le->write_fd = -1; + + talloc_set_destructor(le, tevent_threaded_context_destructor); + + ret = pipe(pipefds); + if (ret == -1) { + ret = errno; + goto fail; + } + + le->read_fd = pipefds[0]; + le->write_fd = pipefds[1]; + + ret = ev_set_blocking(pipefds[0], false); + if (ret != 0) { + ret = errno; + goto fail; + } + ret = ev_set_blocking(pipefds[1], false); + if (ret != 0) { + ret = errno; + goto fail; + } + if (!ev_set_close_on_exec(pipefds[0])) { + ret = errno; + goto fail; + } + if (!ev_set_close_on_exec(pipefds[1])) { + ret = errno; + goto fail; + } + + le->pipe_read_fde = tevent_add_fd(ev, + le, + le->read_fd, + TEVENT_FD_READ, + pipe_read_handler, + le); + + ret = pthread_mutex_lock(&global_event_mt_list.mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return ret; + } + /* + * Ensure this event context hasn't already been + * registered. + */ + + for (entry = global_event_mt_list.ev_ctx_list; + entry; + entry = entry->next) { + if (entry->ev_ctx == ev) { + break; + } + } + + if (entry == NULL) { + DLIST_ADD(global_event_mt_list.ev_ctx_list, le); + } + + ret = pthread_mutex_unlock(&global_event_mt_list.mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return ret; + } + + if (entry != NULL) { + ret = EEXIST; + goto fail; + } + + /* Now ready to accept callback requests. */ + return 0; + + fail: + + TALLOC_FREE(le); + return ret; +} + +int tevent_threaded_context_unregister(struct tevent_context *ev) +{ + struct tevent_context_list *le; + int ret; + + ret = pthread_mutex_lock(&global_event_mt_list.mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return ret; + } + + for (le = global_event_mt_list.ev_ctx_list; le; le = le->next) { + if (le->ev_ctx != ev) { + continue; + } + /* Ensure we can't get any more calls. */ + TALLOC_FREE(le->pipe_read_fde); + (void)close(le->read_fd); + (void)close(le->write_fd); + le->read_fd = -1; + le->write_fd = -1; + + /* Drain any pending callbacks. */ + call_callback_functions(le); + + break; + } + + ret = pthread_mutex_unlock(&global_event_mt_list.mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return ret; + } + + if (le != NULL) { + /* Removes from the global list under the mutex. */ + TALLOC_FREE(le); + } else { + /* ev wasn't in the registered list. */ + ret = ESRCH; + } + return ret; +} + +/* + * This function schedules callback_fn to be called with argument + * *pp_private in the thread context of dest_ev_ctx. Caller doesn't + * wait for activation to take place, this is simply fire-and-forget. + * + * pp_private can be null, or contents of *pp_private must be either NULL + * or talloc'ed memory on a context owned by the calling thread + * (or the NULL context). If non-null, ownership of *pp_private will + * be transfered to the tevent library. + * + * If you want to return a message, have the destination use the + * same function call to send back to the caller. + */ + +int tevent_threaded_async_call(struct tevent_context *dest_ev_ctx, + void (*callback_fn)(struct tevent_context *, + void *), + void **pp_private) +{ + struct tevent_context_list *l; + int ret; + int unlock_ret; + + ret = pthread_mutex_lock(&global_event_mt_list.mutex); + if (ret != 0) { + abort(); + /* Notreached. */ + return ret; + } + + /* Look for the destination context in the global registered list. */ + for (l = global_event_mt_list.ev_ctx_list; l != NULL; l = l->next) { + char c = 0; + struct tevent_callback_list *cbe; + + if (l->ev_ctx != dest_ev_ctx) { + continue; + } + + if (l->write_fd == -1) { + /* In the process of being destroyed. Ignore. */ + continue; + } + + /* Found it - create a new callback entry + MUST BE ON THE NULL CONTEXT !!!! */ + + cbe = talloc_zero(NULL, struct tevent_callback_list); + if (cbe == NULL) { + ret = ENOMEM; + break; + } + cbe->callback_fn = callback_fn; + if (pp_private != NULL && *pp_private != NULL) { + cbe->private_ptr = talloc_move(cbe, pp_private); + } + DLIST_ADD(l->cb_list, cbe); + /* And notify the target to wake up. */ + (void)write(l->write_fd, &c, 1); + } + + unlock_ret = pthread_mutex_unlock(&global_event_mt_list.mutex); + if (unlock_ret != 0) { + abort(); + /* Notreached. */ + return unlock_ret; + } + + if (l == NULL) { + /* dest_ev_ctx wasn't in the registered list. */ + ret = ESRCH; + } + + return ret; +} +#else +/* !HAVE_PTHREAD */ +int tevent_threaded_context_register(struct tevent_context *ev) +{ + return ENOSYS; +} + +int tevent_threaded_context_unregister(struct tevent_context *ev) +{ + return ENOSYS; +} + +int tevent_threaded_async_call(struct tevent_context *dest_ev_ctx, + void (*callback_fn)(void *), + void **pp_private) +{ + return ENOSYS; +} +#endif diff --git a/lib/tevent/wscript b/lib/tevent/wscript index 827094c..4c5fe0c 100755 --- a/lib/tevent/wscript +++ b/lib/tevent/wscript @@ -1,7 +1,7 @@ #!/usr/bin/env python APPNAME = 'tevent' -VERSION = '0.9.25' +VERSION = '0.9.26' blddir = 'bin' @@ -83,7 +83,7 @@ def build(bld): SRC = '''tevent.c tevent_debug.c tevent_fd.c tevent_immediate.c tevent_queue.c tevent_req.c tevent_select.c - tevent_poll.c + tevent_poll.c tevent_threads.c tevent_signal.c tevent_standard.c tevent_timed.c tevent_util.c tevent_wakeup.c''' if bld.CONFIG_SET('HAVE_EPOLL'): -- 2.4.3.573.g4eafbef From 19dabd031759367f33dd9dac18b9aa0b27e5bba8 Mon Sep 17 00:00:00 2001 From: Jeremy Allison Date: Tue, 21 Jul 2015 12:37:34 -0700 Subject: [PATCH 2/4] lib: tevent: Initial test of tevent threaded context code. Signed-off-by: Jeremy Allison --- lib/tevent/testsuite.c | 125 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 125 insertions(+) diff --git a/lib/tevent/testsuite.c b/lib/tevent/testsuite.c index c63c878..c06608e 100644 --- a/lib/tevent/testsuite.c +++ b/lib/tevent/testsuite.c @@ -808,6 +808,127 @@ static bool test_event_context_threaded(struct torture_context *test, return true; } +#define NUM_TEVENT_THREADS 100 + +/* Ugly, but needed for torture_comment... */ +static struct torture_context *thread_test_ctx; +static pthread_t thread_map[NUM_TEVENT_THREADS]; +static unsigned thread_counter; + +/* Called in master thread context */ +static void callback_nowait(struct tevent_context *ev, + void *private_ptr) +{ + pthread_t *thread_id_ptr = (pthread_t *)private_ptr; + unsigned i; + + for (i = 0; i < NUM_TEVENT_THREADS; i++) { + if (pthread_equal(*thread_id_ptr, + thread_map[i])) { + break; + } + } + torture_comment(thread_test_ctx, + "Callback %u from thread %u\n", + thread_counter, + i); + thread_counter++; +} + +/* Blast the master tevent_context with a callback, no waiting. */ +static void *thread_fn_nowait(void *private_ptr) +{ + struct tevent_context *master_ev = (struct tevent_context *)private_ptr; + pthread_t *thread_id_ptr = talloc(NULL, pthread_t); + + if (thread_id_ptr == NULL) { + return NULL; + } + *thread_id_ptr = pthread_self(); + + (void)tevent_threaded_async_call(master_ev, + callback_nowait, + (void **)&thread_id_ptr); + return NULL; +} + +static void timeout_fn(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval tv, void *p) +{ + thread_counter = NUM_TEVENT_THREADS * 10; +} + +static bool test_multi_tevent_threaded(struct torture_context *test, + const void *test_data) +{ + unsigned i; + struct tevent_context *master_ev; + int ret; + + talloc_disable_null_tracking(); + + /* Ugly global stuff. */ + thread_test_ctx = test; + thread_counter = 0; + + master_ev = tevent_context_init(NULL); + if (master_ev == NULL) { + return false; + } + tevent_set_debug_stderr(master_ev); + + ret = tevent_threaded_context_register(master_ev); + if (ret != 0) { + torture_fail(test, + talloc_asprintf(test, + "tevent_threaded_context_register failed %d\n", + ret)); + talloc_free(master_ev); + return false; + } + + for (i = 0; i < NUM_TEVENT_THREADS; i++) { + ret = pthread_create(&thread_map[i], + NULL, + thread_fn_nowait, + master_ev); + if (ret != 0) { + torture_fail(test, + talloc_asprintf(test, + "Failed to create thread %i, %d\n", + i, ret)); + return false; + } + } + + /* Ensure we don't wait more than 10 seconds. */ + tevent_add_timer(master_ev, + master_ev, + timeval_current_ofs(10,0), + timeout_fn, + NULL); + + while (thread_counter < NUM_TEVENT_THREADS) { + ret = tevent_loop_once(master_ev); + torture_assert(test, ret == 0, "tevent_loop_once failed"); + } + + torture_assert(test, thread_counter == NUM_TEVENT_THREADS, + "thread_counter fail\n"); + + ret = tevent_threaded_context_unregister(master_ev); + if (ret != 0) { + torture_fail(test, + talloc_asprintf(test, + "tevent_threaded_context_unregister failed %d\n", + ret)); + talloc_free(master_ev); + return false; + } + talloc_free(master_ev); + return true; +} #endif struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx) @@ -841,6 +962,10 @@ struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx) torture_suite_add_simple_tcase_const(suite, "threaded_poll_mt", test_event_context_threaded, NULL); + + torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded", + test_multi_tevent_threaded, + NULL); #endif return suite; -- 2.4.3.573.g4eafbef From d2707907baac5277d41c6fae741ed8c1796acb85 Mon Sep 17 00:00:00 2001 From: Jeremy Allison Date: Tue, 21 Jul 2015 14:32:00 -0700 Subject: [PATCH 3/4] lib: tevent: tests: Add a second thread test that does request/reply. Both tests run cleanly with valgrind --tool=drd. Signed-off-by: Jeremy Allison --- lib/tevent/testsuite.c | 183 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 183 insertions(+) diff --git a/lib/tevent/testsuite.c b/lib/tevent/testsuite.c index c06608e..f334a8b 100644 --- a/lib/tevent/testsuite.c +++ b/lib/tevent/testsuite.c @@ -929,6 +929,184 @@ static bool test_multi_tevent_threaded(struct torture_context *test, talloc_free(master_ev); return true; } + +struct reply_state { + struct tevent_context *reply_ev; + pthread_t thread_id; + int *p_finished; +}; + +static void thread_timeout_fn(struct tevent_context *ev, + struct tevent_timer *te, + struct timeval tv, void *p) +{ + int *p_finished = (int *)p; + + *p_finished = 2; +} + +/* Called in child-thread context */ +static void thread_ev_callback(struct tevent_context *ev, + void *private_ptr) +{ + struct reply_state *rsp = + (struct reply_state *)talloc_move(ev, &private_ptr); + *rsp->p_finished = 1; +} + +/* Called in master thread context */ +static void master_ev_callback(struct tevent_context *ev, + void *private_ptr) +{ + struct reply_state *rsp = + (struct reply_state *)talloc_move(ev, &private_ptr); + unsigned i; + + for (i = 0; i < NUM_TEVENT_THREADS; i++) { + if (pthread_equal(rsp->thread_id, + thread_map[i])) { + break; + } + } + torture_comment(thread_test_ctx, + "Callback %u from thread %u\n", + thread_counter, + i); + thread_counter++; + + /* Now reply to the thread ! */ + (void)tevent_threaded_async_call(rsp->reply_ev, + thread_ev_callback, + (void **)&rsp); +} + +static void *thread_fn_1(void *private_ptr) +{ + struct tevent_context *master_ev = (struct tevent_context *)private_ptr; + struct tevent_context *ev; + struct reply_state *rsp; + int finished = 0; + int ret; + + ev = tevent_context_init(NULL); + if (ev == NULL) { + return NULL; + } + + ret = tevent_threaded_context_register(ev); + if (ret != 0) { + talloc_free(ev); + return NULL; + } + + rsp = talloc(ev, struct reply_state); + if (rsp == NULL) { + tevent_threaded_context_unregister(ev); + talloc_free(ev); + return NULL; + } + + rsp->thread_id = pthread_self(); + rsp->reply_ev = ev; + rsp->p_finished = &finished; + + /* Introduce a little randomness into the mix.. */ + usleep(random() % 3000); + + (void)tevent_threaded_async_call(master_ev, + master_ev_callback, + (void **)&rsp); + + /* Ensure we don't wait more than 10 seconds. */ + tevent_add_timer(ev, + ev, + timeval_current_ofs(10,0), + thread_timeout_fn, + &finished); + + while (finished == 0) { + ret = tevent_loop_once(ev); + assert(ret == 0); + } + + if (finished > 1) { + /* Timeout ! */ + abort(); + } + + tevent_threaded_context_unregister(ev); + talloc_free(ev); + return master_ev; +} + +static bool test_multi_tevent_threaded_1(struct torture_context *test, + const void *test_data) +{ + unsigned i; + struct tevent_context *master_ev; + int ret; + + talloc_disable_null_tracking(); + + /* Ugly global stuff. */ + thread_test_ctx = test; + thread_counter = 0; + + master_ev = tevent_context_init(NULL); + if (master_ev == NULL) { + return false; + } + tevent_set_debug_stderr(master_ev); + + ret = tevent_threaded_context_register(master_ev); + if (ret != 0) { + torture_fail(test, + talloc_asprintf(test, + "tevent_threaded_context_register failed %d\n", + ret)); + talloc_free(master_ev); + return false; + } + + for (i = 0; i < NUM_TEVENT_THREADS; i++) { + ret = pthread_create(&thread_map[i], + NULL, + thread_fn_1, + master_ev); + if (ret != 0) { + torture_fail(test, + talloc_asprintf(test, + "Failed to create thread %i, %d\n", + i, ret)); + return false; + } + } + + while (thread_counter < NUM_TEVENT_THREADS) { + ret = tevent_loop_once(master_ev); + torture_assert(test, ret == 0, "tevent_loop_once failed"); + } + + /* Wait for all the threads to finish - join 'em. */ + for (i = 0; i < NUM_TEVENT_THREADS; i++) { + void *retval; + ret = pthread_join(thread_map[i], &retval); + torture_assert(test, ret == 0, "pthread_join failed"); + torture_assert(test, retval == master_ev, "thread failed"); + } + + ret = tevent_threaded_context_unregister(master_ev); + if (ret != 0) { + torture_fail(test, + talloc_asprintf(test, + "tevent_threaded_context_unregister failed %d\n", + ret)); + talloc_free(master_ev); + return false; + } + talloc_free(master_ev); + return true; +} #endif struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx) @@ -966,6 +1144,11 @@ struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx) torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded", test_multi_tevent_threaded, NULL); + + torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_1", + test_multi_tevent_threaded_1, + NULL); + #endif return suite; -- 2.4.3.573.g4eafbef From e89e9afc8d8926a464d821f889f1b5e79898c39e Mon Sep 17 00:00:00 2001 From: Jeremy Allison Date: Wed, 22 Jul 2015 11:52:06 -0700 Subject: [PATCH 4/4] lib: tevent: docs: Add tutorial on thread usage. Signed-off-by: Jeremy Allison --- lib/tevent/doc/tevent_thread.dox | 294 +++++++++++++++++++++++++++++++++++++ lib/tevent/doc/tevent_tutorial.dox | 2 + 2 files changed, 296 insertions(+) create mode 100644 lib/tevent/doc/tevent_thread.dox diff --git a/lib/tevent/doc/tevent_thread.dox b/lib/tevent/doc/tevent_thread.dox new file mode 100644 index 0000000..e81f4cf --- /dev/null +++ b/lib/tevent/doc/tevent_thread.dox @@ -0,0 +1,294 @@ +/** +@page tevent_context Chapter 6: Tevent with threads + +@section context Tevent with threads + +In order to use tevent with threads, you must first understand +how to use the talloc library in threaded programs. For more +information about working with talloc, please visit talloc website where tutorial and +documentation are located. + +If a tevent context structure is talloced from a NULL, thread-safe talloc +context, then it can be safe to use in a threaded program. The function +talloc_disable_null_tracking() must be called from the initial +program thread before any talloc calls are made to ensure talloc is thread-safe. + +Each thread must create it's own tevent context structure as follows +tevent_context_init(NULL) and no talloc memory contexts +can be shared between threads. + +Separate threads using tevent in this way can communicate +by writing data into file descriptors that are being monitored +by a tevent context on another thread. For example (simplified +with no error handling): + +@code +Main thread: + +main() +{ + talloc_disable_null_tracking(); + + struct tevent_context *master_ev = tevent_context_init(NULL); + void *mem_ctx = talloc_new(master_ev); + + // Create file descriptor to monitor. + int pipefds[2]; + + pipe(pipefds); + + struct tevent_fd *fde = tevent_add_fd(master_ev, + mem_ctx, + pipefds[0], // read side of pipe + TEVENT_FD_READ, + pipe_read_handler, // callback function + private_data_pointer); + + // Create sub thread, pass pipefds[1] write side of pipe to it. + // The above code not shown here.. + + // Process events. + tevent_loop_wait(master_ev); + + // Cleanup if loop exits. + talloc_free(master_ev); +} + +@endcode + +When the subthread writes to pipefds[1], the function +pipe_read_handler() will be called in the main thread. + +@subsection More sophisticated use + +A popular way to use an event library within threaded programs +is to allow a sub-thread to asynchronously schedule a function +call from the event loop of another thread. This can be built +out of the basic functions and isolation mechanisms of tevent, +but tevent also comes with some utility functions that make +this easier, so long as you understand the limitations that +using threads with talloc and tevent impose. + +To register a tevent context to allow it to receive an +asynchronous callback from another thread, simply call +@code + +int tevent_threaded_context_register(struct tevent_context *ev); + +@endcode + +This function allocates the internal data structures to +allow asynchronous callbacks, and returns 0 on success, +non-zero (errno) on failure. + +When you have finished receiving asynchronous callbacks, and +just before freeing the tevent context, call +@code + +int tevent_threaded_context_unregister(struct tevent_context *ev); + +#endcode +which will deallocate the resources used. Again it +returns 0 on success, non-zero (errno) on failure. + +To schedule an asynchronous callback from one thread +on the tevent loop of another thread, use +@code + +int tevent_threaded_async_call(struct tevent_context *dest_ev_ctx, + void (*callback_fn)(struct tevent_context *, + void *), + void **pp_private); + +@endcode + +This function causes the function callback_fn() +to be invoked as a callback from the event loop of the thread +that created it (so dest_ev_ctx should be long-lived +and not in the process of being torn down). + +callback_fn() will be called with two parameters, +the struct tevent_context * of the event loop +that is processing the callback, and a void * +pointer that points to the data passed by the caller in +*pp_private. + +A return of zero from this functions does not mean +that the callback_fn has been invoked, merely that +it has been scheduled to be called in the destination +event loop. An errno value is returned on error. + +Because the calling thread does not wait for the +callback to be scheduled and run on the destination +thread, this is a fire-and-forget call. If you wish +confirmation of the callback_fn() being +successfully invoked, you must ensure it replies to the +caller in some way. + +Because of asynchronous nature of this call, the nature +of the parameter passed to the destination thread has some +restructions. If you don't need parameters, merely pass +NULL as the value of void **pp_private. + +If you wish to pass a pointer to data between the threads, +it MUST be a talloced pointer, which is not part +of a talloc-pool, and it must not have a destructor +attached. The ownership of the memory pointed to will +be passed from the calling thread to the tevent library, +and if the receiving thread does not talloc-reparent +it to its own contexts, it will be freed once the +callback_fn is called. + +On success, *pp_private will be NULL +to signify the talloc memory ownership has been moved. + +In practice for message passing between threads in +event loops these restrictions are not very onerous. + +The easiest way to to a request-reply pair between +tevent loops on different threads is to pass the +parameter block of memory back and forth using +a reply tevent_threaded_async_call() +call. + +Here is an example (without error checking for +simplicity): + +@code +------------------------------------------------ +// Master thread. + +main() +{ + // Make talloc thread-safe. + + talloc_disable_null_tracking(); + + // Create the master event context. + + struct tevent_context *master_ev = tevent_context_init(NULL); + + // Register it to receive async callbacks from + // other threads. + + tevent_threaded_context_register(master_ev); + + // Create sub-threads, passing master_ev in + // some way to them. + // This code not shown.. + + // Process events. + // Function master_ev_callback() below + // will be invoked on this thread on + // master_ev event context. + + tevent_loop_wait(master_ev); + + // Cleanup if loop exits. + + tevent_threaded_context_unregister(master_ev); + talloc_free(master_ev); +} + +------------------------------------------------ + +// Data passed between threads. +struct reply_state { + struct tevent_context *reply_ev; + pthread_t thread_id; + bool *p_finished; +}; + +// Callback Called in child thread context. +static void thread_ev_callback(struct tevent_context *ev, + void *private_ptr) +{ + // Move the ownership of what private_ptr + // points to from the tevent library back to this thread. + + struct reply_state *rsp = + (struct reply_state *)talloc_move(ev, &private_ptr); + *rsp->p_finished = true; +} + + +// Callback Called in master thread context. + +static void master_ev_callback(struct tevent_context *ev, + void *private_ptr) +{ + // Move the ownership of what private_ptr + // points to from the tevent library to this thread. + + struct reply_state *rsp = + (struct reply_state *)talloc_move(ev, &private_ptr); + } + + printf("Callback from thread %s\n", thread_id_to_string(rsp->thread_id)); + + /* Now reply to the thread ! */ + (void)tevent_threaded_async_call(rsp->reply_ev, + thread_ev_callback, + (void **)&rsp); + + // Note - rsp is now NULL as the tevent library + // owns the memory. +} + +// Child thread. + +static void *thread_fn(void *private_ptr) +{ + struct tevent_context *master_ev = (struct tevent_context *)private_ptr; + struct tevent_context *ev; + struct reply_state *rsp; + bool finished = false; + int ret; + + // Create our own event context. + + ev = tevent_context_init(NULL); + + // Register it to receive async callbacks from + // other threads. + + ret = tevent_threaded_context_register(ev); + + // Setup the data to send. + + rsp = talloc(ev, struct reply_state); + + rsp->reply_ev = ev; + rsp->thread_id = pthread_self(); + rsp->p_finished = &finished; + + // Call the master thread. + + tevent_threaded_async_call(master_ev, + master_ev_callback, + (void **)&rsp); + + // Note - rsp is now NULL as the tevent library + // owns the memory. + + // Wait for the reply. + + while (!finished) { + tevent_loop_once(ev); + } + + // Cleanup. + + tevent_threaded_context_unregister(ev); + talloc_free(ev); + return NULL; +} + +@endcode + +Note this doesn't have to be a master-subthread communication. +Any thread that has access to the struct tevent_context * +pointer of another thread that has called tevent_threaded_context_register() + can send an async callback request. +*/ diff --git a/lib/tevent/doc/tevent_tutorial.dox b/lib/tevent/doc/tevent_tutorial.dox index 9f01fa1..207a244 100644 --- a/lib/tevent/doc/tevent_tutorial.dox +++ b/lib/tevent/doc/tevent_tutorial.dox @@ -17,4 +17,6 @@ Tutorial describing working with tevent library. @subpage tevent_queue +@subpage tevent_thread + */ -- 2.4.3.573.g4eafbef