[SCM] Samba Shared Repository - branch master updated

Volker Lendecke vlendec at samba.org
Thu May 8 03:33:03 MDT 2014


The branch, master has been updated
       via  50a66d5 torture3: local-messaging-read3
       via  8763c8c messaging3: Relax the self-send check a bit
       via  9988e62 messaging3: Factor out the self-send check
       via  c0f6ab9 messaging3: Push down the self-send callback
       via  80365e0 torture3: Fix local-messaging-read1
       via  e750e2b messaging3: Fix messaging_filtered_read_send
       via  a6e49f9 messaging3: Add messaging_dgm_register_tevent_context
       via  dca572f lib: Enhance poll_funcs_tevent for multiple tevent_contexts
       via  5601576 messaging3: Add comments about not touching "waiters"
       via  8e71945 messaging3: Fix 80-char line limit
       via  8d65512 dbwrap: Use messaging_filtered_read
       via  7a266c5 messaging3: Add messaging_filtered_read
      from  06c25eb wbclient: ensure response struct is initialized

http://gitweb.samba.org/?p=samba.git;a=shortlog;h=master


- Log -----------------------------------------------------------------
commit 50a66d588a13888a594b785c700695771bf6224d
Author: Volker Lendecke <vl at samba.org>
Date:   Wed May 7 11:21:04 2014 +0200

    torture3: local-messaging-read3
    
    This is a test case for the RPC-style use of messaging with a nested event
    context. We have to fork here: the nested event context does not reply to the
    PING message; it only listens for the PONG response. But that is the point of
    these patches: correctly pick just one message in a nested event context. I
    think this is the best we can do with nested event contexts.
    
    Signed-off-by: Volker Lendecke <vl at samba.org>
    Reviewed-by: Stefan Metzmacher <metze at samba.org>
    Reviewed-by: Jeremy Allison <jra at samba.org>
    
    Autobuild-User(master): Volker Lendecke <vl at samba.org>
    Autobuild-Date(master): Thu May  8 11:32:44 CEST 2014 on sn-devel-104
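
    [A rough sketch of the child side of such a test, assuming the MSG_PING and
    MSG_PONG message types from messages.h; the helper names are invented for
    illustration, the real code lives in source3/torture/test_messaging_read.c.]

static void ping_responder(struct messaging_context *msg, void *private_data,
                           uint32_t msg_type, struct server_id src,
                           DATA_BLOB *data)
{
        /* the forked child answers every PING with a PONG */
        messaging_send(msg, src, MSG_PONG, data);
}

static void child_loop(struct tevent_context *ev,
                       struct messaging_context *msg_ctx)
{
        messaging_register(msg_ctx, NULL, MSG_PING, ping_responder);
        while (tevent_loop_once(ev) == 0) {
                /* keep dispatching until the parent is done and kills us */
        }
}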

commit 8763c8c19625a974b040987e6fd73aea0434730a
Author: Volker Lendecke <vl at samba.org>
Date:   Wed May 7 09:51:59 2014 +0200

    messaging3: Relax the self-send check a bit
    
    In the future we will have multiple task IDs per process. They should all be
    able to benefit from the self-send local optimization.
    
    Signed-off-by: Volker Lendecke <vl at samba.org>
    Reviewed-by: Stefan Metzmacher <metze at samba.org>
    Reviewed-by: Jeremy Allison <jra at samba.org>

commit 9988e625524a2f79a884d02ae02924bed9c2576d
Author: Volker Lendecke <vl at samba.org>
Date:   Wed May 7 09:50:27 2014 +0200

    messaging3: Factor out the self-send check
    
    Signed-off-by: Volker Lendecke <vl at samba.org>
    Reviewed-by: Stefan Metzmacher <metze at samba.org>
    Reviewed-by: Jeremy Allison <jra at samba.org>

commit c0f6ab92f7516a0b3f0034a1ee25a45c4f088ec3
Author: Volker Lendecke <vl at samba.org>
Date:   Wed May 7 09:44:57 2014 +0200

    messaging3: Push down the self-send callback
    
    In the messaging_read receivers we already defer the callback: we need to
    reply on potentially different tevent contexts, hence the defer_callback.
    
    The callback case in messaging_dispatch_rec was direct before this
    patch. This changes messaging_dispatch_rec to also defer the callback
    in the self-send case.
    
    Now we need only two roundtrips in local-messaging-read1 :-)
    
    Signed-off-by: Volker Lendecke <vl at samba.org>
    Reviewed-by: Stefan Metzmacher <metze at samba.org>
    Reviewed-by: Jeremy Allison <jra at samba.org>
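
    [As an illustration of the deferral described above -- a sketch under
    assumptions, not code from the patch: a message sent to our own server_id no
    longer runs the handler from inside messaging_send(); it is only delivered
    once the event loop runs again. This assumes ev is the tevent context the
    messaging_context was set up with.]

static void self_ping_handler(struct messaging_context *msg,
                              void *private_data, uint32_t msg_type,
                              struct server_id src, DATA_BLOB *data)
{
        printf("ping delivered\n");
}

static void self_send_example(struct tevent_context *ev,
                              struct messaging_context *msg_ctx)
{
        DATA_BLOB blob = data_blob_string_const("hello");

        messaging_register(msg_ctx, NULL, MSG_PING, self_ping_handler);
        messaging_send(msg_ctx, messaging_server_id(msg_ctx), MSG_PING, &blob);

        /*
         * The handler has not run yet: the self-send was deferred via a
         * tevent immediate instead of recursing into the callback.
         */
        tevent_loop_once(ev);   /* now self_ping_handler fires */
}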

commit 80365e030d63f2d9708748149fb329467a284039
Author: Volker Lendecke <vl at samba.org>
Date:   Wed May 7 08:49:04 2014 +0200

    torture3: Fix local-messaging-read1
    
    Now that we defer requests in dispatch_rec, we need three rounds to finish
    the requests.
    
    Signed-off-by: Volker Lendecke <vl at samba.org>
    Reviewed-by: Stefan Metzmacher <metze at samba.org>
    Reviewed-by: Jeremy Allison <jra at samba.org>

commit e750e2b1eaf4b98a2870889cc100a150a1b09966
Author: Volker Lendecke <vl at samba.org>
Date:   Tue May 6 09:39:01 2014 +0200

    messaging3: Fix messaging_filtered_read_send
    
    If we register an additional tevent context, we can now properly do
    nested event contexts, listening for just one message type inside a
    tevent_req_poll.
    
    At this point this only enhances things without ctdb, but I'm working on
    fixing that soon.
    
    Signed-off-by: Volker Lendecke <vl at samba.org>
    Reviewed-by: Stefan Metzmacher <metze at samba.org>
    Reviewed-by: Jeremy Allison <jra at samba.org>
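
    [A hedged sketch of the pattern this enables, built only from the
    messaging_filtered_read_send/recv prototypes added in this series;
    wait_for_pong() and its error handling are illustrative, not part of the
    patch.]

static bool pong_filter(struct messaging_rec *rec, void *private_data)
{
        return rec->msg_type == MSG_PONG;
}

static int wait_for_pong(struct messaging_context *msg_ctx)
{
        struct tevent_context *ev;
        struct tevent_req *req;
        struct messaging_rec *rec = NULL;
        int ret = ENOMEM;

        /* throw-away nested event context, used only for this wait */
        ev = tevent_context_init(talloc_tos());
        if (ev == NULL) {
                return ret;
        }
        req = messaging_filtered_read_send(ev, ev, msg_ctx, pong_filter, NULL);
        if (req == NULL) {
                goto done;
        }
        if (!tevent_req_poll(req, ev)) {
                ret = errno;
                goto done;
        }
        ret = messaging_filtered_read_recv(req, ev, &rec);
done:
        /* rec (if any) is freed together with ev */
        TALLOC_FREE(ev);
        return ret;
}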

commit a6e49f9cda6ae6c7ded534aca756b2387bdc8978
Author: Volker Lendecke <vl at samba.org>
Date:   Tue May 6 09:11:17 2014 +0200

    messaging3: Add messaging_dgm_register_tevent_context
    
    Signed-off-by: Volker Lendecke <vl at samba.org>
    Reviewed-by: Stefan Metzmacher <metze at samba.org>
    Reviewed-by: Jeremy Allison <jra at samba.org>
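
    [A brief usage sketch, inferred from the prototype and from how
    messaging_filtered_read_send() uses it later in this changeset; the names
    here are illustrative. The returned talloc object represents the
    registration, and freeing it deregisters the extra event context again.]

static void *register_second_context(TALLOC_CTX *mem_ctx,
                                     struct messaging_context *msg_ctx,
                                     struct tevent_context *ev2)
{
        void *handle;

        /* let a second tevent context receive messaging datagrams */
        handle = messaging_dgm_register_tevent_context(mem_ctx, msg_ctx, ev2);
        if (handle == NULL) {
                return NULL;
        }
        /* later: TALLOC_FREE(handle) drops the registration again */
        return handle;
}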

commit dca572ff1ce1559a2254d9ba46d4f86d48c38c21
Author: Volker Lendecke <vl at samba.org>
Date:   Mon May 5 08:45:52 2014 +0200

    lib: Enhance poll_funcs_tevent for multiple tevent_contexts
    
    With this patch it becomes possible to use nested event contexts with
    messaging_filtered_read_send/recv. Before this patchset, only the one event
    context that a messaging_context was initialized with could receive datagrams
    from the unix domain socket. So if you wanted to code a synchronous RPC-like
    operation using a nested event context, you would never see the reply, because
    the nested event context did not have the required tevent_fds.

    Unfortunately, this patchset has to add some advanced array voodoo. The idea
    is that state->watches[] contains what we hand out with watch_new, and
    state->contexts[] holds references to the registered tevent_contexts. For
    every watch we need a tevent_fd in every event context, and the routines make
    sure that the arrays are kept consistent.
    
    Signed-off-by: Volker Lendecke <vl at samba.org>
    Reviewed-by: Stefan Metzmacher <metze at samba.org>
    Reviewed-by: Jeremy Allison <jra at samba.org>
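
    [The bookkeeping described above, reduced to a hypothetical illustration --
    the names and layout are invented here and are not the actual
    poll_funcs_tevent.c code. It shows the invariant: one tevent_fd per
    (watch, context) pair, filled in whenever either array grows. w->fdes is
    assumed to be pre-grown to num_contexts entries.]

struct demo_watch {
        int fd;
        struct tevent_fd **fdes;        /* one slot per registered context */
};

struct demo_state {
        struct demo_watch **watches;    /* what watch_new hands out */
        struct tevent_context **contexts;
        size_t num_watches;
        size_t num_contexts;
};

static bool demo_fill_fdes(struct demo_state *state,
                           tevent_fd_handler_t handler)
{
        size_t i, j;

        for (i = 0; i < state->num_watches; i++) {
                struct demo_watch *w = state->watches[i];

                for (j = 0; j < state->num_contexts; j++) {
                        if (w->fdes[j] != NULL) {
                                continue;
                        }
                        w->fdes[j] = tevent_add_fd(state->contexts[j], w,
                                                   w->fd, TEVENT_FD_READ,
                                                   handler, w);
                        if (w->fdes[j] == NULL) {
                                return false;
                        }
                }
        }
        return true;
}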

commit 5601576d9d182ca1741da6db5eb7cae405333329
Author: Volker Lendecke <vl at samba.org>
Date:   Fri May 2 09:20:40 2014 +0000

    messaging3: Add comments about not touching "waiters"
    
    Signed-off-by: Volker Lendecke <vl at samba.org>
    Reviewed-by: Stefan Metzmacher <metze at samba.org>
    Reviewed-by: Jeremy Allison <jra at samba.org>

commit 8e719456917b3e862b4f13f759d6e6e6646055bd
Author: Volker Lendecke <vl at samba.org>
Date:   Fri May 2 09:12:52 2014 +0000

    messaging3: Fix 80-char line limit
    
    Signed-off-by: Volker Lendecke <vl at samba.org>
    Reviewed-by: Stefan Metzmacher <metze at samba.org>
    Reviewed-by: Jeremy Allison <jra at samba.org>

commit 8d65512e69441d4756ba49b33a235043035ba1bd
Author: Volker Lendecke <vl at samba.org>
Date:   Thu Apr 24 09:23:48 2014 +0000

    dbwrap: Use messaging_filtered_read
    
    This does not really save any lines of code, but IMHO the code is simpler
    this way. Also, in case we have lots of watchers this will be slightly
    cheaper, because we don't have to re-establish a tevent_req.
    
    Signed-off-by: Volker Lendecke <vl at samba.org>
    Reviewed-by: Stefan Metzmacher <metze at samba.org>
    Reviewed-by: Jeremy Allison <jra at samba.org>

commit 7a266c575af9fa31583c2bd64f79e3b66fd30815
Author: Volker Lendecke <vl at samba.org>
Date:   Thu Apr 24 09:05:53 2014 +0000

    messaging3: Add messaging_filtered_read
    
    This delegates the decision of whether to read a message to a callback.
    
    Signed-off-by: Volker Lendecke <vl at samba.org>
    Reviewed-by: Stefan Metzmacher <metze at samba.org>
    Reviewed-by: Jeremy Allison <jra at samba.org>
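
    [For illustration, a filter that accepts only a PONG from one specific peer;
    the struct and helper names are invented for this sketch, the filter
    prototype is the one added above.]

struct pong_from_state {
        struct server_id peer;
};

static bool pong_from_filter(struct messaging_rec *rec, void *private_data)
{
        struct pong_from_state *state = talloc_get_type_abort(
                private_data, struct pong_from_state);

        /* accept only MSG_PONG, and only from the expected sender */
        return (rec->msg_type == MSG_PONG) &&
                server_id_equal(&rec->src, &state->peer);
}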

-----------------------------------------------------------------------

Summary of changes:
 source3/include/messages.h                 |   11 +
 source3/lib/dbwrap/dbwrap_watch.c          |   42 ++--
 source3/lib/messages.c                     |  326 +++++++++++++++++++-------
 source3/lib/messages_dgm.c                 |   27 ++-
 source3/lib/poll_funcs/poll_funcs_tevent.c |  356 ++++++++++++++++++++++++++--
 source3/lib/poll_funcs/poll_funcs_tevent.h |   15 +-
 source3/lib/unix_msg/test_drain.c          |   11 +-
 source3/lib/unix_msg/test_source.c         |   16 +-
 source3/lib/unix_msg/tests.c               |   23 ++-
 source3/selftest/tests.py                  |    1 +
 source3/torture/proto.h                    |    1 +
 source3/torture/test_messaging_read.c      |  209 ++++++++++++++++-
 source3/torture/torture.c                  |    1 +
 13 files changed, 892 insertions(+), 147 deletions(-)


Changeset truncated at 500 lines:

diff --git a/source3/include/messages.h b/source3/include/messages.h
index 06c1748..852e8a1 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -99,6 +99,9 @@ NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
 			    struct messaging_backend **presult);
 NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid);
 NTSTATUS messaging_dgm_wipe(struct messaging_context *msg_ctx);
+void *messaging_dgm_register_tevent_context(TALLOC_CTX *mem_ctx,
+					    struct messaging_context *msg_ctx,
+					    struct tevent_context *ev);
 
 NTSTATUS messaging_ctdbd_init(struct messaging_context *msg_ctx,
 			      TALLOC_CTX *mem_ctx,
@@ -142,6 +145,14 @@ NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 			    struct messaging_rec *rec);
 
+struct tevent_req *messaging_filtered_read_send(
+	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+	struct messaging_context *msg_ctx,
+	bool (*filter)(struct messaging_rec *rec, void *private_data),
+	void *private_data);
+int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+				 struct messaging_rec **presult);
+
 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
 				       struct tevent_context *ev,
 				       struct messaging_context *msg,
diff --git a/source3/lib/dbwrap/dbwrap_watch.c b/source3/lib/dbwrap/dbwrap_watch.c
index 4f3a2b3..a5f1ebd 100644
--- a/source3/lib/dbwrap/dbwrap_watch.c
+++ b/source3/lib/dbwrap/dbwrap_watch.c
@@ -235,6 +235,8 @@ struct dbwrap_record_watch_state {
 	TDB_DATA w_key;
 };
 
+static bool dbwrap_record_watch_filter(struct messaging_rec *rec,
+				       void *private_data);
 static void dbwrap_record_watch_done(struct tevent_req *subreq);
 static int dbwrap_record_watch_state_destructor(
 	struct dbwrap_record_watch_state *state);
@@ -271,8 +273,8 @@ struct tevent_req *dbwrap_record_watch_send(TALLOC_CTX *mem_ctx,
 		return tevent_req_post(req, ev);
 	}
 
-	subreq = messaging_read_send(state, ev, state->msg,
-				     MSG_DBWRAP_MODIFIED);
+	subreq = messaging_filtered_read_send(
+		state, ev, state->msg, dbwrap_record_watch_filter, state);
 	if (tevent_req_nomem(subreq, req)) {
 		return tevent_req_post(req, ev);
 	}
@@ -288,6 +290,21 @@ struct tevent_req *dbwrap_record_watch_send(TALLOC_CTX *mem_ctx,
 	return req;
 }
 
+static bool dbwrap_record_watch_filter(struct messaging_rec *rec,
+				       void *private_data)
+{
+	struct dbwrap_record_watch_state *state = talloc_get_type_abort(
+		private_data, struct dbwrap_record_watch_state);
+
+	if (rec->msg_type != MSG_DBWRAP_MODIFIED) {
+		return false;
+	}
+	if (rec->buf.length != state->w_key.dsize) {
+		return false;
+	}
+	return memcmp(rec->buf.data, state->w_key.dptr,	rec->buf.length) == 0;
+}
+
 static int dbwrap_record_watch_state_destructor(
 	struct dbwrap_record_watch_state *s)
 {
@@ -351,33 +368,16 @@ static void dbwrap_record_watch_done(struct tevent_req *subreq)
 {
 	struct tevent_req *req = tevent_req_callback_data(
 		subreq, struct tevent_req);
-	struct dbwrap_record_watch_state *state = tevent_req_data(
-		req, struct dbwrap_record_watch_state);
 	struct messaging_rec *rec;
 	int ret;
 
-	ret = messaging_read_recv(subreq, talloc_tos(), &rec);
+	ret = messaging_filtered_read_recv(subreq, talloc_tos(), &rec);
 	TALLOC_FREE(subreq);
 	if (ret != 0) {
 		tevent_req_nterror(req, map_nt_error_from_unix(ret));
 		return;
 	}
-
-	if ((rec->buf.length == state->w_key.dsize) &&
-	    (memcmp(rec->buf.data, state->w_key.dptr, rec->buf.length) == 0)) {
-		tevent_req_done(req);
-		return;
-	}
-
-	/*
-	 * Not our record, wait for the next one
-	 */
-	subreq = messaging_read_send(state, state->ev, state->msg,
-				     MSG_DBWRAP_MODIFIED);
-	if (tevent_req_nomem(subreq, req)) {
-		return;
-	}
-	tevent_req_set_callback(subreq, dbwrap_record_watch_done, req);
+	tevent_req_done(req);
 }
 
 NTSTATUS dbwrap_record_watch_recv(struct tevent_req *req,
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 9284ac1..6778080 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -341,14 +341,12 @@ void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
 	}
 }
 
-struct messaging_selfsend_state {
-	struct messaging_context *msg;
-	struct messaging_rec rec;
-};
-
-static void messaging_trigger_self(struct tevent_context *ev,
-				   struct tevent_immediate *im,
-				   void *private_data);
+static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
+				   const struct server_id *dst)
+{
+	return ((msg_ctx->id.vnn == dst->vnn) &&
+		(msg_ctx->id.pid == dst->pid));
+}
 
 /*
   Send a message to a particular server
@@ -367,34 +365,14 @@ NTSTATUS messaging_send(struct messaging_context *msg_ctx,
 						msg_ctx->remote);
 	}
 
-	if (server_id_equal(&msg_ctx->id, &server)) {
-		struct messaging_selfsend_state *state;
-		struct tevent_immediate *im;
-
-		state = talloc_pooled_object(
-			msg_ctx, struct messaging_selfsend_state,
-			1, data->length);
-		if (state == NULL) {
-			return NT_STATUS_NO_MEMORY;
-		}
-		state->msg = msg_ctx;
-		state->rec.msg_version = MESSAGE_VERSION;
-		state->rec.msg_type = msg_type & MSG_TYPE_MASK;
-		state->rec.dest = server;
-		state->rec.src = msg_ctx->id;
-
-		/* Can't fail, it's a pooled_object */
-		state->rec.buf = data_blob_talloc(
-			state, data->data, data->length);
-
-		im = tevent_create_immediate(state);
-		if (im == NULL) {
-			TALLOC_FREE(state);
-			return NT_STATUS_NO_MEMORY;
-		}
-
-		tevent_schedule_immediate(im, msg_ctx->event_ctx,
-					  messaging_trigger_self, state);
+	if (messaging_is_self_send(msg_ctx, &server)) {
+		struct messaging_rec rec;
+		rec.msg_version = MESSAGE_VERSION;
+		rec.msg_type = msg_type & MSG_TYPE_MASK;
+		rec.dest = server;
+		rec.src = msg_ctx->id;
+		rec.buf = *data;
+		messaging_dispatch_rec(msg_ctx, &rec);
 		return NT_STATUS_OK;
 	}
 
@@ -402,16 +380,6 @@ NTSTATUS messaging_send(struct messaging_context *msg_ctx,
 				       msg_ctx->local);
 }
 
-static void messaging_trigger_self(struct tevent_context *ev,
-				   struct tevent_immediate *im,
-				   void *private_data)
-{
-	struct messaging_selfsend_state *state = talloc_get_type_abort(
-		private_data, struct messaging_selfsend_state);
-	messaging_dispatch_rec(state->msg, &state->rec);
-	TALLOC_FREE(state);
-}
-
 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
 			    struct server_id server, uint32_t msg_type,
 			    const uint8_t *buf, size_t len)
@@ -458,33 +426,59 @@ static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
 	return result;
 }
 
-struct messaging_read_state {
+struct messaging_filtered_read_state {
 	struct tevent_context *ev;
 	struct messaging_context *msg_ctx;
-	uint32_t msg_type;
+	void *tevent_handle;
+
+	bool (*filter)(struct messaging_rec *rec, void *private_data);
+	void *private_data;
+
 	struct messaging_rec *rec;
 };
 
-static void messaging_read_cleanup(struct tevent_req *req,
-				   enum tevent_req_state req_state);
+static void messaging_filtered_read_cleanup(struct tevent_req *req,
+					    enum tevent_req_state req_state);
 
-struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
-				       struct tevent_context *ev,
-				       struct messaging_context *msg_ctx,
-				       uint32_t msg_type)
+struct tevent_req *messaging_filtered_read_send(
+	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+	struct messaging_context *msg_ctx,
+	bool (*filter)(struct messaging_rec *rec, void *private_data),
+	void *private_data)
 {
 	struct tevent_req *req;
-	struct messaging_read_state *state;
+	struct messaging_filtered_read_state *state;
 	size_t new_waiters_len;
 
 	req = tevent_req_create(mem_ctx, &state,
-				struct messaging_read_state);
+				struct messaging_filtered_read_state);
 	if (req == NULL) {
 		return NULL;
 	}
 	state->ev = ev;
 	state->msg_ctx = msg_ctx;
-	state->msg_type = msg_type;
+	state->filter = filter;
+	state->private_data = private_data;
+
+	/*
+	 * We have to defer the callback here, as we might be called from
+	 * within a different tevent_context than state->ev
+	 */
+	tevent_req_defer_callback(req, state->ev);
+
+	state->tevent_handle = messaging_dgm_register_tevent_context(
+		state, msg_ctx, ev);
+	if (tevent_req_nomem(state, req)) {
+		return tevent_req_post(req, ev);
+	}
+
+	/*
+	 * We add ourselves to the "new_waiters" array, not the "waiters"
+	 * array. If we are called from within messaging_read_done,
+	 * messaging_dispatch_rec will be in an active for-loop on
+	 * "waiters". We must be careful not to mess with this array, because
+	 * it could mean that a single event is being delivered twice.
+	 */
 
 	new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
 
@@ -501,21 +495,31 @@ struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
 
 	msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
 	msg_ctx->num_new_waiters += 1;
-	tevent_req_set_cleanup_fn(req, messaging_read_cleanup);
+	tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
 
 	return req;
 }
 
-static void messaging_read_cleanup(struct tevent_req *req,
-				   enum tevent_req_state req_state)
+static void messaging_filtered_read_cleanup(struct tevent_req *req,
+					    enum tevent_req_state req_state)
 {
-	struct messaging_read_state *state = tevent_req_data(
-		req, struct messaging_read_state);
+	struct messaging_filtered_read_state *state = tevent_req_data(
+		req, struct messaging_filtered_read_state);
 	struct messaging_context *msg_ctx = state->msg_ctx;
 	unsigned i;
 
 	tevent_req_set_cleanup_fn(req, NULL);
 
+	TALLOC_FREE(state->tevent_handle);
+
+	/*
+	 * Just set the [new_]waiters entry to NULL, be careful not to mess
+	 * with the other "waiters" array contents. We are often called from
+	 * within "messaging_dispatch_rec", which loops over
+	 * "waiters". Messing with the "waiters" array will mess up that
+	 * for-loop.
+	 */
+
 	for (i=0; i<msg_ctx->num_waiters; i++) {
 		if (msg_ctx->waiters[i] == req) {
 			msg_ctx->waiters[i] = NULL;
@@ -531,11 +535,11 @@ static void messaging_read_cleanup(struct tevent_req *req,
 	}
 }
 
-static void messaging_read_done(struct tevent_req *req,
-				struct messaging_rec *rec)
+static void messaging_filtered_read_done(struct tevent_req *req,
+					 struct messaging_rec *rec)
 {
-	struct messaging_read_state *state = tevent_req_data(
-		req, struct messaging_read_state);
+	struct messaging_filtered_read_state *state = tevent_req_data(
+		req, struct messaging_filtered_read_state);
 
 	state->rec = messaging_rec_dup(state, rec);
 	if (tevent_req_nomem(state->rec, req)) {
@@ -544,6 +548,79 @@ static void messaging_read_done(struct tevent_req *req,
 	tevent_req_done(req);
 }
 
+int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+				 struct messaging_rec **presult)
+{
+	struct messaging_filtered_read_state *state = tevent_req_data(
+		req, struct messaging_filtered_read_state);
+	int err;
+
+	if (tevent_req_is_unix_error(req, &err)) {
+		tevent_req_received(req);
+		return err;
+	}
+	*presult = talloc_move(mem_ctx, &state->rec);
+	return 0;
+}
+
+struct messaging_read_state {
+	uint32_t msg_type;
+	struct messaging_rec *rec;
+};
+
+static bool messaging_read_filter(struct messaging_rec *rec,
+				  void *private_data);
+static void messaging_read_done(struct tevent_req *subreq);
+
+struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
+				       struct tevent_context *ev,
+				       struct messaging_context *msg,
+				       uint32_t msg_type)
+{
+	struct tevent_req *req, *subreq;
+	struct messaging_read_state *state;
+
+	req = tevent_req_create(mem_ctx, &state,
+				struct messaging_read_state);
+	if (req == NULL) {
+		return NULL;
+	}
+	state->msg_type = msg_type;
+
+	subreq = messaging_filtered_read_send(state, ev, msg,
+					      messaging_read_filter, state);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, messaging_read_done, req);
+	return req;
+}
+
+static bool messaging_read_filter(struct messaging_rec *rec,
+				  void *private_data)
+{
+	struct messaging_read_state *state = talloc_get_type_abort(
+		private_data, struct messaging_read_state);
+
+	return rec->msg_type == state->msg_type;
+}
+
+static void messaging_read_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct messaging_read_state *state = tevent_req_data(
+		req, struct messaging_read_state);
+	int ret;
+
+	ret = messaging_filtered_read_recv(subreq, state, &state->rec);
+	TALLOC_FREE(subreq);
+	if (tevent_req_error(req, ret)) {
+		return;
+	}
+	tevent_req_done(req);
+}
+
 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
 			struct messaging_rec **presult)
 {
@@ -552,7 +629,6 @@ int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
 	int err;
 
 	if (tevent_req_is_unix_error(req, &err)) {
-		tevent_req_received(req);
 		return err;
 	}
 	if (presult != NULL) {
@@ -589,6 +665,68 @@ static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
 	return true;
 }
 
+struct messaging_defer_callback_state {
+	struct messaging_context *msg_ctx;
+	struct messaging_rec *rec;
+	void (*fn)(struct messaging_context *msg, void *private_data,
+		   uint32_t msg_type, struct server_id server_id,
+		   DATA_BLOB *data);
+	void *private_data;
+};
+
+static void messaging_defer_callback_trigger(struct tevent_context *ev,
+					     struct tevent_immediate *im,
+					     void *private_data);
+
+static void messaging_defer_callback(
+	struct messaging_context *msg_ctx, struct messaging_rec *rec,
+	void (*fn)(struct messaging_context *msg, void *private_data,
+		   uint32_t msg_type, struct server_id server_id,
+		   DATA_BLOB *data),
+	void *private_data)
+{
+	struct messaging_defer_callback_state *state;
+	struct tevent_immediate *im;
+
+	state = talloc(msg_ctx, struct messaging_defer_callback_state);
+	if (state == NULL) {
+		DEBUG(1, ("talloc failed\n"));
+		return;
+	}
+	state->msg_ctx = msg_ctx;
+	state->fn = fn;
+	state->private_data = private_data;
+
+	state->rec = messaging_rec_dup(state, rec);
+	if (state->rec == NULL) {
+		DEBUG(1, ("talloc failed\n"));
+		TALLOC_FREE(state);
+		return;
+	}
+
+	im = tevent_create_immediate(state);
+	if (im == NULL) {
+		DEBUG(1, ("tevent_create_immediate failed\n"));
+		TALLOC_FREE(state);
+		return;
+	}
+	tevent_schedule_immediate(im, msg_ctx->event_ctx,
+				  messaging_defer_callback_trigger, state);
+}
+
+static void messaging_defer_callback_trigger(struct tevent_context *ev,
+					     struct tevent_immediate *im,
+					     void *private_data)
+{
+	struct messaging_defer_callback_state *state = talloc_get_type_abort(
+		private_data, struct messaging_defer_callback_state);
+	struct messaging_rec *rec = state->rec;
+
+	state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
+		  &rec->buf);
+	TALLOC_FREE(state);
+}
+
 /*
   Dispatch one messaging_rec
 */
@@ -600,15 +738,34 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 
 	for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
 		next = cb->next;
-		if (cb->msg_type == rec->msg_type) {
+		if (cb->msg_type != rec->msg_type) {
+			continue;
+		}
+
+		if (messaging_is_self_send(msg_ctx, &rec->dest)) {
+			/*
+			 * This is a self-send. We are called here from
+			 * messaging_send(), and we don't want to directly
+			 * recurse into the callback but go via a
+			 * tevent_loop_once
+			 */
+			messaging_defer_callback(msg_ctx, rec, cb->fn,
+						 cb->private_data);
+		} else {
+			/*
+			 * This comes from a different process. we are called
+			 * from the event loop, so we should call back
+			 * directly.


-- 
Samba Shared Repository

