[RFC] Performance improvements

Swen Schillig swen at vnet.ibm.com
Mon Jun 18 08:04:03 UTC 2018


Hi Jeremy

On Fri, 2018-06-15 at 10:38 -0700, Jeremy Allison wrote:
> On Fri, Jun 15, 2018 at 09:38:08AM +0200, Swen Schillig via samba-
> technical wrote:
> > Over the past few months I have created a good few patches with the
> > goal of improving Samba's performance in a clustered environment.
> > 
> > The patchset is modifying CTDB-code only but please keep reading...
> > 
> > The result of the changes is a huge reduction of the variation
> > whether
> > it is execution time or maximum number of operations at a time and
> > an overall performance improvement of >10%.
> > 
> > Attached are a few graphs showing the results of tests performed on
> > our
> > internal cluster with an LDX appliance.
> > The first three graphs are the results of one test stressing the
> > directory listing performance and the two SMB2_TP graphs were
> > performed
> > in a separate test using a 4 cluster node with 3000 client
> > connections
> > per node (so in total 12000).
> > 
> > The improvements are achieved by optimizing the memory management,
> > e.g. using memory pools or being more precise in the memory
> > requirement prediction and using a less iterative approach for
> > calculation. In addition, I tried to re-use allocated memory where
> > possible instead of free'ing and re-allocating the same.
> > 
> > Before I dump the code changes on the community I'd like to know
> > if there is any other test I need to perform and if the described
> > results are good enough for you to consider such non-functional
> > changes.
> > 
> > Thanks in advance for your comments and support.
> 
> Hi Swen,
> 
> These graphs look great ! I'd love to see the code please.
> 
> Depending on changes and intrusiveness we might want to
> split it into micro-commits (in case you haven't already
> done so).
> 

Attached is the patchset in question.
Even though the set consists of 10 patches, I wouldn't call them all
micro-patches... but they're as small as possible, I guess.

I didn't re-check whether they all comply 100% with our
latest coding guidelines, but I'm sure the upcoming discussion about the
patches will cover them all.

In addition, I have to admit that one or two patches in the
series aren't strictly required to achieve the results,
e.g. the patch for the tunables, which might interfere with Martin's and
Amitay's changes announced at XP. Another one or two patches could
possibly be merged into a single one.

Looking forward to your comments and suggestions.

Cheers Swen

-------------- next part --------------
From 147d80f314417ddda4404da6212266d421bfd40b Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Wed, 7 Mar 2018 13:54:45 +0100
Subject: [PATCH 01/10] ctdb: calculate queue input buffer size correctly

The queue's input buffer size is calculated in an iterative way.
This can result in some jumping back and forth and
several memory allocation and free cycles.
This is very time consuming and unnecessary, because the required
memory size can be calculated right away.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c | 59 ++++++++++++++++++++++-----------------------------
 1 file changed, 25 insertions(+), 34 deletions(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 3e732e8527d..41915c6ba5d 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -43,7 +43,6 @@ struct ctdb_buffer {
 	uint8_t *data;
 	uint32_t length;
 	uint32_t size;
-	uint32_t extend;
 };
 
 struct ctdb_queue_pkt {
@@ -103,16 +102,15 @@ static void queue_process(struct ctdb_queue *queue)
 		return;
 	}
 
+	/* Did we at least read the size into the buffer */
 	pkt_size = *(uint32_t *)queue->buffer.data;
 	if (pkt_size == 0) {
 		DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
 		goto failed;
 	}
 
+	/* the buffer doesn't contain the full packet, return to get the rest */
 	if (queue->buffer.length < pkt_size) {
-		if (pkt_size > queue->buffer_size) {
-			queue->buffer.extend = pkt_size;
-		}
 		return;
 	}
 
@@ -159,10 +157,8 @@ failed:
 */
 static void queue_io_read(struct ctdb_queue *queue)
 {
-	int num_ready = 0;
+	int num_ready;
 	ssize_t nread;
-	uint8_t *data;
-	int navail;
 
 	/* check how much data is available on the socket for immediately
 	   guaranteed nonblocking access.
@@ -178,41 +174,36 @@ static void queue_io_read(struct ctdb_queue *queue)
 		goto failed;
 	}
 
-	if (queue->buffer.data == NULL) {
-		/* starting fresh, allocate buf to read data */
-		queue->buffer.data = talloc_size(queue, queue->buffer_size);
-		if (queue->buffer.data == NULL) {
-			DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
-			goto failed;
-		}
-		queue->buffer.size = queue->buffer_size;
-	} else if (queue->buffer.extend > 0) {
-		/* extending buffer */
-		data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.extend);
+	if (num_ready > (queue->buffer.size - queue->buffer.length)) {
+		/* new size is
+		 * buffer.size + num_ready - avail
+		 * with avail = buffer.size - buffer.length
+		 * turns into
+		 * buffer.size + num_ready - (buffer.size - buffer.length)
+		 * or
+		 * buffer.size + num_ready - buffer.size + buffer.length
+		 * or shorter
+		 * num_ready + buffer.length
+		 */
+		int new_size = num_ready + queue->buffer.length;
+		uint8_t *data = talloc_realloc_size(queue, queue->buffer.data,
+						    new_size);
+
 		if (data == NULL) {
-			DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.extend));
+			D_ERR("read error realloc failed for %u\n", new_size);
 			goto failed;
 		}
 		queue->buffer.data = data;
-		queue->buffer.size = queue->buffer.extend;
-		queue->buffer.extend = 0;
+		queue->buffer.size = new_size;
 	}
 
-	navail = queue->buffer.size - queue->buffer.length;
-	if (num_ready > navail) {
-		num_ready = navail;
-	}
+	nread = sys_read(queue->fd, queue->buffer.data + queue->buffer.length, num_ready);
 
