[PATCH] messaging3 improvements

Volker Lendecke Volker.Lendecke at SerNet.DE
Wed May 7 12:18:46 MDT 2014


Hi!

Attached is a patch series that enables messaging3 for
nested event contexts. I think this makes it a better basis
for our internal communication. Also, the
messaging_filtered_read thingy makes it trivial to allow
multiple task IDs in one process to share the same messaging
context.

Review would be appreciated!

Thanks,

Volker

-- 
SerNet GmbH, Bahnhofsallee 1b, 37081 Göttingen
phone: +49-551-370000-0, fax: +49-551-370000-9
AG Göttingen, HRB 2816, GF: Dr. Johannes Loxen
http://www.sernet.de, mailto:kontakt at sernet.de
-------------- next part --------------
From 774640149e726b210145aed25a5d414d8081ceec Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Thu, 24 Apr 2014 09:05:53 +0000
Subject: [PATCH 01/12] messaging3: Add messaging_filtered_read

This delegates the decision whether to read a message to a caller-supplied filter callback.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/include/messages.h |    8 +++
 source3/lib/messages.c     |  128 +++++++++++++++++++++++++++++++++++---------
 2 files changed, 111 insertions(+), 25 deletions(-)

diff --git a/source3/include/messages.h b/source3/include/messages.h
index 06c1748..7801dfb 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -142,6 +142,14 @@ NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 			    struct messaging_rec *rec);
 
+struct tevent_req *messaging_filtered_read_send(
+	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+	struct messaging_context *msg_ctx,
+	bool (*filter)(struct messaging_rec *rec, void *private_data),
+	void *private_data);
+int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+				 struct messaging_rec **presult);
+
 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
 				       struct tevent_context *ev,
 				       struct messaging_context *msg,
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 9284ac1..ca254a4 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -458,33 +458,38 @@ static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
 	return result;
 }
 
-struct messaging_read_state {
+struct messaging_filtered_read_state {
 	struct tevent_context *ev;
 	struct messaging_context *msg_ctx;
-	uint32_t msg_type;
+
+	bool (*filter)(struct messaging_rec *rec, void *private_data);
+	void *private_data;
+
 	struct messaging_rec *rec;
 };
 
-static void messaging_read_cleanup(struct tevent_req *req,
-				   enum tevent_req_state req_state);
+static void messaging_filtered_read_cleanup(struct tevent_req *req,
+					    enum tevent_req_state req_state);
 
-struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
-				       struct tevent_context *ev,
-				       struct messaging_context *msg_ctx,
-				       uint32_t msg_type)
+struct tevent_req *messaging_filtered_read_send(
+	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+	struct messaging_context *msg_ctx,
+	bool (*filter)(struct messaging_rec *rec, void *private_data),
+	void *private_data)
 {
 	struct tevent_req *req;
-	struct messaging_read_state *state;
+	struct messaging_filtered_read_state *state;
 	size_t new_waiters_len;
 
 	req = tevent_req_create(mem_ctx, &state,
-				struct messaging_read_state);
+				struct messaging_filtered_read_state);
 	if (req == NULL) {
 		return NULL;
 	}
 	state->ev = ev;
 	state->msg_ctx = msg_ctx;
-	state->msg_type = msg_type;
+	state->filter = filter;
+	state->private_data = private_data;
 
 	new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
 
@@ -501,16 +506,16 @@ struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
 
 	msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
 	msg_ctx->num_new_waiters += 1;
-	tevent_req_set_cleanup_fn(req, messaging_read_cleanup);
+	tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
 
 	return req;
 }
 
-static void messaging_read_cleanup(struct tevent_req *req,
-				   enum tevent_req_state req_state)
+static void messaging_filtered_read_cleanup(struct tevent_req *req,
+					    enum tevent_req_state req_state)
 {
-	struct messaging_read_state *state = tevent_req_data(
-		req, struct messaging_read_state);
+	struct messaging_filtered_read_state *state = tevent_req_data(
+		req, struct messaging_filtered_read_state);
 	struct messaging_context *msg_ctx = state->msg_ctx;
 	unsigned i;
 
@@ -531,11 +536,11 @@ static void messaging_read_cleanup(struct tevent_req *req,
 	}
 }
 
