Rev 51: ibw: modified tridge's code - in my point of view in http://samba.org/~tridge/psomogyi/

psomogyi at gamax.hu psomogyi at gamax.hu
Fri Jan 5 17:13:35 GMT 2007


------------------------------------------------------------
revno: 51
revision-id: psomogyi at gamax.hu-20070105171335-smiknw06nd9o7cnw
parent: psomogyi at gamax.hu-20070104154441-ekn574yyvselq7e3
committer: Peter Somogyi <psomogyi at gamax.hu>
branch nick: ctdb
timestamp: Fri 2007-01-05 18:13:35 +0100
message:
  ibw: modified tridge's code - in my point of view
  ibw_alloc_send and node-centric parameters are the basis of these important changes.
  Also tried to avoid memcpy/memdup wherever possible.
modified:
  common/ctdb.c                  ctdb.c-20061127094323-t50f58d65iaao5of-2
  common/ctdb_call.c             ctdb_call.c-20061128065342-to93h6eejj5kon81-1
  ib/ibw_ctdb_init.c             ibw_ctdb_init.c-20070102171305-cn2z4k7ibx8141d5-1
  include/ctdb_private.h         ctdb_private.h-20061117234101-o3qt14umlg9en8z0-13
  tcp/tcp_init.c                 tcp_init.c-20061128004937-x70q1cu5xzg5g2tm-2
=== modified file 'common/ctdb.c'
--- a/common/ctdb.c	2007-01-02 17:16:39 +0000
+++ b/common/ctdb.c	2007-01-05 17:13:35 +0000
@@ -30,14 +30,14 @@
 int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
 {
 	int ctdb_tcp_init(struct ctdb_context *ctdb);
-#ifdef HAVE_INFINIBAND
+#ifdef USE_INFINIBAND
 	int ctdb_ibw_init(struct ctdb_context *ctdb);
 #endif /*HAVE_INFINIBAND*/
 
 	if (strcmp(transport, "tcp") == 0) {
 		return ctdb_tcp_init(ctdb);
 	}
-#ifdef HAVE_INFINIBAND
+#ifdef USE_INFINIBAND
 	if (strcmp(transport, "ib") == 0) {
 		return ctdb_ibw_init(ctdb);
 	}
@@ -256,10 +256,15 @@
 	}
 }
 
+void ctdb_stopped(struct ctdb_context *ctdb)
+{
+}
+
 static const struct ctdb_upcalls ctdb_upcalls = {
 	.recv_pkt       = ctdb_recv_pkt,
 	.node_dead      = ctdb_node_dead,
-	.node_connected = ctdb_node_connected
+	.node_connected = ctdb_node_connected,
+	.stopped        = ctdb_stopped
 };
 
 /*

=== modified file 'common/ctdb_call.c'
--- a/common/ctdb_call.c	2006-12-19 01:03:10 +0000
+++ b/common/ctdb_call.c	2007-01-05 17:13:35 +0000
@@ -31,12 +31,10 @@
 /*
   queue a packet or die
 */
-static void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+static inline void ctdb_queue_packet(struct ctdb_node *node, struct ctdb_req_header *hdr)
 {
-	struct ctdb_node *node;
-	node = ctdb->nodes[hdr->destnode];
-	if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
-		ctdb_fatal(ctdb, "Unable to queue packet\n");
+	if (node->ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
+		ctdb_fatal(node->ctdb, "Unable to queue packet\n");
 	}
 }
 
@@ -121,6 +119,9 @@
 	struct ctdb_reply_error *r;
 	char *msg;
 	int len;
+	struct ctdb_node *node;
+
+	node = ctdb->nodes[hdr->srcnode];
 
 	va_start(ap, fmt);
 	msg = talloc_vasprintf(ctdb, fmt, ap);
@@ -130,7 +131,7 @@
 	va_end(ap);
 
 	len = strlen(msg)+1;
-	r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + len);
+	r = ctdb->methods->allocate_pkt(node, sizeof(*r) + len);
 	CTDB_NO_MEMORY_FATAL(ctdb, r);
 	r->hdr.length = sizeof(*r) + len;
 	r->hdr.operation = CTDB_REPLY_ERROR;
@@ -143,9 +144,8 @@
 
 	talloc_free(msg);
 
-	ctdb_queue_packet(ctdb, &r->hdr);
-
-	talloc_free(r);
+	ctdb_queue_packet(node, &r->hdr);
+	ctdb->methods->dealloc_pkt(node, r);
 }
 
 
