Rev 71: ib: added external send queue to work around downtime in http://samba.org/~tridge/psomogyi/

psomogyi at gamax.hu psomogyi at gamax.hu
Mon Feb 26 10:59:20 GMT 2007


------------------------------------------------------------
revno: 71
revision-id: psomogyi at gamax.hu-20070226105920-dlootykwymain6s1
parent: psomogyi at gamax.hu-20070222172518-plrikn35x8g2sis1
committer: Peter Somogyi <psomogyi at gamax.hu>
branch nick: ctdb
timestamp: Mon 2007-02-26 11:59:20 +0100
message:
  ib: added external send queue to work around downtime
  The workaround is needed because I couldn't find a correct way in IB to reconnect cleanly (keeping the queue) when the destination is unreachable.
  When the connection breaks, all internal queue contents are destroyed and the connection is re-established automatically.
  An "external" send queue is therefore kept for each destination node until its connection is up again.
modified:
  common/ctdb_message.c          ctdb_message.c-20070208224107-9dnio7x7z33prrmt-1
  ib/ibw_ctdb.c                  ibw_ctdb.c-20070102171255-7krov7858dqza466-1
  ib/ibw_ctdb.h                  ibw_ctdb.h-20070102171259-nmuvtzt98aqzg7xp-1
  ib/ibw_ctdb_init.c             ibw_ctdb_init.c-20070102171305-cn2z4k7ibx8141d5-1
  ib/ibwrapper.c                 ibwrapper.c-20061204130028-0125b4f5a72f4b11
  ib/ibwrapper.h                 ibwrapper.h-20061204130028-32755c6266dd3c49
  ib/ibwrapper_test.c            ibwrapper_test.c-20061214171730-h11a2z5ed6pt66hj-1
=== modified file 'common/ctdb_message.c'
--- a/common/ctdb_message.c	2007-02-16 14:21:31 +0000
+++ b/common/ctdb_message.c	2007-02-26 10:59:20 +0000
@@ -60,8 +60,6 @@
 	struct ctdb_req_message *r;
 	int len;
 
-	ctdb_connect_wait(ctdb); /* recursion */
-
 	len = offsetof(struct ctdb_req_message, data) + data.dsize;
 	r = ctdb->methods->allocate_pkt(ctdb, len);
 	CTDB_NO_MEMORY(ctdb, r);

=== modified file 'ib/ibw_ctdb.c'
--- a/ib/ibw_ctdb.c	2007-02-15 16:02:38 +0000
+++ b/ib/ibw_ctdb.c	2007-02-26 10:59:20 +0000
@@ -29,8 +29,13 @@
 #include "ibwrapper.h"
 #include "ibw_ctdb.h"
 
-int ctdb_ibw_node_connect(struct ibw_ctx *ictx, struct ctdb_node *node)
+int ctdb_ibw_node_connect(struct ctdb_node *node)
 {
+	struct ctdb_ibw_node *cn = talloc_get_type(node->private, struct ctdb_ibw_node);
+	int	rc;
+
+	assert(cn!=NULL);
+	assert(cn->conn!=NULL);
 	struct sockaddr_in sock_out;
 
 	memset(&sock_out, 0, sizeof(struct sockaddr_in));
@@ -38,12 +43,12 @@
 	sock_out.sin_port = htons(node->address.port);
 	sock_out.sin_family = PF_INET;
 
-	if (ibw_connect(ictx, &sock_out, node)) {
-		DEBUG(0, ("ctdb_ibw_node_connect: ibw_connect failed - retrying in 1 sec...\n"));
+	rc = ibw_connect(cn->conn, &sock_out, node);
+	if (rc) {
+		DEBUG(0, ("ctdb_ibw_node_connect/ibw_connect failed - retrying...\n"));
 		/* try again once a second */
 		event_add_timed(node->ctdb->ev, node, timeval_current_ofs(1, 0), 
 			ctdb_ibw_node_connect_event, node);
-		return -1;
 	}
 
 	/* continues at ibw_ctdb.c/IBWC_CONNECTED in good case */
@@ -54,9 +59,8 @@
 	struct timeval t, void *private)
 {
 	struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
-	struct ibw_ctx *ictx = talloc_get_type(node->ctdb->private, struct ibw_ctx);
 
-	ctdb_ibw_node_connect(ictx, node);
+	ctdb_ibw_node_connect(node);
 }
 
 int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn)