-	if (num_ready > 0) {
-		nread = sys_read(queue->fd,
-				 queue->buffer.data + queue->buffer.length,
-				 num_ready);
-		if (nread <= 0) {
-			DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
-			goto failed;
-		}
-		queue->buffer.length += nread;
+	if (nread <= 0) {
+		DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
+		goto failed;
 	}
+	queue->buffer.length += nread;
 
 	queue_process(queue);
 	return;
-- 
2.14.4


From 576be16561deab57a05e3ef3f27dad96a8c496a3 Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Wed, 7 Mar 2018 14:40:33 +0100
Subject: [PATCH 02/10] ctdb: replace talloc / memcpy by talloc_memdup

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 41915c6ba5d..21db7d6f639 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -115,12 +115,11 @@ static void queue_process(struct ctdb_queue *queue)
 	}
 
 	/* Extract complete packet */
-	data = talloc_size(queue, pkt_size);
+	data = talloc_memdup(queue, queue->buffer.data, pkt_size);
 	if (data == NULL) {
-		DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", pkt_size));
+		D_ERR("read error alloc failed for %u\n", pkt_size);
 		return;
 	}
-	memcpy(data, queue->buffer.data, pkt_size);
 
 	/* Shift packet out from buffer */
 	if (queue->buffer.length > pkt_size) {
-- 
2.14.4


From 7ec5d5b0cddb82b6eabde05086a16802d4f7d2c7 Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Mon, 12 Mar 2018 11:00:55 +0100
Subject: [PATCH 03/10] ctdb: remove queue destructor as it isn't needed
 anymore

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c | 13 -------------
 1 file changed, 13 deletions(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 21db7d6f639..4097c656df4 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -64,7 +64,6 @@ struct ctdb_queue {
 	size_t alignment;
 	void *private_data;
 	ctdb_queue_cb_fn_t callback;
-	bool *destroyed;
 	const char *name;
 	uint32_t buffer_size;
 };
@@ -402,17 +401,6 @@ int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
 	return 0;
 }
 
-/* If someone sets up this pointer, they want to know if the queue is freed */
-static int queue_destructor(struct ctdb_queue *queue)
-{
-	TALLOC_FREE(queue->buffer.data);
-	queue->buffer.length = 0;
-	queue->buffer.size = 0;
-	if (queue->destroyed != NULL)
-		*queue->destroyed = true;
-	return 0;
-}
-
 /*
   setup a packet queue on a socket
  */
@@ -445,7 +433,6 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
 			return NULL;
 		}
 	}
-	talloc_set_destructor(queue, queue_destructor);
 
 	queue->buffer_size = ctdb->tunable.queue_buffer_size;
 	/* In client code, ctdb->tunable is not initialized.
-- 
2.14.4


From 8195344b558843e27d1a807a683a673a58de0e8c Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Mon, 12 Mar 2018 16:16:13 +0100
Subject: [PATCH 04/10] ctdb: Introduce buffer.offset to avoid memmove

The memmove operation is quite expensive; therefore,
a new buffer attribute "offset" is introduced to support
optimized buffer processing.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c | 63 ++++++++++++++++++++++++---------------------------
 1 file changed, 30 insertions(+), 33 deletions(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 4097c656df4..a7b072ca20a 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -43,6 +43,7 @@ struct ctdb_buffer {
 	uint8_t *data;
 	uint32_t length;
 	uint32_t size;
+	uint32_t offset;
 };
 
 struct ctdb_queue_pkt {
@@ -95,17 +96,18 @@ static void queue_process_event(struct tevent_context *ev, struct tevent_immedia
 static void queue_process(struct ctdb_queue *queue)
 {
 	uint32_t pkt_size;
-	uint8_t *data;
+	uint8_t *data = NULL;
 
 	if (queue->buffer.length < sizeof(pkt_size)) {
 		return;
 	}
 
 	/* Did we at least read the size into the buffer */
-	pkt_size = *(uint32_t *)queue->buffer.data;
+	pkt_size = *(uint32_t *)(queue->buffer.data + queue->buffer.offset);
 	if (pkt_size == 0) {
 		DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
-		goto failed;
+		queue->callback(NULL, 0, queue->private_data);
+		return;
 	}
 
 	/* the buffer doesn't contain the full packet, return to get the rest */
@@ -114,18 +116,16 @@ static void queue_process(struct ctdb_queue *queue)
 	}
 
 	/* Extract complete packet */
-	data = talloc_memdup(queue, queue->buffer.data, pkt_size);
+	data = talloc_memdup(queue,
+			     queue->buffer.data + queue->buffer.offset,
+			     pkt_size);
+
 	if (data == NULL) {
 		D_ERR("read error alloc failed for %u\n", pkt_size);
 		return;
 	}
 
-	/* Shift packet out from buffer */
-	if (queue->buffer.length > pkt_size) {
-		memmove(queue->buffer.data,
-			queue->buffer.data + pkt_size,
-			queue->buffer.length - pkt_size);
-	}
+	queue->buffer.offset += pkt_size;
 	queue->buffer.length -= pkt_size;
 
 	if (queue->buffer.length > 0) {
@@ -135,20 +135,16 @@ static void queue_process(struct ctdb_queue *queue)
 	} else {
 		if (queue->buffer.size > queue->buffer_size) {
 			TALLOC_FREE(queue->buffer.data);
-			queue->buffer.size = 0;
+			memset(&queue->buffer, 0, sizeof(queue->buffer));
+		} else {
+			queue->buffer.offset = 0;
 		}
 	}
 
 	/* It is the responsibility of the callback to free 'data' */
 	queue->callback(data, pkt_size, queue->private_data);
