[PATCH] messaging performance fixes

Volker Lendecke Volker.Lendecke at SerNet.DE
Thu Aug 21 14:35:48 MDT 2014


Hi!

Review/Push would be appreciated!

Thanks,

Volker

-- 
SerNet GmbH, Bahnhofsallee 1b, 37081 Göttingen
phone: +49-551-370000-0, fax: +49-551-370000-9
AG Göttingen, HRB 2816, GF: Dr. Johannes Loxen
http://www.sernet.de, mailto:kontakt at sernet.de
-------------- next part --------------
From 90e3c0e269fa33c87bf05f8b7a16628324d462c2 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Thu, 21 Aug 2014 14:32:07 +0000
Subject: [PATCH 1/4] messaging3: Add msg_sink/source -- perftest

With this pair of programs I did some performance tests of the messaging
system. Guess what -- I found two bugs :-)

See the subsequent patches.

With 1500 msg_source processes I can generate message overload: A

Intel(R) Xeon(R) CPU           L5640  @ 2.27GHz

can receive roughly 100k messages per second. When using
messaging_read_send/recv, user and system time are roughly even, with a
bit more work done in user space. When using messaging_register, user
space consumes a lot less CPU time because there is less malloc activity.

By the way: 1500 helper threads in a blocking sendto() against a single
datagram socket that is being read as fast as possible (with epoll_wait
in between) only drove the loadavg to 12 on a 24-core machine. So I guess
unix domain datagram sockets are pretty well protected against overload:
no thundering herd or the like. Interestingly, "top" showed msg_sink at
less than 90% CPU, although it was clearly the bottleneck. But that must
be a "top" artifact.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/torture/msg_sink.c   | 284 +++++++++++++++++++++++++++++++++++++++++++
 source3/torture/msg_source.c | 161 ++++++++++++++++++++++++
 source3/wscript_build        |  14 +++
 3 files changed, 459 insertions(+)
 create mode 100644 source3/torture/msg_sink.c
 create mode 100644 source3/torture/msg_source.c

