Rev 36: Rough implementation of buffer handling. in
http://samba.org/~tridge/psomogyi/
psomogyi at gamax.hu
psomogyi at gamax.hu
Mon Dec 11 18:56:15 GMT 2006
------------------------------------------------------------
revno: 36
revision-id: psomogyi at gamax.hu-20061211185615-ctn1ctxak621vrhi
parent: psomogyi at gamax.hu-20061206174946-os4dxhvges06h1m9
committer: Peter Somogyi <psomogyi at gamax.hu>
branch nick: ctdb
timestamp: Mon 2006-12-11 19:56:15 +0100
message:
Rough implementation of buffer handling.
Many conceptual fixes.
modified:
ib/ibwrapper.c ibwrapper.c-20061204130028-0125b4f5a72f4b11
ib/ibwrapper.h ibwrapper.h-20061204130028-32755c6266dd3c49
ib/ibwrapper_internal.h ibwrapper_internal.h-20061204130028-47f0a7e658b16ca2
=== modified file 'ib/ibwrapper.c'
--- a/ib/ibwrapper.c 2006-12-06 17:49:46 +0000
+++ b/ib/ibwrapper.c 2006-12-11 18:56:15 +0000
@@ -42,8 +42,39 @@
#define IBW_LASTERR_BUFSIZE 512
static char ibw_lasterr[IBW_LASTERR_BUFSIZE];
-static ibw_mr *ibw_alloc_mr(ibw_ctx_priv *pctx)
+static int ibw_init_memory(ibw_conn *conn)
{
+ ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, ibw_ctx_priv);
+ ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
+
+ int i, num_msg;
+ ibw_wr *p;
+
+ /* didn't find any reason to split send & recv buffer handling */
+ num_msg = pctx->opts.max_recv_wr + pctx->opts.max_send_wr;
+
+ pconn->buf = memalign(pctx->page_size, pctx->opts.max_msg_size);
+ if (!pconn->buf) {
+ sprintf(ibw_lasterr, "couldn't allocate work buf\n");
+ return -1;
+ }
+ pconn->mr = ibv_reg_mr(pctx->pd, pconn->buf, pctx->opts.bufsize, IBV_ACCESS_LOCAL_WRITE);
+ if (!pconn->mr) {
+ sprintf(ibw_lasterr, "Couldn't allocate mr\n");
+ return -1;
+ }
+
+ pconn->wr_index = talloc_size(pconn, num_msg * sizeof(ibw_wr *));
+
+ for(i=0; i<num_msg; i++) {
+ p = pconn->wr_index[i] = talloc_zero(pconn, ibw_wr);
+ p->msg = pconn->buf + (i * pconn->opts.max_msg_size);
+ p->wr_id = i;
+
+ DLIST_ADD(pconn->mr_list_avail, p);
+ }
+
+ return 0;
}
static int ibw_ctx_priv_destruct(void *ptr)
@@ -51,14 +82,6 @@
ibw_ctx *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
assert(pctx!=NULL);
- /* free memory regions */
-
- /* destroy verbs */
- if (pctx->cq) {
- ibv_destroy_cq(pctx->cq);
- pctx->cq = NULL;
- }
-
if (pctx->verbs_channel) {
ibv_destroy_comp_channel(pctx->verbs_channel);
pctx->verbs_channel = NULL;
@@ -96,13 +119,6 @@
ibw_ctx *ctx = talloc_get_type(ptr, ibw_ctx);
assert(ctx!=NULL);
- if (pconn->cm_id) {
- rdma_destroy_id(pconn->cm_id);
- pconn->cm_id = NULL;
- }
-
- /* free memory regions */
-
return 0;
}
@@ -110,6 +126,33 @@
{
ibw_conn *pconn = talloc_get_type(ptr, ibw_conn_priv);
assert(pconn!=NULL);
+
+ /* free memory regions */
+ if (pconn->mr) {
+ ibv_dereg_mr(pconn->mr);
+ pconn->mr = NULL;
+ }
+ if (pconn->buf) {
+ free(pconn->buf); /* memalign-ed */
+ pconn->buf = NULL;
+ }
+
+ /* pconn->wr_index is freed by talloc */
+ /* pconn->wr_index[i] are freed by talloc */
+
+ /* destroy verbs */
+ if (pconn->cm_id->qp) {
+ ibv_destroy_qp(pconn->qp);
+ pconn->qp = NULL;
+ }
+ if (pconn->cq) {
+ ibv_destroy_cq(pconn->cq);
+ pconn->cq = NULL;
+ }
+ if (pconn->cm_id) {
+ rdma_destroy_id(pctx->cm_id);
+ pctx->cm_id = NULL;
+ }
}
static int ibw_conn_destruct(void *ptr)
@@ -145,12 +188,60 @@
return conn;
}
-static int ibw_manage_connect(struct rdma_cm_id *cma_id)
+static int ibw_setup_cq_qp(ibw_conn *conn)
+{
+ ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, ibw_ctx_priv);
+ ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
+ struct ibv_qp_init_attr init_attr;
+ int rc;
+
+ if (ibw_init_memory(conn))
+ return -1;
+
+ pctx->cq = ibv_create_cq(conn->cm_id->verbs, pctx->opts.max_send_wr + pctx->opts.max_recv_wr,
+ ctx, ctx->verbs_channel, 0);
+ if (cq==NULL) {
+ sprintf(ibw_lasterr, "ibv_create_cq failed\n");
+ return -1;
+ }
+
+ rc = ibv_req_notify_cq(pctx->cq, 0);
+ if (rc) {
+ sprintf(ibw_lasterr, "ibv_req_notify_cq failed with %d\n", rc);
+ return rc;
+ }
+
+ memset(&init_attr, 0, sizeof(init_attr));
+ init_attr.cap.max_send_wr = pctx->opts.max_send_wr;
+ init_attr.cap.max_recv_wr = pctx->opts.max_recv_wr;
+ init_attr.cap.max_recv_sge = 1;
+ init_attr.cap.max_send_sge = 1;
+ init_attr.qp_type = IBV_QPT_RC;
+ init_attr.send_cq = ctx->cq;
+ init_attr.recv_cq = ctx->cq;
+
+ rc = rdma_create_qp(conn->cm_id, pctx->pd, &init_attr);
+ if (rc) {
+ sprintf(ibw_lasterr, "rdma_create_qp (%d) failed with %d\n", is_server, rc);
+ return rc;
+ }
+ /* else: result is in pconn->cm_id->qp */
+
+ return rc;
+}
+
+static void ibw_refill_cq(ibw_conn *conn)
+{
+}
+
+static int ibw_manage_connect(ibw_conn *conn, struct rdma_cm_id *cma_id)
{
struct rdma_conn_param conn_param;
int rc;
- /* TODO: setup verbs... */
+ rc = ibw_setup_cq_qp(conn);
+ if (rc)
+ return -1;
/* cm connect */
memset(&conn_param, 0, sizeof conn_param);
@@ -179,7 +270,7 @@
assert(ctx!=NULL);
- rc = rdma_get_cm_event(cb->cm_channel, &event);
+ rc = rdma_get_cm_event(pctx->cm_channel, &event);
if (rc) {
ctx->state = IBWS_ERROR;
sprintf(ibw_lasterr, "rdma_get_cm_event error %d\n", rc);
@@ -198,7 +289,7 @@
pctx->state = IWINT_ADDR_RESOLVED;
rc = rdma_resolve_route(cma_id, 2000);
if (rc) {
- cb->state = ERROR;
+ ctx->state = ERROR;
sprintf(ibw_lasterr, "rdma_resolve_route error %d\n", rc);
DEBUG(0, ibw_lasterr);
}
@@ -212,7 +303,7 @@
conn = talloc_get_type(cma_id->context, ibw_conn);
pconn = talloc_get_type(conn->internal, ibw_conn_priv);
- rc = ibw_manage_connect(cma_id);
+ rc = ibw_manage_connect(conn, cma_id);
if (rc)
error = 1;
@@ -234,6 +325,9 @@
if (!pconn->is_accepted) {
talloc_free(conn);
DEBUG(10, "pconn->cm_id %p wasn't accepted\n", pconn->cm_id);
+ } else {
+ if (ibw_setup_cq_qp(ctx, conn))
+ error = 1;
}
/* TODO: clarify whether if it's needed by upper layer: */
@@ -245,12 +339,12 @@
case RDMA_CM_EVENT_ESTABLISHED:
/* expected after ibw_accept and ibw_connect[not directly] */
- DEBUG(0, "ESTABLISHED\n");
- ctx->state = IBWS_READY;
+ DEBUG(0, "ESTABLISHED (conn: %u)\n", cma_id->context);
conn = talloc_get_type(cma_id->context, ibw_conn);
assert(conn!=NULL); /* important assumption */
pconn = talloc_get_type(conn->internal, ibw_conn_priv);
+ /* client conn is up */
conn->state = IBWC_CONNECTED;
/* both ctx and conn have changed */
@@ -274,9 +368,13 @@
pctx->connstate_func(NULL, conn);
talloc_free(conn);
+
+ if (ctx->conn_list==NULL)
+ rdma_disconnect(ctx->cm_id);
} else {
DEBUG(0, "server DISCONNECT event\n");
ctx->state = IBWS_STOPPED; /* ??? TODO: try it... */
+ /* talloc_free(ctx) should be called within or after this func */
pctx->connstate_func(ctx, NULL);
}
break;
@@ -313,28 +411,32 @@
static void ibw_event_handler_verbs(struct event_context *ev,
struct fd_event *fde, uint16_t flags, void *private_data)
{
- int rc;
ibw_ctx *ctx = talloc_get_type(private_data, ibw_ctx);
ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
+
}
static int ibw_process_init_attrs(ibw_initattr *attr, int nattr, ibw_opts *opts)
{
- int i;
+ int i, mtu;
char *name, *value;
-
+
+ opts->max_send_wr = 256;
+ opts->max_recv_wr = 1024;
+ opts->max_msg_size = 1024;
+
for(i=0; i<nattr; i++) {
name = attr[i].name;
value = attr[i].value;
assert(name!=NULL && value!=NULL);
- if (strcmp(name, "dev_name")==0)
- opts->opts.dev_name = talloc_strdup(ctx, value);
- else if (strcmp(name, "rx_depth")==0)
- opts->rx_depth = atoi(value);
- else if (strcmp(name, "mtu")==0)
- opts->mtu = atoi(value);
+ if (strcmp(name, "max_send_wr")==0)
+ opts->max_send_wr = atoi(value);
+ else if (strcmp(name, "max_recv_wr")==0)
+ opts->max_recv_wr = atoi(value);
+ else if (strcmp(name, "max_msg_size")==0)
+ opts->bufsize = atoi(value);
else {
sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);
return -1;
@@ -378,8 +480,7 @@
/* init cm */
pctx->cm_channel = rdma_create_event_channel();
if (!pctx->cm_channel) {
- ret = errno;
- sprintf(ibw_lasterr, "rdma_create_event_channel error %d\n", ret);
+ sprintf(ibw_lasterr, "rdma_create_event_channel error %d\n", errno);
goto cleanup;
}
@@ -404,23 +505,21 @@
pctx->verbs_channel = ibv_create_comp_channel(cm_id->verbs);
if (!pctx->verbs_channel) {
- sprintf(stderr, "ibv_create_comp_channel failed %d\n", errno);
+ sprintf(ibw_lasterr, "ibv_create_comp_channel failed %d\n", errno);
goto cleanup;
}
- DEBUG_LOG("created channel %p\n", pctx->channel);
+ DEBUG(10, "created channel %p\n", pctx->channel);
pctx->verbs_channel_event = event_add_fd(pctx->ectx, pctx,
pctx->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, ctx);
- pctx->cq = ibv_create_cq(cm_id->verbs, pctx->opts.rx_depth, ctx,
- ctx->verbs_channel, 0);
-
- /* allocate ib memory regions */
+ pctx->pagesize = sysconf(_SC_PAGESIZE);
return ctx;
-
+ /* don't put code here */
cleanup:
DEBUG(0, ibw_lasterr);
+
if (ctx)
talloc_free(ctx);
@@ -455,7 +554,7 @@
int rc;
DEBUG_LOG("rdma_listen...\n");
- rc = rdma_listen(cb->cm_id, backlog);
+ rc = rdma_listen(pctx->cm_id, backlog);
if (rc) {
sprintf(ibw_lasterr, "rdma_listen failed: %d\n", ret);
DEBUG(0, ibw_lasterr);
@@ -471,6 +570,8 @@
ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
struct rdma_conn_param conn_param;
+ conn->conn_userdata = conn_userdata;
+
memset(&conn_param, 0, sizeof(struct rdma_conn_param));
conn_param.responder_resources = 1;
conn_param.initiator_depth = 1;
@@ -522,21 +623,61 @@
void ibw_disconnect(ibw_conn *conn)
{
ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
-
+ ibw_ctx *ctx = conn->ctx;
+ ibw_ctx_priv *pctx = talloc_get_type(ctx->internal);
+
+ rdma_disconnect(pctx->cm_id);
+
+ /* continued at RDMA_CM_EVENT_DISCONNECTED */
+
return 0;
}
-int ibw_alloc_send_buf(ibw_conn *conn, void **buf, void **key, int n)
+int ibw_alloc_send_buf(ibw_conn *conn, void **buf, void **key, int *maxsize)
{
ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
-
- return 0;
+ ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, ibw_ctx_priv);
+ ibw_wr *p = pctx->wr_list_avail;
+
+ if (p==NULL) {
+ sprintf(ibw_last_err, "insufficient wr chunks\n");
+ return -1;
+ }
+
+ *maxsize = pctx->opts.max_msg_size;
+
+ DLIST_REMOVE(pctx->wr_list_avail, p);
+ DLIST_ADD(pctx->wr_list_used, p);
+
+ *buf = (void *)p->msg;
+ *key = (void *)p;
+
+ return pctx->buf;
}
int ibw_send(ibw_conn *conn, void *buf, void *key, int n)
{
- ibw_conn_priv *pconn = (ibw_ctx_priv *)ctx->internal;
- return 0;
+ ibw_ctx_priv pctx = talloc_get_type(conn->ctx->internal, ibw_ctx_priv);
+ ibw_wr *p = talloc_get_type(key, ibw_wr);
+ struct ibv_sge list = {
+ .addr = (uintptr_t) p->msg,
+ .length = n,
+ .lkey = pctx->mr->lkey
+ };
+ struct ibv_send_wr wr = {
+ .wr_id = p->wr_id,
+ .sg_list = &list,
+ .num_sge = 1,
+ .opcode = IBV_WR_SEND,
+ .send_flags = IBV_SEND_SIGNALED,
+ };
+ struct ibv_send_wr *bad_wr;
+
+ assert(p->msg==(char *)buf);
+
+ p->conn = conn; /* set it only now */
+
+ return ibv_post_send(conn->qp, &wr, &bad_wr);
}
const char *ibw_getLastError()
=== modified file 'ib/ibwrapper.h'
--- a/ib/ibwrapper.h 2006-12-06 17:49:46 +0000
+++ b/ib/ibwrapper.h 2006-12-11 18:56:15 +0000
@@ -178,10 +178,11 @@
* You have to use this buf to fill in before send.
* It's just to avoid memcpy.in ibw_send.
* Use the same (buf, key) pair with ibw_send.
+ * Don't use more space than maxsize.
*
* Returns 0 on success.
*/
-int ibw_alloc_send_buf(ibw_conn *conn, void **buf, void **key, int n);
+int ibw_alloc_send_buf(ibw_conn *conn, void **buf, void **key, int *maxsize);
/*
* Send the message in one
=== modified file 'ib/ibwrapper_internal.h'
--- a/ib/ibwrapper_internal.h 2006-12-06 17:49:46 +0000
+++ b/ib/ibwrapper_internal.h 2006-12-11 18:56:15 +0000
@@ -21,18 +21,19 @@
* Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
*/
-typedef struct _ibw_mr {
- struct ibv_mr *mr;
- struct _ibw_mr *next, *prev;
-} ibw_mr;
-
typedef struct _ibw_opts {
- char *dev_name;
- int rx_depth;
- int mtu;
- int ib_port;
+ int max_send_wr;
+ int max_recv_wr;
+ int max_msg_size;
} ibw_opts;
+typedef struct _ibw_wr {
+ char *msg; /* initialized in ibw_init_memory once */
+ ibw_conn *conn; /*valid only when in wr_list_used */
+ int wr_id; /* position in wr_index list; also used as wr id */
+ struct _ibw_wr *next, *prev; /* in wr_list_avail or wr_list_used */
+} ibw_wr;
+
typedef enum {
IWINT_INIT = 0,
IWINT_ADDR_RESOLVED,
@@ -41,11 +42,6 @@
} ibw_state_ctx;
typedef struct _ibw_ctx_priv {
- ibw_mr *avail_first;
- ibw_mr *avail_last;
- ibw_mr *used_first;
- ibw_mr *used_last;
-
struct event_context *ectx;
ibw_opts opts;
@@ -64,22 +60,19 @@
ibw_connstate_fn_t connstate_func;
ibw_receive_fn_t receive_func;
+
+ long pagesize; /* sysconf result for memalign */
} ibw_ctx_priv;
typedef struct _ibw_conn_priv {
- struct ibv_cq *cq;
- struct ibv_qp *qp;
-
struct rdma_cm_id *cm_id; /* client's cm id */
int is_accepted;
+
+ struct ibv_cq *cq; /* qp is in cm_id */
+ struct ibv_mr *mr;
+ char *buf; /* fixed size (opts.bufsize) buffer for send/recv */
+ ibw_wr *wr_list_avail;
+ ibw_wr *wr_list_used;
+ ibw_wr **wr_index; /* array[0..(max_send_wr + max_recv_wr)-1] of (ibw_wr *) */
} ibw_conn_priv;
-/*
- * Must be called in all cases after selecting/polling
- * for FDs set via ibw_add_event_fn_t.
- *
- * fd_index: fd identifier passed in ibw_add_event_fn_t
- * with the same fd was set there.
- */
-//int ibw_process_event(ibw_ctx *ctx, int fd_index);
-
More information about the samba-cvs
mailing list