impersonation part 5 (pthreadpool_tevent_wrapper)

Stefan Metzmacher metze at samba.org
Mon Jul 23 22:11:05 UTC 2018


Hi,

here's an updated patchset, that will likely pass all tests.
https://gitlab.com/samba-team/devel/samba/pipelines/26347832
A very similar one already passed:
https://gitlab.com/samba-team/devel/samba/pipelines/26325711

Please review and push:-)

Thanks!
metze

Am 19.07.2018 um 12:06 schrieb Stefan Metzmacher via samba-technical:
> Hi,
> 
> here's patchset that implements a wrapper infrastructure for
> pthreadpool_tevent.
> 
> It means we can later have hooks, which call
> set_thread_credentials() before invoking the job
> function. Together with using 'unshare(CLONE_FS)'
> in the worker thread, we get per thread creadentials
> and a per thread current working directory.
> 
> This is required implement async path based syscalls.
> 
> Most of time we'll try to use the '*at()' version of syscalls,
> e.g. openat(), but if such a function is not available, e.g. getxattr(),
> we can simulate it by calling fchdir() before.
> 
> In order to get some sanity into the pthreadpool* code,
> regarding what happens:
> - if we talloc_free() a pending job
> - if we talloc_free() the pool
> before the job started within a worker thread
> and when it's already started.
> I came to the conclusion that we really need good tests
> to demonstrate the behavior and make sure these tests
> are valgrind/helgrind/drd clean.
> 
> In order to archive that we need to ensure some synchronization
> between the main program and the worker threads.
> 
> Typically you would use pthread_mutex_[un]lock() for that
> and we already use them in the low level pthreadpool.c code.
> 
> But we also need something in pthreadpool_tevent.c, which is upper
> layer, which is used by samba.
> 
> The tricky part is that each time you use mutexes, you need to
> implement pthread_atfork() hooks, which lock *all* mutexes
> before we can allow fork. Getting this correct and deadlock
> free with the deep interaction between pthreadpool.c and
> pthreadpool_tevent.c seemed way to complex.
> 
> The assumption we can make regarding the use case, with unexpected
> talloc_free(), is that there's always some kind of race and we don't
> need the strong guarantees of mutexes.
> 
> I did some research and found memory barriers, which can guarantee
> that atomic changes happen in a defined order. Typically
> one thread changes a value and other threads just read it.
> 
> The barrier with the strongest guarantees is a full barrier.
> 
> C11 has it as atomic_thread_fence(memory_order_seq_cst) in stdatomic.h.
> This results in a 'mfence' assembler instruction.
> 
> The same can be done with compiler builtins.
> __atomic_signal_fence(__ATOMIC_SEQ_CST) in newer gcc versions
> or __sync_synchronize() in older versions (was added in 2005)
> 
> This should be the same as:
> __asm__ __volatile__ ("mfence" : : : "memory");
> 
> Much more details can be found here:
> https://gcc.gnu.org/onlinedocs/gcc-7.3.0/gcc/_005f_005fatomic-Builtins.html#g_t_005f_005fatomic-Builtins
> https://gcc.gnu.org/onlinedocs/gcc-7.3.0/gcc/_005f_005fsync-Builtins.html#g_t_005f_005fsync-Builtins
> 
> In order to simplify things I decided to always use a full barrier,
> we can always optimize it later to use read or write barriers.
> 
> You can imagine the full barrier as protection against
> instruction boundary that prevents instructions to be reordered
> before or after the boundary. As well as a 'sync' of all
> cpu caches, similar to an write oplock break where the client (cpu)
> needs to flush local changes (in the cpu caches) to the server (the RAM).
> 
> This means that after the barrier all threads are guaranteed to see
> the same memory.
> 
> We'll use a model with a bunch of bool variables, where each variable
> is only be changed by either the main thread or the worker thread.
> 
> The (simplified) design is'
> 
> bool orphaned; /* written by main thread */
> bool started;  /* written by job thread */
> bool finished; /* written by job thread */
> 
> job thread:
>      job->started = true;
>      atomic_thread_fence(memory_order_seq_cst);
>      if (job->orphaned) {
>         job->finished = true;
>         atomic_thread_fence(memory_order_seq_cst)
>         exit();
>      }
>      job->job_fn();
>      job->finished = true;
>      atomic_thread_fence(memory_order_seq_cst)
>      exit()
> 
> main thread destructor:
> 
>      job->orphaned = true;
>      atomic_thread_fence(memory_order_seq_cst);
>      if (job->finished) {
>          talloc_free(job);
>      }
> 
>      DLIST_ADD_END(orphaned_jobs, job);
> 
>      return;
> 
> main thread cleanup orphaned job list:
> 
>      for (job ....) {
>           next_job = job->next;
>           atomic_thread_fence(memory_order_seq_cst)
>           if (job->finished) {
>                DLIST_REMOVE(orphaned_jobs, job)
>                talloc_free(job);
>           }
>      }
> 
> Once I had added that model to the exiting pthreadpool_tevent code
> and had cmocka based tests for it, it was much easier to
> add the pthreadpool_tevent_wrapper infrastructure.
> 
> The tricky part is what happens when talloc_free() is called
> on just the wrapper pool, but the raw pool remains.
> 
> If the before_job() hook was already called and the wrapper
> disappears while the job is running, we can't call the
> after_job() hook in order to reset the job environment,
> the only thing we can do is to exit that specific worker thread.
> Otherwise a wrapper might continue running with special privileges
> and a job on a different wrapper pool might still run with these
> privileges.
> 
> We notify the main thread via a pipe() that it needs to call
> pthreadpool_restart_check() in order to start new threads for
> pending jobs.
> 
> All of these commands pass fine:-)
> valgrind --error-exitcode=137 bin/pthreadpooltest_cmocka
> valgrind --tool=helgrind --error-exitcode=137 bin/pthreadpooltest_cmocka
> valgrind --tool=drd --error-exitcode=137 bin/pthreadpooltest_cmocka
> 
> Enjoy the review and push:-)
> 
> Thanks!
> metze
> 
> Am 12.07.2018 um 08:13 schrieb Ralph Böhme via samba-technical:
>> On Thu, Jul 12, 2018 at 12:47:16AM +0200, Stefan Metzmacher via
>> samba-technical wrote:
>>> here's another patchset that implements some cleanups in the
>>> pthreadpool code, which make it easier to implement the required
>>> pthreadpool wrapper for per thread impersonation.
>>> I'll do some private autobuilds with this tomorrow, but a very similar
>>> state already passed a few times.
>>> Please review:-) Note some is already reviewed by Ralph, I need to
>>> port the review tags from his branch to mine. 
>>
>> looking through the patchset, all patches were reviewed-by-me.
>>
>> -slow
>>
> 

-------------- next part --------------
From 929934b103ad5ed3b7efc06c8cbb470382eaccb5 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Sat, 14 Jul 2018 10:55:02 +0200
Subject: [PATCH 01/23] tevent: use talloc_zero_size() for the private state in
 tevent_context_wrapper_create()

This is watch tevent_req_create() uses and what callers of
tevent_context_wrapper_create() would therefore also expect.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/tevent/tevent_wrapper.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/tevent/tevent_wrapper.c b/lib/tevent/tevent_wrapper.c
index a07696af0a4d..ce07af983588 100644
--- a/lib/tevent/tevent_wrapper.c
+++ b/lib/tevent/tevent_wrapper.c
@@ -371,7 +371,7 @@ struct tevent_context *_tevent_context_wrapper_create(struct tevent_context *mai
 	ev->wrapper.glue->wrap_ev = ev;
 	ev->wrapper.glue->main_ev = main_ev;
 	ev->wrapper.glue->ops = ops;
-	ev->wrapper.glue->private_state = talloc_size(ev->wrapper.glue, psize);
+	ev->wrapper.glue->private_state = talloc_zero_size(ev->wrapper.glue, psize);
 	if (ev->wrapper.glue->private_state == NULL) {
 		talloc_free(ev);
 		return NULL;
-- 
2.17.1


From 22ea7cde4b3a62861b21f125613abe25210edc10 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Mon, 16 Jul 2018 17:17:59 +0200
Subject: [PATCH 02/23] pthreadpool: make sure a pthreadpool is marked as
 stopped in child processes

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lib/pthreadpool/pthreadpool.c b/lib/pthreadpool/pthreadpool.c
index 610cfb02f154..528f4cdfd678 100644
--- a/lib/pthreadpool/pthreadpool.c
+++ b/lib/pthreadpool/pthreadpool.c
@@ -341,6 +341,7 @@ static void pthreadpool_child(void)
 		pool->num_idle = 0;
 		pool->head = 0;
 		pool->num_jobs = 0;
+		pool->stopped = true;
 
 		ret = pthread_cond_init(&pool->condvar, NULL);
 		assert(ret == 0);
-- 
2.17.1


From aaa57d5dc790c0a5ad0ab4e8e052da3b49c4f528 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Wed, 18 Jul 2018 10:17:51 +0200
Subject: [PATCH 03/23] pthreadpool: test pthreadpool_tevent_max_threads()
 returns the expected result

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/tests_cmocka.c | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/lib/pthreadpool/tests_cmocka.c b/lib/pthreadpool/tests_cmocka.c
index 677800892f65..e2fb84394285 100644
--- a/lib/pthreadpool/tests_cmocka.c
+++ b/lib/pthreadpool/tests_cmocka.c
@@ -42,6 +42,7 @@ static int setup_pthreadpool_tevent(void **state)
 {
 	struct pthreadpool_tevent_test *t;
 	int ret;
+	size_t max_threads;
 
 	t = talloc_zero(NULL, struct pthreadpool_tevent_test);
 	assert_non_null(t);
@@ -52,12 +53,21 @@ static int setup_pthreadpool_tevent(void **state)
 	ret = pthreadpool_tevent_init(t->ev, UINT_MAX, &t->upool);
 	assert_return_code(ret, 0);
 
+	max_threads = pthreadpool_tevent_max_threads(t->upool);
+	assert_int_equal(max_threads, UINT_MAX);
+
 	ret = pthreadpool_tevent_init(t->ev, 1, &t->opool);
 	assert_return_code(ret, 0);
 
+	max_threads = pthreadpool_tevent_max_threads(t->opool);
+	assert_int_equal(max_threads, 1);
+
 	ret = pthreadpool_tevent_init(t->ev, 0, &t->spool);
 	assert_return_code(ret, 0);
 
+	max_threads = pthreadpool_tevent_max_threads(t->spool);
+	assert_int_equal(max_threads, 0);
+
 	*state = t;
 
 	return 0;
-- 
2.17.1


From 2202a6ec8e83b09060376c07b1a95bd448380157 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Mon, 23 Jul 2018 23:24:22 +0200
Subject: [PATCH 04/23] pthreadpool: replace assert_return_code(ret, 0); with
 assert_int_equal(ret, 0);

We need to assert the exact value!

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/tests_cmocka.c | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/lib/pthreadpool/tests_cmocka.c b/lib/pthreadpool/tests_cmocka.c
index e2fb84394285..e6af8849f01d 100644
--- a/lib/pthreadpool/tests_cmocka.c
+++ b/lib/pthreadpool/tests_cmocka.c
@@ -51,19 +51,19 @@ static int setup_pthreadpool_tevent(void **state)
 	assert_non_null(t->ev);
 
 	ret = pthreadpool_tevent_init(t->ev, UINT_MAX, &t->upool);
-	assert_return_code(ret, 0);
+	assert_int_equal(ret, 0);
 
 	max_threads = pthreadpool_tevent_max_threads(t->upool);
 	assert_int_equal(max_threads, UINT_MAX);
 
 	ret = pthreadpool_tevent_init(t->ev, 1, &t->opool);
-	assert_return_code(ret, 0);
+	assert_int_equal(ret, 0);
 
 	max_threads = pthreadpool_tevent_max_threads(t->opool);
 	assert_int_equal(max_threads, 1);
 
 	ret = pthreadpool_tevent_init(t->ev, 0, &t->spool);
-	assert_return_code(ret, 0);
+	assert_int_equal(ret, 0);
 
 	max_threads = pthreadpool_tevent_max_threads(t->spool);
 	assert_int_equal(max_threads, 0);
-- 
2.17.1


From d4f4473c41b13e619cf030953012baa850dbd426 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Wed, 18 Jul 2018 08:44:48 +0200
Subject: [PATCH 05/23] lib/replace: check for __thread support

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/replace/wscript | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/lib/replace/wscript b/lib/replace/wscript
index fd00a42d5b62..02d98c59e476 100644
--- a/lib/replace/wscript
+++ b/lib/replace/wscript
@@ -551,6 +551,18 @@ def configure(conf):
              conf.CONFIG_SET('HAVE_PTHREAD_MUTEX_CONSISTENT_NP'))):
             conf.DEFINE('HAVE_ROBUST_MUTEXES', 1)
 
+    # __thread is available since 2002 in gcc.
+    conf.CHECK_CODE('''
+        __thread int tls;
+
+        int main(void) {
+            return 0;
+        }
+        ''',
+        'HAVE___THREAD',
+        addmain=False,
+        msg='Checking for __thread local storage')
+
     conf.CHECK_FUNCS_IN('crypt', 'crypt', checklibc=True)
     conf.CHECK_FUNCS_IN('crypt_r', 'crypt', checklibc=True)
 
-- 
2.17.1


From 09d7af3894da1e7eb352a2122c8c5da90b219f2f Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Wed, 18 Jul 2018 08:54:22 +0200
Subject: [PATCH 06/23] third_party/*_wrapper/wscript: remove redundant
 configure checks

HAVE___THREAD and HAVE_DESTRUCTOR_ATTRIBUTE are already checked
as part of Samba.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 third_party/nss_wrapper/wscript    | 31 ++----------------------------
 third_party/pam_wrapper/wscript    | 30 ++---------------------------
 third_party/resolv_wrapper/wscript | 30 ++---------------------------
 third_party/socket_wrapper/wscript | 30 ++---------------------------
 third_party/uid_wrapper/wscript    | 12 ++----------
 5 files changed, 10 insertions(+), 123 deletions(-)

diff --git a/third_party/nss_wrapper/wscript b/third_party/nss_wrapper/wscript
index d50dd5cbb17d..a289d4710321 100644
--- a/third_party/nss_wrapper/wscript
+++ b/third_party/nss_wrapper/wscript
@@ -11,35 +11,8 @@ def configure(conf):
     else:
         conf.CHECK_HEADERS('nss.h')
 
-        # check HAVE_GCC_THREAD_LOCAL_STORAGE
-        conf.CHECK_CODE('''
-            __thread int tls;
-
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_GCC_THREAD_LOCAL_STORAGE',
-            addmain=False,
-            msg='Checking for thread local storage')
-
-        # check HAVE_DESTRUCTOR_ATTRIBUTE
-        conf.CHECK_CODE('''
-            void test_destructor_attribute(void) __attribute__ ((destructor));
-
-            void test_destructor_attribute(void)
-            {
-                return;
-            }
-
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_DESTRUCTOR_ATTRIBUTE',
-            addmain=False,
-            strict=True,
-            msg='Checking for library destructor support')
+        if conf.CONFIG_SET("HAVE___THREAD"):
+            conf.DEFINE("HAVE_GCC_THREAD_LOCAL_STORAGE", 1)
 
         # check HAVE_ATTRIBUTE_PRINTF_FORMAT
         conf.CHECK_CODE('''
diff --git a/third_party/pam_wrapper/wscript b/third_party/pam_wrapper/wscript
index 7d4a790caaaa..1a1e3a29bf2c 100644
--- a/third_party/pam_wrapper/wscript
+++ b/third_party/pam_wrapper/wscript
@@ -17,35 +17,9 @@ def configure(conf):
         conf.DEFINE('USING_SYSTEM_PAM_WRAPPER', 1)
         libpam_wrapper_so_path = 'libpam_wrapper.so'
     else:
-        # check HAVE_GCC_THREAD_LOCAL_STORAGE
-        conf.CHECK_CODE('''
-            __thread int tls;
-
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_GCC_THREAD_LOCAL_STORAGE',
-            addmain=False,
-            msg='Checking for thread local storage')
-
-        # check HAVE_DESTRUCTOR_ATTRIBUTE
-        conf.CHECK_CODE('''
-            void test_destructor_attribute(void) __attribute__ ((destructor));
 
-            void test_destructor_attribute(void)
-            {
-                return;
-            }
-
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_DESTRUCTOR_ATTRIBUTE',
-            addmain=False,
-            strict=True,
-            msg='Checking for library destructor support')
+        if conf.CONFIG_SET("HAVE___THREAD"):
+            conf.DEFINE("HAVE_GCC_THREAD_LOCAL_STORAGE", 1)
 
         # check HAVE_FUNCTION_ATTRIBUTE_FORMAT
         conf.CHECK_CODE('''
diff --git a/third_party/resolv_wrapper/wscript b/third_party/resolv_wrapper/wscript
index bb7722e97758..7cd1d90b8fa8 100644
--- a/third_party/resolv_wrapper/wscript
+++ b/third_party/resolv_wrapper/wscript
@@ -9,35 +9,9 @@ def configure(conf):
         conf.DEFINE('USING_SYSTEM_RESOLV_WRAPPER', 1)
         libresolv_wrapper_so_path = 'libresolv_wrapper.so'
     else:
-        # check HAVE_GCC_THREAD_LOCAL_STORAGE
-        conf.CHECK_CODE('''
-            __thread int tls;
-
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_GCC_THREAD_LOCAL_STORAGE',
-            addmain=False,
-            msg='Checking for thread local storage')
-
-        # check HAVE_DESTRUCTOR_ATTRIBUTE
-        conf.CHECK_CODE('''
-            void test_destructor_attribute(void) __attribute__ ((destructor));
 
-            void test_destructor_attribute(void)
-            {
-                return;
-            }
-
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_DESTRUCTOR_ATTRIBUTE',
-            addmain=False,
-            strict=True,
-            msg='Checking for library destructor support')
+        if conf.CONFIG_SET("HAVE___THREAD"):
+            conf.DEFINE("HAVE_GCC_THREAD_LOCAL_STORAGE", 1)
 
         # check HAVE_ATTRIBUTE_PRINTF_FORMAT
         conf.CHECK_CODE('''
diff --git a/third_party/socket_wrapper/wscript b/third_party/socket_wrapper/wscript
index f48debe8b157..a0ee4f2f9325 100644
--- a/third_party/socket_wrapper/wscript
+++ b/third_party/socket_wrapper/wscript
@@ -9,35 +9,9 @@ def configure(conf):
         conf.DEFINE('USING_SYSTEM_SOCKET_WRAPPER', 1)
         libsocket_wrapper_so_path = 'libsocket_wrapper.so'
     else:
-        # check HAVE_GCC_THREAD_LOCAL_STORAGE
-        conf.CHECK_CODE('''
-            __thread int tls;
-
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_GCC_THREAD_LOCAL_STORAGE',
-            addmain=False,
-            msg='Checking for thread local storage')
-
-        # check HAVE_DESTRUCTOR_ATTRIBUTE
-        conf.CHECK_CODE('''
-            void test_destructor_attribute(void) __attribute__ ((destructor));
 
-            void test_destructor_attribute(void)
-            {
-                return;
-            }
-
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_DESTRUCTOR_ATTRIBUTE',
-            addmain=False,
-            strict=True,
-            msg='Checking for library destructor support')
+        if conf.CONFIG_SET("HAVE___THREAD"):
+            conf.DEFINE("HAVE_GCC_THREAD_LOCAL_STORAGE", 1)
 
         # check HAVE_FUNCTION_ATTRIBUTE_FORMAT
         conf.CHECK_CODE('''
diff --git a/third_party/uid_wrapper/wscript b/third_party/uid_wrapper/wscript
index 6344ebf5eba4..8127a9d9f408 100644
--- a/third_party/uid_wrapper/wscript
+++ b/third_party/uid_wrapper/wscript
@@ -23,17 +23,9 @@ def configure(conf):
             addmain=False,
             msg='Checking for atomic builtins')
 
-        # check HAVE_GCC_THREAD_LOCAL_STORAGE
-        conf.CHECK_CODE('''
-            __thread int tls;
 
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_GCC_THREAD_LOCAL_STORAGE',
-            addmain=False,
-            msg='Checking for thread local storage')
+        if conf.CONFIG_SET("HAVE___THREAD"):
+            conf.DEFINE("HAVE_GCC_THREAD_LOCAL_STORAGE", 1)
 
         if Options.options.address_sanitizer:
             # check HAVE_ADDRESS_SANITIZER_ATTRIBUTE
-- 
2.17.1


From f9a18a27a9155acb7a2faf70390bfafa167b9a76 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Thu, 21 Jun 2018 14:17:35 +0200
Subject: [PATCH 07/23] replace: add checks for
 atomic_thread_fence(memory_order_seq_cst) and add possible fallbacks

This implements a full memory barrier.
On ubuntu amd64 with results in an 'mfence' instruction.

This is required to syncronization between threads, where
there's typically only one write of a memory that should be
synced between all threads with the barrier.

Much more details can be found here:
https://gcc.gnu.org/onlinedocs/gcc-7.3.0/gcc/_005f_005fatomic-Builtins.html#g_t_005f_005fatomic-Builtins
https://gcc.gnu.org/onlinedocs/gcc-7.3.0/gcc/_005f_005fsync-Builtins.html#g_t_005f_005fsync-Builtins

The main one we use seems to be in C11 via stdatomic.h,
the oldest fallback is __sync_synchronize(), which is available
since 2005 in gcc.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/replace/system/threads.h | 27 +++++++++++++++++++++++++++
 lib/replace/wscript          | 19 ++++++++++++++++++-
 2 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/lib/replace/system/threads.h b/lib/replace/system/threads.h
index fe6d0fbac541..d189ed620c51 100644
--- a/lib/replace/system/threads.h
+++ b/lib/replace/system/threads.h
@@ -42,4 +42,31 @@
 #define pthread_mutex_consistent pthread_mutex_consistent_np
 #endif
 
+#ifdef HAVE_STDATOMIC_H
+#include <stdatomic.h>
+#endif
+
+#ifndef HAVE_ATOMIC_THREAD_FENCE
+#ifdef HAVE___ATOMIC_THREAD_FENCE
+#define atomic_thread_fence(__ignore_order) __atomic_thread_fence(__ATOMIC_SEQ_CST)
+#define HAVE_ATOMIC_THREAD_FENCE 1
+#endif /* HAVE___ATOMIC_THREAD_FENCE */
+#endif /* not HAVE_ATOMIC_THREAD_FENCE */
+
+#ifndef HAVE_ATOMIC_THREAD_FENCE
+#ifdef HAVE___SYNC_SYNCHRONIZE
+#define atomic_thread_fence(__ignore_order) __sync_synchronize()
+#define HAVE_ATOMIC_THREAD_FENCE 1
+#endif /* HAVE___SYNC_SYNCHRONIZE */
+#endif /* not HAVE_ATOMIC_THREAD_FENCE */
+
+#ifndef HAVE_ATOMIC_THREAD_FENCE
+#ifdef HAVE_ATOMIC_THREAD_FENCE_SUPPORT
+#error mismatch_error_between_configure_test_and_header
+#endif
+/* make sure the build fails if someone uses it without checking the define */
+#define atomic_thread_fence(__order) \
+        __function__atomic_thread_fence_not_available_on_this_platform__()
+#endif /* not HAVE_ATOMIC_THREAD_FENCE */
+
 #endif
diff --git a/lib/replace/wscript b/lib/replace/wscript
index 02d98c59e476..5b53461f8448 100644
--- a/lib/replace/wscript
+++ b/lib/replace/wscript
@@ -113,7 +113,7 @@ def configure(conf):
     conf.CHECK_HEADERS('sys/extattr.h sys/ea.h sys/proplist.h sys/cdefs.h')
     conf.CHECK_HEADERS('utmp.h utmpx.h lastlog.h')
     conf.CHECK_HEADERS('syscall.h sys/syscall.h inttypes.h')
-    conf.CHECK_HEADERS('sys/atomic.h')
+    conf.CHECK_HEADERS('sys/atomic.h stdatomic.h')
     conf.CHECK_HEADERS('libgen.h')
 
     if conf.CHECK_CFLAGS('-Wno-format-truncation'):
@@ -260,6 +260,23 @@ def configure(conf):
                     headers='stdint.h sys/atomic.h',
                     msg='Checking for atomic_add_32 compiler builtin')
 