@@ -157,8 +157,11 @@
 				    struct ctdb_ltdb_header *header)
 {
 	struct ctdb_reply_redirect *r;
-
-	r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r));
+	struct ctdb_node *node;
+
+	node = ctdb->nodes[c->hdr.srcnode];
+
+	r = ctdb->methods->allocate_pkt(node, sizeof(*r));
 	CTDB_NO_MEMORY_FATAL(ctdb, r);
 	r->hdr.length = sizeof(*r);
 	r->hdr.operation = CTDB_REPLY_REDIRECT;
@@ -167,9 +170,8 @@
 	r->hdr.reqid     = c->hdr.reqid;
 	r->dmaster       = header->dmaster;
 
-	ctdb_queue_packet(ctdb, &r->hdr);
-
-	talloc_free(r);
+	ctdb_queue_packet(node, &r->hdr);
+	ctdb->methods->dealloc_pkt(node, r);
 }
 
 /*
@@ -186,13 +188,18 @@
 {
 	struct ctdb_req_dmaster *r;
 	int len;
+	struct ctdb_node *node;
+	uint32_t destnode;
+
+	destnode = ctdb_lmaster(ctdb, key);
+	node = ctdb->nodes[destnode];
 	
 	len = sizeof(*r) + key->dsize + data->dsize;
-	r = ctdb->methods->allocate_pkt(ctdb, len);
+	r = ctdb->methods->allocate_pkt(node, len);
 	CTDB_NO_MEMORY_FATAL(ctdb, r);
 	r->hdr.length    = len;
 	r->hdr.operation = CTDB_REQ_DMASTER;
-	r->hdr.destnode  = ctdb_lmaster(ctdb, key);
+	r->hdr.destnode  = destnode;
 	r->hdr.srcnode   = ctdb->vnn;
 	r->hdr.reqid     = c->hdr.reqid;
 	r->dmaster       = header->laccessor;
@@ -205,14 +212,14 @@
 		/* we are the lmaster - don't send to ourselves */
 		ctdb_request_dmaster(ctdb, &r->hdr);
 	} else {
-		ctdb_queue_packet(ctdb, &r->hdr);
+		ctdb_queue_packet(node, &r->hdr);
 
 		/* update the ltdb to record the new dmaster */
 		header->dmaster = r->hdr.destnode;
 		ctdb_ltdb_store(ctdb, *key, header, *data);
 	}
 
-	talloc_free(r);
+	ctdb->methods->dealloc_pkt(node, r);
 }
 
 
@@ -229,7 +236,9 @@
 	TDB_DATA key, data;
 	struct ctdb_ltdb_header header;
 	int ret;
+	struct ctdb_node *node;
 
+	node = ctdb->nodes[c->dmaster];
 	key.dptr = c->data;
 	key.dsize = c->keylen;
 	data.dptr = c->data + c->keylen;
@@ -255,7 +264,7 @@
 	}
 
 	/* send the CTDB_REPLY_DMASTER */
-	r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + data.dsize);
+	r = ctdb->methods->allocate_pkt(node, sizeof(*r) + data.dsize);
 	CTDB_NO_MEMORY_FATAL(ctdb, r);
 	r->hdr.length = sizeof(*r) + data.dsize;
 	r->hdr.operation = CTDB_REPLY_DMASTER;
@@ -265,9 +274,8 @@
 	r->datalen       = data.dsize;
 	memcpy(&r->data[0], data.dptr, data.dsize);
 
-	ctdb_queue_packet(ctdb, &r->hdr);
-
-	talloc_free(r);
+	ctdb_queue_packet(node, &r->hdr);
+	ctdb->methods->dealloc_pkt(node, r);
 }
 
 
@@ -281,7 +289,9 @@
 	struct ctdb_reply_call *r;
 	int ret;
 	struct ctdb_ltdb_header header;
+	struct ctdb_node *node;
 
+	node = ctdb->nodes[hdr->srcnode];
 	key.dptr = c->data;
 	key.dsize = c->keylen;
 	call_data.dptr = c->data + c->keylen;
@@ -317,7 +327,7 @@
 			call_data.dsize?&call_data:NULL,
 			&reply_data, c->hdr.srcnode);
 
-	r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + reply_data.dsize);
+	r = ctdb->methods->allocate_pkt(node, sizeof(*r) + reply_data.dsize);
 	CTDB_NO_MEMORY_FATAL(ctdb, r);
 	r->hdr.length = sizeof(*r) + reply_data.dsize;
 	r->hdr.operation = CTDB_REPLY_CALL;
