[RFC] Performance improvements

Swen Schillig swen at vnet.ibm.com
Thu Jun 21 06:39:39 UTC 2018


Hi Jeremy

On Wed, 2018-06-20 at 15:09 -0700, Jeremy Allison wrote:
> On Wed, Jun 20, 2018 at 10:51:58PM +0200, Swen Schillig wrote:
> > 
> > Therefore, I would really like you to have a look at the code again
> > and
> > "verify" if the changes are as evil as you claim.
> 
> You do have some integer wrap and variable type
> work to do first - so why not do that and then resubmit
> with the "questionable" patch as the last entry
> so it can be reviewed carefully individually ?
> 

Thanks for all your support....and here we go.

Cheers Swen
-------------- next part --------------
From e72411119c72dadf328eee0ade939fee10263877 Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Wed, 7 Mar 2018 13:54:45 +0100
Subject: [PATCH 01/10] ctdb: calculate queue input buffer size correctly

The size of the queue's input buffer is determined iteratively.
This can result in some jumping back and forth and
several memory allocation and free cycles.
This is very time-consuming and unnecessary, because the required
memory size can be calculated right away.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c | 59 ++++++++++++++++++++++-----------------------------
 1 file changed, 25 insertions(+), 34 deletions(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 3e732e8527d..13782778582 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -43,7 +43,6 @@ struct ctdb_buffer {
 	uint8_t *data;
 	uint32_t length;
 	uint32_t size;
-	uint32_t extend;
 };
 
 struct ctdb_queue_pkt {
@@ -103,16 +102,15 @@ static void queue_process(struct ctdb_queue *queue)
 		return;
 	}
 
+	/* Did we at least read the size into the buffer */
 	pkt_size = *(uint32_t *)queue->buffer.data;
 	if (pkt_size == 0) {
 		DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
 		goto failed;
 	}
 
+	/* the buffer doesn't contain the full packet, return to get the rest */
 	if (queue->buffer.length < pkt_size) {
-		if (pkt_size > queue->buffer_size) {
-			queue->buffer.extend = pkt_size;
-		}
 		return;
 	}
 
@@ -159,10 +157,8 @@ failed:
 */
 static void queue_io_read(struct ctdb_queue *queue)
 {
-	int num_ready = 0;
+	size_t num_ready;
 	ssize_t nread;
-	uint8_t *data;
-	int navail;
 
 	/* check how much data is available on the socket for immediately
 	   guaranteed nonblocking access.
@@ -178,41 +174,36 @@ static void queue_io_read(struct ctdb_queue *queue)
 		goto failed;
 	}
 
-	if (queue->buffer.data == NULL) {
-		/* starting fresh, allocate buf to read data */
-		queue->buffer.data = talloc_size(queue, queue->buffer_size);
-		if (queue->buffer.data == NULL) {
-			DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
-			goto failed;
-		}
-		queue->buffer.size = queue->buffer_size;
-	} else if (queue->buffer.extend > 0) {
-		/* extending buffer */
-		data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.extend);
+	if (num_ready > (queue->buffer.size - queue->buffer.length)) {
+		/* new size is
+		 * buffer.size + num_ready - avail
+		 * with avail = buffer.size - buffer.length
+		 * turns into
+		 * buffer.size + num_ready - (buffer.size - buffer.length)
+		 * or
+		 * buffer.size + num_ready - buffer.size + buffer.length
+		 * or shorter
+		 * num_ready + buffer.length
+		 */
+		size_t new_size = num_ready + queue->buffer.length;
+		uint8_t *data = talloc_realloc_size(queue, queue->buffer.data,
+						    new_size);
+
 		if (data == NULL) {
-			DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.extend));
+			D_ERR("read error realloc failed for %u\n", new_size);
 			goto failed;
 		}
 		queue->buffer.data = data;
-		queue->buffer.size = queue->buffer.extend;
-		queue->buffer.extend = 0;
+		queue->buffer.size = new_size;
 	}
 
-	navail = queue->buffer.size - queue->buffer.length;
-	if (num_ready > navail) {
-		num_ready = navail;
-	}
+	nread = sys_read(queue->fd, queue->buffer.data + queue->buffer.length, num_ready);
 
-	if (num_ready > 0) {
-		nread = sys_read(queue->fd,
-				 queue->buffer.data + queue->buffer.length,
-				 num_ready);
-		if (nread <= 0) {
-			DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
-			goto failed;
-		}
-		queue->buffer.length += nread;
+	if (nread <= 0) {
+		DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
+		goto failed;
 	}
+	queue->buffer.length += nread;
 
 	queue_process(queue);
 	return;
-- 
2.14.4


From 913720a8cdac2e5b79df4f4899e712e0c7c798e2 Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Wed, 7 Mar 2018 14:40:33 +0100
Subject: [PATCH 02/10] ctdb: replace talloc / memcpy by talloc_memdup

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 13782778582..47da72afa28 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -115,12 +115,11 @@ static void queue_process(struct ctdb_queue *queue)
 	}
 
 	/* Extract complete packet */
-	data = talloc_size(queue, pkt_size);
+	data = talloc_memdup(queue, queue->buffer.data, pkt_size);
 	if (data == NULL) {
-		DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", pkt_size));
+		D_ERR("read error alloc failed for %u\n", pkt_size);
 		return;
 	}
