[PATCH] Make indexes more robust in the event of a failure.

Gary Lockyer gary at catalyst.net.nz
Tue Mar 26 22:55:07 UTC 2019


Ensure that the indexes are consistent in the
event of a failure during a modify or rename operation.

Merge Request: https://gitlab.com/samba-team/samba/merge_requests/342
CI results: https://gitlab.com/samba-team/devel/samba/pipelines/53756334

Ngā mihi
Gary
-------------- next part --------------
From 7736d0c14f2e09c09a4f84ed3053c87895252f29 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Wed, 6 Mar 2019 15:28:45 +1300
Subject: [PATCH 1/8] lib ldb tests: Test for index corruption in modify

Add test to test for index corruption in a failed modify.

Pair-Programmed-With: Andrew Bartlett <abartlet at samba.org>
Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
---
 lib/ldb/tests/python/api.py                  | 68 ++++++++++++++++++++
 selftest/knownfail.d/test_modify_transaction |  3 +
 2 files changed, 71 insertions(+)
 create mode 100644 selftest/knownfail.d/test_modify_transaction

diff --git a/lib/ldb/tests/python/api.py b/lib/ldb/tests/python/api.py
index 0c4e269239b..ca267ae6758 100755
--- a/lib/ldb/tests/python/api.py
+++ b/lib/ldb/tests/python/api.py
@@ -2071,6 +2071,62 @@ class BadIndexTests(LdbBaseTest):
             # https://bugzilla.samba.org/show_bug.cgi?id=13361
             self.assertEquals(len(res), 4)
 
+    def test_modify_transaction(self):
+        self.ldb.add({"dn": "x=y,dc=samba,dc=org",
+                      "objectUUID": b"0123456789abcde1",
+                      "y": "2",
+                      "z": "2"})
+
+        res = self.ldb.search(expression="(y=2)",
+                              base="dc=samba,dc=org")
+        self.assertEqual(len(res), 1)
+
+        self.ldb.add({"dn": "@ATTRIBUTES",
+                      "y": "UNIQUE_INDEX"})
+
+        self.ldb.transaction_start()
+
+        m = ldb.Message()
+        m.dn = ldb.Dn(self.ldb, "x=y,dc=samba,dc=org")
+        m["0"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "y")
+        m["1"] = ldb.MessageElement([], ldb.FLAG_MOD_DELETE, "not-here")
+
+        try:
+            self.ldb.modify(m)
+            self.fail()
+
+        except ldb.LdbError as err:
+            enum = err.args[0]
+            self.assertEqual(enum, ldb.ERR_NO_SUCH_ATTRIBUTE)
+
+        try:
+            self.ldb.transaction_commit()
+            # We should fail here, but we want to be sure
+            # we fail below
+
+        except ldb.LdbError as err:
+            enum = err.args[0]
+            self.assertEqual(enum, ldb.ERR_OPERATIONS_ERROR)
+
+        # The index should still be pointing to x=y
+        res = self.ldb.search(expression="(y=2)",
+                              base="dc=samba,dc=org")
+        self.assertEqual(len(res), 1)
+
+        try:
+            self.ldb.add({"dn": "x=y2,dc=samba,dc=org",
+                        "objectUUID": b"0123456789abcde2",
+                        "y": "2"})
+            self.fail("Added unique attribute twice")
+        except ldb.LdbError as err:
+            enum = err.args[0]
+            self.assertEqual(enum, ldb.ERR_CONSTRAINT_VIOLATION)
+
+        res = self.ldb.search(expression="(y=2)",
+                              base="dc=samba,dc=org")
+        self.assertEqual(len(res), 1)
+        self.assertEqual(str(res[0].dn), "x=y,dc=samba,dc=org")
+
     def tearDown(self):
         super(BadIndexTests, self).tearDown()
 
@@ -2084,6 +2140,18 @@ class GUIDBadIndexTests(BadIndexTests):
         super(GUIDBadIndexTests, self).setUp()
 
 
+class GUIDBadIndexTestsLmdb(BadIndexTests):
+
+    def setUp(self):
+        self.prefix = MDB_PREFIX
+        self.index = MDB_INDEX_OBJ
+        self.IDXGUID = True
+        super(GUIDBadIndexTestsLmdb, self).setUp()
+
+    def tearDown(self):
+        super(GUIDBadIndexTestsLmdb, self).tearDown()
+
+
 class DnTests(TestCase):
 
     def setUp(self):
diff --git a/selftest/knownfail.d/test_modify_transaction b/selftest/knownfail.d/test_modify_transaction
new file mode 100644
index 00000000000..d53070a664b
--- /dev/null
+++ b/selftest/knownfail.d/test_modify_transaction
@@ -0,0 +1,3 @@
+^ldb.python.api.BadIndexTests.test_modify_transaction
+^ldb.python.api.GUIDBadIndexTests.test_modify_transaction
+^ldb.python.api.GUIDBadIndexTestsLmdb.test_modify_transaction
-- 
2.18.1


From 9bce147f5c5a5d8ac6c99a25e88f87577496f2bc Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Wed, 6 Mar 2019 15:32:08 +1300
Subject: [PATCH 2/8] lib ldb tests: Test nested transactions

Add a test to document that ldb does not currently support nested
transactions.

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
---
 lib/ldb/tests/python/api.py | 101 ++++++++++++++++++++++++++++++++++++
 1 file changed, 101 insertions(+)

diff --git a/lib/ldb/tests/python/api.py b/lib/ldb/tests/python/api.py
index ca267ae6758..5d3ae8e7ec3 100755
--- a/lib/ldb/tests/python/api.py
+++ b/lib/ldb/tests/python/api.py
@@ -3097,6 +3097,107 @@ class VersionTests(TestCase):
     def test_version(self):
         self.assertTrue(isinstance(ldb.__version__, str))
 
+class NestedTransactionTests(LdbBaseTest):
+    def setUp(self):
+        super(NestedTransactionTests, self).setUp()
+        self.testdir = tempdir()
+        self.filename = os.path.join(self.testdir, "test.ldb")
+        self.ldb = ldb.Ldb(self.url(), flags=self.flags())
+        self.ldb.add({"dn": "@INDEXLIST",
+                      "@IDXATTR": [b"x", b"y", b"ou"],
+                      "@IDXGUID": [b"objectUUID"],
+                      "@IDX_DN_GUID": [b"GUID"]})
+
+        super(NestedTransactionTests, self).setUp()
+
+    #
+    # This test documents that currently ldb does not support true nested
+    # transactions.
+    #
+    # Note: The test is written so that it treats failure as pass.
+    #       It is done this way as standalone ldb builds do not use the samba
+    #       known fail mechanism
+    #
+    def test_nested_transactions(self):
+
+        self.ldb.transaction_start()
+
+        self.ldb.add({"dn": "x=x1,dc=samba,dc=org",
+                      "objectUUID": b"0123456789abcde1"})
+        res = self.ldb.search(expression="(objectUUID=0123456789abcde1)",
+                              base="dc=samba,dc=org")
+        self.assertEquals(len(res), 1)
+
+        self.ldb.add({"dn": "x=x2,dc=samba,dc=org",
+                      "objectUUID": b"0123456789abcde2"})
+        res = self.ldb.search(expression="(objectUUID=0123456789abcde2)",
+                              base="dc=samba,dc=org")
+        self.assertEquals(len(res), 1)
+
+        self.ldb.transaction_start()
+        self.ldb.add({"dn": "x=x3,dc=samba,dc=org",
+                      "objectUUID": b"0123456789abcde3"})
+        res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
+                              base="dc=samba,dc=org")
+        self.assertEquals(len(res), 1)
+        self.ldb.transaction_cancel()
+        #
+        # Check that we can not see the record added by the cancelled
+        # transaction.
+        # Currently this fails as ldb does not support true nested
+        # transactions, and only the outer commits and cancels have an effect
+        #
+        res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
+                              base="dc=samba,dc=org")
+        #
+        # FIXME this test currently passes on a failure, i.e. if nested
+        #       transaction support worked correctly the correct test would
+        #       be.
+        #         self.assertEqual(len(res), 0)
+        #       as the add of objectUUID=0123456789abcde3 would reverted when
+        #       the sub transaction it was nested in was rolled back.
+        #
+        #       Currently this is not the case so the record is still present.
+        self.assertEqual(len(res), 1)
+
+
+        # Commit the outer transaction
+        #
+        self.ldb.transaction_commit()
+        #
+        # Now check we can still see the records added in the outer
+        # transaction.
+        #
+        res = self.ldb.search(expression="(objectUUID=0123456789abcde1)",
+                              base="dc=samba,dc=org")
+        self.assertEquals(len(res), 1)
+        res = self.ldb.search(expression="(objectUUID=0123456789abcde2)",
+                              base="dc=samba,dc=org")
+        self.assertEquals(len(res), 1)
+        #
+        # And that we can't see the records added by the nested transaction.
+        #
+        res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
+                              base="dc=samba,dc=org")
+        # FIXME again if nested transactions worked correctly we would not
+        #       see this record. The test should be.
+        #         self.assertEqual(len(res), 0)
+        self.assertEqual(len(res), 1)
+
+    def tearDown(self):
+        super(NestedTransactionTests, self).tearDown()
+
+
+class LmdbNestedTransactionTests(NestedTransactionTests):
+
+    def setUp(self):
+        self.prefix = MDB_PREFIX
+        self.index = MDB_INDEX_OBJ
+        super(LmdbNestedTransactionTests, self).setUp()
+
+    def tearDown(self):
+        super(LmdbNestedTransactionTests, self).tearDown()
+
 
 if __name__ == '__main__':
     import unittest
-- 
2.18.1


From 132ca2d6ca491aa711a00d8e943337ebc6a7ca22 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Wed, 6 Mar 2019 15:45:54 +1300
Subject: [PATCH 3/8] lib ldb tests: remove deprecation warning from api.py

Remove the "DeprecationWarning: Please use assertEqual instead."
warnings from api.py

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
---
 lib/ldb/tests/python/api.py | 30 +++++++++++++++---------------
 1 file changed, 15 insertions(+), 15 deletions(-)

diff --git a/lib/ldb/tests/python/api.py b/lib/ldb/tests/python/api.py
index 5d3ae8e7ec3..e872d0ab895 100755
--- a/lib/ldb/tests/python/api.py
+++ b/lib/ldb/tests/python/api.py
@@ -1954,7 +1954,7 @@ class BadIndexTests(LdbBaseTest):
 
         res = self.ldb.search(expression="(y=1)",
                               base="dc=samba,dc=org")
-        self.assertEquals(len(res), 3)
+        self.assertEqual(len(res), 3)
 
         # Now set this to unique index, but forget to check the result
         try:
@@ -1967,7 +1967,7 @@ class BadIndexTests(LdbBaseTest):
         # We must still have a working index
         res = self.ldb.search(expression="(y=1)",
                               base="dc=samba,dc=org")
