Rev 40: Modified send logic to allow large messages, in http://samba.org/~tridge/psomogyi/

psomogyi at gamax.hu psomogyi at gamax.hu
Mon Dec 18 19:52:50 GMT 2006


------------------------------------------------------------
revno: 40
revision-id: psomogyi at gamax.hu-20061218195249-zwcxox0twcwpa2ij
parent: tridge at samba.org-20061218052657-mphl8cgp4g0zoavo
committer: Peter Somogyi <psomogyi at gamax.hu>
branch nick: ctdb
timestamp: Mon 2006-12-18 20:52:49 +0100
message:
  Modified send logic to allow large messages.
  TODO: receiver side (handling of large/fragmented incoming messages).
modified:
  ib/ibwrapper.c                 ibwrapper.c-20061204130028-0125b4f5a72f4b11
  ib/ibwrapper.h                 ibwrapper.h-20061204130028-32755c6266dd3c49
  ib/ibwrapper_internal.h        ibwrapper_internal.h-20061204130028-47f0a7e658b16ca2
  tests/ibwrapper_test.c         ibwrapper_test.c-20061214171730-h11a2z5ed6pt66hj-1
=== modified file 'ib/ibwrapper.c'
--- a/ib/ibwrapper.c	2006-12-14 17:21:39 +0000
+++ b/ib/ibwrapper.c	2006-12-18 19:52:49 +0000
@@ -50,6 +50,37 @@
 	struct fd_event *fde, uint16_t flags, void *private_data);
 static int ibw_fill_cq(struct ibw_conn *conn);
 
+static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn,
+	int n, struct ibv_mr **ppmr)
+{
+	/* page-aligned n-byte buffer registered in pctx->pd; NULL + ibw_lasterr on error */
+	void *buf = (n > 0) ? memalign(pctx->pagesize, n) : NULL; /* n<=0: reject */
+	(void)pconn; /* unused in the body; kept for callers */
+	if (!buf) {
+		sprintf(ibw_lasterr, "couldn't allocate memory\n");
+		return NULL;
+	}
+
+	*ppmr = ibv_reg_mr(pctx->pd, buf, n, IBV_ACCESS_LOCAL_WRITE);
+	if (!*ppmr) {
+		sprintf(ibw_lasterr, "couldn't allocate mr\n");
+		free(buf); /* undo memalign when registration fails */
+		return NULL;
+	}
+	return buf;
+}
+
+static void ibw_free_mr(char **ppbuf, struct ibv_mr **ppmr)
+{
+	/* deregister first: the MR was registered over *ppbuf */
+	if (*ppmr != NULL) {
+		ibv_dereg_mr(*ppmr);
+		*ppmr = NULL;
+	}
+	/* free(NULL) is a no-op, so no NULL guard is needed */
+	free(*ppbuf);
+	*ppbuf = NULL; /* both pointers NULLed: repeated calls are harmless */
+}
 
 static int ibw_init_memory(struct ibw_conn *conn)
 {
@@ -59,23 +90,26 @@
 	int	i;
 	struct ibw_wr	*p;
 
-	pconn->buf = memalign(pctx->pagesize, pctx->max_msg_size);
-	if (!pconn->buf) {
-		sprintf(ibw_lasterr, "couldn't allocate work buf\n");
-		return -1;
-	}
-	pconn->mr = ibv_reg_mr(pctx->pd, pconn->buf,
-		pctx->qsize * pctx->max_msg_size, IBV_ACCESS_LOCAL_WRITE);
-	if (!pconn->mr) {
-		sprintf(ibw_lasterr, "couldn't allocate mr\n");
-		return -1;
-	}
-
-	pconn->wr_index = talloc_size(pconn, pctx->qsize * sizeof(struct ibw_wr *));
-
-	for(i=0; i<pctx->qsize; i++) {
+	pconn->buf_send = ibw_alloc_mr(pctx, pconn,
+		pctx->opts.max_send_wr * pctx->opts.avg_send_size, &pconn->mr_send);
+	if (!pconn->buf_send) {
+		sprintf(ibw_lasterr, "couldn't allocate work send buf\n");
+		return -1;
+	}
+
+	pconn->buf_recv = ibw_alloc_mr(pctx, pconn,
+		pctx->opts.max_recv_wr * pctx->opts.recv_bufsize, &pconn->mr_recv);
+	if (!pconn->buf_recv) {
+		sprintf(ibw_lasterr, "couldn't allocate work recv buf\n");
+		return -1;
+	}
+
+	pconn->wr_index = talloc_size(pconn, pctx->opts.max_send_wr * sizeof(struct ibw_wr *));
+	assert(pconn->wr_index!=NULL);
+
+	for(i=0; i<pctx->opts.max_send_wr; i++) {
 		p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr);
-		p->msg = pconn->buf + (i * pctx->max_msg_size);
+		p->msg = pconn->buf_send + (i * pctx->opts.avg_send_size);
 		p->wr_id = i;
 
 		DLIST_ADD(pconn->wr_list_avail, p);
@@ -117,14 +151,8 @@
 static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn)
 {
 	/* free memory regions */
-	if (pconn->mr) {
-		ibv_dereg_mr(pconn->mr);
-		pconn->mr = NULL;
-	}
-	if (pconn->buf) {
-		free(pconn->buf); /* memalign-ed */
-		pconn->buf = NULL;
-	}
+	ibw_free_mr(&pconn->buf_send, &pconn->mr_send);
+	ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv);
 
 	/* pconn->wr_index is freed by talloc */
 	/* pconn->wr_index[i] are freed by talloc */
@@ -204,7 +232,8 @@
 		pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn);
 
 	/* init cq */