-	return;
-
-failed:
-	queue->callback(NULL, 0, queue->private_data);
-
 }
 
-
 /*
   called when an incoming connection is readable
   This function MUST be safe for reentry via the queue callback!
@@ -157,6 +153,7 @@ static void queue_io_read(struct ctdb_queue *queue)
 {
 	int num_ready;
 	ssize_t nread;
+	struct ctdb_buffer *qb = &queue->buffer;
 
 	/* check how much data is available on the socket for immediately
 	   guaranteed nonblocking access.
@@ -169,10 +166,11 @@ static void queue_io_read(struct ctdb_queue *queue)
 	}
 	if (num_ready == 0) {
 		/* the descriptor has been closed */
-		goto failed;
+		queue->callback(NULL, 0, queue->private_data);
+		return;
 	}
 
-	if (num_ready > (queue->buffer.size - queue->buffer.length)) {
+	if (num_ready > (qb->size - (qb->length + qb->offset))) {
 		/* new size is
 		 * buffer.size + num_ready - avail
 		 * with avail = buffer.size - buffer.length
@@ -183,31 +181,30 @@ static void queue_io_read(struct ctdb_queue *queue)
 		 * or shorter
 		 * num_ready + buffer.length
 		 */
-		int new_size = num_ready + queue->buffer.length;
-		uint8_t *data = talloc_realloc_size(queue, queue->buffer.data,
-						    new_size);
 
-		if (data == NULL) {
+		int new_size = num_ready + qb->length + qb->offset;
+		qb->data = talloc_realloc_size(queue, qb->data, new_size);
+		qb->size = new_size;
+
+		if (qb->data == NULL) {
 			D_ERR("read error realloc failed for %u\n", new_size);
-			goto failed;
+			memset(qb, 0, sizeof(*qb));
+			queue->callback(NULL, 0, queue->private_data);
+			return;
 		}
-		queue->buffer.data = data;
-		queue->buffer.size = new_size;
 	}
 
-	nread = sys_read(queue->fd, queue->buffer.data + queue->buffer.length, num_ready);
+	nread = sys_read(queue->fd, qb->data + qb->length + qb->offset,
+			 num_ready);
 
 	if (nread <= 0) {
 		DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
-		goto failed;
+		queue->callback(NULL, 0, queue->private_data);
+		return;
 	}
-	queue->buffer.length += nread;
+	qb->length += nread;
 
 	queue_process(queue);
-	return;
-
-failed:
-	queue->callback(NULL, 0, queue->private_data);
 }
 
 
-- 
2.14.4


From d1f188a4e4bf66075bb27b8076df02ab4b5ba2f4 Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Mon, 12 Mar 2018 17:56:21 +0100
Subject: [PATCH 05/10] ctdb: Adding memory pool for queue callback

The received packet is copied into a newly allocated memory chunk for further
processing by the assigned callback. Once this is done, the memory is free'd.
This is repeated for each received packet making the memory allocation / free
an expensive task. To optimize this process, a memory pool is defined which
is sized identically to the queue's buffer and is therefore adjustable via
tunable.queue_buffer_size.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index a7b072ca20a..7991cdd5f3e 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -65,6 +65,7 @@ struct ctdb_queue {
 	size_t alignment;
 	void *private_data;
 	ctdb_queue_cb_fn_t callback;
+	char *callback_data_pool;
 	const char *name;
 	uint32_t buffer_size;
 };
@@ -116,7 +117,7 @@ static void queue_process(struct ctdb_queue *queue)
 	}
 
 	/* Extract complete packet */
-	data = talloc_memdup(queue,
+	data = talloc_memdup(queue->callback_data_pool,
 			     queue->buffer.data + queue->buffer.offset,
 			     pkt_size);
 
@@ -439,5 +440,8 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
 		queue->buffer_size = 1024;
 	}
 
+	queue->callback_data_pool = talloc_pool(queue, queue->buffer_size);
+	CTDB_NO_MEMORY_NULL(ctdb, queue->callback_data_pool);
+
 	return queue;
 }
-- 
2.14.4


From 67155d49cec4b7ebd17c75b450c9f39bcb1f3958 Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Wed, 14 Mar 2018 06:57:20 +0100
Subject: [PATCH 06/10] ctdb: Consolidate all ALIGNMENT instructions

Consolidate all ALIGNMENT instructions, which are really padding instructions,
into one place. This reduces the number of padding tasks for each
processing path to one and makes it easier to adapt to new requirements if
they arise.
One of them might be to remove the buffer padding altogether once we're sure
that it is neither required nor beneficial.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/common.h         |  4 +--
 ctdb/common/ctdb_io.c        | 75 +++++++++++++++++---------------------------
 ctdb/ib/ibw_ctdb_init.c      | 10 ------
 ctdb/include/ctdb_private.h  |  1 -
 ctdb/include/ctdb_protocol.h |  3 --
 ctdb/server/ctdb_client.c    | 18 ++++++-----
 ctdb/server/ctdb_daemon.c    | 16 ++++++----
 ctdb/tcp/ctdb_tcp.h          |  2 --
 ctdb/tcp/tcp_connect.c       |  9 ++++--
 ctdb/tcp/tcp_init.c          | 27 +++++-----------
 ctdb/tcp/tcp_io.c            |  6 ----
 11 files changed, 67 insertions(+), 104 deletions(-)

diff --git a/ctdb/common/common.h b/ctdb/common/common.h
index 02bb746c9b3..c1bd77617d5 100644
--- a/ctdb/common/common.h
+++ b/ctdb/common/common.h
@@ -32,10 +32,10 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length);
 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd);
 
 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
-				    TALLOC_CTX *mem_ctx, int fd, int alignment,
+				    TALLOC_CTX *mem_ctx, int fd,
 				    ctdb_queue_cb_fn_t callback,
 				    void *private_data, const char *fmt, ...)
