Rev 52: 1st "working" ib version. in http://samba.org/~tridge/psomogyi/
psomogyi at gamax.hu
psomogyi at gamax.hu
Thu Jan 25 10:02:00 GMT 2007
------------------------------------------------------------
revno: 52
revision-id: psomogyi at gamax.hu-20070125100159-73mdd3kowsvi1yd5
parent: psomogyi at gamax.hu-20070105171335-smiknw06nd9o7cnw
committer: Peter Somogyi <psomogyi at gamax.hu>
branch nick: ctdb
timestamp: Thu 2007-01-25 11:01:59 +0100
message:
1st "working" ib version.
TODO: stress test, variable size messages, flood
modified:
ib/ibwrapper.c ibwrapper.c-20061204130028-0125b4f5a72f4b11
ib/ibwrapper_internal.h ibwrapper_internal.h-20061204130028-47f0a7e658b16ca2
tests/ibwrapper_test.c ibwrapper_test.c-20061214171730-h11a2z5ed6pt66hj-1
=== modified file 'ib/ibwrapper.c'
--- a/ib/ibwrapper.c 2007-01-04 15:44:41 +0000
+++ b/ib/ibwrapper.c 2007-01-25 10:01:59 +0000
@@ -57,14 +57,14 @@
{
void *buf;
- DEBUG(10, ("ibw_alloc_mr(cmid=%u, n=%u)\n", (uint32_t)pconn->cm_id, n));
+ DEBUG(10, ("ibw_alloc_mr(cmid=%p, n=%u)\n", pconn->cm_id, n));
buf = memalign(pctx->pagesize, n);
if (!buf) {
sprintf(ibw_lasterr, "couldn't allocate memory\n");
return NULL;
}
- *ppmr = ibv_reg_mr(pctx->pd, buf, n, IBV_ACCESS_LOCAL_WRITE);
+ *ppmr = ibv_reg_mr(pconn->pd, buf, n, IBV_ACCESS_LOCAL_WRITE);
if (!*ppmr) {
sprintf(ibw_lasterr, "couldn't allocate mr\n");
free(buf);
@@ -95,7 +95,7 @@
int i;
struct ibw_wr *p;
- DEBUG(10, ("ibw_init_memory(cmid: %u)\n", (uint32_t)pconn->cm_id));
+ DEBUG(10, ("ibw_init_memory(cmid: %p)\n", pconn->cm_id));
pconn->buf_send = ibw_alloc_mr(pctx, pconn,
opts->max_send_wr * opts->avg_send_size, &pconn->mr_send);
if (!pconn->buf_send) {
@@ -116,7 +116,7 @@
for(i=0; i<opts->max_send_wr; i++) {
p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr);
p->msg = pconn->buf_send + (i * opts->avg_send_size);
- p->wr_id = i + opts->max_recv_wr;
+ p->wr_id = i;
DLIST_ADD(pconn->wr_list_avail, p);
}
@@ -128,11 +128,6 @@
{
DEBUG(10, ("ibw_ctx_priv_destruct(%u)\n", (uint32_t)pctx));
- if (pctx->pd) {
- ibv_dealloc_pd(pctx->pd);
- pctx->pd = NULL;
- }
-
/* destroy cm */
if (pctx->cm_channel) {
rdma_destroy_event_channel(pctx->cm_channel);
@@ -159,8 +154,8 @@
static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn)
{
- DEBUG(10, ("ibw_conn_priv_destruct(%u, cmid: %u)\n",
- (uint32_t)pconn, (uint32_t)pconn->cm_id));
+ DEBUG(10, ("ibw_conn_priv_destruct(%u, cmid: %p)\n",
+ (uint32_t)pconn, pconn->cm_id));
/* free memory regions */
ibw_free_mr(&pconn->buf_send, &pconn->mr_send);
@@ -187,6 +182,10 @@
talloc_free(pconn->verbs_channel_event);
pconn->verbs_channel_event = NULL;
}
+ if (pconn->pd) {
+ ibv_dealloc_pd(pconn->pd);
+ pconn->pd = NULL;
+ }
if (pconn->cm_id) {
rdma_destroy_id(pconn->cm_id);
pconn->cm_id = NULL;
@@ -217,6 +216,7 @@
talloc_set_destructor(pconn, ibw_conn_priv_destruct);
conn->ctx = ctx;
+ conn->internal = (void *)pconn;
DLIST_ADD(ctx->conn_list, conn);
@@ -230,11 +230,7 @@
struct ibv_qp_init_attr init_attr;
int rc;
- DEBUG(10, ("ibw_setup_cq_qp(cmid: %u)\n", (uint32_t)pconn->cm_id));
-
- /* init mr */
- if (ibw_init_memory(conn))
- return -1;
+ DEBUG(10, ("ibw_setup_cq_qp(cmid: %p)\n", pconn->cm_id));
/* init verbs */
pconn->verbs_channel = ibv_create_comp_channel(pconn->cm_id->verbs);
@@ -247,6 +243,17 @@
pconn->verbs_channel_event = event_add_fd(pctx->ectx, conn,
pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn);
+ pconn->pd = ibv_alloc_pd(pconn->cm_id->verbs);
+ if (!pconn->pd) {
+ sprintf(ibw_lasterr, "ibv_alloc_pd failed %d\n", errno);
+ return -1;
+ }
+ DEBUG(10, ("created pd %p\n", pconn->pd));
+
+ /* init mr */
+ if (ibw_init_memory(conn))
+ return -1;
+
/* init cq */
pconn->cq = ibv_create_cq(pconn->cm_id->verbs,
pctx->opts.max_recv_wr + pctx->opts.max_send_wr,
@@ -272,7 +279,7 @@
init_attr.send_cq = pconn->cq;
init_attr.recv_cq = pconn->cq;
- rc = rdma_create_qp(pconn->cm_id, pctx->pd, &init_attr);
+ rc = rdma_create_qp(pconn->cm_id, pconn->pd, &init_attr);
if (rc) {
sprintf(ibw_lasterr, "rdma_create_qp failed with %d\n", rc);
return rc;
@@ -299,7 +306,7 @@
};
struct ibv_recv_wr *bad_wr;
- DEBUG(10, ("ibw_refill_cq_recv(cmid: %u)\n", (uint32_t)pconn->cm_id));
+ DEBUG(10, ("ibw_refill_cq_recv(cmid: %p)\n", pconn->cm_id));
list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
wr.wr_id = pconn->recv_index;
@@ -332,7 +339,7 @@
};
struct ibv_recv_wr *bad_wr;
- DEBUG(10, ("ibw_fill_cq(cmid: %u)\n", (uint32_t)pconn->cm_id));
+ DEBUG(10, ("ibw_fill_cq(cmid: %p)\n", pconn->cm_id));
for(i = pctx->opts.max_recv_wr; i!=0; i--) {
list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
@@ -355,7 +362,7 @@
struct rdma_conn_param conn_param;
int rc;
- DEBUG(10, ("ibw_manage_connect(cmid: %u)", (uint32_t)cma_id));
+ DEBUG(10, ("ibw_manage_connect(cmid: %p)\n", cma_id));
rc = ibw_setup_cq_qp(conn);
if (rc)
return -1;
@@ -427,6 +434,9 @@
cma_id->context = (void *)conn;
DEBUG(10, ("pconn->cm_id %p\n", pconn->cm_id));
+ if (ibw_setup_cq_qp(conn))
+ goto error;
+
conn->state = IBWC_INIT;
pctx->connstate_func(ctx, conn);
@@ -434,9 +444,6 @@
if (!pconn->is_accepted) {
talloc_free(conn);
DEBUG(10, ("pconn->cm_id %p wasn't accepted\n", pconn->cm_id));
- } else {
- if (ibw_setup_cq_qp(conn))
- goto error;
}
/* TODO: clarify whether if it's needed by upper layer: */
@@ -598,23 +605,23 @@
struct ibw_wr *p;
int send_index;
- DEBUG(10, ("ibw_wc_send(cmid: %u, wr_id: %u, bl: %u)\n",
- (uint32_t)pconn->cm_id, (uint32_t)wc->wr_id, (uint32_t)wc->byte_len));
+ DEBUG(10, ("ibw_wc_send(cmid: %p, wr_id: %u, bl: %u)\n",
+ pconn->cm_id, (uint32_t)wc->wr_id, (uint32_t)wc->byte_len));
assert(pconn->cm_id->qp->qp_num==wc->qp_num);
- assert(wc->wr_id > pctx->opts.max_recv_wr);
+ assert(wc->wr_id >= pctx->opts.max_recv_wr);
send_index = wc->wr_id - pctx->opts.max_recv_wr;
pconn->wr_sent--;
if (send_index < pctx->opts.max_send_wr) {
- DEBUG(10, ("ibw_wc_send#1 %u", (int)wc->wr_id));
+ DEBUG(10, ("ibw_wc_send#1 %u\n", (int)wc->wr_id));
p = pconn->wr_index[send_index];
if (p->msg_large)
ibw_free_mr(&p->msg_large, &p->mr_large);
DLIST_REMOVE(pconn->wr_list_used, p);
DLIST_ADD(pconn->wr_list_avail, p);
} else { /* "extra" request - not optimized */
- DEBUG(10, ("ibw_wc_send#2 %u", (int)wc->wr_id));
+ DEBUG(10, ("ibw_wc_send#2 %u\n", (int)wc->wr_id));
for(p=pconn->extra_sent; p!=NULL; p=p->next)
if (p->wr_id==(int)wc->wr_id)
break;
@@ -643,8 +650,8 @@
static inline int ibw_append_to_part(struct ibw_conn_priv *pconn,
struct ibw_part *part, char **pp, uint32_t add_len, int info)
{
- DEBUG(10, ("ibw_append_to_part: cmid=%u, (bs=%u, len=%u, tr=%u), al=%u, i=%u\n",
- (uint32_t)pconn->cm_id, part->bufsize, part->len, part->to_read, add_len, info));
+ DEBUG(10, ("ibw_append_to_part: cmid=%p, (bs=%u, len=%u, tr=%u), al=%u, i=%u\n",
+ pconn->cm_id, part->bufsize, part->len, part->to_read, add_len, info));
/* allocate more if necessary - it's an "evergrowing" buffer... */
if (part->len + add_len > part->bufsize) {
@@ -681,12 +688,12 @@
static inline int ibw_wc_mem_threshold(struct ibw_conn_priv *pconn,
struct ibw_part *part, uint32_t threshold)
{
- DEBUG(10, ("ibw_wc_mem_threshold: cmid=%u, (bs=%u, len=%u, tr=%u), thr=%u\n",
- (uint32_t)pconn->cm_id, part->bufsize, part->len, part->to_read, threshold));
+ DEBUG(10, ("ibw_wc_mem_threshold: cmid=%p, (bs=%u, len=%u, tr=%u), thr=%u\n",
+ pconn->cm_id, part->bufsize, part->len, part->to_read, threshold));
if (part->bufsize > threshold) {
- DEBUG(3, ("ibw_wc_mem_threshold: cmid=%u, %u > %u\n",
- (uint32_t)pconn->cm_id, part->bufsize, threshold));
+ DEBUG(3, ("ibw_wc_mem_threshold: cmid=%p, %u > %u\n",
+ pconn->cm_id, part->bufsize, threshold));
talloc_free(part->buf);
part->buf = talloc_size(pconn, threshold);
if (part->buf==NULL) {
@@ -706,8 +713,8 @@
char *p;
uint32_t remain = wc->byte_len;
- DEBUG(10, ("ibw_wc_recv: cmid=%u, wr_id: %u, bl: %u\n",
- (uint32_t)pconn->cm_id, (uint32_t)wc->wr_id, remain));
+ DEBUG(10, ("ibw_wc_recv: cmid=%p, wr_id: %u, bl: %u\n",
+ pconn->cm_id, (uint32_t)wc->wr_id, remain));
assert(pconn->cm_id->qp->qp_num==wc->qp_num);
assert((int)wc->wr_id < pctx->opts.max_recv_wr);
@@ -872,14 +879,6 @@
}
DEBUG(10, ("created cm_id %p\n", pctx->cm_id));
- /* init verbs */
- pctx->pd = ibv_alloc_pd(pctx->cm_id->verbs);
- if (!pctx->pd) {
- sprintf(ibw_lasterr, "ibv_alloc_pd failed %d\n", errno);
- goto cleanup;
- }
- DEBUG(10, ("created pd %p\n", pctx->pd));
-
pctx->pagesize = sysconf(_SC_PAGESIZE);
return ctx;
@@ -937,7 +936,7 @@
sprintf(ibw_lasterr, "rdma_listen failed: %d\n", rc);
DEBUG(0, (ibw_lasterr));
return rc;
- }
+ }
return 0;
}
@@ -948,7 +947,7 @@
struct rdma_conn_param conn_param;
int rc;
- DEBUG(10, ("ibw_accept: cmid=%u\n", (uint32_t)pconn->cm_id));
+ DEBUG(10, ("ibw_accept: cmid=%p\n", pconn->cm_id));
conn->conn_userdata = conn_userdata;
memset(&conn_param, 0, sizeof(struct rdma_conn_param));
@@ -975,20 +974,21 @@
struct ibw_conn_priv *pconn = NULL;
int rc;
- DEBUG(10, ("ibw_connect: cmid=%u, addr=%s, port=%u\n", (uint32_t)pconn->cm_id,
- inet_ntoa(serv_addr->sin_addr), serv_addr->sin_port));
conn = ibw_conn_new(ctx);
conn->conn_userdata = conn_userdata;
pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
+ DEBUG(10, ("ibw_connect: addr=%s, port=%u\n", inet_ntoa(serv_addr->sin_addr), serv_addr->sin_port));
+ /* init cm */
rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP);
if (rc) {
rc = errno;
- sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc);
+ sprintf(ibw_lasterr, "ibw_connect/rdma_create_id error %d\n", rc);
return rc;
}
+ DEBUG(10, ("ibw_connect: rdma_create_id succeeded, cm_id=%p\n", pconn->cm_id));
- rc = rdma_resolve_addr(pconn->cm_id, NULL, (struct sockaddr *) &serv_addr, 2000);
+ rc = rdma_resolve_addr(pconn->cm_id, NULL, (struct sockaddr *) serv_addr, 2000);
if (rc) {
sprintf(ibw_lasterr, "rdma_resolve_addr error %d\n", rc);
DEBUG(0, (ibw_lasterr));
@@ -1006,7 +1006,7 @@
struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
- DEBUG(10, ("ibw_disconnect: cmid=%u\n", (uint32_t)pconn->cm_id));
+ DEBUG(10, ("ibw_disconnect: cmid=%p\n", pconn->cm_id));
rc = rdma_disconnect(pctx->cm_id);
if (rc) {
@@ -1027,7 +1027,7 @@
struct ibw_wr *p = pconn->wr_list_avail;
if (p!=NULL) {
- DEBUG(10, ("ibw_alloc_send_buf#1: cmid=%u, len=%d\n", (uint32_t)pconn->cm_id, len));
+ DEBUG(10, ("ibw_alloc_send_buf#1: cmid=%p, len=%d\n", pconn->cm_id, len));
DLIST_REMOVE(pconn->wr_list_avail, p);
DLIST_ADD(pconn->wr_list_used, p);
@@ -1043,7 +1043,7 @@
*buf = (void *)p->msg_large;
}
} else {
- DEBUG(10, ("ibw_alloc_send_buf#2: cmid=%u, len=%d\n", (uint32_t)pconn->cm_id, len));
+ DEBUG(10, ("ibw_alloc_send_buf#2: cmid=%p, len=%d\n", pconn->cm_id, len));
/* not optimized */
p = pconn->extra_avail;
if (!p) {
@@ -1106,8 +1106,8 @@
};
struct ibv_send_wr *bad_wr;
- DEBUG(10, ("ibw_wc_send#1(cmid: %u, wrid: %u, n: %d)\n",
- (uint32_t)pconn->cm_id, (uint32_t)wr.wr_id, len));
+ DEBUG(10, ("ibw_send#1(cmid: %p, wrid: %u, n: %d)\n",
+ pconn->cm_id, (uint32_t)wr.wr_id, len));
list.addr = (uintptr_t)buf;
if (p->msg_large==NULL) {
@@ -1134,7 +1134,7 @@
return rc;
} /* else put the request into our own queue: */
- DEBUG(10, ("ibw_wc_send#2(cmid: %u, len: %u)\n", (uint32_t)pconn->cm_id, len));
+ DEBUG(10, ("ibw_send#2(cmid: %p, len: %u)\n", pconn->cm_id, len));
/* to be sent by ibw_wc_send */
DLIST_ADD_END(pconn->queue, p, struct ibw_wr *); /* TODO: optimize */
=== modified file 'ib/ibwrapper_internal.h'
--- a/ib/ibwrapper_internal.h 2007-01-04 15:44:41 +0000
+++ b/ib/ibwrapper_internal.h 2007-01-25 10:01:59 +0000
@@ -51,8 +51,6 @@
struct rdma_event_channel *cm_channel;
struct fd_event *cm_channel_event;
- struct ibv_pd *pd;
-
ibw_connstate_fn_t connstate_func; /* see ibw_init */
ibw_receive_fn_t receive_func; /* see ibw_init */
@@ -71,6 +69,7 @@
struct fd_event *verbs_channel_event;
struct rdma_cm_id *cm_id; /* client's cm id */
+ struct ibv_pd *pd;
int is_accepted;
struct ibv_cq *cq; /* qp is in cm_id */
=== modified file 'tests/ibwrapper_test.c'
--- a/tests/ibwrapper_test.c 2006-12-21 16:41:48 +0000
+++ b/tests/ibwrapper_test.c 2007-01-25 10:01:59 +0000
@@ -88,17 +88,20 @@
char *buf;
void *key;
struct ibwtest_ctx *tcx = talloc_get_type(conn->ctx->ctx_userdata, struct ibwtest_ctx);
+ uint32_t len;
DEBUG(10, ("test IBWC_CONNECTED\n"));
- if (ibw_alloc_send_buf(conn, (void **)&buf, &key, strlen(tcx->id)+2)) {
+ len = sizeof(uint32_t)+strlen(tcx->id)+2;
+ if (ibw_alloc_send_buf(conn, (void **)&buf, &key, len)) {
DEBUG(0, ("send_id: ibw_alloc_send_buf failed\n"));
return -1;
}
- buf[0] = (char)TESTOP_SEND_ID;
- strcpy(buf+1, tcx->id);
+ /* first sizeof(uint32_t) size bytes are for length */
+ buf[sizeof(uint32_t)] = (char)TESTOP_SEND_ID;
+ strcpy(buf+sizeof(uint32_t)+1, tcx->id);
- if (ibw_send(conn, buf, key, strlen(buf+1)+2)) {
+ if (ibw_send(conn, buf, key, len)) {
DEBUG(0, ("send_id: ibw_send error\n"));
return -1;
}
@@ -111,12 +114,16 @@
void *key;
uint32_t len;
+ if (conn->state!=IBWC_CONNECTED)
+ return 0; /* not yet up */
+
len = strlen(msg)+2 + sizeof(uint32_t);
if (ibw_alloc_send_buf(conn, (void **)&buf, &key, len)) {
fprintf(stderr, "send_test_msg: ibw_alloc_send_buf failed\n");
return -1;
}
+ p = buf;
p += sizeof(uint32_t);
p[0] = (char)TESTOP_SEND_DATA;
p++;
@@ -197,11 +204,15 @@
struct ibwtest_ctx *tcx = talloc_get_type(conn->ctx->ctx_userdata, struct ibwtest_ctx);
assert(conn!=NULL);
+ assert(n>=sizeof(uint32_t)+2);
pconn = talloc_get_type(conn->conn_userdata, struct ibwtest_conn);
- op = (enum testopcode)((char *)buf)[0];
+ op = (enum testopcode)((char *)buf)[sizeof(uint32_t)];
+ if (op==TESTOP_SEND_ID) {
+ pconn->id = talloc_strdup(pconn, ((char *)buf)+sizeof(uint32_t)+1);
+ }
DEBUG(11, ("[%d]msg from %s: \"%s\"(%d)\n", op,
- pconn->id ? pconn->id : NULL, ((char *)buf)+1, n));
+ pconn->id ? pconn->id : "NULL", ((char *)buf)+sizeof(uint32_t)+1, n));
if (tcx->is_server) {
char *buf2;
@@ -278,9 +289,10 @@
porcess_next = 0;
i++;
+ p = q; /* ++ at end */
}
if (*p==',') {
- *p = '\0';
+ *p = '\0'; /* ++ at end */
porcess_next = 1;
}
}
@@ -306,9 +318,9 @@
tcx->naddrs * sizeof(struct sockaddr_in));
for(i=0; i<tcx->naddrs; i++) {
p = tcx->addrs + i;
+ p->sin_family = AF_INET;
p->sin_addr.s_addr = inet_addr(attrs[i].name);
p->sin_port = atoi(attrs[i].value);
- p->sin_family = AF_INET;
}
return 0;
@@ -317,7 +329,7 @@
int ibwtest_init_server(struct ibwtest_ctx *tcx)
{
if (tcx->naddrs!=1) {
- fprintf(stderr, "incorrecr number of addrs(%d!=1)\n", tcx->naddrs);
+ fprintf(stderr, "incorrect number of addrs(%d!=1)\n", tcx->naddrs);
return -1;
}
@@ -325,6 +337,11 @@
DEBUG(0, ("ERROR: ibw_bind failed\n"));
return -1;
}
+
+ if (ibw_listen(tcx->ibwctx, 1)) {
+ DEBUG(0, ("ERROR: ibw_listen failed\n"));
+ return -1;
+ }
/* continued at IBWS_READY */
return 0;
@@ -357,7 +374,7 @@
testctx = tcx;
signal(SIGQUIT, ibwtest_sigquit_handler);
- while ((op=getopt(argc, argv, "i:o:d:m:s")) != -1) {
+ while ((op=getopt(argc, argv, "i:o:d:m:st:")) != -1) {
switch (op) {
case 'i':
tcx->id = talloc_strdup(tcx, optarg);
@@ -375,6 +392,9 @@
case 's':
tcx->is_server = 1;
break;
+ case 't':
+ tcx->nsec = (unsigned int)atoi(optarg);
+ break;
default:
fprintf(stderr, "ERROR: unknown option -%c\n", (char)op);
ibwtest_usage(tcx, argv[0]);
More information about the samba-cvs mailing list