[PATCH] Unix datagram socket messaging

Jeremy Allison jra at samba.org
Tue Apr 22 11:44:50 MDT 2014


On Tue, Apr 22, 2014 at 05:52:12PM +0200, Stefan (metze) Metzmacher wrote:

OK - latest patchset with the following changes:

> My idea was to pass 'struct tevent_context *ev' to make it clear that
> this has to be this specific type.

Fixed.

> There's a 2nd socket(AF_UNIX, SOCK_DGRAM, 0); in
> unix_dgram_send_queue_init()
> which needs to set just FD_CLOEXEC.

Fixed. I split the prepare_socket function
into two, one to set O_NONBLOCK, one to
set CLOEXEC to improve code reuse.

> One of the tests explicitly passes path==NULL to unix_msg_init()
> and then unix_dgram_init().
>
> Maybe the better fix would be only call
> ctx->created_pid = getpid(); when we call bind()
> and set it to -1 otherwise.
>
> Then 'if (getpid() == ctx->created_pid) {' would do the correct thing.

I restored Volker's path == NULL handling
and implemented your requested change.

Although I'm not keen on the explicit allowing
of path==NULL, after looking at the test I
do see the use of it, so fixed it as you
recommended.

Let me know if this is OK to go in.

Cheers,

        Jeremy
-------------- next part --------------
From 261b19590b657fb92dd9fc5a0a3c075e5acaae38 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 24 Feb 2014 11:43:51 +0000
Subject: [PATCH 01/24] lib: Add poll_funcs

This is an abstraction of a tevent loop. It will be used in low-level
messaging, with the goal of making our low-level messaging routines
usable also by other projects that are not based on tevent.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/lib/poll_funcs/poll_funcs.h        | 131 ++++++++++++++++++++++++++
 source3/lib/poll_funcs/poll_funcs_tevent.c | 143 +++++++++++++++++++++++++++++
 source3/lib/poll_funcs/poll_funcs_tevent.h |  27 ++++++
 source3/lib/poll_funcs/wscript_build       |   5 +
 source3/wscript_build                      |   1 +
 5 files changed, 307 insertions(+)
 create mode 100644 source3/lib/poll_funcs/poll_funcs.h
 create mode 100644 source3/lib/poll_funcs/poll_funcs_tevent.c
 create mode 100644 source3/lib/poll_funcs/poll_funcs_tevent.h
 create mode 100644 source3/lib/poll_funcs/wscript_build

diff --git a/source3/lib/poll_funcs/poll_funcs.h b/source3/lib/poll_funcs/poll_funcs.h
new file mode 100644
index 0000000..b23e7d9
--- /dev/null
+++ b/source3/lib/poll_funcs/poll_funcs.h
@@ -0,0 +1,131 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Copyright (C) Volker Lendecke 2013
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/**
+ * @file poll_funcs.h
+ *
+ * @brief event loop abstraction
+ */
+
+/*
+ * This is inspired by AvahiWatch, the avahi event loop abstraction.
+ */
+
+#ifndef __POLL_FUNCS_H__
+#define __POLL_FUNCS_H__
+
+#include "replace.h"
+
+/**
+ * poll_watch and poll_timeout are opaque here: every implementation
+ * defines its own structures.
+ */
+
+struct poll_watch;
+struct poll_timeout;
+
+struct poll_funcs {
+
+	/**
+	 * @brief Create a new file descriptor watch
+	 *
+	 * @param[in] funcs The callback array
+	 * @param[in] fd The fd to watch
+	 * @param[in] events POLLIN and POLLOUT or'ed together
+	 * @param[in] callback Function the implementation calls on fd events
+	 * @param[in] private_data Pointer to give back to callback
+	 *
+	 * @return A new poll_watch struct, NULL on failure
+	 */
+
+	struct poll_watch *(*watch_new)(
+		const struct poll_funcs *funcs, int fd, short events,
+		void (*callback)(struct poll_watch *w, int fd,
+				 short events, void *private_data),
+		void *private_data);
+
+	/**
+	 * @brief Change the watched events for a struct poll_watch
+	 *
+	 * @param[in] w The poll_watch to change
+	 * @param[in] events new POLLIN and POLLOUT or'ed together (0: none)
+	 */
+
+	void (*watch_update)(struct poll_watch *w, short events);
+
+	/**
+	 * @brief Read events currently watched
+	 *
+	 * @param[in] w The poll_watch to inspect
+	 *
+	 * @returns The events currently watched
+	 */
+
+	short (*watch_get_events)(struct poll_watch *w);
+
+	/**
+	 * @brief Free a struct poll_watch
+	 *
+	 * @param[in] w The poll_watch struct to free
+	 */
+
+	void (*watch_free)(struct poll_watch *w);
+
+
+	/**
+	 * @brief Create a new timeout watch
+	 *
+	 * @param[in] funcs The callback array
+	 * @param[in] tv The time when the timeout should trigger
+	 * @param[in] callback Function to call at time "tv"
+	 * @param[in] private_data Pointer to give back to callback
+	 *
+	 * @return A new poll_timeout struct, NULL on failure
+	 */
+
+	struct poll_timeout *(*timeout_new)(
+		const struct poll_funcs *funcs, const struct timeval *tv,
+		void (*callback)(struct poll_timeout *t, void *private_data),
+		void *private_data);
+
+	/**
+	 * @brief Change the trigger time of a timeout watch
+	 *
+	 * @param[in] t The timeout watch to change
+	 * @param[in] ts The new trigger time (a timespec, unlike timeout_new)
+	 */
+
+	void (*timeout_update)(struct poll_timeout *t,
+			       const struct timespec *ts);
+
+	/**
+	 * @brief Free a poll_timeout
+	 *
+	 * @param[in] t The poll_timeout to free
+	 */
+
+	void (*timeout_free)(struct poll_timeout *t);
+
+	/**
+	 * @brief private data for use by the implementation
+	 */
+
+	void *private_data;
+};
+
+#endif
diff --git a/source3/lib/poll_funcs/poll_funcs_tevent.c b/source3/lib/poll_funcs/poll_funcs_tevent.c
new file mode 100644
index 0000000..6e75042
--- /dev/null
+++ b/source3/lib/poll_funcs/poll_funcs_tevent.c
@@ -0,0 +1,143 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Copyright (C) Volker Lendecke 2013
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "poll_funcs_tevent.h"
+#include "tevent.h"
+#include "system/select.h"
+
+struct poll_watch {		/* tevent-backed fd watch */
+	struct tevent_fd *fde;	/* talloc child of this struct */
+	int fd;			/* handed back to callback */
+	void (*callback)(struct poll_watch *w, int fd, short events,
+			 void *private_data);
+	void *private_data;	/* handed back to callback */
+};
+
+/* Translate POLLIN/POLLOUT into TEVENT_FD_READ/TEVENT_FD_WRITE. */
+static uint16_t poll_events_to_tevent(short events)
+{
+	uint16_t flags = 0;
+
+	flags |= (events & POLLIN) ? TEVENT_FD_READ : 0;
+	flags |= (events & POLLOUT) ? TEVENT_FD_WRITE : 0;
+
+	/* Any other poll bits have no tevent equivalent and are dropped. */
+
+	return flags;
+}
+
+/* Translate TEVENT_FD_READ/TEVENT_FD_WRITE back into POLLIN/POLLOUT. */
+static short tevent_to_poll_events(uint16_t flags)
+{
+	short events = 0;
+
+	events |= (flags & TEVENT_FD_READ) ? POLLIN : 0;
+	events |= (flags & TEVENT_FD_WRITE) ? POLLOUT : 0;
+
+	/* Other tevent flag bits are not reported to poll_funcs users. */
+
+	return events;
+}
+
+static void tevent_watch_handler(struct tevent_context *ev,
+				 struct tevent_fd *fde, uint16_t flags,
+				 void *private_data);
+
+static struct poll_watch *tevent_watch_new(
+	const struct poll_funcs *funcs, int fd, short events,
+	void (*callback)(struct poll_watch *w, int fd, short events,
+			 void *private_data),
+	void *private_data)
+{
+	struct tevent_context *ev = talloc_get_type_abort(
+		funcs->private_data, struct tevent_context);
+	struct poll_watch *watch = talloc(ev, struct poll_watch);
+
+	if (watch == NULL) {
+		return NULL;
+	}
+	watch->fd = fd;
+	watch->callback = callback;
+	watch->private_data = private_data;
+
+	/* fde is a talloc child of watch; TALLOC_FREE() also NULLs watch. */
+	watch->fde = tevent_add_fd(ev, watch, fd, poll_events_to_tevent(events),
+				   tevent_watch_handler, watch);
+	if (watch->fde == NULL) {
+		TALLOC_FREE(watch);
+	}
+	return watch;
+}
+
+static void tevent_watch_handler(struct tevent_context *ev,
+				 struct tevent_fd *fde, uint16_t flags,
+				 void *private_data)
+{
+	struct poll_watch *watch = talloc_get_type_abort(
+		private_data, struct poll_watch);
+	short events = tevent_to_poll_events(flags);
+
+	watch->callback(watch, watch->fd, events, watch->private_data);
+}
+
+static void tevent_watch_update(struct poll_watch *watch, short events)
+{
+	tevent_fd_set_flags(watch->fde, poll_events_to_tevent(events));
+}
+
+static short tevent_watch_get_events(struct poll_watch *watch)
+{
+	return tevent_to_poll_events(tevent_fd_get_flags(watch->fde));
+}
+
+static void tevent_watch_free(struct poll_watch *watch)
+{
+	TALLOC_FREE(watch);	/* also releases watch->fde, a talloc child */
+}
+
+static struct poll_timeout *tevent_timeout_new(
+	const struct poll_funcs *funcs, const struct timeval *tv,
+	void (*callback)(struct poll_timeout *t, void *private_data),
+	void *private_data)
+{
+	/* Timeouts are not implemented for tevent yet: always fail. */
+	return NULL;
+}
+
+static void tevent_timeout_update(struct poll_timeout *t,
+				  const struct timespec *ts)
+{
+	/* Nothing to update: tevent_timeout_new() never creates one. */
+}
+
+static void tevent_timeout_free(struct poll_timeout *t)
+{
+	/* Nothing to free: tevent_timeout_new() never creates one. */
+}
+
+void poll_funcs_init_tevent(struct poll_funcs *f, struct tevent_context *ev)
+{
+	*f = (struct poll_funcs) {
+		.watch_new = tevent_watch_new, .watch_free = tevent_watch_free,
+		.watch_update = tevent_watch_update,
+		.watch_get_events = tevent_watch_get_events,
+		.timeout_new = tevent_timeout_new,
+		.timeout_update = tevent_timeout_update,
+		.timeout_free = tevent_timeout_free, .private_data = ev,
+	};
+}
diff --git a/source3/lib/poll_funcs/poll_funcs_tevent.h b/source3/lib/poll_funcs/poll_funcs_tevent.h
new file mode 100644
index 0000000..2e67720
--- /dev/null
+++ b/source3/lib/poll_funcs/poll_funcs_tevent.h
@@ -0,0 +1,27 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Copyright (C) Volker Lendecke 2013
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __POLL_FUNCS_TEVENT_H__
+#define __POLL_FUNCS_TEVENT_H__
+
+#include "poll_funcs.h"
+#include "tevent.h"
+
+void poll_funcs_init_tevent(struct poll_funcs *f, struct tevent_context *ev);
+
+#endif
diff --git a/source3/lib/poll_funcs/wscript_build b/source3/lib/poll_funcs/wscript_build
new file mode 100644
index 0000000..ab24814
--- /dev/null
+++ b/source3/lib/poll_funcs/wscript_build
@@ -0,0 +1,5 @@
+#!/usr/bin/env python
+
+bld.SAMBA3_SUBSYSTEM('POLL_FUNCS_TEVENT',
+                     source='poll_funcs_tevent.c',
+                     deps='tevent')
diff --git a/source3/wscript_build b/source3/wscript_build
index b872cbc..fd53e2f 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -1452,6 +1452,7 @@ bld.RECURSE('auth')
 bld.RECURSE('libgpo/gpext')
 bld.RECURSE('lib/pthreadpool')
 bld.RECURSE('lib/asys')
+bld.RECURSE('lib/poll_funcs')
 bld.RECURSE('librpc')
 bld.RECURSE('librpc/idl')
 bld.RECURSE('libsmb')
-- 
1.9.1.423.g4596e3a


From a2c6bd50a9a363b3db33d54f41701cb279eb6f08 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 24 Feb 2014 11:48:16 +0000
Subject: [PATCH 02/24] lib: Add unix_msg

This is a messaging layer based on unix domain datagram sockets.

Sending to an idle socket is just one single nonblocking sendmsg call. If the
recv queue is full, we start a background thread to do a blocking call. The
source4 based imessaging uses a polling fallback. In a situation where
thousands of senders beat one single blocked socket, this will generate load on
the system due to the constant polling. This does not happen with a threaded
blocking send call.

The threaded approach has another advantage: We save become_root() calls on the
retries. The access checks are done when the blocking socket is connected, the
threaded blocking send call does not check permissions anymore.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/lib/unix_msg/test_drain.c  |  70 +++
 source3/lib/unix_msg/test_source.c |  79 ++++
 source3/lib/unix_msg/tests.c       | 225 ++++++++++
 source3/lib/unix_msg/unix_msg.c    | 863 +++++++++++++++++++++++++++++++++++++
 source3/lib/unix_msg/unix_msg.h    | 107 +++++
 source3/lib/unix_msg/wscript_build |  18 +
 source3/wscript_build              |   1 +
 7 files changed, 1363 insertions(+)
 create mode 100644 source3/lib/unix_msg/test_drain.c
 create mode 100644 source3/lib/unix_msg/test_source.c
 create mode 100644 source3/lib/unix_msg/tests.c
 create mode 100644 source3/lib/unix_msg/unix_msg.c
 create mode 100644 source3/lib/unix_msg/unix_msg.h
 create mode 100644 source3/lib/unix_msg/wscript_build