-				    PRINTF_ATTRIBUTE(7,8);
+				    PRINTF_ATTRIBUTE(6,7);
 
 /* From common/ctdb_ltdb.c */
 
diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 7991cdd5f3e..0e4bc078cfa 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -50,7 +50,6 @@ struct ctdb_queue_pkt {
 	struct ctdb_queue_pkt *next, *prev;
 	uint8_t *data;
 	uint32_t length;
-	uint32_t full_length;
 	uint8_t buf[];
 };
 
@@ -62,7 +61,6 @@ struct ctdb_queue {
 	uint32_t out_queue_length;
 	struct tevent_fd *fde;
 	int fd;
-	size_t alignment;
 	void *private_data;
 	ctdb_queue_cb_fn_t callback;
 	char *callback_data_pool;
@@ -70,8 +68,6 @@ struct ctdb_queue {
 	uint32_t buffer_size;
 };
 
-
-
 int ctdb_queue_length(struct ctdb_queue *queue)
 {
 	return queue->out_queue_length;
@@ -226,6 +222,7 @@ static void queue_io_write(struct ctdb_queue *queue)
 	while (queue->out_queue) {
 		struct ctdb_queue_pkt *pkt = queue->out_queue;
 		ssize_t n;
+
 		if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
 			n = write(queue->fd, pkt->data, 1);
 		} else {
@@ -233,25 +230,30 @@ static void queue_io_write(struct ctdb_queue *queue)
 		}
 
 		if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
-			if (pkt->length != pkt->full_length) {
+			if (pkt->data != pkt->buf) {
 				/* partial packet sent - we have to drop it */
 				DLIST_REMOVE(queue->out_queue, pkt);
 				queue->out_queue_length--;
 				talloc_free(pkt);
 			}
-			talloc_free(queue->fde);
-			queue->fde = NULL;
 			queue->fd = -1;
+			TALLOC_FREE(queue->fde);
 			tevent_schedule_immediate(queue->im, queue->ctdb->ev,
 						  queue_dead, queue);
 			return;
 		}
-		if (n <= 0) return;
-		
+
+		if (n <= 0) {
+			/* if no data could be written or
+			 * any other error was received, return.
+			 */
+			return;
+		}
+
 		if (n != pkt->length) {
 			pkt->length -= n;
 			pkt->data += n;
-			return;
+			continue;
 		}
 
 		DLIST_REMOVE(queue->out_queue, pkt);
@@ -277,7 +279,6 @@ static void queue_io_handler(struct tevent_context *ev, struct tevent_fd *fde,
 	}
 }
 
-
 /*
   queue a packet for sending
 */
@@ -285,67 +286,50 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
 {
 	struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
 	struct ctdb_queue_pkt *pkt;
-	uint32_t length2, full_length;
 
 	/* If the queue does not have valid fd, no point queueing a packet */
 	if (queue->fd == -1) {
 		return 0;
 	}
 
-	if (queue->alignment) {
-		/* enforce the length and alignment rules from the tcp packet allocator */
-		length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
-		*(uint32_t *)data = length2;
-	} else {
-		length2 = length;
-	}
-
-	if (length2 != length) {
-		memset(data+length, 0, length2-length);
-	}
-
-	full_length = length2;
-	
 	/* if the queue is empty then try an immediate write, avoiding
 	   queue overhead. This relies on non-blocking sockets */
-	if (queue->out_queue == NULL && queue->fd != -1 &&
+	if (queue->out_queue == NULL &&
 	    !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
-		ssize_t n = write(queue->fd, data, length2);
+		ssize_t n = write(queue->fd, data, length);
 		if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
-			talloc_free(queue->fde);
-			queue->fde = NULL;
 			queue->fd = -1;
+			TALLOC_FREE(queue->fde);
 			tevent_schedule_immediate(queue->im, queue->ctdb->ev,
 						  queue_dead, queue);
-			/* yes, we report success, as the dead node is 
+			/* yes, we report success, as the dead node is
 			   handled via a separate event */
 			return 0;
 		}
+
 		if (n > 0) {
 			data += n;
-			length2 -= n;
+			length -= n;
+		}
+
+		if (length == 0) {
+			return 0;
 		}
-		if (length2 == 0) return 0;
 	}
 
-	pkt = talloc_size(
-		queue, offsetof(struct ctdb_queue_pkt, buf) + length2);
+	pkt = talloc_size(queue, offsetof(struct ctdb_queue_pkt, buf) + length);
 	CTDB_NO_MEMORY(queue->ctdb, pkt);
+
 	talloc_set_name_const(pkt, "struct ctdb_queue_pkt");
 
 	pkt->data = pkt->buf;
-	memcpy(pkt->data, data, length2);
-
-	pkt->length = length2;
-	pkt->full_length = full_length;
-
-	if (queue->out_queue == NULL && queue->fd != -1) {
-		TEVENT_FD_WRITEABLE(queue->fde);
-	}
+	memcpy(pkt->data, data, length);
+	pkt->length = length;
 
+	queue->out_queue_length++;
 	DLIST_ADD_END(queue->out_queue, pkt);
 
-	queue->out_queue_length++;
+	TEVENT_FD_WRITEABLE(queue->fde);
 
 	if (queue->ctdb->tunable.verbose_memory_names != 0) {
 		switch (hdr->operation) {
@@ -403,7 +387,7 @@ int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
   setup a packet queue on a socket
  */
 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
-				    TALLOC_CTX *mem_ctx, int fd, int alignment,
+				    TALLOC_CTX *mem_ctx, int fd,
 				    ctdb_queue_cb_fn_t callback,
 				    void *private_data, const char *fmt, ...)
 {
@@ -422,7 +406,6 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
 
 	queue->ctdb = ctdb;
 	queue->fd = fd;
-	queue->alignment = alignment;
 	queue->private_data = private_data;
 	queue->callback = callback;
 	if (fd != -1) {
diff --git a/ctdb/ib/ibw_ctdb_init.c b/ctdb/ib/ibw_ctdb_init.c
index 7e77ec08031..4e5044e2e86 100644
--- a/ctdb/ib/ibw_ctdb_init.c
+++ b/ctdb/ib/ibw_ctdb_init.c
@@ -191,15 +191,6 @@ static void ctdb_ibw_restart(struct ctdb_node *node)
 	DEBUG(DEBUG_ALERT,("WARNING: method restart is not yet implemented for IB\n"));
 }
 
-/*
- * transport packet allocator - allows transport to control memory for packets
- */
-static void *ctdb_ibw_allocate_pkt(TALLOC_CTX *mem_ctx, size_t size)
-{
-	/* TODO: use ibw_alloc_send_buf instead... */
-	return talloc_size(mem_ctx, size);
-}
-
 #ifdef __NOTDEF__
 
 static int ctdb_ibw_stop(struct ctdb_context *cctx)
@@ -217,7 +208,6 @@ static const struct ctdb_methods ctdb_ibw_methods = {
 	.start     = ctdb_ibw_start,
 	.queue_pkt = ctdb_ibw_queue_pkt,
 	.add_node = ctdb_ibw_add_node,
-	.allocate_pkt = ctdb_ibw_allocate_pkt,
 	.restart      = ctdb_ibw_restart,
 
 //	.stop = ctdb_ibw_stop
diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h
index 39e32540e00..6e640d7a581 100644
--- a/ctdb/include/ctdb_private.h
+++ b/ctdb/include/ctdb_private.h
@@ -100,7 +100,6 @@ struct ctdb_methods {
 	int (*add_node)(struct ctdb_node *); /* setup a new node */	
 	int (*connect_node)(struct ctdb_node *); /* connect to node */
 	int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length);
-	void *(*allocate_pkt)(TALLOC_CTX *mem_ctx, size_t );
 	void (*shutdown)(struct ctdb_context *); /* shutdown transport */
 	void (*restart)(struct ctdb_node *); /* stop and restart the connection */
 };
diff --git a/ctdb/include/ctdb_protocol.h b/ctdb/include/ctdb_protocol.h
index 7b6014fdff9..34d6066affc 100644
--- a/ctdb/include/ctdb_protocol.h
+++ b/ctdb/include/ctdb_protocol.h
@@ -26,9 +26,6 @@
 /* define ctdb port number */
 #define CTDB_PORT 4379
 
-/* we must align packets to ensure ctdb works on all architectures (eg. sparc) */
-#define CTDB_DS_ALIGNMENT 8
-
 /*
   structure passed to a ctdb call backend function
 */
diff --git a/ctdb/server/ctdb_client.c b/ctdb/server/ctdb_client.c
index 67c89dee3d0..472fe2835d1 100644
--- a/ctdb/server/ctdb_client.c
+++ b/ctdb/server/ctdb_client.c
@@ -41,6 +41,8 @@
 #include "common/common.h"
 #include "common/logging.h"
 
+#include "protocol/protocol_api.h"
+
 /*
   allocate a packet for use in client<->daemon communication
  */
@@ -50,13 +52,12 @@ struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
 					    size_t length, size_t slength,
 					    const char *type)
 {
-	int size;
-	struct ctdb_req_header *hdr;
+	struct ctdb_req_header *hdr = NULL;
 
 	length = MAX(length, slength);
-	size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
 
-	hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
+	ctdb_allocate_pkt(mem_ctx, length, (uint8_t **)&hdr, &length);
+
 	if (hdr == NULL) {
 		DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
 			 operation, (unsigned)length));
@@ -308,9 +309,12 @@ int ctdb_socket_connect(struct ctdb_context *ctdb)
 
 	set_close_on_exec(ctdb->daemon.sd);
 
-	ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
-					      CTDB_DS_ALIGNMENT,
-					      ctdb_client_read_cb, ctdb, "to-ctdbd");
+	ctdb->daemon.queue = ctdb_queue_setup(ctdb,
+					      ctdb,
+					      ctdb->daemon.sd,
+					      ctdb_client_read_cb,
+					      ctdb,
+					      "to-ctdbd");
 	return 0;
 }
 
diff --git a/ctdb/server/ctdb_daemon.c b/ctdb/server/ctdb_daemon.c
index 3b06972d030..11ef1c7d900 100644
--- a/ctdb/server/ctdb_daemon.c
+++ b/ctdb/server/ctdb_daemon.c
@@ -48,6 +48,8 @@
 #include "common/pidfile.h"
 #include "common/sock_io.h"
 
+#include "protocol/protocol_api.h"
+
 struct ctdb_client_pid_list {
 	struct ctdb_client_pid_list *next, *prev;
 	struct ctdb_context *ctdb;
@@ -987,9 +989,13 @@ static void ctdb_accept_client(struct tevent_context *ev,
 
 	DLIST_ADD(ctdb->client_pids, client_pid);
 
-	client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
-					 ctdb_daemon_read_cb, client,
-					 "client-%u", client->pid);
+	client->queue = ctdb_queue_setup(ctdb,
+					 client,
+					 fd,
+					 ctdb_daemon_read_cb,
+					 client,
+					 "client-%u",
+					 client->pid);
 
 	talloc_set_destructor(client, ctdb_client_destructor);
 	talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
@@ -1406,11 +1412,9 @@ struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
 						 size_t length, size_t slength,
 						 const char *type)
 {
-	int size;
 	struct ctdb_req_header *hdr;
 
 	length = MAX(length, slength);
-	size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
 
 	if (ctdb->methods == NULL) {
 		DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
@@ -1418,7 +1422,7 @@ struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
 		return NULL;
 	}
 
-	hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
+	ctdb_allocate_pkt(mem_ctx, length, (uint8_t **)&hdr, &length);
 	if (hdr == NULL) {
 		DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
 			 operation, (unsigned)length));
diff --git a/ctdb/tcp/ctdb_tcp.h b/ctdb/tcp/ctdb_tcp.h
index 0a998c94da4..d90bbcbbfeb 100644
--- a/ctdb/tcp/ctdb_tcp.h
+++ b/ctdb/tcp/ctdb_tcp.h
@@ -55,6 +55,4 @@ void ctdb_tcp_read_cb(uint8_t *data, size_t cnt, void *args);
 void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private_data);
 void ctdb_tcp_stop_connection(struct ctdb_node *node);
 
