[SCM] Samba Shared Repository - branch master updated

Christof Schmitt cs at samba.org
Fri Dec 7 22:28:02 UTC 2018


The branch, master has been updated
       via  353a947b4a9 ctdb: Adding memory pool for queue callback
       via  382705f495d ctdb: Introduce buffer.offset to avoid memmove
      from  7b60d72f672 librpc:ndr: Give the optimizer hints for ndr_push_bytes()

https://git.samba.org/?p=samba.git;a=shortlog;h=master


- Log -----------------------------------------------------------------
commit 353a947b4a983f6664391da6769b914d42612567
Author: Swen Schillig <swen at vnet.ibm.com>
Date:   Mon Mar 12 17:56:21 2018 +0100

    ctdb: Adding memory pool for queue callback
    
    Each received packet is copied into a newly allocated memory chunk for
    further processing by the assigned callback. Once this is done, the memory
    is freed. Because this is repeated for each received packet, the memory
    allocation / free cycle becomes an expensive task. To optimize this
    process, a memory pool is defined which is sized identically to the
    queue's buffer.
    During tests, more than 95% of all messages were smaller than the
    standard buffer_size of 1k.
    
    Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
    Reviewed-by: Martin Schwenke <martin at meltin.net>
    Reviewed-by: Christof Schmitt <cs at samba.org>
    
    Autobuild-User(master): Christof Schmitt <cs at samba.org>
    Autobuild-Date(master): Fri Dec  7 23:27:16 CET 2018 on sn-devel-144

commit 382705f495dd7f196efc8bb24b9cee3649b44836
Author: Swen Schillig <swen at vnet.ibm.com>
Date:   Mon Mar 12 11:00:55 2018 +0100

    ctdb: Introduce buffer.offset to avoid memmove
    
    The memmove operation is quite expensive. Therefore,
    a new buffer attribute "offset" is introduced to support
    optimized buffer processing.
    The optimization is to "walk" through the buffer and process
    each packet until the buffer is fully processed (empty)
    without requiring any memmove.
    Only if a packet is incomplete is the buffer content moved
    and the new data read from the queue.
    This way almost all memmove operations are eliminated.
    
    Signed-off-by: Swen Schillig <swen at vnet.ibm.com>
    Reviewed-by: Martin Schwenke <martin at meltin.net>
    Reviewed-by: Christof Schmitt <cs at samba.org>

-----------------------------------------------------------------------

Summary of changes:
 ctdb/common/ctdb_io.c | 62 ++++++++++++++++++++++++++++++++++++++-------------
 1 file changed, 46 insertions(+), 16 deletions(-)


Changeset truncated at 500 lines:

diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
index 32d8fc753a6..d86540762ea 100644
--- a/ctdb/common/ctdb_io.c
+++ b/ctdb/common/ctdb_io.c
@@ -43,6 +43,7 @@ struct ctdb_buffer {
 	uint8_t *data;
 	uint32_t length;
 	uint32_t size;
+	uint32_t offset;
 };
 
 struct ctdb_queue_pkt {
@@ -64,6 +65,7 @@ struct ctdb_queue {
 	size_t alignment;
 	void *private_data;
 	ctdb_queue_cb_fn_t callback;
+	TALLOC_CTX *data_pool;
 	const char *name;
 	uint32_t buffer_size;
 };
@@ -95,14 +97,14 @@ static void queue_process_event(struct tevent_context *ev, struct tevent_immedia
 static void queue_process(struct ctdb_queue *queue)
 {
 	uint32_t pkt_size;
-	uint8_t *data;
+	uint8_t *data = NULL;
 
 	if (queue->buffer.length < sizeof(pkt_size)) {
 		return;
 	}
 
 	/* Did we at least read the size into the buffer */
-	pkt_size = *(uint32_t *)queue->buffer.data;
+	pkt_size = *(uint32_t *)(queue->buffer.data + queue->buffer.offset);
 	if (pkt_size == 0) {
 		DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
 		goto failed;
@@ -114,20 +116,26 @@ static void queue_process(struct ctdb_queue *queue)
 	}
 
 	/* Extract complete packet */
-	data = talloc_memdup(queue, queue->buffer.data, pkt_size);
+	data = talloc_memdup(queue->data_pool,
+			     queue->buffer.data + queue->buffer.offset,
+			     pkt_size);
+
 	if (data == NULL) {
 		D_ERR("read error alloc failed for %u\n", pkt_size);
 		return;
 	}
 
-	/* Shift packet out from buffer */
-	if (queue->buffer.length > pkt_size) {
-		memmove(queue->buffer.data,
-			queue->buffer.data + pkt_size,
-			queue->buffer.length - pkt_size);
-	}
+	queue->buffer.offset += pkt_size;
 	queue->buffer.length -= pkt_size;
 
+	if (queue->buffer.offset < pkt_size ||
+	    queue->buffer.offset > queue->buffer.size) {
+		D_ERR("buffer offset overflow\n");
+		TALLOC_FREE(queue->buffer.data);
+		memset(&queue->buffer, 0, sizeof(queue->buffer));
+		goto failed;
+	}
+
 	if (queue->buffer.length > 0) {
 		/* There is more data to be processed, schedule an event */
 		tevent_schedule_immediate(queue->im, queue->ctdb->ev,
@@ -137,6 +145,7 @@ static void queue_process(struct ctdb_queue *queue)
 			TALLOC_FREE(queue->buffer.data);
 			queue->buffer.size = 0;
 		}
+		queue->buffer.offset = 0;
 	}
 
 	/* It is the responsibility of the callback to free 'data' */
@@ -145,10 +154,8 @@ static void queue_process(struct ctdb_queue *queue)
 
 failed:
 	queue->callback(NULL, 0, queue->private_data);
-
 }
 
-
 /*
   called when an incoming connection is readable
   This function MUST be safe for reentry via the queue callback!
@@ -156,7 +163,7 @@ failed:
 static void queue_io_read(struct ctdb_queue *queue)
 {
 	int num_ready = 0;
-	uint32_t pkt_size;
+	uint32_t pkt_size = 0;
 	ssize_t nread;
 	uint8_t *data;
 
@@ -185,12 +192,12 @@ static void queue_io_read(struct ctdb_queue *queue)
 		goto data_read;
 	}
 
-	if (queue->buffer.length < sizeof(pkt_size)) {
+	if (sizeof(pkt_size) > queue->buffer.length) {
 		/* data read is not sufficient to gather message size */
-		goto data_read;
+		goto buffer_shift;
 	}
 
-	pkt_size = *(uint32_t *)queue->buffer.data;
+	pkt_size = *(uint32_t *)(queue->buffer.data + queue->buffer.offset);
 	if (pkt_size > queue->buffer.size) {
 		data = talloc_realloc_size(queue,
 					   queue->buffer.data,
@@ -201,6 +208,21 @@ static void queue_io_read(struct ctdb_queue *queue)
 		}
 		queue->buffer.data = data;
 		queue->buffer.size = pkt_size;
+		/* fall through here as we might need to move the data as well */
+	}
+
+buffer_shift:
+	if (sizeof(pkt_size) > queue->buffer.size - queue->buffer.offset ||
+	    pkt_size > queue->buffer.size - queue->buffer.offset) {
+		/* Either the offset has progressed too far to host at least
+		 * the size information or the remaining space in the buffer
+		 * is not sufficient for the full message.
+		 * Therefore, move the data and try again.
+		 */
+		memmove(queue->buffer.data,
+			queue->buffer.data + queue->buffer.offset,
+			queue->buffer.length);
+		queue->buffer.offset = 0;
 	}
 
 data_read:
@@ -208,7 +230,9 @@ data_read:
 
 	if (num_ready > 0) {
 		nread = sys_read(queue->fd,
-				 queue->buffer.data + queue->buffer.length,
+				 queue->buffer.data +
+					queue->buffer.offset +
+					queue->buffer.length,
 				 num_ready);
 		if (nread <= 0) {
 			DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
@@ -456,5 +480,11 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
 		queue->buffer_size = 1024;
 	}
 
+	queue->data_pool = talloc_pool(queue, queue->buffer_size);
+	if (queue->data_pool == NULL) {
+		TALLOC_FREE(queue);
+		return NULL;
+	}
+
 	return queue;
 }


-- 
Samba Shared Repository



More information about the samba-cvs mailing list