-	memcpy(data, queue->buffer.data, pkt_size);
 
 	/* Shift packet out from buffer */
 	if (queue->buffer.length > pkt_size) {
-- 
2.14.4


From fd6f616a5ab9ff4476dc8c2cc8fb34c518d2529e Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Mon, 12 Mar 2018 11:00:55 +0100
Subject: [PATCH 03/10] ctdb: remove queue destructor as it isn't needed
 anymore

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c | 13 -------------
 1 file changed, 13 deletions(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 47da72afa28..70946914325 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -64,7 +64,6 @@ struct ctdb_queue {
 	size_t alignment;
 	void *private_data;
 	ctdb_queue_cb_fn_t callback;
-	bool *destroyed;
 	const char *name;
 	uint32_t buffer_size;
 };
@@ -402,17 +401,6 @@ int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
 	return 0;
 }
 
-/* If someone sets up this pointer, they want to know if the queue is freed */
-static int queue_destructor(struct ctdb_queue *queue)
-{
-	TALLOC_FREE(queue->buffer.data);
-	queue->buffer.length = 0;
-	queue->buffer.size = 0;
-	if (queue->destroyed != NULL)
-		*queue->destroyed = true;
-	return 0;
-}
-
 /*
   setup a packet queue on a socket
  */
@@ -445,7 +433,6 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
 			return NULL;
 		}
 	}
-	talloc_set_destructor(queue, queue_destructor);
 
 	queue->buffer_size = ctdb->tunable.queue_buffer_size;
 	/* In client code, ctdb->tunable is not initialized.
-- 
2.14.4


From bd87e84646ba54ccfb4a82103f9f5a60a8998a0f Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Mon, 12 Mar 2018 16:16:13 +0100
Subject: [PATCH 04/10] ctdb: Introduce buffer.offset to avoid memmove

The memmove operation is quite expensive; therefore,
a new buffer attribute "offset" is introduced to support
optimized buffer processing.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c | 87 ++++++++++++++++++++++++++++++++-------------------
 1 file changed, 55 insertions(+), 32 deletions(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 70946914325..cf7c8312755 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -43,6 +43,7 @@ struct ctdb_buffer {
 	uint8_t *data;
 	uint32_t length;
 	uint32_t size;
+	uint32_t offset;
 };
 
 struct ctdb_queue_pkt {
@@ -95,17 +96,18 @@ static void queue_process_event(struct tevent_context *ev, struct tevent_immedia
 static void queue_process(struct ctdb_queue *queue)
 {
 	uint32_t pkt_size;
-	uint8_t *data;
+	uint8_t *data = NULL;
 
 	if (queue->buffer.length < sizeof(pkt_size)) {
 		return;
 	}
 
 	/* Did we at least read the size into the buffer */
-	pkt_size = *(uint32_t *)queue->buffer.data;
+	pkt_size = *(uint32_t *)(queue->buffer.data + queue->buffer.offset);
 	if (pkt_size == 0) {
 		DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
-		goto failed;
+		queue->callback(NULL, 0, queue->private_data);
+		return;
 	}
 
 	/* the buffer doesn't contain the full packet, return to get the rest */
@@ -114,20 +116,27 @@ static void queue_process(struct ctdb_queue *queue)
 	}
 
 	/* Extract complete packet */
-	data = talloc_memdup(queue, queue->buffer.data, pkt_size);
+	data = talloc_memdup(queue,
+			     queue->buffer.data + queue->buffer.offset,
+			     pkt_size);
+
 	if (data == NULL) {
 		D_ERR("read error alloc failed for %u\n", pkt_size);
 		return;
 	}
 
-	/* Shift packet out from buffer */
-	if (queue->buffer.length > pkt_size) {
-		memmove(queue->buffer.data,
-			queue->buffer.data + pkt_size,
-			queue->buffer.length - pkt_size);
-	}
+	queue->buffer.offset += pkt_size;
 	queue->buffer.length -= pkt_size;
 
+	if (queue->buffer.offset < pkt_size ||
+	    queue->buffer.offset > queue->buffer.size) {
+		D_ERR("buffer offset overflow\n");
+		TALLOC_FREE(queue->buffer.data);
+		memset(&queue->buffer, 0, sizeof(queue->buffer));
+		queue->callback(NULL, 0, queue->private_data);
+		return;
+	}
+
 	if (queue->buffer.length > 0) {
 		/* There is more data to be processed, schedule an event */
 		tevent_schedule_immediate(queue->im, queue->ctdb->ev,
@@ -135,20 +144,16 @@ static void queue_process(struct ctdb_queue *queue)
 	} else {
 		if (queue->buffer.size > queue->buffer_size) {
 			TALLOC_FREE(queue->buffer.data);
-			queue->buffer.size = 0;
+			memset(&queue->buffer, 0, sizeof(queue->buffer));
+		} else {
+			queue->buffer.offset = 0;
 		}
 	}
 
 	/* It is the responsibility of the callback to free 'data' */
 	queue->callback(data, pkt_size, queue->private_data);
-	return;
-
-failed:
-	queue->callback(NULL, 0, queue->private_data);
-
 }
 
-
 /*
   called when an incoming connection is readable
   This function MUST be safe for reentry via the queue callback!
@@ -157,6 +162,7 @@ static void queue_io_read(struct ctdb_queue *queue)
 {
 	size_t num_ready;
 	ssize_t nread;
+	struct ctdb_buffer *qb = &queue->buffer;
 
 	/* check how much data is available on the socket for immediately
 	   guaranteed nonblocking access.
@@ -169,10 +175,11 @@ static void queue_io_read(struct ctdb_queue *queue)
 	}
 	if (num_ready == 0) {
 		/* the descriptor has been closed */
-		goto failed;
+		queue->callback(NULL, 0, queue->private_data);
+		return;
 	}
 
