[SCM] CTDB repository - branch 1.2 updated - ctdb-1.9.1-489-g6568fee

Ronnie Sahlberg sahlberg at samba.org
Mon Nov 7 20:00:06 MST 2011


The branch, 1.2 has been updated
       via  6568feec47b705a39c404bb1a5ff35db265aea6a (commit)
      from  418313dce4b0142d12aa73aeb5e98333055bdbf0 (commit)

http://gitweb.samba.org/?p=ctdb.git;a=shortlog;h=1.2


- Log -----------------------------------------------------------------
commit 6568feec47b705a39c404bb1a5ff35db265aea6a
Author: Ronnie Sahlberg <ronniesahlberg at gmail.com>
Date:   Tue Nov 8 06:55:46 2011 +1100

    Record Fetch Collapse: Collapse multiple fetch requests into one single request.
    
    When multiple clients fetch the same record concurrently, send only a single
    fetch across the network and defer all other fetches locally.
    This improves performance for hot records and reduces CPU load on ctdb.

-----------------------------------------------------------------------

Summary of changes:
 include/ctdb_private.h    |    4 +
 server/ctdb_daemon.c      |  207 +++++++++++++++++++++++++++++++++++++++++++++
 server/ctdb_ltdb_server.c |   11 +++
 3 files changed, 222 insertions(+), 0 deletions(-)


Changeset truncated at 500 lines:

diff --git a/include/ctdb_private.h b/include/ctdb_private.h
index 675ea49..f0050fb 100644
--- a/include/ctdb_private.h
+++ b/include/ctdb_private.h
@@ -523,6 +523,10 @@ struct ctdb_db_context {
 				  struct ctdb_ltdb_header *header,
 				  TDB_DATA data);
 
+	/* used to track which records we are currently fetching
+	   so we can avoid sending duplicate fetch requests
+	*/
+	struct trbt_tree *deferred_fetch;
 };
 
 
diff --git a/server/ctdb_daemon.c b/server/ctdb_daemon.c
index f0c7ec9..b2a03aa 100644
--- a/server/ctdb_daemon.c
+++ b/server/ctdb_daemon.c
@@ -27,6 +27,7 @@
 #include "system/wait.h"
 #include "../include/ctdb_client.h"
 #include "../include/ctdb_private.h"
+#include "../common/rb_tree.h"
 #include <sys/socket.h>
 
 struct ctdb_client_pid_list {
@@ -358,6 +359,190 @@ static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
 	daemon_incoming_packet(client, hdr);	
 }
 
