[PATCH] Unix datagram socket messaging

Jeremy Allison jra at samba.org
Fri Apr 18 16:34:14 MDT 2014


On Wed, Apr 16, 2014 at 12:00:04PM +0200, Stefan (metze) Metzmacher wrote:
> 
> Can we pass tevent_context to poll_funcs_init_tevent() instead of
> using: func.private_data = ev; in the caller?

Done.

> There're environments with just winbindd running, so we might need a
> periodic cleanup run not only in smbd.

Refactored so cleanup is run in nmbd and winbindd also.

> I think we should cast with (struct sockaddr *)(void *)&addr, in order
> to avoid strict aliasing warnings on some systems.

Done.
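For reference, here is a minimal sketch of binding an AF_UNIX datagram socket with the (struct sockaddr *)(void *) cast discussed above. The helper `bind_unix_dgram` is hypothetical. It roughly mirrors the bind step of unix_dgram_init() in the patch below, but reports failures through errno.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

/*
 * Hypothetical helper: create an AF_UNIX datagram socket bound to
 * `path`. Returns the socket fd, or -1 with errno set.
 */
static int bind_unix_dgram(const char *path)
{
	struct sockaddr_un addr;
	size_t pathlen = strlen(path) + 1;	/* include the NUL */
	int sock, saved_errno;

	if (pathlen > sizeof(addr.sun_path)) {
		errno = ENAMETOOLONG;
		return -1;
	}

	memset(&addr, 0, sizeof(addr));
	addr.sun_family = AF_UNIX;
	memcpy(addr.sun_path, path, pathlen);

	sock = socket(AF_UNIX, SOCK_DGRAM, 0);
	if (sock == -1) {
		return -1;
	}

	/*
	 * Casting through (void *) instead of directly to
	 * (struct sockaddr *) avoids the strict aliasing warnings
	 * some compilers emit here.
	 */
	if (bind(sock, (struct sockaddr *)(void *)&addr, sizeof(addr)) == -1) {
		saved_errno = errno;	/* close() may clobber errno */
		close(sock);
		errno = saved_errno;
		return -1;
	}
	return sock;
}
```

As in tests.c below, binding a second socket to a path that is already bound fails with EADDRINUSE.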

> After socket() we should set FD_CLOEXEC and O_NONBLOCK similar to
> tsocket_bsd_common_prepare_fd() or set_blocking() and
> smb_set_close_on_exec().
> MSG_DONTWAIT might not be available everywhere.

Done.
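The portable way to get the behaviour requested above is to set O_NONBLOCK and FD_CLOEXEC with fcntl(2) right after socket(). A plain send() then returns EAGAIN instead of blocking, and no MSG_DONTWAIT is needed. The sketch below shows this. `set_nonblock_cloexec` is a hypothetical name. It returns 0 or an errno value, the same convention as prepare_socket() in the patch below.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Hypothetical helper: make `fd` non-blocking and close-on-exec.
 * Returns 0 on success or an errno value.
 */
static int set_nonblock_cloexec(int fd)
{
	int flags;

	/* O_NONBLOCK is a file status flag: F_GETFL / F_SETFL. */
	flags = fcntl(fd, F_GETFL);
	if (flags == -1) {
		return errno;
	}
	if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
		return errno;
	}

	/* FD_CLOEXEC is a file descriptor flag: F_GETFD / F_SETFD. */
	flags = fcntl(fd, F_GETFD);
	if (flags == -1) {
		return errno;
	}
	if (fcntl(fd, F_SETFD, flags | FD_CLOEXEC) == -1) {
		return errno;
	}
	return 0;
}
```

Linux also accepts SOCK_NONBLOCK | SOCK_CLOEXEC in the socket() type argument, but those flags are not portable. That is why the fcntl() route is used here.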

> may need some more magic and also check for EAGAIN,
> see tsocket_bsd_error_from_errno() or commit
> f5a3d74264abb25009e73b1b35d62db34fa62343.
> ... ok, I just found that it's fixed in
> https://git.samba.org/?p=vl/samba.git/.git;a=commitdiff;h=325e62cbb116abdec7cf1466862bd12ecd42224e
> I think we should squash that and also this one
> https://git.samba.org/?p=vl/samba.git/.git;a=commitdiff;h=d44c82a1edddb0d838702e15e00538faae9386bc

Done.
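Checking for EAGAIN alone is not sufficient on every platform. POSIX allows EWOULDBLOCK to have a different value, which is why the patch tests both under #ifdef EWOULDBLOCK. The sketch below shows that classification, plus a demonstration that a non-blocking datagram send ends in EAGAIN once the queue is full. `send_would_block` and `fill_until_full` are hypothetical helpers, and the demonstration assumes a Linux-like kernel. Note that unix_dgram_send() in the patch additionally treats EINTR as a reason to queue the message rather than fail.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdbool.h>
#include <sys/socket.h>
#include <unistd.h>

/*
 * Hypothetical helper: does this errno from a non-blocking send mean
 * "receiver full, queue the message and retry later"?
 */
static bool send_would_block(int err)
{
	if (err == EAGAIN) {
		return true;
	}
#if defined(EWOULDBLOCK) && (EWOULDBLOCK != EAGAIN)
	/* Only a separate test where the two values differ. */
	if (err == EWOULDBLOCK) {
		return true;
	}
#endif
	return false;
}

/*
 * Put `fd` into non-blocking mode and send 1-byte datagrams until the
 * kernel reports that no more can be queued. Returns the number of
 * datagrams sent, or -1 on any other error.
 */
static long fill_until_full(int fd)
{
	char byte = 'x';
	long n = 0;
	int flags = fcntl(fd, F_GETFL);

	if (flags == -1 || fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {
		return -1;
	}
	for (;;) {
		if (send(fd, &byte, 1, 0) == 1) {
			n++;
		} else if (errno == EINTR) {
			continue;	/* interrupted, just retry */
		} else {
			return send_would_block(errno) ? n : -1;
		}
	}
}
```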

> + unlink(ctx->path);
> 
> We may need to check that ctx->path is not an empty string?

I altered it so ctx->path can never be invalid:
unix_dgram_init() now returns EINVAL when path == NULL.
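The sketch below shows a guard of this kind around the unlink() call. `check_socket_path` and `remove_socket_path` are hypothetical names. The NULL and length checks mirror unix_dgram_init() in the patch. Rejecting the empty string is an extra assumption added to cover the case raised in the question. ENOENT from unlink() is not treated as an error, because the socket file may already be gone.

```c
#include <assert.h>
#include <errno.h>
#include <string.h>
#include <sys/un.h>
#include <unistd.h>

/*
 * Hypothetical helper: EINVAL for NULL or "", ENAMETOOLONG if the
 * path (with its NUL) does not fit into sun_path, else 0.
 */
static int check_socket_path(const char *path)
{
	struct sockaddr_un addr;

	if (path == NULL || path[0] == '\0') {
		return EINVAL;
	}
	if (strlen(path) + 1 > sizeof(addr.sun_path)) {
		return ENAMETOOLONG;
	}
	return 0;
}

/* Remove a socket file, but only after the path has been validated. */
static int remove_socket_path(const char *path)
{
	int ret = check_socket_path(path);

	if (ret != 0) {
		return ret;
	}
	if (unlink(path) == -1 && errno != ENOENT) {
		return errno;
	}
	return 0;
}
```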

Here is the latest full patchset updated with all
these changes. Compiles and I'm now running
through make test to ensure it's all OK.

I'll follow up to get this in once it passes
make test on Monday! Just wanted you and
Volker to see where I'd gotten with it.

Cheers,

	Jeremy.
-------------- next part --------------
From a2388474c1dce3f4edc571f48fb1f0653f5cb828 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 24 Feb 2014 11:43:51 +0000
Subject: [PATCH 01/24] lib: Add poll_funcs

This is an abstraction of the event loop, with a tevent-based implementation.
It will be used in low-level messaging, with the goal of making our low-level
messaging routines usable by other projects that are not based on tevent.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/lib/poll_funcs/poll_funcs.h        | 131 ++++++++++++++++++++++++++
 source3/lib/poll_funcs/poll_funcs_tevent.c | 143 +++++++++++++++++++++++++++++
 source3/lib/poll_funcs/poll_funcs_tevent.h |  26 ++++++
 source3/lib/poll_funcs/wscript_build       |   5 +
 source3/wscript_build                      |   1 +
 5 files changed, 306 insertions(+)
 create mode 100644 source3/lib/poll_funcs/poll_funcs.h
 create mode 100644 source3/lib/poll_funcs/poll_funcs_tevent.c
 create mode 100644 source3/lib/poll_funcs/poll_funcs_tevent.h
 create mode 100644 source3/lib/poll_funcs/wscript_build

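The commit message above describes poll_funcs as a way to keep the messaging code independent of tevent. As an illustration only, here is a heavily reduced sketch of the same pattern with a poll(2) backend instead of tevent. It uses a function-pointer table (`mini_funcs`) with just a watch_new slot, and a loop function owned by the backend, much as the tests below call tevent_loop_once() themselves. All names here are hypothetical. The real header below has more operations (watch_update, watch_free, timeouts) and leaves struct poll_watch opaque.

```c
#include <assert.h>
#include <poll.h>
#include <stddef.h>
#include <sys/socket.h>
#include <unistd.h>

#define MAX_WATCHES 16

struct mini_watch;
typedef void (*mini_watch_cb)(struct mini_watch *w, int fd, short revents,
			      void *private_data);

/* Hypothetical, reduced counterpart of struct poll_watch. */
struct mini_watch {
	int fd;
	short events;
	mini_watch_cb callback;
	void *private_data;
};

/* Callers only use this table, never the loop behind it. */
struct mini_funcs {
	struct mini_watch *(*watch_new)(const struct mini_funcs *funcs, int fd,
					short events, mini_watch_cb callback,
					void *private_data);
	void *private_data;	/* backend state */
};

/* State of the poll(2)-based backend: a fixed array of watches. */
struct pollbe {
	struct mini_watch watches[MAX_WATCHES];
	size_t num_watches;
};

static struct mini_watch *pollbe_watch_new(const struct mini_funcs *funcs,
					   int fd, short events,
					   mini_watch_cb callback,
					   void *private_data)
{
	struct pollbe *b = funcs->private_data;
	struct mini_watch *w;

	if (b->num_watches == MAX_WATCHES) {
		return NULL;
	}
	w = &b->watches[b->num_watches++];
	w->fd = fd;
	w->events = events;
	w->callback = callback;
	w->private_data = private_data;
	return w;
}

static void mini_funcs_init_poll(struct mini_funcs *f, struct pollbe *b)
{
	b->num_watches = 0;
	f->watch_new = pollbe_watch_new;
	f->private_data = b;
}

/* One loop iteration: poll all watched fds, dispatch the ready ones. */
static int pollbe_loop_once(struct pollbe *b, int timeout_ms)
{
	struct pollfd pfds[MAX_WATCHES];
	size_t i;
	int ret;

	for (i = 0; i < b->num_watches; i++) {
		pfds[i].fd = b->watches[i].fd;
		pfds[i].events = b->watches[i].events;
		pfds[i].revents = 0;
	}
	ret = poll(pfds, (nfds_t)b->num_watches, timeout_ms);
	for (i = 0; ret > 0 && i < b->num_watches; i++) {
		struct mini_watch *w = &b->watches[i];

		if (pfds[i].revents != 0) {
			w->callback(w, w->fd, pfds[i].revents, w->private_data);
		}
	}
	return ret;
}

/* Example consumer: counts datagrams read from its fd. */
static void count_cb(struct mini_watch *w, int fd, short revents,
		     void *private_data)
{
	unsigned *count = private_data;
	char buf[64];

	(void)w;
	if ((revents & POLLIN) && recv(fd, buf, sizeof(buf), 0) >= 0) {
		(*count)++;
	}
}
```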
diff --git a/source3/lib/poll_funcs/poll_funcs.h b/source3/lib/poll_funcs/poll_funcs.h
new file mode 100644
index 0000000..b23e7d9
--- /dev/null
+++ b/source3/lib/poll_funcs/poll_funcs.h
@@ -0,0 +1,131 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Copyright (C) Volker Lendecke 2013
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/**
+ * @file poll_funcs.h
+ *
+ * @brief event loop abstraction
+ */
+
+/*
+ * This is inspired by AvahiWatch, the avahi event loop abstraction.
+ */
+
+#ifndef __POLL_FUNCS_H__
+#define __POLL_FUNCS_H__
+
+#include "replace.h"
+
+/**
+ * poll_watch and poll_timeout are opaque here; every implementation defines
+ * its own structures.
+ */
+
+struct poll_watch;
+struct poll_timeout;
+
+struct poll_funcs {
+
+	/**
+	 * @brief Create a new file descriptor watch
+	 *
+	 * @param[in] funcs The callback array
+	 * @param[in] fd The fd to watch
+	 * @param[in] events POLLIN and POLLOUT or'ed together
+	 * @param[in] callback Function to call by the implementation
+	 * @param[in] private_data Pointer to give back to callback
+	 *
+	 * @return A new poll_watch struct
+	 */
+
+	struct poll_watch *(*watch_new)(
+		const struct poll_funcs *funcs, int fd, short events,
+		void (*callback)(struct poll_watch *w, int fd,
+				 short events, void *private_data),
+		void *private_data);
+
+	/**
+	 * @brief Change the watched events for a struct poll_watch
+	 *
+	 * @param[in] w The poll_watch to change
+	 * @param[in] events new POLLIN and POLLOUT or'ed together
+	 */
+
+	void (*watch_update)(struct poll_watch *w, short events);
+
+	/**
+	 * @brief Read events currently watched
+	 *
+	 * @param[in] w The poll_watch to inspect
+	 *
+	 * @returns The events currently watched
+	 */
+
+	short (*watch_get_events)(struct poll_watch *w);
+
+	/**
+	 * @brief Free a struct poll_watch
+	 *
+	 * @param[in] w The poll_watch struct to free
+	 */
+
+	void (*watch_free)(struct poll_watch *w);
+
+
+	/**
+	 * @brief Create a new timeout watch
+	 *
+	 * @param[in] funcs The callback array
+	 * @param[in] tv The time when the timeout should trigger
+	 * @param[in] callback Function to call at time "tv"
+	 * @param[in] private_data Pointer to give back to callback
+	 *
+	 * @return A new poll_timeout struct
+	 */
+
+	struct poll_timeout *(*timeout_new)(
+		const struct poll_funcs *funcs, const struct timeval *tv,
+		void (*callback)(struct poll_timeout *t, void *private_data),
+		void *private_data);
+
+	/**
+	 * @brief Change the timeout of a watch
+	 *
+	 * @param[in] t The timeout watch to change
+	 * @param[in] ts The new trigger time
+	 */
+
+	void (*timeout_update)(struct poll_timeout *t,
+			       const struct timespec *ts);
+
+	/**
+	 * @brief Free a poll_timeout
+	 *
+	 * @param[in] t The poll_timeout to free
+	 */
+
+	void (*timeout_free)(struct poll_timeout *t);
+
+	/**
+	 * @brief private data for use by the implementation
+	 */
+
+	void *private_data;
+};
+
+#endif
diff --git a/source3/lib/poll_funcs/poll_funcs_tevent.c b/source3/lib/poll_funcs/poll_funcs_tevent.c
new file mode 100644
index 0000000..9627bb7
--- /dev/null
+++ b/source3/lib/poll_funcs/poll_funcs_tevent.c
@@ -0,0 +1,143 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Copyright (C) Volker Lendecke 2013
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "poll_funcs_tevent.h"
+#include "tevent.h"
+#include "system/select.h"
+
+struct poll_watch {
+	struct tevent_fd *fde;
+	int fd;
+	void (*callback)(struct poll_watch *w, int fd, short events,
+			 void *private_data);
+	void *private_data;
+};
+
+static uint16_t poll_events_to_tevent(short events)
+{
+	uint16_t ret = 0;
+
+	if (events & POLLIN) {
+		ret |= TEVENT_FD_READ;
+	}
+	if (events & POLLOUT) {
+		ret |= TEVENT_FD_WRITE;
+	}
+	return ret;
+}
+
+static short tevent_to_poll_events(uint16_t flags)
+{
+	short ret = 0;
+
+	if (flags & TEVENT_FD_READ) {
+		ret |= POLLIN;
+	}
+	if (flags & TEVENT_FD_WRITE) {
+		ret |= POLLOUT;
+	}
+	return ret;
+}
+
+static void tevent_watch_handler(struct tevent_context *ev,
+				 struct tevent_fd *fde, uint16_t flags,
+				 void *private_data);
+
+static struct poll_watch *tevent_watch_new(
+	const struct poll_funcs *funcs, int fd, short events,
+	void (*callback)(struct poll_watch *w, int fd, short events,
+			 void *private_data),
+	void *private_data)
+{
+	struct tevent_context *ev = talloc_get_type_abort(
+		funcs->private_data, struct tevent_context);
+	struct poll_watch *w;
+
+	w = talloc(ev, struct poll_watch);
+	if (w == NULL) {
+		return NULL;
+	}
+	w->fde = tevent_add_fd(ev, w, fd, poll_events_to_tevent(events),
+			       tevent_watch_handler, w);
+	if (w->fde == NULL) {
+		TALLOC_FREE(w);
+		return NULL;
+	}
+	w->fd = fd;
+	w->callback = callback;
+	w->private_data = private_data;
+	return w;
+}
+
+static void tevent_watch_handler(struct tevent_context *ev,
+				 struct tevent_fd *fde, uint16_t flags,
+				 void *private_data)
+{
+	struct poll_watch *w = talloc_get_type_abort(
+		private_data, struct poll_watch);
+
+	w->callback(w, w->fd, tevent_to_poll_events(flags),
+		    w->private_data);
+}
+
+static void tevent_watch_update(struct poll_watch *w, short events)
+{
+	tevent_fd_set_flags(w->fde, poll_events_to_tevent(events));
+}
+
+static short tevent_watch_get_events(struct poll_watch *w)
+{
+	return tevent_to_poll_events(tevent_fd_get_flags(w->fde));
+}
+
+static void tevent_watch_free(struct poll_watch *w)
+{
+	TALLOC_FREE(w);
+}
+
+static struct poll_timeout *tevent_timeout_new(
+	const struct poll_funcs *funcs, const struct timeval *tv,
+	void (*callback)(struct poll_timeout *t, void *private_data),
+	void *private_data)
+{
+	/* not implemented yet */
+	return NULL;
+}
+
+static void tevent_timeout_update(struct poll_timeout *t,
+				  const struct timespec *ts)
+{
+	return;
+}
+
+static void tevent_timeout_free(struct poll_timeout *t)
+{
+	return;
+}
+
+void poll_funcs_init_tevent(struct poll_funcs *f, void *private_data)
+{
+	f->watch_new = tevent_watch_new;
+	f->watch_update = tevent_watch_update;
+	f->watch_get_events = tevent_watch_get_events;
+	f->watch_free = tevent_watch_free;
+	f->timeout_new = tevent_timeout_new;
+	f->timeout_update = tevent_timeout_update;
+	f->timeout_free = tevent_timeout_free;
+	f->private_data = private_data;
+}
diff --git a/source3/lib/poll_funcs/poll_funcs_tevent.h b/source3/lib/poll_funcs/poll_funcs_tevent.h
new file mode 100644
index 0000000..5f1f7fd
--- /dev/null
+++ b/source3/lib/poll_funcs/poll_funcs_tevent.h
@@ -0,0 +1,26 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Copyright (C) Volker Lendecke 2013
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __POLL_FUNCS_TEVENT_H__
+#define __POLL_FUNCS_TEVENT_H__
+
+#include "poll_funcs.h"
+
+void poll_funcs_init_tevent(struct poll_funcs *f, void *private_data);
+
+#endif
diff --git a/source3/lib/poll_funcs/wscript_build b/source3/lib/poll_funcs/wscript_build
new file mode 100644
index 0000000..ab24814
--- /dev/null
+++ b/source3/lib/poll_funcs/wscript_build
@@ -0,0 +1,5 @@
+#!/usr/bin/env python
+
+bld.SAMBA3_SUBSYSTEM('POLL_FUNCS_TEVENT',
+                     source='poll_funcs_tevent.c',
+		     deps='tevent')
diff --git a/source3/wscript_build b/source3/wscript_build
index b872cbc..fd53e2f 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -1452,6 +1452,7 @@ bld.RECURSE('auth')
 bld.RECURSE('libgpo/gpext')
 bld.RECURSE('lib/pthreadpool')
 bld.RECURSE('lib/asys')
+bld.RECURSE('lib/poll_funcs')
 bld.RECURSE('librpc')
 bld.RECURSE('librpc/idl')
 bld.RECURSE('libsmb')
-- 
1.9.1.423.g4596e3a


From 51a623f0b87b1bd038a769a2b12e3419c3f78c13 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 24 Feb 2014 11:48:16 +0000
Subject: [PATCH 02/24] lib: Add unix_msg

This is a messaging layer based on unix domain datagram sockets.

Sending to an idle socket is just one single nonblocking sendmsg call. If the
recv queue is full, we start a background thread to do a blocking call. The
source4-based imessaging uses a polling fallback. In a situation where
thousands of senders hammer a single blocked socket, this generates load on
the system due to the constant polling. This does not happen with a threaded
blocking send call.

The threaded approach has another advantage: We save become_root() calls on the
retries. The access checks are done when the blocking socket is connected; the
threaded blocking send call does not check permissions anymore.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/lib/unix_msg/test_drain.c  |  70 ++++
 source3/lib/unix_msg/test_source.c |  79 ++++
 source3/lib/unix_msg/tests.c       | 225 ++++++++++
 source3/lib/unix_msg/unix_msg.c    | 826 +++++++++++++++++++++++++++++++++++++
 source3/lib/unix_msg/unix_msg.h    | 107 +++++
 source3/lib/unix_msg/wscript_build |  18 +
 source3/wscript_build              |   1 +
 7 files changed, 1326 insertions(+)
 create mode 100644 source3/lib/unix_msg/test_drain.c
 create mode 100644 source3/lib/unix_msg/test_source.c
 create mode 100644 source3/lib/unix_msg/tests.c
 create mode 100644 source3/lib/unix_msg/unix_msg.c
 create mode 100644 source3/lib/unix_msg/unix_msg.h
 create mode 100644 source3/lib/unix_msg/wscript_build

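The send strategy described in the commit message above uses one cheap non-blocking send, and falls back to a blocking send in a background thread only when the receiver is full. It can be sketched in a few dozen lines. This is a minimal illustration under stated assumptions, not the patch's code. It uses a bare pthread_create() per deferred message instead of the pthreadpool and per-destination send queue in unix_msg.c. The names `send_or_defer`, `struct deferred_send` and `DEFER_MAX` are hypothetical. It also assumes Linux-like behaviour, where a full receive queue is reported as EAGAIN/EWOULDBLOCK. As in unix_dgram_send_queue_init(), the blocking send goes through a separate socket connected to the destination, because the fast-path socket itself is non-blocking.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/un.h>
#include <unistd.h>

#define DEFER_MAX 256	/* hypothetical size limit for deferred messages */

/* Hypothetical: one datagram handed to a helper thread. */
struct deferred_send {
	int sock;	/* blocking socket connected to the destination */
	ssize_t sent;	/* result of the blocking send() */
	int sys_errno;
	size_t len;
	char buf[DEFER_MAX];
};

/* Thread body: a plain blocking send, retried on EINTR. */
static void *deferred_send_job(void *arg)
{
	struct deferred_send *d = arg;

	do {
		d->sent = send(d->sock, d->buf, d->len, 0);
	} while (d->sent == -1 && errno == EINTR);
	d->sys_errno = (d->sent == -1) ? errno : 0;
	return NULL;
}

/*
 * Try one sendto() on `nb_sock`, which must have O_NONBLOCK set. If the
 * receiver is full, copy the message and let a thread block on a freshly
 * connected socket instead. Returns 0 (*pd is NULL if sent directly, or
 * the pending job if deferred) or an errno value.
 */
static int send_or_defer(int nb_sock, const struct sockaddr_un *dst,
			 const void *buf, size_t len,
			 pthread_t *thread, struct deferred_send **pd)
{
	const struct sockaddr *sa = (const struct sockaddr *)(const void *)dst;
	struct deferred_send *d;
	int ret;

	*pd = NULL;
	if (sendto(nb_sock, buf, len, 0, sa, sizeof(*dst)) >= 0) {
		return 0;			/* cheap fast path */
	}
	if (errno != EAGAIN && errno != EWOULDBLOCK) {
		return errno;			/* hard error */
	}
	if (len > DEFER_MAX) {
		return EMSGSIZE;
	}
	d = malloc(sizeof(*d));
	if (d == NULL) {
		return ENOMEM;
	}
	d->sock = socket(AF_UNIX, SOCK_DGRAM, 0);	/* blocking by default */
	if (d->sock == -1 || connect(d->sock, sa, sizeof(*dst)) == -1) {
		ret = errno;
		if (d->sock != -1) {
			close(d->sock);
		}
		free(d);
		return ret;
	}
	memcpy(d->buf, buf, len);
	d->len = len;
	ret = pthread_create(thread, NULL, deferred_send_job, d);
	if (ret != 0) {
		close(d->sock);
		free(d);
		return ret;
	}
	*pd = d;
	return 0;
}
```

A real caller would learn about the thread's completion from its event loop (the patch uses the pthreadpool signal fd for that). It would also keep later messages to the same destination queued behind the pending one, as unix_dgram_send() does to preserve ordering.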
diff --git a/source3/lib/unix_msg/test_drain.c b/source3/lib/unix_msg/test_drain.c
new file mode 100644
index 0000000..6fe8c18
--- /dev/null
+++ b/source3/lib/unix_msg/test_drain.c
@@ -0,0 +1,70 @@
+#include "replace.h"
+#include "unix_msg.h"
+#include "poll_funcs/poll_funcs_tevent.h"
+#include "tevent.h"
+#include "system/select.h"
+
+struct cb_state {
+	unsigned num_received;
+	uint8_t *buf;
+	size_t buflen;
+};
+
+static void recv_cb(struct unix_msg_ctx *ctx,
+		    uint8_t *msg, size_t msg_len,
+		    void *private_data);
+
+int main(int argc, const char *argv[])
+{
+	struct poll_funcs funcs;
+	const char *sock;
+	struct unix_msg_ctx *ctx;
+	struct tevent_context *ev;
+	int ret;
+
+	struct cb_state state;
+
+	if (argc != 2) {
+		fprintf(stderr, "Usage: %s <sockname>\n", argv[0]);
+		return 1;
+	}
+
+	sock = argv[1];
+	unlink(sock);
+
+	ev = tevent_context_init(NULL);
+	if (ev == NULL) {
+		perror("tevent_context_init failed");
+		return 1;
+	}
+	poll_funcs_init_tevent(&funcs, ev);
+
+	ret = unix_msg_init(sock, &funcs, 256, 1,
+			    recv_cb, &state, &ctx);
+	if (ret != 0) {
+		fprintf(stderr, "unix_msg_init failed: %s\n",
+			strerror(ret));
+		return 1;
+	}
+
+	while (1) {
+		ret = tevent_loop_once(ev);
+		if (ret == -1) {
+			fprintf(stderr, "tevent_loop_once failed: %s\n",
+				strerror(errno));
+			exit(1);
+		}
+	}
+	return 0;
+}
+
+static void recv_cb(struct unix_msg_ctx *ctx,
+		    uint8_t *msg, size_t msg_len,
+		    void *private_data)
+{
+	unsigned num;
+	if (msg_len == sizeof(num)) {
+		memcpy(&num, msg, msg_len);
+		printf("%u\n", num);
+	}
+}
diff --git a/source3/lib/unix_msg/test_source.c b/source3/lib/unix_msg/test_source.c
new file mode 100644
index 0000000..bfafee1
--- /dev/null
+++ b/source3/lib/unix_msg/test_source.c
@@ -0,0 +1,79 @@
+#include "replace.h"
+#include "unix_msg.h"
+#include "poll_funcs/poll_funcs_tevent.h"
+#include "tevent.h"
+
+int main(int argc, const char *argv[])
+{
+	struct poll_funcs funcs;
+	struct unix_msg_ctx **ctxs;
+	struct tevent_context *ev;
+	struct iovec iov;
+	int ret;
+	unsigned i;
+	unsigned num_ctxs = 1;
+
+	if (argc < 2) {
+		fprintf(stderr, "Usage: %s <sockname> [num_contexts]\n", argv[0]);
+		return 1;
+	}
+	if (argc > 2) {
+		num_ctxs = atoi(argv[2]);
+	}
+
+	ev = tevent_context_init(NULL);
+	if (ev == NULL) {
+		perror("tevent_context_init failed");
+		return 1;
+	}
+	poll_funcs_init_tevent(&funcs, ev);
+
+	ctxs = talloc_array(ev, struct unix_msg_ctx *, num_ctxs);
+	if (ctxs == NULL) {
+		fprintf(stderr, "talloc failed\n");
+		return 1;
+	}
+
+	for (i=0; i<num_ctxs; i++) {
+		ret = unix_msg_init(NULL, &funcs, 256, 1, NULL, NULL,
+				    &ctxs[i]);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_init failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+	}
+
+	iov.iov_base = &i;
+	iov.iov_len = sizeof(i);
+
+	for (i=0; i<num_ctxs; i++) {
+		unsigned j;
+
+		for (j=0; j<100000; j++) {
+			ret = unix_msg_send(ctxs[i], argv[1], &iov, 1);
+			if (ret != 0) {
+				fprintf(stderr, "unix_msg_send failed: %s\n",
+					strerror(ret));
+				return 1;
+			}
+		}
+	}
+
+	while (true) {
+		ret = tevent_loop_once(ev);
+		if (ret == -1) {
+			fprintf(stderr, "tevent_loop_once failed: %s\n",
+				strerror(errno));
+			exit(1);
+		}
+	}
+
+	for (i=0; i<num_ctxs; i++) {
+		unix_msg_free(ctxs[i]);
+	}
+
+	talloc_free(ev);
+
+	return 0;
+}
diff --git a/source3/lib/unix_msg/tests.c b/source3/lib/unix_msg/tests.c
new file mode 100644
index 0000000..2a4cf86
--- /dev/null
+++ b/source3/lib/unix_msg/tests.c
@@ -0,0 +1,225 @@
+#include "replace.h"
+#include "unix_msg.h"
+#include "poll_funcs/poll_funcs_tevent.h"
+#include "tevent.h"
+
+struct cb_state {
+	unsigned num_received;
+	uint8_t *buf;
+	size_t buflen;
+};
+
+static void recv_cb(struct unix_msg_ctx *ctx,
+		    uint8_t *msg, size_t msg_len,
+		    void *private_data);
+
+static void expect_messages(struct tevent_context *ev, struct cb_state *state,
+			    unsigned num_msgs)
+{
+	state->num_received = 0;
+
+	while (state->num_received < num_msgs) {
+		int ret;
+
+		ret = tevent_loop_once(ev);
+		if (ret == -1) {
+			fprintf(stderr, "tevent_loop_once failed: %s\n",
+				strerror(errno));
+			exit(1);
+		}
+	}
+}
+
+int main(void)
+{
+	struct poll_funcs funcs;
+	const char *sock1 = "sock1";
+	const char *sock2 = "sock2";
+	struct unix_msg_ctx *ctx1, *ctx2;
+	struct tevent_context *ev;
+	struct iovec iov;
+	uint8_t msg;
+	int i, ret;
+	static uint8_t buf[1755];
+
+	struct cb_state state;
+
+	unlink(sock1);
+	unlink(sock2);
+
+	ev = tevent_context_init(NULL);
+	if (ev == NULL) {
+		perror("tevent_context_init failed");
+		return 1;
+	}
+	poll_funcs_init_tevent(&funcs, ev);
+
+	ret = unix_msg_init(sock1, &funcs, 256, 1,
+			    recv_cb, &state, &ctx1);
+	if (ret != 0) {
+		fprintf(stderr, "unix_msg_init failed: %s\n",
+			strerror(ret));
+		return 1;
+	}
+
+	ret = unix_msg_init(sock1, &funcs, 256, 1,
+			    recv_cb, &state, &ctx1);
+	if (ret == 0) {
+		fprintf(stderr, "unix_msg_init succeeded unexpectedly\n");
+		return 1;
+	}
+	if (ret != EADDRINUSE) {
+		fprintf(stderr, "unix_msg_init returned %s, expected "
+			"EADDRINUSE\n", strerror(ret));
+		return 1;
+	}
+
+	ret = unix_msg_init(sock2, &funcs, 256, 1,
+			    recv_cb, &state, &ctx2);
+	if (ret != 0) {
+		fprintf(stderr, "unix_msg_init failed: %s\n",
+			strerror(ret));
+		return 1;
+	}
+
+	printf("sending a 0-length message\n");
+
+	state.buf = NULL;
+	state.buflen = 0;
+
+	ret = unix_msg_send(ctx1, sock2, NULL, 0);
+	if (ret != 0) {
+		fprintf(stderr, "unix_msg_send failed: %s\n",
+			strerror(ret));
+		return 1;
+	}
+
+	expect_messages(ev, &state, 1);
+
+	printf("sending a small message\n");
+
+	msg = random();
+	iov.iov_base = &msg;
+	iov.iov_len = sizeof(msg);
+	state.buf = &msg;
+	state.buflen = sizeof(msg);
+
+	ret = unix_msg_send(ctx1, sock2, &iov, 1);
+	if (ret != 0) {
+		fprintf(stderr, "unix_msg_send failed: %s\n",
+			strerror(ret));
+		return 1;
+	}
+
+	expect_messages(ev, &state, 1);
+
+	printf("sending six large, interleaved messages\n");
+
+	for (i=0; i<sizeof(buf); i++) {
+		buf[i] = random();
+	}
+
+	iov.iov_base = buf;
+	iov.iov_len = sizeof(buf);
+	state.buf = buf;
+	state.buflen = sizeof(buf);
+
+	for (i=0; i<3; i++) {
+		ret = unix_msg_send(ctx1, sock2, &iov, 1);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_send failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+		ret = unix_msg_send(ctx2, sock2, &iov, 1);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_send failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+	}
+
+	expect_messages(ev, &state, 6);
+
+	printf("sending a few messages in small pieces\n");
+
+	for (i = 0; i<5; i++) {
+		struct iovec iovs[20];
+		const size_t num_iovs = ARRAY_SIZE(iovs);
+		uint8_t *p = buf;
+		size_t j;
+
+		for (j=0; j<num_iovs-1; j++) {
+			size_t chunk = (random() % ((sizeof(buf) * 2) / num_iovs));
+			size_t space = (sizeof(buf) - (p - buf));
+
+			if (space == 0) {
+				break;
+			}
+
+			chunk = MIN(chunk, space);
+
+			iovs[j].iov_base = p;
+			iovs[j].iov_len = chunk;
+			p += chunk;
+		}
+
+		if (p < (buf + sizeof(buf))) {
+			iovs[j].iov_base = p;
+			iovs[j].iov_len = (sizeof(buf) - (p - buf));
+			j++;
+		}
+
+		ret = unix_msg_send(ctx1, sock1, iovs, j);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_send failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+	}
+
+	expect_messages(ev, &state, 5);
+
+	printf("Filling send queues before freeing\n");
+
+	for (i=0; i<5; i++) {
+		ret = unix_msg_send(ctx1, sock2, &iov, 1);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_send failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+		ret = unix_msg_send(ctx1, sock1, &iov, 1);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_send failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+	}
+
+	expect_messages(ev, &state, 1); /* Read just one msg */
+
+	unix_msg_free(ctx1);
+	unix_msg_free(ctx2);
+	talloc_free(ev);
+
+	return 0;
+}
+
+static void recv_cb(struct unix_msg_ctx *ctx,
+		    uint8_t *msg, size_t msg_len,
+		    void *private_data)
+{
+	struct cb_state *state = (struct cb_state *)private_data;
+
+	if (msg_len != state->buflen) {
+		fprintf(stderr, "expected %u bytes, got %u\n",
+			(unsigned)state->buflen, (unsigned)msg_len);
+		exit(1);
+	}
+	if ((msg_len != 0) && (memcmp(msg, state->buf, msg_len) != 0)) {
+		fprintf(stderr, "message content differs\n");
+		exit(1);
+	}
+	state->num_received += 1;
+}
diff --git a/source3/lib/unix_msg/unix_msg.c b/source3/lib/unix_msg/unix_msg.c
new file mode 100644
index 0000000..b6f4cdf
--- /dev/null
+++ b/source3/lib/unix_msg/unix_msg.c
@@ -0,0 +1,826 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Copyright (C) Volker Lendecke 2013
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "unix_msg.h"
+#include "system/select.h"
+#include "system/time.h"
+#include "dlinklist.h"
+#include "pthreadpool/pthreadpool.h"
+
+/*
+ * This file implements two abstractions: The "unix_dgram" functions implement
+ * queueing for unix domain datagram sockets. You can send to a destination
+ * socket, and if that has no free space available, the message is queued and
+ * sent with a blocking send() from a helper thread on a separately connected
+ * socket. "unix_dgram" expects the data size not to exceed the system limit.
+ *
+ * The "unix_msg" functions implement the fragmentation of large messages on
+ * top of "unix_dgram". This is what is exposed to the user of this API.
+ */
+
+struct unix_dgram_msg {
+	struct unix_dgram_msg *prev, *next;
+
+	int sock;
+	ssize_t sent;
+	int sys_errno;
+	size_t buflen;
+	uint8_t buf[1];
+};
+
+struct unix_dgram_send_queue {
+	struct unix_dgram_send_queue *prev, *next;
+	struct unix_dgram_ctx *ctx;
+	int sock;
+	struct unix_dgram_msg *msgs;
+	char path[1];
+};
+
+struct unix_dgram_ctx {
+	int sock;
+	pid_t created_pid;
+	const struct poll_funcs *ev_funcs;
+	size_t max_msg;
+
+	void (*recv_callback)(struct unix_dgram_ctx *ctx,
+			      uint8_t *msg, size_t msg_len,
+			      void *private_data);
+	void *private_data;
+
+	struct poll_watch *sock_read_watch;
+	struct unix_dgram_send_queue *send_queues;
+
+	struct pthreadpool *send_pool;
+	struct poll_watch *pool_read_watch;
+
+	uint8_t *recv_buf;
+	char path[1];
+};
+
+static ssize_t iov_buflen(const struct iovec *iov, int iovlen);
+static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
+				    void *private_data);
+
+/* Set socket non blocking and close on exec. */
+static int prepare_socket(int fd)
+{
+	int flags;
+#ifdef O_NONBLOCK
+#define FLAG_TO_SET O_NONBLOCK
+#else
+#ifdef SYSV
+#define FLAG_TO_SET O_NDELAY
+#else /* BSD */
+#define FLAG_TO_SET FNDELAY
+#endif
+#endif
+
+	flags = fcntl(fd, F_GETFL);
+	if (flags == -1) {
+		return errno;
+	}
+	flags |= FLAG_TO_SET;
+	if (fcntl(fd, F_SETFL, flags) == -1) {
+		return errno;
+	}
+
+#undef FLAG_TO_SET
+
+#ifdef FD_CLOEXEC
+	flags = fcntl(fd, F_GETFD, 0);
+	if (flags == -1) {
+		return errno;
+	}
+	flags |= FD_CLOEXEC;
+	if (fcntl(fd, F_SETFD, flags) == -1) {
+		return errno;
+	}
+#endif
+	return 0;
+}
+
+static int unix_dgram_init(const char *path, size_t max_msg,
+			   const struct poll_funcs *ev_funcs,
+			   void (*recv_callback)(struct unix_dgram_ctx *ctx,
+						 uint8_t *msg, size_t msg_len,
+						 void *private_data),
+			   void *private_data,
+			   struct unix_dgram_ctx **result)
+{
+	struct unix_dgram_ctx *ctx;
+	struct sockaddr_un addr = { 0, };
+	size_t pathlen;
+	int ret;
+
+	if (path == NULL) {
+		return EINVAL;
+	}
+	pathlen = strlen(path)+1;
+	if (pathlen > sizeof(addr.sun_path)) {
+		return ENAMETOOLONG;
+	}
+
+	ctx = malloc(offsetof(struct unix_dgram_ctx, path) + pathlen);
+	if (ctx == NULL) {
+		return ENOMEM;
+	}
+	memcpy(ctx->path, path, pathlen);
+
+	ctx->recv_buf = malloc(max_msg);
+	if (ctx->recv_buf == NULL) {
+		free(ctx);
+		return ENOMEM;
+	}
+	ctx->max_msg = max_msg;
+	ctx->ev_funcs = ev_funcs;
+	ctx->recv_callback = recv_callback;
+	ctx->private_data = private_data;
+	ctx->sock_read_watch = NULL;
+	ctx->send_pool = NULL;
+	ctx->pool_read_watch = NULL;
+	ctx->send_queues = NULL;
+
+	ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+	if (ctx->sock == -1) {
+		ret = errno;
+		goto fail_free;
+	}
+
+	/* Set non-blocking and close-on-exec. */
+	ret = prepare_socket(ctx->sock);
+	if (ret != 0) {
+		goto fail_close;
+	}
+
+	ctx->created_pid = getpid();
+
+	if (path != NULL) {
+		addr.sun_family = AF_UNIX;
+		memcpy(addr.sun_path, path, pathlen);
+
+		ret = bind(ctx->sock, (struct sockaddr *)(void *)&addr,
+				sizeof(addr));
+		if (ret == -1) {
+			ret = errno;
+			goto fail_close;
+		}
+
+		ctx->sock_read_watch = ctx->ev_funcs->watch_new(
+			ctx->ev_funcs, ctx->sock, POLLIN,
+			unix_dgram_recv_handler, ctx);
+
+		if (ctx->sock_read_watch == NULL) {
+			ret = ENOMEM;
+			goto fail_close;
+		}
+	}
+
+	*result = ctx;
+	return 0;
+
+fail_close:
+	close(ctx->sock);
+fail_free:
+	free(ctx->recv_buf);
+	free(ctx);
+	return ret;
+}
+
+static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
+				    void *private_data)
+{
+	struct unix_dgram_ctx *ctx = (struct unix_dgram_ctx *)private_data;
+	ssize_t received;
+
+	received = recv(fd, ctx->recv_buf, ctx->max_msg, 0);
+	if (received == -1) {
+		if ((errno == EAGAIN) ||
+#ifdef EWOULDBLOCK
+		    (errno == EWOULDBLOCK) ||
+#endif
+		    (errno == EINTR) || (errno == ENOMEM)) {
+			/* Not really an error - just try again. */
+			return;
+		}
+		/* Problem with the socket. Set it unreadable. */
+		ctx->ev_funcs->watch_update(w, 0);
+		return;
+	}
+	if (received > ctx->max_msg) {
+		/* More than we expected, not for us */
+		return;
+	}
+	ctx->recv_callback(ctx, ctx->recv_buf, received, ctx->private_data);
+}
+
+static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
+				    void *private_data);
+
+static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
+{
+	int ret, signalfd;
+
+	if (ctx->send_pool != NULL) {
+		return 0;
+	}
+
+	ret = pthreadpool_init(0, &ctx->send_pool);
+	if (ret != 0) {
+		return ret;
+	}
+
+	signalfd = pthreadpool_signal_fd(ctx->send_pool);
+
+	ctx->pool_read_watch = ctx->ev_funcs->watch_new(
+		ctx->ev_funcs, signalfd, POLLIN,
+		unix_dgram_job_finished, ctx);
+	if (ctx->pool_read_watch == NULL) {
+		pthreadpool_destroy(ctx->send_pool);
+		ctx->send_pool = NULL;
+		return ENOMEM;
+	}
+
+	return 0;
+}
+
+static int unix_dgram_send_queue_init(
+	struct unix_dgram_ctx *ctx, const char *path,
+	struct unix_dgram_send_queue **result)
+{
+	struct unix_dgram_send_queue *q;
+	struct sockaddr_un addr = { 0, };
+	size_t pathlen;
+	int ret, err;
+
+	pathlen = strlen(path)+1;
+
+	if (pathlen > sizeof(addr.sun_path)) {
+		return ENAMETOOLONG;
+	}
+
+	q = malloc(offsetof(struct unix_dgram_send_queue, path) + pathlen);
+	if (q == NULL) {
+		return ENOMEM;
+	}
+	q->ctx = ctx;
+	q->msgs = NULL;
+	memcpy(q->path, path, pathlen);
+
+	q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+	if (q->sock == -1) {
+		err = errno;
+		goto fail_free;
+	}
+
+	addr.sun_family = AF_UNIX;
+	memcpy(addr.sun_path, path, pathlen);
+
+	do {
+		ret = connect(q->sock, (struct sockaddr *)(void *)&addr, sizeof(addr));
+	} while ((ret == -1) && (errno == EINTR));
+
+	if (ret == -1) {
+		err = errno;
+		goto fail_close;
+	}
+
+	err = unix_dgram_init_pthreadpool(ctx);
+	if (err != 0) {
+		goto fail_close;
+	}
+
+	DLIST_ADD(ctx->send_queues, q);
+
+	*result = q;
+	return 0;
+
+fail_close:
+	close(q->sock);
+fail_free:
+	free(q);
+	return err;
+}
+
+static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
+{
+	struct unix_dgram_ctx *ctx = q->ctx;
+
+	while (q->msgs != NULL) {
+		struct unix_dgram_msg *msg;
+		msg = q->msgs;
+		DLIST_REMOVE(q->msgs, msg);
+		free(msg);
+	}
+	close(q->sock);
+	DLIST_REMOVE(ctx->send_queues, q);
+	free(q);
+}
+
+static struct unix_dgram_send_queue *find_send_queue(
+	struct unix_dgram_ctx *ctx, const char *dst_sock)
+{
+	struct unix_dgram_send_queue *s;
+
+	for (s = ctx->send_queues; s != NULL; s = s->next) {
+		if (strcmp(s->path, dst_sock) == 0) {
+			return s;
+		}
+	}
+	return NULL;
+}
+
+static int queue_msg(struct unix_dgram_send_queue *q,
+		     const struct iovec *iov, int iovlen)
+{
+	struct unix_dgram_msg *msg;
+	ssize_t buflen;
+	size_t msglen;
+	int i;
+
+	buflen = iov_buflen(iov, iovlen);
+	if (buflen == -1) {
+		return EINVAL;
+	}
+
+	msglen = offsetof(struct unix_dgram_msg, buf) + buflen;
+	if ((msglen < buflen) ||
+	    (msglen < offsetof(struct unix_dgram_msg, buf))) {
+		/* overflow */
+		return EINVAL;
+	}
+
+	msg = malloc(msglen);
+	if (msg == NULL) {
+		return ENOMEM;
+	}
+	msg->buflen = buflen;
+	msg->sock = q->sock;
+
+	buflen = 0;
+	for (i=0; i<iovlen; i++) {
+		memcpy(&msg->buf[buflen], iov[i].iov_base, iov[i].iov_len);
+		buflen += iov[i].iov_len;
+	}
+
+	DLIST_ADD_END(q->msgs, msg, struct unix_dgram_msg);
+	return 0;
+}
+
+static void unix_dgram_send_job(void *private_data)
+{
+	struct unix_dgram_msg *msg = private_data;
+
+	do {
+		msg->sent = send(msg->sock, msg->buf, msg->buflen, 0);
+	} while ((msg->sent == -1) && (errno == EINTR));
+}
+
+static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
+				    void *private_data)
+{
+	struct unix_dgram_ctx *ctx = private_data;
+	struct unix_dgram_send_queue *q;
+	struct unix_dgram_msg *msg;
+	int ret, job;
+
+	ret = pthreadpool_finished_jobs(ctx->send_pool, &job, 1);
+	if (ret != 1) {
+		return;
+	}
+
+	for (q = ctx->send_queues; q != NULL; q = q->next) {
+		if (job == q->sock) {
+			break;
+		}
+	}
+
+	if (q == NULL) {
+		/* Huh? Should not happen */
+		return;
+	}
+
+	msg = q->msgs;
+	DLIST_REMOVE(q->msgs, msg);
+	free(msg);
+
+	if (q->msgs != NULL) {
+		ret = pthreadpool_add_job(ctx->send_pool, q->sock,
+					  unix_dgram_send_job, q->msgs);
+		if (ret == 0) {
+			return;
+		}
+	}
+
+	unix_dgram_send_queue_free(q);
+}
+
+static int unix_dgram_send(struct unix_dgram_ctx *ctx, const char *dst_sock,
+			   const struct iovec *iov, int iovlen)
+{
+	struct unix_dgram_send_queue *q;
+	struct sockaddr_un addr = { 0, };
+	struct msghdr msg;
+	size_t dst_len;
+	int ret;
+
+	dst_len = strlen(dst_sock);
+	if (dst_len >= sizeof(addr.sun_path)) {
+		return ENAMETOOLONG;
+	}
+
+	/*
+	 * To preserve message ordering, we have to queue a message when
+	 * others are waiting in line already.
+	 */
+	q = find_send_queue(ctx, dst_sock);
+	if (q != NULL) {
+		return queue_msg(q, iov, iovlen);
+	}
+
+	/*
+	 * Try a cheap nonblocking send
+	 */
+
+	addr.sun_family = AF_UNIX;
+	memcpy(addr.sun_path, dst_sock, dst_len);
+
+	msg.msg_name = &addr;
+	msg.msg_namelen = sizeof(addr);
+	msg.msg_iov = discard_const_p(struct iovec, iov);
+	msg.msg_iovlen = iovlen;
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+	msg.msg_flags = 0;
+
+	ret = sendmsg(ctx->sock, &msg, 0);
+	if (ret >= 0) {
+		return 0;
+	}
+#ifdef EWOULDBLOCK
+	if ((errno != EWOULDBLOCK) && (errno != EAGAIN) && (errno != EINTR)) {
+#else
+	if ((errno != EAGAIN) && (errno != EINTR)) {
+#endif
+		return errno;
+	}
+
+	ret = unix_dgram_send_queue_init(ctx, dst_sock, &q);
+	if (ret != 0) {
+		return ret;
+	}
+	ret = queue_msg(q, iov, iovlen);
+	if (ret != 0) {
+		unix_dgram_send_queue_free(q);
+		return ret;
+	}
+	ret = pthreadpool_add_job(ctx->send_pool, q->sock,
+				  unix_dgram_send_job, q->msgs);
+	if (ret != 0) {
+		unix_dgram_send_queue_free(q);
+		return ret;
+	}
+	return 0;
+}
+
+static int unix_dgram_sock(struct unix_dgram_ctx *ctx)
+{
+	return ctx->sock;
+}
+
+static int unix_dgram_free(struct unix_dgram_ctx *ctx)
+{
+	if (ctx->send_queues != NULL) {
+		return EBUSY;
+	}
+
+	if (ctx->send_pool != NULL) {
+		int ret = pthreadpool_destroy(ctx->send_pool);
+		if (ret != 0) {
+			return ret;
+		}
+		ctx->ev_funcs->watch_free(ctx->pool_read_watch);
+	}
+
+	ctx->ev_funcs->watch_free(ctx->sock_read_watch);
+
+	if (getpid() == ctx->created_pid) {
+		/* If we created it, unlink. Otherwise someone else might
+		 * still have it open */
+		unlink(ctx->path);
+	}
+
+	close(ctx->sock);
+	free(ctx->recv_buf);
+	free(ctx);
+	return 0;
+}
+
+/*
+ * Every message starts with a uint64_t cookie.
+ *
+ * A value of 0 indicates a single-fragment message which is complete in
+ * itself. The data immediately follows the cookie.
+ *
+ * Every fragment of a multi-fragment message starts with a cookie != 0,
+ * followed by a struct unix_msg_hdr and then the data. The pid and sock
+ * fields are used to ensure uniqueness on the receiver side.
+ */
+
+struct unix_msg_hdr {
+	size_t msglen;
+	pid_t pid;
+	int sock;
+};
+
+struct unix_msg {
+	struct unix_msg *prev, *next;
+	size_t msglen;
+	size_t received;
+	pid_t sender_pid;
+	int sender_sock;
+	uint64_t cookie;
+	uint8_t buf[1];
+};
+
+struct unix_msg_ctx {
+	struct unix_dgram_ctx *dgram;
+	size_t fragment_len;
+	uint64_t cookie;
+
+	void (*recv_callback)(struct unix_msg_ctx *ctx,
+			      uint8_t *msg, size_t msg_len,
+			      void *private_data);
+	void *private_data;
+
+	struct unix_msg *msgs;
+};
+
+static void unix_msg_recv(struct unix_dgram_ctx *ctx,
+			  uint8_t *msg, size_t msg_len,
+			  void *private_data);
+
+int unix_msg_init(const char *path, const struct poll_funcs *ev_funcs,
+		  size_t fragment_len, uint64_t cookie,
+		  void (*recv_callback)(struct unix_msg_ctx *ctx,
+					uint8_t *msg, size_t msg_len,
+					void *private_data),
+		  void *private_data,
+		  struct unix_msg_ctx **result)
+{
+	struct unix_msg_ctx *ctx;
+	int ret;
+
+	ctx = malloc(sizeof(*ctx));
+	if (ctx == NULL) {
+		return ENOMEM;
+	}
+
+	ret = unix_dgram_init(path, fragment_len, ev_funcs,
+			      unix_msg_recv, ctx, &ctx->dgram);
+	if (ret != 0) {
+		free(ctx);
+		return ret;
+	}
+
+	ctx->fragment_len = fragment_len;
+	ctx->cookie = cookie;
+	ctx->recv_callback = recv_callback;
+	ctx->private_data = private_data;
+	ctx->msgs = NULL;
+
+	*result = ctx;
+	return 0;
+}
+
+int unix_msg_send(struct unix_msg_ctx *ctx, const char *dst_sock,
+		  const struct iovec *iov, int iovlen)
+{
+	ssize_t msglen;
+	size_t sent;
+	int ret = 0;
+	struct iovec *iov_copy;
+	struct unix_msg_hdr hdr;
+	struct iovec src_iov;
+
+	if (iovlen < 0) {
+		return EINVAL;
+	}
+
+	msglen = iov_buflen(iov, iovlen);
+	if (msglen == -1) {
+		return EINVAL;
+	}
+
+	if ((iovlen < 16) &&
+	    (msglen <= (ctx->fragment_len - sizeof(uint64_t)))) {
+		struct iovec tmp_iov[16];
+		uint64_t cookie = 0;
+
+		tmp_iov[0].iov_base = &cookie;
+		tmp_iov[0].iov_len = sizeof(cookie);
+		if (iovlen > 0) {
+			memcpy(&tmp_iov[1], iov,
+			       sizeof(struct iovec) * iovlen);
+		}
+
+		return unix_dgram_send(ctx->dgram, dst_sock, tmp_iov,
+				       iovlen+1);
+	}
+
+	hdr.msglen = msglen;
+	hdr.pid = getpid();
+	hdr.sock = unix_dgram_sock(ctx->dgram);
+
+	iov_copy = malloc(sizeof(struct iovec) * (iovlen + 2));
+	if (iov_copy == NULL) {
+		return ENOMEM;
+	}
+	iov_copy[0].iov_base = &ctx->cookie;
+	iov_copy[0].iov_len = sizeof(ctx->cookie);
+	iov_copy[1].iov_base = &hdr;
+	iov_copy[1].iov_len = sizeof(hdr);
+
+	sent = 0;
+	src_iov = iov[0];
+
+	/*
+	 * The following write loop sends the user message in pieces. We have
+	 * filled the first two iovecs above with "cookie" and "hdr". In the
+	 * following loops we pull message chunks from the user iov array and
+	 * fill iov_copy piece by piece, possibly truncating chunks from the
+	 * caller's iov array. Ugly, but hopefully efficient.
+	 */
+
+	while (sent < msglen) {
+		size_t fragment_len;
+		size_t iov_index = 2;
+
+		fragment_len = sizeof(ctx->cookie) + sizeof(hdr);
+
+		while (fragment_len < ctx->fragment_len) {
+			size_t space, chunk;
+
+			space = ctx->fragment_len - fragment_len;
+			chunk = MIN(space, src_iov.iov_len);
+
+			iov_copy[iov_index].iov_base = src_iov.iov_base;
+			iov_copy[iov_index].iov_len = chunk;
+			iov_index += 1;
+
+			src_iov.iov_base = (char *)src_iov.iov_base + chunk;
+			src_iov.iov_len -= chunk;
+			fragment_len += chunk;
+
+			if (src_iov.iov_len == 0) {
+				iov += 1;
+				iovlen -= 1;
+				if (iovlen == 0) {
+					break;
+				}
+				src_iov = iov[0];
+			}
+		}
+		sent += (fragment_len - sizeof(ctx->cookie) - sizeof(hdr));
+
+		ret = unix_dgram_send(ctx->dgram, dst_sock,
+				      iov_copy, iov_index);
+		if (ret != 0) {
+			break;
+		}
+	}
+
+	free(iov_copy);
+
+	ctx->cookie += 1;
+	if (ctx->cookie == 0) {
+		ctx->cookie += 1;
+	}
+
+	return ret;
+}
+
+static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
+			  uint8_t *buf, size_t buflen,
+			  void *private_data)
+{
+	struct unix_msg_ctx *ctx = (struct unix_msg_ctx *)private_data;
+	struct unix_msg_hdr hdr;
+	struct unix_msg *msg;
+	size_t space;
+	uint64_t cookie;
+
+	if (buflen < sizeof(cookie)) {
+		return;
+	}
+	memcpy(&cookie, buf, sizeof(cookie));
+
+	buf += sizeof(cookie);
+	buflen -= sizeof(cookie);
+
+	if (cookie == 0) {
+		ctx->recv_callback(ctx,	buf, buflen, ctx->private_data);
+		return;
+	}
+
+	if (buflen < sizeof(hdr)) {
+		return;
+	}
+	memcpy(&hdr, buf, sizeof(hdr));
+
+	buf += sizeof(hdr);
+	buflen -= sizeof(hdr);
+
+	for (msg = ctx->msgs; msg != NULL; msg = msg->next) {
+		if ((msg->sender_pid == hdr.pid) &&
+		    (msg->sender_sock == hdr.sock)) {
+			break;
+		}
+	}
+
+	if ((msg != NULL) && (msg->cookie != cookie)) {
+		DLIST_REMOVE(ctx->msgs, msg);
+		free(msg);
+		msg = NULL;
+	}
+
+	if (msg == NULL) {
+		msg = malloc(offsetof(struct unix_msg, buf) + hdr.msglen);
+		if (msg == NULL) {
+			return;
+		}
+		msg->msglen = hdr.msglen;
+		msg->received = 0;
+		msg->sender_pid = hdr.pid;
+		msg->sender_sock = hdr.sock;
+		msg->cookie = cookie;
+		DLIST_ADD(ctx->msgs, msg);
+	}
+
+	space = msg->msglen - msg->received;
+	if (buflen > space) {
+		return;
+	}
+
+	memcpy(msg->buf + msg->received, buf, buflen);
+	msg->received += buflen;
+
+	if (msg->received < msg->msglen) {
+		return;
+	}
+
+	DLIST_REMOVE(ctx->msgs, msg);
+	ctx->recv_callback(ctx, msg->buf, msg->msglen, ctx->private_data);
+	free(msg);
+}
+
+int unix_msg_free(struct unix_msg_ctx *ctx)
+{
+	int ret;
+
+	ret = unix_dgram_free(ctx->dgram);
+	if (ret != 0) {
+		return ret;
+	}
+
+	while (ctx->msgs != NULL) {
+		struct unix_msg *msg = ctx->msgs;
+		DLIST_REMOVE(ctx->msgs, msg);
+		free(msg);
+	}
+
+	free(ctx);
+	return 0;
+}
+
+static ssize_t iov_buflen(const struct iovec *iov, int iovlen)
+{
+	size_t buflen = 0;
+	int i;
+
+	for (i=0; i<iovlen; i++) {
+		size_t thislen = iov[i].iov_len;
+		size_t tmp = buflen + thislen;
+
+		if ((tmp < buflen) || (tmp < thislen)) {
+			/* overflow */
+			return -1;
+		}
+		buflen = tmp;
+	}
+	return buflen;
+}
diff --git a/source3/lib/unix_msg/unix_msg.h b/source3/lib/unix_msg/unix_msg.h
new file mode 100644
index 0000000..fc636d8
--- /dev/null
+++ b/source3/lib/unix_msg/unix_msg.h
@@ -0,0 +1,107 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Copyright (C) Volker Lendecke 2013
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __UNIX_MSG_H__
+#define __UNIX_MSG_H__
+
+#include "replace.h"
+#include "poll_funcs/poll_funcs.h"
+#include "system/network.h"
+
+/**
+ * @file unix_msg.h
+ *
+ * @brief Send large messages over unix domain datagram sockets
+ *
+ * A unix_msg_ctx represents a unix domain datagram socket.
+ *
+ * Unix domain datagram sockets have some unique properties compared with UDP
+ * sockets:
+ *
+ * - They are reliable, i.e. as long as both sender and receiver are processes
+ *   that are alive, nothing is lost.
+ *
+ * - They preserve sequencing
+ *
+ * Based on these two properties, this code implements sending of large
+ * messages. It aims at being maximally efficient for short, single-datagram
+ * messages. Ideally, if the receiver queue is not full, sending a message
+ * should be a single syscall without malloc. Receiving a message should also
+ * not malloc anything before the data is shipped to the user.
+ *
+ * If unix_msg_send meets a full receive buffer, more effort is required. The
+ * socket behind unix_msg_send is not pollable for POLLOUT, as it is always
+ * writable: a datagram socket can send anywhere, and a full queue is a
+ * property of the receiver. unix_msg_send creates a new unnamed socket that
+ * it will connect(2) to the target socket. This unnamed socket is then
+ * pollable for POLLOUT. The socket will be writable when the destination
+ * socket's queue is drained sufficiently.
+ *
+ * If unix_msg_send is asked to send a message larger than fragment_size, it
+ * sends the message in pieces with proper framing, and the receiving side
+ * reassembles them.
+ */
+
+/**
+ * @brief Abstract structure representing a unix domain datagram socket
+ */
+struct unix_msg_ctx;
+
+/**
+ * @brief Initialize a struct unix_msg_ctx
+ *
+ * @param[in] path The socket path
+ * @param[in] ev_funcs The event callback functions to use
+ * @param[in] fragment_size Maximum datagram size to send/receive
+ * @param[in] cookie Random number to identify this context
+ * @param[in] recv_callback Function called when a message is received
+ * @param[in] private_data Private pointer for recv_callback
+ * @param[out] result The new struct unix_msg_ctx
+ * @return 0 on success, errno on failure
+ */
+
+int unix_msg_init(const char *path, const struct poll_funcs *ev_funcs,
+		  size_t fragment_size, uint64_t cookie,
+		  void (*recv_callback)(struct unix_msg_ctx *ctx,
+					uint8_t *msg, size_t msg_len,
+					void *private_data),
+		  void *private_data,
+		  struct unix_msg_ctx **result);
+
+/**
+ * @brief Send a message
+ *
+ * @param[in] ctx The context to send across
+ * @param[in] dst_sock The destination socket path
+ * @param[in] iov The message
+ * @param[in] iovlen The number of iov structs
+ * @return 0 on success, errno on failure
+ */
+
+int unix_msg_send(struct unix_msg_ctx *ctx, const char *dst_sock,
+		  const struct iovec *iov, int iovlen);
+
+/**
+ * @brief Free a unix_msg_ctx
+ *
+ * @param[in] ctx The message context to free
+ * @return 0 on success, errno on failure (EBUSY)
+ */
+int unix_msg_free(struct unix_msg_ctx *ctx);
+
+#endif
diff --git a/source3/lib/unix_msg/wscript_build b/source3/lib/unix_msg/wscript_build
new file mode 100644
index 0000000..200840d
--- /dev/null
+++ b/source3/lib/unix_msg/wscript_build
@@ -0,0 +1,18 @@
+#!/usr/bin/env python
+
+bld.SAMBA3_SUBSYSTEM('UNIX_MSG',
+                     source='unix_msg.c',
+		     deps='replace PTHREADPOOL')
+
+bld.SAMBA3_BINARY('unix_msg_test',
+                  source='tests.c',
+                  deps='UNIX_MSG POLL_FUNCS_TEVENT',
+                  install=False)
+bld.SAMBA3_BINARY('unix_msg_test_drain',
+                  source='test_drain.c',
+                  deps='UNIX_MSG POLL_FUNCS_TEVENT',
+                  install=False)
+bld.SAMBA3_BINARY('unix_msg_test_source',
+                  source='test_source.c',
+                  deps='UNIX_MSG POLL_FUNCS_TEVENT',
+                  install=False)
diff --git a/source3/wscript_build b/source3/wscript_build
index fd53e2f..4d261c6 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -1453,6 +1453,7 @@ bld.RECURSE('libgpo/gpext')
 bld.RECURSE('lib/pthreadpool')
 bld.RECURSE('lib/asys')
 bld.RECURSE('lib/poll_funcs')
+bld.RECURSE('lib/unix_msg')
 bld.RECURSE('librpc')
 bld.RECURSE('librpc/idl')
 bld.RECURSE('libsmb')
-- 
1.9.1.423.g4596e3a
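The framing comment in unix_msg.c above describes each datagram as a uint64_t cookie (0 for a self-contained message), optionally followed by a struct unix_msg_hdr. As a rough standalone sketch of the sender-side size arithmetic in unix_msg_send(), the helpers below are hypothetical and not part of the patch; only the struct layout and the two tests are copied from it:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <sys/types.h>

/* Mirrors struct unix_msg_hdr from the patch */
struct unix_msg_hdr {
	size_t msglen;
	pid_t pid;
	int sock;
};

/*
 * Same test as unix_msg_send(): fewer than 16 iovecs (so that cookie plus
 * iovecs fit into tmp_iov[16]) and payload plus the 8-byte cookie fits
 * into one fragment.
 */
static int fits_single_fragment(size_t msglen, int iovlen, size_t fragment_len)
{
	assert(fragment_len > sizeof(uint64_t) + sizeof(struct unix_msg_hdr));
	return (iovlen < 16) && (msglen <= fragment_len - sizeof(uint64_t));
}

/*
 * Number of datagrams for a multi-fragment message: every fragment repeats
 * the cookie and the header, only the rest of fragment_len carries payload.
 */
static size_t num_fragments(size_t msglen, size_t fragment_len)
{
	size_t payload = fragment_len - sizeof(uint64_t) -
		sizeof(struct unix_msg_hdr);

	return (msglen + payload - 1) / payload;
}

/* unix_msg_send() bumps the cookie after each multi-fragment message,
 * skipping 0, which is reserved for single-fragment messages. */
static uint64_t next_cookie(uint64_t cookie)
{
	cookie += 1;
	if (cookie == 0) {
		cookie += 1;
	}
	return cookie;
}
```

On a typical LP64 system sizeof(struct unix_msg_hdr) is 16, so with the 1024-byte fragment size that messaging_dgm passes to unix_msg_init(), each fragment of a large message carries 1000 bytes of payload.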


From 91e269f8df7d5de86b05ab505de2ba9a36b757cd Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sun, 29 Dec 2013 13:56:44 +0100
Subject: [PATCH 03/24] lib: Move full_path_tos to util_str.c

This can be useful elsewhere
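full_path_tos() formats "dir/name" into a caller-supplied buffer, typically on the stack, and only allocates from talloc_tos() when the result does not fit. As a sketch of that pattern outside Samba, here is a malloc()-based analogue; the name join_path and the use of malloc()/free() instead of talloc are assumptions made so that the example compiles on its own:

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>

/*
 * Writes "dir/name" to tmpbuf if it fits (including the NUL), otherwise to
 * a malloc()ed buffer that the caller releases via *to_free.
 * Returns the string length, or -1 on allocation failure.
 */
static ssize_t join_path(const char *dir, const char *name,
			 char *tmpbuf, size_t tmpbuf_len,
			 char **pdst, char **to_free)
{
	size_t dirlen, namelen, len;
	char *dst;

	assert((dir != NULL) && (name != NULL));

	dirlen = strlen(dir);
	namelen = strlen(name);
	len = dirlen + namelen + 1;	/* +1 for the '/' */

	if (len < tmpbuf_len) {
		/* Fast path: len bytes plus the NUL fit into tmpbuf */
		dst = tmpbuf;
		*to_free = NULL;
	} else {
		dst = malloc(len + 1);
		if (dst == NULL) {
			return -1;
		}
		*to_free = dst;
	}

	memcpy(dst, dir, dirlen);
	dst[dirlen] = '/';
	memcpy(dst + dirlen + 1, name, namelen + 1);	/* copies the NUL */
	*pdst = dst;
	return (ssize_t)len;
}
```

The caller can unconditionally free(to_free) afterwards, since to_free is NULL when tmpbuf was used; the patch's callers do the same with TALLOC_FREE(to_free).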

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/proto.h |  3 +++
 source3/lib/util_str.c  | 39 +++++++++++++++++++++++++++++++++++++++
 source3/smbd/files.c    | 39 ---------------------------------------
 source3/smbd/proto.h    |  3 ---
 4 files changed, 42 insertions(+), 42 deletions(-)

diff --git a/source3/include/proto.h b/source3/include/proto.h
index 3197b76..c854882 100644
--- a/source3/include/proto.h
+++ b/source3/include/proto.h
@@ -724,6 +724,9 @@ bool validate_net_name( const char *name,
 		int max_len);
 char *escape_shell_string(const char *src);
 char **str_list_make_v3(TALLOC_CTX *mem_ctx, const char *string, const char *sep);
+ssize_t full_path_tos(const char *dir, const char *name,
+		      char *tmpbuf, size_t tmpbuf_len,
+		      char **pdst, char **to_free);
 
 /* The following definitions come from lib/version.c  */
 
diff --git a/source3/lib/util_str.c b/source3/lib/util_str.c
index 967beda..908f23a 100644
--- a/source3/lib/util_str.c
+++ b/source3/lib/util_str.c
@@ -1294,3 +1294,42 @@ char **str_list_make_v3(TALLOC_CTX *mem_ctx, const char *string,
 	TALLOC_FREE(s);
 	return list;
 }
+
+/*
+ * This routine improves performance for operations temporarily acting on a
+ * full path. It is equivalent to the much more expensive
+ *
+ * talloc_asprintf(talloc_tos(), "%s/%s", dir, name)
+ *
+ * This actually does make a difference in metadata-heavy workloads (i.e. the
+ * "standard" client.txt nbench run.
+ */
+
+ssize_t full_path_tos(const char *dir, const char *name,
+		      char *tmpbuf, size_t tmpbuf_len,
+		      char **pdst, char **to_free)
+{
+	size_t dirlen, namelen, len;
+	char *dst;
+
+	dirlen = strlen(dir);
+	namelen = strlen(name);
+	len = dirlen + namelen + 1;
+
+	if (len < tmpbuf_len) {
+		dst = tmpbuf;
+		*to_free = NULL;
+	} else {
+		dst = talloc_array(talloc_tos(), char, len+1);
+		if (dst == NULL) {
+			return -1;
+		}
+		*to_free = dst;
+	}
+
+	memcpy(dst, dir, dirlen);
+	dst[dirlen] = '/';
+	memcpy(dst+dirlen+1, name, namelen+1);
+	*pdst = dst;
+	return len;
+}
diff --git a/source3/smbd/files.c b/source3/smbd/files.c
index 5cf037e..8496806 100644
--- a/source3/smbd/files.c
+++ b/source3/smbd/files.c
@@ -688,45 +688,6 @@ NTSTATUS dup_file_fsp(struct smb_request *req, files_struct *from,
 	return fsp_set_smb_fname(to, from->fsp_name);
 }
 
-/*
- * This routine improves performance for operations temporarily acting on a
- * full path. It is equivalent to the much more expensive
- *
- * talloc_asprintf(talloc_tos(), "%s/%s", dir, name)
- *
- * This actually does make a difference in metadata-heavy workloads (i.e. the
- * "standard" client.txt nbench run.
- */
-
-ssize_t full_path_tos(const char *dir, const char *name,
-		      char *tmpbuf, size_t tmpbuf_len,
-		      char **pdst, char **to_free)
-{
-	size_t dirlen, namelen, len;
-	char *dst;
-
-	dirlen = strlen(dir);
-	namelen = strlen(name);
-	len = dirlen + namelen + 1;
-
-	if (len < tmpbuf_len) {
-		dst = tmpbuf;
-		*to_free = NULL;
-	} else {
-		dst = talloc_array(talloc_tos(), char, len+1);
-		if (dst == NULL) {
-			return -1;
-		}
-		*to_free = dst;
-	}
-
-	memcpy(dst, dir, dirlen);
-	dst[dirlen] = '/';
-	memcpy(dst+dirlen+1, name, namelen+1);
-	*pdst = dst;
-	return len;
-}
-
 /**
  * Return a jenkins hash of a pathname on a connection.
  */
diff --git a/source3/smbd/proto.h b/source3/smbd/proto.h
index 62c9728..d9b86b6 100644
--- a/source3/smbd/proto.h
+++ b/source3/smbd/proto.h
@@ -397,9 +397,6 @@ NTSTATUS file_name_hash(connection_struct *conn,
 			const char *name, uint32_t *p_name_hash);
 NTSTATUS fsp_set_smb_fname(struct files_struct *fsp,
 			   const struct smb_filename *smb_fname_in);
-ssize_t full_path_tos(const char *dir, const char *name,
-		      char *tmpbuf, size_t tmpbuf_len,
-		      char **pdst, char **to_free);
 
 /* The following definitions come from smbd/ipc.c  */
 
-- 
1.9.1.423.g4596e3a


From f164a41ae93a1cf037d885630e54f988ddeea575 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 24 Feb 2014 12:23:49 +0000
Subject: [PATCH 04/24] lib: Add messaging_dgm

Messaging based on unix domain datagram sockets

This makes every process participating in messaging bind to a unix domain
datagram socket, similar to the source4-based messaging. The details are a bit
different, though:

Retry after EWOULDBLOCK is done with a blocking thread, not by polling. In my
experiments this was the only way I found to avoid a thundering herd or high
load under Linux in extreme overload situations, such as many thousands of
processes sending to one blocked process. If there are better ideas to do this
in a simple way, I'm more than happy to remove the pthreadpool dependency again.
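The retry strategy described above (a cheap nonblocking send first, with a thread doing a blocking send(2) only when the destination's queue is full) can be sketched in isolation as follows. This is a simplified illustration, not the patch's code: unix_msg.c queues messages per destination and runs the blocking send in a pthreadpool job, while this sketch spawns one raw pthread per deferred message, and all names are hypothetical. It assumes nb_fd has O_NONBLOCK set and blocking_fd is a blocking socket connected to the same destination.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>

struct deferred_send {
	pthread_t thread;
	int fd;		/* blocking socket connected to the destination */
	size_t len;
	ssize_t sent;	/* result of send(2), written by the helper thread */
	char buf[];	/* private copy of the datagram */
};

/* O_NONBLOCK instead of MSG_DONTWAIT, which is not available everywhere */
static int set_nonblocking(int fd)
{
	int flags = fcntl(fd, F_GETFL);

	if ((flags == -1) || (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1)) {
		return errno;
	}
	return 0;
}

static void *deferred_send_fn(void *arg)
{
	struct deferred_send *d = arg;

	/* Sleeps in send(2) until the receiver has drained its queue */
	do {
		d->sent = send(d->fd, d->buf, d->len, 0);
	} while ((d->sent == -1) && (errno == EINTR));
	return NULL;
}

/*
 * Returns 0 with *pd == NULL if the datagram went out immediately, 0 with
 * *pd set if it was handed to a helper thread, or an errno value.
 */
static int send_or_defer(int nb_fd, int blocking_fd, const void *buf,
			 size_t len, struct deferred_send **pd)
{
	struct deferred_send *d;
	int ret;

	*pd = NULL;
	if (send(nb_fd, buf, len, 0) >= 0) {
		return 0;	/* cheap path: a single syscall */
	}
	if ((errno != EAGAIN) && (errno != EWOULDBLOCK) && (errno != EINTR)) {
		return errno;
	}

	d = malloc(sizeof(*d) + len);
	if (d == NULL) {
		return ENOMEM;
	}
	d->fd = blocking_fd;
	d->len = len;
	d->sent = -1;
	memcpy(d->buf, buf, len);

	ret = pthread_create(&d->thread, NULL, deferred_send_fn, d);
	if (ret != 0) {
		free(d);
		return ret;
	}
	*pd = d;
	return 0;
}

/* Waits for the helper thread and returns its send(2) result */
static ssize_t deferred_send_finish(struct deferred_send *d)
{
	ssize_t sent;

	assert(d != NULL);
	pthread_join(d->thread, NULL);
	sent = d->sent;
	free(d);
	return sent;
}
```

Keeping the first attempt nonblocking preserves the single-syscall path for the common case; only senders that actually hit a full queue pay for a thread.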

There is only one socket per process, not per task. I don't think that
per-task sockets are really necessary; we can do filtering in user space. The
message contains the destination server_id, which contains the destination
task_id. I think we can rebase the source4-based imessaging on top of this,
allowing multiple imessaging contexts on top of one messaging_context. I had
planned to do this conversion before this goes in, but Jeremy convinced me that
this has value in itself :-)
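With one socket per process, the receiver has to demultiplex on the destination task_id carried in each message. A minimal sketch of such a user-space filter follows; the demo_server_id type, the handler table and demo_dispatch() are hypothetical simplifications, not Samba's struct server_id or its dispatch code:

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Simplified stand-in for struct server_id: only the fields we filter on */
struct demo_server_id {
	uint64_t pid;
	uint32_t task_id;
};

struct demo_task {
	uint32_t task_id;
	int (*handler)(const void *buf, size_t len);
};

/* Example handlers for two tasks sharing one process (and one socket) */
static int task1_handler(const void *buf, size_t len)
{
	(void)buf;
	return 100 + (int)len;
}

static int task2_handler(const void *buf, size_t len)
{
	(void)buf;
	return 200 + (int)len;
}

/*
 * Hands a message received on the per-process socket to the task it is
 * addressed to. Returns the handler's result, or -1 if the message is not
 * for this process or no such task exists.
 */
static int demo_dispatch(uint64_t my_pid, const struct demo_task *tasks,
			 size_t num_tasks, struct demo_server_id dst,
			 const void *buf, size_t len)
{
	size_t i;

	assert((tasks != NULL) || (num_tasks == 0));

	if (dst.pid != my_pid) {
		return -1;	/* not addressed to this process */
	}
	for (i = 0; i < num_tasks; i++) {
		if (tasks[i].task_id == dst.task_id) {
			return tasks[i].handler(buf, len);
		}
	}
	return -1;
}
```

In the patch itself the destination server_id travels in struct messaging_dgm_hdr and is handed to messaging_dispatch_rec(); the table lookup above only illustrates the idea of per-task filtering in user space.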

For each socket we also create an fcntl-based lockfile to allow race-free
cleanup of orphaned sockets. This lockfile contains the unique_id, which in the
future will make the server_id.tdb obsolete.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/messages.h      |   5 +
 source3/lib/messages.c          |   8 +-
 source3/lib/messages_dgm.c      | 420 ++++++++++++++++++++++++++++++++++++++++
 source3/lib/unix_msg/unix_msg.c |  11 +-
 source3/smbd/server.c           |   6 +
 source3/wscript_build           |   3 +
 6 files changed, 445 insertions(+), 8 deletions(-)
 create mode 100644 source3/lib/messages_dgm.c

diff --git a/source3/include/messages.h b/source3/include/messages.h
index 47c5f7a..9437965 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -91,6 +91,11 @@ struct messaging_backend {
 	void *private_data;
 };
 
+NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
+			    TALLOC_CTX *mem_ctx,
+			    struct messaging_backend **presult);
+NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid);
+
 NTSTATUS messaging_tdb_init(struct messaging_context *msg_ctx,
 			    TALLOC_CTX *mem_ctx,
 			    struct messaging_backend **presult);
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 4ff933d..983fe69 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -197,10 +197,10 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
 	ctx->id = procid_self();
 	ctx->event_ctx = ev;
 
-	status = messaging_tdb_init(ctx, ctx, &ctx->local);
+	status = messaging_dgm_init(ctx, ctx, &ctx->local);
 
 	if (!NT_STATUS_IS_OK(status)) {
-		DEBUG(2, ("messaging_tdb_init failed: %s\n",
+		DEBUG(2, ("messaging_dgm_init failed: %s\n",
 			  nt_errstr(status)));
 		TALLOC_FREE(ctx);
 		return NULL;
@@ -245,9 +245,9 @@ NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
 
 	msg_ctx->id = procid_self();
 
-	status = messaging_tdb_init(msg_ctx, msg_ctx, &msg_ctx->local);
+	status = messaging_dgm_init(msg_ctx, msg_ctx, &msg_ctx->local);
 	if (!NT_STATUS_IS_OK(status)) {
-		DEBUG(0, ("messaging_tdb_init failed: %s\n",
+		DEBUG(0, ("messaging_dgm_init failed: %s\n",
 			  nt_errstr(status)));
 		return status;
 	}
diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
new file mode 100644
index 0000000..0d4df2f
--- /dev/null
+++ b/source3/lib/messages_dgm.c
@@ -0,0 +1,420 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Samba internal messaging functions
+ * Copyright (C) 2013 by Volker Lendecke
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "includes.h"
+#include "lib/util/data_blob.h"
+#include "lib/util/debug.h"
+#include "lib/unix_msg/unix_msg.h"
+#include "system/filesys.h"
+#include "messages.h"
+#include "lib/param/param.h"
+#include "poll_funcs/poll_funcs_tevent.h"
+#include "unix_msg/unix_msg.h"
+#include "librpc/gen_ndr/messaging.h"
+
+struct messaging_dgm_context {
+	struct messaging_context *msg_ctx;
+	struct poll_funcs msg_callbacks;
+	struct unix_msg_ctx *dgm_ctx;
+	char *cache_dir;
+	int lockfile_fd;
+};
+
+struct messaging_dgm_hdr {
+	uint32_t msg_version;
+	enum messaging_type msg_type;
+	struct server_id dst;
+	struct server_id src;
+};
+
+static NTSTATUS messaging_dgm_send(struct messaging_context *msg_ctx,
+				   struct server_id pid, int msg_type,
+				   const DATA_BLOB *data,
+				   struct messaging_backend *backend);
+static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
+			       uint8_t *msg, size_t msg_len,
+			       void *private_data);
+
+static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
+
+static int messaging_dgm_lockfile_create(const char *cache_dir, pid_t pid,
+					 int *plockfile_fd, uint64_t unique)
+{
+	char buf[PATH_MAX];
+	char *dir, *to_free;
+	ssize_t dirlen;
+	char *lockfile_name;
+	int lockfile_fd;
+	struct flock lck = {};
+	int unique_len, ret;
+	ssize_t written;
+	bool ok;
+
+	dirlen = full_path_tos(cache_dir, "lck", buf, sizeof(buf),
+			       &dir, &to_free);
+	if (dirlen == -1) {
+		return ENOMEM;
+	}
+
+	ok = directory_create_or_exist_strict(dir, sec_initial_uid(), 0755);
+	if (!ok) {
+		ret = errno;
+		DEBUG(1, ("%s: Could not create lock directory: %s\n",
+			  __func__, strerror(ret)));
+		TALLOC_FREE(to_free);
+		return ret;
+	}
+
+	lockfile_name = talloc_asprintf(talloc_tos(), "%s/%u", dir,
+					(unsigned)pid);
+	TALLOC_FREE(to_free);
+	if (lockfile_name == NULL) {
+		DEBUG(1, ("%s: talloc_asprintf failed\n", __func__));
+		return ENOMEM;
+	}
+
+	/* no O_EXCL, existence check is via the fcntl lock */
+
+	lockfile_fd = open(lockfile_name, O_NONBLOCK|O_CREAT|O_WRONLY, 0644);
+	if (lockfile_fd == -1) {
+		ret = errno;
+		DEBUG(1, ("%s: open failed: %s\n", __func__, strerror(ret)));
+		goto fail_free;
+	}
+
+	lck.l_type = F_WRLCK;
+	lck.l_whence = SEEK_SET;
+	lck.l_start = 0;
+	lck.l_len = 0;
+
+	ret = fcntl(lockfile_fd, F_SETLK, &lck);
+	if (ret == -1) {
+		ret = errno;
+		DEBUG(1, ("%s: fcntl failed: %s\n", __func__, strerror(ret)));
+		goto fail_close;
+	}
+
+	unique_len = snprintf(buf, sizeof(buf), "%"PRIu64, unique);
+
+	/* shorten a potentially preexisting file */
+
+	ret = ftruncate(lockfile_fd, unique_len);
+	if (ret == -1) {
+		ret = errno;
+		DEBUG(1, ("%s: ftruncate failed: %s\n", __func__,
+			  strerror(ret)));
+		goto fail_unlink;
+	}
+
+	written = write(lockfile_fd, buf, unique_len);
+	if (written != unique_len) {
+		ret = errno;
+		DEBUG(1, ("%s: write failed: %s\n", __func__, strerror(ret)));
+		goto fail_unlink;
+	}
+
+	*plockfile_fd = lockfile_fd;
+	return 0;
+
+fail_unlink:
+	unlink(lockfile_name);
+fail_close:
+	close(lockfile_fd);
+fail_free:
+	TALLOC_FREE(lockfile_name);
+	return ret;
+}
+
+static int messaging_dgm_lockfile_remove(const char *cache_dir, pid_t pid)
+{
+	fstring fname;
+	char buf[PATH_MAX];
+	char *lockfile_name, *to_free;
+	ssize_t len;
+	int ret;
+
+	fstr_sprintf(fname, "lck/%u", (unsigned)pid);
+
+	len = full_path_tos(cache_dir, fname, buf, sizeof(buf),
+			    &lockfile_name, &to_free);
+	if (len == -1) {
+		return ENOMEM;
+	}
+
+	ret = unlink(lockfile_name);
+	if (ret == -1) {
+		ret = errno;
+		DEBUG(10, ("%s: unlink failed: %s\n", __func__,
+			   strerror(ret)));
+	}
+	TALLOC_FREE(to_free);
+	return ret;
+}
+
+NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
+			    TALLOC_CTX *mem_ctx,
+			    struct messaging_backend **presult)
+{
+	struct messaging_backend *result;
+	struct messaging_dgm_context *ctx;
+	struct loadparm_context *lp_ctx;
+	struct server_id pid = messaging_server_id(msg_ctx);
+	int ret;
+	bool ok;
+	const char *cache_dir;
+	char *socket_dir, *socket_name;
+	uint64_t cookie;
+
+	lp_ctx = loadparm_init_s3(talloc_tos(), loadparm_s3_helpers());
+	if (lp_ctx == NULL) {
+		DEBUG(0, ("loadparm_init_s3 failed\n"));
+		return NT_STATUS_INTERNAL_ERROR;
+	}
+
+	cache_dir = lp_cache_directory();
+	if (cache_dir == NULL) {
+		NTSTATUS status = map_nt_error_from_unix(errno);
+		talloc_unlink(talloc_tos(), lp_ctx);
+		return status;
+	}
+
+	talloc_unlink(talloc_tos(), lp_ctx);
+	lp_ctx = NULL;
+
+	result = talloc(mem_ctx, struct messaging_backend);
+	if (result == NULL) {
+		goto fail_nomem;
+	}
+	ctx = talloc_zero(result, struct messaging_dgm_context);
+	if (ctx == NULL) {
+		goto fail_nomem;
+	}
+
+	result->private_data = ctx;
+	result->send_fn = messaging_dgm_send;
+	ctx->msg_ctx = msg_ctx;
+
+	ctx->cache_dir = talloc_strdup(ctx, cache_dir);
+	if (ctx->cache_dir == NULL) {
+		goto fail_nomem;
+	}
+	socket_dir = talloc_asprintf(ctx, "%s/msg", cache_dir);
+	if (socket_dir == NULL) {
+		goto fail_nomem;
+	}
+	socket_name = talloc_asprintf(ctx, "%s/%u", socket_dir,
+				      (unsigned)pid.pid);
+	if (socket_name == NULL) {
+		goto fail_nomem;
+	}
+
+	sec_init();
+
+	ret = messaging_dgm_lockfile_create(cache_dir, pid.pid,
+					    &ctx->lockfile_fd, pid.unique_id);
+	if (ret != 0) {
+		DEBUG(1, ("%s: messaging_dgm_lockfile_create failed: %s\n",
+			  __func__, strerror(ret)));
+		TALLOC_FREE(result);
+		return map_nt_error_from_unix(ret);
+	}
+
+	poll_funcs_init_tevent(&ctx->msg_callbacks, msg_ctx->event_ctx);
+
+	ok = directory_create_or_exist_strict(socket_dir, sec_initial_uid(),
+					      0700);
+	if (!ok) {
+		DEBUG(1, ("Could not create socket directory\n"));
+		TALLOC_FREE(result);
+		return NT_STATUS_ACCESS_DENIED;
+	}
+	TALLOC_FREE(socket_dir);
+
+	unlink(socket_name);
+
+	generate_random_buffer((uint8_t *)&cookie, sizeof(cookie));
+
+	ret = unix_msg_init(socket_name, &ctx->msg_callbacks, 1024, cookie,
+			    messaging_dgm_recv, ctx, &ctx->dgm_ctx);
+	TALLOC_FREE(socket_name);
+	if (ret != 0) {
+		DEBUG(1, ("unix_msg_init failed: %s\n", strerror(ret)));
+		TALLOC_FREE(result);
+		return map_nt_error_from_unix(ret);
+	}
+	talloc_set_destructor(ctx, messaging_dgm_context_destructor);
+
+	*presult = result;
+	return NT_STATUS_OK;
+
+fail_nomem:
+	TALLOC_FREE(result);
+	return NT_STATUS_NO_MEMORY;
+}
+
+static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
+{
+	struct server_id pid = messaging_server_id(c->msg_ctx);
+
+	/*
+	 * First delete the socket to avoid races. The lockfile is the
+	 * indicator that we're still around.
+	 */
+	unix_msg_free(c->dgm_ctx);
+
+	if (getpid() == pid.pid) {
+		(void)messaging_dgm_lockfile_remove(c->cache_dir, pid.pid);
+	}
+	close(c->lockfile_fd);
+	return 0;
+}
+
+static NTSTATUS messaging_dgm_send(struct messaging_context *msg_ctx,
+				   struct server_id pid, int msg_type,
+				   const DATA_BLOB *data,
+				   struct messaging_backend *backend)
+{
+	struct messaging_dgm_context *ctx = talloc_get_type_abort(
+		backend->private_data, struct messaging_dgm_context);
+	fstring pid_str;
+	char buf[PATH_MAX];
+	char *dst_sock, *to_free;
+	struct messaging_dgm_hdr hdr;
+	struct iovec iov[2];
+	ssize_t pathlen;
+	int ret;
+
+	fstr_sprintf(pid_str, "msg/%u", (unsigned)pid.pid);
+
+	pathlen = full_path_tos(ctx->cache_dir, pid_str, buf, sizeof(buf),
+				&dst_sock, &to_free);
+	if (pathlen == -1) {
+		return NT_STATUS_NO_MEMORY;
+	}
+
+	hdr.msg_version = MESSAGE_VERSION;
+	hdr.msg_type = msg_type & MSG_TYPE_MASK;
+	hdr.dst = pid;
+	hdr.src = msg_ctx->id;
+
+	DEBUG(10, ("%s: Sending message 0x%x len %u to %s\n", __func__,
+		   (unsigned)hdr.msg_type, (unsigned)data->length,
+		   server_id_str(talloc_tos(), &pid)));
+
+	iov[0].iov_base = &hdr;
+	iov[0].iov_len = sizeof(hdr);
+	iov[1].iov_base = data->data;
+	iov[1].iov_len = data->length;
+
+	become_root();
+	ret = unix_msg_send(ctx->dgm_ctx, dst_sock, iov, ARRAY_SIZE(iov));
+	unbecome_root();
+
+	TALLOC_FREE(to_free);
+
+	if (ret != 0) {
+		return map_nt_error_from_unix(ret);
+	}
+	return NT_STATUS_OK;
+}
+
+static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
+			       uint8_t *msg, size_t msg_len,
+			       void *private_data)
+{
+	struct messaging_dgm_context *dgm_ctx = talloc_get_type_abort(
+		private_data, struct messaging_dgm_context);
+	struct messaging_dgm_hdr *hdr;
+	struct messaging_rec rec;
+
+	if (msg_len < sizeof(*hdr)) {
+		DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
+		return;
+	}
+
+	/*
+	 * unix_msg guarantees alignment, so we can cast here
+	 */
+	hdr = (struct messaging_dgm_hdr *)msg;
+
+	rec.msg_version = hdr->msg_version;
+	rec.msg_type = hdr->msg_type;
+	rec.dest = hdr->dst;
+	rec.src = hdr->src;
+	rec.buf.data = msg + sizeof(*hdr);
+	rec.buf.length = msg_len - sizeof(*hdr);
+
+	DEBUG(10, ("%s: Received message 0x%x len %u from %s\n", __func__,
+		   (unsigned)hdr->msg_type, (unsigned)rec.buf.length,
+		   server_id_str(talloc_tos(), &rec.src)));
+
+	messaging_dispatch_rec(dgm_ctx->msg_ctx, &rec);
+}
+
+NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid)
+{
+	struct messaging_dgm_context *ctx = talloc_get_type_abort(
+		msg_ctx->local->private_data, struct messaging_dgm_context);
+	char *lockfile_name, *socket_name;
+	int fd, ret;
+	struct flock lck = {};
+	NTSTATUS status = NT_STATUS_OK;
+
+	lockfile_name = talloc_asprintf(talloc_tos(), "%s/lck/%u",
+					ctx->cache_dir, (unsigned)pid);
+	if (lockfile_name == NULL) {
+		return NT_STATUS_NO_MEMORY;
+	}
+	socket_name = talloc_asprintf(lockfile_name, "%s/msg/%u",
+				      ctx->cache_dir, (unsigned)pid);
+	if (socket_name == NULL) {
+		TALLOC_FREE(lockfile_name);
+		return NT_STATUS_NO_MEMORY;
+	}
+
+	fd = open(lockfile_name, O_NONBLOCK|O_WRONLY, 0);
+	if (fd == -1) {
+		status = map_nt_error_from_unix(errno);
+		DEBUG(10, ("%s: open(%s) failed: %s\n", __func__,
+			   lockfile_name, strerror(errno)));
+		return status;
+	}
+
+	lck.l_type = F_WRLCK;
+	lck.l_whence = SEEK_SET;
+	lck.l_start = 0;
+	lck.l_len = 0;
+
+	ret = fcntl(fd, F_SETLK, &lck);
+	if (ret != 0) {
+		status = map_nt_error_from_unix(errno);
+		DEBUG(10, ("%s: Could not get lock: %s\n", __func__,
+			   strerror(errno)));
+		TALLOC_FREE(lockfile_name);
+		close(fd);
+		return status;
+	}
+
+	(void)unlink(socket_name);
+	(void)unlink(lockfile_name);
+	(void)close(fd);
+
+	TALLOC_FREE(lockfile_name);
+	return NT_STATUS_OK;
+}
diff --git a/source3/lib/unix_msg/unix_msg.c b/source3/lib/unix_msg/unix_msg.c
index b6f4cdf..0d5c14f 100644
--- a/source3/lib/unix_msg/unix_msg.c
+++ b/source3/lib/unix_msg/unix_msg.c
@@ -16,11 +16,14 @@
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
 
+#include "replace.h"
 #include "unix_msg.h"
 #include "system/select.h"
 #include "system/time.h"
+#include "system/network.h"
 #include "dlinklist.h"
 #include "pthreadpool/pthreadpool.h"
+#include <fcntl.h>
 
 /*
  * This file implements two abstractions: The "unix_dgram" functions implement
@@ -90,24 +93,24 @@ static int prepare_socket(int sock)
 #endif
 #endif
 
-	flags = fcntl(fd, F_GETFL);
+	flags = fcntl(sock, F_GETFL);
 	if (flags == -1) {
 		return errno;
 	}
 	flags |= FLAG_TO_SET;
-	if (fcntl(fd, F_SETFL, flags) == -1) {
+	if (fcntl(sock, F_SETFL, flags) == -1) {
 		return errno;
 	}
 
 #undef FLAG_TO_SET
 
 #ifdef FD_CLOEXEC
-	flags = fcntl(fd, F_GETFD, 0);
+	flags = fcntl(sock, F_GETFD, 0);
 	if (flags == -1) {
 		return errno;
 	}
 	flags |= FD_CLOEXEC;
-	if (fcntl(fd, F_SETFD, flags) == -1) {
+	if (fcntl(sock, F_SETFD, flags) == -1) {
 		return errno;
 	}
 #endif
diff --git a/source3/smbd/server.c b/source3/smbd/server.c
index bc9d293..7c6a905 100644
--- a/source3/smbd/server.c
+++ b/source3/smbd/server.c
@@ -465,6 +465,8 @@ static void remove_child_pid(struct smbd_parent_context *parent,
 	}
 
 	if (unclean_shutdown) {
+		NTSTATUS status;
+
 		/* a child terminated uncleanly so tickle all
 		   processes to see if they can grab any of the
 		   pending locks
@@ -488,6 +490,10 @@ static void remove_child_pid(struct smbd_parent_context *parent,
 		 * terminated uncleanly.
 		 */
 		messaging_cleanup_server(parent->msg_ctx, child_id);
+
+		status = messaging_dgm_cleanup(parent->msg_ctx, pid);
+		DEBUG(10, ("%s: messaging_dgm_cleanup returned %s\n",
+			   __func__, nt_errstr(status)));
 	}
 
 	if (!serverid_deregister(child_id)) {
diff --git a/source3/wscript_build b/source3/wscript_build
index 4d261c6..2ba7a96 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -314,6 +314,7 @@ bld.SAMBA3_SUBSYSTEM('TDB_LIB',
 bld.SAMBA3_SUBSYSTEM('samba3core',
                    source='''lib/messages.c
                    lib/messages_local.c
+                   lib/messages_dgm.c
                    lib/util_cluster.c
                    lib/id_cache.c
                    lib/talloc_dict.c
@@ -352,6 +353,8 @@ bld.SAMBA3_SUBSYSTEM('samba3core',
                         UTIL_PW
                         SAMBA_VERSION
                         PTHREADPOOL
+                        UNIX_MSG
+                        POLL_FUNCS_TEVENT
                         interfaces
                         param
                         dbwrap
-- 
1.9.1.423.g4596e3a
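A note on messaging_dgm_cleanup() above: it decides whether a dead peer's socket is stale purely through fcntl() record locking. The sketch below assumes, as the code implies, that a live process holds a whole-file write lock on its lck/<pid> file. Under that assumption, a non-blocking F_SETLK only succeeds once the owner has gone, and only then are the socket and lockfile unlinked. cleanup_if_dead() is a hypothetical standalone name, not a Samba function, and it takes plain paths instead of a messaging_context.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

/*
 * Hypothetical sketch of the lock probe in messaging_dgm_cleanup().
 * Assumption: a live owner keeps a whole-file write lock on lockfile.
 * Returns 0 if the stale files were removed, otherwise an errno value.
 */
static int cleanup_if_dead(const char *lockfile, const char *sockfile)
{
	struct flock lck;
	int fd, ret;

	fd = open(lockfile, O_NONBLOCK | O_WRONLY, 0);
	if (fd == -1) {
		return errno;
	}

	memset(&lck, 0, sizeof(lck));
	lck.l_type = F_WRLCK;	/* exclusive lock ... */
	lck.l_whence = SEEK_SET;
	lck.l_start = 0;
	lck.l_len = 0;		/* ... covering the whole file */

	/* Non-blocking attempt: fails (EACCES/EAGAIN) while the owner lives. */
	if (fcntl(fd, F_SETLK, &lck) == -1) {
		ret = errno;
		close(fd);
		return ret;
	}

	/* We hold the lock, so nobody is using these paths any more. */
	(void)unlink(sockfile);
	(void)unlink(lockfile);
	(void)close(fd);	/* closing the fd also drops our lock */
	return 0;
}
```

No explicit "still alive" bookkeeping is needed: POSIX record locks are released when the owning process exits. Note that fcntl() locks are per process, so the probe only tells you something when run from a process other than the lock holder.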

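prepare_socket() in unix_msg.c, corrected above to operate on its sock parameter instead of fd, follows the review advice quoted at the top of this mail. The idea is to set O_NONBLOCK and FD_CLOEXEC on the socket itself rather than relying on MSG_DONTWAIT, which might not be available everywhere. A minimal standalone sketch of the same two fcntl() read-modify-write round trips follows. It hard-codes O_NONBLOCK where the patch uses its platform-dependent FLAG_TO_SET macro, which is defined outside this excerpt. prepare_socket_sketch() is a hypothetical name.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <sys/socket.h>
#include <unistd.h>

/* Hypothetical sketch: make sock non-blocking and close-on-exec.
 * Returns 0 on success, otherwise the errno of the failing fcntl(). */
static int prepare_socket_sketch(int sock)
{
	int flags;

	/* File status flags: add O_NONBLOCK, keep whatever else is set. */
	flags = fcntl(sock, F_GETFL);
	if (flags == -1) {
		return errno;
	}
	if (fcntl(sock, F_SETFL, flags | O_NONBLOCK) == -1) {
		return errno;
	}

	/* File descriptor flags: add FD_CLOEXEC so exec'd children
	 * do not inherit the messaging socket. */
	flags = fcntl(sock, F_GETFD, 0);
	if (flags == -1) {
		return errno;
	}
	if (fcntl(sock, F_SETFD, flags | FD_CLOEXEC) == -1) {
		return errno;
	}
	return 0;
}
```

O_NONBLOCK lives in the file status flags (F_GETFL/F_SETFL), while FD_CLOEXEC is a descriptor flag (F_GETFD/F_SETFD). That is why two separate round trips are needed.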

From f7aa6d01f0c74583af84484cb465393e2e68441f Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 24 Feb 2014 13:20:16 +0000
Subject: [PATCH 05/24] lib: Remove messages_local

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/messages.h   |  12 -
 source3/lib/messages.c       |  17 --
 source3/lib/messages_local.c | 573 -------------------------------------------
 source3/smbd/server.c        |  11 -
 source3/wscript_build        |   1 -
 5 files changed, 614 deletions(-)
 delete mode 100644 source3/lib/messages_local.c

diff --git a/source3/include/messages.h b/source3/include/messages.h
index 9437965..b89af9c 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -96,15 +96,6 @@ NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
 			    struct messaging_backend **presult);
 NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid);
 
-NTSTATUS messaging_tdb_init(struct messaging_context *msg_ctx,
-			    TALLOC_CTX *mem_ctx,
-			    struct messaging_backend **presult);
-
-bool messaging_tdb_parent_init(TALLOC_CTX *mem_ctx);
-
-NTSTATUS messaging_tdb_cleanup(struct messaging_context *msg_ctx,
-			struct server_id pid);
-
 NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx,
 			      TALLOC_CTX *mem_ctx,
 			      struct messaging_backend **presult);
@@ -151,9 +142,6 @@ struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
 			struct messaging_rec **presult);
 
-void messaging_cleanup_server(struct messaging_context *msg_ctx,
-				struct server_id pid);
-
 #include "librpc/gen_ndr/ndr_messaging.h"
 
 #endif
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 983fe69..5db1214 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -567,21 +567,4 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 	return;
 }
 
-/*
-  Call when a process has terminated abnormally.
-*/
-void messaging_cleanup_server(struct messaging_context *msg_ctx,
-				struct server_id server)
-{
-	if (server_id_is_disconnected(&server)) {
-		return;
-	}
-
-	if (!procid_is_local(&server)) {
-		return;
-	}
-
-	(void)messaging_tdb_cleanup(msg_ctx, server);
-
-}
 /** @} **/
diff --git a/source3/lib/messages_local.c b/source3/lib/messages_local.c
deleted file mode 100644
index d535df1..0000000
--- a/source3/lib/messages_local.c
+++ /dev/null
@@ -1,573 +0,0 @@
-/* 
-   Unix SMB/CIFS implementation.
-   Samba internal messaging functions
-   Copyright (C) 2007 by Volker Lendecke
-
-   This program is free software; you can redistribute it and/or modify
-   it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 3 of the License, or
-   (at your option) any later version.
-
-   This program is distributed in the hope that it will be useful,
-   but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-   GNU General Public License for more details.
-
-   You should have received a copy of the GNU General Public License
-   along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-/**
-  @defgroup messages Internal messaging framework
-  @{
-  @file messages.c
-
-  @brief  Module for internal messaging between Samba daemons. 
-
-   The idea is that if a part of Samba wants to do communication with
-   another Samba process then it will do a message_register() of a
-   dispatch function, and use message_send_pid() to send messages to
-   that process.
-
-   The dispatch function is given the pid of the sender, and it can
-   use that to reply by message_send_pid().  See ping_message() for a
-   simple example.
-
-   @caution Dispatch functions must be able to cope with incoming
-   messages on an *odd* byte boundary.
-
-   This system doesn't have any inherent size limitations but is not
-   very efficient for large messages or when messages are sent in very
-   quick succession.
-
-*/
-
-#include "includes.h"
-#include "system/filesys.h"
-#include "messages.h"
-#include "serverid.h"
-#include "lib/tdb_wrap/tdb_wrap.h"
-#include "lib/param/param.h"
-
-struct messaging_tdb_context {
-	struct messaging_context *msg_ctx;
-	struct tdb_wrap *tdb;
-	struct tevent_signal *se;
-	int received_messages;
-	bool *have_context;
-};
-
-static NTSTATUS messaging_tdb_send(struct messaging_context *msg_ctx,
-				   struct server_id pid, int msg_type,
-				   const DATA_BLOB *data,
-				   struct messaging_backend *backend);
-static void message_dispatch(struct messaging_context *msg_ctx);
-
-static void messaging_tdb_signal_handler(struct tevent_context *ev_ctx,
-					 struct tevent_signal *se,
-					 int signum, int count,
-					 void *_info, void *private_data)
-{
-	struct messaging_tdb_context *ctx = talloc_get_type(private_data,
-					    struct messaging_tdb_context);
-
-	ctx->received_messages++;
-
-	DEBUG(10, ("messaging_tdb_signal_handler: sig[%d] count[%d] msgs[%d]\n",
-		   signum, count, ctx->received_messages));
-
-	message_dispatch(ctx->msg_ctx);
-}
-
-static int messaging_tdb_context_destructor(struct messaging_tdb_context *ctx);
-
-/****************************************************************************
- Initialise the messaging functions. 
-****************************************************************************/
-
-NTSTATUS messaging_tdb_init(struct messaging_context *msg_ctx,
-			    TALLOC_CTX *mem_ctx,
-			    struct messaging_backend **presult)
-{
-	struct messaging_backend *result;
-	struct messaging_tdb_context *ctx;
-	struct loadparm_context *lp_ctx;
-	static bool have_context = false;
-	const char *fname;
-
-	if (have_context) {
-		DEBUG(0, ("No two messaging contexts per process\n"));
-		return NT_STATUS_OBJECT_NAME_COLLISION;
-	}
-
-	if (!(result = talloc(mem_ctx, struct messaging_backend))) {
-		DEBUG(0, ("talloc failed\n"));
-		return NT_STATUS_NO_MEMORY;
-	}
-
-	lp_ctx = loadparm_init_s3(result, loadparm_s3_helpers());
-	if (lp_ctx == NULL) {
-		DEBUG(0, ("loadparm_init_s3 failed\n"));
-		TALLOC_FREE(result);
-		return NT_STATUS_INTERNAL_ERROR;
-	}
-
-	ctx = talloc_zero(result, struct messaging_tdb_context);
-	if (!ctx) {
-		DEBUG(0, ("talloc failed\n"));
-		TALLOC_FREE(result);
-		return NT_STATUS_NO_MEMORY;
-	}
-	result->private_data = ctx;
-	result->send_fn = messaging_tdb_send;
-
-	ctx->msg_ctx = msg_ctx;
-	ctx->have_context = &have_context;
-
-	fname = lock_path("messages.tdb");
-
-	ctx->tdb = tdb_wrap_open(
-		ctx, fname, lpcfg_tdb_hash_size(lp_ctx, fname),
-		lpcfg_tdb_flags(lp_ctx, TDB_CLEAR_IF_FIRST|TDB_DEFAULT|
-				TDB_VOLATILE| TDB_INCOMPATIBLE_HASH),
-		O_RDWR|O_CREAT,0600);
-
-	talloc_unlink(result, lp_ctx);
-
-	if (!ctx->tdb) {
-		NTSTATUS status = map_nt_error_from_unix(errno);
-		DEBUG(2, ("ERROR: Failed to initialise messages database: "
-			  "%s\n", strerror(errno)));
-		TALLOC_FREE(result);
-		return status;
-	}
-
-	ctx->se = tevent_add_signal(msg_ctx->event_ctx,
-				    ctx,
-				    SIGUSR1, 0,
-				    messaging_tdb_signal_handler,
-				    ctx);
-	if (!ctx->se) {
-		NTSTATUS status = map_nt_error_from_unix(errno);
-		DEBUG(0, ("ERROR: Failed to initialise messages signal handler: "
-			  "%s\n", strerror(errno)));
-		TALLOC_FREE(result);
-		return status;
-	}
-
-	sec_init();
-
-	have_context = true;
-	talloc_set_destructor(ctx, messaging_tdb_context_destructor);
-
-	*presult = result;
-	return NT_STATUS_OK;
-}
-
-static int messaging_tdb_context_destructor(struct messaging_tdb_context *ctx)
-{
-	SMB_ASSERT(*ctx->have_context);
-	*ctx->have_context = false;
-	return 0;
-}
-
-bool messaging_tdb_parent_init(TALLOC_CTX *mem_ctx)
-{
-	struct tdb_wrap *db;
-	struct loadparm_context *lp_ctx;
-	const char *fname;
-
-	lp_ctx = loadparm_init_s3(mem_ctx, loadparm_s3_helpers());
-	if (lp_ctx == NULL) {
-		DEBUG(0, ("loadparm_init_s3 failed\n"));
-		return false;
-	}
-
-	/*
-	 * Open the tdb in the parent process (smbd) so that our
-	 * CLEAR_IF_FIRST optimization in tdb_reopen_all can properly
-	 * work.
-	 */
-
-	fname = lock_path("messages.tdb");
-	db = tdb_wrap_open(
-		mem_ctx, fname, lpcfg_tdb_hash_size(lp_ctx, fname),
-		lpcfg_tdb_flags(lp_ctx, TDB_CLEAR_IF_FIRST|TDB_DEFAULT|
-				TDB_VOLATILE|TDB_INCOMPATIBLE_HASH),
-		O_RDWR|O_CREAT,0600);
-	talloc_unlink(mem_ctx, lp_ctx);
-	if (db == NULL) {
-		DEBUG(1, ("could not open messaging.tdb: %s\n",
-			  strerror(errno)));
-		return false;
-	}
-	return true;
-}
-
-/*******************************************************************
- Form a static tdb key from a pid.
-******************************************************************/
-
-static TDB_DATA message_key_pid(TALLOC_CTX *mem_ctx, struct server_id pid)
-{
-	char *key;
-	TDB_DATA kbuf;
-
-	key = talloc_asprintf(mem_ctx, "PID/%s", procid_str_static(&pid));
-
-	SMB_ASSERT(key != NULL);
-
-	kbuf.dptr = (uint8 *)key;
-	kbuf.dsize = strlen(key)+1;
-	return kbuf;
-}
-
-/*******************************************************************
- Called when a process has terminated abnormally. Remove all messages
- pending for it.
-******************************************************************/
-
-NTSTATUS messaging_tdb_cleanup(struct messaging_context *msg_ctx,
-				struct server_id pid)
-{
-	struct messaging_tdb_context *ctx = talloc_get_type(
-					msg_ctx->local->private_data,
-					struct messaging_tdb_context);
-	struct tdb_wrap *tdb = ctx->tdb;
-	TDB_DATA key;
-	TALLOC_CTX *frame = talloc_stackframe();
-
-	key = message_key_pid(frame, pid);
-	/*
-	 * We have to lock the key to avoid
-	 * races in case the server_id was
-	 * re-used and is active (a remote
-	 * possibility, true). We only
-	 * clean up the database if we
-	 * know server_id doesn't exist
-	 * while checked under the chainlock.
-	 */
-	if (tdb_chainlock(tdb->tdb, key) != 0) {
-		TALLOC_FREE(frame);
-		return NT_STATUS_LOCK_NOT_GRANTED;
-	}
-	if (!serverid_exists(&pid)) {
-		(void)tdb_delete(tdb->tdb, key);
-	}
-	tdb_chainunlock(tdb->tdb, key);
-	TALLOC_FREE(frame);
-	return NT_STATUS_OK;
-}
-
-/*
-  Fetch the messaging array for a process
- */
-
-static NTSTATUS messaging_tdb_fetch(TDB_CONTEXT *msg_tdb,
-				    TDB_DATA key,
-				    TALLOC_CTX *mem_ctx,
-				    struct messaging_array **presult)
-{
-	struct messaging_array *result;
-	TDB_DATA data;
-	DATA_BLOB blob;
-	enum ndr_err_code ndr_err;
-
-	if (!(result = talloc_zero(mem_ctx, struct messaging_array))) {
-		return NT_STATUS_NO_MEMORY;
-	}
-
-	data = tdb_fetch(msg_tdb, key);
-
-	if (data.dptr == NULL) {
-		*presult = result;
-		return NT_STATUS_OK;
-	}
-
-	blob = data_blob_const(data.dptr, data.dsize);
-
-	ndr_err = ndr_pull_struct_blob_all(
-		&blob, result, result,
-		(ndr_pull_flags_fn_t)ndr_pull_messaging_array);
-
-	SAFE_FREE(data.dptr);
-
-	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-		TALLOC_FREE(result);
-		return ndr_map_error2ntstatus(ndr_err);
-	}
-
-	if (DEBUGLEVEL >= 10) {
-		DEBUG(10, ("messaging_tdb_fetch:\n"));
-		NDR_PRINT_DEBUG(messaging_array, result);
-	}
-
-	*presult = result;
-	return NT_STATUS_OK;
-}
-
-/*
-  Store a messaging array for a pid
-*/
-
-static NTSTATUS messaging_tdb_store(TDB_CONTEXT *msg_tdb,
-				    TDB_DATA key,
-				    struct messaging_array *array)
-{
-	TDB_DATA data;
-	DATA_BLOB blob;
-	enum ndr_err_code ndr_err;
-	TALLOC_CTX *mem_ctx;
-	int ret;
-
-	if (array->num_messages == 0) {
-		tdb_delete(msg_tdb, key);
-		return NT_STATUS_OK;
-	}
-
-	if (!(mem_ctx = talloc_new(array))) {
-		return NT_STATUS_NO_MEMORY;
-	}
-
-	ndr_err = ndr_push_struct_blob(&blob, mem_ctx, array,
-		(ndr_push_flags_fn_t)ndr_push_messaging_array);
-
-	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-		talloc_free(mem_ctx);
-		return ndr_map_error2ntstatus(ndr_err);
-	}
-
-	if (DEBUGLEVEL >= 10) {
-		DEBUG(10, ("messaging_tdb_store:\n"));
-		NDR_PRINT_DEBUG(messaging_array, array);
-	}
-
-	data.dptr = blob.data;
-	data.dsize = blob.length;
-
-	ret = tdb_store(msg_tdb, key, data, TDB_REPLACE);
-	TALLOC_FREE(mem_ctx);
-
-	return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
-}
-
-/****************************************************************************
- Notify a process that it has a message. If the process doesn't exist 
- then delete its record in the database.
-****************************************************************************/
-
-static NTSTATUS message_notify(struct server_id procid)
-{
-	pid_t pid = procid.pid;
-	int ret;
-	uid_t euid = geteuid();
-
-	/*
-	 * Doing kill with a non-positive pid causes messages to be
-	 * sent to places we don't want.
-	 */
-
-	SMB_ASSERT(pid > 0);
-	if (pid <= 0) {
-		return NT_STATUS_INVALID_HANDLE;
-	}
-
-	if (euid != 0) {
-		/* If we're not root become so to send the message. */
-		save_re_uid();
-		set_effective_uid(0);
-	}
-
-	ret = kill(pid, SIGUSR1);
-
-	if (euid != 0) {
-		/* Go back to who we were. */
-		int saved_errno = errno;
-		restore_re_uid_fromroot();
-		errno = saved_errno;
-	}
-
-	if (ret == 0) {
-		return NT_STATUS_OK;
-	}
-
-	/*
-	 * Something has gone wrong
-	 */
-
-	DEBUG(2,("message to process %d failed - %s\n", (int)pid,
-		 strerror(errno)));
-
-	/*
-	 * No call to map_nt_error_from_unix -- don't want to link in
-	 * errormap.o into lots of utils.
-	 */
-
-	if (errno == ESRCH)  return NT_STATUS_INVALID_HANDLE;
-	if (errno == EINVAL) return NT_STATUS_INVALID_PARAMETER;
-	if (errno == EPERM)  return NT_STATUS_ACCESS_DENIED;
-	return NT_STATUS_UNSUCCESSFUL;
-}
-
-/****************************************************************************
- Send a message to a particular pid.
-****************************************************************************/
-
-static NTSTATUS messaging_tdb_send(struct messaging_context *msg_ctx,
-				   struct server_id pid, int msg_type,
-				   const DATA_BLOB *data,
-				   struct messaging_backend *backend)
-{
-	struct messaging_tdb_context *ctx = talloc_get_type(backend->private_data,
-					    struct messaging_tdb_context);
-	struct messaging_array *msg_array;
-	struct messaging_rec *rec;
-	NTSTATUS status;
-	TDB_DATA key;
-	struct tdb_wrap *tdb = ctx->tdb;
-	TALLOC_CTX *frame = talloc_stackframe();
-
-	/* NULL pointer means implicit length zero. */
-	if (!data->data) {
-		SMB_ASSERT(data->length == 0);
-	}
-
-	/*
-	 * Doing kill with a non-positive pid causes messages to be
-	 * sent to places we don't want.
-	 */
-
-	SMB_ASSERT(procid_to_pid(&pid) > 0);
-
-	key = message_key_pid(frame, pid);
-
-	if (tdb_chainlock(tdb->tdb, key) != 0) {
-		TALLOC_FREE(frame);
-		return NT_STATUS_LOCK_NOT_GRANTED;
-	}
-
-	status = messaging_tdb_fetch(tdb->tdb, key, frame, &msg_array);
-
-	if (!NT_STATUS_IS_OK(status)) {
-		goto done;
-	}
-
-	if ((msg_type & MSG_FLAG_LOWPRIORITY)
-	    && (msg_array->num_messages > 1000)) {
-		DEBUG(5, ("Dropping message for PID %s\n",
-			  procid_str_static(&pid)));
-		status = NT_STATUS_INSUFFICIENT_RESOURCES;
-		goto done;
-	}
-
-	if (!(rec = talloc_realloc(frame, msg_array->messages,
-					 struct messaging_rec,
-					 msg_array->num_messages+1))) {
-		status = NT_STATUS_NO_MEMORY;
-		goto done;
-	}
-
-	rec[msg_array->num_messages].msg_version = MESSAGE_VERSION;
-	rec[msg_array->num_messages].msg_type = msg_type & MSG_TYPE_MASK;
-	rec[msg_array->num_messages].dest = pid;
-	rec[msg_array->num_messages].src = msg_ctx->id;
-	rec[msg_array->num_messages].buf = *data;
-
-	msg_array->messages = rec;
-	msg_array->num_messages += 1;
-
-	status = messaging_tdb_store(tdb->tdb, key, msg_array);
-
-	if (!NT_STATUS_IS_OK(status)) {
-		goto done;
-	}
-
-	status = message_notify(pid);
-
-	if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
-		DEBUG(2, ("pid %s doesn't exist - deleting messages record\n",
-			  procid_str_static(&pid)));
-		tdb_delete(tdb->tdb, message_key_pid(frame, pid));
-	}
-
- done:
-	tdb_chainunlock(tdb->tdb, key);
-	TALLOC_FREE(frame);
-	return status;
-}
-
-/****************************************************************************
- Retrieve all messages for a process.
-****************************************************************************/
-
-static NTSTATUS retrieve_all_messages(TDB_CONTEXT *msg_tdb,
-				      struct server_id id,
-				      TALLOC_CTX *mem_ctx,
-				      struct messaging_array **presult)
-{
-	struct messaging_array *result;
-	TDB_DATA key = message_key_pid(mem_ctx, id);
-	NTSTATUS status;
-
-	if (tdb_chainlock(msg_tdb, key) != 0) {
-		TALLOC_FREE(key.dptr);
-		return NT_STATUS_LOCK_NOT_GRANTED;
-	}
-
-	status = messaging_tdb_fetch(msg_tdb, key, mem_ctx, &result);
-
-	/*
-	 * We delete the record here, tdb_set_max_dead keeps it around
-	 */
-	tdb_delete(msg_tdb, key);
-	tdb_chainunlock(msg_tdb, key);
-
-	if (NT_STATUS_IS_OK(status)) {
-		*presult = result;
-	}
-
-	TALLOC_FREE(key.dptr);
-
-	return status;
-}
-
-/****************************************************************************
- Receive and dispatch any messages pending for this process.
- JRA changed Dec 13 2006. Only one message handler now permitted per type.
- *NOTE*: Dispatch functions must be able to cope with incoming
- messages on an *odd* byte boundary.
-****************************************************************************/
-
-static void message_dispatch(struct messaging_context *msg_ctx)
-{
-	struct messaging_tdb_context *ctx = talloc_get_type(msg_ctx->local->private_data,
-					    struct messaging_tdb_context);
-	struct messaging_array *msg_array = NULL;
-	struct tdb_wrap *tdb = ctx->tdb;
-	NTSTATUS status;
-	uint32 i;
-
-	if (ctx->received_messages == 0) {
-		return;
-	}
-
-	DEBUG(10, ("message_dispatch: received_messages = %d\n",
-		   ctx->received_messages));
-
-	status = retrieve_all_messages(tdb->tdb, msg_ctx->id, NULL, &msg_array);
-	if (!NT_STATUS_IS_OK(status)) {
-		DEBUG(0, ("message_dispatch: failed to retrieve messages: %s\n",
-			   nt_errstr(status)));
-		return;
-	}
-
-	ctx->received_messages = 0;
-
-	for (i=0; i<msg_array->num_messages; i++) {
-		messaging_dispatch_rec(msg_ctx, &msg_array->messages[i]);
-	}
-
-	TALLOC_FREE(msg_array);
-}
-
-/** @} **/
diff --git a/source3/smbd/server.c b/source3/smbd/server.c
index 7c6a905..9fcf254 100644
--- a/source3/smbd/server.c
+++ b/source3/smbd/server.c
@@ -484,13 +484,6 @@ static void remove_child_pid(struct smbd_parent_context *parent,
 			DEBUG(1,("Scheduled cleanup of brl and lock database after unclean shutdown\n"));
 		}
 
-		/*
-		 * Ensure we flush any stored messages
-		 * queued for the child process that
-		 * terminated uncleanly.
-		 */
-		messaging_cleanup_server(parent->msg_ctx, child_id);
-
 		status = messaging_dgm_cleanup(parent->msg_ctx, pid);
 		DEBUG(10, ("%s: messaging_dgm_cleanup returned %s\n",
 			   __func__, nt_errstr(status)));
@@ -1457,10 +1450,6 @@ extern void build_options(bool screen);
 	if (!locking_init())
 		exit(1);
 
-	if (!messaging_tdb_parent_init(ev_ctx)) {
-		exit(1);
-	}
-
 	if (!smbd_parent_notify_init(NULL, msg_ctx, ev_ctx)) {
 		exit(1);
 	}
diff --git a/source3/wscript_build b/source3/wscript_build
index 2ba7a96..4f5661f 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -313,7 +313,6 @@ bld.SAMBA3_SUBSYSTEM('TDB_LIB',
 
 bld.SAMBA3_SUBSYSTEM('samba3core',
                    source='''lib/messages.c
-                   lib/messages_local.c
                    lib/messages_dgm.c
                    lib/util_cluster.c
                    lib/id_cache.c
-- 
1.9.1.423.g4596e3a
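With messages_local.c removed, local messages no longer go through messages.tdb plus a SIGUSR1 notification. Instead, each one travels as a single datagram. messaging_dgm_send() gathers a struct messaging_dgm_hdr and the payload from two iovecs. messaging_dgm_recv() rejects anything shorter than the header and then splits the datagram back apart. The sketch below shows the same framing over a socketpair(). struct demo_hdr and the demo_* functions are hypothetical, and the real header also carries the source and destination server_ids. The real receiver can cast because unix_msg guarantees alignment; this sketch uses memcpy() and a fixed 4 KiB receive buffer instead.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/* Hypothetical header; the real one also carries src/dst server_ids. */
struct demo_hdr {
	uint32_t msg_version;
	uint32_t msg_type;
};

/* Send header + payload as ONE datagram, gathered from two iovecs. */
static int demo_send(int sock, uint32_t type, const void *data, size_t len)
{
	struct demo_hdr hdr = { .msg_version = 1, .msg_type = type };
	struct iovec iov[2];
	struct msghdr msg;

	iov[0].iov_base = &hdr;
	iov[0].iov_len = sizeof(hdr);
	iov[1].iov_base = (void *)data;
	iov[1].iov_len = len;

	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = iov;
	msg.msg_iovlen = 2;

	return (sendmsg(sock, &msg, 0) == -1) ? -1 : 0;
}

/* Receive one datagram and split it into header and payload.
 * Returns the payload length, or -1 on error / short message. */
static ssize_t demo_recv(int sock, struct demo_hdr *hdr,
			 uint8_t *payload, size_t payload_max)
{
	uint8_t buf[4096];
	ssize_t n = recv(sock, buf, sizeof(buf), 0);

	if (n < (ssize_t)sizeof(*hdr)) {
		return -1;	/* recv error, or too short for a header */
	}
	/* memcpy instead of a cast: buf has no alignment guarantee */
	memcpy(hdr, buf, sizeof(*hdr));
	n -= sizeof(*hdr);
	if ((size_t)n > payload_max) {
		return -1;
	}
	memcpy(payload, buf + sizeof(*hdr), (size_t)n);
	return n;
}
```

Because SOCK_DGRAM preserves message boundaries, no length prefix is needed: one recv() returns exactly one message. A datagram longer than the receive buffer would be silently truncated here, so a real receiver has to size its buffer for the largest message or detect truncation.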


From a8a41dc3a4b57d74171172562ee54f2950faaeb9 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 4 Apr 2014 15:00:16 +0000
Subject: [PATCH 06/24] smbd: Add a timestamp to queued notify events

In a cluster, and with the changed messaging, messages can be scheduled
after new SMB requests. This re-ordering breaks a few notify tests.
This patch starts the infrastructure to add timestamps to notify events,
so that they can be sorted before they are sent out. The timestamp will
be the current local time of notify_fname; that's all we can do.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/smbd/notify.c | 21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/source3/smbd/notify.c b/source3/smbd/notify.c
index c19982a..6a4b52c 100644
--- a/source3/smbd/notify.c
+++ b/source3/smbd/notify.c
@@ -24,6 +24,12 @@
 #include "smbd/globals.h"
 #include "../librpc/gen_ndr/ndr_notify.h"
 
+struct notify_change_event {
+	struct timespec when;
+	uint32_t action;
+	const char *name;
+};
+
 struct notify_change_buf {
 	/*
 	 * If no requests are pending, changes are queued here. Simple array,
@@ -35,7 +41,7 @@ struct notify_change_buf {
 	 * asked we just return NT_STATUS_OK without specific changes.
 	 */
 	int num_changes;
-	struct notify_change *changes;
+	struct notify_change_event *changes;
 
 	/*
 	 * If no changes are around requests are queued here. Using a linked
@@ -87,8 +93,8 @@ struct notify_mid_map {
 	uint64_t mid;
 };
 
-static bool notify_change_record_identical(struct notify_change *c1,
-					struct notify_change *c2)
+static bool notify_change_record_identical(struct notify_change_event *c1,
+					   struct notify_change_event *c2)
 {
 	/* Note this is deliberately case sensitive. */
 	if (c1->action == c2->action &&
@@ -100,7 +106,7 @@ static bool notify_change_record_identical(struct notify_change *c1,
 
 static bool notify_marshall_changes(int num_changes,
 				uint32 max_offset,
-				struct notify_change *changes,
+				struct notify_change_event *changes,
 				DATA_BLOB *final_blob)
 {
 	int i;
@@ -111,7 +117,7 @@ static bool notify_marshall_changes(int num_changes,
 
 	for (i=0; i<num_changes; i++) {
 		enum ndr_err_code ndr_err;
-		struct notify_change *c;
+		struct notify_change_event *c;
 		struct FILE_NOTIFY_INFORMATION m;
 		DATA_BLOB blob;
 
@@ -437,7 +443,7 @@ void notify_fname(connection_struct *conn, uint32 action, uint32 filter,
 
 static void notify_fsp(files_struct *fsp, uint32 action, const char *name)
 {
-	struct notify_change *change, *changes;
+	struct notify_change_event *change, *changes;
 	char *tmp;
 
 	if (fsp->notify == NULL) {
@@ -483,7 +489,8 @@ static void notify_fsp(files_struct *fsp, uint32 action, const char *name)
 
 	if (!(changes = talloc_realloc(
 		      fsp->notify, fsp->notify->changes,
-		      struct notify_change, fsp->notify->num_changes+1))) {
+		      struct notify_change_event,
+		      fsp->notify->num_changes+1))) {
 		DEBUG(0, ("talloc_realloc failed\n"));
 		return;
 	}
-- 
1.9.1.423.g4596e3a
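The point of the new "when" member is that queued changes can be put back into order before they are sent out. The sorting itself is not part of this patch. One way to do it is a comparator on struct timespec fed to qsort() over the changes array, sketched here under the assumption that plain qsort() is acceptable. timespec_cmp(), notify_change_event_cmp() and sort_notify_events() are hypothetical helpers. Only struct notify_change_event is taken from the patch.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>

/* As introduced by the patch. */
struct notify_change_event {
	struct timespec when;
	uint32_t action;
	const char *name;
};

/* Hypothetical: order two timespecs, seconds first, then nanoseconds. */
static int timespec_cmp(const struct timespec *a, const struct timespec *b)
{
	if (a->tv_sec != b->tv_sec) {
		return (a->tv_sec < b->tv_sec) ? -1 : 1;
	}
	if (a->tv_nsec != b->tv_nsec) {
		return (a->tv_nsec < b->tv_nsec) ? -1 : 1;
	}
	return 0;
}

/* qsort() adaptor comparing events by their timestamp. */
static int notify_change_event_cmp(const void *p1, const void *p2)
{
	const struct notify_change_event *e1 = p1;
	const struct notify_change_event *e2 = p2;

	return timespec_cmp(&e1->when, &e2->when);
}

/* Hypothetical: sort queued changes oldest-first before marshalling. */
static void sort_notify_events(struct notify_change_event *ev, size_t n)
{
	qsort(ev, n, sizeof(ev[0]), notify_change_event_cmp);
}
```

qsort() is not stable, so changes with identical timestamps may come out in either order. If that matters, a sequence number can serve as a tiebreaker in the comparator.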


From fdbad71d683300707aad45e65e74e77a9b4c03b6 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 4 Apr 2014 15:03:44 +0000
Subject: [PATCH 07/24] smbd: Pass timespec_current to notify_fsp

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/smbd/notify.c | 11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/source3/smbd/notify.c b/source3/smbd/notify.c
index 6a4b52c..693418a 100644
--- a/source3/smbd/notify.c
+++ b/source3/smbd/notify.c
@@ -63,7 +63,8 @@ struct notify_change_request {
 	void *backend_data;
 };
 
-static void notify_fsp(files_struct *fsp, uint32 action, const char *name);
+static void notify_fsp(files_struct *fsp, struct timespec when,
+		       uint32 action, const char *name);
 
 bool change_notify_fsp_has_changes(struct files_struct *fsp)
 {
@@ -214,7 +215,7 @@ static void notify_callback(void *private_data, const struct notify_event *e)
 {
 	files_struct *fsp = (files_struct *)private_data;
 	DEBUG(10, ("notify_callback called for %s\n", fsp_str_dbg(fsp)));
-	notify_fsp(fsp, e->action, e->path);
+	notify_fsp(fsp, timespec_current(), e->action, e->path);
 }
 
 static void sys_notify_callback(struct sys_notify_context *ctx,
@@ -223,7 +224,7 @@ static void sys_notify_callback(struct sys_notify_context *ctx,
 {
 	files_struct *fsp = (files_struct *)private_data;
 	DEBUG(10, ("sys_notify_callback called for %s\n", fsp_str_dbg(fsp)));
-	notify_fsp(fsp, e->action, e->path);
+	notify_fsp(fsp, timespec_current(), e->action, e->path);
 }
 
 NTSTATUS change_notify_create(struct files_struct *fsp, uint32 filter,
@@ -441,7 +442,8 @@ void notify_fname(connection_struct *conn, uint32 action, uint32 filter,
 	TALLOC_FREE(to_free);
 }
 
-static void notify_fsp(files_struct *fsp, uint32 action, const char *name)
+static void notify_fsp(files_struct *fsp, struct timespec when,
+		       uint32 action, const char *name)
 {
 	struct notify_change_event *change, *changes;
 	char *tmp;
@@ -507,6 +509,7 @@ static void notify_fsp(files_struct *fsp, uint32 action, const char *name)
 	string_replace(tmp, '/', '\\');
 	change->name = tmp;	
 
+	change->when = when;
 	change->action = action;
 	fsp->notify->num_changes += 1;
 
-- 
1.9.1.423.g4596e3a


From 2df1ab3e8f22fc260f2325d1e8446b898531a963 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 4 Apr 2014 15:11:51 +0000
Subject: [PATCH 08/24] smbd: Pass timespec_current through the notify_callback

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/smbd/notify.c          | 5 +++--
 source3/smbd/notify_internal.c | 8 +++++---
 source3/smbd/proto.h           | 3 ++-
 3 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/source3/smbd/notify.c b/source3/smbd/notify.c
index 693418a..b8085dd 100644
--- a/source3/smbd/notify.c
+++ b/source3/smbd/notify.c
@@ -211,11 +211,12 @@ void change_notify_reply(struct smb_request *req,
 	notify_buf->num_changes = 0;
 }
 
-static void notify_callback(void *private_data, const struct notify_event *e)
+static void notify_callback(void *private_data, struct timespec when,
+			    const struct notify_event *e)
 {
 	files_struct *fsp = (files_struct *)private_data;
 	DEBUG(10, ("notify_callback called for %s\n", fsp_str_dbg(fsp)));
-	notify_fsp(fsp, timespec_current(), e->action, e->path);
+	notify_fsp(fsp, when, e->action, e->path);
 }
 
 static void sys_notify_callback(struct sys_notify_context *ctx,
diff --git a/source3/smbd/notify_internal.c b/source3/smbd/notify_internal.c
index 4d88565..69f9377 100644
--- a/source3/smbd/notify_internal.c
+++ b/source3/smbd/notify_internal.c
@@ -44,7 +44,7 @@
 struct notify_list {
 	struct notify_list *next, *prev;
 	const char *path;
-	void (*callback)(void *, const struct notify_event *);
+	void (*callback)(void *, struct timespec, const struct notify_event *);
 	void *private_data;
 };
 
@@ -194,7 +194,8 @@ static int notify_context_destructor(struct notify_context *notify)
 
 NTSTATUS notify_add(struct notify_context *notify,
 		    const char *path, uint32_t filter, uint32_t subdir_filter,
-		    void (*callback)(void *, const struct notify_event *),
+		    void (*callback)(void *, struct timespec,
+				     const struct notify_event *),
 		    void *private_data)
 {
 	struct notify_db_entry e;
@@ -820,7 +821,8 @@ static void notify_handler(struct messaging_context *msg_ctx,
 
 	for (listel=notify->list;listel;listel=listel->next) {
 		if (listel->private_data == n->private_data) {
-			listel->callback(listel->private_data, n);
+			listel->callback(listel->private_data,
+					 timespec_current(), n);
 			break;
 		}
 	}
diff --git a/source3/smbd/proto.h b/source3/smbd/proto.h
index d9b86b6..bc15f15 100644
--- a/source3/smbd/proto.h
+++ b/source3/smbd/proto.h
@@ -552,7 +552,8 @@ struct notify_context *notify_init(TALLOC_CTX *mem_ctx,
 				   struct tevent_context *ev);
 NTSTATUS notify_add(struct notify_context *notify,
 		    const char *path, uint32_t filter, uint32_t subdir_filter,
-		    void (*callback)(void *, const struct notify_event *),
+		    void (*callback)(void *, struct timespec,
+				     const struct notify_event *),
 		    void *private_data);
 NTSTATUS notify_remove(struct notify_context *notify, void *private_data);
 void notify_trigger(struct notify_context *notify,
-- 
1.9.1.423.g4596e3a


From 73be88a927d272c8b3af76d913c9633e1362d638 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sun, 2 Mar 2014 18:34:53 +0100
Subject: [PATCH 09/24] lib: Introduce iov_buflen

.. with overflow protection

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/proto.h |  1 +
 source3/lib/util_sock.c | 28 +++++++++++++++++++++++-----
 2 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/source3/include/proto.h b/source3/include/proto.h
index c854882..983a4d0 100644
--- a/source3/include/proto.h
+++ b/source3/include/proto.h
@@ -583,6 +583,7 @@ NTSTATUS read_fd_with_timeout(int fd, char *buf,
 				  size_t *size_ret);
 NTSTATUS read_data(int fd, char *buffer, size_t N);
 ssize_t write_data(int fd, const char *buffer, size_t N);
+ssize_t iov_buflen(const struct iovec *iov, int iovlen);
 ssize_t write_data_iov(int fd, const struct iovec *orig_iov, int iovcnt);
 bool send_keepalive(int client);
 NTSTATUS read_smb_length_return_keepalive(int fd, char *inbuf,
diff --git a/source3/lib/util_sock.c b/source3/lib/util_sock.c
index 12e4ccd..b8149ca 100644
--- a/source3/lib/util_sock.c
+++ b/source3/lib/util_sock.c
@@ -201,6 +201,24 @@ NTSTATUS read_data(int fd, char *buffer, size_t N)
 	return read_fd_with_timeout(fd, buffer, N, N, 0, NULL);
 }
 
+ssize_t iov_buflen(const struct iovec *iov, int iovcnt)
+{
+	size_t buflen = 0;
+	int i;
+
+	for (i=0; i<iovcnt; i++) {
+		size_t thislen = iov[i].iov_len;
+		size_t tmp = buflen + thislen;
+
+		if ((tmp < buflen) || (tmp < thislen)) {
+			/* overflow */
+			return -1;
+		}
+		buflen = tmp;
+	}
+	return buflen;
+}
+
 /****************************************************************************
  Write all data from an iov array
  NB. This can be called with a non-socket fd, don't add dependencies
@@ -209,15 +227,15 @@ NTSTATUS read_data(int fd, char *buffer, size_t N)
 
 ssize_t write_data_iov(int fd, const struct iovec *orig_iov, int iovcnt)
 {
-	int i;
-	size_t to_send;
+	ssize_t to_send;
 	ssize_t thistime;
 	size_t sent;
 	struct iovec *iov_copy, *iov;
 
-	to_send = 0;
-	for (i=0; i<iovcnt; i++) {
-		to_send += orig_iov[i].iov_len;
+	to_send = iov_buflen(orig_iov, iovcnt);
+	if (to_send == -1) {
+		errno = EINVAL;
+		return -1;
 	}
 
 	thistime = sys_writev(fd, orig_iov, iovcnt);
-- 
1.9.1.423.g4596e3a
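Below is a standalone sketch of the overflow check in iov_buflen(), so it can be tried outside the Samba tree. The name iov_buflen_checked() is hypothetical. It adds one guard that the patch does not have: it also returns -1 when the total fits in size_t but exceeds SSIZE_MAX, because such a value cannot pass through the ssize_t return type unchanged. Whether Samba needs that extra guard is an assumption.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <limits.h>
#include <stdint.h>
#include <sys/types.h>
#include <sys/uio.h>

/*
 * Sum the lengths of an iovec array. Returns -1 instead of a wrapped
 * total, and (unlike the patch) also -1 for totals above SSIZE_MAX.
 */
static ssize_t iov_buflen_checked(const struct iovec *iov, int iovcnt)
{
	size_t buflen = 0;
	int i;

	for (i = 0; i < iovcnt; i++) {
		size_t thislen = iov[i].iov_len;
		size_t tmp = buflen + thislen;

		if (tmp < buflen) {
			return -1;	/* unsigned addition wrapped around */
		}
		if (tmp > (size_t)SSIZE_MAX) {
			return -1;	/* not representable as ssize_t */
		}
		buflen = tmp;
	}
	return (ssize_t)buflen;
}
```

For unsigned operands a wrapped sum is always smaller than either operand, so the single `tmp < buflen` test is enough; the patch's extra `tmp < thislen` test is redundant but harmless.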


From 31cad04a4888b573f9df0396386a99095c79b2da Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sun, 2 Mar 2014 19:33:08 +0100
Subject: [PATCH 10/24] lib: Add iov_buf

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/proto.h |  1 +
 source3/lib/util_sock.c | 25 +++++++++++++++++++++++++
 2 files changed, 26 insertions(+)

diff --git a/source3/include/proto.h b/source3/include/proto.h
index 983a4d0..0a4db86 100644
--- a/source3/include/proto.h
+++ b/source3/include/proto.h
@@ -584,6 +584,7 @@ NTSTATUS read_fd_with_timeout(int fd, char *buf,
 NTSTATUS read_data(int fd, char *buffer, size_t N);
 ssize_t write_data(int fd, const char *buffer, size_t N);
 ssize_t iov_buflen(const struct iovec *iov, int iovlen);
+uint8_t *iov_buf(TALLOC_CTX *mem_ctx, const struct iovec *iov, int iovcnt);
 ssize_t write_data_iov(int fd, const struct iovec *orig_iov, int iovcnt);
 bool send_keepalive(int client);
 NTSTATUS read_smb_length_return_keepalive(int fd, char *inbuf,
diff --git a/source3/lib/util_sock.c b/source3/lib/util_sock.c
index b8149ca..c5de61a 100644
--- a/source3/lib/util_sock.c
+++ b/source3/lib/util_sock.c
@@ -219,6 +219,31 @@ ssize_t iov_buflen(const struct iovec *iov, int iovcnt)
 	return buflen;
 }
 
+uint8_t *iov_buf(TALLOC_CTX *mem_ctx, const struct iovec *iov, int iovcnt)
+{
+	int i;
+	ssize_t buflen;
+	uint8_t *buf, *p;
+
+	buflen = iov_buflen(iov, iovcnt);
+	if (buflen == -1) {
+		return NULL;
+	}
+	buf = talloc_array(mem_ctx, uint8_t, buflen);
+	if (buf == NULL) {
+		return NULL;
+	}
+
+	p = buf;
+	for (i=0; i<iovcnt; i++) {
+		size_t len = iov[i].iov_len;
+
+		memcpy(p, iov[i].iov_base, len);
+		p += len;
+	}
+	return buf;
+}
+
 /****************************************************************************
  Write all data from an iov array
  NB. This can be called with a non-socket fd, don't add dependencies
-- 
1.9.1.423.g4596e3a
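iov_buf() flattens an iovec array into one contiguous buffer. The following is a minimal malloc-based sketch of the same idea. iov_flatten() and its plen out-parameter are hypothetical. The out-parameter exists because a malloc'ed block does not record its own size, whereas a talloc array does (messaging_send_iov() recovers the length with talloc_get_size()).

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/uio.h>

/*
 * Copy all iovec elements, in order, into one newly malloc'ed buffer.
 * Returns NULL on length overflow or allocation failure; on success the
 * total length is stored in *plen and the caller must free() the result.
 */
static uint8_t *iov_flatten(const struct iovec *iov, int iovcnt, size_t *plen)
{
	size_t buflen = 0;
	uint8_t *buf, *p;
	int i;

	for (i = 0; i < iovcnt; i++) {
		size_t tmp = buflen + iov[i].iov_len;

		if (tmp < buflen) {
			return NULL;	/* total length overflowed */
		}
		buflen = tmp;
	}

	/* malloc(0) may legitimately return NULL, so request at least 1 byte */
	buf = malloc(buflen > 0 ? buflen : 1);
	if (buf == NULL) {
		return NULL;
	}

	p = buf;
	for (i = 0; i < iovcnt; i++) {
		/* skip empty elements: memcpy() from NULL is undefined even for 0 bytes */
		if (iov[i].iov_len > 0) {
			memcpy(p, iov[i].iov_base, iov[i].iov_len);
			p += iov[i].iov_len;
		}
	}
	*plen = buflen;
	return buf;
}
```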


From 0870d2dfec6edad1a7568dc83e4c0d7782e11cd7 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Tue, 25 Feb 2014 12:15:58 +0000
Subject: [PATCH 11/24] messaging3: Add messaging_send_iov

For now this copies the iovec into a single buffer. It will be replaced
by passing the iovec directly through to sendmsg() on the Unix domain
socket.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/messages.h |  3 +++
 source3/lib/messages.c     | 19 +++++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/source3/include/messages.h b/source3/include/messages.h
index b89af9c..3e1b664 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -132,6 +132,9 @@ NTSTATUS messaging_send(struct messaging_context *msg_ctx,
 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
 			    struct server_id server, uint32_t msg_type,
 			    const uint8_t *buf, size_t len);
+NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
+			    struct server_id server, uint32_t msg_type,
+			    const struct iovec *iov, int iovlen);
 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 			    struct messaging_rec *rec);
 
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 5db1214..b4ed807 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -419,6 +419,25 @@ NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
 	return messaging_send(msg_ctx, server, msg_type, &blob);
 }
 
+NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
+			    struct server_id server, uint32_t msg_type,
+			    const struct iovec *iov, int iovlen)
+{
+	uint8_t *buf;
+	NTSTATUS status;
+
+	buf = iov_buf(talloc_tos(), iov, iovlen);
+	if (buf == NULL) {
+		return NT_STATUS_NO_MEMORY;
+	}
+
+	status = messaging_send_buf(msg_ctx, server, msg_type,
+				    buf, talloc_get_size(buf));
+
+	TALLOC_FREE(buf);
+	return status;
+}
+
 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
 					       struct messaging_rec *rec)
 {
-- 
1.9.1.423.g4596e3a
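The commit message above anticipates replacing the copy with a direct sendmsg() call. Below is a rough sketch of that idea. It assumes a connected AF_UNIX datagram socket (a socketpair() in the test) and is not Samba's messaging_dgm backend. sendmsg() gathers the iovec elements into a single datagram, so no intermediate buffer is needed. send_iov_dgram() is a hypothetical name.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <sys/uio.h>
#include <unistd.h>

/*
 * Send an iovec array as one datagram on a connected socket. The kernel
 * gathers the pieces itself. Returns bytes sent, or -1 with errno set.
 */
static ssize_t send_iov_dgram(int fd, const struct iovec *iov, int iovcnt)
{
	struct msghdr msg;

	memset(&msg, 0, sizeof(msg));	/* no address, no control data */
	/* struct msghdr's msg_iov is not const-qualified, hence the cast */
	msg.msg_iov = (struct iovec *)iov;
	msg.msg_iovlen = iovcnt;

	return sendmsg(fd, &msg, 0);
}
```

On a datagram socket the receiver gets one message containing all pieces back to back, in a single recv() call.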


From 3b4a2132c4c807951874c7c0a0ffe68e1824f9fb Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 4 Apr 2014 21:01:01 +0200
Subject: [PATCH 12/24] smbd: Pass on a timestamp in MSG_PVFS_NOTIFY

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/smbd/notify_internal.c | 67 +++++++++++++++++++++---------------------
 1 file changed, 33 insertions(+), 34 deletions(-)

diff --git a/source3/smbd/notify_internal.c b/source3/smbd/notify_internal.c
index 69f9377..e902bf4 100644
--- a/source3/smbd/notify_internal.c
+++ b/source3/smbd/notify_internal.c
@@ -767,30 +767,32 @@ done:
 	TALLOC_FREE(data.dptr);
 }
 
+struct notify_msg {
+	struct timespec when;
+	void *private_data;
+	uint32_t action;
+	char path[1];
+};
+
 static NTSTATUS notify_send(struct notify_context *notify,
 			    struct server_id *pid,
 			    const char *path, uint32_t action,
 			    void *private_data)
 {
-	struct notify_event ev;
-	DATA_BLOB data;
-	NTSTATUS status;
-	enum ndr_err_code ndr_err;
+	struct notify_msg m = {};
+	struct iovec iov[2];
 
-	ev.action = action;
-	ev.path = path;
-	ev.private_data = private_data;
+	m.when = timespec_current();
+	m.private_data = private_data;
+	m.action = action;
 
-	ndr_err = ndr_push_struct_blob(
-		&data, talloc_tos(), &ev,
-		(ndr_push_flags_fn_t)ndr_push_notify_event);
-	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-		return ndr_map_error2ntstatus(ndr_err);
-	}
-	status = messaging_send(notify->msg, *pid, MSG_PVFS_NOTIFY,
-				&data);
-	TALLOC_FREE(data.data);
-	return status;
+	iov[0].iov_base = &m;
+	iov[0].iov_len = offsetof(struct notify_msg, path);
+	iov[1].iov_base = discard_const_p(char, path);
+	iov[1].iov_len = strlen(path)+1;
+
+	return messaging_send_iov(notify->msg, *pid, MSG_PVFS_NOTIFY,
+				  iov, ARRAY_SIZE(iov));
 }
 
 static void notify_handler(struct messaging_context *msg_ctx,
@@ -799,34 +801,31 @@ static void notify_handler(struct messaging_context *msg_ctx,
 {
 	struct notify_context *notify = talloc_get_type_abort(
 		private_data, struct notify_context);
-	enum ndr_err_code ndr_err;
-	struct notify_event *n;
+	struct notify_msg *m;
+	struct notify_event e;
 	struct notify_list *listel;
 
-	n = talloc(talloc_tos(), struct notify_event);
-	if (n == NULL) {
-		DEBUG(1, ("talloc failed\n"));
+	if (data->length <= offsetof(struct notify_msg, path)) {
+		DEBUG(1, ("%s: Got short MSG_PVFS_NOTIFY msg\n", __func__));
 		return;
 	}
-
-	ndr_err = ndr_pull_struct_blob(
-		data, n, n, (ndr_pull_flags_fn_t)ndr_pull_notify_event);
-	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-		TALLOC_FREE(n);
+	if (data->data[data->length-1] != 0) {
+		DEBUG(1, ("%s: MSG_PVFS_NOTIFY path not 0-terminated\n",
+			  __func__));
 		return;
 	}
-	if (DEBUGLEVEL >= 10) {
-		NDR_PRINT_DEBUG(notify_event, n);
-	}
+
+	m = (struct notify_msg *)data->data;
+	e.action = m->action;
+	e.path = m->path;
+	e.private_data = m->private_data;
 
 	for (listel=notify->list;listel;listel=listel->next) {
-		if (listel->private_data == n->private_data) {
-			listel->callback(listel->private_data,
-					 timespec_current(), n);
+		if (listel->private_data == m->private_data) {
+			listel->callback(listel->private_data, m->when, &e);
 			break;
 		}
 	}
-	TALLOC_FREE(n);
 }
 
 struct notify_walk_idx_state {
-- 
1.9.1.423.g4596e3a
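The new MSG_PVFS_NOTIFY payload is a fixed-size header followed by a NUL-terminated path. Below is a simplified pack/parse sketch of that kind of layout. The names are hypothetical and the private_data field is left out. Before reading the header, the parser rejects any message shorter than the header plus one terminating byte. It then copies the header out with memcpy() rather than casting the buffer, so it makes no assumption about the alignment of the receive buffer.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>
#include <time.h>

/* Fixed-size header; on the wire it is followed by a NUL-terminated path. */
struct demo_notify_hdr {
	struct timespec when;
	uint32_t action;
};

/* Build a message in buf. Returns its length, or 0 if buf is too small. */
static size_t demo_notify_pack(uint8_t *buf, size_t buflen,
			       const struct demo_notify_hdr *hdr,
			       const char *path)
{
	size_t pathlen = strlen(path) + 1;	/* include the NUL */

	if (buflen < sizeof(*hdr) || buflen - sizeof(*hdr) < pathlen) {
		return 0;
	}
	memcpy(buf, hdr, sizeof(*hdr));
	memcpy(buf + sizeof(*hdr), path, pathlen);
	return sizeof(*hdr) + pathlen;
}

/*
 * Validate and decode one message. Returns 0 on success, or -1 if it is
 * too short to hold the header plus a NUL, or the path is unterminated.
 */
static int demo_notify_parse(const uint8_t *data, size_t len,
			     struct demo_notify_hdr *hdr, const char **path)
{
	if (len < sizeof(*hdr) + 1) {
		return -1;
	}
	if (data[len - 1] != '\0') {
		return -1;
	}
	/* memcpy avoids relying on the alignment of the receive buffer */
	memcpy(hdr, data, sizeof(*hdr));
	*path = (const char *)data + sizeof(*hdr);
	return 0;
}
```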


From cb1133bd4821eb9cbbd1ea0b9c84fade1dc93d11 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 4 Apr 2014 21:12:06 +0200
Subject: [PATCH 13/24] smbd: Sort notify events by timestamp

This will fix the raw.notify test with the new messaging system. With the new
messaging system, messages arrive via yet another fd that has to line up in
poll() next to the incoming client TCP socket. With signal-based messaging,
messages were always handled before client requests. Under the new scheme,
notify messages might be deferred a bit (something that can already happen in
a cluster). As a result, notify_marshall_changes() will coalesce entries,
which in turn makes raw.notify unhappy.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/smbd/notify.c | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/source3/smbd/notify.c b/source3/smbd/notify.c
index b8085dd..dd4dc1a 100644
--- a/source3/smbd/notify.c
+++ b/source3/smbd/notify.c
@@ -170,6 +170,14 @@ static bool notify_marshall_changes(int num_changes,
 	return True;
 }
 
+static int compare_notify_change_events(const void *p1, const void *p2)
+{
+	const struct notify_change_event *e1 = p1;
+	const struct notify_change_event *e2 = p2;
+
+	return timespec_compare(&e1->when, &e2->when);
+}
+
 /****************************************************************************
  Setup the common parts of the return packet and send it.
 *****************************************************************************/
@@ -194,6 +202,14 @@ void change_notify_reply(struct smb_request *req,
 		return;
 	}
 
+	/*
+	 * Sort the notifies by timestamp when the event happened to avoid
+	 * coalescing and thus dropping events in notify_marshall_changes.
+	 */
+
+	qsort(notify_buf->changes, notify_buf->num_changes,
+	      sizeof(*(notify_buf->changes)), compare_notify_change_events);
+
 	if (!notify_marshall_changes(notify_buf->num_changes, max_param,
 					notify_buf->changes, &blob)) {
 		/*
-- 
1.9.1.423.g4596e3a
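The following sketches the sort in isolation. The patch uses Samba's timespec_compare(). This version writes its own hypothetical comparator, demo_ts_cmp(), and a stand-in event struct. It compares fields explicitly instead of subtracting them, which avoids overflow. Note that qsort() is not guaranteed to be stable, so events with identical timestamps may come out in either order.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stdint.h>
#include <stdlib.h>
#include <time.h>

/* Hypothetical stand-in for struct notify_change_event. */
struct demo_change {
	struct timespec when;
	uint32_t action;
};

/* Order two timespecs: seconds first, then nanoseconds. */
static int demo_ts_cmp(const struct timespec *a, const struct timespec *b)
{
	if (a->tv_sec != b->tv_sec) {
		return a->tv_sec < b->tv_sec ? -1 : 1;
	}
	if (a->tv_nsec != b->tv_nsec) {
		return a->tv_nsec < b->tv_nsec ? -1 : 1;
	}
	return 0;
}

/* qsort() callback: compare two demo_change entries by timestamp. */
static int demo_change_cmp(const void *p1, const void *p2)
{
	const struct demo_change *c1 = p1;
	const struct demo_change *c2 = p2;

	return demo_ts_cmp(&c1->when, &c2->when);
}

/* Sort changes oldest first (not stable for equal timestamps). */
static void demo_sort_changes(struct demo_change *changes, size_t n)
{
	qsort(changes, n, sizeof(*changes), demo_change_cmp);
}
```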


From bbadd61f370bf873b1bbce18e6077dcbc13ec068 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 11 Apr 2014 09:09:49 +0200
Subject: [PATCH 14/24] printing_cups: Call the msg_ctx destructor on exit

With the new messaging, if we don't do this, we'll leave sockets around,
because _exit() never frees msg_ctx and so its destructor never runs. I'm
sure we will not catch everything, so a periodic cleanup will be required.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/printing/print_cups.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/source3/printing/print_cups.c b/source3/printing/print_cups.c
index 6c1e9ce..0ec71ab 100644
--- a/source3/printing/print_cups.c
+++ b/source3/printing/print_cups.c
@@ -483,6 +483,7 @@ static bool cups_pcap_load_async(struct tevent_context *ev,
 	close(fds[0]);
 	cups_cache_reload_async(fds[1]);
 	close(fds[1]);
+	TALLOC_FREE(msg_ctx);
 	_exit(0);
 }
 
-- 
1.9.1.423.g4596e3a
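Here is why the explicit TALLOC_FREE(msg_ctx) before _exit(0) matters. _exit() ends the process without running atexit() handlers or freeing memory, so no talloc destructor ever runs. Below is a small POSIX sketch of that effect. It uses a plain file and an atexit() handler as hypothetical stand-ins for the messaging socket and its destructor. The child leaves the file behind unless it cleans up explicitly before calling _exit().

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

static const char *demo_path;

/* Cleanup that a normal exit() would run through atexit(). */
static void demo_remove_file(void)
{
	unlink(demo_path);
}

/*
 * Fork a child that creates `path`, registers demo_remove_file() with
 * atexit() and leaves via _exit(0). Returns 1 if the file still exists
 * once the child is gone, 0 if it was removed, -1 on error.
 */
static int demo_child_leaves_file(const char *path, int explicit_cleanup)
{
	pid_t pid;
	int status;

	demo_path = path;
	pid = fork();
	if (pid == -1) {
		return -1;
	}
	if (pid == 0) {
		int fd = open(path, O_CREAT | O_WRONLY, 0600);

		if (fd == -1) {
			_exit(1);
		}
		close(fd);
		atexit(demo_remove_file);
		if (explicit_cleanup) {
			demo_remove_file();	/* plays the role of TALLOC_FREE(msg_ctx) */
		}
		_exit(0);	/* does not run atexit() handlers */
	}
	if (waitpid(pid, &status, 0) != pid || !WIFEXITED(status) ||
	    WEXITSTATUS(status) != 0) {
		return -1;
	}
	return access(path, F_OK) == 0 ? 1 : 0;
}
```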


From b870396c4776ef7bafe5fd4f934fc29a44be4b89 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 11 Apr 2014 09:12:46 +0200
Subject: [PATCH 15/24] smbcontrol: Clean up the msg_ctx

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/utils/smbcontrol.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/source3/utils/smbcontrol.c b/source3/utils/smbcontrol.c
index 156a7ad..d0e923a 100644
--- a/source3/utils/smbcontrol.c
+++ b/source3/utils/smbcontrol.c
@@ -1580,6 +1580,7 @@ int main(int argc, const char **argv)
 	}
 
 	ret = !do_command(evt_ctx, msg_ctx, argc, argv);
+	TALLOC_FREE(msg_ctx);
 	TALLOC_FREE(frame);
 	return ret;
 }
-- 
1.9.1.423.g4596e3a


From 654528dde490f17c744eb56b5811f0291a05f0b2 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 11 Apr 2014 09:13:10 +0200
Subject: [PATCH 16/24] smbd: Always clean up the child's msg_ctx

This is a bit lazy programming: we could, and possibly should, do this in
exit_server() in the child. But this way we make sure the cleanup works. If it
were only executed for unclean exits, we might not detect a failure of this
code in the parent.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/smbd/server.c | 11 +++++------
 1 file changed, 5 insertions(+), 6 deletions(-)

diff --git a/source3/smbd/server.c b/source3/smbd/server.c
index 9fcf254..55a64f6 100644
--- a/source3/smbd/server.c
+++ b/source3/smbd/server.c
@@ -445,9 +445,14 @@ static void remove_child_pid(struct smbd_parent_context *parent,
 {
 	struct smbd_child_pid *child;
 	struct server_id child_id;
+	NTSTATUS status;
 
 	child_id = pid_to_procid(pid);
 
+	status = messaging_dgm_cleanup(parent->msg_ctx, pid);
+	DEBUG(10, ("%s: messaging_dgm_cleanup returned %s\n",
+		   __func__, nt_errstr(status)));
+
 	for (child = parent->children; child != NULL; child = child->next) {
 		if (child->pid == pid) {
 			struct smbd_child_pid *tmp = child;
@@ -465,8 +470,6 @@ static void remove_child_pid(struct smbd_parent_context *parent,
 	}
 
 	if (unclean_shutdown) {
-		NTSTATUS status;
-
 		/* a child terminated uncleanly so tickle all
 		   processes to see if they can grab any of the
 		   pending locks
@@ -483,10 +486,6 @@ static void remove_child_pid(struct smbd_parent_context *parent,
 						parent);
 			DEBUG(1,("Scheduled cleanup of brl and lock database after unclean shutdown\n"));
 		}
-
-		status = messaging_dgm_cleanup(parent->msg_ctx, pid);
-		DEBUG(10, ("%s: messaging_dgm_cleanup returned %s\n",
-			   __func__, nt_errstr(status)));
 	}
 
 	if (!serverid_deregister(child_id)) {
-- 
1.9.1.423.g4596e3a


From fe579fe177b2713024e2281b91cbc10dbe0db348 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Thu, 10 Apr 2014 22:07:11 +0200
Subject: [PATCH 17/24] messaging_dgm: Add messaging_dgm_wipe

This walks all sockets and wipes the leftovers.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/messages.h |  1 +
 source3/lib/messages_dgm.c | 54 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 55 insertions(+)

diff --git a/source3/include/messages.h b/source3/include/messages.h
index 3e1b664..8a818db 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -95,6 +95,7 @@ NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
 			    TALLOC_CTX *mem_ctx,
 			    struct messaging_backend **presult);
 NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid);
+NTSTATUS messaging_dgm_wipe(struct messaging_context *msg_ctx);
 
 NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx,
 			      TALLOC_CTX *mem_ctx,
diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index 0d4df2f..5827b78 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -418,3 +418,57 @@ NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid)
 	TALLOC_FREE(lockfile_name);
 	return NT_STATUS_OK;
 }
+
+NTSTATUS messaging_dgm_wipe(struct messaging_context *msg_ctx)
+{
+	struct messaging_dgm_context *ctx = talloc_get_type_abort(
+		msg_ctx->local->private_data, struct messaging_dgm_context);
+	char *msgdir_name;
+	DIR *msgdir;
+	struct dirent *dp;
+	pid_t our_pid = getpid();
+
+	/*
+	 * We scan the socket directory and not the lock directory. Otherwise
+	 * we would race against messaging_dgm_lockfile_create's open(O_CREAT)
+	 * and fcntl(SETLK).
+	 */
+
+	msgdir_name = talloc_asprintf(talloc_tos(), "%s/msg", ctx->cache_dir);
+	if (msgdir_name == NULL) {
+		return NT_STATUS_NO_MEMORY;
+	}
+
+	msgdir = opendir(msgdir_name);
+	TALLOC_FREE(msgdir_name);
+	if (msgdir == NULL) {
+		return map_nt_error_from_unix(errno);
+	}
+
+	while ((dp = readdir(msgdir)) != NULL) {
+		NTSTATUS status;
+		unsigned long pid;
+
+		pid = strtoul(dp->d_name, NULL, 10);
+		if (pid == 0) {
+			/*
+			 * . and .. and other malformed entries
+			 */
+			continue;
+		}
+		if (pid == our_pid) {
+			/*
+			 * fcntl(F_GETLK) will succeed for ourselves, we hold
+			 * that lock ourselves.
+			 */
+			continue;
+		}
+
+		status = messaging_dgm_cleanup(msg_ctx, pid);
+		DEBUG(10, ("messaging_dgm_cleanup(%lu) returned %s\n",
+			   pid, nt_errstr(status)));
+	}
+	closedir(msgdir);
+
+	return NT_STATUS_OK;
+}
-- 
1.9.1.423.g4596e3a
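messaging_dgm_wipe() derives a pid from each directory entry with strtoul(dp->d_name, NULL, 10). Because the end pointer is not checked, a name such as 123.tmp would be treated as pid 123. Below is a stricter parser. demo_parse_pid_name() is a hypothetical helper, and whether stray files can appear in the msg directory at all is an assumption.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <errno.h>
#include <stdlib.h>

/*
 * Parse a socket-directory entry name as a decimal pid. Returns 0 for
 * ".", "..", empty names, names with trailing characters ("123.tmp"),
 * a leading sign or whitespace, and values that overflow unsigned long.
 */
static unsigned long demo_parse_pid_name(const char *name)
{
	char *end = NULL;
	unsigned long pid;

	/* strtoul() would otherwise accept leading whitespace and '-' */
	if (name[0] < '0' || name[0] > '9') {
		return 0;
	}
	errno = 0;
	pid = strtoul(name, &end, 10);
	if (errno != 0 || *end != '\0') {
		return 0;	/* out of range, or trailing garbage */
	}
	return pid;
}
```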


From 33be2cf7161eef222b25f1aad500dd40819b4f35 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Thu, 10 Apr 2014 22:09:04 +0200
Subject: [PATCH 18/24] smbcontrol: Add dgm-cleanup command

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/utils/smbcontrol.c | 20 ++++++++++++++++++++
 1 file changed, 20 insertions(+)

diff --git a/source3/utils/smbcontrol.c b/source3/utils/smbcontrol.c
index d0e923a..274de70 100644
--- a/source3/utils/smbcontrol.c
+++ b/source3/utils/smbcontrol.c
@@ -968,6 +968,25 @@ static bool do_num_children(struct tevent_context *ev_ctx,
 	return num_replies;
 }
 
+static bool do_dgm_cleanup(struct tevent_context *ev_ctx,
+			   struct messaging_context *msg_ctx,
+			   const struct server_id pid,
+			   const int argc, const char **argv)
+{
+	NTSTATUS status;
+
+	if (pid.pid != 0) {
+		status = messaging_dgm_cleanup(msg_ctx, pid.pid);
+	} else {
+		status = messaging_dgm_wipe(msg_ctx);
+	}
+
+	printf("cleanup(%u) returned %s\n", (unsigned)pid.pid,
+	       nt_errstr(status));
+
+	return NT_STATUS_IS_OK(status);
+}
+
 /* Shutdown a server process */
 
 static bool do_shutdown(struct tevent_context *ev_ctx,
@@ -1378,6 +1397,7 @@ static const struct {
 	{ "notify-cleanup", do_notify_cleanup },
 	{ "num-children", do_num_children,
 	  "Print number of smbd child processes" },
+	{ "dgm-cleanup", do_dgm_cleanup },
 	{ "noop", do_noop, "Do nothing" },
 	{ NULL }
 };
-- 
1.9.1.423.g4596e3a


From 36e6151bcfd7c43b584d788cc7bbd251a26697eb Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 11 Apr 2014 11:07:10 +0000
Subject: [PATCH 19/24] smbd: Call the msg_ctx destructor for background jobs

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/lib/background.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/source3/lib/background.c b/source3/lib/background.c
index 6a91783..a9fd04f 100644
--- a/source3/lib/background.c
+++ b/source3/lib/background.c
@@ -181,6 +181,7 @@ static void background_job_waited(struct tevent_req *subreq)
 		if (written == -1) {
 			_exit(1);
 		}
+		TALLOC_FREE(state->msg);
 		_exit(0);
 	}
 
-- 
1.9.1.423.g4596e3a


From 8357428e5576692577bb26ac06ac1baf16d2d551 Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Fri, 18 Apr 2014 14:47:39 -0700
Subject: [PATCH 20/24] s3 : build system : Move lib/background.c from
 smbd_base to samba3core.

Allows background jobs to be run from winbindd and nmbd.

Signed-off-by: Jeremy Allison <jra at samba.org>
---
 source3/wscript_build | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/source3/wscript_build b/source3/wscript_build
index 4f5661f..e9c2f91 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -343,7 +343,8 @@ bld.SAMBA3_SUBSYSTEM('samba3core',
                    lib/audit.c
                    lib/tevent_wait.c
                    lib/idmap_cache.c
-                   lib/util_ea.c''',
+                   lib/util_ea.c
+                   lib/background.c''',
                    deps='''
                         samba3util
                         LIBTSOCKET
@@ -554,7 +555,6 @@ bld.SAMBA3_LIBRARY('smbd_base',
                    smbd/error.c
                    printing/printspoolss.c
                    printing/spoolssd.c
-                   lib/background.c
                    lib/sessionid_tdb.c
                    lib/conn_tdb.c
                    smbd/fake_file.c
-- 
1.9.1.423.g4596e3a


From 06345f1921d4a9b0e063f4b4fdd6168ee3c9cf69 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 11 Apr 2014 11:08:56 +0000
Subject: [PATCH 21/24] s3: messaging: Add infrastructure to clean up orphaned
 sockets every 15 minutes as a background task.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/messages.h |  2 ++
 source3/lib/messages.c     | 50 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 52 insertions(+)

diff --git a/source3/include/messages.h b/source3/include/messages.h
index 8a818db..1681ec9 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -146,6 +146,8 @@ struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
 			struct messaging_rec **presult);
 
+bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg);
+
 #include "librpc/gen_ndr/ndr_messaging.h"
 
 #endif
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index b4ed807..b6fe423 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -50,6 +50,7 @@
 #include "serverid.h"
 #include "messages.h"
 #include "lib/util/tevent_unix.h"
+#include "lib/background.h"
 
 struct messaging_callback {
 	struct messaging_callback *prev, *next;
@@ -586,4 +587,53 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 	return;
 }
 
+static int mess_parent_dgm_cleanup(void *private_data);
+static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
+
+bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
+{
+	struct tevent_req *req;
+
+	req = background_job_send(
+		msg, msg->event_ctx, msg, NULL, 0,
+		lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 60*15),
+		mess_parent_dgm_cleanup, msg);
+	if (req == NULL) {
+		return false;
+	}
+	tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
+	return true;
+}
+
+static int mess_parent_dgm_cleanup(void *private_data)
+{
+	struct messaging_context *msg_ctx = talloc_get_type_abort(
+		private_data, struct messaging_context);
+	NTSTATUS status;
+
+	status = messaging_dgm_wipe(msg_ctx);
+	DEBUG(10, ("messaging_dgm_wipe returned %s\n", nt_errstr(status)));
+	return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 60*15);
+}
+
+static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
+{
+	struct messaging_context *msg = tevent_req_callback_data(
+		req, struct messaging_context);
+	NTSTATUS status;
+
+	status = background_job_recv(req);
+	TALLOC_FREE(req);
+	DEBUG(1, ("messaging dgm cleanup job ended with %s\n", nt_errstr(status)));
+
+	req = background_job_send(
+		msg, msg->event_ctx, msg, NULL, 0,
+		lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 60*15),
+		mess_parent_dgm_cleanup, msg);
+	if (req == NULL) {
+		DEBUG(1, ("background_job_send failed\n"));
+	}
+	tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
+}
+
 /** @} **/
-- 
1.9.1.423.g4596e3a
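The cleanup job re-arms itself. The job function returns the delay until its next run (the "messaging dgm cleanup interval" parameter, defaulting to 60*15 seconds), and the done callback submits a fresh background job. The following toy model of that pattern uses a simulated clock. demo_sched, demo_arm() and demo_run_until() are hypothetical and are not the background_job API. One difference is deliberate. In the patch as posted, mess_parent_dgm_cleanup_done() logs when background_job_send() returns NULL but still passes that NULL request to tevent_req_set_callback(). The sketch stops re-arming instead, and the same effect would come from a `return;` after the DEBUG in the patch.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stddef.h>

/* Single-slot scheduler with a simulated clock (seconds). */
struct demo_sched {
	long now;
	long due;		/* when the armed job fires; -1 if none */
	int (*fn)(void *);	/* job; returns the next delay, <= 0 to stop */
	void *arg;
	int fail_arm;		/* force demo_arm() to fail (error path) */
};

static int demo_arm(struct demo_sched *s, int delay,
		    int (*fn)(void *), void *arg)
{
	if (s->fail_arm || delay <= 0) {
		return -1;
	}
	s->due = s->now + delay;
	s->fn = fn;
	s->arg = arg;
	return 0;
}

/* Advance the clock to `until`, re-arming the job after each run. */
static void demo_run_until(struct demo_sched *s, long until)
{
	while (s->due != -1 && s->due <= until) {
		int next;

		s->now = s->due;
		s->due = -1;		/* the fired job is consumed */
		next = s->fn(s->arg);
		if (demo_arm(s, next, s->fn, s->arg) != 0) {
			break;		/* re-arm failed: stop, touch nothing */
		}
	}
	s->now = until;
}

/* Stand-in for mess_parent_dgm_cleanup(): count runs, ask for 15 min. */
static int demo_wipe_job(void *arg)
{
	int *runs = arg;

	(*runs)++;
	return 60 * 15;
}
```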


From 67671077d9066b3caca2bd5e71d711915f50cf08 Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Fri, 18 Apr 2014 15:06:05 -0700
Subject: [PATCH 22/24] s3: smbd: Call dgram cleanup init background setup.

Signed-off-by: Jeremy Allison <jra at samba.org>
---
 source3/smbd/server.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/source3/smbd/server.c b/source3/smbd/server.c
index 55a64f6..33021c0 100644
--- a/source3/smbd/server.c
+++ b/source3/smbd/server.c
@@ -1453,6 +1453,10 @@ extern void build_options(bool screen);
 		exit(1);
 	}
 
+	if (!messaging_parent_dgm_cleanup_init(msg_ctx)) {
+		exit(1);
+	}
+
 	if (!smbd_scavenger_init(NULL, msg_ctx, ev_ctx)) {
 		exit(1);
 	}
-- 
1.9.1.423.g4596e3a


From a13edb69fcec4026313cf33159db9281d2432c8a Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Fri, 18 Apr 2014 15:08:19 -0700
Subject: [PATCH 23/24] s3: nmbd: Call dgram cleanup init background setup.

Signed-off-by: Jeremy Allison <jra at samba.org>
---
 source3/nmbd/nmbd.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/source3/nmbd/nmbd.c b/source3/nmbd/nmbd.c
index 80c2329..dd7f133 100644
--- a/source3/nmbd/nmbd.c
+++ b/source3/nmbd/nmbd.c
@@ -1015,6 +1015,10 @@ static bool open_sockets(bool isdaemon, int port)
 	if (!nmbd_setup_sig_hup_handler(msg))
 		exit(1);
 
+	if (!messaging_parent_dgm_cleanup_init(msg)) {
+		exit(1);
+	}
+
 	/* get broadcast messages */
 
 	if (!serverid_register(messaging_server_id(msg),
-- 
1.9.1.423.g4596e3a


From f4f466f17eed34946fda874feddabf4186639c5f Mon Sep 17 00:00:00 2001
From: Jeremy Allison <jra at samba.org>
Date: Fri, 18 Apr 2014 15:09:28 -0700
Subject: [PATCH 24/24] s3: winbindd: Call dgram cleanup init background setup.

Signed-off-by: Jeremy Allison <jra at samba.org>
---
 source3/winbindd/winbindd.c | 4 ++++
 1 file changed, 4 insertions(+)

diff --git a/source3/winbindd/winbindd.c b/source3/winbindd/winbindd.c
index 8bc1343..7d91ad8 100644
--- a/source3/winbindd/winbindd.c
+++ b/source3/winbindd/winbindd.c
@@ -1568,6 +1568,10 @@ int main(int argc, const char **argv)
 
 	winbindd_register_handlers(winbind_messaging_context(), !Fork);
 
+	if (!messaging_parent_dgm_cleanup_init(winbind_messaging_context())) {
+		exit(1);
+	}
+
 	status = init_system_session_info();
 	if (!NT_STATUS_IS_OK(status)) {
 		DEBUG(1, ("ERROR: failed to setup system user info: %s.\n",
-- 
1.9.1.423.g4596e3a



More information about the samba-technical mailing list