[PATCH] optimize aio rw path

Volker Lendecke Volker.Lendecke at SerNet.DE
Wed Mar 26 06:39:53 MDT 2014


Hi!

Attached find a few patches that optimize our async I/O
path. It might be a bit controversial, because it uses C99
variable-length arrays (VLAs). Here in this situation I think it really
reduces code complexity and is important for performance.

Review would be appreciated!

Thanks,

Volker

-- 
SerNet GmbH, Bahnhofsallee 1b, 37081 Göttingen
phone: +49-551-370000-0, fax: +49-551-370000-9
AG Göttingen, HRB 2816, GF: Dr. Johannes Loxen
http://www.sernet.de, mailto:kontakt at sernet.de
-------------- next part --------------
From a6cf048ab9dccefc65276e2badeac573f75cf303 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 24 Mar 2014 09:40:20 +0000
Subject: [PATCH 1/5] pthreadpool: Add a simple benchmark

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/torture/bench_pthreadpool.c |   64 +++++++++++++++++++++++++++++++++++
 source3/torture/proto.h             |    1 +
 source3/torture/torture.c           |    1 +
 source3/wscript_build               |    1 +
 4 files changed, 67 insertions(+)
 create mode 100644 source3/torture/bench_pthreadpool.c

diff --git a/source3/torture/bench_pthreadpool.c b/source3/torture/bench_pthreadpool.c
new file mode 100644
index 0000000..ee0d203
--- /dev/null
+++ b/source3/torture/bench_pthreadpool.c
@@ -0,0 +1,64 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Little pthreadpool benchmark
+ *
+ * Copyright (C) Volker Lendecke 2014
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "includes.h"
+#include "lib/pthreadpool/pthreadpool.h"
+#include "proto.h"
+
+extern int torture_numops;
+
+static void null_job(void *private_data)
+{
+	return;
+}
+
+bool run_bench_pthreadpool(int dummy)
+{
+	struct pthreadpool *pool;
+	int i, ret;
+
+	ret = pthreadpool_init(1, &pool);
+	if (ret != 0) {
+		d_fprintf(stderr, "pthreadpool_init failed: %s\n",
+			  strerror(ret));
+		return false;
+	}
+
+	for (i=0; i<torture_numops; i++) {
+		int jobid;
+
+		ret = pthreadpool_add_job(pool, 0, null_job, NULL);
+		if (ret != 0) {
+			d_fprintf(stderr, "pthreadpool_add_job failed: %s\n",
+				  strerror(ret));
+			break;
+		}
+		ret = pthreadpool_finished_job(pool, &jobid);
+		if (ret != 0) {
+			d_fprintf(stderr, "pthreadpool_finished_job failed: %s\n",
+				  strerror(ret));
+			break;
+		}
+	}
+
+	pthreadpool_destroy(pool);
+
+	return (ret == 0);
+}
diff --git a/source3/torture/proto.h b/source3/torture/proto.h
index 2b27289..3673d98 100644
--- a/source3/torture/proto.h
+++ b/source3/torture/proto.h
@@ -112,5 +112,6 @@ bool run_dbwrap_watch1(int dummy);
 bool run_idmap_tdb_common_test(int dummy);
 bool run_local_dbwrap_ctdb(int dummy);
 bool run_qpathinfo_bufsize(int dummy);
+bool run_bench_pthreadpool(int dummy);
 
 #endif /* __TORTURE_H__ */
diff --git a/source3/torture/torture.c b/source3/torture/torture.c
index 340f754..61e9338 100644
--- a/source3/torture/torture.c
+++ b/source3/torture/torture.c
@@ -9593,6 +9593,7 @@ static struct {
 	{ "local-tdb-opener", run_local_tdb_opener, 0 },
 	{ "local-tdb-writer", run_local_tdb_writer, 0 },
 	{ "LOCAL-DBWRAP-CTDB", run_local_dbwrap_ctdb, 0 },
+	{ "LOCAL-BENCH-PTHREADPOOL", run_bench_pthreadpool, 0 },
 	{ "qpathinfo-bufsize", run_qpathinfo_bufsize, 0 },
 	{NULL, NULL, 0}};
 
