svn commit: samba r18180 - in branches/tmp/vl-messaging/source: include lib torture

jmcd at samba.org jmcd at samba.org
Wed Sep 6 14:50:52 GMT 2006


Author: jmcd
Date: 2006-09-06 14:50:52 +0000 (Wed, 06 Sep 2006)
New Revision: 18180

WebSVN: http://websvn.samba.org/cgi-bin/viewcvs.cgi?view=rev&root=samba&rev=18180

Log:
From Aleksey Fedoseev,

The patch consists of several modifications:

1) added preallocation of the incoming stream buffer (saves one
malloc/free per incoming message)
2) corrected program exit (cleaning up communication buffer / closing
dispatcher's sockets)
3) added message size test


Modified:
   branches/tmp/vl-messaging/source/include/messages.h
   branches/tmp/vl-messaging/source/lib/messages_socket.c
   branches/tmp/vl-messaging/source/lib/messages_stream.c
   branches/tmp/vl-messaging/source/torture/msgtest.c


Changeset:
Modified: branches/tmp/vl-messaging/source/include/messages.h
===================================================================
--- branches/tmp/vl-messaging/source/include/messages.h	2006-09-06 14:29:57 UTC (rev 18179)
+++ branches/tmp/vl-messaging/source/include/messages.h	2006-09-06 14:50:52 UTC (rev 18180)
@@ -125,6 +125,7 @@
 	struct message_list *prev, *next;
 	struct message_rec *msg;
 	size_t processed; /* number of read/written bytes */
+	size_t allocated; /* number of allocated bytes */
 };
 
 #endif

Modified: branches/tmp/vl-messaging/source/lib/messages_socket.c
===================================================================
--- branches/tmp/vl-messaging/source/lib/messages_socket.c	2006-09-06 14:29:57 UTC (rev 18179)
+++ branches/tmp/vl-messaging/source/lib/messages_socket.c	2006-09-06 14:50:52 UTC (rev 18180)
@@ -101,9 +101,7 @@
 	cleanup_messages(wait_send);
 
 	if (socket_fd >= 0) {
-		if (mtype == MESSAGING_TYPE_STREAM) {
-			shutdown_stream_sockets();
-		}
+		shutdown_stream_sockets();
 		close(socket_fd);
 		socket_fd = -1;
 	}

Modified: branches/tmp/vl-messaging/source/lib/messages_stream.c
===================================================================
--- branches/tmp/vl-messaging/source/lib/messages_stream.c	2006-09-06 14:29:57 UTC (rev 18179)
+++ branches/tmp/vl-messaging/source/lib/messages_stream.c	2006-09-06 14:50:52 UTC (rev 18180)
@@ -70,7 +70,7 @@
 };
 static struct messaging_client *clients_cache = NULL;
 
-static struct message_list *tcp_incoming = NULL;
+static struct message_list *disp_incoming = NULL;
 static int dispatcher_pipe = -1;
 
 /* approximate maximum number of connected clients in the list */
@@ -80,6 +80,8 @@
 #define MESSAGING_DISPATCHER_SOCKET	"dispatcher"
 #define MESSAGING_DISPATCHER_LOCKFILE "dispatcher.pid"
 
+#define INITIAL_CONTAINER_SIZE		64
+
 static const char *dispatch_path(void)
 {
 	static char *name = NULL;
@@ -97,6 +99,56 @@
 }
 
 /****************************************************************************
+ Allocate/reallocate message container
+****************************************************************************/
+
+struct message_list *allocate_container(TALLOC_CTX *mem_ctx,
+					struct message_list *cnt,
+					size_t needsize) 
+{
+	uint8_t *buffer;
+	size_t size = INITIAL_CONTAINER_SIZE;
+	
+	while(size < needsize) size *= 2;
+
+	if(cnt == NULL) {		
+		cnt = TALLOC_ZERO_P(mem_ctx, struct message_list);
+		if(cnt == NULL) {
+			DEBUG(0, ("talloc failed\n"));
+			return NULL;
+		}
+		buffer = TALLOC_ARRAY(cnt, uint8_t, size);
+		if(buffer == NULL) {
+			DEBUG(0, ("talloc failed\n"));
+			TALLOC_FREE(cnt);
+			return NULL;
+		}
+	} else {
+
+		SMB_ASSERT(size > cnt->allocated);
+
+		buffer = TALLOC_REALLOC_ARRAY(cnt, cnt->msg, uint8_t, size);
+		if(buffer == NULL) {
+			DEBUG(0, ("realloc failed\n"));
+			TALLOC_FREE(cnt->msg);
+			/* try to allocate with talloc */
+			buffer = TALLOC_ARRAY(cnt, uint8_t, size);
+			if(buffer == NULL) {
+				DEBUG(0, ("talloc failed\n"));
+				TALLOC_FREE(cnt);
+				return NULL;
+			}
+		}
+	}
+
+	cnt->msg = (struct message_rec*)buffer;
+	cnt->processed = (size_t)-1;
+	cnt->allocated = size;
+
+	return cnt;
+}
+
+/****************************************************************************
  Client's queue helper functions
 ****************************************************************************/
 
