Unix domain datagram based messaging

Volker Lendecke Volker.Lendecke at SerNet.DE
Wed Apr 9 14:49:31 MDT 2014


Hi!

Attached find a patchset that implements messaging for
source3 based on unix domain datagram sockets. It works a
bit differently from the source4 system: the busy fallback
is not done with a periodic retry but with a blocking
thread. In my experiments this has turned out to be much
friendlier in a complete overload situation: a thread
blocking in sendmsg was the only way I could avoid a
thundering herd, at least under Linux.

With this, smbd leaks a few lockfiles and sockets, so
this needs a bit more work, but the general approach seems
to work fine for me.

This code has two advantages over the tdb-based one:

No signals

Significantly better performance (no array marshalling etc)

There's one little disadvantage: 

We need one more fd and an fcntl lock per smbd held open.
The reason for this is that there is no atomic O_EXCL bind
operation for unix domain sockets, so safely cleaning up for
dead processes is not possible without an fcntl lock. The
fcntl lock might hurt, but in normal operations we never
contend on this, so it should be okay. And it's one per
process, so not too much even with thousands of processes.
We can turn this disadvantage into an advantage later on:
serverid.tdb can go, this code saves the unique id in the
lock file, so serverid_exists can work without the tdb file.

Comments (no formal review yet :-) would be appreciated!

Thanks,

Volker

-- 
SerNet GmbH, Bahnhofsallee 1b, 37081 Göttingen
phone: +49-551-370000-0, fax: +49-551-370000-9
AG Göttingen, HRB 2816, GF: Dr. Johannes Loxen
http://www.sernet.de, mailto:kontakt at sernet.de
-------------- next part --------------
>From 1e4dc45a82008dff160d376aea3d005f2396621e Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 24 Feb 2014 11:43:51 +0000
Subject: [PATCH 01/13] lib: Add poll_funcs

This is an abstraction for a tevent loop. It will be used in low-level
messaging with the goal of making our low-level messaging routines
usable also by other projects which are not based on tevent.
---
 source3/lib/poll_funcs/poll_funcs.h        |  131 +++++++++++++++++++++++++
 source3/lib/poll_funcs/poll_funcs_tevent.c |  143 ++++++++++++++++++++++++++++
 source3/lib/poll_funcs/poll_funcs_tevent.h |   26 +++++
 source3/lib/poll_funcs/wscript_build       |    5 +
 source3/wscript_build                      |    1 +
 5 files changed, 306 insertions(+)
 create mode 100644 source3/lib/poll_funcs/poll_funcs.h
 create mode 100644 source3/lib/poll_funcs/poll_funcs_tevent.c
 create mode 100644 source3/lib/poll_funcs/poll_funcs_tevent.h
 create mode 100644 source3/lib/poll_funcs/wscript_build

diff --git a/source3/lib/poll_funcs/poll_funcs.h b/source3/lib/poll_funcs/poll_funcs.h
new file mode 100644
index 0000000..b23e7d9
--- /dev/null
+++ b/source3/lib/poll_funcs/poll_funcs.h
@@ -0,0 +1,131 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Copyright (C) Volker Lendecke 2013
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/**
+ * @file poll_funcs.h
+ *
+ * @brief event loop abstraction
+ */
+
+/*
+ * This is inspired by AvahiWatch, the avahi event loop abstraction.
+ */
+
+#ifndef __POLL_FUNCS_H__
+#define __POLL_FUNCS_H__
+
+#include "replace.h"
+
+/**
+ * poll_watch and poll_timeout are left incomplete here; every implementation
+ * defines its own structures.
+ */
+
+struct poll_watch;
+struct poll_timeout;
+
+struct poll_funcs {
+
+	/**
+	 * @brief Create a new file descriptor watch
+	 *
+	 * @param[in] funcs The callback array
+	 * @param[in] fd The fd to watch
+	 * @param[in] events POLLIN and POLLOUT or'ed together
+	 * @param[in] callback Function to call by the implementation
+	 * @param[in] private_data Pointer to give back to callback
+	 *
+	 * @return A new poll_watch struct
+	 */
+
+	struct poll_watch *(*watch_new)(
+		const struct poll_funcs *funcs, int fd, short events,
+		void (*callback)(struct poll_watch *w, int fd,
+				 short events, void *private_data),
+		void *private_data);
+
+	/**
+	 * @brief Change the watched events for a struct poll_watch
+	 *
+	 * @param[in] w The poll_watch to change
+	 * @param[in] events new POLLIN and POLLOUT or'ed together
+	 */
+
+	void (*watch_update)(struct poll_watch *w, short events);
+
+	/**
+	 * @brief Read events currently watched
+	 *
+	 * @param[in] w The poll_watch to inspect
+	 *
+	 * @returns The events currently watched
+	 */
+
+	short (*watch_get_events)(struct poll_watch *w);
+
+	/**
+	 * @brief Free a struct poll_watch
+	 *
+	 * @param[in] w The poll_watch struct to free
+	 */
+
+	void (*watch_free)(struct poll_watch *w);
+
+
+	/**
+	 * @brief Create a new timeout watch
+	 *
+	 * @param[in] funcs The callback array
+	 * @param[in] tv The time when the timeout should trigger
+	 * @param[in] callback Function to call at time "tv"
+	 * @param[in] private_data Pointer to give back to callback
+	 *
+	 * @return A new poll_timeout struct
+	 */
+
+	struct poll_timeout *(*timeout_new)(
+		const struct poll_funcs *funcs, const struct timeval *tv,
+		void (*callback)(struct poll_timeout *t, void *private_data),
+		void *private_data);
+
+	/**
+	 * @brief Change the timeout of a watch
+	 *
+	 * @param[in] t The timeout watch to change
+	 * @param[in] ts The new trigger time
+	 */
+
+	void (*timeout_update)(struct poll_timeout *t,
+			       const struct timespec *ts);
+
+	/**
+	 * @brief Free a poll_timeout
+	 *
+	 * @param[in] t The poll_timeout to free
+	 */
+
+	void (*timeout_free)(struct poll_timeout *t);
+
+	/**
+	 * @brief private data for use by the implementation
+	 */
+
+	void *private_data;
+};
+
+#endif
diff --git a/source3/lib/poll_funcs/poll_funcs_tevent.c b/source3/lib/poll_funcs/poll_funcs_tevent.c
new file mode 100644
index 0000000..05177c9
--- /dev/null
+++ b/source3/lib/poll_funcs/poll_funcs_tevent.c
@@ -0,0 +1,143 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Copyright (C) Volker Lendecke 2013
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "poll_funcs_tevent.h"
+#include "tevent.h"
+#include "system/select.h"
+
+struct poll_watch {
+	struct tevent_fd *fde;
+	int fd;
+	void (*callback)(struct poll_watch *w, int fd, short events,
+			 void *private_data);
+	void *private_data;
+};
+
+static uint16_t poll_events_to_tevent(short events)
+{
+	uint16_t ret = 0;
+
+	if (events & POLLIN) {
+		ret |= TEVENT_FD_READ;
+	}
+	if (events & POLLOUT) {
+		ret |= TEVENT_FD_WRITE;
+	}
+	return ret;
+}
+
+static short tevent_to_poll_events(uint16_t flags)
+{
+	short ret = 0;
+
+	if (flags & TEVENT_FD_READ) {
+		ret |= POLLIN;
+	}
+	if (flags & TEVENT_FD_WRITE) {
+		ret |= POLLOUT;
+	}
+	return ret;
+}
+
+static void tevent_watch_handler(struct tevent_context *ev,
+				 struct tevent_fd *fde, uint16_t flags,
+				 void *private_data);
+
+static struct poll_watch *tevent_watch_new(
+	const struct poll_funcs *funcs, int fd, short events,
+	void (*callback)(struct poll_watch *w, int fd, short events,
+			 void *private_data),
+	void *private_data)
+{
+	struct tevent_context *ev = talloc_get_type_abort(
+		funcs->private_data, struct tevent_context);
+	struct poll_watch *w;
+
+	w = talloc(ev, struct poll_watch);
+	if (w == NULL) {
+		return NULL;
+	}
+	w->fde = tevent_add_fd(ev, w, fd, poll_events_to_tevent(events),
+			       tevent_watch_handler, w);
+	if (w->fde == NULL) {
+		TALLOC_FREE(w);
+		return NULL;
+	}
+	w->fd = fd;
+	w->callback = callback;
+	w->private_data = private_data;
+	return w;
+}
+
+static void tevent_watch_handler(struct tevent_context *ev,
+				 struct tevent_fd *fde, uint16_t flags,
+				 void *private_data)
+{
+	struct poll_watch *w = talloc_get_type_abort(
+		private_data, struct poll_watch);
+
+	w->callback(w, w->fd, tevent_to_poll_events(flags),
+		    w->private_data);
+}
+
+static void tevent_watch_update(struct poll_watch *w, short events)
+{
+	tevent_fd_set_flags(w->fde, poll_events_to_tevent(events));
+}
+
+static short tevent_watch_get_events(struct poll_watch *w)
+{
+	return tevent_to_poll_events(tevent_fd_get_flags(w->fde));
+}
+
+static void tevent_watch_free(struct poll_watch *w)
+{
+	TALLOC_FREE(w);
+}
+
+
+static struct poll_timeout *tevent_timeout_new(
+	const struct poll_funcs *funcs, const struct timeval *tv,
+	void (*callback)(struct poll_timeout *t, void *private_data),
+	void *private_data)
+{
+	/* not implemented yet */
+	return NULL;
+}
+
+static void tevent_timeout_update(struct poll_timeout *t,
+				  const struct timespec *ts)
+{
+	return;
+}
+
+static void tevent_timeout_free(struct poll_timeout *t)
+{
+	return;
+}
+
+void poll_funcs_init_tevent(struct poll_funcs *f)
+{
+	f->watch_new = tevent_watch_new;
+	f->watch_update = tevent_watch_update;
+	f->watch_get_events = tevent_watch_get_events;
+	f->watch_free = tevent_watch_free;
+	f->timeout_new = tevent_timeout_new;
+	f->timeout_update = tevent_timeout_update;
+	f->timeout_free = tevent_timeout_free;
+}
diff --git a/source3/lib/poll_funcs/poll_funcs_tevent.h b/source3/lib/poll_funcs/poll_funcs_tevent.h
new file mode 100644
index 0000000..72c3892
--- /dev/null
+++ b/source3/lib/poll_funcs/poll_funcs_tevent.h
@@ -0,0 +1,26 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Copyright (C) Volker Lendecke 2013
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __POLL_FUNCS_TEVENT_H__
+#define __POLL_FUNCS_TEVENT_H__
+
+#include "poll_funcs.h"
+
+void poll_funcs_init_tevent(struct poll_funcs *f);
+
+#endif
diff --git a/source3/lib/poll_funcs/wscript_build b/source3/lib/poll_funcs/wscript_build
new file mode 100644
index 0000000..ab24814
--- /dev/null
+++ b/source3/lib/poll_funcs/wscript_build
@@ -0,0 +1,5 @@
+#!/usr/bin/env python
+
+bld.SAMBA3_SUBSYSTEM('POLL_FUNCS_TEVENT',
+                     source='poll_funcs_tevent.c',
+		     deps='tevent')
diff --git a/source3/wscript_build b/source3/wscript_build
index b872cbc..fd53e2f 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -1452,6 +1452,7 @@ bld.RECURSE('auth')
 bld.RECURSE('libgpo/gpext')
 bld.RECURSE('lib/pthreadpool')
 bld.RECURSE('lib/asys')
+bld.RECURSE('lib/poll_funcs')
 bld.RECURSE('librpc')
 bld.RECURSE('librpc/idl')
 bld.RECURSE('libsmb')
-- 
1.7.9.5


>From 8cb5a6947b75fab9ee8c5b242f8f49ac6de23108 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 24 Feb 2014 11:48:16 +0000
Subject: [PATCH 02/13] lib: Add unix_msg

This is a messaging layer based on unix domain datagram sockets.

Sending to an idle socket is just one single nonblocking sendmsg call. If the
receiver's queue is full, we start a background thread to do a blocking call.
The source4 based imessaging uses a polling fallback instead. In a situation
where thousands of senders hammer one single blocked socket, that fallback
generates load on the system due to the constant polling. This does not happen
with a threaded blocking send call.

The threaded approach has another advantage: We save become_root() calls on the
retries. The access checks are done when the blocking socket is connected, the
threaded blocking send call does not check permissions anymore.
---
 source3/lib/unix_msg/test_drain.c  |   71 ++++
 source3/lib/unix_msg/test_source.c |   80 ++++
 source3/lib/unix_msg/tests.c       |  226 +++++++++++
 source3/lib/unix_msg/unix_msg.c    |  766 ++++++++++++++++++++++++++++++++++++
 source3/lib/unix_msg/unix_msg.h    |  107 +++++
 source3/lib/unix_msg/wscript_build |   18 +
 source3/wscript_build              |    1 +
 7 files changed, 1269 insertions(+)
 create mode 100644 source3/lib/unix_msg/test_drain.c
 create mode 100644 source3/lib/unix_msg/test_source.c
 create mode 100644 source3/lib/unix_msg/tests.c
 create mode 100644 source3/lib/unix_msg/unix_msg.c
 create mode 100644 source3/lib/unix_msg/unix_msg.h
 create mode 100644 source3/lib/unix_msg/wscript_build