@@ -327,10 +337,10 @@
 	r->datalen       = reply_data.dsize;
 	memcpy(&r->data[0], reply_data.dptr, reply_data.dsize);
 
-	ctdb_queue_packet(ctdb, &r->hdr);
+	ctdb_queue_packet(node, &r->hdr);
+	ctdb->methods->dealloc_pkt(node, r);
 
 	talloc_free(reply_data.dptr);
-	talloc_free(r);
 }
 
 enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR};
@@ -440,7 +450,10 @@
 {
 	struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr;
 	struct ctdb_call_state *state;
-
+	struct ctdb_node *node;
+#ifdef USE_INFINIBAND
+	uint8_t	*r;
+#endif /* USE_INFINIBAND */
 	state = idr_find(ctdb->idr, hdr->reqid);
 
 	talloc_steal(state, c);
@@ -453,7 +466,18 @@
 	/* send it off again */
 	state->node = ctdb->nodes[c->dmaster];
 
-	ctdb_queue_packet(ctdb, &state->c->hdr);
+	node = ctdb->nodes[state->c->hdr.destnode];
+
+#ifdef USE_INFINIBAND
+	r = ctdb->methods->allocate_pkt(node, state->c->hdr.length);
+	memcpy(r, &state->c->hdr, state->c->hdr.length);
+#endif /* USE_INFINIBAND */
+
+	ctdb_queue_packet(node, &state->c->hdr);
+
+#ifdef USE_INFINIBAND
+	ctdb->methods->dealloc_pkt(node, r);
+#endif /* USE_INFINIBAND */
 }
 
 /*
@@ -520,6 +544,7 @@
 	int ret;
 	struct ctdb_ltdb_header header;
 	TDB_DATA data;
+	struct ctdb_node *node;
 
 	/*
 	  if we are the dmaster for this key then we don't need to
@@ -538,8 +563,9 @@
 	state = talloc_zero(ctdb, struct ctdb_call_state);
 	CTDB_NO_MEMORY_NULL(ctdb, state);
 
+	node = ctdb->nodes[header.dmaster];
 	len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
-	state->c = ctdb->methods->allocate_pkt(ctdb, len);
+	state->c = ctdb->methods->allocate_pkt(node, len);
 	CTDB_NO_MEMORY_NULL(ctdb, state->c);
 
 	state->c->hdr.length    = len;
@@ -566,7 +592,12 @@
 
 	talloc_set_destructor(state, ctdb_call_destructor);
 
-	ctdb_queue_packet(ctdb, &state->c->hdr);
+	ctdb_queue_packet(node, &state->c->hdr);
+
+#ifdef USE_INFINIBAND
+	ctdb->methods->dealloc_pkt(node, state->c);
+	state->c = NULL;
+#endif /* USE_INFINIBAND */
 
 	event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), 
 			ctdb_call_timeout, state);

=== modified file 'ib/ibw_ctdb_init.c'
--- a/ib/ibw_ctdb_init.c	2007-01-04 15:44:41 +0000
+++ b/ib/ibw_ctdb_init.c	2007-01-05 17:13:35 +0000
@@ -29,6 +29,9 @@
 #include "ibwrapper.h"
 #include "ibw_ctdb.h"
 
