[RFC] [CTDB] Optimized memory handling.
Jeremy Allison
jra at samba.org
Tue Dec 12 23:52:10 UTC 2017
On Tue, Dec 12, 2017 at 12:49:08PM +0100, Swen Schillig via samba-technical wrote:
> As mentioned in an earlier mail, I'm new to SAMBA and CTDB.
> During my on-boarding (reading code) I was wondering if the memory
> handling is really so fortunate.
>
> The attached patch shows a more extensive usage of the
> memory pool concepts provided by the talloc API.
> In addition I re-organized the processing a bit in that way that I'm
> trying to first gather the memory requirements and then allocate,
> instead of the other way around and then fail.
>
> The currently used magic numbers for the pool sizes are still under
> investigation...hints and recommendations are welcome.
>
> However, initial tests showed big improvements.
Can you give more details on what you mean by "big improvements"?
Some numbers would be good :-).
Thanks,
Jeremy.
> diff --git a/ctdb/common/ctdb_io.c b/ctdb/common/ctdb_io.c
> index 2b43287..f0a73d1 100644
> --- a/ctdb/common/ctdb_io.c
> +++ b/ctdb/common/ctdb_io.c
> @@ -70,7 +70,8 @@ struct ctdb_queue {
> uint32_t buffer_size;
> };
>
> -
> +#define DEFAULT_BUFFER_SIZE 1024
> +static TALLOC_CTX *data_pool = NULL;
>
> int ctdb_queue_length(struct ctdb_queue *queue)
> {
> @@ -98,7 +99,9 @@ static void queue_process(struct ctdb_queue *queue)
> {
> uint32_t pkt_size;
> uint8_t *data;
> + uint8_t *tmp_bdata;
>
> + /* Did we at least read the size into the buffer */
> if (queue->buffer.length < sizeof(pkt_size)) {
> return;
> }
> @@ -117,12 +120,11 @@ static void queue_process(struct ctdb_queue *queue)
> }
>
> /* Extract complete packet */
> - data = talloc_size(queue, pkt_size);
> + data = talloc_memdup(data_pool, queue->buffer.data, pkt_size);
> if (data == NULL) {
> - DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", pkt_size));
> + DEBUG(DEBUG_ERR, ("data alloc failed for %u\n", pkt_size));
> return;
> }
> - memcpy(data, queue->buffer.data, pkt_size);
>
> /* Shift packet out from buffer */
> if (queue->buffer.length > pkt_size) {
> @@ -136,10 +138,15 @@ static void queue_process(struct ctdb_queue *queue)
> /* There is more data to be processed, schedule an event */
> tevent_schedule_immediate(queue->im, queue->ctdb->ev,
> queue_process_event, queue);
> - } else {
> - if (queue->buffer.size > queue->buffer_size) {
> - TALLOC_FREE(queue->buffer.data);
> - queue->buffer.size = 0;
> + } else if (queue->buffer.size > (queue->buffer_size << 1)) {
> + /* only reduce size if buffer grew to more than double */
> + // SS should that be done at all ? Why not leaving it as it is.
> +
> + tmp_bdata = talloc_realloc_size(queue, queue->buffer.data,
> + queue->buffer_size);
> + if (likely(tmp_bdata)) {
> + queue->buffer.data = tmp_bdata;
> + queue->buffer.size = queue->buffer_size;
> }
> }
>
> @@ -152,17 +159,15 @@ failed:
>
> }
>
> -
> /*
> called when an incoming connection is readable
> This function MUST be safe for reentry via the queue callback!
> */
> static void queue_io_read(struct ctdb_queue *queue)
> {
> - int num_ready = 0;
> ssize_t nread;
> uint8_t *data;
> - int navail;
> + int num_ready;
>
> /* check how much data is available on the socket for immediately
> guaranteed nonblocking access.
> @@ -173,24 +178,24 @@ static void queue_io_read(struct ctdb_queue *queue)
> if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
> return;
> }
> +
> if (num_ready == 0) {
> /* the descriptor has been closed */
> goto failed;
> }
>
> - if (queue->buffer.data == NULL) {
> - /* starting fresh, allocate buf to read data */
> - queue->buffer.data = talloc_size(queue, queue->buffer_size);
> - if (queue->buffer.data == NULL) {
> - DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
> - goto failed;
> - }
> - queue->buffer.size = queue->buffer_size;
> - } else if (queue->buffer.extend > 0) {
> + if (num_ready > (queue->buffer.size - queue->buffer.length)) {
> + queue->buffer.extend = MAX(queue->buffer.extend,
> + (num_ready + queue->buffer.length));
> + }
> +
> + if (queue->buffer.extend > 0) {
> /* extending buffer */
> - data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.extend);
> + data = talloc_realloc_size(queue, queue->buffer.data,
> + queue->buffer.extend);
> if (data == NULL) {
> - DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.extend));
> + DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n",
> + queue->buffer.extend));
> goto failed;
> }
> queue->buffer.data = data;
> @@ -198,22 +203,17 @@ static void queue_io_read(struct ctdb_queue *queue)
> queue->buffer.extend = 0;
> }
>
> - navail = queue->buffer.size - queue->buffer.length;
> - if (num_ready > navail) {
> - num_ready = navail;
> - }
> + nread = sys_read(queue->fd, queue->buffer.data + queue->buffer.length,
> + num_ready);
>
> - if (num_ready > 0) {
> - nread = sys_read(queue->fd,
> - queue->buffer.data + queue->buffer.length,
> - num_ready);
> - if (nread <= 0) {
> - DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
> - goto failed;
> - }
> - queue->buffer.length += nread;
> + if (nread <= 0) {
> + DEBUG(DEBUG_ERR, ("read error %d (%s)\n", errno,
> + strerror(errno)));
> + goto failed;
> }
>
> + queue->buffer.length += nread;
> +
> queue_process(queue);
> return;
>
> @@ -392,8 +392,7 @@ int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
> int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
> {
> queue->fd = fd;
> - talloc_free(queue->fde);
> - queue->fde = NULL;
> + TALLOC_FREE(queue->fde);
>
> if (fd != -1) {
> queue->fde = tevent_add_fd(queue->ctdb->ev, queue, fd,
> @@ -423,34 +422,47 @@ struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
> struct ctdb_queue *queue;
> va_list ap;
>
> - queue = talloc_zero(mem_ctx, struct ctdb_queue);
> + if (unlikely(data_pool == NULL)) {
> + data_pool = talloc_pool(NULL, 32 * DEFAULT_BUFFER_SIZE);
> + if (data_pool == NULL) {
> + DEBUG(DEBUG_ERR, ("data_pool alloc failed for %p\n",
> + private_data));
> + return NULL;
> + }
> + }
> +
> + queue = talloc_pooled_object(ctx, struct ctdb_queue, 4, 2048);
> CTDB_NO_MEMORY_NULL(ctdb, queue);
> + memset(queue, 0, sizeof(struct ctdb_queue));
>
> va_start(ap, fmt);
> queue->name = talloc_vasprintf(queue, fmt, ap);
> va_end(ap);
> - CTDB_NO_MEMORY_NULL(ctdb, queue->name);
> -
> - queue->im= tevent_create_immediate(queue);
> - CTDB_NO_MEMORY_NULL(ctdb, queue->im);
>
> + queue->im = tevent_create_immediate(queue);
> queue->ctdb = ctdb;
> queue->alignment = alignment;
> queue->private_data = private_data;
> queue->callback = callback;
>
> if (ctdb_queue_set_fd(queue, fd) != 0) {
> - talloc_free(queue);
> - return NULL;
> + goto failed;
> }
>
> - queue->buffer_size = ctdb->tunable.queue_buffer_size;
> /* In client code, ctdb->tunable is not initialized.
> * This does not affect recovery daemon.
> */
> - if (queue->buffer_size == 0) {
> - queue->buffer_size = 1024;
> + queue->buffer_size = ctdb->tunable.queue_buffer_size ?
> + ctdb->tunable.queue_buffer_size :
> + DEFAULT_BUFFER_SIZE;
> +
> + queue->buffer.data = talloc_size(queue, queue->buffer_size);
> + if (likely(queue->buffer.data != NULL)) {
> + queue->buffer.size = queue->buffer_size;
> + return queue;
> }
>
> - return queue;
> +failed:
> + talloc_free(ctx);
> + return NULL;
> }
More information about the samba-technical
mailing list