LDAP notification tests fail with full-DB locking

Andrew Bartlett abartlet at samba.org
Mon May 29 04:35:42 UTC 2017


On Sat, 2017-05-27 at 09:32 +1200, Andrew Bartlett wrote:
> On Fri, 2017-05-26 at 21:33 +0200, Stefan Metzmacher via samba-
> technical wrote:
> > 
> > Then prepare a branch with the minimum required patches
> > to trigger the notify problem and give me the
> > make subset that triggers it.
> 
> Thanks.  I will get you this branch next week.  I should even be able
> to split that in the other direction, and not even include event loop
> changes to show it is just the global lock.  (The event loop changes
> are needed for the final fix, naturally).
> 
> The command is 
> make test TESTS=samba4.ldap.notification.python

Attached is the minimum set of patches to show the hang.

Thanks,

Andrew Bartlett

-- 
Andrew Bartlett
https://samba.org/~abartlet/
Authentication Developer, Samba Team         https://samba.org
Samba Development and Support, Catalyst IT   
https://catalyst.net.nz/services/samba



-------------- next part --------------
From 368f45dc661bb903bdf2c9a407c4e0ad40bd32e4 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Fri, 31 Mar 2017 17:34:13 +1300
Subject: [PATCH 01/11] tdb: Remove locking from tdb_traverse_read()

This restores the original intent of tdb_traverse_read() in
7dd31288a701d772e45b1960ac4ce4cc1be782ed.

This is needed to avoid a deadlock between tdb_lockall() and the
transaction start: ldb_tdb should take the allrecord lock during a
search (which calls tdb_traverse), and can otherwise deadlock against
a transaction starting in another process.

We add a test to show that a transaction can now start while a read
traverse is in progress.

This allows more operations to happen in parallel.  The blocking point
is moved to the prepare commit.

This in turn permits roughly a doubling of unindexed search
performance.  Currently ldb_tdb omits to take the lock due to
an unrelated bug, and once that is fixed, taking the allrecord lock
triggers the above-mentioned deadlock.

This behaviour was added in 251aaafe3a9213118ac3a92def9ab2104c40d12a
for Solaris 10 in 2005.

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/tdb/common/traverse.c          | 10 +---------
 lib/tdb/test/run-nested-traverse.c | 31 +++++++++++++++++++++++++++----
 2 files changed, 28 insertions(+), 13 deletions(-)
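
Reviewer note (not part of the patch): the behaviour the new traverse1r
test expects can be sketched with plain POSIX fcntl() byte-range locks.
This is only a model under stated assumptions: the two offsets and all
demo_* names are hypothetical and are not tdb's real lock layout.  One
process holds a read lock on the "record" byte; a second process can
still take the "transaction" byte, but a write lock on the record byte
would block, which corresponds to the commit step.

```c
#include <assert.h>
#include <errno.h>
#include <fcntl.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical byte offsets standing in for tdb's lock slots. */
#define DEMO_TRANSACTION_LOCK 0
#define DEMO_RECORD_LOCK      1

/* Try a non-blocking one-byte fcntl lock.
 * Returns 0 if granted, 1 if it would have blocked, -1 on other errors. */
static int demo_try_lock(int fd, short type, off_t offset)
{
	struct flock fl = {
		.l_type = type,
		.l_whence = SEEK_SET,
		.l_start = offset,
		.l_len = 1,
	};

	assert(fd >= 0);
	if (fcntl(fd, F_SETLK, &fl) == 0) {
		return 0;
	}
	return (errno == EACCES || errno == EAGAIN) ? 1 : -1;
}

/* Fork a second process (like the external agent in the test) and
 * report what demo_try_lock() returned there, or -1 on failure. */
static int demo_try_lock_in_child(const char *path, short type, off_t offset)
{
	int status;
	pid_t pid = fork();

	if (pid == -1) {
		return -1;
	}
	if (pid == 0) {
		int fd = open(path, O_RDWR);
		int r = (fd == -1) ? -1 : demo_try_lock(fd, type, offset);
		_exit(r == -1 ? 2 : r);
	}
	if (waitpid(pid, &status, 0) != pid || !WIFEXITED(status)) {
		return -1;
	}
	return WEXITSTATUS(status) == 2 ? -1 : WEXITSTATUS(status);
}
```

With the parent holding F_RDLCK on DEMO_RECORD_LOCK, a child asking for
F_WRLCK on DEMO_TRANSACTION_LOCK should be granted, while a child asking
for F_WRLCK on DEMO_RECORD_LOCK should report that it would block.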

diff --git a/lib/tdb/common/traverse.c b/lib/tdb/common/traverse.c
index f33ef34ab8d..f62306e5560 100644
--- a/lib/tdb/common/traverse.c
+++ b/lib/tdb/common/traverse.c
@@ -244,7 +244,7 @@ out:
 
 
 /*
-  a read style traverse - temporarily marks the db read only
+  a read style traverse - temporarily marks each record read only
 */
 _PUBLIC_ int tdb_traverse_read(struct tdb_context *tdb,
 		      tdb_traverse_func fn, void *private_data)
@@ -252,19 +252,11 @@ _PUBLIC_ int tdb_traverse_read(struct tdb_context *tdb,
 	struct tdb_traverse_lock tl = { NULL, 0, 0, F_RDLCK };
 	int ret;
 
-	/* we need to get a read lock on the transaction lock here to
-	   cope with the lock ordering semantics of solaris10 */
-	if (tdb_transaction_lock(tdb, F_RDLCK, TDB_LOCK_WAIT)) {
-		return -1;
-	}
-
 	tdb->traverse_read++;
 	tdb_trace(tdb, "tdb_traverse_read_start");
 	ret = tdb_traverse_internal(tdb, fn, private_data, &tl);
 	tdb->traverse_read--;
 
-	tdb_transaction_unlock(tdb, F_RDLCK);
-
 	return ret;
 }
 
diff --git a/lib/tdb/test/run-nested-traverse.c b/lib/tdb/test/run-nested-traverse.c
index 22ee3e2a2a6..aeaa0859793 100644
--- a/lib/tdb/test/run-nested-traverse.c
+++ b/lib/tdb/test/run-nested-traverse.c
@@ -41,7 +41,30 @@ static int traverse2(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
 	return 0;
 }
 
-static int traverse1(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
+static int traverse1r(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
+		     void *p)
+{
+	ok1(correct_key(key));
+	ok1(correct_data(data));
+	ok1(external_agent_operation(agent, TRANSACTION_START, tdb_name(tdb))
+	    == SUCCESS);
+	ok1(external_agent_operation(agent, STORE, tdb_name(tdb))
+	    == SUCCESS);
+	ok1(external_agent_operation(agent, TRANSACTION_COMMIT, tdb_name(tdb))
+	    == WOULD_HAVE_BLOCKED);
+	tdb_traverse(tdb, traverse2, NULL);
+
+	/* That should *not* release the all-records lock! */
+	ok1(external_agent_operation(agent, TRANSACTION_START, tdb_name(tdb))
+	    == SUCCESS);
+	ok1(external_agent_operation(agent, STORE, tdb_name(tdb))
+	    == SUCCESS);
+	ok1(external_agent_operation(agent, TRANSACTION_COMMIT, tdb_name(tdb))
+	    == WOULD_HAVE_BLOCKED);
+	return 0;
+}
+
+static int traverse1w(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
 		     void *p)
 {
 	ok1(correct_key(key));
@@ -50,7 +73,7 @@ static int traverse1(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data,
 	    == WOULD_HAVE_BLOCKED);
 	tdb_traverse(tdb, traverse2, NULL);
 
-	/* That should *not* release the transaction lock! */
+	/* That should *not* release the all-records lock! */
 	ok1(external_agent_operation(agent, TRANSACTION_START, tdb_name(tdb))
 	    == WOULD_HAVE_BLOCKED);
 	return 0;
@@ -80,8 +103,8 @@ int main(int argc, char *argv[])
 	data.dsize = strlen("world");
 
 	ok1(tdb_store(tdb, key, data, TDB_INSERT) == 0);
-	tdb_traverse(tdb, traverse1, NULL);
-	tdb_traverse_read(tdb, traverse1, NULL);
+	tdb_traverse(tdb, traverse1w, NULL);
+	tdb_traverse_read(tdb, traverse1r, NULL);
 	tdb_close(tdb);
 
 	return exit_status();
-- 
2.11.0


From 0697cfccaba0b2578b650e48689bfb74dd3dba65 Mon Sep 17 00:00:00 2001
From: Garming Sam <garming at catalyst.net.nz>
Date: Thu, 30 Mar 2017 12:03:17 +1300
Subject: [PATCH 02/11] ldb_tdb: Ensure we correctly decrement
 ltdb->read_lock_count

If we do not do this, then we never take the all-record lock again after
the very first search, and instead lock every record as we go, which is
very slow during a large search.

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
Signed-off-by: Garming Sam <garming at catalyst.net.nz>
---
 lib/ldb/ldb_tdb/ldb_tdb.c | 1 +
 1 file changed, 1 insertion(+)
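
Reviewer note (not part of the patch): a small stand-alone model of the
ref-counting problem, as a sketch only.  The struct and the demo_* names
are hypothetical, and demo_lock_read() is an assumption about how the
lock side behaves (take the all-record lock only when the count is 0);
only the unlock side is shown in the diff below.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical stand-in for the relevant fields of struct ltdb_private. */
struct demo_ltdb {
	int in_transaction;
	int read_lock_count;
	int lockall_calls;   /* how often the all-record lock was taken */
	bool fixed;          /* true: apply the one-line fix from this patch */
};

/* Assumed lock side: only the first holder takes the all-record lock. */
static void demo_lock_read(struct demo_ltdb *l)
{
	if (l->in_transaction == 0 && l->read_lock_count == 0) {
		l->lockall_calls++;          /* models tdb_lockall_read() */
	}
	l->read_lock_count++;
}

/* Unlock side, mirroring the shape of ltdb_unlock_read() in the diff. */
static void demo_unlock_read(struct demo_ltdb *l)
{
	assert(l->read_lock_count > 0);
	if (l->in_transaction == 0 && l->read_lock_count == 1) {
		/* models tdb_unlockall_read() */
		if (l->fixed) {
			l->read_lock_count--;
		}
		return;
	}
	l->read_lock_count--;
}

/* Run n back-to-back searches; report how many took the all-record lock. */
static int demo_count_lockall(bool fixed, int n)
{
	struct demo_ltdb l = { .fixed = fixed };

	for (int i = 0; i < n; i++) {
		demo_lock_read(&l);
		demo_unlock_read(&l);
	}
	return l.lockall_calls;
}
```

In this model the unfixed variant leaves the count stuck at 1 after the
first search, so later searches never see a count of 0 and skip the
all-record lock.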

diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index 3dfd1ccf652..18fb67f0127 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -119,6 +119,7 @@ int ltdb_unlock_read(struct ldb_module *module)
 	struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
 	if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
 		tdb_unlockall_read(ltdb->tdb);
+		ltdb->read_lock_count--;
 		return 0;
 	}
 	ltdb->read_lock_count--;
-- 
2.11.0


From 59d1f91821d767d6fc2276424749711c6321ebc2 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Tue, 25 Apr 2017 22:33:53 +1200
Subject: [PATCH 03/11] ldb: Show that writes do not appear during an
 ldb_search()

A modify or rename during a search must not cause the search to change
its output, and attributes having an index should in particular not see
any change in behaviour in this respect.

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/tests/ldb_mod_op_test.c | 340 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 340 insertions(+)
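
Reviewer note (not part of the patch): the tests below synchronise with
a forked writer through a pipe.  The following is a reduced sketch of
just that handshake, with the ldb calls left out; demo_fork_and_handshake
is a hypothetical name, and unlike the test it closes the pipe ends.

```c
#include <assert.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Fork a child that signals "GO" once it is ready to write, then exits
 * with child_status.  Returns the child's exit status, or -1 on failure. */
static int demo_fork_and_handshake(int child_status)
{
	int pipes[2];
	char buf[2];
	int wstatus;
	ssize_t got;
	pid_t pid;

	assert(child_status >= 0 && child_status < 255);
	if (pipe(pipes) != 0) {
		return -1;
	}

	pid = fork();
	if (pid == -1) {
		close(pipes[0]);
		close(pipes[1]);
		return -1;
	}
	if (pid == 0) {
		close(pipes[0]);
		/* The real test starts its transaction before this point. */
		if (write(pipes[1], "GO", 2) != 2) {
			_exit(255);
		}
		/* The real test does its modify or rename and commits here. */
		_exit(child_status);
	}

	close(pipes[1]);
	/* Block until the child reports that it is about to write. */
	got = read(pipes[0], buf, 2);
	close(pipes[0]);

	if (waitpid(pid, &wstatus, 0) != pid || !WIFEXITED(wstatus)) {
		return -1;
	}
	if (got != 2) {
		return -1;
	}
	return WEXITSTATUS(wstatus);
}
```

The parent only learns that the child has reached its write step; as in
the test, whether the write then proceeds or blocks has to be inferred
from timing or from what a later search can see.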

diff --git a/lib/ldb/tests/ldb_mod_op_test.c b/lib/ldb/tests/ldb_mod_op_test.c
index 84043cbaec5..2b313446604 100644
--- a/lib/ldb/tests/ldb_mod_op_test.c
+++ b/lib/ldb/tests/ldb_mod_op_test.c
@@ -1405,6 +1405,334 @@ static void test_ldb_search_against_transaction(void **state)
 
 }
 
+/*
+ * This test is also complex.
+ * The purpose is to test if a modify can occur during an ldb_search().
+ * This would be a failure if, in processes
+ * (1) and (2):
+ *  - (1) ltdb_search() starts and calls back for one entry
+ *  - (2) one of the entries to be matched is modified
+ *  - (1) the indexed search tries to return the modified entry, but
+ *        it is no longer found, either: 
+ *          - despite it still matching (dn changed)
+ *          - it no longer matching (attrs changed)
+ *
+ * We also try un-indexed to show that the behaviour differs on this
+ * point, which it should not (an index should only impact search
+ * speed).
+ */
+
+struct modify_during_search_test_ctx {
+	struct ldbtest_ctx *test_ctx;
+	int res_count;
+	pid_t child_pid;
+	struct ldb_dn *basedn;
+	bool got_cn;
+	bool got_2_cn;
+	bool rename;
+};
+
+/*
+ * The purpose of this callback is to trigger a write in
+ * the child process while a search is in progress.
+ *
+ * In tdb 1.3.12 tdb_traverse_read() takes the read transaction lock,
+ * however in ldb 1.1.29 ltdb_search() forgets to take the all-record
+ * lock (except the very first time) due to a ref-counting bug.
+ *
+ * We assume that if the write will proceed, it will proceed in a 3
+ * second window after the function is called.
+ */
+
+static int test_ldb_modify_during_search_callback1(struct ldb_request *req,
+						   struct ldb_reply *ares)
+{
+	int ret;
+	int pipes[2];
+	char buf[2];
+	struct modify_during_search_test_ctx *ctx = req->context;
+	switch (ares->type) {
+	case LDB_REPLY_ENTRY:
+	{
+		const struct ldb_val *cn_val
+			= ldb_dn_get_component_val(ares->message->dn, 0);
+		const char *cn = (char *)cn_val->data;
+		ctx->res_count++;
+		if (strcmp(cn, "test_search_cn") == 0) {
+			ctx->got_cn = true;
+		} else if (strcmp(cn, "test_search_2_cn") == 0) {
+			ctx->got_2_cn = true;
+		}
+		if (ctx->res_count == 2) {
+			return LDB_SUCCESS;
+		}
+		break;
+	}
+	case LDB_REPLY_REFERRAL:
+		return LDB_SUCCESS;
+
+	case LDB_REPLY_DONE:
+		return ldb_request_done(req, LDB_SUCCESS);
+	}
+
+	ret = pipe(pipes);
+	assert_int_equal(ret, 0);
+
+	ctx->child_pid = fork();
+	if (ctx->child_pid == 0 && ctx->rename) {
+		TALLOC_CTX *tmp_ctx = NULL;
+		struct ldb_dn *dn, *new_dn;
+		TALLOC_FREE(ctx->test_ctx->ldb);
+		TALLOC_FREE(ctx->test_ctx->ev);
+		ctx->test_ctx->ev = tevent_context_init(ctx->test_ctx);
+		if (ctx->test_ctx->ev == NULL) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ctx->test_ctx->ldb = ldb_init(ctx->test_ctx,
+					      ctx->test_ctx->ev);
+		if (ctx->test_ctx->ldb == NULL) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ret = ldb_connect(ctx->test_ctx->ldb,
+				  ctx->test_ctx->dbpath, 0, NULL);
+		if (ret != LDB_SUCCESS) {
+			exit(ret);
+		}
+
+		tmp_ctx = talloc_new(ctx->test_ctx);
+		if (tmp_ctx == NULL) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		if (ctx->got_cn) {
+			/* Modify the other one */
+			dn = ldb_dn_new_fmt(tmp_ctx, ctx->test_ctx->ldb,
+					    "cn=test_search_2_cn,"
+					    "dc=search_test_entry");
+		} else {
+			dn = ldb_dn_new_fmt(tmp_ctx, ctx->test_ctx->ldb,
+					    "cn=test_search_cn,"
+					    "dc=search_test_entry");
+		}
+		if (dn == NULL) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		new_dn = ldb_dn_new_fmt(tmp_ctx, ctx->test_ctx->ldb,
+					"cn=test_search_cn_renamed,"
+					"dc=search_test_entry");
+		if (new_dn == NULL) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ret = ldb_transaction_start(ctx->test_ctx->ldb);
+		if (ret != 0) {
+			exit(ret);
+		}
+
+		if (write(pipes[1], "GO", 2) != 2) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ret = ldb_rename(ctx->test_ctx->ldb, dn, new_dn);
+		if (ret != 0) {
+			exit(ret);
+		}
+
+		ret = ldb_transaction_commit(ctx->test_ctx->ldb);
+		exit(ret);
+		
+	} else if (ctx->child_pid == 0) {
+		TALLOC_CTX *tmp_ctx = NULL;
+		struct ldb_message *msg;
+		struct ldb_message_element *el;
+		TALLOC_FREE(ctx->test_ctx->ldb);
+		TALLOC_FREE(ctx->test_ctx->ev);
+		ctx->test_ctx->ev = tevent_context_init(ctx->test_ctx);
+		if (ctx->test_ctx->ev == NULL) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ctx->test_ctx->ldb = ldb_init(ctx->test_ctx,
+					      ctx->test_ctx->ev);
+		if (ctx->test_ctx->ldb == NULL) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ret = ldb_connect(ctx->test_ctx->ldb,
+				  ctx->test_ctx->dbpath, 0, NULL);
+		if (ret != LDB_SUCCESS) {
+			exit(ret);
+		}
+
+		tmp_ctx = talloc_new(ctx->test_ctx);
+		if (tmp_ctx == NULL) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		msg = ldb_msg_new(tmp_ctx);
+		if (msg == NULL) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+		
+		if (ctx->got_cn) {
+			/* Modify the other one */
+			msg->dn = ldb_dn_new_fmt(msg, ctx->test_ctx->ldb,
+						 "cn=test_search_2_cn,"
+						 "dc=search_test_entry");
+		} else {
+			msg->dn = ldb_dn_new_fmt(msg, ctx->test_ctx->ldb,
+						 "cn=test_search_cn,"
+						 "dc=search_test_entry");
+		}
+		if (msg->dn == NULL) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ret = ldb_msg_add_string(msg, "filterAttr", "TRUE");
+		if (ret != 0) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+		el = ldb_msg_find_element(msg, "filterAttr");
+		if (el == NULL) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+		el->flags = LDB_FLAG_MOD_REPLACE;
+
+		ret = ldb_transaction_start(ctx->test_ctx->ldb);
+		if (ret != 0) {
+			exit(ret);
+		}
+
+		if (write(pipes[1], "GO", 2) != 2) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ret = ldb_modify(ctx->test_ctx->ldb, msg);
+		if (ret != 0) {
+			exit(ret);
+		}
+
+		ret = ldb_transaction_commit(ctx->test_ctx->ldb);
+		exit(ret);
+	}
+
+	ret = read(pipes[0], buf, 2);
+	assert_int_equal(ret, 2);
+
+	sleep(3);
+
+	return LDB_SUCCESS;
+}
+
+static void test_ldb_modify_during_search(void **state, bool add_index,
+					  bool rename)
+{
+	struct search_test_ctx *search_test_ctx = talloc_get_type_abort(*state,
+			struct search_test_ctx);
+	struct modify_during_search_test_ctx
+		ctx =
+		{ .res_count = 0,
+		  .test_ctx = search_test_ctx->ldb_test_ctx,
+		  .rename = rename
+		};
+
+	int ret;
+	struct ldb_request *req;
+	pid_t pid;
+	int wstatus;
+
+	if (add_index) {
+		struct ldb_message *msg;
+		struct ldb_dn *indexlist = ldb_dn_new(search_test_ctx,
+						      search_test_ctx->ldb_test_ctx->ldb,
+						      "@INDEXLIST");
+		assert_non_null(indexlist);
+
+		msg = ldb_msg_new(search_test_ctx);
+		assert_non_null(msg);
+
+		msg->dn = indexlist;
+
+		ret = ldb_msg_add_string(msg, "@IDXATTR", "cn");
+		assert_int_equal(ret, LDB_SUCCESS);
+
+		ret = ldb_add(search_test_ctx->ldb_test_ctx->ldb,
+			      msg);
+
+		assert_int_equal(ret, LDB_SUCCESS);
+	}
+
+	tevent_loop_allow_nesting(search_test_ctx->ldb_test_ctx->ev);
+
+	ctx.basedn
+		= ldb_dn_new_fmt(search_test_ctx,
+				 search_test_ctx->ldb_test_ctx->ldb,
+				 "%s",
+				 search_test_ctx->base_dn);
+	assert_non_null(ctx.basedn);
+
+
+	/*
+	 * This search must be over multiple items, and should include
+	 * the new name after a rename, to show that it would match
+	 * both before and after that modify 
+	 */
+	ret = ldb_build_search_req(&req,
+				   search_test_ctx->ldb_test_ctx->ldb,
+				   search_test_ctx,
+				   ctx.basedn,
+				   LDB_SCOPE_SUBTREE,
+				   "(&(!(filterAttr=*))"
+				   "(|(cn=test_search_cn_renamed)(cn=test_search_cn)(cn=test_search_2_cn)))",
+				   NULL,
+				   NULL,
+				   &ctx,
+				   test_ldb_modify_during_search_callback1,
+				   NULL);
+	assert_int_equal(ret, 0);
+	ret = ldb_request(search_test_ctx->ldb_test_ctx->ldb, req);
+
+	if (ret == LDB_SUCCESS) {
+		ret = ldb_wait(req->handle, LDB_WAIT_ALL);
+	}
+	assert_int_equal(ret, 0);
+	assert_int_equal(ctx.res_count, 2);
+	assert_int_equal(ctx.got_cn, true);
+	assert_int_equal(ctx.got_2_cn, true);
+
+	pid = waitpid(ctx.child_pid, &wstatus, 0);
+	assert_int_equal(pid, ctx.child_pid);
+
+	assert_true(WIFEXITED(wstatus));
+
+	assert_int_equal(WEXITSTATUS(wstatus), 0);
+
+
+}
+
+static void test_ldb_modify_during_indexed_search(void **state)
+{
+	return test_ldb_modify_during_search(state, true, false);
+}
+
+static void test_ldb_modify_during_unindexed_search(void **state)
+{
+	return test_ldb_modify_during_search(state, false, false);
+}
+
+static void test_ldb_rename_during_indexed_search(void **state)
+{
+	return test_ldb_modify_during_search(state, true, true);
+}
+
+static void test_ldb_rename_during_unindexed_search(void **state)
+{
+	return test_ldb_modify_during_search(state, false, true);
+}
+
 static int ldb_case_test_setup(void **state)
 {
 	int ret;
@@ -1759,6 +2087,18 @@ int main(int argc, const char **argv)
 		cmocka_unit_test_setup_teardown(test_ldb_search_against_transaction,
 						ldb_search_test_setup,
 						ldb_search_test_teardown),
+		cmocka_unit_test_setup_teardown(test_ldb_modify_during_unindexed_search,
+						ldb_search_test_setup,
+						ldb_search_test_teardown),
+		cmocka_unit_test_setup_teardown(test_ldb_modify_during_indexed_search,
+						ldb_search_test_setup,
+						ldb_search_test_teardown),
+		cmocka_unit_test_setup_teardown(test_ldb_rename_during_unindexed_search,
+						ldb_search_test_setup,
+						ldb_search_test_teardown),
+		cmocka_unit_test_setup_teardown(test_ldb_rename_during_indexed_search,
+						ldb_search_test_setup,
+						ldb_search_test_teardown),
 		cmocka_unit_test_setup_teardown(test_ldb_attrs_case_insensitive,
 						ldb_case_test_setup,
 						ldb_case_test_teardown),
-- 
2.11.0


From af94ad76c6be4982ede8befe960e9d4d89478efa Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Fri, 12 May 2017 01:35:13 +0200
Subject: [PATCH 04/11] dsdb: Allocate OIDs for DB read lock and unlock

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 source4/dsdb/samdb/samdb.h | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/source4/dsdb/samdb/samdb.h b/source4/dsdb/samdb/samdb.h
index 5dce37e2e9c..471aeeaf443 100644
--- a/source4/dsdb/samdb/samdb.h
+++ b/source4/dsdb/samdb/samdb.h
@@ -299,6 +299,13 @@ struct dsdb_extended_allocate_rid {
 };
 
 /*
+ * These are declared in ldb_module.h and take no data. This entry is
+ * included to avoid duplicate OID allocation.
+ */
+/* #define LDB_EXTENDED_READ_LOCK_DB "1.3.6.1.4.1.7165.4.4.10" */
+/* #define LDB_EXTENDED_READ_UNLOCK_DB "1.3.6.1.4.1.7165.4.4.11" */
+
+/*
  * passed from the descriptor module in order to
  * store the recalucated nTSecurityDescriptor without
  * modifying the replPropertyMetaData.
-- 
2.11.0


From 59077c205214e5f08151258ef88e887733c06f11 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Fri, 12 May 2017 01:38:14 +0200
Subject: [PATCH 05/11] ldb: Define OIDs LDB_EXTENDED_READ_LOCK_DB and
 LDB_EXTENDED_READ_UNLOCK_DB

These will be used to allow Samba to provide a consistent view of the DB
despite the use of multiple databases via the partitions module

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/include/ldb_module.h | 8 ++++++++
 1 file changed, 8 insertions(+)
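
Reviewer note (not part of the patch): a minimal sketch of how a module
might recognise these two OIDs.  The enum and demo_classify_oid() are
hypothetical; only the two OID strings come from this patch.  An exact
strcmp() matters because the strings share long prefixes with other OIDs
in the same arc.

```c
#include <assert.h>
#include <string.h>

#define LDB_EXTENDED_READ_LOCK_DB "1.3.6.1.4.1.7165.4.4.10"
#define LDB_EXTENDED_READ_UNLOCK_DB "1.3.6.1.4.1.7165.4.4.11"

/* Hypothetical classification of an extended-operation OID. */
enum demo_lock_op {
	DEMO_OTHER = 0,
	DEMO_READ_LOCK,
	DEMO_READ_UNLOCK
};

/* Map an OID string to the lock operation it requests, if any. */
static enum demo_lock_op demo_classify_oid(const char *oid)
{
	assert(oid != NULL);
	if (strcmp(oid, LDB_EXTENDED_READ_LOCK_DB) == 0) {
		return DEMO_READ_LOCK;
	}
	if (strcmp(oid, LDB_EXTENDED_READ_UNLOCK_DB) == 0) {
		return DEMO_READ_UNLOCK;
	}
	return DEMO_OTHER;
}
```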

diff --git a/lib/ldb/include/ldb_module.h b/lib/ldb/include/ldb_module.h
index 833d5a8f3d0..6b790dfcdc9 100644
--- a/lib/ldb/include/ldb_module.h
+++ b/lib/ldb/include/ldb_module.h
@@ -436,4 +436,12 @@ int ldb_unpack_data_only_attr_list_flags(struct ldb_context *ldb,
 #define LDB_UNPACK_DATA_FLAG_NO_DN           0x0002
 #define LDB_UNPACK_DATA_FLAG_NO_VALUES_ALLOC 0x0004
 
+/* 
+ * These extended operations allow a read lock to be taken during the
+ * overall search operation, not just for each DB read.  This ensures
+ * no modification while the search is being processed. 
+ */
+#define LDB_EXTENDED_READ_LOCK_DB "1.3.6.1.4.1.7165.4.4.10"
+#define LDB_EXTENDED_READ_UNLOCK_DB "1.3.6.1.4.1.7165.4.4.11"
+
 #endif
-- 
2.11.0


From d062b0d82fcbf44be4a7a9d33a325e4d0cc12818 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Wed, 24 May 2017 11:05:18 +1200
Subject: [PATCH 06/11] dsdb: Allow locking requests no matter who the caller
 is

The LDB_EXTENDED_READ_LOCK_DB and LDB_EXTENDED_READ_UNLOCK_DB operations are not
exposed over LDAP.

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 source4/dsdb/samdb/ldb_modules/acl.c | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/source4/dsdb/samdb/ldb_modules/acl.c b/source4/dsdb/samdb/ldb_modules/acl.c
index 27d4e7659cc..d710a65b8fb 100644
--- a/source4/dsdb/samdb/ldb_modules/acl.c
+++ b/source4/dsdb/samdb/ldb_modules/acl.c
@@ -1934,9 +1934,13 @@ static int acl_extended(struct ldb_module *module, struct ldb_request *req)
 	struct ldb_context *ldb = ldb_module_get_ctx(module);
 	struct ldb_control *as_system = ldb_request_get_control(req, LDB_CONTROL_AS_SYSTEM_OID);
 
-	/* allow everybody to read the sequence number */
-	if (strcmp(req->op.extended.oid,
-		   LDB_EXTENDED_SEQUENCE_NUMBER) == 0) {
+	/* allow everybody to read the sequence number, lock and unlock the DB */
+	if ((strcmp(req->op.extended.oid,
+		    LDB_EXTENDED_SEQUENCE_NUMBER) == 0) ||
+	    (strcmp(req->op.extended.oid,
+		    LDB_EXTENDED_READ_LOCK_DB) == 0) ||
+	    (strcmp(req->op.extended.oid,
+		    LDB_EXTENDED_READ_UNLOCK_DB) == 0)) {
 		return ldb_next_request(module, req);
 	}
 
-- 
2.11.0


From 46fa9fc62f5b104f2c152d42d80851422fe19ca6 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Mon, 22 May 2017 16:18:20 +1200
Subject: [PATCH 07/11] ldb: Add test encoding current locking behaviour during
 ldb_search()

Currently, a lock is not held against modifications once the final
record is returned via a callback, so modifications can be made
during the DONE callback.  This makes it hard to write modules
that interpret an ldb search result and do further processing,
so this will change in the future to allow the full search to be
atomic.

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/tests/ldb_mod_op_test.c | 204 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 204 insertions(+)

diff --git a/lib/ldb/tests/ldb_mod_op_test.c b/lib/ldb/tests/ldb_mod_op_test.c
index 2b313446604..55941d5a6f2 100644
--- a/lib/ldb/tests/ldb_mod_op_test.c
+++ b/lib/ldb/tests/ldb_mod_op_test.c
@@ -1733,6 +1733,207 @@ static void test_ldb_rename_during_unindexed_search(void **state)
 	return test_ldb_modify_during_search(state, false, true);
 }
 
+/*
+ * This test is also complex.
+ *
+ * The purpose is to test if a modify can occur during an ldb_search()
+ * before the end of the callback.
+ *
+ * This would be a failure if, in processes
+ * (1) and (2):
+ *  - (1) ldb_search() starts and calls back for a number of entries
+ *  - (2) an entry in the DB is allowed to change before the callback returns
+ *  - (1) the callback can see the modification
+ * 
+ */
+
+/*
+ * The purpose of this callback is to trigger a write in
+ * the child process while a search DONE callback is in progress.
+ *
+ * In ldb 1.1.29 ldb_search() omitted to take an all-record
+ * lock for the full duration of the search and callbacks.
+ *
+ * We assume that if the write will proceed, it will proceed in a 3
+ * second window after the function is called.
+ */
+
+static int test_ldb_modify_during_whole_search_callback1(struct ldb_request *req,
+							 struct ldb_reply *ares)
+{
+	int ret;
+	int pipes[2];
+	char buf[2];
+	struct modify_during_search_test_ctx *ctx = req->context;
+	struct ldb_dn *search_dn;
+	struct ldb_result *res2;
+	
+	switch (ares->type) {
+	case LDB_REPLY_ENTRY:
+	case LDB_REPLY_REFERRAL:
+		return LDB_SUCCESS;
+
+	case LDB_REPLY_DONE:
+		break;
+	}
+
+	ret = pipe(pipes);
+	assert_int_equal(ret, 0);
+
+	ctx->child_pid = fork();
+	if (ctx->child_pid == 0) {
+		TALLOC_CTX *tmp_ctx = NULL;
+		struct ldb_message *msg;
+		struct ldb_message_element *el;
+		TALLOC_FREE(ctx->test_ctx->ldb);
+		TALLOC_FREE(ctx->test_ctx->ev);
+		ctx->test_ctx->ev = tevent_context_init(ctx->test_ctx);
+		if (ctx->test_ctx->ev == NULL) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ctx->test_ctx->ldb = ldb_init(ctx->test_ctx,
+					      ctx->test_ctx->ev);
+		if (ctx->test_ctx->ldb == NULL) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ret = ldb_connect(ctx->test_ctx->ldb,
+				  ctx->test_ctx->dbpath, 0, NULL);
+		if (ret != LDB_SUCCESS) {
+			exit(ret);
+		}
+
+		tmp_ctx = talloc_new(ctx->test_ctx);
+		if (tmp_ctx == NULL) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		msg = ldb_msg_new(tmp_ctx);
+		if (msg == NULL) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+		
+		msg->dn = ldb_dn_new_fmt(msg, ctx->test_ctx->ldb,
+					 "cn=test_search_cn,"
+					 "dc=search_test_entry");
+		if (msg->dn == NULL) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ret = ldb_msg_add_string(msg, "filterAttr", "TRUE");
+		if (ret != 0) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+		el = ldb_msg_find_element(msg, "filterAttr");
+		if (el == NULL) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+		el->flags = LDB_FLAG_MOD_REPLACE;
+
+		ret = ldb_transaction_start(ctx->test_ctx->ldb);
+		if (ret != 0) {
+			exit(ret);
+		}
+
+		if (write(pipes[1], "GO", 2) != 2) {
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ret = ldb_modify(ctx->test_ctx->ldb, msg);
+		if (ret != 0) {
+			exit(ret);
+		}
+
+		ret = ldb_transaction_commit(ctx->test_ctx->ldb);
+		exit(ret);
+	}
+
+	ret = read(pipes[0], buf, 2);
+	assert_int_equal(ret, 2);
+
+	sleep(3);
+
+	/*
+	 * If writes are not blocked until after this function, we
+	 * will be able to successfully search for this modification
+	 * here 
+	 */
+
+	search_dn = ldb_dn_new_fmt(ares, ctx->test_ctx->ldb,
+				   "cn=test_search_cn,"
+				   "dc=search_test_entry");
+
+	ret = ldb_search(ctx->test_ctx->ldb, ares,
+			 &res2, search_dn, LDB_SCOPE_BASE, NULL,
+			 "filterAttr=TRUE");
+	assert_int_equal(ret, 0);
+
+	/* We got the result */
+	assert_int_equal(res2->count, 1);
+		
+	return ldb_request_done(req, LDB_SUCCESS);
+}
+
+static void test_ldb_modify_during_whole_search(void **state)
+{
+	struct search_test_ctx *search_test_ctx = talloc_get_type_abort(*state,
+			struct search_test_ctx);
+	struct modify_during_search_test_ctx
+		ctx =
+		{ 
+		  .test_ctx = search_test_ctx->ldb_test_ctx,
+		};
+
+	int ret;
+	struct ldb_request *req;
+	pid_t pid;
+	int wstatus;
+
+	tevent_loop_allow_nesting(search_test_ctx->ldb_test_ctx->ev);
+
+	ctx.basedn
+		= ldb_dn_new_fmt(search_test_ctx,
+				 search_test_ctx->ldb_test_ctx->ldb,
+				 "%s",
+				 search_test_ctx->base_dn);
+	assert_non_null(ctx.basedn);
+
+
+	/*
+	 * The search just needs to call DONE, we don't care about the
+	 * contents of the search for this test
+	 */
+	ret = ldb_build_search_req(&req,
+				   search_test_ctx->ldb_test_ctx->ldb,
+				   search_test_ctx,
+				   ctx.basedn,
+				   LDB_SCOPE_SUBTREE,
+				   "(&(!(filterAttr=*))"
+				   "(cn=test_search_cn))",
+				   NULL,
+				   NULL,
+				   &ctx,
+				   test_ldb_modify_during_whole_search_callback1,
+				   NULL);
+	assert_int_equal(ret, 0);
+	ret = ldb_request(search_test_ctx->ldb_test_ctx->ldb, req);
+
+	if (ret == LDB_SUCCESS) {
+		ret = ldb_wait(req->handle, LDB_WAIT_ALL);
+	}
+	assert_int_equal(ret, 0);
+
+	pid = waitpid(ctx.child_pid, &wstatus, 0);
+	assert_int_equal(pid, ctx.child_pid);
+
+	assert_true(WIFEXITED(wstatus));
+
+	assert_int_equal(WEXITSTATUS(wstatus), 0);
+
+
+}
+
 static int ldb_case_test_setup(void **state)
 {
 	int ret;
@@ -2099,6 +2300,9 @@ int main(int argc, const char **argv)
 		cmocka_unit_test_setup_teardown(test_ldb_rename_during_indexed_search,
 						ldb_search_test_setup,
 						ldb_search_test_teardown),
+		cmocka_unit_test_setup_teardown(test_ldb_modify_during_whole_search,
+						ldb_search_test_setup,
+						ldb_search_test_teardown),
 		cmocka_unit_test_setup_teardown(test_ldb_attrs_case_insensitive,
 						ldb_case_test_setup,
 						ldb_case_test_teardown),
-- 
2.11.0


From 67151ff03a3ce790f8458cda29514c53ce964f36 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Tue, 23 May 2017 15:11:59 +1200
Subject: [PATCH 08/11] dsdb: Teach the Samba partition module how to lock all
 the DB backends

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 source4/dsdb/samdb/ldb_modules/partition.c | 154 +++++++++++++++++++++++++++++
 source4/dsdb/samdb/ldb_modules/partition.h |   1 +
 2 files changed, 155 insertions(+)
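
Reviewer note (not part of the patch): the lock-everything-or-nothing
pattern used by partition_lock_backend() can be sketched in isolation as
below.  This is a simplified model with hypothetical demo_* names; it
leaves out the metadata partition and the ldb request plumbing, and only
shows locking in a fixed order with a reverse-order back-out on failure.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical backend: 'fail_lock' forces the lock attempt to fail. */
struct demo_backend {
	int locked;
	int fail_lock;
};

static int demo_backend_lock(struct demo_backend *b)
{
	if (b->fail_lock) {
		return -1;
	}
	b->locked++;
	return 0;
}

static void demo_backend_unlock(struct demo_backend *b)
{
	assert(b->locked > 0);
	b->locked--;
}

/* Lock every backend in array order.  On failure, release the ones
 * already locked, newest first, and return -1 so nothing stays held. */
static int demo_lock_all(struct demo_backend *backends, size_t n)
{
	for (size_t i = 0; i < n; i++) {
		if (demo_backend_lock(&backends[i]) == 0) {
			continue;
		}
		while (i-- > 0) {
			demo_backend_unlock(&backends[i]);
		}
		return -1;
	}
	return 0;
}
```

Taking the locks in one fixed order in every caller is what keeps two
processes from each holding part of the set while waiting on the rest.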

diff --git a/source4/dsdb/samdb/ldb_modules/partition.c b/source4/dsdb/samdb/ldb_modules/partition.c
index 08a959cabb5..923598fcfaa 100644
--- a/source4/dsdb/samdb/ldb_modules/partition.c
+++ b/source4/dsdb/samdb/ldb_modules/partition.c
@@ -1144,6 +1144,152 @@ static int partition_sequence_number(struct ldb_module *module, struct ldb_reque
 	return ldb_module_done(req, NULL, ext, LDB_SUCCESS);
 }
 
+/* lock or unlock all the backends */
+static int partition_lock_backend(struct ldb_module *module, struct ldb_request *req, bool lock)
+{
+	int i;
+	int ret;
+	int ret2;
+	struct ldb_context *ldb = ldb_module_get_ctx(module);
+	struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
+							      struct partition_private_data);
+	/* Look at base DN */
+	/* Figure out which partition it is under */
+	/* Skip the lot if 'data' isn't here yet (initialization) */
+	if (ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING) {
+		if (lock) {
+			ldb_debug(ldb, LDB_DEBUG_TRACE, "partition_lock_backend() -> (metadata partition)");
+		} else {
+			ldb_debug(ldb, LDB_DEBUG_TRACE, "partition_unlock_backend() -> (metadata partition)");
+		}
+	}
+
+	/* This order must match that in prepare_commit() */
+	ret = ldb_next_request(module, req);
+	if (ret != LDB_SUCCESS) {
+		if (lock) {
+			ldb_debug(ldb,
+				  LDB_DEBUG_FATAL,
+				  "Failed to lock db: %s / %s for metadata partition",
+				  ldb_errstring(ldb),
+				  ldb_strerror(ret));
+		} else {
+			ldb_debug(ldb,
+				  LDB_DEBUG_FATAL,
+				  "Failed to unlock db: %s / %s for metadata partition",
+				  ldb_errstring(ldb),
+				  ldb_strerror(ret));
+		}
+				
+		return ret;
+	}
+
+	ret = partition_reload_if_required(module, data, NULL);
+	if (ret != LDB_SUCCESS) {
+		if (lock == false) {
+			return ret;
+		}
+		
+		if (ret != LDB_SUCCESS) {
+			struct ldb_request *unlock_req = NULL;
+			ret2 = ldb_build_extended_req(&unlock_req, ldb, req,
+						      LDB_EXTENDED_READ_UNLOCK_DB,
+						      NULL, NULL,
+						      NULL, NULL,
+						      NULL);
+			LDB_REQ_SET_LOCATION(unlock_req);
+			if (ret2 != LDB_SUCCESS) {
+				/* Really, what else can we do? */
+				ldb_debug(ldb,
+					  LDB_DEBUG_FATAL,
+					  "Failed to allocate memory to unlock db");
+				return ret;
+			}
+			ret2 = ldb_next_request(module, unlock_req);
+			if (ret2 != LDB_SUCCESS) {
+				ldb_debug(ldb,
+					  LDB_DEBUG_FATAL,
+					  "Failed to unlock primary db: %s / %s",
+					  ldb_errstring(ldb),
+					  ldb_strerror(ret2));
+			}
+		}
+		return ret;
+	}
+
+	for (i=0; data && data->partitions && data->partitions[i]; i++) {
+		struct ldb_request *unlock_req = NULL;
+		if ((module && ldb_module_flags(ldb) & LDB_FLG_ENABLE_TRACING)) {
+			if (lock) {
+				ldb_debug(ldb, LDB_DEBUG_TRACE,
+					  "partition_lock_backend() -> %s",
+					  ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
+			} else {
+				ldb_debug(ldb, LDB_DEBUG_TRACE,
+					  "partition_unlock_backend() -> %s",
+					  ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
+			}
+		}
+		ret = partition_request(data->partitions[i]->module, req);
+		if (ret == LDB_SUCCESS) {
+			continue;
+		}
+			
+		if (lock == false) {
+			return ret;
+		}
+		
+		ldb_debug(ldb,
+			  LDB_DEBUG_FATAL,
+			  "Failed to lock db: %s / %s for %s",
+			  ldb_errstring(ldb),
+			  ldb_strerror(ret),
+			  ldb_dn_get_linearized(data->partitions[i]->ctrl->dn));
+				
+		ret2 = ldb_build_extended_req(&unlock_req, ldb, req,
+					      LDB_EXTENDED_READ_UNLOCK_DB, NULL, NULL,
+					      NULL, NULL,
+					      NULL);
+		LDB_REQ_SET_LOCATION(unlock_req);
+		if (ret2 != LDB_SUCCESS) {
+			/* Really, what else can we do? */
+			ldb_debug(ldb,
+				  LDB_DEBUG_FATAL,
+					  "Failed to allocate memory to unlock db");
+			return ret;
+		}
+		/* Back it out, if it fails on one */
+		for (i--; i >= 0; i--) {
+			ret2 = partition_request(data->partitions[i]->module, unlock_req);
+			if (ret2 != LDB_SUCCESS) {
+				ldb_debug(ldb,
+					  LDB_DEBUG_FATAL,
+					  "Failed to unlock db: %s / %s",
+					  ldb_errstring(ldb),
+					  ldb_strerror(ret2));
+			}
+		}
+		ret2 = partition_request(module, unlock_req);
+		if (ret2 != LDB_SUCCESS) {
+			ldb_debug(ldb,
+				  LDB_DEBUG_FATAL,
+				  "Failed to unlock db: %s / %s",
+				  ldb_errstring(ldb),
+				  ldb_strerror(ret2));
+		}
+		return ret;
+	}
+
+	if (lock) {
+		data->db_read_locked++;
+	} else {
+		data->db_read_locked--;
+	}
+
+	return LDB_SUCCESS;
+}
+
+
 /* extended */
 static int partition_extended(struct ldb_module *module, struct ldb_request *req)
 {
@@ -1157,6 +1303,14 @@ static int partition_extended(struct ldb_module *module, struct ldb_request *req
 		return ldb_next_request(module, req);
 	}
 
+	if (strcmp(req->op.extended.oid, LDB_EXTENDED_READ_LOCK_DB) == 0) {
+		return partition_lock_backend(module, req, true);
+	}
+
+	if (strcmp(req->op.extended.oid, LDB_EXTENDED_READ_UNLOCK_DB) == 0) {
+		return partition_lock_backend(module, req, false);
+	}
+
 	if (strcmp(req->op.extended.oid, DSDB_EXTENDED_SCHEMA_UPDATE_NOW_OID) == 0) {
 		/* Update the metadata.tdb to increment the schema version if needed*/
 		DEBUG(10, ("Incrementing the sequence_number after schema_update_now\n"));
diff --git a/source4/dsdb/samdb/ldb_modules/partition.h b/source4/dsdb/samdb/ldb_modules/partition.h
index ea05e9404d5..cd6cbdb700c 100644
--- a/source4/dsdb/samdb/ldb_modules/partition.h
+++ b/source4/dsdb/samdb/ldb_modules/partition.h
@@ -55,6 +55,7 @@ struct partition_private_data {
 
 	uint64_t metadata_seq;
 	uint32_t in_transaction;
+	uint32_t db_read_locked;
 
 	struct ldb_message *forced_module_msg;
 };
-- 
2.11.0


From 2bd060f804fac5bc78a2ddcea1d5619c56af476a Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Fri, 12 May 2017 01:39:08 +0200
Subject: [PATCH 09/11] ldb_tdb: Implement OIDs LDB_EXTENDED_READ_LOCK_DB and
 LDB_EXTENDED_READ_UNLOCK_DB

This allows Samba to provide a consistent view of the DB
despite the use of multiple databases via the partitions module
and over multiple callbacks via a module stack.

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_tdb.c | 29 ++++++++++++++++++++++++++++-
 1 file changed, 28 insertions(+), 1 deletion(-)

diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index 18fb67f0127..b326a3e3f18 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -1521,6 +1521,33 @@ static int ltdb_handle_request(struct ldb_module *module,
 	return LDB_SUCCESS;
 }
 
+/*
+ * lock and unlock must be handled without an event loop 
+ *
+ * This means we have to handle it here, before the ltdb_handle_request()
+ */
+static int ltdb_handle_extended_or_lock(struct ldb_module *module,
+					struct ldb_request *req)
+{
+	if (strcmp(req->op.extended.oid,
+		   LDB_EXTENDED_READ_LOCK_DB) == 0) {
+		/* Lock (or increment ref count) on the DB */
+		ldb_debug(ldb_module_get_ctx(module),
+			  LDB_DEBUG_FATAL,
+			  "lock db");
+		return ltdb_lock_read(module);
+	} else if (strcmp(req->op.extended.oid,
+			  LDB_EXTENDED_READ_UNLOCK_DB) == 0) {
+		/* Unlock (or decrement ref count) on the DB */
+		ldb_debug(ldb_module_get_ctx(module),
+			  LDB_DEBUG_FATAL,
+			  "UNlock db");
+		return ltdb_unlock_read(module);
+	} else {
+		return ltdb_handle_request(module, req);
+	}
+}
+
 static int ltdb_init_rootdse(struct ldb_module *module)
 {
 	/* ignore errors on this - we expect it for non-sam databases */
@@ -1538,7 +1565,7 @@ static const struct ldb_module_ops ltdb_ops = {
 	.modify            = ltdb_handle_request,
 	.del               = ltdb_handle_request,
 	.rename            = ltdb_handle_request,
-	.extended          = ltdb_handle_request,
+	.extended          = ltdb_handle_extended_or_lock,
 	.start_transaction = ltdb_start_trans,
 	.end_transaction   = ltdb_end_trans,
 	.prepare_commit    = ltdb_prepare_commit,
-- 
2.11.0


From 3f53acf3f839baf3b6c22c311c320998dd4e1c06 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Tue, 23 May 2017 15:14:28 +1200
Subject: [PATCH 10/11] ldb: Lock the whole backend database for the duration
 of a search

We must hold locks not just for the duration of each individual search request,
but for the whole top-level search, as our module stack may make multiple
search requests to build up the whole result.

This explains a number of replication and read corruption issues in Samba.

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/common/ldb.c            | 155 +++++++++++++++++++++++++++++++++++++++-
 lib/ldb/include/ldb_module.h    |   3 +
 lib/ldb/tests/ldb_mod_op_test.c |  40 ++++++++++-
 3 files changed, 194 insertions(+), 4 deletions(-)

diff --git a/lib/ldb/common/ldb.c b/lib/ldb/common/ldb.c
index 606725603a0..4b587179988 100644
--- a/lib/ldb/common/ldb.c
+++ b/lib/ldb/common/ldb.c
@@ -876,6 +876,87 @@ static int ldb_msg_check_element_flags(struct ldb_context *ldb,
 	return LDB_SUCCESS;
 }
 
+/*
+ * Because we choose to overload the extended operation API, we have
+ * to provide this callback, even though the locking calls never use it
+ */
+static int ldb_lock_noop_callback(struct ldb_request *req,
+				  struct ldb_reply *ares)
+{
+	return LDB_ERR_OPERATIONS_ERROR;
+}
+
+struct ldb_db_lock_context {
+	struct ldb_request *unlock_req;
+	struct ldb_request *req;
+	struct ldb_context *ldb;
+};
+
+/*
+ * We have to do the unlock in a destructor so that we unlock the
+ * DB if a caller calls talloc_free(req).  We trust that the ldb
+ * context has not already gone away.
+ */
+static int ldb_db_lock_destructor(struct ldb_db_lock_context *lock_context)
+{
+	int ret;
+	struct ldb_module *module;
+	FIRST_OP_NOERR(lock_context->ldb, extended);
+	if (module != NULL) {
+		ret = module->ops->extended(module, lock_context->unlock_req);
+	} else {
+		ret = LDB_SUCCESS;
+	}
+
+	if (ret != LDB_SUCCESS) {
+		ldb_debug(lock_context->ldb,
+			  LDB_DEBUG_FATAL,
+			  "Failed to unlock db: %s / %s",
+			  ldb_errstring(lock_context->ldb),
+			  ldb_strerror(ret));
+	}
+	return 0;
+}
+
+static int ldb_lock_backend_callback(struct ldb_request *req,
+				     struct ldb_reply *ares)
+{
+	struct ldb_db_lock_context *lock_context;
+	int ret;
+	
+	lock_context = talloc_get_type(req->context,
+				       struct ldb_db_lock_context);
+
+	if (ares != NULL && ares->error == LDB_SUCCESS) {
+		switch (ares->type) {
+		case LDB_REPLY_ENTRY:
+			return ldb_module_send_entry(lock_context->req,
+						     ares->message,
+						     ares->controls);
+			
+		case LDB_REPLY_REFERRAL:
+			return ldb_module_send_referral(lock_context->req,
+							ares->referral);
+		default:
+			break;
+		}
+	}
+
+	if (!ares) {
+		ret = ldb_module_done(lock_context->req, NULL, NULL,
+				       LDB_ERR_OPERATIONS_ERROR);
+	} else {
+		ret = ldb_module_done(lock_context->req, ares->controls,
+				       ares->response, ares->error);
+	}
+	/*
+	 * Error or LDB_REPLY_DONE: unlock the DB via the destructor
+	 */
+	talloc_free(lock_context);
+
+	return ret;
+}
+
 
 /*
   start an ldb request
@@ -901,15 +982,87 @@ int ldb_request(struct ldb_context *ldb, struct ldb_request *req)
 	/* call the first module in the chain */
 	switch (req->operation) {
 	case LDB_SEARCH:
+	{
+		struct ldb_request *down_req = NULL;
+		struct ldb_request *lock_req = NULL;
+		struct ldb_db_lock_context *lock_context;
 		/* due to "ldb_build_search_req" base DN always != NULL */
 		if (!ldb_dn_validate(req->op.search.base)) {
 			ldb_asprintf_errstring(ldb, "ldb_search: invalid basedn '%s'",
 					       ldb_dn_get_linearized(req->op.search.base));
 			return LDB_ERR_INVALID_DN_SYNTAX;
 		}
+
+		lock_context = talloc(req, struct ldb_db_lock_context);
+		if (lock_context == NULL) {
+			return ldb_oom(ldb);
+		}
+
+		lock_context->ldb = ldb;
+		lock_context->req = req;
+		
+		ret = ldb_build_search_req_ex(&down_req, ldb, req,
+					      req->op.search.base,
+					      req->op.search.scope,
+					      req->op.search.tree,
+					      req->op.search.attrs,
+					      req->controls,
+					      lock_context,
+					      ldb_lock_backend_callback,
+					      req);
+		if (ret != LDB_SUCCESS) {
+			return ret;
+		}
+		LDB_REQ_SET_LOCATION(down_req);
+
+		ret = ldb_build_extended_req(&lock_req, ldb, down_req,
+					     LDB_EXTENDED_READ_LOCK_DB, NULL, NULL,
+					     NULL, ldb_lock_noop_callback, 
+					     NULL);
+		if (ret != LDB_SUCCESS) {
+			return ret;
+		}
+		LDB_REQ_SET_LOCATION(lock_req);
+		
+		ret = ldb_build_extended_req(&lock_context->unlock_req, ldb, down_req,
+					     LDB_EXTENDED_READ_UNLOCK_DB, NULL, NULL,
+					     NULL, ldb_lock_noop_callback, 
+					     NULL);
+		if (ret != LDB_SUCCESS) {
+			return ret;
+		}
+		LDB_REQ_SET_LOCATION(lock_context->unlock_req);
+		
+		/* call DB lock */
+		FIRST_OP_NOERR(ldb, extended);
+		if (module != NULL) {
+			ret = module->ops->extended(module, lock_req);
+		} else {
+			ret = LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION;
+		}
+		
+		TALLOC_FREE(lock_req);
+		if (ret == LDB_ERR_UNSUPPORTED_CRITICAL_EXTENSION) {
+			/* We might be talking LDAP */
+			ldb_reset_err_string(ldb);
+			ret = LDB_SUCCESS;
+			TALLOC_FREE(lock_context->unlock_req);
+		} else if (ret == LDB_SUCCESS) {
+			/* Only arm the unlock once the lock is really held */
+			talloc_set_destructor(lock_context, ldb_db_lock_destructor);
+		} else if (ldb->err_string == NULL) {
+			/* if no error string was setup by the backend */
+			ldb_asprintf_errstring(ldb, "Failed to get DB lock: %s (%d)",
+					       ldb_strerror(ret), ret);
+		}
+
+		if (ret != LDB_SUCCESS) {
+			return ret;
+		}
 		FIRST_OP(ldb, search);
-		ret = module->ops->search(module, req);
+		ret = module->ops->search(module, down_req);
 		break;
+	}
 	case LDB_ADD:
 		if (!ldb_dn_validate(req->op.add.message->dn)) {
 			ldb_asprintf_errstring(ldb, "ldb_add: invalid dn '%s'",
diff --git a/lib/ldb/include/ldb_module.h b/lib/ldb/include/ldb_module.h
index 6b790dfcdc9..4c442e2d977 100644
--- a/lib/ldb/include/ldb_module.h
+++ b/lib/ldb/include/ldb_module.h
@@ -440,6 +440,9 @@ int ldb_unpack_data_only_attr_list_flags(struct ldb_context *ldb,
  * These extended operations allow a read lock to be taken during the
  * overall search operation, not just for each DB read.  This ensures
  * no modification while the search is being processed. 
+ *
+ * They must NOT be handled via an event loop: they must return
+ * or fail directly, and they do not use the callback or data
  */
 #define LDB_EXTENDED_READ_LOCK_DB "1.3.6.1.4.1.7165.4.4.10"
 #define LDB_EXTENDED_READ_UNLOCK_DB "1.3.6.1.4.1.7165.4.4.11"
diff --git a/lib/ldb/tests/ldb_mod_op_test.c b/lib/ldb/tests/ldb_mod_op_test.c
index 55941d5a6f2..e037c8ced7d 100644
--- a/lib/ldb/tests/ldb_mod_op_test.c
+++ b/lib/ldb/tests/ldb_mod_op_test.c
@@ -1330,6 +1330,12 @@ static int test_ldb_search_against_transaction_callback1(struct ldb_request *req
 				   ctx,
 				   test_ldb_search_against_transaction_callback2,
 				   NULL);
+	/* 
+	 * NOTE WELL: 
+	 * If these assertions fail, we will also fail to
+	 * clean up as we hold locks 
+	 */
+
 	assert_int_equal(ret, 0);
 	ret = ldb_request(ctx->test_ctx->ldb, req);
 
@@ -1867,10 +1873,16 @@ static int test_ldb_modify_during_whole_search_callback1(struct ldb_request *req
 	ret = ldb_search(ctx->test_ctx->ldb, ares,
 			 &res2, search_dn, LDB_SCOPE_BASE, NULL,
 			 "filterAttr=TRUE");
+
+	/* 
+	 * NOTE WELL: 
+	 * If these assertions fail, we will also fail to
+	 * clean up as we hold locks 
+	 */
 	assert_int_equal(ret, 0);
 
-	/* We got the result */
-	assert_int_equal(res2->count, 1);
+	/* We should not have got the result */
+	assert_int_equal(res2->count, 0);
 		
 	return ldb_request_done(req, LDB_SUCCESS);
 }
@@ -1889,7 +1901,9 @@ static void test_ldb_modify_during_whole_search(void **state)
 	struct ldb_request *req;
 	pid_t pid;
 	int wstatus;
-
+	struct ldb_dn *search_dn;
+	struct ldb_result *res2;
+	
 	tevent_loop_allow_nesting(search_test_ctx->ldb_test_ctx->ev);
 
 	ctx.basedn
@@ -1931,6 +1945,26 @@ static void test_ldb_modify_during_whole_search(void **state)
 
 	assert_int_equal(WEXITSTATUS(wstatus), 0);
 
+	/*
+	 * If writes are blocked until after the search function, we
+	 * will be able to successfully search for this modification
+	 * now
+	 */
+
+	search_dn = ldb_dn_new_fmt(search_test_ctx,
+				   search_test_ctx->ldb_test_ctx->ldb,
+				   "cn=test_search_cn,"
+				   "dc=search_test_entry");
+
+	ret = ldb_search(search_test_ctx->ldb_test_ctx->ldb,
+			 search_test_ctx,
+			 &res2, search_dn, LDB_SCOPE_BASE, NULL,
+			 "filterAttr=TRUE");
+	assert_int_equal(ret, 0);
+
+	/* We got the result */
+	assert_int_equal(res2->count, 1);
+		
 
 }
 
-- 
2.11.0


From 9903123e37ccae835d8df562ec300c998cf7e127 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Tue, 23 May 2017 17:06:01 +1200
Subject: [PATCH 11/11] HACK: remove lock logs

---
 lib/ldb/ldb_tdb/ldb_tdb.c | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index b326a3e3f18..6e45bd0e38f 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -1532,16 +1532,16 @@ static int ltdb_handle_extended_or_lock(struct ldb_module *module,
 	if (strcmp(req->op.extended.oid,
 		   LDB_EXTENDED_READ_LOCK_DB) == 0) {
 		/* Lock (or increment ref count) on the DB */
-		ldb_debug(ldb_module_get_ctx(module),
-			  LDB_DEBUG_FATAL,
-			  "lock db");
+//		ldb_debug(ldb_module_get_ctx(module),
+//			  LDB_DEBUG_FATAL,
+//			  "lock db");
 		return ltdb_lock_read(module);
 	} else if (strcmp(req->op.extended.oid,
 			  LDB_EXTENDED_READ_UNLOCK_DB) == 0) {
 		/* Unlock (or decrement ref count) on the DB */
-		ldb_debug(ldb_module_get_ctx(module),
-			  LDB_DEBUG_FATAL,
-			  "UNlock db");
+//		ldb_debug(ldb_module_get_ctx(module),
+//			  LDB_DEBUG_FATAL,
+//			  "UNlock db");
 		return ltdb_unlock_read(module);
 	} else {
 		return ltdb_handle_request(module, req);
-- 
2.11.0