@@ -94,14 +98,15 @@
 		case IBWC_CONNECTED: { /* after ibw_accept or ibw_connect */
 			struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node);
 			if (node!=NULL) { /* after ibw_connect */
-				node->private = (void *)conn;
+				struct ctdb_ibw_node *cn = talloc_get_type(node->private, struct ctdb_ibw_node);
+
 				node->ctdb->upcalls->node_connected(node);
+				ctdb_flush_cn_queue(cn);
 			} else { /* after ibw_accept */
 				/* NOP in CTDB case */
 			}
 		} break;
 		case IBWC_DISCONNECTED: { /* after ibw_disconnect */
-			/* TODO: have a CTDB upcall */
 			struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node);
 			if (node!=NULL)
 				node->ctdb->upcalls->node_dead(node);
@@ -110,13 +115,16 @@
 		} break;
 		case IBWC_ERROR: {
 			struct ctdb_node *node = talloc_get_type(conn->conn_userdata, struct ctdb_node);
-			if (node!=NULL)
-				node->private = NULL; /* not to use again */
+			if (node!=NULL) {
+				struct ctdb_ibw_node *cn = talloc_get_type(node->private, struct ctdb_ibw_node);
+				struct ibw_ctx *ictx = cn->conn->ctx;
 
-			DEBUG(10, ("IBWC_ERROR, reconnecting immediately...\n"));
-			talloc_free(conn);
-			event_add_timed(node->ctdb->ev, node, timeval_current_ofs(1, 0),
-				ctdb_ibw_node_connect_event, node);
+				DEBUG(10, ("IBWC_ERROR, reconnecting...\n"));
+				talloc_free(cn->conn); /* internal queue content is destroyed */
+				cn->conn = (void *)ibw_conn_new(ictx, node);
+				event_add_timed(node->ctdb->ev, node, timeval_current_ofs(1, 0),
+					ctdb_ibw_node_connect_event, node);
+			}
 		} break;
 		default:
 			assert(0);

=== modified file 'ib/ibw_ctdb.h'
--- a/ib/ibw_ctdb.h	2007-02-15 16:02:38 +0000
+++ b/ib/ibw_ctdb.h	2007-02-26 10:59:20 +0000
@@ -21,10 +21,26 @@
  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
+struct ctdb_ibw_msg {
+	uint8_t *data;
+	uint32_t length;
+	struct ctdb_ibw_msg *prev;
+	struct ctdb_ibw_msg *next;
+};
+
+struct ctdb_ibw_node {
+	struct ibw_conn *conn;
+
+	struct ctdb_ibw_msg *queue;
+	struct ctdb_ibw_msg *queue_last;
+	int	qcnt;
+};
+
 int ctdb_ibw_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn);
 int ctdb_ibw_receive_handler(struct ibw_conn *conn, void *buf, int n);
 
-int ctdb_ibw_node_connect(struct ibw_ctx *ictx, struct ctdb_node *node);
+int ctdb_ibw_node_connect(struct ctdb_node *node);
 void ctdb_ibw_node_connect_event(struct event_context *ev, struct timed_event *te, 
 	struct timeval t, void *private);
 
+int ctdb_flush_cn_queue(struct ctdb_ibw_node *cn);

=== modified file 'ib/ibw_ctdb_init.c'
--- a/ib/ibw_ctdb_init.c	2007-02-15 16:02:38 +0000
+++ b/ib/ibw_ctdb_init.c	2007-02-26 10:59:20 +0000
@@ -58,7 +58,6 @@
  */
 static int ctdb_ibw_start(struct ctdb_context *ctdb)
 {
-	struct ibw_ctx *ictx = talloc_get_type(ctdb->private, struct ibw_ctx);
 	int i;
 
 	/* listen on our own address */
@@ -71,44 +70,88 @@
 		if (!(ctdb->flags & CTDB_FLAG_SELF_CONNECT) &&
 			ctdb_same_address(&ctdb->address, &node->address))
 			continue;
-		ctdb_ibw_node_connect(ictx, node);
+		ctdb_ibw_node_connect(node);
 	}
 
 	return 0;
 }
 
