Rev 52: 1st "working" ib version. in http://samba.org/~tridge/psomogyi/

psomogyi at gamax.hu psomogyi at gamax.hu
Thu Jan 25 10:02:00 GMT 2007


------------------------------------------------------------
revno: 52
revision-id: psomogyi at gamax.hu-20070125100159-73mdd3kowsvi1yd5
parent: psomogyi at gamax.hu-20070105171335-smiknw06nd9o7cnw
committer: Peter Somogyi <psomogyi at gamax.hu>
branch nick: ctdb
timestamp: Thu 2007-01-25 11:01:59 +0100
message:
  1st "working" ib version.
  TODO: stress testing, variable-size messages, flooding
modified:
  ib/ibwrapper.c                 ibwrapper.c-20061204130028-0125b4f5a72f4b11
  ib/ibwrapper_internal.h        ibwrapper_internal.h-20061204130028-47f0a7e658b16ca2
  tests/ibwrapper_test.c         ibwrapper_test.c-20061214171730-h11a2z5ed6pt66hj-1
=== modified file 'ib/ibwrapper.c'
--- a/ib/ibwrapper.c	2007-01-04 15:44:41 +0000
+++ b/ib/ibwrapper.c	2007-01-25 10:01:59 +0000
@@ -57,14 +57,14 @@
 {
 	void *buf;
 
-	DEBUG(10, ("ibw_alloc_mr(cmid=%u, n=%u)\n", (uint32_t)pconn->cm_id, n));
+	DEBUG(10, ("ibw_alloc_mr(cmid=%p, n=%u)\n", pconn->cm_id, n));
 	buf = memalign(pctx->pagesize, n);
 	if (!buf) {
 		sprintf(ibw_lasterr, "couldn't allocate memory\n");
 		return NULL;
 	}
 
-	*ppmr = ibv_reg_mr(pctx->pd, buf, n, IBV_ACCESS_LOCAL_WRITE);
+	*ppmr = ibv_reg_mr(pconn->pd, buf, n, IBV_ACCESS_LOCAL_WRITE);
 	if (!*ppmr) {
 		sprintf(ibw_lasterr, "couldn't allocate mr\n");
 		free(buf);
@@ -95,7 +95,7 @@
 	int	i;
 	struct ibw_wr	*p;
 
-	DEBUG(10, ("ibw_init_memory(cmid: %u)\n", (uint32_t)pconn->cm_id));
+	DEBUG(10, ("ibw_init_memory(cmid: %p)\n", pconn->cm_id));
 	pconn->buf_send = ibw_alloc_mr(pctx, pconn,
 		opts->max_send_wr * opts->avg_send_size, &pconn->mr_send);
 	if (!pconn->buf_send) {
@@ -116,7 +116,7 @@
 	for(i=0; i<opts->max_send_wr; i++) {
 		p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr);
 		p->msg = pconn->buf_send + (i * opts->avg_send_size);
-		p->wr_id = i + opts->max_recv_wr;
+		p->wr_id = i;
 
 		DLIST_ADD(pconn->wr_list_avail, p);
 	}
@@ -128,11 +128,6 @@
 {
 	DEBUG(10, ("ibw_ctx_priv_destruct(%u)\n", (uint32_t)pctx));
 
-	if (pctx->pd) {
-		ibv_dealloc_pd(pctx->pd);
-		pctx->pd = NULL;
-	}
-
 	/* destroy cm */
 	if (pctx->cm_channel) {
 		rdma_destroy_event_channel(pctx->cm_channel);
@@ -159,8 +154,8 @@
 
 static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn)
 {
-	DEBUG(10, ("ibw_conn_priv_destruct(%u, cmid: %u)\n",
-		(uint32_t)pconn, (uint32_t)pconn->cm_id));
+	DEBUG(10, ("ibw_conn_priv_destruct(%u, cmid: %p)\n",
+		(uint32_t)pconn, pconn->cm_id));
 
 	/* free memory regions */
 	ibw_free_mr(&pconn->buf_send, &pconn->mr_send);
@@ -187,6 +182,10 @@
 		talloc_free(pconn->verbs_channel_event);
 		pconn->verbs_channel_event = NULL;
 	}
+	if (pconn->pd) {
+		ibv_dealloc_pd(pconn->pd);
+		pconn->pd = NULL;
+	}
 	if (pconn->cm_id) {
 		rdma_destroy_id(pconn->cm_id);
 		pconn->cm_id = NULL;
@@ -217,6 +216,7 @@
 	talloc_set_destructor(pconn, ibw_conn_priv_destruct);
 
 	conn->ctx = ctx;
+	conn->internal = (void *)pconn;
 
 	DLIST_ADD(ctx->conn_list, conn);
 
@@ -230,11 +230,7 @@
 	struct ibv_qp_init_attr init_attr;
 	int rc;
 
-	DEBUG(10, ("ibw_setup_cq_qp(cmid: %u)\n", (uint32_t)pconn->cm_id));
-
-	/* init mr */
-	if (ibw_init_memory(conn))
-		return -1;
+	DEBUG(10, ("ibw_setup_cq_qp(cmid: %p)\n", pconn->cm_id));
 
 	/* init verbs */
 	pconn->verbs_channel = ibv_create_comp_channel(pconn->cm_id->verbs);
@@ -247,6 +243,17 @@
 	pconn->verbs_channel_event = event_add_fd(pctx->ectx, conn,
 		pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn);
 
+	pconn->pd = ibv_alloc_pd(pconn->cm_id->verbs);
+	if (!pconn->pd) {
+		sprintf(ibw_lasterr, "ibv_alloc_pd failed %d\n", errno);
+		return -1;
+	}
+	DEBUG(10, ("created pd %p\n", pconn->pd));
+
+	/* init mr */
+	if (ibw_init_memory(conn))
+		return -1;
+
 	/* init cq */
 	pconn->cq = ibv_create_cq(pconn->cm_id->verbs,
 		pctx->opts.max_recv_wr + pctx->opts.max_send_wr,
@@ -272,7 +279,7 @@
 	init_attr.send_cq = pconn->cq;
 	init_attr.recv_cq = pconn->cq;
 
-	rc = rdma_create_qp(pconn->cm_id, pctx->pd, &init_attr);
+	rc = rdma_create_qp(pconn->cm_id, pconn->pd, &init_attr);
 	if (rc) {
 		sprintf(ibw_lasterr, "rdma_create_qp failed with %d\n", rc);
 		return rc;
@@ -299,7 +306,7 @@
 	};
 	struct ibv_recv_wr *bad_wr;
 
-	DEBUG(10, ("ibw_refill_cq_recv(cmid: %u)\n", (uint32_t)pconn->cm_id));
+	DEBUG(10, ("ibw_refill_cq_recv(cmid: %p)\n", pconn->cm_id));
 
 	list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
 	wr.wr_id = pconn->recv_index;
@@ -332,7 +339,7 @@
 	};
 	struct ibv_recv_wr *bad_wr;
 
-	DEBUG(10, ("ibw_fill_cq(cmid: %u)\n", (uint32_t)pconn->cm_id));
+	DEBUG(10, ("ibw_fill_cq(cmid: %p)\n", pconn->cm_id));
 
 	for(i = pctx->opts.max_recv_wr; i!=0; i--) {
 		list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
@@ -355,7 +362,7 @@
 	struct rdma_conn_param conn_param;
 	int	rc;
 
-	DEBUG(10, ("ibw_manage_connect(cmid: %u)", (uint32_t)cma_id));
+	DEBUG(10, ("ibw_manage_connect(cmid: %p)\n", cma_id));
 	rc = ibw_setup_cq_qp(conn);
 	if (rc)
 		return -1;
@@ -427,6 +434,9 @@
 		cma_id->context = (void *)conn;
 		DEBUG(10, ("pconn->cm_id %p\n", pconn->cm_id));
 
+		if (ibw_setup_cq_qp(conn))
+			goto error;
+
 		conn->state = IBWC_INIT;
 		pctx->connstate_func(ctx, conn);
 
@@ -434,9 +444,6 @@
 		if (!pconn->is_accepted) {
 			talloc_free(conn);
 			DEBUG(10, ("pconn->cm_id %p wasn't accepted\n", pconn->cm_id));
-		} else {
-			if (ibw_setup_cq_qp(conn))
-				goto error;
 		}
 
 		/* TODO: clarify whether if it's needed by upper layer: */
@@ -598,23 +605,23 @@
 	struct ibw_wr	*p;
 	int	send_index;
 
-	DEBUG(10, ("ibw_wc_send(cmid: %u, wr_id: %u, bl: %u)\n",
-		(uint32_t)pconn->cm_id, (uint32_t)wc->wr_id, (uint32_t)wc->byte_len));
+	DEBUG(10, ("ibw_wc_send(cmid: %p, wr_id: %u, bl: %u)\n",
+		pconn->cm_id, (uint32_t)wc->wr_id, (uint32_t)wc->byte_len));
 
 	assert(pconn->cm_id->qp->qp_num==wc->qp_num);
-	assert(wc->wr_id > pctx->opts.max_recv_wr);
+	assert(wc->wr_id >= pctx->opts.max_recv_wr);
 	send_index = wc->wr_id - pctx->opts.max_recv_wr;
 	pconn->wr_sent--;
 
 	if (send_index < pctx->opts.max_send_wr) {
-		DEBUG(10, ("ibw_wc_send#1 %u", (int)wc->wr_id));
+		DEBUG(10, ("ibw_wc_send#1 %u\n", (int)wc->wr_id));
 		p = pconn->wr_index[send_index];
 		if (p->msg_large)
 			ibw_free_mr(&p->msg_large, &p->mr_large);
 		DLIST_REMOVE(pconn->wr_list_used, p);
 		DLIST_ADD(pconn->wr_list_avail, p);
 	} else { /* "extra" request - not optimized */
-		DEBUG(10, ("ibw_wc_send#2 %u", (int)wc->wr_id));
+		DEBUG(10, ("ibw_wc_send#2 %u\n", (int)wc->wr_id));
 		for(p=pconn->extra_sent; p!=NULL; p=p->next)
 			if (p->wr_id==(int)wc->wr_id)
 				break;
@@ -643,8 +650,8 @@
 static inline int ibw_append_to_part(struct ibw_conn_priv *pconn,
 	struct ibw_part *part, char **pp, uint32_t add_len, int info)
 {
-	DEBUG(10, ("ibw_append_to_part: cmid=%u, (bs=%u, len=%u, tr=%u), al=%u, i=%u\n",
-		(uint32_t)pconn->cm_id, part->bufsize, part->len, part->to_read, add_len, info));
+	DEBUG(10, ("ibw_append_to_part: cmid=%p, (bs=%u, len=%u, tr=%u), al=%u, i=%u\n",
+		pconn->cm_id, part->bufsize, part->len, part->to_read, add_len, info));
 
 	/* allocate more if necessary - it's an "evergrowing" buffer... */
 	if (part->len + add_len > part->bufsize) {
@@ -681,12 +688,12 @@
 static inline int ibw_wc_mem_threshold(struct ibw_conn_priv *pconn,
 	struct ibw_part *part, uint32_t threshold)
 {
-	DEBUG(10, ("ibw_wc_mem_threshold: cmid=%u, (bs=%u, len=%u, tr=%u), thr=%u\n",
-		(uint32_t)pconn->cm_id, part->bufsize, part->len, part->to_read, threshold));
+	DEBUG(10, ("ibw_wc_mem_threshold: cmid=%p, (bs=%u, len=%u, tr=%u), thr=%u\n",
+		pconn->cm_id, part->bufsize, part->len, part->to_read, threshold));
 
 	if (part->bufsize > threshold) {
-		DEBUG(3, ("ibw_wc_mem_threshold: cmid=%u, %u > %u\n",
-			(uint32_t)pconn->cm_id, part->bufsize, threshold));
+		DEBUG(3, ("ibw_wc_mem_threshold: cmid=%p, %u > %u\n",
+			pconn->cm_id, part->bufsize, threshold));
 		talloc_free(part->buf);
 		part->buf = talloc_size(pconn, threshold);
 		if (part->buf==NULL) {
@@ -706,8 +713,8 @@
 	char	*p;
 	uint32_t	remain = wc->byte_len;
 
-	DEBUG(10, ("ibw_wc_recv: cmid=%u, wr_id: %u, bl: %u\n",
-		(uint32_t)pconn->cm_id, (uint32_t)wc->wr_id, remain));
+	DEBUG(10, ("ibw_wc_recv: cmid=%p, wr_id: %u, bl: %u\n",
+		pconn->cm_id, (uint32_t)wc->wr_id, remain));
 
 	assert(pconn->cm_id->qp->qp_num==wc->qp_num);
 	assert((int)wc->wr_id < pctx->opts.max_recv_wr);
@@ -872,14 +879,6 @@
 	}
 	DEBUG(10, ("created cm_id %p\n", pctx->cm_id));
 
-	/* init verbs */
-	pctx->pd = ibv_alloc_pd(pctx->cm_id->verbs);
-	if (!pctx->pd) {
-		sprintf(ibw_lasterr, "ibv_alloc_pd failed %d\n", errno);
-		goto cleanup;
-	}
-	DEBUG(10, ("created pd %p\n", pctx->pd));
-
 	pctx->pagesize = sysconf(_SC_PAGESIZE);
 
 	return ctx;
@@ -937,7 +936,7 @@
 		sprintf(ibw_lasterr, "rdma_listen failed: %d\n", rc);
 		DEBUG(0, (ibw_lasterr));
 		return rc;
-	}	
+	}
 
 	return 0;
 }
@@ -948,7 +947,7 @@
 	struct rdma_conn_param	conn_param;
 	int	rc;
 
-	DEBUG(10, ("ibw_accept: cmid=%u\n", (uint32_t)pconn->cm_id));
+	DEBUG(10, ("ibw_accept: cmid=%p\n", pconn->cm_id));
 	conn->conn_userdata = conn_userdata;
 
 	memset(&conn_param, 0, sizeof(struct rdma_conn_param));
@@ -975,20 +974,21 @@
 	struct ibw_conn_priv *pconn = NULL;
 	int	rc;
 
-	DEBUG(10, ("ibw_connect: cmid=%u, addr=%s, port=%u\n", (uint32_t)pconn->cm_id,
-		inet_ntoa(serv_addr->sin_addr), serv_addr->sin_port));
 	conn = ibw_conn_new(ctx);
 	conn->conn_userdata = conn_userdata;
 	pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
+	DEBUG(10, ("ibw_connect: addr=%s, port=%u\n", inet_ntoa(serv_addr->sin_addr), serv_addr->sin_port));
 
+	/* init cm */
 	rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP);
 	if (rc) {
 		rc = errno;
-		sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc);
+		sprintf(ibw_lasterr, "ibw_connect/rdma_create_id error %d\n", rc);
 		return rc;
 	}
+	DEBUG(10, ("ibw_connect: rdma_create_id succeeded, cm_id=%p\n", pconn->cm_id));
 
-	rc = rdma_resolve_addr(pconn->cm_id, NULL, (struct sockaddr *) &serv_addr, 2000);
+	rc = rdma_resolve_addr(pconn->cm_id, NULL, (struct sockaddr *) serv_addr, 2000);
 	if (rc) {
 		sprintf(ibw_lasterr, "rdma_resolve_addr error %d\n", rc);
 		DEBUG(0, (ibw_lasterr));
@@ -1006,7 +1006,7 @@
 	struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
 	struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
 
-	DEBUG(10, ("ibw_disconnect: cmid=%u\n", (uint32_t)pconn->cm_id));
+	DEBUG(10, ("ibw_disconnect: cmid=%p\n", pconn->cm_id));
 
 	rc = rdma_disconnect(pctx->cm_id);
 	if (rc) {
@@ -1027,7 +1027,7 @@
 	struct ibw_wr *p = pconn->wr_list_avail;
 
 	if (p!=NULL) {
-		DEBUG(10, ("ibw_alloc_send_buf#1: cmid=%u, len=%d\n", (uint32_t)pconn->cm_id, len));
+		DEBUG(10, ("ibw_alloc_send_buf#1: cmid=%p, len=%d\n", pconn->cm_id, len));
 
 		DLIST_REMOVE(pconn->wr_list_avail, p);
 		DLIST_ADD(pconn->wr_list_used, p);
@@ -1043,7 +1043,7 @@
 			*buf = (void *)p->msg_large;
 		}
 	} else {
-		DEBUG(10, ("ibw_alloc_send_buf#2: cmid=%u, len=%d\n", (uint32_t)pconn->cm_id, len));
+		DEBUG(10, ("ibw_alloc_send_buf#2: cmid=%p, len=%d\n", pconn->cm_id, len));
 		/* not optimized */
 		p = pconn->extra_avail;
 		if (!p) {
@@ -1106,8 +1106,8 @@
 		};
 		struct ibv_send_wr *bad_wr;
 
-		DEBUG(10, ("ibw_wc_send#1(cmid: %u, wrid: %u, n: %d)\n",
-			(uint32_t)pconn->cm_id, (uint32_t)wr.wr_id, len));
+		DEBUG(10, ("ibw_send#1(cmid: %p, wrid: %u, n: %d)\n",
+			pconn->cm_id, (uint32_t)wr.wr_id, len));
 
 		list.addr = (uintptr_t)buf;
 		if (p->msg_large==NULL) {
@@ -1134,7 +1134,7 @@
 		return rc;
 	} /* else put the request into our own queue: */
 
-	DEBUG(10, ("ibw_wc_send#2(cmid: %u, len: %u)\n", (uint32_t)pconn->cm_id, len));
+	DEBUG(10, ("ibw_send#2(cmid: %p, len: %u)\n", pconn->cm_id, len));
 
 	/* to be sent by ibw_wc_send */
 	DLIST_ADD_END(pconn->queue, p, struct ibw_wr *); /* TODO: optimize */

=== modified file 'ib/ibwrapper_internal.h'
--- a/ib/ibwrapper_internal.h	2007-01-04 15:44:41 +0000
+++ b/ib/ibwrapper_internal.h	2007-01-25 10:01:59 +0000
@@ -51,8 +51,6 @@
 	struct rdma_event_channel *cm_channel;
 	struct fd_event *cm_channel_event;
 
-	struct ibv_pd	       *pd;
-
 	ibw_connstate_fn_t connstate_func; /* see ibw_init */
 	ibw_receive_fn_t receive_func; /* see ibw_init */
 
@@ -71,6 +69,7 @@
 	struct fd_event *verbs_channel_event;
 
 	struct rdma_cm_id *cm_id; /* client's cm id */
+	struct ibv_pd	*pd;
 	int	is_accepted;
 
 	struct ibv_cq	*cq; /* qp is in cm_id */

=== modified file 'tests/ibwrapper_test.c'
--- a/tests/ibwrapper_test.c	2006-12-21 16:41:48 +0000
+++ b/tests/ibwrapper_test.c	2007-01-25 10:01:59 +0000
@@ -88,17 +88,20 @@
 	char *buf;
 	void *key;
 	struct ibwtest_ctx *tcx = talloc_get_type(conn->ctx->ctx_userdata, struct ibwtest_ctx);
+	uint32_t	len;
 
 	DEBUG(10, ("test IBWC_CONNECTED\n"));
-	if (ibw_alloc_send_buf(conn, (void **)&buf, &key, strlen(tcx->id)+2)) {
+	len = sizeof(uint32_t)+strlen(tcx->id)+2;
+	if (ibw_alloc_send_buf(conn, (void **)&buf, &key, len)) {
 		DEBUG(0, ("send_id: ibw_alloc_send_buf failed\n"));
 		return -1;
 	}
 
-	buf[0] = (char)TESTOP_SEND_ID;
-	strcpy(buf+1, tcx->id);
+	/* first sizeof(uint32_t) size bytes are for length */
+	buf[sizeof(uint32_t)] = (char)TESTOP_SEND_ID;
+	strcpy(buf+sizeof(uint32_t)+1, tcx->id);
 
-	if (ibw_send(conn, buf, key, strlen(buf+1)+2)) {
+	if (ibw_send(conn, buf, key, len)) {
 		DEBUG(0, ("send_id: ibw_send error\n"));
 		return -1;
 	}
@@ -111,12 +114,16 @@
 	void *key;
 	uint32_t len;
 
+	if (conn->state!=IBWC_CONNECTED)
+		return 0; /* not yet up */
+
 	len = strlen(msg)+2 + sizeof(uint32_t);
 	if (ibw_alloc_send_buf(conn, (void **)&buf, &key, len)) {
 		fprintf(stderr, "send_test_msg: ibw_alloc_send_buf failed\n");
 		return -1;
 	}
 
+	p = buf;
 	p += sizeof(uint32_t);
 	p[0] = (char)TESTOP_SEND_DATA;
 	p++;
@@ -197,11 +204,15 @@
 	struct ibwtest_ctx *tcx = talloc_get_type(conn->ctx->ctx_userdata, struct ibwtest_ctx);
 
 	assert(conn!=NULL);
+	assert(n>=sizeof(uint32_t)+2);
 	pconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn);
 
-	op = (enum testopcode)((char *)buf)[0];
+	op = (enum testopcode)((char *)buf)[sizeof(uint32_t)];
+	if (op==TESTOP_SEND_ID) {
+		pconn->id = talloc_strdup(pconn, ((char *)buf)+sizeof(uint32_t)+1);
+	}
 	DEBUG(11, ("[%d]msg from %s: \"%s\"(%d)\n", op,
-		pconn->id ? pconn->id : NULL, ((char *)buf)+1, n));
+		pconn->id ? pconn->id : "NULL", ((char *)buf)+sizeof(uint32_t)+1, n));
 
 	if (tcx->is_server) {
 		char *buf2;
@@ -278,9 +289,10 @@
 
 			porcess_next = 0;
 			i++;
+			p = q; /* ++ at end */
 		}
 		if (*p==',') {
-			*p = '\0';
+			*p = '\0'; /* ++ at end */
 			porcess_next = 1;
 		}
 	}
@@ -306,9 +318,9 @@
 		tcx->naddrs * sizeof(struct sockaddr_in));
 	for(i=0; i<tcx->naddrs; i++) {
 		p = tcx->addrs + i;
+		p->sin_family = AF_INET;
 		p->sin_addr.s_addr = inet_addr(attrs[i].name);
 		p->sin_port = atoi(attrs[i].value);
-		p->sin_family = AF_INET;
 	}
 
 	return 0;
@@ -317,7 +329,7 @@
 int ibwtest_init_server(struct ibwtest_ctx *tcx)
 {
 	if (tcx->naddrs!=1) {
-		fprintf(stderr, "incorrecr number of addrs(%d!=1)\n", tcx->naddrs);
+		fprintf(stderr, "incorrect number of addrs(%d!=1)\n", tcx->naddrs);
 		return -1;
 	}
 
@@ -325,6 +337,11 @@
 		DEBUG(0, ("ERROR: ibw_bind failed\n"));
 		return -1;
 	}
+	
+	if (ibw_listen(tcx->ibwctx, 1)) {
+		DEBUG(0, ("ERROR: ibw_listen failed\n"));
+		return -1;
+	}
 
 	/* continued at IBWS_READY */
 	return 0;
@@ -357,7 +374,7 @@
 	testctx = tcx;
 	signal(SIGQUIT, ibwtest_sigquit_handler);
 
-	while ((op=getopt(argc, argv, "i:o:d:m:s")) != -1) {
+	while ((op=getopt(argc, argv, "i:o:d:m:st:")) != -1) {
 		switch (op) {
 		case 'i':
 			tcx->id = talloc_strdup(tcx, optarg);
@@ -375,6 +392,9 @@
 		case 's':
 			tcx->is_server = 1;
 			break;
+		case 't':
+			tcx->nsec = (unsigned int)atoi(optarg);
+			break;
 		default:
 			fprintf(stderr, "ERROR: unknown option -%c\n", (char)op);
 			ibwtest_usage(tcx, argv[0]);



More information about the samba-cvs mailing list