[SCM] Samba Shared Repository - branch master updated

Jeremy Allison jra at samba.org
Tue Apr 10 18:30:03 MDT 2012


The branch, master has been updated
       via  fe707f6 Add a new module, aio_linux which implements Linux kernel aio support. Docs to follow.
      from  224379b pidl/NDR/Parser: also do range checks on the array size

http://gitweb.samba.org/?p=samba.git;a=shortlog;h=master


- Log -----------------------------------------------------------------
commit fe707f6549292ccb681ccd0c596cbd17525522f3
Author: Jeremy Allison <jra at samba.org>
Date:   Tue Apr 10 15:45:55 2012 -0700

    Add a new module, aio_linux which implements Linux kernel aio support. Docs to follow.
    
    Autobuild-User: Jeremy Allison <jra at samba.org>
    Autobuild-Date: Wed Apr 11 02:29:04 CEST 2012 on sn-devel-104

-----------------------------------------------------------------------

Summary of changes:
 source3/Makefile.in                                |    5 +
 source3/configure.in                               |   15 +
 .../modules/{vfs_aio_pthread.c => vfs_aio_linux.c} |  516 ++++++++++++--------
 source3/modules/wscript_build                      |   10 +
 source3/wscript                                    |   26 +
 5 files changed, 365 insertions(+), 207 deletions(-)
 copy source3/modules/{vfs_aio_pthread.c => vfs_aio_linux.c} (53%)


Changeset truncated at 500 lines:

diff --git a/source3/Makefile.in b/source3/Makefile.in
index e1d8770..ff223d9 100644
--- a/source3/Makefile.in
+++ b/source3/Makefile.in
@@ -873,6 +873,7 @@ VFS_TSMSM_OBJ = modules/vfs_tsmsm.o
 VFS_FILEID_OBJ = modules/vfs_fileid.o
 VFS_AIO_FORK_OBJ = modules/vfs_aio_fork.o
 VFS_AIO_PTHREAD_OBJ = modules/vfs_aio_pthread.o
+VFS_AIO_LINUX_OBJ = modules/vfs_aio_linux.o
 VFS_PREOPEN_OBJ = modules/vfs_preopen.o
 VFS_SYNCOPS_OBJ = modules/vfs_syncops.o
 VFS_ACL_XATTR_OBJ = modules/vfs_acl_xattr.o
@@ -3066,6 +3067,10 @@ bin/aio_pthread. at SHLIBEXT@: $(BINARY_PREREQS) $(VFS_AIO_PTHREAD_OBJ)
 	@echo "Building plugin $@"
 	@$(SHLD_MODULE) $(VFS_AIO_PTHREAD_OBJ)
 
+bin/aio_linux. at SHLIBEXT@: $(BINARY_PREREQS) $(VFS_AIO_LINUX_OBJ)
+	@echo "Building plugin $@"
+	@$(SHLD_MODULE) $(VFS_AIO_LINUX_OBJ)
+
 bin/preopen. at SHLIBEXT@: $(BINARY_PREREQS) $(VFS_PREOPEN_OBJ)
 	@echo "Building plugin $@"
 	@$(SHLD_MODULE) $(VFS_PREOPEN_OBJ)
diff --git a/source3/configure.in b/source3/configure.in
index bf777a1..0470a18 100644
--- a/source3/configure.in
+++ b/source3/configure.in
@@ -5567,6 +5567,20 @@ if test x"$samba_cv_HAVE_AIO" = x"yes"; then
 		x"$samba_cv_msghdr_msg_acctright" = x"yes"; then
 		default_shared_modules="$default_shared_modules vfs_aio_fork"
 	fi
+
+# Check for Linux kernel aio support.
+	case "$host_os" in
+	*linux*)
+	    AC_MSG_CHECKING(for Linux kernel asynchronous io support)
+	    AC_CHECK_LIB(aio,io_submit,
+		[AIO_LIBS="$LIBS -laio";
+		AC_DEFINE(HAVE_LINUX_KERNEL_AIO, 1, Define to 1 if there is support for Linux kernel asynchronous io)],
+		[])
+	    if test x"$ac_cv_lib_aio_io_submit" = x"yes"; then
+		default_shared_modules="$default_shared_modules vfs_aio_linux"
+	    fi
+            ;;
+        esac
 fi
 
 #################################################
