[SCM] Samba Shared Repository - branch master updated

Andrew Bartlett abartlet at samba.org
Fri Jun 11 09:30:01 UTC 2021


The branch, master has been updated
       via  4152499652c pytests: add dns_aging, embracing and extending ageing tests
       via  e9a265612a7 py: samba.dnsserver: add helper for record buffers
       via  581d7a528e8 pytest:dns_base: make_txt_update can set arbitrary TTL
       via  e37437f1ff0 pydns: expose dns_records_match() as dsdb_dns.records.match()
       via  b7077203256 dns: merge dns_records_match and dns_record_match
       via  64e637802fc dlz: remove pretense of HINFO support
       via  51ace4d0010 dns_record_match: drop pretense of HINFO support
       via  341febfb264 dns common: dns_records_match() matches tombstones
       via  070e7113d4c dns: merge dlz/internal dns_records_match()
       via  f6025d9f340 dlz_bind9: remove redundant logging in b9_record_match()
       via  421dc7fc4d8 python:subunit: Avoid misleading "Test was never started" error message
       via  18b78fa4b46 python:subunit: Remove write_traceback()
       via  3031e8071c8 python:subunit: Fix skipping a test with no reason given
      from  18394daf1e6 dbcheck: formatting

https://git.samba.org/?p=samba.git;a=shortlog;h=master


- Log -----------------------------------------------------------------
commit 4152499652c2d5e5eb9ccf5a6a61c273ac6e5f13
Author: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
Date:   Wed Apr 28 17:40:08 2021 +1200

    pytests: add dns_aging, embracing and extending ageing tests
    
    This incorporates tests from various dns*.py files, but makes them
    correct.
    
    All but one of these tests pass against Windows 2012r2.
    
    Further patches will remove the broken tests in other files, and fix
    Samba so it passes these.
    
    Signed-off-by: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
    Reviewed-by: Andrew Bartlett <abartlet at samba.org>
    
    Autobuild-User(master): Andrew Bartlett <abartlet at samba.org>
    Autobuild-Date(master): Fri Jun 11 09:29:23 UTC 2021 on sn-devel-184

commit e9a265612a71d5b68197d2bcb205ec58c29463e5
Author: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
Date:   Thu May 13 03:51:45 2021 +0000

    py: samba.dnsserver: add helper for record buffers
    
    We *always* perform these steps when we get a record.
    
    Signed-off-by: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
    Reviewed-by: Andrew Bartlett <abartlet at samba.org>

commit 581d7a528e86a8354ec20243deb2c436d9cf861d
Author: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
Date:   Wed May 19 02:39:00 2021 +0000

    pytest:dns_base: make_txt_update can set arbitrary TTL
    
    Also, improve a variable name.
    
    Signed-off-by: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
    Reviewed-by: Andrew Bartlett <abartlet at samba.org>

commit e37437f1ff0975d0923a89c0ed8a140cf005ba5b
Author: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
Date:   Fri May 28 18:08:56 2021 +1200

    pydns: expose dns_records_match() as dsdb_dns.records.match()
    
    Signed-off-by: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
    Reviewed-by: Andrew Bartlett <abartlet at samba.org>

commit b7077203256aac45e64497614f58555b182d4a33
Author: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
Date:   Sat May 29 21:25:29 2021 +1200

    dns: merge dns_records_match and dns_record_match
    
    Signed-off-by: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
    Reviewed-by: Andrew Bartlett <abartlet at samba.org>

commit 64e637802fc58f0215160449fbdd7686150d10d2
Author: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
Date:   Fri Apr 23 19:49:05 2021 +1200

    dlz: remove pretense of HINFO support
    
    Signed-off-by: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
    Reviewed-by: Andrew Bartlett <abartlet at samba.org>

commit 51ace4d00106920cfae560d3d4489c5daefdeb79
Author: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
Date:   Tue Apr 13 12:06:16 2021 +1200

    dns_record_match: drop pretense of HINFO support
    
    We don't support it really, and if we did there is no sense in which
    it could be updated, which is the context in which this function is
    used.
    
    (modern HINFO returns the constant string "RFC8482". See RFC 8482).
    
    Signed-off-by: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
    Reviewed-by: Andrew Bartlett <abartlet at samba.org>

commit 341febfb264c68b2e459e066c0824b38cb6be84a
Author: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
Date:   Tue Apr 13 09:57:33 2021 +1200

    dns common: dns_records_match() matches tombstones
    
    This will be needed by the RPC server. Other callers already filter
    out tombstones, so this is OK.
    
    Signed-off-by: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
    Reviewed-by: Andrew Bartlett <abartlet at samba.org>

commit 070e7113d4c9f267db856eb4db6d6c2505047a64
Author: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
Date:   Tue Apr 13 07:00:41 2021 +1200

    dns: merge dlz/internal dns_records_match()
    
    We have had three nearly identical functions called
    dns_record[s]_match. This patch merges two of them, attempting to keep
    the good bits and not the bugs.
    
    That means:
    
    1. We use the AAAA match from dlz, which is agnostic to all the
    billions of ways you can write the same IPv6 address (case sensitivity
    is just the beginning).
    
    2. We lean more on the TXT match from dns_utils, because the dlz used
    a weird bitwise &= operator, but we adjust to exit early.
    
    3. Keep HINFO from dlz (for now).
    
    4. Use the dns_name_equal() that was already in dns_common, which was
    used by dlz. dns_utils had a strange one that probably did the same
    thing.
    
    Signed-off-by: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
    Reviewed-by: Andrew Bartlett <abartlet at samba.org>
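
Point 1 above (an AAAA match that does not care how the address is written) can be illustrated in Python. This is only a sketch of the idea, not the C code in dnsserver_common.c: the helper name aaaa_equal() is hypothetical, and it assumes both records carry the address as text.

```python
import ipaddress


def aaaa_equal(a: str, b: str) -> bool:
    """Hypothetical helper: compare two IPv6 address strings by value.

    Parsing both sides first means case, zero compression ("::") and
    leading zeros do not affect the result.
    """
    try:
        return ipaddress.IPv6Address(a) == ipaddress.IPv6Address(b)
    except ipaddress.AddressValueError:
        # An unparseable string never matches anything.
        return False
```

For example, aaaa_equal("2001:DB8::1", "2001:0db8:0000:0000:0000:0000:0000:0001") is true, while a plain string comparison of the same two spellings would fail.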

commit f6025d9f3409174ae1889caa4b94b4df2dcfa874
Author: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
Date:   Tue Apr 13 06:36:03 2021 +1200

    dlz_bind9: remove redundant logging in b9_record_match()
    
    This log message will never be seen. We know because:
    
    1. Always (two places) we are comparing an incoming record against a
       database record.
    
    2. The incoming record has come from b9_parse(), which makes the same
       check.
    
    3. If the database record is bad, we will never get here because the
       first check in b9_record_match() is
    
           if (rec1->wType != rec2->wType) {
                   return false;
           }
    
       and rec1->wType is not going to equal the corrupt database record's
       wType, because point 2.
    
    OK, but why? So we can shift this into dnsserver_common.c, because
    the internal dns server has an inferior record_match() and it could do
    with sharing this one.
    
    Signed-off-by: Douglas Bagnall <douglas.bagnall at catalyst.net.nz>
    Reviewed-by: Andrew Bartlett <abartlet at samba.org>

commit 421dc7fc4d83629d3a5f9e558d378f44c7b9dad3
Author: Joseph Sutton <josephsutton at catalyst.net.nz>
Date:   Wed Apr 28 13:55:02 2021 +1200

    python:subunit: Avoid misleading "Test was never started" error message
    
    subunithelper.py keeps track of tests that have been started, and
    displays an error message if a test reports an outcome without having
    previously been started. However, it makes the assumption that a test
    has finished once it has reported a single outcome. This means that a
    misleading error message will be displayed if it receives multiple
    outcomes from the same test (which can happen if a test using the Python
    unittest framework does not complete successfully, and the cleanup
    subsequently fails), and any actual errors from the cleanup remain
    undisplayed.
    
    This commit ensures that only a single outcome is reported for each
    test, and only after the test has finished. Outcomes are buffered up
    until the stopTest() function is called, when a single outcome is
    determined and all errors received for that test are output.
    
    FilterOps still needs to output test outcomes immediately rather than
    buffering them, otherwise they are never picked up and passed on to the
    remote test case by subunithelper.parse_results(). This would result in
    an error as the test would be considered to have never finished.
    
        Example subunitrun output before the change:
    
    time: 2021-04-28 01:28:49.862123Z
    test: samba.tests.example.ExampleTests.test
    time: 2021-04-28 01:28:49.862215Z
    failure: samba.tests.example.ExampleTests.test [
    Traceback (most recent call last):
      File "bin/python/samba/tests/example.py", line 28, in test
        self.fail()
    AssertionError: None
    ]
    time: 2021-04-28 01:28:49.862407Z
    failure: samba.tests.example.ExampleTests.test [
    Traceback (most recent call last):
      File "bin/python/samba/tests/example.py", line 31, in tearDown
        self.fail()
    AssertionError: None
    ]
    time: 2021-04-28 01:28:49.862467Z
    time: 2021-04-28 01:28:49.862510Z
    
        and after:
    
    time: 2021-04-28 01:29:19.949347Z
    test: samba.tests.example.ExampleTests.test
    time: 2021-04-28 01:29:19.949440Z
    time: 2021-04-28 01:29:19.949590Z
    time: 2021-04-28 01:29:19.949640Z
    failure: samba.tests.example.ExampleTests.test [
    Traceback (most recent call last):
      File "bin/python/samba/tests/example.py", line 28, in test
        self.fail()
    AssertionError: None
    Traceback (most recent call last):
      File "bin/python/samba/tests/example.py", line 31, in tearDown
        self.fail()
    AssertionError: None
    ]
    time: 2021-04-28 01:29:19.949702Z
    
    Signed-off-by: Joseph Sutton <josephsutton at catalyst.net.nz>
    Reviewed-by: Andreas Schneider <asn at samba.org>
    Reviewed-by: Andrew Bartlett <abartlet at samba.org>
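
The buffering described above can be sketched with the standard unittest module alone. This is a simplified illustration, not the code in python/samba/subunit/run.py: the class name BufferedOutcomeResult and the function run_example() are hypothetical, and only errors, failures and successes are handled (skips and expected failures are left out).

```python
import io
import unittest


class BufferedOutcomeResult(unittest.TestResult):
    """Hypothetical sketch: write one outcome per test, at stopTest()."""

    def __init__(self, stream):
        super().__init__()
        self._stream = stream

    def stopTest(self, test):
        super().stopTest(test)
        # unittest has already stored (test, traceback string) pairs in
        # self.errors and self.failures; select the ones for this test.
        errs = [tb for t, tb in self.errors if t is test]
        fails = [tb for t, tb in self.failures if t is test]
        if errs:
            outcome = "error"
        elif fails:
            outcome = "failure"
        else:
            outcome = "successful"
        self._stream.write(f"{outcome}: {test.id()}")
        details = errs + fails
        if details:
            # All tracebacks for the test go inside a single bracket pair.
            self._stream.write(" [\n" + "".join(details) + "]")
        self._stream.write("\n")


def run_example():
    """Run a test whose body and tearDown() both fail; return the output."""

    class ExampleTests(unittest.TestCase):
        def test(self):
            self.fail()

        def tearDown(self):
            self.fail()

    stream = io.StringIO()
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(ExampleTests)
    suite.run(BufferedOutcomeResult(stream))
    return stream.getvalue()
```

Calling run_example() yields a single "failure:" event that contains both tracebacks, which mirrors the "after" output shown in the commit message.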

commit 18b78fa4b464200120f72a498530c5e9471a2565
Author: Joseph Sutton <josephsutton at catalyst.net.nz>
Date:   Wed Apr 28 13:54:44 2021 +1200

    python:subunit: Remove write_traceback()
    
    This functionality is already present in the Python unittest framework,
    and so is not necessary to include here.
    
    Signed-off-by: Joseph Sutton <josephsutton at catalyst.net.nz>
    Reviewed-by: Andreas Schneider <asn at samba.org>
    Reviewed-by: Andrew Bartlett <abartlet at samba.org>

commit 3031e8071c8938f0e64212fd4f767de694032be3
Author: Joseph Sutton <josephsutton at catalyst.net.nz>
Date:   Wed Apr 28 14:17:56 2021 +1200

    python:subunit: Fix skipping a test with no reason given
    
    Not specifying a reason means addSkip() is passed an empty string rather
    than None. As a result, this condition was never hit, and the call to
    _addOutcome() had an incorrect parameter.
    
    Signed-off-by: Joseph Sutton <josephsutton at catalyst.net.nz>
    Reviewed-by: Andreas Schneider <asn at samba.org>
    Reviewed-by: Andrew Bartlett <abartlet at samba.org>
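
The difference between testing for None and testing for an empty reason can be shown in a few lines. This is a sketch under assumptions: format_skip() is a hypothetical helper, not a function in run.py, and the event layout is modelled on the skip output visible in the diff below.

```python
def format_skip(test_id, reason=None):
    """Hypothetical helper: render a subunit-style skip event as text."""
    # "if reason is None" would miss the empty string that is passed
    # when no reason is given; a truthiness test covers both cases.
    if not reason:
        return f"skip: {test_id}\n"
    return f"skip: {test_id} [\n{reason}\n]\n"
```

With this check, a skip without a reason produces a bare "skip:" line instead of an empty bracketed block.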

-----------------------------------------------------------------------

Summary of changes:
 python/samba/dnsserver.py              |    7 +
 python/samba/subunit/run.py            |  188 ++--
 python/samba/tests/dns_aging.py        | 1895 ++++++++++++++++++++++++++++++++
 python/samba/tests/dns_base.py         |    6 +-
 selftest/knownfail.d/dns-aging         |   33 +
 selftest/subunithelper.py              |    8 +
 source4/dns_server/dlz_bind9.c         |   91 +-
 source4/dns_server/dns_server.h        |    2 -
 source4/dns_server/dns_update.c        |    8 +-
 source4/dns_server/dns_utils.c         |   73 --
 source4/dns_server/dnsserver_common.c  |   81 ++
 source4/dns_server/dnsserver_common.h  |    3 +
 source4/dns_server/pydns.c             |   37 +
 source4/rpc_server/dnsserver/dnsdata.c |   66 --
 source4/selftest/tests.py              |   10 +
 15 files changed, 2172 insertions(+), 336 deletions(-)
 create mode 100644 python/samba/tests/dns_aging.py
 create mode 100644 selftest/knownfail.d/dns-aging


Changeset truncated at 500 lines:

diff --git a/python/samba/dnsserver.py b/python/samba/dnsserver.py
index 68634dbeb5f..9bcab7fb023 100644
--- a/python/samba/dnsserver.py
+++ b/python/samba/dnsserver.py
@@ -297,6 +297,13 @@ def flag_from_string(rec_type):
         raise DNSParseError('Unknown type of DNS record %s' % rec_type) from e
 
 
+def recbuf_from_string(*args, **kwargs):
+    rec = record_from_string(*args, **kwargs)
+    buf = dnsserver.DNS_RPC_RECORD_BUF()
+    buf.rec = rec
+    return buf
+
+
 def dns_name_equal(n1, n2):
     """Match dns name (of type DNS_RPC_NAME)"""
     return n1.str.rstrip('.').lower() == n2.str.rstrip('.').lower()
diff --git a/python/samba/subunit/run.py b/python/samba/subunit/run.py
index 45425e6c2e6..913c61f0d1b 100755
--- a/python/samba/subunit/run.py
+++ b/python/samba/subunit/run.py
@@ -27,49 +27,9 @@
 import datetime
 import os
 import sys
-import traceback
 import unittest
 
 
-# Whether or not to hide layers of the stack trace that are
-# unittest/testtools internal code.  Defaults to True since the
-# system-under-test is rarely unittest or testtools.
-HIDE_INTERNAL_STACK = True
-
-
-def write_traceback(stream, err, test):
-    """Converts a sys.exc_info()-style tuple of values into a string.
-
-    Copied from Python 2.7's unittest.TestResult._exc_info_to_string.
-    """
-    def _is_relevant_tb_level(tb):
-        return '__unittest' in tb.tb_frame.f_globals
-
-    def _count_relevant_tb_levels(tb):
-        length = 0
-        while tb and not _is_relevant_tb_level(tb):
-            length += 1
-            tb = tb.tb_next
-        return length
-
-    exctype, value, tb = err
-    # Skip test runner traceback levels
-    if HIDE_INTERNAL_STACK:
-        while tb and _is_relevant_tb_level(tb):
-            tb = tb.tb_next
-
-    format_exception = traceback.format_exception
-
-    if (HIDE_INTERNAL_STACK and test.failureException
-        and isinstance(value, test.failureException)):
-        # Skip assert*() traceback levels
-        length = _count_relevant_tb_levels(tb)
-        msgLines = format_exception(exctype, value, tb, length)
-    else:
-        msgLines = format_exception(exctype, value, tb)
-    stream.writelines(msgLines)
-
-
 class TestProtocolClient(unittest.TestResult):
     """A TestResult which generates a subunit stream for a test run.
 
@@ -92,80 +52,28 @@ class TestProtocolClient(unittest.TestResult):
     def __init__(self, stream):
         unittest.TestResult.__init__(self)
         self._stream = stream
-        self.failed = False
-
-    def wasSuccessful(self):
-        return not self.failed
-
-    def addError(self, test, error=None):
-        """Report an error in test test.
-
-        :param error: Standard unittest positional argument form - an
-            exc_info tuple.
-        """
-        self._addOutcome("error", test, error=error)
-        self.failed = True
-
-    def addExpectedFailure(self, test, error=None):
-        """Report an expected failure in test test.
-
-        :param error: Standard unittest positional argument form - an
-            exc_info tuple.
-        """
-        self._addOutcome("xfail", test, error=error)
-
-    def addFailure(self, test, error=None):
-        """Report a failure in test test.
+        self.successes = []
 
-        :param error: Standard unittest positional argument form - an
-            exc_info tuple.
-        """
-        self._addOutcome("failure", test, error=error)
-        self.failed = True
-
-    def _addOutcome(self, outcome, test, error=None, error_permitted=True):
-        """Report a failure in test test.
+    def _addOutcome(self, outcome, test, errors=None):
+        """Report an outcome of test test.
 
         :param outcome: A string describing the outcome - used as the
             event name in the subunit stream.
-        :param error: Standard unittest positional argument form - an
-            exc_info tuple.
-        :param error_permitted: If True then error must be supplied.
-            If False then error must not be supplied.
+        :param errors: A list of strings describing the errors.
         """
         self._stream.write(("%s: " % outcome) + test.id())
-        if error_permitted:
-            if error is None:
-                raise ValueError
-        else:
-            if error is not None:
-                raise ValueError
-        if error is not None:
+        if errors:
             self._stream.write(" [\n")
-            write_traceback(self._stream, error, test)
-        else:
-            self._stream.write("\n")
-        if error is not None:
-            self._stream.write("]\n")
-
-    def addSkip(self, test, reason=None):
-        """Report a skipped test."""
-        if reason is None:
-            self._addOutcome("skip", test, error=None)
-        else:
-            self._stream.write("skip: %s [\n" % test.id())
-            self._stream.write("%s\n" % reason)
-            self._stream.write("]\n")
+            for error in errors:
+                self._stream.write(error)
+                if not error.endswith('\n'):
+                    self._stream.write('\n')
+            self._stream.write("]")
+        self._stream.write("\n")
 
     def addSuccess(self, test):
         """Report a success in a test."""
-        self._addOutcome("successful", test, error_permitted=False)
-
-    def addUnexpectedSuccess(self, test):
-        """Report an unexpected success in test test.
-        """
-        self._addOutcome("uxsuccess", test, error_permitted=False)
-        self.failed = True
+        self.successes.append(test)
 
     def startTest(self, test):
         """Mark a test as starting its test run."""
@@ -174,9 +82,81 @@ class TestProtocolClient(unittest.TestResult):
         self._stream.flush()
 
     def stopTest(self, test):
+        """Mark a test as having finished its test run."""
         super(TestProtocolClient, self).stopTest(test)
+        self.writeOutcome(test)
+
+    def writeOutcome(self, test):
+        """Output the overall outcome for test test."""
+        err,       self.errors              = self._filterErrors(test,    self.errors)
+        fail,      self.failures            = self._filterErrors(test,    self.failures)
+        xfail,     self.expectedFailures    = self._filterErrors(test,    self.expectedFailures)
+        skip,      self.skipped             = self._filterErrors(test,    self.skipped)
+        success,   self.successes           = self._filterSuccesses(test, self.successes)
+        uxsuccess, self.unexpectedSuccesses = self._filterSuccesses(test, self.unexpectedSuccesses)
+
+        if err:
+            outcome = "error"
+        elif fail:
+            outcome = "failure"
+        elif skip:
+            outcome = "skip"
+        elif uxsuccess:
+            outcome = "uxsuccess"
+        elif xfail:
+            outcome = "xfail"
+        elif success:
+            outcome = "successful"
+        else:
+            outcome = None
+
+        if outcome:
+            self._addOutcome(outcome, test, errors=err+fail+skip+xfail)
+
         self._stream.flush()
 
+    def _filterErrors(self, test, errors):
+        """Filter a list of errors by test test.
+
+        :param test: The test to filter by.
+        :param errors: A list of <test, error> pairs to filter.
+
+        :return: A pair whose first element is a list of strings containing
+            errors that apply to test test, and whose second element is a list
+            of the remaining elements.
+        """
+        filtered = []
+        unfiltered = []
+
+        for error in errors:
+            if error[0] is test:
+                filtered.append(error[1])
+            else:
+                unfiltered.append(error)
+
+        return (filtered, unfiltered)
+
+    def _filterSuccesses(self, test, successes):
+        """Filter a list of successes by test test.
+
+        :param test: The test to filter by.
+        :param successes: A list of tests to filter.
+
+        :return: A tuple whose first element is a boolean stating whether test
+            test was found in the list of successes, and whose second element is
+            a list of the remaining elements.
+        """
+        filtered = False
+        unfiltered = []
+
+        for success in successes:
+            if success is test:
+                filtered = True
+            else:
+                unfiltered.append(success)
+
+        return (filtered, unfiltered)
+
     def time(self, a_datetime):
         """Inform the client of the time.
 
diff --git a/python/samba/tests/dns_aging.py b/python/samba/tests/dns_aging.py
new file mode 100644
index 00000000000..f9bcdb2eed2
--- /dev/null
+++ b/python/samba/tests/dns_aging.py
@@ -0,0 +1,1895 @@
+# Unix SMB/CIFS implementation.
+# Copyright (C) Kai Blin  <kai at samba.org> 2011
+# Copyright (C) Catalyst.NET 2021
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 3 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program.  If not, see <http://www.gnu.org/licenses/>.
+#
+
+import sys
+from samba import dsdb
+from samba import dsdb_dns
+from samba.ndr import ndr_unpack, ndr_pack
+from samba.samdb import SamDB
+from samba.auth import system_session
+import ldb
+from samba import credentials
+from samba.dcerpc import dns, dnsp, dnsserver
+from samba.dnsserver import TXTRecord
+from samba.dnsserver import recbuf_from_string
+from samba.tests.subunitrun import SubunitOptions, TestProgram
+from samba import werror, WERRORError
+from samba.tests.dns_base import DNSTest
+import samba.getopt as options
+import optparse
+import time
+
+
+parser = optparse.OptionParser(
+    "dns_aging.py <server name> <server ip> [options]")
+sambaopts = options.SambaOptions(parser)
+parser.add_option_group(sambaopts)
+
+
+# use command line creds if available
+credopts = options.CredentialsOptions(parser)
+parser.add_option_group(credopts)
+subunitopts = SubunitOptions(parser)
+parser.add_option_group(subunitopts)
+
+opts, args = parser.parse_args()
+if len(args) < 2:
+    parser.print_usage()
+    sys.exit(1)
+
+LP = sambaopts.get_loadparm()
+CREDS = credopts.get_credentials(LP)
+SERVER_NAME = args[0]
+SERVER_IP = args[1]
+CREDS.set_krb_forwardable(credentials.NO_KRB_FORWARDABLE)
+
+DOMAIN = CREDS.get_realm().lower()
+
+# Unix time start, in DNS timestamp (24 * 365.25 * 369)
+# These are ballpark extremes for the timestamp.
+DNS_TIMESTAMP_1970 = 3234654
+DNS_TIMESTAMP_2101 = 4383000
+DNS_TIMESTAMP_1981 = 3333333  # a middling timestamp
+
+def get_samdb():
+    return SamDB(url=f"ldap://{SERVER_IP}",
+                 lp=LP,
+                 session_info=system_session(),
+                 credentials=CREDS)
+
+
+def get_file_samdb():
+    # For Samba only direct file access, needed for the tombstoning functions.
+    # (For Windows, we instruct it to tombstone over RPC).
+    return SamDB(url=LP.samdb_url(),
+                 lp=LP,
+                 session_info=system_session(),
+                 credentials=CREDS)
+
+
+def get_rpc():
+    return dnsserver.dnsserver(f"ncacn_ip_tcp:{SERVER_IP}[sign]", LP, CREDS)
+
+
+def create_zone(name, rpc=None, aging=True):
+    if rpc is None:
+        rpc = get_rpc()
+    z = dnsserver.DNS_RPC_ZONE_CREATE_INFO_LONGHORN()
+    z.pszZoneName = name
+    z.dwZoneType = dnsp.DNS_ZONE_TYPE_PRIMARY
+    z.fAging = int(bool(aging))
+    z.dwDpFlags = dnsserver.DNS_DP_DOMAIN_DEFAULT
+    z.fDsIntegrated = 1
+    z.fLoadExisting = 1
+    z.fAllowUpdate = dnsp.DNS_ZONE_UPDATE_UNSECURE
+    rpc.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                         0,
+                         SERVER_IP,
+                         None,
+                         0,
+                         'ZoneCreate',
+                         dnsserver.DNSSRV_TYPEID_ZONE_CREATE,
+                         z)
+
+
+def delete_zone(name, rpc=None):
+    if rpc is None:
+        rpc = get_rpc()
+    rpc.DnssrvOperation2(dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                         0,
+                         SERVER_IP,
+                         name,
+                         0,
+                         'DeleteZoneFromDs',
+                         dnsserver.DNSSRV_TYPEID_NULL,
+                         None)
+
+
+def txt_s_list(txt):
+    """Construct a txt record string list, which is a fiddly matter."""
+    if isinstance(txt, str):
+        txt = [txt]
+    s_list = dnsp.string_list()
+    s_list.count = len(txt)
+    s_list.str = txt
+    return s_list
+
+
+def copy_rec(rec):
+    copy = dnsserver.DNS_RPC_RECORD()
+    copy.wType = rec.wType
+    copy.dwFlags = rec.dwFlags
+    copy.dwSerial = rec.dwSerial
+    copy.dwTtlSeconds = rec.dwTtlSeconds
+    copy.data = rec.data
+    copy.dwTimeStamp = rec.dwTimeStamp
+    return copy
+
+
+class TestDNSAging(DNSTest):
+    """Probe DNS aging and scavenging, using LDAP and RPC to set and test
+    the timestamps behind DNS's back."""
+    server = SERVER_NAME
+    server_ip = SERVER_IP
+    creds = CREDS
+
+    def setUp(self):
+        super().setUp()
+        self.rpc_conn = get_rpc()
+        self.samdb = get_samdb()
+
+        # We always have a zone of our own named after the test function.
+        self.zone = self.id().rsplit('.', 1)[1]
+        self.addCleanup(delete_zone, self.zone, self.rpc_conn)
+        try:
+            create_zone(self.zone, self.rpc_conn)
+        except WERRORError as e:
+            if e.args[0] != werror.WERR_DNS_ERROR_ZONE_ALREADY_EXISTS:
+                raise
+            print(f"zone {self.zone} already exists")
+
+        # Though we set this in create_zone(), that doesn't work on
+        # Windows, so we repeat again here.
+        self.set_zone_int_params(AllowUpdate=dnsp.DNS_ZONE_UPDATE_UNSECURE)
+
+        self.zone_dn = (f"DC={self.zone},CN=MicrosoftDNS,DC=DomainDNSZones,"
+                        f"{self.samdb.get_default_basedn()}")
+
+    def set_zone_int_params(self, zone=None, **kwargs):
+        """Keyword arguments set parameters on the zone. e.g.:
+
+            self.set_zone_int_params(Aging=1,
+                                     RefreshInterval=222)
+
+        See [MS-DNSP] 3.1.1.2.1 "DNS Zone Integer Properties" for names.
+        """
+        if zone is None:
+            zone = self.zone
+        for key, val in kwargs.items():
+            name_param = dnsserver.DNS_RPC_NAME_AND_PARAM()
+            name_param.dwParam = val
+            name_param.pszNodeName = key
+            try:
+                self.rpc_conn.DnssrvOperation2(
+                    dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                    0,
+                    SERVER_IP,
+                    zone,
+                    0,
+                    'ResetDwordProperty',
+                    dnsserver.DNSSRV_TYPEID_NAME_AND_PARAM,
+                    name_param)
+            except WERRORError as e:
+                self.fail(str(e))
+
+    def rpc_replace(self, name, old=None, new=None):
+        """Replace a DNS_RPC_RECORD or DNS_RPC_RECORD_BUF"""
+        # wrap our recs, if necessary
+        if isinstance(new, dnsserver.DNS_RPC_RECORD):
+            rec = new
+            new = dnsserver.DNS_RPC_RECORD_BUF()
+            new.rec = rec
+
+        if isinstance(old, dnsserver.DNS_RPC_RECORD):
+            rec = old
+            old = dnsserver.DNS_RPC_RECORD_BUF()
+            old.rec = rec
+
+        try:
+            self.rpc_conn.DnssrvUpdateRecord2(
+                dnsserver.DNS_CLIENT_VERSION_LONGHORN,
+                0,
+                SERVER_IP,
+                self.zone,
+                name,
+                new,
+                old)
+        except WERRORError as e:
+            self.fail(f"could not replace record ({e})")
+
+    def rpc_add(self, name, data, wtype):
+        rec_buf = recbuf_from_string(wtype, data)
+        self.rpc_replace(name, None, rec_buf)
+
+    def rpc_delete(self, name, data, wtype):
+        rec_buf = recbuf_from_string(wtype, data)
+        self.rpc_replace(name, rec_buf, None)
+
+    def get_unique_txt_record(self, name, txt):
+        """Get the TXT record on Name with value txt, asserting that there is
+        only one."""
+        if isinstance(txt, str):
+            txt = [txt]
+        recs = self.ldap_get_records(name)
+
+        match = None
+        for r in recs:
+            if r.wType != dnsp.DNS_TYPE_TXT:
+                continue
+            txt2 = [x for x in r.data.str]
+            if txt2 == txt:
+                self.assertIsNone(match)
+                match = r


-- 
Samba Shared Repository


