[Fwd: Re: ctdb: Adding memory pool for queue callback]

swen swen at linux.ibm.com
Wed Nov 28 07:56:30 UTC 2018


I got a second reviewer for the attached 2 patches, woohoo.

Thanks again Martin, Christof.

The commit messages are slightly updated (typo, more info).

Posted here now for final processing... pushing, maybe :-)

Cheers Swen
-------- Forwarded Message --------
From: Martin Schwenke <martin at meltin.net>
To: Swen Schillig <swen at vnet.ibm.com>
Cc: samba-technical <samba-technical at lists.samba.org>
Subject: Re: ctdb: Adding memory pool for queue callback
Date: Tue, 9 Oct 2018 15:01:43 +1100

On Mon, 01 Oct 2018 12:55:12 +0200, Swen Schillig <swen at vnet.ibm.com>
wrote:

> On Fri, 2018-09-28 at 15:48 +1000, Martin Schwenke wrote:
> > This is generally a good improvement.
> > 
> > I think the following part of the commit message probably needs to
> > be
> > updated:
> > 
> >   All received messages are processed synchronously per queue,
> >   therefore, a larger memory pool is not requried.
> > 
> > There are definitely packets that are processed
> > asynchronously.  Any
> > of
> > the control handling functions called by ctdb_control_dispatch()
> > that
> > have async_reply passed to them can talloc_steal() the control
> > structure
> > (i.e. the packet, because structs on the wire!).  Similarly for
> > things
> > like ctdb_reply_call().
> > 
> > However, even with some packets processed asynchronously I think
> > that
> > this should provide a useful reduction in malloc()s.
> > 
> > Can you please take a look at these code paths and, if convinced,
> > update the commit message?  
> Agree. 
> Patch description updated.

I'd prefer to see the commit message mention that the pool doesn't
cover all cases due to some cases being handled asynchronously.

However, the patch itself will provide a significant reduction in the
number of malloc()s, so:

Reviewed-by: Martin Schwenke <martin at meltin.net>

Another team reviewer please?

This one depends on a previous patch, which is still awaiting a 2nd
reviewer:

  
https://lists.samba.org/archive/samba-technical/2018-August/129776.html

My RB+ for this is here:

  
https://lists.samba.org/archive/samba-technical/2018-August/129929.html

Thanks...

peace & happiness,
martin

-------------- next part --------------
From db02bd0dacb10caa9f7d06a41af7a674247f02f1 Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Mon, 12 Mar 2018 11:00:55 +0100
Subject: [PATCH 1/2] ctdb: Introduce buffer.offset to avoid memmove