diff --git a/source3/lib/unix_msg/test_drain.c b/source3/lib/unix_msg/test_drain.c
new file mode 100644
index 0000000..6fe8c18
--- /dev/null
+++ b/source3/lib/unix_msg/test_drain.c
@@ -0,0 +1,70 @@
+#include "replace.h"
+#include "unix_msg.h"
+#include "poll_funcs/poll_funcs_tevent.h"
+#include "tevent.h"
+#include "system/select.h"
+
+struct cb_state {	/* NOTE(review): recv_cb() in this file never reads it */
+	unsigned num_received;
+	uint8_t *buf;
+	size_t buflen;
+};
+
+static void recv_cb(struct unix_msg_ctx *ctx,
+		    uint8_t *msg, size_t msg_len,
+		    void *private_data);
+
+int main(int argc, const char *argv[])
+{
+	struct tevent_context *ev;
+	struct poll_funcs funcs;
+	struct unix_msg_ctx *ctx;
+	struct cb_state state;
+	const char *sockname;
+	int ret;
+
+	/* Receive forever on argv[1], printing each unsigned payload. */
+	if (argc != 2) {
+		fprintf(stderr, "Usage: %s <sockname>\n", argv[0]);
+		return 1;
+	}
+	sockname = argv[1];
+
+	unlink(sockname);	/* remove a leftover socket file, if any */
+
+	ev = tevent_context_init(NULL);
+	if (ev == NULL) {
+		perror("tevent_context_init failed");
+		return 1;
+	}
+	poll_funcs_init_tevent(&funcs, ev);
+
+	ret = unix_msg_init(sockname, &funcs, 256, 1, recv_cb, &state,
+			    &ctx);
+	if (ret != 0) {
+		fprintf(stderr, "unix_msg_init failed: %s\n", strerror(ret));
+		return 1;
+	}
+
+	for (;;) {
+		if (tevent_loop_once(ev) == -1) {
+			fprintf(stderr, "tevent_loop_once failed: %s\n",
+				strerror(errno));
+			exit(1);
+		}
+	}
+
+	/* not reached */
+	return 0;
+}
+
+static void recv_cb(struct unix_msg_ctx *ctx,
+		    uint8_t *msg, size_t msg_len,
+		    void *private_data)
+{
+	unsigned value;
+	if (msg_len == sizeof(value)) {	/* other sizes are ignored */
+		memcpy(&value, msg, sizeof(value));
+		printf("%u\n", value);
+	}
+}
diff --git a/source3/lib/unix_msg/test_source.c b/source3/lib/unix_msg/test_source.c
new file mode 100644
index 0000000..bfafee1
--- /dev/null
+++ b/source3/lib/unix_msg/test_source.c
@@ -0,0 +1,79 @@
+#include "replace.h"
+#include "unix_msg.h"
+#include "poll_funcs/poll_funcs_tevent.h"
+#include "tevent.h"
+
+int main(int argc, const char *argv[])
+{
+	struct tevent_context *ev;
+	struct poll_funcs funcs;
+	struct unix_msg_ctx **senders;
+	struct iovec iov;
+	unsigned num_senders = 1;
+	unsigned idx;
+	int ret;
+
+	if (argc < 2) {
+		fprintf(stderr, "Usage: %s <sockname> [num_contexts]\n", argv[0]);
+		return 1;
+	}
+	if (argc > 2) {
+		num_senders = atoi(argv[2]);
+	}
+
+	ev = tevent_context_init(NULL);
+	if (ev == NULL) {
+		perror("tevent_context_init failed");
+		return 1;
+	}
+	poll_funcs_init_tevent(&funcs, ev);
+
+	senders = talloc_array(ev, struct unix_msg_ctx *, num_senders);
+	if (senders == NULL) {
+		fprintf(stderr, "talloc failed\n");
+		return 1;
+	}
+
+	/* path == NULL and no recv callback: these contexts only send. */
+	for (idx = 0; idx < num_senders; idx++) {
+		ret = unix_msg_init(NULL, &funcs, 256, 1, NULL, NULL,
+				    &senders[idx]);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_init failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+	}
+
+	/* The payload is the sending context's index, read from idx. */
+	iov = (struct iovec) { .iov_base = &idx, .iov_len = sizeof(idx) };
+
+	for (idx = 0; idx < num_senders; idx++) {
+		unsigned n;
+
+		for (n = 0; n < 100000; n++) {
+			ret = unix_msg_send(senders[idx], argv[1], &iov, 1);
+			if (ret != 0) {
+				fprintf(stderr, "unix_msg_send failed: %s\n",
+					strerror(ret));
+				return 1;
+			}
+		}
+	}
+
+	/* Run the event loop until killed so queued sends can complete. */
+	for (;;) {
+		if (tevent_loop_once(ev) == -1) {
+			fprintf(stderr, "tevent_loop_once failed: %s\n",
+				strerror(errno));
+			exit(1);
+		}
+	}
+
+	for (idx = 0; idx < num_senders; idx++) {	/* not reached */
+		unix_msg_free(senders[idx]);
+	}
+	talloc_free(ev);
+
+	return 0;
+}
diff --git a/source3/lib/unix_msg/tests.c b/source3/lib/unix_msg/tests.c
new file mode 100644
index 0000000..2a4cf86
--- /dev/null
+++ b/source3/lib/unix_msg/tests.c
@@ -0,0 +1,225 @@
+#include "replace.h"
+#include "unix_msg.h"
+#include "poll_funcs/poll_funcs_tevent.h"
+#include "tevent.h"
+
+struct cb_state {		/* expectations checked by recv_cb() */
+	unsigned num_received;	/* reset by expect_messages(), bumped by recv_cb() */
+	uint8_t *buf;		/* expected payload of every message */
+	size_t buflen;		/* expected payload length */
+};
+
+static void recv_cb(struct unix_msg_ctx *ctx,
+		    uint8_t *msg, size_t msg_len,
+		    void *private_data);
+
+static void expect_messages(struct tevent_context *ev, struct cb_state *state,
+			    unsigned num_msgs)
+{
+	/* Spin the event loop until recv_cb() has counted num_msgs. */
+	state->num_received = 0;
+	while (state->num_received < num_msgs) {
+		int rc = tevent_loop_once(ev);
+
+		if (rc != -1) {
+			continue;
+		}
+		fprintf(stderr, "tevent_loop_once failed: %s\n",
+			strerror(errno));
+		exit(1);
+	}
+}
+
+int main(void)
+{
+	struct poll_funcs funcs;
+	const char *sock1 = "sock1";
+	const char *sock2 = "sock2";
+	struct unix_msg_ctx *ctx1, *ctx2;
+	struct tevent_context *ev;
+	struct iovec iov;
+	uint8_t msg;
+	int i, ret;
+	static uint8_t buf[1755];	/* NOTE(review): > 256, presumably to force fragmentation */
+	/* Shared by ctx1 and ctx2: recv_cb() checks messages against it. */
+	struct cb_state state;
+
+	unlink(sock1);
+	unlink(sock2);
+
+	ev = tevent_context_init(NULL);
+	if (ev == NULL) {
+		perror("tevent_context_init failed");
+		return 1;
+	}
+	poll_funcs_init_tevent(&funcs, ev);
+
+	ret = unix_msg_init(sock1, &funcs, 256, 1,
+			    recv_cb, &state, &ctx1);
+	if (ret != 0) {
+		fprintf(stderr, "unix_msg_init failed: %s\n",
+			strerror(ret));
+		return 1;
+	}
+
+	ret = unix_msg_init(sock1, &funcs, 256, 1,
+			    recv_cb, &state, &ctx1);
+	if (ret == 0) {
+		fprintf(stderr, "unix_msg_init succeeded unexpectedly\n");
+		return 1;
+	}
+	if (ret != EADDRINUSE) {
+		fprintf(stderr, "unix_msg_init returned %s, expected "
+			"EADDRINUSE\n", strerror(ret));
+		return 1;
+	}
+
+	ret = unix_msg_init(sock2, &funcs, 256, 1,
+			    recv_cb, &state, &ctx2);
+	if (ret != 0) {
+		fprintf(stderr, "unix_msg_init failed: %s\n",
+			strerror(ret));
+		return 1;
+	}
+
+	printf("sending a 0-length message\n");
+
+	state.buf = NULL;
+	state.buflen = 0;
+
+	ret = unix_msg_send(ctx1, sock2, NULL, 0);
+	if (ret != 0) {
+		fprintf(stderr, "unix_msg_send failed: %s\n",
+			strerror(ret));
+		return 1;
+	}
+
+	expect_messages(ev, &state, 1);
+
+	printf("sending a small message\n");
+
+	msg = random();
+	iov.iov_base = &msg;
+	iov.iov_len = sizeof(msg);
+	state.buf = &msg;
+	state.buflen = sizeof(msg);
+
+	ret = unix_msg_send(ctx1, sock2, &iov, 1);
+	if (ret != 0) {
+		fprintf(stderr, "unix_msg_send failed: %s\n",
+			strerror(ret));
+		return 1;
+	}
+
+	expect_messages(ev, &state, 1);
+
+	printf("sending six large, interleaved messages\n");
+
+	for (i=0; i<(int)sizeof(buf); i++) {
+		buf[i] = random();
+	}
+
+	iov.iov_base = buf;
+	iov.iov_len = sizeof(buf);
+	state.buf = buf;
+	state.buflen = sizeof(buf);
+
+	for (i=0; i<3; i++) {
+		ret = unix_msg_send(ctx1, sock2, &iov, 1);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_send failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+		ret = unix_msg_send(ctx2, sock2, &iov, 1);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_send failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+	}
+
+	expect_messages(ev, &state, 6);
+
+	printf("sending a few messages in small pieces\n");
+
+	for (i = 0; i<5; i++) {
+		struct iovec iovs[20];
+		const size_t num_iovs = ARRAY_SIZE(iovs);
+		uint8_t *p = buf;
+		size_t j;
+
+		for (j=0; j<num_iovs-1; j++) {
+			size_t chunk = (random() % ((sizeof(buf) * 2) / num_iovs));
+			size_t space = (sizeof(buf) - (p - buf));
+
+			if (space == 0) {
+				break;
+			}
+
+			chunk = MIN(chunk, space);
+
+			iovs[j].iov_base = p;
+			iovs[j].iov_len = chunk;
+			p += chunk;
+		}
+
+		if (p < (buf + sizeof(buf))) {	/* last iov takes the rest */
+			iovs[j].iov_base = p;
+			iovs[j].iov_len = (sizeof(buf) - (p - buf));
+			j++;
+		}
+
+		ret = unix_msg_send(ctx1, sock1, iovs, j);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_send failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+	}
+
+	expect_messages(ev, &state, 5);
+
+	printf("Filling send queues before freeing\n");
+
+	for (i=0; i<5; i++) {
+		ret = unix_msg_send(ctx1, sock2, &iov, 1);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_send failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+		ret = unix_msg_send(ctx1, sock1, &iov, 1);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_send failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+	}
+
+	expect_messages(ev, &state, 1); /* Read just one msg */
+
+	unix_msg_free(ctx1);
+	unix_msg_free(ctx2);
+	talloc_free(ev);
+
+	return 0;
+}
+
+static void recv_cb(struct unix_msg_ctx *ctx,
+		    uint8_t *msg, size_t msg_len,
+		    void *private_data)
+{
+	struct cb_state *expected = (struct cb_state *)private_data;
+	/* Every message must match the currently expected payload. */
+	if (msg_len != expected->buflen) {
+		fprintf(stderr, "expected %u bytes, got %u\n",
+			(unsigned)expected->buflen, (unsigned)msg_len);
+		exit(1);
+	}
+	if ((msg_len > 0) && (memcmp(msg, expected->buf, msg_len) != 0)) {
+		fprintf(stderr, "message content differs\n");
+		exit(1);
+	}
+	expected->num_received++;
+}
diff --git a/source3/lib/unix_msg/unix_msg.c b/source3/lib/unix_msg/unix_msg.c
new file mode 100644
index 0000000..88b6a67
--- /dev/null
+++ b/source3/lib/unix_msg/unix_msg.c
@@ -0,0 +1,863 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Copyright (C) Volker Lendecke 2013
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "replace.h"
+#include "unix_msg.h"
+#include "system/select.h"
+#include "system/time.h"
+#include "system/network.h"
+#include "dlinklist.h"
+#include "pthreadpool/pthreadpool.h"
+#include <fcntl.h>
+
+/*
+ * This file implements two abstractions: The "unix_dgram" functions implement
+ * queueing for unix domain datagram sockets. You can send to a destination
+ * socket, and if that has no free space available, it will fall back to an
+ * anonymous socket that will poll for writability. "unix_dgram" expects the
+ * data size not to exceed the system limit.
+ *
+ * The "unix_msg" functions implement the fragmentation of large messages on
+ * top of "unix_dgram". This is what is exposed to the user of this API.
+ */
+
+struct unix_dgram_msg {		/* one datagram waiting in a send queue */
+	struct unix_dgram_msg *prev, *next;
+
+	int sock;		/* copy of the owning queue's connected socket */
+	ssize_t sent;		/* send() result, set by unix_dgram_send_job() */
+	int sys_errno;		/* NOTE(review): never written in code shown here */
+	size_t buflen;		/* payload bytes in buf */
+	uint8_t buf[1];		/* payload; allocated as offsetof(buf) + buflen */
+};
+
+struct unix_dgram_send_queue {	/* pending sends to one destination */
+	struct unix_dgram_send_queue *prev, *next;
+	struct unix_dgram_ctx *ctx;	/* owning context */
+	int sock;		/* blocking socket connect()ed to path */
+	struct unix_dgram_msg *msgs;	/* FIFO; the head is handed to the pool */
+	char path[1];		/* NUL-terminated destination; sized at malloc */
+};
+
+struct unix_dgram_ctx {	/* one datagram socket plus its send queues */
+	int sock;		/* nonblocking; bound only when path != NULL */
+	pid_t created_pid;	/* getpid() after a successful bind(), else -1 */
+	const struct poll_funcs *ev_funcs;
+	size_t max_msg;		/* size of recv_buf */
+
+	void (*recv_callback)(struct unix_dgram_ctx *ctx,
+			      uint8_t *msg, size_t msg_len,
+			      void *private_data);
+	void *private_data;
+
+	struct poll_watch *sock_read_watch;	/* NULL when unbound */
+	struct unix_dgram_send_queue *send_queues;	/* looked up by path */
+
+	struct pthreadpool *send_pool;	/* created on first queued send */
+	struct poll_watch *pool_read_watch;	/* on the pool's signal fd */
+
+	uint8_t *recv_buf;
+	char path[1];		/* bound path, or "" when unbound */
+};
+
+static ssize_t iov_buflen(const struct iovec *iov, int iovlen);
+static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
+				    void *private_data);
+
+/* Put sock into non-blocking mode; returns 0 or an errno value. */
+static int prepare_socket_nonblock(int sock)
+{
+	int flags;
+
+	/* Pick the platform's spelling of the non-blocking flag. */
+#if defined(O_NONBLOCK)
+#define FLAG_TO_SET O_NONBLOCK
+#elif defined(SYSV)
+#define FLAG_TO_SET O_NDELAY
+#else /* BSD */
+#define FLAG_TO_SET FNDELAY
+#endif
+
+	flags = fcntl(sock, F_GETFL);
+	if (flags == -1) {
+		return errno;
+	}
+	/* Preserve the other file status flags, only add non-blocking. */
+	if (fcntl(sock, F_SETFL, flags | FLAG_TO_SET) == -1) {
+		return errno;
+	}
+
+#undef FLAG_TO_SET
+	return 0;
+}
+
+/* Mark sock close-on-exec; returns 0 or an errno value. */
+static int prepare_socket_cloexec(int sock)
+{
+#ifdef FD_CLOEXEC
+	int fd_flags = fcntl(sock, F_GETFD, 0);
+
+	if (fd_flags == -1) {
+		return errno;
+	}
+	/* F_SETFD takes the descriptor flags, not the file status flags. */
+	if (fcntl(sock, F_SETFD, fd_flags | FD_CLOEXEC) == -1) {
+		return errno;
+	}
+#endif
+	/* Without FD_CLOEXEC support this is silently a no-op. */
+	return 0;
+}
+
+/* Apply both O_NONBLOCK and FD_CLOEXEC; returns the first errno hit. */
+static int prepare_socket(int sock)
+{
+	int err = prepare_socket_nonblock(sock);
+
+	if (err == 0) {
+		err = prepare_socket_cloexec(sock);
+	}
+	return err;
+}
+
+static int unix_dgram_init(const char *path, size_t max_msg,
+			   const struct poll_funcs *ev_funcs,
+			   void (*recv_callback)(struct unix_dgram_ctx *ctx,
+						 uint8_t *msg, size_t msg_len,
+						 void *private_data),
+			   void *private_data,
+			   struct unix_dgram_ctx **result)
+{
+	struct unix_dgram_ctx *ctx;
+	struct sockaddr_un addr = { 0, };
+	size_t pathlen;
+	int ret;
+
+	/*
+	 * Create a socket; when path != NULL also bind it there and watch
+	 * it for reads. path == NULL yields an unbound, send-only context.
+	 */
+	if (path != NULL) {
+		pathlen = strlen(path)+1;
+		if (pathlen > sizeof(addr.sun_path)) {
+			return ENAMETOOLONG;
+		}
+	} else {
+		pathlen = 1;
+	}
+
+	ctx = malloc(offsetof(struct unix_dgram_ctx, path) + pathlen);
+	if (ctx == NULL) {
+		return ENOMEM;
+	}
+	if (path != NULL) {
+		memcpy(ctx->path, path, pathlen);
+	} else {
+		ctx->path[0] = '\0';
+	}
+
+	ctx->recv_buf = malloc(max_msg);
+	if (ctx->recv_buf == NULL) {
+		free(ctx);
+		return ENOMEM;
+	}
+	ctx->max_msg = max_msg;
+	ctx->ev_funcs = ev_funcs;
+	ctx->recv_callback = recv_callback;
+	ctx->private_data = private_data;
+	ctx->sock_read_watch = NULL;
+	ctx->send_pool = NULL;
+	ctx->pool_read_watch = NULL;
+	ctx->send_queues = NULL;
+	ctx->created_pid = (pid_t)-1;	/* only set once we have bound */
+
+	ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+	if (ctx->sock == -1) {
+		ret = errno;
+		goto fail_free;
+	}
+
+	/* Set non-blocking and close-on-exec. */
+	ret = prepare_socket(ctx->sock);
+	if (ret != 0) {
+		goto fail_close;
+	}
+
+	if (path != NULL) {
+		addr.sun_family = AF_UNIX;
+		memcpy(addr.sun_path, path, pathlen);
+
+		ret = bind(ctx->sock, (struct sockaddr *)(void *)&addr,
+				sizeof(addr));
+		if (ret == -1) {
+			ret = errno;
+			goto fail_close;
+		}
+
+		ctx->created_pid = getpid();
+
+		ctx->sock_read_watch = ctx->ev_funcs->watch_new(
+			ctx->ev_funcs, ctx->sock, POLLIN,
+			unix_dgram_recv_handler, ctx);
+
+		if (ctx->sock_read_watch == NULL) {
+			unlink(path);	/* we created it with bind() above */
+			ret = ENOMEM;
+			goto fail_close;
+		}
+	}
+
+	*result = ctx;
+	return 0;
+
+fail_close:
+	close(ctx->sock);
+fail_free:
+	free(ctx->recv_buf);
+	free(ctx);
+	return ret;
+}
+
+static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
+				    void *private_data)
+{
+	struct unix_dgram_ctx *ctx = (struct unix_dgram_ctx *)private_data;
+	struct iovec iov = { .iov_base = ctx->recv_buf, .iov_len = ctx->max_msg };
+	struct msghdr msg = { .msg_iov = &iov, .msg_iovlen = 1 };
+	/* recvmsg(), not recv(): only msg_flags can report truncation. */
+	ssize_t received = recvmsg(fd, &msg, 0);
+	if (received == -1) {
+		if ((errno == EAGAIN) ||
+#ifdef EWOULDBLOCK
+		    (errno == EWOULDBLOCK) ||
+#endif
+		    (errno == EINTR) || (errno == ENOMEM)) {
+			/* Not really an error - just try again. */
+			return;
+		}
+		/* Problem with the socket. Set it unreadable. */
+		ctx->ev_funcs->watch_update(w, 0);
+		return;
+	}
+	if (msg.msg_flags & MSG_TRUNC) {
+		return;	/* More than max_msg, got truncated: not for us */
+	}
+	ctx->recv_callback(ctx, ctx->recv_buf, received, ctx->private_data);
+}
+
+static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
+				    void *private_data);
+
+/* Lazily create the send thread pool and watch its completion fd. */
+static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
+{
+	int err;
+
+	if (ctx->send_pool != NULL) {
+		return 0;	/* already initialized */
+	}
+
+	err = pthreadpool_init(0, &ctx->send_pool);
+	if (err != 0) {
+		return err;
+	}
+
+	/* unix_dgram_job_finished() reaps jobs when this fd is readable. */
+	ctx->pool_read_watch = ctx->ev_funcs->watch_new(
+		ctx->ev_funcs, pthreadpool_signal_fd(ctx->send_pool), POLLIN,
+		unix_dgram_job_finished, ctx);
+	if (ctx->pool_read_watch != NULL) {
+		return 0;
+	}
+
+	pthreadpool_destroy(ctx->send_pool);
+	ctx->send_pool = NULL;
+	return ENOMEM;
+}
+
+static int unix_dgram_send_queue_init(
+	struct unix_dgram_ctx *ctx, const char *path,
+	struct unix_dgram_send_queue **result)
+{
+	struct unix_dgram_send_queue *q;
+	struct sockaddr_un addr = { 0, };
+	size_t pathlen = strlen(path)+1;	/* includes the trailing NUL */
+	int ret, err;
+
+	/* Create a connected, blocking send queue to "path" within ctx. */
+	if (pathlen > sizeof(addr.sun_path)) {
+		return ENAMETOOLONG;
+	}
+
+	q = malloc(offsetof(struct unix_dgram_send_queue, path) + pathlen);
+	if (q == NULL) {
+		return ENOMEM;
+	}
+	q->ctx = ctx;
+	q->msgs = NULL;
+	memcpy(q->path, path, pathlen);
+
+	q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+	if (q->sock == -1) {
+		err = errno;
+		goto fail_free;
+	}
+
+	/* Only CLOEXEC: the pool thread does blocking send()s on q->sock. */
+	err = prepare_socket_cloexec(q->sock);
+	if (err != 0) {
+		goto fail_close;
+	}
+
+	addr.sun_family = AF_UNIX;
+	memcpy(addr.sun_path, path, pathlen);	/* NUL included, fits sun_path */
+	/* Access checks happen at connect() time, not on each send(). */
+	do {
+		ret = connect(q->sock, (struct sockaddr *)&addr, sizeof(addr));
+	} while ((ret == -1) && (errno == EINTR));
+
+	if (ret == -1) {
+		err = errno;
+		goto fail_close;
+	}
+
+	err = unix_dgram_init_pthreadpool(ctx);
+	if (err != 0) {
+		goto fail_close;
+	}
+
+	DLIST_ADD(ctx->send_queues, q);
+
+	*result = q;
+	return 0;
+
+fail_close:
+	close(q->sock);
+fail_free:
+	free(q);
+	return err;
+}
+
+static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
+{
+	struct unix_dgram_ctx *ctx = q->ctx;
+	struct unix_dgram_msg *msg;
+
+	/* Drop any messages still waiting to be sent. */
+	while ((msg = q->msgs) != NULL) {
+		DLIST_REMOVE(q->msgs, msg);
+		free(msg);
+	}
+	close(q->sock);
+	DLIST_REMOVE(ctx->send_queues, q);
+	free(q);
+}
+
+static struct unix_dgram_send_queue *find_send_queue(
+	struct unix_dgram_ctx *ctx, const char *dst_sock)
+{
+	struct unix_dgram_send_queue *q = ctx->send_queues;
+
+	/* Linear scan; NULL means no queue exists for dst_sock yet. */
+	while ((q != NULL) && (strcmp(q->path, dst_sock) != 0)) {
+		q = q->next;
+	}
+
+	return q;
+}
+
+static int queue_msg(struct unix_dgram_send_queue *q,
+		     const struct iovec *iov, int iovlen)
+{
+	struct unix_dgram_msg *msg;
+	ssize_t total;
+	size_t hdrlen = offsetof(struct unix_dgram_msg, buf);
+	size_t msglen;
+	uint8_t *p;
+	int i;
+
+	total = iov_buflen(iov, iovlen);
+	if (total == -1) {
+		return EINVAL;
+	}
+
+	/* Reject sizes where header + payload would wrap around. */
+	msglen = hdrlen + total;
+	if ((msglen < total) || (msglen < hdrlen)) {
+		return EINVAL;
+	}
+
+	msg = malloc(msglen);
+	if (msg == NULL) {
+		return ENOMEM;
+	}
+	msg->buflen = total;
+	msg->sock = q->sock;
+
+	/* Flatten the iovec array into msg->buf. */
+	for (i = 0, p = msg->buf; i < iovlen; p += iov[i].iov_len, i++) {
+		memcpy(p, iov[i].iov_base, iov[i].iov_len);
+	}
+
+	DLIST_ADD_END(q->msgs, msg, struct unix_dgram_msg);
+	return 0;
+}
+
+static void unix_dgram_send_job(void *private_data)
+{
+	struct unix_dgram_msg *dmsg = private_data;
+
+	do {	/* blocking send on the queue socket; restart on EINTR */
+		dmsg->sent = send(dmsg->sock, dmsg->buf, dmsg->buflen, 0);
+	} while ((dmsg->sent == -1) && (errno == EINTR));
+}
+
+static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
+				    void *private_data)
+{
+	struct unix_dgram_ctx *ctx = private_data;
+	struct unix_dgram_send_queue *q;
+	struct unix_dgram_msg *done;
+	int ret, job;
+
+	ret = pthreadpool_finished_jobs(ctx->send_pool, &job, 1);
+	if (ret != 1) {
+		return;
+	}
+
+	/* Job ids are queue sockets, see the pthreadpool_add_job() calls. */
+	q = ctx->send_queues;
+	while ((q != NULL) && (q->sock != job)) {
+		q = q->next;
+	}
+	if (q == NULL) {
+		/* Huh? Should not happen */
+		return;
+	}
+
+	/* The job was for the queue head; drop it (send result unchecked). */
+	done = q->msgs;
+	DLIST_REMOVE(q->msgs, done);
+	free(done);
+	if (q->msgs == NULL) {
+		/* Drained: drop the queue so new sends try sendmsg() first. */
+		unix_dgram_send_queue_free(q);
+		return;
+	}
+	ret = pthreadpool_add_job(ctx->send_pool, q->sock,
+				  unix_dgram_send_job, q->msgs);
+	if (ret != 0) {
+		unix_dgram_send_queue_free(q);
+	}
+}
+
+/*
+ * Send iov[0..iovlen-1] as one datagram to the socket at path dst_sock.
+ *
+ * If messages for dst_sock are already queued, this one is appended to
+ * that queue to keep ordering. Otherwise a nonblocking sendmsg() on
+ * ctx->sock is tried first. Only if that would block (or is interrupted)
+ * is a send queue created and a pthreadpool job started to deliver it.
+ *
+ * Returns 0 if the message was sent or queued, an errno value otherwise.
+ */
+static int unix_dgram_send(struct unix_dgram_ctx *ctx, const char *dst_sock,
+			   const struct iovec *iov, int iovlen)
+{
+	struct unix_dgram_send_queue *q;
+	struct sockaddr_un addr = { 0, };
+	struct msghdr msg;
+	size_t dst_len;
+	int ret, err, must_queue;
+
+	dst_len = strlen(dst_sock);
+	if (dst_len >= sizeof(addr.sun_path)) {
+		return ENAMETOOLONG;
+	}
+
+	/* Keep ordering: never overtake messages already waiting in line */
+	q = find_send_queue(ctx, dst_sock);
+	if (q != NULL) {
+		return queue_msg(q, iov, iovlen);
+	}
+
+	addr.sun_family = AF_UNIX;
+	memcpy(addr.sun_path, dst_sock, dst_len);
+
+	msg = (struct msghdr) {
+		.msg_name = &addr,
+		.msg_namelen = sizeof(addr),
+		.msg_iov = discard_const_p(struct iovec, iov),
+		.msg_iovlen = iovlen,
+		.msg_control = NULL,
+		.msg_controllen = 0,
+		.msg_flags = 0,
+	};
+
+	/* Cheap attempt first: nonblocking send on the main socket */
+	ret = sendmsg(ctx->sock, &msg, 0);
+	if (ret >= 0) {
+		return 0;
+	}
+
+	err = errno;
+	must_queue = ((err == EAGAIN) || (err == EINTR));
+#ifdef EWOULDBLOCK
+	must_queue = must_queue || (err == EWOULDBLOCK);
+#endif
+	if (!must_queue) {
+		return err;
+	}
+
+	/*
+	 * The send would block (or was interrupted): park the message on a
+	 * fresh per-destination queue and let a pthreadpool job push it out.
+	 */
+	ret = unix_dgram_send_queue_init(ctx, dst_sock, &q);
+	if (ret != 0) {
+		return ret;
+	}
+
+	ret = queue_msg(q, iov, iovlen);
+	if (ret == 0) {
+		ret = pthreadpool_add_job(ctx->send_pool, q->sock,
+					  unix_dgram_send_job, q->msgs);
+	}
+	if (ret != 0) {
+		unix_dgram_send_queue_free(q);
+	}
+	return ret;
+}
+
+/* Return the file descriptor of the context's main datagram socket */
+static int unix_dgram_sock(struct unix_dgram_ctx *ctx)
+{
+	const int fd = ctx->sock;
+
+	return fd;
+}
+
+/*
+ * Destroy a datagram context.
+ *
+ * Returns EBUSY while send queues still exist, or the error of
+ * pthreadpool_destroy(). In both cases ctx is left untouched. Returns 0
+ * once everything has been released.
+ */
+static int unix_dgram_free(struct unix_dgram_ctx *ctx)
+{
+	int ret;
+
+	if (ctx->send_queues != NULL) {
+		return EBUSY;
+	}
+
+	if (ctx->send_pool != NULL) {
+		ret = pthreadpool_destroy(ctx->send_pool);
+		if (ret != 0) {
+			return ret;
+		}
+		ctx->ev_funcs->watch_free(ctx->pool_read_watch);
+	}
+	ctx->ev_funcs->watch_free(ctx->sock_read_watch);
+
+	/*
+	 * Only the process that created (bound) the socket unlinks its path;
+	 * anyone else might still have it open.
+	 */
+	if (ctx->created_pid == getpid()) {
+		unlink(ctx->path);
+	}
+
+	close(ctx->sock);
+	free(ctx->recv_buf);
+	free(ctx);
+	return 0;
+}
+
+/*
+ * Every message starts with a uint64_t cookie.
+ *
+ * A value of 0 indicates a single-fragment message which is complete in
+ * itself. The data immediately follows the cookie.
+ *
+ * Every fragment of a multi-fragment message starts with the sender's
+ * nonzero cookie, followed by a struct unix_msg_hdr and then the data. The
+ * pid and sock fields are used to assure uniqueness on the receiver side.
+ */
+
+/*
+ * Fragment header, sent right after the nonzero cookie. It is transmitted
+ * as raw struct bytes, so sender and receiver must share the same struct
+ * layout (NOTE(review): presumably guaranteed because both ends of an
+ * AF_UNIX socket run on the same host).
+ */
+struct unix_msg_hdr {
+	size_t msglen;		/* total payload length of the whole message */
+	pid_t pid;		/* sender's getpid() */
+	int sock;		/* sender's unix_dgram socket fd */
+};
+
+/*
+ * A multi-fragment message being reassembled on the receiving side.
+ *
+ * It is keyed by (sender_pid, sender_sock). The cookie tells successive
+ * messages from the same sender apart.
+ */
+struct unix_msg {
+	struct unix_msg *prev, *next;
+	size_t msglen;		/* expected total payload length */
+	size_t received;	/* payload bytes copied into buf so far */
+	pid_t sender_pid;
+	int sender_sock;
+	uint64_t cookie;
+	/*
+	 * Flexible array member: allocated as
+	 * offsetof(struct unix_msg, buf) + msglen bytes.
+	 */
+	uint8_t buf[];
+};
+
+/* One unix_msg endpoint, layered on top of a unix_dgram_ctx */
+struct unix_msg_ctx {
+	struct unix_dgram_ctx *dgram;
+	size_t fragment_len;	/* max datagram size, including framing */
+	uint64_t cookie;	/* cookie of our next multi-fragment send */
+
+	void (*recv_callback)(struct unix_msg_ctx *ctx,
+			      uint8_t *msg, size_t msg_len,
+			      void *private_data);
+	void *private_data;
+
+	struct unix_msg *msgs;	/* incoming messages being reassembled */
+};
+
+/* unix_dgram receive callback, defined further down */
+static void unix_msg_recv(struct unix_dgram_ctx *ctx,
+			  uint8_t *msg, size_t msg_len,
+			  void *private_data);
+
+/*
+ * Create a unix_msg context on top of a unix_dgram socket at path.
+ *
+ * fragment_len is the maximum datagram size. It must leave room for the
+ * cookie plus a struct unix_msg_hdr, otherwise multi-fragment sends could
+ * never make progress, so such values are rejected with EINVAL.
+ *
+ * cookie identifies this context's multi-fragment messages. A cookie of 0
+ * marks single-fragment messages on the wire, so 0 is replaced by 1.
+ *
+ * Returns 0 on success, an errno value on failure.
+ */
+int unix_msg_init(const char *path, const struct poll_funcs *ev_funcs,
+		  size_t fragment_len, uint64_t cookie,
+		  void (*recv_callback)(struct unix_msg_ctx *ctx,
+					uint8_t *msg, size_t msg_len,
+					void *private_data),
+		  void *private_data,
+		  struct unix_msg_ctx **result)
+{
+	struct unix_msg_ctx *ctx;
+	int ret;
+
+	if (fragment_len <= sizeof(uint64_t) + sizeof(struct unix_msg_hdr)) {
+		return EINVAL;
+	}
+
+	if (cookie == 0) {
+		cookie = 1;
+	}
+
+	ctx = malloc(sizeof(*ctx));
+	if (ctx == NULL) {
+		return ENOMEM;
+	}
+
+	/* Fully set up ctx before handing it to the datagram layer */
+	ctx->fragment_len = fragment_len;
+	ctx->cookie = cookie;
+	ctx->recv_callback = recv_callback;
+	ctx->private_data = private_data;
+	ctx->msgs = NULL;
+
+	ret = unix_dgram_init(path, fragment_len, ev_funcs,
+			      unix_msg_recv, ctx, &ctx->dgram);
+	if (ret != 0) {
+		free(ctx);
+		return ret;
+	}
+
+	*result = ctx;
+	return 0;
+}
+
+/*
+ * Send the message described by iov[0..iovlen-1] to the socket at path
+ * dst_sock.
+ *
+ * Messages that fit into one fragment (and use fewer than 16 iovecs) go
+ * out as a single datagram prefixed by a 0 cookie. Larger messages are
+ * split into fragments, each carrying ctx->cookie and a struct
+ * unix_msg_hdr, which the receiver reassembles.
+ *
+ * Returns 0 on success, an errno value on failure. If sending a fragment
+ * fails, the remaining fragments are not sent.
+ */
+int unix_msg_send(struct unix_msg_ctx *ctx, const char *dst_sock,
+		  const struct iovec *iov, int iovlen)
+{
+	const size_t hdr_overhead = sizeof(ctx->cookie) +
+		sizeof(struct unix_msg_hdr);
+	ssize_t msglen;
+	size_t sent;
+	int ret = 0;
+	struct iovec *iov_copy;
+	struct unix_msg_hdr hdr;
+	struct iovec src_iov;
+
+	if (iovlen < 0) {
+		return EINVAL;
+	}
+
+	msglen = iov_buflen(iov, iovlen);
+	if (msglen < 0) {
+		/* overflow, or a total that does not fit into ssize_t */
+		return EINVAL;
+	}
+
+	if (msglen == 0) {
+		/*
+		 * Nothing to fragment: the loop below would not send any
+		 * datagram at all (whatever iovlen is). A bare 0 cookie is
+		 * delivered by the receiver as an empty message.
+		 */
+		uint64_t cookie = 0;
+		struct iovec cookie_iov = {
+			.iov_base = &cookie, .iov_len = sizeof(cookie)
+		};
+		return unix_dgram_send(ctx->dgram, dst_sock, &cookie_iov, 1);
+	}
+
+	if ((iovlen < 16) &&
+	    (sizeof(uint64_t) + (size_t)msglen <= ctx->fragment_len)) {
+		/* Fast path: a single datagram, no malloc */
+		struct iovec tmp_iov[16];
+		uint64_t cookie = 0;
+
+		tmp_iov[0].iov_base = &cookie;
+		tmp_iov[0].iov_len = sizeof(cookie);
+		/* msglen > 0 implies iovlen > 0 here */
+		memcpy(&tmp_iov[1], iov, sizeof(struct iovec) * iovlen);
+
+		return unix_dgram_send(ctx->dgram, dst_sock, tmp_iov,
+				       iovlen+1);
+	}
+
+	if (ctx->fragment_len <= hdr_overhead) {
+		/* No room for payload: the loop below would never finish */
+		return EINVAL;
+	}
+
+	hdr.msglen = msglen;
+	hdr.pid = getpid();
+	hdr.sock = unix_dgram_sock(ctx->dgram);
+
+	iov_copy = malloc(sizeof(struct iovec) * ((size_t)iovlen + 2));
+	if (iov_copy == NULL) {
+		return ENOMEM;
+	}
+	iov_copy[0].iov_base = &ctx->cookie;
+	iov_copy[0].iov_len = sizeof(ctx->cookie);
+	iov_copy[1].iov_base = &hdr;
+	iov_copy[1].iov_len = sizeof(hdr);
+
+	sent = 0;
+	src_iov = iov[0];
+
+	/*
+	 * The following write loop sends the user message in pieces. We have
+	 * filled the first two iovecs above with "cookie" and "hdr". In the
+	 * following loops we pull message chunks from the user iov array and
+	 * fill iov_copy piece by piece, possibly truncating chunks from the
+	 * caller's iov array. Ugly, but hopefully efficient.
+	 */
+
+	while (sent < (size_t)msglen) {
+		size_t fragment_len = hdr_overhead;
+		size_t iov_index = 2;
+
+		while (fragment_len < ctx->fragment_len) {
+			size_t space, chunk;
+
+			space = ctx->fragment_len - fragment_len;
+			chunk = MIN(space, src_iov.iov_len);
+
+			iov_copy[iov_index].iov_base = src_iov.iov_base;
+			iov_copy[iov_index].iov_len = chunk;
+			iov_index += 1;
+
+			src_iov.iov_base = (char *)src_iov.iov_base + chunk;
+			src_iov.iov_len -= chunk;
+			fragment_len += chunk;
+
+			if (src_iov.iov_len == 0) {
+				iov += 1;
+				iovlen -= 1;
+				if (iovlen == 0) {
+					break;
+				}
+				src_iov = iov[0];
+			}
+		}
+		sent += (fragment_len - hdr_overhead);
+
+		ret = unix_dgram_send(ctx->dgram, dst_sock,
+				      iov_copy, iov_index);
+		if (ret != 0) {
+			break;
+		}
+	}
+
+	free(iov_copy);
+
+	/* Never hand out 0: it marks single-fragment messages */
+	ctx->cookie += 1;
+	if (ctx->cookie == 0) {
+		ctx->cookie += 1;
+	}
+
+	return ret;
+}
+
+/*
+ * unix_dgram receive callback.
+ *
+ * Datagrams with a 0 cookie are complete messages and go straight to
+ * ctx->recv_callback. Others are fragments that are reassembled per
+ * sender (pid, sock) until hdr.msglen bytes have arrived.
+ *
+ * All length fields come from the peer. Datagrams that are too short,
+ * would overflow the message buffer, or cannot be allocated are silently
+ * dropped.
+ */
+static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
+			  uint8_t *buf, size_t buflen,
+			  void *private_data)
+{
+	struct unix_msg_ctx *ctx = (struct unix_msg_ctx *)private_data;
+	struct unix_msg_hdr hdr;
+	struct unix_msg *msg;
+	size_t space;
+	uint64_t cookie;
+
+	if (buflen < sizeof(cookie)) {
+		return;
+	}
+	memcpy(&cookie, buf, sizeof(cookie));
+
+	buf += sizeof(cookie);
+	buflen -= sizeof(cookie);
+
+	if (cookie == 0) {
+		/* Single-fragment message, complete in itself */
+		ctx->recv_callback(ctx, buf, buflen, ctx->private_data);
+		return;
+	}
+
+	if (buflen < sizeof(hdr)) {
+		return;
+	}
+	memcpy(&hdr, buf, sizeof(hdr));
+
+	buf += sizeof(hdr);
+	buflen -= sizeof(hdr);
+
+	/* Find the message currently being reassembled for this sender */
+	for (msg = ctx->msgs; msg != NULL; msg = msg->next) {
+		if ((msg->sender_pid == hdr.pid) &&
+		    (msg->sender_sock == hdr.sock)) {
+			break;
+		}
+	}
+
+	if ((msg != NULL) && (msg->cookie != cookie)) {
+		/* The sender moved on to a new message; drop the old one */
+		DLIST_REMOVE(ctx->msgs, msg);
+		free(msg);
+		msg = NULL;
+	}
+
+	if (msg == NULL) {
+		size_t alloc_len = offsetof(struct unix_msg, buf) + hdr.msglen;
+
+		if (alloc_len < hdr.msglen) {
+			/*
+			 * hdr.msglen is peer-controlled. A wrapped size would
+			 * allocate a tiny buffer that the memcpy below would
+			 * then overrun.
+			 */
+			return;
+		}
+
+		msg = malloc(alloc_len);
+		if (msg == NULL) {
+			return;
+		}
+		msg->msglen = hdr.msglen;
+		msg->received = 0;
+		msg->sender_pid = hdr.pid;
+		msg->sender_sock = hdr.sock;
+		msg->cookie = cookie;
+		DLIST_ADD(ctx->msgs, msg);
+	}
+
+	space = msg->msglen - msg->received;
+	if (buflen > space) {
+		return;
+	}
+
+	memcpy(msg->buf + msg->received, buf, buflen);
+	msg->received += buflen;
+
+	if (msg->received < msg->msglen) {
+		return;
+	}
+
+	DLIST_REMOVE(ctx->msgs, msg);
+	ctx->recv_callback(ctx, msg->buf, msg->msglen, ctx->private_data);
+	free(msg);
+}
+
+/*
+ * Free a unix_msg context.
+ *
+ * If the underlying datagram context refuses to go away (e.g. EBUSY while
+ * sends are still queued), its error is returned and nothing is freed.
+ * Otherwise any partially reassembled incoming messages are discarded.
+ */
+int unix_msg_free(struct unix_msg_ctx *ctx)
+{
+	struct unix_msg *partial;
+	int ret = unix_dgram_free(ctx->dgram);
+
+	if (ret != 0) {
+		return ret;
+	}
+
+	while ((partial = ctx->msgs) != NULL) {
+		DLIST_REMOVE(ctx->msgs, partial);
+		free(partial);
+	}
+
+	free(ctx);
+	return 0;
+}
+
+/*
+ * Sum up the lengths of iov[0..iovlen-1].
+ *
+ * Returns the total, or -1 if it does not fit into the ssize_t return
+ * type. Callers only test for -1, so no other negative value may ever be
+ * returned; hence the limit is the largest ssize_t, not SIZE_MAX.
+ */
+static ssize_t iov_buflen(const struct iovec *iov, int iovlen)
+{
+	/* largest value of ssize_t, the signed counterpart of size_t */
+	const size_t limit = ((size_t)-1) / 2;
+	size_t buflen = 0;
+	int i;
+
+	for (i=0; i<iovlen; i++) {
+		size_t thislen = iov[i].iov_len;
+
+		if (thislen > limit - buflen) {
+			/* overflow */
+			return -1;
+		}
+		buflen += thislen;
+	}
+	return buflen;
+}
diff --git a/source3/lib/unix_msg/unix_msg.h b/source3/lib/unix_msg/unix_msg.h
new file mode 100644
index 0000000..fc636d8
--- /dev/null
+++ b/source3/lib/unix_msg/unix_msg.h
@@ -0,0 +1,107 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Copyright (C) Volker Lendecke 2013
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __UNIX_MSG_H__
+#define __UNIX_MSG_H__
+
+#include "replace.h"
+#include "poll_funcs/poll_funcs.h"
+#include "system/network.h"
+
+/**
+ * @file unix_msg.h
+ *
+ * @brief Send large messages over unix domain datagram sockets
+ *
+ * A unix_msg_ctx represents a unix domain datagram socket.
+ *
+ * Unix domain datagram sockets have some unique properties compared with UDP
+ * sockets:
+ *
+ * - They are reliable, i.e. as long as both sender and receiver are processes
+ *   that are alive, nothing is lost.
+ *
+ * - They preserve sequencing
+ *
+ * Based on these two properties, this code implements sending of large
+ * messages. It aims at being maximally efficient for short, single-datagram
+ * messages. Ideally, if the receiver queue is not full, sending a message
+ * should be a single syscall without malloc. Receiving a message should also
+ * not malloc anything before the data is shipped to the user.
+ *
+ * If unix_msg_send meets a full receive buffer, more effort is required: The
+ * socket behind unix_msg_send is not pollable for POLLOUT, it will always be
+ * writable: A datagram socket can send anywhere, the full queue is a property
+ * of the receiving socket. unix_msg_send creates a new unnamed socket that
+ * it will connect(2) to the target socket and queues the message for it. A
+ * helper thread then sends the queued messages in order, using blocking
+ * sends on that socket, so they go out once the destination socket's queue
+ * is drained sufficiently. The caller of unix_msg_send does not block.
+ *
+ * If unix_msg_send is asked to send a message larger than fragment_size, it
+ * will try sending the message in pieces with proper framing, the receiving
+ * side will reassemble the messages.
+ */
+
+/**
+ * @brief Abstract structure representing a unix domain datagram socket
+ */
+struct unix_msg_ctx;
+
+/**
+ * @brief Initialize a struct unix_msg_ctx
+ *
+ * @param[in] path The socket path
+ * @param[in] ev_funcs The event callback functions to use
+ * @param[in] fragment_size Maximum datagram size to send/receive
+ * @param[in] cookie Random number to identify this context
+ * @param[in] recv_callback Function called when a message is received
+ * @param[in] private_data Private pointer for recv_callback
+ * @param[out] result The new struct unix_msg_ctx
+ * @return 0 on success, errno on failure
+ */
+
+int unix_msg_init(const char *path, const struct poll_funcs *ev_funcs,
+		  size_t fragment_size, uint64_t cookie,
+		  void (*recv_callback)(struct unix_msg_ctx *ctx,
+					uint8_t *msg, size_t msg_len,
+					void *private_data),
+		  void *private_data,
+		  struct unix_msg_ctx **result);
+
+/**
+ * @brief Send a message
+ *
+ * @param[in] ctx The context to send across
+ * @param[in] dst_sock The destination socket path
+ * @param[in] iov The message
+ * @param[in] iovlen The number of iov structs
+ * @return 0 on success, errno on failure
+ */
+
+int unix_msg_send(struct unix_msg_ctx *ctx, const char *dst_sock,
+		  const struct iovec *iov, int iovlen);
+
+/**
+ * @brief Free a unix_msg_ctx
+ *
+ * @param[in] ctx The message context to free
+ * @return 0 on success, errno on failure (EBUSY)
+ */
+int unix_msg_free(struct unix_msg_ctx *ctx);
+
+#endif
diff --git a/source3/lib/unix_msg/wscript_build b/source3/lib/unix_msg/wscript_build
new file mode 100644
index 0000000..200840d
--- /dev/null
+++ b/source3/lib/unix_msg/wscript_build
@@ -0,0 +1,18 @@
+#!/usr/bin/env python
+
+bld.SAMBA3_SUBSYSTEM('UNIX_MSG',
+                     source='unix_msg.c',
+		     deps='replace PTHREADPOOL')
+
+bld.SAMBA3_BINARY('unix_msg_test',
+                  source='tests.c',
+                  deps='UNIX_MSG POLL_FUNCS_TEVENT',
+                  install=False)
+bld.SAMBA3_BINARY('unix_msg_test_drain',
+                  source='test_drain.c',
+                  deps='UNIX_MSG POLL_FUNCS_TEVENT',
+                  install=False)
+bld.SAMBA3_BINARY('unix_msg_test_source',
+                  source='test_source.c',
+                  deps='UNIX_MSG POLL_FUNCS_TEVENT',
+                  install=False)
diff --git a/source3/wscript_build b/source3/wscript_build
index fd53e2f..4d261c6 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -1453,6 +1453,7 @@ bld.RECURSE('libgpo/gpext')
 bld.RECURSE('lib/pthreadpool')
 bld.RECURSE('lib/asys')
 bld.RECURSE('lib/poll_funcs')