-        self.assertEquals(len(res), 3)
+        self.assertEqual(len(res), 3)
 
     def test_unique_transaction(self):
         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
@@ -1982,7 +1982,7 @@ class BadIndexTests(LdbBaseTest):
 
         res = self.ldb.search(expression="(y=1)",
                               base="dc=samba,dc=org")
-        self.assertEquals(len(res), 3)
+        self.assertEqual(len(res), 3)
 
         self.ldb.transaction_start()
 
@@ -2005,7 +2005,7 @@ class BadIndexTests(LdbBaseTest):
         res = self.ldb.search(expression="(y=1)",
                               base="dc=samba,dc=org")
 
-        self.assertEquals(len(res), 3)
+        self.assertEqual(len(res), 3)
 
     def test_casefold(self):
         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
@@ -2020,7 +2020,7 @@ class BadIndexTests(LdbBaseTest):
 
         res = self.ldb.search(expression="(y=a)",
                               base="dc=samba,dc=org")
-        self.assertEquals(len(res), 2)
+        self.assertEqual(len(res), 2)
 
         self.ldb.add({"dn": "@ATTRIBUTES",
                       "y": "CASE_INSENSITIVE"})
@@ -2030,12 +2030,12 @@ class BadIndexTests(LdbBaseTest):
                               base="dc=samba,dc=org")
 
         if hasattr(self, 'IDXGUID'):
-            self.assertEquals(len(res), 3)
+            self.assertEqual(len(res), 3)
         else:
             # We should not return this entry twice, but sadly
             # we have not yet fixed
             # https://bugzilla.samba.org/show_bug.cgi?id=13361
-            self.assertEquals(len(res), 4)
+            self.assertEqual(len(res), 4)
 
     def test_casefold_transaction(self):
         self.ldb.add({"dn": "x=x,dc=samba,dc=org",
@@ -2050,7 +2050,7 @@ class BadIndexTests(LdbBaseTest):
 
         res = self.ldb.search(expression="(y=a)",
                               base="dc=samba,dc=org")
-        self.assertEquals(len(res), 2)
+        self.assertEqual(len(res), 2)
 
         self.ldb.transaction_start()
 
@@ -2064,12 +2064,12 @@ class BadIndexTests(LdbBaseTest):
                               base="dc=samba,dc=org")
 
         if hasattr(self, 'IDXGUID'):
-            self.assertEquals(len(res), 3)
+            self.assertEqual(len(res), 3)
         else:
             # We should not return this entry twice, but sadly
             # we have not yet fixed
             # https://bugzilla.samba.org/show_bug.cgi?id=13361
-            self.assertEquals(len(res), 4)
+            self.assertEqual(len(res), 4)
 
     def test_modify_transaction(self):
         self.ldb.add({"dn": "x=y,dc=samba,dc=org",
@@ -3126,20 +3126,20 @@ class NestedTransactionTests(LdbBaseTest):
                       "objectUUID": b"0123456789abcde1"})
         res = self.ldb.search(expression="(objectUUID=0123456789abcde1)",
                               base="dc=samba,dc=org")
-        self.assertEquals(len(res), 1)
+        self.assertEqual(len(res), 1)
 
         self.ldb.add({"dn": "x=x2,dc=samba,dc=org",
                       "objectUUID": b"0123456789abcde2"})
         res = self.ldb.search(expression="(objectUUID=0123456789abcde2)",
                               base="dc=samba,dc=org")
-        self.assertEquals(len(res), 1)
+        self.assertEqual(len(res), 1)
 
         self.ldb.transaction_start()
         self.ldb.add({"dn": "x=x3,dc=samba,dc=org",
                       "objectUUID": b"0123456789abcde3"})
         res = self.ldb.search(expression="(objectUUID=0123456789abcde3)",
                               base="dc=samba,dc=org")
-        self.assertEquals(len(res), 1)
+        self.assertEqual(len(res), 1)
         self.ldb.transaction_cancel()
         #
         # Check that we can not see the record added by the cancelled
@@ -3170,10 +3170,10 @@ class NestedTransactionTests(LdbBaseTest):
         #
         res = self.ldb.search(expression="(objectUUID=0123456789abcde1)",
                               base="dc=samba,dc=org")
-        self.assertEquals(len(res), 1)
+        self.assertEqual(len(res), 1)
         res = self.ldb.search(expression="(objectUUID=0123456789abcde2)",
                               base="dc=samba,dc=org")
-        self.assertEquals(len(res), 1)
+        self.assertEqual(len(res), 1)
         #
         # And that we can't see the records added by the nested transaction.
         #
-- 
2.18.1


From 2505a673149664d485c259b5d0de6d7d34a9f573 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Thu, 7 Mar 2019 10:18:00 +1300
Subject: [PATCH 4/8] lib ldb key value: Add limited nested transaction support

Add limited nested transaction support to the back ends to make the key value
operations atomic (for those back ends that support nested transactions).

Note: that only the lmdb backend currently supports nested transactions.

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
---
 lib/ldb/ldb_key_value/ldb_kv.h |  3 +++
 lib/ldb/ldb_mdb/ldb_mdb.c      | 34 ++++++++++++++++++++++++++++++++++
 lib/ldb/ldb_tdb/ldb_tdb.c      | 32 ++++++++++++++++++++++++++++++++
 3 files changed, 69 insertions(+)

diff --git a/lib/ldb/ldb_key_value/ldb_kv.h b/lib/ldb/ldb_key_value/ldb_kv.h
index 5070a588c00..e3642ced792 100644
--- a/lib/ldb/ldb_key_value/ldb_kv.h
+++ b/lib/ldb/ldb_key_value/ldb_kv.h
@@ -41,6 +41,9 @@ struct kv_db_ops {
 	const char *(*name)(struct ldb_kv_private *ldb_kv);
 	bool (*has_changed)(struct ldb_kv_private *ldb_kv);
 	bool (*transaction_active)(struct ldb_kv_private *ldb_kv);
+	int (*begin_nested_write)(struct ldb_kv_private *);
+	int (*finish_nested_write)(struct ldb_kv_private *);
+	int (*abort_nested_write)(struct ldb_kv_private *);
 };
 
 /* this private structure is used by the key value backends in the
diff --git a/lib/ldb/ldb_mdb/ldb_mdb.c b/lib/ldb/ldb_mdb/ldb_mdb.c
index 646a67c554c..7104ee83928 100644
--- a/lib/ldb/ldb_mdb/ldb_mdb.c
+++ b/lib/ldb/ldb_mdb/ldb_mdb.c
@@ -576,6 +576,37 @@ static bool lmdb_changed(struct ldb_kv_private *ldb_kv)
 	return true;
 }
 
+/*
+ * Start a sub transaction
+ * As lmdb supports nested transactions we can start a new transaction
+ */
+static int lmdb_nested_transaction_start(struct ldb_kv_private *ldb_kv)
+{
+	int ret = lmdb_transaction_start(ldb_kv);
+	return ret;
+}
+
+/*
+ * Commit a sub transaction
+ * As lmdb supports nested transactions we can commit the nested transaction
+ */
+static int lmdb_nested_transaction_commit(struct ldb_kv_private *ldb_kv)
+{
+	int ret = lmdb_transaction_commit(ldb_kv);
+	return ret;
+}
+
+/*
+ * Cancel a sub transaction
+ * As lmdb supports nested transactions we can cancel the nested transaction
+ */
+static int lmdb_nested_transaction_cancel(struct ldb_kv_private *ldb_kv)
+{
+	int ret = lmdb_transaction_cancel(ldb_kv);
+	return ret;
+}
+
+
 static struct kv_db_ops lmdb_key_value_ops = {
 	.store              = lmdb_store,
 	.delete             = lmdb_delete,
@@ -593,6 +624,9 @@ static struct kv_db_ops lmdb_key_value_ops = {
 	.name               = lmdb_name,
 	.has_changed        = lmdb_changed,
 	.transaction_active = lmdb_transaction_active,
+	.begin_nested_write = lmdb_nested_transaction_start,
+	.finish_nested_write = lmdb_nested_transaction_commit,
+	.abort_nested_write = lmdb_nested_transaction_cancel,
 };
 
 static const char *lmdb_get_path(const char *url)
diff --git a/lib/ldb/ldb_tdb/ldb_tdb.c b/lib/ldb/ldb_tdb/ldb_tdb.c
index 812ddd3e389..fdcac0c7f7c 100644
--- a/lib/ldb/ldb_tdb/ldb_tdb.c
+++ b/lib/ldb/ldb_tdb/ldb_tdb.c
@@ -400,6 +400,35 @@ static bool ltdb_transaction_active(struct ldb_kv_private *ldb_kv)
 	return tdb_transaction_active(ldb_kv->tdb);
 }
 
+/*
+ * Start a sub transaction
+ * As TDB does not currently support nested transactions, we do nothing and
+ * return LDB_SUCCESS
+ */
+static int ltdb_nested_transaction_start(struct ldb_kv_private *ldb_kv)
+{
+	return LDB_SUCCESS;
+}
+
+/*
+ * Commit a sub transaction
+ * As TDB does not currently support nested transactions, we do nothing and
+ * return LDB_SUCCESS
+ */
+static int ltdb_nested_transaction_commit(struct ldb_kv_private *ldb_kv)
+{
+	return LDB_SUCCESS;
+}
+
+/*
+ * Cancel a sub transaction
+ * As TDB does not currently support nested transactions, we do nothing and
+ * return LDB_SUCCESS
+ */
+static int ltdb_nested_transaction_cancel(struct ldb_kv_private *ldb_kv)
+{
+	return LDB_SUCCESS;
+}
 static const struct kv_db_ops key_value_ops = {
     .store = ltdb_store,
     .delete = ltdb_delete,
@@ -417,6 +446,9 @@ static const struct kv_db_ops key_value_ops = {
     .name = ltdb_name,
     .has_changed = ltdb_changed,
     .transaction_active = ltdb_transaction_active,
+    .begin_nested_write = ltdb_nested_transaction_start,
+    .finish_nested_write = ltdb_nested_transaction_commit,
+    .abort_nested_write = ltdb_nested_transaction_cancel,
 };
 
 /*
-- 
2.18.1


From d60767b40691e7465d0ff3eaee8f089b181536a0 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Thu, 7 Mar 2019 10:37:18 +1300
Subject: [PATCH 5/8] lib ldb key value: add nested transaction support.

Use the nested transaction support added to the key value back ends to
make key value operations atomic. This will ensure that rename
operation failures, which delete the original record and add a new
record, leave the database in a consistent state.

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
---
 lib/ldb/ldb_key_value/ldb_kv.c | 75 ++++++++++++++++++++++++++++++++++
 lib/ldb/ldb_key_value/ldb_kv.h |  3 ++
 2 files changed, 78 insertions(+)

diff --git a/lib/ldb/ldb_key_value/ldb_kv.c b/lib/ldb/ldb_key_value/ldb_kv.c
index d4f896736a2..db6506bbcff 100644
--- a/lib/ldb/ldb_key_value/ldb_kv.c
+++ b/lib/ldb/ldb_key_value/ldb_kv.c
@@ -432,6 +432,81 @@ static bool ldb_kv_single_valued(const struct ldb_schema_attribute *a,
 	return false;
 }
 
+
+/*
+ * Place holder actual implementation to be added in subsequent commits
+ */
+int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv)
+{
+	return LDB_SUCCESS;
+}
+/*
+ * Starts a sub transaction if they are supported by the backend
+ */
+static int ldb_kv_sub_transaction_start(struct ldb_kv_private *ldb_kv)
+{
+	int ret = LDB_SUCCESS;
+
+	ret = ldb_kv->kv_ops->begin_nested_write(ldb_kv);
+	if (ret == LDB_SUCCESS) {
+		ret = ldb_kv_index_sub_transaction_start(ldb_kv);
+	}
+	return ret;
+}
+
+/*
+ * Place holder actual implementation to be added in subsequent commits
+ */
+int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
+{
+	return LDB_SUCCESS;
+}
+/*
+ * Commits a sub transaction if they are supported by the backend
+ */
+static int ldb_kv_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
+{
+	int ret = LDB_SUCCESS;
+
+	ret = ldb_kv_index_sub_transaction_commit(ldb_kv);
+	if (ret != LDB_SUCCESS) {
+		return ret;
+	}
+	ret = ldb_kv->kv_ops->finish_nested_write(ldb_kv);
+	return ret;
+}
+
+/*
+ * Place holder actual implementation to be added in subsequent commits
+ */
+int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
+{
+	return LDB_SUCCESS;
+}
+/*
+ * Cancels a sub transaction if they are supported by the backend
+ */
+static int ldb_kv_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
+{
+	int ret = LDB_SUCCESS;
+
+	ret = ldb_kv_index_sub_transaction_cancel(ldb_kv);
+	if (ret != LDB_SUCCESS) {
+		struct ldb_context *ldb = ldb_module_get_ctx(ldb_kv->module);
+		/*
+		 * In the event of a failure we log the failure and continue
+		 * as we need to cancel the database transaction.
+		 */
+		ldb_debug(ldb,
+			  LDB_DEBUG_ERROR,
+			  __location__": ldb_kv_index_sub_transaction_cancel "
+			  "failed: %s",
+			  ldb_errstring(ldb));
+	}
+	ret = ldb_kv->kv_ops->abort_nested_write(ldb_kv);
+	return ret;
+}
+
 static int ldb_kv_add_internal(struct ldb_module *module,
 			       struct ldb_kv_private *ldb_kv,
 			       const struct ldb_message *msg,
diff --git a/lib/ldb/ldb_key_value/ldb_kv.h b/lib/ldb/ldb_key_value/ldb_kv.h
index e3642ced792..3c1adbed23a 100644
--- a/lib/ldb/ldb_key_value/ldb_kv.h
+++ b/lib/ldb/ldb_key_value/ldb_kv.h
@@ -266,3 +266,6 @@ int ldb_kv_init_store(struct ldb_kv_private *ldb_kv,
 		      struct ldb_context *ldb,
 		      const char *options[],
 		      struct ldb_module **_module);
+int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv);
+int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv);
+int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv);
-- 
2.18.1


