[PATCH] messaging3 improvements

Jeremy Allison jra at samba.org
Wed May 7 17:32:37 MDT 2014


On Wed, May 07, 2014 at 03:44:46PM -0700, Jeremy Allison wrote:
> On Wed, May 07, 2014 at 03:32:16PM -0700, Jeremy Allison wrote:
> > On Wed, May 07, 2014 at 02:53:36PM -0700, Jeremy Allison wrote:
> > > 
> > > Ok, don't worry I'm not planning to push until
> > > I've also reviewed this code and understand *exactly*
> > > what it's doing (which is fun, as it's complex Volker code :-).
> > 
> > OK, I think the following patch needs squashing
> > into the source3/lib/poll_funcs/poll_funcs_tevent.c
> > change:
> > 
> > ----------------------------------------------
> > [PATCH 05/12] lib: Enhance poll_funcs_tevent for multiple tevent_contexts
> > ----------------------------------------------
> > 
> > state->contexts[i] can explicitly contain NULL,
> > and whilst the code handles this for the
> > destructor case, and the poll_funcs_context_slot_find()
> > case, it doesn't cope with state->contexts[i]==NULL
> > in poll_funcs_watch_find_slot(), tevent_watch_new()
> > or tevent_watch_update().
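
For reference, the squash just extends to those loops the same NULL-skip
that the destructor already does; a minimal sketch of the pattern (names
as in poll_funcs_tevent.c):

	for (i=0; i<state->num_contexts; i++) {
		struct poll_funcs_tevent_context *c = state->contexts[i];
		if (c == NULL) {
			/* slot cleared by a context destructor, skip it */
			continue;
		}
		/* ... operate on c / c->fdes[slot] ... */
	}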
> > 
> > Volker, please review (and feel free to merge
> > into 05/12 if you think it's right).
> 
> Here's the updated version containing that
> squash I'm still reviewing - just to keep
> everything in sync.

OK, I've finished reviewing and valgrinding,
and I'm happy with the following patchset,
with Metze's and my 'Reviewed-by's added.

Volker, if you could review the small
changes I made and give them your blessing,
feel free to push, or I'll push tomorrow
(my time) once you give me your OK to do
so.

Cheers,

	Jeremy.
-------------- next part --------------
From 10cea71cbd1644c7c39fd55f13fb864acbc07f8f Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Thu, 24 Apr 2014 09:05:53 +0000
Subject: [PATCH 01/12] messaging3: Add messaging_filtered_read

This delegates the decision whether to read a given message to a
caller-supplied filter callback.
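
A minimal usage sketch of the new send/recv pair (wait_for_pong and
pong_filter are hypothetical names, MSG_PONG is just an example message
type, error handling kept minimal):

static bool pong_filter(struct messaging_rec *rec, void *private_data)
{
	return (rec->msg_type == MSG_PONG);
}

static int wait_for_pong(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
			 struct messaging_context *msg_ctx,
			 struct messaging_rec **prec)
{
	struct tevent_req *req;
	int ret;

	req = messaging_filtered_read_send(mem_ctx, ev, msg_ctx,
					   pong_filter, NULL);
	if (req == NULL) {
		return ENOMEM;
	}
	if (!tevent_req_poll(req, ev)) {
		return errno;
	}
	ret = messaging_filtered_read_recv(req, mem_ctx, prec);
	TALLOC_FREE(req);
	return ret;
}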

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/messages.h |   8 +++
 source3/lib/messages.c     | 128 ++++++++++++++++++++++++++++++++++++---------
 2 files changed, 111 insertions(+), 25 deletions(-)

diff --git a/source3/include/messages.h b/source3/include/messages.h
index 06c1748..7801dfb 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -142,6 +142,14 @@ NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 			    struct messaging_rec *rec);
 
+struct tevent_req *messaging_filtered_read_send(
+	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+	struct messaging_context *msg_ctx,
+	bool (*filter)(struct messaging_rec *rec, void *private_data),
+	void *private_data);
+int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+				 struct messaging_rec **presult);
+
 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
 				       struct tevent_context *ev,
 				       struct messaging_context *msg,
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 9284ac1..ca254a4 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -458,33 +458,38 @@ static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
 	return result;
 }
 
-struct messaging_read_state {
+struct messaging_filtered_read_state {
 	struct tevent_context *ev;
 	struct messaging_context *msg_ctx;
-	uint32_t msg_type;
+
+	bool (*filter)(struct messaging_rec *rec, void *private_data);
+	void *private_data;
+
 	struct messaging_rec *rec;
 };
 
-static void messaging_read_cleanup(struct tevent_req *req,
-				   enum tevent_req_state req_state);
+static void messaging_filtered_read_cleanup(struct tevent_req *req,
+					    enum tevent_req_state req_state);
 
-struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
-				       struct tevent_context *ev,
-				       struct messaging_context *msg_ctx,
-				       uint32_t msg_type)
+struct tevent_req *messaging_filtered_read_send(
+	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+	struct messaging_context *msg_ctx,
+	bool (*filter)(struct messaging_rec *rec, void *private_data),
+	void *private_data)
 {
 	struct tevent_req *req;
-	struct messaging_read_state *state;
+	struct messaging_filtered_read_state *state;
 	size_t new_waiters_len;
 
 	req = tevent_req_create(mem_ctx, &state,
-				struct messaging_read_state);
+				struct messaging_filtered_read_state);
 	if (req == NULL) {
 		return NULL;
 	}
 	state->ev = ev;
 	state->msg_ctx = msg_ctx;
-	state->msg_type = msg_type;
+	state->filter = filter;
+	state->private_data = private_data;
 
 	new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
 
@@ -501,16 +506,16 @@ struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
 
 	msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
 	msg_ctx->num_new_waiters += 1;
-	tevent_req_set_cleanup_fn(req, messaging_read_cleanup);
+	tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
 
 	return req;
 }
 
-static void messaging_read_cleanup(struct tevent_req *req,
-				   enum tevent_req_state req_state)
+static void messaging_filtered_read_cleanup(struct tevent_req *req,
+					    enum tevent_req_state req_state)
 {
-	struct messaging_read_state *state = tevent_req_data(
-		req, struct messaging_read_state);
+	struct messaging_filtered_read_state *state = tevent_req_data(
+		req, struct messaging_filtered_read_state);
 	struct messaging_context *msg_ctx = state->msg_ctx;
 	unsigned i;
 
@@ -531,11 +536,11 @@ static void messaging_read_cleanup(struct tevent_req *req,
 	}
 }
 
-static void messaging_read_done(struct tevent_req *req,
-				struct messaging_rec *rec)
+static void messaging_filtered_read_done(struct tevent_req *req,
+					 struct messaging_rec *rec)
 {
-	struct messaging_read_state *state = tevent_req_data(
-		req, struct messaging_read_state);
+	struct messaging_filtered_read_state *state = tevent_req_data(
+		req, struct messaging_filtered_read_state);
 
 	state->rec = messaging_rec_dup(state, rec);
 	if (tevent_req_nomem(state->rec, req)) {
@@ -544,6 +549,79 @@ static void messaging_read_done(struct tevent_req *req,
 	tevent_req_done(req);
 }
 
+int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+				 struct messaging_rec **presult)
+{
+	struct messaging_filtered_read_state *state = tevent_req_data(
+		req, struct messaging_filtered_read_state);
+	int err;
+
+	if (tevent_req_is_unix_error(req, &err)) {
+		tevent_req_received(req);
+		return err;
+	}
+	*presult = talloc_move(mem_ctx, &state->rec);
+	return 0;
+}
+
+struct messaging_read_state {
+	uint32_t msg_type;
+	struct messaging_rec *rec;
+};
+
+static bool messaging_read_filter(struct messaging_rec *rec,
+				  void *private_data);
+static void messaging_read_done(struct tevent_req *subreq);
+
+struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
+				       struct tevent_context *ev,
+				       struct messaging_context *msg,
+				       uint32_t msg_type)
+{
+	struct tevent_req *req, *subreq;
+	struct messaging_read_state *state;
+
+	req = tevent_req_create(mem_ctx, &state,
+				struct messaging_read_state);
+	if (req == NULL) {
+		return NULL;
+	}
+	state->msg_type = msg_type;
+
+	subreq = messaging_filtered_read_send(state, ev, msg,
+					      messaging_read_filter, state);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, messaging_read_done, req);
+	return req;
+}
+
+static bool messaging_read_filter(struct messaging_rec *rec,
+				  void *private_data)
+{
+	struct messaging_read_state *state = talloc_get_type_abort(
+		private_data, struct messaging_read_state);
+
+	return rec->msg_type == state->msg_type;
+}
+
+static void messaging_read_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct messaging_read_state *state = tevent_req_data(
+		req, struct messaging_read_state);
+	int ret;
+
+	ret = messaging_filtered_read_recv(subreq, state, &state->rec);
+	TALLOC_FREE(subreq);
+	if (tevent_req_error(req, ret)) {
+		return;
+	}
+	tevent_req_done(req);
+}
+
 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
 			struct messaging_rec **presult)
 {
@@ -552,7 +630,6 @@ int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
 	int err;
 
 	if (tevent_req_is_unix_error(req, &err)) {
-		tevent_req_received(req);
 		return err;
 	}
 	if (presult != NULL) {
@@ -618,7 +695,7 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 	i = 0;
 	while (i < msg_ctx->num_waiters) {
 		struct tevent_req *req;
-		struct messaging_read_state *state;
+		struct messaging_filtered_read_state *state;
 
 		req = msg_ctx->waiters[i];
 		if (req == NULL) {
@@ -638,9 +715,10 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 			continue;
 		}
 
-		state = tevent_req_data(req, struct messaging_read_state);
-		if (state->msg_type == rec->msg_type) {
-			messaging_read_done(req, rec);
+		state = tevent_req_data(
+			req, struct messaging_filtered_read_state);
+		if (state->filter(rec, state->private_data)) {
+			messaging_filtered_read_done(req, rec);
 		}
 
 		i += 1;
-- 
1.9.1.423.g4596e3a


From c4c221130fec4762632f8447cde3c773d0984521 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Thu, 24 Apr 2014 09:23:48 +0000
Subject: [PATCH 02/12] dbwrap: Use messaging_filtered_read

This does not really save any code lines, but IMHO the code is simpler
this way. Also, in case we have lots of watchers this will be slightly
cheaper, because we don't have to re-establish a tevent_req.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/lib/dbwrap/dbwrap_watch.c | 42 +++++++++++++++++++--------------------
 1 file changed, 21 insertions(+), 21 deletions(-)

diff --git a/source3/lib/dbwrap/dbwrap_watch.c b/source3/lib/dbwrap/dbwrap_watch.c
index 4f3a2b3..a5f1ebd 100644
--- a/source3/lib/dbwrap/dbwrap_watch.c
+++ b/source3/lib/dbwrap/dbwrap_watch.c
@@ -235,6 +235,8 @@ struct dbwrap_record_watch_state {
 	TDB_DATA w_key;
 };
 
+static bool dbwrap_record_watch_filter(struct messaging_rec *rec,
+				       void *private_data);
 static void dbwrap_record_watch_done(struct tevent_req *subreq);
 static int dbwrap_record_watch_state_destructor(
 	struct dbwrap_record_watch_state *state);
@@ -271,8 +273,8 @@ struct tevent_req *dbwrap_record_watch_send(TALLOC_CTX *mem_ctx,
 		return tevent_req_post(req, ev);
 	}
 
-	subreq = messaging_read_send(state, ev, state->msg,
-				     MSG_DBWRAP_MODIFIED);
+	subreq = messaging_filtered_read_send(
+		state, ev, state->msg, dbwrap_record_watch_filter, state);
 	if (tevent_req_nomem(subreq, req)) {
 		return tevent_req_post(req, ev);
 	}
@@ -288,6 +290,21 @@ struct tevent_req *dbwrap_record_watch_send(TALLOC_CTX *mem_ctx,
 	return req;
 }
 
+static bool dbwrap_record_watch_filter(struct messaging_rec *rec,
+				       void *private_data)
+{
+	struct dbwrap_record_watch_state *state = talloc_get_type_abort(
+		private_data, struct dbwrap_record_watch_state);
+
+	if (rec->msg_type != MSG_DBWRAP_MODIFIED) {
+		return false;
+	}
+	if (rec->buf.length != state->w_key.dsize) {
+		return false;
+	}
+	return memcmp(rec->buf.data, state->w_key.dptr,	rec->buf.length) == 0;
+}
+
 static int dbwrap_record_watch_state_destructor(
 	struct dbwrap_record_watch_state *s)
 {
@@ -351,33 +368,16 @@ static void dbwrap_record_watch_done(struct tevent_req *subreq)
 {
 	struct tevent_req *req = tevent_req_callback_data(
 		subreq, struct tevent_req);
-	struct dbwrap_record_watch_state *state = tevent_req_data(
-		req, struct dbwrap_record_watch_state);
 	struct messaging_rec *rec;
 	int ret;
 
-	ret = messaging_read_recv(subreq, talloc_tos(), &rec);
+	ret = messaging_filtered_read_recv(subreq, talloc_tos(), &rec);
 	TALLOC_FREE(subreq);
 	if (ret != 0) {
 		tevent_req_nterror(req, map_nt_error_from_unix(ret));
 		return;
 	}
-
-	if ((rec->buf.length == state->w_key.dsize) &&
-	    (memcmp(rec->buf.data, state->w_key.dptr, rec->buf.length) == 0)) {
-		tevent_req_done(req);
-		return;
-	}
-
-	/*
-	 * Not our record, wait for the next one
-	 */
-	subreq = messaging_read_send(state, state->ev, state->msg,
-				     MSG_DBWRAP_MODIFIED);
-	if (tevent_req_nomem(subreq, req)) {
-		return;
-	}
-	tevent_req_set_callback(subreq, dbwrap_record_watch_done, req);
+	tevent_req_done(req);
 }
 
 NTSTATUS dbwrap_record_watch_recv(struct tevent_req *req,
-- 
1.9.1.423.g4596e3a


From 9ee71a8897d6daf0f304b116594e3ca647f06fb5 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 2 May 2014 09:12:52 +0000
Subject: [PATCH 03/12] messaging3: Fix 80-char line limit

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/lib/messages.c | 14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index ca254a4..065782a 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -709,7 +709,7 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 				memmove(&msg_ctx->waiters[i],
 					&msg_ctx->waiters[i+1],
 					sizeof(struct tevent_req *) *
-						(msg_ctx->num_waiters - i - 1));
+					    (msg_ctx->num_waiters - i - 1));
 			}
 			msg_ctx->num_waiters -= 1;
 			continue;
@@ -735,7 +735,8 @@ bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
 
 	req = background_job_send(
 		msg, msg->event_ctx, msg, NULL, 0,
-		lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 60*15),
+		lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
+			    60*15),
 		mess_parent_dgm_cleanup, msg);
 	if (req == NULL) {
 		return false;
@@ -752,7 +753,8 @@ static int mess_parent_dgm_cleanup(void *private_data)
 
 	status = messaging_dgm_wipe(msg_ctx);
 	DEBUG(10, ("messaging_dgm_wipe returned %s\n", nt_errstr(status)));
-	return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 60*15);
+	return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
+			   60*15);
 }
 
 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
@@ -763,11 +765,13 @@ static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
 
 	status = background_job_recv(req);
 	TALLOC_FREE(req);
-	DEBUG(1, ("messaging dgm cleanup job ended with %s\n", nt_errstr(status)));
+	DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
+		  nt_errstr(status)));
 
 	req = background_job_send(
 		msg, msg->event_ctx, msg, NULL, 0,
-		lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 60*15),
+		lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
+			    60*15),
 		mess_parent_dgm_cleanup, msg);
 	if (req == NULL) {
 		DEBUG(1, ("background_job_send failed\n"));
-- 
1.9.1.423.g4596e3a


From 1b969251a15504f1bc16f7e4a18793011f87e1a0 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 2 May 2014 09:20:40 +0000
Subject: [PATCH 04/12] messaging3: Add comments about not touching "waiters"

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/lib/messages.c | 16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 065782a..6a08531 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -491,6 +491,14 @@ struct tevent_req *messaging_filtered_read_send(
 	state->filter = filter;
 	state->private_data = private_data;
 
+	/*
+	 * We add ourselves to the "new_waiters" array, not the "waiters"
+	 * array. If we are called from within messaging_read_done,
+	 * messaging_dispatch_rec will be in an active for-loop on
+	 * "waiters". We must be careful not to mess with this array, because
+	 * it could mean that a single event is being delivered twice.
+	 */
+
 	new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
 
 	if (new_waiters_len == msg_ctx->num_new_waiters) {
@@ -521,6 +529,14 @@ static void messaging_filtered_read_cleanup(struct tevent_req *req,
 
 	tevent_req_set_cleanup_fn(req, NULL);
 
+	/*
+	 * Just set the [new_]waiters entry to NULL, be careful not to mess
+	 * with the other "waiters" array contents. We are often called from
+	 * within "messaging_dispatch_rec", which loops over
+	 * "waiters". Messing with the "waiters" array will mess up that
+	 * for-loop.
+	 */
+
 	for (i=0; i<msg_ctx->num_waiters; i++) {
 		if (msg_ctx->waiters[i] == req) {
 			msg_ctx->waiters[i] = NULL;
-- 
1.9.1.423.g4596e3a


From d5ceea4eb3f326dc8f7fc417c84d6566f033c767 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 5 May 2014 08:45:52 +0200
Subject: [PATCH 05/12] lib: Enhance poll_funcs_tevent for multiple
 tevent_contexts

With this patch it becomes possible to use nested event contexts with
messaging_filtered_read_send/recv. Before this patchset, only the one
event context a messaging_context is initialized with is able to receive
datagrams from the unix domain socket. So if you want to code a
synchronous, RPC-like operation using a nested event context, you will
not see the reply, because the nested event context does not have the
required tevent_fds.

Unfortunately, this patchset has to add some advanced array voodoo. The
idea is that state->watches[] contains what we hand out with watch_new,
and state->contexts[] contains references to the tevent_contexts. For
every watch we need a tevent_fd in every registered event context, and
the routines make sure that the arrays are kept consistent.
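
To make the array bookkeeping concrete, a minimal caller-side sketch
(mem_ctx, main_ev and nested_ev are hypothetical and error checking is
omitted; the two functions are the ones added by this patch):

	struct poll_funcs *funcs = poll_funcs_init_tevent(mem_ctx);
	void *main_handle = poll_funcs_tevent_register(mem_ctx, funcs,
						       main_ev);
	void *nested_handle = poll_funcs_tevent_register(mem_ctx, funcs,
							 nested_ev);

	/*
	 * Every watch handed out via funcs->watch_new() now gets a
	 * tevent_fd in both main_ev and nested_ev: for watch slot i and
	 * context slot j, contexts[j]->fdes[i] mirrors watches[i].
	 */

	TALLOC_FREE(nested_handle); /* nested_ev stops serving the watches */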

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/lib/messages_dgm.c                 |  18 +-
 source3/lib/poll_funcs/poll_funcs_tevent.c | 356 +++++++++++++++++++++++++++--
 source3/lib/poll_funcs/poll_funcs_tevent.h |  15 +-
 source3/lib/unix_msg/test_drain.c          |  11 +-
 source3/lib/unix_msg/test_source.c         |  16 +-
 source3/lib/unix_msg/tests.c               |  23 +-
 6 files changed, 398 insertions(+), 41 deletions(-)

diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index 354dac3..56643b1 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -30,7 +30,8 @@
 
 struct messaging_dgm_context {
 	struct messaging_context *msg_ctx;
-	struct poll_funcs msg_callbacks;
+	struct poll_funcs *msg_callbacks;
+	void *tevent_handle;
 	struct unix_msg_ctx *dgm_ctx;
 	char *cache_dir;
 	int lockfile_fd;
@@ -224,7 +225,18 @@ NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
 		return map_nt_error_from_unix(ret);
 	}
 
-	poll_funcs_init_tevent(&ctx->msg_callbacks, msg_ctx->event_ctx);
+	ctx->msg_callbacks = poll_funcs_init_tevent(ctx);
+	if (ctx->msg_callbacks == NULL) {
+		TALLOC_FREE(result);
+		return NT_STATUS_NO_MEMORY;
+	}
+
+	ctx->tevent_handle = poll_funcs_tevent_register(
+		ctx, ctx->msg_callbacks, msg_ctx->event_ctx);
+	if (ctx->tevent_handle == NULL) {
+		TALLOC_FREE(result);
+		return NT_STATUS_NO_MEMORY;
+	}
 
 	ok = directory_create_or_exist_strict(socket_dir, sec_initial_uid(),
 					      0700);
@@ -239,7 +251,7 @@ NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
 
 	generate_random_buffer((uint8_t *)&cookie, sizeof(cookie));
 
-	ret = unix_msg_init(socket_name, &ctx->msg_callbacks, 1024, cookie,
+	ret = unix_msg_init(socket_name, ctx->msg_callbacks, 1024, cookie,
 			    messaging_dgm_recv, ctx, &ctx->dgm_ctx);
 	TALLOC_FREE(socket_name);
 	if (ret != 0) {
diff --git a/source3/lib/poll_funcs/poll_funcs_tevent.c b/source3/lib/poll_funcs/poll_funcs_tevent.c
index 6e75042..ee800ba 100644
--- a/source3/lib/poll_funcs/poll_funcs_tevent.c
+++ b/source3/lib/poll_funcs/poll_funcs_tevent.c
@@ -1,6 +1,6 @@
 /*
  * Unix SMB/CIFS implementation.
- * Copyright (C) Volker Lendecke 2013
+ * Copyright (C) Volker Lendecke 2013,2014
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
@@ -20,14 +20,56 @@
 #include "tevent.h"
 #include "system/select.h"
 
+/*
+ * A poll_watch is asked for by the engine using this library via
+ * funcs->watch_new(). It represents interest in "fd" becoming readable or
+ * writable.
+ */
+
 struct poll_watch {
-	struct tevent_fd *fde;
+	struct poll_funcs_state *state;
+	unsigned slot; 		/* index into state->watches[] */
 	int fd;
+	int events;
 	void (*callback)(struct poll_watch *w, int fd, short events,
 			 void *private_data);
 	void *private_data;
 };
 
+struct poll_funcs_state {
+	/*
+	 * "watches" is the array of all watches that we have handed out via
+	 * funcs->watch_new(). The "watches" array can contain NULL pointers.
+	 */
+	unsigned num_watches;
+	struct poll_watch **watches;
+
+	/*
+	 * "contexts is the array of tevent_contexts that serve
+	 * "watches". "contexts" can contain NULL pointers.
+	 */
+	unsigned num_contexts;
+	struct poll_funcs_tevent_context **contexts;
+};
+
+struct poll_funcs_tevent_context {
+	unsigned refcount;
+	struct poll_funcs_state *state;
+	unsigned slot;		/* index into state->contexts[] */
+	struct tevent_context *ev;
+	struct tevent_fd **fdes; /* same indexes as state->watches[] */
+};
+
+/*
+ * poll_funcs_tevent_register() hands out a struct poll_funcs_tevent_handle as
+ * a void *. poll_funcs_tevent_register allows tevent_contexts to be
+ * registered multiple times, and we can't add a tevent_fd for the same fd's
+ * multiple times. So we have to share one poll_funcs_tevent_context.
+ */
+struct poll_funcs_tevent_handle {
+	struct poll_funcs_tevent_context *ctx;
+};
+
 static uint16_t poll_events_to_tevent(short events)
 {
 	uint16_t ret = 0;
@@ -54,9 +96,56 @@ static short tevent_to_poll_events(uint16_t flags)
 	return ret;
 }
 
-static void tevent_watch_handler(struct tevent_context *ev,
-				 struct tevent_fd *fde, uint16_t flags,
-				 void *private_data);
+/*
+ * Find or create a free slot in state->watches[]
+ */
+static bool poll_funcs_watch_find_slot(struct poll_funcs_state *state,
+				       unsigned *slot)
+{
+	struct poll_watch **watches;
+	unsigned i;
+
+	for (i=0; i<state->num_watches; i++) {
+		if (state->watches[i] == NULL) {
+			*slot = i;
+			return true;
+		}
+	}
+
+	watches = talloc_realloc(state, state->watches, struct poll_watch *,
+				 state->num_watches + 1);
+	if (watches == NULL) {
+		return false;
+	}
+	watches[state->num_watches] = NULL;
+	state->watches = watches;
+
+	for (i=0; i<state->num_contexts; i++) {
+		struct tevent_fd **fdes;
+		struct poll_funcs_tevent_context *c = state->contexts[i];
+		if (c == NULL) {
+			continue;
+		}
+		fdes = talloc_realloc(c, c->fdes, struct tevent_fd *,
+				      state->num_watches + 1);
+		if (fdes == NULL) {
+			return false;
+		}
+		c->fdes = fdes;
+
+		fdes[state->num_watches] = NULL;
+	}
+
+	*slot = state->num_watches;
+	state->num_watches += 1;
+
+	return true;
+}
+
+static void poll_funcs_fde_handler(struct tevent_context *ev,
+				   struct tevent_fd *fde, uint16_t flags,
+				   void *private_data);
+static int poll_watch_destructor(struct poll_watch *w);
 
 static struct poll_watch *tevent_watch_new(
 	const struct poll_funcs *funcs, int fd, short events,
@@ -64,45 +153,87 @@ static struct poll_watch *tevent_watch_new(
 			 void *private_data),
 	void *private_data)
 {
-	struct tevent_context *ev = talloc_get_type_abort(
-		funcs->private_data, struct tevent_context);
+	struct poll_funcs_state *state = talloc_get_type_abort(
+		funcs->private_data, struct poll_funcs_state);
 	struct poll_watch *w;
+	unsigned i, slot;
 
-	w = talloc(ev, struct poll_watch);
-	if (w == NULL) {
+	if (!poll_funcs_watch_find_slot(state, &slot)) {
 		return NULL;
 	}
-	w->fde = tevent_add_fd(ev, w, fd, poll_events_to_tevent(events),
-			       tevent_watch_handler, w);
-	if (w->fde == NULL) {
-		TALLOC_FREE(w);
+
+	w = talloc(state->watches, struct poll_watch);
+	if (w == NULL) {
 		return NULL;
 	}
+	w->state = state;
+	w->slot = slot;
+	w->fd = fd;
+	w->events = poll_events_to_tevent(events);
 	w->fd = fd;
 	w->callback = callback;
 	w->private_data = private_data;
+	state->watches[slot] = w;
+
+	talloc_set_destructor(w, poll_watch_destructor);
+
+	for (i=0; i<state->num_contexts; i++) {
+		struct poll_funcs_tevent_context *c = state->contexts[i];
+		if (c == NULL) {
+			continue;
+		}
+		c->fdes[slot] = tevent_add_fd(c->ev, c->fdes, w->fd, w->events,
+					      poll_funcs_fde_handler, w);
+		if (c->fdes[slot] == NULL) {
+			goto fail;
+		}
+	}
 	return w;
+
+fail:
+	TALLOC_FREE(w);
+	return NULL;
 }
 
-static void tevent_watch_handler(struct tevent_context *ev,
-				 struct tevent_fd *fde, uint16_t flags,
-				 void *private_data)
+static int poll_watch_destructor(struct poll_watch *w)
 {
-	struct poll_watch *w = talloc_get_type_abort(
-		private_data, struct poll_watch);
+	struct poll_funcs_state *state = w->state;
+	unsigned slot = w->slot;
+	unsigned i;
+
+	TALLOC_FREE(state->watches[slot]);
+
+	for (i=0; i<state->num_contexts; i++) {
+		struct poll_funcs_tevent_context *c = state->contexts[i];
+		if (c == NULL) {
+			continue;
+		}
+		TALLOC_FREE(c->fdes[slot]);
+	}
 
-	w->callback(w, w->fd, tevent_to_poll_events(flags),
-		    w->private_data);
+	return 0;
 }
 
 static void tevent_watch_update(struct poll_watch *w, short events)
 {
-	tevent_fd_set_flags(w->fde, poll_events_to_tevent(events));
+	struct poll_funcs_state *state = w->state;
+	unsigned slot = w->slot;
+	unsigned i;
+
+	w->events = poll_events_to_tevent(events);
+
+	for (i=0; i<state->num_contexts; i++) {
+		struct poll_funcs_tevent_context *c = state->contexts[i];
+		if (c == NULL) {
+			continue;
+		}
+		tevent_fd_set_flags(c->fdes[slot], w->events);
+	}
 }
 
 static short tevent_watch_get_events(struct poll_watch *w)
 {
-	return tevent_to_poll_events(tevent_fd_get_flags(w->fde));
+	return tevent_to_poll_events(w->events);
 }
 
 static void tevent_watch_free(struct poll_watch *w)
@@ -130,8 +261,24 @@ static void tevent_timeout_free(struct poll_timeout *t)
 	return;
 }
 
-void poll_funcs_init_tevent(struct poll_funcs *f, struct tevent_context *ev)
+static int poll_funcs_state_destructor(struct poll_funcs_state *state);
+
+struct poll_funcs *poll_funcs_init_tevent(TALLOC_CTX *mem_ctx)
 {
+	struct poll_funcs *f;
+	struct poll_funcs_state *state;
+
+	f = talloc(mem_ctx, struct poll_funcs);
+	if (f == NULL) {
+		return NULL;
+	}
+	state = talloc_zero(f, struct poll_funcs_state);
+	if (state == NULL) {
+		TALLOC_FREE(f);
+		return NULL;
+	}
+	talloc_set_destructor(state, poll_funcs_state_destructor);
+
 	f->watch_new = tevent_watch_new;
 	f->watch_update = tevent_watch_update;
 	f->watch_get_events = tevent_watch_get_events;
@@ -139,5 +286,166 @@ void poll_funcs_init_tevent(struct poll_funcs *f, struct tevent_context *ev)
 	f->timeout_new = tevent_timeout_new;
 	f->timeout_update = tevent_timeout_update;
 	f->timeout_free = tevent_timeout_free;
-	f->private_data = ev;
+	f->private_data = state;
+	return f;
+}
+
+static int poll_funcs_state_destructor(struct poll_funcs_state *state)
+{
+	unsigned i;
+	/*
+	 * Make sure the watches are cleared before the contexts. The watches
+	 * have destructors attached to them that clean up the fde's
+	 */
+	for (i=0; i<state->num_watches; i++) {
+		TALLOC_FREE(state->watches[i]);
+	}
+	return 0;
+}
+
+/*
+ * Find or create a free slot in state->contexts[]
+ */
+static bool poll_funcs_context_slot_find(struct poll_funcs_state *state,
+					 struct tevent_context *ev,
+					 unsigned *slot)
+{
+	struct poll_funcs_tevent_context **contexts;
+	unsigned i;
+
+	for (i=0; i<state->num_contexts; i++) {
+		struct poll_funcs_tevent_context *ctx = state->contexts[i];
+
+		if ((ctx == NULL) || (ctx->ev == ev)) {
+			*slot = i;
+			return true;
+		}
+	}
+
+	contexts = talloc_realloc(state, state->contexts,
+				  struct poll_funcs_tevent_context *,
+				  state->num_contexts + 1);
+	if (contexts == NULL) {
+		return false;
+	}
+	state->contexts = contexts;
+	state->contexts[state->num_contexts] = NULL;
+
+	*slot = state->num_contexts;
+	state->num_contexts += 1;
+
+	return true;
+}
+
+static int poll_funcs_tevent_context_destructor(
+	struct poll_funcs_tevent_context *ctx);
+
+static struct poll_funcs_tevent_context *poll_funcs_tevent_context_new(
+	TALLOC_CTX *mem_ctx, struct poll_funcs_state *state,
+	struct tevent_context *ev, unsigned slot)
+{
+	struct poll_funcs_tevent_context *ctx;
+	unsigned i;
+
+	ctx = talloc(mem_ctx, struct poll_funcs_tevent_context);
+	if (ctx == NULL) {
+		return NULL;
+	}
+
+	ctx->refcount = 0;
+	ctx->state = state;
+	ctx->ev = ev;
+	ctx->slot = slot;
+
+	ctx->fdes = talloc_array(ctx, struct tevent_fd *, state->num_watches);
+	if (ctx->fdes == NULL) {
+		goto fail;
+	}
+
+	for (i=0; i<state->num_watches; i++) {
+		struct poll_watch *w = state->watches[i];
+
+		if (w == NULL) {
+			ctx->fdes[i] = NULL;
+			continue;
+		}
+		ctx->fdes[i] = tevent_add_fd(ev, ctx->fdes, w->fd, w->events,
+					     poll_funcs_fde_handler, w);
+		if (ctx->fdes[i] == NULL) {
+			goto fail;
+		}
+	}
+	talloc_set_destructor(ctx, poll_funcs_tevent_context_destructor);
+	return ctx;
+fail:
+	TALLOC_FREE(ctx);
+	return NULL;
+}
+
+static int poll_funcs_tevent_context_destructor(
+	struct poll_funcs_tevent_context *ctx)
+{
+	ctx->state->contexts[ctx->slot] = NULL;
+	return 0;
+}
+
+static void poll_funcs_fde_handler(struct tevent_context *ev,
+				   struct tevent_fd *fde, uint16_t flags,
+				   void *private_data)
+{
+	struct poll_watch *w = talloc_get_type_abort(
+		private_data, struct poll_watch);
+	short events = tevent_to_poll_events(flags);
+	w->callback(w, w->fd, events, w->private_data);
+}
+
+static int poll_funcs_tevent_handle_destructor(
+	struct poll_funcs_tevent_handle *handle);
+
+void *poll_funcs_tevent_register(TALLOC_CTX *mem_ctx, struct poll_funcs *f,
+				 struct tevent_context *ev)
+{
+	struct poll_funcs_state *state = talloc_get_type_abort(
+		f->private_data, struct poll_funcs_state);
+	struct poll_funcs_tevent_handle *handle;
+	unsigned slot;
+
+	handle = talloc(mem_ctx, struct poll_funcs_tevent_handle);
+	if (handle == NULL) {
+		return NULL;
+	}
+
+	if (!poll_funcs_context_slot_find(state, ev, &slot)) {
+		goto fail;
+	}
+	if (state->contexts[slot] == NULL) {
+		state->contexts[slot] = poll_funcs_tevent_context_new(
+			state->contexts, state, ev, slot);
+		if (state->contexts[slot] == NULL) {
+			goto fail;
+		}
+	}
+
+	handle->ctx = state->contexts[slot];
+	handle->ctx->refcount += 1;
+	talloc_set_destructor(handle, poll_funcs_tevent_handle_destructor);
+	return handle;
+fail:
+	TALLOC_FREE(handle);
+	return NULL;
+}
+
+static int poll_funcs_tevent_handle_destructor(
+	struct poll_funcs_tevent_handle *handle)
+{
+	if (handle->ctx->refcount == 0) {
+		abort();
+	}
+	handle->ctx->refcount -= 1;
+
+	if (handle->ctx->refcount != 0) {
+		return 0;
+	}
+	TALLOC_FREE(handle->ctx);
+	return 0;
 }
diff --git a/source3/lib/poll_funcs/poll_funcs_tevent.h b/source3/lib/poll_funcs/poll_funcs_tevent.h
index 2e67720..8b2964c 100644
--- a/source3/lib/poll_funcs/poll_funcs_tevent.h
+++ b/source3/lib/poll_funcs/poll_funcs_tevent.h
@@ -1,6 +1,6 @@
 /*
  * Unix SMB/CIFS implementation.
- * Copyright (C) Volker Lendecke 2013
+ * Copyright (C) Volker Lendecke 2013,2014
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
@@ -22,6 +22,17 @@
 #include "poll_funcs.h"
 #include "tevent.h"
 
-void poll_funcs_init_tevent(struct poll_funcs *f, struct tevent_context *ev);
+/*
+ * Create a new, empty instance of "struct poll_funcs" to be served by tevent.
+ */
+struct poll_funcs *poll_funcs_init_tevent(TALLOC_CTX *mem_ctx);
+
+/*
+ * Register a tevent_context to handle the watches that the user of
+ * "poll_funcs" showed interest in. talloc_free() the returned pointer when
+ * "ev" is not supposed to handle the events anymore.
+ */
+void *poll_funcs_tevent_register(TALLOC_CTX *mem_ctx, struct poll_funcs *f,
+				 struct tevent_context *ev);
 
 #endif
diff --git a/source3/lib/unix_msg/test_drain.c b/source3/lib/unix_msg/test_drain.c
index 6fe8c18..c2568b6 100644
--- a/source3/lib/unix_msg/test_drain.c
+++ b/source3/lib/unix_msg/test_drain.c
@@ -16,7 +16,7 @@ static void recv_cb(struct unix_msg_ctx *ctx,
 
 int main(int argc, const char *argv[])
 {
-	struct poll_funcs funcs;
+	struct poll_funcs *funcs;
 	const char *sock;
 	struct unix_msg_ctx *ctx;
 	struct tevent_context *ev;
@@ -37,10 +37,13 @@ int main(int argc, const char *argv[])
 		perror("tevent_context_init failed");
 		return 1;
 	}
-	poll_funcs_init_tevent(&funcs, ev);
+	funcs = poll_funcs_init_tevent(ev);
+	if (funcs == NULL) {
+		fprintf(stderr, "poll_funcs_init_tevent failed\n");
+		return 1;
+	}
 
-	ret = unix_msg_init(sock, &funcs, 256, 1,
-			    recv_cb, &state, &ctx);
+	ret = unix_msg_init(sock, funcs, 256, 1, recv_cb, &state, &ctx);
 	if (ret != 0) {
 		fprintf(stderr, "unix_msg_init failed: %s\n",
 			strerror(ret));
diff --git a/source3/lib/unix_msg/test_source.c b/source3/lib/unix_msg/test_source.c
index bfafee1..94984d8 100644
--- a/source3/lib/unix_msg/test_source.c
+++ b/source3/lib/unix_msg/test_source.c
@@ -5,7 +5,8 @@
 
 int main(int argc, const char *argv[])
 {
-	struct poll_funcs funcs;
+	struct poll_funcs *funcs;
+	void *tevent_handle;
 	struct unix_msg_ctx **ctxs;
 	struct tevent_context *ev;
 	struct iovec iov;
@@ -26,7 +27,16 @@ int main(int argc, const char *argv[])
 		perror("tevent_context_init failed");
 		return 1;
 	}
-	poll_funcs_init_tevent(&funcs, ev);
+	funcs = poll_funcs_init_tevent(NULL);
+	if (funcs == NULL) {
+		fprintf(stderr, "poll_funcs_init_tevent failed\n");
+		return 1;
+	}
+	tevent_handle = poll_funcs_tevent_register(NULL, funcs, ev);
+	if (tevent_handle == NULL) {
+		fprintf(stderr, "poll_funcs_tevent_register failed\n");
+		return 1;
+	}
 
 	ctxs = talloc_array(ev, struct unix_msg_ctx *, num_ctxs);
 	if (ctxs == NULL) {
@@ -35,7 +45,7 @@ int main(int argc, const char *argv[])
 	}
 
 	for (i=0; i<num_ctxs; i++) {
-		ret = unix_msg_init(NULL, &funcs, 256, 1, NULL, NULL,
+		ret = unix_msg_init(NULL, funcs, 256, 1, NULL, NULL,
 				    &ctxs[i]);
 		if (ret != 0) {
 			fprintf(stderr, "unix_msg_init failed: %s\n",
diff --git a/source3/lib/unix_msg/tests.c b/source3/lib/unix_msg/tests.c
index 2a4cf86..29d5dcb 100644
--- a/source3/lib/unix_msg/tests.c
+++ b/source3/lib/unix_msg/tests.c
@@ -32,7 +32,8 @@ static void expect_messages(struct tevent_context *ev, struct cb_state *state,
 
 int main(void)
 {
-	struct poll_funcs funcs;
+	struct poll_funcs *funcs;
+	void *tevent_handle;
 	const char *sock1 = "sock1";
 	const char *sock2 = "sock2";
 	struct unix_msg_ctx *ctx1, *ctx2;
@@ -52,9 +53,19 @@ int main(void)
 		perror("tevent_context_init failed");
 		return 1;
 	}
-	poll_funcs_init_tevent(&funcs, ev);
 
-	ret = unix_msg_init(sock1, &funcs, 256, 1,
+	funcs = poll_funcs_init_tevent(ev);
+	if (funcs == NULL) {
+		fprintf(stderr, "poll_funcs_init_tevent failed\n");
+		return 1;
+	}
+	tevent_handle = poll_funcs_tevent_register(ev, funcs, ev);
+	if (tevent_handle == NULL) {
+		fprintf(stderr, "poll_funcs_register_tevent failed\n");
+		return 1;
+	}
+
+	ret = unix_msg_init(sock1, funcs, 256, 1,
 			    recv_cb, &state, &ctx1);
 	if (ret != 0) {
 		fprintf(stderr, "unix_msg_init failed: %s\n",
@@ -62,7 +73,7 @@ int main(void)
 		return 1;
 	}
 
-	ret = unix_msg_init(sock1, &funcs, 256, 1,
+	ret = unix_msg_init(sock1, funcs, 256, 1,
 			    recv_cb, &state, &ctx1);
 	if (ret == 0) {
 		fprintf(stderr, "unix_msg_init succeeded unexpectedly\n");
@@ -74,7 +85,7 @@ int main(void)
 		return 1;
 	}
 
-	ret = unix_msg_init(sock2, &funcs, 256, 1,
+	ret = unix_msg_init(sock2, funcs, 256, 1,
 			    recv_cb, &state, &ctx2);
 	if (ret != 0) {
 		fprintf(stderr, "unix_msg_init failed: %s\n",
@@ -201,6 +212,8 @@ int main(void)
 
 	unix_msg_free(ctx1);
 	unix_msg_free(ctx2);
+	talloc_free(tevent_handle);
+	talloc_free(funcs);
 	talloc_free(ev);
 
 	return 0;
-- 
1.9.1.423.g4596e3a


From 8af79c42286bfd818ccb26b269ba19f6cbd4f957 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Tue, 6 May 2014 09:11:17 +0200
Subject: [PATCH 06/12] messaging3: Add messaging_dgm_register_tevent_context

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/include/messages.h | 3 +++
 source3/lib/messages_dgm.c | 9 +++++++++
 2 files changed, 12 insertions(+)

diff --git a/source3/include/messages.h b/source3/include/messages.h
index 7801dfb..852e8a1 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -99,6 +99,9 @@ NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
 			    struct messaging_backend **presult);
 NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid);
 NTSTATUS messaging_dgm_wipe(struct messaging_context *msg_ctx);
+void *messaging_dgm_register_tevent_context(TALLOC_CTX *mem_ctx,
+					    struct messaging_context *msg_ctx,
+					    struct tevent_context *ev);
 
 NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx,
 			      TALLOC_CTX *mem_ctx,
diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
index 56643b1..55a6fcf 100644
--- a/source3/lib/messages_dgm.c
+++ b/source3/lib/messages_dgm.c
@@ -473,3 +473,12 @@ NTSTATUS messaging_dgm_wipe(struct messaging_context *msg_ctx)
 
 	return NT_STATUS_OK;
 }
+
+void *messaging_dgm_register_tevent_context(TALLOC_CTX *mem_ctx,
+					    struct messaging_context *msg_ctx,
+					    struct tevent_context *ev)
+{
+	struct messaging_dgm_context *ctx = talloc_get_type_abort(
+		msg_ctx->local->private_data, struct messaging_dgm_context);
+	return poll_funcs_tevent_register(mem_ctx, ctx->msg_callbacks, ev);
+}
-- 
1.9.1.423.g4596e3a


From 3d0f4f7e41e87af647d06f479aaa0fb253b8c753 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Tue, 6 May 2014 09:39:01 +0200
Subject: [PATCH 07/12] messaging3: Fix messaging_filtered_read_send

If we register an additional tevent context, we can now properly support
nested event contexts, listening for just one message type inside a
tevent_req_poll.

At this point this only improves the non-ctdb case, but I'm working on
fixing that soon.
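
A minimal sketch of the nested-event-context pattern this enables
(my_filter and the surrounding variables are hypothetical; patch 12/12
below adds a real test along these lines):

	struct tevent_context *nested_ev = tevent_context_init(mem_ctx);
	struct messaging_rec *rec = NULL;
	struct tevent_req *req;

	req = messaging_filtered_read_send(mem_ctx, nested_ev, msg_ctx,
					   my_filter, NULL);
	if ((req != NULL) && tevent_req_poll(req, nested_ev)) {
		messaging_filtered_read_recv(req, mem_ctx, &rec);
	}
	TALLOC_FREE(req);
	TALLOC_FREE(nested_ev);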

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/lib/messages.c | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 6a08531..50c79a1 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -461,6 +461,7 @@ static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
 struct messaging_filtered_read_state {
 	struct tevent_context *ev;
 	struct messaging_context *msg_ctx;
+	void *tevent_handle;
 
 	bool (*filter)(struct messaging_rec *rec, void *private_data);
 	void *private_data;
@@ -492,6 +493,18 @@ struct tevent_req *messaging_filtered_read_send(
 	state->private_data = private_data;
 
 	/*
+	 * We have to defer the callback here, as we might be called from
+	 * within a different tevent_context than state->ev
+	 */
+	tevent_req_defer_callback(req, state->ev);
+
+	state->tevent_handle = messaging_dgm_register_tevent_context(
+		state, msg_ctx, ev);
+	if (tevent_req_nomem(state, req)) {
+		return tevent_req_post(req, ev);
+	}
+
+	/*
 	 * We add ourselves to the "new_waiters" array, not the "waiters"
 	 * array. If we are called from within messaging_read_done,
 	 * messaging_dispatch_rec will be in an active for-loop on
@@ -529,6 +542,8 @@ static void messaging_filtered_read_cleanup(struct tevent_req *req,
 
 	tevent_req_set_cleanup_fn(req, NULL);
 
+	TALLOC_FREE(state->tevent_handle);
+
 	/*
 	 * Just set the [new_]waiters entry to NULL, be careful not to mess
 	 * with the other "waiters" array contents. We are often called from
-- 
1.9.1.423.g4596e3a


From ce55c9be04bced0514bba2e5e975ef86f623593c Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 May 2014 08:49:04 +0200
Subject: [PATCH 08/12] torture3: Fix local-messaging-read1

Now that we defer requests in dispatch_rec, we need three rounds of
tevent_loop_once to finish the requests.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/torture/test_messaging_read.c | 9 ++++++---
 1 file changed, 6 insertions(+), 3 deletions(-)

diff --git a/source3/torture/test_messaging_read.c b/source3/torture/test_messaging_read.c
index 387ebfd..188b021 100644
--- a/source3/torture/test_messaging_read.c
+++ b/source3/torture/test_messaging_read.c
@@ -91,6 +91,7 @@ bool run_messaging_read1(int dummy)
 	unsigned count2 = 0;
 	NTSTATUS status;
 	bool retval = false;
+	int i;
 
 	ev = samba_tevent_context_init(talloc_tos());
 	if (ev == NULL) {
@@ -121,9 +122,11 @@ bool run_messaging_read1(int dummy)
 		goto fail;
 	}
 
-	if (tevent_loop_once(ev) != 0) {
-		fprintf(stderr, "tevent_loop_once failed\n");
-		goto fail;
+	for (i=0; i<3; i++) {
+		if (tevent_loop_once(ev) != 0) {
+			fprintf(stderr, "tevent_loop_once failed\n");
+			goto fail;
+		}
 	}
 
 	printf("%u/%u\n", count1, count2);
-- 
1.9.1.423.g4596e3a


From 95ff0175b7a41e3935b8b1865a79d9d05e6b7d8d Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 May 2014 09:44:57 +0200
Subject: [PATCH 09/12] messaging3: Push down the self-send callback

In the messaging_read receivers we already defer the callback: we may need
to reply on a potentially different tevent context, thus the defer_callback.

The callback case in messaging_dispatch_rec was direct before this
patch. This changes messaging_dispatch_rec to also defer the callback
in the self-send case.

Now we need only two roundtrips in local-messaging-read1 :-)
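
"Defer the callback" here means the usual tevent_immediate trick; a
minimal sketch of the idea (my_state and the my_* names are hypothetical,
the real code is messaging_defer_callback below):

struct my_state {
	void (*fn)(void *fn_private);
	void *fn_private;
};

static void my_trigger(struct tevent_context *ev,
		       struct tevent_immediate *im, void *private_data)
{
	struct my_state *state = private_data;

	/* Runs from the next tevent_loop_once, not from the sender's stack. */
	state->fn(state->fn_private);
	TALLOC_FREE(state);
}

static bool my_defer(struct tevent_context *ev, struct my_state *state)
{
	struct tevent_immediate *im = tevent_create_immediate(state);

	if (im == NULL) {
		return false;
	}
	tevent_schedule_immediate(im, ev, my_trigger, state);
	return true;
}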

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/lib/messages.c                | 146 ++++++++++++++++++++++------------
 source3/torture/test_messaging_read.c |   2 +-
 2 files changed, 95 insertions(+), 53 deletions(-)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 50c79a1..1c62809 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -341,15 +341,6 @@ void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
 	}
 }
 
-struct messaging_selfsend_state {
-	struct messaging_context *msg;
-	struct messaging_rec rec;
-};
-
-static void messaging_trigger_self(struct tevent_context *ev,
-				   struct tevent_immediate *im,
-				   void *private_data);
-
 /*
   Send a message to a particular server
 */
@@ -368,33 +359,13 @@ NTSTATUS messaging_send(struct messaging_context *msg_ctx,
 	}
 
 	if (server_id_equal(&msg_ctx->id, &server)) {
-		struct messaging_selfsend_state *state;
-		struct tevent_immediate *im;
-
-		state = talloc_pooled_object(
-			msg_ctx, struct messaging_selfsend_state,
-			1, data->length);
-		if (state == NULL) {
-			return NT_STATUS_NO_MEMORY;
-		}
-		state->msg = msg_ctx;
-		state->rec.msg_version = MESSAGE_VERSION;
-		state->rec.msg_type = msg_type & MSG_TYPE_MASK;
-		state->rec.dest = server;
-		state->rec.src = msg_ctx->id;
-
-		/* Can't fail, it's a pooled_object */
-		state->rec.buf = data_blob_talloc(
-			state, data->data, data->length);
-
-		im = tevent_create_immediate(state);
-		if (im == NULL) {
-			TALLOC_FREE(state);
-			return NT_STATUS_NO_MEMORY;
-		}
-
-		tevent_schedule_immediate(im, msg_ctx->event_ctx,
-					  messaging_trigger_self, state);
+		struct messaging_rec rec;
+		rec.msg_version = MESSAGE_VERSION;
+		rec.msg_type = msg_type & MSG_TYPE_MASK;
+		rec.dest = server;
+		rec.src = msg_ctx->id;
+		rec.buf = *data;
+		messaging_dispatch_rec(msg_ctx, &rec);
 		return NT_STATUS_OK;
 	}
 
@@ -402,16 +373,6 @@ NTSTATUS messaging_send(struct messaging_context *msg_ctx,
 				       msg_ctx->local);
 }
 
-static void messaging_trigger_self(struct tevent_context *ev,
-				   struct tevent_immediate *im,
-				   void *private_data)
-{
-	struct messaging_selfsend_state *state = talloc_get_type_abort(
-		private_data, struct messaging_selfsend_state);
-	messaging_dispatch_rec(state->msg, &state->rec);
-	TALLOC_FREE(state);
-}
-
 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
 			    struct server_id server, uint32_t msg_type,
 			    const uint8_t *buf, size_t len)
@@ -697,6 +658,68 @@ static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
 	return true;
 }
 
+struct messaging_defer_callback_state {
+	struct messaging_context *msg_ctx;
+	struct messaging_rec *rec;
+	void (*fn)(struct messaging_context *msg, void *private_data,
+		   uint32_t msg_type, struct server_id server_id,
+		   DATA_BLOB *data);
+	void *private_data;
+};
+
+static void messaging_defer_callback_trigger(struct tevent_context *ev,
+					     struct tevent_immediate *im,
+					     void *private_data);
+
+static void messaging_defer_callback(
+	struct messaging_context *msg_ctx, struct messaging_rec *rec,
+	void (*fn)(struct messaging_context *msg, void *private_data,
+		   uint32_t msg_type, struct server_id server_id,
+		   DATA_BLOB *data),
+	void *private_data)
+{
+	struct messaging_defer_callback_state *state;
+	struct tevent_immediate *im;
+
+	state = talloc(msg_ctx, struct messaging_defer_callback_state);
+	if (state == NULL) {
+		DEBUG(1, ("talloc failed\n"));
+		return;
+	}
+	state->msg_ctx = msg_ctx;
+	state->fn = fn;
+	state->private_data = private_data;
+
+	state->rec = messaging_rec_dup(state, rec);
+	if (state->rec == NULL) {
+		DEBUG(1, ("talloc failed\n"));
+		TALLOC_FREE(state);
+		return;
+	}
+
+	im = tevent_create_immediate(state);
+	if (im == NULL) {
+		DEBUG(1, ("tevent_create_immediate failed\n"));
+		TALLOC_FREE(state);
+		return;
+	}
+	tevent_schedule_immediate(im, msg_ctx->event_ctx,
+				  messaging_defer_callback_trigger, state);
+}
+
+static void messaging_defer_callback_trigger(struct tevent_context *ev,
+					     struct tevent_immediate *im,
+					     void *private_data)
+{
+	struct messaging_defer_callback_state *state = talloc_get_type_abort(
+		private_data, struct messaging_defer_callback_state);
+	struct messaging_rec *rec = state->rec;
+
+	state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
+		  &rec->buf);
+	TALLOC_FREE(state);
+}
+
 /*
   Dispatch one messaging_rec
 */
@@ -708,15 +731,34 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 
 	for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
 		next = cb->next;
-		if (cb->msg_type == rec->msg_type) {
+		if (cb->msg_type != rec->msg_type) {
+			continue;
+		}
+
+		if (server_id_equal(&msg_ctx->id, &rec->dest)) {
+			/*
+			 * This is a self-send. We are called here from
+			 * messaging_send(), and we don't want to directly
+			 * recurse into the callback but go via a
+			 * tevent_loop_once
+			 */
+			messaging_defer_callback(msg_ctx, rec, cb->fn,
+						 cb->private_data);
+		} else {
+			/*
+			 * This comes from a different process. we are called
+			 * from the event loop, so we should call back
+			 * directly.
+			 */
 			cb->fn(msg_ctx, cb->private_data, rec->msg_type,
 			       rec->src, &rec->buf);
-			/* we continue looking for matching messages
-			   after finding one. This matters for
-			   subsystems like the internal notify code
-			   which register more than one handler for
-			   the same message type */
 		}
+		/*
+		 * we continue looking for matching messages after finding
+		 * one. This matters for subsystems like the internal notify
+		 * code which register more than one handler for the same
+		 * message type
+		 */
 	}
 
 	if (!messaging_append_new_waiters(msg_ctx)) {
diff --git a/source3/torture/test_messaging_read.c b/source3/torture/test_messaging_read.c
index 188b021..0bb3128 100644
--- a/source3/torture/test_messaging_read.c
+++ b/source3/torture/test_messaging_read.c
@@ -122,7 +122,7 @@ bool run_messaging_read1(int dummy)
 		goto fail;
 	}
 
-	for (i=0; i<3; i++) {
+	for (i=0; i<2; i++) {
 		if (tevent_loop_once(ev) != 0) {
 			fprintf(stderr, "tevent_loop_once failed\n");
 			goto fail;
-- 
1.9.1.423.g4596e3a


From 6ab5f6b2324cde6626339bb2af8ce9d2b19086ff Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 May 2014 09:50:27 +0200
Subject: [PATCH 10/12] messaging3: Factor out the self-send check

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/lib/messages.c | 10 ++++++++--
 1 file changed, 8 insertions(+), 2 deletions(-)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 1c62809..e722b2a 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -341,6 +341,12 @@ void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
 	}
 }
 
+static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
+				   const struct server_id *dst)
+{
+	return server_id_equal(&msg_ctx->id, dst);
+}
+
 /*
   Send a message to a particular server
 */
@@ -358,7 +364,7 @@ NTSTATUS messaging_send(struct messaging_context *msg_ctx,
 						msg_ctx->remote);
 	}
 
-	if (server_id_equal(&msg_ctx->id, &server)) {
+	if (messaging_is_self_send(msg_ctx, &server)) {
 		struct messaging_rec rec;
 		rec.msg_version = MESSAGE_VERSION;
 		rec.msg_type = msg_type & MSG_TYPE_MASK;
@@ -735,7 +741,7 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 			continue;
 		}
 
-		if (server_id_equal(&msg_ctx->id, &rec->dest)) {
+		if (messaging_is_self_send(msg_ctx, &rec->dest)) {
 			/*
 			 * This is a self-send. We are called here from
 			 * messaging_send(), and we don't want to directly
-- 
1.9.1.423.g4596e3a


From d14aa6c2177eb3284a7a5da840d03e7938ca10a4 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 May 2014 09:51:59 +0200
Subject: [PATCH 11/12] messaging3: Relax the self-send check a bit

In the future we will have multiple task ids per process. They should all be
able to benefit from the self-send local optimization.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/lib/messages.c | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index e722b2a..6778080 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -344,7 +344,8 @@ void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
 static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
 				   const struct server_id *dst)
 {
-	return server_id_equal(&msg_ctx->id, dst);
+	return ((msg_ctx->id.vnn == dst->vnn) &&
+		(msg_ctx->id.pid == dst->pid));
 }
 
 /*
-- 
1.9.1.423.g4596e3a


From eb9fb176faf7eebd66077566d754fce0af477faa Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Wed, 7 May 2014 11:21:04 +0200
Subject: [PATCH 12/12] torture3: local-messaging-read3

This is a test case for RPC-style messaging over a nested event context. We
have to fork here: the nested event context does not reply to the PING
message; it only listens for the PONG response. But that's the point of
these patches: correctly pick up just one message in a nested event context.
I think this is the best we can do with nested event contexts.

Signed-off-by: Volker Lendecke <vl at samba.org>
Reviewed-by: Stefan Metzmacher <metze at samba.org>
Reviewed-by: Jeremy Allison <jra at samba.org>
---
 source3/selftest/tests.py             |   1 +
 source3/torture/proto.h               |   1 +
 source3/torture/test_messaging_read.c | 200 ++++++++++++++++++++++++++++++++++
 source3/torture/torture.c             |   1 +
 4 files changed, 203 insertions(+)

diff --git a/source3/selftest/tests.py b/source3/selftest/tests.py
index fcf3cd5..bad66d3 100755
--- a/source3/selftest/tests.py
+++ b/source3/selftest/tests.py
@@ -104,6 +104,7 @@ local_tests = [
     "LOCAL-IDMAP-TDB-COMMON",
     "LOCAL-MESSAGING-READ1",
     "LOCAL-MESSAGING-READ2",
+    "LOCAL-MESSAGING-READ3",
     "LOCAL-hex_encode_buf",
     "LOCAL-sprintf_append",
     "LOCAL-remove_duplicate_addrs2"]
diff --git a/source3/torture/proto.h b/source3/torture/proto.h
index a737ea4..20c1110 100644
--- a/source3/torture/proto.h
+++ b/source3/torture/proto.h
@@ -115,5 +115,6 @@ bool run_qpathinfo_bufsize(int dummy);
 bool run_bench_pthreadpool(int dummy);
 bool run_messaging_read1(int dummy);
 bool run_messaging_read2(int dummy);
+bool run_messaging_read3(int dummy);
 
 #endif /* __TORTURE_H__ */
diff --git a/source3/torture/test_messaging_read.c b/source3/torture/test_messaging_read.c
index 0bb3128..757b83e 100644
--- a/source3/torture/test_messaging_read.c
+++ b/source3/torture/test_messaging_read.c
@@ -248,3 +248,203 @@ fail:
 	TALLOC_FREE(ev);
 	return retval;
 }
+
+struct msg_pingpong_state {
+	uint8_t dummy;
+};
+
+static void msg_pingpong_done(struct tevent_req *subreq);
+
+static struct tevent_req *msg_pingpong_send(TALLOC_CTX *mem_ctx,
+					    struct tevent_context *ev,
+					    struct messaging_context *msg_ctx,
+					    struct server_id dst)
+{
+	struct tevent_req *req, *subreq;
+	struct msg_pingpong_state *state;
+	NTSTATUS status;
+
+	req = tevent_req_create(mem_ctx, &state, struct msg_pingpong_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	status = messaging_send_buf(msg_ctx, dst, MSG_PING, NULL, 0);
+	if (!NT_STATUS_IS_OK(status)) {
+		tevent_req_error(req, map_errno_from_nt_status(status));
+		return tevent_req_post(req, ev);
+	}
+
+	subreq = messaging_read_send(state, ev, msg_ctx, MSG_PONG);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, msg_pingpong_done, req);
+	return req;
+}
+
+static void msg_pingpong_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	int ret;
+
+	ret = messaging_read_recv(subreq, NULL, NULL);
+	TALLOC_FREE(subreq);
+	if (ret != 0) {
+		tevent_req_error(req, ret);
+		return;
+	}
+	tevent_req_done(req);
+}
+
+static int msg_pingpong_recv(struct tevent_req *req)
+{
+	int err;
+
+	if (tevent_req_is_unix_error(req, &err)) {
+		return err;
+	}
+	return 0;
+}
+
+static int msg_pingpong(struct messaging_context *msg_ctx,
+			struct server_id dst)
+{
+	struct tevent_context *ev;
+	struct tevent_req *req;
+	int ret = ENOMEM;
+
+	ev = tevent_context_init(msg_ctx);
+	if (ev == NULL) {
+		goto fail;
+	}
+	req = msg_pingpong_send(ev, ev, msg_ctx, dst);
+	if (req == NULL) {
+		goto fail;
+	}
+	if (!tevent_req_poll(req, ev)) {
+		ret = errno;
+		goto fail;
+	}
+	ret = msg_pingpong_recv(req);
+fail:
+	TALLOC_FREE(ev);
+	return ret;
+}
+
+static void ping_responder_exit(struct tevent_context *ev,
+				struct tevent_fd *fde,
+				uint16_t flags,
+				void *private_data)
+{
+	bool *done = private_data;
+	*done = true;
+}
+
+static void ping_responder(int ready_pipe, int exit_pipe)
+{
+	struct tevent_context *ev;
+	struct messaging_context *msg_ctx;
+	struct tevent_fd *exit_handler;
+	char c = 0;
+	bool done = false;
+
+	ev = samba_tevent_context_init(talloc_tos());
+	if (ev == NULL) {
+		fprintf(stderr, "child tevent_context_init failed\n");
+		exit(1);
+	}
+	msg_ctx = messaging_init(ev, ev);
+	if (msg_ctx == NULL) {
+		fprintf(stderr, "child messaging_init failed\n");
+		exit(1);
+	}
+	exit_handler = tevent_add_fd(ev, ev, exit_pipe, TEVENT_FD_READ,
+				     ping_responder_exit, &done);
+	if (exit_handler == NULL) {
+		fprintf(stderr, "child tevent_add_fd failed\n");
+		exit(1);
+	}
+
+	if (write(ready_pipe, &c, 1) != 1) {
+		fprintf(stderr, "child messaging_init failed\n");
+		exit(1);
+	}
+
+	while (!done) {
+		int ret;
+		ret = tevent_loop_once(ev);
+		if (ret != 0) {
+			fprintf(stderr, "child tevent_loop_once failed\n");
+			exit(1);
+		}
+	}
+
+	TALLOC_FREE(msg_ctx);
+	TALLOC_FREE(ev);
+}
+
+bool run_messaging_read3(int dummy)
+{
+	struct tevent_context *ev = NULL;
+	struct messaging_context *msg_ctx = NULL;
+	bool retval = false;
+	pid_t child;
+	int ready_pipe[2];
+	int exit_pipe[2];
+	int ret;
+	char c;
+	struct server_id dst;
+
+	if ((pipe(ready_pipe) != 0) || (pipe(exit_pipe) != 0)) {
+		perror("pipe failed");
+		return false;
+	}
+
+	child = fork();
+	if (child == -1) {
+		perror("fork failed");
+		return false;
+	}
+
+	if (child == 0) {
+		close(ready_pipe[0]);
+		close(exit_pipe[1]);
+		ping_responder(ready_pipe[1], exit_pipe[0]);
+		exit(0);
+	}
+	close(ready_pipe[1]);
+	close(exit_pipe[0]);
+
+	if (read(ready_pipe[0], &c, 1) != 1) {
+		perror("read failed");
+		return false;
+	}
+
+	ev = samba_tevent_context_init(talloc_tos());
+	if (ev == NULL) {
+		fprintf(stderr, "tevent_context_init failed\n");
+		goto fail;
+	}
+	msg_ctx = messaging_init(ev, ev);
+	if (msg_ctx == NULL) {
+		fprintf(stderr, "messaging_init failed\n");
+		goto fail;
+	}
+
+	dst = messaging_server_id(msg_ctx);
+	dst.pid = child;
+
+	ret = msg_pingpong(msg_ctx, dst);
+	if (ret != 0){
+		fprintf(stderr, "msg_pingpong failed\n");
+		goto fail;
+	}
+
+	retval = true;
+fail:
+	TALLOC_FREE(msg_ctx);
+	TALLOC_FREE(ev);
+	return retval;
+}
diff --git a/source3/torture/torture.c b/source3/torture/torture.c
index f97119a..0826506 100644
--- a/source3/torture/torture.c
+++ b/source3/torture/torture.c
@@ -9576,6 +9576,7 @@ static struct {
 	{ "LOCAL-DBWRAP-WATCH1", run_dbwrap_watch1, 0 },
 	{ "LOCAL-MESSAGING-READ1", run_messaging_read1, 0 },
 	{ "LOCAL-MESSAGING-READ2", run_messaging_read2, 0 },
+	{ "LOCAL-MESSAGING-READ3", run_messaging_read3, 0 },
 	{ "LOCAL-BASE64", run_local_base64, 0},
 	{ "LOCAL-RBTREE", run_local_rbtree, 0},
 	{ "LOCAL-MEMCACHE", run_local_memcache, 0},
-- 
1.9.1.423.g4596e3a