-	pconn->cq = ibv_create_cq(pconn->cm_id->verbs, pctx->qsize,
+	pconn->cq = ibv_create_cq(pconn->cm_id->verbs,
+		pctx->opts.max_recv_wr + pctx->opts.max_send_wr,
 		conn, pconn->verbs_channel, 0);
 	if (pconn->cq==NULL) {
 		sprintf(ibw_lasterr, "ibv_create_cq failed\n");
@@ -244,8 +273,8 @@
 	int	rc;
 	struct ibv_sge list = {
 		.addr 	= (uintptr_t) NULL,
-		.length = pctx->max_msg_size,
-		.lkey 	= pconn->mr->lkey
+		.length = pctx->opts.recv_bufsize,
+		.lkey 	= pconn->mr_recv->lkey
 	};
 	struct ibv_recv_wr wr = {
 		.wr_id 	    = 0,
@@ -253,17 +282,10 @@
 		.num_sge    = 1,
 	};
 	struct ibv_recv_wr *bad_wr;
-	struct ibw_wr	*p = pconn->wr_list_avail;
 
-	if (p==NULL) {
-		sprintf(ibw_lasterr, "out of wr_list_avail");
-		DEBUG(0, (ibw_lasterr));
-		return -1;
-	}
-	DLIST_REMOVE(pconn->wr_list_avail, p);
-	DLIST_ADD(pconn->wr_list_used, p);
-	list.addr = (uintptr_t) p->msg;
-	wr.wr_id = p->wr_id;
+	list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
+	wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
+	pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
 
 	rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
 	if (rc) {
@@ -282,8 +304,8 @@
 	int	i, rc;
 	struct ibv_sge list = {
 		.addr 	= (uintptr_t) NULL,
-		.length = pctx->max_msg_size,
-		.lkey 	= pconn->mr->lkey
+		.length = pctx->opts.recv_bufsize,
+		.lkey 	= pconn->mr_recv->lkey
 	};
 	struct ibv_recv_wr wr = {
 		.wr_id 	    = 0,
@@ -291,19 +313,11 @@
 		.num_sge    = 1,
 	};
 	struct ibv_recv_wr *bad_wr;
-	struct ibw_wr	*p;
 
 	for(i = pctx->opts.max_recv_wr; i!=0; i--) {
-		p = pconn->wr_list_avail;
-		if (p==NULL) {
-			sprintf(ibw_lasterr, "out of wr_list_avail");
-			DEBUG(0, (ibw_lasterr));
-			return -1;
-		}
-		DLIST_REMOVE(pconn->wr_list_avail, p);
-		DLIST_ADD(pconn->wr_list_used, p);
-		list.addr = (uintptr_t) p->msg;
-		wr.wr_id = p->wr_id;
+		list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
+		wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
+		pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
 
 		rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
 		if (rc) {
@@ -508,8 +522,13 @@
 	
 			DEBUG(10, ("send completion\n"));
 			assert(pconn->cm_id->qp->qp_num==wc.qp_num);
-			assert(wc.wr_id < pctx->qsize);
+			assert(wc.wr_id < pctx->opts.max_send_wr);
+
 			p = pconn->wr_index[wc.wr_id];
+			if (p->msg_large) {
+				ibw_free_mr(&p->msg_large, &p->mr_large);
+			}
+
 			DLIST_REMOVE(pconn->wr_list_used, p);
 			DLIST_ADD(pconn->wr_list_avail, p);
 		}
@@ -525,19 +544,19 @@
 
 	case IBV_WC_RECV:
 		{
-			struct ibw_wr	*p;
-	
-			assert(pconn->cm_id->qp->qp_num==wc.qp_num);
-			assert(wc.wr_id < pctx->qsize);
-			p = pconn->wr_index[wc.wr_id];
-	
-			DLIST_REMOVE(pconn->wr_list_used, p);
-			DLIST_ADD(pconn->wr_list_avail, p);
-	
+			int	recv_index;
+
 			DEBUG(10, ("recv completion\n"));
-			assert(wc.byte_len <= pctx->max_msg_size);
-	
-			pctx->receive_func(conn, p->msg, wc.byte_len);
+			assert(pconn->cm_id->qp->qp_num==wc.qp_num);
+			assert((int)wc.wr_id > pctx->opts.max_send_wr);
+			recv_index = (int)wc.wr_id - pctx->opts.max_send_wr;
+			assert(recv_index < pctx->opts.max_recv_wr);
+			assert(wc.byte_len <= pctx->opts.recv_bufsize);
+
+/* TODO: take care of fragmented messages !!! */
+			pctx->receive_func(conn,
+				pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize),
+				wc.byte_len);
 			if (ibw_refill_cq_recv(conn))
 				goto error;
 		}
@@ -562,6 +581,8 @@
 
 	opts->max_send_wr = 256;
 	opts->max_recv_wr = 1024;
+	opts->avg_send_size = 1024;
+	opts->recv_bufsize = 256;
 
 	for(i=0; i<nattr; i++) {
 		name = attr[i].name;
@@ -572,6 +593,10 @@
 			opts->max_send_wr = atoi(value);
 		else if (strcmp(name, "max_recv_wr")==0)
 			opts->max_recv_wr = atoi(value);
+		else if (strcmp(name, "avg_send_size")==0)
+			opts->avg_send_size = atoi(value);
+		else if (strcmp(name, "recv_bufsize")==0)
+			opts->recv_bufsize = atoi(value);
 		else {
 			sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);
 			return -1;
@@ -584,8 +609,7 @@
 	void *ctx_userdata,
 	ibw_connstate_fn_t ibw_connstate,
 	ibw_receive_fn_t ibw_receive,
-	struct event_context *ectx,
-	int max_msg_size)
+	struct event_context *ectx)
 {
 	struct ibw_ctx *ctx = talloc_zero(NULL, struct ibw_ctx);
 	struct ibw_ctx_priv *pctx;
@@ -640,8 +664,6 @@
 	DEBUG(10, ("created pd %p\n", pctx->pd));
 
 	pctx->pagesize = sysconf(_SC_PAGESIZE);
-	pctx->qsize = pctx->opts.max_send_wr + pctx->opts.max_recv_wr;
-	pctx->max_msg_size = max_msg_size;
 
 	return ctx;
 	/* don't put code here */
@@ -772,8 +794,9 @@
 	return 0;
 }
 
-int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key)
+int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, int n)
 {
+	struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
 	struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
 	struct ibw_wr *p = pconn->wr_list_avail;
 
@@ -785,8 +808,18 @@
 	DLIST_REMOVE(pconn->wr_list_avail, p);
 	DLIST_ADD(pconn->wr_list_used, p);
 
-	*buf = (void *)p->msg;
-	*key = (void *)p;
+	if (n + sizeof(long) <= pctx->opts.avg_send_size) {
+		*buf = (void *)(p->msg + sizeof(long));
+		*key = (void *)p;
+	} else {
+		p->msg_large = ibw_alloc_mr(pctx, pconn, n + sizeof(long), &p->mr_large);
+		if (!p->msg_large) {
+			sprintf(ibw_lasterr, "ibw_alloc_send_buf alloc error\n");
+			DEBUG(0, (ibw_lasterr));
+			return -1;
+		}
+		*buf = (void *)(p->msg_large + sizeof(long));
+	}
 
 	return 0;
 }
