[PATCH] restructure messaging

Volker Lendecke vl at samba.org
Wed Sep 28 23:33:25 UTC 2016


Hi!

Looking for a scary patchset? Here it is....

Main reason for this patchset is the last two ones: This patches
passes down the event context that triggered a receiving message
callback to the handlers and only acts upon them in the right context.

On the way there I've eliminated poll_funcs and made our pthreadpool
safe against run-down with blocked threads. It's the last part that
again and again reminded me of

http://bholley.net/blog/2015/must-be-this-tall-to-write-multi-threaded-code.html

Consider it WIP, it has to survive a few autobuilds and the
tevent version needs to be bumped.

Review appreciated!

Thanks, Volker
-------------- next part --------------
>From c5182348fabb078ced166885036bd8d9d73b8fed Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 23 Sep 2016 18:43:04 -0700
Subject: [PATCH 01/26] messaging4: Fix signed/unsigned hickups

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source4/lib/messaging/messaging.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
index d0beef6..c0b64be 100644
--- a/source4/lib/messaging/messaging.c
+++ b/source4/lib/messaging/messaging.c
@@ -149,7 +149,7 @@ NTSTATUS imessaging_register(struct imessaging_context *msg, void *private_data,
 	/* possibly expand dispatch array */
 	if (msg_type >= msg->num_types) {
 		struct dispatch_fn **dp;
-		int i;
+		uint32_t i;
 		dp = talloc_realloc(msg, msg->dispatch, struct dispatch_fn *, msg_type+1);
 		NT_STATUS_HAVE_NO_MEMORY(dp);
 		msg->dispatch = dp;
@@ -728,7 +728,7 @@ static int all_servers_func(const char *name, unsigned num_servers,
 	struct irpc_name_records *name_records = talloc_get_type(
 		private_data, struct irpc_name_records);
 	struct irpc_name_record *name_record;
-	int i;
+	uint32_t i;
 
 	name_records->names
 		= talloc_realloc(name_records, name_records->names,
-- 
2.7.4


>From c18ced932b47b066a47964f41dbfd999dbdc5142 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 31 Aug 2016 15:03:16 +0200
Subject: [PATCH 02/26] tevent: Factor out tevent_common_insert_timer

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 lib/tevent/tevent_timed.c | 70 +++++++++++++++++++++++++++--------------------
 1 file changed, 40 insertions(+), 30 deletions(-)

diff --git a/lib/tevent/tevent_timed.c b/lib/tevent/tevent_timed.c
index 920d39f..bb0160c 100644
--- a/lib/tevent/tevent_timed.c
+++ b/lib/tevent/tevent_timed.c
@@ -154,39 +154,13 @@ static int tevent_common_timed_deny_destructor(struct tevent_timer *te)
 	return -1;
 }
 
-/*
-  add a timed event
-  return NULL on failure (memory allocation error)
-*/
-static struct tevent_timer *tevent_common_add_timer_internal(
-					struct tevent_context *ev,
-					TALLOC_CTX *mem_ctx,
-					struct timeval next_event,
-					tevent_timer_handler_t handler,
-					void *private_data,
-					const char *handler_name,
-					const char *location,
-					bool optimize_zero)
+static void tevent_common_insert_timer(struct tevent_context *ev,
+				       struct tevent_timer *te,
+				       bool optimize_zero)
 {
-	struct tevent_timer *te, *prev_te, *cur_te;
-
-	te = talloc(mem_ctx?mem_ctx:ev, struct tevent_timer);
-	if (te == NULL) return NULL;
-
-	te->event_ctx		= ev;
-	te->next_event		= next_event;
-	te->handler		= handler;
-	te->private_data	= private_data;
-	te->handler_name	= handler_name;
-	te->location		= location;
-	te->additional_data	= NULL;
-
-	if (ev->timer_events == NULL) {
-		ev->last_zero_timer = NULL;
-	}
+	struct tevent_timer *prev_te = NULL;
 
 	/* keep the list ordered */
-	prev_te = NULL;
 	if (optimize_zero && tevent_timeval_is_zero(&te->next_event)) {
 		/*
 		 * Some callers use zero tevent_timer
@@ -199,6 +173,8 @@ static struct tevent_timer *tevent_common_add_timer_internal(
 		prev_te = ev->last_zero_timer;
 		ev->last_zero_timer = te;
 	} else {
+		struct tevent_timer *cur_te;
+
 		/*
 		 * we traverse the list from the tail
 		 * because it's much more likely that
@@ -227,6 +203,40 @@ static struct tevent_timer *tevent_common_add_timer_internal(
 	}
 
 	DLIST_ADD_AFTER(ev->timer_events, te, prev_te);
+}
+
+/*
+  add a timed event
+  return NULL on failure (memory allocation error)
+*/
+static struct tevent_timer *tevent_common_add_timer_internal(
+					struct tevent_context *ev,
+					TALLOC_CTX *mem_ctx,
+					struct timeval next_event,
+					tevent_timer_handler_t handler,
+					void *private_data,
+					const char *handler_name,
+					const char *location,
+					bool optimize_zero)
+{
+	struct tevent_timer *te;
+
+	te = talloc(mem_ctx?mem_ctx:ev, struct tevent_timer);
+	if (te == NULL) return NULL;
+
+	te->event_ctx		= ev;
+	te->next_event		= next_event;
+	te->handler		= handler;
+	te->private_data	= private_data;
+	te->handler_name	= handler_name;
+	te->location		= location;
+	te->additional_data	= NULL;
+
+	if (ev->timer_events == NULL) {
+		ev->last_zero_timer = NULL;
+	}
+
+	tevent_common_insert_timer(ev, te, optimize_zero);
 
 	talloc_set_destructor(te, tevent_common_timed_destructor);
 
-- 
2.7.4


>From ef4331804f81db860671f11409baa1db1a77d4ed Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 31 Aug 2016 15:39:59 +0200
Subject: [PATCH 03/26] tevent: Add tevent_update_timer()

This will be a quicker way to time out sending sockets in messaging_dgm. Right
now cleanup of out-sockets is a bit coarse. The ideal would be to kill a socket
after being idle n seconds. This would mean to free and re-install a timer on
every packet. tevent_update_timer will be quite a bit cheaper.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 lib/tevent/ABI/tevent-0.9.30.sigs |  1 +
 lib/tevent/tevent.h               | 10 ++++++++++
 lib/tevent/tevent_timed.c         | 18 ++++++++++++++++++
 3 files changed, 29 insertions(+)

diff --git a/lib/tevent/ABI/tevent-0.9.30.sigs b/lib/tevent/ABI/tevent-0.9.30.sigs
index 9b8bfa1..66a450e 100644
--- a/lib/tevent/ABI/tevent-0.9.30.sigs
+++ b/lib/tevent/ABI/tevent-0.9.30.sigs
@@ -92,5 +92,6 @@ tevent_timeval_set: struct timeval (uint32_t, uint32_t)
 tevent_timeval_until: struct timeval (const struct timeval *, const struct timeval *)
 tevent_timeval_zero: struct timeval (void)
 tevent_trace_point_callback: void (struct tevent_context *, enum tevent_trace_point)
+tevent_update_timer: void (struct tevent_timer *, struct timeval)
 tevent_wakeup_recv: bool (struct tevent_req *)
 tevent_wakeup_send: struct tevent_req *(TALLOC_CTX *, struct tevent_context *, struct timeval)
diff --git a/lib/tevent/tevent.h b/lib/tevent/tevent.h
index 7de04d0..bb23257 100644
--- a/lib/tevent/tevent.h
+++ b/lib/tevent/tevent.h
@@ -252,6 +252,16 @@ struct tevent_timer *_tevent_add_timer(struct tevent_context *ev,
 			  #handler, __location__)
 #endif
 
+/**
+ * @brief Set the time a tevent_timer fires
+ *
+ * @param[in]  te       The timer event to reset
+ *
+ * @param[in]  next_event  Timeval specifying the absolute time to fire this
+ * event. This is not an offset.
+ */
+void tevent_update_timer(struct tevent_timer *te, struct timeval next_event);
+
 #ifdef DOXYGEN
 /**
  * Initialize an immediate event object
diff --git a/lib/tevent/tevent_timed.c b/lib/tevent/tevent_timed.c
index bb0160c..92f3ed1 100644
--- a/lib/tevent/tevent_timed.c
+++ b/lib/tevent/tevent_timed.c
@@ -284,6 +284,24 @@ struct tevent_timer *tevent_common_add_timer_v2(struct tevent_context *ev,
 						true);
 }
 
+void tevent_update_timer(struct tevent_timer *te, struct timeval next_event)
+{
+	struct tevent_context *ev = te->event_ctx;
+
+	if (ev->last_zero_timer == te) {
+		te->event_ctx->last_zero_timer = DLIST_PREV(te);
+	}
+	DLIST_REMOVE(ev->timer_events, te);
+
+	te->next_event = next_event;
+
+	/*
+	 * Not doing the zero_timer optimization. This is for new code
+	 * that should know about immediates.
+	 */
+	tevent_common_insert_timer(ev, te, false);
+}
+
 /*
   do a single event loop using the events defined in ev
 
-- 
2.7.4


>From 6deb4f32574bfd54da5cfc08c402f2931ec49ba4 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 Sep 2016 19:17:21 +0200
Subject: [PATCH 04/26] tevent: Rename wakeup fds

This makes the reading end of the signalling pipe special: If we have eventfd,
this is the same as the write fd. Without eventfd, it will have to be a
separate fd. This moves the requirement to #ifdef from the writing end to the
reading end. Why? We'll use the writing end somewhere else too soon, and this
patch avoids an #ifdef in that new place.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 lib/tevent/tevent.c          | 21 +++++++++++----------
 lib/tevent/tevent_internal.h |  4 ++--
 2 files changed, 13 insertions(+), 12 deletions(-)

diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c
index 331be0e..87776ec 100644
--- a/lib/tevent/tevent.c
+++ b/lib/tevent/tevent.c
@@ -860,7 +860,7 @@ static void wakeup_pipe_handler(struct tevent_context *ev,
 
 int tevent_common_wakeup_init(struct tevent_context *ev)
 {
-	int ret;
+	int ret, read_fd;
 
 	if (ev->wakeup_fde != NULL) {
 		return 0;
@@ -871,7 +871,7 @@ int tevent_common_wakeup_init(struct tevent_context *ev)
 	if (ret == -1) {
 		return errno;
 	}
-	ev->wakeup_fd = ret;
+	read_fd = ev->wakeup_fd = ret;
 #else
 	{
 		int pipe_fds[2];
@@ -879,21 +879,22 @@ int tevent_common_wakeup_init(struct tevent_context *ev)
 		if (ret == -1) {
 			return errno;
 		}
-		ev->wakeup_fd = pipe_fds[0];
-		ev->wakeup_write_fd = pipe_fds[1];
+		ev->wakeup_fd = pipe_fds[1];
+		ev->wakeup_read_fd = pipe_fds[0];
 
 		ev_set_blocking(ev->wakeup_fd, false);
-		ev_set_blocking(ev->wakeup_write_fd, false);
+		ev_set_blocking(ev->wakeup_read_fd, false);
+
+		read_fd = ev->wakeup_read_fd;
 	}
 #endif
 
-	ev->wakeup_fde = tevent_add_fd(ev, ev, ev->wakeup_fd,
-				     TEVENT_FD_READ,
+	ev->wakeup_fde = tevent_add_fd(ev, ev, read_fd, TEVENT_FD_READ,
 				     wakeup_pipe_handler, NULL);
 	if (ev->wakeup_fde == NULL) {
 		close(ev->wakeup_fd);
 #ifndef HAVE_EVENTFD
-		close(ev->wakeup_write_fd);
+		close(ev->wakeup_read_fd);
 #endif
 		return ENOMEM;
 	}
@@ -915,7 +916,7 @@ int tevent_common_wakeup(struct tevent_context *ev)
 		ret = write(ev->wakeup_fd, &val, sizeof(val));
 #else
 		char c = '\0';
-		ret = write(ev->wakeup_write_fd, &c, 1);
+		ret = write(ev->wakeup_fd, &c, 1);
 #endif
 	} while ((ret == -1) && (errno == EINTR));
 
@@ -932,6 +933,6 @@ static void tevent_common_wakeup_fini(struct tevent_context *ev)
 
 	close(ev->wakeup_fd);
 #ifndef HAVE_EVENTFD
-	close(ev->wakeup_write_fd);
+	close(ev->wakeup_read_fd);
 #endif
 }
diff --git a/lib/tevent/tevent_internal.h b/lib/tevent/tevent_internal.h
index d960544..84ae5bc 100644
--- a/lib/tevent/tevent_internal.h
+++ b/lib/tevent/tevent_internal.h
@@ -277,9 +277,9 @@ struct tevent_context {
 
 	/* pipe hack used with signal handlers */
 	struct tevent_fd *wakeup_fde;
-	int wakeup_fd;
+	int wakeup_fd;		/* fd to write into */
 #ifndef HAVE_EVENT_FD
-	int wakeup_write_fd;
+	int wakeup_read_fd;
 #endif
 
 	/* debugging operations */
-- 
2.7.4


>From dc5f29425f9ca2e9050484f558ced4c35c971ceb Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 Sep 2016 19:47:55 +0200
Subject: [PATCH 05/26] tevent: Add tevent_common_wakeup_fd()

This prepares tevent run-down with active threads.

It has the advantage to not depend on talloc'ed structs. It is needed to make
talloc_free(tevent_context) safe when tevent_threaded_contexts are still
around.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 lib/tevent/ABI/tevent-0.9.30.sigs |  1 +
 lib/tevent/tevent.c               | 19 ++++++++++++-------
 lib/tevent/tevent_internal.h      |  1 +
 3 files changed, 14 insertions(+), 7 deletions(-)

diff --git a/lib/tevent/ABI/tevent-0.9.30.sigs b/lib/tevent/ABI/tevent-0.9.30.sigs
index 66a450e..ea179a0 100644
--- a/lib/tevent/ABI/tevent-0.9.30.sigs
+++ b/lib/tevent/ABI/tevent-0.9.30.sigs
@@ -36,6 +36,7 @@ tevent_common_loop_wait: int (struct tevent_context *, const char *)
 tevent_common_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
 tevent_common_threaded_activate_immediate: void (struct tevent_context *)
 tevent_common_wakeup: int (struct tevent_context *)
+tevent_common_wakeup_fd: int (int)
 tevent_common_wakeup_init: int (struct tevent_context *)
 tevent_context_init: struct tevent_context *(TALLOC_CTX *)
 tevent_context_init_byname: struct tevent_context *(TALLOC_CTX *, const char *)
diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c
index 87776ec..575337d 100644
--- a/lib/tevent/tevent.c
+++ b/lib/tevent/tevent.c
@@ -902,27 +902,32 @@ int tevent_common_wakeup_init(struct tevent_context *ev)
 	return 0;
 }
 
-int tevent_common_wakeup(struct tevent_context *ev)
+int tevent_common_wakeup_fd(int fd)
 {
 	ssize_t ret;
 
-	if (ev->wakeup_fde == NULL) {
-		return ENOTCONN;
-	}
-
 	do {
 #ifdef HAVE_EVENTFD
 		uint64_t val = 1;
-		ret = write(ev->wakeup_fd, &val, sizeof(val));
+		ret = write(fd, &val, sizeof(val));
 #else
 		char c = '\0';
-		ret = write(ev->wakeup_fd, &c, 1);
+		ret = write(fd, &c, 1);
 #endif
 	} while ((ret == -1) && (errno == EINTR));
 
 	return 0;
 }
 
+int tevent_common_wakeup(struct tevent_context *ev)
+{
+	if (ev->wakeup_fde == NULL) {
+		return ENOTCONN;
+	}
+
+	return tevent_common_wakeup_fd(ev->wakeup_fd);
+}
+
 static void tevent_common_wakeup_fini(struct tevent_context *ev)
 {
 	if (ev->wakeup_fde == NULL) {
diff --git a/lib/tevent/tevent_internal.h b/lib/tevent/tevent_internal.h
index 84ae5bc..a4af79e 100644
--- a/lib/tevent/tevent_internal.h
+++ b/lib/tevent/tevent_internal.h
@@ -357,6 +357,7 @@ void tevent_common_threaded_activate_immediate(struct tevent_context *ev);
 
 bool tevent_common_have_events(struct tevent_context *ev);
 int tevent_common_wakeup_init(struct tevent_context *ev);
+int tevent_common_wakeup_fd(int fd);
 int tevent_common_wakeup(struct tevent_context *ev);
 
 struct tevent_signal *tevent_common_add_signal(struct tevent_context *ev,
-- 
2.7.4


>From fe1d0d7ebd2df9e9a1b3049f77b6563a1b210364 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 Sep 2016 20:25:36 +0200
Subject: [PATCH 06/26] tevent: Make talloc_free safe when threaded_contexts
 exist

I did not find a way to do this safely without a mutex per threaded_context.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 lib/tevent/tevent.c          | 61 ++++++++++++++++++++++++++++++++++++--------
 lib/tevent/tevent_internal.h |  5 ++++
 lib/tevent/tevent_threads.c  | 38 +++++++++++++++++++++++++--
 3 files changed, 92 insertions(+), 12 deletions(-)

diff --git a/lib/tevent/tevent.c b/lib/tevent/tevent.c
index 575337d..65b101f 100644
--- a/lib/tevent/tevent.c
+++ b/lib/tevent/tevent.c
@@ -200,6 +200,16 @@ static void tevent_atfork_prepare(void)
 	}
 
 	for (ev = tevent_contexts; ev != NULL; ev = ev->next) {
+		struct tevent_threaded_context *tctx;
+
+		for (tctx = ev->threaded_contexts; tctx != NULL;
+		     tctx = tctx->next) {
+			ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
+			if (ret != 0) {
+				tevent_abort(ev, "pthread_mutex_lock failed");
+			}
+		}
+
 		ret = pthread_mutex_lock(&ev->scheduled_mutex);
 		if (ret != 0) {
 			tevent_abort(ev, "pthread_mutex_lock failed");
@@ -214,10 +224,21 @@ static void tevent_atfork_parent(void)
 
 	for (ev = DLIST_TAIL(tevent_contexts); ev != NULL;
 	     ev = DLIST_PREV(ev)) {
+		struct tevent_threaded_context *tctx;
+
 		ret = pthread_mutex_unlock(&ev->scheduled_mutex);
 		if (ret != 0) {
 			tevent_abort(ev, "pthread_mutex_unlock failed");
 		}
+
+		for (tctx = DLIST_TAIL(ev->threaded_contexts); tctx != NULL;
+		     tctx = DLIST_PREV(tctx)) {
+			ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+			if (ret != 0) {
+				tevent_abort(
+					ev, "pthread_mutex_unlock failed");
+			}
+		}
 	}
 
 	ret = pthread_mutex_unlock(&tevent_contexts_mutex);
@@ -235,9 +256,15 @@ static void tevent_atfork_child(void)
 	     ev = DLIST_PREV(ev)) {
 		struct tevent_threaded_context *tctx;
 
-		for (tctx = ev->threaded_contexts; tctx != NULL;
-		     tctx = tctx->next) {
+		for (tctx = DLIST_TAIL(ev->threaded_contexts); tctx != NULL;
+		     tctx = DLIST_PREV(tctx)) {
 			tctx->event_ctx = NULL;
+
+			ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+			if (ret != 0) {
+				tevent_abort(
+					ev, "pthread_mutex_unlock failed");
+			}
 		}
 
 		ev->threaded_contexts = NULL;
@@ -289,18 +316,32 @@ int tevent_common_context_destructor(struct tevent_context *ev)
 	if (ret != 0) {
 		abort();
 	}
-#endif
 
-	if (ev->threaded_contexts != NULL) {
+	while (ev->threaded_contexts != NULL) {
+		struct tevent_threaded_context *tctx = ev->threaded_contexts;
+
+		ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
+		if (ret != 0) {
+			abort();
+		}
+
 		/*
-		 * Threaded contexts are indicators that threads are
-		 * about to send us immediates via
-		 * tevent_threaded_schedule_immediate. The caller
-		 * needs to make sure that the tevent context lives
-		 * long enough to receive immediates from all threads.
+		 * Indicate to the thread that the tevent_context is
+		 * gone. The counterpart of this is in
+		 * _tevent_threaded_schedule_immediate, there we read
+		 * this under the threaded_context's mutex.
 		 */
-		tevent_abort(ev, "threaded contexts exist");
+
+		tctx->event_ctx = NULL;
+
+		ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+		if (ret != 0) {
+			abort();
+		}
+
+		DLIST_REMOVE(ev->threaded_contexts, tctx);
 	}
+#endif
 
 	tevent_common_wakeup_fini(ev);
 
diff --git a/lib/tevent/tevent_internal.h b/lib/tevent/tevent_internal.h
index a4af79e..a5f1ebde 100644
--- a/lib/tevent/tevent_internal.h
+++ b/lib/tevent/tevent_internal.h
@@ -230,7 +230,12 @@ struct tevent_signal {
 
 struct tevent_threaded_context {
 	struct tevent_threaded_context *next, *prev;
+
+#ifdef HAVE_PTHREAD
+	pthread_mutex_t event_ctx_mutex;
+#endif
 	struct tevent_context *event_ctx;
+	int wakeup_fd;
 };
 
 struct tevent_debug_ops {
diff --git a/lib/tevent/tevent_threads.c b/lib/tevent/tevent_threads.c
index e42759e..8197323 100644
--- a/lib/tevent/tevent_threads.c
+++ b/lib/tevent/tevent_threads.c
@@ -375,9 +375,17 @@ void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
 static int tevent_threaded_context_destructor(
 	struct tevent_threaded_context *tctx)
 {
+	int ret;
+
 	if (tctx->event_ctx != NULL) {
 		DLIST_REMOVE(tctx->event_ctx->threaded_contexts, tctx);
 	}
+
+	ret = pthread_mutex_destroy(&tctx->event_ctx_mutex);
+	if (ret != 0) {
+		abort();
+	}
+
 	return 0;
 }
 
@@ -399,6 +407,13 @@ struct tevent_threaded_context *tevent_threaded_context_create(
 		return NULL;
 	}
 	tctx->event_ctx = ev;
+	tctx->wakeup_fd = ev->wakeup_fd;
+
+	ret = pthread_mutex_init(&tctx->event_ctx_mutex, NULL);
+	if (ret != 0) {
+		TALLOC_FREE(tctx);
+		return NULL;
+	}
 
 	DLIST_ADD(ev->threaded_contexts, tctx);
 	talloc_set_destructor(tctx, tevent_threaded_context_destructor);
@@ -418,9 +433,28 @@ void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
 					 const char *location)
 {
 #ifdef HAVE_PTHREAD
-	struct tevent_context *ev = tctx->event_ctx;
+	struct tevent_context *ev;
 	int ret;
 
+	ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
+	if (ret != 0) {
+		abort();
+	}
+
+	ev = tctx->event_ctx;
+
+	ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+	if (ret != 0) {
+		abort();
+	}
+
+	if (ev == NULL) {
+		/*
+		 * Our event context is already gone.
+		 */
+		return;
+	}
+
 	if ((im->event_ctx != NULL) || (handler == NULL)) {
 		abort();
 	}
@@ -455,7 +489,7 @@ void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
 	 * than a noncontended one. So I'd opt for the lower footprint
 	 * initially. Maybe we have to change that later.
 	 */
-	tevent_common_wakeup(ev);
+	tevent_common_wakeup_fd(tctx->wakeup_fd);
 #else
 	/*
 	 * tevent_threaded_context_create() returned NULL with ENOSYS...
-- 
2.7.4


>From f96eac9664308fe5f4fd780075df450ceb763343 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 Sep 2016 20:37:21 +0200
Subject: [PATCH 07/26] pthreadpool: Make "shutdown" a bool

Just a small cleanup

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/pthreadpool/pthreadpool.c | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c
index ee59cc4..a306c88 100644
--- a/source3/lib/pthreadpool/pthreadpool.c
+++ b/source3/lib/pthreadpool/pthreadpool.c
@@ -73,7 +73,7 @@ struct pthreadpool {
 	/*
 	 * indicator to worker threads that they should shut down
 	 */
-	int shutdown;
+	bool shutdown;
 
 	/*
 	 * maximum number of threads
@@ -150,7 +150,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 		return ret;
 	}
 
-	pool->shutdown = 0;
+	pool->shutdown = false;
 	pool->num_threads = 0;
 	pool->num_exited = 0;
 	pool->exited = NULL;
@@ -295,7 +295,7 @@ int pthreadpool_destroy(struct pthreadpool *pool)
 		 * We have active threads, tell them to finish, wait for that.
 		 */
 
-		pool->shutdown = 1;
+		pool->shutdown = true;
 
 		if (pool->num_idle > 0) {
 			/*
@@ -455,7 +455,7 @@ static void *pthreadpool_server(void *arg)
 		clock_gettime(CLOCK_REALTIME, &ts);
 		ts.tv_sec += 1;
 
-		while ((pool->num_jobs == 0) && (pool->shutdown == 0)) {
+		while ((pool->num_jobs == 0) && !pool->shutdown) {
 
 			pool->num_idle += 1;
 			res = pthread_cond_timedwait(
@@ -505,7 +505,7 @@ static void *pthreadpool_server(void *arg)
 			}
 		}
 
-		if ((pool->num_jobs == 0) && (pool->shutdown != 0)) {
+		if ((pool->num_jobs == 0) && pool->shutdown) {
 			/*
 			 * No more work to do and we're asked to shut down, so
 			 * exit
-- 
2.7.4


>From 8dde4065befa484d6ff4aad0b00950b1135ef08e Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 9 Sep 2016 13:07:57 +0200
Subject: [PATCH 08/26] pthreadpool: Use detached threads

So far we used joinable threads. This prevented pthreadpool_destroy with
blocked threads. This patch converts pthreadpool to detached threads. Now
pthreadpool_destroy does not have to wait for the idle threads to finish, it
can immediately return. pthreadpool_destroy will tell all threads to exit, and
the last active thread will actually free(pthreadpool).

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/pthreadpool/pthreadpool.c | 191 ++++++++++++----------------------
 source3/lib/pthreadpool/pthreadpool.h |   9 +-
 2 files changed, 74 insertions(+), 126 deletions(-)

diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c
index a306c88..a1eb924 100644
--- a/source3/lib/pthreadpool/pthreadpool.c
+++ b/source3/lib/pthreadpool/pthreadpool.c
@@ -89,12 +89,6 @@ struct pthreadpool {
 	 * Number of idle threads
 	 */
 	int num_idle;
-
-	/*
-	 * An array of threads that require joining.
-	 */
-	int			num_exited;
-	pthread_t		*exited; /* We alloc more */
 };
 
 static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -152,8 +146,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 
 	pool->shutdown = false;
 	pool->num_threads = 0;
-	pool->num_exited = 0;
-	pool->exited = NULL;
 	pool->max_threads = max_threads;
 	pool->num_idle = 0;
 
@@ -220,11 +212,6 @@ static void pthreadpool_child(void)
 	     pool = DLIST_PREV(pool)) {
 
 		pool->num_threads = 0;
-
-		pool->num_exited = 0;
-		free(pool->exited);
-		pool->exited = NULL;
-
 		pool->num_idle = 0;
 		pool->head = 0;
 		pool->num_jobs = 0;
@@ -243,36 +230,40 @@ static void pthreadpool_prep_atfork(void)
 		       pthreadpool_child);
 }
 
-/*
- * Do a pthread_join() on all children that have exited, pool->mutex must be
- * locked
- */
-static void pthreadpool_join_children(struct pthreadpool *pool)
+static int pthreadpool_free(struct pthreadpool *pool)
 {
-	int i;
+	int ret, ret1;
 
-	for (i=0; i<pool->num_exited; i++) {
-		int ret;
+	ret = pthread_mutex_unlock(&pool->mutex);
+	assert(ret == 0);
 
-		ret = pthread_join(pool->exited[i], NULL);
-		if (ret != 0) {
-			/*
-			 * Severe internal error, we can't do much but
-			 * abort here.
-			 */
-			abort();
-		}
+	ret = pthread_mutex_destroy(&pool->mutex);
+	ret1 = pthread_cond_destroy(&pool->condvar);
+
+	if (ret != 0) {
+		return ret;
+	}
+	if (ret1 != 0) {
+		return ret1;
 	}
-	pool->num_exited = 0;
 
-	/*
-	 * Deliberately not free and NULL pool->exited. That will be
-	 * re-used by realloc later.
-	 */
+	ret = pthread_mutex_lock(&pthreadpools_mutex);
+	if (ret != 0) {
+		return ret;
+	}
+	DLIST_REMOVE(pthreadpools, pool);
+	ret = pthread_mutex_unlock(&pthreadpools_mutex);
+	assert(ret == 0);
+
+	free(pool->jobs);
+	free(pool);
+
+	return 0;
 }
 
 /*
- * Destroy a thread pool, finishing all threads working for it
+ * Destroy a thread pool. Wake up all idle threads for exit. The last
+ * one will free the pool.
  */
 
 int pthreadpool_destroy(struct pthreadpool *pool)
@@ -284,98 +275,49 @@ int pthreadpool_destroy(struct pthreadpool *pool)
 		return ret;
 	}
 
-	if ((pool->num_jobs != 0) || pool->shutdown) {
+	if (pool->num_threads == 0) {
+		ret = pthreadpool_free(pool);
+		return ret;
+	}
+
+	if (pool->shutdown) {
 		ret = pthread_mutex_unlock(&pool->mutex);
 		assert(ret == 0);
 		return EBUSY;
 	}
 
-	if (pool->num_threads > 0) {
-		/*
-		 * We have active threads, tell them to finish, wait for that.
-		 */
-
-		pool->shutdown = true;
-
-		if (pool->num_idle > 0) {
-			/*
-			 * Wake the idle threads. They will find
-			 * pool->shutdown to be set and exit themselves
-			 */
-			ret = pthread_cond_broadcast(&pool->condvar);
-			if (ret != 0) {
-				pthread_mutex_unlock(&pool->mutex);
-				return ret;
-			}
-		}
-
-		while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
-
-			if (pool->num_exited > 0) {
-				pthreadpool_join_children(pool);
-				continue;
-			}
-			/*
-			 * A thread that shuts down will also signal
-			 * pool->condvar
-			 */
-			ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
-			if (ret != 0) {
-				pthread_mutex_unlock(&pool->mutex);
-				return ret;
-			}
-		}
-	}
+	/*
+	 * We have active threads, tell them to finish.
+	 */
 
-	ret = pthread_mutex_unlock(&pool->mutex);
-	if (ret != 0) {
-		return ret;
-	}
-	ret = pthread_mutex_destroy(&pool->mutex);
-	ret1 = pthread_cond_destroy(&pool->condvar);
+	pool->shutdown = true;
 
-	if (ret != 0) {
-		return ret;
-	}
-	if (ret1 != 0) {
-		return ret1;
-	}
+	ret = pthread_cond_broadcast(&pool->condvar);
 
-	ret = pthread_mutex_lock(&pthreadpools_mutex);
-	if (ret != 0) {
-		return ret;
-	}
-	DLIST_REMOVE(pthreadpools, pool);
-	ret = pthread_mutex_unlock(&pthreadpools_mutex);
-	assert(ret == 0);
+	ret1 = pthread_mutex_unlock(&pool->mutex);
+	assert(ret1 == 0);
 
-	free(pool->exited);
-	free(pool->jobs);
-	free(pool);
-
-	return 0;
+	return ret;
 }
 
 /*
- * Prepare for pthread_exit(), pool->mutex must be locked
+ * Prepare for pthread_exit(), pool->mutex must be locked and will be
+ * unlocked here. This is a bit of a layering violation, but here we
+ * also take care of removing the pool if we're the last thread.
  */
 static void pthreadpool_server_exit(struct pthreadpool *pool)
 {
-	pthread_t *exited;
+	int ret;
 
 	pool->num_threads -= 1;
 
-	exited = (pthread_t *)realloc(
-		pool->exited, sizeof(pthread_t) * (pool->num_exited + 1));
-
-	if (exited == NULL) {
-		/* lost a thread status */
+	if (pool->shutdown && (pool->num_threads == 0)) {
+		pthreadpool_free(pool);
 		return;
 	}
-	pool->exited = exited;
 
-	pool->exited[pool->num_exited] = pthread_self();
-	pool->num_exited += 1;
+	ret = pthread_mutex_unlock(&pool->mutex);
+	assert(ret == 0);
 }
 
 static bool pthreadpool_get_job(struct pthreadpool *p,
@@ -470,7 +412,6 @@ static void *pthreadpool_server(void *arg)
 					 * us. Exit.
 					 */
 					pthreadpool_server_exit(pool);
-					pthread_mutex_unlock(&pool->mutex);
 					return NULL;
 				}
 
@@ -500,7 +441,6 @@ static void *pthreadpool_server(void *arg)
 
 			if (ret != 0) {
 				pthreadpool_server_exit(pool);
-				pthread_mutex_unlock(&pool->mutex);
 				return NULL;
 			}
 		}
@@ -511,16 +451,6 @@ static void *pthreadpool_server(void *arg)
 			 * exit
 			 */
 			pthreadpool_server_exit(pool);
-
-			if (pool->num_threads == 0) {
-				/*
-				 * Ping the main thread waiting for all of us
-				 * workers to have quit.
-				 */
-				pthread_cond_broadcast(&pool->condvar);
-			}
-
-			pthread_mutex_unlock(&pool->mutex);
 			return NULL;
 		}
 	}
@@ -529,6 +459,7 @@ static void *pthreadpool_server(void *arg)
 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 			void (*fn)(void *private_data), void *private_data)
 {
+	pthread_attr_t thread_attr;
 	pthread_t thread_id;
 	int res;
 	sigset_t mask, omask;
@@ -549,11 +480,6 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 	}
 
 	/*
-	 * Just some cleanup under the mutex
-	 */
-	pthreadpool_join_children(pool);
-
-	/*
 	 * Add job to the end of the queue
 	 */
 	if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
@@ -585,20 +511,37 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 
 	sigfillset(&mask);
 
+	res = pthread_attr_init(&thread_attr);
+	if (res != 0) {
+		pthread_mutex_unlock(&pool->mutex);
+		return res;
+	}
+
+	res = pthread_attr_setdetachstate(
+		&thread_attr, PTHREAD_CREATE_DETACHED);
+	if (res != 0) {
+		pthread_attr_destroy(&thread_attr);
+		pthread_mutex_unlock(&pool->mutex);
+		return res;
+	}
+
         res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
 	if (res != 0) {
+		pthread_attr_destroy(&thread_attr);
 		pthread_mutex_unlock(&pool->mutex);
 		return res;
 	}
 
 	res = pthread_create(&thread_id, NULL, pthreadpool_server,
-				(void *)pool);
+			     (void *)pool);
 	if (res == 0) {
 		pool->num_threads += 1;
 	}
 
         assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
 
+	pthread_attr_destroy(&thread_attr);
+
 	pthread_mutex_unlock(&pool->mutex);
 	return res;
 }
diff --git a/source3/lib/pthreadpool/pthreadpool.h b/source3/lib/pthreadpool/pthreadpool.h
index ee9d957..defbe5a 100644
--- a/source3/lib/pthreadpool/pthreadpool.h
+++ b/source3/lib/pthreadpool/pthreadpool.h
@@ -53,8 +53,13 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 /**
  * @brief Destroy a pthreadpool
  *
- * Destroy a pthreadpool. If jobs are still active, this returns
- * EBUSY.
+ * Destroy a pthreadpool. If jobs are submitted, but not yet active in
+ * a thread, they won't get executed. If a job has already been
+ * submitted to a thread, the job function will continue running, and
+ * the signal function might still be called. The caller of
+ * pthreadpool_init must make sure the required resources are still
+ * around when the pool is destroyed with pending jobs.  The last
+ * thread to exit will finally free() the pool memory.
  *
  * @param[in]	pool		The pool to destroy
  * @return			success: 0, failure: errno
-- 
2.7.4


>From d6c972c368be9ff4fbdc51cde2715afd0b094305 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 9 Sep 2016 15:18:41 +0200
Subject: [PATCH 09/26] pthreadpool_pipe: Implement EBUSY for _destroy

Restore EBUSY on pthreadpool_pipe_destroy.

We need to count jobs in pthreadpool_pipe so that pthreadpool can exit with
active jobs. Unfortunately this makes pthreadpool_pipe_add_job non-threadsafe.
We could add mutexes around "num_jobs", but this would mean another set of
pthread_atfork functions. As we don't use threaded pthreadpool_pipe_add_job
except in the tests, just remove the tests...

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/pthreadpool/pthreadpool_pipe.c |  28 +++-
 source3/lib/pthreadpool/tests.c            | 220 ++---------------------------
 2 files changed, 38 insertions(+), 210 deletions(-)

diff --git a/source3/lib/pthreadpool/pthreadpool_pipe.c b/source3/lib/pthreadpool/pthreadpool_pipe.c
index f7995ab..d6d519a 100644
--- a/source3/lib/pthreadpool/pthreadpool_pipe.c
+++ b/source3/lib/pthreadpool/pthreadpool_pipe.c
@@ -24,6 +24,7 @@
 
 struct pthreadpool_pipe {
 	struct pthreadpool *pool;
+	int num_jobs;
 	pid_t pid;
 	int pipe_fds[2];
 };
@@ -39,7 +40,7 @@ int pthreadpool_pipe_init(unsigned max_threads,
 	struct pthreadpool_pipe *pool;
 	int ret;
 
-	pool = malloc(sizeof(struct pthreadpool_pipe));
+	pool = calloc(1, sizeof(struct pthreadpool_pipe));
 	if (pool == NULL) {
 		return ENOMEM;
 	}
@@ -88,6 +89,10 @@ int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool)
 {
 	int ret;
 
+	if (pool->num_jobs != 0) {
+		return EBUSY;
+	}
+
 	ret = pthreadpool_destroy(pool->pool);
 	if (ret != 0) {
 		return ret;
@@ -132,6 +137,7 @@ static int pthreadpool_pipe_reinit(struct pthreadpool_pipe *pool)
 	}
 
 	pool->pipe_fds[0] = signal_fd;
+	pool->num_jobs = 0;
 
 	return 0;
 }
@@ -148,7 +154,13 @@ int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
 	}
 
 	ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data);
-	return ret;
+	if (ret != 0) {
+		return ret;
+	}
+
+	pool->num_jobs += 1;
+
+	return 0;
 }
 
 int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
@@ -159,7 +171,7 @@ int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
 int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
 				   unsigned num_jobids)
 {
-	ssize_t to_read, nread;
+	ssize_t to_read, nread, num_jobs;
 	pid_t pid = getpid();
 
 	if (pool->pid != pid) {
@@ -178,5 +190,13 @@ int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
 	if ((nread % sizeof(int)) != 0) {
 		return -EINVAL;
 	}
-	return nread / sizeof(int);
+
+	num_jobs = nread / sizeof(int);
+
+	if (num_jobs > pool->num_jobs) {
+		return -EINVAL;
+	}
+	pool->num_jobs -= num_jobs;
+
+	return num_jobs;
 }
diff --git a/source3/lib/pthreadpool/tests.c b/source3/lib/pthreadpool/tests.c
index 0b48b41..933808e 100644
--- a/source3/lib/pthreadpool/tests.c
+++ b/source3/lib/pthreadpool/tests.c
@@ -22,7 +22,7 @@ static int test_init(void)
 	}
 	ret = pthreadpool_pipe_destroy(p);
 	if (ret != 0) {
-		fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
+		fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
 			strerror(ret));
 		return -1;
 	}
@@ -72,6 +72,11 @@ static int test_jobs(int num_threads, int num_jobs)
 	for (i=0; i<num_jobs; i++) {
 		int jobid = -1;
 		ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1);
+		if (ret < 0) {
+			fprintf(stderr, "pthreadpool_pipe_finished_jobs "
+				"failed: %s\n", strerror(-ret));
+			return -1;
+		}
 		if ((ret != 1) || (jobid >= num_jobs)) {
 			fprintf(stderr, "invalid job number %d\n", jobid);
 			return -1;
@@ -103,7 +108,7 @@ static int test_busydestroy(void)
 	struct pthreadpool_pipe *p;
 	int timeout = 50;
 	struct pollfd pfd;
-	int ret;
+	int ret, jobid;
 
 	ret = pthreadpool_pipe_init(1, &p);
 	if (ret != 0) {
@@ -128,6 +133,13 @@ static int test_busydestroy(void)
 
 	poll(&pfd, 1, -1);
 
+	ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1);
+	if (ret < 0) {
+		fprintf(stderr, "pthreadpool_pipe_finished_jobs failed: %s\n",
+			strerror(-ret));
+		return -1;
+	}
+
 	ret = pthreadpool_pipe_destroy(p);
 	if (ret != 0) {
 		fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
@@ -137,191 +149,6 @@ static int test_busydestroy(void)
 	return 0;
 }
 
-struct threaded_state {
-	pthread_t tid;
-	struct pthreadpool_pipe *p;
-	int start_job;
-	int num_jobs;
-	int timeout;
-};
-
-static void *test_threaded_worker(void *p)
-{
-	struct threaded_state *state = (struct threaded_state *)p;
-	int i;
-
-	for (i=0; i<state->num_jobs; i++) {
-		int ret = pthreadpool_pipe_add_job(
-			state->p, state->start_job + i,
-			test_sleep, &state->timeout);
-		if (ret != 0) {
-			fprintf(stderr, "pthreadpool_pipe_add_job failed: "
-				"%s\n", strerror(ret));
-			return NULL;
-		}
-	}
-	return NULL;
-}
-
-static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
-				int num_jobs)
-{
-	struct pthreadpool_pipe **pools;
-	struct threaded_state *states;
-	struct threaded_state *state;
-	struct pollfd *pfds;
-	char *finished;
-	pid_t child;
-	int i, ret, poolnum;
-	int received;
-
-	states = calloc(num_threads, sizeof(struct threaded_state));
-	if (states == NULL) {
-		fprintf(stderr, "calloc failed\n");
-		return -1;
-	}
-
-	finished = calloc(num_threads * num_jobs, 1);
-	if (finished == NULL) {
-		fprintf(stderr, "calloc failed\n");
-		return -1;
-	}
-
-	pools = calloc(num_pools, sizeof(struct pthreadpool_pipe *));
-	if (pools == NULL) {
-		fprintf(stderr, "calloc failed\n");
-		return -1;
-	}
-
-	pfds = calloc(num_pools, sizeof(struct pollfd));
-	if (pfds == NULL) {
-		fprintf(stderr, "calloc failed\n");
-		return -1;
-	}
-
-	for (i=0; i<num_pools; i++) {
-		ret = pthreadpool_pipe_init(poolsize, &pools[i]);
-		if (ret != 0) {
-			fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
-				strerror(ret));
-			return -1;
-		}
-		pfds[i].fd = pthreadpool_pipe_signal_fd(pools[i]);
-		pfds[i].events = POLLIN|POLLHUP;
-	}
-
-	poolnum = 0;
-
-	for (i=0; i<num_threads; i++) {
-		state = &states[i];
-
-		state->p = pools[poolnum];
-		poolnum = (poolnum + 1) % num_pools;
-
-		state->num_jobs = num_jobs;
-		state->timeout = 1;
-		state->start_job = i * num_jobs;
-
-		ret = pthread_create(&state->tid, NULL, test_threaded_worker,
-				     state);
-		if (ret != 0) {
-			fprintf(stderr, "pthread_create failed: %s\n",
-				strerror(ret));
-			return -1;
-		}
-	}
-
-	if (random() % 1) {
-		poll(NULL, 0, 1);
-	}
-
-	child = fork();
-	if (child < 0) {
-		fprintf(stderr, "fork failed: %s\n", strerror(errno));
-		return -1;
-	}
-	if (child == 0) {
-		for (i=0; i<num_pools; i++) {
-			ret = pthreadpool_pipe_destroy(pools[i]);
-			if (ret != 0) {
-				fprintf(stderr, "pthreadpool_pipe_destroy "
-					"failed: %s\n", strerror(ret));
-				exit(1);
-			}
-		}
-		/* child */
-		exit(0);
-	}
-
-	for (i=0; i<num_threads; i++) {
-		ret = pthread_join(states[i].tid, NULL);
-		if (ret != 0) {
-			fprintf(stderr, "pthread_join(%d) failed: %s\n",
-				i, strerror(ret));
-			return -1;
-		}
-	}
-
-	received = 0;
-
-	while (received < num_threads*num_jobs) {
-		int j;
-
-		ret = poll(pfds, num_pools, 1000);
-		if (ret == -1) {
-			fprintf(stderr, "poll failed: %s\n",
-				strerror(errno));
-			return -1;
-		}
-		if (ret == 0) {
-			fprintf(stderr, "\npoll timed out\n");
-			break;
-		}
-
-		for (j=0; j<num_pools; j++) {
-			int jobid = -1;
-
-			if ((pfds[j].revents & (POLLIN|POLLHUP)) == 0) {
-				continue;
-			}
-
-			ret = pthreadpool_pipe_finished_jobs(
-				pools[j], &jobid, 1);
-			if ((ret != 1) || (jobid >= num_jobs * num_threads)) {
-				fprintf(stderr, "invalid job number %d\n",
-					jobid);
-				return -1;
-			}
-			finished[jobid] += 1;
-			received += 1;
-		}
-	}
-
-	for (i=0; i<num_threads*num_jobs; i++) {
-		if (finished[i] != 1) {
-			fprintf(stderr, "finished[%d] = %d\n",
-				i, finished[i]);
-			return -1;
-		}
-	}
-
-	for (i=0; i<num_pools; i++) {
-		ret = pthreadpool_pipe_destroy(pools[i]);
-		if (ret != 0) {
-			fprintf(stderr, "pthreadpool_pipe_destroy failed: "
-				"%s\n",	strerror(ret));
-			return -1;
-		}
-	}
-
-	free(pfds);
-	free(pools);
-	free(states);
-	free(finished);
-
-	return 0;
-}
-
 static int test_fork(void)
 {
 	struct pthreadpool_pipe *p;
@@ -390,25 +217,6 @@ int main(void)
 		return 1;
 	}
 
-	/*
-	 * Test 10 threads adding jobs on a single pool
-	 */
-	ret = test_threaded_addjob(1, 10, 5, 5000);
-	if (ret != 0) {
-		fprintf(stderr, "test_jobs failed\n");
-		return 1;
-	}
-
-	/*
-	 * Test 10 threads on 3 pools to verify our fork handling
-	 * works right.
-	 */
-	ret = test_threaded_addjob(3, 10, 5, 5000);
-	if (ret != 0) {
-		fprintf(stderr, "test_jobs failed\n");
-		return 1;
-	}
-
 	printf("success\n");
 	return 0;
 }
-- 
2.7.4


>From 73b0763246aec61cc3283a37c184794678d17a44 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 9 Sep 2016 13:27:13 +0200
Subject: [PATCH 10/26] pthreadpool_tevent: Move the
 pthreadpool_tevent_job_state declaration

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/pthreadpool/pthreadpool_tevent.c | 24 ++++++++++++------------
 1 file changed, 12 insertions(+), 12 deletions(-)

diff --git a/source3/lib/pthreadpool/pthreadpool_tevent.c b/source3/lib/pthreadpool/pthreadpool_tevent.c
index 0b7a55a..e4efc2d 100644
--- a/source3/lib/pthreadpool/pthreadpool_tevent.c
+++ b/source3/lib/pthreadpool/pthreadpool_tevent.c
@@ -31,6 +31,18 @@ struct pthreadpool_tevent {
 	struct pthreadpool_tevent_job_state *jobs;
 };
 
+struct pthreadpool_tevent_job_state {
+	struct pthreadpool_tevent_job_state *prev, *next;
+	struct pthreadpool_tevent *pool;
+	struct tevent_context *ev;
+	struct tevent_threaded_context *tctx;
+	struct tevent_immediate *im;
+	struct tevent_req *req;
+
+	void (*fn)(void *private_data);
+	void *private_data;
+};
+
 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
 
 static int pthreadpool_tevent_job_signal(int jobid,
@@ -79,18 +91,6 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 	return 0;
 }
 
-struct pthreadpool_tevent_job_state {
-	struct pthreadpool_tevent_job_state *prev, *next;
-	struct pthreadpool_tevent *pool;
-	struct tevent_context *ev;
-	struct tevent_threaded_context *tctx;
-	struct tevent_immediate *im;
-	struct tevent_req *req;
-
-	void (*fn)(void *private_data);
-	void *private_data;
-};
-
 static void pthreadpool_tevent_job_fn(void *private_data);
 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
 					struct tevent_immediate *im,
-- 
2.7.4


>From 4a2b562e5d780540f8e05d436690e7d0a8932562 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 9 Sep 2016 13:28:51 +0200
Subject: [PATCH 11/26] pthreadpool_tevent: Drop running jobs on talloc_free

Enable us to destroy a pthreadpool_tevent structure with active jobs

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/pthreadpool/pthreadpool_tevent.c | 15 ++++++++++-----
 1 file changed, 10 insertions(+), 5 deletions(-)

diff --git a/source3/lib/pthreadpool/pthreadpool_tevent.c b/source3/lib/pthreadpool/pthreadpool_tevent.c
index e4efc2d..253a867 100644
--- a/source3/lib/pthreadpool/pthreadpool_tevent.c
+++ b/source3/lib/pthreadpool/pthreadpool_tevent.c
@@ -76,6 +76,7 @@ int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
 
 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 {
+	struct pthreadpool_tevent_job_state *state, *next;
 	int ret;
 
 	ret = pthreadpool_destroy(pool->pool);
@@ -84,8 +85,10 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 	}
 	pool->pool = NULL;
 
-	if (pool->jobs != NULL) {
-		abort();
+	for (state = pool->jobs; state != NULL; state = next) {
+		next = state->next;
+		DLIST_REMOVE(pool->jobs, state);
+		state->pool = NULL;
 	}
 
 	return 0;
@@ -114,7 +117,7 @@ static int pthreadpool_tevent_job_state_destructor(
 	/*
 	 * We need to reparent to a long term context.
 	 */
-	(void)talloc_reparent(state->req, state->pool, state);
+	(void)talloc_reparent(state->req, NULL, state);
 	state->req = NULL;
 	return -1;
 }
@@ -214,8 +217,10 @@ static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
 	struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
 		private_data, struct pthreadpool_tevent_job_state);
 
-	DLIST_REMOVE(state->pool->jobs, state);
-	state->pool = NULL;
+	if (state->pool != NULL) {
+		DLIST_REMOVE(state->pool->jobs, state);
+		state->pool = NULL;
+	}
 
 	TALLOC_FREE(state->tctx);
 
-- 
2.7.4


>From ca14b6f11da93fba5a36718257b7a16311893265 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 9 Sep 2016 16:42:05 +0200
Subject: [PATCH 12/26] pthreadpool: Add a small test for pthreadpool_tevent

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/pthreadpool/tests.c | 82 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 82 insertions(+)

diff --git a/source3/lib/pthreadpool/tests.c b/source3/lib/pthreadpool/tests.c
index 933808e..4d211f2 100644
--- a/source3/lib/pthreadpool/tests.c
+++ b/source3/lib/pthreadpool/tests.c
@@ -8,6 +8,7 @@
 #include <sys/types.h>
 #include <sys/wait.h>
 #include "pthreadpool_pipe.h"
+#include "pthreadpool_tevent.h"
 
 static int test_init(void)
 {
@@ -189,10 +190,91 @@ static int test_fork(void)
 	return 0;
 }
 
+static void test_tevent_wait(void *private_data)
+{
+	int *timeout = private_data;
+	poll(NULL, 0, *timeout);
+}
+
+static int test_tevent_1(void)
+{
+	struct tevent_context *ev;
+	struct pthreadpool_tevent *pool;
+	struct tevent_req *req1, *req2;
+	int timeout10 = 10;
+	int timeout100 = 100;
+	int ret;
+	bool ok;
+
+	ev = tevent_context_init(NULL);
+	if (ev == NULL) {
+		ret = errno;
+		fprintf(stderr, "tevent_context_init failed: %s\n",
+			strerror(ret));
+		return ret;
+	}
+	ret = pthreadpool_tevent_init(ev, 0, &pool);
+	if (ret != 0) {
+		fprintf(stderr, "pthreadpool_tevent_init failed: %s\n",
+			strerror(ret));
+		TALLOC_FREE(ev);
+		return ret;
+	}
+	req1 = pthreadpool_tevent_job_send(
+		ev, ev, pool, test_tevent_wait, &timeout10);
+	if (req1 == NULL) {
+		fprintf(stderr, "pthreadpool_tevent_job_send failed\n");
+		TALLOC_FREE(ev);
+		return ENOMEM;
+	}
+	req2 = pthreadpool_tevent_job_send(
+		ev, ev, pool, test_tevent_wait, &timeout100);
+	if (req2 == NULL) {
+		fprintf(stderr, "pthreadpool_tevent_job_send failed\n");
+		TALLOC_FREE(ev);
+		return ENOMEM;
+	}
+	ok = tevent_req_poll(req2, ev);
+	if (!ok) {
+		ret = errno;
+		fprintf(stderr, "tevent_req_poll failed: %s\n",
+			strerror(ret));
+		TALLOC_FREE(ev);
+		return ret;
+	}
+	ret = pthreadpool_tevent_job_recv(req1);
+	TALLOC_FREE(req1);
+	if (ret != 0) {
+		fprintf(stderr, "tevent_req_poll failed: %s\n",
+			strerror(ret));
+		TALLOC_FREE(ev);
+		return ret;
+	}
+
+	TALLOC_FREE(req2);
+
+	ret = tevent_loop_wait(ev);
+	if (ret != 0) {
+		fprintf(stderr, "tevent_loop_wait failed\n");
+		return ret;
+	}
+
+	TALLOC_FREE(pool);
+	TALLOC_FREE(ev);
+	return 0;
+}
+
 int main(void)
 {
 	int ret;
 
+	ret = test_tevent_1();
+	if (ret != 0) {
+		fprintf(stderr, "test_event_1 failed: %s\n",
+			strerror(ret));
+		return 1;
+	}
+
 	ret = test_init();
 	if (ret != 0) {
 		fprintf(stderr, "test_init failed\n");
-- 
2.7.4


>From 573cd2e47ef7df02c4501a4e64ffaa38c4a3a9e0 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 9 Sep 2016 16:51:00 +0200
Subject: [PATCH 13/26] messages_dgm: Convert to pthreadpool_tevent

This itself adds a lot of code, however it removes the unix_msg library.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages_dgm.c | 937 ++++++++++++++++++++++++++++++++++++++++++---
 source3/wscript_build      |   5 +-
 2 files changed, 884 insertions(+), 58 deletions(-)

diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index 3aa110c..5f82168 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -21,11 +21,18 @@
 #include "system/network.h"
 #include "system/filesys.h"
 #include "system/dir.h"
+#include "system/select.h"
 #include "lib/util/debug.h"
-#include "lib/unix_msg/unix_msg.h"
 #include "lib/messages_dgm.h"
-#include "poll_funcs/poll_funcs_tevent.h"
 #include "lib/util/genrand.h"
+#include "lib/util/dlinklist.h"
+#include "lib/pthreadpool/pthreadpool_tevent.h"
+#include "lib/util/msghdr.h"
+#include "lib/util/iov_buf.h"
+#include "lib/util/blocking.h"
+#include "lib/util/tevent_unix.h"
+
+#define MESSAGING_DGM_FRAGMENT_LENGTH 1024
 
 struct sun_path_buf {
 	/*
@@ -34,15 +41,43 @@ struct sun_path_buf {
 	char buf[sizeof(struct sockaddr_un)];
 };
 
+struct messaging_dgm_context;
+
+struct messaging_dgm_out {
+	struct messaging_dgm_out *prev, *next;
+	struct messaging_dgm_context *ctx;
+
+	pid_t pid;
+	int sock;
+	bool is_blocking;
+	uint64_t cookie;
+
+	struct tevent_queue *queue;
+	struct tevent_timer *idle_timer;
+};
+
+struct messaging_dgm_in_msg {
+	struct messaging_dgm_in_msg *prev, *next;
+	struct messaging_dgm_context *ctx;
+	size_t msglen;
+	size_t received;
+	pid_t sender_pid;
+	int sender_sock;
+	uint64_t cookie;
+	uint8_t buf[];
+};
+
 struct messaging_dgm_context {
+	struct tevent_context *ev;
 	pid_t pid;
-	struct poll_funcs *msg_callbacks;
-	void *tevent_handle;
-	struct unix_msg_ctx *dgm_ctx;
 	struct sun_path_buf socket_dir;
 	struct sun_path_buf lockfile_dir;
 	int lockfile_fd;
 
+	int sock;
+	struct tevent_fd *read_fde;
+	struct messaging_dgm_in_msg *in_msgs;
+
 	void (*recv_cb)(const uint8_t *msg,
 			size_t msg_len,
 			int *fds,
@@ -51,14 +86,615 @@ struct messaging_dgm_context {
 	void *recv_cb_private_data;
 
 	bool *have_dgm_context;
+
+	struct pthreadpool_tevent *pool;
+	struct messaging_dgm_out *outsocks;
 };
 
-static struct messaging_dgm_context *global_dgm_context;
+/* Set socket close on exec. */
+static int prepare_socket_cloexec(int sock)
+{
+#ifdef FD_CLOEXEC
+	int flags;
 
-static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
-			       uint8_t *msg, size_t msg_len,
-			       int *fds, size_t num_fds,
-			       void *private_data);
+	flags = fcntl(sock, F_GETFD, 0);
+	if (flags == -1) {
+		return errno;
+	}
+	flags |= FD_CLOEXEC;
+	if (fcntl(sock, F_SETFD, flags) == -1) {
+		return errno;
+	}
+#endif
+	return 0;
+}
+
+static void close_fd_array(int *fds, size_t num_fds)
+{
+	size_t i;
+
+	for (i = 0; i < num_fds; i++) {
+		if (fds[i] == -1) {
+			continue;
+		}
+
+		close(fds[i]);
+		fds[i] = -1;
+	}
+}
+
+static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
+					   struct tevent_timer *te,
+					   struct timeval current_time,
+					   void *private_data)
+{
+	struct messaging_dgm_out *out = talloc_get_type_abort(
+		private_data, struct messaging_dgm_out);
+	size_t qlen;
+
+	out->idle_timer = NULL;
+
+	qlen = tevent_queue_length(out->queue);
+	if (qlen == 0) {
+		TALLOC_FREE(out);
+	}
+}
+
+static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out *out)
+{
+	size_t qlen;
+
+	qlen = tevent_queue_length(out->queue);
+	if (qlen != 0) {
+		TALLOC_FREE(out->idle_timer);
+		return;
+	}
+
+	if (out->idle_timer != NULL) {
+		tevent_update_timer(out->idle_timer,
+				    tevent_timeval_current_ofs(1, 0));
+		return;
+	}
+
+	out->idle_timer = tevent_add_timer(
+		out->ctx->ev, out, tevent_timeval_current_ofs(1, 0),
+		messaging_dgm_out_idle_handler, out);
+	/*
+	 * No NULL check, we'll come back here. Worst case we're
+	 * leaking a bit.
+	 */
+}
+
+static int messaging_dgm_out_destructor(struct messaging_dgm_out *dst);
+static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
+					   struct tevent_timer *te,
+					   struct timeval current_time,
+					   void *private_data);
+
+static int messaging_dgm_out_create(TALLOC_CTX *mem_ctx,
+				    struct messaging_dgm_context *ctx,
+				    pid_t pid, struct messaging_dgm_out **pout)
+{
+	struct messaging_dgm_out *out;
+	struct sockaddr_un addr = { .sun_family = AF_UNIX };
+	int ret = ENOMEM;
+	int out_pathlen;
+
+	out = talloc(mem_ctx, struct messaging_dgm_out);
+	if (out == NULL) {
+		goto fail;
+	}
+
+	*out = (struct messaging_dgm_out) {
+		.pid = pid,
+		.ctx = ctx,
+		.cookie = 1
+	};
+
+	out_pathlen = snprintf(addr.sun_path, sizeof(addr.sun_path),
+			       "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
+	if (out_pathlen < 0) {
+		goto errno_fail;
+	}
+	if ((size_t)out_pathlen >= sizeof(addr.sun_path)) {
+		ret = ENAMETOOLONG;
+		goto fail;
+	}
+
+	out->queue = tevent_queue_create(out, addr.sun_path);
+	if (out->queue == NULL) {
+		ret = ENOMEM;
+		goto fail;
+	}
+
+	out->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+	if (out->sock == -1) {
+		goto errno_fail;
+	}
+
+	DLIST_ADD(ctx->outsocks, out);
+	talloc_set_destructor(out, messaging_dgm_out_destructor);
+
+	do {
+		ret = connect(out->sock,
+			      (const struct sockaddr *)(const void *)&addr,
+			      sizeof(addr));
+	} while ((ret == -1) && (errno == EINTR));
+
+	if (ret == -1) {
+		goto errno_fail;
+	}
+
+	ret = set_blocking(out->sock, false);
+	if (ret == -1) {
+		goto errno_fail;
+	}
+	out->is_blocking = false;
+
+	*pout = out;
+	return 0;
+errno_fail:
+	ret = errno;
+fail:
+	TALLOC_FREE(out);
+	return ret;
+}
+
+static int messaging_dgm_out_destructor(struct messaging_dgm_out *out)
+{
+	DLIST_REMOVE(out->ctx->outsocks, out);
+
+	if (tevent_queue_length(out->queue) != 0) {
+		/*
+		 * We have pending jobs. We can't close the socket,
+		 * this has been handed over to messaging_dgm_out_queue_state.
+		 */
+		return 0;
+	}
+
+	if (out->sock != -1) {
+		close(out->sock);
+		out->sock = -1;
+	}
+	return 0;
+}
+
+static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
+				 struct messaging_dgm_out **pout)
+{
+	struct messaging_dgm_out *out;
+	int ret;
+
+	for (out = ctx->outsocks; out != NULL; out = out->next) {
+		if (out->pid == pid) {
+			break;
+		}
+	}
+
+	if (out == NULL) {
+		ret = messaging_dgm_out_create(ctx, ctx, pid, &out);
+		if (ret != 0) {
+			return ret;
+		}
+	}
+
+	messaging_dgm_out_rearm_idle_timer(out);
+
+	*pout = out;
+	return 0;
+}
+
+static ssize_t messaging_dgm_sendmsg(int sock,
+				     const struct iovec *iov, int iovlen,
+				     const int *fds, size_t num_fds,
+				     int *perrno)
+{
+	struct msghdr msg;
+	ssize_t fdlen, ret;
+
+	/*
+	 * Do the actual sendmsg syscall. This will be called from a
+	 * pthreadpool helper thread, so be careful what you do here.
+	 */
+
+	msg = (struct msghdr) {
+		.msg_iov = discard_const_p(struct iovec, iov),
+		.msg_iovlen = iovlen
+	};
+
+	fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
+	if (fdlen == -1) {
+		*perrno = EINVAL;
+		return -1;
+	}
+
+	{
+		uint8_t buf[fdlen];
+
+		msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
+
+		do {
+			ret = sendmsg(sock, &msg, MSG_NOSIGNAL);
+		} while ((ret == -1) && (errno == EINTR));
+	}
+
+	if (ret == -1) {
+		*perrno = errno;
+	}
+	return ret;
+}
+
+struct messaging_dgm_out_queue_state {
+	struct tevent_context *ev;
+	struct pthreadpool_tevent *pool;
+
+	struct tevent_req *req;
+	struct tevent_req *subreq;
+
+	int sock;
+
+	int *fds;
+	uint8_t *buf;
+
+	ssize_t sent;
+	int err;
+};
+
+static int messaging_dgm_out_queue_state_destructor(
+	struct messaging_dgm_out_queue_state *state);
+static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
+					   void *private_data);
+static void messaging_dgm_out_threaded_job(void *private_data);
+static void messaging_dgm_out_queue_done(struct tevent_req *subreq);
+
+static struct tevent_req *messaging_dgm_out_queue_send(
+	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+	struct messaging_dgm_out *out,
+	const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
+{
+	struct tevent_req *req;
+	struct messaging_dgm_out_queue_state *state;
+	struct tevent_queue_entry *e;
+	size_t i;
+	ssize_t buflen;
+
+	req = tevent_req_create(out, &state,
+				struct messaging_dgm_out_queue_state);
+	if (req == NULL) {
+		return NULL;
+	}
+	state->ev = ev;
+	state->pool = out->ctx->pool;
+	state->sock = out->sock;
+	state->req = req;
+
+	/*
+	 * Go blocking in a thread
+	 */
+	if (!out->is_blocking) {
+		int ret = set_blocking(out->sock, true);
+		if (ret == -1) {
+			tevent_req_error(req, errno);
+			return tevent_req_post(req, ev);
+		}
+		out->is_blocking = true;
+	}
+
+	buflen = iov_buflen(iov, iovlen);
+	if (buflen == -1) {
+		tevent_req_error(req, EMSGSIZE);
+		return tevent_req_post(req, ev);
+	}
+
+	state->buf = talloc_array(state, uint8_t, buflen);
+	if (tevent_req_nomem(state->buf, req)) {
+		return tevent_req_post(req, ev);
+	}
+	iov_buf(iov, iovlen, state->buf, buflen);
+
+	state->fds = talloc_array(state, int, num_fds);
+	if (tevent_req_nomem(state->fds, req)) {
+		return tevent_req_post(req, ev);
+	}
+
+	for (i=0; i<num_fds; i++) {
+		state->fds[i] = -1;
+	}
+
+	for (i=0; i<num_fds; i++) {
+
+		state->fds[i] = dup(fds[i]);
+
+		if (state->fds[i] == -1) {
+			int ret = errno;
+
+			close_fd_array(state->fds, num_fds);
+
+			tevent_req_error(req, ret);
+			return tevent_req_post(req, ev);
+		}
+	}
+
+	talloc_set_destructor(state, messaging_dgm_out_queue_state_destructor);
+
+	e = tevent_queue_add_entry(out->queue, ev, req,
+				   messaging_dgm_out_queue_trigger, req);
+	if (tevent_req_nomem(e, req)) {
+		return tevent_req_post(req, ev);
+	}
+	return req;
+}
+
+static int messaging_dgm_out_queue_state_destructor(
+	struct messaging_dgm_out_queue_state *state)
+{
+	int *fds;
+	size_t num_fds;
+
+	if (state->subreq != NULL) {
+		/*
+		 * We're scheduled, but we're destroyed. This happens
+		 * if the messaging_dgm_context is destroyed while
+		 * we're stuck in a blocking send. There's nothing we
+		 * can do but to leak memory.
+		 */
+		TALLOC_FREE(state->subreq);
+		(void)talloc_reparent(state->req, NULL, state);
+		return -1;
+	}
+
+	fds = state->fds;
+	num_fds = talloc_array_length(fds);
+	close_fd_array(fds, num_fds);
+	return 0;
+}
+
+static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
+					   void *private_data)
+{
+	struct messaging_dgm_out_queue_state *state = tevent_req_data(
+		req, struct messaging_dgm_out_queue_state);
+
+	state->subreq = pthreadpool_tevent_job_send(
+		state, state->ev, state->pool,
+		messaging_dgm_out_threaded_job, state);
+	if (tevent_req_nomem(state->subreq, req)) {
+		return;
+	}
+	tevent_req_set_callback(state->subreq, messaging_dgm_out_queue_done,
+				req);
+}
+
+static void messaging_dgm_out_threaded_job(void *private_data)
+{
+	struct messaging_dgm_out_queue_state *state = talloc_get_type_abort(
+		private_data, struct messaging_dgm_out_queue_state);
+
+	struct iovec iov = { .iov_base = state->buf,
+			     .iov_len = talloc_get_size(state->buf) };
+	size_t num_fds = talloc_array_length(state->fds);
+
+	state->sent = messaging_dgm_sendmsg(state->sock, &iov, 1,
+					    state->fds, num_fds, &state->err);
+}
+
+static void messaging_dgm_out_queue_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct messaging_dgm_out_queue_state *state = tevent_req_data(
+		req, struct messaging_dgm_out_queue_state);
+	int ret;
+
+	if (subreq != state->subreq) {
+		abort();
+	}
+
+	ret = pthreadpool_tevent_job_recv(subreq);
+
+	TALLOC_FREE(subreq);
+	state->subreq = NULL;
+
+	if (tevent_req_error(req, ret)) {
+		return;
+	}
+	if (state->sent == -1) {
+		tevent_req_error(req, state->err);
+		return;
+	}
+	tevent_req_done(req);
+}
+
+static int messaging_dgm_out_queue_recv(struct tevent_req *req)
+{
+	return tevent_req_simple_recv_unix(req);
+}
+
+static void messaging_dgm_out_sent_fragment(struct tevent_req *req);
+
+static int messaging_dgm_out_send_fragment(
+	struct tevent_context *ev, struct messaging_dgm_out *out,
+	const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
+{
+	struct tevent_req *req;
+	size_t qlen;
+
+	qlen = tevent_queue_length(out->queue);
+	if (qlen == 0) {
+		ssize_t nsent;
+		int err = 0;
+
+		if (out->is_blocking) {
+			int ret = set_blocking(out->sock, false);
+			if (ret == -1) {
+				return errno;
+			}
+			out->is_blocking = false;
+		}
+
+		nsent = messaging_dgm_sendmsg(out->sock, iov, iovlen, fds,
+					      num_fds, &err);
+		if (nsent >= 0) {
+			return 0;
+		}
+
+		if (err != EWOULDBLOCK) {
+			return err;
+		}
+	}
+
+	req = messaging_dgm_out_queue_send(out, ev, out, iov, iovlen,
+					   fds, num_fds);
+	if (req == NULL) {
+		return ENOMEM;
+	}
+	tevent_req_set_callback(req, messaging_dgm_out_sent_fragment, out);
+
+	return 0;
+}
+
+static void messaging_dgm_out_sent_fragment(struct tevent_req *req)
+{
+	struct messaging_dgm_out *out = tevent_req_callback_data(
+		req, struct messaging_dgm_out);
+	int ret;
+
+	ret = messaging_dgm_out_queue_recv(req);
+	TALLOC_FREE(req);
+
+	if (ret != 0) {
+		DBG_WARNING("messaging_out_queue_recv returned %s\n",
+			    strerror(ret));
+	}
+
+	messaging_dgm_out_rearm_idle_timer(out);
+}
+
+
+struct messaging_dgm_fragment_hdr {
+	size_t msglen;
+	pid_t pid;
+	int sock;
+};
+
+static int messaging_dgm_out_send_fragmented(struct tevent_context *ev,
+					     struct messaging_dgm_out *out,
+					     const struct iovec *iov,
+					     int iovlen,
+					     const int *fds, size_t num_fds)
+{
+	ssize_t msglen, sent;
+	int ret = 0;
+	struct iovec iov_copy[iovlen+2];
+	struct messaging_dgm_fragment_hdr hdr;
+	struct iovec src_iov;
+
+	if (iovlen < 0) {
+		return EINVAL;
+	}
+
+	msglen = iov_buflen(iov, iovlen);
+	if (msglen == -1) {
+		return EMSGSIZE;
+	}
+	if (num_fds > INT8_MAX) {
+		return EINVAL;
+	}
+
+	if ((size_t) msglen <=
+	    (MESSAGING_DGM_FRAGMENT_LENGTH - sizeof(uint64_t))) {
+		uint64_t cookie = 0;
+
+		iov_copy[0].iov_base = &cookie;
+		iov_copy[0].iov_len = sizeof(cookie);
+		if (iovlen > 0) {
+			memcpy(&iov_copy[1], iov,
+			       sizeof(struct iovec) * iovlen);
+		}
+
+		return messaging_dgm_out_send_fragment(
+			ev, out, iov_copy, iovlen+1, fds, num_fds);
+
+	}
+
+	hdr = (struct messaging_dgm_fragment_hdr) {
+		.msglen = msglen,
+		.pid = getpid(),
+		.sock = out->sock
+	};
+
+	iov_copy[0].iov_base = &out->cookie;
+	iov_copy[0].iov_len = sizeof(out->cookie);
+	iov_copy[1].iov_base = &hdr;
+	iov_copy[1].iov_len = sizeof(hdr);
+
+	sent = 0;
+	src_iov = iov[0];
+
+	/*
+	 * The following write loop sends the user message in pieces. We have
+	 * filled the first two iovecs above with "cookie" and "hdr". In the
+	 * following loops we pull message chunks from the user iov array and
+	 * fill iov_copy piece by piece, possibly truncating chunks from the
+	 * caller's iov array. Ugly, but hopefully efficient.
+	 */
+
+	while (sent < msglen) {
+		size_t fragment_len;
+		size_t iov_index = 2;
+
+		fragment_len = sizeof(out->cookie) + sizeof(hdr);
+
+		while (fragment_len < MESSAGING_DGM_FRAGMENT_LENGTH) {
+			size_t space, chunk;
+
+			space = MESSAGING_DGM_FRAGMENT_LENGTH - fragment_len;
+			chunk = MIN(space, src_iov.iov_len);
+
+			iov_copy[iov_index].iov_base = src_iov.iov_base;
+			iov_copy[iov_index].iov_len = chunk;
+			iov_index += 1;
+
+			src_iov.iov_base = (char *)src_iov.iov_base + chunk;
+			src_iov.iov_len -= chunk;
+			fragment_len += chunk;
+
+			if (src_iov.iov_len == 0) {
+				iov += 1;
+				iovlen -= 1;
+				if (iovlen == 0) {
+					break;
+				}
+				src_iov = iov[0];
+			}
+		}
+		sent += (fragment_len - sizeof(out->cookie) - sizeof(hdr));
+
+		/*
+		 * only the last fragment should pass the fd array.
+		 * That simplifies the receiver a lot.
+		 */
+		if (sent < msglen) {
+			ret = messaging_dgm_out_send_fragment(
+				ev, out, iov_copy, iov_index, NULL, 0);
+		} else {
+			ret = messaging_dgm_out_send_fragment(
+				ev, out, iov_copy, iov_index, fds, num_fds);
+		}
+		if (ret != 0) {
+			break;
+		}
+	}
+
+	out->cookie += 1;
+	if (out->cookie == 0) {
+		out->cookie += 1;
+	}
+
+	return ret;
+}
+
+static struct messaging_dgm_context *global_dgm_context;
 
 static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
 
@@ -168,6 +804,11 @@ fail_close:
 	return ret;
 }
 
