[PATCH] messaging performance fixes
Jeremy Allison
jra at samba.org
Thu Aug 21 14:37:43 MDT 2014
On Thu, Aug 21, 2014 at 10:35:48PM +0200, Volker Lendecke wrote:
> Hi!
>
> Review/Push would be appreciated!
Wow! Amazing work. I'm guessing we need a bug report to
get the pthreadpool fix backported?
I'll review asap...
>
> --
> SerNet GmbH, Bahnhofsallee 1b, 37081 Göttingen
> phone: +49-551-370000-0, fax: +49-551-370000-9
> AG Göttingen, HRB 2816, GF: Dr. Johannes Loxen
> http://www.sernet.de, mailto:kontakt at sernet.de
> From 90e3c0e269fa33c87bf05f8b7a16628324d462c2 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Thu, 21 Aug 2014 14:32:07 +0000
> Subject: [PATCH 1/4] messaging3: Add msg_sink/source -- perftest
>
> With this pair of programs I did some performance tests of the messaging
> system. Guess what -- I found two bugs :-)
>
> See the subsequent patches.
>
> With 1500 msg_source processes I can generate message overload: A
>
> Intel(R) Xeon(R) CPU L5640 @ 2.27GHz
>
> can receive roughly 100k messages per second. When using
> messaging_read_send/recv, user and system time are roughly even, with
> slightly more work done in user space. When using messaging_register,
> user space consumes a lot less CPU because there is less malloc activity.
>
> By the way: 1,500 helper threads in a blocking sendto() against a single
> datagram socket reading as fast as it can (with epoll_wait in between)
> only drove the load average to 12 on a 24-core machine. So I guess Unix
> domain datagram sockets are pretty well protected against overload: no
> thundering herd or the like. Interestingly, "top" showed msg_sink at less
> than 90% CPU, although it was clearly the bottleneck. But that must be a
> "top" artifact.
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> ---
> source3/torture/msg_sink.c | 284 +++++++++++++++++++++++++++++++++++++++++++
> source3/torture/msg_source.c | 161 ++++++++++++++++++++++++
> source3/wscript_build | 14 +++
> 3 files changed, 459 insertions(+)
> create mode 100644 source3/torture/msg_sink.c
> create mode 100644 source3/torture/msg_source.c
>
> diff --git a/source3/torture/msg_sink.c b/source3/torture/msg_sink.c
> new file mode 100644
> index 0000000..158fe3c
> --- /dev/null
> +++ b/source3/torture/msg_sink.c
> @@ -0,0 +1,284 @@
> +/*
> + * Unix SMB/CIFS implementation.
> + * Receive and count messages
> + * Copyright (C) Volker Lendecke 2014
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 3 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> + */
> +
> +#include "replace.h"
> +#include "includes.h"
> +#include "messages.h"
> +#include "lib/util/tevent_unix.h"
> +#include <stdio.h>
> +
> +/*
> + * State of a sink_send() request: it keeps re-arming
> + * messaging_read_send() for msg_type and bumps *counter once per
> + * received message.
> + */
> +struct sink_state {
> + struct tevent_context *ev;
> + struct messaging_context *msg_ctx;
> + int msg_type;
> + unsigned *counter; /* owned by the caller, incremented per message */
> +};
> +
> +static void sink_done(struct tevent_req *subreq);
> +
> +/*
> + * Count incoming messages of type msg_type on msg_ctx.
> + *
> + * Every received message increments *counter. The request never
> + * finishes on its own; it only ends when reading a message fails.
> + */
> +static struct tevent_req *sink_send(TALLOC_CTX *mem_ctx,
> +				    struct tevent_context *ev,
> +				    struct messaging_context *msg_ctx,
> +				    int msg_type, unsigned *counter)
> +{
> +	struct sink_state *state = NULL;
> +	struct tevent_req *req = NULL;
> +	struct tevent_req *subreq = NULL;
> +
> +	req = tevent_req_create(mem_ctx, &state, struct sink_state);
> +	if (req == NULL) {
> +		return NULL;
> +	}
> +	state->counter = counter;
> +	state->msg_type = msg_type;
> +	state->msg_ctx = msg_ctx;
> +	state->ev = ev;
> +
> +	/* Arm the first read; sink_done() re-arms after each message. */
> +	subreq = messaging_read_send(state, ev, msg_ctx, msg_type);
> +	if (tevent_req_nomem(subreq, req)) {
> +		return tevent_req_post(req, ev);
> +	}
> +	tevent_req_set_callback(subreq, sink_done, req);
> +
> +	return req;
> +}
> +
> +/*
> + * A read on the sink completed: count it and wait for the next one.
> + */
> +static void sink_done(struct tevent_req *subreq)
> +{
> +	struct tevent_req *req =
> +		tevent_req_callback_data(subreq, struct tevent_req);
> +	struct sink_state *state =
> +		tevent_req_data(req, struct sink_state);
> +	int err;
> +
> +	/*
> +	 * Only the arrival is counted; NULL is passed for both output
> +	 * arguments, so nothing is fetched from the completed read.
> +	 */
> +	err = messaging_read_recv(subreq, NULL, NULL);
> +	TALLOC_FREE(subreq);
> +	if (tevent_req_error(req, err)) {
> +		return;
> +	}
> +
> +	(*state->counter)++;
> +
> +	/* Re-arm for the next message of the same type. */
> +	subreq = messaging_read_send(state, state->ev, state->msg_ctx,
> +				     state->msg_type);
> +	if (tevent_req_nomem(subreq, req)) {
> +		return;
> +	}
> +	tevent_req_set_callback(subreq, sink_done, req);
> +}
> +
> +/*
> + * Result of sink_send(): 0, or the unix errno the request failed with.
> + */
> +static int sink_recv(struct tevent_req *req)
> +{
> +	int err = 0;
> +
> +	if (!tevent_req_is_unix_error(req, &err)) {
> +		return 0;
> +	}
> +	return err;
> +}
> +
> +/*
> + * State of a prcount_send() request: prints *counter to stdout every
> + * "interval" (a relative offset re-applied after each wakeup).
> + */
> +struct prcount_state {
> + struct tevent_context *ev;
> + struct timeval interval;
> + unsigned *counter;
> +};
> +
> +static void prcount_waited(struct tevent_req *subreq);
> +
> +/*
> + * Print *counter on stdout once every "interval".
> + *
> + * The request never completes successfully by itself; it only ends
> + * when a wakeup cannot be scheduled or fails.
> + */
> +static struct tevent_req *prcount_send(TALLOC_CTX *mem_ctx,
> +				       struct tevent_context *ev,
> +				       struct timeval interval,
> +				       unsigned *counter)
> +{
> +	struct prcount_state *state = NULL;
> +	struct tevent_req *req = NULL;
> +	struct tevent_req *subreq = NULL;
> +	struct timeval first_wakeup;
> +
> +	req = tevent_req_create(mem_ctx, &state, struct prcount_state);
> +	if (req == NULL) {
> +		return NULL;
> +	}
> +	state->counter = counter;
> +	state->interval = interval;
> +	state->ev = ev;
> +
> +	first_wakeup = timeval_current_ofs(interval.tv_sec,
> +					   interval.tv_usec);
> +	subreq = tevent_wakeup_send(state, ev, first_wakeup);
> +	if (tevent_req_nomem(subreq, req)) {
> +		return tevent_req_post(req, ev);
> +	}
> +	tevent_req_set_callback(subreq, prcount_waited, req);
> +
> +	return req;
> +}
> +
> +/*
> + * The interval timer fired: print the current counter value and
> + * schedule the next wakeup one interval from now.
> + */
> +static void prcount_waited(struct tevent_req *subreq)
> +{
> +	struct tevent_req *req = tevent_req_callback_data(
> +		subreq, struct tevent_req);
> +	struct prcount_state *state = tevent_req_data(
> +		req, struct prcount_state);
> +	bool ok;
> +
> +	ok = tevent_wakeup_recv(subreq);
> +	TALLOC_FREE(subreq);
> +	if (!ok) {
> +		/* tevent_wakeup_recv() only returns a bool; map to ENOMEM */
> +		tevent_req_error(req, ENOMEM);
> +		return;
> +	}
> +
> +	/* Cumulative total: the counter is never reset here. */
> +	printf("%u\n", *state->counter);
> +	/*
> +	 * stdout is fully buffered when it is a pipe or a file. Without
> +	 * this flush, the once-per-interval samples would only show up
> +	 * when the stdio buffer fills, defeating the periodic output.
> +	 */
> +	fflush(stdout);
> +
> +	subreq = tevent_wakeup_send(
> +		state, state->ev,
> +		timeval_current_ofs(state->interval.tv_sec,
> +				    state->interval.tv_usec));
> +	if (tevent_req_nomem(subreq, req)) {
> +		return;
> +	}
> +	tevent_req_set_callback(subreq, prcount_waited, req);
> +}
> +
> +/*
> + * Result of prcount_send(): 0, or the unix errno it failed with.
> + */
> +static int prcount_recv(struct tevent_req *req)
> +{
> +	int err;
> +
> +	return tevent_req_is_unix_error(req, &err) ? err : 0;
> +}
> +
> +/*
> + * State of a msgcount_send() request. "count" is shared between the
> + * two sub-requests: sink_send() increments it, prcount_send() prints it.
> + */
> +struct msgcount_state {
> + unsigned count;
> +};
> +
> +static void msgcount_sunk(struct tevent_req *subreq);
> +static void msgcount_printed(struct tevent_req *subreq);
> +
> +/*
> + * Count messages of type msg_type and print the running total every
> + * "interval".
> + *
> + * Finishes as soon as either sub-request finishes; neither of them
> + * completes except on error.
> + */
> +static struct tevent_req *msgcount_send(TALLOC_CTX *mem_ctx,
> +					struct tevent_context *ev,
> +					struct messaging_context *msg_ctx,
> +					int msg_type, struct timeval interval)
> +{
> +	struct msgcount_state *state = NULL;
> +	struct tevent_req *req = NULL;
> +	struct tevent_req *sink_req = NULL;
> +	struct tevent_req *print_req = NULL;
> +
> +	req = tevent_req_create(mem_ctx, &state, struct msgcount_state);
> +	if (req == NULL) {
> +		return NULL;
> +	}
> +
> +	/* Writer of state->count. */
> +	sink_req = sink_send(state, ev, msg_ctx, msg_type, &state->count);
> +	if (tevent_req_nomem(sink_req, req)) {
> +		return tevent_req_post(req, ev);
> +	}
> +	tevent_req_set_callback(sink_req, msgcount_sunk, req);
> +
> +	/* Reader of state->count. */
> +	print_req = prcount_send(state, ev, interval, &state->count);
> +	if (tevent_req_nomem(print_req, req)) {
> +		return tevent_req_post(req, ev);
> +	}
> +	tevent_req_set_callback(print_req, msgcount_printed, req);
> +
> +	return req;
> +}
> +
> +/*
> + * The sink sub-request ended: propagate its error, or finish.
> + */
> +static void msgcount_sunk(struct tevent_req *subreq)
> +{
> +	struct tevent_req *req = tevent_req_callback_data(
> +		subreq, struct tevent_req);
> +	int err = sink_recv(subreq);
> +
> +	TALLOC_FREE(subreq);
> +	if (!tevent_req_error(req, err)) {
> +		tevent_req_done(req);
> +	}
> +}
> +
> +/*
> + * The printing sub-request ended: propagate its error, or finish.
> + */
> +static void msgcount_printed(struct tevent_req *subreq)
> +{
> +	struct tevent_req *req = tevent_req_callback_data(
> +		subreq, struct tevent_req);
> +	int err = prcount_recv(subreq);
> +
> +	TALLOC_FREE(subreq);
> +	if (!tevent_req_error(req, err)) {
> +		tevent_req_done(req);
> +	}
> +}
> +
> +/*
> + * Result of msgcount_send(): 0, or the unix errno it failed with.
> + */
> +static int msgcount_recv(struct tevent_req *req)
> +{
> +	int err = 0;
> +
> +	if (!tevent_req_is_unix_error(req, &err)) {
> +		return 0;
> +	}
> +	return err;
> +}
> +
> +/*
> + * msg_sink: print our server_id, then count MSG_SMB_NOTIFY messages
> + * and print the running total once per second until an error occurs.
> + *
> + * Returns 0 on a clean exit, -1 if setup, the event loop or the
> + * counting request failed.
> + */
> +int main(void)
> +{
> +	TALLOC_CTX *frame = talloc_stackframe();
> +	struct tevent_context *ev;
> +	struct messaging_context *msg_ctx;
> +	struct tevent_req *req;
> +	int ret;
> +	int result = -1;
> +	struct server_id id;
> +	struct server_id_buf tmp;
> +
> +	lp_load_global(get_dyn_CONFIGFILE());
> +
> +	ev = tevent_context_init(frame);
> +	if (ev == NULL) {
> +		perror("tevent_context_init failed");
> +		goto done;
> +	}
> +
> +	msg_ctx = messaging_init(ev, ev);
> +	if (msg_ctx == NULL) {
> +		perror("messaging_init failed");
> +		goto done;
> +	}
> +
> +	id = messaging_server_id(msg_ctx);
> +
> +	printf("server_id: %s\n", server_id_str_buf(id, &tmp));
> +	/*
> +	 * msg_source takes this id on its command line; flush so it is
> +	 * visible right away even when stdout is a pipe or a file.
> +	 */
> +	fflush(stdout);
> +
> +	req = msgcount_send(ev, ev, msg_ctx, MSG_SMB_NOTIFY,
> +			    timeval_set(1, 0));
> +	if (req == NULL) {
> +		perror("msgcount_send failed");
> +		goto done;
> +	}
> +
> +	if (!tevent_req_poll(req, ev)) {
> +		perror("tevent_req_poll failed");
> +		goto done;
> +	}
> +
> +	ret = msgcount_recv(req);
> +	printf("msgcount_recv returned %d\n", ret);
> +
> +	/* Report failure if the counting request itself failed. */
> +	result = (ret == 0) ? 0 : -1;
> +done:
> +	/* frame owns ev, and through it msg_ctx and req */
> +	TALLOC_FREE(frame);
> +	return result;
> +}
> diff --git a/source3/torture/msg_source.c b/source3/torture/msg_source.c
> new file mode 100644
> index 0000000..ca797a0
> --- /dev/null
> +++ b/source3/torture/msg_source.c
> @@ -0,0 +1,161 @@
> +/*
> + * Unix SMB/CIFS implementation.
> + * Send messages every 10 milliseconds
> + * Copyright (C) Volker Lendecke 2014
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 3 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program. If not, see <http://www.gnu.org/licenses/>.
> + */
> +
> +#include "replace.h"
> +#include "includes.h"
> +#include "messages.h"
> +#include "lib/util/tevent_unix.h"
> +#include <stdio.h>
> +
> +/*
> + * State of a source_send() request: sends a message of msg_type to
> + * dst each time the "interval" timer fires.
> + */
> +struct source_state {
> + struct tevent_context *ev;
> + struct messaging_context *msg_ctx;
> + int msg_type;
> + struct timeval interval;
> + struct server_id dst;
> +};
> +
> +static void source_waited(struct tevent_req *subreq);
> +
> +/*
> + * Send a message of type msg_type to dst once per "interval", forever.
> + * The request only ends on error.
> + */
> +static struct tevent_req *source_send(TALLOC_CTX *mem_ctx,
> +				      struct tevent_context *ev,
> +				      struct messaging_context *msg_ctx,
> +				      int msg_type,
> +				      struct timeval interval,
> +				      struct server_id dst)
> +{
> +	struct source_state *state = NULL;
> +	struct tevent_req *req = NULL;
> +	struct tevent_req *subreq = NULL;
> +	struct timeval first_wakeup;
> +
> +	req = tevent_req_create(mem_ctx, &state, struct source_state);
> +	if (req == NULL) {
> +		return NULL;
> +	}
> +	state->dst = dst;
> +	state->interval = interval;
> +	state->msg_type = msg_type;
> +	state->msg_ctx = msg_ctx;
> +	state->ev = ev;
> +
> +	/* The first message goes out one interval from now. */
> +	first_wakeup = timeval_current_ofs(interval.tv_sec,
> +					   interval.tv_usec);
> +	subreq = tevent_wakeup_send(state, ev, first_wakeup);
> +	if (tevent_req_nomem(subreq, req)) {
> +		return tevent_req_post(req, ev);
> +	}
> +	tevent_req_set_callback(subreq, source_waited, req);
> +
> +	return req;
> +}
> +
> +/*
> + * Timer fired: send one 200-byte all-zero message of state->msg_type
> + * to state->dst, then schedule the next send one interval from now.
> + */
> +static void source_waited(struct tevent_req *subreq)
> +{
> +	struct tevent_req *req = tevent_req_callback_data(
> +		subreq, struct tevent_req);
> +	struct source_state *state = tevent_req_data(
> +		req, struct source_state);
> +	bool ok;
> +	/*
> +	 * "{ 0 }" instead of "{ }": an empty initializer list is a GNU
> +	 * extension before C23, while "{ 0 }" zero-fills the whole array
> +	 * in every ISO C version.
> +	 */
> +	uint8_t buf[200] = { 0 };
> +
> +	ok = tevent_wakeup_recv(subreq);
> +	TALLOC_FREE(subreq);
> +	if (!ok) {
> +		tevent_req_error(req, ENOMEM);
> +		return;
> +	}
> +
> +	/*
> +	 * Best effort: this program generates load, so a failed send
> +	 * does not stop the stream. The status is deliberately ignored.
> +	 */
> +	(void)messaging_send_buf(state->msg_ctx, state->dst,
> +				 state->msg_type, buf, sizeof(buf));
> +
> +	subreq = tevent_wakeup_send(
> +		state, state->ev,
> +		timeval_current_ofs(state->interval.tv_sec,
> +				    state->interval.tv_usec));
> +	if (tevent_req_nomem(subreq, req)) {
> +		return;
> +	}
> +	tevent_req_set_callback(subreq, source_waited, req);
> +}
> +
> +/*
> + * Result of source_send(): 0, or the unix errno it failed with.
> + */
> +static int source_recv(struct tevent_req *req)
> +{
> +	int err;
> +
> +	return tevent_req_is_unix_error(req, &err) ? err : 0;
> +}
> +
> +/*
> + * msg_source <dst>: send MSG_SMB_NOTIFY messages to the server_id given
> + * as <dst> (as printed by msg_sink) every 10 ms until an error occurs.
> + *
> + * Returns 0 on a clean exit, -1 on usage errors or failures.
> + */
> +int main(int argc, const char *argv[])
> +{
> +	TALLOC_CTX *frame = talloc_stackframe();
> +	struct tevent_context *ev;
> +	struct messaging_context *msg_ctx;
> +	struct tevent_req *req;
> +	int ret;
> +	int result = -1;
> +	struct server_id id;
> +
> +	if (argc != 2) {
> +		fprintf(stderr, "Usage: %s <dst>\n", argv[0]);
> +		goto done;
> +	}
> +
> +	/* Load the configuration the same way msg_sink does. */
> +	lp_load_global(get_dyn_CONFIGFILE());
> +
> +	ev = tevent_context_init(frame);
> +	if (ev == NULL) {
> +		perror("tevent_context_init failed");
> +		goto done;
> +	}
> +
> +	msg_ctx = messaging_init(ev, ev);
> +	if (msg_ctx == NULL) {
> +		perror("messaging_init failed");
> +		goto done;
> +	}
> +
> +	id = server_id_from_string(get_my_vnn(), argv[1]);
> +	if (!procid_valid(&id)) {
> +		fprintf(stderr, "pid %s invalid\n", argv[1]);
> +		goto done;
> +	}
> +
> +	/* 0 s + 10000 us = one message every 10 ms */
> +	req = source_send(ev, ev, msg_ctx, MSG_SMB_NOTIFY,
> +			  timeval_set(0, 10000), id);
> +	if (req == NULL) {
> +		perror("source_send failed");
> +		goto done;
> +	}
> +
> +	if (!tevent_req_poll(req, ev)) {
> +		perror("tevent_req_poll failed");
> +		goto done;
> +	}
> +
> +	ret = source_recv(req);
> +
> +	printf("source_recv returned %d\n", ret);
> +
> +	/* Report failure if the sending request itself failed. */
> +	result = (ret == 0) ? 0 : -1;
> +done:
> +	/* frame owns ev, and through it msg_ctx and req */
> +	TALLOC_FREE(frame);
> +	return result;
> +}
> +
> +
> +
> +
> diff --git a/source3/wscript_build b/source3/wscript_build
> index 4365d61..740ab76 100755
> --- a/source3/wscript_build
> +++ b/source3/wscript_build
> @@ -1292,6 +1292,20 @@ bld.SAMBA3_BINARY('msgtest',
> param''',
> install=False)
>
> +# msg_sink / msg_source: messaging performance test pair (not installed).
> +# NOTE(review): deps list only talloc and param although the sources also
> +# use tevent and the messaging code - confirm those come in transitively.
> +bld.SAMBA3_BINARY('msg_sink',
> + source='torture/msg_sink.c',
> + deps='''
> + talloc
> + param''',
> + install=False)
> +
> +# Sender side; takes the server_id printed by msg_sink as its argument.
> +bld.SAMBA3_BINARY('msg_source',
> + source='torture/msg_source.c',
> + deps='''
> + talloc
> + param''',
> + install=False)
> +
> bld.SAMBA3_BINARY('smbcacls',
> source='utils/smbcacls.c',
> deps='''
> --
> 1.8.1.2
>
>
> From c59b9b609c5f60c1303b620951f3f927bf973d8a Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Thu, 21 Aug 2014 19:55:06 +0000
> Subject: [PATCH 2/4] pthreadpool: Slightly serialize jobs
>
> Using the new msg_source program with 1,500 instances against a single
> msg_sink, I found that a msg_source process spawned two worker threads
> for synchronously sending the data towards the receiving socket. This
> should not happen: per destination node we only create one queue. We
> strictly add pthreadpool jobs one after the other, so a single helper
> thread should be perfectly sufficient.
>
> It turned out that under heavy overload the main sending thread was
> scheduled before the thread that had just finished its send() job, so
> the helper thread had not yet incremented pool->num_idle, the variable
> indicating that we don't have to create a new thread when the next job
> is added.
>
> This patch moves the signalling write under the mutex. This means that
> signalling readiness via the pipe and updating pool->num_idle both
> happen under the same mutex lock and are thus atomic. No superfluous
> threads anymore.
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> ---
> source3/lib/pthreadpool/pthreadpool.c | 2 +-
> 1 file changed, 1 insertion(+), 1 deletion(-)
>
> diff --git a/source3/lib/pthreadpool/pthreadpool.c b/source3/lib/pthreadpool/pthreadpool.c
> index 4436ab3..d683578 100644
> --- a/source3/lib/pthreadpool/pthreadpool.c
> +++ b/source3/lib/pthreadpool/pthreadpool.c
> @@ -536,11 +536,11 @@ static void *pthreadpool_server(void *arg)
> assert(res == 0);
>
> job.fn(job.private_data);
> - written = write(sig_pipe, &job.id, sizeof(job.id));
>
> res = pthread_mutex_lock(&pool->mutex);
> assert(res == 0);
>
> + written = write(sig_pipe, &job.id, sizeof(job.id));
> if (written != sizeof(int)) {
> pthreadpool_server_exit(pool);
> pthread_mutex_unlock(&pool->mutex);
> --
> 1.8.1.2
>
>
> From 30626f6e0bfbc6f6c73bf695aae8fc32b4e37718 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Thu, 21 Aug 2014 18:36:33 +0000
> Subject: [PATCH 3/4] lib: Introduce server_id_same_process()
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> ---
> lib/util/samba_util.h | 2 ++
> lib/util/server_id.c | 12 +++++++-----
> 2 files changed, 9 insertions(+), 5 deletions(-)
>
> diff --git a/lib/util/samba_util.h b/lib/util/samba_util.h
> index 233b5fd..f1f4c2d 100644
> --- a/lib/util/samba_util.h
> +++ b/lib/util/samba_util.h
> @@ -985,6 +985,8 @@ struct server_id;
> struct server_id_buf { char buf[48]; }; /* probably a bit too large ... */
> char *server_id_str_buf(struct server_id id, struct server_id_buf *dst);
>
> +/*
> + * Return true if p1 and p2 refer to the same process: only pid and vnn
> + * are compared, other fields such as unique_id are ignored.
> + */
> +bool server_id_same_process(const struct server_id *p1,
> + const struct server_id *p2);
> bool server_id_equal(const struct server_id *p1, const struct server_id *p2);
> char *server_id_str(TALLOC_CTX *mem_ctx, const struct server_id *id);
> struct server_id server_id_from_string(uint32_t local_vnn,
> diff --git a/lib/util/server_id.c b/lib/util/server_id.c
> index e0a05a7..7d3de2f 100644
> --- a/lib/util/server_id.c
> +++ b/lib/util/server_id.c
> @@ -20,9 +20,15 @@
> #include "includes.h"
> #include "librpc/gen_ndr/server_id.h"
>
> +/*
> + * Two server_ids belong to the same process when both the node (vnn)
> + * and the pid match; the remaining fields are not looked at.
> + */
> +bool server_id_same_process(const struct server_id *p1,
> +			    const struct server_id *p2)
> +{
> +	if (p1->vnn != p2->vnn) {
> +		return false;
> +	}
> +	return (p1->pid == p2->pid);
> +}
> +
> bool server_id_equal(const struct server_id *p1, const struct server_id *p2)
> {
> - if (p1->pid != p2->pid) {
> + if (!server_id_same_process(p1, p2)) {
> return false;
> }
>
> @@ -30,10 +36,6 @@ bool server_id_equal(const struct server_id *p1, const struct server_id *p2)
> return false;
> }
>
> - if (p1->vnn != p2->vnn) {
> - return false;
> - }
> -
> if (p1->unique_id != p2->unique_id) {
> return false;
> }
> --
> 1.8.1.2
>
>
> From 7b2afde469dac0fa8149ccdb3b63942ba4a8a6d3 Mon Sep 17 00:00:00 2001
> From: Volker Lendecke <vl at samba.org>
> Date: Thu, 21 Aug 2014 18:41:49 +0000
> Subject: [PATCH 4/4] messaging3: Avoid messaging_is_self_send
>
> This was a bad API, and it was used in a buggy way: in
> messaging_dispatch_rec we always deferred, because we compared against
> the destination pid, not the source. In messaging_send_iov it is right
> to compare against the destination, but once we have arrived in
> messaging_dispatch_rec we should compare source and destination.
>
> Signed-off-by: Volker Lendecke <vl at samba.org>
> ---
> source3/lib/messages.c | 15 ++++++---------
> 1 file changed, 6 insertions(+), 9 deletions(-)
>
> diff --git a/source3/lib/messages.c b/source3/lib/messages.c
> index 25d3f01..80ecec4 100644
> --- a/source3/lib/messages.c
> +++ b/source3/lib/messages.c
> @@ -403,13 +403,6 @@ void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
> }
> }
>
> -static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
> - const struct server_id *dst)
> -{
> - return ((msg_ctx->id.vnn == dst->vnn) &&
> - (msg_ctx->id.pid == dst->pid));
> -}
> -
> /*
> Send a message to a particular server
> */
> @@ -455,10 +448,14 @@ NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
> return NT_STATUS_OK;
> }
>
> - if (messaging_is_self_send(msg_ctx, &server)) {
> + if (server_id_same_process(&msg_ctx->id, &server)) {
> struct messaging_rec rec;
> uint8_t *buf;
>
> + /*
> + * Self-send, directly dispatch
> + */
> +
> buf = iov_buf(talloc_tos(), iov, iovlen);
> if (buf == NULL) {
> return NT_STATUS_NO_MEMORY;
> @@ -827,7 +824,7 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
> continue;
> }
>
> - if (messaging_is_self_send(msg_ctx, &rec->dest)) {
> + if (server_id_same_process(&rec->src, &rec->dest)) {
> /*
> * This is a self-send. We are called here from
> * messaging_send(), and we don't want to directly
> --
> 1.8.1.2
>
More information about the samba-technical
mailing list