samba_spnupdate invoked oom-killer ( samba BUG )

Volker Lendecke Volker.Lendecke at SerNet.DE
Sat Oct 4 03:50:21 MDT 2014


On Thu, Oct 02, 2014 at 06:56:04AM +1300, Andrew Bartlett wrote:
> On Wed, 2014-10-01 at 17:35 +0200, Volker Lendecke wrote:
> > On Wed, Oct 01, 2014 at 08:33:13AM -0700, Jeremy Allison wrote:
> > > On Wed, Oct 01, 2014 at 06:44:51AM +0200, Volker Lendecke wrote:
> > > > On Tue, Sep 30, 2014 at 11:46:24AM -0700, Richard Sharpe wrote:
> > > > > On Tue, Sep 30, 2014 at 10:24 AM, Chan Min Wai <dcmwai at gmail.com> wrote:
> > > > > > Hi Richard,
> > > > > >
> > > > > > I think smbcontrol didn't work on samba daemon (the AD DC daemon)
> > > > > 
> > > > > Ahhh, this is an AD DC problem. My experience is only with the file
> > > > > server side of the code. I think you need other people to check in
> > > > > here to figure this out.
> > > > > 
> > > > > I seems like there is a leak. I would increase the log level to see if
> > > > > I can see anything that is happening over and over again ...
> > > > 
> > > > FYI: In the last weeks I've got smbcontrol working against the DC,
> > > > so in the near future we'll have the infrastructure to do a debug or
> > > > pool-usage message against the DC.
> > > 
> > > W00t! Great work Volker, thanks. This is something that
> > > has been sorely needed and will greatly increse reliability.
> > 
> > Well, if I only had time now to get this upstream :-(
> 
> Is this something I can help with?

You might want to take a look at the attached patchset. This
is what I've dug out of my huge messaging mess. It compiles,
but right now I don't have the time to even do some simple
tests. At least that's the direction where I want to go
eventually.

Maybe you already have some comments.

Thanks,

Volker


Maybe you already have some comments.

Thanks,

Volker

-- 
SerNet GmbH, Bahnhofsallee 1b, 37081 Göttingen
phone: +49-551-370000-0, fax: +49-551-370000-9
AG Göttingen, HRB 2816, GF: Dr. Johannes Loxen
http://www.sernet.de, mailto:kontakt at sernet.de
-------------- next part --------------
From 235e500bf32037aabe657f9ef7451c93378faad5 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sat, 4 Oct 2014 10:40:24 +0200
Subject: [PATCH 01/11] messages_dgm: Only pass "unique" to messaging_dgm_init

Only one context per pid, messaging_dgm_init can do getpid() itself.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages.c     |    4 ++--
 source3/lib/messages_dgm.c |   10 +++++-----
 source3/lib/messages_dgm.h |    2 +-
 3 files changed, 8 insertions(+), 8 deletions(-)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index aaaee52..017f6f5 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -287,7 +287,7 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
 
 	sec_init();
 
-	ret = messaging_dgm_init(ctx->event_ctx, ctx->id,
+	ret = messaging_dgm_init(ctx->event_ctx, ctx->id.unique_id,
 				 lp_cache_directory(), sec_initial_uid(),
 				 messaging_recv_cb, ctx);
 
@@ -339,7 +339,7 @@ NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
 
 	msg_ctx->id = procid_self();
 
-	ret = messaging_dgm_init(msg_ctx->event_ctx, msg_ctx->id,
+	ret = messaging_dgm_init(msg_ctx->event_ctx, msg_ctx->id.unique_id,
 				 lp_cache_directory(), sec_initial_uid(),
 				 messaging_recv_cb, msg_ctx);
 	if (ret != 0) {
diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index ae35282..8f5ac4d 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -176,7 +176,7 @@ static int messaging_dgm_lockfile_remove(const char *cache_dir, pid_t pid)
 }
 
 int messaging_dgm_init(struct tevent_context *ev,
-		       struct server_id pid,
+		       uint64_t unique,
 		       const char *cache_dir,
 		       uid_t dir_owner,
 		       void (*recv_cb)(const uint8_t *msg,
@@ -203,7 +203,7 @@ int messaging_dgm_init(struct tevent_context *ev,
 	if (ctx == NULL) {
 		goto fail_nomem;
 	}
-	ctx->pid = pid.pid;
+	ctx->pid = getpid();
 	ctx->recv_cb = recv_cb;
 	ctx->recv_cb_private_data = recv_cb_private_data;
 
@@ -220,14 +220,14 @@ int messaging_dgm_init(struct tevent_context *ev,
 	socket_address = (struct sockaddr_un) { .sun_family = AF_UNIX };
 	sockname_len = snprintf(socket_address.sun_path,
 				sizeof(socket_address.sun_path),
-				"%s/%u", socket_dir.buf, (unsigned)pid.pid);
+				"%s/%u", socket_dir.buf, (unsigned)ctx->pid);
 	if (sockname_len >= sizeof(socket_address.sun_path)) {
 		TALLOC_FREE(ctx);
 		return ENAMETOOLONG;
 	}
 
-	ret = messaging_dgm_lockfile_create(cache_dir, dir_owner, pid.pid,
-					    &ctx->lockfile_fd, pid.unique_id);
+	ret = messaging_dgm_lockfile_create(cache_dir, dir_owner, ctx->pid,
+					    &ctx->lockfile_fd, unique);
 	if (ret != 0) {
 		DEBUG(1, ("%s: messaging_dgm_create_lockfile failed: %s\n",
 			  __func__, strerror(ret)));
diff --git a/source3/lib/messages_dgm.h b/source3/lib/messages_dgm.h
index 00ff56f..9d01976 100644
--- a/source3/lib/messages_dgm.h
+++ b/source3/lib/messages_dgm.h
@@ -21,7 +21,7 @@
 #define _MESSAGES_DGM_H_
 
 int messaging_dgm_init(struct tevent_context *ev,
-		       struct server_id pid,
+		       uint64_t unique,
 		       const char *cache_dir,
 		       uid_t dir_owner,
 		       void (*recv_cb)(const uint8_t *msg,
-- 
1.7.9.5


From b7356d20f4771d8697ccdbdeb5adac151e2a7da5 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sat, 4 Oct 2014 10:58:15 +0200
Subject: [PATCH 02/11] messaging_dgm: Move directory handling up

When we want to merge with source4, we need to find better places for the lock
and socket directories. Source4 does not have the concept of a cache directory.
So I chose "private dir"/sock and "lock dir"/msg as subdirectories.
---
 source3/lib/messages.c     |   38 ++++++++++-
 source3/lib/messages_dgm.c |  160 +++++++++++++++++---------------------------
 source3/lib/messages_dgm.h |    4 +-
 3 files changed, 98 insertions(+), 104 deletions(-)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 017f6f5..b3debe7 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -271,12 +271,20 @@ static int messaging_context_destructor(struct messaging_context *ctx)
 	return 0;
 }
 
+static const char *private_path(const char *name)
+{
+	return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
+}
+
 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
 					 struct tevent_context *ev)
 {
 	struct messaging_context *ctx;
 	NTSTATUS status;
 	int ret;
+	const char *lck_path;
+	const char *priv_path;
+	bool ok;
 
 	if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
 		return NULL;
@@ -287,8 +295,34 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
 
 	sec_init();
 
+	lck_path = lock_path("msg");
+	if (lck_path == NULL) {
+		TALLOC_FREE(ctx);
+		return NULL;
+	}
+
+	ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
+					      0755);
+	if (!ok) {
+		DEBUG(10, ("%s: Could not create lock directory: %s\n",
+			   __func__, strerror(errno)));
+		TALLOC_FREE(ctx);
+		return NULL;
+	}
+
+	priv_path = private_path("sock");
+
+	ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
+					      0700);
+	if (!ok) {
+		DEBUG(10, ("%s: Could not create msg directory: %s\n",
+			   __func__, strerror(errno)));
+		TALLOC_FREE(ctx);
+		return NULL;
+	}
+
 	ret = messaging_dgm_init(ctx->event_ctx, ctx->id.unique_id,
-				 lp_cache_directory(), sec_initial_uid(),
+				 priv_path, lck_path,
 				 messaging_recv_cb, ctx);
 
 	if (ret != 0) {
@@ -340,7 +374,7 @@ NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
 	msg_ctx->id = procid_self();
 
 	ret = messaging_dgm_init(msg_ctx->event_ctx, msg_ctx->id.unique_id,
-				 lp_cache_directory(), sec_initial_uid(),
+				 private_path("sock"), lock_path("msg"),
 				 messaging_recv_cb, msg_ctx);
 	if (ret != 0) {
 		DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index 8f5ac4d..b64b2b0 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -39,7 +39,8 @@ struct messaging_dgm_context {
 	struct poll_funcs *msg_callbacks;
 	void *tevent_handle;
 	struct unix_msg_ctx *dgm_ctx;
-	struct sun_path_buf cache_dir;
+	struct sun_path_buf socket_dir;
+	struct sun_path_buf lockfile_dir;
 	int lockfile_fd;
 
 	void (*recv_cb)(const uint8_t *msg,
@@ -59,49 +60,23 @@ static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
 			       int *fds, size_t num_fds,
 			       void *private_data);
 
-static int messaging_dgm_lockfile_name(struct sun_path_buf *buf,
-				       const char *cache_dir,
-				       pid_t pid)
-{
-	int ret;
-
-	ret = snprintf(buf->buf, sizeof(buf->buf), "%s/lck/%u", cache_dir,
-		       (unsigned)pid);
-	if (ret >= sizeof(buf->buf)) {
-		return ENAMETOOLONG;
-	}
-	return 0;
-}
-
 static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
 
-static int messaging_dgm_lockfile_create(const char *cache_dir,
-					 uid_t dir_owner, pid_t pid,
-					 int *plockfile_fd, uint64_t unique)
+static int messaging_dgm_lockfile_create(struct messaging_dgm_context *ctx,
+					 pid_t pid, int *plockfile_fd,
+					 uint64_t unique)
 {
 	fstring buf;
-	struct sun_path_buf dir;
-	struct sun_path_buf lockfile_name;
 	int lockfile_fd;
+	struct sun_path_buf lockfile_name;
 	struct flock lck;
 	int unique_len, ret;
 	ssize_t written;
-	bool ok;
-
-	ret = messaging_dgm_lockfile_name(&lockfile_name, cache_dir, pid);
-	if (ret != 0) {
-		return ret;
-	}
 
-	/* shorter than lockfile_name, can't overflow */
-	snprintf(dir.buf, sizeof(dir.buf), "%s/lck", cache_dir);
-
-	ok = directory_create_or_exist_strict(dir.buf, dir_owner, 0755);
-	if (!ok) {
-		ret = errno;
-		DEBUG(1, ("%s: Could not create lock directory: %s\n",
-			  __func__, strerror(ret)));
-		return ret;
+	ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
+		       "%s/%u", ctx->lockfile_dir.buf, (int)pid);
+	if (ret >= sizeof(lockfile_name.buf)) {
+		return ENAMETOOLONG;
 	}
 
 	/* no O_EXCL, existence check is via the fcntl lock */
@@ -155,30 +130,10 @@ fail_close:
 	return ret;
 }
 
-static int messaging_dgm_lockfile_remove(const char *cache_dir, pid_t pid)
-{
-	struct sun_path_buf lockfile_name;
-	int ret;
-
-	ret = messaging_dgm_lockfile_name(&lockfile_name, cache_dir, pid);
-	if (ret != 0) {
-		return ret;
-	}
-
-	ret = unlink(lockfile_name.buf);
-	if (ret == -1) {
-		ret = errno;
-		DEBUG(10, ("%s: unlink(%s) failed: %s\n", __func__,
-			   lockfile_name.buf, strerror(ret)));
-	}
-
-	return ret;
-}
-
 int messaging_dgm_init(struct tevent_context *ev,
 		       uint64_t unique,
-		       const char *cache_dir,
-		       uid_t dir_owner,
+		       const char *socket_dir,
+		       const char *lockfile_dir,
 		       void (*recv_cb)(const uint8_t *msg,
 				       size_t msg_len,
 				       int *fds,
@@ -188,11 +143,9 @@ int messaging_dgm_init(struct tevent_context *ev,
 {
 	struct messaging_dgm_context *ctx;
 	int ret;
-	bool ok;
-	struct sun_path_buf socket_dir;
 	struct sockaddr_un socket_address;
-	size_t sockname_len;
 	uint64_t cookie;
+	size_t len;
 	static bool have_dgm_context = false;
 
 	if (have_dgm_context) {
@@ -207,27 +160,31 @@ int messaging_dgm_init(struct tevent_context *ev,
 	ctx->recv_cb = recv_cb;
 	ctx->recv_cb_private_data = recv_cb_private_data;
 
-	ret = snprintf(socket_dir.buf, sizeof(socket_dir.buf),
-		       "%s/msg", cache_dir);
-	if (ret >= sizeof(socket_dir.buf)) {
+	len = strlcpy(ctx->lockfile_dir.buf, lockfile_dir,
+		      sizeof(ctx->lockfile_dir.buf));
+	if (len >= sizeof(ctx->lockfile_dir.buf)) {
 		TALLOC_FREE(ctx);
 		return ENAMETOOLONG;
 	}
 
-	/* shorter than socket_dir, can't overflow */
-	strlcpy(ctx->cache_dir.buf, cache_dir, sizeof(ctx->cache_dir.buf));
+	len = strlcpy(ctx->socket_dir.buf, socket_dir,
+		      sizeof(ctx->socket_dir.buf));
+	if (len >= sizeof(ctx->socket_dir.buf)) {
+		TALLOC_FREE(ctx);
+		return ENAMETOOLONG;
+	}
 
 	socket_address = (struct sockaddr_un) { .sun_family = AF_UNIX };
-	sockname_len = snprintf(socket_address.sun_path,
-				sizeof(socket_address.sun_path),
-				"%s/%u", socket_dir.buf, (unsigned)ctx->pid);
-	if (sockname_len >= sizeof(socket_address.sun_path)) {
+	len = snprintf(socket_address.sun_path,
+		       sizeof(socket_address.sun_path),
+		       "%s/%u", socket_dir, (unsigned)ctx->pid);
+	if (len >= sizeof(socket_address.sun_path)) {
 		TALLOC_FREE(ctx);
 		return ENAMETOOLONG;
 	}
 
-	ret = messaging_dgm_lockfile_create(cache_dir, dir_owner, ctx->pid,
-					    &ctx->lockfile_fd, unique);
+	ret = messaging_dgm_lockfile_create(ctx, ctx->pid, &ctx->lockfile_fd,
+					    unique);
 	if (ret != 0) {
 		DEBUG(1, ("%s: messaging_dgm_create_lockfile failed: %s\n",
 			  __func__, strerror(ret)));
@@ -246,13 +203,6 @@ int messaging_dgm_init(struct tevent_context *ev,
 		goto fail_nomem;
 	}
 
-	ok = directory_create_or_exist_strict(socket_dir.buf, dir_owner, 0700);
-	if (!ok) {
-		DEBUG(1, ("Could not create socket directory\n"));
-		TALLOC_FREE(ctx);
-		return EACCES;
-	}
-
 	unlink(socket_address.sun_path);
 
 	generate_random_buffer((uint8_t *)&cookie, sizeof(cookie));
@@ -285,7 +235,19 @@ static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
 	unix_msg_free(c->dgm_ctx);
 
 	if (getpid() == c->pid) {
-		(void)messaging_dgm_lockfile_remove(c->cache_dir.buf, c->pid);
+		struct sun_path_buf name;
+		int ret;
+
+		ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
+			       c->lockfile_dir.buf, (unsigned)c->pid);
+		if (ret >= sizeof(name.buf)) {
+			/*
+			 * We've checked the length when creating, so this
+			 * should never happen
+			 */
+			abort();
+		}
+		unlink(name.buf);
 	}
 	close(c->lockfile_fd);
 
@@ -317,7 +279,7 @@ int messaging_dgm_send(pid_t pid,
 	dst = (struct sockaddr_un) { .sun_family = AF_UNIX };
 
 	dst_pathlen = snprintf(dst.sun_path, sizeof(dst.sun_path),
-			       "%s/msg/%u", ctx->cache_dir.buf, (unsigned)pid);
+			       "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
 	if (dst_pathlen >= sizeof(dst.sun_path)) {
 		return ENAMETOOLONG;
 	}
@@ -345,22 +307,24 @@ int messaging_dgm_cleanup(pid_t pid)
 {
 	struct messaging_dgm_context *ctx = global_dgm_context;
 	struct sun_path_buf lockfile_name, socket_name;
-	int fd, ret;
+	int fd, len, ret;
 	struct flock lck = {};
 
 	if (ctx == NULL) {
 		return ENOTCONN;
 	}
 
-	ret = messaging_dgm_lockfile_name(&lockfile_name, ctx->cache_dir.buf,
-					  pid);
-	if (ret != 0) {
-		return ret;
+	len = snprintf(socket_name.buf, sizeof(socket_name.buf), "%s/%u",
+		       ctx->socket_dir.buf, (unsigned)pid);
+	if (len >= sizeof(socket_name.buf)) {
+		return ENAMETOOLONG;
 	}
 
-	/* same length as lockfile_name, can't overflow */
-	snprintf(socket_name.buf, sizeof(socket_name.buf), "%s/msg/%u",
-		 ctx->cache_dir.buf, (unsigned)pid);
+	len = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf), "%s/%u",
+		       ctx->lockfile_dir.buf, (unsigned)pid);
+	if (len >= sizeof(lockfile_name.buf)) {
+		return ENAMETOOLONG;
+	}
 
 	fd = open(lockfile_name.buf, O_NONBLOCK|O_WRONLY, 0);
 	if (fd == -1) {
@@ -380,12 +344,16 @@ int messaging_dgm_cleanup(pid_t pid)
 	ret = fcntl(fd, F_SETLK, &lck);
 	if (ret != 0) {
 		ret = errno;
-		DEBUG(10, ("%s: Could not get lock: %s\n", __func__,
-			   strerror(ret)));
+		if ((ret != EACCES) && (ret != EAGAIN)) {
+			DEBUG(10, ("%s: Could not get lock: %s\n", __func__,
+				   strerror(ret)));
+		}
 		close(fd);
 		return ret;
 	}
 
+	DEBUG(10, ("%s: Cleaning up : %s\n", __func__, strerror(ret)));
+
 	(void)unlink(socket_name.buf);
 	(void)unlink(lockfile_name.buf);
 	(void)close(fd);
@@ -395,7 +363,6 @@ int messaging_dgm_cleanup(pid_t pid)
 int messaging_dgm_wipe(void)
 {
 	struct messaging_dgm_context *ctx = global_dgm_context;
-	struct sun_path_buf msgdir_name;
 	DIR *msgdir;
 	struct dirent *dp;
 	pid_t our_pid = getpid();
@@ -411,16 +378,9 @@ int messaging_dgm_wipe(void)
 	 * and fcntl(SETLK).
 	 */
 
-	ret = snprintf(msgdir_name.buf, sizeof(msgdir_name.buf),
-		       "%s/msg", ctx->cache_dir.buf);
-	if (ret >= sizeof(msgdir_name.buf)) {
-		return ENAMETOOLONG;
-	}
-
-	msgdir = opendir(msgdir_name.buf);
+	msgdir = opendir(ctx->socket_dir.buf);
 	if (msgdir == NULL) {
-		ret = errno;
-		return ret;
+		return errno;
 	}
 
 	while ((dp = readdir(msgdir)) != NULL) {
diff --git a/source3/lib/messages_dgm.h b/source3/lib/messages_dgm.h
index 9d01976..54a1b31 100644
--- a/source3/lib/messages_dgm.h
+++ b/source3/lib/messages_dgm.h
@@ -22,8 +22,8 @@
 
 int messaging_dgm_init(struct tevent_context *ev,
 		       uint64_t unique,
-		       const char *cache_dir,
-		       uid_t dir_owner,
+		       const char *socket_dir,
+		       const char *lockfile_dir,
 		       void (*recv_cb)(const uint8_t *msg,
 				       size_t msg_len,
 				       int *fds,
-- 
1.7.9.5


From 321cda9a3c00ca15c4953f04aa7c07cc0b369660 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sun, 14 Sep 2014 17:52:07 +0200
Subject: [PATCH 03/11] unix_msg: remove cookie from unix_msg_init

"pid" and "sock" are sufficient I guess as randomizers to distinguish messages.
In theory, a pid could be recycled very quickly, which might mix up in-flight
messages. But once a few messages have passed, "cookie" would be incremented as
another indicator of a fresh message.

Why? Remove messages_dgm dependency on samba-util

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages_dgm.c         |    5 +----
 source3/lib/unix_msg/test_drain.c  |    2 +-
 source3/lib/unix_msg/test_source.c |    2 +-
 source3/lib/unix_msg/tests.c       |    9 +++------
 source3/lib/unix_msg/unix_msg.c    |    4 ++--
 source3/lib/unix_msg/unix_msg.h    |    3 +--
 6 files changed, 9 insertions(+), 16 deletions(-)

diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index b64b2b0..1602caf 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -144,7 +144,6 @@ int messaging_dgm_init(struct tevent_context *ev,
 	struct messaging_dgm_context *ctx;
 	int ret;
 	struct sockaddr_un socket_address;
-	uint64_t cookie;
 	size_t len;
 	static bool have_dgm_context = false;
 
@@ -205,9 +204,7 @@ int messaging_dgm_init(struct tevent_context *ev,
 
 	unlink(socket_address.sun_path);
 
-	generate_random_buffer((uint8_t *)&cookie, sizeof(cookie));
-
-	ret = unix_msg_init(&socket_address, ctx->msg_callbacks, 1024, cookie,
+	ret = unix_msg_init(&socket_address, ctx->msg_callbacks, 1024,
 			    messaging_dgm_recv, ctx, &ctx->dgm_ctx);
 	if (ret != 0) {
 		DEBUG(1, ("unix_msg_init failed: %s\n", strerror(ret)));
diff --git a/source3/lib/unix_msg/test_drain.c b/source3/lib/unix_msg/test_drain.c
index 5b6a930..8f3bed9 100644
--- a/source3/lib/unix_msg/test_drain.c
+++ b/source3/lib/unix_msg/test_drain.c
@@ -45,7 +45,7 @@ int main(int argc, const char *argv[])
 		return 1;
 	}
 
-	ret = unix_msg_init(&addr, funcs, 256, 1, recv_cb, &state, &ctx);
+	ret = unix_msg_init(&addr, funcs, 256, recv_cb, &state, &ctx);
 	if (ret != 0) {
 		fprintf(stderr, "unix_msg_init failed: %s\n",
 			strerror(ret));
diff --git a/source3/lib/unix_msg/test_source.c b/source3/lib/unix_msg/test_source.c
index 5224ebf..3b65267 100644
--- a/source3/lib/unix_msg/test_source.c
+++ b/source3/lib/unix_msg/test_source.c
@@ -46,7 +46,7 @@ int main(int argc, const char *argv[])
 	}
 
 	for (i=0; i<num_ctxs; i++) {
-		ret = unix_msg_init(NULL, funcs, 256, 1, NULL, NULL,
+		ret = unix_msg_init(NULL, funcs, 256, NULL, NULL,
 				    &ctxs[i]);
 		if (ret != 0) {
 			fprintf(stderr, "unix_msg_init failed: %s\n",
diff --git a/source3/lib/unix_msg/tests.c b/source3/lib/unix_msg/tests.c
index df094af..9a15f9d 100644
--- a/source3/lib/unix_msg/tests.c
+++ b/source3/lib/unix_msg/tests.c
@@ -70,16 +70,14 @@ int main(void)
 		return 1;
 	}
 
-	ret = unix_msg_init(&addr1, funcs, 256, 1,
-			    recv_cb, &state, &ctx1);
+	ret = unix_msg_init(&addr1, funcs, 256, recv_cb, &state, &ctx1);
 	if (ret != 0) {
 		fprintf(stderr, "unix_msg_init failed: %s\n",
 			strerror(ret));
 		return 1;
 	}
 
-	ret = unix_msg_init(&addr1, funcs, 256, 1,
-			    recv_cb, &state, &ctx1);
+	ret = unix_msg_init(&addr1, funcs, 256, recv_cb, &state, &ctx1);
 	if (ret == 0) {
 		fprintf(stderr, "unix_msg_init succeeded unexpectedly\n");
 		return 1;
@@ -90,8 +88,7 @@ int main(void)
 		return 1;
 	}
 
-	ret = unix_msg_init(&addr2, funcs, 256, 1,
-			    recv_cb, &state, &ctx2);
+	ret = unix_msg_init(&addr2, funcs, 256, recv_cb, &state, &ctx2);
 	if (ret != 0) {
 		fprintf(stderr, "unix_msg_init failed: %s\n",
 			strerror(ret));
diff --git a/source3/lib/unix_msg/unix_msg.c b/source3/lib/unix_msg/unix_msg.c
index 4870068..ccc7804 100644
--- a/source3/lib/unix_msg/unix_msg.c
+++ b/source3/lib/unix_msg/unix_msg.c
@@ -843,7 +843,7 @@ static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
 
 int unix_msg_init(const struct sockaddr_un *addr,
 		  const struct poll_funcs *ev_funcs,
-		  size_t fragment_len, uint64_t cookie,
+		  size_t fragment_len,
 		  void (*recv_callback)(struct unix_msg_ctx *ctx,
 					uint8_t *msg, size_t msg_len,
 					int *fds, size_t num_fds,
@@ -861,7 +861,7 @@ int unix_msg_init(const struct sockaddr_un *addr,
 
 	*ctx = (struct unix_msg_ctx) {
 		.fragment_len = fragment_len,
-		.cookie = cookie,
+		.cookie = 1,
 		.recv_callback = recv_callback,
 		.private_data = private_data
 	};
diff --git a/source3/lib/unix_msg/unix_msg.h b/source3/lib/unix_msg/unix_msg.h
index 56f7a40..240ad64 100644
--- a/source3/lib/unix_msg/unix_msg.h
+++ b/source3/lib/unix_msg/unix_msg.h
@@ -75,7 +75,6 @@ struct unix_msg_ctx;
  * @param[in] path The socket path
  * @param[in] ev_funcs The event callback functions to use
  * @param[in] fragment_size Maximum datagram size to send/receive
- * @param[in] cookie Random number to identify this context
  * @param[in] recv_callback Function called when a message is received
  * @param[in] private_data Private pointer for recv_callback
  * @param[out] result The new struct unix_msg_ctx
@@ -85,7 +84,7 @@ struct unix_msg_ctx;
 
 int unix_msg_init(const struct sockaddr_un *addr,
 		  const struct poll_funcs *ev_funcs,
-		  size_t fragment_size, uint64_t cookie,
+		  size_t fragment_size,
 		  void (*recv_callback)(struct unix_msg_ctx *ctx,
 					uint8_t *msg, size_t msg_len,
 					int *fds, size_t num_fds,
-- 
1.7.9.5


From 87beca204c7de811e976428798ece38d950c7799 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sat, 4 Oct 2014 11:11:46 +0200
Subject: [PATCH 04/11] messages_dgm: Make it an independent lib

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/wscript_build |    7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/source3/wscript_build b/source3/wscript_build
index 54ba3a7..a72dec8 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -311,9 +311,13 @@ bld.SAMBA3_SUBSYSTEM('TDB_LIB',
                      lib/g_lock.c''',
                      deps='dbwrap samba-cluster-support')
 
+bld.SAMBA3_LIBRARY('messages_dgm',
+                   source='''lib/messages_dgm.c''',
+                   deps='talloc UNIX_MSG POLL_FUNCS_TEVENT samba-debug',
+                   private_library=True)
+
 bld.SAMBA3_SUBSYSTEM('samba3core',
                    source='''lib/messages.c
-                   lib/messages_dgm.c
                    lib/util_cluster.c
                    lib/id_cache.c
                    lib/talloc_dict.c
@@ -360,6 +364,7 @@ bld.SAMBA3_SUBSYSTEM('samba3core',
                         dbwrap
                         samba3-util
                         errors3
+                        messages_dgm
                         TDB_LIB''')
 
 bld.SAMBA3_LIBRARY('smbd_shim',
-- 
1.7.9.5


From 19afcd1d24738ff8247c80038be1d9fca17a6ba7 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sat, 4 Oct 2014 11:14:26 +0200
Subject: [PATCH 05/11] messages_dgm: Add a few #includes

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages_dgm.h |    4 ++++
 1 file changed, 4 insertions(+)

diff --git a/source3/lib/messages_dgm.h b/source3/lib/messages_dgm.h
index 54a1b31..c9c9c61 100644
--- a/source3/lib/messages_dgm.h
+++ b/source3/lib/messages_dgm.h
@@ -20,6 +20,10 @@
 #ifndef _MESSAGES_DGM_H_
 #define _MESSAGES_DGM_H_
 
+#include "replace.h"
+#include "system/filesys.h"
+#include <tevent.h>
+
 int messaging_dgm_init(struct tevent_context *ev,
 		       uint64_t unique,
 		       const char *socket_dir,
-- 
1.7.9.5


From ff2ddff81f5369809ff79bd967d7a64eb2622198 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sat, 4 Oct 2014 11:15:12 +0200
Subject: [PATCH 06/11] lib: Add messages_dgm_ref.[ch]

We only have one messaging_dgm context per process. But we will use this from
two completely independent messaging subsystems which are independently
initialized. We need to coordinate creation and destruction, do this via
talloc.

I know this looks like a step back, but when in the future we have really just
one messaging subsystem, this can go again. My immediate goal is to make
source3 and source4 transport-compatible, and this looks like a quick way
towards that goal.
---
 source3/lib/messages_dgm_ref.c |   92 ++++++++++++++++++++++++++++++++++++++++
 source3/lib/messages_dgm_ref.h |   37 ++++++++++++++++
 source3/wscript_build          |    2 +-
 3 files changed, 130 insertions(+), 1 deletion(-)
 create mode 100644 source3/lib/messages_dgm_ref.c
 create mode 100644 source3/lib/messages_dgm_ref.h

diff --git a/source3/lib/messages_dgm_ref.c b/source3/lib/messages_dgm_ref.c
new file mode 100644
index 0000000..f566eb7
--- /dev/null
+++ b/source3/lib/messages_dgm_ref.c
@@ -0,0 +1,92 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Samba internal messaging functions
+ * Copyright (C) 2014 by Volker Lendecke
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include <talloc.h>
+#include "replace.h"
+#include "messages_dgm.h"
+#include "messages_dgm_ref.h"
+#include "lib/util/debug.h"
+
+static unsigned num_refs = 0;
+static pid_t dgm_pid = 0;
+
+struct msg_dgm_ref {
+	uint8_t dummy;
+};
+
+static int msg_dgm_ref_destructor(struct msg_dgm_ref *r);
+
+void *messaging_dgm_ref(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+			uint64_t unique,
+			const char *socket_dir,
+			const char *lockfile_dir,
+			void (*recv_cb)(const uint8_t *msg, size_t msg_len,
+					int *fds, size_t num_fds,
+					void *private_data),
+			void *recv_cb_private_data,
+			int *err)
+{
+	struct msg_dgm_ref *result;
+	unsigned tmp_num_refs = 0;
+
+	result = talloc(mem_ctx, struct msg_dgm_ref);
+	if (result == NULL) {
+		*err = ENOMEM;
+		return NULL;
+	}
+
+	if (dgm_pid != getpid()) {
+		messaging_dgm_destroy();
+		tmp_num_refs = num_refs;
+		num_refs = 0;
+	}
+
+	if (num_refs == 0) {
+		int ret;
+
+		ret = messaging_dgm_init(ev, unique, socket_dir, lockfile_dir,
+					 recv_cb, recv_cb_private_data);
+		if (ret != 0) {
+			DEBUG(10, ("messaging_dgm_init failed: %s\n",
+				   strerror(ret)));
+			TALLOC_FREE(result);
+			*err = ret;
+			return NULL;
+		}
+	}
+
+	num_refs = tmp_num_refs;
+
+	num_refs += 1;
+	talloc_set_destructor(result, msg_dgm_ref_destructor);
+
+	return result;
+}
+
+static int msg_dgm_ref_destructor(struct msg_dgm_ref *r)
+{
+	if (num_refs == 0) {
+		abort();
+	}
+	num_refs -= 1;
+	if (num_refs == 0) {
+		messaging_dgm_destroy();
+	}
+	return 0;
+}
diff --git a/source3/lib/messages_dgm_ref.h b/source3/lib/messages_dgm_ref.h
new file mode 100644
index 0000000..3df0c06
--- /dev/null
+++ b/source3/lib/messages_dgm_ref.h
@@ -0,0 +1,37 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Samba internal messaging functions
+ * Copyright (C) 2014 by Volker Lendecke
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef _MESSAGES_DGM_REF_H_
+#define _MESSAGES_DGM_REF_H_
+
+#include <talloc.h>
+#include <tevent.h>
+#include "replace.h"
+
+void *messaging_dgm_ref(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+			uint64_t unique,
+			const char *socket_dir,
+			const char *lockfile_dir,
+			void (*recv_cb)(const uint8_t *msg, size_t msg_len,
+					int *fds, size_t num_fds,
+					void *private_data),
+			void *recv_cb_private_data,
+			int *err);
+
+#endif
diff --git a/source3/wscript_build b/source3/wscript_build
index a72dec8..2297c83 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -312,7 +312,7 @@ bld.SAMBA3_SUBSYSTEM('TDB_LIB',
                      deps='dbwrap samba-cluster-support')
 
 bld.SAMBA3_LIBRARY('messages_dgm',
-                   source='''lib/messages_dgm.c''',
+                   source='''lib/messages_dgm.c lib/messages_dgm_ref.c''',
                    deps='talloc UNIX_MSG POLL_FUNCS_TEVENT samba-debug',
                    private_library=True)
 
-- 
1.7.9.5


From fbe195564145919058e27613ece62ee9be96f0b3 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sat, 4 Oct 2014 11:21:18 +0200
Subject: [PATCH 07/11] messaging3: Use messaging_dgm_ref

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages.c |   34 +++++++++++++++-------------------
 1 file changed, 15 insertions(+), 19 deletions(-)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index b3debe7..aa35b6e 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -52,6 +52,7 @@
 #include "lib/util/tevent_unix.h"
 #include "lib/background.h"
 #include "lib/messages_dgm.h"
+#include "lib/messages_dgm_ref.h"
 
 struct messaging_callback {
 	struct messaging_callback *prev, *next;
@@ -73,6 +74,7 @@ struct messaging_context {
 	struct tevent_req **waiters;
 	unsigned num_waiters;
 
+	void *msg_dgm_ref;
 	struct messaging_backend *remote;
 };
 
@@ -265,12 +267,6 @@ static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
 	messaging_dispatch_rec(msg_ctx, &rec);
 }
 
-static int messaging_context_destructor(struct messaging_context *ctx)
-{
-	messaging_dgm_destroy();
-	return 0;
-}
-
 static const char *private_path(const char *name)
 {
 	return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
@@ -321,18 +317,16 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
 		return NULL;
 	}
 
-	ret = messaging_dgm_init(ctx->event_ctx, ctx->id.unique_id,
-				 priv_path, lck_path,
-				 messaging_recv_cb, ctx);
+	ctx->msg_dgm_ref = messaging_dgm_ref(
+		ctx, ctx->event_ctx, ctx->id.unique_id,
+		priv_path, lck_path, messaging_recv_cb, ctx, &ret);
 
-	if (ret != 0) {
-		DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
+	if (ctx->msg_dgm_ref == NULL) {
+		DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
 		TALLOC_FREE(ctx);
 		return NULL;
 	}
 
-	talloc_set_destructor(ctx, messaging_context_destructor);
-
 	if (lp_clustering()) {
 		status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
 
@@ -369,15 +363,17 @@ NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
 	NTSTATUS status;
 	int ret;
 
-	messaging_dgm_destroy();
+	TALLOC_FREE(msg_ctx->msg_dgm_ref);
 
 	msg_ctx->id = procid_self();
 
-	ret = messaging_dgm_init(msg_ctx->event_ctx, msg_ctx->id.unique_id,
-				 private_path("sock"), lock_path("msg"),
-				 messaging_recv_cb, msg_ctx);
-	if (ret != 0) {
-		DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
+	msg_ctx->msg_dgm_ref = messaging_dgm_ref(
+		msg_ctx, msg_ctx->event_ctx, msg_ctx->id.unique_id,
+		private_path("sock"), lock_path("msg"),
+		messaging_recv_cb, msg_ctx, &ret);
+
+	if (msg_ctx->msg_dgm_ref == NULL) {
+		DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
 		return map_nt_error_from_unix(ret);
 	}
 
-- 
1.7.9.5


From 325044c0372feb19124317c7ea5bf348832af039 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Tue, 16 Sep 2014 02:11:19 +0200
Subject: [PATCH 08/11] lib: Add server_id marshalling

---
 lib/util/samba_util.h |    3 +++
 lib/util/server_id.c  |   16 ++++++++++++++++
 2 files changed, 19 insertions(+)

diff --git a/lib/util/samba_util.h b/lib/util/samba_util.h
index 41b3fc8..45373f2 100644
--- a/lib/util/samba_util.h
+++ b/lib/util/samba_util.h
@@ -895,6 +895,9 @@ void server_id_set_disconnected(struct server_id *id);
  */
 bool server_id_is_disconnected(const struct server_id *id);
 
+void server_id_put(uint8_t buf[24], const struct server_id id);
+void server_id_get(struct server_id *id, const uint8_t buf[24]);
+
 /*
  * Samba code should use samba_tevent_context_init() instead of
  * tevent_context_init() in order to get the debug output.
diff --git a/lib/util/server_id.c b/lib/util/server_id.c
index 7d3de2f..308ee2a 100644
--- a/lib/util/server_id.c
+++ b/lib/util/server_id.c
@@ -150,3 +150,19 @@ bool server_id_is_disconnected(const struct server_id *id)
 
 	return server_id_equal(id, &dis);
 }
+
+void server_id_put(uint8_t buf[24], const struct server_id id)
+{
+	SBVAL(buf, 0,  id.pid);
+	SIVAL(buf, 8,  id.task_id);
+	SIVAL(buf, 12, id.vnn);
+	SBVAL(buf, 16, id.unique_id);
+}
+
+void server_id_get(struct server_id *id, const uint8_t buf[24])
+{
+	id->pid       = BVAL(buf, 0);
+	id->task_id   = IVAL(buf, 8);
+	id->vnn       = IVAL(buf, 12);
+	id->unique_id = BVAL(buf, 16);
+}
-- 
1.7.9.5


From 432527a7c5d071aaf468ed31bf6ccbc5d42c7089 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Tue, 16 Sep 2014 14:09:35 +0200
Subject: [PATCH 09/11] messaging: Define a binary format for message headers

---
 source3/lib/messages_util.c |   41 +++++++++++++++++++++++++++++++++++++++++
 source3/lib/messages_util.h |   33 +++++++++++++++++++++++++++++++++
 source3/wscript_build       |    5 +++++
 3 files changed, 79 insertions(+)
 create mode 100644 source3/lib/messages_util.c
 create mode 100644 source3/lib/messages_util.h

diff --git a/source3/lib/messages_util.c b/source3/lib/messages_util.c
new file mode 100644
index 0000000..24c364c
--- /dev/null
+++ b/source3/lib/messages_util.c
@@ -0,0 +1,41 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Samba internal messaging functions
+ * Copyright (C) 2013 by Volker Lendecke
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "replace.h"
+#include "lib/util/samba_util.h"
+#include "librpc/gen_ndr/server_id.h"
+#include "lib/util/byteorder.h"
+#include "messages_util.h"
+
+void message_hdr_put(uint8_t buf[MESSAGE_HDR_LENGTH], uint32_t msg_type,
+		     struct server_id src, struct server_id dst)
+{
+	server_id_put(buf, dst);
+	server_id_put(buf + 24, src);
+	SIVAL(buf, 48, msg_type);
+}
+
+void message_hdr_get(uint32_t *msg_type, struct server_id *src,
+		     struct server_id *dst,
+		     const uint8_t buf[MESSAGE_HDR_LENGTH])
+{
+	server_id_get(dst, buf);
+	server_id_get(src, buf + 24);
+	*msg_type = IVAL(buf, 48);
+}
diff --git a/source3/lib/messages_util.h b/source3/lib/messages_util.h
new file mode 100644
index 0000000..5b22f5e
--- /dev/null
+++ b/source3/lib/messages_util.h
@@ -0,0 +1,33 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Samba internal messaging functions
+ * Copyright (C) 2013 by Volker Lendecke
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#ifndef _MESSAGES_UTIL_H_
+#define _MESSAGES_UTIL_H_
+
+struct message_hdr;
+
+#define MESSAGE_HDR_LENGTH 52
+
+void message_hdr_put(uint8_t buf[MESSAGE_HDR_LENGTH], uint32_t msg_type,
+		     struct server_id src, struct server_id dst);
+void message_hdr_get(uint32_t *msg_type, struct server_id *src,
+		     struct server_id *dst,
+		     const uint8_t buf[MESSAGE_HDR_LENGTH]);
+
+#endif
diff --git a/source3/wscript_build b/source3/wscript_build
index 2297c83..bba1e12 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -316,6 +316,11 @@ bld.SAMBA3_LIBRARY('messages_dgm',
                    deps='talloc UNIX_MSG POLL_FUNCS_TEVENT samba-debug',
                    private_library=True)
 
+bld.SAMBA3_LIBRARY('messages_util',
+                   source='''lib/messages_util.c''',
+                   deps='samba-util',
+                   private_library=True)
+
 bld.SAMBA3_SUBSYSTEM('samba3core',
                    source='''lib/messages.c
                    lib/util_cluster.c
-- 
1.7.9.5


From c3c8234f1a5d76752dc18d46375b3a356ffc483c Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sat, 4 Oct 2014 11:32:25 +0200
Subject: [PATCH 10/11] messaging3: Use message_hdr_put/get

---
 source3/lib/messages.c |   42 +++++++++++++++---------------------------
 source3/wscript_build  |    1 +
 2 files changed, 16 insertions(+), 27 deletions(-)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index aa35b6e..94a738f 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -53,6 +53,7 @@
 #include "lib/background.h"
 #include "lib/messages_dgm.h"
 #include "lib/messages_dgm_ref.h"
+#include "lib/messages_util.h"
 
 struct messaging_callback {
 	struct messaging_callback *prev, *next;
@@ -78,12 +79,6 @@ struct messaging_context {
 	struct messaging_backend *remote;
 };
 
-struct messaging_hdr {
-	uint32_t msg_type;
-	struct server_id dst;
-	struct server_id src;
-};
-
 /****************************************************************************
  A useful function for testing the message system.
 ****************************************************************************/
@@ -211,7 +206,9 @@ static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
 {
 	struct messaging_context *msg_ctx = talloc_get_type_abort(
 		private_data, struct messaging_context);
-	const struct messaging_hdr *hdr;
+	uint8_t hdr[MESSAGE_HDR_LENGTH];
+	uint32_t msg_type;
+	struct server_id src, dst;
 	struct server_id_buf idbuf;
 	struct messaging_rec rec;
 	int64_t fds64[MIN(num_fds, INT8_MAX)];
@@ -242,24 +239,20 @@ static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
 		fds[i] = -1;
 	}
 
-	/*
-	 * messages_dgm guarantees alignment, so we can cast here
-	 */
-	hdr = (const struct messaging_hdr *)msg;
+	message_hdr_get(&msg_type, &src, &dst, hdr);
 
 	DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
-		   __func__, (unsigned)hdr->msg_type,
-		   (unsigned)(msg_len - sizeof(*hdr)),
-		   (unsigned)num_fds,
-		   server_id_str_buf(hdr->src, &idbuf)));
+		   __func__, (unsigned)msg_type,
+		   (unsigned)(msg_len - MESSAGE_HDR_LENGTH),
+		   (unsigned)num_fds, server_id_str_buf(src, &idbuf)));
 
 	rec = (struct messaging_rec) {
 		.msg_version = MESSAGE_VERSION,
-		.msg_type = hdr->msg_type,
-		.src = hdr->src,
-		.dest = hdr->dst,
-		.buf.data = discard_const_p(uint8, msg) + sizeof(*hdr),
-		.buf.length = msg_len - sizeof(*hdr),
+		.msg_type = msg_type,
+		.src = src,
+		.dest = dst,
+		.buf.data = discard_const_p(uint8, msg) + MESSAGE_HDR_LENGTH,
+		.buf.length = msg_len - MESSAGE_HDR_LENGTH,
 		.num_fds = num_fds,
 		.fds = fds64,
 	};
@@ -494,7 +487,7 @@ NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
 			    const int *fds, size_t num_fds)
 {
 	int ret;
-	struct messaging_hdr hdr;
+	uint8_t hdr[MESSAGE_HDR_LENGTH];
 	struct iovec iov2[iovlen+1];
 
 	if (server_id_is_disconnected(&server)) {
@@ -550,12 +543,7 @@ NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
 		return NT_STATUS_OK;
 	}
 
-	ZERO_STRUCT(hdr);
-	hdr = (struct messaging_hdr) {
-		.msg_type = msg_type,
-		.dst = server,
-		.src = msg_ctx->id
-	};
+	message_hdr_put(hdr, msg_type, msg_ctx->id, server);
 	iov2[0] = (struct iovec){ .iov_base = &hdr, .iov_len = sizeof(hdr) };
 	memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
 
diff --git a/source3/wscript_build b/source3/wscript_build
index bba1e12..3b6748e 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -370,6 +370,7 @@ bld.SAMBA3_SUBSYSTEM('samba3core',
                         samba3-util
                         errors3
                         messages_dgm
+                        messages_util
                         TDB_LIB''')
 
 bld.SAMBA3_LIBRARY('smbd_shim',
-- 
1.7.9.5


From 4f2699a58da68920d42e2c43411d43d16693fa7d Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Sat, 4 Oct 2014 11:46:29 +0200
Subject: [PATCH 11/11] messaging4: Use messages_dgm.[ch]

---
 source4/lib/messaging/messaging.c |  455 ++++++++++---------------------------
 1 file changed, 120 insertions(+), 335 deletions(-)

diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
index 53c8a9a..8d3563f 100644
--- a/source4/lib/messaging/messaging.c
+++ b/source4/lib/messaging/messaging.c
@@ -35,6 +35,9 @@
 #include "cluster/cluster.h"
 #include "../lib/util/tevent_ntstatus.h"
 #include "lib/param/param.h"
+#include "../source3/lib/messages_dgm.h"
+#include "../source3/lib/messages_dgm_ref.h"
+#include "../source3/lib/messages_util.h"
 
 /* change the message version with any incompatible changes in the protocol */
 #define IMESSAGING_VERSION 1
@@ -55,10 +58,10 @@ struct irpc_request {
 };
 
 struct imessaging_context {
+	struct imessaging_context *prev, *next;
 	struct server_id server_id;
-	struct socket_context *sock;
-	const char *base_path;
-	const char *path;
+	const char *sock_dir;
+	const char *lock_dir;
 	struct dispatch_fn **dispatch;
 	uint32_t num_types;
 	struct idr_context *dispatch_tree;
@@ -69,12 +72,11 @@ struct imessaging_context {
 	const char **names;
 	struct tdb_wrap *names_db;
 	struct timeval start_time;
-	struct tevent_timer *retry_te;
-	struct {
-		struct tevent_fd *fde;
-	} event;
+	void *msg_dgm_ref;
 };
 
+static struct imessaging_context *imessaging_contexts;
+
 /* we have a linked list of dispatch handlers for each msg_type that
    this messaging server can deal with */
 struct dispatch_fn {
@@ -131,248 +133,20 @@ static NTSTATUS irpc_uptime(struct irpc_message *msg,
 	return NT_STATUS_OK;
 }
 
-/*
-   return the path to a messaging socket
-*/
-static char *imessaging_path(struct imessaging_context *msg, struct server_id server_id)
-{
-	struct server_id_buf buf;
-
-	return talloc_asprintf(msg, "%s/msg.%s", msg->base_path,
-			       server_id_str_buf(server_id, &buf));
-}
-
-/*
-  dispatch a fully received message
-
-  note that this deliberately can match more than one message handler
-  per message. That allows a single messasging context to register
-  (for example) a debug handler for more than one piece of code
-*/
-static void imessaging_dispatch(struct imessaging_context *msg, struct imessaging_rec *rec)
+static struct dispatch_fn *imessaging_find_dispatch(
+	struct imessaging_context *msg, uint32_t msg_type)
 {
-	struct dispatch_fn *d, *next;
-
 	/* temporary IDs use an idtree, the rest use a array of pointers */
-	if (rec->header->msg_type >= MSG_TMP_BASE) {
-		d = (struct dispatch_fn *)idr_find(msg->dispatch_tree,
-						   rec->header->msg_type);
-	} else if (rec->header->msg_type < msg->num_types) {
-		d = msg->dispatch[rec->header->msg_type];
-	} else {
-		d = NULL;
+	if (msg_type >= MSG_TMP_BASE) {
+		return (struct dispatch_fn *)idr_find(msg->dispatch_tree,
+						      msg_type);
 	}
-
-	for (; d; d = next) {
-		DATA_BLOB data;
-		next = d->next;
-		data.data = rec->packet.data + sizeof(*rec->header);
-		data.length = rec->header->length;
-		d->fn(msg, d->private_data, d->msg_type, rec->header->from, &data);
-	}
-	rec->header->length = 0;
-}
-
-/*
-  handler for messages that arrive from other nodes in the cluster
-*/
-static void cluster_message_handler(struct imessaging_context *msg, DATA_BLOB packet)
-{
-	struct imessaging_rec *rec;
-
-	rec = talloc(msg, struct imessaging_rec);
-	if (rec == NULL) {
-		smb_panic("Unable to allocate imessaging_rec");
-	}
-
-	rec->msg           = msg;
-	rec->path          = msg->path;
-	rec->header        = (struct imessaging_header *)packet.data;
-	rec->packet        = packet;
-	rec->retries       = 0;
-
-	if (packet.length != sizeof(*rec->header) + rec->header->length) {
-		DEBUG(0,("messaging: bad message header size %d should be %d\n",
-			 rec->header->length, (int)(packet.length - sizeof(*rec->header))));
-		talloc_free(rec);
-		return;
-	}
-
-	imessaging_dispatch(msg, rec);
-	talloc_free(rec);
-}
-
-
-
-/*
-  try to send the message
-*/
-static NTSTATUS try_send(struct imessaging_rec *rec)
-{
-	struct imessaging_context *msg = rec->msg;
-	size_t nsent;
-	void *priv;
-	NTSTATUS status;
-	struct socket_address *path;
-
-	/* rec->path is the path of the *other* socket, where we want
-	 * this to end up */
-	path = socket_address_from_strings(msg, msg->sock->backend_name,
-					   rec->path, 0);
-	if (!path) {
-		return NT_STATUS_NO_MEMORY;
-	}
-
-	/* we send with privileges so messages work from any context */
-	priv = root_privileges();
-	status = socket_sendto(msg->sock, &rec->packet, &nsent, path);
-	talloc_free(path);
-	talloc_free(priv);
-
-	return status;
-}
-
-/*
-  retry backed off messages
-*/
-static void msg_retry_timer(struct tevent_context *ev, struct tevent_timer *te,
-			    struct timeval t, void *private_data)
-{
-	struct imessaging_context *msg = talloc_get_type(private_data,
-							struct imessaging_context);
-	msg->retry_te = NULL;
-
-	/* put the messages back on the main queue */
-	while (msg->retry_queue) {
-		struct imessaging_rec *rec = msg->retry_queue;
-		DLIST_REMOVE(msg->retry_queue, rec);
-		DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *);
-	}
-
-	TEVENT_FD_WRITEABLE(msg->event.fde);
-}
-
-/*
-  handle a socket write event
-*/
-static void imessaging_send_handler(struct imessaging_context *msg, struct tevent_context *ev)
-{
-	while (msg->pending) {
-		struct imessaging_rec *rec = msg->pending;
-		NTSTATUS status;
-		status = try_send(rec);
-		if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
-			rec->retries++;
-			if (rec->retries > 3) {
-				/* we're getting continuous write errors -
-				   backoff this record */
-				DLIST_REMOVE(msg->pending, rec);
-				DLIST_ADD_END(msg->retry_queue, rec,
-					      struct imessaging_rec *);
-				if (msg->retry_te == NULL) {
-					msg->retry_te =
-						tevent_add_timer(ev, msg,
-								 timeval_current_ofs(1, 0),
-								 msg_retry_timer, msg);
-				}
-			}
-			break;
-		}
-		rec->retries = 0;
-		if (!NT_STATUS_IS_OK(status)) {
-			TALLOC_CTX *tmp_ctx = talloc_new(msg);
-			DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n",
-				 server_id_str(tmp_ctx, &rec->header->from),
-				 server_id_str(tmp_ctx, &rec->header->to),
-				 rec->header->msg_type,
-				 nt_errstr(status)));
-			talloc_free(tmp_ctx);
-		}
-		DLIST_REMOVE(msg->pending, rec);
-		talloc_free(rec);
-	}
-	if (msg->pending == NULL) {
-		TEVENT_FD_NOT_WRITEABLE(msg->event.fde);
-	}
-}
-
-/*
-  handle a new incoming packet
-*/
-static void imessaging_recv_handler(struct imessaging_context *msg, struct tevent_context *ev)
-{
-	struct imessaging_rec *rec;
-	NTSTATUS status;
-	DATA_BLOB packet;
-	size_t msize;
-
-	/* see how many bytes are in the next packet */
-	status = socket_pending(msg->sock, &msize);
-	if (!NT_STATUS_IS_OK(status)) {
-		DEBUG(0,("socket_pending failed in messaging - %s\n",
-			 nt_errstr(status)));
-		return;
-	}
-
-	packet = data_blob_talloc(msg, NULL, msize);
-	if (packet.data == NULL) {
-		/* assume this is temporary and retry */
-		return;
-	}
-
-	status = socket_recv(msg->sock, packet.data, msize, &msize);
-	if (!NT_STATUS_IS_OK(status)) {
-		data_blob_free(&packet);
-		return;
-	}
-
-	if (msize < sizeof(*rec->header)) {
-		DEBUG(0,("messaging: bad message of size %d\n", (int)msize));
-		data_blob_free(&packet);
-		return;
-	}
-
-	rec = talloc(msg, struct imessaging_rec);
-	if (rec == NULL) {
-		smb_panic("Unable to allocate imessaging_rec");
-	}
-
-	talloc_steal(rec, packet.data);
-	rec->msg           = msg;
-	rec->path          = msg->path;
-	rec->header        = (struct imessaging_header *)packet.data;
-	rec->packet        = packet;
-	rec->retries       = 0;
-
-	if (msize != sizeof(*rec->header) + rec->header->length) {
-		DEBUG(0,("messaging: bad message header size %d should be %d\n",
-			 rec->header->length, (int)(msize - sizeof(*rec->header))));
-		talloc_free(rec);
-		return;
-	}
-
-	imessaging_dispatch(msg, rec);
-	talloc_free(rec);
-}
-
-
-/*
-  handle a socket event
-*/
-static void imessaging_handler(struct tevent_context *ev, struct tevent_fd *fde,
-			      uint16_t flags, void *private_data)
-{
-	struct imessaging_context *msg = talloc_get_type(private_data,
-							struct imessaging_context);
-	if (flags & TEVENT_FD_WRITE) {
-		imessaging_send_handler(msg, ev);
-	}
-	if (flags & TEVENT_FD_READ) {
-		imessaging_recv_handler(msg, ev);
+	if (msg_type < msg->num_types) {
+		return msg->dispatch[msg_type];
 	}
+	return NULL;
 }
 
-
 /*
   Register a dispatch function for a particular message type.
 */
@@ -463,64 +237,40 @@ void imessaging_deregister(struct imessaging_context *msg, uint32_t msg_type, vo
 NTSTATUS imessaging_send(struct imessaging_context *msg, struct server_id server,
 			uint32_t msg_type, const DATA_BLOB *data)
 {
-	struct imessaging_rec *rec;
-	NTSTATUS status;
-	size_t dlength = data?data->length:0;
+	uint8_t hdr[MESSAGE_HDR_LENGTH];
+	struct iovec iov[2];
+	int num_iov, ret;
+	pid_t pid;
+	void *priv;
 
-	rec = talloc(msg, struct imessaging_rec);
-	if (rec == NULL) {
-		return NT_STATUS_NO_MEMORY;
+	if (!cluster_node_equal(&msg->server_id, &server)) {
+		/* No cluster in source4... */
+		return NT_STATUS_OK;
 	}
 
-	rec->packet = data_blob_talloc(rec, NULL, sizeof(*rec->header) + dlength);
-	if (rec->packet.data == NULL) {
-		talloc_free(rec);
-		return NT_STATUS_NO_MEMORY;
-	}
+	message_hdr_put(hdr, msg_type, msg->server_id, server);
 
-	rec->retries       = 0;
-	rec->msg              = msg;
-	rec->header           = (struct imessaging_header *)rec->packet.data;
-	/* zero padding */
-	ZERO_STRUCTP(rec->header);
-	rec->header->version  = IMESSAGING_VERSION;
-	rec->header->msg_type = msg_type;
-	rec->header->from     = msg->server_id;
-	rec->header->to       = server;
-	rec->header->length   = dlength;
-	if (dlength != 0) {
-		memcpy(rec->packet.data + sizeof(*rec->header),
-		       data->data, dlength);
-	}
+	iov[0] = (struct iovec) { .iov_base = &hdr, .iov_len = sizeof(hdr) };
+	num_iov = 1;
 
-	if (!cluster_node_equal(&msg->server_id, &server)) {
-		/* the destination is on another node - dispatch via
-		   the cluster layer */
-		status = cluster_message_send(server, &rec->packet);
-		talloc_free(rec);
-		return status;
+	if (data != NULL) {
+		iov[1] = (struct iovec) { .iov_base = data->data,
+					  .iov_len = data->length };
+		num_iov += 1;
 	}
 
-	rec->path = imessaging_path(msg, server);
-	talloc_steal(rec, rec->path);
-
-	if (msg->pending != NULL) {
-		status = STATUS_MORE_ENTRIES;
-	} else {
-		status = try_send(rec);
+	pid = server.pid;
+	if (pid == 0) {
+		pid = getpid();
 	}
 
-	if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
-		if (msg->pending == NULL) {
-			TEVENT_FD_WRITEABLE(msg->event.fde);
-		}
-		DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *);
-		return NT_STATUS_OK;
+	priv = root_privileges();
+	ret = messaging_dgm_send(pid, iov, num_iov, NULL, 0);
+	TALLOC_FREE(priv);
+	if (ret != 0) {
+		return map_nt_error_from_unix_common(ret);
 	}
-
-	talloc_free(rec);
-
-	return status;
+	return NT_STATUS_OK;
 }
 
 /*
@@ -547,14 +297,19 @@ int imessaging_cleanup(struct imessaging_context *msg)
 		return 0;
 	}
 
-	DEBUG(5,("imessaging: cleaning up %s\n", msg->path));
-	unlink(msg->path);
 	while (msg->names && msg->names[0]) {
 		irpc_remove_name(msg, msg->names[0]);
 	}
+
+	DLIST_REMOVE(imessaging_contexts, msg);
+
 	return 0;
 }
 
+static void imessaging_dgm_recv(const uint8_t *buf, size_t buf_len,
+				int *fds, size_t num_fds,
+				void *private_data);
+
 /*
   create the listening socket and setup the dispatcher
 
@@ -570,9 +325,8 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
 					   bool auto_remove)
 {
 	struct imessaging_context *msg;
-	NTSTATUS status;
-	struct socket_address *path;
 	bool ok;
+	int ret;
 
 	if (ev == NULL) {
 		return NULL;
@@ -583,26 +337,31 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
 		return NULL;
 	}
 
-	/* setup a handler for messages from other cluster nodes, if appropriate */
-	status = cluster_message_init(msg, server_id, cluster_message_handler);
-	if (!NT_STATUS_IS_OK(status)) {
-		goto fail;
-	}
-
 	/* create the messaging directory if needed */
 
-	msg->base_path     = lpcfg_imessaging_path(msg, lp_ctx);
-	if (msg->base_path == NULL) {
+	msg->sock_dir = lpcfg_private_path(msg, lp_ctx, "sock");
+	if (msg->sock_dir == NULL) {
+		goto fail;
+	}
+	ok = directory_create_or_exist_strict(msg->sock_dir, geteuid(), 0700);
+	if (!ok) {
 		goto fail;
 	}
 
-	ok = directory_create_or_exist_strict(msg->base_path, geteuid(), 0700);
+	msg->lock_dir = lpcfg_lock_path(msg, lp_ctx, "msg");
+	if (msg->lock_dir == NULL) {
+		goto fail;
+	}
+	ok = directory_create_or_exist_strict(msg->lock_dir, geteuid(), 0755);
 	if (!ok) {
 		goto fail;
 	}
 
-	msg->path          = imessaging_path(msg, server_id);
-	if (msg->path == NULL) {
+	msg->msg_dgm_ref = messaging_dgm_ref(
+		msg, ev, server_id.unique_id, msg->sock_dir, msg->lock_dir,
+		imessaging_dgm_recv, msg, &ret);
+
+	if (msg->msg_dgm_ref == NULL) {
 		goto fail;
 	}
 
@@ -619,39 +378,11 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
 
 	msg->start_time    = timeval_current();
 
-	msg->names_db = irpc_namedb_open(msg, msg->base_path, lp_ctx);
+	msg->names_db = irpc_namedb_open(msg, msg->lock_dir, lp_ctx);
 	if (msg->names_db == NULL) {
 		goto fail;
 	}
 
-	status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
-	if (!NT_STATUS_IS_OK(status)) {
-		goto fail;
-	}
-
-	/* by stealing here we ensure that the socket is cleaned up (and even
-	   deleted) on exit */
-	talloc_steal(msg, msg->sock);
-
-	path = socket_address_from_strings(msg, msg->sock->backend_name,
-					   msg->path, 0);
-	if (!path) {
-		goto fail;
-	}
-
-	status = socket_listen(msg->sock, path, 50, 0);
-	if (!NT_STATUS_IS_OK(status)) {
-		DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg->path, nt_errstr(status)));
-		goto fail;
-	}
-
-	/* it needs to be non blocking for sends */
-	set_blocking(socket_get_fd(msg->sock), false);
-
-	msg->event.fde	= tevent_add_fd(ev, msg, socket_get_fd(msg->sock),
-				        TEVENT_FD_READ, imessaging_handler, msg);
-	tevent_fd_set_auto_close(msg->event.fde);
-
 	if (auto_remove) {
 		talloc_set_destructor(msg, imessaging_cleanup);
 	}
@@ -660,12 +391,66 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
 	imessaging_register(msg, NULL, MSG_IRPC, irpc_handler);
 	IRPC_REGISTER(msg, irpc, IRPC_UPTIME, irpc_uptime, msg);
 
+	DLIST_ADD(imessaging_contexts, msg);
+
 	return msg;
 fail:
 	talloc_free(msg);
 	return NULL;
 }
 
+static void imessaging_dgm_recv(const uint8_t *buf, size_t buf_len,
+				int *fds, size_t num_fds,
+				void *private_data)
+{
+	uint32_t msg_type;
+	struct server_id src, dst;
+	struct server_id_buf srcbuf, dstbuf;
+	struct imessaging_context *msg;
+	struct dispatch_fn *d, *next;
+	DATA_BLOB data;
+
+	if (buf_len < MESSAGE_HDR_LENGTH) {
+		/* Invalid message, ignore */
+		return;
+	}
+
+	message_hdr_get(&msg_type, &src, &dst, buf);
+
+	DEBUG(1, ("Got type=0x%x, src=%s, dst=%s\n", (unsigned)msg_type,
+		  server_id_str_buf(src, &srcbuf),
+		  server_id_str_buf(dst, &dstbuf)));
+
+	data.data = discard_const_p(uint8_t, buf + MESSAGE_HDR_LENGTH);
+	data.length = buf_len - MESSAGE_HDR_LENGTH;
+
+	for (msg = imessaging_contexts; msg != NULL; msg = msg->next) {
+		DEBUG(1, ("Checking server_id=%s\n",
+			  server_id_str_buf(msg->server_id, &srcbuf)));
+
+		if (cluster_id_equal(&dst, &msg->server_id)) {
+			break;
+		}
+
+		if ((dst.task_id == 0) && (msg->server_id.pid == 0)) {
+			break;
+		}
+	}
+	if (msg == NULL) {
+		/* Unknown destination */
+		return;
+	}
+
+	d = imessaging_find_dispatch(msg, msg_type);
+
+	for (; d; d = next) {
+		next = d->next;
+		d->fn(msg, d->private_data, d->msg_type, src, &data);
+	}
+
+	return;
+}
+
 /*
    A hack, for the short term until we get 'client only' messaging in place
 */
-- 
1.7.9.5



More information about the samba-technical mailing list