svn commit: samba r18180 - in branches/tmp/vl-messaging/source:
include lib torture
jmcd at samba.org
jmcd at samba.org
Wed Sep 6 14:50:52 GMT 2006
Author: jmcd
Date: 2006-09-06 14:50:52 +0000 (Wed, 06 Sep 2006)
New Revision: 18180
WebSVN: http://websvn.samba.org/cgi-bin/viewcvs.cgi?view=rev&root=samba&rev=18180
Log:
From Aleksey Fedoseev:
The patch consists of several modifications:
1) added preallocation of the incoming stream buffer (saves one malloc/free
per incoming message)
2) corrected program exit (cleaning up communication buffer / closing
dispatcher's sockets)
3) added message size test
Modified:
branches/tmp/vl-messaging/source/include/messages.h
branches/tmp/vl-messaging/source/lib/messages_socket.c
branches/tmp/vl-messaging/source/lib/messages_stream.c
branches/tmp/vl-messaging/source/torture/msgtest.c
Changeset:
Modified: branches/tmp/vl-messaging/source/include/messages.h
===================================================================
--- branches/tmp/vl-messaging/source/include/messages.h 2006-09-06 14:29:57 UTC (rev 18179)
+++ branches/tmp/vl-messaging/source/include/messages.h 2006-09-06 14:50:52 UTC (rev 18180)
@@ -125,6 +125,7 @@
struct message_list *prev, *next;
struct message_rec *msg;
size_t processed; /* number of read/written bytes */
+ size_t allocated; /* number of allocated bytes */
};
#endif
Modified: branches/tmp/vl-messaging/source/lib/messages_socket.c
===================================================================
--- branches/tmp/vl-messaging/source/lib/messages_socket.c 2006-09-06 14:29:57 UTC (rev 18179)
+++ branches/tmp/vl-messaging/source/lib/messages_socket.c 2006-09-06 14:50:52 UTC (rev 18180)
@@ -101,9 +101,7 @@
cleanup_messages(wait_send);
if (socket_fd >= 0) {
- if (mtype == MESSAGING_TYPE_STREAM) {
- shutdown_stream_sockets();
- }
+ shutdown_stream_sockets();
close(socket_fd);
socket_fd = -1;
}
Modified: branches/tmp/vl-messaging/source/lib/messages_stream.c
===================================================================
--- branches/tmp/vl-messaging/source/lib/messages_stream.c 2006-09-06 14:29:57 UTC (rev 18179)
+++ branches/tmp/vl-messaging/source/lib/messages_stream.c 2006-09-06 14:50:52 UTC (rev 18180)
@@ -70,7 +70,7 @@
};
static struct messaging_client *clients_cache = NULL;
-static struct message_list *tcp_incoming = NULL;
+static struct message_list *disp_incoming = NULL;
static int dispatcher_pipe = -1;
/* approximate maximum number of connected clients in the list */
@@ -80,6 +80,8 @@
#define MESSAGING_DISPATCHER_SOCKET "dispatcher"
#define MESSAGING_DISPATCHER_LOCKFILE "dispatcher.pid"
+#define INITIAL_CONTAINER_SIZE 64
+
static const char *dispatch_path(void)
{
static char *name = NULL;
@@ -97,6 +99,56 @@
}
/****************************************************************************
+ Allocate/reallocate message container
+****************************************************************************/
+
+struct message_list *allocate_container(TALLOC_CTX *mem_ctx,
+ struct message_list *cnt,
+ size_t needsize)
+{
+ uint8_t *buffer;
+ size_t size = INITIAL_CONTAINER_SIZE;
+
+ while(size < needsize) size *= 2;
+
+ if(cnt == NULL) {
+ cnt = TALLOC_ZERO_P(mem_ctx, struct message_list);
+ if(cnt == NULL) {
+ DEBUG(0, ("talloc failed\n"));
+ return NULL;
+ }
+ buffer = TALLOC_ARRAY(cnt, uint8_t, size);
+ if(buffer == NULL) {
+ DEBUG(0, ("talloc failed\n"));
+ TALLOC_FREE(cnt);
+ return NULL;
+ }
+ } else {
+
+ SMB_ASSERT(size > cnt->allocated);
+
+ buffer = TALLOC_REALLOC_ARRAY(cnt, cnt->msg, uint8_t, size);
+ if(buffer == NULL) {
+ DEBUG(0, ("realloc failed\n"));
+ TALLOC_FREE(cnt->msg);
+ /* try to allocate with talloc */
+ buffer = TALLOC_ARRAY(cnt, uint8_t, size);
+ if(buffer == NULL) {
+ DEBUG(0, ("talloc failed\n"));
+ TALLOC_FREE(cnt);
+ return NULL;
+ }
+ }
+ }
+
+ cnt->msg = (struct message_rec*)buffer;
+ cnt->processed = (size_t)-1;
+ cnt->allocated = size;
+
+ return cnt;
+}
+
+/****************************************************************************
Client's queue helper functions
****************************************************************************/
@@ -145,7 +197,8 @@
for (client = clients_cache; client != NULL; client = client->next) {
clients_count++;
- if(client->outgoing == NULL && client->incoming == NULL) {
+ if(client->outgoing == NULL &&
+ client->incoming->processed == (size_t)-1) {
last_free = client;
}
}
@@ -194,15 +247,24 @@
drop_old_client();
}
+ c->next = c->prev = NULL;
+
/* set invalid pid cause we don't know yet who is the peer */
c->pid = procid_self();
c->pid.pid = -1;
c->fd = fd;
- c->incoming = NULL;
+
+ /* be sure that we'll close socket */
+ talloc_set_destructor(c, messaging_client_destr);
+
+ c->incoming = allocate_container(c, NULL, 0);
+ if (c->incoming == NULL) {
+ TALLOC_FREE(c);
+ return ;
+ }
c->outgoing = NULL;
c->connected = True;
- talloc_set_destructor(c, messaging_client_destr);
DEBUG(10, ("Adding new client\n"));
@@ -291,12 +353,18 @@
return;
}
+ c->next = c->prev = NULL;
+
/* set foreign dispatcher pid */
c->pid.ip = addr.sin_addr;
c->pid.pid = MESSAGING_DISPATCHER_PID;
c->fd = fd;
- c->incoming = NULL;
+ c->incoming = allocate_container(c, NULL, 0);
+ if(c->incoming == NULL) {
+ TALLOC_FREE(c);
+ return ;
+ }
c->outgoing = NULL;
c->connected = True;
talloc_set_destructor(c, messaging_client_destr);
@@ -378,9 +446,17 @@
return NULL;
}
+ result->next = result->prev = NULL;
+
+ result->fd = -1;
+
result->pid = *pid;
result->connected = False;
- result->incoming = NULL;
+ result->incoming = allocate_container(result, NULL, 0);
+ if(result->incoming == NULL) {
+ TALLOC_FREE(result);
+ return NULL;
+ }
result->outgoing = NULL;
if(mtype == MESSAGING_TYPE_DISPATCHER) {
@@ -489,7 +565,7 @@
static BOOL container_full(const struct message_list *li)
{
- return ((li != NULL) && (li->processed != 0) &&
+ return ((li != NULL) && (li->processed != (size_t)-1) &&
(li->processed == li->msg->len));
}
@@ -505,19 +581,9 @@
ssize_t nread;
uint8_t *target;
- if ((cnt != NULL) && container_full(cnt)) {
- TALLOC_FREE(cnt);
- }
-
- if (cnt == NULL) {
- cnt = TALLOC_ZERO_P(mem_ctx, struct message_list);
- if (cnt == NULL) {
- DEBUG(0, ("talloc failed\n"));
- return NULL;
- }
- }
-
- if (cnt->msg == NULL) {
+ SMB_ASSERT(cnt != NULL);
+
+ if (cnt->processed == (size_t)-1) {
/* First, read message length */
target = (uint8_t *)(&len);
to_read = sizeof(size_t);
@@ -529,38 +595,44 @@
nread = sys_read(fd, target, to_read);
if(nread == 0) {
DEBUG(10, ("Messaging peer closed\n"));
- TALLOC_FREE(cnt);
+ cnt->processed = (size_t)-1;
return NULL;
}
if (nread < 0) {
DEBUG(5, ("Error while reading from socket (errno = %d)\n",
errno));
- TALLOC_FREE(cnt);
+ cnt->processed = (size_t)-1;
return NULL;
}
- cnt->processed += nread;
+ if(cnt->processed == (size_t)-1) {
- if (cnt->msg == NULL) {
- if(cnt->processed < sizeof(size_t)) {
+ if(nread < sizeof(size_t)) {
DEBUG(0, ("Received less that %d bytes!\n",
sizeof(size_t)));
- TALLOC_FREE(cnt);
+ cnt->processed = (size_t)-1;
return NULL;
}
DEBUG(10, ("Receiving msg of length %d\n",
len));
- cnt->msg = (struct message_rec*)TALLOC_ZERO_ARRAY(cnt, char,
- len);
- if (cnt->msg == NULL) {
- DEBUG(0, ("talloc failed\n"));
- TALLOC_FREE(cnt);
- return NULL;
+
+ if(len > cnt->allocated) {
+ DEBUG(10, ("Reallocating incoming container"
+ " to fit size %d\n", len));
+ cnt = allocate_container(mem_ctx, cnt, len);
+ if(cnt == NULL) {
+ DEBUG(0, ("Reallocating failed!\n"));
+ return NULL;
+ }
}
- cnt->msg->len = len;
+
+ cnt->processed = 0;
+ cnt->msg->len = len;
}
+ cnt->processed += nread;
+
return cnt;
}
@@ -570,23 +642,23 @@
static BOOL receive_one_message(int fd,
TALLOC_CTX *mem_ctx,
- struct message_list **msg,
+ struct message_list *msg,
BOOL* received)
{
*received = False;
- *msg = read_from_stream_socket(fd, mem_ctx, *msg);
- if (*msg == NULL) {
+
+ if (read_from_stream_socket(fd, mem_ctx, msg) == NULL) {
DEBUG(5, ("failed to read\n"));
return False;
}
- if (!container_full(*msg)) {
+ if (!container_full(msg)) {
return True;
}
- SMB_ASSERT((*msg)->msg != NULL);
+ SMB_ASSERT(msg->msg != NULL);
- if ((*msg)->msg->len < sizeof(struct message_rec)) {
+ if (msg->msg->len < sizeof(struct message_rec)) {
DEBUG(5, ("Message too short\n"));
return False;
}
@@ -605,7 +677,7 @@
struct message_rec *msg_rec;
BOOL received = False;
- if(!receive_one_message(c->fd, c, &c->incoming, &received)) {
+ if(!receive_one_message(c->fd, c, c->incoming, &received)) {
DEBUG(5, ("killing client %s\n", procid_str_static(&c->pid)));
return False;
}
@@ -621,7 +693,8 @@
DEBUG(10, ("Got hello from %s\n",
procid_str_static(&msg_rec->src)));
c->pid = msg_rec->src;
- TALLOC_FREE((c->incoming));
+
+ c->incoming->processed = (size_t)-1;
}
}
@@ -784,11 +857,7 @@
DLIST_ADD_END((*received), li, tmp);
- /*
- We don't free client->incoming, because it
- will be freed in a next
- read_from_stream_socket call.
- */
+ client->incoming->processed = (size_t)-1;
}
}
@@ -834,6 +903,8 @@
close(dispatcher_pipe);
dispatcher_pipe = -1;
}
+
+ TALLOC_FREE(disp_incoming);
}
/****************************************************************************
@@ -1003,6 +1074,11 @@
#ifdef CLUSTER_SUPPORT
tcp_fd = open_remote_listener();
+ if (tcp_fd < 0) {
+ DEBUG(0, ("Can't create tcp socket\n"));
+ close(fd);
+ exit(1);
+ }
#endif /* CLUSTER_SUPPORT */
ok = True;
@@ -1027,6 +1103,11 @@
shutdown_stream_sockets();
+ close(fd);
+#ifdef CLUSTER_SUPPORT
+ close(tcp_fd);
+#endif
+
/*
Presently we don't remove socket & lock file. Should be fixed later
*/
@@ -1193,6 +1274,15 @@
return -1;
good:
+
+ SMB_ASSERT(disp_incoming == NULL);
+ disp_incoming = allocate_container(NULL, NULL, 0);
+ if(disp_incoming == NULL) {
+ DEBUG(0, ("Can't initialize tcp incoming buffer\n"));
+ close(sock);
+ return -1;
+ }
+
DEBUG(10, ("Dispatcher connected, say hello.\n"));
/* say HELLO to the dispatcher to identify oneself */
message_send_pid_socket(pid_to_procid(MESSAGING_DISPATCHER_PID),
@@ -1218,15 +1308,35 @@
void receive_on_socket_dispatcher(int socket_fd, struct message_list **queue)
{
BOOL received = False;
- if(!receive_one_message(socket_fd, NULL, &tcp_incoming, &received)) {
+ if(!receive_one_message(socket_fd, NULL, disp_incoming, &received)) {
DEBUG(0, ("Can't receive a message through the daemon\n"));
return ;
}
if(received) {
- struct message_list *tmp;
- DLIST_ADD_END((*queue), tcp_incoming, tmp);
- /* give memory control to a higher level code */
- tcp_incoming = NULL;
+ struct message_list *tmp, *li;
+
+ SMB_ASSERT(disp_incoming != NULL && disp_incoming->msg != NULL);
+
+ li = TALLOC_ZERO_P(NULL, struct message_list);
+ if(li == NULL) {
+ DEBUG(0, ("talloc failed\n"));
+ return ;
+ }
+ li->msg = (struct message_rec*)TALLOC_ARRAY(
+ li, uint8_t,
+ disp_incoming->msg->len);
+ if(li->msg == NULL) {
+ DEBUG(0, ("talloc failed\n"));
+ TALLOC_FREE(li);
+ return ;
+ }
+
+ memcpy(li->msg, disp_incoming->msg,
+ disp_incoming->msg->len);
+
+ DLIST_ADD_END((*queue), li, tmp);
+
+ disp_incoming->processed = (size_t)-1;
}
}
Modified: branches/tmp/vl-messaging/source/torture/msgtest.c
===================================================================
--- branches/tmp/vl-messaging/source/torture/msgtest.c 2006-09-06 14:29:57 UTC (rev 18179)
+++ branches/tmp/vl-messaging/source/torture/msgtest.c 2006-09-06 14:50:52 UTC (rev 18180)
@@ -56,7 +56,10 @@
{
pid_t pid;
int i, n;
- char buf[12], *p, *addr;
+ char buf[12], *large_buf;
+#ifdef CLUSTER_SUPPORT
+ char *p, *addr;
+#endif
struct timeval dispatch_timeout = timeval_set(0, DISPATCH_TIMEOUT);
struct process_id receiver;
@@ -181,6 +184,36 @@
(ping_count+pong_count)/timeval_elapsed(&tv));
}
+ /* large messages test */
+ pong_count = 0;
+
+ {
+ struct timeval tv = timeval_current();
+ size_t timelimit = n;
+ size_t ping_count = 0;
+
+ printf("Starting large messages test.\n");
+
+ for(i = 1; i <= (1 << 16) && timeval_elapsed(&tv) < timelimit; i *= 2) {
+ large_buf = TALLOC_ZERO(NULL, i);
+ if(large_buf == NULL) {
+ printf("talloc failed!\n");
+ break;
+ }
+
+ printf("size %d... ", i);
+
+ if(message_send_pid(receiver, MSG_PING,
+ large_buf, i, False)) ping_count++;
+
+ while (ping_count > pong_count && timeval_elapsed(&tv) < timelimit) {
+ message_select_and_dispatch(&dispatch_timeout);
+ }
+
+ printf("passed\n");
+ }
+ }
+
message_end();
return (0);
More information about the samba-cvs
mailing list