-
 /*
  * initialise ibw portion of a ctdb node 
  */
 static int ctdb_ibw_add_node(struct ctdb_node *node)
 {
-	/* TODO: clarify whether is this necessary for us ?
-	   - why not enough doing such thing internally at connect time ? */
-	return 0;
+	struct ibw_ctx *ictx = talloc_get_type(node->ctdb->private, struct ibw_ctx);
+	struct ctdb_ibw_node *cn = talloc_zero(node, struct ctdb_ibw_node);
+
+	assert(cn!=NULL);
+	cn->conn = ibw_conn_new(ictx, node);
+	node->private = (void *)cn;
+
+	return (cn->conn!=NULL ? 0 : -1);
+}
+
+static int ctdb_ibw_send_pkt(struct ibw_conn *conn, uint8_t *data, uint32_t length)
+{
+	void	*buf, *key;
+
+	if (ibw_alloc_send_buf(conn, &buf, &key, length)) {
+		DEBUG(0, ("queue_pkt/ibw_alloc_send_buf failed\n"));
+		return -1;
+	}
+
+	memcpy(buf, data, length);
+	return ibw_send(conn, buf, key, length);
+}
+
+int ctdb_flush_cn_queue(struct ctdb_ibw_node *cn)
+{
+	struct ctdb_ibw_msg *p;
+	int	rc = 0;
+
+	while(cn->queue) {
+		p = cn->queue;
+		rc = ctdb_ibw_send_pkt(cn->conn, p->data, p->length);
+		if (rc)
+			return -1; /* will be retried later when conn is up */
+
+		DLIST_REMOVE(cn->queue, p);
+		cn->qcnt--;
+		talloc_free(p); /* it will talloc_free p->data as well */
+	}
+	assert(cn->qcnt==0);
+	/* cn->queue_last = NULL is not needed - see DLIST_ADD_AFTER */
+
+	return rc;
 }
 
 static int ctdb_ibw_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
 {
-	struct ibw_conn *conn = talloc_get_type(node->private, struct ibw_conn);
+	struct ctdb_ibw_node *cn = talloc_get_type(node->private, struct ctdb_ibw_node);
 	int	rc;
-	void	*buf, *key;
 
 	assert(length>=sizeof(uint32_t));
+	assert(cn!=NULL);
 
-	if (conn==NULL) {
+	if (cn->conn==NULL) {
 		DEBUG(0, ("ctdb_ibw_queue_pkt: conn is NULL\n"));
 		return -1;
 	}
 
-	if (ibw_alloc_send_buf(conn, &buf, &key, length)) {
-		DEBUG(0, ("queue_pkt/ibw_alloc_send_buf failed\n"));
-		return -1;
+	if (cn->conn->state==IBWC_CONNECTED) {
+		rc = ctdb_ibw_send_pkt(cn->conn, data, length);
+	} else {
+		struct ctdb_ibw_msg *p = talloc_zero(cn, struct ctdb_ibw_msg);
+		p->data = talloc_memdup(p, data, length);
+		p->length = length;
+
+		DLIST_ADD_AFTER(cn->queue, p, cn->queue_last);
+		cn->queue_last = p;
+		cn->qcnt++;
+
+		rc = 0;
 	}
 
-	memcpy(buf, data, length);
-	rc = ibw_send(conn, buf, key, length);
-
 	return rc;
 }
 

=== modified file 'ib/ibwrapper.c'
--- a/ib/ibwrapper.c	2007-02-15 16:02:38 +0000
+++ b/ib/ibwrapper.c	2007-02-26 10:59:20 +0000
@@ -161,42 +161,57 @@
 
 static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn)
 {
-	DEBUG(10, ("ibw_conn_priv_destruct(%u, cmid: %p)\n",
-		(uint32_t)pconn, pconn->cm_id));
-
-	/* free memory regions */
-	ibw_free_mr(&pconn->buf_send, &pconn->mr_send);
-	ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv);
+	DEBUG(10, ("ibw_conn_priv_destruct(%p, cmid: %p)\n",
+		pconn, pconn->cm_id));
 
 	/* pconn->wr_index is freed by talloc */
 	/* pconn->wr_index[i] are freed by talloc */
 
 	/* destroy verbs */
-	if (pconn->cm_id->qp) {
-		ibv_destroy_qp(pconn->cm_id->qp);
+	if (pconn->cm_id!=NULL && pconn->cm_id->qp!=NULL) {
+		rdma_destroy_qp(pconn->cm_id);
 		pconn->cm_id->qp = NULL;
 	}
-	if (pconn->cq) {
+
+	if (pconn->cq!=NULL) {
 		ibv_destroy_cq(pconn->cq);
 		pconn->cq = NULL;
 	}
-	if (pconn->verbs_channel) {
+
+	if (pconn->verbs_channel!=NULL) {
 		ibv_destroy_comp_channel(pconn->verbs_channel);
 		pconn->verbs_channel = NULL;
 	}
+
+	/* must be freed here because its order is important */
 	if (pconn->verbs_channel_event) {
-		/* TODO: do we have to do this here? */
 		talloc_free(pconn->verbs_channel_event);
 		pconn->verbs_channel_event = NULL;
 	}
+
+	/* free memory regions */
+	ibw_free_mr(&pconn->buf_send, &pconn->mr_send);
+	ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv);
+
 	if (pconn->pd) {
 		ibv_dealloc_pd(pconn->pd);
 		pconn->pd = NULL;
+		DEBUG(10, ("pconn=%p pd deallocated\n", pconn));
 	}
+
 	if (pconn->cm_id) {
 		rdma_destroy_id(pconn->cm_id);
 		pconn->cm_id = NULL;
+		DEBUG(10, ("pconn=%p cm_id destroyed\n", pconn));
 	}
+
+	return 0;
+}
+
+static int ibw_wr_destruct(struct ibw_wr *wr)
+{
+	if (wr->buf_large!=NULL)
+		ibw_free_mr(&wr->buf_large, &wr->mr_large);
 	return 0;
 }
 
@@ -209,16 +224,18 @@
 	return 0;
 }
 
-static struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx)
+struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx, TALLOC_CTX *mem_ctx)
 {
 	struct ibw_conn *conn;
 	struct ibw_conn_priv *pconn;
 
-	conn = talloc_zero(ctx, struct ibw_conn);
+	assert(ctx!=NULL);
+
+	conn = talloc_zero(mem_ctx, struct ibw_conn);
 	assert(conn!=NULL);
 	talloc_set_destructor(conn, ibw_conn_destruct);
 
-	pconn = talloc_zero(ctx, struct ibw_conn_priv);
+	pconn = talloc_zero(conn, struct ibw_conn_priv);
 	assert(pconn!=NULL);
 	talloc_set_destructor(pconn, ibw_conn_priv_destruct);
 
@@ -248,7 +265,7 @@
 	}
 	DEBUG(10, ("created channel %p\n", pconn->verbs_channel));
 