+static void messaging_dgm_read_handler(struct tevent_context *ev,
+				       struct tevent_fd *fde,
+				       uint16_t flags,
+				       void *private_data);
+
 int messaging_dgm_init(struct tevent_context *ev,
 		       uint64_t *punique,
 		       const char *socket_dir,
@@ -193,6 +834,7 @@ int messaging_dgm_init(struct tevent_context *ev,
 	if (ctx == NULL) {
 		goto fail_nomem;
 	}
+	ctx->ev = ev;
 	ctx->pid = getpid();
 	ctx->recv_cb = recv_cb;
 	ctx->recv_cb_private_data = recv_cb_private_data;
@@ -229,30 +871,52 @@ int messaging_dgm_init(struct tevent_context *ev,
 		return ret;
 	}
 
-	ctx->msg_callbacks = poll_funcs_init_tevent(ctx);
-	if (ctx->msg_callbacks == NULL) {
-		goto fail_nomem;
-	}
+	unlink(socket_address.sun_path);
 
-	ctx->tevent_handle = poll_funcs_tevent_register(
-		ctx, ctx->msg_callbacks, ev);
-	if (ctx->tevent_handle == NULL) {
-		goto fail_nomem;
+	ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+	if (ctx->sock == -1) {
+		ret = errno;
+		DBG_WARNING("socket failed: %s\n", strerror(ret));
+		TALLOC_FREE(ctx);
+		return ret;
 	}
 
-	unlink(socket_address.sun_path);
+	ret = prepare_socket_cloexec(ctx->sock);
+	if (ret == -1) {
+		ret = errno;
+		DBG_WARNING("prepare_socket_cloexec failed: %s\n",
+			    strerror(ret));
+		TALLOC_FREE(ctx);
+		return ret;
+	}
 
-	ret = unix_msg_init(&socket_address, ctx->msg_callbacks, 1024,
-			    messaging_dgm_recv, ctx, &ctx->dgm_ctx);
-	if (ret != 0) {
-		DEBUG(1, ("unix_msg_init failed: %s\n", strerror(ret)));
+	ret = bind(ctx->sock, (struct sockaddr *)(void *)&socket_address,
+		   sizeof(socket_address));
+	if (ret == -1) {
+		ret = errno;
+		DBG_WARNING("bind failed: %s\n", strerror(ret));
 		TALLOC_FREE(ctx);
 		return ret;
 	}
+
+	ctx->read_fde = tevent_add_fd(ctx->ev, ctx, ctx->sock, TEVENT_FD_READ,
+				      messaging_dgm_read_handler, ctx);
+	if (ctx->read_fde == NULL) {
+		goto fail_nomem;
+	}
+
 	talloc_set_destructor(ctx, messaging_dgm_context_destructor);
 
 	ctx->have_dgm_context = &have_dgm_context;
 
+	ret = pthreadpool_tevent_init(ctx, 0, &ctx->pool);
+	if (ret != 0) {
+		DBG_WARNING("pthreadpool_tevent_init failed: %s\n",
+			    strerror(ret));
+		TALLOC_FREE(ctx);
+		return ret;
+	}
+
 	global_dgm_context = ctx;
 	return 0;
 
@@ -263,17 +927,32 @@ fail_nomem:
 
 static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
 {
-	/*
-	 * First delete the socket to avoid races. The lockfile is the
-	 * indicator that we're still around.
-	 */
-	unix_msg_free(c->dgm_ctx);
+	while (c->outsocks != NULL) {
+		TALLOC_FREE(c->outsocks);
+	}
+	while (c->in_msgs != NULL) {
+		TALLOC_FREE(c->in_msgs);
+	}
+
+	TALLOC_FREE(c->read_fde);
+	close(c->sock);
 
 	if (getpid() == c->pid) {
 		struct sun_path_buf name;
 		int ret;
 
 		ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
+			       c->socket_dir.buf, (unsigned)c->pid);
+		if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
+			/*
+			 * We've checked the length when creating, so this
+			 * should never happen
+			 */
+			abort();
+		}
+		unlink(name.buf);
+
+		ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
 			       c->lockfile_dir.buf, (unsigned)c->pid);
 		if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
 			/*
@@ -293,6 +972,174 @@ static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
 	return 0;
 }
 
+static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
+			       uint8_t *msg, size_t msg_len,
+			       int *fds, size_t num_fds);
+
+static void messaging_dgm_read_handler(struct tevent_context *ev,
+				       struct tevent_fd *fde,
+				       uint16_t flags,
+				       void *private_data)
+{
+	struct messaging_dgm_context *ctx = talloc_get_type_abort(
+		private_data, struct messaging_dgm_context);
+	ssize_t received;
+	struct msghdr msg;
+	struct iovec iov;
+	size_t msgbufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
+	uint8_t msgbuf[msgbufsize];
+	uint8_t buf[MESSAGING_DGM_FRAGMENT_LENGTH];
+
+	if ((flags & TEVENT_FD_READ) == 0) {
+		return;
+	}
+
+	iov = (struct iovec) { .iov_base = buf, .iov_len = sizeof(buf) };
+	msg = (struct msghdr) { .msg_iov = &iov, .msg_iovlen = 1 };
+
+	msghdr_prep_recv_fds(&msg, msgbuf, msgbufsize, INT8_MAX);
+
+#ifdef MSG_CMSG_CLOEXEC
+	flags |= MSG_CMSG_CLOEXEC;
+#endif
+
+	received = recvmsg(ctx->sock, &msg, 0);
+	if (received == -1) {
+		if ((errno == EAGAIN) ||
+		    (errno == EWOULDBLOCK) ||
+		    (errno == EINTR) ||
+		    (errno == ENOMEM)) {
+			/* Not really an error - just try again. */
+			return;
+		}
+		/* Problem with the socket. Set it unreadable. */
+		tevent_fd_set_flags(ctx->read_fde, 0);
+		return;
+	}
+
+	if ((size_t)received > sizeof(buf)) {
+		/* More than we expected, not for us */
+		return;
+	}
+
+	{
+		size_t num_fds = msghdr_extract_fds(&msg, NULL, 0);
+		size_t i;
+		int fds[num_fds];
+
+		msghdr_extract_fds(&msg, fds, num_fds);
+
+		for (i = 0; i < num_fds; i++) {
+			int err;
+
+			err = prepare_socket_cloexec(fds[i]);
+			if (err != 0) {
+				close_fd_array(fds, num_fds);
+				num_fds = 0;
+			}
+		}
+
+		messaging_dgm_recv(ctx, buf, received, fds, num_fds);
+	}
+
+}
+
+static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
+{
+	DLIST_REMOVE(m->ctx->in_msgs, m);
+	return 0;
+}
+
+static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
+			       uint8_t *buf, size_t buflen,
+			       int *fds, size_t num_fds)
+{
+	struct messaging_dgm_fragment_hdr hdr;
+	struct messaging_dgm_in_msg *msg;
+	size_t space;
+	uint64_t cookie;
+
+	if (buflen < sizeof(cookie)) {
+		goto close_fds;
+	}
+	memcpy(&cookie, buf, sizeof(cookie));
+	buf += sizeof(cookie);
+	buflen -= sizeof(cookie);
+
+	if (cookie == 0) {
+		ctx->recv_cb(buf, buflen, fds, num_fds,
+			     ctx->recv_cb_private_data);
+		return;
+	}
+
+	if (buflen < sizeof(hdr)) {
+		goto close_fds;
+	}
+	memcpy(&hdr, buf, sizeof(hdr));
+	buf += sizeof(hdr);
+	buflen -= sizeof(hdr);
+
+	for (msg = ctx->in_msgs; msg != NULL; msg = msg->next) {
+		if ((msg->sender_pid == hdr.pid) &&
+		    (msg->sender_sock == hdr.sock)) {
+			break;
+		}
+	}
+
+	if ((msg != NULL) && (msg->cookie != cookie)) {
+		TALLOC_FREE(msg);
+	}
+
+	if (msg == NULL) {
+		size_t msglen;
+		msglen = offsetof(struct messaging_dgm_in_msg, buf) +
+			hdr.msglen;
+
+		msg = talloc_size(ctx, msglen);
+		if (msg == NULL) {
+			goto close_fds;
+		}
+		talloc_set_name_const(msg, "struct messaging_dgm_in_msg");
+
+		*msg = (struct messaging_dgm_in_msg) {
+			.ctx = ctx, .msglen = hdr.msglen,
+			.sender_pid = hdr.pid, .sender_sock = hdr.sock,
+			.cookie = cookie
+		};
+		DLIST_ADD(ctx->in_msgs, msg);
+		talloc_set_destructor(msg, messaging_dgm_in_msg_destructor);
+	}
+
+	space = msg->msglen - msg->received;
+	if (buflen > space) {
+		goto close_fds;
+	}
+
+	memcpy(msg->buf + msg->received, buf, buflen);
+	msg->received += buflen;
+
+	if (msg->received < msg->msglen) {
+		/*
+		 * Any valid sender will send the fds in the last
+		 * block. Invalid senders might have sent fd's that we
+		 * need to close here.
+		 */
+		goto close_fds;
+	}
+
+	DLIST_REMOVE(ctx->in_msgs, msg);
+	talloc_set_destructor(msg, NULL);
+
+	ctx->recv_cb(msg->buf, msg->msglen, fds, num_fds,
+		     ctx->recv_cb_private_data);
+
+	TALLOC_FREE(msg);
+	return;
+
+close_fds:
+	close_fd_array(fds, num_fds);
+}
+
 void messaging_dgm_destroy(void)
 {
 	TALLOC_FREE(global_dgm_context);
@@ -303,44 +1150,25 @@ int messaging_dgm_send(pid_t pid,
 		       const int *fds, size_t num_fds)
 {
 	struct messaging_dgm_context *ctx = global_dgm_context;
-	struct sockaddr_un dst;
-	ssize_t dst_pathlen;
+	struct messaging_dgm_out *out;
 	int ret;
 
 	if (ctx == NULL) {
 		return ENOTCONN;
 	}
 
-	dst = (struct sockaddr_un) { .sun_family = AF_UNIX };
-
-	dst_pathlen = snprintf(dst.sun_path, sizeof(dst.sun_path),
-			       "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
-	if (dst_pathlen < 0) {
-		return errno;
-	}
-	if ((size_t)dst_pathlen >= sizeof(dst.sun_path)) {
-		return ENAMETOOLONG;
+	ret = messaging_dgm_out_get(ctx, pid, &out);
+	if (ret != 0) {
+		return ret;
 	}
 
 	DEBUG(10, ("%s: Sending message to %u\n", __func__, (unsigned)pid));
 
-	ret = unix_msg_send(ctx->dgm_ctx, &dst, iov, iovlen, fds, num_fds);
-
+	ret = messaging_dgm_out_send_fragmented(ctx->ev, out, iov, iovlen,
+						fds, num_fds);
 	return ret;
 }
 
-static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
-			       uint8_t *msg, size_t msg_len,
-			       int *fds, size_t num_fds,
-			       void *private_data)
-{
-	struct messaging_dgm_context *dgm_ctx = talloc_get_type_abort(
-		private_data, struct messaging_dgm_context);
-
-	dgm_ctx->recv_cb(msg, msg_len, fds, num_fds,
-			 dgm_ctx->recv_cb_private_data);
-}
-
 static int messaging_dgm_read_unique(int fd, uint64_t *punique)
 {
 	char buf[25];
@@ -525,5 +1353,6 @@ void *messaging_dgm_register_tevent_context(TALLOC_CTX *mem_ctx,
 	if (ctx == NULL) {
 		return NULL;
 	}
-	return poll_funcs_tevent_register(mem_ctx, ctx->msg_callbacks, ev);
+	return tevent_add_fd(ev, mem_ctx, ctx->sock, TEVENT_FD_READ,
+			     messaging_dgm_read_handler, ctx);
 }
diff --git a/source3/wscript_build b/source3/wscript_build
index 21a76d3..1598555 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -302,8 +302,7 @@ bld.SAMBA3_SUBSYSTEM('TDB_LIB',
 
 bld.SAMBA3_LIBRARY('messages_dgm',
                    source='''lib/messages_dgm.c lib/messages_dgm_ref.c''',
-                   deps='''talloc UNIX_MSG POLL_FUNCS_TEVENT samba-debug
-                           genrand''',
+                   deps='''talloc samba-debug PTHREADPOOL msghdr genrand''',
                    private_library=True)
 
 bld.SAMBA3_LIBRARY('messages_util',
@@ -355,7 +354,6 @@ bld.SAMBA3_SUBSYSTEM('samba3core',
                         UTIL_PW
                         SAMBA_VERSION
                         PTHREADPOOL
-                        UNIX_MSG
                         POLL_FUNCS_TEVENT
                         interfaces
                         param
@@ -1459,7 +1457,6 @@ bld.SAMBA3_BINARY('spotlight2sparql',
 bld.RECURSE('auth')
 bld.RECURSE('libgpo/gpext')
 bld.RECURSE('lib/pthreadpool')
-bld.RECURSE('lib/unix_msg')
 bld.RECURSE('librpc')
 bld.RECURSE('librpc/idl')
 bld.RECURSE('libsmb')
-- 
2.7.4


>From 0df060e9c0a28e08af22861db3695ea12a7e1356 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 28 Sep 2016 14:35:21 -0700
Subject: [PATCH 14/26] lib: Remove unix_msg

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/unix_msg/test_drain.c  |   83 ---
 source3/lib/unix_msg/test_source.c |   93 ---
 source3/lib/unix_msg/tests.c       |  271 ---------
 source3/lib/unix_msg/unix_msg.c    | 1094 ------------------------------------
 source3/lib/unix_msg/unix_msg.h    |  121 ----
 source3/lib/unix_msg/wscript_build |   18 -
 6 files changed, 1680 deletions(-)
 delete mode 100644 source3/lib/unix_msg/test_drain.c
 delete mode 100644 source3/lib/unix_msg/test_source.c
 delete mode 100644 source3/lib/unix_msg/tests.c
 delete mode 100644 source3/lib/unix_msg/unix_msg.c
 delete mode 100644 source3/lib/unix_msg/unix_msg.h
 delete mode 100644 source3/lib/unix_msg/wscript_build

diff --git a/source3/lib/unix_msg/test_drain.c b/source3/lib/unix_msg/test_drain.c
deleted file mode 100644
index 675ac6f..0000000
--- a/source3/lib/unix_msg/test_drain.c
+++ /dev/null
@@ -1,83 +0,0 @@
-#include "replace.h"
-#include "unix_msg.h"
-#include "poll_funcs/poll_funcs_tevent.h"
-#include "tevent.h"
-#include "system/select.h"
-
-struct cb_state {
-	unsigned num_received;
-	uint8_t *buf;
-	size_t buflen;
-};
-
-static void recv_cb(struct unix_msg_ctx *ctx,
-		    uint8_t *msg, size_t msg_len,
-		    int *fds, size_t num_fds,
-		    void *private_data);
-
-int main(int argc, const char *argv[])
-{
-	struct poll_funcs *funcs;
-	void *handle;
-	struct sockaddr_un addr;
-	struct unix_msg_ctx *ctx;
-	struct tevent_context *ev;
-	int ret;
-
-	struct cb_state state;
-
-	if (argc != 2) {
-		fprintf(stderr, "Usage: %s <sockname>\n", argv[0]);
-		return 1;
-	}
-
-	addr = (struct sockaddr_un) { .sun_family = AF_UNIX };
-	strlcpy(addr.sun_path, argv[1], sizeof(addr.sun_path));
-	unlink(addr.sun_path);
-
-	ev = tevent_context_init(NULL);
-	if (ev == NULL) {
-		perror("tevent_context_init failed");
-		return 1;
-	}
-	funcs = poll_funcs_init_tevent(ev);
-	if (funcs == NULL) {
-		fprintf(stderr, "poll_funcs_init_tevent failed\n");
-		return 1;
-	}
-
-	handle = poll_funcs_tevent_register(ev, funcs, ev);
-	if (handle == NULL) {
-		fprintf(stderr, "poll_funcs_tevent_register failed\n");
-		exit(1);
-	}
-
-	ret = unix_msg_init(&addr, funcs, 256, recv_cb, &state, &ctx);
-	if (ret != 0) {
-		fprintf(stderr, "unix_msg_init failed: %s\n",
-			strerror(ret));
-		return 1;
-	}
-
-	while (1) {
-		ret = tevent_loop_once(ev);
-		if (ret == -1) {
-			fprintf(stderr, "tevent_loop_once failed: %s\n",
-				strerror(errno));
-			exit(1);
-		}
-	}
-	return 0;
-}
-
-static void recv_cb(struct unix_msg_ctx *ctx,
-		    uint8_t *msg, size_t msg_len,
-		    int *fds, size_t num_fds,
-		    void *private_data)
-{
-	unsigned num;
-	if (msg_len == sizeof(num)) {
-		memcpy(&num, msg, msg_len);
-		printf("%u\n", num);
-	}
-}
diff --git a/source3/lib/unix_msg/test_source.c b/source3/lib/unix_msg/test_source.c
deleted file mode 100644
index 3b65267..0000000
--- a/source3/lib/unix_msg/test_source.c
+++ /dev/null
@@ -1,93 +0,0 @@
-#include "replace.h"
-#include "unix_msg.h"
-#include "poll_funcs/poll_funcs_tevent.h"
-#include "tevent.h"
-
-int main(int argc, const char *argv[])
-{
-	struct poll_funcs *funcs;
-	void *tevent_handle;
-	struct unix_msg_ctx **ctxs;
-	struct tevent_context *ev;
-	struct iovec iov;
-	int ret;
-	unsigned i;
-	unsigned num_ctxs = 1;
-	struct sockaddr_un dst;
-
-	if (argc < 2) {
-		fprintf(stderr, "Usage: %s <sockname> [num_contexts]\n", argv[0]);
-		return 1;
-	}
-	if (argc > 2) {
-		num_ctxs = atoi(argv[2]);
-	}
-
-	ev = tevent_context_init(NULL);
-	if (ev == NULL) {
-		perror("tevent_context_init failed");
-		return 1;
-	}
-	funcs = poll_funcs_init_tevent(NULL);
-	if (funcs == NULL) {
-		fprintf(stderr, "poll_funcs_init_tevent failed\n");
-		return 1;
-	}
-	tevent_handle = poll_funcs_tevent_register(NULL, funcs, ev);
-	if (tevent_handle == NULL) {
-		fprintf(stderr, "poll_funcs_tevent_register failed\n");
-		return 1;
-	}
-
-	ctxs = talloc_array(ev, struct unix_msg_ctx *, num_ctxs);
-	if (ctxs == NULL) {
-		fprintf(stderr, "talloc failed\n");
-		return 1;
-	}
-
-	for (i=0; i<num_ctxs; i++) {
-		ret = unix_msg_init(NULL, funcs, 256, NULL, NULL,
-				    &ctxs[i]);
-		if (ret != 0) {
-			fprintf(stderr, "unix_msg_init failed: %s\n",
-				strerror(ret));
-			return 1;
-		}
-	}
-
-	iov.iov_base = &i;
-	iov.iov_len = sizeof(i);
-
-	dst = (struct sockaddr_un) { .sun_family = AF_UNIX };
-	strlcpy(dst.sun_path, argv[1], sizeof(dst.sun_path));
-
-	for (i=0; i<num_ctxs; i++) {
-		unsigned j;
-
-		for (j=0; j<100000; j++) {
-			ret = unix_msg_send(ctxs[i], &dst, &iov, 1, NULL, 0);
-			if (ret != 0) {
-				fprintf(stderr, "unix_msg_send failed: %s\n",
-					strerror(ret));
-				return 1;
-			}
-		}
-	}
-
-	while (true) {
-		ret = tevent_loop_once(ev);
-		if (ret == -1) {
-			fprintf(stderr, "tevent_loop_once failed: %s\n",
-				strerror(errno));
-			exit(1);
-		}
-	}
-
-	for (i=0; i<num_ctxs; i++) {
-		unix_msg_free(ctxs[i]);
-	}
-
-	talloc_free(ev);
-
-	return 0;
-}
diff --git a/source3/lib/unix_msg/tests.c b/source3/lib/unix_msg/tests.c
deleted file mode 100644
index c743c37..0000000
--- a/source3/lib/unix_msg/tests.c
+++ /dev/null
@@ -1,271 +0,0 @@
-#include "replace.h"
-#include "unix_msg.h"
-#include "poll_funcs/poll_funcs_tevent.h"
-#include "tevent.h"
-
-struct cb_state {
-	unsigned num_received;
-	uint8_t *buf;
-	size_t buflen;
-};
-
-static void recv_cb(struct unix_msg_ctx *ctx,
-		    uint8_t *msg, size_t msg_len,
-		    int *fds, size_t num_fds,
-		    void *private_data);
-
-static void expect_messages(struct tevent_context *ev, struct cb_state *state,
-			    unsigned num_msgs)
-{
-	state->num_received = 0;
-
-	while (state->num_received < num_msgs) {
-		int ret;
-
-		ret = tevent_loop_once(ev);
-		if (ret == -1) {
-			fprintf(stderr, "tevent_loop_once failed: %s\n",
-				strerror(errno));
-			exit(1);
-		}
-	}
-}
-
-int main(void)
-{
-	struct poll_funcs *funcs;
-	void *tevent_handle;
-	struct sockaddr_un addr1, addr2;
-	struct unix_msg_ctx *ctx1, *ctx2;
-	struct tevent_context *ev;
-	struct iovec iov;
-	uint8_t msg;
-	int i, ret;
-	static uint8_t buf[1755];
-
-	struct cb_state state;
-
-	addr1 = (struct sockaddr_un) { .sun_family = AF_UNIX };
-	strlcpy(addr1.sun_path, "sock1", sizeof(addr1.sun_path));
-	unlink(addr1.sun_path);
-
-	addr2 = (struct sockaddr_un) { .sun_family = AF_UNIX };
-	strlcpy(addr2.sun_path, "sock2", sizeof(addr2.sun_path));
-	unlink(addr2.sun_path);
-
-	ev = tevent_context_init(NULL);
-	if (ev == NULL) {
-		perror("tevent_context_init failed");
-		return 1;
-	}
-
-	funcs = poll_funcs_init_tevent(ev);
-	if (funcs == NULL) {
-		fprintf(stderr, "poll_funcs_init_tevent failed\n");
-		return 1;
-	}
-	tevent_handle = poll_funcs_tevent_register(ev, funcs, ev);
-	if (tevent_handle == NULL) {
-		fprintf(stderr, "poll_funcs_register_tevent failed\n");
-		return 1;
-	}
-
-	ret = unix_msg_init(&addr1, funcs, 256, recv_cb, &state, &ctx1);
-	if (ret != 0) {
-		fprintf(stderr, "unix_msg_init failed: %s\n",
-			strerror(ret));
-		return 1;
-	}
-
-	ret = unix_msg_init(&addr1, funcs, 256, recv_cb, &state, &ctx1);
-	if (ret == 0) {
-		fprintf(stderr, "unix_msg_init succeeded unexpectedly\n");
-		return 1;
-	}
-	if (ret != EADDRINUSE) {
-		fprintf(stderr, "unix_msg_init returned %s, expected "
-			"EADDRINUSE\n", strerror(ret));
-		return 1;
-	}
-
-	ret = unix_msg_init(&addr2, funcs, 256, recv_cb, &state, &ctx2);
-	if (ret != 0) {
-		fprintf(stderr, "unix_msg_init failed: %s\n",
-			strerror(ret));
-		return 1;
-	}
-
-	printf("sending a 0-length message\n");
-
-	state.buf = NULL;
-	state.buflen = 0;
-
-	ret = unix_msg_send(ctx1, &addr2, NULL, 0, NULL, 0);
-	if (ret != 0) {
-		fprintf(stderr, "unix_msg_send failed: %s\n",
-			strerror(ret));
-		return 1;
-	}
-
-	expect_messages(ev, &state, 1);
-
-	printf("sending a small message\n");
-
-	msg = random();
-	iov.iov_base = &msg;
-	iov.iov_len = sizeof(msg);
-	state.buf = &msg;
-	state.buflen = sizeof(msg);
-
-	ret = unix_msg_send(ctx1, &addr2, &iov, 1, NULL, 0);
-	if (ret != 0) {
-		fprintf(stderr, "unix_msg_send failed: %s\n",
-			strerror(ret));
-		return 1;
-	}
-
-	expect_messages(ev, &state, 1);
-
-	printf("test send queue caching\n");
-
-	/*
-	 * queues are cached for some time, so this tests sending
-	 * still works after the cache expires and the queue was
-	 * freed.
-	 */
-	sleep(SENDQ_CACHE_TIME_SECS + 1);
-	ret = tevent_loop_once(ev);
-	if (ret == -1) {
-		fprintf(stderr, "tevent_loop_once failed: %s\n",
-			strerror(errno));
-		exit(1);
-	}
-
-	msg = random();
-	iov.iov_base = &msg;
-	iov.iov_len = sizeof(msg);
-	state.buf = &msg;
-	state.buflen = sizeof(msg);
-
-	ret = unix_msg_send(ctx1, &addr2, &iov, 1, NULL, 0);
-	if (ret != 0) {
-		fprintf(stderr, "unix_msg_send failed: %s\n",
-			strerror(ret));
-		return 1;
-	}
-
-	expect_messages(ev, &state, 1);
-
-	printf("sending six large, interleaved messages\n");
-
-	for (i=0; i<sizeof(buf); i++) {
-		buf[i] = random();
-	}
-
-	iov.iov_base = buf;
-	iov.iov_len = sizeof(buf);
-	state.buf = buf;
-	state.buflen = sizeof(buf);
-
-	for (i=0; i<3; i++) {
-		ret = unix_msg_send(ctx1, &addr2, &iov, 1, NULL, 0);
-		if (ret != 0) {
-			fprintf(stderr, "unix_msg_send failed: %s\n",
-				strerror(ret));
-			return 1;
-		}
-		ret = unix_msg_send(ctx2, &addr2, &iov, 1, NULL, 0);
-		if (ret != 0) {
-			fprintf(stderr, "unix_msg_send failed: %s\n",
-				strerror(ret));
-			return 1;
-		}
-	}
-
-	expect_messages(ev, &state, 6);
-
-	printf("sending a few messages in small pieces\n");
-
-	for (i = 0; i<5; i++) {
-		struct iovec iovs[20];
-		const size_t num_iovs = ARRAY_SIZE(iovs);
-		uint8_t *p = buf;
-		size_t j;
-
-		for (j=0; j<num_iovs-1; j++) {
-			size_t chunk = (random() % ((sizeof(buf) * 2) / num_iovs));
-			size_t space = (sizeof(buf) - (p - buf));
-
-			if (space == 0) {
-				break;
-			}
-
-			chunk = MIN(chunk, space);
-
-			iovs[j].iov_base = p;
-			iovs[j].iov_len = chunk;
-			p += chunk;
-		}
-
-		if (p < (buf + sizeof(buf))) {
-			iovs[j].iov_base = p;
-			iovs[j].iov_len = (sizeof(buf) - (p - buf));
-			j++;
-		}
-
-		ret = unix_msg_send(ctx1, &addr1, iovs, j, NULL, 0);
-		if (ret != 0) {
-			fprintf(stderr, "unix_msg_send failed: %s\n",
-				strerror(ret));
-			return 1;
-		}
-	}
-
-	expect_messages(ev, &state, 5);
-
-	printf("Filling send queues before freeing\n");
-
-	for (i=0; i<5; i++) {
-		ret = unix_msg_send(ctx1, &addr2, &iov, 1, NULL, 0);
-		if (ret != 0) {
-			fprintf(stderr, "unix_msg_send failed: %s\n",
-				strerror(ret));
-			return 1;
-		}
-		ret = unix_msg_send(ctx1, &addr1, &iov, 1, NULL, 0);
-		if (ret != 0) {
-			fprintf(stderr, "unix_msg_send failed: %s\n",
-				strerror(ret));
-			return 1;
-		}
-	}
-
-	expect_messages(ev, &state, 1); /* Read just one msg */
-
-	unix_msg_free(ctx1);
-	unix_msg_free(ctx2);
-	talloc_free(tevent_handle);
-	talloc_free(funcs);
-	talloc_free(ev);
-
-	return 0;
-}
-
-static void recv_cb(struct unix_msg_ctx *ctx,
-		    uint8_t *msg, size_t msg_len,
-		    int *fds, size_t num_fds,
-		    void *private_data)
-{
-	struct cb_state *state = (struct cb_state *)private_data;
-
-	if (msg_len != state->buflen) {
-		fprintf(stderr, "expected %u bytes, got %u\n",
-			(unsigned)state->buflen, (unsigned)msg_len);
-		exit(1);
-	}
-	if ((msg_len != 0) && (memcmp(msg, state->buf, msg_len) != 0)) {
-		fprintf(stderr, "message content differs\n");
-		exit(1);
-	}
-	state->num_received += 1;
-}
diff --git a/source3/lib/unix_msg/unix_msg.c b/source3/lib/unix_msg/unix_msg.c
deleted file mode 100644
index 5cbf428..0000000
--- a/source3/lib/unix_msg/unix_msg.c
+++ /dev/null
@@ -1,1094 +0,0 @@
-/*
- * Unix SMB/CIFS implementation.
- * Copyright (C) Volker Lendecke 2013
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "replace.h"
-#include "unix_msg.h"
-#include "system/select.h"
-#include "system/time.h"
-#include "system/network.h"
-#include "lib/util/dlinklist.h"
-#include "pthreadpool/pthreadpool_pipe.h"
-#include "lib/util/iov_buf.h"
-#include "lib/util/msghdr.h"
-#include <fcntl.h>
-#include "lib/util/time.h"
-
-/*
- * This file implements two abstractions: The "unix_dgram" functions implement
- * queueing for unix domain datagram sockets. You can send to a destination
- * socket, and if that has no free space available, it will fall back to an
- * anonymous socket that will poll for writability. "unix_dgram" expects the
- * data size not to exceed the system limit.
- *
- * The "unix_msg" functions implement the fragmentation of large messages on
- * top of "unix_dgram". This is what is exposed to the user of this API.
- */
-
-struct unix_dgram_msg {
-	struct unix_dgram_msg *prev, *next;
-
-	int sock;
-	ssize_t sent;
-	int sys_errno;
-};
-
-struct unix_dgram_send_queue {
-	struct unix_dgram_send_queue *prev, *next;
-	struct unix_dgram_ctx *ctx;
-	int sock;
-	struct unix_dgram_msg *msgs;
-	struct poll_timeout *timeout;
-	char path[];
-};
-
-struct unix_dgram_ctx {
-	int sock;
-	pid_t created_pid;
-	const struct poll_funcs *ev_funcs;
-	size_t max_msg;
-
-	void (*recv_callback)(struct unix_dgram_ctx *ctx,
-			      uint8_t *msg, size_t msg_len,
-			      int *fds, size_t num_fds,
-			      void *private_data);
-	void *private_data;
-
-	struct poll_watch *sock_read_watch;
-	struct unix_dgram_send_queue *send_queues;
-
-	struct pthreadpool_pipe *send_pool;
-	struct poll_watch *pool_read_watch;
-
-	uint8_t *recv_buf;
-	char path[];
-};
-
-static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
-				    void *private_data);
-
-/* Set socket non blocking. */
-static int prepare_socket_nonblock(int sock, bool nonblock)
-{
-	int flags;
-#ifdef O_NONBLOCK
-#define FLAG_TO_SET O_NONBLOCK
-#else
-#ifdef SYSV
-#define FLAG_TO_SET O_NDELAY
-#else /* BSD */
-#define FLAG_TO_SET FNDELAY
-#endif
-#endif
-
-	flags = fcntl(sock, F_GETFL);
-	if (flags == -1) {
-		return errno;
-	}
-	if (nonblock) {
-		flags |= FLAG_TO_SET;
-	} else {
-		flags &= ~FLAG_TO_SET;
-	}
-	if (fcntl(sock, F_SETFL, flags) == -1) {
-		return errno;
-	}
-
-#undef FLAG_TO_SET
-	return 0;
-}
-
-/* Set socket close on exec. */
-static int prepare_socket_cloexec(int sock)
-{
-#ifdef FD_CLOEXEC
-	int flags;
-
-	flags = fcntl(sock, F_GETFD, 0);
-	if (flags == -1) {
-		return errno;
-	}
-	flags |= FD_CLOEXEC;
-	if (fcntl(sock, F_SETFD, flags) == -1) {
-		return errno;
-	}
-#endif
-	return 0;
-}
-
-/* Set socket non blocking and close on exec. */
-static int prepare_socket(int sock)
-{
-	int ret = prepare_socket_nonblock(sock, true);
-
-	if (ret) {
-		return ret;
-	}
-	return prepare_socket_cloexec(sock);
-}
-
-static size_t unix_dgram_msg_size(void)
-{
-	size_t msgsize = sizeof(struct unix_dgram_msg);
-	msgsize = (msgsize + 15) & ~15; /* align to 16 */
-	return msgsize;
-}
-
-static struct msghdr_buf *unix_dgram_msghdr(struct unix_dgram_msg *msg)
-{
-	/*
-	 * Not portable in C99, but "msg" is aligned and so is
-	 * unix_dgram_msg_size()
-	 */
-	return (struct msghdr_buf *)(((char *)msg) + unix_dgram_msg_size());
-}
-
-static void close_fd_array(int *fds, size_t num_fds)
-{
-	size_t i;
-
-	for (i = 0; i < num_fds; i++) {
-		if (fds[i] == -1) {
-			continue;
-		}
-
-		close(fds[i]);
-		fds[i] = -1;
-	}
-}
-
-static void close_fd_array_dgram_msg(struct unix_dgram_msg *dmsg)
-{
-	struct msghdr_buf *hdr = unix_dgram_msghdr(dmsg);
-	struct msghdr *msg = msghdr_buf_msghdr(hdr);
-	size_t num_fds = msghdr_extract_fds(msg, NULL, 0);
-	int fds[num_fds];
-
-	msghdr_extract_fds(msg, fds, num_fds);
-
-	close_fd_array(fds, num_fds);
-}
-
-static int unix_dgram_init(const struct sockaddr_un *addr, size_t max_msg,
-			   const struct poll_funcs *ev_funcs,
-			   void (*recv_callback)(struct unix_dgram_ctx *ctx,
-						 uint8_t *msg, size_t msg_len,
-						 int *fds, size_t num_fds,
-						 void *private_data),
-			   void *private_data,
-			   struct unix_dgram_ctx **result)
-{
-	struct unix_dgram_ctx *ctx;
-	size_t pathlen;
-	int ret;
-
-	if (addr != NULL) {
-		pathlen = strlen(addr->sun_path)+1;
-	} else {
-		pathlen = 1;
-	}
-
-	ctx = malloc(offsetof(struct unix_dgram_ctx, path) + pathlen);
-	if (ctx == NULL) {
-		return ENOMEM;
-	}
-	if (addr != NULL) {
-		memcpy(ctx->path, addr->sun_path, pathlen);
-	} else {
-		ctx->path[0] = '\0';
-	}
-
-	*ctx = (struct unix_dgram_ctx) {
-		.max_msg = max_msg,
-		.ev_funcs = ev_funcs,
-		.recv_callback = recv_callback,
-		.private_data = private_data,
-		.created_pid = (pid_t)-1
-	};
-
-	ctx->recv_buf = malloc(max_msg);
-	if (ctx->recv_buf == NULL) {
-		free(ctx);
-		return ENOMEM;
-	}
-
-	ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
-	if (ctx->sock == -1) {
-		ret = errno;
-		goto fail_free;
-	}
-
-	/* Set non-blocking and close-on-exec. */
-	ret = prepare_socket(ctx->sock);
-	if (ret != 0) {
-		goto fail_close;
-	}
-
-	if (addr != NULL) {
-		ret = bind(ctx->sock,
-			   (const struct sockaddr *)(const void *)addr,
-			   sizeof(*addr));
-		if (ret == -1) {
-			ret = errno;
-			goto fail_close;
-		}
-
-		ctx->created_pid = getpid();
-
-		ctx->sock_read_watch = ctx->ev_funcs->watch_new(
-			ctx->ev_funcs, ctx->sock, POLLIN,
-			unix_dgram_recv_handler, ctx);
-
-		if (ctx->sock_read_watch == NULL) {
-			ret = ENOMEM;
-			goto fail_close;
-		}
-	}
-
-	*result = ctx;
-	return 0;
-
-fail_close:
-	close(ctx->sock);
-fail_free:
-	free(ctx->recv_buf);
-	free(ctx);
-	return ret;
-}
-
-static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
-				    void *private_data)
-{
-	struct unix_dgram_ctx *ctx = (struct unix_dgram_ctx *)private_data;
-	ssize_t received;
-	int flags = 0;
-	struct msghdr msg;
-	struct iovec iov;
-	size_t bufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
-	uint8_t buf[bufsize];
-
-	iov = (struct iovec) {
-		.iov_base = (void *)ctx->recv_buf,
-		.iov_len = ctx->max_msg,
-	};
-
-	msg = (struct msghdr) {
-		.msg_iov = &iov,
-		.msg_iovlen = 1,
-	};
-
-	msghdr_prep_recv_fds(&msg, buf, bufsize, INT8_MAX);
-
-#ifdef MSG_CMSG_CLOEXEC
-	flags |= MSG_CMSG_CLOEXEC;
-#endif
-
-	received = recvmsg(fd, &msg, flags);
-	if (received == -1) {
-		if ((errno == EAGAIN) ||
-		    (errno == EWOULDBLOCK) ||
-		    (errno == EINTR) || (errno == ENOMEM)) {
-			/* Not really an error - just try again. */
-			return;
-		}
-		/* Problem with the socket. Set it unreadable. */
-		ctx->ev_funcs->watch_update(w, 0);
-		return;
-	}
-	if (received > ctx->max_msg) {
-		/* More than we expected, not for us */
-		return;
-	}
-
-	{
-		size_t num_fds = msghdr_extract_fds(&msg, NULL, 0);
-		int fds[num_fds];
-		int i;
-
-		msghdr_extract_fds(&msg, fds, num_fds);
-
-		for (i = 0; i < num_fds; i++) {
-			int err;
-
-			err = prepare_socket_cloexec(fds[i]);
-			if (err != 0) {
-				close_fd_array(fds, num_fds);
-				num_fds = 0;
-			}
-		}
-
-		ctx->recv_callback(ctx, ctx->recv_buf, received,
-				   fds, num_fds, ctx->private_data);
-	}
-}
-
-static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
-				    void *private_data);
-
-static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
-{
-	int ret, signalfd;
-
-	if (ctx->send_pool != NULL) {
-		return 0;
-	}
-
-	ret = pthreadpool_pipe_init(0, &ctx->send_pool);
-	if (ret != 0) {
-		return ret;
-	}
-
-	signalfd = pthreadpool_pipe_signal_fd(ctx->send_pool);
-
-	ctx->pool_read_watch = ctx->ev_funcs->watch_new(
-		ctx->ev_funcs, signalfd, POLLIN,
-		unix_dgram_job_finished, ctx);
-	if (ctx->pool_read_watch == NULL) {
-		pthreadpool_pipe_destroy(ctx->send_pool);
-		ctx->send_pool = NULL;
-		return ENOMEM;
-	}
-
-	return 0;
-}
-
-static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q);
-
-static int unix_dgram_send_queue_init(
-	struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst,
-	struct unix_dgram_send_queue **result)
-{
-	struct unix_dgram_send_queue *q;
-	size_t pathlen;
-	int ret, err;
-
-	pathlen = strlen(dst->sun_path)+1;
-
-	q = malloc(offsetof(struct unix_dgram_send_queue, path) + pathlen);
-	if (q == NULL) {
-		return ENOMEM;
-	}
-	q->ctx = ctx;
-	q->msgs = NULL;
-	q->timeout = NULL;
-	memcpy(q->path, dst->sun_path, pathlen);
-
-	q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
-	if (q->sock == -1) {
-		err = errno;
-		goto fail_free;
-	}
-
-	err = prepare_socket(q->sock);
-	if (err != 0) {
-		goto fail_close;
-	}
-
-	do {
-		ret = connect(q->sock,
-			      (const struct sockaddr *)(const void *)dst,
-			      sizeof(*dst));
-	} while ((ret == -1) && (errno == EINTR));
-
-	if (ret == -1) {
-		err = errno;
-		goto fail_close;
-	}
-
-	err = unix_dgram_init_pthreadpool(ctx);
-	if (err != 0) {
-		goto fail_close;
-	}
-
-	ret = unix_dgram_sendq_schedule_free(q);
-	if (ret != 0) {
-		err = ENOMEM;
-		goto fail_close;
-	}
-
-	DLIST_ADD(ctx->send_queues, q);
-
-	*result = q;
-	return 0;
-
-fail_close:
-	close(q->sock);
-fail_free:
-	free(q);
-	return err;
-}
-
-static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
-{
-	struct unix_dgram_ctx *ctx = q->ctx;
-
-	while (q->msgs != NULL) {
-		struct unix_dgram_msg *msg;
-		msg = q->msgs;
-		DLIST_REMOVE(q->msgs, msg);
-		close_fd_array_dgram_msg(msg);
-		free(msg);
-	}
-	close(q->sock);
-	DLIST_REMOVE(ctx->send_queues, q);
-	ctx->ev_funcs->timeout_free(q->timeout);
-	free(q);
-}
-
-static void unix_dgram_sendq_scheduled_free_handler(
-	struct poll_timeout *t, void *private_data);
-
-static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q)
-{
-	struct unix_dgram_ctx *ctx = q->ctx;
-	struct timeval timeout;
-
-	if (q->timeout != NULL) {
-		return 0;
-	}
-
-	GetTimeOfDay(&timeout);
-	timeout.tv_sec += SENDQ_CACHE_TIME_SECS;
-
-	q->timeout = ctx->ev_funcs->timeout_new(
-		ctx->ev_funcs,
-		timeout,
-		unix_dgram_sendq_scheduled_free_handler,
-		q);
-	if (q->timeout == NULL) {
-		return ENOMEM;
-	}
-
-	return 0;
-}
-
-static void unix_dgram_sendq_scheduled_free_handler(struct poll_timeout *t,
-						    void *private_data)
-{
-	struct unix_dgram_send_queue *q = private_data;
-	int ret;
-
-	q->ctx->ev_funcs->timeout_free(q->timeout);
-	q->timeout = NULL;
-
-	if (q->msgs == NULL) {
-		unix_dgram_send_queue_free(q);
-		return;
-	}
-
-	ret = unix_dgram_sendq_schedule_free(q);
-	if (ret != 0) {
-		unix_dgram_send_queue_free(q);
-		return;
-	}
-}
-
-static int find_send_queue(struct unix_dgram_ctx *ctx,
-			   const struct sockaddr_un *dst,
-			   struct unix_dgram_send_queue **ps)
-{
-	struct unix_dgram_send_queue *s;
-	int ret;
-
-	for (s = ctx->send_queues; s != NULL; s = s->next) {
-		if (strcmp(s->path, dst->sun_path) == 0) {
-			*ps = s;
-			return 0;
-		}
-	}
-	ret = unix_dgram_send_queue_init(ctx, dst, &s);
-	if (ret != 0) {
-		return ret;
-	}
-	*ps = s;
-	return 0;
-}
-
-static int queue_msg(struct unix_dgram_send_queue *q,
-		     const struct iovec *iov, int iovcnt,
-		     const int *fds, size_t num_fds)
-{
-	struct unix_dgram_msg *msg;
-	struct msghdr_buf *hdr;
-	size_t msglen, needed;
-	ssize_t msghdrlen;
-	int fds_copy[MIN(num_fds, INT8_MAX)];
-	int i, ret;
-
-	for (i=0; i<num_fds; i++) {
-		fds_copy[i] = -1;
-	}
-
-	for (i = 0; i < num_fds; i++) {
-		fds_copy[i] = dup(fds[i]);
-		if (fds_copy[i] == -1) {
-			ret = errno;
-			goto fail;
-		}
-	}
-
-	msglen = unix_dgram_msg_size();
-
-	msghdrlen = msghdr_copy(NULL, 0, NULL, 0, iov, iovcnt,
-				fds_copy, num_fds);
-	if (msghdrlen == -1) {
-		ret = EMSGSIZE;
-		goto fail;
-	}
-
-	needed = msglen + msghdrlen;
-	if (needed < msglen) {
-		ret = EMSGSIZE;
-		goto fail;
-	}
-
-	msg = malloc(needed);
-	if (msg == NULL) {
-		ret = ENOMEM;
-		goto fail;
-	}
-	hdr = unix_dgram_msghdr(msg);
-
-	msg->sock = q->sock;
-	msghdr_copy(hdr, msghdrlen, NULL, 0, iov, iovcnt,
-		    fds_copy, num_fds);
-
-	DLIST_ADD_END(q->msgs, msg);
-	return 0;
-fail:
-	close_fd_array(fds_copy, num_fds);
-	return ret;
-}
-
-static void unix_dgram_send_job(void *private_data)
-{
-	struct unix_dgram_msg *dmsg = private_data;
-
-	do {
-		struct msghdr_buf *hdr = unix_dgram_msghdr(dmsg);
-		struct msghdr *msg = msghdr_buf_msghdr(hdr);
-		dmsg->sent = sendmsg(dmsg->sock, msg, 0);
-	} while ((dmsg->sent == -1) && (errno == EINTR));
-
-	if (dmsg->sent == -1) {
-		dmsg->sys_errno = errno;
-	}
-}
-
-static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
-				    void *private_data)
-{
-	struct unix_dgram_ctx *ctx = private_data;
-	struct unix_dgram_send_queue *q;
-	struct unix_dgram_msg *msg;
-	int ret, job;
-
-	ret = pthreadpool_pipe_finished_jobs(ctx->send_pool, &job, 1);
-	if (ret != 1) {
-		return;
-	}
-
-	for (q = ctx->send_queues; q != NULL; q = q->next) {
-		if (job == q->sock) {
-			break;
-		}
-	}
-
-	if (q == NULL) {
-		/* Huh? Should not happen */
-		return;
-	}
-
-	msg = q->msgs;
-	DLIST_REMOVE(q->msgs, msg);
-	close_fd_array_dgram_msg(msg);
-	free(msg);
-
-	if (q->msgs != NULL) {
-		ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
-					       unix_dgram_send_job, q->msgs);
-		if (ret != 0) {
-			unix_dgram_send_queue_free(q);
-			return;
-		}
-		return;
-	}
-
-	ret = prepare_socket_nonblock(q->sock, true);
-	if (ret != 0) {
-		unix_dgram_send_queue_free(q);
-	}
-}
-
-static int unix_dgram_send(struct unix_dgram_ctx *ctx,
-			   const struct sockaddr_un *dst,
-			   const struct iovec *iov, int iovlen,
-			   const int *fds, size_t num_fds)
-{
-	struct unix_dgram_send_queue *q;
-	struct msghdr msg;
-	ssize_t fdlen;
-	int ret;
-	int i;
-
-	if (num_fds > INT8_MAX) {
-		return EINVAL;
-	}
-
-#if !defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) && !defined(HAVE_STRUCT_MSGHDR_MSG_ACCRIGHTS)
-	if (num_fds > 0) {
-		return ENOSYS;
-	}
-#endif
-
-	for (i = 0; i < num_fds; i++) {
-		/*
-		 * Make sure we only allow fd passing
-		 * for communication channels,
-		 * e.g. sockets, pipes, fifos, ...
-		 */
-		ret = lseek(fds[i], 0, SEEK_CUR);
-		if (ret == -1 && errno == ESPIPE) {
-			/* ok */
-			continue;
-		}
-
-		/*
-		 * Reject the message as we may need to call dup(),
-		 * if we queue the message.
-		 *
-		 * That might result in unexpected behavior for the caller
-		 * for files and broken posix locking.
-		 */
-		return EINVAL;
-	}
-
-	ret = find_send_queue(ctx, dst, &q);
-	if (ret != 0) {
-		return ret;
-	}
-
-	if (q->msgs) {
-		/*
-		 * To preserve message ordering, we have to queue a
-		 * message when others are waiting in line already.
-		 */
-		return queue_msg(q, iov, iovlen, fds, num_fds);
-	}
-
-	/*
-	 * Try a cheap nonblocking send
-	 */
-
-	msg = (struct msghdr) {
-		.msg_iov = discard_const_p(struct iovec, iov),
-		.msg_iovlen = iovlen
-	};
-
-	fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
-	if (fdlen == -1) {
-		return EINVAL;
-	}
-
-	{
-		uint8_t buf[fdlen];
-		msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
-
-		ret = sendmsg(q->sock, &msg, 0);
-	}
-
-	if (ret >= 0) {
-		return 0;
-	}
-	if ((errno != EWOULDBLOCK) &&
-	    (errno != EAGAIN) &&
-#ifdef ENOBUFS
-	    /* FreeBSD can give this for large messages */
-	    (errno != ENOBUFS) &&
-#endif
-	    (errno != EINTR)) {
-		return errno;
-	}
-
-	ret = queue_msg(q, iov, iovlen, fds, num_fds);
-	if (ret != 0) {
-		unix_dgram_send_queue_free(q);
-		return ret;
-	}
-
-	/*
-	 * While sending the messages via the pthreadpool, we set the
-	 * socket back to blocking mode. When the sendqueue becomes
-	 * empty and we could attempt direct sends again, the
-	 * finished-jobs-handler of the pthreadpool will set it back
-	 * to non-blocking.
-	 */
-	ret = prepare_socket_nonblock(q->sock, false);
-	if (ret != 0) {
-		unix_dgram_send_queue_free(q);
-		return ret;
-	}
-	ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
-				       unix_dgram_send_job, q->msgs);
-	if (ret != 0) {
-		unix_dgram_send_queue_free(q);
-		return ret;
-	}
-	return 0;
-}
-
-static int unix_dgram_sock(struct unix_dgram_ctx *ctx)
-{
-	return ctx->sock;
-}
-
-static int unix_dgram_free(struct unix_dgram_ctx *ctx)
-{
-	struct unix_dgram_send_queue *q;
-
-	for (q = ctx->send_queues; q != NULL;) {
-		struct unix_dgram_send_queue *q_next = q->next;
-
-		if (q->msgs != NULL) {
-			return EBUSY;
-		}
-		unix_dgram_send_queue_free(q);
-		q = q_next;
-	}
-
-	if (ctx->send_pool != NULL) {
-		int ret = pthreadpool_pipe_destroy(ctx->send_pool);
-		if (ret != 0) {
-			return ret;
-		}
-		ctx->ev_funcs->watch_free(ctx->pool_read_watch);
-	}
-
-	ctx->ev_funcs->watch_free(ctx->sock_read_watch);
-
-	close(ctx->sock);
-	if (getpid() == ctx->created_pid) {
-		/* If we created it, unlink. Otherwise someone else might
-		 * still have it open */
-		unlink(ctx->path);
-	}
-
-	free(ctx->recv_buf);
-	free(ctx);
-	return 0;
-}
-
-/*
- * Every message starts with a uint64_t cookie.
- *
- * A value of 0 indicates a single-fragment message which is complete in
- * itself. The data immediately follows the cookie.
- *
- * Every multi-fragment message has a cookie != 0 and starts with a cookie
- * followed by a struct unix_msg_header and then the data. The pid and sock
- * fields are used to assure uniqueness on the receiver side.
- */
-
-struct unix_msg_hdr {
-	size_t msglen;
-	pid_t pid;
-	int sock;
-};
-
-struct unix_msg {
-	struct unix_msg *prev, *next;
-	size_t msglen;
-	size_t received;
-	pid_t sender_pid;
-	int sender_sock;
-	uint64_t cookie;
-	uint8_t buf[1];
-};
-
-struct unix_msg_ctx {
-	struct unix_dgram_ctx *dgram;
-	size_t fragment_len;
-	uint64_t cookie;
-
-	void (*recv_callback)(struct unix_msg_ctx *ctx,
-			      uint8_t *msg, size_t msg_len,
-			      int *fds, size_t num_fds,
-			      void *private_data);
-	void *private_data;
-
-	struct unix_msg *msgs;
-};
-
-static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
-			  uint8_t *buf, size_t buflen,
-			  int *fds, size_t num_fds,
-			  void *private_data);
-
-int unix_msg_init(const struct sockaddr_un *addr,
-		  const struct poll_funcs *ev_funcs,
-		  size_t fragment_len,
-		  void (*recv_callback)(struct unix_msg_ctx *ctx,
-					uint8_t *msg, size_t msg_len,
-					int *fds, size_t num_fds,
-					void *private_data),
-		  void *private_data,
-		  struct unix_msg_ctx **result)
-{
-	struct unix_msg_ctx *ctx;
-	int ret;
-
-	ctx = malloc(sizeof(*ctx));
-	if (ctx == NULL) {
-		return ENOMEM;
-	}
-
-	*ctx = (struct unix_msg_ctx) {
-		.fragment_len = fragment_len,
-		.cookie = 1,
-		.recv_callback = recv_callback,
-		.private_data = private_data
-	};
-
-	ret = unix_dgram_init(addr, fragment_len, ev_funcs,
-			      unix_msg_recv, ctx, &ctx->dgram);
-	if (ret != 0) {
-		free(ctx);
-		return ret;
-	}
-
-	*result = ctx;
-	return 0;
-}
-
-int unix_msg_send(struct unix_msg_ctx *ctx, const struct sockaddr_un *dst,
-		  const struct iovec *iov, int iovlen,
-		  const int *fds, size_t num_fds)
-{
-	ssize_t msglen;
-	size_t sent;
-	int ret = 0;
-	struct iovec iov_copy[iovlen+2];
-	struct unix_msg_hdr hdr;
-	struct iovec src_iov;
-
-	if (iovlen < 0) {
-		return EINVAL;
-	}
-
-	msglen = iov_buflen(iov, iovlen);
-	if (msglen == -1) {
-		return EINVAL;
-	}
-
-	if (num_fds > INT8_MAX) {
-		return EINVAL;
-	}
-
-	if (msglen <= (ctx->fragment_len - sizeof(uint64_t))) {
-		uint64_t cookie = 0;
-
-		iov_copy[0].iov_base = &cookie;
-		iov_copy[0].iov_len = sizeof(cookie);
-		if (iovlen > 0) {
-			memcpy(&iov_copy[1], iov,
-			       sizeof(struct iovec) * iovlen);
-		}
-
-		return unix_dgram_send(ctx->dgram, dst, iov_copy, iovlen+1,
-				       fds, num_fds);
-	}
-
-	hdr = (struct unix_msg_hdr) {
-		.msglen = msglen,
-		.pid = getpid(),
-		.sock = unix_dgram_sock(ctx->dgram)
-	};
-
-	iov_copy[0].iov_base = &ctx->cookie;
-	iov_copy[0].iov_len = sizeof(ctx->cookie);
-	iov_copy[1].iov_base = &hdr;
-	iov_copy[1].iov_len = sizeof(hdr);
-
-	sent = 0;
-	src_iov = iov[0];
-
-	/*
-	 * The following write loop sends the user message in pieces. We have
-	 * filled the first two iovecs above with "cookie" and "hdr". In the
-	 * following loops we pull message chunks from the user iov array and
-	 * fill iov_copy piece by piece, possibly truncating chunks from the
-	 * caller's iov array. Ugly, but hopefully efficient.
-	 */
-
-	while (sent < msglen) {
-		size_t fragment_len;
-		size_t iov_index = 2;
-
-		fragment_len = sizeof(ctx->cookie) + sizeof(hdr);
-
-		while (fragment_len < ctx->fragment_len) {
-			size_t space, chunk;
-
-			space = ctx->fragment_len - fragment_len;
-			chunk = MIN(space, src_iov.iov_len);
-
-			iov_copy[iov_index].iov_base = src_iov.iov_base;
-			iov_copy[iov_index].iov_len = chunk;
-			iov_index += 1;
-
-			src_iov.iov_base = (char *)src_iov.iov_base + chunk;
-			src_iov.iov_len -= chunk;
-			fragment_len += chunk;
-
-			if (src_iov.iov_len == 0) {
-				iov += 1;
-				iovlen -= 1;
-				if (iovlen == 0) {
-					break;
-				}
-				src_iov = iov[0];
-			}
-		}
-		sent += (fragment_len - sizeof(ctx->cookie) - sizeof(hdr));
-
-		/*
-		 * only the last fragment should pass the fd array.
-		 * That simplifies the receiver a lot.
-		 */
-		if (sent < msglen) {
-			ret = unix_dgram_send(ctx->dgram, dst,
-					      iov_copy, iov_index,
-					      NULL, 0);
-		} else {
-			ret = unix_dgram_send(ctx->dgram, dst,
-					      iov_copy, iov_index,
-					      fds, num_fds);
-		}
-		if (ret != 0) {
-			break;
-		}
-	}
-
-	ctx->cookie += 1;
-	if (ctx->cookie == 0) {
-		ctx->cookie += 1;
-	}
-
-	return ret;
-}
-
-static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
-			  uint8_t *buf, size_t buflen,
-			  int *fds, size_t num_fds,
-			  void *private_data)
-{
-	struct unix_msg_ctx *ctx = (struct unix_msg_ctx *)private_data;
-	struct unix_msg_hdr hdr;
-	struct unix_msg *msg;
-	size_t space;
-	uint64_t cookie;
-
-	if (buflen < sizeof(cookie)) {
-		goto close_fds;
-	}
-
-	memcpy(&cookie, buf, sizeof(cookie));
-
-	buf += sizeof(cookie);
-	buflen -= sizeof(cookie);
-
-	if (cookie == 0) {
-		ctx->recv_callback(ctx, buf, buflen, fds, num_fds,
-				   ctx->private_data);
-		return;
-	}
-
-	if (buflen < sizeof(hdr)) {
-		goto close_fds;
-	}
-	memcpy(&hdr, buf, sizeof(hdr));
-
-	buf += sizeof(hdr);
-	buflen -= sizeof(hdr);
-
-	for (msg = ctx->msgs; msg != NULL; msg = msg->next) {
-		if ((msg->sender_pid == hdr.pid) &&
-		    (msg->sender_sock == hdr.sock)) {
-			break;
-		}
-	}
-
-	if ((msg != NULL) && (msg->cookie != cookie)) {
-		DLIST_REMOVE(ctx->msgs, msg);
-		free(msg);
-		msg = NULL;
-	}
-
-	if (msg == NULL) {
-		msg = malloc(offsetof(struct unix_msg, buf) + hdr.msglen);
-		if (msg == NULL) {
-			goto close_fds;
-		}
-		*msg = (struct unix_msg) {
-			.msglen = hdr.msglen,
-			.sender_pid = hdr.pid,
-			.sender_sock = hdr.sock,
-			.cookie = cookie
-		};
-		DLIST_ADD(ctx->msgs, msg);
-	}
-
-	space = msg->msglen - msg->received;
-	if (buflen > space) {
-		goto close_fds;
-	}
-
-	memcpy(msg->buf + msg->received, buf, buflen);
-	msg->received += buflen;
-
-	if (msg->received < msg->msglen) {
-		goto close_fds;
-	}
-
-	DLIST_REMOVE(ctx->msgs, msg);
-	ctx->recv_callback(ctx, msg->buf, msg->msglen, fds, num_fds,
-			   ctx->private_data);
-	free(msg);
-	return;
-
-close_fds:
-	close_fd_array(fds, num_fds);
-}
-
-int unix_msg_free(struct unix_msg_ctx *ctx)
-{
-	int ret;
-
-	ret = unix_dgram_free(ctx->dgram);
-	if (ret != 0) {
-		return ret;
-	}
-
-	while (ctx->msgs != NULL) {
-		struct unix_msg *msg = ctx->msgs;
-		DLIST_REMOVE(ctx->msgs, msg);
-		free(msg);
-	}
-
-	free(ctx);
-	return 0;
-}
diff --git a/source3/lib/unix_msg/unix_msg.h b/source3/lib/unix_msg/unix_msg.h
deleted file mode 100644
index 375d4ac..0000000
--- a/source3/lib/unix_msg/unix_msg.h
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Unix SMB/CIFS implementation.
- * Copyright (C) Volker Lendecke 2013
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef __UNIX_MSG_H__
-#define __UNIX_MSG_H__
-
-#include "replace.h"
-#include "poll_funcs/poll_funcs.h"
-#include "system/network.h"
-
-/**
- * @file unix_msg.h
- *
- * @brief Send large messages over unix domain datagram sockets
- *
- * A unix_msg_ctx represents a unix domain datagram socket.
- *
- * Unix domain datagram sockets have some unique properties compared with UDP
- * sockets:
- *
- * - They are reliable, i.e. as long as both sender and receiver are processes
- *   that are alive, nothing is lost.
- *
- * - They preserve sequencing
- *
- * Based on these two properties, this code implements sending of large
- * messages. It aims at being maximally efficient for short, single-datagram
- * messages. Ideally, if the receiver queue is not full, sending a message
- * should be a single syscall without malloc. Receiving a message should also
- * not malloc anything before the data is shipped to the user.
- *
- * If unix_msg_send meets a full receive buffer, more effort is required: The
- * socket behind unix_msg_send is not pollable for POLLOUT, it will always be
- * writable: A datagram socket can send anywhere, the full queue is a property
- * of of the receiving socket. unix_msg_send creates a new unnamed socket that
- * it will connect(2) to the target socket. This unnamed socket is then
- * pollable for POLLOUT. The socket will be writable when the destination
- * socket's queue is drained sufficiently.
- *
- * If unix_msg_send is asked to send a message larger than fragment_size, it
- * will try sending the message in pieces with proper framing, the receiving
- * side will reassemble the messages.
- *
- * fd-passing is supported.
- * Note that by default the fds passed to recv_callback are closed by
- * the receive handler in order to avoid fd-leaks. If the provider of
- * the recv_callback wants to use a passed file descriptor after the
- * callback returns, it must copy the fd away and set the corresponding
- * entry in the "fds" array to -1.
- */
-
-/**
- * @brief Abstract structure representing a unix domain datagram socket
- */
-struct unix_msg_ctx;
-
-/**
- * @brief Initialize a struct unix_msg_ctx
- *
- * @param[in] path The socket path
- * @param[in] ev_funcs The event callback functions to use
- * @param[in] fragment_size Maximum datagram size to send/receive
- * @param[in] recv_callback Function called when a message is received
- * @param[in] private_data Private pointer for recv_callback
- * @param[out] result The new struct unix_msg_ctx
- * @return 0 on success, errno on failure
- */
-
-
-int unix_msg_init(const struct sockaddr_un *addr,
-		  const struct poll_funcs *ev_funcs,
-		  size_t fragment_size,
-		  void (*recv_callback)(struct unix_msg_ctx *ctx,
-					uint8_t *msg, size_t msg_len,
-					int *fds, size_t num_fds,
-					void *private_data),
-		  void *private_data,
-		  struct unix_msg_ctx **result);
-
-/**
- * @brief Send a message
- *
- * @param[in] ctx The context to send across
- * @param[in] dst_sock The destination socket path
- * @param[in] iov The message
- * @param[in] iovlen The number of iov structs
- * @param[in] fds - optional fd array
- * @param[in] num_fds - fd array size
- * @return 0 on success, errno on failure
- */
-
-int unix_msg_send(struct unix_msg_ctx *ctx, const struct sockaddr_un *dst,
-		  const struct iovec *iov, int iovlen,
-		  const int *fds, size_t num_fds);
-
-/**
- * @brief Free a unix_msg_ctx
- *
- * @param[in] ctx The message context to free
- * @return 0 on success, errno on failure (EBUSY)
- */
-int unix_msg_free(struct unix_msg_ctx *ctx);
-
-#define SENDQ_CACHE_TIME_SECS 10
-
-#endif
diff --git a/source3/lib/unix_msg/wscript_build b/source3/lib/unix_msg/wscript_build
deleted file mode 100644
index 469f87e..0000000
--- a/source3/lib/unix_msg/wscript_build
+++ /dev/null
@@ -1,18 +0,0 @@
-#!/usr/bin/env python
-
-bld.SAMBA3_SUBSYSTEM('UNIX_MSG',
-                     source='unix_msg.c',
-		     deps='replace PTHREADPOOL iov_buf msghdr time-basic')
-
-bld.SAMBA3_BINARY('unix_msg_test',
-                  source='tests.c',
-                  deps='UNIX_MSG POLL_FUNCS_TEVENT',
-                  install=False)
-bld.SAMBA3_BINARY('unix_msg_test_drain',
-                  source='test_drain.c',
-                  deps='UNIX_MSG POLL_FUNCS_TEVENT',
-                  install=False)
-bld.SAMBA3_BINARY('unix_msg_test_source',
-                  source='test_source.c',
-                  deps='UNIX_MSG POLL_FUNCS_TEVENT',
-                  install=False)
-- 
2.7.4