-	if (num_ready > (queue->buffer.size - queue->buffer.length)) {
+	if (num_ready > (qb->size - (qb->length + qb->offset))) {
 		/* new size is
 		 * buffer.size + num_ready - avail
 		 * with avail = buffer.size - buffer.length
@@ -183,31 +190,47 @@ static void queue_io_read(struct ctdb_queue *queue)
 		 * or shorter
 		 * num_ready + buffer.length
 		 */
-		size_t new_size = num_ready + queue->buffer.length;
-		uint8_t *data = talloc_realloc_size(queue, queue->buffer.data,
-						    new_size);
+
+		size_t new_size = MAX(num_ready + qb->length + qb->offset,
+				      queue->buffer_size);
+		if (new_size < num_ready) {
+			/* with qb->length and qb->offset both being guaranteed
+			 * to be smaller than MAX_TALLOC_SIZE which in turn is
+			 * only a fraction of MAX_INT, the only possibility for
+			 * new_size being smaller than num_ready is an overflow
+			 */
+			D_ERR("read error num_ready overflow %u\n", num_ready);
+			TALLOC_FREE(qb->data);
+			memset(qb, 0, sizeof(*qb));
+			queue->callback(NULL, 0, queue->private_data);
+			return;
+		}
+
+		uint8_t *data = talloc_realloc_size(queue, qb->data, new_size);
 
 		if (data == NULL) {
 			D_ERR("read error realloc failed for %u\n", new_size);
-			goto failed;
+			TALLOC_FREE(qb->data);
+			memset(qb, 0, sizeof(*qb));
+			queue->callback(NULL, 0, queue->private_data);
+			return;
 		}
-		queue->buffer.data = data;
-		queue->buffer.size = new_size;
+
+		qb->data = data;
+		qb->size = new_size;
 	}
 
-	nread = sys_read(queue->fd, queue->buffer.data + queue->buffer.length, num_ready);
+	nread = sys_read(queue->fd, qb->data + qb->length + qb->offset,
+			 num_ready);
 
 	if (nread <= 0) {
 		DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
-		goto failed;
+		queue->callback(NULL, 0, queue->private_data);
+		return;
 	}
-	queue->buffer.length += nread;
+	qb->length += nread;
 
 	queue_process(queue);
-	return;
-
-failed:
-	queue->callback(NULL, 0, queue->private_data);
 }
 
 
-- 
2.14.4


From 3dd8f12f1bedaf34e280993372e04a05ec4f483e Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Mon, 12 Mar 2018 17:56:21 +0100
Subject: [PATCH 05/10] ctdb: Adding memory pool for queue callback

The received packet is copied into a newly allocated memory chunk for further
processing by the assigned callback. Once this is done, the memory is freed.
This is repeated for each received packet, making the memory allocation / free
an expensive task. To optimize this process, a memory pool is defined which
is sized identically to the queue's buffer and is therefore adjustable via
tunable.queue_buffer_size.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index cf7c8312755..1185d380b23 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -65,6 +65,7 @@ struct ctdb_queue {
 	size_t alignment;
 	void *private_data;
 	ctdb_queue_cb_fn_t callback;
+	char *callback_data_pool;
 	const char *name;
 	uint32_t buffer_size;
 };
@@ -116,7 +117,7 @@ static void queue_process(struct ctdb_queue *queue)
 	}
 
 	/* Extract complete packet */
-	data = talloc_memdup(queue,
+	data = talloc_memdup(queue->callback_data_pool,
 			     queue->buffer.data + queue->buffer.offset,
 			     pkt_size);
 
@@ -465,5 +466,8 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
 		queue->buffer_size = 1024;
 	}
 
+	queue->callback_data_pool = talloc_pool(queue, queue->buffer_size);
+	CTDB_NO_MEMORY_NULL(ctdb, queue->callback_data_pool);
+
 	return queue;
 }
-- 
2.14.4


From 118f4e99e094fe43c33bbc5240fef3903c76949f Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Wed, 14 Mar 2018 06:57:20 +0100
Subject: [PATCH 06/10] ctdb: Consolidate all ALIGNMENT instructions

Consolidate all ALIGNMENT instructions, which are really padding instructions,
into one place. This reduces the number of padding tasks for each
processing path to one and makes it easier to adapt to new requirements if
they arise. One of them might be to remove the buffer padding altogether once
we're sure that it is neither required nor beneficial.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/common.h         |  4 +--
 ctdb/common/ctdb_io.c        | 75 +++++++++++++++++---------------------------
 ctdb/ib/ibw_ctdb_init.c      | 10 ------
 ctdb/include/ctdb_private.h  |  1 -
 ctdb/include/ctdb_protocol.h |  3 --
 ctdb/server/ctdb_client.c    | 18 ++++++-----
 ctdb/server/ctdb_daemon.c    | 16 ++++++----
 ctdb/tcp/ctdb_tcp.h          |  2 --
 ctdb/tcp/tcp_connect.c       |  9 ++++--
 ctdb/tcp/tcp_init.c          | 27 +++++-----------
 ctdb/tcp/tcp_io.c            |  6 ----
 11 files changed, 67 insertions(+), 104 deletions(-)

diff --git a/ctdb/common/common.h b/ctdb/common/common.h
index 02bb746c9b3..c1bd77617d5 100644
--- a/ctdb/common/common.h
+++ b/ctdb/common/common.h
@@ -32,10 +32,10 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length);
 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd);
 
 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
-				    TALLOC_CTX *mem_ctx, int fd, int alignment,
+				    TALLOC_CTX *mem_ctx, int fd,
 				    ctdb_queue_cb_fn_t callback,
 				    void *private_data, const char *fmt, ...)
-				    PRINTF_ATTRIBUTE(7,8);
+				    PRINTF_ATTRIBUTE(6,7);
 
 /* From common/ctdb_ltdb.c */
 
diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 1185d380b23..7450ca52f91 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -50,7 +50,6 @@ struct ctdb_queue_pkt {
 	struct ctdb_queue_pkt *next, *prev;
 	uint8_t *data;
 	uint32_t length;
-	uint32_t full_length;
 	uint8_t buf[];
 };
 
@@ -62,7 +61,6 @@ struct ctdb_queue {
 	uint32_t out_queue_length;
 	struct tevent_fd *fde;
 	int fd;
-	size_t alignment;
 	void *private_data;
 	ctdb_queue_cb_fn_t callback;
 	char *callback_data_pool;
@@ -70,8 +68,6 @@ struct ctdb_queue {
 	uint32_t buffer_size;
 };
 
-
-
 int ctdb_queue_length(struct ctdb_queue *queue)
 {
 	return queue->out_queue_length;
@@ -252,6 +248,7 @@ static void queue_io_write(struct ctdb_queue *queue)
 	while (queue->out_queue) {
 		struct ctdb_queue_pkt *pkt = queue->out_queue;
 		ssize_t n;
+
 		if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
 			n = write(queue->fd, pkt->data, 1);
 		} else {
@@ -259,25 +256,30 @@ static void queue_io_write(struct ctdb_queue *queue)
 		}
 
 		if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
-			if (pkt->length != pkt->full_length) {
+			if (pkt->data != pkt->buf) {
 				/* partial packet sent - we have to drop it */
 				DLIST_REMOVE(queue->out_queue, pkt);
 				queue->out_queue_length--;
 				talloc_free(pkt);
 			}
-			talloc_free(queue->fde);
-			queue->fde = NULL;
 			queue->fd = -1;
+			TALLOC_FREE(queue->fde);
 			tevent_schedule_immediate(queue->im, queue->ctdb->ev,
 						  queue_dead, queue);
 			return;
 		}
-		if (n <= 0) return;
-		
+
+		if (n <= 0) {
+			/* if no data could be written or
+			 * any other error was received, return.
+			 */
+			return;
+		}
+
 		if (n != pkt->length) {
 			pkt->length -= n;
 			pkt->data += n;
-			return;
+			continue;
 		}
 
 		DLIST_REMOVE(queue->out_queue, pkt);
@@ -303,7 +305,6 @@ static void queue_io_handler(struct tevent_context *ev, struct tevent_fd *fde,
 	}
 }
 
-
 /*
   queue a packet for sending
 */
@@ -311,67 +312,50 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
 {
 	struct ctdb_req_header *hdr = (struct ctdb_req_header *)data;
 	struct ctdb_queue_pkt *pkt;
-	uint32_t length2, full_length;
 
 	/* If the queue does not have valid fd, no point queueing a packet */
 	if (queue->fd == -1) {
 		return 0;
 	}
 
-	if (queue->alignment) {
-		/* enforce the length and alignment rules from the tcp packet allocator */
-		length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
-		*(uint32_t *)data = length2;
-	} else {
-		length2 = length;
-	}
-
-	if (length2 != length) {
-		memset(data+length, 0, length2-length);
-	}
-
-	full_length = length2;
-	
 	/* if the queue is empty then try an immediate write, avoiding
 	   queue overhead. This relies on non-blocking sockets */
-	if (queue->out_queue == NULL && queue->fd != -1 &&
+	if (queue->out_queue == NULL &&
 	    !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
-		ssize_t n = write(queue->fd, data, length2);
+		ssize_t n = write(queue->fd, data, length);
 		if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
-			talloc_free(queue->fde);
-			queue->fde = NULL;
 			queue->fd = -1;
+			TALLOC_FREE(queue->fde);
 			tevent_schedule_immediate(queue->im, queue->ctdb->ev,
 						  queue_dead, queue);
-			/* yes, we report success, as the dead node is 
+			/* yes, we report success, as the dead node is
 			   handled via a separate event */
 			return 0;
 		}
+
 		if (n > 0) {
 			data += n;
-			length2 -= n;
+			length -= n;
+		}
+
+		if (length == 0) {
+			return 0;
 		}
-		if (length2 == 0) return 0;
 	}
 
-	pkt = talloc_size(
-		queue, offsetof(struct ctdb_queue_pkt, buf) + length2);
+	pkt = talloc_size(queue, offsetof(struct ctdb_queue_pkt, buf) + length);
 	CTDB_NO_MEMORY(queue->ctdb, pkt);
+
 	talloc_set_name_const(pkt, "struct ctdb_queue_pkt");
 
 	pkt->data = pkt->buf;
-	memcpy(pkt->data, data, length2);
-
-	pkt->length = length2;
-	pkt->full_length = full_length;
-
-	if (queue->out_queue == NULL && queue->fd != -1) {
-		TEVENT_FD_WRITEABLE(queue->fde);
-	}
+	memcpy(pkt->data, data, length);
+	pkt->length = length;
 
+	queue->out_queue_length++;
 	DLIST_ADD_END(queue->out_queue, pkt);
 
-	queue->out_queue_length++;
+	TEVENT_FD_WRITEABLE(queue->fde);
 
 	if (queue->ctdb->tunable.verbose_memory_names != 0) {
 		switch (hdr->operation) {
@@ -429,7 +413,7 @@ int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
   setup a packet queue on a socket
  */
 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
-				    TALLOC_CTX *mem_ctx, int fd, int alignment,
+				    TALLOC_CTX *mem_ctx, int fd,
 				    ctdb_queue_cb_fn_t callback,
 				    void *private_data, const char *fmt, ...)
 {
@@ -448,7 +432,6 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
 
 	queue->ctdb = ctdb;
 	queue->fd = fd;
-	queue->alignment = alignment;
 	queue->private_data = private_data;
 	queue->callback = callback;
 	if (fd != -1) {
diff --git a/ctdb/ib/ibw_ctdb_init.c b/ctdb/ib/ibw_ctdb_init.c
index 7e77ec08031..4e5044e2e86 100644
--- a/ctdb/ib/ibw_ctdb_init.c
+++ b/ctdb/ib/ibw_ctdb_init.c
@@ -191,15 +191,6 @@ static void ctdb_ibw_restart(struct ctdb_node *node)
 	DEBUG(DEBUG_ALERT,("WARNING: method restart is not yet implemented for IB\n"));
 }
 
-/*
- * transport packet allocator - allows transport to control memory for packets
- */
-static void *ctdb_ibw_allocate_pkt(TALLOC_CTX *mem_ctx, size_t size)
-{
-	/* TODO: use ibw_alloc_send_buf instead... */
-	return talloc_size(mem_ctx, size);
-}
-
 #ifdef __NOTDEF__
 
 static int ctdb_ibw_stop(struct ctdb_context *cctx)
@@ -217,7 +208,6 @@ static const struct ctdb_methods ctdb_ibw_methods = {
 	.start     = ctdb_ibw_start,
 	.queue_pkt = ctdb_ibw_queue_pkt,
 	.add_node = ctdb_ibw_add_node,
-	.allocate_pkt = ctdb_ibw_allocate_pkt,
 	.restart      = ctdb_ibw_restart,
 
 //	.stop = ctdb_ibw_stop
diff --git a/ctdb/include/ctdb_private.h b/ctdb/include/ctdb_private.h
index 39e32540e00..6e640d7a581 100644
--- a/ctdb/include/ctdb_private.h
+++ b/ctdb/include/ctdb_private.h
@@ -100,7 +100,6 @@ struct ctdb_methods {
 	int (*add_node)(struct ctdb_node *); /* setup a new node */	
 	int (*connect_node)(struct ctdb_node *); /* connect to node */
 	int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length);
-	void *(*allocate_pkt)(TALLOC_CTX *mem_ctx, size_t );
 	void (*shutdown)(struct ctdb_context *); /* shutdown transport */
 	void (*restart)(struct ctdb_node *); /* stop and restart the connection */
 };
diff --git a/ctdb/include/ctdb_protocol.h b/ctdb/include/ctdb_protocol.h
index 7b6014fdff9..34d6066affc 100644
--- a/ctdb/include/ctdb_protocol.h
+++ b/ctdb/include/ctdb_protocol.h
@@ -26,9 +26,6 @@
 /* define ctdb port number */
 #define CTDB_PORT 4379
 
-/* we must align packets to ensure ctdb works on all architectures (eg. sparc) */
-#define CTDB_DS_ALIGNMENT 8
-
 /*
   structure passed to a ctdb call backend function
 */
diff --git a/ctdb/server/ctdb_client.c b/ctdb/server/ctdb_client.c
index 67c89dee3d0..472fe2835d1 100644
--- a/ctdb/server/ctdb_client.c
+++ b/ctdb/server/ctdb_client.c
@@ -41,6 +41,8 @@
 #include "common/common.h"
 #include "common/logging.h"
 
+#include "protocol/protocol_api.h"
+
 /*
   allocate a packet for use in client<->daemon communication
  */
@@ -50,13 +52,12 @@ struct ctdb_req_header *_ctdbd_allocate_pkt(struct ctdb_context *ctdb,
 					    size_t length, size_t slength,
 					    const char *type)
 {
-	int size;
-	struct ctdb_req_header *hdr;
+	struct ctdb_req_header *hdr = NULL;
 
 	length = MAX(length, slength);
-	size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
 
-	hdr = (struct ctdb_req_header *)talloc_zero_size(mem_ctx, size);
+	ctdb_allocate_pkt(mem_ctx, length, (uint8_t **)&hdr, &length);
+
 	if (hdr == NULL) {
 		DEBUG(DEBUG_ERR,("Unable to allocate packet for operation %u of length %u\n",
 			 operation, (unsigned)length));
@@ -308,9 +309,12 @@ int ctdb_socket_connect(struct ctdb_context *ctdb)
 
 	set_close_on_exec(ctdb->daemon.sd);
 
-	ctdb->daemon.queue = ctdb_queue_setup(ctdb, ctdb, ctdb->daemon.sd,
-					      CTDB_DS_ALIGNMENT,
-					      ctdb_client_read_cb, ctdb, "to-ctdbd");
+	ctdb->daemon.queue = ctdb_queue_setup(ctdb,
+					      ctdb,
+					      ctdb->daemon.sd,
+					      ctdb_client_read_cb,
+					      ctdb,
+					      "to-ctdbd");
 	return 0;
 }
 
diff --git a/ctdb/server/ctdb_daemon.c b/ctdb/server/ctdb_daemon.c
index 3b06972d030..11ef1c7d900 100644
--- a/ctdb/server/ctdb_daemon.c
+++ b/ctdb/server/ctdb_daemon.c
@@ -48,6 +48,8 @@
 #include "common/pidfile.h"
 #include "common/sock_io.h"
 
+#include "protocol/protocol_api.h"
+
 struct ctdb_client_pid_list {
 	struct ctdb_client_pid_list *next, *prev;
 	struct ctdb_context *ctdb;
@@ -987,9 +989,13 @@ static void ctdb_accept_client(struct tevent_context *ev,
 
 	DLIST_ADD(ctdb->client_pids, client_pid);
 
-	client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
-					 ctdb_daemon_read_cb, client,
-					 "client-%u", client->pid);
+	client->queue = ctdb_queue_setup(ctdb,
+					 client,
+					 fd,
+					 ctdb_daemon_read_cb,
+					 client,
+					 "client-%u",
+					 client->pid);
 
 	talloc_set_destructor(client, ctdb_client_destructor);
 	talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
@@ -1406,11 +1412,9 @@ struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
 						 size_t length, size_t slength,
 						 const char *type)
 {
-	int size;
 	struct ctdb_req_header *hdr;
 
 	length = MAX(length, slength);
-	size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
 
 	if (ctdb->methods == NULL) {
 		DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
@@ -1418,7 +1422,7 @@ struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
 		return NULL;
 	}
 
-	hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
+	ctdb_allocate_pkt(mem_ctx, length, (uint8_t **)&hdr, &length);
 	if (hdr == NULL) {
 		DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
 			 operation, (unsigned)length));
diff --git a/ctdb/tcp/ctdb_tcp.h b/ctdb/tcp/ctdb_tcp.h
index 0a998c94da4..d90bbcbbfeb 100644
--- a/ctdb/tcp/ctdb_tcp.h
+++ b/ctdb/tcp/ctdb_tcp.h
@@ -55,6 +55,4 @@ void ctdb_tcp_read_cb(uint8_t *data, size_t cnt, void *args);
 void ctdb_tcp_tnode_cb(uint8_t *data, size_t cnt, void *private_data);
 void ctdb_tcp_stop_connection(struct ctdb_node *node);
 
-#define CTDB_TCP_ALIGNMENT 8
-
 #endif /* _CTDB_TCP_H */
diff --git a/ctdb/tcp/tcp_connect.c b/ctdb/tcp/tcp_connect.c
index 13452a5e83b..b01db1f3645 100644
--- a/ctdb/tcp/tcp_connect.c
+++ b/ctdb/tcp/tcp_connect.c
@@ -284,8 +284,13 @@ static void ctdb_listen_event(struct tevent_context *ev, struct tevent_fd *fde,
 				      strerror(errno)));
 	}
 
-	in->queue = ctdb_queue_setup(ctdb, in, in->fd, CTDB_TCP_ALIGNMENT,
-				     ctdb_tcp_read_cb, in, "ctdbd-%s", ctdb_addr_to_str(&addr));
+	in->queue = ctdb_queue_setup(ctdb,
+				     in,
+				     in->fd,
+				     ctdb_tcp_read_cb,
+				     in,
+				     "ctdbd-%s",
+				     ctdb_addr_to_str(&addr));
 }
 
 
diff --git a/ctdb/tcp/tcp_init.c b/ctdb/tcp/tcp_init.c
index b6083666e18..81e659efb18 100644
--- a/ctdb/tcp/tcp_init.c
+++ b/ctdb/tcp/tcp_init.c
@@ -47,7 +47,7 @@ static int tnode_destructor(struct ctdb_tcp_node *tnode)
 }
 
 /*
-  initialise tcp portion of a ctdb node 
+  initialise tcp portion of a ctdb node
 */
 static int ctdb_tcp_add_node(struct ctdb_node *node)
 {
@@ -59,9 +59,13 @@ static int ctdb_tcp_add_node(struct ctdb_node *node)
 	node->private_data = tnode;
 	talloc_set_destructor(tnode, tnode_destructor);
 
-	tnode->out_queue = ctdb_queue_setup(node->ctdb, node, tnode->fd, CTDB_TCP_ALIGNMENT,
-					    ctdb_tcp_tnode_cb, node, "to-node-%s", node->name);
-	
+	tnode->out_queue = ctdb_queue_setup(node->ctdb,
+					    node,
+					    tnode->fd,
+					    ctdb_tcp_tnode_cb, node,
+					    "to-node-%s",
+					    node->name);
+
 	return 0;
 }
 
@@ -159,27 +163,12 @@ static int ctdb_tcp_start(struct ctdb_context *ctdb)
 	return 0;
 }
 