-#define CTDB_TCP_ALIGNMENT 8
-
 #endif /* _CTDB_TCP_H */
diff --git a/ctdb/tcp/tcp_connect.c b/ctdb/tcp/tcp_connect.c
index 13452a5e83b..b01db1f3645 100644
--- a/ctdb/tcp/tcp_connect.c
+++ b/ctdb/tcp/tcp_connect.c
@@ -284,8 +284,13 @@ static void ctdb_listen_event(struct tevent_context *ev, struct tevent_fd *fde,
 				      strerror(errno)));
 	}
 
-	in->queue = ctdb_queue_setup(ctdb, in, in->fd, CTDB_TCP_ALIGNMENT,
-				     ctdb_tcp_read_cb, in, "ctdbd-%s", ctdb_addr_to_str(&addr));
+	in->queue = ctdb_queue_setup(ctdb,
+				     in,
+				     in->fd,
+				     ctdb_tcp_read_cb,
+				     in,
+				     "ctdbd-%s",
+				     ctdb_addr_to_str(&addr));
 }
 
 
diff --git a/ctdb/tcp/tcp_init.c b/ctdb/tcp/tcp_init.c
index b6083666e18..81e659efb18 100644
--- a/ctdb/tcp/tcp_init.c
+++ b/ctdb/tcp/tcp_init.c
@@ -47,7 +47,7 @@ static int tnode_destructor(struct ctdb_tcp_node *tnode)
 }
 
 /*
-  initialise tcp portion of a ctdb node 
+  initialise tcp portion of a ctdb node
 */
 static int ctdb_tcp_add_node(struct ctdb_node *node)
 {
@@ -59,9 +59,13 @@ static int ctdb_tcp_add_node(struct ctdb_node *node)
 	node->private_data = tnode;
 	talloc_set_destructor(tnode, tnode_destructor);
 
-	tnode->out_queue = ctdb_queue_setup(node->ctdb, node, tnode->fd, CTDB_TCP_ALIGNMENT,
-					    ctdb_tcp_tnode_cb, node, "to-node-%s", node->name);
-	
+	tnode->out_queue = ctdb_queue_setup(node->ctdb,
+					    node,
+					    tnode->fd,
+					    ctdb_tcp_tnode_cb, node,
+					    "to-node-%s",
+					    node->name);
+
 	return 0;
 }
 
