[SCM] CTDB repository - branch master updated - ctdb-2.1-26-gd788bc8

Amitay Isaacs amitay at samba.org
Mon Feb 18 23:20:52 MST 2013


The branch, master has been updated
       via  d788bc8f7212b7dc1587ae592242dc8c876f4053 (commit)
      from  855ab348901edb3ec1327499a43f509d279b8182 (commit)

http://gitweb.samba.org/?p=ctdb.git;a=shortlog;h=master


- Log -----------------------------------------------------------------
commit d788bc8f7212b7dc1587ae592242dc8c876f4053
Author: Amitay Isaacs <amitay at gmail.com>
Date:   Fri Jan 18 10:42:14 2013 +1100

    common/io: Rewrite socket handling code to read all available data
    
    This improves the processing of packets considerably.  It has been
    observed that there can be as many as 10 packets in the socket buffer,
    so the current approach of reading a single packet from the socket at a
    time is far from optimal.  This change reads all the available bytes from
    the socket buffer and then parses them to extract multiple packets.  If
    more than one packet is present, a timed event is set up to process the
    next packet.
    
    Signed-off-by: Amitay Isaacs <amitay at gmail.com>

-----------------------------------------------------------------------

Summary of changes:
 common/ctdb_io.c |  160 +++++++++++++++++++++++++++++++-----------------------
 1 files changed, 92 insertions(+), 68 deletions(-)


Changeset truncated at 500 lines:

diff --git a/common/ctdb_io.c b/common/ctdb_io.c
index 3ac1b63..b4224c4 100644
--- a/common/ctdb_io.c
+++ b/common/ctdb_io.c
@@ -30,9 +30,10 @@
 #include <stdarg.h>
 
 /* structures for packet queueing - see common/ctdb_io.c */