-static void messaging_read_done(struct tevent_req *req,
-				struct messaging_rec *rec)
+static void messaging_filtered_read_done(struct tevent_req *req,
+					 struct messaging_rec *rec)
 {
-	struct messaging_read_state *state = tevent_req_data(
-		req, struct messaging_read_state);
+	struct messaging_filtered_read_state *state = tevent_req_data(
+		req, struct messaging_filtered_read_state);
 
 	state->rec = messaging_rec_dup(state, rec);
 	if (tevent_req_nomem(state->rec, req)) {
@@ -544,6 +549,79 @@ static void messaging_read_done(struct tevent_req *req,
 	tevent_req_done(req);
 }
 
+int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+				 struct messaging_rec **presult)
+{
+	struct messaging_filtered_read_state *state = tevent_req_data(
+		req, struct messaging_filtered_read_state);
+	int err;
+
+	if (tevent_req_is_unix_error(req, &err)) {
+		tevent_req_received(req);
+		return err;
+	}
+	*presult = talloc_move(mem_ctx, &state->rec);
+	return 0;
+}
+
+struct messaging_read_state {
+	uint32_t msg_type;
+	struct messaging_rec *rec;
+};
+
+static bool messaging_read_filter(struct messaging_rec *rec,
+				  void *private_data);
+static void messaging_read_done(struct tevent_req *subreq);
+
+struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
+				       struct tevent_context *ev,
+				       struct messaging_context *msg,
+				       uint32_t msg_type)
+{
+	struct tevent_req *req, *subreq;
+	struct messaging_read_state *state;
+
+	req = tevent_req_create(mem_ctx, &state,
+				struct messaging_read_state);
+	if (req == NULL) {
+		return NULL;
+	}
+	state->msg_type = msg_type;
+
+	subreq = messaging_filtered_read_send(state, ev, msg,
+					      messaging_read_filter, state);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, messaging_read_done, req);
+	return req;
+}
+
+static bool messaging_read_filter(struct messaging_rec *rec,
+				  void *private_data)
+{
+	struct messaging_read_state *state = talloc_get_type_abort(
+		private_data, struct messaging_read_state);
+
+	return rec->msg_type == state->msg_type;
+}
+
+static void messaging_read_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct messaging_read_state *state = tevent_req_data(
+		req, struct messaging_read_state);
+	int ret;
+
+	ret = messaging_filtered_read_recv(subreq, state, &state->rec);
+	TALLOC_FREE(subreq);
+	if (tevent_req_error(req, ret)) {
+		return;
+	}
+	tevent_req_done(req);
+}
+
 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
 			struct messaging_rec **presult)
 {
@@ -552,7 +630,6 @@ int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
 	int err;
 
 	if (tevent_req_is_unix_error(req, &err)) {
-		tevent_req_received(req);
 		return err;
 	}
 	if (presult != NULL) {
@@ -618,7 +695,7 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 	i = 0;
 	while (i < msg_ctx->num_waiters) {
 		struct tevent_req *req;
-		struct messaging_read_state *state;
+		struct messaging_filtered_read_state *state;
 
 		req = msg_ctx->waiters[i];
 		if (req == NULL) {
@@ -638,9 +715,10 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 			continue;
 		}
 
-		state = tevent_req_data(req, struct messaging_read_state);
-		if (state->msg_type == rec->msg_type) {
-			messaging_read_done(req, rec);
+		state = tevent_req_data(
+			req, struct messaging_filtered_read_state);
+		if (state->filter(rec, state->private_data)) {
+			messaging_filtered_read_done(req, rec);
 		}
 
 		i += 1;
-- 
1.7.9.5


From e1225f0a69fbc763c9ee8ce2de974a9f86e7a580 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Thu, 24 Apr 2014 09:23:48 +0000
Subject: [PATCH 02/12] dbwrap: Use messaging_filtered_read

This does not really save any code lines, but IMHO the code is simpler
this way. Also, in case we have lots of watchers this will be slightly
cheaper, because we don't have to re-establish a tevent_req.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/dbwrap/dbwrap_watch.c |   42 ++++++++++++++++++-------------------
 1 file changed, 21 insertions(+), 21 deletions(-)

diff --git a/source3/lib/dbwrap/dbwrap_watch.c b/source3/lib/dbwrap/dbwrap_watch.c
index 4f3a2b3..a5f1ebd 100644
--- a/source3/lib/dbwrap/dbwrap_watch.c
+++ b/source3/lib/dbwrap/dbwrap_watch.c
@@ -235,6 +235,8 @@ struct dbwrap_record_watch_state {
 	TDB_DATA w_key;
 };
 
+static bool dbwrap_record_watch_filter(struct messaging_rec *rec,
+				       void *private_data);
 static void dbwrap_record_watch_done(struct tevent_req *subreq);
 static int dbwrap_record_watch_state_destructor(
 	struct dbwrap_record_watch_state *state);
@@ -271,8 +273,8 @@ struct tevent_req *dbwrap_record_watch_send(TALLOC_CTX *mem_ctx,
 		return tevent_req_post(req, ev);
 	}
 
-	subreq = messaging_read_send(state, ev, state->msg,
-				     MSG_DBWRAP_MODIFIED);
+	subreq = messaging_filtered_read_send(
+		state, ev, state->msg, dbwrap_record_watch_filter, state);
 	if (tevent_req_nomem(subreq, req)) {
 		return tevent_req_post(req, ev);
 	}
@@ -288,6 +290,21 @@ struct tevent_req *dbwrap_record_watch_send(TALLOC_CTX *mem_ctx,
 	return req;
 }
 
+static bool dbwrap_record_watch_filter(struct messaging_rec *rec,
+				       void *private_data)
+{
+	struct dbwrap_record_watch_state *state = talloc_get_type_abort(
+		private_data, struct dbwrap_record_watch_state);
+
+	if (rec->msg_type != MSG_DBWRAP_MODIFIED) {
+		return false;
+	}
+	if (rec->buf.length != state->w_key.dsize) {
+		return false;
+	}
+	return memcmp(rec->buf.data, state->w_key.dptr, rec->buf.length) == 0;
+}
+
 static int dbwrap_record_watch_state_destructor(
 	struct dbwrap_record_watch_state *s)
 {
@@ -351,33 +368,16 @@ static void dbwrap_record_watch_done(struct tevent_req *subreq)
 {
 	struct tevent_req *req = tevent_req_callback_data(
 		subreq, struct tevent_req);
-	struct dbwrap_record_watch_state *state = tevent_req_data(
-		req, struct dbwrap_record_watch_state);
 	struct messaging_rec *rec;
 	int ret;
 
-	ret = messaging_read_recv(subreq, talloc_tos(), &rec);
+	ret = messaging_filtered_read_recv(subreq, talloc_tos(), &rec);
 	TALLOC_FREE(subreq);
 	if (ret != 0) {
 		tevent_req_nterror(req, map_nt_error_from_unix(ret));
 		return;
 	}
-
-	if ((rec->buf.length == state->w_key.dsize) &&
-	    (memcmp(rec->buf.data, state->w_key.dptr, rec->buf.length) == 0)) {
-		tevent_req_done(req);
-		return;
-	}
-
-	/*
-	 * Not our record, wait for the next one
-	 */
-	subreq = messaging_read_send(state, state->ev, state->msg,
-				     MSG_DBWRAP_MODIFIED);
-	if (tevent_req_nomem(subreq, req)) {
-		return;
-	}
-	tevent_req_set_callback(subreq, dbwrap_record_watch_done, req);
+	tevent_req_done(req);
 }
 
 NTSTATUS dbwrap_record_watch_recv(struct tevent_req *req,
-- 
1.7.9.5


From 9e7f58e5620504fb6dc3b2646c2f75748e5e9e14 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 2 May 2014 09:12:52 +0000
Subject: [PATCH 03/12] messaging3: Fix 80-char line limit

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages.c |   14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index ca254a4..065782a 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -709,7 +709,7 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 				memmove(&msg_ctx->waiters[i],
 					&msg_ctx->waiters[i+1],
 					sizeof(struct tevent_req *) *
-						(msg_ctx->num_waiters - i - 1));
+					    (msg_ctx->num_waiters - i - 1));
 			}
 			msg_ctx->num_waiters -= 1;
 			continue;
@@ -735,7 +735,8 @@ bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
 
 	req = background_job_send(
 		msg, msg->event_ctx, msg, NULL, 0,
-		lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 60*15),
+		lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
+			    60*15),
 		mess_parent_dgm_cleanup, msg);
 	if (req == NULL) {
 		return false;
@@ -752,7 +753,8 @@ static int mess_parent_dgm_cleanup(void *private_data)
 
 	status = messaging_dgm_wipe(msg_ctx);
 	DEBUG(10, ("messaging_dgm_wipe returned %s\n", nt_errstr(status)));
-	return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 60*15);
+	return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
+			   60*15);
 }
 
 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
@@ -763,11 +765,13 @@ static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
 
 	status = background_job_recv(req);
 	TALLOC_FREE(req);
