aio_read template.

Jeremy Allison jra at samba.org
Mon Jun 6 23:45:40 GMT 2005


Volker,

	Here is a template for the way we'd use aio_read within Samba3.
It seems to work well on 2.6 kernels. It's a standalone program that will
copy a file using aio_read. The one thing I haven't coped with is a
short read from aio_read (not an EOF read, but just reading fewer bytes
than we asked for). I'm not sure if this is possible with the aio_read
interface, but I thought I'd just warn you....

Hopefully you'll find this useful.

Jeremy.
-------------- next part --------------
/*
   Sample aio_read code as would be used in Samba3.
   Copyright (C) Jeremy Allison, 2005.
                                                                                                                                                         
   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 2 of the License, or
   (at your option) any later version.
                                                                                                                                                         
   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.
                                                                                                                                                         
   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/select.h>
#include <sys/time.h>
#include <unistd.h>
#include <errno.h>
#include <signal.h>
#include <fcntl.h>
#include <time.h>
#include <strings.h>
#include <aio.h>

/* Two-argument minimum/maximum helpers, defined only when no earlier
   header has supplied them. Each argument may be evaluated twice, so do
   not pass expressions with side effects. */
#if !defined(MIN)
#define MIN(a,b) (((a) < (b)) ? (a) : (b))
#endif
#if !defined(MAX)
#define MAX(a,b) (((a) > (b)) ? (a) : (b))
#endif

/* Switch a descriptor between blocking and non-blocking mode.
   fd  - descriptor to change.
   set - non-zero clears O_NONBLOCK (blocking), zero sets it (non-blocking).
   Returns -1 if the current flags cannot be read, otherwise the result
   of the F_SETFL call. */
int set_blocking(int fd, int set)
{
	int flags = fcntl(fd, F_GETFL, 0);

	if (flags == -1) {
		return -1;
	}

	/* Only the O_NONBLOCK bit changes; every other flag is carried over. */
	flags = set ? (flags & ~O_NONBLOCK) : (flags | O_NONBLOCK);

	return fcntl(fd, F_SETFL, flags);
}

/* Pid of the process that set up select_pipe; stays 0 until sys_select()
   has created the pipe. */
static pid_t initialised;
/* Pipe written by sys_select_signal() ([1]) and read by sys_select() ([0]). */
static int select_pipe[2];
/* Counts of bytes written to, and read back from, select_pipe. */
static volatile unsigned pipe_written;
static volatile unsigned pipe_read;

/*******************************************************************
 Call this from all Samba signal handlers if you want to avoid a
 nasty signal race condition.

 Writes one byte down the select pipe so that the descriptor
 sys_select() watches becomes readable. Does nothing until
 sys_select() has created the pipe, and stops writing once more than
 256 written bytes are still unread.

 errno is saved and restored: this runs in signal context and write()
 may overwrite the value the interrupted code is about to inspect.
********************************************************************/

void sys_select_signal(void)
{
	int saved_errno = errno;
	char c = 1;

	if (!initialised) {
		return;
	}

	if (pipe_written > pipe_read+256) {
		return;
	}

	if (write(select_pipe[1], &c, 1) == 1) {
		pipe_written++;
	}

	errno = saved_errno;
}

/*******************************************************************
 Like select() but avoids the signal race using a pipe.
 It also guarantees that fds on return only ever contains bits set
 for file descriptors that were readable.

 maxfd, writefds, errorfds and tval are handed to select(); maxfd is
 raised if needed so that the read end of the select pipe is covered.
 readfds may be NULL, in which case a private set is used.

 Returns:
   - the select() result when the select pipe was not readable; if
     that result is <= 0 every supplied set is cleared first.
   - -1 with errno set to EINTR when a byte written by
     sys_select_signal() was consumed from the pipe.
   - the select() result minus one (pipe bit cleared, errno restored)
     when the pipe was flagged readable but the one-byte read failed.

 Exits the process if the pipe cannot be created or made non-blocking.
********************************************************************/

