Rev 36: Rough implementation of buffer handling. in http://samba.org/~tridge/psomogyi/

psomogyi at gamax.hu psomogyi at gamax.hu
Mon Dec 11 18:56:15 GMT 2006


------------------------------------------------------------
revno: 36
revision-id: psomogyi at gamax.hu-20061211185615-ctn1ctxak621vrhi
parent: psomogyi at gamax.hu-20061206174946-os4dxhvges06h1m9
committer: Peter Somogyi <psomogyi at gamax.hu>
branch nick: ctdb
timestamp: Mon 2006-12-11 19:56:15 +0100
message:
  Rough implementation of buffer handling.
  Many conceptual fix.
modified:
  ib/ibwrapper.c                 ibwrapper.c-20061204130028-0125b4f5a72f4b11
  ib/ibwrapper.h                 ibwrapper.h-20061204130028-32755c6266dd3c49
  ib/ibwrapper_internal.h        ibwrapper_internal.h-20061204130028-47f0a7e658b16ca2
=== modified file 'ib/ibwrapper.c'
--- a/ib/ibwrapper.c	2006-12-06 17:49:46 +0000
+++ b/ib/ibwrapper.c	2006-12-11 18:56:15 +0000
@@ -42,8 +42,39 @@
 #define IBW_LASTERR_BUFSIZE 512
 static char ibw_lasterr[IBW_LASTERR_BUFSIZE];
 
-static ibw_mr *ibw_alloc_mr(ibw_ctx_priv *pctx)
+static int ibw_init_memory(ibw_conn *conn)
 {
+	ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, ibw_ctx_priv);
+	ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
+
+	int	i, num_msg;
+	ibw_wr	*p;
+
+	/* didn't find any reason to split send & recv buffer handling */
+	num_msg = pctx->opts.max_recv_wr + pctx->opts.max_send_wr;
+
+	pconn->buf = memalign(pctx->page_size, pctx->opts.max_msg_size);
+	if (!pconn->buf) {
+		sprintf(ibw_lasterr, "couldn't allocate work buf\n");
+		return -1;
+	}
+	pconn->mr = ibv_reg_mr(pctx->pd, pconn->buf, pctx->opts.bufsize, IBV_ACCESS_LOCAL_WRITE);
+	if (!pconn->mr) {
+		sprintf(ibw_lasterr, "Couldn't allocate mr\n");
+		return -1;
+	}
+
+	pconn->wr_index = talloc_size(pconn, num_msg * sizeof(ibw_wr *));
+
+	for(i=0; i<num_msg; i++) {
+		p = pconn->wr_index[i] = talloc_zero(pconn, ibw_wr);
+		p->msg = pconn->buf + (i * pconn->opts.max_msg_size);
+		p->wr_id = i;
+
+		DLIST_ADD(pconn->mr_list_avail, p);
+	}
+
+	return 0;
 }
 
 static int ibw_ctx_priv_destruct(void *ptr)
@@ -51,14 +82,6 @@
 	ibw_ctx *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
 	assert(pctx!=NULL);
 
-	/* free memory regions */
-	
-	/* destroy verbs */
-	if (pctx->cq) {
-		ibv_destroy_cq(pctx->cq);
-		pctx->cq = NULL;
-	}
-
 	if (pctx->verbs_channel) {
 		ibv_destroy_comp_channel(pctx->verbs_channel);
 		pctx->verbs_channel = NULL;
@@ -96,13 +119,6 @@
 	ibw_ctx *ctx = talloc_get_type(ptr, ibw_ctx);
 	assert(ctx!=NULL);
 
-	if (pconn->cm_id) {
-		rdma_destroy_id(pconn->cm_id);
-		pconn->cm_id = NULL;
-	}
-
-	/* free memory regions */
-
 	return 0;
 }
 