@@ -159,27 +163,12 @@ static int ctdb_tcp_start(struct ctdb_context *ctdb)
 	return 0;
 }
 
-
-/*
-  transport packet allocator - allows transport to control memory for packets
-*/
-static void *ctdb_tcp_allocate_pkt(TALLOC_CTX *mem_ctx, size_t size)
-{
-	/* tcp transport needs to round to 8 byte alignment to ensure
-	   that we can use a length header and 64 bit elements in
-	   structures */
-	size = (size+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
-	return talloc_size(mem_ctx, size);
-}
-
-
 static const struct ctdb_methods ctdb_tcp_methods = {
 	.initialise   = ctdb_tcp_initialise,
 	.start        = ctdb_tcp_start,
 	.queue_pkt    = ctdb_tcp_queue_pkt,
 	.add_node     = ctdb_tcp_add_node,
 	.connect_node = ctdb_tcp_connect_node,
-	.allocate_pkt = ctdb_tcp_allocate_pkt,
 	.shutdown     = ctdb_tcp_shutdown,
 	.restart      = ctdb_tcp_restart,
 };
diff --git a/ctdb/tcp/tcp_io.c b/ctdb/tcp/tcp_io.c
index 0eb8e25eea3..bad17c9d0f9 100644
--- a/ctdb/tcp/tcp_io.c
+++ b/ctdb/tcp/tcp_io.c
@@ -50,12 +50,6 @@ void ctdb_tcp_read_cb(uint8_t *data, size_t cnt, void *args)
 		goto failed;
 	}
 
