[PATCH] extend dsacl command to support display and remove

Alexander Bokovoy ab at samba.org
Wed Apr 25 05:16:19 UTC 2018


On Fri, 20 Apr 2018, William Brown via samba-technical wrote:
> Hi,
> 
> While testing the dsacl command I noticed it was only able to add ACE
> content to an ACL, not display or remove.
> 
> I have extended the dsacl command to support removing ACE and
> displaying them in a simple "pretty" format.
Thanks for a useful addition! Comments inline.

> 
> Thanks for your time to review this,
> 
> William

> From 39e3cb2844927a9634818e175a9b0c4753e3afad Mon Sep 17 00:00:00 2001
> From: William Brown <william at blackhats.net.au>
> Date: Thu, 19 Apr 2018 14:31:16 +1000
> Subject: [PATCH] python/samba/netcmd/dsacl.py: support display and remove of
>  dsacls
> 
> The current dsacl command only allowed insertion of an ACE to the ACL. This
> patch provides the ability to display ACLs as well as removing ACEs from them.
> This makes the dsacl command much more complete and useful to administrators.
> 
> Signed-off-by: William Brown <william at blackhats.net.au>
> ---
>  docs-xml/manpages/samba-tool.8.xml     |  12 +-
>  python/samba/netcmd/dsacl.py           | 257 +++++++++++++++++++++++++--------
>  python/samba/tests/samba_tool/dsacl.py |  93 ++++++++++++
>  source4/selftest/tests.py              |   1 +
>  4 files changed, 303 insertions(+), 60 deletions(-)
>  create mode 100644 python/samba/tests/samba_tool/dsacl.py
> 
> diff --git a/docs-xml/manpages/samba-tool.8.xml b/docs-xml/manpages/samba-tool.8.xml
> index 3cde4c5b6fb..fa132d85d53 100644
> --- a/docs-xml/manpages/samba-tool.8.xml
> +++ b/docs-xml/manpages/samba-tool.8.xml
> @@ -409,9 +409,19 @@
>  	<para>Administer DS ACLs</para>
>  </refsect2>
>  
> +<refsect3>
> +	<title>dsacl display</title>
> +	<para>Display access control entries from a directory object.</para>
> +</refsect3>
> +
> +<refsect3>
> +	<title>dsacl remove</title>
> +	<para>Remove access control entries from a directory object.</para>
> +</refsect3>
> +
>  <refsect3>
>  	<title>dsacl set</title>
> -	<para>Modify access list on a directory object.</para>
> +	<para>Add access control entries to a directory object.</para>
>  </refsect3>
>  
>  <refsect2>
> diff --git a/python/samba/netcmd/dsacl.py b/python/samba/netcmd/dsacl.py
> index 28aa843adbc..139e8115a5f 100644
> --- a/python/samba/netcmd/dsacl.py
> +++ b/python/samba/netcmd/dsacl.py
> @@ -1,6 +1,7 @@
>  # Manipulate ACLs on directory objects
>  #
>  # Copyright (C) Nadezhda Ivanova <nivanova at samba.org> 2010
> +# Copyright (C) William Brown <william at blackhats.net.au> 2018
>  #
>  # This program is free software; you can redistribute it and/or modify
>  # it under the terms of the GNU General Public License as published by
> @@ -42,10 +43,105 @@ from samba.netcmd import (
>      Option,
>      )
>  
> +def _get_trustee_sid(samdb, trusteedn):
> +    res = samdb.search(base=trusteedn, expression="(objectClass=*)",
> +        scope=SCOPE_BASE)
> +    assert(len(res) == 1)
> +    return ndr_unpack( security.dom_sid,res[0]["objectSid"][0])
please remove a space after opening parentheses.

> +
> +def _get_domain_sid(samdb):
> +    res = samdb.search(base=samdb.domain_dn(),
> +            expression="(objectClass=*)", scope=SCOPE_BASE)
> +    return ndr_unpack( security.dom_sid,res[0]["objectSid"][0])
Same here.