int sys_select(int maxfd, fd_set *readfds, fd_set *writefds, fd_set *errorfds, struct timeval *tval)
{
	int ret, saved_errno;
	fd_set *readfds2, readfds_buf;

	if (initialised != getpid()) {
		/* Without a usable pipe nothing below can work, so treat a
		   failure here the same way as the set_blocking() failures. */
		if (pipe(select_pipe) == -1)
			exit(1);

		/*
		 * These next two lines seem to fix a bug with the Linux
		 * 2.0.x kernel (and probably other UNIXes as well) where
		 * the one byte read below can block even though the
		 * select returned that there is data in the pipe and
		 * the pipe_written variable was incremented. Thanks to
		 * HP for finding this one. JRA.
		 */

		if(set_blocking(select_pipe[0],0)==-1)
			exit(1);
		if(set_blocking(select_pipe[1],0)==-1)
			exit(1);

		initialised = getpid();
	}

	maxfd = MAX(select_pipe[0]+1, maxfd);

	/* If readfds is NULL we need to provide our own set. */
	if (readfds) {
		readfds2 = readfds;
	} else {
		readfds2 = &readfds_buf;
		FD_ZERO(readfds2);
	}
	FD_SET(select_pipe[0], readfds2);

	errno = 0;
	ret = select(maxfd,readfds2,writefds,errorfds,tval);

	if (ret <= 0) {
		/* Timeout or error: report no descriptor as ready. */
		FD_ZERO(readfds2);
		if (writefds)
			FD_ZERO(writefds);
		if (errorfds)
			FD_ZERO(errorfds);
	} else if (FD_ISSET(select_pipe[0], readfds2)) {
		char c;
		saved_errno = errno;
		if (read(select_pipe[0], &c, 1) == 1) {
			pipe_read++;
			/* Mark Weaver <mark-clist at npsl.co.uk> pointed out a critical
			   fix to ensure we don't lose signals. We must always
			   return -1 when the select pipe is set, otherwise if another
			   fd is also ready (so ret == 2) then we used to eat the
			   byte in the pipe and lose the signal. JRA.
			*/
			ret = -1;
			errno = EINTR;
		} else {
			/* Nothing could be read after all: hide the pipe from
			   the caller and discount it from the ready count. */
			FD_CLR(select_pipe[0], readfds2);
			ret--;
			errno = saved_errno;
		}
	}

	return ret;
}

/* Add signum to the caller's signal mask when block is non-zero, or
   remove it when block is zero. The return values of the signal-set
   calls are not checked. */
void BlockSignals(int block,int signum)
{
	sigset_t mask;
	int how = SIG_UNBLOCK;

	if (block) {
		how = SIG_BLOCK;
	}

	sigemptyset(&mask);
	sigaddset(&mask, signum);
	sigprocmask(how, &mask, NULL);
}


/* Signal requested as the completion notification for each aio_read. */
#define SIG_AIO_READ_DONE (SIGRTMIN)
/* Number of slots in aio_pending_array. */
#define AIO_PENDING_SIZE 10
/* Count of completed requests queued in aio_pending_array. It is written
   by the signal handler and read by the main loop, so it must be volatile
   as well as sig_atomic_t or the compiler may cache it across the wait. */
static volatile sig_atomic_t signals_received;
/* Copies of the control blocks of completed reads, oldest first. */
static struct aiocb aio_pending_array[AIO_PENDING_SIZE];

/*
 * Take the oldest completed request off aio_pending_array and write the
 * data it produced to the output file.
 *
 * fd_out     - descriptor the data is written to.
 * buf        - buffer whose leading bytes (as many as the read returned)
 *              are written out.
 * p_curr_off - in/out: offset in fd_out to write at; advanced by the
 *              number of bytes the read returned.
 * p_finished - set to 1 when the read returned 0 bytes (end of file).
 *
 * Returns without doing anything if no completed request is queued.
 * Exits the process on a read or write error.
 *
 * NOTE(review): aio_error()/aio_return() are called on a by-value copy
 * of the control block; this relies on the implementation keeping the
 * completion status inside struct aiocb - confirm for the target platform.
 */