diff --git a/source3/torture/msg_sink.c b/source3/torture/msg_sink.c
new file mode 100644
index 0000000..158fe3c
--- /dev/null
+++ b/source3/torture/msg_sink.c
@@ -0,0 +1,284 @@
+/*
+ *  Unix SMB/CIFS implementation.
+ *  Receive and count messages
+ *  Copyright (C) Volker Lendecke 2014
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "replace.h"
+#include "includes.h"
+#include "messages.h"
+#include "lib/util/tevent_unix.h"
+#include <stdio.h>
+
+struct sink_state {
+	struct tevent_context *ev;
+	struct messaging_context *msg_ctx;
+	int msg_type;
+	unsigned *counter;
+};
+
+static void sink_done(struct tevent_req *subreq);
+
+static struct tevent_req *sink_send(TALLOC_CTX *mem_ctx,
+				    struct tevent_context *ev,
+				    struct messaging_context *msg_ctx,
+				    int msg_type, unsigned *counter)
+{
+	struct tevent_req *req, *subreq;
+	struct sink_state *state;
+
+	req = tevent_req_create(mem_ctx, &state, struct sink_state);
+	if (req == NULL) {
+		return NULL;
+	}
+	state->ev = ev;
+	state->msg_ctx = msg_ctx;
+	state->msg_type = msg_type;
+	state->counter = counter;
+
+	subreq = messaging_read_send(state, state->ev, state->msg_ctx,
+				     state->msg_type);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, sink_done, req);
+	return req;
+}
+
+static void sink_done(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct sink_state *state = tevent_req_data(
+		req, struct sink_state);
+	int ret;
+
+	ret = messaging_read_recv(subreq, NULL, NULL);
+	TALLOC_FREE(subreq);
+	if (tevent_req_error(req, ret)) {
+		return;
+	}
+
+	*state->counter += 1;
+
+	subreq = messaging_read_send(state, state->ev, state->msg_ctx,
+				     state->msg_type);
+	if (tevent_req_nomem(subreq, req)) {
+		return;
+	}
+	tevent_req_set_callback(subreq, sink_done, req);
+}
+
+static int sink_recv(struct tevent_req *req)
+{
+	int err;
+
+	if (tevent_req_is_unix_error(req, &err)) {
+		return err;
+	}
+	return 0;
+}
+
+struct prcount_state {
+	struct tevent_context *ev;
+	struct timeval interval;
+	unsigned *counter;
+};
+
+static void prcount_waited(struct tevent_req *subreq);
+
+static struct tevent_req *prcount_send(TALLOC_CTX *mem_ctx,
+				       struct tevent_context *ev,
+				       struct timeval interval,
+				       unsigned *counter)
+{
+	struct tevent_req *req, *subreq;
+	struct prcount_state *state;
+
+	req = tevent_req_create(mem_ctx, &state, struct prcount_state);
+	if (req == NULL) {
+		return NULL;
+	}
+	state->ev = ev;
+	state->interval = interval;
+	state->counter = counter;
+
+	subreq = tevent_wakeup_send(
+		state, state->ev,
+		timeval_current_ofs(state->interval.tv_sec,
+				    state->interval.tv_usec));
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, prcount_waited, req);
+	return req;
+}
+
+static void prcount_waited(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct prcount_state *state = tevent_req_data(
+		req, struct prcount_state);
+	bool ok;
+
+	ok = tevent_wakeup_recv(subreq);
+	TALLOC_FREE(subreq);
+	if (!ok) {
+		tevent_req_error(req, ENOMEM);
+		return;
+	}
+
+	printf("%u\n", *state->counter);
+
+	subreq = tevent_wakeup_send(
+		state, state->ev,
+		timeval_current_ofs(state->interval.tv_sec,
+				    state->interval.tv_usec));
+	if (tevent_req_nomem(subreq, req)) {
+		return;
+	}
+	tevent_req_set_callback(subreq, prcount_waited, req);
+}
+
+static int prcount_recv(struct tevent_req *req)
+{
+	int err;
+
+	if (tevent_req_is_unix_error(req, &err)) {
+		return err;
+	}
+	return 0;
+}
+
+struct msgcount_state {
+	unsigned count;
+};
+
+static void msgcount_sunk(struct tevent_req *subreq);
+static void msgcount_printed(struct tevent_req *subreq);
+
+static struct tevent_req *msgcount_send(TALLOC_CTX *mem_ctx,
+					struct tevent_context *ev,
+					struct messaging_context *msg_ctx,
+					int msg_type, struct timeval interval)
+{
+	struct tevent_req *req, *subreq;
+	struct msgcount_state *state;
+
+	req = tevent_req_create(mem_ctx, &state, struct msgcount_state);
+	if (req == NULL) {
+		return NULL;
+	}
+
+	subreq = sink_send(state, ev, msg_ctx, msg_type, &state->count);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, msgcount_sunk, req);
+
+	subreq = prcount_send(state, ev, interval, &state->count);
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, msgcount_printed, req);
+
+	return req;
+}
+
+static void msgcount_sunk(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	int ret;
+
+	ret = sink_recv(subreq);
+	TALLOC_FREE(subreq);
+	if (tevent_req_error(req, ret)) {
+		return;
+	}
+	tevent_req_done(req);
+}
+
+static void msgcount_printed(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	int ret;
+
+	ret = prcount_recv(subreq);
+	TALLOC_FREE(subreq);
+	if (tevent_req_error(req, ret)) {
+		return;
+	}
+	tevent_req_done(req);
+}
+
+static int msgcount_recv(struct tevent_req *req)
+{
+	int err;
+
+	if (tevent_req_is_unix_error(req, &err)) {
+		return err;
+	}
+	return 0;
+}
+
+int main(void)
+{
+	TALLOC_CTX *frame = talloc_stackframe();
+	struct tevent_context *ev;
+	struct messaging_context *msg_ctx;
+	struct tevent_req *req;
+	int ret;
+	struct server_id id;
+	struct server_id_buf tmp;
+
+	lp_load_global(get_dyn_CONFIGFILE());
+
+	ev = tevent_context_init(frame);
+	if (ev == NULL) {
+		perror("tevent_context_init failed");
+		return -1;
+	}
+
+	msg_ctx = messaging_init(ev, ev);
+	if (msg_ctx == NULL) {
+		perror("messaging_init failed");
+		return -1;
+	}
+
+	id = messaging_server_id(msg_ctx);
+
+	printf("server_id: %s\n", server_id_str_buf(id, &tmp));
+
+	req = msgcount_send(ev, ev, msg_ctx, MSG_SMB_NOTIFY,
+			    timeval_set(1, 0));
+	if (req == NULL) {
+		perror("msgcount_send failed");
+		return -1;
+	}
+
+	if (!tevent_req_poll(req, ev)) {
+		perror("tevent_req_poll failed");
+		return -1;
+	}
+
+	ret = msgcount_recv(req);
+	printf("msgcount_recv returned %d\n", ret);
+
+	return 0;
+}
diff --git a/source3/torture/msg_source.c b/source3/torture/msg_source.c
new file mode 100644
index 0000000..ca797a0
--- /dev/null
+++ b/source3/torture/msg_source.c
@@ -0,0 +1,161 @@
+/*
+ *  Unix SMB/CIFS implementation.
+ *  Send messages at a regular interval
+ *  Copyright (C) Volker Lendecke 2014
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 3 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "replace.h"
+#include "includes.h"
+#include "messages.h"
+#include "lib/util/tevent_unix.h"
+#include <stdio.h>
+
+struct source_state {
+	struct tevent_context *ev;
+	struct messaging_context *msg_ctx;
+	int msg_type;
+	struct timeval interval;
+	struct server_id dst;
+};
+
+static void source_waited(struct tevent_req *subreq);
+
+static struct tevent_req *source_send(TALLOC_CTX *mem_ctx,
+				      struct tevent_context *ev,
+				      struct messaging_context *msg_ctx,
+				      int msg_type,
+				      struct timeval interval,
+				      struct server_id dst)
+{
+	struct tevent_req *req, *subreq;
+	struct source_state *state;
+
+	req = tevent_req_create(mem_ctx, &state, struct source_state);
+	if (req == NULL) {
+		return NULL;
+	}
+	state->ev = ev;
+	state->msg_ctx = msg_ctx;
+	state->msg_type = msg_type;
+	state->interval = interval;
+	state->dst = dst;
+
+	subreq = tevent_wakeup_send(
+		state, state->ev,
+		timeval_current_ofs(state->interval.tv_sec,
+				    state->interval.tv_usec));
+	if (tevent_req_nomem(subreq, req)) {
+		return tevent_req_post(req, ev);
+	}
+	tevent_req_set_callback(subreq, source_waited, req);
+	return req;
+}
+
+static void source_waited(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(
+		subreq, struct tevent_req);
+	struct source_state *state = tevent_req_data(
+		req, struct source_state);
+	bool ok;
+	uint8_t buf[200] = { };
+
+	ok = tevent_wakeup_recv(subreq);
+	TALLOC_FREE(subreq);
+	if (!ok) {
+		tevent_req_error(req, ENOMEM);
+		return;
+	}
+
+	messaging_send_buf(state->msg_ctx, state->dst, state->msg_type,
+			   buf, sizeof(buf));
+
+	subreq = tevent_wakeup_send(
+		state, state->ev,
+		timeval_current_ofs(state->interval.tv_sec,
+				    state->interval.tv_usec));
+	if (tevent_req_nomem(subreq, req)) {
+		return;
+	}
+	tevent_req_set_callback(subreq, source_waited, req);
+}
+
+static int source_recv(struct tevent_req *req)
+{
+	int err;
+
+	if (tevent_req_is_unix_error(req, &err)) {
+		return err;
+	}
+	return 0;
+}
+
+int main(int argc, const char *argv[])
+{
+	TALLOC_CTX *frame = talloc_stackframe();
+	struct tevent_context *ev;
+	struct messaging_context *msg_ctx;
+	struct tevent_req *req;
+	int ret;
+	struct server_id id;
+
+	if (argc != 2) {
+		fprintf(stderr, "Usage: %s <dst>\n", argv[0]);
+		return -1;
+	}
+
+	lp_load(get_dyn_CONFIGFILE(), true, false, false, true);
+
+	ev = tevent_context_init(frame);
+	if (ev == NULL) {
+		perror("tevent_context_init failed");
+		return -1;
+	}
+
+	msg_ctx = messaging_init(ev, ev);
+	if (msg_ctx == NULL) {
+		perror("messaging_init failed");
+		return -1;
+	}
+
+	id = server_id_from_string(get_my_vnn(), argv[1]);
+	if (!procid_valid(&id)) {
+		fprintf(stderr, "pid %s invalid\n", argv[1]);
+		return -1;
+	}
+
+	req = source_send(ev, ev, msg_ctx, MSG_SMB_NOTIFY,
+			  timeval_set(0, 10000), id);
+	if (req == NULL) {
+		perror("source_send failed");
+		return -1;
+	}
+
+	if (!tevent_req_poll(req, ev)) {
+		perror("tevent_req_poll failed");
+		return -1;
+	}
+
+	ret = source_recv(req);
+
+	printf("source_recv returned %d\n", ret);
+
+	return 0;
+}
+
+
+
+
diff --git a/source3/wscript_build b/source3/wscript_build
index 4365d61..740ab76 100755
--- a/source3/wscript_build
+++ b/source3/wscript_build
@@ -1292,6 +1292,20 @@ bld.SAMBA3_BINARY('msgtest',
                  param''',
                  install=False)
 
+bld.SAMBA3_BINARY('msg_sink',
+                 source='torture/msg_sink.c',
+                 deps='''
+                 talloc
+                 param''',
+                 install=False)
+
+bld.SAMBA3_BINARY('msg_source',
+                 source='torture/msg_source.c',
+                 deps='''
+                 talloc
+                 param''',
+                 install=False)
+
 bld.SAMBA3_BINARY('smbcacls',
                  source='utils/smbcacls.c',
                  deps='''
-- 
1.8.1.2


From c59b9b609c5f60c1303b620951f3f927bf973d8a Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Thu, 21 Aug 2014 19:55:06 +0000
Subject: [PATCH 2/4] pthreadpool: Slightly serialize jobs

Using the new msg_source program with 1500 instances against a single
msg_sink, I found the msg_source process spawning two worker threads for
synchronously sending the data towards the receiving socket. This should
not happen: Per destination node we only create one queue. We strictly
only add pthreadpool jobs one after the other, so a single helper thread
should be perfectly sufficient.

It turned out that under heavy overload the main sending thread was
scheduled before the thread that had just finished its send() job. So
the helper thread had not yet been able to increment the pool->num_idle
variable, which indicates that we don't have to create a new thread when
the new job is added.

This patch moves the signalling write under the mutex. This means that
indicating readiness via the pipe and incrementing the pool->num_idle
variable both happen under the same mutex lock and thus are atomic. No
superfluous threads anymore.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/pthreadpool/pthreadpool.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c
index 4436ab3..d683578 100644
--- a/source3/lib/pthreadpool/pthreadpool.c
+++ b/source3/lib/pthreadpool/pthreadpool.c
@@ -536,11 +536,11 @@ static void *pthreadpool_server(void *arg)
 			assert(res == 0);
 
 			job.fn(job.private_data);
-			written = write(sig_pipe, &job.id, sizeof(job.id));
 
 			res = pthread_mutex_lock(&pool->mutex);
 			assert(res == 0);
 
+			written = write(sig_pipe, &job.id, sizeof(job.id));
 			if (written != sizeof(int)) {
 				pthreadpool_server_exit(pool);
 				pthread_mutex_unlock(&pool->mutex);
-- 
1.8.1.2


From 30626f6e0bfbc6f6c73bf695aae8fc32b4e37718 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Thu, 21 Aug 2014 18:36:33 +0000
Subject: [PATCH 3/4] lib: Introduce server_id_same_process()

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 lib/util/samba_util.h |  2 ++
 lib/util/server_id.c  | 12 +++++++-----
 2 files changed, 9 insertions(+), 5 deletions(-)

diff --git a/lib/util/samba_util.h b/lib/util/samba_util.h
index 233b5fd..f1f4c2d 100644
--- a/lib/util/samba_util.h
+++ b/lib/util/samba_util.h
@@ -985,6 +985,8 @@ struct server_id;
 struct server_id_buf { char buf[48]; }; /* probably a bit too large ... */
 char *server_id_str_buf(struct server_id id, struct server_id_buf *dst);
 