+    # Check for thread fence. */
+    tf = conf.CHECK_CODE('atomic_thread_fence(memory_order_seq_cst);',
+                         'HAVE_ATOMIC_THREAD_FENCE',
+                         headers='stdatomic.h',
+                         msg='Checking for atomic_thread_fence(memory_order_seq_cst) in stdatomic.h')
+    if not tf:
+        tf = conf.CHECK_CODE('__atomic_thread_fence(__ATOMIC_SEQ_CST);',
+                             'HAVE___ATOMIC_THREAD_FENCE',
+                             msg='Checking for __atomic_thread_fence(__ATOMIC_SEQ_CST)')
+    if not tf:
+        # __sync_synchronize() is available since 2005 in gcc.
+        tf = conf.CHECK_CODE('__sync_synchronize();',
+                             'HAVE___SYNC_SYNCHRONIZE',
+                             msg='Checking for __sync_synchronize')
+    if tf:
+        conf.DEFINE('HAVE_ATOMIC_THREAD_FENCE_SUPPORT', 1)
+
     conf.CHECK_CODE('''
                     #define FALL_THROUGH __attribute__((fallthrough))
 
-- 
2.17.1


From cd19ebad426cd8e5e4d0331e7f81a93b5316c3b6 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Wed, 20 Jun 2018 13:38:19 +0200
Subject: [PATCH 08/23] pthreadpool: add some lockless coordination between the
 main and job threads

In the direction from the main process to the job thread, we have:

- 'maycancel', which is set when tevent_req_cancel() is called,
- 'orphaned' is the job request, tevent_context or pthreadpool_tevent
  was talloc_free'ed.

The job function can consume these by using:

   /*
    * return true - if tevent_req_cancel() was called.
    */
   bool pthreadpool_tevent_current_job_canceled(void);

   /*
    * return true - if talloc_free() was called on the job request,
    * tevent_context or pthreadpool_tevent.
    */
   bool pthreadpool_tevent_current_job_orphaned(void);

   /*
    * return true if canceled and orphaned are both false.
    */
   bool pthreadpool_tevent_current_job_continue(void);

In the other direction we remember the following points
in the job execution:

- 'started'  - set when the job is picked up by a worker thread
- 'executed' - set once the job function returned.
- 'finished' - set when pthreadpool_tevent_job_signal() is entered
- 'dropped'  - set when pthreadpool_tevent_job_signal() leaves with orphaned
- 'signaled' - set when pthreadpool_tevent_job_signal() leaves normal

There're only one side writing each element,
either the main process or the job thread.

This means we can do the coordination with a full memory
barrier using atomic_thread_fence(memory_order_seq_cst).
lib/replace provides fallbacks if C11 stdatomic.h is not available.

A real pthreadpool requires pthread and atomic_thread_fence() (or an
replacement) to be available, otherwise we only have pthreadpool_sync.c.
But this should not make a real difference, as at least
__sync_synchronize() is availabe since 2005 in gcc.
We also require __thread which is available since 2002.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool_tevent.c | 212 ++++++++++++++++++++++++++-
 lib/pthreadpool/pthreadpool_tevent.h |  14 ++
 wscript                              |  12 +-
 3 files changed, 231 insertions(+), 7 deletions(-)

diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index e7e17d3bf0f7..fbf9c0e835a6 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -18,10 +18,42 @@
  */
 
 #include "replace.h"
+#include "system/threads.h"
 #include "pthreadpool_tevent.h"
 #include "pthreadpool.h"
 #include "lib/util/tevent_unix.h"
 #include "lib/util/dlinklist.h"
+#include "lib/util/attr.h"
+
+#define PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(__job) do { \
+	_UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
+} while(0);
+
+#ifdef WITH_PTHREADPOOL
+/*
+ * configure checked we have pthread and atomic_thread_fence() available
+ */
+#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { \
+	atomic_thread_fence(__order); \
+} while(0)
+#else
+/*
+ * we're using lib/pthreadpool/pthreadpool_sync.c ...
+ */
+#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { } while(0)
+#ifndef HAVE___THREAD
+#define __thread
+#endif
+#endif
+
+#define PTHREAD_TEVENT_JOB_THREAD_FENCE(__job) do { \
+	_UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
+	__PTHREAD_TEVENT_JOB_THREAD_FENCE(memory_order_seq_cst); \
+} while(0);
+
+#define PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(__job) do { \
+	_UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
+} while(0);
 
 struct pthreadpool_tevent_job_state;
 
@@ -75,6 +107,70 @@ struct pthreadpool_tevent_job {
 
 	void (*fn)(void *private_data);
 	void *private_data;
+
+	/*
+	 * Coordination between threads
+	 *
+	 * There're only one side writing each element
+	 * either the main process or the job thread.
+	 *
+	 * The coordination is done by a full memory
+	 * barrier using atomic_thread_fence(memory_order_seq_cst)
+	 * wrapped in PTHREAD_TEVENT_JOB_THREAD_FENCE()
+	 */
+	struct {
+		/*
+		 * 'maycancel'
+		 * set when tevent_req_cancel() is called.
+		 * (only written by main thread!)
+		 */
+		bool maycancel;
+
+		/*
+		 * 'orphaned'
+		 * set when talloc_free is called on the job request,
+		 * tevent_context or pthreadpool_tevent.
+		 * (only written by main thread!)
+		 */
+		bool orphaned;
+
+		/*
+		 * 'started'
+		 * set when the job is picked up by a worker thread
+		 * (only written by job thread!)
+		 */
+		bool started;
+
+		/*
+		 * 'executed'
+		 * set once the job function returned.
+		 * (only written by job thread!)
+		 */
+		bool executed;
+
+		/*
+		 * 'finished'
+		 * set when pthreadpool_tevent_job_signal() is entered
+		 * (only written by job thread!)
+		 */
+		bool finished;
+
+		/*
+		 * 'dropped'
+		 * set when pthreadpool_tevent_job_signal() leaves with
+		 * orphaned already set.
+		 * (only written by job thread!)
+		 */
+		bool dropped;
+
+		/*
+		 * 'signaled'
+		 * set when pthreadpool_tevent_job_signal() leaves normal
+		 * and the immediate event was scheduled.
+		 * (only written by job thread!)
+		 */
+		bool signaled;
+	} needs_fence;
 };
 
 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
@@ -299,11 +395,11 @@ static bool pthreadpool_tevent_job_cancel(struct tevent_req *req);
 static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
 {
 	/*
-	 * We should never be called with state->state != NULL.
+	 * We should never be called with needs_fence.orphaned == false.
 	 * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
 	 * after detaching from the request state and pool list.
 	 */
-	if (job->state != NULL) {
+	if (!job->needs_fence.orphaned) {
 		abort();
 	}
 
@@ -328,6 +424,17 @@ static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
 		}
 	}
 
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	if (job->needs_fence.dropped) {
+		/*
+		 * The signal function saw job->needs_fence.orphaned
+		 * before it started the signaling via the immediate
+		 * event. So we'll never geht triggered and can
+		 * remove job->im and let the whole job go...
+		 */
+		TALLOC_FREE(job->im);
+	}
+
 	/*
 	 * pthreadpool_tevent_job_orphan() already removed
 	 * it from pool->jobs. And we don't need try
@@ -351,11 +458,15 @@ static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
 	 */
 	DLIST_REMOVE(orphaned_jobs, job);
 
+	PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(job);
 	return 0;
 }
 
 static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
 {
+	job->needs_fence.orphaned = true;
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+
 	/*
 	 * We're the only function that sets
 	 * job->state = NULL;
@@ -476,6 +587,7 @@ struct tevent_req *pthreadpool_tevent_job_send(
 	if (tevent_req_nomem(job->im, req)) {
 		return tevent_req_post(req, ev);
 	}
+	PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(job);
 	talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
 	DLIST_ADD_END(job->pool->jobs, job);
 	job->state = state;
@@ -492,13 +604,76 @@ struct tevent_req *pthreadpool_tevent_job_send(
 	return req;
 }
 
+static __thread struct pthreadpool_tevent_job *current_job;
+
+bool pthreadpool_tevent_current_job_canceled(void)
+{
+	if (current_job == NULL) {
+		/*
+		 * Should only be called from within
+		 * the job function.
+		 */
+		abort();
+		return false;
+	}
+
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
+	return current_job->needs_fence.maycancel;
+}
+
+bool pthreadpool_tevent_current_job_orphaned(void)
+{
+	if (current_job == NULL) {
+		/*
+		 * Should only be called from within
+		 * the job function.
+		 */
+		abort();
+		return false;
+	}
+
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
+	return current_job->needs_fence.orphaned;
+}
+
+bool pthreadpool_tevent_current_job_continue(void)
+{
+	if (current_job == NULL) {
+		/*
+		 * Should only be called from within
+		 * the job function.
+		 */
+		abort();
+		return false;
+	}
+
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
+	if (current_job->needs_fence.maycancel) {
+		return false;
+	}
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
+	if (current_job->needs_fence.orphaned) {
+		return false;
+	}
+
+	return true;
+}
+
 static void pthreadpool_tevent_job_fn(void *private_data)
 {
 	struct pthreadpool_tevent_job *job =
 		talloc_get_type_abort(private_data,
 		struct pthreadpool_tevent_job);
 
+	current_job = job;
+	job->needs_fence.started = true;
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+
 	job->fn(job->private_data);
+
+	job->needs_fence.executed = true;
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	current_job = NULL;
 }
 
 static int pthreadpool_tevent_job_signal(int jobid,
@@ -513,8 +688,12 @@ static int pthreadpool_tevent_job_signal(int jobid,
 	struct tevent_threaded_context *tctx = NULL;
 	struct pthreadpool_tevent_glue *g = NULL;
 
-	if (state == NULL) {
+	job->needs_fence.finished = true;
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	if (job->needs_fence.orphaned) {
 		/* Request already gone */
+		job->needs_fence.dropped = true;
+		PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 		return 0;
 	}
 
@@ -543,6 +722,8 @@ static int pthreadpool_tevent_job_signal(int jobid,
 					  job);
 	}
 
+	job->needs_fence.signaled = true;
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 	return 0;
 }
 
@@ -565,9 +746,17 @@ static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
 
 	/*
 	 * pthreadpool_tevent_job_cleanup()
-	 * will destroy the job.
+	 * (called by tevent_req_done() or
+	 * tevent_req_error()) will destroy the job.
 	 */
-	tevent_req_done(state->req);
+
+	if (job->needs_fence.executed) {
+		tevent_req_done(state->req);
+		return;
+	}
+
+	tevent_req_error(state->req, ENOEXEC);
+	return;
 }
 
 static bool pthreadpool_tevent_job_cancel(struct tevent_req *req)
@@ -582,6 +771,19 @@ static bool pthreadpool_tevent_job_cancel(struct tevent_req *req)
 		return false;
 	}
 
+	job->needs_fence.maycancel = true;
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	if (job->needs_fence.started) {
+		/*
+		 * It was too late to cancel the request.
+		 *
+		 * The job still has the chance to look
+		 * at pthreadpool_tevent_current_job_canceled()
+		 * or pthreadpool_tevent_current_job_continue()
+		 */
+		return false;
+	}
+
 	num = pthreadpool_cancel_job(job->pool->pool, 0,
 				     pthreadpool_tevent_job_fn,
 				     job);
diff --git a/lib/pthreadpool/pthreadpool_tevent.h b/lib/pthreadpool/pthreadpool_tevent.h
index fdb86e23757a..37e491e17c47 100644
--- a/lib/pthreadpool/pthreadpool_tevent.h
+++ b/lib/pthreadpool/pthreadpool_tevent.h
@@ -32,6 +32,20 @@ int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
 size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool);
 size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool);
 
+/*
+ * return true - if tevent_req_cancel() was called.
+ */
+bool pthreadpool_tevent_current_job_canceled(void);
+/*
+ * return true - if talloc_free() was called on the job request,
+ * tevent_context or pthreadpool_tevent.
+ */
+bool pthreadpool_tevent_current_job_orphaned(void);
+/*
+ * return true if canceled and orphaned are both false.
+ */
+bool pthreadpool_tevent_current_job_continue(void);
+
 struct tevent_req *pthreadpool_tevent_job_send(
 	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
 	struct pthreadpool_tevent *pool,
diff --git a/wscript b/wscript
index 19fc6d12118e..f11c49dde676 100644
--- a/wscript
+++ b/wscript
@@ -259,10 +259,18 @@ def configure(conf):
         conf.DEFINE('WITH_NTVFS_FILESERVER', 1)
 
     if Options.options.with_pthreadpool:
-        if conf.CONFIG_SET('HAVE_PTHREAD'):
+        if conf.CONFIG_SET('HAVE_PTHREAD') and \
+           conf.CONFIG_SET('HAVE___THREAD') and \
+           conf.CONFIG_SET('HAVE_ATOMIC_THREAD_FENCE_SUPPORT'):
             conf.DEFINE('WITH_PTHREADPOOL', '1')
         else:
-            Logs.warn("pthreadpool support cannot be enabled when pthread support was not found")
+            if not conf.CONFIG_SET('HAVE_PTHREAD'):
+                Logs.warn("pthreadpool support cannot be enabled when pthread support was not found")
+            if not conf.CONFIG_SET('HAVE_ATOMIC_THREAD_FENCE_SUPPORT'):
+                Logs.warn("""pthreadpool support cannot be enabled when there is
+                          no support for atomic_thead_fence()""")
+            if not conf.CONFIG_SET('HAVE___THREAD'):
+                Logs.warn("pthreadpool support cannot be enabled when __thread support was not found")
             conf.undefine('WITH_PTHREADPOOL')
 
     conf.RECURSE('source3')
-- 
2.17.1


From 419d031ee11af3f912b4c97304e1a6b5cc43c721 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Thu, 21 Jun 2018 12:46:06 +0200
Subject: [PATCH 09/23] s3:wscript: don't check for valgrind related headers
 twice

We already check them in lib/replace/wscript.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 source3/wscript | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/source3/wscript b/source3/wscript
index 633a3655b211..aed784ef0179 100644
--- a/source3/wscript
+++ b/source3/wscript
@@ -1034,7 +1034,7 @@ syscall(SYS_setgroups32, 0, NULL);
             Logs.warn("--with-dnsupdate=yes but gssapi support not sufficient")
         else:
             conf.DEFINE('WITH_DNS_UPDATES', 1)
-    conf.CHECK_HEADERS('valgrind.h valgrind/valgrind.h valgrind/memcheck.h')
+    # valgrind.h or valgrind/valgrind.h is checked in lib/replace/wscript
     if Options.options.developer:
         if conf.CONFIG_SET('HAVE_VALGRIND_H') or conf.CONFIG_SET('HAVE_VALGRIND_VALGRIND_H'):
             conf.DEFINE('VALGRIND', '1')
-- 
2.17.1


From cc3a701ebb6351579e897bdbd06fb4478afa9e46 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Thu, 21 Jun 2018 12:46:48 +0200
Subject: [PATCH 10/23] lib/replace: also check for valgrind/helgrind.h

This will be used in lib/pthreadpool/pthreadpool_tevent.c
in order to avoid extected helgrind/drd warnings.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/replace/wscript | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/lib/replace/wscript b/lib/replace/wscript
index 5b53461f8448..3dd10ae0356a 100644
--- a/lib/replace/wscript
+++ b/lib/replace/wscript
@@ -108,7 +108,8 @@ def configure(conf):
     conf.CHECK_HEADERS('sys/fileio.h sys/filesys.h sys/dustat.h sys/sysmacros.h')
     conf.CHECK_HEADERS('xfs/libxfs.h netgroup.h')
 
-    conf.CHECK_HEADERS('valgrind.h valgrind/valgrind.h valgrind/memcheck.h')
+    conf.CHECK_HEADERS('valgrind.h valgrind/valgrind.h')
+    conf.CHECK_HEADERS('valgrind/memcheck.h valgrind/helgrind.h')
     conf.CHECK_HEADERS('nss_common.h nsswitch.h ns_api.h')
     conf.CHECK_HEADERS('sys/extattr.h sys/ea.h sys/proplist.h sys/cdefs.h')
     conf.CHECK_HEADERS('utmp.h utmpx.h lastlog.h')
-- 
2.17.1


From 94839880fe5553401fa3e68c40f52f62f38d15e0 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Thu, 21 Jun 2018 12:43:08 +0200
Subject: [PATCH 11/23] pthreadpool: add helgrind magic to
 PTHREAD_TEVENT_JOB_THREAD_FENCE_*()

This avoids the expected helgrind/drd warnings on the job states which
are protected by the thread fence.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool_tevent.c | 34 ++++++++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index fbf9c0e835a6..821d13b02362 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -25,8 +25,39 @@
 #include "lib/util/dlinklist.h"
 #include "lib/util/attr.h"
 
+/*
+ * We try to give some hints to helgrind/drd
+ *
+ * Note ANNOTATE_BENIGN_RACE_SIZED(address, size, describtion)
+ * takes an memory address range that ignored by helgrind/drd
+ * 'description' is just ignored...
+ *
+ *
+ * Note that ANNOTATE_HAPPENS_*(unique_uintptr)
+ * just takes a DWORD/(void *) as unique key
+ * for the barrier.
+ */
+#ifdef HAVE_VALGRIND_HELGRIND_H
+#include <valgrind/helgrind.h>
+#endif
+#ifndef ANNOTATE_BENIGN_RACE_SIZED
+#define ANNOTATE_BENIGN_RACE_SIZED(address, size, describtion)
+#endif
+#ifndef ANNOTATE_HAPPENS_BEFORE
+#define ANNOTATE_HAPPENS_BEFORE(unique_uintptr)
+#endif
+#ifndef ANNOTATE_HAPPENS_AFTER
+#define ANNOTATE_HAPPENS_AFTER(unique_uintptr)
+#endif
+#ifndef ANNOTATE_HAPPENS_BEFORE_FORGET_ALL
+#define ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(unique_uintptr)
+#endif
+
 #define PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(__job) do { \
 	_UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
+	ANNOTATE_BENIGN_RACE_SIZED(&__j->needs_fence, \
+				   sizeof(__j->needs_fence), \
+				   "race by design, protected by fence"); \
 } while(0);
 
 #ifdef WITH_PTHREADPOOL
@@ -48,11 +79,14 @@
 
 #define PTHREAD_TEVENT_JOB_THREAD_FENCE(__job) do { \
 	_UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
+	ANNOTATE_HAPPENS_BEFORE(&__job->needs_fence); \
 	__PTHREAD_TEVENT_JOB_THREAD_FENCE(memory_order_seq_cst); \
+	ANNOTATE_HAPPENS_AFTER(&__job->needs_fence); \
 } while(0);
 
 #define PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(__job) do { \
 	_UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
+	ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&__job->needs_fence); \
 } while(0);
 
 struct pthreadpool_tevent_job_state;
-- 
2.17.1


From 1e5fce84f87add46dbdab428fa4727577c4391f7 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Fri, 22 Jun 2018 17:14:31 +0200
Subject: [PATCH 12/23] pthreadpool: maintain a list of job_states on each
 pthreadpool_tevent_glue

We should avoid traversing a linked list within a thread without holding
a mutex!

Using a mutex would be very tricky as we'll likely deadlock with
the mutexes at the raw pthreadpool layer.

So we use somekind of spinlock using atomic_thread_fence in order to
protect the access to job->state->glue->{tctx,ev} in
pthreadpool_tevent_job_signal().

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool_tevent.c | 102 ++++++++++++++++++++-------
 1 file changed, 78 insertions(+), 24 deletions(-)

diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index 821d13b02362..3b502a7cc5a3 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -18,6 +18,7 @@
  */
 
 #include "replace.h"
+#include "system/select.h"
 #include "system/threads.h"
 #include "pthreadpool_tevent.h"
 #include "pthreadpool.h"
@@ -104,6 +105,8 @@ struct pthreadpool_tevent_glue {
 	struct tevent_threaded_context *tctx;
 	/* Pointer to link object owned by *ev. */
 	struct pthreadpool_tevent_glue_ev_link *ev_link;
+	/* active jobs */
+	struct pthreadpool_tevent_job_state *states;
 };
 
 /*
@@ -127,6 +130,8 @@ struct pthreadpool_tevent {
 };
 
 struct pthreadpool_tevent_job_state {
+	struct pthreadpool_tevent_job_state *prev, *next;
+	struct pthreadpool_tevent_glue *glue;
 	struct tevent_context *ev;
 	struct tevent_req *req;
 	struct pthreadpool_tevent_job *job;
@@ -322,6 +327,16 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 static int pthreadpool_tevent_glue_destructor(
 	struct pthreadpool_tevent_glue *glue)
 {
+	struct pthreadpool_tevent_job_state *state = NULL;
+	struct pthreadpool_tevent_job_state *nstate = NULL;
+
+	for (state = glue->states; state != NULL; state = nstate) {
+		nstate = state->next;
+
+		/* The job this removes it from the list */
+		pthreadpool_tevent_job_orphan(state->job);
+	}
+
 	if (glue->pool->glue_list != NULL) {
 		DLIST_REMOVE(glue->pool->glue_list, glue);
 	}
@@ -355,9 +370,11 @@ static int pthreadpool_tevent_glue_link_destructor(
 	return 0;
 }
 
-static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
-					  struct tevent_context *ev)
+static int pthreadpool_tevent_register_ev(
+				struct pthreadpool_tevent *pool,
+				struct pthreadpool_tevent_job_state *state)
 {
+	struct tevent_context *ev = state->ev;
 	struct pthreadpool_tevent_glue *glue = NULL;
 	struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
 
@@ -368,7 +385,9 @@ static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
 	 * pair.
 	 */
 	for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
-		if (glue->ev == ev) {
+		if (glue->ev == state->ev) {
+			state->glue = glue;
+			DLIST_ADD_END(glue->states, state);
 			return 0;
 		}
 	}
@@ -416,6 +435,9 @@ static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
 	}
 #endif
 
+	state->glue = glue;
+	DLIST_ADD_END(glue->states, state);
+
 	DLIST_ADD(pool->glue_list, glue);
 	return 0;
 }
@@ -431,7 +453,7 @@ static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
 	/*
 	 * We should never be called with needs_fence.orphaned == false.
 	 * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
-	 * after detaching from the request state and pool list.
+	 * after detaching from the request state, glue and pool list.
 	 */
 	if (!job->needs_fence.orphaned) {
 		abort();
@@ -509,6 +531,42 @@ static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
 		abort();
 	}
 
+	/*
+	 * Once we marked the request as 'orphaned'
+	 * we spin/loop if it's already marked
+	 * as 'finished' (which means that
+	 * pthreadpool_tevent_job_signal() was entered.
+	 * If it saw 'orphaned' it will exit after setting
+	 * 'dropped', otherwise it dereferences
+	 * job->state->glue->{tctx,ev} until it exited
+	 * after setting 'signaled'.
+	 *
+	 * We need to close this potential gab before
+	 * we can set job->state = NULL.
+	 *
+	 * This is some kind of spinlock, but with
+	 * 1 millisecond sleeps in between, in order
+	 * to give the thread more cpu time to finish.
+	 */
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	while (job->needs_fence.finished) {
+		if (job->needs_fence.dropped) {
+			break;
+		}
+		if (job->needs_fence.signaled) {
+			break;
+		}
+		poll(NULL, 0, 1);
+		PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	}
+
+	/*
+	 * Once the gab is closed, we can remove
+	 * the glue link.
+	 */
+	DLIST_REMOVE(job->state->glue->states, job->state);
+	job->state->glue = NULL;
+
 	/*
 	 * We need to reparent to a long term context.
 	 * And detach from the request state.
@@ -561,6 +619,10 @@ static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
 		 * The job request is not scheduled in the pool
 		 * yet or anymore.
 		 */
+		if (state->glue != NULL) {
+			DLIST_REMOVE(state->glue->states, state);
+			state->glue = NULL;
+		}
 		return;
 	}
 
@@ -605,7 +667,7 @@ struct tevent_req *pthreadpool_tevent_job_send(
 		return tevent_req_post(req, ev);
 	}
 
-	ret = pthreadpool_tevent_register_ev(pool, ev);
+	ret = pthreadpool_tevent_register_ev(pool, state);
 	if (tevent_req_error(req, ret)) {
 		return tevent_req_post(req, ev);
 	}
@@ -718,9 +780,6 @@ static int pthreadpool_tevent_job_signal(int jobid,
 	struct pthreadpool_tevent_job *job =
 		talloc_get_type_abort(job_private_data,
 		struct pthreadpool_tevent_job);
-	struct pthreadpool_tevent_job_state *state = job->state;
-	struct tevent_threaded_context *tctx = NULL;
-	struct pthreadpool_tevent_glue *g = NULL;
 
 	job->needs_fence.finished = true;
 	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
@@ -731,27 +790,22 @@ static int pthreadpool_tevent_job_signal(int jobid,
 		return 0;
 	}
 
-#ifdef HAVE_PTHREAD
-	for (g = job->pool->glue_list; g != NULL; g = g->next) {
-		if (g->ev == state->ev) {
-			tctx = g->tctx;
-			break;
-		}
-	}
-
-	if (tctx == NULL) {
-		abort();
-	}
-#endif
-
-	if (tctx != NULL) {
+	/*
+	 * state and state->glue are valid,
+	 * see the job->needs_fence.finished
+	 * "spinlock" loop in
+	 * pthreadpool_tevent_job_orphan()
+	 */
+	if (job->state->glue->tctx != NULL) {
 		/* with HAVE_PTHREAD */
-		tevent_threaded_schedule_immediate(tctx, job->im,
+		tevent_threaded_schedule_immediate(job->state->glue->tctx,
+						   job->im,
 						   pthreadpool_tevent_job_done,
 						   job);
 	} else {
 		/* without HAVE_PTHREAD */
-		tevent_schedule_immediate(job->im, state->ev,
+		tevent_schedule_immediate(job->im,
+					  job->state->glue->ev,
 					  pthreadpool_tevent_job_done,
 					  job);
 	}
-- 
2.17.1


From 6f9855ccca585bbabd9fe29b4eddd0426a252b81 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Fri, 22 Jun 2018 17:22:10 +0200
Subject: [PATCH 13/23] pthreadpool: add a comment about a further optimization
 in pthreadpool_tevent_job_destructor()

This seems to be a really rare race, it's likely that the immediate
event will still trigger and cleanup.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool_tevent.c | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index 3b502a7cc5a3..94b6b9ded8ff 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -491,6 +491,23 @@ static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
 		TALLOC_FREE(job->im);
 	}
 
+	/*
+	 * TODO?: We could further improve this by adjusting
+	 * tevent_threaded_schedule_immediate_destructor()
+	 * and allow TALLOC_FREE() during its time
+	 * in the main_ev->scheduled_immediates list.
+	 *
+	 * PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	 * if (state->needs_fence.signaled) {
+	 *       *
+	 *       * The signal function is completed
+	 *       * in future we may be allowed
+	 *       * to call TALLOC_FREE(job->im).
+	 *       *
+	 *      TALLOC_FREE(job->im);
+	 * }
+	 */
+
 	/*
 	 * pthreadpool_tevent_job_orphan() already removed
 	 * it from pool->jobs. And we don't need try
-- 
2.17.1


From 80c19a2a90f34fc95b817fa1b353c29c5bb6f0db Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Mon, 18 Jun 2018 15:32:30 +0200
Subject: [PATCH 14/23] pthreadpool: test cancelling and freeing pending
 pthreadpool_tevent jobs/pools

Pair-Programmed-With: Stefan Metzmacher <metze at samba.org>

Signed-off-by: Ralph Boehme <slow at samba.org>
Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/tests_cmocka.c | 434 +++++++++++++++++++++++++++++++++
 1 file changed, 434 insertions(+)

diff --git a/lib/pthreadpool/tests_cmocka.c b/lib/pthreadpool/tests_cmocka.c
index e6af8849f01d..dc7b1150b5c0 100644
--- a/lib/pthreadpool/tests_cmocka.c
+++ b/lib/pthreadpool/tests_cmocka.c
@@ -17,12 +17,16 @@
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
 
+#include "config.h"
 #include <errno.h>
 #include <pthread.h>
 #include <setjmp.h>
 #include <stdlib.h>
 #include <string.h>
 #include <limits.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
 
 #include <talloc.h>
 #include <tevent.h>
@@ -31,6 +35,13 @@
 #include <cmocka.h>
 #include <poll.h>
 
+#ifdef HAVE_VALGRIND_HELGRIND_H
+#include <valgrind/helgrind.h>
+#endif
+#ifndef ANNOTATE_BENIGN_RACE_SIZED
+#define ANNOTATE_BENIGN_RACE_SIZED(address, size, describtion)
+#endif
+
 struct pthreadpool_tevent_test {
 	struct tevent_context *ev;
 	struct pthreadpool_tevent *upool;
@@ -233,12 +244,435 @@ static void test_create(void **state)
 	assert_false(in_main_thread);
 }
 
+struct test_cancel_job {
+	int fdm; /* the main end of socketpair */
+	int fdj; /* the job end of socketpair */
+	bool started;
+	bool canceled;
+	bool orphaned;
+	bool finished;
+	size_t polls;
+	size_t timeouts;
+	int sleep_msec;
+	struct tevent_req *req;
+	bool completed;
+	int ret;
+};
+
+static void test_cancel_job_done(struct tevent_req *req);
+
+static int test_cancel_job_destructor(struct test_cancel_job *job)
+{
+	ANNOTATE_BENIGN_RACE_SIZED(&job->started,
+				   sizeof(job->started),
+				   "protected by pthreadpool_tevent code");
+	if (job->started) {
+		ANNOTATE_BENIGN_RACE_SIZED(&job->finished,
+					   sizeof(job->finished),
+					   "protected by pthreadpool_tevent code");
+		assert_true(job->finished);
+	}
+
+	ANNOTATE_BENIGN_RACE_SIZED(&job->fdj,
+				   sizeof(job->fdj),
+				   "protected by pthreadpool_tevent code");
+
+	if (job->fdm != -1) {
+		close(job->fdm);
+		job->fdm = -1;
+	}
+	if (job->fdj != -1) {
+		close(job->fdj);
+		job->fdj = -1;
+	}
+
+	return 0;
+}
+
+static struct test_cancel_job *test_cancel_job_create(TALLOC_CTX *mem_ctx)
+{
+	struct test_cancel_job *job = NULL;
+
+	job = talloc(mem_ctx, struct test_cancel_job);
+	if (job == NULL) {
+		return NULL;
+	}
+	*job = (struct test_cancel_job) {
+		.fdm = -1,
+		.fdj = -1,
+		.sleep_msec = 50,
+	};
+
+	talloc_set_destructor(job, test_cancel_job_destructor);
+	return job;
+}
+
+static void test_cancel_job_fn(void *ptr)
+{
+	struct test_cancel_job *job = (struct test_cancel_job *)ptr;
+	int fdj = -1;
+	char c = 0;
+	int ret;
+
+	assert_non_null(job); /* make sure we abort without a job pointer */
+
+	job->started = true;
+	fdj = job->fdj;
+	job->fdj = -1;
+
+	if (!pthreadpool_tevent_current_job_continue()) {
+		job->canceled = pthreadpool_tevent_current_job_canceled();
+		job->orphaned = pthreadpool_tevent_current_job_orphaned();
+		job->finished = true;
+		close(fdj);
+		return;
+	}
+
+	/*
+	 * Notify that we main thread
+	 *
+	 * write of 1 byte should always work!
+	 */
+	ret = write(fdj, &c, 1);
+	assert_int_equal(ret, 1);
+
+	/*
+	 * loop until the job was tried to
+	 * be canceled or becomes orphaned.
+	 *
+	 * If there's some activity on the fd
+	 * we directly finish.
+	 */
+	do {
+		struct pollfd pfd = {
+			.fd = fdj,
+			.events = POLLIN,
+		};
+
+		job->polls += 1;
+
+		ret = poll(&pfd, 1, job->sleep_msec);
+		if (ret == 1) {
+			job->finished = true;
+			close(fdj);
+			return;
+		}
+		assert_int_equal(ret, 0);
+
+		job->timeouts += 1;
+
+	} while (pthreadpool_tevent_current_job_continue());
+
+	job->canceled = pthreadpool_tevent_current_job_canceled();
+	job->orphaned = pthreadpool_tevent_current_job_orphaned();
+	job->finished = true;
+	close(fdj);
+}
+
+static void test_cancel_job_done(struct tevent_req *req)
+{
+	struct test_cancel_job *job =
+		tevent_req_callback_data(req,
+		struct test_cancel_job);
+
+	job->ret = pthreadpool_tevent_job_recv(job->req);
+	TALLOC_FREE(job->req);
+	job->completed = true;
+}
+
+static void test_cancel_job_wait(struct test_cancel_job *job,
+				 struct tevent_context *ev)
+{
+	/*
+	 * We have to keep looping until
+	 * test_cancel_job_done was triggered
+	 */
+	while (!job->completed) {
+		int ret;
+
+		ret = tevent_loop_once(ev);
+		assert_int_equal(ret, 0);
+	}
+}
+
+struct test_cancel_state {
+	struct test_cancel_job *job1;
+	struct test_cancel_job *job2;
+	struct test_cancel_job *job3;
+	struct test_cancel_job *job4;
+	struct test_cancel_job *job5;
+	struct test_cancel_job *job6;
+};
+
+static void test_cancel_job(void **private_data)
+{
+	struct pthreadpool_tevent_test *t = *private_data;
+	struct tevent_context *ev = t->ev;
+	struct pthreadpool_tevent *pool = t->opool;
+	struct test_cancel_state *state = NULL;
+	int ret;
+	bool ok;
+	int fdpair[2] = { -1, -1 };
+	char c = 0;
+
+	state = talloc_zero(t, struct test_cancel_state);
+	assert_non_null(state);
+	state->job1 = test_cancel_job_create(state);
+	assert_non_null(state->job1);
+	state->job2 = test_cancel_job_create(state);
+	assert_non_null(state->job2);
+	state->job3 = test_cancel_job_create(state);
+	assert_non_null(state->job3);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_int_equal(ret, 0);
+
+	state->job1->fdm = fdpair[0];
+	state->job1->fdj = fdpair[1];
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
+
+	will_return(__wrap_pthread_create, 0);
+	state->job1->req = pthreadpool_tevent_job_send(
+		state->job1, ev, pool, test_cancel_job_fn, state->job1);
+	assert_non_null(state->job1->req);
+	tevent_req_set_callback(state->job1->req,
+				test_cancel_job_done,
+				state->job1);
+
+	state->job2->req = pthreadpool_tevent_job_send(
+		state->job2, ev, pool, test_cancel_job_fn, NULL);
+	assert_non_null(state->job2->req);
+	tevent_req_set_callback(state->job2->req,
+				test_cancel_job_done,
+				state->job2);
+
+	state->job3->req = pthreadpool_tevent_job_send(
+		state->job3, ev, pool, test_cancel_job_fn, NULL);
+	assert_non_null(state->job3->req);
+	tevent_req_set_callback(state->job3->req,
+				test_cancel_job_done,
+				state->job3);
+
+	/*
+	 * Wait for the job 1 to start.
+	 */
+	ret = read(state->job1->fdm, &c, 1);
+	assert_int_equal(ret, 1);
+
+	/*
+	 * We cancel job 3 and destroy job2.
+	 * Both should never be executed.
+	 */
+	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 2);
+	TALLOC_FREE(state->job2->req);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 1);
+	ok = tevent_req_cancel(state->job3->req);
+	assert_true(ok);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
+
+	/*
+	 * Job 3 should complete as canceled, while
+	 * job 1 is still running.
+	 */
+	test_cancel_job_wait(state->job3, ev);
+	assert_int_equal(state->job3->ret, ECANCELED);
+	assert_null(state->job3->req);
+	assert_false(state->job3->started);
+
+	/*
+	 * Now job1 is canceled while it's running,
+	 * this should let it stop it's loop.
+	 */
+	ok = tevent_req_cancel(state->job1->req);
+	assert_false(ok);
+
+	/*
+	 * Job 1 completes, It got at least one sleep
+	 * timeout loop and has state->job1->canceled set.
+	 */
+	test_cancel_job_wait(state->job1, ev);
+	assert_int_equal(state->job1->ret, 0);
+	assert_null(state->job1->req);
+	assert_true(state->job1->started);
+	assert_true(state->job1->finished);
+	assert_true(state->job1->canceled);
+	assert_false(state->job1->orphaned);
+	assert_in_range(state->job1->polls, 1, 100);
+	assert_int_equal(state->job1->timeouts, state->job1->polls);
+
+	/*
+	 * Now we create jobs 4 and 5
+	 * Both should execute.
+	 * Job 4 is orphaned while running by a TALLOC_FREE()
+	 * This should stop job 4 and let job 5 start.
+	 * We do a "normal" exit in job 5 by creating some activity
+	 * on the socketpair.
+	 */
+
+	state->job4 = test_cancel_job_create(state);
+	assert_non_null(state->job4);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_int_equal(ret, 0);
+
+	state->job4->fdm = fdpair[0];
+	state->job4->fdj = fdpair[1];
+
+	state->job4->req = pthreadpool_tevent_job_send(
+		state->job4, ev, pool, test_cancel_job_fn, state->job4);
+	assert_non_null(state->job4->req);
+	tevent_req_set_callback(state->job4->req,
+				test_cancel_job_done,
+				state->job4);
+
+	state->job5 = test_cancel_job_create(state);
+	assert_non_null(state->job5);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_int_equal(ret, 0);
+
+	state->job5->fdm = fdpair[0];
+	state->job5->fdj = fdpair[1];
+
+	state->job5->req = pthreadpool_tevent_job_send(
+		state->job5, ev, pool, test_cancel_job_fn, state->job5);
+	assert_non_null(state->job5->req);
+	tevent_req_set_callback(state->job5->req,
+				test_cancel_job_done,
+				state->job5);
+
+	/*
+	 * Make sure job 5 can exit as soon as possible.
+	 * It will never get a sleep/poll timeout.
+	 */
+	ret = write(state->job5->fdm, &c, 1);
+	assert_int_equal(ret, 1);
+
+	/*
+	 * Wait for the job 4 to start
+	 */
+	ret = read(state->job4->fdm, &c, 1);
+	assert_int_equal(ret, 1);
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 1);
+
+	/*
+	 * destroy the request so that it's marked
+	 * as orphaned.
+	 */
+	TALLOC_FREE(state->job4->req);
+
+	/*
+	 * Job 5 completes, It got no sleep timeout loop.
+	 */
+	test_cancel_job_wait(state->job5, ev);
+	assert_int_equal(state->job5->ret, 0);
+	assert_null(state->job5->req);
+	assert_true(state->job5->started);
+	assert_true(state->job5->finished);
+	assert_false(state->job5->canceled);
+	assert_false(state->job5->orphaned);
+	assert_int_equal(state->job5->polls, 1);
+	assert_int_equal(state->job5->timeouts, 0);
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
+
+	/*
+	 * Job 2 is still not executed as we did a TALLOC_FREE()
+	 * before is was scheduled.
+	 */
+	assert_false(state->job2->completed);
+	assert_false(state->job2->started);
+
+	/*
+	 * Job 4 is still wasn't completed as we did a TALLOC_FREE()
+	 * while it is was running. but it was started and has
+	 * orphaned set
+	 */
+	assert_false(state->job4->completed);
+	assert_true(state->job4->started);
+	assert_true(state->job4->finished);
+	assert_false(state->job4->canceled);
+	assert_true(state->job4->orphaned);
+	assert_in_range(state->job4->polls, 1, 100);
+	assert_int_equal(state->job4->timeouts, state->job4->polls);
+
+	/*
+	 * Now we create jobs 6
+	 * We destroy the pool while it's executing.
+	 */
+
+	state->job6 = test_cancel_job_create(state);
+	assert_non_null(state->job6);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_int_equal(ret, 0);
+
+	state->job6->fdm = fdpair[0];
+	state->job6->fdj = fdpair[1];
+
+	state->job6->req = pthreadpool_tevent_job_send(
+		state->job6, ev, pool, test_cancel_job_fn, state->job6);
+	assert_non_null(state->job6->req);
+	tevent_req_set_callback(state->job6->req,
+				test_cancel_job_done,
+				state->job6);
+
+	/*
+	 * Wait for the job 6 to start
+	 */
+	ret = read(state->job6->fdm, &c, 1);
+	assert_int_equal(ret, 1);
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
+
+	/*
+	 * destroy the request so that it's marked
+	 * as orphaned.
+	 */
+	pool = NULL;
+	TALLOC_FREE(t->opool);
+
+	/*
+	 * Wait until the job finished.
+	 */
+	ret = read(state->job6->fdm, &c, 1);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Job 6 is still dangling arround.
+	 *
+	 * We need to convince valgrind --tool={drd,helgrind}
+	 * that the read above is good enough to be
+	 * sure the job is finished and closed the other end of
+	 * the socketpair.
+	 */
+	ANNOTATE_BENIGN_RACE_SIZED(state->job6,
+				   sizeof(*state->job6),
+				   "protected by thread fence");
+	assert_non_null(state->job6->req);
+	assert_true(tevent_req_is_in_progress(state->job6->req));
+	assert_false(state->job6->completed);
+	assert_true(state->job6->started);
+	assert_true(state->job6->finished);
+	assert_false(state->job6->canceled);
+	assert_true(state->job6->orphaned);
+	assert_in_range(state->job6->polls, 1, 100);
+	assert_int_equal(state->job6->timeouts, state->job4->polls);
+
+	TALLOC_FREE(state);
+}
+
 int main(int argc, char **argv)
 {
 	const struct CMUnitTest tests[] = {
 		cmocka_unit_test_setup_teardown(test_create,
 						setup_pthreadpool_tevent,
 						teardown_pthreadpool_tevent),
+		cmocka_unit_test_setup_teardown(test_cancel_job,
+						setup_pthreadpool_tevent,
+						teardown_pthreadpool_tevent),
 	};
 
 	cmocka_set_message_output(CM_OUTPUT_SUBUNIT);
-- 
2.17.1


From cd871a7142f7ea3acbd0467ccdb6a25d17f80b18 Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Tue, 13 Mar 2018 16:58:49 +0100
Subject: [PATCH 15/23] configure: check for Linux specific unshare() with
 CLONE_FS

Note we still need some kind of runtime detection as
it can fail in some constraint container setups, which
reject the whole unshare() syscall instead of just the
once used for container features.

In case unshare(CLONE_FS) works, we can have a per thread
current working directory and use [f]chdir() safely in
worker threads.

Pair-Programmed-With: Stefan Metzmacher <metze at samba.org>

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 source3/wscript | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/source3/wscript b/source3/wscript
index aed784ef0179..a14d76d7469e 100644
--- a/source3/wscript
+++ b/source3/wscript
@@ -1505,6 +1505,11 @@ main() {
                 legacy_quota_libs = ''
     conf.env['legacy_quota_libs'] = legacy_quota_libs
 
+    conf.CHECK_CODE('(void)unshare(CLONE_FS);',
+                    headers='sched.h',
+                    define='HAVE_UNSHARE_CLONE_FS',
+                    msg='for Linux unshare(CLONE_FS)')
+
     #
     # cluster support (CTDB)
     #
-- 
2.17.1


From 6f7dc7762135607ac43ef51f6a95dc0aff6de19b Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Tue, 13 Mar 2018 16:59:32 +0100
Subject: [PATCH 16/23] pthreadpool: call unshare(CLONE_FS) if available

This paves the way for pthreadpool jobs that are path based.

Callers can use pthreadpool_per_thread_cwd() to check if
the current pool supports it.

Pair-Programmed-With: Stefan Metzmacher <metze at samba.org>

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool.c      | 34 ++++++++++++++++++++++++++++++
 lib/pthreadpool/pthreadpool.h      | 17 +++++++++++++++
 lib/pthreadpool/pthreadpool_sync.c |  5 +++++
 3 files changed, 56 insertions(+)

diff --git a/lib/pthreadpool/pthreadpool.c b/lib/pthreadpool/pthreadpool.c
index 528f4cdfd678..127e684c63e3 100644
--- a/lib/pthreadpool/pthreadpool.c
+++ b/lib/pthreadpool/pthreadpool.c
@@ -112,10 +112,13 @@ struct pthreadpool {
 	 * where the forking thread will unlock it again.
 	 */
 	pthread_mutex_t fork_mutex;
+
+	bool per_thread_cwd;
 };
 
 static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
 static struct pthreadpool *pthreadpools = NULL;
+static bool pthreadpool_support_thread_cwd = false;
 static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
 
 static void pthreadpool_prep_atfork(void);
@@ -182,6 +185,11 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 	pool->max_threads = max_threads;
 	pool->num_idle = 0;
 	pool->prefork_cond = NULL;
+	if (max_threads != 0) {
+		pool->per_thread_cwd = pthreadpool_support_thread_cwd;
+	} else {
+		pool->per_thread_cwd = false;
+	}
 
 	ret = pthread_mutex_lock(&pthreadpools_mutex);
 	if (ret != 0) {
@@ -241,6 +249,15 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
 	return ret;
 }
 
+bool pthreadpool_per_thread_cwd(struct pthreadpool *pool)
+{
+	if (pool->stopped) {
+		return false;
+	}
+
+	return pool->per_thread_cwd;
+}
+
 static void pthreadpool_prepare_pool(struct pthreadpool *pool)
 {
 	int ret;
@@ -359,6 +376,16 @@ static void pthreadpool_child(void)
 
 static void pthreadpool_prep_atfork(void)
 {
+#ifdef HAVE_UNSHARE_CLONE_FS
+	int res;
+
+	/* remember if unshare(CLONE_FS) works. */
+	res = unshare(CLONE_FS);
+	if (res == 0) {
+		pthreadpool_support_thread_cwd = true;
+	}
+#endif
+
 	pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
 		       pthreadpool_child);
 }
@@ -571,6 +598,13 @@ static void *pthreadpool_server(void *arg)
 	struct pthreadpool *pool = (struct pthreadpool *)arg;
 	int res;
 
+#ifdef HAVE_UNSHARE_CLONE_FS
+	if (pool->per_thread_cwd) {
+		res = unshare(CLONE_FS);
+		assert(res == 0);
+	}
+#endif
+
 	res = pthread_mutex_lock(&pool->mutex);
 	if (res != 0) {
 		return NULL;
diff --git a/lib/pthreadpool/pthreadpool.h b/lib/pthreadpool/pthreadpool.h
index b4733580e07b..d8daf9e4519b 100644
--- a/lib/pthreadpool/pthreadpool.h
+++ b/lib/pthreadpool/pthreadpool.h
@@ -71,6 +71,23 @@ size_t pthreadpool_max_threads(struct pthreadpool *pool);
  */
 size_t pthreadpool_queued_jobs(struct pthreadpool *pool);
 
+/**
+ * @brief Check for per thread current working directory support of pthreadpool
+ *
+ * Since Linux kernel 2.6.16, unshare(CLONE_FS) is supported,
+ * which provides a per thread current working directory
+ * and allows [f]chdir() within the worker threads.
+ *
+ * Note that this doesn't work on some contraint container setups,
+ * the complete unshare() syscall might be rejected.
+ * pthreadpool_per_thread_cwd() returns what is available
+ * at runtime, so the callers should really check this!
+ *
+ * @param[in]	pool		The pool to run the job on
+ * @return			supported: true, otherwise: false
+ */
+bool pthreadpool_per_thread_cwd(struct pthreadpool *pool);
+
 /**
  * @brief Stop a pthreadpool
  *
diff --git a/lib/pthreadpool/pthreadpool_sync.c b/lib/pthreadpool/pthreadpool_sync.c
index 48e6a0ddb604..2ed6f36dbbc7 100644
--- a/lib/pthreadpool/pthreadpool_sync.c
+++ b/lib/pthreadpool/pthreadpool_sync.c
@@ -65,6 +65,11 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
 	return 0;
 }
 
+bool pthreadpool_per_thread_cwd(struct pthreadpool *pool)
+{
+	return false;
+}
+
 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 			void (*fn)(void *private_data), void *private_data)
 {
-- 
2.17.1


From 8b8f65aeda7570b2e961bc4b865ad42ce53b4f80 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Fri, 22 Jun 2018 01:02:41 +0200
Subject: [PATCH 17/23] pthreadpool: add
 pthreadpool_tevent_[current_job_]per_thread_cwd()

This can be used to check if worker threads run with
unshare(CLONE_FS).

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool_tevent.c | 26 ++++++++++++++++++++++++++
 lib/pthreadpool/pthreadpool_tevent.h |  7 +++++++
 2 files changed, 33 insertions(+)

diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index 94b6b9ded8ff..01e8586b384d 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -210,6 +210,8 @@ struct pthreadpool_tevent_job {
 		 */
 		bool signaled;
 	} needs_fence;
+
+	bool per_thread_cwd;
 };
 
 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
@@ -283,6 +285,15 @@ size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
 	return pthreadpool_queued_jobs(pool->pool);
 }
 
+bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool)
+{
+	if (pool->pool == NULL) {
+		return false;
+	}
+
+	return pthreadpool_per_thread_cwd(pool->pool);
+}
+
 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 {
 	struct pthreadpool_tevent_job *job = NULL;
@@ -701,6 +712,7 @@ struct tevent_req *pthreadpool_tevent_job_send(
 		return tevent_req_post(req, ev);
 	}
 	PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(job);
+	job->per_thread_cwd = pthreadpool_tevent_per_thread_cwd(pool);
 	talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
 	DLIST_ADD_END(job->pool->jobs, job);
 	job->state = state;
@@ -772,6 +784,20 @@ bool pthreadpool_tevent_current_job_continue(void)
 	return true;
 }
 
+bool pthreadpool_tevent_current_job_per_thread_cwd(void)
+{
+	if (current_job == NULL) {
+		/*
+		 * Should only be called from within
+		 * the job function.
+		 */
+		abort();
+		return false;
+	}
+
+	return current_job->per_thread_cwd;
+}
+
 static void pthreadpool_tevent_job_fn(void *private_data)
 {
 	struct pthreadpool_tevent_job *job =
diff --git a/lib/pthreadpool/pthreadpool_tevent.h b/lib/pthreadpool/pthreadpool_tevent.h
index 37e491e17c47..ff2ab7cfb73d 100644
--- a/lib/pthreadpool/pthreadpool_tevent.h
+++ b/lib/pthreadpool/pthreadpool_tevent.h
@@ -31,6 +31,7 @@ int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
 
 size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool);
 size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool);
+bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool);
 
 /*
  * return true - if tevent_req_cancel() was called.
@@ -46,6 +47,12 @@ bool pthreadpool_tevent_current_job_orphaned(void);
  */
 bool pthreadpool_tevent_current_job_continue(void);
 
+/*
+ * return true if the current job can rely on a per thread
+ * current working directory.
+ */
+bool pthreadpool_tevent_current_job_per_thread_cwd(void);
+
 struct tevent_req *pthreadpool_tevent_job_send(
 	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
 	struct pthreadpool_tevent *pool,
-- 
2.17.1


From b4e7b97abc52fd46c33e6470c8e09943c1b38da1 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Wed, 18 Jul 2018 10:21:22 +0200
Subject: [PATCH 18/23] pthreadpool: add tests for
 pthreadpool_tevent_[current_job_]per_thread_cwd()

Note this currently this doesn't enforce the support for
unshare(CLONE_FS) as some contraint container environment
(e.g. docker) reject the whole unshare() system call.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/tests_cmocka.c | 144 +++++++++++++++++++++++++++++++++
 1 file changed, 144 insertions(+)

diff --git a/lib/pthreadpool/tests_cmocka.c b/lib/pthreadpool/tests_cmocka.c
index dc7b1150b5c0..5c7f6ab6904b 100644
--- a/lib/pthreadpool/tests_cmocka.c
+++ b/lib/pthreadpool/tests_cmocka.c
@@ -244,6 +244,147 @@ static void test_create(void **state)
 	assert_false(in_main_thread);
 }
 
+static void test_per_thread_cwd_job(void *ptr)
+{
+	const bool *per_thread_cwd_ptr = ptr;
+	bool per_thread_cwd;
+	char cwdbuf[PATH_MAX] = {0,};
+	char *cwdstr = NULL;
+	int ret;
+
+	/*
+	 * This needs to be consistent.
+	 */
+	per_thread_cwd = pthreadpool_tevent_current_job_per_thread_cwd();
+	assert_int_equal(per_thread_cwd, *per_thread_cwd_ptr);
+
+	if (!per_thread_cwd) {
+		return;
+	}
+
+	/*
+	 * Check we're not already in "/".
+	 */
+	cwdstr = getcwd(cwdbuf, sizeof(cwdbuf));
+	assert_non_null(cwdstr);
+	assert_string_not_equal(cwdstr, "/");
+
+	ret = chdir("/");
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Check we're in "/" now.
+	 */
+	cwdstr = getcwd(cwdbuf, sizeof(cwdbuf));
+	assert_non_null(cwdstr);
+	assert_string_equal(cwdstr, "/");
+}
+
+static int test_per_thread_cwd_do(struct tevent_context *ev,
+				  struct pthreadpool_tevent *pool)
+{
+	struct tevent_req *req;
+	bool per_thread_cwd;
+	bool ok;
+	int ret;
+	per_thread_cwd = pthreadpool_tevent_per_thread_cwd(pool);
+
+	req = pthreadpool_tevent_job_send(
+		ev, ev, pool, test_per_thread_cwd_job, &per_thread_cwd);
+	if (req == NULL) {
+		fprintf(stderr, "pthreadpool_tevent_job_send failed\n");
+		return ENOMEM;
+	}
+
+	ok = tevent_req_poll(req, ev);
+	if (!ok) {
+		ret = errno;
+		fprintf(stderr, "tevent_req_poll failed: %s\n",
+			strerror(ret));
+		return ret;
+	}
+
+	ret = pthreadpool_tevent_job_recv(req);
+	TALLOC_FREE(req);
+	if (ret != 0) {
+		fprintf(stderr, "tevent_req_recv failed: %s\n",
+			strerror(ret));
+		return ret;
+	}
+
+	return 0;
+}
+
+static void test_per_thread_cwd(void **state)
+{
+	struct pthreadpool_tevent_test *t = *state;
+	int ret;
+	bool per_thread_cwd_u;
+	bool per_thread_cwd_o;
+	bool per_thread_cwd_s;
+	char cwdbuf1[PATH_MAX] = {0,};
+	char *cwdstr1 = NULL;
+	char cwdbuf2[PATH_MAX] = {0,};
+	char *cwdstr2 = NULL;
+
+	/*
+	 * The unlimited and one pools
+	 * should be consistent.
+	 *
+	 * We can't enforce this as some constraint
+	 * container environments disable unshare()
+	 * completely, even just with CLONE_FS.
+	 */
+	per_thread_cwd_u = pthreadpool_tevent_per_thread_cwd(t->upool);
+	per_thread_cwd_o = pthreadpool_tevent_per_thread_cwd(t->opool);
+	assert_int_equal(per_thread_cwd_u, per_thread_cwd_o);
+
+	/*
+	 * The sync pool should never support this.
+	 */
+	per_thread_cwd_s = pthreadpool_tevent_per_thread_cwd(t->spool);
+	assert_false(per_thread_cwd_s);
+
+	/*
+	 * Check we're not already in "/".
+	 */
+	cwdstr1 = getcwd(cwdbuf1, sizeof(cwdbuf1));
+	assert_non_null(cwdstr1);
+	assert_string_not_equal(cwdstr1, "/");
+
+	will_return(__wrap_pthread_create, 0);
+	ret = test_per_thread_cwd_do(t->ev, t->upool);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Check we're still in the same directory.
+	 */
+	cwdstr2 = getcwd(cwdbuf2, sizeof(cwdbuf2));
+	assert_non_null(cwdstr2);
+	assert_string_equal(cwdstr2, cwdstr1);
+
+	will_return(__wrap_pthread_create, 0);
+	ret = test_per_thread_cwd_do(t->ev, t->opool);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Check we're still in the same directory.
+	 */
+	cwdstr2 = getcwd(cwdbuf2, sizeof(cwdbuf2));
+	assert_non_null(cwdstr2);
+	assert_string_equal(cwdstr2, cwdstr1);
+
+	ret = test_per_thread_cwd_do(t->ev, t->spool);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Check we're still in the same directory.
+	 */
+	cwdstr2 = getcwd(cwdbuf2, sizeof(cwdbuf2));
+	assert_non_null(cwdstr2);
+	assert_string_equal(cwdstr2, cwdstr1);
+}
+
 struct test_cancel_job {
 	int fdm; /* the main end of socketpair */
 	int fdj; /* the job end of socketpair */
@@ -670,6 +811,9 @@ int main(int argc, char **argv)
 		cmocka_unit_test_setup_teardown(test_create,
 						setup_pthreadpool_tevent,
 						teardown_pthreadpool_tevent),
+		cmocka_unit_test_setup_teardown(test_per_thread_cwd,
+						setup_pthreadpool_tevent,
+						teardown_pthreadpool_tevent),
 		cmocka_unit_test_setup_teardown(test_cancel_job,
 						setup_pthreadpool_tevent,
 						teardown_pthreadpool_tevent),
-- 
2.17.1


From 9e85522ca5d26767b4c8e035166c54942cf92584 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Mon, 16 Jul 2018 14:43:01 +0200
Subject: [PATCH 19/23] pthreadpool: add
 pthreadpool_restart_check[_monitor_{fd,drain}]()

This makes it possible to monitor the pthreadpool for exited worker
threads and may restart new threads from the main thread again.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool.c      | 281 +++++++++++++++++++++++++++++
 lib/pthreadpool/pthreadpool.h      |  64 +++++++
 lib/pthreadpool/pthreadpool_sync.c |  20 ++
 3 files changed, 365 insertions(+)

diff --git a/lib/pthreadpool/pthreadpool.c b/lib/pthreadpool/pthreadpool.c
index 127e684c63e3..db3837cbda37 100644
--- a/lib/pthreadpool/pthreadpool.c
+++ b/lib/pthreadpool/pthreadpool.c
@@ -23,6 +23,7 @@
 #include "system/threads.h"
 #include "pthreadpool.h"
 #include "lib/util/dlinklist.h"
+#include "lib/util/blocking.h"
 
 #ifdef NDEBUG
 #undef NDEBUG
@@ -52,6 +53,8 @@ struct pthreadpool {
 	 */
 	pthread_cond_t condvar;
 
+	int check_pipefd[2];
+
 	/*
 	 * Array of jobs
 	 */
@@ -136,6 +139,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 {
 	struct pthreadpool *pool;
 	int ret;
+	bool ok;
 
 	pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
 	if (pool == NULL) {
@@ -153,10 +157,52 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 		return ENOMEM;
 	}
 
+	ret = pipe(pool->check_pipefd);
+	if (ret != 0) {
+		free(pool->jobs);
+		free(pool);
+		return ENOMEM;
+	}
+
+	ok = smb_set_close_on_exec(pool->check_pipefd[0]);
+	if (!ok) {
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
+		free(pool->jobs);
+		free(pool);
+		return EINVAL;
+	}
+	ok = smb_set_close_on_exec(pool->check_pipefd[1]);
+	if (!ok) {
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
+		free(pool->jobs);
+		free(pool);
+		return EINVAL;
+	}
+	ret = set_blocking(pool->check_pipefd[0], true);
+	if (ret == -1) {
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
+		free(pool->jobs);
+		free(pool);
+		return EINVAL;
+	}
+	ret = set_blocking(pool->check_pipefd[1], false);
+	if (ret == -1) {
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
+		free(pool->jobs);
+		free(pool);
+		return EINVAL;
+	}
+
 	pool->head = pool->num_jobs = 0;
 
 	ret = pthread_mutex_init(&pool->mutex, NULL);
 	if (ret != 0) {
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
 		free(pool->jobs);
 		free(pool);
 		return ret;
@@ -165,6 +211,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 	ret = pthread_cond_init(&pool->condvar, NULL);
 	if (ret != 0) {
 		pthread_mutex_destroy(&pool->mutex);
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
 		free(pool->jobs);
 		free(pool);
 		return ret;
@@ -174,6 +222,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 	if (ret != 0) {
 		pthread_cond_destroy(&pool->condvar);
 		pthread_mutex_destroy(&pool->mutex);
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
 		free(pool->jobs);
 		free(pool);
 		return ret;
@@ -196,6 +246,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 		pthread_mutex_destroy(&pool->fork_mutex);
 		pthread_cond_destroy(&pool->condvar);
 		pthread_mutex_destroy(&pool->mutex);
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
 		free(pool->jobs);
 		free(pool);
 		return ret;
@@ -359,6 +411,14 @@ static void pthreadpool_child(void)
 		pool->head = 0;
 		pool->num_jobs = 0;
 		pool->stopped = true;
+		if (pool->check_pipefd[0] != -1) {
+			close(pool->check_pipefd[0]);
+			pool->check_pipefd[0] = -1;
+		}
+		if (pool->check_pipefd[1] != -1) {
+			close(pool->check_pipefd[1]);
+			pool->check_pipefd[1] = -1;
+		}
 
 		ret = pthread_cond_init(&pool->condvar, NULL);
 		assert(ret == 0);
@@ -421,6 +481,14 @@ static int pthreadpool_free(struct pthreadpool *pool)
 		return ret2;
 	}
 
+	if (pool->check_pipefd[0] != -1) {
+		close(pool->check_pipefd[0]);
+		pool->check_pipefd[0] = -1;
+	}
+	if (pool->check_pipefd[1] != -1) {
+		close(pool->check_pipefd[1]);
+		pool->check_pipefd[1] = -1;
+	}
 	free(pool->jobs);
 	free(pool);
 
@@ -437,6 +505,15 @@ static int pthreadpool_stop_locked(struct pthreadpool *pool)
 
 	pool->stopped = true;
 
+	if (pool->check_pipefd[0] != -1) {
+		close(pool->check_pipefd[0]);
+		pool->check_pipefd[0] = -1;
+	}
+	if (pool->check_pipefd[1] != -1) {
+		close(pool->check_pipefd[1]);
+		pool->check_pipefd[1] = -1;
+	}
+
 	if (pool->num_threads == 0) {
 		return 0;
 	}
@@ -521,6 +598,33 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
 
 	free_it = (pool->destroyed && (pool->num_threads == 0));
 
+	while (true) {
+		uint8_t c = 0;
+		ssize_t nwritten = 0;
+
+		if (pool->check_pipefd[1] == -1) {
+			break;
+		}
+
+		nwritten = write(pool->check_pipefd[1], &c, 1);
+		if (nwritten == -1) {
+			if (errno == EINTR) {
+				continue;
+			}
+			if (errno == EAGAIN) {
+				break;
+			}
+#ifdef EWOULDBLOCK
+			if (errno == EWOULDBLOCK) {
+				break;
+			}
+#endif
+			/* ignore ... */
+		}
+
+		break;
+	}
+
 	ret = pthread_mutex_unlock(&pool->mutex);
 	assert(ret == 0);
 
@@ -851,6 +955,183 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 	return res;
 }
 
+int pthreadpool_restart_check(struct pthreadpool *pool)
+{
+	int res;
+	int unlock_res;
+	unsigned possible_threads = 0;
+	unsigned missing_threads = 0;
+
+	assert(!pool->destroyed);
+
+	res = pthread_mutex_lock(&pool->mutex);
+	if (res != 0) {
+		return res;
+	}
+
+	if (pool->stopped) {
+		/*
+		 * Protect against the pool being shut down while
+		 * trying to add a job
+		 */
+		unlock_res = pthread_mutex_unlock(&pool->mutex);
+		assert(unlock_res == 0);
+		return EINVAL;
+	}
+
+	if (pool->num_jobs == 0) {
+		/*
+		 * This also handles the pool->max_threads == 0 case as it never
+		 * calls pthreadpool_put_job()
+		 */
+		unlock_res = pthread_mutex_unlock(&pool->mutex);
+		assert(unlock_res == 0);
+		return 0;
+	}
+
+	if (pool->num_idle > 0) {
+		/*
+		 * We have idle threads and pending jobs,
+		 * this means we better let all threads
+		 * start and check for pending jobs.
+		 */
+		res = pthread_cond_broadcast(&pool->condvar);
+		assert(res == 0);
+	}
+
+	if (pool->num_threads < pool->max_threads) {
+		possible_threads = pool->max_threads - pool->num_threads;
+	}
+
+	if (pool->num_idle < pool->num_jobs) {
+		missing_threads = pool->num_jobs - pool->num_idle;
+	}
+
+	missing_threads = MIN(missing_threads, possible_threads);
+
+	while (missing_threads > 0) {
+
+		res = pthreadpool_create_thread(pool);
+		if (res != 0) {
+			break;
+		}
+
+		missing_threads--;
+	}
+
+	if (missing_threads == 0) {
+		/*
+		 * Ok, we recreated all thread we need.
+		 */
+		unlock_res = pthread_mutex_unlock(&pool->mutex);
+		assert(unlock_res == 0);
+		return 0;
+	}
+
+	if (pool->num_threads != 0) {
+		/*
+		 * At least one thread is still available, let
+		 * that one run the queued jobs.
+		 */
+		unlock_res = pthread_mutex_unlock(&pool->mutex);
+		assert(unlock_res == 0);
+		return 0;
+	}
+
+	/*
+	 * There's no thread available to run any pending jobs.
+	 * The caller may want to cancel the jobs and destroy the pool.
+	 * But that's up to the caller.
+	 */
+	unlock_res = pthread_mutex_unlock(&pool->mutex);
+	assert(unlock_res == 0);
+
+	return res;
+}
+
+int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool)
+{
+	int fd;
+	int ret;
+	bool ok;
+
+	if (pool->stopped) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	if (pool->check_pipefd[0] == -1) {
+		errno = ENOSYS;
+		return -1;
+	}
+
+	fd = dup(pool->check_pipefd[0]);
+	if (fd == -1) {
+		return -1;
+	}
+
+	ok = smb_set_close_on_exec(fd);
+	if (!ok) {
+		int saved_errno = errno;
+		close(fd);
+		errno = saved_errno;
+		return -1;
+	}
+
+	ret = set_blocking(fd, false);
+	if (ret == -1) {
+		int saved_errno = errno;
+		close(fd);
+		errno = saved_errno;
+		return -1;
+	}
+
+	return fd;
+}
+
+int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool)
+{
+	if (pool->stopped) {
+		return EINVAL;
+	}
+
+	if (pool->check_pipefd[0] == -1) {
+		return ENOSYS;
+	}
+
+	while (true) {
+		uint8_t buf[128];
+		ssize_t nread;
+
+		nread = read(pool->check_pipefd[0], buf, sizeof(buf));
+		if (nread == -1) {
+			if (errno == EINTR) {
+				continue;
+			}
+			if (errno == EAGAIN) {
+				return 0;
+			}
+#ifdef EWOULDBLOCK
+			if (errno == EWOULDBLOCK) {
+				return 0;
+			}
+#endif
+			if (errno == 0) {
+				errno = INT_MAX;
+			}
+
+			return errno;
+		}
+
+		if (nread < sizeof(buf)) {
+			return 0;
+		}
+	}
+
+	abort();
+	return INT_MAX;
+}
+
 size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
 			      void (*fn)(void *private_data), void *private_data)
 {
diff --git a/lib/pthreadpool/pthreadpool.h b/lib/pthreadpool/pthreadpool.h
index d8daf9e4519b..543567ceaf78 100644
--- a/lib/pthreadpool/pthreadpool.h
+++ b/lib/pthreadpool/pthreadpool.h
@@ -144,6 +144,70 @@ int pthreadpool_destroy(struct pthreadpool *pool);
 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 			void (*fn)(void *private_data), void *private_data);
 
+/**
+ * @brief Check if the pthreadpool needs a restart.
+ *
+ * This checks if there are enough threads to run the already
+ * queued jobs. This should be called only the callers signal_fn
+ * (passed to pthreadpool_init()) returned an error, so
+ * that the job's worker thread exited.
+ *
+ * Typically this is called once the file destriptor
+ * returned by pthreadpool_restart_check_monitor_fd()
+ * became readable and pthreadpool_restart_check_monitor_drain()
+ * returned success.
+ *
+ * This function tries to restart the missing threads.
+ *
+ * @param[in]	pool		The pool to run the job on
+ * @return			success: 0, failure: errno
+ *
+ * @see pthreadpool_restart_check_monitor_fd
+ * @see pthreadpool_restart_check_monitor_drain
+ */
+int pthreadpool_restart_check(struct pthreadpool *pool);
+
+/**
+ * @brief Return a file destriptor that monitors the pool.
+ *
+ * If the file destrictor becomes readable,
+ * the event handler should call pthreadpool_restart_check_monitor_drain().
+ *
+ * pthreadpool_restart_check() should also be called once the
+ * state is drained.
+ *
+ * This function returns a fresh fd using dup() each time.
+ *
+ * If the pool doesn't require restarts, this function
+ * returns -1 and sets errno = ENOSYS. The caller
+ * may ignore that situation.
+ *
+ * @param[in]	pool		The pool to run the job on
+ * @return			success: 0, failure: -1 (set errno)
+ *
+ * @see pthreadpool_restart_check_monitor_fd
+ * @see pthreadpool_restart_check_monitor_drain
+ */
+int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool);
+
+/**
+ * @brief Drain the monitor file destriptor of the pool.
+ *
+ * If the file destrictor returned by pthreadpool_restart_check_monitor_fd()
+ * becomes readable, pthreadpool_restart_check_monitor_drain() should be
+ * called before pthreadpool_restart_check().
+ *
+ * If this function returns an error the caller should close
+ * the file destriptor it got from pthreadpool_restart_check_monitor_fd().
+ *
+ * @param[in]	pool		The pool to run the job on
+ * @return			success: 0, failure: errno
+ *
+ * @see pthreadpool_restart_check_monitor_fd
+ * @see pthreadpool_restart_check
+ */
+int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool);
+
 /**
  * @brief Try to cancel a job in a pthreadpool
  *
diff --git a/lib/pthreadpool/pthreadpool_sync.c b/lib/pthreadpool/pthreadpool_sync.c
index 2ed6f36dbbc7..a476ea712c3a 100644
--- a/lib/pthreadpool/pthreadpool_sync.c
+++ b/lib/pthreadpool/pthreadpool_sync.c
@@ -83,6 +83,26 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 			       pool->signal_fn_private_data);
 }
 
+int pthreadpool_restart_check(struct pthreadpool *pool)
+{
+	if (pool->stopped) {
+		return EINVAL;
+	}
+
+	return 0;
+}
+
+int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool)
+{
+	errno = ENOSYS;
+	return -1;
+}
+
+int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool)
+{
+	return EINVAL;
+}
+
 size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
 			      void (*fn)(void *private_data), void *private_data)
 {
-- 
2.17.1


From 5dd3b3d0211ae3c1845e7f707737d789ee8fc0fc Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Fri, 20 Apr 2018 17:12:07 +0200
Subject: [PATCH 20/23] pthreadpool: implement
 pthreadpool_tevent_wrapper_create() infrastructure

This can be used implement a generic per thread impersonation
for thread pools.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool_tevent.c | 402 ++++++++++++++++++++++++++-
 lib/pthreadpool/pthreadpool_tevent.h |  32 +++
 2 files changed, 433 insertions(+), 1 deletion(-)

diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index 01e8586b384d..19b1e6d9650a 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -103,6 +103,8 @@ struct pthreadpool_tevent_glue {
 	/* Tuple we are keeping track of in this list. */
 	struct tevent_context *ev;
 	struct tevent_threaded_context *tctx;
+	/* recheck monitor fd event */
+	struct tevent_fd *fde;
 	/* Pointer to link object owned by *ev. */
 	struct pthreadpool_tevent_glue_ev_link *ev_link;
 	/* active jobs */
@@ -122,11 +124,33 @@ struct pthreadpool_tevent_glue_ev_link {
 	struct pthreadpool_tevent_glue *glue;
 };
 
+struct pthreadpool_tevent_wrapper {
+	struct pthreadpool_tevent *main_tp;
+	struct pthreadpool_tevent *wrap_tp;
+	const struct pthreadpool_tevent_wrapper_ops *ops;
+	void *private_state;
+	bool force_per_thread_cwd;
+};
+
 struct pthreadpool_tevent {
+	struct pthreadpool_tevent *prev, *next;
+
 	struct pthreadpool *pool;
 	struct pthreadpool_tevent_glue *glue_list;
 
 	struct pthreadpool_tevent_job *jobs;
+
+	struct {
+		/*
+		 * This is used on the main context
+		 */
+		struct pthreadpool_tevent *list;
+
+		/*
+		 * This is used on the wrapper context
+		 */
+		struct pthreadpool_tevent_wrapper *ctx;
+	} wrapper;
 };
 
 struct pthreadpool_tevent_job_state {
@@ -141,6 +165,7 @@ struct pthreadpool_tevent_job {
 	struct pthreadpool_tevent_job *prev, *next;
 
 	struct pthreadpool_tevent *pool;
+	struct pthreadpool_tevent_wrapper *wrapper;
 	struct pthreadpool_tevent_job_state *state;
 	struct tevent_immediate *im;
 
@@ -180,6 +205,15 @@ struct pthreadpool_tevent_job {
 		 */
 		bool started;
 
+		/*
+		 * 'wrapper'
+		 * set before calling the wrapper before_job() or
+		 * after_job() hooks.
+		 * unset again check the hook finished.
+		 * (only written by job thread!)
+		 */
+		bool wrapper;
+
 		/*
 		 * 'executed'
 		 * set once the job function returned.
@@ -209,6 +243,18 @@ struct pthreadpool_tevent_job {
 		 * (only written by job thread!)
 		 */
 		bool signaled;
+
+		/*
+		 * 'exit_thread'
+		 * maybe set during pthreadpool_tevent_job_fn()
+		 * if some wrapper related code generated an error
+		 * and the environment isn't safe anymore.
+		 *
+		 * In such a case pthreadpool_tevent_job_signal()
+		 * will pick this up and therminate the current
+		 * worker thread by returning -1.
+		 */
+		bool exit_thread; /* only written/read by job thread! */
 	} needs_fence;
 
 	bool per_thread_cwd;
@@ -267,8 +313,22 @@ int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
 	return 0;
 }
 
+static struct pthreadpool_tevent *pthreadpool_tevent_unwrap(
+	struct pthreadpool_tevent *pool)
+{
+	struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
+
+	if (wrapper != NULL) {
+		return wrapper->main_tp;
+	}
+
+	return pool;
+}
+
 size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
 {
+	pool = pthreadpool_tevent_unwrap(pool);
+
 	if (pool->pool == NULL) {
 		return 0;
 	}
@@ -278,6 +338,8 @@ size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
 
 size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
 {
+	pool = pthreadpool_tevent_unwrap(pool);
+
 	if (pool->pool == NULL) {
 		return 0;
 	}
@@ -287,6 +349,14 @@ size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
 
 bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool)
 {
+	struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
+
+	if (wrapper != NULL && wrapper->force_per_thread_cwd) {
+		return true;
+	}
+
+	pool = pthreadpool_tevent_unwrap(pool);
+
 	if (pool->pool == NULL) {
 		return false;
 	}
@@ -298,21 +368,94 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 {
 	struct pthreadpool_tevent_job *job = NULL;
 	struct pthreadpool_tevent_job *njob = NULL;
+	struct pthreadpool_tevent *wrap_tp = NULL;
+	struct pthreadpool_tevent *nwrap_tp = NULL;
 	struct pthreadpool_tevent_glue *glue = NULL;
 	int ret;
 
+	if (pool->wrapper.ctx != NULL) {
+		struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
+
+		pool->wrapper.ctx = NULL;
+		pool = wrapper->main_tp;
+
+		DLIST_REMOVE(pool->wrapper.list, wrapper->wrap_tp);
+
+		for (job = pool->jobs; job != NULL; job = njob) {
+			njob = job->next;
+
+			if (job->wrapper != wrapper) {
+				continue;
+			}
+
+			/*
+			 * This removes the job from the list
+			 *
+			 * Note that it waits in case
+			 * the wrapper hooks are currently
+			 * executing on the job.
+			 */
+			pthreadpool_tevent_job_orphan(job);
+		}
+
+		/*
+		 * At this point we're sure that no job
+		 * still references the pthreadpool_tevent_wrapper
+		 * structure, so we can free it.
+		 */
+		TALLOC_FREE(wrapper);
+
+		pthreadpool_tevent_cleanup_orphaned_jobs();
+		return 0;
+	}
+
+	if (pool->pool == NULL) {
+		/*
+		 * A dangling wrapper without main_tp.
+		 */
+		return 0;
+	}
+
 	ret = pthreadpool_stop(pool->pool);
 	if (ret != 0) {
 		return ret;
 	}
 
+	/*
+	 * orphan all jobs (including wrapper jobs)
+	 */
 	for (job = pool->jobs; job != NULL; job = njob) {
 		njob = job->next;
 
-		/* The job this removes it from the list */
+		/*
+		 * The job this removes it from the list
+		 *
+		 * Note that it waits in case
+		 * the wrapper hooks are currently
+		 * executing on the job (thread).
+		 */
 		pthreadpool_tevent_job_orphan(job);
 	}
 
+	/*
+	 * cleanup all existing wrappers, remember we just orphaned
+	 * all jobs (including the once of the wrappers).
+	 *
+	 * So we just mark as broken, so that
+	 * pthreadpool_tevent_job_send() won't accept new jobs.
+	 */
+	for (wrap_tp = pool->wrapper.list; wrap_tp != NULL; wrap_tp = nwrap_tp) {
+		nwrap_tp = wrap_tp->next;
+
+		/*
+		 * Just mark them as broken, so that we can't
+		 * get more jobs.
+		 */
+		TALLOC_FREE(wrap_tp->wrapper.ctx);
+
+		DLIST_REMOVE(pool->wrapper.list, wrap_tp);
+	}
+
 	/*
 	 * Delete all the registered
 	 * tevent_context/tevent_threaded_context
@@ -335,12 +478,93 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 	return 0;
 }
 
+struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create(
+				struct pthreadpool_tevent *main_tp,
+				TALLOC_CTX *mem_ctx,
+				const struct pthreadpool_tevent_wrapper_ops *ops,
+				void *pstate,
+				size_t psize,
+				const char *type,
+				const char *location)
+{
+	void **ppstate = (void **)pstate;
+	struct pthreadpool_tevent *wrap_tp = NULL;
+	struct pthreadpool_tevent_wrapper *wrapper = NULL;
+
+	pthreadpool_tevent_cleanup_orphaned_jobs();
+
+	if (main_tp->wrapper.ctx != NULL) {
+		/*
+		 * stacking of wrappers is not supported
+		 */
+		errno = EINVAL;
+		return NULL;
+	}
+
+	if (main_tp->pool == NULL) {
+		/*
+		 * The pool is no longer valid,
+		 * most likely it was a wrapper context
+		 * where the main pool was destroyed.
+		 */
+		errno = EINVAL;
+		return NULL;
+	}
+
+	wrap_tp = talloc_zero(mem_ctx, struct pthreadpool_tevent);
+	if (wrap_tp == NULL) {
+		return NULL;
+	}
+
+	wrapper = talloc_zero(wrap_tp, struct pthreadpool_tevent_wrapper);
+	if (wrapper == NULL) {
+		TALLOC_FREE(wrap_tp);
+		return NULL;
+	}
+	wrapper->main_tp = main_tp;
+	wrapper->wrap_tp = wrap_tp;
+	wrapper->ops = ops;
+	wrapper->private_state = talloc_zero_size(wrapper, psize);
+	if (wrapper->private_state == NULL) {
+		TALLOC_FREE(wrap_tp);
+		return NULL;
+	}
+	talloc_set_name_const(wrapper->private_state, type);
+
+	wrap_tp->wrapper.ctx = wrapper;
+
+	DLIST_ADD_END(main_tp->wrapper.list, wrap_tp);
+
+	talloc_set_destructor(wrap_tp, pthreadpool_tevent_destructor);
+
+	*ppstate = wrapper->private_state;
+	return wrap_tp;
+}
+
+void pthreadpool_tevent_force_per_thread_cwd(struct pthreadpool_tevent *pool,
+					     const void *private_state)
+{
+	struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
+
+	if (wrapper == NULL) {
+		abort();
+	}
+
+	if (wrapper->private_state != private_state) {
+		abort();
+	}
+
+	wrapper->force_per_thread_cwd = true;
+}
+
 static int pthreadpool_tevent_glue_destructor(
 	struct pthreadpool_tevent_glue *glue)
 {
 	struct pthreadpool_tevent_job_state *state = NULL;
 	struct pthreadpool_tevent_job_state *nstate = NULL;
 
+	TALLOC_FREE(glue->fde);
+
 	for (state = glue->states; state != NULL; state = nstate) {
 		nstate = state->next;
 
@@ -381,6 +605,59 @@ static int pthreadpool_tevent_glue_link_destructor(
 	return 0;
 }
 
+static void pthreadpool_tevent_glue_monitor(struct tevent_context *ev,
+					    struct tevent_fd *fde,
+					    uint16_t flags,
+					    void *private_data)
+{
+	struct pthreadpool_tevent_glue *glue =
+		talloc_get_type_abort(private_data,
+		struct pthreadpool_tevent_glue);
+	struct pthreadpool_tevent_job *job = NULL;
+	struct pthreadpool_tevent_job *njob = NULL;
+	int ret = -1;
+
+	ret = pthreadpool_restart_check_monitor_drain(glue->pool->pool);
+	if (ret != 0) {
+		TALLOC_FREE(glue->fde);
+	}
+
+	ret = pthreadpool_restart_check(glue->pool->pool);
+	if (ret == 0) {
+		/*
+		 * success...
+		 */
+		goto done;
+	}
+
+	/*
+	 * There's a problem and the pool
+	 * has not a single thread available
+	 * for pending jobs, so we can only
+	 * stop the jobs and return an error.
+	 * This is similar to a failure from
+	 * pthreadpool_add_job().
+	 */
+	for (job = glue->pool->jobs; job != NULL; job = njob) {
+		njob = job->next;
+
+		tevent_req_defer_callback(job->state->req,
+					  job->state->ev);
+		tevent_req_error(job->state->req, ret);
+	}
+
+done:
+	if (glue->states == NULL) {
+		/*
+		 * If the glue doesn't have any pending jobs
+		 * we remove the glue.
+		 *
+		 * In order to remove the fd event.
+		 */
+		TALLOC_FREE(glue);
+	}
+}
+
 static int pthreadpool_tevent_register_ev(
 				struct pthreadpool_tevent *pool,
 				struct pthreadpool_tevent_job_state *state)
@@ -388,6 +665,7 @@ static int pthreadpool_tevent_register_ev(
 	struct tevent_context *ev = state->ev;
 	struct pthreadpool_tevent_glue *glue = NULL;
 	struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
+	int monitor_fd = -1;
 
 	/*
 	 * See if this tevent_context was already registered by
@@ -420,6 +698,28 @@ static int pthreadpool_tevent_register_ev(
 	};
 	talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);
 
+	monitor_fd = pthreadpool_restart_check_monitor_fd(pool->pool);
+	if (monitor_fd == -1 && errno != ENOSYS) {
+		int saved_errno = errno;
+		TALLOC_FREE(glue);
+		return saved_errno;
+	}
+
+	if (monitor_fd != -1) {
+		glue->fde = tevent_add_fd(ev,
+					  glue,
+					  monitor_fd,
+					  TEVENT_FD_READ,
+					  pthreadpool_tevent_glue_monitor,
+					  glue);
+		if (glue->fde == NULL) {
+			close(monitor_fd);
+			TALLOC_FREE(glue);
+			return ENOMEM;
+		}
+		tevent_fd_set_auto_close(glue->fde);
+	}
+
 	/*
 	 * Now allocate the link object to the event context. Note this
 	 * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
@@ -559,6 +859,24 @@ static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
 		abort();
 	}
 
+	/*
+	 * Once we marked the request as 'orphaned'
+	 * we spin/loop if 'wrapper' is marked as active.
+	 *
+	 * We need to wait until the wrapper hook finished
+	 * before we can set job->wrapper = NULL.
+	 *
+	 * This is some kind of spinlock, but with
+	 * 1 millisecond sleeps in between, in order
+	 * to give the thread more cpu time to finish.
+	 */
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	while (job->needs_fence.wrapper) {
+		poll(NULL, 0, 1);
+		PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	}
+	job->wrapper = NULL;
+
 	/*
 	 * Once we marked the request as 'orphaned'
 	 * we spin/loop if it's already marked
@@ -673,9 +991,14 @@ struct tevent_req *pthreadpool_tevent_job_send(
 	struct pthreadpool_tevent_job_state *state = NULL;
 	struct pthreadpool_tevent_job *job = NULL;
 	int ret;
+	struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
 
 	pthreadpool_tevent_cleanup_orphaned_jobs();
 
+	if (wrapper != NULL) {
+		pool = wrapper->main_tp;
+	}
+
 	req = tevent_req_create(mem_ctx, &state,
 				struct pthreadpool_tevent_job_state);
 	if (req == NULL) {
@@ -705,6 +1028,7 @@ struct tevent_req *pthreadpool_tevent_job_send(
 		return tevent_req_post(req, ev);
 	}
 	job->pool = pool;
+	job->wrapper = wrapper;
 	job->fn = fn;
 	job->private_data = private_data;
 	job->im = tevent_create_immediate(state->job);
@@ -803,15 +1127,73 @@ static void pthreadpool_tevent_job_fn(void *private_data)
 	struct pthreadpool_tevent_job *job =
 		talloc_get_type_abort(private_data,
 		struct pthreadpool_tevent_job);
+	struct pthreadpool_tevent_wrapper *wrapper = NULL;
 
 	current_job = job;
 	job->needs_fence.started = true;
 	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	if (job->needs_fence.orphaned) {
+		current_job = NULL;
+		return;
+	}
+
+	wrapper = job->wrapper;
+	if (wrapper != NULL) {
+		bool ok;
+
+		job->needs_fence.wrapper = true;
+		PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+		if (job->needs_fence.orphaned) {
+			job->needs_fence.wrapper = false;
+			PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+			current_job = NULL;
+			return;
+		}
+		ok = wrapper->ops->before_job(wrapper->wrap_tp,
+					      wrapper->private_state,
+					      wrapper->main_tp,
+					      __location__);
+		job->needs_fence.wrapper = false;
+		PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+		if (!ok) {
+			job->needs_fence.exit_thread = true;
+			PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+			current_job = NULL;
+			return;
+		}
+	}
 
 	job->fn(job->private_data);
 
 	job->needs_fence.executed = true;
 	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+
+	if (wrapper != NULL) {
+		bool ok;
+
+		job->needs_fence.wrapper = true;
+		PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+		if (job->needs_fence.orphaned) {
+			job->needs_fence.wrapper = false;
+			job->needs_fence.exit_thread = true;
+			PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+			current_job = NULL;
+			return;
+		}
+		ok = wrapper->ops->after_job(wrapper->wrap_tp,
+					     wrapper->private_state,
+					     wrapper->main_tp,
+					     __location__);
+		job->needs_fence.wrapper = false;
+		PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+		if (!ok) {
+			job->needs_fence.exit_thread = true;
+			PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+			current_job = NULL;
+			return;
+		}
+	}
+
 	current_job = NULL;
 }
 
@@ -830,6 +1212,15 @@ static int pthreadpool_tevent_job_signal(int jobid,
 		/* Request already gone */
 		job->needs_fence.dropped = true;
 		PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+		if (job->needs_fence.exit_thread) {
+			/*
+			 * A problem with the wrapper the current job/worker
+			 * thread needs to terminate.
+			 *
+			 * The pthreadpool_tevent is already gone.
+			 */
+			return -1;
+		}
 		return 0;
 	}
 
@@ -855,6 +1246,15 @@ static int pthreadpool_tevent_job_signal(int jobid,
 
 	job->needs_fence.signaled = true;
 	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	if (job->needs_fence.exit_thread) {
+		/*
+		 * A problem with the wrapper the current job/worker
+		 * thread needs to terminate.
+		 *
+		 * The pthreadpool_tevent is already gone.
+		 */
+		return -1;
+	}
 	return 0;
 }
 
diff --git a/lib/pthreadpool/pthreadpool_tevent.h b/lib/pthreadpool/pthreadpool_tevent.h
index ff2ab7cfb73d..6c939fc1d2d8 100644
--- a/lib/pthreadpool/pthreadpool_tevent.h
+++ b/lib/pthreadpool/pthreadpool_tevent.h
@@ -29,6 +29,38 @@ struct pthreadpool_tevent;
 int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
 			    struct pthreadpool_tevent **presult);
 
+struct pthreadpool_tevent_wrapper_ops {
+	const char *name;
+
+	bool (*before_job)(struct pthreadpool_tevent *wrap_tp,
+			   void *private_state,
+			   struct pthreadpool_tevent *main_tp,
+			   const char *location);
+	bool (*after_job)(struct pthreadpool_tevent *wrap_tp,
+			  void *private_state,
+			  struct pthreadpool_tevent *main_tp,
+			  const char *location);
+};
+
+struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create(
+				struct pthreadpool_tevent *main_tp,
+				TALLOC_CTX *mem_ctx,
+				const struct pthreadpool_tevent_wrapper_ops *ops,
+				void *pstate,
+				size_t psize,
+				const char *type,
+				const char *location);
+#define pthreadpool_tevent_wrapper_create(main_tp, mem_ctx, ops, state, type) \
+	_pthreadpool_tevent_wrapper_create(main_tp, mem_ctx, ops, \
+				       state, sizeof(type), #type, __location__)
+
+/*
+ * this can only be called directly after
+ * pthreadpool_tevent_wrapper_create()
+ */
+void pthreadpool_tevent_force_per_thread_cwd(struct pthreadpool_tevent *pool,
+					     const void *private_state);
+
 size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool);
 size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool);
 bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool);
-- 
2.17.1


From 4b4691b6ccab08d4001ae04e36b9b9747bf55a3a Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Mon, 18 Jun 2018 16:57:18 +0200
Subject: [PATCH 21/23] pthreadpool: test cancelling and freeing jobs of a
 wrapped pthreadpool_tevent

Pair-Programmed-With: Stefan Metzmacher <metze at samba.org>

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/tests_cmocka.c | 573 +++++++++++++++++++++++++++++++++
 1 file changed, 573 insertions(+)

diff --git a/lib/pthreadpool/tests_cmocka.c b/lib/pthreadpool/tests_cmocka.c
index 5c7f6ab6904b..b5b6b4cc624e 100644
--- a/lib/pthreadpool/tests_cmocka.c
+++ b/lib/pthreadpool/tests_cmocka.c
@@ -543,6 +543,9 @@ struct test_cancel_state {
 	struct test_cancel_job *job4;
 	struct test_cancel_job *job5;
 	struct test_cancel_job *job6;
+	struct test_cancel_job *job7;
+	struct test_cancel_job *job8;
+	struct test_cancel_job *job9;
 };
 
 static void test_cancel_job(void **private_data)
@@ -805,6 +808,573 @@ static void test_cancel_job(void **private_data)
 	TALLOC_FREE(state);
 }
 
+struct test_pthreadpool_tevent_wrap_tp_stats {
+	unsigned num_before;
+	unsigned num_after;
+	bool destroyed;
+};
+
+struct test_pthreadpool_tevent_wrap_tp_state {
+	struct test_pthreadpool_tevent_wrap_tp_state **selfptr;
+	struct test_pthreadpool_tevent_wrap_tp_stats *stats;
+};
+
+static int test_pthreadpool_tevent_wrap_tp_state_destructor(
+	struct test_pthreadpool_tevent_wrap_tp_state *state)
+{
+	state->stats->destroyed = true;
+	*state->selfptr = NULL;
+
+	return 0;
+}
+
+static bool test_pthreadpool_tevent_tp_before(struct pthreadpool_tevent *wrap,
+					      void *private_data,
+					      struct pthreadpool_tevent *main,
+					      const char *location)
+{
+	struct test_pthreadpool_tevent_wrap_tp_state *state =
+		talloc_get_type_abort(private_data,
+		struct test_pthreadpool_tevent_wrap_tp_state);
+
+	state->stats->num_before++;
+
+	return true;
+}
+
+static bool test_pthreadpool_tevent_tp_after(struct pthreadpool_tevent *wrap,
+					     void *private_data,
+					     struct pthreadpool_tevent *main,
+					     const char *location)
+{
+	struct test_pthreadpool_tevent_wrap_tp_state *state =
+		talloc_get_type_abort(private_data,
+		struct test_pthreadpool_tevent_wrap_tp_state);
+
+	state->stats->num_after++;
+
+	return true;
+}
+
+static const struct pthreadpool_tevent_wrapper_ops test_tp_ops = {
+	.name		= "test_pthreadpool_tevent_tp",
+	.before_job	= test_pthreadpool_tevent_tp_before,
+	.after_job	= test_pthreadpool_tevent_tp_after,
+};
+
+static void test_wrap_cancel_job(void **private_data)
+{
+	struct pthreadpool_tevent_test *t = *private_data;
+	struct tevent_context *ev = t->ev;
+	struct pthreadpool_tevent *poolw1 = NULL;
+	struct test_pthreadpool_tevent_wrap_tp_state *poolw1_state = NULL;
+	struct test_pthreadpool_tevent_wrap_tp_stats poolw1_stats = {
+		.destroyed = false,
+	};
+	struct pthreadpool_tevent *poolw2 = NULL;
+	struct test_pthreadpool_tevent_wrap_tp_state *poolw2_state = NULL;
+	struct test_pthreadpool_tevent_wrap_tp_stats poolw2_stats = {
+		.destroyed = false,
+	};
+	size_t max_threads_o;
+	size_t max_threads_w1;
+	size_t max_threads_w2;
+	bool per_thread_cwd_o;
+	bool per_thread_cwd_w1;
+	bool per_thread_cwd_w2;
+	struct test_cancel_state *state = NULL;
+	int ret;
+	bool ok;
+	int fdpair[2] = { -1, -1 };
+	char c = 0;
+
+	max_threads_o = pthreadpool_tevent_max_threads(t->opool);
+	per_thread_cwd_o = pthreadpool_tevent_per_thread_cwd(t->opool);
+
+	poolw1 = pthreadpool_tevent_wrapper_create(
+		t->opool, t, &test_tp_ops, &poolw1_state,
+		struct test_pthreadpool_tevent_wrap_tp_state);
+	assert_non_null(poolw1);
+	poolw1_state->selfptr = &poolw1_state;
+	ANNOTATE_BENIGN_RACE_SIZED(&poolw1_stats,
+				   sizeof(poolw1_stats),
+				   "protected by pthreadpool_tevent code");
+	poolw1_state->stats = &poolw1_stats;
+	talloc_set_destructor(poolw1_state,
+			      test_pthreadpool_tevent_wrap_tp_state_destructor);
+
+	poolw2 = pthreadpool_tevent_wrapper_create(
+		t->opool, t, &test_tp_ops, &poolw2_state,
+		struct test_pthreadpool_tevent_wrap_tp_state);
+	assert_non_null(poolw2);
+	poolw2_state->selfptr = &poolw2_state;
+	ANNOTATE_BENIGN_RACE_SIZED(&poolw2_stats,
+				   sizeof(poolw2_stats),
+				   "protected by pthreadpool_tevent code");
+	poolw2_state->stats = &poolw2_stats;
+	talloc_set_destructor(poolw2_state,
+			      test_pthreadpool_tevent_wrap_tp_state_destructor);
+
+	assert_false(poolw1_stats.destroyed);
+	assert_int_equal(poolw1_stats.num_before, 0);
+	assert_int_equal(poolw1_stats.num_after, 0);
+	max_threads_w1 = pthreadpool_tevent_max_threads(poolw1);
+	assert_int_equal(max_threads_w1, max_threads_o);
+	per_thread_cwd_w1 = pthreadpool_tevent_per_thread_cwd(poolw1);
+	assert_int_equal(per_thread_cwd_w1, per_thread_cwd_o);
+
+	assert_false(poolw2_stats.destroyed);
+	assert_int_equal(poolw2_stats.num_before, 0);
+	assert_int_equal(poolw2_stats.num_after, 0);
+	max_threads_w2 = pthreadpool_tevent_max_threads(poolw2);
+	assert_int_equal(max_threads_w2, max_threads_o);
+	per_thread_cwd_w2 = pthreadpool_tevent_per_thread_cwd(poolw2);
+	assert_int_equal(per_thread_cwd_w2, per_thread_cwd_o);
+
+	state = talloc_zero(t, struct test_cancel_state);
+	assert_non_null(state);
+	state->job1 = test_cancel_job_create(state);
+	assert_non_null(state->job1);
+	state->job2 = test_cancel_job_create(state);
+	assert_non_null(state->job2);
+	state->job3 = test_cancel_job_create(state);
+	assert_non_null(state->job3);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_int_equal(ret, 0);
+
+	state->job1->fdm = fdpair[0];
+	state->job1->fdj = fdpair[1];
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
+
+	will_return(__wrap_pthread_create, 0);
+	state->job1->req = pthreadpool_tevent_job_send(
+		state->job1, ev, poolw1, test_cancel_job_fn, state->job1);
+	assert_non_null(state->job1->req);
+	tevent_req_set_callback(state->job1->req,
+				test_cancel_job_done,
+				state->job1);
+
+	state->job2->req = pthreadpool_tevent_job_send(
+		state->job2, ev, poolw1, test_cancel_job_fn, NULL);
+	assert_non_null(state->job2->req);
+	tevent_req_set_callback(state->job2->req,
+				test_cancel_job_done,
+				state->job2);
+
+	state->job3->req = pthreadpool_tevent_job_send(
+		state->job3, ev, poolw1, test_cancel_job_fn, NULL);
+	assert_non_null(state->job3->req);
+	tevent_req_set_callback(state->job3->req,
+				test_cancel_job_done,
+				state->job3);
+
+	/*
+	 * Wait for the job 1 to start.
+	 */
+	ret = read(state->job1->fdm, &c, 1);
+	assert_int_equal(ret, 1);
+
+	/*
+	 * We cancel job 3 and destroy job2.
+	 * Both should never be executed.
+	 */
+	assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 2);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 2);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 2);
+	TALLOC_FREE(state->job2->req);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 1);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 1);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 1);
+	ok = tevent_req_cancel(state->job3->req);
+	assert_true(ok);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
+
+	/*
+	 * Job 3 should complete as canceled, while
+	 * job 1 is still running.
+	 */
+	test_cancel_job_wait(state->job3, ev);
+	assert_int_equal(state->job3->ret, ECANCELED);
+	assert_null(state->job3->req);
+	assert_false(state->job3->started);
+
+	/*
+	 * Now job1 is canceled while it's running,
+	 * this should let it stop it's loop.
+	 */
+	ok = tevent_req_cancel(state->job1->req);
+	assert_false(ok);
+
+	/*
+	 * Job 1 completes, It got at least one sleep
+	 * timeout loop and has state->job1->canceled set.
+	 */
+	test_cancel_job_wait(state->job1, ev);
+	assert_int_equal(state->job1->ret, 0);
+	assert_null(state->job1->req);
+	assert_true(state->job1->started);
+	assert_true(state->job1->finished);
+	assert_true(state->job1->canceled);
+	assert_false(state->job1->orphaned);
+	assert_in_range(state->job1->polls, 1, 100);
+	assert_int_equal(state->job1->timeouts, state->job1->polls);
+
+	assert_false(poolw1_stats.destroyed);
+	assert_int_equal(poolw1_stats.num_before, 1);
+	assert_int_equal(poolw1_stats.num_after, 1);
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
+
+	/*
+	 * Now we create jobs 4 and 5
+	 * Both should execute.
+	 * Job 4 is orphaned while running by a TALLOC_FREE()
+	 * This should stop job 4 and let job 5 start.
+	 * We do a "normal" exit in job 5 by creating some activity
+	 * on the socketpair.
+	 */
+
+	state->job4 = test_cancel_job_create(state);
+	assert_non_null(state->job4);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_int_equal(ret, 0);
+
+	state->job4->fdm = fdpair[0];
+	state->job4->fdj = fdpair[1];
+
+	state->job4->req = pthreadpool_tevent_job_send(
+		state->job4, ev, poolw1, test_cancel_job_fn, state->job4);
+	assert_non_null(state->job4->req);
+	tevent_req_set_callback(state->job4->req,
+				test_cancel_job_done,
+				state->job4);
+
+	state->job5 = test_cancel_job_create(state);
+	assert_non_null(state->job5);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_int_equal(ret, 0);
+
+	state->job5->fdm = fdpair[0];
+	state->job5->fdj = fdpair[1];
+
+	state->job5->req = pthreadpool_tevent_job_send(
+		state->job5, ev, poolw1, test_cancel_job_fn, state->job5);
+	assert_non_null(state->job5->req);
+	tevent_req_set_callback(state->job5->req,
+				test_cancel_job_done,
+				state->job5);
+
+	/*
+	 * Make sure job 5 can exit as soon as possible.
+	 * It will never get a sleep/poll timeout.
+	 */
+	ret = write(state->job5->fdm, &c, 1);
+	assert_int_equal(ret, 1);
+
+	/*
+	 * Wait for the job 4 to start
+	 */
+	ret = read(state->job4->fdm, &c, 1);
+	assert_int_equal(ret, 1);
+
+	assert_false(poolw1_stats.destroyed);
+	assert_int_equal(poolw1_stats.num_before, 2);
+	assert_int_equal(poolw1_stats.num_after, 1);
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 1);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 1);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 1);
+
+	/*
+	 * destroy the request so that it's marked
+	 * as orphaned.
+	 *
+	 * As we're in the wrapper mode, the
+	 * job thread will exit and try to create
+	 * a new thread.
+	 */
+	TALLOC_FREE(state->job4->req);
+
+	/*
+	 * Job 5 completes, It got no sleep timeout loop.
+	 */
+	will_return(__wrap_pthread_create, 0);
+	test_cancel_job_wait(state->job5, ev);
+	assert_int_equal(state->job5->ret, 0);
+	assert_null(state->job5->req);
+	assert_true(state->job5->started);
+	assert_true(state->job5->finished);
+	assert_false(state->job5->canceled);
+	assert_false(state->job5->orphaned);
+	assert_int_equal(state->job5->polls, 1);
+	assert_int_equal(state->job5->timeouts, 0);
+
+	assert_false(poolw1_stats.destroyed);
+	assert_int_equal(poolw1_stats.num_before, 3);
+	assert_int_equal(poolw1_stats.num_after, 2);
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
+
+	/*
+	 * Job 2 is still not executed as we did a TALLOC_FREE()
+	 * before is was scheduled.
+	 */
+	assert_false(state->job2->completed);
+	assert_false(state->job2->started);
+
+	/*
+	 * Job 4 is still wasn't completed as we did a TALLOC_FREE()
+	 * while it is was running. but it was started and has
+	 * orphaned set
+	 */
+	assert_false(state->job4->completed);
+	assert_true(state->job4->started);
+	assert_true(state->job4->finished);
+	assert_false(state->job4->canceled);
+	assert_true(state->job4->orphaned);
+	assert_in_range(state->job4->polls, 1, 100);
+	assert_int_equal(state->job4->timeouts, state->job4->polls);
+
+	assert_false(poolw1_stats.destroyed);
+	assert_int_equal(poolw1_stats.num_before, 3);
+	assert_int_equal(poolw1_stats.num_after, 2);
+
+	/*
+	 * Now we create jobs 6 and 7
+	 * Both should execute.
+	 * We destroy the pool wrapper (1) while job 6 is executing.
+	 * This should stop job 6 and let job 7 start.
+	 * Job 7 runs on the pool wrapper (2).
+	 * We do a "normal" exit in job 7 by creating some activity
+	 * on the socketpair.
+	 */
+
+	state->job6 = test_cancel_job_create(state);
+	assert_non_null(state->job6);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_int_equal(ret, 0);
+
+	state->job6->fdm = fdpair[0];
+	state->job6->fdj = fdpair[1];
+
+	state->job6->req = pthreadpool_tevent_job_send(
+		state->job6, ev, poolw1, test_cancel_job_fn, state->job6);
+	assert_non_null(state->job6->req);
+	tevent_req_set_callback(state->job6->req,
+				test_cancel_job_done,
+				state->job6);
+
+	state->job7 = test_cancel_job_create(state);
+	assert_non_null(state->job7);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_int_equal(ret, 0);
+
+	state->job7->fdm = fdpair[0];
+	state->job7->fdj = fdpair[1];
+
+	state->job7->req = pthreadpool_tevent_job_send(
+		state->job7, ev, poolw2, test_cancel_job_fn, state->job7);
+	assert_non_null(state->job7->req);
+	tevent_req_set_callback(state->job7->req,
+				test_cancel_job_done,
+				state->job7);
+
+	/*
+	 * Make sure job 7 can exit as soon as possible.
+	 * It will never get a sleep/poll timeout.
+	 */
+	ret = write(state->job7->fdm, &c, 1);
+	assert_int_equal(ret, 1);
+
+	/*
+	 * Wait for the job 6 to start
+	 */
+	ret = read(state->job6->fdm, &c, 1);
+	assert_int_equal(ret, 1);
+
+	assert_non_null(poolw1_state);
+	assert_false(poolw1_stats.destroyed);
+	assert_int_equal(poolw1_stats.num_before, 4);
+	assert_int_equal(poolw1_stats.num_after, 2);
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 1);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 1);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 1);
+
+	/*
+	 * destroy the request so that it's marked
+	 * as orphaned.
+	 */
+	TALLOC_FREE(poolw1);
+
+	/*
+	 * Wait until the job finished.
+	 */
+	ret = read(state->job6->fdm, &c, 1);
+	assert_int_equal(ret, 0);
+
+	assert_null(poolw1_state);
+	assert_true(poolw1_stats.destroyed);
+	assert_int_equal(poolw1_stats.num_before, 4);
+	assert_int_equal(poolw1_stats.num_after, 2);
+
+	/*
+	 * Job 6 is still dangling arround.
+	 *
+	 * We need to convince valgrind --tool={drd,helgrind}
+	 * that the read above is good enough to be
+	 * sure the job is finished and closed the other end of
+	 * the socketpair.
+	 */
+	ANNOTATE_BENIGN_RACE_SIZED(state->job6,
+				   sizeof(*state->job6),
+				   "protected by thread fence");
+	assert_non_null(state->job6->req);
+	assert_true(tevent_req_is_in_progress(state->job6->req));
+	assert_false(state->job6->completed);
+	assert_true(state->job6->started);
+	assert_true(state->job6->finished);
+	assert_false(state->job6->canceled);
+	assert_true(state->job6->orphaned);
+	assert_in_range(state->job6->polls, 1, 100);
+	assert_int_equal(state->job6->timeouts, state->job4->polls);
+
+	/*
+	 * Job 7 completes, It got no sleep timeout loop.
+	 */
+	will_return(__wrap_pthread_create, 0);
+	test_cancel_job_wait(state->job7, ev);
+	assert_int_equal(state->job7->ret, 0);
+	assert_null(state->job7->req);
+	assert_true(state->job7->started);
+	assert_true(state->job7->finished);
+	assert_false(state->job7->canceled);
+	assert_false(state->job7->orphaned);
+	assert_int_equal(state->job7->polls, 1);
+	assert_int_equal(state->job7->timeouts, 0);
+
+	assert_non_null(poolw2_state);
+	assert_false(poolw2_stats.destroyed);
+	assert_int_equal(poolw2_stats.num_before, 1);
+	assert_int_equal(poolw2_stats.num_after, 1);
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
+
+	/*
+	 * Now we create jobs 8
+	 * On a new wrapper pool
+	 * We destroy the main pool while it's executing.
+	 */
+
+	state->job8 = test_cancel_job_create(state);
+	assert_non_null(state->job8);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_int_equal(ret, 0);
+
+	state->job8->fdm = fdpair[0];
+	state->job8->fdj = fdpair[1];
+
+	state->job8->req = pthreadpool_tevent_job_send(
+		state->job8, ev, poolw2, test_cancel_job_fn, state->job8);
+	assert_non_null(state->job8->req);
+	tevent_req_set_callback(state->job8->req,
+				test_cancel_job_done,
+				state->job8);
+
+	/*
+	 * Wait for the job 8 to start
+	 */
+	ret = read(state->job8->fdm, &c, 1);
+	assert_int_equal(ret, 1);
+
+	assert_false(poolw2_stats.destroyed);
+	assert_int_equal(poolw2_stats.num_before, 2);
+	assert_int_equal(poolw2_stats.num_after, 1);
+
+	/*
+	 * destroy the request so that it's marked
+	 * as orphaned.
+	 */
+	TALLOC_FREE(t->opool);
+
+	/*
+	 * Wait until the job finished.
+	 */
+	ret = read(state->job8->fdm, &c, 1);
+	assert_int_equal(ret, 0);
+
+	assert_null(poolw2_state);
+	assert_true(poolw2_stats.destroyed);
+	assert_int_equal(poolw2_stats.num_before, 2);
+	assert_int_equal(poolw2_stats.num_after, 1);
+
+	/*
+	 * Job 8 is still dangling arround.
+	 *
+	 * We need to convince valgrind --tool={drd,helgrind}
+	 * that the read above is good enough to be
+	 * sure the job is finished and closed the other end of
+	 * the socketpair.
+	 */
+	ANNOTATE_BENIGN_RACE_SIZED(state->job8,
+				   sizeof(*state->job8),
+				   "protected by thread fence");
+	assert_non_null(state->job8->req);
+	assert_true(tevent_req_is_in_progress(state->job8->req));
+	assert_false(state->job8->completed);
+	assert_true(state->job8->started);
+	assert_true(state->job8->finished);
+	assert_false(state->job8->canceled);
+	assert_true(state->job8->orphaned);
+	assert_in_range(state->job8->polls, 1, 100);
+	assert_int_equal(state->job8->timeouts, state->job4->polls);
+
+	/*
+	 * Now check that adding a new job to the dangling
+	 * wrapper gives an error.
+	 */
+	state->job9 = test_cancel_job_create(state);
+	assert_non_null(state->job9);
+
+	state->job9->req = pthreadpool_tevent_job_send(
+		state->job9, ev, poolw2, test_cancel_job_fn, state->job9);
+	assert_non_null(state->job9->req);
+	tevent_req_set_callback(state->job9->req,
+				test_cancel_job_done,
+				state->job9);
+
+	/*
+	 * Job 9 completes, But with a failure.
+	 */
+	test_cancel_job_wait(state->job9, ev);
+	assert_int_equal(state->job9->ret, EINVAL);
+	assert_null(state->job9->req);
+	assert_false(state->job9->started);
+	assert_false(state->job9->finished);
+	assert_false(state->job9->canceled);
+	assert_false(state->job9->orphaned);
+	assert_int_equal(state->job9->polls, 0);
+	assert_int_equal(state->job9->timeouts, 0);
+
+	TALLOC_FREE(state);
+}
+
 int main(int argc, char **argv)
 {
 	const struct CMUnitTest tests[] = {
@@ -817,6 +1387,9 @@ int main(int argc, char **argv)
 		cmocka_unit_test_setup_teardown(test_cancel_job,
 						setup_pthreadpool_tevent,
 						teardown_pthreadpool_tevent),
+		cmocka_unit_test_setup_teardown(test_wrap_cancel_job,
+						setup_pthreadpool_tevent,
+						teardown_pthreadpool_tevent),
 	};
 
 	cmocka_set_message_output(CM_OUTPUT_SUBUNIT);
-- 
2.17.1


From 211ffc9db9de3a371bc5ccae3b5ef82662b0d704 Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Thu, 28 Jun 2018 14:28:34 +0200
Subject: [PATCH 22/23] lib/util: rename USE_LINUX_THREAD_CREDENTIALS to
 HAVE_LINUX_THREAD_CREDENTIALS

The define reflects the results of a feature test, not a configure
option.

Signed-off-by: Ralph Boehme <slow at samba.org>
---
 lib/util/setid.c                  | 24 ++++++++++++------------
 source3/lib/util_sec.c            | 20 ++++++++++----------
 source3/modules/vfs_aio_pthread.c |  4 ++--
 source3/wscript                   |  8 ++++----
 tests/summary.c                   |  2 +-
 5 files changed, 29 insertions(+), 29 deletions(-)

diff --git a/lib/util/setid.c b/lib/util/setid.c
index 88195d8ee826..eb7511083f09 100644
--- a/lib/util/setid.c
+++ b/lib/util/setid.c
@@ -56,7 +56,7 @@ int samba_setgroups(size_t setlen, const gid_t *gidset);
 
 #endif
 
-#if defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(HAVE_LINUX_THREAD_CREDENTIALS)
 #if defined(HAVE_UNISTD_H)
 #include <unistd.h>
 #endif
@@ -80,7 +80,7 @@ int samba_setgroups(size_t setlen, const gid_t *gidset);
 /* All the setXX[ug]id functions and setgroups Samba uses. */
 int samba_setresuid(uid_t ruid, uid_t euid, uid_t suid)
 {
-#if defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(HAVE_LINUX_THREAD_CREDENTIALS)
 #if defined(USE_LINUX_32BIT_SYSCALLS)
 	return syscall(SYS_setresuid32, ruid, euid, suid);
 #else
@@ -96,7 +96,7 @@ int samba_setresuid(uid_t ruid, uid_t euid, uid_t suid)
 
 int samba_setresgid(gid_t rgid, gid_t egid, gid_t sgid)
 {
-#if defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(HAVE_LINUX_THREAD_CREDENTIALS)
 #if defined(USE_LINUX_32BIT_SYSCALLS)
 	return syscall(SYS_setresgid32, rgid, egid, sgid);
 #else
@@ -112,7 +112,7 @@ int samba_setresgid(gid_t rgid, gid_t egid, gid_t sgid)
 
 int samba_setreuid(uid_t ruid, uid_t euid)
 {
-#if defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(HAVE_LINUX_THREAD_CREDENTIALS)
 #if defined(USE_LINUX_32BIT_SYSCALLS)
 	return syscall(SYS_setreuid32, ruid, euid);
 #else
@@ -128,7 +128,7 @@ int samba_setreuid(uid_t ruid, uid_t euid)
 
 int samba_setregid(gid_t rgid, gid_t egid)
 {
-#if defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(HAVE_LINUX_THREAD_CREDENTIALS)
 #if defined(USE_LINUX_32BIT_SYSCALLS)
 	return syscall(SYS_setregid32, rgid, egid);
 #else
@@ -144,7 +144,7 @@ int samba_setregid(gid_t rgid, gid_t egid)
 
 int samba_seteuid(uid_t euid)
 {
-#if defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(HAVE_LINUX_THREAD_CREDENTIALS)
 #if defined(USE_LINUX_32BIT_SYSCALLS)
 	/* seteuid is not a separate system call. */
 	return syscall(SYS_setresuid32, -1, euid, -1);
@@ -162,7 +162,7 @@ int samba_seteuid(uid_t euid)
 
 int samba_setegid(gid_t egid)
 {
-#if defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(HAVE_LINUX_THREAD_CREDENTIALS)
 #if defined(USE_LINUX_32BIT_SYSCALLS)
 	/* setegid is not a separate system call. */
 	return syscall(SYS_setresgid32, -1, egid, -1);
@@ -180,7 +180,7 @@ int samba_setegid(gid_t egid)
 
 int samba_setuid(uid_t uid)
 {
-#if defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(HAVE_LINUX_THREAD_CREDENTIALS)
 #if defined(USE_LINUX_32BIT_SYSCALLS)
 	return syscall(SYS_setuid32, uid);
 #else
@@ -196,7 +196,7 @@ int samba_setuid(uid_t uid)
 
 int samba_setgid(gid_t gid)
 {
-#if defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(HAVE_LINUX_THREAD_CREDENTIALS)
 #if defined(USE_LINUX_32BIT_SYSCALLS)
 	return syscall(SYS_setgid32, gid);
 #else
@@ -215,7 +215,7 @@ int samba_setuidx(int flags, uid_t uid)
 #if defined(HAVE_SETUIDX)
 	return setuidx(flags, uid);
 #else
-	/* USE_LINUX_THREAD_CREDENTIALS doesn't have this. */
+	/* HAVE_LINUX_THREAD_CREDENTIALS doesn't have this. */
 	errno = ENOSYS;
 	return -1;
 #endif
@@ -226,7 +226,7 @@ int samba_setgidx(int flags, gid_t gid)
 #if defined(HAVE_SETGIDX)
 	return setgidx(flags, gid);
 #else
-	/* USE_LINUX_THREAD_CREDENTIALS doesn't have this. */
+	/* HAVE_LINUX_THREAD_CREDENTIALS doesn't have this. */
 	errno = ENOSYS;
 	return -1;
 #endif
@@ -234,7 +234,7 @@ int samba_setgidx(int flags, gid_t gid)
 
 int samba_setgroups(size_t setlen, const gid_t *gidset)
 {
-#if defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(HAVE_LINUX_THREAD_CREDENTIALS)
 #if defined(USE_LINUX_32BIT_SYSCALLS)
 	return syscall(SYS_setgroups32, setlen, gidset);
 #else
diff --git a/source3/lib/util_sec.c b/source3/lib/util_sec.c
index 760f8b06906e..a6817f6e3c49 100644
--- a/source3/lib/util_sec.c
+++ b/source3/lib/util_sec.c
@@ -165,7 +165,7 @@ static void assert_gid(gid_t rgid, gid_t egid)
 ****************************************************************************/
 void gain_root_privilege(void)
 {	
-#if defined(USE_SETRESUID) || defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(USE_SETRESUID) || defined(HAVE_LINUX_THREAD_CREDENTIALS)
 	samba_setresuid(0,0,0);
 #endif
     
@@ -195,7 +195,7 @@ void gain_root_privilege(void)
 ****************************************************************************/
 void gain_root_group_privilege(void)
 {
-#if defined(USE_SETRESUID) || defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(USE_SETRESUID) || defined(HAVE_LINUX_THREAD_CREDENTIALS)
 	samba_setresgid(0,0,0);
 #endif
 
@@ -232,7 +232,7 @@ void gain_root_group_privilege(void)
 ****************************************************************************/
 void set_effective_uid(uid_t uid)
 {
-#if defined(USE_SETRESUID) || defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(USE_SETRESUID) || defined(HAVE_LINUX_THREAD_CREDENTIALS)
         /* Set the effective as well as the real uid. */
 	if (samba_setresuid(uid,uid,-1) == -1) {
 		if (errno == EAGAIN) {
@@ -264,7 +264,7 @@ void set_effective_uid(uid_t uid)
 ****************************************************************************/
 void set_effective_gid(gid_t gid)
 {
-#if defined(USE_SETRESUID) || defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(USE_SETRESUID) || defined(HAVE_LINUX_THREAD_CREDENTIALS)
 	samba_setresgid(-1,gid,-1);
 #endif
 
@@ -303,7 +303,7 @@ void save_re_uid(void)
 
 void restore_re_uid_fromroot(void)
 {
-#if defined(USE_SETRESUID) || defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(USE_SETRESUID) || defined(HAVE_LINUX_THREAD_CREDENTIALS)
 	samba_setresuid(saved_ruid, saved_euid, -1);
 #elif USE_SETREUID
 	samba_setreuid(saved_ruid, -1);
@@ -342,7 +342,7 @@ void save_re_gid(void)
 ****************************************************************************/
 void restore_re_gid(void)
 {
-#if defined(USE_SETRESUID) || defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(USE_SETRESUID) || defined(HAVE_LINUX_THREAD_CREDENTIALS)
 	samba_setresgid(saved_rgid, saved_egid, -1);
 #elif USE_SETREUID
 	samba_setregid(saved_rgid, -1);
@@ -370,7 +370,7 @@ int set_re_uid(void)
 {
 	uid_t uid = geteuid();
 
-#if defined(USE_SETRESUID) || defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(USE_SETRESUID) || defined(HAVE_LINUX_THREAD_CREDENTIALS)
 	samba_setresuid(uid, uid, -1);
 #endif
 
@@ -409,7 +409,7 @@ void become_user_permanently(uid_t uid, gid_t gid)
 	gain_root_privilege();
 	gain_root_group_privilege();
 
-#if defined(USE_SETRESUID) || defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(USE_SETRESUID) || defined(HAVE_LINUX_THREAD_CREDENTIALS)
 	samba_setresgid(gid,gid,gid);
 	samba_setgid(gid);
 	samba_setresuid(uid,uid,uid);
@@ -454,7 +454,7 @@ int set_thread_credentials(uid_t uid,
 			size_t setlen,
 			const gid_t *gidset)
 {
-#if defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(HAVE_LINUX_THREAD_CREDENTIALS)
 	/*
 	 * With Linux thread-specific credentials
 	 * we know we have setresuid/setresgid
@@ -501,7 +501,7 @@ static int have_syscall(void)
 {
 	errno = 0;
 
-#if defined(USE_SETRESUID) || defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(USE_SETRESUID) || defined(HAVE_LINUX_THREAD_CREDENTIALS)
 	samba_setresuid(-1,-1,-1);
 #endif
 
diff --git a/source3/modules/vfs_aio_pthread.c b/source3/modules/vfs_aio_pthread.c
index a9e2b09827bd..c1d1a7d518a6 100644
--- a/source3/modules/vfs_aio_pthread.c
+++ b/source3/modules/vfs_aio_pthread.c
@@ -31,7 +31,7 @@
 #include <linux/falloc.h>
 #endif
 
-#if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(HAVE_OPENAT) && defined(HAVE_LINUX_THREAD_CREDENTIALS)
 
 /*
  * We must have openat() to do any thread-based
@@ -395,7 +395,7 @@ static int aio_pthread_open_fn(vfs_handle_struct *handle,
 #endif
 
 static struct vfs_fn_pointers vfs_aio_pthread_fns = {
-#if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
+#if defined(HAVE_OPENAT) && defined(HAVE_LINUX_THREAD_CREDENTIALS)
 	.open_fn = aio_pthread_open_fn,
 #endif
 };
diff --git a/source3/wscript b/source3/wscript
index a14d76d7469e..8409e776dccc 100644
--- a/source3/wscript
+++ b/source3/wscript
@@ -964,23 +964,23 @@ syscall(SYS_setgroups32, 0, NULL);
         if (conf.CONFIG_SET('USE_LINUX_32BIT_SYSCALLS')):
             seteuid = conf.CHECK_CODE('''
                                 #define AUTOCONF_TEST 1
-                                #define USE_LINUX_THREAD_CREDENTIALS 1
+                                #define HAVE_LINUX_THREAD_CREDENTIALS 1
                                 #define USE_LINUX_32BIT_SYSCALLS 1
                                 #include "../lib/util/setid.c"
                                 #include "./lib/util_sec.c"
                                 ''',
-                                'USE_LINUX_THREAD_CREDENTIALS',
+                                'HAVE_LINUX_THREAD_CREDENTIALS',
                                 addmain=False,
                                 execute=True,
                                 msg="Checking whether we can use Linux thread-specific credentials with 32-bit system calls")
         else:
             seteuid = conf.CHECK_CODE('''
                                 #define AUTOCONF_TEST 1
-                                #define USE_LINUX_THREAD_CREDENTIALS 1
+                                #define HAVE_LINUX_THREAD_CREDENTIALS 1
                                 #include "../lib/util/setid.c"
                                 #include "./lib/util_sec.c"
                                 ''',
-                                'USE_LINUX_THREAD_CREDENTIALS',
+                                'HAVE_LINUX_THREAD_CREDENTIALS',
                                 addmain=False,
                                 execute=True,
                                 msg="Checking whether we can use Linux thread-specific credentials")
diff --git a/tests/summary.c b/tests/summary.c
index 0843ee2cc915..87a64fd90efd 100644
--- a/tests/summary.c
+++ b/tests/summary.c
@@ -12,7 +12,7 @@ int main()
 #warning "WARNING: No automated network interface determination"
 #endif
 
-#if !(defined(USE_SETEUID) || defined(USE_SETREUID) || defined(USE_SETRESUID) || defined(USE_SETUIDX) || defined(USE_LINUX_THREAD_CREDENTIALS))
+#if !(defined(USE_SETEUID) || defined(USE_SETREUID) || defined(USE_SETRESUID) || defined(USE_SETUIDX) || defined(HAVE_LINUX_THREAD_CREDENTIALS))
 #error "ERROR: no seteuid method available"
 #endif
 
-- 
2.17.1


From f61181771dcb5f62ecb5283c0c1d129cd439f150 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Thu, 3 May 2018 16:08:06 +0200
Subject: [PATCH 23/23] s3:util_sec: add a cache to set_thread_credentials()

Calling set_thread_credentials() with the same values,
skips syscalls the 2nd time.

We only do this if '__thread' is supported to provide
thread local storage.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 source3/lib/util_sec.c | 27 +++++++++++++++++++++++++++
 1 file changed, 27 insertions(+)

diff --git a/source3/lib/util_sec.c b/source3/lib/util_sec.c
index a6817f6e3c49..703c522d77b8 100644
--- a/source3/lib/util_sec.c
+++ b/source3/lib/util_sec.c
@@ -460,6 +460,24 @@ int set_thread_credentials(uid_t uid,
 	 * we know we have setresuid/setresgid
 	 * available.
 	 */
+#ifdef HAVE___THREAD
+	static struct {
+		bool active;
+		uid_t uid;
+		gid_t gid;
+		size_t setlen;
+		uintptr_t gidset;
+	} __thread cache;
+
+	if (cache.active &&
+	    cache.uid == uid &&
+	    cache.gid == gid &&
+	    cache.setlen == setlen &&
+	    (const gid_t *)cache.gidset == gidset)
+	{
+		return 0;
+	}
+#endif /* HAVE___THREAD */
 
 	/* Become root. */
 	/* Set ru=0, eu=0 */
@@ -485,6 +503,15 @@ int set_thread_credentials(uid_t uid,
 		smb_panic("set_thread_credentials failed\n");
 		return -1;
 	}
+
+#ifdef HAVE___THREAD
+	cache.active = true;
+	cache.uid = uid;
+	cache.gid = gid;
+	cache.setlen = setlen;
+	cache.gidset = (uintptr_t)gidset;
+#endif /* HAVE___THREAD */
+
 	return 0;
 #else
 	errno = ENOSYS;
-- 
2.17.1

-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 833 bytes
Desc: OpenPGP digital signature
URL: <http://lists.samba.org/pipermail/samba-technical/attachments/20180724/7c77c60b/signature-0001.sig>


More information about the samba-technical mailing list