@@ -145,7 +197,8 @@
 	
 	for (client = clients_cache; client != NULL; client = client->next) {
 		clients_count++;
-		if(client->outgoing == NULL && client->incoming == NULL) {
+		if(client->outgoing == NULL &&
+		   client->incoming->processed == (size_t)-1) {
 			last_free = client;
 		}
 	}
@@ -194,15 +247,24 @@
 		drop_old_client();
 	}
 
+	c->next = c->prev = NULL;
+
 	/* set invalid pid cause we don't know yet who is the peer */
 	c->pid = procid_self();
 	c->pid.pid = -1; 
 
 	c->fd = fd;
-	c->incoming = NULL;
+
+	/* be sure that we'll close socket */
+	talloc_set_destructor(c, messaging_client_destr); 
+
+	c->incoming = allocate_container(c, NULL, 0);
+	if (c->incoming == NULL) {
+		TALLOC_FREE(c);
+		return ; 
+	}
 	c->outgoing = NULL;
 	c->connected = True;
-	talloc_set_destructor(c, messaging_client_destr);
 
 	DEBUG(10, ("Adding new client\n"));
 
@@ -291,12 +353,18 @@
 		return;
 	}
 
+	c->next = c->prev = NULL;
+
 	/* set foreign dispatcher pid */
 	c->pid.ip = addr.sin_addr;
 	c->pid.pid = MESSAGING_DISPATCHER_PID; 
 
 	c->fd = fd;
-	c->incoming = NULL;
+	c->incoming = allocate_container(c, NULL, 0);
+	if(c->incoming == NULL) {
+		TALLOC_FREE(c);
+		return ;
+	}
 	c->outgoing = NULL;
 	c->connected = True;
 	talloc_set_destructor(c, messaging_client_destr);
@@ -378,9 +446,17 @@
 		return NULL;
 	}
 
+	result->next = result->prev = NULL;
+
+	result->fd = -1;
+
 	result->pid = *pid;
 	result->connected = False;
