[PATCH] pthreadpool: create a tevent_threaded_context per registered event context
Ralph Böhme
slow at samba.org
Tue Nov 14 15:35:40 UTC 2017
Hi!
Attached is a patch that enhances pthreadpool_tevent to create one
tevent_threaded_context per registered event context instead of per
pthreadpool_tevent_job_send request.
Please review carefully -- the rundown is a bit tricky -- and push if
happy. Thanks!
Fwiw: it passed two private autobuilds and it survives smbtorture
local.messaging tests with a modified messaging subsystem that sends out all
messages via helper threads.
-slow
--
Ralph Boehme, Samba Team https://samba.org/
Samba Developer, SerNet GmbH https://sernet.de/en/samba/
-------------- next part --------------
From e66e63893082628b53d3906502513aeb011acf2a Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Sat, 11 Nov 2017 13:05:03 +0100
Subject: [PATCH] pthreadpool: create a tevent_threaded_context per registered
event context
We just need one tevent_threaded_context per unique combination of
a tevent event context and a pthreadpool_tevent pool, not multiple copies
for identical combinations of a tevent context and a pthreadpool_tevent
pool.
With this commit we register tevent contexts in a list in the
pthreadpool_tevent structure and will only have one
tevent_threaded_context object per tevent context per pool.
With many pthreadpool_tevent_job_send reqs this pays off: I've seen a
small decrease in cpu-ticks with valgrind callgrind and a modified
local.messaging.ping-speed torture test. The test modification ensured
messages were never sent directly, but always submitted via
pthreadpool_tevent_job_send.
Signed-off-by: Ralph Boehme <slow at samba.org>
---
lib/pthreadpool/pthreadpool_tevent.c | 139 ++++++++++++++++++++++++++++++-----
1 file changed, 122 insertions(+), 17 deletions(-)
diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index 253a8673518..9442ed56c67 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -25,8 +25,30 @@
struct pthreadpool_tevent_job_state;
+/*
+ * We need one pthreadpool_tevent_glue object per unique combination of tevent
+ * contexts and pthreadpool_tevent objects. Maintain a list of used tevent
+ * contexts in a pthreadpool_tevent.
+ */
+struct pthreadpool_tevent_glue {
+ struct pthreadpool_tevent_glue *prev, *next;
+ struct pthreadpool_tevent *pool;
+ struct tevent_context *ev;
+ struct tevent_threaded_context *tctx;
+ struct pthreadpool_tevent_glue_ev_link *ev_link;
+};
+
+/*
+ * The pthreadpool_tevent_glue_ev_link and its destructor ensure we remove the
+ * tevent context from our list of active event contexts.
+ */
+struct pthreadpool_tevent_glue_ev_link {
+ struct pthreadpool_tevent_glue *glue;
+};
+
struct pthreadpool_tevent {
struct pthreadpool *pool;
+ struct pthreadpool_tevent_glue *glue_list;
struct pthreadpool_tevent_job_state *jobs;
};
@@ -35,7 +57,6 @@ struct pthreadpool_tevent_job_state {
struct pthreadpool_tevent_job_state *prev, *next;
struct pthreadpool_tevent *pool;
struct tevent_context *ev;
- struct tevent_threaded_context *tctx;
struct tevent_immediate *im;
struct tevent_req *req;
@@ -77,6 +98,7 @@ int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
{
struct pthreadpool_tevent_job_state *state, *next;
+ struct pthreadpool_tevent_glue *glue = NULL;
int ret;
ret = pthreadpool_destroy(pool->pool);
@@ -91,6 +113,80 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
state->pool = NULL;
}
+ for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) {
+ /* The glue destructor removes it from the list */
+ TALLOC_FREE(glue);
+ }
+ pool->glue_list = NULL;
+
+ return 0;
+}
+
+static int pthreadpool_tevent_glue_destructor(
+ struct pthreadpool_tevent_glue *glue)
+{
+ if (glue->pool->glue_list != NULL) {
+ DLIST_REMOVE(glue->pool->glue_list, glue);
+ }
+
+ /* Ensure the ev_link destructor knows we're gone */
+ glue->ev_link->glue = NULL;
+
+ TALLOC_FREE(glue->ev_link);
+ TALLOC_FREE(glue->tctx);
+
+ return 0;
+}
+
+static int pthreadpool_tevent_glue_link_destructor(
+ struct pthreadpool_tevent_glue_ev_link *ev_link)
+{
+ TALLOC_FREE(ev_link->glue);
+ return 0;
+}
+
+static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
+ struct tevent_context *ev)
+{
+ struct pthreadpool_tevent_glue *glue = NULL;
+ struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
+
+ for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
+ if (glue->ev == ev) {
+ return 0;
+ }
+ }
+
+ glue = talloc_zero(pool, struct pthreadpool_tevent_glue);
+ if (glue == NULL) {
+ return ENOMEM;
+ }
+ *glue = (struct pthreadpool_tevent_glue) {
+ .pool = pool,
+ .ev = ev,
+ };
+ talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);
+
+ ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link);
+ if (ev_link == NULL) {
+ TALLOC_FREE(glue);
+ return ENOMEM;
+ }
+ ev_link->glue = glue;
+ talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor);
+
+ glue->ev_link = ev_link;
+
+#ifdef HAVE_PTHREAD
+ glue->tctx = tevent_threaded_context_create(pool, ev);
+ if (glue->tctx == NULL) {
+ TALLOC_FREE(ev_link);
+ TALLOC_FREE(glue);
+ return ENOMEM;
+ }
+#endif
+
+ DLIST_ADD(pool->glue_list, glue);
return 0;
}
@@ -147,20 +243,11 @@ struct tevent_req *pthreadpool_tevent_job_send(
return tevent_req_post(req, ev);
}
-#ifdef HAVE_PTHREAD
- state->tctx = tevent_threaded_context_create(state, ev);
- if (state->tctx == NULL && errno == ENOSYS) {
- /*
- * Samba build with pthread support but
- * tevent without???
- */
- tevent_req_error(req, ENOSYS);
- return tevent_req_post(req, ev);
- }
- if (tevent_req_nomem(state->tctx, req)) {
+ ret = pthreadpool_tevent_register_ev(pool, ev);
+ if (ret != 0) {
+ tevent_req_error(req, errno);
return tevent_req_post(req, ev);
}
-#endif
ret = pthreadpool_add_job(pool->pool, 0,
pthreadpool_tevent_job_fn,
@@ -194,10 +281,30 @@ static int pthreadpool_tevent_job_signal(int jobid,
{
struct pthreadpool_tevent_job_state *state = talloc_get_type_abort(
job_private_data, struct pthreadpool_tevent_job_state);
+ struct tevent_threaded_context *tctx = NULL;
+ struct pthreadpool_tevent_glue *g = NULL;
+
+ if (state->pool == NULL) {
+ /* The pthreadpool_tevent is already gone */
+ return 0;
+ }
- if (state->tctx != NULL) {
+#ifdef HAVE_PTHREAD
+ for (g = state->pool->glue_list; g != NULL; g = g->next) {
+ if (g->ev == state->ev) {
+ tctx = g->tctx;
+ break;
+ }
+ }
+
+ if (tctx == NULL) {
+ abort();
+ }
+#endif
+
+ if (tctx != NULL) {
/* with HAVE_PTHREAD */
- tevent_threaded_schedule_immediate(state->tctx, state->im,
+ tevent_threaded_schedule_immediate(tctx, state->im,
pthreadpool_tevent_job_done,
state);
} else {
@@ -222,8 +329,6 @@ static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
state->pool = NULL;
}
- TALLOC_FREE(state->tctx);
-
if (state->req == NULL) {
/*
* There was a talloc_free() state->req
--
2.13.6
More information about the samba-technical
mailing list