impersonation part 5 (pthreadpool_tevent_wrapper)

Stefan Metzmacher metze at samba.org
Thu Jul 19 10:06:11 UTC 2018


Hi,

here's a patchset that implements a wrapper infrastructure for
pthreadpool_tevent.

It means we can later have hooks, which call
set_thread_credentials() before invoking the job
function. Together with using 'unshare(CLONE_FS)'
in the worker thread, we get per-thread credentials
and a per-thread current working directory.

This is required to implement async path based syscalls.

Most of the time we'll try to use the '*at()' version of syscalls,
e.g. openat(), but if such a function is not available, e.g. getxattr(),
we can simulate it by calling fchdir() before.

In order to get some sanity into the pthreadpool* code
regarding what happens
- if we talloc_free() a pending job, or
- if we talloc_free() the pool,
both before the job was started within a worker thread
and when it has already started,
I came to the conclusion that we really need good tests
to demonstrate the behavior and make sure these tests
are valgrind/helgrind/drd clean.

In order to achieve that we need to ensure some synchronization
between the main program and the worker threads.

Typically you would use pthread_mutex_[un]lock() for that
and we already use them in the low level pthreadpool.c code.

But we also need something in pthreadpool_tevent.c, which is the
upper layer that is used by Samba.

The tricky part is that each time you use mutexes, you need to
implement pthread_atfork() hooks, which lock *all* mutexes
before we can allow fork. Getting this correct and deadlock
free with the deep interaction between pthreadpool.c and
pthreadpool_tevent.c seemed way too complex.

The assumption we can make regarding the use case, with unexpected
talloc_free(), is that there's always some kind of race and we don't
need the strong guarantees of mutexes.

I did some research and found memory barriers, which can guarantee
that atomic changes happen in a defined order. Typically
one thread changes a value and other threads just read it.

The barrier with the strongest guarantees is a full barrier.

C11 has it as atomic_thread_fence(memory_order_seq_cst) in stdatomic.h.
On x86-64 this results in an 'mfence' assembler instruction.

The same can be done with compiler builtins:
__atomic_thread_fence(__ATOMIC_SEQ_CST) in newer gcc versions
or __sync_synchronize() in older versions (it was added in 2005).

On x86-64 this should be the same as:
__asm__ __volatile__ ("mfence" : : : "memory");

More details can be found here:
https://gcc.gnu.org/onlinedocs/gcc-7.3.0/gcc/_005f_005fatomic-Builtins.html#g_t_005f_005fatomic-Builtins
https://gcc.gnu.org/onlinedocs/gcc-7.3.0/gcc/_005f_005fsync-Builtins.html#g_t_005f_005fsync-Builtins

In order to simplify things I decided to always use a full barrier;
we can always optimize it later to use read or write barriers.

You can think of the full barrier as a boundary that prevents
instructions from being reordered across it, in either direction.
It also acts like a 'sync' of all cpu caches, similar to a write
oplock break, where the client (the cpu) needs to flush local changes
(in the cpu caches) to the server (the RAM).

This means that after the barrier all threads are guaranteed to see
the same memory.
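
A minimal sketch of that guarantee, assuming C11 stdatomic.h is
available. The sketch_* names are hypothetical, and the flag is an
atomic_bool accessed with relaxed ordering (instead of a plain bool) so
that the example is well defined under the C11 memory model; the
ordering itself comes only from the full fences.

```c
#include <stdatomic.h>
#include <stdbool.h>

struct sketch_box {
	int payload;        /* plain data, written before 'ready' */
	atomic_bool ready;  /* written only by the producer thread */
};

/* Producer side: store the payload, fence, then publish the flag. */
static void sketch_publish(struct sketch_box *b, int value)
{
	b->payload = value;
	atomic_thread_fence(memory_order_seq_cst);
	atomic_store_explicit(&b->ready, true, memory_order_relaxed);
}

/* Consumer side: returns true and fills *out once published. */
static bool sketch_consume(struct sketch_box *b, int *out)
{
	if (!atomic_load_explicit(&b->ready, memory_order_relaxed)) {
		return false;
	}
	/* after this fence the payload written before 'ready' is visible */
	atomic_thread_fence(memory_order_seq_cst);
	*out = b->payload;
	return true;
}
```

One thread would call sketch_publish() once, while another polls
sketch_consume() until it returns true.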

We'll use a model with a bunch of bool variables, where each variable
is only changed by either the main thread or the worker thread.

The (simplified) design is:

bool orphaned; /* written by main thread */
bool started;  /* written by job thread */
bool finished; /* written by job thread */

job thread:
     job->started = true;
     atomic_thread_fence(memory_order_seq_cst);
     if (job->orphaned) {
        job->finished = true;
        atomic_thread_fence(memory_order_seq_cst);
        exit();
     }
     job->job_fn();
     job->finished = true;
     atomic_thread_fence(memory_order_seq_cst);
     exit();

main thread destructor:

     job->orphaned = true;
     atomic_thread_fence(memory_order_seq_cst);
     if (job->finished) {
         talloc_free(job);
         return;
     }

     DLIST_ADD_END(orphaned_jobs, job);

     return;

main thread cleanup orphaned job list:

     for (job ....) {
          next_job = job->next;
          atomic_thread_fence(memory_order_seq_cst);
          if (job->finished) {
               DLIST_REMOVE(orphaned_jobs, job);
               talloc_free(job);
          }
     }
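
The model above can be written down as compilable C roughly like this.
It is only a sketch under a few assumptions: malloc()/free() stand in
for talloc, a singly linked list stands in for the DLIST_* macros, the
flags are atomic_bool with relaxed accesses instead of plain bool, and
the helpers fence on both sides of each access to stay within the C11
rules. All sketch_* names are hypothetical.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdlib.h>

struct sketch_job {
	struct sketch_job *next;  /* orphan list link, main thread only */
	void (*job_fn)(void *);
	void *private_data;
	atomic_bool orphaned;     /* written by main thread */
	atomic_bool started;      /* written by job thread */
	atomic_bool finished;     /* written by job thread */
};

static struct sketch_job *sketch_orphans; /* main thread only */

static void sketch_set(atomic_bool *flag)
{
	atomic_thread_fence(memory_order_seq_cst);
	atomic_store_explicit(flag, true, memory_order_relaxed);
	atomic_thread_fence(memory_order_seq_cst);
}

static bool sketch_get(atomic_bool *flag)
{
	bool v;

	atomic_thread_fence(memory_order_seq_cst);
	v = atomic_load_explicit(flag, memory_order_relaxed);
	atomic_thread_fence(memory_order_seq_cst);
	return v;
}

/* Example job function: counts how often it was executed. */
static void sketch_count(void *p)
{
	(*(int *)p)++;
}

/* Body of the worker thread for one job. */
static void sketch_job_thread(struct sketch_job *job)
{
	sketch_set(&job->started);
	if (!sketch_get(&job->orphaned)) {
		job->job_fn(job->private_data);
	}
	/* the job must not be touched after 'finished' is set */
	sketch_set(&job->finished);
}

/* Main thread: the request went away. Returns true if freed now. */
static bool sketch_job_destructor(struct sketch_job *job)
{
	sketch_set(&job->orphaned);
	if (sketch_get(&job->finished)) {
		free(job);
		return true;
	}
	/* still in use by a worker: park it on the orphan list */
	job->next = sketch_orphans;
	sketch_orphans = job;
	return false;
}

/* Main thread: free parked jobs that finished in the meantime. */
static size_t sketch_cleanup_orphans(void)
{
	struct sketch_job **p = &sketch_orphans;
	size_t freed = 0;

	while (*p != NULL) {
		struct sketch_job *job = *p;

		if (sketch_get(&job->finished)) {
			*p = job->next;
			free(job);
			freed++;
		} else {
			p = &job->next;
		}
	}
	return freed;
}
```

A job that is orphaned before a worker picks it up never runs job_fn();
it is freed by a later sketch_cleanup_orphans() call once the worker has
set 'finished'.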

Once I had added that model to the existing pthreadpool_tevent code
and had cmocka based tests for it, it was much easier to
add the pthreadpool_tevent_wrapper infrastructure.

The tricky part is what happens when talloc_free() is called
on just the wrapper pool, but the raw pool remains.

If the before_job() hook was already called and the wrapper
disappears while the job is running, we can't call the
after_job() hook in order to reset the job environment.
The only thing we can do is to exit that specific worker thread.
Otherwise a worker thread might continue running with special
privileges and a job on a different wrapper pool might still run
with these privileges.

We notify the main thread via a pipe() that it needs to call
pthreadpool_restart_check() in order to start new threads for
pending jobs.
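
The notification itself can be sketched with plain pipe()/poll() as
below. This is not the code from the patchset: the sketch_* names are
hypothetical, and in Samba the read side would presumably be hooked into
the tevent loop rather than polled by hand.

```c
#include <errno.h>
#include <poll.h>
#include <stdbool.h>
#include <unistd.h>

/* Worker side: wake the main thread by writing one byte. */
static bool sketch_notify_main(int write_fd)
{
	const char c = 0;
	ssize_t n;

	do {
		n = write(write_fd, &c, 1);
	} while (n == -1 && errno == EINTR);
	return n == 1;
}

/*
 * Main side: returns true if a worker asked for a restart check.
 * Never blocks (timeout 0) and drains one byte per call.
 */
static bool sketch_restart_requested(int read_fd)
{
	struct pollfd pfd = { .fd = read_fd, .events = POLLIN };
	char c;

	if (poll(&pfd, 1, 0) != 1 || !(pfd.revents & POLLIN)) {
		return false;
	}
	return read(read_fd, &c, 1) == 1;
}
```

When sketch_restart_requested() returns true, the main thread would
call pthreadpool_restart_check() for the affected pool.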

All of these commands pass fine:-)
valgrind --error-exitcode=137 bin/pthreadpooltest_cmocka
valgrind --tool=helgrind --error-exitcode=137 bin/pthreadpooltest_cmocka
valgrind --tool=drd --error-exitcode=137 bin/pthreadpooltest_cmocka

Enjoy the review and push:-)

Thanks!
metze

On 12.07.2018 at 08:13, Ralph Böhme via samba-technical wrote:
> On Thu, Jul 12, 2018 at 12:47:16AM +0200, Stefan Metzmacher via
> samba-technical wrote:
>> here's another patchset that implements some cleanups in the
>> pthreadpool code, which make it easier to implement the required
>> pthreadpool wrapper for per thread impersonation.
>> I'll do some private autobuilds with this tomorrow, but a very similar
>> state already passed a few times.
>> Please review:-) Note some is already reviewed by Ralph, I need to
>> port the review tags from his branch to mine. 
> 
> looking through the patchset, all patches were reviewed-by-me.
> 
> -slow
> 

-------------- next part --------------
From 45f02f35b0c9842e46ebf216ac1fcdc8eefabf96 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Sat, 14 Jul 2018 10:55:02 +0200
Subject: [PATCH 01/20] tevent: use talloc_zero_size() for the private state in
 tevent_context_wrapper_create()

This is what tevent_req_create() uses and what callers of
tevent_context_wrapper_create() would therefore also expect.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/tevent/tevent_wrapper.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/tevent/tevent_wrapper.c b/lib/tevent/tevent_wrapper.c
index a07696af0a4d..ce07af983588 100644
--- a/lib/tevent/tevent_wrapper.c
+++ b/lib/tevent/tevent_wrapper.c
@@ -371,7 +371,7 @@ struct tevent_context *_tevent_context_wrapper_create(struct tevent_context *mai
 	ev->wrapper.glue->wrap_ev = ev;
 	ev->wrapper.glue->main_ev = main_ev;
 	ev->wrapper.glue->ops = ops;
-	ev->wrapper.glue->private_state = talloc_size(ev->wrapper.glue, psize);
+	ev->wrapper.glue->private_state = talloc_zero_size(ev->wrapper.glue, psize);
 	if (ev->wrapper.glue->private_state == NULL) {
 		talloc_free(ev);
 		return NULL;
-- 
2.17.1


From aaa24ce6c20bbd1853b7aa0aa11a078c8ba567b9 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Mon, 16 Jul 2018 17:17:59 +0200
Subject: [PATCH 02/20] pthreadpool: make sure a pthreadpool is marked as
 stopped in child processes

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lib/pthreadpool/pthreadpool.c b/lib/pthreadpool/pthreadpool.c
index 610cfb02f154..528f4cdfd678 100644
--- a/lib/pthreadpool/pthreadpool.c
+++ b/lib/pthreadpool/pthreadpool.c
@@ -341,6 +341,7 @@ static void pthreadpool_child(void)
 		pool->num_idle = 0;
 		pool->head = 0;
 		pool->num_jobs = 0;
+		pool->stopped = true;
 
 		ret = pthread_cond_init(&pool->condvar, NULL);
 		assert(ret == 0);
-- 
2.17.1


From b0b16ae08e5518e07e9dbb5961076adc3a80caa4 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Wed, 18 Jul 2018 08:44:48 +0200
Subject: [PATCH 03/20] lib/replace: check for __thread support

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/replace/wscript | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/lib/replace/wscript b/lib/replace/wscript
index fd00a42d5b62..02d98c59e476 100644
--- a/lib/replace/wscript
+++ b/lib/replace/wscript
@@ -551,6 +551,18 @@ def configure(conf):
              conf.CONFIG_SET('HAVE_PTHREAD_MUTEX_CONSISTENT_NP'))):
             conf.DEFINE('HAVE_ROBUST_MUTEXES', 1)
 
+    # __thread is available since 2002 in gcc.
+    conf.CHECK_CODE('''
+        __thread int tls;
+
+        int main(void) {
+            return 0;
+        }
+        ''',
+        'HAVE___THREAD',
+        addmain=False,
+        msg='Checking for __thread local storage')
+
     conf.CHECK_FUNCS_IN('crypt', 'crypt', checklibc=True)
     conf.CHECK_FUNCS_IN('crypt_r', 'crypt', checklibc=True)
 
-- 
2.17.1


From b61c19cc4de02db92d9e749b6ea912d502ae0a26 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Wed, 18 Jul 2018 08:54:22 +0200
Subject: [PATCH 04/20] third_party/*_wrapper/wscript: remove redundant
 configure checks

HAVE___THREAD and HAVE_DESTRUCTOR_ATTRIBUTE are already checked
as part of Samba.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 third_party/nss_wrapper/wscript    | 31 ++----------------------------
 third_party/pam_wrapper/wscript    | 30 ++---------------------------
 third_party/resolv_wrapper/wscript | 30 ++---------------------------
 third_party/socket_wrapper/wscript | 30 ++---------------------------
 third_party/uid_wrapper/wscript    | 12 ++----------
 5 files changed, 10 insertions(+), 123 deletions(-)

diff --git a/third_party/nss_wrapper/wscript b/third_party/nss_wrapper/wscript
index d50dd5cbb17d..a289d4710321 100644
--- a/third_party/nss_wrapper/wscript
+++ b/third_party/nss_wrapper/wscript
@@ -11,35 +11,8 @@ def configure(conf):
     else:
         conf.CHECK_HEADERS('nss.h')
 
-        # check HAVE_GCC_THREAD_LOCAL_STORAGE
-        conf.CHECK_CODE('''
-            __thread int tls;
-
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_GCC_THREAD_LOCAL_STORAGE',
-            addmain=False,
-            msg='Checking for thread local storage')
-
-        # check HAVE_DESTRUCTOR_ATTRIBUTE
-        conf.CHECK_CODE('''
-            void test_destructor_attribute(void) __attribute__ ((destructor));
-
-            void test_destructor_attribute(void)
-            {
-                return;
-            }
-
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_DESTRUCTOR_ATTRIBUTE',
-            addmain=False,
-            strict=True,
-            msg='Checking for library destructor support')
+        if conf.CONFIG_SET("HAVE___THREAD"):
+            conf.DEFINE("HAVE_GCC_THREAD_LOCAL_STORAGE", 1)
 
         # check HAVE_ATTRIBUTE_PRINTF_FORMAT
         conf.CHECK_CODE('''
diff --git a/third_party/pam_wrapper/wscript b/third_party/pam_wrapper/wscript
index 7d4a790caaaa..1a1e3a29bf2c 100644
--- a/third_party/pam_wrapper/wscript
+++ b/third_party/pam_wrapper/wscript
@@ -17,35 +17,9 @@ def configure(conf):
         conf.DEFINE('USING_SYSTEM_PAM_WRAPPER', 1)
         libpam_wrapper_so_path = 'libpam_wrapper.so'
     else:
-        # check HAVE_GCC_THREAD_LOCAL_STORAGE
-        conf.CHECK_CODE('''
-            __thread int tls;
-
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_GCC_THREAD_LOCAL_STORAGE',
-            addmain=False,
-            msg='Checking for thread local storage')
-
-        # check HAVE_DESTRUCTOR_ATTRIBUTE
-        conf.CHECK_CODE('''
-            void test_destructor_attribute(void) __attribute__ ((destructor));
 
-            void test_destructor_attribute(void)
-            {
-                return;
-            }
-
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_DESTRUCTOR_ATTRIBUTE',
-            addmain=False,
-            strict=True,
-            msg='Checking for library destructor support')
+        if conf.CONFIG_SET("HAVE___THREAD"):
+            conf.DEFINE("HAVE_GCC_THREAD_LOCAL_STORAGE", 1)
 
         # check HAVE_FUNCTION_ATTRIBUTE_FORMAT
         conf.CHECK_CODE('''
diff --git a/third_party/resolv_wrapper/wscript b/third_party/resolv_wrapper/wscript
index bb7722e97758..7cd1d90b8fa8 100644
--- a/third_party/resolv_wrapper/wscript
+++ b/third_party/resolv_wrapper/wscript
@@ -9,35 +9,9 @@ def configure(conf):
         conf.DEFINE('USING_SYSTEM_RESOLV_WRAPPER', 1)
         libresolv_wrapper_so_path = 'libresolv_wrapper.so'
     else:
-        # check HAVE_GCC_THREAD_LOCAL_STORAGE
-        conf.CHECK_CODE('''
-            __thread int tls;
-
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_GCC_THREAD_LOCAL_STORAGE',
-            addmain=False,
-            msg='Checking for thread local storage')
-
-        # check HAVE_DESTRUCTOR_ATTRIBUTE
-        conf.CHECK_CODE('''
-            void test_destructor_attribute(void) __attribute__ ((destructor));
 
-            void test_destructor_attribute(void)
-            {
-                return;
-            }
-
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_DESTRUCTOR_ATTRIBUTE',
-            addmain=False,
-            strict=True,
-            msg='Checking for library destructor support')
+        if conf.CONFIG_SET("HAVE___THREAD"):
+            conf.DEFINE("HAVE_GCC_THREAD_LOCAL_STORAGE", 1)
 
         # check HAVE_ATTRIBUTE_PRINTF_FORMAT
         conf.CHECK_CODE('''
diff --git a/third_party/socket_wrapper/wscript b/third_party/socket_wrapper/wscript
index f48debe8b157..a0ee4f2f9325 100644
--- a/third_party/socket_wrapper/wscript
+++ b/third_party/socket_wrapper/wscript
@@ -9,35 +9,9 @@ def configure(conf):
         conf.DEFINE('USING_SYSTEM_SOCKET_WRAPPER', 1)
         libsocket_wrapper_so_path = 'libsocket_wrapper.so'
     else:
-        # check HAVE_GCC_THREAD_LOCAL_STORAGE
-        conf.CHECK_CODE('''
-            __thread int tls;
-
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_GCC_THREAD_LOCAL_STORAGE',
-            addmain=False,
-            msg='Checking for thread local storage')
-
-        # check HAVE_DESTRUCTOR_ATTRIBUTE
-        conf.CHECK_CODE('''
-            void test_destructor_attribute(void) __attribute__ ((destructor));
 
-            void test_destructor_attribute(void)
-            {
-                return;
-            }
-
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_DESTRUCTOR_ATTRIBUTE',
-            addmain=False,
-            strict=True,
-            msg='Checking for library destructor support')
+        if conf.CONFIG_SET("HAVE___THREAD"):
+            conf.DEFINE("HAVE_GCC_THREAD_LOCAL_STORAGE", 1)
 
         # check HAVE_FUNCTION_ATTRIBUTE_FORMAT
         conf.CHECK_CODE('''
diff --git a/third_party/uid_wrapper/wscript b/third_party/uid_wrapper/wscript
index 6344ebf5eba4..8127a9d9f408 100644
--- a/third_party/uid_wrapper/wscript
+++ b/third_party/uid_wrapper/wscript
@@ -23,17 +23,9 @@ def configure(conf):
             addmain=False,
             msg='Checking for atomic builtins')
 
-        # check HAVE_GCC_THREAD_LOCAL_STORAGE
-        conf.CHECK_CODE('''
-            __thread int tls;
 
-            int main(void) {
-                return 0;
-            }
-            ''',
-            'HAVE_GCC_THREAD_LOCAL_STORAGE',
-            addmain=False,
-            msg='Checking for thread local storage')
+        if conf.CONFIG_SET("HAVE___THREAD"):
+            conf.DEFINE("HAVE_GCC_THREAD_LOCAL_STORAGE", 1)
 
         if Options.options.address_sanitizer:
             # check HAVE_ADDRESS_SANITIZER_ATTRIBUTE
-- 
2.17.1


From 042ff967c5d79074c4a6b223198c0c980ef4e708 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Thu, 21 Jun 2018 14:17:35 +0200
Subject: [PATCH 05/20] replace: add checks for
 atomic_thread_fence(memory_order_seq_cst) and add possible fallbacks

This implements a full memory barrier.
On Ubuntu amd64 this results in an 'mfence' instruction.

This is required for synchronization between threads, where
there's typically only one writer of a memory location that should be
synced between all threads with the barrier.

More details can be found here:
https://gcc.gnu.org/onlinedocs/gcc-7.3.0/gcc/_005f_005fatomic-Builtins.html#g_t_005f_005fatomic-Builtins
https://gcc.gnu.org/onlinedocs/gcc-7.3.0/gcc/_005f_005fsync-Builtins.html#g_t_005f_005fsync-Builtins

The main one we use seems to be in C11 via stdatomic.h,
the oldest fallback is __sync_synchronize(), which is available
since 2005 in gcc.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/replace/system/threads.h | 27 +++++++++++++++++++++++++++
 lib/replace/wscript          | 19 ++++++++++++++++++-
 2 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/lib/replace/system/threads.h b/lib/replace/system/threads.h
index fe6d0fbac541..d189ed620c51 100644
--- a/lib/replace/system/threads.h
+++ b/lib/replace/system/threads.h
@@ -42,4 +42,31 @@
 #define pthread_mutex_consistent pthread_mutex_consistent_np
 #endif
 
+#ifdef HAVE_STDATOMIC_H
+#include <stdatomic.h>
+#endif
+
+#ifndef HAVE_ATOMIC_THREAD_FENCE
+#ifdef HAVE___ATOMIC_THREAD_FENCE
+#define atomic_thread_fence(__ignore_order) __atomic_thread_fence(__ATOMIC_SEQ_CST)
+#define HAVE_ATOMIC_THREAD_FENCE 1
+#endif /* HAVE___ATOMIC_THREAD_FENCE */
+#endif /* not HAVE_ATOMIC_THREAD_FENCE */
+
+#ifndef HAVE_ATOMIC_THREAD_FENCE
+#ifdef HAVE___SYNC_SYNCHRONIZE
+#define atomic_thread_fence(__ignore_order) __sync_synchronize()
+#define HAVE_ATOMIC_THREAD_FENCE 1
+#endif /* HAVE___SYNC_SYNCHRONIZE */
+#endif /* not HAVE_ATOMIC_THREAD_FENCE */
+
+#ifndef HAVE_ATOMIC_THREAD_FENCE
+#ifdef HAVE_ATOMIC_THREAD_FENCE_SUPPORT
+#error mismatch_error_between_configure_test_and_header
+#endif
+/* make sure the build fails if someone uses it without checking the define */
+#define atomic_thread_fence(__order) \
+        __function__atomic_thread_fence_not_available_on_this_platform__()
+#endif /* not HAVE_ATOMIC_THREAD_FENCE */
+
 #endif
diff --git a/lib/replace/wscript b/lib/replace/wscript
index 02d98c59e476..5b53461f8448 100644
--- a/lib/replace/wscript
+++ b/lib/replace/wscript
@@ -113,7 +113,7 @@ def configure(conf):
     conf.CHECK_HEADERS('sys/extattr.h sys/ea.h sys/proplist.h sys/cdefs.h')
     conf.CHECK_HEADERS('utmp.h utmpx.h lastlog.h')
     conf.CHECK_HEADERS('syscall.h sys/syscall.h inttypes.h')
-    conf.CHECK_HEADERS('sys/atomic.h')
+    conf.CHECK_HEADERS('sys/atomic.h stdatomic.h')
     conf.CHECK_HEADERS('libgen.h')
 
     if conf.CHECK_CFLAGS('-Wno-format-truncation'):
@@ -260,6 +260,23 @@ def configure(conf):
                     headers='stdint.h sys/atomic.h',
                     msg='Checking for atomic_add_32 compiler builtin')
 
+    # Check for thread fence. */
+    tf = conf.CHECK_CODE('atomic_thread_fence(memory_order_seq_cst);',
+                         'HAVE_ATOMIC_THREAD_FENCE',
+                         headers='stdatomic.h',
+                         msg='Checking for atomic_thread_fence(memory_order_seq_cst) in stdatomic.h')
+    if not tf:
+        tf = conf.CHECK_CODE('__atomic_thread_fence(__ATOMIC_SEQ_CST);',
+                             'HAVE___ATOMIC_THREAD_FENCE',
+                             msg='Checking for __atomic_thread_fence(__ATOMIC_SEQ_CST)')
+    if not tf:
+        # __sync_synchronize() is available since 2005 in gcc.
+        tf = conf.CHECK_CODE('__sync_synchronize();',
+                             'HAVE___SYNC_SYNCHRONIZE',
+                             msg='Checking for __sync_synchronize')
+    if tf:
+        conf.DEFINE('HAVE_ATOMIC_THREAD_FENCE_SUPPORT', 1)
+
     conf.CHECK_CODE('''
                     #define FALL_THROUGH __attribute__((fallthrough))
 
-- 
2.17.1


From 104d214b480f2b75faf70b13c06ef9f9dd6fde4b Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Wed, 20 Jun 2018 13:38:19 +0200
Subject: [PATCH 06/20] pthreadpool: add some lockless coordination between the
 main and job threads

In the direction from the main process to the job thread, we have:

- 'maycancel', which is set when tevent_req_cancel() is called,
- 'orphaned', which is set when the job request, tevent_context or
  pthreadpool_tevent was talloc_free'ed.

The job function can consume these by using:

   /*
    * return true - if tevent_req_cancel() was called.
    */
   bool pthreadpool_tevent_current_job_canceled(void);

   /*
    * return true - if talloc_free() was called on the job request,
    * tevent_context or pthreadpool_tevent.
    */
   bool pthreadpool_tevent_current_job_orphaned(void);

   /*
    * return true if canceled and orphaned are both false.
    */
   bool pthreadpool_tevent_current_job_continue(void);

In the other direction we remember the following points
in the job execution:

- 'started'  - set when the job is picked up by a worker thread
- 'executed' - set once the job function returned.
- 'finished' - set when pthreadpool_tevent_job_signal() is entered
- 'dropped'  - set when pthreadpool_tevent_job_signal() leaves with orphaned
- 'signaled' - set when pthreadpool_tevent_job_signal() leaves normally

There's only one side writing each element,
either the main process or the job thread.

This means we can do the coordination with a full memory
barrier using atomic_thread_fence(memory_order_seq_cst).
lib/replace provides fallbacks if C11 stdatomic.h is not available.

A real pthreadpool requires pthread and atomic_thread_fence() (or a
replacement) to be available, otherwise we only have pthreadpool_sync.c.
But this should not make a real difference, as at least
__sync_synchronize() is available since 2005 in gcc.
We also require __thread which is available since 2002.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool_tevent.c | 212 ++++++++++++++++++++++++++-
 lib/pthreadpool/pthreadpool_tevent.h |  14 ++
 wscript                              |  12 +-
 3 files changed, 231 insertions(+), 7 deletions(-)

diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index e7e17d3bf0f7..fbf9c0e835a6 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -18,10 +18,42 @@
  */
 
 #include "replace.h"
+#include "system/threads.h"
 #include "pthreadpool_tevent.h"
 #include "pthreadpool.h"
 #include "lib/util/tevent_unix.h"
 #include "lib/util/dlinklist.h"
+#include "lib/util/attr.h"
+
+#define PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(__job) do { \
+	_UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
+} while(0);
+
+#ifdef WITH_PTHREADPOOL
+/*
+ * configure checked we have pthread and atomic_thread_fence() available
+ */
+#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { \
+	atomic_thread_fence(__order); \
+} while(0)
+#else
+/*
+ * we're using lib/pthreadpool/pthreadpool_sync.c ...
+ */
+#define __PTHREAD_TEVENT_JOB_THREAD_FENCE(__order) do { } while(0)
+#ifndef HAVE___THREAD
+#define __thread
+#endif
+#endif
+
+#define PTHREAD_TEVENT_JOB_THREAD_FENCE(__job) do { \
+	_UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
+	__PTHREAD_TEVENT_JOB_THREAD_FENCE(memory_order_seq_cst); \
+} while(0);
+
+#define PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(__job) do { \
+	_UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
+} while(0);
 
 struct pthreadpool_tevent_job_state;
 
@@ -75,6 +107,70 @@ struct pthreadpool_tevent_job {
 
 	void (*fn)(void *private_data);
 	void *private_data;
+
+	/*
+	 * Coordination between threads
+	 *
+	 * There're only one side writing each element
+	 * either the main process or the job thread.
+	 *
+	 * The coordination is done by a full memory
+	 * barrier using atomic_thread_fence(memory_order_seq_cst)
+	 * wrapped in PTHREAD_TEVENT_JOB_THREAD_FENCE()
+	 */
+	struct {
+		/*
+		 * 'maycancel'
+		 * set when tevent_req_cancel() is called.
+		 * (only written by main thread!)
+		 */
+		bool maycancel;
+
+		/*
+		 * 'orphaned'
+		 * set when talloc_free is called on the job request,
+		 * tevent_context or pthreadpool_tevent.
+		 * (only written by main thread!)
+		 */
+		bool orphaned;
+
+		/*
+		 * 'started'
+		 * set when the job is picked up by a worker thread
+		 * (only written by job thread!)
+		 */
+		bool started;
+
+		/*
+		 * 'executed'
+		 * set once the job function returned.
+		 * (only written by job thread!)
+		 */
+		bool executed;
+
+		/*
+		 * 'finished'
+		 * set when pthreadpool_tevent_job_signal() is entered
+		 * (only written by job thread!)
+		 */
+		bool finished;
+
+		/*
+		 * 'dropped'
+		 * set when pthreadpool_tevent_job_signal() leaves with
+		 * orphaned already set.
+		 * (only written by job thread!)
+		 */
+		bool dropped;
+
+		/*
+		 * 'signaled'
+		 * set when pthreadpool_tevent_job_signal() leaves normal
+		 * and the immediate event was scheduled.
+		 * (only written by job thread!)
+		 */
+		bool signaled;
+	} needs_fence;
 };
 
 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
@@ -299,11 +395,11 @@ static bool pthreadpool_tevent_job_cancel(struct tevent_req *req);
 static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
 {
 	/*
-	 * We should never be called with state->state != NULL.
+	 * We should never be called with needs_fence.orphaned == false.
 	 * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
 	 * after detaching from the request state and pool list.
 	 */
-	if (job->state != NULL) {
+	if (!job->needs_fence.orphaned) {
 		abort();
 	}
 
@@ -328,6 +424,17 @@ static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
 		}
 	}
 
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	if (job->needs_fence.dropped) {
+		/*
+		 * The signal function saw job->needs_fence.orphaned
+		 * before it started the signaling via the immediate
+		 * event. So we'll never geht triggered and can
+		 * remove job->im and let the whole job go...
+		 */
+		TALLOC_FREE(job->im);
+	}
+
 	/*
 	 * pthreadpool_tevent_job_orphan() already removed
 	 * it from pool->jobs. And we don't need try
@@ -351,11 +458,15 @@ static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
 	 */
 	DLIST_REMOVE(orphaned_jobs, job);
 
+	PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(job);
 	return 0;
 }
 
 static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
 {
+	job->needs_fence.orphaned = true;
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+
 	/*
 	 * We're the only function that sets
 	 * job->state = NULL;
@@ -476,6 +587,7 @@ struct tevent_req *pthreadpool_tevent_job_send(
 	if (tevent_req_nomem(job->im, req)) {
 		return tevent_req_post(req, ev);
 	}
+	PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(job);
 	talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
 	DLIST_ADD_END(job->pool->jobs, job);
 	job->state = state;
@@ -492,13 +604,76 @@ struct tevent_req *pthreadpool_tevent_job_send(
 	return req;
 }
 
+static __thread struct pthreadpool_tevent_job *current_job;
+
+bool pthreadpool_tevent_current_job_canceled(void)
+{
+	if (current_job == NULL) {
+		/*
+		 * Should only be called from within
+		 * the job function.
+		 */
+		abort();
+		return false;
+	}
+
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
+	return current_job->needs_fence.maycancel;
+}
+
+bool pthreadpool_tevent_current_job_orphaned(void)
+{
+	if (current_job == NULL) {
+		/*
+		 * Should only be called from within
+		 * the job function.
+		 */
+		abort();
+		return false;
+	}
+
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
+	return current_job->needs_fence.orphaned;
+}
+
+bool pthreadpool_tevent_current_job_continue(void)
+{
+	if (current_job == NULL) {
+		/*
+		 * Should only be called from within
+		 * the job function.
+		 */
+		abort();
+		return false;
+	}
+
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
+	if (current_job->needs_fence.maycancel) {
+		return false;
+	}
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(current_job);
+	if (current_job->needs_fence.orphaned) {
+		return false;
+	}
+
+	return true;
+}
+
 static void pthreadpool_tevent_job_fn(void *private_data)
 {
 	struct pthreadpool_tevent_job *job =
 		talloc_get_type_abort(private_data,
 		struct pthreadpool_tevent_job);
 
+	current_job = job;
+	job->needs_fence.started = true;
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+
 	job->fn(job->private_data);
+
+	job->needs_fence.executed = true;
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	current_job = NULL;
 }
 
 static int pthreadpool_tevent_job_signal(int jobid,
@@ -513,8 +688,12 @@ static int pthreadpool_tevent_job_signal(int jobid,
 	struct tevent_threaded_context *tctx = NULL;
 	struct pthreadpool_tevent_glue *g = NULL;
 
-	if (state == NULL) {
+	job->needs_fence.finished = true;
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	if (job->needs_fence.orphaned) {
 		/* Request already gone */
+		job->needs_fence.dropped = true;
+		PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 		return 0;
 	}
 
@@ -543,6 +722,8 @@ static int pthreadpool_tevent_job_signal(int jobid,
 					  job);
 	}
 
+	job->needs_fence.signaled = true;
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
 	return 0;
 }
 
@@ -565,9 +746,17 @@ static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
 
 	/*
 	 * pthreadpool_tevent_job_cleanup()
-	 * will destroy the job.
+	 * (called by tevent_req_done() or
+	 * tevent_req_error()) will destroy the job.
 	 */
-	tevent_req_done(state->req);
+
+	if (job->needs_fence.executed) {
+		tevent_req_done(state->req);
+		return;
+	}
+
+	tevent_req_error(state->req, ENOEXEC);
+	return;
 }
 
 static bool pthreadpool_tevent_job_cancel(struct tevent_req *req)
@@ -582,6 +771,19 @@ static bool pthreadpool_tevent_job_cancel(struct tevent_req *req)
 		return false;
 	}
 
+	job->needs_fence.maycancel = true;
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	if (job->needs_fence.started) {
+		/*
+		 * It was too late to cancel the request.
+		 *
+		 * The job still has the chance to look
+		 * at pthreadpool_tevent_current_job_canceled()
+		 * or pthreadpool_tevent_current_job_continue()
+		 */
+		return false;
+	}
+
 	num = pthreadpool_cancel_job(job->pool->pool, 0,
 				     pthreadpool_tevent_job_fn,
 				     job);
diff --git a/lib/pthreadpool/pthreadpool_tevent.h b/lib/pthreadpool/pthreadpool_tevent.h
index fdb86e23757a..37e491e17c47 100644
--- a/lib/pthreadpool/pthreadpool_tevent.h
+++ b/lib/pthreadpool/pthreadpool_tevent.h
@@ -32,6 +32,20 @@ int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
 size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool);
 size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool);
 
+/*
+ * return true - if tevent_req_cancel() was called.
+ */
+bool pthreadpool_tevent_current_job_canceled(void);
+/*
+ * return true - if talloc_free() was called on the job request,
+ * tevent_context or pthreadpool_tevent.
+ */
+bool pthreadpool_tevent_current_job_orphaned(void);
+/*
+ * return true if canceled and orphaned are both false.
+ */
+bool pthreadpool_tevent_current_job_continue(void);
+
 struct tevent_req *pthreadpool_tevent_job_send(
 	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
 	struct pthreadpool_tevent *pool,
diff --git a/wscript b/wscript
index 19fc6d12118e..f11c49dde676 100644
--- a/wscript
+++ b/wscript
@@ -259,10 +259,18 @@ def configure(conf):
         conf.DEFINE('WITH_NTVFS_FILESERVER', 1)
 
     if Options.options.with_pthreadpool:
-        if conf.CONFIG_SET('HAVE_PTHREAD'):
+        if conf.CONFIG_SET('HAVE_PTHREAD') and \
+           conf.CONFIG_SET('HAVE___THREAD') and \
+           conf.CONFIG_SET('HAVE_ATOMIC_THREAD_FENCE_SUPPORT'):
             conf.DEFINE('WITH_PTHREADPOOL', '1')
         else:
-            Logs.warn("pthreadpool support cannot be enabled when pthread support was not found")
+            if not conf.CONFIG_SET('HAVE_PTHREAD'):
+                Logs.warn("pthreadpool support cannot be enabled when pthread support was not found")
+            if not conf.CONFIG_SET('HAVE_ATOMIC_THREAD_FENCE_SUPPORT'):
+                Logs.warn("""pthreadpool support cannot be enabled when there is
+                          no support for atomic_thread_fence()""")
+            if not conf.CONFIG_SET('HAVE___THREAD'):
+                Logs.warn("pthreadpool support cannot be enabled when __thread support was not found")
             conf.undefine('WITH_PTHREADPOOL')
 
     conf.RECURSE('source3')
-- 
2.17.1
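
The maycancel/started handshake in pthreadpool_tevent_job_cancel() above can be sketched in isolation. This is only a minimal sketch, not the code from the patch: the names demo_job, demo_try_cancel() and demo_job_start() are hypothetical, and it uses atomic_bool with relaxed accesses (an assumption made for strict C11 conformance), while the patch uses plain bool members together with the fence and helgrind annotations.

```c
#include <stdatomic.h>
#include <stdbool.h>

/* Hypothetical stand-in for two of the job->needs_fence flags. */
struct demo_job {
	atomic_bool maycancel; /* written by the main thread */
	atomic_bool started;   /* written by the worker thread */
};

/*
 * Main thread side: publish the cancel request, fence, then look
 * at 'started'. Returns true if the job was not seen as started.
 */
static bool demo_try_cancel(struct demo_job *job)
{
	atomic_store_explicit(&job->maycancel, true, memory_order_relaxed);
	atomic_thread_fence(memory_order_seq_cst);
	return !atomic_load_explicit(&job->started, memory_order_relaxed);
}

/*
 * Worker thread side: publish 'started', fence, then look at
 * 'maycancel'. Returns true if a cancel request is visible.
 */
static bool demo_job_start(struct demo_job *job)
{
	atomic_store_explicit(&job->started, true, memory_order_relaxed);
	atomic_thread_fence(memory_order_seq_cst);
	return atomic_load_explicit(&job->maycancel, memory_order_relaxed);
}
```

With a seq_cst fence between the store and the load on both sides, it should not be possible for demo_try_cancel() to miss 'started' while demo_job_start() also misses 'maycancel'; at least one side notices the other. In the patch, the main thread still goes on to pthreadpool_cancel_job() after this check, and that call makes the final decision.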


From c817dd808035042ea206272a27cbf06419874357 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Thu, 21 Jun 2018 12:46:06 +0200
Subject: [PATCH 07/20] s3:wscript: don't check for valgrind related headers
 twice

We already check them in lib/replace/wscript.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 source3/wscript | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/source3/wscript b/source3/wscript
index 633a3655b211..aed784ef0179 100644
--- a/source3/wscript
+++ b/source3/wscript
@@ -1034,7 +1034,7 @@ syscall(SYS_setgroups32, 0, NULL);
             Logs.warn("--with-dnsupdate=yes but gssapi support not sufficient")
         else:
             conf.DEFINE('WITH_DNS_UPDATES', 1)
-    conf.CHECK_HEADERS('valgrind.h valgrind/valgrind.h valgrind/memcheck.h')
+    # valgrind.h or valgrind/valgrind.h is checked in lib/replace/wscript
     if Options.options.developer:
         if conf.CONFIG_SET('HAVE_VALGRIND_H') or conf.CONFIG_SET('HAVE_VALGRIND_VALGRIND_H'):
             conf.DEFINE('VALGRIND', '1')
-- 
2.17.1


From 89ecfd425396e8c6ae62c39d428691eee152c798 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Thu, 21 Jun 2018 12:46:48 +0200
Subject: [PATCH 08/20] lib/replace: also check for valgrind/helgrind.h

This will be used in lib/pthreadpool/pthreadpool_tevent.c
in order to avoid expected helgrind/drd warnings.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/replace/wscript | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/lib/replace/wscript b/lib/replace/wscript
index 5b53461f8448..3dd10ae0356a 100644
--- a/lib/replace/wscript
+++ b/lib/replace/wscript
@@ -108,7 +108,8 @@ def configure(conf):
     conf.CHECK_HEADERS('sys/fileio.h sys/filesys.h sys/dustat.h sys/sysmacros.h')
     conf.CHECK_HEADERS('xfs/libxfs.h netgroup.h')
 
-    conf.CHECK_HEADERS('valgrind.h valgrind/valgrind.h valgrind/memcheck.h')
+    conf.CHECK_HEADERS('valgrind.h valgrind/valgrind.h')
+    conf.CHECK_HEADERS('valgrind/memcheck.h valgrind/helgrind.h')
     conf.CHECK_HEADERS('nss_common.h nsswitch.h ns_api.h')
     conf.CHECK_HEADERS('sys/extattr.h sys/ea.h sys/proplist.h sys/cdefs.h')
     conf.CHECK_HEADERS('utmp.h utmpx.h lastlog.h')
-- 
2.17.1


From 4bd6dd50226cdb8a3e940ebcf435b89630b2a5b2 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Thu, 21 Jun 2018 12:43:08 +0200
Subject: [PATCH 09/20] pthreadpool: add helgrind magic to
 PTHREAD_TEVENT_JOB_THREAD_FENCE_*()

This avoids the expected helgrind/drd warnings on the job states which
are protected by the thread fence.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool_tevent.c | 34 ++++++++++++++++++++++++++++
 1 file changed, 34 insertions(+)

diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index fbf9c0e835a6..821d13b02362 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -25,8 +25,39 @@
 #include "lib/util/dlinklist.h"
 #include "lib/util/attr.h"
 
+/*
+ * We try to give some hints to helgrind/drd
+ *
+ * Note ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
+ * takes a memory address range that is ignored by helgrind/drd,
+ * 'description' is just ignored...
+ *
+ *
+ * Note that ANNOTATE_HAPPENS_*(unique_uintptr)
+ * just takes a DWORD/(void *) as unique key
+ * for the barrier.
+ */
+#ifdef HAVE_VALGRIND_HELGRIND_H
+#include <valgrind/helgrind.h>
+#endif
+#ifndef ANNOTATE_BENIGN_RACE_SIZED
+#define ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
+#endif
+#ifndef ANNOTATE_HAPPENS_BEFORE
+#define ANNOTATE_HAPPENS_BEFORE(unique_uintptr)
+#endif
+#ifndef ANNOTATE_HAPPENS_AFTER
+#define ANNOTATE_HAPPENS_AFTER(unique_uintptr)
+#endif
+#ifndef ANNOTATE_HAPPENS_BEFORE_FORGET_ALL
+#define ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(unique_uintptr)
+#endif
+
 #define PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(__job) do { \
 	_UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
+	ANNOTATE_BENIGN_RACE_SIZED(&__j->needs_fence, \
+				   sizeof(__j->needs_fence), \
+				   "race by design, protected by fence"); \
 } while(0);
 
 #ifdef WITH_PTHREADPOOL
@@ -48,11 +79,14 @@
 
 #define PTHREAD_TEVENT_JOB_THREAD_FENCE(__job) do { \
 	_UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
+	ANNOTATE_HAPPENS_BEFORE(&__job->needs_fence); \
 	__PTHREAD_TEVENT_JOB_THREAD_FENCE(memory_order_seq_cst); \
+	ANNOTATE_HAPPENS_AFTER(&__job->needs_fence); \
 } while(0);
 
 #define PTHREAD_TEVENT_JOB_THREAD_FENCE_FINI(__job) do { \
 	_UNUSED_ const struct pthreadpool_tevent_job *__j = __job; \
+	ANNOTATE_HAPPENS_BEFORE_FORGET_ALL(&__job->needs_fence); \
 } while(0);
 
 struct pthreadpool_tevent_job_state;
-- 
2.17.1


From 7228448e06f7b093a66b7c433fb51c46e1fb71b4 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Fri, 22 Jun 2018 17:14:31 +0200
Subject: [PATCH 10/20] pthreadpool: maintain a list of job_states on each
 pthreadpool_tevent_glue

We should avoid traversing a linked list within a thread without holding
a mutex!

Using a mutex would be very tricky as we'll likely deadlock with
the mutexes at the raw pthreadpool layer.

So we use some kind of spinlock using atomic_thread_fence in order to
protect the access to job->state->glue->{tctx,ev} in
pthreadpool_tevent_job_signal().

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool_tevent.c | 102 ++++++++++++++++++++-------
 1 file changed, 78 insertions(+), 24 deletions(-)

diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index 821d13b02362..3b502a7cc5a3 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -18,6 +18,7 @@
  */
 
 #include "replace.h"
+#include "system/select.h"
 #include "system/threads.h"
 #include "pthreadpool_tevent.h"
 #include "pthreadpool.h"
@@ -104,6 +105,8 @@ struct pthreadpool_tevent_glue {
 	struct tevent_threaded_context *tctx;
 	/* Pointer to link object owned by *ev. */
 	struct pthreadpool_tevent_glue_ev_link *ev_link;
+	/* active jobs */
+	struct pthreadpool_tevent_job_state *states;
 };
 
 /*
@@ -127,6 +130,8 @@ struct pthreadpool_tevent {
 };
 
 struct pthreadpool_tevent_job_state {
+	struct pthreadpool_tevent_job_state *prev, *next;
+	struct pthreadpool_tevent_glue *glue;
 	struct tevent_context *ev;
 	struct tevent_req *req;
 	struct pthreadpool_tevent_job *job;
@@ -322,6 +327,16 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 static int pthreadpool_tevent_glue_destructor(
 	struct pthreadpool_tevent_glue *glue)
 {
+	struct pthreadpool_tevent_job_state *state = NULL;
+	struct pthreadpool_tevent_job_state *nstate = NULL;
+
+	for (state = glue->states; state != NULL; state = nstate) {
+		nstate = state->next;
+
+		/* Orphaning the job removes the state from the list */
+		pthreadpool_tevent_job_orphan(state->job);
+	}
+
 	if (glue->pool->glue_list != NULL) {
 		DLIST_REMOVE(glue->pool->glue_list, glue);
 	}
@@ -355,9 +370,11 @@ static int pthreadpool_tevent_glue_link_destructor(
 	return 0;
 }
 
-static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
-					  struct tevent_context *ev)
+static int pthreadpool_tevent_register_ev(
+				struct pthreadpool_tevent *pool,
+				struct pthreadpool_tevent_job_state *state)
 {
+	struct tevent_context *ev = state->ev;
 	struct pthreadpool_tevent_glue *glue = NULL;
 	struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
 
@@ -368,7 +385,9 @@ static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
 	 * pair.
 	 */
 	for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
-		if (glue->ev == ev) {
+		if (glue->ev == state->ev) {
+			state->glue = glue;
+			DLIST_ADD_END(glue->states, state);
 			return 0;
 		}
 	}
@@ -416,6 +435,9 @@ static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
 	}
 #endif
 
+	state->glue = glue;
+	DLIST_ADD_END(glue->states, state);
+
 	DLIST_ADD(pool->glue_list, glue);
 	return 0;
 }
@@ -431,7 +453,7 @@ static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
 	/*
 	 * We should never be called with needs_fence.orphaned == false.
 	 * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
-	 * after detaching from the request state and pool list.
+	 * after detaching from the request state, glue and pool list.
 	 */
 	if (!job->needs_fence.orphaned) {
 		abort();
@@ -509,6 +531,42 @@ static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
 		abort();
 	}
 
+	/*
+	 * Once we marked the request as 'orphaned'
+	 * we spin/loop if it's already marked
+	 * as 'finished' (which means that
+	 * pthreadpool_tevent_job_signal() was entered).
+	 * If it saw 'orphaned' it will exit after setting
+	 * 'dropped', otherwise it dereferences
+	 * job->state->glue->{tctx,ev} until it exited
+	 * after setting 'signaled'.
+	 *
+	 * We need to close this potential gap before
+	 * we can set job->state = NULL.
+	 *
+	 * This is some kind of spinlock, but with
+	 * 1 millisecond sleeps in between, in order
+	 * to give the thread more cpu time to finish.
+	 */
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	while (job->needs_fence.finished) {
+		if (job->needs_fence.dropped) {
+			break;
+		}
+		if (job->needs_fence.signaled) {
+			break;
+		}
+		poll(NULL, 0, 1);
+		PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	}
+
+	/*
+	 * Once the gap is closed, we can remove
+	 * the glue link.
+	 */
+	DLIST_REMOVE(job->state->glue->states, job->state);
+	job->state->glue = NULL;
+
 	/*
 	 * We need to reparent to a long term context.
 	 * And detach from the request state.
@@ -561,6 +619,10 @@ static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
 		 * The job request is not scheduled in the pool
 		 * yet or anymore.
 		 */
+		if (state->glue != NULL) {
+			DLIST_REMOVE(state->glue->states, state);
+			state->glue = NULL;
+		}
 		return;
 	}
 
@@ -605,7 +667,7 @@ struct tevent_req *pthreadpool_tevent_job_send(
 		return tevent_req_post(req, ev);
 	}
 
-	ret = pthreadpool_tevent_register_ev(pool, ev);
+	ret = pthreadpool_tevent_register_ev(pool, state);
 	if (tevent_req_error(req, ret)) {
 		return tevent_req_post(req, ev);
 	}
@@ -718,9 +780,6 @@ static int pthreadpool_tevent_job_signal(int jobid,
 	struct pthreadpool_tevent_job *job =
 		talloc_get_type_abort(job_private_data,
 		struct pthreadpool_tevent_job);
-	struct pthreadpool_tevent_job_state *state = job->state;
-	struct tevent_threaded_context *tctx = NULL;
-	struct pthreadpool_tevent_glue *g = NULL;
 
 	job->needs_fence.finished = true;
 	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
@@ -731,27 +790,22 @@ static int pthreadpool_tevent_job_signal(int jobid,
 		return 0;
 	}
 
-#ifdef HAVE_PTHREAD
-	for (g = job->pool->glue_list; g != NULL; g = g->next) {
-		if (g->ev == state->ev) {
-			tctx = g->tctx;
-			break;
-		}
-	}
-
-	if (tctx == NULL) {
-		abort();
-	}
-#endif
-
-	if (tctx != NULL) {
+	/*
+	 * state and state->glue are valid,
+	 * see the job->needs_fence.finished
+	 * "spinlock" loop in
+	 * pthreadpool_tevent_job_orphan()
+	 */
+	if (job->state->glue->tctx != NULL) {
 		/* with HAVE_PTHREAD */
-		tevent_threaded_schedule_immediate(tctx, job->im,
+		tevent_threaded_schedule_immediate(job->state->glue->tctx,
+						   job->im,
 						   pthreadpool_tevent_job_done,
 						   job);
 	} else {
 		/* without HAVE_PTHREAD */
-		tevent_schedule_immediate(job->im, state->ev,
+		tevent_schedule_immediate(job->im,
+					  job->state->glue->ev,
 					  pthreadpool_tevent_job_done,
 					  job);
 	}
-- 
2.17.1


From 58f66504a57ed33043f57d44a8e5dcca45095734 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Fri, 22 Jun 2018 17:22:10 +0200
Subject: [PATCH 11/20] pthreadpool: add a comment about a further optimization
 in pthreadpool_tevent_job_destructor()

This seems to be a really rare race; it's likely that the immediate
event will still trigger and clean up.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool_tevent.c | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index 3b502a7cc5a3..94b6b9ded8ff 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -491,6 +491,23 @@ static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
 		TALLOC_FREE(job->im);
 	}
 
+	/*
+	 * TODO?: We could further improve this by adjusting
+	 * tevent_threaded_schedule_immediate_destructor()
+	 * and allow TALLOC_FREE() during its time
+	 * in the main_ev->scheduled_immediates list.
+	 *
+	 * PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	 * if (job->needs_fence.signaled) {
+	 *       *
+	 *       * The signal function has completed,
+	 *       * in future we may be allowed
+	 *       * to call TALLOC_FREE(job->im).
+	 *       *
+	 *      TALLOC_FREE(job->im);
+	 * }
+	 */
+
 	/*
 	 * pthreadpool_tevent_job_orphan() already removed
 	 * it from pool->jobs. And we don't need try
-- 
2.17.1


From 979dbdc60b32e3f35e4d5e4b49faf50dbe39a372 Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Mon, 18 Jun 2018 15:32:30 +0200
Subject: [PATCH 12/20] pthreadpool: test cancelling and freeing pending
 pthreadpool_tevent jobs/pools

Pair-Programmed-With: Stefan Metzmacher <metze at samba.org>

Signed-off-by: Ralph Boehme <slow at samba.org>
Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/tests_cmocka.c | 434 +++++++++++++++++++++++++++++++++
 1 file changed, 434 insertions(+)

diff --git a/lib/pthreadpool/tests_cmocka.c b/lib/pthreadpool/tests_cmocka.c
index 677800892f65..745a4f3975ab 100644
--- a/lib/pthreadpool/tests_cmocka.c
+++ b/lib/pthreadpool/tests_cmocka.c
@@ -17,12 +17,16 @@
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
 
+#include "config.h"
 #include <errno.h>
 #include <pthread.h>
 #include <setjmp.h>
 #include <stdlib.h>
 #include <string.h>
 #include <limits.h>
+#include <unistd.h>
+#include <sys/types.h>
+#include <sys/socket.h>
 
 #include <talloc.h>
 #include <tevent.h>
@@ -31,6 +35,13 @@
 #include <cmocka.h>
 #include <poll.h>
 
+#ifdef HAVE_VALGRIND_HELGRIND_H
+#include <valgrind/helgrind.h>
+#endif
+#ifndef ANNOTATE_BENIGN_RACE_SIZED
+#define ANNOTATE_BENIGN_RACE_SIZED(address, size, description)
+#endif
+
 struct pthreadpool_tevent_test {
 	struct tevent_context *ev;
 	struct pthreadpool_tevent *upool;
@@ -223,12 +234,435 @@ static void test_create(void **state)
 	assert_false(in_main_thread);
 }
 
+struct test_cancel_job {
+	int fdm; /* the main end of socketpair */
+	int fdj; /* the job end of socketpair */
+	bool started;
+	bool canceled;
+	bool orphaned;
+	bool finished;
+	size_t polls;
+	size_t timeouts;
+	int sleep_msec;
+	struct tevent_req *req;
+	bool completed;
+	int ret;
+};
+
+static void test_cancel_job_done(struct tevent_req *req);
+
+static int test_cancel_job_destructor(struct test_cancel_job *job)
+{
+	ANNOTATE_BENIGN_RACE_SIZED(&job->started,
+				   sizeof(job->started),
+				   "protected by pthreadpool_tevent code");
+	if (job->started) {
+		ANNOTATE_BENIGN_RACE_SIZED(&job->finished,
+					   sizeof(job->finished),
+					   "protected by pthreadpool_tevent code");
+		assert_true(job->finished);
+	}
+
+	ANNOTATE_BENIGN_RACE_SIZED(&job->fdj,
+				   sizeof(job->fdj),
+				   "protected by pthreadpool_tevent code");
+
+	if (job->fdm != -1) {
+		close(job->fdm);
+		job->fdm = -1;
+	}
+	if (job->fdj != -1) {
+		close(job->fdj);
+		job->fdj = -1;
+	}
+
+	return 0;
+}
+
+static struct test_cancel_job *test_cancel_job_create(TALLOC_CTX *mem_ctx)
+{
+	struct test_cancel_job *job = NULL;
+
+	job = talloc(mem_ctx, struct test_cancel_job);
+	if (job == NULL) {
+		return NULL;
+	}
+	*job = (struct test_cancel_job) {
+		.fdm = -1,
+		.fdj = -1,
+		.sleep_msec = 50,
+	};
+
+	talloc_set_destructor(job, test_cancel_job_destructor);
+	return job;
+}
+
+static void test_cancel_job_fn(void *ptr)
+{
+	struct test_cancel_job *job = (struct test_cancel_job *)ptr;
+	int fdj = -1;
+	char c = 0;
+	int ret;
+
+	assert_non_null(job); /* make sure we abort without a job pointer */
+
+	job->started = true;
+	fdj = job->fdj;
+	job->fdj = -1;
+
+	if (!pthreadpool_tevent_current_job_continue()) {
+		job->canceled = pthreadpool_tevent_current_job_canceled();
+		job->orphaned = pthreadpool_tevent_current_job_orphaned();
+		job->finished = true;
+		close(fdj);
+		return;
+	}
+
+	/*
+	 * Notify the main thread that we have started.
+	 *
+	 * write of 1 byte should always work!
+	 */
+	ret = write(fdj, &c, 1);
+	assert_return_code(ret, 1);
+
+	/*
+	 * Loop until someone tried to cancel
+	 * the job or it becomes orphaned.
+	 *
+	 * If there's some activity on the fd
+	 * we directly finish.
+	 */
+	do {
+		struct pollfd pfd = {
+			.fd = fdj,
+			.events = POLLIN,
+		};
+
+		job->polls += 1;
+
+		ret = poll(&pfd, 1, job->sleep_msec);
+		if (ret == 1) {
+			job->finished = true;
+			close(fdj);
+			return;
+		}
+		assert_return_code(ret, 0);
+
+		job->timeouts += 1;
+
+	} while (pthreadpool_tevent_current_job_continue());
+
+	job->canceled = pthreadpool_tevent_current_job_canceled();
+	job->orphaned = pthreadpool_tevent_current_job_orphaned();
+	job->finished = true;
+	close(fdj);
+}
+
+static void test_cancel_job_done(struct tevent_req *req)
+{
+	struct test_cancel_job *job =
+		tevent_req_callback_data(req,
+		struct test_cancel_job);
+
+	job->ret = pthreadpool_tevent_job_recv(job->req);
+	TALLOC_FREE(job->req);
+	job->completed = true;
+}
+
+static void test_cancel_job_wait(struct test_cancel_job *job,
+				 struct tevent_context *ev)
+{
+	/*
+	 * We have to keep looping until
+	 * test_cancel_job_done was triggered
+	 */
+	while (!job->completed) {
+		int ret;
+
+		ret = tevent_loop_once(ev);
+		assert_return_code(ret, 0);
+	}
+}
+
+struct test_cancel_state {
+	struct test_cancel_job *job1;
+	struct test_cancel_job *job2;
+	struct test_cancel_job *job3;
+	struct test_cancel_job *job4;
+	struct test_cancel_job *job5;
+	struct test_cancel_job *job6;
+};
+
+static void test_cancel_job(void **private_data)
+{
+	struct pthreadpool_tevent_test *t = *private_data;
+	struct tevent_context *ev = t->ev;
+	struct pthreadpool_tevent *pool = t->opool;
+	struct test_cancel_state *state = NULL;
+	int ret;
+	bool ok;
+	int fdpair[2] = { -1, -1 };
+	char c = 0;
+
+	state = talloc_zero(t, struct test_cancel_state);
+	assert_non_null(state);
+	state->job1 = test_cancel_job_create(state);
+	assert_non_null(state->job1);
+	state->job2 = test_cancel_job_create(state);
+	assert_non_null(state->job2);
+	state->job3 = test_cancel_job_create(state);
+	assert_non_null(state->job3);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_return_code(ret, 0);
+
+	state->job1->fdm = fdpair[0];
+	state->job1->fdj = fdpair[1];
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
+
+	will_return(__wrap_pthread_create, 0);
+	state->job1->req = pthreadpool_tevent_job_send(
+		state->job1, ev, pool, test_cancel_job_fn, state->job1);
+	assert_non_null(state->job1->req);
+	tevent_req_set_callback(state->job1->req,
+				test_cancel_job_done,
+				state->job1);
+
+	state->job2->req = pthreadpool_tevent_job_send(
+		state->job2, ev, pool, test_cancel_job_fn, NULL);
+	assert_non_null(state->job2->req);
+	tevent_req_set_callback(state->job2->req,
+				test_cancel_job_done,
+				state->job2);
+
+	state->job3->req = pthreadpool_tevent_job_send(
+		state->job3, ev, pool, test_cancel_job_fn, NULL);
+	assert_non_null(state->job3->req);
+	tevent_req_set_callback(state->job3->req,
+				test_cancel_job_done,
+				state->job3);
+
+	/*
+	 * Wait for the job 1 to start.
+	 */
+	ret = read(state->job1->fdm, &c, 1);
+	assert_return_code(ret, 1);
+
+	/*
+	 * We cancel job 3 and destroy job2.
+	 * Both should never be executed.
+	 */
+	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 2);
+	TALLOC_FREE(state->job2->req);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 1);
+	ok = tevent_req_cancel(state->job3->req);
+	assert_true(ok);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
+
+	/*
+	 * Job 3 should complete as canceled, while
+	 * job 1 is still running.
+	 */
+	test_cancel_job_wait(state->job3, ev);
+	assert_return_code(state->job3->ret, ECANCELED);
+	assert_null(state->job3->req);
+	assert_false(state->job3->started);
+
+	/*
+	 * Now job 1 is canceled while it's running,
+	 * this should make it stop its loop.
+	 */
+	ok = tevent_req_cancel(state->job1->req);
+	assert_false(ok);
+
+	/*
+	 * Job 1 completes. It got at least one sleep
+	 * timeout loop and has state->job1->canceled set.
+	 */
+	test_cancel_job_wait(state->job1, ev);
+	assert_return_code(state->job1->ret, 0);
+	assert_null(state->job1->req);
+	assert_true(state->job1->started);
+	assert_true(state->job1->finished);
+	assert_true(state->job1->canceled);
+	assert_false(state->job1->orphaned);
+	assert_in_range(state->job1->polls, 1, 100);
+	assert_int_equal(state->job1->timeouts, state->job1->polls);
+
+	/*
+	 * Now we create jobs 4 and 5
+	 * Both should execute.
+	 * Job 4 is orphaned while running by a TALLOC_FREE()
+	 * This should stop job 4 and let job 5 start.
+	 * We do a "normal" exit in job 5 by creating some activity
+	 * on the socketpair.
+	 */
+
+	state->job4 = test_cancel_job_create(state);
+	assert_non_null(state->job4);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_return_code(ret, 0);
+
+	state->job4->fdm = fdpair[0];
+	state->job4->fdj = fdpair[1];
+
+	state->job4->req = pthreadpool_tevent_job_send(
+		state->job4, ev, pool, test_cancel_job_fn, state->job4);
+	assert_non_null(state->job4->req);
+	tevent_req_set_callback(state->job4->req,
+				test_cancel_job_done,
+				state->job4);
+
+	state->job5 = test_cancel_job_create(state);
+	assert_non_null(state->job5);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_return_code(ret, 0);
+
+	state->job5->fdm = fdpair[0];
+	state->job5->fdj = fdpair[1];
+
+	state->job5->req = pthreadpool_tevent_job_send(
+		state->job5, ev, pool, test_cancel_job_fn, state->job5);
+	assert_non_null(state->job5->req);
+	tevent_req_set_callback(state->job5->req,
+				test_cancel_job_done,
+				state->job5);
+
+	/*
+	 * Make sure job 5 can exit as soon as possible.
+	 * It will never get a sleep/poll timeout.
+	 */
+	ret = write(state->job5->fdm, &c, 1);
+	assert_return_code(ret, errno);
+
+	/*
+	 * Wait for the job 4 to start
+	 */
+	ret = read(state->job4->fdm, &c, 1);
+	assert_return_code(ret, 1);
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 1);
+
+	/*
+	 * destroy the request so that it's marked
+	 * as orphaned.
+	 */
+	TALLOC_FREE(state->job4->req);
+
+	/*
+	 * Job 5 completes. It never hit a sleep timeout.
+	 */
+	test_cancel_job_wait(state->job5, ev);
+	assert_return_code(state->job5->ret, 0);
+	assert_null(state->job5->req);
+	assert_true(state->job5->started);
+	assert_true(state->job5->finished);
+	assert_false(state->job5->canceled);
+	assert_false(state->job5->orphaned);
+	assert_int_equal(state->job5->polls, 1);
+	assert_int_equal(state->job5->timeouts, 0);
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
+
+	/*
+	 * Job 2 is still not executed as we did a TALLOC_FREE()
+	 * before it was scheduled.
+	 */
+	assert_false(state->job2->completed);
+	assert_false(state->job2->started);
+
+	/*
+	 * Job 4 still wasn't completed as we did a TALLOC_FREE()
+	 * while it was running, but it was started and has
+	 * orphaned set.
+	 */
+	assert_false(state->job4->completed);
+	assert_true(state->job4->started);
+	assert_true(state->job4->finished);
+	assert_false(state->job4->canceled);
+	assert_true(state->job4->orphaned);
+	assert_in_range(state->job4->polls, 1, 100);
+	assert_int_equal(state->job4->timeouts, state->job4->polls);
+
+	/*
+	 * Now we create job 6.
+	 * We destroy the pool while it's executing.
+	 */
+
+	state->job6 = test_cancel_job_create(state);
+	assert_non_null(state->job6);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_return_code(ret, 0);
+
+	state->job6->fdm = fdpair[0];
+	state->job6->fdj = fdpair[1];
+
+	state->job6->req = pthreadpool_tevent_job_send(
+		state->job6, ev, pool, test_cancel_job_fn, state->job6);
+	assert_non_null(state->job6->req);
+	tevent_req_set_callback(state->job6->req,
+				test_cancel_job_done,
+				state->job6);
+
+	/*
+	 * Wait for the job 6 to start
+	 */
+	ret = read(state->job6->fdm, &c, 1);
+	assert_return_code(ret, 1);
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(pool), 0);
+
+	/*
+	 * destroy the pool so that the job is marked
+	 * as orphaned.
+	 */
+	pool = NULL;
+	TALLOC_FREE(t->opool);
+
+	/*
+	 * Wait until the job finished.
+	 */
+	ret = read(state->job6->fdm, &c, 1);
+	assert_return_code(ret, 0);
+
+	/*
+	 * Job 6 is still dangling around.
+	 *
+	 * We need to convince valgrind --tool={drd,helgrind}
+	 * that the read above is good enough to be
+	 * sure the job is finished and closed the other end of
+	 * the socketpair.
+	 */
+	ANNOTATE_BENIGN_RACE_SIZED(state->job6,
+				   sizeof(*state->job6),
+				   "protected by thread fence");
+	assert_non_null(state->job6->req);
+	assert_true(tevent_req_is_in_progress(state->job6->req));
+	assert_false(state->job6->completed);
+	assert_true(state->job6->started);
+	assert_true(state->job6->finished);
+	assert_false(state->job6->canceled);
+	assert_true(state->job6->orphaned);
+	assert_in_range(state->job6->polls, 1, 100);
+	assert_int_equal(state->job6->timeouts, state->job6->polls);
+
+	TALLOC_FREE(state);
+}
+
 int main(int argc, char **argv)
 {
 	const struct CMUnitTest tests[] = {
 		cmocka_unit_test_setup_teardown(test_create,
 						setup_pthreadpool_tevent,
 						teardown_pthreadpool_tevent),
+		cmocka_unit_test_setup_teardown(test_cancel_job,
+						setup_pthreadpool_tevent,
+						teardown_pthreadpool_tevent),
 	};
 
 	cmocka_set_message_output(CM_OUTPUT_SUBUNIT);
-- 
2.17.1


From 37280e5473d0c6bbd8d0de5a13cd6ac1083f786d Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Tue, 13 Mar 2018 16:58:49 +0100
Subject: [PATCH 13/20] configure: check for Linux specific unshare() with
 CLONE_FS

Note we still need some kind of runtime detection as
it can fail in some constrained container setups, which
reject the whole unshare() syscall instead of just the
ones used for container features.

In case unshare(CLONE_FS) works, we can have a per thread
current working directory and use [f]chdir() safely in
worker threads.

Pair-Programmed-With: Stefan Metzmacher <metze at samba.org>

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 source3/wscript | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/source3/wscript b/source3/wscript
index aed784ef0179..a14d76d7469e 100644
--- a/source3/wscript
+++ b/source3/wscript
@@ -1505,6 +1505,11 @@ main() {
                 legacy_quota_libs = ''
     conf.env['legacy_quota_libs'] = legacy_quota_libs
 
+    conf.CHECK_CODE('(void)unshare(CLONE_FS);',
+                    headers='sched.h',
+                    define='HAVE_UNSHARE_CLONE_FS',
+                    msg='for Linux unshare(CLONE_FS)')
+
     #
     # cluster support (CTDB)
     #
-- 
2.17.1


From 026dd8165907eac994ed50c3bb433f5acf0aaf9a Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Tue, 13 Mar 2018 16:59:32 +0100
Subject: [PATCH 14/20] pthreadpool: call unshare(CLONE_FS) if available

This paves the way for pthreadpool jobs that are path based.

Callers can use pthreadpool_per_thread_cwd() to check if
the current pool supports it.

Pair-Programmed-With: Stefan Metzmacher <metze at samba.org>

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool.c      | 34 ++++++++++++++++++++++++++++++
 lib/pthreadpool/pthreadpool.h      | 17 +++++++++++++++
 lib/pthreadpool/pthreadpool_sync.c |  5 +++++
 3 files changed, 56 insertions(+)

diff --git a/lib/pthreadpool/pthreadpool.c b/lib/pthreadpool/pthreadpool.c
index 528f4cdfd678..127e684c63e3 100644
--- a/lib/pthreadpool/pthreadpool.c
+++ b/lib/pthreadpool/pthreadpool.c
@@ -112,10 +112,13 @@ struct pthreadpool {
 	 * where the forking thread will unlock it again.
 	 */
 	pthread_mutex_t fork_mutex;
+
+	bool per_thread_cwd;
 };
 
 static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
 static struct pthreadpool *pthreadpools = NULL;
+static bool pthreadpool_support_thread_cwd = false;
 static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
 
 static void pthreadpool_prep_atfork(void);
@@ -182,6 +185,11 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 	pool->max_threads = max_threads;
 	pool->num_idle = 0;
 	pool->prefork_cond = NULL;
+	if (max_threads != 0) {
+		pool->per_thread_cwd = pthreadpool_support_thread_cwd;
+	} else {
+		pool->per_thread_cwd = false;
+	}
 
 	ret = pthread_mutex_lock(&pthreadpools_mutex);
 	if (ret != 0) {
@@ -241,6 +249,15 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
 	return ret;
 }
 
+bool pthreadpool_per_thread_cwd(struct pthreadpool *pool)
+{
+	if (pool->stopped) {
+		return false;
+	}
+
+	return pool->per_thread_cwd;
+}
+
 static void pthreadpool_prepare_pool(struct pthreadpool *pool)
 {
 	int ret;
@@ -359,6 +376,16 @@ static void pthreadpool_child(void)
 
 static void pthreadpool_prep_atfork(void)
 {
+#ifdef HAVE_UNSHARE_CLONE_FS
+	int res;
+
+	/* remember if unshare(CLONE_FS) works. */
+	res = unshare(CLONE_FS);
+	if (res == 0) {
+		pthreadpool_support_thread_cwd = true;
+	}
+#endif
+
 	pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
 		       pthreadpool_child);
 }
@@ -571,6 +598,13 @@ static void *pthreadpool_server(void *arg)
 	struct pthreadpool *pool = (struct pthreadpool *)arg;
 	int res;
 
+#ifdef HAVE_UNSHARE_CLONE_FS
+	if (pool->per_thread_cwd) {
+		res = unshare(CLONE_FS);
+		assert(res == 0);
+	}
+#endif
+
 	res = pthread_mutex_lock(&pool->mutex);
 	if (res != 0) {
 		return NULL;
diff --git a/lib/pthreadpool/pthreadpool.h b/lib/pthreadpool/pthreadpool.h
index b4733580e07b..d8daf9e4519b 100644
--- a/lib/pthreadpool/pthreadpool.h
+++ b/lib/pthreadpool/pthreadpool.h
@@ -71,6 +71,23 @@ size_t pthreadpool_max_threads(struct pthreadpool *pool);
  */
 size_t pthreadpool_queued_jobs(struct pthreadpool *pool);
 
+/**
+ * @brief Check for per thread current working directory support of pthreadpool
+ *
+ * Since Linux kernel 2.6.16, unshare(CLONE_FS) is supported,
+ * which provides a per thread current working directory
+ * and allows [f]chdir() within the worker threads.
+ *
+ * Note that this doesn't work on some constrained container setups,
+ * the complete unshare() syscall might be rejected.
+ * pthreadpool_per_thread_cwd() returns what is available
+ * at runtime, so the callers should really check this!
+ *
+ * @param[in]	pool		The pool to check
+ * @return			supported: true, otherwise: false
+ */
+bool pthreadpool_per_thread_cwd(struct pthreadpool *pool);
+
 /**
  * @brief Stop a pthreadpool
  *
diff --git a/lib/pthreadpool/pthreadpool_sync.c b/lib/pthreadpool/pthreadpool_sync.c
index 48e6a0ddb604..2ed6f36dbbc7 100644
--- a/lib/pthreadpool/pthreadpool_sync.c
+++ b/lib/pthreadpool/pthreadpool_sync.c
@@ -65,6 +65,11 @@ size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
 	return 0;
 }
 
+bool pthreadpool_per_thread_cwd(struct pthreadpool *pool)
+{
+	return false;
+}
+
 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 			void (*fn)(void *private_data), void *private_data)
 {
-- 
2.17.1
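
Note on the runtime probe in the patch above: the following is a minimal
standalone sketch of the same idea, not code from the patch. The helper
name probe_unshare_clone_fs() is hypothetical, and the #ifdef on CLONE_FS
only stands in for the HAVE_UNSHARE_CLONE_FS configure check. It shows why
callers need to ask at runtime: the same binary may get "true" on a plain
host and "false" inside a container that rejects unshare().

```c
#define _GNU_SOURCE /* glibc exposes unshare() and CLONE_FS only with this */
#include <assert.h>
#include <sched.h>
#include <stdbool.h>

/*
 * Hypothetical helper, not part of the patch: returns true if
 * unshare(CLONE_FS) is accepted by the running kernel/container.
 * In a single-threaded caller the call has no visible effect,
 * because nothing else shares the filesystem attributes.
 */
static bool probe_unshare_clone_fs(void)
{
#ifdef CLONE_FS
	return unshare(CLONE_FS) == 0;
#else
	/* Built without unshare() support: report "not available". */
	return false;
#endif
}
```

A caller would run the probe once at startup and cache the result, much
like pthreadpool_prep_atfork() does above.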


From 10ae603150056d5d05f7f14dd81a96248b7370ae Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Fri, 22 Jun 2018 01:02:41 +0200
Subject: [PATCH 15/20] pthreadpool: add
 pthreadpool_tevent_[current_job_]per_thread_cwd()

This can be used to check if worker threads run with
unshare(CLONE_FS).

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool_tevent.c | 26 ++++++++++++++++++++++++++
 lib/pthreadpool/pthreadpool_tevent.h |  7 +++++++
 2 files changed, 33 insertions(+)

diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index 94b6b9ded8ff..01e8586b384d 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -210,6 +210,8 @@ struct pthreadpool_tevent_job {
 		 */
 		bool signaled;
 	} needs_fence;
+
+	bool per_thread_cwd;
 };
 
 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
@@ -283,6 +285,15 @@ size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
 	return pthreadpool_queued_jobs(pool->pool);
 }
 
+bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool)
+{
+	if (pool->pool == NULL) {
+		return false;
+	}
+
+	return pthreadpool_per_thread_cwd(pool->pool);
+}
+
 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 {
 	struct pthreadpool_tevent_job *job = NULL;
@@ -701,6 +712,7 @@ struct tevent_req *pthreadpool_tevent_job_send(
 		return tevent_req_post(req, ev);
 	}
 	PTHREAD_TEVENT_JOB_THREAD_FENCE_INIT(job);
+	job->per_thread_cwd = pthreadpool_tevent_per_thread_cwd(pool);
 	talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
 	DLIST_ADD_END(job->pool->jobs, job);
 	job->state = state;
@@ -772,6 +784,20 @@ bool pthreadpool_tevent_current_job_continue(void)
 	return true;
 }
 
+bool pthreadpool_tevent_current_job_per_thread_cwd(void)
+{
+	if (current_job == NULL) {
+		/*
+		 * Should only be called from within
+		 * the job function.
+		 */
+		abort();
+		return false;
+	}
+
+	return current_job->per_thread_cwd;
+}
+
 static void pthreadpool_tevent_job_fn(void *private_data)
 {
 	struct pthreadpool_tevent_job *job =
diff --git a/lib/pthreadpool/pthreadpool_tevent.h b/lib/pthreadpool/pthreadpool_tevent.h
index 37e491e17c47..ff2ab7cfb73d 100644
--- a/lib/pthreadpool/pthreadpool_tevent.h
+++ b/lib/pthreadpool/pthreadpool_tevent.h
@@ -31,6 +31,7 @@ int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
 
 size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool);
 size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool);
+bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool);
 
 /*
  * return true - if tevent_req_cancel() was called.
@@ -46,6 +47,12 @@ bool pthreadpool_tevent_current_job_orphaned(void);
  */
 bool pthreadpool_tevent_current_job_continue(void);
 
+/*
+ * return true if the current job can rely on a per thread
+ * current working directory.
+ */
+bool pthreadpool_tevent_current_job_per_thread_cwd(void);
+
 struct tevent_req *pthreadpool_tevent_job_send(
 	TALLOC_CTX *mem_ctx, struct tevent_context *ev,
 	struct pthreadpool_tevent *pool,
-- 
2.17.1


From 26cbfbbab5564a051dcbdfefe9ff9f3749dce117 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Wed, 18 Jul 2018 10:17:51 +0200
Subject: [PATCH 16/20] pthreadpool: test pthreadpool_tevent_max_threads()
 returns the expected result

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/tests_cmocka.c | 10 ++++++++++
 1 file changed, 10 insertions(+)

diff --git a/lib/pthreadpool/tests_cmocka.c b/lib/pthreadpool/tests_cmocka.c
index 745a4f3975ab..95b3a54be123 100644
--- a/lib/pthreadpool/tests_cmocka.c
+++ b/lib/pthreadpool/tests_cmocka.c
@@ -53,6 +53,7 @@ static int setup_pthreadpool_tevent(void **state)
 {
 	struct pthreadpool_tevent_test *t;
 	int ret;
+	size_t max_threads;
 
 	t = talloc_zero(NULL, struct pthreadpool_tevent_test);
 	assert_non_null(t);
@@ -63,12 +64,21 @@ static int setup_pthreadpool_tevent(void **state)
 	ret = pthreadpool_tevent_init(t->ev, UINT_MAX, &t->upool);
 	assert_return_code(ret, 0);
 
+	max_threads = pthreadpool_tevent_max_threads(t->upool);
+	assert_int_equal(max_threads, UINT_MAX);
+
 	ret = pthreadpool_tevent_init(t->ev, 1, &t->opool);
 	assert_return_code(ret, 0);
 
+	max_threads = pthreadpool_tevent_max_threads(t->opool);
+	assert_int_equal(max_threads, 1);
+
 	ret = pthreadpool_tevent_init(t->ev, 0, &t->spool);
 	assert_return_code(ret, 0);
 
+	max_threads = pthreadpool_tevent_max_threads(t->spool);
+	assert_int_equal(max_threads, 0);
+
 	*state = t;
 
 	return 0;
-- 
2.17.1


From 2b84b836a4454fbad5f5109a260c47e99006ed9a Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Wed, 18 Jul 2018 10:21:22 +0200
Subject: [PATCH 17/20] pthreadpool: add tests for
 pthreadpool_tevent_[current_job_]per_thread_cwd()

Note that this currently doesn't enforce support for
unshare(CLONE_FS), as some constrained container environments
(e.g. docker) reject the whole unshare() system call.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/tests_cmocka.c | 144 +++++++++++++++++++++++++++++++++
 1 file changed, 144 insertions(+)

diff --git a/lib/pthreadpool/tests_cmocka.c b/lib/pthreadpool/tests_cmocka.c
index 95b3a54be123..fa508e24eab2 100644
--- a/lib/pthreadpool/tests_cmocka.c
+++ b/lib/pthreadpool/tests_cmocka.c
@@ -244,6 +244,147 @@ static void test_create(void **state)
 	assert_false(in_main_thread);
 }
 
+static void test_per_thread_cwd_job(void *ptr)
+{
+	const bool *per_thread_cwd_ptr = ptr;
+	bool per_thread_cwd;
+	char cwdbuf[PATH_MAX] = {0,};
+	char *cwdstr = NULL;
+	int ret;
+
+	/*
+	 * This needs to be consistent.
+	 */
+	per_thread_cwd = pthreadpool_tevent_current_job_per_thread_cwd();
+	assert_int_equal(per_thread_cwd, *per_thread_cwd_ptr);
+
+	if (!per_thread_cwd) {
+		return;
+	}
+
+	/*
+	 * Check we're not already in "/".
+	 */
+	cwdstr = getcwd(cwdbuf, sizeof(cwdbuf));
+	assert_non_null(cwdstr);
+	assert_string_not_equal(cwdstr, "/");
+
+	ret = chdir("/");
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Check we're in "/" now.
+	 */
+	cwdstr = getcwd(cwdbuf, sizeof(cwdbuf));
+	assert_non_null(cwdstr);
+	assert_string_equal(cwdstr, "/");
+}
+
+static int test_per_thread_cwd_do(struct tevent_context *ev,
+				  struct pthreadpool_tevent *pool)
+{
+	struct tevent_req *req;
+	bool per_thread_cwd;
+	bool ok;
+	int ret;
+	per_thread_cwd = pthreadpool_tevent_per_thread_cwd(pool);
+
+	req = pthreadpool_tevent_job_send(
+		ev, ev, pool, test_per_thread_cwd_job, &per_thread_cwd);
+	if (req == NULL) {
+		fprintf(stderr, "pthreadpool_tevent_job_send failed\n");
+		return ENOMEM;
+	}
+
+	ok = tevent_req_poll(req, ev);
+	if (!ok) {
+		ret = errno;
+		fprintf(stderr, "tevent_req_poll failed: %s\n",
+			strerror(ret));
+		return ret;
+	}
+
+	ret = pthreadpool_tevent_job_recv(req);
+	TALLOC_FREE(req);
+	if (ret != 0) {
+		fprintf(stderr, "pthreadpool_tevent_job_recv failed: %s\n",
+			strerror(ret));
+		return ret;
+	}
+
+	return 0;
+}
+
+static void test_per_thread_cwd(void **state)
+{
+	struct pthreadpool_tevent_test *t = *state;
+	int ret;
+	bool per_thread_cwd_u;
+	bool per_thread_cwd_o;
+	bool per_thread_cwd_s;
+	char cwdbuf1[PATH_MAX] = {0,};
+	char *cwdstr1 = NULL;
+	char cwdbuf2[PATH_MAX] = {0,};
+	char *cwdstr2 = NULL;
+
+	/*
+	 * The unlimited and one pools
+	 * should be consistent.
+	 *
+	 * We can't enforce this as some constrained
+	 * container environments disable unshare()
+	 * completely, even just with CLONE_FS.
+	 */
+	per_thread_cwd_u = pthreadpool_tevent_per_thread_cwd(t->upool);
+	per_thread_cwd_o = pthreadpool_tevent_per_thread_cwd(t->opool);
+	assert_int_equal(per_thread_cwd_u, per_thread_cwd_o);
+
+	/*
+	 * The sync pool should never support this.
+	 */
+	per_thread_cwd_s = pthreadpool_tevent_per_thread_cwd(t->spool);
+	assert_false(per_thread_cwd_s);
+
+	/*
+	 * Check we're not already in "/".
+	 */
+	cwdstr1 = getcwd(cwdbuf1, sizeof(cwdbuf1));
+	assert_non_null(cwdstr1);
+	assert_string_not_equal(cwdstr1, "/");
+
+	will_return(__wrap_pthread_create, 0);
+	ret = test_per_thread_cwd_do(t->ev, t->upool);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Check we're still in the same directory.
+	 */
+	cwdstr2 = getcwd(cwdbuf2, sizeof(cwdbuf2));
+	assert_non_null(cwdstr2);
+	assert_string_equal(cwdstr2, cwdstr1);
+
+	will_return(__wrap_pthread_create, 0);
+	ret = test_per_thread_cwd_do(t->ev, t->opool);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Check we're still in the same directory.
+	 */
+	cwdstr2 = getcwd(cwdbuf2, sizeof(cwdbuf2));
+	assert_non_null(cwdstr2);
+	assert_string_equal(cwdstr2, cwdstr1);
+
+	ret = test_per_thread_cwd_do(t->ev, t->spool);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Check we're still in the same directory.
+	 */
+	cwdstr2 = getcwd(cwdbuf2, sizeof(cwdbuf2));
+	assert_non_null(cwdstr2);
+	assert_string_equal(cwdstr2, cwdstr1);
+}
+
 struct test_cancel_job {
 	int fdm; /* the main end of socketpair */
 	int fdj; /* the job end of socketpair */
@@ -670,6 +811,9 @@ int main(int argc, char **argv)
 		cmocka_unit_test_setup_teardown(test_create,
 						setup_pthreadpool_tevent,
 						teardown_pthreadpool_tevent),
+		cmocka_unit_test_setup_teardown(test_per_thread_cwd,
+						setup_pthreadpool_tevent,
+						teardown_pthreadpool_tevent),
 		cmocka_unit_test_setup_teardown(test_cancel_job,
 						setup_pthreadpool_tevent,
 						teardown_pthreadpool_tevent),
-- 
2.17.1


From aba2d97de74e7bd1b6714901ca93c8e9591b26fa Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Mon, 16 Jul 2018 14:43:01 +0200
Subject: [PATCH 18/20] pthreadpool: add
 pthreadpool_restart_check[_monitor_{fd,drain}]()

This makes it possible to monitor the pthreadpool for exited worker
threads and to restart new threads from the main thread.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool.c      | 281 +++++++++++++++++++++++++++++
 lib/pthreadpool/pthreadpool.h      |  64 +++++++
 lib/pthreadpool/pthreadpool_sync.c |  20 ++
 3 files changed, 365 insertions(+)
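
The monitor pipe follows the usual "self-pipe" pattern. The sketch
below is a simplified standalone version with hypothetical helper names
(set_nonblocking(), monitor_notify(), monitor_drain()); it is not the
code from the patch and leaves out locking and close-on-exec handling.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdint.h>
#include <unistd.h>

/* Set O_NONBLOCK on fd; returns 0 on success, -1 on error. */
static int set_nonblocking(int fd)
{
	int flags = fcntl(fd, F_GETFL);
	if (flags == -1) {
		return -1;
	}
	return fcntl(fd, F_SETFL, flags | O_NONBLOCK);
}

/* Worker side: announce "a thread exited" with one byte. */
static void monitor_notify(int wfd)
{
	uint8_t c = 0;
	ssize_t n;

	do {
		n = write(wfd, &c, 1);
	} while (n == -1 && errno == EINTR);
	/* EAGAIN means the pipe is full: a wakeup is already pending. */
}

/* Main side: consume pending bytes; returns 0 or an errno value. */
static int monitor_drain(int rfd)
{
	uint8_t buf[128];

	for (;;) {
		ssize_t n = read(rfd, buf, sizeof(buf));
		if (n == -1) {
			if (errno == EINTR) {
				continue;
			}
			if (errno == EAGAIN || errno == EWOULDBLOCK) {
				return 0; /* nothing left to read */
			}
			return errno;
		}
		if ((size_t)n < sizeof(buf)) {
			return 0; /* short read (or EOF): drained */
		}
	}
}
```

The number of bytes carries no meaning here; several exits may collapse
into one wakeup, which is why the reader re-checks the pool state after
draining instead of counting bytes.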

diff --git a/lib/pthreadpool/pthreadpool.c b/lib/pthreadpool/pthreadpool.c
index 127e684c63e3..db3837cbda37 100644
--- a/lib/pthreadpool/pthreadpool.c
+++ b/lib/pthreadpool/pthreadpool.c
@@ -23,6 +23,7 @@
 #include "system/threads.h"
 #include "pthreadpool.h"
 #include "lib/util/dlinklist.h"
+#include "lib/util/blocking.h"
 
 #ifdef NDEBUG
 #undef NDEBUG
@@ -52,6 +53,8 @@ struct pthreadpool {
 	 */
 	pthread_cond_t condvar;
 
+	int check_pipefd[2];
+
 	/*
 	 * Array of jobs
 	 */
@@ -136,6 +139,7 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 {
 	struct pthreadpool *pool;
 	int ret;
+	bool ok;
 
 	pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
 	if (pool == NULL) {
@@ -153,10 +157,52 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 		return ENOMEM;
 	}
 
+	ret = pipe(pool->check_pipefd);
+	if (ret != 0) {
+		free(pool->jobs);
+		free(pool);
+		return ENOMEM;
+	}
+
+	ok = smb_set_close_on_exec(pool->check_pipefd[0]);
+	if (!ok) {
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
+		free(pool->jobs);
+		free(pool);
+		return EINVAL;
+	}
+	ok = smb_set_close_on_exec(pool->check_pipefd[1]);
+	if (!ok) {
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
+		free(pool->jobs);
+		free(pool);
+		return EINVAL;
+	}
+	ret = set_blocking(pool->check_pipefd[0], true);
+	if (ret == -1) {
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
+		free(pool->jobs);
+		free(pool);
+		return EINVAL;
+	}
+	ret = set_blocking(pool->check_pipefd[1], false);
+	if (ret == -1) {
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
+		free(pool->jobs);
+		free(pool);
+		return EINVAL;
+	}
+
 	pool->head = pool->num_jobs = 0;
 
 	ret = pthread_mutex_init(&pool->mutex, NULL);
 	if (ret != 0) {
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
 		free(pool->jobs);
 		free(pool);
 		return ret;
@@ -165,6 +211,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 	ret = pthread_cond_init(&pool->condvar, NULL);
 	if (ret != 0) {
 		pthread_mutex_destroy(&pool->mutex);
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
 		free(pool->jobs);
 		free(pool);
 		return ret;
@@ -174,6 +222,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 	if (ret != 0) {
 		pthread_cond_destroy(&pool->condvar);
 		pthread_mutex_destroy(&pool->mutex);
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
 		free(pool->jobs);
 		free(pool);
 		return ret;
@@ -196,6 +246,8 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 		pthread_mutex_destroy(&pool->fork_mutex);
 		pthread_cond_destroy(&pool->condvar);
 		pthread_mutex_destroy(&pool->mutex);
+		close(pool->check_pipefd[0]);
+		close(pool->check_pipefd[1]);
 		free(pool->jobs);
 		free(pool);
 		return ret;
@@ -359,6 +411,14 @@ static void pthreadpool_child(void)
 		pool->head = 0;
 		pool->num_jobs = 0;
 		pool->stopped = true;
+		if (pool->check_pipefd[0] != -1) {
+			close(pool->check_pipefd[0]);
+			pool->check_pipefd[0] = -1;
+		}
+		if (pool->check_pipefd[1] != -1) {
+			close(pool->check_pipefd[1]);
+			pool->check_pipefd[1] = -1;
+		}
 
 		ret = pthread_cond_init(&pool->condvar, NULL);
 		assert(ret == 0);
@@ -421,6 +481,14 @@ static int pthreadpool_free(struct pthreadpool *pool)
 		return ret2;
 	}
 
+	if (pool->check_pipefd[0] != -1) {
+		close(pool->check_pipefd[0]);
+		pool->check_pipefd[0] = -1;
+	}
+	if (pool->check_pipefd[1] != -1) {
+		close(pool->check_pipefd[1]);
+		pool->check_pipefd[1] = -1;
+	}
 	free(pool->jobs);
 	free(pool);
 
@@ -437,6 +505,15 @@ static int pthreadpool_stop_locked(struct pthreadpool *pool)
 
 	pool->stopped = true;
 
+	if (pool->check_pipefd[0] != -1) {
+		close(pool->check_pipefd[0]);
+		pool->check_pipefd[0] = -1;
+	}
+	if (pool->check_pipefd[1] != -1) {
+		close(pool->check_pipefd[1]);
+		pool->check_pipefd[1] = -1;
+	}
+
 	if (pool->num_threads == 0) {
 		return 0;
 	}
@@ -521,6 +598,33 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
 
 	free_it = (pool->destroyed && (pool->num_threads == 0));
 
+	while (true) {
+		uint8_t c = 0;
+		ssize_t nwritten = 0;
+
+		if (pool->check_pipefd[1] == -1) {
+			break;
+		}
+
+		nwritten = write(pool->check_pipefd[1], &c, 1);
+		if (nwritten == -1) {
+			if (errno == EINTR) {
+				continue;
+			}
+			if (errno == EAGAIN) {
+				break;
+			}
+#ifdef EWOULDBLOCK
+			if (errno == EWOULDBLOCK) {
+				break;
+			}
+#endif
+			/* ignore ... */
+		}
+
+		break;
+	}
+
 	ret = pthread_mutex_unlock(&pool->mutex);
 	assert(ret == 0);
 
@@ -851,6 +955,183 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 	return res;
 }
 
+int pthreadpool_restart_check(struct pthreadpool *pool)
+{
+	int res;
+	int unlock_res;
+	unsigned possible_threads = 0;
+	unsigned missing_threads = 0;
+
+	assert(!pool->destroyed);
+
+	res = pthread_mutex_lock(&pool->mutex);
+	if (res != 0) {
+		return res;
+	}
+
+	if (pool->stopped) {
+		/*
+		 * Protect against the pool being shut down while
+		 * checking for a restart
+		 */
+		unlock_res = pthread_mutex_unlock(&pool->mutex);
+		assert(unlock_res == 0);
+		return EINVAL;
+	}
+
+	if (pool->num_jobs == 0) {
+		/*
+		 * This also handles the pool->max_threads == 0 case as it never
+		 * calls pthreadpool_put_job()
+		 */
+		unlock_res = pthread_mutex_unlock(&pool->mutex);
+		assert(unlock_res == 0);
+		return 0;
+	}
+
+	if (pool->num_idle > 0) {
+		/*
+		 * We have idle threads and pending jobs,
+		 * this means we better let all threads
+		 * start and check for pending jobs.
+		 */
+		res = pthread_cond_broadcast(&pool->condvar);
+		assert(res == 0);
+	}
+
+	if (pool->num_threads < pool->max_threads) {
+		possible_threads = pool->max_threads - pool->num_threads;
+	}
+
+	if (pool->num_idle < pool->num_jobs) {
+		missing_threads = pool->num_jobs - pool->num_idle;
+	}
+
+	missing_threads = MIN(missing_threads, possible_threads);
+
+	while (missing_threads > 0) {
+
+		res = pthreadpool_create_thread(pool);
+		if (res != 0) {
+			break;
+		}
+
+		missing_threads--;
+	}
+
+	if (missing_threads == 0) {
+		/*
+		 * Ok, we recreated all threads we need.
+		 */
+		unlock_res = pthread_mutex_unlock(&pool->mutex);
+		assert(unlock_res == 0);
+		return 0;
+	}
+
+	if (pool->num_threads != 0) {
+		/*
+		 * At least one thread is still available, let
+		 * that one run the queued jobs.
+		 */
+		unlock_res = pthread_mutex_unlock(&pool->mutex);
+		assert(unlock_res == 0);
+		return 0;
+	}
+
+	/*
+	 * There's no thread available to run any pending jobs.
+	 * The caller may want to cancel the jobs and destroy the pool.
+	 * But that's up to the caller.
+	 */
+	unlock_res = pthread_mutex_unlock(&pool->mutex);
+	assert(unlock_res == 0);
+
+	return res;
+}
+
+int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool)
+{
+	int fd;
+	int ret;
+	bool ok;
+
+	if (pool->stopped) {
+		errno = EINVAL;
+		return -1;
+	}
+
+	if (pool->check_pipefd[0] == -1) {
+		errno = ENOSYS;
+		return -1;
+	}
+
+	fd = dup(pool->check_pipefd[0]);
+	if (fd == -1) {
+		return -1;
+	}
+
+	ok = smb_set_close_on_exec(fd);
+	if (!ok) {
+		int saved_errno = errno;
+		close(fd);
+		errno = saved_errno;
+		return -1;
+	}
+
+	ret = set_blocking(fd, false);
+	if (ret == -1) {
+		int saved_errno = errno;
+		close(fd);
+		errno = saved_errno;
+		return -1;
+	}
+
+	return fd;
+}
+
+int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool)
+{
+	if (pool->stopped) {
+		return EINVAL;
+	}
+
+	if (pool->check_pipefd[0] == -1) {
+		return ENOSYS;
+	}
+
+	while (true) {
+		uint8_t buf[128];
+		ssize_t nread;
+
+		nread = read(pool->check_pipefd[0], buf, sizeof(buf));
+		if (nread == -1) {
+			if (errno == EINTR) {
+				continue;
+			}
+			if (errno == EAGAIN) {
+				return 0;
+			}
+#ifdef EWOULDBLOCK
+			if (errno == EWOULDBLOCK) {
+				return 0;
+			}
+#endif
+			if (errno == 0) {
+				errno = INT_MAX;
+			}
+
+			return errno;
+		}
+
+		if (nread < sizeof(buf)) {
+			return 0;
+		}
+	}
+
+	abort();
+	return INT_MAX;
+}
+
 size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
 			      void (*fn)(void *private_data), void *private_data)
 {
diff --git a/lib/pthreadpool/pthreadpool.h b/lib/pthreadpool/pthreadpool.h
index d8daf9e4519b..543567ceaf78 100644
--- a/lib/pthreadpool/pthreadpool.h
+++ b/lib/pthreadpool/pthreadpool.h
@@ -144,6 +144,70 @@ int pthreadpool_destroy(struct pthreadpool *pool);
 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 			void (*fn)(void *private_data), void *private_data);
 
+/**
+ * @brief Check if the pthreadpool needs a restart.
+ *
+ * This checks if there are enough threads to run the already
+ * queued jobs. This should be called only if the caller's signal_fn
+ * (passed to pthreadpool_init()) returned an error, so
+ * that the job's worker thread exited.
+ *
+ * Typically this is called once the file descriptor
+ * returned by pthreadpool_restart_check_monitor_fd()
+ * became readable and pthreadpool_restart_check_monitor_drain()
+ * returned success.
+ *
+ * This function tries to restart the missing threads.
+ *
+ * @param[in]	pool		The pool to run the job on
+ * @return			success: 0, failure: errno
+ *
+ * @see pthreadpool_restart_check_monitor_fd
+ * @see pthreadpool_restart_check_monitor_drain
+ */
+int pthreadpool_restart_check(struct pthreadpool *pool);
+
+/**
+ * @brief Return a file descriptor that monitors the pool.
+ *
+ * If the file descriptor becomes readable,
+ * the event handler should call pthreadpool_restart_check_monitor_drain().
+ *
+ * pthreadpool_restart_check() should also be called once the
+ * state is drained.
+ *
+ * This function returns a fresh fd using dup() each time.
+ *
+ * If the pool doesn't require restarts, this function
+ * returns -1 and sets errno = ENOSYS. The caller
+ * may ignore that situation.
+ *
+ * @param[in]	pool		The pool to run the job on
+ * @return			success: fd (>= 0), failure: -1 (sets errno)
+ *
+ * @see pthreadpool_restart_check
+ * @see pthreadpool_restart_check_monitor_drain
+ */
+int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool);
+
+/**
+ * @brief Drain the monitor file descriptor of the pool.
+ *
+ * If the file descriptor returned by pthreadpool_restart_check_monitor_fd()
+ * becomes readable, pthreadpool_restart_check_monitor_drain() should be
+ * called before pthreadpool_restart_check().
+ *
+ * If this function returns an error the caller should close
+ * the file descriptor it got from pthreadpool_restart_check_monitor_fd().
+ *
+ * @param[in]	pool		The pool to run the job on
+ * @return			success: 0, failure: errno
+ *
+ * @see pthreadpool_restart_check_monitor_fd
+ * @see pthreadpool_restart_check
+ */
+int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool);
+
 /**
  * @brief Try to cancel a job in a pthreadpool
  *
diff --git a/lib/pthreadpool/pthreadpool_sync.c b/lib/pthreadpool/pthreadpool_sync.c
index 2ed6f36dbbc7..a476ea712c3a 100644
--- a/lib/pthreadpool/pthreadpool_sync.c
+++ b/lib/pthreadpool/pthreadpool_sync.c
@@ -83,6 +83,26 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 			       pool->signal_fn_private_data);
 }
 
+int pthreadpool_restart_check(struct pthreadpool *pool)
+{
+	if (pool->stopped) {
+		return EINVAL;
+	}
+
+	return 0;
+}
+
+int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool)
+{
+	errno = ENOSYS;
+	return -1;
+}
+
+int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool)
+{
+	return EINVAL;
+}
+
 size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
 			      void (*fn)(void *private_data), void *private_data)
 {
-- 
2.17.1
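
The sizing step in pthreadpool_restart_check() above can be read in
isolation. The following sketch restates it as a standalone function;
threads_to_start() is a hypothetical name and the pool fields are passed
as plain arguments, so this only mirrors the arithmetic, not the locking
or the thread creation.

```c
#include <assert.h>

/*
 * Hypothetical standalone version of the sizing step: how many
 * threads should be started for the jobs that are already queued.
 */
static unsigned threads_to_start(unsigned max_threads, unsigned num_threads,
				 unsigned num_idle, unsigned num_jobs)
{
	unsigned possible = 0; /* room left below max_threads */
	unsigned missing = 0;  /* queued jobs without an idle thread */

	if (num_threads < max_threads) {
		possible = max_threads - num_threads;
	}
	if (num_idle < num_jobs) {
		missing = num_jobs - num_idle;
	}

	return (missing < possible) ? missing : possible;
}
```

For example, with max_threads = 4, one running thread, no idle thread
and five queued jobs, only three more threads can be started, so the
remaining jobs wait for a thread to become free.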


From 2d3bf25680c1300dd5a1f72c951980d32099d0b4 Mon Sep 17 00:00:00 2001
From: Stefan Metzmacher <metze at samba.org>
Date: Fri, 20 Apr 2018 17:12:07 +0200
Subject: [PATCH 19/20] pthreadpool: implement
 pthreadpool_tevent_wrapper_create() infrastructure

This can be used to implement generic per-thread impersonation
for thread pools.

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/pthreadpool_tevent.c | 379 ++++++++++++++++++++++++++-
 lib/pthreadpool/pthreadpool_tevent.h |  25 ++
 2 files changed, 403 insertions(+), 1 deletion(-)
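
To make the hook idea concrete, here is a simplified sketch of a job
runner with before_job()/after_job() hooks. All demo_* names and the
hook signatures are assumptions for illustration; the real
pthreadpool_tevent_wrapper_ops may look different. Skipping the job
when before_job() fails is also an assumption of this sketch.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical hook table, not the real wrapper ops. */
struct demo_wrapper_ops {
	bool (*before_job)(void *private_state);
	bool (*after_job)(void *private_state);
};

struct demo_job {
	const struct demo_wrapper_ops *ops; /* NULL: no wrapper */
	void *private_state;
	void (*fn)(void *private_data);
	void *private_data;
	bool executed;
	bool exit_thread; /* set if a hook failed */
};

static void demo_job_run(struct demo_job *job)
{
	if (job->ops != NULL && !job->ops->before_job(job->private_state)) {
		/* Environment unknown: skip the job, retire the thread. */
		job->exit_thread = true;
		return;
	}

	job->fn(job->private_data);
	job->executed = true;

	if (job->ops != NULL && !job->ops->after_job(job->private_state)) {
		job->exit_thread = true;
	}
}

/* Example hooks and job body used to exercise demo_job_run(). */
static bool demo_hook_ok(void *private_state)
{
	(*(int *)private_state)++; /* count hook invocations */
	return true;
}

static bool demo_hook_fail(void *private_state)
{
	(void)private_state;
	return false;
}

static void demo_fn(void *private_data)
{
	(*(int *)private_data)++; /* count job executions */
}
```

A before_job() hook would be the place to switch credentials for the
thread, and after_job() the place to switch back; a failure in either
leaves the thread in an unknown state, which is what the 'exit_thread'
flag in the patch is for.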

diff --git a/lib/pthreadpool/pthreadpool_tevent.c b/lib/pthreadpool/pthreadpool_tevent.c
index 01e8586b384d..cb6994d611a2 100644
--- a/lib/pthreadpool/pthreadpool_tevent.c
+++ b/lib/pthreadpool/pthreadpool_tevent.c
@@ -103,6 +103,8 @@ struct pthreadpool_tevent_glue {
 	/* Tuple we are keeping track of in this list. */
 	struct tevent_context *ev;
 	struct tevent_threaded_context *tctx;
+	/* recheck monitor fd event */
+	struct tevent_fd *fde;
 	/* Pointer to link object owned by *ev. */
 	struct pthreadpool_tevent_glue_ev_link *ev_link;
 	/* active jobs */
@@ -122,11 +124,32 @@ struct pthreadpool_tevent_glue_ev_link {
 	struct pthreadpool_tevent_glue *glue;
 };
 
+struct pthreadpool_tevent_wrapper {
+	struct pthreadpool_tevent *main_tp;
+	struct pthreadpool_tevent *wrap_tp;
+	const struct pthreadpool_tevent_wrapper_ops *ops;
+	void *private_state;
+};
+
 struct pthreadpool_tevent {
+	struct pthreadpool_tevent *prev, *next;
+
 	struct pthreadpool *pool;
 	struct pthreadpool_tevent_glue *glue_list;
 
 	struct pthreadpool_tevent_job *jobs;
+
+	struct {
+		/*
+		 * This is used on the main context
+		 */
+		struct pthreadpool_tevent *list;
+
+		/*
+		 * This is used on the wrapper context
+		 */
+		struct pthreadpool_tevent_wrapper *ctx;
+	} wrapper;
 };
 
 struct pthreadpool_tevent_job_state {
@@ -141,6 +164,7 @@ struct pthreadpool_tevent_job {
 	struct pthreadpool_tevent_job *prev, *next;
 
 	struct pthreadpool_tevent *pool;
+	struct pthreadpool_tevent_wrapper *wrapper;
 	struct pthreadpool_tevent_job_state *state;
 	struct tevent_immediate *im;
 
@@ -180,6 +204,15 @@ struct pthreadpool_tevent_job {
 		 */
 		bool started;
 
+		/*
+		 * 'wrapper'
+		 * set before calling the wrapper before_job() or
+		 * after_job() hooks.
+		 * unset again once the hook finished.
+		 * (only written by job thread!)
+		 */
+		bool wrapper;
+
 		/*
 		 * 'executed'
 		 * set once the job function returned.
@@ -209,6 +242,18 @@ struct pthreadpool_tevent_job {
 		 * (only written by job thread!)
 		 */
 		bool signaled;
+
+		/*
+		 * 'exit_thread'
+		 * may be set during pthreadpool_tevent_job_fn()
+		 * if some wrapper related code generated an error
+		 * and the environment isn't safe anymore.
+		 *
+		 * In such a case pthreadpool_tevent_job_signal()
+		 * will pick this up and terminate the current
+		 * worker thread by returning -1.
+		 */
+		bool exit_thread; /* only written/read by job thread! */
 	} needs_fence;
 
 	bool per_thread_cwd;
@@ -267,8 +312,22 @@ int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
 	return 0;
 }
 
+static struct pthreadpool_tevent *pthreadpool_tevent_unwrap(
+	struct pthreadpool_tevent *pool)
+{
+	struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
+
+	if (wrapper != NULL) {
+		return wrapper->main_tp;
+	}
+
+	return pool;
+}
+
 size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
 {
+	pool = pthreadpool_tevent_unwrap(pool);
+
 	if (pool->pool == NULL) {
 		return 0;
 	}
@@ -278,6 +337,8 @@ size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
 
 size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
 {
+	pool = pthreadpool_tevent_unwrap(pool);
+
 	if (pool->pool == NULL) {
 		return 0;
 	}
@@ -287,6 +348,8 @@ size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
 
 bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool)
 {
+	pool = pthreadpool_tevent_unwrap(pool);
+
 	if (pool->pool == NULL) {
 		return false;
 	}
@@ -298,21 +361,94 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 {
 	struct pthreadpool_tevent_job *job = NULL;
 	struct pthreadpool_tevent_job *njob = NULL;
+	struct pthreadpool_tevent *wrap_tp = NULL;
+	struct pthreadpool_tevent *nwrap_tp = NULL;
 	struct pthreadpool_tevent_glue *glue = NULL;
 	int ret;
 
+	if (pool->wrapper.ctx != NULL) {
+		struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
+
+		pool->wrapper.ctx = NULL;
+		pool = wrapper->main_tp;
+
+		DLIST_REMOVE(pool->wrapper.list, wrapper->wrap_tp);
+
+		for (job = pool->jobs; job != NULL; job = njob) {
+			njob = job->next;
+
+			if (job->wrapper != wrapper) {
+				continue;
+			}
+
+			/*
+			 * This removes the job from the list
+			 *
+			 * Note that it waits in case
+			 * the wrapper hooks are currently
+			 * executing on the job.
+			 */
+			pthreadpool_tevent_job_orphan(job);
+		}
+
+		/*
+		 * At this point we're sure that no job
+		 * still references the pthreadpool_tevent_wrapper
+		 * structure, so we can free it.
+		 */
+		TALLOC_FREE(wrapper);
+
+		pthreadpool_tevent_cleanup_orphaned_jobs();
+		return 0;
+	}
+
+	if (pool->pool == NULL) {
+		/*
+		 * A dangling wrapper without main_tp.
+		 */
+		return 0;
+	}
+
 	ret = pthreadpool_stop(pool->pool);
 	if (ret != 0) {
 		return ret;
 	}
 
+	/*
+	 * orphan all jobs (including wrapper jobs)
+	 */
 	for (job = pool->jobs; job != NULL; job = njob) {
 		njob = job->next;
 
-		/* The job this removes it from the list */
+		/*
+		 * This removes the job from the list
+		 *
+		 * Note that it waits in case
+		 * the wrapper hooks are currently
+		 * executing on the job (thread).
+		 */
 		pthreadpool_tevent_job_orphan(job);
 	}
 
+	/*
+	 * cleanup all existing wrappers, remember we just orphaned
+	 * all jobs (including the ones of the wrappers).
+	 *
+	 * So we just mark as broken, so that
+	 * pthreadpool_tevent_job_send() won't accept new jobs.
+	 */
+	for (wrap_tp = pool->wrapper.list; wrap_tp != NULL; wrap_tp = nwrap_tp) {
+		nwrap_tp = wrap_tp->next;
+
+		/*
+		 * Just mark them as broken, so that we can't
+		 * get more jobs.
+		 */
+		TALLOC_FREE(wrap_tp->wrapper.ctx);
+
+		DLIST_REMOVE(pool->wrapper.list, wrap_tp);
+	}
+
 	/*
 	 * Delete all the registered
 	 * tevent_context/tevent_threaded_context
@@ -335,12 +471,77 @@ static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
 	return 0;
 }
 
+struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create(
+				struct pthreadpool_tevent *main_tp,
+				TALLOC_CTX *mem_ctx,
+				const struct pthreadpool_tevent_wrapper_ops *ops,
+				void *pstate,
+				size_t psize,
+				const char *type,
+				const char *location)
+{
+	void **ppstate = (void **)pstate;
+	struct pthreadpool_tevent *wrap_tp = NULL;
+	struct pthreadpool_tevent_wrapper *wrapper = NULL;
+
+	pthreadpool_tevent_cleanup_orphaned_jobs();
+
+	if (main_tp->wrapper.ctx != NULL) {
+		/*
+		 * stacking of wrappers is not supported
+		 */
+		errno = EINVAL;
+		return NULL;
+	}
+
+	if (main_tp->pool == NULL) {
+		/*
+		 * The pool is no longer valid,
+		 * most likely it was a wrapper context
+		 * where the main pool was destroyed.
+		 */
+		errno = EINVAL;
+		return NULL;
+	}
+
+	wrap_tp = talloc_zero(mem_ctx, struct pthreadpool_tevent);
+	if (wrap_tp == NULL) {
+		return NULL;
+	}
+
+	wrapper = talloc_zero(wrap_tp, struct pthreadpool_tevent_wrapper);
+	if (wrapper == NULL) {
+		TALLOC_FREE(wrap_tp);
+		return NULL;
+	}
+	wrapper->main_tp = main_tp;
+	wrapper->wrap_tp = wrap_tp;
+	wrapper->ops = ops;
+	wrapper->private_state = talloc_zero_size(wrapper, psize);
+	if (wrapper->private_state == NULL) {
+		TALLOC_FREE(wrap_tp);
+		return NULL;
+	}
+	talloc_set_name_const(wrapper->private_state, type);
+
+	wrap_tp->wrapper.ctx = wrapper;
+
+	DLIST_ADD_END(main_tp->wrapper.list, wrap_tp);
+
+	talloc_set_destructor(wrap_tp, pthreadpool_tevent_destructor);
+
+	*ppstate = wrapper->private_state;
+	return wrap_tp;
+}
+
 static int pthreadpool_tevent_glue_destructor(
 	struct pthreadpool_tevent_glue *glue)
 {
 	struct pthreadpool_tevent_job_state *state = NULL;
 	struct pthreadpool_tevent_job_state *nstate = NULL;
 
+	TALLOC_FREE(glue->fde);
+
 	for (state = glue->states; state != NULL; state = nstate) {
 		nstate = state->next;
 
@@ -381,6 +582,59 @@ static int pthreadpool_tevent_glue_link_destructor(
 	return 0;
 }
 
+static void pthreadpool_tevent_glue_monitor(struct tevent_context *ev,
+					    struct tevent_fd *fde,
+					    uint16_t flags,
+					    void *private_data)
+{
+	struct pthreadpool_tevent_glue *glue =
+		talloc_get_type_abort(private_data,
+		struct pthreadpool_tevent_glue);
+	struct pthreadpool_tevent_job *job = NULL;
+	struct pthreadpool_tevent_job *njob = NULL;
+	int ret = -1;
+
+	ret = pthreadpool_restart_check_monitor_drain(glue->pool->pool);
+	if (ret != 0) {
+		TALLOC_FREE(glue->fde);
+	}
+
+	ret = pthreadpool_restart_check(glue->pool->pool);
+	if (ret == 0) {
+		/*
+		 * success...
+		 */
+		goto done;
+	}
+
+	/*
+	 * There's a problem and the pool
+	 * has not a single thread available
+	 * for pending jobs, so we can only
+	 * stop the jobs and return an error.
+	 * This is similar to a failure from
+	 * pthreadpool_add_job().
+	 */
+	for (job = glue->pool->jobs; job != NULL; job = njob) {
+		njob = job->next;
+
+		tevent_req_defer_callback(job->state->req,
+					  job->state->ev);
+		tevent_req_error(job->state->req, ret);
+	}
+
+done:
+	if (glue->states == NULL) {
+		/*
+		 * If the glue doesn't have any pending jobs
+		 * we remove the glue.
+		 *
+		 * This also removes the fd event.
+		 */
+		TALLOC_FREE(glue);
+	}
+}
+
 static int pthreadpool_tevent_register_ev(
 				struct pthreadpool_tevent *pool,
 				struct pthreadpool_tevent_job_state *state)
@@ -388,6 +642,7 @@ static int pthreadpool_tevent_register_ev(
 	struct tevent_context *ev = state->ev;
 	struct pthreadpool_tevent_glue *glue = NULL;
 	struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
+	int monitor_fd = -1;
 
 	/*
 	 * See if this tevent_context was already registered by
@@ -420,6 +675,28 @@ static int pthreadpool_tevent_register_ev(
 	};
 	talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);
 
+	monitor_fd = pthreadpool_restart_check_monitor_fd(pool->pool);
+	if (monitor_fd == -1 && errno != ENOSYS) {
+		int saved_errno = errno;
+		TALLOC_FREE(glue);
+		return saved_errno;
+	}
+
+	if (monitor_fd != -1) {
+		glue->fde = tevent_add_fd(ev,
+					  glue,
+					  monitor_fd,
+					  TEVENT_FD_READ,
+					  pthreadpool_tevent_glue_monitor,
+					  glue);
+		if (glue->fde == NULL) {
+			close(monitor_fd);
+			TALLOC_FREE(glue);
+			return ENOMEM;
+		}
+		tevent_fd_set_auto_close(glue->fde);
+	}
+
 	/*
 	 * Now allocate the link object to the event context. Note this
 	 * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
@@ -559,6 +836,24 @@ static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
 		abort();
 	}
 
+	/*
+	 * Once we marked the request as 'orphaned'
+	 * we spin/loop if 'wrapper' is marked as active.
+	 *
+	 * We need to wait until the wrapper hook finished
+	 * before we can set job->wrapper = NULL.
+	 *
+	 * This is some kind of spinlock, but with
+	 * 1 millisecond sleeps in between, in order
+	 * to give the thread more cpu time to finish.
+	 */
+	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	while (job->needs_fence.wrapper) {
+		poll(NULL, 0, 1);
+		PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	}
+	job->wrapper = NULL;
+
 	/*
 	 * Once we marked the request as 'orphaned'
 	 * we spin/loop if it's already marked
@@ -673,9 +968,14 @@ struct tevent_req *pthreadpool_tevent_job_send(
 	struct pthreadpool_tevent_job_state *state = NULL;
 	struct pthreadpool_tevent_job *job = NULL;
 	int ret;
+	struct pthreadpool_tevent_wrapper *wrapper = pool->wrapper.ctx;
 
 	pthreadpool_tevent_cleanup_orphaned_jobs();
 
+	if (wrapper != NULL) {
+		pool = wrapper->main_tp;
+	}
+
 	req = tevent_req_create(mem_ctx, &state,
 				struct pthreadpool_tevent_job_state);
 	if (req == NULL) {
@@ -705,6 +1005,7 @@ struct tevent_req *pthreadpool_tevent_job_send(
 		return tevent_req_post(req, ev);
 	}
 	job->pool = pool;
+	job->wrapper = wrapper;
 	job->fn = fn;
 	job->private_data = private_data;
 	job->im = tevent_create_immediate(state->job);
@@ -803,15 +1104,73 @@ static void pthreadpool_tevent_job_fn(void *private_data)
 	struct pthreadpool_tevent_job *job =
 		talloc_get_type_abort(private_data,
 		struct pthreadpool_tevent_job);
+	struct pthreadpool_tevent_wrapper *wrapper = NULL;
 
 	current_job = job;
 	job->needs_fence.started = true;
 	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	if (job->needs_fence.orphaned) {
+		current_job = NULL;
+		return;
+	}
+
+	wrapper = job->wrapper;
+	if (wrapper != NULL) {
+		bool ok;
+
+		job->needs_fence.wrapper = true;
+		PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+		if (job->needs_fence.orphaned) {
+			job->needs_fence.wrapper = false;
+			PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+			current_job = NULL;
+			return;
+		}
+		ok = wrapper->ops->before_job(wrapper->wrap_tp,
+					      wrapper->private_state,
+					      wrapper->main_tp,
+					      __location__);
+		job->needs_fence.wrapper = false;
+		PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+		if (!ok) {
+			job->needs_fence.exit_thread = true;
+			PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+			current_job = NULL;
+			return;
+		}
+	}
 
 	job->fn(job->private_data);
 
 	job->needs_fence.executed = true;
 	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+
+	if (wrapper != NULL) {
+		bool ok;
+
+		job->needs_fence.wrapper = true;
+		PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+		if (job->needs_fence.orphaned) {
+			job->needs_fence.wrapper = false;
+			job->needs_fence.exit_thread = true;
+			PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+			current_job = NULL;
+			return;
+		}
+		ok = wrapper->ops->after_job(wrapper->wrap_tp,
+					     wrapper->private_state,
+					     wrapper->main_tp,
+					     __location__);
+		job->needs_fence.wrapper = false;
+		PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+		if (!ok) {
+			job->needs_fence.exit_thread = true;
+			PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+			current_job = NULL;
+			return;
+		}
+	}
+
 	current_job = NULL;
 }
 
@@ -830,6 +1189,15 @@ static int pthreadpool_tevent_job_signal(int jobid,
 		/* Request already gone */
 		job->needs_fence.dropped = true;
 		PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+		if (job->needs_fence.exit_thread) {
+			/*
+			 * There was a problem with the wrapper, so the
+			 * current job/worker thread needs to terminate.
+			 *
+			 * The pthreadpool_tevent is already gone.
+			 */
+			return -1;
+		}
 		return 0;
 	}
 
@@ -855,6 +1223,15 @@ static int pthreadpool_tevent_job_signal(int jobid,
 
 	job->needs_fence.signaled = true;
 	PTHREAD_TEVENT_JOB_THREAD_FENCE(job);
+	if (job->needs_fence.exit_thread) {
+		/*
+		 * There was a problem with the wrapper, so the
+		 * current job/worker thread needs to terminate.
+		 *
+		 * The pthreadpool_tevent is already gone.
+		 */
+		return -1;
+	}
 	return 0;
 }
 
diff --git a/lib/pthreadpool/pthreadpool_tevent.h b/lib/pthreadpool/pthreadpool_tevent.h
index ff2ab7cfb73d..653f5847ecfc 100644
--- a/lib/pthreadpool/pthreadpool_tevent.h
+++ b/lib/pthreadpool/pthreadpool_tevent.h
@@ -29,6 +29,31 @@ struct pthreadpool_tevent;
 int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
 			    struct pthreadpool_tevent **presult);
 
+struct pthreadpool_tevent_wrapper_ops {
+	const char *name;
+
+	bool (*before_job)(struct pthreadpool_tevent *wrap_tp,
+			   void *private_state,
+			   struct pthreadpool_tevent *main_tp,
+			   const char *location);
+	bool (*after_job)(struct pthreadpool_tevent *wrap_tp,
+			  void *private_state,
+			  struct pthreadpool_tevent *main_tp,
+			  const char *location);
+};
+
+struct pthreadpool_tevent *_pthreadpool_tevent_wrapper_create(
+				struct pthreadpool_tevent *main_tp,
+				TALLOC_CTX *mem_ctx,
+				const struct pthreadpool_tevent_wrapper_ops *ops,
+				void *pstate,
+				size_t psize,
+				const char *type,
+				const char *location);
+#define pthreadpool_tevent_wrapper_create(main_tp, mem_ctx, ops, state, type) \
+	_pthreadpool_tevent_wrapper_create(main_tp, mem_ctx, ops, \
+				       state, sizeof(type), #type, __location__)
+
 size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool);
 size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool);
 bool pthreadpool_tevent_per_thread_cwd(struct pthreadpool_tevent *pool);
-- 
2.17.1
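
As an aside, the control flow that pthreadpool_tevent_job_fn() gains in the patch above can be sketched in isolation. The following is only a single-threaded illustration of the order in which the flags are set and checked around the before/after hooks. All demo_* names are hypothetical, and demo_fence() merely assumes that PTHREAD_TEVENT_JOB_THREAD_FENCE() behaves like a full memory barrier; it is not the Samba implementation.

```c
#include <assert.h>
#include <stdatomic.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-ins for illustration; not Samba's real types. */
struct demo_hooks {
	bool (*before_job)(void *private_state);
	bool (*after_job)(void *private_state);
};

struct demo_job {
	const struct demo_hooks *hooks;	/* NULL: job is not wrapped */
	void *private_state;
	void (*fn)(void *private_data);
	void *private_data;
	struct {
		bool started, executed, orphaned, wrapper, exit_thread;
	} needs_fence;
};

/* Counters to observe the hooks, similar to the stats in the test. */
struct demo_stats {
	unsigned num_before, num_after, num_fn;
	bool fail_before;
};

/* Assumption: the real fence macro acts as a full memory barrier. */
static void demo_fence(void)
{
	atomic_thread_fence(memory_order_seq_cst);
}

/*
 * Run one hook with the 'wrapper' flag raised, so that an orphaning
 * thread could wait for it. Returns false if the job was orphaned
 * first or if the hook itself failed.
 */
static bool demo_run_hook(struct demo_job *job,
			  bool (*hook)(void *private_state),
			  bool exit_if_orphaned)
{
	bool ok;

	job->needs_fence.wrapper = true;
	demo_fence();
	if (job->needs_fence.orphaned) {
		job->needs_fence.wrapper = false;
		if (exit_if_orphaned) {
			job->needs_fence.exit_thread = true;
		}
		demo_fence();
		return false;
	}
	ok = hook(job->private_state);
	job->needs_fence.wrapper = false;
	demo_fence();
	if (!ok) {
		/* A failed hook means the worker thread must go away. */
		job->needs_fence.exit_thread = true;
		demo_fence();
	}
	return ok;
}

static void demo_job_fn(struct demo_job *job)
{
	assert(job->fn != NULL);

	job->needs_fence.started = true;
	demo_fence();
	if (job->needs_fence.orphaned) {
		return;
	}
	if (job->hooks != NULL &&
	    !demo_run_hook(job, job->hooks->before_job, false)) {
		return;
	}

	job->fn(job->private_data);
	job->needs_fence.executed = true;
	demo_fence();

	if (job->hooks != NULL) {
		(void)demo_run_hook(job, job->hooks->after_job, true);
	}
}

static bool demo_before(void *p)
{
	struct demo_stats *s = p;
	s->num_before++;
	return !s->fail_before;
}

static bool demo_after(void *p)
{
	struct demo_stats *s = p;
	s->num_after++;
	return true;
}

static void demo_fn(void *p)
{
	struct demo_stats *s = p;
	s->num_fn++;
}

static const struct demo_hooks demo_ops = { demo_before, demo_after };
```

To try it, point a struct demo_job at demo_ops and a struct demo_stats, then call demo_job_fn() once. With fail_before set, the job function should be skipped and exit_thread raised, which is the case where the signal function in the patch returns -1.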


From ddb0c021db29337596f634ae65abe08e69ac32e2 Mon Sep 17 00:00:00 2001
From: Ralph Boehme <slow at samba.org>
Date: Mon, 18 Jun 2018 16:57:18 +0200
Subject: [PATCH 20/20] pthreadpool: test cancelling and freeing jobs of a
 wrapped pthreadpool_tevent

Pair-Programmed-With: Stefan Metzmacher <metze at samba.org>

Signed-off-by: Stefan Metzmacher <metze at samba.org>
---
 lib/pthreadpool/tests_cmocka.c | 524 +++++++++++++++++++++++++++++++++
 1 file changed, 524 insertions(+)

diff --git a/lib/pthreadpool/tests_cmocka.c b/lib/pthreadpool/tests_cmocka.c
index fa508e24eab2..6dc4b47c6429 100644
--- a/lib/pthreadpool/tests_cmocka.c
+++ b/lib/pthreadpool/tests_cmocka.c
@@ -543,6 +543,8 @@ struct test_cancel_state {
 	struct test_cancel_job *job4;
 	struct test_cancel_job *job5;
 	struct test_cancel_job *job6;
+	struct test_cancel_job *job7;
+	struct test_cancel_job *job8;
 };
 
 static void test_cancel_job(void **private_data)
@@ -805,6 +807,525 @@ static void test_cancel_job(void **private_data)
 	TALLOC_FREE(state);
 }
 
+struct test_pthreadpool_tevent_wrap_tp_stats {
+	unsigned num_before;
+	unsigned num_after;
+	bool destroyed;
+};
+
+struct test_pthreadpool_tevent_wrap_tp_state {
+	struct test_pthreadpool_tevent_wrap_tp_state **selfptr;
+	struct test_pthreadpool_tevent_wrap_tp_stats *stats;
+};
+
+static int test_pthreadpool_tevent_wrap_tp_state_destructor(
+	struct test_pthreadpool_tevent_wrap_tp_state *state)
+{
+	state->stats->destroyed = true;
+	*state->selfptr = NULL;
+
+	return 0;
+}
+
+static bool test_pthreadpool_tevent_tp_before(struct pthreadpool_tevent *wrap,
+					      void *private_data,
+					      struct pthreadpool_tevent *main,
+					      const char *location)
+{
+	struct test_pthreadpool_tevent_wrap_tp_state *state =
+		talloc_get_type_abort(private_data,
+		struct test_pthreadpool_tevent_wrap_tp_state);
+
+	state->stats->num_before++;
+
+	return true;
+}
+
+static bool test_pthreadpool_tevent_tp_after(struct pthreadpool_tevent *wrap,
+					     void *private_data,
+					     struct pthreadpool_tevent *main,
+					     const char *location)
+{
+	struct test_pthreadpool_tevent_wrap_tp_state *state =
+		talloc_get_type_abort(private_data,
+		struct test_pthreadpool_tevent_wrap_tp_state);
+
+	state->stats->num_after++;
+
+	return true;
+}
+
+static const struct pthreadpool_tevent_wrapper_ops test_tp_ops = {
+	.name		= "test_pthreadpool_tevent_tp",
+	.before_job	= test_pthreadpool_tevent_tp_before,
+	.after_job	= test_pthreadpool_tevent_tp_after,
+};
+
+static void test_wrap_cancel_job(void **private_data)
+{
+	struct pthreadpool_tevent_test *t = *private_data;
+	struct tevent_context *ev = t->ev;
+	struct pthreadpool_tevent *poolw1 = NULL;
+	struct test_pthreadpool_tevent_wrap_tp_state *poolw1_state = NULL;
+	struct test_pthreadpool_tevent_wrap_tp_stats poolw1_stats = {
+		.destroyed = false,
+	};
+	struct pthreadpool_tevent *poolw2 = NULL;
+	struct test_pthreadpool_tevent_wrap_tp_state *poolw2_state = NULL;
+	struct test_pthreadpool_tevent_wrap_tp_stats poolw2_stats = {
+		.destroyed = false,
+	};
+	size_t max_threads_o;
+	size_t max_threads_w1;
+	size_t max_threads_w2;
+	bool per_thread_cwd_o;
+	bool per_thread_cwd_w1;
+	bool per_thread_cwd_w2;
+	struct test_cancel_state *state = NULL;
+	int ret;
+	bool ok;
+	int fdpair[2] = { -1, -1 };
+	char c = 0;
+
+	max_threads_o = pthreadpool_tevent_max_threads(t->opool);
+	per_thread_cwd_o = pthreadpool_tevent_per_thread_cwd(t->opool);
+
+	poolw1 = pthreadpool_tevent_wrapper_create(
+		t->opool, t, &test_tp_ops, &poolw1_state,
+		struct test_pthreadpool_tevent_wrap_tp_state);
+	assert_non_null(poolw1);
+	poolw1_state->selfptr = &poolw1_state;
+	ANNOTATE_BENIGN_RACE_SIZED(&poolw1_stats,
+				   sizeof(poolw1_stats),
+				   "protected by pthreadpool_tevent code");
+	poolw1_state->stats = &poolw1_stats;
+	talloc_set_destructor(poolw1_state,
+			      test_pthreadpool_tevent_wrap_tp_state_destructor);
+
+	poolw2 = pthreadpool_tevent_wrapper_create(
+		t->opool, t, &test_tp_ops, &poolw2_state,
+		struct test_pthreadpool_tevent_wrap_tp_state);
+	assert_non_null(poolw2);
+	poolw2_state->selfptr = &poolw2_state;
+	ANNOTATE_BENIGN_RACE_SIZED(&poolw2_stats,
+				   sizeof(poolw2_stats),
+				   "protected by pthreadpool_tevent code");
+	poolw2_state->stats = &poolw2_stats;
+	talloc_set_destructor(poolw2_state,
+			      test_pthreadpool_tevent_wrap_tp_state_destructor);
+
+	assert_false(poolw1_stats.destroyed);
+	assert_int_equal(poolw1_stats.num_before, 0);
+	assert_int_equal(poolw1_stats.num_after, 0);
+	max_threads_w1 = pthreadpool_tevent_max_threads(poolw1);
+	assert_int_equal(max_threads_w1, max_threads_o);
+	per_thread_cwd_w1 = pthreadpool_tevent_per_thread_cwd(poolw1);
+	assert_int_equal(per_thread_cwd_w1, per_thread_cwd_o);
+
+	assert_false(poolw2_stats.destroyed);
+	assert_int_equal(poolw2_stats.num_before, 0);
+	assert_int_equal(poolw2_stats.num_after, 0);
+	max_threads_w2 = pthreadpool_tevent_max_threads(poolw2);
+	assert_int_equal(max_threads_w2, max_threads_o);
+	per_thread_cwd_w2 = pthreadpool_tevent_per_thread_cwd(poolw2);
+	assert_int_equal(per_thread_cwd_w2, per_thread_cwd_o);
+
+	state = talloc_zero(t, struct test_cancel_state);
+	assert_non_null(state);
+	state->job1 = test_cancel_job_create(state);
+	assert_non_null(state->job1);
+	state->job2 = test_cancel_job_create(state);
+	assert_non_null(state->job2);
+	state->job3 = test_cancel_job_create(state);
+	assert_non_null(state->job3);
+
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_return_code(ret, 0);
+
+	state->job1->fdm = fdpair[0];
+	state->job1->fdj = fdpair[1];
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
+
+	will_return(__wrap_pthread_create, 0);
+	state->job1->req = pthreadpool_tevent_job_send(
+		state->job1, ev, poolw1, test_cancel_job_fn, state->job1);
+	assert_non_null(state->job1->req);
+	tevent_req_set_callback(state->job1->req,
+				test_cancel_job_done,
+				state->job1);
+
+	state->job2->req = pthreadpool_tevent_job_send(
+		state->job2, ev, poolw1, test_cancel_job_fn, NULL);
+	assert_non_null(state->job2->req);
+	tevent_req_set_callback(state->job2->req,
+				test_cancel_job_done,
+				state->job2);
+
+	state->job3->req = pthreadpool_tevent_job_send(
+		state->job3, ev, poolw1, test_cancel_job_fn, NULL);
+	assert_non_null(state->job3->req);
+	tevent_req_set_callback(state->job3->req,
+				test_cancel_job_done,
+				state->job3);
+
+	/*
+	 * Wait for job 1 to start.
+	 */
+	ret = read(state->job1->fdm, &c, 1);
+	assert_return_code(ret, 1);
+
+	/*
+	 * We cancel job 3 and destroy job 2.
+	 * Neither should ever be executed.
+	 */
+	assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 2);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 2);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 2);
+	TALLOC_FREE(state->job2->req);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 1);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 1);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 1);
+	ok = tevent_req_cancel(state->job3->req);
+	assert_true(ok);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
+
+	/*
+	 * Job 3 should complete as canceled, while
+	 * job 1 is still running.
+	 */
+	test_cancel_job_wait(state->job3, ev);
+	assert_return_code(state->job3->ret, ECANCELED);
+	assert_null(state->job3->req);
+	assert_false(state->job3->started);
+
+	/*
+	 * Now job1 is canceled while it's running,
+	 * this should let it stop its loop.
+	 */
+	ok = tevent_req_cancel(state->job1->req);
+	assert_false(ok);
+
+	/*
+	 * Job 1 completes. It got at least one sleep
+	 * timeout loop and has state->job1->canceled set.
+	 */
+	test_cancel_job_wait(state->job1, ev);
+	assert_return_code(state->job1->ret, 0);
+	assert_null(state->job1->req);
+	assert_true(state->job1->started);
+	assert_true(state->job1->finished);
+	assert_true(state->job1->canceled);
+	assert_false(state->job1->orphaned);
+	assert_in_range(state->job1->polls, 1, 100);
+	assert_int_equal(state->job1->timeouts, state->job1->polls);
+
+	assert_false(poolw1_stats.destroyed);
+	assert_int_equal(poolw1_stats.num_before, 1);
+	assert_int_equal(poolw1_stats.num_after, 1);
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
+
+	/*
+	 * Now we create jobs 4 and 5
+	 * Both should execute.
+	 * Job 4 is orphaned while running by a TALLOC_FREE()
+	 * This should stop job 4 and let job 5 start.
+	 * We do a "normal" exit in job 5 by creating some activity
+	 * on the socketpair.
+	 */
+
+	state->job4 = test_cancel_job_create(state);
+	assert_non_null(state->job4);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_return_code(ret, 0);
+
+	state->job4->fdm = fdpair[0];
+	state->job4->fdj = fdpair[1];
+
+	state->job4->req = pthreadpool_tevent_job_send(
+		state->job4, ev, poolw1, test_cancel_job_fn, state->job4);
+	assert_non_null(state->job4->req);
+	tevent_req_set_callback(state->job4->req,
+				test_cancel_job_done,
+				state->job4);
+
+	state->job5 = test_cancel_job_create(state);
+	assert_non_null(state->job5);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_return_code(ret, 0);
+
+	state->job5->fdm = fdpair[0];
+	state->job5->fdj = fdpair[1];
+
+	state->job5->req = pthreadpool_tevent_job_send(
+		state->job5, ev, poolw1, test_cancel_job_fn, state->job5);
+	assert_non_null(state->job5->req);
+	tevent_req_set_callback(state->job5->req,
+				test_cancel_job_done,
+				state->job5);
+
+	/*
+	 * Make sure job 5 can exit as soon as possible.
+	 * It will never get a sleep/poll timeout.
+	 */
+	ret = write(state->job5->fdm, &c, 1);
+	assert_return_code(ret, errno);
+
+	/*
+	 * Wait for job 4 to start
+	 */
+	ret = read(state->job4->fdm, &c, 1);
+	assert_return_code(ret, 1);
+
+	assert_false(poolw1_stats.destroyed);
+	assert_int_equal(poolw1_stats.num_before, 2);
+	assert_int_equal(poolw1_stats.num_after, 1);
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 1);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 1);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 1);
+
+	/*
+	 * destroy the request so that it's marked
+	 * as orphaned.
+	 *
+	 * As we're in the wrapper mode, the
+	 * job thread will exit and try to create
+	 * a new thread.
+	 */
+	TALLOC_FREE(state->job4->req);
+
+	/*
+	 * Job 5 completes. It got no sleep timeout loop.
+	 */
+	will_return(__wrap_pthread_create, 0);
+	test_cancel_job_wait(state->job5, ev);
+	assert_return_code(state->job5->ret, 0);
+	assert_null(state->job5->req);
+	assert_true(state->job5->started);
+	assert_true(state->job5->finished);
+	assert_false(state->job5->canceled);
+	assert_false(state->job5->orphaned);
+	assert_int_equal(state->job5->polls, 1);
+	assert_int_equal(state->job5->timeouts, 0);
+
+	assert_false(poolw1_stats.destroyed);
+	assert_int_equal(poolw1_stats.num_before, 3);
+	assert_int_equal(poolw1_stats.num_after, 2);
+
+	assert_int_equal(pthreadpool_tevent_queued_jobs(t->opool), 0);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw1), 0);
+	assert_int_equal(pthreadpool_tevent_queued_jobs(poolw2), 0);
+
+	/*
+	 * Job 2 is still not executed as we did a TALLOC_FREE()
+	 * before it was scheduled.
+	 */
+	assert_false(state->job2->completed);
+	assert_false(state->job2->started);
+
+	/*
+	 * Job 4 still wasn't completed, as we did a TALLOC_FREE()
+	 * while it was running, but it was started and has
+	 * orphaned set.
+	 */
+	assert_false(state->job4->completed);
+	assert_true(state->job4->started);
+	assert_true(state->job4->finished);
+	assert_false(state->job4->canceled);
+	assert_true(state->job4->orphaned);
+	assert_in_range(state->job4->polls, 1, 100);
+	assert_int_equal(state->job4->timeouts, state->job4->polls);
+
+	assert_false(poolw1_stats.destroyed);
+	assert_int_equal(poolw1_stats.num_before, 3);
+	assert_int_equal(poolw1_stats.num_after, 2);
+
+	/*
+	 * Now we create job 6.
+	 * We destroy the pool wrapper while it's executing.
+	 */
+
+	state->job6 = test_cancel_job_create(state);
+	assert_non_null(state->job6);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_return_code(ret, 0);
+
+	state->job6->fdm = fdpair[0];
+	state->job6->fdj = fdpair[1];
+
+	state->job6->req = pthreadpool_tevent_job_send(
+		state->job6, ev, poolw1, test_cancel_job_fn, state->job6);
+	assert_non_null(state->job6->req);
+	tevent_req_set_callback(state->job6->req,
+				test_cancel_job_done,
+				state->job6);
+
+	/*
+	 * Wait for job 6 to start
+	 */
+	ret = read(state->job6->fdm, &c, 1);
+	assert_return_code(ret, 1);
+
+	assert_non_null(poolw1_state);
+	assert_false(poolw1_stats.destroyed);
+	assert_int_equal(poolw1_stats.num_before, 4);
+	assert_int_equal(poolw1_stats.num_after, 2);
+
+	/*
+	 * destroy the wrapper pool so that the job
+	 * is marked as orphaned.
+	 */
+	TALLOC_FREE(poolw1);
+
+	/*
+	 * Wait until the job finished.
+	 */
+	ret = read(state->job6->fdm, &c, 1);
+	assert_return_code(ret, 0);
+
+	assert_null(poolw1_state);
+	assert_true(poolw1_stats.destroyed);
+	assert_int_equal(poolw1_stats.num_before, 4);
+	assert_int_equal(poolw1_stats.num_after, 2);
+
+	/*
+	 * Job 6 is still dangling around.
+	 *
+	 * We need to convince valgrind --tool={drd,helgrind}
+	 * that the read above is good enough to be
+	 * sure the job is finished and closed the other end of
+	 * the socketpair.
+	 */
+	ANNOTATE_BENIGN_RACE_SIZED(state->job6,
+				   sizeof(*state->job6),
+				   "protected by thread fence");
+	assert_non_null(state->job6->req);
+	assert_true(tevent_req_is_in_progress(state->job6->req));
+	assert_false(state->job6->completed);
+	assert_true(state->job6->started);
+	assert_true(state->job6->finished);
+	assert_false(state->job6->canceled);
+	assert_true(state->job6->orphaned);
+	assert_in_range(state->job6->polls, 1, 100);
+	assert_int_equal(state->job6->timeouts, state->job6->polls);
+
+	/*
+	 * Now we create job 7
+	 * on the second wrapper pool.
+	 * We destroy the main pool while it's executing.
+	 */
+
+	state->job7 = test_cancel_job_create(state);
+	assert_non_null(state->job7);
+
+	assert_false(poolw2_stats.destroyed);
+	assert_int_equal(poolw2_stats.num_before, 0);
+	assert_int_equal(poolw2_stats.num_after, 0);
+
+	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair);
+	assert_return_code(ret, 0);
+
+	state->job7->fdm = fdpair[0];
+	state->job7->fdj = fdpair[1];
+
+	will_return(__wrap_pthread_create, 0);
+	state->job7->req = pthreadpool_tevent_job_send(
+		state->job7, ev, poolw2, test_cancel_job_fn, state->job7);
+	assert_non_null(state->job7->req);
+	tevent_req_set_callback(state->job7->req,
+				test_cancel_job_done,
+				state->job7);
+
+	/*
+	 * Wait for job 7 to start
+	 */
+	ret = read(state->job7->fdm, &c, 1);
+	assert_return_code(ret, 1);
+
+	assert_false(poolw2_stats.destroyed);
+	assert_int_equal(poolw2_stats.num_before, 1);
+	assert_int_equal(poolw2_stats.num_after, 0);
+
+	/*
+	 * destroy the main pool so that the job
+	 * is marked as orphaned.
+	 */
+	TALLOC_FREE(t->opool);
+
+	/*
+	 * Wait until the job finished.
+	 */
+	ret = read(state->job7->fdm, &c, 1);
+	assert_return_code(ret, 0);
+
+	assert_null(poolw2_state);
+	assert_true(poolw2_stats.destroyed);
+	assert_int_equal(poolw2_stats.num_before, 1);
+	assert_int_equal(poolw2_stats.num_after, 0);
+
+	/*
+	 * Job 7 is still dangling around.
+	 *
+	 * We need to convince valgrind --tool={drd,helgrind}
+	 * that the read above is good enough to be
+	 * sure the job is finished and closed the other end of
+	 * the socketpair.
+	 */
+	ANNOTATE_BENIGN_RACE_SIZED(state->job7,
+				   sizeof(*state->job7),
+				   "protected by thread fence");
+	assert_non_null(state->job7->req);
+	assert_true(tevent_req_is_in_progress(state->job7->req));
+	assert_false(state->job7->completed);
+	assert_true(state->job7->started);
+	assert_true(state->job7->finished);
+	assert_false(state->job7->canceled);
+	assert_true(state->job7->orphaned);
+	assert_in_range(state->job7->polls, 1, 100);
+	assert_int_equal(state->job7->timeouts, state->job7->polls);
+
+	/*
+	 * Now check that adding a new job to the dangling
+	 * wrapper gives an error.
+	 */
+	state->job8 = test_cancel_job_create(state);
+	assert_non_null(state->job8);
+
+	state->job8->req = pthreadpool_tevent_job_send(
+		state->job8, ev, poolw2, test_cancel_job_fn, state->job8);
+	assert_non_null(state->job8->req);
+	tevent_req_set_callback(state->job8->req,
+				test_cancel_job_done,
+				state->job8);
+
+	/*
+	 * Job 8 completes, but with a failure.
+	 */
+	test_cancel_job_wait(state->job8, ev);
+	assert_return_code(state->job8->ret, 0);
+	assert_null(state->job8->req);
+	assert_false(state->job8->started);
+	assert_false(state->job8->finished);
+	assert_false(state->job8->canceled);
+	assert_false(state->job8->orphaned);
+	assert_int_equal(state->job8->polls, 0);
+	assert_int_equal(state->job8->timeouts, 0);
+
+	TALLOC_FREE(state);
+}
+
 int main(int argc, char **argv)
 {
 	const struct CMUnitTest tests[] = {
@@ -817,6 +1338,9 @@ int main(int argc, char **argv)
 		cmocka_unit_test_setup_teardown(test_cancel_job,
 						setup_pthreadpool_tevent,
 						teardown_pthreadpool_tevent),
+		cmocka_unit_test_setup_teardown(test_wrap_cancel_job,
+						setup_pthreadpool_tevent,
+						teardown_pthreadpool_tevent),
 	};
 
 	cmocka_set_message_output(CM_OUTPUT_SUBUNIT);
-- 
2.17.1
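
For readers unfamiliar with the typed-create pattern behind pthreadpool_tevent_wrapper_create(), here is a minimal standalone sketch of the idea: a macro passes sizeof(type) and the stringified type name to an implementation function, which allocates zeroed private state and hands it back through an out pointer. Everything named demo_* is hypothetical, plain calloc() stands in for talloc, and DEMO_LOCATION only approximates __location__; the real function additionally takes the main pool, a memory context and the ops table.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Build a "file:line" string, similar in spirit to __location__. */
#define DEMO_STR2(x) #x
#define DEMO_STR(x) DEMO_STR2(x)
#define DEMO_LOCATION __FILE__ ":" DEMO_STR(__LINE__)

/* Hypothetical wrapper object; the real one is a talloc child. */
struct demo_wrapper {
	void *private_state;
	const char *type;
	const char *location;
};

/* Example state type, modelled on the counters used in the test. */
struct demo_state {
	unsigned num_before;
	unsigned num_after;
};

static struct demo_wrapper *demo_wrapper_create_impl(void *pstate,
						     size_t psize,
						     const char *type,
						     const char *location)
{
	struct demo_wrapper *w = NULL;

	assert(pstate != NULL);

	w = calloc(1, sizeof(*w));
	if (w == NULL) {
		return NULL;
	}
	/* Zero-initialised, like talloc_zero_size() in the patch. */
	w->private_state = calloc(1, psize);
	if (w->private_state == NULL) {
		free(w);
		return NULL;
	}
	w->type = type;
	w->location = location;

	/*
	 * pstate is really the address of a typed pointer; copy the
	 * pointer value into it without naming its exact type.
	 */
	memcpy(pstate, &w->private_state, sizeof(void *));
	return w;
}

/* The caller names the type once; size and name are derived from it. */
#define demo_wrapper_create(pstate, type) \
	demo_wrapper_create_impl(pstate, sizeof(type), #type, DEMO_LOCATION)

static void demo_wrapper_free(struct demo_wrapper *w)
{
	if (w != NULL) {
		free(w->private_state);
		free(w);
	}
}
```

A caller would declare `struct demo_state *st = NULL;` and call `demo_wrapper_create(&st, struct demo_state)`, much as the test passes `&poolw1_state` and its state type. Recording the type name is what lets the hooks in the test recover the state with talloc_get_type_abort().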