diff --git a/source3/lib/unix_msg/test_drain.c b/source3/lib/unix_msg/test_drain.c
new file mode 100644
index 0000000..a6487fd
--- /dev/null
+++ b/source3/lib/unix_msg/test_drain.c
@@ -0,0 +1,71 @@
+#include "replace.h"
+#include "unix_msg.h"
+#include "poll_funcs/poll_funcs_tevent.h"
+#include "tevent.h"
+#include "system/select.h"
+
+struct cb_state {
+	unsigned num_received;
+	uint8_t *buf;
+	size_t buflen;
+};
+
+static void recv_cb(struct unix_msg_ctx *ctx,
+		    uint8_t *msg, size_t msg_len,
+		    void *private_data);
+
+int main(int argc, const char *argv[])
+{
+	struct poll_funcs funcs;
+	const char *sock;
+	struct unix_msg_ctx *ctx;
+	struct tevent_context *ev;
+	int ret;
+
+	struct cb_state state;
+
+	if (argc != 2) {
+		fprintf(stderr, "Usage: %s <sockname>\n", argv[0]);
+		return 1;
+	}
+
+	sock = argv[1];
+	unlink(sock);
+
+	ev = tevent_context_init(NULL);
+	if (ev == NULL) {
+		perror("tevent_context_init failed");
+		return 1;
+	}
+	poll_funcs_init_tevent(&funcs);
+	funcs.private_data = ev;
+
+	ret = unix_msg_init(sock, &funcs, 256, 1,
+			    recv_cb, &state, &ctx);
+	if (ret != 0) {
+		fprintf(stderr, "unix_msg_init failed: %s\n",
+			strerror(ret));
+		return 1;
+	}
+
+	while (1) {
+		ret = tevent_loop_once(ev);
+		if (ret == -1) {
+			fprintf(stderr, "tevent_loop_once failed: %s\n",
+				strerror(errno));
+			exit(1);
+		}
+	}
+	return 0;
+}
+
+static void recv_cb(struct unix_msg_ctx *ctx,
+		    uint8_t *msg, size_t msg_len,
+		    void *private_data)
+{
+	unsigned num;
+	if (msg_len == sizeof(num)) {
+		memcpy(&num, msg, msg_len);
+		printf("%u\n", num);
+	}
+}
diff --git a/source3/lib/unix_msg/test_source.c b/source3/lib/unix_msg/test_source.c
new file mode 100644
index 0000000..e25b5c2
--- /dev/null
+++ b/source3/lib/unix_msg/test_source.c
@@ -0,0 +1,80 @@
+#include "replace.h"
+#include "unix_msg.h"
+#include "poll_funcs/poll_funcs_tevent.h"
+#include "tevent.h"
+
+int main(int argc, const char *argv[])
+{
+	struct poll_funcs funcs;
+	struct unix_msg_ctx **ctxs;
+	struct tevent_context *ev;
+	struct iovec iov;
+	int ret;
+	unsigned i;
+	unsigned num_ctxs = 1;
+
+	if (argc < 2) {
+		fprintf(stderr, "Usage: %s <sockname> [num_contexts]\n", argv[0]);
+		return 1;
+	}
+	if (argc > 2) {
+		num_ctxs = atoi(argv[2]);
+	}
+
+	ev = tevent_context_init(NULL);
+	if (ev == NULL) {
+		perror("tevent_context_init failed");
+		return 1;
+	}
+	poll_funcs_init_tevent(&funcs);
+	funcs.private_data = ev;
+
+	ctxs = talloc_array(ev, struct unix_msg_ctx *, num_ctxs);
+	if (ctxs == NULL) {
+		fprintf(stderr, "talloc failed\n");
+		return 1;
+	}
+
+	for (i=0; i<num_ctxs; i++) {
+		ret = unix_msg_init(NULL, &funcs, 256, 1, NULL, NULL,
+				    &ctxs[i]);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_init failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+	}
+
+	iov.iov_base = &i;
+	iov.iov_len = sizeof(i);
+
+	for (i=0; i<num_ctxs; i++) {
+		unsigned j;
+
+		for (j=0; j<100000; j++) {
+			ret = unix_msg_send(ctxs[i], argv[1], &iov, 1);
+			if (ret != 0) {
+				fprintf(stderr, "unix_msg_send failed: %s\n",
+					strerror(ret));
+				return 1;
+			}
+		}
+	}
+
+	while (true) {
+		ret = tevent_loop_once(ev);
+		if (ret == -1) {
+			fprintf(stderr, "tevent_loop_once failed: %s\n",
+				strerror(errno));
+			exit(1);
+		}
+	}
+
+	for (i=0; i<num_ctxs; i++) {
+		unix_msg_free(ctxs[i]);
+	}
+
+	talloc_free(ev);
+
+	return 0;
+}
diff --git a/source3/lib/unix_msg/tests.c b/source3/lib/unix_msg/tests.c
new file mode 100644
index 0000000..0a39773
--- /dev/null
+++ b/source3/lib/unix_msg/tests.c
@@ -0,0 +1,226 @@
+#include "replace.h"
+#include "unix_msg.h"
+#include "poll_funcs/poll_funcs_tevent.h"
+#include "tevent.h"
+
+struct cb_state {
+	unsigned num_received;
+	uint8_t *buf;
+	size_t buflen;
+};
+
+static void recv_cb(struct unix_msg_ctx *ctx,
+		    uint8_t *msg, size_t msg_len,
+		    void *private_data);
+
+static void expect_messages(struct tevent_context *ev, struct cb_state *state,
+			    unsigned num_msgs)
+{
+	state->num_received = 0;
+
+	while (state->num_received < num_msgs) {
+		int ret;
+
+		ret = tevent_loop_once(ev);
+		if (ret == -1) {
+			fprintf(stderr, "tevent_loop_once failed: %s\n",
+				strerror(errno));
+			exit(1);
+		}
+	}
+}
+
+int main(void)
+{
+	struct poll_funcs funcs;
+	const char *sock1 = "sock1";
+	const char *sock2 = "sock2";
+	struct unix_msg_ctx *ctx1, *ctx2;
+	struct tevent_context *ev;
+	struct iovec iov;
+	uint8_t msg;
+	int i, ret;
+	static uint8_t buf[1755];
+
+	struct cb_state state;
+
+	unlink(sock1);
+	unlink(sock2);
+
+	ev = tevent_context_init(NULL);
+	if (ev == NULL) {
+		perror("tevent_context_init failed");
+		return 1;
+	}
+	poll_funcs_init_tevent(&funcs);
+	funcs.private_data = ev;
+
+	ret = unix_msg_init(sock1, &funcs, 256, 1,
+			    recv_cb, &state, &ctx1);
+	if (ret != 0) {
+		fprintf(stderr, "unix_msg_init failed: %s\n",
+			strerror(ret));
+		return 1;
+	}
+
+	ret = unix_msg_init(sock1, &funcs, 256, 1,
+			    recv_cb, &state, &ctx1);
+	if (ret == 0) {
+		fprintf(stderr, "unix_msg_init succeeded unexpectedly\n");
+		return 1;
+	}
+	if (ret != EADDRINUSE) {
+		fprintf(stderr, "unix_msg_init returned %s, expected "
+			"EADDRINUSE\n", strerror(ret));
+		return 1;
+	}
+
+	ret = unix_msg_init(sock2, &funcs, 256, 1,
+			    recv_cb, &state, &ctx2);
+	if (ret != 0) {
+		fprintf(stderr, "unix_msg_init failed: %s\n",
+			strerror(ret));
+		return 1;
+	}
+
+	printf("sending a 0-length message\n");
+
+	state.buf = NULL;
+	state.buflen = 0;
+
+	ret = unix_msg_send(ctx1, sock2, NULL, 0);
+	if (ret != 0) {
+		fprintf(stderr, "unix_msg_send failed: %s\n",
+			strerror(ret));
+		return 1;
+	}
+
+	expect_messages(ev, &state, 1);
+
+	printf("sending a small message\n");
+
+	msg = random();
+	iov.iov_base = &msg;
+	iov.iov_len = sizeof(msg);
+	state.buf = &msg;
+	state.buflen = sizeof(msg);
+
+	ret = unix_msg_send(ctx1, sock2, &iov, 1);
+	if (ret != 0) {
+		fprintf(stderr, "unix_msg_send failed: %s\n",
+			strerror(ret));
+		return 1;
+	}
+
+	expect_messages(ev, &state, 1);
+
+	printf("sending six large, interleaved messages\n");
+
+	for (i=0; i<sizeof(buf); i++) {
+		buf[i] = random();
+	}
+
+	iov.iov_base = buf;
+	iov.iov_len = sizeof(buf);
+	state.buf = buf;
+	state.buflen = sizeof(buf);
+
+	for (i=0; i<3; i++) {
+		ret = unix_msg_send(ctx1, sock2, &iov, 1);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_send failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+		ret = unix_msg_send(ctx2, sock2, &iov, 1);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_send failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+	}
+
+	expect_messages(ev, &state, 6);
+
+	printf("sending a few messages in small pieces\n");
+
+	for (i = 0; i<5; i++) {
+		struct iovec iovs[20];
+		const size_t num_iovs = ARRAY_SIZE(iovs);
+		uint8_t *p = buf;
+		size_t j;
+
+		for (j=0; j<num_iovs-1; j++) {
+			size_t chunk = (random() % ((sizeof(buf) * 2) / num_iovs));
+			size_t space = (sizeof(buf) - (p - buf));
+
+			if (space == 0) {
+				break;
+			}
+
+			chunk = MIN(chunk, space);
+
+			iovs[j].iov_base = p;
+			iovs[j].iov_len = chunk;
+			p += chunk;
+		}
+
+		if (p < (buf + sizeof(buf))) {
+			iovs[j].iov_base = p;
+			iovs[j].iov_len = (sizeof(buf) - (p - buf));
+			j++;
+		}
+
+		ret = unix_msg_send(ctx1, sock1, iovs, j);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_send failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+	}
+
+	expect_messages(ev, &state, 5);
+
+	printf("Filling send queues before freeing\n");
+
+	for (i=0; i<5; i++) {
+		ret = unix_msg_send(ctx1, sock2, &iov, 1);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_send failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+		ret = unix_msg_send(ctx1, sock1, &iov, 1);
+		if (ret != 0) {
+			fprintf(stderr, "unix_msg_send failed: %s\n",
+				strerror(ret));
+			return 1;
+		}
+	}
+
+	expect_messages(ev, &state, 1); /* Read just one msg */
+
+	unix_msg_free(ctx1);
+	unix_msg_free(ctx2);
+	talloc_free(ev);
+
+	return 0;
+}
+
+static void recv_cb(struct unix_msg_ctx *ctx,
+		    uint8_t *msg, size_t msg_len,
+		    void *private_data)
+{
+	struct cb_state *state = (struct cb_state *)private_data;
+
+	if (msg_len != state->buflen) {
+		fprintf(stderr, "expected %u bytes, got %u\n",
+			(unsigned)state->buflen, (unsigned)msg_len);
+		exit(1);
+	}
+	if ((msg_len != 0) && (memcmp(msg, state->buf, msg_len) != 0)) {
+		fprintf(stderr, "message content differs\n");
+		exit(1);
+	}
+	state->num_received += 1;
+}
diff --git a/source3/lib/unix_msg/unix_msg.c b/source3/lib/unix_msg/unix_msg.c
new file mode 100644
index 0000000..2b6a544
--- /dev/null
+++ b/source3/lib/unix_msg/unix_msg.c
@@ -0,0 +1,766 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Copyright (C) Volker Lendecke 2013
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "unix_msg.h"
+#include "system/select.h"
+#include "system/time.h"
+#include "dlinklist.h"
+#include "pthreadpool/pthreadpool.h"
+
+/*
+ * This file implements two abstractions: The "unix_dgram" functions implement
+ * queueing for unix domain datagram sockets. You can send to a destination
+ * socket, and if that has no free space available, it will fall back to an
+ * anonymous connected socket on which a pthreadpool job does a blocking send.
+ * "unix_dgram" expects the data size not to exceed the system limit.
+ *
+ * The "unix_msg" functions implement the fragmentation of large messages on
+ * top of "unix_dgram". This is what is exposed to the user of this API.
+ */
+
+struct unix_dgram_msg {
+	struct unix_dgram_msg *prev, *next;
+
+	int sock;
+	ssize_t sent;
+	int sys_errno;
+	size_t buflen;
+	uint8_t buf[1];
+};
+
+struct unix_dgram_send_queue {
+	struct unix_dgram_send_queue *prev, *next;
+	struct unix_dgram_ctx *ctx;
+	int sock;
+	struct unix_dgram_msg *msgs;
+	char path[1];
+};
+
+struct unix_dgram_ctx {
+	int sock;
+	pid_t created_pid;
+	const struct poll_funcs *ev_funcs;
+	size_t max_msg;
+
+	void (*recv_callback)(struct unix_dgram_ctx *ctx,
+			      uint8_t *msg, size_t msg_len,
+			      void *private_data);
+	void *private_data;
+
+	struct poll_watch *sock_read_watch;
+	struct unix_dgram_send_queue *send_queues;
+
+	struct pthreadpool *send_pool;
+	struct poll_watch *pool_read_watch;
+
+	uint8_t *recv_buf;
+	char path[1];
+};
+
+static ssize_t iov_buflen(const struct iovec *iov, int iovlen);
+static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
+				    void *private_data);
+
+static int unix_dgram_init(const char *path, size_t max_msg,
+			   const struct poll_funcs *ev_funcs,
+			   void (*recv_callback)(struct unix_dgram_ctx *ctx,
+						 uint8_t *msg, size_t msg_len,
+						 void *private_data),
+			   void *private_data,
+			   struct unix_dgram_ctx **result)
+{
+	struct unix_dgram_ctx *ctx;
+	struct sockaddr_un addr = { 0, };
+	size_t pathlen;
+	int ret;
+
+	if (path != NULL) {
+		pathlen = strlen(path)+1;
+		if (pathlen > sizeof(addr.sun_path)) {
+			return ENAMETOOLONG;
+		}
+	} else {
+		pathlen = 1;
+	}
+
+	ctx = malloc(offsetof(struct unix_dgram_ctx, path) + pathlen);
+	if (ctx == NULL) {
+		return ENOMEM;
+	}
+	if (path != NULL) {
+		memcpy(ctx->path, path, pathlen);
+	} else {
+		ctx->path[0] = '\0';
+	}
+
+	ctx->recv_buf = malloc(max_msg);
+	if (ctx->recv_buf == NULL) {
+		free(ctx);
+		return ENOMEM;
+	}
+	ctx->max_msg = max_msg;
+	ctx->ev_funcs = ev_funcs;
+	ctx->recv_callback = recv_callback;
+	ctx->private_data = private_data;
+	ctx->sock_read_watch = NULL;
+	ctx->send_pool = NULL;
+	ctx->pool_read_watch = NULL;
+	ctx->send_queues = NULL;
+
+	ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+	if (ctx->sock == -1) {
+		ret = errno;
+		goto fail_free;
+	}
+
+	ctx->created_pid = getpid();
+
+	if (path != NULL) {
+		addr.sun_family = AF_UNIX;
+		memcpy(addr.sun_path, path, pathlen);
+
+		ret = bind(ctx->sock, (struct sockaddr *)&addr, sizeof(addr));
+		if (ret == -1) {
+			ret = errno;
+			goto fail_close;
+		}
+
+		ctx->sock_read_watch = ctx->ev_funcs->watch_new(
+			ctx->ev_funcs, ctx->sock, POLLIN,
+			unix_dgram_recv_handler, ctx);
+
+		if (ctx->sock_read_watch == NULL) {
+			ret = ENOMEM;
+			goto fail_close;
+		}
+	}
+
+	*result = ctx;
+	return 0;
+
+fail_close:
+	close(ctx->sock);
+fail_free:
+	free(ctx->recv_buf);
+	free(ctx);
+	return ret;
+}
+
+static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
+				    void *private_data)
+{
+	struct unix_dgram_ctx *ctx = (struct unix_dgram_ctx *)private_data;
+	ssize_t received;
+
+	received = recv(fd, ctx->recv_buf, ctx->max_msg, MSG_DONTWAIT);
+	if (received == -1) {
+		return;
+	}
+	if (received > ctx->max_msg) {
+		/* More than we expected, not for us */
+		return;
+	}
+	ctx->recv_callback(ctx, ctx->recv_buf, received, ctx->private_data);
+}
+
+static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
+				    void *private_data);
+
+static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
+{
+	int ret, signalfd;
+
+	if (ctx->send_pool != NULL) {
+		return 0;
+	}
+
+	ret = pthreadpool_init(0, &ctx->send_pool);
+	if (ret != 0) {
+		return ret;
+	}
+
+	signalfd = pthreadpool_signal_fd(ctx->send_pool);
+
+	ctx->pool_read_watch = ctx->ev_funcs->watch_new(
+		ctx->ev_funcs, signalfd, POLLIN,
+		unix_dgram_job_finished, ctx);
+	if (ctx->pool_read_watch == NULL) {
+		pthreadpool_destroy(ctx->send_pool);
+		ctx->send_pool = NULL;
+		return ENOMEM;
+	}
+
+	return 0;
+}
+
+static int unix_dgram_send_queue_init(
+	struct unix_dgram_ctx *ctx, const char *path,
+	struct unix_dgram_send_queue **result)
+{
+	struct unix_dgram_send_queue *q;
+	struct sockaddr_un addr = { 0, };
+	size_t pathlen;
+	int ret, err;
+
+	pathlen = strlen(path)+1;
+
+	if (pathlen > sizeof(addr.sun_path)) {
+		return ENAMETOOLONG;
+	}
+
+	q = malloc(offsetof(struct unix_dgram_send_queue, path) + pathlen);
+	if (q == NULL) {
+		return ENOMEM;
+	}
+	q->ctx = ctx;
+	q->msgs = NULL;
+	memcpy(q->path, path, pathlen);
+
+	q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
+	if (q->sock == -1) {
+		err = errno;
+		goto fail_free;
+	}
+
+	addr.sun_family = AF_UNIX;
+	memcpy(addr.sun_path, path, pathlen+1);
+
+	ret = connect(q->sock, (struct sockaddr *)&addr, sizeof(addr));
+	if (ret == -1) {
+		err = errno;
+		goto fail_close;
+	}
+
+	err = unix_dgram_init_pthreadpool(ctx);
+	if (err != 0) {
+		goto fail_close;
+	}
+
+	DLIST_ADD(ctx->send_queues, q);
+
+	*result = q;
+	return 0;
+
+fail_close:
+	close(q->sock);
+fail_free:
+	free(q);
+	return err;
+}
+
+static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
+{
+	struct unix_dgram_ctx *ctx = q->ctx;
+
+	while (q->msgs != NULL) {
+		struct unix_dgram_msg *msg;
+		msg = q->msgs;
+		DLIST_REMOVE(q->msgs, msg);
+		free(msg);
+	}
+	close(q->sock);
+	DLIST_REMOVE(ctx->send_queues, q);
+	free(q);
+}
+
+static struct unix_dgram_send_queue *find_send_queue(
+	struct unix_dgram_ctx *ctx, const char *dst_sock)
+{
+	struct unix_dgram_send_queue *s;
+
+	for (s = ctx->send_queues; s != NULL; s = s->next) {
+		if (strcmp(s->path, dst_sock) == 0) {
+			return s;
+		}
+	}
+	return NULL;
+}
+
+static int queue_msg(struct unix_dgram_send_queue *q,
+		     const struct iovec *iov, int iovlen)
+{
+	struct unix_dgram_msg *msg;
+	ssize_t buflen;
+	size_t msglen;
+	int i;
+
+	buflen = iov_buflen(iov, iovlen);
+	if (buflen == -1) {
+		return EINVAL;
+	}
+
+	msglen = offsetof(struct unix_dgram_msg, buf) + buflen;
+	if ((msglen < buflen) ||
+	    (msglen < offsetof(struct unix_dgram_msg, buf))) {
+		/* overflow */
+		return EINVAL;
+	}
+
+	msg = malloc(msglen);
+	if (msg == NULL) {
+		return ENOMEM;
+	}
+	msg->buflen = buflen;
+	msg->sock = q->sock;
+
+	buflen = 0;
+	for (i=0; i<iovlen; i++) {
+		memcpy(&msg->buf[buflen], iov[i].iov_base, iov[i].iov_len);
+		buflen += iov[i].iov_len;
+	}
+
+	DLIST_ADD_END(q->msgs, msg, struct unix_dgram_msg);
+	return 0;
+}
+
+static void unix_dgram_send_job(void *private_data)
+{
+	struct unix_dgram_msg *msg = private_data;
+	msg->sent = send(msg->sock, msg->buf, msg->buflen, 0);
+}
+
+static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
+				    void *private_data)
+{
+	struct unix_dgram_ctx *ctx = private_data;
+	struct unix_dgram_send_queue *q;
+	struct unix_dgram_msg *msg;
+	int ret, job;
+
+	ret = pthreadpool_finished_jobs(ctx->send_pool, &job, 1);
+	if (ret != 1) {
+		return;
+	}
+
+	for (q = ctx->send_queues; q != NULL; q = q->next) {
+		if (job == q->sock) {
+			break;
+		}
+	}
+
+	if (q == NULL) {
+		/* Huh? Should not happen */
+		return;
+	}
+
+	msg = q->msgs;
+	DLIST_REMOVE(q->msgs, msg);
+	free(msg);
+
+	if (q->msgs != NULL) {
+		ret = pthreadpool_add_job(ctx->send_pool, q->sock,
+					  unix_dgram_send_job, q->msgs);
+		if (ret == 0) {
+			return;
+		}
+	}
+
+	unix_dgram_send_queue_free(q);
+}
+
+static int unix_dgram_send(struct unix_dgram_ctx *ctx, const char *dst_sock,
+			   const struct iovec *iov, int iovlen)
+{
+	struct unix_dgram_send_queue *q;
+	struct sockaddr_un addr = { 0, };
+	struct msghdr msg;
+	size_t dst_len;
+	int ret;
+
+	dst_len = strlen(dst_sock);
+	if (dst_len >= sizeof(addr.sun_path)) {
+		return ENAMETOOLONG;
+	}
+
+	/*
+	 * To preserve message ordering, we have to queue a message when
+	 * others are waiting in line already.
+	 */
+	q = find_send_queue(ctx, dst_sock);
+	if (q != NULL) {
+		return queue_msg(q, iov, iovlen);
+	}
+
+	/*
+	 * Try a cheap nonblocking send
+	 */
+
+	addr.sun_family = AF_UNIX;
+	memcpy(addr.sun_path, dst_sock, dst_len);
+
+	msg.msg_name = &addr;
+	msg.msg_namelen = sizeof(addr);
+	msg.msg_iov = discard_const_p(struct iovec, iov);
+	msg.msg_iovlen = iovlen;
+	msg.msg_control = NULL;
+	msg.msg_controllen = 0;
+	msg.msg_flags = 0;
+
+	ret = sendmsg(ctx->sock, &msg, MSG_DONTWAIT);
+	if (ret >= 0) {
+		return 0;
+	}
+	if (errno != EWOULDBLOCK) {
+		return errno;
+	}
+
+	ret = unix_dgram_send_queue_init(ctx, dst_sock, &q);
+	if (ret != 0) {
+		return ret;
+	}
+	ret = queue_msg(q, iov, iovlen);
+	if (ret != 0) {
+		unix_dgram_send_queue_free(q);
+		return ret;
+	}
+	ret = pthreadpool_add_job(ctx->send_pool, q->sock,
+				  unix_dgram_send_job, q->msgs);
+	if (ret != 0) {
+		unix_dgram_send_queue_free(q);
+		return ret;
+	}
+	return 0;
+}
+
+static int unix_dgram_sock(struct unix_dgram_ctx *ctx)
+{
+	return ctx->sock;
+}
+
+static int unix_dgram_free(struct unix_dgram_ctx *ctx)
+{
+	if (ctx->send_queues != NULL) {
+		return EBUSY;
+	}
+
+	if (ctx->send_pool != NULL) {
+		int ret = pthreadpool_destroy(ctx->send_pool);
+		if (ret != 0) {
+			return ret;
+		}
+		ctx->ev_funcs->watch_free(ctx->pool_read_watch);
+	}
+
+	ctx->ev_funcs->watch_free(ctx->sock_read_watch);
+
+	if (getpid() == ctx->created_pid) {
+		/* If we created it, unlink. Otherwise someone else might
+		 * still have it open */
+		unlink(ctx->path);
+	}
+
+	close(ctx->sock);
+	free(ctx->recv_buf);
+	free(ctx);
+	return 0;
+}
+
+/*
+ * Every message starts with a uint64_t cookie.
+ *
+ * A value of 0 indicates a single-fragment message which is complete in
+ * itself. The data immediately follows the cookie.
+ *
+ * Every multi-fragment message has a cookie != 0 and starts with a cookie
+ * followed by a struct unix_msg_hdr and then the data. The pid and sock
+ * fields are used to assure uniqueness on the receiver side.
+ */
+
+struct unix_msg_hdr {
+	size_t msglen;
+	pid_t pid;
+	int sock;
+};
+
+struct unix_msg {
+	struct unix_msg *prev, *next;
+	size_t msglen;
+	size_t received;
+	pid_t sender_pid;
+	int sender_sock;
+	uint64_t cookie;
+	uint8_t buf[1];
+};
+
+struct unix_msg_ctx {
+	struct unix_dgram_ctx *dgram;
+	size_t fragment_len;
+	uint64_t cookie;
+
+	void (*recv_callback)(struct unix_msg_ctx *ctx,
+			      uint8_t *msg, size_t msg_len,
+			      void *private_data);
+	void *private_data;
+
+	struct unix_msg *msgs;
+};
+
+static void unix_msg_recv(struct unix_dgram_ctx *ctx,
+			  uint8_t *msg, size_t msg_len,
+			  void *private_data);
+
+int unix_msg_init(const char *path, const struct poll_funcs *ev_funcs,
+		  size_t fragment_len, uint64_t cookie,
+		  void (*recv_callback)(struct unix_msg_ctx *ctx,
+					uint8_t *msg, size_t msg_len,
+					void *private_data),
+		  void *private_data,
+		  struct unix_msg_ctx **result)
+{
+	struct unix_msg_ctx *ctx;
+	int ret;
+
+	ctx = malloc(sizeof(*ctx));
+	if (ctx == NULL) {
+		return ENOMEM;
+	}
+
+	ret = unix_dgram_init(path, fragment_len, ev_funcs,
+			      unix_msg_recv, ctx, &ctx->dgram);
+	if (ret != 0) {
+		free(ctx);
+		return ret;
+	}
+
+	ctx->fragment_len = fragment_len;
+	ctx->cookie = cookie;
+	ctx->recv_callback = recv_callback;
+	ctx->private_data = private_data;
+	ctx->msgs = NULL;
+
+	*result = ctx;
+	return 0;
+}
+
+int unix_msg_send(struct unix_msg_ctx *ctx, const char *dst_sock,
+		  const struct iovec *iov, int iovlen)
+{
+	ssize_t msglen;
+	size_t sent;
+	int ret = 0;
+	struct iovec *iov_copy;
+	struct unix_msg_hdr hdr;
+	struct iovec src_iov;
+
+	if (iovlen < 0) {
+		return EINVAL;
+	}
+
+	msglen = iov_buflen(iov, iovlen);
+	if (msglen == -1) {
+		return EINVAL;
+	}
+
+	if ((iovlen < 16) &&
+	    (msglen <= (ctx->fragment_len - sizeof(uint64_t)))) {
+		struct iovec tmp_iov[16];
+		uint64_t cookie = 0;
+
+		tmp_iov[0].iov_base = &cookie;
+		tmp_iov[0].iov_len = sizeof(cookie);
+		if (iovlen > 0) {
+			memcpy(&tmp_iov[1], iov,
+			       sizeof(struct iovec) * iovlen);
+		}
+
+		return unix_dgram_send(ctx->dgram, dst_sock, tmp_iov,
+				       iovlen+1);
+	}
+
+	hdr.msglen = msglen;
+	hdr.pid = getpid();
+	hdr.sock = unix_dgram_sock(ctx->dgram);
+
+	iov_copy = malloc(sizeof(struct iovec) * (iovlen + 2));
+	if (iov_copy == NULL) {
+		return ENOMEM;
+	}
+	iov_copy[0].iov_base = &ctx->cookie;
+	iov_copy[0].iov_len = sizeof(ctx->cookie);
+	iov_copy[1].iov_base = &hdr;
+	iov_copy[1].iov_len = sizeof(hdr);
+
+	sent = 0;
+	src_iov = iov[0];
+
+	/*
+	 * The following write loop sends the user message in pieces. We have
+	 * filled the first two iovecs above with "cookie" and "hdr". In the
+	 * following loops we pull message chunks from the user iov array and
+	 * fill iov_copy piece by piece, possibly truncating chunks from the
+	 * caller's iov array. Ugly, but hopefully efficient.
+	 */
+
+	while (sent < msglen) {
+		size_t fragment_len;
+		size_t iov_index = 2;
+
+		fragment_len = sizeof(ctx->cookie) + sizeof(hdr);
+
+		while (fragment_len < ctx->fragment_len) {
+			size_t space, chunk;
+
+			space = ctx->fragment_len - fragment_len;
+			chunk = MIN(space, src_iov.iov_len);
+
+			iov_copy[iov_index].iov_base = src_iov.iov_base;
+			iov_copy[iov_index].iov_len = chunk;
+			iov_index += 1;
+
+			src_iov.iov_base = (char *)src_iov.iov_base + chunk;
+			src_iov.iov_len -= chunk;
+			fragment_len += chunk;
+
+			if (src_iov.iov_len == 0) {
+				iov += 1;
+				iovlen -= 1;
+				if (iovlen == 0) {
+					break;
+				}
+				src_iov = iov[0];
+			}
+		}
+		sent += (fragment_len - sizeof(ctx->cookie) - sizeof(hdr));
+
+		ret = unix_dgram_send(ctx->dgram, dst_sock,
+				      iov_copy, iov_index);
+		if (ret != 0) {
+			break;
+		}
+	}
+
+	free(iov_copy);
+
+	ctx->cookie += 1;
+	if (ctx->cookie == 0) {
+		ctx->cookie += 1;
+	}
+
+	return ret;
+}
+
+/* Datagram callback: deliver cookie==0 messages directly, reassemble
+ * multi-fragment messages per (pid, sock) sender before delivering. */
+static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
+			  uint8_t *buf, size_t buflen,
+			  void *private_data)
+{
+	const size_t fixed_len = offsetof(struct unix_msg, buf);
+	struct unix_msg_ctx *ctx = (struct unix_msg_ctx *)private_data;
+	struct unix_msg_hdr hdr;
+	struct unix_msg *msg;
+	uint64_t cookie;
+
+	if (buflen < sizeof(cookie)) {
+		return;
+	}
+	memcpy(&cookie, buf, sizeof(cookie));
+	buf += sizeof(cookie);
+	buflen -= sizeof(cookie);
+
+	if (cookie == 0) {
+		ctx->recv_callback(ctx,	buf, buflen, ctx->private_data);
+		return;
+	}
+
+	if (buflen < sizeof(hdr)) {
+		return;
+	}
+	memcpy(&hdr, buf, sizeof(hdr));
+	buf += sizeof(hdr);
+	buflen -= sizeof(hdr);
+
+	for (msg = ctx->msgs; msg != NULL; msg = msg->next) {
+		if ((msg->sender_pid == hdr.pid) &&
+		    (msg->sender_sock == hdr.sock)) {
+			break;
+		}
+	}
+	if ((msg != NULL) && (msg->cookie != cookie)) {
+		/* sender started a new message, drop the stale partial one */
+		DLIST_REMOVE(ctx->msgs, msg);
+		free(msg);
+		msg = NULL;
+	}
+	if (msg == NULL) {
+		/* msglen comes off the wire, the allocation must not wrap */
+		if (hdr.msglen > ((size_t)-1) - fixed_len) {
+			return;
+		}
+		msg = malloc(fixed_len + hdr.msglen);
+		if (msg == NULL) {
+			return;
+		}
+		msg->msglen = hdr.msglen;
+		msg->received = 0;
+		msg->sender_pid = hdr.pid;
+		msg->sender_sock = hdr.sock;
+		msg->cookie = cookie;
+		DLIST_ADD(ctx->msgs, msg);
+	}
+
+	if (buflen > (msg->msglen - msg->received)) {
+		return;
+	}
+	memcpy(msg->buf + msg->received, buf, buflen);
+	msg->received += buflen;
+	if (msg->received < msg->msglen) {
+		return;
+	}
+
+	DLIST_REMOVE(ctx->msgs, msg);
+	ctx->recv_callback(ctx, msg->buf, msg->msglen, ctx->private_data);
+	free(msg);
+}
+
+int unix_msg_free(struct unix_msg_ctx *ctx)
+{
+	int ret;
+
+	ret = unix_dgram_free(ctx->dgram);
+	if (ret != 0) {
+		return ret;
+	}
+
+	while (ctx->msgs != NULL) {
+		struct unix_msg *msg = ctx->msgs;
+		DLIST_REMOVE(ctx->msgs, msg);
+		free(msg);
+	}
+
+	free(ctx);
+	return 0;
+}
+
+/* Total length of iov[0..iovlen-1], or -1 if it does not fit an ssize_t */
+static ssize_t iov_buflen(const struct iovec *iov, int iovlen)
+{
+	const size_t max_len = ((size_t)-1) / 2; /* top of the ssize_t range */
+	size_t buflen = 0;
+	int i;
+
+	for (i=0; i<iovlen; i++) {
+		size_t thislen = iov[i].iov_len;
+		if ((thislen > max_len) || (buflen > max_len - thislen)) {
+			/* callers only test for -1, never return other negatives */
+			return -1;
+		}
+		buflen += thislen;
+	}
+	return buflen;
+}
diff --git a/source3/lib/unix_msg/unix_msg.h b/source3/lib/unix_msg/unix_msg.h
new file mode 100644
index 0000000..fc636d8
--- /dev/null
+++ b/source3/lib/unix_msg/unix_msg.h
@@ -0,0 +1,107 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Copyright (C) Volker Lendecke 2013
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef __UNIX_DGRAM_H__
+#define __UNIX_DGRAM_H__
+
+#include "replace.h"
+#include "poll_funcs/poll_funcs.h"
+#include "system/network.h"
+
+/**
+ * @file unix_msg.h
+ *
+ * @brief Send large messages over unix domain datagram sockets
+ *
+ * A unix_msg_ctx represents a unix domain datagram socket.
+ *
+ * Unix domain datagram sockets have some unique properties compared with UDP
+ * sockets:
+ *
+ * - They are reliable, i.e. as long as both sender and receiver are processes
+ *   that are alive, nothing is lost.
+ *
+ * - They preserve sequencing
+ *
+ * Based on these two properties, this code implements sending of large
+ * messages. It aims at being maximally efficient for short, single-datagram
+ * messages. Ideally, if the receiver queue is not full, sending a message
+ * should be a single syscall without malloc. Receiving a message should also
+ * not malloc anything before the data is shipped to the user.
+ *
+ * If unix_msg_send meets a full receive buffer, more effort is required: The
+ * socket behind unix_msg_send is not pollable for POLLOUT, it will always be
+ * writable: A datagram socket can send anywhere, the full queue is a property
+ * of the receiving socket. unix_msg_send creates a new unnamed socket that
+ * it will connect(2) to the target socket. This unnamed socket is then
+ * pollable for POLLOUT. The socket will be writable when the destination
+ * socket's queue is drained sufficiently.
+ *
+ * If unix_msg_send is asked to send a message larger than fragment_size, it
+ * will try sending the message in pieces with proper framing, the receiving
+ * side will reassemble the messages.
+ */
+
+/**
+ * @brief Abstract structure representing a unix domain datagram socket
+ */
+struct unix_msg_ctx;
+
+/**
+ * @brief Initialize a struct unix_msg_ctx
+ *
+ * @param[in] path The socket path
+ * @param[in] ev_funcs The event callback functions to use
+ * @param[in] fragment_size Maximum datagram size to send/receive
+ * @param[in] cookie Random number to identify this context
+ * @param[in] recv_callback Function called when a message is received
+ * @param[in] private_data Private pointer for recv_callback
+ * @param[out] result The new struct unix_msg_ctx
+ * @return 0 on success, errno on failure
+ */
+
+int unix_msg_init(const char *path, const struct poll_funcs *ev_funcs,
+		  size_t fragment_size, uint64_t cookie,
+		  void (*recv_callback)(struct unix_msg_ctx *ctx,
+					uint8_t *msg, size_t msg_len,
+					void *private_data),
+		  void *private_data,
+		  struct unix_msg_ctx **result);
+
+/**
+ * @brief Send a message
+ *
+ * @param[in] ctx The context to send across
+ * @param[in] dst_sock The destination socket path
+ * @param[in] iov The message
+ * @param[in] iovlen The number of iov structs
+ * @return 0 on success, errno on failure
+ */
+
+int unix_msg_send(struct unix_msg_ctx *ctx, const char *dst_sock,
+		  const struct iovec *iov, int iovlen);
+
+/**
+ * @brief Free a unix_msg_ctx
+ *
+ * @param[in] ctx The message context to free
+ * @return 0 on success, errno on failure (EBUSY)
+ */
+int unix_msg_free(struct unix_msg_ctx *ctx);
+
+#endif
diff --git a/source3/lib/unix_msg/wscript_build b/source3/lib/unix_msg/wscript_build
new file mode 100644
index 0000000..200840d
--- /dev/null
+++ b/source3/lib/unix_msg/wscript_build
@@ -0,0 +1,18 @@
+#!/usr/bin/env python
+
+bld.SAMBA3_SUBSYSTEM('UNIX_MSG',
+                     source='unix_msg.c',
+		     deps='replace PTHREADPOOL')
+
+bld.SAMBA3_BINARY('unix_msg_test',
+                  source='tests.c',
+                  deps='UNIX_MSG POLL_FUNCS_TEVENT',
+                  install=False)
+bld.SAMBA3_BINARY('unix_msg_test_drain',
+                  source='test_drain.c',
+                  deps='UNIX_MSG POLL_FUNCS_TEVENT',
+                  install=False)
+bld.SAMBA3_BINARY('unix_msg_test_source',
+                  source='test_source.c',
+                  deps='UNIX_MSG POLL_FUNCS_TEVENT',
+                  install=False)
diff --git a/source3/wscript_build b/source3/wscript_build
index fd53e2f..4d261c6 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -1453,6 +1453,7 @@ bld.RECURSE('libgpo/gpext')
 bld.RECURSE('lib/pthreadpool')
 bld.RECURSE('lib/asys')
 bld.RECURSE('lib/poll_funcs')
