[SCM] Samba Shared Repository - branch master updated

Jeremy Allison jra at samba.org
Thu Mar 27 01:06:04 MDT 2014


The branch, master has been updated
       via  2e2137f smbd: Use asys_results
       via  c35fec8 asys: Allow multiple results to be received
       via  c5d07df pthreadpool: Allow multiple jobs to be received
       via  84aa2dd pthreadpool: Avoid a malloc/free per job
       via  17a60b9 pthreadpool: Add a simple benchmark
      from  01c0299 auth/gensec/spnego: map SPNEGO_REJECT to NT_STATUS_LOGON_FAILURE

http://gitweb.samba.org/?p=samba.git;a=shortlog;h=master


- Log -----------------------------------------------------------------
commit 2e2137f56de2c763b874562868d9879d2ae24fee
Author: Volker Lendecke <vl at samba.org>
Date:   Mon Mar 24 14:53:36 2014 +0000

    smbd: Use asys_results
    
    When multiple aio requests finish simultaneously, this saves a few syscalls
    
    Signed-off-by: Volker Lendecke <vl at samba.org>
    Reviewed-by: Jeremy Allison <jra at samba.org>
    
    Autobuild-User(master): Jeremy Allison <jra at samba.org>
    Autobuild-Date(master): Thu Mar 27 08:05:46 CET 2014 on sn-devel-104

commit c35fec883cf344a269e65670521e2580a91c24aa
Author: Volker Lendecke <vl at samba.org>
Date:   Mon Mar 24 14:36:34 2014 +0000

    asys: Allow multiple results to be received
    
    This makes use of C99 dynamic arrays. In this performance-sensitive code, I
    would like to avoid malloc/free, and I think 15 years after the standard we
    might be able to use this feature. Alternatively, we could use the "results"
    memory area and store the jobids in the upper range, playing some cast-tricks.
    Should work as well.
    
    Signed-off-by: Volker Lendecke <vl at samba.org>
    Reviewed-by: Jeremy Allison <jra at samba.org>

commit c5d07df6abe657ff196266bbfbb376ca7db0968b
Author: Volker Lendecke <vl at samba.org>
Date:   Mon Mar 24 10:39:56 2014 +0000

    pthreadpool: Allow multiple jobs to be received
    
    This can avoid syscalls when multiple jobs are finished simultaneously
    
    Signed-off-by: Volker Lendecke <vl at samba.org>
    Reviewed-by: Jeremy Allison <jra at samba.org>

commit 84aa2ddd861549d6ec8d1ef15f4fd518e03f449b
Author: Volker Lendecke <vl at samba.org>
Date:   Fri Mar 21 17:53:26 2014 +0100

    pthreadpool: Avoid a malloc/free per job
    
    pthreadpool_add_job is in our hottest code path for r/w intensive workloads, so
    we should avoid anything CPU-intensive. pthreadpool used to malloc each job and
    free it in the worker thread. This patch adds a FIFO queue for jobs that helper
    threads copy from, avoiding constant malloc/free. This cuts user space
    CPU in the local-bench-pthreadpool benchmark by roughly 10% on my system.
    
    Signed-off-by: Volker Lendecke <vl at samba.org>
    Reviewed-by: Jeremy Allison <jra at samba.org>

commit 17a60b98db92026ae0b7136a9c8b802bf936423a
Author: Volker Lendecke <vl at samba.org>
Date:   Mon Mar 24 09:40:20 2014 +0000

    pthreadpool: Add a simple benchmark
    
    Signed-off-by: Volker Lendecke <vl at samba.org>
    Reviewed-by: Jeremy Allison <jra at samba.org>

-----------------------------------------------------------------------

Summary of changes:
 source3/lib/asys/asys.c                    |   47 +++++---
 source3/lib/asys/asys.h                    |   26 +++--
 source3/lib/asys/tests.c                   |   14 +--
 source3/lib/fncall.c                       |    2 +-
 source3/lib/pthreadpool/pthreadpool.c      |  164 +++++++++++++++++-----------
 source3/lib/pthreadpool/pthreadpool.h      |   13 ++-
 source3/lib/pthreadpool/pthreadpool_sync.c |   26 +++--
 source3/lib/pthreadpool/tests.c            |    8 +-
 source3/modules/vfs_aio_pthread.c          |    4 +-
 source3/modules/vfs_default.c              |   42 +++----
 source3/torture/bench_pthreadpool.c        |   64 +++++++++++
 source3/torture/proto.h                    |    1 +
 source3/torture/torture.c                  |    1 +
 source3/wscript_build                      |    1 +
 14 files changed, 270 insertions(+), 143 deletions(-)
 create mode 100644 source3/torture/bench_pthreadpool.c