diff --git a/source3/wscript_build b/source3/wscript_build
index 02a5c7d..ca53951 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -1243,6 +1243,7 @@ bld.SAMBA3_BINARY('smbtorture' + bld.env.suffix3,
                  torture/test_dbwrap_ctdb.c
                  torture/test_buffersize.c
                  torture/t_strappend.c
+                 torture/bench_pthreadpool.c
                  torture/wbc_async.c''',
                  deps='''
                  talloc
-- 
1.7.9.5


From 5d4be634c51f97e1593fc645c00b98ba2d0ec0c6 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Fri, 21 Mar 2014 17:53:26 +0100
Subject: [PATCH 2/5] pthreadpool: Avoid a malloc/free per job

pthreadpool_add_job is in our hottest code path for r/w intensive workloads, so
we should avoid anything CPU-intensive. pthreadpool used to malloc each job and
free it in the worker thread. This patch adds a FIFO queue for jobs that helper
threads copy from, avoiding constant malloc/free. This cuts user space
CPU in the local-bench-pthreadpool benchmark by roughly 10% on my system.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/pthreadpool/pthreadpool.c |  145 +++++++++++++++++++++------------
 1 file changed, 91 insertions(+), 54 deletions(-)

diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c
index 654d420..d51e808 100644
--- a/source3/lib/pthreadpool/pthreadpool.c
+++ b/source3/lib/pthreadpool/pthreadpool.c
@@ -34,7 +34,6 @@
 #include "lib/util/dlinklist.h"
 
 struct pthreadpool_job {
-	struct pthreadpool_job *next;
 	int id;
 	void (*fn)(void *private_data);
 	void *private_data;
@@ -57,9 +56,13 @@ struct pthreadpool {
 	pthread_cond_t condvar;
 
 	/*
-	 * List of work jobs
+	 * Array of jobs
 	 */
-	struct pthreadpool_job *jobs, *last_job;
+	size_t jobs_array_len;
+	struct pthreadpool_job *jobs;
+
+	size_t head;
+	size_t num_jobs;
 
 	/*
 	 * pipe for signalling
@@ -113,9 +116,21 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
 		return ENOMEM;
 	}
 
+	pool->jobs_array_len = 4;
+	pool->jobs = calloc(
+		pool->jobs_array_len, sizeof(struct pthreadpool_job));
+
+	if (pool->jobs == NULL) {
+		free(pool);
+		return ENOMEM;
+	}
+
+	pool->head = pool->num_jobs = 0;
+
 	ret = pipe(pool->sig_pipe);
 	if (ret == -1) {
 		int err = errno;
+		free(pool->jobs);
 		free(pool);
 		return err;
 	}
@@ -124,6 +139,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
 	if (ret != 0) {
 		close(pool->sig_pipe[0]);
 		close(pool->sig_pipe[1]);
+		free(pool->jobs);
 		free(pool);
 		return ret;
 	}
@@ -133,12 +149,12 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
 		pthread_mutex_destroy(&pool->mutex);
 		close(pool->sig_pipe[0]);
 		close(pool->sig_pipe[1]);
+		free(pool->jobs);
 		free(pool);
 		return ret;
 	}
 
 	pool->shutdown = 0;
-	pool->jobs = pool->last_job = NULL;
 	pool->num_threads = 0;
 	pool->num_exited = 0;
 	pool->exited = NULL;
@@ -151,6 +167,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
 		pthread_mutex_destroy(&pool->mutex);
 		close(pool->sig_pipe[0]);
 		close(pool->sig_pipe[1]);
+		free(pool->jobs);
 		free(pool);
 		return ret;
 	}
@@ -221,14 +238,8 @@ static void pthreadpool_child(void)
 		pool->exited = NULL;
 
 		pool->num_idle = 0;
-
-		while (pool->jobs != NULL) {
-			struct pthreadpool_job *job;
-			job = pool->jobs;
-			pool->jobs = job->next;
-			free(job);
-		}
-		pool->last_job = NULL;
+		pool->head = 0;
+		pool->num_jobs = 0;
 
 		ret = pthread_mutex_unlock(&pool->mutex);
 		assert(ret == 0);
@@ -311,7 +322,7 @@ int pthreadpool_destroy(struct pthreadpool *pool)
 		return ret;
 	}
 
-	if ((pool->jobs != NULL) || pool->shutdown) {
+	if ((pool->num_jobs != 0) || pool->shutdown) {
 		ret = pthread_mutex_unlock(&pool->mutex);
 		assert(ret == 0);
 		return EBUSY;
@@ -383,6 +394,7 @@ int pthreadpool_destroy(struct pthreadpool *pool)
 	pool->sig_pipe[1] = -1;
 
 	free(pool->exited);
+	free(pool->jobs);
 	free(pool);
 
 	return 0;
@@ -410,6 +422,61 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
 	pool->num_exited += 1;
 }
 
+static bool pthreadpool_get_job(struct pthreadpool *p,
+				struct pthreadpool_job *job)
+{
+	if (p->num_jobs == 0) {
+		return false;
+	}
+	*job = p->jobs[p->head];
+	p->head = (p->head+1) % p->jobs_array_len;
+	p->num_jobs -= 1;
+	return true;
+}
+
+static bool pthreadpool_put_job(struct pthreadpool *p,
+				int id,
+				void (*fn)(void *private_data),
+				void *private_data)
+{
+	struct pthreadpool_job *job;
+
+	if (p->num_jobs == p->jobs_array_len) {
+		struct pthreadpool_job *tmp;
+		size_t new_len = p->jobs_array_len * 2;
+
+		tmp = realloc(
+			p->jobs, sizeof(struct pthreadpool_job) * new_len);
+		if (tmp == NULL) {
+			return false;
+		}
+		p->jobs = tmp;
+
+		/*
+		 * We just doubled the jobs array. The array implements a FIFO
+		 * queue with a modulo-based wraparound, so we have to memcpy
+		 * the jobs that are logically at the queue end but physically
+		 * before the queue head into the reallocated area. The new
+		 * space starts at the current jobs_array_len, and we have to
+		 * copy everything before the current head job into the new
+		 * area.
+		 */
+		memcpy(&p->jobs[p->jobs_array_len], p->jobs,
+		       sizeof(struct pthreadpool_job) * p->head);
+
+		p->jobs_array_len = new_len;
+	}
+
+	job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
+	job->id = id;
+	job->fn = fn;
+	job->private_data = private_data;
+
+	p->num_jobs += 1;
+
+	return true;
+}
+
 static void *pthreadpool_server(void *arg)
 {
 	struct pthreadpool *pool = (struct pthreadpool *)arg;
@@ -422,7 +489,7 @@ static void *pthreadpool_server(void *arg)
 
 	while (1) {
 		struct timespec ts;
-		struct pthreadpool_job *job;
+		struct pthreadpool_job job;
 
 		/*
 		 * idle-wait at most 1 second. If nothing happens in that
@@ -432,7 +499,7 @@ static void *pthreadpool_server(void *arg)
 		clock_gettime(CLOCK_REALTIME, &ts);
 		ts.tv_sec += 1;
 
-		while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
+		while ((pool->num_jobs == 0) && (pool->shutdown == 0)) {
 
 			pool->num_idle += 1;
 			res = pthread_cond_timedwait(
@@ -441,7 +508,7 @@ static void *pthreadpool_server(void *arg)
 
 			if (res == ETIMEDOUT) {
 
-				if (pool->jobs == NULL) {
+				if (pool->num_jobs == 0) {
 					/*
 					 * we timed out and still no work for
 					 * us. Exit.
@@ -456,19 +523,9 @@ static void *pthreadpool_server(void *arg)
 			assert(res == 0);
 		}
 
-		job = pool->jobs;
-
-		if (job != NULL) {
+		if (pthreadpool_get_job(pool, &job)) {
 			ssize_t written;
-
-			/*
-			 * Ok, there's work for us to do, remove the job from
-			 * the pthreadpool list
-			 */
-			pool->jobs = job->next;
-			if (pool->last_job == job) {
-				pool->last_job = NULL;
-			}
+			int sig_pipe = pool->sig_pipe[1];
 
 			/*
 			 * Do the work with the mutex unlocked
@@ -477,12 +534,8 @@ static void *pthreadpool_server(void *arg)
 			res = pthread_mutex_unlock(&pool->mutex);
 			assert(res == 0);
 
-			job->fn(job->private_data);
-
-			written = write(pool->sig_pipe[1], &job->id,
-					sizeof(int));
-
-			free(job);
+			job.fn(job.private_data);
+			written = write(sig_pipe, &job.id, sizeof(job.id));
 
 			res = pthread_mutex_lock(&pool->mutex);
 			assert(res == 0);
@@ -494,7 +547,7 @@ static void *pthreadpool_server(void *arg)
 			}
 		}
 
-		if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
+		if ((pool->num_jobs == 0) && (pool->shutdown != 0)) {
 			/*
 			 * No more work to do and we're asked to shut down, so
 			 * exit
@@ -518,24 +571,12 @@ static void *pthreadpool_server(void *arg)
 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 			void (*fn)(void *private_data), void *private_data)
 {
-	struct pthreadpool_job *job;
 	pthread_t thread_id;
 	int res;
 	sigset_t mask, omask;
 
-	job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
-	if (job == NULL) {
-		return ENOMEM;
-	}
-
-	job->fn = fn;
-	job->private_data = private_data;
-	job->id = job_id;
-	job->next = NULL;
-
 	res = pthread_mutex_lock(&pool->mutex);
 	if (res != 0) {
-		free(job);
 		return res;
 	}
 
@@ -546,7 +587,6 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 		 */
 		res = pthread_mutex_unlock(&pool->mutex);
 		assert(res == 0);
-		free(job);
 		return EINVAL;
 	}
 
@@ -558,13 +598,10 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 	/*
 	 * Add job to the end of the queue
 	 */
-	if (pool->jobs == NULL) {
-		pool->jobs = job;
-	}
-	else {
-		pool->last_job->next = job;
+	if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
+		pthread_mutex_unlock(&pool->mutex);
+		return ENOMEM;
 	}
-	pool->last_job = job;
 
 	if (pool->num_idle > 0) {
 		/*
-- 
1.7.9.5


From 0fdb3d77ddac78f3f7bbab1659afad1180749ec5 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 24 Mar 2014 10:39:56 +0000
Subject: [PATCH 3/5] pthreadpool: Allow multiple jobs to be received

This can avoid syscalls when multiple jobs are finished simultaneously

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/asys/asys.c                    |    6 +++---
 source3/lib/fncall.c                       |    2 +-
 source3/lib/pthreadpool/pthreadpool.c      |   19 ++++++++++---------
 source3/lib/pthreadpool/pthreadpool.h      |   13 ++++++++-----
 source3/lib/pthreadpool/pthreadpool_sync.c |   26 +++++++++++++++++---------
 source3/lib/pthreadpool/tests.c            |    8 ++++----
 source3/modules/vfs_aio_pthread.c          |    4 ++--
 source3/torture/bench_pthreadpool.c        |    8 ++++----
 8 files changed, 49 insertions(+), 37 deletions(-)

diff --git a/source3/lib/asys/asys.c b/source3/lib/asys/asys.c
index 9937d24..1fd7700 100644
--- a/source3/lib/asys/asys.c
+++ b/source3/lib/asys/asys.c
@@ -295,9 +295,9 @@ int asys_result(struct asys_context *ctx, ssize_t *pret, int *perrno,
 	struct asys_job *job;
 	int ret, jobid;
 
-	ret = pthreadpool_finished_job(ctx->pool, &jobid);
-	if (ret != 0) {
-		return ret;
+	ret = pthreadpool_finished_jobs(ctx->pool, &jobid, 1);
+	if (ret < 0) {
+		return -ret;
 	}
 	if ((jobid < 0) || (jobid >= ctx->num_jobs)) {
 		return EIO;
diff --git a/source3/lib/fncall.c b/source3/lib/fncall.c
index 7f728ba..88304d6 100644
--- a/source3/lib/fncall.c
+++ b/source3/lib/fncall.c
@@ -287,7 +287,7 @@ static void fncall_handler(struct tevent_context *ev, struct tevent_fd *fde,
 	int i, num_pending;
 	int job_id;
 
-	if (pthreadpool_finished_job(ctx->pool, &job_id) != 0) {
+	if (pthreadpool_finished_jobs(ctx->pool, &job_id, 1) < 0) {
 		return;
 	}
 
diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c
index d51e808..4436ab3 100644
--- a/source3/lib/pthreadpool/pthreadpool.c
+++ b/source3/lib/pthreadpool/pthreadpool.c
@@ -288,25 +288,26 @@ static void pthreadpool_join_children(struct pthreadpool *pool)
  * Fetch a finished job number from the signal pipe
  */
 
-int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid)
+int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
+			      unsigned num_jobids)
 {
-	int ret_jobid;
-	ssize_t nread;
+	ssize_t to_read, nread;
 
 	nread = -1;
 	errno = EINTR;
 
+	to_read = sizeof(int) * num_jobids;
+
 	while ((nread == -1) && (errno == EINTR)) {
-		nread = read(pool->sig_pipe[0], &ret_jobid, sizeof(int));
+		nread = read(pool->sig_pipe[0], jobids, to_read);
 	}
 	if (nread == -1) {
-		return errno;
+		return -errno;
 	}
-	if (nread != sizeof(int)) {
-		return EINVAL;
+	if ((nread % sizeof(int)) != 0) {
+		return -EINVAL;
 	}
-	*jobid = ret_jobid;
-	return 0;
+	return nread / sizeof(int);
 }
 
 /*
diff --git a/source3/lib/pthreadpool/pthreadpool.h b/source3/lib/pthreadpool/pthreadpool.h
index fac2d25..adb825a 100644
--- a/source3/lib/pthreadpool/pthreadpool.h
+++ b/source3/lib/pthreadpool/pthreadpool.h
@@ -61,7 +61,7 @@ int pthreadpool_destroy(struct pthreadpool *pool);
  *
  * This adds a job to a pthreadpool. The job can be identified by
  * job_id. This integer will be returned from
- * pthreadpool_finished_job() then the job is completed.
+ * pthreadpool_finished_jobs() when the job is completed.
  *
  * @param[in]	pool		The pool to run the job on
  * @param[in]	job_id		A custom identifier
@@ -84,15 +84,18 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 int pthreadpool_signal_fd(struct pthreadpool *pool);
 
 /**
- * @brief Get the job_id of a finished job
+ * @brief Get the job_ids of finished jobs
  *
  * This blocks until a job has finished unless the fd returned by
  * pthreadpool_signal_fd() is readable.
  *
  * @param[in]	pool		The pool to query for finished jobs
- * @param[out]  pjobid		The job_id of the finished job
- * @return			success: 0, failure: errno
+ * @param[out]  jobids		The job_ids of the finished jobs
+ * @param[in]   num_jobids      The job_ids array size
+ * @return			success: >=0, number of finished jobs
+ *                              failure: -errno
  */
-int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid);
+int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
+			      unsigned num_jobids);
 
 #endif