@@ -797,9 +830,9 @@
 	struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
 	struct ibw_wr *p = talloc_get_type(key, struct ibw_wr);
 	struct ibv_sge list = {
-		.addr 	= (uintptr_t) p->msg,
+		.addr 	= (uintptr_t) NULL,
 		.length = n,
-		.lkey 	= pconn->mr->lkey
+		.lkey 	= 0
 	};
 	struct ibv_send_wr wr = {
 		.wr_id 	    = p->wr_id,
@@ -810,8 +843,20 @@
 	};
 	struct ibv_send_wr *bad_wr;
 
-	assert(p->msg==(char *)buf);
-	assert(n<=pctx->max_msg_size);
+	if (n + sizeof(long)<=pctx->opts.avg_send_size) {
+		assert((p->msg + sizeof(long))==(char *)buf);
+		list.lkey = pconn->mr_send->lkey;
+		list.addr = (uintptr_t) p->msg;
+
+		*((uint32_t *)p->msg) = htonl(n);
+	} else {
+		assert((p->msg_large + sizeof(long))==(char *)buf);
+		assert(p->mr_large!=NULL);
+		list.lkey = p->mr_large->lkey;
+		list.addr = (uintptr_t) p->msg_large;
+
+		*((uint32_t *)p->msg_large) = htonl(n);
+	}
 
 	return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);
 }