-	if (cnt & (CTDB_TCP_ALIGNMENT-1)) {
-		DEBUG(DEBUG_ALERT,(__location__ " Length 0x%x not multiple of alignment\n", 
-			 (unsigned)cnt));
-		goto failed;
-	}
-
 	if (hdr->ctdb_magic != CTDB_MAGIC) {
 		DEBUG(DEBUG_ALERT,(__location__ " Non CTDB packet 0x%x rejected\n", 
 			 hdr->ctdb_magic));
-- 
2.14.4


From 70165cd5ed274b96c4cea0a914cf64a59c188125 Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Thu, 15 Mar 2018 13:51:34 +0100
Subject: [PATCH 07/10] ctdb: Introduce tunable parameter for queue mempool
 size

Like the queue's buffer size, the size of the queue's mempool, which is used
to allocate temporary memory for processing input packets and for queueing
packets to be sent, should be configurable externally.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c            | 14 ++++++++++----
 ctdb/common/tunable.c            |  2 ++
 ctdb/protocol/protocol.h         |  1 +
 ctdb/protocol/protocol_types.c   |  4 ++++
 ctdb/tests/src/protocol_common.c |  2 ++
 5 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 0e4bc078cfa..148e5349051 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -38,6 +38,8 @@
 #include "common/logging.h"
 #include "common/common.h"
 
+#define DEFAULT_MEMPOOL_SIZE 8192
+
 /* structures for packet queueing - see common/ctdb_io.c */
 struct ctdb_buffer {
 	uint8_t *data;
@@ -63,7 +65,7 @@ struct ctdb_queue {
 	int fd;
 	void *private_data;
 	ctdb_queue_cb_fn_t callback;
-	char *callback_data_pool;
+	char *data_pool;
 	const char *name;
 	uint32_t buffer_size;
 };
@@ -113,7 +115,7 @@ static void queue_process(struct ctdb_queue *queue)
 	}
 
 	/* Extract complete packet */
-	data = talloc_memdup(queue->callback_data_pool,
+	data = talloc_memdup(queue->data_pool,
 			     queue->buffer.data + queue->buffer.offset,
 			     pkt_size);
 
@@ -392,6 +394,7 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
 				    void *private_data, const char *fmt, ...)
 {
 	struct ctdb_queue *queue;
+	const uint32_t mempool_size = ctdb->tunable.queue_mempool_size;
 	va_list ap;
 
 	queue = talloc_zero(mem_ctx, struct ctdb_queue);
@@ -423,8 +426,11 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
 		queue->buffer_size = 1024;
 	}
 
-	queue->callback_data_pool = talloc_pool(queue, queue->buffer_size);
-	CTDB_NO_MEMORY_NULL(ctdb, queue->callback_data_pool);
+	queue->data_pool = talloc_pool(queue,
+				       mempool_size ? mempool_size :
+						      DEFAULT_MEMPOOL_SIZE);
+
+	CTDB_NO_MEMORY_NULL(ctdb, queue->data_pool);
 
 	return queue;
 }
diff --git a/ctdb/common/tunable.c b/ctdb/common/tunable.c
index 14f6828bd15..22cda6cb61c 100644
--- a/ctdb/common/tunable.c
+++ b/ctdb/common/tunable.c
@@ -153,6 +153,8 @@ static struct {
 		offsetof(struct ctdb_tunable_list, rec_buffer_size_limit) },
 	{ "QueueBufferSize", 1024, false,
 		offsetof(struct ctdb_tunable_list, queue_buffer_size) },
+	{ "QueueMempoolSize", 8192, false,
+		offsetof(struct ctdb_tunable_list, queue_mempool_size) },
 	{ "IPAllocAlgorithm", 2, false,
 		offsetof(struct ctdb_tunable_list, ip_alloc_algorithm) },
 	{ "AllowMixedVersions", 0, false,
diff --git a/ctdb/protocol/protocol.h b/ctdb/protocol/protocol.h
index cb807e3b939..61967a61048 100644
--- a/ctdb/protocol/protocol.h
+++ b/ctdb/protocol/protocol.h
@@ -654,6 +654,7 @@ struct ctdb_tunable_list {
 	uint32_t lock_processes_per_db;
 	uint32_t rec_buffer_size_limit;
 	uint32_t queue_buffer_size;
+	uint32_t queue_mempool_size;
 	uint32_t ip_alloc_algorithm;
 	uint32_t allow_mixed_versions;
 };
diff --git a/ctdb/protocol/protocol_types.c b/ctdb/protocol/protocol_types.c
index 416d4843b74..ee0270915f5 100644
--- a/ctdb/protocol/protocol_types.c
+++ b/ctdb/protocol/protocol_types.c
@@ -2559,6 +2559,7 @@ size_t ctdb_tunable_list_len(struct ctdb_tunable_list *in)
 		ctdb_uint32_len(&in->lock_processes_per_db) +
 		ctdb_uint32_len(&in->rec_buffer_size_limit) +
 		ctdb_uint32_len(&in->queue_buffer_size) +
+		ctdb_uint32_len(&in->queue_mempool_size) +
 		ctdb_uint32_len(&in->ip_alloc_algorithm) +
 		ctdb_uint32_len(&in->allow_mixed_versions);
 }
@@ -2748,6 +2749,9 @@ void ctdb_tunable_list_push(struct ctdb_tunable_list *in, uint8_t *buf,
 	ctdb_uint32_push(&in->queue_buffer_size, buf+offset, &np);
 	offset += np;
 
+	ctdb_uint32_push(&in->queue_mempool_size, buf+offset, &np);
+	offset += np;
+
 	ctdb_uint32_push(&in->ip_alloc_algorithm, buf+offset, &np);
 	offset += np;
 
diff --git a/ctdb/tests/src/protocol_common.c b/ctdb/tests/src/protocol_common.c
index c06272db6cc..6b25cb8748c 100644
--- a/ctdb/tests/src/protocol_common.c
+++ b/ctdb/tests/src/protocol_common.c
@@ -846,6 +846,7 @@ void fill_ctdb_tunable_list(TALLOC_CTX *mem_ctx, struct ctdb_tunable_list *p)
 	p->lock_processes_per_db = rand32();
 	p->rec_buffer_size_limit = rand32();
 	p->queue_buffer_size = rand32();
+	p->queue_mempool_size = rand32();
 	p->ip_alloc_algorithm = rand32();
 	p->allow_mixed_versions = rand32();
 }
