[PATCH] Avoid privileged segfault in GetNCChanges and add many more tests

Andrew Bartlett abartlet at samba.org
Wed Aug 9 03:36:34 UTC 2017


This patch set fixes bug 12946 and adds a number of tests to ensure
that there are not other similar issues waiting to be uncovered.

I don't change our server behaviour to match windows exactly, as I
prefer that we keep our server behaviour of checking the ACLs first.

Please review and push!

Thanks,

Andrew Bartlett
-- 
Andrew Bartlett
https://samba.org/~abartlet/
Authentication Developer, Samba Team         https://samba.org
Samba Development and Support, Catalyst IT   
https://catalyst.net.nz/services/samba



-------------- next part --------------
From 31ace02d44cda2d1c2ce89f3f1d476740dd7e287 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Wed, 9 Aug 2017 13:57:13 +1200
Subject: [PATCH 01/10] py-librpc: Strictly check the type of the incoming sid
 pointer

This avoids casting another type of object to a void* and then to a SID

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 source4/librpc/ndr/py_security.c | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git a/source4/librpc/ndr/py_security.c b/source4/librpc/ndr/py_security.c
index 4cc25664996..a2fa4a284ae 100644
--- a/source4/librpc/ndr/py_security.c
+++ b/source4/librpc/ndr/py_security.c
@@ -249,7 +249,15 @@ static PyObject *py_descriptor_from_sddl(PyObject *self, PyObject *args)
 	if (!PyArg_ParseTuple(args, "sO!", &sddl, &dom_sid_Type, &py_sid))
 		return NULL;
 
-	sid = pytalloc_get_ptr(py_sid);
+	if (PyObject_TypeCheck(py_sid, &dom_sid_Type)) {
+		sid = pytalloc_get_ptr(py_sid);
+	} else {
+		PyErr_SetString(PyExc_TypeError,
+				"expected security.dom_sid "
+				"for second argument to .from_sddl");
+		return NULL;
+
+	}
 
 	secdesc = sddl_decode(NULL, sddl, sid);
 	if (secdesc == NULL) {
-- 
2.11.0


From 70777326fa50e878aa3520bb8fa3ddc7dc11ecf8 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Fri, 4 Aug 2017 11:44:19 +1200
Subject: [PATCH 02/10] s4-drsupai: Avoid segfault when replicating as a
 non-admin with GUID_DRS_GET_CHANGES

Users who are not administrator do not get b_state->sam_ctx_system filled in.

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
BUG: https://bugzilla.samba.org/show_bug.cgi?id=12946
---
 source4/rpc_server/drsuapi/getncchanges.c  |   2 +-
 source4/selftest/tests.py                  |   5 ++
 source4/torture/drs/python/getnc_unpriv.py | 125 +++++++++++++++++++++++++++++
 3 files changed, 131 insertions(+), 1 deletion(-)
 create mode 100644 source4/torture/drs/python/getnc_unpriv.py

diff --git a/source4/rpc_server/drsuapi/getncchanges.c b/source4/rpc_server/drsuapi/getncchanges.c
index 096162dfded..b98f14c156a 100644
--- a/source4/rpc_server/drsuapi/getncchanges.c
+++ b/source4/rpc_server/drsuapi/getncchanges.c
@@ -2250,7 +2250,7 @@ allowed:
 			return WERR_NOT_ENOUGH_MEMORY;
 		}
 