-
-/*
-  transport packet allocator - allows transport to control memory for packets
-*/
-static void *ctdb_tcp_allocate_pkt(TALLOC_CTX *mem_ctx, size_t size)
-{
-	/* tcp transport needs to round to 8 byte alignment to ensure
-	   that we can use a length header and 64 bit elements in
-	   structures */
-	size = (size+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
-	return talloc_size(mem_ctx, size);
-}
-
-
 static const struct ctdb_methods ctdb_tcp_methods = {
 	.initialise   = ctdb_tcp_initialise,
 	.start        = ctdb_tcp_start,
 	.queue_pkt    = ctdb_tcp_queue_pkt,
 	.add_node     = ctdb_tcp_add_node,
 	.connect_node = ctdb_tcp_connect_node,
-	.allocate_pkt = ctdb_tcp_allocate_pkt,
 	.shutdown     = ctdb_tcp_shutdown,
 	.restart      = ctdb_tcp_restart,
 };
diff --git a/ctdb/tcp/tcp_io.c b/ctdb/tcp/tcp_io.c
index 0eb8e25eea3..bad17c9d0f9 100644
--- a/ctdb/tcp/tcp_io.c
+++ b/ctdb/tcp/tcp_io.c
@@ -50,12 +50,6 @@ void ctdb_tcp_read_cb(uint8_t *data, size_t cnt, void *args)
 		goto failed;
 	}
 