-	pconn->verbs_channel_event = event_add_fd(pctx->ectx, conn,
+	pconn->verbs_channel_event = event_add_fd(pctx->ectx, NULL, /* not pconn or conn */
 		pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn);
 
 	pconn->pd = ibv_alloc_pd(pconn->cm_id->verbs);
@@ -371,14 +388,15 @@
 	return 0;
 }
 
-static int ibw_manage_connect(struct ibw_conn *conn, struct rdma_cm_id *cma_id)
+static int ibw_manage_connect(struct ibw_conn *conn)
 {
 	struct rdma_conn_param conn_param;
+	struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
 	int	rc;
 
-	DEBUG(10, ("ibw_manage_connect(cmid: %p)\n", cma_id));
-	rc = ibw_setup_cq_qp(conn);
-	if (rc)
+	DEBUG(10, ("ibw_manage_connect(cmid: %p)\n", pconn->cm_id));
+
+	if (ibw_setup_cq_qp(conn))
 		return -1;
 
 	/* cm connect */
@@ -387,7 +405,7 @@
 	conn_param.initiator_depth = 1;
 	conn_param.retry_count = 10;
 
-	rc = rdma_connect(cma_id, &conn_param);
+	rc = rdma_connect(pconn->cm_id, &conn_param);
 	if (rc)
 		sprintf(ibw_lasterr, "rdma_connect error %d\n", rc);
 
@@ -436,7 +454,7 @@
 		assert(cma_id->context!=NULL);
 		conn = talloc_get_type(cma_id->context, struct ibw_conn);
 
-		rc = ibw_manage_connect(conn, cma_id);
+		rc = ibw_manage_connect(conn);
 		if (rc)
 			goto error;
 
@@ -445,7 +463,7 @@
 	case RDMA_CM_EVENT_CONNECT_REQUEST:
 		DEBUG(11, ("RDMA_CM_EVENT_CONNECT_REQUEST\n"));
 		ctx->state = IBWS_CONNECT_REQUEST;
-		conn = ibw_conn_new(ctx);
+		conn = ibw_conn_new(ctx, ctx);
 		pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
 		pconn->cm_id = cma_id; /* !!! event will be freed but id not */
 		cma_id->context = (void *)conn;
@@ -459,6 +477,9 @@
 
 		/* continued at ibw_accept when invoked by the func above */
 		if (!pconn->is_accepted) {
+			rc = rdma_reject(cma_id, NULL, 0);
+			if (rc)
+				DEBUG(0, ("rdma_reject failed with rc=%d\n", rc));
 			talloc_free(conn);
 			DEBUG(10, ("pconn->cm_id %p wasn't accepted\n", pconn->cm_id));
 		}
@@ -476,6 +497,8 @@
 		conn = talloc_get_type(cma_id->context, struct ibw_conn);
 		assert(conn!=NULL); /* important assumption */
 
+		DEBUG(10, ("ibw_setup_cq_qp succeeded (cmid=%p)\n", cma_id));
+
 		/* client conn is up */
 		conn->state = IBWC_CONNECTED;
 
@@ -485,22 +508,30 @@
 
 	case RDMA_CM_EVENT_ADDR_ERROR:
 		sprintf(ibw_lasterr, "RDMA_CM_EVENT_ADDR_ERROR, error %d\n", event->status);
-		goto error;
 	case RDMA_CM_EVENT_ROUTE_ERROR:
 		sprintf(ibw_lasterr, "RDMA_CM_EVENT_ROUTE_ERROR, error %d\n", event->status);
-		goto error;
 	case RDMA_CM_EVENT_CONNECT_ERROR:
 		sprintf(ibw_lasterr, "RDMA_CM_EVENT_CONNECT_ERROR, error %d\n", event->status);
-		goto error;
 	case RDMA_CM_EVENT_UNREACHABLE:
 		sprintf(ibw_lasterr, "RDMA_CM_EVENT_UNREACHABLE, error %d\n", event->status);
-		goto error;
 	case RDMA_CM_EVENT_REJECTED:
 		sprintf(ibw_lasterr, "RDMA_CM_EVENT_REJECTED, error %d\n", event->status);
+		conn = talloc_get_type(cma_id->context, struct ibw_conn);
+		if (conn) {
+			if ((rc=rdma_ack_cm_event(event)))
+				DEBUG(0, ("reject/rdma_ack_cm_event failed with %d\n", rc));
+			event = NULL;
+			pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
+			ibw_conn_priv_destruct(pconn);
+		}
 		goto error;
 
 	case RDMA_CM_EVENT_DISCONNECTED:
 		DEBUG(11, ("RDMA_CM_EVENT_DISCONNECTED\n"));
+		if ((rc=rdma_ack_cm_event(event)))
+			DEBUG(0, ("disc/rdma_ack_cm_event failed with %d\n", rc));
+		event = NULL; /* don't ack more */
+
 		if (cma_id!=pctx->cm_id) {
 			DEBUG(0, ("client DISCONNECT event cm_id=%p\n", cma_id));
 			conn = talloc_get_type(cma_id->context, struct ibw_conn);
@@ -518,14 +549,20 @@
 		goto error;
 	}
 
-	if ((rc=rdma_ack_cm_event(event))) {
+	if (event!=NULL && (rc=rdma_ack_cm_event(event))) {
 		sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc);
 		goto error;
 	}
 
 	return;
 error:
+	if (event!=NULL && (rc=rdma_ack_cm_event(event))) {
+		sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc);
+		goto error;
+	}
+
 	DEBUG(0, ("cm event handler: %s", ibw_lasterr));