+bool server_id_same_process(const struct server_id *p1,
+			    const struct server_id *p2);
 bool server_id_equal(const struct server_id *p1, const struct server_id *p2);
 char *server_id_str(TALLOC_CTX *mem_ctx, const struct server_id *id);
 struct server_id server_id_from_string(uint32_t local_vnn,
diff --git a/lib/util/server_id.c b/lib/util/server_id.c
index e0a05a7..7d3de2f 100644
--- a/lib/util/server_id.c
+++ b/lib/util/server_id.c
@@ -20,9 +20,15 @@
 #include "includes.h"
 #include "librpc/gen_ndr/server_id.h"
 
+bool server_id_same_process(const struct server_id *p1,
+			    const struct server_id *p2)
+{
+	return ((p1->pid == p2->pid) && (p1->vnn == p2->vnn));
+}
+
 bool server_id_equal(const struct server_id *p1, const struct server_id *p2)
 {
-	if (p1->pid != p2->pid) {
+	if (!server_id_same_process(p1, p2)) {
 		return false;
 	}
 
@@ -30,10 +36,6 @@ bool server_id_equal(const struct server_id *p1, const struct server_id *p2)
 		return false;
 	}
 
-	if (p1->vnn != p2->vnn) {
-		return false;
-	}
-
 	if (p1->unique_id != p2->unique_id) {
 		return false;
 	}
-- 
1.8.1.2


From 7b2afde469dac0fa8149ccdb3b63942ba4a8a6d3 Mon Sep 17 00:00:00 2001
From: Volker Lendecke <vl at samba.org>
Date: Thu, 21 Aug 2014 18:41:49 +0000
Subject: [PATCH 4/4] messaging3: Avoid messaging_is_self_send

This was a bad API, and it was used in a buggy way: In
messaging_dispatch_rec we always did the defer, because we referenced
the destination pid, not the source. In messaging_send_iov it is right
to reference the destination, but once we have arrived in
messaging_dispatch_rec we should compare source and destination.

Signed-off-by: Volker Lendecke <vl at samba.org>
---
 source3/lib/messages.c | 15 ++++++---------
 1 file changed, 6 insertions(+), 9 deletions(-)

diff --git a/source3/lib/messages.c b/source3/lib/messages.c
index 25d3f01..80ecec4 100644
--- a/source3/lib/messages.c
+++ b/source3/lib/messages.c
@@ -403,13 +403,6 @@ void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
 	}
 }
 
-static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
-				   const struct server_id *dst)
-{
-	return ((msg_ctx->id.vnn == dst->vnn) &&
-		(msg_ctx->id.pid == dst->pid));
-}
-
 /*
   Send a message to a particular server
 */
@@ -455,10 +448,14 @@ NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
 		return NT_STATUS_OK;
 	}
 
-	if (messaging_is_self_send(msg_ctx, &server)) {
+	if (server_id_same_process(&msg_ctx->id, &server)) {
 		struct messaging_rec rec;
 		uint8_t *buf;
 
+		/*
+		 * Self-send, directly dispatch
+		 */
+
 		buf = iov_buf(talloc_tos(), iov, iovlen);
 		if (buf == NULL) {
 			return NT_STATUS_NO_MEMORY;
@@ -827,7 +824,7 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
 			continue;
 		}
 
-		if (messaging_is_self_send(msg_ctx, &rec->dest)) {
+		if (server_id_same_process(&rec->src, &rec->dest)) {
 			/*
 			 * This is a self-send. We are called here from
 			 * messaging_send(), and we don't want to directly
-- 
1.8.1.2



More information about the samba-technical mailing list