-		ret = dsdb_find_guid_by_dn(b_state->sam_ctx_system,
+		ret = dsdb_find_guid_by_dn(b_state->sam_ctx,
 					   getnc_state->ncRoot_dn,
 					   &getnc_state->ncRoot_guid);
 		if (ret != LDB_SUCCESS) {
diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py
index 2865095b005..869ea80b69d 100755
--- a/source4/selftest/tests.py
+++ b/source4/selftest/tests.py
@@ -838,6 +838,11 @@ for env in ['vampire_dc', 'promoted_dc']:
                            name="samba4.drs.getnc_exop.python(%s)" % env,
                            environ={'DC1': "$DC_SERVER", 'DC2': '$%s_SERVER' % env.upper()},
                            extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD'])
+    planoldpythontestsuite(env, "getnc_unpriv",
+                           extra_path=[os.path.join(samba4srcdir, 'torture/drs/python')],
+                           name="samba4.drs.getnc_unpriv.python(%s)" % env,
+                           environ={'DC1': "$DC_SERVER", 'DC2': '$%s_SERVER' % env.upper()},
+                           extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD'])
     planoldpythontestsuite(env, "linked_attributes_drs",
                            extra_path=[os.path.join(samba4srcdir, 'torture/drs/python')],
                            name="samba4.drs.linked_attributes_drs.python(%s)" % env,
diff --git a/source4/torture/drs/python/getnc_unpriv.py b/source4/torture/drs/python/getnc_unpriv.py
new file mode 100644
index 00000000000..2bbbbf8816e
--- /dev/null
+++ b/source4/torture/drs/python/getnc_unpriv.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Tests various schema replication scenarios
+#
+# Copyright (C) Kamen Mazdrashki <kamenim at samba.org> 2011
+# Copyright (C) Andrew Bartlett <abartlet at samba.org> 2016
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+#
+# Usage:
+#  export DC1=dc1_dns_name
+#  export DC2=dc2_dns_name
+#  export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
+#  PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_exop -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
+#
+
+import random
+
+import drs_base
+from drs_base import AbstractLink
+
+import samba.tests
+import random
+import samba
+
+from samba import sd_utils
+import ldb
+from ldb import SCOPE_BASE
+
+from samba.dcerpc import drsuapi, misc, drsblobs
+from samba.drs_utils import drs_DsBind
+from samba.ndr import ndr_unpack, ndr_pack
+from samba.credentials import DONT_USE_KERBEROS
+class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
+    """Confirm the behaviour of DsGetNCChanges for unprivileged users"""
+
+    def setUp(self):
+        super(DrsReplicaSyncUnprivTestCase, self).setUp()
+        self.get_changes_user = "get-changes-user"
+        self.base_dn = self.ldb_dc1.get_default_basedn()
+        self.ou = "OU=test_getncchanges,%s" % self.base_dn
+        self.user_pass = samba.generate_random_password(12, 16)
+        self.ldb_dc1.add({
+            "dn": self.ou,
+            "objectclass": "organizationalUnit"})
+        self.ldb_dc1.newuser(self.get_changes_user, self.user_pass,
+                             userou="OU=test_getncchanges")
+        (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
+
+        self.sd_utils = sd_utils.SDUtils(self.ldb_dc1)
+        user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou)
+        user_sid = self.sd_utils.get_object_sid(user_dn)
+        mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid)
+        self.sd_utils.dacl_add_ace(self.base_dn, mod)
+
+        # We set DONT_USE_KERBEROS to avoid a race with getting the
+        # user replicated to our selected KDC
+        self.user_creds = self.insta_creds(template=self.get_credentials(),
+                                           username=self.get_changes_user,
+                                           userpass=self.user_pass,
+                                           kerberos_state=DONT_USE_KERBEROS)
+        (self.user_drs, self.user_drs_handle) = self._ds_bind(self.dnsname_dc1,
+                                                              self.user_creds)
+
+    def tearDown(self):
+        try:
+            self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
+        except ldb.LdbError as (enum, string):
+            if enum == ldb.ERR_NO_SUCH_OBJECT:
+                pass
+        super(DrsReplicaSyncUnprivTestCase, self).tearDown()
+
+    def test_do_single_repl(self):
+        """
+        Make sure that DRSU_EXOP_REPL_OBJ works as a less-privileged
+        user with the GET_CHANGES right
+        """
+
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=ou1,
+                               exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+        (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
+        self._check_ctr6(ctr, [ou1])
+
+    def test_do_full_repl(self):
+        """
+        Make sure that DRSU_EXOP_REPL_OBJ works as a less-privileged
+        user with the GET_CHANGES right
+        """
+
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=ou1,
+                               exop=drsuapi.DRSUAPI_EXOP_NONE,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+        (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
+        self.assertEqual(ctr.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_NONE)
-- 
2.11.0


From 9819c62e1afbb6c1351b68658f819d5d2caf7100 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Fri, 4 Aug 2017 15:21:31 +1200
Subject: [PATCH 03/10] dsdb: Use samba.generate_random_password() in dirsync
 test

We do not like fixed passwords

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 source4/dsdb/tests/python/dirsync.py | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/source4/dsdb/tests/python/dirsync.py b/source4/dsdb/tests/python/dirsync.py
index e6c5bbab641..168d5110dfb 100755
--- a/source4/dsdb/tests/python/dirsync.py
+++ b/source4/dsdb/tests/python/dirsync.py
@@ -79,7 +79,7 @@ class DirsyncBaseTests(samba.tests.TestCase):
         self.ldb_admin = SamDB(ldapshost, credentials=creds, session_info=system_session(lp), lp=lp)
         self.base_dn = self.ldb_admin.domain_dn()
         self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
-        self.user_pass = "samba123 at AAA"
+        self.user_pass = samba.generate_random_password(12, 16)
         self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
         self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
         #used for anonymous login
-- 
2.11.0


From 881bfcd11383a6a74a3242179f399daf5b0b6fc0 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Wed, 9 Aug 2017 13:56:07 +1200
Subject: [PATCH 04/10] selftest: Make dirsync test use symobolic name and OA
 not A

A is for Allow, OA is for Object Allow, which means check the GUID.

The previous ACE allowed all access, which was not the intention.

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 source4/dsdb/tests/python/dirsync.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/source4/dsdb/tests/python/dirsync.py b/source4/dsdb/tests/python/dirsync.py
index 168d5110dfb..e302c42d8bb 100755
--- a/source4/dsdb/tests/python/dirsync.py
+++ b/source4/dsdb/tests/python/dirsync.py
@@ -30,7 +30,7 @@ import base64
 from ldb import LdbError, SCOPE_BASE
 from ldb import Message, MessageElement, Dn
 from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE
-from samba.dcerpc import security, misc, drsblobs
+from samba.dcerpc import security, misc, drsblobs, security
 from samba.ndr import ndr_unpack, ndr_pack
 
 from samba.auth import system_session
@@ -119,7 +119,8 @@ class SimpleDirsyncTests(DirsyncBaseTests):
         self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
 
         user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
-        mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid)
+        mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
+                                   str(user_sid))
         self.sd_utils.dacl_add_ace(self.base_dn, mod)
 
         # add admins to the Domain Admins group
-- 
2.11.0


From 60e25add297360b4677494e53d6a3470663c5ec7 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Tue, 8 Aug 2017 14:34:14 +1200
Subject: [PATCH 05/10] s4-drsuapi: Refuse to replicate an NC is that not
 actually an NC

This prevents replication of an OU, you must replicate a whole NC per Windows 2012R2

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 source4/rpc_server/drsuapi/getncchanges.c  | 41 ++++++++++++++++++++++++------
 source4/torture/drs/python/getnc_unpriv.py |  4 ++-
 2 files changed, 36 insertions(+), 9 deletions(-)

diff --git a/source4/rpc_server/drsuapi/getncchanges.c b/source4/rpc_server/drsuapi/getncchanges.c
index b98f14c156a..21af30069b8 100644
--- a/source4/rpc_server/drsuapi/getncchanges.c
+++ b/source4/rpc_server/drsuapi/getncchanges.c
@@ -2240,6 +2240,14 @@ allowed:
 	}
 
 	if (getnc_state == NULL) {
+		struct ldb_result *res;
+		const char *attrs[] = {
+			"instanceType",
+			"objectGuID",
+			NULL
+		};
+		uint32_t nc_instanceType;
+
 		getnc_state = talloc_zero(b_state, struct drsuapi_getncchanges_state);
 		if (getnc_state == NULL) {
 			return WERR_NOT_ENOUGH_MEMORY;
@@ -2250,14 +2258,21 @@ allowed:
 			return WERR_NOT_ENOUGH_MEMORY;
 		}
 
-		ret = dsdb_find_guid_by_dn(b_state->sam_ctx,
-					   getnc_state->ncRoot_dn,
-					   &getnc_state->ncRoot_guid);
-		if (ret != LDB_SUCCESS) {
-			DEBUG(0,(__location__ ": Failed to find GUID of ncRoot_dn %s\n",
-				 ldb_dn_get_linearized(getnc_state->ncRoot_dn)));
-			return WERR_DS_DRA_INTERNAL_ERROR;
-		}
+		ret = dsdb_search_dn(sam_ctx, mem_ctx, &res,
+				     getnc_state->ncRoot_dn, attrs,
+				     DSDB_SEARCH_SHOW_DELETED |
+				     DSDB_SEARCH_SHOW_RECYCLED);
+		if (ret != LDB_SUCCESS || res->count < 1) {
+			DBG_WARNING("Failed to find ncRoot_dn %s\n",
+				    ldb_dn_get_linearized(getnc_state->ncRoot_dn));
+			return WERR_DS_CANT_FIND_EXPECTED_NC;
+		}
+		getnc_state->ncRoot_guid = samdb_result_guid(res->msgs[0],
+							     "objectGUID");
+		nc_instanceType = ldb_msg_find_attr_as_int(res->msgs[0],
+							   "instanceType",
+							   0);
+		TALLOC_FREE(res);
 		ncRoot->guid = getnc_state->ncRoot_guid;
 
 		/* find out if we are to replicate Schema NC */
@@ -2278,6 +2293,16 @@ allowed:
 		 */
 		switch (req10->extended_op) {
 		case DRSUAPI_EXOP_NONE:
+			if ((nc_instanceType & INSTANCE_TYPE_IS_NC_HEAD) == 0) {
+				const char *dn_str
+					= ldb_dn_get_linearized(getnc_state->ncRoot_dn);
+
+				DBG_NOTICE("Rejecting full replication on "
+					   "not NC %s", dn_str);
+
+				return WERR_DS_CANT_FIND_EXPECTED_NC;
+			}
+
 			break;
 		case DRSUAPI_EXOP_FSMO_RID_ALLOC:
 			werr = getncchanges_rid_alloc(b_state, mem_ctx, req10, &r->out.ctr->ctr6, &search_dn);
diff --git a/source4/torture/drs/python/getnc_unpriv.py b/source4/torture/drs/python/getnc_unpriv.py
index 2bbbbf8816e..f691a52117e 100644
--- a/source4/torture/drs/python/getnc_unpriv.py
+++ b/source4/torture/drs/python/getnc_unpriv.py
@@ -118,8 +118,10 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
         ou1_id = self._get_identifier(self.ldb_dc1, ou1)
         req8 = self._exop_req8(dest_dsa=None,
                                invocation_id=self.ldb_dc1.get_invocation_id(),
-                               nc_dn_str=ou1,
+                               nc_dn_str=self.ldb_dc1.get_default_basedn(),
                                exop=drsuapi.DRSUAPI_EXOP_NONE,
                                replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+
         (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
         self.assertEqual(ctr.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_NONE)
+
-- 
2.11.0


From 4a4926d993b8f3e3efdb4bd7cb429a0511f1c63f Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Tue, 8 Aug 2017 16:49:24 +1200
Subject: [PATCH 06/10] selftest: encrypt the LDAP connection in drs_base.py

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 source4/torture/drs/python/drs_base.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/source4/torture/drs/python/drs_base.py b/source4/torture/drs/python/drs_base.py
index 9e258bbd23f..96d0e9206fa 100644
--- a/source4/torture/drs/python/drs_base.py
+++ b/source4/torture/drs/python/drs_base.py
@@ -32,7 +32,7 @@ from samba import dsdb
 from samba.dcerpc import drsuapi, misc, drsblobs, security
 from samba.ndr import ndr_unpack, ndr_pack
 from samba.drs_utils import drs_DsBind
-
+from samba import gensec
 from ldb import (
     SCOPE_BASE,
     Message,
@@ -49,6 +49,8 @@ class DrsBaseTestCase(SambaToolCmdTest):
 
     def setUp(self):
         super(DrsBaseTestCase, self).setUp()
+        creds = self.get_credentials()
+        creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
 
         # connect to DCs
         url_dc = samba.tests.env_get_var_value("DC1")
-- 
2.11.0


From 9b27ea48a244872cc6ab9a10cfc5591420127cb2 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Tue, 8 Aug 2017 16:50:11 +1200
Subject: [PATCH 07/10] selftest: Move get_partial_attribute_set() to
 DrsBaseTestCase

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 source4/torture/drs/python/drs_base.py   | 7 +++++++
 source4/torture/drs/python/getnc_exop.py | 6 ------
 2 files changed, 7 insertions(+), 6 deletions(-)

diff --git a/source4/torture/drs/python/drs_base.py b/source4/torture/drs/python/drs_base.py
index 96d0e9206fa..5fb2205ab72 100644
--- a/source4/torture/drs/python/drs_base.py
+++ b/source4/torture/drs/python/drs_base.py
@@ -381,6 +381,13 @@ class DrsBaseTestCase(SambaToolCmdTest):
         (drs_handle, supported_extensions) = drs_DsBind(drs)
         return (drs, drs_handle)
 
+    def get_partial_attribute_set(self, attids=[drsuapi.DRSUAPI_ATTID_objectClass]):
+        partial_attribute_set = drsuapi.DsPartialAttributeSet()
+        partial_attribute_set.attids = attids
+        partial_attribute_set.num_attids = len(attids)
+        return partial_attribute_set
+
+
 
 class AbstractLink:
     def __init__(self, attid, flags, identifier, targetGUID):
diff --git a/source4/torture/drs/python/getnc_exop.py b/source4/torture/drs/python/getnc_exop.py
index caa782658fd..2b4bdc4b714 100644
--- a/source4/torture/drs/python/getnc_exop.py
+++ b/source4/torture/drs/python/getnc_exop.py
@@ -559,12 +559,6 @@ class DrsReplicaPrefixMapTestCase(drs_base.DrsBaseTestCase):
             if enum == ldb.ERR_NO_SUCH_OBJECT:
                 pass
 
-    def get_partial_attribute_set(self, attids=[drsuapi.DRSUAPI_ATTID_objectClass]):
-        partial_attribute_set = drsuapi.DsPartialAttributeSet()
-        partial_attribute_set.attids = attids
-        partial_attribute_set.num_attids = len(attids)
-        return partial_attribute_set
-
     def test_missing_prefix_map_dsa(self):
         partial_attribute_set = self.get_partial_attribute_set()
 
-- 
2.11.0


From 2b0a4274a591598164ab05cb5aa2dd0c8ec6fedf Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Tue, 8 Aug 2017 16:52:04 +1200
Subject: [PATCH 08/10] selftest: Confirm privileged replication of an OU is
 not permitted

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 source4/torture/drs/python/getnc_exop.py | 26 ++++++++++++++++++++++++++
 1 file changed, 26 insertions(+)

diff --git a/source4/torture/drs/python/getnc_exop.py b/source4/torture/drs/python/getnc_exop.py
index 2b4bdc4b714..e265bdc52a5 100644
--- a/source4/torture/drs/python/getnc_exop.py
+++ b/source4/torture/drs/python/getnc_exop.py
@@ -35,6 +35,7 @@ from drs_base import AbstractLink
 
 import samba.tests
 import random
+from samba import werror, WERRORError
 
 import ldb
 from ldb import SCOPE_BASE
@@ -193,6 +194,31 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
         (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 8, req8)
         self._check_ctr6(ctr, [ou2])
 
+    def test_do_full_repl_on_ou(self):
+        """
+        Make sure that a full replication on a not-an-nc fails with
+        the right error code
+        """
+
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=ou1,
+                               exop=drsuapi.DRSUAPI_EXOP_NONE,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+        try:
+            (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle,
+                                                   8, req8)
+            self.fail("Expected DsGetNCChanges to fail with WERR_DS_CANT_FIND_EXPECTED_NC")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, werror.WERR_DS_CANT_FIND_EXPECTED_NC)
+
     def test_link_utdv_hwm(self):
         """Test verify the DRS_GET_ANC behavior."""
 
-- 
2.11.0


From 0416fb45aeb540f120d02de18caa107a7d9d079a Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Tue, 8 Aug 2017 16:57:15 +1200
Subject: [PATCH 09/10] selftest: Extend further getnc_unpriv tests to pass
 against windows 2012R2

An important change in this patch is changing the ACE type from
 A (Allow)
to
 AO (Object Allow)

as that will then respect the supplied GUID, which we also make use
the constant from the security.idl.

We split out the tests to check replication with a user having GET_CHANGES
and GET_ALL_CHANGES rights.

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 selftest/knownfail.d/getnc_unpriv          |  20 ++
 source4/torture/drs/python/getnc_unpriv.py | 425 ++++++++++++++++++++++++++++-
 2 files changed, 436 insertions(+), 9 deletions(-)
 create mode 100644 selftest/knownfail.d/getnc_unpriv

diff --git a/selftest/knownfail.d/getnc_unpriv b/selftest/knownfail.d/getnc_unpriv
new file mode 100644
index 00000000000..14d59200c20
--- /dev/null
+++ b/selftest/knownfail.d/getnc_unpriv
@@ -0,0 +1,20 @@
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_full_repl_on_ou\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_full_repl_on_ou_unpriv\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_all\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_dest_dsa\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_dest_dsa_unpriv\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_on_ou\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_single_repl_on_ou_unpriv\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_single_repl_unpriv\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\)\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_full_repl_on_ou\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_full_repl_on_ou_unpriv\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_all\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_dest_dsa\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_dest_dsa_unpriv\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_on_ou\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_single_repl_on_ou_unpriv\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_single_repl_unpriv\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\)\(promoted_dc\)
diff --git a/source4/torture/drs/python/getnc_unpriv.py b/source4/torture/drs/python/getnc_unpriv.py
index f691a52117e..7fa34b0d81c 100644
--- a/source4/torture/drs/python/getnc_unpriv.py
+++ b/source4/torture/drs/python/getnc_unpriv.py
@@ -36,12 +36,13 @@ from drs_base import AbstractLink
 import samba.tests
 import random
 import samba
+from samba import werror, WERRORError
 
 from samba import sd_utils
 import ldb
 from ldb import SCOPE_BASE
 
-from samba.dcerpc import drsuapi, misc, drsblobs
+from samba.dcerpc import drsuapi, misc, drsblobs, security
 from samba.drs_utils import drs_DsBind
 from samba.ndr import ndr_unpack, ndr_pack
 from samba.credentials import DONT_USE_KERBEROS
@@ -62,10 +63,13 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
         (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
 
         self.sd_utils = sd_utils.SDUtils(self.ldb_dc1)
-        user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou)
-        user_sid = self.sd_utils.get_object_sid(user_dn)
-        mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid)
-        self.sd_utils.dacl_add_ace(self.base_dn, mod)
+        self.user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou)
+        user_sid = self.sd_utils.get_object_sid(self.user_dn)
+        self.acl_mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
+                                            str(user_sid))
+        self.acl_mod_all = self.acl_mod + "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_ALL_CHANGES,
+                                            str(user_sid))
+        self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
 
         # We set DONT_USE_KERBEROS to avoid a race with getting the
         # user replicated to our selected KDC