+bld.RECURSE('lib/unix_msg')
 bld.RECURSE('librpc')
 bld.RECURSE('librpc/idl')
 bld.RECURSE('libsmb')
-- 
1.7.9.5


>From 50cd1373423a295cc571d7ee73fb4c683a49ecd1 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sun, 29 Dec 2013 13:56:44 +0100
Subject: [PATCH 03/13] lib: Move full_path_tos to util_str.c

This can be useful elsewhere
---
 source3/include/proto.h |    3 +++
 source3/lib/util_str.c  |   39 +++++++++++++++++++++++++++++++++++++++
 source3/smbd/files.c    |   39 ---------------------------------------
 source3/smbd/proto.h    |    3 ---
 4 files changed, 42 insertions(+), 42 deletions(-)

diff --git a/source3/include/proto.h b/source3/include/proto.h
index a6a6815..857be60 100644
--- a/source3/include/proto.h
+++ b/source3/include/proto.h
@@ -723,6 +723,9 @@ bool validate_net_name( const char *name,
 		int max_len);
 char *escape_shell_string(const char *src);
 char **str_list_make_v3(TALLOC_CTX *mem_ctx, const char *string, const char *sep);
+ssize_t full_path_tos(const char *dir, const char *name,
+		      char *tmpbuf, size_t tmpbuf_len,
+		      char **pdst, char **to_free);
 
 /* The following definitions come from lib/version.c  */
 
