[PATCH] Avoid a pipe read in aio result scheduling
Jeremy Allison
jra at samba.org
Mon Aug 22 19:09:29 UTC 2016
On Mon, Aug 22, 2016 at 05:27:35PM +0200, Volker Lendecke wrote:
> Hi!
>
> Attached find a patchset that is supposed to improve our async I/O
> result scheduling. For a more verbose explanation I'd recommend the
> commit message for [PATCH 04/23].
>
> Jeremy, I did not look deeply enough into your threaded messaging
> tevent code to see whether the tevent_thread_proxy_schedule()
> implementation can be based upon tevent_threaded_schedule_immediate().
>
> Thanks metze for very deep review and insights.
>
> Comments?
Oh it removes the asys stuff ! Very cool (deleted code is always
good :-).
I'll take some time and go over this really carefully asap (next
day or so).
Once it's in I'll see if I can improve tevent_thread_proxy_schedule()
by removing the internal pipe and basing it upon this new code !
Cheers,
Jeremy.
> From 7b07c12d3ffaf37debd81a4cad8fd385fa7d4360 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Fri, 12 Aug 2016 16:40:05 +0200
> Subject: [PATCH 01/23] libreplace: Ask for eventfd(2)
>
> This will be used in tevent soon
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> lib/replace/wscript | 3 +++
> 1 file changed, 3 insertions(+)
>
> diff --git a/lib/replace/wscript b/lib/replace/wscript
> index 145300d..1dfd902 100644
> --- a/lib/replace/wscript
> +++ b/lib/replace/wscript
> @@ -483,6 +483,9 @@ removeea setea
> if conf.CONFIG_SET('HAVE_PORT_CREATE') and conf.CONFIG_SET('HAVE_PORT_H'):
> conf.DEFINE('HAVE_SOLARIS_PORTS', 1)
>
> + if conf.CHECK_FUNCS('eventfd', headers='sys/eventfd.h'):
> + conf.DEFINE('HAVE_EVENTFD', 1)
> +
> conf.CHECK_HEADERS('poll.h')
> conf.CHECK_FUNCS('poll')
>
> --
> 2.1.4
>
>
> From 9c292edb10c47a3c8b3a269d0aa8556e411aa8a2 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Mon, 15 Aug 2016 10:33:09 +0200
> Subject: [PATCH 02/23] tevent: Fix a typo
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> lib/tevent/tevent_threads.c | 2 +-
> 1 file changed, 1 insertion(+), 1 deletion(-)
>
> diff --git a/lib/tevent/tevent_threads.c b/lib/tevent/tevent_threads.c
> index 15882e4..22b854c 100644
> --- a/lib/tevent/tevent_threads.c
> +++ b/lib/tevent/tevent_threads.c
> @@ -108,7 +108,7 @@ static void schedule_immediate_functions(struct tevent_thread_proxy *tp)
> if (tp->tofree_im_list != NULL) {
> /*
> * Once the current immediate events
> - * are processed, we need to reshedule
> + * are processed, we need to reschedule
> * ourselves to free them. This works
> * as tevent_schedule_immediate()
> * always adds events to the *END* of
> --
> 2.1.4
>
>
> From b2a4dfb515b9eea36fe8641ee3a7916e430e3ad1 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Fri, 29 Jul 2016 08:53:59 +0200
> Subject: [PATCH 03/23] tevent: Move the async wakeup pipe to common
>
> Signalling the main event loop will also happen from threads soon, and
> that will use the same mechanism. This also keeps the pipe open after the last
> signal handler is removed. Threaded jobs will come and go very frequently, and
> always setting up and tearing down the pipe for each job will be expensive.
> Also, this is "just" two file descriptors, and with eventfd just one.
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> lib/tevent/ABI/tevent-0.9.29.sigs | 3 ++
> lib/tevent/tevent.c | 87 +++++++++++++++++++++++++++++++++++++--
> lib/tevent/tevent_internal.h | 4 ++
> lib/tevent/tevent_poll.c | 5 +--
> lib/tevent/tevent_signal.c | 60 ++++-----------------------
> 5 files changed, 99 insertions(+), 60 deletions(-)
>
> diff --git a/lib/tevent/ABI/tevent-0.9.29.sigs b/lib/tevent/ABI/tevent-0.9.29.sigs
> index 1357751..4b64741 100644
> --- a/lib/tevent/ABI/tevent-0.9.29.sigs
> +++ b/lib/tevent/ABI/tevent-0.9.29.sigs
> @@ -28,10 +28,13 @@ tevent_common_fd_destructor: int (struct tevent_fd *)
> tevent_common_fd_get_flags: uint16_t (struct tevent_fd *)
> tevent_common_fd_set_close_fn: void (struct tevent_fd *, tevent_fd_close_fn_t)
> tevent_common_fd_set_flags: void (struct tevent_fd *, uint16_t)
> +tevent_common_have_events: bool (struct tevent_context *)
> tevent_common_loop_immediate: bool (struct tevent_context *)
> tevent_common_loop_timer_delay: struct timeval (struct tevent_context *)
> tevent_common_loop_wait: int (struct tevent_context *, const char *)
> tevent_common_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
> +tevent_common_wakeup: int (struct tevent_context *)
> +tevent_common_wakeup_init: int (struct tevent_context *)
> tevent_context_init: struct tevent_context *(TALLOC_CTX *)
> tevent_context_init_byname: struct tevent_context *(TALLOC_CTX *, const char *)
> tevent_context_init_ops: struct tevent_context *(TALLOC_CTX *, const struct tevent_ops *, void *)
> diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c
> index 843cf05..34cd402 100644
> --- a/lib/tevent/tevent.c
> +++ b/lib/tevent/tevent.c
> @@ -620,6 +620,28 @@ done:
> return ret;
> }
>
> +bool tevent_common_have_events(struct tevent_context *ev)
> +{
> + if (ev->fd_events != NULL) {
> + if (ev->fd_events != ev->pipe_fde) {
> + return true;
> + }
> + if (ev->fd_events->next != NULL) {
> + return true;
> + }
> +
> + /*
> + * At this point we just have the wakeup pipe event as
> + * the only fd_event. That one does not count as a
> + * regular event, so look at the other event types.
> + */
> + }
> +
> + return ((ev->timer_events != NULL) ||
> + (ev->immediate_events != NULL) ||
> + (ev->signal_events != NULL));
> +}
> +
> /*
> return on failure or (with 0) if all fd events are removed
> */
> @@ -629,10 +651,7 @@ int tevent_common_loop_wait(struct tevent_context *ev,
> /*
> * loop as long as we have events pending
> */
> - while (ev->fd_events ||
> - ev->timer_events ||
> - ev->immediate_events ||
> - ev->signal_events) {
> + while (tevent_common_have_events(ev)) {
> int ret;
> ret = _tevent_loop_once(ev, location);
> if (ret != 0) {
> @@ -670,3 +689,63 @@ int tevent_re_initialise(struct tevent_context *ev)
>
> return ev->ops->context_init(ev);
> }
> +
> +static void wakeup_pipe_handler(struct tevent_context *ev,
> + struct tevent_fd *fde,
> + uint16_t flags, void *_private)
> +{
> + ssize_t ret;
> +
> + char c[16];
> + /* its non-blocking, doesn't matter if we read too much */
> + do {
> + ret = read(fde->fd, c, sizeof(c));
> + } while (ret == -1 && errno == EINTR);
> +}
> +
> +/*
> + * Initialize the wakeup pipe and pipe fde
> + */
> +
> +int tevent_common_wakeup_init(struct tevent_context *ev)
> +{
> + int ret;
> +
> + if (ev->pipe_fde != NULL) {
> + return 0;
> + }
> +
> + ret = pipe(ev->pipe_fds);
> + if (ret == -1) {
> + return errno;
> + }
> + ev_set_blocking(ev->pipe_fds[0], false);
> + ev_set_blocking(ev->pipe_fds[1], false);
> +
> + ev->pipe_fde = tevent_add_fd(ev, ev, ev->pipe_fds[0],
> + TEVENT_FD_READ,
> + wakeup_pipe_handler, NULL);
> + if (ev->pipe_fde == NULL) {
> + close(ev->pipe_fds[0]);
> + close(ev->pipe_fds[1]);
> + return ENOMEM;
> + }
> +
> + return 0;
> +}
> +
> +int tevent_common_wakeup(struct tevent_context *ev)
> +{
> + ssize_t ret;
> +
> + if (ev->pipe_fds[1] == -1) {
> + return ENOTCONN;
> + }
> +
> + do {
> + char c = '\0';
> + ret = write(ev->pipe_fds[1], &c, 1);
> + } while ((ret == -1) && (errno == EINTR));
> +
> + return 0;
> +}
> diff --git a/lib/tevent/tevent_internal.h b/lib/tevent/tevent_internal.h
> index 10cc4a4..8362770 100644
> --- a/lib/tevent/tevent_internal.h
> +++ b/lib/tevent/tevent_internal.h
> @@ -328,6 +328,10 @@ void tevent_common_schedule_immediate(struct tevent_immediate *im,
> const char *location);
> bool tevent_common_loop_immediate(struct tevent_context *ev);
>
> +bool tevent_common_have_events(struct tevent_context *ev);
> +int tevent_common_wakeup_init(struct tevent_context *ev);
> +int tevent_common_wakeup(struct tevent_context *ev);
> +
> struct tevent_signal *tevent_common_add_signal(struct tevent_context *ev,
> TALLOC_CTX *mem_ctx,
> int signum,
> diff --git a/lib/tevent/tevent_poll.c b/lib/tevent/tevent_poll.c
> index e1c305d..3547e91 100644
> --- a/lib/tevent/tevent_poll.c
> +++ b/lib/tevent/tevent_poll.c
> @@ -667,10 +667,7 @@ static int poll_event_loop_wait(struct tevent_context *ev,
> /*
> * loop as long as we have events pending
> */
> - while (ev->fd_events ||
> - ev->timer_events ||
> - ev->immediate_events ||
> - ev->signal_events ||
> + while (tevent_common_have_events(ev) ||
> poll_ev->fresh ||
> poll_ev->disabled) {
> int ret;
> diff --git a/lib/tevent/tevent_signal.c b/lib/tevent/tevent_signal.c
> index 635a7a1..c85e1c5 100644
> --- a/lib/tevent/tevent_signal.c
> +++ b/lib/tevent/tevent_signal.c
> @@ -95,7 +95,6 @@ static uint32_t tevent_sig_count(struct tevent_sigcounter s)
> */
> static void tevent_common_signal_handler(int signum)
> {
> - char c = 0;
> struct tevent_common_signal_list *sl;
> struct tevent_context *ev = NULL;
> int saved_errno = errno;
> @@ -106,13 +105,8 @@ static void tevent_common_signal_handler(int signum)
> /* Write to each unique event context. */
> for (sl = sig_state->sig_handlers[signum]; sl; sl = sl->next) {
> if (sl->se->event_ctx && sl->se->event_ctx != ev) {
> - ssize_t ret;
> -
> ev = sl->se->event_ctx;
> - /* doesn't matter if this pipe overflows */
> - do {
> - ret = write(ev->pipe_fds[1], &c, 1);
> - } while (ret == -1 && errno == EINTR);
> + tevent_common_wakeup(ev);
> }
> }
>
> @@ -198,16 +192,6 @@ static int tevent_signal_destructor(struct tevent_signal *se)
> struct tevent_context *ev = se->event_ctx;
>
> DLIST_REMOVE(ev->signal_events, se);
> -
> - if (ev->signal_events == NULL && ev->pipe_fde != NULL) {
> - /*
> - * This was the last signal. Destroy the pipe.
> - */
> - TALLOC_FREE(ev->pipe_fde);
> -
> - close(ev->pipe_fds[0]);
> - close(ev->pipe_fds[1]);
> - }
> }
>
> talloc_free(sl);
> @@ -233,21 +217,6 @@ static int tevent_signal_destructor(struct tevent_signal *se)
> }
>
> /*
> - this is part of the pipe hack needed to avoid the signal race condition
> -*/
> -static void signal_pipe_handler(struct tevent_context *ev, struct tevent_fd *fde,
> - uint16_t flags, void *_private)
> -{
> - ssize_t ret;
> -
> - char c[16];
> - /* its non-blocking, doesn't matter if we read too much */
> - do {
> - ret = read(fde->fd, c, sizeof(c));
> - } while (ret == -1 && errno == EINTR);
> -}
> -
> -/*
> add a signal event
> return NULL on failure (memory allocation error)
> */
> @@ -263,6 +232,13 @@ struct tevent_signal *tevent_common_add_signal(struct tevent_context *ev,
> struct tevent_signal *se;
> struct tevent_common_signal_list *sl;
> sigset_t set, oldset;
> + int ret;
> +
> + ret = tevent_common_wakeup_init(ev);
> + if (ret != 0) {
> + errno = ret;
> + return NULL;
> + }
>
> if (signum >= TEVENT_NUM_SIGNALS) {
> errno = EINVAL;
> @@ -304,26 +280,6 @@ struct tevent_signal *tevent_common_add_signal(struct tevent_context *ev,
> return NULL;
> }
>
> - /* we need to setup the pipe hack handler if not already
> - setup */
> - if (ev->pipe_fde == NULL) {
> - if (pipe(ev->pipe_fds) == -1) {
> - talloc_free(se);
> - return NULL;
> - }
> - ev_set_blocking(ev->pipe_fds[0], false);
> - ev_set_blocking(ev->pipe_fds[1], false);
> - ev->pipe_fde = tevent_add_fd(ev, ev, ev->pipe_fds[0],
> - TEVENT_FD_READ,
> - signal_pipe_handler, NULL);
> - if (!ev->pipe_fde) {
> - close(ev->pipe_fds[0]);
> - close(ev->pipe_fds[1]);
> - talloc_free(se);
> - return NULL;
> - }
> - }
> -
> /* only install a signal handler if not already installed */
> if (sig_state->sig_handlers[signum] == NULL) {
> struct sigaction act;
> --
> 2.1.4
>
>
> From 5c9f9ab188fc36195c668d0b7f1954d2921461c0 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Mon, 8 Aug 2016 11:26:37 +0200
> Subject: [PATCH 04/23] tevent: Add threaded immediate activation
>
> This is infrastructure to improve our async r/w result handling and latency.
> The pthreadpool signalling goes through a pipe. This has downsides: The main
> event loop has to go through a read on the pipe before it can ship the result.
> Also, it is not guaranteed by poll/epoll that the pthreadpool signal pipe is
> handled with top priority. When an async pread/pwrite has finished, we should
> immediately ship the result to the client, not waiting for anything else.
>
> This patch enables tevent_immediate structs as job signalling. This means a
> busy main tevent loop will handle the threaded job completion before any timed
> or file descriptor events. Opposite to Jeremy's tevent_thread_proxy this is
> done by a modification of the main event loop by looking at a linked list under
> a central mutex.
>
> Regarding performance: In a later commit I've created a test that does nothing
> but fire one immediate over and over again. If you add a phread_mutex_lock and
> unlock pair in the immediate handler, you lose roughly 25% of rounds per
> second, so it is measurable. It is questionable that will be measurable in the
> real world, but to counter concerns activation of immediates needs to go
> through a new struct tevent_threaded_context. Only if such a
> tevent_threaded_context exists for a tevent context, the main loop takes the
> hit to look at the mutex'ed list of finished jobs.
>
> This patch by design does not care about talloc hierarchies. The idea is that
> the main thread owning the tevent context creates a chunk of memory and
> prepares the tevent_immediate indication job completion. The main thread hands
> the memory chunk together with the immediate as a job description over to a
> helper thread. The helper thread does its job and upon completion calls
> tevent_threaded_schedule_immediate with the already-prepared immediate. From
> that point on memory ownership is again transferred to the main thread.
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> lib/tevent/ABI/tevent-0.9.29.sigs | 3 +
> lib/tevent/tevent.c | 147 ++++++++++++++++++++++++++++++++++++++
> lib/tevent/tevent.h | 74 +++++++++++++++++++
> lib/tevent/tevent_epoll.c | 4 ++
> lib/tevent/tevent_internal.h | 17 +++++
> lib/tevent/tevent_poll.c | 4 ++
> lib/tevent/tevent_port.c | 4 ++
> lib/tevent/tevent_select.c | 4 ++
> lib/tevent/tevent_threads.c | 119 ++++++++++++++++++++++++++++++
> lib/tevent/wscript | 6 +-
> 10 files changed, 381 insertions(+), 1 deletion(-)
>
> diff --git a/lib/tevent/ABI/tevent-0.9.29.sigs b/lib/tevent/ABI/tevent-0.9.29.sigs
> index 4b64741..9b8bfa1 100644
> --- a/lib/tevent/ABI/tevent-0.9.29.sigs
> +++ b/lib/tevent/ABI/tevent-0.9.29.sigs
> @@ -16,6 +16,7 @@ _tevent_req_nomem: bool (const void *, struct tevent_req *, const char *)
> _tevent_req_notify_callback: void (struct tevent_req *, const char *)
> _tevent_req_oom: void (struct tevent_req *, const char *)
> _tevent_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
> +_tevent_threaded_schedule_immediate: void (struct tevent_threaded_context *, struct tevent_immediate *, tevent_immediate_handler_t, void *, const char *, const char *)
> tevent_backend_list: const char **(TALLOC_CTX *)
> tevent_cleanup_pending_signal_handlers: void (struct tevent_signal *)
> tevent_common_add_fd: struct tevent_fd *(struct tevent_context *, TALLOC_CTX *, int, uint16_t, tevent_fd_handler_t, void *, const char *, const char *)
> @@ -33,6 +34,7 @@ tevent_common_loop_immediate: bool (struct tevent_context *)
> tevent_common_loop_timer_delay: struct timeval (struct tevent_context *)
> tevent_common_loop_wait: int (struct tevent_context *, const char *)
> tevent_common_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
> +tevent_common_threaded_activate_immediate: void (struct tevent_context *)
> tevent_common_wakeup: int (struct tevent_context *)
> tevent_common_wakeup_init: int (struct tevent_context *)
> tevent_context_init: struct tevent_context *(TALLOC_CTX *)
> @@ -80,6 +82,7 @@ tevent_set_trace_callback: void (struct tevent_context *, tevent_trace_callback_
> tevent_signal_support: bool (struct tevent_context *)
> tevent_thread_proxy_create: struct tevent_thread_proxy *(struct tevent_context *)
> tevent_thread_proxy_schedule: void (struct tevent_thread_proxy *, struct tevent_immediate **, tevent_immediate_handler_t, void *)
> +tevent_threaded_context_create: struct tevent_threaded_context *(TALLOC_CTX *, struct tevent_context *)
> tevent_timeval_add: struct timeval (const struct timeval *, uint32_t, uint32_t)
> tevent_timeval_compare: int (const struct timeval *, const struct timeval *)
> tevent_timeval_current: struct timeval (void)
> diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c
> index 34cd402..b8178b2 100644
> --- a/lib/tevent/tevent.c
> +++ b/lib/tevent/tevent.c
> @@ -59,11 +59,16 @@
> */
> #include "replace.h"
> #include "system/filesys.h"
> +#ifdef HAVE_PTHREAD
> +#include "system/threads.h"
> +#endif
> #define TEVENT_DEPRECATED 1
> #include "tevent.h"
> #include "tevent_internal.h"
> #include "tevent_util.h"
>
> +static void tevent_abort(struct tevent_context *ev, const char *reason);
> +
> struct tevent_ops_list {
> struct tevent_ops_list *next, *prev;
> const char *name;
> @@ -173,6 +178,91 @@ const char **tevent_backend_list(TALLOC_CTX *mem_ctx)
> return list;
> }
>
> +#ifdef HAVE_PTHREAD
> +
> +static pthread_mutex_t tevent_contexts_mutex = PTHREAD_MUTEX_INITIALIZER;
> +static struct tevent_context *tevent_contexts = NULL;
> +static pthread_once_t tevent_atfork_initialized = PTHREAD_ONCE_INIT;
> +
> +static void tevent_atfork_prepare(void)
> +{
> + struct tevent_context *ev;
> + int ret;
> +
> + ret = pthread_mutex_lock(&tevent_contexts_mutex);
> + if (ret != 0) {
> + abort();
> + }
> +
> + for (ev = tevent_contexts; ev != NULL; ev = ev->next) {
> + ret = pthread_mutex_lock(&ev->scheduled_mutex);
> + if (ret != 0) {
> + tevent_abort(ev, "pthread_mutex_lock failed");
> + }
> + }
> +}
> +
> +static void tevent_atfork_parent(void)
> +{
> + struct tevent_context *ev;
> + int ret;
> +
> + for (ev = DLIST_TAIL(tevent_contexts); ev != NULL;
> + ev = DLIST_PREV(ev)) {
> + ret = pthread_mutex_unlock(&ev->scheduled_mutex);
> + if (ret != 0) {
> + tevent_abort(ev, "pthread_mutex_unlock failed");
> + }
> + }
> +
> + ret = pthread_mutex_unlock(&tevent_contexts_mutex);
> + if (ret != 0) {
> + abort();
> + }
> +}
> +
> +static void tevent_atfork_child(void)
> +{
> + struct tevent_context *ev;
> + int ret;
> +
> + for (ev = DLIST_TAIL(tevent_contexts); ev != NULL;
> + ev = DLIST_PREV(ev)) {
> + struct tevent_threaded_context *tctx;
> +
> + for (tctx = ev->threaded_contexts; tctx != NULL;
> + tctx = tctx->next) {
> + tctx->event_ctx = NULL;
> + }
> +
> + ev->threaded_contexts = NULL;
> +
> + ret = pthread_mutex_unlock(&ev->scheduled_mutex);
> + if (ret != 0) {
> + tevent_abort(ev, "pthread_mutex_unlock failed");
> + }
> + }
> +
> + ret = pthread_mutex_unlock(&tevent_contexts_mutex);
> + if (ret != 0) {
> + abort();
> + }
> +}
> +
> +static void tevent_prep_atfork(void)
> +{
> + int ret;
> +
> + ret = pthread_atfork(tevent_atfork_prepare,
> + tevent_atfork_parent,
> + tevent_atfork_child);
> + if (ret != 0) {
> + abort();
> + }
> +}
> +
> +#endif
> +
> int tevent_common_context_destructor(struct tevent_context *ev)
> {
> struct tevent_fd *fd, *fn;
> @@ -180,6 +270,33 @@ int tevent_common_context_destructor(struct tevent_context *ev)
> struct tevent_immediate *ie, *in;
> struct tevent_signal *se, *sn;
>
> +#ifdef HAVE_PTHREAD
> + int ret;
> +
> + ret = pthread_mutex_lock(&tevent_contexts_mutex);
> + if (ret != 0) {
> + abort();
> + }
> +
> + DLIST_REMOVE(tevent_contexts, ev);
> +
> + ret = pthread_mutex_unlock(&tevent_contexts_mutex);
> + if (ret != 0) {
> + abort();
> + }
> +#endif
> +
> + if (ev->threaded_contexts != NULL) {
> + /*
> + * Threaded contexts are indicators that threads are
> + * about to send us immediates via
> + * tevent_threaded_schedule_immediate. The caller
> + * needs to make sure that the tevent context lives
> + * long enough to receive immediates from all threads.
> + */
> + tevent_abort(ev, "threaded contexts exist");
> + }
> +
> if (ev->pipe_fde) {
> talloc_free(ev->pipe_fde);
> close(ev->pipe_fds[0]);
> @@ -255,6 +372,36 @@ struct tevent_context *tevent_context_init_ops(TALLOC_CTX *mem_ctx,
> ev = talloc_zero(mem_ctx, struct tevent_context);
> if (!ev) return NULL;
>
> +#ifdef HAVE_PTHREAD
> +
> + ret = pthread_once(&tevent_atfork_initialized, tevent_prep_atfork);
> + if (ret != 0) {
> + talloc_free(ev);
> + return NULL;
> + }
> +
> + ret = pthread_mutex_init(&ev->scheduled_mutex, NULL);
> + if (ret != 0) {
> + talloc_free(ev);
> + return NULL;
> + }
> +
> + ret = pthread_mutex_lock(&tevent_contexts_mutex);
> + if (ret != 0) {
> + pthread_mutex_destroy(&ev->scheduled_mutex);
> + talloc_free(ev);
> + return NULL;
> + }
> +
> + DLIST_ADD(tevent_contexts, ev);
> +
> + ret = pthread_mutex_unlock(&tevent_contexts_mutex);
> + if (ret != 0) {
> + abort();
> + }
> +
> +#endif
> +
> talloc_set_destructor(ev, tevent_common_context_destructor);
>
> ev->ops = ops;
> diff --git a/lib/tevent/tevent.h b/lib/tevent/tevent.h
> index 1c1271b..2432344 100644
> --- a/lib/tevent/tevent.h
> +++ b/lib/tevent/tevent.h
> @@ -40,6 +40,7 @@ struct tevent_timer;
> struct tevent_immediate;
> struct tevent_signal;
> struct tevent_thread_proxy;
> +struct tevent_threaded_context;
>
> /**
> * @defgroup tevent The tevent API
> @@ -1750,6 +1751,79 @@ void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
> tevent_immediate_handler_t handler,
> void *pp_private_data);
>
> +/*
> + * @brief Create a context for threaded activation of immediates
> + *
> + * A tevent_treaded_context provides a link into an event
> + * context. Using tevent_threaded_schedule_immediate, it is possible
> + * to activate an immediate event from within a thread.
> + *
> + * It is the duty of the caller of tevent_threaded_context_create() to
> + * keep the event context around longer than any
> + * tevent_threaded_context. tevent will abort if ev is talllc_free'ed
> + * with an active tevent_threaded_context.
> + *
> + * If tevent is build without pthread support, this always returns
> + * NULL with errno=ENOSYS.
> + *
> + * @param[in] mem_ctx The talloc memory context to use.
> + * @param[in] ev The event context to link this to.
> + * @return The threaded context, or NULL with errno set.
> + *
> + * @see tevent_threaded_schedule_immediate()
> + *
> + * @note Available as of tevent 0.9.30
> + */
> +struct tevent_threaded_context *tevent_threaded_context_create(
> + TALLOC_CTX *mem_ctx, struct tevent_context *ev);
> +
> +#ifdef DOXYGEN
> +/*
> + * @brief Activate an immediate from a thread
> + *
> + * Activate an immediate from within a thread.
> + *
> + * This routine does not watch out for talloc hierarchies. This means
> + * that it is highly recommended to create the tevent_immediate in the
> + * thread owning tctx, allocate a threaded job description for the
> + * thread, hand over both pointers to a helper thread and not touch it
> + * in the main thread at all anymore.
> + *
> + * tevent_threaded_schedule_immediate is intended as a job completion
> + * indicator for simple threaded helpers.
> + *
> + * Please be aware that tevent_threaded_schedule_immediate is very
> + * picky about its arguments: An immediate may not already be
> + * activated and the handler must exist. With
> + * tevent_threaded_schedule_immediate memory ownership is transferred
> + * to the main thread holding the tevent context behind tctx, the
> + * helper thread can't access it anymore.
> + *
> + * @param[in] tctx The threaded context to go through
> + * @param[in] im The immediate event to activate
> + * @param[in] handler The immediate handler to call in the main thread
> + * @param[in] private_data Pointer for the immediate handler
> + *
> + * @see tevent_threaded_context_create()
> + *
> + * @note Available as of tevent 0.9.30
> + */
> +void tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
> + struct tevent_immediate *im,
> + tevent_immediate_handler_t handler,
> + void *private_data);
> +#else
> +void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
> + struct tevent_immediate *im,
> + tevent_immediate_handler_t handler,
> + void *private_data,
> + const char *handler_name,
> + const char *location);
> +#define tevent_threaded_schedule_immediate(tctx, im, handler, private_data) \
> + _tevent_threaded_schedule_immediate(tctx, im, handler, private_data, \
> + #handler, __location__);
> +#endif
> +
> #ifdef TEVENT_DEPRECATED
> #ifndef _DEPRECATED_
> #ifdef HAVE___ATTRIBUTE__
> diff --git a/lib/tevent/tevent_epoll.c b/lib/tevent/tevent_epoll.c
> index 507ea5c..4147c67 100644
> --- a/lib/tevent/tevent_epoll.c
> +++ b/lib/tevent/tevent_epoll.c
> @@ -903,6 +903,10 @@ static int epoll_event_loop_once(struct tevent_context *ev, const char *location
> return 0;
> }
>
> + if (ev->threaded_contexts != NULL) {
> + tevent_common_threaded_activate_immediate(ev);
> + }
> +
> if (ev->immediate_events &&
> tevent_common_loop_immediate(ev)) {
> return 0;
> diff --git a/lib/tevent/tevent_internal.h b/lib/tevent/tevent_internal.h
> index 8362770..6b29547 100644
> --- a/lib/tevent/tevent_internal.h
> +++ b/lib/tevent/tevent_internal.h
> @@ -228,6 +228,11 @@ struct tevent_signal {
> void *additional_data;
> };
>
> +struct tevent_threaded_context {
> + struct tevent_threaded_context *next, *prev;
> + struct tevent_context *event_ctx;
> +};
> +
> struct tevent_debug_ops {
> void (*debug)(void *context, enum tevent_debug_level level,
> const char *fmt, va_list ap) PRINTF_ATTRIBUTE(3,0);
> @@ -247,6 +252,13 @@ struct tevent_context {
> /* list of timed events - used by common code */
> struct tevent_timer *timer_events;
>
> + /* List of threaded job indicators */
> + struct tevent_threaded_context *threaded_contexts;
> +
> + /* List of scheduled immediates */
> + pthread_mutex_t scheduled_mutex;
> + struct tevent_immediate *scheduled_immediates;
> +
> /* list of immediate events - used by common code */
> struct tevent_immediate *immediate_events;
>
> @@ -282,6 +294,10 @@ struct tevent_context {
> * tevent_common_add_timer_v2()
> */
> struct tevent_timer *last_zero_timer;
> +
> +#ifdef HAVE_PTHREAD
> + struct tevent_context *prev, *next;
> +#endif
> };
>
> const struct tevent_ops *tevent_find_ops_byname(const char *name);
> @@ -327,6 +343,7 @@ void tevent_common_schedule_immediate(struct tevent_immediate *im,
> const char *handler_name,
> const char *location);
> bool tevent_common_loop_immediate(struct tevent_context *ev);
> +void tevent_common_threaded_activate_immediate(struct tevent_context *ev);
>
> bool tevent_common_have_events(struct tevent_context *ev);
> int tevent_common_wakeup_init(struct tevent_context *ev);
> diff --git a/lib/tevent/tevent_poll.c b/lib/tevent/tevent_poll.c
> index 3547e91..09d85fa 100644
> --- a/lib/tevent/tevent_poll.c
> +++ b/lib/tevent/tevent_poll.c
> @@ -645,6 +645,10 @@ static int poll_event_loop_once(struct tevent_context *ev,
> return 0;
> }
>
> + if (ev->threaded_contexts != NULL) {
> + tevent_common_threaded_activate_immediate(ev);
> + }
> +
> if (ev->immediate_events &&
> tevent_common_loop_immediate(ev)) {
> return 0;
> diff --git a/lib/tevent/tevent_port.c b/lib/tevent/tevent_port.c
> index 4b524df..8cf9fd1 100644
> --- a/lib/tevent/tevent_port.c
> +++ b/lib/tevent/tevent_port.c
> @@ -760,6 +760,10 @@ static int port_event_loop_once(struct tevent_context *ev, const char *location)
> return 0;
> }
>
> + if (ev->threaded_contexts != NULL) {
> + tevent_common_threaded_activate_immediate(ev);
> + }
> +
> if (ev->immediate_events &&
> tevent_common_loop_immediate(ev)) {
> return 0;
> diff --git a/lib/tevent/tevent_select.c b/lib/tevent/tevent_select.c
> index ec7565d..55dd0b6 100644
> --- a/lib/tevent/tevent_select.c
> +++ b/lib/tevent/tevent_select.c
> @@ -244,6 +244,10 @@ static int select_event_loop_once(struct tevent_context *ev, const char *locatio
> return 0;
> }
>
> + if (ev->threaded_contexts != NULL) {
> + tevent_common_threaded_activate_immediate(ev);
> + }
> +
> if (ev->immediate_events &&
> tevent_common_loop_immediate(ev)) {
> return 0;
> diff --git a/lib/tevent/tevent_threads.c b/lib/tevent/tevent_threads.c
> index 22b854c..e42759e 100644
> --- a/lib/tevent/tevent_threads.c
> +++ b/lib/tevent/tevent_threads.c
> @@ -371,3 +371,122 @@ void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
> ;
> }
> #endif
> +
> +static int tevent_threaded_context_destructor(
> + struct tevent_threaded_context *tctx)
> +{
> + if (tctx->event_ctx != NULL) {
> + DLIST_REMOVE(tctx->event_ctx->threaded_contexts, tctx);
> + }
> + return 0;
> +}
> +
> +struct tevent_threaded_context *tevent_threaded_context_create(
> + TALLOC_CTX *mem_ctx, struct tevent_context *ev)
> +{
> +#ifdef HAVE_PTHREAD
> + struct tevent_threaded_context *tctx;
> + int ret;
> +
> + ret = tevent_common_wakeup_init(ev);
> + if (ret != 0) {
> + errno = ret;
> + return NULL;
> + }
> +
> + tctx = talloc(mem_ctx, struct tevent_threaded_context);
> + if (tctx == NULL) {
> + return NULL;
> + }
> + tctx->event_ctx = ev;
> +
> + DLIST_ADD(ev->threaded_contexts, tctx);
> + talloc_set_destructor(tctx, tevent_threaded_context_destructor);
> +
> + return tctx;
> +#else
> + errno = ENOSYS;
> + return NULL;
> +#endif
> +}
> +
> +void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
> + struct tevent_immediate *im,
> + tevent_immediate_handler_t handler,
> + void *private_data,
> + const char *handler_name,
> + const char *location)
> +{
> +#ifdef HAVE_PTHREAD
> + struct tevent_context *ev = tctx->event_ctx;
> + int ret;
> +
> + if ((im->event_ctx != NULL) || (handler == NULL)) {
> + abort();
> + }
> +
> + im->event_ctx = ev;
> + im->handler = handler;
> + im->private_data = private_data;
> + im->handler_name = handler_name;
> + im->schedule_location = location;
> + im->cancel_fn = NULL;
> + im->additional_data = NULL;
> +
> + ret = pthread_mutex_lock(&ev->scheduled_mutex);
> + if (ret != 0) {
> + abort();
> + }
> +
> + DLIST_ADD_END(ev->scheduled_immediates, im);
> +
> + ret = pthread_mutex_unlock(&ev->scheduled_mutex);
> + if (ret != 0) {
> + abort();
> + }
> +
> + /*
> + * We might want to wake up the main thread under the lock. We
> + * had a slightly similar situation in pthreadpool, changed
> + * with 1c4284c7395f23. This is not exactly the same, as the
> + * wakeup is only a last-resort thing in case the main thread
> + * is sleeping. Doing the wakeup under the lock can easily
> + * lead to a contended mutex, which is much more expensive
> + * than a noncontended one. So I'd opt for the lower footprint
> + * initially. Maybe we have to change that later.
> + */
> + tevent_common_wakeup(ev);
> +#else
> + /*
> + * tevent_threaded_context_create() returned NULL with ENOSYS...
> + */
> + abort();
> +#endif
> +}
> +
> +void tevent_common_threaded_activate_immediate(struct tevent_context *ev)
> +{
> +#ifdef HAVE_PTHREAD
> + int ret;
> + ret = pthread_mutex_lock(&ev->scheduled_mutex);
> + if (ret != 0) {
> + abort();
> + }
> +
> + while (ev->scheduled_immediates != NULL) {
> + struct tevent_immediate *im = ev->scheduled_immediates;
> + DLIST_REMOVE(ev->scheduled_immediates, im);
> + DLIST_ADD_END(ev->immediate_events, im);
> + }
> +
> + ret = pthread_mutex_unlock(&ev->scheduled_mutex);
> + if (ret != 0) {
> + abort();
> + }
> +#else
> + /*
> + * tevent_threaded_context_create() returned NULL with ENOSYS...
> + */
> + abort();
> +#endif
> +}
> diff --git a/lib/tevent/wscript b/lib/tevent/wscript
> index 71b9475..3e0b413 100755
> --- a/lib/tevent/wscript
> +++ b/lib/tevent/wscript
> @@ -99,9 +99,13 @@ def build(bld):
> private_library = True
>
> if not bld.CONFIG_SET('USING_SYSTEM_TEVENT'):
> + tevent_deps = 'replace talloc'
> + if bld.CONFIG_SET('HAVE_PTHREAD'):
> + tevent_deps += ' pthread'
> +
> bld.SAMBA_LIBRARY('tevent',
> SRC,
> - deps='replace talloc',
> + deps=tevent_deps,
> enabled= not bld.CONFIG_SET('USING_SYSTEM_TEVENT'),
> includes='.',
> abi_directory='ABI',
> --
> 2.1.4
>
>
> From 04e969c9973257097ef488a6ebf944e033e80e6e Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Mon, 8 Aug 2016 12:51:56 +0200
> Subject: [PATCH 05/23] lib: enable threaded immediates in source3
>
> Logically this belongs into the previous patch, but tevent deserves isolated
> patches :-)
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> source3/lib/events.c | 4 ++++
> 1 file changed, 4 insertions(+)
>
> diff --git a/source3/lib/events.c b/source3/lib/events.c
> index 2e862ca..a866ef5 100644
> --- a/source3/lib/events.c
> +++ b/source3/lib/events.c
> @@ -188,6 +188,10 @@ bool run_events_poll(struct tevent_context *ev, int pollrtn,
> return true;
> }
>
> + if (ev->threaded_contexts != NULL) {
> + tevent_common_threaded_activate_immediate(ev);
> + }
> +
> if (ev->immediate_events &&
> tevent_common_loop_immediate(ev)) {
> return true;
> --
> 2.1.4
>
>
> From 86815b12b5d37b4d75d042ab4897bbca54f23aca Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Mon, 8 Aug 2016 08:56:23 +0200
> Subject: [PATCH 06/23] tevent: reorder tevent_context for cache locality
>
> No functionality change. This just looks better in objdump --disassemble :-)
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> lib/tevent/tevent_internal.h | 25 ++++++++++++++++---------
> 1 file changed, 16 insertions(+), 9 deletions(-)
>
> diff --git a/lib/tevent/tevent_internal.h b/lib/tevent/tevent_internal.h
> index 6b29547..f17ce94 100644
> --- a/lib/tevent/tevent_internal.h
> +++ b/lib/tevent/tevent_internal.h
> @@ -246,25 +246,32 @@ struct tevent_context {
> /* the specific events implementation */
> const struct tevent_ops *ops;
>
> + /*
> + * The following three pointers are queried on every loop_once
> + * in the order in which they appear here. Not measured, but
> + * hopefully putting them at the top together with "ops"
> + * should make tevent a *bit* more cache-friendly than before.
> + */
> +
> + /* list of signal events - used by common code */
> + struct tevent_signal *signal_events;
> +
> + /* List of threaded job indicators */
> + struct tevent_threaded_context *threaded_contexts;
> +
> + /* list of immediate events - used by common code */
> + struct tevent_immediate *immediate_events;
> +
> /* list of fd events - used by common code */
> struct tevent_fd *fd_events;
>
> /* list of timed events - used by common code */
> struct tevent_timer *timer_events;
>
> - /* List of threaded job indicators */
> - struct tevent_threaded_context *threaded_contexts;
> -
> /* List of scheduled immediates */
> pthread_mutex_t scheduled_mutex;
> struct tevent_immediate *scheduled_immediates;
>
> - /* list of immediate events - used by common code */
> - struct tevent_immediate *immediate_events;
> -
> - /* list of signal events - used by common code */
> - struct tevent_signal *signal_events;
> -
> /* this is private for the events_ops implementation */
> void *additional_data;
>
> --
> 2.1.4
>
>
> From 04966c4d0a7567923eef759a2f4d3c655bb35096 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Mon, 8 Aug 2016 12:53:08 +0200
> Subject: [PATCH 07/23] tevent: Simple test for threaded immediates
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> lib/tevent/testsuite.c | 99 ++++++++++++++++++++++++++++++++++++++++++++++++++
> 1 file changed, 99 insertions(+)
>
> diff --git a/lib/tevent/testsuite.c b/lib/tevent/testsuite.c
> index b37c7b1..4783ab4 100644
> --- a/lib/tevent/testsuite.c
> +++ b/lib/tevent/testsuite.c
> @@ -1148,6 +1148,101 @@ static bool test_multi_tevent_threaded_1(struct torture_context *test,
> talloc_free(master_ev);
> return true;
> }
> +
> +struct threaded_test_2 {
> + struct tevent_threaded_context *tctx;
> + struct tevent_immediate *im;
> + pthread_t thread_id;
> +};
> +
> +static void master_callback_2(struct tevent_context *ev,
> + struct tevent_immediate *im,
> + void *private_data);
> +
> +static void *thread_fn_2(void *private_data)
> +{
> + struct threaded_test_2 *state = private_data;
> +
> + state->thread_id = pthread_self();
> +
> + usleep(random() % 7000);
> +
> + tevent_threaded_schedule_immediate(
> + state->tctx, state->im, master_callback_2, state);
> +
> + return NULL;
> +}
> +
> +static void master_callback_2(struct tevent_context *ev,
> + struct tevent_immediate *im,
> + void *private_data)
> +{
> + struct threaded_test_2 *state = private_data;
> + int i;
> +
> + for (i = 0; i < NUM_TEVENT_THREADS; i++) {
> + if (pthread_equal(state->thread_id, thread_map[i])) {
> + break;
> + }
> + }
> + torture_comment(thread_test_ctx,
> + "Callback_2 %u from thread %u\n",
> + thread_counter,
> + i);
> + thread_counter++;
> +}
> +
> +static bool test_multi_tevent_threaded_2(struct torture_context *test,
> + const void *test_data)
> +{
> + unsigned i;
> +
> + struct tevent_context *ev;
> + struct tevent_threaded_context *tctx;
> + int ret;
> +
> + thread_test_ctx = test;
> + thread_counter = 0;
> +
> + ev = tevent_context_init(test);
> + torture_assert(test, ev != NULL, "tevent_context_init failed");
> +
> + tctx = tevent_threaded_context_create(ev, ev);
> + torture_assert(test, tctx != NULL,
> + "tevent_threaded_context_create failed");
> +
> + for (i=0; i<NUM_TEVENT_THREADS; i++) {
> + struct threaded_test_2 *state;
> +
> + state = talloc(ev, struct threaded_test_2);
> + torture_assert(test, state != NULL, "talloc failed");
> +
> + state->tctx = tctx;
> + state->im = tevent_create_immediate(state);
> + torture_assert(test, state->im != NULL,
> + "tevent_create_immediate failed");
> +
> + ret = pthread_create(&thread_map[i], NULL, thread_fn_2, state);
> + torture_assert(test, ret == 0, "pthread_create failed");
> + }
> +
> + while (thread_counter < NUM_TEVENT_THREADS) {
> + ret = tevent_loop_once(ev);
> + torture_assert(test, ret == 0, "tevent_loop_once failed");
> + }
> +
> + /* Wait for all the threads to finish - join 'em. */
> + for (i = 0; i < NUM_TEVENT_THREADS; i++) {
> + void *retval;
> + ret = pthread_join(thread_map[i], &retval);
> + torture_assert(test, ret == 0, "pthread_join failed");
> + /* Free the child thread event context. */
> + }
> +
> + talloc_free(tctx);
> + talloc_free(ev);
> + return true;
> +}
> #endif
>
> struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
> @@ -1190,6 +1285,10 @@ struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
> test_multi_tevent_threaded_1,
> NULL);
>
> + torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_2",
> + test_multi_tevent_threaded_2,
> + NULL);
> +
> #endif
>
> return suite;
> --
> 2.1.4
>
>
> From 5d9587fdf033102a202d8f3002f42772aa7c9f93 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Fri, 12 Aug 2016 16:00:56 +0200
> Subject: [PATCH 08/23] tevent: Move rundown of the event pipe
>
> Purely cosmetic change: This moves closing the signal/thread event pipe
> to where it's opened. This prepares the eventfd support, making the
> "magic" for eventfd more obvious.
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> lib/tevent/tevent.c | 21 +++++++++++++++------
> 1 file changed, 15 insertions(+), 6 deletions(-)
>
> diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c
> index b8178b2..d286850 100644
> --- a/lib/tevent/tevent.c
> +++ b/lib/tevent/tevent.c
> @@ -178,6 +178,8 @@ const char **tevent_backend_list(TALLOC_CTX *mem_ctx)
> return list;
> }
>
> +static void tevent_common_wakeup_fini(struct tevent_context *ev);
> +
> #ifdef HAVE_PTHREAD
>
> static pthread_mutex_t tevent_contexts_mutex = PTHREAD_MUTEX_INITIALIZER;
> @@ -297,12 +299,7 @@ int tevent_common_context_destructor(struct tevent_context *ev)
> tevent_abort(ev, "threaded contexts exist");
> }
>
> - if (ev->pipe_fde) {
> - talloc_free(ev->pipe_fde);
> - close(ev->pipe_fds[0]);
> - close(ev->pipe_fds[1]);
> - ev->pipe_fde = NULL;
> - }
> + tevent_common_wakeup_fini(ev);
>
> for (fd = ev->fd_events; fd; fd = fn) {
> fn = fd->next;
> @@ -896,3 +893,15 @@ int tevent_common_wakeup(struct tevent_context *ev)
>
> return 0;
> }
> +
> +static void tevent_common_wakeup_fini(struct tevent_context *ev)
> +{
> + if (ev->pipe_fde == NULL) {
> + return;
> + }
> +
> + TALLOC_FREE(ev->pipe_fde);
> +
> + close(ev->pipe_fds[0]);
> + close(ev->pipe_fds[1]);
> +}
> --
> 2.1.4
>
>
> From 0798ca4478efbb30ca795a99fcdf0267a3aa3680 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Fri, 12 Aug 2016 16:07:07 +0200
> Subject: [PATCH 09/23] tevent: Move a variable declaration into a while block
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> lib/tevent/tevent.c | 2 +-
> 1 file changed, 1 insertion(+), 1 deletion(-)
>
> diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c
> index d286850..e35eb5d 100644
> --- a/lib/tevent/tevent.c
> +++ b/lib/tevent/tevent.c
> @@ -840,9 +840,9 @@ static void wakeup_pipe_handler(struct tevent_context *ev,
> {
> ssize_t ret;
>
> - char c[16];
> /* its non-blocking, doesn't matter if we read too much */
> do {
> + char c[16];
> ret = read(fde->fd, c, sizeof(c));
> } while (ret == -1 && errno == EINTR);
> }
> --
> 2.1.4
>
>
> From ce1afb342a225a03fcecc9b050f84ae8fe6eb423 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Fri, 12 Aug 2016 16:32:33 +0200
> Subject: [PATCH 10/23] tevent: Use eventfd for signal/thread wakeup
>
> According to the manpage, eventfd is cheaper than a pipe. At least, we can save
> a file descriptor and space for it in struct tevent_context :-)
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> lib/tevent/tevent.c | 66 ++++++++++++++++++++++++++++++++------------
> lib/tevent/tevent_internal.h | 7 +++--
> 2 files changed, 53 insertions(+), 20 deletions(-)
>
> diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c
> index e35eb5d..331be0e 100644
> --- a/lib/tevent/tevent.c
> +++ b/lib/tevent/tevent.c
> @@ -66,6 +66,9 @@
> #include "tevent.h"
> #include "tevent_internal.h"
> #include "tevent_util.h"
> +#ifdef HAVE_EVENTFD
> +#include <sys/eventfd.h>
> +#endif
>
> static void tevent_abort(struct tevent_context *ev, const char *reason);
>
> @@ -767,7 +770,7 @@ done:
> bool tevent_common_have_events(struct tevent_context *ev)
> {
> if (ev->fd_events != NULL) {
> - if (ev->fd_events != ev->pipe_fde) {
> + if (ev->fd_events != ev->wakeup_fde) {
> return true;
> }
> if (ev->fd_events->next != NULL) {
> @@ -840,10 +843,14 @@ static void wakeup_pipe_handler(struct tevent_context *ev,
> {
> ssize_t ret;
>
> - /* its non-blocking, doesn't matter if we read too much */
> do {
> - char c[16];
> - ret = read(fde->fd, c, sizeof(c));
> + /*
> + * This is the boilerplate for eventfd, but it works
> + * for pipes too. And as we don't care about the data
> + * we read, we're fine.
> + */
> + uint64_t val;
> + ret = read(fde->fd, &val, sizeof(val));
> } while (ret == -1 && errno == EINTR);
> }
>
> @@ -855,23 +862,39 @@ int tevent_common_wakeup_init(struct tevent_context *ev)
> {
> int ret;
>
> - if (ev->pipe_fde != NULL) {
> + if (ev->wakeup_fde != NULL) {
> return 0;
> }
>
> - ret = pipe(ev->pipe_fds);
> +#ifdef HAVE_EVENTFD
> + ret = eventfd(0, EFD_NONBLOCK);
> if (ret == -1) {
> return errno;
> }
> - ev_set_blocking(ev->pipe_fds[0], false);
> - ev_set_blocking(ev->pipe_fds[1], false);
> + ev->wakeup_fd = ret;
> +#else
> + {
> + int pipe_fds[2];
> + ret = pipe(pipe_fds);
> + if (ret == -1) {
> + return errno;
> + }
> + ev->wakeup_fd = pipe_fds[0];
> + ev->wakeup_write_fd = pipe_fds[1];
> +
> + ev_set_blocking(ev->wakeup_fd, false);
> + ev_set_blocking(ev->wakeup_write_fd, false);
> + }
> +#endif
>
> - ev->pipe_fde = tevent_add_fd(ev, ev, ev->pipe_fds[0],
> + ev->wakeup_fde = tevent_add_fd(ev, ev, ev->wakeup_fd,
> TEVENT_FD_READ,
> wakeup_pipe_handler, NULL);
> - if (ev->pipe_fde == NULL) {
> - close(ev->pipe_fds[0]);
> - close(ev->pipe_fds[1]);
> + if (ev->wakeup_fde == NULL) {
> + close(ev->wakeup_fd);
> +#ifndef HAVE_EVENTFD
> + close(ev->wakeup_write_fd);
> +#endif
> return ENOMEM;
> }
>
> @@ -882,13 +905,18 @@ int tevent_common_wakeup(struct tevent_context *ev)
> {
> ssize_t ret;
>
> - if (ev->pipe_fds[1] == -1) {
> + if (ev->wakeup_fde == NULL) {
> return ENOTCONN;
> }
>
> do {
> +#ifdef HAVE_EVENTFD
> + uint64_t val = 1;
> + ret = write(ev->wakeup_fd, &val, sizeof(val));
> +#else
> char c = '\0';
> - ret = write(ev->pipe_fds[1], &c, 1);
> + ret = write(ev->wakeup_write_fd, &c, 1);
> +#endif
> } while ((ret == -1) && (errno == EINTR));
>
> return 0;
> @@ -896,12 +924,14 @@ int tevent_common_wakeup(struct tevent_context *ev)
>
> static void tevent_common_wakeup_fini(struct tevent_context *ev)
> {
> - if (ev->pipe_fde == NULL) {
> + if (ev->wakeup_fde == NULL) {
> return;
> }
>
> - TALLOC_FREE(ev->pipe_fde);
> + TALLOC_FREE(ev->wakeup_fde);
>
> - close(ev->pipe_fds[0]);
> - close(ev->pipe_fds[1]);
> + close(ev->wakeup_fd);
> +#ifndef HAVE_EVENTFD
> + close(ev->wakeup_write_fd);
> +#endif
> }
> diff --git a/lib/tevent/tevent_internal.h b/lib/tevent/tevent_internal.h
> index f17ce94..d960544 100644
> --- a/lib/tevent/tevent_internal.h
> +++ b/lib/tevent/tevent_internal.h
> @@ -276,8 +276,11 @@ struct tevent_context {
> void *additional_data;
>
> /* pipe hack used with signal handlers */
> - struct tevent_fd *pipe_fde;
> - int pipe_fds[2];
> + struct tevent_fd *wakeup_fde;
> + int wakeup_fd;
> +#ifndef HAVE_EVENT_FD
> + int wakeup_write_fd;
> +#endif
>
> /* debugging operations */
> struct tevent_debug_ops debug_ops;
> --
> 2.1.4
>
>
> From 7d0c5ab16f272fd451fe5ada6018cb690286bc57 Mon Sep 17 00:00:00 2001
> From: Stefan Metzmacher <metze at samba.org>
> Date: Wed, 17 Aug 2016 10:08:57 +0200
> Subject: [PATCH 11/23] tevent: version 0.9.30
>
> * add tevent_threaded_context_create() and tevent_threaded_schedule_immediate()
> They add a way to pass the thread result from a helper thread into
> the main event loop.
>
> Signed-off-by: Stefan Metzmacher <metze at samba.org>
> ---
> lib/tevent/ABI/tevent-0.9.29.sigs | 6 ---
> lib/tevent/ABI/tevent-0.9.30.sigs | 96 +++++++++++++++++++++++++++++++++++++++
> lib/tevent/wscript | 2 +-
> 3 files changed, 97 insertions(+), 7 deletions(-)
> create mode 100644 lib/tevent/ABI/tevent-0.9.30.sigs
>
> diff --git a/lib/tevent/ABI/tevent-0.9.29.sigs b/lib/tevent/ABI/tevent-0.9.29.sigs
> index 9b8bfa1..1357751 100644
> --- a/lib/tevent/ABI/tevent-0.9.29.sigs
> +++ b/lib/tevent/ABI/tevent-0.9.29.sigs
> @@ -16,7 +16,6 @@ _tevent_req_nomem: bool (const void *, struct tevent_req *, const char *)
> _tevent_req_notify_callback: void (struct tevent_req *, const char *)
> _tevent_req_oom: void (struct tevent_req *, const char *)
> _tevent_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
> -_tevent_threaded_schedule_immediate: void (struct tevent_threaded_context *, struct tevent_immediate *, tevent_immediate_handler_t, void *, const char *, const char *)
> tevent_backend_list: const char **(TALLOC_CTX *)
> tevent_cleanup_pending_signal_handlers: void (struct tevent_signal *)
> tevent_common_add_fd: struct tevent_fd *(struct tevent_context *, TALLOC_CTX *, int, uint16_t, tevent_fd_handler_t, void *, const char *, const char *)
> @@ -29,14 +28,10 @@ tevent_common_fd_destructor: int (struct tevent_fd *)
> tevent_common_fd_get_flags: uint16_t (struct tevent_fd *)
> tevent_common_fd_set_close_fn: void (struct tevent_fd *, tevent_fd_close_fn_t)
> tevent_common_fd_set_flags: void (struct tevent_fd *, uint16_t)
> -tevent_common_have_events: bool (struct tevent_context *)
> tevent_common_loop_immediate: bool (struct tevent_context *)
> tevent_common_loop_timer_delay: struct timeval (struct tevent_context *)
> tevent_common_loop_wait: int (struct tevent_context *, const char *)
> tevent_common_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
> -tevent_common_threaded_activate_immediate: void (struct tevent_context *)
> -tevent_common_wakeup: int (struct tevent_context *)
> -tevent_common_wakeup_init: int (struct tevent_context *)
> tevent_context_init: struct tevent_context *(TALLOC_CTX *)
> tevent_context_init_byname: struct tevent_context *(TALLOC_CTX *, const char *)
> tevent_context_init_ops: struct tevent_context *(TALLOC_CTX *, const struct tevent_ops *, void *)
> @@ -82,7 +77,6 @@ tevent_set_trace_callback: void (struct tevent_context *, tevent_trace_callback_
> tevent_signal_support: bool (struct tevent_context *)
> tevent_thread_proxy_create: struct tevent_thread_proxy *(struct tevent_context *)
> tevent_thread_proxy_schedule: void (struct tevent_thread_proxy *, struct tevent_immediate **, tevent_immediate_handler_t, void *)
> -tevent_threaded_context_create: struct tevent_threaded_context *(TALLOC_CTX *, struct tevent_context *)
> tevent_timeval_add: struct timeval (const struct timeval *, uint32_t, uint32_t)
> tevent_timeval_compare: int (const struct timeval *, const struct timeval *)
> tevent_timeval_current: struct timeval (void)
> diff --git a/lib/tevent/ABI/tevent-0.9.30.sigs b/lib/tevent/ABI/tevent-0.9.30.sigs
> new file mode 100644
> index 0000000..9b8bfa1
> --- /dev/null
> +++ b/lib/tevent/ABI/tevent-0.9.30.sigs
> @@ -0,0 +1,96 @@
> +_tevent_add_fd: struct tevent_fd *(struct tevent_context *, TALLOC_CTX *, int, uint16_t, tevent_fd_handler_t, void *, const char *, const char *)
> +_tevent_add_signal: struct tevent_signal *(struct tevent_context *, TALLOC_CTX *, int, int, tevent_signal_handler_t, void *, const char *, const char *)
> +_tevent_add_timer: struct tevent_timer *(struct tevent_context *, TALLOC_CTX *, struct timeval, tevent_timer_handler_t, void *, const char *, const char *)
> +_tevent_create_immediate: struct tevent_immediate *(TALLOC_CTX *, const char *)
> +_tevent_loop_once: int (struct tevent_context *, const char *)
> +_tevent_loop_until: int (struct tevent_context *, bool (*)(void *), void *, const char *)
> +_tevent_loop_wait: int (struct tevent_context *, const char *)
> +_tevent_queue_create: struct tevent_queue *(TALLOC_CTX *, const char *, const char *)
> +_tevent_req_callback_data: void *(struct tevent_req *)
> +_tevent_req_cancel: bool (struct tevent_req *, const char *)
> +_tevent_req_create: struct tevent_req *(TALLOC_CTX *, void *, size_t, const char *, const char *)
> +_tevent_req_data: void *(struct tevent_req *)
> +_tevent_req_done: void (struct tevent_req *, const char *)
> +_tevent_req_error: bool (struct tevent_req *, uint64_t, const char *)
> +_tevent_req_nomem: bool (const void *, struct tevent_req *, const char *)
> +_tevent_req_notify_callback: void (struct tevent_req *, const char *)
> +_tevent_req_oom: void (struct tevent_req *, const char *)
> +_tevent_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
> +_tevent_threaded_schedule_immediate: void (struct tevent_threaded_context *, struct tevent_immediate *, tevent_immediate_handler_t, void *, const char *, const char *)
> +tevent_backend_list: const char **(TALLOC_CTX *)
> +tevent_cleanup_pending_signal_handlers: void (struct tevent_signal *)
> +tevent_common_add_fd: struct tevent_fd *(struct tevent_context *, TALLOC_CTX *, int, uint16_t, tevent_fd_handler_t, void *, const char *, const char *)
> +tevent_common_add_signal: struct tevent_signal *(struct tevent_context *, TALLOC_CTX *, int, int, tevent_signal_handler_t, void *, const char *, const char *)
> +tevent_common_add_timer: struct tevent_timer *(struct tevent_context *, TALLOC_CTX *, struct timeval, tevent_timer_handler_t, void *, const char *, const char *)
> +tevent_common_add_timer_v2: struct tevent_timer *(struct tevent_context *, TALLOC_CTX *, struct timeval, tevent_timer_handler_t, void *, const char *, const char *)
> +tevent_common_check_signal: int (struct tevent_context *)
> +tevent_common_context_destructor: int (struct tevent_context *)
> +tevent_common_fd_destructor: int (struct tevent_fd *)
> +tevent_common_fd_get_flags: uint16_t (struct tevent_fd *)
> +tevent_common_fd_set_close_fn: void (struct tevent_fd *, tevent_fd_close_fn_t)
> +tevent_common_fd_set_flags: void (struct tevent_fd *, uint16_t)
> +tevent_common_have_events: bool (struct tevent_context *)
> +tevent_common_loop_immediate: bool (struct tevent_context *)
> +tevent_common_loop_timer_delay: struct timeval (struct tevent_context *)
> +tevent_common_loop_wait: int (struct tevent_context *, const char *)
> +tevent_common_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
> +tevent_common_threaded_activate_immediate: void (struct tevent_context *)
> +tevent_common_wakeup: int (struct tevent_context *)
> +tevent_common_wakeup_init: int (struct tevent_context *)
> +tevent_context_init: struct tevent_context *(TALLOC_CTX *)
> +tevent_context_init_byname: struct tevent_context *(TALLOC_CTX *, const char *)
> +tevent_context_init_ops: struct tevent_context *(TALLOC_CTX *, const struct tevent_ops *, void *)
> +tevent_debug: void (struct tevent_context *, enum tevent_debug_level, const char *, ...)
> +tevent_fd_get_flags: uint16_t (struct tevent_fd *)
> +tevent_fd_set_auto_close: void (struct tevent_fd *)
> +tevent_fd_set_close_fn: void (struct tevent_fd *, tevent_fd_close_fn_t)
> +tevent_fd_set_flags: void (struct tevent_fd *, uint16_t)
> +tevent_get_trace_callback: void (struct tevent_context *, tevent_trace_callback_t *, void *)
> +tevent_loop_allow_nesting: void (struct tevent_context *)
> +tevent_loop_set_nesting_hook: void (struct tevent_context *, tevent_nesting_hook, void *)
> +tevent_num_signals: size_t (void)
> +tevent_queue_add: bool (struct tevent_queue *, struct tevent_context *, struct tevent_req *, tevent_queue_trigger_fn_t, void *)
> +tevent_queue_add_entry: struct tevent_queue_entry *(struct tevent_queue *, struct tevent_context *, struct tevent_req *, tevent_queue_trigger_fn_t, void *)
> +tevent_queue_add_optimize_empty: struct tevent_queue_entry *(struct tevent_queue *, struct tevent_context *, struct tevent_req *, tevent_queue_trigger_fn_t, void *)
> +tevent_queue_length: size_t (struct tevent_queue *)
> +tevent_queue_running: bool (struct tevent_queue *)
> +tevent_queue_start: void (struct tevent_queue *)
> +tevent_queue_stop: void (struct tevent_queue *)
> +tevent_queue_wait_recv: bool (struct tevent_req *)
> +tevent_queue_wait_send: struct tevent_req *(TALLOC_CTX *, struct tevent_context *, struct tevent_queue *)
> +tevent_re_initialise: int (struct tevent_context *)
> +tevent_register_backend: bool (const char *, const struct tevent_ops *)
> +tevent_req_default_print: char *(struct tevent_req *, TALLOC_CTX *)
> +tevent_req_defer_callback: void (struct tevent_req *, struct tevent_context *)
> +tevent_req_is_error: bool (struct tevent_req *, enum tevent_req_state *, uint64_t *)
> +tevent_req_is_in_progress: bool (struct tevent_req *)
> +tevent_req_poll: bool (struct tevent_req *, struct tevent_context *)
> +tevent_req_post: struct tevent_req *(struct tevent_req *, struct tevent_context *)
> +tevent_req_print: char *(TALLOC_CTX *, struct tevent_req *)
> +tevent_req_received: void (struct tevent_req *)
> +tevent_req_set_callback: void (struct tevent_req *, tevent_req_fn, void *)
> +tevent_req_set_cancel_fn: void (struct tevent_req *, tevent_req_cancel_fn)
> +tevent_req_set_cleanup_fn: void (struct tevent_req *, tevent_req_cleanup_fn)
> +tevent_req_set_endtime: bool (struct tevent_req *, struct tevent_context *, struct timeval)
> +tevent_req_set_print_fn: void (struct tevent_req *, tevent_req_print_fn)
> +tevent_sa_info_queue_count: size_t (void)
> +tevent_set_abort_fn: void (void (*)(const char *))
> +tevent_set_debug: int (struct tevent_context *, void (*)(void *, enum tevent_debug_level, const char *, va_list), void *)
> +tevent_set_debug_stderr: int (struct tevent_context *)
> +tevent_set_default_backend: void (const char *)
> +tevent_set_trace_callback: void (struct tevent_context *, tevent_trace_callback_t, void *)
> +tevent_signal_support: bool (struct tevent_context *)
> +tevent_thread_proxy_create: struct tevent_thread_proxy *(struct tevent_context *)
> +tevent_thread_proxy_schedule: void (struct tevent_thread_proxy *, struct tevent_immediate **, tevent_immediate_handler_t, void *)
> +tevent_threaded_context_create: struct tevent_threaded_context *(TALLOC_CTX *, struct tevent_context *)
> +tevent_timeval_add: struct timeval (const struct timeval *, uint32_t, uint32_t)
> +tevent_timeval_compare: int (const struct timeval *, const struct timeval *)
> +tevent_timeval_current: struct timeval (void)
> +tevent_timeval_current_ofs: struct timeval (uint32_t, uint32_t)
> +tevent_timeval_is_zero: bool (const struct timeval *)
> +tevent_timeval_set: struct timeval (uint32_t, uint32_t)
> +tevent_timeval_until: struct timeval (const struct timeval *, const struct timeval *)
> +tevent_timeval_zero: struct timeval (void)
> +tevent_trace_point_callback: void (struct tevent_context *, enum tevent_trace_point)
> +tevent_wakeup_recv: bool (struct tevent_req *)
> +tevent_wakeup_send: struct tevent_req *(TALLOC_CTX *, struct tevent_context *, struct timeval)
> diff --git a/lib/tevent/wscript b/lib/tevent/wscript
> index 3e0b413..380316d 100755
> --- a/lib/tevent/wscript
> +++ b/lib/tevent/wscript
> @@ -1,7 +1,7 @@
> #!/usr/bin/env python
>
> APPNAME = 'tevent'
> -VERSION = '0.9.29'
> +VERSION = '0.9.30'
>
> blddir = 'bin'
>
> --
> 2.1.4
>
>
> From ad5309b5dfac1d6b62d2b1b79ea4c439b60432a9 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Sat, 30 Jul 2016 10:20:08 +0200
> Subject: [PATCH 12/23] lib: Add pthreadpool_pipe
>
> First step to separate the signalling mechanism from the core pthreadpool code.
> A later patch will add a pthreadpool that directly indicates job completion via
> tevent_threaded_schedule_immediate.
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> ---
> source3/lib/pthreadpool/pthreadpool_pipe.c | 84 ++++++++++++++++++++++++++++++
> source3/lib/pthreadpool/pthreadpool_pipe.h | 39 ++++++++++++++
> source3/lib/pthreadpool/wscript_build | 4 +-
> 3 files changed, 125 insertions(+), 2 deletions(-)
> create mode 100644 source3/lib/pthreadpool/pthreadpool_pipe.c
> create mode 100644 source3/lib/pthreadpool/pthreadpool_pipe.h
>
> diff --git a/source3/lib/pthreadpool/pthreadpool_pipe.c b/source3/lib/pthreadpool/pthreadpool_pipe.c
> new file mode 100644
> index 0000000..76bafa2
> --- /dev/null
> +++ b/source3/lib/pthreadpool/pthreadpool_pipe.c
> @@ -0,0 +1,84 @@
> +/*
> + * Unix SMB/CIFS implementation.
> + * threadpool implementation based on pthreads
> + * Copyright (C) Volker Lendecke 2009,2011
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 3 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> + */
> +
> +#include "replace.h"
> +#include "system/filesys.h"
> +#include "pthreadpool_pipe.h"
> +#include "pthreadpool.h"
> +
> +struct pthreadpool_pipe {
> + struct pthreadpool *pool;
> +};
> +
> +int pthreadpool_pipe_init(unsigned max_threads,
> + struct pthreadpool_pipe **presult)
> +{
> + struct pthreadpool_pipe *p;
> + int ret;
> +
> + p = malloc(sizeof(struct pthreadpool_pipe));
> + if (p == NULL) {
> + return ENOMEM;
> + }
> +
> + ret = pthreadpool_init(max_threads, &p->pool);
> + if (ret != 0) {
> + free(p);
> + return ret;
> + }
> +
> + *presult = p;
> + return 0;
> +}
> +
> +int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool)
> +{
> + int ret;
> +
> + ret = pthreadpool_destroy(pool->pool);
> + if (ret != 0) {
> + return ret;
> + }
> + free(pool);
> + return 0;
> +}
> +
> +int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
> + void (*fn)(void *private_data),
> + void *private_data)
> +{
> + int ret;
> + ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data);
> + return ret;
> +}
> +
> +int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
> +{
> + int fd;
> + fd = pthreadpool_signal_fd(pool->pool);
> + return fd;
> +}
> +
> +int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
> + unsigned num_jobids)
> +{
> + int ret;
> + ret = pthreadpool_finished_jobs(pool->pool, jobids, num_jobids);
> + return ret;
> +}
> diff --git a/source3/lib/pthreadpool/pthreadpool_pipe.h b/source3/lib/pthreadpool/pthreadpool_pipe.h
> new file mode 100644
> index 0000000..77516f7
> --- /dev/null
> +++ b/source3/lib/pthreadpool/pthreadpool_pipe.h
> @@ -0,0 +1,39 @@
> +/*
> + * Unix SMB/CIFS implementation.
> + * threadpool implementation based on pthreads
> + * Copyright (C) Volker Lendecke 2009,2011
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 3 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> + */
> +
> +#ifndef __PTHREADPOOL_PIPE_H__
> +#define __PTHREADPOOL_PIPE_H__
> +
> +struct pthreadpool_pipe;
> +
> +int pthreadpool_pipe_init(unsigned max_threads,
> + struct pthreadpool_pipe **presult);
> +
> +int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool);
> +
> +int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
> + void (*fn)(void *private_data),
> + void *private_data);
> +
> +int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool);
> +
> +int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
> + unsigned num_jobids);
> +
> +#endif
> diff --git a/source3/lib/pthreadpool/wscript_build b/source3/lib/pthreadpool/wscript_build
> index bdd5f53..aa02850 100644
> --- a/source3/lib/pthreadpool/wscript_build
> +++ b/source3/lib/pthreadpool/wscript_build
> @@ -2,11 +2,11 @@
>
> if bld.env.WITH_PTHREADPOOL:
> bld.SAMBA3_SUBSYSTEM('PTHREADPOOL',
> - source='pthreadpool.c',
> + source='pthreadpool.c pthreadpool_pipe.c',
> deps='pthread rt replace')
> else:
> bld.SAMBA3_SUBSYSTEM('PTHREADPOOL',
> - source='pthreadpool_sync.c',
> + source='pthreadpool_sync.c pthreadpool_pipe.c',
> deps='replace')
>
>
> --
> 2.1.4
>
>
> From 7cb698e7b614bc22fc9d32523a6fc4f86980e044 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Mon, 15 Aug 2016 13:57:20 +0200
> Subject: [PATCH 13/23] lib: Use pthreadpool_pipe instead of pthreadpool
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> source3/lib/asys/asys.c | 22 ++++-----
> source3/lib/fncall.c | 16 +++----
> source3/lib/pthreadpool/tests.c | 94 +++++++++++++++++++------------------
> source3/lib/unix_msg/unix_msg.c | 22 ++++-----
> source3/modules/vfs_aio_pthread.c | 22 ++++-----
> source3/torture/bench_pthreadpool.c | 22 ++++-----
> 6 files changed, 100 insertions(+), 98 deletions(-)
>
> diff --git a/source3/lib/asys/asys.c b/source3/lib/asys/asys.c
> index 670be01..594d470 100644
> --- a/source3/lib/asys/asys.c
> +++ b/source3/lib/asys/asys.c
> @@ -19,7 +19,7 @@
> #include "asys.h"
> #include <stdlib.h>
> #include <errno.h>
> -#include "../pthreadpool/pthreadpool.h"
> +#include "../pthreadpool/pthreadpool_pipe.h"
> #include "lib/util/time.h"
> #include "smbprofile.h"
>
> @@ -59,8 +59,8 @@ struct asys_job {
> };
>
> struct asys_context {
> - struct pthreadpool *pool;
> - int pthreadpool_fd;
> + struct pthreadpool_pipe *pool;
> + int pthreadpool_pipe_fd;
>
> unsigned num_jobs;
> struct asys_job **jobs;
> @@ -79,12 +79,12 @@ int asys_context_init(struct asys_context **pctx, unsigned max_parallel)
> if (ctx == NULL) {
> return ENOMEM;
> }
> - ret = pthreadpool_init(max_parallel, &ctx->pool);
> + ret = pthreadpool_pipe_init(max_parallel, &ctx->pool);
> if (ret != 0) {
> free(ctx);
> return ret;
> }
> - ctx->pthreadpool_fd = pthreadpool_signal_fd(ctx->pool);
> + ctx->pthreadpool_pipe_fd = pthreadpool_pipe_signal_fd(ctx->pool);
>
> *pctx = ctx;
> return 0;
> @@ -92,7 +92,7 @@ int asys_context_init(struct asys_context **pctx, unsigned max_parallel)
>
> int asys_signalfd(struct asys_context *ctx)
> {
> - return ctx->pthreadpool_fd;
> + return ctx->pthreadpool_pipe_fd;
> }
>
> int asys_context_destroy(struct asys_context *ctx)
> @@ -106,7 +106,7 @@ int asys_context_destroy(struct asys_context *ctx)
> }
> }
>
> - ret = pthreadpool_destroy(ctx->pool);
> + ret = pthreadpool_pipe_destroy(ctx->pool);
> if (ret != 0) {
> return ret;
> }
> @@ -179,7 +179,7 @@ int asys_pwrite(struct asys_context *ctx, int fildes, const void *buf,
> args->nbyte = nbyte;
> args->offset = offset;
>
> - ret = pthreadpool_add_job(ctx->pool, jobid, asys_pwrite_do, job);
> + ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_pwrite_do, job);
> if (ret != 0) {
> return ret;
> }
> @@ -224,7 +224,7 @@ int asys_pread(struct asys_context *ctx, int fildes, void *buf,
> args->nbyte = nbyte;
> args->offset = offset;
>
> - ret = pthreadpool_add_job(ctx->pool, jobid, asys_pread_do, job);
> + ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_pread_do, job);
> if (ret != 0) {
> return ret;
> }
> @@ -265,7 +265,7 @@ int asys_fsync(struct asys_context *ctx, int fildes, void *private_data)
> args = &job->args.fsync_args;
> args->fildes = fildes;
>
> - ret = pthreadpool_add_job(ctx->pool, jobid, asys_fsync_do, job);
> + ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_fsync_do, job);
> if (ret != 0) {
> return ret;
> }
> @@ -307,7 +307,7 @@ int asys_results(struct asys_context *ctx, struct asys_result *results,
> int jobids[num_results];
> int i, ret;
>
> - ret = pthreadpool_finished_jobs(ctx->pool, jobids, num_results);
> + ret = pthreadpool_pipe_finished_jobs(ctx->pool, jobids, num_results);
> if (ret <= 0) {
> return ret;
> }
> diff --git a/source3/lib/fncall.c b/source3/lib/fncall.c
> index 88304d6..0923c14 100644
> --- a/source3/lib/fncall.c
> +++ b/source3/lib/fncall.c
> @@ -20,7 +20,7 @@
> #include "includes.h"
> #include "../lib/util/tevent_unix.h"
>
> -#include "lib/pthreadpool/pthreadpool.h"
> +#include "lib/pthreadpool/pthreadpool_pipe.h"
>
> struct fncall_state {
> struct fncall_context *ctx;
> @@ -32,7 +32,7 @@ struct fncall_state {
> };
>
> struct fncall_context {
> - struct pthreadpool *pool;
> + struct pthreadpool_pipe *pool;
> int next_job_id;
> int sig_fd;
> struct tevent_req **pending;
> @@ -61,7 +61,7 @@ static int fncall_context_destructor(struct fncall_context *ctx)
> fncall_handler(NULL, NULL, TEVENT_FD_READ, ctx);
> }
>
> - pthreadpool_destroy(ctx->pool);
> + pthreadpool_pipe_destroy(ctx->pool);
> ctx->pool = NULL;
>
> return 0;
> @@ -78,14 +78,14 @@ struct fncall_context *fncall_context_init(TALLOC_CTX *mem_ctx,
> return NULL;
> }
>
> - ret = pthreadpool_init(max_threads, &ctx->pool);
> + ret = pthreadpool_pipe_init(max_threads, &ctx->pool);
> if (ret != 0) {
> TALLOC_FREE(ctx);
> return NULL;
> }
> talloc_set_destructor(ctx, fncall_context_destructor);
>
> - ctx->sig_fd = pthreadpool_signal_fd(ctx->pool);
> + ctx->sig_fd = pthreadpool_pipe_signal_fd(ctx->pool);
> if (ctx->sig_fd == -1) {
> TALLOC_FREE(ctx);
> return NULL;
> @@ -266,8 +266,8 @@ struct tevent_req *fncall_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
> state->private_parent = talloc_parent(private_data);
> state->job_private = talloc_move(state, &private_data);
>
> - ret = pthreadpool_add_job(state->ctx->pool, state->job_id, fn,
> - state->job_private);
> + ret = pthreadpool_pipe_add_job(state->ctx->pool, state->job_id, fn,
> + state->job_private);
> if (ret == -1) {
> tevent_req_error(req, errno);
> return tevent_req_post(req, ev);
> @@ -287,7 +287,7 @@ static void fncall_handler(struct tevent_context *ev, struct tevent_fd *fde,
> int i, num_pending;
> int job_id;
>
> - if (pthreadpool_finished_jobs(ctx->pool, &job_id, 1) < 0) {
> + if (pthreadpool_pipe_finished_jobs(ctx->pool, &job_id, 1) < 0) {
> return;
> }
>
> diff --git a/source3/lib/pthreadpool/tests.c b/source3/lib/pthreadpool/tests.c
> index 8474712..0b48b41 100644
> --- a/source3/lib/pthreadpool/tests.c
> +++ b/source3/lib/pthreadpool/tests.c
> @@ -7,22 +7,22 @@
> #include <unistd.h>
> #include <sys/types.h>
> #include <sys/wait.h>
> -#include "pthreadpool.h"
> +#include "pthreadpool_pipe.h"
>
> static int test_init(void)
> {
> - struct pthreadpool *p;
> + struct pthreadpool_pipe *p;
> int ret;
>
> - ret = pthreadpool_init(1, &p);
> + ret = pthreadpool_pipe_init(1, &p);
> if (ret != 0) {
> - fprintf(stderr, "pthreadpool_init failed: %s\n",
> + fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
> strerror(ret));
> return -1;
> }
> - ret = pthreadpool_destroy(p);
> + ret = pthreadpool_pipe_destroy(p);
> if (ret != 0) {
> - fprintf(stderr, "pthreadpool_init failed: %s\n",
> + fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
> strerror(ret));
> return -1;
> }
> @@ -43,7 +43,7 @@ static void test_sleep(void *ptr)
> static int test_jobs(int num_threads, int num_jobs)
> {
> char *finished;
> - struct pthreadpool *p;
> + struct pthreadpool_pipe *p;
> int timeout = 1;
> int i, ret;
>
> @@ -53,25 +53,25 @@ static int test_jobs(int num_threads, int num_jobs)
> return -1;
> }
>
> - ret = pthreadpool_init(num_threads, &p);
> + ret = pthreadpool_pipe_init(num_threads, &p);
> if (ret != 0) {
> - fprintf(stderr, "pthreadpool_init failed: %s\n",
> + fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
> strerror(ret));
> return -1;
> }
>
> for (i=0; i<num_jobs; i++) {
> - ret = pthreadpool_add_job(p, i, test_sleep, &timeout);
> + ret = pthreadpool_pipe_add_job(p, i, test_sleep, &timeout);
> if (ret != 0) {
> - fprintf(stderr, "pthreadpool_add_job failed: %s\n",
> - strerror(ret));
> + fprintf(stderr, "pthreadpool_pipe_add_job failed: "
> + "%s\n", strerror(ret));
> return -1;
> }
> }
>
> for (i=0; i<num_jobs; i++) {
> int jobid = -1;
> - ret = pthreadpool_finished_jobs(p, &jobid, 1);
> + ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1);
> if ((ret != 1) || (jobid >= num_jobs)) {
> fprintf(stderr, "invalid job number %d\n", jobid);
> return -1;
> @@ -87,9 +87,9 @@ static int test_jobs(int num_threads, int num_jobs)
> }
> }
>
> - ret = pthreadpool_destroy(p);
> + ret = pthreadpool_pipe_destroy(p);
> if (ret != 0) {
> - fprintf(stderr, "pthreadpool_destroy failed: %s\n",
> + fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
> strerror(ret));
> return -1;
> }
> @@ -100,37 +100,37 @@ static int test_jobs(int num_threads, int num_jobs)
>
> static int test_busydestroy(void)
> {
> - struct pthreadpool *p;
> + struct pthreadpool_pipe *p;
> int timeout = 50;
> struct pollfd pfd;
> int ret;
>
> - ret = pthreadpool_init(1, &p);
> + ret = pthreadpool_pipe_init(1, &p);
> if (ret != 0) {
> - fprintf(stderr, "pthreadpool_init failed: %s\n",
> + fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
> strerror(ret));
> return -1;
> }
> - ret = pthreadpool_add_job(p, 1, test_sleep, &timeout);
> + ret = pthreadpool_pipe_add_job(p, 1, test_sleep, &timeout);
> if (ret != 0) {
> - fprintf(stderr, "pthreadpool_add_job failed: %s\n",
> + fprintf(stderr, "pthreadpool_pipe_add_job failed: %s\n",
> strerror(ret));
> return -1;
> }
> - ret = pthreadpool_destroy(p);
> + ret = pthreadpool_pipe_destroy(p);
> if (ret != EBUSY) {
> fprintf(stderr, "Could destroy a busy pool\n");
> return -1;
> }
>
> - pfd.fd = pthreadpool_signal_fd(p);
> + pfd.fd = pthreadpool_pipe_signal_fd(p);
> pfd.events = POLLIN|POLLERR;
>
> poll(&pfd, 1, -1);
>
> - ret = pthreadpool_destroy(p);
> + ret = pthreadpool_pipe_destroy(p);
> if (ret != 0) {
> - fprintf(stderr, "pthreadpool_destroy failed: %s\n",
> + fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
> strerror(ret));
> return -1;
> }
> @@ -139,7 +139,7 @@ static int test_busydestroy(void)
>
> struct threaded_state {
> pthread_t tid;
> - struct pthreadpool *p;
> + struct pthreadpool_pipe *p;
> int start_job;
> int num_jobs;
> int timeout;
> @@ -151,11 +151,12 @@ static void *test_threaded_worker(void *p)
> int i;
>
> for (i=0; i<state->num_jobs; i++) {
> - int ret = pthreadpool_add_job(state->p, state->start_job + i,
> - test_sleep, &state->timeout);
> + int ret = pthreadpool_pipe_add_job(
> + state->p, state->start_job + i,
> + test_sleep, &state->timeout);
> if (ret != 0) {
> - fprintf(stderr, "pthreadpool_add_job failed: %s\n",
> - strerror(ret));
> + fprintf(stderr, "pthreadpool_pipe_add_job failed: "
> + "%s\n", strerror(ret));
> return NULL;
> }
> }
> @@ -165,7 +166,7 @@ static void *test_threaded_worker(void *p)
> static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
> int num_jobs)
> {
> - struct pthreadpool **pools;
> + struct pthreadpool_pipe **pools;
> struct threaded_state *states;
> struct threaded_state *state;
> struct pollfd *pfds;
> @@ -186,7 +187,7 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
> return -1;
> }
>
> - pools = calloc(num_pools, sizeof(struct pthreadpool *));
> + pools = calloc(num_pools, sizeof(struct pthreadpool_pipe *));
> if (pools == NULL) {
> fprintf(stderr, "calloc failed\n");
> return -1;
> @@ -199,13 +200,13 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
> }
>
> for (i=0; i<num_pools; i++) {
> - ret = pthreadpool_init(poolsize, &pools[i]);
> + ret = pthreadpool_pipe_init(poolsize, &pools[i]);
> if (ret != 0) {
> - fprintf(stderr, "pthreadpool_init failed: %s\n",
> + fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
> strerror(ret));
> return -1;
> }
> - pfds[i].fd = pthreadpool_signal_fd(pools[i]);
> + pfds[i].fd = pthreadpool_pipe_signal_fd(pools[i]);
> pfds[i].events = POLLIN|POLLHUP;
> }
>
> @@ -241,10 +242,10 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
> }
> if (child == 0) {
> for (i=0; i<num_pools; i++) {
> - ret = pthreadpool_destroy(pools[i]);
> + ret = pthreadpool_pipe_destroy(pools[i]);
> if (ret != 0) {
> - fprintf(stderr, "pthreadpool_destroy failed: "
> - "%s\n", strerror(ret));
> + fprintf(stderr, "pthreadpool_pipe_destroy "
> + "failed: %s\n", strerror(ret));
> exit(1);
> }
> }
> @@ -284,7 +285,8 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
> continue;
> }
>
> - ret = pthreadpool_finished_jobs(pools[j], &jobid, 1);
> + ret = pthreadpool_pipe_finished_jobs(
> + pools[j], &jobid, 1);
> if ((ret != 1) || (jobid >= num_jobs * num_threads)) {
> fprintf(stderr, "invalid job number %d\n",
> jobid);
> @@ -304,10 +306,10 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
> }
>
> for (i=0; i<num_pools; i++) {
> - ret = pthreadpool_destroy(pools[i]);
> + ret = pthreadpool_pipe_destroy(pools[i]);
> if (ret != 0) {
> - fprintf(stderr, "pthreadpool_destroy failed: %s\n",
> - strerror(ret));
> + fprintf(stderr, "pthreadpool_pipe_destroy failed: "
> + "%s\n", strerror(ret));
> return -1;
> }
> }
> @@ -322,19 +324,19 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
>
> static int test_fork(void)
> {
> - struct pthreadpool *p;
> + struct pthreadpool_pipe *p;
> pid_t child, waited;
> int status, ret;
>
> - ret = pthreadpool_init(1, &p);
> + ret = pthreadpool_pipe_init(1, &p);
> if (ret != 0) {
> - fprintf(stderr, "pthreadpool_init failed: %s\n",
> + fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
> strerror(ret));
> return -1;
> }
> - ret = pthreadpool_destroy(p);
> + ret = pthreadpool_pipe_destroy(p);
> if (ret != 0) {
> - fprintf(stderr, "pthreadpool_destroy failed: %s\n",
> + fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
> strerror(ret));
> return -1;
> }
> diff --git a/source3/lib/unix_msg/unix_msg.c b/source3/lib/unix_msg/unix_msg.c
> index aed1f75..5fac68b 100644
> --- a/source3/lib/unix_msg/unix_msg.c
> +++ b/source3/lib/unix_msg/unix_msg.c
> @@ -22,7 +22,7 @@
> #include "system/time.h"
> #include "system/network.h"
> #include "lib/util/dlinklist.h"
> -#include "pthreadpool/pthreadpool.h"
> +#include "pthreadpool/pthreadpool_pipe.h"
> #include "lib/util/iov_buf.h"
> #include "lib/util/msghdr.h"
> #include <fcntl.h>
> @@ -69,7 +69,7 @@ struct unix_dgram_ctx {
> struct poll_watch *sock_read_watch;
> struct unix_dgram_send_queue *send_queues;
>
> - struct pthreadpool *send_pool;
> + struct pthreadpool_pipe *send_pool;
> struct poll_watch *pool_read_watch;
>
> uint8_t *recv_buf;
> @@ -341,18 +341,18 @@ static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
> return 0;
> }
>
> - ret = pthreadpool_init(0, &ctx->send_pool);
> + ret = pthreadpool_pipe_init(0, &ctx->send_pool);
> if (ret != 0) {
> return ret;
> }
>
> - signalfd = pthreadpool_signal_fd(ctx->send_pool);
> + signalfd = pthreadpool_pipe_signal_fd(ctx->send_pool);
>
> ctx->pool_read_watch = ctx->ev_funcs->watch_new(
> ctx->ev_funcs, signalfd, POLLIN,
> unix_dgram_job_finished, ctx);
> if (ctx->pool_read_watch == NULL) {
> - pthreadpool_destroy(ctx->send_pool);
> + pthreadpool_pipe_destroy(ctx->send_pool);
> ctx->send_pool = NULL;
> return ENOMEM;
> }
> @@ -525,7 +525,7 @@ static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
> struct unix_dgram_msg *msg;
> int ret, job;
>
> - ret = pthreadpool_finished_jobs(ctx->send_pool, &job, 1);
> + ret = pthreadpool_pipe_finished_jobs(ctx->send_pool, &job, 1);
> if (ret != 1) {
> return;
> }
> @@ -547,8 +547,8 @@ static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
> free(msg);
>
> if (q->msgs != NULL) {
> - ret = pthreadpool_add_job(ctx->send_pool, q->sock,
> - unix_dgram_send_job, q->msgs);
> + ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
> + unix_dgram_send_job, q->msgs);
> if (ret == 0) {
> return;
> }
> @@ -654,8 +654,8 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
> unix_dgram_send_queue_free(q);
> return ret;
> }
> - ret = pthreadpool_add_job(ctx->send_pool, q->sock,
> - unix_dgram_send_job, q->msgs);
> + ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
> + unix_dgram_send_job, q->msgs);
> if (ret != 0) {
> unix_dgram_send_queue_free(q);
> return ret;
> @@ -675,7 +675,7 @@ static int unix_dgram_free(struct unix_dgram_ctx *ctx)
> }
>
> if (ctx->send_pool != NULL) {
> - int ret = pthreadpool_destroy(ctx->send_pool);
> + int ret = pthreadpool_pipe_destroy(ctx->send_pool);
> if (ret != 0) {
> return ret;
> }
> diff --git a/source3/modules/vfs_aio_pthread.c b/source3/modules/vfs_aio_pthread.c
> index 7037b63..6edf250 100644
> --- a/source3/modules/vfs_aio_pthread.c
> +++ b/source3/modules/vfs_aio_pthread.c
> @@ -26,7 +26,7 @@
> #include "system/shmem.h"
> #include "smbd/smbd.h"
> #include "smbd/globals.h"
> -#include "lib/pthreadpool/pthreadpool.h"
> +#include "lib/pthreadpool/pthreadpool_pipe.h"
> #ifdef HAVE_LINUX_FALLOC_H
> #include <linux/falloc.h>
> #endif
> @@ -38,7 +38,7 @@
> ***********************************************************************/
>
> static bool init_aio_threadpool(struct tevent_context *ev_ctx,
> - struct pthreadpool **pp_pool,
> + struct pthreadpool_pipe **pp_pool,
> void (*completion_fn)(struct tevent_context *,
> struct tevent_fd *,
> uint16_t,
> @@ -51,19 +51,19 @@ static bool init_aio_threadpool(struct tevent_context *ev_ctx,
> return true;
> }
>
> - ret = pthreadpool_init(lp_aio_max_threads(), pp_pool);
> + ret = pthreadpool_pipe_init(lp_aio_max_threads(), pp_pool);
> if (ret) {
> errno = ret;
> return false;
> }
> sock_event = tevent_add_fd(ev_ctx,
> NULL,
> - pthreadpool_signal_fd(*pp_pool),
> + pthreadpool_pipe_signal_fd(*pp_pool),
> TEVENT_FD_READ,
> completion_fn,
> NULL);
> if (sock_event == NULL) {
> - pthreadpool_destroy(*pp_pool);
> + pthreadpool_pipe_destroy(*pp_pool);
> *pp_pool = NULL;
> return false;
> }
> @@ -87,7 +87,7 @@ static bool init_aio_threadpool(struct tevent_context *ev_ctx,
> * process, as is the current jobid.
> */
>
> -static struct pthreadpool *open_pool;
> +static struct pthreadpool_pipe *open_pool;
> static int aio_pthread_open_jobid;
>
> struct aio_open_private_data {
> @@ -167,7 +167,7 @@ static void aio_open_handle_completion(struct tevent_context *event_ctx,
> return;
> }
>
> - ret = pthreadpool_finished_jobs(open_pool, &jobid, 1);
> + ret = pthreadpool_pipe_finished_jobs(open_pool, &jobid, 1);
> if (ret != 1) {
> smb_panic("aio_open_handle_completion");
> /* notreached. */
> @@ -368,10 +368,10 @@ static int open_async(const files_struct *fsp,
> return -1;
> }
>
> - ret = pthreadpool_add_job(open_pool,
> - opd->jobid,
> - aio_open_worker,
> - (void *)opd);
> + ret = pthreadpool_pipe_add_job(open_pool,
> + opd->jobid,
> + aio_open_worker,
> + (void *)opd);
> if (ret) {
> errno = ret;
> return -1;
> diff --git a/source3/torture/bench_pthreadpool.c b/source3/torture/bench_pthreadpool.c
> index 247063d..82a84cf 100644
> --- a/source3/torture/bench_pthreadpool.c
> +++ b/source3/torture/bench_pthreadpool.c
> @@ -19,7 +19,7 @@
> */
>
> #include "includes.h"
> -#include "lib/pthreadpool/pthreadpool.h"
> +#include "lib/pthreadpool/pthreadpool_pipe.h"
> #include "proto.h"
>
> extern int torture_numops;
> @@ -31,12 +31,12 @@ static void null_job(void *private_data)
>
> bool run_bench_pthreadpool(int dummy)
> {
> - struct pthreadpool *pool;
> + struct pthreadpool_pipe *pool;
> int i, ret;
>
> - ret = pthreadpool_init(1, &pool);
> + ret = pthreadpool_pipe_init(1, &pool);
> if (ret != 0) {
> - d_fprintf(stderr, "pthreadpool_init failed: %s\n",
> + d_fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
> strerror(ret));
> return false;
> }
> @@ -44,21 +44,21 @@ bool run_bench_pthreadpool(int dummy)
> for (i=0; i<torture_numops; i++) {
> int jobid;
>
> - ret = pthreadpool_add_job(pool, 0, null_job, NULL);
> + ret = pthreadpool_pipe_add_job(pool, 0, null_job, NULL);
> if (ret != 0) {
> - d_fprintf(stderr, "pthreadpool_add_job failed: %s\n",
> - strerror(ret));
> + d_fprintf(stderr, "pthreadpool_pipe_add_job "
> + "failed: %s\n", strerror(ret));
> break;
> }
> - ret = pthreadpool_finished_jobs(pool, &jobid, 1);
> + ret = pthreadpool_pipe_finished_jobs(pool, &jobid, 1);
> if (ret < 0) {
> - d_fprintf(stderr, "pthreadpool_finished_job failed: %s\n",
> - strerror(-ret));
> + d_fprintf(stderr, "pthreadpool_pipe_finished_job "
> + "failed: %s\n", strerror(-ret));
> break;
> }
> }
>
> - pthreadpool_destroy(pool);
> + pthreadpool_pipe_destroy(pool);
>
> return (ret == 1);
> }
> --
> 2.1.4
>
>
> From 1dec4e67d3c6d11da3353769aad4d0210bd6f331 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Mon, 15 Aug 2016 13:59:12 +0200
> Subject: [PATCH 14/23] lib: Move pipe signalling to pthreadpool_pipe.c
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> source3/lib/pthreadpool/pthreadpool.c | 82 +++-------------
> source3/lib/pthreadpool/pthreadpool.h | 34 +------
> source3/lib/pthreadpool/pthreadpool_pipe.c | 116 ++++++++++++++++++++---
> source3/lib/pthreadpool/pthreadpool_sync.c | 144 +++--------------------------
> 4 files changed, 134 insertions(+), 242 deletions(-)
>
> diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c
> index b071e53..4c2858a 100644
> --- a/source3/lib/pthreadpool/pthreadpool.c
> +++ b/source3/lib/pthreadpool/pthreadpool.c
> @@ -19,7 +19,6 @@
>
> #include "replace.h"
> #include "system/time.h"
> -#include "system/filesys.h"
> #include "system/wait.h"
> #include "system/threads.h"
> #include "pthreadpool.h"
> @@ -58,9 +57,10 @@ struct pthreadpool {
> size_t num_jobs;
>
> /*
> - * pipe for signalling
> + * Indicate job completion
> */
> - int sig_pipe[2];
> + int (*signal_fn)(int jobid, void *private_data);
> + void *signal_private_data;
>
> /*
> * indicator to worker threads that they should shut down
> @@ -99,7 +99,9 @@ static void pthreadpool_prep_atfork(void);
> * Initialize a thread pool
> */
>
> -int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
> +int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
> + int (*signal_fn)(int jobid, void *private_data),
> + void *signal_private_data)
> {
> struct pthreadpool *pool;
> int ret;
> @@ -108,6 +110,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
> if (pool == NULL) {
> return ENOMEM;
> }
> + pool->signal_fn = signal_fn;
> + pool->signal_private_data = signal_private_data;
>
> pool->jobs_array_len = 4;
> pool->jobs = calloc(
> @@ -120,18 +124,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
>
> pool->head = pool->num_jobs = 0;
>
> - ret = pipe(pool->sig_pipe);
> - if (ret == -1) {
> - int err = errno;
> - free(pool->jobs);
> - free(pool);
> - return err;
> - }
> -
> ret = pthread_mutex_init(&pool->mutex, NULL);
> if (ret != 0) {
> - close(pool->sig_pipe[0]);
> - close(pool->sig_pipe[1]);
> free(pool->jobs);
> free(pool);
> return ret;
> @@ -140,8 +134,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
> ret = pthread_cond_init(&pool->condvar, NULL);
> if (ret != 0) {
> pthread_mutex_destroy(&pool->mutex);
> - close(pool->sig_pipe[0]);
> - close(pool->sig_pipe[1]);
> free(pool->jobs);
> free(pool);
> return ret;
> @@ -158,8 +150,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
> if (ret != 0) {
> pthread_cond_destroy(&pool->condvar);
> pthread_mutex_destroy(&pool->mutex);
> - close(pool->sig_pipe[0]);
> - close(pool->sig_pipe[1]);
> free(pool->jobs);
> free(pool);
> return ret;
> @@ -218,12 +208,6 @@ static void pthreadpool_child(void)
> pool != NULL;
> pool = DLIST_PREV(pool)) {
>
> - close(pool->sig_pipe[0]);
> - close(pool->sig_pipe[1]);
> -
> - ret = pipe(pool->sig_pipe);
> - assert(ret == 0);
> -
> pool->num_threads = 0;
>
> pool->num_exited = 0;
> @@ -249,16 +233,6 @@ static void pthreadpool_prep_atfork(void)
> }
>
> /*
> - * Return the file descriptor which becomes readable when a job has
> - * finished
> - */
> -
> -int pthreadpool_signal_fd(struct pthreadpool *pool)
> -{
> - return pool->sig_pipe[0];
> -}
> -
> -/*
> * Do a pthread_join() on all children that have exited, pool->mutex must be
> * locked
> */
> @@ -287,32 +261,6 @@ static void pthreadpool_join_children(struct pthreadpool *pool)
> }
>
> /*
> - * Fetch a finished job number from the signal pipe
> - */
> -
> -int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
> - unsigned num_jobids)
> -{
> - ssize_t to_read, nread;
> -
> - nread = -1;
> - errno = EINTR;
> -
> - to_read = sizeof(int) * num_jobids;
> -
> - while ((nread == -1) && (errno == EINTR)) {
> - nread = read(pool->sig_pipe[0], jobids, to_read);
> - }
> - if (nread == -1) {
> - return -errno;
> - }
> - if ((nread % sizeof(int)) != 0) {
> - return -EINVAL;
> - }
> - return nread / sizeof(int);
> -}
> -
> -/*
> * Destroy a thread pool, finishing all threads working for it
> */
>
> @@ -390,12 +338,6 @@ int pthreadpool_destroy(struct pthreadpool *pool)
> ret = pthread_mutex_unlock(&pthreadpools_mutex);
> assert(ret == 0);
>
> - close(pool->sig_pipe[0]);
> - pool->sig_pipe[0] = -1;
> -
> - close(pool->sig_pipe[1]);
> - pool->sig_pipe[1] = -1;
> -
> free(pool->exited);
> free(pool->jobs);
> free(pool);
> @@ -527,8 +469,7 @@ static void *pthreadpool_server(void *arg)
> }
>
> if (pthreadpool_get_job(pool, &job)) {
> - ssize_t written;
> - int sig_pipe = pool->sig_pipe[1];
> + int ret;
>
> /*
> * Do the work with the mutex unlocked
> @@ -542,8 +483,9 @@ static void *pthreadpool_server(void *arg)
> res = pthread_mutex_lock(&pool->mutex);
> assert(res == 0);
>
> - written = write(sig_pipe, &job.id, sizeof(job.id));
> - if (written != sizeof(int)) {
> + ret = pool->signal_fn(job.id,
> + pool->signal_private_data);
> + if (ret != 0) {
> pthreadpool_server_exit(pool);
> pthread_mutex_unlock(&pool->mutex);
> return NULL;
> diff --git a/source3/lib/pthreadpool/pthreadpool.h b/source3/lib/pthreadpool/pthreadpool.h
> index adb825a..0b8d6e5 100644
> --- a/source3/lib/pthreadpool/pthreadpool.h
> +++ b/source3/lib/pthreadpool/pthreadpool.h
> @@ -43,7 +43,9 @@ struct pthreadpool;
> * max_threads=0 means unlimited parallelism. The caller has to take
> * care to not overload the system.
> */
> -int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult);
> +int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
> + int (*signal_fn)(int jobid, void *private_data),
> + void *signal_private_data);
>
> /**
> * @brief Destroy a pthreadpool
> @@ -60,8 +62,8 @@ int pthreadpool_destroy(struct pthreadpool *pool);
> * @brief Add a job to a pthreadpool
> *
> * This adds a job to a pthreadpool. The job can be identified by
> - * job_id. This integer will be returned from
> - * pthreadpool_finished_jobs() then the job is completed.
> + * job_id. This integer will be passed to signal_fn() when the
> + * job is completed.
> *
> * @param[in] pool The pool to run the job on
> * @param[in] job_id A custom identifier
> @@ -72,30 +74,4 @@ int pthreadpool_destroy(struct pthreadpool *pool);
> int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
> void (*fn)(void *private_data), void *private_data);
>
> -/**
> - * @brief Get the signalling fd from a pthreadpool
> - *
> - * Completion of a job is indicated by readability of the fd returned
> - * by pthreadpool_signal_fd().
> - *
> - * @param[in] pool The pool in question
> - * @return The fd to listen on for readability
> - */
> -int pthreadpool_signal_fd(struct pthreadpool *pool);
> -
> -/**
> - * @brief Get the job_ids of finished jobs
> - *
> - * This blocks until a job has finished unless the fd returned by
> - * pthreadpool_signal_fd() is readable.
> - *
> - * @param[in] pool The pool to query for finished jobs
> - * @param[out] jobids The job_ids of the finished job
> - * @param[int] num_jobids The job_ids array size
> - * @return success: >=0, number of finished jobs
> - * failure: -errno
> - */
> -int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
> - unsigned num_jobids);
> -
> #endif
> diff --git a/source3/lib/pthreadpool/pthreadpool_pipe.c b/source3/lib/pthreadpool/pthreadpool_pipe.c
> index 76bafa2..3eaf5e3 100644
> --- a/source3/lib/pthreadpool/pthreadpool_pipe.c
> +++ b/source3/lib/pthreadpool/pthreadpool_pipe.c
> @@ -24,26 +24,57 @@
>
> struct pthreadpool_pipe {
> struct pthreadpool *pool;
> + pid_t pid;
> + int pipe_fds[2];
> };
>
> +static int pthreadpool_pipe_signal(int jobid, void *private_data);
> +
> int pthreadpool_pipe_init(unsigned max_threads,
> struct pthreadpool_pipe **presult)
> {
> - struct pthreadpool_pipe *p;
> + struct pthreadpool_pipe *pool;
> int ret;
>
> - p = malloc(sizeof(struct pthreadpool_pipe));
> - if (p == NULL) {
> + pool = malloc(sizeof(struct pthreadpool_pipe));
> + if (pool == NULL) {
> return ENOMEM;
> }
> + pool->pid = getpid();
> +
> + ret = pipe(pool->pipe_fds);
> + if (ret == -1) {
> + int err = errno;
> + free(pool);
> + return err;
> + }
>
> - ret = pthreadpool_init(max_threads, &p->pool);
> + ret = pthreadpool_init(max_threads, &pool->pool,
> + pthreadpool_pipe_signal, pool);
> if (ret != 0) {
> - free(p);
> + close(pool->pipe_fds[0]);
> + close(pool->pipe_fds[1]);
> + free(pool);
> return ret;
> }
>
> - *presult = p;
> + *presult = pool;
> + return 0;
> +}
> +
> +static int pthreadpool_pipe_signal(int jobid, void *private_data)
> +{
> + struct pthreadpool_pipe *pool = private_data;
> + ssize_t written;
> +
> + do {
> + written = write(pool->pipe_fds[1], &jobid, sizeof(jobid));
> + } while ((written == -1) && (errno == EINTR));
> +
> + if (written != sizeof(jobid)) {
> + return errno;
> + }
> +
> return 0;
> }
>
> @@ -55,30 +86,91 @@ int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool)
> if (ret != 0) {
> return ret;
> }
> +
> + close(pool->pipe_fds[0]);
> + pool->pipe_fds[0] = -1;
> +
> + close(pool->pipe_fds[1]);
> + pool->pipe_fds[1] = -1;
> +
> free(pool);
> return 0;
> }
>
> +static int pthreadpool_pipe_reinit(struct pthreadpool_pipe *pool)
> +{
> + pid_t pid = getpid();
> + int signal_fd;
> + int ret;
> +
> + if (pid == pool->pid) {
> + return 0;
> + }
> +
> + signal_fd = pool->pipe_fds[0];
> +
> + close(pool->pipe_fds[0]);
> + pool->pipe_fds[0] = -1;
> +
> + close(pool->pipe_fds[1]);
> + pool->pipe_fds[1] = -1;
> +
> + ret = pipe(pool->pipe_fds);
> + if (ret != 0) {
> + return errno;
> + }
> +
> + ret = dup2(pool->pipe_fds[0], signal_fd);
> + if (ret != 0) {
> + return errno;
> + }
> +
> + pool->pipe_fds[0] = signal_fd;
> +
> + return 0;
> +}
> +
> int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
> void (*fn)(void *private_data),
> void *private_data)
> {
> int ret;
> +
> + ret = pthreadpool_pipe_reinit(pool);
> + if (ret != 0) {
> + return ret;
> + }
> +
> ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data);
> return ret;
> }
>
> int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
> {
> - int fd;
> - fd = pthreadpool_signal_fd(pool->pool);
> - return fd;
> + return pool->pipe_fds[0];
> }
>
> int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
> unsigned num_jobids)
> {
> - int ret;
> - ret = pthreadpool_finished_jobs(pool->pool, jobids, num_jobids);
> - return ret;
> + ssize_t to_read, nread;
> + pid_t pid = getpid();
> +
> + if (pool->pid != pid) {
> + return EINVAL;
> + }
> +
> + to_read = sizeof(int) * num_jobids;
> +
> + do {
> + nread = read(pool->pipe_fds[0], jobids, to_read);
> + } while ((nread == -1) && (errno == EINTR));
> +
> + if (nread == -1) {
> + return -errno;
> + }
> + if ((nread % sizeof(int)) != 0) {
> + return -EINVAL;
> + }
> + return nread / sizeof(int);
> }
> diff --git a/source3/lib/pthreadpool/pthreadpool_sync.c b/source3/lib/pthreadpool/pthreadpool_sync.c
> index 5f06cae..3e78f46 100644
> --- a/source3/lib/pthreadpool/pthreadpool_sync.c
> +++ b/source3/lib/pthreadpool/pthreadpool_sync.c
> @@ -17,165 +17,47 @@
> * along with this program. If not, see <http://www.gnu.org/licenses/>.
> */
>
> -#include <errno.h>
> -#include <stdio.h>
> -#include <unistd.h>
> -#include <stdlib.h>
> -#include <string.h>
> -#include <signal.h>
> -#include <assert.h>
> -#include <fcntl.h>
> -#include <sys/time.h>
>
> +#include "replace.h"
> #include "pthreadpool.h"
>
> struct pthreadpool {
> /*
> - * pipe for signalling
> + * Indicate job completion
> */
> - int sig_pipe[2];
> -
> - /*
> - * Have we sent something into the pipe that has not been
> - * retrieved yet?
> - */
> - int pipe_busy;
> -
> - /*
> - * Jobids that we have not sent into the pipe yet
> - */
> - size_t num_ids;
> - int *ids;
> + int (*signal_fn)(int jobid,
> + void *private_data);
> + void *signal_private_data;
> };
>
> -int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
> +int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
> + int (*signal_fn)(int jobid,
> + void *private_data),
> + void *signal_private_data)
> {
> struct pthreadpool *pool;
> - int ret;
>
> pool = (struct pthreadpool *)calloc(1, sizeof(struct pthreadpool));
> if (pool == NULL) {
> return ENOMEM;
> }
> - ret = pipe(pool->sig_pipe);
> - if (ret == -1) {
> - int err = errno;
> - free(pool);
> - return err;
> - }
> - *presult = pool;
> - return 0;
> -}
> -
> -int pthreadpool_signal_fd(struct pthreadpool *pool)
> -{
> - return pool->sig_pipe[0];
> -}
> -
> -static int pthreadpool_write_to_pipe(struct pthreadpool *pool)
> -{
> - ssize_t written;
> -
> - if (pool->pipe_busy) {
> - return 0;
> - }
> - if (pool->num_ids == 0) {
> - return 0;
> - }
> + pool->signal_fn = signal_fn;
> + pool->signal_private_data = signal_private_data;
>
> - written = -1;
> - errno = EINTR;
> -
> - while ((written == -1) && (errno == EINTR)) {
> - written = write(pool->sig_pipe[1], &pool->ids[0], sizeof(int));
> - }
> - if (written == -1) {
> - return errno;
> - }
> - if (written != sizeof(int)) {
> - /*
> - * If a single int only partially fits into the pipe,
> - * we can assume ourselves pretty broken
> - */
> - close(pool->sig_pipe[1]);
> - pool->sig_pipe[1] = -1;
> - return EIO;
> - }
> -
> - if (pool->num_ids > 1) {
> - memmove(pool->ids, pool->ids+1, sizeof(int) * (pool->num_ids-1));
> - }
> - pool->num_ids -= 1;
> - pool->pipe_busy = 1;
> + *presult = pool;
> return 0;
> }
>
> int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
> void (*fn)(void *private_data), void *private_data)
> {
> - int *tmp;
> -
> - if (pool->sig_pipe[1] == -1) {
> - return EIO;
> - }
> -
> fn(private_data);
>
> - tmp = realloc(pool->ids, sizeof(int) * (pool->num_ids+1));
> - if (tmp == NULL) {
> - return ENOMEM;
> - }
> - pool->ids = tmp;
> - pool->ids[pool->num_ids] = job_id;
> - pool->num_ids += 1;
> -
> - return pthreadpool_write_to_pipe(pool);
> -
> -}
> -
> -int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
> - unsigned num_jobids)
> -{
> - ssize_t to_read, nread;
> - int ret;
> -
> - nread = -1;
> - errno = EINTR;
> -
> - to_read = sizeof(int) * num_jobids;
> -
> - while ((nread == -1) && (errno == EINTR)) {
> - nread = read(pool->sig_pipe[0], jobids, to_read);
> - }
> - if (nread == -1) {
> - return -errno;
> - }
> - if ((nread % sizeof(int)) != 0) {
> - return -EINVAL;
> - }
> -
> - pool->pipe_busy = 0;
> -
> - ret = pthreadpool_write_to_pipe(pool);
> - if (ret != 0) {
> - return -ret;
> - }
> -
> - return nread / sizeof(int);
> + return pool->signal_fn(job_id, pool->signal_private_data);
> }
>
> int pthreadpool_destroy(struct pthreadpool *pool)
> {
> - if (pool->sig_pipe[0] != -1) {
> - close(pool->sig_pipe[0]);
> - pool->sig_pipe[0] = -1;
> - }
> -
> - if (pool->sig_pipe[1] != -1) {
> - close(pool->sig_pipe[1]);
> - pool->sig_pipe[1] = -1;
> - }
> - free(pool->ids);
> free(pool);
> return 0;
> }
> --
> 2.1.4
>
>
> From dc446e50c8ec336f514bc4ac3e378144c0784c21 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Sun, 31 Jul 2016 08:57:35 +0200
> Subject: [PATCH 15/23] lib: add job data to to callback
>
> The pthreadpool_tevent wrapper will need this
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> source3/lib/pthreadpool/pthreadpool.c | 19 +++++++++++++------
> source3/lib/pthreadpool/pthreadpool.h | 7 +++++--
> source3/lib/pthreadpool/pthreadpool_pipe.c | 10 ++++++++--
> source3/lib/pthreadpool/pthreadpool_sync.c | 13 +++++++++----
> 4 files changed, 35 insertions(+), 14 deletions(-)
>
> diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c
> index 4c2858a..fc21d43 100644
> --- a/source3/lib/pthreadpool/pthreadpool.c
> +++ b/source3/lib/pthreadpool/pthreadpool.c
> @@ -59,8 +59,11 @@ struct pthreadpool {
> /*
> * Indicate job completion
> */
> - int (*signal_fn)(int jobid, void *private_data);
> - void *signal_private_data;
> + int (*signal_fn)(int jobid,
> + void (*job_fn)(void *private_data),
> + void *job_fn_private_data,
> + void *private_data);
> + void *signal_fn_private_data;
>
> /*
> * indicator to worker threads that they should shut down
> @@ -100,8 +103,11 @@ static void pthreadpool_prep_atfork(void);
> */
>
> int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
> - int (*signal_fn)(int jobid, void *private_data),
> - void *signal_private_data)
> + int (*signal_fn)(int jobid,
> + void (*job_fn)(void *private_data),
> + void *job_fn_private_data,
> + void *private_data),
> + void *signal_fn_private_data)
> {
> struct pthreadpool *pool;
> int ret;
> @@ -111,7 +117,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
> return ENOMEM;
> }
> pool->signal_fn = signal_fn;
> - pool->signal_private_data = signal_private_data;
> + pool->signal_fn_private_data = signal_fn_private_data;
>
> pool->jobs_array_len = 4;
> pool->jobs = calloc(
> @@ -484,7 +490,8 @@ static void *pthreadpool_server(void *arg)
> assert(res == 0);
>
> ret = pool->signal_fn(job.id,
> - pool->signal_private_data);
> + job.fn, job.private_data,
> + pool->signal_fn_private_data);
> if (ret != 0) {
> pthreadpool_server_exit(pool);
> pthread_mutex_unlock(&pool->mutex);
> diff --git a/source3/lib/pthreadpool/pthreadpool.h b/source3/lib/pthreadpool/pthreadpool.h
> index 0b8d6e5..ee9d957 100644
> --- a/source3/lib/pthreadpool/pthreadpool.h
> +++ b/source3/lib/pthreadpool/pthreadpool.h
> @@ -44,8 +44,11 @@ struct pthreadpool;
> * care to not overload the system.
> */
> int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
> - int (*signal_fn)(int jobid, void *private_data),
> - void *signal_private_data);
> + int (*signal_fn)(int jobid,
> + void (*job_fn)(void *private_data),
> + void *job_fn_private_data,
> + void *private_data),
> + void *signal_fn_private_data);
>
> /**
> * @brief Destroy a pthreadpool
> diff --git a/source3/lib/pthreadpool/pthreadpool_pipe.c b/source3/lib/pthreadpool/pthreadpool_pipe.c
> index 3eaf5e3..f7995ab 100644
> --- a/source3/lib/pthreadpool/pthreadpool_pipe.c
> +++ b/source3/lib/pthreadpool/pthreadpool_pipe.c
> @@ -28,7 +28,10 @@ struct pthreadpool_pipe {
> int pipe_fds[2];
> };
>
> -static int pthreadpool_pipe_signal(int jobid, void *private_data);
> +static int pthreadpool_pipe_signal(int jobid,
> + void (*job_fn)(void *private_data),
> + void *job_private_data,
> + void *private_data);
>
> int pthreadpool_pipe_init(unsigned max_threads,
> struct pthreadpool_pipe **presult)
> @@ -62,7 +65,10 @@ int pthreadpool_pipe_init(unsigned max_threads,
> return 0;
> }
>
> -static int pthreadpool_pipe_signal(int jobid, void *private_data)
> +static int pthreadpool_pipe_signal(int jobid,
> + void (*job_fn)(void *private_data),
> + void *job_private_data,
> + void *private_data)
> {
> struct pthreadpool_pipe *pool = private_data;
> ssize_t written;
> diff --git a/source3/lib/pthreadpool/pthreadpool_sync.c b/source3/lib/pthreadpool/pthreadpool_sync.c
> index 3e78f46..d9a95f5 100644
> --- a/source3/lib/pthreadpool/pthreadpool_sync.c
> +++ b/source3/lib/pthreadpool/pthreadpool_sync.c
> @@ -26,14 +26,18 @@ struct pthreadpool {
> * Indicate job completion
> */
> int (*signal_fn)(int jobid,
> + void (*job_fn)(void *private_data),
> + void *job_fn_private_data,
> void *private_data);
> - void *signal_private_data;
> + void *signal_fn_private_data;
> };
>
> int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
> int (*signal_fn)(int jobid,
> + void (*job_fn)(void *private_data),
> + void *job_fn_private_data,
> void *private_data),
> - void *signal_private_data)
> + void *signal_fn_private_data)
> {
> struct pthreadpool *pool;
>
> @@ -42,7 +46,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
> return ENOMEM;
> }
> pool->signal_fn = signal_fn;
> - pool->signal_private_data = signal_private_data;
> + pool->signal_fn_private_data = signal_fn_private_data;
>
> *presult = pool;
> return 0;
> @@ -53,7 +57,8 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
> {
> fn(private_data);
>
> - return pool->signal_fn(job_id, pool->signal_private_data);
> + return pool->signal_fn(job_id, fn, private_data,
> + pool->signal_fn_private_data);
> }
>
> int pthreadpool_destroy(struct pthreadpool *pool)
> --
> 2.1.4
>
>
> From a4fd4f911c2d0373f230d2cde5adfcedbd3bcf61 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Mon, 8 Aug 2016 15:02:36 +0200
> Subject: [PATCH 16/23] lib: Add pthreadpool_tevent
>
> This is a replacement for fncall.[ch] without having to go through
> a pipe for job completion signalling
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> source3/lib/pthreadpool/pthreadpool_tevent.c | 240 +++++++++++++++++++++++++++
> source3/lib/pthreadpool/pthreadpool_tevent.h | 37 +++++
> source3/lib/pthreadpool/wscript_build | 14 +-
> 3 files changed, 287 insertions(+), 4 deletions(-)
> create mode 100644 source3/lib/pthreadpool/pthreadpool_tevent.c
> create mode 100644 source3/lib/pthreadpool/pthreadpool_tevent.h
>
> diff --git a/source3/lib/pthreadpool/pthreadpool_tevent.c b/source3/lib/pthreadpool/pthreadpool_tevent.c
> new file mode 100644
> index 0000000..02a7f2f
> --- /dev/null
> +++ b/source3/lib/pthreadpool/pthreadpool_tevent.c
> @@ -0,0 +1,240 @@
> +/*
> + * Unix SMB/CIFS implementation.
> + * threadpool implementation based on pthreads
> + * Copyright (C) Volker Lendecke 2009,2011
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 3 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> + */
> +
> +#include "replace.h"
> +#include "pthreadpool_tevent.h"
> +#include "pthreadpool.h"
> +#include "lib/util/tevent_unix.h"
> +#include "lib/util/dlinklist.h"
> +
> +struct pthreadpool_tevent_job_state;
> +
> +struct pthreadpool_tevent {
> + struct pthreadpool *pool;
> +
> + struct pthreadpool_tevent_job_state *jobs;
> +};
> +
> +static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
> +
> +static int pthreadpool_tevent_job_signal(int jobid,
> + void (*job_fn)(void *private_data),
> + void *job_private_data,
> + void *private_data);
> +
> +int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
> + struct pthreadpool_tevent **presult)
> +{
> + struct pthreadpool_tevent *pool;
> + int ret;
> +
> + pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
> + if (pool == NULL) {
> + return ENOMEM;
> + }
> +
> + ret = pthreadpool_init(max_threads, &pool->pool,
> + pthreadpool_tevent_job_signal, pool);
> + if (ret != 0) {
> + TALLOC_FREE(pool);
> + return ret;
> + }
> +
> + talloc_set_destructor(pool, pthreadpool_tevent_destructor);
> +
> + *presult = pool;
> + return 0;
> +}
> +
> +static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
> +{
> + int ret;
> +
> + ret = pthreadpool_destroy(pool->pool);
> + if (ret != 0) {
> + return ret;
> + }
> + pool->pool = NULL;
> +
> + if (pool->jobs != NULL) {
> + abort();
> + }
> +
> + return 0;
> +}
> +
> +struct pthreadpool_tevent_job_state {
> + struct pthreadpool_tevent_job_state *prev, *next;
> + struct pthreadpool_tevent *pool;
> + struct tevent_context *ev;
> + struct tevent_threaded_context *tctx;
> + struct tevent_immediate *im;
> + struct tevent_req *req;
> +
> + void (*fn)(void *private_data);
> + void *private_data;
> +};
> +
> +static void pthreadpool_tevent_job_fn(void *private_data);
> +static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
> + struct tevent_immediate *im,
> + void *private_data);
> +
> +static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job_state *state)
> +{
> + if (state->pool == NULL) {
> + return 0;
> + }
> +
> + /*
> + * We should never be called with state->req != NULL,
> + * state->pool must be cleared before the 2nd talloc_free().
> + */
> + if (state->req != NULL) {
> + abort();
> + }
> +
> + /*
> + * We need to reparent to a long term context.
> + */
> + (void)talloc_reparent(state->req, state->pool, state);
> + state->req = NULL;
> + return -1;
> +}
> +
> +struct tevent_req *pthreadpool_tevent_job_send(
> + TALLOC_CTX *mem_ctx, struct tevent_context *ev,
> + struct pthreadpool_tevent *pool,
> + void (*fn)(void *private_data), void *private_data)
> +{
> + struct tevent_req *req;
> + struct pthreadpool_tevent_job_state *state;
> + int ret;
> +
> + req = tevent_req_create(mem_ctx, &state,
> + struct pthreadpool_tevent_job_state);
> + if (req == NULL) {
> + return NULL;
> + }
> + state->pool = pool;
> + state->ev = ev;
> + state->req = req;
> + state->fn = fn;
> + state->private_data = private_data;
> +
> + state->im = tevent_create_immediate(state);
> + if (tevent_req_nomem(state->im, req)) {
> + return tevent_req_post(req, ev);
> + }
> +
> +#ifdef HAVE_PTHREAD
> + state->tctx = tevent_threaded_context_create(state, ev);
> + if (state->tctx == NULL && errno == ENOSYS) {
> + /*
> + * Samba build with pthread support but
> + * tevent without???
> + */
> + tevent_req_error(req, ENOSYS);
> + return tevent_req_post(req, ev);
> + }
> + if (tevent_req_nomem(state->tctx, req)) {
> + return tevent_req_post(req, ev);
> + }
> +#endif
> +
> + ret = pthreadpool_add_job(pool->pool, 0,
> + pthreadpool_tevent_job_fn,
> + state);
> + if (tevent_req_error(req, ret)) {
> + return tevent_req_post(req, ev);
> + }
> +
> + /*
> + * Once the job is scheduled, we need to protect
> + * our memory.
> + */
> + talloc_set_destructor(state, pthreadpool_tevent_job_destructor);
> +
> + DLIST_ADD_END(pool->jobs, state);
> +
> + return req;
> +}
> +
> +static void pthreadpool_tevent_job_fn(void *private_data)
> +{
> + struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
> + private_data, struct pthreadpool_tevent_job_state);
> + state->fn(state->private_data);
> +}
> +
> +static int pthreadpool_tevent_job_signal(int jobid,
> + void (*job_fn)(void *private_data),
> + void *job_private_data,
> + void *private_data)
> +{
> + struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
> + job_private_data, struct pthreadpool_tevent_job_state);
> +
> + if (state->tctx != NULL) {
> + /* with HAVE_PTHREAD */
> + tevent_threaded_schedule_immediate(state->tctx, state->im,
> + pthreadpool_tevent_job_done,
> + state);
> + } else {
> + /* without HAVE_PTHREAD */
> + tevent_schedule_immediate(state->im, state->ev,
> + pthreadpool_tevent_job_done,
> + state);
> + }
> +
> + return 0;
> +}
> +
> +static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
> + struct tevent_immediate *im,
> + void *private_data)
> +{
> + struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
> + private_data, struct pthreadpool_tevent_job_state);
> +
> + DLIST_REMOVE(state->pool->jobs, state);
> + state->pool = NULL;
> +
> + TALLOC_FREE(state->tctx);
> +
> + if (state->req == NULL) {
> + /*
> + * There was a talloc_free() state->req
> + * while the job was pending,
> + * which mean we're reparented on a longterm
> + * talloc context.
> + *
> + * We just cleanup here...
> + */
> + talloc_free(state);
> + return;
> + }
> +
> + tevent_req_done(state->req);
> +}
> +
> +int pthreadpool_tevent_job_recv(struct tevent_req *req)
> +{
> + return tevent_req_simple_recv_unix(req);
> +}
> diff --git a/source3/lib/pthreadpool/pthreadpool_tevent.h b/source3/lib/pthreadpool/pthreadpool_tevent.h
> new file mode 100644
> index 0000000..de74a34
> --- /dev/null
> +++ b/source3/lib/pthreadpool/pthreadpool_tevent.h
> @@ -0,0 +1,37 @@
> +/*
> + * Unix SMB/CIFS implementation.
> + * threadpool implementation based on pthreads
> + * Copyright (C) Volker Lendecke 2016
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 3 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> + */
> +
> +#ifndef __PTHREADPOOL_TEVENT_H__
> +#define __PTHREADPOOL_TEVENT_H__
> +
> +#include <tevent.h>
> +
> +struct pthreadpool_tevent;
> +
> +int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
> + struct pthreadpool_tevent **presult);
> +
> +struct tevent_req *pthreadpool_tevent_job_send(
> + TALLOC_CTX *mem_ctx, struct tevent_context *ev,
> + struct pthreadpool_tevent *pool,
> + void (*fn)(void *private_data), void *private_data);
> +
> +int pthreadpool_tevent_job_recv(struct tevent_req *req);
> +
> +#endif
> diff --git a/source3/lib/pthreadpool/wscript_build b/source3/lib/pthreadpool/wscript_build
> index aa02850..8195af7 100644
> --- a/source3/lib/pthreadpool/wscript_build
> +++ b/source3/lib/pthreadpool/wscript_build
> @@ -2,12 +2,18 @@
>
> if bld.env.WITH_PTHREADPOOL:
> bld.SAMBA3_SUBSYSTEM('PTHREADPOOL',
> - source='pthreadpool.c pthreadpool_pipe.c',
> - deps='pthread rt replace')
> + source='''pthreadpool.c
> + pthreadpool_pipe.c
> + pthreadpool_tevent.c
> + ''',
> + deps='pthread rt replace tevent-util')
> else:
> bld.SAMBA3_SUBSYSTEM('PTHREADPOOL',
> - source='pthreadpool_sync.c pthreadpool_pipe.c',
> - deps='replace')
> + source='''pthreadpool_sync.c
> + pthreadpool_pipe.c
> + pthreadpool_tevent.c
> + ''',
> + deps='replace tevent-util')
>
>
> bld.SAMBA3_BINARY('pthreadpooltest',
> --
> 2.1.4
>
>
> From 037ffd0b7740d40015bbe8c95aa329113d4861cb Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Mon, 8 Aug 2016 15:04:39 +0200
> Subject: [PATCH 17/23] smbtorture3: Add LOCAL-PTHREADPOOL-TEVENT
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> source3/selftest/tests.py | 1 +
> source3/torture/proto.h | 1 +
> source3/torture/test_pthreadpool_tevent.c | 82 +++++++++++++++++++++++++++++++
> source3/torture/torture.c | 1 +
> source3/wscript_build | 1 +
> 5 files changed, 86 insertions(+)
> create mode 100644 source3/torture/test_pthreadpool_tevent.c
>
> diff --git a/source3/selftest/tests.py b/source3/selftest/tests.py
> index 0a0cb08..dbd4c58 100755
> --- a/source3/selftest/tests.py
> +++ b/source3/selftest/tests.py
> @@ -114,6 +114,7 @@ local_tests = [
> "LOCAL-MESSAGING-FDPASS2",
> "LOCAL-MESSAGING-FDPASS2a",
> "LOCAL-MESSAGING-FDPASS2b",
> + "LOCAL-PTHREADPOOL-TEVENT",
> "LOCAL-hex_encode_buf",
> "LOCAL-sprintf_append",
> "LOCAL-remove_duplicate_addrs2"]
> diff --git a/source3/torture/proto.h b/source3/torture/proto.h
> index fc7c33f..7d2dedd 100644
> --- a/source3/torture/proto.h
> +++ b/source3/torture/proto.h
> @@ -121,5 +121,6 @@ bool run_messaging_fdpass2(int dummy);
> bool run_messaging_fdpass2a(int dummy);
> bool run_messaging_fdpass2b(int dummy);
> bool run_oplock_cancel(int dummy);
> +bool run_pthreadpool_tevent(int dummy);
>
> #endif /* __TORTURE_H__ */
> diff --git a/source3/torture/test_pthreadpool_tevent.c b/source3/torture/test_pthreadpool_tevent.c
> new file mode 100644
> index 0000000..c90a394
> --- /dev/null
> +++ b/source3/torture/test_pthreadpool_tevent.c
> @@ -0,0 +1,82 @@
> +/*
> + * Unix SMB/CIFS implementation.
> + * Test pthreadpool_tevent
> + * Copyright (C) Volker Lendecke 2016
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 3 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> + */
> +
> +#include "includes.h"
> +#include "system/select.h"
> +#include "proto.h"
> +#include "lib/pthreadpool/pthreadpool_tevent.h"
> +
> +static void job_fn(void *private_data);
> +
> +bool run_pthreadpool_tevent(int dummy)
> +{
> + struct tevent_context *ev;
> + struct pthreadpool_tevent *pool;
> + struct tevent_req *req;
> + int ret, val;
> + bool ok;
> +
> + ev = tevent_context_init_byname(NULL, "poll");
> + if (ev == NULL) {
> + fprintf(stderr, "tevent_context_init failed\n");
> + return false;
> + }
> +
> + ret = pthreadpool_tevent_init(ev, 100, &pool);
> + if (ret != 0) {
> + fprintf(stderr, "pthreadpool_tevent_init failed: %s\n",
> + strerror(ret));
> + return false;
> + }
> +
> + val = -1;
> +
> + req = pthreadpool_tevent_job_send(ev, ev, pool, job_fn, &val);
> + if (req == NULL) {
> + fprintf(stderr, "pthreadpool_tevent_job_send failed\n");
> + return false;
> + }
> +
> + ok = tevent_req_poll(req, ev);
> + if (!ok) {
> + fprintf(stderr, "tevent_req_poll failed\n");
> + return false;
> + }
> +
> + ret = pthreadpool_tevent_job_recv(req);
> + if (ret != 0) {
> + fprintf(stderr, "pthreadpool_tevent_job failed: %s\n",
> + strerror(ret));
> + return false;
> + }
> +
> + printf("%d\n", val);
> +
> + TALLOC_FREE(pool);
> + TALLOC_FREE(ev);
> + return true;
> +}
> +
> +static void job_fn(void *private_data)
> +{
> + int *pret = private_data;
> + *pret = 4711;
> +
> + poll(NULL, 0, 100);
> +}
> diff --git a/source3/torture/torture.c b/source3/torture/torture.c
> index f9766bb..06b919e 100644
> --- a/source3/torture/torture.c
> +++ b/source3/torture/torture.c
> @@ -10534,6 +10534,7 @@ static struct {
> { "local-tdb-writer", run_local_tdb_writer, 0 },
> { "LOCAL-DBWRAP-CTDB", run_local_dbwrap_ctdb, 0 },
> { "LOCAL-BENCH-PTHREADPOOL", run_bench_pthreadpool, 0 },
> + { "LOCAL-PTHREADPOOL-TEVENT", run_pthreadpool_tevent, 0 },
> { "qpathinfo-bufsize", run_qpathinfo_bufsize, 0 },
> {NULL, NULL, 0}};
>
> diff --git a/source3/wscript_build b/source3/wscript_build
> index edf921c..1d6f043 100755
> --- a/source3/wscript_build
> +++ b/source3/wscript_build
> @@ -1288,6 +1288,7 @@ bld.SAMBA3_BINARY('smbtorture' + bld.env.suffix3,
> torture/test_messaging_read.c
> torture/test_messaging_fd_passing.c
> torture/test_oplock_cancel.c
> + torture/test_pthreadpool_tevent.c
> torture/t_strappend.c
> torture/bench_pthreadpool.c
> torture/wbc_async.c''',
> --
> 2.1.4
>
>
> From 97c2484a2e83b66ce288c7f5d9182776eb92c597 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Mon, 8 Aug 2016 15:07:30 +0200
> Subject: [PATCH 18/23] smbd: Add pthreadpool_tevent to smbd_server_connection
>
> Prerequisite to convert the vfs _send/recv functions
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> source3/modules/vfs_default.c | 15 +++++++++++++++
> source3/smbd/globals.h | 4 ++++
> 2 files changed, 19 insertions(+)
>
> diff --git a/source3/modules/vfs_default.c b/source3/modules/vfs_default.c
> index 5227e95..8021319 100644
> --- a/source3/modules/vfs_default.c
> +++ b/source3/modules/vfs_default.c
> @@ -33,6 +33,7 @@
> #include "lib/asys/asys.h"
> #include "lib/util/tevent_ntstatus.h"
> #include "lib/util/sys_rw.h"
> +#include "lib/pthreadpool/pthreadpool_tevent.h"
>
> #undef DBGC_CLASS
> #define DBGC_CLASS DBGC_VFS
> @@ -750,6 +751,20 @@ fail:
> return false;
> }
>
> +static int vfswrap_init_pool(struct smbd_server_connection *conn)
> +{
> + int ret;
> +
> + if (conn->pool != NULL) {
> + return 0;
> + }
> +
> + ret = pthreadpool_tevent_init(conn, lp_aio_max_threads(),
> + &conn->pool);
> + return ret;
> +}
> +
> +
> struct vfswrap_asys_state {
> struct asys_context *asys_ctx;
> struct tevent_req *req;
> diff --git a/source3/smbd/globals.h b/source3/smbd/globals.h
> index 8ba564d..61d85cb 100644
> --- a/source3/smbd/globals.h
> +++ b/source3/smbd/globals.h
> @@ -854,6 +854,8 @@ struct user_struct {
> struct smbXsrv_session *session;
> };
>
> +struct pthreadpool_tevent;
> +
> struct smbd_server_connection {
> const struct tsocket_address *local_address;
> const struct tsocket_address *remote_address;
> @@ -925,6 +927,8 @@ struct smbd_server_connection {
> struct asys_context *asys_ctx;
> struct tevent_fd *asys_fde;
>
> + struct pthreadpool_tevent *pool;
> +
> struct smbXsrv_client *client;
> };
>
> --
> 2.1.4
>
>
> From 5bc65cceca2c776e0a2506286d8f884ab3b9f8b2 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Sat, 6 Aug 2016 21:58:09 +0200
> Subject: [PATCH 19/23] vfs: Convert vfs_pread_send to pthreadpool_tevent
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> source3/modules/vfs_default.c | 106 +++++++++++++++++++++++++++++++++++++-----
> 1 file changed, 94 insertions(+), 12 deletions(-)
>
> diff --git a/source3/modules/vfs_default.c b/source3/modules/vfs_default.c
> index 8021319..4007395 100644
> --- a/source3/modules/vfs_default.c
> +++ b/source3/modules/vfs_default.c
> @@ -780,6 +780,21 @@ static int vfswrap_asys_state_destructor(struct vfswrap_asys_state *s)
> return 0;
> }
>
> +struct vfswrap_pread_state {
> + ssize_t ret;
> + int err;
> + int fd;
> + void *buf;
> + size_t count;
> + off_t offset;
> +
> + struct vfs_aio_state vfs_aio_state;
> + SMBPROFILE_BYTES_ASYNC_STATE(profile_bytes);
> +};
> +
> +static void vfs_pread_do(void *private_data);
> +static void vfs_pread_done(struct tevent_req *subreq);
> +
> static struct tevent_req *vfswrap_pread_send(struct vfs_handle_struct *handle,
> TALLOC_CTX *mem_ctx,
> struct tevent_context *ev,
> @@ -787,33 +802,100 @@ static struct tevent_req *vfswrap_pread_send(struct vfs_handle_struct *handle,
> void *data,
> size_t n, off_t offset)
> {
> - struct tevent_req *req;
> - struct vfswrap_asys_state *state;
> + struct tevent_req *req, *subreq;
> + struct vfswrap_pread_state *state;
> int ret;
>
> - req = tevent_req_create(mem_ctx, &state, struct vfswrap_asys_state);
> + req = tevent_req_create(mem_ctx, &state, struct vfswrap_pread_state);
> if (req == NULL) {
> return NULL;
> }
> - if (!vfswrap_init_asys_ctx(handle->conn->sconn)) {
> - tevent_req_oom(req);
> +
> + ret = vfswrap_init_pool(handle->conn->sconn);
> + if (tevent_req_error(req, ret)) {
> return tevent_req_post(req, ev);
> }
> - state->asys_ctx = handle->conn->sconn->asys_ctx;
> - state->req = req;
> +
> + state->ret = -1;
> + state->fd = fsp->fh->fd;
> + state->buf = data;
> + state->count = n;
> + state->offset = offset;
>
> SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pread, profile_p,
> state->profile_bytes, n);
> - ret = asys_pread(state->asys_ctx, fsp->fh->fd, data, n, offset, req);
> - if (ret != 0) {
> - tevent_req_error(req, ret);
> + SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
> +
> + subreq = pthreadpool_tevent_job_send(
> + state, ev, handle->conn->sconn->pool,
> + vfs_pread_do, state);
> + if (tevent_req_nomem(subreq, req)) {
> return tevent_req_post(req, ev);
> }
> - talloc_set_destructor(state, vfswrap_asys_state_destructor);
> + tevent_req_set_callback(subreq, vfs_pread_done, req);
>
> return req;
> }
>
> +static void vfs_pread_do(void *private_data)
> +{
> + struct vfswrap_pread_state *state = talloc_get_type_abort(
> + private_data, struct vfswrap_pread_state);
> + struct timespec start_time;
> + struct timespec end_time;
> +
> + SMBPROFILE_BYTES_ASYNC_SET_BUSY(state->profile_bytes);
> +
> + PROFILE_TIMESTAMP(&start_time);
> +
> + do {
> + state->ret = pread(state->fd, state->buf, state->count,
> + state->offset);
> + } while ((state->ret == -1) && (errno == EINTR));
> +
> + state->err = errno;
> +
> + PROFILE_TIMESTAMP(&end_time);
> +
> + state->vfs_aio_state.duration = nsec_time_diff(&end_time, &start_time);
> +
> + SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
> +}
> +
> +static void vfs_pread_done(struct tevent_req *subreq)
> +{
> + struct tevent_req *req = tevent_req_callback_data(
> + subreq, struct tevent_req);
> +#ifdef WITH_PROFILE
> + struct vfswrap_pread_state *state = tevent_req_data(
> + req, struct vfswrap_pread_state);
> +#endif
> + int ret;
> +
> + ret = pthreadpool_tevent_job_recv(subreq);
> + TALLOC_FREE(subreq);
> + SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes);
> + if (tevent_req_error(req, ret)) {
> + return;
> + }
> +
> + tevent_req_done(req);
> +}
> +
> +static ssize_t vfswrap_pread_recv(struct tevent_req *req,
> + struct vfs_aio_state *vfs_aio_state)
> +{
> + struct vfswrap_pread_state *state = tevent_req_data(
> + req, struct vfswrap_pread_state);
> +
> + if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
> + return -1;
> + }
> +
> + *vfs_aio_state = state->vfs_aio_state;
> + return state->ret;
> +}
> +
> static struct tevent_req *vfswrap_pwrite_send(struct vfs_handle_struct *handle,
> TALLOC_CTX *mem_ctx,
> struct tevent_context *ev,
> @@ -2660,7 +2742,7 @@ static struct vfs_fn_pointers vfs_default_fns = {
> .read_fn = vfswrap_read,
> .pread_fn = vfswrap_pread,
> .pread_send_fn = vfswrap_pread_send,
> - .pread_recv_fn = vfswrap_asys_ssize_t_recv,
> + .pread_recv_fn = vfswrap_pread_recv,
> .write_fn = vfswrap_write,
> .pwrite_fn = vfswrap_pwrite,
> .pwrite_send_fn = vfswrap_pwrite_send,
> --
> 2.1.4
>
>
> From 1ee198f7dc46d8627842fee1e5bedddff721b2e9 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Sun, 7 Aug 2016 15:44:52 +0200
> Subject: [PATCH 20/23] vfs: Convert vfs_write_send to pthreadpool_tevent
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> ---
> source3/modules/vfs_default.c | 106 +++++++++++++++++++++++++++++++++++++-----
> 1 file changed, 94 insertions(+), 12 deletions(-)
>
> diff --git a/source3/modules/vfs_default.c b/source3/modules/vfs_default.c
> index 4007395..8825e7b 100644
> --- a/source3/modules/vfs_default.c
> +++ b/source3/modules/vfs_default.c
> @@ -896,6 +896,21 @@ static ssize_t vfswrap_pread_recv(struct tevent_req *req,
> return state->ret;
> }
>
> +struct vfswrap_pwrite_state {
> + ssize_t ret;
> + int err;
> + int fd;
> + const void *buf;
> + size_t count;
> + off_t offset;
> +
> + struct vfs_aio_state vfs_aio_state;
> + SMBPROFILE_BYTES_ASYNC_STATE(profile_bytes);
> +};
> +
> +static void vfs_pwrite_do(void *private_data);
> +static void vfs_pwrite_done(struct tevent_req *subreq);
> +
> static struct tevent_req *vfswrap_pwrite_send(struct vfs_handle_struct *handle,
> TALLOC_CTX *mem_ctx,
> struct tevent_context *ev,
> @@ -903,33 +918,100 @@ static struct tevent_req *vfswrap_pwrite_send(struct vfs_handle_struct *handle,
> const void *data,
> size_t n, off_t offset)
> {
> - struct tevent_req *req;
> - struct vfswrap_asys_state *state;
> + struct tevent_req *req, *subreq;
> + struct vfswrap_pwrite_state *state;
> int ret;
>
> - req = tevent_req_create(mem_ctx, &state, struct vfswrap_asys_state);
> + req = tevent_req_create(mem_ctx, &state, struct vfswrap_pwrite_state);
> if (req == NULL) {
> return NULL;
> }
> - if (!vfswrap_init_asys_ctx(handle->conn->sconn)) {
> - tevent_req_oom(req);
> +
> + ret = vfswrap_init_pool(handle->conn->sconn);
> + if (tevent_req_error(req, ret)) {
> return tevent_req_post(req, ev);
> }
> - state->asys_ctx = handle->conn->sconn->asys_ctx;
> - state->req = req;
> +
> + state->ret = -1;
> + state->fd = fsp->fh->fd;
> + state->buf = data;
> + state->count = n;
> + state->offset = offset;
>
> SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pwrite, profile_p,
> state->profile_bytes, n);
> - ret = asys_pwrite(state->asys_ctx, fsp->fh->fd, data, n, offset, req);
> - if (ret != 0) {
> - tevent_req_error(req, ret);
> + SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
> +
> + subreq = pthreadpool_tevent_job_send(
> + state, ev, handle->conn->sconn->pool,
> + vfs_pwrite_do, state);
> + if (tevent_req_nomem(subreq, req)) {
> return tevent_req_post(req, ev);
> }
> - talloc_set_destructor(state, vfswrap_asys_state_destructor);
> + tevent_req_set_callback(subreq, vfs_pwrite_done, req);
>
> return req;
> }
>
> +static void vfs_pwrite_do(void *private_data)
> +{
> + struct vfswrap_pwrite_state *state = talloc_get_type_abort(
> + private_data, struct vfswrap_pwrite_state);
> + struct timespec start_time;
> + struct timespec end_time;
> +
> + SMBPROFILE_BYTES_ASYNC_SET_BUSY(state->profile_bytes);
> +
> + PROFILE_TIMESTAMP(&start_time);
> +
> + do {
> + state->ret = pwrite(state->fd, state->buf, state->count,
> + state->offset);
> + } while ((state->ret == -1) && (errno == EINTR));
> +
> + state->err = errno;
> +
> + PROFILE_TIMESTAMP(&end_time);
> +
> + state->vfs_aio_state.duration = nsec_time_diff(&end_time, &start_time);
> +
> + SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
> +}
> +
> +static void vfs_pwrite_done(struct tevent_req *subreq)
> +{
> + struct tevent_req *req = tevent_req_callback_data(
> + subreq, struct tevent_req);
> +#ifdef WITH_PROFILE
> + struct vfswrap_pwrite_state *state = tevent_req_data(
> + req, struct vfswrap_pwrite_state);
> +#endif
> + int ret;
> +
> + ret = pthreadpool_tevent_job_recv(subreq);
> + TALLOC_FREE(subreq);
> + SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes);
> + if (tevent_req_error(req, ret)) {
> + return;
> + }
> +
> + tevent_req_done(req);
> +}
> +
> +static ssize_t vfswrap_pwrite_recv(struct tevent_req *req,
> + struct vfs_aio_state *vfs_aio_state)
> +{
> + struct vfswrap_pwrite_state *state = tevent_req_data(
> + req, struct vfswrap_pwrite_state);
> +
> + if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
> + return -1;
> + }
> +
> + *vfs_aio_state = state->vfs_aio_state;
> + return state->ret;
> +}
> +
> static struct tevent_req *vfswrap_fsync_send(struct vfs_handle_struct *handle,
> TALLOC_CTX *mem_ctx,
> struct tevent_context *ev,
> @@ -2746,7 +2828,7 @@ static struct vfs_fn_pointers vfs_default_fns = {
> .write_fn = vfswrap_write,
> .pwrite_fn = vfswrap_pwrite,
> .pwrite_send_fn = vfswrap_pwrite_send,
> - .pwrite_recv_fn = vfswrap_asys_ssize_t_recv,
> + .pwrite_recv_fn = vfswrap_pwrite_recv,
> .lseek_fn = vfswrap_lseek,
> .sendfile_fn = vfswrap_sendfile,
> .recvfile_fn = vfswrap_recvfile,
> --
> 2.1.4
>
>
> From c4a65538eea3cc41b5a0b04f7e79d37293c22106 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Sun, 7 Aug 2016 15:53:12 +0200
> Subject: [PATCH 21/23] vfs: Convert vfs_fsync_send to pthreadpool_tevent
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> source3/modules/vfs_default.c | 93 +++++++++++++++++++++++++++++++++++++------
> 1 file changed, 81 insertions(+), 12 deletions(-)
>
> diff --git a/source3/modules/vfs_default.c b/source3/modules/vfs_default.c
> index 8825e7b..ea9f6ab 100644
> --- a/source3/modules/vfs_default.c
> +++ b/source3/modules/vfs_default.c
> @@ -1012,38 +1012,107 @@ static ssize_t vfswrap_pwrite_recv(struct tevent_req *req,
> return state->ret;
> }
>
> +struct vfswrap_fsync_state {
> + ssize_t ret;
> + int err;
> + int fd;
> +
> + struct vfs_aio_state vfs_aio_state;
> + SMBPROFILE_BASIC_ASYNC_STATE(profile_basic);
> +};
> +
> +static void vfs_fsync_do(void *private_data);
> +static void vfs_fsync_done(struct tevent_req *subreq);
> +
> static struct tevent_req *vfswrap_fsync_send(struct vfs_handle_struct *handle,
> TALLOC_CTX *mem_ctx,
> struct tevent_context *ev,
> struct files_struct *fsp)
> {
> - struct tevent_req *req;
> - struct vfswrap_asys_state *state;
> + struct tevent_req *req, *subreq;
> + struct vfswrap_fsync_state *state;
> int ret;
>
> - req = tevent_req_create(mem_ctx, &state, struct vfswrap_asys_state);
> + req = tevent_req_create(mem_ctx, &state, struct vfswrap_fsync_state);
> if (req == NULL) {
> return NULL;
> }
> - if (!vfswrap_init_asys_ctx(handle->conn->sconn)) {
> - tevent_req_oom(req);
> +
> + ret = vfswrap_init_pool(handle->conn->sconn);
> + if (tevent_req_error(req, ret)) {
> return tevent_req_post(req, ev);
> }
> - state->asys_ctx = handle->conn->sconn->asys_ctx;
> - state->req = req;
> +
> + state->ret = -1;
> + state->fd = fsp->fh->fd;
>
> SMBPROFILE_BASIC_ASYNC_START(syscall_asys_fsync, profile_p,
> state->profile_basic);
> - ret = asys_fsync(state->asys_ctx, fsp->fh->fd, req);
> - if (ret != 0) {
> - tevent_req_error(req, ret);
> +
> + subreq = pthreadpool_tevent_job_send(
> + state, ev, handle->conn->sconn->pool, vfs_fsync_do, state);
> + if (tevent_req_nomem(subreq, req)) {
> return tevent_req_post(req, ev);
> }
> - talloc_set_destructor(state, vfswrap_asys_state_destructor);
> + tevent_req_set_callback(subreq, vfs_fsync_done, req);
>
> return req;
> }
>
> +static void vfs_fsync_do(void *private_data)
> +{
> + struct vfswrap_fsync_state *state = talloc_get_type_abort(
> + private_data, struct vfswrap_fsync_state);
> + struct timespec start_time;
> + struct timespec end_time;
> +
> + PROFILE_TIMESTAMP(&start_time);
> +
> + do {
> + state->ret = fsync(state->fd);
> + } while ((state->ret == -1) && (errno == EINTR));
> +
> + state->err = errno;
> +
> + PROFILE_TIMESTAMP(&end_time);
> +
> + state->vfs_aio_state.duration = nsec_time_diff(&end_time, &start_time);
> +}
> +
> +static void vfs_fsync_done(struct tevent_req *subreq)
> +{
> + struct tevent_req *req = tevent_req_callback_data(
> + subreq, struct tevent_req);
> +#ifdef WITH_PROFILE
> + struct vfswrap_fsync_state *state = tevent_req_data(
> + req, struct vfswrap_fsync_state);
> +#endif
> + int ret;
> +
> + ret = pthreadpool_tevent_job_recv(subreq);
> + TALLOC_FREE(subreq);
> + SMBPROFILE_BASIC_ASYNC_END(state->profile_basic);
> + if (tevent_req_error(req, ret)) {
> + return;
> + }
> +
> + tevent_req_done(req);
> +}
> +
> +static int vfswrap_fsync_recv(struct tevent_req *req,
> + struct vfs_aio_state *vfs_aio_state)
> +{
> + struct vfswrap_fsync_state *state = tevent_req_data(
> + req, struct vfswrap_fsync_state);
> +
> + if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
> + return -1;
> + }
> +
> + *vfs_aio_state = state->vfs_aio_state;
> + return state->ret;
> +}
> +
> static void vfswrap_asys_finished(struct tevent_context *ev,
> struct tevent_fd *fde,
> uint16_t flags, void *p)
> @@ -2835,7 +2904,7 @@ static struct vfs_fn_pointers vfs_default_fns = {
> .rename_fn = vfswrap_rename,
> .fsync_fn = vfswrap_fsync,
> .fsync_send_fn = vfswrap_fsync_send,
> - .fsync_recv_fn = vfswrap_asys_int_recv,
> + .fsync_recv_fn = vfswrap_fsync_recv,
> .stat_fn = vfswrap_stat,
> .fstat_fn = vfswrap_fstat,
> .lstat_fn = vfswrap_lstat,
> --
> 2.1.4
>
>
> From db5529cdce637b1c69499c87c371e18d9f62cb88 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Sun, 7 Aug 2016 15:59:10 +0200
> Subject: [PATCH 22/23] vfs: Remove link to asys_
>
> No longer needed after conversion to pthreadpool_tevent
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> source3/modules/vfs_default.c | 131 ------------------------------------------
> source3/smbd/globals.h | 6 --
> 2 files changed, 137 deletions(-)
>
> diff --git a/source3/modules/vfs_default.c b/source3/modules/vfs_default.c
> index ea9f6ab..53199b8 100644
> --- a/source3/modules/vfs_default.c
> +++ b/source3/modules/vfs_default.c
> @@ -30,7 +30,6 @@
> #include "source3/include/msdfs.h"
> #include "librpc/gen_ndr/ndr_dfsblobs.h"
> #include "lib/util/tevent_unix.h"
> -#include "lib/asys/asys.h"
> #include "lib/util/tevent_ntstatus.h"
> #include "lib/util/sys_rw.h"
> #include "lib/pthreadpool/pthreadpool_tevent.h"
> @@ -706,51 +705,6 @@ static ssize_t vfswrap_pwrite(vfs_handle_struct *handle, files_struct *fsp, cons
> return result;
> }
>
> -static void vfswrap_asys_finished(struct tevent_context *ev,
> - struct tevent_fd *fde,
> - uint16_t flags, void *p);
> -
> -static bool vfswrap_init_asys_ctx(struct smbd_server_connection *conn)
> -{
> - struct asys_context *ctx;
> - struct tevent_fd *fde;
> - int ret;
> - int fd;
> -
> - if (conn->asys_ctx != NULL) {
> - return true;
> - }
> -
> - ret = asys_context_init(&ctx, lp_aio_max_threads());
> - if (ret != 0) {
> - DEBUG(1, ("asys_context_init failed: %s\n", strerror(ret)));
> - return false;
> - }
> -
> - fd = asys_signalfd(ctx);
> -
> - ret = set_blocking(fd, false);
> - if (ret != 0) {
> - DBG_WARNING("set_blocking failed: %s\n", strerror(errno));
> - goto fail;
> - }
> -
> - fde = tevent_add_fd(conn->ev_ctx, conn, fd, TEVENT_FD_READ,
> - vfswrap_asys_finished, ctx);
> - if (fde == NULL) {
> - DEBUG(1, ("tevent_add_fd failed\n"));
> - goto fail;
> - }
> -
> - conn->asys_ctx = ctx;
> - conn->asys_fde = fde;
> - return true;
> -
> -fail:
> - asys_context_destroy(ctx);
> - return false;
> -}
> -
> static int vfswrap_init_pool(struct smbd_server_connection *conn)
> {
> int ret;
> @@ -764,22 +718,6 @@ static int vfswrap_init_pool(struct smbd_server_connection *conn)
> return ret;
> }
>
> -
> -struct vfswrap_asys_state {
> - struct asys_context *asys_ctx;
> - struct tevent_req *req;
> - ssize_t ret;
> - struct vfs_aio_state vfs_aio_state;
> - SMBPROFILE_BASIC_ASYNC_STATE(profile_basic);
> - SMBPROFILE_BYTES_ASYNC_STATE(profile_bytes);
> -};
> -
> -static int vfswrap_asys_state_destructor(struct vfswrap_asys_state *s)
> -{
> - asys_cancel(s->asys_ctx, s->req);
> - return 0;
> -}
> -
> struct vfswrap_pread_state {
> ssize_t ret;
> int err;
> @@ -1113,75 +1051,6 @@ static int vfswrap_fsync_recv(struct tevent_req *req,
> return state->ret;
> }
>
> -static void vfswrap_asys_finished(struct tevent_context *ev,
> - struct tevent_fd *fde,
> - uint16_t flags, void *p)
> -{
> - struct asys_context *asys_ctx = (struct asys_context *)p;
> - struct asys_result results[get_outstanding_aio_calls()];
> - int i, ret;
> -
> - if ((flags & TEVENT_FD_READ) == 0) {
> - return;
> - }
> -
> - ret = asys_results(asys_ctx, results, get_outstanding_aio_calls());
> - if (ret < 0) {
> - DEBUG(1, ("asys_results returned %s\n", strerror(-ret)));
> - return;
> - }
> -
> - for (i=0; i<ret; i++) {
> - struct asys_result *result = &results[i];
> - struct tevent_req *req;
> - struct vfswrap_asys_state *state;
> -
> - if ((result->ret == -1) && (result->err == ECANCELED)) {
> - continue;
> - }
> -
> - req = talloc_get_type_abort(result->private_data,
> - struct tevent_req);
> - state = tevent_req_data(req, struct vfswrap_asys_state);
> -
> - talloc_set_destructor(state, NULL);
> -
> - SMBPROFILE_BASIC_ASYNC_END(state->profile_basic);
> - SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes);
> - state->ret = result->ret;
> - state->vfs_aio_state.error = result->err;
> - state->vfs_aio_state.duration = result->duration;
> - tevent_req_defer_callback(req, ev);
> - tevent_req_done(req);
> - }
> -}
> -
> -static ssize_t vfswrap_asys_ssize_t_recv(struct tevent_req *req,
> - struct vfs_aio_state *vfs_aio_state)
> -{
> - struct vfswrap_asys_state *state = tevent_req_data(
> - req, struct vfswrap_asys_state);
> -
> - if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
> - return -1;
> - }
> - *vfs_aio_state = state->vfs_aio_state;
> - return state->ret;
> -}
> -
> -static int vfswrap_asys_int_recv(struct tevent_req *req,
> - struct vfs_aio_state *vfs_aio_state)
> -{
> - struct vfswrap_asys_state *state = tevent_req_data(
> - req, struct vfswrap_asys_state);
> -
> - if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
> - return -1;
> - }
> - *vfs_aio_state = state->vfs_aio_state;
> - return state->ret;
> -}
> -
> static off_t vfswrap_lseek(vfs_handle_struct *handle, files_struct *fsp, off_t offset, int whence)
> {
> off_t result = 0;
> diff --git a/source3/smbd/globals.h b/source3/smbd/globals.h
> index 61d85cb..76ce0bb 100644
> --- a/source3/smbd/globals.h
> +++ b/source3/smbd/globals.h
> @@ -921,12 +921,6 @@ struct smbd_server_connection {
> } locks;
> } smb2;
>
> - /*
> - * Link into libasys for asynchronous operations
> - */
> - struct asys_context *asys_ctx;
> - struct tevent_fd *asys_fde;
> -
> struct pthreadpool_tevent *pool;
>
> struct smbXsrv_client *client;
> --
> 2.1.4
>
>
> From c9ce2063636533b81498503803ff0eb4456fe04c Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Sun, 7 Aug 2016 16:03:00 +0200
> Subject: [PATCH 23/23] lib: Remove unused source3/lib/asys
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> Reviewed-by: Stefan Metzmacher <metze at samba.org>
> ---
> source3/lib/asys/asys.c | 342 -----------------------------------------
> source3/lib/asys/asys.h | 155 -------------------
> source3/lib/asys/tests.c | 90 -----------
> source3/lib/asys/wscript_build | 10 --
> source3/wscript_build | 2 -
> 5 files changed, 599 deletions(-)
> delete mode 100644 source3/lib/asys/asys.c
> delete mode 100644 source3/lib/asys/asys.h
> delete mode 100644 source3/lib/asys/tests.c
> delete mode 100644 source3/lib/asys/wscript_build
>
> diff --git a/source3/lib/asys/asys.c b/source3/lib/asys/asys.c
> deleted file mode 100644
> index 594d470..0000000
> --- a/source3/lib/asys/asys.c
> +++ /dev/null
> @@ -1,342 +0,0 @@
> -/*
> - * Async syscalls
> - * Copyright (C) Volker Lendecke 2012
> - *
> - * This program is free software; you can redistribute it and/or modify
> - * it under the terms of the GNU General Public License as published by
> - * the Free Software Foundation; either version 3 of the License, or
> - * (at your option) any later version.
> - *
> - * This program is distributed in the hope that it will be useful,
> - * but WITHOUT ANY WARRANTY; without even the implied warranty of
> - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> - * GNU General Public License for more details.
> - *
> - * You should have received a copy of the GNU General Public License
> - * along with this program. If not, see <http://www.gnu.org/licenses/>.
> - */
> -
> -#include "asys.h"
> -#include <stdlib.h>
> -#include <errno.h>
> -#include "../pthreadpool/pthreadpool_pipe.h"
> -#include "lib/util/time.h"
> -#include "smbprofile.h"
> -
> -struct asys_pwrite_args {
> - int fildes;
> - const void *buf;
> - size_t nbyte;
> - off_t offset;
> -};
> -
> -struct asys_pread_args {
> - int fildes;
> - void *buf;
> - size_t nbyte;
> - off_t offset;
> -};
> -
> -struct asys_fsync_args {
> - int fildes;
> -};
> -
> -union asys_job_args {
> - struct asys_pwrite_args pwrite_args;
> - struct asys_pread_args pread_args;
> - struct asys_fsync_args fsync_args;
> -};
> -
> -struct asys_job {
> - void *private_data;
> - union asys_job_args args;
> - ssize_t ret;
> - int err;
> - char busy;
> - char canceled;
> - struct timespec start_time;
> - struct timespec end_time;
> -};
> -
> -struct asys_context {
> - struct pthreadpool_pipe *pool;
> - int pthreadpool_pipe_fd;
> -
> - unsigned num_jobs;
> - struct asys_job **jobs;
> -};
> -
> -struct asys_creds_context {
> - int dummy;
> -};
> -
> -int asys_context_init(struct asys_context **pctx, unsigned max_parallel)
> -{
> - struct asys_context *ctx;
> - int ret;
> -
> - ctx = calloc(1, sizeof(struct asys_context));
> - if (ctx == NULL) {
> - return ENOMEM;
> - }
> - ret = pthreadpool_pipe_init(max_parallel, &ctx->pool);
> - if (ret != 0) {
> - free(ctx);
> - return ret;
> - }
> - ctx->pthreadpool_pipe_fd = pthreadpool_pipe_signal_fd(ctx->pool);
> -
> - *pctx = ctx;
> - return 0;
> -}
> -
> -int asys_signalfd(struct asys_context *ctx)
> -{
> - return ctx->pthreadpool_pipe_fd;
> -}
> -
> -int asys_context_destroy(struct asys_context *ctx)
> -{
> - int ret;
> - unsigned i;
> -
> - for (i=0; i<ctx->num_jobs; i++) {
> - if (ctx->jobs[i]->busy) {
> - return EBUSY;
> - }
> - }
> -
> - ret = pthreadpool_pipe_destroy(ctx->pool);
> - if (ret != 0) {
> - return ret;
> - }
> - for (i=0; i<ctx->num_jobs; i++) {
> - free(ctx->jobs[i]);
> - }
> - free(ctx->jobs);
> - free(ctx);
> - return 0;
> -}
> -
> -static int asys_new_job(struct asys_context *ctx, int *jobid,
> - struct asys_job **pjob)
> -{
> - struct asys_job **tmp;
> - struct asys_job *job;
> - unsigned i;
> -
> - for (i=0; i<ctx->num_jobs; i++) {
> - job = ctx->jobs[i];
> - if (!job->busy) {
> - job->err = 0;
> - *pjob = job;
> - *jobid = i;
> - return 0;
> - }
> - }
> -
> - if (ctx->num_jobs+1 == 0) {
> - return EBUSY; /* overflow */
> - }
> -
> - tmp = realloc(ctx->jobs, sizeof(struct asys_job *)*(ctx->num_jobs+1));
> - if (tmp == NULL) {
> - return ENOMEM;
> - }
> - ctx->jobs = tmp;
> -
> - job = calloc(1, sizeof(struct asys_job));
> - if (job == NULL) {
> - return ENOMEM;
> - }
> - ctx->jobs[ctx->num_jobs] = job;
> -
> - *jobid = ctx->num_jobs;
> - *pjob = job;
> - ctx->num_jobs += 1;
> - return 0;
> -}
> -
> -static void asys_pwrite_do(void *private_data);
> -
> -int asys_pwrite(struct asys_context *ctx, int fildes, const void *buf,
> - size_t nbyte, off_t offset, void *private_data)
> -{
> - struct asys_job *job;
> - struct asys_pwrite_args *args;
> - int jobid;
> - int ret;
> -
> - ret = asys_new_job(ctx, &jobid, &job);
> - if (ret != 0) {
> - return ret;
> - }
> - job->private_data = private_data;
> -
> - args = &job->args.pwrite_args;
> - args->fildes = fildes;
> - args->buf = buf;
> - args->nbyte = nbyte;
> - args->offset = offset;
> -
> - ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_pwrite_do, job);
> - if (ret != 0) {
> - return ret;
> - }
> - job->busy = 1;
> -
> - return 0;
> -}
> -
> -static void asys_pwrite_do(void *private_data)
> -{
> - struct asys_job *job = (struct asys_job *)private_data;
> - struct asys_pwrite_args *args = &job->args.pwrite_args;
> -
> - PROFILE_TIMESTAMP(&job->start_time);
> - job->ret = pwrite(args->fildes, args->buf, args->nbyte, args->offset);
> - PROFILE_TIMESTAMP(&job->end_time);
> -
> - if (job->ret == -1) {
> - job->err = errno;
> - }
> -}
> -
> -static void asys_pread_do(void *private_data);
> -
> -int asys_pread(struct asys_context *ctx, int fildes, void *buf,
> - size_t nbyte, off_t offset, void *private_data)
> -{
> - struct asys_job *job;
> - struct asys_pread_args *args;
> - int jobid;
> - int ret;
> -
> - ret = asys_new_job(ctx, &jobid, &job);
> - if (ret != 0) {
> - return ret;
> - }
> - job->private_data = private_data;
> -
> - args = &job->args.pread_args;
> - args->fildes = fildes;
> - args->buf = buf;
> - args->nbyte = nbyte;
> - args->offset = offset;
> -
> - ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_pread_do, job);
> - if (ret != 0) {
> - return ret;
> - }
> - job->busy = 1;
> -
> - return 0;
> -}
> -
> -static void asys_pread_do(void *private_data)
> -{
> - struct asys_job *job = (struct asys_job *)private_data;
> - struct asys_pread_args *args = &job->args.pread_args;
> -
> - PROFILE_TIMESTAMP(&job->start_time);
> - job->ret = pread(args->fildes, args->buf, args->nbyte, args->offset);
> - PROFILE_TIMESTAMP(&job->end_time);
> -
> - if (job->ret == -1) {
> - job->err = errno;
> - }
> -}
> -
> -static void asys_fsync_do(void *private_data);
> -
> -int asys_fsync(struct asys_context *ctx, int fildes, void *private_data)
> -{
> - struct asys_job *job;
> - struct asys_fsync_args *args;
> - int jobid;
> - int ret;
> -
> - ret = asys_new_job(ctx, &jobid, &job);
> - if (ret != 0) {
> - return ret;
> - }
> - job->private_data = private_data;
> -
> - args = &job->args.fsync_args;
> - args->fildes = fildes;
> -
> - ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_fsync_do, job);
> - if (ret != 0) {
> - return ret;
> - }
> - job->busy = 1;
> -
> - return 0;
> -}
> -
> -static void asys_fsync_do(void *private_data)
> -{
> - struct asys_job *job = (struct asys_job *)private_data;
> - struct asys_fsync_args *args = &job->args.fsync_args;
> -
> - PROFILE_TIMESTAMP(&job->start_time);
> - job->ret = fsync(args->fildes);
> - PROFILE_TIMESTAMP(&job->end_time);
> -
> - if (job->ret == -1) {
> - job->err = errno;
> - }
> -}
> -
> -void asys_cancel(struct asys_context *ctx, void *private_data)
> -{
> - unsigned i;
> -
> - for (i=0; i<ctx->num_jobs; i++) {
> - struct asys_job *job = ctx->jobs[i];
> -
> - if (job->private_data == private_data) {
> - job->canceled = 1;
> - }
> - }
> -}
> -
> -int asys_results(struct asys_context *ctx, struct asys_result *results,
> - unsigned num_results)
> -{
> - int jobids[num_results];
> - int i, ret;
> -
> - ret = pthreadpool_pipe_finished_jobs(ctx->pool, jobids, num_results);
> - if (ret <= 0) {
> - return ret;
> - }
> -
> - for (i=0; i<ret; i++) {
> - struct asys_result *result = &results[i];
> - struct asys_job *job;
> - int jobid;
> -
> - jobid = jobids[i];
> -
> - if ((jobid < 0) || (jobid >= ctx->num_jobs)) {
> - return -EIO;
> - }
> -
> - job = ctx->jobs[jobid];
> -
> - if (job->canceled) {
> - result->ret = -1;
> - result->err = ECANCELED;
> - } else {
> - result->ret = job->ret;
> - result->err = job->err;
> - }
> - result->private_data = job->private_data;
> - result->duration = nsec_time_diff(&job->end_time, &job->start_time);
> -
> - job->busy = 0;
> - }
> -
> - return ret;
> -}
> diff --git a/source3/lib/asys/asys.h b/source3/lib/asys/asys.h
> deleted file mode 100644
> index f576bd3..0000000
> --- a/source3/lib/asys/asys.h
> +++ /dev/null
> @@ -1,155 +0,0 @@
> -/*
> - * Async syscalls
> - * Copyright (C) Volker Lendecke 2012
> - *
> - * This program is free software; you can redistribute it and/or modify
> - * it under the terms of the GNU General Public License as published by
> - * the Free Software Foundation; either version 3 of the License, or
> - * (at your option) any later version.
> - *
> - * This program is distributed in the hope that it will be useful,
> - * but WITHOUT ANY WARRANTY; without even the implied warranty of
> - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> - * GNU General Public License for more details.
> - *
> - * You should have received a copy of the GNU General Public License
> - * along with this program. If not, see <http://www.gnu.org/licenses/>.
> - */
> -
> -#ifndef __ASYS_H__
> -#define __ASYS_H__
> -
> -#include "replace.h"
> -#include "system/filesys.h"
> -
> -/**
> - * @defgroup asys The async syscall library
> - *
> - * This module contains a set of asynchronous functions that directly
> - * wrap normally synchronous posix system calls. The reason for this
> - * module's existence is the limited set of operations the posix async
> - * I/O API provides.
> - *
> - * The basic flow of operations is:
> - *
> - * The application creates a asys_context structure using
> - * asys_context_create()
> - *
> - * The application triggers a call to the library by calling for
> - * example asys_ftruncate(). asys_ftruncate() takes a private_data
> - * argument that will be returned later by asys_result. The calling
> - * application should hand a pointer representing the async operation
> - * to the private_data argument.
> - *
> - * The application puts the fd returned by asys_signalfd() into its
> - * event loop. When the signal fd becomes readable, the application
> - * calls asys_result() to grab the final result of one of the system
> - * calls that were issued in the meantime.
> - *
> - * For multi-user applications it is necessary to create different
> - * credential contexts, as it is not clear when exactly the real
> - * system call will be issued. The application might have called
> - * seteuid(2) or something equivalent in the meantime. Thus, all
> - * system calls doing access checks, in particular all calls doing
> - * path-based operations, require a struct auth_creds_context
> - * parameter. asys_creds_context_create() creates such a context. All
> - * credential-checking operations take a struct asys_creds_context as
> - * an argument. It can be NULL if the application never changes
> - * credentials.
> - *
> - * @{
> - */
> -
> -struct asys_context;
> -struct asys_creds_context;
> -
> -enum asys_log_level {
> - ASYS_LOG_FATAL = 0,
> - ASYS_DEBUG_ERROR,
> - ASYS_DEBUG_WARNING,
> - ASYS_DEBUG_TRACE
> -};
> -
> -#ifndef PRINTF_ATTRIBUTE
> -#if (__GNUC__ >= 3)
> -/** Use gcc attribute to check printf fns. a1 is the 1-based index of
> - * the parameter containing the format, and a2 the index of the first
> - * argument. Note that some gcc 2.x versions don't handle this
> - * properly **/
> -#define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
> -#else
> -#define PRINTF_ATTRIBUTE(a1, a2)
> -#endif
> -#endif
> -
> -typedef void (*asys_log_fn)(struct asys_context *ctx, void *private_data,
> - enum asys_log_level level,
> - const char *fmt, ...) PRINTF_ATTRIBUTE(4, 5);
> -
> -int asys_context_init(struct asys_context **ctx, unsigned max_parallel);
> -int asys_context_destroy(struct asys_context *ctx);
> -void asys_set_log_fn(struct asys_context *ctx, asys_log_fn fn,
> - void *private_data);
> -
> -/**
> - * @brief Get the the signal fd
> - *
> - * asys_signalfd() returns a file descriptor that will become readable
> - * whenever an asynchronous request has finished. When the signalfd is
> - * readable, calling asys_result() will not block.
> - *
> - * @param[in] ctx The asys context
> - * @return A file descriptor indicating a finished operation
> - */
> -
> -int asys_signalfd(struct asys_context *ctx);
> -
> -struct asys_result {
> - ssize_t ret;
> - int err;
> - void *private_data;
> - uint64_t duration; /* nanoseconds */
> -};
> -
> -/**
> - * @brief Pull the results from async operations
> - *
> - * Whe the fd returned from asys_signalfd() is readable, one or more async
> - * operations have finished. The result from the async operations can be pulled
> - * with asys_results().
> - *
> - * @param[in] ctx The asys context
> - * @param[out] results The result strutcts
> - * @param[in] num_results The length of the results array
> - * @return success: >=0, number of finished jobs
> - * failure: -errno
> - */
> -int asys_results(struct asys_context *ctx, struct asys_result *results,
> - unsigned num_results);
> -
> -void asys_cancel(struct asys_context *ctx, void *private_data);
> -
> -int asys_pread(struct asys_context *ctx, int fildes, void *buf, size_t nbyte,
> - off_t offset, void *private_data);
> -int asys_pwrite(struct asys_context *ctx, int fildes, const void *buf,
> - size_t nbyte, off_t offset, void *private_data);
> -int asys_ftruncate(struct asys_context *ctx, int filedes, off_t length,
> - void *private_data);
> -int asys_fsync(struct asys_context *ctx, int fd, void *private_data);
> -int asys_close(struct asys_context *ctx, int fd, void *private_data);
> -
> -struct asys_creds_context *asys_creds_context_create(
> - struct asys_context *ctx,
> - uid_t uid, gid_t gid, unsigned num_gids, gid_t *gids);
> -
> -int asys_creds_context_delete(struct asys_creds_context *ctx);
> -
> -int asys_open(struct asys_context *ctx, struct asys_creds_context *cctx,
> - const char *pathname, int flags, mode_t mode,
> - void *private_data);
> -int asys_unlink(struct asys_context *ctx, struct asys_creds_context *cctx,
> - const char *pathname, void *private_data);
> -
> -/* @} */
> -
> -#endif /* __ASYS_H__ */
> diff --git a/source3/lib/asys/tests.c b/source3/lib/asys/tests.c
> deleted file mode 100644
> index e54e3ea..0000000
> --- a/source3/lib/asys/tests.c
> +++ /dev/null
> @@ -1,90 +0,0 @@
> -/*
> - * Test async syscalls
> - * Copyright (C) Volker Lendecke 2012
> - *
> - * This program is free software; you can redistribute it and/or modify
> - * it under the terms of the GNU General Public License as published by
> - * the Free Software Foundation; either version 3 of the License, or
> - * (at your option) any later version.
> - *
> - * This program is distributed in the hope that it will be useful,
> - * but WITHOUT ANY WARRANTY; without even the implied warranty of
> - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> - * GNU General Public License for more details.
> - *
> - * You should have received a copy of the GNU General Public License
> - * along with this program. If not, see <http://www.gnu.org/licenses/>.
> - */
> -
> -#include "asys.h"
> -#include <stdio.h>
> -#include <string.h>
> -#include <stdlib.h>
> -#include <errno.h>
> -
> -int main(int argc, const char *argv[])
> -{
> - struct asys_context *ctx;
> - int i, fd, ret;
> -
> - int *buf;
> -
> - int ntasks = 10;
> -
> - ret = asys_context_init(&ctx, 0);
> - if (ret != 0) {
> - perror("asys_context_create failed");
> - return 1;
> - }
> -
> - fd = open("asys_testfile", O_CREAT|O_RDWR, 0644);
> - if (fd == -1) {
> - perror("open failed");
> - return 1;
> - }
> -
> - buf = calloc(ntasks, sizeof(int));
> - if (buf == NULL) {
> - perror("calloc failed");
> - return 1;
> - }
> -
> - for (i=0; i<ntasks; i++) {
> - buf[i] = i;
> - }
> -
> - for (i=0; i<ntasks; i++) {
> - ret = asys_pwrite(ctx, fd, &buf[i], sizeof(int),
> - i * sizeof(int), &buf[i]);
> - if (ret != 0) {
> - errno = ret;
> - perror("asys_pwrite failed");
> - return 1;
> - }
> - }
> -
> - for (i=0; i<ntasks; i++) {
> - struct asys_result result;
> - int *pidx;
> -
> - ret = asys_results(ctx, &result, 1);
> - if (ret < 0) {
> - errno = -ret;
> - perror("asys_result failed");
> - return 1;
> - }
> - pidx = (int *)result.private_data;
> -
> - printf("%d returned %d\n", *pidx, (int)result.ret);
> - }
> -
> - ret = asys_context_destroy(ctx);
> - if (ret != 0) {
> - perror("asys_context_delete failed");
> - return 1;
> - }
> -
> - free(buf);
> -
> - return 0;
> -}
> diff --git a/source3/lib/asys/wscript_build b/source3/lib/asys/wscript_build
> deleted file mode 100644
> index 520994f..0000000
> --- a/source3/lib/asys/wscript_build
> +++ /dev/null
> @@ -1,10 +0,0 @@
> -#!/usr/bin/env python
> -
> -bld.SAMBA3_SUBSYSTEM('LIBASYS',
> - source='asys.c',
> - deps='PTHREADPOOL samba-util')
> -
> -bld.SAMBA3_BINARY('asystest',
> - source='tests.c',
> - deps='LIBASYS',
> - install=False)
> diff --git a/source3/wscript_build b/source3/wscript_build
> index 1d6f043..69b7371 100755
> --- a/source3/wscript_build
> +++ b/source3/wscript_build
> @@ -629,7 +629,6 @@ bld.SAMBA3_LIBRARY('smbd_base',
> RPC_SERVICE
> NDR_SMBXSRV
> LEASES_DB
> - LIBASYS
> sysquotas
> NDR_SMB_ACL
> netapi
> @@ -1513,7 +1512,6 @@ bld.SAMBA3_BINARY('spotlight2sparql',
> bld.RECURSE('auth')
> bld.RECURSE('libgpo/gpext')
> bld.RECURSE('lib/pthreadpool')
> -bld.RECURSE('lib/asys')
> bld.RECURSE('lib/unix_msg')
> bld.RECURSE('librpc')
> bld.RECURSE('librpc/idl')
> --
> 2.1.4
>
More information about the samba-technical
mailing list