@@ -77,6 +81,7 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
                                                               self.user_creds)
 
     def tearDown(self):
+        self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
         try:
             self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
         except ldb.LdbError as (enum, string):
@@ -89,6 +94,7 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
         Make sure that DRSU_EXOP_REPL_OBJ works as a less-privileged
         user with the GET_CHANGES right
         """
+        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod)
 
         ou1 = "OU=get_anc1,%s" % self.ou
         self.ldb_dc1.add({
@@ -101,8 +107,332 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
                                nc_dn_str=ou1,
                                exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
                                replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
-        (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
-        self._check_ctr6(ctr, [ou1])
+        try:
+            (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+                                                        8, req8)
+            self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
+
+    def test_do_single_repl_all(self):
+        """
+        Make sure that DRSU_EXOP_REPL_OBJ works as a less-privileged
+        user with the GET_CHANGES right
+        """
+        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_all)
+
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=ou1,
+                               exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+        (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+                                                    8, req8)
+
+    def test_do_single_repl_pas(self):
+        """
+        Make sure that DRSU_EXOP_REPL_OBJ works as a less-privileged
+        user with the GET_CHANGES right
+        """
+        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod)
+
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=ou1,
+                               exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
+                               partial_attribute_set=self.get_partial_attribute_set(),
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+        (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+                                                    8, req8)
+
+    def test_do_single_repl_unpriv(self):
+        """
+        Make sure that DRSU_EXOP_REPL_OBJ does not work as a less-privileged
+        user with only the GET_CHANGES right
+        """
+
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=ou1,
+                               exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+        try:
+            (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
+            self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+
+    def test_do_secret_repl_on_ou(self):
+        """
+        Make sure that DRSU_EXOP_REPL_SECRET does not work
+        as a less-privileged user with the GET_CHANGES right
+        """
+        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod)
+
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=ou1,
+                               exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+        try:
+            (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+                                                        8, req8)
+            self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
+
+    def test_do_single_repl_on_ou_unpriv(self):
+        """
+        Make sure that DRSU_EXOP_REPL_SECRET does not work as a less-privileged
+        user without the GET_CHANGES right
+        """
+
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=ou1,
+                               exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+        try:
+            (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
+            self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+
+    def test_do_secret_repl(self):
+        """
+        Make sure that DRSU_EXOP_REPL_SECRET does not work
+        as a less-privileged user with the GET_CHANGES right
+        """
+        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod)
+
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=self.user_dn,
+                               exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+        try:
+            (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+                                                        8, req8)
+            self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
+
+    def test_do_secret_repl_all(self):
+        """
+        Make sure that DRSU_EXOP_REPL_SECRET does not work
+        as a less-privileged user with the GET_CHANGES right
+        """
+        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_all)
+
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=self.user_dn,
+                               exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+        try:
+            (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+                                                        8, req8)
+            self.fail("Should have failed with WERR_DS_DRA_DB_ERROR")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, werror.WERR_DS_DRA_DB_ERROR)
+
+
+    def test_do_secret_repl_dest_dsa(self):
+        """
+        Make sure that DRSU_EXOP_REPL_SECRET does not work
+        as a less-privileged user with the GET_CHANGES right
+        """
+        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod)
+
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=self.ldb_dc1.get_ntds_GUID(),
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=self.user_dn,
+                               exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+        try:
+            (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+                                                        8, req8)
+            self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
+
+    def test_do_secret_repl_dest_dsa_all(self):
+        """
+        Make sure that DRSU_EXOP_REPL_SECRET does not work
+        as a less-privileged user with the GET_CHANGES right
+        """
+        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_all)
+
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=self.ldb_dc1.get_ntds_GUID(),
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=self.user_dn,
+                               exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP
+                               | drsuapi.DRSUAPI_DRS_SYNC_FORCED)
+        try:
+            (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+                                                        8, req8)
+            self.fail("Should have failed with an error")
+        except WERRORError as (enum, estr):
+
+            # There are a number of possible errors here, this
+            # is what Samba currently gives.
+
+            self.assertEquals(enum, werror.WERR_DS_DRA_SOURCE_DISABLED)
+
+    def test_do_secret_repl_dest_dsa_unpriv(self):
+        """
+        Make sure that DRSU_EXOP_REPL_SECRET does not work
+        as a less-privileged user with the GET_CHANGES right
+        """
+
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=self.ldb_dc1.get_ntds_GUID(),
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=self.user_dn,
+                               exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+        try:
+            (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
+            print ctr.__ndr_print__()
+            self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+
+
+    def test_do_single_repl_unpriv(self):
+        """
+        Make sure that DRSU_EXOP_REPL_SECRET does not work as a less-privileged
+        user without the GET_CHANGES right
+        """
+
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=self.user_dn,
+                               exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+        try:
+            (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
+            self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+
+    def test_do_full_repl_on_ou(self):
+        """
+        Make sure that a full replication on a not-an-nc fails with
+        the right error code
+        """
+
+        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod)
+
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=ou1,
+                               exop=drsuapi.DRSUAPI_EXOP_NONE,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+        try:
+            (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
+            self.fail("Should have failed with WERR_DS_CANT_FIND_EXPECTED_NC")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, werror.WERR_DS_CANT_FIND_EXPECTED_NC)
+
+    def test_do_full_repl_on_ou_unpriv(self):
+        """
+        Make sure that a full replication on a not-an-nc fails with
+        the right error code, for a user with no extra rights
+        """
+
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=ou1,
+                               exop=drsuapi.DRSUAPI_EXOP_NONE,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+        try:
+            (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+                                                        8, req8)
+            self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
 
     def test_do_full_repl(self):
         """
