Rev 45: Made receiver handle partial packets. in http://samba.org/~tridge/psomogyi/

psomogyi at gamax.hu psomogyi at gamax.hu
Wed Dec 20 16:42:58 GMT 2006


------------------------------------------------------------
revno: 45
revision-id: psomogyi at gamax.hu-20061220164258-ejbcgxxkgksgnje8
parent: tridge at samba.org-20061219233527-6luodorqow1fms8j
parent: tridge at samba.org-20061219052703-70zyt2rylhthunx2
committer: Peter Somogyi <psomogyi at gamax.hu>
branch nick: ctdb
timestamp: Wed 2006-12-20 17:42:58 +0100
message:
  Made receiver handle partial packets.
modified:
  ib/ibwrapper.c                 ibwrapper.c-20061204130028-0125b4f5a72f4b11
  ib/ibwrapper_internal.h        ibwrapper_internal.h-20061204130028-47f0a7e658b16ca2
=== modified file 'ib/ibwrapper.c'
--- a/ib/ibwrapper.c	2006-12-18 19:52:49 +0000
+++ b/ib/ibwrapper.c	2006-12-20 16:42:58 +0000
@@ -49,6 +49,8 @@
 static void ibw_event_handler_verbs(struct event_context *ev,
 	struct fd_event *fde, uint16_t flags, void *private_data);
 static int ibw_fill_cq(struct ibw_conn *conn);
+static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc); /* IBV_WC_RECV completion handler */
+static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc); /* IBV_WC_SEND completion handler */
 
 static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn,
 	int n, struct ibv_mr **ppmr)
@@ -503,67 +505,61 @@
 
 	struct ibv_wc wc;
 	int rc;
-
-	rc = ibv_poll_cq(pconn->cq, 1, &wc);
-	if (rc!=1) {
-		sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc);
-		goto error;
-	}
-	if (wc.status) {
-		sprintf(ibw_lasterr, "cq completion failed status %d\n",
-			wc.status);
-		goto error;
-	}
-
-	switch(wc.opcode) {
-	case IBV_WC_SEND:
-		{
-			struct ibw_wr	*p;
+	struct ibv_cq *ev_cq;
+	void          *ev_ctx;
+
+	/* TODO: check whether if it's good to have more channels here... */
+	rc = ibv_get_cq_event(pconn->verbs_channel, &ev_cq, &ev_ctx);
+	if (rc) {
+		sprintf(ibw_lasterr, "Failed to get cq_event with %d\n", rc);
+		goto error;
+	}
+	if (ev_cq != pconn->cq) {
+		sprintf(ibw_lasterr, "ev_cq(%u) != pconn->cq(%u)\n",
+			(unsigned int)ev_cq, (unsigned int)pconn->cq);
+		goto error;
+	}
+	rc = ibv_req_notify_cq(pconn->cq, 0);
+	if (rc) {
+		sprintf(ibw_lasterr, "Couldn't request CQ notification (%d)\n", rc);
+		goto error;
+	}
+
+	while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) {
+		if (wc.status) {
+			sprintf(ibw_lasterr, "cq completion failed status %d\n",
+				wc.status);
+			goto error;
+		}
+
+		switch(wc.opcode) {
+		case IBV_WC_SEND:
+			DEBUG(10, ("send completion\n"));
+			if (ibw_wc_send(conn, &wc))
+				goto error;
+			break;
+
+		case IBV_WC_RDMA_WRITE:
+			DEBUG(10, ("rdma write completion\n"));
+			break;
 	
-			DEBUG(10, ("send completion\n"));
-			assert(pconn->cm_id->qp->qp_num==wc.qp_num);
-			assert(wc.wr_id < pctx->opts.max_send_wr);
-
-			p = pconn->wr_index[wc.wr_id];
-			if (p->msg_large) {
-				ibw_free_mr(&p->msg_large, &p->mr_large);
-			}
-
-			DLIST_REMOVE(pconn->wr_list_used, p);
-			DLIST_ADD(pconn->wr_list_avail, p);
-		}
-		break;
-
-	case IBV_WC_RDMA_WRITE:
-		DEBUG(10, ("rdma write completion\n"));
-		break;
-
-	case IBV_WC_RDMA_READ:
-		DEBUG(10, ("rdma read completion\n"));
-		break;
-
-	case IBV_WC_RECV:
-		{
-			int	recv_index;
-
+		case IBV_WC_RDMA_READ:
+			DEBUG(10, ("rdma read completion\n"));
+			break;
+
+		case IBV_WC_RECV:
 			DEBUG(10, ("recv completion\n"));
-			assert(pconn->cm_id->qp->qp_num==wc.qp_num);
-			assert((int)wc.wr_id > pctx->opts.max_send_wr);
-			recv_index = (int)wc.wr_id - pctx->opts.max_send_wr;
-			assert(recv_index < pctx->opts.max_recv_wr);
-			assert(wc.byte_len <= pctx->opts.recv_bufsize);
-
-/* TODO: take care of fragmented messages !!! */
-			pctx->receive_func(conn,
-				pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize),
-				wc.byte_len);
-			if (ibw_refill_cq_recv(conn))
+			if (ibw_wc_recv(conn, &wc))
 				goto error;
+			break;
+
+		default:
+			sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode);
+			goto error;
 		}
