[RFC] [CTDB] Optimized memory handling.

Jeremy Allison jra at samba.org
Tue Dec 12 23:52:10 UTC 2017


On Tue, Dec 12, 2017 at 12:49:08PM +0100, Swen Schillig via samba-technical wrote:
> As mentioned in an earlier mail, I'm new to SAMBA and CTDB.
> During my on-boarding (reading the code) I wondered whether the memory
> handling is really as efficient as it could be.
> 
> The attached patch makes more extensive use of the
> memory pool concepts provided by the talloc API.
> In addition, I reorganized the processing a bit so that it first
> gathers the memory requirements and then allocates, instead of
> allocating first and then failing.
> 
> The magic numbers currently used for the pool sizes are still under
> investigation; hints and recommendations are welcome.
> 
> However, initial tests showed big improvements.

Can you give more details on what you mean by "big improvements"?
Some numbers would be good :-).

Thanks,

Jeremy.

> diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
> index 2b43287..f0a73d1 100644
> --- a/ctdb/common/ctdb_io.c
> +++ b/ctdb/common/ctdb_io.c
> @@ -70,7 +70,8 @@ struct ctdb_queue {
>  	uint32_t buffer_size;
>  };
>  
> -
> +#define DEFAULT_BUFFER_SIZE 1024
> +static TALLOC_CTX *data_pool = NULL;
>  
>  int ctdb_queue_length(struct ctdb_queue *queue)
>  {
> @@ -98,7 +99,9 @@ static void queue_process(struct ctdb_queue *queue)
>  {
>  	uint32_t pkt_size;
>  	uint8_t *data;
> +	uint8_t *tmp_bdata;
>  
> +	/* Did we at least read the size into the buffer */
>  	if (queue->buffer.length < sizeof(pkt_size)) {
>  		return;
>  	}
> @@ -117,12 +120,11 @@ static void queue_process(struct ctdb_queue *queue)
>  	}
>  
>  	/* Extract complete packet */
> -	data = talloc_size(queue, pkt_size);
> +	data = talloc_memdup(data_pool, queue->buffer.data, pkt_size);
>  	if (data == NULL) {
> -		DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", pkt_size));
> +		DEBUG(DEBUG_ERR, ("data alloc failed for %u\n", pkt_size));
>  		return;
>  	}
> -	memcpy(data, queue->buffer.data, pkt_size);
>  
>  	/* Shift packet out from buffer */
>  	if (queue->buffer.length > pkt_size) {
> @@ -136,10 +138,15 @@ static void queue_process(struct ctdb_queue *queue)
>  		/* There is more data to be processed, schedule an event */
>  		tevent_schedule_immediate(queue->im, queue->ctdb->ev,
>  					  queue_process_event, queue);
> -	} else {
> -		if (queue->buffer.size > queue->buffer_size) {
> -			TALLOC_FREE(queue->buffer.data);
> -			queue->buffer.size = 0;
> +	} else if (queue->buffer.size > (queue->buffer_size << 1)) {
> +		/* only reduce size if buffer grew to more than double */
> +		// SS should that be done at all ? Why not leaving it as it is.
> +
> +		tmp_bdata = talloc_realloc_size(queue, queue->buffer.data,
> +						queue->buffer_size);
> +		if (likely(tmp_bdata)) {
> +			queue->buffer.data = tmp_bdata;
> +			queue->buffer.size = queue->buffer_size;
>  		}
>  	}
>  
> @@ -152,17 +159,15 @@ failed:
>  
>  }
>  
> -
>  /*
>    called when an incoming connection is readable
>    This function MUST be safe for reentry via the queue callback!
>  */
>  static void queue_io_read(struct ctdb_queue *queue)
>  {
> -	int num_ready = 0;
>  	ssize_t nread;
>  	uint8_t *data;
> -	int navail;
> +	int num_ready;
>  
>  	/* check how much data is available on the socket for immediately
>  	   guaranteed nonblocking access.
> @@ -173,24 +178,24 @@ static void queue_io_read(struct ctdb_queue *queue)
>  	if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
>  		return;
>  	}
> +
>  	if (num_ready == 0) {
>  		/* the descriptor has been closed */
>  		goto failed;
>  	}
>  
> -	if (queue->buffer.data == NULL) {
> -		/* starting fresh, allocate buf to read data */
> -		queue->buffer.data = talloc_size(queue, queue->buffer_size);
> -		if (queue->buffer.data == NULL) {
> -			DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
> -			goto failed;
> -		}
> -		queue->buffer.size = queue->buffer_size;
> -	} else if (queue->buffer.extend > 0) {
> +	if (num_ready > (queue->buffer.size - queue->buffer.length)) {
> +		queue->buffer.extend = MAX(queue->buffer.extend,
> +					   (num_ready + queue->buffer.length));
> +	}
> +
> +	if (queue->buffer.extend > 0) {
>  		/* extending buffer */
> -		data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.extend);
> +		data = talloc_realloc_size(queue, queue->buffer.data,
> +					   queue->buffer.extend);
>  		if (data == NULL) {
> -			DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.extend));
> +			DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n",
> +					  queue->buffer.extend));
>  			goto failed;
>  		}
>  		queue->buffer.data = data;
> @@ -198,22 +203,17 @@ static void queue_io_read(struct ctdb_queue *queue)
>  		queue->buffer.extend = 0;
>  	}
>  
> -	navail = queue->buffer.size - queue->buffer.length;
> -	if (num_ready > navail) {
> -		num_ready = navail;
> -	}
> +	nread = sys_read(queue->fd, queue->buffer.data + queue->buffer.length,
> +			 num_ready);
>  
> -	if (num_ready > 0) {
> -		nread = sys_read(queue->fd,
> -				 queue->buffer.data + queue->buffer.length,
> -				 num_ready);
> -		if (nread <= 0) {
> -			DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
> -			goto failed;
> -		}
> -		queue->buffer.length += nread;
> +	if (nread <= 0) {
> +		DEBUG(DEBUG_ERR, ("read error %d (%s)\n", errno,
> +							  strerror(errno)));
> +		goto failed;
>  	}
>  
> +	queue->buffer.length += nread;
> +
>  	queue_process(queue);
>  	return;
>  
> @@ -392,8 +392,7 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
>  int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
>  {
>  	queue->fd = fd;
> -	talloc_free(queue->fde);
> -	queue->fde = NULL;
> +	TALLOC_FREE(queue->fde);
>  
>  	if (fd != -1) {
>  		queue->fde = tevent_add_fd(queue->ctdb->ev, queue, fd,
> @@ -423,34 +422,47 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
>  	struct ctdb_queue *queue;
>  	va_list ap;
>  
> -	queue = talloc_zero(mem_ctx, struct ctdb_queue);
> +	if (unlikely(data_pool == NULL)) {
> +		data_pool = talloc_pool(NULL, 32 * DEFAULT_BUFFER_SIZE);
> +		if (data_pool == NULL) {
> +			DEBUG(DEBUG_ERR, ("data_pool alloc failed for %p\n",
> +					  private_data));
> +			return NULL;
> +		}
> +	}
> +
> +	queue = talloc_pooled_object(ctx, struct ctdb_queue, 4, 2048);
>  	CTDB_NO_MEMORY_NULL(ctdb, queue);
> +	memset(queue, 0, sizeof(struct ctdb_queue));
>  
>  	va_start(ap, fmt);
>  	queue->name = talloc_vasprintf(queue, fmt, ap);
>  	va_end(ap);
> -	CTDB_NO_MEMORY_NULL(ctdb, queue->name);
> -
> -	queue->im= tevent_create_immediate(queue);
> -	CTDB_NO_MEMORY_NULL(ctdb, queue->im);
>  
> +	queue->im = tevent_create_immediate(queue);
>  	queue->ctdb = ctdb;
>  	queue->alignment = alignment;
>  	queue->private_data = private_data;
>  	queue->callback = callback;
>  
>  	if (ctdb_queue_set_fd(queue, fd) != 0) {
> -		talloc_free(queue);
> -		return NULL;
> +		goto failed;
>  	}
>  
> -	queue->buffer_size = ctdb->tunable.queue_buffer_size;
>  	/* In client code, ctdb->tunable is not initialized.
>  	 * This does not affect recovery daemon.
>  	 */
> -	if (queue->buffer_size == 0) {
> -		queue->buffer_size = 1024;
> +	queue->buffer_size = ctdb->tunable.queue_buffer_size ?
> +				ctdb->tunable.queue_buffer_size :
> +				DEFAULT_BUFFER_SIZE;
> +
> +	queue->buffer.data = talloc_size(queue, queue->buffer_size);
> +	if (likely(queue->buffer.data != NULL)) {
> +		queue->buffer.size = queue->buffer_size;
> +		return queue;
>  	}
>  
> -	return queue;
> +failed:
> +	talloc_free(ctx);
> +	return NULL;
>  }




More information about the samba-technical mailing list