[PATCH] Prepare for LMDB by improving ldb_tdb and Samba callers

Andrew Bartlett abartlet at samba.org
Tue Apr 3 23:43:45 UTC 2018


This patch set prepares for the new LMDB backend for Samba's ldb layer
by adding a battery of tests for the new key-value layer and some small
accommodations elsewhere in Samba (like not caching ldb handles across
a fork()).

It also removes a number of tdb-specific assumptions and tests the GUID
index mode that LMDB requires.

This addresses metze's request that the tests be included before the
lmdb patch set is merged.

Please review and push on top of my previous patch set.

Andrew Bartlett
-- 
Andrew Bartlett
https://samba.org/~abartlet/
Authentication Developer, Samba Team         https://samba.org
Samba Development and Support, Catalyst IT   
https://catalyst.net.nz/services/samba



From 7e03a8c1f17cc71b9e64d28a13625813a46eeb68 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Thu, 15 Mar 2018 13:42:17 +1300
Subject: [PATCH 01/23] ldb_wrap: Remove ldb_transaction_cancel_noerr from
 ldb_wrap_fork_hook()

Writing to a TDB without locks (these are per-process) in a forked child is never
going to end well; if a transaction is open at this point we have bigger problems.

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb-samba/ldb_wrap.c | 15 +++------------
 1 file changed, 3 insertions(+), 12 deletions(-)

diff --git a/lib/ldb-samba/ldb_wrap.c b/lib/ldb-samba/ldb_wrap.c
index 9959b04ed95..8c3bf6f7bf3 100644
--- a/lib/ldb-samba/ldb_wrap.c
+++ b/lib/ldb-samba/ldb_wrap.c
@@ -329,20 +329,11 @@ int samba_ldb_connect(struct ldb_context *ldb, struct loadparm_context *lp_ctx,
 }
 
 /*
-  when we fork() we need to make sure that any open ldb contexts have
-  any open transactions cancelled (ntdb databases doesn't need reopening,
-  as we don't use clear_if_first).
- */
+  call tdb_reopen_all() in case there is a TDB open so we are
+  not blocked from re-opening it inside ldb_tdb.
+*/
  void ldb_wrap_fork_hook(void)
 {
-	struct ldb_wrap *w;
-
-	for (w=ldb_wrap_list; w; w=w->next) {
-		if (ldb_transaction_cancel_noerr(w->ldb) != LDB_SUCCESS) {
-			smb_panic("Failed to cancel child transactions\n");
-		}
-	}
-
 	if (tdb_reopen_all(1) != 0) {
 		smb_panic("tdb_reopen_all failed\n");
 	}
-- 
2.11.0


From 697bafe111fb208a86fa030794ec9288938422c6 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Thu, 15 Mar 2018 13:44:52 +1300
Subject: [PATCH 02/23] ldb_wrap: Remove the magic cache of database handles
 except for sam.ldb

sam.ldb is handled in samdb_connect_url(), not this function.

This cache caused issues when "private dir" was changed in a testing script, and it
also creates shared mutable state with many owners, which is frowned upon these days.

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb-samba/ldb_wrap.c | 15 +++++++--------
 1 file changed, 7 insertions(+), 8 deletions(-)

diff --git a/lib/ldb-samba/ldb_wrap.c b/lib/ldb-samba/ldb_wrap.c
index 8c3bf6f7bf3..53255556e7c 100644
--- a/lib/ldb-samba/ldb_wrap.c
+++ b/lib/ldb-samba/ldb_wrap.c
@@ -303,9 +303,13 @@ int samba_ldb_connect(struct ldb_context *ldb, struct loadparm_context *lp_ctx,
 	struct ldb_context *ldb;
 	int ret;
 
-	ldb = ldb_wrap_find(url, ev, lp_ctx, session_info, credentials, flags);
-	if (ldb != NULL)
-		return talloc_reference(mem_ctx, ldb);
+	/*
+	 * Unlike samdb_connect_url() do not try and cache the LDB
+	 * handle, get a new one each time.  Only sam.ldb is
+	 * punitively expensive to open and helpful caches like this
+	 * cause challenges (such as if the value for 'private dir'
+	 * changes).
+	 */
 
 	ldb = samba_ldb_init(mem_ctx, ev, lp_ctx, session_info, credentials);
 
@@ -318,11 +322,6 @@ int samba_ldb_connect(struct ldb_context *ldb, struct loadparm_context *lp_ctx,
 		return NULL;
 	}
 
-	if (!ldb_wrap_add(url, ev, lp_ctx, session_info, credentials, flags, ldb)) {
-		talloc_free(ldb);
-		return NULL;
-	}
-
 	DEBUG(3,("ldb_wrap open of %s\n", url));
 
 	return ldb;
-- 
2.11.0


From 694ece0d6d6386894bed742ae016b54b13f2af1c Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Thu, 8 Mar 2018 14:01:50 +1300
Subject: [PATCH 03/23] ldb: Fix missing NULL terminator in ldb_mod_op_test
 testsuite

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/tests/ldb_mod_op_test.c | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)
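
Reviewer note (not part of the commit): the fix implies that the options
array handed to ldb_connect() is expected to end with a NULL entry. The
sketch below illustrates that convention only. count_options() and
find_option() are hypothetical helpers written for this note, not ldb
functions, and the "name:value" matching is an assumption about the
option format based on the "modules:unique_index_test" string in the test.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

/*
 * Hypothetical helper, for illustration only: count the entries of a
 * NULL-terminated string array. Without the trailing NULL the loop
 * would read past the end of the array.
 */
static size_t count_options(const char *options[])
{
	size_t n = 0;

	if (options == NULL) {
		return 0;
	}
	while (options[n] != NULL) {
		n++;
	}
	return n;
}

/*
 * Hypothetical helper: return the value part of a "name:value" entry,
 * or NULL if no entry with that name is present.
 */
static const char *find_option(const char *options[], const char *name)
{
	size_t len = strlen(name);
	size_t i;

	for (i = 0; options != NULL && options[i] != NULL; i++) {
		if (strncmp(options[i], name, len) == 0 &&
		    options[i][len] == ':') {
			return options[i] + len + 1;
		}
	}
	return NULL;
}
```

With the terminated array from this patch, both helpers stop at the NULL
entry. With the old single-element array the loops would have had no
defined stopping point.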

diff --git a/lib/ldb/tests/ldb_mod_op_test.c b/lib/ldb/tests/ldb_mod_op_test.c
index b91130252a7..0d38c4f9681 100644
--- a/lib/ldb/tests/ldb_mod_op_test.c
+++ b/lib/ldb/tests/ldb_mod_op_test.c
@@ -3099,7 +3099,7 @@ static int ldb_unique_index_test_setup(void **state)
 		"dn: @INDEXLIST\n"
 		"@IDXATTR: cn\n"
 		"\n";
-	const char *options[] = {"modules:unique_index_test"};
+	const char *options[] = {"modules:unique_index_test", NULL};
 
 
 	ret = ldb_register_module(&ldb_unique_index_test_module_ops);
@@ -3201,7 +3201,7 @@ static int ldb_non_unique_index_test_setup(void **state)
 		"dn: @INDEXLIST\n"
 		"@IDXATTR: cn\n"
 		"\n";
-	const char *options[] = {"modules:unique_index_test"};
+	const char *options[] = {"modules:unique_index_test", NULL};
 
 
 	ret = ldb_register_module(&ldb_unique_index_test_module_ops);
-- 
2.11.0


From 063b7b79b6dce1ef189b1b460c6f7a44fbf33258 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Fri, 23 Mar 2018 13:05:55 +1300
Subject: [PATCH 04/23] samba-tool domain classicupgrade: Do not mix
 python-samdb transactions and passdb modifications

This worked previously because we knew the same tdb was in use under the hood,
but it breaks now that nested TDB transactions are banned, and it also breaks
with LMDB.

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 python/samba/upgrade.py | 48 +++++++++++++++++++-----------------------------
 1 file changed, 19 insertions(+), 29 deletions(-)

diff --git a/python/samba/upgrade.py b/python/samba/upgrade.py
index ff470665fe9..e3fa376e053 100644
--- a/python/samba/upgrade.py
+++ b/python/samba/upgrade.py
@@ -785,38 +785,28 @@ Please fix this account before attempting to upgrade again
     result.samdb.transaction_commit()
 
     logger.info("Adding users")
-    # Start a new transaction (should speed this up a little, due to index churn)
-    result.samdb.transaction_start()
-
-    try:
-        # Export users to samba4 backend
-        logger.info("Importing users")
-        for username in userdata:
-            if username.lower() == 'administrator':
-                if userdata[username].user_sid != dom_sid(str(domainsid) + "-500"):
-                    logger.error("User 'Administrator' in your existing directory has SID %s, expected it to be %s" % (userdata[username].user_sid, dom_sid(str(domainsid) + "-500")))
-                    raise ProvisioningError("User 'Administrator' in your existing directory does not have SID ending in -500")
-            if username.lower() == 'root':
-                if userdata[username].user_sid == dom_sid(str(domainsid) + "-500"):
-                    logger.warn('User root has been replaced by Administrator')
-                else:
-                    logger.warn('User root has been kept in the directory, it should be removed in favour of the Administrator user')
 
-            s4_passdb.add_sam_account(userdata[username])
-            if username in uids:
-                add_ad_posix_idmap_entry(result.samdb, userdata[username].user_sid, uids[username], "ID_TYPE_UID", logger)
-                if (username in homes) and (homes[username] is not None) and \
-                   (username in shells) and (shells[username] is not None) and \
-                   (username in pgids) and (pgids[username] is not None):
-                    add_posix_attrs(samdb=result.samdb, sid=userdata[username].user_sid, name=username, nisdomain=domainname.lower(), xid_type="ID_TYPE_UID", home=homes[username], shell=shells[username], pgid=pgids[username], logger=logger)
+    # Export users to samba4 backend
+    logger.info("Importing users")
+    for username in userdata:
+        if username.lower() == 'administrator':
+            if userdata[username].user_sid != dom_sid(str(domainsid) + "-500"):
+                logger.error("User 'Administrator' in your existing directory has SID %s, expected it to be %s" % (userdata[username].user_sid, dom_sid(str(domainsid) + "-500")))
+                raise ProvisioningError("User 'Administrator' in your existing directory does not have SID ending in -500")
+        if username.lower() == 'root':
+            if userdata[username].user_sid == dom_sid(str(domainsid) + "-500"):
+                logger.warn('User root has been replaced by Administrator')
+            else:
+                logger.warn('User root has been kept in the directory, it should be removed in favour of the Administrator user')
 
-    except:
-        # We need this, so that we do not give even more errors due to not cancelling the transaction
-        result.samdb.transaction_cancel()
-        raise
+        s4_passdb.add_sam_account(userdata[username])
+        if username in uids:
+            add_ad_posix_idmap_entry(result.samdb, userdata[username].user_sid, uids[username], "ID_TYPE_UID", logger)
+            if (username in homes) and (homes[username] is not None) and \
+               (username in shells) and (shells[username] is not None) and \
+               (username in pgids) and (pgids[username] is not None):
+                add_posix_attrs(samdb=result.samdb, sid=userdata[username].user_sid, name=username, nisdomain=domainname.lower(), xid_type="ID_TYPE_UID", home=homes[username], shell=shells[username], pgid=pgids[username], logger=logger)
 
-    logger.info("Committing 'add users' transaction to disk")
-    result.samdb.transaction_commit()
 
     logger.info("Adding users to groups")
     # Start a new transaction (should speed this up a little, due to index churn)
-- 
2.11.0


From e97a09eabff698a4e5ec12434f7ee2cb266bd393 Mon Sep 17 00:00:00 2001
From: Garming Sam <garming at catalyst.net.nz>
Date: Fri, 16 Feb 2018 17:13:26 +1300
Subject: [PATCH 05/23] ldb: Change some prototypes to using ldb_val instead of
 TDB_DATA

Signed-off-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_index.c |  8 ++++----
 lib/ldb/ldb_tdb/ldb_tdb.c   | 49 +++++++++++++++++++++++++++++++++++++--------
 lib/ldb/ldb_tdb/ldb_tdb.h   |  8 ++++----
 3 files changed, 49 insertions(+), 16 deletions(-)
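
Reviewer note (not part of the commit): the conversions added in this
patch are shallow, so the key-value layer and the TDB layer keep pointing
at the same buffer and nothing is copied. A minimal sketch of that idea
follows. The two structs are stand-ins declared for this note; the real
struct ldb_val and TDB_DATA come from ldb.h and tdb.h, and to_tdb() and
to_ldb() are hypothetical names.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Stand-in for struct ldb_val (illustration only). */
struct ldb_val_sketch {
	uint8_t *data;
	size_t length;
};

/* Stand-in for TDB_DATA (illustration only). */
struct tdb_data_sketch {
	unsigned char *dptr;
	size_t dsize;
};

/* Shallow conversion: the result aliases the caller's buffer. */
static struct tdb_data_sketch to_tdb(struct ldb_val_sketch v)
{
	struct tdb_data_sketch d = {
		.dptr = v.data,
		.dsize = v.length
	};
	return d;
}

/* The reverse direction, again without copying. */
static struct ldb_val_sketch to_ldb(struct tdb_data_sketch d)
{
	struct ldb_val_sketch v = {
		.data = d.dptr,
		.length = d.dsize
	};
	return v;
}

/* Returns 1 if a value survives a round trip unchanged. */
static int round_trip_is_identity(struct ldb_val_sketch v)
{
	struct ldb_val_sketch back = to_ldb(to_tdb(v));

	return back.data == v.data && back.length == v.length;
}
```

Because no memory is duplicated, the converted value is only valid for as
long as the original buffer is, which matches how the patch frees key2.dptr
after the update_in_iterate call rather than the converted struct.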

diff --git a/lib/ldb/ldb_tdb/ldb_index.c b/lib/ldb/ldb_tdb/ldb_index.c
index bb534c3833c..076db10f2dd 100644
--- a/lib/ldb/ldb_tdb/ldb_index.c
+++ b/lib/ldb/ldb_tdb/ldb_index.c
@@ -2783,11 +2783,11 @@ static int re_key(struct ltdb_private *ltdb, struct ldb_val ldb_key, struct ldb_
 	}
 	if (key.dsize != key2.dsize ||
 	    (memcmp(key.dptr, key2.dptr, key.dsize) != 0)) {
-		TDB_DATA data = {
-			.dptr = val.data,
-			.dsize = val.length
+		struct ldb_val ldb_key2 = {
+			.data = key2.dptr,
+			.length = key2.dsize
 		};
-		ltdb->kv_ops->update_in_iterate(ltdb, key, key2, data, ctx);
+		ltdb->kv_ops->update_in_iterate(ltdb, ldb_key, ldb_key2, val, ctx);
 	}
 	talloc_free(key2.dptr);
 
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index 20796f2162d..f94a308cbf9 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -417,8 +417,17 @@ static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
 	return ret;
 }
 
-static int ltdb_tdb_store(struct ltdb_private *ltdb, TDB_DATA key, TDB_DATA data, int flags)
+static int ltdb_tdb_store(struct ltdb_private *ltdb, struct ldb_val ldb_key,
+			  struct ldb_val ldb_data, int flags)
 {
+	TDB_DATA key = {
+		.dptr = ldb_key.data,
+		.dsize = ldb_key.length
+	};
+	TDB_DATA data = {
+		.dptr = ldb_data.data,
+		.dsize = ldb_data.length
+	};
 	return tdb_store(ltdb->tdb, key, data, flags);
 }
 
@@ -439,7 +448,8 @@ int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flg
 {
 	void *data = ldb_module_get_private(module);
 	struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-	TDB_DATA tdb_key, tdb_data;
+	TDB_DATA tdb_key;
+	struct ldb_val ldb_key;
 	struct ldb_val ldb_data;
 	int ret = LDB_SUCCESS;
 	TALLOC_CTX *tdb_key_ctx = talloc_new(module);
@@ -465,10 +475,10 @@ int ltdb_store(struct ldb_module *module, const struct ldb_message *msg, int flg
 		return LDB_ERR_OTHER;
 	}
 
-	tdb_data.dptr = ldb_data.data;
-	tdb_data.dsize = ldb_data.length;
+	ldb_key.data = tdb_key.dptr;
+	ldb_key.length = tdb_key.dsize;
 
-	ret = ltdb->kv_ops->store(ltdb, tdb_key, tdb_data, flgs);
+	ret = ltdb->kv_ops->store(ltdb, ldb_key, ldb_data, flgs);
 	if (ret != 0) {
 		bool is_special = ldb_dn_is_special(msg->dn);
 		ret = ltdb->kv_ops->error(ltdb);
@@ -654,8 +664,12 @@ static int ltdb_add(struct ltdb_context *ctx)
 	return ret;
 }
 
-static int ltdb_tdb_delete(struct ltdb_private *ltdb, TDB_DATA tdb_key)
+static int ltdb_tdb_delete(struct ltdb_private *ltdb, struct ldb_val ldb_key)
 {
+	TDB_DATA tdb_key = {
+		.dptr = ldb_key.data,
+		.dsize = ldb_key.length
+	};
 	return tdb_delete(ltdb->tdb, tdb_key);
 }
 
@@ -668,6 +682,7 @@ int ltdb_delete_noindex(struct ldb_module *module,
 {
 	void *data = ldb_module_get_private(module);
 	struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
+	struct ldb_val ldb_key;
 	TDB_DATA tdb_key;
 	int ret;
 	TALLOC_CTX *tdb_key_ctx = talloc_new(module);
@@ -686,7 +701,10 @@ int ltdb_delete_noindex(struct ldb_module *module,
 		return LDB_ERR_OTHER;
 	}
 
-	ret = ltdb->kv_ops->delete(ltdb, tdb_key);
+	ldb_key.data = tdb_key.dptr;
+	ldb_key.length = tdb_key.dsize;
+
+	ret = ltdb->kv_ops->delete(ltdb, ldb_key);
 	TALLOC_FREE(tdb_key_ctx);
 
 	if (ret != 0) {
@@ -1777,12 +1795,27 @@ static int ltdb_tdb_traverse_fn(struct ltdb_private *ltdb, ldb_kv_traverse_fn fn
 	}
 }
 
-static int ltdb_tdb_update_in_iterate(struct ltdb_private *ltdb, TDB_DATA key, TDB_DATA key2, TDB_DATA data, void *state)
+static int ltdb_tdb_update_in_iterate(struct ltdb_private *ltdb,
+				      struct ldb_val ldb_key,
+				      struct ldb_val ldb_key2,
+				      struct ldb_val ldb_data, void *state)
 {
 	int tdb_ret;
 	struct ldb_context *ldb;
 	struct ltdb_reindex_context *ctx = (struct ltdb_reindex_context *)state;
 	struct ldb_module *module = ctx->module;
+	TDB_DATA key = {
+		.dptr = ldb_key.data,
+		.dsize = ldb_key.length
+	};
+	TDB_DATA key2 = {
+		.dptr = ldb_key2.data,
+		.dsize = ldb_key2.length
+	};
+	TDB_DATA data = {
+		.dptr = ldb_data.data,
+		.dsize = ldb_data.length
+	};
 
 	ldb = ldb_module_get_ctx(module);
 
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.h b/lib/ldb/ldb_tdb/ldb_tdb.h
index 28dd20915c7..2c0ea7cfe38 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.h
+++ b/lib/ldb/ldb_tdb/ldb_tdb.h
@@ -10,11 +10,11 @@ typedef int (*ldb_kv_traverse_fn)(struct ltdb_private *ltdb,
 				  void *ctx);
 
 struct kv_db_ops {
-	int (*store)(struct ltdb_private *ltdb, TDB_DATA key, TDB_DATA data, int flags);
-	int (*delete)(struct ltdb_private *ltdb, TDB_DATA key);
+	int (*store)(struct ltdb_private *ltdb, struct ldb_val key, struct ldb_val data, int flags);
+	int (*delete)(struct ltdb_private *ltdb, struct ldb_val key);
 	int (*iterate)(struct ltdb_private *ltdb, ldb_kv_traverse_fn fn, void *ctx);
-	int (*update_in_iterate)(struct ltdb_private *ltdb, TDB_DATA key,
-				 TDB_DATA key2, TDB_DATA data, void *ctx);
+	int (*update_in_iterate)(struct ltdb_private *ltdb, struct ldb_val key,
+				 struct ldb_val key2, struct ldb_val data, void *ctx);
 	int (*fetch_and_parse)(struct ltdb_private *ltdb, TDB_DATA key,
                                int (*parser)(TDB_DATA key, TDB_DATA data,
                                              void *private_data),
-- 
2.11.0


From 9b363597f46c458f83ed0f10980a1a2ce1f08cf9 Mon Sep 17 00:00:00 2001
From: Garming Sam <garming at catalyst.net.nz>
Date: Mon, 19 Feb 2018 12:37:20 +1300
Subject: [PATCH 06/23] ldb: Change remaining fetch prototypes to remove
 TDB_DATA

Signed-off-by: Garming Sam <garming at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_search.c | 28 +++++++++++++++-------------
 lib/ldb/ldb_tdb/ldb_tdb.c    | 38 +++++++++++++++++++++++++++++++++++---
 lib/ldb/ldb_tdb/ldb_tdb.h    |  4 ++--
 3 files changed, 52 insertions(+), 18 deletions(-)
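
Reviewer note (not part of the commit): since the parser callback now
takes struct ldb_val while tdb_parse_record() still calls back with
TDB_DATA, the patch routes the call through a wrapper that carries the
real parser in a context struct. The sketch below shows that trampoline
pattern in isolation. All names and types here are stand-ins invented for
this note, and raw_parse_record() merely imitates a store that hands a
fixed value to its callback.

```c
#include <assert.h>
#include <stddef.h>

/* Stand-in types (illustration only, not the ldb/tdb definitions). */
struct val_sketch { unsigned char *data; size_t length; };
struct raw_sketch { unsigned char *dptr; size_t dsize; };

typedef int (*val_parser_fn)(struct val_sketch key,
			     struct val_sketch data,
			     void *private_data);
typedef int (*raw_parser_fn)(struct raw_sketch key,
			     struct raw_sketch data,
			     void *private_data);

/* Carries the caller's parser and private pointer through the raw layer. */
struct parse_ctx_sketch {
	val_parser_fn parser;
	void *ctx;
};

/* Imitates a backend that only knows the raw types. */
static int raw_parse_record(struct raw_sketch key, raw_parser_fn fn,
			    void *private_data)
{
	static unsigned char value[] = "carrot";
	struct raw_sketch data = { .dptr = value, .dsize = sizeof(value) - 1 };

	return fn(key, data, private_data);
}

/* Trampoline: convert the raw arguments, then call the real parser. */
static int parse_wrapper(struct raw_sketch key, struct raw_sketch data,
			 void *private_data)
{
	struct parse_ctx_sketch *pc = private_data;
	struct val_sketch k = { .data = key.dptr, .length = key.dsize };
	struct val_sketch d = { .data = data.dptr, .length = data.dsize };

	return pc->parser(k, d, pc->ctx);
}

/* Public entry point that speaks only the val_sketch types. */
static int fetch_and_parse(struct val_sketch key, val_parser_fn parser,
			   void *ctx)
{
	struct parse_ctx_sketch pc = { .parser = parser, .ctx = ctx };
	struct raw_sketch raw_key = { .dptr = key.data, .dsize = key.length };

	return raw_parse_record(raw_key, parse_wrapper, &pc);
}

/* Example parser: stores the length of the data it was handed. */
static int record_length(struct val_sketch key, struct val_sketch data,
			 void *private_data)
{
	size_t *out = private_data;

	(void)key;
	*out = data.length;
	return 0;
}
```

The context struct lives on the stack of fetch_and_parse(), which is safe
in this sketch because the callback runs synchronously before that
function returns.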

diff --git a/lib/ldb/ldb_tdb/ldb_search.c b/lib/ldb/ldb_tdb/ldb_search.c
index 78ef8b0abb1..35583a1eaa3 100644
--- a/lib/ldb/ldb_tdb/ldb_search.c
+++ b/lib/ldb/ldb_tdb/ldb_search.c
@@ -180,17 +180,15 @@ struct ltdb_parse_data_unpack_ctx {
 	unsigned int unpack_flags;
 };
 
-static int ltdb_parse_data_unpack(TDB_DATA key, TDB_DATA data,
+static int ltdb_parse_data_unpack(struct ldb_val key,
+				  struct ldb_val data,
 				  void *private_data)
 {
 	struct ltdb_parse_data_unpack_ctx *ctx = private_data;
 	unsigned int nb_elements_in_db;
 	int ret;
 	struct ldb_context *ldb = ldb_module_get_ctx(ctx->module);
-	struct ldb_val data_parse = {
-		.data = data.dptr,
-		.length = data.dsize
-	};
+	struct ldb_val data_parse = data;
 
 	if (ctx->unpack_flags & LDB_UNPACK_DATA_FLAG_NO_DATA_ALLOC) {
 		/*
@@ -200,13 +198,13 @@ static int ltdb_parse_data_unpack(TDB_DATA key, TDB_DATA data,
 		 * and the caller needs a stable result.
 		 */
 		data_parse.data = talloc_memdup(ctx->msg,
-						data.dptr,
-						data.dsize);
+						data.data,
+						data.length);
 		if (data_parse.data == NULL) {
 			ldb_debug(ldb, LDB_DEBUG_ERROR,
 				  "Unable to allocate data(%d) for %*.*s\n",
-				  (int)data.dsize,
-				  (int)key.dsize, (int)key.dsize, key.dptr);
+				  (int)data.length,
+				  (int)key.length, (int)key.length, key.data);
 			return LDB_ERR_OPERATIONS_ERROR;
 		}
 	}
@@ -217,13 +215,13 @@ static int ltdb_parse_data_unpack(TDB_DATA key, TDB_DATA data,
 						   ctx->unpack_flags,
 						   &nb_elements_in_db);
 	if (ret == -1) {
-		if (data_parse.data != data.dptr) {
+		if (data_parse.data != data.data) {
 			talloc_free(data_parse.data);
 		}
 
 		ldb_debug(ldb, LDB_DEBUG_ERROR, "Invalid data for index %*.*s\n",
-			  (int)key.dsize, (int)key.dsize, key.dptr);
-		return LDB_ERR_OPERATIONS_ERROR;		
+			  (int)key.length, (int)key.length, key.data);
+		return LDB_ERR_OPERATIONS_ERROR;
 	}
 	return ret;
 }
@@ -246,13 +244,17 @@ int ltdb_search_key(struct ldb_module *module, struct ltdb_private *ltdb,
 		.module = module,
 		.unpack_flags = unpack_flags
 	};
+	struct ldb_val ldb_key = {
+		.data = tdb_key.dptr,
+		.length = tdb_key.dsize
+	};
 
 	memset(msg, 0, sizeof(*msg));
 
 	msg->num_elements = 0;
 	msg->elements = NULL;
 
-	ret = ltdb->kv_ops->fetch_and_parse(ltdb, tdb_key,
+	ret = ltdb->kv_ops->fetch_and_parse(ltdb, ldb_key,
 					    ltdb_parse_data_unpack, &ctx);
 
 	if (ret == -1) {
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index f94a308cbf9..423b5a1f687 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -1765,6 +1765,9 @@ struct kv_ctx {
 	ldb_kv_traverse_fn kv_traverse_fn;
 	void *ctx;
 	struct ltdb_private *ltdb;
+	int (*parser)(struct ldb_val key,
+		      struct ldb_val data,
+		      void *private_data);
 };
 
 static int ldb_tdb_traverse_fn_wrapper(struct tdb_context *tdb, TDB_DATA tdb_key, TDB_DATA tdb_data, void *ctx)
@@ -1847,12 +1850,41 @@ static int ltdb_tdb_update_in_iterate(struct ltdb_private *ltdb,
 	return tdb_ret;
 }
 
-static int ltdb_tdb_parse_record(struct ltdb_private *ltdb, TDB_DATA key,
-				 int (*parser)(TDB_DATA key, TDB_DATA data,
+static int ltdb_tdb_parse_record_wrapper(TDB_DATA tdb_key, TDB_DATA tdb_data,
+					 void *ctx)
+{
+	struct kv_ctx *kv_ctx = ctx;
+	struct ldb_val key = {
+		.length = tdb_key.dsize,
+		.data = tdb_key.dptr,
+	};
+	struct ldb_val data = {
+		.length = tdb_data.dsize,
+		.data = tdb_data.dptr,
+	};
+
+	return kv_ctx->parser(key, data, kv_ctx->ctx);
+}
+
+static int ltdb_tdb_parse_record(struct ltdb_private *ltdb,
+				 struct ldb_val ldb_key,
+				 int (*parser)(struct ldb_val key,
+					       struct ldb_val data,
 					       void *private_data),
 				 void *ctx)
 {
-	return tdb_parse_record(ltdb->tdb, key, parser, ctx);
+	struct kv_ctx kv_ctx = {
+		.parser = parser,
+		.ctx = ctx,
+		.ltdb = ltdb
+	};
+	TDB_DATA key = {
+		.dptr = ldb_key.data,
+		.dsize = ldb_key.length
+	};
+
+	return tdb_parse_record(ltdb->tdb, key, ltdb_tdb_parse_record_wrapper,
+				&kv_ctx);
 }
 
 static const char * ltdb_tdb_name(struct ltdb_private *ltdb)
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.h b/lib/ldb/ldb_tdb/ldb_tdb.h
index 2c0ea7cfe38..f14666ba88a 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.h
+++ b/lib/ldb/ldb_tdb/ldb_tdb.h
@@ -15,8 +15,8 @@ struct kv_db_ops {
 	int (*iterate)(struct ltdb_private *ltdb, ldb_kv_traverse_fn fn, void *ctx);
 	int (*update_in_iterate)(struct ltdb_private *ltdb, struct ldb_val key,
 				 struct ldb_val key2, struct ldb_val data, void *ctx);
-	int (*fetch_and_parse)(struct ltdb_private *ltdb, TDB_DATA key,
-                               int (*parser)(TDB_DATA key, TDB_DATA data,
+	int (*fetch_and_parse)(struct ltdb_private *ltdb, struct ldb_val key,
+                               int (*parser)(struct ldb_val key, struct ldb_val data,
                                              void *private_data),
                                void *ctx);
 	int (*lock_read)(struct ldb_module *);
-- 
2.11.0


From 4ed37516f5e785288be53d2c54a6fc962a1ab759 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Wed, 17 Jan 2018 14:02:09 +1300
Subject: [PATCH 07/23] upgradeprovision: Do not copy backup lmdb -lock files

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 source4/scripting/bin/samba_upgradeprovision | 7 ++++---
 1 file changed, 4 insertions(+), 3 deletions(-)

diff --git a/source4/scripting/bin/samba_upgradeprovision b/source4/scripting/bin/samba_upgradeprovision
index f3e690ba552..e0db12e0818 100755
--- a/source4/scripting/bin/samba_upgradeprovision
+++ b/source4/scripting/bin/samba_upgradeprovision
@@ -1397,9 +1397,10 @@ def backup_provision(paths, dir, only_db):
     else:
         os.mkdir(os.path.join(dir, "sam.ldb.d"), 0700)
 
-        for ldb in os.listdir(samldbdir):
-            tdb_util.tdb_copy(os.path.join(samldbdir, ldb),
-                              os.path.join(dir, "sam.ldb.d", ldb))
+        for ldb_name in os.listdir(samldbdir):
+            if not ldb_name.endswith("-lock"):
+                tdb_util.tdb_copy(os.path.join(samldbdir, ldb_name),
+                                  os.path.join(dir, "sam.ldb.d", ldb_name))
 
 
 def sync_calculated_attributes(samdb, names):
-- 
2.11.0


From a8a15fdbe5d609868e94649bb083dcd811b5f18e Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Tue, 6 Mar 2018 17:00:07 +1300
Subject: [PATCH 08/23] ldb: Ignore these tests in mdb test mode

These tests are specifically for when the GUID index is not in use,
and the GUID index is always in use with ldb_mdb.

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/tests/ldb_mod_op_test.c | 11 +++++++++++
 1 file changed, 11 insertions(+)

diff --git a/lib/ldb/tests/ldb_mod_op_test.c b/lib/ldb/tests/ldb_mod_op_test.c
index 0d38c4f9681..db0c594b430 100644
--- a/lib/ldb/tests/ldb_mod_op_test.c
+++ b/lib/ldb/tests/ldb_mod_op_test.c
@@ -3389,6 +3389,11 @@ static void test_ldb_unique_index_duplicate_logging(void **state)
 	char *debug_string = NULL;
 	char *p = NULL;
 
+	/* The non-GUID mode is not compatible with mdb */
+	if (strcmp(TEST_BE, "mdb") == 0) {
+		return;
+	}
+	
 	ldb_set_debug(test_ctx->ldb, ldb_debug_string, &debug_string);
 	tmp_ctx = talloc_new(test_ctx);
 	assert_non_null(tmp_ctx);
@@ -3438,6 +3443,11 @@ static void test_ldb_duplicate_dn_logging(void **state)
 	TALLOC_CTX *tmp_ctx;
 	char *debug_string = NULL;
 
+	/* The non-GUID mode is not compatible with mdb */
+	if (strcmp(TEST_BE, "mdb") == 0) {
+		return;
+	}
+	
 	ldb_set_debug(test_ctx->ldb, ldb_debug_string, &debug_string);
 	tmp_ctx = talloc_new(test_ctx);
 	assert_non_null(tmp_ctx);
@@ -3836,6 +3846,7 @@ int main(int argc, const char **argv)
 			test_ldb_add_to_index_unique_values_required,
 			ldb_non_unique_index_test_setup,
 			ldb_non_unique_index_test_teardown),
+		/* These tests are not compatible with mdb */
 		cmocka_unit_test_setup_teardown(
 			test_ldb_unique_index_duplicate_logging,
 			ldb_unique_index_test_setup,
-- 
2.11.0


From 2d21aeda1ea19b653e8ebdf900105d3dce063ecb Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Fri, 23 Mar 2018 11:10:25 +1300
Subject: [PATCH 09/23] ldb: Allow GUID index mode to be tested on TDB

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/tests/ldb_mod_op_test.c | 149 ++++++++++++++++++++++++++++++++++------
 lib/ldb/wscript                 |   7 ++
 2 files changed, 136 insertions(+), 20 deletions(-)

diff --git a/lib/ldb/tests/ldb_mod_op_test.c b/lib/ldb/tests/ldb_mod_op_test.c
index db0c594b430..dfc0209260e 100644
--- a/lib/ldb/tests/ldb_mod_op_test.c
+++ b/lib/ldb/tests/ldb_mod_op_test.c
@@ -202,6 +202,16 @@ static void test_ldif_message_redacted(void **state)
 static int ldbtest_setup(void **state)
 {
 	struct ldbtest_ctx *test_ctx;
+	struct ldb_ldif *ldif;
+#ifdef GUID_IDX
+	const char *index_ldif =		\
+		"dn: @INDEXLIST\n"
+		"@IDXGUID: objectUUID\n"
+		"@IDX_DN_GUID: GUID\n"
+		"\n";
+#else
+	const char *index_ldif = "\n";
+#endif
 	int ret;
 
 	ldbtest_noconn_setup((void **) &test_ctx);
@@ -209,6 +219,10 @@ static int ldbtest_setup(void **state)
 	ret = ldb_connect(test_ctx->ldb, test_ctx->dbpath, 0, NULL);
 	assert_int_equal(ret, 0);
 
+	while ((ldif = ldb_ldif_read_string(test_ctx->ldb, &index_ldif))) {
+		ret = ldb_add(test_ctx->ldb, ldif->msg);
+		assert_int_equal(ret, LDB_SUCCESS);
+	}
 	*state = test_ctx;
 	return 0;
 }
@@ -241,6 +255,9 @@ static void test_ldb_add(void **state)
 	ret = ldb_msg_add_string(msg, "cn", "test_cn_val");
 	assert_int_equal(ret, 0);
 
+	ret = ldb_msg_add_string(msg, "objectUUID", "0123456789abcdef");
+	assert_int_equal(ret, 0);
+
 	ret = ldb_add(test_ctx->ldb, msg);
 	assert_int_equal(ret, 0);
 
@@ -279,6 +296,9 @@ static void test_ldb_search(void **state)
 	ret = ldb_msg_add_string(msg, "cn", "test_cn_val1");
 	assert_int_equal(ret, 0);
 
+	ret = ldb_msg_add_string(msg, "objectUUID", "0123456789abcde1");
+	assert_int_equal(ret, 0);
+
 	ret = ldb_add(test_ctx->ldb, msg);
 	assert_int_equal(ret, 0);
 
@@ -294,6 +314,9 @@ static void test_ldb_search(void **state)
 	ret = ldb_msg_add_string(msg, "cn", "test_cn_val2");
 	assert_int_equal(ret, 0);
 
+	ret = ldb_msg_add_string(msg, "objectUUID", "0123456789abcde2");
+	assert_int_equal(ret, 0);
+
 	ret = ldb_add(test_ctx->ldb, msg);
 	assert_int_equal(ret, 0);
 
@@ -390,7 +413,8 @@ static void assert_dn_doesnt_exist(struct ldbtest_ctx *test_ctx,
 
 static void add_dn_with_cn(struct ldbtest_ctx *test_ctx,
 			   struct ldb_dn *dn,
-			   const char *cn_value)
+			   const char *cn_value,
+			   const char *uuid_value)
 {
 	int ret;
 	TALLOC_CTX *tmp_ctx;
@@ -409,6 +433,9 @@ static void add_dn_with_cn(struct ldbtest_ctx *test_ctx,
 	ret = ldb_msg_add_string(msg, "cn", cn_value);
 	assert_int_equal(ret, LDB_SUCCESS);
 
+	ret = ldb_msg_add_string(msg, "objectUUID", uuid_value);
+	assert_int_equal(ret, 0);
+
 	ret = ldb_add(test_ctx->ldb, msg);
 	assert_int_equal(ret, LDB_SUCCESS);
 
@@ -428,7 +455,9 @@ static void test_ldb_del(void **state)
 	dn = ldb_dn_new_fmt(test_ctx, test_ctx->ldb, "%s", basedn);
 	assert_non_null(dn);
 
-	add_dn_with_cn(test_ctx, dn, "test_del_cn_val");
+	add_dn_with_cn(test_ctx, dn,
+		       "test_del_cn_val",
+		       "0123456789abcdef");
 
 	ret = ldb_delete(test_ctx->ldb, dn);
 	assert_int_equal(ret, LDB_SUCCESS);
@@ -552,7 +581,8 @@ static void test_ldb_build_search_req(void **state)
 
 static void add_keyval(struct ldbtest_ctx *test_ctx,
 		       const char *key,
-		       const char *val)
+		       const char *val,
+		       const char *uuid)
 {
 	int ret;
 	struct ldb_message *msg;
@@ -566,6 +596,9 @@ static void add_keyval(struct ldbtest_ctx *test_ctx,
 	ret = ldb_msg_add_string(msg, key, val);
 	assert_int_equal(ret, 0);
 
+	ret = ldb_msg_add_string(msg, "objectUUID", uuid);
+	assert_int_equal(ret, 0);
+
 	ret = ldb_add(test_ctx->ldb, msg);
 	assert_int_equal(ret, 0);
 
@@ -601,7 +634,8 @@ static void test_transactions(void **state)
 	ret = ldb_transaction_start(test_ctx->ldb);
 	assert_int_equal(ret, 0);
 
-	add_keyval(test_ctx, "vegetable", "carrot");
+	add_keyval(test_ctx, "vegetable", "carrot",
+		   "0123456789abcde0");
 
 	/* commit lev-0 transaction */
 	ret = ldb_transaction_commit(test_ctx->ldb);
@@ -611,7 +645,8 @@ static void test_transactions(void **state)
 	ret = ldb_transaction_start(test_ctx->ldb);
 	assert_int_equal(ret, 0);
 
-	add_keyval(test_ctx, "fruit", "apple");
+	add_keyval(test_ctx, "fruit", "apple",
+		   "0123456789abcde1");
 
 	/* abort lev-1 nested transaction */
 	ret = ldb_transaction_cancel(test_ctx->ldb);
@@ -637,14 +672,16 @@ static void test_nested_transactions(void **state)
 	ret = ldb_transaction_start(test_ctx->ldb);
 	assert_int_equal(ret, 0);
 
-	add_keyval(test_ctx, "vegetable", "carrot");
+	add_keyval(test_ctx, "vegetable", "carrot",
+		   "0123456789abcde0");
 
 
 	/* start another lev-1 nested transaction */
 	ret = ldb_transaction_start(test_ctx->ldb);
 	assert_int_equal(ret, 0);
 
-	add_keyval(test_ctx, "fruit", "apple");
+	add_keyval(test_ctx, "fruit", "apple",
+		   "0123456789abcde1");
 
 	/* abort lev-1 nested transaction */
 	ret = ldb_transaction_cancel(test_ctx->ldb);
@@ -825,6 +862,7 @@ static int ldb_modify_test_setup(void **state)
 	struct ldb_mod_test_ctx *mod_test_ctx;
 	struct keyval kvs[] = {
 		{ "cn", "test_mod_cn" },
+		{ "objectUUID", "0123456789abcdef"},
 		{ NULL, NULL },
 	};
 
@@ -1129,6 +1167,7 @@ static int ldb_search_test_setup(void **state)
 		{ "cn", "test_search_cn2" },
 		{ "uid", "test_search_uid" },
 		{ "uid", "test_search_uid2" },
+		{ "objectUUID", "0123456789abcde0"},
 		{ NULL, NULL },
 	};
 	struct keyval kvs2[] = {
@@ -1136,6 +1175,7 @@ static int ldb_search_test_setup(void **state)
 		{ "cn", "test_search_2_cn2" },
 		{ "uid", "test_search_2_uid" },
 		{ "uid", "test_search_2_uid2" },
+		{ "objectUUID", "0123456789abcde1"},
 		{ NULL, NULL },
 	};
 
@@ -1532,6 +1572,12 @@ static int test_ldb_search_against_transaction_callback1(struct ldb_request *req
 			exit(LDB_ERR_OPERATIONS_ERROR);
 		}
 
+		ret = ldb_msg_add_string(msg, "objectUUID",
+					 "0123456789abcdef");
+		if (ret != 0) {
+			exit(ret);
+		}
+
 		ret = ldb_add(ctx->test_ctx->ldb, msg);
 		if (ret != 0) {
 			exit(ret);
@@ -1894,13 +1940,16 @@ static void test_ldb_modify_during_search(void **state, bool add_index,
 
 		ret = ldb_msg_add_string(msg, "@IDXATTR", "cn");
 		assert_int_equal(ret, LDB_SUCCESS);
-
 		ret = ldb_add(search_test_ctx->ldb_test_ctx->ldb,
 			      msg);
-
+		if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
+			msg->elements[0].flags = LDB_FLAG_MOD_ADD;
+			ret = ldb_modify(search_test_ctx->ldb_test_ctx->ldb,
+					 msg);
+		}
 		assert_int_equal(ret, LDB_SUCCESS);
 	}
-
+		
 	tevent_loop_allow_nesting(search_test_ctx->ldb_test_ctx->ev);
 
 	ctx.basedn
@@ -2436,6 +2485,7 @@ static int ldb_case_test_setup(void **state)
 	struct keyval kvs[] = {
 		{ "cn", "CaseInsensitiveValue" },
 		{ "uid", "CaseSensitiveValue" },
+		{ "objectUUID", "0123456789abcdef" },
 		{ NULL, NULL },
 	};
 
@@ -2649,6 +2699,11 @@ static void test_ldb_attrs_index_handler(void **state)
 	/* Add the index (actually any modify will do) */
 	while ((ldif = ldb_ldif_read_string(ldb_test_ctx->ldb, &index_ldif))) {
 		ret = ldb_add(ldb_test_ctx->ldb, ldif->msg);
+		if (ret == LDB_ERR_ENTRY_ALREADY_EXISTS) {
+			ldif->msg->elements[0].flags = LDB_FLAG_MOD_ADD;
+			ret = ldb_modify(ldb_test_ctx->ldb,
+					 ldif->msg);
+		}
 		assert_int_equal(ret, LDB_SUCCESS);
 	}
 
@@ -2734,7 +2789,8 @@ static int ldb_rename_test_setup(void **state)
 
 	add_dn_with_cn(ldb_test_ctx,
 		       rename_test_ctx->basedn,
-		       "test_rename_cn_val");
+		       "test_rename_cn_val",
+		       "0123456789abcde0");
 
 	*state = rename_test_ctx;
 	return 0;
@@ -2839,7 +2895,8 @@ static void test_ldb_rename_to_exists(void **state)
 
 	add_dn_with_cn(rename_test_ctx->ldb_test_ctx,
 		       new_dn,
-		       "test_rename_cn_val");
+		       "test_rename_cn_val",
+		       "0123456789abcde1");
 
 	ret = ldb_rename(rename_test_ctx->ldb_test_ctx->ldb,
 			 rename_test_ctx->basedn,
@@ -3006,6 +3063,10 @@ static void test_read_only(void **state)
 		ret = ldb_msg_add_string(msg, "cn", "test_cn_val");
 		assert_int_equal(ret, 0);
 
+		ret = ldb_msg_add_string(msg, "objectUUID",
+					 "0123456789abcde1");
+		assert_int_equal(ret, LDB_SUCCESS);
+
 		ret = ldb_add(ro_ldb, msg);
 		assert_int_equal(ret, LDB_ERR_UNWILLING_TO_PERFORM);
 		TALLOC_FREE(msg);
@@ -3025,6 +3086,10 @@ static void test_read_only(void **state)
 		ret = ldb_msg_add_string(msg, "cn", "test_cn_val");
 		assert_int_equal(ret, 0);
 
+		ret = ldb_msg_add_string(msg, "objectUUID",
+					 "0123456789abcde2");
+		assert_int_equal(ret, LDB_SUCCESS);
+
 		ret = ldb_add(rw_ldb, msg);
 		assert_int_equal(ret, LDB_SUCCESS);
 		TALLOC_FREE(msg);
@@ -3098,6 +3163,10 @@ static int ldb_unique_index_test_setup(void **state)
 	const char *index_ldif =  \
 		"dn: @INDEXLIST\n"
 		"@IDXATTR: cn\n"
+#ifdef GUID_IDX
+		"@IDXGUID: objectUUID\n"
+		"@IDX_DN_GUID: GUID\n"
+#endif
 		"\n";
 	const char *options[] = {"modules:unique_index_test", NULL};
 
@@ -3186,6 +3255,10 @@ static void test_ldb_add_unique_value_to_unique_index(void **state)
 	ret = ldb_msg_add_string(msg, "cn", "test_unique_index");
 	assert_int_equal(ret, LDB_SUCCESS);
 
+	ret = ldb_msg_add_string(msg, "objectUUID",
+				 "0123456789abcde1");
+	assert_int_equal(ret, LDB_SUCCESS);
+
 	ret = ldb_add(test_ctx->ldb, msg);
 	assert_int_equal(ret, LDB_SUCCESS);
 
@@ -3200,6 +3273,10 @@ static int ldb_non_unique_index_test_setup(void **state)
 	const char *index_ldif =  \
 		"dn: @INDEXLIST\n"
 		"@IDXATTR: cn\n"
+#ifdef GUID_IDX
+		"@IDXGUID: objectUUID\n"
+		"@IDX_DN_GUID: GUID\n"
+#endif
 		"\n";
 	const char *options[] = {"modules:unique_index_test", NULL};
 
@@ -3270,6 +3347,10 @@ static void test_ldb_add_duplicate_value_to_unique_index(void **state)
 	ret = ldb_msg_add_string(msg01, "cn", "test_unique_index");
 	assert_int_equal(ret, LDB_SUCCESS);
 
+	ret = ldb_msg_add_string(msg01, "objectUUID",
+				 "0123456789abcde1");
+	assert_int_equal(ret, LDB_SUCCESS);
+
 	ret = ldb_add(test_ctx->ldb, msg01);
 	assert_int_equal(ret, LDB_SUCCESS);
 
@@ -3281,6 +3362,10 @@ static void test_ldb_add_duplicate_value_to_unique_index(void **state)
 
 	ret = ldb_msg_add_string(msg02, "cn", "test_unique_index");
 	assert_int_equal(ret, LDB_SUCCESS);
+	
+	ret = ldb_msg_add_string(msg02, "objectUUID",
+				 "0123456789abcde2");
+	assert_int_equal(ret, LDB_SUCCESS);
 
 	ret = ldb_add(test_ctx->ldb, msg02);
 	assert_int_equal(ret, LDB_ERR_CONSTRAINT_VIOLATION);
@@ -3311,6 +3396,9 @@ static void test_ldb_add_to_index_duplicates_allowed(void **state)
 	ret = ldb_msg_add_string(msg01, "cn", "test_unique_index");
 	assert_int_equal(ret, LDB_SUCCESS);
 
+	ret = ldb_msg_add_string(msg01, "objectUUID",
+				 "0123456789abcde1");
+	assert_int_equal(ret, LDB_SUCCESS);
 	ret = ldb_add(test_ctx->ldb, msg01);
 	assert_int_equal(ret, LDB_SUCCESS);
 
@@ -3323,6 +3411,9 @@ static void test_ldb_add_to_index_duplicates_allowed(void **state)
 	ret = ldb_msg_add_string(msg02, "cn", "test_unique_index");
 	assert_int_equal(ret, LDB_SUCCESS);
 
+	ret = ldb_msg_add_string(msg02, "objectUUID",
+				 "0123456789abcde2");
+	assert_int_equal(ret, LDB_SUCCESS);
 	ret = ldb_add(test_ctx->ldb, msg02);
 	assert_int_equal(ret, LDB_SUCCESS);
 	talloc_free(tmp_ctx);
@@ -3352,6 +3443,9 @@ static void test_ldb_add_to_index_unique_values_required(void **state)
 	ret = ldb_msg_add_string(msg01, "cn", "test_unique_index");
 	assert_int_equal(ret, LDB_SUCCESS);
 
+	ret = ldb_msg_add_string(msg01, "objectUUID",
+				 "0123456789abcde1");
+	assert_int_equal(ret, LDB_SUCCESS);
 	ret = ldb_add(test_ctx->ldb, msg01);
 	assert_int_equal(ret, LDB_SUCCESS);
 
@@ -3364,6 +3458,9 @@ static void test_ldb_add_to_index_unique_values_required(void **state)
 	ret = ldb_msg_add_string(msg02, "cn", "test_unique_index");
 	assert_int_equal(ret, LDB_SUCCESS);
 
+	ret = ldb_msg_add_string(msg02, "objectUUID",
+				 "0123456789abcde2");
+	assert_int_equal(ret, LDB_SUCCESS);
 	ret = ldb_add(test_ctx->ldb, msg02);
 	assert_int_equal(ret, LDB_ERR_CONSTRAINT_VIOLATION);
 	talloc_free(tmp_ctx);
@@ -3389,10 +3486,10 @@ static void test_ldb_unique_index_duplicate_logging(void **state)
 	char *debug_string = NULL;
 	char *p = NULL;
 
-	/* The non-GUID mode is not compatible with mdb */
-	if (strcmp(TEST_BE, "mdb") == 0) {
-		return;
-	}
+	/* The GUID mode is not compatible with this test */
+#ifdef GUID_IDX
+	return;
+#endif
 	
 	ldb_set_debug(test_ctx->ldb, ldb_debug_string, &debug_string);
 	tmp_ctx = talloc_new(test_ctx);
@@ -3407,6 +3504,9 @@ static void test_ldb_unique_index_duplicate_logging(void **state)
 	ret = ldb_msg_add_string(msg01, "cn", "test_unique_index");
 	assert_int_equal(ret, LDB_SUCCESS);
 
+	ret = ldb_msg_add_string(msg01, "objectUUID",
+				 "0123456789abcde1");
+	assert_int_equal(ret, LDB_SUCCESS);
 	ret = ldb_add(test_ctx->ldb, msg01);
 	assert_int_equal(ret, LDB_SUCCESS);
 
@@ -3419,6 +3519,9 @@ static void test_ldb_unique_index_duplicate_logging(void **state)
 	ret = ldb_msg_add_string(msg02, "cn", "test_unique_index");
 	assert_int_equal(ret, LDB_SUCCESS);
 
+	ret = ldb_msg_add_string(msg02, "objectUUID",
+				 "0123456789abcde2");
+	assert_int_equal(ret, LDB_SUCCESS);
 	ret = ldb_add(test_ctx->ldb, msg02);
 	assert_int_equal(ret, LDB_ERR_CONSTRAINT_VIOLATION);
 
@@ -3443,10 +3546,10 @@ static void test_ldb_duplicate_dn_logging(void **state)
 	TALLOC_CTX *tmp_ctx;
 	char *debug_string = NULL;
 
-	/* The non-GUID mode is not compatible with mdb */
-	if (strcmp(TEST_BE, "mdb") == 0) {
-		return;
-	}
+	/* The GUID mode is not compatible with this test */
+#ifdef GUID_IDX
+	return;
+#endif
 	
 	ldb_set_debug(test_ctx->ldb, ldb_debug_string, &debug_string);
 	tmp_ctx = talloc_new(test_ctx);
@@ -3461,6 +3564,9 @@ static void test_ldb_duplicate_dn_logging(void **state)
 	ret = ldb_msg_add_string(msg01, "cn", "test_unique_index01");
 	assert_int_equal(ret, LDB_SUCCESS);
 
+	ret = ldb_msg_add_string(msg01, "objectUUID",
+				 "0123456789abcde1");
+	assert_int_equal(ret, LDB_SUCCESS);
 	ret = ldb_add(test_ctx->ldb, msg01);
 	assert_int_equal(ret, LDB_SUCCESS);
 
@@ -3473,6 +3579,9 @@ static void test_ldb_duplicate_dn_logging(void **state)
 	ret = ldb_msg_add_string(msg02, "cn", "test_unique_index02");
 	assert_int_equal(ret, LDB_SUCCESS);
 
+	ret = ldb_msg_add_string(msg02, "objectUUID",
+				 "0123456789abcde2");
+	assert_int_equal(ret, LDB_SUCCESS);
 	ret = ldb_add(test_ctx->ldb, msg02);
 	assert_int_equal(ret, LDB_ERR_ENTRY_ALREADY_EXISTS);
 
diff --git a/lib/ldb/wscript b/lib/ldb/wscript
index 1455f92eb2e..f959d59b28f 100644
--- a/lib/ldb/wscript
+++ b/lib/ldb/wscript
@@ -353,6 +353,12 @@ def build(bld):
                          deps='cmocka ldb',
                          install=False)
 
+        bld.SAMBA_BINARY('ldb_tdb_guid_mod_op_test',
+                         source='tests/ldb_mod_op_test.c',
+                         cflags='-DTEST_BE=\"tdb\" -DGUID_IDX=1',
+                         deps='cmocka ldb',
+                         install=False)
+
         bld.SAMBA_BINARY('ldb_msg_test',
                          source='tests/ldb_msg.c',
                          deps='cmocka ldb',
@@ -386,6 +392,7 @@ def test(ctx):
 
     cmocka_ret = 0
     for test_exe in ['ldb_tdb_mod_op_test',
+                     'ldb_tdb_guid_mod_op_test',
                      'ldb_msg_test']:
             cmd = os.path.join(Utils.g_module.blddir, test_exe)
             cmocka_ret = cmocka_ret or samba_utils.RUN_COMMAND(cmd)
-- 
2.11.0


From 3cba80b688be4a79df8985f1078a93b3d6b43d40 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Tue, 13 Mar 2018 08:10:54 +1300
Subject: [PATCH 10/23] ldb index: Fix truncation key length calculation
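
The arithmetic behind this fix can be sketched as follows. This is only
an illustration: the helper name usable_index_key_len() and the
KEY_DN_OVERHEAD macro are hypothetical and not part of ldb. The sketch
assumes, as the comment in the patch states, that ltdb_key_dn() adds a
leading "DN=" (3 bytes) and a trailing string terminator (1 byte).

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Bytes added around an index DN: the "DN=" prefix plus the NUL. */
#define KEY_DN_OVERHEAD (strlen("DN=") + 1)

/*
 * Hypothetical helper (not part of ldb): work out how many bytes of
 * the index DN fit once the overhead has been reserved.  Returns
 * false if max_key_length is too small, rather than letting the
 * unsigned subtraction wrap around.
 */
static bool usable_index_key_len(unsigned max_key_length, unsigned *usable)
{
	if (max_key_length < KEY_DN_OVERHEAD) {
		return false;
	}
	*usable = max_key_length - (unsigned)KEY_DN_OVERHEAD;
	return true;
}
```

With the value of 54 used by the Python self test, this leaves 50 bytes
for the index DN, which is the limit the existing test cases were
written against.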

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_index.c   | 18 ++++++++++++++++--
 lib/ldb/tests/python/index.py |  7 +++++--
 2 files changed, 21 insertions(+), 4 deletions(-)

diff --git a/lib/ldb/ldb_tdb/ldb_index.c b/lib/ldb/ldb_tdb/ldb_index.c
index 076db10f2dd..ccbc6443b81 100644
--- a/lib/ldb/ldb_tdb/ldb_index.c
+++ b/lib/ldb/ldb_tdb/ldb_index.c
@@ -848,6 +848,18 @@ static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
 	unsigned indx_len = 0;
 	unsigned frmt_len = 0;
 
+	if (max_key_length < 4) {
+		ldb_asprintf_errstring(
+			ldb,
+			__location__ ": max_key_length of (%u) < 4",
+			max_key_length);
+	}
+	/*
+	 * ltdb_key_dn() makes the key 4 bytes longer: it adds a leading
+	 * "DN=" and a trailing string terminator
+	 */
+	max_key_length -= 4;
+
 	if (attr[0] == '@') {
 		attr_for_dn = attr;
 		v = *value;
@@ -911,12 +923,13 @@ static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
 	if (should_b64_encode) {
 		unsigned vstr_len = 0;
 		char *vstr = ldb_base64_encode(ldb, (char *)v.data, v.length);
+		unsigned num_separators = 3;
 		if (!vstr) {
 			talloc_free(attr_folded);
 			return NULL;
 		}
 		vstr_len = strlen(vstr);
-		key_len = 3 + indx_len + attr_len + vstr_len;
+		key_len = num_separators + indx_len + attr_len + vstr_len;
 		if (key_len > max_key_length) {
 			unsigned excess = key_len - max_key_length;
 			frmt_len = vstr_len - excess;
@@ -943,7 +956,8 @@ static struct ldb_dn *ltdb_index_key(struct ldb_context *ldb,
 		}
 		talloc_free(vstr);
 	} else {
-		key_len = 2 + indx_len + attr_len + (int)v.length;
+		unsigned num_separators = 2;
+		key_len = num_separators + indx_len + attr_len + (int)v.length;
 		if (key_len > max_key_length) {
 			unsigned excess = key_len - max_key_length;
 			frmt_len = v.length - excess;
diff --git a/lib/ldb/tests/python/index.py b/lib/ldb/tests/python/index.py
index e9eaaf059e1..1d66ee930d1 100755
--- a/lib/ldb/tests/python/index.py
+++ b/lib/ldb/tests/python/index.py
@@ -113,11 +113,14 @@ class MaxIndexKeyLengthTests(LdbBaseTest):
         super(MaxIndexKeyLengthTests, self).setUp()
         self.testdir = tempdir()
         self.filename = os.path.join(self.testdir, "key_len_test.ldb")
-        # Note that the maximum key length is set to 50
+        # Note that the maximum key length is set to 54.
+        # This accounts for the 4 bytes added by the DN formatting:
+        # a leading "DN=" and a trailing zero terminator.
+        #
         self.l = ldb.Ldb(self.url(),
                          options=[
                              "modules:rdn_name",
-                             "max_key_len_for_self_test:50"])
+                             "max_key_len_for_self_test:54"])
         self.l.add({"dn": "@ATTRIBUTES",
                     "uniqueThing": "UNIQUE_INDEX"})
         self.l.add({"dn": "@INDEXLIST",
-- 
2.11.0


From 81eca0012072d70a621a619f18ef03373d3758d0 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Tue, 13 Mar 2018 08:45:28 +1300
Subject: [PATCH 11/23] ldb index: Add tests for truncated base 64 index keys

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/tests/python/index.py | 37 ++++++++++++++++++++++++++++++++++++-
 1 file changed, 36 insertions(+), 1 deletion(-)

diff --git a/lib/ldb/tests/python/index.py b/lib/ldb/tests/python/index.py
index 1d66ee930d1..9b9e4f3469f 100755
--- a/lib/ldb/tests/python/index.py
+++ b/lib/ldb/tests/python/index.py
@@ -124,7 +124,12 @@ class MaxIndexKeyLengthTests(LdbBaseTest):
         self.l.add({"dn": "@ATTRIBUTES",
                     "uniqueThing": "UNIQUE_INDEX"})
         self.l.add({"dn": "@INDEXLIST",
-                    "@IDXATTR": [b"uniqueThing", b"notUnique"],
+                    "@IDXATTR": [
+                        b"uniqueThing",
+                        b"notUnique",
+                        b"base64____lt",
+                        b"base64_____eq",
+                        b"base64______gt"],
                     "@IDXONE": [b"1"],
                     "@IDXGUID": [b"objectUUID"],
                     "@IDX_DN_GUID": [b"GUID"]})
@@ -870,6 +875,36 @@ class MaxIndexKeyLengthTests(LdbBaseTest):
             contains(res, "OU=12,OU=SEARCH_NON_UNIQUE01,DC=SAMBA,DC=ORG"))
 
     #
+    # Test index key truncation for base64 encoded values
+    #
+    def test_index_truncated_base64_encoded_keys(self):
+        value = b"aaaaaaaaaaaaaaaaaaaa\x02"
+        # base64 encodes to "YWFhYWFhYWFhYWFhYWFhYWFhYWEC"
+
+        # One less than max key length
+        self.l.add({"dn": "OU=01,OU=BASE64,DC=SAMBA,DC=ORG",
+                    "base64____lt": value,
+                    "objectUUID": b"0123456789abcde0"})
+        self.checkGuids(
+            "@INDEX:BASE64____LT::YWFhYWFhYWFhYWFhYWFhYWFhYWEC",
+            b"0123456789abcde0")
+
+        # Equal max key length
+        self.l.add({"dn": "OU=02,OU=BASE64,DC=SAMBA,DC=ORG",
+                    "base64_____eq": value,
+                    "objectUUID": b"0123456789abcde1"})
+        self.checkGuids(
+            "@INDEX:BASE64_____EQ::YWFhYWFhYWFhYWFhYWFhYWFhYWEC",
+            b"0123456789abcde1")
+
+        # One greater than max key length
+        self.l.add({"dn": "OU=03,OU=BASE64,DC=SAMBA,DC=ORG",
+                    "base64______gt": value,
+                    "objectUUID": b"0123456789abcde2"})
+        self.checkGuids(
+            "@INDEX#BASE64______GT##YWFhYWFhYWFhYWFhYWFhYWFhYWE",
+            b"0123456789abcde2")
+    #
     # Test adding to non unique index with identical multivalued index
     # attributes
     #
-- 
2.11.0


From 7a37e8359311fd9098c7fa6bfa9a6b663e97a985 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Tue, 13 Mar 2018 12:43:25 +1300
Subject: [PATCH 12/23] ldb test: close pipes to stop forked tests hanging on
 failure
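
A minimal sketch of the pattern, assuming a child that either sends
"GO" or exits early. The helper wait_for_go() is hypothetical and is
not taken from the test suite. It shows why each side should close the
pipe end it does not use: once the parent has dropped its copy of the
write end, read() returns 0 (end of file) when the child dies, instead
of blocking on a pipe the parent itself is holding open.

```c
#include <assert.h>
#include <stdbool.h>
#include <stdlib.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Hypothetical helper: fork a child that either writes "GO" to the
 * parent or exits early (simulating a failed test step).  Returns the
 * number of bytes the parent read, or -1 on error.
 */
static ssize_t wait_for_go(bool child_fails)
{
	int pipes[2];
	char buf[2];
	ssize_t nread;
	pid_t pid;

	if (pipe(pipes) != 0) {
		return -1;
	}

	pid = fork();
	if (pid == -1) {
		close(pipes[0]);
		close(pipes[1]);
		return -1;
	}
	if (pid == 0) {
		/* Child: only writes, so drop the read end. */
		close(pipes[0]);
		if (child_fails) {
			_exit(EXIT_FAILURE);
		}
		if (write(pipes[1], "GO", 2) != 2) {
			_exit(EXIT_FAILURE);
		}
		_exit(EXIT_SUCCESS);
	}

	/* Parent: only reads, so drop the write end before reading. */
	close(pipes[1]);
	nread = read(pipes[0], buf, sizeof(buf));
	close(pipes[0]);
	waitpid(pid, NULL, 0);
	return nread;
}
```

A caller that expects 2 bytes can then turn a return value of 0 into an
ordinary assertion failure rather than a stalled test run.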

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/tests/ldb_mod_op_test.c | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/lib/ldb/tests/ldb_mod_op_test.c b/lib/ldb/tests/ldb_mod_op_test.c
index dfc0209260e..23decd5b585 100644
--- a/lib/ldb/tests/ldb_mod_op_test.c
+++ b/lib/ldb/tests/ldb_mod_op_test.c
@@ -1524,6 +1524,7 @@ static int test_ldb_search_against_transaction_callback1(struct ldb_request *req
 		struct ldb_message *msg;
 		TALLOC_FREE(ctx->test_ctx->ldb);
 		TALLOC_FREE(ctx->test_ctx->ev);
+		close(pipes[0]);
 		ctx->test_ctx->ev = tevent_context_init(ctx->test_ctx);
 		if (ctx->test_ctx->ev == NULL) {
 			exit(LDB_ERR_OPERATIONS_ERROR);
@@ -1586,7 +1587,7 @@ static int test_ldb_search_against_transaction_callback1(struct ldb_request *req
 		ret = ldb_transaction_commit(ctx->test_ctx->ldb);
 		exit(ret);
 	}
-
+	close(pipes[1]);
 	ret = read(pipes[0], buf, 2);
 	assert_int_equal(ret, 2);
 
@@ -1760,6 +1761,7 @@ static int test_ldb_modify_during_search_callback1(struct ldb_request *req,
 		struct ldb_dn *dn, *new_dn;
 		TALLOC_FREE(ctx->test_ctx->ldb);
 		TALLOC_FREE(ctx->test_ctx->ev);
+		close(pipes[0]);
 		ctx->test_ctx->ev = tevent_context_init(ctx->test_ctx);
 		if (ctx->test_ctx->ev == NULL) {
 			exit(LDB_ERR_OPERATIONS_ERROR);
@@ -1826,6 +1828,7 @@ static int test_ldb_modify_during_search_callback1(struct ldb_request *req,
 		struct ldb_message_element *el;
 		TALLOC_FREE(ctx->test_ctx->ldb);
 		TALLOC_FREE(ctx->test_ctx->ev);
+		close(pipes[0]);
 		ctx->test_ctx->ev = tevent_context_init(ctx->test_ctx);
 		if (ctx->test_ctx->ev == NULL) {
 			exit(LDB_ERR_OPERATIONS_ERROR);
@@ -1901,6 +1904,7 @@ static int test_ldb_modify_during_search_callback1(struct ldb_request *req,
 	 * sending the "GO" as it is blocked at ldb_transaction_start().
 	 */
 
+	close(pipes[1]);
 	ret = read(pipes[0], buf, 2);
 	assert_int_equal(ret, 2);
 
@@ -2075,6 +2079,7 @@ static int test_ldb_modify_during_whole_search_callback1(struct ldb_request *req
 		struct ldb_message_element *el;
 		TALLOC_FREE(ctx->test_ctx->ldb);
 		TALLOC_FREE(ctx->test_ctx->ev);
+		close(pipes[0]);
 		ctx->test_ctx->ev = tevent_context_init(ctx->test_ctx);
 		if (ctx->test_ctx->ev == NULL) {
 			exit(LDB_ERR_OPERATIONS_ERROR);
@@ -2137,6 +2142,7 @@ static int test_ldb_modify_during_whole_search_callback1(struct ldb_request *req
 		exit(ret);
 	}
 
+	close(pipes[1]);
 	ret = read(pipes[0], buf, 2);
 	assert_int_equal(ret, 2);
 
@@ -2349,6 +2355,7 @@ static void test_ldb_modify_before_ldb_wait(void **state)
 		struct ldb_message_element *el;
 		TALLOC_FREE(search_test_ctx->ldb_test_ctx->ldb);
 		TALLOC_FREE(search_test_ctx->ldb_test_ctx->ev);
+		close(pipes[0]);
 		search_test_ctx->ldb_test_ctx->ev = tevent_context_init(search_test_ctx->ldb_test_ctx);
 		if (search_test_ctx->ldb_test_ctx->ev == NULL) {
 			exit(LDB_ERR_OPERATIONS_ERROR);
@@ -2417,6 +2424,7 @@ static void test_ldb_modify_before_ldb_wait(void **state)
 		ret = ldb_transaction_commit(search_test_ctx->ldb_test_ctx->ldb);
 		exit(ret);
 	}
+	close(pipes[1]);
 
 	ret = read(pipes[0], buf, 2);
 	assert_int_equal(ret, 2);
-- 
2.11.0


From 2d0e83b53517b06900e7b246b3cced81d8421f99 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Tue, 13 Mar 2018 16:43:54 +1300
Subject: [PATCH 13/23] ldb-samba: require pid match for cached ldb
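
A simplified sketch of the idea, not the real ldb_wrap code: the
struct cached_handle and cache_lookup() below are hypothetical
stand-ins. The point is only that the PID of the opening process is
part of the cache key, so a child created by fork() never reuses a
handle its parent opened.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>
#include <sys/types.h>
#include <unistd.h>

/* Hypothetical, simplified stand-in for a cached ldb handle. */
struct cached_handle {
	pid_t pid;                  /* process that opened the handle */
	const char *url;            /* database the handle refers to */
	struct cached_handle *next;
};

/* Return a cached handle only if `pid` opened it for the same `url`. */
static struct cached_handle *cache_lookup(struct cached_handle *list,
					  pid_t pid, const char *url)
{
	struct cached_handle *h;

	assert(url != NULL);
	for (h = list; h != NULL; h = h->next) {
		if (h->pid == pid && strcmp(h->url, url) == 0) {
			return h;
		}
	}
	/* No match: the caller must open a fresh handle. */
	return NULL;
}
```

In a forked child getpid() differs from the PID stored in the parent's
entries, so the lookup misses and a new connection is opened.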

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb-samba/ldb_wrap.c | 7 ++++++-
 1 file changed, 6 insertions(+), 1 deletion(-)

diff --git a/lib/ldb-samba/ldb_wrap.c b/lib/ldb-samba/ldb_wrap.c
index 53255556e7c..143e128d5d7 100644
--- a/lib/ldb-samba/ldb_wrap.c
+++ b/lib/ldb-samba/ldb_wrap.c
@@ -94,6 +94,8 @@ static struct ldb_wrap {
 		/* the context is what we use to tell if two ldb
 		 * connections are exactly equivalent
 		 */
+		pid_t pid; /* We want to re-open in a new PID due to
+			    * the LMDB backend */
 		const char *url;
 		struct tevent_context *ev;
 		struct loadparm_context *lp_ctx;
@@ -186,10 +188,12 @@ char *wrap_casefold(void *context, void *mem_ctx, const char *s, size_t n)
 				   struct cli_credentials *credentials,
 				   unsigned int flags)
 {
+	pid_t pid = getpid();
 	struct ldb_wrap *w;
 	/* see if we can re-use an existing ldb */
 	for (w=ldb_wrap_list; w; w=w->next) {
-		if (w->context.ev == ev &&
+		if (w->context.pid == pid &&
+		    w->context.ev == ev &&
 		    w->context.lp_ctx == lp_ctx &&
 		    w->context.session_info == session_info &&
 		    w->context.credentials == credentials &&
@@ -249,6 +253,7 @@ int samba_ldb_connect(struct ldb_context *ldb, struct loadparm_context *lp_ctx,
 		return false;
 	}
 
+	c.pid          = getpid();
 	c.url          = url;
 	c.ev           = ev;
 	c.lp_ctx       = lp_ctx;
-- 
2.11.0


From 615e6931b9e5dd55b08b04429430b35a3a13dc08 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Tue, 20 Mar 2018 11:20:35 +1300
Subject: [PATCH 14/23] ldb tests: ldb_mod_op_test use correct ldb to create dn

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/tests/ldb_mod_op_test.c | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/ldb/tests/ldb_mod_op_test.c b/lib/ldb/tests/ldb_mod_op_test.c
index 23decd5b585..6582f37f7d2 100644
--- a/lib/ldb/tests/ldb_mod_op_test.c
+++ b/lib/ldb/tests/ldb_mod_op_test.c
@@ -3088,7 +3088,7 @@ static void test_read_only(void **state)
 		msg = ldb_msg_new(tmp_ctx);
 		assert_non_null(msg);
 
-		msg->dn = ldb_dn_new_fmt(msg, ro_ldb, "dc=test");
+		msg->dn = ldb_dn_new_fmt(msg, rw_ldb, "dc=test");
 		assert_non_null(msg->dn);
 
 		ret = ldb_msg_add_string(msg, "cn", "test_cn_val");
-- 
2.11.0


From 7e5c36c96ee520f944f56273269e80fd70fc39ce Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Thu, 15 Mar 2018 11:33:32 +1300
Subject: [PATCH 15/23] ldb_tdb: ltdb_tdb_parse_record map tdb error codes

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_tdb.c | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index 423b5a1f687..bfd3770c320 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -1882,9 +1882,14 @@ static int ltdb_tdb_parse_record(struct ltdb_private *ltdb,
 		.dptr = ldb_key.data,
 		.dsize = ldb_key.length
 	};
+	int ret;
 
-	return tdb_parse_record(ltdb->tdb, key, ltdb_tdb_parse_record_wrapper,
-				&kv_ctx);
+	ret = tdb_parse_record(ltdb->tdb, key, ltdb_tdb_parse_record_wrapper,
+			       &kv_ctx);
+	if (ret == 0) {
+		return LDB_SUCCESS;
+	}
+	return ltdb_err_map(tdb_error(ltdb->tdb));
 }
 
 static const char * ltdb_tdb_name(struct ltdb_private *ltdb)
-- 
2.11.0


From f860bf941d157a0f758b628bf182f9f71e1bc948 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Thu, 15 Mar 2018 11:36:33 +1300
Subject: [PATCH 16/23] ldb_tdb: ltdb_tdb_store require active transaction

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_tdb.c | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index bfd3770c320..395663405b9 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -428,6 +428,9 @@ static int ltdb_tdb_store(struct ltdb_private *ltdb, struct ldb_val ldb_key,
 		.dptr = ldb_data.data,
 		.dsize = ldb_data.length
 	};
+	if (!tdb_transaction_active(ltdb->tdb)){
+		return LDB_ERR_PROTOCOL_ERROR;
+	}
 	return tdb_store(ltdb->tdb, key, data, flags);
 }
 
-- 
2.11.0


From 3ffd4e8d6abe7dac88ded7e644c2d5bb4de4ae86 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Thu, 15 Mar 2018 11:37:06 +1300
Subject: [PATCH 17/23] ldb_tdb: ltdb_tdb_delete require active transaction

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_tdb.c | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index 395663405b9..1151d203c48 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -673,6 +673,9 @@ static int ltdb_tdb_delete(struct ltdb_private *ltdb, struct ldb_val ldb_key)
 		.dptr = ldb_key.data,
 		.dsize = ldb_key.length
 	};
+	if (!tdb_transaction_active(ltdb->tdb)){
+		return LDB_ERR_PROTOCOL_ERROR;
+	}
 	return tdb_delete(ltdb->tdb, tdb_key);
 }
 
-- 
2.11.0


From db2f3cf054c0eadced7d0bed3c7dd65555f0b1c1 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Tue, 20 Mar 2018 11:25:28 +1300
Subject: [PATCH 18/23] ldb: make backends expose if there is an active
 transaction

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_tdb.c | 5 +++++
 lib/ldb/ldb_tdb/ldb_tdb.h | 1 +
 2 files changed, 6 insertions(+)

diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index 1151d203c48..4d86a023a1f 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -1913,6 +1913,10 @@ static bool ltdb_tdb_changed(struct ltdb_private *ltdb)
 	return has_changed;
 }
 
+static bool ltdb_transaction_active(struct ltdb_private *ltdb) {
+	return tdb_transaction_active(ltdb->tdb);
+}
+
 static const struct kv_db_ops key_value_ops = {
 	.store = ltdb_tdb_store,
 	.delete = ltdb_tdb_delete,
@@ -1929,6 +1933,7 @@ static const struct kv_db_ops key_value_ops = {
 	.errorstr = ltdb_errorstr,
 	.name = ltdb_tdb_name,
 	.has_changed = ltdb_tdb_changed,
+	.transaction_active = ltdb_transaction_active,
 };
 
 static void ltdb_callback(struct tevent_context *ev,
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.h b/lib/ldb/ldb_tdb/ldb_tdb.h
index f14666ba88a..72ebb9713f2 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.h
+++ b/lib/ldb/ldb_tdb/ldb_tdb.h
@@ -29,6 +29,7 @@ struct kv_db_ops {
 	const char * (*errorstr)(struct ltdb_private *ltdb);
 	const char * (*name)(struct ltdb_private *ltdb);
 	bool (*has_changed)(struct ltdb_private *ltdb);
+	bool (*transaction_active)(struct ltdb_private *ltdb);
 };
 
 /* this private structure is used by the ltdb backend in the
-- 
2.11.0


From 5bc925a17fc20a74f2771f5f7337e6821e865cc6 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Fri, 23 Mar 2018 11:23:39 +1300
Subject: [PATCH 19/23] ldb: Unwind transaction counter if start_transaction
 fails

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
---
 lib/ldb/common/ldb.c | 1 +
 1 file changed, 1 insertion(+)

diff --git a/lib/ldb/common/ldb.c b/lib/ldb/common/ldb.c
index a4d9977d1b4..2249089d087 100644
--- a/lib/ldb/common/ldb.c
+++ b/lib/ldb/common/ldb.c
@@ -379,6 +379,7 @@ int ldb_transaction_start(struct ldb_context *ldb)
 				"ldb transaction start: %s (%d)",
 				ldb_strerror(status),
 				status);
+			ldb->transaction_active--;
 		}
 		if ((next_module && next_module->ldb->flags & LDB_FLG_ENABLE_TRACING)) {
 			ldb_debug(next_module->ldb, LDB_DEBUG_TRACE, "start ldb transaction error: %s",
-- 
2.11.0


From b06357c9286acd58ae2bab2afffb8a46f4e6ba7f Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Fri, 23 Mar 2018 11:27:10 +1300
Subject: [PATCH 20/23] ldb: Do not make search or DB modifications without a
 lock

Previously the ldb_cache startup code took neither a read lock nor a
sufficiently wide write transaction.

The new code takes a read lock. If it needs to write, it gives up the
read lock, takes a write lock (a transaction) and re-reads before
continuing.
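
The control flow can be sketched with a toy in-memory backend. Every
name below (struct toy_db, toy_cache_load() and the lock helpers) is
hypothetical and only mirrors the shape of the change; it is not the
ldb_tdb implementation.

```c
#include <assert.h>
#include <stdbool.h>

/* Hypothetical in-memory stand-in for a key-value backend. */
struct toy_db {
	bool has_baseinfo;   /* does the baseinfo record exist? */
	int read_locks;      /* read locks currently held */
	bool write_txn;      /* is a write transaction open? */
};

static void lock_read(struct toy_db *db)    { db->read_locks++; }
static void unlock_read(struct toy_db *db)  { db->read_locks--; }
static void begin_write(struct toy_db *db)  { db->write_txn = true; }
static void finish_write(struct toy_db *db) { db->write_txn = false; }

/* Reads are only legal under a read lock or a write transaction. */
static bool read_baseinfo(const struct toy_db *db)
{
	assert(db->read_locks > 0 || db->write_txn);
	return db->has_baseinfo;
}

/* Load the cache, creating the baseinfo record if it is missing. */
static bool toy_cache_load(struct toy_db *db)
{
	bool have_write_txn = false;
	bool found;

	lock_read(db);
	found = read_baseinfo(db);
	if (!found) {
		/* Give up the read lock, try again with a write lock. */
		unlock_read(db);
		begin_write(db);
		have_write_txn = true;
		db->has_baseinfo = true;    /* initialise the record */
		found = read_baseinfo(db);  /* re-read before continuing */
	}

	/* Release whichever lock we ended up holding. */
	if (have_write_txn) {
		finish_write(db);
	} else {
		unlock_read(db);
	}
	return found;
}
```

The first load of an empty database takes the write path; later loads
stay on the read-lock path, and in both cases no lock is left held on
return.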

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/ldb_tdb/ldb_cache.c | 50 ++++++++++++++++++++++++++++++++++-----------
 1 file changed, 38 insertions(+), 12 deletions(-)

diff --git a/lib/ldb/ldb_tdb/ldb_cache.c b/lib/ldb/ldb_tdb/ldb_cache.c
index 4790bcd7e53..0d16452b913 100644
--- a/lib/ldb/ldb_tdb/ldb_cache.c
+++ b/lib/ldb/ldb_tdb/ldb_cache.c
@@ -386,6 +386,7 @@ int ltdb_cache_load(struct ldb_module *module)
 	uint64_t seq;
 	struct ldb_message *baseinfo = NULL, *options = NULL;
 	const struct ldb_schema_attribute *a;
+	bool have_write_txn = false;
 	int r;
 
 	ldb = ldb_module_get_ctx(module);
@@ -406,29 +407,38 @@ int ltdb_cache_load(struct ldb_module *module)
 	baseinfo_dn = ldb_dn_new(baseinfo, ldb, LTDB_BASEINFO);
 	if (baseinfo_dn == NULL) goto failed;
 
+	r = ltdb->kv_ops->lock_read(module);
+	if (r != LDB_SUCCESS) {
+		goto failed;
+	}
 	r= ltdb_search_dn1(module, baseinfo_dn, baseinfo, 0);
 	if (r != LDB_SUCCESS && r != LDB_ERR_NO_SUCH_OBJECT) {
-		goto failed;
+		goto failed_and_unlock;
 	}
-	
+
 	/* possibly initialise the baseinfo */
 	if (r == LDB_ERR_NO_SUCH_OBJECT) {
 
+		/* Give up the read lock, try again with a write lock */
+		r = ltdb->kv_ops->unlock_read(module);
+		if (r != LDB_SUCCESS) {
+			goto failed;
+		}
+		
 		if (ltdb->kv_ops->begin_write(ltdb) != 0) {
 			goto failed;
 		}
 
+		have_write_txn = true;
+		
 		/* error handling for ltdb_baseinfo_init() is by
 		   looking for the record again. */
 		ltdb_baseinfo_init(module);
 
-		if (ltdb->kv_ops->finish_write(ltdb) != 0) {
-			goto failed;
-		}
-
 		if (ltdb_search_dn1(module, baseinfo_dn, baseinfo, 0) != LDB_SUCCESS) {
-			goto failed;
+			goto failed_and_unlock;
 		}
+
 	}
 
 	/* Ignore the result, and update the sequence number */
@@ -443,16 +453,17 @@ int ltdb_cache_load(struct ldb_module *module)
 	ltdb->sequence_number = seq;
 
 	/* Read an interpret database options */
+	
 	options = ldb_msg_new(ltdb->cache);
-	if (options == NULL) goto failed;
+	if (options == NULL) goto failed_and_unlock;
 
 	options_dn = ldb_dn_new(options, ldb, LTDB_OPTIONS);
-	if (options_dn == NULL) goto failed;
+	if (options_dn == NULL) goto failed_and_unlock;
 
 	r= ltdb_search_dn1(module, options_dn, options, 0);
 	talloc_free(options_dn);
 	if (r != LDB_SUCCESS && r != LDB_ERR_NO_SUCH_OBJECT) {
-		goto failed;
+		goto failed_and_unlock;
 	}
 	
 	/* set flags if they do exist */
@@ -479,7 +490,7 @@ int ltdb_cache_load(struct ldb_module *module)
 	ltdb_attributes_unload(module);
 
 	if (ltdb_index_load(module, ltdb) == -1) {
-		goto failed;
+		goto failed_and_unlock;
 	}
 
 	/*
@@ -488,7 +499,7 @@ int ltdb_cache_load(struct ldb_module *module)
 	 * partition module.
 	 */
 	if (ltdb_attributes_load(module) == -1) {
-		goto failed;
+		goto failed_and_unlock;
 	}
 
 	ltdb->GUID_index_syntax = NULL;
@@ -503,10 +514,25 @@ int ltdb_cache_load(struct ldb_module *module)
 	}
 
 done:
+	if (have_write_txn) {
+		if (ltdb->kv_ops->finish_write(ltdb) != 0) {
+			goto failed;
+		}
+	} else {
+		ltdb->kv_ops->unlock_read(module);
+	}
+	
 	talloc_free(options);
 	talloc_free(baseinfo);
 	return 0;
 
+failed_and_unlock:
+	if (have_write_txn) {
+		ltdb->kv_ops->abort_write(ltdb);
+	} else {
+		ltdb->kv_ops->unlock_read(module);
+	}
+	
 failed:
 	talloc_free(options);
 	talloc_free(baseinfo);
-- 
2.11.0


From 435f6be561a3c73cd0100b561960cfc0c171bfb3 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Fri, 23 Mar 2018 11:28:18 +1300
Subject: [PATCH 21/23] ldb_tdb: Disallow TDB nested transactions and use
 tdb_transaction_active()

This avoids keeping a counter, which can be error-prone.

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
---
 lib/ldb/ldb_tdb/ldb_tdb.c | 27 +++++++++++++++------------
 lib/ldb/ldb_tdb/ldb_tdb.h |  1 -
 2 files changed, 15 insertions(+), 13 deletions(-)

diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index 4d86a023a1f..83dc38bdcec 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -101,7 +101,7 @@ static int ltdb_lock_read(struct ldb_module *module)
 	int tdb_ret = 0;
 	int ret;
 
-	if (ltdb->in_transaction == 0 &&
+	if (tdb_transaction_active(ltdb->tdb) == false &&
 	    ltdb->read_lock_count == 0) {
 		tdb_ret = tdb_lockall_read(ltdb->tdb);
 	}
@@ -128,7 +128,7 @@ static int ltdb_unlock_read(struct ldb_module *module)
 {
 	void *data = ldb_module_get_private(module);
 	struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
-	if (ltdb->in_transaction == 0 && ltdb->read_lock_count == 1) {
+	if (!tdb_transaction_active(ltdb->tdb) && ltdb->read_lock_count == 1) {
 		tdb_unlockall_read(ltdb->tdb);
 		ltdb->read_lock_count--;
 		return 0;
@@ -379,7 +379,7 @@ static int ltdb_modified(struct ldb_module *module, struct ldb_dn *dn)
 
 	/* only allow modifies inside a transaction, otherwise the
 	 * ldb is unsafe */
-	if (ltdb->in_transaction == 0) {
+	if (ltdb->kv_ops->transaction_active(ltdb) == false) {
 		ldb_set_errstring(ldb_module_get_ctx(module), "ltdb modify without transaction");
 		return LDB_ERR_OPERATIONS_ERROR;
 	}
@@ -1467,7 +1467,6 @@ static int ltdb_start_trans(struct ldb_module *module)
 		return ltdb->kv_ops->error(ltdb);
 	}
 
-	ltdb->in_transaction++;
 
 	ltdb_index_transaction_start(module);
 
@@ -1488,8 +1487,11 @@ static int ltdb_prepare_commit(struct ldb_module *module)
 	void *data = ldb_module_get_private(module);
 	struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
 
-	if (ltdb->in_transaction != 1) {
-		return LDB_SUCCESS;
+	if (!ltdb->kv_ops->transaction_active(ltdb)) {
+		ldb_set_errstring(ldb_module_get_ctx(module),
+				  "ltdb_prepare_commit() called "
+				  "without transaction active");
+		return LDB_ERR_OPERATIONS_ERROR;
 	}
 
 	/*
@@ -1513,13 +1515,11 @@ static int ltdb_prepare_commit(struct ldb_module *module)
 	ret = ltdb_index_transaction_commit(module);
 	if (ret != LDB_SUCCESS) {
 		ltdb->kv_ops->abort_write(ltdb);
-		ltdb->in_transaction--;
 		return ret;
 	}
 
 	if (ltdb->kv_ops->prepare_write(ltdb) != 0) {
 		ret = ltdb->kv_ops->error(ltdb);
-		ltdb->in_transaction--;
 		ldb_debug_set(ldb_module_get_ctx(module),
 			      LDB_DEBUG_FATAL,
 			      "Failure during "
@@ -1547,7 +1547,6 @@ static int ltdb_end_trans(struct ldb_module *module)
 		}
 	}
 
-	ltdb->in_transaction--;
 	ltdb->prepared_commit = false;
 
 	if (ltdb->kv_ops->finish_write(ltdb) != 0) {
@@ -1567,7 +1566,6 @@ static int ltdb_del_trans(struct ldb_module *module)
 	void *data = ldb_module_get_private(module);
 	struct ltdb_private *ltdb = talloc_get_type(data, struct ltdb_private);
 
-	ltdb->in_transaction--;
 
 	if (ltdb_index_transaction_cancel(module) != 0) {
 		ltdb->kv_ops->abort_write(ltdb);
@@ -1797,7 +1795,7 @@ static int ltdb_tdb_traverse_fn(struct ltdb_private *ltdb, ldb_kv_traverse_fn fn
 		.ctx = ctx,
 		.ltdb = ltdb
 	};
-	if (ltdb->in_transaction != 0) {
+	if (tdb_transaction_active(ltdb->tdb)) {
 		return tdb_traverse(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
 	} else {
 		return tdb_traverse_read(ltdb->tdb, ldb_tdb_traverse_fn_wrapper, &kv_ctx);
@@ -1890,6 +1888,11 @@ static int ltdb_tdb_parse_record(struct ltdb_private *ltdb,
 	};
 	int ret;
 
+	if (tdb_transaction_active(ltdb->tdb) == false &&
+	    ltdb->read_lock_count == 0) {
+		return LDB_ERR_PROTOCOL_ERROR;
+	}
+
 	ret = tdb_parse_record(ltdb->tdb, key, ltdb_tdb_parse_record_wrapper,
 			       &kv_ctx);
 	if (ret == 0) {
@@ -2196,7 +2199,7 @@ int ltdb_connect(struct ldb_context *ldb, const char *url,
 		path = url;
 	}
 
-	tdb_flags = TDB_DEFAULT | TDB_SEQNUM;
+	tdb_flags = TDB_DEFAULT | TDB_SEQNUM | TDB_DISALLOW_NESTING;
 
 	/* check for the 'nosync' option */
 	if (flags & LDB_FLG_NOSYNC) {
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.h b/lib/ldb/ldb_tdb/ldb_tdb.h
index 72ebb9713f2..4d531208da6 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.h
+++ b/lib/ldb/ldb_tdb/ldb_tdb.h
@@ -53,7 +53,6 @@ struct ltdb_private {
 		const char *GUID_index_dn_component;
 	} *cache;
 
-	int in_transaction;
 
 	bool check_base;
 	bool disallow_dn_filter;
-- 
2.11.0


From eb82fa0f089a35e66e2855e0b6ec4b56a70ac303 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Tue, 20 Mar 2018 12:14:10 +1300
Subject: [PATCH 22/23] ldb tests: api ensure database correctly populated

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
 lib/ldb/tests/python/api.py | 13 +++++++++++++
 1 file changed, 13 insertions(+)

diff --git a/lib/ldb/tests/python/api.py b/lib/ldb/tests/python/api.py
index 12409a8c991..6a3790afa36 100755
--- a/lib/ldb/tests/python/api.py
+++ b/lib/ldb/tests/python/api.py
@@ -1221,6 +1221,19 @@ class AddModifyTests(LdbBaseTest):
                     "name": b"Admins",
                     "x": "z", "y": "a",
                     "objectUUID": b"0123456789abcde2"})
+
+        res2 = self.l.search(base="DC=SAMBA,DC=ORG",
+                             scope=ldb.SCOPE_SUBTREE,
+                             expression="(objectUUID=0123456789abcde1)")
+        self.assertEqual(len(res2), 1)
+        self.assertEqual(str(res2[0].dn), "OU=DUP,DC=SAMBA,DC=ORG")
+
+        res3 = self.l.search(base="DC=SAMBA,DC=ORG",
+                             scope=ldb.SCOPE_SUBTREE,
+                             expression="(objectUUID=0123456789abcde2)")
+        self.assertEqual(len(res3), 1)
+        self.assertEqual(str(res3[0].dn), "OU=DUP2,DC=SAMBA,DC=ORG")
+
         try:
             self.l.rename("OU=DUP,DC=SAMBA,DC=ORG",
                           "OU=DUP2,DC=SAMBA,DC=ORG")
-- 
2.11.0


From 7c0d9a26fa943cf7f8aa584b770e9187319bacfe Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Tue, 20 Mar 2018 12:15:12 +1300
Subject: [PATCH 23/23] ldb tests: add cmocka tests of kv operations

Add tests for the behaviour the ldb layer expects the key value layer to
provide.  This should make it easier to add another KV store.

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
Reviewed-by: Andrew Bartlett <abartlet at samba.org>
---
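Reviewer note: the contract these tests pin down can be illustrated with
a toy in-memory store. This is only a sketch under stated assumptions:
every toy_* name below is hypothetical and is not part of ldb, tdb or
lmdb, and the copy-on-begin scheme is a stand-in for whatever the real
backend does. It shows writes refused outside a transaction, nesting
refused, reads refused without a read lock or transaction, and aborted
writes discarded.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Hypothetical toy store; none of these names exist in ldb or tdb. */
#define TOY_SLOTS 8
#define TOY_LEN   32

enum toy_err {
	TOY_OK = 0,
	TOY_ERR_PROTOCOL,	/* no transaction or read lock held */
	TOY_ERR_NO_SUCH_OBJECT,	/* key not present */
	TOY_ERR_FULL		/* no free slot, or key/value too long */
};

struct toy_rec {
	bool used;
	char key[TOY_LEN];
	char val[TOY_LEN];
};

struct toy_kv {
	struct toy_rec committed[TOY_SLOTS];	/* what readers see */
	struct toy_rec working[TOY_SLOTS];	/* private to the open txn */
	bool in_write;
	int read_locks;
};

int toy_begin_write(struct toy_kv *kv)
{
	if (kv->in_write) {
		return TOY_ERR_PROTOCOL;	/* nested transactions refused */
	}
	memcpy(kv->working, kv->committed, sizeof(kv->working));
	kv->in_write = true;
	return TOY_OK;
}

/* Commit (publish the working copy) or abort (discard it). */
int toy_end_write(struct toy_kv *kv, bool commit)
{
	if (!kv->in_write) {
		return TOY_ERR_PROTOCOL;
	}
	if (commit) {
		memcpy(kv->committed, kv->working, sizeof(kv->committed));
	}
	kv->in_write = false;
	return TOY_OK;
}

static struct toy_rec *toy_find(struct toy_rec *tbl, const char *key)
{
	int i;

	for (i = 0; i < TOY_SLOTS; i++) {
		if (tbl[i].used && strcmp(tbl[i].key, key) == 0) {
			return &tbl[i];
		}
	}
	return NULL;
}

int toy_store(struct toy_kv *kv, const char *key, const char *val)
{
	struct toy_rec *r = NULL;
	int i;

	if (!kv->in_write) {
		return TOY_ERR_PROTOCOL;
	}
	if (strlen(key) >= TOY_LEN || strlen(val) >= TOY_LEN) {
		return TOY_ERR_FULL;
	}
	/* Overwrite an existing record, else take the first free slot. */
	r = toy_find(kv->working, key);
	for (i = 0; r == NULL && i < TOY_SLOTS; i++) {
		if (!kv->working[i].used) {
			r = &kv->working[i];
		}
	}
	if (r == NULL) {
		return TOY_ERR_FULL;
	}
	r->used = true;
	strcpy(r->key, key);
	strcpy(r->val, val);
	return TOY_OK;
}

int toy_fetch(struct toy_kv *kv, const char *key, const char **val)
{
	struct toy_rec *r = NULL;

	assert(val != NULL);
	if (!kv->in_write && kv->read_locks == 0) {
		return TOY_ERR_PROTOCOL;
	}
	/* Inside a write transaction the reader sees its own changes. */
	r = toy_find(kv->in_write ? kv->working : kv->committed, key);
	if (r == NULL) {
		return TOY_ERR_NO_SUCH_OBJECT;
	}
	*val = r->val;
	return TOY_OK;
}
```

A caller would zero a struct toy_kv, call toy_begin_write(), toy_store()
and toy_end_write(kv, true), then raise read_locks before toy_fetch().
The real tests in this patch drive the same sequence through
ltdb->kv_ops instead.
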
 lib/ldb/tests/ldb_kv_ops_test.c | 1579 +++++++++++++++++++++++++++++++++++++++
 lib/ldb/wscript                 |    9 +-
 2 files changed, 1587 insertions(+), 1 deletion(-)
 create mode 100644 lib/ldb/tests/ldb_kv_ops_test.c
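
Reviewer note: the two transaction isolation tests in this patch
coordinate a forked reader with the writing parent over a pair of pipes,
exchanging two-byte "GO" tokens. The sketch below isolates just that
handshake so the ordering is easier to follow. The function name
run_handshake() is hypothetical, and the database work is replaced by
comments; it is not code from the patch.

```c
#include <assert.h>	/* callers typically assert on the result */
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/*
 * Hypothetical helper: returns the child's exit status (0 on success),
 * or -1 if the parent side of the handshake failed.
 */
int run_handshake(void)
{
	int to_child[2];
	int to_parent[2];
	char buf[2];
	pid_t pid;
	int wstatus;

	if (pipe(to_child) != 0 || pipe(to_parent) != 0) {
		return -1;
	}

	pid = fork();
	if (pid < 0) {
		return -1;
	}
	if (pid == 0) {
		close(to_child[1]);
		close(to_parent[0]);
		/* 1. Wait until the parent has an open transaction. */
		if (read(to_child[0], buf, 2) != 2) {
			_exit(1);
		}
		/* ... the reader would check uncommitted state here ... */
		/* 2. Tell the parent it may commit. */
		if (write(to_parent[1], "GO", 2) != 2) {
			_exit(2);
		}
		/* 3. Wait for the commit to complete. */
		if (read(to_child[0], buf, 2) != 2) {
			_exit(3);
		}
		/* ... the reader would check committed state here ... */
		_exit(0);
	}

	close(to_child[0]);
	close(to_parent[1]);
	/* ... the writer would begin a transaction and store here ... */
	if (write(to_child[1], "GO", 2) != 2) {
		return -1;
	}
	if (read(to_parent[0], buf, 2) != 2) {
		return -1;
	}
	/* ... the writer would commit here ... */
	if (write(to_child[1], "GO", 2) != 2) {
		return -1;
	}
	if (waitpid(pid, &wstatus, 0) != pid) {
		return -1;
	}
	close(to_child[1]);
	close(to_parent[0]);
	return WIFEXITED(wstatus) ? WEXITSTATUS(wstatus) : -1;
}
```

Because each side blocks on read() until the other has written, the
child can only observe the database at the two points the parent
chooses: with the transaction open, and after the commit.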

diff --git a/lib/ldb/tests/ldb_kv_ops_test.c b/lib/ldb/tests/ldb_kv_ops_test.c
new file mode 100644
index 00000000000..fda49c74d04
--- /dev/null
+++ b/lib/ldb/tests/ldb_kv_ops_test.c
@@ -0,0 +1,1579 @@
+/*
+ * Tests exercising the ldb key value operations.
+ *
+ *  Copyright (C) Andrew Bartlett <abartlet at samba.org> 2018
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+/*
+ * from cmocka.c:
+ * These headers or their equivalents should be included prior to
+ * including
+ * this header file.
+ *
+ * #include <stdarg.h>
+ * #include <stddef.h>
+ * #include <setjmp.h>
+ *
+ * This allows test applications to use custom definitions of C standard
+ * library functions and types.
+ *
+ */
+
+/*
+ * A KV module is expected to have the following behaviour
+ *
+ * - A transaction must be open to perform any read, write or delete operation
+ * - Writes and Deletes should not be visible until a transaction is committed
+ * - Nested transactions are not permitted
+ * - Transactions can be rolled back and committed.
+ * - supports iteration over all records in the database
+ * - supports the update_in_iterate operation allowing entries to be
+ *   re-keyed.
+ */
+#include <stdarg.h>
+#include <stddef.h>
+#include <setjmp.h>
+#include <cmocka.h>
+
+#include <errno.h>
+#include <unistd.h>
+#include <talloc.h>
+
+#define TEVENT_DEPRECATED 1
+#include <tevent.h>
+
+#include <ldb.h>
+#include <ldb_module.h>
+#include <ldb_private.h>
+#include <string.h>
+#include <ctype.h>
+
+#include <sys/wait.h>
+
+#include "lmdb.h"
+#include "ldb_tdb/ldb_tdb.h"
+
+
+#define DEFAULT_BE  "tdb"
+
+#ifndef TEST_BE
+#define TEST_BE DEFAULT_BE
+#endif /* TEST_BE */
+
+#define NUM_RECS 1024
+
+
+struct test_ctx {
+	struct tevent_context *ev;
+	struct ldb_context *ldb;
+
+	const char *dbfile;
+	const char *lockfile;   /* lockfile is separate */
+
+	const char *dbpath;
+};
+
+static void unlink_old_db(struct test_ctx *test_ctx)
+{
+	int ret;
+
+	errno = 0;
+	ret = unlink(test_ctx->lockfile);
+	if (ret == -1 && errno != ENOENT) {
+		fail();
+	}
+
+	errno = 0;
+	ret = unlink(test_ctx->dbfile);
+	if (ret == -1 && errno != ENOENT) {
+		fail();
+	}
+}
+
+static int noconn_setup(void **state)
+{
+	struct test_ctx *test_ctx;
+
+	test_ctx = talloc_zero(NULL, struct test_ctx);
+	assert_non_null(test_ctx);
+
+	test_ctx->ev = tevent_context_init(test_ctx);
+	assert_non_null(test_ctx->ev);
+
+	test_ctx->ldb = ldb_init(test_ctx, test_ctx->ev);
+	assert_non_null(test_ctx->ldb);
+
+	test_ctx->dbfile = talloc_strdup(test_ctx, "kvopstest.ldb");
+	assert_non_null(test_ctx->dbfile);
+
+	test_ctx->lockfile = talloc_asprintf(test_ctx, "%s-lock",
+					     test_ctx->dbfile);
+	assert_non_null(test_ctx->lockfile);
+
+	test_ctx->dbpath = talloc_asprintf(test_ctx,
+			TEST_BE"://%s", test_ctx->dbfile);
+	assert_non_null(test_ctx->dbpath);
+
+	unlink_old_db(test_ctx);
+	*state = test_ctx;
+	return 0;
+}
+
+static int noconn_teardown(void **state)
+{
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+
+	unlink_old_db(test_ctx);
+	talloc_free(test_ctx);
+	return 0;
+}
+
+static int setup(void **state)
+{
+	struct test_ctx *test_ctx;
+	int ret;
+	struct ldb_ldif *ldif;
+	const char *index_ldif =		\
+		"dn: @INDEXLIST\n"
+		"@IDXGUID: objectUUID\n"
+		"@IDX_DN_GUID: GUID\n"
+		"\n";
+
+	noconn_setup((void **) &test_ctx);
+
+	ret = ldb_connect(test_ctx->ldb, test_ctx->dbpath, 0, NULL);
+	assert_int_equal(ret, 0);
+
+	while ((ldif = ldb_ldif_read_string(test_ctx->ldb, &index_ldif))) {
+		ret = ldb_add(test_ctx->ldb, ldif->msg);
+		assert_int_equal(ret, LDB_SUCCESS);
+	}
+	*state = test_ctx;
+	return 0;
+}
+
+static int teardown(void **state)
+{
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+	noconn_teardown((void **) &test_ctx);
+	return 0;
+}
+
+static struct ltdb_private *get_ltdb(struct ldb_context *ldb)
+{
+	void *data = NULL;
+	struct ltdb_private *ltdb = NULL;
+
+	data = ldb_module_get_private(ldb->modules);
+	assert_non_null(data);
+
+	ltdb = talloc_get_type(data, struct ltdb_private);
+	assert_non_null(ltdb);
+
+	return ltdb;
+}
+
+static int parse(struct ldb_val key,
+		 struct ldb_val data,
+		 void *private_data)
+{
+	struct ldb_val* read = private_data;
+
+	/* Yes, we essentially leak this.  That is OK */
+	read->data = talloc_size(talloc_autofree_context(),
+				 data.length);
+	assert_non_null(read->data);
+
+	memcpy(read->data, data.data, data.length);
+	read->length = data.length;
+	return LDB_SUCCESS;
+}
+
+/*
+ * Test that data can be written to the kv store and be read back.
+ */
+static void test_add_get(void **state)
+{
+	int ret;
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+	struct ltdb_private *ltdb = get_ltdb(test_ctx->ldb);
+	uint8_t key_val[] = "TheKey";
+	struct ldb_val key = {
+		.data   = key_val,
+		.length = sizeof(key_val)
+	};
+
+	uint8_t value[] = "The record contents";
+	struct ldb_val data = {
+		.data    = value,
+		.length = sizeof(value)
+	};
+
+	struct ldb_val read;
+
+	int flags = 0;
+	TALLOC_CTX *tmp_ctx;
+
+	tmp_ctx = talloc_new(test_ctx);
+	assert_non_null(tmp_ctx);
+
+	/*
+	 * Begin a transaction
+	 */
+	ret = ltdb->kv_ops->begin_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Write the record
+	 */
+	ret = ltdb->kv_ops->store(ltdb, key, data, flags);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Commit the transaction
+	 */
+	ret = ltdb->kv_ops->finish_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * And now read it back
+	 */
+	ret = ltdb->kv_ops->lock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+
+	ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &read);
+	assert_int_equal(ret, 0);
+
+	assert_int_equal(sizeof(value), read.length);
+	assert_memory_equal(value, read.data, sizeof(value));
+
+	ret = ltdb->kv_ops->unlock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+	talloc_free(tmp_ctx);
+}
+/*
+ * Test that attempts to read data without a read transaction fail.
+ */
+static void test_read_outside_transaction(void **state)
+{
+	int ret;
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+	struct ltdb_private *ltdb = get_ltdb(test_ctx->ldb);
+	uint8_t key_val[] = "TheKey";
+	struct ldb_val key = {
+		.data   = key_val,
+		.length = sizeof(key_val)
+	};
+
+	uint8_t value[] = "The record contents";
+	struct ldb_val data = {
+		.data    = value,
+		.length = sizeof(value)
+	};
+
+	struct ldb_val read;
+
+	int flags = 0;
+	TALLOC_CTX *tmp_ctx;
+
+	tmp_ctx = talloc_new(test_ctx);
+	assert_non_null(tmp_ctx);
+
+	/*
+	 * Begin a transaction
+	 */
+	ret = ltdb->kv_ops->begin_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Write the record
+	 */
+	ret = ltdb->kv_ops->store(ltdb, key, data, flags);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Commit the transaction
+	 */
+	ret = ltdb->kv_ops->finish_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * And now read it back
+	 * Note there is no read transaction active
+	 */
+	ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &read);
+	assert_int_equal(ret, LDB_ERR_PROTOCOL_ERROR);
+
+	talloc_free(tmp_ctx);
+}
+
+/*
+ * Test that data can be deleted from the kv store
+ */
+static void test_delete(void **state)
+{
+	int ret;
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+	struct ltdb_private *ltdb = get_ltdb(test_ctx->ldb);
+	uint8_t key_val[] = "TheKey";
+	struct ldb_val key = {
+		.data   = key_val,
+		.length = sizeof(key_val)
+	};
+
+	uint8_t value[] = "The record contents";
+	struct ldb_val data = {
+		.data    = value,
+		.length = sizeof(value)
+	};
+
+	struct ldb_val read;
+
+	int flags = 0;
+	TALLOC_CTX *tmp_ctx;
+
+	tmp_ctx = talloc_new(test_ctx);
+	assert_non_null(tmp_ctx);
+
+	/*
+	 * Begin a transaction
+	 */
+	ret = ltdb->kv_ops->begin_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Write the record
+	 */
+	ret = ltdb->kv_ops->store(ltdb, key, data, flags);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Commit the transaction
+	 */
+	ret = ltdb->kv_ops->finish_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * And now read it back
+	 */
+	ret = ltdb->kv_ops->lock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+	ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &read);
+	assert_int_equal(ret, 0);
+	assert_int_equal(sizeof(value), read.length);
+	assert_memory_equal(value, read.data, sizeof(value));
+	ret = ltdb->kv_ops->unlock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Begin a transaction
+	 */
+	ret = ltdb->kv_ops->begin_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Now delete it.
+	 */
+	ret = ltdb->kv_ops->delete(ltdb, key);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Commit the transaction
+	 */
+	ret = ltdb->kv_ops->finish_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * And now try to read it back
+	 */
+	ret = ltdb->kv_ops->lock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+	ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &read);
+	assert_int_equal(ret, LDB_ERR_NO_SUCH_OBJECT);
+	ret = ltdb->kv_ops->unlock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+
+	talloc_free(tmp_ctx);
+}
+
+/*
+ * Check that writes are correctly rolled back when a transaction
+ * is rolled back.
+ */
+static void test_transaction_abort_write(void **state)
+{
+	int ret;
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+	struct ltdb_private *ltdb = get_ltdb(test_ctx->ldb);
+	uint8_t key_val[] = "TheKey";
+	struct ldb_val key = {
+		.data   = key_val,
+		.length = sizeof(key_val)
+	};
+
+	uint8_t value[] = "The record contents";
+	struct ldb_val data = {
+		.data    = value,
+		.length = sizeof(value)
+	};
+
+	struct ldb_val read;
+
+	int flags = 0;
+	TALLOC_CTX *tmp_ctx;
+
+	tmp_ctx = talloc_new(test_ctx);
+	assert_non_null(tmp_ctx);
+
+	/*
+	 * Begin a transaction
+	 */
+	ret = ltdb->kv_ops->begin_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Write the record
+	 */
+	ret = ltdb->kv_ops->store(ltdb, key, data, flags);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * And now read it back
+	 */
+	ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &read);
+	assert_int_equal(ret, 0);
+	assert_int_equal(sizeof(value), read.length);
+	assert_memory_equal(value, read.data, sizeof(value));
+
+
+	/*
+	 * Now abort the transaction
+	 */
+	ret = ltdb->kv_ops->abort_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * And now read it back, should not be there
+	 */
+	ret = ltdb->kv_ops->lock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+	ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &read);
+	assert_int_equal(ret, LDB_ERR_NO_SUCH_OBJECT);
+	ret = ltdb->kv_ops->unlock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+
+	talloc_free(tmp_ctx);
+}
+
+/*
+ * Check that deletes are correctly rolled back when a transaction is
+ * aborted.
+ */
+static void test_transaction_abort_delete(void **state)
+{
+	int ret;
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+	struct ltdb_private *ltdb = get_ltdb(test_ctx->ldb);
+	uint8_t key_val[] = "TheKey";
+	struct ldb_val key = {
+		.data   = key_val,
+		.length = sizeof(key_val)
+	};
+
+	uint8_t value[] = "The record contents";
+	struct ldb_val data = {
+		.data    = value,
+		.length = sizeof(value)
+	};
+
+	struct ldb_val read;
+
+	int flags = 0;
+	TALLOC_CTX *tmp_ctx;
+
+	tmp_ctx = talloc_new(test_ctx);
+	assert_non_null(tmp_ctx);
+
+	/*
+	 * Begin a transaction
+	 */
+	ret = ltdb->kv_ops->begin_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Write the record
+	 */
+	ret = ltdb->kv_ops->store(ltdb, key, data, flags);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Commit the transaction
+	 */
+	ret = ltdb->kv_ops->finish_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * And now read it back
+	 */
+	ret = ltdb->kv_ops->lock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+	ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &read);
+	assert_int_equal(ret, 0);
+	assert_int_equal(sizeof(value), read.length);
+	assert_memory_equal(value, read.data, sizeof(value));
+	ret = ltdb->kv_ops->unlock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Begin a transaction
+	 */
+	ret = ltdb->kv_ops->begin_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Now delete it.
+	 */
+	ret = ltdb->kv_ops->delete(ltdb, key);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Try to read it back, it should be gone inside the transaction
+	 */
+	ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &read);
+	assert_int_equal(ret, LDB_ERR_NO_SUCH_OBJECT);
+
+	/*
+	 * Abort the transaction
+	 */
+	ret = ltdb->kv_ops->abort_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * And now read it back, the record should have been restored
+	 */
+	ret = ltdb->kv_ops->lock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+	ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &read);
+	assert_int_equal(ret, 0);
+	assert_int_equal(sizeof(value), read.length);
+	assert_memory_equal(value, read.data, sizeof(value));
+	ret = ltdb->kv_ops->unlock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+
+	talloc_free(tmp_ctx);
+}
+
+/*
+ * Test that writes outside a transaction fail
+ */
+static void test_write_outside_transaction(void **state)
+{
+	int ret;
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+	struct ltdb_private *ltdb = get_ltdb(test_ctx->ldb);
+	uint8_t key_val[] = "TheKey";
+	struct ldb_val key = {
+		.data   = key_val,
+		.length = sizeof(key_val)
+	};
+
+	uint8_t value[] = "The record contents";
+	struct ldb_val data = {
+		.data    = value,
+		.length = sizeof(value)
+	};
+
+
+	int flags = 0;
+	TALLOC_CTX *tmp_ctx;
+
+	tmp_ctx = talloc_new(test_ctx);
+	assert_non_null(tmp_ctx);
+
+	/*
+	 * Attempt to write the record
+	 */
+	ret = ltdb->kv_ops->store(ltdb, key, data, flags);
+	assert_int_equal(ret, LDB_ERR_PROTOCOL_ERROR);
+
+	talloc_free(tmp_ctx);
+}
+
+/*
+ * Test that data cannot be deleted outside a transaction
+ */
+static void test_delete_outside_transaction(void **state)
+{
+	int ret;
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+	struct ltdb_private *ltdb = get_ltdb(test_ctx->ldb);
+	uint8_t key_val[] = "TheKey";
+	struct ldb_val key = {
+		.data   = key_val,
+		.length = sizeof(key_val)
+	};
+
+	uint8_t value[] = "The record contents";
+	struct ldb_val data = {
+		.data    = value,
+		.length = sizeof(value)
+	};
+
+	struct ldb_val read;
+
+	int flags = 0;
+	TALLOC_CTX *tmp_ctx;
+
+	tmp_ctx = talloc_new(test_ctx);
+	assert_non_null(tmp_ctx);
+
+	/*
+	 * Begin a transaction
+	 */
+	ret = ltdb->kv_ops->begin_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Write the record
+	 */
+	ret = ltdb->kv_ops->store(ltdb, key, data, flags);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Commit the transaction
+	 */
+	ret = ltdb->kv_ops->finish_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * And now read it back
+	 */
+	ret = ltdb->kv_ops->lock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+	ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &read);
+	assert_int_equal(ret, 0);
+	assert_int_equal(sizeof(value), read.length);
+	assert_memory_equal(value, read.data, sizeof(value));
+	ret = ltdb->kv_ops->unlock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Now attempt to delete a record
+	 */
+	ret = ltdb->kv_ops->delete(ltdb, key);
+	assert_int_equal(ret, LDB_ERR_PROTOCOL_ERROR);
+
+	/*
+	 * And now read it back
+	 */
+	ret = ltdb->kv_ops->lock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+	ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &read);
+	assert_int_equal(ret, 0);
+	assert_int_equal(sizeof(value), read.length);
+	assert_memory_equal(value, read.data, sizeof(value));
+	ret = ltdb->kv_ops->unlock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+
+	talloc_free(tmp_ctx);
+}
+
+static int traverse_fn(struct ltdb_private *ltdb,
+		       struct ldb_val key,
+		       struct ldb_val data,
+		       void *ctx) {
+
+	int *visits = ctx;
+	int i;
+
+	if (strncmp("key ", (char *) key.data, 4) == 0) {
+		i = strtol((char *) &key.data[4], NULL, 10);
+		visits[i]++;
+	}
+	return LDB_SUCCESS;
+}
+
+
+/*
+ * Test that iterate visits all the records.
+ */
+static void test_iterate(void **state)
+{
+	int ret;
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+	struct ltdb_private *ltdb = get_ltdb(test_ctx->ldb);
+	int i;
+	int num_recs = 1024;
+	int visits[num_recs];
+
+	TALLOC_CTX *tmp_ctx;
+
+	tmp_ctx = talloc_new(test_ctx);
+	assert_non_null(tmp_ctx);
+
+	/*
+	 * Begin a transaction
+	 */
+	ret = ltdb->kv_ops->begin_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Write the records
+	 */
+	for (i = 0; i < num_recs; i++) {
+		struct ldb_val key;
+		struct ldb_val rec;
+		int flags = 0;
+
+		visits[i] = 0;
+		key.data   = (uint8_t *)talloc_asprintf(tmp_ctx, "key %04d", i);
+		key.length = strlen((char *)key.data) + 1;
+
+		rec.data = (uint8_t *) talloc_asprintf(tmp_ctx,
+						       "data for record (%04d)",
+						       i);
+		rec.length = strlen((char *)rec.data) + 1;
+
+		ret = ltdb->kv_ops->store(ltdb, key, rec, flags);
+		assert_int_equal(ret, 0);
+
+		TALLOC_FREE(key.data);
+		TALLOC_FREE(rec.data);
+	}
+
+	/*
+	 * Commit the transaction
+	 */
+	ret = ltdb->kv_ops->finish_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Now iterate over the kv store and ensure that all the
+	 * records are visited.
+	 */
+	ret = ltdb->kv_ops->lock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+	ret = ltdb->kv_ops->iterate(ltdb, traverse_fn, visits);
+	for (i = 0; i < num_recs; i++) {
+		assert_int_equal(1, visits[i]);
+	}
+	ret = ltdb->kv_ops->unlock_read(test_ctx->ldb->modules);
+	assert_int_equal(ret, 0);
+
+	TALLOC_FREE(tmp_ctx);
+}
+
+struct update_context {
+	struct ldb_context* ldb;
+	int visits[NUM_RECS];
+};
+
+static int update_fn(struct ltdb_private *ltdb,
+		     struct ldb_val key,
+		     struct ldb_val data,
+		     void *ctx) {
+
+	struct ldb_val new_key;
+	struct ldb_module *module = NULL;
+	struct update_context *context = NULL;
+	int ret = LDB_SUCCESS;
+	TALLOC_CTX *tmp_ctx;
+
+	tmp_ctx = talloc_new(ltdb);
+	assert_non_null(tmp_ctx);
+
+	context = talloc_get_type_abort(ctx, struct update_context);
+
+	module = talloc_zero(tmp_ctx, struct ldb_module);
+	module->ldb = context->ldb;
+
+	if (strncmp("key ", (char *) key.data, 4) == 0) {
+		int i = strtol((char *) &key.data[4], NULL, 10);
+		context->visits[i]++;
+		new_key.data = talloc_memdup(tmp_ctx, key.data, key.length);
+		new_key.length  = key.length;
+		new_key.data[0] = 'K';
+
+		ret = ltdb->kv_ops->update_in_iterate(ltdb,
+						      key,
+						      new_key,
+						      data,
+						      &module);
+	}
+	TALLOC_FREE(tmp_ctx);
+	return ret;
+}
+
+/*
+ * Test that update_in_iterate behaves as expected.
+ */
+static void test_update_in_iterate(void **state)
+{
+	int ret;
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+	struct ltdb_private *ltdb = get_ltdb(test_ctx->ldb);
+	int i;
+	struct update_context *context = NULL;
+
+
+	TALLOC_CTX *tmp_ctx;
+
+	tmp_ctx = talloc_new(test_ctx);
+	assert_non_null(tmp_ctx);
+
+	context = talloc_zero(tmp_ctx, struct update_context);
+	assert_non_null(context);
+	context->ldb = test_ctx->ldb;
+	/*
+	 * Begin a transaction
+	 */
+	ret = ltdb->kv_ops->begin_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Write the records
+	 */
+	for (i = 0; i < NUM_RECS; i++) {
+		struct ldb_val key;
+		struct ldb_val rec;
+		int flags = 0;
+
+		key.data   = (uint8_t *)talloc_asprintf(tmp_ctx, "key %04d", i);
+		key.length = strlen((char *)key.data) + 1;
+
+		rec.data   = (uint8_t *) talloc_asprintf(tmp_ctx,
+							 "data for record (%04d)",
+							 i);
+		rec.length = strlen((char *)rec.data) + 1;
+
+		ret = ltdb->kv_ops->store(ltdb, key, rec, flags);
+		assert_int_equal(ret, 0);
+
+		TALLOC_FREE(key.data);
+		TALLOC_FREE(rec.data);
+	}
+
+	/*
+	 * Commit the transaction
+	 */
+	ret = ltdb->kv_ops->finish_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Now iterate over the kv store and ensure that all the
+	 * records are visited.
+	 */
+
+	/*
+	 * Needs to be done inside a transaction
+	 */
+	ret = ltdb->kv_ops->begin_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	ret = ltdb->kv_ops->iterate(ltdb, update_fn, context);
+	for (i = 0; i < NUM_RECS; i++) {
+		assert_int_equal(1, context->visits[i]);
+	}
+
+	ret = ltdb->kv_ops->finish_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	TALLOC_FREE(tmp_ctx);
+}
+
+/*
+ * Ensure that writes are not visible until the transaction has been
+ * committed.
+ */
+static void test_write_transaction_isolation(void **state)
+{
+	int ret;
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+	struct ltdb_private *ltdb = get_ltdb(test_ctx->ldb);
+	struct ldb_val key;
+	struct ldb_val val;
+
+	const char *KEY1 = "KEY01";
+	const char *VAL1 = "VALUE01";
+
+	const char *KEY2 = "KEY02";
+	const char *VAL2 = "VALUE02";
+
+	/*
+	 * Pipes etc to co-ordinate the processes
+	 */
+	int to_child[2];
+	int to_parent[2];
+	char buf[2];
+	pid_t pid, w_pid;
+	int wstatus;
+
+	TALLOC_CTX *tmp_ctx;
+	tmp_ctx = talloc_new(test_ctx);
+	assert_non_null(tmp_ctx);
+
+
+	/*
+	 * Add a record to the database
+	 */
+	ret = ltdb->kv_ops->begin_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
+	key.length = strlen(KEY1) + 1;
+
+	val.data = (uint8_t *)talloc_strdup(tmp_ctx, VAL1);
+	val.length = strlen(VAL1) + 1;
+
+	ret = ltdb->kv_ops->store(ltdb, key, val, 0);
+	assert_int_equal(ret, 0);
+
+	ret = ltdb->kv_ops->finish_write(ltdb);
+	assert_int_equal(ret, 0);
+
+
+	ret = pipe(to_child);
+	assert_int_equal(ret, 0);
+	ret = pipe(to_parent);
+	assert_int_equal(ret, 0);
+	/*
+	 * Now fork a new process
+	 */
+
+	pid = fork();
+	if (pid == 0) {
+
+		struct ldb_context *ldb = NULL;
+		close(to_child[1]);
+		close(to_parent[0]);
+
+		/*
+		 * Wait for the transaction to start
+		 */
+		ret = read(to_child[0], buf, 2);
+		if (ret != 2) {
+			print_error(__location__": read returned (%d)\n",
+				    ret);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+		ldb = ldb_init(test_ctx, test_ctx->ev);
+		ret = ldb_connect(ldb, test_ctx->dbpath, 0, NULL);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": ldb_connect returned (%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		ltdb = get_ltdb(ldb);
+
+		ret = ltdb->kv_ops->lock_read(ldb->modules);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": lock_read returned (%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		/*
+		 * Check that KEY1 is there
+		 */
+		key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
+		key.length = strlen(KEY1) + 1;
+
+		ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &val);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": fetch_and_parse returned "
+				    "(%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		if ((strlen(VAL1) + 1) != val.length) {
+			print_error(__location__": KEY1 value lengths different"
+				    ", expected (%d) actual(%d)\n",
+				    (int)(strlen(VAL1) + 1), (int)val.length);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+		if (memcmp(VAL1, val.data, strlen(VAL1)) != 0) {
+			print_error(__location__": KEY1 values different, "
+				    "expected (%s) actual(%s)\n",
+				    VAL1,
+				    val.data);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ret = ltdb->kv_ops->unlock_read(ldb->modules);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": unlock_read returned (%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		/*
+		 * Check that KEY2 is not there
+		 */
+		key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
+		key.length = strlen(KEY2) + 1;
+
+		ret = ltdb->kv_ops->lock_read(ldb->modules);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": lock_read returned (%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &val);
+		if (ret != LDB_ERR_NO_SUCH_OBJECT) {
+			print_error(__location__": fetch_and_parse returned "
+				    "(%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		ret = ltdb->kv_ops->unlock_read(ldb->modules);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": unlock_read returned (%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		/*
+		 * Signal the other process to commit the transaction
+		 */
+		ret = write(to_parent[1], "GO", 2);
+		if (ret != 2) {
+			print_error(__location__": write returned (%d)\n",
+				    ret);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		/*
+		 * Wait for the transaction to be committed
+		 */
+		ret = read(to_child[0], buf, 2);
+		if (ret != 2) {
+			print_error(__location__": read returned (%d)\n",
+				    ret);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		/*
+		 * Check that KEY1 is there
+		 */
+		ret = ltdb->kv_ops->lock_read(ldb->modules);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": lock_read returned (%d)\n",
+				    ret);
+			exit(ret);
+		}
+		key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
+		key.length = strlen(KEY1) + 1;
+
+		ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &val);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": fetch_and_parse returned "
+				    "(%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		if ((strlen(VAL1) + 1) != val.length) {
+			print_error(__location__": KEY1 value lengths different"
+				    ", expected (%d) actual(%d)\n",
+				    (int)(strlen(VAL1) + 1), (int)val.length);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+		if (memcmp(VAL1, val.data, strlen(VAL1)) != 0) {
+			print_error(__location__": KEY1 values different, "
+				    "expected (%s) actual(%s)\n",
+				    VAL1,
+				    val.data);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ret = ltdb->kv_ops->unlock_read(ldb->modules);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": unlock_read returned (%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+
+		/*
+		 * Check that KEY2 is there
+		 */
+		ret = ltdb->kv_ops->lock_read(ldb->modules);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": lock_read returned (%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
+		key.length = strlen(KEY2) + 1;
+
+		ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &val);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": fetch_and_parse returned "
+				    "(%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		if ((strlen(VAL2) + 1) != val.length) {
+			print_error(__location__": KEY2 value lengths different"
+				    ", expected (%d) actual(%d)\n",
+				    (int)(strlen(VAL2) + 1), (int)val.length);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+		if (memcmp(VAL2, val.data, strlen(VAL2)) != 0) {
+			print_error(__location__": KEY2 values different, "
+				    "expected (%s) actual(%s)\n",
+				    VAL2,
+				    val.data);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ret = ltdb->kv_ops->unlock_read(ldb->modules);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": unlock_read returned (%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		exit(0);
+	}
+	close(to_child[0]);
+	close(to_parent[1]);
+
+	/*
+	 * Begin a transaction and add a record to the database
+	 * but leave the transaction open
+	 */
+	ret = ltdb->kv_ops->begin_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
+	key.length = strlen(KEY2) + 1;
+
+	val.data = (uint8_t *)talloc_strdup(tmp_ctx, VAL2);
+	val.length = strlen(VAL2) + 1;
+
+	ret = ltdb->kv_ops->store(ltdb, key, val, 0);
+	assert_int_equal(ret, 0);
+
+	/*
+	 * Signal the child process
+	 */
+	ret = write(to_child[1], "GO", 2);
+	assert_int_equal(2, ret);
+
+	/*
+	 * Wait for the child process to check the DB state while the
+	 * transaction is active
+	 */
+	ret = read(to_parent[0], buf, 2);
+	assert_int_equal(2, ret);
+
+	/*
+	 * commit the transaction
+	 */
+	ret = ltdb->kv_ops->finish_write(ltdb);
+	assert_int_equal(0, ret);
+
+	/*
+	 * Signal the child process
+	 */
+	ret = write(to_child[1], "GO", 2);
+	assert_int_equal(2, ret);
+
+	w_pid = waitpid(pid, &wstatus, 0);
+	assert_int_equal(pid, w_pid);
+
+	assert_true(WIFEXITED(wstatus));
+
+	assert_int_equal(WEXITSTATUS(wstatus), 0);
+
+
+	TALLOC_FREE(tmp_ctx);
+}
+
+/*
+ * Ensure that deletes are not visible until the transaction has been
+ * committed.
+ */
+static void test_delete_transaction_isolation(void **state)
+{
+	int ret;
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+	struct ltdb_private *ltdb = get_ltdb(test_ctx->ldb);
+	struct ldb_val key;
+	struct ldb_val val;
+
+	const char *KEY1 = "KEY01";
+	const char *VAL1 = "VALUE01";
+
+	const char *KEY2 = "KEY02";
+	const char *VAL2 = "VALUE02";
+
+	/*
+	 * Pipes etc to co-ordinate the processes
+	 */
+	int to_child[2];
+	int to_parent[2];
+	char buf[2];
+	pid_t pid, w_pid;
+	int wstatus;
+
+	TALLOC_CTX *tmp_ctx;
+	tmp_ctx = talloc_new(test_ctx);
+	assert_non_null(tmp_ctx);
+
+
+	/*
+	 * Add records to the database
+	 */
+	ret = ltdb->kv_ops->begin_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
+	key.length = strlen(KEY1) + 1;
+
+	val.data = (uint8_t *)talloc_strdup(tmp_ctx, VAL1);
+	val.length = strlen(VAL1) + 1;
+
+	ret = ltdb->kv_ops->store(ltdb, key, val, 0);
+	assert_int_equal(ret, 0);
+
+	key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
+	key.length = strlen(KEY2) + 1;
+
+	val.data = (uint8_t *)talloc_strdup(tmp_ctx, VAL2);
+	val.length = strlen(VAL2) + 1;
+
+	ret = ltdb->kv_ops->store(ltdb, key, val, 0);
+	assert_int_equal(ret, 0);
+
+	ret = ltdb->kv_ops->finish_write(ltdb);
+	assert_int_equal(ret, 0);
+
+
+	ret = pipe(to_child);
+	assert_int_equal(ret, 0);
+	ret = pipe(to_parent);
+	assert_int_equal(ret, 0);
+	/*
+	 * Now fork a new process
+	 */
+
+	pid = fork();
+	if (pid == 0) {
+
+		struct ldb_context *ldb = NULL;
+		close(to_child[1]);
+		close(to_parent[0]);
+
+		/*
+		 * Wait for the transaction to be started
+		 */
+		ret = read(to_child[0], buf, 2);
+		if (ret != 2) {
+			print_error(__location__": read returned (%d)\n",
+				    ret);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ldb = ldb_init(test_ctx, test_ctx->ev);
+		ret = ldb_connect(ldb, test_ctx->dbpath, 0, NULL);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": ldb_connect returned (%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		ltdb = get_ltdb(ldb);
+
+		ret = ltdb->kv_ops->lock_read(ldb->modules);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": lock_read returned (%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		/*
+		 * Check that KEY1 is there
+		 */
+		key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
+		key.length = strlen(KEY1) + 1;
+
+		ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &val);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": fetch_and_parse returned "
+				    "(%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		if ((strlen(VAL1) + 1) != val.length) {
+			print_error(__location__": KEY1 value lengths different"
+				    ", expected (%d) actual(%d)\n",
+				    (int)(strlen(VAL1) + 1), (int)val.length);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+		if (memcmp(VAL1, val.data, strlen(VAL1)) != 0) {
+			print_error(__location__": KEY1 values different, "
+				    "expected (%s) actual(%s)\n",
+				    VAL1,
+				    val.data);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		/*
+		 * Check that KEY2 is there
+		 */
+
+		key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
+		key.length = strlen(KEY2) + 1;
+
+		ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &val);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": fetch_and_parse returned "
+				    "(%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		if ((strlen(VAL2) + 1) != val.length) {
+			print_error(__location__": KEY2 value lengths different"
+				    ", expected (%d) actual(%d)\n",
+				    (int)(strlen(VAL2) + 1), (int)val.length);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+		if (memcmp(VAL2, val.data, strlen(VAL2)) != 0) {
+			print_error(__location__": KEY2 values different, "
+				    "expected (%s) actual(%s)\n",
+				    VAL2,
+				    val.data);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ret = ltdb->kv_ops->unlock_read(ldb->modules);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": unlock_read returned (%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		/*
+		 * Signal the other process to commit the transaction
+		 */
+		ret = write(to_parent[1], "GO", 2);
+		if (ret != 2) {
+			print_error(__location__": write returned (%d)\n",
+				    ret);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		/*
+		 * Wait for the transaction to be committed
+		 */
+		ret = read(to_child[0], buf, 2);
+		if (ret != 2) {
+			print_error(__location__": read returned (%d)\n",
+				    ret);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		/*
+		 * Check that KEY1 is there
+		 */
+		ret = ltdb->kv_ops->lock_read(ldb->modules);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": lock_read returned (%d)\n",
+				    ret);
+			exit(ret);
+		}
+		key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY1);
+		key.length = strlen(KEY1) + 1;
+
+		ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &val);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": fetch_and_parse returned "
+				    "(%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		if ((strlen(VAL1) + 1) != val.length) {
+			print_error(__location__": KEY1 value lengths different"
+				    ", expected (%d) actual(%d)\n",
+				    (int)(strlen(VAL1) + 1), (int)val.length);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+		if (memcmp(VAL1, val.data, strlen(VAL1)) != 0) {
+			print_error(__location__": KEY1 values different, "
+				    "expected (%s) actual(%s)\n",
+				    VAL1,
+				    val.data);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		/*
+		 * Check that KEY2 is not there
+		 */
+		key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
+		key.length = strlen(KEY2) + 1;
+
+		ret = ltdb->kv_ops->lock_read(ldb->modules);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": lock_read returned (%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		ret = ltdb->kv_ops->fetch_and_parse(ltdb, key, parse, &val);
+		if (ret != LDB_ERR_NO_SUCH_OBJECT) {
+			print_error(__location__": fetch_and_parse returned "
+				    "(%d)\n",
+				    ret);
+			exit(LDB_ERR_OPERATIONS_ERROR);
+		}
+
+		ret = ltdb->kv_ops->unlock_read(ldb->modules);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": unlock_read returned (%d)\n",
+				    ret);
+			exit(ret);
+		}
+
+		exit(0);
+	}
+	close(to_child[0]);
+	close(to_parent[1]);
+
+	/*
+	 * Begin a transaction and delete a record from the database
+	 * but leave the transaction open
+	 */
+	ret = ltdb->kv_ops->begin_write(ltdb);
+	assert_int_equal(ret, 0);
+
+	key.data = (uint8_t *)talloc_strdup(tmp_ctx, KEY2);
+	key.length = strlen(KEY2) + 1;
+
+	ret = ltdb->kv_ops->delete(ltdb, key);
+	assert_int_equal(ret, 0);
+	/*
+	 * Signal the child process
+	 */
+	ret = write(to_child[1], "GO", 2);
+	assert_int_equal(2, ret);
+
+	/*
+	 * Wait for the child process to check the DB state while the
+	 * transaction is active
+	 */
+	ret = read(to_parent[0], buf, 2);
+	assert_int_equal(2, ret);
+
+	/*
+	 * commit the transaction
+	 */
+	ret = ltdb->kv_ops->finish_write(ltdb);
+	assert_int_equal(0, ret);
+
+	/*
+	 * Signal the child process
+	 */
+	ret = write(to_child[1], "GO", 2);
+	assert_int_equal(2, ret);
+
+	w_pid = waitpid(pid, &wstatus, 0);
+	assert_int_equal(pid, w_pid);
+
+	assert_true(WIFEXITED(wstatus));
+
+	assert_int_equal(WEXITSTATUS(wstatus), 0);
+
+
+	TALLOC_FREE(tmp_ctx);
+}
+
+
+int main(int argc, const char **argv)
+{
+	const struct CMUnitTest tests[] = {
+		cmocka_unit_test_setup_teardown(
+			test_add_get,
+			setup,
+			teardown),
+		cmocka_unit_test_setup_teardown(
+			test_delete,
+			setup,
+			teardown),
+		cmocka_unit_test_setup_teardown(
+			test_transaction_abort_write,
+			setup,
+			teardown),
+		cmocka_unit_test_setup_teardown(
+			test_transaction_abort_delete,
+			setup,
+			teardown),
+		cmocka_unit_test_setup_teardown(
+			test_read_outside_transaction,
+			setup,
+			teardown),
+		cmocka_unit_test_setup_teardown(
+			test_write_outside_transaction,
+			setup,
+			teardown),
+		cmocka_unit_test_setup_teardown(
+			test_delete_outside_transaction,
+			setup,
+			teardown),
+		cmocka_unit_test_setup_teardown(
+			test_iterate,
+			setup,
+			teardown),
+		cmocka_unit_test_setup_teardown(
+			test_update_in_iterate,
+			setup,
+			teardown),
+		cmocka_unit_test_setup_teardown(
+			test_write_transaction_isolation,
+			setup,
+			teardown),
+		cmocka_unit_test_setup_teardown(
+			test_delete_transaction_isolation,
+			setup,
+			teardown),
+	};
+
+	return cmocka_run_group_tests(tests, NULL, NULL);
+}
diff --git a/lib/ldb/wscript b/lib/ldb/wscript
index f959d59b28f..81fcb55e79b 100644
--- a/lib/ldb/wscript
+++ b/lib/ldb/wscript
@@ -359,6 +359,12 @@ def build(bld):
                          deps='cmocka ldb',
                          install=False)
 
+        bld.SAMBA_BINARY('ldb_tdb_kv_ops_test',
+                         source='tests/ldb_kv_ops_test.c',
+                         cflags='-DTEST_BE=\"tdb\"',
+                         deps='cmocka ldb',
+                         install=False)
+
         bld.SAMBA_BINARY('ldb_msg_test',
                          source='tests/ldb_msg.c',
                          deps='cmocka ldb',
@@ -393,7 +399,8 @@ def test(ctx):
     cmocka_ret = 0
     for test_exe in ['ldb_tdb_mod_op_test',
                      'ldb_tdb_guid_mod_op_test',
-                     'ldb_msg_test']:
+                     'ldb_msg_test',
+                     'ldb_tdb_kv_ops_test']:
             cmd = os.path.join(Utils.g_module.blddir, test_exe)
             cmocka_ret = cmocka_ret or samba_utils.RUN_COMMAND(cmd)
 
-- 
2.11.0
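
For reviewers: both isolation tests in this patch coordinate parent and child with the same two-pipe "GO" handshake. The following is only a minimal, standalone sketch of that handshake with the ldb calls stripped out. The helper names wait_for_go(), send_go() and run_handshake_demo() are hypothetical and are not part of the patch.

```c
#include <assert.h>
#include <string.h>
#include <sys/types.h>
#include <sys/wait.h>
#include <unistd.h>

/* Hypothetical helper: block until the 2-byte "GO" token arrives on fd. */
static int wait_for_go(int fd)
{
	char buf[2];
	ssize_t n;

	assert(fd >= 0);
	n = read(fd, buf, sizeof(buf));
	return (n == 2 && memcmp(buf, "GO", 2) == 0) ? 0 : -1;
}

/* Hypothetical helper: send the 2-byte "GO" token on fd. */
static int send_go(int fd)
{
	assert(fd >= 0);
	return (write(fd, "GO", 2) == 2) ? 0 : -1;
}

/*
 * Returns the child's exit status: 0 if the child saw both phases in
 * order, -1 if the parent side failed.
 */
static int run_handshake_demo(void)
{
	int to_child[2];
	int to_parent[2];
	int wstatus = 0;
	pid_t pid;

	if (pipe(to_child) != 0 || pipe(to_parent) != 0) {
		return -1;
	}

	pid = fork();
	if (pid < 0) {
		return -1;
	}
	if (pid == 0) {
		close(to_child[1]);
		close(to_parent[0]);

		/* Phase 1: the parent holds an open, uncommitted transaction */
		if (wait_for_go(to_child[0]) != 0) {
			_exit(1);
		}
		/* (the real tests check the pre-commit DB state here) */
		if (send_go(to_parent[1]) != 0) {
			_exit(2);
		}

		/* Phase 2: the parent has committed */
		if (wait_for_go(to_child[0]) != 0) {
			_exit(3);
		}
		/* (the real tests check the post-commit DB state here) */
		_exit(0);
	}
	close(to_child[0]);
	close(to_parent[1]);

	/* (begin_write and store/delete would happen here) */
	if (send_go(to_child[1]) != 0) {
		return -1;
	}
	if (wait_for_go(to_parent[0]) != 0) {
		return -1;
	}
	/* (finish_write would happen here) */
	if (send_go(to_child[1]) != 0) {
		return -1;
	}

	if (waitpid(pid, &wstatus, 0) != pid) {
		return -1;
	}
	close(to_child[1]);
	close(to_parent[0]);

	return WIFEXITED(wstatus) ? WEXITSTATUS(wstatus) : -1;
}
```

Because the parent only sends its second token after it has read the child's reply, the two 2-byte writes on to_child can never be merged into a single read in the child.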



More information about the samba-technical mailing list