+/* not nice; temporary workaround for the current implementation... */
+static void *last_key = NULL;
+
 static int ctdb_ibw_listen(struct ctdb_context *ctdb, int backlog)
 {
 	struct ibw_ctx *ictx = talloc_get_type(ctdb->private, struct ibw_ctx);
@@ -108,14 +111,12 @@
 /*
  * transport packet allocator - allows transport to control memory for packets
  */
-static void *ctdb_ibw_allocate_pkt(struct ctdb_context *ctdb, size_t size)
+static void *ctdb_ibw_allocate_pkt(struct ctdb_node *node, size_t size)
 {
 	struct ibw_conn *conn = NULL;
 	void *buf = NULL;
-	void *key; /* TODO: expand the param list with this */
 
-	/* TODO2: !!! I need "node" or ibw_conn here */
-	if (ibw_alloc_send_buf(conn, &buf, &key, (int)size))
+	if (ibw_alloc_send_buf(conn, &buf, &last_key, size))
 		return NULL;
 
 	return buf;
@@ -124,20 +125,40 @@
 static int ctdb_ibw_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
 {
 	struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn);
-	void *key = NULL; /* TODO: expand the param list with this */
-
-	assert(conn!=NULL);
-	return ibw_send(conn, data, key, length);
+	int	rc;
+
+	rc = ibw_send(conn, data, last_key, length);
+	last_key = NULL;
+
+	return rc;
+}
+
+static void ctdb_ibw_dealloc_pkt(struct ctdb_node *node, void *data)
+{
+	if (last_key) {
+		struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn);
+	
+		assert(conn!=NULL);
+		ibw_cancel_send_buf(conn, data, last_key);
+	} /* else ibw_send is already using it and will free it after completion */
+}
+
+static int ctdb_ibw_stop(struct ctdb_context *cctx)
+{
+	struct ibw_ctx *ictx = talloc_get_type(cctx->private, struct ibw_ctx);
+
+	assert(ictx!=NULL);
+	return ibw_stop(ictx);
 }
 
 static const struct ctdb_methods ctdb_ibw_methods = {
 	.start     = ctdb_ibw_start,
 	.add_node  = ctdb_ibw_add_node,
 	.queue_pkt = ctdb_ibw_queue_pkt,
-	.allocate_pkt = ctdb_ibw_allocate_pkt
-	
-//	.dealloc_pkt = ctdb_ibw_dealloc_pkt
-//	.stop = ctdb_ibw_stop
+	.allocate_pkt = ctdb_ibw_allocate_pkt,
+
+	.dealloc_pkt = ctdb_ibw_dealloc_pkt,
+	.stop = ctdb_ibw_stop
 };
 
 /*
@@ -146,7 +167,7 @@
 int ctdb_ibw_init(struct ctdb_context *ctdb)
 {
 	struct ibw_ctx *ictx;
-	
+
 	ictx = ibw_init(
 		NULL, //struct ibw_initattr *attr, /* TODO */
 		0, //int nattr, /* TODO */

=== modified file 'include/ctdb_private.h'
--- a/include/ctdb_private.h	2006-12-19 23:32:31 +0000
+++ b/include/ctdb_private.h	2007-01-05 17:13:35 +0000
@@ -55,7 +55,9 @@
 	int (*start)(struct ctdb_context *); /* start protocol processing */	
 	int (*add_node)(struct ctdb_node *); /* setup a new node */	
 	int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length);
-	void *(*allocate_pkt)(struct ctdb_context *, size_t );
+	void *(*allocate_pkt)(struct ctdb_node *, size_t);
+	void (*dealloc_pkt)(struct ctdb_node *, void *data);
+	int (*stop)(struct ctdb_context *); /* initiate stopping the protocol */
 };
 
 /*
@@ -70,6 +72,9 @@
 
 	/* node_connected is called when a connection to a node is established */
 	void (*node_connected)(struct ctdb_node *);
+
+	/* protocol has been stopped */
+	void (*stopped)(struct ctdb_context *);
 };
 
 /* main state of the ctdb daemon */

=== modified file 'tcp/tcp_init.c'
--- a/tcp/tcp_init.c	2006-12-19 01:07:07 +0000
+++ b/tcp/tcp_init.c	2007-01-05 17:13:35 +0000
@@ -67,21 +67,31 @@
 /*
   transport packet allocator - allows transport to control memory for packets
 */
-void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size)
+void *ctdb_tcp_allocate_pkt(struct ctdb_node *node, size_t size)
 {
 	/* tcp transport needs to round to 8 byte alignment to ensure
 	   that we can use a length header and 64 bit elements in
 	   structures */
 	size = (size+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
-	return talloc_size(ctdb, size);
-}
-
+	return talloc_size(node, size);
+}
+
+void ctdb_tcp_dealloc_pkt(struct ctdb_node *node, void *buf)
+{
+	talloc_free(buf);
+}
+
+int ctdb_tcp_stop(struct ctdb_context *ctdb)
+{
+	return 0;
+}
 
 static const struct ctdb_methods ctdb_tcp_methods = {
 	.start     = ctdb_tcp_start,
 	.add_node  = ctdb_tcp_add_node,
 	.queue_pkt = ctdb_tcp_queue_pkt,
-	.allocate_pkt = ctdb_tcp_allocate_pkt
+	.allocate_pkt = ctdb_tcp_allocate_pkt,
+	.dealloc_pkt = ctdb_tcp_dealloc_pkt
 };
 
 /*



More information about the samba-cvs mailing list