=== modified file 'ib/ibwrapper.h'
--- a/ib/ibwrapper.h	2006-12-13 14:00:41 +0000
+++ b/ib/ibwrapper.h	2006-12-18 19:52:49 +0000
@@ -107,8 +107,7 @@
 	void *ctx_userdata,
 	ibw_connstate_fn_t ibw_connstate,
 	ibw_receive_fn_t ibw_receive,
-	struct event_context *ectx,
-	int max_msg_size);
+	struct event_context *ectx);
 
 /*
  * Must be called in states of (IBWS_ERROR, IBWS_READY, IBWS_CONNECT_REQUEST)
@@ -186,7 +185,7 @@
  *
  * Returns 0 on success.
  */
-int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key);
+int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, int n);
 
 /*
  * Send the message in one

=== modified file 'ib/ibwrapper_internal.h'
--- a/ib/ibwrapper_internal.h	2006-12-13 14:00:41 +0000
+++ b/ib/ibwrapper_internal.h	2006-12-18 19:52:49 +0000
@@ -24,11 +24,17 @@
 struct ibw_opts {
 	int	max_send_wr;
 	int	max_recv_wr;
+	int	avg_send_size; /* bytes per preallocated send slot; larger sends get their own MR */
+	int	recv_bufsize; /* bytes per receive buffer in the max_recv_wr-entry recv ring */
 };
 
 struct ibw_wr {
 	char	*msg; /* initialized in ibw_init_memory once per connection */
 	int	wr_id; /* position in wr_index list; also used as wr id */
+
+	char	*msg_large; /* own buffer for a send that doesn't fit the avg_send_size slot at msg; freed on send completion */
+	struct ibv_mr *mr_large; /* registration of msg_large; NULL when no large buffer is held */
+
 	struct ibw_wr *next, *prev; /* in wr_list_avail or wr_list_used */
 };
 
@@ -48,8 +54,6 @@
 	ibw_receive_fn_t receive_func; /* see ibw_init */
 
 	long	pagesize; /* sysconf result for memalign */
-	int	qsize; /* opts.max_send_wr + opts.max_recv_wr */
-	int	max_msg_size; /* see ibw_init */
 };
 
 struct ibw_conn_priv {
@@ -60,10 +64,16 @@
 	int	is_accepted;
 
 	struct ibv_cq	*cq; /* qp is in cm_id */
-	struct ibv_mr *mr;
-	char *buf; /* fixed size (qsize * opts.max_msg_size) buffer for send/recv */
+
+	char *buf_send; /* max_send_wr * avg_send_size */
+	struct ibv_mr *mr_send;
 	struct ibw_wr *wr_list_avail;
 	struct ibw_wr *wr_list_used;
 	struct ibw_wr **wr_index; /* array[0..(qsize-1)] of (ibw_wr *) */
+
+	/* buf_recv is a ring buffer */
+	char *buf_recv; /* max_recv_wr * avg_recv_size */
+	struct ibv_mr *mr_recv;
+	int recv_index; /* index of the next recv buffer */
 };
 

=== modified file 'tests/ibwrapper_test.c'
--- a/tests/ibwrapper_test.c	2006-12-14 17:21:39 +0000
+++ b/tests/ibwrapper_test.c	2006-12-18 19:52:49 +0000
@@ -50,7 +50,6 @@
 	struct sockaddr_in *addrs; /* dynamic array of dest addrs */
 	int	naddrs;
 