>From 0f5671fe45d53259a352de25f5122389b2ffb4c0 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 28 Sep 2016 14:44:03 -0700
Subject: [PATCH 15/26] lib: Remove poll_funcs

unix_msg was the only user

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 lib/poll_funcs/poll_funcs.h        | 131 -------
 lib/poll_funcs/poll_funcs_tevent.c | 684 -------------------------------------
 lib/poll_funcs/poll_funcs_tevent.h |  38 ---
 lib/poll_funcs/wscript_build       |   5 -
 source3/wscript_build              |   1 -
 wscript_build                      |   1 -
 6 files changed, 860 deletions(-)
 delete mode 100644 lib/poll_funcs/poll_funcs.h
 delete mode 100644 lib/poll_funcs/poll_funcs_tevent.c
 delete mode 100644 lib/poll_funcs/poll_funcs_tevent.h
 delete mode 100644 lib/poll_funcs/wscript_build

diff --git a/lib/poll_funcs/poll_funcs.h b/lib/poll_funcs/poll_funcs.h
deleted file mode 100644
index b16f07f..0000000
--- a/lib/poll_funcs/poll_funcs.h
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Unix SMB/CIFS implementation.
- * Copyright (C) Volker Lendecke 2013
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-/**
- * @file poll_funcs.h
- *
- * @brief event loop abstraction
- */
-
-/*
- * This is inspired by AvahiWatch, the avahi event loop abstraction.
- */
-
-#ifndef __POLL_FUNCS_H__
-#define __POLL_FUNCS_H__
-
-#include "replace.h"
-
-/**
- * poll_watch and poll_timeout are undefined here, every implementation can
- * implement its own structures.
- */
-
-struct poll_watch;
-struct poll_timeout;
-
-struct poll_funcs {
-
-	/**
-	 * @brief Create a new file descriptor watch
-	 *
-	 * @param[in] funcs The callback array
-	 * @param[in] fd The fd to watch
-	 * @param[in] events POLLIN and POLLOUT or'ed together
-	 * @param[in] callback Function to call by the implementation
-	 * @param[in] private_data Pointer to give back to callback
-	 *
-	 * @return A new poll_watch struct
-	 */
-
-	struct poll_watch *(*watch_new)(
-		const struct poll_funcs *funcs, int fd, short events,
-		void (*callback)(struct poll_watch *w, int fd,
-				 short events, void *private_data),
-		void *private_data);
-
-	/**
-	 * @brief Change the watched events for a struct poll_watch
-	 *
-	 * @param[in] w The poll_watch to change
-	 * @param[in] events new POLLIN and POLLOUT or'ed together
-	 */
-
-	void (*watch_update)(struct poll_watch *w, short events);
-
-	/**
-	 * @brief Read events currently watched
-	 *
-	 * @param[in] w The poll_watch to inspect
-	 *
-	 * @returns The events currently watched
-	 */
-
-	short (*watch_get_events)(struct poll_watch *w);
-
-	/**
-	 * @brief Free a struct poll_watch
-	 *
-	 * @param[in] w The poll_watch struct to free
-	 */
-
-	void (*watch_free)(struct poll_watch *w);
-
-
-	/**
-	 * @brief Create a new timeout watch
-	 *
-	 * @param[in] funcs The callback array
-	 * @param[in] tv The time when the timeout should trigger
-	 * @param[in] callback Function to call at time "tv"
-	 * @param[in] private_data Pointer to give back to callback
-	 *
-	 * @return A new poll_timeout struct
-	 */
-
-	struct poll_timeout *(*timeout_new)(
-		const struct poll_funcs *funcs, const struct timeval tv,
-		void (*callback)(struct poll_timeout *t, void *private_data),
-		void *private_data);
-
-	/**
-	 * @brief Change the timeout of a watch
-	 *
-	 * @param[in] t The timeout watch to change
-	 * @param[in] tv The new trigger time
-	 */
-
-	void (*timeout_update)(struct poll_timeout *t,
-			       const struct timeval tv);
-
-	/**
-	 * @brief Free a poll_timeout
-	 *
-	 * @param[in] t The poll_timeout to free
-	 */
-
-	void (*timeout_free)(struct poll_timeout *t);
-
-	/**
-	 * @brief private data for use by the implementation
-	 */
-
-	void *private_data;
-};
-
-#endif
diff --git a/lib/poll_funcs/poll_funcs_tevent.c b/lib/poll_funcs/poll_funcs_tevent.c
deleted file mode 100644
index 4dd09a2..0000000
--- a/lib/poll_funcs/poll_funcs_tevent.c
+++ /dev/null
@@ -1,684 +0,0 @@
-/*
- * Unix SMB/CIFS implementation.
- * Copyright (C) Volker Lendecke 2013,2014
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-#include "poll_funcs_tevent.h"
-#include "tevent.h"
-#include "system/select.h"
-#include "lib/util/dlinklist.h"
-
-/*
- * A poll_watch is asked for by the engine using this library via
- * funcs->watch_new(). It represents interest in "fd" becoming readable or
- * writable.
- */
-
-struct poll_watch {
-	struct poll_funcs_state *state;
-	size_t slot; 		/* index into state->watches[] */
-	int fd;
-	int events;
-	void (*callback)(struct poll_watch *w, int fd, short events,
-			 void *private_data);
-	void *private_data;
-};
-
-struct poll_timeout {
-	struct poll_funcs_state *state;
-	size_t slot;		/* index into state->timeouts[] */
-	struct timeval tv;
-	void (*callback)(struct poll_timeout *t, void *private_data);
-	void *private_data;
-};
-
-struct poll_funcs_state {
-	/*
-	 * "watches" is the array of all watches that we have handed out via
-	 * funcs->watch_new(). The "watches" array can contain NULL pointers.
-	 */
-	struct poll_watch **watches;
-
-	/*
-	 * Like "watches" for timeouts;
-	 */
-	struct poll_timeout **timeouts;
-
-	/*
-	 * "contexts is the array of tevent_contexts that serve
-	 * "watches". "contexts" can contain NULL pointers.
-	 */
-	struct poll_funcs_tevent_context **contexts;
-};
-
-struct poll_funcs_tevent_context {
-	struct poll_funcs_tevent_handle *handles;
-	struct poll_funcs_state *state;
-	unsigned slot;		/* index into state->contexts[] */
-	struct tevent_context *ev;
-	struct tevent_fd **fdes; /* same indexes as state->watches[] */
-	struct tevent_timer **timers;  /* same indexes as state->timeouts[] */
-};
-
-/*
- * poll_funcs_tevent_register() hands out a struct poll_funcs_tevent_handle as
- * a void *. poll_funcs_tevent_register allows tevent_contexts to be
- * registered multiple times, and we can't add a tevent_fd for the same fd's
- * multiple times. So we have to share one poll_funcs_tevent_context.
- */
-struct poll_funcs_tevent_handle {
-	struct poll_funcs_tevent_handle *prev, *next;
-	struct poll_funcs_tevent_context *ctx;
-};
-
-static uint16_t poll_events_to_tevent(short events)
-{
-	uint16_t ret = 0;
-
-	if (events & POLLIN) {
-		ret |= TEVENT_FD_READ;
-	}
-	if (events & POLLOUT) {
-		ret |= TEVENT_FD_WRITE;
-	}
-	return ret;
-}
-
-static short tevent_to_poll_events(uint16_t flags)
-{
-	short ret = 0;
-
-	if (flags & TEVENT_FD_READ) {
-		ret |= POLLIN;
-	}
-	if (flags & TEVENT_FD_WRITE) {
-		ret |= POLLOUT;
-	}
-	return ret;
-}
-
-/*
- * Find or create a free slot in state->watches[]
- */
-static bool poll_funcs_watch_find_slot(struct poll_funcs_state *state,
-				       size_t *slot)
-{
-	struct poll_watch **watches;
-	size_t i, num_watches, num_contexts;
-
-	num_watches = talloc_array_length(state->watches);
-
-	for (i=0; i<num_watches; i++) {
-		if (state->watches[i] == NULL) {
-			*slot = i;
-			return true;
-		}
-	}
-
-	watches = talloc_realloc(state, state->watches, struct poll_watch *,
-				 num_watches + 1);
-	if (watches == NULL) {
-		return false;
-	}
-	watches[num_watches] = NULL;
-	state->watches = watches;
-
-	num_contexts = talloc_array_length(state->contexts);
-
-	for (i=0; i<num_contexts; i++) {
-		struct tevent_fd **fdes;
-		struct poll_funcs_tevent_context *c = state->contexts[i];
-		if (c == NULL) {
-			continue;
-		}
-		fdes = talloc_realloc(c, c->fdes, struct tevent_fd *,
-				      num_watches + 1);
-		if (fdes == NULL) {
-			state->watches = talloc_realloc(
-				state, state->watches, struct poll_watch *,
-				num_watches);
-			return false;
-		}
-		c->fdes = fdes;
-
-		fdes[num_watches] = NULL;
-	}
-
-	*slot = num_watches;
-
-	return true;
-}
-
-static void poll_funcs_fde_handler(struct tevent_context *ev,
-				   struct tevent_fd *fde, uint16_t flags,
-				   void *private_data);
-static int poll_watch_destructor(struct poll_watch *w);
-
-static struct poll_watch *tevent_watch_new(
-	const struct poll_funcs *funcs, int fd, short events,
-	void (*callback)(struct poll_watch *w, int fd, short events,
-			 void *private_data),
-	void *private_data)
-{
-	struct poll_funcs_state *state = talloc_get_type_abort(
-		funcs->private_data, struct poll_funcs_state);
-	struct poll_watch *w;
-	size_t i, slot, num_contexts;
-
-	if (!poll_funcs_watch_find_slot(state, &slot)) {
-		return NULL;
-	}
-
-	w = talloc(state->watches, struct poll_watch);
-	if (w == NULL) {
-		return NULL;
-	}
-	w->state = state;
-	w->slot = slot;
-	w->fd = fd;
-	w->events = poll_events_to_tevent(events);
-	w->fd = fd;
-	w->callback = callback;
-	w->private_data = private_data;
-	state->watches[slot] = w;
-
-	talloc_set_destructor(w, poll_watch_destructor);
-
-	num_contexts = talloc_array_length(state->contexts);
-
-	for (i=0; i<num_contexts; i++) {
-		struct poll_funcs_tevent_context *c = state->contexts[i];
-		if (c == NULL) {
-			continue;
-		}
-		c->fdes[slot] = tevent_add_fd(c->ev, c->fdes, w->fd, w->events,
-					      poll_funcs_fde_handler, w);
-		if (c->fdes[slot] == NULL) {
-			goto fail;
-		}
-	}
-	return w;
-
-fail:
-	TALLOC_FREE(w);
-	return NULL;
-}
-
-static int poll_watch_destructor(struct poll_watch *w)
-{
-	struct poll_funcs_state *state = w->state;
-	size_t num_contexts = talloc_array_length(state->contexts);
-	size_t slot = w->slot;
-	size_t i;
-
-	TALLOC_FREE(state->watches[slot]);
-
-	for (i=0; i<num_contexts; i++) {
-		struct poll_funcs_tevent_context *c = state->contexts[i];
-		if (c == NULL) {
-			continue;
-		}
-		TALLOC_FREE(c->fdes[slot]);
-	}
-
-	return 0;
-}
-
-static void tevent_watch_update(struct poll_watch *w, short events)
-{
-	struct poll_funcs_state *state = w->state;
-	size_t num_contexts = talloc_array_length(state->contexts);
-	size_t slot = w->slot;
-	size_t i;
-
-	w->events = poll_events_to_tevent(events);
-
-	for (i=0; i<num_contexts; i++) {
-		struct poll_funcs_tevent_context *c = state->contexts[i];
-		if (c == NULL) {
-			continue;
-		}
-		tevent_fd_set_flags(c->fdes[slot], w->events);
-	}
-}
-
-static short tevent_watch_get_events(struct poll_watch *w)
-{
-	return tevent_to_poll_events(w->events);
-}
-
-static void tevent_watch_free(struct poll_watch *w)
-{
-	TALLOC_FREE(w);
-}
-
-static bool poll_funcs_timeout_find_slot(struct poll_funcs_state *state,
-					 size_t *slot)
-{
-	struct poll_timeout **timeouts;
-	size_t i, num_timeouts, num_contexts;
-
-	num_timeouts = talloc_array_length(state->timeouts);
-
-	for (i=0; i<num_timeouts; i++) {
-		if (state->timeouts[i] == NULL) {
-			*slot = i;
-			return true;
-		}
-	}
-
-	timeouts = talloc_realloc(state, state->timeouts,
-				  struct poll_timeout *,
-				  num_timeouts + 1);
-	if (timeouts == NULL) {
-		return false;
-	}
-	timeouts[num_timeouts] = NULL;
-	state->timeouts = timeouts;
-
-	num_contexts = talloc_array_length(state->contexts);
-
-	for (i=0; i<num_contexts; i++) {
-		struct tevent_timer **timers;
-		struct poll_funcs_tevent_context *c = state->contexts[i];
-		if (c == NULL) {
-			continue;
-		}
-		timers = talloc_realloc(c, c->timers, struct tevent_timer *,
-					num_timeouts + 1);
-		if (timers == NULL) {
-			state->timeouts = talloc_realloc(
-				state, state->timeouts, struct poll_timeout *,
-				num_timeouts);
-			return false;
-		}
-		c->timers = timers;
-
-		timers[num_timeouts] = NULL;
-	}
-
-	*slot = num_timeouts;
-
-	return true;
-}
-
-static void poll_funcs_timer_handler(struct tevent_context *ev,
-				     struct tevent_timer *te,
-				     struct timeval current_time,
-				     void *private_data);
-static int poll_timeout_destructor(struct poll_timeout *t);
-
-static struct poll_timeout *tevent_timeout_new(
-	const struct poll_funcs *funcs, const struct timeval tv,
-	void (*callback)(struct poll_timeout *t, void *private_data),
-	void *private_data)
-{
-	struct poll_funcs_state *state = talloc_get_type_abort(
-		funcs->private_data, struct poll_funcs_state);
-	struct poll_timeout *t;
-	size_t i, slot, num_contexts;
-
-	if (!poll_funcs_timeout_find_slot(state, &slot)) {
-		return NULL;
-	}
-
-	t = talloc(state->timeouts, struct poll_timeout);
-	if (t == NULL) {
-		return NULL;
-	}
-	t->state = state;
-	t->slot = slot;
-	t->tv = tv;
-	t->callback = callback;
-	t->private_data = private_data;
-
-	talloc_set_destructor(t, poll_timeout_destructor);
-
-	num_contexts = talloc_array_length(state->contexts);
-
-	for (i=0; i<num_contexts; i++) {
-		struct poll_funcs_tevent_context *c = state->contexts[i];
-		if (c == NULL) {
-			continue;
-		}
-		c->timers[slot] = tevent_add_timer(
-			c->ev, c->timers, tv, poll_funcs_timer_handler, t);
-		if (c->timers[slot] == NULL) {
-			goto fail;
-		}
-	}
-	return t;
-
-fail:
-	TALLOC_FREE(t);
-	return NULL;
-}
-
-static int poll_timeout_destructor(struct poll_timeout *t)
-{
-	struct poll_funcs_state *state = t->state;
-	size_t num_contexts = talloc_array_length(state->contexts);
-	size_t slot = t->slot;
-	size_t i;
-
-	TALLOC_FREE(state->timeouts[slot]);
-
-	for (i=0; i<num_contexts; i++) {
-		struct poll_funcs_tevent_context *c = state->contexts[i];
-		if (c == NULL) {
-			continue;
-		}
-		TALLOC_FREE(c->timers[slot]);
-	}
-
-	return 0;
-}
-
-static void poll_funcs_timer_handler(struct tevent_context *ev,
-				     struct tevent_timer *te,
-				     struct timeval current_time,
-				     void *private_data)
-{
-	struct poll_timeout *t = talloc_get_type_abort(
-		private_data, struct poll_timeout);
-	struct poll_funcs_state *state = t->state;
-	size_t slot = t->slot;
-	size_t i, num_contexts;
-
-	num_contexts = talloc_array_length(state->contexts);
-
-	for (i=0; i<num_contexts; i++) {
-		struct poll_funcs_tevent_context *c = state->contexts[i];
-		if (c == NULL) {
-			continue;
-		}
-		TALLOC_FREE(c->timers[slot]);
-	}
-
-	t->callback(t, t->private_data);
-}
-
-static void tevent_timeout_update(struct poll_timeout *t,
-				  const struct timeval tv)
-{
-	struct poll_funcs_state *state = t->state;
-	size_t num_contexts = talloc_array_length(state->contexts);
-	size_t slot = t->slot;
-	size_t i;
-
-	for (i=0; i<num_contexts; i++) {
-		struct poll_funcs_tevent_context *c = state->contexts[i];
-		if (c == NULL) {
-			continue;
-		}
-		TALLOC_FREE(c->timers[slot]);
-
-		c->timers[slot] = tevent_add_timer(
-			c->ev, c->timers, tv, poll_funcs_timer_handler, t);
-		if (c->timers[slot] == NULL) {
-			/*
-			 * We just free'ed the space, why did this fail??
-			 */
-			abort();
-		}
-	}
-}
-
-static void tevent_timeout_free(struct poll_timeout *t)
-{
-	TALLOC_FREE(t);
-}
-
-static int poll_funcs_state_destructor(struct poll_funcs_state *state);
-
-struct poll_funcs *poll_funcs_init_tevent(TALLOC_CTX *mem_ctx)
-{
-	struct poll_funcs *f;
-	struct poll_funcs_state *state;
-
-	f = talloc(mem_ctx, struct poll_funcs);
-	if (f == NULL) {
-		return NULL;
-	}
-	state = talloc_zero(f, struct poll_funcs_state);
-	if (state == NULL) {
-		TALLOC_FREE(f);
-		return NULL;
-	}
-	talloc_set_destructor(state, poll_funcs_state_destructor);
-
-	f->watch_new = tevent_watch_new;
-	f->watch_update = tevent_watch_update;
-	f->watch_get_events = tevent_watch_get_events;
-	f->watch_free = tevent_watch_free;
-	f->timeout_new = tevent_timeout_new;
-	f->timeout_update = tevent_timeout_update;
-	f->timeout_free = tevent_timeout_free;
-	f->private_data = state;
-	return f;
-}
-
-static int poll_funcs_state_destructor(struct poll_funcs_state *state)
-{
-	size_t num_watches = talloc_array_length(state->watches);
-	size_t num_timeouts = talloc_array_length(state->timeouts);
-	size_t num_contexts = talloc_array_length(state->contexts);
-	size_t i;
-	/*
-	 * Make sure the watches are cleared before the contexts. The watches
-	 * have destructors attached to them that clean up the fde's
-	 */
-	for (i=0; i<num_watches; i++) {
-		TALLOC_FREE(state->watches[i]);
-	}
-	for (i=0; i<num_timeouts; i++) {
-		TALLOC_FREE(state->timeouts[i]);
-	}
-	for (i=0; i<num_contexts; i++) {
-		TALLOC_FREE(state->contexts[i]);
-	}
-	return 0;
-}
-
-/*
- * Find or create a free slot in state->contexts[]
- */
-static bool poll_funcs_context_slot_find(struct poll_funcs_state *state,
-					 struct tevent_context *ev,
-					 size_t *slot)
-{
-	struct poll_funcs_tevent_context **contexts;
-	size_t num_contexts = talloc_array_length(state->contexts);
-	size_t i;
-
-	/* Look for an existing match first. */
-	for (i=0; i<num_contexts; i++) {
-		struct poll_funcs_tevent_context *ctx = state->contexts[i];
-
-		if (ctx != NULL && ctx->ev == ev) {
-			*slot = i;
-			return true;
-		}
-	}
-
-	/* Now look for a free slot. */
-	for (i=0; i<num_contexts; i++) {
-		struct poll_funcs_tevent_context *ctx = state->contexts[i];
-
-		if (ctx == NULL) {
-			*slot = i;
-			return true;
-		}
-	}
-
-	contexts = talloc_realloc(state, state->contexts,
-				  struct poll_funcs_tevent_context *,
-				  num_contexts + 1);
-	if (contexts == NULL) {
-		return false;
-	}
-	state->contexts = contexts;
-	state->contexts[num_contexts] = NULL;
-
-	*slot = num_contexts;
-
-	return true;
-}
-
-static int poll_funcs_tevent_context_destructor(
-	struct poll_funcs_tevent_context *ctx);
-
-static struct poll_funcs_tevent_context *poll_funcs_tevent_context_new(
-	TALLOC_CTX *mem_ctx, struct poll_funcs_state *state,
-	struct tevent_context *ev, unsigned slot)
-{
-	struct poll_funcs_tevent_context *ctx;
-	size_t num_watches = talloc_array_length(state->watches);
-	size_t num_timeouts = talloc_array_length(state->timeouts);
-	size_t i;
-
-	ctx = talloc(mem_ctx, struct poll_funcs_tevent_context);
-	if (ctx == NULL) {
-		return NULL;
-	}
-
-	ctx->handles = NULL;
-	ctx->state = state;
-	ctx->ev = ev;
-	ctx->slot = slot;
-
-	ctx->fdes = talloc_array(ctx, struct tevent_fd *, num_watches);
-	if (ctx->fdes == NULL) {
-		goto fail;
-	}
-
-	for (i=0; i<num_watches; i++) {
-		struct poll_watch *w = state->watches[i];
-
-		if (w == NULL) {
-			ctx->fdes[i] = NULL;
-			continue;
-		}
-		ctx->fdes[i] = tevent_add_fd(ev, ctx->fdes, w->fd, w->events,
-					     poll_funcs_fde_handler, w);
-		if (ctx->fdes[i] == NULL) {
-			goto fail;
-		}
-	}
-
-	ctx->timers = talloc_array(ctx, struct tevent_timer *, num_timeouts);
-	if (ctx->timers == NULL) {
-		goto fail;
-	}
-
-	for (i=0; i<num_timeouts; i++) {
-		struct poll_timeout *t = state->timeouts[i];
-
-		if (t == NULL) {
-			ctx->timers[i] = NULL;
-			continue;
-		}
-		ctx->timers[i] = tevent_add_timer(ctx->ev, ctx->timers, t->tv,
-						  poll_funcs_timer_handler, t);
-		if (ctx->timers[i] == 0) {
-			goto fail;
-		}
-	}
-
-	talloc_set_destructor(ctx, poll_funcs_tevent_context_destructor);
-	return ctx;
-fail:
-	TALLOC_FREE(ctx);
-	return NULL;
-}
-
-static int poll_funcs_tevent_context_destructor(
-	struct poll_funcs_tevent_context *ctx)
-{
-	struct poll_funcs_tevent_handle *h;
-
-	ctx->state->contexts[ctx->slot] = NULL;
-
-	for (h = ctx->handles; h != NULL; h = h->next) {
-		h->ctx = NULL;
-	}
-
-	return 0;
-}
-
-static void poll_funcs_fde_handler(struct tevent_context *ev,
-				   struct tevent_fd *fde, uint16_t flags,
-				   void *private_data)
-{
-	struct poll_watch *w = talloc_get_type_abort(
-		private_data, struct poll_watch);
-	short events = tevent_to_poll_events(flags);
-	w->callback(w, w->fd, events, w->private_data);
-}
-
-static int poll_funcs_tevent_handle_destructor(
-	struct poll_funcs_tevent_handle *handle);
-
-void *poll_funcs_tevent_register(TALLOC_CTX *mem_ctx, struct poll_funcs *f,
-				 struct tevent_context *ev)
-{
-	struct poll_funcs_state *state = talloc_get_type_abort(
-		f->private_data, struct poll_funcs_state);
-	struct poll_funcs_tevent_handle *handle;
-	size_t slot;
-
-	handle = talloc(mem_ctx, struct poll_funcs_tevent_handle);
-	if (handle == NULL) {
-		return NULL;
-	}
-
-	if (!poll_funcs_context_slot_find(state, ev, &slot)) {
-		goto fail;
-	}
-	if (state->contexts[slot] == NULL) {
-		state->contexts[slot] = poll_funcs_tevent_context_new(
-			state->contexts, state, ev, slot);
-		if (state->contexts[slot] == NULL) {
-			goto fail;
-		}
-	}
-
-	handle->ctx = state->contexts[slot];
-	DLIST_ADD(handle->ctx->handles, handle);
-	talloc_set_destructor(handle, poll_funcs_tevent_handle_destructor);
-	return handle;
-fail:
-	TALLOC_FREE(handle);
-	return NULL;
-}
-
-static int poll_funcs_tevent_handle_destructor(
-	struct poll_funcs_tevent_handle *handle)
-{
-	if (handle->ctx == NULL) {
-		return 0;
-	}
-	if (handle->ctx->handles == NULL) {
-		abort();
-	}
-
-	DLIST_REMOVE(handle->ctx->handles, handle);
-
-	if (handle->ctx->handles == NULL) {
-		TALLOC_FREE(handle->ctx);
-	}
-	return 0;
-}
diff --git a/lib/poll_funcs/poll_funcs_tevent.h b/lib/poll_funcs/poll_funcs_tevent.h
deleted file mode 100644
index 8b2964c..0000000
--- a/lib/poll_funcs/poll_funcs_tevent.h
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Unix SMB/CIFS implementation.
- * Copyright (C) Volker Lendecke 2013,2014
- *
- * This program is free software; you can redistribute it and/or modify
- * it under the terms of the GNU General Public License as published by
- * the Free Software Foundation; either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-#ifndef __POLL_FUNCS_TEVENT_H__
-#define __POLL_FUNCS_TEVENT_H__
-
-#include "poll_funcs.h"
-#include "tevent.h"
-
-/*
- * Create a new, empty instance of "struct poll_funcs" to be served by tevent.
- */
-struct poll_funcs *poll_funcs_init_tevent(TALLOC_CTX *mem_ctx);
-
-/*
- * Register a tevent_context to handle the watches that the user of
- * "poll_funcs" showed interest in. talloc_free() the returned pointer when
- * "ev" is not supposed to handle the events anymore.
- */
-void *poll_funcs_tevent_register(TALLOC_CTX *mem_ctx, struct poll_funcs *f,
-				 struct tevent_context *ev);
-
-#endif
diff --git a/lib/poll_funcs/wscript_build b/lib/poll_funcs/wscript_build
deleted file mode 100644
index df9a298..0000000
--- a/lib/poll_funcs/wscript_build
+++ /dev/null
@@ -1,5 +0,0 @@
-#!/usr/bin/env python
-
-bld.SAMBA_SUBSYSTEM('POLL_FUNCS_TEVENT',
-                    source='poll_funcs_tevent.c',
-                    deps='tevent')
diff --git a/source3/wscript_build b/source3/wscript_build
index 1598555..66fcaac 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -354,7 +354,6 @@ bld.SAMBA3_SUBSYSTEM('samba3core',
                         UTIL_PW
                         SAMBA_VERSION
                         PTHREADPOOL
-                        POLL_FUNCS_TEVENT
                         interfaces
                         param
                         dbwrap
diff --git a/wscript_build b/wscript_build
index fa3aa60..93b1832 100644
--- a/wscript_build
+++ b/wscript_build
@@ -43,7 +43,6 @@ bld.RECURSE('lib/texpect')
 bld.RECURSE('lib/addns')
 bld.RECURSE('lib/ldb')
 bld.RECURSE('lib/param')
-bld.RECURSE('lib/poll_funcs')
 bld.RECURSE('dynconfig')
 bld.RECURSE('lib/util/charset')
 bld.RECURSE('python')
-- 
2.7.4


>From d9c0f5ff83bfb753bc9b4b7562174fa7d63432dd Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 9 Sep 2016 16:51:25 +0200
Subject: [PATCH 16/26] messaging: add an overflow test

Send 1000 messages without picking them up. Then destroy the sending messaging
context.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source4/lib/messaging/tests/messaging.c | 65 +++++++++++++++++++++++++++++++++
 1 file changed, 65 insertions(+)

diff --git a/source4/lib/messaging/tests/messaging.c b/source4/lib/messaging/tests/messaging.c
index 51195a1..4d8b53f 100644
--- a/source4/lib/messaging/tests/messaging.c
+++ b/source4/lib/messaging/tests/messaging.c
@@ -26,6 +26,7 @@
 #include "cluster/cluster.h"
 #include "param/param.h"
 #include "torture/local/proto.h"
+#include "system/select.h"
 
 static uint32_t msg_pong;
 
@@ -134,9 +135,73 @@ static bool test_ping_speed(struct torture_context *tctx)
 	return true;
 }
 