+
 	if (cma_id!=pctx->cm_id) {
 		conn = talloc_get_type(cma_id->context, struct ibw_conn);
 		if (conn)
@@ -569,8 +606,8 @@
 
 	while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) {
 		if (wc.status) {
-			sprintf(ibw_lasterr, "cq completion failed status %d rc %d\n",
-				wc.status, rc);
+			sprintf(ibw_lasterr, "cq completion failed status=%d, opcode=%d, rc=%d\n",
+				wc.status, wc.opcode, rc);
 			goto error;
 		}
 
@@ -605,11 +642,57 @@
 		goto error;
 	}
 
+	ibv_ack_cq_events(pconn->cq, 1);
+
 	return;
 error:
+	ibv_ack_cq_events(pconn->cq, 1);
+
 	DEBUG(0, (ibw_lasterr));
-	conn->state = IBWC_ERROR;
-	pctx->connstate_func(NULL, conn);
+	
+	if (conn->state!=IBWC_ERROR) {
+		conn->state = IBWC_ERROR;
+		pctx->connstate_func(NULL, conn);
+	}
+}
+
+static int ibw_process_queue(struct ibw_conn *conn)
+{
+	struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
+	struct ibw_ctx_priv *pctx;
+	struct ibw_wr	*p;
+	int	rc;
+	uint32_t	msg_size;
+
+	if (pconn->queue==NULL)
+		return 0; /* NOP */
+
+	p = pconn->queue;
+
+	/* we must have at least 1 fragment to send */
+	assert(p->queued_ref_cnt>0);
+	p->queued_ref_cnt--;
+
+	pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
+	msg_size = (p->queued_ref_cnt) ? pctx->opts.recv_bufsize : p->queued_rlen;
+
+	assert(p->queued_msg!=NULL);
+	assert(msg_size!=0);
+
+	DEBUG(10, ("ibw_process_queue refcnt=%d msgsize=%u\n",
+		p->queued_ref_cnt, msg_size));
+
+	rc = ibw_send_packet(conn, p->queued_msg, p, msg_size);
+
+	/* was this the last fragment? */
+	if (p->queued_ref_cnt) {
+		p->queued_msg += pctx->opts.recv_bufsize;
+	} else {
+		DLIST_REMOVE2(pconn->queue, p, qprev, qnext);
+		p->queued_msg = NULL;
+	}
+
+	return rc;
 }
 
 static int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
