[Patches] s3:vfs_aio_pthread: make use of pthreadpool_tevent instead of pthreadpool_pipe

Stefan Metzmacher metze at samba.org
Fri Apr 20 09:54:32 UTC 2018


Hi,

here are patches that switch to pthreadpool_tevent_job_send/recv() (a
much simpler API) instead of pthreadpool_pipe.

This means we now have just one thread pool, shared by all async
pread(), pwrite(), fsync() and openat() calls, instead of an extra pool
for openat() with the same possible number of threads.
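
Roughly, the per-job pattern looks like this (a sketch only, using the
Samba-internal lib/pthreadpool/pthreadpool_tevent.h API; my_worker,
my_done and state are placeholders for aio_open_worker,
aio_open_handle_completion and the opd structure in patch 4/4):

  /* In the submitting code (runs in the main thread):
   * schedule a job on the shared pool; my_worker() runs in a
   * worker thread, my_done() fires later in the main tevent loop. */
  subreq = pthreadpool_tevent_job_send(state,         /* talloc parent */
                                       sconn->ev_ctx, /* tevent context */
                                       sconn->pool,   /* shared thread pool */
                                       my_worker,     /* void (*)(void *) */
                                       state);        /* passed to my_worker() */
  if (subreq == NULL) {
          return -1;
  }
  tevent_req_set_callback(subreq, my_done, state);

  /* Completion callback, invoked from the main event loop: */
  static void my_done(struct tevent_req *subreq)
  {
          int ret = pthreadpool_tevent_job_recv(subreq);
          TALLOC_FREE(subreq);
          if (ret != 0) {
                  /* errno-style failure: the job could not be run */
          }
  }

No signal fd, no jobid bookkeeping and no completion-pipe handling are
needed, which is what makes the pthreadpool_pipe removal in patch 4/4
possible.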

This comes with a manpage update and a test (the test failed when an
smb_panic() was added just before the openat() call in
aio_open_worker(), both with and without the main change, so it really
does exercise the async open path).

Please review and push:-)

Thanks!
metze
-------------- next part --------------
From b616bc9fe611601ded84cdff785dcaeb9d241ccc Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Fri, 9 Mar 2018 14:59:31 +0100
Subject: [PATCH 1/4] s3:smbd: call pthreadpool_tevent_init() already in
 smbd_process()

pthreadpool_tevent_init() doesn't start any threads yet; it only
allocates a bit of memory.

It's easier to set this up in one central place, so that the pool is
available to all VFS modules.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 source3/modules/vfs_default.c | 31 -------------------------------
 source3/smbd/process.c        |  7 +++++++
 2 files changed, 7 insertions(+), 31 deletions(-)

diff --git a/source3/modules/vfs_default.c b/source3/modules/vfs_default.c
index a9c87e4..e2efdab 100644
--- a/source3/modules/vfs_default.c
+++ b/source3/modules/vfs_default.c
@@ -717,19 +717,6 @@ static ssize_t vfswrap_pwrite(vfs_handle_struct *handle, files_struct *fsp, cons
 	return result;
 }
 
-static int vfswrap_init_pool(struct smbd_server_connection *conn)
-{
-	int ret;
-
-	if (conn->pool != NULL) {
-		return 0;
-	}
-
-	ret = pthreadpool_tevent_init(conn, lp_aio_max_threads(),
-				      &conn->pool);
-	return ret;
-}
-
 struct vfswrap_pread_state {
 	ssize_t ret;
 	int fd;
@@ -754,18 +741,12 @@ static struct tevent_req *vfswrap_pread_send(struct vfs_handle_struct *handle,
 {
 	struct tevent_req *req, *subreq;
 	struct vfswrap_pread_state *state;
-	int ret;
 
 	req = tevent_req_create(mem_ctx, &state, struct vfswrap_pread_state);
 	if (req == NULL) {
 		return NULL;
 	}
 
-	ret = vfswrap_init_pool(handle->conn->sconn);
-	if (tevent_req_error(req, ret)) {
-		return tevent_req_post(req, ev);
-	}
-
 	state->ret = -1;
 	state->fd = fsp->fh->fd;
 	state->buf = data;
@@ -878,18 +859,12 @@ static struct tevent_req *vfswrap_pwrite_send(struct vfs_handle_struct *handle,
 {
 	struct tevent_req *req, *subreq;
 	struct vfswrap_pwrite_state *state;
-	int ret;
 
 	req = tevent_req_create(mem_ctx, &state, struct vfswrap_pwrite_state);
 	if (req == NULL) {
 		return NULL;
 	}
 
-	ret = vfswrap_init_pool(handle->conn->sconn);
-	if (tevent_req_error(req, ret)) {
-		return tevent_req_post(req, ev);
-	}
-
 	state->ret = -1;
 	state->fd = fsp->fh->fd;
 	state->buf = data;
@@ -997,18 +972,12 @@ static struct tevent_req *vfswrap_fsync_send(struct vfs_handle_struct *handle,
 {
 	struct tevent_req *req, *subreq;
 	struct vfswrap_fsync_state *state;
-	int ret;
 
 	req = tevent_req_create(mem_ctx, &state, struct vfswrap_fsync_state);
 	if (req == NULL) {
 		return NULL;
 	}
 
-	ret = vfswrap_init_pool(handle->conn->sconn);
-	if (tevent_req_error(req, ret)) {
-		return tevent_req_post(req, ev);
-	}
-
 	state->ret = -1;
 	state->fd = fsp->fh->fd;
 
diff --git a/source3/smbd/process.c b/source3/smbd/process.c
index f992e65..516ac4c 100644
--- a/source3/smbd/process.c
+++ b/source3/smbd/process.c
@@ -42,6 +42,7 @@
 #include "lib/id_cache.h"
 #include "lib/util/sys_rw_data.h"
 #include "system/threads.h"
+#include "lib/pthreadpool/pthreadpool_tevent.h"
 
 /* Internal message queue for deferred opens. */
 struct pending_message_list {
@@ -3927,6 +3928,12 @@ void smbd_process(struct tevent_context *ev_ctx,
 	sconn->ev_ctx = ev_ctx;
 	sconn->msg_ctx = msg_ctx;
 
+	ret = pthreadpool_tevent_init(sconn, lp_aio_max_threads(),
+				      &sconn->pool);
+	if (ret != 0) {
+		exit_server("pthreadpool_tevent_init() failed.");
+	}
+
 	if (lp_server_max_protocol() >= PROTOCOL_SMB2_02) {
 		/*
 		 * We're not making the decision here,
-- 
1.9.1


From b68d372a1b21580aab5796bf26e5297612941953 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Fri, 20 Apr 2018 11:04:20 +0200
Subject: [PATCH 2/4] docs-xml: rewrite the vfs_aio_pthread manpage to reflect
 the >= 4.0.0 behavior

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 docs-xml/manpages/vfs_aio_pthread.8.xml | 51 +++++++++------------------------
 1 file changed, 14 insertions(+), 37 deletions(-)

diff --git a/docs-xml/manpages/vfs_aio_pthread.8.xml b/docs-xml/manpages/vfs_aio_pthread.8.xml
index 69a8346..c6f291b 100644
--- a/docs-xml/manpages/vfs_aio_pthread.8.xml
+++ b/docs-xml/manpages/vfs_aio_pthread.8.xml
@@ -13,7 +13,7 @@
 
 <refnamediv>
 	<refname>vfs_aio_pthread</refname>
-	<refpurpose>implement async I/O in Samba vfs using a pthread pool</refpurpose>
+	<refpurpose>implement async open in Samba vfs using a pthread pool</refpurpose>
 </refnamediv>
 
 <refsynopsisdiv>
@@ -30,40 +30,18 @@
 	<manvolnum>7</manvolnum></citerefentry> suite.</para>
 
 	<para>The <command>aio_pthread</command> VFS module enables asynchronous
-	I/O for Samba on platforms which have the pthreads API available,
-	without using the Posix AIO interface. Posix AIO can suffer from severe
-	limitations.  For example, on some Linux versions the
-	real-time signals that it uses are broken under heavy load.
-	Other systems only allow AIO when special kernel modules are
-	loaded or only allow a certain system-wide amount of async
-	requests being scheduled. Systems based on glibc (most Linux
-	systems) only allow a single outstanding request per file
-	descriptor which essentially makes Posix AIO useless on systems
-	using the glibc implementation.</para>
-
-	<para>To work around all these limitations, the aio_pthread module
-	was written. It uses a pthread pool instead of the
-	internal Posix AIO interface to allow read and write calls
-	to be process asynchronously. A pthread pool is created
-	which expands dynamically by creating new threads as work is
-	given to it to a maximum of 100 threads per smbd process.
-	To change this limit see the "aio num threads" parameter
-	below. New threads are not created if idle threads are
-	available when a new read or write request is received,
-	the new work is given to an existing idle thread. Threads
-	terminate themselves if idle for one second.
+	opens (for new files) with <smbconfoption name="aio_pthread:aio open">yes</smbconfoption>
+	on platforms which have the pthreads API available,
+	support the openat() syscall and support per thread credentials (modern Linux kernels).
 	</para>
 
-	<para>
-	Note that the smb.conf parameters <command>aio read size</command>
-	and <command>aio write size</command> must also be set appropriately
-	for this module to be active.
-	</para>
+	<para>The module makes use of the global thread pool which uses the
+	<smbconfoption name="aio max threads"/> option.</para>
 
 	<para>This module MUST be listed last in any module stack as
-	the Samba VFS pread/pwrite interface is not thread-safe. This
-	module makes direct pread and pwrite system calls and does
-	NOT call the Samba VFS pread and pwrite interfaces.</para>
+	the Samba VFS open interface is not thread-safe. This
+	module makes direct openat() system calls and does
+	NOT call the Samba VFS open interfaces.</para>
 
 </refsect1>
 
@@ -76,9 +54,8 @@
 <programlisting>
         <smbconfsection name="[cooldata]"/>
 	<smbconfoption name="path">/data/ice</smbconfoption>
-	<smbconfoption name="aio read size">1024</smbconfoption>
-	<smbconfoption name="aio write size">1024</smbconfoption>
 	<smbconfoption name="vfs objects">aio_pthread</smbconfoption>
+	<smbconfoption name="aio_pthread:aio open">yes</smbconfoption>
 </programlisting>
 
 </refsect1>
@@ -89,17 +66,17 @@
 	<variablelist>
 
 		<varlistentry>
-		<term>aio_pthread:aio num threads = INTEGER</term>
+		<term>aio_pthread:aio open = BOOL</term>
 		<listitem>
-		<para>Limit the maximum number of threads per smbd that
-		will be created in the thread pool to service IO requests.
+		<para>Try async opens for creating new files.
 		</para>
-		<para>By default this is set to 100.</para>
+		<para>The default is 'no'.</para>
 		</listitem>
 		</varlistentry>
 
 	</variablelist>
 </refsect1>
+
 <refsect1>
 	<title>VERSION</title>
 
-- 
1.9.1


From c700321cec898971e6873c9a5055af74900480ea Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Fri, 20 Apr 2018 11:27:30 +0200
Subject: [PATCH 3/4] selftest: add some basic testing for aio_pthread

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 selftest/target/Samba3.pm | 6 ++++++
 source3/selftest/tests.py | 3 ++-
 2 files changed, 8 insertions(+), 1 deletion(-)

diff --git a/selftest/target/Samba3.pm b/selftest/target/Samba3.pm
index b93cbc2..6c86701 100755
--- a/selftest/target/Samba3.pm
+++ b/selftest/target/Samba3.pm
@@ -770,6 +770,12 @@ sub setup_simpleserver
 	full_audit:success = none
 	full_audit:failure = none
 
+[vfs_aio_pthread]
+	path = $prefix_abs/share
+	read only = no
+	vfs objects = aio_pthread
+	aio_pthread:aio open = yes
+
 [vfs_aio_fork]
 	path = $prefix_abs/share
         vfs objects = aio_fork
diff --git a/source3/selftest/tests.py b/source3/selftest/tests.py
index a22d5e6..06bda70 100755
--- a/source3/selftest/tests.py
+++ b/source3/selftest/tests.py
@@ -112,8 +112,9 @@ tests = ["MANGLE-ILLEGAL"]
 for t in tests:
     plantestsuite("samba3.smbtorture_s3.plain(%s).%s" % (env, t), env, [os.path.join(samba3srcdir, "script/tests/test_smbtorture_s3.sh"), t, '//$SERVER_IP/mangle_illegal', '$USERNAME', '$PASSWORD', smbtorture3, "", "-l $LOCAL_PATH"])
 
-tests = ["RW1", "RW2", "RW3"]
+tests = ["RW1", "RW2", "RW3", "SMB2-BASIC"]
 for t in tests:
+    plantestsuite("samba3.smbtorture_s3.vfs_aio_pthread(simpleserver).%s" % t, "simpleserver", [os.path.join(samba3srcdir, "script/tests/test_smbtorture_s3.sh"), t, '//$SERVER_IP/vfs_aio_pthread', '$USERNAME', '$PASSWORD', smbtorture3, "", "-l $LOCAL_PATH"])
     plantestsuite("samba3.smbtorture_s3.vfs_aio_fork(simpleserver).%s" % t, "simpleserver", [os.path.join(samba3srcdir, "script/tests/test_smbtorture_s3.sh"), t, '//$SERVER_IP/vfs_aio_fork', '$USERNAME', '$PASSWORD', smbtorture3, "", "-l $LOCAL_PATH"])
 
 posix_tests = ["POSIX", "POSIX-APPEND", "POSIX-SYMLINK-ACL", "POSIX-SYMLINK-EA", "POSIX-OFD-LOCK",
-- 
1.9.1


From de34ef8a25d5c374c1159b3c8664b9ac5ec75fcf Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Fri, 9 Mar 2018 15:02:04 +0100
Subject: [PATCH 4/4] s3:vfs_aio_pthread: make use of pthreadpool_tevent
 instead of pthreadpool_pipe

pthreadpool_tevent provides a much simpler API and avoids the extra
pipe used for completion notification.

This means we now have just one thread pool, shared by all async
pread(), pwrite(), fsync() and openat() calls, instead of an extra pool
for openat() with the same possible number of threads.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 source3/modules/vfs_aio_pthread.c | 134 ++++++--------------------------------
 1 file changed, 19 insertions(+), 115 deletions(-)

diff --git a/source3/modules/vfs_aio_pthread.c b/source3/modules/vfs_aio_pthread.c
index d78dd11..88794bb 100644
--- a/source3/modules/vfs_aio_pthread.c
+++ b/source3/modules/vfs_aio_pthread.c
@@ -26,54 +26,13 @@
 #include "system/shmem.h"
 #include "smbd/smbd.h"
 #include "smbd/globals.h"
-#include "../lib/pthreadpool/pthreadpool_pipe.h"
+#include "../lib/pthreadpool/pthreadpool_tevent.h"
 #ifdef HAVE_LINUX_FALLOC_H
 #include <linux/falloc.h>
 #endif
 
 #if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
 
-/************************************************************************
- Ensure thread pool is initialized.
-***********************************************************************/
-
-static bool init_aio_threadpool(struct tevent_context *ev_ctx,
-				struct pthreadpool_pipe **pp_pool,
-				void (*completion_fn)(struct tevent_context *,
-						struct tevent_fd *,
-						uint16_t,
-						void *))
-{
-	struct tevent_fd *sock_event = NULL;
-	int ret = 0;
-
-	if (*pp_pool) {
-		return true;
-	}
-
-	ret = pthreadpool_pipe_init(lp_aio_max_threads(), pp_pool);
-	if (ret) {
-		errno = ret;
-		return false;
-	}
-	sock_event = tevent_add_fd(ev_ctx,
-				NULL,
-				pthreadpool_pipe_signal_fd(*pp_pool),
-				TEVENT_FD_READ,
-				completion_fn,
-				NULL);
-	if (sock_event == NULL) {
-		pthreadpool_pipe_destroy(*pp_pool);
-		*pp_pool = NULL;
-		return false;
-	}
-
-	DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
-		  (int)lp_aio_max_threads()));
-
-	return true;
-}
-
 /*
  * We must have openat() to do any thread-based
  * asynchronous opens. We also must be using
@@ -81,19 +40,9 @@ static bool init_aio_threadpool(struct tevent_context *ev_ctx,
  * for now).
  */
 
-/*
- * NB. This threadpool is shared over all
- * instances of this VFS module in this
- * process, as is the current jobid.
- */
-
-static struct pthreadpool_pipe *open_pool;
-static int aio_pthread_open_jobid;
-
 struct aio_open_private_data {
 	struct aio_open_private_data *prev, *next;
 	/* Inputs. */
-	int jobid;
 	int dir_fd;
 	int flags;
 	mode_t mode;
@@ -113,23 +62,6 @@ struct aio_open_private_data {
 static struct aio_open_private_data *open_pd_list;
 
 /************************************************************************
- Find the open private data by jobid.
-***********************************************************************/
-
-static struct aio_open_private_data *find_open_private_data_by_jobid(int jobid)
-{
-	struct aio_open_private_data *opd;
-
-	for (opd = open_pd_list; opd != NULL; opd = opd->next) {
-		if (opd->jobid == jobid) {
-			return opd;
-		}
-	}
-
-	return NULL;
-}
-
-/************************************************************************
  Find the open private data by mid.
 ***********************************************************************/
 
@@ -150,42 +82,24 @@ static struct aio_open_private_data *find_open_private_data_by_mid(uint64_t mid)
  Callback when an open completes.
 ***********************************************************************/
 
-static void aio_open_handle_completion(struct tevent_context *event_ctx,
-				struct tevent_fd *event,
-				uint16_t flags,
-				void *p)
+static void aio_open_handle_completion(struct tevent_req *subreq)
 {
-	struct aio_open_private_data *opd = NULL;
-	int jobid = 0;
+	struct aio_open_private_data *opd =
+		tevent_req_callback_data(subreq,
+		struct aio_open_private_data);
 	int ret;
 	struct smbXsrv_connection *xconn;
 
-	DEBUG(10, ("aio_open_handle_completion called with flags=%d\n",
-		(int)flags));
-
-	if ((flags & TEVENT_FD_READ) == 0) {
-		return;
-	}
-
-	ret = pthreadpool_pipe_finished_jobs(open_pool, &jobid, 1);
-	if (ret != 1) {
+	ret = pthreadpool_tevent_job_recv(subreq);
+	TALLOC_FREE(subreq);
+	if (ret != 0) {
 		smb_panic("aio_open_handle_completion");
 		/* notreached. */
 		return;
 	}
 
-	opd = find_open_private_data_by_jobid(jobid);
-	if (opd == NULL) {
-		DEBUG(0, ("aio_open_handle_completion cannot find jobid %d\n",
-			jobid));
-		smb_panic("aio_open_handle_completion - no jobid");
-		/* notreached. */
-		return;
-	}
-
-	DEBUG(10,("aio_open_handle_completion: jobid %d mid %llu "
+	DEBUG(10,("aio_open_handle_completion: mid %llu "
 		"for file %s/%s completed\n",
-		jobid,
 		(unsigned long long)opd->mid,
 		opd->dname,
 		opd->fname));
@@ -295,7 +209,6 @@ static struct aio_open_private_data *create_private_open_data(const files_struct
 		return NULL;
 	}
 
-	opd->jobid = aio_pthread_open_jobid++;
 	opd->dir_fd = -1;
 	opd->ret_fd = -1;
 	opd->ret_errno = EINPROGRESS;
@@ -354,13 +267,7 @@ static int open_async(const files_struct *fsp,
 			mode_t mode)
 {
 	struct aio_open_private_data *opd = NULL;
-	int ret;
-
-	if (!init_aio_threadpool(fsp->conn->sconn->ev_ctx,
-			&open_pool,
-			aio_open_handle_completion)) {
-		return -1;
-	}
+	struct tevent_req *subreq = NULL;
 
 	opd = create_private_open_data(fsp, flags, mode);
 	if (opd == NULL) {
@@ -368,18 +275,17 @@ static int open_async(const files_struct *fsp,
 		return -1;
 	}
 
-	ret = pthreadpool_pipe_add_job(open_pool,
-				       opd->jobid,
-				       aio_open_worker,
-				       (void *)opd);
-	if (ret) {
-		errno = ret;
+	subreq = pthreadpool_tevent_job_send(opd,
+					     fsp->conn->sconn->ev_ctx,
+					     fsp->conn->sconn->pool,
+					     aio_open_worker, opd);
+	if (subreq == NULL) {
 		return -1;
 	}
+	tevent_req_set_callback(subreq, aio_open_handle_completion, opd);
 
-	DEBUG(5,("open_async: mid %llu jobid %d created for file %s/%s\n",
+	DEBUG(5,("open_async: mid %llu created for file %s/%s\n",
 		(unsigned long long)opd->mid,
-		opd->jobid,
 		opd->dname,
 		opd->fname));
 
@@ -406,10 +312,9 @@ static bool find_completed_open(files_struct *fsp,
 
 	if (opd->in_progress) {
 		DEBUG(0,("find_completed_open: mid %llu "
-			"jobid %d still in progress for "
+			"still in progress for "
 			"file %s/%s. PANIC !\n",
 			(unsigned long long)opd->mid,
-			opd->jobid,
 			opd->dname,
 			opd->fname));
 		/* Disaster ! This is an open timeout. Just panic. */
@@ -423,12 +328,11 @@ static bool find_completed_open(files_struct *fsp,
 
 	DEBUG(5,("find_completed_open: mid %llu returning "
 		"fd = %d, errno = %d (%s) "
-		"jobid (%d) for file %s\n",
+		"for file %s\n",
 		(unsigned long long)opd->mid,
 		opd->ret_fd,
 		opd->ret_errno,
 		strerror(opd->ret_errno),
-		opd->jobid,
 		smb_fname_str_dbg(fsp->fsp_name)));
 
 	/* Now we can free the opd. */
-- 
1.9.1
