[PATCH] restructure messaging
Volker Lendecke
vl at samba.org
Wed Sep 28 23:33:25 UTC 2016
Hi!
Looking for a scary patchset? Here it is....
The main reason for this patchset is the last two patches: They
pass down the event context that triggered a receiving message
callback to the handlers, and the handlers only act upon messages in the right context.
On the way there I've eliminated poll_funcs and made our pthreadpool
safe against run-down with blocked threads. It's the last part that
again and again reminded me of
http://bholley.net/blog/2015/must-be-this-tall-to-write-multi-threaded-code.html
Consider it WIP, it has to survive a few autobuilds and the
tevent version needs to be bumped.
Review appreciated!
Thanks, Volker
-------------- next part --------------
>From c5182348fabb078ced166885036bd8d9d73b8fed Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 23 Sep 2016 18:43:04 -0700
Subject: [PATCH 01/26] messaging4: Fix signed/unsigned hiccups
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source4/lib/messaging/messaging.c | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
index d0beef6..c0b64be 100644
--- a/source4/lib/messaging/messaging.c
+++ b/source4/lib/messaging/messaging.c
@@ -149,7 +149,7 @@ NTSTATUS imessaging_register(struct imessaging_context *msg, void *private_data,
/* possibly expand dispatch array */
if (msg_type >= msg->num_types) {
struct dispatch_fn **dp;
- int i;
+ uint32_t i;
dp = talloc_realloc(msg, msg->dispatch, struct dispatch_fn *, msg_type+1);
NT_STATUS_HAVE_NO_MEMORY(dp);
msg->dispatch = dp;
@@ -728,7 +728,7 @@ static int all_servers_func(const char *name, unsigned num_servers,
struct irpc_name_records *name_records = talloc_get_type(
private_data, struct irpc_name_records);
struct irpc_name_record *name_record;
- int i;
+ uint32_t i;
name_records->names
= talloc_realloc(name_records, name_records->names,
--
2.7.4
>From c18ced932b47b066a47964f41dbfd999dbdc5142 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 31 Aug 2016 15:03:16 +0200
Subject: [PATCH 02/26] tevent: Factor out tevent_common_insert_timer
Signed-off-by: Volker Lendecke <vl at samba.org>
---
lib/tevent/tevent_timed.c | 70 +++++++++++++++++++++++++++--------------------
1 file changed, 40 insertions(+), 30 deletions(-)
diff --git a/lib/tevent/tevent_timed.c b/lib/tevent/tevent_timed.c
index 920d39f..bb0160c 100644
--- a/lib/tevent/tevent_timed.c
+++ b/lib/tevent/tevent_timed.c
@@ -154,39 +154,13 @@ static int tevent_common_timed_deny_destructor(struct tevent_timer *te)
return -1;
}
-/*
- add a timed event
- return NULL on failure (memory allocation error)
-*/
-static struct tevent_timer *tevent_common_add_timer_internal(
- struct tevent_context *ev,
- TALLOC_CTX *mem_ctx,
- struct timeval next_event,
- tevent_timer_handler_t handler,
- void *private_data,
- const char *handler_name,
- const char *location,
- bool optimize_zero)
+static void tevent_common_insert_timer(struct tevent_context *ev,
+ struct tevent_timer *te,
+ bool optimize_zero)
{
- struct tevent_timer *te, *prev_te, *cur_te;
-
- te = talloc(mem_ctx?mem_ctx:ev, struct tevent_timer);
- if (te == NULL) return NULL;
-
- te->event_ctx = ev;
- te->next_event = next_event;
- te->handler = handler;
- te->private_data = private_data;
- te->handler_name = handler_name;
- te->location = location;
- te->additional_data = NULL;
-
- if (ev->timer_events == NULL) {
- ev->last_zero_timer = NULL;
- }
+ struct tevent_timer *prev_te = NULL;
/* keep the list ordered */
- prev_te = NULL;
if (optimize_zero && tevent_timeval_is_zero(&te->next_event)) {
/*
* Some callers use zero tevent_timer
@@ -199,6 +173,8 @@ static struct tevent_timer *tevent_common_add_timer_internal(
prev_te = ev->last_zero_timer;
ev->last_zero_timer = te;
} else {
+ struct tevent_timer *cur_te;
+
/*
* we traverse the list from the tail
* because it's much more likely that
@@ -227,6 +203,40 @@ static struct tevent_timer *tevent_common_add_timer_internal(
}
DLIST_ADD_AFTER(ev->timer_events, te, prev_te);
+}
+
+/*
+ add a timed event
+ return NULL on failure (memory allocation error)
+*/
+static struct tevent_timer *tevent_common_add_timer_internal(
+ struct tevent_context *ev,
+ TALLOC_CTX *mem_ctx,
+ struct timeval next_event,
+ tevent_timer_handler_t handler,
+ void *private_data,
+ const char *handler_name,
+ const char *location,
+ bool optimize_zero)
+{
+ struct tevent_timer *te;
+
+ te = talloc(mem_ctx?mem_ctx:ev, struct tevent_timer);
+ if (te == NULL) return NULL;
+
+ te->event_ctx = ev;
+ te->next_event = next_event;
+ te->handler = handler;
+ te->private_data = private_data;
+ te->handler_name = handler_name;
+ te->location = location;
+ te->additional_data = NULL;
+
+ if (ev->timer_events == NULL) {
+ ev->last_zero_timer = NULL;
+ }
+
+ tevent_common_insert_timer(ev, te, optimize_zero);
talloc_set_destructor(te, tevent_common_timed_destructor);
--
2.7.4
>From ef4331804f81db860671f11409baa1db1a77d4ed Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 31 Aug 2016 15:39:59 +0200
Subject: [PATCH 03/26] tevent: Add tevent_update_timer()
This will be a quicker way to time out sending sockets in messaging_dgm. Right
now cleanup of out-sockets is a bit coarse. The ideal would be to kill a socket
after being idle n seconds. This would mean to free and re-install a timer on
every packet. tevent_update_timer will be quite a bit cheaper.
Signed-off-by: Volker Lendecke <vl at samba.org>
---
lib/tevent/ABI/tevent-0.9.30.sigs | 1 +
lib/tevent/tevent.h | 10 ++++++++++
lib/tevent/tevent_timed.c | 18 ++++++++++++++++++
3 files changed, 29 insertions(+)
diff --git a/lib/tevent/ABI/tevent-0.9.30.sigs b/lib/tevent/ABI/tevent-0.9.30.sigs
index 9b8bfa1..66a450e 100644
--- a/lib/tevent/ABI/tevent-0.9.30.sigs
+++ b/lib/tevent/ABI/tevent-0.9.30.sigs
@@ -92,5 +92,6 @@ tevent_timeval_set: struct timeval (uint32_t, uint32_t)
tevent_timeval_until: struct timeval (const struct timeval *, const struct timeval *)
tevent_timeval_zero: struct timeval (void)
tevent_trace_point_callback: void (struct tevent_context *, enum tevent_trace_point)
+tevent_update_timer: void (struct tevent_timer *, struct timeval)
tevent_wakeup_recv: bool (struct tevent_req *)
tevent_wakeup_send: struct tevent_req *(TALLOC_CTX *, struct tevent_context *, struct timeval)
diff --git a/lib/tevent/tevent.h b/lib/tevent/tevent.h
index 7de04d0..bb23257 100644
--- a/lib/tevent/tevent.h
+++ b/lib/tevent/tevent.h
@@ -252,6 +252,16 @@ struct tevent_timer *_tevent_add_timer(struct tevent_context *ev,
#handler, __location__)
#endif
+/**
+ * @brief Set the time a tevent_timer fires
+ *
+ * @param[in] te The timer event to reset
+ *
+ * @param[in] next_event Timeval specifying the absolute time to fire this
+ * event. This is not an offset.
+ */
+void tevent_update_timer(struct tevent_timer *te, struct timeval next_event);
+
#ifdef DOXYGEN
/**
* Initialize an immediate event object
diff --git a/lib/tevent/tevent_timed.c b/lib/tevent/tevent_timed.c
index bb0160c..92f3ed1 100644
--- a/lib/tevent/tevent_timed.c
+++ b/lib/tevent/tevent_timed.c
@@ -284,6 +284,24 @@ struct tevent_timer *tevent_common_add_timer_v2(struct tevent_context *ev,
true);
}
+void tevent_update_timer(struct tevent_timer *te, struct timeval next_event)
+{
+ struct tevent_context *ev = te->event_ctx;
+
+ if (ev->last_zero_timer == te) {
+ te->event_ctx->last_zero_timer = DLIST_PREV(te);
+ }
+ DLIST_REMOVE(ev->timer_events, te);
+
+ te->next_event = next_event;
+
+ /*
+ * Not doing the zero_timer optimization. This is for new code
+ * that should know about immediates.
+ */
+ tevent_common_insert_timer(ev, te, false);
+}
+
/*
do a single event loop using the events defined in ev
--
2.7.4
>From 6deb4f32574bfd54da5cfc08c402f2931ec49ba4 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 Sep 2016 19:17:21 +0200
Subject: [PATCH 04/26] tevent: Rename wakeup fds
This makes the reading end of the signalling pipe special: If we have eventfd,
this is the same as the write fd. Without eventfd, it will have to be a
separate fd. This moves the requirement to #ifdef from the writing end to the
reading end. Why? We'll use the writing end somewhere else too soon, and this
patch avoids an #ifdef in that new place.
Signed-off-by: Volker Lendecke <vl at samba.org>
---
lib/tevent/tevent.c | 21 +++++++++++----------
lib/tevent/tevent_internal.h | 4 ++--
2 files changed, 13 insertions(+), 12 deletions(-)
diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c
index 331be0e..87776ec 100644
--- a/lib/tevent/tevent.c
+++ b/lib/tevent/tevent.c
@@ -860,7 +860,7 @@ static void wakeup_pipe_handler(struct tevent_context *ev,
int tevent_common_wakeup_init(struct tevent_context *ev)
{
- int ret;
+ int ret, read_fd;
if (ev->wakeup_fde != NULL) {
return 0;
@@ -871,7 +871,7 @@ int tevent_common_wakeup_init(struct tevent_context *ev)
if (ret == -1) {
return errno;
}
- ev->wakeup_fd = ret;
+ read_fd = ev->wakeup_fd = ret;
#else
{
int pipe_fds[2];
@@ -879,21 +879,22 @@ int tevent_common_wakeup_init(struct tevent_context *ev)
if (ret == -1) {
return errno;
}
- ev->wakeup_fd = pipe_fds[0];
- ev->wakeup_write_fd = pipe_fds[1];
+ ev->wakeup_fd = pipe_fds[1];
+ ev->wakeup_read_fd = pipe_fds[0];
ev_set_blocking(ev->wakeup_fd, false);
- ev_set_blocking(ev->wakeup_write_fd, false);
+ ev_set_blocking(ev->wakeup_read_fd, false);
+
+ read_fd = ev->wakeup_read_fd;
}
#endif
- ev->wakeup_fde = tevent_add_fd(ev, ev, ev->wakeup_fd,
- TEVENT_FD_READ,
+ ev->wakeup_fde = tevent_add_fd(ev, ev, read_fd, TEVENT_FD_READ,
wakeup_pipe_handler, NULL);
if (ev->wakeup_fde == NULL) {
close(ev->wakeup_fd);
#ifndef HAVE_EVENTFD
- close(ev->wakeup_write_fd);
+ close(ev->wakeup_read_fd);
#endif
return ENOMEM;
}
@@ -915,7 +916,7 @@ int tevent_common_wakeup(struct tevent_context *ev)
ret = write(ev->wakeup_fd, &val, sizeof(val));
#else
char c = '\0';
- ret = write(ev->wakeup_write_fd, &c, 1);
+ ret = write(ev->wakeup_fd, &c, 1);
#endif
} while ((ret == -1) && (errno == EINTR));
@@ -932,6 +933,6 @@ static void tevent_common_wakeup_fini(struct tevent_context *ev)
close(ev->wakeup_fd);
#ifndef HAVE_EVENTFD
- close(ev->wakeup_write_fd);
+ close(ev->wakeup_read_fd);
#endif
}
diff --git a/lib/tevent/tevent_internal.h b/lib/tevent/tevent_internal.h
index d960544..84ae5bc 100644
--- a/lib/tevent/tevent_internal.h
+++ b/lib/tevent/tevent_internal.h
@@ -277,9 +277,9 @@ struct tevent_context {
/* pipe hack used with signal handlers */
struct tevent_fd *wakeup_fde;
- int wakeup_fd;
+ int wakeup_fd; /* fd to write into */
#ifndef HAVE_EVENT_FD
- int wakeup_write_fd;
+ int wakeup_read_fd;
#endif
/* debugging operations */
--
2.7.4
>From dc5f29425f9ca2e9050484f558ced4c35c971ceb Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 Sep 2016 19:47:55 +0200
Subject: [PATCH 05/26] tevent: Add tevent_common_wakeup_fd()
This prepares tevent run-down with active threads.
It has the advantage to not depend on talloc'ed structs. It is needed to make
talloc_free(tevent_context) safe when tevent_threaded_contexts are still
around.
Signed-off-by: Volker Lendecke <vl at samba.org>
---
lib/tevent/ABI/tevent-0.9.30.sigs | 1 +
lib/tevent/tevent.c | 19 ++++++++++++-------
lib/tevent/tevent_internal.h | 1 +
3 files changed, 14 insertions(+), 7 deletions(-)
diff --git a/lib/tevent/ABI/tevent-0.9.30.sigs b/lib/tevent/ABI/tevent-0.9.30.sigs
index 66a450e..ea179a0 100644
--- a/lib/tevent/ABI/tevent-0.9.30.sigs
+++ b/lib/tevent/ABI/tevent-0.9.30.sigs
@@ -36,6 +36,7 @@ tevent_common_loop_wait: int (struct tevent_context *, const char *)
tevent_common_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
tevent_common_threaded_activate_immediate: void (struct tevent_context *)
tevent_common_wakeup: int (struct tevent_context *)
+tevent_common_wakeup_fd: int (int)
tevent_common_wakeup_init: int (struct tevent_context *)
tevent_context_init: struct tevent_context *(TALLOC_CTX *)
tevent_context_init_byname: struct tevent_context *(TALLOC_CTX *, const char *)
diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c
index 87776ec..575337d 100644
--- a/lib/tevent/tevent.c
+++ b/lib/tevent/tevent.c
@@ -902,27 +902,32 @@ int tevent_common_wakeup_init(struct tevent_context *ev)
return 0;
}
-int tevent_common_wakeup(struct tevent_context *ev)
+int tevent_common_wakeup_fd(int fd)
{
ssize_t ret;
- if (ev->wakeup_fde == NULL) {
- return ENOTCONN;
- }
-
do {
#ifdef HAVE_EVENTFD
uint64_t val = 1;
- ret = write(ev->wakeup_fd, &val, sizeof(val));
+ ret = write(fd, &val, sizeof(val));
#else
char c = '\0';
- ret = write(ev->wakeup_fd, &c, 1);
+ ret = write(fd, &c, 1);
#endif
} while ((ret == -1) && (errno == EINTR));
return 0;
}
+int tevent_common_wakeup(struct tevent_context *ev)
+{
+ if (ev->wakeup_fde == NULL) {
+ return ENOTCONN;
+ }
+
+ return tevent_common_wakeup_fd(ev->wakeup_fd);
+}
+
static void tevent_common_wakeup_fini(struct tevent_context *ev)
{
if (ev->wakeup_fde == NULL) {
diff --git a/lib/tevent/tevent_internal.h b/lib/tevent/tevent_internal.h
index 84ae5bc..a4af79e 100644
--- a/lib/tevent/tevent_internal.h
+++ b/lib/tevent/tevent_internal.h
@@ -357,6 +357,7 @@ void tevent_common_threaded_activate_immediate(struct tevent_context *ev);
bool tevent_common_have_events(struct tevent_context *ev);
int tevent_common_wakeup_init(struct tevent_context *ev);
+int tevent_common_wakeup_fd(int fd);
int tevent_common_wakeup(struct tevent_context *ev);
struct tevent_signal *tevent_common_add_signal(struct tevent_context *ev,
--
2.7.4
>From fe1d0d7ebd2df9e9a1b3049f77b6563a1b210364 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 Sep 2016 20:25:36 +0200
Subject: [PATCH 06/26] tevent: Make talloc_free safe when threaded_contexts
exist
I did not find a way to do this safely without a mutex per threaded_context.
Signed-off-by: Volker Lendecke <vl at samba.org>
---
lib/tevent/tevent.c | 61 ++++++++++++++++++++++++++++++++++++--------
lib/tevent/tevent_internal.h | 5 ++++
lib/tevent/tevent_threads.c | 38 +++++++++++++++++++++++++--
3 files changed, 92 insertions(+), 12 deletions(-)
diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c
index 575337d..65b101f 100644
--- a/lib/tevent/tevent.c
+++ b/lib/tevent/tevent.c
@@ -200,6 +200,16 @@ static void tevent_atfork_prepare(void)
}
for (ev = tevent_contexts; ev != NULL; ev = ev->next) {
+ struct tevent_threaded_context *tctx;
+
+ for (tctx = ev->threaded_contexts; tctx != NULL;
+ tctx = tctx->next) {
+ ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
+ if (ret != 0) {
+ tevent_abort(ev, "pthread_mutex_lock failed");
+ }
+ }
+
ret = pthread_mutex_lock(&ev->scheduled_mutex);
if (ret != 0) {
tevent_abort(ev, "pthread_mutex_lock failed");
@@ -214,10 +224,21 @@ static void tevent_atfork_parent(void)
for (ev = DLIST_TAIL(tevent_contexts); ev != NULL;
ev = DLIST_PREV(ev)) {
+ struct tevent_threaded_context *tctx;
+
ret = pthread_mutex_unlock(&ev->scheduled_mutex);
if (ret != 0) {
tevent_abort(ev, "pthread_mutex_unlock failed");
}
+
+ for (tctx = DLIST_TAIL(ev->threaded_contexts); tctx != NULL;
+ tctx = DLIST_PREV(tctx)) {
+ ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+ if (ret != 0) {
+ tevent_abort(
+ ev, "pthread_mutex_unlock failed");
+ }
+ }
}
ret = pthread_mutex_unlock(&tevent_contexts_mutex);
@@ -235,9 +256,15 @@ static void tevent_atfork_child(void)
ev = DLIST_PREV(ev)) {
struct tevent_threaded_context *tctx;
- for (tctx = ev->threaded_contexts; tctx != NULL;
- tctx = tctx->next) {
+ for (tctx = DLIST_TAIL(ev->threaded_contexts); tctx != NULL;
+ tctx = DLIST_PREV(tctx)) {
tctx->event_ctx = NULL;
+
+ ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+ if (ret != 0) {
+ tevent_abort(
+ ev, "pthread_mutex_unlock failed");
+ }
}
ev->threaded_contexts = NULL;
@@ -289,18 +316,32 @@ int tevent_common_context_destructor(struct tevent_context *ev)
if (ret != 0) {
abort();
}
-#endif
- if (ev->threaded_contexts != NULL) {
+ while (ev->threaded_contexts != NULL) {
+ struct tevent_threaded_context *tctx = ev->threaded_contexts;
+
+ ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
/*
- * Threaded contexts are indicators that threads are
- * about to send us immediates via
- * tevent_threaded_schedule_immediate. The caller
- * needs to make sure that the tevent context lives
- * long enough to receive immediates from all threads.
+ * Indicate to the thread that the tevent_context is
+ * gone. The counterpart of this is in
+ * _tevent_threaded_schedule_immediate, there we read
+ * this under the threaded_context's mutex.
*/
- tevent_abort(ev, "threaded contexts exist");
+
+ tctx->event_ctx = NULL;
+
+ ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
+ DLIST_REMOVE(ev->threaded_contexts, tctx);
}
+#endif
tevent_common_wakeup_fini(ev);
diff --git a/lib/tevent/tevent_internal.h b/lib/tevent/tevent_internal.h
index a4af79e..a5f1ebde 100644
--- a/lib/tevent/tevent_internal.h
+++ b/lib/tevent/tevent_internal.h
@@ -230,7 +230,12 @@ struct tevent_signal {
struct tevent_threaded_context {
struct tevent_threaded_context *next, *prev;
+
+#ifdef HAVE_PTHREAD
+ pthread_mutex_t event_ctx_mutex;
+#endif
struct tevent_context *event_ctx;
+ int wakeup_fd;
};
struct tevent_debug_ops {
diff --git a/lib/tevent/tevent_threads.c b/lib/tevent/tevent_threads.c
index e42759e..8197323 100644
--- a/lib/tevent/tevent_threads.c
+++ b/lib/tevent/tevent_threads.c
@@ -375,9 +375,17 @@ void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
static int tevent_threaded_context_destructor(
struct tevent_threaded_context *tctx)
{
+ int ret;
+
if (tctx->event_ctx != NULL) {
DLIST_REMOVE(tctx->event_ctx->threaded_contexts, tctx);
}
+
+ ret = pthread_mutex_destroy(&tctx->event_ctx_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
return 0;
}
@@ -399,6 +407,13 @@ struct tevent_threaded_context *tevent_threaded_context_create(
return NULL;
}
tctx->event_ctx = ev;
+ tctx->wakeup_fd = ev->wakeup_fd;
+
+ ret = pthread_mutex_init(&tctx->event_ctx_mutex, NULL);
+ if (ret != 0) {
+ TALLOC_FREE(tctx);
+ return NULL;
+ }
DLIST_ADD(ev->threaded_contexts, tctx);
talloc_set_destructor(tctx, tevent_threaded_context_destructor);
@@ -418,9 +433,28 @@ void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
const char *location)
{
#ifdef HAVE_PTHREAD
- struct tevent_context *ev = tctx->event_ctx;
+ struct tevent_context *ev;
int ret;
+ ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
+ ev = tctx->event_ctx;
+
+ ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+ if (ret != 0) {
+ abort();
+ }
+
+ if (ev == NULL) {
+ /*
+ * Our event context is already gone.
+ */
+ return;
+ }
+
if ((im->event_ctx != NULL) || (handler == NULL)) {
abort();
}
@@ -455,7 +489,7 @@ void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
* than a noncontended one. So I'd opt for the lower footprint
* initially. Maybe we have to change that later.
*/
- tevent_common_wakeup(ev);
+ tevent_common_wakeup_fd(tctx->wakeup_fd);
#else
/*
* tevent_threaded_context_create() returned NULL with ENOSYS...
--
2.7.4
>From f96eac9664308fe5f4fd780075df450ceb763343 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 Sep 2016 20:37:21 +0200
Subject: [PATCH 07/26] pthreadpool: Make "shutdown" a bool
Just a small cleanup
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source3/lib/pthreadpool/pthreadpool.c | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c
index ee59cc4..a306c88 100644
--- a/source3/lib/pthreadpool/pthreadpool.c
+++ b/source3/lib/pthreadpool/pthreadpool.c
@@ -73,7 +73,7 @@ struct pthreadpool {
/*
* indicator to worker threads that they should shut down
*/
- int shutdown;
+ bool shutdown;
/*
* maximum number of threads
@@ -150,7 +150,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
return ret;
}
- pool->shutdown = 0;
+ pool->shutdown = false;
pool->num_threads = 0;
pool->num_exited = 0;
pool->exited = NULL;
@@ -295,7 +295,7 @@ int pthreadpool_destroy(struct pthreadpool *pool)
* We have active threads, tell them to finish, wait for that.
*/
- pool->shutdown = 1;
+ pool->shutdown = true;
if (pool->num_idle > 0) {
/*
@@ -455,7 +455,7 @@ static void *pthreadpool_server(void *arg)
clock_gettime(CLOCK_REALTIME, &ts);
ts.tv_sec += 1;
- while ((pool->num_jobs == 0) && (pool->shutdown == 0)) {
+ while ((pool->num_jobs == 0) && !pool->shutdown) {
pool->num_idle += 1;
res = pthread_cond_timedwait(
@@ -505,7 +505,7 @@ static void *pthreadpool_server(void *arg)
}
}
- if ((pool->num_jobs == 0) && (pool->shutdown != 0)) {
+ if ((pool->num_jobs == 0) && pool->shutdown) {
/*
* No more work to do and we're asked to shut down, so
* exit
--
2.7.4
>From 8dde4065befa484d6ff4aad0b00950b1135ef08e Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 9 Sep 2016 13:07:57 +0200
Subject: [PATCH 08/26] pthreadpool: Use detached threads
So far we used joinable threads. This prevented pthreadpool_destroy with
blocked threads. This patch converts pthreadpool to detached threads. Now
pthreadpool_destroy does not have to wait for the idle threads to finish, it
can immediately return. pthreadpool_destroy will tell all threads to exit, and
the last active thread will actually free(pthreadpool).
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source3/lib/pthreadpool/pthreadpool.c | 191 ++++++++++++----------------------
source3/lib/pthreadpool/pthreadpool.h | 9 +-
2 files changed, 74 insertions(+), 126 deletions(-)
diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c
index a306c88..a1eb924 100644
--- a/source3/lib/pthreadpool/pthreadpool.c
+++ b/source3/lib/pthreadpool/pthreadpool.c
@@ -89,12 +89,6 @@ struct pthreadpool {
* Number of idle threads
*/
int num_idle;
-
- /*
- * An array of threads that require joining.
- */
- int num_exited;
- pthread_t *exited; /* We alloc more */
};
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -152,8 +146,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
pool->shutdown = false;
pool->num_threads = 0;
- pool->num_exited = 0;
- pool->exited = NULL;
pool->max_threads = max_threads;
pool->num_idle = 0;
@@ -220,11 +212,6 @@ static void pthreadpool_child(void)
pool = DLIST_PREV(pool)) {
pool->num_threads = 0;
-
- pool->num_exited = 0;
- free(pool->exited);
- pool->exited = NULL;
-
pool->num_idle = 0;
pool->head = 0;
pool->num_jobs = 0;
@@ -243,36 +230,40 @@ static void pthreadpool_prep_atfork(void)
pthreadpool_child);
}
-/*
- * Do a pthread_join() on all children that have exited, pool->mutex must be
- * locked
- */
-static void pthreadpool_join_children(struct pthreadpool *pool)
+static int pthreadpool_free(struct pthreadpool *pool)
{
- int i;
+ int ret, ret1;
- for (i=0; i<pool->num_exited; i++) {
- int ret;
+ ret = pthread_mutex_unlock(&pool->mutex);
+ assert(ret == 0);
- ret = pthread_join(pool->exited[i], NULL);
- if (ret != 0) {
- /*
- * Severe internal error, we can't do much but
- * abort here.
- */
- abort();
- }
+ ret = pthread_mutex_destroy(&pool->mutex);
+ ret1 = pthread_cond_destroy(&pool->condvar);
+
+ if (ret != 0) {
+ return ret;
+ }
+ if (ret1 != 0) {
+ return ret1;
}
- pool->num_exited = 0;
- /*
- * Deliberately not free and NULL pool->exited. That will be
- * re-used by realloc later.
- */
+ ret = pthread_mutex_lock(&pthreadpools_mutex);
+ if (ret != 0) {
+ return ret;
+ }
+ DLIST_REMOVE(pthreadpools, pool);
+ ret = pthread_mutex_unlock(&pthreadpools_mutex);
+ assert(ret == 0);
+
+ free(pool->jobs);
+ free(pool);
+
+ return 0;
}
/*
- * Destroy a thread pool, finishing all threads working for it
+ * Destroy a thread pool. Wake up all idle threads for exit. The last
+ * one will free the pool.
*/
int pthreadpool_destroy(struct pthreadpool *pool)
@@ -284,98 +275,49 @@ int pthreadpool_destroy(struct pthreadpool *pool)
return ret;
}
- if ((pool->num_jobs != 0) || pool->shutdown) {
+ if (pool->num_threads == 0) {
+ ret = pthreadpool_free(pool);
+ return ret;
+ }
+
+ if (pool->shutdown) {
ret = pthread_mutex_unlock(&pool->mutex);
assert(ret == 0);
return EBUSY;
}
- if (pool->num_threads > 0) {
- /*
- * We have active threads, tell them to finish, wait for that.
- */
-
- pool->shutdown = true;
-
- if (pool->num_idle > 0) {
- /*
- * Wake the idle threads. They will find
- * pool->shutdown to be set and exit themselves
- */
- ret = pthread_cond_broadcast(&pool->condvar);
- if (ret != 0) {
- pthread_mutex_unlock(&pool->mutex);
- return ret;
- }
- }
-
- while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
-
- if (pool->num_exited > 0) {
- pthreadpool_join_children(pool);
- continue;
- }
- /*
- * A thread that shuts down will also signal
- * pool->condvar
- */
- ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
- if (ret != 0) {
- pthread_mutex_unlock(&pool->mutex);
- return ret;
- }
- }
- }
+ /*
+ * We have active threads, tell them to finish.
+ */
- ret = pthread_mutex_unlock(&pool->mutex);
- if (ret != 0) {
- return ret;
- }
- ret = pthread_mutex_destroy(&pool->mutex);
- ret1 = pthread_cond_destroy(&pool->condvar);
+ pool->shutdown = true;
- if (ret != 0) {
- return ret;
- }
- if (ret1 != 0) {
- return ret1;
- }
+ ret = pthread_cond_broadcast(&pool->condvar);
- ret = pthread_mutex_lock(&pthreadpools_mutex);
- if (ret != 0) {
- return ret;
- }
- DLIST_REMOVE(pthreadpools, pool);
- ret = pthread_mutex_unlock(&pthreadpools_mutex);
- assert(ret == 0);
+ ret1 = pthread_mutex_unlock(&pool->mutex);
+ assert(ret1 == 0);
- free(pool->exited);
- free(pool->jobs);
- free(pool);
-
- return 0;
+ return ret;
}
/*
- * Prepare for pthread_exit(), pool->mutex must be locked
+ * Prepare for pthread_exit(), pool->mutex must be locked and will be
+ * unlocked here. This is a bit of a layering violation, but here we
+ * also take care of removing the pool if we're the last thread.
*/
static void pthreadpool_server_exit(struct pthreadpool *pool)
{
- pthread_t *exited;
+ int ret;
pool->num_threads -= 1;
- exited = (pthread_t *)realloc(
- pool->exited, sizeof(pthread_t) * (pool->num_exited + 1));
-
- if (exited == NULL) {
- /* lost a thread status */
+ if (pool->shutdown && (pool->num_threads == 0)) {
+ pthreadpool_free(pool);
return;
}
- pool->exited = exited;
- pool->exited[pool->num_exited] = pthread_self();
- pool->num_exited += 1;
+ ret = pthread_mutex_unlock(&pool->mutex);
+ assert(ret == 0);
}
static bool pthreadpool_get_job(struct pthreadpool *p,
@@ -470,7 +412,6 @@ static void *pthreadpool_server(void *arg)
* us. Exit.
*/
pthreadpool_server_exit(pool);
- pthread_mutex_unlock(&pool->mutex);
return NULL;
}
@@ -500,7 +441,6 @@ static void *pthreadpool_server(void *arg)
if (ret != 0) {
pthreadpool_server_exit(pool);
- pthread_mutex_unlock(&pool->mutex);
return NULL;
}
}
@@ -511,16 +451,6 @@ static void *pthreadpool_server(void *arg)
* exit
*/
pthreadpool_server_exit(pool);
-
- if (pool->num_threads == 0) {
- /*
- * Ping the main thread waiting for all of us
- * workers to have quit.
- */
- pthread_cond_broadcast(&pool->condvar);
- }
-
- pthread_mutex_unlock(&pool->mutex);
return NULL;
}
}
@@ -529,6 +459,7 @@ static void *pthreadpool_server(void *arg)
int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
void (*fn)(void *private_data), void *private_data)
{
+ pthread_attr_t thread_attr;
pthread_t thread_id;
int res;
sigset_t mask, omask;
@@ -549,11 +480,6 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
}
/*
- * Just some cleanup under the mutex
- */
- pthreadpool_join_children(pool);
-
- /*
* Add job to the end of the queue
*/
if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
@@ -585,20 +511,37 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
sigfillset(&mask);
+ res = pthread_attr_init(&thread_attr);
+ if (res != 0) {
+ pthread_mutex_unlock(&pool->mutex);
+ return res;
+ }
+
+ res = pthread_attr_setdetachstate(
+ &thread_attr, PTHREAD_CREATE_DETACHED);
+ if (res != 0) {
+ pthread_attr_destroy(&thread_attr);
+ pthread_mutex_unlock(&pool->mutex);
+ return res;
+ }
+
res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
if (res != 0) {
+ pthread_attr_destroy(&thread_attr);
pthread_mutex_unlock(&pool->mutex);
return res;
}
res = pthread_create(&thread_id, NULL, pthreadpool_server,
- (void *)pool);
+ (void *)pool);
if (res == 0) {
pool->num_threads += 1;
}
assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
+ pthread_attr_destroy(&thread_attr);
+
pthread_mutex_unlock(&pool->mutex);
return res;
}
diff --git a/source3/lib/pthreadpool/pthreadpool.h b/source3/lib/pthreadpool/pthreadpool.h
index ee9d957..defbe5a 100644
--- a/source3/lib/pthreadpool/pthreadpool.h
+++ b/source3/lib/pthreadpool/pthreadpool.h
@@ -53,8 +53,13 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
/**
* @brief Destroy a pthreadpool
*
- * Destroy a pthreadpool. If jobs are still active, this returns
- * EBUSY.
+ * Destroy a pthreadpool. If jobs are submitted, but not yet active in
+ * a thread, they won't get executed. If a job has already been
+ * submitted to a thread, the job function will continue running, and
+ * the signal function might still be called. The caller of
+ * pthreadpool_init must make sure the required resources are still
+ * around when the pool is destroyed with pending jobs. The last
+ * thread to exit will finally free() the pool memory.
*
* @param[in] pool The pool to destroy
* @return success: 0, failure: errno
--
2.7.4
>From d6c972c368be9ff4fbdc51cde2715afd0b094305 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 9 Sep 2016 15:18:41 +0200
Subject: [PATCH 09/26] pthreadpool_pipe: Implement EBUSY for _destroy
Restore EBUSY on pthreadpool_pipe_destroy.
We need to count jobs in pthreadpool_pipe so that pthreadpool can exit with
active jobs. Unfortunately this makes pthreadpool_pipe_add_job non-threadsafe.
We could add mutexes around "num_jobs", but this would mean another set of
pthread_atfork functions. As we don't use threaded pthreadpool_pipe_add_job
except in the tests, just remove the tests...
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source3/lib/pthreadpool/pthreadpool_pipe.c | 28 +++-
source3/lib/pthreadpool/tests.c | 220 ++---------------------------
2 files changed, 38 insertions(+), 210 deletions(-)
diff --git a/source3/lib/pthreadpool/pthreadpool_pipe.c b/source3/lib/pthreadpool/pthreadpool_pipe.c
index f7995ab..d6d519a 100644
--- a/source3/lib/pthreadpool/pthreadpool_pipe.c
+++ b/source3/lib/pthreadpool/pthreadpool_pipe.c
@@ -24,6 +24,7 @@
struct pthreadpool_pipe {
struct pthreadpool *pool;
+ int num_jobs;
pid_t pid;
int pipe_fds[2];
};
@@ -39,7 +40,7 @@ int pthreadpool_pipe_init(unsigned max_threads,
struct pthreadpool_pipe *pool;
int ret;
- pool = malloc(sizeof(struct pthreadpool_pipe));
+ pool = calloc(1, sizeof(struct pthreadpool_pipe));
if (pool == NULL) {
return ENOMEM;
}
@@ -88,6 +89,10 @@ int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool)
{
int ret;
+ if (pool->num_jobs != 0) {
+ return EBUSY;
+ }
+
ret = pthreadpool_destroy(pool->pool);
if (ret != 0) {
return ret;
@@ -132,6 +137,7 @@ static int pthreadpool_pipe_reinit(struct pthreadpool_pipe *pool)
}
pool->pipe_fds[0] = signal_fd;
+ pool->num_jobs = 0;
return 0;
}
@@ -148,7 +154,13 @@ int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
}
ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data);
- return ret;
+ if (ret != 0) {
+ return ret;
+ }
+
+ pool->num_jobs += 1;
+
+ return 0;
}
int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
@@ -159,7 +171,7 @@ int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
unsigned num_jobids)
{
- ssize_t to_read, nread;
+ ssize_t to_read, nread, num_jobs;
pid_t pid = getpid();
if (pool->pid != pid) {
@@ -178,5 +190,13 @@ int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
if ((nread % sizeof(int)) != 0) {
return -EINVAL;
}
- return nread / sizeof(int);
+
+ num_jobs = nread / sizeof(int);
+
+ if (num_jobs > pool->num_jobs) {
+ return -EINVAL;
+ }
+ pool->num_jobs -= num_jobs;
+
+ return num_jobs;
}
diff --git a/source3/lib/pthreadpool/tests.c b/source3/lib/pthreadpool/tests.c
index 0b48b41..933808e 100644
--- a/source3/lib/pthreadpool/tests.c
+++ b/source3/lib/pthreadpool/tests.c
@@ -22,7 +22,7 @@ static int test_init(void)
}
ret = pthreadpool_pipe_destroy(p);
if (ret != 0) {
- fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
+ fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
strerror(ret));
return -1;
}
@@ -72,6 +72,11 @@ static int test_jobs(int num_threads, int num_jobs)
for (i=0; i<num_jobs; i++) {
int jobid = -1;
ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1);
+ if (ret < 0) {
+ fprintf(stderr, "pthreadpool_pipe_finished_jobs "
+ "failed: %s\n", strerror(-ret));
+ return -1;
+ }
if ((ret != 1) || (jobid >= num_jobs)) {
fprintf(stderr, "invalid job number %d\n", jobid);
return -1;
@@ -103,7 +108,7 @@ static int test_busydestroy(void)
struct pthreadpool_pipe *p;
int timeout = 50;
struct pollfd pfd;
- int ret;
+ int ret, jobid;
ret = pthreadpool_pipe_init(1, &p);
if (ret != 0) {
@@ -128,6 +133,13 @@ static int test_busydestroy(void)
poll(&pfd, 1, -1);
+ ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1);
+ if (ret < 0) {
+ fprintf(stderr, "pthreadpool_pipe_finished_jobs failed: %s\n",
+ strerror(-ret));
+ return -1;
+ }
+
ret = pthreadpool_pipe_destroy(p);
if (ret != 0) {
fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
@@ -137,191 +149,6 @@ static int test_busydestroy(void)
return 0;
}
-struct threaded_state {
- pthread_t tid;
- struct pthreadpool_pipe *p;
- int start_job;
- int num_jobs;
- int timeout;
-};
-
-static void *test_threaded_worker(void *p)
-{
- struct threaded_state *state = (struct threaded_state *)p;
- int i;
-
- for (i=0; i<state->num_jobs; i++) {
- int ret = pthreadpool_pipe_add_job(
- state->p, state->start_job + i,
- test_sleep, &state->timeout);
- if (ret != 0) {
- fprintf(stderr, "pthreadpool_pipe_add_job failed: "
- "%s\n", strerror(ret));
- return NULL;
- }
- }
- return NULL;
-}
-
-static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
- int num_jobs)
-{
- struct pthreadpool_pipe **pools;
- struct threaded_state *states;
- struct threaded_state *state;
- struct pollfd *pfds;
- char *finished;
- pid_t child;
- int i, ret, poolnum;
- int received;
-
- states = calloc(num_threads, sizeof(struct threaded_state));
- if (states == NULL) {
- fprintf(stderr, "calloc failed\n");
- return -1;
- }
-
- finished = calloc(num_threads * num_jobs, 1);
- if (finished == NULL) {
- fprintf(stderr, "calloc failed\n");
- return -1;
- }
-
- pools = calloc(num_pools, sizeof(struct pthreadpool_pipe *));
- if (pools == NULL) {
- fprintf(stderr, "calloc failed\n");
- return -1;
- }
-
- pfds = calloc(num_pools, sizeof(struct pollfd));
- if (pfds == NULL) {
- fprintf(stderr, "calloc failed\n");
- return -1;
- }
-
- for (i=0; i<num_pools; i++) {
- ret = pthreadpool_pipe_init(poolsize, &pools[i]);
- if (ret != 0) {
- fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
- strerror(ret));
- return -1;
- }
- pfds[i].fd = pthreadpool_pipe_signal_fd(pools[i]);
- pfds[i].events = POLLIN|POLLHUP;
- }
-
- poolnum = 0;
-
- for (i=0; i<num_threads; i++) {
- state = &states[i];
-
- state->p = pools[poolnum];
- poolnum = (poolnum + 1) % num_pools;
-
- state->num_jobs = num_jobs;
- state->timeout = 1;
- state->start_job = i * num_jobs;
-
- ret = pthread_create(&state->tid, NULL, test_threaded_worker,
- state);
- if (ret != 0) {
- fprintf(stderr, "pthread_create failed: %s\n",
- strerror(ret));
- return -1;
- }
- }
-
- if (random() % 1) {
- poll(NULL, 0, 1);
- }
-
- child = fork();
- if (child < 0) {
- fprintf(stderr, "fork failed: %s\n", strerror(errno));
- return -1;
- }
- if (child == 0) {
- for (i=0; i<num_pools; i++) {
- ret = pthreadpool_pipe_destroy(pools[i]);
- if (ret != 0) {
- fprintf(stderr, "pthreadpool_pipe_destroy "
- "failed: %s\n", strerror(ret));
- exit(1);
- }
- }
- /* child */
- exit(0);
- }
-
- for (i=0; i<num_threads; i++) {
- ret = pthread_join(states[i].tid, NULL);
- if (ret != 0) {
- fprintf(stderr, "pthread_join(%d) failed: %s\n",
- i, strerror(ret));
- return -1;
- }
- }
-
- received = 0;
-
- while (received < num_threads*num_jobs) {
- int j;
-
- ret = poll(pfds, num_pools, 1000);
- if (ret == -1) {
- fprintf(stderr, "poll failed: %s\n",
- strerror(errno));
- return -1;
- }
- if (ret == 0) {
- fprintf(stderr, "\npoll timed out\n");
- break;
- }
-
- for (j=0; j<num_pools; j++) {
- int jobid = -1;
-
- if ((pfds[j].revents & (POLLIN|POLLHUP)) == 0) {
- continue;
- }
-
- ret = pthreadpool_pipe_finished_jobs(
- pools[j], &jobid, 1);
- if ((ret != 1) || (jobid >= num_jobs * num_threads)) {
- fprintf(stderr, "invalid job number %d\n",
- jobid);
- return -1;
- }
- finished[jobid] += 1;
- received += 1;
- }
- }
-
- for (i=0; i<num_threads*num_jobs; i++) {
- if (finished[i] != 1) {
- fprintf(stderr, "finished[%d] = %d\n",
- i, finished[i]);
- return -1;
- }
- }
-
- for (i=0; i<num_pools; i++) {
- ret = pthreadpool_pipe_destroy(pools[i]);
- if (ret != 0) {
- fprintf(stderr, "pthreadpool_pipe_destroy failed: "
- "%s\n", strerror(ret));
- return -1;
- }
- }
-
- free(pfds);
- free(pools);
- free(states);
- free(finished);
-
- return 0;
-}
-
static int test_fork(void)
{
struct pthreadpool_pipe *p;
@@ -390,25 +217,6 @@ int main(void)
return 1;
}
- /*
- * Test 10 threads adding jobs on a single pool
- */
- ret = test_threaded_addjob(1, 10, 5, 5000);
- if (ret != 0) {
- fprintf(stderr, "test_jobs failed\n");
- return 1;
- }
-
- /*
- * Test 10 threads on 3 pools to verify our fork handling
- * works right.
- */
- ret = test_threaded_addjob(3, 10, 5, 5000);
- if (ret != 0) {
- fprintf(stderr, "test_jobs failed\n");
- return 1;
- }
-
printf("success\n");
return 0;
}
--
2.7.4
>From 73b0763246aec61cc3283a37c184794678d17a44 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 9 Sep 2016 13:27:13 +0200
Subject: [PATCH 10/26] pthreadpool_tevent: Move the
pthreadpool_tevent_job_state declaration
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source3/lib/pthreadpool/pthreadpool_tevent.c | 24 ++++++++++++------------
1 file changed, 12 insertions(+), 12 deletions(-)
diff --git a/source3/lib/pthreadpool/pthreadpool_tevent.c b/source3/lib/pthreadpool/pthreadpool_tevent.c
index 0b7a55a..e4efc2d 100644
--- a/source3/lib/pthreadpool/pthreadpool_tevent.c
+++ b/source3/lib/pthreadpool/pthreadpool_tevent.c
@@ -31,6 +31,18 @@ struct pthreadpool_tevent {
struct pthreadpool_tevent_job_state *jobs;
};
+struct pthreadpool_tevent_job_state {
+ struct pthreadpool_tevent_job_state *prev, *next;
+ struct pthreadpool_tevent *pool;
+ struct tevent_context *ev;
+ struct tevent_threaded_context *tctx;
+ struct tevent_immediate *im;
+ struct tevent_req *req;
+
+ void (*fn)(void *private_data);
+ void *private_data;
+};
+
static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
static int pthreadpool_tevent_job_signal(int jobid,
@@ -79,18 +91,6 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
return 0;
}
-struct pthreadpool_tevent_job_state {
- struct pthreadpool_tevent_job_state *prev, *next;
- struct pthreadpool_tevent *pool;
- struct tevent_context *ev;
- struct tevent_threaded_context *tctx;
- struct tevent_immediate *im;
- struct tevent_req *req;
-
- void (*fn)(void *private_data);
- void *private_data;
-};
-
static void pthreadpool_tevent_job_fn(void *private_data);
static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
struct tevent_immediate *im,
--
2.7.4
>From 4a2b562e5d780540f8e05d436690e7d0a8932562 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 9 Sep 2016 13:28:51 +0200
Subject: [PATCH 11/26] pthreadpool_tevent: Drop running jobs on talloc_free
Enable us to destroy a pthreadpool_tevent structure with active jobs
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source3/lib/pthreadpool/pthreadpool_tevent.c | 15 ++++++++++-----
1 file changed, 10 insertions(+), 5 deletions(-)
diff --git a/source3/lib/pthreadpool/pthreadpool_tevent.c b/source3/lib/pthreadpool/pthreadpool_tevent.c
index e4efc2d..253a867 100644
--- a/source3/lib/pthreadpool/pthreadpool_tevent.c
+++ b/source3/lib/pthreadpool/pthreadpool_tevent.c
@@ -76,6 +76,7 @@ int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
{
+ struct pthreadpool_tevent_job_state *state, *next;
int ret;
ret = pthreadpool_destroy(pool->pool);
@@ -84,8 +85,10 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
}
pool->pool = NULL;
- if (pool->jobs != NULL) {
- abort();
+ for (state = pool->jobs; state != NULL; state = next) {
+ next = state->next;
+ DLIST_REMOVE(pool->jobs, state);
+ state->pool = NULL;
}
return 0;
@@ -114,7 +117,7 @@ static int pthreadpool_tevent_job_state_destructor(
/*
* We need to reparent to a long term context.
*/
- (void)talloc_reparent(state->req, state->pool, state);
+ (void)talloc_reparent(state->req, NULL, state);
state->req = NULL;
return -1;
}
@@ -214,8 +217,10 @@ static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
private_data, struct pthreadpool_tevent_job_state);
- DLIST_REMOVE(state->pool->jobs, state);
- state->pool = NULL;
+ if (state->pool != NULL) {
+ DLIST_REMOVE(state->pool->jobs, state);
+ state->pool = NULL;
+ }
TALLOC_FREE(state->tctx);
--
2.7.4
>From ca14b6f11da93fba5a36718257b7a16311893265 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 9 Sep 2016 16:42:05 +0200
Subject: [PATCH 12/26] pthreadpool: Add a small test for pthreadpool_tevent
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source3/lib/pthreadpool/tests.c | 82 +++++++++++++++++++++++++++++++++++++++++
1 file changed, 82 insertions(+)
diff --git a/source3/lib/pthreadpool/tests.c b/source3/lib/pthreadpool/tests.c
index 933808e..4d211f2 100644
--- a/source3/lib/pthreadpool/tests.c
+++ b/source3/lib/pthreadpool/tests.c
@@ -8,6 +8,7 @@
#include <sys/types.h>
#include <sys/wait.h>
#include "pthreadpool_pipe.h"
+#include "pthreadpool_tevent.h"
static int test_init(void)
{
@@ -189,10 +190,91 @@ static int test_fork(void)
return 0;
}
+static void test_tevent_wait(void *private_data)
+{
+ int *timeout = private_data;
+ poll(NULL, 0, *timeout);
+}
+
+static int test_tevent_1(void)
+{
+ struct tevent_context *ev;
+ struct pthreadpool_tevent *pool;
+ struct tevent_req *req1, *req2;
+ int timeout10 = 10;
+ int timeout100 = 100;
+ int ret;
+ bool ok;
+
+ ev = tevent_context_init(NULL);
+ if (ev == NULL) {
+ ret = errno;
+ fprintf(stderr, "tevent_context_init failed: %s\n",
+ strerror(ret));
+ return ret;
+ }
+ ret = pthreadpool_tevent_init(ev, 0, &pool);
+ if (ret != 0) {
+ fprintf(stderr, "pthreadpool_tevent_init failed: %s\n",
+ strerror(ret));
+ TALLOC_FREE(ev);
+ return ret;
+ }
+ req1 = pthreadpool_tevent_job_send(
+ ev, ev, pool, test_tevent_wait, &timeout10);
+ if (req1 == NULL) {
+ fprintf(stderr, "pthreadpool_tevent_job_send failed\n");
+ TALLOC_FREE(ev);
+ return ENOMEM;
+ }
+ req2 = pthreadpool_tevent_job_send(
+ ev, ev, pool, test_tevent_wait, &timeout100);
+ if (req2 == NULL) {
+ fprintf(stderr, "pthreadpool_tevent_job_send failed\n");
+ TALLOC_FREE(ev);
+ return ENOMEM;
+ }
+ ok = tevent_req_poll(req2, ev);
+ if (!ok) {
+ ret = errno;
+ fprintf(stderr, "tevent_req_poll failed: %s\n",
+ strerror(ret));
+ TALLOC_FREE(ev);
+ return ret;
+ }
+ ret = pthreadpool_tevent_job_recv(req1);
+ TALLOC_FREE(req1);
+ if (ret != 0) {
+ fprintf(stderr, "pthreadpool_tevent_job_recv failed: %s\n",
+ strerror(ret));
+ TALLOC_FREE(ev);
+ return ret;
+ }
+
+ TALLOC_FREE(req2);
+
+ ret = tevent_loop_wait(ev);
+ if (ret != 0) {
+ fprintf(stderr, "tevent_loop_wait failed\n");
+ return ret;
+ }
+
+ TALLOC_FREE(pool);
+ TALLOC_FREE(ev);
+ return 0;
+}
+
int main(void)
{
int ret;
+ ret = test_tevent_1();
+ if (ret != 0) {
+ fprintf(stderr, "test_tevent_1 failed: %s\n",
+ strerror(ret));
+ return 1;
+ }
+
ret = test_init();
if (ret != 0) {
fprintf(stderr, "test_init failed\n");
--
2.7.4
>From 573cd2e47ef7df02c4501a4e64ffaa38c4a3a9e0 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 9 Sep 2016 16:51:00 +0200
Subject: [PATCH 13/26] messages_dgm: Convert to pthreadpool_tevent
This itself adds a lot of code; however, it removes the unix_msg library.
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source3/lib/messages_dgm.c | 937 ++++++++++++++++++++++++++++++++++++++++++---
source3/wscript_build | 5 +-
2 files changed, 884 insertions(+), 58 deletions(-)
diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index 3aa110c..5f82168 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -21,11 +21,18 @@
#include "system/network.h"
#include "system/filesys.h"
#include "system/dir.h"
+#include "system/select.h"
#include "lib/util/debug.h"
-#include "lib/unix_msg/unix_msg.h"
#include "lib/messages_dgm.h"
-#include "poll_funcs/poll_funcs_tevent.h"
#include "lib/util/genrand.h"
+#include "lib/util/dlinklist.h"
+#include "lib/pthreadpool/pthreadpool_tevent.h"
+#include "lib/util/msghdr.h"
+#include "lib/util/iov_buf.h"
+#include "lib/util/blocking.h"
+#include "lib/util/tevent_unix.h"
+
+#define MESSAGING_DGM_FRAGMENT_LENGTH 1024
struct sun_path_buf {
/*
@@ -34,15 +41,43 @@ struct sun_path_buf {
char buf[sizeof(struct sockaddr_un)];
};
+struct messaging_dgm_context;
+
+struct messaging_dgm_out {
+ struct messaging_dgm_out *prev, *next;
+ struct messaging_dgm_context *ctx;
+
+ pid_t pid;
+ int sock;
+ bool is_blocking;
+ uint64_t cookie;
+
+ struct tevent_queue *queue;
+ struct tevent_timer *idle_timer;
+};
+
+struct messaging_dgm_in_msg {
+ struct messaging_dgm_in_msg *prev, *next;
+ struct messaging_dgm_context *ctx;
+ size_t msglen;
+ size_t received;
+ pid_t sender_pid;
+ int sender_sock;
+ uint64_t cookie;
+ uint8_t buf[];
+};
+
struct messaging_dgm_context {
+ struct tevent_context *ev;
pid_t pid;
- struct poll_funcs *msg_callbacks;
- void *tevent_handle;
- struct unix_msg_ctx *dgm_ctx;
struct sun_path_buf socket_dir;
struct sun_path_buf lockfile_dir;
int lockfile_fd;
+ int sock;
+ struct tevent_fd *read_fde;
+ struct messaging_dgm_in_msg *in_msgs;
+
void (*recv_cb)(const uint8_t *msg,
size_t msg_len,
int *fds,
@@ -51,14 +86,615 @@ struct messaging_dgm_context {
void *recv_cb_private_data;
bool *have_dgm_context;
+
+ struct pthreadpool_tevent *pool;
+ struct messaging_dgm_out *outsocks;
};
-static struct messaging_dgm_context *global_dgm_context;
+/* Set socket close on exec. */
+static int prepare_socket_cloexec(int sock)
+{
+#ifdef FD_CLOEXEC
+ int flags;
-static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
- uint8_t *msg, size_t msg_len,
- int *fds, size_t num_fds,
- void *private_data);
+ flags = fcntl(sock, F_GETFD, 0);
+ if (flags == -1) {
+ return errno;
+ }
+ flags |= FD_CLOEXEC;
+ if (fcntl(sock, F_SETFD, flags) == -1) {
+ return errno;
+ }
+#endif
+ return 0;
+}
+
+static void close_fd_array(int *fds, size_t num_fds)
+{
+ size_t i;
+
+ for (i = 0; i < num_fds; i++) {
+ if (fds[i] == -1) {
+ continue;
+ }
+
+ close(fds[i]);
+ fds[i] = -1;
+ }
+}
+
+static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
+ struct tevent_timer *te,
+ struct timeval current_time,
+ void *private_data)
+{
+ struct messaging_dgm_out *out = talloc_get_type_abort(
+ private_data, struct messaging_dgm_out);
+ size_t qlen;
+
+ out->idle_timer = NULL;
+
+ qlen = tevent_queue_length(out->queue);
+ if (qlen == 0) {
+ TALLOC_FREE(out);
+ }
+}
+
+static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out *out)
+{
+ size_t qlen;
+
+ qlen = tevent_queue_length(out->queue);
+ if (qlen != 0) {
+ TALLOC_FREE(out->idle_timer);
+ return;
+ }
+
+ if (out->idle_timer != NULL) {
+ tevent_update_timer(out->idle_timer,
+ tevent_timeval_current_ofs(1, 0));
+ return;
+ }
+
+ out->idle_timer = tevent_add_timer(
+ out->ctx->ev, out, tevent_timeval_current_ofs(1, 0),
+ messaging_dgm_out_idle_handler, out);
+ /*
+ * No NULL check, we'll come back here. Worst case we're
+ * leaking a bit.
+ */
+}
+
+static int messaging_dgm_out_destructor(struct messaging_dgm_out *dst);
+static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
+ struct tevent_timer *te,
+ struct timeval current_time,
+ void *private_data);
+
+static int messaging_dgm_out_create(TALLOC_CTX *mem_ctx,
+ struct messaging_dgm_context *ctx,
+ pid_t pid, struct messaging_dgm_out **pout)
+{
+ struct messaging_dgm_out *out;
+ struct sockaddr_un addr = { .sun_family = AF_UNIX };
+ int ret = ENOMEM;
+ int out_pathlen;
+
+ out = talloc(mem_ctx, struct messaging_dgm_out);
+ if (out == NULL) {
+ goto fail;
+ }
+
+ *out = (struct messaging_dgm_out) {
+ .pid = pid,
+ .ctx = ctx,
+ .cookie = 1
+ };
+
+ out_pathlen = snprintf(addr.sun_path, sizeof(addr.sun_path),
+ "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
+ if (out_pathlen < 0) {
+ goto errno_fail;
+ }
+ if ((size_t)out_pathlen >= sizeof(addr.sun_path)) {
+ ret = ENAMETOOLONG;
+ goto fail;
+ }
+
+ out->queue = tevent_queue_create(out, addr.sun_path);
+ if (out->queue == NULL) {
+ ret = ENOMEM;
+ goto fail;
+ }
+
+ out->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+ if (out->sock == -1) {
+ goto errno_fail;
+ }
+
+ DLIST_ADD(ctx->outsocks, out);
+ talloc_set_destructor(out, messaging_dgm_out_destructor);
+
+ do {
+ ret = connect(out->sock,
+ (const struct sockaddr *)(const void *)&addr,
+ sizeof(addr));
+ } while ((ret == -1) && (errno == EINTR));
+
+ if (ret == -1) {
+ goto errno_fail;
+ }
+
+ ret = set_blocking(out->sock, false);
+ if (ret == -1) {
+ goto errno_fail;
+ }
+ out->is_blocking = false;
+
+ *pout = out;
+ return 0;
+errno_fail:
+ ret = errno;
+fail:
+ TALLOC_FREE(out);
+ return ret;
+}
+
+static int messaging_dgm_out_destructor(struct messaging_dgm_out *out)
+{
+ DLIST_REMOVE(out->ctx->outsocks, out);
+
+ if (tevent_queue_length(out->queue) != 0) {
+ /*
+ * We have pending jobs. We can't close the socket,
+ * this has been handed over to messaging_dgm_out_queue_state.
+ */
+ return 0;
+ }
+
+ if (out->sock != -1) {
+ close(out->sock);
+ out->sock = -1;
+ }
+ return 0;
+}
+
+static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
+ struct messaging_dgm_out **pout)
+{
+ struct messaging_dgm_out *out;
+ int ret;
+
+ for (out = ctx->outsocks; out != NULL; out = out->next) {
+ if (out->pid == pid) {
+ break;
+ }
+ }
+
+ if (out == NULL) {
+ ret = messaging_dgm_out_create(ctx, ctx, pid, &out);
+ if (ret != 0) {
+ return ret;
+ }
+ }
+
+ messaging_dgm_out_rearm_idle_timer(out);
+
+ *pout = out;
+ return 0;
+}
+
+static ssize_t messaging_dgm_sendmsg(int sock,
+ const struct iovec *iov, int iovlen,
+ const int *fds, size_t num_fds,
+ int *perrno)
+{
+ struct msghdr msg;
+ ssize_t fdlen, ret;
+
+ /*
+ * Do the actual sendmsg syscall. This will be called from a
+ * pthreadpool helper thread, so be careful what you do here.
+ */
+
+ msg = (struct msghdr) {
+ .msg_iov = discard_const_p(struct iovec, iov),
+ .msg_iovlen = iovlen
+ };
+
+ fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
+ if (fdlen == -1) {
+ *perrno = EINVAL;
+ return -1;
+ }
+
+ {
+ uint8_t buf[fdlen];
+
+ msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
+
+ do {
+ ret = sendmsg(sock, &msg, MSG_NOSIGNAL);
+ } while ((ret == -1) && (errno == EINTR));
+ }
+
+ if (ret == -1) {
+ *perrno = errno;
+ }
+ return ret;
+}
+
+struct messaging_dgm_out_queue_state {
+ struct tevent_context *ev;
+ struct pthreadpool_tevent *pool;
+
+ struct tevent_req *req;
+ struct tevent_req *subreq;
+
+ int sock;
+
+ int *fds;
+ uint8_t *buf;
+
+ ssize_t sent;
+ int err;
+};
+
+static int messaging_dgm_out_queue_state_destructor(
+ struct messaging_dgm_out_queue_state *state);
+static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
+ void *private_data);
+static void messaging_dgm_out_threaded_job(void *private_data);
+static void messaging_dgm_out_queue_done(struct tevent_req *subreq);
+
+static struct tevent_req *messaging_dgm_out_queue_send(
+ TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+ struct messaging_dgm_out *out,
+ const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
+{
+ struct tevent_req *req;
+ struct messaging_dgm_out_queue_state *state;
+ struct tevent_queue_entry *e;
+ size_t i;
+ ssize_t buflen;
+
+ req = tevent_req_create(out, &state,
+ struct messaging_dgm_out_queue_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ state->ev = ev;
+ state->pool = out->ctx->pool;
+ state->sock = out->sock;
+ state->req = req;
+
+ /*
+ * Go blocking in a thread
+ */
+ if (!out->is_blocking) {
+ int ret = set_blocking(out->sock, true);
+ if (ret == -1) {
+ tevent_req_error(req, errno);
+ return tevent_req_post(req, ev);
+ }
+ out->is_blocking = true;
+ }
+
+ buflen = iov_buflen(iov, iovlen);
+ if (buflen == -1) {
+ tevent_req_error(req, EMSGSIZE);
+ return tevent_req_post(req, ev);
+ }
+
+ state->buf = talloc_array(state, uint8_t, buflen);
+ if (tevent_req_nomem(state->buf, req)) {
+ return tevent_req_post(req, ev);
+ }
+ iov_buf(iov, iovlen, state->buf, buflen);
+
+ state->fds = talloc_array(state, int, num_fds);
+ if (tevent_req_nomem(state->fds, req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ for (i=0; i<num_fds; i++) {
+ state->fds[i] = -1;
+ }
+
+ for (i=0; i<num_fds; i++) {
+
+ state->fds[i] = dup(fds[i]);
+
+ if (state->fds[i] == -1) {
+ int ret = errno;
+
+ close_fd_array(state->fds, num_fds);
+
+ tevent_req_error(req, ret);
+ return tevent_req_post(req, ev);
+ }
+ }
+
+ talloc_set_destructor(state, messaging_dgm_out_queue_state_destructor);
+
+ e = tevent_queue_add_entry(out->queue, ev, req,
+ messaging_dgm_out_queue_trigger, req);
+ if (tevent_req_nomem(e, req)) {
+ return tevent_req_post(req, ev);
+ }
+ return req;
+}
+
+static int messaging_dgm_out_queue_state_destructor(
+ struct messaging_dgm_out_queue_state *state)
+{
+ int *fds;
+ size_t num_fds;
+
+ if (state->subreq != NULL) {
+ /*
+ * We're scheduled, but we're destroyed. This happens
+ * if the messaging_dgm_context is destroyed while
+ * we're stuck in a blocking send. There's nothing we
+ * can do but to leak memory.
+ */
+ TALLOC_FREE(state->subreq);
+ (void)talloc_reparent(state->req, NULL, state);
+ return -1;
+ }
+
+ fds = state->fds;
+ num_fds = talloc_array_length(fds);
+ close_fd_array(fds, num_fds);
+ return 0;
+}
+
+static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
+ void *private_data)
+{
+ struct messaging_dgm_out_queue_state *state = tevent_req_data(
+ req, struct messaging_dgm_out_queue_state);
+
+ state->subreq = pthreadpool_tevent_job_send(
+ state, state->ev, state->pool,
+ messaging_dgm_out_threaded_job, state);
+ if (tevent_req_nomem(state->subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(state->subreq, messaging_dgm_out_queue_done,
+ req);
+}
+
+static void messaging_dgm_out_threaded_job(void *private_data)
+{
+ struct messaging_dgm_out_queue_state *state = talloc_get_type_abort(
+ private_data, struct messaging_dgm_out_queue_state);
+
+ struct iovec iov = { .iov_base = state->buf,
+ .iov_len = talloc_get_size(state->buf) };
+ size_t num_fds = talloc_array_length(state->fds);
+
+ state->sent = messaging_dgm_sendmsg(state->sock, &iov, 1,
+ state->fds, num_fds, &state->err);
+}
+
+static void messaging_dgm_out_queue_done(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(
+ subreq, struct tevent_req);
+ struct messaging_dgm_out_queue_state *state = tevent_req_data(
+ req, struct messaging_dgm_out_queue_state);
+ int ret;
+
+ if (subreq != state->subreq) {
+ abort();
+ }
+
+ ret = pthreadpool_tevent_job_recv(subreq);
+
+ TALLOC_FREE(subreq);
+ state->subreq = NULL;
+
+ if (tevent_req_error(req, ret)) {
+ return;
+ }
+ if (state->sent == -1) {
+ tevent_req_error(req, state->err);
+ return;
+ }
+ tevent_req_done(req);
+}
+
+static int messaging_dgm_out_queue_recv(struct tevent_req *req)
+{
+ return tevent_req_simple_recv_unix(req);
+}
+
+static void messaging_dgm_out_sent_fragment(struct tevent_req *req);
+
+static int messaging_dgm_out_send_fragment(
+ struct tevent_context *ev, struct messaging_dgm_out *out,
+ const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
+{
+ struct tevent_req *req;
+ size_t qlen;
+
+ qlen = tevent_queue_length(out->queue);
+ if (qlen == 0) {
+ ssize_t nsent;
+ int err = 0;
+
+ if (out->is_blocking) {
+ int ret = set_blocking(out->sock, false);
+ if (ret == -1) {
+ return errno;
+ }
+ out->is_blocking = false;
+ }
+
+ nsent = messaging_dgm_sendmsg(out->sock, iov, iovlen, fds,
+ num_fds, &err);
+ if (nsent >= 0) {
+ return 0;
+ }
+
+ if (err != EWOULDBLOCK) {
+ return err;
+ }
+ }
+
+ req = messaging_dgm_out_queue_send(out, ev, out, iov, iovlen,
+ fds, num_fds);
+ if (req == NULL) {
+ return ENOMEM;
+ }
+ tevent_req_set_callback(req, messaging_dgm_out_sent_fragment, out);
+
+ return 0;
+}
+
+static void messaging_dgm_out_sent_fragment(struct tevent_req *req)
+{
+ struct messaging_dgm_out *out = tevent_req_callback_data(
+ req, struct messaging_dgm_out);
+ int ret;
+
+ ret = messaging_dgm_out_queue_recv(req);
+ TALLOC_FREE(req);
+
+ if (ret != 0) {
+ DBG_WARNING("messaging_out_queue_recv returned %s\n",
+ strerror(ret));
+ }
+
+ messaging_dgm_out_rearm_idle_timer(out);
+}
+
+
+struct messaging_dgm_fragment_hdr {
+ size_t msglen;
+ pid_t pid;
+ int sock;
+};
+
+static int messaging_dgm_out_send_fragmented(struct tevent_context *ev,
+ struct messaging_dgm_out *out,
+ const struct iovec *iov,
+ int iovlen,
+ const int *fds, size_t num_fds)
+{
+ ssize_t msglen, sent;
+ int ret = 0;
+ struct messaging_dgm_fragment_hdr hdr;
+ struct iovec src_iov;
+
+ if (iovlen < 0) {
+ return EINVAL;
+ }
+ struct iovec iov_copy[iovlen+2];
+
+ msglen = iov_buflen(iov, iovlen);
+ if (msglen == -1) {
+ return EMSGSIZE;
+ }
+ if (num_fds > INT8_MAX) {
+ return EINVAL;
+ }
+
+ if ((size_t) msglen <=
+ (MESSAGING_DGM_FRAGMENT_LENGTH - sizeof(uint64_t))) {
+ uint64_t cookie = 0;
+
+ iov_copy[0].iov_base = &cookie;
+ iov_copy[0].iov_len = sizeof(cookie);
+ if (iovlen > 0) {
+ memcpy(&iov_copy[1], iov,
+ sizeof(struct iovec) * iovlen);
+ }
+
+ return messaging_dgm_out_send_fragment(
+ ev, out, iov_copy, iovlen+1, fds, num_fds);
+
+ }
+
+ hdr = (struct messaging_dgm_fragment_hdr) {
+ .msglen = msglen,
+ .pid = getpid(),
+ .sock = out->sock
+ };
+
+ iov_copy[0].iov_base = &out->cookie;
+ iov_copy[0].iov_len = sizeof(out->cookie);
+ iov_copy[1].iov_base = &hdr;
+ iov_copy[1].iov_len = sizeof(hdr);
+
+ sent = 0;
+ src_iov = iov[0];
+
+ /*
+ * The following write loop sends the user message in pieces. We have
+ * filled the first two iovecs above with "cookie" and "hdr". In the
+ * following loops we pull message chunks from the user iov array and
+ * fill iov_copy piece by piece, possibly truncating chunks from the
+ * caller's iov array. Ugly, but hopefully efficient.
+ */
+
+ while (sent < msglen) {
+ size_t fragment_len;
+ size_t iov_index = 2;
+
+ fragment_len = sizeof(out->cookie) + sizeof(hdr);
+
+ while (fragment_len < MESSAGING_DGM_FRAGMENT_LENGTH) {
+ size_t space, chunk;
+
+ space = MESSAGING_DGM_FRAGMENT_LENGTH - fragment_len;
+ chunk = MIN(space, src_iov.iov_len);
+
+ iov_copy[iov_index].iov_base = src_iov.iov_base;
+ iov_copy[iov_index].iov_len = chunk;
+ iov_index += 1;
+
+ src_iov.iov_base = (char *)src_iov.iov_base + chunk;
+ src_iov.iov_len -= chunk;
+ fragment_len += chunk;
+
+ if (src_iov.iov_len == 0) {
+ iov += 1;
+ iovlen -= 1;
+ if (iovlen == 0) {
+ break;
+ }
+ src_iov = iov[0];
+ }
+ }
+ sent += (fragment_len - sizeof(out->cookie) - sizeof(hdr));
+
+ /*
+ * only the last fragment should pass the fd array.
+ * That simplifies the receiver a lot.
+ */
+ if (sent < msglen) {
+ ret = messaging_dgm_out_send_fragment(
+ ev, out, iov_copy, iov_index, NULL, 0);
+ } else {
+ ret = messaging_dgm_out_send_fragment(
+ ev, out, iov_copy, iov_index, fds, num_fds);
+ }
+ if (ret != 0) {
+ break;
+ }
+ }
+
+ out->cookie += 1;
+ if (out->cookie == 0) {
+ out->cookie += 1;
+ }
+
+ return ret;
+}
+
+static struct messaging_dgm_context *global_dgm_context;
static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
@@ -168,6 +804,11 @@ fail_close:
return ret;
}
+static void messaging_dgm_read_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags,
+ void *private_data);
+
int messaging_dgm_init(struct tevent_context *ev,
uint64_t *punique,
const char *socket_dir,
@@ -193,6 +834,7 @@ int messaging_dgm_init(struct tevent_context *ev,
if (ctx == NULL) {
goto fail_nomem;
}
+ ctx->ev = ev;
ctx->pid = getpid();
ctx->recv_cb = recv_cb;
ctx->recv_cb_private_data = recv_cb_private_data;
@@ -229,30 +871,52 @@ int messaging_dgm_init(struct tevent_context *ev,
return ret;
}
- ctx->msg_callbacks = poll_funcs_init_tevent(ctx);
- if (ctx->msg_callbacks == NULL) {
- goto fail_nomem;
- }
+ unlink(socket_address.sun_path);
- ctx->tevent_handle = poll_funcs_tevent_register(
- ctx, ctx->msg_callbacks, ev);
- if (ctx->tevent_handle == NULL) {
- goto fail_nomem;
+ ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+ if (ctx->sock == -1) {
+ ret = errno;
+ DBG_WARNING("socket failed: %s\n", strerror(ret));
+ TALLOC_FREE(ctx);
+ return ret;
}
- unlink(socket_address.sun_path);
+ ret = prepare_socket_cloexec(ctx->sock);
+ if (ret != 0) {
+ /* ret already holds the errno value from prepare_socket_cloexec() */
+ DBG_WARNING("prepare_socket_cloexec failed: %s\n",
+ strerror(ret));
+ TALLOC_FREE(ctx);
+ return ret;
+ }
- ret = unix_msg_init(&socket_address, ctx->msg_callbacks, 1024,
- messaging_dgm_recv, ctx, &ctx->dgm_ctx);
- if (ret != 0) {
- DEBUG(1, ("unix_msg_init failed: %s\n", strerror(ret)));
+ ret = bind(ctx->sock, (struct sockaddr *)(void *)&socket_address,
+ sizeof(socket_address));
+ if (ret == -1) {
+ ret = errno;
+ DBG_WARNING("bind failed: %s\n", strerror(ret));
TALLOC_FREE(ctx);
return ret;
}
+
+ ctx->read_fde = tevent_add_fd(ctx->ev, ctx, ctx->sock, TEVENT_FD_READ,
+ messaging_dgm_read_handler, ctx);
+ if (ctx->read_fde == NULL) {
+ goto fail_nomem;
+ }
+
talloc_set_destructor(ctx, messaging_dgm_context_destructor);
ctx->have_dgm_context = &have_dgm_context;
+ ret = pthreadpool_tevent_init(ctx, 0, &ctx->pool);
+ if (ret != 0) {
+ DBG_WARNING("pthreadpool_tevent_init failed: %s\n",
+ strerror(ret));
+ TALLOC_FREE(ctx);
+ return ret;
+ }
+
global_dgm_context = ctx;
return 0;
@@ -263,17 +927,32 @@ fail_nomem:
static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
{
- /*
- * First delete the socket to avoid races. The lockfile is the
- * indicator that we're still around.
- */
- unix_msg_free(c->dgm_ctx);
+ while (c->outsocks != NULL) {
+ TALLOC_FREE(c->outsocks);
+ }
+ while (c->in_msgs != NULL) {
+ TALLOC_FREE(c->in_msgs);
+ }
+
+ TALLOC_FREE(c->read_fde);
+ close(c->sock);
if (getpid() == c->pid) {
struct sun_path_buf name;
int ret;
ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
+ c->socket_dir.buf, (unsigned)c->pid);
+ if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
+ /*
+ * We've checked the length when creating, so this
+ * should never happen
+ */
+ abort();
+ }
+ unlink(name.buf);
+
+ ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
c->lockfile_dir.buf, (unsigned)c->pid);
if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
/*
@@ -293,6 +972,174 @@ static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
return 0;
}
+static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
+ uint8_t *msg, size_t msg_len,
+ int *fds, size_t num_fds);
+
+static void messaging_dgm_read_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags,
+ void *private_data)
+{
+ struct messaging_dgm_context *ctx = talloc_get_type_abort(
+ private_data, struct messaging_dgm_context);
+ ssize_t received;
+ struct msghdr msg;
+ struct iovec iov;
+ size_t msgbufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
+ uint8_t msgbuf[msgbufsize];
+ uint8_t buf[MESSAGING_DGM_FRAGMENT_LENGTH];
+
+ if ((flags & TEVENT_FD_READ) == 0) {
+ return;
+ }
+
+ iov = (struct iovec) { .iov_base = buf, .iov_len = sizeof(buf) };
+ msg = (struct msghdr) { .msg_iov = &iov, .msg_iovlen = 1 };
+
+ msghdr_prep_recv_fds(&msg, msgbuf, msgbufsize, INT8_MAX);
+
+#ifdef MSG_CMSG_CLOEXEC
+ flags |= MSG_CMSG_CLOEXEC;
+#endif
+
+ received = recvmsg(ctx->sock, &msg, 0);
+ if (received == -1) {
+ if ((errno == EAGAIN) ||
+ (errno == EWOULDBLOCK) ||
+ (errno == EINTR) ||
+ (errno == ENOMEM)) {
+ /* Not really an error - just try again. */
+ return;
+ }
+ /* Problem with the socket. Set it unreadable. */
+ tevent_fd_set_flags(ctx->read_fde, 0);
+ return;
+ }
+
+ if ((size_t)received > sizeof(buf)) {
+ /* More than we expected, not for us */
+ return;
+ }
+
+ {
+ size_t num_fds = msghdr_extract_fds(&msg, NULL, 0);
+ size_t i;
+ int fds[num_fds];
+
+ msghdr_extract_fds(&msg, fds, num_fds);
+
+ for (i = 0; i < num_fds; i++) {
+ int err;
+
+ err = prepare_socket_cloexec(fds[i]);
+ if (err != 0) {
+ close_fd_array(fds, num_fds);
+ num_fds = 0;
+ }
+ }
+
+ messaging_dgm_recv(ctx, buf, received, fds, num_fds);
+ }
+
+}
+
+static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
+{
+ DLIST_REMOVE(m->ctx->in_msgs, m);
+ return 0;
+}
+
+static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
+ uint8_t *buf, size_t buflen,
+ int *fds, size_t num_fds)
+{
+ struct messaging_dgm_fragment_hdr hdr;
+ struct messaging_dgm_in_msg *msg;
+ size_t space;
+ uint64_t cookie;
+
+ if (buflen < sizeof(cookie)) {
+ goto close_fds;
+ }
+ memcpy(&cookie, buf, sizeof(cookie));
+ buf += sizeof(cookie);
+ buflen -= sizeof(cookie);
+
+ if (cookie == 0) {
+ ctx->recv_cb(buf, buflen, fds, num_fds,
+ ctx->recv_cb_private_data);
+ return;
+ }
+
+ if (buflen < sizeof(hdr)) {
+ goto close_fds;
+ }
+ memcpy(&hdr, buf, sizeof(hdr));
+ buf += sizeof(hdr);
+ buflen -= sizeof(hdr);
+
+ for (msg = ctx->in_msgs; msg != NULL; msg = msg->next) {
+ if ((msg->sender_pid == hdr.pid) &&
+ (msg->sender_sock == hdr.sock)) {
+ break;
+ }
+ }
+
+ if ((msg != NULL) && (msg->cookie != cookie)) {
+ TALLOC_FREE(msg);
+ }
+
+ if (msg == NULL) {
+ size_t msglen;
+ msglen = offsetof(struct messaging_dgm_in_msg, buf) +
+ hdr.msglen;
+
+ msg = talloc_size(ctx, msglen);
+ if (msg == NULL) {
+ goto close_fds;
+ }
+ talloc_set_name_const(msg, "struct messaging_dgm_in_msg");
+
+ *msg = (struct messaging_dgm_in_msg) {
+ .ctx = ctx, .msglen = hdr.msglen,
+ .sender_pid = hdr.pid, .sender_sock = hdr.sock,
+ .cookie = cookie
+ };
+ DLIST_ADD(ctx->in_msgs, msg);
+ talloc_set_destructor(msg, messaging_dgm_in_msg_destructor);
+ }
+
+ space = msg->msglen - msg->received;
+ if (buflen > space) {
+ goto close_fds;
+ }
+
+ memcpy(msg->buf + msg->received, buf, buflen);
+ msg->received += buflen;
+
+ if (msg->received < msg->msglen) {
+ /*
+ * Any valid sender will send the fds in the last
+ * block. Invalid senders might have sent fd's that we
+ * need to close here.
+ */
+ goto close_fds;
+ }
+
+ DLIST_REMOVE(ctx->in_msgs, msg);
+ talloc_set_destructor(msg, NULL);
+
+ ctx->recv_cb(msg->buf, msg->msglen, fds, num_fds,
+ ctx->recv_cb_private_data);
+
+ TALLOC_FREE(msg);
+ return;
+
+close_fds:
+ close_fd_array(fds, num_fds);
+}
+
void messaging_dgm_destroy(void)
{
TALLOC_FREE(global_dgm_context);
@@ -303,44 +1150,25 @@ int messaging_dgm_send(pid_t pid,
const int *fds, size_t num_fds)
{
struct messaging_dgm_context *ctx = global_dgm_context;
- struct sockaddr_un dst;
- ssize_t dst_pathlen;
+ struct messaging_dgm_out *out;
int ret;
if (ctx == NULL) {
return ENOTCONN;
}
- dst = (struct sockaddr_un) { .sun_family = AF_UNIX };
-
- dst_pathlen = snprintf(dst.sun_path, sizeof(dst.sun_path),
- "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
- if (dst_pathlen < 0) {
- return errno;
- }
- if ((size_t)dst_pathlen >= sizeof(dst.sun_path)) {
- return ENAMETOOLONG;
+ ret = messaging_dgm_out_get(ctx, pid, &out);
+ if (ret != 0) {
+ return ret;
}
DEBUG(10, ("%s: Sending message to %u\n", __func__, (unsigned)pid));
- ret = unix_msg_send(ctx->dgm_ctx, &dst, iov, iovlen, fds, num_fds);
-
+ ret = messaging_dgm_out_send_fragmented(ctx->ev, out, iov, iovlen,
+ fds, num_fds);
return ret;
}
-static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
- uint8_t *msg, size_t msg_len,
- int *fds, size_t num_fds,
- void *private_data)
-{
- struct messaging_dgm_context *dgm_ctx = talloc_get_type_abort(
- private_data, struct messaging_dgm_context);
-
- dgm_ctx->recv_cb(msg, msg_len, fds, num_fds,
- dgm_ctx->recv_cb_private_data);
-}
-
static int messaging_dgm_read_unique(int fd, uint64_t *punique)
{
char buf[25];
@@ -525,5 +1353,6 @@ void *messaging_dgm_register_tevent_context(TALLOC_CTX *mem_ctx,
if (ctx == NULL) {
return NULL;
}
- return poll_funcs_tevent_register(mem_ctx, ctx->msg_callbacks, ev);
+ return tevent_add_fd(ev, mem_ctx, ctx->sock, TEVENT_FD_READ,
+ messaging_dgm_read_handler, ctx);
}
diff --git a/source3/wscript_build b/source3/wscript_build
index 21a76d3..1598555 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -302,8 +302,7 @@ bld.SAMBA3_SUBSYSTEM('TDB_LIB',
bld.SAMBA3_LIBRARY('messages_dgm',
source='''lib/messages_dgm.c lib/messages_dgm_ref.c''',
- deps='''talloc UNIX_MSG POLL_FUNCS_TEVENT samba-debug
- genrand''',
+ deps='''talloc samba-debug PTHREADPOOL msghdr genrand''',
private_library=True)
bld.SAMBA3_LIBRARY('messages_util',
@@ -355,7 +354,6 @@ bld.SAMBA3_SUBSYSTEM('samba3core',
UTIL_PW
SAMBA_VERSION
PTHREADPOOL
- UNIX_MSG
POLL_FUNCS_TEVENT
interfaces
param
@@ -1459,7 +1457,6 @@ bld.SAMBA3_BINARY('spotlight2sparql',
bld.RECURSE('auth')
bld.RECURSE('libgpo/gpext')
bld.RECURSE('lib/pthreadpool')
-bld.RECURSE('lib/unix_msg')
bld.RECURSE('librpc')
bld.RECURSE('librpc/idl')
bld.RECURSE('libsmb')
--
2.7.4
>From 0df060e9c0a28e08af22861db3695ea12a7e1356 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 28 Sep 2016 14:35:21 -0700
Subject: [PATCH 14/26] lib: Remove unix_msg
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source3/lib/unix_msg/test_drain.c | 83 ---
source3/lib/unix_msg/test_source.c | 93 ---
source3/lib/unix_msg/tests.c | 271 ---------
source3/lib/unix_msg/unix_msg.c | 1094 ------------------------------------
source3/lib/unix_msg/unix_msg.h | 121 ----
source3/lib/unix_msg/wscript_build | 18 -
6 files changed, 1680 deletions(-)
delete mode 100644 source3/lib/unix_msg/test_drain.c
delete mode 100644 source3/lib/unix_msg/test_source.c
delete mode 100644 source3/lib/unix_msg/tests.c
delete mode 100644 source3/lib/unix_msg/unix_msg.c
delete mode 100644 source3/lib/unix_msg/unix_msg.h
delete mode 100644 source3/lib/unix_msg/wscript_build
diff --git a/source3/lib/unix_msg/test_drain.c b/source3/lib/unix_msg/test_drain.c
deleted file mode 100644
index 675ac6f..0000000
--- a/source3/lib/unix_msg/test_drain.c
+++ /dev/null
@@ -1,83 +0,0 @@
-#include "replace.h"
-#include "unix_msg.h"
-#include "poll_funcs/poll_funcs_tevent.h"
-#include "tevent.h"
-#include "system/select.h"
-
-struct cb_state {
- unsigned num_received;
- uint8_t *buf;
- size_t buflen;
-};
-
-static void recv_cb(struct unix_msg_ctx *ctx,
- uint8_t *msg, size_t msg_len,
- int *fds, size_t num_fds,
- void *private_data);
-
-int main(int argc, const char *argv[])
-{
- struct poll_funcs *funcs;
- void *handle;
- struct sockaddr_un addr;
- struct unix_msg_ctx *ctx;
- struct tevent_context *ev;
- int ret;
-
- struct cb_state state;
-
- if (argc != 2) {
- fprintf(stderr, "Usage: %s <sockname>\n", argv[0]);
- return 1;
- }
-
- addr = (struct sockaddr_un) { .sun_family = AF_UNIX };
- strlcpy(addr.sun_path, argv[1], sizeof(addr.sun_path));
- unlink(addr.sun_path);
-
- ev = tevent_context_init(NULL);
- if (ev == NULL) {
- perror("tevent_context_init failed");
- return 1;
- }
- funcs = poll_funcs_init_tevent(ev);
- if (funcs == NULL) {
- fprintf(stderr, "poll_funcs_init_tevent failed\n");
- return 1;
- }
-
- handle = poll_funcs_tevent_register(ev, funcs, ev);
- if (handle == NULL) {
- fprintf(stderr, "poll_funcs_tevent_register failed\n");
- exit(1);
- }
-
- ret = unix_msg_init(&addr, funcs, 256, recv_cb, &state, &ctx);
- if (ret != 0) {
- fprintf(stderr, "unix_msg_init failed: %s\n",
- strerror(ret));
- return 1;
- }
-
- while (1) {
- ret = tevent_loop_once(ev);
- if (ret == -1) {
- fprintf(stderr, "tevent_loop_once failed: %s\n",
- strerror(errno));
- exit(1);
- }
- }
- return 0;
-}
-
-static void recv_cb(struct unix_msg_ctx *ctx,
- uint8_t *msg, size_t msg_len,
- int *fds, size_t num_fds,
- void *private_data)
-{
- unsigned num;
- if (msg_len == sizeof(num)) {
- memcpy(&num, msg, msg_len);
- printf("%u\n", num);
- }
-}
diff --git a/source3/lib/unix_msg/test_source.c b/source3/lib/unix_msg/test_source.c
deleted file mode 100644
index 3b65267..0000000
--- a/source3/lib/unix_msg/test_source.c
+++ /dev/null
@@ -1,93 +0,0 @@
-#include "replace.h"
-#include "unix_msg.h"
-#include "poll_funcs/poll_funcs_tevent.h"
-#include "tevent.h"
-
-int main(int argc, const char *argv[])
-{
- struct poll_funcs *funcs;
- void *tevent_handle;
- struct unix_msg_ctx **ctxs;
- struct tevent_context *ev;
- struct iovec iov;
- int ret;
- unsigned i;
- unsigned num_ctxs = 1;
- struct sockaddr_un dst;
-
- if (argc < 2) {
- fprintf(stderr, "Usage: %s <sockname> [num_contexts]\n", argv[0]);
- return 1;
- }
- if (argc > 2) {
- num_ctxs = atoi(argv[2]);
- }
-
- ev = tevent_context_init(NULL);
- if (ev == NULL) {
- perror("tevent_context_init failed");
- return 1;
- }
- funcs = poll_funcs_init_tevent(NULL);
- if (funcs == NULL) {
- fprintf(stderr, "poll_funcs_init_tevent failed\n");
- return 1;
- }
- tevent_handle = poll_funcs_tevent_register(NULL, funcs, ev);
- if (tevent_handle == NULL) {
- fprintf(stderr, "poll_funcs_tevent_register failed\n");
- return 1;
- }
-
- ctxs = talloc_array(ev, struct unix_msg_ctx *, num_ctxs);
- if (ctxs == NULL) {
- fprintf(stderr, "talloc failed\n");
- return 1;
- }
-
- for (i=0; i<num_ctxs; i++) {
- ret = unix_msg_init(NULL, funcs, 256, NULL, NULL,
- &ctxs[i]);
- if (ret != 0) {
- fprintf(stderr, "unix_msg_init failed: %s\n",
- strerror(ret));
- return 1;
- }
- }
-
- iov.iov_base = &i;
- iov.iov_len = sizeof(i);
-
- dst = (struct sockaddr_un) { .sun_family = AF_UNIX };
- strlcpy(dst.sun_path, argv[1], sizeof(dst.sun_path));
-
- for (i=0; i<num_ctxs; i++) {
- unsigned j;
-
- for (j=0; j<100000; j++) {
- ret = unix_msg_send(ctxs[i], &dst, &iov, 1, NULL, 0);
- if (ret != 0) {
- fprintf(stderr, "unix_msg_send failed: %s\n",
- strerror(ret));
- return 1;
- }
- }
- }
-
- while (true) {
- ret = tevent_loop_once(ev);
- if (ret == -1) {
- fprintf(stderr, "tevent_loop_once failed: %s\n",
- strerror(errno));
- exit(1);
- }
- }
-
- for (i=0; i<num_ctxs; i++) {
- unix_msg_free(ctxs[i]);
- }
-
- talloc_free(ev);
-
- return 0;
-}
diff --git a/source3/lib/unix_msg/tests.c b/source3/lib/unix_msg/tests.c
deleted file mode 100644
index c743c37..0000000
--- a/source3/lib/unix_msg/tests.c
+++ /dev/null
@@ -1,271 +0,0 @@
-#include "replace.h"
-#include "unix_msg.h"
-#include "poll_funcs/poll_funcs_tevent.h"
-#include "tevent.h"
-
-struct cb_state {
- unsigned num_received;
- uint8_t *buf;
- size_t buflen;
-};
-
-static void recv_cb(struct unix_msg_ctx *ctx,
- uint8_t *msg, size_t msg_len,
- int *fds, size_t num_fds,
- void *private_data);
-
-static void expect_messages(struct tevent_context *ev, struct cb_state *state,
- unsigned num_msgs)
-{
- state->num_received = 0;
-
- while (state->num_received < num_msgs) {
- int ret;
-
- ret = tevent_loop_once(ev);
- if (ret == -1) {
- fprintf(stderr, "tevent_loop_once failed: %s\n",
- strerror(errno));
- exit(1);
- }
- }
-}
-
-int main(void)
-{
- struct poll_funcs *funcs;
- void *tevent_handle;
- struct sockaddr_un addr1, addr2;
- struct unix_msg_ctx *ctx1, *ctx2;
- struct tevent_context *ev;
- struct iovec iov;
- uint8_t msg;
- int i, ret;
- static uint8_t buf[1755];
-
- struct cb_state state;
-
- addr1 = (struct sockaddr_un) { .sun_family = AF_UNIX };
- strlcpy(addr1.sun_path, "sock1", sizeof(addr1.sun_path));
- unlink(addr1.sun_path);
-
- addr2 = (struct sockaddr_un) { .sun_family = AF_UNIX };
- strlcpy(addr2.sun_path, "sock2", sizeof(addr2.sun_path));
- unlink(addr2.sun_path);
-
- ev = tevent_context_init(NULL);
- if (ev == NULL) {
- perror("tevent_context_init failed");
- return 1;
- }
-
- funcs = poll_funcs_init_tevent(ev);
- if (funcs == NULL) {
- fprintf(stderr, "poll_funcs_init_tevent failed\n");
- return 1;
- }
- tevent_handle = poll_funcs_tevent_register(ev, funcs, ev);
- if (tevent_handle == NULL) {
- fprintf(stderr, "poll_funcs_register_tevent failed\n");
- return 1;
- }
-
- ret = unix_msg_init(&addr1, funcs, 256, recv_cb, &state, &ctx1);
- if (ret != 0) {
- fprintf(stderr, "unix_msg_init failed: %s\n",
- strerror(ret));
- return 1;
- }
-
- ret = unix_msg_init(&addr1, funcs, 256, recv_cb, &state, &ctx1);
- if (ret == 0) {
- fprintf(stderr, "unix_msg_init succeeded unexpectedly\n");
- return 1;
- }
- if (ret != EADDRINUSE) {
- fprintf(stderr, "unix_msg_init returned %s, expected "
- "EADDRINUSE\n", strerror(ret));
- return 1;
- }
-
- ret = unix_msg_init(&addr2, funcs, 256, recv_cb, &state, &ctx2);
- if (ret != 0) {
- fprintf(stderr, "unix_msg_init failed: %s\n",
- strerror(ret));
- return 1;
- }
-
- printf("sending a 0-length message\n");
-
- state.buf = NULL;
- state.buflen = 0;
-
- ret = unix_msg_send(ctx1, &addr2, NULL, 0, NULL, 0);
- if (ret != 0) {
- fprintf(stderr, "unix_msg_send failed: %s\n",
- strerror(ret));
- return 1;
- }
-
- expect_messages(ev, &state, 1);
-
- printf("sending a small message\n");
-
- msg = random();
- iov.iov_base = &msg;
- iov.iov_len = sizeof(msg);
- state.buf = &msg;
- state.buflen = sizeof(msg);
-
- ret = unix_msg_send(ctx1, &addr2, &iov, 1, NULL, 0);
- if (ret != 0) {
- fprintf(stderr, "unix_msg_send failed: %s\n",
- strerror(ret));
- return 1;
- }
-
- expect_messages(ev, &state, 1);
-
- printf("test send queue caching\n");
-
- /*
- * queues are cached for some time, so this tests sending
- * still works after the cache expires and the queue was
- * freed.
- */
- sleep(SENDQ_CACHE_TIME_SECS + 1);
- ret = tevent_loop_once(ev);
- if (ret == -1) {
- fprintf(stderr, "tevent_loop_once failed: %s\n",
- strerror(errno));
- exit(1);
- }
-
- msg = random();
- iov.iov_base = &msg;
- iov.iov_len = sizeof(msg);
- state.buf = &msg;
- state.buflen = sizeof(msg);
-
- ret = unix_msg_send(ctx1, &addr2, &iov, 1, NULL, 0);
- if (ret != 0) {
- fprintf(stderr, "unix_msg_send failed: %s\n",
- strerror(ret));
- return 1;
- }
-
- expect_messages(ev, &state, 1);
-
- printf("sending six large, interleaved messages\n");
-
- for (i=0; i<sizeof(buf); i++) {
- buf[i] = random();
- }
-
- iov.iov_base = buf;
- iov.iov_len = sizeof(buf);
- state.buf = buf;
- state.buflen = sizeof(buf);
-
- for (i=0; i<3; i++) {
- ret = unix_msg_send(ctx1, &addr2, &iov, 1, NULL, 0);
- if (ret != 0) {
- fprintf(stderr, "unix_msg_send failed: %s\n",
- strerror(ret));
- return 1;
- }
- ret = unix_msg_send(ctx2, &addr2, &iov, 1, NULL, 0);
- if (ret != 0) {
- fprintf(stderr, "unix_msg_send failed: %s\n",
- strerror(ret));
- return 1;
- }
- }
-
- expect_messages(ev, &state, 6);
-
- printf("sending a few messages in small pieces\n");
-
- for (i = 0; i<5; i++) {
- struct iovec iovs[20];
- const size_t num_iovs = ARRAY_SIZE(iovs);
- uint8_t *p = buf;
- size_t j;
-
- for (j=0; j<num_iovs-1; j++) {
- size_t chunk = (random() % ((sizeof(buf) * 2) / num_iovs));
- size_t space = (sizeof(buf) - (p - buf));
-
- if (space == 0) {
- break;
- }
-
- chunk = MIN(chunk, space);
-
- iovs[j].iov_base = p;
- iovs[j].iov_len = chunk;
- p += chunk;
- }
-
- if (p < (buf + sizeof(buf))) {
- iovs[j].iov_base = p;
- iovs[j].iov_len = (sizeof(buf) - (p - buf));
- j++;
- }
-
- ret = unix_msg_send(ctx1, &addr1, iovs, j, NULL, 0);
- if (ret != 0) {
- fprintf(stderr, "unix_msg_send failed: %s\n",
- strerror(ret));
- return 1;
- }
- }
-
- expect_messages(ev, &state, 5);
-
- printf("Filling send queues before freeing\n");
-
- for (i=0; i<5; i++) {
- ret = unix_msg_send(ctx1, &addr2, &iov, 1, NULL, 0);
- if (ret != 0) {
- fprintf(stderr, "unix_msg_send failed: %s\n",
- strerror(ret));
- return 1;
- }
- ret = unix_msg_send(ctx1, &addr1, &iov, 1, NULL, 0);
- if (ret != 0) {
- fprintf(stderr, "unix_msg_send failed: %s\n",
- strerror(ret));
- return 1;
- }
- }
-
- expect_messages(ev, &state, 1); /* Read just one msg */
-
- unix_msg_free(ctx1);
- unix_msg_free(ctx2);
- talloc_free(tevent_handle);
- talloc_free(funcs);
- talloc_free(ev);
-
- return 0;
-}
-
-static void recv_cb(struct unix_msg_ctx *ctx,
- uint8_t *msg, size_t msg_len,
- int *fds, size_t num_fds,
- void *private_data)
-{
- struct cb_state *state = (struct cb_state *)private_data;
-
- if (msg_len != state->buflen) {
- fprintf(stderr, "expected %u bytes, got %u\n",
- (unsigned)state->buflen, (unsigned)msg_len);
- exit(1);
- }
- if ((msg_len != 0) && (memcmp(msg, state->buf, msg_len) != 0)) {
- fprintf(stderr, "message content differs\n");
- exit(1);
- }
- state->num_received += 1;
-}
diff --git a/source3/lib/unix_msg/unix_msg.c b/source3/lib/unix_msg/unix_msg.c
deleted file mode 100644
index 5cbf428..0000000
--- a/source3/lib/unix_msg/unix_msg.c
+++ /dev/null
@@ -1,1094 +0,0 @@
-/*
- * Unix SMB/CIFS implementation.
- * Copyright (C) Volker Lendecke 2013
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "replace.h"
-#include "unix_msg.h"
-#include "system/select.h"
-#include "system/time.h"
-#include "system/network.h"
-#include "lib/util/dlinklist.h"
-#include "pthreadpool/pthreadpool_pipe.h"
-#include "lib/util/iov_buf.h"
-#include "lib/util/msghdr.h"
-#include <fcntl.h>
-#include "lib/util/time.h"
-
-/*
- * This file implements two abstractions: The "unix_dgram" functions implement
- * queueing for unix domain datagram sockets. You can send to a destination
- * socket, and if that has no free space available, it will fall back to an
- * anonymous socket that will poll for writability. "unix_dgram" expects the
- * data size not to exceed the system limit.
- *
- * The "unix_msg" functions implement the fragmentation of large messages on
- * top of "unix_dgram". This is what is exposed to the user of this API.
- */
-
-struct unix_dgram_msg {
- struct unix_dgram_msg *prev, *next;
-
- int sock;
- ssize_t sent;
- int sys_errno;
-};
-
-struct unix_dgram_send_queue {
- struct unix_dgram_send_queue *prev, *next;
- struct unix_dgram_ctx *ctx;
- int sock;
- struct unix_dgram_msg *msgs;
- struct poll_timeout *timeout;
- char path[];
-};
-
-struct unix_dgram_ctx {
- int sock;
- pid_t created_pid;
- const struct poll_funcs *ev_funcs;
- size_t max_msg;
-
- void (*recv_callback)(struct unix_dgram_ctx *ctx,
- uint8_t *msg, size_t msg_len,
- int *fds, size_t num_fds,
- void *private_data);
- void *private_data;
-
- struct poll_watch *sock_read_watch;
- struct unix_dgram_send_queue *send_queues;
-
- struct pthreadpool_pipe *send_pool;
- struct poll_watch *pool_read_watch;
-
- uint8_t *recv_buf;
- char path[];
-};
-
-static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
- void *private_data);
-
-/* Set socket non blocking. */
-static int prepare_socket_nonblock(int sock, bool nonblock)
-{
- int flags;
-#ifdef O_NONBLOCK
-#define FLAG_TO_SET O_NONBLOCK
-#else
-#ifdef SYSV
-#define FLAG_TO_SET O_NDELAY
-#else /* BSD */
-#define FLAG_TO_SET FNDELAY
-#endif
-#endif
-
- flags = fcntl(sock, F_GETFL);
- if (flags == -1) {
- return errno;
- }
- if (nonblock) {
- flags |= FLAG_TO_SET;
- } else {
- flags &= ~FLAG_TO_SET;
- }
- if (fcntl(sock, F_SETFL, flags) == -1) {
- return errno;
- }
-
-#undef FLAG_TO_SET
- return 0;
-}
-
-/* Set socket close on exec. */
-static int prepare_socket_cloexec(int sock)
-{
-#ifdef FD_CLOEXEC
- int flags;
-
- flags = fcntl(sock, F_GETFD, 0);
- if (flags == -1) {
- return errno;
- }
- flags |= FD_CLOEXEC;
- if (fcntl(sock, F_SETFD, flags) == -1) {
- return errno;
- }
-#endif
- return 0;
-}
-
-/* Set socket non blocking and close on exec. */
-static int prepare_socket(int sock)
-{
- int ret = prepare_socket_nonblock(sock, true);
-
- if (ret) {
- return ret;
- }
- return prepare_socket_cloexec(sock);
-}
-
-static size_t unix_dgram_msg_size(void)
-{
- size_t msgsize = sizeof(struct unix_dgram_msg);
- msgsize = (msgsize + 15) & ~15; /* align to 16 */
- return msgsize;
-}
-
-static struct msghdr_buf *unix_dgram_msghdr(struct unix_dgram_msg *msg)
-{
- /*
- * Not portable in C99, but "msg" is aligned and so is
- * unix_dgram_msg_size()
- */
- return (struct msghdr_buf *)(((char *)msg) + unix_dgram_msg_size());
-}
-
-static void close_fd_array(int *fds, size_t num_fds)
-{
- size_t i;
-
- for (i = 0; i < num_fds; i++) {
- if (fds[i] == -1) {
- continue;
- }
-
- close(fds[i]);
- fds[i] = -1;
- }
-}
-
-static void close_fd_array_dgram_msg(struct unix_dgram_msg *dmsg)
-{
- struct msghdr_buf *hdr = unix_dgram_msghdr(dmsg);
- struct msghdr *msg = msghdr_buf_msghdr(hdr);
- size_t num_fds = msghdr_extract_fds(msg, NULL, 0);
- int fds[num_fds];
-
- msghdr_extract_fds(msg, fds, num_fds);
-
- close_fd_array(fds, num_fds);
-}
-
-static int unix_dgram_init(const struct sockaddr_un *addr, size_t max_msg,
- const struct poll_funcs *ev_funcs,
- void (*recv_callback)(struct unix_dgram_ctx *ctx,
- uint8_t *msg, size_t msg_len,
- int *fds, size_t num_fds,
- void *private_data),
- void *private_data,
- struct unix_dgram_ctx **result)
-{
- struct unix_dgram_ctx *ctx;
- size_t pathlen;
- int ret;
-
- if (addr != NULL) {
- pathlen = strlen(addr->sun_path)+1;
- } else {
- pathlen = 1;
- }
-
- ctx = malloc(offsetof(struct unix_dgram_ctx, path) + pathlen);
- if (ctx == NULL) {
- return ENOMEM;
- }
- if (addr != NULL) {
- memcpy(ctx->path, addr->sun_path, pathlen);
- } else {
- ctx->path[0] = '\0';
- }
-
- *ctx = (struct unix_dgram_ctx) {
- .max_msg = max_msg,
- .ev_funcs = ev_funcs,
- .recv_callback = recv_callback,
- .private_data = private_data,
- .created_pid = (pid_t)-1
- };
-
- ctx->recv_buf = malloc(max_msg);
- if (ctx->recv_buf == NULL) {
- free(ctx);
- return ENOMEM;
- }
-
- ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
- if (ctx->sock == -1) {
- ret = errno;
- goto fail_free;
- }
-
- /* Set non-blocking and close-on-exec. */
- ret = prepare_socket(ctx->sock);
- if (ret != 0) {
- goto fail_close;
- }
-
- if (addr != NULL) {
- ret = bind(ctx->sock,
- (const struct sockaddr *)(const void *)addr,
- sizeof(*addr));
- if (ret == -1) {
- ret = errno;
- goto fail_close;
- }
-
- ctx->created_pid = getpid();
-
- ctx->sock_read_watch = ctx->ev_funcs->watch_new(
- ctx->ev_funcs, ctx->sock, POLLIN,
- unix_dgram_recv_handler, ctx);
-
- if (ctx->sock_read_watch == NULL) {
- ret = ENOMEM;
- goto fail_close;
- }
- }
-
- *result = ctx;
- return 0;
-
-fail_close:
- close(ctx->sock);
-fail_free:
- free(ctx->recv_buf);
- free(ctx);
- return ret;
-}
-
-static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
- void *private_data)
-{
- struct unix_dgram_ctx *ctx = (struct unix_dgram_ctx *)private_data;
- ssize_t received;
- int flags = 0;
- struct msghdr msg;
- struct iovec iov;
- size_t bufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
- uint8_t buf[bufsize];
-
- iov = (struct iovec) {
- .iov_base = (void *)ctx->recv_buf,
- .iov_len = ctx->max_msg,
- };
-
- msg = (struct msghdr) {
- .msg_iov = &iov,
- .msg_iovlen = 1,
- };
-
- msghdr_prep_recv_fds(&msg, buf, bufsize, INT8_MAX);
-
-#ifdef MSG_CMSG_CLOEXEC
- flags |= MSG_CMSG_CLOEXEC;
-#endif
-
- received = recvmsg(fd, &msg, flags);
- if (received == -1) {
- if ((errno == EAGAIN) ||
- (errno == EWOULDBLOCK) ||
- (errno == EINTR) || (errno == ENOMEM)) {
- /* Not really an error - just try again. */
- return;
- }
- /* Problem with the socket. Set it unreadable. */
- ctx->ev_funcs->watch_update(w, 0);
- return;
- }
- if (received > ctx->max_msg) {
- /* More than we expected, not for us */
- return;
- }
-
- {
- size_t num_fds = msghdr_extract_fds(&msg, NULL, 0);
- int fds[num_fds];
- int i;
-
- msghdr_extract_fds(&msg, fds, num_fds);
-
- for (i = 0; i < num_fds; i++) {
- int err;
-
- err = prepare_socket_cloexec(fds[i]);
- if (err != 0) {
- close_fd_array(fds, num_fds);
- num_fds = 0;
- }
- }
-
- ctx->recv_callback(ctx, ctx->recv_buf, received,
- fds, num_fds, ctx->private_data);
- }
-}
-
-static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
- void *private_data);
-
-static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
-{
- int ret, signalfd;
-
- if (ctx->send_pool != NULL) {
- return 0;
- }
-
- ret = pthreadpool_pipe_init(0, &ctx->send_pool);
- if (ret != 0) {
- return ret;
- }
-
- signalfd = pthreadpool_pipe_signal_fd(ctx->send_pool);
-
- ctx->pool_read_watch = ctx->ev_funcs->watch_new(
- ctx->ev_funcs, signalfd, POLLIN,
- unix_dgram_job_finished, ctx);
- if (ctx->pool_read_watch == NULL) {
- pthreadpool_pipe_destroy(ctx->send_pool);
- ctx->send_pool = NULL;
- return ENOMEM;
- }
-
- return 0;
-}
-
-static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q);
-
-static int unix_dgram_send_queue_init(
- struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst,
- struct unix_dgram_send_queue **result)
-{
- struct unix_dgram_send_queue *q;
- size_t pathlen;
- int ret, err;
-
- pathlen = strlen(dst->sun_path)+1;
-
- q = malloc(offsetof(struct unix_dgram_send_queue, path) + pathlen);
- if (q == NULL) {
- return ENOMEM;
- }
- q->ctx = ctx;
- q->msgs = NULL;
- q->timeout = NULL;
- memcpy(q->path, dst->sun_path, pathlen);
-
- q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
- if (q->sock == -1) {
- err = errno;
- goto fail_free;
- }
-
- err = prepare_socket(q->sock);
- if (err != 0) {
- goto fail_close;
- }
-
- do {
- ret = connect(q->sock,
- (const struct sockaddr *)(const void *)dst,
- sizeof(*dst));
- } while ((ret == -1) && (errno == EINTR));
-
- if (ret == -1) {
- err = errno;
- goto fail_close;
- }
-
- err = unix_dgram_init_pthreadpool(ctx);
- if (err != 0) {
- goto fail_close;
- }
-
- ret = unix_dgram_sendq_schedule_free(q);
- if (ret != 0) {
- err = ENOMEM;
- goto fail_close;
- }
-
- DLIST_ADD(ctx->send_queues, q);
-
- *result = q;
- return 0;
-
-fail_close:
- close(q->sock);
-fail_free:
- free(q);
- return err;
-}
-
-static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
-{
- struct unix_dgram_ctx *ctx = q->ctx;
-
- while (q->msgs != NULL) {
- struct unix_dgram_msg *msg;
- msg = q->msgs;
- DLIST_REMOVE(q->msgs, msg);
- close_fd_array_dgram_msg(msg);
- free(msg);
- }
- close(q->sock);
- DLIST_REMOVE(ctx->send_queues, q);
- ctx->ev_funcs->timeout_free(q->timeout);
- free(q);
-}
-
-static void unix_dgram_sendq_scheduled_free_handler(
- struct poll_timeout *t, void *private_data);
-
-static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q)
-{
- struct unix_dgram_ctx *ctx = q->ctx;
- struct timeval timeout;
-
- if (q->timeout != NULL) {
- return 0;
- }
-
- GetTimeOfDay(&timeout);
- timeout.tv_sec += SENDQ_CACHE_TIME_SECS;
-
- q->timeout = ctx->ev_funcs->timeout_new(
- ctx->ev_funcs,
- timeout,
- unix_dgram_sendq_scheduled_free_handler,
- q);
- if (q->timeout == NULL) {
- return ENOMEM;
- }
-
- return 0;
-}
-
-static void unix_dgram_sendq_scheduled_free_handler(struct poll_timeout *t,
- void *private_data)
-{
- struct unix_dgram_send_queue *q = private_data;
- int ret;
-
- q->ctx->ev_funcs->timeout_free(q->timeout);
- q->timeout = NULL;
-
- if (q->msgs == NULL) {
- unix_dgram_send_queue_free(q);
- return;
- }
-
- ret = unix_dgram_sendq_schedule_free(q);
- if (ret != 0) {
- unix_dgram_send_queue_free(q);
- return;
- }
-}
-
-static int find_send_queue(struct unix_dgram_ctx *ctx,
- const struct sockaddr_un *dst,
- struct unix_dgram_send_queue **ps)
-{
- struct unix_dgram_send_queue *s;
- int ret;
-
- for (s = ctx->send_queues; s != NULL; s = s->next) {
- if (strcmp(s->path, dst->sun_path) == 0) {
- *ps = s;
- return 0;
- }
- }
- ret = unix_dgram_send_queue_init(ctx, dst, &s);
- if (ret != 0) {
- return ret;
- }
- *ps = s;
- return 0;
-}
-
-static int queue_msg(struct unix_dgram_send_queue *q,
- const struct iovec *iov, int iovcnt,
- const int *fds, size_t num_fds)
-{
- struct unix_dgram_msg *msg;
- struct msghdr_buf *hdr;
- size_t msglen, needed;
- ssize_t msghdrlen;
- int fds_copy[MIN(num_fds, INT8_MAX)];
- int i, ret;
-
- for (i=0; i<num_fds; i++) {
- fds_copy[i] = -1;
- }
-
- for (i = 0; i < num_fds; i++) {
- fds_copy[i] = dup(fds[i]);
- if (fds_copy[i] == -1) {
- ret = errno;
- goto fail;
- }
- }
-
- msglen = unix_dgram_msg_size();
-
- msghdrlen = msghdr_copy(NULL, 0, NULL, 0, iov, iovcnt,
- fds_copy, num_fds);
- if (msghdrlen == -1) {
- ret = EMSGSIZE;
- goto fail;
- }
-
- needed = msglen + msghdrlen;
- if (needed < msglen) {
- ret = EMSGSIZE;
- goto fail;
- }
-
- msg = malloc(needed);
- if (msg == NULL) {
- ret = ENOMEM;
- goto fail;
- }
- hdr = unix_dgram_msghdr(msg);
-
- msg->sock = q->sock;
- msghdr_copy(hdr, msghdrlen, NULL, 0, iov, iovcnt,
- fds_copy, num_fds);
-
- DLIST_ADD_END(q->msgs, msg);
- return 0;
-fail:
- close_fd_array(fds_copy, num_fds);
- return ret;
-}
-
-static void unix_dgram_send_job(void *private_data)
-{
- struct unix_dgram_msg *dmsg = private_data;
-
- do {
- struct msghdr_buf *hdr = unix_dgram_msghdr(dmsg);
- struct msghdr *msg = msghdr_buf_msghdr(hdr);
- dmsg->sent = sendmsg(dmsg->sock, msg, 0);
- } while ((dmsg->sent == -1) && (errno == EINTR));
-
- if (dmsg->sent == -1) {
- dmsg->sys_errno = errno;
- }
-}
-
-static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
- void *private_data)
-{
- struct unix_dgram_ctx *ctx = private_data;
- struct unix_dgram_send_queue *q;
- struct unix_dgram_msg *msg;
- int ret, job;
-
- ret = pthreadpool_pipe_finished_jobs(ctx->send_pool, &job, 1);
- if (ret != 1) {
- return;
- }
-
- for (q = ctx->send_queues; q != NULL; q = q->next) {
- if (job == q->sock) {
- break;
- }
- }
-
- if (q == NULL) {
- /* Huh? Should not happen */
- return;
- }
-
- msg = q->msgs;
- DLIST_REMOVE(q->msgs, msg);
- close_fd_array_dgram_msg(msg);
- free(msg);
-
- if (q->msgs != NULL) {
- ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
- unix_dgram_send_job, q->msgs);
- if (ret != 0) {
- unix_dgram_send_queue_free(q);
- return;
- }
- return;
- }
-
- ret = prepare_socket_nonblock(q->sock, true);
- if (ret != 0) {
- unix_dgram_send_queue_free(q);
- }
-}
-
-static int unix_dgram_send(struct unix_dgram_ctx *ctx,
- const struct sockaddr_un *dst,
- const struct iovec *iov, int iovlen,
- const int *fds, size_t num_fds)
-{
- struct unix_dgram_send_queue *q;
- struct msghdr msg;
- ssize_t fdlen;
- int ret;
- int i;
-
- if (num_fds > INT8_MAX) {
- return EINVAL;
- }
-
-#if !defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) && !defined(HAVE_STRUCT_MSGHDR_MSG_ACCRIGHTS)
- if (num_fds > 0) {
- return ENOSYS;
- }
-#endif
-
- for (i = 0; i < num_fds; i++) {
- /*
- * Make sure we only allow fd passing
- * for communication channels,
- * e.g. sockets, pipes, fifos, ...
- */
- ret = lseek(fds[i], 0, SEEK_CUR);
- if (ret == -1 && errno == ESPIPE) {
- /* ok */
- continue;
- }
-
- /*
- * Reject the message as we may need to call dup(),
- * if we queue the message.
- *
- * That might result in unexpected behavior for the caller
- * for files and broken posix locking.
- */
- return EINVAL;
- }
-
- ret = find_send_queue(ctx, dst, &q);
- if (ret != 0) {
- return ret;
- }
-
- if (q->msgs) {
- /*
- * To preserve message ordering, we have to queue a
- * message when others are waiting in line already.
- */
- return queue_msg(q, iov, iovlen, fds, num_fds);
- }
-
- /*
- * Try a cheap nonblocking send
- */
-
- msg = (struct msghdr) {
- .msg_iov = discard_const_p(struct iovec, iov),
- .msg_iovlen = iovlen
- };
-
- fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
- if (fdlen == -1) {
- return EINVAL;
- }
-
- {
- uint8_t buf[fdlen];
- msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
-
- ret = sendmsg(q->sock, &msg, 0);
- }
-
- if (ret >= 0) {
- return 0;
- }
- if ((errno != EWOULDBLOCK) &&
- (errno != EAGAIN) &&
-#ifdef ENOBUFS
- /* FreeBSD can give this for large messages */
- (errno != ENOBUFS) &&
-#endif
- (errno != EINTR)) {
- return errno;
- }
-
- ret = queue_msg(q, iov, iovlen, fds, num_fds);
- if (ret != 0) {
- unix_dgram_send_queue_free(q);
- return ret;
- }
-
- /*
- * While sending the messages via the pthreadpool, we set the
- * socket back to blocking mode. When the sendqueue becomes
- * empty and we could attempt direct sends again, the
- * finished-jobs-handler of the pthreadpool will set it back
- * to non-blocking.
- */
- ret = prepare_socket_nonblock(q->sock, false);
- if (ret != 0) {
- unix_dgram_send_queue_free(q);
- return ret;
- }
- ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
- unix_dgram_send_job, q->msgs);
- if (ret != 0) {
- unix_dgram_send_queue_free(q);
- return ret;
- }
- return 0;
-}
-
-static int unix_dgram_sock(struct unix_dgram_ctx *ctx)
-{
- return ctx->sock;
-}
-
-static int unix_dgram_free(struct unix_dgram_ctx *ctx)
-{
- struct unix_dgram_send_queue *q;
-
- for (q = ctx->send_queues; q != NULL;) {
- struct unix_dgram_send_queue *q_next = q->next;
-
- if (q->msgs != NULL) {
- return EBUSY;
- }
- unix_dgram_send_queue_free(q);
- q = q_next;
- }
-
- if (ctx->send_pool != NULL) {
- int ret = pthreadpool_pipe_destroy(ctx->send_pool);
- if (ret != 0) {
- return ret;
- }
- ctx->ev_funcs->watch_free(ctx->pool_read_watch);
- }
-
- ctx->ev_funcs->watch_free(ctx->sock_read_watch);
-
- close(ctx->sock);
- if (getpid() == ctx->created_pid) {
- /* If we created it, unlink. Otherwise someone else might
- * still have it open */
- unlink(ctx->path);
- }
-
- free(ctx->recv_buf);
- free(ctx);
- return 0;
-}
-
-/*
- * Every message starts with a uint64_t cookie.
- *
- * A value of 0 indicates a single-fragment message which is complete in
- * itself. The data immediately follows the cookie.
- *
- * Every multi-fragment message has a cookie != 0 and starts with a cookie
- * followed by a struct unix_msg_header and then the data. The pid and sock
- * fields are used to assure uniqueness on the receiver side.
- */
-
-struct unix_msg_hdr {
- size_t msglen;
- pid_t pid;
- int sock;
-};
-
-struct unix_msg {
- struct unix_msg *prev, *next;
- size_t msglen;
- size_t received;
- pid_t sender_pid;
- int sender_sock;
- uint64_t cookie;
- uint8_t buf[1];
-};
-
-struct unix_msg_ctx {
- struct unix_dgram_ctx *dgram;
- size_t fragment_len;
- uint64_t cookie;
-
- void (*recv_callback)(struct unix_msg_ctx *ctx,
- uint8_t *msg, size_t msg_len,
- int *fds, size_t num_fds,
- void *private_data);
- void *private_data;
-
- struct unix_msg *msgs;
-};
-
-static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
- uint8_t *buf, size_t buflen,
- int *fds, size_t num_fds,
- void *private_data);
-
-int unix_msg_init(const struct sockaddr_un *addr,
- const struct poll_funcs *ev_funcs,
- size_t fragment_len,
- void (*recv_callback)(struct unix_msg_ctx *ctx,
- uint8_t *msg, size_t msg_len,
- int *fds, size_t num_fds,
- void *private_data),
- void *private_data,
- struct unix_msg_ctx **result)
-{
- struct unix_msg_ctx *ctx;
- int ret;
-
- ctx = malloc(sizeof(*ctx));
- if (ctx == NULL) {
- return ENOMEM;
- }
-
- *ctx = (struct unix_msg_ctx) {
- .fragment_len = fragment_len,
- .cookie = 1,
- .recv_callback = recv_callback,
- .private_data = private_data
- };
-
- ret = unix_dgram_init(addr, fragment_len, ev_funcs,
- unix_msg_recv, ctx, &ctx->dgram);
- if (ret != 0) {
- free(ctx);
- return ret;
- }
-
- *result = ctx;
- return 0;
-}
-
-int unix_msg_send(struct unix_msg_ctx *ctx, const struct sockaddr_un *dst,
- const struct iovec *iov, int iovlen,
- const int *fds, size_t num_fds)
-{
- ssize_t msglen;
- size_t sent;
- int ret = 0;
- struct iovec iov_copy[iovlen+2];
- struct unix_msg_hdr hdr;
- struct iovec src_iov;
-
- if (iovlen < 0) {
- return EINVAL;
- }
-
- msglen = iov_buflen(iov, iovlen);
- if (msglen == -1) {
- return EINVAL;
- }
-
- if (num_fds > INT8_MAX) {
- return EINVAL;
- }
-
- if (msglen <= (ctx->fragment_len - sizeof(uint64_t))) {
- uint64_t cookie = 0;
-
- iov_copy[0].iov_base = &cookie;
- iov_copy[0].iov_len = sizeof(cookie);
- if (iovlen > 0) {
- memcpy(&iov_copy[1], iov,
- sizeof(struct iovec) * iovlen);
- }
-
- return unix_dgram_send(ctx->dgram, dst, iov_copy, iovlen+1,
- fds, num_fds);
- }
-
- hdr = (struct unix_msg_hdr) {
- .msglen = msglen,
- .pid = getpid(),
- .sock = unix_dgram_sock(ctx->dgram)
- };
-
- iov_copy[0].iov_base = &ctx->cookie;
- iov_copy[0].iov_len = sizeof(ctx->cookie);
- iov_copy[1].iov_base = &hdr;
- iov_copy[1].iov_len = sizeof(hdr);
-
- sent = 0;
- src_iov = iov[0];
-
- /*
- * The following write loop sends the user message in pieces. We have
- * filled the first two iovecs above with "cookie" and "hdr". In the
- * following loops we pull message chunks from the user iov array and
- * fill iov_copy piece by piece, possibly truncating chunks from the
- * caller's iov array. Ugly, but hopefully efficient.
- */
-
- while (sent < msglen) {
- size_t fragment_len;
- size_t iov_index = 2;
-
- fragment_len = sizeof(ctx->cookie) + sizeof(hdr);
-
- while (fragment_len < ctx->fragment_len) {
- size_t space, chunk;
-
- space = ctx->fragment_len - fragment_len;
- chunk = MIN(space, src_iov.iov_len);
-
- iov_copy[iov_index].iov_base = src_iov.iov_base;
- iov_copy[iov_index].iov_len = chunk;
- iov_index += 1;
-
- src_iov.iov_base = (char *)src_iov.iov_base + chunk;
- src_iov.iov_len -= chunk;
- fragment_len += chunk;
-
- if (src_iov.iov_len == 0) {
- iov += 1;
- iovlen -= 1;
- if (iovlen == 0) {
- break;
- }
- src_iov = iov[0];
- }
- }
- sent += (fragment_len - sizeof(ctx->cookie) - sizeof(hdr));
-
- /*
- * only the last fragment should pass the fd array.
- * That simplifies the receiver a lot.
- */
- if (sent < msglen) {
- ret = unix_dgram_send(ctx->dgram, dst,
- iov_copy, iov_index,
- NULL, 0);
- } else {
- ret = unix_dgram_send(ctx->dgram, dst,
- iov_copy, iov_index,
- fds, num_fds);
- }
- if (ret != 0) {
- break;
- }
- }
-
- ctx->cookie += 1;
- if (ctx->cookie == 0) {
- ctx->cookie += 1;
- }
-
- return ret;
-}
-
-static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
- uint8_t *buf, size_t buflen,
- int *fds, size_t num_fds,
- void *private_data)
-{
- struct unix_msg_ctx *ctx = (struct unix_msg_ctx *)private_data;
- struct unix_msg_hdr hdr;
- struct unix_msg *msg;
- size_t space;
- uint64_t cookie;
-
- if (buflen < sizeof(cookie)) {
- goto close_fds;
- }
-
- memcpy(&cookie, buf, sizeof(cookie));
-
- buf += sizeof(cookie);
- buflen -= sizeof(cookie);
-
- if (cookie == 0) {
- ctx->recv_callback(ctx, buf, buflen, fds, num_fds,
- ctx->private_data);
- return;
- }
-
- if (buflen < sizeof(hdr)) {
- goto close_fds;
- }
- memcpy(&hdr, buf, sizeof(hdr));
-
- buf += sizeof(hdr);
- buflen -= sizeof(hdr);
-
- for (msg = ctx->msgs; msg != NULL; msg = msg->next) {
- if ((msg->sender_pid == hdr.pid) &&
- (msg->sender_sock == hdr.sock)) {
- break;
- }
- }
-
- if ((msg != NULL) && (msg->cookie != cookie)) {
- DLIST_REMOVE(ctx->msgs, msg);
- free(msg);
- msg = NULL;
- }
-
- if (msg == NULL) {
- msg = malloc(offsetof(struct unix_msg, buf) + hdr.msglen);
- if (msg == NULL) {
- goto close_fds;
- }
- *msg = (struct unix_msg) {
- .msglen = hdr.msglen,
- .sender_pid = hdr.pid,
- .sender_sock = hdr.sock,
- .cookie = cookie
- };
- DLIST_ADD(ctx->msgs, msg);
- }
-
- space = msg->msglen - msg->received;
- if (buflen > space) {
- goto close_fds;
- }
-
- memcpy(msg->buf + msg->received, buf, buflen);
- msg->received += buflen;
-
- if (msg->received < msg->msglen) {
- goto close_fds;
- }
-
- DLIST_REMOVE(ctx->msgs, msg);
- ctx->recv_callback(ctx, msg->buf, msg->msglen, fds, num_fds,
- ctx->private_data);
- free(msg);
- return;
-
-close_fds:
- close_fd_array(fds, num_fds);
-}
-
-int unix_msg_free(struct unix_msg_ctx *ctx)
-{
- int ret;
-
- ret = unix_dgram_free(ctx->dgram);
- if (ret != 0) {
- return ret;
- }
-
- while (ctx->msgs != NULL) {
- struct unix_msg *msg = ctx->msgs;
- DLIST_REMOVE(ctx->msgs, msg);
- free(msg);
- }
-
- free(ctx);
- return 0;
-}
diff --git a/source3/lib/unix_msg/unix_msg.h b/source3/lib/unix_msg/unix_msg.h
deleted file mode 100644
index 375d4ac..0000000
--- a/source3/lib/unix_msg/unix_msg.h
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Unix SMB/CIFS implementation.
- * Copyright (C) Volker Lendecke 2013
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef __UNIX_MSG_H__
-#define __UNIX_MSG_H__
-
-#include "replace.h"
-#include "poll_funcs/poll_funcs.h"
-#include "system/network.h"
-
-/**
- * @file unix_msg.h
- *
- * @brief Send large messages over unix domain datagram sockets
- *
- * A unix_msg_ctx represents a unix domain datagram socket.
- *
- * Unix domain datagram sockets have some unique properties compared with UDP
- * sockets:
- *
- * - They are reliable, i.e. as long as both sender and receiver are processes
- * that are alive, nothing is lost.
- *
- * - They preserve sequencing
- *
- * Based on these two properties, this code implements sending of large
- * messages. It aims at being maximally efficient for short, single-datagram
- * messages. Ideally, if the receiver queue is not full, sending a message
- * should be a single syscall without malloc. Receiving a message should also
- * not malloc anything before the data is shipped to the user.
- *
- * If unix_msg_send meets a full receive buffer, more effort is required: The
- * socket behind unix_msg_send is not pollable for POLLOUT, it will always be
- * writable: A datagram socket can send anywhere, the full queue is a property
- * of the receiving socket. unix_msg_send creates a new unnamed socket that
- * it will connect(2) to the target socket. This unnamed socket is then
- * pollable for POLLOUT. The socket will be writable when the destination
- * socket's queue is drained sufficiently.
- *
- * If unix_msg_send is asked to send a message larger than fragment_size, it
- * will try sending the message in pieces with proper framing, the receiving
- * side will reassemble the messages.
- *
- * fd-passing is supported.
- * Note that by default the fds passed to recv_callback are closed by
- * the receive handler in order to avoid fd-leaks. If the provider of
- * the recv_callback wants to use a passed file descriptor after the
- * callback returns, it must copy the fd away and set the corresponding
- * entry in the "fds" array to -1.
- */
-
-/**
- * @brief Abstract structure representing a unix domain datagram socket
- */
-struct unix_msg_ctx;
-
-/**
- * @brief Initialize a struct unix_msg_ctx
- *
- * @param[in] path The socket path
- * @param[in] ev_funcs The event callback functions to use
- * @param[in] fragment_size Maximum datagram size to send/receive
- * @param[in] recv_callback Function called when a message is received
- * @param[in] private_data Private pointer for recv_callback
- * @param[out] result The new struct unix_msg_ctx
- * @return 0 on success, errno on failure
- */
-
-
-int unix_msg_init(const struct sockaddr_un *addr,
- const struct poll_funcs *ev_funcs,
- size_t fragment_size,
- void (*recv_callback)(struct unix_msg_ctx *ctx,
- uint8_t *msg, size_t msg_len,
- int *fds, size_t num_fds,
- void *private_data),
- void *private_data,
- struct unix_msg_ctx **result);
-
-/**
- * @brief Send a message
- *
- * @param[in] ctx The context to send across
- * @param[in] dst_sock The destination socket path
- * @param[in] iov The message
- * @param[in] iovlen The number of iov structs
- * @param[in] fds - optional fd array
- * @param[in] num_fds - fd array size
- * @return 0 on success, errno on failure
- */
-
-int unix_msg_send(struct unix_msg_ctx *ctx, const struct sockaddr_un *dst,
- const struct iovec *iov, int iovlen,
- const int *fds, size_t num_fds);
-
-/**
- * @brief Free a unix_msg_ctx
- *
- * @param[in] ctx The message context to free
- * @return 0 on success, errno on failure (EBUSY)
- */
-int unix_msg_free(struct unix_msg_ctx *ctx);
-
-#define SENDQ_CACHE_TIME_SECS 10
-
-#endif
diff --git a/source3/lib/unix_msg/wscript_build b/source3/lib/unix_msg/wscript_build
deleted file mode 100644
index 469f87e..0000000
--- a/source3/lib/unix_msg/wscript_build
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/usr/bin/env python
-
-bld.SAMBA3_SUBSYSTEM('UNIX_MSG',
- source='unix_msg.c',
- deps='replace PTHREADPOOL iov_buf msghdr time-basic')
-
-bld.SAMBA3_BINARY('unix_msg_test',
- source='tests.c',
- deps='UNIX_MSG POLL_FUNCS_TEVENT',
- install=False)
-bld.SAMBA3_BINARY('unix_msg_test_drain',
- source='test_drain.c',
- deps='UNIX_MSG POLL_FUNCS_TEVENT',
- install=False)
-bld.SAMBA3_BINARY('unix_msg_test_source',
- source='test_source.c',
- deps='UNIX_MSG POLL_FUNCS_TEVENT',
- install=False)
--
2.7.4
>From 0f5671fe45d53259a352de25f5122389b2ffb4c0 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 28 Sep 2016 14:44:03 -0700
Subject: [PATCH 15/26] lib: Remove poll_funcs
unix_msg was the only user
Signed-off-by: Volker Lendecke <vl at samba.org>
---
lib/poll_funcs/poll_funcs.h | 131 -------
lib/poll_funcs/poll_funcs_tevent.c | 684 -------------------------------------
lib/poll_funcs/poll_funcs_tevent.h | 38 ---
lib/poll_funcs/wscript_build | 5 -
source3/wscript_build | 1 -
wscript_build | 1 -
6 files changed, 860 deletions(-)
delete mode 100644 lib/poll_funcs/poll_funcs.h
delete mode 100644 lib/poll_funcs/poll_funcs_tevent.c
delete mode 100644 lib/poll_funcs/poll_funcs_tevent.h
delete mode 100644 lib/poll_funcs/wscript_build
diff --git a/lib/poll_funcs/poll_funcs.h b/lib/poll_funcs/poll_funcs.h
deleted file mode 100644
index b16f07f..0000000
--- a/lib/poll_funcs/poll_funcs.h
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Unix SMB/CIFS implementation.
- * Copyright (C) Volker Lendecke 2013
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-/**
- * @file poll_funcs.h
- *
- * @brief event loop abstraction
- */
-
-/*
- * This is inspired by AvahiWatch, the avahi event loop abstraction.
- */
-
-#ifndef __POLL_FUNCS_H__
-#define __POLL_FUNCS_H__
-
-#include "replace.h"
-
-/**
- * poll_watch and poll_timeout are undefined here, every implementation can
- * implement its own structures.
- */
-
-struct poll_watch;
-struct poll_timeout;
-
-struct poll_funcs {
-
- /**
- * @brief Create a new file descriptor watch
- *
- * @param[in] funcs The callback array
- * @param[in] fd The fd to watch
- * @param[in] events POLLIN and POLLOUT or'ed together
- * @param[in] callback Function to call by the implementation
- * @param[in] private_data Pointer to give back to callback
- *
- * @return A new poll_watch struct
- */
-
- struct poll_watch *(*watch_new)(
- const struct poll_funcs *funcs, int fd, short events,
- void (*callback)(struct poll_watch *w, int fd,
- short events, void *private_data),
- void *private_data);
-
- /**
- * @brief Change the watched events for a struct poll_watch
- *
- * @param[in] w The poll_watch to change
- * @param[in] events new POLLIN and POLLOUT or'ed together
- */
-
- void (*watch_update)(struct poll_watch *w, short events);
-
- /**
- * @brief Read events currently watched
- *
- * @param[in] w The poll_watch to inspect
- *
- * @returns The events currently watched
- */
-
- short (*watch_get_events)(struct poll_watch *w);
-
- /**
- * @brief Free a struct poll_watch
- *
- * @param[in] w The poll_watch struct to free
- */
-
- void (*watch_free)(struct poll_watch *w);
-
-
- /**
- * @brief Create a new timeout watch
- *
- * @param[in] funcs The callback array
- * @param[in] tv The time when the timeout should trigger
- * @param[in] callback Function to call at time "tv"
- * @param[in] private_data Pointer to give back to callback
- *
- * @return A new poll_timeout struct
- */
-
- struct poll_timeout *(*timeout_new)(
- const struct poll_funcs *funcs, const struct timeval tv,
- void (*callback)(struct poll_timeout *t, void *private_data),
- void *private_data);
-
- /**
- * @brief Change the timeout of a watch
- *
- * @param[in] t The timeout watch to change
- * @param[in] tv The new trigger time
- */
-
- void (*timeout_update)(struct poll_timeout *t,
- const struct timeval tv);
-
- /**
- * @brief Free a poll_timeout
- *
- * @param[in] t The poll_timeout to free
- */
-
- void (*timeout_free)(struct poll_timeout *t);
-
- /**
- * @brief private data for use by the implementation
- */
-
- void *private_data;
-};
-
-#endif
diff --git a/lib/poll_funcs/poll_funcs_tevent.c b/lib/poll_funcs/poll_funcs_tevent.c
deleted file mode 100644
index 4dd09a2..0000000
--- a/lib/poll_funcs/poll_funcs_tevent.c
+++ /dev/null
@@ -1,684 +0,0 @@
-/*
- * Unix SMB/CIFS implementation.
- * Copyright (C) Volker Lendecke 2013,2014
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "poll_funcs_tevent.h"
-#include "tevent.h"
-#include "system/select.h"
-#include "lib/util/dlinklist.h"
-
-/*
- * A poll_watch is asked for by the engine using this library via
- * funcs->watch_new(). It represents interest in "fd" becoming readable or
- * writable.
- */
-
-struct poll_watch {
- struct poll_funcs_state *state;
- size_t slot; /* index into state->watches[] */
- int fd;
- int events;
- void (*callback)(struct poll_watch *w, int fd, short events,
- void *private_data);
- void *private_data;
-};
-
-struct poll_timeout {
- struct poll_funcs_state *state;
- size_t slot; /* index into state->timeouts[] */
- struct timeval tv;
- void (*callback)(struct poll_timeout *t, void *private_data);
- void *private_data;
-};
-
-struct poll_funcs_state {
- /*
- * "watches" is the array of all watches that we have handed out via
- * funcs->watch_new(). The "watches" array can contain NULL pointers.
- */
- struct poll_watch **watches;
-
- /*
- * Like "watches" for timeouts;
- */
- struct poll_timeout **timeouts;
-
- /*
-	 * "contexts" is the array of tevent_contexts that serve
- * "watches". "contexts" can contain NULL pointers.
- */
- struct poll_funcs_tevent_context **contexts;
-};
-
-struct poll_funcs_tevent_context {
- struct poll_funcs_tevent_handle *handles;
- struct poll_funcs_state *state;
- unsigned slot; /* index into state->contexts[] */
- struct tevent_context *ev;
- struct tevent_fd **fdes; /* same indexes as state->watches[] */
- struct tevent_timer **timers; /* same indexes as state->timeouts[] */
-};
-
-/*
- * poll_funcs_tevent_register() hands out a struct poll_funcs_tevent_handle as
- * a void *. poll_funcs_tevent_register allows tevent_contexts to be
- * registered multiple times, and we can't add a tevent_fd for the same fd's
- * multiple times. So we have to share one poll_funcs_tevent_context.
- */
-struct poll_funcs_tevent_handle {
- struct poll_funcs_tevent_handle *prev, *next;
- struct poll_funcs_tevent_context *ctx;
-};
-
-static uint16_t poll_events_to_tevent(short events)
-{
- uint16_t ret = 0;
-
- if (events & POLLIN) {
- ret |= TEVENT_FD_READ;
- }
- if (events & POLLOUT) {
- ret |= TEVENT_FD_WRITE;
- }
- return ret;
-}
-
-static short tevent_to_poll_events(uint16_t flags)
-{
- short ret = 0;
-
- if (flags & TEVENT_FD_READ) {
- ret |= POLLIN;
- }
- if (flags & TEVENT_FD_WRITE) {
- ret |= POLLOUT;
- }
- return ret;
-}
-
-/*
- * Find or create a free slot in state->watches[]
- */
-static bool poll_funcs_watch_find_slot(struct poll_funcs_state *state,
- size_t *slot)
-{
- struct poll_watch **watches;
- size_t i, num_watches, num_contexts;
-
- num_watches = talloc_array_length(state->watches);
-
- for (i=0; i<num_watches; i++) {
- if (state->watches[i] == NULL) {
- *slot = i;
- return true;
- }
- }
-
- watches = talloc_realloc(state, state->watches, struct poll_watch *,
- num_watches + 1);
- if (watches == NULL) {
- return false;
- }
- watches[num_watches] = NULL;
- state->watches = watches;
-
- num_contexts = talloc_array_length(state->contexts);
-
- for (i=0; i<num_contexts; i++) {
- struct tevent_fd **fdes;
- struct poll_funcs_tevent_context *c = state->contexts[i];
- if (c == NULL) {
- continue;
- }
- fdes = talloc_realloc(c, c->fdes, struct tevent_fd *,
- num_watches + 1);
- if (fdes == NULL) {
- state->watches = talloc_realloc(
- state, state->watches, struct poll_watch *,
- num_watches);
- return false;
- }
- c->fdes = fdes;
-
- fdes[num_watches] = NULL;
- }
-
- *slot = num_watches;
-
- return true;
-}
-
-static void poll_funcs_fde_handler(struct tevent_context *ev,
- struct tevent_fd *fde, uint16_t flags,
- void *private_data);
-static int poll_watch_destructor(struct poll_watch *w);
-
-static struct poll_watch *tevent_watch_new(
- const struct poll_funcs *funcs, int fd, short events,
- void (*callback)(struct poll_watch *w, int fd, short events,
- void *private_data),
- void *private_data)
-{
- struct poll_funcs_state *state = talloc_get_type_abort(
- funcs->private_data, struct poll_funcs_state);
- struct poll_watch *w;
- size_t i, slot, num_contexts;
-
- if (!poll_funcs_watch_find_slot(state, &slot)) {
- return NULL;
- }
-
- w = talloc(state->watches, struct poll_watch);
- if (w == NULL) {
- return NULL;
- }
- w->state = state;
- w->slot = slot;
- w->fd = fd;
- w->events = poll_events_to_tevent(events);
- w->fd = fd;
- w->callback = callback;
- w->private_data = private_data;
- state->watches[slot] = w;
-
- talloc_set_destructor(w, poll_watch_destructor);
-
- num_contexts = talloc_array_length(state->contexts);
-
- for (i=0; i<num_contexts; i++) {
- struct poll_funcs_tevent_context *c = state->contexts[i];
- if (c == NULL) {
- continue;
- }
- c->fdes[slot] = tevent_add_fd(c->ev, c->fdes, w->fd, w->events,
- poll_funcs_fde_handler, w);
- if (c->fdes[slot] == NULL) {
- goto fail;
- }
- }
- return w;
-
-fail:
- TALLOC_FREE(w);
- return NULL;
-}
-
-static int poll_watch_destructor(struct poll_watch *w)
-{
- struct poll_funcs_state *state = w->state;
- size_t num_contexts = talloc_array_length(state->contexts);
- size_t slot = w->slot;
- size_t i;
-
- TALLOC_FREE(state->watches[slot]);
-
- for (i=0; i<num_contexts; i++) {
- struct poll_funcs_tevent_context *c = state->contexts[i];
- if (c == NULL) {
- continue;
- }
- TALLOC_FREE(c->fdes[slot]);
- }
-
- return 0;
-}
-
-static void tevent_watch_update(struct poll_watch *w, short events)
-{
- struct poll_funcs_state *state = w->state;
- size_t num_contexts = talloc_array_length(state->contexts);
- size_t slot = w->slot;
- size_t i;
-
- w->events = poll_events_to_tevent(events);
-
- for (i=0; i<num_contexts; i++) {
- struct poll_funcs_tevent_context *c = state->contexts[i];
- if (c == NULL) {
- continue;
- }
- tevent_fd_set_flags(c->fdes[slot], w->events);
- }
-}
-
-static short tevent_watch_get_events(struct poll_watch *w)
-{
- return tevent_to_poll_events(w->events);
-}
-
-static void tevent_watch_free(struct poll_watch *w)
-{
- TALLOC_FREE(w);
-}
-
-static bool poll_funcs_timeout_find_slot(struct poll_funcs_state *state,
- size_t *slot)
-{
- struct poll_timeout **timeouts;
- size_t i, num_timeouts, num_contexts;
-
- num_timeouts = talloc_array_length(state->timeouts);
-
- for (i=0; i<num_timeouts; i++) {
- if (state->timeouts[i] == NULL) {
- *slot = i;
- return true;
- }
- }
-
- timeouts = talloc_realloc(state, state->timeouts,
- struct poll_timeout *,
- num_timeouts + 1);
- if (timeouts == NULL) {
- return false;
- }
- timeouts[num_timeouts] = NULL;
- state->timeouts = timeouts;
-
- num_contexts = talloc_array_length(state->contexts);
-
- for (i=0; i<num_contexts; i++) {
- struct tevent_timer **timers;
- struct poll_funcs_tevent_context *c = state->contexts[i];
- if (c == NULL) {
- continue;
- }
- timers = talloc_realloc(c, c->timers, struct tevent_timer *,
- num_timeouts + 1);
- if (timers == NULL) {
- state->timeouts = talloc_realloc(
- state, state->timeouts, struct poll_timeout *,
- num_timeouts);
- return false;
- }
- c->timers = timers;
-
- timers[num_timeouts] = NULL;
- }
-
- *slot = num_timeouts;
-
- return true;
-}
-
-static void poll_funcs_timer_handler(struct tevent_context *ev,
- struct tevent_timer *te,
- struct timeval current_time,
- void *private_data);
-static int poll_timeout_destructor(struct poll_timeout *t);
-
-static struct poll_timeout *tevent_timeout_new(
- const struct poll_funcs *funcs, const struct timeval tv,
- void (*callback)(struct poll_timeout *t, void *private_data),
- void *private_data)
-{
- struct poll_funcs_state *state = talloc_get_type_abort(
- funcs->private_data, struct poll_funcs_state);
- struct poll_timeout *t;
- size_t i, slot, num_contexts;
-
- if (!poll_funcs_timeout_find_slot(state, &slot)) {
- return NULL;
- }
-
- t = talloc(state->timeouts, struct poll_timeout);
- if (t == NULL) {
- return NULL;
- }
- t->state = state;
- t->slot = slot;
- t->tv = tv;
- t->callback = callback;
- t->private_data = private_data;
-
- talloc_set_destructor(t, poll_timeout_destructor);
-
- num_contexts = talloc_array_length(state->contexts);
-
- for (i=0; i<num_contexts; i++) {
- struct poll_funcs_tevent_context *c = state->contexts[i];
- if (c == NULL) {
- continue;
- }
- c->timers[slot] = tevent_add_timer(
- c->ev, c->timers, tv, poll_funcs_timer_handler, t);
- if (c->timers[slot] == NULL) {
- goto fail;
- }
- }
- return t;
-
-fail:
- TALLOC_FREE(t);
- return NULL;
-}
-
-static int poll_timeout_destructor(struct poll_timeout *t)
-{
- struct poll_funcs_state *state = t->state;
- size_t num_contexts = talloc_array_length(state->contexts);
- size_t slot = t->slot;
- size_t i;
-
- TALLOC_FREE(state->timeouts[slot]);
-
- for (i=0; i<num_contexts; i++) {
- struct poll_funcs_tevent_context *c = state->contexts[i];
- if (c == NULL) {
- continue;
- }
- TALLOC_FREE(c->timers[slot]);
- }
-
- return 0;
-}
-
-static void poll_funcs_timer_handler(struct tevent_context *ev,
- struct tevent_timer *te,
- struct timeval current_time,
- void *private_data)
-{
- struct poll_timeout *t = talloc_get_type_abort(
- private_data, struct poll_timeout);
- struct poll_funcs_state *state = t->state;
- size_t slot = t->slot;
- size_t i, num_contexts;
-
- num_contexts = talloc_array_length(state->contexts);
-
- for (i=0; i<num_contexts; i++) {
- struct poll_funcs_tevent_context *c = state->contexts[i];
- if (c == NULL) {
- continue;
- }
- TALLOC_FREE(c->timers[slot]);
- }
-
- t->callback(t, t->private_data);
-}
-
-static void tevent_timeout_update(struct poll_timeout *t,
- const struct timeval tv)
-{
- struct poll_funcs_state *state = t->state;
- size_t num_contexts = talloc_array_length(state->contexts);
- size_t slot = t->slot;
- size_t i;
-
- for (i=0; i<num_contexts; i++) {
- struct poll_funcs_tevent_context *c = state->contexts[i];
- if (c == NULL) {
- continue;
- }
- TALLOC_FREE(c->timers[slot]);
-
- c->timers[slot] = tevent_add_timer(
- c->ev, c->timers, tv, poll_funcs_timer_handler, t);
- if (c->timers[slot] == NULL) {
- /*
- * We just free'ed the space, why did this fail??
- */
- abort();
- }
- }
-}
-
-static void tevent_timeout_free(struct poll_timeout *t)
-{
- TALLOC_FREE(t);
-}
-
-static int poll_funcs_state_destructor(struct poll_funcs_state *state);
-
-struct poll_funcs *poll_funcs_init_tevent(TALLOC_CTX *mem_ctx)
-{
- struct poll_funcs *f;
- struct poll_funcs_state *state;
-
- f = talloc(mem_ctx, struct poll_funcs);
- if (f == NULL) {
- return NULL;
- }
- state = talloc_zero(f, struct poll_funcs_state);
- if (state == NULL) {
- TALLOC_FREE(f);
- return NULL;
- }
- talloc_set_destructor(state, poll_funcs_state_destructor);
-
- f->watch_new = tevent_watch_new;
- f->watch_update = tevent_watch_update;
- f->watch_get_events = tevent_watch_get_events;
- f->watch_free = tevent_watch_free;
- f->timeout_new = tevent_timeout_new;
- f->timeout_update = tevent_timeout_update;
- f->timeout_free = tevent_timeout_free;
- f->private_data = state;
- return f;
-}
-
-static int poll_funcs_state_destructor(struct poll_funcs_state *state)
-{
- size_t num_watches = talloc_array_length(state->watches);
- size_t num_timeouts = talloc_array_length(state->timeouts);
- size_t num_contexts = talloc_array_length(state->contexts);
- size_t i;
- /*
- * Make sure the watches are cleared before the contexts. The watches
- * have destructors attached to them that clean up the fde's
- */
- for (i=0; i<num_watches; i++) {
- TALLOC_FREE(state->watches[i]);
- }
- for (i=0; i<num_timeouts; i++) {
- TALLOC_FREE(state->timeouts[i]);
- }
- for (i=0; i<num_contexts; i++) {
- TALLOC_FREE(state->contexts[i]);
- }
- return 0;
-}
-
-/*
- * Find or create a free slot in state->contexts[]
- */
-static bool poll_funcs_context_slot_find(struct poll_funcs_state *state,
- struct tevent_context *ev,
- size_t *slot)
-{
- struct poll_funcs_tevent_context **contexts;
- size_t num_contexts = talloc_array_length(state->contexts);
- size_t i;
-
- /* Look for an existing match first. */
- for (i=0; i<num_contexts; i++) {
- struct poll_funcs_tevent_context *ctx = state->contexts[i];
-
- if (ctx != NULL && ctx->ev == ev) {
- *slot = i;
- return true;
- }
- }
-
- /* Now look for a free slot. */
- for (i=0; i<num_contexts; i++) {
- struct poll_funcs_tevent_context *ctx = state->contexts[i];
-
- if (ctx == NULL) {
- *slot = i;
- return true;
- }
- }
-
- contexts = talloc_realloc(state, state->contexts,
- struct poll_funcs_tevent_context *,
- num_contexts + 1);
- if (contexts == NULL) {
- return false;
- }
- state->contexts = contexts;
- state->contexts[num_contexts] = NULL;
-
- *slot = num_contexts;
-
- return true;
-}
-
-static int poll_funcs_tevent_context_destructor(
- struct poll_funcs_tevent_context *ctx);
-
-static struct poll_funcs_tevent_context *poll_funcs_tevent_context_new(
- TALLOC_CTX *mem_ctx, struct poll_funcs_state *state,
- struct tevent_context *ev, unsigned slot)
-{
- struct poll_funcs_tevent_context *ctx;
- size_t num_watches = talloc_array_length(state->watches);
- size_t num_timeouts = talloc_array_length(state->timeouts);
- size_t i;
-
- ctx = talloc(mem_ctx, struct poll_funcs_tevent_context);
- if (ctx == NULL) {
- return NULL;
- }
-
- ctx->handles = NULL;
- ctx->state = state;
- ctx->ev = ev;
- ctx->slot = slot;
-
- ctx->fdes = talloc_array(ctx, struct tevent_fd *, num_watches);
- if (ctx->fdes == NULL) {
- goto fail;
- }
-
- for (i=0; i<num_watches; i++) {
- struct poll_watch *w = state->watches[i];
-
- if (w == NULL) {
- ctx->fdes[i] = NULL;
- continue;
- }
- ctx->fdes[i] = tevent_add_fd(ev, ctx->fdes, w->fd, w->events,
- poll_funcs_fde_handler, w);
- if (ctx->fdes[i] == NULL) {
- goto fail;
- }
- }
-
- ctx->timers = talloc_array(ctx, struct tevent_timer *, num_timeouts);
- if (ctx->timers == NULL) {
- goto fail;
- }
-
- for (i=0; i<num_timeouts; i++) {
- struct poll_timeout *t = state->timeouts[i];
-
- if (t == NULL) {
- ctx->timers[i] = NULL;
- continue;
- }
- ctx->timers[i] = tevent_add_timer(ctx->ev, ctx->timers, t->tv,
- poll_funcs_timer_handler, t);
- if (ctx->timers[i] == 0) {
- goto fail;
- }
- }
-
- talloc_set_destructor(ctx, poll_funcs_tevent_context_destructor);
- return ctx;
-fail:
- TALLOC_FREE(ctx);
- return NULL;
-}
-
-static int poll_funcs_tevent_context_destructor(
- struct poll_funcs_tevent_context *ctx)
-{
- struct poll_funcs_tevent_handle *h;
-
- ctx->state->contexts[ctx->slot] = NULL;
-
- for (h = ctx->handles; h != NULL; h = h->next) {
- h->ctx = NULL;
- }
-
- return 0;
-}
-
-static void poll_funcs_fde_handler(struct tevent_context *ev,
- struct tevent_fd *fde, uint16_t flags,
- void *private_data)
-{
- struct poll_watch *w = talloc_get_type_abort(
- private_data, struct poll_watch);
- short events = tevent_to_poll_events(flags);
- w->callback(w, w->fd, events, w->private_data);
-}
-
-static int poll_funcs_tevent_handle_destructor(
- struct poll_funcs_tevent_handle *handle);
-
-void *poll_funcs_tevent_register(TALLOC_CTX *mem_ctx, struct poll_funcs *f,
- struct tevent_context *ev)
-{
- struct poll_funcs_state *state = talloc_get_type_abort(
- f->private_data, struct poll_funcs_state);
- struct poll_funcs_tevent_handle *handle;
- size_t slot;
-
- handle = talloc(mem_ctx, struct poll_funcs_tevent_handle);
- if (handle == NULL) {
- return NULL;
- }
-
- if (!poll_funcs_context_slot_find(state, ev, &slot)) {
- goto fail;
- }
- if (state->contexts[slot] == NULL) {
- state->contexts[slot] = poll_funcs_tevent_context_new(
- state->contexts, state, ev, slot);
- if (state->contexts[slot] == NULL) {
- goto fail;
- }
- }
-
- handle->ctx = state->contexts[slot];
- DLIST_ADD(handle->ctx->handles, handle);
- talloc_set_destructor(handle, poll_funcs_tevent_handle_destructor);
- return handle;
-fail:
- TALLOC_FREE(handle);
- return NULL;
-}
-
-static int poll_funcs_tevent_handle_destructor(
- struct poll_funcs_tevent_handle *handle)
-{
- if (handle->ctx == NULL) {
- return 0;
- }
- if (handle->ctx->handles == NULL) {
- abort();
- }
-
- DLIST_REMOVE(handle->ctx->handles, handle);
-
- if (handle->ctx->handles == NULL) {
- TALLOC_FREE(handle->ctx);
- }
- return 0;
-}
diff --git a/lib/poll_funcs/poll_funcs_tevent.h b/lib/poll_funcs/poll_funcs_tevent.h
deleted file mode 100644
index 8b2964c..0000000
--- a/lib/poll_funcs/poll_funcs_tevent.h
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Unix SMB/CIFS implementation.
- * Copyright (C) Volker Lendecke 2013,2014
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef __POLL_FUNCS_TEVENT_H__
-#define __POLL_FUNCS_TEVENT_H__
-
-#include "poll_funcs.h"
-#include "tevent.h"
-
-/*
- * Create a new, empty instance of "struct poll_funcs" to be served by tevent.
- */
-struct poll_funcs *poll_funcs_init_tevent(TALLOC_CTX *mem_ctx);
-
-/*
- * Register a tevent_context to handle the watches that the user of
- * "poll_funcs" showed interest in. talloc_free() the returned pointer when
- * "ev" is not supposed to handle the events anymore.
- */
-void *poll_funcs_tevent_register(TALLOC_CTX *mem_ctx, struct poll_funcs *f,
- struct tevent_context *ev);
-
-#endif
diff --git a/lib/poll_funcs/wscript_build b/lib/poll_funcs/wscript_build
deleted file mode 100644
index df9a298..0000000
--- a/lib/poll_funcs/wscript_build
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/usr/bin/env python
-
-bld.SAMBA_SUBSYSTEM('POLL_FUNCS_TEVENT',
- source='poll_funcs_tevent.c',
- deps='tevent')
diff --git a/source3/wscript_build b/source3/wscript_build
index 1598555..66fcaac 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -354,7 +354,6 @@ bld.SAMBA3_SUBSYSTEM('samba3core',
UTIL_PW
SAMBA_VERSION
PTHREADPOOL
- POLL_FUNCS_TEVENT
interfaces
param
dbwrap
diff --git a/wscript_build b/wscript_build
index fa3aa60..93b1832 100644
--- a/wscript_build
+++ b/wscript_build
@@ -43,7 +43,6 @@ bld.RECURSE('lib/texpect')
bld.RECURSE('lib/addns')
bld.RECURSE('lib/ldb')
bld.RECURSE('lib/param')
-bld.RECURSE('lib/poll_funcs')
bld.RECURSE('dynconfig')
bld.RECURSE('lib/util/charset')
bld.RECURSE('python')
--
2.7.4
>From d9c0f5ff83bfb753bc9b4b7562174fa7d63432dd Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 9 Sep 2016 16:51:25 +0200
Subject: [PATCH 16/26] messaging: add an overflow test
Send 1000 messages without picking them up. Then destroy the sending messaging
context.
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source4/lib/messaging/tests/messaging.c | 65 +++++++++++++++++++++++++++++++++
1 file changed, 65 insertions(+)
diff --git a/source4/lib/messaging/tests/messaging.c b/source4/lib/messaging/tests/messaging.c
index 51195a1..4d8b53f 100644
--- a/source4/lib/messaging/tests/messaging.c
+++ b/source4/lib/messaging/tests/messaging.c
@@ -26,6 +26,7 @@
#include "cluster/cluster.h"
#include "param/param.h"
#include "torture/local/proto.h"
+#include "system/select.h"
static uint32_t msg_pong;
@@ -134,9 +135,73 @@ static bool test_ping_speed(struct torture_context *tctx)
return true;
}
+static bool test_messaging_overflow(struct torture_context *tctx)
+{
+ struct imessaging_context *msg_ctx;
+ pid_t child;
+ int fds[2];
+ int i, ret, child_status;
+
+ ret = pipe(fds);
+ torture_assert(tctx, ret == 0, "pipe failed");
+
+ child = fork();
+ if (child < 0) {
+ torture_fail(tctx, "fork failed");
+ }
+
+ if (child == 0) {
+ ssize_t nread;
+ char c;
+
+ ret = close(fds[1]);
+ torture_assert(tctx, ret == 0, "close failed");
+
+ msg_ctx = imessaging_init(tctx, tctx->lp_ctx,
+ cluster_id(getpid(), 0),
+ tctx->ev);
+ torture_assert(tctx, msg_ctx != NULL,
+ "imessaging_init failed");
+
+ do {
+ nread = read(fds[0], &c, 1);
+ } while ((nread == -1) && (errno == EINTR));
+
+ exit(0);
+ }
+
+ msg_ctx = imessaging_init(tctx, tctx->lp_ctx, cluster_id(getpid(), 0),
+ tctx->ev);
+ torture_assert(tctx, msg_ctx != NULL, "imessaging_init failed");
+
+ for (i=0; i<1000; i++) {
+ NTSTATUS status;
+ status = imessaging_send(msg_ctx, cluster_id(child, 0),
+ MSG_PING, NULL);
+ torture_assert_ntstatus_ok(tctx, status,
+ "imessaging_send failed");
+ }
+
+ tevent_loop_once(tctx->ev);
+
+ talloc_free(msg_ctx);
+
+ ret = close(fds[1]);
+ torture_assert(tctx, ret == 0, "close failed");
+
+ ret = waitpid(child, &child_status, 0);
+ torture_assert(tctx, ret == child, "wrong child exited");
+ torture_assert(tctx, child_status == 0, "child failed");
+
+ poll(NULL, 0, 500);
+
+ return true;
+}
+
struct torture_suite *torture_local_messaging(TALLOC_CTX *mem_ctx)
{
struct torture_suite *s = torture_suite_create(mem_ctx, "messaging");
+ torture_suite_add_simple_test(s, "overflow", test_messaging_overflow);
torture_suite_add_simple_test(s, "ping_speed", test_ping_speed);
return s;
}
--
2.7.4
>From 201252754ab2234f5aae5c881f253255143e10ee Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 23 Sep 2016 19:06:56 -0700
Subject: [PATCH 17/26] lib: Add messaging_rec_create
Essentially a wrapper around messaging_rec_dup
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source3/lib/messages.c | 49 +++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 49 insertions(+)
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 3e11cc5..a3695e9 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -83,6 +83,8 @@ struct messaging_context {
struct server_id_db *names_db;
};
+static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
+ struct messaging_rec *rec);
static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
struct messaging_rec *rec);
@@ -105,6 +107,53 @@ static void ping_message(struct messaging_context *msg_ctx,
messaging_send(msg_ctx, src, MSG_PONG, data);
}
+static struct messaging_rec *messaging_rec_create(
+	TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
+	uint32_t msg_type, const struct iovec *iov, int iovlen,
+	const int *fds, size_t num_fds)
+{
+	ssize_t buflen;
+	uint8_t *buf;
+	struct messaging_rec *result;
+
+	if (num_fds > INT8_MAX) {
+		return NULL;
+	}
+
+	buflen = iov_buflen(iov, iovlen);
+	if (buflen == -1) {
+		return NULL;
+	}
+	buf = talloc_array(mem_ctx, uint8_t, buflen);
+	if (buf == NULL) {
+		return NULL;
+	}
+	iov_buf(iov, iovlen, buf, buflen);
+
+	{
+		struct messaging_rec rec;
+		int64_t fds64[MAX(1, num_fds)]; /* zero-length VLA is UB */
+		size_t i;
+
+		for (i=0; i<num_fds; i++) {
+			fds64[i] = fds[i];
+		}
+
+		rec = (struct messaging_rec) {
+			.msg_version = MESSAGE_VERSION, .msg_type = msg_type,
+			.src = src, .dest = dst,
+			.buf.data = buf, .buf.length = buflen,
+			.num_fds = num_fds, .fds = fds64,
+		};
+
+		result = messaging_rec_dup(mem_ctx, &rec);
+	}
+
+	TALLOC_FREE(buf);
+
+	return result;
+}
+
static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
int *fds, size_t num_fds,
void *private_data)
--
2.7.4
>From ed9030c6f555d942358f7d6eccce7c26a91f98b1 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 25 Jul 2016 16:31:18 +0200
Subject: [PATCH 18/26] messaging: Optimize self-sends
When sending a message to ourselves we still need to go through the event
loop, which messaging_dgm_send does via the datagram socket. A
tevent_immediate serves the same purpose without the socket round-trip.
Right now the main user is messaging_ctdb: strace looks a bit weird there
when we receive a self-sent message.
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source3/lib/messages.c | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++
1 file changed, 58 insertions(+)
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index a3695e9..ae06243 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -479,6 +479,58 @@ NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
return messaging_send(msg_ctx, server, msg_type, &blob);
}
+struct messaging_post_state {
+ struct messaging_context *msg_ctx;
+ struct messaging_rec *rec;
+};
+
+static void messaging_post_handler(struct tevent_context *ev,
+ struct tevent_immediate *ti,
+ void *private_data);
+
+static int messaging_post_self(struct messaging_context *msg_ctx,
+ struct server_id src, struct server_id dst,
+ uint32_t msg_type,
+ const struct iovec *iov, int iovlen,
+ const int *fds, size_t num_fds)
+{
+ struct tevent_immediate *ti;
+ struct messaging_post_state *state;
+
+ ti = tevent_create_immediate(msg_ctx);
+ if (ti == NULL) {
+ return ENOMEM;
+ }
+ state = talloc(ti, struct messaging_post_state);
+ if (state == NULL) {
+ goto fail;
+ }
+ state->msg_ctx = msg_ctx;
+
+ state->rec = messaging_rec_create(
+ state, src, dst, msg_type, iov, iovlen, fds, num_fds);
+ if (state->rec == NULL) {
+ goto fail;
+ }
+
+ tevent_schedule_immediate(ti, msg_ctx->event_ctx,
+ messaging_post_handler, state);
+ return 0;
+
+fail:
+ TALLOC_FREE(ti);
+ return ENOMEM;
+}
+
+static void messaging_post_handler(struct tevent_context *ev,
+ struct tevent_immediate *ti,
+ void *private_data)
+{
+ struct messaging_post_state *state = talloc_get_type_abort(
+ private_data, struct messaging_post_state);
+ messaging_dispatch_rec(state->msg_ctx, state->rec);
+}
+
int messaging_send_iov_from(struct messaging_context *msg_ctx,
struct server_id src, struct server_id dst,
uint32_t msg_type,
@@ -509,6 +561,12 @@ int messaging_send_iov_from(struct messaging_context *msg_ctx,
return ret;
}
+ if (server_id_equal(&dst, &msg_ctx->id)) {
+ ret = messaging_post_self(msg_ctx, src, dst, msg_type,
+ iov, iovlen, fds, num_fds);
+ return ret;
+ }
+
message_hdr_put(hdr, msg_type, src, dst);
iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
--
2.7.4
>From 61372bc07062abbf081f171a992f51ce00705941 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 12 Sep 2016 13:02:26 +0200
Subject: [PATCH 19/26] tevent: Add tevent_req_reset_endtime
We might decide at some point that we no longer want a pending request to
time out; this allows removing a timer set with tevent_req_set_endtime.
Signed-off-by: Volker Lendecke <vl at samba.org>
---
lib/tevent/ABI/tevent-0.9.30.sigs | 1 +
lib/tevent/tevent.h | 7 +++++++
lib/tevent/tevent_req.c | 5 +++++
3 files changed, 13 insertions(+)
diff --git a/lib/tevent/ABI/tevent-0.9.30.sigs b/lib/tevent/ABI/tevent-0.9.30.sigs
index ea179a0..7a6a236 100644
--- a/lib/tevent/ABI/tevent-0.9.30.sigs
+++ b/lib/tevent/ABI/tevent-0.9.30.sigs
@@ -69,6 +69,7 @@ tevent_req_poll: bool (struct tevent_req *, struct tevent_context *)
tevent_req_post: struct tevent_req *(struct tevent_req *, struct tevent_context *)
tevent_req_print: char *(TALLOC_CTX *, struct tevent_req *)
tevent_req_received: void (struct tevent_req *)
+tevent_req_reset_endtime: void (struct tevent_req *)
tevent_req_set_callback: void (struct tevent_req *, tevent_req_fn, void *)
tevent_req_set_cancel_fn: void (struct tevent_req *, tevent_req_cancel_fn)
tevent_req_set_cleanup_fn: void (struct tevent_req *, tevent_req_cleanup_fn)
diff --git a/lib/tevent/tevent.h b/lib/tevent/tevent.h
index bb23257..ba4bb4d 100644
--- a/lib/tevent/tevent.h
+++ b/lib/tevent/tevent.h
@@ -1040,6 +1040,13 @@ bool tevent_req_set_endtime(struct tevent_req *req,
struct tevent_context *ev,
struct timeval endtime);
+/**
+ * @brief Reset the timer set by tevent_req_set_endtime.
+ *
+ * @param[in] req The request to reset the timeout for
+ */
+void tevent_req_reset_endtime(struct tevent_req *req);
+
#ifdef DOXYGEN
/**
* @brief Call the notify callback of the given tevent request manually.
diff --git a/lib/tevent/tevent_req.c b/lib/tevent/tevent_req.c
index e2b7104..e309c3d 100644
--- a/lib/tevent/tevent_req.c
+++ b/lib/tevent/tevent_req.c
@@ -313,6 +313,11 @@ bool tevent_req_set_endtime(struct tevent_req *req,
return true;
}
+void tevent_req_reset_endtime(struct tevent_req *req)
+{
+ TALLOC_FREE(req->internal.timer);
+}
+
void tevent_req_set_callback(struct tevent_req *req, tevent_req_fn fn, void *pvt)
{
req->async.fn = fn;
--
2.7.4
>From 11bb60be6325f459b422dd77b5bad1d418c86519 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 12 Sep 2016 14:12:10 +0200
Subject: [PATCH 20/26] messages_dgm: Drop a segment if we can't ship it for 60
seconds
---
source3/lib/messages_dgm.c | 10 ++++++++++
1 file changed, 10 insertions(+)
diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index 5f82168..89ccce9 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -455,6 +455,8 @@ static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
struct messaging_dgm_out_queue_state *state = tevent_req_data(
req, struct messaging_dgm_out_queue_state);
+ tevent_req_reset_endtime(req);
+
state->subreq = pthreadpool_tevent_job_send(
state, state->ev, state->pool,
messaging_dgm_out_threaded_job, state);
@@ -518,6 +520,7 @@ static int messaging_dgm_out_send_fragment(
{
struct tevent_req *req;
size_t qlen;
+ bool ok;
qlen = tevent_queue_length(out->queue);
if (qlen == 0) {
@@ -550,6 +553,13 @@ static int messaging_dgm_out_send_fragment(
}
tevent_req_set_callback(req, messaging_dgm_out_sent_fragment, out);
+ ok = tevent_req_set_endtime(req, ev,
+ tevent_timeval_current_ofs(60, 0));
+ if (!ok) {
+ TALLOC_FREE(req);
+ return ENOMEM;
+ }
+
return 0;
}
--
2.7.4
>From 67d07d53d4d9dc11655f4e4d3f5e4efbefe1bb4e Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 23 Sep 2016 17:07:20 -0700
Subject: [PATCH 21/26] messages_dgm: Pass down event_ctx one level
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source3/lib/messages_dgm.c | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index 89ccce9..070cac6 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -983,6 +983,7 @@ static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
}
static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
+ struct tevent_context *ev,
uint8_t *msg, size_t msg_len,
int *fds, size_t num_fds);
@@ -1049,7 +1050,7 @@ static void messaging_dgm_read_handler(struct tevent_context *ev,
}
}
- messaging_dgm_recv(ctx, buf, received, fds, num_fds);
+ messaging_dgm_recv(ctx, ev, buf, received, fds, num_fds);
}
}
@@ -1061,6 +1062,7 @@ static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
}
static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
+ struct tevent_context *ev,
uint8_t *buf, size_t buflen,
int *fds, size_t num_fds)
{
--
2.7.4
>From b45c3093b58e4beee2a79bd7159626987c33d109 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 23 Sep 2016 18:36:15 -0700
Subject: [PATCH 22/26] messages_dgm: Pass receiving "ev" to recv_cb
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source3/lib/messages_dgm.c | 10 ++++++----
source3/lib/messages_dgm.h | 3 ++-
source3/lib/messages_dgm_ref.c | 6 ++++--
3 files changed, 12 insertions(+), 7 deletions(-)
diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index 070cac6..8c632b9 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -78,7 +78,8 @@ struct messaging_dgm_context {
struct tevent_fd *read_fde;
struct messaging_dgm_in_msg *in_msgs;
- void (*recv_cb)(const uint8_t *msg,
+ void (*recv_cb)(struct tevent_context *ev,
+ const uint8_t *msg,
size_t msg_len,
int *fds,
size_t num_fds,
@@ -823,7 +824,8 @@ int messaging_dgm_init(struct tevent_context *ev,
uint64_t *punique,
const char *socket_dir,
const char *lockfile_dir,
- void (*recv_cb)(const uint8_t *msg,
+ void (*recv_cb)(struct tevent_context *ev,
+ const uint8_t *msg,
size_t msg_len,
int *fds,
size_t num_fds,
@@ -1079,7 +1081,7 @@ static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
buflen -= sizeof(cookie);
if (cookie == 0) {
- ctx->recv_cb(buf, buflen, fds, num_fds,
+ ctx->recv_cb(ev, buf, buflen, fds, num_fds,
ctx->recv_cb_private_data);
return;
}
@@ -1142,7 +1144,7 @@ static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
DLIST_REMOVE(ctx->in_msgs, msg);
talloc_set_destructor(msg, NULL);
- ctx->recv_cb(msg->buf, msg->msglen, fds, num_fds,
+ ctx->recv_cb(ev, msg->buf, msg->msglen, fds, num_fds,
ctx->recv_cb_private_data);
TALLOC_FREE(msg);
diff --git a/source3/lib/messages_dgm.h b/source3/lib/messages_dgm.h
index a9cbd81..7695a92 100644
--- a/source3/lib/messages_dgm.h
+++ b/source3/lib/messages_dgm.h
@@ -28,7 +28,8 @@ int messaging_dgm_init(struct tevent_context *ev,
uint64_t *unique,
const char *socket_dir,
const char *lockfile_dir,
- void (*recv_cb)(const uint8_t *msg,
+ void (*recv_cb)(struct tevent_context *ev,
+ const uint8_t *msg,
size_t msg_len,
int *fds,
size_t num_fds,
diff --git a/source3/lib/messages_dgm_ref.c b/source3/lib/messages_dgm_ref.c
index 3ea8b9d..7b8acf1 100644
--- a/source3/lib/messages_dgm_ref.c
+++ b/source3/lib/messages_dgm_ref.c
@@ -36,7 +36,8 @@ static pid_t dgm_pid = 0;
static struct msg_dgm_ref *refs = NULL;
static int msg_dgm_ref_destructor(struct msg_dgm_ref *r);
-static void msg_dgm_ref_recv(const uint8_t *msg, size_t msg_len,
+static void msg_dgm_ref_recv(struct tevent_context *ev,
+ const uint8_t *msg, size_t msg_len,
int *fds, size_t num_fds, void *private_data);
void *messaging_dgm_ref(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
@@ -114,7 +115,8 @@ void *messaging_dgm_ref(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
return result;
}
-static void msg_dgm_ref_recv(const uint8_t *msg, size_t msg_len,
+static void msg_dgm_ref_recv(struct tevent_context *ev,
+ const uint8_t *msg, size_t msg_len,
int *fds, size_t num_fds, void *private_data)
{
struct msg_dgm_ref *r, *next;
--
2.7.4
>From 85226b8e277c190a02110f148e19daae6e24b2d2 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 23 Sep 2016 18:36:15 -0700
Subject: [PATCH 23/26] messages_dgm_ref: Pass receiving "ev" to recv_cb
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source3/lib/messages.c | 3 ++-
source3/lib/messages_dgm_ref.c | 8 +++++---
source3/lib/messages_dgm_ref.h | 3 ++-
source4/lib/messaging/messaging.c | 6 ++++--
4 files changed, 13 insertions(+), 7 deletions(-)
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index ae06243..96d106a 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -154,7 +154,8 @@ static struct messaging_rec *messaging_rec_create(
return result;
}
-static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
+static void messaging_recv_cb(struct tevent_context *ev,
+ const uint8_t *msg, size_t msg_len,
int *fds, size_t num_fds,
void *private_data)
{
diff --git a/source3/lib/messages_dgm_ref.c b/source3/lib/messages_dgm_ref.c
index 7b8acf1..00a3a66 100644
--- a/source3/lib/messages_dgm_ref.c
+++ b/source3/lib/messages_dgm_ref.c
@@ -27,7 +27,8 @@
struct msg_dgm_ref {
struct msg_dgm_ref *prev, *next;
void *tevent_handle;
- void (*recv_cb)(const uint8_t *msg, size_t msg_len,
+ void (*recv_cb)(struct tevent_context *ev,
+ const uint8_t *msg, size_t msg_len,
int *fds, size_t num_fds, void *private_data);
void *recv_cb_private_data;
};
@@ -44,7 +45,8 @@ void *messaging_dgm_ref(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
uint64_t *unique,
const char *socket_dir,
const char *lockfile_dir,
- void (*recv_cb)(const uint8_t *msg, size_t msg_len,
+ void (*recv_cb)(struct tevent_context *ev,
+ const uint8_t *msg, size_t msg_len,
int *fds, size_t num_fds,
void *private_data),
void *recv_cb_private_data,
@@ -127,7 +129,7 @@ static void msg_dgm_ref_recv(struct tevent_context *ev,
*/
for (r = refs; r != NULL; r = next) {
next = r->next;
- r->recv_cb(msg, msg_len, fds, num_fds,
+ r->recv_cb(ev, msg, msg_len, fds, num_fds,
r->recv_cb_private_data);
}
}
diff --git a/source3/lib/messages_dgm_ref.h b/source3/lib/messages_dgm_ref.h
index 8f0aff8..cd77101 100644
--- a/source3/lib/messages_dgm_ref.h
+++ b/source3/lib/messages_dgm_ref.h
@@ -28,7 +28,8 @@ void *messaging_dgm_ref(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
uint64_t *unique,
const char *socket_dir,
const char *lockfile_dir,
- void (*recv_cb)(const uint8_t *msg, size_t msg_len,
+ void (*recv_cb)(struct tevent_context *ev,
+ const uint8_t *msg, size_t msg_len,
int *fds, size_t num_fds,
void *private_data),
void *recv_cb_private_data,
diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
index c0b64be..950a685 100644
--- a/source4/lib/messaging/messaging.c
+++ b/source4/lib/messaging/messaging.c
@@ -294,7 +294,8 @@ int imessaging_cleanup(struct imessaging_context *msg)
return 0;
}
-static void imessaging_dgm_recv(const uint8_t *buf, size_t buf_len,
+static void imessaging_dgm_recv(struct tevent_context *ev,
+ const uint8_t *buf, size_t buf_len,
int *fds, size_t num_fds,
void *private_data);
@@ -415,7 +416,8 @@ fail:
return NULL;
}
-static void imessaging_dgm_recv(const uint8_t *buf, size_t buf_len,
+static void imessaging_dgm_recv(struct tevent_context *ev,
+ const uint8_t *buf, size_t buf_len,
int *fds, size_t num_fds,
void *private_data)
{
--
2.7.4
>From a838b383d0fffff7ada8c08d463e279be16488ac Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 23 Sep 2016 19:28:10 -0700
Subject: [PATCH 24/26] messaging: Pass "ev" to messaging_dispatch_rec
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source3/lib/messages.c | 6 ++++--
1 file changed, 4 insertions(+), 2 deletions(-)
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 96d106a..299328f 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -86,6 +86,7 @@ struct messaging_context {
static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
struct messaging_rec *rec);
static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
+ struct tevent_context *ev,
struct messaging_rec *rec);
/****************************************************************************
@@ -199,7 +200,7 @@ static void messaging_recv_cb(struct tevent_context *ev,
(unsigned)rec.msg_type, rec.buf.length, num_fds,
server_id_str_buf(rec.src, &idbuf));
- messaging_dispatch_rec(msg_ctx, &rec);
+ messaging_dispatch_rec(msg_ctx, ev, &rec);
return;
close_fail:
@@ -529,7 +530,7 @@ static void messaging_post_handler(struct tevent_context *ev,
{
struct messaging_post_state *state = talloc_get_type_abort(
private_data, struct messaging_post_state);
- messaging_dispatch_rec(state->msg_ctx, state->rec);
+ messaging_dispatch_rec(state->msg_ctx, ev, state->rec);
}
int messaging_send_iov_from(struct messaging_context *msg_ctx,
@@ -962,6 +963,7 @@ static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
Dispatch one messaging_rec
*/
static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
+ struct tevent_context *ev,
struct messaging_rec *rec)
{
struct messaging_callback *cb, *next;
--
2.7.4
>From a0c32fc256edd179870409c1b980874a7e58cd74 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 23 Sep 2016 19:35:10 -0700
Subject: [PATCH 25/26] messaging: Act on messages within the right context
Only look at "classic" messaging_register handlers when running in the
main event context loop. If we're sitting in a nested event context, only
act upon the messaging_filtered_read handlers that are registered in that
nested context. Postpone everything else via an immediate to the main
tevent context.
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source3/lib/messages.c | 56 ++++++++++++++++++++++++++++++++++++++++++--------
1 file changed, 47 insertions(+), 9 deletions(-)
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 299328f..d0045d1 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -959,18 +959,14 @@ static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
return true;
}
-/*
- Dispatch one messaging_rec
-*/
-static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
- struct tevent_context *ev,
- struct messaging_rec *rec)
+static void messaging_dispatch_classic(struct messaging_context *msg_ctx,
+ struct messaging_rec *rec)
{
struct messaging_callback *cb, *next;
- unsigned i;
- size_t j;
for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
+ size_t j;
+
next = cb->next;
if (cb->msg_type != rec->msg_type) {
continue;
@@ -996,6 +992,21 @@ static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
* message type
*/
}
+}
+
+/*
+ Dispatch one messaging_rec
+*/
+static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
+ struct tevent_context *ev,
+ struct messaging_rec *rec)
+{
+ unsigned i;
+ size_t j;
+
+ if (ev == msg_ctx->event_ctx) {
+ messaging_dispatch_classic(msg_ctx, rec);
+ }
if (!messaging_append_new_waiters(msg_ctx)) {
for (j=0; j < rec->num_fds; j++) {
@@ -1032,7 +1043,8 @@ static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
state = tevent_req_data(
req, struct messaging_filtered_read_state);
- if (state->filter(rec, state->private_data)) {
+ if ((ev == state->ev) &&
+ state->filter(rec, state->private_data)) {
messaging_filtered_read_done(req, rec);
/*
@@ -1045,6 +1057,32 @@ static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
i += 1;
}
+ if (ev != msg_ctx->event_ctx) {
+ struct iovec iov;
+ int fds[rec->num_fds];
+ int ret;
+
+ /*
+ * We've been listening on a nested event
+ * context. Messages need to be handled in the main
+ * event context, so post to ourselves
+ */
+
+ iov.iov_base = rec->buf.data;
+ iov.iov_len = rec->buf.length;
+
+ for (i=0; i<rec->num_fds; i++) {
+ fds[i] = rec->fds[i];
+ }
+
+ ret = messaging_post_self(
+ msg_ctx, rec->src, rec->dest, rec->msg_type,
+ &iov, 1, fds, rec->num_fds);
+ if (ret == 0) {
+ return;
+ }
+ }
+
/*
* If the fd-array isn't used, just close it.
*/
--
2.7.4
>From d83b19f6c170abab7cd7b55b0286868d141f085f Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 28 Sep 2016 16:05:25 -0700
Subject: [PATCH 26/26] messaging: Postpone messages to the right tevent
context
Signed-off-by: Volker Lendecke <vl at samba.org>
---
source4/lib/messaging/messaging.c | 63 +++++++++++++++++++++++++++++++++++++++
1 file changed, 63 insertions(+)
diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
index 950a685..aca1a38 100644
--- a/source4/lib/messaging/messaging.c
+++ b/source4/lib/messaging/messaging.c
@@ -56,6 +56,7 @@ struct irpc_request {
struct imessaging_context {
struct imessaging_context *prev, *next;
+ struct tevent_context *ev;
struct server_id server_id;
const char *sock_dir;
const char *lock_dir;
@@ -347,6 +348,7 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
if (msg == NULL) {
return NULL;
}
+ msg->ev = ev;
talloc_set_destructor(msg, imessaging_context_destructor);
@@ -416,6 +418,50 @@ fail:
return NULL;
}
+struct imessaging_post_state {
+ struct imessaging_context *msg_ctx;
+ size_t buf_len;
+ uint8_t buf[];
+};
+
+static void imessaging_post_handler(struct tevent_context *ev,
+ struct tevent_immediate *ti,
+ void *private_data)
+{
+ struct imessaging_post_state *state = talloc_get_type_abort(
+ private_data, struct imessaging_post_state);
+ imessaging_dgm_recv(ev, state->buf, state->buf_len, NULL, 0,
+ state->msg_ctx);
+}
+
+static int imessaging_post_self(struct imessaging_context *msg,
+				const uint8_t *buf, size_t buf_len)
+{
+	struct tevent_immediate *ti;
+	struct imessaging_post_state *state;
+
+	ti = tevent_create_immediate(msg);
+	if (ti == NULL) {
+		return ENOMEM;
+	}
+
+	state = talloc_size(
+		ti, offsetof(struct imessaging_post_state, buf) + buf_len);
+	if (state == NULL) {
+		TALLOC_FREE(ti);
+		return ENOMEM;
+	}
+	talloc_set_name_const(state, "struct imessaging_post_state");
+
+	state->msg_ctx = msg;
+	state->buf_len = buf_len;	/* was left uninitialized */
+	memcpy(state->buf, buf, buf_len);
+
+	/* handler does talloc_get_type_abort(), so pass "state", not "msg" */
+	tevent_schedule_immediate(ti, msg->ev, imessaging_post_handler, state);
+
+	return 0;
+}
+
static void imessaging_dgm_recv(struct tevent_context *ev,
const uint8_t *buf, size_t buf_len,
int *fds, size_t num_fds,
@@ -433,6 +479,23 @@ static void imessaging_dgm_recv(struct tevent_context *ev,
return;
}
+ if (num_fds != 0) {
+ /*
+ * Source4 based messaging does not expect fd's yet
+ */
+ return;
+ }
+
+ if (ev != msg->ev) {
+ int ret;
+ ret = imessaging_post_self(msg, buf, buf_len);
+ if (ret != 0) {
+ DBG_WARNING("imessaging_post_self failed: %s\n",
+ strerror(ret));
+ }
+ return;
+ }
+
message_hdr_get(&msg_type, &src, &dst, buf);
data.data = discard_const_p(uint8_t, buf + MESSAGE_HDR_LENGTH);
--
2.7.4
More information about the samba-technical
mailing list