+static bool test_messaging_overflow(struct torture_context *tctx)
+{
+	struct imessaging_context *msg_ctx;
+	pid_t child;
+	int fds[2];
+	int i, ret, child_status;
+
+	ret = pipe(fds);
+	torture_assert(tctx, ret == 0, "pipe failed");
+
+	child = fork();
+	if (child < 0) {
+		torture_fail(tctx, "fork failed");
+	}
+
+	if (child == 0) {
+		ssize_t nread;
+		char c;
+
+		ret = close(fds[1]);
+		torture_assert(tctx, ret == 0, "close failed");
+
+		msg_ctx = imessaging_init(tctx, tctx->lp_ctx,
+					  cluster_id(getpid(), 0),
+					  tctx->ev);
+		torture_assert(tctx, msg_ctx != NULL,
+			       "imessaging_init failed");
+
+		do {
+			nread = read(fds[0], &c, 1);
+		} while ((nread == -1) && (errno == EINTR));
+
+		exit(0);
+	}
+
+	msg_ctx = imessaging_init(tctx, tctx->lp_ctx, cluster_id(getpid(), 0),
+				  tctx->ev);
+	torture_assert(tctx, msg_ctx != NULL, "imessaging_init failed");
+
+	for (i=0; i<1000; i++) {
+		NTSTATUS status;
+		status = imessaging_send(msg_ctx, cluster_id(child, 0),
+					 MSG_PING, NULL);
+		torture_assert_ntstatus_ok(tctx, status,
+					   "imessaging_send failed");
+	}
+
+	tevent_loop_once(tctx->ev);
+
+	talloc_free(msg_ctx);
+
+	ret = close(fds[1]);
+	torture_assert(tctx, ret == 0, "close failed");
+
+	ret = waitpid(child, &child_status, 0);
+	torture_assert(tctx, ret == child, "wrong child exited");
+	torture_assert(tctx, child_status == 0, "child failed");
+
+	poll(NULL, 0, 500);
+
+	return true;
+}
+
 struct torture_suite *torture_local_messaging(TALLOC_CTX *mem_ctx)
 {
 	struct torture_suite *s = torture_suite_create(mem_ctx, "messaging");
+	torture_suite_add_simple_test(s, "overflow", test_messaging_overflow);
 	torture_suite_add_simple_test(s, "ping_speed", test_ping_speed);
 	return s;
 }