From 519d6ffc908e3006ac1f49167bbdd617e1e1433a Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Thu, 7 Mar 2019 10:40:59 +1300
Subject: [PATCH 6/8] lib ldb_key value: wrap operations

Wrap the key value operations in nested transactions to maintain
database consistency.

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
---
 lib/ldb/ldb_key_value/ldb_kv.c | 117 ++++++++++++++++++++++++++++-----
 1 file changed, 101 insertions(+), 16 deletions(-)

diff --git a/lib/ldb/ldb_key_value/ldb_kv.c b/lib/ldb/ldb_key_value/ldb_kv.c
index db6506bbcff..f9153d7cb84 100644
--- a/lib/ldb/ldb_key_value/ldb_kv.c
+++ b/lib/ldb/ldb_key_value/ldb_kv.c
@@ -653,7 +653,23 @@ static int ldb_kv_add(struct ldb_kv_context *ctx)
 		return LDB_ERR_OPERATIONS_ERROR;
 	}
 
+	ret = ldb_kv_sub_transaction_start(ldb_kv);
+	if (ret != LDB_SUCCESS) {
+		return ret;
+	}
 	ret = ldb_kv_add_internal(module, ldb_kv, req->op.add.message, true);
+	if (ret != LDB_SUCCESS) {
+		int r = ldb_kv_sub_transaction_cancel(ldb_kv);
+		if (r != LDB_SUCCESS) {
+			ldb_debug(
+				ldb_module_get_ctx(module),
+				LDB_DEBUG_FATAL,
+				__location__
+				": Unable to roll back sub transaction");
+		}
+		return ret;
+	}
+	ret = ldb_kv_sub_transaction_commit(ldb_kv);
 
 	return ret;
 }
@@ -744,6 +760,9 @@ static int ldb_kv_delete(struct ldb_kv_context *ctx)
 {
 	struct ldb_module *module = ctx->module;
 	struct ldb_request *req = ctx->req;
+	void *data = ldb_module_get_private(module);
+	struct ldb_kv_private *ldb_kv =
+	    talloc_get_type(data, struct ldb_kv_private);
 	int ret = LDB_SUCCESS;
 
 	ldb_request_set_state(req, LDB_ASYNC_PENDING);
@@ -752,7 +771,23 @@ static int ldb_kv_delete(struct ldb_kv_context *ctx)
 		return LDB_ERR_OPERATIONS_ERROR;
 	}
 
+	ret = ldb_kv_sub_transaction_start(ldb_kv);
+	if (ret != LDB_SUCCESS) {
+		return ret;
+	}
 	ret = ldb_kv_delete_internal(module, req->op.del.dn);
+	if (ret != LDB_SUCCESS) {
+		int r = ldb_kv_sub_transaction_cancel(ldb_kv);
+		if (r != LDB_SUCCESS) {
+			ldb_debug(
+				ldb_module_get_ctx(module),
+				LDB_DEBUG_FATAL,
+				__location__
+				": Unable to roll back sub transaction");
+		}
+		return ret;
+	}
+	ret = ldb_kv_sub_transaction_commit(ldb_kv);
 
 	return ret;
 }
@@ -1282,6 +1317,9 @@ static int ldb_kv_modify(struct ldb_kv_context *ctx)
 {
 	struct ldb_module *module = ctx->module;
 	struct ldb_request *req = ctx->req;
+	void *data = ldb_module_get_private(module);
+	struct ldb_kv_private *ldb_kv =
+	    talloc_get_type(data, struct ldb_kv_private);
 	int ret = LDB_SUCCESS;
 
 	ret = ldb_kv_check_special_dn(module, req->op.mod.message);
@@ -1295,7 +1333,56 @@ static int ldb_kv_modify(struct ldb_kv_context *ctx)
 		return LDB_ERR_OPERATIONS_ERROR;
 	}
 
+	ret = ldb_kv_sub_transaction_start(ldb_kv);
+	if (ret != LDB_SUCCESS) {
+		return ret;
+	}
 	ret = ldb_kv_modify_internal(module, req->op.mod.message, req);
+	if (ret != LDB_SUCCESS) {
+		int r = ldb_kv_sub_transaction_cancel(ldb_kv);
+		if (r != LDB_SUCCESS) {
+			ldb_debug(
+				ldb_module_get_ctx(module),
+				LDB_DEBUG_FATAL,
+				__location__
+				": Unable to roll back sub transaction");
+		}
+		return ret;
+	}
+	ret = ldb_kv_sub_transaction_commit(ldb_kv);
+
+
+	return ret;
+}
+
+static int ldb_kv_rename_internal(struct ldb_module *module,
+			   struct ldb_request *req,
+			   struct ldb_message *msg)
+{
+	void *data = ldb_module_get_private(module);
+	struct ldb_kv_private *ldb_kv =
+	    talloc_get_type(data, struct ldb_kv_private);
+	int ret = LDB_SUCCESS;
+
+	/* Always delete first then add, to avoid conflicts with
+	 * unique indexes. We rely on the transaction to make this
+	 * atomic
+	 */
+	ret = ldb_kv_delete_internal(module, msg->dn);
+	if (ret != LDB_SUCCESS) {
+		return ret;
+	}
+
+	msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
+	if (msg->dn == NULL) {
+		return LDB_ERR_OPERATIONS_ERROR;
+	}
+
+	/* We don't check single value as we can have more than 1 with
+	 * deleted attributes. We could go through all elements but that's
+	 * maybe not the most efficient way
+	 */
+	ret = ldb_kv_add_internal(module, ldb_kv, msg, false);
 
 	return ret;
 }
@@ -1403,28 +1490,26 @@ static int ldb_kv_rename(struct ldb_kv_context *ctx)
 	talloc_free(key_old.data);
 	talloc_free(key.data);
 
-	/* Always delete first then add, to avoid conflicts with
-	 * unique indexes. We rely on the transaction to make this
-	 * atomic
-	 */
-	ret = ldb_kv_delete_internal(module, msg->dn);
+
+	ret = ldb_kv_sub_transaction_start(ldb_kv);
 	if (ret != LDB_SUCCESS) {
 		talloc_free(msg);
 		return ret;
 	}