-struct ctdb_partial {
+struct ctdb_buffer {
 	uint8_t *data;
 	uint32_t length;
+	uint32_t size;
 };
 
 struct ctdb_queue_pkt {
@@ -44,7 +45,7 @@ struct ctdb_queue_pkt {
 
 struct ctdb_queue {
 	struct ctdb_context *ctdb;
-	struct ctdb_partial partial; /* partial input packet */
+	struct ctdb_buffer buffer; /* input buffer */
 	struct ctdb_queue_pkt *out_queue, *out_queue_tail;
 	uint32_t out_queue_length;
 	struct fd_event *fde;
@@ -63,6 +64,75 @@ int ctdb_queue_length(struct ctdb_queue *queue)
 	return queue->out_queue_length;
 }
 
+static void queue_process(struct ctdb_queue *queue);
+
+static void queue_process_event(struct event_context *ev, struct timed_event *te,
+				struct timeval t, void *private_data)
+{
+	struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
+
+	queue_process(queue);
+}
+
+/*
+ * Process the data held in the queue buffer, one packet per call.
+ *
+ * The queue callback can end up freeing the queue, so packets must not be
+ * processed in a loop here.  Instead, a timed event is set up to run
+ * immediately and process the remaining packets in the buffer.
+ */
+static void queue_process(struct ctdb_queue *queue)
+{
+	uint32_t pkt_size;
+	uint8_t *data;
+
+	if (queue->buffer.length < sizeof(pkt_size)) {
+		return;
+	}
+
+	pkt_size = *(uint32_t *)queue->buffer.data;
+	if (pkt_size == 0) {
+		DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
+		goto failed;
+	}
+
+	if (queue->buffer.length < pkt_size) {
+		DEBUG(DEBUG_DEBUG, ("Partial packet data read\n"));
+		return;
+	}
+
+	/* Extract complete packet */
+	data = talloc_size(queue, pkt_size);
+	if (data == NULL) {
+		DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", pkt_size));
+		return;
+	}
+	memcpy(data, queue->buffer.data, pkt_size);
+
+	/* Shift packet out from buffer */
+	if (queue->buffer.length > pkt_size) {
+		memmove(queue->buffer.data,
+			queue->buffer.data + pkt_size,
+			queue->buffer.length - pkt_size);
+	}
+	queue->buffer.length -= pkt_size;
+
+	if (queue->buffer.length > 0) {
+		/* There is more data to be processed, setup timed event */
+		event_add_timed(queue->ctdb->ev, queue, timeval_zero(),
+				queue_process_event, queue);
+	}
+
+	/* It is the responsibility of the callback to free 'data' */
+	queue->callback(data, pkt_size, queue->private_data);
+	return;
+
+failed:
+	queue->callback(NULL, 0, queue->private_data);
+
+}
+
+
 /*
   called when an incoming connection is readable
   This function MUST be safe for reentry via the queue callback!
@@ -70,10 +140,6 @@ int ctdb_queue_length(struct ctdb_queue *queue)
 static void queue_io_read(struct ctdb_queue *queue)
 {
 	int num_ready = 0;
-	uint32_t sz_bytes_req;
-	uint32_t pkt_size;
-	uint32_t pkt_bytes_remaining;
-	uint32_t to_read;
 	ssize_t nread;
 	uint8_t *data;
 
@@ -91,77 +157,33 @@ static void queue_io_read(struct ctdb_queue *queue)
 		goto failed;
 	}
 
-	if (queue->partial.data == NULL) {
-		/* starting fresh, allocate buf for size bytes */
-		sz_bytes_req = sizeof(pkt_size);
-		queue->partial.data = talloc_size(queue, sz_bytes_req);
-		if (queue->partial.data == NULL) {
-			DEBUG(DEBUG_ERR,("read error alloc failed for %u\n",
-					 sz_bytes_req));
+	if (queue->buffer.data == NULL) {
+		/* starting fresh, allocate buf to read data */
+		queue->buffer.data = talloc_size(queue, num_ready);
+		if (queue->buffer.data == NULL) {
+			DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
 			goto failed;
 		}
-	} else if (queue->partial.length < sizeof(pkt_size)) {
-		/* yet to find out the packet length */
-		sz_bytes_req = sizeof(pkt_size) - queue->partial.length;
-	} else {
-		/* partial packet, length known, full buf allocated */
-		sz_bytes_req = 0;
-	}
-	data = queue->partial.data;
-
-	if (sz_bytes_req > 0) {
-		to_read = MIN(sz_bytes_req, num_ready);
-		nread = read(queue->fd, data + queue->partial.length,
-			     to_read);
-		if (nread <= 0) {
-			DEBUG(DEBUG_ERR,("read error nread=%d\n", (int)nread));
+		queue->buffer.size = num_ready;
+	} else if (queue->buffer.length + num_ready > queue->buffer.size) {
+		/* extending buffer */
+		data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.length + num_ready);
+		if (data == NULL) {
+			DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.length + num_ready));
 			goto failed;
 		}
-		queue->partial.length += nread;
-
-		if (nread < sz_bytes_req) {
-			/* not enough to know the length */
-			DEBUG(DEBUG_DEBUG,("Partial packet length read\n"));
-			return;
-		}
-		/* size now known, allocate buffer for the full packet */
-		queue->partial.data = talloc_realloc_size(queue, data,
-							  *(uint32_t *)data);
-		if (queue->partial.data == NULL) {
-			DEBUG(DEBUG_ERR,("read error alloc failed for %u\n",
-					 *(uint32_t *)data));
-			goto failed;
-		}
-		data = queue->partial.data;
-		num_ready -= nread;
+		queue->buffer.data = data;
+		queue->buffer.size = queue->buffer.length + num_ready;
 	}
 
-	pkt_size = *(uint32_t *)data;
-	if (pkt_size == 0) {
-		DEBUG(DEBUG_CRIT,("Invalid packet of length 0\n"));
-		goto failed;
-	}
-
-	pkt_bytes_remaining = pkt_size - queue->partial.length;
-	to_read = MIN(pkt_bytes_remaining, num_ready);
-	nread = read(queue->fd, data + queue->partial.length,
-		     to_read);
+	nread = read(queue->fd, queue->buffer.data + queue->buffer.length, num_ready);
 	if (nread <= 0) {
-		DEBUG(DEBUG_ERR,("read error nread=%d\n",
-				 (int)nread));
+		DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
 		goto failed;
 	}
-	queue->partial.length += nread;
-
-	if (queue->partial.length < pkt_size) {
-		DEBUG(DEBUG_DEBUG,("Partial packet data read\n"));
-		return;
-	}
+	queue->buffer.length += nread;
 
-	queue->partial.data = NULL;
-	queue->partial.length = 0;
-	/* it is the responsibility of the callback to free 'data' */
-	queue->callback(data, pkt_size, queue->private_data);
+	queue_process(queue);
 	return;
 
 failed:
@@ -354,6 +376,9 @@ int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
 /* If someone sets up this pointer, they want to know if the queue is freed */
 static int queue_destructor(struct ctdb_queue *queue)
 {
+	TALLOC_FREE(queue->buffer.data);
+	queue->buffer.length = 0;
+	queue->buffer.size = 0;
 	if (queue->destroyed != NULL)
 		*queue->destroyed = true;
 	return 0;
@@ -364,7 +389,6 @@ static int queue_destructor(struct ctdb_queue *queue)
  */
 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
 				    TALLOC_CTX *mem_ctx, int fd, int alignment,
-				    
 				    ctdb_queue_cb_fn_t callback,
 				    void *private_data, const char *fmt, ...)
 {


-- 
CTDB repository


More information about the samba-cvs mailing list