@@ -618,7 +701,6 @@
 	struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
 	struct ibw_wr	*p;
 	int	send_index;
-	int	rc = 0;
 
 	DEBUG(10, ("ibw_wc_send(cmid: %p, wr_id: %u, bl: %u)\n",
 		pconn->cm_id, (uint32_t)wc->wr_id, (uint32_t)wc->byte_len));
@@ -662,30 +744,7 @@
 		}
 	}
 
-	if (pconn->queue) {
-		uint32_t	msg_size;
-		
-		DEBUG(10, ("ibw_wc_send#queue %u\n", (int)wc->wr_id));
-		
-		p = pconn->queue;
-
-		assert(p->queued_ref_cnt>0);
-		p->queued_ref_cnt--;
-
-		msg_size = (p->queued_ref_cnt) ? pctx->opts.recv_bufsize : p->queued_rlen;
- 
-		assert(p->queued_msg!=NULL);
-		assert(msg_size!=0);
-		rc = ibw_send_packet(conn, p->queued_msg, p, msg_size);
-		if (p->queued_ref_cnt) {
-			p->queued_msg += pctx->opts.recv_bufsize;
-		} else {
-			DLIST_REMOVE2(pconn->queue, p, qprev, qnext);
-			p->queued_msg = NULL;
-		}
-	}
-
-	return rc;
+	return ibw_process_queue(conn);
 }
 
 static int ibw_append_to_part(struct ibw_conn_priv *pconn,
@@ -874,8 +933,7 @@
 	struct ibw_ctx_priv *pctx;
 	int	rc;
 
-	DEBUG(10, ("ibw_init(ctx_userdata: %u, ectx: %u)\n",
-		(uint32_t)ctx_userdata, (uint32_t)ectx));
+	DEBUG(10, ("ibw_init(ctx_userdata: %p, ectx: %p)\n", ctx_userdata, ectx));
 
 	/* initialize basic data structures */
 	memset(ibw_lasterr, 0, IBW_LASTERR_BUFSIZE);
@@ -1010,19 +1068,25 @@
 	return 0;
 }
 