-	int	max_msg_size;
 	unsigned int	nsec; /* nanosleep between messages */
 
 	int	cnt;
@@ -91,15 +90,15 @@
 	struct ibwtest_ctx *tcx = talloc_get_type(conn->ctx->ctx_userdata, struct ibwtest_ctx);
 
 	DEBUG(10, ("test IBWC_CONNECTED\n"));
-	if (ibw_alloc_send_buf(conn, (void **)&buf, &key)) {
+	if (ibw_alloc_send_buf(conn, (void **)&buf, &key, strlen(tcx->id)+2)) {
 		DEBUG(0, ("send_id: ibw_alloc_send_buf failed\n"));
 		return -1;
 	}
-	
+
 	buf[0] = (char)TESTOP_SEND_ID;
 	strcpy(buf+1, tcx->id);
 
-	if (ibw_send(conn, buf, key, strlen(buf+1))) {
+	if (ibw_send(conn, buf, key, strlen(buf+1)+2)) {
 		DEBUG(0, ("send_id: ibw_send error\n"));
 		return -1;
 	}
@@ -111,16 +110,15 @@
 	char *buf;
 	void *key;
 
-	if (ibw_alloc_send_buf(conn, (void **)&buf, &key)) {
+	if (ibw_alloc_send_buf(conn, (void **)&buf, &key, strlen(msg)+2)) {
 		fprintf(stderr, "send_test_msg: ibw_alloc_send_buf failed\n");
 		return -1;
 	}
 
 	buf[0] = (char)TESTOP_SEND_DATA;
-	assert(strlen(msg)<tcx->max_msg_size-1);
 	strcpy(buf+1, msg);
 	
-	if (ibw_send(conn, buf, key, strlen(buf+1))) {
+	if (ibw_send(conn, buf, key, strlen(buf+1)+2)) {
 		DEBUG(0, ("send_test_msg: ibw_send error\n"));
 		return -1;
 	}
@@ -205,7 +203,7 @@
 		char *buf2;
 		void *key2;
 		/* bounce message */
-		if (ibw_alloc_send_buf(conn, (void **)&buf2, &key2)) {
+		if (ibw_alloc_send_buf(conn, (void **)&buf2, &key2, n)) {
 			fprintf(stderr, "ibw_alloc_send_buf error #2\n");
 			return -1;
 		}
@@ -331,11 +329,10 @@
 void ibwtest_usage(struct ibwtest_ctx *tcx, char *name)
 {
 	printf("Usage:\n");
-	printf("\t%s -i <id> -o {name:value} -d {addr:port} -m max_msg_size -t nsec -s\n", name);
+	printf("\t%s -i <id> -o {name:value} -d {addr:port} -t nsec -s\n", name);
 	printf("\t-i <id> is a free text, acting as a server id, max 23 chars [mandatory]\n");
 	printf("\t-o name1:value1,name2:value2,... is a list of (name, value) pairs\n");
 	printf("\t-d addr1:port1,addr2:port2,... is a list of destination ip addresses\n");
-	printf("\t-m max_msg_size maximum message size [default %d]\n", tcx->max_msg_size);
 	printf("\t-t nsec delta time between sends in nanosec [default %d]\n", tcx->nsec);
 	printf("\t-s server mode (you have to give exactly one -d address:port in this case)\n");
 	printf("Press ctrl+C to stop the program.\n");
@@ -350,7 +347,6 @@
 
 	tcx = talloc_zero(NULL, struct ibwtest_ctx);
 	memset(tcx, 0, sizeof(struct ibwtest_ctx));
-	tcx->max_msg_size = 256;
 	tcx->nsec = 1000;
 
 	/* here is the only case we can't avoid using global... */
@@ -372,9 +368,6 @@
 			if (ibwtest_getdests(tcx, op))
 				goto cleanup;
 			break;
-		case 'm':
-			tcx->max_msg_size = atoi(optarg);
-			break;
 		case 's':
 			tcx->is_server = 1;
 			break;
@@ -396,8 +389,7 @@
 		tcx,
 		ibwtest_connstate_handler,
 		ibwtest_receive_handler,
-		ev,
-		tcx->max_msg_size
+		ev
 	);
 	if (!tcx->ibwctx)
 		goto cleanup;



More information about the samba-cvs mailing list