LMDB key value backend for ldb_tdb (to be renamed ldb_key_val)

Andrew Bartlett abartlet at samba.org
Mon Mar 5 05:31:33 UTC 2018


On Thu, 2018-02-22 at 11:45 +1300, Garming Sam via samba-technical
wrote:
> Hi,
> 
> This is our current set of patches for implementing an LMDB based
> backend for LDB. The work is based on a prototype I wrote around this
> time last year inspired by Jakub's efforts. In saying that, the approach
> I took was completely different. The idea was to refactor ldb_tdb to be
> agnostic about which database backend was being used. The advantage has
> been the minimal amount of code required to implement a functional
> 64-bit database backend. Many of the performance optimizations made for
> ldb_tdb can simply be reused, but conversely, for now we have deferred
> re-thinking the overall architecture e.g. consolidating the partitions
> into a single file using LMDB sub databases.

G'Day Garming,

Attached are a few of your patches that I've reviewed, to prepare the
way for this in Samba master.  Thanks for the cross-review in the
office today.
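
For anyone skimming the series, the refactoring idea (generic code that
only talks to a table of key-value operations) can be sketched roughly
as below. This is a minimal illustration, not the ldb_tdb code: the
names kv_ops, kv_store, kv_open_memory and record_replace, and the toy
in-memory backend, are all hypothetical and chosen only so the example
is self-contained.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

#define KV_SLOTS 8
#define KV_MAX   32

struct kv_store;

/* Hypothetical, cut-down operations table that each backend fills in. */
struct kv_ops {
	int (*store)(struct kv_store *db, const char *key, const char *val);
	const char *(*fetch)(struct kv_store *db, const char *key);
	int (*del)(struct kv_store *db, const char *key);
	const char *(*name)(struct kv_store *db);
};

struct kv_slot {
	int used;
	char key[KV_MAX];
	char val[KV_MAX];
};

struct kv_store {
	const struct kv_ops *ops;	/* backend selected at open time */
	struct kv_slot slots[KV_SLOTS];	/* toy in-memory storage */
};

static struct kv_slot *mem_find(struct kv_store *db, const char *key)
{
	size_t i;
	for (i = 0; i < KV_SLOTS; i++) {
		if (db->slots[i].used && strcmp(db->slots[i].key, key) == 0) {
			return &db->slots[i];
		}
	}
	return NULL;
}

/* Insert or overwrite; returns -1 if the value is too long or the store is full. */
static int mem_store(struct kv_store *db, const char *key, const char *val)
{
	struct kv_slot *slot = mem_find(db, key);
	size_t i;

	if (strlen(key) >= KV_MAX || strlen(val) >= KV_MAX) {
		return -1;
	}
	for (i = 0; slot == NULL && i < KV_SLOTS; i++) {
		if (!db->slots[i].used) {
			slot = &db->slots[i];
		}
	}
	if (slot == NULL) {
		return -1;
	}
	slot->used = 1;
	strcpy(slot->key, key);
	strcpy(slot->val, val);
	return 0;
}

static const char *mem_fetch(struct kv_store *db, const char *key)
{
	struct kv_slot *slot = mem_find(db, key);
	return slot ? slot->val : NULL;
}

static int mem_del(struct kv_store *db, const char *key)
{
	struct kv_slot *slot = mem_find(db, key);
	if (slot == NULL) {
		return -1;
	}
	slot->used = 0;
	return 0;
}

static const char *mem_name(struct kv_store *db)
{
	(void)db;
	return "memory";
}

static const struct kv_ops mem_ops = {
	.store = mem_store,
	.fetch = mem_fetch,
	.del   = mem_del,
	.name  = mem_name,
};

static void kv_open_memory(struct kv_store *db)
{
	assert(db != NULL);
	memset(db, 0, sizeof(*db));
	db->ops = &mem_ops;
}

/* Generic code: never names a backend, only goes through db->ops. */
static int record_replace(struct kv_store *db, const char *key, const char *val)
{
	return db->ops->store(db, key, val);
}
```

A second backend would supply its own table of functions and its own
open routine, and record_replace() would not need to change.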

> Currently this backend must have keys restricted to less than 511 bytes,
> which is fine for our new GUID indexing scheme, but can run into issues
> with our indexes. Gary is currently working on using truncated keys to
> bypass this limit.

I'm very glad to say that this has been addressed very well by Gary's
patches.  When the rest lands we will need to ensure that the two pieces
of work integrate (mostly a matter of running the new tests on both).

> This current set of patches passes autobuild while still running with
> the TDB backend. We have patches to pass the testsuite using the LMDB
> backend, but a few of them still need tidying up and have been omitted
> here for now.

These, and the actual LMDB backend, will be in the next round.

> Performance numbers at this point seem a bit tricky to obtain. Our
> existing perf testing infrastructure relies on the test environment,
> however, LMDB appears to run noticeably slower under cwrap due to some
> calls being intercepted. Basic testing indicates better baseline figures
> and better concurrency (reads no longer block writes, and writes
> effectively only block reads during commit - not prepare commit), but
> there needs to be a lot more testing to properly understand the
> performance characteristics.
> 
> Somewhat related, I am currently investigating running our tests without
> socket wrapper by using network namespaces. There are still areas where
> being able to run socket wrapper is definitely useful, but performance
> testing is definitely not one of them.
> 
> Noteworthy fixes required or bugs found:
> 
> - metadata.tdb needs to be committed last during a transaction
> (getncchanges tests were causing replication errors as reads occurred
> between the commit of metadata.tdb and the rest of the partitions).
> 
> - schema loading during a read lock needs a cached value (as writes can
> happen during reads, long running read-locked operations could read new
> metadata.tdb values).

These fixes are included in the reviewed set. 
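
To illustrate the second fix, here is a simplified sketch of pinning the
sequence number for the duration of a read lock. It is loosely modelled
on the schema_load change in patch 02, but the struct and function names
(seq_cache, cache_read_lock and so on) are hypothetical, the write
transaction counter is left out, and live_seq merely stands in for the
value stored in metadata.tdb.

```c
#include <assert.h>
#include <stdint.h>

struct seq_cache {
	uint64_t live_seq;	/* stand-in for the value in metadata.tdb */
	uint64_t read_lock_seq;	/* snapshot taken by the outermost read lock */
	unsigned int read_depth;	/* nesting depth of read locks */
};

/* Take a snapshot only when entering the outermost read lock. */
static void cache_read_lock(struct seq_cache *c)
{
	if (c->read_depth == 0) {
		c->read_lock_seq = c->live_seq;
	}
	c->read_depth++;
}

/* Drop the snapshot when the outermost read lock is released. */
static void cache_read_unlock(struct seq_cache *c)
{
	assert(c->read_depth > 0);
	c->read_depth--;
	if (c->read_depth == 0) {
		c->read_lock_seq = 0;
	}
}

/* Readers inside a read lock see the pinned value, others the live one. */
static uint64_t cache_get_seq(const struct seq_cache *c)
{
	if (c->read_depth > 0) {
		return c->read_lock_seq;
	}
	return c->live_seq;
}
```

With this shape, a writer bumping live_seq while a reader holds the lock
does not change what that reader sees until it unlocks.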

I would like to push the attached tomorrow, once I get confirmation
they pass a private autobuild. 

Thanks!

Andrew Bartlett
-- 
Andrew Bartlett                       http://samba.org/~abartlet/
Authentication Developer, Samba Team  http://samba.org
Samba Developer, Catalyst IT          http://catalyst.net.nz/services/samba
-------------- next part --------------
From 1c47c20f085e89c09aa0447b690dae104c789841 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Thu, 1 Feb 2018 17:16:13 +1300
Subject: [PATCH 01/19] partition: Use a transaction to write and a read lock
 to read the LDB_METADATA_SEQ_NUM

This is critical as otherwise we can read a sequence number in advance
of the data that it represents and so have a false cache.

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
Reviewed-by: Garming Sam <garming at catalyst.net.nz>
---
 source4/dsdb/samdb/ldb_modules/partition.c         | 12 ++--
 .../dsdb/samdb/ldb_modules/partition_metadata.c    | 65 ++++++++++++++++++----
 2 files changed, 60 insertions(+), 17 deletions(-)

diff --git a/source4/dsdb/samdb/ldb_modules/partition.c b/source4/dsdb/samdb/ldb_modules/partition.c
index 426fce36e4a..2cb05f9ef3a 100644
--- a/source4/dsdb/samdb/ldb_modules/partition.c
+++ b/source4/dsdb/samdb/ldb_modules/partition.c
@@ -861,7 +861,7 @@ static int partition_rename(struct ldb_module *module, struct ldb_request *req)
 }
 
 /* start a transaction */
-static int partition_start_trans(struct ldb_module *module)
+int partition_start_trans(struct ldb_module *module)
 {
 	int i;
 	int ret;
@@ -923,7 +923,7 @@ static int partition_start_trans(struct ldb_module *module)
 }
 
 /* prepare for a commit */
-static int partition_prepare_commit(struct ldb_module *module)
+int partition_prepare_commit(struct ldb_module *module)
 {
 	unsigned int i;
 	struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
@@ -960,7 +960,7 @@ static int partition_prepare_commit(struct ldb_module *module)
 
 
 /* end a transaction */
-static int partition_end_trans(struct ldb_module *module)
+int partition_end_trans(struct ldb_module *module)
 {
 	int ret, ret2;
 	unsigned int i;
@@ -1006,7 +1006,7 @@ static int partition_end_trans(struct ldb_module *module)
 }
 
 /* delete a transaction */
-static int partition_del_trans(struct ldb_module *module)
+int partition_del_trans(struct ldb_module *module)
 {
 	int ret, final_ret = LDB_SUCCESS;
 	unsigned int i;
@@ -1205,7 +1205,7 @@ static int partition_sequence_number(struct ldb_module *module, struct ldb_reque
 }
 
 /* lock all the backends */
-static int partition_read_lock(struct ldb_module *module)
+int partition_read_lock(struct ldb_module *module)
 {
 	int i;
 	int ret;
@@ -1309,7 +1309,7 @@ static int partition_read_lock(struct ldb_module *module)
 }
 
 /* unlock all the backends */
-static int partition_read_unlock(struct ldb_module *module)
+int partition_read_unlock(struct ldb_module *module)
 {
 	int i;
 	int ret = LDB_SUCCESS;
diff --git a/source4/dsdb/samdb/ldb_modules/partition_metadata.c b/source4/dsdb/samdb/ldb_modules/partition_metadata.c
index 3c5180b7ab7..e3ad0d8c6c2 100644
--- a/source4/dsdb/samdb/ldb_modules/partition_metadata.c
+++ b/source4/dsdb/samdb/ldb_modules/partition_metadata.c
@@ -300,7 +300,8 @@ int partition_metadata_init(struct ldb_module *module)
 
 	ret = partition_metadata_open(module, false);
 	if (ret == LDB_SUCCESS) {
-		goto end;
+		/* Great, we got the DB open */
+		return LDB_SUCCESS;
 	}
 
 	/* metadata.tdb does not exist, create it */
@@ -314,18 +315,42 @@ int partition_metadata_init(struct ldb_module *module)
 				       "Migrating partition metadata: "
 				       "create of metadata.tdb gave: %s\n",
 				       ldb_errstring(ldb_module_get_ctx(module)));
-		talloc_free(data->metadata);
-		data->metadata = NULL;
-		goto end;
+		TALLOC_FREE(data->metadata);
+		return ret;
+	}
+
+	/*
+	 * We need to fill in the sequence number from the DB, so we
+	 * need to get a lock over all the databases.  We only read
+	 * from the main partitions, but write to metadata so to avoid
+	 * lock ordering we just get a transaction over the lot.
+	 */
+	ret = partition_start_trans(module);
+	if (ret != LDB_SUCCESS) {
+		TALLOC_FREE(data->metadata);
+		return ret;
 	}
 
 	ret = partition_metadata_set_sequence_number(module);
 	if (ret != LDB_SUCCESS) {
-		talloc_free(data->metadata);
-		data->metadata = NULL;
+		TALLOC_FREE(data->metadata);
+		partition_del_trans(module);
+		return ret;
+	}
+
+	ret = partition_prepare_commit(module);
+	if (ret != LDB_SUCCESS) {
+		TALLOC_FREE(data->metadata);
+		partition_del_trans(module);
+		return ret;
+	}
+
+	ret = partition_end_trans(module);
+	if (ret != LDB_SUCCESS) {
+		/* Nothing much we can do */
+		TALLOC_FREE(data->metadata);
 	}
 
-end:
 	return ret;
 }
 
@@ -335,10 +360,28 @@ end:
  */
 int partition_metadata_sequence_number(struct ldb_module *module, uint64_t *value)
 {
-	return partition_metadata_get_uint64(module,
-					     LDB_METADATA_SEQ_NUM,
-					     value,
-					     0);
+
+	/* We have to lock all the databases as otherwise we can
+	 * return a sequence number that is higher than the DB values
+	 * that we can see, as those transactions close after the
+	 * metadata.tdb transaction closes */
+	int ret = partition_read_lock(module);
+	if (ret != LDB_SUCCESS) {
+		return ret;
+	}
+
+	ret = partition_metadata_get_uint64(module,
+					    LDB_METADATA_SEQ_NUM,
+					    value,
+					    0);
+	if (ret == LDB_SUCCESS) {
+		ret = partition_read_unlock(module);
+	} else {
+		/* Don't overwrite the error code */
+		partition_read_unlock(module);
+	}
+	return ret;
+
 }
 
 
-- 
2.14.3


From 8cb307dd068c8122f891cfda5905aa0af2e31ddc Mon Sep 17 00:00:00 2001
From: Garming Sam <garming at catalyst.net.nz>
Date: Fri, 2 Feb 2018 12:05:27 +1300
Subject: [PATCH 02/19] schema: Do not read different schema sequence values
 during a read transaction

During a read lock, we find ourselves seeing an unchanged schema, but
reading any updates to the metadata.tdb (in the case of lmdb, where
reads do not block writes).

The alternative is to read-lock the entire metadata.tdb. However, caching
the value allows more concurrency, because reads then do not block writes.

Signed-off-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 source4/dsdb/samdb/ldb_modules/schema_load.c | 86 ++++++++++++++++++++++++++--
 1 file changed, 80 insertions(+), 6 deletions(-)

diff --git a/source4/dsdb/samdb/ldb_modules/schema_load.c b/source4/dsdb/samdb/ldb_modules/schema_load.c
index 0bbd5fc60f0..f1a01d73f5f 100644
--- a/source4/dsdb/samdb/ldb_modules/schema_load.c
+++ b/source4/dsdb/samdb/ldb_modules/schema_load.c
@@ -36,8 +36,10 @@
 #include "system/filesys.h"
 struct schema_load_private_data {
 	struct ldb_module *module;
-	bool in_transaction;
+	uint64_t in_transaction;
+	uint64_t in_read_transaction;
 	struct tdb_wrap *metadata;
+	uint64_t schema_seq_num_read_lock;
 	uint64_t schema_seq_num_cache;
 	int tdb_seqnum;
 };
@@ -193,7 +195,7 @@ static struct dsdb_schema *dsdb_schema_refresh(struct ldb_module *module, struct
 		return schema;
 	}
 
-	if (private_data->in_transaction) {
+	if (private_data->in_transaction > 0) {
 
 		/*
 		 * If the refresh is not an expected part of a larger
@@ -221,7 +223,19 @@ static struct dsdb_schema *dsdb_schema_refresh(struct ldb_module *module, struct
 	 * continue to hit the database to get the highest USN.
 	 */
 
-	ret = schema_metadata_get_uint64(private_data, DSDB_METADATA_SCHEMA_SEQ_NUM, &schema_seq_num, 0);
+	if (private_data->in_read_transaction > 0) {
+		/*
+		 * We must give a static value of the metadata sequence number
+		 * during a read lock, otherwise, we will fail to load the
+		 * schema at runtime.
+		 */
+		schema_seq_num = private_data->schema_seq_num_read_lock;
+		ret = LDB_SUCCESS;
+	} else {
+		ret = schema_metadata_get_uint64(private_data,
+						 DSDB_METADATA_SCHEMA_SEQ_NUM,
+						 &schema_seq_num, 0);
+	}
 
 	if (schema != NULL) {
 		if (ret == LDB_SUCCESS) {
@@ -564,7 +578,7 @@ static int schema_load_start_transaction(struct ldb_module *module)
 			      "schema_load_init: dsdb_get_schema failed");
 		return LDB_ERR_OPERATIONS_ERROR;
 	}
-	private_data->in_transaction = true;
+	private_data->in_transaction++;
 
 	return ldb_next_start_trans(module);
 }
@@ -573,8 +587,14 @@ static int schema_load_end_transaction(struct ldb_module *module)
 {
 	struct schema_load_private_data *private_data =
 		talloc_get_type(ldb_module_get_private(module), struct schema_load_private_data);
+	struct ldb_context *ldb = ldb_module_get_ctx(module);
 
-	private_data->in_transaction = false;
+	if (private_data->in_transaction == 0) {
+		ldb_debug_set(ldb, LDB_DEBUG_FATAL,
+			      "schema_load_end_transaction: transaction mismatch");
+		return LDB_ERR_OPERATIONS_ERROR;
+	}
+	private_data->in_transaction--;
 
 	return ldb_next_end_trans(module);
 }
@@ -583,8 +603,14 @@ static int schema_load_del_transaction(struct ldb_module *module)
 {
 	struct schema_load_private_data *private_data =
 		talloc_get_type(ldb_module_get_private(module), struct schema_load_private_data);
+	struct ldb_context *ldb = ldb_module_get_ctx(module);
 
-	private_data->in_transaction = false;
+	if (private_data->in_transaction == 0) {
+		ldb_debug_set(ldb, LDB_DEBUG_FATAL,
+			      "schema_load_del_transaction: transaction mismatch");
+		return LDB_ERR_OPERATIONS_ERROR;
+	}
+	private_data->in_transaction--;
 
 	return ldb_next_del_trans(module);
 }
@@ -618,6 +644,52 @@ static int schema_load_extended(struct ldb_module *module, struct ldb_request *r
 	return ldb_next_request(module, req);
 }
 
+static int schema_read_lock(struct ldb_module *module)
+{
+	struct schema_load_private_data *private_data =
+		talloc_get_type(ldb_module_get_private(module), struct schema_load_private_data);
+	uint64_t schema_seq_num = 0;
+
+	if (private_data == NULL) {
+		return ldb_next_read_lock(module);
+	}
+
+	if (private_data->in_transaction == 0 &&
+	    private_data->in_read_transaction == 0) {
+		/*
+		 * This appears to fail during the init path, so do not bother
+		 * checking the return, and return 0 (reload schema).
+		 */
+		schema_metadata_get_uint64(private_data,
+					   DSDB_METADATA_SCHEMA_SEQ_NUM,
+					   &schema_seq_num, 0);
+
+		private_data->schema_seq_num_read_lock = schema_seq_num;
+	}
+	private_data->in_read_transaction++;
+
+	return ldb_next_read_lock(module);
+
+}
+
+static int schema_read_unlock(struct ldb_module *module)
+{
+	struct schema_load_private_data *private_data =
+		talloc_get_type(ldb_module_get_private(module), struct schema_load_private_data);
+
+	if (private_data == NULL) {
+		return ldb_next_read_unlock(module);
+	}
+
+	if (private_data->in_transaction == 0 &&
+	    private_data->in_read_transaction == 1) {
+		private_data->schema_seq_num_read_lock = 0;
+	}
+	private_data->in_read_transaction--;
+
+	return ldb_next_read_unlock(module);
+}
+
 
 static const struct ldb_module_ops ldb_schema_load_module_ops = {
 	.name		= "schema_load",
@@ -627,6 +699,8 @@ static const struct ldb_module_ops ldb_schema_load_module_ops = {
 	.start_transaction = schema_load_start_transaction,
 	.end_transaction   = schema_load_end_transaction,
 	.del_transaction   = schema_load_del_transaction,
+	.read_lock	= schema_read_lock,
+	.read_unlock	= schema_read_unlock,
 };
 
 int ldb_schema_load_module_init(const char *version)
-- 
2.14.3


From 91691101e204271bac51f32b21428f6b6de79e2d Mon Sep 17 00:00:00 2001
From: Garming Sam <garming at catalyst.net.nz>
Date: Wed, 7 Feb 2018 23:21:45 +1300
Subject: [PATCH 03/19] partition: Leave metadata.tdb unlocking until last

With the lmdb patches, I have cleanly observed the database being read
in between the commit of the metadata.tdb and the eventual commits of
the individual partitions.

Signed-off-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 source4/dsdb/samdb/ldb_modules/partition.c | 20 ++++++++++++--------
 1 file changed, 12 insertions(+), 8 deletions(-)

diff --git a/source4/dsdb/samdb/ldb_modules/partition.c b/source4/dsdb/samdb/ldb_modules/partition.c
index 2cb05f9ef3a..422ed369ff5 100644
--- a/source4/dsdb/samdb/ldb_modules/partition.c
+++ b/source4/dsdb/samdb/ldb_modules/partition.c
@@ -976,10 +976,6 @@ int partition_end_trans(struct ldb_module *module)
 		data->in_transaction--;
 	}
 
-	ret2 = partition_metadata_end_trans(module);
-	if (ret2 != LDB_SUCCESS) {
-		ret = ret2;
-	}
 
 	for (i=0; data && data->partitions && data->partitions[i]; i++) {
 		if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
@@ -1002,6 +998,12 @@ int partition_end_trans(struct ldb_module *module)
 	if (ret2 != LDB_SUCCESS) {
 		ret = ret2;
 	}
+
+	ret2 = partition_metadata_end_trans(module);
+	if (ret2 != LDB_SUCCESS) {
+		ret = ret2;
+	}
+
 	return ret;
 }
 
@@ -1012,10 +1014,6 @@ int partition_del_trans(struct ldb_module *module)
 	unsigned int i;
 	struct partition_private_data *data = talloc_get_type(ldb_module_get_private(module),
 							      struct partition_private_data);
-	ret = partition_metadata_del_trans(module);
-	if (ret != LDB_SUCCESS) {
-		final_ret = ret;
-	}
 
 	for (i=0; data && data->partitions && data->partitions[i]; i++) {
 		if ((module && ldb_module_flags(ldb_module_get_ctx(module)) & LDB_FLG_ENABLE_TRACING)) {
@@ -1044,6 +1042,12 @@ int partition_del_trans(struct ldb_module *module)
 	if (ret != LDB_SUCCESS) {
 		final_ret = ret;
 	}
+
+	ret = partition_metadata_del_trans(module);
+	if (ret != LDB_SUCCESS) {
+		final_ret = ret;
+	}
+
 	return final_ret;
 }
 
-- 
2.14.3


From 16e19cf6aa432865d601a23a5b0d83a9bfea5529 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Tue, 23 Jan 2018 11:02:28 +1300
Subject: [PATCH 04/19] ldb_mod_op_test: Fix core dump on
 ldb_case_attrs_index_test_teardown

With no schema syntax, this would occasionally crash as it dereferenced
a possibly NULL pointer.

Note: removing all tests except this one made it crash reliably.

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
Reviewed-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/tests/ldb_mod_op_test.c | 8 ++++++++
 1 file changed, 8 insertions(+)

diff --git a/lib/ldb/tests/ldb_mod_op_test.c b/lib/ldb/tests/ldb_mod_op_test.c
index 766ca798335..5878143d8f6 100644
--- a/lib/ldb/tests/ldb_mod_op_test.c
+++ b/lib/ldb/tests/ldb_mod_op_test.c
@@ -2587,6 +2587,14 @@ static void test_ldb_attrs_index_handler(void **state)
 						    syntax, &cn_attr_2);
 	assert_int_equal(ret, LDB_SUCCESS);
 
+	syntax = ldb_standard_syntax_by_name(ldb, LDB_SYNTAX_OCTET_STRING);
+	assert_non_null(syntax);
+
+	ret = ldb_schema_attribute_fill_with_syntax(ldb, ldb,
+						    "", 0,
+						    syntax, &default_attr);
+	assert_int_equal(ret, LDB_SUCCESS);
+
 	/*
 	 * Set an attribute handler
 	 */
-- 
2.14.3


From 494d1cbea0fe87308a5839013cc6cc2745f83eec Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Fri, 19 Jan 2018 09:16:04 +1300
Subject: [PATCH 05/19] remove_dc.py: Abort transaction before throwing an
 exception

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 python/samba/remove_dc.py | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/python/samba/remove_dc.py b/python/samba/remove_dc.py
index da3b628ae9f..6b86a554efa 100644
--- a/python/samba/remove_dc.py
+++ b/python/samba/remove_dc.py
@@ -394,6 +394,7 @@ def remove_dc(samdb, logger, dc_name):
                                   % (dc_name, samdb.domain_dns_name()), estr)
 
         if (len(server_msgs) == 0):
+            samdb.transaction_cancel()
             raise DemoteException("%s is not an AD DC in %s"
                                   % (dc_name, samdb.domain_dns_name()))
         server_dn = server_msgs[0].dn
@@ -412,6 +413,7 @@ def remove_dc(samdb, logger, dc_name):
             ntds_msgs = []
             pass
         else:
+            samdb.transaction_cancel()
             raise DemoteException("Failure checking if %s is an NTDS DSA in %s: "
                                   % (ntds_dn, samdb.domain_dns_name()), estr)
 
@@ -419,6 +421,7 @@ def remove_dc(samdb, logger, dc_name):
     # object, just remove the server object located above
     if (len(ntds_msgs) == 0):
         if server_dn is None:
+            samdb.transaction_cancel()
             raise DemoteException("%s is not an AD DC in %s"
                                   % (dc_name, samdb.domain_dns_name()))
 
-- 
2.14.3


From 1d9e24c693afc90a6c55c0bc3311b77e804a5794 Mon Sep 17 00:00:00 2001
From: Garming Sam <garming at catalyst.net.nz>
Date: Wed, 22 Nov 2017 12:37:07 +1300
Subject: [PATCH 06/19] schema_set: Add a missing newline between functions

Signed-off-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 source4/dsdb/schema/schema_set.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/source4/dsdb/schema/schema_set.c b/source4/dsdb/schema/schema_set.c
index 226e31e4019..8674757f3b0 100644
--- a/source4/dsdb/schema/schema_set.c
+++ b/source4/dsdb/schema/schema_set.c
@@ -50,6 +50,7 @@ const struct ldb_schema_attribute *dsdb_attribute_handler_override(struct ldb_co
 	}
 	return a->ldb_schema_attribute;
 }
+
 /*
  * Set the attribute handlers onto the LDB, and potentially write the
  * @INDEXLIST, @IDXONE and @ATTRIBUTES records.  The @ATTRIBUTES records
-- 
2.14.3


From 8985b8bdc8e63b03c3372e172f396ded3bd7dec9 Mon Sep 17 00:00:00 2001
From: Bob Campbell <bobcampbell at catalyst.net.nz>
Date: Tue, 11 Jul 2017 16:40:14 +1200
Subject: [PATCH 07/19] samdb/schema_load: do schema loading with one search

It appears that there was a race condition between searching for the
attribute & class definitions, and searching for the schema object, if
the schema was changed in-between the two searches.

This is likely the cause of the ldap_schema test flapping.

BUG: https://bugzilla.samba.org/show_bug.cgi?id=12889

Signed-off-by: Bob Campbell <bobcampbell at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
Reviewed-by: Garming Sam <garming at catalyst.net.nz>
---
 source4/dsdb/samdb/ldb_modules/schema_load.c | 62 ++++++++++++++--------------
 source4/dsdb/schema/schema_init.c            | 24 ++++++++---
 2 files changed, 51 insertions(+), 35 deletions(-)

diff --git a/source4/dsdb/samdb/ldb_modules/schema_load.c b/source4/dsdb/samdb/ldb_modules/schema_load.c
index f1a01d73f5f..2d712e2c6df 100644
--- a/source4/dsdb/samdb/ldb_modules/schema_load.c
+++ b/source4/dsdb/samdb/ldb_modules/schema_load.c
@@ -296,20 +296,17 @@ static int dsdb_schema_from_db(struct ldb_module *module,
 	struct ldb_context *ldb = ldb_module_get_ctx(module);
 	TALLOC_CTX *tmp_ctx;
 	char *error_string;
-	int ret;
+	int ret, i;
 	struct ldb_dn *schema_dn = ldb_get_schema_basedn(ldb);
-	struct ldb_result *schema_res;
 	struct ldb_result *res;
-	static const char *schema_head_attrs[] = {
-		"prefixMap",
-		"schemaInfo",
-		"fSMORoleOwner",
-		NULL
-	};
+	struct ldb_message *schema_msg = NULL;
 	static const char *schema_attrs[] = {
 		DSDB_SCHEMA_COMMON_ATTRS,
 		DSDB_SCHEMA_ATTR_ATTRS,
 		DSDB_SCHEMA_CLASS_ATTRS,
+		"prefixMap",
+		"schemaInfo",
+		"fSMORoleOwner",
 		NULL
 	};
 	unsigned flags;
@@ -324,33 +321,20 @@ static int dsdb_schema_from_db(struct ldb_module *module,
 	ldb_set_flags(ldb, flags & ~LDB_FLG_ENABLE_TRACING);
 
 	/*
-	 * setup the prefix mappings and schema info
-	 */
-	ret = dsdb_module_search_dn(module, tmp_ctx, &schema_res,
-				    schema_dn, schema_head_attrs,
-				    DSDB_FLAG_NEXT_MODULE, NULL);
-	if (ret == LDB_ERR_NO_SUCH_OBJECT) {
-		ldb_reset_err_string(ldb);
-		ldb_debug(ldb, LDB_DEBUG_WARNING,
-			  "schema_load_init: no schema head present: (skip schema loading)\n");
-		goto failed;
-	} else if (ret != LDB_SUCCESS) {
-		ldb_asprintf_errstring(ldb, 
-				       "dsdb_schema: failed to search the schema head: %s",
-				       ldb_errstring(ldb));
-		goto failed;
-	}
-
-	/*
-	 * load the attribute definitions.
+	 * Load the attribute and class definitions, as well as
+	 * the schema object. We do this in one search and then
+	 * split it so that there isn't a race condition when
+	 * the schema is changed between two searches.
 	 */
 	ret = dsdb_module_search(module, tmp_ctx, &res,
-				 schema_dn, LDB_SCOPE_ONELEVEL,
+				 schema_dn, LDB_SCOPE_SUBTREE,
 				 schema_attrs,
 				 DSDB_FLAG_NEXT_MODULE |
 				 DSDB_SEARCH_SHOW_DN_IN_STORAGE_FORMAT,
 				 NULL,
-				 "(|(objectClass=attributeSchema)(objectClass=classSchema))");
+				 "(|(objectClass=attributeSchema)"
+				 "(objectClass=classSchema)"
+				 "(objectClass=dMD))");
 	if (ret != LDB_SUCCESS) {
 		ldb_asprintf_errstring(ldb, 
 				       "dsdb_schema: failed to search attributeSchema and classSchema objects: %s",
@@ -358,8 +342,26 @@ static int dsdb_schema_from_db(struct ldb_module *module,
 		goto failed;
 	}
 
+	/*
+	 * Separate the schema object from the attribute and
+	 * class objects.
+	 */
+	for (i = 0; i < res->count; i++) {
+		if (ldb_msg_find_element(res->msgs[i], "prefixMap")) {
+			schema_msg = res->msgs[i];
+			break;
+		}
+	}
+
+	if (schema_msg == NULL) {
+		ldb_asprintf_errstring(ldb,
+				       "dsdb_schema load failed: failed to find prefixMap");
+		ret = LDB_ERR_NO_SUCH_ATTRIBUTE;
+		goto failed;
+	}
+
 	ret = dsdb_schema_from_ldb_results(tmp_ctx, ldb,
-					   schema_res, res, schema, &error_string);
+					   schema_msg, res, schema, &error_string);
 	if (ret != LDB_SUCCESS) {
 		ldb_asprintf_errstring(ldb, 
 				       "dsdb_schema load failed: %s",
diff --git a/source4/dsdb/schema/schema_init.c b/source4/dsdb/schema/schema_init.c
index dbd504549d7..9314ce731b8 100644
--- a/source4/dsdb/schema/schema_init.c
+++ b/source4/dsdb/schema/schema_init.c
@@ -904,10 +904,24 @@ int dsdb_load_ldb_results_into_schema(TALLOC_CTX *mem_ctx, struct ldb_context *l
 				      char **error_string)
 {
 	unsigned int i;
+	WERROR status;
 
 	schema->ts_last_change = 0;
 	for (i=0; i < attrs_class_res->count; i++) {
-		WERROR status = dsdb_schema_set_el_from_ldb_msg(ldb, schema, attrs_class_res->msgs[i]);
+		const char *prefixMap = NULL;
+		/*
+		 * attrs_class_res also includes the schema object;
+		 * we only want to process classes & attributes
+		 */
+		prefixMap = ldb_msg_find_attr_as_string(
+				attrs_class_res->msgs[i],
+				"prefixMap", NULL);
+		if (prefixMap != NULL) {
+			continue;
+		}
+
+		status = dsdb_schema_set_el_from_ldb_msg(ldb, schema,
+							 attrs_class_res->msgs[i]);
 		if (!W_ERROR_IS_OK(status)) {
 			*error_string = talloc_asprintf(mem_ctx,
 				      "dsdb_load_ldb_results_into_schema: failed to load attribute or class definition: %s:%s",
@@ -928,7 +942,7 @@ int dsdb_load_ldb_results_into_schema(TALLOC_CTX *mem_ctx, struct ldb_context *l
 */
 
 int dsdb_schema_from_ldb_results(TALLOC_CTX *mem_ctx, struct ldb_context *ldb,
-				 struct ldb_result *schema_res,
+				 struct ldb_message *schema_msg,
 				 struct ldb_result *attrs_class_res,
 				 struct dsdb_schema **schema_out,
 				 char **error_string)
@@ -961,7 +975,7 @@ int dsdb_schema_from_ldb_results(TALLOC_CTX *mem_ctx, struct ldb_context *ldb,
 							      false);
 	}
 
-	prefix_val = ldb_msg_find_ldb_val(schema_res->msgs[0], "prefixMap");
+	prefix_val = ldb_msg_find_ldb_val(schema_msg, "prefixMap");
 	if (!prefix_val) {
 		*error_string = talloc_asprintf(mem_ctx, 
 						"schema_fsmo_init: no prefixMap attribute found");
@@ -969,7 +983,7 @@ int dsdb_schema_from_ldb_results(TALLOC_CTX *mem_ctx, struct ldb_context *ldb,
 		talloc_free(tmp_ctx);
 		return LDB_ERR_CONSTRAINT_VIOLATION;
 	}
-	info_val = ldb_msg_find_ldb_val(schema_res->msgs[0], "schemaInfo");
+	info_val = ldb_msg_find_ldb_val(schema_msg, "schemaInfo");
 	if (!info_val) {
 		status = dsdb_schema_info_blob_new(mem_ctx, &info_val_default);
 		if (!W_ERROR_IS_OK(status)) {
@@ -999,7 +1013,7 @@ int dsdb_schema_from_ldb_results(TALLOC_CTX *mem_ctx, struct ldb_context *ldb,
 		return ret;
 	}
 
-	schema->fsmo.master_dn = ldb_msg_find_attr_as_dn(ldb, schema, schema_res->msgs[0], "fSMORoleOwner");
+	schema->fsmo.master_dn = ldb_msg_find_attr_as_dn(ldb, schema, schema_msg, "fSMORoleOwner");
 	if (ldb_dn_compare(samdb_ntds_settings_dn(ldb, tmp_ctx), schema->fsmo.master_dn) == 0) {
 		schema->fsmo.we_are_master = true;
 	} else {
-- 
2.14.3


From 2d42705e5e40790b29dcb2876f94ff6707245de9 Mon Sep 17 00:00:00 2001
From: Garming Sam <garming at catalyst.net.nz>
Date: Tue, 21 Nov 2017 11:31:10 +1300
Subject: [PATCH 08/19] dsdb: The schema should be reloaded during the
 transaction

Reload the schema just after getting the transaction lock
but before the transaction counter is bumped.

This ensures we reload the schema exactly once but with
the DB locked.

Signed-off-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 source4/dsdb/samdb/ldb_modules/schema_load.c | 8 +++++++-
 1 file changed, 7 insertions(+), 1 deletion(-)

diff --git a/source4/dsdb/samdb/ldb_modules/schema_load.c b/source4/dsdb/samdb/ldb_modules/schema_load.c
index 2d712e2c6df..2099fac1159 100644
--- a/source4/dsdb/samdb/ldb_modules/schema_load.c
+++ b/source4/dsdb/samdb/ldb_modules/schema_load.c
@@ -572,6 +572,12 @@ static int schema_load_start_transaction(struct ldb_module *module)
 		talloc_get_type(ldb_module_get_private(module), struct schema_load_private_data);
 	struct ldb_context *ldb = ldb_module_get_ctx(module);
 	struct dsdb_schema *schema;
+	int ret;
+
+	ret = ldb_next_start_trans(module);
+	if (ret != LDB_SUCCESS) {
+		return ret;
+	}
 
 	/* Try the schema refresh now */
 	schema = dsdb_get_schema(ldb, NULL);
@@ -582,7 +588,7 @@ static int schema_load_start_transaction(struct ldb_module *module)
 	}
 	private_data->in_transaction++;
 
-	return ldb_next_start_trans(module);
+	return ret;
 }
 
 static int schema_load_end_transaction(struct ldb_module *module)
-- 
2.14.3


From 0636d492f24c01cc5d33439b3a40dbf3b70254dc Mon Sep 17 00:00:00 2001
From: Garming Sam <garming at catalyst.net.nz>
Date: Tue, 10 Jan 2017 19:05:40 +1300
Subject: [PATCH 09/19] ldb_tdb: Begin abstracting out the base key value
 operations

This will allow us to change the backend from tdb to lmdb.

Signed-off-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_tdb.c | 30 ++++++++++++++++++++++++++++++
 lib/ldb/ldb_tdb/ldb_tdb.h | 11 +++++++++++
 2 files changed, 41 insertions(+)

diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index dcb877312a9..ee3a7814997 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -413,6 +413,16 @@ static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
 	return ret;
 }
 
+static int ltdb_tdb_store(struct ltdb_private *ltdb, TDB_DATA key, TDB_DATA data, int flags)
+{
+	return tdb_store(ltdb->tdb, key, data, flags);
+}
+
+static int ltdb_error(struct ltdb_private *ltdb)
+{
+	return ltdb_err_map(tdb_error(ltdb->tdb));
+}
+
 /*
   store a record into the db
 */
@@ -635,6 +645,11 @@ static int ltdb_add(struct ltdb_context *ctx)
 	return ret;
 }
 
+static int ltdb_tdb_delete(struct ltdb_private *ltdb, TDB_DATA tdb_key)
+{
+	return tdb_delete(ltdb->tdb, tdb_key);
+}
+
 /*
   delete a record from the database, not updating indexes (used for deleting
   index records)
@@ -1671,6 +1686,20 @@ static void ltdb_handle_extended(struct ltdb_context *ctx)
 	ltdb_request_extended_done(ctx, ext, ret);
 }
 
+static const char * ltdb_tdb_name(struct ltdb_private *ltdb)
+{
+	return tdb_name(ltdb->tdb);
+}
+
+static const struct kv_db_ops key_value_ops = {
+	.store = ltdb_tdb_store,
+	.delete = ltdb_tdb_delete,
+	.lock_read = ltdb_lock_read,
+	.unlock_read = ltdb_unlock_read,
+	.error = ltdb_error,
+	.name = ltdb_tdb_name,
+};
+
 static void ltdb_callback(struct tevent_context *ev,
 			  struct tevent_timer *te,
 			  struct timeval t,
@@ -1934,6 +1963,7 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
 	}
 
 	ltdb->sequence_number = 0;
+	ltdb->kv_ops = &key_value_ops;
 
 	module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
 	if (!module) {
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.h b/lib/ldb/ldb_tdb/ldb_tdb.h
index 421ae4d95d6..60ba2652a61 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.h
+++ b/lib/ldb/ldb_tdb/ldb_tdb.h
@@ -4,9 +4,20 @@
 #include "tdb.h"
 #include "ldb_module.h"
 
+struct ltdb_private;
+struct kv_db_ops {
+	int (*store)(struct ltdb_private *ltdb, TDB_DATA key, TDB_DATA data, int flags);
+	int (*delete)(struct ltdb_private *ltdb, TDB_DATA key);
+	int (*lock_read)(struct ldb_module *);
+	int (*unlock_read)(struct ldb_module *);
+	int (*error)(struct ltdb_private *ltdb);
+	const char * (*name)(struct ltdb_private *ltdb);
+};
+
 /* this private structure is used by the ltdb backend in the
    ldb_context */
 struct ltdb_private {
+	const struct kv_db_ops *kv_ops;
 	TDB_CONTEXT *tdb;
 	unsigned int connect_flags;
 	
-- 
2.14.3
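
For anyone reading along, here is a minimal standalone sketch of the ops-table indirection this patch introduces. It is not Samba code: demo_private, demo_kv_ops and the mem_* functions are hypothetical stand-ins for ltdb_private, kv_db_ops and the tdb wrappers, and the toy backend holds a single record in memory.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins for ltdb_private and kv_db_ops; not Samba code. */
struct demo_private;

struct demo_kv_ops {
	int (*store)(struct demo_private *p, const char *key, const char *val);
	int (*del)(struct demo_private *p, const char *key);
	const char *(*name)(struct demo_private *p);
};

struct demo_private {
	const struct demo_kv_ops *kv_ops; /* set once, at connect time */
	char key[32];                     /* toy backend: a single record */
	char val[32];
	int used;
};

/* Toy in-memory backend, in the role the tdb wrappers play in the patch. */
static int mem_store(struct demo_private *p, const char *key, const char *val)
{
	if (strlen(key) >= sizeof(p->key) || strlen(val) >= sizeof(p->val)) {
		return -1;
	}
	strcpy(p->key, key);
	strcpy(p->val, val);
	p->used = 1;
	return 0;
}

static int mem_delete(struct demo_private *p, const char *key)
{
	if (!p->used || strcmp(p->key, key) != 0) {
		return -1; /* nothing stored under this key */
	}
	p->used = 0;
	return 0;
}

static const char *mem_name(struct demo_private *p)
{
	(void)p;
	return "in-memory demo";
}

static const struct demo_kv_ops mem_ops = {
	.store = mem_store,
	.del = mem_delete,
	.name = mem_name,
};

/* Generic caller: it sees only the ops table, never the backend. */
static int demo_roundtrip(struct demo_private *p)
{
	assert(p != NULL && p->kv_ops != NULL);
	if (p->kv_ops->store(p, "k", "v") != 0) {
		return -1;
	}
	return p->kv_ops->del(p, "k");
}
```

A second backend would only need to supply another table of the same shape; the generic caller stays untouched.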


From 1dc6d2aded20ba5bf967eb189d3853b84513795c Mon Sep 17 00:00:00 2001
From: Garming Sam <garming at catalyst.net.nz>
Date: Tue, 10 Jan 2017 20:45:02 +1300
Subject: [PATCH 10/19] ldb_tdb: Replace exists, name and error_map with key
 value ops

Signed-off-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_index.c  |  2 +-
 lib/ldb/ldb_tdb/ldb_search.c |  2 +-
 lib/ldb/ldb_tdb/ldb_tdb.c    | 16 ++++++++--------
 3 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/lib/ldb/ldb_tdb/ldb_index.c b/lib/ldb/ldb_tdb/ldb_index.c
index 17cb267be78..263fe578ce2 100644
--- a/lib/ldb/ldb_tdb/ldb_index.c
+++ b/lib/ldb/ldb_tdb/ldb_index.c
@@ -3004,7 +3004,7 @@ int ltdb_reindex(struct ldb_module *module)
 		ldb_debug(ldb_module_get_ctx(module),
 			  LDB_DEBUG_WARNING, "Reindexing: re_index successful on %s, "
 			  "final index write-out will be in transaction commit",
-			  tdb_name(ltdb->tdb));
+			  ltdb->kv_ops->name(ltdb));
 	}
 	return LDB_SUCCESS;
 }
diff --git a/lib/ldb/ldb_tdb/ldb_search.c b/lib/ldb/ldb_tdb/ldb_search.c
index d8bf865e785..c82b36618ca 100644
--- a/lib/ldb/ldb_tdb/ldb_search.c
+++ b/lib/ldb/ldb_tdb/ldb_search.c
@@ -256,7 +256,7 @@ int ltdb_search_key(struct ldb_module *module, struct ltdb_private *ltdb,
 			       ltdb_parse_data_unpack, &ctx); 
 	
 	if (ret == -1) {
-		ret = ltdb_err_map(tdb_error(ltdb->tdb));
+		ret = ltdb->kv_ops->error(ltdb);
 		if (ret == LDB_SUCCESS) {
 			/*
 			 * Just to be sure we don't turn errors
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index ee3a7814997..e5ee4e3d00f 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -391,7 +391,7 @@ static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
 		if (ltdb->warn_reindex) {
 			ldb_debug(ldb_module_get_ctx(module),
 				LDB_DEBUG_ERROR, "Reindexing %s due to modification on %s",
-				tdb_name(ltdb->tdb), ldb_dn_get_linearized(dn));
+				ltdb->kv_ops->name(ltdb), ldb_dn_get_linearized(dn));
 		}
 		ret = ltdb_reindex(module);
 	}
@@ -459,10 +459,10 @@ int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flg
 	tdb_data.dptr = ldb_data.data;
 	tdb_data.dsize = ldb_data.length;
 
-	ret = tdb_store(ltdb->tdb, tdb_key, tdb_data, flgs);
+	ret = ltdb->kv_ops->store(ltdb, tdb_key, tdb_data, flgs);
 	if (ret != 0) {
 		bool is_special = ldb_dn_is_special(msg->dn);
-		ret = ltdb_err_map(tdb_error(ltdb->tdb));
+		ret = ltdb->kv_ops->error(ltdb);
 
 		/*
 		 * LDB_ERR_ENTRY_ALREADY_EXISTS means the DN, not
@@ -677,11 +677,11 @@ int ltdb_delete_noindex(struct ldb_module *module,
 		return LDB_ERR_OTHER;
 	}
 
-	ret = tdb_delete(ltdb->tdb, tdb_key);
+	ret = ltdb->kv_ops->delete(ltdb, tdb_key);
 	TALLOC_FREE(tdb_key_ctx);
 
 	if (ret != 0) {
-		ret = ltdb_err_map(tdb_error(ltdb->tdb));
+		ret = ltdb->kv_ops->error(ltdb);
 	}
 
 	return ret;
@@ -1412,7 +1412,7 @@ static int ltdb_start_trans(struct ldb_module *module)
 	}
 
 	if (tdb_transaction_start(ltdb->tdb) != 0) {
-		return ltdb_err_map(tdb_error(ltdb->tdb));
+		return ltdb->kv_ops->error(ltdb);
 	}
 
 	ltdb->in_transaction++;
@@ -1473,7 +1473,7 @@ static int ltdb_end_trans(struct ldb_module *module)
 	ltdb->prepared_commit = false;
 
 	if (tdb_transaction_commit(ltdb->tdb) != 0) {
-		ret = ltdb_err_map(tdb_error(ltdb->tdb));
+		ret = ltdb->kv_ops->error(ltdb);
 		ldb_asprintf_errstring(ldb_module_get_ctx(module),
 				       "Failure during tdb_transaction_commit(): %s -> %s",
 				       tdb_errorstr(ltdb->tdb),
@@ -1493,7 +1493,7 @@ static int ltdb_del_trans(struct ldb_module *module)
 
 	if (ltdb_index_transaction_cancel(module) != 0) {
 		tdb_transaction_cancel(ltdb->tdb);
-		return ltdb_err_map(tdb_error(ltdb->tdb));
+		return ltdb->kv_ops->error(ltdb);
 	}
 
 	tdb_transaction_cancel(ltdb->tdb);
-- 
2.14.3


From 59bdb3a7d61479c90f49dced7123e24084a59eb3 Mon Sep 17 00:00:00 2001
From: Garming Sam <garming at catalyst.net.nz>
Date: Tue, 10 Jan 2017 21:44:11 +1300
Subject: [PATCH 11/19] ldb_tdb: Replace tdb transaction code with generic key
 value ones

Signed-off-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_cache.c |  6 ++++--
 lib/ldb/ldb_tdb/ldb_tdb.c   | 38 +++++++++++++++++++++++++++++++-------
 lib/ldb/ldb_tdb/ldb_tdb.h   |  4 ++++
 3 files changed, 39 insertions(+), 9 deletions(-)

diff --git a/lib/ldb/ldb_tdb/ldb_cache.c b/lib/ldb/ldb_tdb/ldb_cache.c
index 5b90bd99f47..600b73f767e 100644
--- a/lib/ldb/ldb_tdb/ldb_cache.c
+++ b/lib/ldb/ldb_tdb/ldb_cache.c
@@ -415,7 +415,7 @@ int ltdb_cache_load(struct ldb_module *module)
 	/* possibly initialise the baseinfo */
 	if (r == LDB_ERR_NO_SUCH_OBJECT) {
 
-		if (tdb_transaction_start(ltdb->tdb) != 0) {
+		if (ltdb->kv_ops->begin_write(ltdb) != 0) {
 			goto failed;
 		}
 
@@ -423,7 +423,9 @@ int ltdb_cache_load(struct ldb_module *module)
 		   looking for the record again. */
 		ltdb_baseinfo_init(module);
 
-		tdb_transaction_commit(ltdb->tdb);
+		if (ltdb->kv_ops->finish_write(ltdb) != 0) {
+			goto failed;
+		}
 
 		if (ltdb_search_dn1(module, baseinfo_dn, baseinfo, 0) != LDB_SUCCESS) {
 			goto failed;
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index e5ee4e3d00f..70ab7880090 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -1401,6 +1401,26 @@ static int ltdb_rename(struct ltdb_context *ctx)
 	return ret;
 }
 
+static int ltdb_tdb_transaction_start(struct ltdb_private *ltdb)
+{
+	return tdb_transaction_start(ltdb->tdb);
+}
+
+static int ltdb_tdb_transaction_cancel(struct ltdb_private *ltdb)
+{
+	return tdb_transaction_cancel(ltdb->tdb);
+}
+
+static int ltdb_tdb_transaction_prepare_commit(struct ltdb_private *ltdb)
+{
+	return tdb_transaction_prepare_commit(ltdb->tdb);
+}
+
+static int ltdb_tdb_transaction_commit(struct ltdb_private *ltdb)
+{
+	return tdb_transaction_commit(ltdb->tdb);
+}
+
 static int ltdb_start_trans(struct ldb_module *module)
 {
 	void *data = ldb_module_get_private(module);
@@ -1411,7 +1431,7 @@ static int ltdb_start_trans(struct ldb_module *module)
 		return LDB_ERR_UNWILLING_TO_PERFORM;
 	}
 
-	if (tdb_transaction_start(ltdb->tdb) != 0) {
+	if (ltdb->kv_ops->begin_write(ltdb) != 0) {
 		return ltdb->kv_ops->error(ltdb);
 	}
 
@@ -1434,13 +1454,13 @@ static int ltdb_prepare_commit(struct ldb_module *module)
 
 	ret = ltdb_index_transaction_commit(module);
 	if (ret != LDB_SUCCESS) {
-		tdb_transaction_cancel(ltdb->tdb);
+		ltdb->kv_ops->abort_write(ltdb);
 		ltdb->in_transaction--;
 		return ret;
 	}
 
-	if (tdb_transaction_prepare_commit(ltdb->tdb) != 0) {
-		ret = ltdb_err_map(tdb_error(ltdb->tdb));
+	if (ltdb->kv_ops->prepare_write(ltdb) != 0) {
+		ret = ltdb->kv_ops->error(ltdb);
 		ltdb->in_transaction--;
 		ldb_debug_set(ldb_module_get_ctx(module),
 			      LDB_DEBUG_FATAL,
@@ -1472,7 +1492,7 @@ static int ltdb_end_trans(struct ldb_module *module)
 	ltdb->in_transaction--;
 	ltdb->prepared_commit = false;
 
-	if (tdb_transaction_commit(ltdb->tdb) != 0) {
+	if (ltdb->kv_ops->finish_write(ltdb) != 0) {
 		ret = ltdb->kv_ops->error(ltdb);
 		ldb_asprintf_errstring(ldb_module_get_ctx(module),
 				       "Failure during tdb_transaction_commit(): %s -> %s",
@@ -1492,11 +1512,11 @@ static int ltdb_del_trans(struct ldb_module *module)
 	ltdb->in_transaction--;
 
 	if (ltdb_index_transaction_cancel(module) != 0) {
-		tdb_transaction_cancel(ltdb->tdb);
+		ltdb->kv_ops->abort_write(ltdb);
 		return ltdb->kv_ops->error(ltdb);
 	}
 
-	tdb_transaction_cancel(ltdb->tdb);
+	ltdb->kv_ops->abort_write(ltdb);
 	return LDB_SUCCESS;
 }
 
@@ -1696,6 +1716,10 @@ static const struct kv_db_ops key_value_ops = {
 	.delete = ltdb_tdb_delete,
 	.lock_read = ltdb_lock_read,
 	.unlock_read = ltdb_unlock_read,
+	.begin_write = ltdb_tdb_transaction_start,
+	.prepare_write = ltdb_tdb_transaction_prepare_commit,
+	.finish_write = ltdb_tdb_transaction_commit,
+	.abort_write = ltdb_tdb_transaction_cancel,
 	.error = ltdb_error,
 	.name = ltdb_tdb_name,
 };
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.h b/lib/ldb/ldb_tdb/ldb_tdb.h
index 60ba2652a61..0ffe8458e86 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.h
+++ b/lib/ldb/ldb_tdb/ldb_tdb.h
@@ -10,6 +10,10 @@ struct kv_db_ops {
 	int (*delete)(struct ltdb_private *ltdb, TDB_DATA key);
 	int (*lock_read)(struct ldb_module *);
 	int (*unlock_read)(struct ldb_module *);
+	int (*begin_write)(struct ltdb_private *);
+	int (*prepare_write)(struct ltdb_private *);
+	int (*abort_write)(struct ltdb_private *);
+	int (*finish_write)(struct ltdb_private *);
 	int (*error)(struct ltdb_private *ltdb);
 	const char * (*name)(struct ltdb_private *ltdb);
 };
-- 
2.14.3
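
As a rough illustration of the order in which a caller is expected to drive the four write ops added here, the sketch below records each call as a letter in a trace. Everything in it (demo_txn, demo_txn_ops, demo_write) is hypothetical and exists only for this example. It also assumes a failed prepare has already torn the transaction down, so no abort follows; that is a simplification of the sketch, not a statement about any particular backend.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical types for illustration; not the Samba structures. */
struct demo_txn {
	char trace[8];     /* one letter per op that was called */
	size_t n;
	int fail_prepare;  /* force prepare_write to fail */
};

struct demo_txn_ops {
	int (*begin_write)(struct demo_txn *);
	int (*prepare_write)(struct demo_txn *);
	int (*finish_write)(struct demo_txn *);
	int (*abort_write)(struct demo_txn *);
};

static void record(struct demo_txn *t, char c)
{
	if (t->n + 1 < sizeof(t->trace)) {
		t->trace[t->n++] = c;
		t->trace[t->n] = '\0';
	}
}

static int t_begin(struct demo_txn *t) { record(t, 'B'); return 0; }
static int t_prepare(struct demo_txn *t)
{
	record(t, 'P');
	return t->fail_prepare ? -1 : 0;
}
static int t_finish(struct demo_txn *t) { record(t, 'F'); return 0; }
static int t_abort(struct demo_txn *t) { record(t, 'A'); return 0; }

static const struct demo_txn_ops trace_ops = {
	.begin_write = t_begin,
	.prepare_write = t_prepare,
	.finish_write = t_finish,
	.abort_write = t_abort,
};

/* begin -> (work) -> prepare -> finish, with abort if the work fails. */
static int demo_write(struct demo_txn *t, const struct demo_txn_ops *ops,
		      int work_ok)
{
	assert(t != NULL && ops != NULL);
	if (ops->begin_write(t) != 0) {
		return -1;
	}
	if (!work_ok) {
		ops->abort_write(t);
		return -1;
	}
	if (ops->prepare_write(t) != 0) {
		/* Assumption of this sketch: nothing is left to abort. */
		return -1;
	}
	return ops->finish_write(t);
}
```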


From 2e922f393f9d73da7fea5612f2213872e153584e Mon Sep 17 00:00:00 2001
From: Garming Sam <garming at catalyst.net.nz>
Date: Tue, 10 Jan 2017 23:19:55 +1300
Subject: [PATCH 12/19] ldb_tdb: Add lock_read and unlock_read to key value ops

Signed-off-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_search.c | 12 ++++++------
 lib/ldb/ldb_tdb/ldb_tdb.c    | 31 ++++++++++++++++++++++++-------
 lib/ldb/ldb_tdb/ldb_tdb.h    |  3 +--
 3 files changed, 31 insertions(+), 15 deletions(-)

diff --git a/lib/ldb/ldb_tdb/ldb_search.c b/lib/ldb/ldb_tdb/ldb_search.c
index c82b36618ca..58cb36d1f33 100644
--- a/lib/ldb/ldb_tdb/ldb_search.c
+++ b/lib/ldb/ldb_tdb/ldb_search.c
@@ -711,17 +711,17 @@ int ltdb_search(struct ltdb_context *ctx)
 
 	ldb_request_set_state(req, LDB_ASYNC_PENDING);
 
-	if (ltdb_lock_read(module) != 0) {
+	if (ltdb->kv_ops->lock_read(module) != 0) {
 		return LDB_ERR_OPERATIONS_ERROR;
 	}
 
 	if (ltdb_cache_load(module) != 0) {
-		ltdb_unlock_read(module);
+		ltdb->kv_ops->unlock_read(module);
 		return LDB_ERR_OPERATIONS_ERROR;
 	}
 
 	if (req->op.search.tree == NULL) {
-		ltdb_unlock_read(module);
+		ltdb->kv_ops->unlock_read(module);
 		return LDB_ERR_OPERATIONS_ERROR;
 	}
 
@@ -768,7 +768,7 @@ int ltdb_search(struct ltdb_context *ctx)
 		 */
 		ret = ltdb_search_and_return_base(ltdb, ctx);
 
-		ltdb_unlock_read(module);
+		ltdb->kv_ops->unlock_read(module);
 
 		return ret;
 
@@ -830,7 +830,7 @@ int ltdb_search(struct ltdb_context *ctx)
 				 * full search or we may return
 				 * duplicate entries
 				 */
-				ltdb_unlock_read(module);
+				ltdb->kv_ops->unlock_read(module);
 				return LDB_ERR_OPERATIONS_ERROR;
 			}
 			ret = ltdb_search_full(ctx);
@@ -840,7 +840,7 @@ int ltdb_search(struct ltdb_context *ctx)
 		}
 	}
 
-	ltdb_unlock_read(module);
+	ltdb->kv_ops->unlock_read(module);
 
 	return ret;
 }
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index 70ab7880090..7e95437e79e 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -94,7 +94,7 @@ int ltdb_err_map(enum TDB_ERROR tdb_code)
 /*
   lock the database for read - use by ltdb_search and ltdb_sequence_number
 */
-int ltdb_lock_read(struct ldb_module *module)
+static int ltdb_lock_read(struct ldb_module *module)
 {
 	void *data = ldb_module_get_private(module);
 	struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
@@ -124,7 +124,7 @@ int ltdb_lock_read(struct ldb_module *module)
 /*
   unlock the database after a ltdb_lock_read()
 */
-int ltdb_unlock_read(struct ldb_module *module)
+static int ltdb_unlock_read(struct ldb_module *module)
 {
 	void *data = ldb_module_get_private(module);
 	struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
@@ -1529,6 +1529,8 @@ static int ltdb_sequence_number(struct ltdb_context *ctx,
 	struct ldb_context *ldb;
 	struct ldb_module *module = ctx->module;
 	struct ldb_request *req = ctx->req;
+	void *data = ldb_module_get_private(module);
+	struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
 	TALLOC_CTX *tmp_ctx = NULL;
 	struct ldb_seqnum_request *seq;
 	struct ldb_seqnum_result *res;
@@ -1547,7 +1549,7 @@ static int ltdb_sequence_number(struct ltdb_context *ctx,
 
 	ldb_request_set_state(req, LDB_ASYNC_PENDING);
 
-	if (ltdb_lock_read(module) != 0) {
+	if (ltdb->kv_ops->lock_read(module) != 0) {
 		return LDB_ERR_OPERATIONS_ERROR;
 	}
 
@@ -1609,7 +1611,8 @@ static int ltdb_sequence_number(struct ltdb_context *ctx,
 
 done:
 	talloc_free(tmp_ctx);
-	ltdb_unlock_read(module);
+
+	ltdb->kv_ops->unlock_read(module);
 	return ret;
 }
 
@@ -1872,6 +1875,21 @@ static int ltdb_init_rootdse(struct ldb_module *module)
 	return LDB_SUCCESS;
 }
 
+
+static int generic_lock_read(struct ldb_module *module)
+{
+	void *data = ldb_module_get_private(module);
+	struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+	return ltdb->kv_ops->lock_read(module);
+}
+
+static int generic_unlock_read(struct ldb_module *module)
+{
+	void *data = ldb_module_get_private(module);
+	struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+	return ltdb->kv_ops->unlock_read(module);
+}
+
 static const struct ldb_module_ops ltdb_ops = {
 	.name              = "tdb",
 	.init_context      = ltdb_init_rootdse,
@@ -1885,8 +1903,8 @@ static const struct ldb_module_ops ltdb_ops = {
 	.end_transaction   = ltdb_end_trans,
 	.prepare_commit    = ltdb_prepare_commit,
 	.del_transaction   = ltdb_del_trans,
-	.read_lock         = ltdb_lock_read,
-	.read_unlock       = ltdb_unlock_read,
+	.read_lock         = generic_lock_read,
+	.read_unlock       = generic_unlock_read,
 };
 
 /*
@@ -1905,7 +1923,6 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
 	 * We hold locks, so we must use a private event context
 	 * on each returned handle
 	 */
-
 	ldb_set_require_private_event_context(ldb);
 
 	/* parse the url */
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.h b/lib/ldb/ldb_tdb/ldb_tdb.h
index 0ffe8458e86..4d4805b98ef 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.h
+++ b/lib/ldb/ldb_tdb/ldb_tdb.h
@@ -165,8 +165,6 @@ int ltdb_filter_attrs(TALLOC_CTX *mem_ctx,
 int ltdb_search(struct ltdb_context *ctx);
 
 /* The following definitions come from lib/ldb/ldb_tdb/ldb_tdb.c  */
-int ltdb_lock_read(struct ldb_module *module);
-int ltdb_unlock_read(struct ldb_module *module);
 /* 
  * Determine if this key could hold a record.  We allow the new GUID
  * index, the old DN index and a possible future ID=
@@ -185,6 +183,7 @@ int ltdb_idx_to_key(struct ldb_module *module,
 		    TALLOC_CTX *mem_ctx,
 		    const struct ldb_val *idx_val,
 		    TDB_DATA *key);
+TDB_DATA ltdb_key(struct ldb_module *module, struct ldb_dn *dn);
 int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flgs);
 int ltdb_modify_internal(struct ldb_module *module, const struct ldb_message *msg, struct ldb_request *req);
 int ltdb_delete_noindex(struct ldb_module *module,
-- 
2.14.3


From c5ebc0347b4a631d1ccf9661f7b527d53d809143 Mon Sep 17 00:00:00 2001
From: Garming Sam <garming at catalyst.net.nz>
Date: Tue, 10 Jan 2017 23:23:22 +1300
Subject: [PATCH 13/19] ldb_tdb: Remove tdb_get_seqnum and use a generic
 'has_changed'

Signed-off-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_cache.c |  8 ++++----
 lib/ldb/ldb_tdb/ldb_tdb.c   | 10 ++++++++++
 lib/ldb/ldb_tdb/ldb_tdb.h   |  1 +
 3 files changed, 15 insertions(+), 4 deletions(-)

diff --git a/lib/ldb/ldb_tdb/ldb_cache.c b/lib/ldb/ldb_tdb/ldb_cache.c
index 600b73f767e..4790bcd7e53 100644
--- a/lib/ldb/ldb_tdb/ldb_cache.c
+++ b/lib/ldb/ldb_tdb/ldb_cache.c
@@ -391,8 +391,7 @@ int ltdb_cache_load(struct ldb_module *module)
 	ldb = ldb_module_get_ctx(module);
 
 	/* a very fast check to avoid extra database reads */
-	if (ltdb->cache != NULL && 
-	    tdb_get_seqnum(ltdb->tdb) == ltdb->tdb_seqnum) {
+	if (ltdb->cache != NULL && !ltdb->kv_ops->has_changed(ltdb)) {
 		return 0;
 	}
 
@@ -432,7 +431,8 @@ int ltdb_cache_load(struct ldb_module *module)
 		}
 	}
 
-	ltdb->tdb_seqnum = tdb_get_seqnum(ltdb->tdb);
+	/* Ignore the result, and update the sequence number */
+	ltdb->kv_ops->has_changed(ltdb);
 
 	/* if the current internal sequence number is the same as the one
 	   in the database then assume the rest of the cache is OK */
@@ -594,7 +594,7 @@ int ltdb_increase_sequence_number(struct ldb_module *module)
 
 	/* updating the tdb_seqnum here avoids us reloading the cache
 	   records due to our own modification */
-	ltdb->tdb_seqnum = tdb_get_seqnum(ltdb->tdb);
+	ltdb->kv_ops->has_changed(ltdb);
 
 	return ret;
 }
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index 7e95437e79e..d7fe4917965 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -1714,6 +1714,15 @@ static const char * ltdb_tdb_name(struct ltdb_private *ltdb)
 	return tdb_name(ltdb->tdb);
 }
 
+static bool ltdb_tdb_changed(struct ltdb_private *ltdb)
+{
+	bool ret = (tdb_get_seqnum(ltdb->tdb) != ltdb->tdb_seqnum);
+
+	ltdb->tdb_seqnum = tdb_get_seqnum(ltdb->tdb);
+
+	return ret;
+}
+
 static const struct kv_db_ops key_value_ops = {
 	.store = ltdb_tdb_store,
 	.delete = ltdb_tdb_delete,
@@ -1725,6 +1734,7 @@ static const struct kv_db_ops key_value_ops = {
 	.abort_write = ltdb_tdb_transaction_cancel,
 	.error = ltdb_error,
 	.name = ltdb_tdb_name,
+	.has_changed = ltdb_tdb_changed,
 };
 
 static void ltdb_callback(struct tevent_context *ev,
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.h b/lib/ldb/ldb_tdb/ldb_tdb.h
index 4d4805b98ef..ba827f92a88 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.h
+++ b/lib/ldb/ldb_tdb/ldb_tdb.h
@@ -16,6 +16,7 @@ struct kv_db_ops {
 	int (*finish_write)(struct ltdb_private *);
 	int (*error)(struct ltdb_private *ltdb);
 	const char * (*name)(struct ltdb_private *ltdb);
+	bool (*has_changed)(struct ltdb_private *ltdb);
 };
 
 /* this private structure is used by the ltdb backend in the
-- 
2.14.3
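
One thing worth spelling out about has_changed: it is a compare-and-update, so calling it has the side effect of refreshing the cached sequence number. That is why the callers above can invoke it and ignore the result. A minimal sketch of that behaviour, with a hypothetical demo_db standing in for the real structures:

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical stand-in: live_seqnum plays the database's own counter,
 * cached_seqnum the copy the caller remembered last time. */
struct demo_db {
	int live_seqnum;
	int cached_seqnum;
};

/* Report whether the database moved on since the last call, and
 * remember the current sequence number for the next comparison. */
static bool demo_has_changed(struct demo_db *db)
{
	bool changed;

	assert(db != NULL);
	changed = (db->live_seqnum != db->cached_seqnum);
	db->cached_seqnum = db->live_seqnum;
	return changed;
}
```

Two calls in a row with no intervening write therefore return true and then false, which any backend's implementation would need to preserve.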


From 0e9899b09a8a84814a3738deed429e53aa11fefb Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Tue, 13 Feb 2018 15:21:34 +1300
Subject: [PATCH 14/19] ldb_tdb: Add errorstr to the key value ops

Signed-off-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_tdb.c | 12 +++++++++---
 lib/ldb/ldb_tdb/ldb_tdb.h |  1 +
 2 files changed, 10 insertions(+), 3 deletions(-)

diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index d7fe4917965..6cdd5733b98 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -423,6 +423,11 @@ static int ltdb_error(struct ltdb_private *ltdb)
 	return ltdb_err_map(tdb_error(ltdb->tdb));
 }
 
+static const char *ltdb_errorstr(struct ltdb_private *ltdb)
+{
+	return tdb_errorstr(ltdb->tdb);
+}
+
 /*
   store a record into the db
 */
@@ -1465,8 +1470,8 @@ static int ltdb_prepare_commit(struct ldb_module *module)
 		ldb_debug_set(ldb_module_get_ctx(module),
 			      LDB_DEBUG_FATAL,
 			      "Failure during "
-			      "tdb_transaction_prepare_commit(): %s -> %s",
-			      tdb_errorstr(ltdb->tdb),
+			      "prepare_write(): %s -> %s",
+			      ltdb->kv_ops->errorstr(ltdb),
 			      ldb_strerror(ret));
 		return ret;
 	}
@@ -1496,7 +1501,7 @@ static int ltdb_end_trans(struct ldb_module *module)
 		ret = ltdb->kv_ops->error(ltdb);
 		ldb_asprintf_errstring(ldb_module_get_ctx(module),
 				       "Failure during tdb_transaction_commit(): %s -> %s",
-				       tdb_errorstr(ltdb->tdb),
+				       ltdb->kv_ops->errorstr(ltdb),
 				       ldb_strerror(ret));
 		return ret;
 	}
@@ -1733,6 +1738,7 @@ static const struct kv_db_ops key_value_ops = {
 	.finish_write = ltdb_tdb_transaction_commit,
 	.abort_write = ltdb_tdb_transaction_cancel,
 	.error = ltdb_error,
+	.errorstr = ltdb_errorstr,
 	.name = ltdb_tdb_name,
 	.has_changed = ltdb_tdb_changed,
 };
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.h b/lib/ldb/ldb_tdb/ldb_tdb.h
index ba827f92a88..5c930bf997a 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.h
+++ b/lib/ldb/ldb_tdb/ldb_tdb.h
@@ -15,6 +15,7 @@ struct kv_db_ops {
 	int (*abort_write)(struct ltdb_private *);
 	int (*finish_write)(struct ltdb_private *);
 	int (*error)(struct ltdb_private *ltdb);
+	const char * (*errorstr)(struct ltdb_private *ltdb);
 	const char * (*name)(struct ltdb_private *ltdb);
 	bool (*has_changed)(struct ltdb_private *ltdb);
 };
-- 
2.14.3


From 647ae80d2ff25153125f3022271fd2c82f568269 Mon Sep 17 00:00:00 2001
From: Garming Sam <garming at catalyst.net.nz>
Date: Wed, 11 Jan 2017 11:36:48 +1300
Subject: [PATCH 15/19] ldb_tdb: factor out the (to be) common init code

Signed-off-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_tdb.c | 97 ++++++++++++++++++++++++++---------------------
 lib/ldb/ldb_tdb/ldb_tdb.h |  3 ++
 2 files changed, 57 insertions(+), 43 deletions(-)

diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index 6cdd5733b98..6593dc2ee93 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -1923,6 +1923,57 @@ static const struct ldb_module_ops ltdb_ops = {
 	.read_unlock       = generic_unlock_read,
 };
 
+int init_store(struct ltdb_private *ltdb,
+		      const char *name,
+		      struct ldb_context *ldb,
+		      const char *options[],
+		      struct ldb_module **_module)
+{
+	struct ldb_module *module;
+
+	if (getenv("LDB_WARN_UNINDEXED")) {
+		ltdb->warn_unindexed = true;
+	}
+
+	if (getenv("LDB_WARN_REINDEX")) {
+		ltdb->warn_reindex = true;
+	}
+
+	ltdb->sequence_number = 0;
+
+	module = ldb_module_new(ldb, ldb, name, &ltdb_ops);
+	if (!module) {
+		ldb_oom(ldb);
+		talloc_free(ltdb);
+		return LDB_ERR_OPERATIONS_ERROR;
+	}
+	ldb_module_set_private(module, ltdb);
+	talloc_steal(module, ltdb);
+
+	if (ltdb_cache_load(module) != 0) {
+		ldb_asprintf_errstring(ldb, "Unable to load ltdb cache "
+				       "records for backend '%s'", name);
+		talloc_free(module);
+		return LDB_ERR_OPERATIONS_ERROR;
+	}
+
+	*_module = module;
+	/*
+	 * Set the maximum key length
+	 */
+	{
+		const char *len_str =
+			ldb_options_find(ldb, options,
+					 "max_key_len_for_self_test");
+		if (len_str != NULL) {
+			unsigned len = strtoul(len_str, NULL, 0);
+			ltdb->max_key_length = len;
+		}
+	}
+
+	return LDB_SUCCESS;
+}
+
 /*
   connect to the database
 */
@@ -1930,7 +1981,6 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
 			unsigned int flags, const char *options[],
 			struct ldb_module **_module)
 {
-	struct ldb_module *module;
 	const char *path;
 	int tdb_flags, open_flags;
 	struct ltdb_private *ltdb;
@@ -1995,6 +2045,8 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
 		open_flags = O_CREAT | O_RDWR;
 	}
 
+	ltdb->kv_ops = &key_value_ops;
+
 	/* note that we use quite a large default hash size */
 	ltdb->tdb = ltdb_wrap_open(ltdb, path, 10000,
 				   tdb_flags, open_flags,
@@ -2011,48 +2063,7 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
 		return LDB_ERR_OPERATIONS_ERROR;
 	}
 
-	if (getenv("LDB_WARN_UNINDEXED")) {
-		ltdb->warn_unindexed = true;
-	}
-
-	if (getenv("LDB_WARN_REINDEX")) {
-		ltdb->warn_reindex = true;
-	}
-
-	ltdb->sequence_number = 0;
-	ltdb->kv_ops = &key_value_ops;
-
-	module = ldb_module_new(ldb, ldb, "ldb_tdb backend", &ltdb_ops);
-	if (!module) {
-		ldb_oom(ldb);
-		talloc_free(ltdb);
-		return LDB_ERR_OPERATIONS_ERROR;
-	}
-	ldb_module_set_private(module, ltdb);
-	talloc_steal(module, ltdb);
-
-	if (ltdb_cache_load(module) != 0) {
-		ldb_asprintf_errstring(ldb,
-				       "Unable to load ltdb cache records of tdb '%s'", path);
-		talloc_free(module);
-		return LDB_ERR_OPERATIONS_ERROR;
-	}
-
-	*_module = module;
-
-	/*
-	 * Set the maximum key length
-	 */
-	{
-		const char *len_str =
-			ldb_options_find(ldb, options,
-					 "max_key_len_for_self_test");
-		if (len_str != NULL) {
-			unsigned len = strtoul(len_str, NULL, 0);
-			ltdb->max_key_length = len;
-		}
-	}
-	return LDB_SUCCESS;
+	return init_store(ltdb, "ldb_tdb backend", ldb, options, _module);
 }
 
 int ldb_tdb_init(const char *version)
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.h b/lib/ldb/ldb_tdb/ldb_tdb.h
index 5c930bf997a..a40bbad9699 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.h
+++ b/lib/ldb/ldb_tdb/ldb_tdb.h
@@ -196,3 +196,6 @@ struct tdb_context *ltdb_wrap_open(TALLOC_CTX *mem_ctx,
 				   const char *path, int hash_size, int tdb_flags,
 				   int open_flags, mode_t mode,
 				   struct ldb_context *ldb);
+int init_store(struct ltdb_private *ltdb, const char *name,
+	       struct ldb_context *ldb, const char *options[],
+	       struct ldb_module **_module);
-- 
2.14.3


From 5fa454faf9858d745fd67b36e714aac703957fc5 Mon Sep 17 00:00:00 2001
From: Garming Sam <garming at catalyst.net.nz>
Date: Tue, 10 Jan 2017 20:43:38 +1300
Subject: [PATCH 16/19] ldb_tdb: Use key value ops for fetch command

Signed-off-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_search.c | 6 +++---
 lib/ldb/ldb_tdb/ldb_tdb.c    | 9 +++++++++
 lib/ldb/ldb_tdb/ldb_tdb.h    | 4 ++++
 3 files changed, 16 insertions(+), 3 deletions(-)

diff --git a/lib/ldb/ldb_tdb/ldb_search.c b/lib/ldb/ldb_tdb/ldb_search.c
index 58cb36d1f33..4332036790b 100644
--- a/lib/ldb/ldb_tdb/ldb_search.c
+++ b/lib/ldb/ldb_tdb/ldb_search.c
@@ -252,9 +252,9 @@ int ltdb_search_key(struct ldb_module *module, struct ltdb_private *ltdb,
 	msg->num_elements = 0;
 	msg->elements = NULL;
 
-	ret = tdb_parse_record(ltdb->tdb, tdb_key, 
-			       ltdb_parse_data_unpack, &ctx); 
-	
+	ret = ltdb->kv_ops->fetch_and_parse(ltdb, tdb_key,
+					    ltdb_parse_data_unpack, &ctx);
+
 	if (ret == -1) {
 		ret = ltdb->kv_ops->error(ltdb);
 		if (ret == LDB_SUCCESS) {
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index 6593dc2ee93..f02ad5b53b1 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -1714,6 +1714,14 @@ static void ltdb_handle_extended(struct ltdb_context *ctx)
 	ltdb_request_extended_done(ctx, ext, ret);
 }
 
+static int ltdb_tdb_parse_record(struct ltdb_private *ltdb, TDB_DATA key,
+				 int (*parser)(TDB_DATA key, TDB_DATA data,
+					       void *private_data),
+				 void *ctx)
+{
+	return tdb_parse_record(ltdb->tdb, key, parser, ctx);
+}
+
 static const char * ltdb_tdb_name(struct ltdb_private *ltdb)
 {
 	return tdb_name(ltdb->tdb);
@@ -1731,6 +1739,7 @@ static bool ltdb_tdb_changed(struct ltdb_private *ltdb)
 static const struct kv_db_ops key_value_ops = {
 	.store = ltdb_tdb_store,
 	.delete = ltdb_tdb_delete,
+	.fetch_and_parse = ltdb_tdb_parse_record,
 	.lock_read = ltdb_lock_read,
 	.unlock_read = ltdb_unlock_read,
 	.begin_write = ltdb_tdb_transaction_start,
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.h b/lib/ldb/ldb_tdb/ldb_tdb.h
index a40bbad9699..ce8f19c2265 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.h
+++ b/lib/ldb/ldb_tdb/ldb_tdb.h
@@ -8,6 +8,10 @@ struct ltdb_private;
 struct kv_db_ops {
 	int (*store)(struct ltdb_private *ltdb, TDB_DATA key, TDB_DATA data, int flags);
 	int (*delete)(struct ltdb_private *ltdb, TDB_DATA key);
+	int (*fetch_and_parse)(struct ltdb_private *ltdb, TDB_DATA key,
+                               int (*parser)(TDB_DATA key, TDB_DATA data,
+                                             void *private_data),
+                               void *ctx);
 	int (*lock_read)(struct ldb_module *);
 	int (*unlock_read)(struct ldb_module *);
 	int (*begin_write)(struct ltdb_private *);
-- 
2.14.3
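
The fetch_and_parse op keeps the callback style of tdb_parse_record: the backend locates the record and hands it to a parser, rather than returning a copy. A minimal sketch of that shape follows, assuming a toy single-record store; demo_val, demo_store and the demo_* functions are hypothetical and not part of the patch.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/* Hypothetical stand-ins; the patch itself uses TDB_DATA here. */
struct demo_val {
	const unsigned char *data;
	size_t length;
};

typedef int (*demo_parser)(struct demo_val key, struct demo_val data,
			   void *private_data);

struct demo_store {
	const char *key; /* toy backend: a single record */
	const char *val;
};

/* Look the key up and pass the record to the parser. Returns -1 when
 * the key is absent, otherwise whatever the parser returns. */
static int demo_fetch_and_parse(struct demo_store *s, struct demo_val key,
				demo_parser parser, void *ctx)
{
	struct demo_val data;

	assert(s != NULL && parser != NULL);
	if (s->key == NULL || key.length != strlen(s->key) ||
	    memcmp(key.data, s->key, key.length) != 0) {
		return -1;
	}
	data.data = (const unsigned char *)s->val;
	data.length = strlen(s->val);
	return parser(key, data, ctx);
}

/* Example parser: reports the record length through private_data. */
static int demo_len_parser(struct demo_val key, struct demo_val data,
			   void *private_data)
{
	(void)key;
	*(size_t *)private_data = data.length;
	return 0;
}
```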


From b62baa209fd6af983e9dd3ee96f67c5884b636e8 Mon Sep 17 00:00:00 2001
From: Garming Sam <garming at catalyst.net.nz>
Date: Fri, 16 Feb 2018 13:06:31 +1300
Subject: [PATCH 17/19] ldb_tdb: Implement a traversal function in key value
 ops

This can handle both read-only and writable traverses.

Signed-off-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_index.c  | 75 +++++++++++++-------------------------------
 lib/ldb/ldb_tdb/ldb_search.c | 20 +++++-------
 lib/ldb/ldb_tdb/ldb_tdb.c    | 74 ++++++++++++++++++++++++++++++++++++++++++-
 lib/ldb/ldb_tdb/ldb_tdb.h    | 14 +++++++++
 4 files changed, 117 insertions(+), 66 deletions(-)

diff --git a/lib/ldb/ldb_tdb/ldb_index.c b/lib/ldb/ldb_tdb/ldb_index.c
index 263fe578ce2..482bef977de 100644
--- a/lib/ldb/ldb_tdb/ldb_index.c
+++ b/lib/ldb/ldb_tdb/ldb_index.c
@@ -2673,17 +2673,16 @@ int ltdb_index_delete(struct ldb_module *module, const struct ldb_message *msg)
   commit, which in turn greatly reduces DB churn as we will likely
   be able to do a direct update into the old record.
 */
-static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
+static int delete_index(struct ltdb_private *ltdb, struct ldb_val key, struct ldb_val data, void *state)
 {
 	struct ldb_module *module = state;
-	struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module), struct ltdb_private);
 	const char *dnstr = "DN=" LTDB_INDEX ":";
 	struct dn_list list;
 	struct ldb_dn *dn;
 	struct ldb_val v;
 	int ret;
 
-	if (strncmp((char *)key.dptr, dnstr, strlen(dnstr)) != 0) {
+	if (strncmp((char *)key.data, dnstr, strlen(dnstr)) != 0) {
 		return 0;
 	}
 	/* we need to put a empty list in the internal tdb for this
@@ -2692,8 +2691,8 @@ static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, vo
 	list.count = 0;
 
 	/* the offset of 3 is to remove the DN= prefix. */
-	v.data = key.dptr + 3;
-	v.length = strnlen((char *)key.dptr, key.dsize) - 3;
+	v.data = key.data + 3;
+	v.length = strnlen((char *)key.data, key.length) - 3;
 
 	dn = ldb_dn_from_ldb_val(ltdb, ldb_module_get_ctx(module), &v);
 
@@ -2713,29 +2712,23 @@ static int delete_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, vo
 	return 0;
 }
 
-struct ltdb_reindex_context {
-	struct ldb_module *module;
-	int error;
-	uint32_t count;
-};
-
 /*
-  traversal function that adds @INDEX records during a re index
+  traversal function that corrects the key of each record during a re index
 */
-static int re_key(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
+static int re_key(struct ltdb_private *ltdb, struct ldb_val ldb_key, struct ldb_val val, void *state)
 {
 	struct ldb_context *ldb;
 	struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
 	struct ldb_module *module = ctx->module;
 	struct ldb_message *msg;
 	unsigned int nb_elements_in_db;
-	const struct ldb_val val = {
-		.data = data.dptr,
-		.length = data.dsize,
-	};
 	int ret;
 	TDB_DATA key2;
 	bool is_record;
+	TDB_DATA key = {
+		.dptr = ldb_key.data,
+		.dsize = ldb_key.length
+	};
 	
 	ldb = ldb_module_get_ctx(module);
 
@@ -2790,32 +2783,11 @@ static int re_key(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *st
 	}
 	if (key.dsize != key2.dsize ||
 	    (memcmp(key.dptr, key2.dptr, key.dsize) != 0)) {
-		int tdb_ret;
-		tdb_ret = tdb_delete(tdb, key);
-		if (tdb_ret != 0) {
-			ldb_debug(ldb, LDB_DEBUG_ERROR,
-				  "Failed to delete %*.*s "
-				  "for rekey as %*.*s: %s",
-				  (int)key.dsize, (int)key.dsize,
-				  (const char *)key.dptr,
-				  (int)key2.dsize, (int)key2.dsize,
-				  (const char *)key.dptr,
-				  tdb_errorstr(tdb));
-			ctx->error = ltdb_err_map(tdb_error(tdb));
-			return -1;
-		}
-		tdb_ret = tdb_store(tdb, key2, data, 0);
-		if (tdb_ret != 0) {
-			ldb_debug(ldb, LDB_DEBUG_ERROR,
-				  "Failed to rekey %*.*s as %*.*s: %s",
-				  (int)key.dsize, (int)key.dsize,
-				  (const char *)key.dptr,
-				  (int)key2.dsize, (int)key2.dsize,
-				  (const char *)key.dptr,
-				  tdb_errorstr(tdb));
-			ctx->error = ltdb_err_map(tdb_error(tdb));
-			return -1;
-		}
+		TDB_DATA data = {
+			.dptr = val.data,
+			.dsize = val.length
+		};
+		ltdb->kv_ops->update_in_iterate(ltdb, key, key2, data, ctx);
 	}
 	talloc_free(key2.dptr);
 
@@ -2834,18 +2806,16 @@ static int re_key(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *st
 /*
   traversal function that adds @INDEX records during a re index
 */
-static int re_index(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
+static int re_index(struct ltdb_private *ltdb, struct ldb_val ldb_key, struct ldb_val val, void *state)
 {
 	struct ldb_context *ldb;
 	struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
 	struct ldb_module *module = ctx->module;
-	struct ltdb_private *ltdb = talloc_get_type(ldb_module_get_private(module),
-						    struct ltdb_private);
 	struct ldb_message *msg;
 	unsigned int nb_elements_in_db;
-	const struct ldb_val val = {
-		.data = data.dptr,
-		.length = data.dsize,
+	TDB_DATA key = {
+		.dptr = ldb_key.data,
+		.dsize = ldb_key.length
 	};
 	int ret;
 	bool is_record;
@@ -2955,7 +2925,7 @@ int ltdb_reindex(struct ldb_module *module)
 	/* first traverse the database deleting any @INDEX records by
 	 * putting NULL entries in the in-memory tdb
 	 */
-	ret = tdb_traverse(ltdb->tdb, delete_index, module);
+	ret = ltdb->kv_ops->iterate(ltdb, delete_index, module);
 	if (ret < 0) {
 		struct ldb_context *ldb = ldb_module_get_ctx(module);
 		ldb_asprintf_errstring(ldb, "index deletion traverse failed: %s",
@@ -2967,8 +2937,7 @@ int ltdb_reindex(struct ldb_module *module)
 	ctx.error = 0;
 	ctx.count = 0;
 
-	/* now traverse adding any indexes for normal LDB records */
-	ret = tdb_traverse(ltdb->tdb, re_key, &ctx);
+	ret = ltdb->kv_ops->iterate(ltdb, re_key, &ctx);
 	if (ret < 0) {
 		struct ldb_context *ldb = ldb_module_get_ctx(module);
 		ldb_asprintf_errstring(ldb, "key correction traverse failed: %s",
@@ -2986,7 +2955,7 @@ int ltdb_reindex(struct ldb_module *module)
 	ctx.count = 0;
 
 	/* now traverse adding any indexes for normal LDB records */
-	ret = tdb_traverse(ltdb->tdb, re_index, &ctx);
+	ret = ltdb->kv_ops->iterate(ltdb, re_index, &ctx);
 	if (ret < 0) {
 		struct ldb_context *ldb = ldb_module_get_ctx(module);
 		ldb_asprintf_errstring(ldb, "reindexing traverse failed: %s",
diff --git a/lib/ldb/ldb_tdb/ldb_search.c b/lib/ldb/ldb_tdb/ldb_search.c
index 4332036790b..78ef8b0abb1 100644
--- a/lib/ldb/ldb_tdb/ldb_search.c
+++ b/lib/ldb/ldb_tdb/ldb_search.c
@@ -487,23 +487,23 @@ failed:
 /*
   search function for a non-indexed search
  */
-static int search_func(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *state)
+static int search_func(struct ltdb_private *ltdb, struct ldb_val key, struct ldb_val val, void *state)
 {
 	struct ldb_context *ldb;
 	struct ltdb_context *ac;
 	struct ldb_message *msg, *filtered_msg;
-	const struct ldb_val val = {
-		.data = data.dptr,
-		.length = data.dsize,
-	};
 	int ret;
 	bool matched;
 	unsigned int nb_elements_in_db;
+	TDB_DATA tdb_key = {
+		.dptr = key.data,
+		.dsize = key.length
+	};
 
 	ac = talloc_get_type(state, struct ltdb_context);
 	ldb = ldb_module_get_ctx(ac->module);
 
-	if (ltdb_key_is_record(key) == false) {
+	if (ltdb_key_is_record(tdb_key) == false) {
 		return 0;
 	}
 
@@ -528,7 +528,7 @@ static int search_func(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, voi
 
 	if (!msg->dn) {
 		msg->dn = ldb_dn_new(msg, ldb,
-				     (char *)key.dptr + 3);
+				     (char *)key.data + 3);
 		if (msg->dn == NULL) {
 			talloc_free(msg);
 			ac->error = LDB_ERR_OPERATIONS_ERROR;
@@ -581,11 +581,7 @@ static int ltdb_search_full(struct ltdb_context *ctx)
 	int ret;
 
 	ctx->error = LDB_SUCCESS;
-	if (ltdb->in_transaction != 0) {
-		ret = tdb_traverse(ltdb->tdb, search_func, ctx);
-	} else {
-		ret = tdb_traverse_read(ltdb->tdb, search_func, ctx);
-	}
+	ret = ltdb->kv_ops->iterate(ltdb, search_func, ctx);
 
 	if (ret < 0) {
 		return LDB_ERR_OPERATIONS_ERROR;
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index f02ad5b53b1..9a2b47b87c4 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -926,7 +926,6 @@ static int msg_delete_element(struct ldb_module *module,
 	return LDB_ERR_NO_SUCH_ATTRIBUTE;
 }
 
-
 /*
   modify a record - internal interface
 
@@ -1714,6 +1713,77 @@ static void ltdb_handle_extended(struct ltdb_context *ctx)
 	ltdb_request_extended_done(ctx, ext, ret);
 }
 
+struct kv_ctx {
+	ldb_kv_traverse_fn kv_traverse_fn;
+	void *ctx;
+	struct ltdb_private *ltdb;
+};
+
+static int ldb_tdb_traverse_fn_wrapper(struct tdb_context *tdb, TDB_DATA tdb_key, TDB_DATA tdb_data, void *ctx)
+{
+	struct kv_ctx *kv_ctx = ctx;
+	struct ldb_val key = {
+		.length = tdb_key.dsize,
+		.data = tdb_key.dptr,
+	};
+	struct ldb_val data = {
+		.length = tdb_data.dsize,
+		.data = tdb_data.dptr,
+	};
+	return kv_ctx->kv_traverse_fn(kv_ctx->ltdb, key, data, kv_ctx->ctx);
+}
+
+static int ltdb_tdb_traverse_fn(struct ltdb_private *ltdb, ldb_kv_traverse_fn fn, void *ctx)
+{
+	struct kv_ctx kv_ctx = {
+		.kv_traverse_fn = fn,
+		.ctx = ctx,
+		.ltdb = ltdb
+	};
+	if (ltdb->in_transaction != 0) {
+		return tdb_traverse(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
+	} else {
+		return tdb_traverse_read(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
+	}
+}
+
+static int ltdb_tdb_update_in_iterate(struct ltdb_private *ltdb, TDB_DATA key, TDB_DATA key2, TDB_DATA data, void *state)
+{
+	int tdb_ret;
+	struct ldb_context *ldb;
+	struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
+	struct ldb_module *module = ctx->module;
+
+	ldb = ldb_module_get_ctx(module);
+
+	tdb_ret = tdb_delete(ltdb->tdb, key);
+	if (tdb_ret != 0) {
+		ldb_debug(ldb, LDB_DEBUG_ERROR,
+			  "Failed to delete %*.*s "
+			  "for rekey as %*.*s: %s",
+			  (int)key.dsize, (int)key.dsize,
+			  (const char *)key.dptr,
+			  (int)key2.dsize, (int)key2.dsize,
+			  (const char *)key2.dptr,
+			  tdb_errorstr(ltdb->tdb));
+		ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
+		return -1;
+	}
+	tdb_ret = tdb_store(ltdb->tdb, key2, data, 0);
+	if (tdb_ret != 0) {
+		ldb_debug(ldb, LDB_DEBUG_ERROR,
+			  "Failed to rekey %*.*s as %*.*s: %s",
+			  (int)key.dsize, (int)key.dsize,
+			  (const char *)key.dptr,
+			  (int)key2.dsize, (int)key2.dsize,
+			  (const char *)key2.dptr,
+			  tdb_errorstr(ltdb->tdb));
+		ctx->error = ltdb_err_map(tdb_error(ltdb->tdb));
+		return -1;
+	}
+	return tdb_ret;
+}
+
 static int ltdb_tdb_parse_record(struct ltdb_private *ltdb, TDB_DATA key,
 				 int (*parser)(TDB_DATA key, TDB_DATA data,
 					       void *private_data),
@@ -1739,6 +1809,8 @@ static bool ltdb_tdb_changed(struct ltdb_private *ltdb)
 static const struct kv_db_ops key_value_ops = {
 	.store = ltdb_tdb_store,
 	.delete = ltdb_tdb_delete,
+	.iterate = ltdb_tdb_traverse_fn,
+	.update_in_iterate = ltdb_tdb_update_in_iterate,
 	.fetch_and_parse = ltdb_tdb_parse_record,
 	.lock_read = ltdb_lock_read,
 	.unlock_read = ltdb_unlock_read,
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.h b/lib/ldb/ldb_tdb/ldb_tdb.h
index ce8f19c2265..bf679900764 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.h
+++ b/lib/ldb/ldb_tdb/ldb_tdb.h
@@ -5,9 +5,16 @@
 #include "ldb_module.h"
 
 struct ltdb_private;
+typedef int (*ldb_kv_traverse_fn)(struct ltdb_private *ltdb,
+				  struct ldb_val key, struct ldb_val data,
+				  void *ctx);
+
 struct kv_db_ops {
 	int (*store)(struct ltdb_private *ltdb, TDB_DATA key, TDB_DATA data, int flags);
 	int (*delete)(struct ltdb_private *ltdb, TDB_DATA key);
+	int (*iterate)(struct ltdb_private *ltdb, ldb_kv_traverse_fn fn, void *ctx);
+	int (*update_in_iterate)(struct ltdb_private *ltdb, TDB_DATA key,
+				 TDB_DATA key2, TDB_DATA data, void *ctx);
 	int (*fetch_and_parse)(struct ltdb_private *ltdb, TDB_DATA key,
                                int (*parser)(TDB_DATA key, TDB_DATA data,
                                              void *private_data),
@@ -86,6 +93,13 @@ struct ltdb_context {
 	int error;
 };
 
+struct ltdb_reindex_context {
+	struct ldb_module *module;
+	int error;
+	uint32_t count;
+};
+
+
 /* special record types */
 #define LTDB_INDEX      "@INDEX"
 #define LTDB_INDEXLIST  "@INDEXLIST"
-- 
2.14.3
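
Review note on the patch above: the iterate hook works by wrapping a
backend-neutral callback inside the backend's native traverse callback.
A minimal sketch of that adapter pattern follows. It is not the ldb
code: struct kv_store, struct native_datum, struct kv_val,
native_traverse() and the other names are hypothetical stand-ins for
struct ltdb_private, TDB_DATA, struct ldb_val and tdb_traverse(), and
the toy traversal simply returns -1 as soon as a callback returns
non-zero.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical stand-ins for TDB_DATA and struct ldb_val. */
struct native_datum { unsigned char *dptr; size_t dsize; };
struct kv_val { unsigned char *data; size_t length; };

/* Toy in-memory store, standing in for struct ltdb_private. */
struct kv_store {
	struct native_datum keys[4];
	struct native_datum vals[4];
	size_t count;
};

/* Backend-neutral callback, modelled on ldb_kv_traverse_fn. */
typedef int (*kv_traverse_fn)(struct kv_store *store,
			      struct kv_val key, struct kv_val data,
			      void *ctx);

/* Native callback signature of the toy backend. */
typedef int (*native_traverse_fn)(struct native_datum key,
				  struct native_datum data, void *ctx);

/* Toy native traversal: -1 if a callback fails, else records visited. */
static int native_traverse(struct kv_store *store,
			   native_traverse_fn fn, void *ctx)
{
	size_t i;
	for (i = 0; i < store->count; i++) {
		if (fn(store->keys[i], store->vals[i], ctx) != 0) {
			return -1;
		}
	}
	return (int)i;
}

/* Carries the neutral callback and its state through the native API. */
struct wrap_ctx {
	kv_traverse_fn fn;
	void *ctx;
	struct kv_store *store;
};

/* Native callback that converts the arguments and forwards the call. */
static int traverse_wrapper(struct native_datum k, struct native_datum d,
			    void *private_data)
{
	struct wrap_ctx *w = private_data;
	struct kv_val key = { .data = k.dptr, .length = k.dsize };
	struct kv_val data = { .data = d.dptr, .length = d.dsize };
	return w->fn(w->store, key, data, w->ctx);
}

/* The "iterate" operation that callers see. */
static int kv_iterate(struct kv_store *store, kv_traverse_fn fn, void *ctx)
{
	struct wrap_ctx w = { .fn = fn, .ctx = ctx, .store = store };
	assert(fn != NULL);
	return native_traverse(store, traverse_wrapper, &w);
}

/* Example neutral callback: add up the lengths of all values. */
static int sum_lengths(struct kv_store *store, struct kv_val key,
		       struct kv_val data, void *ctx)
{
	size_t *total = ctx;
	(void)store;
	(void)key;
	*total += data.length;
	return 0;
}

/* Builds a two-record store and returns the total value length. */
static size_t demo_total_length(void)
{
	static unsigned char k1[] = "DN=a", v1[] = "xyz";
	static unsigned char k2[] = "DN=b", v2[] = "hello";
	struct kv_store store = {
		.keys = { { k1, sizeof(k1) - 1 }, { k2, sizeof(k2) - 1 } },
		.vals = { { v1, sizeof(v1) - 1 }, { v2, sizeof(v2) - 1 } },
		.count = 2,
	};
	size_t total = 0;
	int ret = kv_iterate(&store, sum_lengths, &total);
	return ret < 0 ? 0 : total;
}
```

The wrapper context lives on the caller's stack for the duration of the
traversal, as kv_ctx does in ltdb_tdb_traverse_fn(), so nothing needs to
be allocated per call. One thing to double-check in the real patch:
re_key() does not look at the return value of update_in_iterate(),
whereas the old inline code returned -1 when the delete or store
failed.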


From ca34bb90de595a652523f9a3be094fa538afd0de Mon Sep 17 00:00:00 2001
From: Garming Sam <garming at catalyst.net.nz>
Date: Fri, 13 Jan 2017 11:32:14 +1300
Subject: [PATCH 18/19] partition: Allow a different backend store from
 @PARTITION

Use tdb by default; otherwise read the backend store name from the
backendStore attribute of @PARTITION.

Signed-off-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 source4/dsdb/samdb/ldb_modules/partition.h      |  2 ++
 source4/dsdb/samdb/ldb_modules/partition_init.c | 37 ++++++++++++++++---------
 2 files changed, 26 insertions(+), 13 deletions(-)

diff --git a/source4/dsdb/samdb/ldb_modules/partition.h b/source4/dsdb/samdb/ldb_modules/partition.h
index ea05e9404d5..1bf213f9767 100644
--- a/source4/dsdb/samdb/ldb_modules/partition.h
+++ b/source4/dsdb/samdb/ldb_modules/partition.h
@@ -57,6 +57,8 @@ struct partition_private_data {
 	uint32_t in_transaction;
 
 	struct ldb_message *forced_module_msg;
+
+	const char *backend_db_store;
 };
 
 #include "dsdb/samdb/ldb_modules/partition_proto.h"
diff --git a/source4/dsdb/samdb/ldb_modules/partition_init.c b/source4/dsdb/samdb/ldb_modules/partition_init.c
index 3e2648efa11..9a6ac0c05a9 100644
--- a/source4/dsdb/samdb/ldb_modules/partition_init.c
+++ b/source4/dsdb/samdb/ldb_modules/partition_init.c
@@ -139,7 +139,7 @@ static int partition_reload_metadata(struct ldb_module *module, struct partition
 	struct ldb_result *res;
 	struct ldb_context *ldb = ldb_module_get_ctx(module);
 	const char *attrs[] = { "partition", "replicateEntries", "modules", "ldapBackend",
-				"partialReplica", NULL };
+				"partialReplica", "backendStore", NULL };
 	/* perform search for @PARTITION, looking for module, replicateEntries and ldapBackend */
 	ret = dsdb_module_search_dn(module, mem_ctx, &res, 
 				    ldb_dn_new(mem_ctx, ldb, DSDB_PARTITION_DN),
@@ -201,8 +201,8 @@ static const char **find_modules_for_dn(struct partition_private_data *data, str
 static int new_partition_from_dn(struct ldb_context *ldb, struct partition_private_data *data, 
 				 TALLOC_CTX *mem_ctx, 
 				 struct ldb_dn *dn, const char *filename,
+				 const char *backend_db_store,
 				 struct dsdb_partition **partition) {
-	const char *backend_url;
 	struct dsdb_control_current_partition *ctrl;
 	struct ldb_module *backend_module;
 	struct ldb_module *module_chain;
@@ -225,29 +225,31 @@ static int new_partition_from_dn(struct ldb_context *ldb, struct partition_priva
 		(*partition)->backend_url = data->ldapBackend;
 	} else {
 		/* the backend LDB is the DN (base64 encoded if not 'plain') followed by .ldb */
-		backend_url = ldb_relative_path(ldb, 
-						  *partition, 
-						  filename);
-		if (!backend_url) {
+		char *backend_path = ldb_relative_path(ldb,
+						       *partition,
+						       filename);
+		if (!backend_path) {
 			ldb_asprintf_errstring(ldb, 
 					       "partition_init: unable to determine an relative path for partition: %s", filename);
 			talloc_free(*partition);
 			return LDB_ERR_OPERATIONS_ERROR;
 		}
-		(*partition)->backend_url = talloc_steal((*partition), backend_url);
+		(*partition)->backend_url = talloc_asprintf(*partition, "%s://%s",
+							    backend_db_store,
+							    backend_path);
 
 		if (!(ldb_module_flags(ldb) & LDB_FLG_RDONLY)) {
 			char *p;
-			char *backend_dir = talloc_strdup(*partition, backend_url);
-			
-			p = strrchr(backend_dir, '/');
+			char *backend_dir;
+
+			p = strrchr(backend_path, '/');
 			if (p) {
 				p[0] = '\0';
 			}
+			backend_dir = backend_path;
 
 			/* Failure is quite reasonable, it might alredy exist */
 			mkdir(backend_dir, 0700);
-			talloc_free(backend_dir);
 		}
 
 	}
@@ -417,6 +419,13 @@ int partition_reload_if_required(struct ldb_module *module,
 
 	partition_attributes = ldb_msg_find_element(msg, "partition");
 	partial_replicas     = ldb_msg_find_element(msg, "partialReplica");
+	data->backend_db_store
+		= talloc_strdup(data, ldb_msg_find_attr_as_string(msg, "backendStore", "tdb"));
+
+	if (data->backend_db_store == NULL) {
+		talloc_free(mem_ctx);
+		return ldb_module_oom(module);
+	}
 
 	for (i=0; partition_attributes && i < partition_attributes->num_values; i++) {
 		unsigned int j;
@@ -498,7 +507,7 @@ int partition_reload_if_required(struct ldb_module *module,
 		 * correctly.  We don't want to mess that up as the
 		 * schema isn't loaded yet */
 		ret = new_partition_from_dn(ldb, data, data->partitions, dn, 
-					    filename,
+					    filename, data->backend_db_store,
 					    &partition);
 		if (ret != LDB_SUCCESS) {
 			talloc_free(mem_ctx);
@@ -816,7 +825,9 @@ int partition_create(struct ldb_module *module, struct ldb_request *req)
 		}
 		
 		/* Make a partition structure for this new partition, so we can copy in the template structure */ 
-		ret = new_partition_from_dn(ldb, data, req, ldb_dn_copy(req, dn), filename, &partition);
+		ret = new_partition_from_dn(ldb, data, req, ldb_dn_copy(req, dn),
+					    filename, data->backend_db_store,
+					    &partition);
 		if (ret != LDB_SUCCESS) {
 			return ret;
 		}
-- 
2.14.3
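
Review note on the patch above: the partition's backend URL is now
built as "<store>://<path>", with the store name defaulting to "tdb"
when @PARTITION has no backendStore value. A rough standalone sketch of
that composition follows, using snprintf() in place of
talloc_asprintf(). build_backend_url() and strip_last_component() are
hypothetical helpers, not part of the patch, and the "mdb" prefix
mentioned in the comment is an assumption about what the LMDB backend
will register.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/*
 * Hypothetical helper: writes "<store>://<path>" into out.
 * A NULL store falls back to "tdb", mirroring the default passed to
 * ldb_msg_find_attr_as_string() in the patch. Another store name
 * (for example "mdb", assumed here) is used verbatim.
 * Returns 0 on success, -1 if the result did not fit.
 */
static int build_backend_url(char *out, size_t out_len,
			     const char *backend_store, const char *path)
{
	int n;

	assert(out != NULL && path != NULL);
	if (backend_store == NULL) {
		backend_store = "tdb";
	}
	n = snprintf(out, out_len, "%s://%s", backend_store, path);
	if (n < 0 || (size_t)n >= out_len) {
		return -1;
	}
	return 0;
}

/*
 * Hypothetical helper: truncates path at its last '/', leaving the
 * directory that would be handed to mkdir(). As in the patch, this is
 * done on the plain path, not on the URL, so the scheme prefix never
 * ends up in the directory name.
 */
static void strip_last_component(char *path)
{
	char *p = strrchr(path, '/');
	if (p != NULL) {
		p[0] = '\0';
	}
}
```

With a NULL store and the path "sam.ldb.d/DC=X.ldb" the helper yields
"tdb://sam.ldb.d/DC=X.ldb", and stripping the last component of the
path leaves "sam.ldb.d". Unlike this sketch, the patch does not check
the result of talloc_asprintf() before storing it in backend_url, which
may be worth a NULL check in a follow-up.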


From 8689a2e63214f84411a5029146caa94d384641c2 Mon Sep 17 00:00:00 2001
From: Garming Sam <garming at catalyst.net.nz>
Date: Fri, 16 Feb 2018 13:26:46 +1300
Subject: [PATCH 19/19] ldb_tdb: Build a key value operation library

This allows the operations that originally lived in ldb_tdb to be shared
with the new ldb_mdb backend.

Signed-off-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_tdb.c      | 12 +++------
 lib/ldb/ldb_tdb/ldb_tdb.h      |  4 +++
 lib/ldb/ldb_tdb/ldb_tdb_init.c | 59 ++++++++++++++++++++++++++++++++++++++++++
 lib/ldb/wscript                | 12 ++++++---
 4 files changed, 75 insertions(+), 12 deletions(-)
 create mode 100644 lib/ldb/ldb_tdb/ldb_tdb_init.c

diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index 9a2b47b87c4..4f1c241e96a 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -2058,9 +2058,9 @@ int init_store(struct ltdb_private *ltdb,
 /*
   connect to the database
 */
-static int ltdb_connect(struct ldb_context *ldb, const char *url,
-			unsigned int flags, const char *options[],
-			struct ldb_module **_module)
+int ltdb_connect(struct ldb_context *ldb, const char *url,
+		 unsigned int flags, const char *options[],
+		 struct ldb_module **_module)
 {
 	const char *path;
 	int tdb_flags, open_flags;
@@ -2146,9 +2146,3 @@ static int ltdb_connect(struct ldb_context *ldb, const char *url,
 
 	return init_store(ltdb, "ldb_tdb backend", ldb, options, _module);
 }
-
-int ldb_tdb_init(const char *version)
-{
-	LDB_MODULE_CHECK_VERSION(version);
-	return ldb_register_backend("tdb", ltdb_connect, false);
-}
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.h b/lib/ldb/ldb_tdb/ldb_tdb.h
index bf679900764..2235bd47c98 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.h
+++ b/lib/ldb/ldb_tdb/ldb_tdb.h
@@ -217,3 +217,7 @@ struct tdb_context *ltdb_wrap_open(TALLOC_CTX *mem_ctx,
 int init_store(struct ltdb_private *ltdb, const char *name,
 	       struct ldb_context *ldb, const char *options[],
 	       struct ldb_module **_module);
+
+int ltdb_connect(struct ldb_context *ldb, const char *url,
+		 unsigned int flags, const char *options[],
+		 struct ldb_module **_module);
diff --git a/lib/ldb/ldb_tdb/ldb_tdb_init.c b/lib/ldb/ldb_tdb/ldb_tdb_init.c
new file mode 100644
index 00000000000..b18c98a367c
--- /dev/null
+++ b/lib/ldb/ldb_tdb/ldb_tdb_init.c
@@ -0,0 +1,59 @@
+/*
+   ldb database library
+
+   Copyright (C) Andrew Tridgell 2004
+   Copyright (C) Stefan Metzmacher 2004
+   Copyright (C) Simo Sorce 2006-2008
+   Copyright (C) Matthias Dieter Wallnöfer 2009-2010
+
+     ** NOTE! The following LGPL license applies to the ldb
+     ** library. This does NOT imply that all of Samba is released
+     ** under the LGPL
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 3 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+/*
+ *  Name: ldb_tdb
+ *
+ *  Component: ldb tdb backend
+ *
+ *  Description: core functions for tdb backend
+ *
+ *  Author: Andrew Tridgell
+ *  Author: Stefan Metzmacher
+ *
+ *  Modifications:
+ *
+ *  - description: make the module use asynchronous calls
+ *    date: Feb 2006
+ *    Author: Simo Sorce
+ *
+ *  - description: make it possible to use event contexts
+ *    date: Jan 2008
+ *    Author: Simo Sorce
+ *
+ *  - description: fix up memory leaks and small bugs
+ *    date: Oct 2009
+ *    Author: Matthias Dieter Wallnöfer
+ */
+
+#include "ldb_tdb.h"
+#include "ldb_private.h"
+
+int ldb_tdb_init(const char *version)
+{
+	LDB_MODULE_CHECK_VERSION(version);
+	return ldb_register_backend("tdb", ltdb_connect, false);
+}
diff --git a/lib/ldb/wscript b/lib/ldb/wscript
index e14fa63ec2c..1455f92eb2e 100644
--- a/lib/ldb/wscript
+++ b/lib/ldb/wscript
@@ -307,14 +307,20 @@ def build(bld):
 
         bld.SAMBA_MODULE('ldb_tdb',
                          bld.SUBDIR('ldb_tdb',
-                                    '''ldb_tdb.c ldb_search.c ldb_index.c
-                                    ldb_cache.c ldb_tdb_wrap.c'''),
+                                    '''ldb_tdb_init.c'''),
                          init_function='ldb_tdb_init',
                          module_init_name='ldb_init_module',
                          internal_module=False,
-                         deps='tdb ldb',
+                         deps='tdb ldb ldb_key_value',
                          subsystem='ldb')
 
+        bld.SAMBA_LIBRARY('ldb_key_value',
+                          bld.SUBDIR('ldb_tdb',
+                                    '''ldb_tdb.c ldb_search.c ldb_index.c
+                                    ldb_cache.c ldb_tdb_wrap.c'''),
+                          private_library=True,
+                          deps='tdb ldb')
+
         # have a separate subsystem for common/ldb.c, so it can rebuild
         # for install with a different -DLDB_MODULESDIR=
         bld.SAMBA_SUBSYSTEM('LIBLDB_MAIN',
-- 
2.14.3