@@ -6519,6 +6533,7 @@ SMB_MODULE(vfs_tsmsm, \$(VFS_TSMSM_OBJ), "bin/tsmsm.$SHLIBEXT", VFS)
 SMB_MODULE(vfs_fileid, \$(VFS_FILEID_OBJ), "bin/fileid.$SHLIBEXT", VFS)
 SMB_MODULE(vfs_aio_fork, \$(VFS_AIO_FORK_OBJ), "bin/aio_fork.$SHLIBEXT", VFS)
 SMB_MODULE(vfs_aio_pthread, \$(VFS_AIO_PTHREAD_OBJ), "bin/aio_pthread.$SHLIBEXT", VFS)
+SMB_MODULE(vfs_aio_linux, \$(VFS_AIO_LINUX_OBJ), "bin/aio_linux.$SHLIBEXT", VFS)
 SMB_MODULE(vfs_preopen, \$(VFS_PREOPEN_OBJ), "bin/preopen.$SHLIBEXT", VFS)
 SMB_MODULE(vfs_syncops, \$(VFS_SYNCOPS_OBJ), "bin/syncops.$SHLIBEXT", VFS)
 SMB_MODULE(vfs_zfsacl, \$(VFS_ZFSACL_OBJ), "bin/zfsacl.$SHLIBEXT", VFS)
diff --git a/source3/modules/vfs_aio_pthread.c b/source3/modules/vfs_aio_linux.c
similarity index 53%
copy from source3/modules/vfs_aio_pthread.c
copy to source3/modules/vfs_aio_linux.c
index 1cddea3..f6fa80a 100644
--- a/source3/modules/vfs_aio_pthread.c
+++ b/source3/modules/vfs_aio_linux.c
@@ -1,9 +1,6 @@
 /*
- * Simulate Posix AIO using pthreads.
+ * Simulate Posix AIO using Linux kernel AIO.
  *
- * Based on the aio_fork work from Volker and Volker's pthreadpool library.
- *
- * Copyright (C) Volker Lendecke 2008
  * Copyright (C) Jeremy Allison 2012
  *
  * This program is free software; you can redistribute it and/or modify
@@ -23,111 +20,140 @@
 
 #include "includes.h"
 #include "system/filesys.h"
-#include "system/shmem.h"
 #include "smbd/smbd.h"
 #include "smbd/globals.h"
-#include "lib/pthreadpool/pthreadpool.h"
+#include <sys/eventfd.h>
+#include <libaio.h>
 
 struct aio_extra;
-static struct pthreadpool *pool;
-static int aio_pthread_jobid;
+static int event_fd = -1;
+static io_context_t io_ctx;
+static int aio_linux_requestid;
+static struct io_event *io_recv_events;
+static struct fd_event *aio_read_event;
 
 struct aio_private_data {
 	struct aio_private_data *prev, *next;
-	int jobid;
+	int requestid;
 	SMB_STRUCT_AIOCB *aiocb;
+	struct iocb *event_iocb;
 	ssize_t ret_size;
 	int ret_errno;
 	bool cancelled;
-	bool write_command;
 };
 
 /* List of outstanding requests we have. */
 static struct aio_private_data *pd_list;
 
-static void aio_pthread_handle_completion(struct event_context *event_ctx,
-				struct fd_event *event,
-				uint16 flags,
-				void *p);
+static void aio_linux_handle_completion(struct event_context *event_ctx,
+			struct fd_event *event,
+			uint16 flags,
+			void *p);
+
+/************************************************************************
+ Housekeeping. Cleanup if no activity for 30 seconds.
+***********************************************************************/
+
+static void aio_linux_housekeeping(struct tevent_context *event_ctx,
+                                        struct tevent_timer *te,
+                                        struct timeval now,
+                                        void *private_data)
+{
+	/* Remove this timed event handler. */
+	TALLOC_FREE(te);
 
+	if (pd_list != NULL) {
+		/* Still busy. Look again in 30 seconds. */
+		(void)tevent_add_timer(event_ctx,
+					NULL,
+					timeval_current_ofs(30, 0),
+					aio_linux_housekeeping,
+					NULL);
+		return;
+	}
+
+	/* No activity for 30 seconds. Close out kernel resources. */
+	io_queue_release(io_ctx);
+	memset(&io_ctx, '\0', sizeof(io_ctx));
+
+	if (event_fd != -1) {
+		close(event_fd);
+		event_fd = -1;
+	}
+
+	TALLOC_FREE(aio_read_event);
+	TALLOC_FREE(io_recv_events);
+}
 
 /************************************************************************
- Ensure thread pool is initialized.
+ Ensure event fd and aio context are initialized.
 ***********************************************************************/
 