+bld.RECURSE('lib/unix_msg')
 bld.RECURSE('librpc')
 bld.RECURSE('librpc/idl')
 bld.RECURSE('libsmb')
-- 
1.9.1.423.g4596e3a


From effdc4f0f9d93a15c8aba3388b5339141c8037d0 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sun, 29 Dec 2013 13:56:44 +0100
Subject: [PATCH 03/24] lib: Move full_path_tos to util_str.c

This can be useful elsewhere

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/proto.h |  3 +++
 source3/lib/util_str.c  | 39 +++++++++++++++++++++++++++++++++++++++
 source3/smbd/files.c    | 39 ---------------------------------------
 source3/smbd/proto.h    |  3 ---
 4 files changed, 42 insertions(+), 42 deletions(-)

diff --git a/source3/include/proto.h b/source3/include/proto.h
index 3197b76..c854882 100644
--- a/source3/include/proto.h
+++ b/source3/include/proto.h
@@ -724,6 +724,9 @@ bool validate_net_name( const char *name,
 		int max_len);
 char *escape_shell_string(const char *src);
 char **str_list_make_v3(TALLOC_CTX *mem_ctx, const char *string, const char *sep);
+ssize_t full_path_tos(const char *dir, const char *name,
+		      char *tmpbuf, size_t tmpbuf_len,
+		      char **pdst, char **to_free);
 
 /* The following definitions come from lib/version.c  */
 