-int ibw_connect(struct ibw_ctx *ctx, struct sockaddr_in *serv_addr, void *conn_userdata)
+int ibw_connect(struct ibw_conn *conn, struct sockaddr_in *serv_addr, void *conn_userdata)
 {
-	struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
-	struct ibw_conn *conn = NULL;
+	struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
 	struct ibw_conn_priv *pconn = NULL;
 	int	rc;
 
-	conn = ibw_conn_new(ctx);
+	assert(conn!=NULL);
+
 	conn->conn_userdata = conn_userdata;
 	pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
 	DEBUG(10, ("ibw_connect: addr=%s, port=%u\n", inet_ntoa(serv_addr->sin_addr),
 		ntohs(serv_addr->sin_port)));
 
+	/* clean previous - probably half - initialization */
+	if (ibw_conn_priv_destruct(pconn)) {
+		DEBUG(0, ("ibw_connect/ibw_pconn_destruct failed for cm_id=%p\n", pconn->cm_id));
+		return -1;
+	}
+
 	/* init cm */
 	rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP);
 	if (rc) {
@@ -1053,11 +1117,23 @@
 
 	DEBUG(10, ("ibw_disconnect: cmid=%p\n", pconn->cm_id));
 
-	rc = rdma_disconnect(pconn->cm_id);
-	if (rc) {
-		sprintf(ibw_lasterr, "ibw_disconnect failed with %d\n", rc);
-		DEBUG(0, (ibw_lasterr));
-		return rc;
+	assert(pconn!=NULL);
+
+	switch(conn->state) {
+	case IBWC_ERROR:
+		ibw_conn_priv_destruct(pconn); /* do this here right now */
+		break;
+	case IBWC_CONNECTED:
+		rc = rdma_disconnect(pconn->cm_id);
+		if (rc) {
+			sprintf(ibw_lasterr, "ibw_disconnect failed with %d\n", rc);
+			DEBUG(0, (ibw_lasterr));
+			return rc;
+		}
+		break;
+	default:
+		DEBUG(9, ("invalid state for disconnect: %d\n", conn->state));
+		break;
 	}
 
 	return 0;
@@ -1092,6 +1168,7 @@
 		p = pconn->extra_avail;
 		if (!p) {
 			p = pconn->extra_avail = talloc_zero(pconn, struct ibw_wr);
+			talloc_set_destructor(p, ibw_wr_destruct);
 			if (p==NULL) {
 				sprintf(ibw_lasterr, "talloc_zero failed (emax: %u)\n", pconn->extra_max);
 				goto error;
@@ -1174,6 +1251,8 @@
 
 	DEBUG(10, ("ibw_send#queued(cmid: %p, len: %u)\n", pconn->cm_id, len));
 
+	/* TODO: clarify how to continue when state==IBWC_STOPPED */
+
 	/* to be sent by ibw_wc_send */
 	/* regardless "normal" or [a part of] "large" packet */
 	if (!p->queued_ref_cnt) {

=== modified file 'ib/ibwrapper.h'
--- a/ib/ibwrapper.h	2007-01-03 16:37:47 +0000
+++ b/ib/ibwrapper.h	2007-02-26 10:59:20 +0000
@@ -154,6 +154,14 @@
 int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata);
 
 /*
+ * Create a new connection structure
+ * available for queueing ibw_send
+ *
+ * <mem_ctx> is the talloc parent: freeing it also destroys the connection.
+ */
+struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx, TALLOC_CTX *mem_ctx);
+
+/*
  * Needs a normal internet address here
  * can be called within IBWS_READY|IBWS_CONNECT_REQUEST
  *
@@ -162,7 +170,7 @@
  * You have +1 waiting here: you will get ibw_conn (having the
  * same <conn_userdata> member) structure in ibw_connstate_fn_t.
  */
-int ibw_connect(struct ibw_ctx *ctx, struct sockaddr_in *serv_addr, void *conn_userdata);
+int ibw_connect(struct ibw_conn *conn, struct sockaddr_in *serv_addr, void *conn_userdata);
 
 /*
  * Sends out a disconnect request.

=== modified file 'ib/ibwrapper_test.c'
--- a/ib/ibwrapper_test.c	2007-02-15 16:02:38 +0000
+++ b/ib/ibwrapper_test.c	2007-02-26 10:59:20 +0000
@@ -81,11 +81,13 @@
 
 int ibwtest_connect_everybody(struct ibwtest_ctx *tcx)
 {
-	struct ibwtest_conn	*pconn = talloc_zero(tcx, struct ibwtest_conn);
+	struct ibw_conn		*conn;
+	struct ibwtest_conn	*tconn = talloc_zero(tcx, struct ibwtest_conn);
 	int	i;
 
 	for(i=0; i<tcx->naddrs; i++) {
-		if (ibw_connect(tcx->ibwctx, &tcx->addrs[i], pconn)) {
+		conn = ibw_conn_new(tcx->ibwctx, tconn);
+		if (ibw_connect(conn, &tcx->addrs[i], tconn)) {
 			fprintf(stderr, "ibw_connect error at %d\n", i);
 			return -1;
 		}
@@ -237,7 +239,7 @@
 int ibwtest_connstate_handler(struct ibw_ctx *ctx, struct ibw_conn *conn)
 {
 	struct ibwtest_ctx	*tcx = NULL; /* userdata */
-	struct ibwtest_conn	*pconn = NULL; /* userdata */
+	struct ibwtest_conn	*tconn = NULL; /* userdata */
 
 	if (ctx) {
 		tcx = talloc_get_type(ctx->ctx_userdata, struct ibwtest_ctx);
@@ -251,8 +253,8 @@
 			break;
 		case IBWS_CONNECT_REQUEST:
 			DEBUG(10, ("test IBWS_CONNECT_REQUEST\n"));
-			pconn = talloc_zero(conn, struct ibwtest_conn);
-			if (ibw_accept(ctx, conn, pconn)) {
+			tconn = talloc_zero(conn, struct ibwtest_conn);
+			if (ibw_accept(ctx, conn, tconn)) {
 				DEBUG(0, ("error accepting the connect request\n"));
 			}
 			break;
@@ -271,7 +273,7 @@
 	}
 
 	if (conn) {
-		pconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn);
+		tconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn);
 		switch(conn->state) {
 		case IBWC_INIT:
 			DEBUG(10, ("test IBWC_INIT\n"));
@@ -300,22 +302,22 @@
 
 int ibwtest_receive_handler(struct ibw_conn *conn, void *buf, int n)
 {
-	struct ibwtest_conn *pconn;
+	struct ibwtest_conn *tconn;
 	enum testopcode op;
 	struct ibwtest_ctx *tcx = talloc_get_type(conn->ctx->ctx_userdata, struct ibwtest_ctx);
 	int	rc = 0;
 
 	assert(conn!=NULL);
 	assert(n>=sizeof(uint32_t)+1);
-	pconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn);
+	tconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn);
 
 	op = (enum testopcode)((char *)buf)[sizeof(uint32_t)];
 	if (op==TESTOP_SEND_ID) {
-		pconn->id = talloc_strdup(pconn, ((char *)buf)+sizeof(uint32_t)+1);
+		tconn->id = talloc_strdup(tconn, ((char *)buf)+sizeof(uint32_t)+1);
 	}
 	if (op==TESTOP_SEND_ID || op==TESTOP_SEND_TEXT) {
 		DEBUG(11, ("[%d]msg from %s: \"%s\"(%d)\n", op,
-			pconn->id ? pconn->id : "NULL", ((char *)buf)+sizeof(uint32_t)+1, n));
+			tconn->id ? tconn->id : "NULL", ((char *)buf)+sizeof(uint32_t)+1, n));
 	}
 
 	if (tcx->is_server) {
@@ -327,7 +329,7 @@
 				op,
 				n - sizeof(uint32_t) - 2,
 				(uint32_t)sum,
-				pconn->id ? pconn->id : "NULL"));
+				tconn->id ? tconn->id : "NULL"));
 			if (sum!=((unsigned char *)buf)[n-1]) {
 				DEBUG(0, ("ERROR: checksum mismatch %u!=%u\n",
 					(uint32_t)sum, (uint32_t)((unsigned char *)buf)[n-1]));



More information about the samba-cvs mailing list