-- 
2.7.4


>From 201252754ab2234f5aae5c881f253255143e10ee Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 23 Sep 2016 19:06:56 -0700
Subject: [PATCH 17/26] lib: Add messaging_rec_create

Essentially a wrapper around messaging_rec_dup

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages.c | 49 +++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 49 insertions(+)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 3e11cc5..a3695e9 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -83,6 +83,8 @@ struct messaging_context {
 	struct server_id_db *names_db;
 };
 
+static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
+					       struct messaging_rec *rec);
 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 				   struct messaging_rec *rec);
 
@@ -105,6 +107,53 @@ static void ping_message(struct messaging_context *msg_ctx,
 	messaging_send(msg_ctx, src, MSG_PONG, data);
 }
 
+static struct messaging_rec *messaging_rec_create(
+	TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
+	uint32_t msg_type, const struct iovec *iov, int iovlen,
+	const int *fds, size_t num_fds)
+{
+	ssize_t buflen;
+	uint8_t *buf;
+	struct messaging_rec *result;
+
+	if (num_fds > INT8_MAX) {
+		return NULL;
+	}
+
+	buflen = iov_buflen(iov, iovlen);
+	if (buflen == -1) {
+		return NULL;
+	}
+	buf = talloc_array(mem_ctx, uint8_t, buflen);
+	if (buf == NULL) {
+		return NULL;
+	}
+	iov_buf(iov, iovlen, buf, buflen);
+
+	{
+		struct messaging_rec rec;
+		int64_t fds64[num_fds];
+		size_t i;
+
+		for (i=0; i<num_fds; i++) {
+			fds64[i] = fds[i];
+		}
+
+		rec = (struct messaging_rec) {
+			.msg_version = MESSAGE_VERSION, .msg_type = msg_type,
+			.src = src, .dest = dst,
+			.buf.data = buf, .buf.length = buflen,
+			.num_fds = num_fds, .fds = fds64,
+		};
+
+		result = messaging_rec_dup(mem_ctx, &rec);
+	}
+
+	TALLOC_FREE(buf);
+
+	return result;
+}
+
 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
 			      int *fds, size_t num_fds,
 			      void *private_data)