The memmove operation is quite expensive, therefore,
a new buffer attribute "offset" is introduced to support
an optimized buffer processing.
The optimization is to "walk" through the buffer and process
each packet until the buffer is fully processed (empty)
without requiring any memmove.
Only if a packet is in-complete, the buffer content is moved
and the new data is read from the queue.
This way almost all memmove operations are eliminated.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c | 55 ++++++++++++++++++++++++++++++-------------
 1 file changed, 39 insertions(+), 16 deletions(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 32d8fc753a6..5bed7a61b31 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -43,6 +43,7 @@ struct ctdb_buffer {
 	uint8_t *data;
 	uint32_t length;
 	uint32_t size;
+	uint32_t offset;
 };
 
 struct ctdb_queue_pkt {
@@ -95,14 +96,14 @@ static void queue_process_event(struct tevent_context *ev, struct tevent_immedia
 static void queue_process(struct ctdb_queue *queue)
 {
 	uint32_t pkt_size;
-	uint8_t *data;
+	uint8_t *data = NULL;
 
 	if (queue->buffer.length < sizeof(pkt_size)) {
 		return;
 	}
 
 	/* Did we at least read the size into the buffer */
-	pkt_size = *(uint32_t *)queue->buffer.data;
+	pkt_size = *(uint32_t *)(queue->buffer.data + queue->buffer.offset);
 	if (pkt_size == 0) {
 		DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
 		goto failed;
@@ -114,20 +115,26 @@ static void queue_process(struct ctdb_queue *queue)
 	}
 
 	/* Extract complete packet */
-	data = talloc_memdup(queue, queue->buffer.data, pkt_size);
+	data = talloc_memdup(queue,
+			     queue->buffer.data + queue->buffer.offset,
+			     pkt_size);
+
 	if (data == NULL) {
 		D_ERR("read error alloc failed for %u\n", pkt_size);
 		return;
 	}
 
-	/* Shift packet out from buffer */
-	if (queue->buffer.length > pkt_size) {
-		memmove(queue->buffer.data,
-			queue->buffer.data + pkt_size,
-			queue->buffer.length - pkt_size);
-	}
+	queue->buffer.offset += pkt_size;
 	queue->buffer.length -= pkt_size;
 
+	if (queue->buffer.offset < pkt_size ||
+	    queue->buffer.offset > queue->buffer.size) {
+		D_ERR("buffer offset overflow\n");
+		TALLOC_FREE(queue->buffer.data);
+		memset(&queue->buffer, 0, sizeof(queue->buffer));
+		goto failed;
+	}
+
 	if (queue->buffer.length > 0) {
 		/* There is more data to be processed, schedule an event */
 		tevent_schedule_immediate(queue->im, queue->ctdb->ev,
@@ -137,6 +144,7 @@ static void queue_process(struct ctdb_queue *queue)
 			TALLOC_FREE(queue->buffer.data);
 			queue->buffer.size = 0;
 		}
+		queue->buffer.offset = 0;
 	}
 
 	/* It is the responsibility of the callback to free 'data' */
@@ -145,10 +153,8 @@ static void queue_process(struct ctdb_queue *queue)
 
 failed:
 	queue->callback(NULL, 0, queue->private_data);
-
 }
 
-
 /*
   called when an incoming connection is readable
   This function MUST be safe for reentry via the queue callback!
@@ -156,7 +162,7 @@ failed:
 static void queue_io_read(struct ctdb_queue *queue)
 {
 	int num_ready = 0;
-	uint32_t pkt_size;
+	uint32_t pkt_size = 0;
 	ssize_t nread;
 	uint8_t *data;
 
@@ -185,12 +191,12 @@ static void queue_io_read(struct ctdb_queue *queue)
 		goto data_read;
 	}
 
-	if (queue->buffer.length < sizeof(pkt_size)) {
+	if (sizeof(pkt_size) > queue->buffer.length) {
 		/* data read is not sufficient to gather message size */
-		goto data_read;
+		goto buffer_shift;
 	}
 
-	pkt_size = *(uint32_t *)queue->buffer.data;
+	pkt_size = *(uint32_t *)(queue->buffer.data + queue->buffer.offset);
 	if (pkt_size > queue->buffer.size) {
 		data = talloc_realloc_size(queue,
 					   queue->buffer.data,
@@ -201,6 +207,21 @@ static void queue_io_read(struct ctdb_queue *queue)
 		}
 		queue->buffer.data = data;
 		queue->buffer.size = pkt_size;
+		/* fall through here as we might need to move the data as well */
+	}
+
+buffer_shift:
+	if (sizeof(pkt_size) > queue->buffer.size - queue->buffer.offset ||
+	    pkt_size > queue->buffer.size - queue->buffer.offset) {
+		/* Either the offset has progressed too far to host at least
+		 * the size information or the remaining space in the buffer
+		 * is not sufficient for the full message.
+		 * Therefore, move the data and try again.
+		 */
+		memmove(queue->buffer.data,
+			queue->buffer.data + queue->buffer.offset,
+			queue->buffer.length);
+		queue->buffer.offset = 0;
 	}
 
 data_read:
@@ -208,7 +229,9 @@ data_read:
 
 	if (num_ready > 0) {
 		nread = sys_read(queue->fd,
-				 queue->buffer.data + queue->buffer.length,
+				 queue->buffer.data +
+					queue->buffer.offset +
+					queue->buffer.length,
 				 num_ready);
 		if (nread <= 0) {
 			DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
-- 
2.17.2


From b2ba802563aec8faa4cbde9e29ce18e10df47571 Mon Sep 17 00:00:00 2001
From: Swen Schillig <swen at vnet.ibm.com>
Date: Mon, 12 Mar 2018 17:56:21 +0100
Subject: [PATCH 2/2] ctdb: Adding memory pool for queue callback

The received packet is copied into a newly allocated memory chunk for further
processing by the assigned callback. Once this is done, the memory is free'd.
This is repeated for each received packet making the memory allocation / free
an expensive task. To optimize this process, a memory pool is defined which
is sized identically to the queue's buffer.
During tests it could be seen that more than 95% of all messages were sized
below the standard buffer_size of 1k.

Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
---
 ctdb/common/ctdb_io.c | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 5bed7a61b31..d86540762ea 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -65,6 +65,7 @@ struct ctdb_queue {
 	size_t alignment;
 	void *private_data;
 	ctdb_queue_cb_fn_t callback;
+	TALLOC_CTX *data_pool;
 	const char *name;
 	uint32_t buffer_size;
 };
@@ -115,7 +116,7 @@ static void queue_process(struct ctdb_queue *queue)
 	}
 
 	/* Extract complete packet */
-	data = talloc_memdup(queue,
+	data = talloc_memdup(queue->data_pool,
 			     queue->buffer.data + queue->buffer.offset,
 			     pkt_size);
 
@@ -479,5 +480,11 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
 		queue->buffer_size = 1024;
 	}
 
+	queue->data_pool = talloc_pool(queue, queue->buffer_size);
+	if (queue->data_pool == NULL) {
+		TALLOC_FREE(queue);
+		return NULL;
+	}
+
 	return queue;
 }
-- 
2.17.2



More information about the samba-technical mailing list