static void process_completed_read(int fd_out, char *buf, off_t *p_curr_off, int *p_finished)
{
	struct aiocb a;
	ssize_t nbytes_read;
	ssize_t total_written = 0;
	int aio_err;

	/* Keep the handler out while the queue is being edited. */
	BlockSignals(1, SIG_AIO_READ_DONE);
	if (signals_received <= 0) {
		/* Nothing queued: popping would drive the count negative. */
		BlockSignals(0, SIG_AIO_READ_DONE);
		return;
	}
	a = aio_pending_array[0];
	if (signals_received > 1) {
		memmove(&aio_pending_array[0],
			&aio_pending_array[1],
			sizeof(a)*(signals_received-1));
	}
	signals_received--;
	/* Clear the slot just vacated at the tail of the queue. */
	memset(&aio_pending_array[signals_received], '\0', sizeof(a));
	/* now we can receive more signals */
	BlockSignals(0, SIG_AIO_READ_DONE);

	/* The error status of the request comes from aio_error(); errno at
	   this point belongs to whatever call happened to run last. */
	aio_err = aio_error(&a);
	if (aio_err == -1) {
		aio_err = errno;
	}
	if (aio_err != 0) {
		fprintf(stderr, "Error in aio_read: %s\n", strerror(aio_err) );
		exit(1);
	}

	nbytes_read = aio_return(&a);
	if (nbytes_read == -1) {
		fprintf(stderr, "Error in aio_read: %s\n", strerror(errno) );
		exit(1);
	}
	if (nbytes_read == 0) {
		/* End of file. */
		*p_finished = 1;
		return;
	}

	/* pwrite() may write less than asked; keep going until all of the
	   data is out rather than reporting a short write as an error. */
	while (total_written < nbytes_read) {
		ssize_t n = pwrite(fd_out, buf + total_written,
				   (size_t)(nbytes_read - total_written),
				   *p_curr_off + total_written);
		if (n == -1) {
			if (errno == EINTR) {
				continue;
			}
			fprintf(stderr, "Error in write: %s\n", strerror(errno) );
			exit(1);
		}
		if (n == 0) {
			/* No progress and no errno: avoid spinning forever. */
			fprintf(stderr, "Error in write: no data written\n");
			exit(1);
		}
		total_written += n;
	}

	*p_curr_off += nbytes_read;
	/* off_t may be wider than int, so print through a fixed wide type. */
	printf("New offset = 0x%llx\n", (unsigned long long)*p_curr_off);
}

/*
 * Queue an asynchronous read.
 *
 * a          - control block to fill in and submit; its address is also
 *              passed back as the signal value on completion.
 * fd_in      - descriptor to read from.
 * buf        - destination buffer for the data.
 * bufsize    - number of bytes requested.
 * cur_offset - file offset to read from.
 *
 * Completion is notified with SIG_AIO_READ_DONE.
 * Exits the process if the request cannot be queued, since no completion
 * signal would ever arrive for it.
 */
static void schedule_aio_read(struct aiocb *a, int fd_in, char *buf, size_t bufsize, off_t cur_offset)
{
	memset(a, '\0', sizeof(*a));
	a->aio_fildes = fd_in;
	a->aio_buf = buf;
	a->aio_nbytes = bufsize;
	a->aio_offset = cur_offset;
	a->aio_sigevent.sigev_notify = SIGEV_SIGNAL;
	a->aio_sigevent.sigev_signo  = SIG_AIO_READ_DONE;
	a->aio_sigevent.sigev_value.sival_ptr = a;

	if (aio_read(a) == -1) {
		fprintf(stderr, "aio_read failed: %s\n", strerror(errno));
		exit(1);
	}
}

/*
 * SA_SIGINFO-style handler for the aio completion signal.
 *
 * sig    - signal number (unused).
 * info   - signal details; for an aio completion its si_value.sival_ptr
 *          is the pointer stored in the request's sigev_value.
 * unused - context pointer (unused).
 *
 * Appends a copy of the completed control block to aio_pending_array if
 * there is room, then calls sys_select_signal(). errno is preserved
 * across the handler so the interrupted code never sees it change.
 */