diff --git a/source3/lib/util_str.c b/source3/lib/util_str.c
index 967beda..908f23a 100644
--- a/source3/lib/util_str.c
+++ b/source3/lib/util_str.c
@@ -1294,3 +1294,42 @@ char **str_list_make_v3(TALLOC_CTX *mem_ctx, const char *string,
 	TALLOC_FREE(s);
 	return list;
 }
+
+/*
+ * This routine improves performance for operations temporarily acting on a
+ * full path. It is equivalent to the much more expensive
+ *
+ * talloc_asprintf(talloc_tos(), "%s/%s", dir, name)
+ *
+ * This actually does make a difference in metadata-heavy workloads (i.e. the
+ * "standard" client.txt nbench run).
+ */
+
+ssize_t full_path_tos(const char *dir, const char *name,
+		      char *tmpbuf, size_t tmpbuf_len,
+		      char **pdst, char **to_free)
+{
+	size_t dirlen, namelen, len;
+	char *dst;
+
+	dirlen = strlen(dir);
+	namelen = strlen(name);
+	len = dirlen + namelen + 1;
+
+	if (len < tmpbuf_len) {
+		dst = tmpbuf;
+		*to_free = NULL;
+	} else {
+		dst = talloc_array(talloc_tos(), char, len+1);
+		if (dst == NULL) {
+			return -1;
+		}
+		*to_free = dst;
+	}
+
+	memcpy(dst, dir, dirlen);
+	dst[dirlen] = '/';
+	memcpy(dst+dirlen+1, name, namelen+1);
+	*pdst = dst;
+	return len;
+}
diff --git a/source3/smbd/files.c b/source3/smbd/files.c
index 5cf037e..8496806 100644
--- a/source3/smbd/files.c
+++ b/source3/smbd/files.c
@@ -688,45 +688,6 @@ NTSTATUS dup_file_fsp(struct smb_request *req, files_struct *from,
 	return fsp_set_smb_fname(to, from->fsp_name);
 }
 
-/*
- * This routine improves performance for operations temporarily acting on a
- * full path. It is equivalent to the much more expensive
- *
- * talloc_asprintf(talloc_tos(), "%s/%s", dir, name)
- *
- * This actually does make a difference in metadata-heavy workloads (i.e. the
- * "standard" client.txt nbench run.
- */
-
-ssize_t full_path_tos(const char *dir, const char *name,
-		      char *tmpbuf, size_t tmpbuf_len,
-		      char **pdst, char **to_free)
-{
-	size_t dirlen, namelen, len;
-	char *dst;
-
-	dirlen = strlen(dir);
-	namelen = strlen(name);
-	len = dirlen + namelen + 1;
-
-	if (len < tmpbuf_len) {
-		dst = tmpbuf;
-		*to_free = NULL;
-	} else {
-		dst = talloc_array(talloc_tos(), char, len+1);
-		if (dst == NULL) {
-			return -1;
-		}
-		*to_free = dst;
-	}
-
-	memcpy(dst, dir, dirlen);
-	dst[dirlen] = '/';
-	memcpy(dst+dirlen+1, name, namelen+1);
-	*pdst = dst;
-	return len;
-}
-
 /**
  * Return a jenkins hash of a pathname on a connection.
  */
diff --git a/source3/smbd/proto.h b/source3/smbd/proto.h
index 62c9728..d9b86b6 100644
--- a/source3/smbd/proto.h
+++ b/source3/smbd/proto.h
@@ -397,9 +397,6 @@ NTSTATUS file_name_hash(connection_struct *conn,
 			const char *name, uint32_t *p_name_hash);
 NTSTATUS fsp_set_smb_fname(struct files_struct *fsp,
 			   const struct smb_filename *smb_fname_in);
-ssize_t full_path_tos(const char *dir, const char *name,
-		      char *tmpbuf, size_t tmpbuf_len,
-		      char **pdst, char **to_free);
 
 /* The following definitions come from smbd/ipc.c  */
 
-- 
1.7.9.5


>From bf4dfa5b45068f40bd631c0f7b701b9e0082c0a5 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 24 Feb 2014 12:23:49 +0000
Subject: [PATCH 04/13] lib: Add messaging_dgm

Messaging based on unix domain datagram sockets
---
 source3/include/messages.h |    5 +
 source3/lib/messages.c     |    8 +-
 source3/lib/messages_dgm.c |  417 ++++++++++++++++++++++++++++++++++++++++++++
 source3/smbd/server.c      |    6 +
 source3/wscript_build      |    3 +
 5 files changed, 435 insertions(+), 4 deletions(-)
 create mode 100644 source3/lib/messages_dgm.c

diff --git a/source3/include/messages.h b/source3/include/messages.h
index 47c5f7a..9437965 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -91,6 +91,11 @@ struct messaging_backend {
 	void *private_data;
 };
 
+NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
+			    TALLOC_CTX *mem_ctx,
+			    struct messaging_backend **presult);
+NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid);
+
 NTSTATUS messaging_tdb_init(struct messaging_context *msg_ctx,
 			    TALLOC_CTX *mem_ctx,
 			    struct messaging_backend **presult);
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 4ff933d..983fe69 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -197,10 +197,10 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
 	ctx->id = procid_self();
 	ctx->event_ctx = ev;
 