-	DEBUG(1, ("messaging dgm cleanup job ended with %s\n", nt_errstr(status)));
+	DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
+		  nt_errstr(status)));
 
 	req = background_job_send(
 		msg, msg->event_ctx, msg, NULL, 0,
-		lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 60*15),
+		lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
+			    60*15),
 		mess_parent_dgm_cleanup, msg);
 	if (req == NULL) {
 		DEBUG(1, ("background_job_send failed\n"));
-- 
1.7.9.5


From 9894803ac74d7660d139f70585dc6307656dfc4e Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 2 May 2014 09:20:40 +0000
Subject: [PATCH 04/12] messaging3: Add comments about not touching "waiters"

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages.c |   16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 065782a..6a08531 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -491,6 +491,14 @@ struct tevent_req *messaging_filtered_read_send(
 	state->filter = filter;
 	state->private_data = private_data;
 
+	/*
+	 * We add ourselves to the "new_waiters" array, not the "waiters"
+	 * array. If we are called from within messaging_filtered_read_done,
+	 * messaging_dispatch_rec will be in an active while-loop on
+	 * "waiters". We must be careful not to mess with this array, because
+	 * it could mean that a single event is being delivered twice.
+	 */
+
 	new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
 
 	if (new_waiters_len == msg_ctx->num_new_waiters) {
@@ -521,6 +529,14 @@ static void messaging_filtered_read_cleanup(struct tevent_req *req,
 
 	tevent_req_set_cleanup_fn(req, NULL);
 
+	/*
+	 * Just set the [new_]waiters entry to NULL; be careful not to mess
+	 * with the other "waiters" array contents. We are often called from
+	 * within "messaging_dispatch_rec", which loops over
+	 * "waiters". Messing with the "waiters" array will mess up that
+	 * while-loop.
+	 */
+
 	for (i=0; i<msg_ctx->num_waiters; i++) {
 		if (msg_ctx->waiters[i] == req) {
 			msg_ctx->waiters[i] = NULL;
-- 
1.7.9.5


From 3209b0c4c333e5b1bf9aae00414ec6c4d328a1ee Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 5 May 2014 08:45:52 +0200
Subject: [PATCH 05/12] lib: Enhance poll_funcs_tevent for multiple
 tevent_contexts

With this patch it becomes possible to use nested event contexts with
messaging_filtered_read_send/recv. Before this patchset, only the single
event context a messaging_context was initialized with was able to receive
datagrams from the unix domain socket. So if you wanted to code a synchronous
RPC-like operation using a nested event context, you would not see the reply,
because the nested event context did not have the required tevent_fds.
Unfortunately, this patchset has to add some advanced array voodoo. The idea
is that state->watches[] contains what we hand out with watch_new, and
state->contexts[] contains references to the tevent_contexts. For every watch
we need a tevent_fd in every event context, and the routines make sure that
the arrays are properly maintained.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages_dgm.c                 |   18 +-
 source3/lib/poll_funcs/poll_funcs_tevent.c |  349 ++++++++++++++++++++++++++--
 source3/lib/poll_funcs/poll_funcs_tevent.h |   15 +-
 source3/lib/unix_msg/test_drain.c          |   11 +-
 source3/lib/unix_msg/test_source.c         |   16 +-
 source3/lib/unix_msg/tests.c               |   23 +-
 6 files changed, 391 insertions(+), 41 deletions(-)

diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index 354dac3..56643b1 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -30,7 +30,8 @@
 
 struct messaging_dgm_context {
 	struct messaging_context *msg_ctx;
-	struct poll_funcs msg_callbacks;
+	struct poll_funcs *msg_callbacks;
+	void *tevent_handle;
 	struct unix_msg_ctx *dgm_ctx;
 	char *cache_dir;
 	int lockfile_fd;
@@ -224,7 +225,18 @@ NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
 		return map_nt_error_from_unix(ret);
 	}
 
-	poll_funcs_init_tevent(&ctx->msg_callbacks, msg_ctx->event_ctx);
+	ctx->msg_callbacks = poll_funcs_init_tevent(ctx);
+	if (ctx->msg_callbacks == NULL) {
+		TALLOC_FREE(result);
+		return NT_STATUS_NO_MEMORY;
+	}
+
+	ctx->tevent_handle = poll_funcs_tevent_register(
+		ctx, ctx->msg_callbacks, msg_ctx->event_ctx);
+	if (ctx->tevent_handle == NULL) {
+		TALLOC_FREE(result);
+		return NT_STATUS_NO_MEMORY;
+	}
 
 	ok = directory_create_or_exist_strict(socket_dir, sec_initial_uid(),
 					      0700);
@@ -239,7 +251,7 @@ NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
 
 	generate_random_buffer((uint8_t *)&cookie, sizeof(cookie));
 
-	ret = unix_msg_init(socket_name, &ctx->msg_callbacks, 1024, cookie,
+	ret = unix_msg_init(socket_name, ctx->msg_callbacks, 1024, cookie,
 			    messaging_dgm_recv, ctx, &ctx->dgm_ctx);
 	TALLOC_FREE(socket_name);
 	if (ret != 0) {
diff --git a/source3/lib/poll_funcs/poll_funcs_tevent.c b/source3/lib/poll_funcs/poll_funcs_tevent.c
index 6e75042..b032211 100644
--- a/source3/lib/poll_funcs/poll_funcs_tevent.c
+++ b/source3/lib/poll_funcs/poll_funcs_tevent.c
@@ -1,6 +1,6 @@
 /*
  * Unix SMB/CIFS implementation.
- * Copyright (C) Volker Lendecke 2013
+ * Copyright (C) Volker Lendecke 2013,2014
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
@@ -20,14 +20,56 @@
 #include "tevent.h"
 #include "system/select.h"
 
+/*
+ * A poll_watch is asked for by the engine using this library via
+ * funcs->watch_new(). It represents interest in "fd" becoming readable or
+ * writable.
+ */
+
 struct poll_watch {
-	struct tevent_fd *fde;
+	struct poll_funcs_state *state;
+	unsigned slot; 		/* index into state->watches[] */
 	int fd;
+	int events;
 	void (*callback)(struct poll_watch *w, int fd, short events,
 			 void *private_data);
 	void *private_data;
 };
 
+struct poll_funcs_state {
+	/*
+	 * "watches" is the array of all watches that we have handed out via
+	 * funcs->watch_new(). The "watches" array can contain NULL pointers.
+	 */
+	unsigned num_watches;
+	struct poll_watch **watches;
+
+	/*
+	 * "contexts" is the array of tevent_contexts that serve "watches".
+	 * It can contain NULL pointers, so every loop over it must skip them.
+	 */
+	unsigned num_contexts;
+	struct poll_funcs_tevent_context **contexts;
+};
+
+struct poll_funcs_tevent_context {
+	unsigned refcount;
+	struct poll_funcs_state *state;
+	unsigned slot;		/* index into state->contexts[] */
+	struct tevent_context *ev;
+	struct tevent_fd **fdes; /* same indexes as state->watches[] */
+};
+
+/*
+ * poll_funcs_tevent_register() hands out a struct poll_funcs_tevent_handle as
+ * a void *. The same tevent_context may be registered multiple times, but we
+ * must not add two tevent_fds for one fd to a tevent_context. So all
+ * handles for one tevent_context share one poll_funcs_tevent_context.
+ */
+struct poll_funcs_tevent_handle {
+	struct poll_funcs_tevent_context *ctx;
+};
 static uint16_t poll_events_to_tevent(short events)
 {
 	uint16_t ret = 0;
@@ -54,9 +96,59 @@ static short tevent_to_poll_events(uint16_t flags)
 	return ret;
 }
 
-static void tevent_watch_handler(struct tevent_context *ev,
-				 struct tevent_fd *fde, uint16_t flags,
-				 void *private_data);
+/*
+ * Find or create a free slot in state->watches[]. When the array has to
+ * grow, the fdes[] array of every registered context grows along with it.
+ */
+static bool poll_funcs_watch_find_slot(struct poll_funcs_state *state,
+				       unsigned *slot)
+{
+	struct poll_watch **watches;
+	unsigned i;
+
+	for (i=0; i<state->num_watches; i++) {
+		if (state->watches[i] == NULL) {
+			*slot = i;
+			return true;
+		}
+	}
+
+	watches = talloc_realloc(state, state->watches, struct poll_watch *,
+				 state->num_watches + 1);
+	if (watches == NULL) {
+		return false;
+	}
+	watches[state->num_watches] = NULL;
+	state->watches = watches;
+
+	for (i=0; i<state->num_contexts; i++) {
+		struct poll_funcs_tevent_context *c = state->contexts[i];
+		struct tevent_fd **fdes;
+
+		if (c == NULL) {
+			/* contexts[] can contain NULL pointers */
+			continue;
+		}
+		fdes = talloc_realloc(c, c->fdes, struct tevent_fd *,
+				      state->num_watches + 1);
+		if (fdes == NULL) {
+			return false;
+		}
+		c->fdes = fdes;
+
+		fdes[state->num_watches] = NULL;
+	}
+
+	*slot = state->num_watches;
+	state->num_watches += 1;
+
+	return true;
+}
+
+static void poll_funcs_fde_handler(struct tevent_context *ev,
+				   struct tevent_fd *fde, uint16_t flags,
+				   void *private_data);
+static int poll_watch_destructor(struct poll_watch *w);
 
 static struct poll_watch *tevent_watch_new(
 	const struct poll_funcs *funcs, int fd, short events,
@@ -64,45 +151,88 @@ static struct poll_watch *tevent_watch_new(
 			 void *private_data),
 	void *private_data)
 {
-	struct tevent_context *ev = talloc_get_type_abort(
-		funcs->private_data, struct tevent_context);
+	struct poll_funcs_state *state = talloc_get_type_abort(
+		funcs->private_data, struct poll_funcs_state);
 	struct poll_watch *w;
+	unsigned i, slot;
 
-	w = talloc(ev, struct poll_watch);
-	if (w == NULL) {
+	if (!poll_funcs_watch_find_slot(state, &slot)) {
 		return NULL;
 	}
-	w->fde = tevent_add_fd(ev, w, fd, poll_events_to_tevent(events),
-			       tevent_watch_handler, w);
-	if (w->fde == NULL) {
-		TALLOC_FREE(w);
+
+	w = talloc(state->watches, struct poll_watch);
+	if (w == NULL) {
 		return NULL;
 	}
+	w->state = state;
+	w->slot = slot;
+	w->events = poll_events_to_tevent(events);
 	w->fd = fd;
 	w->callback = callback;
 	w->private_data = private_data;
+	state->watches[slot] = w;
+
+	talloc_set_destructor(w, poll_watch_destructor);
+
+	for (i=0; i<state->num_contexts; i++) {
+		struct poll_funcs_tevent_context *c = state->contexts[i];
+
+		if (c == NULL) {
+			continue;
+		}
+		c->fdes[slot] = tevent_add_fd(c->ev, c->fdes, w->fd, w->events,
+					      poll_funcs_fde_handler, w);
+		if (c->fdes[slot] == NULL) {
+			goto fail;
+		}
+	}
 	return w;
+
+fail:
+	TALLOC_FREE(w);
+	return NULL;
 }
 
-static void tevent_watch_handler(struct tevent_context *ev,
-				 struct tevent_fd *fde, uint16_t flags,
-				 void *private_data)
+static int poll_watch_destructor(struct poll_watch *w)
 {
-	struct poll_watch *w = talloc_get_type_abort(
-		private_data, struct poll_watch);
+	struct poll_funcs_state *state = w->state;
+	unsigned slot = w->slot;
+	unsigned i;
 
-	w->callback(w, w->fd, tevent_to_poll_events(flags),
-		    w->private_data);
+	/* "w" is already being freed, just forget it */
+	state->watches[slot] = NULL;
+
+	for (i=0; i<state->num_contexts; i++) {
+		struct poll_funcs_tevent_context *c = state->contexts[i];
+		if (c == NULL) {
+			continue;
+		}
+		TALLOC_FREE(c->fdes[slot]);
+	}
+
+	return 0;
 }
 
 static void tevent_watch_update(struct poll_watch *w, short events)
 {
-	tevent_fd_set_flags(w->fde, poll_events_to_tevent(events));
+	struct poll_funcs_state *state = w->state;
+	unsigned slot = w->slot;
+	unsigned i;
+
+	w->events = poll_events_to_tevent(events);
+
+	for (i=0; i<state->num_contexts; i++) {
+		struct poll_funcs_tevent_context *c = state->contexts[i];
+		if (c == NULL) {
+			continue;
+		}
+		tevent_fd_set_flags(c->fdes[slot], w->events);
+	}
 }
 
 static short tevent_watch_get_events(struct poll_watch *w)
 {
-	return tevent_to_poll_events(tevent_fd_get_flags(w->fde));
+	return tevent_to_poll_events(w->events);
 }
 
 static void tevent_watch_free(struct poll_watch *w)
@@ -130,8 +254,24 @@ static void tevent_timeout_free(struct poll_timeout *t)
 	return;
 }
 
-void poll_funcs_init_tevent(struct poll_funcs *f, struct tevent_context *ev)
+static int poll_funcs_state_destructor(struct poll_funcs_state *state);
+
+struct poll_funcs *poll_funcs_init_tevent(TALLOC_CTX *mem_ctx)
 {
+	struct poll_funcs *f;
+	struct poll_funcs_state *state;
+
+	f = talloc(mem_ctx, struct poll_funcs);
+	if (f == NULL) {
+		return NULL;
+	}
+	state = talloc_zero(f, struct poll_funcs_state);
+	if (state == NULL) {
+		TALLOC_FREE(f);
+		return NULL;
+	}
+	talloc_set_destructor(state, poll_funcs_state_destructor);
+
 	f->watch_new = tevent_watch_new;
 	f->watch_update = tevent_watch_update;
 	f->watch_get_events = tevent_watch_get_events;
@@ -139,5 +279,183 @@ void poll_funcs_init_tevent(struct poll_funcs *f, struct tevent_context *ev)
 	f->timeout_new = tevent_timeout_new;
 	f->timeout_update = tevent_timeout_update;
 	f->timeout_free = tevent_timeout_free;
-	f->private_data = ev;
+	f->private_data = state;
+	return f;
+}
+
+static int poll_funcs_state_destructor(struct poll_funcs_state *state)
+{
+	unsigned i;
+	/*
+	 * Make sure the watches are cleared before the contexts. The watches
+	 * have destructors attached to them that clean up the fde's
+	 */
+	for (i=0; i<state->num_watches; i++) {
+		TALLOC_FREE(state->watches[i]);
+	}
+	return 0;
+}
+
+/*
+ * Find the slot in state->contexts[] that already serves "ev". If there is
+ * none, return a free (NULL) slot, growing the array if necessary.
+ *
+ * All slots have to be checked for "ev" before a free slot is handed out:
+ * A NULL slot may precede the one serving "ev", and reusing it would create
+ * a second poll_funcs_tevent_context with duplicate tevent_fds for "ev".
+ */
+static bool poll_funcs_context_slot_find(struct poll_funcs_state *state,
+					 struct tevent_context *ev,
+					 unsigned *slot)
+{
+	struct poll_funcs_tevent_context **contexts;
+	unsigned i;
+	unsigned free_slot = state->num_contexts;
+
+	for (i=0; i<state->num_contexts; i++) {
+		struct poll_funcs_tevent_context *ctx = state->contexts[i];
+
+		if (ctx == NULL) {
+			if (free_slot == state->num_contexts) {
+				free_slot = i;
+			}
+			continue;
+		}
+		if (ctx->ev == ev) {
+			*slot = i;
+			return true;
+		}
+	}
+
+	if (free_slot < state->num_contexts) {
+		*slot = free_slot;
+		return true;
+	}
+
+	contexts = talloc_realloc(state, state->contexts,
+				  struct poll_funcs_tevent_context *,
+				  state->num_contexts + 1);
+	if (contexts == NULL) {
+		return false;
+	}
+	state->contexts = contexts;
+	state->contexts[state->num_contexts] = NULL;
+
+	*slot = state->num_contexts;
+	state->num_contexts += 1;
+
+	return true;
+}
+
+static int poll_funcs_tevent_context_destructor(
+	struct poll_funcs_tevent_context *ctx);
+
+static struct poll_funcs_tevent_context *poll_funcs_tevent_context_new(
+	TALLOC_CTX *mem_ctx, struct poll_funcs_state *state,
+	struct tevent_context *ev, unsigned slot)
+{
+	struct poll_funcs_tevent_context *ctx;
+	unsigned i;
+
+	ctx = talloc(mem_ctx, struct poll_funcs_tevent_context);
+	if (ctx == NULL) {
+		return NULL;
+	}
+
+	ctx->refcount = 0;
+	ctx->state = state;
+	ctx->ev = ev;
+	ctx->slot = slot;
+
+	ctx->fdes = talloc_array(ctx, struct tevent_fd *, state->num_watches);
+	if (ctx->fdes == NULL) {
+		goto fail;
+	}
+
+	for (i=0; i<state->num_watches; i++) {
+		struct poll_watch *w = state->watches[i];
+
+		if (w == NULL) {
+			ctx->fdes[i] = NULL;
+			continue;
+		}
+		ctx->fdes[i] = tevent_add_fd(ev, ctx->fdes, w->fd, w->events,
+					     poll_funcs_fde_handler, w);
+		if (ctx->fdes[i] == NULL) {
+			goto fail;
+		}
+	}
+	talloc_set_destructor(ctx, poll_funcs_tevent_context_destructor);
+	return ctx;
+fail:
+	TALLOC_FREE(ctx);
+	return NULL;
+}
+
+static int poll_funcs_tevent_context_destructor(
+	struct poll_funcs_tevent_context *ctx)
+{
+	ctx->state->contexts[ctx->slot] = NULL;
+	return 0;
+}
+
+static void poll_funcs_fde_handler(struct tevent_context *ev,
+				   struct tevent_fd *fde, uint16_t flags,
+				   void *private_data)
+{
+	struct poll_watch *w = talloc_get_type_abort(
+		private_data, struct poll_watch);
+	w->callback(w, w->fd, tevent_to_poll_events(flags),
+		    w->private_data);
+}
+
+static int poll_funcs_tevent_handle_destructor(
+	struct poll_funcs_tevent_handle *handle);
+
+void *poll_funcs_tevent_register(TALLOC_CTX *mem_ctx, struct poll_funcs *f,
+				 struct tevent_context *ev)
+{
+	struct poll_funcs_state *state = talloc_get_type_abort(
+		f->private_data, struct poll_funcs_state);
+	struct poll_funcs_tevent_handle *handle;
+	unsigned slot;
+
+	handle = talloc(mem_ctx, struct poll_funcs_tevent_handle);
+	if (handle == NULL) {
+		return NULL;
+	}
+
+	if (!poll_funcs_context_slot_find(state, ev, &slot)) {
+		goto fail;
+	}
+	if (state->contexts[slot] == NULL) {
+		state->contexts[slot] = poll_funcs_tevent_context_new(
+			state->contexts, state, ev, slot);
+		if (state->contexts[slot] == NULL) {
+			goto fail;
+		}
+	}
+
+	handle->ctx = state->contexts[slot];
+	handle->ctx->refcount += 1;
+	talloc_set_destructor(handle, poll_funcs_tevent_handle_destructor);
+	return handle;
+fail:
+	TALLOC_FREE(handle);
+	return NULL;
+}
+
+static int poll_funcs_tevent_handle_destructor(
+	struct poll_funcs_tevent_handle *handle)
+{
+	if (handle->ctx->refcount == 0) {
+		abort();
+	}
+	handle->ctx->refcount -= 1;
+
+	if (handle->ctx->refcount != 0) {
+		return 0;
+	}
+	TALLOC_FREE(handle->ctx);
+	return 0;
 }
diff --git a/source3/lib/poll_funcs/poll_funcs_tevent.h b/source3/lib/poll_funcs/poll_funcs_tevent.h
index 2e67720..8b2964c 100644
--- a/source3/lib/poll_funcs/poll_funcs_tevent.h
+++ b/source3/lib/poll_funcs/poll_funcs_tevent.h
@@ -1,6 +1,6 @@
 /*
  * Unix SMB/CIFS implementation.
- * Copyright (C) Volker Lendecke 2013
+ * Copyright (C) Volker Lendecke 2013,2014
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
@@ -22,6 +22,17 @@
 #include "poll_funcs.h"
 #include "tevent.h"
 
-void poll_funcs_init_tevent(struct poll_funcs *f, struct tevent_context *ev);
+/*
+ * Create a new, empty instance of "struct poll_funcs" to be served by tevent.
+ */
+struct poll_funcs *poll_funcs_init_tevent(TALLOC_CTX *mem_ctx);
+
+/*
+ * Register a tevent_context to handle the watches that the user of
+ * "poll_funcs" showed interest in. talloc_free() the returned pointer when
+ * "ev" is not supposed to handle the events anymore.
+ */
+void *poll_funcs_tevent_register(TALLOC_CTX *mem_ctx, struct poll_funcs *f,
+				 struct tevent_context *ev);
 
 #endif
diff --git a/source3/lib/unix_msg/test_drain.c b/source3/lib/unix_msg/test_drain.c
index 6fe8c18..c2568b6 100644
--- a/source3/lib/unix_msg/test_drain.c
+++ b/source3/lib/unix_msg/test_drain.c
@@ -16,7 +16,8 @@ static void recv_cb(struct unix_msg_ctx *ctx,
 
 int main(int argc, const char *argv[])
 {
-	struct poll_funcs funcs;
+	struct poll_funcs *funcs;
+	void *tevent_handle;
 	const char *sock;
 	struct unix_msg_ctx *ctx;
 	struct tevent_context *ev;
@@ -37,10 +38,18 @@ int main(int argc, const char *argv[])
 		perror("tevent_context_init failed");
 		return 1;
 	}
-	poll_funcs_init_tevent(&funcs, ev);
+	funcs = poll_funcs_init_tevent(ev);
+	if (funcs == NULL) {
+		fprintf(stderr, "poll_funcs_init_tevent failed\n");
+		return 1;
+	}
+	tevent_handle = poll_funcs_tevent_register(ev, funcs, ev);
+	if (tevent_handle == NULL) {
+		fprintf(stderr, "poll_funcs_tevent_register failed\n");
+		return 1;
+	}
 
-	ret = unix_msg_init(sock, &funcs, 256, 1,
-			    recv_cb, &state, &ctx);
+	ret = unix_msg_init(sock, funcs, 256, 1, recv_cb, &state, &ctx);
 	if (ret != 0) {
 		fprintf(stderr, "unix_msg_init failed: %s\n",
 			strerror(ret));
diff --git a/source3/lib/unix_msg/test_source.c b/source3/lib/unix_msg/test_source.c
index bfafee1..94984d8 100644
--- a/source3/lib/unix_msg/test_source.c
+++ b/source3/lib/unix_msg/test_source.c
@@ -5,7 +5,8 @@
 
 int main(int argc, const char *argv[])
 {
-	struct poll_funcs funcs;
+	struct poll_funcs *funcs;
+	void *tevent_handle;
 	struct unix_msg_ctx **ctxs;
 	struct tevent_context *ev;
 	struct iovec iov;
@@ -26,7 +27,16 @@ int main(int argc, const char *argv[])
 		perror("tevent_context_init failed");
 		return 1;
 	}
-	poll_funcs_init_tevent(&funcs, ev);
+	funcs = poll_funcs_init_tevent(NULL);
+	if (funcs == NULL) {
+		fprintf(stderr, "poll_funcs_init_tevent failed\n");
+		return 1;
+	}
+	tevent_handle = poll_funcs_tevent_register(NULL, funcs, ev);
+	if (tevent_handle == NULL) {
+		fprintf(stderr, "poll_funcs_tevent_register failed\n");
+		return 1;
+	}
 
 	ctxs = talloc_array(ev, struct unix_msg_ctx *, num_ctxs);
 	if (ctxs == NULL) {
@@ -35,7 +45,7 @@ int main(int argc, const char *argv[])
 	}
 
 	for (i=0; i<num_ctxs; i++) {
-		ret = unix_msg_init(NULL, &funcs, 256, 1, NULL, NULL,
+		ret = unix_msg_init(NULL, funcs, 256, 1, NULL, NULL,
 				    &ctxs[i]);
 		if (ret != 0) {
 			fprintf(stderr, "unix_msg_init failed: %s\n",
diff --git a/source3/lib/unix_msg/tests.c b/source3/lib/unix_msg/tests.c
index 2a4cf86..29d5dcb 100644
--- a/source3/lib/unix_msg/tests.c
+++ b/source3/lib/unix_msg/tests.c
@@ -32,7 +32,8 @@ static void expect_messages(struct tevent_context *ev, struct cb_state *state,
 
 int main(void)
 {
-	struct poll_funcs funcs;
+	struct poll_funcs *funcs;
+	void *tevent_handle;
 	const char *sock1 = "sock1";
 	const char *sock2 = "sock2";
 	struct unix_msg_ctx *ctx1, *ctx2;
@@ -52,9 +53,19 @@ int main(void)
 		perror("tevent_context_init failed");
 		return 1;
 	}
-	poll_funcs_init_tevent(&funcs, ev);
 
-	ret = unix_msg_init(sock1, &funcs, 256, 1,
+	funcs = poll_funcs_init_tevent(ev);
+	if (funcs == NULL) {
+		fprintf(stderr, "poll_funcs_init_tevent failed\n");
+		return 1;
+	}
+	tevent_handle = poll_funcs_tevent_register(ev, funcs, ev);
+	if (tevent_handle == NULL) {
+		fprintf(stderr, "poll_funcs_tevent_register failed\n");
+		return 1;
+	}
+
+	ret = unix_msg_init(sock1, funcs, 256, 1,
 			    recv_cb, &state, &ctx1);
 	if (ret != 0) {
 		fprintf(stderr, "unix_msg_init failed: %s\n",
@@ -62,7 +73,7 @@ int main(void)
 		return 1;
 	}
 
-	ret = unix_msg_init(sock1, &funcs, 256, 1,
+	ret = unix_msg_init(sock1, funcs, 256, 1,
 			    recv_cb, &state, &ctx1);
 	if (ret == 0) {
 		fprintf(stderr, "unix_msg_init succeeded unexpectedly\n");
@@ -74,7 +85,7 @@ int main(void)
 		return 1;
 	}
 
-	ret = unix_msg_init(sock2, &funcs, 256, 1,
+	ret = unix_msg_init(sock2, funcs, 256, 1,
 			    recv_cb, &state, &ctx2);
 	if (ret != 0) {
 		fprintf(stderr, "unix_msg_init failed: %s\n",
@@ -201,6 +212,8 @@ int main(void)
 
 	unix_msg_free(ctx1);
 	unix_msg_free(ctx2);
+	talloc_free(tevent_handle);
+	talloc_free(funcs);
 	talloc_free(ev);
 
 	return 0;
-- 
1.7.9.5


From c53f7a1230ccf8b9540aba33b9f6c14f30630008 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Tue, 6 May 2014 09:11:17 +0200
Subject: [PATCH 06/12] messaging3: Add messaging_dgm_register_tevent_context

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/include/messages.h |    3 +++
 source3/lib/messages_dgm.c |    9 +++++++++
 2 files changed, 12 insertions(+)

diff --git a/source3/include/messages.h b/source3/include/messages.h
index 7801dfb..852e8a1 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -99,6 +99,9 @@ NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
 			    struct messaging_backend **presult);
 NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid);
 NTSTATUS messaging_dgm_wipe(struct messaging_context *msg_ctx);
+void *messaging_dgm_register_tevent_context(TALLOC_CTX *mem_ctx,
+					    struct messaging_context *msg_ctx,
+					    struct tevent_context *ev);
 
 NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx,
 			      TALLOC_CTX *mem_ctx,
diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index 56643b1..55a6fcf 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -473,3 +473,12 @@ NTSTATUS messaging_dgm_wipe(struct messaging_context *msg_ctx)
 
 	return NT_STATUS_OK;
 }
+
+void *messaging_dgm_register_tevent_context(TALLOC_CTX *mem_ctx,
+					    struct messaging_context *msg_ctx,
+					    struct tevent_context *ev)
+{
+	struct messaging_dgm_context *ctx = talloc_get_type_abort(
+		msg_ctx->local->private_data, struct messaging_dgm_context);
+	return poll_funcs_tevent_register(mem_ctx, ctx->msg_callbacks, ev);
+}
-- 
1.7.9.5


From 1469291ec9fe30fcaa59e63110ccc9067a8bea88 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Tue, 6 May 2014 09:39:01 +0200
Subject: [PATCH 07/12] messaging3: Fix messaging_filtered_read_send

If we register an additional tevent context, we can now properly do
nested event contexts, listening for just one message type inside a
tevent_req_poll.

At this point this only enhances things without ctdb, but I'm working on
fixing that soon.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages.c |   16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 6a08531..53dfbcf 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -461,6 +461,7 @@ static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
 struct messaging_filtered_read_state {
 	struct tevent_context *ev;
 	struct messaging_context *msg_ctx;
+	void *tevent_handle;
 
 	bool (*filter)(struct messaging_rec *rec, void *private_data);
 	void *private_data;
@@ -491,6 +492,16 @@ struct tevent_req *messaging_filtered_read_send(
 	state->filter = filter;
 	state->private_data = private_data;
 
+	/*
+	 * Also let "ev" serve the messaging_dgm socket: the caller may run
+	 * only "ev" (e.g. a nested tevent_req_poll) while waiting for us.
+	 */
+	state->tevent_handle = messaging_dgm_register_tevent_context(
+		state, msg_ctx, ev);
+	if (tevent_req_nomem(state->tevent_handle, req)) {
+		return tevent_req_post(req, ev);
+	}
+
 	/*
 	 * We add ourselves to the "new_waiters" array, not the "waiters"
 	 * array. If we are called from within messaging_read_done,
@@ -529,6 +536,8 @@ static void messaging_filtered_read_cleanup(struct tevent_req *req,
 
 	tevent_req_set_cleanup_fn(req, NULL);
 
+	TALLOC_FREE(state->tevent_handle);
+
 	/*
 	 * Just set the [new_]waiters entry to NULL, be careful not to mess
 	 * with the other "waiters" array contents. We are often called from
@@ -562,6 +571,13 @@ static void messaging_filtered_read_done(struct tevent_req *req,
 	if (tevent_req_nomem(state->rec, req)) {
 		return;
 	}
+
+	/*
+	 * We have to defer the callback here, as we might be called from
+	 * within a different tevent_context than state->ev
+	 */
+	tevent_req_defer_callback(req, state->ev);
+
 	tevent_req_done(req);
 }
 
-- 
1.7.9.5


From a61d1161fb2ede30837f0ee41e73b392d0e37c35 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 May 2014 08:49:04 +0200
Subject: [PATCH 08/12] torture3: Fix local-messaging-read1

Now that we defer requests in dispatch_rec, we need 3 rounds to finish
the requests.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/torture/test_messaging_read.c |    9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/source3/torture/test_messaging_read.c b/source3/torture/test_messaging_read.c
index 387ebfd..188b021 100644
--- a/source3/torture/test_messaging_read.c
+++ b/source3/torture/test_messaging_read.c
@@ -91,6 +91,7 @@ bool run_messaging_read1(int dummy)
 	unsigned count2 = 0;
 	NTSTATUS status;
 	bool retval = false;
+	int i;
 
 	ev = samba_tevent_context_init(talloc_tos());
 	if (ev == NULL) {
@@ -121,9 +122,11 @@ bool run_messaging_read1(int dummy)
 		goto fail;
 	}
 
-	if (tevent_loop_once(ev) != 0) {
-		fprintf(stderr, "tevent_loop_once failed\n");
-		goto fail;
+	for (i=0; i<3; i++) {
+		if (tevent_loop_once(ev) != 0) {
+			fprintf(stderr, "tevent_loop_once failed\n");
+			goto fail;
+		}
 	}
 
 	printf("%u/%u\n", count1, count2);
-- 
1.7.9.5


From c278e471e758edcd778f5191cd6fcd84158fd28f Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 May 2014 09:44:57 +0200
Subject: [PATCH 09/12] messaging3: Push down the self-send callback

In the messaging_read receivers we already defer the callback: We need to
reply on potentially different tevent contexts, thus the defer_callback.

The callback case in messaging_dispatch_rec was direct before this
patch. This changes messaging_dispatch_rec to also defer the callback
in the self-send case.

Now we need only two roundtrips in local-messaging-read1 :-)

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages.c                |  145 +++++++++++++++++++++------------
 source3/torture/test_messaging_read.c |    2 +-
 2 files changed, 94 insertions(+), 53 deletions(-)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 53dfbcf..a435f4f 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -341,15 +341,6 @@ void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
 	}
 }
 
-struct messaging_selfsend_state {
-	struct messaging_context *msg;
-	struct messaging_rec rec;
-};
-
-static void messaging_trigger_self(struct tevent_context *ev,
-				   struct tevent_immediate *im,
-				   void *private_data);
-
 /*
   Send a message to a particular server
 */
@@ -368,33 +359,13 @@ NTSTATUS messaging_send(struct messaging_context *msg_ctx,
 	}
 
 	if (server_id_equal(&msg_ctx->id, &server)) {
-		struct messaging_selfsend_state *state;
-		struct tevent_immediate *im;
-
-		state = talloc_pooled_object(
-			msg_ctx, struct messaging_selfsend_state,
-			1, data->length);
-		if (state == NULL) {
-			return NT_STATUS_NO_MEMORY;
-		}
-		state->msg = msg_ctx;
-		state->rec.msg_version = MESSAGE_VERSION;
-		state->rec.msg_type = msg_type & MSG_TYPE_MASK;
-		state->rec.dest = server;
-		state->rec.src = msg_ctx->id;
-
-		/* Can't fail, it's a pooled_object */
-		state->rec.buf = data_blob_talloc(
-			state, data->data, data->length);
-
-		im = tevent_create_immediate(state);
-		if (im == NULL) {
-			TALLOC_FREE(state);
-			return NT_STATUS_NO_MEMORY;
-		}
-
-		tevent_schedule_immediate(im, msg_ctx->event_ctx,
-					  messaging_trigger_self, state);
+		struct messaging_rec rec;
+		rec.msg_version = MESSAGE_VERSION;
+		rec.msg_type = msg_type & MSG_TYPE_MASK;
+		rec.dest = server;
+		rec.src = msg_ctx->id;
+		rec.buf = *data;
+		messaging_dispatch_rec(msg_ctx, &rec);
 		return NT_STATUS_OK;
 	}
 
@@ -402,16 +373,6 @@ NTSTATUS messaging_send(struct messaging_context *msg_ctx,
 				       msg_ctx->local);
 }
 
-static void messaging_trigger_self(struct tevent_context *ev,
-				   struct tevent_immediate *im,
-				   void *private_data)
-{
-	struct messaging_selfsend_state *state = talloc_get_type_abort(
-		private_data, struct messaging_selfsend_state);
-	messaging_dispatch_rec(state->msg, &state->rec);
-	TALLOC_FREE(state);
-}
-
 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
 			    struct server_id server, uint32_t msg_type,
 			    const uint8_t *buf, size_t len)
@@ -698,6 +659,75 @@ static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
 	return true;
 }
 
+struct messaging_defer_callback_state {
+	struct messaging_context *msg_ctx;
+	struct messaging_rec *rec;
+	void (*fn)(struct messaging_context *msg, void *private_data,
+		   uint32_t msg_type, struct server_id server_id,
+		   DATA_BLOB *data);
+	void *private_data;
+};
+
+static void messaging_defer_callback_trigger(struct tevent_context *ev,
+					     struct tevent_immediate *im,
+					     void *private_data);
+
+/*
+ * Call "fn" with a copy of "rec" from an immediate event scheduled on
+ * msg_ctx->event_ctx instead of calling it directly. If an allocation
+ * fails, the callback is dropped and only a DEBUG message is logged.
+ */
+static void messaging_defer_callback(
+	struct messaging_context *msg_ctx, struct messaging_rec *rec,
+	void (*fn)(struct messaging_context *msg, void *private_data,
+		   uint32_t msg_type, struct server_id server_id,
+		   DATA_BLOB *data),
+	void *private_data)
+{
+	struct messaging_defer_callback_state *state;
+	struct tevent_immediate *im;
+
+	state = talloc(msg_ctx, struct messaging_defer_callback_state);
+	if (state == NULL) {
+		DEBUG(1, ("talloc failed\n"));
+		return;
+	}
+	state->msg_ctx = msg_ctx;
+	state->fn = fn;
+	state->private_data = private_data;
+
+	state->rec = messaging_rec_dup(state, rec);
+	if (state->rec == NULL) {
+		DEBUG(1, ("talloc failed\n"));
+		TALLOC_FREE(state);
+		return;
+	}
+
+	im = tevent_create_immediate(state);
+	if (im == NULL) {
+		DEBUG(1, ("tevent_create_immediate failed\n"));
+		TALLOC_FREE(state);
+		return;
+	}
+	tevent_schedule_immediate(im, msg_ctx->event_ctx,
+				  messaging_defer_callback_trigger, state);
+}
+
+static void messaging_defer_callback_trigger(struct tevent_context *ev,
+					     struct tevent_immediate *im,
+					     void *private_data)
+{
+	struct messaging_defer_callback_state *state = talloc_get_type_abort(
+		private_data, struct messaging_defer_callback_state);
+	struct messaging_rec *rec = state->rec;
+
+	state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
+		  &rec->buf);
+
+	/* state is per deferred call; this also frees the rec copy and "im" */
+	TALLOC_FREE(state);
+}
+
 /*
   Dispatch one messaging_rec
 */
@@ -709,15 +731,34 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 
 	for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
 		next = cb->next;
-		if (cb->msg_type == rec->msg_type) {
+		if (cb->msg_type != rec->msg_type) {
+			continue;
+		}
+
+		if (server_id_equal(&msg_ctx->id, &rec->dest)) {
+			/*
+			 * This is a self-send. We are called here from
+			 * messaging_send(), and we don't want to directly
+			 * recurse into the callback but go via a
+			 * tevent_loop_once
+			 */
+			messaging_defer_callback(msg_ctx, rec, cb->fn,
+						 cb->private_data);
+		} else {
+			/*
+			 * This comes from a different process. we are called
+			 * from the event loop, so we should call back
+			 * directly.
+			 */
 			cb->fn(msg_ctx, cb->private_data, rec->msg_type,
 			       rec->src, &rec->buf);
-			/* we continue looking for matching messages
-			   after finding one. This matters for
-			   subsystems like the internal notify code
-			   which register more than one handler for
-			   the same message type */
 		}
+		/*
+		 * we continue looking for matching messages after finding
+		 * one. This matters for subsystems like the internal notify
+		 * code which register more than one handler for the same
+		 * message type
+		 */
 	}
 
 	if (!messaging_append_new_waiters(msg_ctx)) {
diff --git a/source3/torture/test_messaging_read.c b/source3/torture/test_messaging_read.c
index 188b021..0bb3128 100644
--- a/source3/torture/test_messaging_read.c
+++ b/source3/torture/test_messaging_read.c
@@ -122,7 +122,7 @@ bool run_messaging_read1(int dummy)
 		goto fail;
 	}
 
-	for (i=0; i<3; i++) {
+	for (i=0; i<2; i++) {
 		if (tevent_loop_once(ev) != 0) {
 			fprintf(stderr, "tevent_loop_once failed\n");
 			goto fail;
-- 
1.7.9.5


From 64066bb35c1a11a9c63ed1f6b8fa0ef0395ed78c Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 May 2014 09:50:27 +0200
Subject: [PATCH 10/12] messaging3: Factor out the self-send check

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages.c |   10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index a435f4f..657a062 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -341,6 +341,12 @@ void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
 	}
 }
 
+static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
+				   const struct server_id *dst)
+{
+	return server_id_equal(&msg_ctx->id, dst);
+}
+
 /*
   Send a message to a particular server
 */
@@ -358,7 +364,7 @@ NTSTATUS messaging_send(struct messaging_context *msg_ctx,
 						msg_ctx->remote);
 	}
 
-	if (server_id_equal(&msg_ctx->id, &server)) {
+	if (messaging_is_self_send(msg_ctx, &server)) {
 		struct messaging_rec rec;
 		rec.msg_version = MESSAGE_VERSION;
 		rec.msg_type = msg_type & MSG_TYPE_MASK;
@@ -735,7 +741,7 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 			continue;
 		}
 
-		if (server_id_equal(&msg_ctx->id, &rec->dest)) {
+		if (messaging_is_self_send(msg_ctx, &rec->dest)) {
 			/*
 			 * This is a self-send. We are called here from
 			 * messaging_send(), and we don't want to directly
-- 
1.7.9.5


From ec92423328c5fc0b9ead56ba90b0fd625faeeada Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 May 2014 09:51:59 +0200
Subject: [PATCH 11/12] messaging3: Relax the self-send check a bit

In the future we will have multiple task ids per process. They should all be
able to benefit from the local self-send optimization.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages.c |    3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 657a062..a384ffd 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -344,7 +344,8 @@ void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
 static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
 				   const struct server_id *dst)
 {
-	return server_id_equal(&msg_ctx->id, dst);
+	return ((msg_ctx->id.vnn == dst->vnn) &&
+		(msg_ctx->id.pid == dst->pid));
 }
 
 /*
-- 
1.7.9.5


From f35f4b28c6cb0ac94e07207b55cb11a5a9c2c2cf Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 May 2014 11:21:04 +0200
Subject: [PATCH 12/12] torture3: local-messaging-read3

This is a testcase for the rpc-style messaging nested event context. We have to
fork here: The nested event context does not reply to the PING message, it only
listens for the PONG response. But that's the point of the patches: Correctly
pick just one message in a nested event context. I think this is the best we
can do with nested event contexts.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/selftest/tests.py             |    1 +
 source3/torture/proto.h               |    1 +
 source3/torture/test_messaging_read.c |  200 +++++++++++++++++++++++++++++++++
 source3/torture/torture.c             |    1 +
 4 files changed, 203 insertions(+)

diff --git a/source3/selftest/tests.py b/source3/selftest/tests.py
index fcf3cd5..bad66d3 100755
--- a/source3/selftest/tests.py
+++ b/source3/selftest/tests.py
@@ -104,6 +104,7 @@ local_tests = [
     "LOCAL-IDMAP-TDB-COMMON",
     "LOCAL-MESSAGING-READ1",
     "LOCAL-MESSAGING-READ2",
+    "LOCAL-MESSAGING-READ3",
     "LOCAL-hex_encode_buf",
     "LOCAL-sprintf_append",
     "LOCAL-remove_duplicate_addrs2"]
diff --git a/source3/torture/proto.h b/source3/torture/proto.h
index a737ea4..20c1110 100644
--- a/source3/torture/proto.h
+++ b/source3/torture/proto.h
@@ -115,5 +115,6 @@ bool run_qpathinfo_bufsize(int dummy);
 bool run_bench_pthreadpool(int dummy);
 bool run_messaging_read1(int dummy);
 bool run_messaging_read2(int dummy);
+bool run_messaging_read3(int dummy);
 
 #endif /* __TORTURE_H__ */
diff --git a/source3/torture/test_messaging_read.c b/source3/torture/test_messaging_read.c
index 0bb3128..757b83e 100644
--- a/source3/torture/test_messaging_read.c
+++ b/source3/torture/test_messaging_read.c
@@ -248,3 +248,237 @@ fail:
 	TALLOC_FREE(ev);
 	return retval;
 }
+
+struct msg_pingpong_state {
+	uint8_t dummy;
+};
+
+static void msg_pingpong_done(struct tevent_req *subreq);
+
+/*
+ * Send MSG_PING to "dst", then complete when the next MSG_PONG (from
+ * any sender) has been read from "msg_ctx" via "ev".
+ */
+static struct tevent_req *msg_pingpong_send(TALLOC_CTX *mem_ctx,
+					    struct tevent_context *ev,
+					    struct messaging_context *msg_ctx,
+					    struct server_id dst)
+{
+	struct tevent_req *req, *subreq;
+	struct msg_pingpong_state *state;
+	NTSTATUS status;
+
+	req = tevent_req_create(mem_ctx, &state, struct msg_pingpong_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	status = messaging_send_buf(msg_ctx, dst, MSG_PING, NULL, 0);
+	if (!NT_STATUS_IS_OK(status)) {
+		tevent_req_error(req, map_errno_from_nt_status(status));
+		return tevent_req_post(req, ev);
+	}
+
+	subreq = messaging_read_send(state, ev, msg_ctx, MSG_PONG);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, msg_pingpong_done, req);
+	return req;
+}
+
+static void msg_pingpong_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	int ret;
+
+	ret = messaging_read_recv(subreq, NULL, NULL);
+	TALLOC_FREE(subreq);
+	if (ret != 0) {
+		tevent_req_error(req, ret);
+		return;
+	}
+	tevent_req_done(req);
+}
+
+static int msg_pingpong_recv(struct tevent_req *req)
+{
+	int err;
+
+	if (tevent_req_is_unix_error(req, &err)) {
+		return err;
+	}
+	return 0;
+}
+
+/*
+ * Synchronous wrapper: run msg_pingpong_send() in a private tevent
+ * context. Returns 0 on success, an errno value otherwise.
+ */
+static int msg_pingpong(struct messaging_context *msg_ctx,
+			struct server_id dst)
+{
+	struct tevent_context *ev;
+	struct tevent_req *req;
+	int ret = ENOMEM;
+
+	ev = tevent_context_init(msg_ctx);
+	if (ev == NULL) {
+		goto fail;
+	}
+	req = msg_pingpong_send(ev, ev, msg_ctx, dst);
+	if (req == NULL) {
+		goto fail;
+	}
+	if (!tevent_req_poll(req, ev)) {
+		ret = errno;
+		goto fail;
+	}
+	ret = msg_pingpong_recv(req);
+fail:
+	TALLOC_FREE(ev);
+	return ret;
+}
+
+static void ping_responder_exit(struct tevent_context *ev,
+				struct tevent_fd *fde,
+				uint16_t flags,
+				void *private_data)
+{
+	bool *done = private_data;
+	*done = true;
+}
+
+/*
+ * Child side: set up messaging, signal readiness with one byte on
+ * "ready_pipe", then run the event loop until "exit_pipe" becomes
+ * readable (the parent wrote to it or closed it). Exits on errors.
+ */
+static void ping_responder(int ready_pipe, int exit_pipe)
+{
+	struct tevent_context *ev;
+	struct messaging_context *msg_ctx;
+	struct tevent_fd *exit_handler;
+	char c = 0;
+	bool done = false;
+
+	ev = samba_tevent_context_init(talloc_tos());
+	if (ev == NULL) {
+		fprintf(stderr, "child tevent_context_init failed\n");
+		exit(1);
+	}
+	msg_ctx = messaging_init(ev, ev);
+	if (msg_ctx == NULL) {
+		fprintf(stderr, "child messaging_init failed\n");
+		exit(1);
+	}
+	exit_handler = tevent_add_fd(ev, ev, exit_pipe, TEVENT_FD_READ,
+				     ping_responder_exit, &done);
+	if (exit_handler == NULL) {
+		fprintf(stderr, "child tevent_add_fd failed\n");
+		exit(1);
+	}
+
+	if (write(ready_pipe, &c, 1) != 1) {
+		fprintf(stderr, "child write to ready_pipe failed\n");
+		exit(1);
+	}
+
+	while (!done) {
+		int ret;
+		ret = tevent_loop_once(ev);
+		if (ret != 0) {
+			fprintf(stderr, "child tevent_loop_once failed\n");
+			exit(1);
+		}
+	}
+
+	TALLOC_FREE(msg_ctx);
+	TALLOC_FREE(ev);
+}
+
+/*
+ * Fork a child running ping_responder() and do a MSG_PING/MSG_PONG
+ * round trip with it via msg_pingpong(). The child is always told to
+ * exit and reaped; a non-zero child exit status fails the test.
+ */
+bool run_messaging_read3(int dummy)
+{
+	struct tevent_context *ev = NULL;
+	struct messaging_context *msg_ctx = NULL;
+	bool retval = false;
+	pid_t child;
+	int ready_pipe[2];
+	int exit_pipe[2];
+	int ret;
+	int status;
+	char c;
+	struct server_id dst;
+
+	if ((pipe(ready_pipe) != 0) || (pipe(exit_pipe) != 0)) {
+		perror("pipe failed");
+		return false;
+	}
+
+	child = fork();
+	if (child == -1) {
+		perror("fork failed");
+		return false;
+	}
+
+	if (child == 0) {
+		close(ready_pipe[0]);
+		close(exit_pipe[1]);
+		ping_responder(ready_pipe[1], exit_pipe[0]);
+		exit(0);
+	}
+	close(ready_pipe[1]);
+	close(exit_pipe[0]);
+
+	if (read(ready_pipe[0], &c, 1) != 1) {
+		perror("read failed");
+		goto fail;
+	}
+
+	ev = samba_tevent_context_init(talloc_tos());
+	if (ev == NULL) {
+		fprintf(stderr, "tevent_context_init failed\n");
+		goto fail;
+	}
+	msg_ctx = messaging_init(ev, ev);
+	if (msg_ctx == NULL) {
+		fprintf(stderr, "messaging_init failed\n");
+		goto fail;
+	}
+
+	dst = messaging_server_id(msg_ctx);
+	dst.pid = child;
+
+	ret = msg_pingpong(msg_ctx, dst);
+	if (ret != 0){
+		fprintf(stderr, "msg_pingpong failed\n");
+		goto fail;
+	}
+
+	retval = true;
+fail:
+	TALLOC_FREE(msg_ctx);
+	TALLOC_FREE(ev);
+
+	/*
+	 * Closing our end of exit_pipe makes it readable (EOF) in the
+	 * child, which ends its event loop. Reap it so no zombie is left.
+	 */
+	close(ready_pipe[0]);
+	close(exit_pipe[1]);
+	if (waitpid(child, &status, 0) == -1) {
+		perror("waitpid failed");
+		return false;
+	}
+	if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
+		fprintf(stderr, "child failed\n");
+		return false;
+	}
+	return retval;
+}
diff --git a/source3/torture/torture.c b/source3/torture/torture.c
index f97119a..0826506 100644
--- a/source3/torture/torture.c
+++ b/source3/torture/torture.c
@@ -9576,6 +9576,7 @@ static struct {
 	{ "LOCAL-DBWRAP-WATCH1", run_dbwrap_watch1, 0 },
 	{ "LOCAL-MESSAGING-READ1", run_messaging_read1, 0 },
 	{ "LOCAL-MESSAGING-READ2", run_messaging_read2, 0 },
+	{ "LOCAL-MESSAGING-READ3", run_messaging_read3, 0 },
 	{ "LOCAL-BASE64", run_local_base64, 0},
 	{ "LOCAL-RBTREE", run_local_rbtree, 0},
 	{ "LOCAL-MEMCACHE", run_local_memcache, 0},
-- 
1.7.9.5



More information about the samba-technical mailing list