@@ -110,6 +126,33 @@
 {
 	ibw_conn *pconn = talloc_get_type(ptr, ibw_conn_priv);
 	assert(pconn!=NULL);
+
+	/* free memory regions */
+	if (pconn->mr) {
+		ibv_dereg_mr(pconn->mr);
+		pconn->mr = NULL;
+	}
+	if (pconn->buf) {
+		free(pconn->buf); /* memalign-ed */
+		pconn->buf = NULL;
+	}
+
+	/* pconn->wr_index is freed by talloc */
+	/* pconn->wr_index[i] are freed by talloc */
+
+	/* destroy verbs */
+	if (pconn->cm_id->qp) {
+		ibv_destroy_qp(pconn->qp);
+		pconn->qp = NULL;
+	}
+	if (pconn->cq) {
+		ibv_destroy_cq(pconn->cq);
+		pconn->cq = NULL;
+	}
+	if (pconn->cm_id) {
+		rdma_destroy_id(pctx->cm_id);
+		pctx->cm_id = NULL;
+	}
 }
 
 static int ibw_conn_destruct(void *ptr)
@@ -145,12 +188,60 @@
 	return conn;
 }
 
-static int ibw_manage_connect(struct rdma_cm_id *cma_id)
+static int ibw_setup_cq_qp(ibw_conn *conn)
+{
+	ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, ibw_ctx_priv);
+	ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
+	struct ibv_qp_init_attr init_attr;
+	int rc;
+
+	if (ibw_init_memory(conn))
+		return -1;
+
+	pctx->cq = ibv_create_cq(conn->cm_id->verbs, pctx->opts.max_send_wr + pctx->opts.max_recv_wr,
+		ctx, ctx->verbs_channel, 0);
+	if (cq==NULL) {
+		sprintf(ibw_lasterr, "ibv_create_cq failed\n");
+		return -1;
+	}
+
+	rc = ibv_req_notify_cq(pctx->cq, 0);
+	if (rc) {
+		sprintf(ibw_lasterr, "ibv_req_notify_cq failed with %d\n", rc);
+		return rc;
+	}
+
+	memset(&init_attr, 0, sizeof(init_attr));
+	init_attr.cap.max_send_wr = pctx->opts.max_send_wr;
+	init_attr.cap.max_recv_wr = pctx->opts.max_recv_wr;
+	init_attr.cap.max_recv_sge = 1;
+	init_attr.cap.max_send_sge = 1;
+	init_attr.qp_type = IBV_QPT_RC;
+	init_attr.send_cq = ctx->cq;
+	init_attr.recv_cq = ctx->cq;
+
+	rc = rdma_create_qp(conn->cm_id, pctx->pd, &init_attr);
+	if (rc) {
+		sprintf(ibw_lasterr, "rdma_create_qp (%d) failed with %d\n", is_server, rc);
+		return rc;
+	}
+	/* elase result is in pconn->cm_id->qp */
+
+	return rc;
+}
+
+static void ibw_refill_cq(ibw_conn *conn)
+{
+}
+
+static int ibw_manage_connect(ibw_conn *conn, struct rdma_cm_id *cma_id)
 {
 	struct rdma_conn_param conn_param;
 	int	rc;
 
-	/* TODO: setup verbs... */
+	rc = ibw_setup_cq_qp(conn);
+	if (rc)
+		return -1;
 
 	/* cm connect */
 	memset(&conn_param, 0, sizeof conn_param);
@@ -179,7 +270,7 @@
 
 	assert(ctx!=NULL);
 
-	rc = rdma_get_cm_event(cb->cm_channel, &event);
+	rc = rdma_get_cm_event(pctx->cm_channel, &event);
 	if (rc) {
 		ctx->state = IBWS_ERROR;
 		sprintf(ibw_lasterr, "rdma_get_cm_event error %d\n", rc);
@@ -198,7 +289,7 @@
 		pctx->state = IWINT_ADDR_RESOLVED;
 		rc = rdma_resolve_route(cma_id, 2000);
 		if (rc) {
-			cb->state = ERROR;
+			ctx->state = ERROR;
 			sprintf(ibw_lasterr, "rdma_resolve_route error %d\n", rc);
 			DEBUG(0, ibw_lasterr);
 		}
@@ -212,7 +303,7 @@
 		conn = talloc_get_type(cma_id->context, ibw_conn);
 		pconn = talloc_get_type(conn->internal, ibw_conn_priv);
 
-		rc = ibw_manage_connect(cma_id);
+		rc = ibw_manage_connect(conn, cma_id);
 		if (rc)
 			error = 1;
 
@@ -234,6 +325,9 @@
 		if (!pconn->is_accepted) {
 			talloc_free(conn);
 			DEBUG(10, "pconn->cm_id %p wasn't accepted\n", pconn->cm_id);
+		} else {
+			if (ibw_setup_cq_qp(ctx, conn))
+				error = 1;
 		}
 
 		/* TODO: clarify whether if it's needed by upper layer: */
@@ -245,12 +339,12 @@
 
 	case RDMA_CM_EVENT_ESTABLISHED:
 		/* expected after ibw_accept and ibw_connect[not directly] */
-		DEBUG(0, "ESTABLISHED\n");
-		ctx->state = IBWS_READY;
+		DEBUG(0, "ESTABLISHED (conn: %u)\n", cma_id->context);
 		conn = talloc_get_type(cma_id->context, ibw_conn);
 		assert(conn!=NULL); /* important assumption */
 		pconn = talloc_get_type(conn->internal, ibw_conn_priv);
 
+		/* client conn is up */
 		conn->state = IBWC_CONNECTED;
 
 		/* both ctx and conn have changed */
@@ -274,9 +368,13 @@
 			pctx->connstate_func(NULL, conn);
 
 			talloc_free(conn);
+
+			if (ctx->conn_list==NULL)
+				rdma_disconnect(ctx->cm_id);
 		} else {
 			DEBUG(0, "server DISCONNECT event\n");
 			ctx->state = IBWS_STOPPED; /* ??? TODO: try it... */
+			/* talloc_free(ctx) should be called within or after this func */
 			pctx->connstate_func(ctx, NULL);
 		}
 		break;
@@ -313,28 +411,32 @@
 static void ibw_event_handler_verbs(struct event_context *ev,
 	struct fd_event *fde, uint16_t flags, void *private_data)
 {
-	int	rc;
 	ibw_ctx	*ctx = talloc_get_type(private_data, ibw_ctx);
 	ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
 
+
 }
 
 static int ibw_process_init_attrs(ibw_initattr *attr, int nattr, ibw_opts *opts)
 {
-	int	i;
+	int	i, mtu;
 	char *name, *value;
-	
+
+	opts->max_send_wr = 256;
+	opts->max_recv_wr = 1024;
+	opts->max_msg_size = 1024;
+
 	for(i=0; i<nattr; i++) {
 		name = attr[i].name;
 		value = attr[i].value;
 
 		assert(name!=NULL && value!=NULL);
-		if (strcmp(name, "dev_name")==0)
-			opts->opts.dev_name = talloc_strdup(ctx, value);
-		else if (strcmp(name, "rx_depth")==0)
-			opts->rx_depth = atoi(value);
-		else if (strcmp(name, "mtu")==0)
-			opts->mtu = atoi(value);
+		if (strcmp(name, "max_send_wr")==0)
+			opts->max_send_wr = atoi(value);
+		else if (strcmp(name, "max_recv_wr")==0)
+			opts->max_recv_wr = atoi(value);
+		else if (strcmp(name, "max_msg_size")==0)
+			opts->bufsize = atoi(value);
 		else {
 			sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);
 			return -1;
@@ -378,8 +480,7 @@
 	/* init cm */
 	pctx->cm_channel = rdma_create_event_channel();
 	if (!pctx->cm_channel) {
-		ret = errno;
-		sprintf(ibw_lasterr, "rdma_create_event_channel error %d\n", ret);
+		sprintf(ibw_lasterr, "rdma_create_event_channel error %d\n", errno);
 		goto cleanup;
 	}
 
@@ -404,23 +505,21 @@
 
 	pctx->verbs_channel = ibv_create_comp_channel(cm_id->verbs);
 	if (!pctx->verbs_channel) {
-		sprintf(stderr, "ibv_create_comp_channel failed %d\n", errno);
+		sprintf(ibw_lasterr, "ibv_create_comp_channel failed %d\n", errno);
 		goto cleanup;
 	}
-	DEBUG_LOG("created channel %p\n", pctx->channel);
+	DEBUG(10, "created channel %p\n", pctx->channel);
 
 	pctx->verbs_channel_event = event_add_fd(pctx->ectx, pctx,
 		pctx->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, ctx);
 
-	pctx->cq = ibv_create_cq(cm_id->verbs, pctx->opts.rx_depth, ctx,
-		ctx->verbs_channel, 0);
-
-	/* allocate ib memory regions */
+	pctx->pagesize = sysconf(_SC_PAGESIZE);
 
 	return ctx;
-
+	/* don't put code here */
 cleanup:
 	DEBUG(0, ibw_lasterr);
+
 	if (ctx)
 		talloc_free(ctx);
 
@@ -455,7 +554,7 @@
 	int	rc;
 
 	DEBUG_LOG("rdma_listen...\n");
-	rc = rdma_listen(cb->cm_id, backlog);
+	rc = rdma_listen(pctx->cm_id, backlog);
 	if (rc) {
 		sprintf(ibw_lasterr, "rdma_listen failed: %d\n", ret);
 		DEBUG(0, ibw_lasterr);
@@ -471,6 +570,8 @@
 	ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
 	struct rdma_conn_param	conn_param;
 
+	conn->conn_userdata = conn_userdata;
+
 	memset(&conn_param, 0, sizeof(struct rdma_conn_param));
 	conn_param.responder_resources = 1;
 	conn_param.initiator_depth = 1;
@@ -522,21 +623,61 @@
 void ibw_disconnect(ibw_conn *conn)
 {
 	ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
-	
+	ibw_ctx *ctx = conn->ctx;
+	ibw_ctx_priv *pctx = talloc_get_type(ctx->internal);
+
+	rdma_disconnect(pctx->cm_id);
+
+	/* continued at RDMA_CM_EVENT_DISCONNECTED */
+
 	return 0;
 }
 
-int ibw_alloc_send_buf(ibw_conn *conn, void **buf, void **key, int n)
+int ibw_alloc_send_buf(ibw_conn *conn, void **buf, void **key, int *maxsize)
 {
 	ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
-
-	return 0;
+	ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, ibw_ctx_priv);
+	ibw_wr *p = pctx->wr_list_avail;
+
+	if (p==NULL) {
+		sprintf(ibw_last_err, "insufficient wr chunks\n");
+		return -1;
+	}
+
+	*maxsize = pctx->opts.max_msg_size;
+
+	DLIST_REMOVE(pctx->wr_list_avail, p);
+	DLIST_ADD(pctx->wr_list_used, p);
+
+	*buf = (void *)p->msg;
+	*key = (void *)p;
+
+	return pctx->buf;
 }
 
 int ibw_send(ibw_conn *conn, void *buf, void *key, int n)
 {
-	ibw_conn_priv *pconn = (ibw_ctx_priv *)ctx->internal;
-	return 0;
+	ibw_ctx_priv pctx = talloc_get_type(conn->ctx->internal, ibw_ctx_priv);
+	ibw_wr *p = talloc_get_type(key, ibw_wr);
+	struct ibv_sge list = {
+		.addr 	= (uintptr_t) p->msg,
+		.length = n,
+		.lkey 	= pctx->mr->lkey
+	};
+	struct ibv_send_wr wr = {
+		.wr_id 	    = p->wr_id,
+		.sg_list    = &list,
+		.num_sge    = 1,
+		.opcode     = IBV_WR_SEND,
+		.send_flags = IBV_SEND_SIGNALED,
+	};
+	struct ibv_send_wr *bad_wr;
+
+	assert(p->msg==(char *)buf);
+
+	p->conn = conn; /* set it only now */
+
+	return ibv_post_send(conn->qp, &wr, &bad_wr);
 }
 
 const char *ibw_getLastError()

=== modified file 'ib/ibwrapper.h'
--- a/ib/ibwrapper.h	2006-12-06 17:49:46 +0000
+++ b/ib/ibwrapper.h	2006-12-11 18:56:15 +0000
@@ -178,10 +178,11 @@
  * You have to use this buf to fill in before send.
  * It's just to avoid memcpy.in ibw_send.
  * Use the same (buf, key) pair with ibw_send.
+ * Don't use more space than maxsize.
  *
  * Returns 0 on success.
  */
-int ibw_alloc_send_buf(ibw_conn *conn, void **buf, void **key, int n);
+int ibw_alloc_send_buf(ibw_conn *conn, void **buf, void **key, int *maxsize);
 
 /*
  * Send the message in one

=== modified file 'ib/ibwrapper_internal.h'
--- a/ib/ibwrapper_internal.h	2006-12-06 17:49:46 +0000
+++ b/ib/ibwrapper_internal.h	2006-12-11 18:56:15 +0000
@@ -21,18 +21,19 @@
  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
  */
 
-typedef struct _ibw_mr {
-	struct ibv_mr *mr;
-	struct _ibw_mr *next, *prev;
-} ibw_mr;
-
 typedef struct _ibw_opts {
-	char	*dev_name;
-	int	rx_depth;
-	int	mtu;
-	int	ib_port;
+	int	max_send_wr;
+	int	max_recv_wr;
+	int	max_msg_size;
 } ibw_opts;
 
+typedef struct _ibw_wr {
+	char	*msg; /* initialized in ibw_init_memory once */
+	ibw_conn *conn; /*valid only when in wr_list_used */
+	int	wr_id; /* position in wr_index list; also used as wr id */
+	struct _ibw_wr *next, *prev; /* in wr_list_avail or wr_list_used */
+} ibw_wr;
+
 typedef enum {
 	IWINT_INIT = 0,
 	IWINT_ADDR_RESOLVED,
@@ -41,11 +42,6 @@
 } ibw_state_ctx;
 
 typedef struct _ibw_ctx_priv {
-	ibw_mr *avail_first;
-	ibw_mr *avail_last;
-	ibw_mr *used_first;
-	ibw_mr *used_last;
-
 	struct event_context *ectx;
 
 	ibw_opts opts;
@@ -64,22 +60,19 @@
 
 	ibw_connstate_fn_t connstate_func;
 	ibw_receive_fn_t receive_func;
+
+	long	pagesize; /* sysconf result for memalign */
 } ibw_ctx_priv;
 
 typedef struct _ibw_conn_priv {
-	struct ibv_cq	*cq;
-	struct ibv_qp	*qp;
-
 	struct rdma_cm_id *cm_id; /* client's cm id */
 	int	is_accepted;
+
+	struct ibv_cq	*cq; /* qp is in cm_id */
+	struct ibv_mr *mr;
+	char *buf; /* fixed size (opts.bufsize) buffer for send/recv */
+	ibw_wr *wr_list_avail;
+	ibw_wr *wr_list_used;
+	ibw_wr **wr_index; /* array[0..(max_send_wr + max_recv_wr)-1] of (ibw_wr *) */
 } ibw_conn_priv;
 
-/* 
- * Must be called in all cases after selecting/polling
- * for FDs set via ibw_add_event_fn_t.
- *
- * fd_index: fd identifier passed in ibw_add_event_fn_t
- * with the same fd was set there.
- */
-//int ibw_process_event(ibw_ctx *ctx, int fd_index);
-



More information about the samba-cvs mailing list