[PATCH] Remove cached event context from irpc and imessaging

Volker Lendecke Volker.Lendecke at SerNet.DE
Mon May 5 01:48:36 MDT 2014


On Mon, May 05, 2014 at 05:16:48PM +1200, Andrew Bartlett wrote:
> I've been thinking about the steps to use irpc in common, and this seems
> to me to be the first step, to remove the cached event context that is
> avoided in the source3 code.

From that area: attached are some unfinished patches that
show how it might be possible to have multiple task ids on
top of the source3-based messaging system. They are not
ready for upstream yet; I am just posting them here for
reference and as a possible starting point for a discussion.

With best regards,

Volker Lendecke

-- 
SerNet GmbH, Bahnhofsallee 1b, 37081 Göttingen
phone: +49-551-370000-0, fax: +49-551-370000-9
AG Göttingen, HRB 2816, GF: Dr. Johannes Loxen
http://www.sernet.de, mailto:kontakt at sernet.de
-------------- next part --------------
From 292e20a7cb5259aa862243d3ead6ada559829cc4 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Thu, 24 Apr 2014 09:05:53 +0000
Subject: [PATCH 1/5] messaging: Add messaging_filtered_read

This delegates the decision whether to read a message to a callback
---
 source3/include/messages.h |    8 +++
 source3/lib/messages.c     |  128 +++++++++++++++++++++++++++++++++++---------
 2 files changed, 111 insertions(+), 25 deletions(-)

diff --git a/source3/include/messages.h b/source3/include/messages.h
index 06c1748..7801dfb 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -142,6 +142,14 @@ NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 			    struct messaging_rec *rec);
 
+struct tevent_req *messaging_filtered_read_send(
+	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+	struct messaging_context *msg_ctx,
+	bool (*filter)(struct messaging_rec *rec, void *private_data),
+	void *private_data);
+int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+				 struct messaging_rec **presult);
+
 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
 				       struct tevent_context *ev,
 				       struct messaging_context *msg,
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 9284ac1..ca254a4 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -458,33 +458,38 @@ static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
 	return result;
 }
 
-struct messaging_read_state {
+struct messaging_filtered_read_state {
 	struct tevent_context *ev;
 	struct messaging_context *msg_ctx;
-	uint32_t msg_type;
+
+	bool (*filter)(struct messaging_rec *rec, void *private_data);
+	void *private_data;
+
 	struct messaging_rec *rec;
 };
 
-static void messaging_read_cleanup(struct tevent_req *req,
-				   enum tevent_req_state req_state);
+static void messaging_filtered_read_cleanup(struct tevent_req *req,
+					    enum tevent_req_state req_state);
 
-struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
-				       struct tevent_context *ev,
-				       struct messaging_context *msg_ctx,
-				       uint32_t msg_type)
+struct tevent_req *messaging_filtered_read_send(
+	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+	struct messaging_context *msg_ctx,
+	bool (*filter)(struct messaging_rec *rec, void *private_data),
+	void *private_data)
 {
 	struct tevent_req *req;
-	struct messaging_read_state *state;
+	struct messaging_filtered_read_state *state;
 	size_t new_waiters_len;
 
 	req = tevent_req_create(mem_ctx, &state,
-				struct messaging_read_state);
+				struct messaging_filtered_read_state);
 	if (req == NULL) {
 		return NULL;
 	}
 	state->ev = ev;
 	state->msg_ctx = msg_ctx;
-	state->msg_type = msg_type;
+	state->filter = filter;
+	state->private_data = private_data;
 
 	new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
 
@@ -501,16 +506,16 @@ struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
 
 	msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
 	msg_ctx->num_new_waiters += 1;
-	tevent_req_set_cleanup_fn(req, messaging_read_cleanup);
+	tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
 
 	return req;
 }
 
-static void messaging_read_cleanup(struct tevent_req *req,
-				   enum tevent_req_state req_state)
+static void messaging_filtered_read_cleanup(struct tevent_req *req,
+					    enum tevent_req_state req_state)
 {
-	struct messaging_read_state *state = tevent_req_data(
-		req, struct messaging_read_state);
+	struct messaging_filtered_read_state *state = tevent_req_data(
+		req, struct messaging_filtered_read_state);
 	struct messaging_context *msg_ctx = state->msg_ctx;
 	unsigned i;
 
@@ -531,11 +536,11 @@ static void messaging_read_cleanup(struct tevent_req *req,
 	}
 }
 