diff --git a/source3/lib/pthreadpool/pthreadpool_sync.c b/source3/lib/pthreadpool/pthreadpool_sync.c
index 0c2d12f..5f06cae 100644
--- a/source3/lib/pthreadpool/pthreadpool_sync.c
+++ b/source3/lib/pthreadpool/pthreadpool_sync.c
@@ -133,27 +133,35 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 
 }
 
-int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid)
+int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
+			      unsigned num_jobids)
 {
-	int ret_jobid;
-	ssize_t nread;
+	ssize_t to_read, nread;
+	int ret;
 
 	nread = -1;
 	errno = EINTR;
 
+	to_read = sizeof(int) * num_jobids;
+
 	while ((nread == -1) && (errno == EINTR)) {
-		nread = read(pool->sig_pipe[0], &ret_jobid, sizeof(int));
+		nread = read(pool->sig_pipe[0], jobids, to_read);
 	}
 	if (nread == -1) {
-		return errno;
+		return -errno;
 	}
-	if (nread != sizeof(int)) {
-		return EINVAL;
+	if ((nread % sizeof(int)) != 0) {
+		return -EINVAL;
 	}
-	*jobid = ret_jobid;
 
 	pool->pipe_busy = 0;
-	return pthreadpool_write_to_pipe(pool);
+
+	ret = pthreadpool_write_to_pipe(pool);
+	if (ret != 0) {
+		return -ret;
+	}
+
+	return nread / sizeof(int);
 }
 
 int pthreadpool_destroy(struct pthreadpool *pool)
diff --git a/source3/lib/pthreadpool/tests.c b/source3/lib/pthreadpool/tests.c
index 170cedf..8474712 100644
--- a/source3/lib/pthreadpool/tests.c
+++ b/source3/lib/pthreadpool/tests.c
@@ -71,8 +71,8 @@ static int test_jobs(int num_threads, int num_jobs)
 
 	for (i=0; i<num_jobs; i++) {
 		int jobid = -1;
-		ret = pthreadpool_finished_job(p, &jobid);
-		if ((ret != 0) || (jobid >= num_jobs)) {
+		ret = pthreadpool_finished_jobs(p, &jobid, 1);
+		if ((ret != 1) || (jobid >= num_jobs)) {
 			fprintf(stderr, "invalid job number %d\n", jobid);
 			return -1;
 		}
@@ -284,8 +284,8 @@ static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
 				continue;
 			}
 
-			ret = pthreadpool_finished_job(pools[j], &jobid);
-			if ((ret != 0) || (jobid >= num_jobs * num_threads)) {
+			ret = pthreadpool_finished_jobs(pools[j], &jobid, 1);
+			if ((ret != 1) || (jobid >= num_jobs * num_threads)) {
 				fprintf(stderr, "invalid job number %d\n",
 					jobid);
 				return -1;
diff --git a/source3/modules/vfs_aio_pthread.c b/source3/modules/vfs_aio_pthread.c
index f7756b9..de114d1 100644
--- a/source3/modules/vfs_aio_pthread.c
+++ b/source3/modules/vfs_aio_pthread.c
@@ -166,8 +166,8 @@ static void aio_open_handle_completion(struct tevent_context *event_ctx,
 		return;
 	}
 
-	ret = pthreadpool_finished_job(open_pool, &jobid);
-	if (ret) {
+	ret = pthreadpool_finished_jobs(open_pool, &jobid, 1);
+	if (ret != 1) {
 		smb_panic("aio_open_handle_completion");
 		/* notreached. */
 		return;
diff --git a/source3/torture/bench_pthreadpool.c b/source3/torture/bench_pthreadpool.c
index ee0d203..247063d 100644
--- a/source3/torture/bench_pthreadpool.c
+++ b/source3/torture/bench_pthreadpool.c
@@ -50,15 +50,15 @@ bool run_bench_pthreadpool(int dummy)
 				  strerror(ret));
 			break;
 		}
-		ret = pthreadpool_finished_job(pool, &jobid);
-		if (ret != 0) {
+		ret = pthreadpool_finished_jobs(pool, &jobid, 1);
+		if (ret < 0) {
 			d_fprintf(stderr, "pthreadpool_finished_job failed: %s\n",
-				  strerror(ret));
+				  strerror(-ret));
 			break;
 		}
 	}
 
 	pthreadpool_destroy(pool);
 
-	return (ret == 0);
+	return (ret == 1);
 }
-- 
1.7.9.5


From 351cd208d96f13dbb2999ae6e3f1c9eae7c3f10b Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 24 Mar 2014 14:36:34 +0000
Subject: [PATCH 4/5] asys: Allow multiple results to be received

This makes use of C99 dynamic arrays. In this performance-sensitive code, I
would like to avoid malloc/free, and I think 15 years after the standard we
might be able to use this feature. Alternatively, we could use the "results"
memory area and store the jobids in the upper range, playing some cast-tricks.
Should work as well.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/asys/asys.c       |   49 +++++++++++++++++++++++++----------------
 source3/lib/asys/asys.h       |   26 +++++++++++++++-------
 source3/lib/asys/tests.c      |   14 +++++-------
 source3/modules/vfs_default.c |   35 +++++++++++++----------------
 4 files changed, 69 insertions(+), 55 deletions(-)

diff --git a/source3/lib/asys/asys.c b/source3/lib/asys/asys.c
index 1fd7700..906d8cf 100644
--- a/source3/lib/asys/asys.c
+++ b/source3/lib/asys/asys.c
@@ -288,30 +288,41 @@ void asys_cancel(struct asys_context *ctx, void *private_data)
 	}
 }
 