+struct ctdb_deferred_fetch_call {	/* one fetch request parked on a deferred fetch queue */
+	struct ctdb_deferred_fetch_call *next, *prev;	/* linkage in the queue's deferred_calls list */
+	struct ctdb_req_call *c;	/* the deferred request packet; re-parented onto this structure when queued */
+	struct ctdb_daemon_packet_wrap *w;	/* carries the ctdb context and the client_id of the requesting client */
+};
+
+struct ctdb_deferred_fetch_queue {	/* per-key deferral context stored in ctdb_db->deferred_fetch */
+	struct ctdb_deferred_fetch_call *deferred_calls;	/* list of deferred calls, appended at the end so arrival order is kept */
+};
+
+struct ctdb_deferred_requeue {	/* private data handed to reprocess_deferred_call() */
+	struct ctdb_deferred_fetch_call *dfc;	/* the deferred call to reprocess; owned by this structure */
+	struct ctdb_client *client;	/* the client the call is reprocessed for */
+};
+
+/* called from a timed event: re-runs one deferred call through daemon_incoming_packet() for its client. */
+static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te, 
+				       struct timeval t, void *private_data)
+{
+	struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
+	struct ctdb_client *client = dfr->client;
+
+	talloc_steal(client, dfr->dfc->c);	/* re-parent the request to the client so freeing dfr below does not free it */
+	daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
+	talloc_free(dfr);	/* also releases dfr->dfc, which was made a child of dfr when it was requeued */
+}
+
+/* the deferral context is destroyed either after a timeout or when the initial
+   fetch-lock has finished.
+   at this stage, immediately start reprocessing the queued up deferred
+   calls so they get reprocessed right away (and since we are dmaster at
+   this stage, trigger the waiting smbd processes to pick up and acquire the
+   record right away).
+*/
+static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
+{
+
+	/* need to reprocess the packets from the queue explicitly instead of
+	   just using a normal destructor since we need to call the
+	   clients in the same order as the requests were queued up
+	*/
+	while (dfq->deferred_calls != NULL) {
+		struct ctdb_client *client;
+		struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
+		struct ctdb_deferred_requeue *dfr;
+
+		DLIST_REMOVE(dfq->deferred_calls, dfc);
+
+		client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
+		if (client == NULL) {
+			DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
+				 dfc->w->client_id));
+			continue;	/* the call is dropped; dfc is still a talloc child of dfq */
+		}
+
+		/* process it by pushing it back onto the eventloop */
+		dfr = talloc(client, struct ctdb_deferred_requeue);
+		if (dfr == NULL) {
+			DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
+			continue;	/* the call is dropped; dfc is still a talloc child of dfq */
+		}
+
+		dfr->dfc    = talloc_steal(dfr, dfc);	/* move dfc off dfq so it outlives the queue being destroyed */
+		dfr->client = client;
+
+		event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
+	}
+
+	return 0;
+}
+
+/* insert the new deferral context (parm) into the rb tree.
+   there should never be a pre-existing context (data) here, but check for it:
+   warn and destroy the previous context if there is already a deferral context
+   for this key. always returns parm.
+*/
+static void *insert_dfq_callback(void *parm, void *data)
+{
+        if (data) {
+		DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
+                talloc_free(data);
+        }
+        return parm;
+}
+
+/* if the original fetch-lock did not complete within a reasonable time,
+   free the deferral context (passed as private_data) so that all deferred
+   requests are re-inserted into the event system.
+*/
+static void dfq_timeout(struct event_context *ev, struct timed_event *te, 
+				  struct timeval t, void *private_data)
+{
+	talloc_free(private_data);
+}
+
+/* This function is used in the local daemon to register a KEY in a database
+   as being "fetched". While the remote fetch is in-flight, any further
+   attempts to re-fetch the same record will be deferred until it completes.
+   Returns 0 on success, -1 if an allocation fails.
+*/
+static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
+{
+	uint32_t *k;
+	struct ctdb_deferred_fetch_queue *dfq;
+
+	k = talloc_zero_size(call, ((call->key.dsize + 3) & 0xfffffffc) + 4);
+	if (k == NULL) {
+		DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
+		return -1;
+	}
+
+	k[0] = (call->key.dsize + 3) / 4 + 1;	/* tree key length in 32-bit words, counting k[0] itself */
+	memcpy(&k[1], call->key.dptr, call->key.dsize);	/* key bytes follow; the tail of the last word stays zeroed */
+
+	dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
+	if (dfq == NULL) {
+		DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
+		talloc_free(k);
+		return -1;
+	}
+	dfq->deferred_calls = NULL;
+
+	trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
+
+	talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
+
+	/* if the fetch hasn't completed in 30 seconds, just tear it all down
+	   and let it try again as the events are reissued */
+	event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
+
+	talloc_free(k);
+	return 0;
+}
+
+/* check if this is a duplicate request to a fetch already in-flight.
+   if it is, queue this call to be reprocessed later when the in-flight
+   fetch completes and return 0. returns -1 if no deferral context exists for
+   the key or if an allocation fails; in that case c is not re-parented. */
+static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
+{
+	uint32_t *k;
+	struct ctdb_deferred_fetch_queue *dfq;
+	struct ctdb_deferred_fetch_call *dfc;
+
+	k = talloc_zero_size(c, ((key.dsize + 3) & 0xfffffffc) + 4);
+	if (k == NULL) {
+		DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
+		return -1;
+	}
+
+	k[0] = (key.dsize + 3) / 4 + 1;	/* same key layout as in setup_deferred_fetch_locks(): word count, then key bytes */
+	memcpy(&k[1], key.dptr, key.dsize);
+
+	dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
+	if (dfq == NULL) {
+		talloc_free(k);
+		return -1;	/* no deferral context registered for this key */
+	}
+
+
+	talloc_free(k);
+
+	dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
+	if (dfc == NULL) {
+		DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
+		return -1;
+	}
+
+	dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
+	if (dfc->w == NULL) {
+		DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
+		talloc_free(dfc);
+		return -1;
+	}
+
+	dfc->c = talloc_steal(dfc, c);	/* the queue entry now owns the request packet */
+	dfc->w->ctdb = ctdb_db->ctdb;
+	dfc->w->client_id = client->client_id;
+
+	DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
+
+	return 0;
+}
+
 
 /*
   this is called when the ctdb daemon received a ctdb request call
@@ -423,6 +608,20 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
 		return;
 	}
 
+	if (c->flags & CTDB_IMMEDIATE_MIGRATION) {
+		/* check if this fetch-lock request is a duplicate for a
+		   request we already have in flight. If so defer it until
+		   the first request completes.
+		 */
+		if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
+			ret = ctdb_ltdb_unlock(ctdb_db, key);
+			if (ret != 0) {
+				DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
+			}
+			return;
+		}
+	}
+
 	dstate = talloc(client, struct daemon_call_state);
 	if (dstate == NULL) {
 		ret = ctdb_ltdb_unlock(ctdb_db, key);
@@ -462,6 +661,14 @@ static void daemon_request_call_from_client(struct ctdb_client *client,
 		state = ctdb_call_local_send(ctdb_db, call, &header, &data);
 	} else {
 		state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
+		if (call->flags & CTDB_IMMEDIATE_MIGRATION) {
+			/* This request triggered a remote fetch-lock.
+			   set up a deferral for this key so any additional
+			   fetch-locks are deferred until the current one
+			   finishes.
+			 */
+			setup_deferred_fetch_locks(ctdb_db, call);
+		}
 	}
 
 	ret = ctdb_ltdb_unlock(ctdb_db, key);
diff --git a/server/ctdb_ltdb_server.c b/server/ctdb_ltdb_server.c
index 3d18e06..39dfdf3 100644
--- a/server/ctdb_ltdb_server.c
+++ b/server/ctdb_ltdb_server.c
@@ -905,6 +905,17 @@ again:
 		}
 	}
 
+	/* set up a rb tree we can use to track which records we have a 
+	   fetch-lock in-flight for so we can defer any additional calls
+	   for the same record.
+	 */
+	ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
+	if (ctdb_db->deferred_fetch == NULL) {
+		DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
+		talloc_free(ctdb_db);
+		return -1;
+	}
+
 	DLIST_ADD(ctdb->db_list, ctdb_db);
 
 	/* setting this can help some high churn databases */


-- 
CTDB repository


More information about the samba-cvs mailing list