-
-	msg->dn = ldb_dn_copy(msg, req->op.rename.newdn);
-	if (msg->dn == NULL) {
+	ret = ldb_kv_rename_internal(module, req, msg);
+	if (ret != LDB_SUCCESS) {
+		int r = ldb_kv_sub_transaction_cancel(ldb_kv);
+		if (r != LDB_SUCCESS) {
+			ldb_debug(
+				ldb_module_get_ctx(module),
+				LDB_DEBUG_FATAL,
+				__location__
+				": Unable to roll back sub transaction");
+		}
 		talloc_free(msg);
-		return LDB_ERR_OPERATIONS_ERROR;
+		return ret;
 	}
-
-	/* We don't check single value as we can have more than 1 with
-	 * deleted attributes. We could go through all elements but that's
-	 * maybe not the most efficient way
-	 */
-	ret = ldb_kv_add_internal(module, ldb_kv, msg, false);
-
+	ret = ldb_kv_sub_transaction_commit(ldb_kv);
 	talloc_free(msg);
 
 	return ret;
-- 
2.18.1


From 1788cc6e5dabded6a22fd4bb7ced99f6be15ccb9 Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Tue, 26 Mar 2019 12:42:32 +1300
Subject: [PATCH 7/8] lib ldb ldb_key_value tests: Add tests for wrapped
 operations

Add test exercising the sub/nested transactions wrapping the key value
operations.

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
---
 lib/ldb/ldb_key_value/ldb_kv.c                |  33 +
 lib/ldb/tests/ldb_key_value_mdb_test.valgrind |  97 ++
 lib/ldb/tests/ldb_key_value_test.c            | 843 ++++++++++++++++++
 lib/ldb/tests/ldb_kv_ops_test.c               |   8 +-
 lib/ldb/tests/ldb_kv_ops_test.valgrind        |  97 ++
 lib/ldb/wscript                               |  36 +-
 6 files changed, 1112 insertions(+), 2 deletions(-)
 create mode 100644 lib/ldb/tests/ldb_key_value_mdb_test.valgrind
 create mode 100644 lib/ldb/tests/ldb_key_value_test.c
 create mode 100644 lib/ldb/tests/ldb_kv_ops_test.valgrind

diff --git a/lib/ldb/ldb_key_value/ldb_kv.c b/lib/ldb/ldb_key_value/ldb_kv.c
index f9153d7cb84..fa0dcd0615a 100644
--- a/lib/ldb/ldb_key_value/ldb_kv.c
+++ b/lib/ldb/ldb_key_value/ldb_kv.c
@@ -618,6 +618,15 @@ static int ldb_kv_add_internal(struct ldb_module *module,
 
 	ret = ldb_kv_modified(module, msg->dn);
 
+	/*
+	 * To allow testing of the error recovery code in ldb_kv_add
+	 * cmocka tests can define CMOCKA_UNIT_TEST_ADD_INTERNAL_FAIL
+	 * to inject failures at this point.
+	 */
+#ifdef CMOCKA_UNIT_TEST_ADD_INTERNAL_FAIL
+	CMOCKA_UNIT_TEST_ADD_INTERNAL_FAIL
+#endif
+
 	return ret;
 }
 
@@ -750,6 +759,14 @@ static int ldb_kv_delete_internal(struct ldb_module *module, struct ldb_dn *dn)
 
 done:
 	talloc_free(msg);
+	/*
+	 * To allow testing of the error recovery code in ldb_kv_delete
+	 * cmocka tests can define CMOCKA_UNIT_TEST_DELETE_INTERNAL_FAIL
+	 * to inject failures at this point.
+	 */
+#ifdef CMOCKA_UNIT_TEST_DELETE_INTERNAL_FAIL
+	CMOCKA_UNIT_TEST_DELETE_INTERNAL_FAIL
+#endif
 	return ret;
 }
 
@@ -1307,6 +1324,14 @@ int ldb_kv_modify_internal(struct ldb_module *module,
 
 done:
 	TALLOC_FREE(mem_ctx);
+	/*
+	 * To allow testing of the error recovery code in ldb_kv_modify
+	 * cmocka tests can define CMOCKA_UNIT_TEST_MODIFY_INTERNAL_FAIL
+	 * to inject failures at this point.
+	 */
+#ifdef CMOCKA_UNIT_TEST_MODIFY_INTERNAL_FAIL
+	CMOCKA_UNIT_TEST_MODIFY_INTERNAL_FAIL
+#endif
 	return ret;
 }
 
@@ -1384,6 +1409,14 @@ static int ldb_kv_rename_internal(struct ldb_module *module,
 	 */
 	ret = ldb_kv_add_internal(module, ldb_kv, msg, false);
 
+	/*
+	 * To allow testing of the error recovery code in ldb_kv_rename
+	 * cmocka tests can define CMOCKA_UNIT_TEST_RENAME_INTERNAL_FAIL
+	 * to inject failures at this point.
+	 */
+#ifdef CMOCKA_UNIT_TEST_RENAME_INTERNAL_FAIL
+	CMOCKA_UNIT_TEST_RENAME_INTERNAL_FAIL
+#endif
 	return ret;
 }
 