-- 
2.7.4


>From ed9030c6f555d942358f7d6eccce7c26a91f98b1 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 25 Jul 2016 16:31:18 +0200
Subject: [PATCH 18/26] messaging: Optimize self-sends

We need to go through the event loop, which messaging_dgm_send does. We can
also use a tevent_immediate for the same purpose. Right now the main user is
messaging_ctdb: Here strace looks a bit weird when we receive a message.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages.c | 58 ++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 58 insertions(+)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index a3695e9..ae06243 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -479,6 +479,58 @@ NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
 	return messaging_send(msg_ctx, server, msg_type, &blob);
 }
 
+struct messaging_post_state {
+	struct messaging_context *msg_ctx;
+	struct messaging_rec *rec;
+};
+
+static void messaging_post_handler(struct tevent_context *ev,
+				   struct tevent_immediate *ti,
+				   void *private_data);
+
+static int messaging_post_self(struct messaging_context *msg_ctx,
+			       struct server_id src, struct server_id dst,
+			       uint32_t msg_type,
+			       const struct iovec *iov, int iovlen,
+			       const int *fds, size_t num_fds)
+{
+	struct tevent_immediate *ti;
+	struct messaging_post_state *state;
+
+	ti = tevent_create_immediate(msg_ctx);
+	if (ti == NULL) {
+		return ENOMEM;
+	}
+	state = talloc(ti, struct messaging_post_state);
+	if (state == NULL) {
+		goto fail;
+	}
+	state->msg_ctx = msg_ctx;
+
+	state->rec = messaging_rec_create(
+		state, src, dst, msg_type, iov, iovlen, fds, num_fds);
+	if (state->rec == NULL) {
+		goto fail;
+	}
+
+	tevent_schedule_immediate(ti, msg_ctx->event_ctx,
+				  messaging_post_handler, state);
+	return 0;
+
+fail:
+	TALLOC_FREE(ti);
+	return ENOMEM;
+}
+
+static void messaging_post_handler(struct tevent_context *ev,
+				   struct tevent_immediate *ti,
+				   void *private_data)
+{
+	struct messaging_post_state *state = talloc_get_type_abort(
+		private_data, struct messaging_post_state);
+	messaging_dispatch_rec(state->msg_ctx, state->rec);
+}
+
 int messaging_send_iov_from(struct messaging_context *msg_ctx,
 			    struct server_id src, struct server_id dst,
 			    uint32_t msg_type,
@@ -509,6 +561,12 @@ int messaging_send_iov_from(struct messaging_context *msg_ctx,
 		return ret;
 	}
 
+	if (server_id_equal(&dst, &msg_ctx->id)) {
+		ret = messaging_post_self(msg_ctx, src, dst, msg_type,
+					  iov, iovlen, fds, num_fds);
+		return ret;
+	}
+
 	message_hdr_put(hdr, msg_type, src, dst);
 	iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
 	memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
-- 
2.7.4


>From 61372bc07062abbf081f171a992f51ce00705941 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 12 Sep 2016 13:02:26 +0200
Subject: [PATCH 19/26] tevent: Add tevent_req_reset_endtime

We might decide at some point that we don't want a request to
time out

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 lib/tevent/ABI/tevent-0.9.30.sigs | 1 +
 lib/tevent/tevent.h               | 7 +++++++
 lib/tevent/tevent_req.c           | 5 +++++
 3 files changed, 13 insertions(+)

diff --git a/lib/tevent/ABI/tevent-0.9.30.sigs b/lib/tevent/ABI/tevent-0.9.30.sigs
index ea179a0..7a6a236 100644
--- a/lib/tevent/ABI/tevent-0.9.30.sigs
+++ b/lib/tevent/ABI/tevent-0.9.30.sigs
@@ -69,6 +69,7 @@ tevent_req_poll: bool (struct tevent_req *, struct tevent_context *)
 tevent_req_post: struct tevent_req *(struct tevent_req *, struct tevent_context *)
 tevent_req_print: char *(TALLOC_CTX *, struct tevent_req *)
 tevent_req_received: void (struct tevent_req *)
+tevent_req_reset_endtime: void (struct tevent_req *)
 tevent_req_set_callback: void (struct tevent_req *, tevent_req_fn, void *)
 tevent_req_set_cancel_fn: void (struct tevent_req *, tevent_req_cancel_fn)
 tevent_req_set_cleanup_fn: void (struct tevent_req *, tevent_req_cleanup_fn)
diff --git a/lib/tevent/tevent.h b/lib/tevent/tevent.h
index bb23257..ba4bb4d 100644
--- a/lib/tevent/tevent.h
+++ b/lib/tevent/tevent.h
@@ -1040,6 +1040,13 @@ bool tevent_req_set_endtime(struct tevent_req *req,
 			    struct tevent_context *ev,
 			    struct timeval endtime);
 
+/**
+ * @brief Reset the timer set by tevent_req_set_endtime.
+ *
+ * @param[in]  req      The request to reset the timeout for
+ */
+void tevent_req_reset_endtime(struct tevent_req *req);
+
 #ifdef DOXYGEN
 /**
  * @brief Call the notify callback of the given tevent request manually.
diff --git a/lib/tevent/tevent_req.c b/lib/tevent/tevent_req.c
index e2b7104..e309c3d 100644
--- a/lib/tevent/tevent_req.c
+++ b/lib/tevent/tevent_req.c
@@ -313,6 +313,11 @@ bool tevent_req_set_endtime(struct tevent_req *req,
 	return true;
 }
 
+void tevent_req_reset_endtime(struct tevent_req *req)
+{
+	TALLOC_FREE(req->internal.timer);
+}
+
 void tevent_req_set_callback(struct tevent_req *req, tevent_req_fn fn, void *pvt)
 {
 	req->async.fn = fn;
-- 
2.7.4


>From 11bb60be6325f459b422dd77b5bad1d418c86519 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 12 Sep 2016 14:12:10 +0200
Subject: [PATCH 20/26] messages_dgm: Drop a segment if we can't ship it for 60
 seconds

---
 source3/lib/messages_dgm.c | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index 5f82168..89ccce9 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -455,6 +455,8 @@ static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
 	struct messaging_dgm_out_queue_state *state = tevent_req_data(
 		req, struct messaging_dgm_out_queue_state);
 
+	tevent_req_reset_endtime(req);
+
 	state->subreq = pthreadpool_tevent_job_send(
 		state, state->ev, state->pool,
 		messaging_dgm_out_threaded_job, state);
@@ -518,6 +520,7 @@ static int messaging_dgm_out_send_fragment(
 {
 	struct tevent_req *req;
 	size_t qlen;
+	bool ok;
 
 	qlen = tevent_queue_length(out->queue);
 	if (qlen == 0) {
@@ -550,6 +553,13 @@ static int messaging_dgm_out_send_fragment(
 	}
 	tevent_req_set_callback(req, messaging_dgm_out_sent_fragment, out);
 
+	ok = tevent_req_set_endtime(req, ev,
+				    tevent_timeval_current_ofs(60, 0));
+	if (!ok) {
+		TALLOC_FREE(req);
+		return ENOMEM;
+	}
+
 	return 0;
 }
 
-- 
2.7.4


>From 67d07d53d4d9dc11655f4e4d3f5e4efbefe1bb4e Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 23 Sep 2016 17:07:20 -0700
Subject: [PATCH 21/26] messages_dgm: Pass down event_ctx one level

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages_dgm.c | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index 89ccce9..070cac6 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -983,6 +983,7 @@ static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
 }
 
 static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
+			       struct tevent_context *ev,
 			       uint8_t *msg, size_t msg_len,
 			       int *fds, size_t num_fds);
 
@@ -1049,7 +1050,7 @@ static void messaging_dgm_read_handler(struct tevent_context *ev,
 			}
 		}
 
-		messaging_dgm_recv(ctx, buf, received, fds, num_fds);
+		messaging_dgm_recv(ctx, ev, buf, received, fds, num_fds);
 	}
 
 }
@@ -1061,6 +1062,7 @@ static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
 }
 
 static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
+			       struct tevent_context *ev,
 			       uint8_t *buf, size_t buflen,
 			       int *fds, size_t num_fds)
 {
-- 
2.7.4


>From b45c3093b58e4beee2a79bd7159626987c33d109 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 23 Sep 2016 18:36:15 -0700
Subject: [PATCH 22/26] messages_dgm: Pass receiving "ev" to recv_cb

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages_dgm.c     | 10 ++++++----
 source3/lib/messages_dgm.h     |  3 ++-
 source3/lib/messages_dgm_ref.c |  6 ++++--
 3 files changed, 12 insertions(+), 7 deletions(-)

diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index 070cac6..8c632b9 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -78,7 +78,8 @@ struct messaging_dgm_context {
 	struct tevent_fd *read_fde;
 	struct messaging_dgm_in_msg *in_msgs;
 
-	void (*recv_cb)(const uint8_t *msg,
+	void (*recv_cb)(struct tevent_context *ev,
+			const uint8_t *msg,
 			size_t msg_len,
 			int *fds,
 			size_t num_fds,
@@ -823,7 +824,8 @@ int messaging_dgm_init(struct tevent_context *ev,
 		       uint64_t *punique,
 		       const char *socket_dir,
 		       const char *lockfile_dir,
-		       void (*recv_cb)(const uint8_t *msg,
+		       void (*recv_cb)(struct tevent_context *ev,
+				       const uint8_t *msg,
 				       size_t msg_len,
 				       int *fds,
 				       size_t num_fds,
@@ -1079,7 +1081,7 @@ static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
 	buflen -= sizeof(cookie);
 
 	if (cookie == 0) {
-		ctx->recv_cb(buf, buflen, fds, num_fds,
+		ctx->recv_cb(ev, buf, buflen, fds, num_fds,
 			     ctx->recv_cb_private_data);
 		return;
 	}
@@ -1142,7 +1144,7 @@ static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
 	DLIST_REMOVE(ctx->in_msgs, msg);
 	talloc_set_destructor(msg, NULL);
 
-	ctx->recv_cb(msg->buf, msg->msglen, fds, num_fds,
+	ctx->recv_cb(ev, msg->buf, msg->msglen, fds, num_fds,
 		     ctx->recv_cb_private_data);
 
 	TALLOC_FREE(msg);
diff --git a/source3/lib/messages_dgm.h b/source3/lib/messages_dgm.h
index a9cbd81..7695a92 100644
--- a/source3/lib/messages_dgm.h
+++ b/source3/lib/messages_dgm.h
@@ -28,7 +28,8 @@ int messaging_dgm_init(struct tevent_context *ev,
 		       uint64_t *unique,
 		       const char *socket_dir,
 		       const char *lockfile_dir,
-		       void (*recv_cb)(const uint8_t *msg,
+		       void (*recv_cb)(struct tevent_context *ev,
+				       const uint8_t *msg,
 				       size_t msg_len,
 				       int *fds,
 				       size_t num_fds,
diff --git a/source3/lib/messages_dgm_ref.c b/source3/lib/messages_dgm_ref.c
index 3ea8b9d..7b8acf1 100644
--- a/source3/lib/messages_dgm_ref.c
+++ b/source3/lib/messages_dgm_ref.c
@@ -36,7 +36,8 @@ static pid_t dgm_pid = 0;
 static struct msg_dgm_ref *refs = NULL;
 
 static int msg_dgm_ref_destructor(struct msg_dgm_ref *r);
-static void msg_dgm_ref_recv(const uint8_t *msg, size_t msg_len,
+static void msg_dgm_ref_recv(struct tevent_context *ev,
+			     const uint8_t *msg, size_t msg_len,
 			     int *fds, size_t num_fds, void *private_data);
 
 void *messaging_dgm_ref(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
@@ -114,7 +115,8 @@ void *messaging_dgm_ref(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
 	return result;
 }
 
-static void msg_dgm_ref_recv(const uint8_t *msg, size_t msg_len,
+static void msg_dgm_ref_recv(struct tevent_context *ev,
+			     const uint8_t *msg, size_t msg_len,
 			     int *fds, size_t num_fds, void *private_data)
 {
 	struct msg_dgm_ref *r, *next;
-- 
2.7.4


>From 85226b8e277c190a02110f148e19daae6e24b2d2 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 23 Sep 2016 18:36:15 -0700
Subject: [PATCH 23/26] messages_dgm_ref: Pass receiving "ev" to recv_cb

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages.c            | 3 ++-
 source3/lib/messages_dgm_ref.c    | 8 +++++---
 source3/lib/messages_dgm_ref.h    | 3 ++-
 source4/lib/messaging/messaging.c | 6 ++++--
 4 files changed, 13 insertions(+), 7 deletions(-)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index ae06243..96d106a 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -154,7 +154,8 @@ static struct messaging_rec *messaging_rec_create(
 	return result;
 }
 
-static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
+static void messaging_recv_cb(struct tevent_context *ev,
+			      const uint8_t *msg, size_t msg_len,
 			      int *fds, size_t num_fds,
 			      void *private_data)
 {
diff --git a/source3/lib/messages_dgm_ref.c b/source3/lib/messages_dgm_ref.c
index 7b8acf1..00a3a66 100644
--- a/source3/lib/messages_dgm_ref.c
+++ b/source3/lib/messages_dgm_ref.c
@@ -27,7 +27,8 @@
 struct msg_dgm_ref {
 	struct msg_dgm_ref *prev, *next;
 	void *tevent_handle;
-	void (*recv_cb)(const uint8_t *msg, size_t msg_len,
+	void (*recv_cb)(struct tevent_context *ev,
+			const uint8_t *msg, size_t msg_len,
 			int *fds, size_t num_fds, void *private_data);
 	void *recv_cb_private_data;
 };
@@ -44,7 +45,8 @@ void *messaging_dgm_ref(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
 			uint64_t *unique,
 			const char *socket_dir,
 			const char *lockfile_dir,
-			void (*recv_cb)(const uint8_t *msg, size_t msg_len,
+			void (*recv_cb)(struct tevent_context *ev,
+					const uint8_t *msg, size_t msg_len,
 					int *fds, size_t num_fds,
 					void *private_data),
 			void *recv_cb_private_data,
@@ -127,7 +129,7 @@ static void msg_dgm_ref_recv(struct tevent_context *ev,
 	 */
 	for (r = refs; r != NULL; r = next) {
 		next = r->next;
-		r->recv_cb(msg, msg_len, fds, num_fds,
+		r->recv_cb(ev, msg, msg_len, fds, num_fds,
 			   r->recv_cb_private_data);
 	}
 }
diff --git a/source3/lib/messages_dgm_ref.h b/source3/lib/messages_dgm_ref.h
index 8f0aff8..cd77101 100644
--- a/source3/lib/messages_dgm_ref.h
+++ b/source3/lib/messages_dgm_ref.h
@@ -28,7 +28,8 @@ void *messaging_dgm_ref(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
 			uint64_t *unique,
 			const char *socket_dir,
 			const char *lockfile_dir,
-			void (*recv_cb)(const uint8_t *msg, size_t msg_len,
+			void (*recv_cb)(struct tevent_context *ev,
+					const uint8_t *msg, size_t msg_len,
 					int *fds, size_t num_fds,
 					void *private_data),
 			void *recv_cb_private_data,
diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
index c0b64be..950a685 100644
--- a/source4/lib/messaging/messaging.c
+++ b/source4/lib/messaging/messaging.c
@@ -294,7 +294,8 @@ int imessaging_cleanup(struct imessaging_context *msg)
 	return 0;
 }
 
-static void imessaging_dgm_recv(const uint8_t *buf, size_t buf_len,
+static void imessaging_dgm_recv(struct tevent_context *ev,
+				const uint8_t *buf, size_t buf_len,
 				int *fds, size_t num_fds,
 				void *private_data);
 
@@ -415,7 +416,8 @@ fail:
 	return NULL;
 }
 
-static void imessaging_dgm_recv(const uint8_t *buf, size_t buf_len,
+static void imessaging_dgm_recv(struct tevent_context *ev,
+				const uint8_t *buf, size_t buf_len,
 				int *fds, size_t num_fds,
 				void *private_data)
 {
-- 
2.7.4


>From a838b383d0fffff7ada8c08d463e279be16488ac Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 23 Sep 2016 19:28:10 -0700
Subject: [PATCH 24/26] messaging: Pass "ev" to messaging_dispatch_rec

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages.c | 6 ++++--
 1 file changed, 4 insertions(+), 2 deletions(-)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 96d106a..299328f 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -86,6 +86,7 @@ struct messaging_context {
 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
 					       struct messaging_rec *rec);
 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
+				   struct tevent_context *ev,
 				   struct messaging_rec *rec);
 
 /****************************************************************************
@@ -199,7 +200,7 @@ static void messaging_recv_cb(struct tevent_context *ev,
 		  (unsigned)rec.msg_type, rec.buf.length, num_fds,
 		  server_id_str_buf(rec.src, &idbuf));
 
-	messaging_dispatch_rec(msg_ctx, &rec);
+	messaging_dispatch_rec(msg_ctx, ev, &rec);
 	return;
 
 close_fail:
@@ -529,7 +530,7 @@ static void messaging_post_handler(struct tevent_context *ev,
 {
 	struct messaging_post_state *state = talloc_get_type_abort(
 		private_data, struct messaging_post_state);
-	messaging_dispatch_rec(state->msg_ctx, state->rec);
+	messaging_dispatch_rec(state->msg_ctx, ev, state->rec);
 }
 
 int messaging_send_iov_from(struct messaging_context *msg_ctx,
@@ -962,6 +963,7 @@ static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
   Dispatch one messaging_rec
 */
 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
+				   struct tevent_context *ev,
 				   struct messaging_rec *rec)
 {
 	struct messaging_callback *cb, *next;
-- 
2.7.4


>From a0c32fc256edd179870409c1b980874a7e58cd74 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 23 Sep 2016 19:35:10 -0700
Subject: [PATCH 25/26] messaging: Act on messages within the right context

Only look at "classic" messaging_register handlers in the main event
context loop

If we're sitting in a nested event context, only act upon the
messaging_filtered_read ones that are registered in the nested context.

Postpone everything else via an immediate to the main tevent context

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages.c | 56 ++++++++++++++++++++++++++++++++++++++++++--------
 1 file changed, 47 insertions(+), 9 deletions(-)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 299328f..d0045d1 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -959,18 +959,14 @@ static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
 	return true;
 }
 
-/*
-  Dispatch one messaging_rec
-*/
-static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
-				   struct tevent_context *ev,
-				   struct messaging_rec *rec)
+static void messaging_dispatch_classic(struct messaging_context *msg_ctx,
+				       struct messaging_rec *rec)
 {
 	struct messaging_callback *cb, *next;
-	unsigned i;
-	size_t j;
 
 	for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
+		size_t j;
+
 		next = cb->next;
 		if (cb->msg_type != rec->msg_type) {
 			continue;
@@ -996,6 +992,21 @@ static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 		 * message type
 		 */
 	}
+}
+
+/*
+  Dispatch one messaging_rec
+*/
+static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
+				   struct tevent_context *ev,
+				   struct messaging_rec *rec)
+{
+	unsigned i;
+	size_t j;
+
+	if (ev == msg_ctx->event_ctx) {
+		messaging_dispatch_classic(msg_ctx, rec);
+	}
 
 	if (!messaging_append_new_waiters(msg_ctx)) {
 		for (j=0; j < rec->num_fds; j++) {
@@ -1032,7 +1043,8 @@ static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 
 		state = tevent_req_data(
 			req, struct messaging_filtered_read_state);
-		if (state->filter(rec, state->private_data)) {
+		if ((ev == state->ev) &&
+		    state->filter(rec, state->private_data)) {
 			messaging_filtered_read_done(req, rec);
 
 			/*
@@ -1045,6 +1057,32 @@ static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 		i += 1;
 	}
 
+	if (ev != msg_ctx->event_ctx) {
+		struct iovec iov;
+		int fds[rec->num_fds];
+		int ret;
+
+		/*
+		 * We've been listening on a nested event
+		 * context. Messages need to be handled in the main
+		 * event context, so post to ourselves
+		 */
+
+		iov.iov_base = rec->buf.data;
+		iov.iov_len = rec->buf.length;
+
+		for (i=0; i<rec->num_fds; i++) {
+			fds[i] = rec->fds[i];
+		}
+
+		ret = messaging_post_self(
+			msg_ctx, rec->src, rec->dest, rec->msg_type,
+			&iov, 1, fds, rec->num_fds);
+		if (ret == 0) {
+			return;
+		}
+	}
+
 	/*
 	 * If the fd-array isn't used, just close it.
 	 */
-- 
2.7.4


>From d83b19f6c170abab7cd7b55b0286868d141f085f Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 28 Sep 2016 16:05:25 -0700
Subject: [PATCH 26/26] messaging: Postpone messages to the right tevent
 context

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source4/lib/messaging/messaging.c | 63 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 63 insertions(+)

diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
index 950a685..aca1a38 100644
--- a/source4/lib/messaging/messaging.c
+++ b/source4/lib/messaging/messaging.c
@@ -56,6 +56,7 @@ struct irpc_request {
 
 struct imessaging_context {
 	struct imessaging_context *prev, *next;
+	struct tevent_context *ev;
 	struct server_id server_id;
 	const char *sock_dir;
 	const char *lock_dir;
@@ -347,6 +348,7 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
 	if (msg == NULL) {
 		return NULL;
 	}
+	msg->ev = ev;
 
 	talloc_set_destructor(msg, imessaging_context_destructor);
 
@@ -416,6 +418,50 @@ fail:
 	return NULL;
 }
 
+struct imessaging_post_state {
+	struct imessaging_context *msg_ctx;
+	size_t buf_len;
+	uint8_t buf[];
+};
+
+static void imessaging_post_handler(struct tevent_context *ev,
+				    struct tevent_immediate *ti,
+				    void *private_data)
+{
+	struct imessaging_post_state *state = talloc_get_type_abort(
+		private_data, struct imessaging_post_state);
+	imessaging_dgm_recv(ev, state->buf, state->buf_len, NULL, 0,
+			    state->msg_ctx);
+}
+
+static int imessaging_post_self(struct imessaging_context *msg,
+				const uint8_t *buf, size_t buf_len)
+{
+	struct tevent_immediate *ti;
+	struct imessaging_post_state *state;
+
+	ti = tevent_create_immediate(msg);
+	if (ti == NULL) {
+		return ENOMEM;
+	}
+
+	state = talloc_size(
+		ti, offsetof(struct imessaging_post_state, buf) + buf_len);
+	if (state == NULL) {
+		TALLOC_FREE(ti);
+		return ENOMEM;
+	}
+	talloc_set_name_const(state, "struct imessaging_post_state");
+
+	state->msg_ctx = msg;
+	memcpy(state->buf, buf, buf_len);
+
+	tevent_schedule_immediate(ti, msg->ev, imessaging_post_handler,
+				  msg);
+
+	return 0;
+}
+
 static void imessaging_dgm_recv(struct tevent_context *ev,
 				const uint8_t *buf, size_t buf_len,
 				int *fds, size_t num_fds,
@@ -433,6 +479,23 @@ static void imessaging_dgm_recv(struct tevent_context *ev,
 		return;
 	}
 
+	if (num_fds != 0) {
+		/*
+		 * Source4 based messaging does not expect fd's yet
+		 */
+		return;
+	}
+
+	if (ev != msg->ev) {
+		int ret;
+		ret = imessaging_post_self(msg, buf, buf_len);
+		if (ret != 0) {
+			DBG_WARNING("imessaging_post_self failed: %s\n",
+				    strerror(ret));
+		}
+		return;
+	}
+
 	message_hdr_get(&msg_type, &src, &dst, buf);
 
 	data.data = discard_const_p(uint8_t, buf + MESSAGE_HDR_LENGTH);
-- 
2.7.4



More information about the samba-technical mailing list