-static void messaging_read_done(struct tevent_req *req,
-				struct messaging_rec *rec)
+static void messaging_filtered_read_done(struct tevent_req *req,
+					 struct messaging_rec *rec)
 {
-	struct messaging_read_state *state = tevent_req_data(
-		req, struct messaging_read_state);
+	struct messaging_filtered_read_state *state = tevent_req_data(
+		req, struct messaging_filtered_read_state);
 
 	state->rec = messaging_rec_dup(state, rec);
 	if (tevent_req_nomem(state->rec, req)) {
@@ -544,6 +549,79 @@ static void messaging_read_done(struct tevent_req *req,
 	tevent_req_done(req);
 }
 
+int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+				 struct messaging_rec **presult)
+{
+	struct messaging_filtered_read_state *state = tevent_req_data(
+		req, struct messaging_filtered_read_state);
+	int err;
+
+	if (tevent_req_is_unix_error(req, &err)) {
+		tevent_req_received(req);
+		return err;
+	}
+	*presult = talloc_move(mem_ctx, &state->rec);
+	return 0;
+}
+
+struct messaging_read_state {
+	uint32_t msg_type;
+	struct messaging_rec *rec;
+};
+
+static bool messaging_read_filter(struct messaging_rec *rec,
+				  void *private_data);
+static void messaging_read_done(struct tevent_req *subreq);
+
+struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
+				       struct tevent_context *ev,
+				       struct messaging_context *msg,
+				       uint32_t msg_type)
+{
+	struct tevent_req *req, *subreq;
+	struct messaging_read_state *state;
+
+	req = tevent_req_create(mem_ctx, &state,
+				struct messaging_read_state);
+	if (req == NULL) {
+		return NULL;
+	}
+	state->msg_type = msg_type;
+
+	subreq = messaging_filtered_read_send(state, ev, msg,
+					      messaging_read_filter, state);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, messaging_read_done, req);
+	return req;
+}
+
+static bool messaging_read_filter(struct messaging_rec *rec,
+				  void *private_data)
+{
+	struct messaging_read_state *state = talloc_get_type_abort(
+		private_data, struct messaging_read_state);
+
+	return rec->msg_type == state->msg_type;
+}
+
+static void messaging_read_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct messaging_read_state *state = tevent_req_data(
+		req, struct messaging_read_state);
+	int ret;
+
+	ret = messaging_filtered_read_recv(subreq, state, &state->rec);
+	TALLOC_FREE(subreq);
+	if (tevent_req_error(req, ret)) {
+		return;
+	}
+	tevent_req_done(req);
+}
+
 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
 			struct messaging_rec **presult)
 {
@@ -552,7 +630,6 @@ int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
 	int err;
 
 	if (tevent_req_is_unix_error(req, &err)) {
-		tevent_req_received(req);
 		return err;
 	}
 	if (presult != NULL) {
@@ -618,7 +695,7 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 	i = 0;
 	while (i < msg_ctx->num_waiters) {
 		struct tevent_req *req;
-		struct messaging_read_state *state;
+		struct messaging_filtered_read_state *state;
 
 		req = msg_ctx->waiters[i];
 		if (req == NULL) {
@@ -638,9 +715,10 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 			continue;
 		}
 
-		state = tevent_req_data(req, struct messaging_read_state);
-		if (state->msg_type == rec->msg_type) {
-			messaging_read_done(req, rec);
+		state = tevent_req_data(
+			req, struct messaging_filtered_read_state);
+		if (state->filter(rec, state->private_data)) {
+			messaging_filtered_read_done(req, rec);
 		}
 
 		i += 1;
-- 
1.7.9.5


From 1ea5388c8430a76e6868973b60cb8c2e2e7fb3f4 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Thu, 24 Apr 2014 09:23:48 +0000
Subject: [PATCH 2/5] dbwrap: Use messaging_filtered_read

This does not really save any code lines, but IMHO the code is simpler
this way. Also, in case we have lots of watchers this will be slightly
cheaper, because we don't have to re-establish a tevent_req.
---
 source3/lib/dbwrap/dbwrap_watch.c |   42 ++++++++++++++++++-------------------
 1 file changed, 21 insertions(+), 21 deletions(-)

diff --git a/source3/lib/dbwrap/dbwrap_watch.c b/source3/lib/dbwrap/dbwrap_watch.c
index 4f3a2b3..a5f1ebd 100644
--- a/source3/lib/dbwrap/dbwrap_watch.c
+++ b/source3/lib/dbwrap/dbwrap_watch.c
@@ -235,6 +235,8 @@ struct dbwrap_record_watch_state {
 	TDB_DATA w_key;
 };
 
+static bool dbwrap_record_watch_filter(struct messaging_rec *rec,
+				       void *private_data);
 static void dbwrap_record_watch_done(struct tevent_req *subreq);
 static int dbwrap_record_watch_state_destructor(
 	struct dbwrap_record_watch_state *state);
@@ -271,8 +273,8 @@ struct tevent_req *dbwrap_record_watch_send(TALLOC_CTX *mem_ctx,
 		return tevent_req_post(req, ev);
 	}
 
-	subreq = messaging_read_send(state, ev, state->msg,
-				     MSG_DBWRAP_MODIFIED);
+	subreq = messaging_filtered_read_send(
+		state, ev, state->msg, dbwrap_record_watch_filter, state);
 	if (tevent_req_nomem(subreq, req)) {
 		return tevent_req_post(req, ev);
 	}
@@ -288,6 +290,21 @@ struct tevent_req *dbwrap_record_watch_send(TALLOC_CTX *mem_ctx,
 	return req;
 }
 
+static bool dbwrap_record_watch_filter(struct messaging_rec *rec,
+				       void *private_data)
+{
+	struct dbwrap_record_watch_state *state = talloc_get_type_abort(
+		private_data, struct dbwrap_record_watch_state);
+
+	if (rec->msg_type != MSG_DBWRAP_MODIFIED) {
+		return false;
+	}
+	if (rec->buf.length != state->w_key.dsize) {
+		return false;
+	}
+	return memcmp(rec->buf.data, state->w_key.dptr,	rec->buf.length) == 0;
+}
+
 static int dbwrap_record_watch_state_destructor(
 	struct dbwrap_record_watch_state *s)
 {
@@ -351,33 +368,16 @@ static void dbwrap_record_watch_done(struct tevent_req *subreq)
 {
 	struct tevent_req *req = tevent_req_callback_data(
 		subreq, struct tevent_req);
-	struct dbwrap_record_watch_state *state = tevent_req_data(
-		req, struct dbwrap_record_watch_state);
 	struct messaging_rec *rec;
 	int ret;
 
-	ret = messaging_read_recv(subreq, talloc_tos(), &rec);
+	ret = messaging_filtered_read_recv(subreq, talloc_tos(), &rec);
 	TALLOC_FREE(subreq);
 	if (ret != 0) {
 		tevent_req_nterror(req, map_nt_error_from_unix(ret));
 		return;
 	}
-
-	if ((rec->buf.length == state->w_key.dsize) &&
-	    (memcmp(rec->buf.data, state->w_key.dptr, rec->buf.length) == 0)) {
-		tevent_req_done(req);
-		return;
-	}
-
-	/*
-	 * Not our record, wait for the next one
-	 */
-	subreq = messaging_read_send(state, state->ev, state->msg,
-				     MSG_DBWRAP_MODIFIED);
-	if (tevent_req_nomem(subreq, req)) {
-		return;
-	}
-	tevent_req_set_callback(subreq, dbwrap_record_watch_done, req);
+	tevent_req_done(req);
 }
 
 NTSTATUS dbwrap_record_watch_recv(struct tevent_req *req,
-- 
1.7.9.5


From 755e4e2dee2083d0e71507111a7ef00c77f96d52 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 2 May 2014 09:12:52 +0000
Subject: [PATCH 3/5] messaging3: Fix 80-char line limit

---
 source3/lib/messages.c |   14 +++++++++-----
 1 file changed, 9 insertions(+), 5 deletions(-)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index ca254a4..065782a 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -709,7 +709,7 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 				memmove(&msg_ctx->waiters[i],
 					&msg_ctx->waiters[i+1],
 					sizeof(struct tevent_req *) *
-						(msg_ctx->num_waiters - i - 1));
+					    (msg_ctx->num_waiters - i - 1));
 			}
 			msg_ctx->num_waiters -= 1;
 			continue;
@@ -735,7 +735,8 @@ bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
 
 	req = background_job_send(
 		msg, msg->event_ctx, msg, NULL, 0,
-		lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 60*15),
+		lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
+			    60*15),
 		mess_parent_dgm_cleanup, msg);
 	if (req == NULL) {
 		return false;
@@ -752,7 +753,8 @@ static int mess_parent_dgm_cleanup(void *private_data)
 
 	status = messaging_dgm_wipe(msg_ctx);
 	DEBUG(10, ("messaging_dgm_wipe returned %s\n", nt_errstr(status)));
-	return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 60*15);
+	return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
+			   60*15);
 }
 
 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