> +
> +def _get_sddl(samdb, object_dn):
> +    res = samdb.search(base=object_dn, scope=SCOPE_BASE,
> +            attrs=["nTSecurityDescriptor"])
> +    # we should theoretically always have an SD
> +    assert(len(res) == 1)
> +    desc = res[0]["nTSecurityDescriptor"][0]
> +    return ndr_unpack(security.descriptor, desc)
> +
> +def _get_sddl_as_str(samdb, desc):
> +    domain_sid = _get_domain_sid(samdb)
> +    desc_sddl = desc.as_sddl(domain_sid)
> +    return desc_sddl
> +
> +def _get_sddl_as_pretty_str(samdb, desc):
> +    desc_sddl = _get_sddl_as_str(samdb, desc)
> +    # Apply a very simple approach to make this readable - split content to
> +    # new lines.
> +    desc_sddl = desc_sddl.replace('(', '\n(')
> +    # The second step is for handling headers in the middle of content.
> +    # This finds ) where the next char is != \n, then captures it.
> +    # We then add )\n and the captured character back.
> +    desc_sddl = re.sub('\)([^\n])', r')\n\1', desc_sddl)
> +    return desc_sddl
> +
> +def _clear_sddl_inherited(desc_sddl):
> +    # Could this regex be more precise?
> +    desc_aces = re.findall("\(.*?\)", desc_sddl)
> +    for ace in desc_aces:
> +        # This isn't clear: this is removing inherited descriptors from the
> +        # set so that when we resend the modification we don't tread on
> +        # them.
> +        if ("ID" in ace):
> +            desc_sddl = desc_sddl.replace(ace, "")
> +    return desc_sddl
> +
> +# Extra handling in libcli/security/security_descriptor.c needs work to
> +# make this a data manipulation rather than string manipulation.
> +def _add_ace_sddl(samdb, desc, new_ace):
> +    """Add new ace explicitly."""
> +    desc_sddl = _get_sddl_as_str(samdb, desc)
> +    desc_sddl = _clear_sddl_inherited(desc_sddl)
> +    if new_ace in desc_sddl:
> +        return
> +    #TODO add bindings for descriptor manipulation and get rid of this
> +    # This is a bit of a "hammer". It works by taking the SDDL like:
> +    #    D:(...)(...)
> +    # And injecting our new ACE before the first existing ACE. The issue is
> +    # if the sddl is not in an expected order, we'll inject the ACE into the
> +    # incorrect location. As a result we *probably* should make desc type
> +    # that can handle this case properly and is aware of the sections
> +    # in the sddl.
> +    if desc_sddl.find("(") >= 0:
> +        desc_sddl = desc_sddl.replace('(', '%s(' % new_ace, 1)
> +    else:
> +        # If there are no existing ACE entries, this just appends our
> +        # new content.
> +        desc_sddl = desc_sddl + new_ace
> +    desc = security.descriptor.from_sddl(desc_sddl, _get_domain_sid(samdb))
> +    return desc
> +
> +def _del_ace_sddl(samdb, desc, rem_ace):
> +    desc_sddl = _get_sddl_as_str(samdb, desc)
> +    desc_sddl = _clear_sddl_inherited(desc_sddl)
> +    # This would be better with native methods, for now we use str manipulation
> +    if rem_ace in desc_sddl:
> +        print('present')
> +    rem_ace = rem_ace.strip()
> +    if not rem_ace.startswith('(') or not rem_ace.endswith(')'):
> +        # Do nothing. Invalid ace. Can we raise a better error?
> +        return desc
> +    desc_sddl = desc_sddl.replace(rem_ace.strip(), "")
> +    if rem_ace in desc_sddl:
> +        print('still present')
> +    desc = security.descriptor.from_sddl(desc_sddl, _get_domain_sid(samdb))
> +    return desc
> +
> +def _commit_descriptor(samdb, object_dn, desc, controls=None):
> +    assert(isinstance(desc, security.descriptor))
> +    m = ldb.Message()
> +    m.dn = ldb.Dn(samdb, object_dn)
> +    m["nTSecurityDescriptor"]= ldb.MessageElement(
> +            (ndr_pack(desc)), ldb.FLAG_MOD_REPLACE,
> +            "nTSecurityDescriptor")
> +    samdb.modify(m)
>  
>  
>  class cmd_dsacl_set(Command):
> -    """Modify access list on a directory object."""
> +    """Modify access entries on a directory object."""
>  
>      synopsis = "%prog [options]"
>      car_help = """ The access control right to allow or deny """
> @@ -83,58 +179,6 @@ class cmd_dsacl_set(Command):
>              type="string"),
>          ]
>  
> -    def find_trustee_sid(self, samdb, trusteedn):
> -        res = samdb.search(base=trusteedn, expression="(objectClass=*)",
> -            scope=SCOPE_BASE)
> -        assert(len(res) == 1)
> -        return ndr_unpack( security.dom_sid,res[0]["objectSid"][0])
same here.