-static bool init_aio_threadpool(struct vfs_handle_struct *handle)
+static bool init_aio_linux(struct vfs_handle_struct *handle)
 {
-	struct fd_event *sock_event = NULL;
-	int ret = 0;
+	struct tevent_timer *te = NULL;
 
-	if (pool) {
+	if (event_fd != -1) {
+		/* Already initialized. */
 		return true;
 	}
 
-	ret = pthreadpool_init(aio_pending_size, &pool);
-	if (ret) {
-		errno = ret;
-		return false;
+	/* Shedule a shutdown event for 30 seconds from now. */
+	te = tevent_add_timer(server_event_context(),
+				NULL,
+				timeval_current_ofs(30, 0),
+				aio_linux_housekeeping,
+				NULL);
+
+	if (te == NULL) {
+		goto fail;
 	}
-	sock_event = tevent_add_fd(server_event_context(),
+
+	/* Ensure we have enough space for aio_pending_size events. */
+	io_recv_events = talloc_zero_array(NULL,
+				struct io_event,
+				aio_pending_size);
+	if (io_recv_events == NULL) {
+		goto fail;
+	}
+
+	event_fd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
+	if (event_fd == -1) {
+		goto fail;
+	}
+
+	aio_read_event = tevent_add_fd(server_event_context(),
 				NULL,
-				pthreadpool_signal_fd(pool),
+				event_fd,
 				TEVENT_FD_READ,
-				aio_pthread_handle_completion,
+				aio_linux_handle_completion,
 				NULL);
-	if (sock_event == NULL) {
-		pthreadpool_destroy(pool);
-		pool = NULL;
-		return false;
+	if (aio_read_event == NULL) {
+		goto fail;
+	}
+
+	if (io_queue_init(aio_pending_size, &io_ctx)) {
+		goto fail;
 	}
 
-	DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
+	DEBUG(10,("init_aio_linux: initialized with up to %d events\n",
 		  aio_pending_size));
 
 	return true;
-}
 
+  fail:
 
-/************************************************************************
- Worker function - core of the pthread aio engine.
- This is the function that actually does the IO.
-***********************************************************************/
+	DEBUG(10,("init_aio_linux: initialization failed\n"));
 
-static void aio_worker(void *private_data)
-{
-	struct aio_private_data *pd =
-			(struct aio_private_data *)private_data;
-
-	if (pd->write_command) {
-		pd->ret_size = sys_pwrite(pd->aiocb->aio_fildes,
-				(const void *)pd->aiocb->aio_buf,
-				pd->aiocb->aio_nbytes,
-				pd->aiocb->aio_offset);
-		if (pd->ret_size == -1 && errno == ESPIPE) {
-			/* Maintain the fiction that pipes can
-			   be seeked (sought?) on. */
-			pd->ret_size = sys_write(pd->aiocb->aio_fildes,
-					(const void *)pd->aiocb->aio_buf,
-					pd->aiocb->aio_nbytes);
-		}
-	} else {
-		pd->ret_size = sys_pread(pd->aiocb->aio_fildes,
-				(void *)pd->aiocb->aio_buf,
-				pd->aiocb->aio_nbytes,
-				pd->aiocb->aio_offset);
-		if (pd->ret_size == -1 && errno == ESPIPE) {
-			/* Maintain the fiction that pipes can
-			   be seeked (sought?) on. */
-			pd->ret_size = sys_read(pd->aiocb->aio_fildes,
-					(void *)pd->aiocb->aio_buf,
-					pd->aiocb->aio_nbytes);
-		}
-	}
-	if (pd->ret_size == -1) {
-		pd->ret_errno = errno;
-	} else {
-		pd->ret_errno = 0;
-	}
+	TALLOC_FREE(te);
+	TALLOC_FREE(io_recv_events);
+	TALLOC_FREE(aio_read_event);
+	if (event_fd != -1) {
+		close(event_fd);
+		event_fd = -1;
+	}
+	memset(&io_ctx, '\0', sizeof(io_ctx));
+	return false;
 }
 
 /************************************************************************
@@ -151,7 +177,8 @@ static struct aio_private_data *create_private_data(TALLOC_CTX *ctx,
 	if (!pd) {
 		return NULL;
 	}
-	pd->jobid = aio_pthread_jobid++;
+	pd->event_iocb = talloc_zero(pd, struct iocb);
+	pd->requestid = aio_linux_requestid++;
 	pd->aiocb = aiocb;
 	pd->ret_size = -1;
 	pd->ret_errno = EINPROGRESS;
@@ -161,10 +188,10 @@ static struct aio_private_data *create_private_data(TALLOC_CTX *ctx,
 }
 
 /************************************************************************
- Spin off a threadpool (if needed) and initiate a pread call.
+ Initiate an asynchronous pread call.
 ***********************************************************************/
 
-static int aio_pthread_read(struct vfs_handle_struct *handle,
+static int aio_linux_read(struct vfs_handle_struct *handle,
 				struct files_struct *fsp,
 				SMB_STRUCT_AIOCB *aiocb)
 {
@@ -172,25 +199,34 @@ static int aio_pthread_read(struct vfs_handle_struct *handle,
 	struct aio_private_data *pd = NULL;
 	int ret;
 
-	if (!init_aio_threadpool(handle)) {
+	if (!init_aio_linux(handle)) {
 		return -1;
 	}
 
 	pd = create_private_data(aio_ex, aiocb);
 	if (pd == NULL) {
-		DEBUG(10, ("aio_pthread_read: Could not create private data.\n"));
+		DEBUG(10, ("aio_linux_read: Could not create private data.\n"));
 		return -1;
 	}
 
-	ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
-	if (ret) {
+	io_prep_pread(pd->event_iocb,
+			pd->aiocb->aio_fildes,
+			discard_const(pd->aiocb->aio_buf),
+			pd->aiocb->aio_nbytes,
+			pd->aiocb->aio_offset);
+	io_set_eventfd(pd->event_iocb, event_fd);
+	/* Use the callback pointer as a private data ptr. */
+	io_set_callback(pd->event_iocb, (io_callback_t)pd);
+
+	ret = io_submit(io_ctx, 1, &pd->event_iocb);
+	if (ret < 0) {
 		errno = ret;
 		return -1;
 	}
 
-	DEBUG(10, ("aio_pthread_read: jobid=%d pread requested "
+	DEBUG(10, ("aio_linux_read: requestid=%d read requested "
 		"of %llu bytes at offset %llu\n",
-		pd->jobid,
+		pd->requestid,
 		(unsigned long long)pd->aiocb->aio_nbytes,
 		(unsigned long long)pd->aiocb->aio_offset));
 
@@ -198,10 +234,10 @@ static int aio_pthread_read(struct vfs_handle_struct *handle,
 }
 
 /************************************************************************
- Spin off a threadpool (if needed) and initiate a pwrite call.
+ Initiate an asynchronous pwrite call.
 ***********************************************************************/
 
-static int aio_pthread_write(struct vfs_handle_struct *handle,
+static int aio_linux_write(struct vfs_handle_struct *handle,
 				struct files_struct *fsp,
 				SMB_STRUCT_AIOCB *aiocb)
 {
@@ -209,27 +245,34 @@ static int aio_pthread_write(struct vfs_handle_struct *handle,
 	struct aio_private_data *pd = NULL;
 	int ret;
 
-	if (!init_aio_threadpool(handle)) {
+	if (!init_aio_linux(handle)) {
 		return -1;
 	}
 
 	pd = create_private_data(aio_ex, aiocb);
 	if (pd == NULL) {
-		DEBUG(10, ("aio_pthread_write: Could not create private data.\n"));
+		DEBUG(10, ("aio_linux_write: Could not create private data.\n"));
 		return -1;
 	}
 
-	pd->write_command = true;
+	io_prep_pwrite(pd->event_iocb,
+			pd->aiocb->aio_fildes,
+			discard_const(pd->aiocb->aio_buf),
+			pd->aiocb->aio_nbytes,
+			pd->aiocb->aio_offset);
+	io_set_eventfd(pd->event_iocb, event_fd);
+	/* Use the callback pointer as a private data ptr. */
+	io_set_callback(pd->event_iocb, (io_callback_t)pd);
 
-	ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
-	if (ret) {
+	ret = io_submit(io_ctx, 1, &pd->event_iocb);
+	if (ret < 0) {
 		errno = ret;
 		return -1;
 	}
 
-	DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested "
+	DEBUG(10, ("aio_linux_write: requestid=%d pwrite requested "
 		"of %llu bytes at offset %llu\n",
-		pd->jobid,
+		pd->requestid,
 		(unsigned long long)pd->aiocb->aio_nbytes,
 		(unsigned long long)pd->aiocb->aio_offset));
 
@@ -237,62 +280,92 @@ static int aio_pthread_write(struct vfs_handle_struct *handle,
 }
 
 /************************************************************************
- Find the private data by jobid.
+ Handle a single finished io.
 ***********************************************************************/
 
-static struct aio_private_data *find_private_data_by_jobid(int jobid)
+static void aio_linux_handle_io_finished(struct io_event *ioev)
 {
-	struct aio_private_data *pd;
+	struct aio_extra *aio_ex = NULL;
+	struct aio_private_data *pd = (struct aio_private_data *)ioev->data;
 
-	for (pd = pd_list; pd != NULL; pd = pd->next) {
-		if (pd->jobid == jobid) {
-			return pd;
-		}
+	/* ioev->res2 contains the -errno if error. */
+	/* ioev->res contains the number of bytes sent/received. */
+	if (ioev->res2) {
+		pd->ret_size = -1;
+		pd->ret_errno = -ioev->res2;
+	} else {
+		pd->ret_size = ioev->res;
+		pd->ret_errno = 0;
 	}
 
-	return NULL;
+	aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
+	smbd_aio_complete_aio_ex(aio_ex);
+
+	DEBUG(10,("aio_linux_handle_io_finished: requestid %d completed\n",
+		pd->requestid ));
+	TALLOC_FREE(aio_ex);
 }
 
 /************************************************************************
- Callback when an IO completes.
+ Callback when multiple IOs complete.
 ***********************************************************************/
 
-static void aio_pthread_handle_completion(struct event_context *event_ctx,
+static void aio_linux_handle_completion(struct event_context *event_ctx,
 				struct fd_event *event,
 				uint16 flags,
 				void *p)
 {
-	struct aio_extra *aio_ex = NULL;
-	struct aio_private_data *pd = NULL;
-	int jobid = 0;
-	int ret;
+	uint64_t num_events = 0;
 
-	DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n",
+	DEBUG(10, ("aio_linux_handle_completion called with flags=%d\n",
 			(int)flags));
 
 	if ((flags & EVENT_FD_READ) == 0) {
 		return;
 	}
 
-	ret = pthreadpool_finished_job(pool, &jobid);
-	if (ret) {
-		smb_panic("aio_pthread_handle_completion");
-		return;
+	/* Read the number of events available. */
+	if (sys_read(event_fd, &num_events, sizeof(num_events)) !=
+			sizeof(num_events)) {
+		smb_panic("aio_linux_handle_completion: invalid read");
 	}
 
-	pd = find_private_data_by_jobid(jobid);
-	if (pd == NULL) {
-		DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
-			  jobid));
-		return;
-	}
+	while (num_events > 0) {
+		uint64_t events_to_read = MIN(num_events, aio_pending_size);
+		struct timespec ts;
+		int i;
+		int ret;
 
-	aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
-	smbd_aio_complete_aio_ex(aio_ex);
+		ts.tv_sec = 0;
+		ts.tv_nsec = 0;
 
-	DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n",
-		jobid ));
-	TALLOC_FREE(aio_ex);
+		ret = io_getevents(io_ctx,
+			1,
+			(long)events_to_read,
+			io_recv_events,
+			&ts);
+


-- 
Samba Shared Repository


More information about the samba-cvs mailing list