diff --git a/source3/lib/util_str.c b/source3/lib/util_str.c
index 967beda..908f23a 100644
--- a/source3/lib/util_str.c
+++ b/source3/lib/util_str.c
@@ -1294,3 +1294,42 @@ char **str_list_make_v3(TALLOC_CTX *mem_ctx, const char *string,
 	TALLOC_FREE(s);
 	return list;
 }
+
+/*
+ * Build "dir/name" cheaply for operations that only need the full path
+ * temporarily. It is equivalent to the much more expensive
+ *
+ * talloc_asprintf(talloc_tos(), "%s/%s", dir, name)
+ *
+ * but uses the caller's tmpbuf whenever the result fits. This actually
+ * does make a difference in metadata-heavy workloads (i.e. the "standard"
+ * client.txt nbench run).
+ *
+ * On success *pdst points to the NUL-terminated path and the return value
+ * is its length without the NUL. *to_free is NULL if tmpbuf was used,
+ * otherwise it is the talloc_tos() allocation the caller has to free.
+ * Returns -1 if that allocation fails.
+ */
+
+ssize_t full_path_tos(const char *dir, const char *name,
+		      char *tmpbuf, size_t tmpbuf_len,
+		      char **pdst, char **to_free)
+{
+	size_t dirlen = strlen(dir);
+	size_t namelen = strlen(name);
+	size_t pathlen = dirlen + 1 + namelen;	/* dir + '/' + name */
+	char *path = tmpbuf;
+	char *allocated = NULL;
+
+	if (pathlen >= tmpbuf_len) {
+		/* tmpbuf cannot hold the path plus its terminating NUL */
+		allocated = talloc_array(talloc_tos(), char, pathlen + 1);
+		if (allocated == NULL) {
+			return -1;
+		}
+		path = allocated;
+	}
+	*to_free = allocated;
+
+	memcpy(path, dir, dirlen);
+	path[dirlen] = '/';
+	memcpy(path + dirlen + 1, name, namelen + 1);
+	*pdst = path;
+	return pathlen;
+}
diff --git a/source3/smbd/files.c b/source3/smbd/files.c
index 5cf037e..8496806 100644
--- a/source3/smbd/files.c
+++ b/source3/smbd/files.c
@@ -688,45 +688,6 @@ NTSTATUS dup_file_fsp(struct smb_request *req, files_struct *from,
 	return fsp_set_smb_fname(to, from->fsp_name);
 }
 
-/*
- * This routine improves performance for operations temporarily acting on a
- * full path. It is equivalent to the much more expensive
- *
- * talloc_asprintf(talloc_tos(), "%s/%s", dir, name)
- *
- * This actually does make a difference in metadata-heavy workloads (i.e. the
- * "standard" client.txt nbench run.
- */
-
-ssize_t full_path_tos(const char *dir, const char *name,
-		      char *tmpbuf, size_t tmpbuf_len,
-		      char **pdst, char **to_free)
-{
-	size_t dirlen, namelen, len;
-	char *dst;
-
-	dirlen = strlen(dir);
-	namelen = strlen(name);
-	len = dirlen + namelen + 1;
-
-	if (len < tmpbuf_len) {
-		dst = tmpbuf;
-		*to_free = NULL;
-	} else {
-		dst = talloc_array(talloc_tos(), char, len+1);
-		if (dst == NULL) {
-			return -1;
-		}
-		*to_free = dst;
-	}
-
-	memcpy(dst, dir, dirlen);
-	dst[dirlen] = '/';
-	memcpy(dst+dirlen+1, name, namelen+1);
-	*pdst = dst;
-	return len;
-}
-
 /**
  * Return a jenkins hash of a pathname on a connection.
  */
diff --git a/source3/smbd/proto.h b/source3/smbd/proto.h
index 62c9728..d9b86b6 100644
--- a/source3/smbd/proto.h
+++ b/source3/smbd/proto.h
@@ -397,9 +397,6 @@ NTSTATUS file_name_hash(connection_struct *conn,
 			const char *name, uint32_t *p_name_hash);
 NTSTATUS fsp_set_smb_fname(struct files_struct *fsp,
 			   const struct smb_filename *smb_fname_in);
-ssize_t full_path_tos(const char *dir, const char *name,
-		      char *tmpbuf, size_t tmpbuf_len,
-		      char **pdst, char **to_free);
 
 /* The following definitions come from smbd/ipc.c  */
 
-- 
1.9.1.423.g4596e3a


From 2028b4cad7ba84e0e59b4e75c276162d3ae1807e Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 24 Feb 2014 12:23:49 +0000
Subject: [PATCH 04/24] lib: Add messaging_dgm

Messaging based on unix domain datagram sockets

This makes every process participating in messaging bind on a unix domain
datagram socket, similar to the source4 based messaging. The details are a bit
different though:

Retry after EWOULDBLOCK is done with a blocking thread, not by polling. In my
experiments this was the only way to avoid a thundering herd or high load
under Linux in extreme overload situations, such as many thousands of
processes sending to one blocked process. If there are better ideas to do this
in a simple way, I'm more than happy to remove the pthreadpool dependency again.

There is only one socket per process, not per task. I don't think that per-task
sockets are really necessary; we can do filtering in user space. The message
contains the destination server_id, which contains the destination task_id. I
think we can rebase the source4-based imessaging on top of this, allowing
multiple imessaging contexts on top of one messaging_context. I had planned to
do this conversion before this goes in, but Jeremy convinced me that this has
value in itself :-)

For each socket we also create an fcntl-based lockfile to allow race-free
cleanup of orphaned sockets. This lockfile contains the unique_id, which in the
future will make the server_id.tdb obsolete.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/messages.h |   5 +
 source3/lib/messages.c     |   8 +-
 source3/lib/messages_dgm.c | 420 +++++++++++++++++++++++++++++++++++++++++++++
 source3/smbd/server.c      |   6 +
 source3/wscript_build      |   3 +
 5 files changed, 438 insertions(+), 4 deletions(-)
 create mode 100644 source3/lib/messages_dgm.c

diff --git a/source3/include/messages.h b/source3/include/messages.h
index 47c5f7a..9437965 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -91,6 +91,11 @@ struct messaging_backend {
 	void *private_data;
 };
 
+/* Messaging backend based on unix domain datagram sockets */
+NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
+			    TALLOC_CTX *mem_ctx,
+			    struct messaging_backend **presult);
+/*
+ * Remove the messaging socket and lockfile of process pid, provided nobody
+ * holds the fcntl lock on that lockfile anymore.
+ */
+NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid);
+
 NTSTATUS messaging_tdb_init(struct messaging_context *msg_ctx,
 			    TALLOC_CTX *mem_ctx,
 			    struct messaging_backend **presult);
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 4ff933d..983fe69 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -197,10 +197,10 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
 	ctx->id = procid_self();
 	ctx->event_ctx = ev;
 