diff --git a/lib/ldb/tests/ldb_key_value_mdb_test.valgrind b/lib/ldb/tests/ldb_key_value_mdb_test.valgrind
new file mode 100644
index 00000000000..17470760f04
--- /dev/null
+++ b/lib/ldb/tests/ldb_key_value_mdb_test.valgrind
@@ -0,0 +1,97 @@
+{
+   Memory allocated by setup
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:malloc
+   ...
+   fun:setup
+}
+{
+   Memory allocated by setup
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:realloc
+   ...
+   fun:setup
+}
+{
+   Memory allocated by ldb_init
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:malloc
+   ...
+   fun:ldb_init
+}
+{
+   Memory allocated by ldb_init
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:realloc
+   ...
+   fun:ldb_init
+}
+{
+   Memory allocated by noconn_setup
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:malloc
+   ...
+   fun:noconn_setup
+}
+{
+   Memory allocated by parse, which allocates on the NULL context
+   Memcheck:Leak
+   match-leak-kinds: all
+   fun:malloc
+   ...
+   fun:parse
+}
+{
+   Memory allocated by tdb_parse_data
+   Memcheck:Leak
+   match-leak-kinds: all
+   fun:malloc
+   ...
+   fun:tdb_parse_data
+}
+{
+   Memory allocated by ldb_connect
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:malloc
+   ...
+   fun:ldb_connect
+}
+{
+   Memory allocated by ldb_connect
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:calloc
+   ...
+   fun:ldb_connect
+}
+{
+   Memory allocated by ldb_kv_cache_load
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:malloc
+   ...
+   fun:ldb_kv_cache_load
+}
+{
+   Memory allocated by ldb_kv_index_load
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:malloc
+   ...
+   fun:ldb_kv_index_load
+}
+{
+   Memory allocated by ldb_asprintf_errstring
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:malloc
+   ...
+   fun:ldb_asprintf_errstring
+}
+
diff --git a/lib/ldb/tests/ldb_key_value_test.c b/lib/ldb/tests/ldb_key_value_test.c
new file mode 100644
index 00000000000..e71f81b866e
--- /dev/null
+++ b/lib/ldb/tests/ldb_key_value_test.c
@@ -0,0 +1,843 @@
+/*
+ * Tests exercising the ldb key value operations.
+ *
+ *  Copyright (C) Andrew Bartlett <abartlet at samba.org> 2018
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ */
+
+/*
+ * from cmocka.c:
+ * These headers or their equivalents should be included prior to
+ * including
+ * this header file.
+ *
+ * #include <stdarg.h>
+ * #include <stddef.h>
+ * #include <setjmp.h>
+ *
+ * This allows test applications to use custom definitions of C standard
+ * library functions and types.
+ *
+ */
+
+/*
+ */
+#include <stdarg.h>
+#include <stddef.h>
+#include <setjmp.h>
+#include <cmocka.h>
+
+#include <limits.h>
+#define NO_FAILURE INT_MAX
+#define FAILURE_LDB_ERR LDB_ERR_OTHER
+
+/*
+ * To test failure in ldb_kv_add, ldb_kv_delete, ldb_kv_modify and ldb_kv_rename
+ * we use the following global variables and macros to trigger a failure in
+ * the ldb_kv_<op>_internal functions. This allows testing of the sub
+ * transaction commits and roll backs in those operations.
+ *
+ * NOTE: Not all back ends support nested/sub transactions
+ */
+int cmocka_unit_test_fail_add_internal_after = NO_FAILURE;
+#define CMOCKA_UNIT_TEST_ADD_INTERNAL_FAIL \
+	{\
+		cmocka_unit_test_fail_add_internal_after--;\
+		if (cmocka_unit_test_fail_add_internal_after <= 0) {\
+			assert_int_equal(LDB_SUCCESS, ret);\
+			ret = FAILURE_LDB_ERR;\
+		}\
+	}\
+
+int cmocka_unit_test_fail_delete_internal_after = NO_FAILURE;
+#define CMOCKA_UNIT_TEST_DELETE_INTERNAL_FAIL \
+	{\
+		cmocka_unit_test_fail_delete_internal_after--;\
+		if (cmocka_unit_test_fail_delete_internal_after <= 0) {\
+			assert_int_equal(LDB_SUCCESS, ret);\
+			ret = FAILURE_LDB_ERR;\
+		}\
+	}\
+
+int cmocka_unit_test_fail_rename_internal_after = NO_FAILURE;
+#define CMOCKA_UNIT_TEST_RENAME_INTERNAL_FAIL \
+	{\
+		cmocka_unit_test_fail_rename_internal_after--;\
+		if (cmocka_unit_test_fail_rename_internal_after <= 0) {\
+			assert_int_equal(LDB_SUCCESS, ret);\
+			ret = FAILURE_LDB_ERR;\
+		}\
+	}\
+
+int cmocka_unit_test_fail_modify_internal_after = NO_FAILURE;
+#define CMOCKA_UNIT_TEST_MODIFY_INTERNAL_FAIL \
+	{\
+		cmocka_unit_test_fail_modify_internal_after--;\
+		if (cmocka_unit_test_fail_modify_internal_after <= 0) {\
+			assert_int_equal(LDB_SUCCESS, ret);\
+			ret = FAILURE_LDB_ERR;\
+		}\
+	}\
+
+#include "ldb_key_value/ldb_kv.c"
+
+
+#define DEFAULT_BE  "tdb"
+
+#ifndef TEST_BE
+#define TEST_BE DEFAULT_BE
+#endif /* TEST_BE */
+
+#define NUM_RECS 1024
+
+
+struct test_ctx {
+	struct tevent_context *ev;
+	struct ldb_context *ldb;
+
+	const char *dbfile;
+	const char *lockfile;   /* lockfile is separate */
+
+	const char *dbpath;
+};
+
+/*
+ * Remove the database files
+ */
+static void unlink_old_db(struct test_ctx *test_ctx)
+{
+	int ret;
+
+	errno = 0;
+	ret = unlink(test_ctx->lockfile);
+	if (ret == -1 && errno != ENOENT) {
+		fail();
+	}
+
+	errno = 0;
+	ret = unlink(test_ctx->dbfile);
+	if (ret == -1 && errno != ENOENT) {
+		fail();
+	}
+}
+
+/*
+ * Test setup
+ */
+static int noconn_setup(void **state)
+{
+	struct test_ctx *test_ctx;
+	cmocka_unit_test_fail_add_internal_after = NO_FAILURE;
+	cmocka_unit_test_fail_delete_internal_after = NO_FAILURE;
+	cmocka_unit_test_fail_rename_internal_after = NO_FAILURE;
+	cmocka_unit_test_fail_modify_internal_after = NO_FAILURE;
+
+	test_ctx = talloc_zero(NULL, struct test_ctx);
+	assert_non_null(test_ctx);
+
+	test_ctx->ev = tevent_context_init(test_ctx);
+	assert_non_null(test_ctx->ev);
+
+	test_ctx->ldb = ldb_init(test_ctx, test_ctx->ev);
+	assert_non_null(test_ctx->ldb);
+
+	test_ctx->dbfile = talloc_strdup(test_ctx, "kvopstest.ldb");
+	assert_non_null(test_ctx->dbfile);
+
+	test_ctx->lockfile = talloc_asprintf(test_ctx, "%s-lock",
+					     test_ctx->dbfile);
+	assert_non_null(test_ctx->lockfile);
+
+	test_ctx->dbpath = talloc_asprintf(test_ctx,
+			TEST_BE"://%s", test_ctx->dbfile);
+	assert_non_null(test_ctx->dbpath);
+
+	unlink_old_db(test_ctx);
+	*state = test_ctx;
+	return 0;
+}
+
+/*
+ * Test teardown
+ */
+static int noconn_teardown(void **state)
+{
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+
+	unlink_old_db(test_ctx);
+	talloc_free(test_ctx);
+	return 0;
+}
+
+/*
+ * Test setup
+ */
+static int setup(void **state)
+{
+	struct test_ctx *test_ctx;
+	int ret;
+	struct ldb_ldif *ldif;
+	const char *index_ldif =		\
+		"dn: @INDEXLIST\n"
+		"@IDXGUID: objectUUID\n"
+		"@IDX_DN_GUID: GUID\n"
+		"\n";
+
+	noconn_setup((void **) &test_ctx);
+
+	ret = ldb_connect(test_ctx->ldb, test_ctx->dbpath, 0, NULL);
+	assert_int_equal(ret, 0);
+
+	while ((ldif = ldb_ldif_read_string(test_ctx->ldb, &index_ldif))) {
+		ret = ldb_add(test_ctx->ldb, ldif->msg);
+		assert_int_equal(ret, LDB_SUCCESS);
+	}
+	*state = test_ctx;
+	return 0;
+}
+
+/*
+ * Test teardown
+ */
+static int teardown(void **state)
+{
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+	noconn_teardown((void **) &test_ctx);
+	return 0;
+}
+
+/*
+ * Build an ldb_kv_context that can be passed to the ldb_kv operation under test
+ */
+static struct ldb_kv_context* build_ldb_kv_context(
+	TALLOC_CTX *ctx,
+	struct ldb_module *module,
+	struct ldb_request *req)
+{
+	struct ldb_kv_context *ldb_kv_ctx = NULL;
+
+	ldb_kv_ctx = talloc_zero(ctx, struct ldb_kv_context);
+	assert_non_null(ldb_kv_ctx);
+
+	ldb_kv_ctx->module = module;
+	ldb_kv_ctx->req = req;
+
+	return ldb_kv_ctx;
+}
+
+/*
+ * Build an add request
+ */
+static struct ldb_request *build_add_request(
+	TALLOC_CTX *ctx,
+	struct ldb_context *ldb,
+	const char* dc,
+	const char* uuid,
+	const char* cn)
+{
+	int ret;
+	struct ldb_message *msg;
+	struct ldb_request *req;
+
+	msg = ldb_msg_new(ctx);
+	assert_non_null(msg);
+
+	msg->dn = ldb_dn_new_fmt(msg, ldb, "dc=%s", dc);
+	assert_non_null(msg->dn);
+
+	ret = ldb_msg_add_string(msg, "cn", cn);
+	assert_int_equal(ret, 0);
+
+	ret = ldb_msg_add_string(msg, "objectUUID", uuid);
+	assert_int_equal(ret, 0);
+
+	ret = ldb_msg_sanity_check(ldb, msg);
+	assert_int_equal(ret, LDB_SUCCESS);
+
+	ret = ldb_build_add_req(
+	    &req, ldb, ldb, msg, NULL, NULL, ldb_op_default_callback, NULL);
+	assert_int_equal(ret, LDB_SUCCESS);
+	return req;
+}
+
+/*
+ * Build a delete request
+ */
+static struct ldb_request *build_delete_request(
+	TALLOC_CTX *ctx,
+	struct ldb_context *ldb,
+	const char* dc)
+{
+	int ret = LDB_SUCCESS;
+	struct ldb_dn *dn = NULL;
+	struct ldb_request *req = NULL;
+
+	dn = ldb_dn_new_fmt(ctx, ldb, "dc=%s", dc);
+	assert_non_null(dn);
+
+	ret = ldb_build_del_req(
+	    &req, ldb, ctx, dn, NULL, NULL, ldb_op_default_callback, NULL);
+	assert_int_equal(ret, LDB_SUCCESS);
+	return req;
+}
+
+/*
+ * Build a rename request
+ */
+static struct ldb_request *build_rename_request(
+	TALLOC_CTX *ctx,
+	struct ldb_context *ldb,
+	const char* old_dc,
+	const char* new_dc)
+{
+	int ret = LDB_SUCCESS;
+	struct ldb_dn *old_dn = NULL;
+	struct ldb_dn *new_dn = NULL;
+	struct ldb_request *req = NULL;
+
+	old_dn = ldb_dn_new_fmt(ctx, ldb, "dc=%s", old_dc);
+	assert_non_null(old_dn);
+
+	new_dn = ldb_dn_new_fmt(ctx, ldb, "dc=%s", new_dc);
+	assert_non_null(new_dn);
+
+	ret = ldb_build_rename_req(
+		&req,
+		ldb,
+		ctx,
+		old_dn,
+		new_dn,
+		NULL,
+		NULL,
+		ldb_op_default_callback,
+		NULL);
+	assert_int_equal(ret, LDB_SUCCESS);
+	return req;
+}
+
+/*
+ * Build a ldb modify request
+ */
+static struct ldb_request *build_modify_request(
+	TALLOC_CTX *ctx,
+	struct ldb_context *ldb,
+	const char* dc,
+	const char* cn)
+{
+	int ret;
+	struct ldb_message *msg;
+	struct ldb_request *req;
+
+	msg = ldb_msg_new(ctx);
+	assert_non_null(msg);
+
+	msg->dn = ldb_dn_new_fmt(msg, ldb, "dc=%s", dc);
+	assert_non_null(msg->dn);
+
+	ret = ldb_msg_add_empty(msg, "cn", LDB_FLAG_MOD_REPLACE, NULL);
+	assert_int_equal(ret, LDB_SUCCESS);
+	ret = ldb_msg_add_string(msg, "cn", cn);
+	assert_int_equal(ret, LDB_SUCCESS);
+
+	ret = ldb_msg_sanity_check(ldb, msg);
+	assert_int_equal(ret, LDB_SUCCESS);
+
+	ret = ldb_build_mod_req(
+	    &req, ldb, ldb, msg, NULL, NULL, ldb_op_default_callback, NULL);
+	assert_int_equal(ret, LDB_SUCCESS);
+	return req;
+}
+
+/*
+ * Delete a record from the database
+ */
+static void delete_record(
+	TALLOC_CTX *ctx,
+	struct ldb_context *ldb,
+	const char* dc)
+{
+	struct ldb_kv_context *ldb_kv_ctx = NULL;
+	struct ldb_dn *basedn = NULL;
+	struct ldb_result *result = NULL;
+	struct ldb_request *req = NULL;
+	int ret = LDB_SUCCESS;
+
+	req = build_delete_request(ctx, ldb, dc);
+	ldb_kv_ctx = build_ldb_kv_context(ctx, ldb->modules, req);
+
+	ret = ldb_transaction_start(ldb);
+	assert_int_equal(ret, LDB_SUCCESS);
+
+	cmocka_unit_test_fail_delete_internal_after = NO_FAILURE;
+	cmocka_unit_test_fail_modify_internal_after = NO_FAILURE;
+	ret = ldb_kv_delete(ldb_kv_ctx);
+	assert_int_equal(ret, LDB_SUCCESS);
+	TALLOC_FREE(ldb_kv_ctx);
+	TALLOC_FREE(req);
+
+	ret = ldb_transaction_commit(ldb);
+	assert_int_equal(ret, LDB_SUCCESS);
+
+	/*
+	 * Ensure that the record was actually deleted.
+	 */
+	basedn = ldb_dn_new_fmt(ctx, ldb, "dc=%s", dc);
+	assert_non_null(basedn);
+
+	/*
+	 * DN search, indexed
+	 */
+	ret = ldb_search(ldb, ctx, &result, basedn, LDB_SCOPE_BASE, NULL, NULL);
+	assert_int_equal(ret, LDB_SUCCESS);
+	assert_non_null(result);
+	assert_int_equal(result->count, 0);
+	TALLOC_FREE(basedn);
+	TALLOC_FREE(result);
+}
+
+/*
+ * Add a record to the database
+ */
+static void add_record(
+	TALLOC_CTX *ctx,
+	struct ldb_context *ldb,
+	const char* dc,
+	const char* uuid,
+	const char* cn)
+{
+
+	struct ldb_request *req = NULL;
+	int ret = LDB_SUCCESS;
+	struct ldb_kv_context *ldb_kv_ctx = NULL;
+	struct ldb_dn *basedn = NULL;
+	struct ldb_result *result = NULL;
+
+	req = build_add_request(ctx, ldb, dc, uuid, cn);
+
+	ldb_req_set_location(req, "add_record");
+
+	assert_int_equal(ret, LDB_SUCCESS);
+
+
+	ldb_kv_ctx = build_ldb_kv_context(ctx, ldb->modules, req);
+	cmocka_unit_test_fail_add_internal_after = NO_FAILURE;
+	cmocka_unit_test_fail_modify_internal_after = NO_FAILURE;
+
+	ret = ldb_transaction_start(ldb);
+	assert_int_equal(ret, LDB_SUCCESS);
+
+	ret = ldb_kv_add(ldb_kv_ctx);
+	assert_int_equal(ret, LDB_SUCCESS);
+	TALLOC_FREE(ldb_kv_ctx);
+	TALLOC_FREE(req);
+
+	ret = ldb_transaction_commit(ldb);
+	assert_int_equal(ret, LDB_SUCCESS);
+
+	/*
+	 * Ensure that the record was actually written.
+	 */
+	basedn = ldb_dn_new_fmt(ctx, ldb, "dc=%s", dc);
+	assert_non_null(basedn);
+
+	/*
+	 * DN search, indexed
+	 */
+	ret = ldb_search(ldb, ctx, &result, basedn, LDB_SCOPE_BASE, NULL, NULL);
+	assert_int_equal(ret, LDB_SUCCESS);
+	assert_non_null(result);
+	assert_int_equal(result->count, 1);
+	TALLOC_FREE(result);
+
+
+	/*
+	 * CN search unindexed
+	 */
+	ret = ldb_search(
+		ldb,
+		ctx,
+		&result,
+		basedn,
+		LDB_SCOPE_SUBTREE,
+		NULL,
+		"(cn=%s)",
+		cn);
+	assert_int_equal(ret, LDB_SUCCESS);
+	assert_non_null(result);
+	assert_int_equal(result->count, 1);
+	TALLOC_FREE(result);
+	TALLOC_FREE(basedn);
+}
+
+/*
+ * Test that a failed add operation does not change the database.
+ */
+static void test_add_failure(void **state)
+{
+	int ret = LDB_SUCCESS;
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+	struct ldb_request *req = NULL;
+	struct ldb_dn *basedn = NULL;
+	struct ldb_result *result = NULL;
+	struct ldb_kv_context *ldb_kv_ctx = NULL;
+
+	TALLOC_CTX *tmp_ctx = talloc_new(test_ctx);
+	assert_non_null(tmp_ctx);
+
+	req = build_add_request(
+		tmp_ctx,
+		test_ctx->ldb,
+		"test_add_failure",
+		"0123456789abcdef",
+		"test_add_failure_value");
+
+	ldb_req_set_location(req, "test_add_failure");
+
+	ldb_kv_ctx = build_ldb_kv_context(tmp_ctx, test_ctx->ldb->modules, req);
+	cmocka_unit_test_fail_add_internal_after = 1;
+
+	ret = ldb_transaction_start(test_ctx->ldb);
+	assert_int_equal(ret, LDB_SUCCESS);
+
+	ret = ldb_kv_add(ldb_kv_ctx);
+
+	assert_int_equal(ret, FAILURE_LDB_ERR);
+	TALLOC_FREE(ldb_kv_ctx);
+	TALLOC_FREE(req);
+
+
+	/*
+	 * a search for "cn=test_add_failure_value" should fail
+	 * as the transaction containing the operation should have been
+	 * rolled back leaving the database consistent
+	 *
+	 * This should be an un-indexed search so the index caches won't be
+	 * used.
+	 */
+	basedn = ldb_dn_new_fmt(
+		tmp_ctx,
+		test_ctx->ldb,
+		"dc=%s",
+		"test_add_failure");
+	assert_non_null(basedn);
+
+	ret = ldb_search(
+		test_ctx->ldb, tmp_ctx,
+		&result,
+		basedn,
+		LDB_SCOPE_SUBTREE,
+		NULL,
+		"(cn=%s)",
+		"test_add_failure_value");
+	assert_int_equal(ret, LDB_SUCCESS);
+	assert_non_null(result);
+	assert_int_equal(result->count, 0);
+	TALLOC_FREE(basedn);
+	TALLOC_FREE(result);
+
+	ldb_transaction_cancel(test_ctx->ldb);
+	TALLOC_FREE(tmp_ctx);
+}
+
+
+/*
+ * Test that a failed delete operation does not modify the database.
+ */
+static void test_delete_failure(void **state)
+{
+	int ret = LDB_SUCCESS;
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+	struct ldb_request *req = NULL;
+	struct ldb_dn *basedn = NULL;
+	struct ldb_result *result = NULL;
+	struct ldb_kv_context *ldb_kv_ctx = NULL;
+
+	TALLOC_CTX *tmp_ctx = talloc_new(test_ctx);
+	assert_non_null(tmp_ctx);
+
+	add_record(
+		tmp_ctx,
+		test_ctx->ldb,
+		"test_delete_failure",
+		"0123456789abcded",
+		"test_delete_failure_value");
+
+	req = build_delete_request(
+		tmp_ctx,
+		test_ctx->ldb,
+		"test_delete_failure");
+
+	ldb_kv_ctx = build_ldb_kv_context(tmp_ctx, test_ctx->ldb->modules, req);
+	cmocka_unit_test_fail_delete_internal_after = 1;
+
+	ret = ldb_transaction_start(test_ctx->ldb);
+	assert_int_equal(ret, LDB_SUCCESS);
+
+	ret = ldb_kv_delete(ldb_kv_ctx);
+	assert_int_equal(ret, FAILURE_LDB_ERR);
+	TALLOC_FREE(ldb_kv_ctx);
+	TALLOC_FREE(req);
+
+	/*
+	 * a search for "cn=test_add_failure_value" should succeed
+	 * as the transaction containing the operation should have been
+	 * rolled back leaving the database consistent
+	 *
+	 * This should be an un-indexed search so the index caches won't be
+	 * used.
+	 */
+	basedn = ldb_dn_new_fmt(
+		tmp_ctx,
+		test_ctx->ldb,
+		"dc=%s",
+		"test_delete_failure");
+	assert_non_null(basedn);
+
+	ret = ldb_search(
+		test_ctx->ldb, tmp_ctx,
+		&result,
+		basedn,
+		LDB_SCOPE_SUBTREE,
+		NULL,
+		"(cn=%s)",
+		"test_delete_failure_value");
+	assert_int_equal(ret, LDB_SUCCESS);
+	assert_non_null(result);
+	assert_int_equal(result->count, 1);
+	TALLOC_FREE(basedn);
+	TALLOC_FREE(result);
+
+
+	ldb_transaction_cancel(test_ctx->ldb);
+	delete_record(
+		tmp_ctx,
+		test_ctx->ldb,
+		"test_delete_failure");
+	TALLOC_FREE(tmp_ctx);
+}
+
+/*
+ * Test that a failed rename operation dies not change the database
+ */
+static void test_rename_failure(void **state)
+{
+	int ret = LDB_SUCCESS;
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+	struct ldb_request *req = NULL;
+	struct ldb_dn *basedn = NULL;
+	struct ldb_result *result = NULL;
+	struct ldb_kv_context *ldb_kv_ctx = NULL;
+
+	TALLOC_CTX *tmp_ctx = talloc_new(test_ctx);
+	assert_non_null(tmp_ctx);
+
+	add_record(
+		tmp_ctx,
+		test_ctx->ldb,
+		"test_rename_failure",
+		"0123456789abcdec",
+		"test_rename_failure_value");
+
+	req = build_rename_request(
+		tmp_ctx,
+		test_ctx->ldb,
+		"test_rename_failure",
+		"test_rename_failure_renamed");
+
+	ldb_kv_ctx = build_ldb_kv_context(tmp_ctx, test_ctx->ldb->modules, req);
+	cmocka_unit_test_fail_rename_internal_after = 1;
+
+	ret = ldb_transaction_start(test_ctx->ldb);
+	assert_int_equal(ret, LDB_SUCCESS);
+
+	ret = ldb_kv_rename(ldb_kv_ctx);
+	assert_int_equal(ret, FAILURE_LDB_ERR);
+	TALLOC_FREE(ldb_kv_ctx);
+	TALLOC_FREE(req);
+
+	/*
+	 * The original record should be present
+	 */
+	basedn = ldb_dn_new_fmt(
+		tmp_ctx,
+		test_ctx->ldb,
+		"dc=%s",
+		"test_rename_failure");
+	assert_non_null(basedn);
+
+	ret = ldb_search(
+		test_ctx->ldb, tmp_ctx,
+		&result,
+		basedn,
+		LDB_SCOPE_SUBTREE,
+		NULL,
+		"(cn=%s)",
+		"test_rename_failure_value");
+	assert_int_equal(ret, LDB_SUCCESS);
+	assert_non_null(result);
+	assert_int_equal(result->count, 1);
+	TALLOC_FREE(basedn);
+	TALLOC_FREE(result);
+
+	/*
+	 * And the renamed record should not be present
+	 */
+	basedn = ldb_dn_new_fmt(
+		tmp_ctx,
+		test_ctx->ldb,
+		"dc=%s",
+		"test_rename_failure_renamed");
+	assert_non_null(basedn);
+
+	ret = ldb_search(
+		test_ctx->ldb, tmp_ctx,
+		&result,
+		basedn,
+		LDB_SCOPE_SUBTREE,
+		NULL,
+		"(cn=%s)",
+		"test_rename_failure_value");
+	assert_int_equal(ret, LDB_SUCCESS);
+	assert_non_null(result);
+	assert_int_equal(result->count, 0);
+	TALLOC_FREE(basedn);
+	TALLOC_FREE(result);
+
+	ldb_transaction_cancel(test_ctx->ldb);
+	delete_record(
+		tmp_ctx,
+		test_ctx->ldb,
+		"test_rename_failure");
+	TALLOC_FREE(tmp_ctx);
+}
+
+/*
+ * Test that a failed modification operation does not change the database
+ */
+static void test_modify_failure(void **state)
+{
+	int ret = LDB_SUCCESS;
+	struct test_ctx *test_ctx = talloc_get_type_abort(*state,
+							  struct test_ctx);
+	struct ldb_request *req = NULL;
+	struct ldb_dn *basedn = NULL;
+	struct ldb_result *result = NULL;
+	struct ldb_kv_context *ldb_kv_ctx = NULL;
+
+	TALLOC_CTX *tmp_ctx = talloc_new(test_ctx);
+	assert_non_null(tmp_ctx);
+
+	add_record(
+		tmp_ctx,
+		test_ctx->ldb,
+		"test_modify_failure",
+		"0123456789abcdeb",
+		"test_modify_failure_value");
+
+	req = build_modify_request(
+		tmp_ctx,
+		test_ctx->ldb,
+		"test_modify_failure",
+		"test_modify_failure_value_modified");
+
+	ldb_kv_ctx = build_ldb_kv_context(tmp_ctx, test_ctx->ldb->modules, req);
+	cmocka_unit_test_fail_modify_internal_after = 2;
+
+	ret = ldb_transaction_start(test_ctx->ldb);
+	assert_int_equal(ret, LDB_SUCCESS);
+
+	ret = ldb_kv_modify(ldb_kv_ctx);
+	assert_int_equal(ret, FAILURE_LDB_ERR);
+	TALLOC_FREE(ldb_kv_ctx);
+	TALLOC_FREE(req);
+
+
+	/*
+	 * The original value should be present
+	 */
+	basedn = ldb_dn_new_fmt(
+		tmp_ctx,
+		test_ctx->ldb,
+		"dc=%s",
+		"test_modify_failure");
+	assert_non_null(basedn);
+
+	ret = ldb_search(
+		test_ctx->ldb, tmp_ctx,
+		&result,
+		basedn,
+		LDB_SCOPE_SUBTREE,
+		NULL,
+		"(cn=%s)",
+		"test_modify_failure_value");
+	assert_int_equal(ret, LDB_SUCCESS);
+	assert_non_null(result);
+	assert_int_equal(result->count, 1);
+	TALLOC_FREE(result);
+
+	/*
+	 * And the modified record should not be present
+	 */
+	ret = ldb_search(
+		test_ctx->ldb, tmp_ctx,
+		&result,
+		basedn,
+		LDB_SCOPE_SUBTREE,
+		NULL,
+		"(cn=%s)",
+		"test_modify_failure_value_modified");
+	assert_int_equal(ret, LDB_SUCCESS);
+	assert_non_null(result);
+	assert_int_equal(result->count, 0);
+	TALLOC_FREE(basedn);
+	TALLOC_FREE(result);
+
+	ldb_transaction_cancel(test_ctx->ldb);
+	delete_record(
+		tmp_ctx,
+		test_ctx->ldb,
+		"test_modify_failure");
+	TALLOC_FREE(tmp_ctx);
+}
+
+int main(int argc, const char **argv)
+{
+	const struct CMUnitTest tests[] = {
+		cmocka_unit_test_setup_teardown(
+			test_add_failure,
+			setup,
+			teardown),
+		cmocka_unit_test_setup_teardown(
+			test_delete_failure,
+			setup,
+			teardown),
+		cmocka_unit_test_setup_teardown(
+			test_rename_failure,
+			setup,
+			teardown),
+		cmocka_unit_test_setup_teardown(
+			test_modify_failure,
+			setup,
+			teardown),
+	};
+
+	return cmocka_run_group_tests(tests, NULL, NULL);
+}
diff --git a/lib/ldb/tests/ldb_kv_ops_test.c b/lib/ldb/tests/ldb_kv_ops_test.c
index d6a4dc058e5..7c7da196dc1 100644
--- a/lib/ldb/tests/ldb_kv_ops_test.c
+++ b/lib/ldb/tests/ldb_kv_ops_test.c
@@ -1440,6 +1440,12 @@ static void test_delete_transaction_isolation(void **state)
 				    val.data);
 			exit(LDB_ERR_OPERATIONS_ERROR);
 		}