-	if (cnt & (CTDB_TCP_ALIGNMENT-1)) {
-		DEBUG(DEBUG_ALERT,(__location__ " Length 0x%x not multiple of alignment\n", 
-			 (unsigned)cnt));
-		goto failed;
-	}
-
 	if (hdr->ctdb_magic != CTDB_MAGIC) {
 		DEBUG(DEBUG_ALERT,(__location__ " Non CTDB packet 0x%x rejected\n", 
 			 hdr->ctdb_magic));
-- 
2.14.4


From 699877ba6a48f1a867ab83f6b89addebdc1550d3 Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Thu, 15 Mar 2018 13:51:34 +0100
Subject: [PATCH 07/10] ctdb: Introduce tunable parameter for queue mempool
 size

Like the queue's buffer size, the size of the queue's mempool, which is used
to allocate temporary memory to process input packets and to queue packets
for sending, should be configurable externally.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c            | 14 ++++++++++----
 ctdb/common/tunable.c            |  2 ++
 ctdb/protocol/protocol.h         |  1 +
 ctdb/protocol/protocol_types.c   |  4 ++++
 ctdb/tests/src/protocol_common.c |  2 ++
 5 files changed, 19 insertions(+), 4 deletions(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 7450ca52f91..9e024634d6b 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -38,6 +38,8 @@
 #include "common/logging.h"
 #include "common/common.h"
 
+#define DEFAULT_MEMPOOL_SIZE 8192
+
 /* structures for packet queueing - see common/ctdb_io.c */
 struct ctdb_buffer {
 	uint8_t *data;
@@ -63,7 +65,7 @@ struct ctdb_queue {
 	int fd;
 	void *private_data;
 	ctdb_queue_cb_fn_t callback;
-	char *callback_data_pool;
+	char *data_pool;
 	const char *name;
 	uint32_t buffer_size;
 };
@@ -113,7 +115,7 @@ static void queue_process(struct ctdb_queue *queue)
 	}
 
 	/* Extract complete packet */
-	data = talloc_memdup(queue->callback_data_pool,
+	data = talloc_memdup(queue->data_pool,
 			     queue->buffer.data + queue->buffer.offset,
 			     pkt_size);
 
@@ -418,6 +420,7 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
 				    void *private_data, const char *fmt, ...)
 {
 	struct ctdb_queue *queue;
+	const uint32_t mempool_size = ctdb->tunable.queue_mempool_size;
 	va_list ap;
 
 	queue = talloc_zero(mem_ctx, struct ctdb_queue);
@@ -449,8 +452,11 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
 		queue->buffer_size = 1024;
 	}
 
-	queue->callback_data_pool = talloc_pool(queue, queue->buffer_size);
-	CTDB_NO_MEMORY_NULL(ctdb, queue->callback_data_pool);
+	queue->data_pool = talloc_pool(queue,
+				       mempool_size ? mempool_size :
+						      DEFAULT_MEMPOOL_SIZE);
+
+	CTDB_NO_MEMORY_NULL(ctdb, queue->data_pool);
 
 	return queue;
 }
diff --git a/ctdb/common/tunable.c b/ctdb/common/tunable.c
index 14f6828bd15..22cda6cb61c 100644
--- a/ctdb/common/tunable.c
+++ b/ctdb/common/tunable.c
@@ -153,6 +153,8 @@ static struct {
 		offsetof(struct ctdb_tunable_list, rec_buffer_size_limit) },
 	{ "QueueBufferSize", 1024, false,
 		offsetof(struct ctdb_tunable_list, queue_buffer_size) },
+	{ "QueueMempoolSize", 8192, false,
+		offsetof(struct ctdb_tunable_list, queue_mempool_size) },
 	{ "IPAllocAlgorithm", 2, false,
 		offsetof(struct ctdb_tunable_list, ip_alloc_algorithm) },
 	{ "AllowMixedVersions", 0, false,
diff --git a/ctdb/protocol/protocol.h b/ctdb/protocol/protocol.h
index cb807e3b939..61967a61048 100644
--- a/ctdb/protocol/protocol.h
+++ b/ctdb/protocol/protocol.h
@@ -654,6 +654,7 @@ struct ctdb_tunable_list {
 	uint32_t lock_processes_per_db;
 	uint32_t rec_buffer_size_limit;
 	uint32_t queue_buffer_size;
+	uint32_t queue_mempool_size;
 	uint32_t ip_alloc_algorithm;
 	uint32_t allow_mixed_versions;
 };
diff --git a/ctdb/protocol/protocol_types.c b/ctdb/protocol/protocol_types.c
index 416d4843b74..ee0270915f5 100644
--- a/ctdb/protocol/protocol_types.c
+++ b/ctdb/protocol/protocol_types.c
@@ -2559,6 +2559,7 @@ size_t ctdb_tunable_list_len(struct ctdb_tunable_list *in)
 		ctdb_uint32_len(&in->lock_processes_per_db) +
 		ctdb_uint32_len(&in->rec_buffer_size_limit) +
 		ctdb_uint32_len(&in->queue_buffer_size) +
+		ctdb_uint32_len(&in->queue_mempool_size) +
 		ctdb_uint32_len(&in->ip_alloc_algorithm) +
 		ctdb_uint32_len(&in->allow_mixed_versions);
 }
@@ -2748,6 +2749,9 @@ void ctdb_tunable_list_push(struct ctdb_tunable_list *in, uint8_t *buf,
 	ctdb_uint32_push(&in->queue_buffer_size, buf+offset, &np);
 	offset += np;
 
+	ctdb_uint32_push(&in->queue_mempool_size, buf+offset, &np);
+	offset += np;
+
 	ctdb_uint32_push(&in->ip_alloc_algorithm, buf+offset, &np);
 	offset += np;
 
diff --git a/ctdb/tests/src/protocol_common.c b/ctdb/tests/src/protocol_common.c
index c06272db6cc..6b25cb8748c 100644
--- a/ctdb/tests/src/protocol_common.c
+++ b/ctdb/tests/src/protocol_common.c
@@ -846,6 +846,7 @@ void fill_ctdb_tunable_list(TALLOC_CTX *mem_ctx, struct ctdb_tunable_list *p)
 	p->lock_processes_per_db = rand32();
 	p->rec_buffer_size_limit = rand32();
 	p->queue_buffer_size = rand32();
+	p->queue_mempool_size = rand32();
 	p->ip_alloc_algorithm = rand32();
 	p->allow_mixed_versions = rand32();
 }
@@ -913,6 +914,7 @@ void verify_ctdb_tunable_list(struct ctdb_tunable_list *p1,
 	assert(p1->lock_processes_per_db == p2->lock_processes_per_db);
 	assert(p1->rec_buffer_size_limit == p2->rec_buffer_size_limit);
 	assert(p1->queue_buffer_size == p2->queue_buffer_size);
+	assert(p1->queue_mempool_size == p2->queue_mempool_size);
 	assert(p1->ip_alloc_algorithm == p2->ip_alloc_algorithm);
 	assert(p1->allow_mixed_versions == p2->allow_mixed_versions);
 }
-- 
2.14.4


From 6e6da7ab5c4dc9c80e4bd9fd307192cc7936d02d Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Thu, 15 Mar 2018 13:56:13 +0100
Subject: [PATCH 08/10] ctdb: Increase queue's default buffer size to 8k

Modify the queue's default buffer size to 8k, increasing
the likelihood that bigger packets can be read without memory re-allocation.
In addition, this makes it possible to receive/store more packets
without always having to memcopy areas within the buffer.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/tunable.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/ctdb/common/tunable.c b/ctdb/common/tunable.c
index 22cda6cb61c..404d0157047 100644
--- a/ctdb/common/tunable.c
+++ b/ctdb/common/tunable.c
@@ -151,7 +151,7 @@ static struct {
 		offsetof(struct ctdb_tunable_list, lock_processes_per_db) },
 	{ "RecBufferSizeLimit", 1000*1000, false,
 		offsetof(struct ctdb_tunable_list, rec_buffer_size_limit) },
-	{ "QueueBufferSize", 1024, false,
+	{ "QueueBufferSize", 8192, false,
 		offsetof(struct ctdb_tunable_list, queue_buffer_size) },
 	{ "QueueMempoolSize", 8192, false,
 		offsetof(struct ctdb_tunable_list, queue_mempool_size) },
-- 
2.14.4


From 1ebf3ea9fa6b41111ddbf3aaeea369677b9785be Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Thu, 15 Mar 2018 14:04:52 +0100
Subject: [PATCH 09/10] ctdb: Enabling ctdb_queue_send to use memory from
 data_pool

ctdb_queue_send must queue data packets if it wasn't able to send the data
right away. In order to do this, the temporary memory requirements can now be
fulfilled by allocating memory from the queue's data_pool.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 9e024634d6b..bb90fdee375 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -345,7 +345,9 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
 		}
 	}
 