-		break;
-
-	default:
-		sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode);
+	}
+	if (rc!=0) {
+		sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc);
 		goto error;
 	}
 
@@ -574,6 +570,163 @@
 	pctx->connstate_func(NULL, conn);
 }
 
+static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
+{
+	/*
+	 * Handle one send completion: release the large-message buffer of the
+	 * finished work request (if it had one) and move the request from the
+	 * used list back onto the available list.  Returns 0 unconditionally.
+	 */
+	struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
+	struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
+	struct ibw_wr	*done_wr;
+
+	assert(pconn->cm_id->qp->qp_num==wc->qp_num);
+	assert(wc->wr_id < pctx->opts.max_send_wr);
+
+	/* the wr_id of a send completion indexes pconn->wr_index */
+	done_wr = pconn->wr_index[wc->wr_id];
+	if (done_wr->msg_large != NULL)
+		ibw_free_mr(&done_wr->msg_large, &done_wr->mr_large);
+
+	DLIST_REMOVE(pconn->wr_list_used, done_wr);
+	DLIST_ADD(pconn->wr_list_avail, done_wr);
+
+	return 0;
+}
+
+static inline int ibw_append_to_part(void *memctx, struct ibw_part *part,
+	char **pp, uint32_t add_len, int info)
+{
+	/*
+	 * Append add_len bytes from *pp to the partial-message buffer of
+	 * part, growing the buffer (talloc'ed under memctx) when needed.
+	 * On success *pp is advanced past the copied bytes, part->len grows
+	 * by add_len and part->to_read shrinks by add_len (the caller must
+	 * not pass more than part->to_read).  info only tags the error
+	 * message so the failing call site can be identified.
+	 * Returns 0 on success, or -1 with ibw_lasterr set; on failure
+	 * neither *part nor *pp is modified.
+	 */
+	uint32_t newlen = part->len + add_len;
+
+	if (add_len==0)
+		return 0; /* nothing to copy; also avoids memcpy() on a NULL buf */
+
+	if (newlen < part->len) { /* uint32_t wrap-around */
+		sprintf(ibw_lasterr, "recv part too long (%u + %u) #%d\n",
+			part->len, add_len, info);
+		return -1;
+	}
+
+	/* grow the buffer if necessary - it is never shrunk here */
+	if (newlen > part->bufsize) {
+		char *newbuf;
+
+		if (part->buf==NULL) {
+			assert(part->len==0);
+			newbuf = talloc_size(memctx, newlen);
+			if (newbuf==NULL) {
+				sprintf(ibw_lasterr, "recv talloc_size error (%u) #%d\n",
+					add_len, info);
+				return -1;
+			}
+		} else {
+			/* don't overwrite part->buf before the realloc is known
+			 * to have succeeded: on failure the old buffer stays valid */
+			newbuf = talloc_realloc_size(memctx, part->buf, newlen);
+			if (newbuf==NULL) {
+				sprintf(ibw_lasterr, "recv realloc error (%u + %u) #%d\n",
+					part->len, add_len, info);
+				return -1;
+			}
+		}
+		part->buf = newbuf;
+		part->bufsize = newlen;
+	}
+
+	/* consume pp */
+	memcpy(part->buf + part->len, *pp, add_len);
+	*pp += add_len;
+	part->len = newlen;
+	part->to_read -= add_len;
+
+	return 0;
+}
+
+static inline int ibw_wc_mem_threshold(void *memctx, struct ibw_part *part, uint32_t threshold)
+{
+	/*
+	 * Cap the memory kept by the partial-message buffer: if it has grown
+	 * beyond threshold bytes, replace it with a fresh threshold-sized one.
+	 * The buffer contents are discarded, so this must only be called
+	 * while no partial message is pending (part->len==0).
+	 * Returns 0 on success, or -1 with ibw_lasterr set (part is then
+	 * left empty: buf==NULL, bufsize==0).
+	 */
+	if (part->bufsize > threshold) {
+		talloc_free(part->buf);
+		/* reset before allocating: if talloc_size() fails, part must not
+		 * keep a bufsize for a buffer that no longer exists, otherwise
+		 * the next ibw_append_to_part() would skip allocation and
+		 * memcpy() through a NULL buf */
+		part->buf = NULL;
+		part->bufsize = 0;
+
+		part->buf = talloc_size(memctx, threshold);
+		if (part->buf==NULL) {
+			sprintf(ibw_lasterr, "talloc_size failed\n");
+			return -1;
+		}
+		part->bufsize = threshold;
+	}
+	return 0;
+}
+
+static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc)
+{
+	/*
+	 * Handle one receive completion.  Every message starts with its total
+	 * length as a host-order uint32_t (the length field included).  A
+	 * receive buffer may carry several messages and may start and/or end
+	 * in the middle of one: complete messages are handed to
+	 * pctx->receive_func directly from the receive buffer, fragments are
+	 * collected in pconn->part until their message is complete.  Finally
+	 * ibw_refill_cq_recv() is called for the connection.
+	 * Returns 0, or -1 after setting conn->state to IBWC_ERROR.
+	 */
+	struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
+	struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
+	int	recv_index;
+	char	*p;
+	uint32_t	remain;
+	struct ibw_part	*part;
+
+	assert(pconn->cm_id->qp->qp_num==wc->qp_num);
+	assert((int)wc->wr_id > pctx->opts.max_send_wr);
+	recv_index = (int)wc->wr_id - pctx->opts.max_send_wr;
+	assert(recv_index < pctx->opts.max_recv_wr);
+	assert(wc->byte_len <= pctx->opts.recv_bufsize);
+
+	p = pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize);
+	part = &pconn->part;
+
+	/*
+	 * Length fields are read with memcpy(): p advances by arbitrary
+	 * message lengths, so it is usually not aligned for uint32_t, and a
+	 * direct *(uint32_t *) load is undefined behaviour (it traps on
+	 * strict-alignment CPUs).
+	 */
+	remain = wc->byte_len;
+	while(remain) {
+		/* here always true: (part->len!=0 && part->to_read!=0) ||
+			(part->len==0 && part->to_read==0) */
+		if (part->len) { /* is there a partial msg to be continued? */
+			uint32_t read_len = (part->to_read<=remain) ? part->to_read : remain;
+			if (ibw_append_to_part(pconn, part, &p, read_len, 421))
+				goto error;
+			remain -= read_len;
+
+			if (part->len<=sizeof(uint32_t) && part->to_read==0) {
+				/* the length header has just been completed */
+				assert(part->len==sizeof(uint32_t));
+				/* set it again now... */
+				memcpy(&part->to_read, part->buf, sizeof(uint32_t));
+				if (part->to_read<sizeof(uint32_t)) {
+					sprintf(ibw_lasterr, "got msglen=%u #2\n", part->to_read);
+					goto error;
+				}
+				part->to_read -= sizeof(uint32_t); /* it's already read */
+			}
+
+			if (part->to_read==0) {
+				pctx->receive_func(conn, part->buf, part->len);
+				part->len = 0; /* tells not having partial data (any more) */
+				if (ibw_wc_mem_threshold(pconn, part, pctx->opts.recv_threshold))
+					goto error;
+			}
+		} else {
+			if (remain>=sizeof(uint32_t)) {
+				uint32_t msglen;
+
+				memcpy(&msglen, p, sizeof(msglen)); /* p may be unaligned */
+				if (msglen<sizeof(uint32_t)) {
+					sprintf(ibw_lasterr, "got msglen=%u\n", msglen);
+					goto error;
+				}
+
+				/* mostly awaited case: */
+				if (msglen<=remain) {
+					pctx->receive_func(conn, p, msglen);
+					p += msglen;
+					remain -= msglen;
+				} else {
+					part->to_read = msglen;
+					/* part->len is already 0 */
+					if (ibw_append_to_part(pconn, part, &p, remain, 422))
+						goto error;
+					remain = 0; /* to be continued ... */
+					/* part->to_read > 0 here */
+				}
+			} else { /* edge case: not even the length header is complete */
+				part->to_read = sizeof(uint32_t);
+				/* part->len is already 0 */
+				if (ibw_append_to_part(pconn, part, &p, remain, 423))
+					goto error;
+				remain = 0;
+				/* part->to_read > 0 here */
+			}
+		}
+	} /* <remain> is always decreased at least by 1 */
+
+	if (ibw_refill_cq_recv(conn))
+		goto error;
+
+	return 0;
+
+error:
+	DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr));
+	conn->state = IBWC_ERROR;
+	return -1;
+}
+
 static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct ibw_opts *opts)
 {
 	int	i;
@@ -583,6 +736,7 @@
 	opts->max_recv_wr = 1024;
 	opts->avg_send_size = 1024;
 	opts->recv_bufsize = 256;
+	opts->recv_threshold = 1 * 1024 * 1024;
 
 	for(i=0; i<nattr; i++) {
 		name = attr[i].name;
@@ -597,6 +751,8 @@
 			opts->avg_send_size = atoi(value);
 		else if (strcmp(name, "recv_bufsize")==0)
 			opts->recv_bufsize = atoi(value);
+		else if (strcmp(name, "recv_threshold")==0)
+			opts->recv_threshold = atoi(value);
 		else {
 			sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);
 			return -1;
@@ -843,7 +999,7 @@
 	};
 	struct ibv_send_wr *bad_wr;
 
-	if (n + sizeof(long)<=pctx->opts.avg_send_size) {
+	if (n + sizeof(uint32_t)<=pctx->opts.avg_send_size) {
 		assert((p->msg + sizeof(long))==(char *)buf);
 		list.lkey = pconn->mr_send->lkey;
 		list.addr = (uintptr_t) p->msg;

=== modified file 'ib/ibwrapper_internal.h'
--- a/ib/ibwrapper_internal.h	2006-12-18 19:52:49 +0000
+++ b/ib/ibwrapper_internal.h	2006-12-20 16:42:58 +0000
@@ -22,10 +22,11 @@
  */
 
 struct ibw_opts {
-	int	max_send_wr;
-	int	max_recv_wr;
-	int	avg_send_size;
-	int	recv_bufsize;
+	uint32_t	max_send_wr; /* number of send wrs; send wr_ids are below this */
+	uint32_t	max_recv_wr; /* number of receive buffers; recv wr_ids start after max_send_wr */
+	uint32_t	avg_send_size; /* sends with n + sizeof(uint32_t) <= this use the mr_send buffer */
+	uint32_t	recv_bufsize; /* size of each receive buffer within buf_recv */
+	uint32_t	recv_threshold; /* partial-msg buffer is cut back to this size after a message */
 };
 
 struct ibw_wr {
@@ -56,6 +57,13 @@
 	long	pagesize; /* sysconf result for memalign */
 };
 
+struct ibw_part { /* reassembly state of a message split across receive completions */
+	char *buf; /* talloced memory buffer */
+	uint32_t bufsize; /* allocated size of buf; grows as needed, cut back to recv_threshold after a message */
+	uint32_t len; /* bytes of the pending message collected so far; 0 = nothing pending */
+	uint32_t to_read; /* bytes still missing: of the uint32_t length header until it is complete, then of the whole message */
+};
+
 struct ibw_conn_priv {
 	struct ibv_comp_channel *verbs_channel;
 	struct fd_event *verbs_channel_event;
@@ -74,6 +82,7 @@
 	/* buf_recv is a ring buffer */
 	char *buf_recv; /* max_recv_wr * avg_recv_size */
 	struct ibv_mr *mr_recv;
-	int recv_index; /* index of the next recv buffer */
+	int recv_index; /* index of the next recv buffer when refilling */
+	struct ibw_part part;
 };
 



More information about the samba-cvs mailing list