[PATCH] Avoid a pipe read in aio result scheduling
Volker Lendecke
vl at samba.org
Mon Aug 22 15:27:35 UTC 2016
Hi!
Attached find a patchset that is supposed to improve our async I/O
result scheduling. For a more verbose explanation I'd recommend the
commit message for [PATCH 04/23].
Jeremy, I did not look deeply enough into your threaded messaging
tevent code to see whether the tevent_thread_proxy_schedule()
implementation can be based upon tevent_threaded_schedule_immediate().
Thanks metze for very deep review and insights.
Comments?
Thanks, Volker
-------------- next part --------------
>From 7b07c12d3ffaf37debd81a4cad8fd385fa7d4360 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 12 Aug 2016 16:40:05 +0200
Subject: [PATCH 01/23] libreplace: Ask for eventfd(2)
This will be used in tevent soon
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
lib/replace/wscript | 3 +++
1 file changed, 3 insertions(+)
diff --git a/lib/replace/wscript b/lib/replace/wscript
index 145300d..1dfd902 100644
--- a/lib/replace/wscript
+++ b/lib/replace/wscript
@@ -483,6 +483,9 @@ removeea setea
if conf.CONFIG_SET('HAVE_PORT_CREATE') and conf.CONFIG_SET('HAVE_PORT_H'):
conf.DEFINE('HAVE_SOLARIS_PORTS', 1)
+ if conf.CHECK_FUNCS('eventfd', headers='sys/eventfd.h'):
+ conf.DEFINE('HAVE_EVENTFD', 1)
+
conf.CHECK_HEADERS('poll.h')
conf.CHECK_FUNCS('poll')
--
2.1.4
>From 9c292edb10c47a3c8b3a269d0aa8556e411aa8a2 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 15 Aug 2016 10:33:09 +0200
Subject: [PATCH 02/23] tevent: Fix a typo
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
lib/tevent/tevent_threads.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/tevent/tevent_threads.c b/lib/tevent/tevent_threads.c
index 15882e4..22b854c 100644
--- a/lib/tevent/tevent_threads.c
+++ b/lib/tevent/tevent_threads.c
@@ -108,7 +108,7 @@ static void schedule_immediate_functions(struct tevent_thread_proxy *tp)
if (tp->tofree_im_list != NULL) {
/*
* Once the current immediate events
- * are processed, we need to reshedule
+ * are processed, we need to reschedule
* ourselves to free them. This works
* as tevent_schedule_immediate()
* always adds events to the *END* of
--
2.1.4
>From b2a4dfb515b9eea36fe8641ee3a7916e430e3ad1 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 29 Jul 2016 08:53:59 +0200
Subject: [PATCH 03/23] tevent: Move the async wakeup pipe to common
Signalling the main event loop will also happen from threads soon, and
that will use the same mechanism. This also keeps the pipe open after the last
signal handler is removed. Threaded jobs will come and go very frequently, and
always setting up and tearing down the pipe for each job will be expensive.
Also, this is "just" two file descriptors, and with eventfd just one.
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
lib/tevent/ABI/tevent-0.9.29.sigs | 3 ++
lib/tevent/tevent.c | 87 +++++++++++++++++++++++++++++++++++++--
lib/tevent/tevent_internal.h | 4 ++
lib/tevent/tevent_poll.c | 5 +--
lib/tevent/tevent_signal.c | 60 ++++-----------------------
5 files changed, 99 insertions(+), 60 deletions(-)
diff --git a/lib/tevent/ABI/tevent-0.9.29.sigs b/lib/tevent/ABI/tevent-0.9.29.sigs
index 1357751..4b64741 100644
--- a/lib/tevent/ABI/tevent-0.9.29.sigs
+++ b/lib/tevent/ABI/tevent-0.9.29.sigs
@@ -28,10 +28,13 @@ tevent_common_fd_destructor: int (struct tevent_fd *)
tevent_common_fd_get_flags: uint16_t (struct tevent_fd *)
tevent_common_fd_set_close_fn: void (struct tevent_fd *, tevent_fd_close_fn_t)
tevent_common_fd_set_flags: void (struct tevent_fd *, uint16_t)
+tevent_common_have_events: bool (struct tevent_context *)
tevent_common_loop_immediate: bool (struct tevent_context *)
tevent_common_loop_timer_delay: struct timeval (struct tevent_context *)
tevent_common_loop_wait: int (struct tevent_context *, const char *)
tevent_common_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
+tevent_common_wakeup: int (struct tevent_context *)
+tevent_common_wakeup_init: int (struct tevent_context *)
tevent_context_init: struct tevent_context *(TALLOC_CTX *)
tevent_context_init_byname: struct tevent_context *(TALLOC_CTX *, const char *)
tevent_context_init_ops: struct tevent_context *(TALLOC_CTX *, const struct tevent_ops *, void *)
diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c
index 843cf05..34cd402 100644
--- a/lib/tevent/tevent.c
+++ b/lib/tevent/tevent.c
@@ -620,6 +620,28 @@ done:
return ret;
}
+bool tevent_common_have_events(struct tevent_context *ev)
+{
+ if (ev->fd_events != NULL) {
+ if (ev->fd_events != ev->pipe_fde) {
+ return true;
+ }
+ if (ev->fd_events->next != NULL) {
+ return true;
+ }
+
+ /*
+ * At this point we just have the wakeup pipe event as
+ * the only fd_event. That one does not count as a
+ * regular event, so look at the other event types.
+ */
+ }
+
+ return ((ev->timer_events != NULL) ||
+ (ev->immediate_events != NULL) ||
+ (ev->signal_events != NULL));
+}
+
/*
return on failure or (with 0) if all fd events are removed
*/
@@ -629,10 +651,7 @@ int tevent_common_loop_wait(struct tevent_context *ev,
/*
* loop as long as we have events pending
*/
- while (ev->fd_events ||
- ev->timer_events ||
- ev->immediate_events ||
- ev->signal_events) {
+ while (tevent_common_have_events(ev)) {
int ret;
ret = _tevent_loop_once(ev, location);
if (ret != 0) {
@@ -670,3 +689,63 @@ int tevent_re_initialise(struct tevent_context *ev)
return ev->ops->context_init(ev);
}
+
+static void wakeup_pipe_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags, void *_private)
+{
+ ssize_t ret;
+
+ char c[16];
+ /* its non-blocking, doesn't matter if we read too much */
+ do {
+ ret = read(fde->fd, c, sizeof(c));
+ } while (ret == -1 && errno == EINTR);
+}
+
+/*
+ * Initialize the wakeup pipe and pipe fde
+ */
+
+int tevent_common_wakeup_init(struct tevent_context *ev)
+{
+ int ret;
+
+ if (ev->pipe_fde != NULL) {
+ return 0;
+ }
+
+ ret = pipe(ev->pipe_fds);
+ if (ret == -1) {
+ return errno;
+ }
+ ev_set_blocking(ev->pipe_fds[0], false);
+ ev_set_blocking(ev->pipe_fds[1], false);
+
+ ev->pipe_fde = tevent_add_fd(ev, ev, ev->pipe_fds[0],
+ TEVENT_FD_READ,
+ wakeup_pipe_handler, NULL);
+ if (ev->pipe_fde == NULL) {
+ close(ev->pipe_fds[0]);
+ close(ev->pipe_fds[1]);
+ return ENOMEM;
+ }
+
+ return 0;
+}
+
+int tevent_common_wakeup(struct tevent_context *ev)
+{
+ ssize_t ret;
+
+ if (ev->pipe_fds[1] == -1) {
+ return ENOTCONN;
+ }
+
+ do {
+ char c = '\0';
+ ret = write(ev->pipe_fds[1], &c, 1);
+ } while ((ret == -1) && (errno == EINTR));
+
+ return 0;
+}
diff --git a/lib/tevent/tevent_internal.h b/lib/tevent/tevent_internal.h
index 10cc4a4..8362770 100644
--- a/lib/tevent/tevent_internal.h
+++ b/lib/tevent/tevent_internal.h
@@ -328,6 +328,10 @@ void tevent_common_schedule_immediate(struct tevent_immediate *im,
const char *location);
bool tevent_common_loop_immediate(struct tevent_context *ev);
+bool tevent_common_have_events(struct tevent_context *ev);
+int tevent_common_wakeup_init(struct tevent_context *ev);
+int tevent_common_wakeup(struct tevent_context *ev);
+
struct tevent_signal *tevent_common_add_signal(struct tevent_context *ev,
TALLOC_CTX *mem_ctx,
int signum,
diff --git a/lib/tevent/tevent_poll.c b/lib/tevent/tevent_poll.c
index e1c305d..3547e91 100644
--- a/lib/tevent/tevent_poll.c
+++ b/lib/tevent/tevent_poll.c
@@ -667,10 +667,7 @@ static int poll_event_loop_wait(struct tevent_context *ev,
/*
* loop as long as we have events pending
*/
- while (ev->fd_events ||
- ev->timer_events ||
- ev->immediate_events ||
- ev->signal_events ||
+ while (tevent_common_have_events(ev) ||
poll_ev->fresh ||
poll_ev->disabled) {
int ret;
diff --git a/lib/tevent/tevent_signal.c b/lib/tevent/tevent_signal.c
index 635a7a1..c85e1c5 100644
--- a/lib/tevent/tevent_signal.c
+++ b/lib/tevent/tevent_signal.c
@@ -95,7 +95,6 @@ static uint32_t tevent_sig_count(struct tevent_sigcounter s)
*/
static void tevent_common_signal_handler(int signum)
{
- char c = 0;
struct tevent_common_signal_list *sl;
struct tevent_context *ev = NULL;
int saved_errno = errno;
@@ -106,13 +105,8 @@ static void tevent_common_signal_handler(int signum)
/* Write to each unique event context. */
for (sl = sig_state->sig_handlers[signum]; sl; sl = sl->next) {
if (sl->se->event_ctx && sl->se->event_ctx != ev) {
- ssize_t ret;
-
ev = sl->se->event_ctx;
- /* doesn't matter if this pipe overflows */
- do {
- ret = write(ev->pipe_fds[1], &c, 1);
- } while (ret == -1 && errno == EINTR);
+ tevent_common_wakeup(ev);
}
}
@@ -198,16 +192,6 @@ static int tevent_signal_destructor(struct tevent_signal *se)
struct tevent_context *ev = se->event_ctx;
DLIST_REMOVE(ev->signal_events, se);
-
- if (ev->signal_events == NULL && ev->pipe_fde != NULL) {
- /*
- * This was the last signal. Destroy the pipe.
- */
- TALLOC_FREE(ev->pipe_fde);
-
- close(ev->pipe_fds[0]);
- close(ev->pipe_fds[1]);
- }
}
talloc_free(sl);
@@ -233,21 +217,6 @@ static int tevent_signal_destructor(struct tevent_signal *se)
}
/*
- this is part of the pipe hack needed to avoid the signal race condition
-*/
-static void signal_pipe_handler(struct tevent_context *ev, struct tevent_fd *fde,
- uint16_t flags, void *_private)
-{
- ssize_t ret;
-
- char c[16];
- /* its non-blocking, doesn't matter if we read too much */
- do {
- ret = read(fde->fd, c, sizeof(c));
- } while (ret == -1 && errno == EINTR);
-}
-
-/*
add a signal event
return NULL on failure (memory allocation error)
*/
@@ -263,6 +232,13 @@ struct tevent_signal *tevent_common_add_signal(struct tevent_context *ev,
struct tevent_signal *se;
struct tevent_common_signal_list *sl;
sigset_t set, oldset;
+ int ret;
+
+ ret = tevent_common_wakeup_init(ev);
+ if (ret != 0) {
+ errno = ret;
+ return NULL;
+ }
if (signum >= TEVENT_NUM_SIGNALS) {
errno = EINVAL;
@@ -304,26 +280,6 @@ struct tevent_signal *tevent_common_add_signal(struct tevent_context *ev,
return NULL;
}
- /* we need to setup the pipe hack handler if not already
- setup */
- if (ev->pipe_fde == NULL) {
- if (pipe(ev->pipe_fds) == -1) {
- talloc_free(se);
- return NULL;
- }
- ev_set_blocking(ev->pipe_fds[0], false);
- ev_set_blocking(ev->pipe_fds[1], false);
- ev->pipe_fde = tevent_add_fd(ev, ev, ev->pipe_fds[0],
- TEVENT_FD_READ,
- signal_pipe_handler, NULL);
- if (!ev->pipe_fde) {
- close(ev->pipe_fds[0]);
- close(ev->pipe_fds[1]);
- talloc_free(se);
- return NULL;
- }
- }
-
/* only install a signal handler if not already installed */
if (sig_state->sig_handlers[signum] == NULL) {
struct sigaction act;
--
2.1.4
>From 5c9f9ab188fc36195c668d0b7f1954d2921461c0 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 8 Aug 2016 11:26:37 +0200
Subject: [PATCH 04/23] tevent: Add threaded immediate activation
This is infrastructure to improve our async r/w result handling and latency.
The pthreadpool signalling goes through a pipe. This has downsides: The main
event loop has to go through a read on the pipe before it can ship the result.
Also, it is not guaranteed by poll/epoll that the pthreadpool signal pipe is
handled with top priority. When an async pread/pwrite has finished, we should
immediately ship the result to the client, not waiting for anything else.
This patch enables tevent_immediate structs as job signalling. This means a
busy main tevent loop will handle the threaded job completion before any timed
or file descriptor events. Opposite to Jeremy's tevent_thread_proxy this is
done by a modification of the main event loop by looking at a linked list under
a central mutex.
Regarding performance: In a later commit I've created a test that does nothing
but fire one immediate over and over again. If you add a phread_mutex_lock and
unlock pair in the immediate handler, you lose roughly 25% of rounds per
second, so it is measurable. It is questionable that will be measurable in the
real world, but to counter concerns activation of immediates needs to go
through a new struct tevent_threaded_context. Only if such a
tevent_threaded_context exists for a tevent context, the main loop takes the
hit to look at the mutex'ed list of finished jobs.
This patch by design does not care about talloc hierarchies. The idea is that
the main thread owning the tevent context creates a chunk of memory and
prepares the tevent_immediate indication job completion. The main thread hands
the memory chunk together with the immediate as a job description over to a
helper thread. The helper thread does its job and upon completion calls
tevent_threaded_schedule_immediate with the already-prepared immediate. From
that point on memory ownership is again transferred to the main thread.
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
lib/tevent/ABI/tevent-0.9.29.sigs | 3 +
lib/tevent/tevent.c | 147 ++++++++++++++++++++++++++++++++++++++
lib/tevent/tevent.h | 74 +++++++++++++++++++
lib/tevent/tevent_epoll.c | 4 ++
lib/tevent/tevent_internal.h | 17 +++++
lib/tevent/tevent_poll.c | 4 ++
lib/tevent/tevent_port.c | 4 ++
lib/tevent/tevent_select.c | 4 ++
lib/tevent/tevent_threads.c | 119 ++++++++++++++++++++++++++++++
lib/tevent/wscript | 6 +-
10 files changed, 381 insertions(+), 1 deletion(-)
diff --git a/lib/tevent/ABI/tevent-0.9.29.sigs b/lib/tevent/ABI/tevent-0.9.29.sigs
index 4b64741..9b8bfa1 100644
--- a/lib/tevent/ABI/tevent-0.9.29.sigs
+++ b/lib/tevent/ABI/tevent-0.9.29.sigs
@@ -16,6 +16,7 @@ _tevent_req_nomem: bool (const void *, struct tevent_req *, const char *)
_tevent_req_notify_callback: void (struct tevent_req *, const char *)
_tevent_req_oom: void (struct tevent_req *, const char *)
_tevent_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
+_tevent_threaded_schedule_immediate: void (struct tevent_threaded_context *, struct tevent_immediate *, tevent_immediate_handler_t, void *, const char *, const char *)
tevent_backend_list: const char **(TALLOC_CTX *)
tevent_cleanup_pending_signal_handlers: void (struct tevent_signal *)
tevent_common_add_fd: struct tevent_fd *(struct tevent_context *, TALLOC_CTX *, int, uint16_t, tevent_fd_handler_t, void *, const char *, const char *)
@@ -33,6 +34,7 @@ tevent_common_loop_immediate: bool (struct tevent_context *)
tevent_common_loop_timer_delay: struct timeval (struct tevent_context *)
tevent_common_loop_wait: int (struct tevent_context *, const char *)
tevent_common_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
+tevent_common_threaded_activate_immediate: void (struct tevent_context *)
tevent_common_wakeup: int (struct tevent_context *)
tevent_common_wakeup_init: int (struct tevent_context *)
tevent_context_init: struct tevent_context *(TALLOC_CTX *)
@@ -80,6 +82,7 @@ tevent_set_trace_callback: void (struct tevent_context *, tevent_trace_callback_
tevent_signal_support: bool (struct tevent_context *)
tevent_thread_proxy_create: struct tevent_thread_proxy *(struct tevent_context *)
tevent_thread_proxy_schedule: void (struct tevent_thread_proxy *, struct tevent_immediate **, tevent_immediate_handler_t, void *)
+tevent_threaded_context_create: struct tevent_threaded_context *(TALLOC_CTX *, struct tevent_context *)
tevent_timeval_add: struct timeval (const struct timeval *, uint32_t, uint32_t)
tevent_timeval_compare: int (const struct timeval *, const struct timeval *)
tevent_timeval_current: struct timeval (void)
diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c
index 34cd402..b8178b2 100644
--- a/lib/tevent/tevent.c
+++ b/lib/tevent/tevent.c
@@ -59,11 +59,16 @@
*/
#include "replace.h"
#include "system/filesys.h"
+#ifdef HAVE_PTHREAD
+#include "system/threads.h"
+#endif
#define TEVENT_DEPRECATED 1
#include "tevent.h"
#include "tevent_internal.h"
#include "tevent_util.h"
+static void tevent_abort(struct tevent_context *ev, const char *reason);
+
struct tevent_ops_list {
struct tevent_ops_list *next, *prev;
const char *name;
@@ -173,6 +178,91 @@ const char **tevent_backend_list(TALLOC_CTX *mem_ctx)
return list;
}
+#ifdef HAVE_PTHREAD
+
+static pthread_mutex_t tevent_contexts_mutex = PTHREAD_MUTEX_INITIALIZER;
+static struct tevent_context *tevent_contexts = NULL;
+static pthread_once_t tevent_atfork_initialized = PTHREAD_ONCE_INIT;
+
+static void tevent_atfork_prepare(void)
+{
+ struct tevent_context *ev;
+ int ret;
+
+ ret = pthread_mutex_lock(&tevent_contexts_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
+ for (ev = tevent_contexts; ev != NULL; ev = ev->next) {
+ ret = pthread_mutex_lock(&ev->scheduled_mutex);
+ if (ret != 0) {
+ tevent_abort(ev, "pthread_mutex_lock failed");
+ }
+ }
+}
+
+static void tevent_atfork_parent(void)
+{
+ struct tevent_context *ev;
+ int ret;
+
+ for (ev = DLIST_TAIL(tevent_contexts); ev != NULL;
+ ev = DLIST_PREV(ev)) {
+ ret = pthread_mutex_unlock(&ev->scheduled_mutex);
+ if (ret != 0) {
+ tevent_abort(ev, "pthread_mutex_unlock failed");
+ }
+ }
+
+ ret = pthread_mutex_unlock(&tevent_contexts_mutex);
+ if (ret != 0) {
+ abort();
+ }
+}
+
+static void tevent_atfork_child(void)
+{
+ struct tevent_context *ev;
+ int ret;
+
+ for (ev = DLIST_TAIL(tevent_contexts); ev != NULL;
+ ev = DLIST_PREV(ev)) {
+ struct tevent_threaded_context *tctx;
+
+ for (tctx = ev->threaded_contexts; tctx != NULL;
+ tctx = tctx->next) {
+ tctx->event_ctx = NULL;
+ }
+
+ ev->threaded_contexts = NULL;
+
+ ret = pthread_mutex_unlock(&ev->scheduled_mutex);
+ if (ret != 0) {
+ tevent_abort(ev, "pthread_mutex_unlock failed");
+ }
+ }
+
+ ret = pthread_mutex_unlock(&tevent_contexts_mutex);
+ if (ret != 0) {
+ abort();
+ }
+}
+
+static void tevent_prep_atfork(void)
+{
+ int ret;
+
+ ret = pthread_atfork(tevent_atfork_prepare,
+ tevent_atfork_parent,
+ tevent_atfork_child);
+ if (ret != 0) {
+ abort();
+ }
+}
+
+#endif
+
int tevent_common_context_destructor(struct tevent_context *ev)
{
struct tevent_fd *fd, *fn;
@@ -180,6 +270,33 @@ int tevent_common_context_destructor(struct tevent_context *ev)
struct tevent_immediate *ie, *in;
struct tevent_signal *se, *sn;
+#ifdef HAVE_PTHREAD
+ int ret;
+
+ ret = pthread_mutex_lock(&tevent_contexts_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
+ DLIST_REMOVE(tevent_contexts, ev);
+
+ ret = pthread_mutex_unlock(&tevent_contexts_mutex);
+ if (ret != 0) {
+ abort();
+ }
+#endif
+
+ if (ev->threaded_contexts != NULL) {
+ /*
+ * Threaded contexts are indicators that threads are
+ * about to send us immediates via
+ * tevent_threaded_schedule_immediate. The caller
+ * needs to make sure that the tevent context lives
+ * long enough to receive immediates from all threads.
+ */
+ tevent_abort(ev, "threaded contexts exist");
+ }
+
if (ev->pipe_fde) {
talloc_free(ev->pipe_fde);
close(ev->pipe_fds[0]);
@@ -255,6 +372,36 @@ struct tevent_context *tevent_context_init_ops(TALLOC_CTX *mem_ctx,
ev = talloc_zero(mem_ctx, struct tevent_context);
if (!ev) return NULL;
+#ifdef HAVE_PTHREAD
+
+ ret = pthread_once(&tevent_atfork_initialized, tevent_prep_atfork);
+ if (ret != 0) {
+ talloc_free(ev);
+ return NULL;
+ }
+
+ ret = pthread_mutex_init(&ev->scheduled_mutex, NULL);
+ if (ret != 0) {
+ talloc_free(ev);
+ return NULL;
+ }
+
+ ret = pthread_mutex_lock(&tevent_contexts_mutex);
+ if (ret != 0) {
+ pthread_mutex_destroy(&ev->scheduled_mutex);
+ talloc_free(ev);
+ return NULL;
+ }
+
+ DLIST_ADD(tevent_contexts, ev);
+
+ ret = pthread_mutex_unlock(&tevent_contexts_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
+#endif
+
talloc_set_destructor(ev, tevent_common_context_destructor);
ev->ops = ops;
diff --git a/lib/tevent/tevent.h b/lib/tevent/tevent.h
index 1c1271b..2432344 100644
--- a/lib/tevent/tevent.h
+++ b/lib/tevent/tevent.h
@@ -40,6 +40,7 @@ struct tevent_timer;
struct tevent_immediate;
struct tevent_signal;
struct tevent_thread_proxy;
+struct tevent_threaded_context;
/**
* @defgroup tevent The tevent API
@@ -1750,6 +1751,79 @@ void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
tevent_immediate_handler_t handler,
void *pp_private_data);
+/*
+ * @brief Create a context for threaded activation of immediates
+ *
+ * A tevent_treaded_context provides a link into an event
+ * context. Using tevent_threaded_schedule_immediate, it is possible
+ * to activate an immediate event from within a thread.
+ *
+ * It is the duty of the caller of tevent_threaded_context_create() to
+ * keep the event context around longer than any
+ * tevent_threaded_context. tevent will abort if ev is talllc_free'ed
+ * with an active tevent_threaded_context.
+ *
+ * If tevent is build without pthread support, this always returns
+ * NULL with errno=ENOSYS.
+ *
+ * @param[in] mem_ctx The talloc memory context to use.
+ * @param[in] ev The event context to link this to.
+ * @return The threaded context, or NULL with errno set.
+ *
+ * @see tevent_threaded_schedule_immediate()
+ *
+ * @note Available as of tevent 0.9.30
+ */
+struct tevent_threaded_context *tevent_threaded_context_create(
+ TALLOC_CTX *mem_ctx, struct tevent_context *ev);
+
+#ifdef DOXYGEN
+/*
+ * @brief Activate an immediate from a thread
+ *
+ * Activate an immediate from within a thread.
+ *
+ * This routine does not watch out for talloc hierarchies. This means
+ * that it is highly recommended to create the tevent_immediate in the
+ * thread owning tctx, allocate a threaded job description for the
+ * thread, hand over both pointers to a helper thread and not touch it
+ * in the main thread at all anymore.
+ *
+ * tevent_threaded_schedule_immediate is intended as a job completion
+ * indicator for simple threaded helpers.
+ *
+ * Please be aware that tevent_threaded_schedule_immediate is very
+ * picky about its arguments: An immediate may not already be
+ * activated and the handler must exist. With
+ * tevent_threaded_schedule_immediate memory ownership is transferred
+ * to the main thread holding the tevent context behind tctx, the
+ * helper thread can't access it anymore.
+ *
+ * @param[in] tctx The threaded context to go through
+ * @param[in] im The immediate event to activate
+ * @param[in] handler The immediate handler to call in the main thread
+ * @param[in] private_data Pointer for the immediate handler
+ *
+ * @see tevent_threaded_context_create()
+ *
+ * @note Available as of tevent 0.9.30
+ */
+void tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
+ struct tevent_immediate *im,
+ tevent_immediate_handler_t handler,
+ void *private_data);
+#else
+void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
+ struct tevent_immediate *im,
+ tevent_immediate_handler_t handler,
+ void *private_data,
+ const char *handler_name,
+ const char *location);
+#define tevent_threaded_schedule_immediate(tctx, im, handler, private_data) \
+ _tevent_threaded_schedule_immediate(tctx, im, handler, private_data, \
+ #handler, __location__);
+#endif
+
#ifdef TEVENT_DEPRECATED
#ifndef _DEPRECATED_
#ifdef HAVE___ATTRIBUTE__
diff --git a/lib/tevent/tevent_epoll.c b/lib/tevent/tevent_epoll.c
index 507ea5c..4147c67 100644
--- a/lib/tevent/tevent_epoll.c
+++ b/lib/tevent/tevent_epoll.c
@@ -903,6 +903,10 @@ static int epoll_event_loop_once(struct tevent_context *ev, const char *location
return 0;
}
+ if (ev->threaded_contexts != NULL) {
+ tevent_common_threaded_activate_immediate(ev);
+ }
+
if (ev->immediate_events &&
tevent_common_loop_immediate(ev)) {
return 0;
diff --git a/lib/tevent/tevent_internal.h b/lib/tevent/tevent_internal.h
index 8362770..6b29547 100644
--- a/lib/tevent/tevent_internal.h
+++ b/lib/tevent/tevent_internal.h
@@ -228,6 +228,11 @@ struct tevent_signal {
void *additional_data;
};
+struct tevent_threaded_context {
+ struct tevent_threaded_context *next, *prev;
+ struct tevent_context *event_ctx;
+};
+
struct tevent_debug_ops {
void (*debug)(void *context, enum tevent_debug_level level,
const char *fmt, va_list ap) PRINTF_ATTRIBUTE(3,0);
@@ -247,6 +252,13 @@ struct tevent_context {
/* list of timed events - used by common code */
struct tevent_timer *timer_events;
+ /* List of threaded job indicators */
+ struct tevent_threaded_context *threaded_contexts;
+
+ /* List of scheduled immediates */
+ pthread_mutex_t scheduled_mutex;
+ struct tevent_immediate *scheduled_immediates;
+
/* list of immediate events - used by common code */
struct tevent_immediate *immediate_events;
@@ -282,6 +294,10 @@ struct tevent_context {
* tevent_common_add_timer_v2()
*/
struct tevent_timer *last_zero_timer;
+
+#ifdef HAVE_PTHREAD
+ struct tevent_context *prev, *next;
+#endif
};
const struct tevent_ops *tevent_find_ops_byname(const char *name);
@@ -327,6 +343,7 @@ void tevent_common_schedule_immediate(struct tevent_immediate *im,
const char *handler_name,
const char *location);
bool tevent_common_loop_immediate(struct tevent_context *ev);
+void tevent_common_threaded_activate_immediate(struct tevent_context *ev);
bool tevent_common_have_events(struct tevent_context *ev);
int tevent_common_wakeup_init(struct tevent_context *ev);
diff --git a/lib/tevent/tevent_poll.c b/lib/tevent/tevent_poll.c
index 3547e91..09d85fa 100644
--- a/lib/tevent/tevent_poll.c
+++ b/lib/tevent/tevent_poll.c
@@ -645,6 +645,10 @@ static int poll_event_loop_once(struct tevent_context *ev,
return 0;
}
+ if (ev->threaded_contexts != NULL) {
+ tevent_common_threaded_activate_immediate(ev);
+ }
+
if (ev->immediate_events &&
tevent_common_loop_immediate(ev)) {
return 0;
diff --git a/lib/tevent/tevent_port.c b/lib/tevent/tevent_port.c
index 4b524df..8cf9fd1 100644
--- a/lib/tevent/tevent_port.c
+++ b/lib/tevent/tevent_port.c
@@ -760,6 +760,10 @@ static int port_event_loop_once(struct tevent_context *ev, const char *location)
return 0;
}
+ if (ev->threaded_contexts != NULL) {
+ tevent_common_threaded_activate_immediate(ev);
+ }
+
if (ev->immediate_events &&
tevent_common_loop_immediate(ev)) {
return 0;
diff --git a/lib/tevent/tevent_select.c b/lib/tevent/tevent_select.c
index ec7565d..55dd0b6 100644
--- a/lib/tevent/tevent_select.c
+++ b/lib/tevent/tevent_select.c
@@ -244,6 +244,10 @@ static int select_event_loop_once(struct tevent_context *ev, const char *locatio
return 0;
}
+ if (ev->threaded_contexts != NULL) {
+ tevent_common_threaded_activate_immediate(ev);
+ }
+
if (ev->immediate_events &&
tevent_common_loop_immediate(ev)) {
return 0;
diff --git a/lib/tevent/tevent_threads.c b/lib/tevent/tevent_threads.c
index 22b854c..e42759e 100644
--- a/lib/tevent/tevent_threads.c
+++ b/lib/tevent/tevent_threads.c
@@ -371,3 +371,122 @@ void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
;
}
#endif
+
+static int tevent_threaded_context_destructor(
+ struct tevent_threaded_context *tctx)
+{
+ if (tctx->event_ctx != NULL) {
+ DLIST_REMOVE(tctx->event_ctx->threaded_contexts, tctx);
+ }
+ return 0;
+}
+
+struct tevent_threaded_context *tevent_threaded_context_create(
+ TALLOC_CTX *mem_ctx, struct tevent_context *ev)
+{
+#ifdef HAVE_PTHREAD
+ struct tevent_threaded_context *tctx;
+ int ret;
+
+ ret = tevent_common_wakeup_init(ev);
+ if (ret != 0) {
+ errno = ret;
+ return NULL;
+ }
+
+ tctx = talloc(mem_ctx, struct tevent_threaded_context);
+ if (tctx == NULL) {
+ return NULL;
+ }
+ tctx->event_ctx = ev;
+
+ DLIST_ADD(ev->threaded_contexts, tctx);
+ talloc_set_destructor(tctx, tevent_threaded_context_destructor);
+
+ return tctx;
+#else
+ errno = ENOSYS;
+ return NULL;
+#endif
+}
+
+void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
+ struct tevent_immediate *im,
+ tevent_immediate_handler_t handler,
+ void *private_data,
+ const char *handler_name,
+ const char *location)
+{
+#ifdef HAVE_PTHREAD
+ struct tevent_context *ev = tctx->event_ctx;
+ int ret;
+
+ if ((im->event_ctx != NULL) || (handler == NULL)) {
+ abort();
+ }
+
+ im->event_ctx = ev;
+ im->handler = handler;
+ im->private_data = private_data;
+ im->handler_name = handler_name;
+ im->schedule_location = location;
+ im->cancel_fn = NULL;
+ im->additional_data = NULL;
+
+ ret = pthread_mutex_lock(&ev->scheduled_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
+ DLIST_ADD_END(ev->scheduled_immediates, im);
+
+ ret = pthread_mutex_unlock(&ev->scheduled_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
+ /*
+ * We might want to wake up the main thread under the lock. We
+ * had a slightly similar situation in pthreadpool, changed
+ * with 1c4284c7395f23. This is not exactly the same, as the
+ * wakeup is only a last-resort thing in case the main thread
+ * is sleeping. Doing the wakeup under the lock can easily
+ * lead to a contended mutex, which is much more expensive
+ * than a noncontended one. So I'd opt for the lower footprint
+ * initially. Maybe we have to change that later.
+ */
+ tevent_common_wakeup(ev);
+#else
+ /*
+ * tevent_threaded_context_create() returned NULL with ENOSYS...
+ */
+ abort();
+#endif
+}
+
+void tevent_common_threaded_activate_immediate(struct tevent_context *ev)
+{
+#ifdef HAVE_PTHREAD
+ int ret;
+ ret = pthread_mutex_lock(&ev->scheduled_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
+ while (ev->scheduled_immediates != NULL) {
+ struct tevent_immediate *im = ev->scheduled_immediates;
+ DLIST_REMOVE(ev->scheduled_immediates, im);
+ DLIST_ADD_END(ev->immediate_events, im);
+ }
+
+ ret = pthread_mutex_unlock(&ev->scheduled_mutex);
+ if (ret != 0) {
+ abort();
+ }
+#else
+ /*
+ * tevent_threaded_context_create() returned NULL with ENOSYS...
+ */
+ abort();
+#endif
+}
diff --git a/lib/tevent/wscript b/lib/tevent/wscript
index 71b9475..3e0b413 100755
--- a/lib/tevent/wscript
+++ b/lib/tevent/wscript
@@ -99,9 +99,13 @@ def build(bld):
private_library = True
if not bld.CONFIG_SET('USING_SYSTEM_TEVENT'):
+ tevent_deps = 'replace talloc'
+ if bld.CONFIG_SET('HAVE_PTHREAD'):
+ tevent_deps += ' pthread'
+
bld.SAMBA_LIBRARY('tevent',
SRC,
- deps='replace talloc',
+ deps=tevent_deps,
enabled= not bld.CONFIG_SET('USING_SYSTEM_TEVENT'),
includes='.',
abi_directory='ABI',
--
2.1.4
>From 04e969c9973257097ef488a6ebf944e033e80e6e Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 8 Aug 2016 12:51:56 +0200
Subject: [PATCH 05/23] lib: enable threaded immediates in source3
Logically this belongs into the previous patch, but tevent deserves isolated
patches :-)
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
source3/lib/events.c | 4 ++++
1 file changed, 4 insertions(+)
diff --git a/source3/lib/events.c b/source3/lib/events.c
index 2e862ca..a866ef5 100644
--- a/source3/lib/events.c
+++ b/source3/lib/events.c
@@ -188,6 +188,10 @@ bool run_events_poll(struct tevent_context *ev, int pollrtn,
return true;
}
+ if (ev->threaded_contexts != NULL) {
+ tevent_common_threaded_activate_immediate(ev);
+ }
+
if (ev->immediate_events &&
tevent_common_loop_immediate(ev)) {
return true;
--
2.1.4
>From 86815b12b5d37b4d75d042ab4897bbca54f23aca Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 8 Aug 2016 08:56:23 +0200
Subject: [PATCH 06/23] tevent: reorder tevent_context for cache locality
No functionality change. This just looks better in objdump --disassemble :-)
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
lib/tevent/tevent_internal.h | 25 ++++++++++++++++---------
1 file changed, 16 insertions(+), 9 deletions(-)
diff --git a/lib/tevent/tevent_internal.h b/lib/tevent/tevent_internal.h
index 6b29547..f17ce94 100644
--- a/lib/tevent/tevent_internal.h
+++ b/lib/tevent/tevent_internal.h
@@ -246,25 +246,32 @@ struct tevent_context {
/* the specific events implementation */
const struct tevent_ops *ops;
+ /*
+ * The following three pointers are queried on every loop_once
+ * in the order in which they appear here. Not measured, but
+ * hopefully putting them at the top together with "ops"
+ * should make tevent a *bit* more cache-friendly than before.
+ */
+
+ /* list of signal events - used by common code */
+ struct tevent_signal *signal_events;
+
+ /* List of threaded job indicators */
+ struct tevent_threaded_context *threaded_contexts;
+
+ /* list of immediate events - used by common code */
+ struct tevent_immediate *immediate_events;
+
/* list of fd events - used by common code */
struct tevent_fd *fd_events;
/* list of timed events - used by common code */
struct tevent_timer *timer_events;
- /* List of threaded job indicators */
- struct tevent_threaded_context *threaded_contexts;
-
/* List of scheduled immediates */
pthread_mutex_t scheduled_mutex;
struct tevent_immediate *scheduled_immediates;
- /* list of immediate events - used by common code */
- struct tevent_immediate *immediate_events;
-
- /* list of signal events - used by common code */
- struct tevent_signal *signal_events;
-
/* this is private for the events_ops implementation */
void *additional_data;
--
2.1.4
>From 04966c4d0a7567923eef759a2f4d3c655bb35096 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 8 Aug 2016 12:53:08 +0200
Subject: [PATCH 07/23] tevent: Simple test for threaded immediates
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
lib/tevent/testsuite.c | 99 ++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 99 insertions(+)
diff --git a/lib/tevent/testsuite.c b/lib/tevent/testsuite.c
index b37c7b1..4783ab4 100644
--- a/lib/tevent/testsuite.c
+++ b/lib/tevent/testsuite.c
@@ -1148,6 +1148,101 @@ static bool test_multi_tevent_threaded_1(struct torture_context *test,
talloc_free(master_ev);
return true;
}
+
+struct threaded_test_2 {
+ struct tevent_threaded_context *tctx;
+ struct tevent_immediate *im;
+ pthread_t thread_id;
+};
+
+static void master_callback_2(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_data);
+
+static void *thread_fn_2(void *private_data)
+{
+ struct threaded_test_2 *state = private_data;
+
+ state->thread_id = pthread_self();
+
+ usleep(random() % 7000);
+
+ tevent_threaded_schedule_immediate(
+ state->tctx, state->im, master_callback_2, state);
+
+ return NULL;
+}
+
+static void master_callback_2(struct tevent_context *ev,
+ struct tevent_immediate *im,
+ void *private_data)
+{
+ struct threaded_test_2 *state = private_data;
+ int i;
+
+ for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+ if (pthread_equal(state->thread_id, thread_map[i])) {
+ break;
+ }
+ }
+ torture_comment(thread_test_ctx,
+ "Callback_2 %u from thread %u\n",
+ thread_counter,
+ i);
+ thread_counter++;
+}
+
+static bool test_multi_tevent_threaded_2(struct torture_context *test,
+ const void *test_data)
+{
+ unsigned i;
+
+ struct tevent_context *ev;
+ struct tevent_threaded_context *tctx;
+ int ret;
+
+ thread_test_ctx = test;
+ thread_counter = 0;
+
+ ev = tevent_context_init(test);
+ torture_assert(test, ev != NULL, "tevent_context_init failed");
+
+ tctx = tevent_threaded_context_create(ev, ev);
+ torture_assert(test, tctx != NULL,
+ "tevent_threaded_context_create failed");
+
+ for (i=0; i<NUM_TEVENT_THREADS; i++) {
+ struct threaded_test_2 *state;
+
+ state = talloc(ev, struct threaded_test_2);
+ torture_assert(test, state != NULL, "talloc failed");
+
+ state->tctx = tctx;
+ state->im = tevent_create_immediate(state);
+ torture_assert(test, state->im != NULL,
+ "tevent_create_immediate failed");
+
+ ret = pthread_create(&thread_map[i], NULL, thread_fn_2, state);
+ torture_assert(test, ret == 0, "pthread_create failed");
+ }
+
+ while (thread_counter < NUM_TEVENT_THREADS) {
+ ret = tevent_loop_once(ev);
+ torture_assert(test, ret == 0, "tevent_loop_once failed");
+ }
+
+ /* Wait for all the threads to finish - join 'em. */
+ for (i = 0; i < NUM_TEVENT_THREADS; i++) {
+ void *retval;
+ ret = pthread_join(thread_map[i], &retval);
+ torture_assert(test, ret == 0, "pthread_join failed");
+ /* Free the child thread event context. */
+ }
+
+ talloc_free(tctx);
+ talloc_free(ev);
+ return true;
+}
#endif
struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
@@ -1190,6 +1285,10 @@ struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
test_multi_tevent_threaded_1,
NULL);
+ torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_2",
+ test_multi_tevent_threaded_2,
+ NULL);
+
#endif
return suite;
--
2.1.4
>From 5d9587fdf033102a202d8f3002f42772aa7c9f93 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 12 Aug 2016 16:00:56 +0200
Subject: [PATCH 08/23] tevent: Move rundown of the event pipe
Purely cosmetic change: This moves closing the signal/thread event pipe
to where it's opened. This prepares the eventfd support, making the
"magic" for eventfd more obvious.
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
lib/tevent/tevent.c | 21 +++++++++++++++------
1 file changed, 15 insertions(+), 6 deletions(-)
diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c
index b8178b2..d286850 100644
--- a/lib/tevent/tevent.c
+++ b/lib/tevent/tevent.c
@@ -178,6 +178,8 @@ const char **tevent_backend_list(TALLOC_CTX *mem_ctx)
return list;
}
+static void tevent_common_wakeup_fini(struct tevent_context *ev);
+
#ifdef HAVE_PTHREAD
static pthread_mutex_t tevent_contexts_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -297,12 +299,7 @@ int tevent_common_context_destructor(struct tevent_context *ev)
tevent_abort(ev, "threaded contexts exist");
}
- if (ev->pipe_fde) {
- talloc_free(ev->pipe_fde);
- close(ev->pipe_fds[0]);
- close(ev->pipe_fds[1]);
- ev->pipe_fde = NULL;
- }
+ tevent_common_wakeup_fini(ev);
for (fd = ev->fd_events; fd; fd = fn) {
fn = fd->next;
@@ -896,3 +893,15 @@ int tevent_common_wakeup(struct tevent_context *ev)
return 0;
}
+
+static void tevent_common_wakeup_fini(struct tevent_context *ev)
+{
+ if (ev->pipe_fde == NULL) {
+ return;
+ }
+
+ TALLOC_FREE(ev->pipe_fde);
+
+ close(ev->pipe_fds[0]);
+ close(ev->pipe_fds[1]);
+}
--
2.1.4
>From 0798ca4478efbb30ca795a99fcdf0267a3aa3680 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 12 Aug 2016 16:07:07 +0200
Subject: [PATCH 09/23] tevent: Move a variable declaration into a while block
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
lib/tevent/tevent.c | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c
index d286850..e35eb5d 100644
--- a/lib/tevent/tevent.c
+++ b/lib/tevent/tevent.c
@@ -840,9 +840,9 @@ static void wakeup_pipe_handler(struct tevent_context *ev,
{
ssize_t ret;
- char c[16];
/* its non-blocking, doesn't matter if we read too much */
do {
+ char c[16];
ret = read(fde->fd, c, sizeof(c));
} while (ret == -1 && errno == EINTR);
}
--
2.1.4
>From ce1afb342a225a03fcecc9b050f84ae8fe6eb423 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 12 Aug 2016 16:32:33 +0200
Subject: [PATCH 10/23] tevent: Use eventfd for signal/thread wakeup
According to the manpage, eventfd is cheaper than a pipe. At least, we can save
a file descriptor and space for it in struct tevent_context :-)
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
lib/tevent/tevent.c | 66 ++++++++++++++++++++++++++++++++------------
lib/tevent/tevent_internal.h | 7 +++--
2 files changed, 53 insertions(+), 20 deletions(-)
diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c
index e35eb5d..331be0e 100644
--- a/lib/tevent/tevent.c
+++ b/lib/tevent/tevent.c
@@ -66,6 +66,9 @@
#include "tevent.h"
#include "tevent_internal.h"
#include "tevent_util.h"
+#ifdef HAVE_EVENTFD
+#include <sys/eventfd.h>
+#endif
static void tevent_abort(struct tevent_context *ev, const char *reason);
@@ -767,7 +770,7 @@ done:
bool tevent_common_have_events(struct tevent_context *ev)
{
if (ev->fd_events != NULL) {
- if (ev->fd_events != ev->pipe_fde) {
+ if (ev->fd_events != ev->wakeup_fde) {
return true;
}
if (ev->fd_events->next != NULL) {
@@ -840,10 +843,14 @@ static void wakeup_pipe_handler(struct tevent_context *ev,
{
ssize_t ret;
- /* its non-blocking, doesn't matter if we read too much */
do {
- char c[16];
- ret = read(fde->fd, c, sizeof(c));
+ /*
+ * This is the boilerplate for eventfd, but it works
+ * for pipes too. And as we don't care about the data
+ * we read, we're fine.
+ */
+ uint64_t val;
+ ret = read(fde->fd, &val, sizeof(val));
} while (ret == -1 && errno == EINTR);
}
@@ -855,23 +862,39 @@ int tevent_common_wakeup_init(struct tevent_context *ev)
{
int ret;
- if (ev->pipe_fde != NULL) {
+ if (ev->wakeup_fde != NULL) {
return 0;
}
- ret = pipe(ev->pipe_fds);
+#ifdef HAVE_EVENTFD
+ ret = eventfd(0, EFD_NONBLOCK);
if (ret == -1) {
return errno;
}
- ev_set_blocking(ev->pipe_fds[0], false);
- ev_set_blocking(ev->pipe_fds[1], false);
+ ev->wakeup_fd = ret;
+#else
+ {
+ int pipe_fds[2];
+ ret = pipe(pipe_fds);
+ if (ret == -1) {
+ return errno;
+ }
+ ev->wakeup_fd = pipe_fds[0];
+ ev->wakeup_write_fd = pipe_fds[1];
+
+ ev_set_blocking(ev->wakeup_fd, false);
+ ev_set_blocking(ev->wakeup_write_fd, false);
+ }
+#endif
- ev->pipe_fde = tevent_add_fd(ev, ev, ev->pipe_fds[0],
+ ev->wakeup_fde = tevent_add_fd(ev, ev, ev->wakeup_fd,
TEVENT_FD_READ,
wakeup_pipe_handler, NULL);
- if (ev->pipe_fde == NULL) {
- close(ev->pipe_fds[0]);
- close(ev->pipe_fds[1]);
+ if (ev->wakeup_fde == NULL) {
+ close(ev->wakeup_fd);
+#ifndef HAVE_EVENTFD
+ close(ev->wakeup_write_fd);
+#endif
return ENOMEM;
}
@@ -882,13 +905,18 @@ int tevent_common_wakeup(struct tevent_context *ev)
{
ssize_t ret;
- if (ev->pipe_fds[1] == -1) {
+ if (ev->wakeup_fde == NULL) {
return ENOTCONN;
}
do {
+#ifdef HAVE_EVENTFD
+ uint64_t val = 1;
+ ret = write(ev->wakeup_fd, &val, sizeof(val));
+#else
char c = '\0';
- ret = write(ev->pipe_fds[1], &c, 1);
+ ret = write(ev->wakeup_write_fd, &c, 1);
+#endif
} while ((ret == -1) && (errno == EINTR));
return 0;
@@ -896,12 +924,14 @@ int tevent_common_wakeup(struct tevent_context *ev)
static void tevent_common_wakeup_fini(struct tevent_context *ev)
{
- if (ev->pipe_fde == NULL) {
+ if (ev->wakeup_fde == NULL) {
return;
}
- TALLOC_FREE(ev->pipe_fde);
+ TALLOC_FREE(ev->wakeup_fde);
- close(ev->pipe_fds[0]);
- close(ev->pipe_fds[1]);
+ close(ev->wakeup_fd);
+#ifndef HAVE_EVENTFD
+ close(ev->wakeup_write_fd);
+#endif
}
diff --git a/lib/tevent/tevent_internal.h b/lib/tevent/tevent_internal.h
index f17ce94..d960544 100644
--- a/lib/tevent/tevent_internal.h
+++ b/lib/tevent/tevent_internal.h
@@ -276,8 +276,11 @@ struct tevent_context {
void *additional_data;
/* pipe hack used with signal handlers */
- struct tevent_fd *pipe_fde;
- int pipe_fds[2];
+ struct tevent_fd *wakeup_fde;
+ int wakeup_fd;
+#ifndef HAVE_EVENT_FD
+ int wakeup_write_fd;
+#endif
/* debugging operations */
struct tevent_debug_ops debug_ops;
--
2.1.4
>From 7d0c5ab16f272fd451fe5ada6018cb690286bc57 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Wed, 17 Aug 2016 10:08:57 +0200
Subject: [PATCH 11/23] tevent: version 0.9.30
* add tevent_threaded_context_create() and tevent_threaded_schedule_immediate()
They add a way to pass the thread result from a helper thread into
the main event loop.
Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
lib/tevent/ABI/tevent-0.9.29.sigs | 6 ---
lib/tevent/ABI/tevent-0.9.30.sigs | 96 +++++++++++++++++++++++++++++++++++++++
lib/tevent/wscript | 2 +-
3 files changed, 97 insertions(+), 7 deletions(-)
create mode 100644 lib/tevent/ABI/tevent-0.9.30.sigs
diff --git a/lib/tevent/ABI/tevent-0.9.29.sigs b/lib/tevent/ABI/tevent-0.9.29.sigs
index 9b8bfa1..1357751 100644
--- a/lib/tevent/ABI/tevent-0.9.29.sigs
+++ b/lib/tevent/ABI/tevent-0.9.29.sigs
@@ -16,7 +16,6 @@ _tevent_req_nomem: bool (const void *, struct tevent_req *, const char *)
_tevent_req_notify_callback: void (struct tevent_req *, const char *)
_tevent_req_oom: void (struct tevent_req *, const char *)
_tevent_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
-_tevent_threaded_schedule_immediate: void (struct tevent_threaded_context *, struct tevent_immediate *, tevent_immediate_handler_t, void *, const char *, const char *)
tevent_backend_list: const char **(TALLOC_CTX *)
tevent_cleanup_pending_signal_handlers: void (struct tevent_signal *)
tevent_common_add_fd: struct tevent_fd *(struct tevent_context *, TALLOC_CTX *, int, uint16_t, tevent_fd_handler_t, void *, const char *, const char *)
@@ -29,14 +28,10 @@ tevent_common_fd_destructor: int (struct tevent_fd *)
tevent_common_fd_get_flags: uint16_t (struct tevent_fd *)
tevent_common_fd_set_close_fn: void (struct tevent_fd *, tevent_fd_close_fn_t)
tevent_common_fd_set_flags: void (struct tevent_fd *, uint16_t)
-tevent_common_have_events: bool (struct tevent_context *)
tevent_common_loop_immediate: bool (struct tevent_context *)
tevent_common_loop_timer_delay: struct timeval (struct tevent_context *)
tevent_common_loop_wait: int (struct tevent_context *, const char *)
tevent_common_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
-tevent_common_threaded_activate_immediate: void (struct tevent_context *)
-tevent_common_wakeup: int (struct tevent_context *)
-tevent_common_wakeup_init: int (struct tevent_context *)
tevent_context_init: struct tevent_context *(TALLOC_CTX *)
tevent_context_init_byname: struct tevent_context *(TALLOC_CTX *, const char *)
tevent_context_init_ops: struct tevent_context *(TALLOC_CTX *, const struct tevent_ops *, void *)
@@ -82,7 +77,6 @@ tevent_set_trace_callback: void (struct tevent_context *, tevent_trace_callback_
tevent_signal_support: bool (struct tevent_context *)
tevent_thread_proxy_create: struct tevent_thread_proxy *(struct tevent_context *)
tevent_thread_proxy_schedule: void (struct tevent_thread_proxy *, struct tevent_immediate **, tevent_immediate_handler_t, void *)
-tevent_threaded_context_create: struct tevent_threaded_context *(TALLOC_CTX *, struct tevent_context *)
tevent_timeval_add: struct timeval (const struct timeval *, uint32_t, uint32_t)
tevent_timeval_compare: int (const struct timeval *, const struct timeval *)
tevent_timeval_current: struct timeval (void)
diff --git a/lib/tevent/ABI/tevent-0.9.30.sigs b/lib/tevent/ABI/tevent-0.9.30.sigs
new file mode 100644
index 0000000..9b8bfa1
--- /dev/null
+++ b/lib/tevent/ABI/tevent-0.9.30.sigs
@@ -0,0 +1,96 @@
+_tevent_add_fd: struct tevent_fd *(struct tevent_context *, TALLOC_CTX *, int, uint16_t, tevent_fd_handler_t, void *, const char *, const char *)
+_tevent_add_signal: struct tevent_signal *(struct tevent_context *, TALLOC_CTX *, int, int, tevent_signal_handler_t, void *, const char *, const char *)
+_tevent_add_timer: struct tevent_timer *(struct tevent_context *, TALLOC_CTX *, struct timeval, tevent_timer_handler_t, void *, const char *, const char *)
+_tevent_create_immediate: struct tevent_immediate *(TALLOC_CTX *, const char *)
+_tevent_loop_once: int (struct tevent_context *, const char *)
+_tevent_loop_until: int (struct tevent_context *, bool (*)(void *), void *, const char *)
+_tevent_loop_wait: int (struct tevent_context *, const char *)
+_tevent_queue_create: struct tevent_queue *(TALLOC_CTX *, const char *, const char *)
+_tevent_req_callback_data: void *(struct tevent_req *)
+_tevent_req_cancel: bool (struct tevent_req *, const char *)
+_tevent_req_create: struct tevent_req *(TALLOC_CTX *, void *, size_t, const char *, const char *)
+_tevent_req_data: void *(struct tevent_req *)
+_tevent_req_done: void (struct tevent_req *, const char *)
+_tevent_req_error: bool (struct tevent_req *, uint64_t, const char *)
+_tevent_req_nomem: bool (const void *, struct tevent_req *, const char *)
+_tevent_req_notify_callback: void (struct tevent_req *, const char *)
+_tevent_req_oom: void (struct tevent_req *, const char *)
+_tevent_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
+_tevent_threaded_schedule_immediate: void (struct tevent_threaded_context *, struct tevent_immediate *, tevent_immediate_handler_t, void *, const char *, const char *)
+tevent_backend_list: const char **(TALLOC_CTX *)
+tevent_cleanup_pending_signal_handlers: void (struct tevent_signal *)
+tevent_common_add_fd: struct tevent_fd *(struct tevent_context *, TALLOC_CTX *, int, uint16_t, tevent_fd_handler_t, void *, const char *, const char *)
+tevent_common_add_signal: struct tevent_signal *(struct tevent_context *, TALLOC_CTX *, int, int, tevent_signal_handler_t, void *, const char *, const char *)
+tevent_common_add_timer: struct tevent_timer *(struct tevent_context *, TALLOC_CTX *, struct timeval, tevent_timer_handler_t, void *, const char *, const char *)
+tevent_common_add_timer_v2: struct tevent_timer *(struct tevent_context *, TALLOC_CTX *, struct timeval, tevent_timer_handler_t, void *, const char *, const char *)
+tevent_common_check_signal: int (struct tevent_context *)
+tevent_common_context_destructor: int (struct tevent_context *)
+tevent_common_fd_destructor: int (struct tevent_fd *)
+tevent_common_fd_get_flags: uint16_t (struct tevent_fd *)
+tevent_common_fd_set_close_fn: void (struct tevent_fd *, tevent_fd_close_fn_t)
+tevent_common_fd_set_flags: void (struct tevent_fd *, uint16_t)
+tevent_common_have_events: bool (struct tevent_context *)
+tevent_common_loop_immediate: bool (struct tevent_context *)
+tevent_common_loop_timer_delay: struct timeval (struct tevent_context *)
+tevent_common_loop_wait: int (struct tevent_context *, const char *)
+tevent_common_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
+tevent_common_threaded_activate_immediate: void (struct tevent_context *)
+tevent_common_wakeup: int (struct tevent_context *)
+tevent_common_wakeup_init: int (struct tevent_context *)
+tevent_context_init: struct tevent_context *(TALLOC_CTX *)
+tevent_context_init_byname: struct tevent_context *(TALLOC_CTX *, const char *)
+tevent_context_init_ops: struct tevent_context *(TALLOC_CTX *, const struct tevent_ops *, void *)
+tevent_debug: void (struct tevent_context *, enum tevent_debug_level, const char *, ...)
+tevent_fd_get_flags: uint16_t (struct tevent_fd *)
+tevent_fd_set_auto_close: void (struct tevent_fd *)
+tevent_fd_set_close_fn: void (struct tevent_fd *, tevent_fd_close_fn_t)
+tevent_fd_set_flags: void (struct tevent_fd *, uint16_t)
+tevent_get_trace_callback: void (struct tevent_context *, tevent_trace_callback_t *, void *)
+tevent_loop_allow_nesting: void (struct tevent_context *)
+tevent_loop_set_nesting_hook: void (struct tevent_context *, tevent_nesting_hook, void *)
+tevent_num_signals: size_t (void)
+tevent_queue_add: bool (struct tevent_queue *, struct tevent_context *, struct tevent_req *, tevent_queue_trigger_fn_t, void *)
+tevent_queue_add_entry: struct tevent_queue_entry *(struct tevent_queue *, struct tevent_context *, struct tevent_req *, tevent_queue_trigger_fn_t, void *)
+tevent_queue_add_optimize_empty: struct tevent_queue_entry *(struct tevent_queue *, struct tevent_context *, struct tevent_req *, tevent_queue_trigger_fn_t, void *)
+tevent_queue_length: size_t (struct tevent_queue *)
+tevent_queue_running: bool (struct tevent_queue *)
+tevent_queue_start: void (struct tevent_queue *)
+tevent_queue_stop: void (struct tevent_queue *)
+tevent_queue_wait_recv: bool (struct tevent_req *)
+tevent_queue_wait_send: struct tevent_req *(TALLOC_CTX *, struct tevent_context *, struct tevent_queue *)
+tevent_re_initialise: int (struct tevent_context *)
+tevent_register_backend: bool (const char *, const struct tevent_ops *)
+tevent_req_default_print: char *(struct tevent_req *, TALLOC_CTX *)
+tevent_req_defer_callback: void (struct tevent_req *, struct tevent_context *)
+tevent_req_is_error: bool (struct tevent_req *, enum tevent_req_state *, uint64_t *)
+tevent_req_is_in_progress: bool (struct tevent_req *)
+tevent_req_poll: bool (struct tevent_req *, struct tevent_context *)
+tevent_req_post: struct tevent_req *(struct tevent_req *, struct tevent_context *)
+tevent_req_print: char *(TALLOC_CTX *, struct tevent_req *)
+tevent_req_received: void (struct tevent_req *)
+tevent_req_set_callback: void (struct tevent_req *, tevent_req_fn, void *)
+tevent_req_set_cancel_fn: void (struct tevent_req *, tevent_req_cancel_fn)
+tevent_req_set_cleanup_fn: void (struct tevent_req *, tevent_req_cleanup_fn)
+tevent_req_set_endtime: bool (struct tevent_req *, struct tevent_context *, struct timeval)
+tevent_req_set_print_fn: void (struct tevent_req *, tevent_req_print_fn)
+tevent_sa_info_queue_count: size_t (void)
+tevent_set_abort_fn: void (void (*)(const char *))
+tevent_set_debug: int (struct tevent_context *, void (*)(void *, enum tevent_debug_level, const char *, va_list), void *)
+tevent_set_debug_stderr: int (struct tevent_context *)
+tevent_set_default_backend: void (const char *)
+tevent_set_trace_callback: void (struct tevent_context *, tevent_trace_callback_t, void *)
+tevent_signal_support: bool (struct tevent_context *)
+tevent_thread_proxy_create: struct tevent_thread_proxy *(struct tevent_context *)
+tevent_thread_proxy_schedule: void (struct tevent_thread_proxy *, struct tevent_immediate **, tevent_immediate_handler_t, void *)
+tevent_threaded_context_create: struct tevent_threaded_context *(TALLOC_CTX *, struct tevent_context *)
+tevent_timeval_add: struct timeval (const struct timeval *, uint32_t, uint32_t)
+tevent_timeval_compare: int (const struct timeval *, const struct timeval *)
+tevent_timeval_current: struct timeval (void)
+tevent_timeval_current_ofs: struct timeval (uint32_t, uint32_t)
+tevent_timeval_is_zero: bool (const struct timeval *)
+tevent_timeval_set: struct timeval (uint32_t, uint32_t)
+tevent_timeval_until: struct timeval (const struct timeval *, const struct timeval *)
+tevent_timeval_zero: struct timeval (void)
+tevent_trace_point_callback: void (struct tevent_context *, enum tevent_trace_point)
+tevent_wakeup_recv: bool (struct tevent_req *)
+tevent_wakeup_send: struct tevent_req *(TALLOC_CTX *, struct tevent_context *, struct timeval)
diff --git a/lib/tevent/wscript b/lib/tevent/wscript
index 3e0b413..380316d 100755
--- a/lib/tevent/wscript
+++ b/lib/tevent/wscript
@@ -1,7 +1,7 @@
#!/usr/bin/env python
APPNAME = 'tevent'
-VERSION = '0.9.29'
+VERSION = '0.9.30'
blddir = 'bin'
--
2.1.4
>From ad5309b5dfac1d6b62d2b1b79ea4c439b60432a9 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sat, 30 Jul 2016 10:20:08 +0200
Subject: [PATCH 12/23] lib: Add pthreadpool_pipe
First step to separate the signalling mechanism from the core pthreadpool code.
A later patch will add a pthreadpool that directly indicates job completion via
tevent_threaded_schedule_immediate.
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source3/lib/pthreadpool/pthreadpool_pipe.c | 84 ++++++++++++++++++++++++++++++
source3/lib/pthreadpool/pthreadpool_pipe.h | 39 ++++++++++++++
source3/lib/pthreadpool/wscript_build | 4 +-
3 files changed, 125 insertions(+), 2 deletions(-)
create mode 100644 source3/lib/pthreadpool/pthreadpool_pipe.c
create mode 100644 source3/lib/pthreadpool/pthreadpool_pipe.h
diff --git a/source3/lib/pthreadpool/pthreadpool_pipe.c b/source3/lib/pthreadpool/pthreadpool_pipe.c
new file mode 100644
index 0000000..76bafa2
--- /dev/null
+++ b/source3/lib/pthreadpool/pthreadpool_pipe.c
@@ -0,0 +1,84 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * threadpool implementation based on pthreads
+ * Copyright (C) Volker Lendecke 2009,2011
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "replace.h"
+#include "system/filesys.h"
+#include "pthreadpool_pipe.h"
+#include "pthreadpool.h"
+
+struct pthreadpool_pipe {
+ struct pthreadpool *pool;
+};
+
+int pthreadpool_pipe_init(unsigned max_threads,
+ struct pthreadpool_pipe **presult)
+{
+ struct pthreadpool_pipe *p;
+ int ret;
+
+ p = malloc(sizeof(struct pthreadpool_pipe));
+ if (p == NULL) {
+ return ENOMEM;
+ }
+
+ ret = pthreadpool_init(max_threads, &p->pool);
+ if (ret != 0) {
+ free(p);
+ return ret;
+ }
+
+ *presult = p;
+ return 0;
+}
+
+int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool)
+{
+ int ret;
+
+ ret = pthreadpool_destroy(pool->pool);
+ if (ret != 0) {
+ return ret;
+ }
+ free(pool);
+ return 0;
+}
+
+int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
+ void (*fn)(void *private_data),
+ void *private_data)
+{
+ int ret;
+ ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data);
+ return ret;
+}
+
+int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
+{
+ int fd;
+ fd = pthreadpool_signal_fd(pool->pool);
+ return fd;
+}
+
+int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
+ unsigned num_jobids)
+{
+ int ret;
+ ret = pthreadpool_finished_jobs(pool->pool, jobids, num_jobids);
+ return ret;
+}
diff --git a/source3/lib/pthreadpool/pthreadpool_pipe.h b/source3/lib/pthreadpool/pthreadpool_pipe.h
new file mode 100644
index 0000000..77516f7
--- /dev/null
+++ b/source3/lib/pthreadpool/pthreadpool_pipe.h
@@ -0,0 +1,39 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * threadpool implementation based on pthreads
+ * Copyright (C) Volker Lendecke 2009,2011
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __PTHREADPOOL_PIPE_H__
+#define __PTHREADPOOL_PIPE_H__
+
+struct pthreadpool_pipe;
+
+int pthreadpool_pipe_init(unsigned max_threads,
+ struct pthreadpool_pipe **presult);
+
+int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool);
+
+int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
+ void (*fn)(void *private_data),
+ void *private_data);
+
+int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool);
+
+int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
+ unsigned num_jobids);
+
+#endif
diff --git a/source3/lib/pthreadpool/wscript_build b/source3/lib/pthreadpool/wscript_build
index bdd5f53..aa02850 100644
--- a/source3/lib/pthreadpool/wscript_build
+++ b/source3/lib/pthreadpool/wscript_build
@@ -2,11 +2,11 @@
if bld.env.WITH_PTHREADPOOL:
bld.SAMBA3_SUBSYSTEM('PTHREADPOOL',
- source='pthreadpool.c',
+ source='pthreadpool.c pthreadpool_pipe.c',
deps='pthread rt replace')
else:
bld.SAMBA3_SUBSYSTEM('PTHREADPOOL',
- source='pthreadpool_sync.c',
+ source='pthreadpool_sync.c pthreadpool_pipe.c',
deps='replace')
--
2.1.4
>From 7cb698e7b614bc22fc9d32523a6fc4f86980e044 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 15 Aug 2016 13:57:20 +0200
Subject: [PATCH 13/23] lib: Use pthreadpool_pipe instead of pthreadpool
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
source3/lib/asys/asys.c | 22 ++++-----
source3/lib/fncall.c | 16 +++----
source3/lib/pthreadpool/tests.c | 94 +++++++++++++++++++------------------
source3/lib/unix_msg/unix_msg.c | 22 ++++-----
source3/modules/vfs_aio_pthread.c | 22 ++++-----
source3/torture/bench_pthreadpool.c | 22 ++++-----
6 files changed, 100 insertions(+), 98 deletions(-)
diff --git a/source3/lib/asys/asys.c b/source3/lib/asys/asys.c
index 670be01..594d470 100644
--- a/source3/lib/asys/asys.c
+++ b/source3/lib/asys/asys.c
@@ -19,7 +19,7 @@
#include "asys.h"
#include <stdlib.h>
#include <errno.h>
-#include "../pthreadpool/pthreadpool.h"
+#include "../pthreadpool/pthreadpool_pipe.h"
#include "lib/util/time.h"
#include "smbprofile.h"
@@ -59,8 +59,8 @@ struct asys_job {
};
struct asys_context {
- struct pthreadpool *pool;
- int pthreadpool_fd;
+ struct pthreadpool_pipe *pool;
+ int pthreadpool_pipe_fd;
unsigned num_jobs;
struct asys_job **jobs;
@@ -79,12 +79,12 @@ int asys_context_init(struct asys_context **pctx, unsigned max_parallel)
if (ctx == NULL) {
return ENOMEM;
}
- ret = pthreadpool_init(max_parallel, &ctx->pool);
+ ret = pthreadpool_pipe_init(max_parallel, &ctx->pool);
if (ret != 0) {
free(ctx);
return ret;
}
- ctx->pthreadpool_fd = pthreadpool_signal_fd(ctx->pool);
+ ctx->pthreadpool_pipe_fd = pthreadpool_pipe_signal_fd(ctx->pool);
*pctx = ctx;
return 0;
@@ -92,7 +92,7 @@ int asys_context_init(struct asys_context **pctx, unsigned max_parallel)
int asys_signalfd(struct asys_context *ctx)
{
- return ctx->pthreadpool_fd;
+ return ctx->pthreadpool_pipe_fd;
}
int asys_context_destroy(struct asys_context *ctx)
@@ -106,7 +106,7 @@ int asys_context_destroy(struct asys_context *ctx)
}
}
- ret = pthreadpool_destroy(ctx->pool);
+ ret = pthreadpool_pipe_destroy(ctx->pool);
if (ret != 0) {
return ret;
}
@@ -179,7 +179,7 @@ int asys_pwrite(struct asys_context *ctx, int fildes, const void *buf,
args->nbyte = nbyte;
args->offset = offset;
- ret = pthreadpool_add_job(ctx->pool, jobid, asys_pwrite_do, job);
+ ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_pwrite_do, job);
if (ret != 0) {
return ret;
}
@@ -224,7 +224,7 @@ int asys_pread(struct asys_context *ctx, int fildes, void *buf,
args->nbyte = nbyte;
args->offset = offset;
- ret = pthreadpool_add_job(ctx->pool, jobid, asys_pread_do, job);
+ ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_pread_do, job);
if (ret != 0) {
return ret;
}
@@ -265,7 +265,7 @@ int asys_fsync(struct asys_context *ctx, int fildes, void *private_data)
args = &job->args.fsync_args;
args->fildes = fildes;
- ret = pthreadpool_add_job(ctx->pool, jobid, asys_fsync_do, job);
+ ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_fsync_do, job);
if (ret != 0) {
return ret;
}
@@ -307,7 +307,7 @@ int asys_results(struct asys_context *ctx, struct asys_result *results,
int jobids[num_results];
int i, ret;
- ret = pthreadpool_finished_jobs(ctx->pool, jobids, num_results);
+ ret = pthreadpool_pipe_finished_jobs(ctx->pool, jobids, num_results);
if (ret <= 0) {
return ret;
}
diff --git a/source3/lib/fncall.c b/source3/lib/fncall.c
index 88304d6..0923c14 100644
--- a/source3/lib/fncall.c
+++ b/source3/lib/fncall.c
@@ -20,7 +20,7 @@
#include "includes.h"
#include "../lib/util/tevent_unix.h"
-#include "lib/pthreadpool/pthreadpool.h"
+#include "lib/pthreadpool/pthreadpool_pipe.h"
struct fncall_state {
struct fncall_context *ctx;
@@ -32,7 +32,7 @@ struct fncall_state {
};
struct fncall_context {
- struct pthreadpool *pool;
+ struct pthreadpool_pipe *pool;
int next_job_id;
int sig_fd;
struct tevent_req **pending;
@@ -61,7 +61,7 @@ static int fncall_context_destructor(struct fncall_context *ctx)
fncall_handler(NULL, NULL, TEVENT_FD_READ, ctx);
}
- pthreadpool_destroy(ctx->pool);
+ pthreadpool_pipe_destroy(ctx->pool);
ctx->pool = NULL;
return 0;
@@ -78,14 +78,14 @@ struct fncall_context *fncall_context_init(TALLOC_CTX *mem_ctx,
return NULL;
}
- ret = pthreadpool_init(max_threads, &ctx->pool);
+ ret = pthreadpool_pipe_init(max_threads, &ctx->pool);
if (ret != 0) {
TALLOC_FREE(ctx);
return NULL;
}
talloc_set_destructor(ctx, fncall_context_destructor);
- ctx->sig_fd = pthreadpool_signal_fd(ctx->pool);
+ ctx->sig_fd = pthreadpool_pipe_signal_fd(ctx->pool);
if (ctx->sig_fd == -1) {
TALLOC_FREE(ctx);
return NULL;
@@ -266,8 +266,8 @@ struct tevent_req *fncall_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
state->private_parent = talloc_parent(private_data);
state->job_private = talloc_move(state, &private_data);
- ret = pthreadpool_add_job(state->ctx->pool, state->job_id, fn,
- state->job_private);
+ ret = pthreadpool_pipe_add_job(state->ctx->pool, state->job_id, fn,
+ state->job_private);
if (ret == -1) {
tevent_req_error(req, errno);
return tevent_req_post(req, ev);
@@ -287,7 +287,7 @@ static void fncall_handler(struct tevent_context *ev, struct tevent_fd *fde,
int i, num_pending;
int job_id;
- if (pthreadpool_finished_jobs(ctx->pool, &job_id, 1) < 0) {
+ if (pthreadpool_pipe_finished_jobs(ctx->pool, &job_id, 1) < 0) {
return;
}
diff --git a/source3/lib/pthreadpool/tests.c b/source3/lib/pthreadpool/tests.c
index 8474712..0b48b41 100644
--- a/source3/lib/pthreadpool/tests.c
+++ b/source3/lib/pthreadpool/tests.c
@@ -7,22 +7,22 @@
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
-#include "pthreadpool.h"
+#include "pthreadpool_pipe.h"
static int test_init(void)
{
- struct pthreadpool *p;
+ struct pthreadpool_pipe *p;
int ret;
- ret = pthreadpool_init(1, &p);
+ ret = pthreadpool_pipe_init(1, &p);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_init failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
strerror(ret));
return -1;
}
- ret = pthreadpool_destroy(p);
+ ret = pthreadpool_pipe_destroy(p);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_init failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
strerror(ret));
return -1;
}
@@ -43,7 +43,7 @@ static void test_sleep(void *ptr)
static int test_jobs(int num_threads, int num_jobs)
{
char *finished;
- struct pthreadpool *p;
+ struct pthreadpool_pipe *p;
int timeout = 1;
int i, ret;
@@ -53,25 +53,25 @@ static int test_jobs(int num_threads, int num_jobs)
return -1;
}
- ret = pthreadpool_init(num_threads, &p);
+ ret = pthreadpool_pipe_init(num_threads, &p);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_init failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
strerror(ret));
return -1;
}
for (i=0; i<num_jobs; i++) {
- ret = pthreadpool_add_job(p, i, test_sleep, &timeout);
+ ret = pthreadpool_pipe_add_job(p, i, test_sleep, &timeout);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_add_job failed: %s\n",
- strerror(ret));
+ fprintf(stderr, "pthreadpool_pipe_add_job failed: "
+ "%s\n", strerror(ret));
return -1;
}
}
for (i=0; i<num_jobs; i++) {
int jobid = -1;
- ret = pthreadpool_finished_jobs(p, &jobid, 1);
+ ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1);
if ((ret != 1) || (jobid >= num_jobs)) {
fprintf(stderr, "invalid job number %d\n", jobid);
return -1;
@@ -87,9 +87,9 @@ static int test_jobs(int num_threads, int num_jobs)
}
}
- ret = pthreadpool_destroy(p);
+ ret = pthreadpool_pipe_destroy(p);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_destroy failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
strerror(ret));
return -1;
}
@@ -100,37 +100,37 @@ static int test_jobs(int num_threads, int num_jobs)
static int test_busydestroy(void)
{
- struct pthreadpool *p;
+ struct pthreadpool_pipe *p;
int timeout = 50;
struct pollfd pfd;
int ret;
- ret = pthreadpool_init(1, &p);
+ ret = pthreadpool_pipe_init(1, &p);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_init failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
strerror(ret));
return -1;
}
- ret = pthreadpool_add_job(p, 1, test_sleep, &timeout);
+ ret = pthreadpool_pipe_add_job(p, 1, test_sleep, &timeout);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_add_job failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_add_job failed: %s\n",
strerror(ret));
return -1;
}
- ret = pthreadpool_destroy(p);
+ ret = pthreadpool_pipe_destroy(p);
if (ret != EBUSY) {
fprintf(stderr, "Could destroy a busy pool\n");
return -1;
}
- pfd.fd = pthreadpool_signal_fd(p);
+ pfd.fd = pthreadpool_pipe_signal_fd(p);
pfd.events = POLLIN|POLLERR;
poll(&pfd, 1, -1);
- ret = pthreadpool_destroy(p);
+ ret = pthreadpool_pipe_destroy(p);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_destroy failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
strerror(ret));
return -1;
}
@@ -139,7 +139,7 @@ static int test_busydestroy(void)
struct threaded_state {
pthread_t tid;
- struct pthreadpool *p;
+ struct pthreadpool_pipe *p;
int start_job;
int num_jobs;
int timeout;
@@ -151,11 +151,12 @@ static void *test_threaded_worker(void *p)
int i;
for (i=0; i<state->num_jobs; i++) {
- int ret = pthreadpool_add_job(state->p, state->start_job + i,
- test_sleep, &state->timeout);
+ int ret = pthreadpool_pipe_add_job(
+ state->p, state->start_job + i,
+ test_sleep, &state->timeout);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_add_job failed: %s\n",
- strerror(ret));
+ fprintf(stderr, "pthreadpool_pipe_add_job failed: "
+ "%s\n", strerror(ret));
return NULL;
}
}
@@ -165,7 +166,7 @@ static void *test_threaded_worker(void *p)
static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
int num_jobs)
{
- struct pthreadpool **pools;
+ struct pthreadpool_pipe **pools;
struct threaded_state *states;
struct threaded_state *state;
struct pollfd *pfds;
@@ -186,7 +187,7 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
return -1;
}
- pools = calloc(num_pools, sizeof(struct pthreadpool *));
+ pools = calloc(num_pools, sizeof(struct pthreadpool_pipe *));
if (pools == NULL) {
fprintf(stderr, "calloc failed\n");
return -1;
@@ -199,13 +200,13 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
}
for (i=0; i<num_pools; i++) {
- ret = pthreadpool_init(poolsize, &pools[i]);
+ ret = pthreadpool_pipe_init(poolsize, &pools[i]);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_init failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
strerror(ret));
return -1;
}
- pfds[i].fd = pthreadpool_signal_fd(pools[i]);
+ pfds[i].fd = pthreadpool_pipe_signal_fd(pools[i]);
pfds[i].events = POLLIN|POLLHUP;
}
@@ -241,10 +242,10 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
}
if (child == 0) {
for (i=0; i<num_pools; i++) {
- ret = pthreadpool_destroy(pools[i]);
+ ret = pthreadpool_pipe_destroy(pools[i]);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_destroy failed: "
- "%s\n", strerror(ret));
+ fprintf(stderr, "pthreadpool_pipe_destroy "
+ "failed: %s\n", strerror(ret));
exit(1);
}
}
@@ -284,7 +285,8 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
continue;
}
- ret = pthreadpool_finished_jobs(pools[j], &jobid, 1);
+ ret = pthreadpool_pipe_finished_jobs(
+ pools[j], &jobid, 1);
if ((ret != 1) || (jobid >= num_jobs * num_threads)) {
fprintf(stderr, "invalid job number %d\n",
jobid);
@@ -304,10 +306,10 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
}
for (i=0; i<num_pools; i++) {
- ret = pthreadpool_destroy(pools[i]);
+ ret = pthreadpool_pipe_destroy(pools[i]);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_destroy failed: %s\n",
- strerror(ret));
+ fprintf(stderr, "pthreadpool_pipe_destroy failed: "
+ "%s\n", strerror(ret));
return -1;
}
}
@@ -322,19 +324,19 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
static int test_fork(void)
{
- struct pthreadpool *p;
+ struct pthreadpool_pipe *p;
pid_t child, waited;
int status, ret;
- ret = pthreadpool_init(1, &p);
+ ret = pthreadpool_pipe_init(1, &p);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_init failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
strerror(ret));
return -1;
}
- ret = pthreadpool_destroy(p);
+ ret = pthreadpool_pipe_destroy(p);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_destroy failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
strerror(ret));
return -1;
}
diff --git a/source3/lib/unix_msg/unix_msg.c b/source3/lib/unix_msg/unix_msg.c
index aed1f75..5fac68b 100644
--- a/source3/lib/unix_msg/unix_msg.c
+++ b/source3/lib/unix_msg/unix_msg.c
@@ -22,7 +22,7 @@
#include "system/time.h"
#include "system/network.h"
#include "lib/util/dlinklist.h"
-#include "pthreadpool/pthreadpool.h"
+#include "pthreadpool/pthreadpool_pipe.h"
#include "lib/util/iov_buf.h"
#include "lib/util/msghdr.h"
#include <fcntl.h>
@@ -69,7 +69,7 @@ struct unix_dgram_ctx {
struct poll_watch *sock_read_watch;
struct unix_dgram_send_queue *send_queues;
- struct pthreadpool *send_pool;
+ struct pthreadpool_pipe *send_pool;
struct poll_watch *pool_read_watch;
uint8_t *recv_buf;
@@ -341,18 +341,18 @@ static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
return 0;
}
- ret = pthreadpool_init(0, &ctx->send_pool);
+ ret = pthreadpool_pipe_init(0, &ctx->send_pool);
if (ret != 0) {
return ret;
}
- signalfd = pthreadpool_signal_fd(ctx->send_pool);
+ signalfd = pthreadpool_pipe_signal_fd(ctx->send_pool);
ctx->pool_read_watch = ctx->ev_funcs->watch_new(
ctx->ev_funcs, signalfd, POLLIN,
unix_dgram_job_finished, ctx);
if (ctx->pool_read_watch == NULL) {
- pthreadpool_destroy(ctx->send_pool);
+ pthreadpool_pipe_destroy(ctx->send_pool);
ctx->send_pool = NULL;
return ENOMEM;
}
@@ -525,7 +525,7 @@ static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
struct unix_dgram_msg *msg;
int ret, job;
- ret = pthreadpool_finished_jobs(ctx->send_pool, &job, 1);
+ ret = pthreadpool_pipe_finished_jobs(ctx->send_pool, &job, 1);
if (ret != 1) {
return;
}
@@ -547,8 +547,8 @@ static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
free(msg);
if (q->msgs != NULL) {
- ret = pthreadpool_add_job(ctx->send_pool, q->sock,
- unix_dgram_send_job, q->msgs);
+ ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
+ unix_dgram_send_job, q->msgs);
if (ret == 0) {
return;
}
@@ -654,8 +654,8 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
unix_dgram_send_queue_free(q);
return ret;
}
- ret = pthreadpool_add_job(ctx->send_pool, q->sock,
- unix_dgram_send_job, q->msgs);
+ ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
+ unix_dgram_send_job, q->msgs);
if (ret != 0) {
unix_dgram_send_queue_free(q);
return ret;
@@ -675,7 +675,7 @@ static int unix_dgram_free(struct unix_dgram_ctx *ctx)
}
if (ctx->send_pool != NULL) {
- int ret = pthreadpool_destroy(ctx->send_pool);
+ int ret = pthreadpool_pipe_destroy(ctx->send_pool);
if (ret != 0) {
return ret;
}
diff --git a/source3/modules/vfs_aio_pthread.c b/source3/modules/vfs_aio_pthread.c
index 7037b63..6edf250 100644
--- a/source3/modules/vfs_aio_pthread.c
+++ b/source3/modules/vfs_aio_pthread.c
@@ -26,7 +26,7 @@
#include "system/shmem.h"
#include "smbd/smbd.h"
#include "smbd/globals.h"
-#include "lib/pthreadpool/pthreadpool.h"
+#include "lib/pthreadpool/pthreadpool_pipe.h"
#ifdef HAVE_LINUX_FALLOC_H
#include <linux/falloc.h>
#endif
@@ -38,7 +38,7 @@
***********************************************************************/
static bool init_aio_threadpool(struct tevent_context *ev_ctx,
- struct pthreadpool **pp_pool,
+ struct pthreadpool_pipe **pp_pool,
void (*completion_fn)(struct tevent_context *,
struct tevent_fd *,
uint16_t,
@@ -51,19 +51,19 @@ static bool init_aio_threadpool(struct tevent_context *ev_ctx,
return true;
}
- ret = pthreadpool_init(lp_aio_max_threads(), pp_pool);
+ ret = pthreadpool_pipe_init(lp_aio_max_threads(), pp_pool);
if (ret) {
errno = ret;
return false;
}
sock_event = tevent_add_fd(ev_ctx,
NULL,
- pthreadpool_signal_fd(*pp_pool),
+ pthreadpool_pipe_signal_fd(*pp_pool),
TEVENT_FD_READ,
completion_fn,
NULL);
if (sock_event == NULL) {
- pthreadpool_destroy(*pp_pool);
+ pthreadpool_pipe_destroy(*pp_pool);
*pp_pool = NULL;
return false;
}
@@ -87,7 +87,7 @@ static bool init_aio_threadpool(struct tevent_context *ev_ctx,
* process, as is the current jobid.
*/
-static struct pthreadpool *open_pool;
+static struct pthreadpool_pipe *open_pool;
static int aio_pthread_open_jobid;
struct aio_open_private_data {
@@ -167,7 +167,7 @@ static void aio_open_handle_completion(struct tevent_context *event_ctx,
return;
}
- ret = pthreadpool_finished_jobs(open_pool, &jobid, 1);
+ ret = pthreadpool_pipe_finished_jobs(open_pool, &jobid, 1);
if (ret != 1) {
smb_panic("aio_open_handle_completion");
/* notreached. */
@@ -368,10 +368,10 @@ static int open_async(const files_struct *fsp,
return -1;
}
- ret = pthreadpool_add_job(open_pool,
- opd->jobid,
- aio_open_worker,
- (void *)opd);
+ ret = pthreadpool_pipe_add_job(open_pool,
+ opd->jobid,
+ aio_open_worker,
+ (void *)opd);
if (ret) {
errno = ret;
return -1;
diff --git a/source3/torture/bench_pthreadpool.c b/source3/torture/bench_pthreadpool.c
index 247063d..82a84cf 100644
--- a/source3/torture/bench_pthreadpool.c
+++ b/source3/torture/bench_pthreadpool.c
@@ -19,7 +19,7 @@
*/
#include "includes.h"
-#include "lib/pthreadpool/pthreadpool.h"
+#include "lib/pthreadpool/pthreadpool_pipe.h"
#include "proto.h"
extern int torture_numops;
@@ -31,12 +31,12 @@ static void null_job(void *private_data)
bool run_bench_pthreadpool(int dummy)
{
- struct pthreadpool *pool;
+ struct pthreadpool_pipe *pool;
int i, ret;
- ret = pthreadpool_init(1, &pool);
+ ret = pthreadpool_pipe_init(1, &pool);
if (ret != 0) {
- d_fprintf(stderr, "pthreadpool_init failed: %s\n",
+ d_fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
strerror(ret));
return false;
}
@@ -44,21 +44,21 @@ bool run_bench_pthreadpool(int dummy)
for (i=0; i<torture_numops; i++) {
int jobid;
- ret = pthreadpool_add_job(pool, 0, null_job, NULL);
+ ret = pthreadpool_pipe_add_job(pool, 0, null_job, NULL);
if (ret != 0) {
- d_fprintf(stderr, "pthreadpool_add_job failed: %s\n",
- strerror(ret));
+ d_fprintf(stderr, "pthreadpool_pipe_add_job "
+ "failed: %s\n", strerror(ret));
break;
}
- ret = pthreadpool_finished_jobs(pool, &jobid, 1);
+ ret = pthreadpool_pipe_finished_jobs(pool, &jobid, 1);
if (ret < 0) {
- d_fprintf(stderr, "pthreadpool_finished_job failed: %s\n",
- strerror(-ret));
+ d_fprintf(stderr, "pthreadpool_pipe_finished_job "
+ "failed: %s\n", strerror(-ret));
break;
}
}
- pthreadpool_destroy(pool);
+ pthreadpool_pipe_destroy(pool);
return (ret == 1);
}
--
2.1.4
>From 1dec4e67d3c6d11da3353769aad4d0210bd6f331 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 15 Aug 2016 13:59:12 +0200
Subject: [PATCH 14/23] lib: Move pipe signalling to pthreadpool_pipe.c
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
source3/lib/pthreadpool/pthreadpool.c | 82 +++-------------
source3/lib/pthreadpool/pthreadpool.h | 34 +------
source3/lib/pthreadpool/pthreadpool_pipe.c | 116 ++++++++++++++++++++---
source3/lib/pthreadpool/pthreadpool_sync.c | 144 +++--------------------------
4 files changed, 134 insertions(+), 242 deletions(-)
diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c
index b071e53..4c2858a 100644
--- a/source3/lib/pthreadpool/pthreadpool.c
+++ b/source3/lib/pthreadpool/pthreadpool.c
@@ -19,7 +19,6 @@
#include "replace.h"
#include "system/time.h"
-#include "system/filesys.h"
#include "system/wait.h"
#include "system/threads.h"
#include "pthreadpool.h"
@@ -58,9 +57,10 @@ struct pthreadpool {
size_t num_jobs;
/*
- * pipe for signalling
+ * Indicate job completion
*/
- int sig_pipe[2];
+ int (*signal_fn)(int jobid, void *private_data);
+ void *signal_private_data;
/*
* indicator to worker threads that they should shut down
@@ -99,7 +99,9 @@ static void pthreadpool_prep_atfork(void);
* Initialize a thread pool
*/
-int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
+int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
+ int (*signal_fn)(int jobid, void *private_data),
+ void *signal_private_data)
{
struct pthreadpool *pool;
int ret;
@@ -108,6 +110,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
if (pool == NULL) {
return ENOMEM;
}
+ pool->signal_fn = signal_fn;
+ pool->signal_private_data = signal_private_data;
pool->jobs_array_len = 4;
pool->jobs = calloc(
@@ -120,18 +124,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
pool->head = pool->num_jobs = 0;
- ret = pipe(pool->sig_pipe);
- if (ret == -1) {
- int err = errno;
- free(pool->jobs);
- free(pool);
- return err;
- }
-
ret = pthread_mutex_init(&pool->mutex, NULL);
if (ret != 0) {
- close(pool->sig_pipe[0]);
- close(pool->sig_pipe[1]);
free(pool->jobs);
free(pool);
return ret;
@@ -140,8 +134,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
ret = pthread_cond_init(&pool->condvar, NULL);
if (ret != 0) {
pthread_mutex_destroy(&pool->mutex);
- close(pool->sig_pipe[0]);
- close(pool->sig_pipe[1]);
free(pool->jobs);
free(pool);
return ret;
@@ -158,8 +150,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
if (ret != 0) {
pthread_cond_destroy(&pool->condvar);
pthread_mutex_destroy(&pool->mutex);
- close(pool->sig_pipe[0]);
- close(pool->sig_pipe[1]);
free(pool->jobs);
free(pool);
return ret;
@@ -218,12 +208,6 @@ static void pthreadpool_child(void)
pool != NULL;
pool = DLIST_PREV(pool)) {
- close(pool->sig_pipe[0]);
- close(pool->sig_pipe[1]);
-
- ret = pipe(pool->sig_pipe);
- assert(ret == 0);
-
pool->num_threads = 0;
pool->num_exited = 0;
@@ -249,16 +233,6 @@ static void pthreadpool_prep_atfork(void)
}
/*
- * Return the file descriptor which becomes readable when a job has
- * finished
- */
-
-int pthreadpool_signal_fd(struct pthreadpool *pool)
-{
- return pool->sig_pipe[0];
-}
-
-/*
* Do a pthread_join() on all children that have exited, pool->mutex must be
* locked
*/
@@ -287,32 +261,6 @@ static void pthreadpool_join_children(struct pthreadpool *pool)
}
/*
- * Fetch a finished job number from the signal pipe
- */
-
-int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
- unsigned num_jobids)
-{
- ssize_t to_read, nread;
-
- nread = -1;
- errno = EINTR;
-
- to_read = sizeof(int) * num_jobids;
-
- while ((nread == -1) && (errno == EINTR)) {
- nread = read(pool->sig_pipe[0], jobids, to_read);
- }
- if (nread == -1) {
- return -errno;
- }
- if ((nread % sizeof(int)) != 0) {
- return -EINVAL;
- }
- return nread / sizeof(int);
-}
-
-/*
* Destroy a thread pool, finishing all threads working for it
*/
@@ -390,12 +338,6 @@ int pthreadpool_destroy(struct pthreadpool *pool)
ret = pthread_mutex_unlock(&pthreadpools_mutex);
assert(ret == 0);
- close(pool->sig_pipe[0]);
- pool->sig_pipe[0] = -1;
-
- close(pool->sig_pipe[1]);
- pool->sig_pipe[1] = -1;
-
free(pool->exited);
free(pool->jobs);
free(pool);
@@ -527,8 +469,7 @@ static void *pthreadpool_server(void *arg)
}
if (pthreadpool_get_job(pool, &job)) {
- ssize_t written;
- int sig_pipe = pool->sig_pipe[1];
+ int ret;
/*
* Do the work with the mutex unlocked
@@ -542,8 +483,9 @@ static void *pthreadpool_server(void *arg)
res = pthread_mutex_lock(&pool->mutex);
assert(res == 0);
- written = write(sig_pipe, &job.id, sizeof(job.id));
- if (written != sizeof(int)) {
+ ret = pool->signal_fn(job.id,
+ pool->signal_private_data);
+ if (ret != 0) {
pthreadpool_server_exit(pool);
pthread_mutex_unlock(&pool->mutex);
return NULL;
diff --git a/source3/lib/pthreadpool/pthreadpool.h b/source3/lib/pthreadpool/pthreadpool.h
index adb825a..0b8d6e5 100644
--- a/source3/lib/pthreadpool/pthreadpool.h
+++ b/source3/lib/pthreadpool/pthreadpool.h
@@ -43,7 +43,9 @@ struct pthreadpool;
* max_threads=0 means unlimited parallelism. The caller has to take
* care to not overload the system.
*/
-int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult);
+int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
+ int (*signal_fn)(int jobid, void *private_data),
+ void *signal_private_data);
/**
* @brief Destroy a pthreadpool
@@ -60,8 +62,8 @@ int pthreadpool_destroy(struct pthreadpool *pool);
* @brief Add a job to a pthreadpool
*
* This adds a job to a pthreadpool. The job can be identified by
- * job_id. This integer will be returned from
- * pthreadpool_finished_jobs() then the job is completed.
+ * job_id. This integer will be passed to signal_fn() when the
+ * job is completed.
*
* @param[in] pool The pool to run the job on
* @param[in] job_id A custom identifier
@@ -72,30 +74,4 @@ int pthreadpool_destroy(struct pthreadpool *pool);
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data);
-/**
- * @brief Get the signalling fd from a pthreadpool
- *
- * Completion of a job is indicated by readability of the fd returned
- * by pthreadpool_signal_fd().
- *
- * @param[in] pool The pool in question
- * @return The fd to listen on for readability
- */
-int pthreadpool_signal_fd(struct pthreadpool *pool);
-
-/**
- * @brief Get the job_ids of finished jobs
- *
- * This blocks until a job has finished unless the fd returned by
- * pthreadpool_signal_fd() is readable.
- *
- * @param[in] pool The pool to query for finished jobs
- * @param[out] jobids The job_ids of the finished job
- * @param[int] num_jobids The job_ids array size
- * @return success: >=0, number of finished jobs
- * failure: -errno
- */
-int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
- unsigned num_jobids);
-
#endif
diff --git a/source3/lib/pthreadpool/pthreadpool_pipe.c b/source3/lib/pthreadpool/pthreadpool_pipe.c
index 76bafa2..3eaf5e3 100644
--- a/source3/lib/pthreadpool/pthreadpool_pipe.c
+++ b/source3/lib/pthreadpool/pthreadpool_pipe.c
@@ -24,26 +24,57 @@
struct pthreadpool_pipe {
struct pthreadpool *pool;
+ pid_t pid;
+ int pipe_fds[2];
};
+static int pthreadpool_pipe_signal(int jobid, void *private_data);
+
int pthreadpool_pipe_init(unsigned max_threads,
struct pthreadpool_pipe **presult)
{
- struct pthreadpool_pipe *p;
+ struct pthreadpool_pipe *pool;
int ret;
- p = malloc(sizeof(struct pthreadpool_pipe));
- if (p == NULL) {
+ pool = malloc(sizeof(struct pthreadpool_pipe));
+ if (pool == NULL) {
return ENOMEM;
}
+ pool->pid = getpid();
+
+ ret = pipe(pool->pipe_fds);
+ if (ret == -1) {
+ int err = errno;
+ free(pool);
+ return err;
+ }
- ret = pthreadpool_init(max_threads, &p->pool);
+ ret = pthreadpool_init(max_threads, &pool->pool,
+ pthreadpool_pipe_signal, pool);
if (ret != 0) {
- free(p);
+ close(pool->pipe_fds[0]);
+ close(pool->pipe_fds[1]);
+ free(pool);
return ret;
}
- *presult = p;
+ *presult = pool;
+ return 0;
+}
+
+static int pthreadpool_pipe_signal(int jobid, void *private_data)
+{
+ struct pthreadpool_pipe *pool = private_data;
+ ssize_t written;
+
+ do {
+ written = write(pool->pipe_fds[1], &jobid, sizeof(jobid));
+ } while ((written == -1) && (errno == EINTR));
+
+ if (written != sizeof(jobid)) {
+ return errno;
+ }
+
return 0;
}
@@ -55,30 +86,91 @@ int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool)
if (ret != 0) {
return ret;
}
+
+ close(pool->pipe_fds[0]);
+ pool->pipe_fds[0] = -1;
+
+ close(pool->pipe_fds[1]);
+ pool->pipe_fds[1] = -1;
+
free(pool);
return 0;
}
+static int pthreadpool_pipe_reinit(struct pthreadpool_pipe *pool)
+{
+ pid_t pid = getpid();
+ int signal_fd;
+ int ret;
+
+ if (pid == pool->pid) {
+ return 0;
+ }
+
+ signal_fd = pool->pipe_fds[0];
+
+ close(pool->pipe_fds[0]);
+ pool->pipe_fds[0] = -1;
+
+ close(pool->pipe_fds[1]);
+ pool->pipe_fds[1] = -1;
+
+ ret = pipe(pool->pipe_fds);
+ if (ret != 0) {
+ return errno;
+ }
+
+ ret = dup2(pool->pipe_fds[0], signal_fd);
+ if (ret != 0) {
+ return errno;
+ }
+
+ pool->pipe_fds[0] = signal_fd;
+
+ return 0;
+}
+
int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
void (*fn)(void *private_data),
void *private_data)
{
int ret;
+
+ ret = pthreadpool_pipe_reinit(pool);
+ if (ret != 0) {
+ return ret;
+ }
+
ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data);
return ret;
}
int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
{
- int fd;
- fd = pthreadpool_signal_fd(pool->pool);
- return fd;
+ return pool->pipe_fds[0];
}
int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
unsigned num_jobids)
{
- int ret;
- ret = pthreadpool_finished_jobs(pool->pool, jobids, num_jobids);
- return ret;
+ ssize_t to_read, nread;
+ pid_t pid = getpid();
+
+ if (pool->pid != pid) {
+ return EINVAL;
+ }
+
+ to_read = sizeof(int) * num_jobids;
+
+ do {
+ nread = read(pool->pipe_fds[0], jobids, to_read);
+ } while ((nread == -1) && (errno == EINTR));
+
+ if (nread == -1) {
+ return -errno;
+ }
+ if ((nread % sizeof(int)) != 0) {
+ return -EINVAL;
+ }
+ return nread / sizeof(int);
}
diff --git a/source3/lib/pthreadpool/pthreadpool_sync.c b/source3/lib/pthreadpool/pthreadpool_sync.c
index 5f06cae..3e78f46 100644
--- a/source3/lib/pthreadpool/pthreadpool_sync.c
+++ b/source3/lib/pthreadpool/pthreadpool_sync.c
@@ -17,165 +17,47 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-#include <errno.h>
-#include <stdio.h>
-#include <unistd.h>
-#include <stdlib.h>
-#include <string.h>
-#include <signal.h>
-#include <assert.h>
-#include <fcntl.h>
-#include <sys/time.h>
+#include "replace.h"
#include "pthreadpool.h"
struct pthreadpool {
/*
- * pipe for signalling
+ * Indicate job completion
*/
- int sig_pipe[2];
-
- /*
- * Have we sent something into the pipe that has not been
- * retrieved yet?
- */
- int pipe_busy;
-
- /*
- * Jobids that we have not sent into the pipe yet
- */
- size_t num_ids;
- int *ids;
+ int (*signal_fn)(int jobid,
+ void *private_data);
+ void *signal_private_data;
};
-int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
+int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
+ int (*signal_fn)(int jobid,
+ void *private_data),
+ void *signal_private_data)
{
struct pthreadpool *pool;
- int ret;
pool = (struct pthreadpool *)calloc(1, sizeof(struct pthreadpool));
if (pool == NULL) {
return ENOMEM;
}
- ret = pipe(pool->sig_pipe);
- if (ret == -1) {
- int err = errno;
- free(pool);
- return err;
- }
- *presult = pool;
- return 0;
-}
-
-int pthreadpool_signal_fd(struct pthreadpool *pool)
-{
- return pool->sig_pipe[0];
-}
-
-static int pthreadpool_write_to_pipe(struct pthreadpool *pool)
-{
- ssize_t written;
-
- if (pool->pipe_busy) {
- return 0;
- }
- if (pool->num_ids == 0) {
- return 0;
- }
+ pool->signal_fn = signal_fn;
+ pool->signal_private_data = signal_private_data;
- written = -1;
- errno = EINTR;
-
- while ((written == -1) && (errno == EINTR)) {
- written = write(pool->sig_pipe[1], &pool->ids[0], sizeof(int));
- }
- if (written == -1) {
- return errno;
- }
- if (written != sizeof(int)) {
- /*
- * If a single int only partially fits into the pipe,
- * we can assume ourselves pretty broken
- */
- close(pool->sig_pipe[1]);
- pool->sig_pipe[1] = -1;
- return EIO;
- }
-
- if (pool->num_ids > 1) {
- memmove(pool->ids, pool->ids+1, sizeof(int) * (pool->num_ids-1));
- }
- pool->num_ids -= 1;
- pool->pipe_busy = 1;
+ *presult = pool;
return 0;
}
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data)
{
- int *tmp;
-
- if (pool->sig_pipe[1] == -1) {
- return EIO;
- }
-
fn(private_data);
- tmp = realloc(pool->ids, sizeof(int) * (pool->num_ids+1));
- if (tmp == NULL) {
- return ENOMEM;
- }
- pool->ids = tmp;
- pool->ids[pool->num_ids] = job_id;
- pool->num_ids += 1;
-
- return pthreadpool_write_to_pipe(pool);
-
-}
-
-int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
- unsigned num_jobids)
-{
- ssize_t to_read, nread;
- int ret;
-
- nread = -1;
- errno = EINTR;
-
- to_read = sizeof(int) * num_jobids;
-
- while ((nread == -1) && (errno == EINTR)) {
- nread = read(pool->sig_pipe[0], jobids, to_read);
- }
- if (nread == -1) {
- return -errno;
- }
- if ((nread % sizeof(int)) != 0) {
- return -EINVAL;
- }
-
- pool->pipe_busy = 0;
-
- ret = pthreadpool_write_to_pipe(pool);
- if (ret != 0) {
- return -ret;
- }
-
- return nread / sizeof(int);
+ return pool->signal_fn(job_id, pool->signal_private_data);
}
int pthreadpool_destroy(struct pthreadpool *pool)
{
- if (pool->sig_pipe[0] != -1) {
- close(pool->sig_pipe[0]);
- pool->sig_pipe[0] = -1;
- }
-
- if (pool->sig_pipe[1] != -1) {
- close(pool->sig_pipe[1]);
- pool->sig_pipe[1] = -1;
- }
- free(pool->ids);
free(pool);
return 0;
}
--
2.1.4
>From dc446e50c8ec336f514bc4ac3e378144c0784c21 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sun, 31 Jul 2016 08:57:35 +0200
Subject: [PATCH 15/23] lib: add job data to to callback
The pthreadpool_tevent wrapper will need this
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
source3/lib/pthreadpool/pthreadpool.c | 19 +++++++++++++------
source3/lib/pthreadpool/pthreadpool.h | 7 +++++--
source3/lib/pthreadpool/pthreadpool_pipe.c | 10 ++++++++--
source3/lib/pthreadpool/pthreadpool_sync.c | 13 +++++++++----
4 files changed, 35 insertions(+), 14 deletions(-)
diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c
index 4c2858a..fc21d43 100644
--- a/source3/lib/pthreadpool/pthreadpool.c
+++ b/source3/lib/pthreadpool/pthreadpool.c
@@ -59,8 +59,11 @@ struct pthreadpool {
/*
* Indicate job completion
*/
- int (*signal_fn)(int jobid, void *private_data);
- void *signal_private_data;
+ int (*signal_fn)(int jobid,
+ void (*job_fn)(void *private_data),
+ void *job_fn_private_data,
+ void *private_data);
+ void *signal_fn_private_data;
/*
* indicator to worker threads that they should shut down
@@ -100,8 +103,11 @@ static void pthreadpool_prep_atfork(void);
*/
int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
- int (*signal_fn)(int jobid, void *private_data),
- void *signal_private_data)
+ int (*signal_fn)(int jobid,
+ void (*job_fn)(void *private_data),
+ void *job_fn_private_data,
+ void *private_data),
+ void *signal_fn_private_data)
{
struct pthreadpool *pool;
int ret;
@@ -111,7 +117,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
return ENOMEM;
}
pool->signal_fn = signal_fn;
- pool->signal_private_data = signal_private_data;
+ pool->signal_fn_private_data = signal_fn_private_data;
pool->jobs_array_len = 4;
pool->jobs = calloc(
@@ -484,7 +490,8 @@ static void *pthreadpool_server(void *arg)
assert(res == 0);
ret = pool->signal_fn(job.id,
- pool->signal_private_data);
+ job.fn, job.private_data,
+ pool->signal_fn_private_data);
if (ret != 0) {
pthreadpool_server_exit(pool);
pthread_mutex_unlock(&pool->mutex);
diff --git a/source3/lib/pthreadpool/pthreadpool.h b/source3/lib/pthreadpool/pthreadpool.h
index 0b8d6e5..ee9d957 100644
--- a/source3/lib/pthreadpool/pthreadpool.h
+++ b/source3/lib/pthreadpool/pthreadpool.h
@@ -44,8 +44,11 @@ struct pthreadpool;
* care to not overload the system.
*/
int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
- int (*signal_fn)(int jobid, void *private_data),
- void *signal_private_data);
+ int (*signal_fn)(int jobid,
+ void (*job_fn)(void *private_data),
+ void *job_fn_private_data,
+ void *private_data),
+ void *signal_fn_private_data);
/**
* @brief Destroy a pthreadpool
diff --git a/source3/lib/pthreadpool/pthreadpool_pipe.c b/source3/lib/pthreadpool/pthreadpool_pipe.c
index 3eaf5e3..f7995ab 100644
--- a/source3/lib/pthreadpool/pthreadpool_pipe.c
+++ b/source3/lib/pthreadpool/pthreadpool_pipe.c
@@ -28,7 +28,10 @@ struct pthreadpool_pipe {
int pipe_fds[2];
};
-static int pthreadpool_pipe_signal(int jobid, void *private_data);
+static int pthreadpool_pipe_signal(int jobid,
+ void (*job_fn)(void *private_data),
+ void *job_private_data,
+ void *private_data);
int pthreadpool_pipe_init(unsigned max_threads,
struct pthreadpool_pipe **presult)
@@ -62,7 +65,10 @@ int pthreadpool_pipe_init(unsigned max_threads,
return 0;
}
-static int pthreadpool_pipe_signal(int jobid, void *private_data)
+static int pthreadpool_pipe_signal(int jobid,
+ void (*job_fn)(void *private_data),
+ void *job_private_data,
+ void *private_data)
{
struct pthreadpool_pipe *pool = private_data;
ssize_t written;
diff --git a/source3/lib/pthreadpool/pthreadpool_sync.c b/source3/lib/pthreadpool/pthreadpool_sync.c
index 3e78f46..d9a95f5 100644
--- a/source3/lib/pthreadpool/pthreadpool_sync.c
+++ b/source3/lib/pthreadpool/pthreadpool_sync.c
@@ -26,14 +26,18 @@ struct pthreadpool {
* Indicate job completion
*/
int (*signal_fn)(int jobid,
+ void (*job_fn)(void *private_data),
+ void *job_fn_private_data,
void *private_data);
- void *signal_private_data;
+ void *signal_fn_private_data;
};
int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
int (*signal_fn)(int jobid,
+ void (*job_fn)(void *private_data),
+ void *job_fn_private_data,
void *private_data),
- void *signal_private_data)
+ void *signal_fn_private_data)
{
struct pthreadpool *pool;
@@ -42,7 +46,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
return ENOMEM;
}
pool->signal_fn = signal_fn;
- pool->signal_private_data = signal_private_data;
+ pool->signal_fn_private_data = signal_fn_private_data;
*presult = pool;
return 0;
@@ -53,7 +57,8 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
{
fn(private_data);
- return pool->signal_fn(job_id, pool->signal_private_data);
+ return pool->signal_fn(job_id, fn, private_data,
+ pool->signal_fn_private_data);
}
int pthreadpool_destroy(struct pthreadpool *pool)
--
2.1.4
>From a4fd4f911c2d0373f230d2cde5adfcedbd3bcf61 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 8 Aug 2016 15:02:36 +0200
Subject: [PATCH 16/23] lib: Add pthreadpool_tevent
This is a replacement for fncall.[ch] without having to go through
a pipe for job completion signalling
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
source3/lib/pthreadpool/pthreadpool_tevent.c | 240 +++++++++++++++++++++++++++
source3/lib/pthreadpool/pthreadpool_tevent.h | 37 +++++
source3/lib/pthreadpool/wscript_build | 14 +-
3 files changed, 287 insertions(+), 4 deletions(-)
create mode 100644 source3/lib/pthreadpool/pthreadpool_tevent.c
create mode 100644 source3/lib/pthreadpool/pthreadpool_tevent.h
diff --git a/source3/lib/pthreadpool/pthreadpool_tevent.c b/source3/lib/pthreadpool/pthreadpool_tevent.c
new file mode 100644
index 0000000..02a7f2f
--- /dev/null
+++ b/source3/lib/pthreadpool/pthreadpool_tevent.c
@@ -0,0 +1,240 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * threadpool implementation based on pthreads
+ * Copyright (C) Volker Lendecke 2009,2011
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "replace.h"
+#include "pthreadpool_tevent.h"
+#include "pthreadpool.h"
+#include "lib/util/tevent_unix.h"
+#include "lib/util/dlinklist.h"
+
+struct pthreadpool_tevent_job_state;
+
+struct pthreadpool_tevent {
+ struct pthreadpool *pool;
+
+ struct pthreadpool_tevent_job_state *jobs;
+};
+
+static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
+
+static int pthreadpool_tevent_job_signal(int jobid,
+ void (*job_fn)(void *private_data),
+ void *job_private_data,
+ void *private_data);
+
+int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
+ struct pthreadpool_tevent **presult)
+{
+ struct pthreadpool_tevent *pool;
+ int ret;
+
+ pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
+ if (pool == NULL) {
+ return ENOMEM;
+ }
+
+ ret = pthreadpool_init(max_threads, &pool->pool,
+ pthreadpool_tevent_job_signal, pool);
+ if (ret != 0) {
+ TALLOC_FREE(pool);
+ return ret;
+ }
+
+ talloc_set_destructor(pool, pthreadpool_tevent_destructor);
+
+ *presult = pool;
+ return 0;
+}
+
+static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
+{
+ int ret;
+
+ ret = pthreadpool_destroy(pool->pool);
+ if (ret != 0) {
+ return ret;
+ }
+ pool->pool = NULL;
+
+ if (pool->jobs != NULL) {
+ abort();
+ }
+
+ return 0;
+}
+
+struct pthreadpool_tevent_job_state {
+ struct pthreadpool_tevent_job_state *prev, *next;
+ struct pthreadpool_tevent *pool;
+ struct tevent_context *ev;
+ struct tevent_threaded_context *tctx;
+ struct tevent_immediate *im;
+ struct tevent_req *req;
+
+ void (*fn)(void *private_data);
+ void *private_data;
+};
+
+static void pthreadpool_tevent_job_fn(void *private_data);
+static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
+ struct tevent_immediate *im,
+ void *private_data);
+
+static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job_state *state)
+{
+ if (state->pool == NULL) {
+ return 0;
+ }
+
+ /*
+ * We should never be called with state->req != NULL,
+ * state->pool must be cleared before the 2nd talloc_free().
+ */
+ if (state->req != NULL) {
+ abort();
+ }
+
+ /*
+ * We need to reparent to a long term context.
+ */
+ (void)talloc_reparent(state->req, state->pool, state);
+ state->req = NULL;
+ return -1;
+}
+
+struct tevent_req *pthreadpool_tevent_job_send(
+ TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+ struct pthreadpool_tevent *pool,
+ void (*fn)(void *private_data), void *private_data)
+{
+ struct tevent_req *req;
+ struct pthreadpool_tevent_job_state *state;
+ int ret;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct pthreadpool_tevent_job_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ state->pool = pool;
+ state->ev = ev;
+ state->req = req;
+ state->fn = fn;
+ state->private_data = private_data;
+
+ state->im = tevent_create_immediate(state);
+ if (tevent_req_nomem(state->im, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+#ifdef HAVE_PTHREAD
+ state->tctx = tevent_threaded_context_create(state, ev);
+ if (state->tctx == NULL && errno == ENOSYS) {
+ /*
+ * Samba build with pthread support but
+ * tevent without???
+ */
+ tevent_req_error(req, ENOSYS);
+ return tevent_req_post(req, ev);
+ }
+ if (tevent_req_nomem(state->tctx, req)) {
+ return tevent_req_post(req, ev);
+ }
+#endif
+
+ ret = pthreadpool_add_job(pool->pool, 0,
+ pthreadpool_tevent_job_fn,
+ state);
+ if (tevent_req_error(req, ret)) {
+ return tevent_req_post(req, ev);
+ }
+
+ /*
+ * Once the job is scheduled, we need to protect
+ * our memory.
+ */
+ talloc_set_destructor(state, pthreadpool_tevent_job_destructor);
+
+ DLIST_ADD_END(pool->jobs, state);
+
+ return req;
+}
+
+static void pthreadpool_tevent_job_fn(void *private_data)
+{
+ struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
+ private_data, struct pthreadpool_tevent_job_state);
+ state->fn(state->private_data);
+}
+
+static int pthreadpool_tevent_job_signal(int jobid,
+ void (*job_fn)(void *private_data),
+ void *job_private_data,
+ void *private_data)
+{
+ struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
+ job_private_data, struct pthreadpool_tevent_job_state);
+
+ if (state->tctx != NULL) {
+ /* with HAVE_PTHREAD */
+ tevent_threaded_schedule_immediate(state->tctx, state->im,
+ pthreadpool_tevent_job_done,
+ state);
+ } else {
+ /* without HAVE_PTHREAD */
+ tevent_schedule_immediate(state->im, state->ev,
+ pthreadpool_tevent_job_done,
+ state);
+ }
+
+ return 0;
+}
+
+static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
+ struct tevent_immediate *im,
+ void *private_data)
+{
+ struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
+ private_data, struct pthreadpool_tevent_job_state);
+
+ DLIST_REMOVE(state->pool->jobs, state);
+ state->pool = NULL;
+
+ TALLOC_FREE(state->tctx);
+
+ if (state->req == NULL) {
+ /*
+ * There was a talloc_free() state->req
+ * while the job was pending,
+ * which mean we're reparented on a longterm
+ * talloc context.
+ *
+ * We just cleanup here...
+ */
+ talloc_free(state);
+ return;
+ }
+
+ tevent_req_done(state->req);
+}
+
+int pthreadpool_tevent_job_recv(struct tevent_req *req)
+{
+ return tevent_req_simple_recv_unix(req);
+}
diff --git a/source3/lib/pthreadpool/pthreadpool_tevent.h b/source3/lib/pthreadpool/pthreadpool_tevent.h
new file mode 100644
index 0000000..de74a34
--- /dev/null
+++ b/source3/lib/pthreadpool/pthreadpool_tevent.h
@@ -0,0 +1,37 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * threadpool implementation based on pthreads
+ * Copyright (C) Volker Lendecke 2016
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __PTHREADPOOL_TEVENT_H__
+#define __PTHREADPOOL_TEVENT_H__
+
+#include <tevent.h>
+
+struct pthreadpool_tevent;
+
+int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
+ struct pthreadpool_tevent **presult);
+
+struct tevent_req *pthreadpool_tevent_job_send(
+ TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+ struct pthreadpool_tevent *pool,
+ void (*fn)(void *private_data), void *private_data);
+
+int pthreadpool_tevent_job_recv(struct tevent_req *req);
+
+#endif
diff --git a/source3/lib/pthreadpool/wscript_build b/source3/lib/pthreadpool/wscript_build
index aa02850..8195af7 100644
--- a/source3/lib/pthreadpool/wscript_build
+++ b/source3/lib/pthreadpool/wscript_build
@@ -2,12 +2,18 @@
if bld.env.WITH_PTHREADPOOL:
bld.SAMBA3_SUBSYSTEM('PTHREADPOOL',
- source='pthreadpool.c pthreadpool_pipe.c',
- deps='pthread rt replace')
+ source='''pthreadpool.c
+ pthreadpool_pipe.c
+ pthreadpool_tevent.c
+ ''',
+ deps='pthread rt replace tevent-util')
else:
bld.SAMBA3_SUBSYSTEM('PTHREADPOOL',
- source='pthreadpool_sync.c pthreadpool_pipe.c',
- deps='replace')
+ source='''pthreadpool_sync.c
+ pthreadpool_pipe.c
+ pthreadpool_tevent.c
+ ''',
+ deps='replace tevent-util')
bld.SAMBA3_BINARY('pthreadpooltest',
--
2.1.4
>From 037ffd0b7740d40015bbe8c95aa329113d4861cb Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 8 Aug 2016 15:04:39 +0200
Subject: [PATCH 17/23] smbtorture3: Add LOCAL-PTHREADPOOL-TEVENT
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
source3/selftest/tests.py | 1 +
source3/torture/proto.h | 1 +
source3/torture/test_pthreadpool_tevent.c | 82 +++++++++++++++++++++++++++++++
source3/torture/torture.c | 1 +
source3/wscript_build | 1 +
5 files changed, 86 insertions(+)
create mode 100644 source3/torture/test_pthreadpool_tevent.c
diff --git a/source3/selftest/tests.py b/source3/selftest/tests.py
index 0a0cb08..dbd4c58 100755
--- a/source3/selftest/tests.py
+++ b/source3/selftest/tests.py
@@ -114,6 +114,7 @@ local_tests = [
"LOCAL-MESSAGING-FDPASS2",
"LOCAL-MESSAGING-FDPASS2a",
"LOCAL-MESSAGING-FDPASS2b",
+ "LOCAL-PTHREADPOOL-TEVENT",
"LOCAL-hex_encode_buf",
"LOCAL-sprintf_append",
"LOCAL-remove_duplicate_addrs2"]
diff --git a/source3/torture/proto.h b/source3/torture/proto.h
index fc7c33f..7d2dedd 100644
--- a/source3/torture/proto.h
+++ b/source3/torture/proto.h
@@ -121,5 +121,6 @@ bool run_messaging_fdpass2(int dummy);
bool run_messaging_fdpass2a(int dummy);
bool run_messaging_fdpass2b(int dummy);
bool run_oplock_cancel(int dummy);
+bool run_pthreadpool_tevent(int dummy);
#endif /* __TORTURE_H__ */
diff --git a/source3/torture/test_pthreadpool_tevent.c b/source3/torture/test_pthreadpool_tevent.c
new file mode 100644
index 0000000..c90a394
--- /dev/null
+++ b/source3/torture/test_pthreadpool_tevent.c
@@ -0,0 +1,82 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Test pthreadpool_tevent
+ * Copyright (C) Volker Lendecke 2016
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "includes.h"
+#include "system/select.h"
+#include "proto.h"
+#include "lib/pthreadpool/pthreadpool_tevent.h"
+
+static void job_fn(void *private_data);
+
+bool run_pthreadpool_tevent(int dummy)
+{
+ struct tevent_context *ev;
+ struct pthreadpool_tevent *pool;
+ struct tevent_req *req;
+ int ret, val;
+ bool ok;
+
+ ev = tevent_context_init_byname(NULL, "poll");
+ if (ev == NULL) {
+ fprintf(stderr, "tevent_context_init failed\n");
+ return false;
+ }
+
+ ret = pthreadpool_tevent_init(ev, 100, &pool);
+ if (ret != 0) {
+ fprintf(stderr, "pthreadpool_tevent_init failed: %s\n",
+ strerror(ret));
+ return false;
+ }
+
+ val = -1;
+
+ req = pthreadpool_tevent_job_send(ev, ev, pool, job_fn, &val);
+ if (req == NULL) {
+ fprintf(stderr, "pthreadpool_tevent_job_send failed\n");
+ return false;
+ }
+
+ ok = tevent_req_poll(req, ev);
+ if (!ok) {
+ fprintf(stderr, "tevent_req_poll failed\n");
+ return false;
+ }
+
+ ret = pthreadpool_tevent_job_recv(req);
+ if (ret != 0) {
+ fprintf(stderr, "pthreadpool_tevent_job failed: %s\n",
+ strerror(ret));
+ return false;
+ }
+
+ printf("%d\n", val);
+
+ TALLOC_FREE(pool);
+ TALLOC_FREE(ev);
+ return true;
+}
+
+static void job_fn(void *private_data)
+{
+ int *pret = private_data;
+ *pret = 4711;
+
+ poll(NULL, 0, 100);
+}
diff --git a/source3/torture/torture.c b/source3/torture/torture.c
index f9766bb..06b919e 100644
--- a/source3/torture/torture.c
+++ b/source3/torture/torture.c
@@ -10534,6 +10534,7 @@ static struct {
{ "local-tdb-writer", run_local_tdb_writer, 0 },
{ "LOCAL-DBWRAP-CTDB", run_local_dbwrap_ctdb, 0 },
{ "LOCAL-BENCH-PTHREADPOOL", run_bench_pthreadpool, 0 },
+ { "LOCAL-PTHREADPOOL-TEVENT", run_pthreadpool_tevent, 0 },
{ "qpathinfo-bufsize", run_qpathinfo_bufsize, 0 },
{NULL, NULL, 0}};
diff --git a/source3/wscript_build b/source3/wscript_build
index edf921c..1d6f043 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -1288,6 +1288,7 @@ bld.SAMBA3_BINARY('smbtorture' + bld.env.suffix3,
torture/test_messaging_read.c
torture/test_messaging_fd_passing.c
torture/test_oplock_cancel.c
+ torture/test_pthreadpool_tevent.c
torture/t_strappend.c
torture/bench_pthreadpool.c
torture/wbc_async.c''',
--
2.1.4
>From 97c2484a2e83b66ce288c7f5d9182776eb92c597 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 8 Aug 2016 15:07:30 +0200
Subject: [PATCH 18/23] smbd: Add pthreadpool_tevent to smbd_server_connection
Prerequisite to convert the vfs _send/recv functions
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
source3/modules/vfs_default.c | 15 +++++++++++++++
source3/smbd/globals.h | 4 ++++
2 files changed, 19 insertions(+)
diff --git a/source3/modules/vfs_default.c b/source3/modules/vfs_default.c
index 5227e95..8021319 100644
--- a/source3/modules/vfs_default.c
+++ b/source3/modules/vfs_default.c
@@ -33,6 +33,7 @@
#include "lib/asys/asys.h"
#include "lib/util/tevent_ntstatus.h"
#include "lib/util/sys_rw.h"
+#include "lib/pthreadpool/pthreadpool_tevent.h"
#undef DBGC_CLASS
#define DBGC_CLASS DBGC_VFS
@@ -750,6 +751,20 @@ fail:
return false;
}
+static int vfswrap_init_pool(struct smbd_server_connection *conn)
+{
+ int ret;
+
+ if (conn->pool != NULL) {
+ return 0;
+ }
+
+ ret = pthreadpool_tevent_init(conn, lp_aio_max_threads(),
+ &conn->pool);
+ return ret;
+}
+
+
struct vfswrap_asys_state {
struct asys_context *asys_ctx;
struct tevent_req *req;
diff --git a/source3/smbd/globals.h b/source3/smbd/globals.h
index 8ba564d..61d85cb 100644
--- a/source3/smbd/globals.h
+++ b/source3/smbd/globals.h
@@ -854,6 +854,8 @@ struct user_struct {
struct smbXsrv_session *session;
};
+struct pthreadpool_tevent;
+
struct smbd_server_connection {
const struct tsocket_address *local_address;
const struct tsocket_address *remote_address;
@@ -925,6 +927,8 @@ struct smbd_server_connection {
struct asys_context *asys_ctx;
struct tevent_fd *asys_fde;
+ struct pthreadpool_tevent *pool;
+
struct smbXsrv_client *client;
};
--
2.1.4
>From 5bc65cceca2c776e0a2506286d8f884ab3b9f8b2 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sat, 6 Aug 2016 21:58:09 +0200
Subject: [PATCH 19/23] vfs: Convert vfs_pread_send to pthreadpool_tevent
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
source3/modules/vfs_default.c | 106 +++++++++++++++++++++++++++++++++++++-----
1 file changed, 94 insertions(+), 12 deletions(-)
diff --git a/source3/modules/vfs_default.c b/source3/modules/vfs_default.c
index 8021319..4007395 100644
--- a/source3/modules/vfs_default.c
+++ b/source3/modules/vfs_default.c
@@ -780,6 +780,21 @@ static int vfswrap_asys_state_destructor(struct vfswrap_asys_state *s)
return 0;
}
+struct vfswrap_pread_state {
+ ssize_t ret;
+ int err;
+ int fd;
+ void *buf;
+ size_t count;
+ off_t offset;
+
+ struct vfs_aio_state vfs_aio_state;
+ SMBPROFILE_BYTES_ASYNC_STATE(profile_bytes);
+};
+
+static void vfs_pread_do(void *private_data);
+static void vfs_pread_done(struct tevent_req *subreq);
+
static struct tevent_req *vfswrap_pread_send(struct vfs_handle_struct *handle,
TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
@@ -787,33 +802,100 @@ static struct tevent_req *vfswrap_pread_send(struct vfs_handle_struct *handle,
void *data,
size_t n, off_t offset)
{
- struct tevent_req *req;
- struct vfswrap_asys_state *state;
+ struct tevent_req *req, *subreq;
+ struct vfswrap_pread_state *state;
int ret;
- req = tevent_req_create(mem_ctx, &state, struct vfswrap_asys_state);
+ req = tevent_req_create(mem_ctx, &state, struct vfswrap_pread_state);
if (req == NULL) {
return NULL;
}
- if (!vfswrap_init_asys_ctx(handle->conn->sconn)) {
- tevent_req_oom(req);
+
+ ret = vfswrap_init_pool(handle->conn->sconn);
+ if (tevent_req_error(req, ret)) {
return tevent_req_post(req, ev);
}
- state->asys_ctx = handle->conn->sconn->asys_ctx;
- state->req = req;
+
+ state->ret = -1;
+ state->fd = fsp->fh->fd;
+ state->buf = data;
+ state->count = n;
+ state->offset = offset;
SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pread, profile_p,
state->profile_bytes, n);
- ret = asys_pread(state->asys_ctx, fsp->fh->fd, data, n, offset, req);
- if (ret != 0) {
- tevent_req_error(req, ret);
+ SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
+
+ subreq = pthreadpool_tevent_job_send(
+ state, ev, handle->conn->sconn->pool,
+ vfs_pread_do, state);
+ if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
- talloc_set_destructor(state, vfswrap_asys_state_destructor);
+ tevent_req_set_callback(subreq, vfs_pread_done, req);
return req;
}
+static void vfs_pread_do(void *private_data)
+{
+ struct vfswrap_pread_state *state = talloc_get_type_abort(
+ private_data, struct vfswrap_pread_state);
+ struct timespec start_time;
+ struct timespec end_time;
+
+ SMBPROFILE_BYTES_ASYNC_SET_BUSY(state->profile_bytes);
+
+ PROFILE_TIMESTAMP(&start_time);
+
+ do {
+ state->ret = pread(state->fd, state->buf, state->count,
+ state->offset);
+ } while ((state->ret == -1) && (errno == EINTR));
+
+ state->err = errno;
+
+ PROFILE_TIMESTAMP(&end_time);
+
+ state->vfs_aio_state.duration = nsec_time_diff(&end_time, &start_time);
+
+ SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
+}
+
+static void vfs_pread_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+#ifdef WITH_PROFILE
+ struct vfswrap_pread_state *state = tevent_req_data(
+ req, struct vfswrap_pread_state);
+#endif
+ int ret;
+
+ ret = pthreadpool_tevent_job_recv(subreq);
+ TALLOC_FREE(subreq);
+ SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes);
+ if (tevent_req_error(req, ret)) {
+ return;
+ }
+
+ tevent_req_done(req);
+}
+
+static ssize_t vfswrap_pread_recv(struct tevent_req *req,
+ struct vfs_aio_state *vfs_aio_state)
+{
+ struct vfswrap_pread_state *state = tevent_req_data(
+ req, struct vfswrap_pread_state);
+
+ if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
+ return -1;
+ }
+
+ *vfs_aio_state = state->vfs_aio_state;
+ return state->ret;
+}
+
static struct tevent_req *vfswrap_pwrite_send(struct vfs_handle_struct *handle,
TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
@@ -2660,7 +2742,7 @@ static struct vfs_fn_pointers vfs_default_fns = {
.read_fn = vfswrap_read,
.pread_fn = vfswrap_pread,
.pread_send_fn = vfswrap_pread_send,
- .pread_recv_fn = vfswrap_asys_ssize_t_recv,
+ .pread_recv_fn = vfswrap_pread_recv,
.write_fn = vfswrap_write,
.pwrite_fn = vfswrap_pwrite,
.pwrite_send_fn = vfswrap_pwrite_send,
--
2.1.4
>From 1ee198f7dc46d8627842fee1e5bedddff721b2e9 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sun, 7 Aug 2016 15:44:52 +0200
Subject: [PATCH 20/23] vfs: Convert vfs_write_send to pthreadpool_tevent
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source3/modules/vfs_default.c | 106 +++++++++++++++++++++++++++++++++++++-----
1 file changed, 94 insertions(+), 12 deletions(-)
diff --git a/source3/modules/vfs_default.c b/source3/modules/vfs_default.c
index 4007395..8825e7b 100644
--- a/source3/modules/vfs_default.c
+++ b/source3/modules/vfs_default.c
@@ -896,6 +896,21 @@ static ssize_t vfswrap_pread_recv(struct tevent_req *req,
return state->ret;
}
+struct vfswrap_pwrite_state {
+ ssize_t ret;
+ int err;
+ int fd;
+ const void *buf;
+ size_t count;
+ off_t offset;
+
+ struct vfs_aio_state vfs_aio_state;
+ SMBPROFILE_BYTES_ASYNC_STATE(profile_bytes);
+};
+
+static void vfs_pwrite_do(void *private_data);
+static void vfs_pwrite_done(struct tevent_req *subreq);
+
static struct tevent_req *vfswrap_pwrite_send(struct vfs_handle_struct *handle,
TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
@@ -903,33 +918,100 @@ static struct tevent_req *vfswrap_pwrite_send(struct vfs_handle_struct *handle,
const void *data,
size_t n, off_t offset)
{
- struct tevent_req *req;
- struct vfswrap_asys_state *state;
+ struct tevent_req *req, *subreq;
+ struct vfswrap_pwrite_state *state;
int ret;
- req = tevent_req_create(mem_ctx, &state, struct vfswrap_asys_state);
+ req = tevent_req_create(mem_ctx, &state, struct vfswrap_pwrite_state);
if (req == NULL) {
return NULL;
}
- if (!vfswrap_init_asys_ctx(handle->conn->sconn)) {
- tevent_req_oom(req);
+
+ ret = vfswrap_init_pool(handle->conn->sconn);
+ if (tevent_req_error(req, ret)) {
return tevent_req_post(req, ev);
}
- state->asys_ctx = handle->conn->sconn->asys_ctx;
- state->req = req;
+
+ state->ret = -1;
+ state->fd = fsp->fh->fd;
+ state->buf = data;
+ state->count = n;
+ state->offset = offset;
SMBPROFILE_BYTES_ASYNC_START(syscall_asys_pwrite, profile_p,
state->profile_bytes, n);
- ret = asys_pwrite(state->asys_ctx, fsp->fh->fd, data, n, offset, req);
- if (ret != 0) {
- tevent_req_error(req, ret);
+ SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
+
+ subreq = pthreadpool_tevent_job_send(
+ state, ev, handle->conn->sconn->pool,
+ vfs_pwrite_do, state);
+ if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
- talloc_set_destructor(state, vfswrap_asys_state_destructor);
+ tevent_req_set_callback(subreq, vfs_pwrite_done, req);
return req;
}
+static void vfs_pwrite_do(void *private_data)
+{
+ struct vfswrap_pwrite_state *state = talloc_get_type_abort(
+ private_data, struct vfswrap_pwrite_state);
+ struct timespec start_time;
+ struct timespec end_time;
+
+ SMBPROFILE_BYTES_ASYNC_SET_BUSY(state->profile_bytes);
+
+ PROFILE_TIMESTAMP(&start_time);
+
+ do {
+ state->ret = pwrite(state->fd, state->buf, state->count,
+ state->offset);
+ } while ((state->ret == -1) && (errno == EINTR));
+
+ state->err = errno;
+
+ PROFILE_TIMESTAMP(&end_time);
+
+ state->vfs_aio_state.duration = nsec_time_diff(&end_time, &start_time);
+
+ SMBPROFILE_BYTES_ASYNC_SET_IDLE(state->profile_bytes);
+}
+
+static void vfs_pwrite_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+#ifdef WITH_PROFILE
+ struct vfswrap_pwrite_state *state = tevent_req_data(
+ req, struct vfswrap_pwrite_state);
+#endif
+ int ret;
+
+ ret = pthreadpool_tevent_job_recv(subreq);
+ TALLOC_FREE(subreq);
+ SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes);
+ if (tevent_req_error(req, ret)) {
+ return;
+ }
+
+ tevent_req_done(req);
+}
+
+static ssize_t vfswrap_pwrite_recv(struct tevent_req *req,
+ struct vfs_aio_state *vfs_aio_state)
+{
+ struct vfswrap_pwrite_state *state = tevent_req_data(
+ req, struct vfswrap_pwrite_state);
+
+ if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
+ return -1;
+ }
+
+ *vfs_aio_state = state->vfs_aio_state;
+ return state->ret;
+}
+
static struct tevent_req *vfswrap_fsync_send(struct vfs_handle_struct *handle,
TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
@@ -2746,7 +2828,7 @@ static struct vfs_fn_pointers vfs_default_fns = {
.write_fn = vfswrap_write,
.pwrite_fn = vfswrap_pwrite,
.pwrite_send_fn = vfswrap_pwrite_send,
- .pwrite_recv_fn = vfswrap_asys_ssize_t_recv,
+ .pwrite_recv_fn = vfswrap_pwrite_recv,
.lseek_fn = vfswrap_lseek,
.sendfile_fn = vfswrap_sendfile,
.recvfile_fn = vfswrap_recvfile,
--
2.1.4
>From c4a65538eea3cc41b5a0b04f7e79d37293c22106 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sun, 7 Aug 2016 15:53:12 +0200
Subject: [PATCH 21/23] vfs: Convert vfs_fsync_send to pthreadpool_tevent
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
source3/modules/vfs_default.c | 93 +++++++++++++++++++++++++++++++++++++------
1 file changed, 81 insertions(+), 12 deletions(-)
diff --git a/source3/modules/vfs_default.c b/source3/modules/vfs_default.c
index 8825e7b..ea9f6ab 100644
--- a/source3/modules/vfs_default.c
+++ b/source3/modules/vfs_default.c
@@ -1012,38 +1012,107 @@ static ssize_t vfswrap_pwrite_recv(struct tevent_req *req,
return state->ret;
}
+struct vfswrap_fsync_state {
+ ssize_t ret;
+ int err;
+ int fd;
+
+ struct vfs_aio_state vfs_aio_state;
+ SMBPROFILE_BASIC_ASYNC_STATE(profile_basic);
+};
+
+static void vfs_fsync_do(void *private_data);
+static void vfs_fsync_done(struct tevent_req *subreq);
+
static struct tevent_req *vfswrap_fsync_send(struct vfs_handle_struct *handle,
TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct files_struct *fsp)
{
- struct tevent_req *req;
- struct vfswrap_asys_state *state;
+ struct tevent_req *req, *subreq;
+ struct vfswrap_fsync_state *state;
int ret;
- req = tevent_req_create(mem_ctx, &state, struct vfswrap_asys_state);
+ req = tevent_req_create(mem_ctx, &state, struct vfswrap_fsync_state);
if (req == NULL) {
return NULL;
}
- if (!vfswrap_init_asys_ctx(handle->conn->sconn)) {
- tevent_req_oom(req);
+
+ ret = vfswrap_init_pool(handle->conn->sconn);
+ if (tevent_req_error(req, ret)) {
return tevent_req_post(req, ev);
}
- state->asys_ctx = handle->conn->sconn->asys_ctx;
- state->req = req;
+
+ state->ret = -1;
+ state->fd = fsp->fh->fd;
SMBPROFILE_BASIC_ASYNC_START(syscall_asys_fsync, profile_p,
state->profile_basic);
- ret = asys_fsync(state->asys_ctx, fsp->fh->fd, req);
- if (ret != 0) {
- tevent_req_error(req, ret);
+
+ subreq = pthreadpool_tevent_job_send(
+ state, ev, handle->conn->sconn->pool, vfs_fsync_do, state);
+ if (tevent_req_nomem(subreq, req)) {
return tevent_req_post(req, ev);
}
- talloc_set_destructor(state, vfswrap_asys_state_destructor);
+ tevent_req_set_callback(subreq, vfs_fsync_done, req);
return req;
}
+static void vfs_fsync_do(void *private_data)
+{
+ struct vfswrap_fsync_state *state = talloc_get_type_abort(
+ private_data, struct vfswrap_fsync_state);
+ struct timespec start_time;
+ struct timespec end_time;
+
+ PROFILE_TIMESTAMP(&start_time);
+
+ do {
+ state->ret = fsync(state->fd);
+ } while ((state->ret == -1) && (errno == EINTR));
+
+ state->err = errno;
+
+ PROFILE_TIMESTAMP(&end_time);
+
+ state->vfs_aio_state.duration = nsec_time_diff(&end_time, &start_time);
+}
+
+static void vfs_fsync_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+#ifdef WITH_PROFILE
+ struct vfswrap_fsync_state *state = tevent_req_data(
+ req, struct vfswrap_fsync_state);
+#endif
+ int ret;
+
+ ret = pthreadpool_tevent_job_recv(subreq);
+ TALLOC_FREE(subreq);
+ SMBPROFILE_BASIC_ASYNC_END(state->profile_basic);
+ if (tevent_req_error(req, ret)) {
+ return;
+ }
+
+ tevent_req_done(req);
+}
+
+static int vfswrap_fsync_recv(struct tevent_req *req,
+ struct vfs_aio_state *vfs_aio_state)
+{
+ struct vfswrap_fsync_state *state = tevent_req_data(
+ req, struct vfswrap_fsync_state);
+
+ if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
+ return -1;
+ }
+
+ *vfs_aio_state = state->vfs_aio_state;
+ return state->ret;
+}
+
static void vfswrap_asys_finished(struct tevent_context *ev,
struct tevent_fd *fde,
uint16_t flags, void *p)
@@ -2835,7 +2904,7 @@ static struct vfs_fn_pointers vfs_default_fns = {
.rename_fn = vfswrap_rename,
.fsync_fn = vfswrap_fsync,
.fsync_send_fn = vfswrap_fsync_send,
- .fsync_recv_fn = vfswrap_asys_int_recv,
+ .fsync_recv_fn = vfswrap_fsync_recv,
.stat_fn = vfswrap_stat,
.fstat_fn = vfswrap_fstat,
.lstat_fn = vfswrap_lstat,
--
2.1.4
>From db5529cdce637b1c69499c87c371e18d9f62cb88 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sun, 7 Aug 2016 15:59:10 +0200
Subject: [PATCH 22/23] vfs: Remove link to asys_
No longer needed after conversion to pthreadpool_tevent
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
source3/modules/vfs_default.c | 131 ------------------------------------------
source3/smbd/globals.h | 6 --
2 files changed, 137 deletions(-)
diff --git a/source3/modules/vfs_default.c b/source3/modules/vfs_default.c
index ea9f6ab..53199b8 100644
--- a/source3/modules/vfs_default.c
+++ b/source3/modules/vfs_default.c
@@ -30,7 +30,6 @@
#include "source3/include/msdfs.h"
#include "librpc/gen_ndr/ndr_dfsblobs.h"
#include "lib/util/tevent_unix.h"
-#include "lib/asys/asys.h"
#include "lib/util/tevent_ntstatus.h"
#include "lib/util/sys_rw.h"
#include "lib/pthreadpool/pthreadpool_tevent.h"
@@ -706,51 +705,6 @@ static ssize_t vfswrap_pwrite(vfs_handle_struct *handle, files_struct *fsp, cons
return result;
}
-static void vfswrap_asys_finished(struct tevent_context *ev,
- struct tevent_fd *fde,
- uint16_t flags, void *p);
-
-static bool vfswrap_init_asys_ctx(struct smbd_server_connection *conn)
-{
- struct asys_context *ctx;
- struct tevent_fd *fde;
- int ret;
- int fd;
-
- if (conn->asys_ctx != NULL) {
- return true;
- }
-
- ret = asys_context_init(&ctx, lp_aio_max_threads());
- if (ret != 0) {
- DEBUG(1, ("asys_context_init failed: %s\n", strerror(ret)));
- return false;
- }
-
- fd = asys_signalfd(ctx);
-
- ret = set_blocking(fd, false);
- if (ret != 0) {
- DBG_WARNING("set_blocking failed: %s\n", strerror(errno));
- goto fail;
- }
-
- fde = tevent_add_fd(conn->ev_ctx, conn, fd, TEVENT_FD_READ,
- vfswrap_asys_finished, ctx);
- if (fde == NULL) {
- DEBUG(1, ("tevent_add_fd failed\n"));
- goto fail;
- }
-
- conn->asys_ctx = ctx;
- conn->asys_fde = fde;
- return true;
-
-fail:
- asys_context_destroy(ctx);
- return false;
-}
-
static int vfswrap_init_pool(struct smbd_server_connection *conn)
{
int ret;
@@ -764,22 +718,6 @@ static int vfswrap_init_pool(struct smbd_server_connection *conn)
return ret;
}
-
-struct vfswrap_asys_state {
- struct asys_context *asys_ctx;
- struct tevent_req *req;
- ssize_t ret;
- struct vfs_aio_state vfs_aio_state;
- SMBPROFILE_BASIC_ASYNC_STATE(profile_basic);
- SMBPROFILE_BYTES_ASYNC_STATE(profile_bytes);
-};
-
-static int vfswrap_asys_state_destructor(struct vfswrap_asys_state *s)
-{
- asys_cancel(s->asys_ctx, s->req);
- return 0;
-}
-
struct vfswrap_pread_state {
ssize_t ret;
int err;
@@ -1113,75 +1051,6 @@ static int vfswrap_fsync_recv(struct tevent_req *req,
return state->ret;
}
-static void vfswrap_asys_finished(struct tevent_context *ev,
- struct tevent_fd *fde,
- uint16_t flags, void *p)
-{
- struct asys_context *asys_ctx = (struct asys_context *)p;
- struct asys_result results[get_outstanding_aio_calls()];
- int i, ret;
-
- if ((flags & TEVENT_FD_READ) == 0) {
- return;
- }
-
- ret = asys_results(asys_ctx, results, get_outstanding_aio_calls());
- if (ret < 0) {
- DEBUG(1, ("asys_results returned %s\n", strerror(-ret)));
- return;
- }
-
- for (i=0; i<ret; i++) {
- struct asys_result *result = &results[i];
- struct tevent_req *req;
- struct vfswrap_asys_state *state;
-
- if ((result->ret == -1) && (result->err == ECANCELED)) {
- continue;
- }
-
- req = talloc_get_type_abort(result->private_data,
- struct tevent_req);
- state = tevent_req_data(req, struct vfswrap_asys_state);
-
- talloc_set_destructor(state, NULL);
-
- SMBPROFILE_BASIC_ASYNC_END(state->profile_basic);
- SMBPROFILE_BYTES_ASYNC_END(state->profile_bytes);
- state->ret = result->ret;
- state->vfs_aio_state.error = result->err;
- state->vfs_aio_state.duration = result->duration;
- tevent_req_defer_callback(req, ev);
- tevent_req_done(req);
- }
-}
-
-static ssize_t vfswrap_asys_ssize_t_recv(struct tevent_req *req,
- struct vfs_aio_state *vfs_aio_state)
-{
- struct vfswrap_asys_state *state = tevent_req_data(
- req, struct vfswrap_asys_state);
-
- if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
- return -1;
- }
- *vfs_aio_state = state->vfs_aio_state;
- return state->ret;
-}
-
-static int vfswrap_asys_int_recv(struct tevent_req *req,
- struct vfs_aio_state *vfs_aio_state)
-{
- struct vfswrap_asys_state *state = tevent_req_data(
- req, struct vfswrap_asys_state);
-
- if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
- return -1;
- }
- *vfs_aio_state = state->vfs_aio_state;
- return state->ret;
-}
-
static off_t vfswrap_lseek(vfs_handle_struct *handle, files_struct *fsp, off_t offset, int whence)
{
off_t result = 0;
diff --git a/source3/smbd/globals.h b/source3/smbd/globals.h
index 61d85cb..76ce0bb 100644
--- a/source3/smbd/globals.h
+++ b/source3/smbd/globals.h
@@ -921,12 +921,6 @@ struct smbd_server_connection {
} locks;
} smb2;
- /*
- * Link into libasys for asynchronous operations
- */
- struct asys_context *asys_ctx;
- struct tevent_fd *asys_fde;
-
struct pthreadpool_tevent *pool;
struct smbXsrv_client *client;
--
2.1.4
>From c9ce2063636533b81498503803ff0eb4456fe04c Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sun, 7 Aug 2016 16:03:00 +0200
Subject: [PATCH 23/23] lib: Remove unused source3/lib/asys
Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
---
source3/lib/asys/asys.c | 342 -----------------------------------------
source3/lib/asys/asys.h | 155 -------------------
source3/lib/asys/tests.c | 90 -----------
source3/lib/asys/wscript_build | 10 --
source3/wscript_build | 2 -
5 files changed, 599 deletions(-)
delete mode 100644 source3/lib/asys/asys.c
delete mode 100644 source3/lib/asys/asys.h
delete mode 100644 source3/lib/asys/tests.c
delete mode 100644 source3/lib/asys/wscript_build
diff --git a/source3/lib/asys/asys.c b/source3/lib/asys/asys.c
deleted file mode 100644
index 594d470..0000000
--- a/source3/lib/asys/asys.c
+++ /dev/null
@@ -1,342 +0,0 @@
-/*
- * Async syscalls
- * Copyright (C) Volker Lendecke 2012
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "asys.h"
-#include <stdlib.h>
-#include <errno.h>
-#include "../pthreadpool/pthreadpool_pipe.h"
-#include "lib/util/time.h"
-#include "smbprofile.h"
-
-struct asys_pwrite_args {
- int fildes;
- const void *buf;
- size_t nbyte;
- off_t offset;
-};
-
-struct asys_pread_args {
- int fildes;
- void *buf;
- size_t nbyte;
- off_t offset;
-};
-
-struct asys_fsync_args {
- int fildes;
-};
-
-union asys_job_args {
- struct asys_pwrite_args pwrite_args;
- struct asys_pread_args pread_args;
- struct asys_fsync_args fsync_args;
-};
-
-struct asys_job {
- void *private_data;
- union asys_job_args args;
- ssize_t ret;
- int err;
- char busy;
- char canceled;
- struct timespec start_time;
- struct timespec end_time;
-};
-
-struct asys_context {
- struct pthreadpool_pipe *pool;
- int pthreadpool_pipe_fd;
-
- unsigned num_jobs;
- struct asys_job **jobs;
-};
-
-struct asys_creds_context {
- int dummy;
-};
-
-int asys_context_init(struct asys_context **pctx, unsigned max_parallel)
-{
- struct asys_context *ctx;
- int ret;
-
- ctx = calloc(1, sizeof(struct asys_context));
- if (ctx == NULL) {
- return ENOMEM;
- }
- ret = pthreadpool_pipe_init(max_parallel, &ctx->pool);
- if (ret != 0) {
- free(ctx);
- return ret;
- }
- ctx->pthreadpool_pipe_fd = pthreadpool_pipe_signal_fd(ctx->pool);
-
- *pctx = ctx;
- return 0;
-}
-
-int asys_signalfd(struct asys_context *ctx)
-{
- return ctx->pthreadpool_pipe_fd;
-}
-
-int asys_context_destroy(struct asys_context *ctx)
-{
- int ret;
- unsigned i;
-
- for (i=0; i<ctx->num_jobs; i++) {
- if (ctx->jobs[i]->busy) {
- return EBUSY;
- }
- }
-
- ret = pthreadpool_pipe_destroy(ctx->pool);
- if (ret != 0) {
- return ret;
- }
- for (i=0; i<ctx->num_jobs; i++) {
- free(ctx->jobs[i]);
- }
- free(ctx->jobs);
- free(ctx);
- return 0;
-}
-
-static int asys_new_job(struct asys_context *ctx, int *jobid,
- struct asys_job **pjob)
-{
- struct asys_job **tmp;
- struct asys_job *job;
- unsigned i;
-
- for (i=0; i<ctx->num_jobs; i++) {
- job = ctx->jobs[i];
- if (!job->busy) {
- job->err = 0;
- *pjob = job;
- *jobid = i;
- return 0;
- }
- }
-
- if (ctx->num_jobs+1 == 0) {
- return EBUSY; /* overflow */
- }
-
- tmp = realloc(ctx->jobs, sizeof(struct asys_job *)*(ctx->num_jobs+1));
- if (tmp == NULL) {
- return ENOMEM;
- }
- ctx->jobs = tmp;
-
- job = calloc(1, sizeof(struct asys_job));
- if (job == NULL) {
- return ENOMEM;
- }
- ctx->jobs[ctx->num_jobs] = job;
-
- *jobid = ctx->num_jobs;
- *pjob = job;
- ctx->num_jobs += 1;
- return 0;
-}
-
-static void asys_pwrite_do(void *private_data);
-
-int asys_pwrite(struct asys_context *ctx, int fildes, const void *buf,
- size_t nbyte, off_t offset, void *private_data)
-{
- struct asys_job *job;
- struct asys_pwrite_args *args;
- int jobid;
- int ret;
-
- ret = asys_new_job(ctx, &jobid, &job);
- if (ret != 0) {
- return ret;
- }
- job->private_data = private_data;
-
- args = &job->args.pwrite_args;
- args->fildes = fildes;
- args->buf = buf;
- args->nbyte = nbyte;
- args->offset = offset;
-
- ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_pwrite_do, job);
- if (ret != 0) {
- return ret;
- }
- job->busy = 1;
-
- return 0;
-}
-
-static void asys_pwrite_do(void *private_data)
-{
- struct asys_job *job = (struct asys_job *)private_data;
- struct asys_pwrite_args *args = &job->args.pwrite_args;
-
- PROFILE_TIMESTAMP(&job->start_time);
- job->ret = pwrite(args->fildes, args->buf, args->nbyte, args->offset);
- PROFILE_TIMESTAMP(&job->end_time);
-
- if (job->ret == -1) {
- job->err = errno;
- }
-}
-
-static void asys_pread_do(void *private_data);
-
-int asys_pread(struct asys_context *ctx, int fildes, void *buf,
- size_t nbyte, off_t offset, void *private_data)
-{
- struct asys_job *job;
- struct asys_pread_args *args;
- int jobid;
- int ret;
-
- ret = asys_new_job(ctx, &jobid, &job);
- if (ret != 0) {
- return ret;
- }
- job->private_data = private_data;
-
- args = &job->args.pread_args;
- args->fildes = fildes;
- args->buf = buf;
- args->nbyte = nbyte;
- args->offset = offset;
-
- ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_pread_do, job);
- if (ret != 0) {
- return ret;
- }
- job->busy = 1;
-
- return 0;
-}
-
-static void asys_pread_do(void *private_data)
-{
- struct asys_job *job = (struct asys_job *)private_data;
- struct asys_pread_args *args = &job->args.pread_args;
-
- PROFILE_TIMESTAMP(&job->start_time);
- job->ret = pread(args->fildes, args->buf, args->nbyte, args->offset);
- PROFILE_TIMESTAMP(&job->end_time);
-
- if (job->ret == -1) {
- job->err = errno;
- }
-}
-
-static void asys_fsync_do(void *private_data);
-
-int asys_fsync(struct asys_context *ctx, int fildes, void *private_data)
-{
- struct asys_job *job;
- struct asys_fsync_args *args;
- int jobid;
- int ret;
-
- ret = asys_new_job(ctx, &jobid, &job);
- if (ret != 0) {
- return ret;
- }
- job->private_data = private_data;
-
- args = &job->args.fsync_args;
- args->fildes = fildes;
-
- ret = pthreadpool_pipe_add_job(ctx->pool, jobid, asys_fsync_do, job);
- if (ret != 0) {
- return ret;
- }
- job->busy = 1;
-
- return 0;
-}
-
-static void asys_fsync_do(void *private_data)
-{
- struct asys_job *job = (struct asys_job *)private_data;
- struct asys_fsync_args *args = &job->args.fsync_args;
-
- PROFILE_TIMESTAMP(&job->start_time);
- job->ret = fsync(args->fildes);
- PROFILE_TIMESTAMP(&job->end_time);
-
- if (job->ret == -1) {
- job->err = errno;
- }
-}
-
-void asys_cancel(struct asys_context *ctx, void *private_data)
-{
- unsigned i;
-
- for (i=0; i<ctx->num_jobs; i++) {
- struct asys_job *job = ctx->jobs[i];
-
- if (job->private_data == private_data) {
- job->canceled = 1;
- }
- }
-}
-
-int asys_results(struct asys_context *ctx, struct asys_result *results,
- unsigned num_results)
-{
- int jobids[num_results];
- int i, ret;
-
- ret = pthreadpool_pipe_finished_jobs(ctx->pool, jobids, num_results);
- if (ret <= 0) {
- return ret;
- }
-
- for (i=0; i<ret; i++) {
- struct asys_result *result = &results[i];
- struct asys_job *job;
- int jobid;
-
- jobid = jobids[i];
-
- if ((jobid < 0) || (jobid >= ctx->num_jobs)) {
- return -EIO;
- }
-
- job = ctx->jobs[jobid];
-
- if (job->canceled) {
- result->ret = -1;
- result->err = ECANCELED;
- } else {
- result->ret = job->ret;
- result->err = job->err;
- }
- result->private_data = job->private_data;
- result->duration = nsec_time_diff(&job->end_time, &job->start_time);
-
- job->busy = 0;
- }
-
- return ret;
-}
diff --git a/source3/lib/asys/asys.h b/source3/lib/asys/asys.h
deleted file mode 100644
index f576bd3..0000000
--- a/source3/lib/asys/asys.h
+++ /dev/null
@@ -1,155 +0,0 @@
-/*
- * Async syscalls
- * Copyright (C) Volker Lendecke 2012
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef __ASYS_H__
-#define __ASYS_H__
-
-#include "replace.h"
-#include "system/filesys.h"
-
-/**
- * @defgroup asys The async syscall library
- *
- * This module contains a set of asynchronous functions that directly
- * wrap normally synchronous posix system calls. The reason for this
- * module's existence is the limited set of operations the posix async
- * I/O API provides.
- *
- * The basic flow of operations is:
- *
- * The application creates a asys_context structure using
- * asys_context_create()
- *
- * The application triggers a call to the library by calling for
- * example asys_ftruncate(). asys_ftruncate() takes a private_data
- * argument that will be returned later by asys_result. The calling
- * application should hand a pointer representing the async operation
- * to the private_data argument.
- *
- * The application puts the fd returned by asys_signalfd() into its
- * event loop. When the signal fd becomes readable, the application
- * calls asys_result() to grab the final result of one of the system
- * calls that were issued in the meantime.
- *
- * For multi-user applications it is necessary to create different
- * credential contexts, as it is not clear when exactly the real
- * system call will be issued. The application might have called
- * seteuid(2) or something equivalent in the meantime. Thus, all
- * system calls doing access checks, in particular all calls doing
- * path-based operations, require a struct auth_creds_context
- * parameter. asys_creds_context_create() creates such a context. All
- * credential-checking operations take a struct asys_creds_context as
- * an argument. It can be NULL if the application never changes
- * credentials.
- *
- * @{
- */
-
-struct asys_context;
-struct asys_creds_context;
-
-enum asys_log_level {
- ASYS_LOG_FATAL = 0,
- ASYS_DEBUG_ERROR,
- ASYS_DEBUG_WARNING,
- ASYS_DEBUG_TRACE
-};
-
-#ifndef PRINTF_ATTRIBUTE
-#if (__GNUC__ >= 3)
-/** Use gcc attribute to check printf fns. a1 is the 1-based index of
- * the parameter containing the format, and a2 the index of the first
- * argument. Note that some gcc 2.x versions don't handle this
- * properly **/
-#define PRINTF_ATTRIBUTE(a1, a2) __attribute__ ((format (__printf__, a1, a2)))
-#else
-#define PRINTF_ATTRIBUTE(a1, a2)
-#endif
-#endif
-
-typedef void (*asys_log_fn)(struct asys_context *ctx, void *private_data,
- enum asys_log_level level,
- const char *fmt, ...) PRINTF_ATTRIBUTE(4, 5);
-
-int asys_context_init(struct asys_context **ctx, unsigned max_parallel);
-int asys_context_destroy(struct asys_context *ctx);
-void asys_set_log_fn(struct asys_context *ctx, asys_log_fn fn,
- void *private_data);
-
-/**
- * @brief Get the the signal fd
- *
- * asys_signalfd() returns a file descriptor that will become readable
- * whenever an asynchronous request has finished. When the signalfd is
- * readable, calling asys_result() will not block.
- *
- * @param[in] ctx The asys context
- * @return A file descriptor indicating a finished operation
- */
-
-int asys_signalfd(struct asys_context *ctx);
-
-struct asys_result {
- ssize_t ret;
- int err;
- void *private_data;
- uint64_t duration; /* nanoseconds */
-};
-
-/**
- * @brief Pull the results from async operations
- *
- * Whe the fd returned from asys_signalfd() is readable, one or more async
- * operations have finished. The result from the async operations can be pulled
- * with asys_results().
- *
- * @param[in] ctx The asys context
- * @param[out] results The result strutcts
- * @param[in] num_results The length of the results array
- * @return success: >=0, number of finished jobs
- * failure: -errno
- */
-int asys_results(struct asys_context *ctx, struct asys_result *results,
- unsigned num_results);
-
-void asys_cancel(struct asys_context *ctx, void *private_data);
-
-int asys_pread(struct asys_context *ctx, int fildes, void *buf, size_t nbyte,
- off_t offset, void *private_data);
-int asys_pwrite(struct asys_context *ctx, int fildes, const void *buf,
- size_t nbyte, off_t offset, void *private_data);
-int asys_ftruncate(struct asys_context *ctx, int filedes, off_t length,
- void *private_data);
-int asys_fsync(struct asys_context *ctx, int fd, void *private_data);
-int asys_close(struct asys_context *ctx, int fd, void *private_data);
-
-struct asys_creds_context *asys_creds_context_create(
- struct asys_context *ctx,
- uid_t uid, gid_t gid, unsigned num_gids, gid_t *gids);
-
-int asys_creds_context_delete(struct asys_creds_context *ctx);
-
-int asys_open(struct asys_context *ctx, struct asys_creds_context *cctx,
- const char *pathname, int flags, mode_t mode,
- void *private_data);
-int asys_unlink(struct asys_context *ctx, struct asys_creds_context *cctx,
- const char *pathname, void *private_data);
-
-/* @} */
-
-#endif /* __ASYS_H__ */
diff --git a/source3/lib/asys/tests.c b/source3/lib/asys/tests.c
deleted file mode 100644
index e54e3ea..0000000
--- a/source3/lib/asys/tests.c
+++ /dev/null
@@ -1,90 +0,0 @@
-/*
- * Test async syscalls
- * Copyright (C) Volker Lendecke 2012
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "asys.h"
-#include <stdio.h>
-#include <string.h>
-#include <stdlib.h>
-#include <errno.h>
-
-int main(int argc, const char *argv[])
-{
- struct asys_context *ctx;
- int i, fd, ret;
-
- int *buf;
-
- int ntasks = 10;
-
- ret = asys_context_init(&ctx, 0);
- if (ret != 0) {
- perror("asys_context_create failed");
- return 1;
- }
-
- fd = open("asys_testfile", O_CREAT|O_RDWR, 0644);
- if (fd == -1) {
- perror("open failed");
- return 1;
- }
-
- buf = calloc(ntasks, sizeof(int));
- if (buf == NULL) {
- perror("calloc failed");
- return 1;
- }
-
- for (i=0; i<ntasks; i++) {
- buf[i] = i;
- }
-
- for (i=0; i<ntasks; i++) {
- ret = asys_pwrite(ctx, fd, &buf[i], sizeof(int),
- i * sizeof(int), &buf[i]);
- if (ret != 0) {
- errno = ret;
- perror("asys_pwrite failed");
- return 1;
- }
- }
-
- for (i=0; i<ntasks; i++) {
- struct asys_result result;
- int *pidx;
-
- ret = asys_results(ctx, &result, 1);
- if (ret < 0) {
- errno = -ret;
- perror("asys_result failed");
- return 1;
- }
- pidx = (int *)result.private_data;
-
- printf("%d returned %d\n", *pidx, (int)result.ret);
- }
-
- ret = asys_context_destroy(ctx);
- if (ret != 0) {
- perror("asys_context_delete failed");
- return 1;
- }
-
- free(buf);
-
- return 0;
-}
diff --git a/source3/lib/asys/wscript_build b/source3/lib/asys/wscript_build
deleted file mode 100644
index 520994f..0000000
--- a/source3/lib/asys/wscript_build
+++ /dev/null
@@ -1,10 +0,0 @@
-#!/usr/bin/env python
-
-bld.SAMBA3_SUBSYSTEM('LIBASYS',
- source='asys.c',
- deps='PTHREADPOOL samba-util')
-
-bld.SAMBA3_BINARY('asystest',
- source='tests.c',
- deps='LIBASYS',
- install=False)
diff --git a/source3/wscript_build b/source3/wscript_build
index 1d6f043..69b7371 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -629,7 +629,6 @@ bld.SAMBA3_LIBRARY('smbd_base',
RPC_SERVICE
NDR_SMBXSRV
LEASES_DB
- LIBASYS
sysquotas
NDR_SMB_ACL
netapi
@@ -1513,7 +1512,6 @@ bld.SAMBA3_BINARY('spotlight2sparql',
bld.RECURSE('auth')
bld.RECURSE('libgpo/gpext')
bld.RECURSE('lib/pthreadpool')
-bld.RECURSE('lib/asys')
bld.RECURSE('lib/unix_msg')
bld.RECURSE('librpc')
bld.RECURSE('librpc/idl')
--
2.1.4
More information about the samba-technical
mailing list