[PATCH] pthreadpool: create a tevent_threaded_context per registered event context

Ralph Böhme slow at samba.org
Tue Nov 14 15:35:40 UTC 2017


Hi!

Attached is a patch that enhances pthreadpool_tevent to create one
tevent_threaded_context per registered event context instead of per
pthreadpool_tevent_job_send request.
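
To make this concrete, here's a minimal sketch of a caller (job_fn,
job_done, job_data and num_jobs are made up for illustration) that
submits many jobs against the same event context and pool. Before the
patch every pthreadpool_tevent_job_send created its own
tevent_threaded_context; with it, the first call registers the
(ev, pool) pair once and the remaining calls reuse it:

    for (i = 0; i < num_jobs; i++) {
            struct tevent_req *req = pthreadpool_tevent_job_send(
                    mem_ctx, ev, pool, job_fn, job_data[i]);
            if (req == NULL) {
                    return ENOMEM;
            }
            tevent_req_set_callback(req, job_done, NULL);
    }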

Please review carefully -- the rundown is a bit tricky -- and push if
happy. Thanks!
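
To spell out the rundown as I read the destructors (both teardown
orders need to be safe):

    /*
     * pool freed first:
     *   pthreadpool_tevent_destructor() walks pool->glue_list and
     *   TALLOC_FREEs each glue; the glue destructor removes itself
     *   from the list, clears ev_link->glue and frees the ev_link
     *   and the tevent_threaded_context.
     *
     * ev freed first:
     *   talloc frees the ev_link (a child of ev); its destructor
     *   does TALLOC_FREE(ev_link->glue), so the glue and its
     *   tevent_threaded_context never outlive the event context
     *   they point at.
     */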

Fwiw: it passed two private autobuilds and survived the smbtorture
local.messaging tests with a modified messaging subsystem that sends out
all messages via helper threads.

-slow

-- 
Ralph Boehme, Samba Team       https://samba.org/
Samba Developer, SerNet GmbH   https://sernet.de/en/samba/
-------------- next part --------------
From e66e63893082628b53d3906502513aeb011acf2a Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Sat, 11 Nov 2017 13:05:03 +0100
Subject: [PATCH] pthreadpool: create a tevent_threaded_context per registered
 event context

We just need one tevent_threaded_context per unique combination of
tevent event context and pthreadpool_tevent pool, not multiple copies
for identical combinations of a tevent context and a pthreadpool_tevent
pool.

With this commit we register tevent contexts in a list in the
pthreadpool_tevent structure and will only have one
tevent_threaded_context object per tevent context per pool.
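
In sketch form, registration is a find-or-create over that list (the
full version is pthreadpool_tevent_register_ev() in the diff below):

    for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
            if (glue->ev == ev) {
                    return 0;       /* this (ev, pool) pair is known */
            }
    }
    /* otherwise allocate the glue and ev_link pair, create one
     * tevent_threaded_context and DLIST_ADD(pool->glue_list, glue) */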

With many pthreadpool_tevent_job_send reqs this pays off: I've seen a
small decrease in cpu-ticks with valgrind callgrind and a modified
local.messaging.ping-speed torture test. The test modification ensured
that messages were never sent directly, but always submitted via
pthreadpool_tevent_job_send.

Signed-off-by: Ralph Boehme <slow at samba.org>
---
 lib/pthreadpool/pthreadpool_tevent.c | 139 ++++++++++++++++++++++++++++++-----
 1 file changed, 122 insertions(+), 17 deletions(-)

diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index 253a8673518..9442ed56c67 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -25,8 +25,30 @@
 
 struct pthreadpool_tevent_job_state;
 
+/*
+ * We need one pthreadpool_tevent_glue object per unique combination of tevent
+ * contexts and pthreadpool_tevent objects. Maintain a list of used tevent
+ * contexts in a pthreadpool_tevent.
+ */
+struct pthreadpool_tevent_glue {
+	struct pthreadpool_tevent_glue *prev, *next;
+	struct pthreadpool_tevent *pool;
+	struct tevent_context *ev;
+	struct tevent_threaded_context *tctx;
+	struct pthreadpool_tevent_glue_ev_link *ev_link;
+};
+
+/*
+ * The pthreadpool_tevent_glue_ev_link and its destructor ensure we remove the
+ * tevent context from our list of active event contexts.
+ */
+struct pthreadpool_tevent_glue_ev_link {
+	struct pthreadpool_tevent_glue *glue;
+};
+
 struct pthreadpool_tevent {
 	struct pthreadpool *pool;
+	struct pthreadpool_tevent_glue *glue_list;
 
 	struct pthreadpool_tevent_job_state *jobs;
 };
@@ -35,7 +57,6 @@ struct pthreadpool_tevent_job_state {
 	struct pthreadpool_tevent_job_state *prev, *next;
 	struct pthreadpool_tevent *pool;
 	struct tevent_context *ev;
-	struct tevent_threaded_context *tctx;
 	struct tevent_immediate *im;
 	struct tevent_req *req;
 
@@ -77,6 +98,7 @@ int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 {
 	struct pthreadpool_tevent_job_state *state, *next;
+	struct pthreadpool_tevent_glue *glue = NULL;
 	int ret;
 
 	ret = pthreadpool_destroy(pool->pool);
@@ -91,6 +113,80 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 		state->pool = NULL;
 	}
 
+	for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) {
+		/* The glue destructor removes it from the list */
+		TALLOC_FREE(glue);
+	}
+	pool->glue_list = NULL;
+
+	return 0;
+}
+
+static int pthreadpool_tevent_glue_destructor(
+	struct pthreadpool_tevent_glue *glue)
+{
+	if (glue->pool->glue_list != NULL) {
+		DLIST_REMOVE(glue->pool->glue_list, glue);
+	}
+
+	/* Ensure the ev_link destructor knows we're gone */
+	glue->ev_link->glue = NULL;
+
+	TALLOC_FREE(glue->ev_link);
+	TALLOC_FREE(glue->tctx);
+
+	return 0;
+}
+
+static int pthreadpool_tevent_glue_link_destructor(
+	struct pthreadpool_tevent_glue_ev_link *ev_link)
+{
+	TALLOC_FREE(ev_link->glue);
+	return 0;
+}
+
+static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
+					  struct tevent_context *ev)
+{
+	struct pthreadpool_tevent_glue *glue = NULL;
+	struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
+
+	for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
+		if (glue->ev == ev) {
+			return 0;
+		}
+	}
+
+	glue = talloc_zero(pool, struct pthreadpool_tevent_glue);
+	if (glue == NULL) {
+		return ENOMEM;
+	}
+	*glue = (struct pthreadpool_tevent_glue) {
+		.pool = pool,
+		.ev = ev,
+	};
+	talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);
+
+	ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link);
+	if (ev_link == NULL) {
+		TALLOC_FREE(glue);
+		return ENOMEM;
+	}
+	ev_link->glue = glue;
+	talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor);
+
+	glue->ev_link = ev_link;
+
+#ifdef HAVE_PTHREAD
+	glue->tctx = tevent_threaded_context_create(pool, ev);
+	if (glue->tctx == NULL) {
+		TALLOC_FREE(ev_link);
+		TALLOC_FREE(glue);
+		return ENOMEM;
+	}
+#endif
+
+	DLIST_ADD(pool->glue_list, glue);
 	return 0;
 }
 
@@ -147,20 +243,11 @@ struct tevent_req *pthreadpool_tevent_job_send(
 		return tevent_req_post(req, ev);
 	}
 
-#ifdef HAVE_PTHREAD
-	state->tctx = tevent_threaded_context_create(state, ev);
-	if (state->tctx == NULL && errno == ENOSYS) {
-		/*
-		 * Samba build with pthread support but
-		 * tevent without???
-		 */
-		tevent_req_error(req, ENOSYS);
-		return tevent_req_post(req, ev);
-	}
-	if (tevent_req_nomem(state->tctx, req)) {
+	ret = pthreadpool_tevent_register_ev(pool, ev);
+	if (ret != 0) {
+		tevent_req_error(req, ret);
 		return tevent_req_post(req, ev);
 	}
-#endif
 
 	ret = pthreadpool_add_job(pool->pool, 0,
 				  pthreadpool_tevent_job_fn,
@@ -194,10 +281,30 @@ static int pthreadpool_tevent_job_signal(int jobid,
 {
 	struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
 		job_private_data, struct pthreadpool_tevent_job_state);
+	struct tevent_threaded_context *tctx = NULL;
+	struct pthreadpool_tevent_glue *g = NULL;
+
+	if (state->pool == NULL) {
+		/* The pthreadpool_tevent is already gone */
+		return 0;
+	}
 
-	if (state->tctx != NULL) {
+#ifdef HAVE_PTHREAD
+	for (g = state->pool->glue_list; g != NULL; g = g->next) {
+		if (g->ev == state->ev) {
+			tctx = g->tctx;
+			break;
+		}
+	}
+
+	if (tctx == NULL) {
+		abort();
+	}
+#endif
+
+	if (tctx != NULL) {
 		/* with HAVE_PTHREAD */
-		tevent_threaded_schedule_immediate(state->tctx, state->im,
+		tevent_threaded_schedule_immediate(tctx, state->im,
 						   pthreadpool_tevent_job_done,
 						   state);
 	} else {
@@ -222,8 +329,6 @@ static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
 		state->pool = NULL;
 	}
 
-	TALLOC_FREE(state->tctx);
-
 	if (state->req == NULL) {
 		/*
 		 * There was a talloc_free() state->req
-- 
2.13.6