static void signal_handler(int sig, siginfo_t *info, void *unused)
{
	int saved_errno = errno;

	(void)sig;
	(void)unused;

	/* Only a signal raised by aio completion carries a control block
	   pointer; anything else (e.g. sent with kill()) must not be
	   dereferenced. */
	if (info != NULL && info->si_code == SI_ASYNCIO &&
	    info->si_value.sival_ptr != NULL) {
		/* Every slot 0..AIO_PENDING_SIZE-1 is usable. */
		if (signals_received < AIO_PENDING_SIZE) {
			aio_pending_array[signals_received] = *(struct aiocb *)(info->si_value.sival_ptr);
			signals_received++;
		} /* Else signal is lost. */
	}
	sys_select_signal();

	errno = saved_errno;
}

/*
 * Copy argv[1] to argv[2] one buffer at a time using aio_read.
 *
 * Installs the completion-signal handler, then loops: schedule a read at
 * the current offset, wait in sys_select(), and when it is interrupted
 * with a completed read queued, write the data out and schedule the next
 * read. The loop ends when a read reports end of file.
 *
 * Returns 0 on success; exits with status 1 on a usage or I/O error.
 */
int main(int argc, char **argv)
{
	int fd_in;
	int fd_out;
	struct sigaction sa;
	int finished = 0;
	int schedule_next_read = 1;
	off_t curr_offset = 0;
	struct timeval tval;
	struct aiocb a;
	char buf[4096];

	if (argc != 3) {
		fprintf(stderr, "Usage: %s fromfile tofile\n",
			argc > 0 ? argv[0] : "aio_copy");
		exit(1);
	}
	fd_in = open(argv[1], O_RDONLY);
	if (fd_in == -1) {
		fprintf(stderr, "open of %s failed: %s\n", argv[1], strerror(errno));
		exit(1);
	}

	fd_out = open(argv[2], O_CREAT|O_TRUNC|O_RDWR, 0644);
	if (fd_out == -1) {
		fprintf(stderr, "open of %s failed: %s\n", argv[2], strerror(errno));
		exit(1);
	}

	memset(&sa, '\0', sizeof(struct sigaction));
	sa.sa_sigaction = signal_handler;
	sa.sa_flags = SA_RESTART|SA_SIGINFO;
	sigemptyset(&sa.sa_mask);
	sigaddset(&sa.sa_mask, SIG_AIO_READ_DONE);
	if (sigaction( SIG_AIO_READ_DONE, &sa, NULL) == -1) {
		fprintf(stderr, "sigaction failed: %s\n", strerror(errno));
		exit(1);
	}

	/* Ensure the signal pipe exists before we start the aio so we don't
	   lose the signal write down the pipe (which we would if the pipe hadn't
	   been created. This isn't a problem in a real smbd. */
	tval.tv_sec = 0;
	tval.tv_usec = 0;
	sys_select(0,NULL,NULL,NULL,&tval);

	while (!finished) {
		int ret;
		if (schedule_next_read) {
			schedule_aio_read(&a, fd_in,buf,sizeof(buf),curr_offset);
			schedule_next_read = 0;
		}
		printf("signals_received = %d\n", (int)signals_received);
		/* Blocks until the handler pokes the select pipe. */
		ret = sys_select(0,NULL,NULL,NULL,NULL);
		if (signals_received && ret == -1 && errno == EINTR) {
			process_completed_read(fd_out, buf, &curr_offset, &finished);
			schedule_next_read = 1;
		}
	}

	close(fd_in);
	/* A failed close on the output can mean the data never reached the
	   file, so it must not be reported as success. */
	if (close(fd_out) == -1) {
		fprintf(stderr, "close of %s failed: %s\n", argv[2], strerror(errno));
		exit(1);
	}
	return 0;
}


More information about the samba-technical mailing list