@@ -110,6 +440,79 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
         user with the GET_CHANGES right
         """
 
+        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod)
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=self.ldb_dc1.get_default_basedn(),
+                               exop=drsuapi.DRSUAPI_EXOP_NONE,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+        try:
+            (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+                                                        8, req8)
+            self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
+
+
+    def test_do_full_repl_all(self):
+        """
+        Make sure that DRSU_EXOP_REPL_OBJ works as a less-privileged
+        user with the GET_CHANGES right
+        """
+
+        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_all)
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=self.ldb_dc1.get_default_basedn(),
+                               exop=drsuapi.DRSUAPI_EXOP_NONE,
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+        (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+                                                    8, req8)
+
+
+    def test_do_full_repl_pas(self):
+        """
+        Make sure that DRSU_EXOP_REPL_OBJ works as a less-privileged
+        user with the GET_CHANGES right
+        """
+
+        self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod)
+        ou1 = "OU=get_anc1,%s" % self.ou
+        self.ldb_dc1.add({
+            "dn": ou1,
+            "objectclass": "organizationalUnit"
+            })
+        ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+        req8 = self._exop_req8(dest_dsa=None,
+                               invocation_id=self.ldb_dc1.get_invocation_id(),
+                               nc_dn_str=self.ldb_dc1.get_default_basedn(),
+                               exop=drsuapi.DRSUAPI_EXOP_NONE,
+                               partial_attribute_set=self.get_partial_attribute_set(),
+                               replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+        (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+                                                        8, req8)
+
+    def test_do_full_repl_unpriv(self):
+        """
+        Make sure that DRSU_EXOP_REPL_OBJ fails as a less-privileged
+        user without the GET_CHANGES right
+        """
+
         ou1 = "OU=get_anc1,%s" % self.ou
         self.ldb_dc1.add({
             "dn": ou1,
@@ -122,6 +525,10 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
                                exop=drsuapi.DRSUAPI_EXOP_NONE,
                                replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
 
-        (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
-        self.assertEqual(ctr.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_NONE)
+        try:
+            (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+                                                        8, req8)
+            self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
+        except WERRORError as (enum, estr):
+            self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
 
-- 
2.11.0


From 527e2db2e30f224d7e565ad9daa626f60a6fe0db Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Wed, 9 Aug 2017 15:00:31 +1200
Subject: [PATCH 10/10] selftest: Samba checks GetNCChanges access in a
 different order to windows

Generally Samba checks access first, before checking the DN

Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
 selftest/knownfail.d/getnc_unpriv          | 20 ---------------
 source4/torture/drs/python/getnc_unpriv.py | 40 ++++++++++++++++++++++--------
 2 files changed, 30 insertions(+), 30 deletions(-)
 delete mode 100644 selftest/knownfail.d/getnc_unpriv

diff --git a/selftest/knownfail.d/getnc_unpriv b/selftest/knownfail.d/getnc_unpriv
deleted file mode 100644
index 14d59200c20..00000000000
--- a/selftest/knownfail.d/getnc_unpriv
+++ /dev/null
@@ -1,20 +0,0 @@
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_full_repl_on_ou\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_full_repl_on_ou_unpriv\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_all\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_dest_dsa\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_dest_dsa_unpriv\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_on_ou\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_single_repl_on_ou_unpriv\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_single_repl_unpriv\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\)\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_full_repl_on_ou\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_full_repl_on_ou_unpriv\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_all\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_dest_dsa\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_dest_dsa_unpriv\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_on_ou\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_single_repl_on_ou_unpriv\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_single_repl_unpriv\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\)\(promoted_dc\)
diff --git a/source4/torture/drs/python/getnc_unpriv.py b/source4/torture/drs/python/getnc_unpriv.py
index 7fa34b0d81c..a9bf9f81612 100644
--- a/source4/torture/drs/python/getnc_unpriv.py
+++ b/source4/torture/drs/python/getnc_unpriv.py
@@ -178,7 +178,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
             (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
             self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
         except WERRORError as (enum, estr):
-            self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+            # Allow Samba to check access first
+            if enum != werror.WERR_DS_DRA_ACCESS_DENIED:
+                self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
 
     def test_do_secret_repl_on_ou(self):
         """