@@ -913,6 +914,7 @@ void verify_ctdb_tunable_list(struct ctdb_tunable_list *p1,
 	assert(p1->lock_processes_per_db == p2->lock_processes_per_db);
 	assert(p1->rec_buffer_size_limit == p2->rec_buffer_size_limit);
 	assert(p1->queue_buffer_size == p2->queue_buffer_size);
+	assert(p1->queue_mempool_size == p2->queue_mempool_size);
 	assert(p1->ip_alloc_algorithm == p2->ip_alloc_algorithm);
 	assert(p1->allow_mixed_versions == p2->allow_mixed_versions);
 }
-- 
2.14.4


From 84e889a3e2610c6c142678ab2ac18c9f1d0c8848 Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Thu, 15 Mar 2018 13:56:13 +0100
Subject: [PATCH 08/10] ctdb: Increase queue's default buffer size to 8k

Modifying the queue's default buffer size to 8k increases
the likelihood that bigger packets can be read without memory re-allocation.
In addition, this makes it possible to receive/store more packets
without always having to memcopy areas within the buffer.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/tunable.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ctdb/common/tunable.c b/ctdb/common/tunable.c
index 22cda6cb61c..404d0157047 100644
--- a/ctdb/common/tunable.c
+++ b/ctdb/common/tunable.c
@@ -151,7 +151,7 @@ static struct {
 		offsetof(struct ctdb_tunable_list, lock_processes_per_db) },
 	{ "RecBufferSizeLimit", 1000*1000, false,
 		offsetof(struct ctdb_tunable_list, rec_buffer_size_limit) },
-	{ "QueueBufferSize", 1024, false,
+	{ "QueueBufferSize", 8192, false,
 		offsetof(struct ctdb_tunable_list, queue_buffer_size) },
 	{ "QueueMempoolSize", 8192, false,
 		offsetof(struct ctdb_tunable_list, queue_mempool_size) },
-- 
2.14.4


From 29ff3fa74960778acb3bdd7b0acd5ba278f2f32a Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Thu, 15 Mar 2018 14:04:52 +0100
Subject: [PATCH 09/10] ctdb: Enabling ctdb_queue_send to use memory from
 data_pool

ctdb_queue_send must queue data packets if it wasn't able to send the data
right away. In order to do this, the temporary memory requirements can now be
fulfilled by allocating memory from the queue's data_pool.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 148e5349051..b1766ccc2a7 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -319,7 +319,9 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
 		}
 	}
 
-	pkt = talloc_size(queue, offsetof(struct ctdb_queue_pkt, buf) + length);
+	pkt = talloc_size(queue->data_pool,
+			  offsetof(struct ctdb_queue_pkt, buf) + length);
+
 	CTDB_NO_MEMORY(queue->ctdb, pkt);
 
 	talloc_set_name_const(pkt, "struct ctdb_queue_pkt");
-- 
2.14.4


From 07afed6dcd88446d62b791e7975bca04d7483f4f Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Mon, 19 Mar 2018 17:58:46 +0100
Subject: [PATCH 10/10] ctdb: Optimize receive queue processing.

Process all complete packets that are already in the receive queue in
one pass, instead of scheduling an additional event for the remaining data.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c | 37 +++++++++++++++----------------------
 1 file changed, 15 insertions(+), 22 deletions(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index b1766ccc2a7..f7fb4ba5e7e 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -75,16 +75,6 @@ int ctdb_queue_length(struct ctdb_queue *queue)
 	return queue->out_queue_length;
 }
 
-static void queue_process(struct ctdb_queue *queue);
-
-static void queue_process_event(struct tevent_context *ev, struct tevent_immediate *im,
-				void *private_data)
-{
-	struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
-
-	queue_process(queue);
-}
-
 /*
  * This function is used to process data in queue buffer.
  *
@@ -97,6 +87,7 @@ static void queue_process(struct ctdb_queue *queue)
 	uint32_t pkt_size;
 	uint8_t *data = NULL;
 
+process_pkt:
 	if (queue->buffer.length < sizeof(pkt_size)) {
 		return;
 	}
@@ -127,21 +118,22 @@ static void queue_process(struct ctdb_queue *queue)
 	queue->buffer.offset += pkt_size;
 	queue->buffer.length -= pkt_size;
 
-	if (queue->buffer.length > 0) {
-		/* There is more data to be processed, schedule an event */
-		tevent_schedule_immediate(queue->im, queue->ctdb->ev,
-					  queue_process_event, queue);
-	} else {
-		if (queue->buffer.size > queue->buffer_size) {
-			TALLOC_FREE(queue->buffer.data);
-			memset(&queue->buffer, 0, sizeof(queue->buffer));
-		} else {
-			queue->buffer.offset = 0;
-		}
+	if ((queue->buffer.length == 0) &&
+	    (queue->buffer.size > queue->buffer_size)) {
+		/* If the buffer is empty and larger than its
+		 * configured size, free it so that no
+		 * unneeded memory is kept.
+		 */
+		TALLOC_FREE(queue->buffer.data);
+		memset(&queue->buffer, 0, sizeof(queue->buffer));
 	}
 
 	/* It is the responsibility of the callback to free 'data' */
 	queue->callback(data, pkt_size, queue->private_data);
+
+	if (queue->buffer.length > 0) {
+		goto process_pkt;
+	}
 }
 
 /*
@@ -181,7 +173,8 @@ static void queue_io_read(struct ctdb_queue *queue)
 		 * num_ready + buffer.length
 		 */
 
-		int new_size = num_ready + qb->length + qb->offset;
+		int new_size = MAX(num_ready + qb->length + qb->offset,
+				   queue->buffer_size);
 		qb->data = talloc_realloc_size(queue, qb->data, new_size);
 		qb->size = new_size;
 
-- 
2.14.4



More information about the samba-technical mailing list