+		ret = ldb_kv->kv_ops->unlock_read(ldb->modules);
+		if (ret != LDB_SUCCESS) {
+			print_error(__location__": unlock_read returned (%d)\n",
+				    ret);
+			exit(ret);
+		}
 
 		/*
 		 * Check that KEY2 is not there
@@ -1468,7 +1474,7 @@ static void test_delete_transaction_isolation(void **state)
 				    ret);
 			exit(ret);
 		}
-
+		TALLOC_FREE(tmp_ctx);
 		exit(0);
 	}
 	close(to_child[0]);
diff --git a/lib/ldb/tests/ldb_kv_ops_test.valgrind b/lib/ldb/tests/ldb_kv_ops_test.valgrind
new file mode 100644
index 00000000000..17470760f04
--- /dev/null
+++ b/lib/ldb/tests/ldb_kv_ops_test.valgrind
@@ -0,0 +1,97 @@
+{
+   Memory allocated by setup
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:malloc
+   ...
+   fun:setup
+}
+{
+   Memory allocated by setup
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:realloc
+   ...
+   fun:setup
+}
+{
+   Memory allocated by ldb_init
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:malloc
+   ...
+   fun:ldb_init
+}
+{
+   Memory allocated by ldb_init
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:realloc
+   ...
+   fun:ldb_init
+}
+{
+   Memory allocated by noconn_setup
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:malloc
+   ...
+   fun:noconn_setup
+}
+{
+   Memory allocated by parse, which allocates on the NULL context
+   Memcheck:Leak
+   match-leak-kinds: all
+   fun:malloc
+   ...
+   fun:parse
+}
+{
+   Memory allocated by tdb_parse_data
+   Memcheck:Leak
+   match-leak-kinds: all
+   fun:malloc
+   ...
+   fun:tdb_parse_data
+}
+{
+   Memory allocated by ldb_connect
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:malloc
+   ...
+   fun:ldb_connect
+}
+{
+   Memory allocated by ldb_connect
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:calloc
+   ...
+   fun:ldb_connect
+}
+{
+   Memory allocated by ldb_kv_cache_load
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:malloc
+   ...
+   fun:ldb_kv_cache_load
+}
+{
+   Memory allocated by ldb_kv_index_load
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:malloc
+   ...
+   fun:ldb_kv_index_load
+}
+{
+   Memory allocated by ldb_asprintf_errstring
+   Memcheck:Leak
+   match-leak-kinds: possible
+   fun:malloc
+   ...
+   fun:ldb_asprintf_errstring
+}
+
diff --git a/lib/ldb/wscript b/lib/ldb/wscript
index 7891693a0fd..e0af5516c79 100644
--- a/lib/ldb/wscript
+++ b/lib/ldb/wscript
@@ -501,6 +501,16 @@ def build(bld):
                          deps='cmocka ldb',
                          install=False)
 
+        bld.SAMBA_BINARY('ldb_key_value_tdb_test',
+                         bld.SUBDIR('ldb_key_value',
+                             '''ldb_kv_search.c
+                                ldb_kv_index.c
+                                ldb_kv_cache.c''') +
+                         'tests/ldb_key_value_test.c',
+                         cflags='-DTEST_BE=\"tdb\"',
+                         deps='cmocka ldb ldb_tdb',
+                         install=False)
+
         if bld.CONFIG_SET('HAVE_LMDB'):
             bld.SAMBA_BINARY('ldb_mdb_mod_op_test',
                              source='tests/ldb_mod_op_test.c',
@@ -524,6 +534,24 @@ def build(bld):
                              cflags='-DTEST_BE=\"mdb\"',
                              deps='cmocka ldb',
                              install=False)
+
+            #
+            # We rely on the versions of the ldb_key_value functions included
+            # in ldb_key_value_test.c taking priority over the versions
+            # in the ldb_key_value shared library.
+            # If this turns out to not be the case, the dependencies will
+            # need to be unrolled, and all the source files included and the
+            # ldb_tdb module initialization code will need to be called
+            # manually.
+            bld.SAMBA_BINARY('ldb_key_value_mdb_test',
+                             bld.SUBDIR('ldb_key_value',
+                                 '''ldb_kv_search.c
+                                    ldb_kv_index.c
+                                    ldb_kv_cache.c''') +
+                             'tests/ldb_key_value_test.c',
+                             cflags='-DTEST_BE=\"mdb\"',
+                             deps='cmocka ldb ldb_tdb',
+                             install=False)
         else:
             bld.SAMBA_BINARY('ldb_no_lmdb_test',
                              source='tests/ldb_no_lmdb_test.c',
@@ -568,6 +596,11 @@ def test(ctx):
                  'ldb_msg_test',
                  'ldb_tdb_kv_ops_test',
                  'ldb_tdb_test',
+                 # we currently don't run ldb_key_value_tdb_test as it
+                 # currently only tests the nested/sub transaction handling
+                 # on operations which the TDB backend does not currently
+                 # support
+                 # 'ldb_key_value_tdb_test
                  'ldb_match_test']
 
     if env.HAVE_LMDB:
@@ -576,7 +609,8 @@ def test(ctx):
                      # we don't want to run ldb_lmdb_size_test (which proves we can
                      # fit > 4G of data into the DB), it would fill up the disk on
                      # many of our test instances
-                     'ldb_mdb_kv_ops_test']
+                     'ldb_mdb_kv_ops_test',
+                     'ldb_key_value_mdb_test']
     else:
         test_exes += ['ldb_no_lmdb_test']
 
-- 
2.18.1


From 2ec0e1e48761c88f3236a377417803b03b25289a Mon Sep 17 00:00:00 2001
From: Gary Lockyer <gary at catalyst.net.nz>
Date: Thu, 7 Mar 2019 10:40:10 +1300
Subject: [PATCH 8/8] lib ldb key value: fix index buffering

As a performance enhancement the key value layer maintains a cache of
the index records, which is written to disk as part of a prepare commit.
This patch adds an extra cache at the operation layer to ensure that the
cached indexes remain consistent in the event of an operation failing.

Signed-off-by: Gary Lockyer <gary at catalyst.net.nz>
---
 lib/ldb/ldb_key_value/ldb_kv.c               |  22 --
 lib/ldb/ldb_key_value/ldb_kv.h               |  12 ++
 lib/ldb/ldb_key_value/ldb_kv_index.c         | 207 ++++++++++++++++++-
 selftest/knownfail.d/test_modify_transaction |   3 -
 4 files changed, 208 insertions(+), 36 deletions(-)
 delete mode 100644 selftest/knownfail.d/test_modify_transaction

diff --git a/lib/ldb/ldb_key_value/ldb_kv.c b/lib/ldb/ldb_key_value/ldb_kv.c
index fa0dcd0615a..ec00cbf5bbd 100644
--- a/lib/ldb/ldb_key_value/ldb_kv.c
+++ b/lib/ldb/ldb_key_value/ldb_kv.c
@@ -432,14 +432,6 @@ static bool ldb_kv_single_valued(const struct ldb_schema_attribute *a,
 	return false;
 }
 
-
-/*
- * Place holder actual implementation to be added in subsequent commits
- */
-int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv)
-{
-	return LDB_SUCCESS;
-}
 /*
  * Starts a sub transaction if they are supported by the backend
  */