-	pkt = talloc_size(queue, offsetof(struct ctdb_queue_pkt, buf) + length);
+	pkt = talloc_size(queue->data_pool,
+			  offsetof(struct ctdb_queue_pkt, buf) + length);
+
 	CTDB_NO_MEMORY(queue->ctdb, pkt);
 
 	talloc_set_name_const(pkt, "struct ctdb_queue_pkt");
-- 
2.14.4


From 05e0bfc39949544bec59a18ae642c23c93fc4fed Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Mon, 19 Mar 2018 17:58:46 +0100
Subject: [PATCH 10/10] ctdb: Optimize receive queue processing.

Process all complete packets that are already in the receive buffer
in one pass, instead of scheduling an additional immediate event
whenever more data remains after a packet has been handled.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c | 34 +++++++++++++---------------------
 1 file changed, 13 insertions(+), 21 deletions(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index bb90fdee375..158091ea92f 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -75,16 +75,6 @@ int ctdb_queue_length(struct ctdb_queue *queue)
 	return queue->out_queue_length;
 }
 
-static void queue_process(struct ctdb_queue *queue);
-
-static void queue_process_event(struct tevent_context *ev, struct tevent_immediate *im,
-				void *private_data)
-{
-	struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
-
-	queue_process(queue);
-}
-
 /*
  * This function is used to process data in queue buffer.
  *
@@ -97,6 +87,7 @@ static void queue_process(struct ctdb_queue *queue)
 	uint32_t pkt_size;
 	uint8_t *data = NULL;
 
+process_pkt:
 	if (queue->buffer.length < sizeof(pkt_size)) {
 		return;
 	}
@@ -136,21 +127,22 @@ static void queue_process(struct ctdb_queue *queue)
 		return;
 	}
 
-	if (queue->buffer.length > 0) {
-		/* There is more data to be processed, schedule an event */
-		tevent_schedule_immediate(queue->im, queue->ctdb->ev,
-					  queue_process_event, queue);
-	} else {
-		if (queue->buffer.size > queue->buffer_size) {
-			TALLOC_FREE(queue->buffer.data);
-			memset(&queue->buffer, 0, sizeof(queue->buffer));
-		} else {
-			queue->buffer.offset = 0;
-		}
+	if ((queue->buffer.length == 0) &&
+	    (queue->buffer.size > queue->buffer_size)) {
+		/* if the buffer is empty and more than its
+		 * configured size, free the buffer
+		 * making sure no un-required memory is kept.
+		 */
+		TALLOC_FREE(queue->buffer.data);
+		memset(&queue->buffer, 0, sizeof(queue->buffer));
 	}
 
 	/* It is the responsibility of the callback to free 'data' */
 	queue->callback(data, pkt_size, queue->private_data);
+
+	if (queue->buffer.length > 0) {
+		goto process_pkt;
+	}
 }
 
 /*
-- 
2.14.4



More information about the samba-technical mailing list