-	result->incoming = NULL;
+	result->incoming = allocate_container(result, NULL, 0);
+	if(result->incoming == NULL) {
+		TALLOC_FREE(result);
+		return NULL;
+	}
 	result->outgoing = NULL;
 
 	if(mtype == MESSAGING_TYPE_DISPATCHER) {
@@ -489,7 +565,7 @@
 
 static BOOL container_full(const struct message_list *li)
 {
-	return ((li != NULL) && (li->processed != 0) &&
+	return ((li != NULL) && (li->processed != (size_t)-1) &&
 		(li->processed == li->msg->len));
 }
 
@@ -505,19 +581,9 @@
 	ssize_t nread;
 	uint8_t *target;
 
-	if ((cnt != NULL) && container_full(cnt)) {
-		TALLOC_FREE(cnt);
-	}
-
-	if (cnt == NULL) {
-		cnt = TALLOC_ZERO_P(mem_ctx, struct message_list);
-		if (cnt == NULL) {
-			DEBUG(0, ("talloc failed\n"));
-			return NULL;
-		}
-	}
-
-	if (cnt->msg == NULL) {
+	SMB_ASSERT(cnt != NULL);
+	
+	if (cnt->processed == (size_t)-1) {
 		/* First, read message length */
 		target = (uint8_t *)(&len);
 		to_read = sizeof(size_t);
@@ -529,38 +595,44 @@
 	nread = sys_read(fd, target, to_read);
 	if(nread == 0) {
 		DEBUG(10, ("Messaging peer closed\n"));
-		TALLOC_FREE(cnt);
+		cnt->processed = (size_t)-1;
 		return NULL;
 	}
 	if (nread < 0) {
 		DEBUG(5, ("Error while reading from socket (errno = %d)\n",
 			  errno));
-		TALLOC_FREE(cnt);
+		cnt->processed = (size_t)-1;		
 		return NULL;
 	}
 
-	cnt->processed += nread;
+	if(cnt->processed == (size_t)-1) {
 
-	if (cnt->msg == NULL) {
-		if(cnt->processed < sizeof(size_t)) {
+		if(nread < sizeof(size_t)) {
 			DEBUG(0, ("Received less that %d bytes!\n",
 				  sizeof(size_t)));
-			TALLOC_FREE(cnt);
+			cnt->processed = (size_t)-1;
 			return NULL;
 		}
 
 		DEBUG(10, ("Receiving msg of length %d\n",
 			   len));
-		cnt->msg = (struct message_rec*)TALLOC_ZERO_ARRAY(cnt, char,
-								  len);
-		if (cnt->msg == NULL) {
-			DEBUG(0, ("talloc failed\n"));
-			TALLOC_FREE(cnt);
-			return NULL;
+
+		if(len > cnt->allocated) {
+			DEBUG(10, ("Reallocating incoming container"
+				   " to fit size %d\n", len));
+			cnt = allocate_container(mem_ctx, cnt, len);
+			if(cnt == NULL) {
+				DEBUG(0, ("Reallocating failed!\n"));
+				return NULL;		
+			}
 		}
-		cnt->msg->len = len;
+
+		cnt->processed = 0;
+		cnt->msg->len = len;		
 	}
 
+	cnt->processed += nread;
+
 	return cnt;
 }
 
@@ -570,23 +642,23 @@
 
 static BOOL receive_one_message(int fd,
 				TALLOC_CTX *mem_ctx,
-				struct message_list **msg,
+				struct message_list *msg,
 				BOOL* received)
 {
 	*received = False;
-	*msg = read_from_stream_socket(fd, mem_ctx, *msg);
-	if (*msg == NULL) {
+
+	if (read_from_stream_socket(fd, mem_ctx, msg) == NULL) {
 		DEBUG(5, ("failed to read\n"));
 		return False;
 	}
 
-	if (!container_full(*msg)) {
+	if (!container_full(msg)) {
 		return True;
 	}
 
-	SMB_ASSERT((*msg)->msg != NULL);
+	SMB_ASSERT(msg->msg != NULL);
 
-	if ((*msg)->msg->len < sizeof(struct message_rec)) {
+	if (msg->msg->len < sizeof(struct message_rec)) {
 		DEBUG(5, ("Message too short\n"));
 		return False;
 	}
@@ -605,7 +677,7 @@
 	struct message_rec *msg_rec;
 	BOOL received = False;
 
-	if(!receive_one_message(c->fd, c, &c->incoming, &received)) {
+	if(!receive_one_message(c->fd, c, c->incoming, &received)) {
 		DEBUG(5, ("killing client %s\n", procid_str_static(&c->pid)));
 		return False;
 	}
@@ -621,7 +693,8 @@
 			DEBUG(10, ("Got hello from %s\n",
 				   procid_str_static(&msg_rec->src)));
 			c->pid = msg_rec->src;
-			TALLOC_FREE((c->incoming));
+
+			c->incoming->processed = (size_t)-1;
 		}
 	}
 
@@ -784,11 +857,7 @@
 				
 				DLIST_ADD_END((*received), li, tmp);
 
-				/* 
-				   We don't free client->incoming, because it
-				   will be freed in a next
-				   read_from_stream_socket call.
-				*/
+				client->incoming->processed = (size_t)-1;
 			}
 		}
 		
@@ -834,6 +903,8 @@
 		close(dispatcher_pipe);
 		dispatcher_pipe = -1;
 	}
+	
+	TALLOC_FREE(disp_incoming);
 }
 
 /****************************************************************************
@@ -1003,6 +1074,11 @@
 
 #ifdef CLUSTER_SUPPORT
 	tcp_fd = open_remote_listener();
+	if (tcp_fd < 0) {
+		DEBUG(0, ("Can't create tcp socket\n"));
+		close(fd);
+		exit(1);
+	}
 #endif /* CLUSTER_SUPPORT */
 
 	ok = True;
@@ -1027,6 +1103,11 @@
 
 	shutdown_stream_sockets();
 
+	close(fd);
+#ifdef CLUSTER_SUPPORT
+	close(tcp_fd);
+#endif
+
 	/* 
 	   Presently we don't remove socket & lock file. Should be fixed later
 	*/
@@ -1193,6 +1274,15 @@
 	return -1;
 
 good:
+
+	SMB_ASSERT(disp_incoming == NULL);
+	disp_incoming = allocate_container(NULL, NULL, 0);
+	if(disp_incoming == NULL) {
+		DEBUG(0, ("Can't initialize tcp incoming buffer\n"));
+		close(sock);
+		return -1;
+	}
+
 	DEBUG(10, ("Dispatcher connected, say hello.\n"));
 	/* say HELLO to the dispatcher to identify oneself */
 	message_send_pid_socket(pid_to_procid(MESSAGING_DISPATCHER_PID),
@@ -1218,15 +1308,35 @@
 void receive_on_socket_dispatcher(int socket_fd, struct message_list **queue)
 {
 	BOOL received = False;
-	if(!receive_one_message(socket_fd, NULL, &tcp_incoming, &received)) {
+	if(!receive_one_message(socket_fd, NULL, disp_incoming, &received)) {
 		DEBUG(0, ("Can't receive a message through the daemon\n"));
 		return ;
 	}	
 
 	if(received) {
-		struct message_list *tmp;
-		DLIST_ADD_END((*queue), tcp_incoming, tmp);
-		/* give memory control to a higher level code */
-		tcp_incoming = NULL; 
+		struct message_list *tmp, *li;
+		
+		SMB_ASSERT(disp_incoming != NULL && disp_incoming->msg != NULL);
+		
+		li = TALLOC_ZERO_P(NULL, struct message_list);
+		if(li == NULL) {
+			DEBUG(0, ("talloc failed\n"));
+			return ;
+		}
+		li->msg = (struct message_rec*)TALLOC_ARRAY(
+			li, uint8_t,
+			disp_incoming->msg->len);
+		if(li->msg == NULL) {
+			DEBUG(0, ("talloc failed\n"));
+			TALLOC_FREE(li);
+			return ;
+		}
+		
+		memcpy(li->msg, disp_incoming->msg,
+		       disp_incoming->msg->len);
+
+		DLIST_ADD_END((*queue), li, tmp);
+
+		disp_incoming->processed = (size_t)-1; 
 	}
 }

Modified: branches/tmp/vl-messaging/source/torture/msgtest.c
===================================================================
--- branches/tmp/vl-messaging/source/torture/msgtest.c	2006-09-06 14:29:57 UTC (rev 18179)
+++ branches/tmp/vl-messaging/source/torture/msgtest.c	2006-09-06 14:50:52 UTC (rev 18180)
@@ -56,7 +56,10 @@
 {
 	pid_t pid;
 	int i, n;
-	char buf[12], *p, *addr;
+	char buf[12], *large_buf;
+#ifdef CLUSTER_SUPPORT
+	char *p, *addr;
+#endif
 	struct timeval dispatch_timeout = timeval_set(0, DISPATCH_TIMEOUT);
 	struct process_id receiver;
 
@@ -181,6 +184,36 @@
 		       (ping_count+pong_count)/timeval_elapsed(&tv));
 	}	
 
+	/* large messages test */
+	pong_count = 0;
+
+	{
+		struct timeval tv = timeval_current();
+		size_t timelimit = n;
+		size_t ping_count = 0;
+
+		printf("Starting large messages test.\n");
+
+		for(i = 1; i <= (1 << 16) && timeval_elapsed(&tv) < timelimit; i *= 2) {
+			large_buf = TALLOC_ZERO(NULL, i);
+			if(large_buf == NULL) {
+				printf("talloc failed!\n");
+				break;
+			}
+			
+			printf("size %d... ", i);
+			
+			if(message_send_pid(receiver, MSG_PING,
+					    large_buf, i, False)) ping_count++;
+			
+			while (ping_count > pong_count && timeval_elapsed(&tv) < timelimit) {
+				message_select_and_dispatch(&dispatch_timeout);
+			}
+
+			printf("passed\n");
+		}		
+	}
+
 	message_end();
 
 	return (0);



More information about the samba-cvs mailing list