@@ -454,13 +446,6 @@ static int ldb_kv_sub_transaction_start(struct ldb_kv_private *ldb_kv)
 	return ret;
 }
 
-/*
- * Place holder actual implementation to be added in subsequent commits
- */
-int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
-{
-	return LDB_SUCCESS;
-}
 /*
  * Commits a sub transaction if they are supported by the backend
  */
@@ -476,13 +461,6 @@ static int ldb_kv_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
 	return ret;
 }
 
-/*
- * Place holder actual implementation to be added in subsequent commits
- */
-int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
-{
-	return LDB_SUCCESS;
-}
 /*
  * Cancels a sub transaction if they are supported by the backend
  */
diff --git a/lib/ldb/ldb_key_value/ldb_kv.h b/lib/ldb/ldb_key_value/ldb_kv.h
index 3c1adbed23a..c643ae87367 100644
--- a/lib/ldb/ldb_key_value/ldb_kv.h
+++ b/lib/ldb/ldb_key_value/ldb_kv.h
@@ -72,7 +72,19 @@ struct ldb_kv_private {
 
 	bool check_base;
 	bool disallow_dn_filter;
+	/*
+	 * To improve the performance of batch operations we maintain a cache
+	 * of index records, these entries get written to disk in the
+	 * prepare_commit phase.
+	 */
 	struct ldb_kv_idxptr *idxptr;
+	/*
+	 * To ensure that the indexes in idxptr are consistent we cache any
+	 * index updates during an operation, i.e. ldb_kv_add, ldb_kv_delete ...
+	 * Once the changes to the data record have been commited to disk
+	 * the contents of this cache are copied to idxptr
+	 */
+	struct ldb_kv_idxptr *nested_idx_ptr;
 	bool prepared_commit;
 	int read_lock_count;
 
diff --git a/lib/ldb/ldb_key_value/ldb_kv_index.c b/lib/ldb/ldb_key_value/ldb_kv_index.c
index 6d02c91a597..c6faf9e8958 100644
--- a/lib/ldb/ldb_key_value/ldb_kv_index.c
+++ b/lib/ldb/ldb_key_value/ldb_kv_index.c
@@ -162,6 +162,11 @@ struct dn_list {
 };
 
 struct ldb_kv_idxptr {
+	/*
+	 * In memory tdb to cache the index updates performed during a
+	 * transaction.  This improves the performance of operations like
+	 * re-index and join
+	 */
 	struct tdb_context *itdb;
 	int error;
 };
@@ -353,7 +358,7 @@ static int ldb_kv_dn_list_load(struct ldb_module *module,
 	struct ldb_message *msg;
 	int ret, version;
 	struct ldb_message_element *el;
-	TDB_DATA rec;
+	TDB_DATA rec = {0};
 	struct dn_list *list2;
 	TDB_DATA key;
 
@@ -368,7 +373,19 @@ static int ldb_kv_dn_list_load(struct ldb_module *module,
 	key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
 	key.dsize = strlen((char *)key.dptr);
 
-	rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+	/*
+	 * Have we cached this index record?
+	 * If we have a nested transaction cache try that first.
+	 * then try the transaction cache.
+	 * if the record is not cached it will need to be read from disk.
+	 */
+	if (ldb_kv->nested_idx_ptr != NULL &&
+	    ldb_kv->nested_idx_ptr->itdb != NULL) {
+		rec = tdb_fetch(ldb_kv->nested_idx_ptr->itdb, key);
+	}
+	if (rec.dptr == NULL) {
+		rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+	}
 	if (rec.dptr == NULL) {
 		goto normal_index;
 	}
@@ -709,14 +726,20 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
 {
 	struct ldb_kv_private *ldb_kv = talloc_get_type(
 	    ldb_module_get_private(module), struct ldb_kv_private);
-	TDB_DATA rec, key;
-	int ret;
-	struct dn_list *list2;
+	TDB_DATA rec = {0};
+	TDB_DATA key = {0};
+
+	int ret = LDB_SUCCESS;
+	struct dn_list *list2 = NULL;
+	struct ldb_kv_idxptr *cache = NULL;
 
 	if (ldb_kv->idxptr == NULL) {
 		return ldb_kv_dn_list_store_full(module, ldb_kv, dn, list);
 	}
 
+	/*
+	 * Open the transaction and nested transaction caches if needed
+	 */
 	if (ldb_kv->idxptr->itdb == NULL) {
 		ldb_kv->idxptr->itdb =
 		    tdb_open(NULL, 1000, TDB_INTERNAL, O_RDWR, 0);
@@ -724,6 +747,23 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
 			return LDB_ERR_OPERATIONS_ERROR;
 		}
 	}
+	if (ldb_kv->nested_idx_ptr != NULL &&
+	    ldb_kv->nested_idx_ptr->itdb == NULL) {
+		ldb_kv->nested_idx_ptr->itdb =
+		    tdb_open(NULL, 11, TDB_INTERNAL, O_RDWR, 0);
+		if (ldb_kv->nested_idx_ptr->itdb == NULL) {
+			return LDB_ERR_OPERATIONS_ERROR;
+		}
+	}
+	/*
+	 * Determine the index record cache to update
+	 * If there is a nested transaction active we update that cache
+	 * other wise we update the transaction level cache
+	 */
+	cache = ldb_kv->idxptr;
+	if (ldb_kv->nested_idx_ptr != NULL) {
+		cache = ldb_kv->nested_idx_ptr;
+	}
 
 	key.dptr = discard_const_p(unsigned char, ldb_dn_get_linearized(dn));
 	if (key.dptr == NULL) {
@@ -731,7 +771,17 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
 	}
 	key.dsize = strlen((char *)key.dptr);
 
-	rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+	/*
+	 * Get the index record.
+	 * Try the nested transaction cache first
+	 * then the transaction cache
+	 */
+	if( ldb_kv->nested_idx_ptr != NULL) {
+		rec = tdb_fetch(ldb_kv->nested_idx_ptr->itdb, key);
+	}
+	if (rec.dptr == NULL) {
+		rec = tdb_fetch(ldb_kv->idxptr->itdb, key);
+	}
 	if (rec.dptr != NULL) {
 		list2 = ldb_kv_index_idxptr(module, rec, false);
 		if (list2 == NULL) {
@@ -744,7 +794,7 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
 		return LDB_SUCCESS;
 	}
 
-	list2 = talloc(ldb_kv->idxptr, struct dn_list);
+	list2 = talloc(cache, struct dn_list);
 	if (list2 == NULL) {
 		return LDB_ERR_OPERATIONS_ERROR;
 	}
@@ -759,9 +809,10 @@ static int ldb_kv_dn_list_store(struct ldb_module *module,
 	 * This is not a store into the main DB, but into an in-memory
 	 * TDB, so we don't need a guard on ltdb->read_only
 	 */
-	ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
+	ret = tdb_store(cache->itdb, key, rec, TDB_INSERT);
 	if (ret != 0) {
-		return ltdb_err_map(tdb_error(ldb_kv->idxptr->itdb));
+		return ltdb_err_map(
+			tdb_error(cache->itdb));
 	}
 	return LDB_SUCCESS;
 }
@@ -845,8 +896,11 @@ int ldb_kv_index_transaction_cancel(struct ldb_module *module)
 	if (ldb_kv->idxptr && ldb_kv->idxptr->itdb) {
 		tdb_close(ldb_kv->idxptr->itdb);
 	}
-	talloc_free(ldb_kv->idxptr);
-	ldb_kv->idxptr = NULL;
+	TALLOC_FREE(ldb_kv->idxptr);
+	if (ldb_kv->nested_idx_ptr && ldb_kv->nested_idx_ptr->itdb) {
+		tdb_close(ldb_kv->nested_idx_ptr->itdb);
+	}
+	TALLOC_FREE(ldb_kv->nested_idx_ptr);
 	return LDB_SUCCESS;
 }
 
@@ -3169,3 +3223,134 @@ int ldb_kv_reindex(struct ldb_module *module)
 	}
 	return LDB_SUCCESS;
 }
+
+/*
+ * Copy the contents of the nested transaction index cache record to the
+ * transaction index cache.
+ */
+static int ldb_kv_sub_transaction_traverse(
+	struct tdb_context *tdb,
+	TDB_DATA key,
+	TDB_DATA data,
+	void *state)
+{
+	struct ldb_module *module = state;
+	struct ldb_dn *dn = NULL;
+	struct ldb_context *ldb = ldb_module_get_ctx(module);
+	struct ldb_kv_private *ldb_kv = talloc_get_type(
+	    ldb_module_get_private(module), struct ldb_kv_private);
+	struct ldb_val v = {0};
+	TDB_DATA rec = {0};
+	struct dn_list *list = NULL;
+	struct dn_list *list2 = NULL;
+	int ret = 0;
+
+	list = ldb_kv_index_idxptr(module, data, true);
+	if (list == NULL) {
+		ldb_kv->idxptr->error = LDB_ERR_OPERATIONS_ERROR;
+		return -1;
+	}
+
+	v.data = key.dptr;
+	v.length = strnlen((char *)key.dptr, key.dsize);
+
+	dn = ldb_dn_from_ldb_val(module, ldb, &v);
+	if (dn == NULL) {
+		ldb_asprintf_errstring(
+			ldb,
+			"Failed to parse index key %*.*s as an LDB DN",
+			(int)v.length,
+			(int)v.length,
+			(const char *)v.data);
+		ldb_kv->nested_idx_ptr->error = LDB_ERR_OPERATIONS_ERROR;
+		return -1;
+	}
+
+	list2 = talloc(ldb_kv->idxptr, struct dn_list);
+	if (list2 == NULL) {
+		return LDB_ERR_OPERATIONS_ERROR;
+	}
+	list2->dn = talloc_steal(list2, list->dn);
+	list2->count = list->count;
+
+	rec.dptr = (uint8_t *)&list2;
+	rec.dsize = sizeof(void *);
+
+
+	/*
+	 * This is not a store into the main DB, but into an in-memory
+	 * TDB, so we don't need a guard on ltdb->read_only
+	 */
+	ret = tdb_store(ldb_kv->idxptr->itdb, key, rec, TDB_INSERT);
+	talloc_free(dn);
+	if (ret != 0) {
+		ldb_kv->idxptr->error = ltdb_err_map(
+		    tdb_error(ldb_kv->idxptr->itdb));
+		return -1;
+	}
+	return 0;
+}
+
+/*
+ * Initialise the index cache for a sub transaction.
+ */
+int ldb_kv_index_sub_transaction_start(struct ldb_kv_private *ldb_kv)
+{
+	ldb_kv->nested_idx_ptr = talloc_zero(ldb_kv, struct ldb_kv_idxptr);
+	if (ldb_kv->nested_idx_ptr == NULL) {
+		return LDB_ERR_OPERATIONS_ERROR;
+	}
+	return LDB_SUCCESS;
+}
+
+/*
+ * Clear the contents of the nested transaction index cache when the nested
+ * transaction is cancelled.
+ */
+int ldb_kv_index_sub_transaction_cancel(struct ldb_kv_private *ldb_kv)
+{
+	if (ldb_kv->nested_idx_ptr != NULL &&
+	    ldb_kv->nested_idx_ptr->itdb != NULL) {
+		tdb_close(ldb_kv->nested_idx_ptr->itdb);
+	}
+	TALLOC_FREE(ldb_kv->nested_idx_ptr);
+	return LDB_SUCCESS;
+}
+
+/*
+ * Commit a nested transaction,
+ * Copy the contents of the nested transaction index cache to the
+ * transaction index cache.
+ */
+int ldb_kv_index_sub_transaction_commit(struct ldb_kv_private *ldb_kv)
+{
+	int ret = 0;
+
+	if (ldb_kv->nested_idx_ptr == NULL) {
+		return LDB_SUCCESS;
+	}
+	if (ldb_kv->nested_idx_ptr->itdb == NULL) {
+		return LDB_SUCCESS;
+	}
+	tdb_traverse(
+	    ldb_kv->nested_idx_ptr->itdb,
+	    ldb_kv_sub_transaction_traverse,
+	    ldb_kv->module);
+	tdb_close(ldb_kv->nested_idx_ptr->itdb);
+	ldb_kv->nested_idx_ptr->itdb = NULL;
+
+	ret = ldb_kv->nested_idx_ptr->error;
+	if (ret != LDB_SUCCESS) {
+		struct ldb_context *ldb = ldb_module_get_ctx(ldb_kv->module);
+		if (!ldb_errstring(ldb)) {
+			ldb_set_errstring(ldb, ldb_strerror(ret));
+		}
+		ldb_asprintf_errstring(
+			ldb,
+			__location__": Failed to update index records in "
+			"sub transaction commit: %s",
+			ldb_errstring(ldb));
+	}
+	TALLOC_FREE(ldb_kv->nested_idx_ptr);
+	return ret;
+}
diff --git a/selftest/knownfail.d/test_modify_transaction b/selftest/knownfail.d/test_modify_transaction
deleted file mode 100644
index d53070a664b..00000000000
--- a/selftest/knownfail.d/test_modify_transaction
+++ /dev/null
@@ -1,3 +0,0 @@
-^ldb.python.api.BadIndexTests.test_modify_transaction
-^ldb.python.api.GUIDBadIndexTests.test_modify_transaction
-^ldb.python.api.GUIDBadIndexTestsLmdb.test_modify_transaction
-- 
2.18.1

-------------- next part --------------
A non-text attachment was scrubbed...
Name: signature.asc
Type: application/pgp-signature
Size: 488 bytes
Desc: OpenPGP digital signature
URL: <http://lists.samba.org/pipermail/samba-technical/attachments/20190327/e74ae72e/signature.sig>


More information about the samba-technical mailing list