-int asys_result(struct asys_context *ctx, ssize_t *pret, int *perrno,
-		void *pdata)
+int asys_results(struct asys_context *ctx, struct asys_result *results,
+		 unsigned num_results)
 {
-	void **pprivate_data = (void **)pdata;
-	struct asys_job *job;
-	int ret, jobid;
+	int jobids[num_results];
+	int i, ret;
 
-	ret = pthreadpool_finished_jobs(ctx->pool, &jobid, 1);
-	if (ret < 0) {
-		return -ret;
-	}
-	if ((jobid < 0) || (jobid >= ctx->num_jobs)) {
-		return EIO;
+	ret = pthreadpool_finished_jobs(ctx->pool, jobids, num_results);
+	if (ret <= 0) {
+		return ret;
 	}
 
-	job = ctx->jobs[jobid];
+	for (i=0; i<ret; i++) {
+		struct asys_result *result = &results[i];
+		struct asys_job *job;
+		int jobid;
+
+		jobid = jobids[i];
+
+		if ((jobid < 0) || (jobid >= ctx->num_jobs)) {
+			return -EIO;
+		}
+
+		job = ctx->jobs[jobid];
 
-	if (job->canceled) {
-		return ECANCELED;
+		if (job->canceled) {
+			result->ret = -1;
+			result->err = ECANCELED;
+		} else {
+			result->ret = job->ret;
+			result->err = job->err;
+		}
+		result->private_data = job->private_data;
+
+		job->busy = 0;
 	}
 
-	*pret = job->ret;
-	*perrno = job->err;
-	*pprivate_data = job->private_data;
-	job->busy = 0;
-	return 0;
+	return ret;
 }
diff --git a/source3/lib/asys/asys.h b/source3/lib/asys/asys.h
index 10805bd..7c3dfdf 100644
--- a/source3/lib/asys/asys.h
+++ b/source3/lib/asys/asys.h
@@ -104,18 +104,28 @@ void asys_set_log_fn(struct asys_context *ctx, asys_log_fn fn,
 
 int asys_signalfd(struct asys_context *ctx);
 
+struct asys_result {
+	ssize_t ret;
+	int err;
+	void *private_data;
+};
+
 /**
- * @brief Pull the result from an async operation
+ * @brief Pull the results from async operations
  *
- * Whe the fd returned from asys_signalfd() is readable, an async
- * operation has finished. The result from the async operation can be
- * pulled with asys_result().
+ * When the fd returned from asys_signalfd() is readable, one or more async
+ * operations have finished. The result from the async operations can be pulled
+ * with asys_results().
  *
- * @param[in]	ctx	The asys context
- * @return		success: 0, failure: errno
+ * @param[in]	ctx	    The asys context
+ * @param[out]  results     The result structs
+ * @param[in]   num_results The length of the results array
+ * @return		    success: >=0, number of finished jobs
+ *                          failure: -errno
  */
-int asys_result(struct asys_context *ctx, ssize_t *pret, int *perrno,
-		void *pdata);
+int asys_results(struct asys_context *ctx, struct asys_result *results,
+		 unsigned num_results);
+
 void asys_cancel(struct asys_context *ctx, void *private_data);
 
 int asys_pread(struct asys_context *ctx, int fildes, void *buf, size_t nbyte,
diff --git a/source3/lib/asys/tests.c b/source3/lib/asys/tests.c
index 354f1bf..e54e3ea 100644
--- a/source3/lib/asys/tests.c
+++ b/source3/lib/asys/tests.c
@@ -64,20 +64,18 @@ int main(int argc, const char *argv[])
 	}
 
 	for (i=0; i<ntasks; i++) {
-		void *priv;
-		ssize_t retval;
-		int err;
+		struct asys_result result;
 		int *pidx;
 
-		ret = asys_result(ctx, &retval, &err, &priv);
-		if (ret == -1) {
-			errno = ret;
+		ret = asys_results(ctx, &result, 1);
+		if (ret < 0) {
+			errno = -ret;
 			perror("asys_result failed");
 			return 1;
 		}
-		pidx = (int *)priv;
+		pidx = (int *)result.private_data;
 
-		printf("%d returned %d\n", *pidx, (int)retval);
+		printf("%d returned %d\n", *pidx, (int)result.ret);
 	}
 
 	ret = asys_context_destroy(ctx);
diff --git a/source3/modules/vfs_default.c b/source3/modules/vfs_default.c
index 7dd9c0c..02ab35b 100644
--- a/source3/modules/vfs_default.c
+++ b/source3/modules/vfs_default.c
@@ -793,44 +793,39 @@ static void vfswrap_asys_finished(struct tevent_context *ev,
 					uint16_t flags, void *p)
 {
 	struct asys_context *asys_ctx = (struct asys_context *)p;
-	struct tevent_req *req;
-	struct vfswrap_asys_state *state;
-	int res;
-	ssize_t ret;
-	int err;
-	void *private_data;
 
 	if ((flags & TEVENT_FD_READ) == 0) {
 		return;
 	}
 
 	while (true) {
-		res = asys_result(asys_ctx, &ret, &err, &private_data);
-		if (res == EINTR || res == EAGAIN) {
+		struct tevent_req *req;
+		struct vfswrap_asys_state *state;
+		struct asys_result result;
+		int res;
+
+		res = asys_results(asys_ctx, &result, 1);
+		if (res < 0) {
+			DEBUG(1, ("asys_result returned %s\n",
+				  strerror(-res)));
 			return;
 		}
-#ifdef EWOULDBLOCK
-		if (res == EWOULDBLOCK) {
-			return;
-		}
-#endif
-
-		if (res == ECANCELED) {
+		if (res == 0) {
 			return;
 		}
 
-		if (res != 0) {
-			DEBUG(1, ("asys_result returned %s\n", strerror(res)));
+		if ((result.ret == -1) && (result.err == ECANCELED)) {
 			return;
 		}
 
-		req = talloc_get_type_abort(private_data, struct tevent_req);
+		req = talloc_get_type_abort(result.private_data,
+					    struct tevent_req);
 		state = tevent_req_data(req, struct vfswrap_asys_state);
 
 		talloc_set_destructor(state, NULL);
 
-		state->ret = ret;
-		state->err = err;
+		state->ret = result.ret;
+		state->err = result.err;
 		tevent_req_defer_callback(req, ev);
 		tevent_req_done(req);
 	}
-- 
1.7.9.5


From af2e973de8aeadaa5ea6cc4a44036923830e9b2e Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Mon, 24 Mar 2014 14:53:36 +0000
Subject: [PATCH 5/5] smbd: Use asys_results

When multiple aio requests finish simultaneously, this saves a few syscalls

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/modules/vfs_default.c |   33 +++++++++++++++------------------
 1 file changed, 15 insertions(+), 18 deletions(-)

diff --git a/source3/modules/vfs_default.c b/source3/modules/vfs_default.c
index 02ab35b..673ebfe 100644
--- a/source3/modules/vfs_default.c
+++ b/source3/modules/vfs_default.c
@@ -793,39 +793,36 @@ static void vfswrap_asys_finished(struct tevent_context *ev,
 					uint16_t flags, void *p)
 {
 	struct asys_context *asys_ctx = (struct asys_context *)p;
+	struct asys_result results[outstanding_aio_calls];
+	int i, ret;
 
 	if ((flags & TEVENT_FD_READ) == 0) {
 		return;
 	}
 
-	while (true) {
+	ret = asys_results(asys_ctx, results, outstanding_aio_calls);
+	if (ret < 0) {
+		DEBUG(1, ("asys_results returned %s\n", strerror(-ret)));
+		return;
+	}
+
+	for (i=0; i<ret; i++) {
+		struct asys_result *result = &results[i];
 		struct tevent_req *req;
 		struct vfswrap_asys_state *state;
-		struct asys_result result;
-		int res;
-
-		res = asys_results(asys_ctx, &result, 1);
-		if (res < 0) {
-			DEBUG(1, ("asys_result returned %s\n",
-				  strerror(-res)));
-			return;
-		}
-		if (res == 0) {
-			return;
-		}
 
-		if ((result.ret == -1) && (result.err == ECANCELED)) {
-			return;
+		if ((result->ret == -1) && (result->err == ECANCELED)) {
+			continue;
 		}
 
-		req = talloc_get_type_abort(result.private_data,
+		req = talloc_get_type_abort(result->private_data,
 					    struct tevent_req);
 		state = tevent_req_data(req, struct vfswrap_asys_state);
 
 		talloc_set_destructor(state, NULL);
 
-		state->ret = result.ret;
-		state->err = result.err;
+		state->ret = result->ret;
+		state->err = result->err;
 		tevent_req_defer_callback(req, ev);
 		tevent_req_done(req);
 	}
-- 
1.7.9.5



More information about the samba-technical mailing list