[PATCH] Cache messaging dgm connections

Jeremy Allison jra at samba.org
Mon Sep 12 20:41:26 UTC 2016


On Sun, Sep 11, 2016 at 04:07:15PM +0200, Ralph Böhme wrote:
> Hi!
> 
> Attached is a performance improvement for our dgram messaging.
> 
> In most cases, the current code opens a new connection to peers for
> every message and closes it when done. This patchset adds caching of
> connections to peers.
> 
> Please review & push if ok.

Reviewing this now - should be done by end of day.


> From 02bb6b255e590446e24d8d0d6ee3cf80abdea9d2 Mon Sep 17 00:00:00 2001
> From: Ralph Boehme <slow at samba.org>
> Date: Fri, 9 Sep 2016 07:12:11 +0200
> Subject: [PATCH 1/8] s4/messaging: messaging_dgm_ref talloc hierarchy fix
> 
> Ensure the messaging dgm context goes away *before* the tevent
> context. The messaging dgm context will likely have active fd or timer
> events, and their rundown will touch the associated tevent context.
> 
> Otoh, I deliberately don't free the imessaging context here, that's going
> to happen as part of freeing the talloc_autofree_context() as before. I
> think it suffers from the same problem, e.g. imessaging_deregister() works on
> an imessaging_context that might already be freed. But as it works,
> don't change it.
> 
> Signed-off-by: Ralph Boehme <slow at samba.org>
> ---
>  source4/lib/messaging/messaging.c | 29 +++++++++++++++++++++++++++++
>  source4/lib/messaging/messaging.h |  1 +
>  source4/smbd/server.c             |  8 ++++++++
>  3 files changed, 38 insertions(+)
> 
> diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
> index 486d602..2b7d38d 100644
> --- a/source4/lib/messaging/messaging.c
> +++ b/source4/lib/messaging/messaging.c
> @@ -293,6 +293,31 @@ static void imessaging_dgm_recv(const uint8_t *buf, size_t buf_len,
>  				int *fds, size_t num_fds,
>  				void *private_data);
>  
> +/* Keep a list of imessaging contexts */
> +struct imessaging_context *msg_ctxs;
> +
> +static int imessaging_context_destructor(struct imessaging_context *msg)
> +{
> +	DLIST_REMOVE(msg_ctxs, msg);
> +	return 0;
> +}
> +
> +/*
> + * Cleanup messaging dgm contexts
> + *
> + * We must make sure to unref all messaging_dgm_ref's *before* the
> + * tevent context goes away. Only when the last ref is freed, the
> + * refcounted messaging dgm context will be freed.
> + */
> +void imessaging_dgm_unref_all(void)
> +{
> +	struct imessaging_context *msg = NULL;
> +
> +	for (msg = msg_ctxs; msg != NULL; msg = msg->next) {
> +		TALLOC_FREE(msg->msg_dgm_ref);
> +	}
> +}
> +
>  /*
>    create the listening socket and setup the dispatcher
>  */
> @@ -316,6 +341,8 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
>  		return NULL;
>  	}
>  
> +	talloc_set_destructor(msg, imessaging_context_destructor);
> +
>  	/* create the messaging directory if needed */
>  
>  	lock_dir = lpcfg_lock_directory(lp_ctx);
> @@ -374,6 +401,8 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
>  	imessaging_register(msg, NULL, MSG_IRPC, irpc_handler);
>  	IRPC_REGISTER(msg, irpc, IRPC_UPTIME, irpc_uptime, msg);
>  
> +	DLIST_ADD(msg_ctxs, msg);
> +
>  	return msg;
>  fail:
>  	talloc_free(msg);
> diff --git a/source4/lib/messaging/messaging.h b/source4/lib/messaging/messaging.h
> index 2efab94..3b76b45 100644
> --- a/source4/lib/messaging/messaging.h
> +++ b/source4/lib/messaging/messaging.h
> @@ -44,6 +44,7 @@ struct imessaging_context *imessaging_init(TALLOC_CTX *mem_ctx,
>  					   struct loadparm_context *lp_ctx,
>  					   struct server_id server_id,
>  					   struct tevent_context *ev);
> +void imessaging_dgm_unref_all(void);
>  int imessaging_cleanup(struct imessaging_context *msg);
>  struct imessaging_context *imessaging_client_init(TALLOC_CTX *mem_ctx,
>  					   struct loadparm_context *lp_ctx,
> diff --git a/source4/smbd/server.c b/source4/smbd/server.c
> index 2399f4f..28ecaca 100644
> --- a/source4/smbd/server.c
> +++ b/source4/smbd/server.c
> @@ -284,6 +284,12 @@ static void show_build(void)
>  	exit(0);
>  }
>  
> +static int event_ctx_destructor(struct tevent_context *event_ctx)
> +{
> +	imessaging_dgm_unref_all();
> +	return 0;
> +}
> +
>  /*
>   main server.
>  */
> @@ -422,6 +428,8 @@ static int binary_smbd_main(const char *binary_name, int argc, const char *argv[
>  		exit_daemon("Initializing event context failed", EACCES);
>  	}
>  
> +	talloc_set_destructor(event_ctx, event_ctx_destructor);
> +
>  	if (opt_interactive) {
>  		/* terminate when stdin goes away */
>  		stdin_event_flags = TEVENT_FD_READ;
> -- 
> 2.7.4
> 
> 
> From 10ac950088f9fc8a5f86a5f8425b68a56f977a5d Mon Sep 17 00:00:00 2001
> From: Ralph Boehme <slow at samba.org>
> Date: Fri, 19 Aug 2016 16:14:52 +0200
> Subject: [PATCH 2/8] unix_msg: modify find_send_queue() to take a struct
>  sockaddr_un
> 
> In one of the next commits unix_dgram_send_queue_init() will be moved
> into find_send_queue and that takes a struct sockaddr_un.
> 
> Signed-off-by: Ralph Boehme <slow at samba.org>
> ---
>  source3/lib/unix_msg/unix_msg.c | 6 +++---
>  1 file changed, 3 insertions(+), 3 deletions(-)
> 
> diff --git a/source3/lib/unix_msg/unix_msg.c b/source3/lib/unix_msg/unix_msg.c
> index 5fac68b..8258f6d 100644
> --- a/source3/lib/unix_msg/unix_msg.c
> +++ b/source3/lib/unix_msg/unix_msg.c
> @@ -434,12 +434,12 @@ static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
>  }
>  
>  static struct unix_dgram_send_queue *find_send_queue(
> -	struct unix_dgram_ctx *ctx, const char *dst_sock)
> +	struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst)
>  {
>  	struct unix_dgram_send_queue *s;
>  
>  	for (s = ctx->send_queues; s != NULL; s = s->next) {
> -		if (strcmp(s->path, dst_sock) == 0) {
> +		if (strcmp(s->path, dst->sun_path) == 0) {
>  			return s;
>  		}
>  	}
> @@ -604,7 +604,7 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
>  	 * To preserve message ordering, we have to queue a message when
>  	 * others are waiting in line already.
>  	 */
> -	q = find_send_queue(ctx, dst->sun_path);
> +	q = find_send_queue(ctx, dst);
>  	if (q != NULL) {
>  		return queue_msg(q, iov, iovlen, fds, num_fds);
>  	}
> -- 
> 2.7.4
> 
> 
> From 180f193ea683dbae05e02365fb2ad2f873f46b46 Mon Sep 17 00:00:00 2001
> From: Ralph Boehme <slow at samba.org>
> Date: Thu, 1 Sep 2016 14:04:30 +0200
> Subject: [PATCH 3/8] unix_msg: Return errno from find_send_queue
> 
> Signed-off-by: Ralph Boehme <slow at samba.org>
> ---
>  source3/lib/unix_msg/unix_msg.c | 14 ++++++++------
>  1 file changed, 8 insertions(+), 6 deletions(-)
> 
> diff --git a/source3/lib/unix_msg/unix_msg.c b/source3/lib/unix_msg/unix_msg.c
> index 8258f6d..17047ae 100644
> --- a/source3/lib/unix_msg/unix_msg.c
> +++ b/source3/lib/unix_msg/unix_msg.c
> @@ -433,17 +433,19 @@ static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
>  	free(q);
>  }
>  
> -static struct unix_dgram_send_queue *find_send_queue(
> -	struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst)
> +static int find_send_queue(struct unix_dgram_ctx *ctx,
> +			   const struct sockaddr_un *dst,
> +			   struct unix_dgram_send_queue **ps)
>  {
>  	struct unix_dgram_send_queue *s;
>  
>  	for (s = ctx->send_queues; s != NULL; s = s->next) {
>  		if (strcmp(s->path, dst->sun_path) == 0) {
> -			return s;
> +			*ps = s;
> +			return 0;
>  		}
>  	}
> -	return NULL;
> +	return ENOENT;
>  }
>  
>  static int queue_msg(struct unix_dgram_send_queue *q,
> @@ -604,8 +606,8 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
>  	 * To preserve message ordering, we have to queue a message when
>  	 * others are waiting in line already.
>  	 */
> -	q = find_send_queue(ctx, dst);
> -	if (q != NULL) {
> +	ret = find_send_queue(ctx, dst, &q);
> +	if (ret == 0) {
>  		return queue_msg(q, iov, iovlen, fds, num_fds);
>  	}
>  
> -- 
> 2.7.4
> 
> 
> From f1fa38d678349ef5fc21dad33a38fa06a847f84e Mon Sep 17 00:00:00 2001
> From: Ralph Boehme <slow at samba.org>
> Date: Thu, 1 Sep 2016 14:08:55 +0200
> Subject: [PATCH 4/8] messaging: Call messaging_dgm_send under become_root only
>  if necessary
> 
> Signed-off-by: Ralph Boehme <slow at samba.org>
> ---
>  source3/lib/messages.c            | 9 +++++++--
>  source4/lib/messaging/messaging.c | 9 +++++++--
>  2 files changed, 14 insertions(+), 4 deletions(-)
> 
> diff --git a/source3/lib/messages.c b/source3/lib/messages.c
> index 12e7dbc..3ed6dfe 100644
> --- a/source3/lib/messages.c
> +++ b/source3/lib/messages.c
> @@ -464,9 +464,14 @@ int messaging_send_iov_from(struct messaging_context *msg_ctx,
>  	iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
>  	memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
>  
> -	become_root();
>  	ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
> -	unbecome_root();
> +
> +	if (ret == EACCES) {
> +		become_root();
> +		ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
> +					 fds, num_fds);
> +		unbecome_root();
> +	}
>  
>  	return ret;
>  }
> diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
> index 2b7d38d..fe052fbe 100644
> --- a/source4/lib/messaging/messaging.c
> +++ b/source4/lib/messaging/messaging.c
> @@ -255,9 +255,14 @@ NTSTATUS imessaging_send(struct imessaging_context *msg, struct server_id server
>  		pid = getpid();
>  	}
>  
> -	priv = root_privileges();
>  	ret = messaging_dgm_send(pid, iov, num_iov, NULL, 0);
> -	TALLOC_FREE(priv);
> +
> +	if (ret == EACCES) {
> +		priv = root_privileges();
> +		ret = messaging_dgm_send(pid, iov, num_iov, NULL, 0);
> +		TALLOC_FREE(priv);
> +	}
> +
>  	if (ret != 0) {
>  		return map_nt_error_from_unix_common(ret);
>  	}
> -- 
> 2.7.4
> 
> 
> From 4c3a5db7327fd997c0241055c34e195eae6fd6fe Mon Sep 17 00:00:00 2001
> From: Ralph Boehme <slow at samba.org>
> Date: Mon, 22 Aug 2016 14:02:43 +0200
> Subject: [PATCH 5/8] unix_msg: add flag to prepare_socket_nonblock()
> 
> This allows prepare_socket_nonblock() to be called to set a socket to
> non-blocking (as before) as well as blocking. This will be used in a
> subsequent commit.
> 
> Signed-off-by: Ralph Boehme <slow at samba.org>
> ---
>  source3/lib/unix_msg/unix_msg.c | 10 +++++++---
>  1 file changed, 7 insertions(+), 3 deletions(-)
> 
> diff --git a/source3/lib/unix_msg/unix_msg.c b/source3/lib/unix_msg/unix_msg.c
> index 17047ae..f86822e 100644
> --- a/source3/lib/unix_msg/unix_msg.c
> +++ b/source3/lib/unix_msg/unix_msg.c
> @@ -80,7 +80,7 @@ static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
>  				    void *private_data);
>  
>  /* Set socket non blocking. */
> -static int prepare_socket_nonblock(int sock)
> +static int prepare_socket_nonblock(int sock, bool nonblock)
>  {
>  	int flags;
>  #ifdef O_NONBLOCK
> @@ -97,7 +97,11 @@ static int prepare_socket_nonblock(int sock)
>  	if (flags == -1) {
>  		return errno;
>  	}
> -	flags |= FLAG_TO_SET;
> +	if (nonblock) {
> +		flags |= FLAG_TO_SET;
> +	} else {
> +		flags &= ~FLAG_TO_SET;
> +	}
>  	if (fcntl(sock, F_SETFL, flags) == -1) {
>  		return errno;
>  	}
> @@ -127,7 +131,7 @@ static int prepare_socket_cloexec(int sock)
>  /* Set socket non blocking and close on exec. */
>  static int prepare_socket(int sock)
>  {
> -	int ret = prepare_socket_nonblock(sock);
> +	int ret = prepare_socket_nonblock(sock, true);
>  
>  	if (ret) {
>  		return ret;
> -- 
> 2.7.4
> 
> 
> From ecbfe52206432718d21fd520ba1060d28fd9528b Mon Sep 17 00:00:00 2001
> From: Ralph Boehme <slow at samba.org>
> Date: Fri, 19 Aug 2016 16:25:11 +0200
> Subject: [PATCH 6/8] unix_msg: introduce send queue caching
> 
> This introduces caching of unix datagram send queues. Right now send
> queues are only created for peers if the channel to the peer is full and
> a send reported EWOULDBLOCK.
> 
> At this stage, performance will actually be slightly worse, because now
> if there's a cached queue for a peer without queued messages, we don't
> attempt direct send anymore until the send queue is removed from the
> cache.
> 
> The next commit will modify unix_msg to always create a send queue with
> the datagram socket in connected mode and again attempt a non-blocking
> send on the connected socket first. Then only if that returns
> EWOULDBLOCK, the send has to go through the threadpool.
> 
> Signed-off-by: Ralph Boehme <slow at samba.org>
> ---
>  source3/lib/unix_msg/unix_msg.c    | 67 ++++++++++++++++++++++++++++++++++++--
>  source3/lib/unix_msg/unix_msg.h    |  2 ++
>  source3/lib/unix_msg/wscript_build |  2 +-
>  3 files changed, 67 insertions(+), 4 deletions(-)
> 
> diff --git a/source3/lib/unix_msg/unix_msg.c b/source3/lib/unix_msg/unix_msg.c
> index f86822e..d78d872 100644
> --- a/source3/lib/unix_msg/unix_msg.c
> +++ b/source3/lib/unix_msg/unix_msg.c
> @@ -26,6 +26,7 @@
>  #include "lib/util/iov_buf.h"
>  #include "lib/util/msghdr.h"
>  #include <fcntl.h>
> +#include "lib/util/time.h"
>  
>  /*
>   * This file implements two abstractions: The "unix_dgram" functions implement
> @@ -51,6 +52,7 @@ struct unix_dgram_send_queue {
>  	struct unix_dgram_ctx *ctx;
>  	int sock;
>  	struct unix_dgram_msg *msgs;
> +	struct poll_timeout *timeout;
>  	char path[];
>  };
>  
> @@ -364,6 +366,8 @@ static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
>  	return 0;
>  }
>  
> +static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q);
> +
>  static int unix_dgram_send_queue_init(
>  	struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst,
>  	struct unix_dgram_send_queue **result)
> @@ -380,6 +384,7 @@ static int unix_dgram_send_queue_init(
>  	}
>  	q->ctx = ctx;
>  	q->msgs = NULL;
> +	q->timeout = NULL;
>  	memcpy(q->path, dst->sun_path, pathlen);
>  
>  	q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
> @@ -411,6 +416,12 @@ static int unix_dgram_send_queue_init(
>  
>  	DLIST_ADD(ctx->send_queues, q);
>  
> +	ret = unix_dgram_sendq_schedule_free(q);
> +	if (ret != 0) {
> +		err = ENOMEM;
> +		goto fail_close;
> +	}
> +
>  	*result = q;
>  	return 0;
>  
> @@ -434,9 +445,59 @@ static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
>  	}
>  	close(q->sock);
>  	DLIST_REMOVE(ctx->send_queues, q);
> +	ctx->ev_funcs->timeout_free(q->timeout);
>  	free(q);
>  }
>  
> +static void unix_dgram_sendq_scheduled_free_handler(
> +	struct poll_timeout *t, void *private_data);
> +
> +static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q)
> +{
> +	struct unix_dgram_ctx *ctx = q->ctx;
> +	struct timeval timeout;
> +
> +	if (q->timeout != NULL) {
> +		return 0;
> +	}
> +
> +	GetTimeOfDay(&timeout);
> +	timeout.tv_sec += SENDQ_CACHE_TIME;
> +
> +	q->timeout = ctx->ev_funcs->timeout_new(
> +		ctx->ev_funcs,
> +		timeout,
> +		unix_dgram_sendq_scheduled_free_handler,
> +		q);
> +	if (q->timeout == NULL) {
> +		unix_dgram_send_queue_free(q);
> +		return ENOMEM;
> +	}
> +
> +	return 0;
> +}
> +
> +static void unix_dgram_sendq_scheduled_free_handler(struct poll_timeout *t,
> +						    void *private_data)
> +{
> +	struct unix_dgram_send_queue *q = private_data;
> +	int ret;
> +
> +	q->ctx->ev_funcs->timeout_free(q->timeout);
> +	q->timeout = NULL;
> +
> +	if (q->msgs == NULL) {
> +		unix_dgram_send_queue_free(q);
> +		return;
> +	}
> +
> +	ret = unix_dgram_sendq_schedule_free(q);
> +	if (ret != 0) {
> +		unix_dgram_send_queue_free(q);
> +		return;
> +	}
> +}
> +
>  static int find_send_queue(struct unix_dgram_ctx *ctx,
>  			   const struct sockaddr_un *dst,
>  			   struct unix_dgram_send_queue **ps)
> @@ -555,12 +616,12 @@ static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
>  	if (q->msgs != NULL) {
>  		ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
>  					       unix_dgram_send_job, q->msgs);
> -		if (ret == 0) {
> +		if (ret != 0) {
> +			unix_dgram_send_queue_free(q);
>  			return;
>  		}
> +		return;
>  	}
> -
> -	unix_dgram_send_queue_free(q);
>  }
>  
>  static int unix_dgram_send(struct unix_dgram_ctx *ctx,
> diff --git a/source3/lib/unix_msg/unix_msg.h b/source3/lib/unix_msg/unix_msg.h
> index 34c166b..32afff0 100644
> --- a/source3/lib/unix_msg/unix_msg.h
> +++ b/source3/lib/unix_msg/unix_msg.h
> @@ -116,4 +116,6 @@ int unix_msg_send(struct unix_msg_ctx *ctx, const struct sockaddr_un *dst,
>   */
>  int unix_msg_free(struct unix_msg_ctx *ctx);
>  
> +#define SENDQ_CACHE_TIME 10
> +
>  #endif
> diff --git a/source3/lib/unix_msg/wscript_build b/source3/lib/unix_msg/wscript_build
> index b16d52c..469f87e 100644
> --- a/source3/lib/unix_msg/wscript_build
> +++ b/source3/lib/unix_msg/wscript_build
> @@ -2,7 +2,7 @@
>  
>  bld.SAMBA3_SUBSYSTEM('UNIX_MSG',
>                       source='unix_msg.c',
> -		     deps='replace PTHREADPOOL iov_buf msghdr')
> +		     deps='replace PTHREADPOOL iov_buf msghdr time-basic')
>  
>  bld.SAMBA3_BINARY('unix_msg_test',
>                    source='tests.c',
> -- 
> 2.7.4
> 
> 
> From 3e486de61eb40e2608e0b2cd5d93130b3233c0c7 Mon Sep 17 00:00:00 2001
> From: Ralph Boehme <slow at samba.org>
> Date: Fri, 19 Aug 2016 09:22:54 +0200
> Subject: [PATCH 7/8] unix_msg: always create a send queue for a peer
> 
> Previously, we only created a send queue for a peer if the initial send
> to the non-blocking non-connected socket reported EWOULDBLOCK (because
> the channel was full).
> 
> With this change, we now always create a send queue and use a connected,
> non-blocking datagram socket from the beginning.
> 
> Initially, the socket of the send queue is set to non-blocking mode and
> we attempt a direct send via sendmsg(). If that returns EWOULDBLOCK, we
> set the send queue to blocking mode and let the threadpool handle the
> IO.
> 
> When a send queue becomes empty, we set the send queue socket back to
> non-blocking.
> 
> Signed-off-by: Ralph Boehme <slow at samba.org>
> ---
>  source3/lib/unix_msg/unix_msg.c | 58 +++++++++++++++++++++++++++++++----------
>  1 file changed, 44 insertions(+), 14 deletions(-)
> 
> diff --git a/source3/lib/unix_msg/unix_msg.c b/source3/lib/unix_msg/unix_msg.c
> index d78d872..aae2d26 100644
> --- a/source3/lib/unix_msg/unix_msg.c
> +++ b/source3/lib/unix_msg/unix_msg.c
> @@ -393,7 +393,7 @@ static int unix_dgram_send_queue_init(
>  		goto fail_free;
>  	}
>  
> -	err = prepare_socket_cloexec(q->sock);
> +	err = prepare_socket(q->sock);
>  	if (err != 0) {
>  		goto fail_close;
>  	}
> @@ -503,6 +503,7 @@ static int find_send_queue(struct unix_dgram_ctx *ctx,
>  			   struct unix_dgram_send_queue **ps)
>  {
>  	struct unix_dgram_send_queue *s;
> +	int ret;
>  
>  	for (s = ctx->send_queues; s != NULL; s = s->next) {
>  		if (strcmp(s->path, dst->sun_path) == 0) {
> @@ -510,7 +511,12 @@ static int find_send_queue(struct unix_dgram_ctx *ctx,
>  			return 0;
>  		}
>  	}
> -	return ENOENT;
> +	ret = unix_dgram_send_queue_init(ctx, dst, &s);
> +	if (ret != 0) {
> +		return ret;
> +	}
> +	*ps = s;
> +	return 0;
>  }
>  
>  static int queue_msg(struct unix_dgram_send_queue *q,
> @@ -622,6 +628,11 @@ static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
>  		}
>  		return;
>  	}
> +
> +	ret = prepare_socket_nonblock(q->sock, true);
> +	if (ret != 0) {
> +		unix_dgram_send_queue_free(q);
> +	}
>  }
>  
>  static int unix_dgram_send(struct unix_dgram_ctx *ctx,
> @@ -667,12 +678,16 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
>  		return EINVAL;
>  	}
>  
> -	/*
> -	 * To preserve message ordering, we have to queue a message when
> -	 * others are waiting in line already.
> -	 */
>  	ret = find_send_queue(ctx, dst, &q);
> -	if (ret == 0) {
> +	if (ret != 0) {
> +		return ret;
> +	}
> +
> +	if (q->msgs) {
> +		/*
> +		 * To preserve message ordering, we have to queue a
> +		 * message when others are waiting in line already.
> +		 */
>  		return queue_msg(q, iov, iovlen, fds, num_fds);
>  	}
>  
> @@ -681,8 +696,6 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
>  	 */
>  
>  	msg = (struct msghdr) {
> -		.msg_name = discard_const_p(struct sockaddr_un, dst),
> -		.msg_namelen = sizeof(*dst),
>  		.msg_iov = discard_const_p(struct iovec, iov),
>  		.msg_iovlen = iovlen
>  	};
> @@ -696,7 +709,7 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
>  		uint8_t buf[fdlen];
>  		msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
>  
> -		ret = sendmsg(ctx->sock, &msg, 0);
> +		ret = sendmsg(q->sock, &msg, 0);
>  	}
>  
>  	if (ret >= 0) {
> @@ -712,11 +725,20 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
>  		return errno;
>  	}
>  
> -	ret = unix_dgram_send_queue_init(ctx, dst, &q);
> +	ret = queue_msg(q, iov, iovlen, fds, num_fds);
>  	if (ret != 0) {
> +		unix_dgram_send_queue_free(q);
>  		return ret;
>  	}
> -	ret = queue_msg(q, iov, iovlen, fds, num_fds);
> +
> +	/*
> +	 * While sending the messages via the pthreadpool, we set the
> +	 * socket back to blocking mode. When the sendqueue becomes
> +	 * empty and we could attempt direct sends again, the
> +	 * finished-jobs-handler of the pthreadpool will set it back
> +	 * to non-blocking.
> +	 */
> +	ret = prepare_socket_nonblock(q->sock, false);
>  	if (ret != 0) {
>  		unix_dgram_send_queue_free(q);
>  		return ret;
> @@ -737,8 +759,16 @@ static int unix_dgram_sock(struct unix_dgram_ctx *ctx)
>  
>  static int unix_dgram_free(struct unix_dgram_ctx *ctx)
>  {
> -	if (ctx->send_queues != NULL) {
> -		return EBUSY;
> +	struct unix_dgram_send_queue *q;
> +
> +	for (q = ctx->send_queues; q != NULL;) {
> +		struct unix_dgram_send_queue *q_next = q->next;
> +
> +		if (q->msgs != NULL) {
> +			return EBUSY;
> +		}
> +		unix_dgram_send_queue_free(q);
> +		q = q_next;
>  	}
>  
>  	if (ctx->send_pool != NULL) {
> -- 
> 2.7.4
> 
> 
> From 7a84bef34ecbc01e30f82119b56fe1a4ac786960 Mon Sep 17 00:00:00 2001
> From: Ralph Boehme <slow at samba.org>
> Date: Fri, 19 Aug 2016 12:02:12 +0200
> Subject: [PATCH 8/8] unix_msg: add a test for dgram socket caching
> 
> Signed-off-by: Ralph Boehme <slow at samba.org>
> ---
>  source3/lib/unix_msg/tests.c | 30 ++++++++++++++++++++++++++++++
>  1 file changed, 30 insertions(+)
> 
> diff --git a/source3/lib/unix_msg/tests.c b/source3/lib/unix_msg/tests.c
> index 9a15f9d..0fb1b99 100644
> --- a/source3/lib/unix_msg/tests.c
> +++ b/source3/lib/unix_msg/tests.c
> @@ -126,6 +126,36 @@ int main(void)
>  
>  	expect_messages(ev, &state, 1);
>  
> +	printf("test send queue caching\n");
> +
> +	/*
> +	 * queues are cached for some time, so this tests sending
> +	 * still works after the cache expires and the queue was
> +	 * freed.
> +	 */
> +	sleep(SENDQ_CACHE_TIME + 1);
> +	ret = tevent_loop_once(ev);
> +	if (ret == -1) {
> +		fprintf(stderr, "tevent_loop_once failed: %s\n",
> +			strerror(errno));
> +		exit(1);
> +	}
> +
> +	msg = random();
> +	iov.iov_base = &msg;
> +	iov.iov_len = sizeof(msg);
> +	state.buf = &msg;
> +	state.buflen = sizeof(msg);
> +
> +	ret = unix_msg_send(ctx1, &addr2, &iov, 1, NULL, 0);
> +	if (ret != 0) {
> +		fprintf(stderr, "unix_msg_send failed: %s\n",
> +			strerror(ret));
> +		return 1;
> +	}
> +
> +	expect_messages(ev, &state, 1);
> +
>  	printf("sending six large, interleaved messages\n");
>  
>  	for (i=0; i<sizeof(buf); i++) {
> -- 
> 2.7.4
> 




More information about the samba-technical mailing list