Changeset truncated at 500 lines:

diff --git a/source3/lib/asys/asys.c b/source3/lib/asys/asys.c
index 9937d24..906d8cf 100644
--- a/source3/lib/asys/asys.c
+++ b/source3/lib/asys/asys.c
@@ -288,30 +288,41 @@ void asys_cancel(struct asys_context *ctx, void *private_data)
 	}
 }
 
-int asys_result(struct asys_context *ctx, ssize_t *pret, int *perrno,
-		void *pdata)
+int asys_results(struct asys_context *ctx, struct asys_result *results,
+		 unsigned num_results)
 {
-	void **pprivate_data = (void **)pdata;
-	struct asys_job *job;
-	int ret, jobid;
+	int jobids[num_results];
+	int i, ret;
 
-	ret = pthreadpool_finished_job(ctx->pool, &jobid);
-	if (ret != 0) {
+	ret = pthreadpool_finished_jobs(ctx->pool, jobids, num_results);
+	if (ret <= 0) {
 		return ret;
 	}
-	if ((jobid < 0) || (jobid >= ctx->num_jobs)) {
-		return EIO;
-	}
 
-	job = ctx->jobs[jobid];
+	for (i=0; i<ret; i++) {
+		struct asys_result *result = &results[i];
+		struct asys_job *job;
+		int jobid;
+
+		jobid = jobids[i];
+
+		if ((jobid < 0) || (jobid >= ctx->num_jobs)) {
+			return -EIO;
+		}
+
+		job = ctx->jobs[jobid];
 
-	if (job->canceled) {
-		return ECANCELED;
+		if (job->canceled) {
+			result->ret = -1;
+			result->err = ECANCELED;
+		} else {
+			result->ret = job->ret;
+			result->err = job->err;
+		}
+		result->private_data = job->private_data;
+
+		job->busy = 0;
 	}
 
-	*pret = job->ret;
-	*perrno = job->err;
-	*pprivate_data = job->private_data;
-	job->busy = 0;
-	return 0;
+	return ret;
 }
diff --git a/source3/lib/asys/asys.h b/source3/lib/asys/asys.h
index 10805bd..7c3dfdf 100644
--- a/source3/lib/asys/asys.h
+++ b/source3/lib/asys/asys.h
@@ -104,18 +104,28 @@ void asys_set_log_fn(struct asys_context *ctx, asys_log_fn fn,
 
 int asys_signalfd(struct asys_context *ctx);
 
+struct asys_result {
+	ssize_t ret;
+	int err;
+	void *private_data;
+};
+
 /**
- * @brief Pull the result from an async operation
+ * @brief Pull the results from async operations
  *
- * Whe the fd returned from asys_signalfd() is readable, an async
- * operation has finished. The result from the async operation can be
- * pulled with asys_result().
+ * Whe the fd returned from asys_signalfd() is readable, one or more async
+ * operations have finished. The result from the async operations can be pulled
+ * with asys_results().
  *
- * @param[in]	ctx	The asys context
- * @return		success: 0, failure: errno
+ * @param[in]	ctx	    The asys context
+ * @param[out]  results     The result strutcts
+ * @param[in]   num_results The length of the results array
+ * @return		    success: >=0, number of finished jobs
+ *                          failure: -errno
  */
-int asys_result(struct asys_context *ctx, ssize_t *pret, int *perrno,
-		void *pdata);
+int asys_results(struct asys_context *ctx, struct asys_result *results,
+		 unsigned num_results);
+
 void asys_cancel(struct asys_context *ctx, void *private_data);
 
 int asys_pread(struct asys_context *ctx, int fildes, void *buf, size_t nbyte,
diff --git a/source3/lib/asys/tests.c b/source3/lib/asys/tests.c
index 354f1bf..e54e3ea 100644
--- a/source3/lib/asys/tests.c
+++ b/source3/lib/asys/tests.c
@@ -64,20 +64,18 @@ int main(int argc, const char *argv[])
 	}
 
 	for (i=0; i<ntasks; i++) {
-		void *priv;
-		ssize_t retval;
-		int err;
+		struct asys_result result;
 		int *pidx;
 
-		ret = asys_result(ctx, &retval, &err, &priv);
-		if (ret == -1) {
-			errno = ret;
+		ret = asys_results(ctx, &result, 1);
+		if (ret < 0) {
+			errno = -ret;
 			perror("asys_result failed");
 			return 1;
 		}
-		pidx = (int *)priv;
+		pidx = (int *)result.private_data;
 
-		printf("%d returned %d\n", *pidx, (int)retval);
+		printf("%d returned %d\n", *pidx, (int)result.ret);
 	}
 
 	ret = asys_context_destroy(ctx);
diff --git a/source3/lib/fncall.c b/source3/lib/fncall.c
index 7f728ba..88304d6 100644
--- a/source3/lib/fncall.c
+++ b/source3/lib/fncall.c
@@ -287,7 +287,7 @@ static void fncall_handler(struct tevent_context *ev, struct tevent_fd *fde,
 	int i, num_pending;
 	int job_id;
 
-	if (pthreadpool_finished_job(ctx->pool, &job_id) != 0) {
+	if (pthreadpool_finished_jobs(ctx->pool, &job_id, 1) < 0) {
 		return;
 	}
 
diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c
index 654d420..4436ab3 100644
--- a/source3/lib/pthreadpool/pthreadpool.c
+++ b/source3/lib/pthreadpool/pthreadpool.c
@@ -34,7 +34,6 @@
 #include "lib/util/dlinklist.h"
 
 struct pthreadpool_job {
-	struct pthreadpool_job *next;
 	int id;
 	void (*fn)(void *private_data);
 	void *private_data;
@@ -57,9 +56,13 @@ struct pthreadpool {
 	pthread_cond_t condvar;
 
 	/*
-	 * List of work jobs
+	 * Array of jobs
 	 */
-	struct pthreadpool_job *jobs, *last_job;
+	size_t jobs_array_len;
+	struct pthreadpool_job *jobs;
+
+	size_t head;
+	size_t num_jobs;
 
 	/*
 	 * pipe for signalling
@@ -113,9 +116,21 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
 		return ENOMEM;
 	}
 
+	pool->jobs_array_len = 4;
+	pool->jobs = calloc(
+		pool->jobs_array_len, sizeof(struct pthreadpool_job));
+
+	if (pool->jobs == NULL) {
+		free(pool);
+		return ENOMEM;
+	}
+
+	pool->head = pool->num_jobs = 0;
+
 	ret = pipe(pool->sig_pipe);
 	if (ret == -1) {
 		int err = errno;
+		free(pool->jobs);
 		free(pool);
 		return err;
 	}
@@ -124,6 +139,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
 	if (ret != 0) {
 		close(pool->sig_pipe[0]);
 		close(pool->sig_pipe[1]);
+		free(pool->jobs);
 		free(pool);
 		return ret;
 	}
@@ -133,12 +149,12 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
 		pthread_mutex_destroy(&pool->mutex);
 		close(pool->sig_pipe[0]);
 		close(pool->sig_pipe[1]);
+		free(pool->jobs);
 		free(pool);
 		return ret;
 	}
 
 	pool->shutdown = 0;
-	pool->jobs = pool->last_job = NULL;
 	pool->num_threads = 0;
 	pool->num_exited = 0;
 	pool->exited = NULL;
@@ -151,6 +167,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
 		pthread_mutex_destroy(&pool->mutex);
 		close(pool->sig_pipe[0]);
 		close(pool->sig_pipe[1]);
+		free(pool->jobs);
 		free(pool);
 		return ret;
 	}
@@ -221,14 +238,8 @@ static void pthreadpool_child(void)
 		pool->exited = NULL;
 
 		pool->num_idle = 0;
-
-		while (pool->jobs != NULL) {
-			struct pthreadpool_job *job;
-			job = pool->jobs;
-			pool->jobs = job->next;
-			free(job);
-		}
-		pool->last_job = NULL;
+		pool->head = 0;
+		pool->num_jobs = 0;
 
 		ret = pthread_mutex_unlock(&pool->mutex);
 		assert(ret == 0);
@@ -277,25 +288,26 @@ static void pthreadpool_join_children(struct pthreadpool *pool)
  * Fetch a finished job number from the signal pipe
  */
 
-int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid)
+int pthreadpool_finished_jobs(struct pthreadpool *pool, int *jobids,
+			      unsigned num_jobids)
 {
-	int ret_jobid;
-	ssize_t nread;
+	ssize_t to_read, nread;
 
 	nread = -1;
 	errno = EINTR;
 
+	to_read = sizeof(int) * num_jobids;
+
 	while ((nread == -1) && (errno == EINTR)) {
-		nread = read(pool->sig_pipe[0], &ret_jobid, sizeof(int));
+		nread = read(pool->sig_pipe[0], jobids, to_read);
 	}
 	if (nread == -1) {
-		return errno;
+		return -errno;
 	}
-	if (nread != sizeof(int)) {
-		return EINVAL;
+	if ((nread % sizeof(int)) != 0) {
+		return -EINVAL;
 	}
-	*jobid = ret_jobid;
-	return 0;
+	return nread / sizeof(int);
 }
 
 /*
@@ -311,7 +323,7 @@ int pthreadpool_destroy(struct pthreadpool *pool)
 		return ret;
 	}
 
-	if ((pool->jobs != NULL) || pool->shutdown) {
+	if ((pool->num_jobs != 0) || pool->shutdown) {
 		ret = pthread_mutex_unlock(&pool->mutex);
 		assert(ret == 0);
 		return EBUSY;
@@ -383,6 +395,7 @@ int pthreadpool_destroy(struct pthreadpool *pool)
 	pool->sig_pipe[1] = -1;
 
 	free(pool->exited);
+	free(pool->jobs);
 	free(pool);
 
 	return 0;
@@ -410,6 +423,61 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
 	pool->num_exited += 1;
 }
 
+static bool pthreadpool_get_job(struct pthreadpool *p,
+				struct pthreadpool_job *job)
+{
+	if (p->num_jobs == 0) {
+		return false;
+	}
+	*job = p->jobs[p->head];
+	p->head = (p->head+1) % p->jobs_array_len;
+	p->num_jobs -= 1;
+	return true;
+}
+
+static bool pthreadpool_put_job(struct pthreadpool *p,
+				int id,
+				void (*fn)(void *private_data),
+				void *private_data)
+{
+	struct pthreadpool_job *job;
+
+	if (p->num_jobs == p->jobs_array_len) {
+		struct pthreadpool_job *tmp;
+		size_t new_len = p->jobs_array_len * 2;
+
+		tmp = realloc(
+			p->jobs, sizeof(struct pthreadpool_job) * new_len);
+		if (tmp == NULL) {
+			return false;
+		}
+		p->jobs = tmp;
+
+		/*
+		 * We just doubled the jobs array. The array implements a FIFO
+		 * queue with a modulo-based wraparound, so we have to memcpy
+		 * the jobs that are logically at the queue end but physically
+		 * before the queue head into the reallocated area. The new
+		 * space starts at the current jobs_array_len, and we have to
+		 * copy everything before the current head job into the new
+		 * area.
+		 */
+		memcpy(&p->jobs[p->jobs_array_len], p->jobs,
+		       sizeof(struct pthreadpool_job) * p->head);
+
+		p->jobs_array_len = new_len;
+	}
+
+	job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
+	job->id = id;
+	job->fn = fn;
+	job->private_data = private_data;
+
+	p->num_jobs += 1;
+
+	return true;
+}
+
 static void *pthreadpool_server(void *arg)
 {
 	struct pthreadpool *pool = (struct pthreadpool *)arg;
@@ -422,7 +490,7 @@ static void *pthreadpool_server(void *arg)
 
 	while (1) {
 		struct timespec ts;
-		struct pthreadpool_job *job;
+		struct pthreadpool_job job;
 
 		/*
 		 * idle-wait at most 1 second. If nothing happens in that
@@ -432,7 +500,7 @@ static void *pthreadpool_server(void *arg)
 		clock_gettime(CLOCK_REALTIME, &ts);
 		ts.tv_sec += 1;
 
-		while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
+		while ((pool->num_jobs == 0) && (pool->shutdown == 0)) {
 
 			pool->num_idle += 1;
 			res = pthread_cond_timedwait(
@@ -441,7 +509,7 @@ static void *pthreadpool_server(void *arg)
 
 			if (res == ETIMEDOUT) {
 
-				if (pool->jobs == NULL) {
+				if (pool->num_jobs == 0) {
 					/*
 					 * we timed out and still no work for
 					 * us. Exit.
@@ -456,19 +524,9 @@ static void *pthreadpool_server(void *arg)
 			assert(res == 0);
 		}
 
-		job = pool->jobs;
-
-		if (job != NULL) {
+		if (pthreadpool_get_job(pool, &job)) {
 			ssize_t written;
-
-			/*
-			 * Ok, there's work for us to do, remove the job from
-			 * the pthreadpool list
-			 */
-			pool->jobs = job->next;
-			if (pool->last_job == job) {
-				pool->last_job = NULL;
-			}
+			int sig_pipe = pool->sig_pipe[1];
 
 			/*
 			 * Do the work with the mutex unlocked
@@ -477,12 +535,8 @@ static void *pthreadpool_server(void *arg)
 			res = pthread_mutex_unlock(&pool->mutex);
 			assert(res == 0);
 
-			job->fn(job->private_data);
-
-			written = write(pool->sig_pipe[1], &job->id,
-					sizeof(int));
-
-			free(job);
+			job.fn(job.private_data);
+			written = write(sig_pipe, &job.id, sizeof(job.id));
 
 			res = pthread_mutex_lock(&pool->mutex);
 			assert(res == 0);
@@ -494,7 +548,7 @@ static void *pthreadpool_server(void *arg)
 			}
 		}
 
-		if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
+		if ((pool->num_jobs == 0) && (pool->shutdown != 0)) {
 			/*
 			 * No more work to do and we're asked to shut down, so
 			 * exit
@@ -518,24 +572,12 @@ static void *pthreadpool_server(void *arg)
 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 			void (*fn)(void *private_data), void *private_data)
 {
-	struct pthreadpool_job *job;
 	pthread_t thread_id;
 	int res;
 	sigset_t mask, omask;
 
-	job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
-	if (job == NULL) {
-		return ENOMEM;
-	}
-
-	job->fn = fn;
-	job->private_data = private_data;
-	job->id = job_id;
-	job->next = NULL;
-
 	res = pthread_mutex_lock(&pool->mutex);
 	if (res != 0) {
-		free(job);
 		return res;
 	}
 
@@ -546,7 +588,6 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 		 */
 		res = pthread_mutex_unlock(&pool->mutex);
 		assert(res == 0);
-		free(job);
 		return EINVAL;
 	}
 
@@ -558,13 +599,10 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 	/*
 	 * Add job to the end of the queue
 	 */
-	if (pool->jobs == NULL) {
-		pool->jobs = job;
-	}
-	else {
-		pool->last_job->next = job;
+	if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
+		pthread_mutex_unlock(&pool->mutex);
+		return ENOMEM;
 	}
-	pool->last_job = job;
 
 	if (pool->num_idle > 0) {
 		/*
diff --git a/source3/lib/pthreadpool/pthreadpool.h b/source3/lib/pthreadpool/pthreadpool.h
index fac2d25..adb825a 100644
--- a/source3/lib/pthreadpool/pthreadpool.h
+++ b/source3/lib/pthreadpool/pthreadpool.h
@@ -61,7 +61,7 @@ int pthreadpool_destroy(struct pthreadpool *pool);
  *
  * This adds a job to a pthreadpool. The job can be identified by
  * job_id. This integer will be returned from
- * pthreadpool_finished_job() then the job is completed.
+ * pthreadpool_finished_jobs() then the job is completed.
  *
  * @param[in]	pool		The pool to run the job on
  * @param[in]	job_id		A custom identifier
@@ -84,15 +84,18 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 int pthreadpool_signal_fd(struct pthreadpool *pool);
 


-- 
Samba Shared Repository


More information about the samba-cvs mailing list