[PATCH] Avoid privileged segfault in GetNCChanges and add many more tests
Andrew Bartlett
abartlet at samba.org
Wed Aug 9 03:36:34 UTC 2017
This patch set fixes bug 12946 and adds a number of tests to ensure
that there are no other similar issues waiting to be uncovered.
I have not changed our server behaviour to match Windows exactly, as I
prefer that we keep our current behaviour of checking the ACLs first.
Please review and push!
Thanks,
Andrew Bartlett
--
Andrew Bartlett
https://samba.org/~abartlet/
Authentication Developer, Samba Team https://samba.org
Samba Development and Support, Catalyst IT
https://catalyst.net.nz/services/samba
-------------- next part --------------
From 31ace02d44cda2d1c2ce89f3f1d476740dd7e287 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Wed, 9 Aug 2017 13:57:13 +1200
Subject: [PATCH 01/10] py-librpc: Strictly check the type of the incoming sid
pointer
This avoids casting another type of object to a void* and then to a SID
Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
source4/librpc/ndr/py_security.c | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
diff --git a/source4/librpc/ndr/py_security.c b/source4/librpc/ndr/py_security.c
index 4cc25664996..a2fa4a284ae 100644
--- a/source4/librpc/ndr/py_security.c
+++ b/source4/librpc/ndr/py_security.c
@@ -249,7 +249,15 @@ static PyObject *py_descriptor_from_sddl(PyObject *self, PyObject *args)
if (!PyArg_ParseTuple(args, "sO!", &sddl, &dom_sid_Type, &py_sid))
return NULL;
- sid = pytalloc_get_ptr(py_sid);
+ if (PyObject_TypeCheck(py_sid, &dom_sid_Type)) {
+ sid = pytalloc_get_ptr(py_sid);
+ } else {
+ PyErr_SetString(PyExc_TypeError,
+ "expected security.dom_sid "
+ "for second argument to .from_sddl");
+ return NULL;
+
+ }
secdesc = sddl_decode(NULL, sddl, sid);
if (secdesc == NULL) {
--
2.11.0
From 70777326fa50e878aa3520bb8fa3ddc7dc11ecf8 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Fri, 4 Aug 2017 11:44:19 +1200
Subject: [PATCH 02/10] s4-drsuapi: Avoid segfault when replicating as a
non-admin with GUID_DRS_GET_CHANGES
Users who are not administrators do not get b_state->sam_ctx_system filled in.
Signed-off-by: Andrew Bartlett <abartlet at samba.org>
BUG: https://bugzilla.samba.org/show_bug.cgi?id=12946
---
source4/rpc_server/drsuapi/getncchanges.c | 2 +-
source4/selftest/tests.py | 5 ++
source4/torture/drs/python/getnc_unpriv.py | 125 +++++++++++++++++++++++++++++
3 files changed, 131 insertions(+), 1 deletion(-)
create mode 100644 source4/torture/drs/python/getnc_unpriv.py
diff --git a/source4/rpc_server/drsuapi/getncchanges.c b/source4/rpc_server/drsuapi/getncchanges.c
index 096162dfded..b98f14c156a 100644
--- a/source4/rpc_server/drsuapi/getncchanges.c
+++ b/source4/rpc_server/drsuapi/getncchanges.c
@@ -2250,7 +2250,7 @@ allowed:
return WERR_NOT_ENOUGH_MEMORY;
}
- ret = dsdb_find_guid_by_dn(b_state->sam_ctx_system,
+ ret = dsdb_find_guid_by_dn(b_state->sam_ctx,
getnc_state->ncRoot_dn,
&getnc_state->ncRoot_guid);
if (ret != LDB_SUCCESS) {
diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py
index 2865095b005..869ea80b69d 100755
--- a/source4/selftest/tests.py
+++ b/source4/selftest/tests.py
@@ -838,6 +838,11 @@ for env in ['vampire_dc', 'promoted_dc']:
name="samba4.drs.getnc_exop.python(%s)" % env,
environ={'DC1': "$DC_SERVER", 'DC2': '$%s_SERVER' % env.upper()},
extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD'])
+ planoldpythontestsuite(env, "getnc_unpriv",
+ extra_path=[os.path.join(samba4srcdir, 'torture/drs/python')],
+ name="samba4.drs.getnc_unpriv.python(%s)" % env,
+ environ={'DC1': "$DC_SERVER", 'DC2': '$%s_SERVER' % env.upper()},
+ extra_args=['-U$DOMAIN/$DC_USERNAME%$DC_PASSWORD'])
planoldpythontestsuite(env, "linked_attributes_drs",
extra_path=[os.path.join(samba4srcdir, 'torture/drs/python')],
name="samba4.drs.linked_attributes_drs.python(%s)" % env,
diff --git a/source4/torture/drs/python/getnc_unpriv.py b/source4/torture/drs/python/getnc_unpriv.py
new file mode 100644
index 00000000000..2bbbbf8816e
--- /dev/null
+++ b/source4/torture/drs/python/getnc_unpriv.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+#
+# Tests the behaviour of DsGetNCChanges for unprivileged users
+#
+# Copyright (C) Kamen Mazdrashki <kamenim at samba.org> 2011
+# Copyright (C) Andrew Bartlett <abartlet at samba.org> 2016
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+#
+# Usage:
+# export DC1=dc1_dns_name
+# export DC2=dc2_dns_name
+# export SUBUNITRUN=$samba4srcdir/scripting/bin/subunitrun
+# PYTHONPATH="$PYTHONPATH:$samba4srcdir/torture/drs/python" $SUBUNITRUN getnc_unpriv -U"$DOMAIN/$DC_USERNAME"%"$DC_PASSWORD"
+#
+
+import random
+
+import drs_base
+from drs_base import AbstractLink
+
+import samba.tests
+import random
+import samba
+
+from samba import sd_utils
+import ldb
+from ldb import SCOPE_BASE
+
+from samba.dcerpc import drsuapi, misc, drsblobs
+from samba.drs_utils import drs_DsBind
+from samba.ndr import ndr_unpack, ndr_pack
+from samba.credentials import DONT_USE_KERBEROS
+class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
+ """Confirm the behaviour of DsGetNCChanges for unprivileged users"""
+
+ def setUp(self):
+ super(DrsReplicaSyncUnprivTestCase, self).setUp()
+ self.get_changes_user = "get-changes-user"
+ self.base_dn = self.ldb_dc1.get_default_basedn()
+ self.ou = "OU=test_getncchanges,%s" % self.base_dn
+ self.user_pass = samba.generate_random_password(12, 16)
+ self.ldb_dc1.add({
+ "dn": self.ou,
+ "objectclass": "organizationalUnit"})
+ self.ldb_dc1.newuser(self.get_changes_user, self.user_pass,
+ userou="OU=test_getncchanges")
+ (self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
+
+ self.sd_utils = sd_utils.SDUtils(self.ldb_dc1)
+ user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou)
+ user_sid = self.sd_utils.get_object_sid(user_dn)
+ mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid)
+ self.sd_utils.dacl_add_ace(self.base_dn, mod)
+
+ # We set DONT_USE_KERBEROS to avoid a race with getting the
+ # user replicated to our selected KDC
+ self.user_creds = self.insta_creds(template=self.get_credentials(),
+ username=self.get_changes_user,
+ userpass=self.user_pass,
+ kerberos_state=DONT_USE_KERBEROS)
+ (self.user_drs, self.user_drs_handle) = self._ds_bind(self.dnsname_dc1,
+ self.user_creds)
+
+ def tearDown(self):
+ try:
+ self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
+ except ldb.LdbError as (enum, string):
+ if enum == ldb.ERR_NO_SUCH_OBJECT:
+ pass
+ super(DrsReplicaSyncUnprivTestCase, self).tearDown()
+
+ def test_do_single_repl(self):
+ """
+ Make sure that DRSU_EXOP_REPL_OBJ works as a less-privileged
+ user with the GET_CHANGES right
+ """
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=ou1,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
+ self._check_ctr6(ctr, [ou1])
+
+ def test_do_full_repl(self):
+ """
+ Make sure that DRSU_EXOP_REPL_OBJ works as a less-privileged
+ user with the GET_CHANGES right
+ """
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=ou1,
+ exop=drsuapi.DRSUAPI_EXOP_NONE,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
+ self.assertEqual(ctr.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_NONE)
--
2.11.0
From 9819c62e1afbb6c1351b68658f819d5d2caf7100 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Fri, 4 Aug 2017 15:21:31 +1200
Subject: [PATCH 03/10] dsdb: Use samba.generate_random_password() in dirsync
test
We do not like fixed passwords
Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
source4/dsdb/tests/python/dirsync.py | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/source4/dsdb/tests/python/dirsync.py b/source4/dsdb/tests/python/dirsync.py
index e6c5bbab641..168d5110dfb 100755
--- a/source4/dsdb/tests/python/dirsync.py
+++ b/source4/dsdb/tests/python/dirsync.py
@@ -79,7 +79,7 @@ class DirsyncBaseTests(samba.tests.TestCase):
self.ldb_admin = SamDB(ldapshost, credentials=creds, session_info=system_session(lp), lp=lp)
self.base_dn = self.ldb_admin.domain_dn()
self.domain_sid = security.dom_sid(self.ldb_admin.get_domain_sid())
- self.user_pass = "samba123 at AAA"
+ self.user_pass = samba.generate_random_password(12, 16)
self.configuration_dn = self.ldb_admin.get_config_basedn().get_linearized()
self.sd_utils = sd_utils.SDUtils(self.ldb_admin)
#used for anonymous login
--
2.11.0
From 881bfcd11383a6a74a3242179f399daf5b0b6fc0 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Wed, 9 Aug 2017 13:56:07 +1200
Subject: [PATCH 04/10] selftest: Make dirsync test use symbolic name and OA
not A
A is for Allow, OA is for Object Allow, which means check the GUID.
The previous ACE allowed all access, which was not the intention.
Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
source4/dsdb/tests/python/dirsync.py | 5 +++--
1 file changed, 3 insertions(+), 2 deletions(-)
diff --git a/source4/dsdb/tests/python/dirsync.py b/source4/dsdb/tests/python/dirsync.py
index 168d5110dfb..e302c42d8bb 100755
--- a/source4/dsdb/tests/python/dirsync.py
+++ b/source4/dsdb/tests/python/dirsync.py
@@ -30,7 +30,7 @@ import base64
from ldb import LdbError, SCOPE_BASE
from ldb import Message, MessageElement, Dn
from ldb import FLAG_MOD_ADD, FLAG_MOD_DELETE
-from samba.dcerpc import security, misc, drsblobs
+from samba.dcerpc import security, misc, drsblobs, security
from samba.ndr import ndr_unpack, ndr_pack
from samba.auth import system_session
@@ -119,7 +119,8 @@ class SimpleDirsyncTests(DirsyncBaseTests):
self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
user_sid = self.sd_utils.get_object_sid(self.get_user_dn(self.dirsync_user))
- mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid)
+ mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
+ str(user_sid))
self.sd_utils.dacl_add_ace(self.base_dn, mod)
# add admins to the Domain Admins group
--
2.11.0
From 60e25add297360b4677494e53d6a3470663c5ec7 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Tue, 8 Aug 2017 14:34:14 +1200
Subject: [PATCH 05/10] s4-drsuapi: Refuse to replicate an NC that is not
actually an NC
This prevents replication of an OU; as on Windows 2012R2, you must replicate a whole NC.
Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
source4/rpc_server/drsuapi/getncchanges.c | 41 ++++++++++++++++++++++++------
source4/torture/drs/python/getnc_unpriv.py | 4 ++-
2 files changed, 36 insertions(+), 9 deletions(-)
diff --git a/source4/rpc_server/drsuapi/getncchanges.c b/source4/rpc_server/drsuapi/getncchanges.c
index b98f14c156a..21af30069b8 100644
--- a/source4/rpc_server/drsuapi/getncchanges.c
+++ b/source4/rpc_server/drsuapi/getncchanges.c
@@ -2240,6 +2240,14 @@ allowed:
}
if (getnc_state == NULL) {
+ struct ldb_result *res;
+ const char *attrs[] = {
+ "instanceType",
+ "objectGuID",
+ NULL
+ };
+ uint32_t nc_instanceType;
+
getnc_state = talloc_zero(b_state, struct drsuapi_getncchanges_state);
if (getnc_state == NULL) {
return WERR_NOT_ENOUGH_MEMORY;
@@ -2250,14 +2258,21 @@ allowed:
return WERR_NOT_ENOUGH_MEMORY;
}
- ret = dsdb_find_guid_by_dn(b_state->sam_ctx,
- getnc_state->ncRoot_dn,
- &getnc_state->ncRoot_guid);
- if (ret != LDB_SUCCESS) {
- DEBUG(0,(__location__ ": Failed to find GUID of ncRoot_dn %s\n",
- ldb_dn_get_linearized(getnc_state->ncRoot_dn)));
- return WERR_DS_DRA_INTERNAL_ERROR;
- }
+ ret = dsdb_search_dn(sam_ctx, mem_ctx, &res,
+ getnc_state->ncRoot_dn, attrs,
+ DSDB_SEARCH_SHOW_DELETED |
+ DSDB_SEARCH_SHOW_RECYCLED);
+ if (ret != LDB_SUCCESS || res->count < 1) {
+ DBG_WARNING("Failed to find ncRoot_dn %s\n",
+ ldb_dn_get_linearized(getnc_state->ncRoot_dn));
+ return WERR_DS_CANT_FIND_EXPECTED_NC;
+ }
+ getnc_state->ncRoot_guid = samdb_result_guid(res->msgs[0],
+ "objectGUID");
+ nc_instanceType = ldb_msg_find_attr_as_int(res->msgs[0],
+ "instanceType",
+ 0);
+ TALLOC_FREE(res);
ncRoot->guid = getnc_state->ncRoot_guid;
/* find out if we are to replicate Schema NC */
@@ -2278,6 +2293,16 @@ allowed:
*/
switch (req10->extended_op) {
case DRSUAPI_EXOP_NONE:
+ if ((nc_instanceType & INSTANCE_TYPE_IS_NC_HEAD) == 0) {
+ const char *dn_str
+ = ldb_dn_get_linearized(getnc_state->ncRoot_dn);
+
+ DBG_NOTICE("Rejecting full replication on "
+ "not NC %s", dn_str);
+
+ return WERR_DS_CANT_FIND_EXPECTED_NC;
+ }
+
break;
case DRSUAPI_EXOP_FSMO_RID_ALLOC:
werr = getncchanges_rid_alloc(b_state, mem_ctx, req10, &r->out.ctr->ctr6, &search_dn);
diff --git a/source4/torture/drs/python/getnc_unpriv.py b/source4/torture/drs/python/getnc_unpriv.py
index 2bbbbf8816e..f691a52117e 100644
--- a/source4/torture/drs/python/getnc_unpriv.py
+++ b/source4/torture/drs/python/getnc_unpriv.py
@@ -118,8 +118,10 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
ou1_id = self._get_identifier(self.ldb_dc1, ou1)
req8 = self._exop_req8(dest_dsa=None,
invocation_id=self.ldb_dc1.get_invocation_id(),
- nc_dn_str=ou1,
+ nc_dn_str=self.ldb_dc1.get_default_basedn(),
exop=drsuapi.DRSUAPI_EXOP_NONE,
replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+
(level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
self.assertEqual(ctr.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_NONE)
+
--
2.11.0
From 4a4926d993b8f3e3efdb4bd7cb429a0511f1c63f Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Tue, 8 Aug 2017 16:49:24 +1200
Subject: [PATCH 06/10] selftest: encrypt the LDAP connection in drs_base.py
Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
source4/torture/drs/python/drs_base.py | 4 +++-
1 file changed, 3 insertions(+), 1 deletion(-)
diff --git a/source4/torture/drs/python/drs_base.py b/source4/torture/drs/python/drs_base.py
index 9e258bbd23f..96d0e9206fa 100644
--- a/source4/torture/drs/python/drs_base.py
+++ b/source4/torture/drs/python/drs_base.py
@@ -32,7 +32,7 @@ from samba import dsdb
from samba.dcerpc import drsuapi, misc, drsblobs, security
from samba.ndr import ndr_unpack, ndr_pack
from samba.drs_utils import drs_DsBind
-
+from samba import gensec
from ldb import (
SCOPE_BASE,
Message,
@@ -49,6 +49,8 @@ class DrsBaseTestCase(SambaToolCmdTest):
def setUp(self):
super(DrsBaseTestCase, self).setUp()
+ creds = self.get_credentials()
+ creds.set_gensec_features(creds.get_gensec_features() | gensec.FEATURE_SEAL)
# connect to DCs
url_dc = samba.tests.env_get_var_value("DC1")
--
2.11.0
From 9b27ea48a244872cc6ab9a10cfc5591420127cb2 Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Tue, 8 Aug 2017 16:50:11 +1200
Subject: [PATCH 07/10] selftest: Move get_partial_attribute_set() to
DrsBaseTestCase
Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
source4/torture/drs/python/drs_base.py | 7 +++++++
source4/torture/drs/python/getnc_exop.py | 6 ------
2 files changed, 7 insertions(+), 6 deletions(-)
diff --git a/source4/torture/drs/python/drs_base.py b/source4/torture/drs/python/drs_base.py
index 96d0e9206fa..5fb2205ab72 100644
--- a/source4/torture/drs/python/drs_base.py
+++ b/source4/torture/drs/python/drs_base.py
@@ -381,6 +381,13 @@ class DrsBaseTestCase(SambaToolCmdTest):
(drs_handle, supported_extensions) = drs_DsBind(drs)
return (drs, drs_handle)
+ def get_partial_attribute_set(self, attids=[drsuapi.DRSUAPI_ATTID_objectClass]):
+ partial_attribute_set = drsuapi.DsPartialAttributeSet()
+ partial_attribute_set.attids = attids
+ partial_attribute_set.num_attids = len(attids)
+ return partial_attribute_set
+
+
class AbstractLink:
def __init__(self, attid, flags, identifier, targetGUID):
diff --git a/source4/torture/drs/python/getnc_exop.py b/source4/torture/drs/python/getnc_exop.py
index caa782658fd..2b4bdc4b714 100644
--- a/source4/torture/drs/python/getnc_exop.py
+++ b/source4/torture/drs/python/getnc_exop.py
@@ -559,12 +559,6 @@ class DrsReplicaPrefixMapTestCase(drs_base.DrsBaseTestCase):
if enum == ldb.ERR_NO_SUCH_OBJECT:
pass
- def get_partial_attribute_set(self, attids=[drsuapi.DRSUAPI_ATTID_objectClass]):
- partial_attribute_set = drsuapi.DsPartialAttributeSet()
- partial_attribute_set.attids = attids
- partial_attribute_set.num_attids = len(attids)
- return partial_attribute_set
-
def test_missing_prefix_map_dsa(self):
partial_attribute_set = self.get_partial_attribute_set()
--
2.11.0
From 2b0a4274a591598164ab05cb5aa2dd0c8ec6fedf Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Tue, 8 Aug 2017 16:52:04 +1200
Subject: [PATCH 08/10] selftest: Confirm privileged replication of an OU is
not permitted
Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
source4/torture/drs/python/getnc_exop.py | 26 ++++++++++++++++++++++++++
1 file changed, 26 insertions(+)
diff --git a/source4/torture/drs/python/getnc_exop.py b/source4/torture/drs/python/getnc_exop.py
index 2b4bdc4b714..e265bdc52a5 100644
--- a/source4/torture/drs/python/getnc_exop.py
+++ b/source4/torture/drs/python/getnc_exop.py
@@ -35,6 +35,7 @@ from drs_base import AbstractLink
import samba.tests
import random
+from samba import werror, WERRORError
import ldb
from ldb import SCOPE_BASE
@@ -193,6 +194,31 @@ class DrsReplicaSyncTestCase(drs_base.DrsBaseTestCase):
(level, ctr) = self.drs.DsGetNCChanges(self.drs_handle, 8, req8)
self._check_ctr6(ctr, [ou2])
+ def test_do_full_repl_on_ou(self):
+ """
+ Make sure that a full replication on a not-an-nc fails with
+ the right error code
+ """
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=ou1,
+ exop=drsuapi.DRSUAPI_EXOP_NONE,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+ try:
+ (level, ctr) = self.drs.DsGetNCChanges(self.drs_handle,
+ 8, req8)
+ self.fail("Expected DsGetNCChanges to fail with WERR_DS_CANT_FIND_EXPECTED_NC")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, werror.WERR_DS_CANT_FIND_EXPECTED_NC)
+
def test_link_utdv_hwm(self):
"""Test verify the DRS_GET_ANC behavior."""
--
2.11.0
From 0416fb45aeb540f120d02de18caa107a7d9d079a Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Tue, 8 Aug 2017 16:57:15 +1200
Subject: [PATCH 09/10] selftest: Extend further getnc_unpriv tests to pass
against windows 2012R2
An important change in this patch is changing the ACE type from
A (Allow)
to
OA (Object Allow)
as that will then respect the supplied GUID, for which we now also use
the constant from security.idl.
We split out the tests to check replication with a user having GET_CHANGES
and GET_ALL_CHANGES rights.
Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
selftest/knownfail.d/getnc_unpriv | 20 ++
source4/torture/drs/python/getnc_unpriv.py | 425 ++++++++++++++++++++++++++++-
2 files changed, 436 insertions(+), 9 deletions(-)
create mode 100644 selftest/knownfail.d/getnc_unpriv
diff --git a/selftest/knownfail.d/getnc_unpriv b/selftest/knownfail.d/getnc_unpriv
new file mode 100644
index 00000000000..14d59200c20
--- /dev/null
+++ b/selftest/knownfail.d/getnc_unpriv
@@ -0,0 +1,20 @@
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_full_repl_on_ou\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_full_repl_on_ou_unpriv\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_all\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_dest_dsa\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_dest_dsa_unpriv\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_on_ou\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_single_repl_on_ou_unpriv\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_single_repl_unpriv\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(vampire_dc\)\(vampire_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_full_repl_on_ou\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_full_repl_on_ou_unpriv\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_all\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_dest_dsa\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_dest_dsa_unpriv\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_on_ou\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_single_repl_on_ou_unpriv\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_single_repl_unpriv\(promoted_dc\)
+samba4.drs.getnc_unpriv.python\(promoted_dc\)\(promoted_dc\)
diff --git a/source4/torture/drs/python/getnc_unpriv.py b/source4/torture/drs/python/getnc_unpriv.py
index f691a52117e..7fa34b0d81c 100644
--- a/source4/torture/drs/python/getnc_unpriv.py
+++ b/source4/torture/drs/python/getnc_unpriv.py
@@ -36,12 +36,13 @@ from drs_base import AbstractLink
import samba.tests
import random
import samba
+from samba import werror, WERRORError
from samba import sd_utils
import ldb
from ldb import SCOPE_BASE
-from samba.dcerpc import drsuapi, misc, drsblobs
+from samba.dcerpc import drsuapi, misc, drsblobs, security
from samba.drs_utils import drs_DsBind
from samba.ndr import ndr_unpack, ndr_pack
from samba.credentials import DONT_USE_KERBEROS
@@ -62,10 +63,13 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
(self.drs, self.drs_handle) = self._ds_bind(self.dnsname_dc1)
self.sd_utils = sd_utils.SDUtils(self.ldb_dc1)
- user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou)
- user_sid = self.sd_utils.get_object_sid(user_dn)
- mod = "(A;;CR;1131f6aa-9c07-11d1-f79f-00c04fc2dcd2;;%s)" % str(user_sid)
- self.sd_utils.dacl_add_ace(self.base_dn, mod)
+ self.user_dn = "cn=%s,%s" % (self.get_changes_user, self.ou)
+ user_sid = self.sd_utils.get_object_sid(self.user_dn)
+ self.acl_mod = "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_CHANGES,
+ str(user_sid))
+ self.acl_mod_all = self.acl_mod + "(OA;;CR;%s;;%s)" % (security.GUID_DRS_GET_ALL_CHANGES,
+ str(user_sid))
+ self.desc_sddl = self.sd_utils.get_sd_as_sddl(self.base_dn)
# We set DONT_USE_KERBEROS to avoid a race with getting the
# user replicated to our selected KDC
@@ -77,6 +81,7 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
self.user_creds)
def tearDown(self):
+ self.sd_utils.modify_sd_on_dn(self.base_dn, self.desc_sddl)
try:
self.ldb_dc1.delete(self.ou, ["tree_delete:1"])
except ldb.LdbError as (enum, string):
@@ -89,6 +94,7 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
Make sure that DRSU_EXOP_REPL_OBJ works as a less-privileged
user with the GET_CHANGES right
"""
+ self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod)
ou1 = "OU=get_anc1,%s" % self.ou
self.ldb_dc1.add({
@@ -101,8 +107,332 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
nc_dn_str=ou1,
exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
- (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
- self._check_ctr6(ctr, [ou1])
+ try:
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+ 8, req8)
+ self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
+
+ def test_do_single_repl_all(self):
+ """
+ Make sure that DRSU_EXOP_REPL_OBJ works as a less-privileged
+ user with the GET_CHANGES and GET_ALL_CHANGES rights
+ """
+ self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_all)
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=ou1,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+ 8, req8)
+
+ def test_do_single_repl_pas(self):
+ """
+ Make sure that DRSU_EXOP_REPL_OBJ works as a less-privileged
+ user with the GET_CHANGES right
+ """
+ self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod)
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=ou1,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
+ partial_attribute_set=self.get_partial_attribute_set(),
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+ 8, req8)
+
+ def test_do_single_repl_unpriv(self):
+ """
+ Make sure that DRSU_EXOP_REPL_OBJ does not work as a less-privileged
+ user with only the GET_CHANGES right
+ """
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=ou1,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_OBJ,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+ try:
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
+ self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+
+ def test_do_secret_repl_on_ou(self):
+ """
+ Make sure that DRSU_EXOP_REPL_SECRET does not work
+ as a less-privileged user with the GET_CHANGES right
+ """
+ self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod)
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=ou1,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+ try:
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+ 8, req8)
+ self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
+
+ def test_do_single_repl_on_ou_unpriv(self):
+ """
+ Make sure that DRSU_EXOP_REPL_SECRET does not work as a less-privileged
+ user without the GET_CHANGES right
+ """
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=ou1,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+ try:
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
+ self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+
+ def test_do_secret_repl(self):
+ """
+ Make sure that DRSU_EXOP_REPL_SECRET does not work
+ as a less-privileged user with the GET_CHANGES right
+ """
+ self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod)
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=self.user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+ try:
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+ 8, req8)
+ self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
+
+ def test_do_secret_repl_all(self):
+ """
+ Make sure that DRSU_EXOP_REPL_SECRET does not work
+ as a less-privileged user with the GET_CHANGES right
+ """
+ self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_all)
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=self.user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+ try:
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+ 8, req8)
+ self.fail("Should have failed with WERR_DS_DRA_DB_ERROR")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, werror.WERR_DS_DRA_DB_ERROR)
+
+
+ def test_do_secret_repl_dest_dsa(self):
+ """
+ Make sure that DRSU_EXOP_REPL_SECRET does not work
+ as a less-privileged user with the GET_CHANGES right
+ """
+ self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod)
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=self.ldb_dc1.get_ntds_GUID(),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=self.user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+ try:
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+ 8, req8)
+ self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
+
+ def test_do_secret_repl_dest_dsa_all(self):
+ """
+ Make sure that DRSU_EXOP_REPL_SECRET does not work
+ as a less-privileged user with the GET_CHANGES right
+ """
+ self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_all)
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=self.ldb_dc1.get_ntds_GUID(),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=self.user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP
+ | drsuapi.DRSUAPI_DRS_SYNC_FORCED)
+ try:
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+ 8, req8)
+ self.fail("Should have failed with an error")
+ except WERRORError as (enum, estr):
+
+ # There are a number of possible errors here, this
+ # is what Samba currently gives.
+
+ self.assertEquals(enum, werror.WERR_DS_DRA_SOURCE_DISABLED)
+
+ def test_do_secret_repl_dest_dsa_unpriv(self):
+ """
+ Make sure that DRSU_EXOP_REPL_SECRET does not work
+ as a less-privileged user with the GET_CHANGES right
+ """
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=self.ldb_dc1.get_ntds_GUID(),
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=self.user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+ try:
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
+ print ctr.__ndr_print__()
+ self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+
+
+ def test_do_single_repl_unpriv(self):
+ """
+ Make sure that DRSU_EXOP_REPL_SECRET does not work as a less-privileged
+ user without the GET_CHANGES right
+ """
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=self.user_dn,
+ exop=drsuapi.DRSUAPI_EXOP_REPL_SECRET,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+ try:
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
+ self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+
+ def test_do_full_repl_on_ou(self):
+ """
+ Make sure that a full replication on a not-an-nc fails with
+ the right error code
+ """
+
+ self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod)
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=ou1,
+ exop=drsuapi.DRSUAPI_EXOP_NONE,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+ try:
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
+ self.fail("Should have failed with WERR_DS_CANT_FIND_EXPECTED_NC")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, werror.WERR_DS_CANT_FIND_EXPECTED_NC)
+
+ def test_do_full_repl_on_ou_unpriv(self):
+ """
+ Make sure that a full replication on a not-an-nc fails with
+ the right error code, for a user with no extra rights
+ """
+
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=ou1,
+ exop=drsuapi.DRSUAPI_EXOP_NONE,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+ try:
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+ 8, req8)
+ self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
def test_do_full_repl(self):
"""
@@ -110,6 +440,79 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
user with the GET_CHANGES right
"""
+ self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod)
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=self.ldb_dc1.get_default_basedn(),
+ exop=drsuapi.DRSUAPI_EXOP_NONE,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+ try:
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+ 8, req8)
+ self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
+
+
+ def test_do_full_repl_all(self):
+ """
+ Make sure that a full replication (DRSUAPI_EXOP_NONE) works as a less-privileged
+ user with the GET_CHANGES right
+ """
+
+ self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod_all)
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=self.ldb_dc1.get_default_basedn(),
+ exop=drsuapi.DRSUAPI_EXOP_NONE,
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+ 8, req8)
+
+
+ def test_do_full_repl_pas(self):
+ """
+ Make sure that a full replication (DRSUAPI_EXOP_NONE) works as a less-privileged
+ user with the GET_CHANGES right
+ """
+
+ self.sd_utils.dacl_add_ace(self.base_dn, self.acl_mod)
+ ou1 = "OU=get_anc1,%s" % self.ou
+ self.ldb_dc1.add({
+ "dn": ou1,
+ "objectclass": "organizationalUnit"
+ })
+ ou1_id = self._get_identifier(self.ldb_dc1, ou1)
+ req8 = self._exop_req8(dest_dsa=None,
+ invocation_id=self.ldb_dc1.get_invocation_id(),
+ nc_dn_str=self.ldb_dc1.get_default_basedn(),
+ exop=drsuapi.DRSUAPI_EXOP_NONE,
+ partial_attribute_set=self.get_partial_attribute_set(),
+ replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
+
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+ 8, req8)
+
+ def test_do_full_repl_unpriv(self):
+ """
+ Make sure that a full replication (DRSUAPI_EXOP_NONE) fails as a less-privileged
+ user without the GET_CHANGES right
+ """
+
ou1 = "OU=get_anc1,%s" % self.ou
self.ldb_dc1.add({
"dn": ou1,
@@ -122,6 +525,10 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
exop=drsuapi.DRSUAPI_EXOP_NONE,
replica_flags=drsuapi.DRSUAPI_DRS_WRIT_REP)
- (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
- self.assertEqual(ctr.extended_ret, drsuapi.DRSUAPI_EXOP_ERR_NONE)
+ try:
+ (level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle,
+ 8, req8)
+ self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
+ except WERRORError as (enum, estr):
+ self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
--
2.11.0
From 527e2db2e30f224d7e565ad9daa626f60a6fe0db Mon Sep 17 00:00:00 2001
From: Andrew Bartlett <abartlet at samba.org>
Date: Wed, 9 Aug 2017 15:00:31 +1200
Subject: [PATCH 10/10] selftest: Samba checks GetNCChanges access in a
different order to Windows
Generally Samba checks access first, before checking the DN
Signed-off-by: Andrew Bartlett <abartlet at samba.org>
---
selftest/knownfail.d/getnc_unpriv | 20 ---------------
source4/torture/drs/python/getnc_unpriv.py | 40 ++++++++++++++++++++++--------
2 files changed, 30 insertions(+), 30 deletions(-)
delete mode 100644 selftest/knownfail.d/getnc_unpriv
diff --git a/selftest/knownfail.d/getnc_unpriv b/selftest/knownfail.d/getnc_unpriv
deleted file mode 100644
index 14d59200c20..00000000000
--- a/selftest/knownfail.d/getnc_unpriv
+++ /dev/null
@@ -1,20 +0,0 @@
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_full_repl_on_ou\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_full_repl_on_ou_unpriv\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_all\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_dest_dsa\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_dest_dsa_unpriv\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_on_ou\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_single_repl_on_ou_unpriv\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_single_repl_unpriv\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(vampire_dc\)\(vampire_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_full_repl_on_ou\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_full_repl_on_ou_unpriv\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_all\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_dest_dsa\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_dest_dsa_unpriv\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_secret_repl_on_ou\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_single_repl_on_ou_unpriv\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\).getnc_unpriv.DrsReplicaSyncUnprivTestCase.test_do_single_repl_unpriv\(promoted_dc\)
-samba4.drs.getnc_unpriv.python\(promoted_dc\)\(promoted_dc\)
diff --git a/source4/torture/drs/python/getnc_unpriv.py b/source4/torture/drs/python/getnc_unpriv.py
index 7fa34b0d81c..a9bf9f81612 100644
--- a/source4/torture/drs/python/getnc_unpriv.py
+++ b/source4/torture/drs/python/getnc_unpriv.py
@@ -178,7 +178,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
(level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
except WERRORError as (enum, estr):
- self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+ # Allow Samba to check access first
+ if enum != werror.WERR_DS_DRA_ACCESS_DENIED:
+ self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
def test_do_secret_repl_on_ou(self):
"""
@@ -203,7 +205,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
8, req8)
self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
except WERRORError as (enum, estr):
- self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
+ # Allow Samba to have a different error here
+ if enum != werror.WERR_DS_DRA_SOURCE_DISABLED:
+ self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
def test_do_single_repl_on_ou_unpriv(self):
"""
@@ -226,7 +230,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
(level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
except WERRORError as (enum, estr):
- self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+ # Allow Samba to check access first
+ if enum != werror.WERR_DS_DRA_ACCESS_DENIED:
+ self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
def test_do_secret_repl(self):
"""
@@ -251,7 +257,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
8, req8)
self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
except WERRORError as (enum, estr):
- self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
+ # Allow Samba to have a different error here
+ if enum != werror.WERR_DS_DRA_SOURCE_DISABLED:
+ self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
def test_do_secret_repl_all(self):
"""
@@ -276,7 +284,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
8, req8)
self.fail("Should have failed with WERR_DS_DRA_DB_ERROR")
except WERRORError as (enum, estr):
- self.assertEquals(enum, werror.WERR_DS_DRA_DB_ERROR)
+ # Allow Samba to have a different error here
+ if enum != werror.WERR_DS_DRA_SOURCE_DISABLED:
+ self.assertEquals(enum, werror.WERR_DS_DRA_DB_ERROR)
def test_do_secret_repl_dest_dsa(self):
@@ -302,7 +312,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
8, req8)
self.fail("Should have failed with WERR_DS_DRA_ACCESS_DENIED")
except WERRORError as (enum, estr):
- self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
+ # Allow Samba to have a different error here
+ if enum != werror.WERR_DS_DRA_SOURCE_DISABLED:
+ self.assertEquals(enum, werror.WERR_DS_DRA_ACCESS_DENIED)
def test_do_secret_repl_dest_dsa_all(self):
"""
@@ -357,7 +369,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
print ctr.__ndr_print__()
self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
except WERRORError as (enum, estr):
- self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+ # Allow Samba to check access first
+ if enum != werror.WERR_DS_DRA_ACCESS_DENIED:
+ self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
def test_do_single_repl_unpriv(self):
@@ -381,7 +395,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
(level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
except WERRORError as (enum, estr):
- self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+ # Allow Samba to check access first
+ if enum != werror.WERR_DS_DRA_ACCESS_DENIED:
+ self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
def test_do_full_repl_on_ou(self):
"""
@@ -407,7 +423,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
(level, ctr) = self.user_drs.DsGetNCChanges(self.user_drs_handle, 8, req8)
self.fail("Should have failed with WERR_DS_CANT_FIND_EXPECTED_NC")
except WERRORError as (enum, estr):
- self.assertEquals(enum, werror.WERR_DS_CANT_FIND_EXPECTED_NC)
+ # Allow Samba to check access first
+ if enum != werror.WERR_DS_DRA_ACCESS_DENIED:
+ self.assertEquals(enum, werror.WERR_DS_CANT_FIND_EXPECTED_NC)
def test_do_full_repl_on_ou_unpriv(self):
"""
@@ -432,7 +450,9 @@ class DrsReplicaSyncUnprivTestCase(drs_base.DrsBaseTestCase):
8, req8)
self.fail("Should have failed with WERR_DS_DRA_BAD_DN")
except WERRORError as (enum, estr):
- self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
+ # Allow Samba to check access first
+ if enum != werror.WERR_DS_DRA_ACCESS_DENIED:
+ self.assertEquals(enum, werror.WERR_DS_DRA_BAD_DN)
def test_do_full_repl(self):
"""
--
2.11.0
More information about the samba-technical
mailing list