@@ -203,7 +205,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
                                                         8, req8)
             self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
         except WERRORError as (enum, estr):
-            self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
+            # Allow Samba to have a different error here
+            if enum != werror.WERR_DS_DRA_SOURCE_DISABLED:
+                self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
 
     def test_do_single_repl_on_ou_unpriv(self):
         """
@@ -226,7 +230,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
             (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
             self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
         except WERRORError as (enum, estr):
-            self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+            # Allow Samba to check access first
+            if enum != werror.WERR_DS_DRA_ACCESS_DENIED:
+                self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
 
     def test_do_secret_repl(self):
         """
@@ -251,7 +257,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
                                                         8, req8)
             self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
         except WERRORError as (enum, estr):
-            self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
+            # Allow Samba to have a different error here
+            if enum != werror.WERR_DS_DRA_SOURCE_DISABLED:
+                self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
 
     def test_do_secret_repl_all(self):
         """
@@ -276,7 +284,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
                                                         8, req8)
             self.fail("Should have failed with WERR_DS_DRA_DB_ERROR")
         except WERRORError as (enum, estr):
-            self.assertEquals(enum, werror.WERR_DS_DRA_DB_ERROR)
+            # Allow Samba to have a different error here
+            if enum != werror.WERR_DS_DRA_SOURCE_DISABLED:
+                self.assertEquals(enum, werror.WERR_DS_DRA_DB_ERROR)
 
 
     def test_do_secret_repl_dest_dsa(self):
@@ -302,7 +312,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
                                                         8, req8)
             self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
         except WERRORError as (enum, estr):
-            self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
+            # Allow Samba to have a different error here
+            if enum != werror.WERR_DS_DRA_SOURCE_DISABLED:
+                self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
 
     def test_do_secret_repl_dest_dsa_all(self):
         """