@@ -763,11 +765,13 @@ static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
 
 	status = background_job_recv(req);
 	TALLOC_FREE(req);
-	DEBUG(1, ("messaging dgm cleanup job ended with %s\n", nt_errstr(status)));
+	DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
+		  nt_errstr(status)));
 
 	req = background_job_send(
 		msg, msg->event_ctx, msg, NULL, 0,
-		lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 60*15),
+		lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
+			    60*15),
 		mess_parent_dgm_cleanup, msg);
 	if (req == NULL) {
 		DEBUG(1, ("background_job_send failed\n"));
-- 
1.7.9.5


From 95303239cf56ee509fd67ae1f82e149092b06481 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 2 May 2014 09:20:40 +0000
Subject: [PATCH 4/5] messaging3: Add comments about not touching "waiters"

---
 source3/lib/messages.c |   16 ++++++++++++++++
 1 file changed, 16 insertions(+)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 065782a..6a08531 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -491,6 +491,14 @@ struct tevent_req *messaging_filtered_read_send(
 	state->filter = filter;
 	state->private_data = private_data;
 
+	/*
+	 * We add ourselves to the "new_waiters" array, not the "waiters"
+	 * array. If we are called from within messaging_filtered_read_done,
+	 * messaging_dispatch_rec will be in an active loop over
+	 * "waiters". We must be careful not to mess with this array, because
+	 * it could mean that a single event is being delivered twice.
+	 */
+
 	new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
 
 	if (new_waiters_len == msg_ctx->num_new_waiters) {
@@ -521,6 +529,14 @@ static void messaging_filtered_read_cleanup(struct tevent_req *req,
 
 	tevent_req_set_cleanup_fn(req, NULL);
 
+	/*
+	 * Just set the [new_]waiters entry to NULL, be careful not to mess
+	 * with the other "waiters" array contents. We are often called from
+	 * within "messaging_dispatch_rec", which loops over
+	 * "waiters". Messing with the "waiters" array will mess up that
+	 * loop.
+	 */
+
 	for (i=0; i<msg_ctx->num_waiters; i++) {
 		if (msg_ctx->waiters[i] == req) {
 			msg_ctx->waiters[i] = NULL;
-- 
1.7.9.5


From cf17b5b7a10adf58acc23df78b20ee78372c052c Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Thu, 24 Apr 2014 09:54:21 +0000
Subject: [PATCH 5/5] messaging: Add msg_task

Allow multiple tasks within one process
---
 source3/include/messages.h |   14 +++++
 source3/lib/messages.c     |  144 ++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 158 insertions(+)

diff --git a/source3/include/messages.h b/source3/include/messages.h
index 7801dfb..1d42f95 100644
--- a/source3/include/messages.h
+++ b/source3/include/messages.h
@@ -84,6 +84,8 @@ struct messaging_context {
 
 	struct messaging_backend *local;
 	struct messaging_backend *remote;
+
+	struct idr_context *tasks;
 };
 
 struct messaging_backend {
@@ -159,6 +161,18 @@ int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
 
 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg);
 
+struct msg_task *msg_task_init(TALLOC_CTX *mem_ctx,
+			       struct messaging_context *msg_ctx);
+struct server_id msg_task_id(const struct msg_task *t);
+NTSTATUS msg_task_send(struct msg_task *t, struct server_id dst,
+		       uint32_t msg_type, const struct iovec *iov, int iovlen);
+struct tevent_req *msg_task_read_send(
+	TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct msg_task *t,
+	bool (*filter)(struct messaging_rec *rec, void *private_data),
+	void *private_data);
+int msg_task_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+		       struct messaging_rec **presult);
+
 #include "librpc/gen_ndr/ndr_messaging.h"
 
 #endif
diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 6a08531..e5fb000 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -198,6 +198,12 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
 	ctx->id = procid_self();
 	ctx->event_ctx = ev;
 
+	ctx->tasks = idr_init(ctx);
+	if (ctx->tasks == NULL) {
+		TALLOC_FREE(ctx);
+		return NULL;
+	}
+
 	status = messaging_dgm_init(ctx, ctx, &ctx->local);
 
 	if (!NT_STATUS_IS_OK(status)) {
@@ -795,4 +801,142 @@ static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
 	tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
 }
 
+struct msg_task {
+	struct messaging_context *msg_ctx;
+	uint32_t task_id;
+};
+
+static int msg_task_destructor(struct msg_task *t);
+
+struct msg_task *msg_task_init(TALLOC_CTX *mem_ctx,
+			       struct messaging_context *msg_ctx)
+{
+	struct msg_task *t;
+	int task_id;
+
+	t = talloc(mem_ctx, struct msg_task);
+	if (t == NULL) {
+		return NULL;
+	}
+	t->msg_ctx = msg_ctx;
+
+	task_id = idr_get_new_above(msg_ctx->tasks, t, 1, INT32_MAX);
+	if (task_id == -1) {
+		TALLOC_FREE(t);
+		return NULL;
+	}
+	t->task_id = task_id;
+	talloc_set_destructor(t, msg_task_destructor);
+	return t;
+}
+
+static int msg_task_destructor(struct msg_task *t)
+{
+	return idr_remove(t->msg_ctx->tasks, t->task_id);
+}
+
+struct server_id msg_task_id(const struct msg_task *t)
+{
+	struct server_id id = messaging_server_id(t->msg_ctx);
+	id.task_id = t->task_id;
+	return id;
+}
+
+NTSTATUS msg_task_send(struct msg_task *t, struct server_id dst,
+		       uint32_t msg_type, const struct iovec *iov, int iovlen)
+{
+	struct messaging_context *msg_ctx = t->msg_ctx;
+	uint32_t orig_task_id;
+	NTSTATUS status;
+
+	orig_task_id = msg_ctx->id.task_id; /* HACK alert ... :-) */
+	msg_ctx->id.task_id = t->task_id;
+
+	status = messaging_send_iov(msg_ctx, dst, msg_type, iov, iovlen);
+
+	msg_ctx->id.task_id = orig_task_id;
+
+	return status;
+}
+
+struct msg_task_read_state {
+	uint32_t task_id;
+	bool (*filter)(struct messaging_rec *rec, void *private_data);
+	void *private_data;
+	struct messaging_rec *rec;
+};
+
+static bool msg_task_read_filter(struct messaging_rec *rec,
+				 void *private_data);
+static void msg_task_read_done(struct tevent_req *subreq);
+
+struct tevent_req *msg_task_read_send(
+	TALLOC_CTX *mem_ctx, struct tevent_context *ev, struct msg_task *t,
+	bool (*filter)(struct messaging_rec *rec, void *private_data),
+	void *private_data)
+{
+	struct tevent_req *req, *subreq;
+	struct msg_task_read_state *state;
+
+	req = tevent_req_create(mem_ctx, &state, struct msg_task_read_state);
+	if (req == NULL) {
+		return NULL;
+	}
+	state->task_id = t->task_id;
+	state->filter = filter;
+	state->private_data = private_data;
+
+	subreq = messaging_filtered_read_send(state, ev, t->msg_ctx,
+					      msg_task_read_filter, state);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, msg_task_read_done, req);
+	return req;
+}
+
+static bool msg_task_read_filter(struct messaging_rec *rec,
+				 void *private_data)
+{
+	struct msg_task_read_state *state = talloc_get_type_abort(
+		private_data, struct msg_task_read_state);
+
+	if (rec->dest.task_id != state->task_id) {
+		return false;
+	}
+	return state->filter(rec, state->private_data);
+}
+
+static void msg_task_read_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct msg_task_read_state *state = tevent_req_data(
+		req, struct msg_task_read_state);
+	int ret;
+	/* subreq came from messaging_filtered_read_send: use its _recv */
+	ret = messaging_filtered_read_recv(subreq, state, &state->rec);
+	TALLOC_FREE(subreq);
+	if (tevent_req_error(req, ret)) {
+		return;
+	}
+	tevent_req_done(req);
+}
+
+int msg_task_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+		       struct messaging_rec **presult)
+{
+	struct msg_task_read_state *state = tevent_req_data(
+		req, struct msg_task_read_state);
+	int ret;
+
+	if (tevent_req_is_unix_error(req, &ret)) {
+		return ret;
+	}
+	if (presult != NULL) {
+		*presult = talloc_move(mem_ctx, &state->rec);
+	}
+	return 0;
+}
+
 /** @} **/
-- 
1.7.9.5



More information about the samba-technical mailing list