-	status = messaging_tdb_init(ctx, ctx, &ctx->local);
+	status = messaging_dgm_init(ctx, ctx, &ctx->local);
 
 	if (!NT_STATUS_IS_OK(status)) {
-		DEBUG(2, ("messaging_tdb_init failed: %s\n",
+		DEBUG(2, ("messaging_dgm_init failed: %s\n",
 			  nt_errstr(status)));
 		TALLOC_FREE(ctx);
 		return NULL;
@@ -245,9 +245,9 @@ NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
 
 	msg_ctx->id = procid_self();
 
-	status = messaging_tdb_init(msg_ctx, msg_ctx, &msg_ctx->local);
+	status = messaging_dgm_init(msg_ctx, msg_ctx, &msg_ctx->local);
 	if (!NT_STATUS_IS_OK(status)) {
-		DEBUG(0, ("messaging_tdb_init failed: %s\n",
+		DEBUG(0, ("messaging_dgm_init failed: %s\n",
 			  nt_errstr(status)));
 		return status;
 	}
diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
new file mode 100644
index 0000000..bb701ef
--- /dev/null
+++ b/source3/lib/messages_dgm.c
@@ -0,0 +1,417 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Samba internal messaging functions
+ * Copyright (C) 2013 by Volker Lendecke
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "includes.h"
+#include "lib/util/data_blob.h"
+#include "lib/util/debug.h"
+#include "lib/unix_msg/unix_msg.h"
+#include "system/filesys.h"
+#include "messages.h"
+#include "lib/param/param.h"
+#include "poll_funcs/poll_funcs_tevent.h"
+#include "unix_msg/unix_msg.h"
+#include "librpc/gen_ndr/messaging.h"
+
+struct messaging_dgm_context {
+	struct messaging_context *msg_ctx;
+	struct poll_funcs msg_callbacks;
+	struct unix_msg_ctx *dgm_ctx;
+	char *cache_dir;
+	int lockfile_fd;
+};
+
+struct messaging_dgm_hdr {
+	uint32_t msg_version;
+	enum messaging_type msg_type;
+	struct server_id dst;
+	struct server_id src;
+};
+
+static NTSTATUS messaging_dgm_send(struct messaging_context *msg_ctx,
+				   struct server_id pid, int msg_type,
+				   const DATA_BLOB *data,
+				   struct messaging_backend *backend);
+static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
+			       uint8_t *msg, size_t msg_len,
+			       void *private_data);
+
+static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
+
+/* Create and fcntl-lock <cache_dir>/lck/<pid>, write "unique" into it as a
+ * decimal string. Returns 0 and the open fd in *plockfile_fd, else an errno */
+static int messaging_dgm_lockfile_create(const char *cache_dir, pid_t pid,
+					 int *plockfile_fd, uint64_t unique)
+{
+	char buf[PATH_MAX];
+	char *dir, *to_free;
+	ssize_t dirlen, written;
+	char *lockfile_name;
+	int lockfile_fd;
+	struct flock lck = {};
+	int unique_len, ret;
+	bool ok;
+
+	dirlen = full_path_tos(cache_dir, "lck", buf, sizeof(buf),
+			       &dir, &to_free);
+	if (dirlen == -1) {
+		return ENOMEM;
+	}
+
+	ok = directory_create_or_exist_strict(dir, sec_initial_uid(), 0755);
+	if (!ok) {
+		ret = errno;
+		DEBUG(1, ("%s: Could not create lock directory: %s\n",
+			  __func__, strerror(ret)));
+		TALLOC_FREE(to_free);
+		return ret;
+	}
+
+	lockfile_name = talloc_asprintf(talloc_tos(), "%s/%u", dir,
+					(unsigned)pid);
+	TALLOC_FREE(to_free);
+	if (lockfile_name == NULL) {
+		DEBUG(1, ("%s: talloc_asprintf failed\n", __func__));
+		return ENOMEM;
+	}
+
+	/* no O_EXCL, existence check is via the fcntl lock */
+	lockfile_fd = open(lockfile_name, O_NONBLOCK|O_CREAT|O_WRONLY, 0644);
+	if (lockfile_fd == -1) {
+		ret = errno;
+		DEBUG(1, ("%s: open failed: %s\n", __func__, strerror(ret)));
+		goto fail_free;
+	}
+
+	lck.l_type = F_WRLCK;
+	lck.l_whence = SEEK_SET;
+	lck.l_start = 0;
+	lck.l_len = 0;
+	ret = fcntl(lockfile_fd, F_SETLK, &lck);
+	if (ret == -1) {
+		ret = errno;
+		DEBUG(1, ("%s: fcntl failed: %s\n", __func__, strerror(ret)));
+		goto fail_close;
+	}
+
+	unique_len = snprintf(buf, sizeof(buf), "%llu",
+			      (unsigned long long)unique);
+	/* shorten a potentially preexisting file */
+	ret = ftruncate(lockfile_fd, unique_len);
+	if (ret == -1) {
+		ret = errno;
+		DEBUG(1, ("%s: ftruncate failed: %s\n", __func__,
+			  strerror(ret)));
+		goto fail_unlink;
+	}
+
+	written = write(lockfile_fd, buf, unique_len);
+	if (written != unique_len) {
+		/* a short write does not set errno, report it as EIO */
+		ret = (written == -1) ? errno : EIO;
+		DEBUG(1, ("%s: write failed: %s\n", __func__, strerror(ret)));
+		goto fail_unlink;
+	}
+
+	TALLOC_FREE(lockfile_name);
+	*plockfile_fd = lockfile_fd;
+	return 0;
+
+fail_unlink:
+	unlink(lockfile_name);
+fail_close:
+	close(lockfile_fd);
+fail_free:
+	TALLOC_FREE(lockfile_name);
+	return ret;
+}
+
+/* Unlink the per-process lock file <cache_dir>/lck/<pid>; 0 or an errno */
+static int messaging_dgm_lockfile_remove(const char *cache_dir, pid_t pid)
+{
+	fstring fname;
+	char buf[PATH_MAX];
+	char *lockfile_name, *to_free;
+	ssize_t len;
+	int ret;
+
+	/* same "lck" subdirectory as messaging_dgm_lockfile_create() */
+	fstr_sprintf(fname, "lck/%u", (unsigned)pid);
+	len = full_path_tos(cache_dir, fname, buf, sizeof(buf),
+			    &lockfile_name, &to_free);
+	if (len == -1) {
+		return ENOMEM;
+	}
+	ret = unlink(lockfile_name);
+	if (ret == -1) {
+		ret = errno;
+		DEBUG(10, ("%s: unlink failed: %s\n", __func__,
+			   strerror(ret)));
+	}
+	TALLOC_FREE(to_free);
+	return ret;
+}
+
+/* Set up the unix datagram messaging backend: take the per-process lock
+ * file, then create the socket <cache_dir>/msg/<pid> via unix_msg_init */
+NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
+			    TALLOC_CTX *mem_ctx,
+			    struct messaging_backend **presult)
+{
+	struct messaging_backend *result;
+	struct messaging_dgm_context *ctx;
+	struct loadparm_context *lp_ctx;
+	struct server_id pid = messaging_server_id(msg_ctx);
+	NTSTATUS status = NT_STATUS_NO_MEMORY; /* for the early "goto fail"s */
+	int ret;
+	bool ok;
+	const char *cache_dir;
+	char *socket_dir, *socket_name;
+	uint64_t cookie;
+
+	lp_ctx = loadparm_init_s3(talloc_tos(), loadparm_s3_helpers());
+	if (lp_ctx == NULL) {
+		DEBUG(0, ("loadparm_init_s3 failed\n"));
+		return NT_STATUS_INTERNAL_ERROR;
+	}
+
+	cache_dir = lp_cache_directory();
+	if (cache_dir == NULL) {
+		status = map_nt_error_from_unix(errno);
+		talloc_unlink(talloc_tos(), lp_ctx);
+		return status;
+	}
+	talloc_unlink(talloc_tos(), lp_ctx);
+
+	result = talloc(mem_ctx, struct messaging_backend);
+	if (result == NULL) {
+		goto fail;
+	}
+	ctx = talloc_zero(result, struct messaging_dgm_context);
+	if (ctx == NULL) {
+		goto fail;
+	}
+	result->private_data = ctx;
+	result->send_fn = messaging_dgm_send;
+	ctx->msg_ctx = msg_ctx;
+
+	ctx->cache_dir = talloc_strdup(ctx, cache_dir);
+	if (ctx->cache_dir == NULL) {
+		goto fail;
+	}
+	socket_dir = talloc_asprintf(ctx, "%s/msg", cache_dir);
+	if (socket_dir == NULL) {
+		goto fail;
+	}
+	socket_name = talloc_asprintf(ctx, "%s/%u", socket_dir,
+				      (unsigned)pid.pid);
+	if (socket_name == NULL) {
+		goto fail;
+	}
+
+	sec_init();
+	ret = messaging_dgm_lockfile_create(cache_dir, pid.pid,
+					    &ctx->lockfile_fd, pid.unique_id);
+	if (ret != 0) {
+		DEBUG(1, ("%s: messaging_dgm_lockfile_create failed: %s\n",
+			  __func__, strerror(ret)));
+		TALLOC_FREE(result);
+		return map_nt_error_from_unix(ret);
+	}
+
+	poll_funcs_init_tevent(&ctx->msg_callbacks);
+	ctx->msg_callbacks.private_data = msg_ctx->event_ctx;
+
+	ok = directory_create_or_exist_strict(socket_dir, sec_initial_uid(),
+					      0700);
+	if (!ok) {
+		DEBUG(1, ("Could not create socket directory\n"));
+		status = NT_STATUS_ACCESS_DENIED;
+		goto fail_lockfile;
+	}
+	TALLOC_FREE(socket_dir);
+	unlink(socket_name);
+	generate_random_buffer((uint8_t *)&cookie, sizeof(cookie));
+
+	ret = unix_msg_init(socket_name, &ctx->msg_callbacks, 1024, cookie,
+			    messaging_dgm_recv, ctx, &ctx->dgm_ctx);
+	TALLOC_FREE(socket_name);
+	if (ret != 0) {
+		DEBUG(1, ("unix_msg_init failed: %s\n", strerror(ret)));
+		status = map_nt_error_from_unix(ret);
+		goto fail_lockfile;
+	}
+	talloc_set_destructor(ctx, messaging_dgm_context_destructor);
+
+	*presult = result;
+	return NT_STATUS_OK;
+fail_lockfile:
+	/* no destructor is set yet: release the lock file and its fd here */
+	(void)messaging_dgm_lockfile_remove(ctx->cache_dir, pid.pid);
+	close(ctx->lockfile_fd);
+fail:
+	TALLOC_FREE(result);
+	return status;
+}
+
+static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
+{
+	struct server_id pid = messaging_server_id(c->msg_ctx);
+	/* only the process whose pid is in msg_ctx's id removes the lock file */
+	if (getpid() == pid.pid) {
+		(void)messaging_dgm_lockfile_remove(c->cache_dir, pid.pid);
+	}
+	/* lockfile_fd is the fd messaging_dgm_lockfile_create locked at init */
+	close(c->lockfile_fd);
+	/* the result (may be EBUSY) is ignored, the destructor always succeeds */
+	unix_msg_free(c->dgm_ctx);
+	return 0;
+}
+
+static NTSTATUS messaging_dgm_send(struct messaging_context *msg_ctx,
+				   struct server_id pid, int msg_type,
+				   const DATA_BLOB *data,
+				   struct messaging_backend *backend)
+{
+	struct messaging_dgm_context *ctx = talloc_get_type_abort(
+		backend->private_data, struct messaging_dgm_context);
+	fstring pid_str;
+	char buf[PATH_MAX];
+	char *dst_sock, *to_free;
+	struct messaging_dgm_hdr hdr;
+	struct iovec iov[2];
+	ssize_t pathlen;
+	int ret;
+
+	fstr_sprintf(pid_str, "msg/%u", (unsigned)pid.pid);
+
+	pathlen = full_path_tos(ctx->cache_dir, pid_str, buf, sizeof(buf),
+				&dst_sock, &to_free);
+	if (pathlen == -1) {
+		return NT_STATUS_NO_MEMORY;
+	}
+
+	hdr.msg_version = MESSAGE_VERSION;
+	hdr.msg_type = msg_type & MSG_TYPE_MASK;
+	hdr.dst = pid;
+	hdr.src = msg_ctx->id;
+
+	DEBUG(10, ("%s: Sending message 0x%x len %u to %s\n", __func__,
+		   (unsigned)hdr.msg_type, (unsigned)data->length,
+		   server_id_str(talloc_tos(), &pid)));
+
+	iov[0].iov_base = &hdr;
+	iov[0].iov_len = sizeof(hdr);
+	iov[1].iov_base = data->data;
+	iov[1].iov_len = data->length;
+
+	become_root();
+	ret = unix_msg_send(ctx->dgm_ctx, dst_sock, iov, ARRAY_SIZE(iov));
+	unbecome_root();
+
+	TALLOC_FREE(to_free);
+
+	if (ret != 0) {
+		return map_nt_error_from_unix(ret);
+	}
+	return NT_STATUS_OK;
+}
+
+static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
+			       uint8_t *msg, size_t msg_len,
+			       void *private_data)
+{
+	struct messaging_dgm_context *dgm_ctx = talloc_get_type_abort(
+		private_data, struct messaging_dgm_context);
+	struct messaging_dgm_hdr *hdr;
+	struct messaging_rec rec;
+
+	if (msg_len < sizeof(*hdr)) {
+		DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
+		return;
+	}
+
+	/*
+	 * unix_msg guarantees alignment, so we can cast here
+	 */
+	hdr = (struct messaging_dgm_hdr *)msg;
+
+	rec.msg_version = hdr->msg_version;
+	rec.msg_type = hdr->msg_type;
+	rec.dest = hdr->dst;
+	rec.src = hdr->src;
+	rec.buf.data = msg + sizeof(*hdr);
+	rec.buf.length = msg_len - sizeof(*hdr);
+
+	DEBUG(10, ("%s: Received message 0x%x len %u from %s\n", __func__,
+		   (unsigned)hdr->msg_type, (unsigned)rec.buf.length,
+		   server_id_str(talloc_tos(), &rec.src)));
+
+	messaging_dispatch_rec(dgm_ctx->msg_ctx, &rec);
+}
+
+/* Unlink socket and lock file of "pid" if we can take the fcntl lock on
+ * its lock file; on failure nothing is removed and the error is returned */
+NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid)
+{
+	struct messaging_dgm_context *ctx = talloc_get_type_abort(
+		msg_ctx->local->private_data, struct messaging_dgm_context);
+	char *lockfile_name, *socket_name;
+	int fd, ret;
+	struct flock lck = {};
+	NTSTATUS status = NT_STATUS_NO_MEMORY;
+
+	lockfile_name = talloc_asprintf(talloc_tos(), "%s/lck/%u",
+					ctx->cache_dir, (unsigned)pid);
+	if (lockfile_name == NULL) {
+		return status;
+	}
+	/* socket_name hangs off lockfile_name and is freed together with it */
+	socket_name = talloc_asprintf(lockfile_name, "%s/msg/%u",
+				      ctx->cache_dir, (unsigned)pid);
+	if (socket_name == NULL) {
+		goto done;
+	}
+
+	fd = open(lockfile_name, O_NONBLOCK|O_WRONLY, 0);
+	if (fd == -1) {
+		status = map_nt_error_from_unix(errno);
+		DEBUG(10, ("%s: open(%s) failed: %s\n", __func__,
+			   lockfile_name, strerror(errno)));
+		goto done;
+	}
+
+	lck.l_type = F_WRLCK;
+	lck.l_whence = SEEK_SET;
+	lck.l_start = 0;
+	lck.l_len = 0;
+	ret = fcntl(fd, F_SETLK, &lck);
+	if (ret != 0) {
+		status = map_nt_error_from_unix(errno);
+		DEBUG(10, ("%s: Could not get lock: %s\n", __func__,
+			   strerror(errno)));
+	} else {
+		(void)unlink(socket_name);
+		(void)unlink(lockfile_name);
+		status = NT_STATUS_OK;
+	}
+	(void)close(fd);
+done:
+	TALLOC_FREE(lockfile_name);
+	return status;
+}
diff --git a/source3/smbd/server.c b/source3/smbd/server.c
index bc9d293..7c6a905 100644
--- a/source3/smbd/server.c
+++ b/source3/smbd/server.c
@@ -465,6 +465,8 @@ static void remove_child_pid(struct smbd_parent_context *parent,
 	}
 
 	if (unclean_shutdown) {
+		NTSTATUS status;
+
 		/* a child terminated uncleanly so tickle all
 		   processes to see if they can grab any of the
 		   pending locks
@@ -488,6 +490,10 @@ static void remove_child_pid(struct smbd_parent_context *parent,
 		 * terminated uncleanly.
 		 */
 		messaging_cleanup_server(parent->msg_ctx, child_id);
+
+		status = messaging_dgm_cleanup(parent->msg_ctx, pid);
+		DEBUG(10, ("%s: messaging_dgm_cleanup returned %s\n",
+			   __func__, nt_errstr(status)));
 	}
 
 	if (!serverid_deregister(child_id)) {
diff --git a/source3/wscript_build b/source3/wscript_build
index 4d261c6..2ba7a96 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -314,6 +314,7 @@ bld.SAMBA3_SUBSYSTEM('TDB_LIB',
 bld.SAMBA3_SUBSYSTEM('samba3core',
                    source='''lib/messages.c
                    lib/messages_local.c
+                   lib/messages_dgm.c
                    lib/util_cluster.c
                    lib/id_cache.c
                    lib/talloc_dict.c
@@ -352,6 +353,8 @@ bld.SAMBA3_SUBSYSTEM('samba3core',
                         UTIL_PW
                         SAMBA_VERSION
                         PTHREADPOOL
+                        UNIX_MSG
+                        POLL_FUNCS_TEVENT
                         interfaces
                         param
                         dbwrap
-- 
1.7.9.5


>From 150e498797b04661fa9f0ffd32289bb2fcf17ea4 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 24 Feb 2014 13:20:16 +0000
Subject: [PATCH 05/13] lib: Remove messages_local

---
 source3/include/messages.h   |   12 -
 source3/lib/messages.c       |   17 --
 source3/lib/messages_local.c |  573 ------------------------------------------
 source3/smbd/server.c        |   11 -
 source3/wscript_build        |    1 -
 5 files changed, 614 deletions(-)
 delete mode 100644 source3/lib/messages_local.c

diff --git a/source3/include/messages.h b/source3/include/messages.h
index 9437965..b89af9c 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -96,15 +96,6 @@ NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
 			    struct messaging_backend **presult);
 NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid);
 
-NTSTATUS messaging_tdb_init(struct messaging_context *msg_ctx,
-			    TALLOC_CTX *mem_ctx,
-			    struct messaging_backend **presult);
-
-bool messaging_tdb_parent_init(TALLOC_CTX *mem_ctx);
-
-NTSTATUS messaging_tdb_cleanup(struct messaging_context *msg_ctx,
-			struct server_id pid);
-
 NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx,
 			      TALLOC_CTX *mem_ctx,
 			      struct messaging_backend **presult);
@@ -151,9 +142,6 @@ struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
 			struct messaging_rec **presult);
 
-void messaging_cleanup_server(struct messaging_context *msg_ctx,
-				struct server_id pid);
-
 #include "librpc/gen_ndr/ndr_messaging.h"
 
 #endif
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 983fe69..5db1214 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -567,21 +567,4 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 	return;
 }
 
-/*
-  Call when a process has terminated abnormally.
-*/
-void messaging_cleanup_server(struct messaging_context *msg_ctx,
-				struct server_id server)
-{
-	if (server_id_is_disconnected(&server)) {
-		return;
-	}
-
-	if (!procid_is_local(&server)) {
-		return;
-	}
-
-	(void)messaging_tdb_cleanup(msg_ctx, server);
-
-}
 /** @} **/
diff --git a/source3/lib/messages_local.c b/source3/lib/messages_local.c
deleted file mode 100644
index d535df1..0000000
--- a/source3/lib/messages_local.c
+++ /dev/null
@@ -1,573 +0,0 @@
-/* 
-   Unix SMB/CIFS implementation.
-   Samba internal messaging functions
-   Copyright (C) 2007 by Volker Lendecke
-
-   This program is free software; you can redistribute it and/or modify
-   it under the terms of the GNU General Public License as published by
-   the Free Software Foundation; either version 3 of the License, or
-   (at your option) any later version.
-
-   This program is distributed in the hope that it will be useful,
-   but WITHOUT ANY WARRANTY; without even the implied warranty of
-   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-   GNU General Public License for more details.
-
-   You should have received a copy of the GNU General Public License
-   along with this program.  If not, see <http://www.gnu.org/licenses/>.
-*/
-
-/**
-  @defgroup messages Internal messaging framework
-  @{
-  @file messages.c
-
-  @brief  Module for internal messaging between Samba daemons. 
-
-   The idea is that if a part of Samba wants to do communication with
-   another Samba process then it will do a message_register() of a
-   dispatch function, and use message_send_pid() to send messages to
-   that process.
-
-   The dispatch function is given the pid of the sender, and it can
-   use that to reply by message_send_pid().  See ping_message() for a
-   simple example.
-
-   @caution Dispatch functions must be able to cope with incoming
-   messages on an *odd* byte boundary.
-
-   This system doesn't have any inherent size limitations but is not
-   very efficient for large messages or when messages are sent in very
-   quick succession.
-
-*/
-
-#include "includes.h"
-#include "system/filesys.h"
-#include "messages.h"
-#include "serverid.h"
-#include "lib/tdb_wrap/tdb_wrap.h"
-#include "lib/param/param.h"
-
-struct messaging_tdb_context {
-	struct messaging_context *msg_ctx;
-	struct tdb_wrap *tdb;
-	struct tevent_signal *se;
-	int received_messages;
-	bool *have_context;
-};
-
-static NTSTATUS messaging_tdb_send(struct messaging_context *msg_ctx,
-				   struct server_id pid, int msg_type,
-				   const DATA_BLOB *data,
-				   struct messaging_backend *backend);
-static void message_dispatch(struct messaging_context *msg_ctx);
-
-static void messaging_tdb_signal_handler(struct tevent_context *ev_ctx,
-					 struct tevent_signal *se,
-					 int signum, int count,
-					 void *_info, void *private_data)
-{
-	struct messaging_tdb_context *ctx = talloc_get_type(private_data,
-					    struct messaging_tdb_context);
-
-	ctx->received_messages++;
-
-	DEBUG(10, ("messaging_tdb_signal_handler: sig[%d] count[%d] msgs[%d]\n",
-		   signum, count, ctx->received_messages));
-
-	message_dispatch(ctx->msg_ctx);
-}
-
-static int messaging_tdb_context_destructor(struct messaging_tdb_context *ctx);
-
-/****************************************************************************
- Initialise the messaging functions. 
-****************************************************************************/
-
-NTSTATUS messaging_tdb_init(struct messaging_context *msg_ctx,
-			    TALLOC_CTX *mem_ctx,
-			    struct messaging_backend **presult)
-{
-	struct messaging_backend *result;
-	struct messaging_tdb_context *ctx;
-	struct loadparm_context *lp_ctx;
-	static bool have_context = false;
-	const char *fname;
-
-	if (have_context) {
-		DEBUG(0, ("No two messaging contexts per process\n"));
-		return NT_STATUS_OBJECT_NAME_COLLISION;
-	}
-
-	if (!(result = talloc(mem_ctx, struct messaging_backend))) {
-		DEBUG(0, ("talloc failed\n"));
-		return NT_STATUS_NO_MEMORY;
-	}
-
-	lp_ctx = loadparm_init_s3(result, loadparm_s3_helpers());
-	if (lp_ctx == NULL) {
-		DEBUG(0, ("loadparm_init_s3 failed\n"));
-		TALLOC_FREE(result);
-		return NT_STATUS_INTERNAL_ERROR;
-	}
-
-	ctx = talloc_zero(result, struct messaging_tdb_context);
-	if (!ctx) {
-		DEBUG(0, ("talloc failed\n"));
-		TALLOC_FREE(result);
-		return NT_STATUS_NO_MEMORY;
-	}
-	result->private_data = ctx;
-	result->send_fn = messaging_tdb_send;
-
-	ctx->msg_ctx = msg_ctx;
-	ctx->have_context = &have_context;
-
-	fname = lock_path("messages.tdb");
-
-	ctx->tdb = tdb_wrap_open(
-		ctx, fname, lpcfg_tdb_hash_size(lp_ctx, fname),
-		lpcfg_tdb_flags(lp_ctx, TDB_CLEAR_IF_FIRST|TDB_DEFAULT|
-				TDB_VOLATILE| TDB_INCOMPATIBLE_HASH),
-		O_RDWR|O_CREAT,0600);
-
-	talloc_unlink(result, lp_ctx);
-
-	if (!ctx->tdb) {
-		NTSTATUS status = map_nt_error_from_unix(errno);
-		DEBUG(2, ("ERROR: Failed to initialise messages database: "
-			  "%s\n", strerror(errno)));
-		TALLOC_FREE(result);
-		return status;
-	}
-
-	ctx->se = tevent_add_signal(msg_ctx->event_ctx,
-				    ctx,
-				    SIGUSR1, 0,
-				    messaging_tdb_signal_handler,
-				    ctx);
-	if (!ctx->se) {
-		NTSTATUS status = map_nt_error_from_unix(errno);
-		DEBUG(0, ("ERROR: Failed to initialise messages signal handler: "
-			  "%s\n", strerror(errno)));
-		TALLOC_FREE(result);
-		return status;
-	}
-
-	sec_init();
-
-	have_context = true;
-	talloc_set_destructor(ctx, messaging_tdb_context_destructor);
-
-	*presult = result;
-	return NT_STATUS_OK;
-}
-
-static int messaging_tdb_context_destructor(struct messaging_tdb_context *ctx)
-{
-	SMB_ASSERT(*ctx->have_context);
-	*ctx->have_context = false;
-	return 0;
-}
-
-bool messaging_tdb_parent_init(TALLOC_CTX *mem_ctx)
-{
-	struct tdb_wrap *db;
-	struct loadparm_context *lp_ctx;
-	const char *fname;
-
-	lp_ctx = loadparm_init_s3(mem_ctx, loadparm_s3_helpers());
-	if (lp_ctx == NULL) {
-		DEBUG(0, ("loadparm_init_s3 failed\n"));
-		return false;
-	}
-
-	/*
-	 * Open the tdb in the parent process (smbd) so that our
-	 * CLEAR_IF_FIRST optimization in tdb_reopen_all can properly
-	 * work.
-	 */
-
-	fname = lock_path("messages.tdb");
-	db = tdb_wrap_open(
-		mem_ctx, fname, lpcfg_tdb_hash_size(lp_ctx, fname),
-		lpcfg_tdb_flags(lp_ctx, TDB_CLEAR_IF_FIRST|TDB_DEFAULT|
-				TDB_VOLATILE|TDB_INCOMPATIBLE_HASH),
-		O_RDWR|O_CREAT,0600);
-	talloc_unlink(mem_ctx, lp_ctx);
-	if (db == NULL) {
-		DEBUG(1, ("could not open messaging.tdb: %s\n",
-			  strerror(errno)));
-		return false;
-	}
-	return true;
-}
-
-/*******************************************************************
- Form a static tdb key from a pid.
-******************************************************************/
-
-static TDB_DATA message_key_pid(TALLOC_CTX *mem_ctx, struct server_id pid)
-{
-	char *key;
-	TDB_DATA kbuf;
-
-	key = talloc_asprintf(mem_ctx, "PID/%s", procid_str_static(&pid));
-
-	SMB_ASSERT(key != NULL);
-
-	kbuf.dptr = (uint8 *)key;
-	kbuf.dsize = strlen(key)+1;
-	return kbuf;
-}
-
-/*******************************************************************
- Called when a process has terminated abnormally. Remove all messages
- pending for it.
-******************************************************************/
-
-NTSTATUS messaging_tdb_cleanup(struct messaging_context *msg_ctx,
-				struct server_id pid)
-{
-	struct messaging_tdb_context *ctx = talloc_get_type(
-					msg_ctx->local->private_data,
-					struct messaging_tdb_context);
-	struct tdb_wrap *tdb = ctx->tdb;
-	TDB_DATA key;
-	TALLOC_CTX *frame = talloc_stackframe();
-
-	key = message_key_pid(frame, pid);
-	/*
-	 * We have to lock the key to avoid
-	 * races in case the server_id was
-	 * re-used and is active (a remote
-	 * possibility, true). We only
-	 * clean up the database if we
-	 * know server_id doesn't exist
-	 * while checked under the chainlock.
-	 */
-	if (tdb_chainlock(tdb->tdb, key) != 0) {
-		TALLOC_FREE(frame);
-		return NT_STATUS_LOCK_NOT_GRANTED;
-	}
-	if (!serverid_exists(&pid)) {
-		(void)tdb_delete(tdb->tdb, key);
-	}
-	tdb_chainunlock(tdb->tdb, key);
-	TALLOC_FREE(frame);
-	return NT_STATUS_OK;
-}
-
-/*
-  Fetch the messaging array for a process
- */
-
-static NTSTATUS messaging_tdb_fetch(TDB_CONTEXT *msg_tdb,
-				    TDB_DATA key,
-				    TALLOC_CTX *mem_ctx,
-				    struct messaging_array **presult)
-{
-	struct messaging_array *result;
-	TDB_DATA data;
-	DATA_BLOB blob;
-	enum ndr_err_code ndr_err;
-
-	if (!(result = talloc_zero(mem_ctx, struct messaging_array))) {
-		return NT_STATUS_NO_MEMORY;
-	}
-
-	data = tdb_fetch(msg_tdb, key);
-
-	if (data.dptr == NULL) {
-		*presult = result;
-		return NT_STATUS_OK;
-	}
-
-	blob = data_blob_const(data.dptr, data.dsize);
-
-	ndr_err = ndr_pull_struct_blob_all(
-		&blob, result, result,
-		(ndr_pull_flags_fn_t)ndr_pull_messaging_array);
-
-	SAFE_FREE(data.dptr);
-
-	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-		TALLOC_FREE(result);
-		return ndr_map_error2ntstatus(ndr_err);
-	}
-
-	if (DEBUGLEVEL >= 10) {
-		DEBUG(10, ("messaging_tdb_fetch:\n"));
-		NDR_PRINT_DEBUG(messaging_array, result);
-	}
-
-	*presult = result;
-	return NT_STATUS_OK;
-}
-
-/*
-  Store a messaging array for a pid
-*/
-
-static NTSTATUS messaging_tdb_store(TDB_CONTEXT *msg_tdb,
-				    TDB_DATA key,
-				    struct messaging_array *array)
-{
-	TDB_DATA data;
-	DATA_BLOB blob;
-	enum ndr_err_code ndr_err;
-	TALLOC_CTX *mem_ctx;
-	int ret;
-
-	if (array->num_messages == 0) {
-		tdb_delete(msg_tdb, key);
-		return NT_STATUS_OK;
-	}
-
-	if (!(mem_ctx = talloc_new(array))) {
-		return NT_STATUS_NO_MEMORY;
-	}
-
-	ndr_err = ndr_push_struct_blob(&blob, mem_ctx, array,
-		(ndr_push_flags_fn_t)ndr_push_messaging_array);
-
-	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-		talloc_free(mem_ctx);
-		return ndr_map_error2ntstatus(ndr_err);
-	}
-
-	if (DEBUGLEVEL >= 10) {
-		DEBUG(10, ("messaging_tdb_store:\n"));
-		NDR_PRINT_DEBUG(messaging_array, array);
-	}
-
-	data.dptr = blob.data;
-	data.dsize = blob.length;
-
-	ret = tdb_store(msg_tdb, key, data, TDB_REPLACE);
-	TALLOC_FREE(mem_ctx);
-
-	return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION;
-}
-
-/****************************************************************************
- Notify a process that it has a message. If the process doesn't exist 
- then delete its record in the database.
-****************************************************************************/
-
-static NTSTATUS message_notify(struct server_id procid)
-{
-	pid_t pid = procid.pid;
-	int ret;
-	uid_t euid = geteuid();
-
-	/*
-	 * Doing kill with a non-positive pid causes messages to be
-	 * sent to places we don't want.
-	 */
-
-	SMB_ASSERT(pid > 0);
-	if (pid <= 0) {
-		return NT_STATUS_INVALID_HANDLE;
-	}
-
-	if (euid != 0) {
-		/* If we're not root become so to send the message. */
-		save_re_uid();
-		set_effective_uid(0);
-	}
-
-	ret = kill(pid, SIGUSR1);
-
-	if (euid != 0) {
-		/* Go back to who we were. */
-		int saved_errno = errno;
-		restore_re_uid_fromroot();
-		errno = saved_errno;
-	}
-
-	if (ret == 0) {
-		return NT_STATUS_OK;
-	}
-
-	/*
-	 * Something has gone wrong
-	 */
-
-	DEBUG(2,("message to process %d failed - %s\n", (int)pid,
-		 strerror(errno)));
-
-	/*
-	 * No call to map_nt_error_from_unix -- don't want to link in
-	 * errormap.o into lots of utils.
-	 */
-
-	if (errno == ESRCH)  return NT_STATUS_INVALID_HANDLE;
-	if (errno == EINVAL) return NT_STATUS_INVALID_PARAMETER;
-	if (errno == EPERM)  return NT_STATUS_ACCESS_DENIED;
-	return NT_STATUS_UNSUCCESSFUL;
-}
-
-/****************************************************************************
- Send a message to a particular pid.
-****************************************************************************/
-
-static NTSTATUS messaging_tdb_send(struct messaging_context *msg_ctx,
-				   struct server_id pid, int msg_type,
-				   const DATA_BLOB *data,
-				   struct messaging_backend *backend)
-{
-	struct messaging_tdb_context *ctx = talloc_get_type(backend->private_data,
-					    struct messaging_tdb_context);
-	struct messaging_array *msg_array;
-	struct messaging_rec *rec;
-	NTSTATUS status;
-	TDB_DATA key;
-	struct tdb_wrap *tdb = ctx->tdb;
-	TALLOC_CTX *frame = talloc_stackframe();
-
-	/* NULL pointer means implicit length zero. */
-	if (!data->data) {
-		SMB_ASSERT(data->length == 0);
-	}
-
-	/*
-	 * Doing kill with a non-positive pid causes messages to be
-	 * sent to places we don't want.
-	 */
-
-	SMB_ASSERT(procid_to_pid(&pid) > 0);
-
-	key = message_key_pid(frame, pid);
-
-	if (tdb_chainlock(tdb->tdb, key) != 0) {
-		TALLOC_FREE(frame);
-		return NT_STATUS_LOCK_NOT_GRANTED;
-	}
-
-	status = messaging_tdb_fetch(tdb->tdb, key, frame, &msg_array);
-
-	if (!NT_STATUS_IS_OK(status)) {
-		goto done;
-	}
-
-	if ((msg_type & MSG_FLAG_LOWPRIORITY)
-	    && (msg_array->num_messages > 1000)) {
-		DEBUG(5, ("Dropping message for PID %s\n",
-			  procid_str_static(&pid)));
-		status = NT_STATUS_INSUFFICIENT_RESOURCES;
-		goto done;
-	}
-
-	if (!(rec = talloc_realloc(frame, msg_array->messages,
-					 struct messaging_rec,
-					 msg_array->num_messages+1))) {
-		status = NT_STATUS_NO_MEMORY;
-		goto done;
-	}
-
-	rec[msg_array->num_messages].msg_version = MESSAGE_VERSION;
-	rec[msg_array->num_messages].msg_type = msg_type & MSG_TYPE_MASK;
-	rec[msg_array->num_messages].dest = pid;
-	rec[msg_array->num_messages].src = msg_ctx->id;
-	rec[msg_array->num_messages].buf = *data;
-
-	msg_array->messages = rec;
-	msg_array->num_messages += 1;
-
-	status = messaging_tdb_store(tdb->tdb, key, msg_array);
-
-	if (!NT_STATUS_IS_OK(status)) {
-		goto done;
-	}
-
-	status = message_notify(pid);
-
-	if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
-		DEBUG(2, ("pid %s doesn't exist - deleting messages record\n",
-			  procid_str_static(&pid)));
-		tdb_delete(tdb->tdb, message_key_pid(frame, pid));
-	}
-
- done:
-	tdb_chainunlock(tdb->tdb, key);
-	TALLOC_FREE(frame);
-	return status;
-}
-
-/****************************************************************************
- Retrieve all messages for a process.
-****************************************************************************/
-
-static NTSTATUS retrieve_all_messages(TDB_CONTEXT *msg_tdb,
-				      struct server_id id,
-				      TALLOC_CTX *mem_ctx,
-				      struct messaging_array **presult)
-{
-	struct messaging_array *result;
-	TDB_DATA key = message_key_pid(mem_ctx, id);
-	NTSTATUS status;
-
-	if (tdb_chainlock(msg_tdb, key) != 0) {
-		TALLOC_FREE(key.dptr);
-		return NT_STATUS_LOCK_NOT_GRANTED;
-	}
-
-	status = messaging_tdb_fetch(msg_tdb, key, mem_ctx, &result);
-
-	/*
-	 * We delete the record here, tdb_set_max_dead keeps it around
-	 */
-	tdb_delete(msg_tdb, key);
-	tdb_chainunlock(msg_tdb, key);
-
-	if (NT_STATUS_IS_OK(status)) {
-		*presult = result;
-	}
-
-	TALLOC_FREE(key.dptr);
-
-	return status;
-}
-
-/****************************************************************************
- Receive and dispatch any messages pending for this process.
- JRA changed Dec 13 2006. Only one message handler now permitted per type.
- *NOTE*: Dispatch functions must be able to cope with incoming
- messages on an *odd* byte boundary.
-****************************************************************************/
-
-static void message_dispatch(struct messaging_context *msg_ctx)
-{
-	struct messaging_tdb_context *ctx = talloc_get_type(msg_ctx->local->private_data,
-					    struct messaging_tdb_context);
-	struct messaging_array *msg_array = NULL;
-	struct tdb_wrap *tdb = ctx->tdb;
-	NTSTATUS status;
-	uint32 i;
-
-	if (ctx->received_messages == 0) {
-		return;
-	}
-
-	DEBUG(10, ("message_dispatch: received_messages = %d\n",
-		   ctx->received_messages));
-
-	status = retrieve_all_messages(tdb->tdb, msg_ctx->id, NULL, &msg_array);
-	if (!NT_STATUS_IS_OK(status)) {
-		DEBUG(0, ("message_dispatch: failed to retrieve messages: %s\n",
-			   nt_errstr(status)));
-		return;
-	}
-
-	ctx->received_messages = 0;
-
-	for (i=0; i<msg_array->num_messages; i++) {
-		messaging_dispatch_rec(msg_ctx, &msg_array->messages[i]);
-	}
-
-	TALLOC_FREE(msg_array);
-}
-
-/** @} **/
diff --git a/source3/smbd/server.c b/source3/smbd/server.c
index 7c6a905..9fcf254 100644
--- a/source3/smbd/server.c
+++ b/source3/smbd/server.c
@@ -484,13 +484,6 @@ static void remove_child_pid(struct smbd_parent_context *parent,
 			DEBUG(1,("Scheduled cleanup of brl and lock database after unclean shutdown\n"));
 		}
 
-		/*
-		 * Ensure we flush any stored messages
-		 * queued for the child process that
-		 * terminated uncleanly.
-		 */
-		messaging_cleanup_server(parent->msg_ctx, child_id);
-
 		status = messaging_dgm_cleanup(parent->msg_ctx, pid);
 		DEBUG(10, ("%s: messaging_dgm_cleanup returned %s\n",
 			   __func__, nt_errstr(status)));
@@ -1457,10 +1450,6 @@ extern void build_options(bool screen);
 	if (!locking_init())
 		exit(1);
 
-	if (!messaging_tdb_parent_init(ev_ctx)) {
-		exit(1);
-	}
-
 	if (!smbd_parent_notify_init(NULL, msg_ctx, ev_ctx)) {
 		exit(1);
 	}
diff --git a/source3/wscript_build b/source3/wscript_build
index 2ba7a96..4f5661f 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -313,7 +313,6 @@ bld.SAMBA3_SUBSYSTEM('TDB_LIB',
 
 bld.SAMBA3_SUBSYSTEM('samba3core',
                    source='''lib/messages.c
-                   lib/messages_local.c
                    lib/messages_dgm.c
                    lib/util_cluster.c
                    lib/id_cache.c
-- 
1.7.9.5


>From 342b46b36bb3b8ccef0b0fc845bd3f9798f441b8 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 4 Apr 2014 15:00:16 +0000
Subject: [PATCH 06/13] smbd: Add a timestamp to queued notify events

In a cluster, and with the changed messaging, it can happen that messages are
scheduled after new SMB requests. This re-ordering breaks a few notify tests.
This patch starts the infrastructure for adding timestamps to notify events, so
that they can be sorted before they are sent out. The timestamp will be the
local time at which notify_fname is called; that is all we can do.
---
 source3/smbd/notify.c |   21 ++++++++++++++-------
 1 file changed, 14 insertions(+), 7 deletions(-)

diff --git a/source3/smbd/notify.c b/source3/smbd/notify.c
index c19982a..6a4b52c 100644
--- a/source3/smbd/notify.c
+++ b/source3/smbd/notify.c
@@ -24,6 +24,12 @@
 #include "smbd/globals.h"
 #include "../librpc/gen_ndr/ndr_notify.h"
 
+struct notify_change_event {
+	struct timespec when;
+	uint32_t action;
+	const char *name;
+};
+
 struct notify_change_buf {
 	/*
 	 * If no requests are pending, changes are queued here. Simple array,
@@ -35,7 +41,7 @@ struct notify_change_buf {
 	 * asked we just return NT_STATUS_OK without specific changes.
 	 */
 	int num_changes;
-	struct notify_change *changes;
+	struct notify_change_event *changes;
 
 	/*
 	 * If no changes are around requests are queued here. Using a linked
@@ -87,8 +93,8 @@ struct notify_mid_map {
 	uint64_t mid;
 };
 
-static bool notify_change_record_identical(struct notify_change *c1,
-					struct notify_change *c2)
+static bool notify_change_record_identical(struct notify_change_event *c1,
+					   struct notify_change_event *c2)
 {
 	/* Note this is deliberately case sensitive. */
 	if (c1->action == c2->action &&
@@ -100,7 +106,7 @@ static bool notify_change_record_identical(struct notify_change *c1,
 
 static bool notify_marshall_changes(int num_changes,
 				uint32 max_offset,
-				struct notify_change *changes,
+				struct notify_change_event *changes,
 				DATA_BLOB *final_blob)
 {
 	int i;
@@ -111,7 +117,7 @@ static bool notify_marshall_changes(int num_changes,
 
 	for (i=0; i<num_changes; i++) {
 		enum ndr_err_code ndr_err;
-		struct notify_change *c;
+		struct notify_change_event *c;
 		struct FILE_NOTIFY_INFORMATION m;
 		DATA_BLOB blob;
 
@@ -437,7 +443,7 @@ void notify_fname(connection_struct *conn, uint32 action, uint32 filter,
 
 static void notify_fsp(files_struct *fsp, uint32 action, const char *name)
 {
-	struct notify_change *change, *changes;
+	struct notify_change_event *change, *changes;
 	char *tmp;
 
 	if (fsp->notify == NULL) {
@@ -483,7 +489,8 @@ static void notify_fsp(files_struct *fsp, uint32 action, const char *name)
 
 	if (!(changes = talloc_realloc(
 		      fsp->notify, fsp->notify->changes,
-		      struct notify_change, fsp->notify->num_changes+1))) {
+		      struct notify_change_event,
+		      fsp->notify->num_changes+1))) {
 		DEBUG(0, ("talloc_realloc failed\n"));
 		return;
 	}
-- 
1.7.9.5


>From eb45fade2cf9121b1450d90322522458a46f7564 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 4 Apr 2014 15:03:44 +0000
Subject: [PATCH 07/13] smbd: Pass timespec_current to notify_fsp

---
 source3/smbd/notify.c |   11 +++++++----
 1 file changed, 7 insertions(+), 4 deletions(-)

diff --git a/source3/smbd/notify.c b/source3/smbd/notify.c
index 6a4b52c..693418a 100644
--- a/source3/smbd/notify.c
+++ b/source3/smbd/notify.c
@@ -63,7 +63,8 @@ struct notify_change_request {
 	void *backend_data;
 };
 
-static void notify_fsp(files_struct *fsp, uint32 action, const char *name);
+static void notify_fsp(files_struct *fsp, struct timespec when,
+		       uint32 action, const char *name);
 
 bool change_notify_fsp_has_changes(struct files_struct *fsp)
 {
@@ -214,7 +215,7 @@ static void notify_callback(void *private_data, const struct notify_event *e)
 {
 	files_struct *fsp = (files_struct *)private_data;
 	DEBUG(10, ("notify_callback called for %s\n", fsp_str_dbg(fsp)));
-	notify_fsp(fsp, e->action, e->path);
+	notify_fsp(fsp, timespec_current(), e->action, e->path);
 }
 
 static void sys_notify_callback(struct sys_notify_context *ctx,
@@ -223,7 +224,7 @@ static void sys_notify_callback(struct sys_notify_context *ctx,
 {
 	files_struct *fsp = (files_struct *)private_data;
 	DEBUG(10, ("sys_notify_callback called for %s\n", fsp_str_dbg(fsp)));
-	notify_fsp(fsp, e->action, e->path);
+	notify_fsp(fsp, timespec_current(), e->action, e->path);
 }
 
 NTSTATUS change_notify_create(struct files_struct *fsp, uint32 filter,
@@ -441,7 +442,8 @@ void notify_fname(connection_struct *conn, uint32 action, uint32 filter,
 	TALLOC_FREE(to_free);
 }
 
-static void notify_fsp(files_struct *fsp, uint32 action, const char *name)
+static void notify_fsp(files_struct *fsp, struct timespec when,
+		       uint32 action, const char *name)
 {
 	struct notify_change_event *change, *changes;
 	char *tmp;
@@ -507,6 +509,7 @@ static void notify_fsp(files_struct *fsp, uint32 action, const char *name)
 	string_replace(tmp, '/', '\\');
 	change->name = tmp;	
 
+	change->when = when;
 	change->action = action;
 	fsp->notify->num_changes += 1;
 
-- 
1.7.9.5


>From 5811933f906a12a24e8219a0e7015b1f8bbdbb90 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 4 Apr 2014 15:11:51 +0000
Subject: [PATCH 08/13] smbd: Pass timespec_current through the
 notify_callback

---
 source3/smbd/notify.c          |    5 +++--
 source3/smbd/notify_internal.c |    8 +++++---
 source3/smbd/proto.h           |    3 ++-
 3 files changed, 10 insertions(+), 6 deletions(-)

diff --git a/source3/smbd/notify.c b/source3/smbd/notify.c
index 693418a..b8085dd 100644
--- a/source3/smbd/notify.c
+++ b/source3/smbd/notify.c
@@ -211,11 +211,12 @@ void change_notify_reply(struct smb_request *req,
 	notify_buf->num_changes = 0;
 }
 
-static void notify_callback(void *private_data, const struct notify_event *e)
+static void notify_callback(void *private_data, struct timespec when,
+			    const struct notify_event *e)
 {
 	files_struct *fsp = (files_struct *)private_data;
 	DEBUG(10, ("notify_callback called for %s\n", fsp_str_dbg(fsp)));
-	notify_fsp(fsp, timespec_current(), e->action, e->path);
+	notify_fsp(fsp, when, e->action, e->path);
 }
 
 static void sys_notify_callback(struct sys_notify_context *ctx,
diff --git a/source3/smbd/notify_internal.c b/source3/smbd/notify_internal.c
index 4d88565..69f9377 100644
--- a/source3/smbd/notify_internal.c
+++ b/source3/smbd/notify_internal.c
@@ -44,7 +44,7 @@
 struct notify_list {
 	struct notify_list *next, *prev;
 	const char *path;
-	void (*callback)(void *, const struct notify_event *);
+	void (*callback)(void *, struct timespec, const struct notify_event *);
 	void *private_data;
 };
 
@@ -194,7 +194,8 @@ static int notify_context_destructor(struct notify_context *notify)
 
 NTSTATUS notify_add(struct notify_context *notify,
 		    const char *path, uint32_t filter, uint32_t subdir_filter,
-		    void (*callback)(void *, const struct notify_event *),
+		    void (*callback)(void *, struct timespec,
+				     const struct notify_event *),
 		    void *private_data)
 {
 	struct notify_db_entry e;
@@ -820,7 +821,8 @@ static void notify_handler(struct messaging_context *msg_ctx,
 
 	for (listel=notify->list;listel;listel=listel->next) {
 		if (listel->private_data == n->private_data) {
-			listel->callback(listel->private_data, n);
+			listel->callback(listel->private_data,
+					 timespec_current(), n);
 			break;
 		}
 	}
diff --git a/source3/smbd/proto.h b/source3/smbd/proto.h
index d9b86b6..bc15f15 100644
--- a/source3/smbd/proto.h
+++ b/source3/smbd/proto.h
@@ -552,7 +552,8 @@ struct notify_context *notify_init(TALLOC_CTX *mem_ctx,
 				   struct tevent_context *ev);
 NTSTATUS notify_add(struct notify_context *notify,
 		    const char *path, uint32_t filter, uint32_t subdir_filter,
-		    void (*callback)(void *, const struct notify_event *),
+		    void (*callback)(void *, struct timespec,
+				     const struct notify_event *),
 		    void *private_data);
 NTSTATUS notify_remove(struct notify_context *notify, void *private_data);
 void notify_trigger(struct notify_context *notify,
-- 
1.7.9.5


>From 89197afa5e3763dd41f34e100a9b78794dd82da3 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sun, 2 Mar 2014 18:34:53 +0100
Subject: [PATCH 09/13] lib: Introduce iov_buflen

.. with overflow protection

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/include/proto.h |    1 +
 source3/lib/util_sock.c |   28 +++++++++++++++++++++++-----
 2 files changed, 24 insertions(+), 5 deletions(-)

diff --git a/source3/include/proto.h b/source3/include/proto.h
index 857be60..80dcd8c 100644
--- a/source3/include/proto.h
+++ b/source3/include/proto.h
@@ -582,6 +582,7 @@ NTSTATUS read_fd_with_timeout(int fd, char *buf,
 				  size_t *size_ret);
 NTSTATUS read_data(int fd, char *buffer, size_t N);
 ssize_t write_data(int fd, const char *buffer, size_t N);
+ssize_t iov_buflen(const struct iovec *iov, int iovlen);
 ssize_t write_data_iov(int fd, const struct iovec *orig_iov, int iovcnt);
 bool send_keepalive(int client);
 NTSTATUS read_smb_length_return_keepalive(int fd, char *inbuf,
diff --git a/source3/lib/util_sock.c b/source3/lib/util_sock.c
index 12e4ccd..b8149ca 100644
--- a/source3/lib/util_sock.c
+++ b/source3/lib/util_sock.c
@@ -201,6 +201,24 @@ NTSTATUS read_data(int fd, char *buffer, size_t N)
 	return read_fd_with_timeout(fd, buffer, N, N, 0, NULL);
 }
 
+ssize_t iov_buflen(const struct iovec *iov, int iovcnt)
+{
+	size_t buflen = 0;
+	int i;
+
+	for (i=0; i<iovcnt; i++) {
+		size_t thislen = iov[i].iov_len;
+		size_t tmp = buflen + thislen;
+
+		if ((tmp < buflen) || (tmp < thislen)) {
+			/* overflow */
+			return -1;
+		}
+		buflen = tmp;
+	}
+	return buflen;
+}
+
 /****************************************************************************
  Write all data from an iov array
  NB. This can be called with a non-socket fd, don't add dependencies
@@ -209,15 +227,15 @@ NTSTATUS read_data(int fd, char *buffer, size_t N)
 
 ssize_t write_data_iov(int fd, const struct iovec *orig_iov, int iovcnt)
 {
-	int i;
-	size_t to_send;
+	ssize_t to_send;
 	ssize_t thistime;
 	size_t sent;
 	struct iovec *iov_copy, *iov;
 
-	to_send = 0;
-	for (i=0; i<iovcnt; i++) {
-		to_send += orig_iov[i].iov_len;
+	to_send = iov_buflen(orig_iov, iovcnt);
+	if (to_send == -1) {
+		errno = EINVAL;
+		return -1;
 	}
 
 	thistime = sys_writev(fd, orig_iov, iovcnt);
-- 
1.7.9.5


>From 41e6741ca678c90ac8c53f7c996feaecd68d95e5 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sun, 2 Mar 2014 19:33:08 +0100
Subject: [PATCH 10/13] lib: Add iov_buf

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/include/proto.h |    1 +
 source3/lib/util_sock.c |   25 +++++++++++++++++++++++++
 2 files changed, 26 insertions(+)

diff --git a/source3/include/proto.h b/source3/include/proto.h
index 80dcd8c..7cedb18 100644
--- a/source3/include/proto.h
+++ b/source3/include/proto.h
@@ -583,6 +583,7 @@ NTSTATUS read_fd_with_timeout(int fd, char *buf,
 NTSTATUS read_data(int fd, char *buffer, size_t N);
 ssize_t write_data(int fd, const char *buffer, size_t N);
 ssize_t iov_buflen(const struct iovec *iov, int iovlen);
+uint8_t *iov_buf(TALLOC_CTX *mem_ctx, const struct iovec *iov, int iovcnt);
 ssize_t write_data_iov(int fd, const struct iovec *orig_iov, int iovcnt);
 bool send_keepalive(int client);
 NTSTATUS read_smb_length_return_keepalive(int fd, char *inbuf,
diff --git a/source3/lib/util_sock.c b/source3/lib/util_sock.c
index b8149ca..c5de61a 100644
--- a/source3/lib/util_sock.c
+++ b/source3/lib/util_sock.c
@@ -219,6 +219,31 @@ ssize_t iov_buflen(const struct iovec *iov, int iovcnt)
 	return buflen;
 }
 
+uint8_t *iov_buf(TALLOC_CTX *mem_ctx, const struct iovec *iov, int iovcnt)
+{
+	int i;
+	ssize_t buflen;
+	uint8_t *buf, *p;
+
+	buflen = iov_buflen(iov, iovcnt);
+	if (buflen == -1) {
+		return NULL;
+	}
+	buf = talloc_array(mem_ctx, uint8_t, buflen);
+	if (buf == NULL) {
+		return NULL;
+	}
+
+	p = buf;
+	for (i=0; i<iovcnt; i++) {
+		size_t len = iov[i].iov_len;
+
+		memcpy(p, iov[i].iov_base, len);
+		p += len;
+	}
+	return buf;
+}
+
 /****************************************************************************
  Write all data from an iov array
  NB. This can be called with a non-socket fd, don't add dependencies
-- 
1.7.9.5


>From 8f08de8c711a1a967454befb3619d32a1a14af38 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Tue, 25 Feb 2014 12:15:58 +0000
Subject: [PATCH 11/13] messaging3: Add messaging_send_iov

This currently copies the iovec into a single buffer. The copy will be
replaced by passing the iovec directly through to sendmsg on the unix domain
socket.
---
 source3/include/messages.h |    3 +++
 source3/lib/messages.c     |   19 +++++++++++++++++++
 2 files changed, 22 insertions(+)

diff --git a/source3/include/messages.h b/source3/include/messages.h
index b89af9c..3e1b664 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -132,6 +132,9 @@ NTSTATUS messaging_send(struct messaging_context *msg_ctx,
 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
 			    struct server_id server, uint32_t msg_type,
 			    const uint8_t *buf, size_t len);
+NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
+			    struct server_id server, uint32_t msg_type,
+			    const struct iovec *iov, int iovlen);
 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 			    struct messaging_rec *rec);
 
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 5db1214..b4ed807 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -419,6 +419,25 @@ NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
 	return messaging_send(msg_ctx, server, msg_type, &blob);
 }
 
+/* Send an iovec as one message by flattening it into a temporary buffer */
+NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
+			    struct server_id server, uint32_t msg_type,
+			    const struct iovec *iov, int iovlen)
+{
+	NTSTATUS result;
+	uint8_t *flat = iov_buf(talloc_tos(), iov, iovlen);
+
+	if (flat == NULL) {
+		/* iov_buf() failed; reported to the caller as out of memory */
+		return NT_STATUS_NO_MEMORY;
+	}
+
+	result = messaging_send_buf(msg_ctx, server, msg_type,
+				    flat, talloc_get_size(flat));
+	TALLOC_FREE(flat);
+	return result;
+}
+
 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
 					       struct messaging_rec *rec)
 {
-- 
1.7.9.5


>From 89e923c88f794980a57db625cde7536beeb6e2b1 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 4 Apr 2014 21:01:01 +0200
Subject: [PATCH 12/13] smbd: Pass on a timestamp in MSG_PVFS_NOTIFY

---
 source3/smbd/notify_internal.c |   67 ++++++++++++++++++++--------------------
 1 file changed, 33 insertions(+), 34 deletions(-)

diff --git a/source3/smbd/notify_internal.c b/source3/smbd/notify_internal.c
index 69f9377..e902bf4 100644
--- a/source3/smbd/notify_internal.c
+++ b/source3/smbd/notify_internal.c
@@ -767,30 +767,32 @@ done:
 	TALLOC_FREE(data.dptr);
 }
 
+struct notify_msg {	/* payload of a MSG_PVFS_NOTIFY message */
+	struct timespec when;	/* set from timespec_current() by notify_send */
+	void *private_data;	/* matched against listel->private_data on receipt */
+	uint32_t action;	/* copied into notify_event.action by the handler */
+	char path[1];		/* first byte of the 0-terminated path that follows */
+};
+
 static NTSTATUS notify_send(struct notify_context *notify,
 			    struct server_id *pid,
 			    const char *path, uint32_t action,
 			    void *private_data)
 {
-	struct notify_event ev;
-	DATA_BLOB data;
-	NTSTATUS status;
-	enum ndr_err_code ndr_err;
+	struct notify_msg m = { .action = action };
+	struct iovec iov[2];
 
-	ev.action = action;
-	ev.path = path;
-	ev.private_data = private_data;
+	/* All other members start zeroed; path goes out via iov[1] below */
+	m.when = timespec_current();
+	m.private_data = private_data;
 
-	ndr_err = ndr_push_struct_blob(
-		&data, talloc_tos(), &ev,
-		(ndr_push_flags_fn_t)ndr_push_notify_event);
-	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-		return ndr_map_error2ntstatus(ndr_err);
-	}
-	status = messaging_send(notify->msg, *pid, MSG_PVFS_NOTIFY,
-				&data);
-	TALLOC_FREE(data.data);
-	return status;
+	iov[0].iov_base = &m;
+	iov[0].iov_len = offsetof(struct notify_msg, path);
+	iov[1].iov_base = discard_const_p(char, path);
+	iov[1].iov_len = strlen(path)+1;
+
+	return messaging_send_iov(notify->msg, *pid, MSG_PVFS_NOTIFY,
+				  iov, ARRAY_SIZE(iov));
 }
 
 static void notify_handler(struct messaging_context *msg_ctx,
@@ -799,34 +801,31 @@ static void notify_handler(struct messaging_context *msg_ctx,
 {
 	struct notify_context *notify = talloc_get_type_abort(
 		private_data, struct notify_context);
-	enum ndr_err_code ndr_err;
-	struct notify_event *n;
+	struct notify_msg *m;
+	struct notify_event e;
 	struct notify_list *listel;
 
-	n = talloc(talloc_tos(), struct notify_event);
-	if (n == NULL) {
-		DEBUG(1, ("talloc failed\n"));
+	if (data->length < offsetof(struct notify_msg, path) + 1) {
+		DEBUG(1, ("%s: Got short MSG_PVFS_NOTIFY msg\n", __func__));
 		return;
 	}
-
-	ndr_err = ndr_pull_struct_blob(
-		data, n, n, (ndr_pull_flags_fn_t)ndr_pull_notify_event);
-	if (!NDR_ERR_CODE_IS_SUCCESS(ndr_err)) {
-		TALLOC_FREE(n);
+	if (data->data[data->length-1] != 0) {
+		DEBUG(1, ("%s: MSG_PVFS_NOTIFY path not 0-terminated\n",
+			  __func__));
 		return;
 	}
-	if (DEBUGLEVEL >= 10) {
-		NDR_PRINT_DEBUG(notify_event, n);
-	}
+	/* NOTE(review): assumes data->data is aligned for struct notify_msg */
+	m = (struct notify_msg *)data->data;
+	e.action = m->action;
+	e.path = m->path;
+	e.private_data = m->private_data;
 
 	for (listel=notify->list;listel;listel=listel->next) {
-		if (listel->private_data == n->private_data) {
-			listel->callback(listel->private_data,
-					 timespec_current(), n);
+		if (listel->private_data == m->private_data) {
+			listel->callback(listel->private_data, m->when, &e);
 			break;
 		}
 	}
-	TALLOC_FREE(n);
 }
 
 struct notify_walk_idx_state {
-- 
1.7.9.5


>From 61e1cbd96faf877b4564a665e48a57f4c6f69845 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 4 Apr 2014 21:12:06 +0200
Subject: [PATCH 13/13] smbd: Sort notify events by timestamp

---
 source3/smbd/notify.c |   16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/source3/smbd/notify.c b/source3/smbd/notify.c
index b8085dd..dd4dc1a 100644
--- a/source3/smbd/notify.c
+++ b/source3/smbd/notify.c
@@ -170,6 +170,14 @@ static bool notify_marshall_changes(int num_changes,
 	return True;
 }
 
+/* qsort() comparator: order notify_change_event entries by their timestamp */
+static int compare_notify_change_events(const void *p1, const void *p2)
+{
+	const struct notify_change_event *lhs = p1, *rhs = p2;
+
+	return timespec_compare(&lhs->when, &rhs->when);
+}
+
 /****************************************************************************
  Setup the common parts of the return packet and send it.
 *****************************************************************************/
@@ -194,6 +202,14 @@ void change_notify_reply(struct smb_request *req,
 		return;
 	}
 
+	/*
+	 * Sort the notifies by timestamp when the event happened to avoid
+	 * coalescing and thus dropping events in notify_marshall_changes.
+	 */
+
+	qsort(notify_buf->changes, notify_buf->num_changes,
+	      sizeof(*(notify_buf->changes)), compare_notify_change_events);
+
 	if (!notify_marshall_changes(notify_buf->num_changes, max_param,
 					notify_buf->changes, &blob)) {
 		/*
-- 
1.7.9.5



More information about the samba-technical mailing list