@@ -357,7 +369,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
             print ctr.__ndr_print__()
             self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
         except WERRORError as (enum, estr):
-            self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+            # Allow Samba to have a different error here
+            if enum != werror.WERR_DS_DRA_ACCESS_DENIED:
+                self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
 
 
     def test_do_single_repl_unpriv(self):
@@ -381,7 +395,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
             (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
             self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
         except WERRORError as (enum, estr):
-            self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+            # Allow Samba to check access first
+            if enum != werror.WERR_DS_DRA_ACCESS_DENIED:
+                self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
 
     def test_do_full_repl_on_ou(self):
         """
@@ -407,7 +423,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
             (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
             self.fail("Should have failed with WERR_DS_CANT_FIND_EXPECTED_NC")
         except WERRORError as (enum, estr):
-            self.assertEquals(enum, werror.WERR_DS_CANT_FIND_EXPECTED_NC)
+            # Allow Samba to check access first
+            if enum != werror.WERR_DS_DRA_ACCESS_DENIED:
+                self.assertEquals(enum, werror.WERR_DS_CANT_FIND_EXPECTED_NC)
 
     def test_do_full_repl_on_ou_unpriv(self):
         """
@@ -432,7 +450,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
                                                         8, req8)
             self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
         except WERRORError as (enum, estr):
-            self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+            # Allow Samba to check access first
+            if enum != werror.WERR_DS_DRA_ACCESS_DENIED:
+                self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
 
     def test_do_full_repl(self):
         """
-- 
2.11.0



More information about the samba-technical mailing list