> -
> -    def modify_descriptor(self, samdb, object_dn, desc, controls=None):
> -        assert(isinstance(desc, security.descriptor))
> -        m = ldb.Message()
> -        m.dn = ldb.Dn(samdb, object_dn)
> -        m["nTSecurityDescriptor"]= ldb.MessageElement(
> -                (ndr_pack(desc)), ldb.FLAG_MOD_REPLACE,
> -                "nTSecurityDescriptor")
> -        samdb.modify(m)
> -
> -    def read_descriptor(self, samdb, object_dn):
> -        res = samdb.search(base=object_dn, scope=SCOPE_BASE,
> -                attrs=["nTSecurityDescriptor"])
> -        # we should theoretically always have an SD
> -        assert(len(res) == 1)
> -        desc = res[0]["nTSecurityDescriptor"][0]
> -        return ndr_unpack(security.descriptor, desc)
> -
> -    def get_domain_sid(self, samdb):
> -        res = samdb.search(base=samdb.domain_dn(),
> -                expression="(objectClass=*)", scope=SCOPE_BASE)
> -        return ndr_unpack( security.dom_sid,res[0]["objectSid"][0])
same here

> -
> -    def add_ace(self, samdb, object_dn, new_ace):
> -        """Add new ace explicitly."""
> -        desc = self.read_descriptor(samdb, object_dn)
> -        desc_sddl = desc.as_sddl(self.get_domain_sid(samdb))
> -        #TODO add bindings for descriptor manipulation and get rid of this
> -        desc_aces = re.findall("\(.*?\)", desc_sddl)
> -        for ace in desc_aces:
> -            if ("ID" in ace):
> -                desc_sddl = desc_sddl.replace(ace, "")
> -        if new_ace in desc_sddl:
> -            return
> -        if desc_sddl.find("(") >= 0:
> -            desc_sddl = desc_sddl[:desc_sddl.index("(")] + new_ace + desc_sddl[desc_sddl.index("("):]
> -        else:
> -            desc_sddl = desc_sddl + new_ace
> -        desc = security.descriptor.from_sddl(desc_sddl, self.get_domain_sid(samdb))
> -        self.modify_descriptor(samdb, object_dn, desc)
> -
> -    def print_new_acl(self, samdb, object_dn):
> -        desc = self.read_descriptor(samdb, object_dn)
> -        desc_sddl = desc.as_sddl(self.get_domain_sid(samdb))
> -        self.outf.write("new descriptor for %s:\n" % object_dn)
> -        self.outf.write(desc_sddl + "\n")
> -
>      def run(self, car, action, objectdn, trusteedn, sddl,
>              H=None, credopts=None, sambaopts=None, versionopts=None):
>          lp = sambaopts.get_loadparm()
> @@ -160,23 +204,118 @@ class cmd_dsacl_set(Command):
>                  'repl-sync' : GUID_DRS_REPL_SYNCRONIZE,
>                  'ro-repl-secret-sync' : GUID_DRS_RO_REPL_SECRET_SYNC,
>                  }
> -        sid = self.find_trustee_sid(samdb, trusteedn)
> +
> +        sid = None
> +        if trusteedn is not None:
> +            sid = _get_trustee_sid(samdb, trusteedn)
> +
>          if sddl:
>              new_ace = sddl
> -        elif action == "allow":
> +        elif action == "allow" and sid is not None:
>              new_ace = "(OA;;CR;%s;;%s)" % (cars[car], str(sid))
> -        elif action == "deny":
> +        elif action == "deny" and sid is not None:
>              new_ace = "(OD;;CR;%s;;%s)" % (cars[car], str(sid))
>          else:
>              raise CommandError("Wrong argument '%s'!" % action)
>  
> -        self.print_new_acl(samdb, objectdn)
> -        self.add_ace(samdb, objectdn, new_ace)
> -        self.print_new_acl(samdb, objectdn)
> +        desc = _get_sddl(samdb, objectdn)
> +
> +        orig_desc_sddl = _get_sddl_as_str(samdb, desc)
> +        self.outf.write("original descriptor for %s:\n" % objectdn)
> +        self.outf.write(orig_desc_sddl + "\n")
> +
> +        # This returns a new descriptor
> +        desc = _add_ace_sddl(samdb, desc, new_ace)
> +        _commit_descriptor(samdb, objectdn, desc)
> +
> +        new_desc_sddl = _get_sddl_as_str(samdb, desc)
> +        self.outf.write("new descriptor for %s:\n" % objectdn)
> +        self.outf.write(new_desc_sddl + "\n")
> +
> +class cmd_dsacl_display(Command):
> +    """Display access entries on a directory object."""
> +    synopsis = "%prog [options]"
> +
> +    takes_optiongroups = {
> +        "sambaopts": options.SambaOptions,
> +        "credopts": options.CredentialsOptions,
> +        "versionopts": options.VersionOptions,
> +        }
> +
> +    takes_options = [
> +        Option("-H", "--URL", help="LDB URL for database or target server",
> +               type=str, metavar="URL", dest="H"),
> +        Option("--objectdn", help="DN of the object whose SD to display",
> +            type="string"),
> +        ]
> +
> +    def run(self, objectdn, H=None, credopts=None, sambaopts=None,
> +            versionopts=None):
> +        lp = sambaopts.get_loadparm()
> +        creds = credopts.get_credentials(lp)
> +
> +        # Can't display if we don't have a target!
> +        if objectdn is None :
> +            return self.usage()
> +
> +        samdb = SamDB(url=H, session_info=system_session(),
> +            credentials=creds, lp=lp)
> +
> +        desc = _get_sddl(samdb, objectdn)
> +        desc_sddl = _get_sddl_as_pretty_str(samdb, desc)
> +        self.outf.write("descriptor for %s:\n" % objectdn)
> +        self.outf.write(desc_sddl + "\n")
> +
> +class cmd_dsacl_remove(Command):
> +    """Remove an access entry from an object."""
> +    synopsis = "%prog [options]"
> +
> +    takes_optiongroups = {
> +        "sambaopts": options.SambaOptions,
> +        "credopts": options.CredentialsOptions,
> +        "versionopts": options.VersionOptions,
> +        }
> +
> +    takes_options = [
> +        Option("-H", "--URL", help="LDB URL for database or target server",
> +               type=str, metavar="URL", dest="H"),
> +        Option("--objectdn", help="DN of the object whose SD to display",
> +            type="string"),
> +        Option("--sddl", help="An ACE to be removed from the object",
> +            type="string"),
> +        ]
> +
> +    def run(self, objectdn, sddl, H=None, credopts=None, sambaopts=None,
> +            versionopts=None):
> +        lp = sambaopts.get_loadparm()
> +        creds = credopts.get_credentials(lp)
> +
> +        if objectdn is None or sddl is None:
> +            return self.usage()
> +
> +        samdb = SamDB(url=H, session_info=system_session(),
> +            credentials=creds, lp=lp)
> +
> +        # Now remove the sddl from the descriptor, or raise "not found".
> +        desc = _get_sddl(samdb, objectdn)
> +
> +        orig_desc_sddl = _get_sddl_as_str(samdb, desc)
> +        self.outf.write("original descriptor for %s:\n" % objectdn)
> +        self.outf.write(orig_desc_sddl + "\n")
> +
> +        # This returns a new descriptor
> +        desc = _del_ace_sddl(samdb, desc, sddl)
> +        _commit_descriptor(samdb, objectdn, desc)
>  
> +        new_desc_sddl = _get_sddl_as_str(samdb, desc)
> +        self.outf.write("new descriptor for %s:\n" % objectdn)
> +        self.outf.write(new_desc_sddl + "\n")
>  
>  class cmd_dsacl(SuperCommand):
>      """DS ACLs manipulation."""
>  
>      subcommands = {}
>      subcommands["set"] = cmd_dsacl_set()
> +    subcommands["display"] = cmd_dsacl_display()
> +    subcommands["remove"] = cmd_dsacl_remove()
> +    # subcommands["replace"] = cmd_dsacl_replace()
> diff --git a/python/samba/tests/samba_tool/dsacl.py b/python/samba/tests/samba_tool/dsacl.py
> new file mode 100644
> index 00000000000..0bd4c38f0ab
> --- /dev/null
> +++ b/python/samba/tests/samba_tool/dsacl.py
> @@ -0,0 +1,93 @@
> +# Unix SMB/CIFS implementation.
> +# Copyright (C) William Brown <william at blackhats.net.au> 2018
> +#
> +# This program is free software; you can redistribute it and/or modify
> +# it under the terms of the GNU General Public License as published by
> +# the Free Software Foundation; either version 3 of the License, or
> +# (at your option) any later version.
> +#
> +# This program is distributed in the hope that it will be useful,
> +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> +# GNU General Public License for more details.
> +#
> +# You should have received a copy of the GNU General Public License
> +# along with this program.  If not, see <http://www.gnu.org/licenses/>.
> +#
> +
> +import os
> +import ldb
> +from samba.tests.samba_tool.base import SambaToolCmdTest
> +
> +class DsaclCmdTestCase(SambaToolCmdTest):
> +    """Tests for samba-tool dsacl subcommands"""
> +    samdb = None
> +
> +    def setUp(self):
> +        super(DsaclCmdTestCase, self).setUp()
> +        self.samdb = self.getSamDB("-H", "ldap://%s" % os.environ["DC_SERVER"],
> +            "-U%s%%%s" % (os.environ["DC_USERNAME"], os.environ["DC_PASSWORD"]))
> +        # self.runsubcmd("dsacl", "set", "")
> +        # we add a group to manipulate the acl on
> +        self.runsubcmd("group", "add", "dsacl_test",
> +                              "-H", "ldap://%s" % os.environ["DC_SERVER"],
> +                              "-U%s%%%s" % (os.environ["DC_USERNAME"],
> +                                            os.environ["DC_PASSWORD"]))
> +
> +    def tearDown(self):
> +        super(DsaclCmdTestCase, self).tearDown()
> +        # Remove our test group
> +        if self._find_group():
> +            self.runsubcmd("group", "delete", "dsacl_test",
> +                                  "-H", "ldap://%s" % os.environ["DC_SERVER"],
> +                                  "-U%s%%%s" % (os.environ["DC_USERNAME"],
> +                                                os.environ["DC_PASSWORD"]))
> +
> +    def _find_group(self):
> +        search_filter = ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" %
> +                         (ldb.binary_encode("dsacl_test"),
> +                         "CN=Group,CN=Schema,CN=Configuration",
> +                         self.samdb.domain_dn()))
> +        grouplist = self.samdb.search(base=self.samdb.domain_dn(),
> +                                      scope=ldb.SCOPE_SUBTREE,
> +                                      expression=search_filter,
> +                                      attrs=[])
> +        if grouplist:
> +            return grouplist[0]
> +        else:
> +            return None
> +
> +    def test_display(self):
> +        """Tests that we can display ACE"""
> +        x = self._find_group()
> +
> +        (result, out, err) = self.runsubcmd("dsacl", "display",
> +                              "--objectdn=%s" % x['dn'],
> +                              "-H", "ldap://%s" % os.environ["DC_SERVER"],
> +                              "-U%s%%%s" % (os.environ["DC_USERNAME"],
> +                                            os.environ["DC_PASSWORD"]))
> +
> +        self.assertCmdSuccess(result, out, err)
> +
> +    def test_add_remove(self):
> +        """Tests a forced error"""
> +        x = self._find_group()
> +
> +        (result, out, err) = self.runsubcmd("dsacl", "set",
> +                              "--objectdn=%s" % x['dn'],
> +                              "--sddl=(A;;RPLCLORC;;;AN)",
> +                              "-H", "ldap://%s" % os.environ["DC_SERVER"],
> +                              "-U%s%%%s" % (os.environ["DC_USERNAME"],
> +                                            os.environ["DC_PASSWORD"]))
> +
> +        self.assertCmdSuccess(result, out, err)
> +
> +        (result, out, err) = self.runsubcmd("dsacl", "remove",
> +                              "--objectdn=%s" % x['dn'],
> +                              "--sddl=(A;;RPLCLORC;;;AN)",
> +                              "-H", "ldap://%s" % os.environ["DC_SERVER"],
> +                              "-U%s%%%s" % (os.environ["DC_USERNAME"],
> +                                            os.environ["DC_PASSWORD"]))
> +
> +        self.assertCmdSuccess(result, out, err)
> +
> diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py
> index d1e5bc6a509..3757aed83aa 100755
> --- a/source4/selftest/tests.py
> +++ b/source4/selftest/tests.py
> @@ -611,6 +611,7 @@ planpythontestsuite("chgdcpass:local", "samba.tests.samba_tool.user_check_passwo
>  planpythontestsuite("ad_dc_ntvfs:local", "samba.tests.samba_tool.group")
>  planpythontestsuite("ad_dc_ntvfs:local", "samba.tests.samba_tool.ou")
>  planpythontestsuite("ad_dc_ntvfs:local", "samba.tests.samba_tool.computer")
> +planpythontestsuite("ad_dc_ntvfs:local", "samba.tests.samba_tool.dsacl")
>  planpythontestsuite("ad_dc:local", "samba.tests.samba_tool.ntacl")
>  planpythontestsuite("none", "samba.tests.samba_tool.provision_password_check")
>  planpythontestsuite("none", "samba.tests.samba_tool.help")
> -- 
> 2.14.3
> 


-- 
/ Alexander Bokovoy



More information about the samba-technical mailing list