-	status = messaging_tdb_init(ctx, ctx, &ctx->local);
+	status = messaging_dgm_init(ctx, ctx, &ctx->local);
 
 	if (!NT_STATUS_IS_OK(status)) {
-		DEBUG(2, ("messaging_tdb_init failed: %s\n",
+		DEBUG(2, ("messaging_dgm_init failed: %s\n",
 			  nt_errstr(status)));
 		TALLOC_FREE(ctx);
 		return NULL;
@@ -245,9 +245,9 @@ NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
 
 	msg_ctx->id = procid_self();
 
-	status = messaging_tdb_init(msg_ctx, msg_ctx, &msg_ctx->local);
+	status = messaging_dgm_init(msg_ctx, msg_ctx, &msg_ctx->local);
 	if (!NT_STATUS_IS_OK(status)) {
-		DEBUG(0, ("messaging_tdb_init failed: %s\n",
+		DEBUG(0, ("messaging_dgm_init failed: %s\n",
 			  nt_errstr(status)));
 		return status;
 	}
diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
new file mode 100644
index 0000000..0d4df2f
--- /dev/null
+++ b/source3/lib/messages_dgm.c
@@ -0,0 +1,420 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Samba internal messaging functions
+ * Copyright (C) 2013 by Volker Lendecke
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "includes.h"
+#include "lib/util/data_blob.h"
+#include "lib/util/debug.h"
+#include "lib/unix_msg/unix_msg.h"
+#include "system/filesys.h"
+#include "messages.h"
+#include "lib/param/param.h"
+#include "poll_funcs/poll_funcs_tevent.h"
+#include "unix_msg/unix_msg.h"
+#include "librpc/gen_ndr/messaging.h"
+
+/* Per-process state of the datagram messaging backend */
+struct messaging_dgm_context {
+	struct messaging_context *msg_ctx;
+	struct poll_funcs msg_callbacks;	/* tevent glue used by unix_msg */
+	struct unix_msg_ctx *dgm_ctx;
+	char *cache_dir;	/* parent of the msg/ and lck/ directories */
+	int lockfile_fd;	/* fd holding the fcntl lock on lck/<pid> */
+};
+
+/*
+ * Header in front of every message payload. It is sent as raw struct
+ * bytes, so sender and receiver must agree on its in-memory layout.
+ */
+struct messaging_dgm_hdr {
+	uint32_t msg_version;
+	enum messaging_type msg_type;
+	struct server_id dst;
+	struct server_id src;
+};
+
+static NTSTATUS messaging_dgm_send(struct messaging_context *msg_ctx,
+				   struct server_id pid, int msg_type,
+				   const DATA_BLOB *data,
+				   struct messaging_backend *backend);
+static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
+			       uint8_t *msg, size_t msg_len,
+			       void *private_data);
+
+static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
+
+/*
+ * Create <cache_dir>/lck/<pid>, take an fcntl write lock over the whole
+ * file and store the decimal unique id in it.
+ *
+ * On success the open, locked fd is stored in *plockfile_fd and 0 is
+ * returned. On failure an errno value (never 0) is returned and
+ * *plockfile_fd is left untouched.
+ */
+static int messaging_dgm_lockfile_create(const char *cache_dir, pid_t pid,
+					 int *plockfile_fd, uint64_t unique)
+{
+	char buf[PATH_MAX];
+	char *dir, *to_free;
+	ssize_t dirlen;
+	char *lockfile_name;
+	int lockfile_fd;
+	struct flock lck = {};
+	int unique_len, ret;
+	ssize_t written;
+	bool ok;
+
+	dirlen = full_path_tos(cache_dir, "lck", buf, sizeof(buf),
+			       &dir, &to_free);
+	if (dirlen == -1) {
+		return ENOMEM;
+	}
+
+	errno = 0;
+	ok = directory_create_or_exist_strict(dir, sec_initial_uid(), 0755);
+	if (!ok) {
+		ret = errno;
+		if (ret == 0) {
+			/*
+			 * NOTE(review): directory_create_or_exist_strict()
+			 * does not appear to set errno on every failure
+			 * (e.g. ownership/mode checks). Reporting 0 would
+			 * make the caller use an unset lockfile fd.
+			 */
+			ret = EACCES;
+		}
+		DEBUG(1, ("%s: Could not create lock directory: %s\n",
+			  __func__, strerror(ret)));
+		TALLOC_FREE(to_free);
+		return ret;
+	}
+
+	lockfile_name = talloc_asprintf(talloc_tos(), "%s/%u", dir,
+					(unsigned)pid);
+	TALLOC_FREE(to_free);
+	if (lockfile_name == NULL) {
+		DEBUG(1, ("%s: talloc_asprintf failed\n", __func__));
+		return ENOMEM;
+	}
+
+	/* no O_EXCL, existence check is via the fcntl lock */
+
+	lockfile_fd = open(lockfile_name, O_NONBLOCK|O_CREAT|O_WRONLY, 0644);
+	if (lockfile_fd == -1) {
+		ret = errno;
+		DEBUG(1, ("%s: open failed: %s\n", __func__, strerror(ret)));
+		goto fail_free;
+	}
+
+	lck.l_type = F_WRLCK;
+	lck.l_whence = SEEK_SET;
+	lck.l_start = 0;
+	lck.l_len = 0;
+
+	ret = fcntl(lockfile_fd, F_SETLK, &lck);
+	if (ret == -1) {
+		ret = errno;
+		DEBUG(1, ("%s: fcntl failed: %s\n", __func__, strerror(ret)));
+		goto fail_close;
+	}
+
+	unique_len = snprintf(buf, sizeof(buf), "%"PRIu64, unique);
+
+	/* shorten a potentially preexisting file */
+
+	ret = ftruncate(lockfile_fd, unique_len);
+	if (ret == -1) {
+		ret = errno;
+		DEBUG(1, ("%s: ftruncate failed: %s\n", __func__,
+			  strerror(ret)));
+		goto fail_unlink;
+	}
+
+	written = write(lockfile_fd, buf, unique_len);
+	if (written != unique_len) {
+		/* errno is only meaningful if write() returned -1 */
+		ret = (written == -1) ? errno : EIO;
+		DEBUG(1, ("%s: write failed: %s\n", __func__, strerror(ret)));
+		goto fail_unlink;
+	}
+
+	TALLOC_FREE(lockfile_name);
+	*plockfile_fd = lockfile_fd;
+	return 0;
+
+fail_unlink:
+	unlink(lockfile_name);
+fail_close:
+	close(lockfile_fd);
+fail_free:
+	TALLOC_FREE(lockfile_name);
+	return ret;
+}
+
+/*
+ * Remove <cache_dir>/lck/<pid>.
+ *
+ * Returns 0 on success, ENOMEM if the path cannot be built, or the errno
+ * of a failed unlink().
+ */
+static int messaging_dgm_lockfile_remove(const char *cache_dir, pid_t pid)
+{
+	fstring fname;
+	char buf[PATH_MAX];
+	char *lockfile_name, *to_free;
+	int ret = 0;
+
+	fstr_sprintf(fname, "lck/%u", (unsigned)pid);
+
+	if (full_path_tos(cache_dir, fname, buf, sizeof(buf),
+			  &lockfile_name, &to_free) == -1) {
+		return ENOMEM;
+	}
+
+	if (unlink(lockfile_name) == -1) {
+		ret = errno;
+		DEBUG(10, ("%s: unlink failed: %s\n", __func__,
+			   strerror(ret)));
+	}
+
+	TALLOC_FREE(to_free);
+	return ret;
+}
+
+/*
+ * Set up the unix datagram messaging backend for msg_ctx.
+ *
+ * Creates and locks <cache dir>/lck/<pid>, then creates a unix_msg socket
+ * at <cache dir>/msg/<pid>. On success *presult is allocated on mem_ctx;
+ * freeing it runs a destructor that tears the socket and lockfile down
+ * again. On failure nothing is left behind: a lockfile created here is
+ * removed again and its fd closed.
+ */
+NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
+			    TALLOC_CTX *mem_ctx,
+			    struct messaging_backend **presult)
+{
+	struct messaging_backend *result;
+	struct messaging_dgm_context *ctx;
+	struct loadparm_context *lp_ctx;
+	struct server_id pid = messaging_server_id(msg_ctx);
+	NTSTATUS status;
+	int ret;
+	bool ok;
+	const char *cache_dir;
+	char *socket_dir, *socket_name;
+	uint64_t cookie;
+
+	lp_ctx = loadparm_init_s3(talloc_tos(), loadparm_s3_helpers());
+	if (lp_ctx == NULL) {
+		DEBUG(0, ("loadparm_init_s3 failed\n"));
+		return NT_STATUS_INTERNAL_ERROR;
+	}
+
+	cache_dir = lp_cache_directory();
+	if (cache_dir == NULL) {
+		status = map_nt_error_from_unix(errno);
+		talloc_unlink(talloc_tos(), lp_ctx);
+		return status;
+	}
+
+	talloc_unlink(talloc_tos(), lp_ctx);
+	lp_ctx = NULL;
+
+	result = talloc(mem_ctx, struct messaging_backend);
+	if (result == NULL) {
+		goto fail_nomem;
+	}
+	ctx = talloc_zero(result, struct messaging_dgm_context);
+	if (ctx == NULL) {
+		goto fail_nomem;
+	}
+
+	result->private_data = ctx;
+	result->send_fn = messaging_dgm_send;
+	ctx->msg_ctx = msg_ctx;
+
+	ctx->cache_dir = talloc_strdup(ctx, cache_dir);
+	if (ctx->cache_dir == NULL) {
+		goto fail_nomem;
+	}
+	socket_dir = talloc_asprintf(ctx, "%s/msg", cache_dir);
+	if (socket_dir == NULL) {
+		goto fail_nomem;
+	}
+	socket_name = talloc_asprintf(ctx, "%s/%u", socket_dir,
+				      (unsigned)pid.pid);
+	if (socket_name == NULL) {
+		goto fail_nomem;
+	}
+
+	sec_init();
+
+	ret = messaging_dgm_lockfile_create(cache_dir, pid.pid,
+					    &ctx->lockfile_fd, pid.unique_id);
+	if (ret != 0) {
+		DEBUG(1, ("%s: messaging_dgm_lockfile_create failed: %s\n",
+			  __func__, strerror(ret)));
+		TALLOC_FREE(result);
+		return map_nt_error_from_unix(ret);
+	}
+
+	poll_funcs_init_tevent(&ctx->msg_callbacks, msg_ctx->event_ctx);
+
+	ok = directory_create_or_exist_strict(socket_dir, sec_initial_uid(),
+					      0700);
+	if (!ok) {
+		DEBUG(1, ("Could not create socket directory\n"));
+		status = NT_STATUS_ACCESS_DENIED;
+		goto fail_lockfile;
+	}
+	TALLOC_FREE(socket_dir);
+
+	unlink(socket_name);
+
+	generate_random_buffer((uint8_t *)&cookie, sizeof(cookie));
+
+	ret = unix_msg_init(socket_name, &ctx->msg_callbacks, 1024, cookie,
+			    messaging_dgm_recv, ctx, &ctx->dgm_ctx);
+	TALLOC_FREE(socket_name);
+	if (ret != 0) {
+		DEBUG(1, ("unix_msg_init failed: %s\n", strerror(ret)));
+		status = map_nt_error_from_unix(ret);
+		goto fail_lockfile;
+	}
+	talloc_set_destructor(ctx, messaging_dgm_context_destructor);
+
+	*presult = result;
+	return NT_STATUS_OK;
+
+fail_lockfile:
+	/*
+	 * The destructor is not armed yet, so undo the lockfile by hand:
+	 * remove it while we still hold the lock, then drop the lock.
+	 */
+	(void)messaging_dgm_lockfile_remove(ctx->cache_dir, pid.pid);
+	close(ctx->lockfile_fd);
+	TALLOC_FREE(result);
+	return status;
+
+fail_nomem:
+	TALLOC_FREE(result);
+	return NT_STATUS_NO_MEMORY;
+}
+
+/*
+ * talloc destructor of struct messaging_dgm_context.
+ *
+ * Tears down the unix_msg socket, removes the lockfile (only in the
+ * process it belongs to) and finally drops the lock by closing its fd.
+ */
+static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
+{
+	struct server_id self = messaging_server_id(c->msg_ctx);
+
+	/*
+	 * The socket must go before the lockfile: as long as the lockfile
+	 * exists, others have to assume we are still around.
+	 *
+	 * NOTE(review): unix_msg_free() can fail (e.g. EBUSY while sends are
+	 * still queued), leaving the unix_msg context alive with c as its
+	 * callback private_data. The result is ignored here.
+	 */
+	(void)unix_msg_free(c->dgm_ctx);
+
+	if (self.pid == getpid()) {
+		(void)messaging_dgm_lockfile_remove(c->cache_dir, self.pid);
+	}
+	close(c->lockfile_fd);
+	return 0;
+}
+
+/*
+ * messaging_backend send_fn: prepend a struct messaging_dgm_hdr to data
+ * and send both to the socket <cache_dir>/msg/<pid.pid>.
+ */
+static NTSTATUS messaging_dgm_send(struct messaging_context *msg_ctx,
+				   struct server_id pid, int msg_type,
+				   const DATA_BLOB *data,
+				   struct messaging_backend *backend)
+{
+	struct messaging_dgm_context *ctx = talloc_get_type_abort(
+		backend->private_data, struct messaging_dgm_context);
+	fstring pid_str;
+	char buf[PATH_MAX];
+	char *dst_sock, *to_free;
+	struct messaging_dgm_hdr hdr = {
+		.msg_version = MESSAGE_VERSION,
+		.msg_type = msg_type & MSG_TYPE_MASK,
+		.dst = pid,
+		.src = msg_ctx->id,
+	};
+	struct iovec iov[2] = {
+		{ .iov_base = &hdr, .iov_len = sizeof(hdr) },
+		{ .iov_base = data->data, .iov_len = data->length },
+	};
+	int ret;
+
+	fstr_sprintf(pid_str, "msg/%u", (unsigned)pid.pid);
+	if (full_path_tos(ctx->cache_dir, pid_str, buf, sizeof(buf),
+			  &dst_sock, &to_free) == -1) {
+		return NT_STATUS_NO_MEMORY;
+	}
+
+	DEBUG(10, ("%s: Sending message 0x%x len %u to %s\n", __func__,
+		   (unsigned)hdr.msg_type, (unsigned)data->length,
+		   server_id_str(talloc_tos(), &pid)));
+
+	/*
+	 * Sent as root; presumably because the socket directory is created
+	 * with mode 0700 for sec_initial_uid() in messaging_dgm_init.
+	 */
+	become_root();
+	ret = unix_msg_send(ctx->dgm_ctx, dst_sock, iov, ARRAY_SIZE(iov));
+	unbecome_root();
+
+	TALLOC_FREE(to_free);
+
+	if (ret == 0) {
+		return NT_STATUS_OK;
+	}
+	return map_nt_error_from_unix(ret);
+}
+
+/*
+ * unix_msg receive callback: split off the struct messaging_dgm_hdr at
+ * the front of msg and dispatch the remainder as a messaging_rec.
+ */
+static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
+			       uint8_t *msg, size_t msg_len,
+			       void *private_data)
+{
+	struct messaging_dgm_context *dgm_ctx = talloc_get_type_abort(
+		private_data, struct messaging_dgm_context);
+	struct messaging_dgm_hdr hdr;
+	struct messaging_rec rec;
+
+	if (msg_len < sizeof(hdr)) {
+		DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
+		return;
+	}
+
+	/* Copy the header out instead of casting msg to it */
+	memcpy(&hdr, msg, sizeof(hdr));
+
+	rec.msg_version = hdr.msg_version;
+	rec.msg_type = hdr.msg_type;
+	rec.dest = hdr.dst;
+	rec.src = hdr.src;
+	rec.buf.data = msg + sizeof(hdr);
+	rec.buf.length = msg_len - sizeof(hdr);
+
+	DEBUG(10, ("%s: Received message 0x%x len %u from %s\n", __func__,
+		   (unsigned)hdr.msg_type, (unsigned)rec.buf.length,
+		   server_id_str(talloc_tos(), &rec.src)));
+
+	messaging_dispatch_rec(dgm_ctx->msg_ctx, &rec);
+}
+
+/*
+ * Clean up after process pid: if we can take the fcntl lock on its
+ * lockfile, no live process holds it anymore, so its socket and lockfile
+ * are removed.
+ *
+ * Returns NT_STATUS_OK after cleanup, otherwise the mapped error of the
+ * failed open() or fcntl() (e.g. when pid still holds the lock).
+ */
+NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid)
+{
+	struct messaging_dgm_context *ctx = talloc_get_type_abort(
+		msg_ctx->local->private_data, struct messaging_dgm_context);
+	char *lockfile_name, *socket_name;
+	int fd, ret, err;
+	struct flock lck = {};
+	NTSTATUS status;
+
+	lockfile_name = talloc_asprintf(talloc_tos(), "%s/lck/%u",
+					ctx->cache_dir, (unsigned)pid);
+	if (lockfile_name == NULL) {
+		return NT_STATUS_NO_MEMORY;
+	}
+	socket_name = talloc_asprintf(lockfile_name, "%s/msg/%u",
+				      ctx->cache_dir, (unsigned)pid);
+	if (socket_name == NULL) {
+		TALLOC_FREE(lockfile_name);
+		return NT_STATUS_NO_MEMORY;
+	}
+
+	fd = open(lockfile_name, O_NONBLOCK|O_WRONLY, 0);
+	if (fd == -1) {
+		err = errno;
+		status = map_nt_error_from_unix(err);
+		DEBUG(10, ("%s: open(%s) failed: %s\n", __func__,
+			   lockfile_name, strerror(err)));
+		TALLOC_FREE(lockfile_name);
+		return status;
+	}
+
+	lck.l_type = F_WRLCK;
+	lck.l_whence = SEEK_SET;
+	lck.l_start = 0;
+	lck.l_len = 0;
+
+	ret = fcntl(fd, F_SETLK, &lck);
+	if (ret != 0) {
+		err = errno;
+		status = map_nt_error_from_unix(err);
+		DEBUG(10, ("%s: Could not get lock: %s\n", __func__,
+			   strerror(err)));
+		TALLOC_FREE(lockfile_name);
+		close(fd);
+		return status;
+	}
+
+	/*
+	 * Socket first, then the lockfile, both while we hold the lock, so
+	 * nobody can mistake a half-cleaned state for a live process.
+	 */
+	(void)unlink(socket_name);
+	(void)unlink(lockfile_name);
+	(void)close(fd);
+
+	TALLOC_FREE(lockfile_name);
+	return NT_STATUS_OK;
+}
diff --git a/source3/smbd/server.c b/source3/smbd/server.c
index bc9d293..7c6a905 100644
--- a/source3/smbd/server.c
+++ b/source3/smbd/server.c
@@ -465,6 +465,8 @@ static void remove_child_pid(struct smbd_parent_context *parent,
 	}
 
 	if (unclean_shutdown) {
+		NTSTATUS status;
+
 		/* a child terminated uncleanly so tickle all
 		   processes to see if they can grab any of the
 		   pending locks
@@ -488,6 +490,10 @@ static void remove_child_pid(struct smbd_parent_context *parent,
 		 * terminated uncleanly.
 		 */
 		messaging_cleanup_server(parent->msg_ctx, child_id);
+
+		status = messaging_dgm_cleanup(parent->msg_ctx, pid);
+		DEBUG(10, ("%s: messaging_dgm_cleanup returned %s\n",
+			   __func__, nt_errstr(status)));
 	}
 
 	if (!serverid_deregister(child_id)) {
diff --git a/source3/wscript_build b/source3/wscript_build
index 4d261c6..2ba7a96 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -314,6 +314,7 @@ bld.SAMBA3_SUBSYSTEM('TDB_LIB',
 bld.SAMBA3_SUBSYSTEM('samba3core',
                    source='''lib/messages.c
                    lib/messages_local.c
+                   lib/messages_dgm.c
                    lib/util_cluster.c
                    lib/id_cache.c
                    lib/talloc_dict.c
@@ -352,6 +353,8 @@ bld.SAMBA3_SUBSYSTEM('samba3core',
                         UTIL_PW
                         SAMBA_VERSION
                         PTHREADPOOL
+                        UNIX_MSG
+                        POLL_FUNCS_TEVENT
                         interfaces
                         param
                         dbwrap
-- 
1.9.1.423.g4596e3a


From 4a94f707878b6ceccb3f9d286cf07c9b678f1fdc Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 24 Feb 2014 13:20:16 +0000
Subject: [PATCH 05/24] lib: Remove messages_local

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/messages.h   |  12 -
 source3/lib/messages.c       |  17 --
 source3/lib/messages_local.c | 573 -------------------------------------------
 source3/smbd/server.c        |  11 -
 source3/wscript_build        |   1 -
 5 files changed, 614 deletions(-)
 delete mode 100644 source3/lib/messages_local.c

diff --git a/source3/include/messages.h b/source3/include/messages.h
index 9437965..b89af9c 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -96,15 +96,6 @@ NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
 			    struct messaging_backend **presult);
 NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid);
 
-NTSTATUS messaging_tdb_init(struct messaging_context *msg_ctx,
-			    TALLOC_CTX *mem_ctx,
-			    struct messaging_backend **presult);
-
-bool messaging_tdb_parent_init(TALLOC_CTX *mem_ctx);
-
-NTSTATUS messaging_tdb_cleanup(struct messaging_context *msg_ctx,
-			struct server_id pid);
-
 NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx,
 			      TALLOC_CTX *mem_ctx,
 			      struct messaging_backend **presult);
@@ -151,9 +142,6 @@ struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
 			struct messaging_rec **presult);
 
-void messaging_cleanup_server(struct messaging_context *msg_ctx,
-				struct server_id pid);
-
 #include "librpc/gen_ndr/ndr_messaging.h"
 
 #endif
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 983fe69..5db1214 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -567,21 +567,4 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 	return;
 }
 
-/*
-  Call when a process has terminated abnormally.
-*/
-void messaging_cleanup_server(struct messaging_context *msg_ctx,
-				struct server_id server)
-{
-	if (server_id_is_disconnected(&server)) {
-		return;
-	}
-
-	if (!procid_is_local(&server)) {
-		return;
-	}
-
-	(void)messaging_tdb_cleanup(msg_ctx, server);
-
-}
 /** @} **/
diff --git a/source3/lib/messages_local.c b/source3/lib/messages_local.c
deleted file mode 100644
index d535df1..0000000
--- a/source3/lib/messages_local.c
+++ /dev/null
@@ -1,573 +0,0 @@
-/* 
-   Unix SMB/CIFS implementation.
-   Samba internal messaging functions
-   Copyright (C) 2007 by Volker Lendecke
-
-   This program is free software; you can redistribute it and/or modify
-   it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 3 of the License, or
-   (at your option) any later version.
-
-   This program is distributed in the hope that it will be useful,
-   but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-   GNU General Public License for more details.
-
-   You should have received a copy of the GNU General Public License
-   along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-/**
-  @defgroup messages Internal messaging framework
-  @{
-  @file messages.c
-
-  @brief  Module for internal messaging between Samba daemons. 
-
-   The idea is that if a part of Samba wants to do communication with
-   another Samba process then it will do a message_register() of a
-   dispatch function, and use message_send_pid() to send messages to
-   that process.
-
-   The dispatch function is given the pid of the sender, and it can
-   use that to reply by message_send_pid().  See ping_message() for a
-   simple example.
-
-   @caution Dispatch functions must be able to cope with incoming
-   messages on an *odd* byte boundary.
-
-   This system doesn't have any inherent size limitations but is not
-   very efficient for large messages or when messages are sent in very
-   quick succession.
-
-*/
-
-#include "includes.h"
-#include "system/filesys.h"
-#include "messages.h"
-#include "serverid.h"
-#include "lib/tdb_wrap/tdb_wrap.h"
-#include "lib/param/param.h"
-
-struct messaging_tdb_context {
-	struct messaging_context *msg_ctx;
-	struct tdb_wrap *tdb;
-	struct tevent_signal *se;
-	int received_messages;
-	bool *have_context;
-};
-
-static NTSTATUS messaging_tdb_send(struct messaging_context *msg_ctx,
-				   struct server_id pid, int msg_type,
-				   const DATA_BLOB *data,
-				   struct messaging_backend *backend);
-static void message_dispatch(struct messaging_context *msg_ctx);
-
-static void messaging_tdb_signal_handler(struct tevent_context *ev_ctx,
-					 struct tevent_signal *se,
-					 int signum, int count,
-					 void *_info, void *private_data)
-{
-	struct messaging_tdb_context *ctx = talloc_get_type(private_data,
-					    struct messaging_tdb_context);
-
-	ctx->received_messages++;
-
-	DEBUG(10, ("messaging_tdb_signal_handler: sig[%d] count[%d] msgs[%d]\n",
-		   signum, count, ctx->received_messages));
-
-	message_dispatch(ctx->msg_ctx);
-}
-
-static int messaging_tdb_context_destructor(struct messaging_tdb_context *ctx);
-
-/****************************************************************************
- Initialise the messaging functions. 
-****************************************************************************/
-
-NTSTATUS messaging_tdb_init(struct messaging_context *msg_ctx,
-			    TALLOC_CTX *mem_ctx,
-			    struct messaging_backend **presult)
-{
-	struct messaging_backend *result;
-	struct messaging_tdb_context *ctx;
-	struct loadparm_context *lp_ctx;
-	static bool have_context = false;
-	const char *fname;
-
-	if (have_context) {
-		DEBUG(0, ("No two messaging contexts per process\n"));
-		return NT_STATUS_OBJECT_NAME_COLLISION;
-	}
-
-	if (!(result = talloc(mem_ctx, struct messaging_backend))) {
-		DEBUG(0, ("talloc failed\n"));
-		return NT_STATUS_NO_MEMORY;
-	}
-
-	lp_ctx = loadparm_init_s3(result, loadparm_s3_helpers());
-	if (lp_ctx == NULL) {
-		DEBUG(0, ("loadparm_init_s3 failed\n"));
-		TALLOC_FREE(result);
-		return NT_STATUS_INTERNAL_ERROR;
-	}
-
-	ctx = talloc_zero(result, struct messaging_tdb_context);
-	if (!ctx) {
-		DEBUG(0, ("talloc failed\n"));
-		TALLOC_FREE(result);
-		return NT_STATUS_NO_MEMORY;
-	}
-	result->private_data = ctx;
-	result->send_fn = messaging_tdb_send;
-
-	ctx->msg_ctx = msg_ctx;
-	ctx->have_context = &have_context;
-
-	fname = lock_path("messages.tdb");
-
-	ctx->tdb = tdb_wrap_open(
-		ctx, fname, lpcfg_tdb_hash_size(lp_ctx, fname),
-		lpcfg_tdb_flags(lp_ctx, TDB_CLEAR_IF_FIRST|TDB_DEFAULT|
-				TDB_VOLATILE| TDB_INCOMPATIBLE_HASH),
-		O_RDWR|O_CREAT,0600);
-
-	talloc_unlink(result, lp_ctx);
-
-	if (!ctx->tdb) {
-		NTSTATUS status = map_nt_error_from_unix(errno);
-		DEBUG(2, ("ERROR: Failed to initialise messages database: "
-			  "%s\n", strerror(errno)));
-		TALLOC_FREE(result);
-		return status;
-	}
-
-	ctx->se = tevent_add_signal(msg_ctx->event_ctx,
-				    ctx,
-				    SIGUSR1, 0,
-				    messaging_tdb_signal_handler,
-				    ctx);
-	if (!ctx->se) {
-		NTSTATUS status = map_nt_error_from_unix(errno);
-		DEBUG(0, ("ERROR: Failed to initialise messages signal handler: "
-			  "%s\n", strerror(errno)));
-		TALLOC_FREE(result);
-		return status;
-	}
-
-	sec_init();
-
-	have_context = true;
-	talloc_set_destructor(ctx, messaging_tdb_context_destructor);
-
-	*presult = result;
-	return NT_STATUS_OK;
-}
-
-static int messaging_tdb_context_destructor(struct messaging_tdb_context *ctx)
-{
-	SMB_ASSERT(*ctx->have_context);
-	*ctx->have_context = false;
-	return 0;
-}
-
-bool messaging_tdb_parent_init(TALLOC_CTX *mem_ctx)
-{
-	struct tdb_wrap *db;
-	struct loadparm_context *lp_ctx;
-	const char *fname;
-
-	lp_ctx = loadparm_init_s3(mem_ctx, loadparm_s3_helpers());
-	if (lp_ctx == NULL) {
-		DEBUG(0, ("loadparm_init_s3 failed\n"));
-		return false;
-	}
-
-	/*
-	 * Open the tdb in the parent process (smbd) so that our
-	 * CLEAR_IF_FIRST optimization in tdb_reopen_all can properly
-	 * work.
-	 */
-
-	fname = lock_path("messages.tdb");
-	db = tdb_wrap_open(
-		mem_ctx, fname, lpcfg_tdb_hash_size(lp_ctx, fname),
-		lpcfg_tdb_flags(lp_ctx, TDB_CLEAR_IF_FIRST|TDB_DEFAULT|
-				TDB_VOLATILE|TDB_INCOMPATIBLE_HASH),
-		O_RDWR|O_CREAT,0600);
-	talloc_unlink(mem_ctx, lp_ctx);
-	if (db == NULL) {
-		DEBUG(1, ("could not open messaging.tdb: %s\n",
-			  strerror(errno)));
-		return false;
-	}
-	return true;
-}
-
-/*******************************************************************
- Form a static tdb key from a pid.
-******************************************************************/
-
-static TDB_DATA message_key_pid(TALLOC_CTX *mem_ctx, struct server_id pid)
-{
-	char *key;
-	TDB_DATA kbuf;
-
-	key = talloc_asprintf(mem_ctx, "PID/%s", procid_str_static(&pid));
-
-	SMB_ASSERT(key != NULL);
-
-	kbuf.dptr = (uint8 *)key;
-	kbuf.dsize = strlen(key)+1;
-	return kbuf;
-}
-
-/*******************************************************************
- Called when a process has terminated abnormally. Remove all messages
- pending for it.
-******************************************************************/
-
-NTSTATUS messaging_tdb_cleanup(struct messaging_context *msg_ctx,
-				struct server_id pid)
-{
-	struct messaging_tdb_context *ctx = talloc_get_type(
-					msg_ctx->local->private_data,
-					struct messaging_tdb_context);
-	struct tdb_wrap *tdb = ctx->tdb;
-	TDB_DATA key;
-	TALLOC_CTX *frame = talloc_stackframe();
-
-	key = message_key_pid(frame, pid);
-	/*
-	 * We have to lock the key to avoid
-	 * races in case the server_id was
-	 * re-used and is active (a remote
-	 * possibility, true). We only
-	 * clean up the database if we
-	 * know server_id doesn't exist
-	 * while checked under the chainlock.
-	 */
-	if (tdb_chainlock(tdb->tdb, key) != 0) {
-		TALLOC_FREE(frame);
-		return NT_STATUS_LOCK_NOT_GRANTED;
-	}
-	if (!serverid_exists(&pid)) {
-		(void)tdb_delete(tdb->tdb, key);
-	}
-	tdb_chainunlock(tdb->tdb, key);
-	TALLOC_FREE(frame);
-	return NT_STATUS_OK;
-}
-
-/*
-  Fetch the messaging array for a process
- */
-
-static NTSTATUS messaging_tdb_fetch(TDB_CONTEXT *msg_tdb,
-				    TDB_DATA key,
-				    TALLOC_CTX *mem_ctx,
-				    struct messaging_array **presult)
-{
-	struct messaging_array *result;
-	TDB_DATA data;
-	DATA_BLOB blob;
-	enum ndr_err_code ndr_err;
-
-	if (!(result = talloc_zero(mem_ctx, struct messaging_array))) {
-		return NT_STATUS_NO_MEMORY;
-	}
-
-	data = tdb_fetch(msg_tdb, key);
-
-	if (data.dptr == NULL) {
-		*presult = result;
-		return NT_STATUS_OK;
-	}
-
-	blob = data_blob_const(data.dptr, data.dsize);
-
-	ndr_err = ndr_pull_struct_blob_all(
-		&blob, result, result,
-		(ndr_pull_flags_fn_t)ndr_pull_messaging_array);
-
-	SAFE_FREE(data.dptr);
-
-	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-		TALLOC_FREE(result);
-		return ndr_map_error2ntstatus(ndr_err);
-	}
-
-	if (DEBUGLEVEL >= 10) {
-		DEBUG(10, ("messaging_tdb_fetch:\n"));
-		NDR_PRINT_DEBUG(messaging_array, result);
-	}
-
-	*presult = result;
-	return NT_STATUS_OK;
-}
-
-/*
-  Store a messaging array for a pid
-*/
-
-static NTSTATUS messaging_tdb_store(TDB_CONTEXT *msg_tdb,
-				    TDB_DATA key,
-				    struct messaging_array *array)
-{
-	TDB_DATA data;
-	DATA_BLOB blob;
-	enum ndr_err_code ndr_err;
-	TALLOC_CTX *mem_ctx;
-	int ret;
-
-	if (array->num_messages == 0) {
-		tdb_delete(msg_tdb, key);
-		return NT_STATUS_OK;
-	}
-
-	if (!(mem_ctx = talloc_new(array))) {
-		return NT_STATUS_NO_MEMORY;
-	}
-
-	ndr_err = ndr_push_struct_blob(&blob, mem_ctx, array,
-		(ndr_push_flags_fn_t)ndr_push_messaging_array);
-
-	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-		talloc_free(mem_ctx);
-		return ndr_map_error2ntstatus(ndr_err);
-	}
-
-	if (DEBUGLEVEL >= 10) {
-		DEBUG(10, ("messaging_tdb_store:\n"));
-		NDR_PRINT_DEBUG(messaging_array, array);
-	}
-
-	data.dptr = blob.data;
-	data.dsize = blob.length;
-
-	ret = tdb_store(msg_tdb, key, data, TDB_REPLACE);
-	TALLOC_FREE(mem_ctx);
-
-	return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
-}
-
-/****************************************************************************
- Notify a process that it has a message. If the process doesn't exist 
- then delete its record in the database.
-****************************************************************************/
-
-static NTSTATUS message_notify(struct server_id procid)
-{
-	pid_t pid = procid.pid;
-	int ret;
-	uid_t euid = geteuid();
-
-	/*
-	 * Doing kill with a non-positive pid causes messages to be
-	 * sent to places we don't want.
-	 */
-
-	SMB_ASSERT(pid > 0);
-	if (pid <= 0) {
-		return NT_STATUS_INVALID_HANDLE;
-	}
-
-	if (euid != 0) {
-		/* If we're not root become so to send the message. */
-		save_re_uid();
-		set_effective_uid(0);
-	}
-
-	ret = kill(pid, SIGUSR1);
-
-	if (euid != 0) {
-		/* Go back to who we were. */
-		int saved_errno = errno;
-		restore_re_uid_fromroot();
-		errno = saved_errno;
-	}
-
-	if (ret == 0) {
-		return NT_STATUS_OK;
-	}
-
-	/*
-	 * Something has gone wrong
-	 */
-
-	DEBUG(2,("message to process %d failed - %s\n", (int)pid,
-		 strerror(errno)));
-
-	/*
-	 * No call to map_nt_error_from_unix -- don't want to link in
-	 * errormap.o into lots of utils.
-	 */
-
-	if (errno == ESRCH)  return NT_STATUS_INVALID_HANDLE;
-	if (errno == EINVAL) return NT_STATUS_INVALID_PARAMETER;
-	if (errno == EPERM)  return NT_STATUS_ACCESS_DENIED;
-	return NT_STATUS_UNSUCCESSFUL;
-}
-
-/****************************************************************************
- Send a message to a particular pid.
-****************************************************************************/
-
-static NTSTATUS messaging_tdb_send(struct messaging_context *msg_ctx,
-				   struct server_id pid, int msg_type,
-				   const DATA_BLOB *data,
-				   struct messaging_backend *backend)
-{
-	struct messaging_tdb_context *ctx = talloc_get_type(backend->private_data,
-					    struct messaging_tdb_context);
-	struct messaging_array *msg_array;
-	struct messaging_rec *rec;
-	NTSTATUS status;
-	TDB_DATA key;
-	struct tdb_wrap *tdb = ctx->tdb;
-	TALLOC_CTX *frame = talloc_stackframe();
-
-	/* NULL pointer means implicit length zero. */
-	if (!data->data) {
-		SMB_ASSERT(data->length == 0);
-	}
-
-	/*
-	 * Doing kill with a non-positive pid causes messages to be
-	 * sent to places we don't want.
-	 */
-
-	SMB_ASSERT(procid_to_pid(&pid) > 0);
-
-	key = message_key_pid(frame, pid);
-
-	if (tdb_chainlock(tdb->tdb, key) != 0) {
-		TALLOC_FREE(frame);
-		return NT_STATUS_LOCK_NOT_GRANTED;
-	}
-
-	status = messaging_tdb_fetch(tdb->tdb, key, frame, &msg_array);
-
-	if (!NT_STATUS_IS_OK(status)) {
-		goto done;
-	}
-
-	if ((msg_type & MSG_FLAG_LOWPRIORITY)
-	    && (msg_array->num_messages > 1000)) {
-		DEBUG(5, ("Dropping message for PID %s\n",
-			  procid_str_static(&pid)));
-		status = NT_STATUS_INSUFFICIENT_RESOURCES;
-		goto done;
-	}
-
-	if (!(rec = talloc_realloc(frame, msg_array->messages,
-					 struct messaging_rec,
-					 msg_array->num_messages+1))) {
-		status = NT_STATUS_NO_MEMORY;
-		goto done;
-	}
-
-	rec[msg_array->num_messages].msg_version = MESSAGE_VERSION;
-	rec[msg_array->num_messages].msg_type = msg_type & MSG_TYPE_MASK;
-	rec[msg_array->num_messages].dest = pid;
-	rec[msg_array->num_messages].src = msg_ctx->id;
-	rec[msg_array->num_messages].buf = *data;
-
-	msg_array->messages = rec;
-	msg_array->num_messages += 1;
-
-	status = messaging_tdb_store(tdb->tdb, key, msg_array);
-
-	if (!NT_STATUS_IS_OK(status)) {
-		goto done;
-	}
-
-	status = message_notify(pid);
-
-	if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
-		DEBUG(2, ("pid %s doesn't exist - deleting messages record\n",
-			  procid_str_static(&pid)));
-		tdb_delete(tdb->tdb, message_key_pid(frame, pid));
-	}
-
- done:
-	tdb_chainunlock(tdb->tdb, key);
-	TALLOC_FREE(frame);
-	return status;
-}
-
-/****************************************************************************
- Retrieve all messages for a process.
-****************************************************************************/
-
-static NTSTATUS retrieve_all_messages(TDB_CONTEXT *msg_tdb,
-				      struct server_id id,
-				      TALLOC_CTX *mem_ctx,
-				      struct messaging_array **presult)
-{
-	struct messaging_array *result;
-	TDB_DATA key = message_key_pid(mem_ctx, id);
-	NTSTATUS status;
-
-	if (tdb_chainlock(msg_tdb, key) != 0) {
-		TALLOC_FREE(key.dptr);
-		return NT_STATUS_LOCK_NOT_GRANTED;
-	}
-
-	status = messaging_tdb_fetch(msg_tdb, key, mem_ctx, &result);
-
-	/*
-	 * We delete the record here, tdb_set_max_dead keeps it around
-	 */
-	tdb_delete(msg_tdb, key);
-	tdb_chainunlock(msg_tdb, key);
-
-	if (NT_STATUS_IS_OK(status)) {
-		*presult = result;
-	}
-
-	TALLOC_FREE(key.dptr);
-
-	return status;
-}
-
-/****************************************************************************
- Receive and dispatch any messages pending for this process.
- JRA changed Dec 13 2006. Only one message handler now permitted per type.
- *NOTE*: Dispatch functions must be able to cope with incoming
- messages on an *odd* byte boundary.
-****************************************************************************/
-
-static void message_dispatch(struct messaging_context *msg_ctx)
-{
-	struct messaging_tdb_context *ctx = talloc_get_type(msg_ctx->local->private_data,
-					    struct messaging_tdb_context);
-	struct messaging_array *msg_array = NULL;
-	struct tdb_wrap *tdb = ctx->tdb;
-	NTSTATUS status;
-	uint32 i;
-
-	if (ctx->received_messages == 0) {
-		return;
-	}
-
-	DEBUG(10, ("message_dispatch: received_messages = %d\n",
-		   ctx->received_messages));
-
-	status = retrieve_all_messages(tdb->tdb, msg_ctx->id, NULL, &msg_array);
-	if (!NT_STATUS_IS_OK(status)) {
-		DEBUG(0, ("message_dispatch: failed to retrieve messages: %s\n",
-			   nt_errstr(status)));
-		return;
-	}
-
-	ctx->received_messages = 0;
-
-	for (i=0; i<msg_array->num_messages; i++) {
-		messaging_dispatch_rec(msg_ctx, &msg_array->messages[i]);
-	}
-
-	TALLOC_FREE(msg_array);
-}
-
-/** @} **/
diff --git a/source3/smbd/server.c b/source3/smbd/server.c
index 7c6a905..9fcf254 100644
--- a/source3/smbd/server.c
+++ b/source3/smbd/server.c
@@ -484,13 +484,6 @@ static void remove_child_pid(struct smbd_parent_context *parent,
 			DEBUG(1,("Scheduled cleanup of brl and lock database after unclean shutdown\n"));
 		}
 
-		/*
-		 * Ensure we flush any stored messages
-		 * queued for the child process that
-		 * terminated uncleanly.
-		 */
-		messaging_cleanup_server(parent->msg_ctx, child_id);
-
 		status = messaging_dgm_cleanup(parent->msg_ctx, pid);
 		DEBUG(10, ("%s: messaging_dgm_cleanup returned %s\n",
 			   __func__, nt_errstr(status)));
@@ -1457,10 +1450,6 @@ extern void build_options(bool screen);
 	if (!locking_init())
 		exit(1);
 
-	if (!messaging_tdb_parent_init(ev_ctx)) {
-		exit(1);
-	}
-
 	if (!smbd_parent_notify_init(NULL, msg_ctx, ev_ctx)) {
 		exit(1);
 	}
diff --git a/source3/wscript_build b/source3/wscript_build
index 2ba7a96..4f5661f 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -313,7 +313,6 @@ bld.SAMBA3_SUBSYSTEM('TDB_LIB',
 
 bld.SAMBA3_SUBSYSTEM('samba3core',
                    source='''lib/messages.c
-                   lib/messages_local.c
                    lib/messages_dgm.c
                    lib/util_cluster.c
                    lib/id_cache.c
-- 
1.9.1.423.g4596e3a


From bfdb516e0a5325f589a6b474aa32e0a3f2133887 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 4 Apr 2014 15:00:16 +0000
Subject: [PATCH 06/24] smbd: Add a timestamp to queued notify events

In a cluster, and with the changed messaging, messages can be scheduled after
new SMB requests. This re-ordering breaks a few notify tests. This patch starts
the infrastructure to add timestamps to notify events, so that they can be
sorted before they are sent out. The timestamp will be the current local time
taken in notify_fname; that's all we can do.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/smbd/notify.c | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/source3/smbd/notify.c b/source3/smbd/notify.c
index c19982a..6a4b52c 100644
--- a/source3/smbd/notify.c
+++ b/source3/smbd/notify.c
@@ -24,6 +24,12 @@
 #include "smbd/globals.h"
 #include "../librpc/gen_ndr/ndr_notify.h"
 
+struct notify_change_event {
+	struct timespec when;
+	uint32_t action;
+	const char *name;
+};
+
 struct notify_change_buf {
 	/*
 	 * If no requests are pending, changes are queued here. Simple array,
@@ -35,7 +41,7 @@ struct notify_change_buf {
 	 * asked we just return NT_STATUS_OK without specific changes.
 	 */
 	int num_changes;
-	struct notify_change *changes;
+	struct notify_change_event *changes;
 
 	/*
 	 * If no changes are around requests are queued here. Using a linked
@@ -87,8 +93,8 @@ struct notify_mid_map {
 	uint64_t mid;
 };
 
-static bool notify_change_record_identical(struct notify_change *c1,
-					struct notify_change *c2)
+static bool notify_change_record_identical(struct notify_change_event *c1,
+					   struct notify_change_event *c2)
 {
 	/* Note this is deliberately case sensitive. */
 	if (c1->action == c2->action &&
@@ -100,7 +106,7 @@ static bool notify_change_record_identical(struct notify_change *c1,
 
 static bool notify_marshall_changes(int num_changes,
 				uint32 max_offset,
-				struct notify_change *changes,
+				struct notify_change_event *changes,
 				DATA_BLOB *final_blob)
 {
 	int i;
@@ -111,7 +117,7 @@ static bool notify_marshall_changes(int num_changes,
 
 	for (i=0; i<num_changes; i++) {
 		enum ndr_err_code ndr_err;
-		struct notify_change *c;
+		struct notify_change_event *c;
 		struct FILE_NOTIFY_INFORMATION m;
 		DATA_BLOB blob;
 
@@ -437,7 +443,7 @@ void notify_fname(connection_struct *conn, uint32 action, uint32 filter,
 
 static void notify_fsp(files_struct *fsp, uint32 action, const char *name)
 {
-	struct notify_change *change, *changes;
+	struct notify_change_event *change, *changes;
 	char *tmp;
 
 	if (fsp->notify == NULL) {
@@ -483,7 +489,8 @@ static void notify_fsp(files_struct *fsp, uint32 action, const char *name)
 
 	if (!(changes = talloc_realloc(
 		      fsp->notify, fsp->notify->changes,
-		      struct notify_change, fsp->notify->num_changes+1))) {
+		      struct notify_change_event,
+		      fsp->notify->num_changes+1))) {
 		DEBUG(0, ("talloc_realloc failed\n"));
 		return;
 	}
-- 
1.9.1.423.g4596e3a


From 346a3c7ddb45be109a64aa82f4916171d4653a58 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 4 Apr 2014 15:03:44 +0000
Subject: [PATCH 07/24] smbd: Pass timespec_current to notify_fsp

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/smbd/notify.c | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/source3/smbd/notify.c b/source3/smbd/notify.c
index 6a4b52c..693418a 100644
--- a/source3/smbd/notify.c
+++ b/source3/smbd/notify.c
@@ -63,7 +63,8 @@ struct notify_change_request {
 	void *backend_data;
 };
 
-static void notify_fsp(files_struct *fsp, uint32 action, const char *name);
+static void notify_fsp(files_struct *fsp, struct timespec when,
+		       uint32 action, const char *name);
 
 bool change_notify_fsp_has_changes(struct files_struct *fsp)
 {
@@ -214,7 +215,7 @@ static void notify_callback(void *private_data, const struct notify_event *e)
 {
 	files_struct *fsp = (files_struct *)private_data;
 	DEBUG(10, ("notify_callback called for %s\n", fsp_str_dbg(fsp)));
-	notify_fsp(fsp, e->action, e->path);
+	notify_fsp(fsp, timespec_current(), e->action, e->path);
 }
 
 static void sys_notify_callback(struct sys_notify_context *ctx,
@@ -223,7 +224,7 @@ static void sys_notify_callback(struct sys_notify_context *ctx,
 {
 	files_struct *fsp = (files_struct *)private_data;
 	DEBUG(10, ("sys_notify_callback called for %s\n", fsp_str_dbg(fsp)));
-	notify_fsp(fsp, e->action, e->path);
+	notify_fsp(fsp, timespec_current(), e->action, e->path);
 }
 
 NTSTATUS change_notify_create(struct files_struct *fsp, uint32 filter,
@@ -441,7 +442,8 @@ void notify_fname(connection_struct *conn, uint32 action, uint32 filter,
 	TALLOC_FREE(to_free);
 }
 
-static void notify_fsp(files_struct *fsp, uint32 action, const char *name)
+static void notify_fsp(files_struct *fsp, struct timespec when,
+		       uint32 action, const char *name)
 {
 	struct notify_change_event *change, *changes;
 	char *tmp;
@@ -507,6 +509,7 @@ static void notify_fsp(files_struct *fsp, uint32 action, const char *name)
 	string_replace(tmp, '/', '\\');
 	change->name = tmp;	
 
+	change->when = when;
 	change->action = action;
 	fsp->notify->num_changes += 1;
 
-- 
1.9.1.423.g4596e3a


From 01b83de1115042598516353101cc7fdfa5b6b3ac Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 4 Apr 2014 15:11:51 +0000
Subject: [PATCH 08/24] smbd: Pass timespec_current through the notify_callback

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/smbd/notify.c          | 5 +++--
 source3/smbd/notify_internal.c | 8 +++++---
 source3/smbd/proto.h           | 3 ++-
 3 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/source3/smbd/notify.c b/source3/smbd/notify.c
index 693418a..b8085dd 100644
--- a/source3/smbd/notify.c
+++ b/source3/smbd/notify.c
@@ -211,11 +211,12 @@ void change_notify_reply(struct smb_request *req,
 	notify_buf->num_changes = 0;
 }
 
-static void notify_callback(void *private_data, const struct notify_event *e)
+static void notify_callback(void *private_data, struct timespec when,
+			    const struct notify_event *e)
 {
 	files_struct *fsp = (files_struct *)private_data;
 	DEBUG(10, ("notify_callback called for %s\n", fsp_str_dbg(fsp)));
-	notify_fsp(fsp, timespec_current(), e->action, e->path);
+	notify_fsp(fsp, when, e->action, e->path);
 }
 
 static void sys_notify_callback(struct sys_notify_context *ctx,
diff --git a/source3/smbd/notify_internal.c b/source3/smbd/notify_internal.c
index 4d88565..69f9377 100644
--- a/source3/smbd/notify_internal.c
+++ b/source3/smbd/notify_internal.c
@@ -44,7 +44,7 @@
 struct notify_list {
 	struct notify_list *next, *prev;
 	const char *path;
-	void (*callback)(void *, const struct notify_event *);
+	void (*callback)(void *, struct timespec, const struct notify_event *);
 	void *private_data;
 };
 
@@ -194,7 +194,8 @@ static int notify_context_destructor(struct notify_context *notify)
 
 NTSTATUS notify_add(struct notify_context *notify,
 		    const char *path, uint32_t filter, uint32_t subdir_filter,
-		    void (*callback)(void *, const struct notify_event *),
+		    void (*callback)(void *, struct timespec,
+				     const struct notify_event *),
 		    void *private_data)
 {
 	struct notify_db_entry e;
@@ -820,7 +821,8 @@ static void notify_handler(struct messaging_context *msg_ctx,
 
 	for (listel=notify->list;listel;listel=listel->next) {
 		if (listel->private_data == n->private_data) {
-			listel->callback(listel->private_data, n);
+			listel->callback(listel->private_data,
+					 timespec_current(), n);
 			break;
 		}
 	}
diff --git a/source3/smbd/proto.h b/source3/smbd/proto.h
index d9b86b6..bc15f15 100644
--- a/source3/smbd/proto.h
+++ b/source3/smbd/proto.h
@@ -552,7 +552,8 @@ struct notify_context *notify_init(TALLOC_CTX *mem_ctx,
 				   struct tevent_context *ev);
 NTSTATUS notify_add(struct notify_context *notify,
 		    const char *path, uint32_t filter, uint32_t subdir_filter,
-		    void (*callback)(void *, const struct notify_event *),
+		    void (*callback)(void *, struct timespec,
+				     const struct notify_event *),
 		    void *private_data);
 NTSTATUS notify_remove(struct notify_context *notify, void *private_data);
 void notify_trigger(struct notify_context *notify,
-- 
1.9.1.423.g4596e3a


From 7da0071413454f1b5c54cb188685618e09703a8d Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sun, 2 Mar 2014 18:34:53 +0100
Subject: [PATCH 09/24] lib: Introduce iov_buflen

Sum up the iov_len fields of an iovec array, with overflow protection.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/proto.h |  1 +
 source3/lib/util_sock.c | 28 +++++++++++++++++++++++-----
 2 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/source3/include/proto.h b/source3/include/proto.h
index c854882..983a4d0 100644
--- a/source3/include/proto.h
+++ b/source3/include/proto.h
@@ -583,6 +583,7 @@ NTSTATUS read_fd_with_timeout(int fd, char *buf,
 				  size_t *size_ret);
 NTSTATUS read_data(int fd, char *buffer, size_t N);
 ssize_t write_data(int fd, const char *buffer, size_t N);
+ssize_t iov_buflen(const struct iovec *iov, int iovlen);
 ssize_t write_data_iov(int fd, const struct iovec *orig_iov, int iovcnt);
 bool send_keepalive(int client);
 NTSTATUS read_smb_length_return_keepalive(int fd, char *inbuf,
diff --git a/source3/lib/util_sock.c b/source3/lib/util_sock.c
index 12e4ccd..b8149ca 100644
--- a/source3/lib/util_sock.c
+++ b/source3/lib/util_sock.c
@@ -201,6 +201,24 @@ NTSTATUS read_data(int fd, char *buffer, size_t N)
 	return read_fd_with_timeout(fd, buffer, N, N, 0, NULL);
 }
 
+ssize_t iov_buflen(const struct iovec *iov, int iovcnt)
+{
+	size_t buflen = 0;
+	int i;
+	/* Sum of all iov_len, or -1 if that does not fit into ssize_t */
+	for (i=0; i<iovcnt; i++) {
+		size_t thislen = iov[i].iov_len;
+		size_t tmp = buflen + thislen;
+
+		if ((tmp < buflen) || (tmp > SSIZE_MAX)) {
+			/* size_t wrap-around, or too big for ssize_t */
+			return -1;
+		}
+		buflen = tmp;
+	}
+	return buflen;
+}
+
 /****************************************************************************
  Write all data from an iov array
  NB. This can be called with a non-socket fd, don't add dependencies
@@ -209,15 +227,15 @@ NTSTATUS read_data(int fd, char *buffer, size_t N)
 
 ssize_t write_data_iov(int fd, const struct iovec *orig_iov, int iovcnt)
 {
-	int i;
-	size_t to_send;
+	ssize_t to_send;
 	ssize_t thistime;
 	size_t sent;
 	struct iovec *iov_copy, *iov;
 
-	to_send = 0;
-	for (i=0; i<iovcnt; i++) {
-		to_send += orig_iov[i].iov_len;
+	to_send = iov_buflen(orig_iov, iovcnt);
+	if (to_send == -1) {
+		errno = EINVAL;
+		return -1;
 	}
 
 	thistime = sys_writev(fd, orig_iov, iovcnt);
-- 
1.9.1.423.g4596e3a


From 76611b116d430c2a1607a388f1f8048082a6a569 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sun, 2 Mar 2014 19:33:08 +0100
Subject: [PATCH 10/24] lib: Add iov_buf

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/proto.h |  1 +
 source3/lib/util_sock.c | 25 +++++++++++++++++++++++++
 2 files changed, 26 insertions(+)

diff --git a/source3/include/proto.h b/source3/include/proto.h
index 983a4d0..0a4db86 100644
--- a/source3/include/proto.h
+++ b/source3/include/proto.h
@@ -584,6 +584,7 @@ NTSTATUS read_fd_with_timeout(int fd, char *buf,
 NTSTATUS read_data(int fd, char *buffer, size_t N);
 ssize_t write_data(int fd, const char *buffer, size_t N);
 ssize_t iov_buflen(const struct iovec *iov, int iovlen);
+uint8_t *iov_buf(TALLOC_CTX *mem_ctx, const struct iovec *iov, int iovcnt);
 ssize_t write_data_iov(int fd, const struct iovec *orig_iov, int iovcnt);
 bool send_keepalive(int client);
 NTSTATUS read_smb_length_return_keepalive(int fd, char *inbuf,
diff --git a/source3/lib/util_sock.c b/source3/lib/util_sock.c
index b8149ca..c5de61a 100644
--- a/source3/lib/util_sock.c
+++ b/source3/lib/util_sock.c
@@ -219,6 +219,31 @@ ssize_t iov_buflen(const struct iovec *iov, int iovcnt)
 	return buflen;
 }
 
+uint8_t *iov_buf(TALLOC_CTX *mem_ctx, const struct iovec *iov, int iovcnt)
+{
+	int i;
+	ssize_t buflen;
+	uint8_t *buf, *p;
+
+	buflen = iov_buflen(iov, iovcnt);
+	if (buflen < 0) {	/* total length overflowed */
+		return NULL;
+	}
+	buf = talloc_array(mem_ctx, uint8_t, buflen);
+	if (buf == NULL) {
+		return NULL;
+	}
+	p = buf;
+	for (i=0; i<iovcnt; i++) {
+		size_t len = iov[i].iov_len;
+		if (len != 0) {	/* empty entries may have iov_base == NULL */
+			memcpy(p, iov[i].iov_base, len);
+		}
+		p += len;
+	}
+	return buf;
+}
+
 /****************************************************************************
  Write all data from an iov array
  NB. This can be called with a non-socket fd, don't add dependencies
-- 
1.9.1.423.g4596e3a


From 810d47929811d4b55a7833aa99a5d4eb4ce787eb Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Tue, 25 Feb 2014 12:15:58 +0000
Subject: [PATCH 11/24] messaging3: Add messaging_send_iov

This uses a copy; it will be replaced by a direct iovec call through to
sendmsg on the unix domain socket.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/messages.h |  3 +++
 source3/lib/messages.c     | 19 +++++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/source3/include/messages.h b/source3/include/messages.h
index b89af9c..3e1b664 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -132,6 +132,9 @@ NTSTATUS messaging_send(struct messaging_context *msg_ctx,
 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
 			    struct server_id server, uint32_t msg_type,
 			    const uint8_t *buf, size_t len);
+NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
+			    struct server_id server, uint32_t msg_type,
+			    const struct iovec *iov, int iovlen);
 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 			    struct messaging_rec *rec);
 
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 5db1214..b4ed807 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -419,6 +419,25 @@ NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
 	return messaging_send(msg_ctx, server, msg_type, &blob);
 }
 
+NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
+			    struct server_id server, uint32_t msg_type,
+			    const struct iovec *iov, int iovlen)
+{
+	/* Interim: flatten the iovec into one buffer and send that copy */
+	uint8_t *flat = iov_buf(talloc_tos(), iov, iovlen);
+	NTSTATUS result;
+
+	if (flat == NULL) {
+		return NT_STATUS_NO_MEMORY;
+	}
+
+	result = messaging_send_buf(msg_ctx, server, msg_type,
+				    flat, talloc_get_size(flat));
+	TALLOC_FREE(flat);
+
+	return result;
+}
+
 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
 					       struct messaging_rec *rec)
 {
-- 
1.9.1.423.g4596e3a


From 069a261437bfe78439375223fc63856807e45a72 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 4 Apr 2014 21:01:01 +0200
Subject: [PATCH 12/24] smbd: Pass on a timestamp in MSG_PVFS_NOTIFY

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/smbd/notify_internal.c | 67 +++++++++++++++++++++---------------------
 1 file changed, 33 insertions(+), 34 deletions(-)

diff --git a/source3/smbd/notify_internal.c b/source3/smbd/notify_internal.c
index 69f9377..e902bf4 100644
--- a/source3/smbd/notify_internal.c
+++ b/source3/smbd/notify_internal.c
@@ -767,30 +767,32 @@ done:
 	TALLOC_FREE(data.dptr);
 }
 
+struct notify_msg {
+	struct timespec when;
+	void *private_data;
+	uint32_t action;
+	char path[];	/* NUL-terminated, follows the fixed header */
+};
+
 static NTSTATUS notify_send(struct notify_context *notify,
 			    struct server_id *pid,
 			    const char *path, uint32_t action,
 			    void *private_data)
 {
-	struct notify_event ev;
-	DATA_BLOB data;
-	NTSTATUS status;
-	enum ndr_err_code ndr_err;
+	struct notify_msg m = {};
+	struct iovec iov[2];	/* [0]: fixed header, [1]: path incl. NUL */
 
-	ev.action = action;
-	ev.path = path;
-	ev.private_data = private_data;
+	m.when = timespec_current();	/* handed to the receiver's callback */
+	m.private_data = private_data;
+	m.action = action;
 
-	ndr_err = ndr_push_struct_blob(
-		&data, talloc_tos(), &ev,
-		(ndr_push_flags_fn_t)ndr_push_notify_event);
-	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-		return ndr_map_error2ntstatus(ndr_err);
-	}
-	status = messaging_send(notify->msg, *pid, MSG_PVFS_NOTIFY,
-				&data);
-	TALLOC_FREE(data.data);
-	return status;
+	iov[0].iov_base = &m;
+	iov[0].iov_len = offsetof(struct notify_msg, path);	/* header only */
+	iov[1].iov_base = discard_const_p(char, path);
+	iov[1].iov_len = strlen(path)+1;	/* include the terminating NUL */
+
+	return messaging_send_iov(notify->msg, *pid, MSG_PVFS_NOTIFY,
+				  iov, ARRAY_SIZE(iov));
 }
 
 static void notify_handler(struct messaging_context *msg_ctx,
@@ -799,34 +801,31 @@ static void notify_handler(struct messaging_context *msg_ctx,
 {
 	struct notify_context *notify = talloc_get_type_abort(
 		private_data, struct notify_context);
-	enum ndr_err_code ndr_err;
-	struct notify_event *n;
+	struct notify_msg *m;
+	struct notify_event e;
 	struct notify_list *listel;
 
-	n = talloc(talloc_tos(), struct notify_event);
-	if (n == NULL) {
-		DEBUG(1, ("talloc failed\n"));
+	if (data->length < offsetof(struct notify_msg, path) + 1) {
+		DEBUG(1, ("%s: Got too short MSG_PVFS_NOTIFY msg\n", __func__));
 		return;
 	}
-
-	ndr_err = ndr_pull_struct_blob(
-		data, n, n, (ndr_pull_flags_fn_t)ndr_pull_notify_event);
-	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-		TALLOC_FREE(n);
+	if (data->data[data->length-1] != 0) {
+		DEBUG(1, ("%s: MSG_PVFS_NOTIFY path not 0-terminated\n",
+			  __func__));
 		return;
 	}
-	if (DEBUGLEVEL >= 10) {
-		NDR_PRINT_DEBUG(notify_event, n);
-	}
+
+	m = (struct notify_msg *)data->data;	/* header + NUL fit, checked above */
+	e.action = m->action;
+	e.path = m->path;
+	e.private_data = m->private_data;
 
 	for (listel=notify->list;listel;listel=listel->next) {
-		if (listel->private_data == n->private_data) {
-			listel->callback(listel->private_data,
-					 timespec_current(), n);
+		if (listel->private_data == m->private_data) {
+			listel->callback(listel->private_data, m->when, &e);
 			break;
 		}
 	}
-	TALLOC_FREE(n);
 }
 
 struct notify_walk_idx_state {
-- 
1.9.1.423.g4596e3a


From 0ee4c14e9c3dc397e627364de72fae9ce7e5174f Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 4 Apr 2014 21:12:06 +0200
Subject: [PATCH 13/24] smbd: Sort notify events by timestamp

This will fix the raw.notify test with the new messaging system. With the new
messaging system, messages come in via yet another fd that has to line up in
poll next to the incoming client TCP socket. With the signal-based messaging,
messages were always handled before client requests. The new scheme means that
notify messages might be deferred a bit (something which can already happen in
a cluster today), which then means that notify_marshall_changes() will
coalesce entries, which in turn makes raw.notify unhappy.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/smbd/notify.c | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/source3/smbd/notify.c b/source3/smbd/notify.c
index b8085dd..dd4dc1a 100644
--- a/source3/smbd/notify.c
+++ b/source3/smbd/notify.c
@@ -170,6 +170,14 @@ static bool notify_marshall_changes(int num_changes,
 	return True;
 }
 
+static int compare_notify_change_events(const void *p1, const void *p2)
+{
+	const struct notify_change_event *a = p1, *b = p2;
+
+	/* qsort comparator: order change events by their "when" stamp */
+	return timespec_compare(&a->when, &b->when);
+}
+
 /****************************************************************************
  Setup the common parts of the return packet and send it.
 *****************************************************************************/
@@ -194,6 +202,14 @@ void change_notify_reply(struct smb_request *req,
 		return;
 	}
 
+	/*
+	 * Sort the notifies by timestamp when the event happened to avoid
+	 * coalescing and thus dropping events in notify_marshall_changes.
+	 */
+
+	qsort(notify_buf->changes, notify_buf->num_changes,
+	      sizeof(*(notify_buf->changes)), compare_notify_change_events);
+
 	if (!notify_marshall_changes(notify_buf->num_changes, max_param,
 					notify_buf->changes, &blob)) {
 		/*
-- 
1.9.1.423.g4596e3a


From 7f92da0179ad20596be077e704bada06a69c253f Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 11 Apr 2014 09:09:49 +0200
Subject: [PATCH 14/24] printing_cups: Call the msg_ctx destructor on exit

With the new messaging, if we don't do this, we'll leave sockets around. I'm
sure we will not catch everything, so a periodic cleanup will be required.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/printing/print_cups.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/source3/printing/print_cups.c b/source3/printing/print_cups.c
index 6c1e9ce..0ec71ab 100644
--- a/source3/printing/print_cups.c
+++ b/source3/printing/print_cups.c
@@ -483,6 +483,7 @@ static bool cups_pcap_load_async(struct tevent_context *ev,
 	close(fds[0]);
 	cups_cache_reload_async(fds[1]);
 	close(fds[1]);
+	TALLOC_FREE(msg_ctx);
 	_exit(0);
 }
 
-- 
1.9.1.423.g4596e3a


From 6770ed55d82303d71e379b7c1e95c50e8dca6f08 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 11 Apr 2014 09:12:46 +0200
Subject: [PATCH 15/24] smbcontrol: Clean up the msg_ctx

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/utils/smbcontrol.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/source3/utils/smbcontrol.c b/source3/utils/smbcontrol.c
index 156a7ad..d0e923a 100644
--- a/source3/utils/smbcontrol.c
+++ b/source3/utils/smbcontrol.c
@@ -1580,6 +1580,7 @@ int main(int argc, const char **argv)
 	}
 
 	ret = !do_command(evt_ctx, msg_ctx, argc, argv);
+	TALLOC_FREE(msg_ctx);
 	TALLOC_FREE(frame);
 	return ret;
 }
-- 
1.9.1.423.g4596e3a


From d5bdcd5e4ec99283e7e3ade21d599a5622a4de29 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 11 Apr 2014 09:13:10 +0200
Subject: [PATCH 16/24] smbd: Always clean up the child's msg_ctx

This is a bit of lazy programming: we could, and possibly should, do this in
exit_server() in the child. But this way we make sure the cleanup works. If it
were only executed for unclean exits, we might not detect a failure of this
code in the parent.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/smbd/server.c | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/source3/smbd/server.c b/source3/smbd/server.c
index 9fcf254..55a64f6 100644
--- a/source3/smbd/server.c
+++ b/source3/smbd/server.c
@@ -445,9 +445,14 @@ static void remove_child_pid(struct smbd_parent_context *parent,
 {
 	struct smbd_child_pid *child;
 	struct server_id child_id;
+	NTSTATUS status;
 
 	child_id = pid_to_procid(pid);
 
+	status = messaging_dgm_cleanup(parent->msg_ctx, pid);
+	DEBUG(10, ("%s: messaging_dgm_cleanup returned %s\n",
+		   __func__, nt_errstr(status)));
+
 	for (child = parent->children; child != NULL; child = child->next) {
 		if (child->pid == pid) {
 			struct smbd_child_pid *tmp = child;
@@ -465,8 +470,6 @@ static void remove_child_pid(struct smbd_parent_context *parent,
 	}
 
 	if (unclean_shutdown) {
-		NTSTATUS status;
-
 		/* a child terminated uncleanly so tickle all
 		   processes to see if they can grab any of the
 		   pending locks
@@ -483,10 +486,6 @@ static void remove_child_pid(struct smbd_parent_context *parent,
 						parent);
 			DEBUG(1,("Scheduled cleanup of brl and lock database after unclean shutdown\n"));
 		}
-
-		status = messaging_dgm_cleanup(parent->msg_ctx, pid);
-		DEBUG(10, ("%s: messaging_dgm_cleanup returned %s\n",
-			   __func__, nt_errstr(status)));
 	}
 
 	if (!serverid_deregister(child_id)) {
-- 
1.9.1.423.g4596e3a


From 19685b4d02727dc755fd6491f3bb85a66dd99937 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Thu, 10 Apr 2014 22:07:11 +0200
Subject: [PATCH 17/24] messaging_dgm: Add messaging_dgm_wipe

This walks all sockets and wipes the leftovers.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/messages.h |  1 +
 source3/lib/messages_dgm.c | 54 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 55 insertions(+)

diff --git a/source3/include/messages.h b/source3/include/messages.h
index 3e1b664..8a818db 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -95,6 +95,7 @@ NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
 			    TALLOC_CTX *mem_ctx,
 			    struct messaging_backend **presult);
 NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid);
+NTSTATUS messaging_dgm_wipe(struct messaging_context *msg_ctx);
 
 NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx,
 			      TALLOC_CTX *mem_ctx,
diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index 0d4df2f..5827b78 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -418,3 +418,57 @@ NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid)
 	TALLOC_FREE(lockfile_name);
 	return NT_STATUS_OK;
 }
+
+NTSTATUS messaging_dgm_wipe(struct messaging_context *msg_ctx)
+{
+	struct messaging_dgm_context *ctx = talloc_get_type_abort(
+		msg_ctx->local->private_data, struct messaging_dgm_context);
+	char *msgdir_name;
+	DIR *msgdir;
+	struct dirent *dp;
+	pid_t our_pid = getpid();
+
+	/*
+	 * We scan the socket directory and not the lock directory. Otherwise
+	 * we would race against messaging_dgm_lockfile_create's open(O_CREAT)
+	 * and fcntl(SETLK).
+	 */
+
+	msgdir_name = talloc_asprintf(talloc_tos(), "%s/msg", ctx->cache_dir);
+	if (msgdir_name == NULL) {
+		return NT_STATUS_NO_MEMORY;
+	}
+
+	msgdir = opendir(msgdir_name);
+	TALLOC_FREE(msgdir_name);
+	if (msgdir == NULL) {
+		return map_nt_error_from_unix(errno);
+	}
+
+	while ((dp = readdir(msgdir)) != NULL) {
+		NTSTATUS status;
+		unsigned long pid;
+		char *endp;
+
+		pid = strtoul(dp->d_name, &endp, 10);
+		if ((pid == 0) || (*endp != '\0')) {
+			/*
+			 * ., .., and names that are not purely a decimal pid
+			 */
+			continue;
+		}
+		if (pid == our_pid) {
+			/*
+			 * fcntl(F_GETLK) will succeed for ourselves, we hold
+			 * that lock ourselves.
+			 */
+			continue;
+		}
+		status = messaging_dgm_cleanup(msg_ctx, pid);
+		DEBUG(10, ("messaging_dgm_cleanup(%lu) returned %s\n",
+			   pid, nt_errstr(status)));
+	}
+	closedir(msgdir);
+
+	return NT_STATUS_OK;
+}
-- 
1.9.1.423.g4596e3a


From d37001764d714759ba9ac89255cf2be671b33ec2 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Thu, 10 Apr 2014 22:09:04 +0200
Subject: [PATCH 18/24] smbcontrol: Add dgm-cleanup command

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/utils/smbcontrol.c | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/source3/utils/smbcontrol.c b/source3/utils/smbcontrol.c
index d0e923a..274de70 100644
--- a/source3/utils/smbcontrol.c
+++ b/source3/utils/smbcontrol.c
@@ -968,6 +968,25 @@ static bool do_num_children(struct tevent_context *ev_ctx,
 	return num_replies;
 }
 
+static bool do_dgm_cleanup(struct tevent_context *ev_ctx,
+			   struct messaging_context *msg_ctx,
+			   const struct server_id pid,
+			   const int argc, const char **argv)
+{
+	/*
+	 * A non-zero pid cleans up the dgm leftovers of that one pid;
+	 * pid 0 walks the whole socket directory via messaging_dgm_wipe().
+	 */
+	NTSTATUS status = (pid.pid != 0) ?
+		messaging_dgm_cleanup(msg_ctx, pid.pid) :
+		messaging_dgm_wipe(msg_ctx);
+
+	printf("cleanup(%u) returned %s\n",
+	       (unsigned)pid.pid, nt_errstr(status));
+
+	return NT_STATUS_IS_OK(status);
+}
+
 /* Shutdown a server process */
 
 static bool do_shutdown(struct tevent_context *ev_ctx,
@@ -1378,6 +1397,7 @@ static const struct {
 	{ "notify-cleanup", do_notify_cleanup },
 	{ "num-children", do_num_children,
 	  "Print number of smbd child processes" },
+	{ "dgm-cleanup", do_dgm_cleanup },
 	{ "noop", do_noop, "Do nothing" },
 	{ NULL }
 };
-- 
1.9.1.423.g4596e3a


From 1cfc5654f0885b37d8fea0a19b7755ea4b0c3376 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 11 Apr 2014 11:07:10 +0000
Subject: [PATCH 19/24] smbd: Call the msg_ctx destructor for background jobs

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/lib/background.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/source3/lib/background.c b/source3/lib/background.c
index 6a91783..a9fd04f 100644
--- a/source3/lib/background.c
+++ b/source3/lib/background.c
@@ -181,6 +181,7 @@ static void background_job_waited(struct tevent_req *subreq)
 		if (written == -1) {
 			_exit(1);
 		}
+		TALLOC_FREE(state->msg);
 		_exit(0);
 	}
 
-- 
1.9.1.423.g4596e3a


From 9f4e2d0037773bd3c0cb13ca7bd0788dda357c92 Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Fri, 18 Apr 2014 14:47:39 -0700
Subject: [PATCH 20/24] s3 : build system : Move lib/background.c from
 smbd_base to samba3core.

Allows background jobs to be run from winbindd and nmbd.

Signed-off-by: Jeremy Allison <jra at samba.org>
Reviewed-by: Michael Adam <obnox at samba.org>
---
 source3/wscript_build | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/source3/wscript_build b/source3/wscript_build
index 4f5661f..e9c2f91 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -343,7 +343,8 @@ bld.SAMBA3_SUBSYSTEM('samba3core',
                    lib/audit.c
                    lib/tevent_wait.c
                    lib/idmap_cache.c
-                   lib/util_ea.c''',
+                   lib/util_ea.c
+                   lib/background.c''',
                    deps='''
                         samba3util
                         LIBTSOCKET
@@ -554,7 +555,6 @@ bld.SAMBA3_LIBRARY('smbd_base',
                    smbd/error.c
                    printing/printspoolss.c
                    printing/spoolssd.c
-                   lib/background.c
                    lib/sessionid_tdb.c
                    lib/conn_tdb.c
                    smbd/fake_file.c
-- 
1.9.1.423.g4596e3a


From 90ec27fda9c3471ee8e1dce1e7646e1d198c6643 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 11 Apr 2014 11:08:56 +0000
Subject: [PATCH 21/24] s3: messaging: Add infrastructure to clean up orphaned
 sockets every 15 minutes as a background task.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/messages.h |  2 ++
 source3/lib/messages.c     | 50 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 52 insertions(+)

diff --git a/source3/include/messages.h b/source3/include/messages.h
index 8a818db..1681ec9 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -146,6 +146,8 @@ struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
 			struct messaging_rec **presult);
 
+bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg);
+
 #include "librpc/gen_ndr/ndr_messaging.h"
 
 #endif
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index b4ed807..b6fe423 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -50,6 +50,7 @@
 #include "serverid.h"
 #include "messages.h"
 #include "lib/util/tevent_unix.h"
+#include "lib/background.h"
 
 struct messaging_callback {
 	struct messaging_callback *prev, *next;
@@ -586,4 +587,53 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 	return;
 }
 
+static int mess_parent_dgm_cleanup(void *private_data);
+static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
+
+bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
+{
+	int interval = lp_parm_int(-1, "messaging",
+				   "messaging dgm cleanup interval", 60*15);
+	struct tevent_req *job = background_job_send(
+		msg, msg->event_ctx, msg, NULL, 0, interval,
+		mess_parent_dgm_cleanup, msg);
+
+	if (job == NULL) {
+		return false;	/* could not schedule the periodic wipe */
+	}
+	tevent_req_set_callback(job, mess_parent_dgm_cleanup_done, msg);
+	return true;
+}
+
+static int mess_parent_dgm_cleanup(void *private_data)
+{
+	struct messaging_context *msg_ctx = talloc_get_type_abort(
+		private_data, struct messaging_context);
+	NTSTATUS status = messaging_dgm_wipe(msg_ctx);
+
+	DEBUG(10, ("messaging_dgm_wipe returned %s\n", nt_errstr(status)));
+	/* presumably the delay until the next run; same knob as init */
+	return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 60*15);
+}
+
+static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
+{
+	struct messaging_context *msg = tevent_req_callback_data(
+		req, struct messaging_context);
+	NTSTATUS status = background_job_recv(req);
+
+	TALLOC_FREE(req);
+	DEBUG(1, ("messaging dgm cleanup job ended with %s\n", nt_errstr(status)));
+	/* Re-arm the job; on failure there is no req to attach a callback to */
+	req = background_job_send(
+		msg, msg->event_ctx, msg, NULL, 0,
+		lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 60*15),
+		mess_parent_dgm_cleanup, msg);
+	if (req == NULL) {
+		DEBUG(1, ("background_job_send failed\n"));
+		return;
+	}
+	tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
+}
+
 /** @} **/
-- 
1.9.1.423.g4596e3a


From 39c3d4a20e186b0cad036a4addc9f4fa68a7a122 Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Fri, 18 Apr 2014 15:06:05 -0700
Subject: [PATCH 22/24] s3: smbd: Call dgram cleanup init background setup.

Signed-off-by: Jeremy Allison <jra at samba.org>
Reviewed-by: Michael Adam <obnox at samba.org>
---
 source3/smbd/server.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/source3/smbd/server.c b/source3/smbd/server.c
index 55a64f6..33021c0 100644
--- a/source3/smbd/server.c
+++ b/source3/smbd/server.c
@@ -1453,6 +1453,10 @@ extern void build_options(bool screen);
 		exit(1);
 	}
 
+	if (!messaging_parent_dgm_cleanup_init(msg_ctx)) {
+		exit(1);
+	}
+
 	if (!smbd_scavenger_init(NULL, msg_ctx, ev_ctx)) {
 		exit(1);
 	}
-- 
1.9.1.423.g4596e3a


From bac4471f991f19b107962d78832a2640fe07ee93 Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Fri, 18 Apr 2014 15:08:19 -0700
Subject: [PATCH 23/24] s3: nmbd: Call dgram cleanup init background setup.

Signed-off-by: Jeremy Allison <jra at samba.org>
Reviewed-by: Michael Adam <obnox at samba.org>
---
 source3/nmbd/nmbd.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/source3/nmbd/nmbd.c b/source3/nmbd/nmbd.c
index 80c2329..dd7f133 100644
--- a/source3/nmbd/nmbd.c
+++ b/source3/nmbd/nmbd.c
@@ -1015,6 +1015,10 @@ static bool open_sockets(bool isdaemon, int port)
 	if (!nmbd_setup_sig_hup_handler(msg))
 		exit(1);
 
+	if (!messaging_parent_dgm_cleanup_init(msg)) {
+		exit(1);
+	}
+
 	/* get broadcast messages */
 
 	if (!serverid_register(messaging_server_id(msg),
-- 
1.9.1.423.g4596e3a


From e34961b77c91dbaf073362dbdd43f352ec21609d Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Fri, 18 Apr 2014 15:09:28 -0700
Subject: [PATCH 24/24] s3: winbindd: Call dgram cleanup init background setup.

Signed-off-by: Jeremy Allison <jra at samba.org>
Reviewed-by: Michael Adam <obnox at samba.org>
---
 source3/winbindd/winbindd.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/source3/winbindd/winbindd.c b/source3/winbindd/winbindd.c
index 8bc1343..7d91ad8 100644
--- a/source3/winbindd/winbindd.c
+++ b/source3/winbindd/winbindd.c
@@ -1568,6 +1568,10 @@ int main(int argc, const char **argv)
 
 	winbindd_register_handlers(winbind_messaging_context(), !Fork);
 
+	if (!messaging_parent_dgm_cleanup_init(winbind_messaging_context())) {
+		exit(1);
+	}
+
 	status = init_system_session_info();
 	if (!NT_STATUS_IS_OK(status)) {
 		DEBUG(1, ("ERROR: failed to setup system user info: %s.\n",
-- 
1.9.1.423.g4596e3a



More information about the samba-technical mailing list