[PATCH] extend dsacl command to support display and remove

William Brown william at blackhats.net.au
Wed Apr 25 07:33:25 UTC 2018


On Wed, 2018-04-25 at 08:16 +0300, Alexander Bokovoy wrote:
> On Fri, 20 Apr 2018, William Brown via samba-technical wrote:
> > Hi,
> > 
> > While testing the dsacl command I noticed it was only able to add
> > ACE
> > content to an ACL, not display or remove.
> > 
> > I have extended the dsacl command to support removing ACE and
> > displaying them in a simple "pretty" format.
> 
> Thanks for a useful addition! Comments inline.

Thanks mate! Funnily, that's actually the original code that I moved
into functions out of the class.

Anyway, I have fixed the two whitespace issues, and attaching the
updated patch.

Thank you!



> 
> > 
> > Thanks for your time to review this,
> > 
> > William
> > From 39e3cb2844927a9634818e175a9b0c4753e3afad Mon Sep 17 00:00:00
> > 2001
> > From: William Brown <william at blackhats.net.au>
> > Date: Thu, 19 Apr 2018 14:31:16 +1000
> > Subject: [PATCH] python/samba/netcmd/dsacl.py: support display and
> > remove of
> >  dsacls
> > 
> > The current dsacl command only allowed insertion of an ACE to the
> > ACL. This
> > patch provides the ability to display ACLs as well as removing ACEs
> > from them.
> > This makes the dsacl command much more complete and useful to
> > administrators.
> > 
> > Signed-off-by: William Brown <william at blackhats.net.au>
> > ---
> >  docs-xml/manpages/samba-tool.8.xml     |  12 +-
> >  python/samba/netcmd/dsacl.py           | 257
> > +++++++++++++++++++++++++--------
> >  python/samba/tests/samba_tool/dsacl.py |  93 ++++++++++++
> >  source4/selftest/tests.py              |   1 +
> >  4 files changed, 303 insertions(+), 60 deletions(-)
> >  create mode 100644 python/samba/tests/samba_tool/dsacl.py
> > 
> > diff --git a/docs-xml/manpages/samba-tool.8.xml b/docs-
> > xml/manpages/samba-tool.8.xml
> > index 3cde4c5b6fb..fa132d85d53 100644
> > --- a/docs-xml/manpages/samba-tool.8.xml
> > +++ b/docs-xml/manpages/samba-tool.8.xml
> > @@ -409,9 +409,19 @@
> >  	<para>Administer DS ACLs</para>
> >  </refsect2>
> >  
> > +<refsect3>
> > +	<title>dsacl display</title>
> > +	<para>Display access control entries from a directory
> > object.</para>
> > +</refsect3>
> > +
> > +<refsect3>
> > +	<title>dsacl remove</title>
> > +	<para>Remove access control entries from a directory
> > object.</para>
> > +</refsect3>
> > +
> >  <refsect3>
> >  	<title>dsacl set</title>
> > -	<para>Modify access list on a directory object.</para>
> > +	<para>Add access control entries to a directory
> > object.</para>
> >  </refsect3>
> >  
> >  <refsect2>
> > diff --git a/python/samba/netcmd/dsacl.py
> > b/python/samba/netcmd/dsacl.py
> > index 28aa843adbc..139e8115a5f 100644
> > --- a/python/samba/netcmd/dsacl.py
> > +++ b/python/samba/netcmd/dsacl.py
> > @@ -1,6 +1,7 @@
> >  # Manipulate ACLs on directory objects
> >  #
> >  # Copyright (C) Nadezhda Ivanova <nivanova at samba.org> 2010
> > +# Copyright (C) William Brown <william at blackhats.net.au> 2018
> >  #
> >  # This program is free software; you can redistribute it and/or
> > modify
> >  # it under the terms of the GNU General Public License as
> > published by
> > @@ -42,10 +43,105 @@ from samba.netcmd import (
> >      Option,
> >      )
> >  
> > +def _get_trustee_sid(samdb, trusteedn):
> > +    res = samdb.search(base=trusteedn,
> > expression="(objectClass=*)",
> > +        scope=SCOPE_BASE)
> > +    assert(len(res) == 1)
> > +    return ndr_unpack( security.dom_sid,res[0]["objectSid"][0])
> 
> please remove a space after opening parentheses.
> 
> > +
> > +def _get_domain_sid(samdb):
> > +    res = samdb.search(base=samdb.domain_dn(),
> > +            expression="(objectClass=*)", scope=SCOPE_BASE)
> > +    return ndr_unpack( security.dom_sid,res[0]["objectSid"][0])
> 
> Same here.
> 
> > +
> > +def _get_sddl(samdb, object_dn):
> > +    res = samdb.search(base=object_dn, scope=SCOPE_BASE,
> > +            attrs=["nTSecurityDescriptor"])
> > +    # we should theoretically always have an SD
> > +    assert(len(res) == 1)
> > +    desc = res[0]["nTSecurityDescriptor"][0]
> > +    return ndr_unpack(security.descriptor, desc)
> > +
> > +def _get_sddl_as_str(samdb, desc):
> > +    domain_sid = _get_domain_sid(samdb)
> > +    desc_sddl = desc.as_sddl(domain_sid)
> > +    return desc_sddl
> > +
> > +def _get_sddl_as_pretty_str(samdb, desc):
> > +    desc_sddl = _get_sddl_as_str(samdb, desc)
> > +    # Apply a very simple approach to make this readable - split
> > content to
> > +    # new lines.
> > +    desc_sddl = desc_sddl.replace('(', '\n(')
> > +    # The second step is for handling headers in the middle of
> > content.
> > +    # This finds ) where the next char is != \n, then captures it.
> > +    # We then add )\n and the captured character back.
> > +    desc_sddl = re.sub('\)([^\n])', r')\n\1', desc_sddl)
> > +    return desc_sddl
> > +
> > +def _clear_sddl_inherited(desc_sddl):
> > +    # Could this regex be more precise?
> > +    desc_aces = re.findall("\(.*?\)", desc_sddl)
> > +    for ace in desc_aces:
> > +        # This isn't clear: this is removing inherited descriptors
> > from the
> > +        # set so that when we resend the modification we don't
> > tread on
> > +        # them.
> > +        if ("ID" in ace):
> > +            desc_sddl = desc_sddl.replace(ace, "")
> > +    return desc_sddl
> > +
> > +# Extra handling in libcli/security/security_descriptor.c needs
> > work to
> > +# make this a data manipulation rather than string manipulation.
> > +def _add_ace_sddl(samdb, desc, new_ace):
> > +    """Add new ace explicitly."""
> > +    desc_sddl = _get_sddl_as_str(samdb, desc)
> > +    desc_sddl = _clear_sddl_inherited(desc_sddl)
> > +    if new_ace in desc_sddl:
> > +        return
> > +    #TODO add bindings for descriptor manipulation and get rid of
> > this
> > +    # This is a bit of a "hammer". It works by taking the SDDL
> > like:
> > +    #    D:(...)(...)
> > +    # And injecting our new ACE before the first existing ACE. The
> > issue is
> > +    # if the sddl is not in an expected order, we'll inject the
> > ACE into the
> > +    # incorrect location. As a result we *probably* should make
> > desc type
> > +    # that can handle this case properly and is aware of the
> > sections
> > +    # in the sddl.
> > +    if desc_sddl.find("(") >= 0:
> > +        desc_sddl = desc_sddl.replace('(', '%s(' % new_ace, 1)
> > +    else:
> > +        # If there are no existing ACE entries, this just appends
> > our
> > +        # new content.
> > +        desc_sddl = desc_sddl + new_ace
> > +    desc = security.descriptor.from_sddl(desc_sddl,
> > _get_domain_sid(samdb))
> > +    return desc
> > +
> > +def _del_ace_sddl(samdb, desc, rem_ace):
> > +    desc_sddl = _get_sddl_as_str(samdb, desc)
> > +    desc_sddl = _clear_sddl_inherited(desc_sddl)
> > +    # This would be better with native methods, for now we use str
> > manipulation
> > +    if rem_ace in desc_sddl:
> > +        print('present')
> > +    rem_ace = rem_ace.strip()
> > +    if not rem_ace.startswith('(') or not rem_ace.endswith(')'):
> > +        # Do nothing. Invalid ace. Can we raise a better error?
> > +        return desc
> > +    desc_sddl = desc_sddl.replace(rem_ace.strip(), "")
> > +    if rem_ace in desc_sddl:
> > +        print('still present')
> > +    desc = security.descriptor.from_sddl(desc_sddl,
> > _get_domain_sid(samdb))
> > +    return desc
> > +
> > +def _commit_descriptor(samdb, object_dn, desc, controls=None):
> > +    assert(isinstance(desc, security.descriptor))
> > +    m = ldb.Message()
> > +    m.dn = ldb.Dn(samdb, object_dn)
> > +    m["nTSecurityDescriptor"]= ldb.MessageElement(
> > +            (ndr_pack(desc)), ldb.FLAG_MOD_REPLACE,
> > +            "nTSecurityDescriptor")
> > +    samdb.modify(m)
> >  
> >  
> >  class cmd_dsacl_set(Command):
> > -    """Modify access list on a directory object."""
> > +    """Modify access entries on a directory object."""
> >  
> >      synopsis = "%prog [options]"
> >      car_help = """ The access control right to allow or deny """
> > @@ -83,58 +179,6 @@ class cmd_dsacl_set(Command):
> >              type="string"),
> >          ]
> >  
> > -    def find_trustee_sid(self, samdb, trusteedn):
> > -        res = samdb.search(base=trusteedn,
> > expression="(objectClass=*)",
> > -            scope=SCOPE_BASE)
> > -        assert(len(res) == 1)
> > -        return ndr_unpack(
> > security.dom_sid,res[0]["objectSid"][0])
> 
> same here.
> 
> > -
> > -    def modify_descriptor(self, samdb, object_dn, desc,
> > controls=None):
> > -        assert(isinstance(desc, security.descriptor))
> > -        m = ldb.Message()
> > -        m.dn = ldb.Dn(samdb, object_dn)
> > -        m["nTSecurityDescriptor"]= ldb.MessageElement(
> > -                (ndr_pack(desc)), ldb.FLAG_MOD_REPLACE,
> > -                "nTSecurityDescriptor")
> > -        samdb.modify(m)
> > -
> > -    def read_descriptor(self, samdb, object_dn):
> > -        res = samdb.search(base=object_dn, scope=SCOPE_BASE,
> > -                attrs=["nTSecurityDescriptor"])
> > -        # we should theoretically always have an SD
> > -        assert(len(res) == 1)
> > -        desc = res[0]["nTSecurityDescriptor"][0]
> > -        return ndr_unpack(security.descriptor, desc)
> > -
> > -    def get_domain_sid(self, samdb):
> > -        res = samdb.search(base=samdb.domain_dn(),
> > -                expression="(objectClass=*)", scope=SCOPE_BASE)
> > -        return ndr_unpack(
> > security.dom_sid,res[0]["objectSid"][0])
> 
> same here
> 
> > -
> > -    def add_ace(self, samdb, object_dn, new_ace):
> > -        """Add new ace explicitly."""
> > -        desc = self.read_descriptor(samdb, object_dn)
> > -        desc_sddl = desc.as_sddl(self.get_domain_sid(samdb))
> > -        #TODO add bindings for descriptor manipulation and get rid
> > of this
> > -        desc_aces = re.findall("\(.*?\)", desc_sddl)
> > -        for ace in desc_aces:
> > -            if ("ID" in ace):
> > -                desc_sddl = desc_sddl.replace(ace, "")
> > -        if new_ace in desc_sddl:
> > -            return
> > -        if desc_sddl.find("(") >= 0:
> > -            desc_sddl = desc_sddl[:desc_sddl.index("(")] + new_ace
> > + desc_sddl[desc_sddl.index("("):]
> > -        else:
> > -            desc_sddl = desc_sddl + new_ace
> > -        desc = security.descriptor.from_sddl(desc_sddl,
> > self.get_domain_sid(samdb))
> > -        self.modify_descriptor(samdb, object_dn, desc)
> > -
> > -    def print_new_acl(self, samdb, object_dn):
> > -        desc = self.read_descriptor(samdb, object_dn)
> > -        desc_sddl = desc.as_sddl(self.get_domain_sid(samdb))
> > -        self.outf.write("new descriptor for %s:\n" % object_dn)
> > -        self.outf.write(desc_sddl + "\n")
> > -
> >      def run(self, car, action, objectdn, trusteedn, sddl,
> >              H=None, credopts=None, sambaopts=None,
> > versionopts=None):
> >          lp = sambaopts.get_loadparm()
> > @@ -160,23 +204,118 @@ class cmd_dsacl_set(Command):
> >                  'repl-sync' : GUID_DRS_REPL_SYNCRONIZE,
> >                  'ro-repl-secret-sync' :
> > GUID_DRS_RO_REPL_SECRET_SYNC,
> >                  }
> > -        sid = self.find_trustee_sid(samdb, trusteedn)
> > +
> > +        sid = None
> > +        if trusteedn is not None:
> > +            sid = _get_trustee_sid(samdb, trusteedn)
> > +
> >          if sddl:
> >              new_ace = sddl
> > -        elif action == "allow":
> > +        elif action == "allow" and sid is not None:
> >              new_ace = "(OA;;CR;%s;;%s)" % (cars[car], str(sid))
> > -        elif action == "deny":
> > +        elif action == "deny" and sid is not None:
> >              new_ace = "(OD;;CR;%s;;%s)" % (cars[car], str(sid))
> >          else:
> >              raise CommandError("Wrong argument '%s'!" % action)
> >  
> > -        self.print_new_acl(samdb, objectdn)
> > -        self.add_ace(samdb, objectdn, new_ace)
> > -        self.print_new_acl(samdb, objectdn)
> > +        desc = _get_sddl(samdb, objectdn)
> > +
> > +        orig_desc_sddl = _get_sddl_as_str(samdb, desc)
> > +        self.outf.write("original descriptor for %s:\n" %
> > objectdn)
> > +        self.outf.write(orig_desc_sddl + "\n")
> > +
> > +        # This returns a new descriptor
> > +        desc = _add_ace_sddl(samdb, desc, new_ace)
> > +        _commit_descriptor(samdb, objectdn, desc)
> > +
> > +        new_desc_sddl = _get_sddl_as_str(samdb, desc)
> > +        self.outf.write("new descriptor for %s:\n" % objectdn)
> > +        self.outf.write(new_desc_sddl + "\n")
> > +
> > +class cmd_dsacl_display(Command):
> > +    """Display access entries on a directory object."""
> > +    synopsis = "%prog [options]"
> > +
> > +    takes_optiongroups = {
> > +        "sambaopts": options.SambaOptions,
> > +        "credopts": options.CredentialsOptions,
> > +        "versionopts": options.VersionOptions,
> > +        }
> > +
> > +    takes_options = [
> > +        Option("-H", "--URL", help="LDB URL for database or target
> > server",
> > +               type=str, metavar="URL", dest="H"),
> > +        Option("--objectdn", help="DN of the object whose SD to
> > display",
> > +            type="string"),
> > +        ]
> > +
> > +    def run(self, objectdn, H=None, credopts=None, sambaopts=None,
> > +            versionopts=None):
> > +        lp = sambaopts.get_loadparm()
> > +        creds = credopts.get_credentials(lp)
> > +
> > +        # Can't display if we don't have a target!
> > +        if objectdn is None :
> > +            return self.usage()
> > +
> > +        samdb = SamDB(url=H, session_info=system_session(),
> > +            credentials=creds, lp=lp)
> > +
> > +        desc = _get_sddl(samdb, objectdn)
> > +        desc_sddl = _get_sddl_as_pretty_str(samdb, desc)
> > +        self.outf.write("descriptor for %s:\n" % objectdn)
> > +        self.outf.write(desc_sddl + "\n")
> > +
> > +class cmd_dsacl_remove(Command):
> > +    """Remove an access entry from an object."""
> > +    synopsis = "%prog [options]"
> > +
> > +    takes_optiongroups = {
> > +        "sambaopts": options.SambaOptions,
> > +        "credopts": options.CredentialsOptions,
> > +        "versionopts": options.VersionOptions,
> > +        }
> > +
> > +    takes_options = [
> > +        Option("-H", "--URL", help="LDB URL for database or target
> > server",
> > +               type=str, metavar="URL", dest="H"),
> > +        Option("--objectdn", help="DN of the object whose SD to
> > display",
> > +            type="string"),
> > +        Option("--sddl", help="An ACE to be removed from the
> > object",
> > +            type="string"),
> > +        ]
> > +
> > +    def run(self, objectdn, sddl, H=None, credopts=None,
> > sambaopts=None,
> > +            versionopts=None):
> > +        lp = sambaopts.get_loadparm()
> > +        creds = credopts.get_credentials(lp)
> > +
> > +        if objectdn is None or sddl is None:
> > +            return self.usage()
> > +
> > +        samdb = SamDB(url=H, session_info=system_session(),
> > +            credentials=creds, lp=lp)
> > +
> > +        # Now remove the sddl from the descriptor, or raise "not
> > found".
> > +        desc = _get_sddl(samdb, objectdn)
> > +
> > +        orig_desc_sddl = _get_sddl_as_str(samdb, desc)
> > +        self.outf.write("original descriptor for %s:\n" %
> > objectdn)
> > +        self.outf.write(orig_desc_sddl + "\n")
> > +
> > +        # This returns a new descriptor
> > +        desc = _del_ace_sddl(samdb, desc, sddl)
> > +        _commit_descriptor(samdb, objectdn, desc)
> >  
> > +        new_desc_sddl = _get_sddl_as_str(samdb, desc)
> > +        self.outf.write("new descriptor for %s:\n" % objectdn)
> > +        self.outf.write(new_desc_sddl + "\n")
> >  
> >  class cmd_dsacl(SuperCommand):
> >      """DS ACLs manipulation."""
> >  
> >      subcommands = {}
> >      subcommands["set"] = cmd_dsacl_set()
> > +    subcommands["display"] = cmd_dsacl_display()
> > +    subcommands["remove"] = cmd_dsacl_remove()
> > +    # subcommands["replace"] = cmd_dsacl_replace()
> > diff --git a/python/samba/tests/samba_tool/dsacl.py
> > b/python/samba/tests/samba_tool/dsacl.py
> > new file mode 100644
> > index 00000000000..0bd4c38f0ab
> > --- /dev/null
> > +++ b/python/samba/tests/samba_tool/dsacl.py
> > @@ -0,0 +1,93 @@
> > +# Unix SMB/CIFS implementation.
> > +# Copyright (C) William Brown <william at blackhats.net.au> 2018
> > +#
> > +# This program is free software; you can redistribute it and/or
> > modify
> > +# it under the terms of the GNU General Public License as
> > published by
> > +# the Free Software Foundation; either version 3 of the License,
> > or
> > +# (at your option) any later version.
> > +#
> > +# This program is distributed in the hope that it will be useful,
> > +# but WITHOUT ANY WARRANTY; without even the implied warranty of
> > +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> > +# GNU General Public License for more details.
> > +#
> > +# You should have received a copy of the GNU General Public
> > License
> > +# along with this program.  If not, see <http://www.gnu.org/licens
> > es/>.
> > +#
> > +
> > +import os
> > +import ldb
> > +from samba.tests.samba_tool.base import SambaToolCmdTest
> > +
> > +class DsaclCmdTestCase(SambaToolCmdTest):
> > +    """Tests for samba-tool dsacl subcommands"""
> > +    samdb = None
> > +
> > +    def setUp(self):
> > +        super(DsaclCmdTestCase, self).setUp()
> > +        self.samdb = self.getSamDB("-H", "ldap://%s" %
> > os.environ["DC_SERVER"],
> > +            "-U%s%%%s" % (os.environ["DC_USERNAME"],
> > os.environ["DC_PASSWORD"]))
> > +        # self.runsubcmd("dsacl", "set", "")
> > +        # we add a group to manipulate the acl on
> > +        self.runsubcmd("group", "add", "dsacl_test",
> > +                              "-H", "ldap://%s" %
> > os.environ["DC_SERVER"],
> > +                              "-U%s%%%s" %
> > (os.environ["DC_USERNAME"],
> > +                                            os.environ["DC_PASSWOR
> > D"]))
> > +
> > +    def tearDown(self):
> > +        super(DsaclCmdTestCase, self).tearDown()
> > +        # Remove our test group
> > +        if self._find_group():
> > +            self.runsubcmd("group", "delete", "dsacl_test",
> > +                                  "-H", "ldap://%s" %
> > os.environ["DC_SERVER"],
> > +                                  "-U%s%%%s" %
> > (os.environ["DC_USERNAME"],
> > +                                                os.environ["DC_PAS
> > SWORD"]))
> > +
> > +    def _find_group(self):
> > +        search_filter =
> > ("(&(sAMAccountName=%s)(objectCategory=%s,%s))" %
> > +                         (ldb.binary_encode("dsacl_test"),
> > +                         "CN=Group,CN=Schema,CN=Configuration",
> > +                         self.samdb.domain_dn()))
> > +        grouplist = self.samdb.search(base=self.samdb.domain_dn(),
> > +                                      scope=ldb.SCOPE_SUBTREE,
> > +                                      expression=search_filter,
> > +                                      attrs=[])
> > +        if grouplist:
> > +            return grouplist[0]
> > +        else:
> > +            return None
> > +
> > +    def test_display(self):
> > +        """Tests that we can display ACE"""
> > +        x = self._find_group()
> > +
> > +        (result, out, err) = self.runsubcmd("dsacl", "display",
> > +                              "--objectdn=%s" % x['dn'],
> > +                              "-H", "ldap://%s" %
> > os.environ["DC_SERVER"],
> > +                              "-U%s%%%s" %
> > (os.environ["DC_USERNAME"],
> > +                                            os.environ["DC_PASSWOR
> > D"]))
> > +
> > +        self.assertCmdSuccess(result, out, err)
> > +
> > +    def test_add_remove(self):
> > +        """Tests a forced error"""
> > +        x = self._find_group()
> > +
> > +        (result, out, err) = self.runsubcmd("dsacl", "set",
> > +                              "--objectdn=%s" % x['dn'],
> > +                              "--sddl=(A;;RPLCLORC;;;AN)",
> > +                              "-H", "ldap://%s" %
> > os.environ["DC_SERVER"],
> > +                              "-U%s%%%s" %
> > (os.environ["DC_USERNAME"],
> > +                                            os.environ["DC_PASSWOR
> > D"]))
> > +
> > +        self.assertCmdSuccess(result, out, err)
> > +
> > +        (result, out, err) = self.runsubcmd("dsacl", "remove",
> > +                              "--objectdn=%s" % x['dn'],
> > +                              "--sddl=(A;;RPLCLORC;;;AN)",
> > +                              "-H", "ldap://%s" %
> > os.environ["DC_SERVER"],
> > +                              "-U%s%%%s" %
> > (os.environ["DC_USERNAME"],
> > +                                            os.environ["DC_PASSWOR
> > D"]))
> > +
> > +        self.assertCmdSuccess(result, out, err)
> > +
> > diff --git a/source4/selftest/tests.py b/source4/selftest/tests.py
> > index d1e5bc6a509..3757aed83aa 100755
> > --- a/source4/selftest/tests.py
> > +++ b/source4/selftest/tests.py
> > @@ -611,6 +611,7 @@ planpythontestsuite("chgdcpass:local",
> > "samba.tests.samba_tool.user_check_passwo
> >  planpythontestsuite("ad_dc_ntvfs:local",
> > "samba.tests.samba_tool.group")
> >  planpythontestsuite("ad_dc_ntvfs:local",
> > "samba.tests.samba_tool.ou")
> >  planpythontestsuite("ad_dc_ntvfs:local",
> > "samba.tests.samba_tool.computer")
> > +planpythontestsuite("ad_dc_ntvfs:local",
> > "samba.tests.samba_tool.dsacl")
> >  planpythontestsuite("ad_dc:local", "samba.tests.samba_tool.ntacl")
> >  planpythontestsuite("none",
> > "samba.tests.samba_tool.provision_password_check")
> >  planpythontestsuite("none", "samba.tests.samba_tool.help")
> > -- 
> > 2.14.3
> > 
> 
> 
-------------- next part --------------
A non-text attachment was scrubbed...
Name: 0001-python-samba-netcmd-dsacl.py-support-display-and-rem.patch
Type: text/x-patch
Size: 18786 bytes
Desc: not available
URL: <http://lists.samba.org/pipermail/samba-technical/attachments/20180425/a20a5d15/0001-python-samba-netcmd-dsacl.py-support-display-and-rem.bin>


More information about the samba-technical mailing list